在 Netty 中,ThreadPerChannelEventLoop
是Oio
使用的工作线程。从ThreadPerChannelEventLoop
的继承树(不含接口),很容易看出,与Nio 工作线程类似,Oio
的工作线程也是单线程的。
1
2
3
SingleThreadEventExecutor
<- SingleThreadEventLoop
<- ThreadPerChannelEventLoop
与Nio
不同的是,Oio
中 channel 和工作线程是一对一的关系。
channel 注册工作线程。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// ThreadPerChannelEventLoop#register
private Channel ch;
public ChannelFuture register(ChannelPromise promise) {
// 调用父类方法 SingleThreadEventLoop#register 注册通道
return super.register(promise).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
// 关联注册成功的 channel
ch = future.channel();
} else {
// 撤销
deregister();
}
}
});
}
// SingleThreadEventLoop#register
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
// 调用 Unsafe#register 完成注册
promise.channel().unsafe().register(this, promise);
return promise;
}
方法Unsafe#register
参考:Netty 之通道 AbstractChannel。
撤销 channel。
1
2
3
4
5
6
protected void deregister() {
// channel 字段置空
ch = null;
parent.activeChildren.remove(this);
parent.idleChildren.add(this);
}
从队列scheduledQueue
或taskQueue
中拿出一个任务。
流程:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
// SingleThreadEventExecutor#takeTask
protected Runnable takeTask() {
// 确保在工作线程中执行
assert inEventLoop();
// taskQueue 类型限定 BlockingQueue
if (!(taskQueue instanceof BlockingQueue)) {
throw new UnsupportedOperationException();
}
BlockingQueue<Runnable> taskQueue = (BlockingQueue<Runnable>) this.taskQueue;
for (;;) {
// 看看有没有定时任务
ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
// 没有定时任务
if (scheduledTask == null) {
Runnable task = null;
try {
// 从 taskQueue 中阻塞获取一个任务
task = taskQueue.take();
if (task == WAKEUP_TASK) {
// 忽略唤醒任务
task = null;
}
} catch (InterruptedException e) {
// 被中断,忽略之
}
// 返回任务,可能为 null
return task;
}
// 有定时任务
else {
// 定时任务延期执行时长
long delayNanos = scheduledTask.delayNanos();
Runnable task = null;
// 延期执行时长还有呢
if (delayNanos > 0) {
try {
// 我们用 delayNanos 这么长的时间去 taskQueue 中阻塞式拿任务
task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
// 被提前中断,返回 null
return null;
}
}
// delayNanos 过去了,taskQueue 依然没有任务
if (task == null) {
// 把定时任务队列 scheduledTaskQueue 中当前可以执行的任务移到 taskQueue 中来
fetchFromScheduledTaskQueue();
// 这时候 taskQueue 中肯定有任务了,拿出一个来吧
task = taskQueue.poll();
}
if (task != null) {
// 不是 null 就你了
return task;
}
}
}
}
单线程的工作线程执行流。流程:
关闭准备中
,如果是转 5,否则转 7;channel 的状态可能由于调用Unsafe#deregister
方法,注销了工作线程而处于未注册状态。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// ThreadPerChannelEventLoop#run
protected void run() {
for (;;) {
// 获取一个任务,普通任务或定时任务
Runnable task = takeTask();
if (task != null) {
// 运行之
task.run();
//更新最后运行时间
updateLastExecutionTime();
}
Channel ch = this.ch;
if (isShuttingDown()) { // 工作线程 `关闭准备中`
if (ch != null) {
// 关闭通道
ch.unsafe().close(ch.unsafe().voidPromise());
}
if (confirmShutdown()) {
break;
}
} else {
if (ch != null) {
// Handle deregistration
if (!ch.isRegistered()) {
// 执行taskQueue所有的任务,
// 并执行 scheduledTaskQueue 中到当前为止可以安排运行的任务。
runAllTasks();
deregister();
}
}
}
}
}
方法Unsafe#deregister
参考:Netty 之通道 AbstractChannel。
方法#confirmShutdown
和#runAllTasks
参考Netty 之工作线程 NioEventLoop。