Netty学习example示例

article/2025/6/25 21:58:52

在这里插入图片描述

文章目录

  • simple
    • Server端
      • NettyServer
      • NettyServerHandler
    • Client端
      • NettyClient
      • NettyClientHandler
  • tcp(粘包和拆包)
    • Server端
      • NettyTcpServer
      • NettyTcpServerHandler
    • Client端
      • NettyTcpClient
      • NettyTcpClientHandler
  • protocol
    • codec
      • CustomMessageDecoder
      • CustomMessageEncoder
    • server端
      • ProtocolServer
      • ProtocolServerHandler
    • client端
      • ProtocolClient
      • ProtocolClientHandler
  • http
    • Server端
      • HttpServer
      • HttpServerHandler
    • Client端
      • HttpClient
      • HttpClientHandler
  • ws
    • Server端
      • WsServer
      • WsServerHandler
    • Client端
      • WsClient
      • WebSocketClientHandler
  • protobuf
    • Server端
      • NettyServer
      • NettyServerHandler
      • Student.proto
    • Client端
      • NettyClient
      • NettyClientHandler

simple

Server端

NettyServer

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;import java.net.InetSocketAddress;@Slf4j
public class NettyServer {public static void main(String[] args) {NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);NioEventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap serverBootstrap = new ServerBootstrap().group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).handler(new LoggingHandler()).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new StringDecoder());pipeline.addLast(new StringEncoder());pipeline.addLast(new NettyServerHandler());}});ChannelFuture bindFuture = serverBootstrap.bind(new InetSocketAddress(8888));Channel channel = bindFuture.sync().channel();log.info("server start");channel.closeFuture().sync();log.info("server stop");} catch (Exception e) {log.info("server error", e);} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}

NettyServerHandler

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {public void channelRead(io.netty.channel.ChannelHandlerContext ctx, Object msg) throws Exception {log.info("服务端接收到客户端数据:{}", msg);ctx.writeAndFlush("服务端收到客户端的数据: " + msg);}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {log.info("【NettyServerHandler->userEventTriggered】: {}", evt);}public void exceptionCaught(io.netty.channel.ChannelHandlerContext ctx, Throwable cause) throws Exception {log.error("exceptionCaught异常:", cause);ctx.close();}public void handlerAdded(io.netty.channel.ChannelHandlerContext ctx) throws Exception {log.info("handlerAdded:{}", ctx.channel().remoteAddress());}public void handlerRemoved(io.netty.channel.ChannelHandlerContext ctx) throws Exception {log.info("handlerRemoved:{}", ctx.channel().remoteAddress());}public void channelRegistered(io.netty.channel.ChannelHandlerContext ctx) throws Exception {log.info("channelRegistered:{}", ctx.channel().remoteAddress());}public void channelUnregistered(io.netty.channel.ChannelHandlerContext ctx) throws Exception {log.info("channelUnregistered:{}", ctx.channel().remoteAddress());}public void channelActive(io.netty.channel.ChannelHandlerContext ctx) throws Exception {log.info("客户端连接:{}", ctx.channel().remoteAddress());}public void channelInactive(io.netty.channel.ChannelHandlerContext ctx) throws Exception {log.info("客户端断开连接:{}", ctx.channel().remoteAddress());}}

Client端

NettyClient

@Slf4j
public class NettyClient {public static void main(String[] args) {NioEventLoopGroup group = new NioEventLoopGroup(1);try {Bootstrap bootstrap = new Bootstrap().group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new StringDecoder());pipeline.addLast(new StringEncoder());pipeline.addLast(new NettyClientHandler());}});ChannelFuture connectFuture = bootstrap.connect("127.0.0.1", 8888);Channel channel = connectFuture.sync().channel();System.out.println("客户端连接成功");Scanner sc = new Scanner(System.in);while (true) {System.out.println("请输入内容: ");String line = sc.nextLine();if (line == null || line.isEmpty()) {continue;} else if ("exit".equals(line)) {channel.close();break;}channel.writeAndFlush(line);}channel.closeFuture().sync();System.out.println("客户端关闭");} catch (Exception e) {log.error("客户端发生异常: ", e);}}}

NettyClientHandler

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;@Slf4j
public class NettyClientHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.info( "【NettyClientHandler->channelRead】: {}", msg);}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {log.info( "【NettyClientHandler->userEventTriggered】: {}", evt);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {log.info( "异常: {}", cause.getMessage());ctx.close();}@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {log.info( "【NettyClientHandler->handlerAdded】: {}", ctx);}@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) throws Exception {log.info( "【NettyClientHandler->handlerRemoved】: {}", ctx);}@Overridepublic void channelRegistered(ChannelHandlerContext ctx) throws Exception {log.info( "【NettyClientHandler->channelRegistered】: {}", ctx);}@Overridepublic void channelUnregistered(ChannelHandlerContext ctx) throws Exception {log.info( "【NettyClientHandler->channelUnregistered】: {}", ctx);}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {log.info( "【NettyClientHandler->channelActive】: {}", ctx);}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {log.info( "【NettyClientHandler->channelInactive】: {}", ctx);}
}

