当前位置: 首页 > news >正文

Temporal Join,一探究竟

flink sql的join分为Regular Joins、Interval Joins、Temporal Joins和Lookup Join。

尽管官方文档中对上述几种join类型的概念和原理有很好的解释,但针对Temporal Joins仍有以下一些疑惑。

  • 1.官方文档中给出的tempoal join中语法如下,但紧随着分别说明只有基于事件时间方式使用可以FOR SYSTEM_TIME AS OF leftTable.rowtime语法,而基于处理时间方式却只能使用temporal table function join语法?
SELECT [column_list]
FROM table1 [AS <alias1>]
[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF table1.{ proctime | rowtime } [AS <alias2>]
ON table1.column-name1 = table2.column-name1
  • 2.基于事件时间和处理时间方式join的实现细节是什么?如何理解不同时间语义下的“时态”?
  • 3.join结果是何时、如何下发的?
  • 4.表中的记录在状态中如何存储?状态如何清理?
  • 5.如何判断不需要(过期)记录的版本?
  • 6.基于处理时间的temporal join和lookup join的区别是什么?

带着上述问题,开始尝试从源码中(1.16)获取答案。

temporal join两种时间属性的Operator实现类在StreamExecTemporalJoin#createJoinOperator方法中初始化。

ps: 一件事找到正确入口便已成功了30%,那么这个入口是如何找到的?在源码中通过关键字硬硬硬搜出来的……

private TwoInputStreamOperator<RowData, RowData, RowData> createJoinOperator(...) {// ...// 根据table.exec.state.ttl配置项计算最小、大留存时间,temporal join中将会根据此来计算状态的留存时间long minRetentionTime = config.getStateRetentionTime();long maxRetentionTime = TableConfigUtils.getMaxIdleStateRetentionTime(config);if (rightTimeAttributeIndex >= 0) {// 基于事件时间的temporal joinreturn new TemporalRowTimeJoinOperator(...);} else {// 基于处理时间的temporal joinif (isTemporalFunctionJoin) {// 使用temporal table function join时,初始化TemporalProcessTimeJoinOperatorreturn new TemporalProcessTimeJoinOperator(...);} else {// 否则抛出异常,官方原因如下// The exsiting TemporalProcessTimeJoinOperator has already supported temporal table join. // However, the semantic of this implementation is problematic, because the join processing for left stream doesn't wait for the complete snapshot of temporal table,// this may mislead users in production environment. See FLINK-19830 for more details.throw new TableException("Processing-time temporal join is not supported yet.");}}
}

因此Temporal Join基于事件时间和处理时间的operator实现类分别为TemporalRowTimeJoinOperatorTemporalProcessTimeJoinOperator,二者继承共同父类BaseTwoInputStreamOperatorWithStateRetention

temporal继承关系

基于处理时间的temporal join只有使用temporal table function join方式时才会初始化TemporalProcessTimeJoinOperator,否则抛出异常。FLINK-19830中说明了这样做的原因:当前TemporalProcessTimeJoinOperator实现中,由于左表记录并不会等待右表记录的完整快照,存在可能会误导用户的语义问题。之所以支持temporal table function join方式,是由于这种方式已经存在很长时间的历史原因,出于兼容性烤考虑,所以继续支持它。

1. BaseTwoInputStreamOperatorWithStateRetention

BaseTwoInputStreamOperatorWithStateRetention,一个允许子类基于TTL清理它们状态的抽象TwoInputStreamOperator

该抽象类的主要作用是根据table.exec.state.ttl,默认值0配置项,决定是否开启处理时间timer(table.exec.state.ttl>1时开启)。开启后,对表中每条记录的key最多维护一个处理时间timer。子类通过实现其唯一的抽象方法cleanupState(long time)来决定timer触发时的具体动作。

  • 关键成员属性
// 取值table.exec.state.ttl
private final long minRetentionTime;
// 取值table.exec.state.ttl * 3 / 2
private final long maxRetentionTime;
// 保存已注册timer的触发时间
private transient ValueState<Long> latestRegisteredCleanupTimer;

ValueState<Long> latestRegisteredCleanupTimer状态中保存的是已注册的处理时间timer的触发时间,通过该状态值判断处理时间timer是否注册、更新等操作。除此之外minRetentionTimemaxRetentionTime表示状态最小、最大留存时间,用于计算处理时间timer的时间。

  • 关键方法
// 注册、更新处理时间timer,由子类主动调用
protected void registerProcessingCleanupTimer()
// timer触发时自动执行
public final void onProcessingTime
// 唯一的抽象方法,子类实现具体的timer触发时的执行逻辑
public abstract void cleanupState(long time)

registerProcessingCleanupTimer()方法负责新增、更新处理时间timer,由子类主动调用,核心逻辑如下

if(table.exec.state.ttl>1){if(timer未注册){注册处理时间timer,timer时间=curProcessingTime+maxRetentionTime;}else if(curProcessingTime+minRetentionTime > 已注册timer的时间){删除旧的timer;更新处理时间timer,timer时间=curProcessingTime+maxRetentionTime;}
}

onProcessingTime()方法在timer触发时自动执行,主要逻辑为执行cleanupState(long time)抽象方法,核心逻辑如下

public final void onProcessingTime(InternalTimer<Object, VoidNamespace> timer) throws Exception {if(table.exec.state.ttl>1){if(timer已注册 && 保存在状态中的time==timer){执行cleanupState(long time),该方法由子类具体实现;清空latestRegisteredCleanupTimer状态;}}
}

画个图来理解下处理时间timer

处理时间timer

假设table.exec.state.ttl配置为2h,则minRetentionTime=2h,maxRetentionTime=3h

1.在processTime=10:00时刻首次调用registerProcessingCleanupTimer()方法注册timer,该timer将会在curProcesstime+maxRetentionTime(13:00) 时触发执行。
2.在(10:00,11:00)的时间范围内,继续调用registerProcessingCleanupTimer()方法,将不会导致timer发生变更。
3.在[11:00,-)之后的时间内,继续调用registerProcessingCleanupTimer()方法,由于processTime+minRetentionTime > 13:00,将导致timer不断被重置。
如果之后始终不断调用registerProcessingCleanupTimer()方法,则timer将一直不会触发,只有在timer触发之前未调用registerProcessingCleanupTimer()方法时,timer才会被触发执行。

2. 基于事件时间的 Temporal join Operator

TemporalRowTimeJoinOperatorprocessElement1()processElement2()方法分别负责处理左右表中的每条记录。两个方法的处理逻辑类似。主要为以下步骤

  • 1.获取(左/右表)记录中事件时间
  • 2.将记录存储到对应状态中
  • 3.使用最小事件时间注册事件时间timer
  • 4.调用父类中的registerProcessingCleanupTimer()方法,注册处理时间timer

有以下几个关键点

  • 当左/右表中记录到达后并没有执行join操作并输出结果,仅仅是将数据放入到各自的状态中,并使用最小事件时间注册事件时间timer
  • 为什么是使用最小事件事件注册事件时间timer?

用于存储左右表记录状态的成员属性定义如下

private transient MapState<Long, RowData> leftState;
private transient MapState<Long, RowData> rightState;

二者的区别主要体现在key实际存储的内容上,leftState中key保存的内容是一个从1开始的自增数字,当左表中第一条数据到达后其状态中key=1,第二条数据其状态中key=2,依次递增。而rightState中key保存的是右表中每条记录的事件时间,这样设计的原因将从emitResultAndCleanUpState(long currentWatermark)方法中得到解释。

如何理解使用最小事件时间注册事件时间timer?

基于事件时间的temporal join的工作原理是先保存左右表中记录到状态中并注册timer,当watermark超过timer时间,在timer触发执行时来处理状态中的数据并输出结果。

如果为每个key的全部数据都注册事件时间timer,就会导致注册的timer数量巨大。例如左表记录中key=A的事件时间在流中的体现形式为{9,8,5,2,1},如果为这5个事件时间都注册timer,当收到watermark(10)时,这5个timer都将会触发,造成冗余。为了避免这种情况,对同一个数据记录key仅使用最小事件时间(事件时间1)注册的事件时间timer。这样不仅大大减少了timer数量,当收到watermark(10)时,只有一个timer被触发执行。

最小事件时间timer将会保存在ValueState<Long> registeredTimer,由左右表中的记录共同更新,当左/右表的数据记录到达processElement1()/processElement2()方法后,根据当前记录的事件时间和registeredTimer状态中已注册timer时间进行比较,使用最小事件时间重新注册或更新timer(更新操作为先删除旧timer,再注册新timer)。

上述过程在registerSmallestTimer(long timestamp)方法中完成,核心逻辑如下

// 入参timestamp为左右表中当前记录的事件时间
private void registerSmallestTimer(long timestamp) throws IOException {Long currentRegisteredTimer = registeredTimer.value();if(currentRegisteredTimer == null){registeredTimer.update(timestamp);注册timer;}else if(currentRegisteredTimer > timestamp){删除旧timer;registeredTimer.update(timestamp);注册新timer;}
}

processElement1()方法相比processElement2()稍有不同的地方在于,processElement1()方法中需要为左表中每条记录计算存储在leftState中的自增序号。使用ValueState<Long> nextLeftIndex状态保存自增序号。计算当前记录自增序号也很简单:状态值+1。

已注册的事件时间timer触发时onEventTime()方法将会执行,在该方法进行状态清理和结果输出。核心逻辑如下

  • 1.registeredTimer.clear();
  • 2.调用emitResultAndCleanUpState(long currentWatermark)方法来清理状态和下发结果,该方法会返回leftState中未处理数据中最小的事件时间(lastUnprocessedTime);
  • 3.lastUnprocessedTime有效的情况下,使用lastUnprocessedTime重新注册事件时间timer;
  • 4.调用父类方法,注册或清理处理时间timer,该步骤具体逻辑如下
if (stateCleaningEnabled) {if (lastUnprocessedTime < Long.MAX_VALUE || !rightState.isEmpty()) {registerProcessingCleanupTimer();} else {cleanupLastTimer();//  清空保存左表记录自增序号的状态nextLeftIndex.clear();}
}

最最最核心的join逻辑将在emitResultAndCleanUpState(long currentWatermark)方法中揭晓。该方法主要处理逻辑如下

1.从右表记录状态(rightState)中获取全部数据到本地list中(rightRowsSorted),该list已按照事件时间升序排序;
2.对左表记录状态(leftState)中全部数据,按照<=currentWatermark条件分成两部分,将满足条件的数据从状态中取出到本地map(orderedLeftRecords),并从状态中删除,该map使用TreeMap结构默认根据key(自增序号)升序排序,对不满足条件的数据部分计算其最小事件时间(lastUnprocessedTime);
因此之所以使用自增序号作为leftState状态中key的目是为了在处理左表记录时与记录到达顺序保持一致。
3.挨个处理orderedLeftRecords中数据,对每一条记录根据其事件时间使用二分查找从rightRowsSorted中获取右表中相同事件时间的的记录,如果右表中无相同事件时间的记录,则返回右表中<左表记录事件时间且最新事件时间的右表记录(rightRow);
4.根据第3步rightRow结果、join条件、是否是left join输出不同结果,该步骤具体逻辑如下

if (rightRow存在 && rightRow in(INSERT,UPDATE_AFTER)) {if (join条件成立) {输出左表 join 右表结果} else {if (isLeftOuterJoin) {输出左表 JOIN null结果}}
} else {if (isLeftOuterJoin) {输出左表 JOIN null结果}
}

5.orderedLeftRecords.clear();
6.删除右表状态(rightState)中记录事件时间<=currentWatermark的全部记录,但是在<=currentWatermark的记录中会保留中下事件时间最大的那条记录,这意味着如果右表记录已经全部过期,rightState中也会保留一条最新事件时间的记录。
通过记录事件时间<=currentWatermark的条件来判断右表中记录是否已失效。
7.返回lastUnprocessedTime时间。

最后父类cleanupState(long time)的方法实现如下。负责清空全部状态数据。

@Override
public void cleanupState(long time) {leftState.clear();rightState.clear();nextLeftIndex.clear();registeredTimer.clear();
}

以上即从TemporalRowTimeJoinOperator的关键逻辑。可以得到该join仅支持inner join或left join,join过程是以左表数据记录为主的,右表数据记录作为维表被动等待左表记录进行匹配。这也解释了官方文档中描述中“基于事件时间的temporal join允许对版本表进行join”,为什么仅将右表称为版本表的原因。

3. 基于处理时间的 Temporal join Operator

相比于TemporalRowTimeJoinOperator实现类,TemporalProcessTimeJoinOperator实现类要简单很多。仅仅定义了一个ValueState<RowData> rightState来保存右表记录中相同key的一条数据。左表记录不会通过状态进行存储,这也意味这在左表记录到达时将会立即输出join结果。

processElement1()方法中处理左表记录的流程如下

  • 1.从右表状态(rightState)中获取右表记录(rightSideRow);
  • 2.根据右表记录、join条件、是否left join来处理输出结果,该过程具体逻辑如下
if (rightSideRow == null) {// 右表记录为空if (left join) {输出左表 join null结果} else {return;}
} else {if (join条件成立) {输出左表 join 右表结果} else {if (left join) {输出左表 join null结果}}// 调用父类方法注册处理时间timer,进行状态清理registerProcessingCleanupTimer();
}

processElement2()方法中对右表记录的处理仅存储到状态或从状态中删除,核心逻辑如下

if (记录类型 in(INSERT,UPDATE_AFTER)) {//将记录保存到状态中,调用父类方法注册处理时间timer,进行状态清理rightState.update(element.getValue());registerProcessingCleanupTimer();
} else {// 清空右表记录状态,然后删除父类中的处理时间timerrightState.clear();cleanupLastTimer();
}

最后父类最后父类cleanupState(long time)的方法实现如下,清空保存右表记录的状态.

@Override
public void cleanupState(long time) {rightState.clear();
}

在基于处理时间的temporal join同样仅支持inner join或left join。以左表记录为主,右表记录被动等待左表记录进行匹配。

同基于事件时间的temporal join实现的最大区别在于左表记录并不会等待右表记录。当左表中key=A的记录到达后,尽管右表中同样存在key=A的数据,但是在左表key=A的记录到达时右表记录还未保存到状态中,左表记录仍然是无法正确关联到右表数据的。这就是上文提到的可能会引发用户误解的语义问题。

单纯从TemporalProcessTimeJoinOperator的实现来看,无论是(LATERAL TemporalTableFunction(o.proctime))形式的 temporal table function join还是 (FOR SYSTEM_TIME AS OF leftTable.processTime)形式的 temporal table join都是支持的。实际之所以必须使用temporal table function join,如上文提到的出于历史兼容和避免语义歧义考虑。

http://www.lqws.cn/news/503281.html

相关文章:

  • 【服务器】教程 — Linux上如何挂载服务器NAS
  • GitHub Actions 的深度解析与概念介绍
  • 智能制造——解读基于AI框架的智能工厂设计思路【附全文阅读】
  • 【论文阅读 | CVPRW 2023 |CSSA :基于通道切换和空间注意力的多模态目标检测】
  • CSS 实现文本溢出省略号(三种主流方式,适配单行 多行)
  • PHP 华为云H5上传文件:临时链接上传文件和POST表单直传
  • 华为云Flexus+DeepSeek征文|基于华为云Flexus Dify复用优秀 AI Agent 应用教程
  • Elasticsearch | 索引和模板字段管理:增加新字段的详细操作
  • 《C++初阶之类和对象》【初始化列表 + 自定义类型转换 + static成员】
  • React性能优化精髓之一:频繁setState导致滚动卡顿的解决方案
  • Jmeter接口自动化测试框架
  • 从零开始学习Spring Cloud Alibaba (一)
  • Gradio可视化构建聊天机器人
  • 开源模型应用落地-让AI更懂你的每一次交互-用Redis Stack与LangChain解锁大模型的长期记忆潜能(二)
  • 前端后端文件下载防抖实现方案
  • 【大模型学习】项目练习:套壳DeepSeek
  • 阿里最新开源:Mnn3dAvatar 3D数字人框架, 无需联网,本地部署可离线运行,支持多模态实时交互
  • 索引优化SEO帮助你的网站内容更快被搜索引擎发现
  • Python的GUI库选择指南(深度拓展)
  • C++ —— STL容器 —— vector的模拟实现
  • 【Java开发日记】我们详细地讲解一下 Java 异常及要如何处理
  • 快速sincos算法,stm32测试
  • 如何轻松地将照片从 iPhone 传输到计算机
  • 【LLaMA-Factory 实战系列】三、命令行篇 - YAML 配置与高效微调 Qwen2.5-VL
  • iOS应用开发中的性能调试与数据分析:一套完整实战工具流程
  • 学习threejs,使用kokomi、gsap实现图片环效果
  • AI智能化高效办公:WPS AI全场景深度应用指南
  • pyqt setContentsMargins
  • 左神算法之数字字符串解码方案计数算法
  • Kafka 监控与调优实战指南(二)