在JDK7中,ConcurrentHashMap是通过分段锁机制来实现的,所谓分段(Segment)是指将一个ConcurrentHashMap拆分成多个Segment,通过保证每个Segment内是线程安全的,从而达到整个ConcurrentHashMap的线程安全。通过将锁的粒度从整个ConcurrentHashMap降为每个Segment以提升并发能力。

JDK8后,ConcurrentHashMap不再使用Segment这种结构,而是与HashMap一致,但是依旧保留分段锁的思想。原来锁的是每个Segment,现在是锁某个桶,也就是桶的第一个节点。

源码分析

构造方法

1
2
3
4
5
6
7
8
9
10
11
12
// 什么都没做
public ConcurrentHashMap() {
}

public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}

默认的构造方法什么都没有做,各个成员变量都是用默认值。
如果指定了初始容量,通过初始容量,计算了sizeCtl,sizeCtl = 【 (1.5 * initialCapacity + 1),然后向上取最近的 2 的 n 次方】。如 initialCapacity 为 10,那么得到 sizeCtl 为 16,如果 initialCapacity 为 11,得到 sizeCtl 为 32。具体这个sizeCtl是干什么的,由源码可知,它是一个控制标识符,表示哈希表的状态,分负值和非负值两种。

1
2
3
4
5
6
7
8
9
/**
* Table initialization and resizing control. When negative, the
* table is being initialized or resized: -1 for initialization,
* else -(1 + the number of active resizing threads). Otherwise,
* when table is null, holds the initial table size to use upon
* creation, or 0 for default. After initialization, holds the
* next element count value upon which to resize the table.
*/
private transient volatile int sizeCtl;

现在先不管具体的取值,只需要知道,如果调用默认构造方法,sizeCtl是0,如果指定了初始容量,sizeCtl是一个与初始容量有关的2的指数幂。

put方法

由构造方法可以看出,new 完ConcurrentHashMap对象以后,没有进行真正的初始化,真正的初始化逻辑是在put逻辑中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public V put(K key, V value) {
return putVal(key, value, false);
}

final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
// 自旋
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
...
}
}
addCount(1L, binCount);
return null;
}

先看第七行获取hash值的spread方法,它与HashMap中基本一致,将key的hashcode的高16位和低16位进行异或运算,让低16位融合了hashcode高16位和低16位的特征,不一样的地方是又与HASH_BITS进行了与运算,这一步的作用是将最高位设置为0,也就是说强制这个hash值是正数,至于为什么,下面再说。

1
2
3
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}

initTable方法

真正执行初始化操作的是第13行的initTable()方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* Initializes table, using the size recorded in sizeCtl.
*/
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
// 自旋
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}

根据代码得知,如果当前线程发现sizeCtl小于0,说明有其他线程正在执行初始化或者扩容,那么直接让出时间片,直到tab被初始化完成后返回。如果不小于0,那么通过CAS将sc置为-1,执行初始化操作。sizeCtl为非负数时,分为0和正数的情况,0代表使用的默认构造方法,那么直接使用DEFAULT_CAPACITY作为哈希表的容量,否则使用sc(也就是线程cas成功之前的sizeCtl值)作为哈希表的容量,然后将sizeCtl设置为0.75倍哈希表的容量(sc = n - (n >>> 2))。
由此可以知道,并发场景下,只有1个线程可以成功执行初始化流程,其他线程都会自旋等待直至初始化流程完成。

根据Node类型执行put逻辑

初始化完成后,执行真正的put逻辑,与HashMap一样会根据哈希表索引位置上的Node类型进行不同的put逻辑。不同的时获取Node的方式:f = tabAt(tab, i = (n - 1) & hash)ConcurrentHashMap没有直接使用tab[i]获取i索引位置上的Node,而是通过tabAt方法,原因在于,哈希表本身(table)标注了volatile关键字保证了可见性和有序性,但是哈希表上的元素并没有,通过tabAt方法可以达到与volatile相似的语义。

