Netty入门到实战
- Netty
- Netty 核心组件和流程分析
- 数据流转的底层核心
- ByteBuf 原理分析
- Netty 解决半包、粘包问题
- Netty 实现心跳检查机制
Netty
NIO 是一种 I/O 模型,netty 是基于 NIO 开发出来的一款异步事件驱动框架,它是一个通用的网络应用程序框架。netty 简化了 NIO 网络编程的开发,本质就是对NIO的封装和升级。它支持多种协议,如常用的协议有:HTTP(基于TCP的半双工协议)、WebSocket(支持全双工通信的协议,它一般被称为HTTP的进阶协议,它先是通过TCP三次握手建立连接后,然后通过HTTP握手转换成WebSocket协议,转换成功会发101状态码,之后就是WebSocket协议进行联系了,具体的可以看这篇(因为使用Netty的话使用WebSocket协议比较多):WebSocket协议:5分钟从入门到精通)。
Netty 核心组件和流程分析
- EventLoop(事件循环):它是一个循环,不断地等待事件的发生并处理这些事件。事件和NIO一样可以是网络连接的建立、数据的读取和写入、定时器的触发等等。一旦注册,将处理通道内的所有 I/O 操作。一个EventLoop实例通常会处理多个Channel,但这可能取决于实现细节和内部机制。
- EventLoopGroup(EventLoop的集合):它负责管理和调度多个 EventLoop 的创建和分配。可以理解它为线程池,然后其中每个线程都包含一个EventLoop。一般情况下,一个应用程序只需要一个 EventLoopGroup 来处理所有的网络事件。它俩的关系可以类比于 ThreadGroup和Thread之间的关系。内部封装了NIO中的Selector多路复用器。
- Channel(通道):与 java NIO 中的 SocketChannel 一样,可以进行数据的读取和写入。
- ChannelPipeline(通道管道):它由一系列的处理器(Handler)组成,用于处理数据的流动。ChannelPipeline 中的每个处理器叫做 ChannelHandler,这些ChannelHandler 负责处理不同的网络事件,例如数据的读取、写入和处理,连接的建立和关闭,异常的处理等等。**当数据进入管道的时候,它会
依次
经过 ChannelPipeline 中的每个 ChannelHandler 进行处理。**每个 ChannelHandler 可以对数据进行修改、转换或执行特定的业务逻辑。处理完毕会流通到下一个ChannelHandler。好似流水线。 - ChannelHanlder(通道处理器):用来处理 Channel 中的事件和数据的组件。使用的话是被封装到 ChannelPipeline 管道中。
- ChannelInboundHandler:用于处理入站事件,例如连接建立、数据读取等。
- ChannelOutboundHandler:用于处理出站事件,例如数据写入、连接关闭等。
- SimpleChannelInboundHandler:继承自ChannelInboundHandler,简化了消息处理的逻辑。
- SimpleChannelOutboundHandler:继承自ChannelOutboundHandler,简化了消息发送的逻辑。
- HttpServerCodec:它负责处理 HTTP 请求和响应的编解码。
- HttpObjectAggregator:将 HTTP 请求的多个部分合并成一个完整的 FullHttpRequest。
- WebSocketServerProtocolHandler:处理 WebSocket 协议的握手和帧的编解码。
- ByteBuf(字节缓冲区):ByteBuf 是 Netty 中的字节容器,用于高效地存储和传输字节数据。与Java NIO 的 ByteBuffer 相比,ByteBuf 提供了更灵活的API和更高效的内存管理。
- Future(异步操作结果):Netty中的操作都是异步的,Future用来获取操作的状态和结果。
- Bootstrap(引导类):Bootstrap是启动客户端的类,负责配置和启动客户端的相关组件。
- ServerBootstrap(服务器引导类):ServerBootstrap是创建和启动服务器的类,用于配置和管理服务器的各个组件。其实是对NIO ServerSocketChannel 的封装。
注意:当你使用某工具调试的时候,服务端和网络调试工具并不是直接发送的。网路助手-》操作系统-》网络-》对方操作系统-》找到对应进程,是以这个路线发送数据和接收数据的,此过程都是0/1数据传输,在Netty中,使用的是 ByteBuf
进行字节的传输和存储。
下面是Netty的一个流程简图(这里的NioServerSocketChannel被构造时,会执行一个init方法,为这个channel中的Pipeline里设置handler,准备好去处理Selector选出的事件,大概流程就关联起来了):
这还有个 EventLoopGroup 封装多路复用器的一过程,且支持多线程处理(会向ServerBootstrap传递俩个EventLoopGroup,第一个称为BoosGroup用来处理新用户的连接请求,而第二个称为WorkerGroup,用来处理读写操作和业务逻辑,也是为什么一般给BoosGroup线程数设置为1就够用的原因,因为它只处理连接请求):
数据流转的底层核心
在叙述数据流转底层核心时,先阐述 Netty 所提供的字符串入站、出站处理器。
从下面继承关系可以看见 StringDecoder 是 Netty 提供的入站处理器(继承了ChannelInBoundHandlerAdapter)
从下面的继承关系同样可以看出 StringEncoder 是 Netty 提供的出站处理器(继承了 ChannelOutBoundHandlerAdapter)
上述阐述过 ChannelPipeline 是用来封装 ChannelHandler 的管道,那它是怎么存储 ChannelHandler 的呢?其实底层是使用了双向链表进行存储的,至于为什么使用双向链表我觉得就三点:
- 双向遍历(单链表无法完成):在服务端接收到事件的时候(接收到客户端来的信息时)会从头遍历到尾 ChannelHandler 进行事件处理;而在服务端向客户端回消息/发送消息时是从尾部遍历到头部 ChannelHandler 进行处理,双向链表是最佳选择。
- 动态调整(单链表无法完成):单链表进行删除操作的话需要遍历整个链表,而无法动态的进行删除、添加。
- 上下文可传递事件(单链表无法完成):使用双向链表的话,在一个节点上可以轻松的获取到其前一个和后一个节点,那在处理事件时,如果需要将一通道处理器的事件转换到下一个,就可以轻松的实现上下文传递。
拿下面配的 ChannelPipeline 进行数据流转说明
@Override
public void initChannel(SocketChannel ch) throws Exception {
Charset gbk = Charset.forName("GBK");
ch.pipeline().addLast("decoder",new StringDecoder(gbk));
ch.pipeline().addLast("encoder",new StringEncoder(gbk));
ch.pipeline().addLast(new DiscardServerHandler());// 将handler封装到Pipeline中
}
(这图有点问题哈,右边那个应该是双向链表的)
阐述一下流程:
- 客户端发送服务端数据(myz(byteBuf类型)):从 head 节点出发-》下一个Inbound(入站)StringDecoder,此时 myz 数据类型成了 String-》StringEncoder 是 Outbound,所以跳过-》ChatServerHandler 是我自定义的 InboundHandler,在 channleRead 方法中处理字符串类型的 msg 即可。
- 服务端回客户端数据(hello(String类型)):从 tail 节点出发-》ChatServerHandler是入站处理器跳过-》StringEncoder 是出站处理器,此时将 hello 转换成了 ByteBuf-》StringDecoder 入站处理器,跳过。
所以,当配置我们的 ChannelPipeline 中的 ChannelHandler 的时候,顺序有时是很重要的,需要考虑好数据流转的方向和处理方案。
ByteBuf 原理分析
ByteBuf 主要是通过两个index:readerIndex 和 writerIndex 来完成数据的读写,整个缓冲区分为三个部分:可丢弃部分、可读部分、可写部分。其实如果知道 nio 包下的 ByteBuffer 的话,这个理解起来也不难。(至于为什么源码里面要加个er咱也不懂)需注意,readerIndex 永远都是在 writerIndex 的前面的。
初始位置
当缓冲区为空的时候,writerIndex 和 readerIndex 都为 0,整个区域都为可写部分。
写了一点数据后
读了一点数据后
丢弃部分被清理之后
咱还是看看几个方法,对这俩指针理解理解,ByteBuf 组件用处还是很广的。
下面的 readInt 方法源码中读取 ByteBuf 中的前四个字节,让 readerIndex 指针向前移动四。
@Override
public int readInt() {
checkReadableBytes0(4);
int v = _getInt(readerIndex);
readerIndex += 4;
return v;
}
下面的 writeByte 方法源码中写入了 value 值,花费一个字节,让writerIndex 向前移一位。
@Override
public ByteBuf writeByte(int value) {
ensureWritable0(1);
_setByte(writerIndex++, value);
return this;
}
下面的 readableBytes 方法源码中返回可读区域的字节长度
@Override
public int readableBytes() {
return writerIndex - readerIndex;
}
下面的 markReaderIndex 方法源码中用一个标志变量标记一下读指针位置,好在下次需要是使用 resetReaderIndex 方法进行 readerIndex 读指针重置位置。
@Override
public ByteBuf markReaderIndex() {
markedReaderIndex = readerIndex;
return this;
}
@Override
public ByteBuf resetReaderIndex() {
readerIndex(markedReaderIndex);
return this;
}
Netty 解决半包、粘包问题
当使用 TCP 协议进行数据传输的时候,数据通常被分割成小块进行发送,这些小块被称为数据包,由于TCP协议的工作方式是流水线的形式,可能会出现半包和粘包,它俩是 TCP 协议中常见的问题,由于数据的分段和合并导致接收方无法准确识别数据包的边界。一般通过在应用层进行处理和解析,可以有效地解决这些问题。
怎样的流水线?咱有请 ChatGPT 帮小编回答:
假设有两个人,人A和人B,他们之间要进行数据传输。这个过程可以分为发送方和接收方两个角色。
发送方(人A):
- 发送方将要传输的数据拆分成一些小的数据块,称为数据包。
- 发送方将这些数据包按顺序放入发送缓冲区。
- 发送方通过网络将数据包发送给接收方。
接收方(人B):
- 接收方在接收缓冲区中接收到数据包。
- 接收方将接收到的数据包按顺序放入接收缓冲区。
- 接收方从接收缓冲区读取数据包,并将它们组装成完整的数据。
在这个过程中,TCP协议使用了流水线的思想来提高效率和可靠性。
发送方的流水线:
- 应用程序将数据传递给发送方。
- 发送方将数据拆分成数据包并放入发送缓冲区。
- 发送方通过网络将数据包发送给接收方。
- 发送方等待接收方的确认信息。
接收方的流水线:
- 接收方从网络接收数据包。
- 接收方将接收到的数据包放入接收缓冲区。
- 接收方从接收缓冲区读取数据包,并将它们组装成完整的数据。
- 接收方向发送方发送确认信息,告知发送方已正确接收到数据。
通过这种流水线的方式,TCP协议可以高效地处理数据的分段和组装,确保数据的可靠传输。发送方和接收方之间的通信是有序的,每个数据包都经过确认和校验,以确保数据的准确性和完整性。
- 半包(拆包)问题:半包问题指的是接收方无法一次完整地接收到发送方发送的一个完整的数据包。就是说接收方只能分批次接收缓存中的数据。本来五次可以接受完的数据,却7次才收到。
- 粘包问题:粘包问题是接收方接收到的数据包粘到了一起,多个数据包被合并成一个大的数据包。就是说服务端可以接受多个小数据包,然后一下接收了多个,导致本来十次接收完的接收了7次就接受完了。
具体的Netty如何解决这个问题的三种方案,看下面的 B 站讲解视频(我觉得讲的挺好的)
Netty 解决TCP的粘包拆包问题
这里只阐述第三种解决方案:
- 自定义包结构体:就是每次发送数据的同时,将数据的长度也带上,但是存放数据长度的位置一定要定好,否则服务端无法识别哪个字节代表的是数据长度。
-
自定义解码器(继承ByteToMessageDecoder实现decode方法)
/** * 通过对入站消息进行译码处理TCP拆包粘包问题 */ public class SolveMessageSafetyHandler extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { if(in.readableBytes()<4){ System.out.println("无数据长度"); return; } int contentLen = in.readInt(); if(contentLen>in.readableBytes()){ System.out.println("可读数据与数据长度不匹配,重新等待数据传输!"); in.resetReaderIndex();// 重置readerIndex,也就是readerIndex-4 return; } byte[] res = new byte[contentLen]; in.readBytes(res); out.add(new String(res));// 传向下一个handler in.markReaderIndex(); // 标记一下,下次读好直接读 } }
-
向服务器中的 ChannelPipeline 中配置这个 ChannelHandler。
@Override public void initChannel(SocketChannel ch) throws Exception { Charset gbk = Charset.forName("GBK"); ch.pipeline().addLast(new SolveMessageSafetyHandler()); // ch.pipeline().addLast(new FixedLengthFrameDecoder(23)); // ch.pipeline().addLast("decoder",new StringDecoder(gbk)); ch.pipeline().addLast("encoder",new StringEncoder(gbk)); ch.pipeline().addLast(new DiscardServerHandler());// 将handler封装到Pipeline中 }
这里需要注意一个问题,Netty 中一个 ChannelPipeline 只允许有一个 ByteToMessageDecoder 处理器。
-
测试(运行一py脚本,本来是会出现粘包问题,代码会运行结果如下)
import socket import uuid s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.connect(('127.0.0.1', 8888)) string = "I Love You, You know?哈" body = bytes(string, 'utf-8') bodyLen = len(body) data_len = bodyLen.to_bytes(4) for i in range(100): print(i) s.sendall(data_len + body)
结果没有任何问题~
Netty 实现心跳检查机制
使用心跳机制去维护客户端和服务端之间的连续,其实大多数的工具实现都一样。比如Eureka注册中心是定期去检测服务是否超时没有与其联系,然后执行去除服务操作。而 Netty 中是不断地检测中的,且是以事件的形式处理超时后的结果。
简单聊聊这种机制的实现就是(以读操作为例):先定义需要在多少时间内服务器需读一次-》服务/客户端上线即初始化对应的数据(比如上一次读的时间)-》在没读的过程中也不断地判断有没有超时,如果超时了就让对应的 ChannelHandler 去处理用户时间触发的业务。如果没超时继续监听。-》如果执行了读操作,那就更新超时时间、更新上一次读的时间等等,然后继续监听。
这篇博客总结的还行,推荐给大家。
Netty 如何通过心跳检测机制实现空闲自动断开
对应的事件状态有哪些?(读空闲状态、写空闲状态、读和写都空闲的状态)
public enum IdleState {
/**
* No data was received for a while.
*/
READER_IDLE,
/**
* No data was sent for a while.
*/
WRITER_IDLE,
/**
* No data was either received or sent for a while.
*/
ALL_IDLE
}
继承 ChannelInboundHandlerAdapter
去重写 userEventTriggered
去处理用户事件触发。如下面代码示例(比如下面设置如果读超时超过三次就把客户端和服务端给断开。):
public class HeartBeatHandler extends ChannelInboundHandlerAdapter {
int readTimeOut = 0;
/**
* 触发了某某时间会回调这个方法
* @param ctx
* @param evt
* @throws Exception
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
IdleStateEvent event = (IdleStateEvent) evt;
if(event.state() == IdleState.READER_IDLE){
readTimeOut++;
}
if(readTimeOut >= 3){
System.out.println("读闲置超时三次,关闭连接");
ctx.channel().close();
}
}
}
然后记得在 ChannelPipeline 中配置一下这个 ChannelHandler-》HeartBeatHandler。
public class DiscardServer {
private int port;
public DiscardServer(int port){
this.port = port;
}
public void run() throws Exception{
EventLoopGroup bossGroup = new NioEventLoopGroup(1); // 线程池
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap(); // (2)
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // (3)
.childHandler(new ChannelInitializer<SocketChannel>() { // (4)
@Override
public void initChannel(SocketChannel ch) throws Exception {
// 参数1:读闲置、参数2:写闲置、参数3:全超时
// 读闲置设置的时间超过的话会抛出读事件,
// 写-》写事件,
// 读写都闲置超时后设置的时间的话则抛出全超时时间
ch.pipeline().addLast(new IdleStateHandler(2,2,5));
ch.pipeline().addLast(new HeartBeatHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128) // (5)
.childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
// Bind and start to accept incoming connections.
System.out.println("tcp start success");
ChannelFuture f = b.bind(port).sync(); // (7)
// Wait until the server socket is closed.
// In this example, this does not happen, but you can do that to gracefully
// shut down your server.
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}