Netty学习example示例(含官方示例代码)

article/2025/6/17 6:37:30

Netty 官方示例代码 已clone至gitee

文章目录

  • simple
    • Server端
      • NettyServer
      • NettyServerHandler
    • Client端
      • NettyClient
      • NettyClientHandler
  • tcp(粘包和拆包)
    • Server端
      • NettyTcpServer
      • NettyTcpServerHandler
    • Client端
      • NettyTcpClient
      • NettyTcpClientHandler
  • protocol
    • codec
      • CustomMessageDecoder
      • CustomMessageEncoder
    • server端
      • ProtocolServer
      • ProtocolServerHandler
    • client端
      • ProtocolClient
      • ProtocolClientHandler
  • http
    • Server端
      • HttpServer
      • HttpServerHandler
    • Client端
      • HttpClient
      • HttpClientHandler
  • ws
    • Server端
      • WsServer
      • WsServerHandler
    • Client端
      • WsClient
      • WebSocketClientHandler
  • protobuf
    • Server端
      • NettyServer
      • NettyServerHandler
      • Student.proto
    • Client端
      • NettyClient
      • NettyClientHandler

在这里插入图片描述

simple

Server端

NettyServer

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;import java.net.InetSocketAddress;@Slf4j
public class NettyServer {public static void main(String[] args) {NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);NioEventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap serverBootstrap = new ServerBootstrap().group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).handler(new LoggingHandler()).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new StringDecoder());pipeline.addLast(new StringEncoder());pipeline.addLast(new NettyServerHandler());}});ChannelFuture bindFuture = serverBootstrap.bind(new InetSocketAddress(8888));Channel channel = bindFuture.sync().channel();log.info("server start");channel.closeFuture().sync();log.info("server stop");} catch (Exception e) {log.info("server error", e);} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}

NettyServerHandler

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {public void channelRead(io.netty.channel.ChannelHandlerContext ctx, Object msg) throws Exception {log.info("服务端接收到客户端数据:{}", msg);ctx.writeAndFlush("服务端收到客户端的数据: " + msg);}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {log.info("【NettyServerHandler->userEventTriggered】: {}", evt);}public void exceptionCaught(io.netty.channel.ChannelHandlerContext ctx, Throwable cause) throws Exception {log.error("exceptionCaught异常:", cause);ctx.close();}public void handlerAdded(io.netty.channel.ChannelHandlerContext ctx) throws Exception {log.info("handlerAdded:{}", ctx.channel().remoteAddress());}public void handlerRemoved(io.netty.channel.ChannelHandlerContext ctx) throws Exception {log.info("handlerRemoved:{}", ctx.channel().remoteAddress());}public void channelRegistered(io.netty.channel.ChannelHandlerContext ctx) throws Exception {log.info("channelRegistered:{}", ctx.channel().remoteAddress());}public void channelUnregistered(io.netty.channel.ChannelHandlerContext ctx) throws Exception {log.info("channelUnregistered:{}", ctx.channel().remoteAddress());}public void channelActive(io.netty.channel.ChannelHandlerContext ctx) throws Exception {log.info("客户端连接:{}", ctx.channel().remoteAddress());}public void channelInactive(io.netty.channel.ChannelHandlerContext ctx) throws Exception {log.info("客户端断开连接:{}", ctx.channel().remoteAddress());}}

Client端

NettyClient

@Slf4j
public class NettyClient {public static void main(String[] args) {NioEventLoopGroup group = new NioEventLoopGroup(1);try {Bootstrap bootstrap = new Bootstrap().group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new StringDecoder());pipeline.addLast(new StringEncoder());pipeline.addLast(new NettyClientHandler());}});ChannelFuture connectFuture = bootstrap.connect("127.0.0.1", 8888);Channel channel = connectFuture.sync().channel();System.out.println("客户端连接成功");Scanner sc = new Scanner(System.in);while (true) {System.out.println("请输入内容: ");String line = sc.nextLine();if (line == null || line.isEmpty()) {continue;} else if ("exit".equals(line)) {channel.close();break;}channel.writeAndFlush(line);}channel.closeFuture().sync();System.out.println("客户端关闭");} catch (Exception e) {log.error("客户端发生异常: ", e);}}}

NettyClientHandler

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;@Slf4j
public class NettyClientHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.info( "【NettyClientHandler->channelRead】: {}", msg);}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {log.info( "【NettyClientHandler->userEventTriggered】: {}", evt);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {log.info( "异常: {}", cause.getMessage());ctx.close();}@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {log.info( "【NettyClientHandler->handlerAdded】: {}", ctx);}@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) throws Exception {log.info( "【NettyClientHandler->handlerRemoved】: {}", ctx);}@Overridepublic void channelRegistered(ChannelHandlerContext ctx) throws Exception {log.info( "【NettyClientHandler->channelRegistered】: {}", ctx);}@Overridepublic void channelUnregistered(ChannelHandlerContext ctx) throws Exception {log.info( "【NettyClientHandler->channelUnregistered】: {}", ctx);}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {log.info( "【NettyClientHandler->channelActive】: {}", ctx);}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {log.info( "【NettyClientHandler->channelInactive】: {}", ctx);}
}

