剖析 Netty 內部網路實現原理

作者 | 張彥飛allen

來源 | 開發內功修煉

Netty 是一個在 Java 生態裡應用非常廣泛的的網路程式設計工具包,它在 2004 年誕生到現在依然是火的一塌糊塗,光在 github 上就有 30000 多個項目在用它。所以要想更好地掌握網路程式設計,我想就繞不開 Netty。所以今天我們就來分析分析 Netty 內部網路模組的工作原理。

一、Netty 用法

我們首先找一個 Netty 的例子,本篇文章整體都是圍繞這個例子來展開敘述的。我們下載 Netty 的原始碼,並在 examples 中找到 echo 這個 demo。同時,為了防止程式碼更新導致對本文敘述的影響,我們切到 4.1 分支上來。

# git checkout https://github.com/netty/netty.git

# git checkout -b 4.1

# cd example/src/main/java/io/netty/example/echo

在這個 demo 的 EchoServer 中,展示了使用 Netty 寫 Server 的經典用法。

public final class EchoServer {

public static void main(String[] args) throws Exception {

EventLoopGroup bossGroup = new NioEventLoopGroup(1);

EventLoopGroup workerGroup = new NioEventLoopGroup();

final EchoServerHandler serverHandler = new EchoServerHandler();

ServerBootstrap b = new ServerBootstrap();

b.group(bossGroup, workerGroup)

.channel(NioServerSocketChannel.class)

.option(ChannelOption.SO_BACKLOG, 100)

.handler(new LoggingHandler(LogLevel.INFO))

.childHandler(new ChannelInitializer() {

@Override

public void initChannel(SocketChannel ch) throws Exception {

ChannelPipeline p = ch.pipeline();

if (sslCtx != null) {

p.addLast(sslCtx.newHandler(ch.alloc()));

}

p.addLast(serverHandler);

}

});

// Start the server.

ChannelFuture f = b.bind(PORT).sync();

......

}

}

如果你是一個 Java 新手,或者乾脆像飛哥一樣沒用 Netty 寫過服務,相信上述程式碼基本是看不懂的。究其根本原因是相比 C/C++ ,Java 的封裝程度比較高。Java 語言本身的 JVM 中 NIO 對網路的封裝就已經遮蔽了很多底層的概念了,再加上 Netty 又封裝了一層,所以 Java 開發者常用的一些術語和概念和其它語言出入很大。

比如上面程式碼中的 Channel、NioEventLoopGroup 等都是其它語言中所沒見過的。不過你也不用感到害怕,因為這其中的每一個概念都是 socket、進程等底層概念穿了一身不同的衣服而已。接下來我們分別細了解一下這些概念。

1.1 NioEventLoopGroup

如果你沒接觸過 Netty,可以簡單把 NioEventLoopGroup 理解為一個執行緒池就可以。每一個 NioEventLoopGroup 內部包含一個或者多個 NioEventLoop。

其中 NioEventLoop 是對執行緒、epoll 等概念進行了一個集中的封裝。

首先,EventLoop 本身就是一個執行緒。為什麼這麼說,我們通過看 NioEventLoop 的繼承關係就能看出來。NioEventLoop 繼承於 SingleThreadEventLoop,而 SingleThreadEventLoop 又繼承於 SingleThreadEventExecutor。SingleThreadEventExecutor 實現了在 Netty 中對本地執行緒的抽象。

public abstract class SingleThreadEventExecutor extends ... {

private volatile Thread thread;

private final Queue taskQueue;

}

在 SingleThreadEventExecutor 中不但封裝了執行緒對象 Thread,而且還配置了一個任務隊列 taskQueue,用於其它執行緒向它來放置待處理的任務。

1.2 selector

另外 NioEventLoopEventLoop 以 selector 的名義封裝了 epoll(在 Linux 作業系統下)。

在 NioEventLoop 對象內部,會有 selector 成員定義。這其實就是封裝的 epoll 而來的。我們來看具體的封裝過程。以及 selectedKeys,這是從 selector 上發現的待處理的事件列表。

