12. SynchronousQueue

SynchronousQueue类实现了BlockingQueue接口。阅读BlockingQueue文本以获取有关的更多信息。

SynchronousQueue是一个内部只能包含一个元素的队列。插入元素到队列的线程被阻塞,直到另一个线程从队列中获取了队列中存储的元素。同样,如果线程尝试获取元素并且当前不存在任何元素,则该线程将被阻塞,直到线程将元素插入队列。

将这个类称为队列有点夸大其词。这更像是一个点。

源码

SynchronousQueue的内部实现了两个类,一个是TransferStack类,使用LIFO顺序存储元素,这个类用于非公平模式;还有一个类是TransferQueue,使用FIFI顺序存储元素,这个类用于公平模式。这两个类继承自"Nonblocking Concurrent Objects with Condition Synchronization"算法,此算法是由W. N. Scherer III 和 M. L. Scott提出的,关于此算法的理论内容在这个网站中:http://www.cs.rochester.edu/u/scott/synchronization/pseudocode/duals.html。两个类的性能差不多,FIFO通常用于在竞争下支持更高的吞吐量,而LIFO在一般的应用中保证更高的线程局部性。

队列(或者栈)的节点在任何时间要么是"date"模式 —— 通过put操作提供的元素的模式,要么是"request"模式 —— 通过take操作取出元素的模式,要么为空?;褂幸桓瞿J绞?fulfill"模式,当队列有一个data节点时,请求从队列中获取一个元素就会构造一个"fulfill"模式的节点,反之亦然。这个类最有趣的特性在于任何操作都能够计算出现在队列头节点处于什么模式,然后根据它进行操作而无需使用锁。

队列和栈都继承了抽象类Transferer,这个类只定义了一个方法transfer,此方法可以既可以执行put也可以执行take操作。这两个操作被统一到了一个方法中,因为在dual数据结构中,put和take操作是对称的,所以相近的所有结点都可以被结合。使用transfer方法是从长远来看的,它相比分为两个几乎重复的部分来说更加容易理解。

队列和栈数据结构在概念上有许多相似性,但是在真正的实现细节上却几乎没有什么相似的地方。为了简单起见,它们保持清晰,这样在以后它们能以不同的方法扩展。

SynchronousQueue中使用的队列和栈的算法和"Nonblocking Concurrent Objects with Condition Synchronization"算法相比是不同的版本,包括对取消的处理。主要的差别如下:

  1. 最初的算法使用了位标记指针,但是此类在结点中使用了模式位,这导致了很多深入的改变。
  2. SynchronousQueue必须阻塞线程,直到变为fulfilled模式。
  3. 支持取消操作,通过超时和中断方式,包括清除被取消的结点/线程,以避免无法进行垃圾回收和无用的内存消耗。

阻塞主要通过LockSupportpark/unpark方法完成,除了下一个结点将要变为fulfilled模式的情况,这时会在多处理器机器中使用自旋等待。在非常忙碌的SynchronousQueue中,自旋可以显著改变吞吐量。而在不忙碌的情况下,自旋的次数就会变的足够小,不会影响性能。

清除操作在队列和栈中以不同的方式完成。对于队列,当结点被取消时,我们总是可以在O(1)时间立刻删除它。但是如果它被固定在队尾,它就必须等待直到其他取消操作完成。对于栈来说,我们需要以O(n)时间遍历来确保能够删除这个结点,不过这个操作可以和其他访问栈的线程同时进行。

队列和栈的父类为Transferer。它只定义了一个通用方法。

abstract static class Transferer<E> {
    /**
     * 执行put或者take操作/
     * 如果参数e非空,这个元素将被交给一个消费线程;如果为null,
     * 则请求返回一个被生产者提交的元素。
     * 如果返回的结果非空,那么元素被提交了或被接受了;如果为null,
     * 这个操作可能因为超时或者中断失败了。调用者可以通过检查
     * Thread.interrupted来区分到底是因为什么元素失败。
     */
    abstract E transfer(E e, boolean timed, long nanos);
}

