0%

SourceAnalysis-Netty-NioEventLoop

阅读更多

1 继承体系

NioEventLoop

2 构造方法

NioEventLoop仅有一个构造方法,该方法接受如下几个参数

  1. NioEventLoopGroup parent:NioEventLoop由NioEventLoopGroup进行创建和管理
  2. Executor executor:用于创建线程的Executor,常用实现就是ThreadPerTaskExecutor
  3. SelectorProvider selectorProvider:用于创建Selector
  4. SelectStrategy strategy:Select的策略
  5. RejectedExecutionHandler rejectedExecutionHandler:拒绝执行任务是的策略
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
//产生Selector
final SelectorTuple selectorTuple = openSelector();
selector = selectorTuple.selector;
unwrappedSelector = selectorTuple.unwrappedSelector;
selectStrategy = strategy;
}

继续跟踪父类SingleThreadEventLoop的构造方法

  • 该方法继续调用父类的构造方法
  • 初始化同步阻塞队列,用的是LinkedBlockingQueue作为实现
  • 其中tailTasks用于存放非ScheduledTask
1
2
3
4
5
6
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
tailTasks = newTaskQueue(maxPendingTasks);
}

继续跟踪父类SingleThreadEventExecutor的构造方法

  • 该方法继续调用父类的构造方法
  • 初始化任务队列,用的是LinkedBlockingQueue作为实现
  • 其中taskQueue用于存放ScheduledTask
1
2
3
4
5
6
7
8
9
10
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedHandler) {
super(parent);
this.addTaskWakesUp = addTaskWakesUp;
this.maxPendingTasks = Math.max(16, maxPendingTasks);
this.executor = ObjectUtil.checkNotNull(executor, "executor");
taskQueue = newTaskQueue(this.maxPendingTasks);
rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}

继续跟踪AbstractScheduledEventExecutor的构造方法

1
2
3
protected AbstractScheduledEventExecutor(EventExecutorGroup parent) {
super(parent);
}

继续跟踪AbstractEventExecutor的构造方法

1
2
3
protected AbstractEventExecutor(EventExecutorGroup parent) {
this.parent = parent;
}

2.1 openSelector

在NioEventLoop的构造方法中,有一个关键的方法openSelector,该方法负责创建Selector,在创建的过程中会添加在Netty中自定义的并且与Selector相关的组件,包括

  • SelectedSelectionKeySet:用于存放已被选择的SelectionKey
  • 为什么要这样做呢?从方法名(processSelectedKeysOptimized与processSelectedKeysPlain)猜测,可能是为了提升性能(SelectedSelectionKeySet底层是数组,而用原生的Set的话,只能用迭代器遍历)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
private SelectorTuple openSelector() {
final Selector unwrappedSelector;
try {
unwrappedSelector = provider.openSelector();
} catch (IOException e) {
throw new ChannelException("failed to open a new selector", e);
}

if (DISABLE_KEYSET_OPTIMIZATION) {
return new SelectorTuple(unwrappedSelector);
}

//自定义的SelectedSelectionKeySet
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();

Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
return Class.forName(
"sun.nio.ch.SelectorImpl",
false,
PlatformDependent.getSystemClassLoader());
} catch (Throwable cause) {
return cause;
}
}
});

if (!(maybeSelectorImplClass instanceof Class) ||
//ensure the current selector implementation is what we can instrument.
!((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
if (maybeSelectorImplClass instanceof Throwable) {
Throwable t = (Throwable) maybeSelectorImplClass;
logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
}
return new SelectorTuple(unwrappedSelector);
}

final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;

Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
//在这里,通过反射,将selectorImplClass中的指定域替换成自定义的类型
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField);
if (cause != null) {
return cause;
}
cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField);
if (cause != null) {
return cause;
}

//这里进行了替换
selectedKeysField.set(unwrappedSelector, selectedKeySet);
//这里进行了替换
publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
return null;
} catch (NoSuchFieldException e) {
return e;
} catch (IllegalAccessException e) {
return e;
}
}
});

