SpringBoot | 第三十八章:基于RabbitMQ实现消息延迟队列方案

前言

前段时间在编写通用的消息通知服务时,由于需要实现类似通知失败时,需要延后几分钟再次进行发送,进行多次尝试后,进入定时发送机制。此机制,在原先对接银联支付时,银联的异步通知也是类似的,在第一次通知失败后,支付标准服务会重发,最多发送五次,每次的间隔时间为1、4、8、16分钟等。本文就简单讲解下使用RabbitMQ实现延时消息队列功能。

一点知识

在此之前,简单说明下基于RabbitMQ实现延时队列的相关知识及说明下延时队列的使用场景。

延时队列使用场景

在很多的业务场景中,延时队列可以实现很多功能,此类业务中,一般上是非实时的,需要延迟处理的,需要进行重试补偿的。

  1. 订单超时关闭:在支付场景中,一般上订单在创建后30分钟或1小时内未支付的,会自动取消订单。
  2. 短信或者邮件通知:在一些注册或者下单业务时,需要在1分钟或者特定时间后进行短信或者邮件发送相关资料的。本身此类业务于主业务是无关联性的,一般上的做法是进行异步发送。
  3. 重试场景:比如消息通知,在第一次通知出现异常时,会在隔几分钟之后进行再次重试发送。

RabbitMQ实现延时队列

本身在RabbitMQ中是未直接提供延时队列功能的,但可以使用TTL(Time-To-Live,存活时间)DLX(Dead-Letter-Exchange,死信队列交换机)的特性实现延时队列的功能。

存活时间(Time-To-Live 简称 TTL)

RabbitMQ中可以对队列和消息分别设置TTL,TTL表明了一条消息可在队列中存活的最大时间。当某条消息被设置了TTL或者当某条消息进入了设置了TTL的队列时,这条消息会在TTL时间后死亡成为Dead Letter。如果既配置了消息的TTL,又配置了队列的TTL,那么较小的那个值会被取用。

死信交换(Dead Letter Exchanges 简称 DLX)

上个知识点也提到了,设置了TTL的消息或队列最终会成为Dead Letter,当消息在一个队列中变成死信之后,它能被重新发送到另一个交换机中,这个交换机就是DLX,绑定此DLX的队列就是死信队列。

一个消息变成死信一般上是由于以下几种情况;

  1. 消息被拒绝
  2. 消息过期
  3. 队列达到了最大长度。

所以,通过TTLDLX的特性可以模拟实现延时队列的功能。当队列中的消息超时成为死信后,会把消息死信重新发送到配置好的交换机中,然后分发到真实的消费队列。故简单来说,我们可以创建2个队列,一个队列用于发送消息,一个队列用于消息过期后的转发的目标队列。

SpringBoot集成RabbitMQ实现延时队列实战

以下使用SpringBoot集成RabbitMQ进行实战说明,在进行http消息通知时,若通知失败(地址不可用或者连接超时)时,将此消息转入延时队列中,待特定时间后进行重新发送。

0.引入pom依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
<!-- rabbit -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- 简化http操作 -->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-http</artifactId>
<version>4.5.16</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-json</artifactId>
<version>4.5.16</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