TransferStack

这个类继承自Scherer-Scott的 dual stack 算法,但不完全相同,它使用结点而不是位标记指针。

static final class TransferStack<E> extends Transferer<E> {

    /* Modes for SNodes, ORed together in node fields */
    /** 表示一个未满足的消费者 */
    static final int REQUEST    = 0;
    /** 表示一个未满足的生产者 */
    static final int DATA       = 1;
    /** Node is fulfilling another unfulfilled DATA or REQUEST */
    static final int FULFILLING = 2;

    static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }

    /** Node class for TransferStacks. */
    static final class SNode {
        volatile SNode next;        // 栈中的下一个结点
        volatile SNode match;       // 匹配此结点的结点
        volatile Thread waiter;     // 控制 park/unpark
        Object item;                // 数据
        int mode;

核心算法 transfer

使用put操作时参数e不为空,而使用take操作时参数e为null,而timednanos指定是否使用超时。

  1. 如果头节点为空或者已经包含了相同模式的结点,那么尝试将结点
    增加到栈中并且等待匹配。如果被取消,返回null
  2. 如果头节点是一个模式不同的结点,尝试将一个fulfilling结点加入到栈中,匹配相应的等待结点,然后一起从栈中弹出,并且返回匹配的元素。匹配和弹出操作可能无法进行,由于其他线程正在执行操作3
  3. 如果栈顶已经有了一个fulfilling结点,帮助它完成它的匹配和弹出操作,然后继续。
E transfer(E e, boolean timed, long nanos) {
    /*
     * 基础算法,循环尝试下面三种操作中的一个:
     *
     * 1. 如果头节点为空或者已经包含了相同模式的结点,尝试将结点
     *    增加到栈中并且等待匹配。如果被取消,返回null
     *
     * 2. 如果头节点是一个模式不同的结点,尝试将一个`fulfilling`结点加入
     *    到栈中,匹配相应的等待结点,然后一起从栈中弹出,
     *    并且返回匹配的元素。匹配和弹出操作可能无法进行,
     *    由于其他线程正在执行操作3
     *
     * 3. 如果栈顶已经有了一个`fulfilling`结点,帮助它完成
     *    它的匹配和弹出操作,然后继续。
     */

    SNode s = null; // constructed/reused as needed
    // 传入参数为null代表请求获取一个元素,否则表示插入元素
    int mode = (e == null) ? REQUEST : DATA;

    for (;;) {
        SNode h = head;
        // 如果头节点为空或者和当前模式相同
        if (h == null || h.mode == mode) {  // empty or same-mode
            // 设置超时时间为 0,立刻返回
            if (timed && nanos <= 0L) {     // can't wait
                if (h != null && h.isCancelled())
                    casHead(h, h.next);     // pop cancelled node
                else
                    return null;
            // 构造一个结点并且设为头节点
            } else if (casHead(h, s = snode(s, e, h, mode))) {
                // 等待满足
                SNode m = awaitFulfill(s, timed, nanos);
                if (m == s) {               // wait was cancelled
                    clean(s);
                    return null;
                }
                if ((h = head) != null && h.next == s)
                    casHead(h, s.next);     // help s's fulfiller
                return (E) ((mode == REQUEST) ? m.item : s.item);
            }
        // 检查头节点是否为FULFILLIING
        } else if (!isFulfilling(h.mode)) { // try to fulfill
            if (h.isCancelled())            // already cancelled
                casHead(h, h.next);         // pop and retry
            // 更新头节点为自己
            else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                // 循环直到匹配成功
                for (;;) { // loop until matched or waiters disappear
                    SNode m = s.next;       // m is s's match
                    if (m == null) {        // all waiters are gone
                        casHead(s, null);   // pop fulfill node
                        s = null;           // use new node next time
                        break;              // restart main loop
                    }
                    SNode mn = m.next;
                    if (m.tryMatch(s)) {
                        casHead(s, mn);     // pop both s and m
                        return (E) ((mode == REQUEST) ? m.item : s.item);
                    } else                  // lost match
                        s.casNext(m, mn);   // help unlink
                }
            }
        // 帮助满足的结点匹配
        } else {                            // help a fulfiller
            SNode m = h.next;               // m is h's match
            if (m == null)                  // waiter is gone
                casHead(h, null);           // pop fulfilling node
            else {
                SNode mn = m.next;
                if (m.tryMatch(h))          // help match
                    casHead(h, mn);         // pop both h and m
                else                        // lost match
                    h.casNext(m, mn);       // help unlink
            }
        }
    }
}

