当前位置: 首页 > news >正文

Spark 写入hive表解析

FileOutputCommitter中提交mapreduce.fileoutputcommitter.algorithm.version有v1和v2两个版本。

v1版本Spark写入文件的流程:

1.当task完成的时候,会将task的结果文件先写入到临时目录下面。

2.所有的task完成后,将所有的结果文件写入到结果目录。

3.删除临时目录,生成标记文件。

v1的弊端:第二步写入操作是在driver端进行的,而且是单线程进行。当结果文件数量很多的时候,耗时就会线性增加。

v2版本Spark写入文件的流程:

1.当task完成的时候,将task的结果文件直接写入结果目录。

2.所有的task完成后,删除临时目录,生成标记文件。

可以看的v2比v1少了一个rename的过程。写入结果目录是发生在Executor上,task是同时进行的,相当于多线程,速度更快。但是存在一致性问题,在所有task写入结果目录过程,用户可以看到部分数据。

FileOutputCommitter源码解析

对应的类是org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter

commit task

Executor端完成task会执行commitTask,可以看到当algorithmVersion == 1的时候会执行rename操作,将task结果文件到committedTaskPath(临时文件)。

algorithmVersion不是1的时候,使用mergePaths将结果文件直接写入到outputPath

commit job

在driver执行,当job完成(所有的task完成),执行commitJob

algorithmVersion == 1,执行mergePaths将结果文件直接写入到outputPath,这里就是性能瓶颈所在的地方,这里是单线程进行的,要是单个文件耗时20ms,1000个文件就是20000ms(20s),它是线性增加的。

algorithmVersion == 2的话,在commitTask中已经写入到结果目录了。

最后删除临时目录,生成标记文件。

和hive结合使用

上面提到使用v2的时候,很有可能出现结果目录中出现部分文件的情况,这要是hive的表目录就能在这个表读取到部分数据,这是不对的。应该保证原子性,要么读取的数据是旧的数据,要么是新的数据,不能读取部分数据。

入口类:org.apache.spark.sql.hive.execution.InsertIntoHiveTable

hive的临时目录生成

类似这种:{hive表数据目录}/.hive-staging_hive_2025-06-26_17-33-07_088_701862708150638596-1/-ext-10000/

processInsert

这里就一个简单的场景,就是写入到非分区表。

首先是先将数据写入到hive的临时目录中,在load到hive表中。

写入hive临时目录

可以看到commiter变量,这个就是上面的FileOutputCommitter,有两种提交方式。

runJob就是spark提交了任务,开始执行。这里执行task,task完成回调commitTask方法,runJob是阻塞的,会等到所有task完成。然后回调commitJob方法。根据上面FileOutputCommitter,此时数据已经全部写入了hive的临时目录下面了。

loadTable

loadTable是使用反射的方式执行_loadTableMethod_方法,对应就是Hive的loadTable方法。

hive的loadTable中如果是替换就是replaceFiles,否则是copyFiles。

replaceFiles

对应sparksql的 insert overwrite

首先将原有的目录下面的文件都放到回收站,即删除,再将结果文件从临时目录移动到正式目录。

moveFiles 移动结果文件。可以看到使用线程池(默认是25个线程)进行结果文件的重命名来移动文件的。不同于FileOutputCommitter的v1中单线程进行rename,Hive是多线程来rename的,效率更高。

copyFiles

对应sparksql的 insert into

可以看到它也是使用线程池进行并行操作的。

整体流程

v2流程,建议使用v2,效率更高。

http://www.lqws.cn/news/543655.html

相关文章:

  • Spring Boot项目开发实战销售管理系统——系统设计!
  • 知名流体控制解决方案供应商“永盛科技”与商派ShopeX达成B2B商城项目合作
  • iOS 远程调试与离线排查实战:构建非现场问题复现机制
  • 报道称CoreWeave洽谈收购Core Scientific,后者涨超30%
  • NV025NV033美光固态闪存NV038NV040
  • 《二分枚举答案(配合数据结构)》题集
  • Python Selenium 滚动到特定元素
  • Selenium基本用法
  • Spring Boot 性能优化与最佳实践
  • 6.27_JAVA_面试(被抽到了)
  • 洛谷P5021 [NOIP 2018 提高组] 赛道修建
  • 深入理解 Linux `poll` 模型:`select` 的增强版
  • 记录一次飞书文档转md嵌入vitepress做静态站点
  • 微信小程序进度条progress支持渐变色
  • Stable Diffusion入门-ControlNet 深入理解-第三课:结构类模型大揭秘——深度、分割与法线贴图
  • 【LeetCode 热题 100】42. 接雨水——(解法三)单调栈
  • FPGA在嵌入式图像处理中的深度应用!
  • 深圳中青宝互动网络股份有限公司游戏运维工程师面试题(笔
  • python实战项目79:采集知乎话题下的所有回答
  • 【用户权限】超级用户(二)
  • win7实现永恒之蓝ms17_010漏洞之445端口
  • matlab实现相控超声波成像
  • 推荐一个基于C#开发的跨平台构建自动化系统!
  • 通信无BUG,ethernet ip转profinet网关,汽车焊接设备通信有心机
  • 面向大语言模型幻觉的关键数据集:系统性综述与分类法_DEEPSEEK
  • Spring Boot整合Redis指南
  • 从电费追缴到碳减排:一个预付费系统如何重塑校园能源生态
  • 使用 Vcpkg 安装 Qt 时的常见问题与解决方法
  • CloudFormation 实现 GitHub Actions OIDC 与 AWS ECR 的安全集成
  • pikachu漏洞练习---File Inclusion(文件包含漏洞)和Unsafe Fileupload(不安全的文件上传)