Java实现生产者-消费者模型的几种方法

什么是生产者消费者模式

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

这个阻塞队列就是用来给生产者和消费者解耦的。

为什么要使用生产者和消费者模式

在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。

生产者消费者模式实现(Java)

阻塞队列是实现生产者消费者模式的关键,本文介绍两种自定义阻塞队列的实现以及JDK 1.5 以后新增的 java.util.concurrent包中提供的阻塞队列类。

首先,阻塞队列接口:

package com.bytebeats.concurrent.queue;

/**
 * 阻塞队列接口
 *
 * @author Ricky Fung
 * @create 2017-03-26 17:28
 */
public interface IBlockingQueue<T> {

    void put(T data) throws InterruptedException;

    T take() throws InterruptedException;
}

方式1

使用 Object.wait()/notifyAll() 来实现阻塞队列。

1、阻塞队列实现

package com.bytebeats.concurrent.queue;

import java.util.LinkedList;

/**
 * 使用Object.wait()/notifyAll()实现的阻塞队列
 *
 * @author Ricky Fung
 * @create 2017-03-26 16:18
 */
public class TraditionalBlockingQueue<T> implements IBlockingQueue<T> {
    private final int queueSize;
    private final LinkedList<T> list = new LinkedList<T>();
    private final Object lock = new Object();

    public TraditionalBlockingQueue(){
        this(10);
    }
    public TraditionalBlockingQueue(int queueSize) {
        if(queueSize<1){
            throw new IllegalArgumentException("queueSize must be positive number");
        }
        this.queueSize = queueSize;
    }

    @Override
    public void put(T data) throws InterruptedException {

        synchronized (lock){
            while(list.size()>=queueSize) {
                lock.wait();
            }
            list.addLast(data);
            lock.notifyAll();
        }
    }

    @Override
    public T take() throws InterruptedException {

        synchronized(lock){
            while(list.size()<=0) {
                lock.wait();
            }
            T data = list.removeFirst();
            lock.notifyAll();
            return data;
        }
    }
}

注意要点

  1. 判定 LinkedList大小为0或者大于等于queueSize时须使用 while (condition) {},不能使用 if(condition) {}。其中 while(condition)循环,它又被叫做“自旋锁”。自旋锁以及wait()和notify()方法在线程通信这篇文章中有更加详细的介绍。为防止该线程没有收到notify()调用也从wait()中返回(也称作虚假唤醒),这个线程会重新去检查condition条件以决定当前是否可以安全地继续执行还是需要重新保持等待,而不是认为线程被唤醒了就可以安全地继续执行了。
  2. 在 take 方法取走一个元素后须调用 lock.notifyAll();,如果使用 lock.notify(); 方法在某些情况下会导致 生产者-消费者 同时处于阻塞状态。

方式2

通过Lock和Condition实现阻塞队列

package com.bytebeats.concurrent.queue;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 通过Lock和Condition实现阻塞队列
 *
 * @author Ricky Fung
 * @create 2017-03-26 17:08
 */
public class ConditionBlockingQueue<T> implements IBlockingQueue<T> {
    private final Object[] items;
    int putptr, takeptr, count;

    private final Lock lock = new ReentrantLock();
    private final Condition notFull  = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    public ConditionBlockingQueue(){
        this(10);
    }
    public ConditionBlockingQueue(int queueSize) {
        if(queueSize<1){
            throw new IllegalArgumentException("queueSize must be positive number");
        }
        items = new Object[queueSize];
    }

