Netty 之工作线程组 NioEventLoopGroup


综述

NioEventLoopGroup中内部维护一个工作线程数组,channel 注册时,它从数组中按特定算法挑选一个,提供给 channel 。默认的挑选算法是循环挑选

NioEventLoopGroup负责管理内部工作线程的生命周期。所有作为EventExecutor的职责,都通过委托的方式,给其中一个工作线程来处理。

工作线程NioEventLoop请参考Netty 之工作线程 NioEventLoop

下面是NioEventLoopGroup继承树,不含接口。

1
2
3
4
AbstractEventExecutorGroup
<- MultithreadEventExecutorGroup
<- MultithreadEventLoopGroup
<- NioEventLoopGroup

主要的初始化工作在MultithreadEventExecutorGroup的构造函数中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
protected MultithreadEventExecutorGroup(
        int nThreads, Executor executor, 
        EventExecutorChooserFactory chooserFactory, Object... args) {
    // 线程数如果不提供,默认是 cpu 核数 * 2
    if (nThreads <= 0) {
        throw new IllegalArgumentException(
            String.format("nThreads: %d (expected: > 0)", nThreads)
        );
    }
    // executor 提供真正的执行线程
    if (executor == null) {
        // 默认使用 ThreadPerTaskExecutor,此时一个工作线程对应一个底层 java 线程
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }
    // 初始化工作线程存放数组
    children = new EventExecutor[nThreads];

    for (int i = 0; i < nThreads; i ++) {
        boolean success = false;
        try {
            // 初始化每个工作线程
            children[i] = newChild(executor, args);
            success = true;
        } catch (Exception e) {
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            if (!success) {
                // 初始化工作线程失败
                for (int j = 0; j < i; j ++) {
                    // 依次关闭初始化成功的工作线程
                    children[j].shutdownGracefully();
                }

                for (int j = 0; j < i; j ++) {
                    EventExecutor e = children[j];
                    try {
                        // 无限等待工作线程终止
                        while (!e.isTerminated()) {
                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                        }
                    } catch (InterruptedException interrupted) {
                        // 被中断,设置中断标志,退出循环,剩下的留给用户处理
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
    }
    // 初始化工作线程挑选算法,默认是循环挑选
    chooser = chooserFactory.newChooser(children);
    // 初始化工作线程终止监听器
    final FutureListener<Object> terminationListener = new FutureListener<Object>() {
        @Override
        public void operationComplete(Future<Object> future) throws Exception {
            if (terminatedChildren.incrementAndGet() == children.length) {
                // 所有 工作线程 都已终止,设置 工作线程组 已终止
                terminationFuture.setSuccess(null);
            }
        }
    };

    for (EventExecutor e: children) {
        // 依次添加工作线程终止监听器
        e.terminationFuture().addListener(terminationListener);
    }
    // 初始化只读工作线程集合 readonlyChildren
    Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
    Collections.addAll(childrenSet, children);
    readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
// NioEventLoopGroup#newChild
// 返回新的 工作线程 实例
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    return new NioEventLoop(
        this, 
        executor, 
        (SelectorProvider) args[0],
        ((SelectStrategyFactory) args[1]).newSelectStrategy(), 
        (RejectedExecutionHandler) args[2]
    );
}

#next

从内部数组中挑选一个工作线程并返回。Netty自带的 2 个选择器都是按循环挑选算法返回工作线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public EventExecutor next() {
    return chooser.next();
}
// DefaultEventExecutorChooserFactory$PowerOfTwoEventExecutorChooser
private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
    private final AtomicInteger idx = new AtomicInteger();
    private final EventExecutor[] executors;

    PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
        this.executors = executors;
    }

    @Override
    public EventExecutor next() {
        // 自增取模
        return executors[idx.getAndIncrement() & executors.length - 1];
    }
}
// DefaultEventExecutorChooserFactory$GenericEventExecutorChooser
private static final class GenericEventExecutorChooser implements EventExecutorChooser {
    private final AtomicInteger idx = new AtomicInteger();
    private final EventExecutor[] executors;

    GenericEventExecutorChooser(EventExecutor[] executors) {
        this.executors = executors;
    }

    @Override
    public EventExecutor next() {
        // 自增取模
        return executors[Math.abs(idx.getAndIncrement() % executors.length)];
    }
}

#register

从内部数组中选取一个工作线程注册 channel。

1
2
3
4
5
6
7
8
public ChannelFuture register(Channel channel) {
    return next().register(channel);
}

@Override
public ChannelFuture register(ChannelPromise promise) {
    return next().register(promise);
}

#rebuildSelectors

重建数组中所有工作线程的 Selector。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
public void rebuildSelectors() {
    for (EventExecutor e: this) {
        ((NioEventLoop) e).rebuildSelector();
    }
}
// NioEventLoop#rebuildSelector
// 确保在工作线程自身线程中处理重建
public void rebuildSelector() {
    if (!inEventLoop()) {
        execute(new Runnable() {
            @Override
            public void run() {
                rebuildSelector0();
            }
        });
        return;
    }
    
    rebuildSelector0();
}
// NioEventLoop#rebuildSelector0
private void rebuildSelector0() {
    final Selector oldSelector = selector;
    final SelectorTuple newSelectorTuple;

    if (oldSelector == null) {
        return;
    }

    try {
        // 创建新的 Selector 实例
        newSelectorTuple = openSelector();
    } catch (Exception e) {
        // 创建失败返回
        logger.warn("Failed to create a new Selector.", e);
        return;
    }

    // 成功迁移数,日志用
    int nChannels = 0;
    // 注册所有 channel 到新的 Selector 上
    for (SelectionKey key: oldSelector.keys()) {
        Object a = key.attachment();
        try {
            if (!key.isValid() 
                || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
                continue;
            }

            int interestOps = key.interestOps();
            key.cancel();
            // channel 注册新的 Selector
            SelectionKey newKey = key.channel().register(
                    newSelectorTuple.unwrappedSelector, interestOps, a);

            if (a instanceof AbstractNioChannel) {
                // Update SelectionKey
                ((AbstractNioChannel) a).selectionKey = newKey;
            }
            nChannels ++;
        } catch (Exception e) {
            logger.warn("Failed to re-register a Channel to the new Selector.", e);
            if (a instanceof AbstractNioChannel) {
                // channel 迁移失败,关闭之
                AbstractNioChannel ch = (AbstractNioChannel) a;
                ch.unsafe().close(ch.unsafe().voidPromise());
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                invokeChannelUnregistered(task, key, e);
            }
        }
    }

    selector = newSelectorTuple.selector;
    unwrappedSelector = newSelectorTuple.unwrappedSelector;

    try {
        // 是时候关闭就的 Selector 了
        oldSelector.close();
    } catch (Throwable t) {
        if (logger.isWarnEnabled()) {
            logger.warn("Failed to close the old Selector.", t);
        }
    }

    if (logger.isInfoEnabled()) {
        logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
    }
}

#awaitTermination

在指定时间timeout内等待工作线程组终止,并返回是否终止成功。如果在超时之前返回,一般说明终止成功。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException {
    // 截止时间
    long deadline = System.nanoTime() + unit.toNanos(timeout);
    // 一个个等啊
    loop: for (EventExecutor l: children) {
        for (;;) {
            // 剩余时间
            long timeLeft = deadline - System.nanoTime();
            if (timeLeft <= 0) {
                // 没有时间了,GG
                break loop;
            }
            if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) {
                // 咱已终止了,下一个吧
                break;
            }
        }
    }
    return isTerminated();
}