JDK1.8 ConcurrentHashMap源码分析(一)
前面几篇文章分析了HashMap和LongAdder的源码,因为是逐行分析的,我想我应该是讲明白了吧。这期主要来分析ConcurrentHashMap的源码,看完了HashMap源码的朋友应该会觉得其实也就那样,把其中位运算弄明白了之后基本没什么难的地方,也就一个resize()方法在扩容的时候略微复杂一点点,和ConcurrentHashMap比起来HashMap简直是个弟弟,阅读ConcurrentHashMap源码之前需要读完HashMap和LongAdder源码,不然理解起来会很吃力,这期依旧是逐行分析源码,一起来感受一下Doug Lea写的代码的魅力。
一、基本原理
ConcurrentHashMap在1.7的底层实现是Entry数组+链表+Segment数组,Segment其实就是分段锁,每一个Segment负责加锁一个或多个桶位,加锁方式是ReentrantLock。在1.8的底层实现是Node数组+链表/红黑树,锁机制是CAS + synchronize,synchronize锁住的是桶位的头节点。关于1.7我就不去具体分析了,重点还是1.8的源码。
如下图所示,ConcurrentHashMap在结构上和HashMap并没有太多的变化,这里重点需要关注的是Forwarding这个节点,ConcurrentHashMap中维护了table和nextTable两个变量,table就是真正存放元素的数组,nextTable是用于扩容时维护的一个新数组,因为单个线程是依次转移每个桶位的数据到新的数组上去,当转移完成后就会把该桶位的头节点设置为Forwarding节点,这样做的好处是,如果有其他线程这个时候来旧数组中查询元素,发现需要查询的桶位上面是ForwardingNode,则可以直接去新数组中查询而不用等待扩容完成。
ConcurrentHashMap还有一个机制是并发扩容,当数组正在扩容时,如果有其他的线程执行put操作,会一起参与进扩容中来,这样可以让ConcurrentHashMap的性能进一步提升,具体的操作留待后续中讲解。
二、静态常量
同样的还是从类中的静态常量开始,因为这些常量都是会被后面方法反复用到的,我们需要先提前知道每个常量的含义。
/**
* Map的最大容量2^30(一般不可能达到)
*/
private static final int MAXIMUM_CAPACITY = 1 << 30;
/**
* 默认初始容量16
*/
private static final int DEFAULT_CAPACITY = 16;
/**
* table数组的最大长度
*/
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
/**
* 默认并发级别16(可同时参与扩容的线程数量)
*/
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
/**
* 负载因子0.75
*/
private static final float LOAD_FACTOR = 0.75f;
/**
* 链表转为红黑树的阈值(链表长度)
*/
static final int TREEIFY_THRESHOLD = 8;
/**
* 红黑树退化成链表的阈值(红黑树节点个数)
*/
static final int UNTREEIFY_THRESHOLD = 6;
/**
* 发生树化的最小数组长度64
*/
static final int MIN_TREEIFY_CAPACITY = 64;
/**
* 扩容时每个线程负责的最小桶位数量为16
*/
private static final int MIN_TRANSFER_STRIDE = 16;
/**
* 扩容标识戳
*/
private static int RESIZE_STAMP_BITS = 16;
/**
* 计算的结果是65535,表示最大参与扩容的线程数量
*/
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
/**
* 这个留着后面来看
*/
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
// 元素的hash值为-1时,表示当前节点是Forwarding节点
static final int MOVED = -1;
// 元素的hash值为-2时,表示当前节点是Treebin节点,这个节点是用于代理操作红黑树的
static final int TREEBIN = -2;
// 这个暂时不管
static final int RESERVED = -3;
// 0x7fffffff -> 0111 1111 1111 1111 1111 1111 1111 1111
// 是用于按位与运算来把一个负数转为正数
static final int HASH_BITS = 0x7fffffff;
// CPU核心数
static final int NCPU = Runtime.getRuntime().availableProcessors();
三、内部基本方法
/**
* 扰动函数,用于计算hashCode和hashCode高16的异或值,
* 计算出的值和HASH_BITS进行按位与,这样可以保证得到一定是正数
*/
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}
/**
* 和HashMap一样的功能,得到比c大的最小2的整数次幂
*/
private static final int tableSizeFor(int c) {
int n = c - 1;
n |= n >>> 1;
n |= n >>> 2;
n |= n >>> 4;
n |= n >>> 8;
n |= n >>> 16;
return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}
/**
* 获得table数组i下标元素在主存中最新的值
*/
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
/**
* cas方式修改table数组i下标元素的值
*/
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
/**
* 修改table数组i下标元素并且立即同步至主存
*/
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}
四、成员变量
/**
* 当前存储元素的数组
*/
transient volatile Node<K,V>[] table;
/**
* 扩容时的新数组,扩容结束后会赋值给table
*/
private transient volatile Node<K,V>[] nextTable;
/**
* 对ConcurrentHashMap的操作次数
*/
private transient volatile long baseCount;
/**
* sizeCtl = -1: 表示table正在初始化或扩容, sizeCtl = -1时表示正在初始化
* sizeCtl < -1: 高16位表示扩容戳,低16位 = -(1 + n),n为参与扩容的线程数量
* sizeCtl = 0: 表示table未初始化并且未传入初始化大小
* sizeCtl > 0: 下一次的扩容阈值或table未初始化但制定了初始化大小
*/
private transient volatile int sizeCtl;
/**
* 扩容时下一次需要迁移的桶位下标
*/
private transient volatile int transferIndex;
/**
* 锁变量,用于对CounterCell[]初始化或扩容时加锁
*/
private transient volatile int cellsBusy;
/**
* 和LongAdder中的cell[]是一个东西
*/
private transient volatile CounterCell[] counterCells;
五、构造方法
五个构造方法,其中细节会略微有差异,简单阅读一下即可。
/**
* 创建一个map,table数组默认大小16
*/
public ConcurrentHashMap() {
}
/**
* 指定初始化大小为initialCapacity
*/
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
// 判断initialCapacity值是否超出最大限制
// 使用tableSizeFor()方法时入参和HashMap有区别
// HashMap的入参就是initialCapacity
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}
/**
* 使用ConcurrentHashMap容纳另一个map的所有元素
*/
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
// 先尝试以16来容纳
this.sizeCtl = DEFAULT_CAPACITY;
putAll(m);
}
/**
* 指定初始化大小、负载因子
*/
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
this(initialCapacity, loadFactor, 1);
}
/**
* 指定初始化大小、负载因子、并发级别
*/
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
// 初始化大小必须大于等于并发级别
if (initialCapacity < concurrencyLevel)
initialCapacity = concurrencyLevel;
// 根据负载因子重新计算初始大小
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
// 使用tableSizeFor()得到2的整数次幂
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
// 将最后计算得出的数组初始化大小赋值给sizeCtl
this.sizeCtl = cap;
}
以上是ConcurrentHashMap中需要提前了解的常量、变量、方法,下一期会分析重点方法put()。