Netty框架

NIO框架存在的问题

客户端关闭会导致服务端空轮询

当客户端主动与服务端断开连接时,会导致READ事件一直被触发,即selector.select()会直接通过,并且是可读的状态,但是实际上读到的数据是空的。

image-20230912111813493

我在第一次发现这个问题时,也是相同的现象。但是之后复现,却变成了如下这个不同的报错,两者的原因应该是一致的,但是呈现的效果却不同。无法读到空数据,直接会报错。导致不同结果的根本原因,可能要以后再探究了。

image-20230912112056768


粘包、拆包问题

操作系统通过TCP协议发送数据时,也会将数据先存放在缓冲区,至于什么时候真正地发出这些数据,由TCP本身决定,我们无法控制。也就是说,比如我们发送两个数据包(P1、P2),理想情况下,这两个包应该依次到达服务端,并且服务端正确读取两次数据,但是由于TCP传输数据的时间、顺序不可控,可能出现如下情况:

  1. P1、P2被合在一起发送给了服务端(粘包)
  2. 完整的P1和P2的前半部分合在一起发送给了服务端(拆包)
  3. P1的前半部分被单独作为一个部分发给了服务端,后半部分和P2一起发送给了服务端(拆包)

粘包、拆包问题,也有一些常见的解决方案:

  1. 消息定长。只接收、发送固定大小的消息长度,如果不够,空位补空格,只有接收了固定的字节长度后,才会作为一个完整的数据包进行处理。
  2. 在每个包的末尾使用固定的分隔符
  3. 将消息分为头部和消息体,在头部中保存当前整个数据包的长度

走进Netty框架

Netty是由JBOSS提供的一个开源Java网络编程框架,主要对Java的NIO包进行了再次封装,提供了更强大、稳定的功能和易于使用的api。

微服务之间的远程调用,也可以使用Netty来完成,比如Dubbo的RPC框架。包括SpringWebFlux框架,也抛弃了内嵌Tomcat,而使用Netty作为通信框架。

1
2
3
4
5
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.97.Final</version>
</dependency>

ByteBuf介绍

Netty没有使用NIO中提供了ByteBuffer来进行数据装载,而是自定义了一个ByteBuf类。该类与NIO中的ByteBuffer相比有如下几个优势:

  1. 写操作完成后,无需flip()翻转
  2. 具有比ByteBuffer更快的响应速度
  3. 可以动态扩容

读写操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public static void main(String[] args) {
// 创建一个初始容量为10的ByteBuf缓冲区,Unpooled是用于快速生成ByteBuf的工具类
ByteBuf buf = Unpooled.buffer(10);
LOGGER.info("初始状态:{}", buf.array());
// 写入一个Int数据
buf.writeInt(892323985);
LOGGER.info("写入Int后:{}", buf.array());

// 无需翻转,直接读取一个short数据
buf.readShort();
LOGGER.info("读出short后:{}", buf.array());

// 丢弃操作,将当前的可读部分内容丢到最前面,并且读写指针向前移动丢弃的距离
buf.discardReadBytes();
LOGGER.info("丢弃之后:{}", buf.array());

// 清空操作,读写指针都归零
buf.clear();
LOGGER.info("清空之后:{}", buf.array());
}

划分操作

1
2
3
4
5
6
7
8
9
10
11
12
13
private static void slice() {
// 和ByteBuffer一样,将一个byte[]直接包装进缓冲区。但是将写指针设置到了最后
ByteBuf buf = Unpooled.wrappedBuffer("abcdefg".getBytes()); // Unpooled.copiedBuffer() 复制操作也能将数据拷贝进一个新的缓冲区

LOGGER.info("内容:{}", Arrays.toString(buf.array()));
// 读取一个字节
buf.readByte();
// 现在读指针位于1,然后进行划分
ByteBuf slice = buf.slice();

LOGGER.info("偏移地址:{}", slice.arrayOffset());
LOGGER.info("内容:{}", Arrays.toString(slice.array()));
}

动态扩容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private static void dynamicExpansion() {
// 通过指定最大容量,就无法动态扩容,如Unpooled.buffer(10, 10);
ByteBuf buf = Unpooled.buffer(10);
LOGGER.info("缓冲区容量:{} 字节", buf.capacity());

// 写一个字符串
String str = "陆离,混斗";
if (str.getBytes(StandardCharsets.UTF_8).length > buf.maxCapacity() - buf.writerIndex()) {
LOGGER.info("超出字符限制,不能写入");
} else {
buf.writeCharSequence("陆离,混斗", StandardCharsets.UTF_8);
}
LOGGER.info("缓冲区容量:{} 字节", buf.capacity());
}

非池化和池化 缓冲区生成器

上述代码创建缓冲区都是使用的Unpooled工具类,查看其代码,实际是通过UnpooledByteBufAllocator.DEFAULT这样一个非池化缓冲区生成器进行创建。

顾名思义,池化缓冲区利用了池化思想,将缓冲区通过设置内存池,进行内存块的复用,这样就不用频繁申请内存,避免了性能损失。

