
Writing a PyFlink demo and submitting the job to a YARN cluster

Contents

Background

1. Installing PyFlink

2. Writing the demo program

3. Preparing to submit to YARN

4. Submitting the job

5. Pitfalls

1. Syntax error on the client when submitting

2. lzma module not found on the client when submitting

3. Client error "org.apache.flink.streaming.api.utils.PythonTypeUtils.getCollectionInputFormat does not exist in the JVM" when submitting

4. Client error "ImportError: No module named pyflink" when submitting

5. Permission denied on the TaskManager when submitting

6. Summary


Background

For a recent project I needed to learn how to develop Flink jobs in Python; these are my notes.

1. Installing PyFlink

Run the following commands locally. Note that PyFlink requires Python 3.6 or later; my local version is 3.12.

Create a virtual environment

python -m venv venv

Activate the virtual environment

source venv/bin/activate

Run the install command; pin PyFlink to the same version as the Flink client you will submit with (1.14.2 later in this article)

pip install apache-flink==1.14.2

After a successful installation, the output looks like the following

2. Writing the demo program

This is copied directly from the official tutorial.

import argparse
import logging
import sys

from pyflink.common import Row
from pyflink.table import (EnvironmentSettings, TableEnvironment, TableDescriptor,
                           Schema, DataTypes, FormatDescriptor)
from pyflink.table.expressions import lit, col
from pyflink.table.udf import udtf

word_count_data = ["To be, or not to be,--that is the question:--",
                   "Whether 'tis nobler in the mind to suffer",
                   "The slings and arrows of outrageous fortune",
                   "Or to take arms against a sea of troubles,",
                   "And by opposing end them?--To die,--to sleep,--",
                   "No more; and by a sleep to say we end",
                   "The heartache, and the thousand natural shocks",
                   "That flesh is heir to,--'tis a consummation",
                   "Devoutly to be wish'd. To die,--to sleep;--",
                   "To sleep! perchance to dream:--ay, there's the rub;",
                   "For in that sleep of death what dreams may come,",
                   "When we have shuffled off this mortal coil,",
                   "Must give us pause: there's the respect",
                   "That makes calamity of so long life;",
                   "For who would bear the whips and scorns of time,",
                   "The oppressor's wrong, the proud man's contumely,",
                   "The pangs of despis'd love, the law's delay,",
                   "The insolence of office, and the spurns",
                   "That patient merit of the unworthy takes,",
                   "When he himself might his quietus make",
                   "With a bare bodkin? who would these fardels bear,",
                   "To grunt and sweat under a weary life,",
                   "But that the dread of something after death,--",
                   "The undiscover'd country, from whose bourn",
                   "No traveller returns,--puzzles the will,",
                   "And makes us rather bear those ills we have",
                   "Than fly to others that we know not of?",
                   "Thus conscience does make cowards of us all;",
                   "And thus the native hue of resolution",
                   "Is sicklied o'er with the pale cast of thought;",
                   "And enterprises of great pith and moment,",
                   "With this regard, their currents turn awry,",
                   "And lose the name of action.--Soft you now!",
                   "The fair Ophelia!--Nymph, in thy orisons",
                   "Be all my sins remember'd."]


def word_count(input_path, output_path):
    t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
    # write all the data to one file
    t_env.get_config().get_configuration().set_string("parallelism.default", "1")

    # define the source
    if input_path is not None:
        t_env.create_temporary_table(
            'source',
            TableDescriptor.for_connector('filesystem')
                           .schema(Schema.new_builder()
                                         .column('word', DataTypes.STRING())
                                         .build())
                           .option('path', input_path)
                           .format('csv')
                           .build())
        tab = t_env.from_path('source')
    else:
        print("Executing word_count example with default input data set.")
        print("Use --input to specify file input.")
        tab = t_env.from_elements(map(lambda i: (i,), word_count_data),
                                  DataTypes.ROW([DataTypes.FIELD('line', DataTypes.STRING())]))

    # define the sink
    if output_path is not None:
        t_env.create_temporary_table(
            'sink',
            TableDescriptor.for_connector('filesystem')
                           .schema(Schema.new_builder()
                                         .column('word', DataTypes.STRING())
                                         .column('count', DataTypes.BIGINT())
                                         .build())
                           .option('path', output_path)
                           .format(FormatDescriptor.for_format('canal-json')
                                                   .build())
                           .build())
    else:
        print("Printing result to stdout. Use --output to specify output path.")
        t_env.create_temporary_table(
            'sink',
            TableDescriptor.for_connector('print')
                           .schema(Schema.new_builder()
                                         .column('word', DataTypes.STRING())
                                         .column('count', DataTypes.BIGINT())
                                         .build())
                           .build())

    @udtf(result_types=[DataTypes.STRING()])
    def split(line: Row):
        for s in line[0].split():
            yield Row(s)

    # compute word count
    tab.flat_map(split).alias('word') \
       .group_by(col('word')) \
       .select(col('word'), lit(1).count) \
       .execute_insert('sink') \
       .wait()
    # remove .wait if submitting to a remote cluster, refer to
    # https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/python/faq/#wait-for-jobs-to-finish-when-executing-jobs-in-mini-cluster
    # for more details


if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")

    parser = argparse.ArgumentParser()
    parser.add_argument('--input', dest='input', required=False,
                        help='Input file to process.')
    parser.add_argument('--output', dest='output', required=False,
                        help='Output file to write results to.')

    argv = sys.argv[1:]
    known_args, _ = parser.parse_known_args(argv)

    word_count(known_args.input, known_args.output)

Running the script locally produces the following output
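The counting logic at the heart of the demo can be sketched in plain Python, without Flink: the split UDTF emits one token per whitespace-separated word, and the group-by counts occurrences. A standalone illustration (not part of the Flink job) using two lines of the built-in data set:

```python
from collections import Counter

# Two sample lines from the demo's built-in data set
word_count_data = [
    "To be, or not to be,--that is the question:--",
    "Whether 'tis nobler in the mind to suffer",
]

def split(line):
    # Mirrors the demo's UDTF: yield one token per whitespace-separated word
    for s in line.split():
        yield s

# Equivalent of flat_map(split) + group_by + count
counts = Counter(w for line in word_count_data for w in split(line))
print(counts["to"])  # "to" appears once in each sample line -> 2
```

Punctuation stays attached to tokens ("be," and "be" count separately), exactly as in the Flink version, which splits on whitespace only.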

3. Preparing to submit to YARN

Prepare a Linux server with a Flink client installed (I used Flink 1.14.2; client installation is not covered here, just download the package and unpack it).

Set up the PyFlink runtime on that server, following section 1.

Upload the demo program to the server.

Here env is the Python virtual environment directory, and py_env.zip is that directory compressed with zip.

4. Submitting the job

/home/master/flink-1.14.2/bin/flink run -pyarch py_env.zip -m yarn-cluster -py /home/zhubao/demo.py -pyexec py_env.zip/env/bin/python

The terminal prints logs like the following

Open the YARN cluster web UI; when the job shows up under Running, it has been submitted to the YARN cluster.

View the job details

5. Pitfalls

1. Syntax error on the client when submitting

SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
  File "main.py", line 55
    ds.print()
             ^
SyntaxError: invalid syntax

Solution: this turned out to be a Flink version mismatch. The demo code and the Flink client must use the same version, otherwise you get puzzling errors like this one. Align all Flink versions, including PyFlink and the Flink client.
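A quick way to catch this mismatch before submitting is to compare the installed apache-flink package against the client version. A minimal sketch (check_flink_version is a made-up helper; 1.14.2 is the client version used in this article; importlib.metadata needs Python 3.8+, on older interpreters use the importlib-metadata backport):

```python
from importlib import metadata

def check_flink_version(expected):
    """Compare the installed apache-flink package version against the
    Flink client version on the submit host. Returns (matches, installed)."""
    try:
        installed = metadata.version("apache-flink")
    except metadata.PackageNotFoundError:
        installed = None  # pyflink is not installed in this environment
    return installed == expected, installed

# Example: the client used in this article is 1.14.2
matches, installed = check_flink_version("1.14.2")
print("pyflink matches client:", matches, "installed:", installed)
```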

2. lzma module not found on the client when submitting

Traceback (most recent call last):
  File "/home/zhubao/env/lib/python3.7/site-packages/fastavro/read.py", line 2, in <module>
    from . import _read
  File "fastavro/_read.pyx", line 11, in init fastavro._read
  File "/home/master/python3/lib/python3.7/lzma.py", line 27, in <module>
    from _lzma import *
ModuleNotFoundError: No module named '_lzma'

Solution: this error means the system is missing the dependency behind Python's LZMA compression module (_lzma), the C extension in the standard library that handles .xz files. Install it:

# Run as root
yum install -y xz-devel python-backports-lzma
# Run as a regular user to install the lzma backport
pip install backports.lzma -i https://pypi.tuna.tsinghua.edu.cn/simple --trusted-host pypi.tuna.tsinghua.edu.cn

After installation, the lzma.py file must be modified. Find lzma.py, usually under $PYTHON_HOME/lib/python3.7 (adjust for your version), and wrap the _lzma import in a try/except.

Save the file and rerun; this resolves the problem.

3. Client error "org.apache.flink.streaming.api.utils.PythonTypeUtils.getCollectionInputFormat does not exist in the JVM" when submitting

Traceback (most recent call last):
  File "main.py", line 99, in <module>
    word_count(known_args.input, known_args.output)
  File "main.py", line 39, in word_count
    ds = env.from_collection(word_count_data)
  File "/home/zhubao/env/lib/python3.7/site-packages/pyflink/datastream/stream_execution_environment.py", line 958, in from_collection
    return self._from_collection(collection, type_info)
  File "/home/zhubao/env/lib/python3.7/site-packages/pyflink/datastream/stream_execution_environment.py", line 981, in _from_collection
    j_input_format = PythonTypeUtils.getCollectionInputFormat(
  File "/home/zhubao/env/lib/python3.7/site-packages/py4j/java_gateway.py", line 1550, in __getattr__
    "{0}.{1} does not exist in the JVM".format(self._fqn, name))
py4j.protocol.Py4JError: org.apache.flink.streaming.api.utils.PythonTypeUtils.getCollectionInputFormat does not exist in the JVM

Solution: again a version mismatch; make sure PyFlink and the Flink client are the same version.

4. Client error "ImportError: No module named pyflink" when submitting

Caused by: java.io.IOException: Failed to execute the command: python -c import pyflink;import os;print(os.path.join(os.path.abspath(os.path.dirname(pyflink.__file__)), 'bin'))
output: Traceback (most recent call last):
  File "<string>", line 1, in <module>
ImportError: No module named pyflink

Solution: add the Python executable to the Flink configuration. Edit flink-conf.yaml, usually under $FLINK_HOME/conf, and append the Python executable at the end:

python.client.executable: /home/zhubao/env/bin/python
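The failing command quoted in the stack trace is simply Flink probing the interpreter for pyflink's location. You can run the same probe yourself to validate a candidate interpreter before editing flink-conf.yaml. A sketch (probe_pyflink is a made-up helper name):

```python
import subprocess

def probe_pyflink(python_exe):
    """Run the same check Flink performs: ask the given interpreter where
    pyflink's bin directory lives. Returns the path, or None on failure."""
    cmd = [python_exe, "-c",
           "import pyflink, os;"
           "print(os.path.join(os.path.abspath(os.path.dirname(pyflink.__file__)), 'bin'))"]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True)
    except FileNotFoundError:
        return None  # the interpreter path itself is wrong
    return result.stdout.strip() if result.returncode == 0 else None
```

If this returns None for the interpreter you put in python.client.executable, the job submission will fail with the same ImportError.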

5. Permission denied on the TaskManager when submitting

Caused by: java.io.IOException: Cannot run program "/home/zhubao/env/bin/python": error=13, Permission denied

Solution: after trying several approaches, the problem was finally solved by packaging the entire Python environment and shipping it to YARN, as follows.

Package the Python environment into a zip archive; run zip from the venv's parent directory so the entries start with env/, matching the -pyexec path used below

cd /home/zhubao && zip -r py_env.zip env

Add the archive and the executor path to the submit command

/home/master/flink-1.14.2/bin/flink run -pyarch py_env.zip -m yarn-cluster -py /home/zhubao/demo.py -pyexec py_env.zip/env/bin/python
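Because -pyexec references py_env.zip/env/bin/python, the interpreter must sit at env/bin/python inside the archive; zipping with an absolute path would instead store entries under home/zhubao/env/... and break the lookup. A quick layout sanity check with the stdlib (interpreter_in_archive is a made-up helper; the two in-memory archives stand in for a correctly and an incorrectly built py_env.zip):

```python
import io
import zipfile

def interpreter_in_archive(zip_bytes, expected="env/bin/python"):
    """Check that the archived venv exposes the interpreter at the relative
    path referenced by -pyexec (py_env.zip/env/bin/python)."""
    with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
        return expected in zf.namelist()

# Tiny stand-in archives illustrating the two layouts:
good = io.BytesIO()
with zipfile.ZipFile(good, "w") as zf:
    zf.writestr("env/bin/python", b"")              # zipped from the venv's parent dir
bad = io.BytesIO()
with zipfile.ZipFile(bad, "w") as zf:
    zf.writestr("home/zhubao/env/bin/python", b"")  # zipped with an absolute path

print(interpreter_in_archive(good.getvalue()))  # True
print(interpreter_in_archive(bad.getvalue()))   # False
```

Running the same check against the real py_env.zip (read its bytes from disk) confirms the archive before a slow round trip through YARN.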

6. Summary

The above covers developing Flink jobs with PyFlink and submitting them to a YARN cluster.
