抱歉,您的浏览器无法访问本站
本页面需要浏览器支持(启用)JavaScript
了解详情 >

摘要:本文学习了能在高并发场景下使用的线程安全集合。

环境

Windows 10 企业版 LTSC 21H2
Java 1.8

1 集合中的线程安全问题

1.1 List

示例:

java
1
2
3
4
5
6
7
8
9
public static void main(String[] args) {
List<String> list = new ArrayList<>();
for (int i = 0; i < 10; i++) {
new Thread(() -> {
list.add(UUID.randomUUID().toString().substring(0, 8));
System.out.println(list);
}, String.valueOf(i)).start();
}
}

结果:

log
1
Exception in thread "1" Exception in thread "8" java.util.ConcurrentModificationException

说明出现了并发修改异常。

1.2 Set

示例:

java
1
2
3
4
5
6
7
8
9
public static void main(String[] args) {
Set<String> set = new HashSet<>();
for (int i = 0; i < 10; i++) {
new Thread(() -> {
set.add(UUID.randomUUID().toString().substring(0, 8));
System.out.println(set);
}, String.valueOf(i)).start();
}
}

结果:

log
1
Exception in thread "1" Exception in thread "8" java.util.ConcurrentModificationException

说明出现了并发修改异常。

1.3 Map

示例:

java
1
2
3
4
5
6
7
8
9
10
public static void main(String[] args) {
Map<String, String> map = new HashMap<>();
for (int i = 0; i < 10; i++) {
String key = String.valueOf(i);
new Thread(() -> {
map.put(key, UUID.randomUUID().toString().substring(0, 8));
System.out.println(map);
}, String.valueOf(i)).start();
}
}

结果:

log
1
Exception in thread "1" Exception in thread "8" java.util.ConcurrentModificationException

说明出现了并发修改异常。

2 线程安全的集合

JUC下有关Collection的类:

  • CopyOnWriteArrayList:实现了List接口,相当于线程安全的ArrayList。
  • CopyOnWriteArraySet:继承于AbstractSet类,实现了Set接口,相当于线程安全的HashSet。通过CopyOnWriteArrayList实现。
  • ConcurrentSkipListSet:继承于AbstractSet类,实现了NavigableSet接口,相当于线程安全的TreeSet。通过ConcurrentSkipListMap实现。
  • ConcurrentLinkedQueue:继承于AbstractQueue类,实现了Queue接口,是单向链表实现的线程安全的无界队列,该队列支持FIFO方式操作元素。
  • ConcurrentLinkedDeque:继承于AbstractCollection类,实现了Deque接口,是双向链表实现的线程安全的无界队列,该队列支持FIFO和FILO方式操作元素,相当于线程安全的LinkedList。
  • ArrayBlockingQueue:实现了BlockingQueue接口,是数组实现的线程安全的有界阻塞队列。
  • LinkedBlockingQueue:实现了BlockingQueue接口,是单向链表实现的线程安全的无界阻塞队列,该队列支持FIFO方式操作元素。
  • LinkedBlockingDeque:实现了BlockingDeque接口,是双向链表实现的线程安全的无界阻塞队列,该队列支持FIFO和FILO方式操作元素。
  • SynchronousQueue:实现了BlockingQueue接口,是不存储元素的阻塞队列,put操作必须等待take操作,否则不能添加元素并产生中断抛出异常。
  • DelayQueue:实现了BlockingQueue接口,是支持延迟获取的阻塞队列。

JUC下有关Map的类:

  • ConcurrentHashMap:继承于AbstractMap类,相当于线程安全的HashMap,是线程安全的哈希表。使用数组加链表加红黑树结构和CAS操作实现。
  • ConcurrentSkipListMap:继承于AbstractMap类,相当于线程安全的TreeMap,是线程安全的有序的哈希表。通过跳表实现的。

