Netty 之内存分配:PooledByteBufAllocator


PooledByteBufAllocator中维护了 2 个PoolArena数组:heapArenasdirectArenas

1
2
private final PoolArena<byte[]>[] heapArenas;
private final PoolArena<ByteBuffer>[] directArenas;

每个线程会绑定一个PoolArena到它的线程本地变量PoolThreadCache中,也就是说,所有该线程中的内存分配都会使用同一个PoolArena。那怎么来决定一个线程该绑定哪个PoolArena呢?答案是最少使用原则,下面的代码可以很容易看出这一点。在初始化线程本地变量时,调用了#leastUsedArena来决定需要绑定的PoolArena。另外,该方法也可以最大限度的避免竞争,且均匀分布任务负荷。

早前版本的 Netty 使用了轮询方式来决定线程的PoolArena,该方法的缺点是随着线程频繁的创建和消亡,各PoolArena的负载越来越不均衡。而最少使用这一方式可以避免这种问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// PoolThreadLocalCache#initialValue
protected synchronized PoolThreadCache initialValue() {
    final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
    final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);

    Thread current = Thread.currentThread();
    if (useCacheForAllThreads || current instanceof FastThreadLocalThread) {
        return new PoolThreadCache(heapArena, directArena, ...);
    }
    // No caching so just use 0 as sizes.
    return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0, 0);
}
// PoolThreadLocalCache#leastUsedArena
// 返回绑定线程数最少的 PoolArena
private <T> PoolArena<T> leastUsedArena(PoolArena<T>[] arenas) {
    if (arenas == null || arenas.length == 0) {
        return null;
    }

    PoolArena<T> minArena = arenas[0];
    for (int i = 1; i < arenas.length; i++) {
        PoolArena<T> arena = arenas[i];
        if (arena.numThreadCaches.get() < minArena.numThreadCaches.get()) {
            minArena = arena;
        }
    }

    return minArena;
}

内存分配

分配直接内存的时候,从线程本地变量中拿出PoolArena,创建PooledByteBuf实例,分配并初始化其底层空间。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
    if (initialCapacity == 0 && maxCapacity == 0) {
        return emptyBuf;
    }
    // 容量校验
    validate(initialCapacity, maxCapacity);
    return newDirectBuffer(initialCapacity, maxCapacity);
}

protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
    PoolThreadCache cache = threadCache.get();
    // 本地 PoolArena
    PoolArena<ByteBuffer> directArena = cache.directArena;

    final ByteBuf buf;
    if (directArena != null) {
        // 创建 buf,分配、初始化底层空间
        buf = directArena.allocate(cache, initialCapacity, maxCapacity);
    } else {
        buf = ...
    }
    // 包装一层,引用计数的 buf
    return toLeakAwareBuffer(buf);
}

内存回收

当用户或 Netty 的某段代码不需要继续使用某PooledByteBuf实例时,通过调用ReferenceCounted#release方法减少引用计数,当引用计数达到 0 的时候才会真正触发内存回收。

当需要持有某个 Netty 传给我们的PooledByteBuf实例,防止被其他线程误释放时,调用ReferenceCounted#retain方法增加引用计数;在不再需要使用的时候调用ReferenceCounted#release 方法,减少引用计数,告诉别人如果可以,就把它放了吧。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public boolean release() {
    return release0(1);
}

private boolean release0(int decrement) {
    // 递减引用计数
    int oldRef = refCntUpdater.getAndAdd(this, -decrement);
    if (oldRef == decrement) {
        deallocate();
        return true;
    }
    if (oldRef < decrement || oldRef - decrement > oldRef) {
        // Ensure we don't over-release, and avoid underflow.
        refCntUpdater.getAndAdd(this, decrement);
        throw new IllegalReferenceCountException(oldRef, -decrement);
    }
    return false;
}
// 回收内存空间,回收 buf 实例
protected final void deallocate() {
    if (handle >= 0) {
        final long handle = this.handle;
        this.handle = -1;
        memory = null;
        tmpNioBuf = null;
        // 空间回收
        chunk.arena.free(chunk, handle, maxLength, cache);
        chunk = null;
        // 实例回收
        recycle();
    }
}

参考