上一节中通过如下命令启动服务摸来模拟Socket流。
现在我们写一个ServerSocket来模拟让流自动写入不用手动操作。
pom.xml和上一节一致不需要修改
编写代码
同样适用Socket流
// 使用socket流创建一个从 socket 读取文本的数据流,以换行符 \n 作为分隔符DataStreamSource<String> stringDataStreamSource = executionEnvironment.socketTextStream(ip, port, "\n");
FlinkServer
继承Thread启动线程
package org.example.snow.demo3;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;/*** @author snowsong*/
public class FlinkServer extends Thread{@Overridepublic void run() {String ip = "0.0.0.0";int port = 8886;StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();// 使用socket流创建一个从 socket 读取文本的数据流,以换行符 \n 作为分隔符DataStreamSource<String> stringDataStreamSource = executionEnvironment.socketTextStream(ip, port, "\n");SingleOutputStreamOperator<Tuple2<String, Long>> tuple2SingleOutputStreamOperator = stringDataStreamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {@Overridepublic void flatMap(String s, Collector<Tuple2<String, Long>> collector) throws Exception {String[] splits = s.split("\\s");for (String word : splits) {collector.collect(Tuple2.of(word, 1L));}}});SingleOutputStreamOperator<Tuple2<String, Long>> word = tuple2SingleOutputStreamOperator.keyBy(new KeySelector<Tuple2<String, Long>, Object>() {@Overridepublic Object getKey(Tuple2<String, Long> stringLongTuple2) throws Exception {return stringLongTuple2.f0;}}).window(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(1))).sum(1);word.print();try {executionEnvironment.execute("stream!");} catch (Exception e) {throw new RuntimeException(e);}}}
NumRandom
使用ServerSocket实现一个持续的流输出
package org.example.snow.demo3;import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Random;/*** @author snowsong*/
public class RandomNumClient extends Thread {@Overridepublic void run() {// 随机生成数字String ip = "0.0.0.0";int port = 8886;try {ServerSocket serverSocket = new ServerSocket();InetSocketAddress address = new InetSocketAddress(ip, port);// 灵活绑定服务器地址serverSocket.bind(address);// 监听并接收客户端的连接请求,有阻塞特性,当调用该方法的时候,线程会暂停执行,直到有客户端连接上来Socket accept = serverSocket.accept();// 获取输入流,读取客户端发送的数据OutputStream outputStream = accept.getOutputStream();// 包装成打印流,方便写入数据 true 自动刷新缓冲区PrintWriter printWriter = new PrintWriter(outputStream, true);Random random = new Random();// 遍历for (int i = 0; i < 10; i++) {// 生成随机数int num = random.nextInt(10) + 1;printWriter.println("随机数:" + num);System.out.println("send to flink:" + num);Thread.sleep(100);}} catch (Exception e) {throw new RuntimeException(e);}super.run();}
}
启动类
package org.example.snow.demo3;/*** @author snowsong*/
public class StartApp {public static void main(String[] args) throws Exception {RandomNumClient randomNumClient = new RandomNumClient();FlinkServer flinkServer = new FlinkServer();flinkServer.start();randomNumClient.start();}
}