当前位置: 首页 > news >正文

【八股消消乐】消息队列优化—消息丢失

在这里插入图片描述

😊你好,我是小航,一个正在变秃、变强的文艺倾年。
🔔本专栏《八股消消乐》旨在记录个人所背的八股文,包括Java/Go开发、Vue开发、系统架构、大模型开发、具身智能、机器学习、深度学习、力扣算法等相关知识点,期待与你一同探索、学习、进步,一起卷起来叭!

目录

  • 题目
  • 答案
    • 写入语义
    • ISR
    • 消息丢失
      • 生产者发送
      • 主从同步
      • unclean 选举
      • 刷盘
      • 消费者提交
    • 简历优化
      • 基本思路
        • 本地消息表
        • 消息队列不丢失
        • 消费者一定消费
      • 在 Kafka 上支持消息回查机制
        • 回查实现
        • 数据存储
        • 有序消息

题目

💬技术栈:RocketMQ、Kafka、RabbitMQ

🔍简历内容:熟悉Kafka写入语义、ISR、OSR、unclean选举基本原理。自主实现Kafka回查中间件并开源,利用分区表保证回查机制的高性能和高可用,基于哈希计算实现消息有序,最终实现消息回查机制,解决了xx业务消息丢失问题。

🚩面试问:

  • 在支持 Kafka 回查机制中,要是回查中间件把消息转发到业务 topic 了,但是标记成已发送失败,会发生什么?
  • 在支持 Kafka 回查机制中,你可以考虑把关系型数据库换成 Redis,这样换的话有什么优缺点?

在这里插入图片描述

💡建议暂停思考10s,你有答案了嘛?如果你有不同题解,欢迎评论区留言、打卡。


答案

写入语义

在 Kafka 中,消息被存储在分区中。为了避免分区所在的消息服务器宕机,分区本身也是一个主从结构。

在这里插入图片描述

我们在写入消息的时候,既可以是写入主分区,也可以是写入了主分区之后再写入一部分从分区。这方面 Kafka 做得比较灵活,它让生产者来决定写入语义。这个控制参数叫做 acks,它的取值有三个

  • 0:就是所谓的 “fire and forget”,意思就是发送之后就不管了,也就是说 broker 是否收到,收到之后是否持久化,是否进行了主从同步,全都不管。在这里插入图片描述
  • 1:当主分区写入成功的时候,就认为已经发送成功了。在这里插入图片描述
  • all:不仅写入了主分区,还同步给了所有 ISR 成员。在这里插入图片描述

ISR

ISR(In-Sync Replicas):和主分区保持了主从同步的所有从分区。比如说,一个主分区本身有 3 个从分区,但是因为网络之类的问题,导致其中一个从分区和主分区失去了联系,没办法同步数据,那么对于这个主分区来说,它的 ISR 就是剩下的 2 个分区。

在这里插入图片描述

Kafka 对 ISR 里面的分区数量有没有限制呢?比如说,我有一个主分区,有 11 个从分区,那我的 ISR 可以只有一个从分区吗?

是可以的,我们能够通过min.insync.replicas这个参数来配置。比如说当你设置 min.insync.replicas = 2 的时候,就意味着 ISR 里面至少要有两个从分区。如果分区数量不足,那么生产者在配置 acks = all 的时候,发送消息会失败。与它对应的一个概念是 OSR,也就是不在 ISR 里面的分区集合。

消息丢失

生产者发送

ack = 0时,broker 可能根本没收到,或者收到了但是处理新消息的时候遇到 Bug 了

在这里插入图片描述

主从同步

ack=1时,只要求写入主分区就可以。在写入主分区之后,主分区所在 broker 立刻就崩溃了。这个时候发起新的主分区选举,不管是哪个从分区被选上,它都缺了这条消息

在这里插入图片描述

unclean 选举

ack=all时,在允许 unclean 选举的情况下,如果 ISR 里面没有任何分区,那么 Kafka 就会选择第一个从分区来作为主分区。unclean 选出来的新的主分区也可能少了部分数据。

在这里插入图片描述

刷盘

acks=1:主分区返回写入成功的消息,但是这个时候消息可能还在操作系统的 page cache 里面

acks=all:主分区返回写入成功的消息,不管是主分区还是 ISR 中的从分区,这条消息都可能还在 page cache 里面

在这里插入图片描述

在 Kafka 中,控制刷盘的参数有三个:

  • log.flush.interval.messages:控制消息到多少条就要强制刷新磁盘。Kafka 会在写入 page cache 的时候顺便检测一下。【定量刷】
  • log.flush.interval.ms:间隔多少毫秒就刷新数据到磁盘上。【定时刷】
  • log.flush.scheduler.interval.ms:间隔多少毫秒,就检测数据是否需要刷新到磁盘上。【定时刷】

