当前位置: 首页 > news >正文

【Fifty Project - D33】

连带着端午一起给自己放了一个小长假,返校的第一天开始了无尽的挨骂QAQ,一个小论文从上午讨论到下午,是真的一坨,也是真的不想写哇

今日完成记录

TimePlan完成情况
8:30 - 9:30RabbitMQ学习
9:30 - 11:00挨骂
13:30 - 14:30练胸
14:30 - 15:30继续挨骂
15:30 - 17:30RabbitMQ学习
19:00 - 20:30篮球
20:30 - 22:00羽毛球

RabbitMQ

粗略完成了初级篇,今天开始高级篇,主要是关于消息队列的可靠性部分

主要学习问题有两个:

  1. 如何保证MQ消息的可靠性
  2. 如果真的出现了消息发送失败,有没有其他的兜底方案

在讨论这两个问题之前,先分析一下在MQ发送消息到消费消息成功这个过程中,哪些环节可能出现丢失消息?
在这里插入图片描述
如上图,可能发生消息丢失的情况有:

  1. 生产者发送消息给交换机:
    • 连接MQ失败
    • 发送消息找不到交换机
    • 交换机接收但是找不到匹配的队列【路由失败】
    • 消息到达MQ后处理消息的进程发生异常
  2. MQ导致消息丢失:
    • 消息发送到队列后未被消费就宕机了
  3. 消费者处理消息时:
    • 消息接收后未处理就宕机
    • 消息接收后处理过程抛出异常

因此,我们应该在这三个方面保证MQ的可靠性:

  1. 确保生产者一定会将消息发送到MQ
  2. 确保MQ不会弄丢消息
  3. 确保消费者成功消费消息

生产者的可靠性

生产者重试机制

首先针对第一种场景:生产者发送消息时,由于网络导致的MQ连接中断
为了解决这个问题,SpringAMQP提供了消息发送的重试机制:当RabbitTemplate与MQ连接超时后,进行多次重试
修改application.yaml增加相关设置

spring:rabbitmq:connection-timeout: 1stemplate:retry:enabled: trueinitial-interval: 1000ms # 失败后的初始等待时间multiplier: 1 # 失败后下次的等待时长倍数 下次等待时间 = 上一次等待时间/initial-interval * multipliermax-attempts: 3 # 最大尝试重连次数

简单测试一下,首先停掉rabbitmq服务,模拟网络连接失败,然后启动测试脚本尝试发送消息,观察日志情况。

06-05 19:24:41:919  INFO 33216 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [192.168.4.41:5672]
06-05 19:24:43:948  INFO 33216 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [192.168.4.41:5672]
06-05 19:24:45:970  INFO 33216 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [192.168.4.41:5672]

观察发现等待时间间隔是2s,并不是1s,实际上这是因为其他操作的一些耗时和计算开销。当设置初始间隔为2s,退避指数为2,最大等待时间为20s,发现等待时间变化是:3,5,9,11,21,21,21。每一项减去1刚好符合计算公式,所以这个1s应该就是一些其余的计算开销。

这个重试是阻塞的,所以如果对业务性能有要求,最好禁用这个机制,或者是使用异步线程执行发送消息的代码(避免阻塞业务线程)

生产者确认机制

一般来说,生产者和MQ之间的网络通畅,基本不会出现消息丢失的情况,因此大多数情况不需要考虑这个问题,但是在少数情况下,也会出现消息发送到MQ后丢失的现象:

  1. MQ内部处理消息的进程异常
  2. 生产者发送消息到MQ后没找到exchange
  3. 发送到exchange后没找到queue,无法路由

因此设计了生产者确认机制,publisher confirm和publisher return。publisher confirm适用于解决消息是否到达MQ的问题,而publisher return是解决消息无法路由到队列(可能是队列已满或者是所有队列均不匹配)的问题。一个是消息接收确认,一个是消息投递失败回退。

publisher confirm

机制:
生产者发送消息后,MQ会异步返回一个确认信号(ack/nack)

  • ack: 消息成功写入队列(或者持久化磁盘)
  • nack:消息未能处理(服务器故障或者队列满)

特点:

  • 异步回调(不阻塞生产者)
  • 支持批量确认
  • 确保生产者到MQ的可靠传输

(1)开启生产者确认

// yaml 设置
spring:rabbitmq:publisher-confirm-type: correlated

publisher-confirm-type有三个选项:

  1. none 关闭
  2. simple 同步阻塞等待MQ回执
  3. correlated 异步等待MQ回执

(2)配置消息确认回调函数【每个消息发送都要定义自己的消息确定回调函数】