Collections工具类下的方法:

  • synchronizedList()方法:将普通的List转换为线程安全的List,通过synchronized同步锁保证对集合的访问是线程安全的。
  • synchronizedSet()方法:将普通的Set转换为线程安全的Set,通过synchronized同步锁保证对集合的访问是线程安全的。
  • synchronizedMap()方法:将普通的Map转换为线程安全的Map,通过synchronized同步锁保证对集合的访问是线程安全的。

3 典型分析

2.1 CopyOnWriteArrayList

2.1.1 说明

CopyOnWriteArrayList使用写时复制技术,其内部有个volatile修饰的数组用于保存数据,在操作数据时会创建新数组,并将更新后的数据拷贝到新数组中,最后再将该数组赋值给原数组。

通过volatile关键字保证数据修改时的可见性,通过在操作数据前使用Lock互斥锁来保护数据。所以涉及到修改数据的操作,CopyOnWriteArrayList效率很低。

使用迭代器进行遍历的速度很快,并且不会与其他线程发生冲突。迭代器支持hasNext()方法和next()方法等不可变操作,但不支持add()方法和remove()方法等可变操作。

2.1.2 源码

2.1.2.1 设置元素

设置元素:

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public E set(int index, E element) {
// 使用锁来保证线程安全
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 获得array指向的引用地址
Object[] elements = getArray();
// 获取指定位置的旧元素
E oldValue = get(elements, index);
// 如果旧元素的引用和新元素的引用不同
if (oldValue != element) {
// 创建新的数组并拷贝array数组的值,替换新数组指定位置的元素
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len);
newElements[index] = element;
// 将array的引用地址指向新的数组
setArray(newElements);
} else {
// 为了确保volatile的语义,任何读操作都应该是写操作的结构,可以去掉
setArray(elements);
}
return oldValue;
} finally {
lock.unlock();
}
}
2.1.2.2 添加元素

添加元素:

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public void add(int index, E element) {
// 使用锁来保证线程安全
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 获得array指向的引用地址
Object[] elements = getArray();
int len = elements.length;
// 如果指定位置越界,则抛出异常
if (index > len || index < 0)
throw new IndexOutOfBoundsException("Index: "+index+", Size: "+len);
Object[] newElements;
// 如果插入位置是末尾
int numMoved = len - index;
if (numMoved == 0)
// 将原数组进行拷贝并扩大一个容量
newElements = Arrays.copyOf(elements, len + 1);
else {
// 如果不是插入到末尾,则创建扩大一个容量的数组
newElements = new Object[len + 1];
// 分段复制原数组,并空出指定位置
System.arraycopy(elements, 0, newElements, 0, index);
System.arraycopy(elements, index, newElements, index + 1, numMoved);
}
// 设置指定位置的指定元素
newElements[index] = element;
// 将array引用的地址指向新的数组
setArray(newElements);
} finally {
lock.unlock();
}
}
2.1.2.3 删除元素

删除元素:

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public E remove(int index) {
// 使用锁来保证线程安全
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 获得array指向的引用地址
Object[] elements = getArray();
int len = elements.length;
// 根据指定的位置获取元素
E oldValue = get(elements, index);
// 如果指定的元素是最后一个元素
int numMoved = len - index - 1;
if (numMoved == 0)
// 将原数组进行拷贝截取并将array的引用地址指向新的数组
setArray(Arrays.copyOf(elements, len - 1));
else {
// 如果不是最后一个元素,则创建减少一个容量的数组
Object[] newElements = new Object[len - 1];
// 分段复制原数组,并空出指定位置
System.arraycopy(elements, 0, newElements, 0, index);
System.arraycopy(elements, index + 1, newElements, index, numMoved);
// 将array的引用地址指向新的数组
setArray(newElements);
}
// 返回该位置上的元素
return oldValue;
} finally {
lock.unlock();
}
}

2.2 ConcurrentHashMap

2.2.1 说明

在JDK1.8之前,ConcurrentHashMap使用分段锁机制实现,其最大并发度受Segment的个数限制。