log.flush.interval.messages 和 log.flush.interval.ms 都设置了的话,那么就是它们俩之间任何一个条件满足了,都会刷新磁盘。

技巧:Kafka 要么是 定量刷,要么是 定时刷

消费者提交

消费者提交:消费者提交了偏移量,但是最终却没有消费的情况

比如说线程池形态的异步消费,消费者线程拿到消息就直接提交,然后再转交给工作线程。在转交之前,或者工作线程正在处理的时候,消费者都有可能宕机。于是一个消息本来并没有被消费,但是却被提交了,这也叫做消息丢失。

简历优化

素材准备:

  • 你或者你的同事有没有因为消息丢失而出现线上故障?如果出现过,那么是因为什么原因引起的故障?后来又是怎么解决的?或者说你用什么措施来防止再次出现这种问题?
  • 核心业务的 topic 的分区数量是多少每一个主分区有多少个从分区
  • 你的业务发送消息的时候是不是异步发送acks 的设置是多少
  • 你使用的 Kafka 里面那三个刷盘参数的值分别是多少?如果和默认值不一样,为什么修改
  • 你有没有遇到一些场景是必须要发送消息成功的?在这些场景下是如何保证业务方一定把消息发送出来的?

关键点:消息是不能丢失的。也就是说,发送者在完成业务之后,一定要把消息发送出去,而消费者也一定要消费这个消息。所以,需要在发送方、消息队列本身以及消费者三个方面,都做了很多事情来保证消息绝对不丢失。

基本思路

解决方案:确保业务操作和发送消息要么都不执行,要么都执行。其实就是分布式事务问题,大体方案有两种:本地消息表和消息回查

本地消息表

在发送方,我采用的是本地消息表解决方案。简单来说,就是在业务操作的过程中,在本地消息表里面记录一条待发消息,做成一个本地数据库事务。然后尝试立刻发送消息,如果发送成功,那么就把本地消息表里对应的数据删除,或者把状态标记成已发送

如果这个时候失败了,就可以立刻尝试重试。同时,还要有一个异步的补发机制,扫描本地消息表,找出已经过了一段时间,比如说三分钟,但是还没有发送成功的待发消息,然后补发

异步补发机制:有一个线程定时扫描数据库,找到需要发送但是又没有发送的消息发送出去。类似这样一个SQL执行效果:

SELECT * FROM msg_tab WHERE create_time < now() - 3min AND status = '未发送'
#找出三分钟前还没发送出去的消息,然后补发。

在这里插入图片描述

  1. 如果已经提交事务了,那么即便服务器立刻宕机了也没关系。因为我们的异步补发机制会找出这条消息,进行补发。

  2. 如果消息发送成功了,但是还没把数据库里的消息状态更新成已发送,也没关系,异步补发机制还是会找出这条消息,再发一次。也就是说,在这种情况下会发送两次。

  3. 如果在重试的过程中,重发成功了但是还没把消息状态更新成已发送,和第 2 点一样,也是依赖于异步补发机制。

在本地消息表里面额外增加了一个新的列用来控制重试的间隔和重试的次数。如果最终补发都失败了就会告警。这个时候就需要人手工介入了。

本地消息表至少有两个关键列:一个是消息体列,里面存储了消息的数据;另一个是重试机制列,里面可以只存储重试次数,也可以存储重试间隔、已重试次数、最大重试次数

总结:这种解决方案其实就是把一个分布式事务转变成本地事务 + 补偿机制。在这里案例里面,我们的分布式事务是要求执行业务操作并且发消息,那么就转化成一个本地事务,这个本地事务包含了业务操作,以及下一步做什么。然后,补偿机制会查看本地事务提交的数据,找出需要执行但是又没有执行成功的下一步,执行。这里的下一步,就是发送消息。

消息队列不丢失

在关键业务上,我一般都是把 acks 设置成 all 并且禁用 unclean 选举,来确保消息发送到了消息队列上,而且不会丢失。同时 log.flush.interval.messages、log.flush.interval.ms 和 log.flush.scheduler.interval.ms 三个参数会影响刷盘,这三个值我们公司设置的是 10000、2000、3000

理论上来说,这种配置确实还是有一点消息丢失的可能,但是概率已经非常低了。只有一种可能,就是消息队列完成主从同步之后,主分区和 ISR 的从分区都没来得及刷盘就崩溃了,才会丢失消息。这个时候真丢失了消息,就只能人手工补发了