Node为null

直接通过cas的方式将元素放置到当前索引位置上。

Node不为null并且hash值为-1

hash值为-1,代表此位置上的Node是ForwardingNode。这里也就说明了前面spread方法为什么要将hash值强制设为正数了,因为负数的hash值是有特殊意义的。
ForwardingNode是一种临时节点,在扩容进行中才会出现,并且它不存储实际的数据。当put方法执行到此时,会调用helpTransfer方法,从名字可以看出来,是帮助扩容的逻辑,具体逻辑等到扩容一起看。

普通put逻辑

如果不是上面两个场景,代码就会执行到前面写了省略号的地方,具体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
V oldVal = null;
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}

首先,通过synchronized锁住了哈希表上的这个节点,它可能是一个链表头(next可能为null,也可能不为null)或者是红黑树的根节点。锁住了它,也就相当于锁住了整个桶(bin)。
如果当前哈希表上的Node的hash值是正的,也就是普通Node,那么很简单,待put的元素要么替换要么追加到当前桶的链表上。
如果当前哈希表上的Node是TreeBin也就是当前桶是一个红黑树,执行putTreeVal的逻辑。

addCount方法

put完元素后,还需要做的一步操作是记录当前ConcurrentHashMap有多少个元素。正常思路,我们可以通过cas或者AtomicLong(本质也是cas)保证计数的线程安全,基于cas的操作在高竞争(high contention)的情况下,吞吐量并不好。这里ConcurrentHashMap采用了类似java.util.concurrent.atomic.LongAdder的方法,基于锁分段的思想进行累加的操作。LongAdder可以参看这篇文章

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/**
* Table of counter cells. When non-null, size is a power of 2.
*/
private transient volatile CounterCell[] counterCells;


private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
// 如果counterCells为null,说明没有并发add,尝试通过cas增加baseCount
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
// 初始化CounterCell
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
// 检查是否需要帮助扩容,具体逻辑参考下面
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}

简单总结

  1. 当插入结束的时候,会对 size 进行加一。也会进行是否需要扩容的判断。
  2. 优先使用计数盒子(如果不是空,说明并发了),如果计数盒子是空,使用 baseCount 变量。对其加 X。
  3. 如果修改 baseCount 失败,使用计数盒子。如果此次修改失败,在fullAddCount方法死循环插入。
  4. 检查是否需要扩容。如果需要走下面的扩容逻辑。

fullAddCount的主要逻辑是:

  1. 第一次创建时,创建长度为2的CouterCell数组as。获取当前线程的随机数与1做与运算,得到0或者1。将as[0]或者as[1]进行赋值。
  2. 如果已经存在CouterCell数组as,再判断当前线程与数组长度取余后映射到的as[x]是否为null,如果是null,创建CouterCell对象并赋值;否则cas修改CouterCell对象的value值,如果依然没有成功,则对CouterCell数组进行扩容(长度*2)。重复此步骤直至成功。
  3. 在对CouterCell数组扩容或者创建CounterCell对象时,都需要抢占cellsBusy作为自旋锁。

扩容

HashMap不同的是,ConcurrentHashMap的扩容不是通过loadFactor出发的,主要是发生put逻辑的treeifyBinhelpTransferaddCount这几个方法中。

tryPresize

