当前位置: 首页 > news >正文

【Kafka使用方式以及原理】

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

Kafka生产者发送消息的方式

Kafka生产者发送消息主要通过以下三种方式:

同步发送
生产者发送消息后,会阻塞等待Broker的响应,确认消息是否成功写入。这种方式可靠性高,但吞吐量较低。代码示例:

ProducerRecord<String, String> record = new ProducerRecord<>("topic", "key", "value");
RecordMetadata metadata = producer.send(record).get();

异步发送
生产者发送消息后立即返回,通过回调函数处理Broker的响应。这种方式吞吐量高,但需要自行处理失败情况。代码示例:

producer.send(record, (metadata, exception) -> {if (exception != null) {// 处理失败逻辑}
});

异步发送(无回调)
生产者直接发送消息而不关心结果,适用于对可靠性要求不高的场景。吞吐量最高,但可能丢失消息。代码示例:

producer.send(record);

Kafka生产者发送消息的特点

分区策略
生产者可以通过指定分区键(Key)控制消息写入的分区。若未指定Key,则采用轮询策略分配分区。支持自定义分区器(Partitioner)。

消息确认机制(acks)

  • acks=0:生产者不等待Broker确认,消息可能丢失。
  • acks=1:Leader副本写入成功后即返回响应。
  • acks=all/-1:需所有ISR副本写入成功,可靠性最高。

批量发送(Batch)
生产者会将多条消息合并为一个批次发送,减少网络开销。通过linger.msbatch.size参数控制批处理行为。

消息重试
网络异常或Leader切换时,生产者会自动重试发送消息。可通过retriesretry.backoff.ms参数配置重试策略。

幂等性与事务

  • 幂等性:通过启用enable.idempotence=true避免消息重复发送。
  • 事务:支持跨分区原子性写入,需配置transactional.id

缓冲区机制
生产者维护一个内存缓冲区(buffer.memory),暂存待发送消息。缓冲区满时,发送调用会被阻塞或抛出异常。

Kafka消费者的基本工作流程

Kafka消费者通过订阅主题(Topic)或特定分区(Partition)来消费消息。消费者组(Consumer Group)机制允许并行处理消息,每个消费者组内的消费者独立消费不同分区的数据。

消费者启动时会向Kafka集群发送元数据请求,获取订阅主题的分区信息。消费者与分区建立连接后,通过轮询(Poll)机制从分区拉取消息。消费者会定期提交偏移量(Offset)到Kafka,记录消费进度。

消费者配置关键参数

  • bootstrap.servers: Kafka集群的地址列表。
  • group.id: 消费者所属的组名称。
  • auto.offset.reset: 当无初始偏移量时如何处理(earliestlatest)。
  • enable.auto.commit: 是否自动提交偏移量(默认true)。
  • max.poll.records: 单次Poll返回的最大消息数。
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

消息消费模式

订阅主题模式
消费者订阅一个或多个主题,Kafka自动分配分区:

consumer.subscribe(Arrays.asList("topic1", "topic2"));
while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.printf("offset=%d, key=%s, value=%s%n", record.offset(), record.key(), record.value());}
}

手动分配分区模式
直接指定消费的分区,绕过消费者组协调:

TopicPartition partition = new TopicPartition("topic1", 0);
consumer.assign(Arrays.asList(partition));

偏移量管理

自动提交
配置enable.auto.commit=true,Kafka定期提交偏移量(默认5秒一次)。

手动同步提交
精确控制提交时机,确保消息处理完成后再提交:

consumer.commitSync();

手动异步提交
非阻塞式提交,需处理回调:

consumer.commitAsync((offsets, exception) -> {if (exception != null) {System.err.println("Commit failed: " + offsets);}
});

消费者再平衡(Rebalance)

当消费者组内成员变化(如新增或下线消费者)时,Kafka触发再平衡,重新分配分区。可通过ConsumerRebalanceListener接口实现自定义逻辑:

consumer.subscribe(Arrays.asList("topic1"), new ConsumerRebalanceListener() {@Overridepublic void onPartitionsRevoked(Collection<TopicPartition> partitions) {// 分区被回收前的处理(如提交偏移量)}@Overridepublic void onPartitionsAssigned(Collection<TopicPartition> partitions) {// 新分区分配后的处理(如恢复状态)}
});

处理消费延迟与积压

  • 调整max.poll.records减少单次Poll的数据量。
  • 优化消息处理逻辑,避免阻塞Poll线程。
  • 增加消费者实例数量(不超过分区数)。
  • 监控消费者延迟指标(如consumer_lag)。
http://www.lqws.cn/news/584767.html

相关文章:

  • 安卓获取图片(相机拍摄/相册选择)
  • Android Telephony 网络状态中的 NAS 信息
  • window显示驱动开发—在注册表中设置 DXGI 信息
  • 【C语言】知识总结·内存函数
  • 三态门Multisim电路仿真——硬件工程师笔记
  • 优雅草蜻蜓HR人才招聘系统v2.0.9上线概要 -优雅草新产品上线
  • Amazon Athena:无服务器交互式查询服务的终极解决方案
  • 33. 搜索旋转排序数组
  • pytorch底层原理学习--PyTorch 架构梳理
  • FreePDFv3.0.0:颠覆你的文献阅读习惯
  • 16014.rtsp推流服务器
  • C++ 第四阶段 STL 容器 - 第五讲:详解 std::set 与 std::unordered_set
  • TDH社区开发版安装教程
  • [学习]M-QAM的数学原理与调制解调原理详解(仿真示例)
  • [面试]手写题-Promise.all() Promise.race()
  • 机器学习20-线性网络思考
  • 第三十六章 CAN——控制器局域网络接口
  • 字节跳动 C++ QT PC客户端面试
  • 论文中用matplotlib画的图,如何保持大小一致。
  • Vue2中使用DHTMLX Gantt
  • 深入理解Webpack的灵魂:Tapable插件架构解析
  • 使用Dirichlet分布进行随机初始化
  • 文心大模型 4.5 系列开源首发:技术深度解析与应用指南
  • StackGAN(堆叠生成对抗网络)
  • vscode 改注释的颜色,默认是灰色的,想改成红色
  • Prompt Enginering
  • 会议室预约系统的典型架构
  • Prompt 精通之路(一)- AI 时代的新语言:到底什么是 Prompt?为什么它如此重要?
  • Python 数据分析与机器学习入门 (五):Matplotlib 数据可视化基础
  • ubuntu源码安装python3.13遇到Could not build the ssl module!解决方法