作者:星巴刻
前文《 NioEventLoop 的职责》从外部的角度阐述了 NioEventLoop 的首要责任:为注册在 NioEventLoop 上的 channels 服务,发现 channels 上的 I/O 事件,然后将事件转交 channel 流水线 pipeline 处理。
本文是它的后续,剖析 NioEventLoop 的内部逻辑以及周边的一些代码,阐述 NioEventLoop 如何履行上述责任。
一、NioEventLoop 的继承体系
下图清晰地呈现了 NioEventLoop 的继承体系:
图中呈现了 NioEventLoop 类的继承层级,虽然没有呈现它实现了哪些接口,不过不妨碍对这个图进行一些解读:
1、NioEventLoop 可以被追溯到 java.util.concurrent.ExecutorService 接口。这个信息表明 NioEventLoop 能够执行 Runnable 任务,从而保证了这些要执行的任务是在 NioEventLoop 的线程中执行,而非外部线程来执行。NioEventLoop 内置的一些 Runnable 任务包括了对 channel 的 register、系统缓冲区满后而推迟的 flush 任务等。
2、NioEventLoop 继承于 io.netty.util.concurrent.SingleThreadEventExecutor。这个信息表明,NioEventLoop 是单线程的。根据 SingleThreadEventExecutor 的说明,提交到 SingleThreadEventExecutor 的任务,将按照提交的先后顺序执行。
需要注意的是,开发不要随便用 NioEventLoop 执行长任务以免造成 I/O 阻塞,给 NioEventLoop 履行其首要职责添堵。更不能在 NioEventLoop 执行含有类似 Thread.sleep()、Object.wait()、ReentrantLock.lock()、CountDownLatch.await() 等等之类的程序,尽量避免使用 synchronized 同步锁,这些都可能导致 NioEventLoop 线程中断(线程这口气要是断了,I/O 的命就没了)。
二、SingleThreadEventExecutor 简述
io.netty.util.concurrent.SingleThreadEventExecutor 内部包含了一个任务 FIFO 队列,用于存储提交到该类的 Runnable 任务。它也创建了一个定制的单线程对象,规定了线程执行之后结束之前的一些处理,比如把未执行完成的任务执行完毕、关闭 NioEventLoop 的 Selector 等。它提供了addTask、offerTask、removeTask、runAllTasks等仅供子类可以调用的方法。但没有具体实现线程具体要执行的内容,它空出了 run 方法让子类来实现,所以 SingleThreadEventExecutor 类本身默认不会从它的任务队列中取出任务进行执行:
protected abstract void run();
作为子类的 NioEventLoop 将实现这个方法,它也就成了 NioEventLoop 的重要入口方法。NioEventLoop 的 run 方法实现后,将被 SingleThreadEventExecutor 里创建的线程执行, NioEventLoop? 所负的对 channels 的职责,将在 run 方法中实现。
三、NioEventLoop 的创建
一般,应用程序开发时并不会直接创建 NioEventLoop,而是通过创建 NioEventLoopGroup 提供给 Netty 的 Bootstrap 或 ServerBootstrap。所以 NioEventLoop 并没有 public 的构建方法,它的创建必须由与它同一个 package 下的 NioEventLoopGroup 负责。简单地说,NioEventLoopGroup 可以认为是 NioEventLoop 的工厂。NioEventLoopGroup 的 next() 方法是提供 NioEventLoop 的工厂方法,用于返回一个提前创建好的 NioEventLoop。NioEventLoopGroup 提供了多个构造方法,构造 NioEventLoopGroup 时,可以指定 nThreads 参数表示这个 NioEventLoopGroup 要创建多少个 NioEventLoop;如果不指定 nThreads或指定为 0,则默认创建 2 倍 CPU 核数的 NioEventLoop。NioEventLoop 的获取的示例代码如下:
NioEventLoopGroup gorup = new NioEventLoopGroup(4);
NioEventLoop loop = group.next();
NioEventLoop 被创建时,会打开一个自己独有的 java.nio.channels.Selector。Selector 是 Java Nio 的选择器,NioEventLoop 之所以能够发现 channels 的 I/O 事件,正是使用了它。假如我是一个服务端程序,有一个 NioEventLoop 作为 parent, 有 16 个 NioEventLoop 作为 child,那么这个程序内部将由 1 + 16 = 17 的 NioEventLoop,每个 NioEventLoop 分别含有 1 个自己的 Selector,因此这个程序内部总共将含有 17 个 Selector。
四、将 Channel 注册到 NioEventLoop
为了侦听客户端连接的到来,需要将创建的 NioServerSocketChannel 注册到作为 parent 的 NioEventLoop 上;为了发现并处理客户端连接的可读可写事件,需要将表示客户端连接的 NioSocketChannel 注册到分配给它的、充当 child 角色的某个 NioEventLoop 上。这里要注意的是,充当 child 角色的 NioEventLoop 在程序里有多个,一个 NioEventLoop 可以负责多个客户端连接,但是每一个客户端连接只能被一个 NioEventLoop 负责。
NioEventLoop 的父类 SingleThreadEventLoop 提供了如下注册方法,可以用于注册 NioServerSocketChannel 或 NioSocketChannel:
以上代码片段表示,SingleThreadEventLoop.register(channel, promiss) 方法将注册调用转交给 channel 的 unsafe 对象的 register 方法完成,也就是说实际由 NioServerSocketChannel 或 NioSocketChannel 的 unsafe.register() 方法来具体完成注册工作。这两个类的 unsafe 对象刚好都在它们的父类? AbstractChanel 中定义和创建。在 AbstractUnsafe.register() 方法中,首先会判断当前执行 register 的线程是在负责该 channel 的 NioEventLoop 的内部线程中,如果是则继续,如果不是则会把接下来的工作封装为任务,提交到 NioEventLoop 的任务队列中,从而保证最终执行 register 操作的线程是由 NioEventLoop 的内部线程来执行。下面是 AbstractUnsafe 中关于注册的核心代码,如前所述,这个代码将由 NioEventLoop 的内部线程来执行:
register0(promiss) 方法首先先调用 channel 的 doRegister() 方法,在 AbstractChanel 中,doRegister() 是一个抽象方法,作为 NioServerSocketChannel、NioSocketChannel 的父类的 AbstractNioChannel 类实现了 doRegister() 方法,它的实现体现了这里的本质:取出 NioServerSocketChannel、NioSocketChannel 底层的 java.nio.channels.SelectableChannel 对象,调用它提供 register 方法,将 SelectableChannel 注册到负责 NioServerSocketChannel、NioSocketChannel 的 NioEventLoop 所创建的 selector 上:
有点意思的是,调用 javaChannel().register() 方法,传入的 interestOps(第 2 个参数)此时并不指定任何值,只是简单地传入 0,这说明 Netty 对 channel I/O 事件兴趣的注册并不是在注册时设置的。
回到 "code: AbstractUnsafe.register0",程序调用完 channel.doRegister 后,关联的 pipleline 有 2 个重要方法被调用到: pipleline.fireChannelRegistered 和 pipleline.fireChannelActive 。从而将 channel 的注册事件、就绪事件传递到 pipeline 中,应用程序可以可以处理了。
此处也可以看出,channel 第一次的 register 状态将先于 active 状态送到 pipleline。但如果 channel 后续取消注册后的再次注册,register 会继续发送给 pipeline,而 active 则不会。
pipleline.fireChannelRegistered 发送后,注册到 pipeline 的 ChannelInboutHandler 的 channelRegistered(ctx)将被调用。
但 pipleline.fireChannelActive 发送后,注册到 pipeline 的 ChannelInboutHandler,不仅仅 channelActive(ctx) 会被调用到,默认情况下还会修改该 channel 的 selectionKey 的 interestOps值,使得当有信息到来时 channelRead(ctx) 也会被调用到(除非 channel.config.isAutoConfig 配置为 false 了); 这个逻辑由 DefaultChannelPipeline 实现:
上图中 readIfIsAutoRead() 方法调用了 channel.read() 方法,通过一系列迂回,最终将调用到 AbstractNioChannel 的 doBeginRead 方法,在这个方法中修改 selectionKey 的 interestOps,增加 readInterstOp 操作(对 NioServerSocketChannel 而言 readInterstOp 指的是 OP_ACCEPT,对 NioSocketChannel 而言,readInterstOp 指的是 OP_READ),表明程序可以开始处理读请求:
五、通过 Selector 发现 I/O 事件,并转交给 pipeline 处理
假设此时 NioEventLoop 已经被注册了 1 个或多个 Channel 了。现在回到 NioEventLoop 实现 SingleThreadEventExecutor 要求的 run 方法的地方,run 方法大致的流程如下:
1. 通过 selector.select 或 selectNow 方法从 Selector 中发现当前发生的 I/O 事件.
select 方法是阻塞的,selectNow 是非阻塞的。NioEventLoop 内部会进行决策,什么时候使用阻塞,什么时候使用非阻塞。简单地,如果 NioEventLoop 中当前的任务队列中有任务时候,会使用 selectNow 非阻塞;如果当前任务队列中没有任务,则会优先进行一次非阻塞 selectNow,如果没有新的 I/O 事件,则再使用阻塞的 select。
2. 调用 selector.select 阻塞方法时候,会传入一个超时时间
如果不传入一个超时时间,selector.select 会一直阻塞下去直到由 I/O 事件的到来。如果真这样,那这个期间线程将被挂起,注册到 NioEventLoop 中的定时任务(比如Idle检测任务)就得不到执行。所以 Netty 调用阻塞 select 方法之前,会根据当前定时任务情况,计算出一个可以等待 I/O 事件的最迟时间,以便恢复线程。
当有 I/O 事件处理时,Netty 处理这些事件的顺序是一个一个 channel 处理,对每个 channel 的事件处理顺序是,先处理 OP_CONNECT、后处理 OP_WRITE,最后处理 OP_READ 和 OP_ACCEPT。这里有 2 个细节:
一,优先处理 OP_CONNECT 事件。优先处理 OP_CONNECT 事件是因为 Java Nio 要求,对于客户端新建立的连接,必须先完成对 channel 的 finishConnect 调用后,才能使用该 channel,否则会抛出 NotYetConnectedException 异常;
二,把处理 OP_WRITE 的优先级提高到比 OP_READ、OP_ACCEPT前,目的是为了希望先处理 OP_WRITE 腾出一些堆内存空间,使得有更多的空闲内容能够为OP_READ、OP_ACCEPT 服务。
3. OP_CONNET 事件的处理
OP_CONNET 事件在客户端程序中出现,代表底层已经和远程建立好连接了。根据 Java Nio 的要求,此时需要调用 java.nio.channels.SocketChannel 的 finishConnect() 方法,完成连接的建立。NioSocketChannel 的 doFinishConnect 完成这个工作。
对应用程序而言,此时连接已经建立,就要进入 active 状态。pipeline 的 channelActive(ctx) 要被调用到。AbstractNioChannel.AbstractNioUnsafe 的 fulfillConnectPromise 方法完成这个工作。
最后,对于一个连接而言,OP_CONNECT 只会发生一次,所以一旦接收到 OP_CONNECT 事件后,就可以取消对这类事件的关注了,通过 selectionKey.interestOps(k.interestOps() &~SelectionKey.OP_CONNECT) 完成这个工作。
4. OP_WRITE 事件的处理
OP_WRITE 事件表示当前 channel 的系统缓冲区有空,应用程序可以往系统缓冲区写入信息。Netty 对 OP_WRITE 事件的响应就是调用 channel.unsafe().forceFlush()。
一般情况下,系统缓冲区都不会满,除非是瞬间发送大量的信息或者客户端接收速度相对较慢。因此,Netty 默认情况下,不会向 selectionKey 注册对 OP_WRITE 感兴趣,否则每次调用 selector.select 将因为具备 OP_WRITE 条件立即返回而不会阻塞,导致大量 CPU 被浪费。
应用程序调用 channel.write(msg),并不会把信息直接写入系统(发送)缓冲区,而是先把要写入的信息写入专属于 channel 的 ChannelOutboundBuffer(一个 channel 实例有一个专属的 ChannelOutboundBuffer 实例)。只有当调用 channel.flush() 或 ctx.flush() 时,Netty 才会真正尝试把信息写入系统缓冲区。在 NioSocketChannel.doWrite(ChannelOutboundBuffer) 方法尝试将 ChannelOutboundBuffer 中的信息写入系统缓冲区时,如果系统缓冲区已满,Netty 无法写入任何一个字节到系统缓冲区,此时 doWrite 将调用 incompleteWrite(true) 才向该 channel 的 selectionKey 注册对 OP_WRITE 感兴趣。需要注意 Netty 这里有一个小技巧,即使无法全部写完,但只要 doWrite 能够往系统缓冲区写入哪怕一个字节,Netty 都不会向 selectionKey 注册 OP_WRITE,而是调用 incompleteWrite(false) 往任务队列的增加一个新的 flush() 任务,过一会再尝试进行 flush() 。这个小技巧体现了 Netty 对细节的把握:或许一会系统缓冲区就好了呢?
5、OP_READ、OP_ACCEPT 事件的处理
OP_READ 事件表示,当前系统缓冲区有数据需要应用程序来读?。籓P_ACCEPT 事件表示,当前有新的客户端建立到服务端来了。这两个事件分别对应 NioSocketChannel 和 NioServerScoketChannel。Netty 把这个 I/O 事件抽象为 channel 有可读的事件。因此,Netty 直接调用各自 channel.unsafe 的 read() 方法直接处理:
对于 NioServerSocketChannel,channel.unsafe 由 AbstractNioMessageChannel 的NioMessageUnsafe 实现。unsafe.read() 方法将调用到 NioServerSocketChannel的 doReadMessages() 方法,从 Java Nio 的 Channel 中 accept 出一个 SocketChannel:
对于 NioSocketChannel, channel.unsafe 由 AbstractNioByteChannel 中的 NioByteUnsafe 实现,核心代码是最终调用到 NioSocketChannel 的 doReadBytes(ByteBuf) 方法,从 Java Nio 的 channel 中读取字节信息到 Netty 的 ByteBuf 中,然后调用 pipeline 的 fireChannelRead(ByteBuf):
经过多次调用 doReadBytes(ByteBuf) 以及 pipeline.fireChannelRead(ByteBuf),系统缓冲区的可读字节信息都读取完毕后 ,unsafe 的 read 方法会调用一次 pipeline.fireChannelReadComplete()。根据这个信息,这里有一个小技巧,尽量不要在 ChannelHandler 的 channelRead(ctx) 方法中调用 ctx 的 flush,而是在 pipeline.channelReadComplete(ctx) 再调用 flush,以减少过多的 flush 调用。
六、小花絮:SingleThreadEventExecutor 内部线程并不自动启动
SingleThreadEventExecutor 内部的线程不会自动启动,只有第一次接收外部任务时候才会自动启动!
为啥不默认启动呢,反正最终都要启动为何这么羞涩?这或许和 NioEventLoopGroup 构建时默认就把所有的 NioEventLoop 构建完毕有关,因为是提前构建但不一定实际使用,所以就先不 startThread 了。
下面这个代码是玩了一下一些特性的时候,需要特殊处理下:自己创建 Java Nio 的 Channel,然后注册到 Netty 的 NioEventLoop 中。需要自己写一个画红线部分的代码提交一个空方法来启动 NioEventLoop 内部的线程!
七、总结
NioEventLoop 比较繁杂,但它却是 Netty 内部的重要控制类,需要很好地弄懂它。
NioEventLoop 无法自己创建,必须由 NioEventLoopGroup 创建。多个 AbstractNioChannel 可以注册到一个 NioEventLoop 上。NioEventLoop 内部有一个专属的 Selector 对象以及一个线程。这个线程从 Selector 中 select 出感兴趣的 I/O 事件,调用 channel 的 unsafe 对象来处理。unsafe 负责按照 Java Nio 规范处理 I/O 事件,同时调用 channel.pipeline 的相关方法,使得应用程序提供的 ChannelHandler 能够得到回调。
2017-11-19