tcp(粘包和拆包)

Server端

NettyTcpServer

@Slf4j
public class NettyTcpServer {public static void main(String[] args) {NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);NioEventLoopGroup workerGroup = new NioEventLoopGroup(1);try {ServerBootstrap serverBootstrap = new ServerBootstrap().group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new NettyTcpServerHandler());}});ChannelFuture bindFuture = serverBootstrap.bind("127.0.0.1", 9090);Channel channel = bindFuture.sync().channel();log.info("server start");channel.closeFuture().sync();} catch (Exception e) {log.info("server error", e);} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}}

NettyTcpServerHandler

@Slf4j
public class NettyTcpServerHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf byteBuf = (ByteBuf) msg;byte[] bytes = new byte[byteBuf.readableBytes()];byteBuf.readBytes(bytes);String content = new String(bytes, StandardCharsets.UTF_8);log.info("服务端接收到的数据字节长度为:{}, 内容为: {}", bytes.length, content);ByteBuf buf = Unpooled.copiedBuffer(UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8));ctx.writeAndFlush(buf);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {log.info("异常: {}", cause.getMessage());ctx.close();}
}

Client端

NettyTcpClient

@Slf4j
public class NettyTcpClient {public static void main(String[] args) {NioEventLoopGroup group = new NioEventLoopGroup(1);try {Bootstrap bootstrap = new Bootstrap().group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new NettyTcpClientHandler());}});ChannelFuture connectFuture = bootstrap.connect(new InetSocketAddress("127.0.0.1", 9090));Channel channel = connectFuture.sync().channel();channel.closeFuture().sync();} catch (Exception e) {log.error("客户端发生异常: ", e);} finally {group.shutdownGracefully();}}}

NettyTcpClientHandler

@Slf4j
public class NettyTcpClientHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf byteBuf = (ByteBuf) msg;log.info("客户端接收到数据:{}", byteBuf.toString(StandardCharsets.UTF_8));}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {/*粘包:1. 这里连续发送10次byteBuf,发现服务端有可能1次就全部接收了,也有可能3次接受了,也有可能4次接收了,这是不确定的,这也就意味着基于底层NIO的tcp的数据传输 是基于流式传输的,会出现粘包的问题。2. 因此服务端必须 自行处理粘包问题,区分消息边界3. 这里测试的时候,可以多启动几个客户端来观察4. 这里示例的粘包示例与上面simple的区别在于:这里是在短时间内连续发送*//*for (int i = 0; i < 10; i++) {ByteBuf byteBuf = Unpooled.copiedBuffer(("hello, server " + i).getBytes(StandardCharsets.UTF_8));ctx.writeAndFlush(byteBuf);}*//*拆包:1. 这里1次发送了1个10000字节长的数据,而服务端分多次收到,有可能是2次,有可能是1次, 这是不确定的,2. 假设真实数据包就有这么长,那么服务端可能需要分多次才能接收到完整的数据包,3. 同时,我们发现总的数据长度服务端都接收到了,这说明底层NIO的tcp的数据传输 是可靠的4. 1条比较长的消息,服务端分多次才能收到,所以服务端需要解决拆包的问题,将多次接收到的消息转为1条完整的消息5. 这里示例的拆包示例与上面simple的区别在于:这里1次发送的消息数据很长*/StringBuilder sb = new StringBuilder();for (int i = 0; i < 1000; i++) {sb.append("Netty拆包示例|");}ctx.writeAndFlush(Unpooled.copiedBuffer(sb.toString().getBytes(StandardCharsets.UTF_8)));log.info("客户端发送数据长度:{}", sb.toString().length());/* 拆包 与 粘包 的核心问题就是 tcp是流式传输的,tcp可以保证数据可靠传输,但需要对方在接收时需要能区分出消息边界,从而获取1条完整的消息 */}}

protocol

codec

使用自定义协议,编解码器,识别消息边界,处理粘包和拆包问题

CustomMessageDecoder

public class CustomMessageDecoder extends ByteToMessageDecoder {@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {if (in.readableBytes() < 4) {return;}in.markReaderIndex();int len = in.readInt();if (in.readableBytes() < len) {in.resetReaderIndex();return;}byte[] bytes = new byte[len];in.readBytes(bytes);out.add(CustomMessage.builder().len(len).content(bytes).build());}
}

CustomMessageEncoder

public class CustomMessageEncoder extends MessageToByteEncoder<CustomMessage> {@Overrideprotected void encode(ChannelHandlerContext ctx, CustomMessage msg, ByteBuf out) {out.writeInt(msg.getLen());out.writeBytes(msg.getContent());}
}