if (maybeException instanceof Exception) {
selectedKeys = null;
Exception e = (Exception) maybeException;
logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
return new SelectorTuple(unwrappedSelector);
}
selectedKeys = selectedKeySet;
logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
return new SelectorTuple(unwrappedSelector,
new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
}

3 execute

execute方法用于添加Runnable,并执行。本小节将解析任务提交以及执行的过程

execute方法位于SingleThreadEventExecutor中,主要就是将任务添加到队列当中。如果当前线程池的线程尚未启动,则启动它

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Override
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}

boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} else {
startThread();
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}

if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}

若当前线程池尚未启动线程,那么执行startThread方法启动新线程,startThread方法继续调用doStartThread方法(如果这两个条件不成立会怎样?)

1
2
3
4
5
6
7
private void startThread() {
if (state == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
doStartThread();
}
}
}

doStartThread方法调用executor.execute来执行该Runnable,executor一般来说是ThreadPerTaskExecutor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}

boolean success = false;
updateLastExecutionTime();
try {
//主要执行这一句
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
for (;;) {
int oldState = state;
if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
break;
}
}

//Check if confirmShutdown() was called at the end of the loop.
if (success && gracefulShutdownStartTime == 0) {
logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
"before run() implementation terminates.");
}

try {
//Run all remaining tasks and shutdown hooks.
for (;;) {
if (confirmShutdown()) {
break;
}
}
} finally {
try {
cleanup();
} finally {
STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
threadLock.release();
if (!taskQueue.isEmpty()) {
logger.warn(
"An event executor terminated with " +
"non-empty task queue (" + taskQueue.size() + ')');
}

terminationFuture.setSuccess(null);
}
}
}
}
});
}

接着调用位于ThreadPerTaskExecutor中的execute方法,该方法创建一个新的线程,并启动,进而执行command中定义的run方法

1
2
3
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}

再回到doStartThread方法中,该方法中定义的Runnble的run方法中,最重要的步骤就是执行SingleThreadEventExecutor.this.run()方法,这个方法在NioEventLoop中定义,我们在下一节中进行分析

4 run

run方法是NioEventLoop中的核心方法,该方法展示了整个NioEventLoop的处理流程。该方法包含两个核心流程

  1. processSelectedKeys
  2. runAllTasks
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
protected void run() {
for (;;) {
try {
//计算策略
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));

//'wakenUp.compareAndSet(false, true)' is always evaluated
//before calling 'selector.wakeup()' to reduce the wake-up
//overhead. (Selector.wakeup() is an expensive operation.)
//
//However, there is a race condition in this approach.
//The race condition is triggered when 'wakenUp' is set to
//true too early.
//
//'wakenUp' is set to true too early if:
//1) Selector is waken up between 'wakenUp.set(false)' and
//'selector.select(...)'. (BAD)
//2) Selector is waken up between 'selector.select(...)' and
//'if (wakenUp.get()) { ... }'. (OK)
//
//In the first case, 'wakenUp' is set to true and the
//following 'selector.select(...)' will wake up immediately.
//Until 'wakenUp' is set to false again in the next round,
//'wakenUp.compareAndSet(false, true)' will fail, and therefore
//any attempt to wake up the Selector will fail, too, causing
//the following 'selector.select(...)' call to block
//unnecessarily.
//
//To fix this problem, we wake up the selector again if wakenUp
//is true immediately after selector.select(...).
//It is inefficient in that it wakes up the selector for both
//the first case (BAD - wake-up required) and the second case
//(OK - no wake-up required).

if (wakenUp.get()) {
selector.wakeup();
}
//fall through
default:
}

cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
//反正就是执行processSelectedKeys以及runAllTasks
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
//Ensure we always run tasks.
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
//Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
//Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}

4.1 processSelectedKeys

processSelectedKeys方法根据目前selectedKeys的状态,细化为两个相似的部分

  • selectedKeys != null,则调用processSelectedKeysOptimized方法
  • selectedKeys == null,则调用selector.selectedKeys()后获取Set,然后再调用processSelectedKeysPlain方法进行处理
  • 以上两个方法的差异无非就是一个处理的是SelectionKey数组,另一个处理SelectionKey的Set