tryPresize主要是在treeifyBin中调用,当桶中元素个数超过8,但是哈希表长度小于64(也就是16、32)的时候tryPresize会被调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// size 传进来的时候是当前哈希表长度的2倍
private final void tryPresize(int size) {
// size 的 1.5 倍,再加 1,再往上取最近的 2 的 n 次方。
int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
tableSizeFor(size + (size >>> 1) + 1);
int sc;
while ((sc = sizeCtl) >= 0) {
Node<K,V>[] tab = table; int n;
// 兼容putAll方法
if (tab == null || (n = tab.length) == 0) {
n = (sc > c) ? sc : c;
if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if (table == tab) {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
}
}
// 已经达到最大,不扩容
else if (c <= sc || n >= MAXIMUM_CAPACITY)
break;
else if (tab == table) {
int rs = resizeStamp(n);
if (sc < 0) {
Node<K,V>[] nt;
// 不是同一轮扩容
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
// 帮助扩容
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
// 第一个进行扩容的线程进这里
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
}
}
}

static final int resizeStamp(int n) {
return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}

先看49行resizeStamp方法,这是ConcurrentHashMap里一个很重要的方法!
入参代表哈希表的长度,我们假设其是16,numberOfLeadingZeros方法得到的是16的二进制(10000)前面有多少个0,32-5=27。1 << (RESIZE_STAMP_BITS - 1)进行常量替换就是1 << 15,也就是1*215。因此rs = resizeStamp(16)的二进制是0000000000000000 1000000000011011resizeStamp方法返回了一个只跟当前哈希表长度有关的数(rs),rs的低RESIZE_STAMP_BITS(默认是16,而且好像没有修改的地方)位的最高位一定是以1开头,rs是一个包含了哈希表长度信息的数字,因为哈希表长度一定是2的指数幂,比如16的二进制是10000,32的二进制就是100000,所以numberOfLeadingZeros(n)的结果其实是可以区分不同的哈希表长度的

紧接着,如果sc>=0,也就是还没有其他线程进行扩容,那么当前线程会进入第42行的分支,将sizeCtl置为(rs << RESIZE_STAMP_SHIFT) + 2RESIZE_STAMP_SHIFT的值与刚刚计算rs的RESIZE_STAMP_BITS有关:

1
2
3
private static int RESIZE_STAMP_BITS = 16;

private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;

可以认为RESIZE_STAMP_SHIFT的值就是rs高位那些填充的0的个数(上例就是16)。因此rs << RESIZE_STAMP_SHIFT就变成了1000000000011011 0000000000000000,可以看到,最高位是1,代表这是个负数而且很大,sizeCtl为负数并且不等于-1(加不加那个2都不影响它是一个很大很大的负数),代表哈希表正在进行扩容,是不是豁然开朗~
到这里,我们先结合sizeCtl的部分注释,总结下sizeCtl为负数的可能取值及其含义:

When negative, the table is being initialized or resized: -1 for initialization, else -(1 + the number of active resizing threads).

  • -1代表哈希表正在进行初始化
  • 非-1的负数,这个地方是网上很多文章写的有问题,而且非常误导的地方,其实也怪这个注解写的有问题。通过刚才的代码我们可以看到,除了-1,sizeCtl会被cas置为很大很大的负数(rs << RESIZE_STAMP_SHIFT) + 2,不可能是类似-2,-3这类很小的负数,那么-(1 + the number of active resizing threads)代表什么意思呢?其实它是指(rs << RESIZE_STAMP_SHIFT) + 2中后面加的这个2。如果我们把rs << RESIZE_STAMP_SHIFT作为一个基数,这个基数再加上xx就是(1 + the number of active resizing threads)。那么活跃的扩容线程数(the number of active resizing threads)在这就就是2 - 1 = 1了。如果RESIZE_STAMP_BITS是16的话,那就相当于sizeCtl的高16位代表的是根据当前正在扩容的哈希表扩容前的长度生成的一个stamp,而低16位表示的就是(1 + the number of active resizing threads)

接着刚才tryPresize方法说,在第42行的else if中,也就是第一个开始扩容的线程将sizeCtlcas置为(rs << RESIZE_STAMP_SHIFT) + 2就相当于表明当前有1个活跃的扩容线程开始进入transfer方法工作啦!

再看回30~40行的分支,也就是:

1
2
3
4
5
6
7
8
9
if (sc < 0) {
Node<K,V>[] nt;
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}

进入此分支,首先需要sizeCtl为负数,代表有其他线程在初始化或扩容。这个分支就会有两个结果:跳出自旋、执行transfer(tab, nt);。这两个if判断会出现在多个地方,除了tryPresize还有addCounthelpTransfer方法中,也就是所有会发生扩容的地方。
先看第二个if判断,它很好理解,就是将参与扩容的活跃线程数+1,然后执行transfer逻辑就行了,与第一个进行本轮扩容的线程的区别在于第二个入参是否为null。
我们主要来看第一个if判断:

  • (sc >>> RESIZE_STAMP_SHIFT) != rs ,结合上面写的就很好理解,就是判断当前线程要参与扩容轮次是不是当前正在进行的一样。因为有可能当前线程在计算完rs后,让出了时间片,那一轮的扩容已经完成了并且已经开始了下一轮,当再获得时间片的时候,当前线程是不应该再继续参与扩容的。
  • sc == rs + 1 || sc == rs + MAX_RESIZERS,之所以把这两个条件放一起,是因为这两个条件应该是存在BUG的,因为sc一定是个负数,而rs一定是一个正数,所以这两个条件不可能成立。正确的写法应该是sc == rs <<< RESIZE_STAMP_SHIFT + 1 || sc == (rs <<< RESIZE_STAMP_SHIFT) + MAX_RESIZERS。那么这两个条件想要表达什么意思呢?
    1. sc == rs <<< RESIZE_STAMP_SHIFT + 1表达的是当前活跃的扩容线程数是1 - 1 = 0,没有活跃的扩容线程,没有活跃的扩容线程是指当前扩容操作都已经完成了,之所以sizeCtl还没有被置为正数是因为还有一个参与了扩容操作的线程(不一定真正执行了数据迁移的工作,也许只是调用了transfer但是没有领到任务)正在执行最后的检查操作,具体的看下面的transfer逻辑。
    2. sc == (rs <<< RESIZE_STAMP_SHIFT) + MAX_RESIZERS判断的是参与当前这一轮的扩容线程是否已经达到了最大的线程数限制MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1,注意,MAX_RESIZERS最后之所以要减1也是因为1 + the number of active resizing threads
  • (nt = nextTable) == null这个判断是获取第一个扩容线程创建好的新哈希表,如果获取不到自然也就终止参与扩容。需要注意的是ConcurrentHashMap源码中大量的使用了这种变量赋值方法,让人头大……
  • transferIndex <= 0 如果当前扩容所有的任务都被领完了,当前线程自然也不需要再参与扩容。最后的这两个判断逻辑都会在transfer的逻辑中仔细说。

transfer

transfer方法主要做两个事情:扩大哈希表长度、数据迁移。这个方法会被很多地方调用,比如刚刚的tryPresizehelpTransfer以及addCount。此方法支持多线程执行,外围调用此方法的时候,会保证第一个发起数据迁移的线程,nextTab 参数为 null,之后再调用此方法的时候,nextTab 不会为 null。
ConcurrentHashMap是支持多个线程并发扩容的,它的思路是这样的,将哈希表分成n段,交给n个线程去并发执行扩容操作。因为扩容后,桶中的元素不是在原索引位置,就是在原索引+原容量的位置上,因此并发扩容不会产生冲突。每段包含几个桶就是步长(stride),同时需要一个全局的指针(transferIndex)来表明哪些桶已经被线程开始处理了。
第一个发起数据迁移的线程会将 transferIndex 指向原数组最后的位置,然后从后往前的 stride 个任务属于第一个线程,然后将 transferIndex 指向新的位置,再往前的 stride 个任务属于第二个线程,依此类推。当然,这里说的第二个线程不是真的一定指代了第二个线程,也可以是同一个线程。之所以从后往前,是为了防止与迭代发生冲突,迭代操作的下标是从小往大,顺序相反时,二者相遇过后,迭代没处理的都是已经transfer的hash桶,transfer没处理的,都是已经迭代的hash桶,冲突会变少。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
// stride 在单核下直接等于 n,多核模式下为 (n>>>3)/NCPU,最小值是 16
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
// 如果是nextTab == null,直接将nextTable初始化为当前容量的2倍
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}