当一个结点插入到栈中,它要么能和其他结点匹配然后一起出栈,否则就需要等待一个匹配的结点到来。在等待的过程中,一般使用自旋等待代替阻塞(在多处理器环境下),因为很有可能会有相应结点到来。如果自旋结束还没有匹配,那么就设置waiter然后阻塞自己,在阻塞自己之前还会再检查至少一次是否有匹配的结点。

如果等待的过程中由于超时到期或者中断,那么需要取消此节点,方法是将match字段指向自己,然后返回。

SNode awaitFulfill(SNode s, boolean timed, long nanos) {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    Thread w = Thread.currentThread();
    int spins = shouldSpin(s)
        ? (timed ? MAX_TIMED_SPINS : MAX_UNTIMED_SPINS)
        : 0;
    for (;;) {
        if (w.isInterrupted())
            s.tryCancel();
        SNode m = s.match;
        if (m != null)
            return m;
        if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                s.tryCancel();
                continue;
            }
        }
        if (spins > 0) {
            Thread.onSpinWait();
            spins = shouldSpin(s) ? (spins - 1) : 0;
        }
        else if (s.waiter == null)
            s.waiter = w; // establish waiter so can park next iter
        else if (!timed)
            LockSupport.park(this);
        else if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD)
            LockSupport.parkNanos(this, nanos);
    }
}

使用TransferStack即SynchronousQueue的非公平模式时,先put再take结点变化如下(注意DATA节点是插入线程构造的,而REQUEST是提取元素的线程的模式,此节点在构造时会变为FULFILLING节点,此处依然使用REQUEST以指代是take线程):

如果先take再put时,插入线程则会构建一个模式为[11]的结点,而11 & FULFILLING != 0, 所以isFulfilling(h.mode)方法会返回true。

清除

在最坏的情况我们需要遍历整个栈来删除节点s。如果有多个线程并发调用clean方法,我们不会知道其他线程可能已经删除了此节点。

void clean(SNode s) {
    s.item = null;   // forget item
    s.waiter = null; // forget thread

    /*
     * At worst we may need to traverse entire stack to unlink
     * s. If there are multiple concurrent calls to clean, we
     * might not see s if another thread has already removed
     * it. But we can stop when we see any node known to
     * follow s. We use s.next unless it too is cancelled, in
     * which case we try the node one past. We don't check any
     * further because we don't want to doubly traverse just to
     * find sentinel.
     */

    SNode past = s.next;
    if (past != null && past.isCancelled())
        past = past.next;

    // 删除头部被取消的节点
    SNode p;
    while ((p = head) != null && p != past && p.isCancelled())
        casHead(p, p.next);

    // 移除中间的节点
    while (p != null && p != past) {
        SNode n = p.next;
        if (n != null && n.isCancelled())
            p.casNext(n, n.next);
        else
            p = n;
    }
}

TransferQueue