tcp(粘包和拆包)

Server端

NettyTcpServer

@Slf4j
public class NettyTcpServer {public static void main(String[] args) {NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);NioEventLoopGroup workerGroup = new NioEventLoopGroup(1);try {ServerBootstrap serverBootstrap = new ServerBootstrap().group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new NettyTcpServerHandler());}});ChannelFuture bindFuture = serverBootstrap.bind("127.0.0.1", 9090);Channel channel = bindFuture.sync().channel();log.info("server start");channel.closeFuture().sync();} catch (Exception e) {log.info("server error", e);} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}}

NettyTcpServerHandler

@Slf4j
public class NettyTcpServerHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf byteBuf = (ByteBuf) msg;byte[] bytes = new byte[byteBuf.readableBytes()];byteBuf.readBytes(bytes);String content = new String(bytes, StandardCharsets.UTF_8);log.info("服务端接收到的数据字节长度为:{}, 内容为: {}", bytes.length, content);ByteBuf buf = Unpooled.copiedBuffer(UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8));ctx.writeAndFlush(buf);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {log.info("异常: {}", cause.getMessage());ctx.close();}
}

Client端

NettyTcpClient

@Slf4j
public class NettyTcpClient {public static void main(String[] args) {NioEventLoopGroup group = new NioEventLoopGroup(1);try {Bootstrap bootstrap = new Bootstrap().group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new NettyTcpClientHandler());}});ChannelFuture connectFuture = bootstrap.connect(new InetSocketAddress("127.0.0.1", 9090));Channel channel = connectFuture.sync().channel();channel.closeFuture().sync();} catch (Exception e) {log.error("客户端发生异常: ", e);} finally {group.shutdownGracefully();}}}

NettyTcpClientHandler

@Slf4j
public class NettyTcpClientHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf byteBuf = (ByteBuf) msg;log.info("客户端接收到数据:{}", byteBuf.toString(StandardCharsets.UTF_8));}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {/*粘包:1. 这里连续发送10次byteBuf,发现服务端有可能1次就全部接收了,也有可能3次接受了,也有可能4次接收了,这是不确定的,这也就意味着基于底层NIO的tcp的数据传输 是基于流式传输的,会出现粘包的问题。2. 因此服务端必须 自行处理粘包问题,区分消息边界3. 这里测试的时候,可以多启动几个客户端来观察4. 这里示例的粘包示例与上面simple的区别在于:这里是在短时间内连续发送*//*for (int i = 0; i < 10; i++) {ByteBuf byteBuf = Unpooled.copiedBuffer(("hello, server " + i).getBytes(StandardCharsets.UTF_8));ctx.writeAndFlush(byteBuf);}*//*拆包:1. 这里1次发送了1个10000字节长的数据,而服务端分多次收到,有可能是2次,有可能是1次, 这是不确定的,2. 假设真实数据包就有这么长,那么服务端可能需要分多次才能接收到完整的数据包,3. 同时,我们发现总的数据长度服务端都接收到了,这说明底层NIO的tcp的数据传输 是可靠的4. 1条比较长的消息,服务端分多次才能收到,所以服务端需要解决拆包的问题,将多次接收到的消息转为1条完整的消息5. 这里示例的拆包示例与上面simple的区别在于:这里1次发送的消息数据很长*/StringBuilder sb = new StringBuilder();for (int i = 0; i < 1000; i++) {sb.append("Netty拆包示例|");}ctx.writeAndFlush(Unpooled.copiedBuffer(sb.toString().getBytes(StandardCharsets.UTF_8)));log.info("客户端发送数据长度:{}", sb.toString().length());/* 拆包 与 粘包 的核心问题就是 tcp是流式传输的,tcp可以保证数据可靠传输,但需要对方在接收时需要能区分出消息边界,从而获取1条完整的消息 */}}

protocol

codec

使用自定义协议,编解码器,识别消息边界,处理粘包和拆包问题

CustomMessageDecoder

public class CustomMessageDecoder extends ByteToMessageDecoder {@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {if (in.readableBytes() < 4) {return;}in.markReaderIndex();int len = in.readInt();if (in.readableBytes() < len) {in.resetReaderIndex();return;}byte[] bytes = new byte[len];in.readBytes(bytes);out.add(CustomMessage.builder().len(len).content(bytes).build());}
}

CustomMessageEncoder

public class CustomMessageEncoder extends MessageToByteEncoder<CustomMessage> {@Overrideprotected void encode(ChannelHandlerContext ctx, CustomMessage msg, ByteBuf out) {out.writeInt(msg.getLen());out.writeBytes(msg.getContent());}
}

