Netty 之内存分配:PoolArena


综述

本文只具体分析PoolArena的子类DirectArena

下面是PoolArena内部的整体数据结构。

1
2
3
4
5
// q... 为 块链表 PoolChunkList
qInit ==> q000 <==> q025 <==> q050 <==> q075 <==> q100

// xxx为数字,块为 PoolChunk
qxxx ==> 1 <==> 2 <==> ... <==> N

#allocateNormal

正常内存分配路径。沿着 q050、q025、q000、qInit、q075的顺序,依次用各个块链表分配空间,有一个分配成功则退出。

块链表参见Netty 之内存分配:PoolChunkList

如果都不能分配,则新创建一个块并加入到 qInit 中。从新块中分配空间。块内空间分配见Netty 之内存分配:Buddy 算法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
    if (q050.allocate(buf, reqCapacity, normCapacity) 
        || q025.allocate(buf, reqCapacity, normCapacity) 
        || q000.allocate(buf, reqCapacity, normCapacity) 
        || qInit.allocate(buf, reqCapacity, normCapacity) 
        || q075.allocate(buf, reqCapacity, normCapacity)) {
        return;
    }

    // 创建一个新的 块
    PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize);
    // 从新块中分配空间
    long handle = c.allocate(normCapacity);
    // 确信成功分配
    assert handle > 0;
    // 初始化 buf 底层内存空间
    c.initBuf(buf, handle, reqCapacity);
    // 把新创建的块加入 qInit 中
    qInit.add(c);
}

#newChunk创建一个新的块。

如果系统提供的直接内存没有自带 cleaner,这个时候,需要 Netty 通过反射的方式创建DirectByteBuffer实例,并负责管理所分配空间的计数与释放。其他情况下,可以直接调用ByteBuffer@allocateDirect创建DirectByteBuffer实例。

如果需要地址对齐,实际分配的直接内存会多directMemoryCacheAlignment个字节,块中实际使用的内存地址偏移量offset就等于对齐需要的偏移量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// DirectArena#newChunk
protected PoolChunk<ByteBuffer> newChunk(int pageSize, int maxOrder,
        int pageShifts, int chunkSize) {
    // 不用对齐
    if (directMemoryCacheAlignment == 0) {
        return new PoolChunk<ByteBuffer>(
            this,   // 所属 arena
            allocateDirect(chunkSize),  // 底层空间
            pageSize, maxOrder, pageShifts, 
            chunkSize,  // 块大小
            0   // 地址偏移
        );
    }

    final ByteBuffer memory = allocateDirect(chunkSize + directMemoryCacheAlignment);
    return new PoolChunk<ByteBuffer>(
        this,   // 所属 arena
        memory, // 底层空间
        pageSize, maxOrder, pageShifts, 
        chunkSize,  // 块大小
        offsetCacheLine(memory) // 地址偏移
    );
}
// DirectArena@allocateDirect
private static ByteBuffer allocateDirect(int capacity) {
    return PlatformDependent.useDirectBufferNoCleaner() 
            // 通过反射的方式,创建 DirectByteBuffer 实例
            ? PlatformDependent.allocateDirectNoCleaner(capacity) 
            // 直接创建 DirectByteBuffer 实例
            : ByteBuffer.allocateDirect(capacity);
}
// DirectArena#offsetCacheLine
// 计算对齐所需要的偏移量
private int offsetCacheLine(ByteBuffer memory) {
    // 有 Unsafe 时才能计算偏移量,否则会抛异常 NPE.
    int remainder = HAS_UNSAFE
            ? (int) (PlatformDependent.directBufferAddress(memory) 
                    & directMemoryCacheAlignmentMask)
            : 0;
    // offset = alignment - address & (alignment - 1)
    return directMemoryCacheAlignment - remainder;
}

#allocateHuge

PooledByteBuf实例单独创建一个非池化的临时块,并初始化之。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private void allocateHuge(PooledByteBuf<T> buf, int reqCapacity) {
    PoolChunk<T> chunk = newUnpooledChunk(reqCapacity);
    // 更新统计量
    activeBytesHuge.add(chunk.chunkSize());
    // 底层空间初始化
    buf.initUnpooled(chunk, reqCapacity);
    // 更新统计量
    allocationsHuge.increment();
}
// 创建一个非池化的临时块
protected PoolChunk<ByteBuffer> newUnpooledChunk(int capacity) {
    // 无需对齐
    if (directMemoryCacheAlignment == 0) {
        return new PoolChunk<ByteBuffer>(this, allocateDirect(capacity), capacity, 0);
    }
    // 要对齐,修改容量成对齐后的大小
    final ByteBuffer memory = allocateDirect(capacity + directMemoryCacheAlignment);
    // offsetCacheLine 计算对齐偏移量
    return new PoolChunk<ByteBuffer>(this, memory, capacity, offsetCacheLine(memory));
}