在JDK1.8之后,ConcurrentHashMap使用与HashMap类似的数组配合链表或红黑树的方式实现,加锁采用CAS自旋锁和synchronized可重入锁等机制实现。

2.2.2 属性

使用sizeCtl属性表示标志控制符,这个参数非常重要,出现在ConcurrentHashMap的各个阶段,不同取值的含义:

  • 负数:表示正在进行初始化或扩容操作:
    • -1表示正在进行初始化操作。
    • -N表示正在进行扩容操作:高16位是扩容标识,与数组长度有关,最高位固定为1,低16位减1表示扩容线程数。
  • 0:表示数组还未初始化。
  • 正数:表示下一次进行扩容的大小,类似于扩容阈值。它的值始终是当前容量的0.75倍,如果数组节点个数大于等于sizeCtl,则进行扩容。

内部类Node中的hash属性:

  • 负数:-1表示该节点为转发节点,-2表示该节点为红黑树节点。
  • 正数:表示根据key计算得到的hash值。

2.2.3 源码

2.2.3.1 构造方法

需要说明的是,在构造方法里并没有对集合进行初始化操作,而是等到了添加节点的时候才进行初始化,属于懒汉式的加载方式。

另外,loadFactor参数在JDK1.8中不再有加载因子的意义,仅为了兼容以前的版本,加载因子默认为0.75并通过移位运算计算,不支持修改。

同样,concurrencyLevel参数在JDK1.8中不再有多线程运行的并发度的意义,仅为了兼容以前的版本。

构造方法:

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// 空参构造器
public ConcurrentHashMap() {
}

// 指定初始容量的构造器
public ConcurrentHashMap(int initialCapacity) {
// 参数有效性判断
if (initialCapacity < 0)
throw new IllegalArgumentException();
// 提供多余空间,避免初始化后马上扩容,防止初始容量为0,保证最小容量为2
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
// 设置标志控制符
this.sizeCtl = cap;
}

// 指定初始容量,加载因子的构造器
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
this(initialCapacity, loadFactor, 1);
}

// 指定初始容量,加载因子,并发度的构造器
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
// 参数有效性判断
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
// 比较初始容量和并发度的大小,取最大值作为初始容量
if (initialCapacity < concurrencyLevel)
initialCapacity = concurrencyLevel;
// 计算最大容量
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
// 设置标志控制符
this.sizeCtl = cap;
}

// 包含指定Map集合的构造器
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
// 设置标志控制符
this.sizeCtl = DEFAULT_CAPACITY;
// 放置指定的集合
putAll(m);
}
2.2.3.2 初始化方法

集合并不会在构造方法里进行初始化,而是在用到集合的时候才进行初始化,在初始化的同时会设置集合的阈值sizeCtl。

在初始化的过程中为了保证线程安全,总共使用了两步操作:

  1. 通过CAS原子更新方法将sizeCtl设置为-1,保证只有一个线程执行初始化。
  2. 线程获取初始化权限后内部进行二次判断,保证只有在未初始化的情况下才能执行初始化。

初始化集合数组,使用CAS原子更新保证线程安全,使用volatile保证顺序和可见性:

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
// 死循环以完成初始化
while ((tab = table) == null || tab.length == 0) {
// 如果sizeCtl小于0则表示正在初始化,当前线程让出CPU
if ((sc = sizeCtl) < 0)
Thread.yield();
// 如果需要初始化,并且使用CAS原子更新成功
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
// 第一个线程初始化之后,第二个线程还会进来所以需要重新判断,类似于线程同步的二次判断
if ((tab = table) == null || tab.length == 0) {
// 如果没有指定容量则使用默认容量16
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
// 初始化一个指定容量的节点数组
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
// 将节点数组指向集合数组
table = tab = nt;
// 扩容阀值,获取容量的0.75倍的值,写法更高端比直接乘高效
sc = n - (n >>> 2);
}
} finally {
// 将sizeCtl的值设为阈值
sizeCtl = sc;
}
break;
}
}
return tab;
}
2.2.3.3 添加节点