server端

ProtocolServer

@Slf4j
public class ProtocolServer {public static void main(String[] args) {NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);NioEventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap serverBootstrap = new ServerBootstrap().group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new CustomMessageDecoder());pipeline.addLast(new CustomMessageEncoder());pipeline.addLast(new ProtocolServerHandler());}});ChannelFuture bindFuture = serverBootstrap.bind(new InetSocketAddress("127.0.0.1", 9090));Channel channel = bindFuture.sync().channel();log.info("server start");channel.closeFuture().sync();} catch (Exception e) {log.info("server error", e);} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}log.info("server stop");}}

ProtocolServerHandler

@Slf4j
public class ProtocolServerHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {// 这里直接转, 如果不能转的话, 就说明前面的解码有问题CustomMessage customMessage = (CustomMessage) msg;log.info("服务端收到消息: {}, {}", customMessage.getLen(), new String(customMessage.getContent()));// 将消息回过去(需要加上对应的编码器)ctx.writeAndFlush(customMessage);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {log.info("ProtocolServerHandler异常: {}", cause.getMessage());ctx.close();}
}

client端

ProtocolClient

@Slf4j
public class ProtocolClient {public static void main(String[] args) {NioEventLoopGroup group = new NioEventLoopGroup(1);try {Bootstrap bootstrap = new Bootstrap().group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new CustomMessageEncoder());pipeline.addLast(new CustomMessageDecoder());pipeline.addLast(new ProtocolClientHandler());}});ChannelFuture connectFuture = bootstrap.connect("localhost", 9090);Channel channel = connectFuture.sync().channel();channel.closeFuture().sync();} catch (Exception e) {log.info("client error", e);} finally {group.shutdownGracefully();}}}

ProtocolClientHandler

@Slf4j
public class ProtocolClientHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {// 这里直接转, 如果不能转的话, 就说明前面的解码有问题CustomMessage customMessage = (CustomMessage) msg;log.info("客户端收到消息: {}, {}", customMessage.getLen(), new String(customMessage.getContent()));}@Overridepublic void channelActive(ChannelHandlerContext ctx) {for (int i = 1; i <= 20; i++) {byte[] bytes = ("hello, server " + i).getBytes(StandardCharsets.UTF_8);CustomMessage message = CustomMessage.builder().content(bytes).len(bytes.length).build();ctx.writeAndFlush(message);}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {log.error("【ProtocolClientHandler->exceptionCaught】: {}", cause.getMessage());}
}

http

Server端

HttpServer

@Slf4j
public class HttpServer {public static void main(String[] args) {NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);NioEventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap serverBootstrap = new ServerBootstrap().group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).handler(new LoggingHandler("【服务端主】")).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast("loggingHandler", new LoggingHandler("【服务端从】"));pipeline.addLast("httpServerCodec", new HttpServerCodec());pipeline.addLast("aggregator", new HttpObjectAggregator(10 * 1024 * 1024));pipeline.addLast("httpServerHandler", new HttpServerHandler());}});ChannelFuture channelFuture = serverBootstrap.bind(new InetSocketAddress(8080));channelFuture.sync();log.info("http服务器启动成功, 您可以访问: http://localhost:8080/test");channelFuture.channel().closeFuture().sync();} catch (Exception e) {log.info("服务端发生异常: ", e);} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}}

HttpServerHandler

