SpringBoot中优雅使用RabbitMQ

前言

最近项目中需要用到延时队列的功能,想过很多实现方式,比如Java自带的delayQuene实现起来简单,时间轮,Redis的有序集合等。但由于各种原因最终选择了RabbitMQ实现。

delayQuene占用内存大。数据量少,逻辑简单不想引入别的技术时建议使用。

时间轮只知道概念接触不深。

redis的本质还是一个内存型数据库。用sleep线程1s的方式取数据不够优雅。

系统中已经有模块使用了RabbitMQ,只需要引入一个延时队列插件(rabbitmq_delayed_message_exchange)即可。

任性的技术选型。

简介

RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而聚类和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端。(维基百科)

基本概念

Publisher:发布者(或称为生产者)负责生产消息并将其投递到指定的交换器上。

Consumer:订阅者(或称为消费者)负责从消息队列中取得消息的客户端应用程序。

Exchange:交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。

Binding:绑定,用于消息队列和交换器之间的关联。

Queue:消息队列,用来保存消息直到发送给消费者。

安装

选用Docker镜像安装,因为方便。

step1: 下载rabbitmq-delayed-message-exchange插件

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.9.0/rabbitmq_delayed_message_exchange-3.9.0.ez

step2:准备dockerfile文件

1
2
3
FROM rabbitmq:3-management
COPY ["rabbitmq_delayed_message_exchange-3.9.0.ez" , "/plugins/"]
RUN rabbitmq-plugins enable --offline rabbitmq_delayed_message_exchange

step3:构建镜像

1
docker build -t rabbitmq:3-management-delay .

step4:运行Docker容器,设置账号为admin 密码为123456

1
docker run -d --hostname my-rabbit --name some-rabbit -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=123456 rabbitmq:3-management-delay

step5:验证安装结果

打开http://localhost:15672/ 输入账号密码

如果type下拉框中出现x-delayed-message就说明插件安装成功了。

配置

引入依赖

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

添加 RabbitMQ 地址和帐号

1
2
3
4
5
6
spring:
rabbitmq:
host: localhost
port: 5672
username: admin
password: 123456

发送简单消息

先来看一个最简配置的demo,发送简单消息,不需要用到交换机,只需要配置消费者和发布(生产)者就可以了。

创建队列

1
2
3
4
5
6
7
8
9
10
11
@Configuration
public class RabbitConfig {

public static final String SIMPLE_QUEUE_NAME = "simple_queue";

// 创建队列
@Bean
public Queue simpleQueue() {
return new Queue(RabbitConfig.SIMPLE_QUEUE_NAME, true);
}
}

配置发布(生产)者

1
2
3
4
5
6
7
8
9
10
11
12
13
@Component
public class RabbitmqPublish {

private final RabbitTemplate rabbitTemplate;

public RabbitmqPublish(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}

public void sendSimpleMsg(String message) {
rabbitTemplate.convertAndSend(RabbitConfig.SIMPLE_QUEUE_NAME, message);
}
}

配置消费者

1
2
3
4
5
6
7
8
9
10
11
@Component
@Slf4j
public class RabbitmqConsumer {

// 消费对应名称队列上的消息
@RabbitListener(queues = RabbitConfig.SIMPLE_QUEUE_NAME)
@RabbitHandler
public void consumerSimpleMessage(@Payload String message) {
log.info("simple队列的内容:{}", message);
}
}

创建控制器发送消息

1
2
3
4
5
6
7
8
9
10
11
12
13
@RestController
@RequestMapping("rabbit")
public class IndexController {

@Autowired
RabbitmqPublish rabbitmqPublish;

@GetMapping("simple")
public String publishSimpleMessage() {
rabbitmqPublish.sendSimpleMsg("简单消息" + new Date().getTime());
return "发送完成";
}
}

发送延迟消息

来到本文的重点,使用fanout类型的交换机(Exchange)演示,此时会把消息路由到与该交换器绑定的所有队列中。

Exchange类型说明

1.fanout: 发送消息到所有绑定队列(会忽略routingKey)。

