Kafka4.0初体验
一、What is Kafka?
官网地址:https://kafka.apache.org/
- 超过80%的财富100强公司信任并使用Kafka;
- Apache Kafka 是一个开源的
分布式事件流处理平台
,由 LinkedIn 开发并后来捐赠给 Apache 软件基金会。它被设计用于高效处理实时数据流,支持高吞吐量、低延迟的消息传递,广泛应用于大规模数据处理场景。
核心功能与特点
- 分布式与高可用
Kafka 采用分布式架构,支持横向扩展,通过分区(partition)和副本(replica)机制确保数据的高可靠性和容错能力。 - 高吞吐量与低延迟
即使在海量数据场景下,Kafka 也能保持毫秒级的消息传递速度,每秒可处理数百万条消息。 - 事件流处理(Event Streaming)
支持实时发布(produce)、订阅(consume)、存储(store)和处理(process)数据流,适用于日志聚合、用户活动跟踪、IoT 设备数据等场景。 - 持久化存储
消息可持久化到磁盘,并支持按时间或大小保留策略,避免数据丢失。 - 多语言支持
提供 Java、Python、Go 等多种客户端 API,易于集成到不同技术栈。
为什么 80% 的财富 100 强企业使用 Kafka?
- 成熟稳定:经过 LinkedIn、Netflix、Uber 等超大规模验证。
- 生态丰富:与 Confluent(Kafka 商业化公司)合作提供企业级支持,集成 Connector(如数据库、云服务)。
- 云原生友好:支持 Kubernetes,兼容 AWS MSK、Confluent Cloud 等托管服务。
简单来说,Kafka 是现代数据架构的“中枢神经系统”,帮助企业在实时数据时代高效流动和利用信息。
谁在使用Kafka?
结论:Kafka 在头部企业中的普及率极高,尤其是在制造业、保险、IT服务等领域已达到全覆盖(10/10)。金融(银行、保险)、电信、零售等数据密集型行业也广泛采用,但存在少数企业未使用(如银行业7/10)
Kafka的起源
- 起源背景(2000年代末的 LinkedIn)
- 问题驱动:
LinkedIn 作为一个快速增长的职业社交平台,面临以下技术痛点:数据管道复杂
:系统间依赖大量点对点(ad-hoc)的数据集成,导致维护成本高、可靠性差。实时性不足
:传统消息队列(如 ActiveMQ)无法支撑高吞吐、低延迟的日志流和用户活动跟踪。扩展性瓶颈
:现有系统难以水平扩展,无法应对每天数十亿条消息的规模。
- 目标:
设计一个统一、高吞吐、低延迟的实时数据管道,将系统间的数据流标准化
- 核心设计者与早期开发
- 创始团队:
- Jay Kreps(LinkedIn 首席工程师):主导 Kafka 的设计,命名灵感来自作家卡夫卡(Franz Kafka),认为它“适合分布式系统”。
- Neha Narkhede 和 Jun Rao:共同开发了 Kafka 的核心架构。
- 初版发布:
- 2010 年,Kafka 在 LinkedIn 内部上线,用于日志聚合、用户行为跟踪等场景。
- 2011 年,LinkedIn 将 Kafka 开源,并提交给 Apache 软件基金会 孵化。
大师门取名字也是根据自己的喜好来取名,在我们看来有可能感觉很随意!
-
技术灵感和创新
Kafka 的设计融合了多种分布式系统的思想:-
日志结构存储:借鉴了事务日志(如数据库 WAL)和日志合并(Log-Structured Merge Trees)的理念。
-
发布-订阅模型:参考了传统消息队列(如 JMS),但优化了吞吐量和持久化。
-
分布式共识:早期依赖 ZooKeeper 管理元数据,后来逐步自研(KIP-500 去 ZooKeeper 化)。
-
关键创新:
-
分区(Partition)和副本(Replica)机制:实现水平扩展和高可用性。
-
顺序磁盘 I/O + 零拷贝:突破性能瓶颈,支持海量数据实时处理。
- 开源与生态爆发
- 2012 年:成为 Apache 顶级项目。
- 2014 年:核心团队创立 Confluent 公司,提供 Kafka 的商业化支持(如托管服务、企业版工具)。
- 2017 年后:
- Kafka 被广泛用于流处理(结合 Kafka Streams、Flink 等)。
- 云厂商推出托管服务(如 AWS MSK、Confluent Cloud)。
- 为什么 Kafka 能成功?
- 精准定位:填补了传统消息队列与大数据批处理之间的空白(实时流数据)。
- 简单而强大的设计:以“日志”为核心抽象,兼顾可靠性和性能。
- 社区驱动:开源生态迅速扩展,支持多语言客户端和丰富连接器(Connectors)。
Kafka运行环境前置要求
Kafka是由Scala语言(占80%,20%Java)编写而成,Scala运行在Java虚拟机上,并兼容现有的Java程序,因此部署Kakfa的时候,需要先安装JDK环境;
注意:kafka_2.13-4.0.0 最新版需要JDK17+的环境
Kafka源码: https://github.com/apache/kafka
Scala官网:https://www.scala-lang.org/
本地环境必须安装了Java 17+;(Java17、Java21、Java22都可以);
JDK长期支持版:https://www.oracle.com/java/technologies/java-se-support-roadmap.html
Kafka版本迭代演进
-
Kafka前期项目版本似乎有点凌乱,Kafka在1.x之前的版本,是采用4位版本号;
-
比如:0.8.2.2、0.9.0.1、0.10.0.0…等等;
-
在1.x之后,kafka 采用 Major.Minor.Patch 三位版本号;
- Major表示大版本,通常是一些重大改变,因此彼此之间功能可能会不兼容;
- Minor表示小版本,通常是一些新功能的增加;
- Patch表示修订版,主要为修复一些重点Bug而发布的版本;
-
比如:Kafka 2.1.3,大版本就是2,小版本是1,Patch版本为3,是为修复Bug发布的第3个版本;
-
Kafka总共发布了8个大版本,分别是0.7.x、0.8.x、0.9.x、0.10.x、0.11.x、1.x、2.x 及 3.x 版本,截止目前,最新版本是Kafka 4.0.0,也是最新稳定版本;
Kafka运行环境JDK安装
1、下载JDK:https://www.oracle.com/java/technologies/downloads/#java17
2、解压缩:tar -zxvf jdk-17_linux-x64_bin.tar.gz -C /usr/local
3、配置JDK环境变量:
export JAVA_HOME=/usr/local/jdk-17.0.7
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/
开始体验
第一步、下载最新的kafka版本并且解压
Linux
tar -xzf kafka_2.13-4.0.0.tgz
cd kafka_2.13-4.0.0
Windows
# 解压 .tgz 文件
tar -xzf kafka_2.13-4.0.0.tgz# 进入解压后的目录
cd kafka_2.13-4.0.0
第二步、启动Kafka的运行环境
1、首先生成一个Cluster UUID
- Linux
$ KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
- Windows
使用 PowerShell 命令替换
$KAFKA_CLUSTER_ID=$(.\bin\windows\kafka-storage.bat random-uuid)
2、格式化日志目录
- Linux
$ bin/kafka-storage.sh format --standalone -t $KAFKA_CLUSTER_ID -c config/server.properties
- Windows
在powershell中
# 格式化存储目录(使用 .bat 文件)
.\bin\windows\kafka-storage.bat format --standalone --cluster-id $KAFKA_CLUSTER_ID --config .\config\server.properties
注:$KAFKA_CLUSTER_ID需要用上面生成的值替换
如果不进行日志格式化启动会提示
[2025-06-29 17:17:48,236] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2025-06-29 17:17:48,766] ERROR Exiting Kafka due to fatal exception (kafka.Kafka$)
java.lang.RuntimeException: No readable meta.properties files found.at org.apache.kafka.metadata.properties.MetaPropertiesEnsemble.verify(MetaPropertiesEnsemble.java:480) ~[kafka-metadata-4.0.0.jar:?]at kafka.server.KafkaRaftServer$.initializeLogDirs(KafkaRaftServer.scala:141) ~[kafka_2.13-4.0.0.jar:?]at kafka.server.KafkaRaftServer.<init>(KafkaRaftServer.scala:56) ~[kafka_2.13-4.0.0.jar:?]at kafka.Kafka$.buildServer(Kafka.scala:68) ~[kafka_2.13-4.0.0.jar:?]at kafka.Kafka$.main(Kafka.scala:75) [kafka_2.13-4.0.0.jar:?]at kafka.Kafka.main(Kafka.scala) [kafka_2.13-4.0.0.jar:?]
3、启动 Kafka Server
- Linux
$ bin/kafka-server-start.sh config/server.properties
- Windows (PowserShell工具)
.\bin\windows\kafka-server-start.bat .\config\server.properties
4、创建一个topic来存储你的事件
Kafka 是一个分布式事件流处理平台,它允许您在多台机器上读取、写入、存储和处理事件(在文档中也被称为记录或消息)
- Linux
$ bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
- Windows(PowserShell工具)
# 使用 .bat 文件(注意路径是 \windows\)
.\bin\windows\kafka-topics.bat --create --topic quickstart-events --bootstrap-server localhost:9092
显示新主题的详细信息(比如分区数量)
- Linux
$ bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
Topic: quickstart-events TopicId: NPmZHyhbR9y00wMglMH2sg PartitionCount: 1 ReplicationFactor: 1 Configs:
Topic: quickstart-events Partition: 0 Leader: 0 Replicas: 0 Isr: 0
- Windows(PowserShell工具)
.\bin\windows\kafka-topics.bat --describe --topic quickstart-events --bootstrap-server localhost:9092
Topic: quickstart-events TopicId: RZC2UrhoSPKnq7twM5cimg PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824Topic: quickstart-events Partition: 0 Leader: 1 Replicas: 1 Isr: 1 Elr: LastKnownElr:
PS D:\kafka_2.13-4.0.0>
5、写入消息到主题
- Linux
$ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
>This is my first event
>This is my second event
- Windows(PowserShell工具)
D:\kafka_2.13-4.0.0> .\bin\windows\kafka-console-producer.bat --topic quickstart-events --bootstrap-server localhost:9092
>This my first event
>this is my second event
6、读取事件消息
- Linux
$ bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
This is my first event
This is my second event
- Windows(PowserShell工具)
PS D:\kafka_2.13-4.0.0> .\bin\windows\kafka-console-consumer.bat --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
This my first event
this is my second event
7、使用 Kafka Connect 将你的数据作为事件流导入/导出
补充说明
- Kafka Connect
Kafka 官方提供的工具,专门用于在 Kafka 和其他系统(如数据库、云服务等)之间高效传输数据。 - 事件流(streams of events)
指数据以连续的、实时的事件流形式处理,而非传统的批量传输。 - 典型场景
- 从数据库导出变更到 Kafka(如 Debezium 实现 CDC)
- 将 Kafka 数据导入到 Elasticsearch/S3/HDFS 等
7.1 修改config/connect-standalone.properties
修改或者添加plugin.path
配置
- Linux
$ echo "plugin.path=libs/connect-file-4.0.0.jar" >> config/connect-standalone.properties
7.2 准备数据
- Linux
$ echo -e "foo\nbar" > test.txt
- Windows(PowserShell工具)
$ echo foo > test.txt
$ echo bar >> test.txt
7.3 执行如下命令
- Linux
$ bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
- Windows(PowserShell工具)
D:\kafka_2.13-4.0.0> .\bin\windows\connect-standalone.bat .\config\connect-standalone.properties .\config\connect-file-source.properties .\config\connect-file-sink.properties
在客户端查看
- Linux
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
- Windows
.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic connect-test --from-beginning --property consumer.encoding=UTF-8
使用Docker启动运行Kafka
使用Docker镜像启动
1、拉取Kafka镜像:docker pull apache/kafka:4.0.0
2、启动Kafka容器:docker run -p 9092:9092 apache/kafka:4.0.0
查看已安装的镜像:docker images
删除镜像:docker rmi apache/kafka:4.0.0
总结