@Slf4j
public class HttpServerHandler extends SimpleChannelInboundHandler<HttpObject> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) {log.info("【HttpServerHandler->处理】:{}", msg);if (msg instanceof FullHttpRequest) {FullHttpRequest fullHttpRequest = (FullHttpRequest) msg;String uri = fullHttpRequest.uri();log.info("【uri】:{}", uri);HttpMethod method = fullHttpRequest.method();log.info("【method】:{}", method);// 响应回去byte[] bytes = ("服务器收到时间" + LocalDateTime.now()).getBytes(StandardCharsets.UTF_8);DefaultFullHttpResponse fullHttpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,HttpResponseStatus.OK,Unpooled.copiedBuffer(bytes));fullHttpResponse.headers().set(HttpHeaderNames.CONTENT_LENGTH, bytes.length);fullHttpResponse.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain;charset=utf-8");ChannelPromise promise = ctx.newPromise();promise.addListener(new GenericFutureListener<Future<? super Void>>() {@Overridepublic void operationComplete(Future<? super Void> future) throws Exception {log.info("操作完成");log.info("isDone: {}", future.isDone());log.info("isSuccess: {}", future.isSuccess());log.info("isCancelled: {}", future.isCancelled());log.info("hasException: {}", future.cause() != null, future.cause());}});ctx.writeAndFlush(fullHttpResponse, promise);log.info("刚刚写完");}}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {log.error("【HttpServerHandler->userEventTriggered】:{}", evt);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {log.error("【HttpServerHandler->exceptionCaught】", cause);}public void channelRegistered(ChannelHandlerContext ctx) {log.info("【HttpServerHandler->channelRegistered】");}public void channelUnregistered(ChannelHandlerContext ctx) {log.info("【HttpServerHandler->channelUnregistered】");}public void handlerAdded(ChannelHandlerContext ctx) {log.info("【HttpServerHandler->handlerAdded】");}public void handlerRemoved(ChannelHandlerContext ctx) {log.info("【HttpServerHandler->handlerRemoved】");}public void channelActive(ChannelHandlerContext ctx) {log.info("【HttpServerHandler->channelActive】");}public void channelInactive(ChannelHandlerContext ctx) {log.info("【HttpServerHandler->channelInactive】");}}

Client端

HttpClient

@Slf4j
public class HttpClient {public static void main(String[] args) {NioEventLoopGroup group = new NioEventLoopGroup(1);try {Bootstrap bootstrap = new Bootstrap().group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast("loggingHandler", new LoggingHandler(LogLevel.DEBUG));pipeline.addLast("httpClientCodec", new HttpClientCodec());pipeline.addLast("", new HttpObjectAggregator(10 * 1024));pipeline.addLast("httpClientHandler", new HttpClientHandler());}});ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080);channelFuture.sync();Channel channel = channelFuture.channel();sendGetRequest(channel);// 等待通道关闭channelFuture.channel().closeFuture().sync();} catch (Exception e) {log.info("客户端发生异常: ", e);} finally {// 遇到问题, 调用此方法后客户端没有正常关闭, 将netty版本4.1.20.FINAL切换到4.1.76.FINAL即可group.shutdownGracefully();log.info("关闭group-finally");}log.info("客户端执行完毕");}private static void sendGetRequest(Channel channel) throws URISyntaxException {String url = "http://localhost:8080/test"; // 测试URLURI uri = new URI(url);String host = uri.getHost();String path = uri.getRawPath() + (uri.getRawQuery() == null ? "" : "?" + uri.getRawQuery());// 构建HTTP请求FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1,HttpMethod.GET,path,Unpooled.EMPTY_BUFFER);request.headers().set(HttpHeaderNames.HOST, host).set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE).set(HttpHeaderNames.ACCEPT_ENCODING, HttpHeaderValues.GZIP);// 发送请求ChannelFuture channelFuture = channel.writeAndFlush(request);log.info("Request sent: " + request);}}

HttpClientHandler

@Slf4j
public class HttpClientHandler extends SimpleChannelInboundHandler<FullHttpResponse> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, FullHttpResponse response) {// 处理响应log.info("处理响应, 响应头: {}", response.headers().toString());log.info("处理响应, 响应体: {}", response.content().toString(CharsetUtil.UTF_8));// 关闭连接ctx.channel().close();log.info("关闭连接");}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {log.info( "异常: {}", cause.getMessage());ctx.close();}
}

ws

Server端

WsServer

@Slf4j
public class WsServer {public static void main(String[] args) {NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);NioEventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap serverBootstrap = new ServerBootstrap().group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).handler(new LoggingHandler()).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast("httpServerCodec", new HttpServerCodec());pipeline.addLast("aggregator", new HttpObjectAggregator(10 * 1024 * 1024));WebSocketServerProtocolConfig config = WebSocketServerProtocolConfig.newBuilder().websocketPath("/ws").checkStartsWith(true).build();pipeline.addLast("wsProtocolHandler", new WebSocketServerProtocolHandler(config));pipeline.addLast("wsServerHandler", new WsServerHandler());}});ChannelFuture channelFuture = serverBootstrap.bind("127.0.0.1", 9090);channelFuture.sync();log.info("ws服务启动成功");channelFuture.channel().closeFuture().sync();} catch (Exception e) {log.error("服务端发生异常: ", e);} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}log.info("ws服务关闭");}}

WsServerHandler