static final class TransferQueue<E> extends Transferer<E> {
    /*
     * This extends Scherer-Scott dual queue algorithm, differing,
     * among other ways, by using modes within nodes rather than
     * marked pointers. The algorithm is a little simpler than
     * that for stacks because fulfillers do not need explicit
     * nodes, and matching is done by CAS'ing QNode.item field
     * from non-null to null (for put) or vice versa (for take).
     */

    /** Node class for TransferQueue. */
    static final class QNode {
        volatile QNode next;          // next node in queue
        volatile Object item;         // CAS'ed to or from null
        volatile Thread waiter;       // to control park/unpark
        final boolean isData;

transfer方法

  1. 如果队列为空或者头节点模式和自己的模式相同,尝试将自己增加到队列的等待者中,等待被满足或者被取消
  2. 如果队列包含了在等待的节点,并且本次调用是与之模式匹配的调用,尝试通过CAS修改等待节点item字段然后将其出队
E transfer(E e, boolean timed, long nanos) {
    /* Basic algorithm is to loop trying to take either of
     * two actions:
     *
     * 1. If queue apparently empty or holding same-mode nodes,
     *    try to add node to queue of waiters, wait to be
     *    fulfilled (or cancelled) and return matching item.
     *
     * 2. If queue apparently contains waiting items, and this
     *    call is of complementary mode, try to fulfill by CAS'ing
     *    item field of waiting node and dequeuing it, and then
     *    returning matching item.
     *
     * In each case, along the way, check for and try to help
     * advance head and tail on behalf of other stalled/slow
     * threads.
     *
     * The loop starts off with a null check guarding against
     * seeing uninitialized head or tail values. This never
     * happens in current SynchronousQueue, but could if
     * callers held non-volatile/final ref to the
     * transferer. The check is here anyway because it places
     * null checks at top of loop, which is usually faster
     * than having them implicitly interspersed.
     */

    QNode s = null; // constructed/reused as needed
    boolean isData = (e != null);

    for (;;) {
        QNode t = tail;
        QNode h = head;
        if (t == null || h == null)         // saw uninitialized value
            continue;                       // spin

        // 如果队列为空或者模式与头节点相同
        if (h == t || t.isData == isData) { // empty or same-mode
            QNode tn = t.next;
            // 如果有其他线程修改了tail,进入下一循环重读
            if (t != tail)                  // inconsistent read
                continue;
            // 如果有其他线程修改了tail,尝试cas更新尾节点,进入下一循环重读
            if (tn != null) {               // lagging tail
                advanceTail(t, tn);
                continue;
            }
            // 超时返回
            if (timed && nanos <= 0L)       // can't wait
                return null;
            // 构建一个新节点
            if (s == null)
                s = new QNode(e, isData);
            // 尝试CAS设置尾节点的next字段指向自己
            // 如果失败,重试
            if (!t.casNext(null, s))        // failed to link in
                continue;
      
            // cas设置当前节点为尾节点
            advanceTail(t, s);              // swing tail and wait
            // 等待匹配的节点
            Object x = awaitFulfill(s, e, timed, nanos);
            // 如果被取消,删除自己,返回null
            if (x == s) {                   // wait was cancelled
                clean(t, s);
                return null;
            }

            // 如果此节点没有被模式匹配的线程出队
            // 那么自己进行出队操作
            if (!s.isOffList()) {           // not already unlinked
                advanceHead(t, s);          // unlink if head
                if (x != null)              // and forget fields
                    s.item = s;
                s.waiter = null;
            }
            return (x != null) ? (E)x : e;

        } else {                            // complementary-mode
            QNode m = h.next;               // node to fulfill
            // 数据不一致,重读
            if (t != tail || m == null || h != head)
                continue;                   // inconsistent read

            Object x = m.item;
            if (isData == (x != null) ||    // m already fulfilled     m已经匹配成功了
                x == m ||                   // m cancelled             m被取消了
                !m.casItem(x, e)) {         // lost CAS                CAS竞争失败
                // 上面三个条件无论哪一个满足,都证明m已经失效无用了,
                // 需要将其出队
                advanceHead(h, m);          // dequeue and retry
                continue;
            }

            // 成功匹配,依然需要将节点出队
            advanceHead(h, m);              // successfully fulfilled
            // 唤醒匹配节点,如果它被阻塞了
            LockSupport.unpark(m.waiter);
            return (x != null) ? (E)x : e;
        }
    }
}

Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
    /* Same idea as TransferStack.awaitFulfill */
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    Thread w = Thread.currentThread();
    int spins = (head.next == s)
        ? (timed ? MAX_TIMED_SPINS : MAX_UNTIMED_SPINS)
        : 0;
    for (;;) {
        if (w.isInterrupted())
            s.tryCancel(e);
        Object x = s.item;
        // item被修改后返回
        // 如果put操作在此等待,item会被更新为null
        // 如果take操作再次等待,item会由null变为一个值
        if (x != e)
            return x;
        if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                s.tryCancel(e);
                continue;
            }
        }
        if (spins > 0) {
            --spins;
            Thread.onSpinWait();
        }
        else if (s.waiter == null)
            s.waiter = w;
        else if (!timed)
            LockSupport.park(this);
        else if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD)
            LockSupport.parkNanos(this, nanos);
    }
}