public final class NioEventLoop extends SingleThreadEventLoop{

// selector

private Selector selector;

private Selector unwrappedSelector;

// selector 上發現的各種待處理事件

private SelectedSelectionKeySet selectedKeys;

}

NioEventLoopGroup 在構造的時候,會調用 SelectorProvider#provider 來生成 provider,在默認情況下會調用 sun.nio.ch.DefaultSelectorProvider.create 來創建。

//file:java/nio/channels/spi/SelectorProvider.java

public abstract class SelectorProvider {

public static SelectorProvider provider() {

// 1. java.nio.channels.spi.SelectorProvider 屬性指定實現類

// 2. SPI 指定實現類

......

// 3. 默認實現,Windows 和 Linux 下不同

provider = sun.nio.ch.DefaultSelectorProvider.create();

return provider;

}

}

在 Linux 下,默認創建的 provider 使用的就是 epoll。

//file:sun/nio/ch/DefaultSelectorProvider.java

public class DefaultSelectorProvider {

public static SelectorProvider create() {

String osname = AccessController

.doPrivileged(new GetPropertyAction("os.name"));

if (osname.equals("Linux"))

return createProvider("sun.nio.ch.EPollSelectorProvider");

}

}

1.3 Channel

Channel 是 JavaNIO 裡的一個概念。大家把它理解成 socket,以及在 socket 之上的一系列操作方法的封裝就可以了。

Java 在 Channel 中把 connect、bind、read、write 等方法都以成員方法的形式給封裝起來了。

public interface Channel extends ... {

Channel read();

Channel flush();

......

interface Unsafe {

void bind(SocketAddress localAddress, ...);

void connect(SocketAddress remoteAddress, ...);

void write(Object msg, ...);

......

}

}

另外在 Java 中,習慣把 listen socket 叫做父 channel,客戶端握手請求到達以後創建出來的新連接叫做子 channel,方便區分。

1.4 Pipeline

在每個 Channel 對象的內部,除了封裝了 socket 以外,還都一個特殊的資料結構 DefaultChannelPipeline pipeline。在這個 pipeline 裡是各種時機裡註冊的 handler。

Channel 上的讀寫操作都會走到這個 DefaultChannelPipeline 中,當 channel 上完成 register、active、read、readComplete 等操作時,會觸發 pipeline 中的相應方法。

這個 ChannelPipeline 其實就是一個雙向連結串列,以及連結串列上的各式各樣的操作方法。

public interface ChannelPipeline {

ChannelPipeline addFirst(String name, ChannelHandler handler);

ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler);

ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler);

ChannelPipeline addLast(String name, ChannelHandler handler);

ChannelPipeline fireChannelRead(Object msg);

}

1.5 EchoServer 解讀

現在我們具備了對 Java、對 Netty 的初步理解以後,我們再會後來看一下開篇提到的 EchoServer 原始碼。

public final class EchoServer {

public static void main(String[] args) throws Exception {

EventLoopGroup bossGroup = new NioEventLoopGroup(1);

EventLoopGroup workerGroup = new NioEventLoopGroup();

final EchoServerHandler serverHandler = new EchoServerHandler();

ServerBootstrap b = new ServerBootstrap();

b.group(bossGroup, workerGroup)

.channel(NioServerSocketChannel.class)

.option(ChannelOption.SO_BACKLOG, 100)

.handler(new LoggingHandler(LogLevel.INFO))

.childHandler(new ChannelInitializer() {

@Override

public void initChannel(SocketChannel ch) throws Exception {

ChannelPipeline p = ch.pipeline();

if (sslCtx != null) {

p.addLast(sslCtx.newHandler(ch.alloc()));

}

p.addLast(serverHandler);

}

});

// Start the server.

ChannelFuture f = b.bind(PORT).sync();

......

}

}