#allocate

返回一个PooledByteBuf实例,分配并初始化其底层依赖的内存空间。

reqCapacity为请求容量,而maxCapacity为最大容量,之后调整实例容量的时候,只能在范围[0, maxCapacity]中调整,见下面的方法#reallocate

堆外内存管理参见Netty 之内存分配:堆外内存分配与释放块内空间分配见Netty 之内存分配:Buddy 算法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
    // 创建或重用 buf 实例
    PooledByteBuf<T> buf = newByteBuf(maxCapacity);
    // 分配其底层依赖的内存空间
    allocate(cache, buf, reqCapacity);
    // 谁要给谁
    return buf;
}
// in DirectArena
protected PooledByteBuf<ByteBuffer> newByteBuf(int maxCapacity) {
    if (HAS_UNSAFE) {
        // 内部使用 Unsafe 直接访问内存地址的方式操作数据
        return PooledUnsafeDirectByteBuf.newInstance(maxCapacity);
    } else {
        // 内部使用 ByteBuffer 提供的 api 操作数据
        return PooledDirectByteBuf.newInstance(maxCapacity);
    }
}
// 分配并初始化 buf 底层依赖的内存空间
private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, 
        final int reqCapacity) {
    // 规范化请求容量 reqCapacity
    final int normCapacity = normalizeCapacity(reqCapacity);
    // 页内空间分配
    if (isTinyOrSmall(normCapacity)) {
        int tableIdx;
        PoolSubpage<T>[] table;
        boolean tiny = isTiny(normCapacity);
        // 小于 512 为 tiny
        if (tiny) { 
            if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
                // cache 中分配成功,nice
                return;
            }
            // cache 不能分配
            // 根据容量,确定我们要从 tinySubpagePools 中的哪个链表分配空间
            tableIdx = tinyIdx(normCapacity);
            table = tinySubpagePools;
        } 
        // [512, 8k) 为 small 
        else {
            if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
                // cache 中分配成功,good
                return;
            }
            // cache 不能分配
            // 根据容量,确定我们要从 smallSubpagePools 中的哪个链表分配空间
            tableIdx = smallIdx(normCapacity);
            table = smallSubpagePools;
        }
        // 拿出对应链表的首指针 head
        final PoolSubpage<T> head = table[tableIdx];
        // 其他有地方会修改链表内容,会竞争,因此咱们给锁上
        synchronized (head) {
            // 链表首页
            final PoolSubpage<T> s = head.next;
            // 链表不为空呀
            if (s != head) {
                assert s.doNotDestroy && s.elemSize == normCapacity;
                // 拿到分配的页内空间编号
                long handle = s.allocate();
                // 不能小于 0 哦
                assert handle >= 0;
                // 初始化 buf 底层依赖的空间
                s.chunk.initBufWithSubpage(buf, handle, reqCapacity);
                // 更新统计量
                incTinySmallAllocation(tiny);
                // 分配并初始化结束
                return;
            }
        }
        // 页内空间相应的链表为空
        synchronized (this) {
            // 走正常路径分配
            allocateNormal(buf, reqCapacity, normCapacity);
        }
        // 更新统计量
        incTinySmallAllocation(tiny);
        // 分配并初始化结束
        return;
    }
    // 块空间分配
    if (normCapacity <= chunkSize) {
        // cache 中分配
        if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
            // cache 分配成功,GG
            return;
        }
        
        synchronized (this) {
            // cache 分配失败,走正常路径分配
            allocateNormal(buf, reqCapacity, normCapacity);
            // 更新统计量
            ++allocationsNormal;
        }
    } else {
        // 太大了,超出块的分配空间
        // 单独分配一个临时创建的大的块,并初始化
        allocateHuge(buf, reqCapacity);
    }
}

#reallocate

调整PooledByteBuf的容量,内部实际上重新分配了PooledByteBuf底层空间。扩容时,需要数据复制,收缩时可能不需要。

