PooledByteBufAllocator
中维护了 2 个PoolArena
数组:heapArenas
和directArenas
。
1
2
private final PoolArena<byte[]>[] heapArenas;
private final PoolArena<ByteBuffer>[] directArenas;
每个线程会绑定一个PoolArena
到它的线程本地变量PoolThreadCache
中,也就是说,所有该线程中的内存分配都会使用同一个PoolArena
。那怎么来决定一个线程该绑定哪个PoolArena
呢?答案是最少使用原则,下面的代码可以很容易看出这一点。在初始化线程本地变量时,调用了#leastUsedArena
来决定需要绑定的PoolArena
。另外,该方法也可以最大限度的避免竞争,且均匀分布任务负荷。
早前版本的 Netty 使用了轮询方式来决定线程的
PoolArena
,该方法的缺点是随着线程频繁的创建和消亡,各PoolArena
的负载越来越不均衡。而最少使用这一方式可以避免这种问题。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// PoolThreadLocalCache#initialValue
protected synchronized PoolThreadCache initialValue() {
final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);
Thread current = Thread.currentThread();
if (useCacheForAllThreads || current instanceof FastThreadLocalThread) {
return new PoolThreadCache(heapArena, directArena, ...);
}
// No caching so just use 0 as sizes.
return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0, 0);
}
// PoolThreadLocalCache#leastUsedArena
// 返回绑定线程数最少的 PoolArena
private <T> PoolArena<T> leastUsedArena(PoolArena<T>[] arenas) {
if (arenas == null || arenas.length == 0) {
return null;
}
PoolArena<T> minArena = arenas[0];
for (int i = 1; i < arenas.length; i++) {
PoolArena<T> arena = arenas[i];
if (arena.numThreadCaches.get() < minArena.numThreadCaches.get()) {
minArena = arena;
}
}
return minArena;
}
分配直接内存的时候,从线程本地变量中拿出PoolArena
,创建PooledByteBuf
实例,分配并初始化其底层空间。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
if (initialCapacity == 0 && maxCapacity == 0) {
return emptyBuf;
}
// 容量校验
validate(initialCapacity, maxCapacity);
return newDirectBuffer(initialCapacity, maxCapacity);
}
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
PoolThreadCache cache = threadCache.get();
// 本地 PoolArena
PoolArena<ByteBuffer> directArena = cache.directArena;
final ByteBuf buf;
if (directArena != null) {
// 创建 buf,分配、初始化底层空间
buf = directArena.allocate(cache, initialCapacity, maxCapacity);
} else {
buf = ...
}
// 包装一层,引用计数的 buf
return toLeakAwareBuffer(buf);
}
当用户或 Netty 的某段代码不需要继续使用某PooledByteBuf
实例时,通过调用ReferenceCounted#release
方法减少引用计数,当引用计数达到 0 的时候才会真正触发内存回收。
当需要持有某个 Netty 传给我们的PooledByteBuf
实例,防止被其他线程误释放时,调用ReferenceCounted#retain
方法增加引用计数;在不再需要使用的时候调用ReferenceCounted#release
方法,减少引用计数,告诉别人如果可以,就把它放了吧。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public boolean release() {
return release0(1);
}
private boolean release0(int decrement) {
// 递减引用计数
int oldRef = refCntUpdater.getAndAdd(this, -decrement);
if (oldRef == decrement) {
deallocate();
return true;
}
if (oldRef < decrement || oldRef - decrement > oldRef) {
// Ensure we don't over-release, and avoid underflow.
refCntUpdater.getAndAdd(this, decrement);
throw new IllegalReferenceCountException(oldRef, -decrement);
}
return false;
}
// 回收内存空间,回收 buf 实例
protected final void deallocate() {
if (handle >= 0) {
final long handle = this.handle;
this.handle = -1;
memory = null;
tmpNioBuf = null;
// 空间回收
chunk.arena.free(chunk, handle, maxLength, cache);
chunk = null;
// 实例回收
recycle();
}
}