在該程式碼一開頭,bossGroup = new NioEventLoopGroup(1)這一行是創建了一個只有一個執行緒的執行緒池。workerGroup = new NioEventLoopGroup又創建了 worker 執行緒池,沒有指定數量,Netty 內部會根據當前機器的 CPU 核數來靈活決定。

ServerBootstrap 這是一個腳手架類,是為了讓我們寫起伺服器程序來更方便一些。

b.group(bossGroup, workerGroup)這一行是將兩個執行緒池傳入,第一個作為 boss 只處理 accept 接收新的客戶端連接請求。第二個參數作為 worker 執行緒池,來處理連接上的請求接收、處理以及結果發送發送。

我們注意下 childHandler是傳入了一個 ChannelInitializer,這是當有新的客戶端連接到達時會回調的一個方法。在這個方法內部,我們給這個新的 chaneel 的 pipeline 上添加了一個處理器 serverHandler,以便收到資料的時候執行該處理器進行請求處理。

上面的幾個方法都是定義,在b.bind方法中真正開始啟動服務,創建父 channel(listen socket),創建 boss 執行緒。當有新連接到達的時候 boss 執行緒再創建子 channel,為其 pipeline 添加處理器,並啟動 worker 執行緒來進行處理。

二、Netty bootstrap 參數構建

簡言之 bootstrap.group() .channel() .childHandler() .childOption() 就是在構建 Netty Server 的各種參數。

2.1 group 設置

ServerBootstrap 和其父類 AbstractBootstrap 內部分別定義了兩個 EventLoopGroup group 成員。父類 AbstractBootstrap 的 group 是用來處理 accpet 事件的,ServerBootstrap 下的 childGroup 用來處理其它所有的讀寫等事件。

group() 方法就是把 EventLoopGroup 參數設置到自己的成員上完事。其中如果調用 group() 只傳入了一個執行緒池,那麼將來本服務下的所有事件都由這個執行緒池來處理。詳情查看飛哥精簡後的原始碼。

//file:io/netty/bootstrap/ServerBootstrap.java

public class ServerBootstrap extends AbstractBootstrap {

//用來處理非 accept 以外的執行緒池

private volatile EventLoopGroup childGroup;

public ServerBootstrap group(EventLoopGroup group) {

return group(group, group);

}

public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {

super.group(parentGroup);

this.childGroup = ObjectUtil.checkNotNull(childGroup, "childGroup");

return this;

}

}

public abstract class AbstractBootstrap ... {

//用來處理 accept 的執行緒

volatile EventLoopGroup group;

public B group(EventLoopGroup group) {

this.group = group;

......

}

}

2.2 channel 設置

再看 ServerBootstrap#channel 方法 是用來定義一個工廠方法,將來需要創建 channel 的時候都調用該工廠進行創建。

//file:io/netty/bootstrap/ServerBootstrap.java

public class ServerBootstrap extends AbstractBootstrap {

public B channel(Class channelClass) {

return channelFactory(new ReflectiveChannelFactory(

ObjectUtil.checkNotNull(channelClass, "channelClass")

));

}

}

回頭看本文開頭 demo,.channel(NioServerSocketChannel.class)指的是將來需要創建 channel 的時候,創建 NioServerSocketChannel 這個類型的。

2.3 option 設置

再看 option 方法,只是設置到了 options 成員中而已

//file:io/netty/bootstrap/ServerBootstrap.java

public class ServerBootstrap extends AbstractBootstrap {

public B option(ChannelOption option, T value) {

ObjectUtil.checkNotNull(option, "option");

synchronized (options) {

if (value == null) {

options.remove(option);

} else {

options.put(option, value);

}

}

return self();

}

}

2.4 handler 方法

本文 demo 設置了兩處 handler,一處是 handler,另一處是 childHandler。他們都是分別設置到自己的成員上就完事,看原始碼。

//file:io/netty/bootstrap/ServerBootstrap.java