早期我们就遇到过一个消息丢失的案例。我们有一个业务在发送消息的时候把 acks 设置成了 0,后来有一天消息队列真的崩了,等再恢复过来的时候,已经找不到消息了。而这个业务其实是一个很核心的业务,所以我们把 acks 的设置改成了 all。之后消息队列再崩溃,也确实没再出现过丢失的问题。

消费者一定消费

要确保消费者肯定消费消息,大多数时候都不需要额外做什么,但是如果业务上有使用异步消费机制就要小心一些。比如说我的 A 业务里面就采用了异步消费来提高消费速率,我利用批量消费、批量提交来保证异步消费的同时,也不会出现未消费的问题。

在 Kafka 上支持消息回查机制

消息回查机制:消息队列允许你在发送消息的时候,先发一个准备请求,里面带着你的消息。这个时候消息队列并不会把消息转交给消费者,而是当业务完成之后,你需要再发一个确认请求,这时候消息中间件才会把消息转交给消费者

在这里插入图片描述

如果我的业务成功了,但是我的确认请求没发怎么办?所以这时候就有了消息回查,也就是消息在没收到确认请求的时候,会反过来问一下你,这个消息要不要交给消费者

在这里插入图片描述

消息回查机制依赖于消息队列的支持。RocketMQ 是支持的,但是不幸的是 Kafka 和 RabbitMQ 都不支持

我们公司用的是 Kafka,它并不支持消息回查机制,所以我在公司里面设计过一个系统来支持回查功能

它的基本步骤是这样的:

  1. 应用代码把准备消息发送到 topic=look_back_msg 上。里面包含业务 topic、消息体、业务类型、业务 ID、准备状态、回查接口
  2. 回查中间件消费这个 look_back_msg,把消息内容存储到数据库里
  3. 应用代码执行完业务操作之后,再发送一个消息到 look_back_msg 上,带上业务类型、业务 ID 和提交状态这些信息。如果应用代码执行业务出错了,那么就使用回滚状态。
  4. 回查中间件查询消息内容,转发到业务 topic 上

在这里插入图片描述

回查实现

回查中间件会异步地扫描长时间未提交的消息,然后回查业务方

在这里插入图片描述

如果业务方最终没有发送提交消息,那么回查中间件会找出这些长时间没提交的消息,执行回查。回查中间件执行回查的关键点是利用准备消息中带着的回查接口配置来发起调用。我设计了一个通用的机制,支持 HTTP 调用或者 RPC 调用

对于 HTTP 调用来说,业务方需要提供回查 URL。而对于 RPC 调用来说,业务方需要实现我提供的回查接口,然后提供对应的服务名。我在回查的时候会带上业务类型和业务 ID业务方需要告诉我这个消息能不能提交,也就是要不要发给业务 topic

举个例子:如果是回查一个订单业务的 HTTP 接口,那么业务方需要告诉我回查 URL,那么回查中间件发出的请求就类似于下面这样。

method: POST
URL: https://abc.com:8080/order/lookback
Body: {"biz_type": "order","biz_id": "oid-123"
}

业务方返回的响应:

Body: {"biz_type": "order","biz_id": "oid-123","status": "提交" //如果业务没成功,那么可以是回滚
}

而如果是 RPC 接口,回查中间件就可以定义一个接口,要求所有的业务方都实现这个接口

type MsgLookBack interface{LookBack(bizType string, bizID int) Status
}

然后业务方需要提供服务名字,比如说 abc.com.order.msg_look_back,回查中间件会利用 RPC 的泛化调用功能,发起调用。如果你不理解泛化调用,那么你可以说你暂时只支持了 HTTP 回查接口,但是将来可以轻易扩展到 RPC 调用上。

数据存储

为了保证我这个回查机制的高性能和高可靠,我使用了分区表。我按照时间进行分区,并且历史分区是可以快速归档的,毕竟这个回查机制使用的数据库只是临时存储一下消息数据而已。当然,后续随着业务扩展,我觉得这个地方是可以考虑使用分库分表的,比如说按照业务 topic 来分库分表。

有序消息

这个方案是要求准备消息和提交消息是有序的,也就是说,同一个业务的准备消息一定要先于提交消息。解决方案也很简单,在发送的时候要求业务方按照业务 ID 计算一个哈希值,然后除以分区数量的余数,就是目标分区


往期精彩专栏内容,欢迎订阅:

