前言
最近项目中需要用到延时队列的功能,想过很多实现方式,比如Java自带的delayQuene实现起来简单,时间轮,Redis的有序集合等。但由于各种原因最终选择了RabbitMQ实现。
delayQuene占用内存大。数据量少,逻辑简单不想引入别的技术时建议使用。
时间轮只知道概念接触不深。
redis的本质还是一个内存型数据库。用sleep线程1s的方式取数据不够优雅。
系统中已经有模块使用了RabbitMQ,只需要引入一个延时队列插件(rabbitmq_delayed_message_exchange)即可。
任性的技术选型。
简介
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而聚类和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。(维基百科)
基本概念
Publisher:发布者(或称为生产者)负责生产消息并将其投递到指定的交换器上。
Consumer:订阅者(或称为消费者)负责从消息队列中取得消息的客户端应用程序。
Exchange:交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
Binding:绑定,用于消息队列和交换器之间的关联。
Queue:消息队列,用来保存消息直到发送给消费者。
安装
选用Docker镜像安装,因为方便。
step1: 下载rabbitmq-delayed-message-exchange插件
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.9.0/rabbitmq_delayed_message_exchange-3.9.0.ez
step2:准备dockerfile文件
1 2 3
| FROM rabbitmq:3-management COPY ["rabbitmq_delayed_message_exchange-3.9.0.ez" , "/plugins/"] RUN rabbitmq-plugins enable --offline rabbitmq_delayed_message_exchange
|
step3:构建镜像
1
| docker build -t rabbitmq:3-management-delay .
|
step4:运行Docker容器,设置账号为admin 密码为123456
1
| docker run -d --hostname my-rabbit --name some-rabbit -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=123456 rabbitmq:3-management-delay
|
step5:验证安装结果
打开http://localhost:15672/ 输入账号密码
如果type下拉框中出现x-delayed-message就说明插件安装成功了。
配置
引入依赖
1 2 3 4
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
|
添加 RabbitMQ 地址和帐号
1 2 3 4 5 6
| spring: rabbitmq: host: localhost port: 5672 username: admin password: 123456
|
发送简单消息
先来看一个最简配置的demo,发送简单消息,不需要用到交换机,只需要配置消费者和发布(生产)者就可以了。
创建队列
1 2 3 4 5 6 7 8 9 10 11
| @Configuration public class RabbitConfig { public static final String SIMPLE_QUEUE_NAME = "simple_queue"; @Bean public Queue simpleQueue() { return new Queue(RabbitConfig.SIMPLE_QUEUE_NAME, true); } }
|
配置发布(生产)者
1 2 3 4 5 6 7 8 9 10 11 12 13
| @Component public class RabbitmqPublish {
private final RabbitTemplate rabbitTemplate;
public RabbitmqPublish(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; }
public void sendSimpleMsg(String message) { rabbitTemplate.convertAndSend(RabbitConfig.SIMPLE_QUEUE_NAME, message); } }
|
配置消费者
1 2 3 4 5 6 7 8 9 10 11
| @Component @Slf4j public class RabbitmqConsumer {
@RabbitListener(queues = RabbitConfig.SIMPLE_QUEUE_NAME) @RabbitHandler public void consumerSimpleMessage(@Payload String message) { log.info("simple队列的内容:{}", message); } }
|
创建控制器发送消息
1 2 3 4 5 6 7 8 9 10 11 12 13
| @RestController @RequestMapping("rabbit") public class IndexController {
@Autowired RabbitmqPublish rabbitmqPublish;
@GetMapping("simple") public String publishSimpleMessage() { rabbitmqPublish.sendSimpleMsg("简单消息" + new Date().getTime()); return "发送完成"; } }
|
发送延迟消息
来到本文的重点,使用fanout类型的交换机(Exchange)演示,此时会把消息路由到与该交换器绑定的所有队列中。
Exchange类型说明
1.fanout: 发送消息到所有绑定队列(会忽略routingKey)。
2.direct:发送消息到对应RountingKey队列。
3.topic:相当于同时拥有fanout和direct类型的能力(* 用于匹配一个单词, # 用于匹配零个或者多个单词)。
RabbitConfig.java RabbitMQ配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| @Configuration public class RabbitConfig { public static final String DELAY_QUEUE_ONE_NAME = "delay_queue_one";
public static final String DELAY_QUEUE_TWO_NAME = "delay_queue_two"; public static final String DELAY_EXCHANGE = "my.delay.direct"; @Bean public Queue delayQueueOne() { return new Queue(RabbitConfig.DELAY_QUEUE_ONE_NAME, true); }
@Bean public Queue delayQueueTwo() { return new Queue(RabbitConfig.DELAY_QUEUE_TWO_NAME, true); } @Bean public CustomExchange delayExchange() { Map<String, Object> pMap = new HashMap<>(); pMap.put("x-delayed-type", "fanout"); return new CustomExchange(DELAY_EXCHANGE, "x-delayed-message", true, false, pMap); } @Bean public Binding delayQueueOneBind() { return BindingBuilder.bind(delayQueueOne()).to(delayExchange()).with("").noargs(); }
@Bean public Binding delayQueueTwoBind() { return BindingBuilder.bind(delayQueueTwo()).to(delayExchange()).with("").noargs(); } }
|
RabbitmqPublish.java 发布(生产)者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| @Component public class RabbitmqPublish { private final RabbitTemplate rabbitTemplate;
public RabbitmqPublish(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; } public void sendDelayMsg(Article article, int delay) { rabbitTemplate.convertAndSend(RabbitConfig.DELAY_EXCHANGE, "", article, message -> { message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); message.getMessageProperties().setDelay(delay * 1000); return message; }); } }
|
RabbitmqConsumer.java 消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| @Component @Slf4j public class RabbitmqConsumer { @RabbitListener(queues = RabbitConfig.DELAY_QUEUE_ONE_NAME) @RabbitHandler public void consumerDelayOneMessage(@Payload Article article) throws IOException { log.info("延迟队列ONE 标题:{} 作者:{} 内容:{}", article.getTitle(), article.getAuthor(), article.getContent()); }
@RabbitListener(queues = RabbitConfig.DELAY_QUEUE_TWO_NAME) public void consumerDelayTWOMessage(Article article, Message message, Channel channel) { log.info("延迟队列TWO 标题:{} 作者:{} 内容:{}", article.getTitle(), article.getAuthor(), article.getContent()); } }
|
IndexController.java 发送延时消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| @RestController @RequestMapping("rabbit") public class IndexController {
@Autowired RabbitmqPublish rabbitmqPublish;
@GetMapping("delay") public String publishDelayMessage() { Article article = Article.builder().title("三体").author("刘慈欣").content("正文。。。").build(); rabbitmqPublish.sendDelayMsg(article, 2); rabbitmqPublish.sendDelayMsg(article, 5); return "延时消息发送完成:" + new Date().getTime(); } }
|
总结
今天从RabbitMQ的安装讲起到发送延时消息。本篇介绍的是使用 rabbitmq-delayed-message-exchange 插件的形式。使用RabbitMQ的高级特性TTL(Time To Live)配合死信队列也能实现延时队列的功能。
先创建一个队列进入此队列中的消息设置一个TTL,那么这条消息如果在TTL时间段内没有被消费就会进入死信队列,再创建一个Consumer消费死信队列中的数据,也就变相实现了延时队列的功能。
最后我把完整的代码放在Github上有需要的同学可以start这个项目。
https://github.com/pepsiyoung/hexo-blog-demo/tree/main/rabbitmq-demo