public class ServerBootstrap extends ...... {

public B handler(ChannelHandler handler) {

this.handler = ObjectUtil.checkNotNull(handler, "handler");

return self();

}

public ServerBootstrap childHandler(ChannelHandler childHandler) {

this.childHandler = ObjectUtil.checkNotNull(childHandler, "childHandler");

return this;

}

}

三、Netty bootstrap 啟動服務

ServerBootstrap 下的 bind 方法是服務啟動過程中非常重要的一個方法。創建父 channel(listen socket),創建 boss 執行緒,為 boss 執行緒綁定 Acceptor 處理器,調用系統調用 bind 進行綁定和監聽都是在這裡完成的。

先來直接看一下 bind 相關的入口原始碼。

//file:io/netty/bootstrap/ServerBootstrap.java

public class ServerBootstrap extends AbstractBootstrap ... {

......

}

//file:io/netty/bootstrap/AbstractBootstrap.java

public abstract class AbstractBootstrap ... {

public ChannelFuture bind(SocketAddress localAddress) {

validate();

return doBind(...);

}

private ChannelFuture doBind(final SocketAddress localAddress) {

//創建父 channel、初始化並且註冊

final ChannelFuture regFuture = initAndRegister();

final Channel channel = regFuture.channel();

......

//如果 Register 已經完成,則直接 doBind0

if (regFuture.isDone()) {

ChannelPromise promise = channel.newPromise();

doBind0(regFuture, channel, localAddress, promise);

return promise;

//否則就註冊一個 listener(回調),等 register 完成的時候調用

} else {

final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);

regFuture.addListener(new ChannelFutureListener() {

@Override

public void operationComplete(ChannelFuture future) throws Exception {

promise.registered();

doBind0(regFuture, channel, localAddress, promise);

}

return promise;

}

}

//創建 channel,對其初始化,並且 register(會創建 parent 執行緒)

final ChannelFuture initAndRegister() {

//3.1 創建父 channel(listen socket)

channel = channelFactory.newChannel();

//3.2 對父 channel(listen socket)進行初始化

init(channel);

//3.3 註冊並啟動 boss 執行緒

ChannelFuture regFuture = config().group().register(channel);

......

}

//3.4 真正的bind

private static void doBind0(...) {

channel.eventLoop().execute(new Runnable() {

@Override

public void run() {

channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);

......

}

});

}

}

在這個過程中,做了如下幾件重要的事情

  • 創建父 channel(listen socket)

  • 對父 channel(listen socket)進行初始化

  • register父 channel(listen socket)到主 group,並啟動主進程

  • 真正的 bind

接下來我們分開來看。

3.1 創建父 channel(listen socket)

在 initAndRegister() 方法中創建 channel(socket),它調用了 channelFactory.newChannel()。

public abstract class AbstractBootstrap

//創建 channel,對其初始化,並且 register(會創建 parent 執行緒)

final ChannelFuture initAndRegister() {

//3.1 創建 listen socket

channel = channelFactory.newChannel();

......

}

}

回想下2.2節的channel 方法,返回的是一個反射 ReflectiveChannelFactory。沒錯這裡的 newChannel 就是調用這個工廠方法來創建出來一個 NioServerSocketChannel 對象。

3.2 對父 channel(listen socket)進行初始化

在 initAndRegister 創建除了 channel 之後,需要調用 init 對其進行初始化。

public abstract class AbstractBootstrap

final ChannelFuture initAndRegister() {

//3.1 創建父 channel(listen socket)

//3.2 對父 channel(listen socket)進行初始化

init(channel);

......

}

}

在 init() 中對 channel 進行初始化,一是給 options 和 attrs 賦值,二是構建了父 channel 的 pipeline。

//file:src/main/java/io/netty/bootstrap/ServerBootstrap.java

public class ServerBootstrap extends AbstractBootstrap {

void init(Channel channel) {

//設置 option 和 attr

setChannelOptions(channel, newOptionsArray(), logger);

setAttributes(channel, newAttributesArray());

//設置 pipeline

ChannelPipeline p = channel.pipeline();

p.addLast(new ChannelInitializer() {

......

});

}

}

