【Kafka使用方式以及原理】
Kafka生产者发送消息的方式
Kafka生产者发送消息主要通过以下三种方式:
同步发送
生产者发送消息后,会阻塞等待Broker的响应,确认消息是否成功写入。这种方式可靠性高,但吞吐量较低。代码示例:
ProducerRecord<String, String> record = new ProducerRecord<>("topic", "key", "value");
RecordMetadata metadata = producer.send(record).get();
异步发送
生产者发送消息后立即返回,通过回调函数处理Broker的响应。这种方式吞吐量高,但需要自行处理失败情况。代码示例:
producer.send(record, (metadata, exception) -> {if (exception != null) {// 处理失败逻辑}
});
异步发送(无回调)
生产者直接发送消息而不关心结果,适用于对可靠性要求不高的场景。吞吐量最高,但可能丢失消息。代码示例:
producer.send(record);
Kafka生产者发送消息的特点
分区策略
生产者可以通过指定分区键(Key)控制消息写入的分区。若未指定Key,则采用轮询策略分配分区。支持自定义分区器(Partitioner)。
消息确认机制(acks)
acks=0
:生产者不等待Broker确认,消息可能丢失。acks=1
:Leader副本写入成功后即返回响应。acks=all/-1
:需所有ISR副本写入成功,可靠性最高。
批量发送(Batch)
生产者会将多条消息合并为一个批次发送,减少网络开销。通过linger.ms
和batch.size
参数控制批处理行为。
消息重试
网络异常或Leader切换时,生产者会自动重试发送消息。可通过retries
和retry.backoff.ms
参数配置重试策略。
幂等性与事务
- 幂等性:通过启用
enable.idempotence=true
避免消息重复发送。 - 事务:支持跨分区原子性写入,需配置
transactional.id
。
缓冲区机制
生产者维护一个内存缓冲区(buffer.memory
),暂存待发送消息。缓冲区满时,发送调用会被阻塞或抛出异常。
Kafka消费者的基本工作流程
Kafka消费者通过订阅主题(Topic)或特定分区(Partition)来消费消息。消费者组(Consumer Group)机制允许并行处理消息,每个消费者组内的消费者独立消费不同分区的数据。
消费者启动时会向Kafka集群发送元数据请求,获取订阅主题的分区信息。消费者与分区建立连接后,通过轮询(Poll)机制从分区拉取消息。消费者会定期提交偏移量(Offset)到Kafka,记录消费进度。
消费者配置关键参数
bootstrap.servers
: Kafka集群的地址列表。group.id
: 消费者所属的组名称。auto.offset.reset
: 当无初始偏移量时如何处理(earliest
或latest
)。enable.auto.commit
: 是否自动提交偏移量(默认true
)。max.poll.records
: 单次Poll返回的最大消息数。
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
消息消费模式
订阅主题模式
消费者订阅一个或多个主题,Kafka自动分配分区:
consumer.subscribe(Arrays.asList("topic1", "topic2"));
while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.printf("offset=%d, key=%s, value=%s%n", record.offset(), record.key(), record.value());}
}
手动分配分区模式
直接指定消费的分区,绕过消费者组协调:
TopicPartition partition = new TopicPartition("topic1", 0);
consumer.assign(Arrays.asList(partition));
偏移量管理
自动提交
配置enable.auto.commit=true
,Kafka定期提交偏移量(默认5秒一次)。
手动同步提交
精确控制提交时机,确保消息处理完成后再提交:
consumer.commitSync();
手动异步提交
非阻塞式提交,需处理回调:
consumer.commitAsync((offsets, exception) -> {if (exception != null) {System.err.println("Commit failed: " + offsets);}
});
消费者再平衡(Rebalance)
当消费者组内成员变化(如新增或下线消费者)时,Kafka触发再平衡,重新分配分区。可通过ConsumerRebalanceListener
接口实现自定义逻辑:
consumer.subscribe(Arrays.asList("topic1"), new ConsumerRebalanceListener() {@Overridepublic void onPartitionsRevoked(Collection<TopicPartition> partitions) {// 分区被回收前的处理(如提交偏移量)}@Overridepublic void onPartitionsAssigned(Collection<TopicPartition> partitions) {// 新分区分配后的处理(如恢复状态)}
});
处理消费延迟与积压
- 调整
max.poll.records
减少单次Poll的数据量。 - 优化消息处理逻辑,避免阻塞Poll线程。
- 增加消费者实例数量(不超过分区数)。
- 监控消费者延迟指标(如
consumer_lag
)。