@Slf4j
public class WsServerHandler extends SimpleChannelInboundHandler<WebSocketFrame> {private static Map<String, Channel> CHANNELS = new ConcurrentHashMap<>();private static AttributeKey<String> ATTRIBUTE_KEY_TOKEN = AttributeKey.valueOf("token");private static AttributeKey<Boolean> ATTRIBUTE_KEY_REPEAT = AttributeKey.valueOf("repeat");@Overrideprotected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame webSocketFrame) throws Exception {log.info("【WsServerHandler->处理】:{}", webSocketFrame);if (webSocketFrame instanceof TextWebSocketFrame) {TextWebSocketFrame textWebSocketFrame = (TextWebSocketFrame) webSocketFrame;log.info("【textWebSocketFrame.text()】:{}", textWebSocketFrame.text());sendAll(ctx.channel(), textWebSocketFrame.text());}}private void sendAll(Channel channel, String text) {CHANNELS.forEach((token, ch) -> {if (channel != ch) {ch.writeAndFlush(new TextWebSocketFrame(text));}});}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {log.info("【WsServerHandler->userEventTriggered】: {}", evt);if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {WebSocketServerProtocolHandler.HandshakeComplete handshakeComplete = (WebSocketServerProtocolHandler.HandshakeComplete) evt;String requestUri = handshakeComplete.requestUri();String subprotocol = handshakeComplete.selectedSubprotocol();log.info("【requestUri】:{}", requestUri);log.info("【subprotocol】:{}", subprotocol);handleAuth(requestUri, ctx);}}private void handleAuth(String requestUri, ChannelHandlerContext ctx) {try {Map<String, String> queryParams = getQueryParams(requestUri);String token = queryParams.get("token");log.info("【token】:{}", token);if (token == null) {ctx.close();log.info("token为空, 关闭channel");} else {ctx.channel().attr(ATTRIBUTE_KEY_TOKEN).set(token);Channel oldChannel = CHANNELS.put(token, ctx.channel());if (oldChannel != null) {oldChannel.attr(ATTRIBUTE_KEY_REPEAT).set(true);oldChannel.close();} else {sendAll(ctx.channel(), "欢迎" + token + "进入聊天室");}}} catch (Exception e) {ctx.close();}}private static Map<String, String> getQueryParams(String requestUri) throws URISyntaxException {URI uri = new URI(requestUri);String query = uri.getQuery();Map<String, String> queryParams = new HashMap<>();if (query != null) {String[] params = query.split("&");for (String param : params) {String[] keyValue = param.split("=");String key = keyValue[0];String value = keyValue.length > 1 ? keyValue[1] : "";queryParams.put(key, value);}}return queryParams;}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {log.error("【WsServerHandler->exceptionCaught】", cause);}public void handlerAdded(ChannelHandlerContext ctx) {log.info("【WsServerHandler->handlerAdded】");}public void handlerRemoved(ChannelHandlerContext ctx) {log.info("【WsServerHandler->handlerRemoved】");}public void channelRegistered(ChannelHandlerContext ctx) {log.info("【WsServerHandler->channelRegistered】");}public void channelUnregistered(ChannelHandlerContext ctx) {log.info("【WsServerHandler->channelUnregistered】");}public void channelActive(ChannelHandlerContext ctx) {log.info("【WsServerHandler->channelActive】");}public void channelInactive(ChannelHandlerContext ctx) {log.info("【WsServerHandler->channelInactive】");Channel channel = ctx.channel();Boolean isRepeat = channel.attr(ATTRIBUTE_KEY_REPEAT).get() != null&& channel.attr(ATTRIBUTE_KEY_REPEAT).get();if (!isRepeat) {CHANNELS.computeIfPresent(ctx.attr(ATTRIBUTE_KEY_TOKEN).get(), (key, ch) -> {CHANNELS.remove(channel.attr(ATTRIBUTE_KEY_TOKEN));sendAll(channel, channel.attr(ATTRIBUTE_KEY_TOKEN).get() + "离开聊天室");return null;});}}}

Client端

WsClient