添加节点的代码逻辑:

  1. 校验数据,判断传入一个key和value是否为空,如果为空就直接报错。ConcurrentHashMap是不可为空的,HashMap是可以为空的。
  2. 初始化数组,判断数组是否为空,如果为空就执行初始化方法。
  3. 插入或更新节点,如果数组插入位置的节点为空就通过CAS操作插入节点,如果数组正在扩容就执行协助扩容方法,如果产生哈希碰撞就找到节点并更新节点或者插入节点。
  4. 插入节点后,链表节点判断是否要转为红黑树节点,并且需要增加容量并判断是否需要扩容。

添加节点:

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
final V putVal(K key, V value, boolean onlyIfAbsent) {
// 排除null的数据
if (key == null || value == null) throw new NullPointerException();
// 计算hash,右移16位并同自身异或,高16位不变,低16位增加高16位数据,最后还要保证返回正数
int hash = spread(key.hashCode());
// 节点个数。0表示未加入新结点,2表示树节点或链表节点的节点数,其它值表示链表节点的节点数。主要用于加入节点后,判断是否要由链表转为红黑树
int binCount = 0;
// CAS自旋
for (Node<K,V>[] tab = table;;) {
// 声明节点、数组长度、对应的数组下标、节点的hash值
Node<K,V> f; int n, i, fh;
// 如果没有初始化则进行初始化。除非构造时指定集合,否则默认构造不初始化,添加时检查是否初始化,属于懒汉式初始化
if (tab == null || (n = tab.length) == 0)
// 初始化数组
tab = initTable();
// 如果已经初始化了,并且位置上的节点为null
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 使用CAS操作插入节点
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
// 添加成功,跳出循环
break;
}
// 如果位置上的节点不为null,并且节点的hash为-1,表示转发节点,说明数组正在扩容
else if ((fh = f.hash) == MOVED)
// 协助扩容
tab = helpTransfer(tab, f);
// 产生哈希碰撞,并且没有扩容
else {
V oldVal = null;
// 锁住节点
synchronized (f) {
// 获取节点,判断节点是否发生了变化
if (tabAt(tab, i) == f) {
// 判断是否是链表节点,大于等于0表示链表节点,-1表示转发节点,-2表示红黑树节点
if (fh >= 0) {
// 标记链表节点,表示链表节点个数
binCount = 1;
// 循环添加节点到链表,同时节点个数自增
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) {
oldVal = e.val;
// 传入参数onlyIfAbsent为false表示找到节点则进行替换
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
// 遍历到尾节点,没有找到节点,添加并作为尾节点
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key, value, null);
break;
}
}
}
// 如果是红黑树节点
else if (f instanceof TreeBin) {
Node<K,V> p;
// 标记红黑树节点
binCount = 2;
// 尝试添加,如果返回原值非空
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
oldVal = p.val;
// 替换节点
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
// 如果添加到了链表节点,需要进一步判断是否需要转为红黑树
if (binCount != 0) {
// 如果链表上的节点数大于等于8
if (binCount >= TREEIFY_THRESHOLD)
// 尝试转为红黑树
treeifyBin(tab, i);
if (oldVal != null)
// 返回原值
return oldVal;
break;
}
}
}
// 数组增加容量并判断是否要扩容
addCount(1L, binCount);
return null;
}
2.2.3.4 修改容量

修改容量时,使用的是分散热点机制,借鉴了LongAdder类的add()方法以及Striped64类的longAccumulato()方法。

修改容量后需要判断是否要执行扩容方法,支持多个线程同时扩容。