...
}

到此为止,就完成了transfer方法的第一个工作:扩大哈希表长度。数据迁移大体的思路是线程通过cas修改transferIndex领取自己的待扩容桶区间也就是一个扩容任务,然后倒序遍历区间内每个桶完成迁移工作。由于逻辑由于分支比较多,就直接看代码的注释吧。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
...

int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
// 我觉得没有特别含义,就是用来控制流程
boolean advance = true;
// 迁移工作完成状态,如果是 true,代表本轮扩容的迁移工作结束,只有本轮扩容的最后一个线程会修改此状态。
// 注意这个单词是finishing而不是finished,很有深意哦~ 表明这个状态为true时,不代表扩容真正完成,只是数据迁移的工作完成,依然有一个线程在检查所有的桶是否都迁移完毕,此时sizeCtl对应的值应该是 rs <<< RESIZE_STAMP_SHIFT + 1
boolean finishing = false; // to ensure sweep before committing nextTab
// 死循环,i 表示下标,bound 表示当前线程可以处理的当前桶区间最小下标,每循环一次代表线程处理完了一个桶
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
// 这个循环就是控制 i 递减。同时,每个线程都会进入这里取得自己需要处理的桶的区间
while (advance) {
int nextIndex, nextBound;
// 第一次进入循环,-1 >=0 这个判断会无法通过,并且如果扩容没有finishing。从而走下面的 nextIndex 赋值操作(获取最新的transferIndex标),然后获取自己的桶区间。
// 之后--i >= bound为true的情况是指当前线程正在处理当前任务下的桶
if (--i >= bound || finishing)
advance = false;
// 这个判断如果为true表明本轮扩容任务已经都被领取没了。
// 这个判断在第一次执行while循环时其实在调用transfer的外层已经判断过一次了,再次判断是为了防止当前线程进入到transfer方法后,在执行到此行代码期间,本轮扩容任务已经都被领取没了。
// 之后再执行到此表明上面分支都没生效,也就是当前线程处理完了自己的桶区间,然后发现已经没有任务可以领取了。
else if ((nextIndex = transferIndex) <= 0) {
// 这个 -1 会在下面的 if 块里判断,从而使线程return
i = -1;
advance = false;
}
// 线程第一次执行此while循环或者处理完了一个任务内的最后一个桶后,会来再次尝试申请一个任务
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
// 申请到任务后标记自己待处理的桶区间
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}

