Troubleshooting duplicate Kafka consumption with Beam 2.61.0
1. How the problem showed up
While testing a Flink job that consumes Kafka in our test environment, we sent a single message to the job and saw two records consumed in the Flink web UI. After restarting the JobManager and TaskManager, the duplication appeared whenever the job was restored from a checkpoint; when the job started without restoring from a checkpoint, there was no duplication. This pointed to Beam duplicating consumption specifically on checkpoint restore.
2. Investigation
Since we run Beam with the FlinkRunner, Beam's Kafka consumption is implemented as a Source built on Flink's Source API.
The important classes in Flink's Source API are:
- Source: the factory that instantiates the components below.
- SourceSplit: encapsulates a logical slice of the data source (e.g. a file block or a Kafka partition range).
- SplitEnumerator: responsible for split discovery and assignment.
- SourceReader: reads and deserializes the data of the assigned splits.
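To make those roles concrete, here is a deliberately simplified, hypothetical sketch; the My* names and reduced signatures are made up for illustration, while the real interfaces live in org.apache.flink.api.connector.source and carry more parameters:

// Simplified, hypothetical sketch of the Source roles (not the real Flink signatures)
interface MySplit {                                      // ~ SourceSplit: one logical slice, e.g. a Kafka partition
    String splitId();
}

interface MySplitEnumerator<SplitT extends MySplit> {    // ~ SplitEnumerator: runs on the JobManager side
    void start();                                        // discover splits
    void handleSplitRequest(int subtaskId);              // assign splits to readers
}

interface MySourceReader<T, SplitT extends MySplit> {    // ~ SourceReader: runs inside the task on the TaskManager
    void addSplits(java.util.List<SplitT> splits);       // receive assigned splits
    T pollNext();                                        // read and deserialize records
}

interface MySource<T, SplitT extends MySplit> {          // ~ Source: the factory tying it all together
    MySplitEnumerator<SplitT> createEnumerator();
    MySourceReader<T, SplitT> createReader();
}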
The corresponding Flink Kafka source classes implemented in Beam are:
FlinkUnboundedSource
FlinkSourceSplit
FlinkSourceSplitEnumerator
FlinkSourceReaderBase <- FlinkUnboundedSourceReader
In Flink, the execution of the Source operator involves two classes, SourceOperator and SourceCoordinator, which work together in the following order:
Initialization phase
- SourceCoordinator starts first: when the JobMaster (JobManager) starts, the SourceCoordinator is created as an independent component and initializes the SplitEnumerator.
- SourceOperator starts afterwards: on the TaskManager, each parallel task instance creates a SourceOperator on startup and creates the SourceReader in its open() method.

Runtime cooperation
- Split assignment: the SplitEnumerator inside the SourceCoordinator responds to split requests from the SourceOperator over RPC (e.g. via AddSplitEvent) and assigns splits dynamically.
- Data reading: the SourceOperator hands the assigned splits to its internal SourceReader, which reads records via pollNext() and emits them downstream.
SourceOperator class logic
@Internal
public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStreamOperator<OUT>
    implements OperatorEventHandler,
        PushingAsyncDataInput<OUT>,
        TimestampsAndWatermarks.WatermarkUpdateListener {

    /** The state that holds the currently assigned splits. */
    private ListState<SplitT> readerState;

    @Override
    public void open() throws Exception {
        // Initialize the reader
        initReader();

        // in the future when we this one is migrated to the "eager initialization" operator
        // (StreamOperatorV2), then we should evaluate this during operator construction.
        if (emitProgressiveWatermarks) {
            eventTimeLogic =
                TimestampsAndWatermarks.createProgressiveEventTimeLogic(
                    watermarkStrategy,
                    sourceMetricGroup,
                    getProcessingTimeService(),
                    getExecutionConfig().getAutoWatermarkInterval());
        } else {
            eventTimeLogic =
                TimestampsAndWatermarks.createNoOpEventTimeLogic(watermarkStrategy, sourceMetricGroup);
        }

        // restore the state if necessary.
        // Restore the splits that were assigned before the last checkpoint
        final List<SplitT> splits = CollectionUtil.iterableToList(readerState.get());
        if (!splits.isEmpty()) {
            LOG.info("Restoring state for {} split(s) to reader.", splits.size());
            // Hand the restored splits to the reader
            sourceReader.addSplits(splits);
        }

        // Register the reader to the coordinator.
        registerReader();

        sourceMetricGroup.idlingStarted();
        // Start the reader after registration, sending messages in start is allowed.
        sourceReader.start();

        eventTimeLogic.startPeriodicWatermarkEmits();
    }

    // SourceOperator handles the operator events sent to it
    public void handleOperatorEvent(OperatorEvent event) {
        if (event instanceof WatermarkAlignmentEvent) {
            updateMaxDesiredWatermark((WatermarkAlignmentEvent) event);
            checkWatermarkAlignment();
            checkSplitWatermarkAlignment();
        } else if (event instanceof AddSplitEvent) {
            // Handle newly added splits: the first assignment after the job starts,
            // or new splits appearing later (for Kafka, an increase in partition count)
            handleAddSplitsEvent(((AddSplitEvent<SplitT>) event));
        } else if (event instanceof SourceEventWrapper) {
            sourceReader.handleSourceEvents(((SourceEventWrapper) event).getSourceEvent());
        } else if (event instanceof NoMoreSplitsEvent) {
            sourceReader.notifyNoMoreSplits();
        } else if (event instanceof IsProcessingBacklogEvent) {
            if (eventTimeLogic != null) {
                eventTimeLogic.emitImmediateWatermark(System.currentTimeMillis());
            }
            output.emitRecordAttributes(
                new RecordAttributesBuilder(Collections.emptyList())
                    .setBacklog(((IsProcessingBacklogEvent) event).isProcessingBacklog())
                    .build());
        } else {
            throw new IllegalStateException("Received unexpected operator event " + event);
        }
    }

    private void handleAddSplitsEvent(AddSplitEvent<SplitT> event) {
        try {
            List<SplitT> newSplits = event.splits(splitSerializer);
            numSplits += newSplits.size();
            if (operatingMode == OperatingMode.OUTPUT_NOT_INITIALIZED) {
                // For splits arrived before the main output is initialized, store them into the
                // pending list. Outputs of these splits will be created once the main output is
                // ready.
                outputPendingSplits.addAll(newSplits);
            } else {
                // Create output directly for new splits if the main output is already initialized.
                createOutputForSplits(newSplits);
            }
            // Hand the newly assigned splits to the reader
            sourceReader.addSplits(newSplits);
        } catch (IOException e) {
            throw new FlinkRuntimeException("Failed to deserialize the splits.", e);
        }
    }
}
As the code above shows, there are two places in SourceOperator where splits are added to the SourceReader: open(), which restores them from the checkpoint state, and handleAddSplitsEvent(), which adds newly assigned splits. In both cases sourceReader.addSplits(newSplits) ends up calling FlinkSourceReaderBase#addSplits(newSplits).
Because Beam's Kafka FlinkSourceReader comes in a bounded and an unbounded variant, the shared logic lives in the abstract class FlinkSourceReaderBase.
FlinkSourceReaderBase class
public abstract class FlinkSourceReaderBase<T, OutputT>
    implements SourceReader<OutputT, FlinkSourceSplit<T>> {

    // Queue holding the splits assigned to this reader
    private final Queue<FlinkSourceSplit<T>> sourceSplits = new ArrayDeque<>();

    @Override
    public void addSplits(List<FlinkSourceSplit<T>> splits) {
        checkExceptionAndMaybeThrow();
        LOG.info("Adding splits {}", splits);
        // Append the splits to the queue
        sourceSplits.addAll(splits);
        waitingForSplitChangeFuture.get().complete(null);
    }

    protected final Optional<ReaderAndOutput> createAndTrackNextReader() throws IOException {
        // Take the next split from the queue
        FlinkSourceSplit<T> sourceSplit = sourceSplits.poll();
        if (sourceSplit != null) {
            // Create a reader for this split, which actually consumes the Kafka data
            Source.Reader<T> reader = createReader(sourceSplit);
            ReaderAndOutput readerAndOutput = new ReaderAndOutput(sourceSplit.splitId(), reader, false);
            beamSourceReaders.put(sourceSplit.splitIndex(), readerAndOutput);
            return Optional.of(readerAndOutput);
        }
        return Optional.empty();
    }
}
With this code the picture becomes clear: the duplicate Kafka consumption is most likely caused by the same splits being added twice. Keep in mind that a KafkaConsumer that is manually assigned a partition and an offset does not go through group coordination, so multiple consumers in the same consumer group can read the same partition at the same time.
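To illustrate that last point, here is a minimal, hypothetical sketch (broker address, topic, group id, and offsets are placeholders): two consumers in the same consumer group are manually assigned the same partition via assign() and seek(), so both read every record, because manual assignment bypasses the group coordinator's partition balancing.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class ManualAssignDemo {
    private static KafkaConsumer<String, String> newConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder broker
        props.put("group.id", "demo-group");                 // same group for both consumers
        props.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        return new KafkaConsumer<>(props);
    }

    public static void main(String[] args) {
        TopicPartition tp = new TopicPartition("demo-topic", 0); // placeholder topic/partition
        KafkaConsumer<String, String> c1 = newConsumer();
        KafkaConsumer<String, String> c2 = newConsumer();
        // assign() + seek() skips the group coordinator entirely, so both consumers
        // read partition 0 from offset 0 and each sees every record -> duplicates overall.
        c1.assign(Collections.singletonList(tp));
        c2.assign(Collections.singletonList(tp));
        c1.seek(tp, 0L);
        c2.seek(tp, 0L);
        ConsumerRecords<String, String> r1 = c1.poll(Duration.ofSeconds(1));
        ConsumerRecords<String, String> r2 = c2.poll(Duration.ofSeconds(1));
        System.out.println("consumer1 read " + r1.count() + ", consumer2 read " + r2.count());
        c1.close();
        c2.close();
    }
}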
Next, arthas was used to watch both call sites of sourceReader.addSplits(newSplits):
// Watch the checkpoint-restore path inside SourceOperator#open() (CollectionUtil.iterableToList)
watch org.apache.flink.util.CollectionUtil iterableToList '{params,returnObj,throwExp}' -n 5 -x 3
// Watch SourceOperator#handleAddSplitsEvent()
watch org.apache.flink.streaming.api.operators.SourceOperator handleAddSplitsEvent '{params,returnObj,throwExp}' -n 5 -x 3
Both call sites fired. So the problem is: on checkpoint restore the splits are added once from the restored reader state, and then the SourceCoordinator, through FlinkSourceSplitEnumerator, computes and assigns the splits a second time, which is what causes the duplicate Kafka consumption.
FlinkSourceSplitEnumerator class
public class FlinkSourceSplitEnumerator<T>
    implements SplitEnumerator<FlinkSourceSplit<T>, Map<Integer, List<FlinkSourceSplit<T>>>> {

    private static final Logger LOG = LoggerFactory.getLogger(FlinkSourceSplitEnumerator.class);
    private final SplitEnumeratorContext<FlinkSourceSplit<T>> context;
    private final Source<T> beamSource;
    private final PipelineOptions pipelineOptions;
    private final int numSplits;
    private final Map<Integer, List<FlinkSourceSplit<T>>> pendingSplits;
    // Flag marking whether the split computation has already run
    private boolean splitsInitialized;

    public FlinkSourceSplitEnumerator(
        SplitEnumeratorContext<FlinkSourceSplit<T>> context,
        Source<T> beamSource,
        PipelineOptions pipelineOptions,
        int numSplits) {
      this.context = context;
      this.beamSource = beamSource;
      this.pipelineOptions = pipelineOptions;
      this.numSplits = numSplits;
      this.pendingSplits = new HashMap<>(numSplits);
      // Always false here, so the split computation in start() runs once
      // regardless of whether the job was restored from a checkpoint.
      this.splitsInitialized = false;
    }

    @Override
    public void start() {
      context.callAsync(
          () -> {
            // Compute the splits, i.e. decide which Kafka partitions go to which parallel subtask
            try {
              LOG.info("Starting source {}", beamSource);
              List<? extends Source<T>> beamSplitSourceList = splitBeamSource();
              Map<Integer, List<FlinkSourceSplit<T>>> flinkSourceSplitsList = new HashMap<>();
              int i = 0;
              for (Source<T> beamSplitSource : beamSplitSourceList) {
                int targetSubtask = i % context.currentParallelism();
                List<FlinkSourceSplit<T>> splitsForTask =
                    flinkSourceSplitsList.computeIfAbsent(targetSubtask, ignored -> new ArrayList<>());
                splitsForTask.add(new FlinkSourceSplit<>(i, beamSplitSource));
                i++;
              }
              return flinkSourceSplitsList;
            } catch (Exception e) {
              throw new RuntimeException(e);
            }
          },
          (sourceSplits, error) -> {
            if (error != null) {
              throw new RuntimeException("Failed to start source enumerator.", error);
            } else {
              pendingSplits.putAll(sourceSplits);
              // Only now is the flag set to true
              splitsInitialized = true;
              // Send the assigned splits to the SourceOperators via RPC;
              // each subtask picks up the splits assigned to its parallel index.
              sendPendingSplitsToSourceReaders();
            }
          });
    }
}
As shown above, splitsInitialized is set to false whenever FlinkSourceSplitEnumerator is constructed. Next, look at how FlinkSource instantiates FlinkSourceSplitEnumerator.
public abstract class FlinkSource<T, OutputT>
    implements Source<OutputT, FlinkSourceSplit<T>, Map<Integer, List<FlinkSourceSplit<T>>>> {

    // Called when there is no checkpoint to restore from
    @Override
    public SplitEnumerator<FlinkSourceSplit<T>, Map<Integer, List<FlinkSourceSplit<T>>>>
        createEnumerator(SplitEnumeratorContext<FlinkSourceSplit<T>> enumContext) throws Exception {
      return new FlinkSourceSplitEnumerator<>(
          enumContext, beamSource, serializablePipelineOptions.get(), numSplits);
    }

    // Called when restoring from a checkpoint
    @Override
    public SplitEnumerator<FlinkSourceSplit<T>, Map<Integer, List<FlinkSourceSplit<T>>>>
        restoreEnumerator(
            SplitEnumeratorContext<FlinkSourceSplit<T>> enumContext,
            Map<Integer, List<FlinkSourceSplit<T>>> checkpoint)
            throws Exception {
      // FlinkSourceSplitEnumerator is instantiated here with the same constructor,
      // so splitsInitialized is false even on restore.
      FlinkSourceSplitEnumerator<T> enumerator =
          new FlinkSourceSplitEnumerator<>(
              enumContext, beamSource, serializablePipelineOptions.get(), numSplits);
      checkpoint.forEach(
          (subtaskId, splitsForSubtask) -> enumerator.addSplitsBack(splitsForSubtask, subtaskId));
      return enumerator;
    }
}
So the fix is apparent: if, wherever FlinkSourceSplitEnumerator is instantiated for a checkpoint restore, the splitsInitialized flag were set to true, the splits would not be recomputed and re-added on restore, and the duplicate consumption would disappear.
3. The fix
It turns out this bug has already been fixed in Beam 2.64.0: the restoreEnumerator path of FlinkSource now carries the guard flag through.
public abstract class FlinkSource<T, OutputT>
    implements Source<OutputT, FlinkSourceSplit<T>, Map<Integer, List<FlinkSourceSplit<T>>>> {

    @Override
    public SplitEnumerator<FlinkSourceSplit<T>, Map<Integer, List<FlinkSourceSplit<T>>>>
        restoreEnumerator(
            SplitEnumeratorContext<FlinkSourceSplit<T>> enumContext,
            Map<Integer, List<FlinkSourceSplit<T>>> checkpoint)
            throws Exception {
      // On restore, the enumerator is now created with splitInitialized = true
      SplitEnumerator<FlinkSourceSplit<T>, Map<Integer, List<FlinkSourceSplit<T>>>> enumerator =
          createEnumerator(enumContext, true);
      checkpoint.forEach(
          (subtaskId, splitsForSubtask) -> enumerator.addSplitsBack(splitsForSubtask, subtaskId));
      return enumerator;
    }

    public SplitEnumerator<FlinkSourceSplit<T>, Map<Integer, List<FlinkSourceSplit<T>>>>
        createEnumerator(
            SplitEnumeratorContext<FlinkSourceSplit<T>> enumContext, boolean splitInitialized)
            throws Exception {
      if (boundedness == Boundedness.BOUNDED) {
        return new LazyFlinkSourceSplitEnumerator<>(
            enumContext, beamSource, serializablePipelineOptions.get(), numSplits, splitInitialized);
      } else {
        return new FlinkSourceSplitEnumerator<>(
            enumContext, beamSource, serializablePipelineOptions.get(), numSplits, splitInitialized);
      }
    }
}
public class FlinkSourceSplitEnumerator<T>
    implements SplitEnumerator<FlinkSourceSplit<T>, Map<Integer, List<FlinkSourceSplit<T>>>> {

    public FlinkSourceSplitEnumerator(
        SplitEnumeratorContext<FlinkSourceSplit<T>> context,
        Source<T> beamSource,
        PipelineOptions pipelineOptions,
        int numSplits,
        boolean splitsInitialized) {
      this.context = context;
      this.beamSource = beamSource;
      this.pipelineOptions = pipelineOptions;
      this.numSplits = numSplits;
      this.pendingSplits = new HashMap<>(numSplits);
      // The flag is now passed in from the caller
      this.splitsInitialized = splitsInitialized;
    }

    @Override
    public void start() {
      // The new guard: when the flag is true (restored from a checkpoint),
      // the splits are not recomputed.
      if (!splitsInitialized) {
        initializeSplits();
      }
    }

    private void initializeSplits() {
      context.callAsync(
          () -> {
            try {
              LOG.info("Starting source {}", beamSource);
              List<? extends Source<T>> beamSplitSourceList = splitBeamSource();
              Map<Integer, List<FlinkSourceSplit<T>>> flinkSourceSplitsList = new HashMap<>();
              int i = 0;
              for (Source<T> beamSplitSource : beamSplitSourceList) {
                int targetSubtask = i % context.currentParallelism();
                List<FlinkSourceSplit<T>> splitsForTask =
                    flinkSourceSplitsList.computeIfAbsent(targetSubtask, ignored -> new ArrayList<>());
                splitsForTask.add(new FlinkSourceSplit<>(i, beamSplitSource));
                i++;
              }
              return flinkSourceSplitsList;
            } catch (Exception e) {
              throw new RuntimeException(e);
            }
          },
          (sourceSplits, error) -> {
            if (error != null) {
              throw new RuntimeException("Failed to start source enumerator.", error);
            } else {
              pendingSplits.putAll(sourceSplits);
              splitsInitialized = true;
              sendPendingSplitsToSourceReaders();
            }
          });
    }
}
4. Other issues
As the analysis above also shows, Beam's KafkaSource still lacks some functionality compared to Flink's native KafkaSource, for example:
1. When a Beam job is restored from a checkpoint and the number of Kafka partitions has been increased manually in the meantime, the new partitions are not consumed.
2. Beam's KafkaSource has no dynamic partition discovery: new partitions can only be picked up by manually restarting the job without restoring from the checkpoint. Flink's native KafkaSource does support this, as sketched below.
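For comparison, a minimal sketch of Flink's native KafkaSource with dynamic partition discovery enabled through the partition.discovery.interval.ms property; broker address, topic, and group id are placeholders:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class NativeKafkaSourceDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> source =
            KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")            // placeholder broker
                .setTopics("demo-topic")                           // placeholder topic
                .setGroupId("demo-group")                          // placeholder group id
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                // Check for newly created partitions every 60s and start consuming them
                // without a job restart.
                .setProperty("partition.discovery.interval.ms", "60000")
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source").print();
        env.execute("native-kafka-source-demo");
    }
}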