Netty:核心实现

核心概念

Netty 核心实现包含下面几种:

详细分析

EventLoop

EventLoop 有不同的实现,如 NioEventLoop,EpollEventLoop 等,不同的实现对应不同的底层系统调用,如 NioEventLoop 基于 select 系统调用, 而后者基于 epoll。下面以 NioEventLoop 为例说明 EventLoop 核心元素与接口实现。

NioEventLoop 核心属性为 Selector,即基于 select 系统调用的多路复用器。JDK 的 nio 包中已经提供了实现。 核心方法包含两个:

EventLoopGroup

其常用核心属性有两个。

    public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
        this(nThreads, threadFactory, SelectorProvider.provider());
    }

分别是 nThreads 和 threadFactory.

nThreads 表示当前 EventLoopGroups 会创建的 EventLoop 的个数,因为 EventLoop 都是单线程(SingleThreadEventLoop子类)的,估计因此 此参数不叫 nEventLoop 而叫 nThreads。

threadFactory 为创建 EventLoop 线程的线程工厂,一般用来定制线程名字。

ChannelPipeline

在 EventLoop 部分内容已经说过,当 channel 事件来临后,会交给 channel 的 pipeline 处理,下面看下 pipeline 的内容。

下图展示的是一个 I/O 事件如何在 ChannelPipeline 中流转,被 ChannelHandler 处理。

                                                I/O Request
                                            via {@link Channel} or
                                        {@link ChannelHandlerContext}
                                                      |
  +---------------------------------------------------+---------------+
  |                           ChannelPipeline         |               |
  |                                                  \|/              |
  |    +---------------------+            +-----------+----------+    |
  |    | Inbound Handler  N  |            | Outbound Handler  1  |    |
  |    +----------+----------+            +-----------+----------+    |
  |              /|\                                  |               |
  |               |                                  \|/              |
  |    +----------+----------+            +-----------+----------+    |
  |    | Inbound Handler N-1 |            | Outbound Handler  2  |    |
  |    +----------+----------+            +-----------+----------+    |
  |              /|\                                  .               |
  |               .                                   .               |
  | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
  |        [ method call]                       [method call]         |
  |               .                                   .               |
  |               .                                  \|/              |
  |    +----------+----------+            +-----------+----------+    |
  |    | Inbound Handler  2  |            | Outbound Handler M-1 |    |
  |    +----------+----------+            +-----------+----------+    |
  |              /|\                                  |               |
  |               |                                  \|/              |
  |    +----------+----------+            +-----------+----------+    |
  |    | Inbound Handler  1  |            | Outbound Handler  M  |    |
  |    +----------+----------+            +-----------+----------+    |
  |              /|\                                  |               |
  +---------------+-----------------------------------+---------------+
                  |                                  \|/
  +---------------+-----------------------------------+---------------+
  |               |                                   |               |
  |       [ Socket.read() ]                    [ Socket.write() ]     |
  |                                                                   |
  |  Netty Internal I/O Threads (Transport Implementation)            |
  +-------------------------------------------------------------------+

围绕 Inbound/Outbound 两个概念,有 Inbound/Outbound 事件、Inbound/Outbound 传播方法、Inbound/Outbound 处理器这几个概念。 其中 Inbound/Outbound 事件由传播方法触发,并经过处理器处理。

其中 Inbound event 传播方法如下,即以 fire 开头的方法。在当前的 ChannelHandlerContext 调用此方法,会将时间传递给下一个 Handler。

其中 outbound event 传播方法如下,非 fire 开头的方法。同样的在当前的 ChannelHandlerContext 调用此方法,会将时间传递给下一个 Handler。

ChannelPipeline 的默认实现是 DefaultChannelPipeline,其会在你配置的 ChannelHandler 之前增加 HeadHandler,在之后增加 TailHandler。 二者会自动的增加一些操作,如你并不用在你的 ChannelHandler 中实现真正调用 socket 的write 发送数据,这个在 HeadHandler 中已经默认 为你做了,可见 HeadHandler(HeadContext)的 write 方法的实现,岂会调用 channel 的 unsafe 代理发送数据。

ByteBuf

ByteBuf 是 Netty 的I/O byte 数据 buffer 实现,其没有直接使用 JDK nio 下的 ByteBuffer。 ByteBuf 较 JDK 的 ByteBuffer 的优势是:

ByteBuf 使用了读写双指针的实现,下面是一个 Buffer 的双指针分割效果。


+-------------------+------------------+------------------+
| discardable bytes |  readable bytes  |  writable bytes  |
|                   |     (CONTENT)    |                  |
+-------------------+------------------+------------------+
|                   |                  |                  |
0      <=      readerIndex   <=   writerIndex    <=    capacity

ByteBuf 核心 API:

Derived buffers API 包含:

我对上面这几个方法的形容虽然是拷贝,但是这几个方法并没有实际意义上去复制一个新的buffer出来,它和原buffer是共享数据的。 所以说调用这些方法消耗是很低的,并没有开辟新的空间去存储,但是修改后会影响原buffer。这种方法也就是咱们俗称的浅拷贝。 要想进行深拷贝,这里可以调用copy()和copy(index,length)方法,使用方法和上面介绍的一致,但是会进行内存复制工作,效率很低。

ByteBuf 的创建尽量使用工厂方法 Unpooled, 而不是 new 出来。

ChannelFuture

如前所述,Channel I/O 操作的异步结果。Netty 中所有的 I/O 操作都是异步的,其操作异步结果便是 ChannelFuture。

从 ChannelFuture 的 API 中可以判断对应的操作是否已经完成,是否成功等。可见。

                                     +---------------------------+
                                     | Completed successfully    |
                                     +---------------------------+
                                +---->      isDone() = true      |
+--------------------------+    |    |   isSuccess() = true      |
|        Uncompleted       |    |    +===========================+
+--------------------------+    |    | Completed with failure    |
|      isDone() = false    |    |    +---------------------------+
|   isSuccess() = false    |----+---->      isDone() = true      |
| isCancelled() = false    |    |    |       cause() = non-null  |
|       cause() = null     |    |    +===========================+
+--------------------------+    |    | Completed by cancellation |
                                |    +---------------------------+
                                +---->      isDone() = true      |
                                     | isCancelled() = true      |
                                     +---------------------------+

也可以向 ChannelFuture 中注册监听器GenericFutureListener来达到结果被动通知的效果。

总结

如上。