在 setChannelOptions 中對 channel 的各種 option 進行設置。回憶我們在使用 ServerBootstrap 時可以傳入 SO_BACKLOG,這就是其中的一個 option。在這裡會真正設置到 channel(socket)上。

ServerBootstrap b = new ServerBootstrap();

b.option(ChannelOption.SO_BACKLOG, 100)

在 init 中,稍微難理解一點是 p.addLast(new ChannelInitializer…)。這一段程式碼只是給父 channel 添加一個 handler 而已。其真正的執行要等到 register 後,我們待會再看。

3.3 register 父 channel

父 channel 在創建完,並且初始化之後,需要註冊到 boss 執行緒上才可用。

public abstract class AbstractBootstrap

final ChannelFuture initAndRegister() {

//3.1 創建父 channel(listen socket)

//3.2 對父 channel(listen socket)進行初始化

//3.3 註冊並啟動 boss 執行緒

ChannelFuture regFuture = config().group().register(channel);

......

}

}

其中 config().group() 最終會調用到 AbstractBootstrap#group,在這個方法裡獲取的是我們傳入進來的 bossGroup。

public abstract class AbstractBootstrap

volatile EventLoopGroup group;

public final EventLoopGroup group() {

return group;

}

}

其中 bossGroup 是一個 NioEventLoopGroup 實例,所以程式碼會進入到 NioEventLoopGroup#register 方法。

public class NioEventLoopGroup extends MultithreadEventLoopGroup {}

public abstract class MultithreadEventLoopGroup extends ... {

@Override

public ChannelFuture register(Channel channel) {

return next().register(channel);

}

@Override

public EventLoop next() {

return (EventLoop) super.next();

}

}

在 NioEventLoopGroup 裡包含一個或多個 EventLoop。上面的 next 方法就是從中選擇一個出來,然後將 channel 註冊到其上。

對於本文來講,我們使用的是 NioEventLoopGroup,其內部包含的自然也就是 NioEventLoop,我們繼續查找其 register 方法。

public final class NioEventLoop extends SingleThreadEventLoop

//在 eventloop 裡註冊一個 channle(socket)

public void register(final SelectableChannel ch, ...) {

......

register0(ch, interestOps, task);

}

//最終調用 channel 的 register

private void register0(SelectableChannel ch, int interestOps, NioTask task) {

ch.register(unwrappedSelector, interestOps, task);

}

}

可見,NioEventLoop 的 register 最後又調用到 channel 的 register 上了。在我們本文中,我們創建的 channel 是 NioServerSocketChannel,我們就依照這條線索來查。

//file:src/main/java/io/netty/channel/AbstractChannel.java

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {

public final void register(EventLoop eventLoop, final ChannelPromise promise) {

......

//關聯自己到 eventLoop

AbstractChannel.this.eventLoop = eventLoop;

if (eventLoop.inEventLoop()) {

register0(promise);

} else {

try {

eventLoop.execute(new Runnable() {

@Override

public void run() {

register0(promise);

}

});

}

......

}

}

}

在 channel 的父類 AbstractChannel 中的 register 中,先是把自己關聯到傳入的 eventLoop 上。接著調用 inEventLoop 來判斷執行緒當前運行的執行緒是否是 EventExecutor的支撐執行緒,是則返回直接 register0。

一般來說,服務在啟動的時候都是主執行緒在運行。這個時候很可能 boss 執行緒還沒有啟動。所以如果發現當前不是 boss 執行緒的話,就調用 eventLoop.execute 來啟動 boss 執行緒。

NioEventLoop 的父類是 SingleThreadEventExecutor, 找到 execute 方法。

public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {

public void execute(Runnable task) {

execute0(task);

}

private void execute0(@Schedule Runnable task) {

execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));

}