修改容量:

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
private final void addCount(long x, int check) {
// x是要增加的容量。check是插入前节点个数,0表示未加入新结点,2表示树节点或链表节点的节点数,其它值表示链表节点的节点数
CounterCell[] as; long b, s;
// 如果counterCells不为null,说明产生了并发,继续更新容量
// 如果counterCells为null,使用baseCount增加容量,如果对baseCount增加失败,说明产生了并发,继续更新容量
if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
// 如果counterCells为null,说明对baseCount增加失败,继续通过fullAddCount()方法更新容量
// 如果counterCells不为null,但是counterCells的个数小于1,说明需要初始化counterCells数组,继续通过fullAddCount()方法更新容量
// 如果counterCells不为null,并且counterCells的个数大于等于1,但是通过线程的probe值和counterCells数组长度减一相与得到的counterCell为空,说明线程对应counterCells数组上的位置没有counterCell,继续通过fullAddCount()方法更新容量
// 如果counterCells不为null,并且counterCells的个数大于等于1,并且线程对应counterCells数组上的位置有counterCell,但是用counterCell位置中的值增加容量失败,表示更新容量失败,继续通过fullAddCount()方法更新容量
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
// 继续更新容量,原理是热点分散机制,类似Striped64类的longAccumulato()方法
fullAddCount(x, uncontended);
return;
}
// 执行到这一步,说明通过counterCell位置中的值增加容量成功,如果插入前节点个数小于等于1则不考虑扩容直接返回
if (check <= 1)
return;
// 计算数组节点个数
s = sumCount();
}
// 说明通过baseCount增加容量成功,如果check的值大于等于0,需要检查是否要扩容
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
// 如果节点个数大于等于阈值,并且数组不为空,并且数组长度小于最大值,执行扩容方法。循环判断,防止多线程同时扩容跳过if判断
while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) {
// 将长度的二进制位左边连续0的个数同1左移15位进行或运算
int rs = resizeStamp(n);
// 如果sizeCtl小于0说明有其他线程正在扩容
// 这里数组已经完成了初始化,每个线程首次判断后都先进入else语句
if (sc < 0) {
// 如果sizeCtl右移16位后不等于rs,说明已经扩容完成
// 如果sizeCtl右移16位后等于rs加1,说明已经扩容完成
// 如果sizeCtl右移16位后等于rs加低16位全为1的数字,说明已经扩容完成
// 如果扩容数组是null,说明已经扩容完成
// 如果转移索引小于等于0,说明已经扩容完成,无法再分配任务
if ((sc >>> RESIZE_STAMP_SHIFT) != rs ||
sc == rs + 1 ||// 此处应为(sc >>> RESIZE_STAMP_SHIFT) == rs + 1
sc == rs + MAX_RESIZERS ||// 此处应为(sc >>> RESIZE_STAMP_SHIFT) == rs + MAX_RESIZERS
(nt = nextTable) == null ||
transferIndex <= 0)
// 跳出循环
break;
// 协助扩容,协助扩容时增加sizeCtl的值,结束扩容时减少sizeCtl的值
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
// 扩容
transfer(tab, nt);
}
// 如果sizeCtl大于或等于0,说明第一次扩容,使用CAS设置sizeCtl为rs左移16位后加2的数字,此时sizeCtl为负数
else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2))
// 进行扩容操作
transfer(tab, null);
// 计算数组节点个数
s = sumCount();
}
}
}
2.2.3.5 协助扩容

协助扩容的代码逻辑:

  1. 判断集合数组不为空,并且节点是转发节点,并且转发节点的子节点不为空,如果不成立则不需要扩容。
  2. 循环判断是否扩容成功,如果没有就进行协助扩容,并增加sizeCtl的值,扩容结束后会减少sizeCtrl的值。

