以下术语对理解本文很重要。
page
,页,最小内存分配单元;pageSize
,页大小,默认为 8K;chunk
,块,页的集合;chunkSize
,块大小,;maxOrder
,完全二叉树的深度。首先我们开辟一个大小为chunkSize
的字节数组,当请求分配一个大小为size
的 ByteBuf 时,我们返回数组中第一个大小合适且未分配的连续空间首地址,并把这段连续空间标记为不可用。为此,我们构造一棵完全二叉树,并把节点的分配信息存放在数组memoryMap
中。
- 简单起见,所有的
size
都会被规范化,也就是转换成不小于size
的 2 的 N 次幂。
我们把块
分成地址连续的个页
,二叉树的叶节点对应一个页
。树的节点分布如下,括号内为该层节点所代表的空间大小,深度越大,代表的空间越小:
1
2
3
4
5
6
7
depth=0 // 1 个节点 (chunkSize/2^0,即 chunkSize)
depth=1 // 2 个节点 (chunkSize/2^1)
..
..
depth=d // 2^d 个节点 (chunkSize/2^d)
..
depth=maxOrder // 2^maxOrder 个节点 (chunkSize/2^maxOrder = pageSize)
比如,根节点能分配的最大空间为整个块,深度为1的节点能分配的最大空间为,深度为2的节点能分配最大空间为,以此类推,叶节点能分配的空间为,也就是只能分配1页。深度越小,能分配的空间越大。
有了这棵搜索树之后,如果要分配一个大小为的连续空间时,我们只要在深度为 k 的层上,从左到右查找空闲节点就可以了。
我们把完全二叉树的节点从上到下、从左到右编号:,共个节点。memoryMap
中的元素存放节点的分配状态信息。的含义是:编号id
的节点能分配的最大空间在深度为x
的层上,或者,id
节点能分配的最小深度在x
。随着节点的分配与释放,我们也会动态的去更新数组中的信息。memoryMap
中的元素会被初始化为每个节点的深度,即,其能分配的最大空间为自身代表的大小。
节点有三个状态:空闲、部分分配、不可用。
memoryMap[id]
层上节点代表的空间大小;
depth_of_id
:节点id
的深度。
在深度为 d 的层,分配空闲节点并返回节点编号。在 d 层找到空闲节点后,设置节点状态为不可用,回溯更新父节点分配状态,直到根节点。
块的可分配容量不够,直接返回 -1。
只要
块
的根节点能分配的最小深度小于等于 d,肯定能在 d 层找到空闲节点。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
private int allocateNode(int d) {
int id = 1;
// has last d bits = 0 and rest all = 1
// 比如 d=5,initial = 0xFF FF FF 11100000
int initial = - (1 << d);
// 从根节点开始
byte val = value(id);
if (val > d) {
// 根节点下面第一个能分配的子节点在 d 之下,显然容量不够分配,GG
return -1;
}
// 至此,我们能保证在深度为 d 的层上有空闲节点
// 下面从上到下,从左到右,查找 d 层空闲节点
// 对于深度小于 d 的层,id & initial == 0
// 对于深度等于 d 的层,id & initial == 1 << d
// 对于深度大于 d 的层,id & initial > 1 << d
// 我们只处理 能分配的空间大于 d 层节点的节点 或者 d 层之上的节点
while (val < d || (id & initial) == 0) {
// 左节点
id <<= 1;
// 左节点能分配的最小深度
val = value(id);
// memoryMap[id] > d,说明节点可分配空间不够,换右节点试试
if (val > d) {
// 右节点
id ^= 1;
// 右节点能分配的最小深度
val = value(id);
}
}
// 深度为 d,编号为 id 的节点,就是我们要找的
byte value = value(id);
// 设置 id 不可用
setValue(id, unusable); // mark as unusable
// 更新父节点分配信息,一直到根节点
updateParentsAlloc(id);
// 返回 编号 id
return id;
}
// 更新父节点分配信息,一直到根节点
private void updateParentsAlloc(int id) {
while (id > 1) {
// 父节点
int parentId = id >>> 1;
// 自身可分配节点深度
byte val1 = value(id);
// 兄弟可分配节点深度
byte val2 = value(id ^ 1);
// 取 min(val1, val2) 作为父节点可分配最小深度
byte val = val1 < val2 ? val1 : val2;
// 设置父节点可分配节点深度
setValue(parentId, val);
// 继续往上走,总会到根节点的
id = parentId;
}
}
private byte value(int id) {
return memoryMap[id];
}
private void setValue(int id, byte val) {
memoryMap[id] = val;
}
分配大小为normCapacity
的连续空间,normCapacity
为规范化后的数值。
pageSize
为 8k 时,pageShifts
为 13。如果要分配 16k 的空间,那么深度。很合理,很好很强大。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private long allocateRun(int normCapacity) {
// 计算要分配的节点深度
int d = maxOrder - (log2(normCapacity) - pageShifts);
// 在深度 d 分配空闲节点,并返回节点 id
int id = allocateNode(d);
if (id < 0) {
// 块容量不够,GG
return id;
}
// 更新剩余空间大小,normCapacity
freeBytes -= runLength(id);
return id;
}
private int runLength(int id) {
// 节点 id 代表的以 byte 为单位的空间大小
return 1 << log2ChunkSize - depth(id);
}
初始化PooledByteBuf
实例对应的底层内存空间。handle
包含页内空间编号和相应的叶节点编号,reqCapacity
为请求容量。
方法#initBufWithSubpage
参见Netty 之内存分配:Slab 算法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
void initBuf(PooledByteBuf<T> buf, long handle, int reqCapacity) {
// 块节点编号
int memoryMapIdx = memoryMapIdx(handle);
// 页内空间编号
int bitmapIdx = bitmapIdx(handle);
// 普通空间
if (bitmapIdx == 0) {
byte val = value(memoryMapIdx);
assert val == unusable : String.valueOf(val);
// 初始化 buf 底层空间
buf.init(
this, // 所属的块
handle, // 块节点编号
// 块偏移(offset) + 页偏移
runOffset(memoryMapIdx) + offset,
reqCapacity, // 实际占用的空间大小
runLength(memoryMapIdx), // 分配的空间大小
rena.parent.threadCache() // 线程本地缓存
);
}
// 页内空间
else {
// 初始化 buf 对应的页内空间
initBufWithSubpage(buf, handle, bitmapIdx, reqCapacity);
}
}
分配请求参数normCapacity
指定大小的空间,并返回分配的代表块节点编号和页内空间编号的handle
值。
1
2
3
4
5
6
7
8
9
10
long allocate(int normCapacity) {
// 块节点空间分配
if ((normCapacity & subpageOverflowMask) != 0) {
return allocateRun(normCapacity);
}
// 页内空间分配
else {
return allocateSubpage(normCapacity);
}
}
释放编号为handle
的节点,修改节点状态为空闲,回溯更新父节点分配状态,直到根节点。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
void free(long handle) {
int memoryMapIdx = memoryMapIdx(handle);
int bitmapIdx = bitmapIdx(handle);
// 此处省略页内空间释放逻辑
// 增加可用空间大小
freeBytes += runLength(memoryMapIdx);
// 设置节点 memoryMapIdx 为空闲节点
setValue(memoryMapIdx, depth(memoryMapIdx));
// 更新父节点分配信息,直到根节点
updateParentsFree(memoryMapIdx);
}
// 更新父节点分配信息,直到根节点
private void updateParentsFree(int id) {
int logChild = depth(id) + 1;
while (id > 1) {
// 父节点
int parentId = id >>> 1;
// 自身可分配节点深度
byte val1 = value(id);
// 兄弟可分配节点深度
byte val2 = value(id ^ 1);
// 咱兄弟俩所处的深度
logChild -= 1;
// 咱兄弟俩都已空闲,更新父节点状态为空闲,也就是更新为它自己的深度
if (val1 == logChild && val2 == logChild) {
setValue(parentId, (byte) (logChild - 1));
}
// 否则
else {
// 取 min(val1, val2) 作为父节点可分配最小深度
byte val = val1 < val2 ? val1 : val2;
setValue(parentId, val);
}
// 继续往上走,总会到根节点的
id = parentId;
}
}
// 取低 32 位
private static int memoryMapIdx(long handle) {
return (int) handle;
}
// 取高 32 位
private static int bitmapIdx(long handle) {
return (int) (handle >>> Integer.SIZE);
}