@Slf4j
public class WsClient {public static void main(String[] args) {NioEventLoopGroup group = new NioEventLoopGroup(1);try {CountDownLatch connectLatch = new CountDownLatch(1);Bootstrap bootstrap = new Bootstrap().group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new HttpClientCodec());pipeline.addLast(new HttpObjectAggregator(10 * 1024));WebSocketClientProtocolConfig config = WebSocketClientProtocolConfig.newBuilder().handleCloseFrames(false).build();WebSocketClientHandshaker webSocketClientHandshaker = WebSocketClientHandshakerFactory.newHandshaker(new URI("ws://localhost:9090/ws/1?token=abc"),WebSocketVersion.V13,null,true,new DefaultHttpHeaders());pipeline.addLast(new WebSocketClientProtocolHandler(webSocketClientHandshaker, config));pipeline.addLast(new WebSocketClientHandler(connectLatch));}});ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9090);Channel channel = channelFuture.channel();channelFuture.addListener((ChannelFutureListener) future -> {if (!future.isSuccess()) {System.err.println("Connection failed: " + future.cause());connectLatch.countDown(); // 确保不会死等}});// 等待连接完成(带超时)if (!connectLatch.await(10, TimeUnit.SECONDS)) {throw new RuntimeException("Connection timed out");}Scanner sc = new Scanner(System.in);while (true) {System.out.print("请输入:");String line = sc.nextLine();if (StringUtil.isNullOrEmpty(line)) {continue;}if ("exit".equals(line)) {channel.close();break;} else {// 发送消息WebSocketFrame frame = new TextWebSocketFrame(line);channelFuture.channel().writeAndFlush(frame);}}channelFuture.channel().closeFuture().sync();} catch (Exception e) {log.info("客户端发生异常: ", e);} finally {group.shutdownGracefully();}}}

WebSocketClientHandler

@Slf4j
public class WebSocketClientHandler extends SimpleChannelInboundHandler<WebSocketFrame> {private CountDownLatch connectLatch;public WebSocketClientHandler(CountDownLatch connectLatch) {this.connectLatch = connectLatch;}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {// 处理接收到的WebSocket帧if (frame instanceof TextWebSocketFrame) {String text = ((TextWebSocketFrame) frame).text();System.out.println("Received: " + text);} else if (frame instanceof PingWebSocketFrame) {// 响应Ping帧ctx.writeAndFlush(new PongWebSocketFrame(frame.content().retain()));System.out.println("Responded to ping");} else if (frame instanceof CloseWebSocketFrame) {System.out.println("Received close frame");ctx.close();} else if (frame instanceof BinaryWebSocketFrame) {System.out.println("Received binary data: " + frame.content().readableBytes() + " bytes");}}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {// 处理握手完成事件if (evt == WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_COMPLETE) {System.out.println("WebSocket handshake complete event");// 握手完成后可以发送初始消息connectLatch.countDown();}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {System.err.println("WebSocket error: ");cause.printStackTrace();ctx.close();}}

protobuf

Server端

NettyServer

@Slf4j
public class NettyServer {public static void main(String[] args) {NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);NioEventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap serverBootstrap = new ServerBootstrap().group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).handler(new LoggingHandler()).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new ProtobufEncoder());pipeline.addLast(new ProtobufDecoder(StudentPOJO.Student.getDefaultInstance()));pipeline.addLast(new NettyServerHandler());}});ChannelFuture bindFuture = serverBootstrap.bind(new InetSocketAddress(8888));Channel channel = bindFuture.sync().channel();log.info("server start");channel.closeFuture().sync();log.info("server stop");} catch (Exception e) {log.info("server error", e);} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}

NettyServerHandler

@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.info("服务端接收到客户端数据:{}", msg);if (msg instanceof StudentPOJO.Student) {StudentPOJO.Student student = (StudentPOJO.Student) msg;log.info( "客户端发送的数据:{}, {}, {}", student, student.getId(), student.getName());}}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {log.info("【NettyServerHandler->userEventTriggered】: {}", evt);}public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {log.error("exceptionCaught异常:", cause);ctx.close();}public void handlerAdded(ChannelHandlerContext ctx) throws Exception {log.info("handlerAdded:{}", ctx.channel().remoteAddress());}public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {log.info("handlerRemoved:{}", ctx.channel().remoteAddress());}public void channelRegistered(ChannelHandlerContext ctx) throws Exception {log.info("channelRegistered:{}", ctx.channel().remoteAddress());}public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {log.info("channelUnregistered:{}", ctx.channel().remoteAddress());}public void channelActive(ChannelHandlerContext ctx) throws Exception {log.info("channelActive:{}", ctx.channel().remoteAddress());}public void channelInactive(ChannelHandlerContext ctx) throws Exception {log.info("channelInactive:{}", ctx.channel().remoteAddress());}}

Student.proto

syntax = "proto3"; //版本
option java_outer_classname = "StudentPOJO";//生成的外部类名,同时也是文件名
//protobuf 使用message 管理数据
message Student { //会在 StudentPOJO 外部类生成一个内部类 Student, 他是真正发送的POJO对象int32 id = 1; // Student 类中有 一个属性 名字为 id 类型为int32(protobuf类型) 1表示属性序号,不是值string name = 2;
}
// 执行命令 protoc.exe --java_out=生成路径 Student.proto路径

Client端

NettyClient

@Slf4j
public class NettyClient {public static void main(String[] args) {NioEventLoopGroup group = new NioEventLoopGroup(1);try {Bootstrap bootstrap = new Bootstrap().group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new ProtobufEncoder());pipeline.addLast(new ProtobufDecoder(StudentPOJO.Student.getDefaultInstance()));pipeline.addLast(new NettyClientHandler());}});ChannelFuture connectFuture = bootstrap.connect("127.0.0.1", 8888);Channel channel = connectFuture.sync().channel();log.info("客户端连接成功");channel.closeFuture().sync();log.info("客户端关闭");} catch (Exception e) {log.error("客户端发生异常: ", e);}}}

NettyClientHandler