协助扩容:

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
// 如果数组不为null,并且节点是转发节点,并且转发节点的子节点不为null
if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
// 将长度的二进制位左边连续0的个数同1左移15位进行或运算
int rs = resizeStamp(tab.length);
// 如果新数组没有被修改,并且原数组也没有被修改,并且sizeCtl小于0说明还在扩容
while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) {
// 如果sizeCtl右移16位后不等于rs,说明已经扩容完成
// 如果sizeCtl右移16位后等于rs加1,说明已经扩容完成
// 如果sizeCtl右移16位后等于rs加低16位全为1的数字,说明已经扩容完成
// 如果转移索引小于等于0,说明已经扩容完成,无法再分配任务
if ((sc >>> RESIZE_STAMP_SHIFT) != rs ||
sc == rs + 1 ||// 此处应为(sc >>> RESIZE_STAMP_SHIFT) == rs + 1
sc == rs + MAX_RESIZERS ||// 此处应为(sc >>> RESIZE_STAMP_SHIFT) == rs + MAX_RESIZERS
transferIndex <= 0)
// 跳出循环
break;
// 协助扩容,协助扩容时增加sizeCtl的值,结束扩容时减少sizeCtl的值
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
// 扩容
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}
2.2.3.6 扩容方法

扩容方法的核心思路是根据CPU内核数将原数组分成多个区间,每个线程都领取一个区间,对区间内的每个根节点进行扩容:

  • 计算区间长度,根据CPU核心数平均分配给每个CPU相同大小的区间,如果不够16个,默认就是16个。
  • 创建扩容后数组,有且只能由一个线程构建一个新数组。
  • 双层循环完成扩容,外层for循环处理区间节点,内层使用while循环处理扩容区间。
  • 处理区间节点时,判断如果完成扩容则返回跳出for循环,否则判断节点状态。如果是空节点和处理过的节点则设置为转发节点,否则对要扩容的节点使用高低节点进行拆分。
  • 处理扩容区间时,线程会循环获取所有待处理区间,直至完成扩容,修改while循环标志位跳出循环。
  • 当线程完成扩容后,会返回线程并减少sizeCtl的值,但最后一个线程返回时,更新数组和阈值。

