Netty 中FastThreadLocal
用来代替ThreadLocal
存放线程本地变量,从FastThreadLocalThread
类型的线程中访问本地变量时,比使用ThreadLocal
会有更好的性能。
FastThreadLocal
使用InternalThreadLocalMap
存放实际的数据。和ThreadLocal
实现方式类似,FastThreadLocalThread
中有一个InternalThreadLocalMap
类型的字段threadLocalMap
,这样一个线程对应一个InternalThreadLocalMap
实例,该线程下所有的线程本地变量都会放threadLocalMap
中的数组indexedVariables
中。
线程本地变量有时会简写为
TLV
,Thread Local Variables。
InternalThreadLocalMap
继承了UnpaddedInternalThreadLocalMap
。
1
2
3
4
// 普通线程时,使用 ThreadLocal 存放当前线程的 InternalThreadLocalMap 实例
static final
ThreadLocal<InternalThreadLocalMap>
slowThreadLocalMap = new ThreadLocal<InternalThreadLocalMap>();
如果线程类型是FastThreadLocalThread
,那么直接从线程中获取字段threadLocalMap
;如果是普通线程,那么从默认的ThreadLocal
实例slowThreadLocalMap
中获取当前线程的InternalThreadLocalMap
实例。
InternalThreadLocalMap
中使用数组indexedVariables
来存放线程本地变量。构造函数在初始化时,会开辟一个 32 元素的空间,并填充UNSET
。由FastThreadLocal
全局 ID index
的分配特性,线程本地变量在数组中不一定是连续存放的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 数组,FastThreadLocalThread 类线程用来存放本地变量
Object[] indexedVariables;
// 占位符,说明该位置没有被设置过 tlv
public static final Object UNSET = new Object();
private InternalThreadLocalMap() {
super(newIndexedVariableTable());
}
private static Object[] newIndexedVariableTable() {
// 初始容量 32
Object[] array = new Object[32];
// 填充占位符
Arrays.fill(array, UNSET);
return array;
}
下面这段代码是为了防止伪共享。通常 CPU 的缓存行一般是 64 或 128 字节,为了防止InternalThreadLocalMap
的不同实例被加载到同一个缓存行,我们需要多余填充一些字段,使得每个实例的大小超出缓存行的大小。
1
2
3
// Cache line padding (must be public)
// With CompressedOops enabled, an instance of this class should occupy at least 128 bytes.
public long rp1, rp2, rp3, rp4, rp5, rp6, rp7, rp8, rp9;
静态方法@getIfSet
从当前线程中拿出InternalThreadLocalMap
实例,没有则返回null
。
1
2
3
4
5
6
7
8
9
10
public static InternalThreadLocalMap getIfSet() {
// 获取当前线程
Thread thread = Thread.currentThread();
if (thread instanceof FastThreadLocalThread) {
// 线程为 FastThreadLocalThread 类型时,直接返回字段 threadLocalMap
return ((FastThreadLocalThread) thread).threadLocalMap();
}
// 普通线程,从默认的 ThreadLocal 中获取 InternalThreadLocalMap 实例
return slowThreadLocalMap.get();
}
静态方法@get
从当前线程中拿出InternalThreadLocalMap
实例,没有初始化一个再返回。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public static InternalThreadLocalMap get() {
// 获取当前线程
Thread thread = Thread.currentThread();
if (thread instanceof FastThreadLocalThread) {
// 从线程实例中直接获取,没有初始化一个再返回
return fastGet((FastThreadLocalThread) thread);
} else {
// 从 ThreadLocal 中拿,没有初始化一个再返回
return slowGet();
}
}
// 从线程实例中直接获取,没有初始化一个再返回
private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) {
// 从线程实例中直接获取
InternalThreadLocalMap threadLocalMap = thread.threadLocalMap();
if (threadLocalMap == null) {
// 没有初始化一个
thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap());
}
return threadLocalMap;
}
// 从 ThreadLocal 中拿,没有初始化一个再返回
private static InternalThreadLocalMap slowGet() {
ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap
= UnpaddedInternalThreadLocalMap.slowThreadLocalMap;
// 从 ThreadLocal 中拿
InternalThreadLocalMap ret = slowThreadLocalMap.get();
if (ret == null) {
// 没有初始化一个
ret = new InternalThreadLocalMap();
slowThreadLocalMap.set(ret);
}
return ret;
}
返回 JVM 全局唯一递增索引,最大Integer.Max_Value
。
1
2
3
4
5
6
7
8
9
10
11
12
// in UnpaddedInternalThreadLocalMap
static final AtomicInteger nextIndex = new AtomicInteger();
// InternalThreadLocalMap@nextVariableIndex
public static int nextVariableIndex() {
int index = nextIndex.getAndIncrement();
if (index < 0) {
// 悲剧,溢出了
nextIndex.decrementAndGet();
throw new IllegalStateException("too many thread-local indexed variables");
}
return index;
}
从数组indexedVariables
中获取下标为index
的元素,下标越界,则返回UNSET
。
1
2
3
4
public Object indexedVariable(int index) {
Object[] lookup = indexedVariables;
return index < lookup.length ? lookup[index] : UNSET;
}
在指定下标index
处存放数据,如果原先已经有数据,返回 FALSE,否则说明是第一次存放,返回 TRUE。 当index
越界时,以大于index
的最小 2 的 N 次幂扩容。
- 因为初始容量是 32,实际上就是翻倍扩容。
- 由于
index
的全局唯一性,导致在同一线程中的FastThreadLocal
实例 ID 不一定连续,因此index
越界不代表数组就没有空间了,只是这些空间不能被使用。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public boolean setIndexedVariable(int index, Object value) {
Object[] lookup = indexedVariables;
if (index < lookup.length) {
Object oldValue = lookup[index];
lookup[index] = value;
// 是否已经有数据
return oldValue == UNSET;
} else {
// 扩容并存放数据
expandIndexedVariableTableAndSet(index, value);
// 这种情况下,index 处肯定没有数据
return true;
}
}
// 最小 2 的 n 次幂扩容,并在 index 处存放数据
private void expandIndexedVariableTableAndSet(int index, Object value) {
Object[] oldArray = indexedVariables;
final int oldCapacity = oldArray.length;
// 计算最小 2 的 n 次幂
int newCapacity = index;
newCapacity |= newCapacity >>> 1;
newCapacity |= newCapacity >>> 2;
newCapacity |= newCapacity >>> 4;
newCapacity |= newCapacity >>> 8;
newCapacity |= newCapacity >>> 16;
newCapacity ++;
// 扩容并拷贝原数据
Object[] newArray = Arrays.copyOf(oldArray, newCapacity);
// 无数据部分填充占位符 UNSET
Arrays.fill(newArray, oldCapacity, newArray.length, UNSET);
// 存放本次数据
newArray[index] = value;
// 设置新数组
indexedVariables = newArray;
}
置数组indexVariables
指定下标index
处的数据为占位符UNSET
,并返回原数据。相当于从数组中删除了index
处的数据。
1
2
3
4
5
6
7
8
9
10
public Object removeIndexedVariable(int index) {
Object[] lookup = indexedVariables;
if (index < lookup.length) {
Object v = lookup[index];
lookup[index] = UNSET;
return v;
} else {
return UNSET;
}
}
每个FastThreadLocal
实例在初始化的时候都会被分配一个 JVM 全局唯一 ID:index
。在获取线程本地变量时,使用这个索引。
1
2
3
4
5
6
private static final
int variablesToRemoveIndex = InternalThreadLocalMap.nextVariableIndex();
public FastThreadLocal() {
index = InternalThreadLocalMap.nextVariableIndex();
}
variablesToRemoveIndex
指定用来存放FastThreadLocal
实例的集合variablesToRemove
在indexedVariables
数组中的位置。集合variablesToRemove
一般是数组第一个元素,或第一个非UNSET
元素。
获取线程本地变量,没有就初始化一个值再返回,并把本FastThreadLocal
实例加入variablesToRemoveindex
索引的variablesToRemove
集合。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public final V get() {
return get(InternalThreadLocalMap.get());
}
// 从 threadLocalMap 获取线程本地变量,threadLocalMap 必须属于当前线程
public final V get(InternalThreadLocalMap threadLocalMap) {
// 用本 FastThreadLocal 实例的 index 去 indexedVariables 数组中取数据
Object v = threadLocalMap.indexedVariable(index);
if (v != InternalThreadLocalMap.UNSET) {
// 非占位符数据,返回
return (V) v;
}
// 初始化一个返回
return initialize(threadLocalMap);
}
private V initialize(InternalThreadLocalMap threadLocalMap) {
V v = null;
try {
// 调用子类实现初始化一个值或 null
v = initialValue();
} catch (Exception e) {
PlatformDependent.throwException(e);
}
// 放入 indexedVariables 数组
threadLocalMap.setIndexedVariable(index, v);
// 添加本实例到 variablesToRemove 集合
addToVariablesToRemove(threadLocalMap, this);
return v;
}
// threadLocalMap 必须属于当前线程
private static void addToVariablesToRemove(
InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {
// 从 variablesToRemoveIndex 下标处获取 variablesToRemove 集合
Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
Set<FastThreadLocal<?>> variablesToRemove;
if (v == InternalThreadLocalMap.UNSET || v == null) {
// 本线程第一次添加 tlv,创建 Set 存放本 FastThreadLocal 实例
variablesToRemove =
Collections.newSetFromMap(new IdentityHashMap<FastThreadLocal<?>, Boolean>());
// Set 本身存放在 variablesToRemoveIndex 指定的位置处
threadLocalMap.setIndexedVariable(variablesToRemoveIndex, variablesToRemove);
} else {
// 非首次添加 tlv
variablesToRemove = (Set<FastThreadLocal<?>>) v;
}
// 加入 FastThreadLocal 实例
variablesToRemove.add(variable);
}
// 子类可以具体实现初始值
protected V initialValue() throws Exception {
return null;
}
设置线程本地变量,并把相关联的FastThreadLocal
实例放入indexVariables
数组variablesToRemoveIndex
下标处的集合中。
当设置的变量为UNSET
时,删除线程本地变量,并把自身从variablesToRemove
集合中移除。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public final void set(V value) {
if (value != InternalThreadLocalMap.UNSET) {
// 非占位符数据
set(InternalThreadLocalMap.get(), value);
} else {
remove();
}
}
// threadLocalMap 必须属于当前线程
public final void set(InternalThreadLocalMap threadLocalMap, V value) {
if (value != InternalThreadLocalMap.UNSET) { // 非占位符数据
// 在 index 出存放 value
if (threadLocalMap.setIndexedVariable(index, value)) {
// 第一次存放
addToVariablesToRemove(threadLocalMap, this);
}
} else {
// 删除*线程本地变量*,并把自身从`variablesToRemove`集合中移除
remove(threadLocalMap);
}
}
删除线程本地变量,并把自身从variablesToRemove
集合中移除。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public final void remove() {
remove(InternalThreadLocalMap.getIfSet());
}
// threadLocalMap 必须属于当前线程
public final void remove(InternalThreadLocalMap threadLocalMap) {
if (threadLocalMap == null) {
return;
}
// 删除变量
Object v = threadLocalMap.removeIndexedVariable(index);
// 删除自身实例
removeFromVariablesToRemove(threadLocalMap, this);
if (v != InternalThreadLocalMap.UNSET) {
try {
// 移除通知
onRemoval((V) v);
} catch (Exception e) {
PlatformDependent.throwException(e);
}
}
}
// 从 threadLocalMap 删除 FastThreadLocal 实例
private static void removeFromVariablesToRemove(
InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {
Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
if (v == InternalThreadLocalMap.UNSET || v == null) {
// 还没有初始化 variablesToRemove 集合
return;
}
Set<FastThreadLocal<?>> variablesToRemove = (Set<FastThreadLocal<?>>) v;
// 删除 FastThreadLocal 实例
variablesToRemove.remove(variable);
}