为什么要用ConcurrentHashMap

HashMap线程不安全,而Hashtable是线程安全,但是它使用了synchronized进行方法同步,插入、读取数据都使用了synchronized,当插入数据的时候不能进行读取(相当于把整个Hashtable都锁住了,全表锁),当多线程并发的情况下,都要竞争同一把锁,导致效率极其低下。而在JDK1.5后为了改进Hashtable的痛点,ConcurrentHashMap应运而生。

ConcurrentHashMap为什么高效?

JDK1.5中的实现

ConcurrentHashMap使用的是分段锁技术,将ConcurrentHashMap将锁一段一段的存储,然后给每一段数据配一把锁(segment),当一个线程占用一把锁(segment)访问其中一段数据的时候,其他段的数据也能被其它的线程访问,默认分配16个segment。默认比Hashtable效率提高16倍。

JDK1.8中的实现

ConcurrentHashMap取消了segment分段锁,而采用CAS和synchronized来保证并发安全。数据结构跟HashMap1.8的结构一样,
数组+链表/红黑二叉树。
synchronized只锁定当前链表或红黑二叉树的首节点,这样只要hash不冲突,就不会产生并发,效率又提升N倍。

JDK1.8的ConcurrentHashMap的结构图如下:

JDK7的ConcurrentHashMap扩容


HashMap是线程不安全的,我们来看下线程安全的ConcurrentHashMap,在JDK7的时候,这种安全策略采用的是分段锁的机制,ConcurrentHashMap维护了一个Segment数组,Segment这个类继承了重入锁ReentrantLock,并且该类里面维护了一个
HashEntry<K,V>[]
table数组,在写操作put,remove,扩容的时候,会对Segment加锁,所以仅仅影响这个Segment,不同的Segment还是可以并发的,所以解决了线程的安全问题,同时又采用了分段锁也提升了并发的效率。

扩容的源码:
// 方法参数上的 node 是这次扩容后,需要添加到新的数组中的数据。 private void rehash(HashEntry<K,V> node)
{ HashEntry<K,V>[] oldTable = table; int oldCapacity = oldTable.length; // 2 倍
int newCapacity = oldCapacity << 1; threshold = (int)(newCapacity *
loadFactor); // 创建新数组 HashEntry<K,V>[] newTable = (HashEntry<K,V>[]) new
HashEntry[newCapacity]; // 新的掩码,如从 16 扩容到 32,那么 sizeMask 为 31,对应二进制
‘000...00011111’ int sizeMask = newCapacity - 1; // 遍历原数组,老套路,将原数组位置 i 处的链表拆分到
新数组位置 i 和 i+oldCap 两个位置 for (int i = 0; i < oldCapacity ; i++) { // e 是链表的第一个元素
HashEntry<K,V> e = oldTable[i]; if (e != null) { HashEntry<K,V> next = e.next;
// 计算应该放置在新数组中的位置, // 假设原数组长度为 16,e 在 oldTable[3] 处,那么 idx 只可能是 3 或者是 3 + 16 =
19 int idx = e.hash & sizeMask; if (next == null) // 该位置处只有一个元素,那比较好办
newTable[idx] = e; else { // Reuse consecutive sequence at same slot // e 是链表表头
HashEntry<K,V> lastRun = e; // idx 是当前链表的头结点 e 的新位置 int lastIdx = idx; // 下面这个
for 循环会找到一个 lastRun 节点,这个节点之后的所有元素是将要放到一起的 for (HashEntry<K,V> last = next;
last != null; last = last.next) { int k = last.hash & sizeMask; if (k !=
lastIdx) { lastIdx = k; lastRun = last; } } // 将 lastRun 及其之后的所有节点组成的这个链表放到
lastIdx 这个位置 newTable[lastIdx] = lastRun; // 下面的操作是处理 lastRun 之前的节点, //
这些节点可能分配在另一个链表中,也可能分配到上面的那个链表中 for (HashEntry<K,V> p = e; p != lastRun; p =
p.next) { V v = p.value; int h = p.hash; int k = h & sizeMask; HashEntry<K,V> n
= newTable[k]; newTable[k] = new HashEntry<K,V>(h, p.key, v, n); } } } } //
将新来的 node 放到新数组中刚刚的 两个链表之一 的 头部 int nodeIndex = node.hash & sizeMask; // add
the new node node.setNext(newTable[nodeIndex]); newTable[nodeIndex] = node;
table = newTable; }
JDK8的ConcurrentHashMap扩容


在JDK8中彻底抛弃了JDK7的分段锁的机制,新的版本主要使用了Unsafe类的CAS自旋赋值+synchronized同步+LockSupport阻塞等手段实现的高效并发,代码可读性稍差。