1
2
3
4
5
6
7
8
9
10
public static void main(String[] args) {
ByteBufAllocator allocator = PooledByteBufAllocator.DEFAULT;
ByteBuf buf = allocator.directBuffer(10); //申请一个容量为10的直接缓冲区
buf.writeChar('T'); //随便操作操作
System.out.println(buf.readChar());
buf.release(); //释放此缓冲区

ByteBuf buf2 = allocator.directBuffer(10); //重新再申请一个同样大小的直接缓冲区
System.out.println(buf2 == buf);
}

在使用完一个缓冲区之后,将其进行资源释放,当我们再次申请一个同样大小的缓冲区时,会直接得到之前已经申请好的缓冲区,所以,PooledByteBufAllocator实际上是将ByteBuf实例放入池中在进行复用。


Netty工作模型

Netty以主从Reactor多线程模型为基础,构建出了一套高效的工作模型。

image-20230912165110096
  • Netty抽象出两组线程池BossGroup和WorkerGroup,BossGroup专门负责接收客户端的连接,WorkerGroup专门负责读写。
  • 无论是BossGroup还是WorkerGroup,都是使用EventLoop(事件循环,不断地进行事件通知)来进行事件监听。整个Netty也是使用事件驱动来运作,比如客户端已经准备好读写、连接建立时,都会进行事件通知。EventLoopGroup就是BossGroup和WorkerGroup的具体实现。
  • 在BossGroup之后,会正常将SocketChannel绑定到WorkerGroup中的其中一个EventLoop上,进行后续的读写监听。

Channel详解

io.netty.channel.Channel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> {
ChannelId id(); //通道ID
EventLoop eventLoop(); //获取此通道所属的EventLoop,因为一个Channel在它的生命周期内只能注册到一个EventLoop中
Channel parent(); //Channel是具有层级关系的,这里是返回父Channel
ChannelConfig config();
boolean isOpen(); //通道当前的相关状态
boolean isRegistered();
boolean isActive();
ChannelMetadata metadata(); //通道相关信息
SocketAddress localAddress();
SocketAddress remoteAddress();
ChannelFuture closeFuture(); //关闭通道,但是会用到ChannelFuture
boolean isWritable();
long bytesBeforeUnwritable();
long bytesBeforeWritable();
Unsafe unsafe();
ChannelPipeline pipeline(); //流水线,之后也会说
ByteBufAllocator alloc(); //可以直接从Channel拿到ByteBufAllocator的实例,来分配ByteBuf
Channel read();
Channel flush(); //刷新,基操
}

Netty中的Channel主要特点:

  1. 所有的IO操作都是异步的

    方法调用了之后就直接返回,那么操作的结果如何获取?答案是通过ChannelFuture


ctx.close() 和 ctx.channel().close()

1. ctx.close()会触发当前Handler和当前Handler之前的close事件;

2. channel.close()所有Handler都会触发;

1
2
3
4
5
6
ch.pipeline().addLast(new InboundHandler1());  
ch.pipeline().addLast(new InboundHandler2());
ch.pipeline().addLast(new OutboundHandler1());
ch.pipeline().addLast(new OutboundHandler2());

链表中的顺序为head->in1->in2->out1->out2->tail

如果在InboundHandler2中执行的是ctx.channel().writeAndFlush,执行顺序是:

1
2
3
4
InboundHandler1 
InboundHandler2
OutboundHandler2
OutboundHandler1

如果在InboundHandler2中执行的是ctx.writeAndFlush,执行顺序是:

1
2
InboundHandler1 
InboundHandler2

这样的顺序对于write操作和close都是一样道理的。

Netty4中Handler的执行顺序以及ctx.close() 与 ctx.channel().close()的区别 - 简书 (jianshu.com)


Netty客户端重连

指数退避机制

可以理解为每次重连失败时,就把重连的时间设置为之前的指数级别。例如2秒、4秒、8秒。

Netty客户端中使用指数退避方式重连

客户端连接服务器时,调用 Bootstrap 的 connect 方法:

1
bootstrap.connect(host, port)

这个方法会返回 ChannelFuture ,ChannelFuture 实现了 netty 的 Future , 而Future 继承自 java.util.concurrent.Future ,这是异步操作的结果,因此 connect 方法是一个异步方法。我们可以通过调用addListener 添加监听器,监听是否连接成功。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private static void connect(Bootstrap bootstrap, String host, int port, int retry) {

bootstrap.connect(host, port).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
LOGGER.info("连接服务器成功!");
} else if (retry == 0) {
LOGGER.error("重连次数已用完,放弃连接!");
} else {
// 第几次重连
int order = (MAX_RETRY - retry) + 1;
// 本次重连的间隔
int delay = 1 << order;
LOGGER.error(new Date() + ": 连接失败,第" + order + "次重连……");
// bootstrap.config().group().schedule()实现定时任务逻辑
bootstrap.config().group().schedule(() -> connect(bootstrap, host, port, retry - 1), delay, TimeUnit.SECONDS);
}
}
});
}