当前位置: 首页 > news >正文

Kafka集成Flume/Spark/Flink(大数据)/SpringBoot

Kafka集成Flume

在这里插入图片描述

Flume生产者

在这里插入图片描述
③、安装Flume,上传apache-flume的压缩包.tar.gz到Linux系统的software,并解压到/opt/module目录下,并修改其名称为flume
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

Flume消费者

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

Kafka集成Spark

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

生产者

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

object SparkKafkaProducer{def main(args:Array[String]):Unit = {//配置信息val properties  = new Properties()properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092")properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,classOf[StringSerializer])properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,classOf[StringSerializer])//创建一个生产者var producer = new KafkaProducer[String,String](properties)//发送数据for(i <- 1 to 5){producer.send(new ProducerRecord[String,String]("first","atguigu"+i))}//关闭资源producer.close()}
}

在这里插入图片描述

消费者
在这里插入图片描述

Object SparkKafkaConsumer{def main(args:Array[String]):Unit = {//初始化上下文环境val conf = new SparkConf().setMaster("local[*]").setAppName("spark-kafka")val ssc = new StreamingContext(conf,Seconds(3))//消费数据val kafkapara = Map[String,Object](ConsumerConfig.BOOT_STRAP_SERVERS_CONFIG->"hadoop102:9092,hadoop103:9092",ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG->classOf[StringDeserializer],ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG->classOf[StringDeserializer],ConsumerConfig.GROUP_ID_CONFIG->"test")val kafkaDStream = KafkaUtils.createDirectStream(ssc,LocationStrategies.PreFerConsistent,ConsumerStrategies.Subscribe[String,String](Set("first"),kafkapara))val valueDStream = kafkaDStream.map(record=>record.value())valueDStream.print()//执行代码,并阻塞ssc.start()ssc.awaitTermination()}
}

Kafka集成Flink

在这里插入图片描述

创建maven项目,导入以下依赖
在这里插入图片描述
resources里面添加log4j.properties文件,可以更改打印日志的级别为error
在这里插入图片描述

Flink生产者

public class FlinkafkaProducer1{public static void main(String[] args){//获取环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(3);//准备数据源ArrayList<String> wordList = new ArrayList<>();wordList.add("hello");wordList.add("atguigu");DataStreamSource<String> stream = env.fromCollection();//创建一个kafka生产者Properties properteis = new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>("first",new SimpleStringSchema(),properties);//添加数据源Kafka生产者stream.addSink(kafkaProducer);//执行env.execute();}
}

在这里插入图片描述

Flink消费者

public class FlinkafkaConsumer1{public static void main(String[] args){//获取环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(3);//创建一个消费者Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("first",new SimpleSStringSchema(),properties);//关联消费者和flink流env.addSource(kafkaConsumer).print();//执行env.execute();}
}

Kafka集成SpringBoot

在这里插入图片描述
在这里插入图片描述

生产者
在这里插入图片描述
在这里插入图片描述
通过浏览器发送
在这里插入图片描述
在这里插入图片描述

消费者

在这里插入图片描述

在这里插入图片描述

http://www.lqws.cn/news/72631.html

相关文章:

  • 【设计模式-3.6】结构型——桥接模式
  • React 组件异常捕获机制详解
  • 打卡第34天:MLP神经网络训练
  • 4、ubuntu系统 | 文本和目录操作函数
  • react 生命周期
  • Java 2D 图形类总结与分类
  • 自定义Shell命令行解释器
  • 数据结构哈希表总结
  • [SC]SystemC中常用的宏和小工具
  • 抛砖引玉:RadarDet4D,NuScenes数据集Radar模态目标检测第二名(即将开源)
  • uniapp-商城-77-shop(8.2-商品列表,地址信息添加,级联选择器picker)
  • 3. TypeScript 中的数据类型
  • Linux磁盘管理
  • 业务到解决方案构想
  • SQL 中的 `CASE WHEN` 如何使用?
  • 达梦数据库 Windows 系统安装教程
  • CentOS8.3+Kubernetes1.32.5+Docker28.2.2高可用集群二进制部署
  • 状态机实现文件单词统计
  • 人工智能在智能制造业中的创新应用与未来趋势
  • HealthBench医疗AI评估基准:技术路径与核心价值深度分析(上)
  • 架构师面试题整理
  • VitalInsight智能体检报告解读
  • 【深度学习-Day 21】框架入门:神经网络模型构建核心指南 (Keras PyTorch)
  • 每天总结一个html标签——a标签
  • CMake指令:string(字符串操作)
  • Linux--进程概念
  • 车载诊断架构 --- DTC消抖参数(Trip Counter DTCConfirmLimit )
  • 05-power BI高级筛选器filter与Values人工造表
  • NVM,Node.Js 管理工具
  • NodeJS全栈WEB3面试题——P4Node.js后端集成 服务端设计