Netty's Send Buffer: ChannelOutboundBuffer


0x01 Overview

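ChannelOutboundBuffer is the outbound (send) buffer of a Channel. Each outgoing message is wrapped in an Entry node and stored in a singly linked list, which is tracked by three pointers and a counter: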

// The Entry that is the first in the linked-list structure that was flushed
private Entry flushedEntry;
// The Entry which is the first unflushed in the linked-list structure
private Entry unflushedEntry;
// The Entry which represents the tail of the buffer
private Entry tailEntry;
// The number of flushed entries that are not written yet
private int flushed;

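Together they split the list into two regions: the flush region [flushedEntry, unflushedEntry) and the unflushed region [unflushedEntry, tailEntry]. An unflushedEntry of null means the unflushed region is empty and the flush region runs to the end of the list.

flushedEntry is either null, meaning the flush region is currently empty (#addFlush has not been called yet, or every flushed Entry has already been removed), in which case unflushedEntry is the head of the list; or it is the head of the list itself, in which case [flushedEntry, unflushedEntry) holds the flushed data and the field flushed records how many entries that region contains.

tailEntry is the tail pointer, which makes appending an Entry to the end of the list a constant-time operation.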
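To make the pointer layout concrete, here is a minimal plain-Java sketch. It is not Netty code: `Node`, `RegionDemo` and `collect` are hypothetical names, and the list is wired by hand rather than through addMessage/addFlush. It walks the flush region [flushedEntry, unflushedEntry) and the unflushed region [unflushedEntry, tailEntry].

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Netty's Entry: just a payload and a next pointer.
class Node {
    final String msg;
    Node next;
    Node(String msg) { this.msg = msg; }
}

class RegionDemo {
    // Collects messages from 'from' (inclusive) up to 'to' (exclusive; null means end of list).
    static List<String> collect(Node from, Node to) {
        List<String> out = new ArrayList<>();
        for (Node n = from; n != null && n != to; n = n.next) {
            out.add(n.msg);
        }
        return out;
    }

    public static void main(String[] args) {
        // a -> b -> c -> d, where a and b have been flushed and c, d have not.
        Node a = new Node("a"), b = new Node("b"), c = new Node("c"), d = new Node("d");
        a.next = b; b.next = c; c.next = d;
        Node flushedEntry = a, unflushedEntry = c, tailEntry = d;

        System.out.println("flush region:     " + collect(flushedEntry, unflushedEntry)); // [a, b]
        System.out.println("unflushed region: " + collect(unflushedEntry, tailEntry.next)); // [c, d]
    }
}
```

When flushedEntry is null the flush region is empty, and `collect(null, ...)` returns an empty list accordingly.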


0x02 #addMessage

ChannelOutboundBuffer#addMessage appends a new Entry to the tail of the unflushed region of the list.

public void addMessage(Object msg, int size, ChannelPromise promise) {
    Entry entry = Entry.newInstance(msg, size, total(msg), promise);
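    if (tailEntry == null) {
        // The list is empty, so there is no flushed Entry either
        flushedEntry = null;
    } else {
        // Link the new Entry after the current tail
        Entry tail = tailEntry;
        tail.next = entry;
    }
    tailEntry = entry;
    if (unflushedEntry == null) {
        // First message since the last addFlush: it starts the unflushed region
        unflushedEntry = entry;
    }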

    incrementPendingOutboundBytes(entry.pendingSize, false);
}
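The pointer updates in addMessage can be checked with a stripped-down model. This sketch keeps only the three pointers; `MiniEntry` and `MiniBuffer` are hypothetical names, and sizes, promises, pooling and thread-safety are all omitted.

```java
// Hypothetical, simplified model of ChannelOutboundBuffer's list pointers (not Netty code).
class MiniEntry {
    final Object msg;
    MiniEntry next;
    MiniEntry(Object msg) { this.msg = msg; }
}

class MiniBuffer {
    MiniEntry flushedEntry;   // head of the flush region, or null
    MiniEntry unflushedEntry; // head of the unflushed region, or null
    MiniEntry tailEntry;      // last node in the list, or null when the list is empty

    void addMessage(Object msg) {
        MiniEntry entry = new MiniEntry(msg);
        if (tailEntry == null) {
            // Empty list: there can be no flushed entries left.
            flushedEntry = null;
        } else {
            tailEntry.next = entry; // link after the current tail
        }
        tailEntry = entry;
        if (unflushedEntry == null) {
            // First message since the last flush becomes the unflushed head.
            unflushedEntry = entry;
        }
    }

    public static void main(String[] args) {
        MiniBuffer b = new MiniBuffer();
        b.addMessage("a");
        b.addMessage("b");
        System.out.println(b.unflushedEntry.msg + " .. " + b.tailEntry.msg); // a .. b
    }
}
```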

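#incrementPendingOutboundBytes adds the Entry's size to totalPendingSize, the number of bytes currently pending in the buffer. If totalPendingSize exceeds the channel's configured write-buffer high water mark, the buffer is marked unwritable and, if it was writable before, a ChannelWritabilityChanged event is fired through the ChannelPipeline. This is a signal for the application to stop writing: Netty does not reject further writes, it only makes Channel#isWritable() return false.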

// Atomic updater for totalPendingSize
private static final AtomicLongFieldUpdater<ChannelOutboundBuffer> TOTAL_PENDING_SIZE_UPDATER =
        AtomicLongFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "totalPendingSize");

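// Number of bytes currently pending in the outbound buffer
@SuppressWarnings("UnusedDeclaration")
private volatile long totalPendingSize;

// Atomic updater for unwritable, used by setUnwritable and setWritable
private static final AtomicIntegerFieldUpdater<ChannelOutboundBuffer> UNWRITABLE_UPDATER =
        AtomicIntegerFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "unwritable");

// Writability bit mask: 0 means writable
@SuppressWarnings("UnusedDeclaration")
private volatile int unwritable;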

private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
    if (size == 0) {
        return;
    }

    long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
    if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
        setUnwritable(invokeLater);
    }
}

private void setUnwritable(boolean invokeLater) {
    for (;;) {
        final int oldValue = unwritable;
        final int newValue = oldValue | 1; // bit 0 is the flag owned by the water-mark logic
        if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
            if (oldValue == 0 && newValue != 0) {
                fireChannelWritabilityChanged(invokeLater);
            }
            break;
        }
    }
}

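The invokeLater parameter decides whether the event is fired immediately on the current call stack or scheduled as a task on the channel's EventLoop. Because the event is only fired when oldValue is 0, it is raised once per writable-to-unwritable transition, not on every write above the high water mark.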
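The two water marks produce hysteresis: the writable flag turns off only above the high mark and turns back on only below the low mark, so fluctuations between them do not cause a flood of events. The sketch below models just that logic with `AtomicLong` and `AtomicInteger`. `WaterMarkGate` is hypothetical, a single flag replaces Netty's bit mask, the event is a plain `Runnable` callback instead of a pipeline event, and invokeLater scheduling is left out.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical model of the water-mark logic (not Netty's API).
class WaterMarkGate {
    private final long low, high;
    private final AtomicLong totalPendingSize = new AtomicLong();
    private final AtomicInteger unwritable = new AtomicInteger(); // 0 = writable
    private final Runnable onWritabilityChanged;

    WaterMarkGate(long low, long high, Runnable onWritabilityChanged) {
        this.low = low;
        this.high = high;
        this.onWritabilityChanged = onWritabilityChanged;
    }

    void increment(long size) {
        if (size == 0) return;
        if (totalPendingSize.addAndGet(size) > high) {
            // Fire only on the writable -> unwritable transition.
            if (unwritable.compareAndSet(0, 1)) onWritabilityChanged.run();
        }
    }

    void decrement(long size) {
        if (size == 0) return;
        if (totalPendingSize.addAndGet(-size) < low) {
            // Fire only on the unwritable -> writable transition.
            if (unwritable.compareAndSet(1, 0)) onWritabilityChanged.run();
        }
    }

    boolean isWritable() { return unwritable.get() == 0; }

    public static void main(String[] args) {
        AtomicInteger events = new AtomicInteger();
        WaterMarkGate g = new WaterMarkGate(32, 64, events::incrementAndGet);
        g.increment(70);  // 70 > 64: becomes unwritable, event #1
        g.decrement(20);  // 50: still not below low, stays unwritable
        g.decrement(20);  // 30 < 32: writable again, event #2
        System.out.println(g.isWritable() + " " + events.get()); // true 2
    }
}
```

The values 32 and 64 only mirror the shape of Netty's default WriteBufferWaterMark (32 KiB low, 64 KiB high).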


0x03 #addFlush

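ChannelOutboundBuffer#addFlush moves every Entry in the unflushed region [unflushedEntry, tailEntry] into the flush region [flushedEntry, unflushedEntry). Because the two regions are adjacent, no node is relinked: the method counts the entries into flushed and then moves the boundary.

During the traversal, any Entry whose promise cannot be marked uncancellable (because it has already been cancelled) is cancelled with Entry#cancel, which releases its message and subtracts its size from totalPendingSize. If totalPendingSize then falls below the channel's configured write-buffer low water mark, the buffer becomes writable again and a ChannelWritabilityChanged event is fired through the ChannelPipeline. A cancelled Entry stays in the list and is still counted in flushed; it is skipped by #nioBuffers and unlinked later by #remove. Finally, unflushedEntry is set to null.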

public void addFlush() {
    Entry entry = unflushedEntry;
    if (entry != null) {
        if (flushedEntry == null) {
            // there is no flushedEntry yet, so start with the entry
            flushedEntry = entry;
        }
        do {
            flushed ++;
            if (!entry.promise.setUncancellable()) {
                // Was cancelled so make sure we free up memory and notify about the freed bytes
                int pending = entry.cancel();
                decrementPendingOutboundBytes(pending, false, true);
            }
            entry = entry.next;
        } while (entry != null);

        // All flushed so reset unflushedEntry
        unflushedEntry = null;
    }
}

private void decrementPendingOutboundBytes(long size, boolean invokeLater, 
        boolean notifyWritability) {
    if (size == 0) {
        return;
    }

    long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
    if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) {
        setWritable(invokeLater);
    }
}

// Entry#cancel()
int cancel() {
    if (!cancelled) {
        cancelled = true;
        int pSize = pendingSize;

        // release message and replace with an empty buffer
        ReferenceCountUtil.safeRelease(msg);
        msg = Unpooled.EMPTY_BUFFER;

        pendingSize = 0;
        total = 0;
        progress = 0;
        bufs = null;
        buf = null;
        return pSize;
    }
    return 0;
}
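The sketch below models addFlush on a simplified list so the bookkeeping can be traced: the flushed counter, the unflushedEntry reset, and the size given back by cancelled entries, which stay linked. It is not Netty code: `FlushEntry` and `FlushModel` are hypothetical, ChannelPromise#setUncancellable is replaced by a plain `cancelled` flag, and totalPendingSize is an ordinary long field.

```java
// Hypothetical model of addFlush bookkeeping (not Netty code).
class FlushEntry {
    final String msg;
    int pendingSize;
    boolean cancelled; // stands in for "promise.setUncancellable() returned false"
    FlushEntry next;
    FlushEntry(String msg, int size) { this.msg = msg; this.pendingSize = size; }
}

class FlushModel {
    FlushEntry flushedEntry, unflushedEntry, tailEntry;
    int flushed;
    long totalPendingSize;

    void add(FlushEntry e) {
        if (tailEntry == null) flushedEntry = null; else tailEntry.next = e;
        tailEntry = e;
        if (unflushedEntry == null) unflushedEntry = e;
        totalPendingSize += e.pendingSize;
    }

    void addFlush() {
        FlushEntry entry = unflushedEntry;
        if (entry == null) return; // nothing new to flush
        if (flushedEntry == null) flushedEntry = entry;
        do {
            flushed++; // cancelled entries are still counted and stay linked
            if (entry.cancelled) {
                // Like Entry#cancel: give the size back and zero it so it is not counted again.
                totalPendingSize -= entry.pendingSize;
                entry.pendingSize = 0;
            }
            entry = entry.next;
        } while (entry != null);
        unflushedEntry = null; // everything up to the tail is now flushed
    }

    public static void main(String[] args) {
        FlushModel m = new FlushModel();
        FlushEntry a = new FlushEntry("a", 10), b = new FlushEntry("b", 20);
        m.add(a);
        m.add(b);
        b.cancelled = true; // the write of b was cancelled before the flush
        m.addFlush();
        System.out.println(m.flushed + " " + m.totalPendingSize); // 2 10
    }
}
```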

0x04 #current

ChannelOutboundBuffer#current returns the message held by the Entry at flushedEntry, i.e. the next message to write, or null if the flush region is empty.

public Object current() {
    Entry entry = flushedEntry;
    if (entry == null) {
        return null;
    }

    return entry.msg;
}

0x05 #progress

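ChannelOutboundBuffer#progress reports write progress. If the promise of the Entry at flushedEntry is a ChannelProgressivePromise, the method adds amount to the Entry's accumulated progress and calls tryProgress(progress, total), reporting how many bytes of the current Entry have actually been written to the channel so far. For any other promise type it does nothing.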

public void progress(long amount) {
    Entry e = flushedEntry;
    assert e != null;
    ChannelPromise p = e.promise;
    if (p instanceof ChannelProgressivePromise) {
        long progress = e.progress + amount;
        e.progress = progress;
        ((ChannelProgressivePromise) p).tryProgress(progress, e.total);
    }
}
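Note that tryProgress receives the cumulative progress and the total, not the per-call delta. This standalone sketch shows that accumulation; `ProgressSink` and `ProgressEntry` are hypothetical stand-ins for ChannelProgressivePromise and Entry, and a null sink plays the role of a promise that is not progressive.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for ChannelProgressivePromise#tryProgress(progress, total).
interface ProgressSink {
    void onProgress(long progress, long total);
}

class ProgressEntry {
    final long total;        // total bytes of the message
    long progress;           // bytes written so far
    final ProgressSink sink; // null = not a progressive promise

    ProgressEntry(long total, ProgressSink sink) { this.total = total; this.sink = sink; }

    // Mirrors ChannelOutboundBuffer#progress: add the delta, report the running sum.
    void progress(long amount) {
        if (sink != null) { // like the instanceof ChannelProgressivePromise check
            progress += amount;
            sink.onProgress(progress, total);
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        ProgressEntry e = new ProgressEntry(100, (p, t) -> log.add(p + "/" + t));
        e.progress(30);
        e.progress(70);
        System.out.println(log); // [30/100, 100/100]
    }
}
```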

0x06 #remove

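ChannelOutboundBuffer#remove unlinks the Entry at flushedEntry from the list and advances flushedEntry to the next Entry.

If flushedEntry is null, it clears the cached nioBuffers array and returns false. Otherwise it unlinks the Entry and, unless the Entry was cancelled earlier, releases the message, marks the promise as successful, and subtracts the Entry's size from totalPendingSize; if totalPendingSize falls below the channel's configured write-buffer low water mark, a ChannelWritabilityChanged event is fired through the ChannelPipeline. removeEntry also handles the last flushed Entry: flushedEntry becomes null, and if that Entry was also the tail, the whole list is now empty, so tailEntry and unflushedEntry are reset as well.

Finally, the Entry object is recycled for reuse.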

public boolean remove() {
    Entry e = flushedEntry;
    if (e == null) {
        clearNioBuffers();
        return false;
    }
    Object msg = e.msg;

    ChannelPromise promise = e.promise;
    int size = e.pendingSize;

    removeEntry(e);

    if (!e.cancelled) {
        // only release message, notify and decrement if it was not canceled before.
        ReferenceCountUtil.safeRelease(msg);
        safeSuccess(promise);
        decrementPendingOutboundBytes(size, false, true);
    }

    // recycle the entry
    e.recycle();

    return true;
}

private void removeEntry(Entry e) {
    if (-- flushed == 0) {
        // processed everything
        flushedEntry = null;
        if (e == tailEntry) {
            tailEntry = null;
            unflushedEntry = null;
        }
    } else {
        flushedEntry = e.next;
    }
}
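The pointer handling in removeEntry, especially the case where the last flushed Entry is also the tail, can be checked with a small model. `RmEntry` and `RmBuffer` are hypothetical; promises are reduced to a list of succeeded message names, and message release, size accounting and recycling are omitted.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of remove()/removeEntry() pointer updates (not Netty code).
class RmEntry {
    final String msg;
    RmEntry next;
    RmEntry(String msg) { this.msg = msg; }
}

class RmBuffer {
    RmEntry flushedEntry, unflushedEntry, tailEntry;
    int flushed;
    final List<String> succeeded = new ArrayList<>(); // stands in for safeSuccess(promise)

    void add(String msg) {
        RmEntry e = new RmEntry(msg);
        if (tailEntry == null) flushedEntry = null; else tailEntry.next = e;
        tailEntry = e;
        if (unflushedEntry == null) unflushedEntry = e;
    }

    void addFlush() {
        RmEntry e = unflushedEntry;
        if (e == null) return;
        if (flushedEntry == null) flushedEntry = e;
        for (; e != null; e = e.next) flushed++;
        unflushedEntry = null;
    }

    boolean remove() {
        RmEntry e = flushedEntry;
        if (e == null) return false; // flush region is empty
        if (--flushed == 0) {
            flushedEntry = null;
            if (e == tailEntry) { // the list is now completely empty
                tailEntry = null;
                unflushedEntry = null;
            }
        } else {
            flushedEntry = e.next;
        }
        succeeded.add(e.msg);
        return true;
    }

    public static void main(String[] args) {
        RmBuffer b = new RmBuffer();
        b.add("a");
        b.add("b");
        b.addFlush();
        b.add("c"); // stays unflushed
        while (b.remove()) { }
        System.out.println(b.succeeded + " unflushed=" + b.unflushedEntry.msg); // [a, b] unflushed=c
    }
}
```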

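0x07 #remove(Throwable)

ChannelOutboundBuffer#remove(Throwable cause) follows the same logic as ChannelOutboundBuffer#remove(), except that the Entry's promise is failed with cause instead of being marked as successful. It delegates to remove0, whose notifyWritability argument decides whether falling below the low water mark may fire ChannelWritabilityChanged; the public method always passes true.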

public boolean remove(Throwable cause) {
    return remove0(cause, true);
}

private boolean remove0(Throwable cause, boolean notifyWritability) {
    Entry e = flushedEntry;
    if (e == null) {
        clearNioBuffers();
        return false;
    }
    Object msg = e.msg;

    ChannelPromise promise = e.promise;
    int size = e.pendingSize;

    removeEntry(e);

    if (!e.cancelled) {
        // only release message, fail and decrement if it was not canceled before.
        ReferenceCountUtil.safeRelease(msg);

        safeFail(promise, cause);
        decrementPendingOutboundBytes(size, false, notifyWritability);
    }

    // recycle the entry
    e.recycle();

    return true;
}

0x08 #removeBytes

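ChannelOutboundBuffer#removeBytes(long writtenBytes) is called after a write of writtenBytes bytes, which may have been partial. Starting at flushedEntry, it removes every Entry whose data has been written completely, advances the readerIndex of the Entry that was only partly written, and reports progress to the promise of each Entry involved.

Finally, it clears the cached nioBuffers array.

The method assumes the messages in the flush region are ByteBufs: it stops at the first message that is not a ByteBuf and asserts that no written bytes are left over at that point.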

public void removeBytes(long writtenBytes) {
    for (;;) {
        Object msg = current();
        if (!(msg instanceof ByteBuf)) {
            assert writtenBytes == 0;
            break;
        }

        final ByteBuf buf = (ByteBuf) msg;
        final int readerIndex = buf.readerIndex();
        final int readableBytes = buf.writerIndex() - readerIndex;

        if (readableBytes <= writtenBytes) {
            if (writtenBytes != 0) {
                progress(readableBytes);
                writtenBytes -= readableBytes;
            }
            remove();
        } else { // readableBytes > writtenBytes
            if (writtenBytes != 0) {
                buf.readerIndex(readerIndex + (int) writtenBytes);
                progress(writtenBytes);
            }
            break;
        }
    }
    clearNioBuffers();
}
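The partial-write arithmetic of removeBytes can be tried out without Netty by letting `java.nio.ByteBuffer` play the role of ByteBuf, with its position standing in for readerIndex and its limit for writerIndex. The `ArrayDeque` that replaces the flush region and the `RemoveBytesDemo.removeBytes` helper are assumptions of this sketch; progress notifications and the non-ByteBuf stop condition are omitted.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model: position ~ readerIndex, limit ~ writerIndex (not Netty code).
class RemoveBytesDemo {
    // Drops fully written buffers from the head and advances a partially written one.
    static void removeBytes(Deque<ByteBuffer> flushed, long writtenBytes) {
        while (!flushed.isEmpty()) {
            ByteBuffer buf = flushed.peekFirst();
            int readable = buf.remaining();
            if (readable <= writtenBytes) {
                writtenBytes -= readable;
                flushed.pollFirst(); // like remove(): the whole entry went out
            } else {
                if (writtenBytes != 0) {
                    buf.position(buf.position() + (int) writtenBytes); // like readerIndex(...)
                }
                break;
            }
        }
    }

    public static void main(String[] args) {
        Deque<ByteBuffer> q = new ArrayDeque<>();
        q.add(ByteBuffer.allocate(5));
        q.add(ByteBuffer.allocate(8));
        q.add(ByteBuffer.allocate(4));
        removeBytes(q, 9); // first buffer done, 4 of the second buffer's 8 bytes written
        System.out.println(q.size() + " " + q.peekFirst().remaining()); // 2 4
    }
}
```

As in the original loop, an empty buffer at the head satisfies `readable <= writtenBytes` even when writtenBytes is 0, so it is removed as well.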

0x09 #nioBuffers

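ChannelOutboundBuffer#nioBuffers returns an array of the ByteBuffers (java.nio.ByteBuffer) that back the Entry#msg buffers in the flush region, ready for a gathering write.

The data of an Entry usually lives in a ByteBuf, and, simplifying a little, a ByteBuf is backed by one or more ByteBuffers. The returned ByteBuffer array is cached in a thread-local variable and reused across calls, so callers should only use its first nioBufferCount elements.

  • maxCount is the maximum number of ByteBuffers to return;
  • nioBufferCount is the number of ByteBuffers actually returned;
  • maxBytes is the maximum total number of bytes across the returned ByteBuffers;
  • nioBufferSize is the total number of bytes actually returned.

Because of maxCount and maxBytes, the result often covers only some of the entries in the flush region, and maxCount can even cut off part of a single Entry's ByteBuffers. The byte limit is only applied once at least one buffer has been collected, so the first readable Entry is always included even if it alone exceeds maxBytes.

maxBytes exists because on some operating systems a single write-family system call such as writev() accepts at most Integer.MAX_VALUE bytes.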

public ByteBuffer[] nioBuffers(int maxCount, long maxBytes) {
    assert maxCount > 0;
    assert maxBytes > 0;
    // Total bytes actually placed in the returned ByteBuffer[]
    long nioBufferSize = 0;
    // Number of ByteBuffers actually placed in the returned array
    int nioBufferCount = 0;
    final InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
    // Reuse the thread-local array; it may still hold ByteBuffers from a previous call
    ByteBuffer[] nioBuffers = NIO_BUFFERS.get(threadLocalMap);
    Entry entry = flushedEntry;
    // Walk the whole flush region
    while (isFlushedEntry(entry) && entry.msg instanceof ByteBuf) {
        if (!entry.cancelled) { // skip cancelled entries
            ByteBuf buf = (ByteBuf) entry.msg;
            final int readerIndex = buf.readerIndex();
            // Number of readable bytes in this message
            final int readableBytes = buf.writerIndex() - readerIndex;

            if (readableBytes > 0) {
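                // Byte limit. Written as maxBytes - readableBytes < nioBufferSize instead of
                // maxBytes < nioBufferSize + readableBytes so that the sum cannot overflow.
                // nioBufferCount != 0 guarantees that at least one buffer is always returned.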
                if (maxBytes - readableBytes < nioBufferSize && nioBufferCount != 0) {
                    break;
                }
                // Accumulate the total byte count
                nioBufferSize += readableBytes;

                int count = entry.count;
                if (count == -1) {
                    // -1 means the count has not been cached yet:
                    // cache the number of ByteBuffers backing msg in entry.count
                    entry.count = count = buf.nioBufferCount();
                }
                // Array length needed once this msg's ByteBuffers are added (capped at maxCount)
                int neededSpace = min(maxCount, nioBufferCount + count);
                // More than the current array can hold: grow it
                if (neededSpace > nioBuffers.length) {
                    // Grow the array
                    nioBuffers = expandNioBufferArray(nioBuffers, neededSpace, nioBufferCount);
                    // Store the grown array back into the thread-local cache
                    NIO_BUFFERS.set(threadLocalMap, nioBuffers);
                }
                
                if (count == 1) {
                    // msg is backed by a single ByteBuffer
                    ByteBuffer nioBuf = entry.buf;
                    if (nioBuf == null) {
                        // Cache msg's ByteBuffer in entry.buf
                        entry.buf = nioBuf = buf.internalNioBuffer(readerIndex, readableBytes);
                    }
                    // Add the ByteBuffer to the returned ByteBuffer[]
                    nioBuffers[nioBufferCount++] = nioBuf;
                } else {
                    // msg is backed by several ByteBuffers
                    ByteBuffer[] nioBufs = entry.bufs;
                    if (nioBufs == null) {
                        // Cache msg's ByteBuffers in entry.bufs
                        entry.bufs = nioBufs = buf.nioBuffers();
                    }

                    // Add each ByteBuffer that still has data to the returned ByteBuffer[]
                    for (int i = 0; i < nioBufs.length && nioBufferCount < maxCount; ++i) {
                        ByteBuffer nioBuf = nioBufs[i];
                        if (nioBuf == null) {
                            // A null slot marks the end; the rest can be ignored (ByteBuf internals are left for a later article)
                            break;
                        } else if (!nioBuf.hasRemaining()) {
                            // Skip ByteBuffers with no readable bytes
                            continue;
                        }
                        nioBuffers[nioBufferCount++] = nioBuf;
                    }
                }
                // Count limit
                if (nioBufferCount == maxCount) {
                    break;
                }
            }
        }
        entry = entry.next;
    }
    this.nioBufferCount = nioBufferCount;
    this.nioBufferSize = nioBufferSize;

    return nioBuffers;
}

// Grow the array by doubling until it can hold neededSpace elements
private static ByteBuffer[] expandNioBufferArray(ByteBuffer[] array, 
        int neededSpace, int size) {
    int newCapacity = array.length;
    do {
        newCapacity <<= 1;
        if (newCapacity < 0) {
            throw new IllegalStateException();
        }
    } while (neededSpace > newCapacity);

    ByteBuffer[] newArray = new ByteBuffer[newCapacity];
    System.arraycopy(array, 0, newArray, 0, size);
    return newArray;
}
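The two limits in nioBuffers can be isolated in a short sketch. Here each message is assumed to be a single `java.nio.ByteBuffer`, so the per-Entry caching (entry.count, entry.buf, entry.bufs), the thread-local array and cancelled entries are left out; `GatherDemo.gather` is a hypothetical helper. It keeps the overflow-safe byte check and the rule that the first non-empty buffer is always accepted.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the maxCount / maxBytes limits in nioBuffers (not Netty code).
class GatherDemo {
    static List<ByteBuffer> gather(List<ByteBuffer> flushed, int maxCount, long maxBytes) {
        List<ByteBuffer> out = new ArrayList<>();
        long size = 0;
        for (ByteBuffer buf : flushed) {
            int readable = buf.remaining();
            if (readable == 0) continue; // nothing to write from this buffer
            // Same form as Netty's check: the comparison avoids computing size + readable.
            if (maxBytes - readable < size && !out.isEmpty()) break;
            size += readable;
            out.add(buf);
            if (out.size() == maxCount) break; // count limit reached
        }
        return out;
    }

    public static void main(String[] args) {
        List<ByteBuffer> flushed = new ArrayList<>();
        flushed.add(ByteBuffer.allocate(10));
        flushed.add(ByteBuffer.allocate(10));
        flushed.add(ByteBuffer.allocate(10));
        System.out.println(gather(flushed, 1024, 25).size()); // 2: a third buffer would exceed 25 bytes
        System.out.println(gather(flushed, 1, 1000).size());  // 1: count limit
    }
}
```

In Netty's NIO transport the resulting array is consumed by a gathering write of the form SocketChannel#write(ByteBuffer[], int, int), with nioBufferCount as the length.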

0x0A #failFlushed

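ChannelOutboundBuffer#failFlushed removes every Entry in the flush region [flushedEntry, unflushedEntry) and fails each Entry's promise with cause.

The notify parameter decides whether a ChannelWritabilityChanged event may be fired when the other condition (totalPendingSize falling below the low water mark) is met. The inFail flag prevents re-entrance: failing a promise notifies its listeners on the current thread, and a listener may indirectly call failFlushed again, for example by closing the channel.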

void failFlushed(Throwable cause, boolean notify) {
    if (inFail) {
        return;
    }

    try {
        inFail = true;
        for (;;) {
            if (!remove0(cause, notify)) {
                break;
            }
        }
    } finally {
        inFail = false;
    }
}
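The re-entrance guard can be exercised with `java.util.concurrent.CompletableFuture` standing in for ChannelPromise: completing a future runs its non-async dependents on the completing thread, much like promise listeners. `FailDemo`, its queue-based flush region and the `reentrantCalls` counter are hypothetical; the guard turns the nested call into a no-op so only the outer loop drains the queue.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;

// Hypothetical model of failFlushed's re-entrance guard (not Netty code).
class FailDemo {
    final Deque<CompletableFuture<Void>> flushed = new ArrayDeque<>();
    private boolean inFail;
    int reentrantCalls; // how many nested calls the guard turned away

    void failFlushed(Throwable cause) {
        if (inFail) {
            reentrantCalls++;
            return;
        }
        try {
            inFail = true;
            CompletableFuture<Void> p;
            // Like looping on remove0(cause, notify) until it returns false.
            while ((p = flushed.pollFirst()) != null) {
                p.completeExceptionally(cause); // runs dependent actions on this thread
            }
        } finally {
            inFail = false;
        }
    }

    public static void main(String[] args) {
        FailDemo d = new FailDemo();
        CompletableFuture<Void> first = new CompletableFuture<>();
        first.whenComplete((v, t) -> d.failFlushed(t)); // a listener that re-enters
        d.flushed.add(first);
        d.flushed.add(new CompletableFuture<>());
        d.failFlushed(new IllegalStateException("channel closed"));
        System.out.println(d.flushed.isEmpty() + " " + d.reentrantCalls); // true 1
    }
}
```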

0x0B #bytesBeforeWritable

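ChannelOutboundBuffer#bytesBeforeWritable returns, while #isWritable returns false, the number of bytes by which totalPendingSize exceeds the channel's configured write-buffer low water mark; otherwise it returns 0.

In plain terms: while writing is switched off, this is roughly how many bytes must drain from the buffer before writing can resume.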

public long bytesBeforeWritable() {
    long bytes = totalPendingSize - channel.config().getWriteBufferLowWaterMark();
    if (bytes > 0) {
        return isWritable() ? 0 : bytes;
    }
    return 0;
}

0x0C #bytesBeforeUnwritable

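ChannelOutboundBuffer#bytesBeforeUnwritable returns, while #isWritable returns true, the number of bytes by which totalPendingSize is below the channel's configured write-buffer high water mark; otherwise it returns 0.

In plain terms: while writing is switched on, this is how many more bytes can be queued before the buffer becomes unwritable.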

public long bytesBeforeUnwritable() {
    long bytes = channel.config().getWriteBufferHighWaterMark() - totalPendingSize;
    if (bytes > 0) {
        return isWritable() ? bytes : 0;
    }
    return 0;
}
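A worked example of both formulas, assuming Netty's default WriteBufferWaterMark (low 32 KiB, high 64 KiB). `WritabilityMath` and its explicit `writable` argument are hypothetical; in Netty the flag comes from isWritable(), which also reflects user-defined writability bits.

```java
// Hypothetical restatement of bytesBeforeWritable / bytesBeforeUnwritable (not Netty code).
class WritabilityMath {
    static final long LOW = 32 * 1024, HIGH = 64 * 1024; // assumed default water marks

    static long bytesBeforeWritable(long totalPendingSize, boolean writable) {
        long bytes = totalPendingSize - LOW;
        return bytes > 0 && !writable ? bytes : 0;
    }

    static long bytesBeforeUnwritable(long totalPendingSize, boolean writable) {
        long bytes = HIGH - totalPendingSize;
        return bytes > 0 && writable ? bytes : 0;
    }

    public static void main(String[] args) {
        // 70 KiB pending: above the high mark, so the buffer is unwritable.
        System.out.println(bytesBeforeWritable(70 * 1024, false));  // 38912 (38 KiB above low)
        // 40 KiB pending and still writable: 24 KiB of headroom below high.
        System.out.println(bytesBeforeUnwritable(40 * 1024, true)); // 24576
    }
}
```

Between the two marks (for example with 40 KiB pending) the flag can be either true or false depending on which mark was crossed last, which is why both methods consult isWritable() instead of relying on totalPendingSize alone.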