server端

ProtocolServer

@Slf4j
public class ProtocolServer {public static void main(String[] args) {NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);NioEventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap serverBootstrap = new ServerBootstrap().group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new CustomMessageDecoder());pipeline.addLast(new CustomMessageEncoder());pipeline.addLast(new ProtocolServerHandler());}});ChannelFuture bindFuture = serverBootstrap.bind(new InetSocketAddress("127.0.0.1", 9090));Channel channel = bindFuture.sync().channel();log.info("server start");channel.closeFuture().sync();} catch (Exception e) {log.info("server error", e);} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}log.info("server stop");}}

ProtocolServerHandler

@Slf4j
public class ProtocolServerHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {// 这里直接转, 如果不能转的话, 就说明前面的解码有问题CustomMessage customMessage = (CustomMessage) msg;log.info("服务端收到消息: {}, {}", customMessage.getLen(), new String(customMessage.getContent()));// 将消息回过去(需要加上对应的编码器)ctx.writeAndFlush(customMessage);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {log.info("ProtocolServerHandler异常: {}", cause.getMessage());ctx.close();}
}

client端

ProtocolClient

@Slf4j
public class ProtocolClient {public static void main(String[] args) {NioEventLoopGroup group = new NioEventLoopGroup(1);try {Bootstrap bootstrap = new Bootstrap().group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new CustomMessageEncoder());pipeline.addLast(new CustomMessageDecoder());pipeline.addLast(new ProtocolClientHandler());}});ChannelFuture connectFuture = bootstrap.connect("localhost", 9090);Channel channel = connectFuture.sync().channel();channel.closeFuture().sync();} catch (Exception e) {log.info("client error", e);} finally {group.shutdownGracefully();}}}

ProtocolClientHandler

@Slf4j
public class ProtocolClientHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {// 这里直接转, 如果不能转的话, 就说明前面的解码有问题CustomMessage customMessage = (CustomMessage) msg;log.info("客户端收到消息: {}, {}", customMessage.getLen(), new String(customMessage.getContent()));}@Overridepublic void channelActive(ChannelHandlerContext ctx) {for (int i = 1; i <= 20; i++) {byte[] bytes = ("hello, server " + i).getBytes(StandardCharsets.UTF_8);CustomMessage message = CustomMessage.builder().content(bytes).len(bytes.length).build();ctx.writeAndFlush(message);}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {log.error("【ProtocolClientHandler->exceptionCaught】: {}", cause.getMessage());}
}

http

Server端

HttpServer

@Slf4j
public class HttpServer {public static void main(String[] args) {NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);NioEventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap serverBootstrap = new ServerBootstrap().group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).handler(new LoggingHandler("【服务端主】")).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast("loggingHandler", new LoggingHandler("【服务端从】"));pipeline.addLast("httpServerCodec", new HttpServerCodec());pipeline.addLast("aggregator", new HttpObjectAggregator(10 * 1024 * 1024));pipeline.addLast("httpServerHandler", new HttpServerHandler());}});ChannelFuture channelFuture = serverBootstrap.bind(new InetSocketAddress(8080));channelFuture.sync();log.info("http服务器启动成功, 您可以访问: http://localhost:8080/test");channelFuture.channel().closeFuture().sync();} catch (Exception e) {log.info("服务端发生异常: ", e);} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}}

HttpServerHandler