// i < 0 这个判断生效的情况我认为只有i=-1,也就是没有任务可以领了时候才会生效。而不是网上有些文章写的:领取了最后一段区间[0,16)的线程扩容结束
// i >= n || i + n >= nextn 应该是dead code 详见最后一个参考链接
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {// 如果完成了扩容
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
// 表示这个线程结束帮助扩容了
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
// 判断下自己是不是本轮扩容中的最后一个线程,如果不是,则直接退出
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
// 如果自己是本轮扩容中的最后一个线程,那么要准备执行收尾工作了
finishing = advance = true;
// 最后一个扩容的线程要重新检查一次旧数组的所有hash桶,看是否是都被正确迁移到新数组了。
// 正常情况下,重新检查时,旧数组所有hash桶都应该是转发节点,此时这个重新检查的工作很快就会执行完。
// 特殊情况,比如扩容重叠,那么会有线程申请到了transfer任务,但是参数错误(旧数组和新数组对不上,不是2倍长度的关系),
// 此时这个线程领取的任务会作废,那么最后检查时,还要处理因为作废二没有被迁移的hash桶,把它们正确迁移到新数组中
i = n; // recheck before commit
}
}
// 如果位置 i 处是空的,没有任何节点,那么放入刚刚初始化的 ForwardingNode ”空节点“
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
// 出现扩容重叠,有transfer任务被作废的情况下,会执行其他分支,处理因为作废而没有被迁移的hash桶
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
// 这里就不展开了,跟HashMap,处理rehash流程基本没差
else {...}
}
}

参考