    @Override
    public void put(T data) throws InterruptedException {

        lock.lock();
        try {
            while (count == items.length) {
                notFull.await();
            }
            items[putptr] = data;
            if (++putptr == items.length) {
                putptr = 0;
            }
            ++count;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    @Override
    public T take() throws InterruptedException {

        lock.lock();
        try {
            while (count == 0) {
                notEmpty.wait();
            }
            T data = (T) items[takeptr];
            if (++takeptr == items.length) {
                takeptr = 0;
            }
            --count;
            notFull.signal();
            return data;
        } finally {
            lock.unlock();
        }
    }
}

方式3

JDK 1.5 以后新增的 java.util.concurrent包新增了 java.util.concurrent. BlockingQueue 接口:

A Queue that additionally supports operations that wait for the queue to become non-empty when retrieving an element, and wait for space to become available in the queue when storing an element.

并提供了如下几种阻塞队列实现:

  • java.util.concurrent.ArrayBlockingQueue
  • java.util.concurrent.LinkedBlockingQueue
  • java.util.concurrent.SynchronousQueue
  • java.util.concurrent.PriorityBlockingQueue

实现生产者-消费者模型使用 java.util.concurrent.ArrayBlockingQueue或者 java.util.concurrent.LinkedBlockingQueue即可。

测试代码

package com.bytebeats.concurrent;

import com.bytebeats.concurrent.queue.IBlockingQueue;
import com.bytebeats.concurrent.util.Constant;

/**
 * 生产者
 *
 * @author Ricky Fung
 * @create 2017-03-26 16:16
 */
public class Producer implements Runnable {
    private IBlockingQueue<String> queue;
    private int consumerNum;

    public Producer(IBlockingQueue<String> queue, int consumerNum) {
        this.queue = queue;
        this.consumerNum = consumerNum;
    }

    @Override
    public void run() {

        for(int i=0; i< 100; i++){
            try {
                queue.put("data_"+i);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        for(int i=0; i<consumerNum; i++){   //结束符
            try {
                queue.put(Constant.ENDING_SYMBOL);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        System.out.println("Producer over");
    }
}

消费者

package com.bytebeats.concurrent;

import com.bytebeats.concurrent.queue.IBlockingQueue;
import com.bytebeats.concurrent.util.Constant;

import java.util.concurrent.TimeUnit;

/**
 * 消费者
 *
 * @author Ricky Fung
 * @create 2017-03-26 16:16
 */
public class Consumer implements Runnable {
    private IBlockingQueue<String> queue;

    public Consumer(IBlockingQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {

        while (true) {
            String data = null;
            try {
                data = queue.take();
                System.out.println("Consumer "+Thread.currentThread().getName()+" consume:"+data);
                if (Constant.ENDING_SYMBOL.equals(data)) {
                    break;
                }
                TimeUnit.MILLISECONDS.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        System.out.println("Consumer over");
    }
}

我们用 一个生产者 两个消费者来做测试,如下:

package com.bytebeats.concurrent;

import com.bytebeats.concurrent.queue.ConditionBlockingQueue;
import com.bytebeats.concurrent.queue.IBlockingQueue;

/**
 * ${DESCRIPTION}
 *
 * @author Ricky Fung
 * @create 2017-03-26 16:21
 */
public class ProducerConsumerDemo {

    public static void main(String[] args) {

        //new ProducerConsumerDemo().testRun(new TraditionalBlockingQueue<String>());
        new ProducerConsumerDemo().testRun(new ConditionBlockingQueue<String>());
    }

    public void testRun(IBlockingQueue<String> queue){

        Thread producer = new Thread(new Producer(queue, 2));
        producer.start();

        Thread consumer1 = new Thread(new Consumer(queue));
        consumer1.start();
        Thread consumer2 = new Thread(new Consumer(queue));
        consumer2.start();
    }
}

如果想要使用JDK内置的阻塞队列,直接将 本例中的 com.bytebeats.concurrent.queue.IBlockingQueue替换为 java.util.concurrent.ArrayBlockingQueue或者 java.util.concurrent.LinkedBlockingQueue即可。

源码

https://github.com/TiFG/daily-codelab/tree/master/producer-consumer-impl

最后编辑于
?著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,128评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,316评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,737评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,283评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,384评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,458评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,467评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,251评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,688评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,980评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,155评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,818评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,492评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,142评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,382评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,020评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,044评论 2 352

推荐阅读更多精彩内容

  • /Library/Java/JavaVirtualMachines/jdk-9.jdk/Contents/Home...
    光剑书架上的书阅读 3,874评论 2 8
  • 什么是生产者消费者模式 生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直...
    铁甲依然在_978f阅读 166评论 0 0
  • “为每一个你所偷来的影子找到点亮生命的小小光芒,为它们找回隐匿的记忆拼图,这便是我们对你的全部请托?!?你的影子,...
    Molly_zhang阅读 171评论 0 0
  • 又是一学期的开学季,学校又瞬间从冷清变得热闹起来了。大家都面挂笑容的迎来新的学期。新学期,新的起跑线,新的...
    澄江如练阅读 188评论 0 0
  • 1 前段时间看了一档非常有趣的综艺节目《火星情报局》,几位高级特工每期都会提出几个有趣又好玩的火星提案,于嬉笑娱乐...
    卿木o阅读 1,102评论 0 11