@Slf4j
public class HttpServerHandler extends SimpleChannelInboundHandler<HttpObject> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) {log.info("【HttpServerHandler->处理】:{}", msg);if (msg instanceof FullHttpRequest) {FullHttpRequest fullHttpRequest = (FullHttpRequest) msg;String uri = fullHttpRequest.uri();log.info("【uri】:{}", uri);HttpMethod method = fullHttpRequest.method();log.info("【method】:{}", method);// 响应回去byte[] bytes = ("服务器收到时间" + LocalDateTime.now()).getBytes(StandardCharsets.UTF_8);DefaultFullHttpResponse fullHttpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,HttpResponseStatus.OK,Unpooled.copiedBuffer(bytes));fullHttpResponse.headers().set(HttpHeaderNames.CONTENT_LENGTH, bytes.length);fullHttpResponse.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain;charset=utf-8");ChannelPromise promise = ctx.newPromise();promise.addListener(new GenericFutureListener<Future<? super Void>>() {@Overridepublic void operationComplete(Future<? super Void> future) throws Exception {log.info("操作完成");log.info("isDone: {}", future.isDone());log.info("isSuccess: {}", future.isSuccess());log.info("isCancelled: {}", future.isCancelled());log.info("hasException: {}", future.cause() != null, future.cause());}});ctx.writeAndFlush(fullHttpResponse, promise);log.info("刚刚写完");}}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {log.error("【HttpServerHandler->userEventTriggered】:{}", evt);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {log.error("【HttpServerHandler->exceptionCaught】", cause);}public void channelRegistered(ChannelHandlerContext ctx) {log.info("【HttpServerHandler->channelRegistered】");}public void channelUnregistered(ChannelHandlerContext ctx) {log.info("【HttpServerHandler->channelUnregistered】");}public void handlerAdded(ChannelHandlerContext ctx) {log.info("【HttpServerHandler->handlerAdded】");}public void handlerRemoved(ChannelHandlerContext ctx) {log.info("【HttpServerHandler->handlerRemoved】");}public void channelActive(ChannelHandlerContext ctx) {log.info("【HttpServerHandler->channelActive】");}public void channelInactive(ChannelHandlerContext ctx) {log.info("【HttpServerHandler->channelInactive】");}}

Client端

HttpClient

@Slf4j
public class HttpClient {public static void main(String[] args) {NioEventLoopGroup group = new NioEventLoopGroup(1);try {Bootstrap bootstrap = new Bootstrap().group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast("loggingHandler", new LoggingHandler(LogLevel.DEBUG));pipeline.addLast("httpClientCodec", new HttpClientCodec());pipeline.addLast("", new HttpObjectAggregator(10 * 1024));pipeline.addLast("httpClientHandler", new HttpClientHandler());}});ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080);channelFuture.sync();Channel channel = channelFuture.channel();sendGetRequest(channel);// 等待通道关闭channelFuture.channel().closeFuture().sync();} catch (Exception e) {log.info("客户端发生异常: ", e);} finally {// 遇到问题, 调用此方法后客户端没有正常关闭, 将netty版本4.1.20.FINAL切换到4.1.76.FINAL即可group.shutdownGracefully();log.info("关闭group-finally");}log.info("客户端执行完毕");}private static void sendGetRequest(Channel channel) throws URISyntaxException {String url = "http://localhost:8080/test"; // 测试URLURI uri = new URI(url);String host = uri.getHost();String path = uri.getRawPath() + (uri.getRawQuery() == null ? "" : "?" + uri.getRawQuery());// 构建HTTP请求FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1,HttpMethod.GET,path,Unpooled.EMPTY_BUFFER);request.headers().set(HttpHeaderNames.HOST, host).set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE).set(HttpHeaderNames.ACCEPT_ENCODING, HttpHeaderValues.GZIP);// 发送请求ChannelFuture channelFuture = channel.writeAndFlush(request);log.info("Request sent: " + request);}}

HttpClientHandler

@Slf4j
public class HttpClientHandler extends SimpleChannelInboundHandler<FullHttpResponse> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, FullHttpResponse response) {// 处理响应log.info("处理响应, 响应头: {}", response.headers().toString());log.info("处理响应, 响应体: {}", response.content().toString(CharsetUtil.UTF_8));// 关闭连接ctx.channel().close();log.info("关闭连接");}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {log.info( "异常: {}", cause.getMessage());ctx.close();}
}

ws

Server端

WsServer

@Slf4j
public class WsServer {public static void main(String[] args) {NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);NioEventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap serverBootstrap = new ServerBootstrap().group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).handler(new LoggingHandler()).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast("httpServerCodec", new HttpServerCodec());pipeline.addLast("aggregator", new HttpObjectAggregator(10 * 1024 * 1024));WebSocketServerProtocolConfig config = WebSocketServerProtocolConfig.newBuilder().websocketPath("/ws").checkStartsWith(true).build();pipeline.addLast("wsProtocolHandler", new WebSocketServerProtocolHandler(config));pipeline.addLast("wsServerHandler", new WsServerHandler());}});ChannelFuture channelFuture = serverBootstrap.bind("127.0.0.1", 9090);channelFuture.sync();log.info("ws服务启动成功");channelFuture.channel().closeFuture().sync();} catch (Exception e) {log.error("服务端发生异常: ", e);} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}log.info("ws服务关闭");}}

WsServerHandler