要注意,容量收缩的时候,可能会导致部分数据丢失。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
void reallocate(PooledByteBuf<T> buf, int newCapacity, boolean freeOldMemory) {
    // 调整范围 [0, macCapacity]
    if (newCapacity < 0 || newCapacity > buf.maxCapacity()) {
        throw new IllegalArgumentException("newCapacity: " + newCapacity);
    }

    int oldCapacity = buf.length;
    if (oldCapacity == newCapacity) {
        // 容量没有变动,GG
        return;
    }

    // 备份原值快照
    PoolChunk<T> oldChunk = buf.chunk;
    long oldHandle = buf.handle;
    T oldMemory = buf.memory;
    int oldOffset = buf.offset;
    int oldMaxLength = buf.maxLength;
    int readerIndex = buf.readerIndex();
    int writerIndex = buf.writerIndex();
    // 调用 #allocate 分配并初始化 buf 新空间
    allocate(parent.threadCache(), buf, newCapacity);
    // 扩容
    if (newCapacity > oldCapacity) {
        // 数据复制
        memoryCopy(oldMemory, oldOffset, buf.memory, buf.offset, oldCapacity);
    } 
    // 收缩
    else if (newCapacity < oldCapacity) {
        if (readerIndex < newCapacity) {
            if (writerIndex > newCapacity) {
                // 不能继续写了
                writerIndex = newCapacity;
            }
            // 数据复制
            memoryCopy(oldMemory, oldOffset + readerIndex,
                    buf.memory, buf.offset + readerIndex, writerIndex - readerIndex);
        } 
        else {
            // 读写都不行了,数据也就不需要复制了
            readerIndex = writerIndex = newCapacity;
        }
    }
    // 初始化读写位置
    buf.setIndex(readerIndex, writerIndex);

    if (freeOldMemory) {
        // 回收原先的内存空间
        free(oldChunk, oldHandle, oldMaxLength, buf.cache);
    }
}
// DirectArena#memoryCopy
// 内存数据拷贝
protected void memoryCopy(ByteBuffer src, int srcOffset, 
        ByteBuffer dst, int dstOffset, int length) {
    if (length == 0) {
        return;
    }

    if (HAS_UNSAFE) {
        PlatformDependent.copyMemory(
                PlatformDependent.directBufferAddress(src) + srcOffset,
                PlatformDependent.directBufferAddress(dst) + dstOffset, length);
    } else {
        // 其他 Netty buffers 可能会继续操作 src 和 dst,我们拷贝一份外包装就行
        src = src.duplicate();
        dst = dst.duplicate();
        src.position(srcOffset).limit(srcOffset + length);
        dst.position(dstOffset);
        dst.put(src);
    }
}

#free

回收块中空间,更新各种统计量。

  • 临时分配的大空间,需要被销毁;
  • 块中空间回收后,利用率下降,可能需要在链表中往前移动,完全空闲时,该块需要被销毁。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
void free(PoolChunk<T> chunk, long handle, int normCapacity, PoolThreadCache cache) {
    // 非池化的
    if (chunk.unpooled) {
        int size = chunk.chunkSize();
        // 释放块中的内存
        destroyChunk(chunk);
        // 更新统计量
        activeBytesHuge.add(-size);
        // 更新统计量
        deallocationsHuge.increment();
    } 
    // 池中的
    else {
        // 极小、小、普通
        SizeClass sizeClass = sizeClass(normCapacity);
        if (cache != null 
                && cache.add(this, chunk, handle, normCapacity, sizeClass)) {
            // cached so not free it.
            return;
        }
        // 回收空间,适当的时候释放块空间
        freeChunk(chunk, handle, sizeClass);
    }
}
// 极小、小、普通
private SizeClass sizeClass(int normCapacity) {
    if (!isTinyOrSmall(normCapacity)) {
        return SizeClass.Normal;
    }
    return isTiny(normCapacity) ? SizeClass.Tiny : SizeClass.Small;
}
// 回收空间,适当的时候释放块空间
void freeChunk(PoolChunk<T> chunk, long handle, SizeClass sizeClass) {
    final boolean destroyChunk;
    synchronized (this) {
        // 更新统计量
        switch (sizeClass) {
        case Normal:
            ++deallocationsNormal;
            break;
        case Small:
            ++deallocationsSmall;
            break;
        case Tiny:
            ++deallocationsTiny;
            break;
        default:
            throw new Error();
        }
        // 回收块中的空间,如果块完全空闲,设置销毁
        destroyChunk = !chunk.parent.free(chunk, handle);
    }
    // 块完全空闲,需要销毁
    if (destroyChunk) {
        // 释放块中的分配的内存
        destroyChunk(chunk);
    }
}
// 释放块中的分配的内存
protected void destroyChunk(PoolChunk<ByteBuffer> chunk) {
    if (PlatformDependent.useDirectBufferNoCleaner()) {
        PlatformDependent.freeDirectNoCleaner(chunk.memory);
    } else {
        PlatformDependent.freeDirectBuffer(chunk.memory);
    }
}

参考