当前位置: 首页 > news >正文

flink如何支持kafka容灾自动切换

背景

在flink消费kafka消息时,我们会指定连接的kafka服务器的地址以及起始消费偏移等信息,一旦指定,当kafka服务器挂掉后,flink也会由于连接不上服务器而导致失败,这里想要解决的问题是当kafka在机房A挂掉后,如果机房B有对kafka进行容灾的频道,那么flink怎么可以做到自动切换到机房B的进行kafka消费?同理,当机房A数据恢复后,如何自动切回到机房A进行消费?这个过程自动发生而不需要手动修改kafka的地址

技术实现

flink消费kafka的实现类是FlinkKafkaConsumerBase,这个类内部有一个功能:可以自动发现满足某个规则的kafka主题并消费,其关键代码如下:

private void createAndStartDiscoveryLoop(AtomicReference<Exception> discoveryLoopErrorRef) {discoveryLoopThread =new Thread(() -> {try {// --------------------- partition discovery loop// ---------------------// throughout the loop, we always eagerly check if we are still// running before// performing the next operation, so that we can escape the loop as// soon as possiblewhile (running) {if (LOG.isDebugEnabled()) {LOG.debug("Consumer subtask {} is trying to discover new partitions ...",getRuntimeContext().getIndexOfThisSubtask());}final List<KafkaTopicPartition> discoveredPartitions;try {discoveredPartitions =partitionDiscoverer.discoverPartitions();} catch (AbstractPartitionDiscoverer.WakeupException| AbstractPartitionDiscoverer.ClosedException e) {// the partition discoverer may have been closed or woken up// before or during the discovery;// this would only happen if the consumer was canceled;// simply escape the loopbreak;}// no need to add the discovered partitions if we were closed// during the meantimeif (running && !discoveredPartitions.isEmpty()) {kafkaFetcher.addDiscoveredPartitions(discoveredPartitions);}// do not waste any time sleeping if we're not running anymoreif (running && discoveryIntervalMillis != 0) {try {Thread.sleep(discoveryIntervalMillis);} catch (InterruptedException iex) {// may be interrupted if the consumer was canceled// midway; simply escape the loopbreak;}}}} catch (Exception e) {discoveryLoopErrorRef.set(e);} finally {// calling cancel will also let the fetcher loop escape// (if not running, cancel() was already called)if (running) {cancel();}}},"Kafka Partition Discovery for "+ getRuntimeContext().getTaskNameWithSubtasks());discoveryLoopThread.start();}

如上所示,他是通过开启一个线程,然后定时检测的方式来发现是否有新的符合规则条件的主题,如果有添加到消费队列中,读者会不会很好奇,我们讨论的是flink如何对kafka进行容灾切换,你和我说这个主题自动发现做什么?
其实这里想表达的是一样的思路,我们进行kafka容灾的切换也是可以这样做,我们开启一个线程,然后线程里面不停的检测当前消费的kafka集群的连通性是否正常,如果连接不上,那么表明发生了kafka的机房容灾,flink需要切换到kafka的机房B进行消费,那么这里剩下的就只是如何确定消费的偏移量的问题了

http://www.lqws.cn/news/475399.html

相关文章:

  • 一次性理解Java垃圾回收--简单直接方便面试时使用
  • 半导体二极管
  • C# Quartz.net 定时任务
  • c# .netCreateLinkedTokenSource链接令牌,取消信号异步执行
  • 二、海思网卡数据流程
  • 多源异构数据接入与实时分析:衡石科技的技术突破
  • 虚拟与现实交融视角下定制开发开源AI智能名片S2B2C商城小程序赋能新零售商业形态研究
  • 【node】Mac m1 安装nvm 和node
  • Vulkan 学习笔记15—Mipmap 与多重采样
  • Vue3+TypeScript+Element Plus 表格展开行优化方案
  • MongoDB:索引
  • 【机器学习的五大核心步骤】从零构建一个智能系统
  • Linux 服务器运维:磁盘管理与网络配置
  • go excel解析库xuri/excelize中的SAX
  • LLMs之Embedding:Qwen3 Embedding的简介、安装和使用方法、案例应用之详细攻略
  • 【weaviate】分布式数据写入之LSM树深度解析:读写放大的权衡
  • 数据库(1)-SQL
  • webpack+vite前端构建工具 -6从loader本质看各种语言处理 7webpack处理html
  • 案例:塔能科技以“数字光网”重塑某市照明绿色生态
  • Docker 运行RAGFlow 搭建RAG知识库
  • LeapMotion-PhysicalHandsManager 类详解
  • 7.5.1散列表的基本概念
  • 测试工程师实战:用 LangChain+deepseek构建多轮对话测试辅助聊天机器人
  • 深入解析Flink Local模式启动流程源码:揭开作业初始化的神秘面纱
  • vue3 el-table 行颜色根据 字段改变
  • 企业级安全实践:SSL 加密与权限管理(二)
  • python 常见数学公式函数使用详解
  • 【HarmonyOS Next之旅】DevEco Studio使用指南(三十六) -> 配置构建(三)
  • swift-17-字面量协议、模式匹配、条件编译
  • Java 21 的虚拟线程与桥接模式:构建高性能并发系统