@Slf4j
public class WsServerHandler extends SimpleChannelInboundHandler<WebSocketFrame> {private static Map<String, Channel> CHANNELS = new ConcurrentHashMap<>();private static AttributeKey<String> ATTRIBUTE_KEY_TOKEN = AttributeKey.valueOf("token");private static AttributeKey<Boolean> ATTRIBUTE_KEY_REPEAT = AttributeKey.valueOf("repeat");@Overrideprotected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame webSocketFrame) throws Exception {log.info("【WsServerHandler->处理】:{}", webSocketFrame);if (webSocketFrame instanceof TextWebSocketFrame) {TextWebSocketFrame textWebSocketFrame = (TextWebSocketFrame) webSocketFrame;log.info("【textWebSocketFrame.text()】:{}", textWebSocketFrame.text());sendAll(ctx.channel(), textWebSocketFrame.text());}}private void sendAll(Channel channel, String text) {CHANNELS.forEach((token, ch) -> {if (channel != ch) {ch.writeAndFlush(new TextWebSocketFrame(text));}});}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {log.info("【WsServerHandler->userEventTriggered】: {}", evt);if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {WebSocketServerProtocolHandler.HandshakeComplete handshakeComplete = (WebSocketServerProtocolHandler.HandshakeComplete) evt;String requestUri = handshakeComplete.requestUri();String subprotocol = handshakeComplete.selectedSubprotocol();log.info("【requestUri】:{}", requestUri);log.info("【subprotocol】:{}", subprotocol);handleAuth(requestUri, ctx);}}private void handleAuth(String requestUri, ChannelHandlerContext ctx) {try {Map<String, String> queryParams = getQueryParams(requestUri);String token = queryParams.get("token");log.info("【token】:{}", token);if (token == null) {ctx.close();log.info("token为空, 关闭channel");} else {ctx.channel().attr(ATTRIBUTE_KEY_TOKEN).set(token);Channel oldChannel = CHANNELS.put(token, ctx.channel());if (oldChannel != null) {oldChannel.attr(ATTRIBUTE_KEY_REPEAT).set(true);oldChannel.close();} else {sendAll(ctx.channel(), "欢迎" + token + "进入聊天室");}}} catch (Exception e) {ctx.close();}}private static Map<String, String> getQueryParams(String requestUri) throws URISyntaxException {URI uri = new URI(requestUri);String query = uri.getQuery();Map<String, String> queryParams = new HashMap<>();if (query != null) {String[] params = query.split("&");for (String param : params) {String[] keyValue = param.split("=");String key = keyValue[0];String value = keyValue.length > 1 ? keyValue[1] : "";queryParams.put(key, value);}}return queryParams;}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {log.error("【WsServerHandler->exceptionCaught】", cause);}public void handlerAdded(ChannelHandlerContext ctx) {log.info("【WsServerHandler->handlerAdded】");}public void handlerRemoved(ChannelHandlerContext ctx) {log.info("【WsServerHandler->handlerRemoved】");}public void channelRegistered(ChannelHandlerContext ctx) {log.info("【WsServerHandler->channelRegistered】");}public void channelUnregistered(ChannelHandlerContext ctx) {log.info("【WsServerHandler->channelUnregistered】");}public void channelActive(ChannelHandlerContext ctx) {log.info("【WsServerHandler->channelActive】");}public void channelInactive(ChannelHandlerContext ctx) {log.info("【WsServerHandler->channelInactive】");Channel channel = ctx.channel();Boolean isRepeat = channel.attr(ATTRIBUTE_KEY_REPEAT).get() != null&& channel.attr(ATTRIBUTE_KEY_REPEAT).get();if (!isRepeat) {CHANNELS.computeIfPresent(ctx.attr(ATTRIBUTE_KEY_TOKEN).get(), (key, ch) -> {CHANNELS.remove(channel.attr(ATTRIBUTE_KEY_TOKEN));sendAll(channel, channel.attr(ATTRIBUTE_KEY_TOKEN).get() + "离开聊天室");return null;});}}}

Client端

WsClient

