文章目录
- 概要
- 1 Hello World
- 1.1 什么是netty?
- 1.2 Netty 的核心特性
- 1.3 初识 netty
- 2 Netty 的核心组件
- 2.1 EventLoop 和 EventLoopGroup
- 2.1.1 基本概念
- 2.1.2 与 Channel 关联
- 2.1.3 EventLoopGroup 的实现
- 2.1.4 常用方法
- 2.2 Channel
- 2.2.1 Channel 的类型
- 2.2.2 Channel 的基本功能
- 2.2.3 常用方法
- 2.2.4 ChannelPipeline
- 2.2.5 获取和关闭channel
- 2.3 Future 和 Promise
- 2.3.1 Future 异步回调
- 3.3.2 Future 状态 和 方法
- 3.3.3 Promise 设置状态
- 3.3.4 Promise 的常用方法
- 3.3.5 Promise 与 Future 的对比
- 2.4 Handler 和 Pipeline
- 2.4.1 Handler 分类与作用
- 2.4.2 Handler和Pipeline 的工作机制
- 2.4.3 Handler 实现类和方法
- 2.4.4. Pipeline 常用方法
- 3 其他
概要
简单整理netty相关知识点。
1 Hello World
1.1 什么是netty?
Netty是一个异步的、基于事件驱动的网络应用框架,用于快速开发可维护、高性能的网络服务器和客户端。
Netty 的核心设计目标是:
- 高性能:充分利用 Java NIO 的非阻塞特性。
- 可扩展性:适用于从简单客户端到复杂服务器的大多数网络应用场景。
- 易用性:提供高度抽象的 API,屏蔽 NIO 编程的复杂细节。-
1.2 Netty 的核心特性
1. 异步和事件驱动
Netty 的通信是基于事件驱动的,事件由事件循环管理。
异步特性让 Netty 在处理高并发时具有天然优势,避免阻塞操作。
2. 高效的线程模型
Netty 使用少量线程处理大量连接,通过 I/O 多路复用机制实现。
默认采用 Reactor 模式,分为 Boss 和 Worker 两类线程。
3. 灵活的编解码
提供丰富的编码器和解码器支持,包括 Protobuf、HTTP、WebSocket 等。
支持自定义协议解析。
4. 内存管理
使用内置的 ByteBuf 代替 Java 的 ByteBuffer,提供动态扩展、零拷贝等高效操作。
1.3 初识 netty
1 目标
开发一个简单的服务器端和客户端
- 客户端向服务器端发送 hello, world
- 服务器仅接收,不返回
2 加入依赖
<dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.39.Final</version>
</dependency>
3 服务端代码
public class HelloServer {public static void main(String[] args) {// 1.启动器,负责组装 netty 组件,启动服务器new ServerBootstrap()// 2. BossEventLoop, WorkerEventLoop(selector,thread), group组.group(new NioEventLoopGroup())// 3. 选择服务器的 ServerSocketChannel 实现.channel(NioServerSocketChannel.class) //OIO BIO// 4. boss负责处理连接, worker(child)负责处理读写,决定了 worker(child) 能执行哪些操作(handler).childHandler(// 5. channel 代表和客户端进行数据读写的通道 Initializer 初始化new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {// 6. 添加具体 handlerch.pipeline().addLast(new StringDecoder()); // 将 ByteBuf 转换为字符串ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){ // 自定义 handler@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // 打印上一步转换好的字符串System.out.println(msg);}});}}).bind(8087);}
}
代码解读:
- 1 处,创建 NioEventLoopGroup,可以简单理解为 线程池 + Selector 后面会详细展开
- 2 处,选择服务 Scoket 实现类,其中 NioServerSocketChannel 表示基于 NIO 的服务器端实现,其它实现还有
- 3 处,为啥方法叫 childHandler,是接下来添加的处理器都是给 SocketChannel 用的,而不是给 ServerSocketChannel。ChannelInitializer 处理器(仅执行一次),它的作用是待客户端 SocketChannel 建立连接后,执行 initChannel 以便添加更多的处理器
- 4 处,ServerSocketChannel 绑定的监听端口
- 5 处,SocketChannel 的处理器,解码 ByteBuf => String
- 6 处,SocketChannel 的业务处理器,使用上一个处理器的处理结果
4 客户端代码
public class HelloClient {public static void main(String[] args) throws InterruptedException {// 1. 创建启动类new Bootstrap()// 2. 添加 EventLoop.group(new NioEventLoopGroup())// 3. 选择客户端 channel 实现.channel(NioSocketChannel.class)// 4. 添加处理器.handler(new ChannelInitializer<NioSocketChannel>() {//在连接建立后被调用@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ch.pipeline().addLast(new StringEncoder()); //将客户端的字符串转换为编码发送到服务器端}})// 5. 连接服务器.connect(new InetSocketAddress("localhost", 8080)).sync().channel()// 6. 向服务器发送数据.writeAndFlush("hello world");}
}
代码解读:
- 1 处,创建 NioEventLoopGroup,同 Server
- 2 处,选择客户 Socket 实现类,NioSocketChannel 表示基于 NIO 的客户端实现,其它实现还有
- 3 处,添加 SocketChannel 的处理器,ChannelInitializer 处理器(仅执行一次),它的作用是待客户端 SocketChannel 建立连接后,执行 initChannel 以便添加更多的处理器
- 4 处,指定要连接的服务器和端口
- 5 处,Netty 中很多方法都是异步的,如 connect,这时需要使用 sync 方法等待 connect 建立连接完毕
- 6 处,获取 channel 对象,它即为通道抽象,可以进行数据读写操作
- 7 处,写入消息并清空缓冲区
- 8 处,消息会经过通道 handler 处理,这里是将 String => ByteBuf 发出数据经过网络传输,到达服务器端,服务器端 5 和 6 处的 handler 先后被触发,走完一个流程
5 流程梳理
💡 提示
一开始需要树立正确的观念
- 把 channel 理解为数据的通道
- 把 msg 理解为流动的数据,最开始输入是 ByteBuf,但经过 pipeline 的加工,会变成其它类型对象,最后输出又变成 ByteBuf
- 把 handler 理解为数据的处理工序
- 工序有多道,合在一起就是 pipeline,pipeline 负责发布事件(读、读取完成…)传播给每个 handler, handler 对自己感兴趣的事件进行处理(重写了相应事件处理方法)
- handler 分 Inbound 和 Outbound 两类
- 把 eventLoop 理解为处理数据的工人
- 工人可以管理多个 channel 的 io 操作,并且一旦工人负责了某个 channel,就要负责到底(绑定)
- 工人既可以执行 io 操作,也可以进行任务处理,每位工人有任务队列,队列里可以堆放多个 channel 的待处理任务,任务分为普通任务、定时任务
- 工人按照 pipeline 顺序,依次按照 handler 的规划(代码)处理数据,可以为每道工序指定不同的工人
service、client、channel、channelPipeline的关系
2 Netty 的核心组件
2.1 EventLoop 和 EventLoopGroup
2.1.1 基本概念
EventLoop 是一个 单线程的事件循环,用于处理 I/O 操作、普通任务和定时任务。
- I/O 事件处理:EventLoop 会循环监听和处理 I/O 事件(如网络连接、数据读取、数据写入等)
- 任务调度:EventLoop 可以执行异步任务,通常是定时任务或需要在 I/O 线程上执行的任务。
EventLoopGroup 是多个 EventLoop 的容器,负责管理其生命周期,并为 Netty 中的 I/O 操作分配线程。
- 负责创建、分配和管理多个 EventLoop, 每个 EventLoop 绑定一个独立的线程。
- 为每个新连接(Channel)分配一个 EventLoop,保证 Channel 的事件总是由同一个 EventLoop 处理。
- 将任务(普通任务、定时任务、I/O 事件)分配到 EventLoop 中运行。
2.1.2 与 Channel 关联
- EventLoopGroup 管理多个 EventLoop, 每个 EventLoop 与一个线程绑定。
- 一个 EventLoop 内部维护了一个 selector 来管理服务多个 Channel。
- 每个 Channel 绑定到唯一的 EventLoop,从而保证线程安全。
2.1.3 EventLoopGroup 的实现
EventLoopGroup 是一个接口,我们创建对象时要创建接口的实现,其中前三个可以处理 io操作、普通任务、定时任务,第四个不能处理I/O操作
2.1.4 常用方法
1 EventLoop 的方法:
2 EventLoopGrop 的方法
2.2 Channel
Channel 是 Netty 中用于数据传输的核心组件,代表了 I/O 操作的端点。它用于处理连接、接收和发送数据,并且与具体的 I/O 模型(如 NIO、Epoll 或 KQueue)紧密集成。
- Channel 主要用于处理 I/O 操作,支持异步 I/O 事件的处理和数据传输。
- 在 Netty 中,Channel 是与客户端和服务端之间的连接一一对应的。
2.2.1 Channel 的类型
Netty 提供了多种类型的 Channel,每种类型根据不同的协议和传输方式进行优化。常见的 Channel 类型如下:
2.2.2 Channel 的基本功能
Netty 的 Channel 提供了一些基础的操作,以下是最常用的一些功能:
- 连接管理: 用于创建、绑定、连接和关闭连接。
- 数据读写: 支持从 Channel 中读取数据和向 Channel 写入数据。
- 事件触发: 可以处理来自客户端或服务器的 I/O 事件(如接收数据、写入数据等)。
- 流量控制: 支持背压机制,控制数据的读写速率。
2.2.3 常用方法
2.2.4 ChannelPipeline
ChannelPipeline 是一个链式结构,负责管理 I/O 操作的处理逻辑。在一个 Channel 中,每个数据的读取、写入操作都会经过一系列处理器(ChannelHandler)的加工。这些处理器可以执行多种任务,如编码、解码、协议解析等。
- ChannelHandler:用于处理 I/O 操作的具体业务逻辑,如编解码、业务处理、异常处理等。
- ChannelPipeline:由多个 ChannelHandler 组成,形成一个处理链。每当发生 I/O 事件时,事件会在 ChannelPipeline 中按照顺序流动,逐个交给对应的处理器处理。
2.2.5 获取和关闭channel
由于connect和close方法是异步执行的,也就是在另外的线程中执行,主线程只负责调用,所以获取 channel 分为两种情况,一种是在主线程中获取channel并执行业务代码,另一种情况是在执行connect的线程执行业务代码。关闭channel同样,要么是主线程,要么是close的线程。
1 客户端通过同步的方式获取和关闭 channel
public class Client {private static final Logger log = LoggerFactory.getLogger(Main.class);public static void main(String[] args) throws InterruptedException {ChannelFuture connectFuture = new Bootstrap().group(new NioEventLoopGroup()).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ch.pipeline().addLast(new StringEncoder());}}).connect("localhost", 8888);// 1. 在主线程中获取channel,并输出connectFuture.sync(); // 阻塞,直到connect线程执行成功Channel channel = connectFuture.channel();log.info("{}", channel);new Thread(() -> {Scanner sc = new Scanner(System.in);while (true) {String line = sc.nextLine();if ("q".equals(line)) {channel.close();// close是异步的,所以如果在close后面直接写善后工作的代码,无法保证其在close之后执行break;}channel.writeAndFlush(line);}}).start();// 2. 关闭channel, 并执行关闭后的善后工作ChannelFuture closeFuture = channel.closeFuture(); // 获取关闭的结果closeFuture.sync(); // 阻塞, 直到调用closelog.info("{}", closeFuture);}
}
// 输出
20:29:15.955 [main] - [id: 0xa8522feb, L:/127.0.0.1:2185 - R:localhost/127.0.0.1:8888]
20:29:15.960 [main] - 0
20:29:15.960 [main] - 1
hello
q
20:29:42.378 [main] - AbstractChannel$CloseFuture@70e9c95d(success)
2 客户端通过异步的方式获取和关闭channel
public class Client {private static final Logger log = LoggerFactory.getLogger(Main.class);public static void main(String[] args) throws InterruptedException {ChannelFuture connectFuture = new Bootstrap().group(new NioEventLoopGroup()).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ch.pipeline().addLast(new StringEncoder());}}).connect("localhost", 8888);// 1. 在connect线程中获取channel,并输出connectFuture.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {Channel channel = future.channel();log.info("{}", channel);Scanner sc = new Scanner(System.in);while (true) {String line = sc.nextLine();if ("q".equals(line)) {channel.close();break;}channel.writeAndFlush(line);}}});// 2. 关闭channel, 并执行关闭后的善后工作log.info("0");ChannelFuture closeFuture = connectFuture.sync().channel().closeFuture(); // 关闭并返回结果closeFuture.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {log.info("{}", future);}});}
}
观察可以看到,对于异步执行业务代码都是通过 ChannelFuture 的 addListener 方法给这个future添加一个回调函数。
2.3 Future 和 Promise
在 Netty 中,Future 是一个异步操作的结果容器,用于表示当前任务的执行状态以及操作的结果或失败原因。相比于 Java 原生的 Future 接口,Netty 提供了功能更加强大的 ChannelFuture 和 Promise,支持异步回调和链式操作,极大地方便了异步编程。
2.3.1 Future 异步回调
1 同步操作
“hello world” 由主线程输出。这点跟 Java juc 中的 future 相同,调用get时阻塞等待future返回结果,然后继续业务的处理
public class Main {private static final Logger log = LoggerFactory.getLogger(Main.class);public static void main(String[] args) throws ExecutionException, InterruptedException {EventLoopGroup eventLoopGroup = new NioEventLoopGroup();EventLoop next = eventLoopGroup.next();Future<String> resFuture = next.submit(() -> {sleep(1000);return "hello world";});log.info("main run");log.info("{}", resFuture.get());eventLoopGroup.shutdownGracefully();}
}
// 输出
21:49:34.367 [main] - main run
21:49:35.379 [main] - hello world
2. 异步回调
观察输出可以看到 “hello world” 不再是由 main 线程输出
public class Main {private static final Logger log = LoggerFactory.getLogger(Main.class);public static void main(String[] args) throws ExecutionException, InterruptedException {EventLoopGroup eventLoopGroup = new NioEventLoopGroup();EventLoop next = eventLoopGroup.next();Future<String> resFuture = next.submit(() -> {sleep(1000);return "hello world";});log.info("main run");resFuture.addListener(new GenericFutureListener<Future<? super String>>() {@Overridepublic void operationComplete(Future<? super String> future) throws Exception {log.info("{}", resFuture.get());eventLoopGroup.shutdownGracefully();}});log.info("main is over");}
}
// 输出22:01:14.140 [main] - main run
22:01:14.142 [main] - main is over
22:01:15.147 [nioEventLoopGroup-2-1] - hello world
3.3.2 Future 状态 和 方法
3.3.3 Promise 设置状态
在 Netty 中,Promise 是 Future 的扩展,它不仅是一个异步操作的结果容器,还提供了手动设置操作结果的能力,可以主动控制操作完成的状态(成功或失败),而不仅仅是被动等待。
public class Main {private static final Logger log = LoggerFactory.getLogger(Main.class);public static void main(String[] args) throws ExecutionException, InterruptedException {NioEventLoopGroup group = new NioEventLoopGroup();EventLoop eventLoop = group.next();DefaultPromise<String> promise = new DefaultPromise<>(eventLoop); // 手动创建new Thread(() -> {try {sleep(1000);promise.setSuccess("hello world"); // 手动设置} catch (InterruptedException e) {throw new RuntimeException(e);}}).start();log.info("main running");log.info("{}", promise.get());}
}
// 输出
22:29:29.243 [main] - main running
22:29:30.252 [main] - hello world
3.3.4 Promise 的常用方法
3.3.5 Promise 与 Future 的对比
2.4 Handler 和 Pipeline
- 在 Netty 中,Handler 是用于处理 I/O 事件和数据的核心组件。负责在 Channel 的生命周期中处理各种事件,例如连接建立、数据读取、异常捕获等。
- Pipeline(即 ChannelPipeline)是一个 双向链表,用于管理和组织多个 Handler,实现事件的分层处理和流式处理。每个 Channel 都关联一个 ChannelPipeline,它是 I/O 事件从生成到完成的核心通道。
2.4.1 Handler 分类与作用
Handler 分为两种类型:
- 1 ChannelInboundHandler
作用:处理入站事件
常用方法:
- 2 ChannelOutboundHanderl
作用:处理出站事件
常用方法:
2.4.2 Handler和Pipeline 的工作机制
Handler 是通过 ChannelPipeline 进行组织和管理的(ChannelPipeline管理着Handler)。每个 Channel 都有一个 ChannelPipeline,它是一个双向链表,包含一组 Handler。当有 I/O 事件发生时,事件会在 ChannelPipeline 中沿链表传播。
- 入站事件传播:从链表头开始,事件依次传递给 ChannelInboundHandler(handler的一个实现类)。
- 出站事件传播:从链表尾开始,事件依次传递给 ChannelOutboundHandler(handler的一个实现类)。
下面代码我们向服务端的 ChannelPipeline 中添加了 编码器、解码器、3个入站handler和3个出站handler
ChannelHandlerContext 中也包含 writeAndFlush,如果我们把上面 h3 中的 ch.writeAndFlu
sh(msg); 换成 ctx.writeAndFlush(msg);,可以发现写入事件并没有经过 h4,h5,h6事件处理。原因如下
2.4.3 Handler 实现类和方法
Netty 提供了以下常用的 Handler 实现:
2.4.4. Pipeline 常用方法
ChannelPipeline 提供了一系列操作,用于管理 Handler 和触发事件。
3 其他
待补充