前言
JUC 高并发容器是基于非阻塞算法(或者无锁编程算法)实现的容器类,无锁编程(Lock Free)算法主要通过 CAS(Compare And Swap)+volatile 组合实现,通过 CAS 保障操作的原子性,通过volatile 保障变量的内存的可见性。无锁编程(Lock Free)算法的主要优点:
(1)开销较?。翰恍枰谀诤颂陀没淝谢唤?。
(2)读写不互斥:只有写操作需要使用基于 CAS 机制的乐观锁,读读操作之间可以不用互斥。
1.高并发容器分类
JUC 包中提供了 List、Set、Queue、Map 各种类型的高并发容器,如 ConcurrentHashMap、ConcurrentSkipListMap、ConcurrentSkipListSet、CopyOnWriteArrayList 和 CopyOnWriteArraySet。
在性能上,ConcurrentHashMap 通常优于同步的 HashMap,ConcurrentSkipListMap 通常优于同步
的 TreeMap。当读取和遍历操作远远大于列表的更新操作时,CopyOnWriteArrayList 优于同步的ArrayList。
1.1 List
JUC 包中高并发 List 主要有 CopyOnWriteArrayList,对应的基础容器为 ArrayList。
CopyOnWriteArrayList 相当于线程安全的 ArrayList,它实现了 List 接口。在读多写少的场景中,其性能远远高于 ArrayList 的同步包装容器。
1.2 Set
JUC 包中 Set 主要有 CopyOnWriteArraySet、ConcurrentSkipListSet。
- CopyOnWriteArraySet 继承于 AbstractSet 类,对应的基础容器为 HashSet。其内部组合了一个 CopyOnWriteArrayList 对象,它是核心操作是基于 CopyOnWriteArrayList 实现的。
- ConcurrentSkipListSet 是线程安全的有序的集合,对应的基础容器为 TreeSet。它继承于
AbstractSet,并实现了NavigableSet接口。ConcurrentSkipListSet是通过ConcurrentSkipListMap
实现的。
1.3 Map
JUC 包中 Map 主要有 ConcurrentHashMap、ConcurrentSkipListMap。
- ConcurrentHashMap 对应的基础容器为 HashMap。JDK7 中 ConcurrentHashMap 采用一种更加细粒度的“分段锁(Segment)”加锁机制,JDK8 中采用 CAS 无锁算法。
- ConcurrentSkipListMap 对应的基础容器为 TreeMap。其内部的 Skip List(跳表)结构是一种可以代替平衡树的数据结构,默认是按照 Key 值升序的。
1.4 Queue
JUC 包中 Queue 的实现类包括三类:单向队列、双向队列、阻塞队列。
- ConcurrentLinkedQueue 是一个基于列表实现的单向队列,按照 FIFO(先进先出)原则对元素进行排序。新元素从队列尾部插入,而获取队列元素,则需要从队列头部获取。
- ConcurrentLinkedDeque 是基于链表的双向队列,但是该队列不允许 null 元素。作为双端队列,ConcurrentLinkedDeque 可以当作“栈”来使用,并且高效地支持并发环境。
除了提供普通的单向、双向队列,JUC 拓展了 Queue,增加了可阻塞的插入和获取等操作,提供了一组阻塞队列,具体如下: - ArrayBlockingQueue:基于数组实现的可阻塞的 FIFO 队列
- LinkedBlockingQueue:基于链表实现的可阻塞的 FIFO 队列
- PriorityBlockingQueue:按优先级排序的队列
- DelayQueue:按照元素的 delay 时间进行排序的队列
- SynchronousQueue:无缓冲等待队列
2. CopyOnWriteArrayList分析
在很多应用场景中,读操作可能会远远大于写操作。由于读操作根本不会修改原有的数据,因此如果每次读取都进行加锁操作,其实是一种资源浪费。我们应该允许多个线程同时访问 List 的内部数据,毕竟读操作是线程安全的。
写时复制(CopyOnWrite,简称 COW)思想是计算机程序设计领域中的一种优化策略。其核心思想是,如果有多个 Accessor(访问器)访问一个资源(如内存或者是磁盘上的数据存储)时,他们会共同获取相同的指针指向相同的资源,只要有一个(修改器)需要修改该资源,系统会复制一份专用 Private Copy(副本)给该 Mutator,而其他 Accessor 所见到的最初的资源仍然保持不变,修改的过程对其他的 Accessor 都是透明的(transparently)。COW 主要的优点是如果没有修改器(mutator)去修改资源,就不会有副本被创建,因此多个 Accessor 可以共享同一份资源。
2.1 CopyOnWriteArrayList 的使用
在不使用CopyOnWriteArrayList 的情况下代码如下:
public class WithoutCopyOnWriteArrayListTest {
public static class ConcurrentTarget implements Runnable {
//并发操作的目标队列
List<String> targetList = null;
public ConcurrentTarget(List<String> targetList) {
this.targetList = targetList;
}
@Override
public void run() {
Iterator<String> iterator = targetList.iterator();
//迭代操作
while (iterator.hasNext()) {
// 在迭代操作时,进行列表的修改
String threadName = Thread.currentThread().getName();
System.out.println("开始往同步队列加入线程名称:" + threadName);
targetList.add(threadName);
}
}
//测试同步队列:在迭代操作时,进行列表的修改
public static void main(String[] args) {
List<String> notSafeList = Arrays.asList("a", "b", "c");
List<String> synList = Collections.synchronizedList(notSafeList);
//创建一个执行目标
ConcurrentTarget synchronizedListListDemo =
new ConcurrentTarget(synList);
//10 个线程并发
for (int i = 0; i < 10; i++) {
new Thread(synchronizedListListDemo , "线程" + i).start();
}
//主线程等待
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
运行代码会报如下错误:
java.lang.UnsupportedOperationException
at java.util.AbstractList.add(AbstractList.java:148)
at java.util.AbstractList.add(AbstractList.java:108)
at java.util.Collections$SynchronizedCollection.add(Collections.java:2035)
at com.ymj.study.code10_juc_container.CopyOnWriteArrayListTest$ConcurrentTarget.run(CopyOnWriteArrayListTest.java:33)
at java.lang.Thread.run(Thread.java:748)
这个时候可使用 CopyOnWriteArrayList 替代 Collections.synchronizedList同步包装实例,具体的代码如下:
public class CopyOnWriteArrayListTest {
public static class ConcurrentTarget implements Runnable {
//并发操作的目标队列
List<String> targetList = null;
public ConcurrentTarget(List<String> targetList) {
this.targetList = targetList;
}
@Override
public void run() {
Iterator<String> iterator = targetList.iterator();
//迭代操作
while (iterator.hasNext()) {
// 在迭代操作时,进行列表的修改
String threadName = Thread.currentThread().getName();
System.out.println("开始往同步队列加入线程名称:" + threadName);
targetList.add(threadName);
}
}
}
public static void main(String[] args) {
List<String> notSafeList = Arrays.asList("a", "b", "c");
//创建一个 CopyOnWriteArrayList 队列
List<String> copyOnWriteArrayList = new CopyOnWriteArrayList();
copyOnWriteArrayList.addAll(notSafeList);
//并发执行目标
ConcurrentTarget copyOnWriteArrayListDemo =
new ConcurrentTarget(copyOnWriteArrayList);
for (int i = 0; i < 10; i++) {
new Thread(copyOnWriteArrayListDemo, "线程" + i).start();
}
//主线程等待
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
运行之后发现UnsupportedOperationException 异常没有了。也就是说,使用CopyOnWriteArrayList 容器,可以在进行元素迭代的同时,又要进行元素添加操作。
2.2 CopyOnWriteArrayList 原理
所谓 CopyOnWrite(写时复制):就是在修改器(mutator)对一块内存进行修改时,不直接在原有内存块上进行写操作,而是将内存拷贝一份,在新的内存中进行写操作,写完之后,再将原来的指针(或者引用)指向新的内存,原来的内存被回收。
CopyOnWriteArrayList 是写时复制思想的一种典型实现: 其含有一个指向操作内存的内部指针 array,而可变操作(add、set 等)是在 array 数组的副本上进行的。当元素需要被修改或者增加的时候,并不直接在 array 指向的原有数组上操作,而是首先对 array 进行一次拷贝,将修改的内容写入拷贝副本中。写完之后,再将内部指针 array 指向新的副本,这样就可以确保修改操作不会影响访问器(accessor)的读取操作了。CopyOnWriteArrayList 的原理,如图所示:
CopyOnWriteArrayList 核心成员如下:
public class CopyOnWriteArrayList<E>
implements List<E>, RandomAccess, Cloneable, java.io.Serializable {
private static final long serialVersionUID = 8673264195747942595L;
/** The lock protecting all mutators */
/**
* 对所有的修改器(mutator)方法进行?;ぃ梦势鳎╝ccessor)方法并不需要?;? */
final transient ReentrantLock lock = new ReentrantLock();
/** The array, accessed only via getArray/setArray. */
/**
* 内部对象数组,通过 getArray/setArray 方法去访问
*/
private transient volatile Object[] array;
/**
* Gets the array. Non-private so as to also be accessible
* from CopyOnWriteArraySet class.
*/
/**
* 获取内部对象数组
*/
final Object[] getArray() {
return array;
}
/**
* Sets the array.
*/
/**
* 设置内部对象数组
*/
final void setArray(Object[] a) {
array = a;
}
}
2.3 CopyOnWriteArrayList 读取操作
访问器(accessor)的读取操作没有任何同步控制和锁操作,理由就是内部数组 array 不会发生修改,只会被另外一个 array 替换,因此可以保证数据安全。
/** 操作内存的引用*/
private transient volatile Object[] array;
public E get(int index) {
return get(getArray(), index);
}
//获取元素
@SuppressWarnings("unchecked")
private E get(Object[] a, int index) {
return (E) a[index];
}
//返回操作内存
final Object[] getArray() {
return array;
}
2.4 CopyOnWriteArrayList 写入操作
CopyOnWriteArrayList 的写入操作 add( )方法在执行的时候加了独占锁以确保只能有一个线程进行写入操作,避免多线程写的时候会 copy 出多个副本。
/**
* Appends the specified element to the end of this list.
*
* @param e element to be appended to this list
* @return {@code true} (as specified by {@link Collection#add})
*/
public boolean add(E e) {
final ReentrantLock lock = this.lock;
// 加锁
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
// 拷贝新数组
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
// 释放锁
lock.unlock();
}
}
从 add 操作可以看出,在每次进行添加操作的时候,CopyOnWriteArrayList 底层都是重新 copy了一份数组,再往新的数组中添加数组,待添加完了,再将新的 array 引用指向新的数组。当 add操作完成后,array 的引用就已经指向另一个存储空间了。
那么既然每次添加元素的时候,都会重新复制一份新的数组,那就带来了一个问题,就是增加了内存的开销,如果容器的写操作比较频繁,那么其开销就比较大。所以,在实际应用的时候,CopyOnWriteArrayList 并不适合做添加操作。但是如果在并发场景下,迭代操作比较频繁,那CopyOnWriteArrayList 是个不错的选择。
2.5 CopyOnWriteArrayList 的迭代器实现
CopyOnWriteArray 有自己的迭代器,该迭代器不会检查修改状态,也无需检查状态。为什么呢?因为被迭代的 array 数组是可以说是只读的,不会有其他线程能够修改它。
static final class COWIterator<E> implements ListIterator<E> {
/** Snapshot of the array */
/**对象数组的快照(snapshot)*/
private final Object[] snapshot;
/** Index of element to be returned by subsequent call to next. */
private int cursor;
private COWIterator(Object[] elements, int initialCursor) {
cursor = initialCursor;
snapshot = elements;
}
public boolean hasNext() {
return cursor < snapshot.length;
}
public boolean hasPrevious() {
return cursor > 0;
}
@SuppressWarnings("unchecked")
//下一个元素
public E next() {
if (! hasNext())
throw new NoSuchElementException();
return (E) snapshot[cursor++];
}
@SuppressWarnings("unchecked")
public E previous() {
if (! hasPrevious())
throw new NoSuchElementException();
return (E) snapshot[--cursor];
}
public int nextIndex() {
return cursor;
}
public int previousIndex() {
return cursor-1;
}
/**
* Not supported. Always throws UnsupportedOperationException.
* @throws UnsupportedOperationException always; {@code remove}
* is not supported by this iterator.
*/
public void remove() {
throw new UnsupportedOperationException();
}
/**
* Not supported. Always throws UnsupportedOperationException.
* @throws UnsupportedOperationException always; {@code set}
* is not supported by this iterator.
*/
public void set(E e) {
throw new UnsupportedOperationException();
}
/**
* Not supported. Always throws UnsupportedOperationException.
* @throws UnsupportedOperationException always; {@code add}
* is not supported by this iterator.
*/
public void add(E e) {
throw new UnsupportedOperationException();
}
@Override
public void forEachRemaining(Consumer<? super E> action) {
Objects.requireNonNull(action);
Object[] elements = snapshot;
final int size = elements.length;
for (int i = cursor; i < size; i++) {
@SuppressWarnings("unchecked") E e = (E) elements[i];
action.accept(e);
}
cursor = size;
}
}
迭代器的 snapshot(快照)成员,会在构造迭代器的时候,使用 CopyOnWriteArrayList 的 array成员去初始化,具体如下:
//获取迭代器
public Iterator<E> iterator() {
return new COWIterator<E>(getArray(), 0);
}
//返回操作内存
final Object[] getArray() {
return array;
}
2.6 CopyOnWriteArrayList总结
CopyOnWriteArrayList 的优点
CopyOnWriteArrayList 有一个显著的优点,那就是读取、遍历操作不需要同步,速度会非??臁K?,CopyOnWriteArrayList 适用于读操作多、写操作相对较少的场景("读多写少"),比如可以在进行“黑名单”拦截时使用 CopyOnWriteArrayList。
CopyOnWriteArrayList 和 ReentrantReadWriteLock 的比较
CopyOnWriteArrayList 和 ReentrantReadWriteLock 读写锁的思想非常类似,读写锁的思想是读读共享、写写互斥、读写互斥、写读互斥。但是 CopyOnWriteArrayList 相比读写锁的又更进一步:为了将读取的性能发挥到极致,CopyOnWriteArrayList 读取是完全不用加锁的,而且写入也不会阻塞读取操作,只有写入和写入之间需要进行同步等待,读操作的性能得到大幅度提升。
3 BlockingQueue分析
在 Java8 中,提供了 7 个阻塞队列
阻塞队列 | 介绍 |
---|---|
ArrayBlockingQueue | 数组实现的有界阻塞队列, 此队列按照先进先出(FIFO)的原则对元素进行排序。 |
LinkedBlockingQueue | 链表实现的有界阻塞队列, 此队列的默认和最大长度为Integer.MAX_VALUE。此队列按照先进先出的原则对元素进行排序 |
PriorityBlockingQueue | 支持优先级排序的无界阻塞队列, 默认情况下元素采取自然顺序升序排列。也可以自定义类实现 compareTo()方法来指定元素排序规则,或者初始化 PriorityBlockingQueue 时,指定构造参数 Comparator 来对元素进行排序 |
DelayQueue | 优先级队列实现的无界阻塞队列 |
SynchronousQueue | 不存储元素的阻塞队列, 每一个 put 操作必须等待一个 take 操作,否则不能继续添加元素。 |
LinkedTransferQueue | 链表实现的无界阻塞队列 |
LinkedBlockingDeque | 链表实现的双向阻塞队列 |
3.1 阻塞队列的操作方法
在阻塞队列中,提供了四种处理方式:
1. 插入操作
- add(e) :添加元素到队列中,如果队列满了,继续插入元素会报错,IllegalStateException。
- offer(e): 添加元素到队列,同时会返回元素是否插入成功的状态,如果成功则返回 true
- put(e) :当阻塞队列满了以后,生产者继续通过 put添加元素,队列会一直阻塞生产者线程,知道队列可用
-
offer(e,time,unit) :当阻塞队列满了以后继续添加元素,生产者线程会被阻塞指定时间,如果超时,则线程直接退出
2. 移除操作 - remove():当队列为空时,调用 remove 会返回 false,如果元素移除成功,则返回 true
- poll(): 当队列中存在元素,则从队列中取出一个元素,如果队列为空,则直接返回 null
- take():基于阻塞的方式获取队列中的元素,如果队列为空,则 take 方法会一直阻塞,直到队列中有新的数据可以消费
- poll(time,unit):带超时机制的获取数据,如果队列为空,则会等待指定的时间再去获取元素返回
3.2 ArrayBlockingQueue 原理分析
3.2.1 构造方法
ArrayBlockingQueue 提供了三个构造方法,分别如下:
capacity: 表示数组的长度,也就是队列的长度。
fair:表示是否为公平的阻塞队列,默认情况下构造的是非公平的阻塞队列。
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
//重入锁,出队和入队持有这一把锁
lock = new ReentrantLock(fair);
//初始化非空等待队列
notEmpty = lock.newCondition();
//初始化非满等待队列
notFull = lock.newCondition();
}
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
this(capacity, fair);
final ReentrantLock lock = this.lock;
lock.lock(); // Lock only for visibility, not mutual exclusion
try {
int i = 0;
try {
for (E e : c) {
checkNotNull(e);
items[i++] = e;
}
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
count = i;
putIndex = (i == capacity) ? 0 : i;
} finally {
lock.unlock();
}
}
3.2.2 Add方法
以 add 方法作为入口,在 add 方法中会调用父类的 add 方法,也就是 AbstractQueue.如果看源码看得比较多的话,一般这种写法都是调用父类的模版方法来解决通用性问题
public boolean add(E e) {
return super.add(e);
}
// 从父类的 add 方法可以看到,这里做了一个队列是否满了的判断,如果队列满了直接抛出一个异常
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
3.2.2.1 offer 方法
add 方法最终还是调用 offer 方法来添加数据,返回一个添加成功或者失败的布尔值反馈。
这段代码做了几个事情:
- 判断添加的数据是否为空
- 添加重入锁
- 判断队列长度,如果队列长度等于数组长度,表示满了直接返回 false
- 否则,直接调用 enqueue 将元素添加到队列中
public boolean offer(E e) {
//对请求数据做判断
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
3.2.2.2 enqueue方法
这个是最核心的逻辑,方法内部通过 putIndex 索引直接将元素添加到数组 items
/**
* Inserts element at current put position, advances, and signals.
* Call only when holding lock.
*/
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
//通过 putIndex 对数据赋值
items[putIndex] = x;
// 当putIndex 等于数组长度时,将 putIndex 重置为 0
if (++putIndex == items.length)
putIndex = 0;
count++; //记录队列元素的个数
//唤醒处于等待状态下的线程,表示当前队列中的元素不为空,如果存在消费者线程阻塞,就可以开始取出元素
notEmpty.signal();
}
putIndex 为什么会在等于数组长度的时候重新设置为 0?
因为 ArrayBlockingQueue 是一个 FIFO 的队列,队列添加元素时,是从队尾获取 putIndex 来存储元素,当 putIndex等于数组长度时,下次就需要从数组头部开始添加了。
下面这个图模拟了添加到不同长度的元素时,putIndex 的变化,当 putIndex 等于数组长度时,不可能让 putIndex 继续累加,否则会超出数组初始化的容量大小。同时还需要思考两个问题:
- 当元素满了以后是无法继续添加的,因为会报错
-
其次,队列中的元素肯定会有一个消费者线程通过 take或者其他方法来获取数据,而获取数据的同时元素也会从队列中移除
3.2.3 put方法
put 方法和 add 方法功能一样,差异是 put 方法如果队列满了,会阻塞。这个在最开始的时候说过。接下来看一下
它的实现逻辑:
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
/**
* 这个也是获得锁,但是和 lock 的区别是,这个方法优先允许在等待时由其他线程调
* 用等待线程的 interrupt 方法来中断等待直接返回。而 lock
* 方法是尝试获得锁成功后才响应中断
*/
lock.lockInterruptibly();
try {
while (count == items.length)
//队列满了的情况下,当前线程将会被 notFull 条件对象挂起加到等待队列中
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
3.2.4 take方法
take 方法是一种阻塞获取队列中元素的方法它的实现原理很简单,有就删除没有就阻塞,注意这个阻塞是可以中断的,如果队列没有数据那么就加入 notEmpty条件队列等待(有数据就直接取走,方法结束),如果有新的put 线程添加了数据,那么 put 操作将会唤醒 take 线程,执行 take 操作。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
//如果队列为空的情况下,直接通过 await 方法阻塞
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
如果队列中添加了元素,那么这个时候,会在 enqueue 中调用 notempty.signal 唤醒 take 线程来获得元素
3.2.4.1 dequeue 方法
这个是出队列的方法,主要是删除队列头部的元素并发返回给客户端,takeIndex,是用来记录拿数据的索引值
/**
* Extracts element at current take position, advances, and signals.
* Call only when holding lock.
*/
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
//默认获取 0 位置的元素
E x = (E) items[takeIndex];
//将该位置的元素设置为空
items[takeIndex] = null;
//这里的作用也是一样,如果拿到数组的最大值,那么重置为 0,继续从头部位置开始获取数据
if (++takeIndex == items.length)
takeIndex = 0;
//记录 元素个数递减
count--;
if (itrs != null)
//同时更新迭代器中的元素数据
itrs.elementDequeued();
//触发 因为队列满了以后导致的被阻塞的线程
notFull.signal();
return x;
}
3.2.4.2 itrs.elementDequeued();
ArrayBlockingQueue 中,实现了迭代器的功能,也就是可以通过迭代器来遍历阻塞队列中的元素
/**
* Called whenever an element has been dequeued (at takeIndex).
*/
void elementDequeued() {
// assert lock.getHoldCount() == 1;
if (count == 0)
queueIsEmpty();
else if (takeIndex == 0)
takeIndexWrapped();
}
}
itrs.elementDequeued() 是用来更新迭代器中的元素数据的
3.2.4 remove方法
remove 方法是移除一个指定元素??纯此氖迪执?/p>
public boolean remove(Object o) {
if (o == null) return false;
//获取数组元素
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
//获得锁
lock.lock();
try {
//如果队列不为空
if (count > 0) {
//获取下一个要添加元素时的索引
final int putIndex = this.putIndex;
//获取当前要被移除的元素的索引
int i = takeIndex;
do {
//从takeIndex 下标开始,找到要被删除的元素
if (o.equals(items[i])) {
//移除指定元素
removeAt(i);
//返回执行结果
return true;
}
//当前删除索引执行加 1 后判断是否与数组长度相等
//若为 true,说明索引已到数组尽头,将 i 设置为 0
if (++i == items.length)
i = 0;
} while (i != putIndex); //继续查找,直到找到最后一个元素
}
return false;
} finally {
lock.unlock();
}
}
4 BlockingDeque分析
BlockingDeque定义了一个阻塞的双端队列接口,如下所示
public interface BlockingDeque<E> extends BlockingQueue<E>, Deque<E> {
void putFirst(E e) throws InterruptedException;
void putLast(E e) throws InterruptedException;
E takeFirst() throws InterruptedException;
E takeLast() throws InterruptedException;
// ...
}
该接口继承了BlockingQueue接口,同时增加了对应的双端队列操作接口。该接口只有一个实现,就是LinkedBlockingDeque。
其核心数据结构如下所示,是一个双向链表。
public class LinkedBlockingDeque<E> extends AbstractQueue<E> implements
BlockingDeque<E>, java.io.Serializable {
static final class Node<E> {
E item; Node<E> prev; // 双向链表的Node Node<E> next;
Node(E x) {
item = x;
}
}
transient Node<E> first; // 队列的头和尾
transient Node<E> last;
private transient int count; // 元素个数
private final int capacity; // 容量
// 一把锁+两个条件
final ReentrantLock lock = new ReentrantLock();
private final Condition notEmpty = lock.netCondition();
private final Condition notFull = lock.newCondition();
// ...
}
对应的实现原理,和LinkedBlockingQueue基本一样,只是LinkedBlockingQueue是单向链表,而LinkedBlockingDeque是双向链表。
5 ConcurrentLinkedQueue/Deque
AQS内部的阻塞队列实现原理:基于双向链表,通过对head/tail进行CAS操作,实现入队和出队。ConcurrentLinkedQueue 的实现原理和AQS 内部的阻塞队列类似:同样是基于 CAS,同样是通过head/tail指针记录队列头部和尾部,但还是有稍许差别。
首先,它是一个单向链表,定义如下:
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E> implements
Queue<E>, java.io.Serializable {
private static class Node<E> {
volatile E item;
volatile Node<E> next;
//...
}
private transient volatile Node<E> head;
private transient volatile Node<E> tail;
//...
}
其次,在AQS的阻塞队列中,每次入队后,tail一定后移一个位置;每次出队,head一定后移一个位置,以保证head指向队列头部,tail指向链表尾部。但在ConcurrentLinkedQueue中,head/tail的更新可能落后于节点的入队和出队,因为它不是直接对 head/tail指针进行 CAS操作的,而是对 Node中的 item进行操作。下面进行详细分析:
5.1 初始化
初始的时候, head 和 tail 都指向一个 null 节点。对应的代码如下。
public ConcurrentLinkedQueue() {
head = tail = new Node<E>(null);
}
5.2 入队列
代码如下所示:
public boolean offer(E e) {
checkNotNull(e);
final Node<E> newNode = new Node<E>(e);
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
if (q == null) {
// p is last node
// 对tail的next指针而不是对tail指针执行CAS操作
if (p.casNext(null, newNode)) {
// Successful CAS is the linearization point
// for e to become an element of this queue,
// and for newNode to become "live".
if (p != t) // hop two nodes at a time
// 每入队两个节点后移一次tail指针
casTail(t, newNode); // Failure is OK.
return true;
}
// Lost CAS race to another thread; re-read next
}
else if (p == q)
// We have fallen off list. If tail is unchanged, it
// will also be off-list, in which case we need to
// jump to head, from which all live nodes are always
// reachable. Else the new tail is a better bet.
// 已经到达队列尾部
p = (t != (t = tail)) ? t : head;
else
// Check for tail updates after two hops.
// 后移p指针
p = (p != t && t != (t = tail)) ? t : q;
}
}
上面的入队其实是每次在队尾追加2个节点时,才移动一次tail节点。如下图所示:
初始的时候,队列中有1个节点item1,tail指向该节点,假设线程1要入队item2节点:
step1: p=tail,q=p.next=NULL.
step2:对p的next执行CAS操作,追加item2,成功之后,p=tail。所以上面的casTail方法不会执
行,直接返回。此时tail指针没有变化。
之后,假设线程2要入队item3节点,如下图所示:
step3: p=tail,q=p.next.
step4:q!=NULL,因此不会入队新节点。p,q都后移1位。
step5:q=NULL,对p的next执行CAS操作,入队item3节点。
step6:p!=t,满足条件,执行上面的casTail操作,tail后移2个位置,到达队列尾部。
总结出以下关键点:
- 即使tail指针没有移动,只要对p的next指针成功进行CAS操作,就算成功入队列。
- 只有当 p != tail的时候,才会后移tail指针。也就是说,每连续追加2个节点,才后移1次tail指针。即使CAS失败也没关系,可以由下1个线程来移动tail指针。
5.3 出队列
上面说了入队列之后,tail指针不变化,那是否会出现入队列之后,要出队列却没有元素可出的情况呢?
public E poll() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
// 出队列的时候,并没有移动head指针,而是把item设置为null
if (item != null && p.casItem(item, null)) {
// Successful CAS is the linearization point
// for item to be removed from this queue.
if (p != h) // hop two nodes at a time
// 每产生2个null节点,才把head指针后移两位
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}
出队列的代码和入队列类似,也有p、q2个指针,整个变化过程如图5-8所示。假设初始的时候head指向空节点,队列中有item1、item2、item3 三个节点。
step1:p=head,q=p.next.p!=q.
step2:后移p指针,使得p=q。
step3:出队列。关键点:此处并没有直接删除item1节点,只是把该节点的item通过CAS操作置为了NULL。
step4:p!=head,此时队列中有了2个 NULL 节点,再前移1次head指针,对其执行updateHead操作。
总结:
- 出队列的判断并非观察 tail 指针的位置,而是依赖于 head 指针后续的节点是否为NULL这一条件。
- 只要对节点的item执行CAS操作,置为NULL成功,则出队列成功。即使head指针没有成功移动,也可以由下1个线程继续完成。
5.4 队列判空
因为head/tail 并不是精确地指向队列头部和尾部,所以不能简单地通过比较 head/tail 指针来判断队列是否为空,而是需要从head指针开始遍历,找第1个不为NULL的节点。如果找到,则队列不为空;如果找不到,则队列为空。代码如下所示:
public boolean isEmpty() {
// 寻找第一个不是null的节点
return first() == null;
}
Node<E> first() {
restartFromHead:
for (;;) {
// 从head指针开始遍历,查找第一个不是null的节点
for (Node<E> h = head, p = h, q;;) {
boolean hasItem = (p.item != null);
if (hasItem || (q = p.next) == null) {
updateHead(h, p);
return hasItem ? p : null;
}
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}
6. ConcurrentHashMap解析
Java并发编程之并发容器ConcurrentHashMap详解
7. ConcurrentSkipListMap/Set
ConcurrentHashMap 是一种 key 无序的 HashMap,ConcurrentSkipListMap则是 key 有序的,实现了NavigableMap接口,此接口又继承了SortedMap接口。
7.1 ConcurrentSkipListMap
7.1.1 为什么要使用SkipList实现Map?
在Java的util包中,有一个非线程安全的HashMap,也就是TreeMap,是key有序的,基于红黑树实现。
而在Concurrent包中,提供的key有序的HashMap,也就是ConcurrentSkipListMap,是基于SkipList(跳查表)来实现的。这里为什么不用红黑树,而用跳查表来实现呢?
借用Doug Lea的原话:
The reason is that there are no known efficient lock0free insertion and deletion algorithms for search trees.
也就是目前计算机领域还未找到一种高效的、作用在树上的、无锁的、增加和删除节点的办法。
那为什么SkipList可以无锁地实现节点的增加、删除呢?这要从无锁链表的实现说起。
7.1.2 无锁链表
之前讲的无锁队列、栈,都是只在队头、队尾进行CAS操作,通常不会有问题。如果在链表的中间进行插入或删除操作,按照通常的CAS做法,就会出现问题!
操作1:在节点10后面插入节点20。如下图所示,首先把节点20的next指针指向节点30,然后对节点10的next指针执行CAS操作,使其指向节点20即可。
操作2:删除节点10。如下图所示,只需把头节点的next指针,进行CAS操作到节点30即可。
但是,如果两个线程同时操作,一个删除节点10,一个要在节点10后面插入节点20。并且这两个操作都各自是CAS的,此时就会出现问题。如下图所示,删除节点10,会同时把新插入的节点20也删除掉!这个问题超出了CAS的解决范围。
为什么会出现这个问题呢
原因: 在删除节点10的时候,实际受到操作的是节点10的前驱,也就是头节点。节点10本身没有任何变化。故而,再往节点10后插入节点20的线程,并不知道节点10已经被删除了!
针对这个问题,在论文中提出了如下的解决办法,如下图所示,把节点 10 的删除分为两2步:
第一步,把节点10的next指针,mark成删除,即软删除;
第二步,找机会,物理删除。
做标记之后,当线程再往节点10后面插入节点20的时候,便可以先进行判断,节点10是否已经被删除,从而避免在一个删除的节点10后面插入节点20。这个解决方法有一个关键点:“把节点10的next指针指向节点20(插入操作)”和“判断节点10本身是否已经删除(判断操作)”,必须是原子的,必须在1 个CAS操作里面完成!
具体的实现有两个办法:
办法一:AtomicMarkableReference
保证每个 next 是 AtomicMarkableReference 类型。但这个办法不够高效,Doug Lea 在ConcurrentSkipListMap的实现中用了另一种办法。
办法2:Mark节点
我们的目的是标记节点10已经删除,也就是标记它的next字段。那么可以新造一个marker节点,使节点10的next指针指向该Marker节点。这样,当向节点10的后面插入节点20的时候,就可以在插入的同时判断节点10的next指针是否指向了一个Marker节点,这两个操作可以在一个CAS操作里面完成。
7.1.3 跳查表
解决了无锁链表的插入或删除问题,也就解决了跳查表的一个关键问题。因为跳查表就是多层链表叠起来的
下面先看一下跳查表的数据结构:
static final class Node<K,V> {
final K key;
volatile Object value;
volatile Node<K, V> next;
/**
* Creates a new regular node.
*/
Node(K key, Object value, Node<K, V> next) {
this.key = key;
this.value = value;
this.next = next;
}
//...
}
上图中的Node就是跳查表底层节点类型。所有的<K, V>对都是由这个单向链表串起来的。
上面的Index层的节点:
static class Index<K,V> {
final Node<K, V> node;
final Index<K, V> down;
volatile Index<K, V> right;
/**
* Creates index node with given values.
*/
Index(Node<K, V> node, Index<K, V> down, Index<K, V> right) {
this.node = node;
this.down = down;
this.right = right;
}
//...
}
上图中的node属性不存储实际数据,指向Node节点。
down属性:每个Index节点,必须有一个指针,指向其下一个Level对应的节点。
right属性:Index也组成单向链表。
整个ConcurrentSkipListMap就只需要记录顶层的head节点即可:
public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V>
implements ConcurrentNavigableMap<K,V>, Cloneable, Serializable {
// ...
private transient Index<K,V> head;
// ...
}
下面详细分析如何从跳查表上查找、插入和删除元素。
7.1.4 put实现分析
private V doPut(K key, V value, boolean onlyIfAbsent) {
Node<K,V> z; // added node
if (key == null)
throw new NullPointerException();
Comparator<? super K> cmp = comparator;
outer: for (;;) {
for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
if (n != null) {
Object v; int c;
Node<K,V> f = n.next;
if (n != b.next) // inconsistent read
break;
if ((v = n.value) == null) { // n is deleted
n.helpDelete(b, f);
break;
}
if (b.value == null || v == n) // b is deleted
break;
if ((c = cpr(cmp, key, n.key)) > 0) {
b = n;
n = f;
continue;
}
if (c == 0) {
if (onlyIfAbsent || n.casValue(v, value)) {
@SuppressWarnings("unchecked") V vv = (V)v;
return vv;
}
break; // restart if lost race to replace value
}
// else c < 0; fall through
}
z = new Node<K,V>(key, value, n);
if (!b.casNext(n, z))
break; // restart if lost race to append to b
break outer;
}
}
int rnd = ThreadLocalRandom.nextSecondarySeed();
if ((rnd & 0x80000001) == 0) { // test highest and lowest bits
int level = 1, max;
while (((rnd >>>= 1) & 1) != 0)
++level;
Index<K,V> idx = null;
HeadIndex<K,V> h = head;
if (level <= (max = h.level)) {
for (int i = 1; i <= level; ++i)
idx = new Index<K,V>(z, idx, null);
}
else { // try to grow by one level
level = max + 1; // hold in array and later pick the one to use
@SuppressWarnings("unchecked")Index<K,V>[] idxs =
(Index<K,V>[])new Index<?,?>[level+1];
for (int i = 1; i <= level; ++i)
idxs[i] = idx = new Index<K,V>(z, idx, null);
for (;;) {
h = head;
int oldLevel = h.level;
if (level <= oldLevel) // lost race to add level
break;
HeadIndex<K,V> newh = h;
Node<K,V> oldbase = h.node;
for (int j = oldLevel+1; j <= level; ++j)
newh = new HeadIndex<K,V>(oldbase, newh, idxs[j], j);
if (casHead(h, newh)) {
h = newh;
idx = idxs[level = oldLevel];
break;
}
}
}
// find insertion points and splice in
splice: for (int insertionLevel = level;;) {
int j = h.level;
for (Index<K,V> q = h, r = q.right, t = idx;;) {
if (q == null || t == null)
break splice;
if (r != null) {
Node<K,V> n = r.node;
// compare before deletion check avoids needing recheck
int c = cpr(cmp, key, n.key);
if (n.value == null) {
if (!q.unlink(r))
break;
r = q.right;
continue;
}
if (c > 0) {
q = r;
r = r.right;
continue;
}
}
if (j == insertionLevel) {
if (!q.link(r, t))
break; // restart
if (t.node.value == null) {
findNode(key);
break splice;
}
if (--insertionLevel == 0)
break splice;
}
if (--j >= insertionLevel && j < level)
t = t.down;
q = q.down;
r = q.right;
}
}
}
return null;
}
在底层,节点按照从小到大的顺序排列,上面的index层间隔地串在一起,因为从小到大排列。查找的时候,从顶层index开始,自左往右、自上往下,形成图示的遍历曲线。假设要查找的元素是32,遍历过程如下:
先遍历第2层Index,发现在21的后面;
从21下降到第1层Index,从21往后遍历,发现在21和35之间;
从21下降到底层,从21往后遍历,最终发现在29和35之间。
在整个的查找过程中,范围不断缩小,最终定位到底层的两个元素之间。
在put代码中,通过findPredecessor找到了待插入的元素在[b,n]之间之后,并不能马上插入。因为其他线程也在操作这个链表,b、n都有可能被删除,所以在插入之前执行了一系列的检查逻辑,而这也正是无锁链表的复杂之处。
7.1.5 remove实现分析
// 若找到了(key, value)就删除,并返回value;找不到就返回null
final V doRemove(Object key, Object value) {
if (key == null)
throw new NullPointerException();
Comparator<? super K> cmp = comparator;
outer: for (;;) {
for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
Object v; int c;
if (n == null)
break outer;
Node<K,V> f = n.next;
if (n != b.next) // inconsistent read
break;
if ((v = n.value) == null) { // n is deleted
n.helpDelete(b, f);
break;
}
if (b.value == null || v == n) // b is deleted
break;
if ((c = cpr(cmp, key, n.key)) < 0)
break outer;
if (c > 0) {
b = n;
n = f;
continue;
}
if (value != null && !value.equals(v))
break outer;
if (!n.casValue(v, null))
break;
if (!n.appendMarker(f) || !b.casNext(n, f))
findNode(key); // retry via findNode
else {
findPredecessor(key, cmp); // clean index
if (head.right == null)
tryReduceLevel();
}
@SuppressWarnings("unchecked") V vv = (V)v;
return vv;
}
}
return null;
}
上面的删除方法和插入方法的逻辑非常类似,因为无论是插入,还是删除,都要先找到元素的前驱,也就是定位到元素所在的区间[b,n]。在定位之后,执行下面几个步骤:
- 如果发现b、n已经被删除了,则执行对应的删除清理逻辑;
- 否则,如果没有找到待删除的(k, v),返回null;
- 如果找到了待删除的元素,也就是节点n,则把n的value置为null,同时在n的后面加上Marker节点,同时检查是否需要降低Index的层次。
7.1.6 get实现分析
private V doGet(Object key) {
if (key == null)
throw new NullPointerException();
Comparator<? super K> cmp = comparator;
outer: for (;;) {
for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
Object v; int c;
if (n == null)
break outer;
Node<K,V> f = n.next;
if (n != b.next) // inconsistent read
break;
if ((v = n.value) == null) { // n is deleted
n.helpDelete(b, f);
break;
}
if (b.value == null || v == n) // b is deleted
break;
if ((c = cpr(cmp, key, n.key)) == 0) {
@SuppressWarnings("unchecked") V vv = (V)v;
return vv;
}
if (c < 0)
break outer;
b = n;
n = f;
}
}
return null;
}
无论是插入、删除,还是查找,都有相似的逻辑,都需要先定位到元素位置[b,n],然后判断b、n是否已经被删除,如果是,则需要执行相应的删除清理逻辑。这也正是无锁链表复杂的地方。
7.2 ConcurrentSkipListSet
如下面代码所示,ConcurrentSkipListSet只是对ConcurrentSkipListMap的简单封装。
public class ConcurrentSkipListSet<E>
extends AbstractSet<E>
implements NavigableSet<E>, Cloneable, java.io.Serializable {
// 封装了一个ConcurrentSkipListMap
private final ConcurrentNavigableMap<E,Object> m;
public ConcurrentSkipListSet() {
m = new ConcurrentSkipListMap<E,Object>();
}
public boolean add(E e) {
return m.putIfAbsent(e, Boolean.TRUE) == null;
}
// ...
}
以上内容就是对Java并发编程中并发容器的一些介绍,其中阻塞队列中还有很多并没有一一赘述了。