ConcurrentHashMap的JDK8与JDK7版本的并发实现相比,最大的区别在于JDK8的锁粒度更细,理想情况下talbe数组元素的大小就是其支持并发的最大个数,在JDK7里面最大并发个数就是Segment的个数,默认值是16,可以通过构造函数改变一经创建不可更改,这个值就是并发的粒度,每一个segment下面管理一个table数组,加锁的时候其实锁住的是整个segment,这样设计的好处在于数组的扩容是不会影响其他的segment的,简化了并发设计,不足之处在于并发的粒度稍粗,所以在JDK8里面,去掉了分段锁,将锁的级别控制在了更细粒度的table元素级别,也就是说只需要锁住这个链表的head节点,并不会影响其他的table元素的读写,好处在于并发的粒度更细,影响更小,从而并发效率更好,但不足之处在于并发扩容的时候,由于操作的table都是同一个,不像JDK7中分段控制,所以这里需要等扩容完之后,所有的读写操作才能进行,所以扩容的效率就成为了整个并发的一个瓶颈点,好在Doug
lea大神对扩容做了优化,本来在一个线程扩容的时候,如果影响了其他线程的数据,那么其他的线程的读写操作都应该阻塞,但Doug
lea说你们闲着也是闲着,不如来一起参与扩容任务,这样人多力量大,办完事你们该干啥干啥,别浪费时间,于是在JDK8的源码里面就引入了一个ForwardingNode类,在一个线程发起扩容的时候,就会改变sizeCtl这个值,其含义如下:
sizeCtl :默认为0,用来控制table的初始化和扩容操作,具体应用在后续会体现出来。 -1 代表table正在初始化 -N
表示有N-1个线程正在进行扩容操作 其余情况: 1、如果table未初始化,表示table需要初始化的大小。
2、如果table初始化完成,表示table的容量,默认是table大小的0.75倍

扩容时候会判断这个值,如果超过阈值就要扩容,首先根据运算得到需要遍历的次数i,然后利用tabAt方法获得i位置的元素f,初始化一个forwardNode实例fwd,如果f
== null,则在table中的i位置放入fwd,否则采用头插法的方式把当前旧table数组的指定任务范围的数据给迁移到新的数组中,然后
给旧table原位置赋值fwd。直到遍历过所有的节点以后就完成了复制工作,把table指向nextTable,并更新sizeCtl为新数组大小的0.75倍
,扩容完成。在此期间如果其他线程的有读写操作都会判断head节点是否为forwardNode节点,如果是就帮助扩容。
 
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { int n =
tab.length, stride; if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) <
MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range if
(nextTab == null) { // initiating try { @SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; nextTab = nt; } catch
(Throwable ex) { // try to cope with OOME sizeCtl = Integer.MAX_VALUE; return;
} nextTable = nextTab; transferIndex = n; } int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); boolean advance =
true; boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) { Node<K,V> f; int fh; while (advance) { int
nextIndex, nextBound; if (--i >= bound || finishing) advance = false; else if
((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } else if
(U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex >
stride ? nextIndex - stride : 0))) { bound = nextBound; i = nextIndex - 1;
advance = false; } } if (i < 0 || i >= n || i + n >= nextn) { int sc; if
(finishing) { nextTable = null; table = nextTab; sizeCtl = (n << 1) - (n >>>
1); return; } if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; finishing =
advance = true; i = n; // recheck before commit } } else if ((f = tabAt(tab,
i)) == null) advance = casTabAt(tab, i, null, fwd); else if ((fh = f.hash) ==
MOVED) advance = true; // already processed else { synchronized (f) { if
(tabAt(tab, i) == f) { Node<K,V> ln, hn; if (fh >= 0) { int runBit = fh & n;
Node<K,V> lastRun = f; for (Node<K,V> p = f.next; p != null; p = p.next) { int
b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } if (runBit ==
0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } for
(Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V
pv = p.val; if ((ph & n) == 0) ln = new Node<K,V>(ph, pk, pv, ln); else hn =
new Node<K,V>(ph, pk, pv, hn); } setTabAt(nextTab, i, ln); setTabAt(nextTab, i
+ n, hn); setTabAt(tab, i, fwd); advance = true; } else if (f instanceof
TreeBin) { TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> lo = null, loTail =
null; TreeNode<K,V> hi = null, hiTail = null; int lc = 0, hc = 0; for
(Node<K,V> e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode<K,V>
p = new TreeNode<K,V> (h, e.key, e.val, null, null); if ((h & n) == 0) { if
((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; }
else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p;
++hc; } } ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new
TreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc !=
0) ? new TreeBin<K,V>(hi) : t; setTabAt(nextTab, i, ln); setTabAt(nextTab, i +
n, hn); setTabAt(tab, i, fwd); advance = true; } } } } } }
 

技术
©2019-2020 Toolsou All rights reserved,
STM32的内部温度传感器实验总结。JLink、STLink、DAPLink、CMSIS DAP使用区别Linux 常用的命令vue-loader+webpack项目配置《剑指offer刷题笔记》6、重建二叉树 【c++详细题解】pycharm中安装cv2失败,及其解决数据库基础-MySql8.0(第二篇)--DML和DQLpython模拟阴阳师抽卡CSS实现loading小动画二维哈希(矩阵哈希)