private void execute(Runnable task, boolean immediate) {

boolean inEventLoop = inEventLoop();

addTask(task);

if (!inEventLoop) {

startThread();

}

if (!addTaskWakesUp && immediate) {

wakeup(inEventLoop);

}

}

}

我們先來看 addTask(task),它是將 task 添加到任務隊列中。等待執行緒起來以後再運行。

public abstract class SingleThreadEventExecutor extends ... {

private final Queue taskQueue;

protected void addTask(Runnable task) {

(task);

}

final boolean offerTask(Runnable task) {

return taskQueue.offer(task);

}

}

inEventLoop() 是判斷當前執行緒是不是自己綁定的執行緒,這時還在主執行緒中運行,所以 inEventLoop 為假,會進入 startThread 開始為 EventLoop 創建執行緒。

public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {

private void startThread() {

doStartThread();

......

}

private void doStartThread() {

executor.execute(new Runnable() {

@Override

public void run() {

SingleThreadEventExecutor.this.run();

......

}

}

}

}

在 doStartThread 中調用 Java 執行緒管理工具 Executor 來啟動 boss 執行緒。

3.4 boss 執行緒啟動

當執行緒起來以後就進入了自己的執行緒循環中了,會遍歷自己的任務隊列,然後開始處理自己的任務。

public final class NioEventLoop extends SingleThreadEventLoop {

protected void run() {

for (;;) {

if (!hasTasks()) {

strategy = select(curDeadlineNanos);

}

//如果有任務的話就開始處理

runAllTasks(0);

//任務處理完畢就調用 epoll_wait 等待事件發生

processSelectedKeys();

}

}

}

前面我們在 3.3 節看到 eventLoop.execute 把一個 Runnable 任務添加到了任務隊列裡。當 EventLoop 執行緒啟動後,它會遍歷自己的任務隊列並開始處理。這時會進入到 AbstractChannel#register0 方法開始運行。

//file:src/main/java/io/netty/channel/AbstractChannel.java

public abstract class AbstractChannel extends ... {

public final void register(...) {

eventLoop.execute(new Runnable() {

@Override

public void run() {

register0(promise);

}

});

......

}

private void register0(ChannelPromise promise) {

doRegister();

......

}

}

函數 doRegister 是在 AbstractNioChannel 類下。

//file:io/netty/channel/nio/AbstractNioChannel.java

public abstract class AbstractNioChannel extends AbstractChannel {

private final SelectableChannel ch;

protected SelectableChannel javaChannel() {

return ch;

}

public NioEventLoop eventLoop() {

return (NioEventLoop) super.eventLoop();

}

protected void doRegister() throws Exception {

boolean selected = false;

for (;;) {

selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);

}

}

}

上面最關鍵的一句是selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);。這一句就相當於在 C 語言下調用 epoll_ctl 把 listen socket 添加到了 epoll 對象下。

其中 javaChannel 獲取父 channel,相當於 listen socket。unwrappedSelector 獲取 selector,相當於 epoll 對象。register 相當於使用 epoll_ctl 執行 add 操作。

當 channel 註冊完後,前面 init 時註冊的 ChannelInitializer 回調就會被執行。再回頭看它的 回調定義。

//file:src/main/java/io/netty/bootstrap/ServerBootstrap.java

public class ServerBootstrap extends AbstractBootstrap {

void init(Channel channel)

......

p.addLast(new ChannelInitializer() {

@Override

public void initChannel(final Channel ch) {

......

ch.eventLoop().execute(new Runnable() {

@Override

public void run() {

pipeline.addLast(new ServerBootstrapAcceptor(

ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));

}

});

}

});

}

}

在 ChannelInitializer#initChannel 裡,又給 boss 執行緒的 pipeline 裡添加了一個任務。該任務是讓其在自己的 pipeline 上註冊一個 ServerBootstrapAcceptor handler。將來有新連接到達的時候,ServerBootstrapAcceptor 將會被執行。

3.5 真正的 bind