1.编写rabbitmq配置文件(关键配置)
RabbitConfig.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
/** 
*
* @ClassName 类名:RabbitConfig
* @Description 功能说明:
* <p>
* TODO
*</p>
************************************************************************
* @date 创建日期:2019年7月17日
* @author 创建人:oKong
* @version 版本号:V1.0
*<p>
***************************修订记录*************************************
*
* 2019年7月17日 oKong 创建该类功能。
*
***********************************************************************
*</p>
*/
@Configuration
public class RabbitConfig {

@Autowired
ConnectionFactory connectionFactory;

/**
* 消费者线程数 设置大点 大概率是能通知到的
*/
@Value("${http.notify.concurrency:50}")
int concurrency;

/**
* 延迟队列的消费者线程数 可设置小点
*/
@Value("${http.notify.delay.concurrency:20}")
int delayConcurrency;

@Bean
public RabbitAdmin rabbitAdmin() {
return new RabbitAdmin(connectionFactory);
}

@Bean
public DirectExchange httpMessageNotifyDirectExchange(RabbitAdmin rabbitAdmin) {
//durable 是否持久化
//autoDelete 是否自动删除,即服务端或者客服端下线后 交换机自动删除
DirectExchange directExchange = new DirectExchange(ApplicationConstant.HTTP_MESSAGE_EXCHANGE,true,false);
directExchange.setAdminsThatShouldDeclare(rabbitAdmin);
return directExchange;
}

//设置消息队列
@Bean
public Queue httpMessageStartQueue(RabbitAdmin rabbitAdmin) {

/*
创建接收队列,4个参数
name - 队列名称
durable - false,不进行持有化
exclusive - true,独占性
autoDelete - true,自动删除*/
Queue queue = new Queue(ApplicationConstant.HTTP_MESSAGE_START_QUEUE_NAME, true, false, false);
queue.setAdminsThatShouldDeclare(rabbitAdmin);
return queue;
}

//队列绑定交换机
@Bean
public Binding bindingStartQuene(RabbitAdmin rabbitAdmin,DirectExchange httpMessageNotifyDirectExchange, Queue httpMessageStartQueue) {
Binding binding = BindingBuilder.bind(httpMessageStartQueue).to(httpMessageNotifyDirectExchange).with(ApplicationConstant.HTTP_MESSAGE_START_RK);
binding.setAdminsThatShouldDeclare(rabbitAdmin);
return binding;
}

@Bean
public Queue httpMessageOneQueue(RabbitAdmin rabbitAdmin) {
Queue queue = new Queue(ApplicationConstant.HTTP_MESSAGE_ONE_QUEUE_NAME, true, false, false);
queue.setAdminsThatShouldDeclare(rabbitAdmin);
return queue;
}

@Bean
public Binding bindingOneQuene(RabbitAdmin rabbitAdmin,DirectExchange httpMessageNotifyDirectExchange, Queue httpMessageOneQueue) {
Binding binding = BindingBuilder.bind(httpMessageOneQueue).to(httpMessageNotifyDirectExchange).with(ApplicationConstant.HTTP_MESSAGE_ONE_RK);
binding.setAdminsThatShouldDeclare(rabbitAdmin);
return binding;
}

//-------------设置延迟队列--开始--------------------
@Bean
public Queue httpDelayOneQueue() {
//name - 队列名称
//durable - true
//exclusive - false
//autoDelete - false
return QueueBuilder.durable("http.message.dlx.one")
//以下是重点:当变成死信队列时,会转发至 路由为x-dead-letter-exchange及x-dead-letter-routing-key的队列中
.withArgument("x-dead-letter-exchange", ApplicationConstant.HTTP_MESSAGE_EXCHANGE)
.withArgument("x-dead-letter-routing-key", ApplicationConstant.HTTP_MESSAGE_ONE_RK)
.withArgument("x-message-ttl", 1*60*1000)//1分钟 过期时间(单位:毫秒),当过期后 会变成死信队列,之后进行转发
.build();
}
//绑定到交换机上
@Bean
public Binding bindingDelayOneQuene(RabbitAdmin rabbitAdmin, DirectExchange httpMessageNotifyDirectExchange, Queue httpDelayOneQueue) {
Binding binding = BindingBuilder.bind(httpDelayOneQueue).to(httpMessageNotifyDirectExchange).with("delay.one");
binding.setAdminsThatShouldDeclare(rabbitAdmin);
return binding;
}
//-------------设置延迟队列--结束--------------------

//建议将正常的队列和延迟处理的队列分开
//设置监听容器
@Bean("notifyListenerContainer")
public SimpleRabbitListenerContainerFactory httpNotifyListenerContainer() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);// 手动ack
factory.setConnectionFactory(connectionFactory);
factory.setPrefetchCount(1);
factory.setConcurrentConsumers(concurrency);
return factory;
}

