Netty 中的ByteBuf
对 JDK 自带的ByteBuffer
作了改进,用readerIndex
和writerIndex
分别标记读写位置,使用起来更自然,更舒坦。
PooledByteBuf
作为池化的ByteBuf
,提高了内存分配与释放的速度,同时,通过Recycler
,PooledByteBuf
也实现了实例的重用。
实例重用见 Netty 之实例重用:Recycler。
下面代码给出了PooledByteBuf
中的字段。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private final Recycler.Handle<PooledByteBuf<T>> recyclerHandle;
// PooledByteBuf 所属的 块
protected PoolChunk<T> chunk;
// 在 块 中的编号(低 32 位),如果是页内空间,还包括页内空间编号(高 32 位)
protected long handle;
// 块内存
protected T memory;
// 实例占用空间首地址在 块 内存中的偏移
protected int offset;
// 实例在块中实际占用的可用内存大小,只为分配空间的一部分
protected int length;
// 最大可用内存空间长度,分配的空间大小。
// 如果是块内空间,则为分配的块节点对应的容量;如果是页内空间,则是页内空间的大小
int maxLength;
// PooledByteBuf 实例关联的线程本地缓存
PoolThreadCache cache;
// ??
private ByteBuffer tmpNioBuf;
// 所属的分配器
private ByteBufAllocator allocator;
初始化PooledByteBuf
实例。包括:
chunk
,初始化所属的块;handle
,在块中的节点编号,如果是页内空间,还有页内空间编号;offset
,在块中的偏移地址,单位 byte;length
,可用空间大小;maxLenth
,最大空间大小;cache
,关联的线程本地缓存;chunk.arena.parent
。块内空间分配参见 Netty 之内存分配:Buddy 算法,页内空间分配参见 Netty 之内存分配:Slab 算法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
void init(PoolChunk<T> chunk, long handle, int offset, int length,
int maxLength, PoolThreadCache cache) {
init0(chunk, handle, offset, length, maxLength, cache);
}
private void init0(PoolChunk<T> chunk, long handle, int offset, int length,
int maxLength, PoolThreadCache cache) {
assert handle >= 0;
assert chunk != null;
// 所属的块
this.chunk = chunk;
// 初始化当前实例底层层的块内存
memory = chunk.memory;
// 初始化当前实例的分配器
allocator = chunk.arena.parent;
// 关联的线程本地缓存
this.cache = cache;
// 在块中的节点编号,如果是页内空间,还有页内空间编号
this.handle = handle;
// 在块中的偏移地址,单位 byte
this.offset = offset;
// 可用空间大小
this.length = length;
// 最大空间大小
this.maxLength = maxLength;
tmpNioBuf = null;
}
底层空间太大,块中分配不了时,会单独创建一个块,来分配空间。
1
2
3
void initUnpooled(PoolChunk<T> chunk, int length) {
init0(chunk, 0, chunk.offset, length, length, null);
}
调整实例可用空间容量,调整范围只能在[0, maxCapacity]。
调整时,该方法会尽可能的利用已分配的内存空间。扩容时,只有在分配的内存空间不够用时,才会重新去分配空间;收缩时,只在新容量不到原分配空间的一半(块节点上)或者收缩的大小超过 16 字节(页内 tiny 空间)时,才会去重新分配空间。
为了提高块中内存利用率,块空间利用率不到一半的,需要释放内存,重新分配。同样的,在 tiny 空间,收缩超过 16 字节的,也需要重新到收缩后tinySubpagePools
上对应的链表中分配页内空间。
块内空间分配参见 Netty 之内存分配:Buddy 算法,页内空间分配参见 Netty 之内存分配:Slab 算法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
public final ByteBuf capacity(int newCapacity) {
// [0, maxCapacity]
checkNewCapacity(newCapacity);
// 没有池化
if (chunk.unpooled) {
if (newCapacity == length) {
// 容量不变,直接返回
return this;
}
}
// 池化
else {
// 扩容
if (newCapacity > length) {
// 当前块节点或页内空间可以容纳调整后的空间
if (newCapacity <= maxLength) {
// 直接修改可用空间容量
length = newCapacity;
// 返回当前实例
return this;
}
}
// 收缩
else if (newCapacity < length) {
// 收缩不到一半
if (newCapacity > maxLength >>> 1) {
// 页内 tiny 空间
if (maxLength <= 512) {
// 收缩后的容量没有小于当前页内空间的等级,tiny 空间每 16 字节一个级别
if (newCapacity > maxLength - 16) {
// 直接修改容量
length = newCapacity;
// 修正容量改变后的读写位置
setIndex(Math.min(readerIndex(), newCapacity),
Math.min(writerIndex(), newCapacity));
// 返回当前实例
return this;
}
}
// 块空间,或页内 small 空间
else { // > 512 (i.e. >= 1024)
length = newCapacity;
// 修正容量改变后的读写位置
setIndex(Math.min(readerIndex(), newCapacity),
Math.min(writerIndex(), newCapacity));
// 返回当前实例
return this;
}
}
}
// 容量不变
else {
// 返回当前实例
return this;
}
}
// 以上条件都不满足,重新给实例分配底层块内存
chunk.arena.reallocate(this, newCapacity, true);
// 返回当前实例
return this;
}
public ByteBuf setIndex(int readerIndex, int writerIndex) {
if (readerIndex < 0 || readerIndex > writerIndex || writerIndex > capacity()) {
throw new IndexOutOfBoundsException(...);
}
setIndex0(readerIndex, writerIndex);
return this;
}
返回索引index
在底层块内存中的实际偏移地址。
1
2
3
4
protected final int idx(int index) {
// 首地址的块偏移加上相对偏移
return offset + index;
}
释放实例占用的底层块空间,并回收本实例。
实例回收/重用见 Netty 之实例重用:Recycler。
1
2
3
4
5
6
7
8
9
10
11
12
13
protected final void deallocate() {
if (handle >= 0) {
final long handle = this.handle;
this.handle = -1;
memory = null;
tmpNioBuf = null;
// 块空间回收
chunk.arena.free(chunk, handle, maxLength, cache);
chunk = null;
// 实例回收
recycle();
}
}
实例重用,清除各种标志。
1
2
3
4
5
6
7
8
9
final void reuse(int maxCapacity) {
maxCapacity(maxCapacity);
// 设置引用
setRefCnt(1);
// 清除读写位置
setIndex0(0, 0);
// 清除标记位置
discardMarks();
}
PooledDirectByteBuf
继承了PooledByteBuf
,底层的数据载体是 JDK 自带的 ByteBuffer
,内部同样也是用了ByteBuffer
的 api 来操作数据。
1
final class PooledDirectByteBuf extends PooledByteBuf<ByteBuffer>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private void getBytes(int index, byte[] dst, int dstIndex, int length, boolean internal) {
checkDstIndex(index, length, dstIndex, dst.length);
ByteBuffer tmpBuf;
if (internal) {
tmpBuf = internalNioBuffer();
} else {
tmpBuf = memory.duplicate();
}
index = idx(index);
tmpBuf.clear().position(index).limit(index + length);
tmpBuf.get(dst, dstIndex, length);
}
protected final ByteBuffer internalNioBuffer() {
ByteBuffer tmpNioBuf = this.tmpNioBuf;
if (tmpNioBuf == null) {
this.tmpNioBuf = tmpNioBuf = newInternalNioBuffer(memory);
}
return tmpNioBuf;
}
获取一个PooledDirectByteBuf
实例,重用的或新创建的。
实例回收/重用见 Netty 之实例重用:Recycler。
1
2
3
4
5
static PooledDirectByteBuf newInstance(int maxCapacity) {
PooledDirectByteBuf buf = RECYCLER.get();
buf.reuse(maxCapacity);
return buf;
}
PooledUnsafeDirectByteBuf
继承了PooledByteBuf
,底层的数据载体是 JDK 自带的 ByteBuffer
,内部使用Unsafe
进行数据操作。
1
final class PooledUnsafeDirectByteBuf extends PooledByteBuf<ByteBuffer>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
UnsafeByteBufUtil.getBytes(this, addr(index), index, dst, dstIndex, length);
return this;
}
static void getBytes(AbstractByteBuf buf, long addr, int index, byte[] dst, int dstIndex, int length) {
buf.checkIndex(index, length);
checkNotNull(dst, "dst");
if (isOutOfBounds(dstIndex, length, dst.length)) {
throw new IndexOutOfBoundsException("dstIndex: " + dstIndex);
}
if (length != 0) {
PlatformDependent.copyMemory(addr, dst, dstIndex, length);
}
}
获取一个PooledUnsafeDirectByteBuf
实例,重用的或新创建的。实例回收/重用见 Netty 之实例重用:Recycler。
1
2
3
4
5
static PooledUnsafeDirectByteBuf newInstance(int maxCapacity) {
PooledUnsafeDirectByteBuf buf = RECYCLER.get();
buf.reuse(maxCapacity);
return buf;
}