再看 doBind0 方法,調用 channel.bind 完成綁定。

private static void doBind0(...) {

channel.eventLoop().execute(new Runnable() {

@Override

public void run() {

channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);

......

}

});

}

四、新連接到達

我們再回到 boss 執行緒的主循環中。

public final class NioEventLoop extends SingleThreadEventLoop {

protected void run() {

for (;;) {

strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());

//任務隊列都處理完就開始 select

if (!hasTasks()) {

strategy = select(curDeadlineNanos);

}

//處理各種事件

if (strategy > 0) {

processSelectedKeys();

}

}

}

private int select(long deadlineNanos) throws IOException {

if (deadlineNanos == NONE) {

return selector.select();

}

// Timeout will only be 0 if deadline is within 5 microsecs

long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;

return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);

}

}

假如執行緒任務隊列中的任務都處理乾淨了的情況下,boss 執行緒會調用 select 來發現其 selector 上的各種事件。相當於 C 語言中的 epoll_wait。

當發現有事件發生的時候,例如 OP_WRITE、OP_ACCEPT、OP_READ 等的時候,會進入相應的處理

public final class NioEventLoop extends SingleThreadEventLoop {

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {

......

if ((readyOps & SelectionKey.OP_WRITE) != 0) {

ch.unsafe().forceFlush();

}

if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {

unsafe.read();

}

}

}

對於服務端的 Unsafe.read() 這裡會執行 io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe.read() 方法,它會調用 JDK 底層的 ServerSocketChannel.accept() 接收到客戶端的連接後,將其封裝成 Netty 的 NioSocketChannel,再通過 Pipeline 將 ChannelRead 事件傳播出去,這樣 ServerBootstrapAcceptor 就可以在 ChannelRead 回調裡處理新的客戶端連接了。

我們直接看 ServerBootstrapAcceptor#ChannelRead。

//file:

public class ServerBootstrap extends AbstractBootstrap {

......

private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {

public void channelRead(ChannelHandlerContext ctx, Object msg) {

// 獲取child channel

final Channel child = (Channel) msg;

// 設置 childHandler 到 child channel

child.pipeline().addLast(childHandler);

// 設置 childOptions、 childAttrs

setChannelOptions(child, childOptions, logger);

setAttributes(child, childAttrs);

// 將 child channel 註冊到 childGroup

childGroup.register(child).addListener(new ChannelFutureListener() {

@Override

public void operationComplete(ChannelFuture future) throws Exception {

if (!future.isSuccess()) {

forceClose(child, future.cause());

}

}

});

}

}

}

在 channelRead 先是獲取到了新創建出來的子 channel,併為其 pipeline 添加 childHandler。回頭看 1.5 節,childHandler 是我們自定義的。

緊接著調用 childGroup.register(child) 將子 channel 註冊到 workerGroup 上。這個 register 過程和 3.3節、3.5節過程一樣。區別就是前面是父 channel 註冊到 bossGroup 上,這裡是子 channel 註冊到 workerGroup上。

在 register 完成後,子 channel 被掛到了 workerGroup 其中一個執行緒上,相應的執行緒如果沒有創建也會被創建出來並進入到自己的執行緒循環中。

當子 channel 註冊完畢的時候,childHandler 中 ChannelInitializer#initChannel 會被執行

public final class EchoServer {

public static void main(String[] args) throws Exception {

...

ServerBootstrap b = new ServerBootstrap();

b.childHandler(new ChannelInitializer() {

@Override

public void initChannel(SocketChannel ch) throws Exception {

ChannelPipeline p = ch.pipeline();

if (sslCtx != null) {

p.addLast(sslCtx.newHandler(ch.alloc()));

}

p.addLast(serverHandler);

}

});

......

}

}

在 initChannel 把子 channel 的處理類 serverHandler 添加上來了。Netty demo 中對這個處理類的定義非常的簡單,僅僅只是列印出來而已。