// 设置监听容器
@Bean("delayNotifyListenerContainer")
public SimpleRabbitListenerContainerFactory httpDelayNotifyListenerContainer() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);// 手动ack
factory.setConnectionFactory(connectionFactory);
factory.setPrefetchCount(1);
factory.setConcurrentConsumers(delayConcurrency);
return factory;
}
}

ApplicationConstant.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class ApplicationConstant {

/**
* 发送http通知的 exchange 队列
*/
public static final String HTTP_MESSAGE_EXCHANGE = "http.message.exchange";

/**
* 配置消息队列和路由key值
*/
public static final String HTTP_MESSAGE_START_QUEUE_NAME = "http.message.start";
public static final String HTTP_MESSAGE_START_RK = "rk.start";

public static final String HTTP_MESSAGE_ONE_QUEUE_NAME = "http.message.one";
public static final String HTTP_MESSAGE_ONE_RK = "rk.one";

/**
* 通知队列对应的延迟队列关系,即过期队列之后发送到下一个的队列信息,可以根据实际情况添加,当然也可以根据一定规则自动生成
*/
public static final Map<String,String> delayRefMap = new HashMap<String, String>() {
/**
*
*/
private static final long serialVersionUID = -779823216035682493L;

{
put(HTTP_MESSAGE_START_QUEUE_NAME, "delay.one");
}
};
}

简单来说,就是创建一个正常消息发送队列,用于接收http消息请求的参数,同时进行http请求。同时,创建一个延时队列,设置其x-dead-letter-exchangex-dead-letter-routing-keyx-message-ttl值,将其转发到正常的队列中。使用一个map对象维护一个关系,当正常消息异常时,需要发送的延时队列的队列名称,当然时间场景汇总,根据需要可以进行动态配置或者根据一定规则进行动态映射。