@Slf4j
public class NettyClientHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.info( "【NettyClientHandler->channelRead】: {}", msg);}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {log.info( "【NettyClientHandler->userEventTriggered】: {}", evt);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {log.info( "异常: {}", cause.getMessage());ctx.close();}@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {log.info( "【NettyClientHandler->handlerAdded】: {}", ctx);}@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) throws Exception {log.info( "【NettyClientHandler->handlerRemoved】: {}", ctx);}@Overridepublic void channelRegistered(ChannelHandlerContext ctx) throws Exception {log.info( "【NettyClientHandler->channelRegistered】: {}", ctx);}@Overridepublic void channelUnregistered(ChannelHandlerContext ctx) throws Exception {log.info( "【NettyClientHandler->channelUnregistered】: {}", ctx);}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {log.info( "【NettyClientHandler->channelActive】: {}", ctx);StudentPOJO.Student student = StudentPOJO.Student.newBuilder().setId(1).setName("张三san").build();ctx.writeAndFlush(student);}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {log.info( "【NettyClientHandler->channelInactive】: {}", ctx);}
}

http://www.hkcw.cn/article/HjZJacVMUY.shtml

相关文章

Linux系统精准定位创建句柄的进程

在Linux系统中&#xff0c;可以通过以下方法精准定位创建句柄的进程&#xff1a; &#x1f50d; 一、核心排查命令 ​​lsof 命令​​ ​​查看所有进程的句柄占用​​&#xff1a; lsof | awk {print $1, $2} | sort | uniq -c | sort -nr | head -n 20 ​​输出说明​​&…