public class EchoServerHandler extends ChannelInboundHandlerAdapter {

public void channelRead(......) {

ctx.write(msg);

}

......

}

五、使用者請求到達

當 worker 執行緒起來以後,會進入執行緒循環(boss 執行緒和 worker 執行緒的 run 函數是一個)。在循環中會遍歷自己的任務隊列,如果沒有任務可處理,便 select 來觀察自己所負責的 channel 上是否有事件發生。

public final class NioEventLoop extends SingleThreadEventLoop {

protected void run() {

for (;;) {

if (!hasTasks()) {

strategy = select(curDeadlineNanos);

}

//如果有任務的話就開始處理

runAllTasks(0);

//任務處理完畢就調用 epoll_wait 等待事件發生

processSelectedKeys();

}

}

private int select(long deadlineNanos) throws IOException {

selector.selectNow();

......

}

}

worker 執行緒會調用 select 發現自己所管理的所有子 channel 上的可讀可寫事件。在發現有可讀事件後,會調用 processSelectedKeys,最後觸發 pipeline 使得 EchoServerHandler 方法開始執行。

public class EchoServerHandler extends ChannelInboundHandlerAdapter {

public void channelRead(......) {

ctx.write(msg);

}

......

}

六、總結

事實上,Netty 對網路封裝的比較靈活。既支持單執行緒 Reactor,也支持多執行緒 Reactor、還支持主從多執行緒 Reactor。三種模型對應的用法如下:

public static void main(String[] args) throws Exception {

//單執行緒 Reactor

EventLoopGroup eventGroup = new NioEventLoopGroup(1);

ServerBootstrap serverBootstrap = new ServerBootstrap();

serverBootstrap.group(eventGroup);

......

//多執行緒 Reactor

EventLoopGroup eventGroup = new NioEventLoopGroup();

ServerBootstrap serverBootstrap = new ServerBootstrap();

serverBootstrap.group(eventGroup);

......

//主從多執行緒 Reactor

EventLoopGroup bossGroup = new NioEventLoopGroup(1);

EventLoopGroup workerGroup = new NioEventLoopGroup();

ServerBootstrap serverBootstrap = new ServerBootstrap();

serverBootstrap.group(bossGroup, workerGroup);

......

}

為了表述的更全面,本文飛哥選擇的是最為經典的 主從多執行緒 Reactor 模式。本文中所描述的內容可以用下面一幅圖來表示。

在 Netty 中的 boss 執行緒中負責對父 channel(listen socket)上事件的監聽和處理,當有新連接到達的時候,選擇一個 worker 執行緒把這個子 channel(連接 socket )叫給 worker 執行緒來處理。

其中 Worker 執行緒就是等待其管理的所有子 channel(連接 socket)上的事件的監聽和處理。當發現有事件發生的時候,回調使用者設置的 handler 進行處理。在本文的例子中,這個使用者 handler 就是 EchoServerHandler#channelRead。

至此,Netty 網路模組的工作核心原理咱們就介紹完了。飛哥一直「鼓吹」內功的好處。只要你具備了堅實的內功,各種語言裡看似風牛馬不相及的東西,在底層實際上原理是想通的。我本人從來沒用 Java 開發過伺服器程序,更沒碰過 Netty。但是當你多epoll有了深入理解的時候,再看Netty也能很容易看懂,很快就能理解它的核心。這就是鍛鍊內功的好處!

相關文章

C++ 之父誕生 | 歷史上的今天

C++ 之父誕生 | 歷史上的今天

整理 | 王啟隆 透過「歷史上的今天」,從過去看未來,從現在亦可以改變未來。 今天是 2022 年 12 月 30 日,在 1930 年的這...

蘋果推出 Mac OS X | 歷史上的今天

蘋果推出 Mac OS X | 歷史上的今天

整理 | 王啟隆 透過「歷史上的今天」,從過去看未來,從現在亦可以改變未來。 今天是 2023 年 3 月 24 日,在 2016 年的今天...