2.创建监听类,用于消息的消费操作,此处使用@RabbitListener来消费消息(当然也可以使用SimpleMessageListenerContainer进行消息配置的),创建了一个正常消息监听和延时队列监听,由于一般上异常通知是低概率事件,可根据不同的监听容器进行差异化配置。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
/** 
*
* @ClassName 类名:HttpMessagerLister
* @Description 功能说明:http通知消费监听接口
* <p>
* TODO
*</p>
************************************************************************
* @date 创建日期:2019年7月17日
* @author 创建人:oKong
* @version 版本号:V1.0
*<p>
***************************修订记录*************************************
*
* 2019年7月17日 oKong 创建该类功能。
*
***********************************************************************
*</p>
*/
@Component
@Slf4j
public class HttpMessagerLister {

@Autowired
HttpMessagerService messagerService;

@RabbitListener(id = "httpMessageNotifyConsumer", queues = {ApplicationConstant.HTTP_MESSAGE_START_QUEUE_NAME}, containerFactory = "notifyListenerContainer")
public void httpMessageNotifyConsumer(Message message, Channel channel) throws Exception {
doHandler(message, channel);
}

@RabbitListener(id= "httpDelayMessageNotifyConsumer", queues = {
ApplicationConstant.HTTP_MESSAGE_ONE_QUEUE_NAME,}, containerFactory = "delayNotifyListenerContainer")
public void httpDelayMessageNotifyConsumer(Message message, Channel channel) throws Exception {
doHandler(message, channel);
}

private void doHandler(Message message, Channel channel) throws Exception {
String body = new String(message.getBody(),"utf-8");
String queue = message.getMessageProperties().getConsumerQueue();
log.info("接收到通知请求:{},队列名:{}",body, queue);
//消息对象转换
try {
HttpEntity httpNotifyDto = JSONUtil.toBean(body, HttpEntity.class);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
//发送通知
messagerService.notify(queue, httpNotifyDto);
} catch(Exception e) {
log.error(e.getMessage());
//ack
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
}

HttpMessagerService.java:消息真正处理的类,此类是关键,这里未进行日志记录,真实场景中,强烈建议进行消息通知的日志存储,防止日后信息的查看,同时也能通过发送状态,在重试次数都失败后,进行定时再次发送功能,同时也有据可查。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Component
@Slf4j
public class HttpMessagerService {

@Autowired
AmqpTemplate mqTemplate;

public void notify(String queue,HttpEntity httpEntity) {
//发起请求
log.info("开始发起http请求:{}", httpEntity);
try {
switch(httpEntity.getMethod().toLowerCase()) {
case "POST":
HttpUtil.post(httpEntity.getUrl(), httpEntity.getParams());
break;
case "GET":
default:
HttpUtil.get(httpEntity.getUrl(), httpEntity.getParams());
}
} catch (Exception e) {
//发生异常,放入延迟队列中
String nextRk = ApplicationConstant.delayRefMap.get(queue);
if(ApplicationConstant.HTTP_MESSAGE_ONE_QUEUE_NAME.equals(queue)) {
//若已经是最后一个延迟队列的消息队列了,则后续可直接放入数据库中 待后续定时策略进行再次发送
log.warn("http通知已经通知N次失败,进入定时进行发起通知,url={}", httpEntity.getUrl());
} else {
log.warn("http重新发送通知:{}, 通知队列rk为:{}, 原队列:{}", httpEntity.getUrl(), nextRk, queue);
mqTemplate.convertAndSend(ApplicationConstant.HTTP_MESSAGE_EXCHANGE, nextRk, cn.hutool.json.JSONUtil.toJsonStr(httpEntity));
}
}
}
}

3.创建控制层服务(真实场景中,如SpringCloud微服务中,一般上是创建个api接口,供其他服务进行调用)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Slf4j
@RestController
@Api(tags = "http测试接口")
public class HttpDemoController {

@Autowired
AmqpTemplate mqTemplate;

@PostMapping("/send")
@ApiOperation(value="send",notes = "发送http测试")
public String sendHttp(@RequestBody HttpEntity httpEntity) {
//发送http请求
log.info("开始发起http请求,发布异步消息:{}", httpEntity);
mqTemplate.convertAndSend(ApplicationConstant.HTTP_MESSAGE_EXCHANGE, ApplicationConstant.HTTP_MESSAGE_START_RK, cn.hutool.json.JSONUtil.toJsonStr(httpEntity));
return "发送成功:url=" + httpEntity.getUrl();
}
}

4.配置文件添加RabbitMQ相关配置信息

1
2
3
4
5
6
7
8
9
10
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/

# 通知-消费者线程数 设置大点 大概率是能通知到的
http.notify.concurrency=150
# 延迟队列的消费者线程数 可设置小点
http.notify.delay.concurrency=10

5.编写启动类。

1
2
3
4
5
6
7
8
9
@SpringBootApplication
@Slf4j
public class DelayQueueApplication {

public static void main(String[] args) throws Exception {
SpringApplication.run(DelayQueueApplication.class, args);
log.info("spring-boot-rabbitmq-delay-queue-chapter38服务启动!");
}
}

6.启动服务。使用swagger进行简单调用测试。

  • 正常通知:

正常通知

1
2
3
2019-07-20 23:52:23.792  INFO 65216 --- [nio-8080-exec-1] c.l.l.s.c.controller.HttpDemoController  : 开始发起http请求,发布异步消息:HttpEntity(url=www.baidu.com, params={a=1}, method=get)
2019-07-20 23:52:23.794 INFO 65216 --- [TaskExecutor-97] c.l.l.s.chapter38.mq.HttpMessagerLister : 接收到通知请求:{"method":"get","params":{"a":1},"url":"www.baidu.com"},队列名:http.message.start
2019-07-20 23:52:23.794 INFO 65216 --- [TaskExecutor-97] c.l.l.s.c.service.HttpMessagerService : 开始发起http请求:HttpEntity(url=www.baidu.com, params={a=1}, method=get)

  • 异常通知:访问一个不存在的地址

mark

1
2
3
4
2019-07-20 23:53:14.699  INFO 65216 --- [nio-8080-exec-4] c.l.l.s.c.controller.HttpDemoController  : 开始发起http请求,发布异步消息:HttpEntity(url=www.baidu.com1, params={a=1}, method=get)
2019-07-20 23:53:14.705 INFO 65216 --- [TaskExecutor-84] c.l.l.s.chapter38.mq.HttpMessagerLister : 接收到通知请求:{"method":"get","params":{"a":1},"url":"www.baidu.com1"},队列名:http.message.start
2019-07-20 23:53:14.705 INFO 65216 --- [TaskExecutor-84] c.l.l.s.c.service.HttpMessagerService : 开始发起http请求:HttpEntity(url=www.baidu.com1, params={a=1}, method=get)
2019-07-20 23:53:14.706 WARN 65216 --- [TaskExecutor-84] c.l.l.s.c.service.HttpMessagerService : http重新发送通知:www.baidu.com1, 通知队列rk为:delay.one, 原队列:http.message.start

RabbitMQ后台中,可以看见http.message.dlx.one队列中存在这需要延时处理的消息,在一分钟后会转发至http.message.one队列中。

mark

在一分钟后,可以看见消息本再次消费了。

1
2
3
2019-07-20 23:54:14.722  INFO 65216 --- [TaskExecutor-16] c.l.l.s.chapter38.mq.HttpMessagerLister  : 接收到通知请求:{"method":"get","params":{"a":1},"url":"www.baidu.com1"},队列名:http.message.one
2019-07-20 23:54:14.723 INFO 65216 --- [TaskExecutor-16] c.l.l.s.c.service.HttpMessagerService : 开始发起http请求:HttpEntity(url=www.baidu.com1, params={a=1}, method=get)
2019-07-20 23:54:14.723 WARN 65216 --- [TaskExecutor-16] c.l.l.s.c.service.HttpMessagerService : http通知已经通知N次失败,进入定时进行发起通知,url=www.baidu.com1

一些最佳实践

在正式场景中,一般上补偿或者重试机制大概率是不会发送的,倘若发生时,一般上是第三方业务系统出现了问题,故一般上在进行补充时,应该在非高峰期进行操作,故应该对延时监听器,应该在高峰期时停止消费,在非高峰期时进行消费。同时,还可以根据不同的通知类型,放入不一样的延时队列中,保障业务的正常。这里简单说明下,动态停止或者启动演示监听器的方式。一般上是使用RabbitListenerEndpointRegistry对象获取延时监听器,之后进行动态停止或者启用。可设置@RabbitListener的id属性,直接进行获取,当然也可以直接获取所有的监听器,进行自定义判断了。

1
2
3
4
5
6
7
8
9
10
11
12
13
 @Autowired
RabbitListenerEndpointRegistry registry;

@GetMapping("/set")
@ApiOperation(value = "set", notes = "设置消息监听器的状态")
public String setSimpleMessageListenerContainer(String status) {
if("1".equals(status)) {
registry.getListenerContainer("httpDelayMessageNotifyConsumer").start();
} else {
registry.getListenerContainer("httpDelayMessageNotifyConsumer").stop();
}
return status;
}

这里,只是简单进行演示说明,在真实场景下,可以使用定时器,判断当前是否为高峰期,进而进行动态设置监听器的状态。

参考资料

  1. https://www.rabbitmq.com/admin-guide.html
  2. https://www.rabbitmq.com/ttl.html

总结

本文主要简单介绍了基于RabbitMQ实现延时队列的功能。对于需要实现更加灵活的配置及功能时,如可自定义配置通知次数等,大家可根据自己的需求进行添加,可以使用动态创建队列的方式。当然使用延时队列的方式还有很多,比如可以使用redis的key值过期回调机制使用,也可以使用定时机制。另,发现好久没有写文章了,感觉写的有点乱,还望见谅呀~

最后

目前互联网上很多大佬都有SpringBoot系列教程,如有雷同,请多多包涵了。原创不易,码字不易,还希望大家多多支持。若文中有所错误之处,还望提出,谢谢。

老生常谈

  • 个人QQ:499452441
  • 微信公众号:lqdevOps

公众号

个人博客:http://blog.lqdev.cn
完整示例:基于RabbitMQ实现消息延迟队列方案
原文地址:https://blog.lqdev.cn/2019/07/21/springboot/chapter-thirty-eight/