@Test
void testSendMQDirect(){CorrelationData correlationData = new CorrelationData();correlationData.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {@Overridepublic void onFailure(Throwable ex) {// 这里是回调失败,而不是nacklog.error("send msg fail", ex);}@Overridepublic void onSuccess(CorrelationData.Confirm result) {// 这里是成功拿到回执if(result.isAck()){log.error("send msg success, get ack");}else{log.error("send msg success, get nack, reason: {}", result.getReason());}}});rt.convertAndSend("df.direct1", "pork", "this is a msg to test confirm",  correlationData);
}
publisher return

机制:

  • 当消息无法路由到任何队列,MQ会将消息退回给生产者
  • 需要设置mandatory=true或者mandatory-expression="true"

触发条件:

  • routing key无法匹配任何队列
  • 匹配到的所有队列都已经满了无法接受消息

特点:

  • 只有在消息无法投递时触发【异步】
  • 需要生产者处理退回的消息【重试或者记录日志等】
  • 确保消息不会因为路由失败而丢失

(1)开启生产者退回

spring:rabbitmq:publisher-returns: true

(2) 设置回退消息处理函数【一个template只需要一次配置】

@Slf4j
@Configuration
@AllArgsConstructor
public class MqConfig {private final RabbitTemplate rt;@PostConstructpublic void init(){rt.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {log.error("触发return");log.debug("exchange: {}", returned.getExchange());log.debug("routingKey: {}", returned.getRoutingKey());log.debug("message: {}", returned.getMessage());log.debug("replyCode: {}", returned.getReplyCode());log.debug("replyText: {}", returned.getReplyText());}});}
}

我们设置一个不存在绑定关系的routing key,然后发送消息测试,实验结果如下,可见发送到MQ成功,打印了ack,但是因为路由失败,出发了消息回退

06-06 10:34:14:273 ERROR 35176 --- [2.168.4.41:5672] com.itheima.publisher.MQTest             : send msg success, get ack
06-06 10:34:14:273 ERROR 35176 --- [nectionFactory1] com.itheima.publisher.config.MqConfig    : 触发return
06-06 10:34:14:273 DEBUG 35176 --- [nectionFactory1] com.itheima.publisher.config.MqConfig    : exchange: df.direct1
06-06 10:34:14:275 DEBUG 35176 --- [nectionFactory1] com.itheima.publisher.config.MqConfig    : routingKey: pork1
06-06 10:34:14:275 DEBUG 35176 --- [nectionFactory1] com.itheima.publisher.config.MqConfig    : message: (Body:'"this is a msg to test confirm"' MessageProperties [headers={spring_returned_message_correlation=a4d61a01-79e2-445e-a844-885c7b4634b9, __TypeId__=java.lang.String}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])
06-06 10:34:14:275 DEBUG 35176 --- [nectionFactory1] com.itheima.publisher.config.MqConfig    : replyCode: 312
06-06 10:34:14:275 DEBUG 35176 --- [nectionFactory1] com.itheima.publisher.config.MqConfig    : replyText: NO_ROUTE
http://www.lqws.cn/news/165709.html

相关文章:

  • select、poll、epoll 与 Reactor 模式
  • UI学习—cell的复用和自定义cell
  • linux 串口调试命令 stty
  • SELinux是什么以及如何编写SELinux策略
  • Git操作记录
  • 知识蒸馏:从模型输出到深层理解
  • JAVA开发工具——IntelliJ IDEA
  • 在不同型号的手机或平板上后台运行Aidlux
  • 上门预约行业技术方案全解析:小程序、App还是H5?如何选择?
  • Nature Methods | OmiCLIP:整合组织病理学与空间转录组学的AI模型
  • 【JavaWeb】Linux项目部署
  • 复变函数中的对数函数及其MATLAB演示
  • 如何利用Haption力反馈遥操作机器人解决远程操作难题
  • 2021 RoboCom 世界机器人开发者大赛-高职组(初赛)解题报告 | 珂学家
  • Linux Core 文件查看和调试
  • 第七十四篇 高并发场景下的Java并发容器:用生活案例讲透技术原理
  • 用 NGINX 构建高效 SMTP 代理`ngx_mail_smtp_module`
  • 深度学习N2周:构建词典
  • 板凳-------Mysql cookbook学习 (十)
  • ReLU 激活函数:重大缺陷一去不复返!
  • reverse笔记
  • 目标检测实战:让AI“看见“并定位物体(superior哥AI系列第11期)
  • Web后端基础(Maven基础)
  • 8天Python从入门到精通【itheima】-69~70(字符串的常见定义和操作+案例练习)
  • 从Copilot到Agent,AI Coding是如何进化的?
  • 2025爱分析·银行数字化应用实践报告|爱分析报告
  • [闭源saas选项]Pinecone:为向量数据库而生的实时语义搜索引擎
  • 关于智能体API参考接口
  • SpringBoot自动化部署实战技术文章大纲
  • 前端使用 preview 插件预览docx文件