Flink03-学习-套接字分词流自动写入工具

article/2025/6/8 0:23:43

上一节中通过如下命令启动服务摸来模拟Socket流。请添加图片描述
现在我们写一个ServerSocket来模拟让流自动写入不用手动操作。

pom.xml和上一节一致不需要修改

编写代码

同样适用Socket流

 // 使用socket流创建一个从 socket 读取文本的数据流,以换行符 \n 作为分隔符DataStreamSource<String> stringDataStreamSource = executionEnvironment.socketTextStream(ip, port, "\n");

FlinkServer
继承Thread启动线程

package org.example.snow.demo3;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;/*** @author snowsong*/
public class FlinkServer extends Thread{@Overridepublic void run() {String ip = "0.0.0.0";int port = 8886;StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();// 使用socket流创建一个从 socket 读取文本的数据流,以换行符 \n 作为分隔符DataStreamSource<String> stringDataStreamSource = executionEnvironment.socketTextStream(ip, port, "\n");SingleOutputStreamOperator<Tuple2<String, Long>> tuple2SingleOutputStreamOperator = stringDataStreamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {@Overridepublic void flatMap(String s, Collector<Tuple2<String, Long>> collector) throws Exception {String[] splits = s.split("\\s");for (String word : splits) {collector.collect(Tuple2.of(word, 1L));}}});SingleOutputStreamOperator<Tuple2<String, Long>> word = tuple2SingleOutputStreamOperator.keyBy(new KeySelector<Tuple2<String, Long>, Object>() {@Overridepublic Object getKey(Tuple2<String, Long> stringLongTuple2) throws Exception {return stringLongTuple2.f0;}}).window(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(1))).sum(1);word.print();try {executionEnvironment.execute("stream!");} catch (Exception e) {throw new RuntimeException(e);}}}

NumRandom
使用ServerSocket实现一个持续的流输出

package org.example.snow.demo3;import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Random;/*** @author snowsong*/
public class RandomNumClient extends Thread {@Overridepublic void run() {// 随机生成数字String ip = "0.0.0.0";int port = 8886;try {ServerSocket serverSocket = new ServerSocket();InetSocketAddress address = new InetSocketAddress(ip, port);// 灵活绑定服务器地址serverSocket.bind(address);// 监听并接收客户端的连接请求,有阻塞特性,当调用该方法的时候,线程会暂停执行,直到有客户端连接上来Socket accept = serverSocket.accept();// 获取输入流,读取客户端发送的数据OutputStream outputStream = accept.getOutputStream();// 包装成打印流,方便写入数据 true 自动刷新缓冲区PrintWriter printWriter = new PrintWriter(outputStream, true);Random random = new Random();// 遍历for (int i = 0; i < 10; i++) {// 生成随机数int num = random.nextInt(10) + 1;printWriter.println("随机数:" + num);System.out.println("send to flink:" + num);Thread.sleep(100);}} catch (Exception e) {throw new RuntimeException(e);}super.run();}
}

启动类

package org.example.snow.demo3;/*** @author snowsong*/
public class StartApp {public static void main(String[] args) throws Exception {RandomNumClient randomNumClient = new RandomNumClient();FlinkServer flinkServer = new FlinkServer();flinkServer.start();randomNumClient.start();}
}

运行结果

请添加图片描述


http://www.hkcw.cn/article/jhXGEIPaZM.shtml

相关文章

2022年 国内税务年鉴PDF电子版Excel

2022年 国内税务年鉴PDF电子版Excelhttps://download.csdn.net/download/2401_84585615/89784658 https://download.csdn.net/download/2401_84585615/89784658 2022年国内税务年鉴是对中国税收政策、税制改革和税务管理实践的全面总结。这份年鉴详细记录了中国税收系统的整体状…

Gitee Wiki:以知识管理赋能 DevSecOps,推动关键领域软件自主演进

关键领域软件研发中的知识管理困境 传统文档管理模式问题显著 关键领域软件研发领域&#xff0c;传统文档管理模式问题显著&#xff1a;文档存储无系统&#xff0c;查找困难&#xff0c;降低效率&#xff1b;更新不及时&#xff0c;与实际脱节&#xff0c;误导开发&#xff1…

Hadoop 3.x 伪分布式 8088端口无法访问问题处理

【Hadoop】YARN ResourceManager 启动后 8088 端口无法访问问题排查与解决(伪分布式启动Hadoop) 在配置和启动 Hadoop YARN 模块时&#xff0c;发现虽然 ResourceManager 正常启动&#xff0c;JPS 进程中也显示无误&#xff0c;但通过浏览器访问 http://主机IP:8088 时却无法打…

【最小生成树】P2573 [SCOI2012] 滑雪

题目 洛谷&#xff1a;P2573 [SCOI2012] 滑雪 分析 题目条件要点分析&#xff1a; 这道题要求 i 能到达 j 的前提是 i 、j 之间有一条连通的边并且i 的高度比 j 高。这意味着本题给出的是一个有向图。时间胶囊可以返回到上一个景点&#xff0c;可以无限使用&#xff0c;意…

2.2.2 06年T2

Stratford的两大对立力量&#xff1a;令人讽刺的居民与令人同情的公司 - 2006年考研英语Text 2精析 本文解析2006年考研英语Text 2&#xff0c;揭示Stratford小镇居民与皇家莎士比亚剧团(RSC)的深层矛盾。 一、原文与翻译 Paragraph 1&#xff1a;对立双方的形成 L1: Stratfor…

基于人工智能算法实现的AI五子棋博弈

1. 项目概述 本项目实现了一个完整的五子棋游戏系统&#xff0c;包含游戏界面、交互逻辑和人工智能对战功能。 系统采用Python语言开发&#xff0c;使用Pygame库进行图形界面渲染&#xff0c;实现了三种游戏模式&#xff1a;人人对战、人机对战和AI对战。 AI算法基于博弈树搜…

在 Ubuntu 系统上使用 Python 的 Matplotlib 库时遇到的字体缺失问题

报错问题 findfont: Font family [SimHei] not found. Falling back to DejaVu Sans. 在现实图片时尝试显示中文字符命令行报错&#xff0c;在图片中显示方框。 最终解决方案 在尝试了各种方法之后&#xff0c;在代码中添加下图中选中行&#xff0c;问题直接解决。

webstrom中git插件勾选提交部分文件时却出现提交全部问题怎么解决

原因是我有个.husky的文件制定了执行提交的时候就是提交所有的文件 修改.husky/pre-commit文件就可以啦 #!/usr/bin/env sh . "$(dirname -- "$0")/_/husky.sh"# 获取通过 WebStorm 提交的暂存文件&#xff08;仅勾选的部分&#xff09; STAGED_FILES$(gi…

KINGCMS被入侵

现象会强制跳转到 一个异常网站,请掉截图代码. 代码中包含经过混淆处理的JavaScript&#xff0c;它使用了一种技术来隐藏其真实功能。代码中使用了eval函数来执行动态生成的代码&#xff0c;这是一种常见的技术&#xff0c;恶意脚本经常使用它来隐藏其真实目的。 这段脚本会检…

CMS32M65xx/67xx系列CoreMark跑分测试

CMS32M65xx/67xx系列CoreMark跑分测试 1、参考资料准备 1.1、STM32官方跑分链接 1.2、官网链接 官方移植文档&#xff0c;如下所示&#xff0c;点击红框处-移植文档: A new whitepaper and video explain how to port CoreMark-Pro to bare-metal 1.3、测试软件git下载链接 …

Vue.js教学第十八章:Vue 与后端交互(二):Axios 拦截器与高级应用

Vue 与后端交互(二):Axios 拦截器与高级应用 在上一篇文章中,我们学习了 Axios 的基本用法,包括如何发送不同类型的 HTTP 请求以及基本的配置选项。本文将深入剖析 Axios 的拦截器功能,探讨请求拦截器和响应拦截器的作用、配置方法和应用场景,通过实例展示如何利用拦截…

【信创-k8s】海光/兆芯+银河麒麟V10离线部署k8s1.31.8+kubesphere4.1.3

❝ KubeSphere V4已经开源半年多&#xff0c;而且v4.1.3也已经出来了&#xff0c;修复了众多bug。介于V4优秀的LuBan架构&#xff0c;核心组件非常少&#xff0c;资源占用也显著降低&#xff0c;同时带来众多功能和便利性。我们决定与时俱进&#xff0c;使用1.30版本的Kubernet…

【判断酒酒花数】2022-3-31

缘由对超长正整数的处理&#xff1f; - C语言论坛 - 编程论坛 void 判断酒酒花数(_int64 n) {//缘由https://bbs.bccn.net/thread-508634-1-1.html_int64 t n; int h 0, j 0;//while (j < 3)h t % 10, t / 10, j;//整数的个位十位百位之和是其前缀while (t > 0)h t…

oauth2.0

OAuth 2.0 的工作原理和流程。 OAuth 2.0 是一个授权框架&#xff0c;它允许第三方应用获取对用户资源的有限访问权限&#xff0c;而无需获取用户的密码。以下是详细说明&#xff1a; 1. OAuth 2.0 的四个主要角色 资源所有者&#xff08;Resource Owner&#xff09; 通常是…

笔记本/台式C盘扩容:删除、压缩、跨分区与重分配—「小白教程」

删除C盘右侧分区以扩展 删除分区&#xff0c;也会删除分区中所有资料&#xff0c;请注意备份所有重要资料。 1.WinX选择磁盘管理&#xff0c;右键点击C盘右侧分区&#xff0c;选择删除卷&#xff0c;原分区会变成黑色的“未分配”空间&#xff1b; 2.此时右键C盘选择“扩展卷…

【Bluedroid】蓝牙启动之sdp_init 源码解析

本文围绕Android蓝牙协议栈中 SDP&#xff08;Service Discovery Protocol&#xff0c;服务发现协议&#xff09;模块的初始化函数sdp_init展开&#xff0c;结合核心控制块tSDP_CB及关联数据结构&#xff08;如tL2CAP_CFG_INFO、tCONN_CB等&#xff09;的定义与协作逻辑&#x…

C++学习-入门到精通【13】标准库的容器和迭代器

C学习-入门到精通【13】标准库的容器和迭代器 目录 C学习-入门到精通【13】标准库的容器和迭代器一、标准模板库简介1.容器简介2.STL容器总览3.近容器4.STL容器的通用函数5.首类容器的通用typedef6.对容器元素的要求 二、迭代器简介1.使用istream_iterator输入&#xff0c;使用…

UE5 2D角色PaperZD插件动画状态机学习笔记

UE5 2D角色PaperZD插件动画状态机学习笔记 0.安装PaperZD插件 这是插件下载安装地址 https://www.fab.com/zh-cn/listings/6664e3b5-e376-47aa-a0dd-f7bbbd5b93c0 1.右键创建PaperZD 动画序列 2.添加动画序列 3&#xff0c;右键创建PaperZD AnimBP &#xff08;动画蓝图&am…

你的台式机PCIe插槽到底是几条lane

目录 1.如何查看台式机支持的PCIe插槽的模式 2.查看台式机主板型号 3.主板PCIe插槽配置确认 4.实际模式与理论模式不匹配原因 5.解决方案 在【PCIe XDMA开发】XDMA与MIG位宽一致性要求一文中&#xff0c;我们讨论了PCIe带宽计算过程。那么实际带宽与理论计算带宽是否能够一致或…

微软PowerBI考试 PL300-Power BI 入门

Power BI 入门 上篇更新了微软PowerBI考试 PL-300学习指南&#xff0c;今天分享PowerBI入门学习内容。 简介 Microsoft Power BI 是一个完整的报表解决方案&#xff0c;通过开发工具和联机平台提供数据准备、数据可视化、分发和管理。 Power BI 可以从使用单个数据源的简单…