1
2
3
4
5
6
7
private void processSelectedKeys() {
if (selectedKeys != null) {
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}

4.1.1 processSelectedKeysOptimized

selectedKeys!=null时,调用processSelectedKeysOptimized方法进行后续处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
//null out entry in the array to allow to have it GC'ed once the Channel close
//See https://github.com/netty/netty/issues/2363
selectedKeys.keys[i] = null;

final Object a = k.attachment();

//如果是AbstractNioChannel,则进一步调用processSelectedKey来处理
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}

//如果还需要重新Select一次
if (needsToSelectAgain) {
//null out entries in the array to allow to have it GC'ed once the Channel close
//See https://github.com/netty/netty/issues/2363
selectedKeys.reset(i + 1);

selectAgain();
i = -1;
}
}
}

processSelectedKey方法大致逻辑如下

  • 从Channel中获取NioUnsafe对象,该对象是执行底层IO操作的关键对象
  • 首先进行一系列的判断,若全部正常,则最终会调用read方法进行数据的读取操作,关于read方法目前不再深入分析了
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {

final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
//If the channel implementation throws an exception because there is no event loop, we ignore this
//because we are only trying to determine if ch is registered to this event loop and thus has authority
//to close ch.
return;
}
//Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
//and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
//still healthy and should not be closed.
//See https://github.com/netty/netty/issues/5125
if (eventLoop != this || eventLoop == null) {
return;
}
//close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
return;
}

try {
int readyOps = k.readyOps();
//We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
//the NIO JDK channel implementation may throw a NotYetConnectedException.
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
//remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
//See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);

unsafe.finishConnect();
}

//Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
//Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}

//Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
//to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
//当一切正常时,调用read方法读取数据
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}

4.1.2 processSelectedKeysPlain

selectedKeys==null时,调用processSelectedKeysOptimized方法进行后续处理,该方法与processSelectedKeysOptimized的区别仅仅是前者处理Set,后者处理数组。因此不再赘述

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
//check if the set is empty and if so just return to not create garbage by
//creating a new Iterator every time even if there is nothing to process.
//See https://github.com/netty/netty/issues/597
if (selectedKeys.isEmpty()) {
return;
}

Iterator<SelectionKey> i = selectedKeys.iterator();
for (;;) {
final SelectionKey k = i.next();
final Object a = k.attachment();
i.remove();

if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}

if (!i.hasNext()) {
break;
}

if (needsToSelectAgain) {
selectAgain();
selectedKeys = selector.selectedKeys();

//Create the iterator again to avoid ConcurrentModificationException
if (selectedKeys.isEmpty()) {
break;
} else {
i = selectedKeys.iterator();
}
}
}
}

4.2 runAllTasks

runAllTasks方法位于SingleThreadEventExecutor,该方法主要用于执行那些与Selector无关的,通过execute提交的任务。该方法主要逻辑如下

  • 首先获取所有ScheduledTask,并执行
  • 然后执行所有其他任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
protected boolean runAllTasks() {
assert inEventLoop();
boolean fetchedAll;
boolean ranAtLeastOne = false;

do {
fetchedAll = fetchFromScheduledTaskQueue();
if (runAllTasksFrom(taskQueue)) {
ranAtLeastOne = true;
}
} while (!fetchedAll); //keep on processing until we fetched all scheduled tasks.

if (ranAtLeastOne) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
}
afterRunningAllTasks();
return ranAtLeastOne;
}

runAllTasksFrom方法位于SingleThreadEventExecutor中,其逻辑很简单,从指定的任务队列中获取任务,然后执行

1
2
3
4
5
6
7
8
9
10
11
12
13
protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
Runnable task = pollTaskFrom(taskQueue);
if (task == null) {
return false;
}
for (;;) {
safeExecute(task);
task = pollTaskFrom(taskQueue);
if (task == null) {
return true;
}
}
}

afterRunningAllTasks方法位于SingleThreadEventLoop中,这里讲tailTasks作为参数,然后执行位于SingleThreadEventExecutor的方法runAllTasksFrom

1
2
3
protected void afterRunningAllTasks() {
runAllTasksFrom(tailTasks);
}