驾驭风暴:RabbitMQ实战之旅——揭秘消息队列的无限潜能

本文最后更新于:6 个月前

破冰

  • 🔥 推荐阅读:
消息队列基础知识总结 | JavaGuide(Java面试 + 学习指南)
java消息队列基础和RabbitMQ相关概念 - 掘金 (juejin.cn)
Windows10上RabbitMQ安装和启动详细步骤 - 行业资讯 - 电子产品设计开发与电子技术学习交流! (52dianzi.com)

思维碰撞

基础知识介绍

小试牛刀(实操)

RabbitMQ

下载安装

🍝 RabbitMQ 官网:Documentation: Table of Contents — RabbitMQ

🍜 RabbitMQ 下载(如下图所示):

  • 下载地址:Downloads - Erlang/OTP
  • 因为 RabbitMQ 服务端是使用并发式语言 Erlang 编写的,安装 Rabbit MQ 的前提是安装 Erlang

image-20231025224213573

🍖 Erlang 下载(如下图所示):

image-20231025224007271

安装监控面板

  • sbin 目录下执行以下命令,安装 rabbitmq 监控面板(2023/10/25晚)
1
rabbitmq-plugins enable rabbitmq_management

image-20231025224956930

启动

  • sbin 目录下执行以下命令,启动 rabbitmq
1
rabbitmq-server.bat
  • 可以在任务管理器处检查 RabbitMQ 服务是否正常启动

image-20231025220542254

登入监控面板

  • 访问 http://localhost:15672,输入初始账号密码,登入监控面板
默认账号:guest
默认密码:guest

image-20231025225355025

SpringBoot 集成 RabbitMQ

简单的 demo 演示

导入依赖

🥣 RabbitMQ 的相关依赖坐标可以在 mvn 中找到,依赖坐标可以在官方文档中查到:

image-20231026130836071


image-20231026131014940

  • MVN 仓库中查找到依赖并导入:
🍖 Maven Repository: com.rabbitmq » amqp-client » 5.19.0 (mvnrepository.com)
1
2
3
4
5
6
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.19.0</version>
</dependency>

简单的 Producer / Receiver

🍟 定义简单的生产者和消费者,完成 demo 代码的编写。当然,这部分代码也可以在官方文档中找到

🍛

生产消息 demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* 消息生产者
*/
public class MessageProducer {

private final static String QUEUE_NAME = "hello3";

public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + message + "'");
}
}
}

消费消息 demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* 消息消费者
*/
public class MessageReceiver {

private final static String QUEUE_NAME = "hello3";

public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
});
}
}

效果展示

🍿 启动 MessageProducer 分别生产 hello、hello1、hello2 三条消息

🌭 再启动 MessageReceiver 分别消费 hello、hello2 两条消息

image-20231026160932226

  • RabbitMQ 监控面板中的显示效果如下:(2023/10/26午)

image-20231026160619142

Work Queue 多消费者

🍖 推荐阅读:RabbitMQ tutorial - Work Queues — RabbitMQ

  • 生产者 MultiMesProducer(2024/01/15午)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class MultiMesProducer {

private static final String TASK_QUEUE_NAME = "multi_queue";

public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");

try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 队列持久化
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

// 发送多条消息
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
String message = scanner.next();
// 消息持久化
channel.basicPublish("", TASK_QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
}
}
}
}
  • 消费者 MultiMesReceiver
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class MultiMesReceiver {

private static final String TASK_QUEUE_NAME = "multi_queue";

public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");

final Connection connection = factory.newConnection();

for (int i = 0; i < 2; i++) {
final Channel channel = connection.createChannel();
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
// 控制单个消费者的处理任务积压数
channel.basicQos(1);

int finalI = i;
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");

try {
// 处理工作
System.out.println(" [x] Received '" + "编号:" + finalI + ":" + message + "'");
// channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);
} finally {
System.out.println(" [x] Done");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
// 开启消费监听
channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> {
});
}
}
}
  • 开启生产者,生产六条消息:

image-20240115165134585

  • 开启消费者(两个),各每隔两秒消费一条消息:

image-20240115165250014

Publish / Subcribe 发布订阅

  • 生产者 FanoutProducer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class FanoutProducer {
private static final String EXCHANGE_NAME = "fanout-exchange";

public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");

try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 创建交换机
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
String message = scanner.next();
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
}
} catch (TimeoutException | IOException e) {
throw new RuntimeException(e);
}
}
}
  • 消费者 FanoutReceiver
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class FanoutConsumer {
private static final String EXCHANGE_NAME = "fanout-exchange";

public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");

Connection connection = factory.newConnection();
Channel channel1 = connection.createChannel();
Channel channel2 = connection.createChannel();
// 声明交换机
channel1.exchangeDeclare(EXCHANGE_NAME, "fanout");
// 创建队列,随机分配一个队列名称
String queueName1 = "nice_queue";
channel1.queueDeclare(queueName1, true, false, false, null);
channel2.queueBind(queueName1, EXCHANGE_NAME, "");

String queueName2 = "fuck_queue";
channel1.queueDeclare(queueName2, true, false, false, null);
channel2.queueBind(queueName2, EXCHANGE_NAME, "");

System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

DeliverCallback deliverCallback1 = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println(" nice " + message + "'");
};

DeliverCallback deliverCallback2 = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println(" fuck " + message + "'");
};

channel1.basicConsume(queueName1, true, deliverCallback1, consumer -> {
});
channel2.basicConsume(queueName2, true, deliverCallback2, consumer -> {
});
}
}
  • 生产者生产消息,所有消费者都能够收到:(2024/01/15午)

image-20240115171624222

image-20240115171641966

亮点集锦


驾驭风暴:RabbitMQ实战之旅——揭秘消息队列的无限潜能
http://example.com/2023/10/25/驾驭风暴:RabbitMQ实战之旅——揭秘消息队列的无限潜能/
作者
Memory
发布于
2023年10月25日
更新于
2023年10月26日
许可协议