@Slf4j
public class WsClient {public static void main(String[] args) {NioEventLoopGroup group = new NioEventLoopGroup(1);try {CountDownLatch connectLatch = new CountDownLatch(1);Bootstrap bootstrap = new Bootstrap().group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new HttpClientCodec());pipeline.addLast(new HttpObjectAggregator(10 * 1024));WebSocketClientProtocolConfig config = WebSocketClientProtocolConfig.newBuilder().handleCloseFrames(false).build();WebSocketClientHandshaker webSocketClientHandshaker = WebSocketClientHandshakerFactory.newHandshaker(new URI("ws://localhost:9090/ws/1?token=abc"),WebSocketVersion.V13,null,true,new DefaultHttpHeaders());pipeline.addLast(new WebSocketClientProtocolHandler(webSocketClientHandshaker, config));pipeline.addLast(new WebSocketClientHandler(connectLatch));}});ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9090);Channel channel = channelFuture.channel();channelFuture.addListener((ChannelFutureListener) future -> {if (!future.isSuccess()) {System.err.println("Connection failed: " + future.cause());connectLatch.countDown(); // 确保不会死等}});// 等待连接完成(带超时)if (!connectLatch.await(10, TimeUnit.SECONDS)) {throw new RuntimeException("Connection timed out");}Scanner sc = new Scanner(System.in);while (true) {System.out.print("请输入:");String line = sc.nextLine();if (StringUtil.isNullOrEmpty(line)) {continue;}if ("exit".equals(line)) {channel.close();break;} else {// 发送消息WebSocketFrame frame = new TextWebSocketFrame(line);channelFuture.channel().writeAndFlush(frame);}}channelFuture.channel().closeFuture().sync();} catch (Exception e) {log.info("客户端发生异常: ", e);} finally {group.shutdownGracefully();}}}

WebSocketClientHandler

@Slf4j
public class WebSocketClientHandler extends SimpleChannelInboundHandler<WebSocketFrame> {private CountDownLatch connectLatch;public WebSocketClientHandler(CountDownLatch connectLatch) {this.connectLatch = connectLatch;}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {// 处理接收到的WebSocket帧if (frame instanceof TextWebSocketFrame) {String text = ((TextWebSocketFrame) frame).text();System.out.println("Received: " + text);} else if (frame instanceof PingWebSocketFrame) {// 响应Ping帧ctx.writeAndFlush(new PongWebSocketFrame(frame.content().retain()));System.out.println("Responded to ping");} else if (frame instanceof CloseWebSocketFrame) {System.out.println("Received close frame");ctx.close();} else if (frame instanceof BinaryWebSocketFrame) {System.out.println("Received binary data: " + frame.content().readableBytes() + " bytes");}}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {// 处理握手完成事件if (evt == WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_COMPLETE) {System.out.println("WebSocket handshake complete event");// 握手完成后可以发送初始消息connectLatch.countDown();}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {System.err.println("WebSocket error: ");cause.printStackTrace();ctx.close();}}

protobuf

Server端

NettyServer

@Slf4j
public class NettyServer {public static void main(String[] args) {NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);NioEventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap serverBootstrap = new ServerBootstrap().group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).handler(new LoggingHandler()).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new ProtobufEncoder());pipeline.addLast(new ProtobufDecoder(StudentPOJO.Student.getDefaultInstance()));pipeline.addLast(new NettyServerHandler());}});ChannelFuture bindFuture = serverBootstrap.bind(new InetSocketAddress(8888));Channel channel = bindFuture.sync().channel();log.info("server start");channel.closeFuture().sync();log.info("server stop");} catch (Exception e) {log.info("server error", e);} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}

NettyServerHandler

@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.info("服务端接收到客户端数据:{}", msg);if (msg instanceof StudentPOJO.Student) {StudentPOJO.Student student = (StudentPOJO.Student) msg;log.info( "客户端发送的数据:{}, {}, {}", student, student.getId(), student.getName());}}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {log.info("【NettyServerHandler->userEventTriggered】: {}", evt);}public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {log.error("exceptionCaught异常:", cause);ctx.close();}public void handlerAdded(ChannelHandlerContext ctx) throws Exception {log.info("handlerAdded:{}", ctx.channel().remoteAddress());}public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {log.info("handlerRemoved:{}", ctx.channel().remoteAddress());}public void channelRegistered(ChannelHandlerContext ctx) throws Exception {log.info("channelRegistered:{}", ctx.channel().remoteAddress());}public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {log.info("channelUnregistered:{}", ctx.channel().remoteAddress());}public void channelActive(ChannelHandlerContext ctx) throws Exception {log.info("channelActive:{}", ctx.channel().remoteAddress());}public void channelInactive(ChannelHandlerContext ctx) throws Exception {log.info("channelInactive:{}", ctx.channel().remoteAddress());}}

Student.proto

syntax = "proto3"; //版本
option java_outer_classname = "StudentPOJO";//生成的外部类名,同时也是文件名
//protobuf 使用message 管理数据
message Student { //会在 StudentPOJO 外部类生成一个内部类 Student, 他是真正发送的POJO对象int32 id = 1; // Student 类中有 一个属性 名字为 id 类型为int32(protobuf类型) 1表示属性序号,不是值string name = 2;
}
// 执行命令 protoc.exe --java_out=生成路径 Student.proto路径

Client端

NettyClient

@Slf4j
public class NettyClient {public static void main(String[] args) {NioEventLoopGroup group = new NioEventLoopGroup(1);try {Bootstrap bootstrap = new Bootstrap().group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new ProtobufEncoder());pipeline.addLast(new ProtobufDecoder(StudentPOJO.Student.getDefaultInstance()));pipeline.addLast(new NettyClientHandler());}});ChannelFuture connectFuture = bootstrap.connect("127.0.0.1", 8888);Channel channel = connectFuture.sync().channel();log.info("客户端连接成功");channel.closeFuture().sync();log.info("客户端关闭");} catch (Exception e) {log.error("客户端发生异常: ", e);}}}

NettyClientHandler