扩容方法:

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
// 根据CPU个数找出扩容时的每个线程处理的区间长度,最小是16
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE;
// 表示第一次扩容,因为在addCount()方法中,第一次扩容的时候传入的nextTab的值是null
if (nextTab == null) {
try {
// 创建扩容后的节点数组
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
// 将创建的数组赋值给新数组
nextTab = nt;
} catch (Throwable ex) {
// 扩容失败,设置sizeCtl为最大值
sizeCtl = Integer.MAX_VALUE;
return;
}
// 将新数组赋值给扩容数组
nextTable = nextTab;
// 转移索引为数组长度,说明区间是逆序迁移,从高位向低位迁移
transferIndex = n;
}
// 设置扩容后的容量
int nextn = nextTab.length;
// 创建一个转发节点,表示节点已处理,转发节点的hash默认为-1
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
// 如果是false,需要已获取区间,如果是true,说明需要重新获取区间
boolean advance = true;
// 如果是true,完成扩容,如果是false,继续扩容
boolean finishing = false;
// for循环处理区间节点,i表示上界,bound表示下界
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
// while循环获取扩容区间
while (advance) {
int nextIndex, nextBound;
// 如果上界自减后大于等于下界,说明当前区间还未处理完,跳出while循环,继续处理当前区间
// 如果上界自减后小于下界,并且已完成扩容,跳出while循环,扩容结束
// 如果上界自减后小于下界,并且未完成扩容,继续获取下个区间
if (--i >= bound || finishing)
// 跳出while循环
advance = false;
// 继续获取下个区间,如果下个区间的上界小于等于0,跳出while循环,扩容结束
else if ((nextIndex = transferIndex) <= 0) {
// 设置区间上界为-1
i = -1;
// 跳出while循环
advance = false;
}
// 如果将转移索引修改为下个区间的下界成功,跳出while循环,继续处理下个区间
else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ? nextIndex - stride : 0))) {
// 设置处理区间的下界
bound = nextBound;
// 设置处理区间的上届
i = nextIndex - 1;
// 跳出while循环
advance = false;
}
}
// while循环结束,判读是否完成扩容
// 如果上界小于0,说明扩容结束
// 如果上界大于等于原容量,表示超过下标最大值,说明扩容结束
// 如果上界加上原容量大于等于新容量,表示超过下标最大值,说明扩容结束
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
// 如果完成扩容,执行操作
if (finishing) {
// 删除成员变量
nextTable = null;
// 更新集合数组
table = nextTab;
// 更新阈值
sizeCtl = (n << 1) - (n >>> 1);
return;
}
// 执行到此,说明完成扩容,将sizeCtl减1,线程会在协助扩容前将sizeCtl加1
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
// 如果不是最后一个扩容线程,当前线程返回
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
// 如果是最后一个扩容线程,扩容结束,但是会再次进入while循环检查一次
finishing = advance = true;
// 再次while循环检查一下整张表,并更新数组和阈值
i = n;
}
}
// 判断节点是否存在,如果节点是null,则设为转发节点并进入while循环
else if ((f = tabAt(tab, i)) == null)
// 设为转发节点并进入while循环,判断区间是否完成扩容
advance = casTabAt(tab, i, null, fwd);
// 判断节点是否需要扩容,如果节点是转发节点,则进入while循环
else if ((fh = f.hash) == MOVED)
// 进入while循环,判断区间是否完成扩容
advance = true;
// 对节点扩容,节点不是转发节点
else {
// 对这个节点上锁,防止扩容节点时其他线程对该节点修改
synchronized (f) {
// 二次校验上界下标处的节点
if (tabAt(tab, i) == f) {
// 声明高位节点和低位节点
Node<K,V> ln, hn;
// 判断是否是链表节点,大于等于0表示链表节点,-1表示转发节点,-2表示红黑树节点
if (fh >= 0) {
// 计算数组节点高低值,将原容量同节点的hash值进行与运算,判断将该节点放到高位还是低位
int runBit = fh & n;
// 定义尾节点
Node<K,V> lastRun = f;
// 遍历这个节点
for (Node<K,V> p = f.next; p != null; p = p.next) {
// 计算节点高低值,判断将该节点放到高位还是低位
int b = p.hash & n;
// 如果该节点的高低值和数组节点的高低值不同
if (b != runBit) {
// 更新高低值
runBit = b;
// 更新尾节点
lastRun = p;
}
}
// 如果最后更新的高低值是0,设置低位节点
if (runBit == 0) {
ln = lastRun;
hn = null;
}
// 如果最后更新的高低值是1,设置高位节点
else {
hn = lastRun;
ln = null;
}
// 再次循环,生成两个链表,尾节点作为停止条件,避免无谓的循环
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
// 如果与运算结果是0,那么创建低位节点,倒序插入
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
// 如果与运算结果是1,那么创建高位节点,倒序插入
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
// 设置低位链表,放在新数组的上界位置
setTabAt(nextTab, i, ln);
// 设置高位链表,放在新数组的上界加原长度位置
setTabAt(nextTab, i + n, hn);
// 将旧的链表设置成转发节点
setTabAt(tab, i, fwd);
// 继续处理区间的下一个节点
advance = true;
}
// 如果是红黑树节点
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
// 遍历
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>(h, e.key, e.val, null, null);
// 与运算结果为0的放在低位
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
// 与运算结果为1的放在高位
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
// 如果树的节点数小于等于6,那么转成链表,反之,创建一个新的树
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t;
// 如果树的节点数小于等于6,那么转成链表,反之,创建一个新的树
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t;
// 设置低位树,放在新数组的i位置
setTabAt(nextTab, i, ln);
// 设置高位数,放在新数组的i+n位置
setTabAt(nextTab, i + n, hn);
// 将旧的树设置成转发节点
setTabAt(tab, i, fwd);
// 继续处理区间的下一个节点
advance = true;
}
}
}
}
}
}
2.2.3.7 获取方法

根据指定的键,返回对应的键值对,由于是读操作,所以不涉及到并发问题,步骤如下:

  1. 判断key对应数组位置上的节点是否为空,为空则返回空表示没有找到,不为空则继续判断。
  2. 如果位置节点是转发节点或者红黑树节点,执行相应节点的查询方法,如果位置节点是链表节点,遍历链表节点并查询。

