【Fifty Project - D33】
连带着端午一起给自己放了一个小长假,返校的第一天开始了无尽的挨骂QAQ,一个小论文从上午讨论到下午,是真的一坨,也是真的不想写哇
今日完成记录
Time | Plan | 完成情况 |
---|---|---|
8:30 - 9:30 | RabbitMQ学习 | √ |
9:30 - 11:00 | 挨骂 | √ |
13:30 - 14:30 | 练胸 | √ |
14:30 - 15:30 | 继续挨骂 | √ |
15:30 - 17:30 | RabbitMQ学习 | √ |
19:00 - 20:30 | 篮球 | √ |
20:30 - 22:00 | 羽毛球 | √ |
RabbitMQ
粗略完成了初级篇,今天开始高级篇,主要是关于消息队列的可靠性部分
主要学习问题有两个:
- 如何保证MQ消息的可靠性
- 如果真的出现了消息发送失败,有没有其他的兜底方案
在讨论这两个问题之前,先分析一下在MQ发送消息到消费消息成功这个过程中,哪些环节可能出现丢失消息?
如上图,可能发生消息丢失的情况有:
- 生产者发送消息给交换机:
- 连接MQ失败
- 发送消息找不到交换机
- 交换机接收但是找不到匹配的队列【路由失败】
- 消息到达MQ后处理消息的进程发生异常
- MQ导致消息丢失:
- 消息发送到队列后未被消费就宕机了
- 消费者处理消息时:
- 消息接收后未处理就宕机
- 消息接收后处理过程抛出异常
因此,我们应该在这三个方面保证MQ的可靠性:
- 确保生产者一定会将消息发送到MQ
- 确保MQ不会弄丢消息
- 确保消费者成功消费消息
生产者的可靠性
生产者重试机制
首先针对第一种场景:生产者发送消息时,由于网络导致的MQ连接中断
为了解决这个问题,SpringAMQP提供了消息发送的重试机制:当RabbitTemplate与MQ连接超时后,进行多次重试
修改application.yaml增加相关设置
spring:rabbitmq:connection-timeout: 1stemplate:retry:enabled: trueinitial-interval: 1000ms # 失败后的初始等待时间multiplier: 1 # 失败后下次的等待时长倍数 下次等待时间 = 上一次等待时间/initial-interval * multipliermax-attempts: 3 # 最大尝试重连次数
简单测试一下,首先停掉rabbitmq服务,模拟网络连接失败,然后启动测试脚本尝试发送消息,观察日志情况。
06-05 19:24:41:919 INFO 33216 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [192.168.4.41:5672]
06-05 19:24:43:948 INFO 33216 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [192.168.4.41:5672]
06-05 19:24:45:970 INFO 33216 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [192.168.4.41:5672]
观察发现等待时间间隔是2s,并不是1s,实际上这是因为其他操作的一些耗时和计算开销。当设置初始间隔为2s,退避指数为2,最大等待时间为20s,发现等待时间变化是:3,5,9,11,21,21,21。每一项减去1刚好符合计算公式,所以这个1s应该就是一些其余的计算开销。
这个重试是阻塞的,所以如果对业务性能有要求,最好禁用这个机制,或者是使用异步线程执行发送消息的代码(避免阻塞业务线程)
生产者确认机制
一般来说,生产者和MQ之间的网络通畅,基本不会出现消息丢失的情况,因此大多数情况不需要考虑这个问题,但是在少数情况下,也会出现消息发送到MQ后丢失的现象:
- MQ内部处理消息的进程异常
- 生产者发送消息到MQ后没找到exchange
- 发送到exchange后没找到queue,无法路由
因此设计了生产者确认机制,publisher confirm和publisher return。publisher confirm适用于解决消息是否到达MQ的问题,而publisher return是解决消息无法路由到队列(可能是队列已满或者是所有队列均不匹配)的问题。一个是消息接收确认,一个是消息投递失败回退。
publisher confirm
机制:
生产者发送消息后,MQ会异步返回一个确认信号(ack/nack)
- ack: 消息成功写入队列(或者持久化磁盘)
- nack:消息未能处理(服务器故障或者队列满)
特点:
- 异步回调(不阻塞生产者)
- 支持批量确认
- 确保生产者到MQ的可靠传输
(1)开启生产者确认
// yaml 设置
spring:rabbitmq:publisher-confirm-type: correlated
publisher-confirm-type
有三个选项:
- none 关闭
- simple 同步阻塞等待MQ回执
- correlated 异步等待MQ回执
(2)配置消息确认回调函数【每个消息发送都要定义自己的消息确定回调函数】
@Test
void testSendMQDirect(){CorrelationData correlationData = new CorrelationData();correlationData.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {@Overridepublic void onFailure(Throwable ex) {// 这里是回调失败,而不是nacklog.error("send msg fail", ex);}@Overridepublic void onSuccess(CorrelationData.Confirm result) {// 这里是成功拿到回执if(result.isAck()){log.error("send msg success, get ack");}else{log.error("send msg success, get nack, reason: {}", result.getReason());}}});rt.convertAndSend("df.direct1", "pork", "this is a msg to test confirm", correlationData);
}
publisher return
机制:
- 当消息无法路由到任何队列,MQ会将消息退回给生产者
- 需要设置
mandatory=true
或者mandatory-expression="true"
触发条件:
- routing key无法匹配任何队列
- 匹配到的所有队列都已经满了无法接受消息
特点:
- 只有在消息无法投递时触发【异步】
- 需要生产者处理退回的消息【重试或者记录日志等】
- 确保消息不会因为路由失败而丢失
(1)开启生产者退回
spring:rabbitmq:publisher-returns: true
(2) 设置回退消息处理函数【一个template只需要一次配置】
@Slf4j
@Configuration
@AllArgsConstructor
public class MqConfig {private final RabbitTemplate rt;@PostConstructpublic void init(){rt.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {log.error("触发return");log.debug("exchange: {}", returned.getExchange());log.debug("routingKey: {}", returned.getRoutingKey());log.debug("message: {}", returned.getMessage());log.debug("replyCode: {}", returned.getReplyCode());log.debug("replyText: {}", returned.getReplyText());}});}
}
我们设置一个不存在绑定关系的routing key,然后发送消息测试,实验结果如下,可见发送到MQ成功,打印了ack,但是因为路由失败,出发了消息回退
06-06 10:34:14:273 ERROR 35176 --- [2.168.4.41:5672] com.itheima.publisher.MQTest : send msg success, get ack
06-06 10:34:14:273 ERROR 35176 --- [nectionFactory1] com.itheima.publisher.config.MqConfig : 触发return
06-06 10:34:14:273 DEBUG 35176 --- [nectionFactory1] com.itheima.publisher.config.MqConfig : exchange: df.direct1
06-06 10:34:14:275 DEBUG 35176 --- [nectionFactory1] com.itheima.publisher.config.MqConfig : routingKey: pork1
06-06 10:34:14:275 DEBUG 35176 --- [nectionFactory1] com.itheima.publisher.config.MqConfig : message: (Body:'"this is a msg to test confirm"' MessageProperties [headers={spring_returned_message_correlation=a4d61a01-79e2-445e-a844-885c7b4634b9, __TypeId__=java.lang.String}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])
06-06 10:34:14:275 DEBUG 35176 --- [nectionFactory1] com.itheima.publisher.config.MqConfig : replyCode: 312
06-06 10:34:14:275 DEBUG 35176 --- [nectionFactory1] com.itheima.publisher.config.MqConfig : replyText: NO_ROUTE