@Slf4j
public class NettyClientHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.info( "【NettyClientHandler->channelRead】: {}", msg);}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {log.info( "【NettyClientHandler->userEventTriggered】: {}", evt);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {log.info( "异常: {}", cause.getMessage());ctx.close();}@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {log.info( "【NettyClientHandler->handlerAdded】: {}", ctx);}@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) throws Exception {log.info( "【NettyClientHandler->handlerRemoved】: {}", ctx);}@Overridepublic void channelRegistered(ChannelHandlerContext ctx) throws Exception {log.info( "【NettyClientHandler->channelRegistered】: {}", ctx);}@Overridepublic void channelUnregistered(ChannelHandlerContext ctx) throws Exception {log.info( "【NettyClientHandler->channelUnregistered】: {}", ctx);}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {log.info( "【NettyClientHandler->channelActive】: {}", ctx);StudentPOJO.Student student = StudentPOJO.Student.newBuilder().setId(1).setName("张三san").build();ctx.writeAndFlush(student);}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {log.info( "【NettyClientHandler->channelInactive】: {}", ctx);}
}

http://www.hkcw.cn/article/yHZkHNLXdo.shtml

相关文章

男友还原女子在三亚被蛇咬身亡过程 延误救治引争议

近日,一名27岁女游客在三亚被蛇咬伤后不幸身亡,引起了广泛关注。6月1日晚,李丽和男友张鹏刚抵达三亚,在入住酒店后外出散步时,李丽的脚趾突然被绿化带窜出的不明生物咬伤。由于光线昏暗,两人只看到伤口上有两点牙印,怀疑是被蛇咬伤。张鹏紧急背起女友回到酒店求助,但未…

博主:榴莲价格暴跌只是开始 从奢侈品到平价果的蜕变

初夏时节,水果市场上的主角非榴莲莫属。曾经被视为“水果贵族”的榴莲价格迎来历史性下降——杭州猫山王从65元/斤跌至36.8元/斤,金华批发市场金枕榴莲更是降至18元/斤,价格直接腰斩。这场席卷全国的“榴莲降价潮”背后隐藏着复杂的市场因素。走进杭州某水果店,扑鼻而来的榴…

小沈阳晒照回应演唱会争议 接受批评继续努力

6月2日下午,艺人小沈阳发文回应女儿登台唱跳遭到的批评。他表示接受一切批评的声音,并表示还需继续努力,希望能对得起观众和喜欢他们的人。5月31日,小沈阳在沈阳举办演唱会,他的女儿沈佳润作为神秘嘉宾登场。沈佳润自信地进行唱跳表演,展现出女团范儿。然而,观众对此褒贬…

华为OD机试真题——最小矩阵宽度(宽度最小的子矩阵)(2025B卷:200分)Java/python/JavaScript/C/C++/GO最佳实现

2025 B卷 200分 题型 本专栏内全部题目均提供Java、python、JavaScript、C、C++、GO六种语言的最佳实现方式; 并且每种语言均涵盖详细的问题分析、解题思路、代码实现、代码详解、3个测试用例以及综合分析; 本文收录于专栏:《2025华为OD真题目录+全流程解析+备考攻略+经验分…

钟楚曦秀手臂肌肉线条 海边锻炼展现强健体魄

6月3日,演员钟楚曦在个人社交平台分享了一组近照,并配文:“只能就地拿两瓶海水练练了”。照片中,她穿着礼服裙在海边展示肌肉好身材,令人赞叹。照片里的钟楚曦妆发精致,扎着丸子头,化着红唇美妆,身穿吊带超短裙,身材火辣苗条。尽管造型绝美,她却双手拿着两个装满海水…

官方通报女子打砸多辆汽车 事件视频引发关注

今天(3日),一段视频引起了广泛关注。视频中,一名女子手持铁棍无差别攻击过往车辆,并多次跳上车顶打砸。河南省洛阳市伊川县人民政府城关街道办事处在今天下午发布了情况通报。责任编辑:0882

郑钦文法网女单8强发球质量位居第一 鏖战虽败犹荣

在6月3日进行的2025年法国网球公开赛女单四分之一决赛中,赛会8号种子、中国选手郑钦文以6:7(3)、3:6不敌现世界排名第一的白俄罗斯选手萨巴伦卡,止步八强。比赛首盘即陷入鏖战,时长超过一个小时。郑钦文开局手感火热,在第3局成功破发,一度取得3:1的领先优势。但在随后的几…

赶考出行这些事项要留意 关注天气平安迎考

距离2025年高考还有4天,这个时期全国大部分地区雨水充沛,加上六月初天气炎热,容易形成局部暴雨。每年高考期间,部分地区会出现不同程度的降雨。考生和家长需及时关注天气预报,提前规划赴考路线,注意防暑降温,并确保出行安全。祝所有考生平安迎考,金榜题名。责任编辑:0…

缅甸发生4.1级地震 震源深度10千米

中国地震台网正式测定:6月3日21时28分在缅甸(北纬21.23度,东经99.54度)发生4.1级地震,震源深度10千米。(总台央视记者 张腾飞)责任编辑:0882

追火箭上火星 科技旅游火出圈 科技创新激发文旅活力

