当前位置: 首页 > news >正文

Rocket客户端消息确认机制

消息确认机制

RocketMQ要⽀持互联⽹⾦融场景,那么消息安全是必须优先保障的。⽽消息安全有两⽅⾯的要求,⼀⽅⾯是⽣产者要能确保将消息发送到
Broker上。另⼀⽅⾯是消费者要能确保从Broker上争取获取到消息。

  1. 消息⽣产端采⽤消息确认加多次重试的机制保证消息正常发送到RocketMQ
  • 第⼀种称为单向发送
    单向发送⽅式下,消息⽣产者只管往Broker发送消息,⽽全然不关⼼Broker端有没有成功接收到消息。
  • 第⼆种称为同步发送
    同步发送⽅式下,消息⽣产者在往Broker端发送消息后,会阻塞当前线程,等待Broker端的相应结果。
  • 第三种称为异步发送
    异步发送机制下,⽣产者在向Broker发送消息时,会同时注册⼀个回调函数。接下来⽣产者并不等待Broker的响应。当Broker端有响应数据过来时,⾃动触发回调函数进⾏对应的处理。
  1. 消息消费者端采⽤状态确认机制保证消费者⼀定能正常处理对应的消息
    换成了Broker等待消费者返回消息处理状态。
consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}})

这个返回值是⼀个枚举值,有两个选项 CONSUME_SUCCESSRECONSUME_LATER。如果消费者返回CONSUME_SUCCESS,那么消息⾃然就处理结束了。但是如果消费者没有处理成功,返回的是RECONSUME_LATER,Broker就会过⼀段时间再发起消息重试。

为了要兼容重试机制的成功率和性能,RocketMQ设计了⼀套⾮常完善的消息重试机制,从⽽尽可能保证消费者能够正常处理⽤户的订单信息。

  • Broker不可能⽆限制的向消费失败的消费者推送消息。如果消费者⼀直没有恢复,Broker显然不可能⼀直⽆限制的推送,这会浪费集群很多的性能。所以,Broker会记录每⼀个消息的重试次数。如果⼀个消息经过很多次重试后,消费者依然⽆法正常处理,那么Broker会将这个消息推⼊到消费者组对应的死信Topic中。死信Topic相当于windows当中的垃圾桶。你可以⼈⼯介⼊对死信Topic中的消息进⾏补救,也可以直接彻底删除这些消息。RocketMQ默认的最⼤重试次数是16次
  • 为了让这些重试的消息不会影响Topic下其他正常的消息,Broker会给每个消费者组设计对应的重试Topic。MessageQueue是⼀个具有严格FIFO特性的数据结构。如果需要重试的这些消息还是放在原来的MessageQueue中,就会对当前MessageQueue产⽣阻塞,让其他正常的消息⽆法处理。RocketMQ的做法是给每个消费者组⾃动⽣成⼀个对应的重试Topic。在消息需要重试时,会先移动到对应的重试Topic中。后续Broker只要从这些重试Topic中不断拿出消息,往消费者组重新推送即可。这样,这些重试的消息有了⾃⼰单独的队列,就不会影响到Topic下的其他消息了。
  • RocketMQ中设定的消费者组都是订阅主题和消费逻辑相同的服务备份,所以当消息重试时,Broker只要往消费者组中随意⼀个实例推送即可。这是消息重试机制能够正常运⾏的基础。但是,在客户端的具体实现时,MQDefaultMQConsumer并没有强制规定消费者组不能重复。也就是说,你完全可以实现出⼀些订阅主题和消费逻辑完全不同的消费者服务,共同组成⼀个消费组。在这种情况下,RocketMQ不会报错,但是消息的处理逻辑就⽆法保持⼀致了。这会给业务带来很⼤的麻烦。这是在实际应⽤时需要注意的地⽅。
  • Broker端最终只通过消费者组返回的状态来确定消息有没有处理成功。⾄于消费者组⾃⼰的业务执⾏是否正常,Broker端是没有办法知道的。因此,在实现消费者的业务逻辑时,应该要尽量使⽤同步实现⽅式,保证在⾃⼰业务处理完成之后再向Broker端返回状态。⽽应该尽量避免异步的⽅式处理业务逻辑。
  1. 消费者组也可以⾃⾏指定起始消费位点
    Broker端通过Consumer返回的状态来推进所属消费者组对应的Offset。但是,这⾥还是会造成⼀种分裂,消息最终是由Consumer来处理,但是消息却是由Broker推送过来的

使⽤消息队列要如何解决这样的问题呢?这时,就可以创建另外⼀个新的消费者组,并通过ConsumerFromWhere属性指定这个消费者组的消费起点,从⽽让这个新的消费者组去消费之前发送过的历史消息。⽽这个ConsumerFromWhere属性并不是直接指定Offset的数值,因为客户端也不知道Broker端记录的Offset数值是多少。RocketMQ就提供了⼀个枚举值。

public enum ConsumeFromWhere {CONSUME_FROM_LAST_OFFSET, //从对列的最后⼀条消息开始消费 
CONSUME_FROM_FIRST_OFFSET, //从对列的第⼀条消息开始消费 
CONSUME_FROM_TIMESTAMP; //从某⼀个时间点开始重新消费 
}
http://www.lqws.cn/news/129187.html

相关文章:

  • 电路图识图基础知识-降压启动(十五)
  • 2. 库的操作
  • RabbitMQ 的异步化、解耦和流量削峰三大核心机制
  • hadoop集群单词统计(ssh与web)
  • GPUCUDA 发展编年史:从 3D 渲染到 AI 大模型时代(上)
  • 涂胶协作机器人解决方案 | Kinova Link 6 Cobot在涂胶工业的方案应用与价值
  • 线性模型选择中容易被忽视的关键洞察
  • 树莓派系列教程第九弹:Cpolar内网穿透搭建NAS
  • Linux 下支持 **截图 + 录屏** 的高级工具对比
  • c#开发AI模型对话
  • 相机--相机标定实操
  • JavaScript性能优化实战技术
  • webPack基本使用步骤
  • 时序数据库IoTDB与EdgeX Foundry集成适配服务介绍
  • 使用PyQt5的图形用户界面(GUI)开发教程
  • 功能测试、性能测试、安全测试详解
  • linux如何配置wifi连接
  • 机器学习算法分类
  • Neo4j 认证与授权:原理、技术与最佳实践深度解析
  • (每日一道算法题)求根节点到叶节点数字之和
  • 【高校论文】DFORMER重新思考用于语义分割的RGBD表示学习[南开国防科大]
  • C++多态与继承实战解析
  • Python-面向对象
  • RabbitMQ 在解决数据库高并发问题中的定位和核心机制
  • 数据结构与算法学习笔记(Acwing 提高课)----动态规划·树形DP
  • 《小明的一站式套餐服务平台:抽象工厂模式》
  • StarRocks与Apache Iceberg:构建高效湖仓一体的实时分析平台
  • 物联网控制技术期末复习 知识点总结 第二章 单片机
  • 【Python训练营打卡】day43 @浙大疏锦行
  • 高并发区块链系统实战:从架构设计到性能优化