使用TransferQueue即公平模式插入节点,队列的变化如下:

注意匹配的时候item的变化。

public operations

SynchronousQueue类的公共操作都是依赖于transfer方法完成的,注意不同的方法调用transfer方法时提供的参数。

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    if (transferer.transfer(e, false, 0) == null) {
        Thread.interrupted();
        throw new InterruptedException();
    }
}

public boolean offer(E e, long timeout, TimeUnit unit)
    throws InterruptedException {
    if (e == null) throw new NullPointerException();
    if (transferer.transfer(e, true, unit.toNanos(timeout)) != null)
        return true;
    if (!Thread.interrupted())
        return false;
    throw new InterruptedException();
}

public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    return transferer.transfer(e, true, 0) != null;
}

------------------------------------------------------------------

public E take() throws InterruptedException {
    E e = transferer.transfer(null, false, 0);
    if (e != null)
        return e;
    Thread.interrupted();
    throw new InterruptedException();
}

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    E e = transferer.transfer(null, true, unit.toNanos(timeout));
    if (e != null || !Thread.interrupted())
        return e;
    throw new InterruptedException();
}

public E poll() {
    return transferer.transfer(null, true, 0);
}

核心要点

  1. 可以指定锁的公平性
  2. 队列内部不会存储元素,所以尽量避免使用add,offer此类立即返回的方法,除非有特殊需求
最后编辑于
?著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,100评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,308评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事?!?“怎么了?”我有些...
    开封第一讲书人阅读 159,718评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,275评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,376评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,454评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,464评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,248评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,686评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,974评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,150评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,817评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,484评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,140评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,374评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,012评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,041评论 2 351

推荐阅读更多精彩内容

  • Java8张图 11、字符串不变性 12、equals()方法、hashCode()方法的区别 13、...
    Miley_MOJIE阅读 3,698评论 0 11
  • 一些概念 数据结构就是研究数据的逻辑结构和物理结构以及它们之间相互关系,并对这种结构定义相应的运算,而且确保经过这...
    Winterfell_Z阅读 5,736评论 0 13
  • 1)这本书为什么值得看: Python语言描述,如果学的Python用这本书学数据结构更合适 2016年出版,内容...
    孙怀阔阅读 12,463评论 0 15
  • Swift1> Swift和OC的区别1.1> Swift没有地址/指针的概念1.2> 泛型1.3> 类型严谨 对...
    cosWriter阅读 11,094评论 1 32
  • 我生命中的源泉, 是故乡。 我依恋着的土地 是故乡。 我心目中的那个幻城, 是故乡。 二十多年的相爱相思, 都化作...
    高文静5966阅读 215评论 1 3