当前位置: 首页 > news >正文

Flink03-学习-套接字分词流自动写入工具

上一节中通过如下命令启动服务摸来模拟Socket流。请添加图片描述
现在我们写一个ServerSocket来模拟让流自动写入不用手动操作。

pom.xml和上一节一致不需要修改

编写代码

同样适用Socket流

 // 使用socket流创建一个从 socket 读取文本的数据流,以换行符 \n 作为分隔符DataStreamSource<String> stringDataStreamSource = executionEnvironment.socketTextStream(ip, port, "\n");

FlinkServer
继承Thread启动线程

package org.example.snow.demo3;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;/*** @author snowsong*/
public class FlinkServer extends Thread{@Overridepublic void run() {String ip = "0.0.0.0";int port = 8886;StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();// 使用socket流创建一个从 socket 读取文本的数据流,以换行符 \n 作为分隔符DataStreamSource<String> stringDataStreamSource = executionEnvironment.socketTextStream(ip, port, "\n");SingleOutputStreamOperator<Tuple2<String, Long>> tuple2SingleOutputStreamOperator = stringDataStreamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {@Overridepublic void flatMap(String s, Collector<Tuple2<String, Long>> collector) throws Exception {String[] splits = s.split("\\s");for (String word : splits) {collector.collect(Tuple2.of(word, 1L));}}});SingleOutputStreamOperator<Tuple2<String, Long>> word = tuple2SingleOutputStreamOperator.keyBy(new KeySelector<Tuple2<String, Long>, Object>() {@Overridepublic Object getKey(Tuple2<String, Long> stringLongTuple2) throws Exception {return stringLongTuple2.f0;}}).window(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(1))).sum(1);word.print();try {executionEnvironment.execute("stream!");} catch (Exception e) {throw new RuntimeException(e);}}}

NumRandom
使用ServerSocket实现一个持续的流输出

package org.example.snow.demo3;import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Random;/*** @author snowsong*/
public class RandomNumClient extends Thread {@Overridepublic void run() {// 随机生成数字String ip = "0.0.0.0";int port = 8886;try {ServerSocket serverSocket = new ServerSocket();InetSocketAddress address = new InetSocketAddress(ip, port);// 灵活绑定服务器地址serverSocket.bind(address);// 监听并接收客户端的连接请求,有阻塞特性,当调用该方法的时候,线程会暂停执行,直到有客户端连接上来Socket accept = serverSocket.accept();// 获取输入流,读取客户端发送的数据OutputStream outputStream = accept.getOutputStream();// 包装成打印流,方便写入数据 true 自动刷新缓冲区PrintWriter printWriter = new PrintWriter(outputStream, true);Random random = new Random();// 遍历for (int i = 0; i < 10; i++) {// 生成随机数int num = random.nextInt(10) + 1;printWriter.println("随机数:" + num);System.out.println("send to flink:" + num);Thread.sleep(100);}} catch (Exception e) {throw new RuntimeException(e);}super.run();}
}

启动类

package org.example.snow.demo3;/*** @author snowsong*/
public class StartApp {public static void main(String[] args) throws Exception {RandomNumClient randomNumClient = new RandomNumClient();FlinkServer flinkServer = new FlinkServer();flinkServer.start();randomNumClient.start();}
}

运行结果

请添加图片描述

http://www.lqws.cn/news/98011.html

相关文章:

  • 剑指offer14_二进制中1的个数
  • 用HTML5 Canvas打造交互式心形粒子动画:从基础到优化实战
  • 链表题解——反转链表【LeetCode】
  • (三)动手学线性神经网络:从数学原理到代码实现
  • 机器学习——主成分分析PCA
  • 数据库密码加密
  • lanqiaoOJ 1508:N皇后问题 ← dfs
  • SpringBoot项目打包成war包
  • Kdump 介绍与使用方式
  • Samtec技术支持 | 新型评估和开发套件
  • Agno:使用简单代码构建AI智能体
  • 百万级临床试验数据库TrialPanorama发布!AI助力新药研发与临床评价迎来新基石
  • MySQL - Windows 中 MySQL 禁用开机自启,并在需要时手动启动
  • 编译 Linux openssl
  • Asp.net core 使用EntityFrame Work
  • 一、基础环境配置
  • Walle-Web:打造轻量级高效的DevOps自动化部署平台
  • 【数据库】《DBA实战手记》- 读书笔记
  • centos中的ulimit命令
  • Python数据分析及可视化中常用的6个库及函数(一)
  • 【JAVA版】意象CRM客户关系管理系统+uniapp全开源
  • python调用硅基流动的视觉语言模型
  • Python基于SVM技术的手写数字识别问题项目实战
  • Vue3 + Vite:我的 Qiankun 微前端主子应用实践指南
  • 研发型企业如何面对源代码保密问题
  • one-hot编码VS对象嵌入表示
  • Java详解LeetCode 热题 100(25):LeetCode 141. 环形链表(Linked List Cycle)详解
  • 架构设计的目标:高内聚、低耦合的本质
  • 【文献精读】Explaining grokking through circuit efficiency
  • Unity 性能优化终极指南 — GameObject 篇