🔗【八股消消乐】20250627:消息队列优化—消息积压
🔗【八股消消乐】20250625:消息队列优化—消息有序
🔗【八股消消乐】20250624:消息队列优化—延迟消息
🔗【八股消消乐】20250623:消息队列优化—系统架构设计
🔗【八股消消乐】20250622:Elasticsearch查询优化
🔗【八股消消乐】20250620:Elasticsearch优化—检索Labubu
🔗【八股消消乐】20250619:构建微服务架构体系—保证服务高可用
🔗【八股消消乐】20250615:构建微服务架构体系—链路超时控制
🔗【八股消消乐】20250614:构建微服务架构体系—实现制作库与线上库分离
🔗【八股消消乐】20250612:构建微服务架构体系—限流算法优化
🔗【八股消消乐】20250611:构建微服务架构体系—降级策略全总结
🔗【八股消消乐】20250610:构建微服务架构体系—熔断恢复抖动优化
🔗【八股消消乐】20250609:构建微服务架构体系—负载均衡算法如何优化
🔗【八股消消乐】20250608:构建微服务架构体系—服务注册与发现
🔗【八股消消乐】20250607:MySQL存储引擎InnoDB知识点汇总
🔗【八股消消乐】20250606:MySQL参数优化大汇总
🔗【八股消消乐】20250605:端午节产生的消费数据,如何分表分库?
🔗【八股消消乐】20250604:如何解决SQL线上死锁事故
🔗【八股消消乐】20250603:索引失效与优化方法总结
🔗【八股消消乐】20250512:慢SQL优化手段总结
🔗【八股消消乐】20250511:项目中如何排查内存持续上升问题
🔗【八股消消乐】20250510:项目中如何优化JVM内存分配?
🔗【八股消消乐】20250509:你在项目中如何优化垃圾回收机制?
🔗【八股消消乐】20250508:Java编译优化技术在项目中的应用
🔗【八股消消乐】20250507:你了解JVM内存模型吗?
🔗【八股消消乐】20250506:你是如何设置线程池大小?
🔗【八股消消乐】20250430:十分钟带背Duubo中大厂经典面试题
🔗【八股消消乐】20250429:你是如何在项目场景中选取最优并发容器?
🔗【八股消消乐】20250428:你是项目中如何优化多线程上下文切换?
🔗【八股消消乐】20250427:发送请求有遇到服务不可用吗?如何解决?

📌 [ 笔者 ]   文艺倾年
📃 [ 更新 ]   2025.6.29
❌ [ 勘误 ]   /* 暂无 */
📜 [ 声明 ]   由于作者水平有限,本文有错误和不准确之处在所难免,本人也很想知道这些错误,恳望读者批评指正!

在这里插入图片描述

http://www.lqws.cn/news/564427.html

相关文章:

  • python pyecharts 数据分析及可视化(2)
  • 基于Pandas和FineBI的昆明职位数据分析与可视化实现(三)- 职位数据统计分析
  • MAC 地址在 TCP 网络中的全面解析:从基础概念到高级应用
  • 【Redis原理】Redis事务与线程模型
  • StarRocks 3.5 新特性解读:Snapshot 快照恢复、大导入性能全面升级、分区管理更智能
  • opensuse/debian grub启动界面太模糊?
  • Wpf布局之WrapPanel面板!
  • 3.1.1、CAN总线单个设备环回测试
  • Git常见使用
  • WPF学习笔记(11)数据模板DataTemplate与数据模板选择器DataTemplateSelector
  • Mybatis学习总结
  • 鸿蒙5:自定义构建函数
  • 力扣第84题-柱状图中最大的矩形
  • Leetcode 3600. Maximize Spanning Tree Stability with Upgrades
  • Docker安装的gitlab配置ssl证书
  • 协作机器人优化自动化工作流程,提升工作效率
  • vue3报错No known conditions for “./lib/locale/lang/zh-cn”
  • HTML响应式Web设计
  • 链表题解——环形链表 II【LeetCode】
  • RK3588集群服务器性能优化案例:电网巡检集群、云手机集群、工业质检集群
  • Qwen2.5-7B-Instruct模型推理速度与量化对比分析
  • 【数据集】中国2016-2022年 城市土地利用数据集 CULU
  • 4_Flink CEP
  • 现代 JavaScript (ES6+) 入门到实战(四):数组的革命 map/filter/reduce - 告别 for 循环
  • Vue3 根据路由配置实现动态菜单
  • git常见问题汇总-重复提交/删除已提交文件等问题
  • RabbitMQ 工作模式
  • 海量数据存储与分析:HBase、ClickHouse、Doris三款数据库对比
  • 用celery作为信息中间件
  • AlpineLinux安装部署MariaDB