获取节点:

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
// 计算hash值
int h = spread(key.hashCode());
// 如果数组不为空,并且数组长度大于0,并且数组位置上的节点不为空
if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) {
// 如果hash相等,继续判断
if ((eh = e.hash) == h) {
// 如果找到了节点则返回值
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
// 如果正在扩容或者是树节点,执行各自节点的查询方法
else if (eh < 0)
// 尝试查找节点
return (p = e.find(h, key)) != null ? p.val : null;
// 如果位置节点的子节点不为空,则遍历节点查找
while ((e = e.next) != null) {
if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
2.2.3.8 删除节点

删除节点可以看成是替换操作。

删除节点:

java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
final V replaceNode(Object key, V value, Object cv) {
// 计算hash
int hash = spread(key.hashCode());
// CAS自旋
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 如果数组是空,或者数组长度是0,或者数组位置上的节点是空
if (tab == null || (n = tab.length) == 0 || (f = tabAt(tab, i = (n - 1) & hash)) == null)
// 跳出循环
break;
// 如果位置上的节点不为null,并且节点的hash为-1,表示转发节点,说明数组正在扩容
else if ((fh = f.hash) == MOVED)
// 协助扩容
tab = helpTransfer(tab, f);
// 产生哈希碰撞,并且没有扩容
else {
V oldVal = null;
// 是否进入了同步代码
boolean validated = false;
// 锁住节点
synchronized (f) {
// 保证位置节点没有被修改
if (tabAt(tab, i) == f) {
// 如果是链表节点
if (fh >= 0) {
validated = true;
// 循环查找指定节点
for (Node<K,V> e = f, pred = null;;) {
K ek;
// 找到节点
if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) {
V ev = e.val;
// 如果传入的原值为空,或者原值和位置节点的值相同,更新或者删除节点
if (cv == null || cv == ev || (ev != null && cv.equals(ev))) {
oldVal = ev;
// 如果新值不为空表示替换
if (value != null)
e.val = value;
// 新值是空表示删除,如果上一节点为空,表示删除位置节点
else if (pred != null)
pred.next = e.next;
// 新值是空表示删除,如果上一节点不为空,表示删除非位置节点
else
setTabAt(tab, i, e.next);
}
break;
}
pred = e;
// 如果没有找到
if ((e = e.next) == null)
// 跳出循环
break;
}
}
// 如果是红黑树节点
else if (f instanceof TreeBin) {
validated = true;
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> r, p;
// 找到节点
if ((r = t.root) != null && (p = r.findTreeNode(hash, key, null)) != null) {
V pv = p.val;
// 如果传入的原值为空,或者原值和位置节点的值相同,更新或者删除节点
if (cv == null || cv == pv || (pv != null && cv.equals(pv))) {
oldVal = pv;
if (value != null)
p.val = value;
else if (t.removeTreeNode(p))
setTabAt(tab, i, untreeify(t.first));
}
}
}
}
}
// 如果进入了同步代码
if (validated) {
// 如果更新或者删除了节点
if (oldVal != null) {
// 如果是删除操作
if (value == null)
// 将数组长度减一
addCount(-1L, -1);
return oldVal;
}
break;
}
}
}
return null;
}
2.2.3.9 统计容量

ConcurrentHashMap中baseCount用于保存数组中节点个数,但是并不准确,因为多线程同时增删改,会导致baseCount修改失败,此时会将节点个数存储于counterCells数组内。

当需要统计节点个数的时候,除了要统计baseCount之外,还需要加上counterCells中的每个counterCell的值。

值得一提的是即使如此,统计出来的依旧不是当前数组中节点的准确值,在多线程环境下统计前后并不能暂停线程操作,因此无法保证准确性。

统计集合容量:

java
1
2
3
4
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 : (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)n);
}

统计节点数量,即baseCount和counterCells元素个数的总和:

java
1
2
3
4
5
6
7
8
9
10
11
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}

评论