当前位置: 首页 > news >正文

使用pyflink进行kafka实时数据消费

目录

背景

代码demo

 踩坑记录

1、kafka连接器,kafka客户端jar包找不到

2、java模块系统访问限制

3、执行demo任务,一直报错连接kafka topic超时

总结 


背景

        实际项目中经常遇到source是kafka,需要实时消费kafka某个topic中的数据,在此写个demo学习了解。

        环境:本地windows10,python版本3.8.5,pyflink版本1.14.2,kafka版本2.12_2.0.1

代码demo

        代码很简单,在本地启动一个flink任务,消费对端一个kafka的topic,从中取出数据并进行打印,代码如下:

import os
os.environ["_JAVA_OPTIONS"] = "--add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED"from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer
from pyflink.common.serialization import SimpleStringSchema# 设置执行环境
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)  # 设置并行度
env.add_jars("file:///F:/learn/flinkdemo/flink-connector-kafka_2.12-1.14.2.jar","file:///F:/learn/flinkdemo/kafka-clients-3.6.1.jar")# Kafka 消费者配置
kafka_topic = 'zhubao'
kafka_bootstrap_servers = 'xxx:9092'  # Kafka 服务器地址和端口
consumer = FlinkKafkaConsumer(kafka_topic, SimpleStringSchema(), properties={'bootstrap.servers': kafka_bootstrap_servers,'group.id': 'zhubao-group','auto.offset.reset': 'earliest','session.timeout.ms': '120000','request.timeout.ms': '120000','max.poll.interval.ms': '600000'
})# 将 Kafka 数据源添加到执行环境中
data_stream = env.add_source(consumer)# 数据处理逻辑,例如打印数据
data_stream.print()# 启动执行环境
env.execute("Flink Kafka Consumer Example")

使用kafka客户端生产一些数据,在本地则能实时消费到数据 

在本地终端执行上述flink demo任务,看到相关消费结果 

 踩坑记录

1、kafka连接器,kafka客户端jar包找不到

        上述demo代码中,开头需要在程序运行环境中引入两个jar包,分别是kafka连接器和kafka客户端

env.add_jars("file:///F:/learn/flinkdemo/flink-connector-kafka_2.12-1.14.2.jar","file:///F:/learn/flinkdemo/kafka-clients-3.6.1.jar")

这两个包默认在本地是没有的,需要进行额外的下载,下载地址:

Flink Kafka连接器‌:

  • 下载地址:https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-kafka_2.12/

Kafka客户端‌:

  • 下载地址:https://repo.maven.apache.org/maven2/org/apache/kafka/kafka-clients/

注意选择对应的版本,我的本地环境是pyflink1.14.2,因此下载的jiar包是对应版本,客户端是3.6.1版本

此外,在add_jars上,需要使用file:///方式,并且最好是绝对路径,否则会报jar包找不到等类似错误

2、java模块系统访问限制

py4j.protocol.Py4JJavaError: An error occurred while calling o10.addSource.
: java.lang.reflect.InaccessibleObjectException: Unable to make field private static final long java.util.Properties.serialVersionUID accessible: module java.base does not "opens java.util" to unnamed module @32cf48b7

        该报错是因为在windows环境Java模块系统(JPMS)的访问限制导致,需通过JVM参数解除模块封装限制。

        解决方法:在程序启动开头,增加JVM启动参数,注意格式,如下:

import os
os.environ["_JAVA_OPTIONS"] = "--add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED"

3、执行demo任务,一直报错连接kafka topic超时

Traceback (most recent call last):File ".\kafkademo.py", line 31, in <module>env.execute("Flink Kafka Consumer Example")File "F:\learn\flinkdemo\venv\lib\site-packages\pyflink\datastream\stream_execution_environment.py", line 691, in executereturn JobExecutionResult(self._j_stream_execution_environment.execute(j_stream_graph))File "F:\learn\flinkdemo\venv\lib\site-packages\py4j\java_gateway.py", line 1285, in __call__return_value = get_return_value(File "F:\learn\flinkdemo\venv\lib\site-packages\pyflink\util\exceptions.py", line 146, in decoreturn f(*a, **kw)File "F:\learn\flinkdemo\venv\lib\site-packages\py4j\protocol.py", line 326, in get_return_valueraise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o10.execute.
: org.apache.flink.runtime.client.JobExecutionException: Job execution failed....at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
...
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)at akka.actor.Actor.aroundReceive(Actor.scala:537)at akka.actor.Actor.aroundReceive$(Actor.scala:535)at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)at akka.actor.ActorCell.invoke(ActorCell.scala:548)at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)at akka.dispatch.Mailbox.run(Mailbox.scala:231)at akka.dispatch.Mailbox.exec(Mailbox.scala:243)... 5 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition zhubao-0 could be determined

        报错信息很长,重点关注最后的

Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition zhubao-0 could be determined

        解决方法:尝试对程序增加各种延长超时时间,但都没有啥效果,最后排查寻找发现,是由于本地hosts未配置kafka的域名映射,需要在本地hosts里增加对应映射关系

添加后,问题得以解决

总结 

        该demo很简单,完成在windows环境上使用pyflink进行连接kafka并进行数据消费,主要在windows上并且使用pyflink做开发,遇到一些奇奇怪怪的问题,在此做个记录,方便后续查阅。

http://www.lqws.cn/news/581941.html

相关文章:

  • 在 Vue3 + Element Plus 中实现 el-table 拖拽排序功能
  • mysql 小版本升级实战分享
  • 基于librdkafka开发的C++客户端,生产者生产发送数据失败问题处理
  • 百度文心大模型4.5系列正式开源,开源会给百度带来什么?
  • 网络安全2023—新安全新发展
  • MySQL (四):连接查询和索引
  • macos 安装 xcode
  • 【软考中级·软件评测师】下午题·面向对象测试之架构考点全析:分层、分布式、微内核与事件驱动
  • 基于C#的OPCServer应用开发,引用WtOPCSvr.dll
  • python | numpy小记(五):理解 NumPy 中的 `np.arccos`:反余弦函数
  • 卡片跳转到应用页面(router事件)
  • 一文详解Modbus协议原理、技术细节及软件辅助调试
  • 华为云Flexus+DeepSeek征文|​​华为云ModelArts Studio大模型 + WPS:AI智能PPT生成解决方案​
  • 基于时间策略+应用过滤的游戏防沉迷方案:技术实现与工具推荐
  • 本地服务器部署后外网怎么访问不了?内网地址映射互联网上无法连接问题的排查
  • 【Pandas】pandas DataFrame Flags
  • AR 学习:开启未来学习新视界​
  • Stable Diffusion 项目实战落地:从0到1 掌握ControlNet 第四篇 风格化字体大揭秘:从线稿到涂鸦,ControlNet让文字焕发新生
  • SQuirreL SQL:一个免费的通用数据库开发工具
  • OpenWrt | 使用 Docker 运行 iperf3
  • 2 大语言模型基础-2.2 生成式预训练语言模型GPT-2.2.2 有监督下游任务微调-Instruct-GPT强化学习奖励模型的结构改造与维度转换解析
  • AI行业深度观察:从资本竞逐到技术突破的全面解析
  • 38.docker启动python解释器,pycharm通过SSH服务直连
  • 物联网基础
  • 【Mars3d】支持的basemaps数组与layers数组的坐标系列举
  • 电脑开机加速工具,优化启动项管理
  • 感受新复古主义独特魅力,新大洲本田NS150LA上市
  • Docker从部署到实战
  • ADB 安装 APK 及处理安装弹窗的方法
  • 人工智能基石:SVM支持向量机全解析(附Python实战)