2.direct:发送消息到对应RountingKey队列。

3.topic:相当于同时拥有fanout和direct类型的能力(* 用于匹配一个单词, # 用于匹配零个或者多个单词)。

RabbitConfig.java RabbitMQ配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@Configuration
public class RabbitConfig {

public static final String DELAY_QUEUE_ONE_NAME = "delay_queue_one";

public static final String DELAY_QUEUE_TWO_NAME = "delay_queue_two";

public static final String DELAY_EXCHANGE = "my.delay.direct";

// 创建队列1
@Bean
public Queue delayQueueOne() {
return new Queue(RabbitConfig.DELAY_QUEUE_ONE_NAME, true);
}

// 创建队列2
@Bean
public Queue delayQueueTwo() {
return new Queue(RabbitConfig.DELAY_QUEUE_TWO_NAME, true);
}

// 创建交换机
@Bean
public CustomExchange delayExchange() {
Map<String, Object> pMap = new HashMap<>();
pMap.put("x-delayed-type", "fanout");
return new CustomExchange(DELAY_EXCHANGE, "x-delayed-message", true, false, pMap);
}

// 队列与交换机绑定
@Bean
public Binding delayQueueOneBind() {
return BindingBuilder.bind(delayQueueOne()).to(delayExchange()).with("").noargs();
}

@Bean
public Binding delayQueueTwoBind() {
return BindingBuilder.bind(delayQueueTwo()).to(delayExchange()).with("").noargs();
}
}

RabbitmqPublish.java 发布(生产)者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Component
public class RabbitmqPublish {

private final RabbitTemplate rabbitTemplate;

public RabbitmqPublish(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}

public void sendDelayMsg(Article article, int delay) {
// 这里routingKey指定了不会生效,fanout类型会广播给所有订阅者
rabbitTemplate.convertAndSend(RabbitConfig.DELAY_EXCHANGE, "", article, message -> {
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
message.getMessageProperties().setDelay(delay * 1000); // 毫秒为单位,指定此消息的延时时长
return message;
});
}
}

RabbitmqConsumer.java 消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Component
@Slf4j
public class RabbitmqConsumer {
//消费延时消息
@RabbitListener(queues = RabbitConfig.DELAY_QUEUE_ONE_NAME)
@RabbitHandler
public void consumerDelayOneMessage(@Payload Article article) throws IOException {
log.info("延迟队列ONE 标题:{} 作者:{} 内容:{}",
article.getTitle(), article.getAuthor(), article.getContent());
}

@RabbitListener(queues = RabbitConfig.DELAY_QUEUE_TWO_NAME)
public void consumerDelayTWOMessage(Article article, Message message, Channel channel) {
log.info("延迟队列TWO 标题:{} 作者:{} 内容:{}",
article.getTitle(), article.getAuthor(), article.getContent());
}
}

IndexController.java 发送延时消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@RestController
@RequestMapping("rabbit")
public class IndexController {

@Autowired
RabbitmqPublish rabbitmqPublish;

@GetMapping("delay")
public String publishDelayMessage() {
Article article = Article.builder().title("三体").author("刘慈欣").content("正文。。。").build();
rabbitmqPublish.sendDelayMsg(article, 2);
rabbitmqPublish.sendDelayMsg(article, 5);
return "延时消息发送完成:" + new Date().getTime();
}
}

总结

今天从RabbitMQ的安装讲起到发送延时消息。本篇介绍的是使用 rabbitmq-delayed-message-exchange 插件的形式。使用RabbitMQ的高级特性TTL(Time To Live)配合死信队列也能实现延时队列的功能。
先创建一个队列进入此队列中的消息设置一个TTL,那么这条消息如果在TTL时间段内没有被消费就会进入死信队列,再创建一个Consumer消费死信队列中的数据,也就变相实现了延时队列的功能。
最后我把完整的代码放在Github上有需要的同学可以start这个项目。

https://github.com/pepsiyoung/hexo-blog-demo/tree/main/rabbitmq-demo