近年来,我国科技事业取得历史性成就,载人航天、深空探测、“人造太阳”等科技成果捷报频传,进一步激发了全社会对科技创新的关注。新奇有趣的科技旅游成为越来越多群众的选择。各地不断探索优化硬件和完善服务,以吸引更多游客。海南文昌的瑶光火箭观礼平台是当地首个航天观…

杜海涛:跟歌手女主持约会的一天 甜蜜互动引关注

2025年6月3日,杜海涛在微博上分享了与妻子沈梦辰的日常互动,标题为“跟歌手女主持约会的一天”,展现了两人温馨而甜蜜的生活。尽管标题未直接提及沈梦辰的名字,但从后续内容中可以确认这位“女主持”正是沈梦辰。杜海涛因急性扁桃体发炎身体不适,沈梦辰连夜从长沙飞往北京…

37岁包文婧感叹产后减肥不易 二胎满月喜上加喜

6月1日,包文婧在社交账号上分享了庆祝二胎满月的视频,宣布她终于结束了坐月子的生活。37岁的包文婧选择了一个较短的28天坐月子期,产后状态非常好。刚出月子,包文婧就化了漂亮的妆容,显得非常高兴。她穿着坐月子的衣服出现,透露自己产前体重为134斤,出月子时减到了110斤…

韩国改革新党候选人李俊锡宣布败选 得票率7.7%位列第三

韩国改革新党候选人李俊锡于6月3日晚宣布败选。当天,韩国举行了第21届总统选举。根据韩国三大电视台在投票结束后公布的联合出口民调,共同民主党总统候选人李在明以51.7%的得票率领先,国民力量党候选人金文洙以39.3%的得票率紧随其后,而李俊锡仅获得7.7%的选票。责任编辑:…

评论员:韩国选民拒为亲美政客买账 金文洙大选败局已定

在韩国大选进入关键阶段之际,国民力量党候选人金文洙前往仁川自由公园举行集会拉票。他在麦克阿瑟铜像前默哀,并带领工作人员集体下跪,向参加集会的民众求支持。这一举动不仅暴露了他的亲美立场,还展示了其高超的表演技巧。然而,这些表演并未赢得大多数选民的支持。金文洙…

乐基儿二度离婚后首谈黎明:没联系了 满脸笑容送祝福

近日,44岁的模特乐基儿在二度离婚后首次公开谈论前夫黎明。她在采访中表示两人已无联系,并笑着祝福黎明演唱会成功。2000年,黎明和乐基儿初次相识;2008年3月13日,他们在马尔代夫举行婚礼;2012年10月3日,黎明通过其公司宣布与乐基儿离婚。2023年9月10日,乐基儿向媒体证实…

乌偷袭俄成功带来哪些深远影响 无人机改变战争规则

一架藏在卡车木质顶棚下的FPV无人机突然激活,引擎的嗡鸣撕裂了西伯利亚军事基地的寂静。它冲向停机坪上庞大的图-95战略轰炸机,火光瞬间吞没了这架曾威慑欧洲的“空中巨兽”。2025年6月1日深夜,俄罗斯五个州同时上演了类似的场景。乌克兰国家安全局将这场代号“蛛网”的行动…

荷兰首相宣布辞职 内阁将解散 看守政府继续运作

荷兰首相斯霍夫于6月3日宣布,将向国王威廉-亚历山大递交内阁辞呈,政府内阁将解散。斯霍夫表示,内阁将继续作为看守政府,并致力于解决与安全相关的问题。同日,荷兰自由党领导人维尔德斯宣布退出联合政府。责任编辑:0764

华为OD机试真题——模拟工作队列(2025B卷:200分)Java/python/JavaScript/C/C++/GO最佳实现

2025 B卷 200分 题型 本专栏内全部题目均提供Java、python、JavaScript、C、C++、GO六种语言的最佳实现方式; 并且每种语言均涵盖详细的问题分析、解题思路、代码实现、代码详解、3个测试用例以及综合分析; 本文收录于专栏:《2025华为OD真题目录+全流程解析+备考攻略+经验分…

白俄罗斯总统抵达北京释放何信号 特朗普急了

白俄罗斯总统卢卡申科宣布访华三天,这一消息引发了国际关注。与此同时,白宫主动放出风声,称中美会谈即将举行,这释放了何种信号?白俄罗斯总统专机抵达北京之际,美国似乎被冷落。自五月底以来,关于特朗普希望与中方会面的消息不断传出。特朗普曾表示确信会与中方通话,但…

山西一小区地下水“高烧”到72摄氏度 地热原因成谜

近日,山西长治锦绣司马小区的居民反映,小区内一些业主地下室温度高达40摄氏度,附近抽出来的地下水温度更是达到72摄氏度。多个部门前来调查,但未能找到地热原因。视频显示,小区外面绿化带已被挖开,几根黑色粗管堆放在一起,一根细管从绿化带位置延伸到下水井,管中不断有…