ASP.NET Core OData 实践——Lesson8增删改查单值类型Property(C#)

大纲 支持的接口主要模型设计控制器设计数据源查询(GET)查询基类类型Entity的基础类型属性的值查询派生类型Entity的基础类型属性值查询基类类型Entity的派生类型属性值查询派生类型Entity的派生类型属性值 完整更新(PUT)完整更新基类类型Entity的基础类型属性值完整更新派生类…

(LeetCode 每日一题)135. 分发糖果 ( 贪心 )

题目&#xff1a;135. 分发糖果 思路&#xff1a;贪心两遍循环&#xff0c;时间复杂度0(n)。 在满足所有人都有一个糖果的情况下&#xff0c;进行两遍循环 第一遍循环&#xff1a;从左到右&#xff0c;满足当ratings[i]>ratings[i-1]时&#xff0c;v[i]v[i-1]1 第二遍循环&a…

DAX权威指南6:DAX 高级概念(扩展表)、DAX 计算常见优化

文章目录 十四、 DAX 高级概念14.1 扩展表14.1.1 扩展表的定义14.1.2 表扩展与双向过滤14.1.3 筛选上下文传播14.1.4 RELATED 和 LOOKUPVALUE14.1.5 扩展表结构在表定义时就已经确定 14.2 表筛选和列筛选14.2.1 表筛选和列筛选14.2.1.1 DAX筛选机制 14.2.2 ALL函数的真实含义14…

selenium-自动更新谷歌浏览器驱动

1、简介 selenium最初是一个自动化测试工具&#xff0c;而爬虫中使用它主要是为了解决requests无法直接执行JavaScript代码的问题&#xff0c;因为有些网页数据是通过JavaScript动态加载的。selenium本质是通过驱动浏览器&#xff0c;完全模拟浏览器的操作&#xff0c;比如输入…

高效视频倍速播放插件推荐

软件介绍 本文介绍一款名为Global Speed的视频速度控制插件&#xff0c;该插件在插件市场评分极高&#xff0c;被公认为目前最好用的视频倍速插件之一。 插件安装与基本功能 安装Global Speed插件后&#xff0c;用户只需点击插件图标即可选择播放倍数&#xff0c;最高支持16…

60、Polly瞬态故障处理

Polly 是一个.NET 弹性和瞬态故障处理库&#xff0c;在分布式系统和微服务架构中&#xff0c;可有效处理网络不稳定、服务依赖故障或资源限制等引发的瞬态故障&#xff0c;保障系统的稳定性和可靠性&#xff0c;以下是其常见策略及应用说明&#xff1a; 核心策略 1.重试策略 …

vulnyx loweb writeup

信息收集 arp-scan nmap 这里也可以接其它选项进行深入的扫描&#xff0c;因为我是打完后再写的wp&#xff0c;第一次做的时候我是扫了的&#xff0c;但是我没有得到什么有用的信息&#xff0c;所以写wp的时候我就没放出来了。这里直接去web 获取userFlag 一个默认的apache2 …

学员匿名举报教练骚扰后怀疑店长泄密:这两个教练在找她!

学员匿名举报教练骚扰后怀疑店长泄密。小孙反映,她感觉被健身房的两位教练骚扰了,她给记者看了聊天记录,对方曾说“从你身上让我能感受到你无比强大的力量”,“果然是顶级白月光”。最近让她倍感困扰的是,店长把她匿名举报的事告诉了涉事教练,导致这两个教练在找她,她要…

距2025高考还有4天 家长准备了什么礼物?

距2025高考还有4天 家长准备了什么礼物?!今天已经是6月2号,距离高考只剩下4天。每年高考结束后,许多高三学生都迫不及待地想要收到家长为他们准备的礼物,比如手机或电脑。高中三年对学生来说确实很辛苦,无论是在身体上还是精神上都付出了很多。高考结束后,学生们确实需要…

深圳天气 六一“不下雨通知”逗乐众人

深圳天气 六一“不下雨通知”逗乐众人!上海入汛首日遭遇了持续整天的大雨。截至18时,累计降水量显示普降大雨,市区徐家汇站在下午三点前几乎未曾停歇。雨雾和低云导致垂直能见度极低,全市最高气温仅在18-20℃之间,许多市民穿起了长袖甚至羊毛衫,这种天气让全国网友感到惊…

男子迷路拒绝救援后又求助 自信误判险酿祸

5月31日端午节,在北京房山一处野山中,一名男子登山迷路。他给警方打电话询问下山道路,警方随后联系了房山蓝天救援队。晚上8点多,当救援队员询问男子详细信息时,男子表示不想麻烦救援队,称自己能找到路下山。尽管如此,为了安全起见,房山蓝天救援队还是启动了救援程序。…

印度一游客摸老虎自拍遭袭击 触摸禁忌区引攻击

泰国普吉岛知名观光景点“老虎王国”近日发生了一起惊险事件。一名印度游客在与老虎合影时,因触摸老虎遭到攻击,现场画面在社交媒体上引发了广泛关注。该景点以“一生仅有一次的与虎互动体验”为卖点,吸引了众多游客。事发时,这名游客手持链条与老虎并排站立,驯兽师正用棍…

李亚鹏宣布将幼儿园无偿移交 为社会做贡献

6月1日,知名演员李亚鹏现身北京培德书院幼儿园六一活动。在活动现场,他宣布将把培德书院幼儿园无偿移交给一位资深教育家管理。他表示,人来到这个世界上总要为社会做点什么,这与个人财富无关,而是个人的价值观。培德书院幼儿园由李亚鹏于2011年前后创办,定位高端民办教育…

男子为省30元钱不幸离世 高原缺氧悲剧引发关注

46岁的河南卡车司机常志荣近日在青藏线因高原缺氧离世。他的骨灰及车辆由多名爱心司机跨越2400多公里,从五道梁地区送回老家安阳林州。其中一位司机表示:“不让家属承担一切费用,中国人就该互相帮助”。青海五道梁地区海拔4665米,含氧量不足海平面50%。5月27日,常志荣在此…

Spring框架学习day6--事务管理

Spring事务管理 Spring事务管理是在AOP的基础上&#xff0c;当我们的方法完全执行成功后&#xff0c;再提交事务&#xff0c;如果方法中有异常&#xff0c;就不提交事务 Spring中的事务管理有两种方式&#xff1a; ​ 1.编程式事务 ​ 需要我们在业务代码中手动提交 ​ 2.声明式…

【C盘瘦身】Docker安装目录占用C盘过大,一键移动给C盘瘦身

文章目录 前言一、Docker移动3步骤1. 进入docker的设置页面2. 点击“Browse”&#xff0c;选择D盘新建的目标目录3. 点击“Apply & restart”&#xff0c;选择Yes4. 移动完毕 二、结果检查 前言 最近安装了Dify&#xff0c;由于是Docker安装&#xff0c;不想安装完后&…

百度golang研发一面面经

输入一个网址&#xff0c;到显示界面&#xff0c;中间的过程是怎样的 IP 报文段的结构是什么 Innodb 的底层结构 知道几种设计模式 工厂模式 简单工厂模式&#xff1a;根据传入类型参数判断创建哪种类型对象工厂方法模式&#xff1a;由子类决定实例化哪个类抽象工厂模式&#…

CTF:网络安全的实战演练场

文章目录 每日一句正能量前言一、CTF简介&#xff08;一&#xff09;什么是CTF&#xff1f;&#xff08;二&#xff09;CTF的历史 二、CTF比赛形式&#xff08;一&#xff09;线上赛&#xff08;Online CTF&#xff09;&#xff08;二&#xff09;线下赛&#xff08;Offline CT…

Javaweb学习——day1(JavaWeb 介绍与开发环境搭建)

文章目录 1. 什么是 JavaWeb&#xff1f;2. HTTP 协议与请求响应流程2.1 HTTP 概述&#xff1a;2.2 请求响应流程&#xff1a;2.3 常见 HTTP 方法&#xff1a; 3. 开发环境搭建3.1 安装 Tomcat3.1.1 我的是win11 64位版&#xff0c;选择下载如图&#xff1a;3.1.2 解压 Tomcat3…