wait、notify
/**
 * Producer/consumer example built on an intrinsic lock with {@code wait}/{@code notifyAll}.
 * <p>The producer appends to a shared queue until it holds {@code maxSize} elements and the
 * consumer removes them; each side waits on the queue's monitor while it cannot proceed.</p>
 * <p>Both workers stop when their thread is interrupted.</p>
 */
public class ProducerConsumerTest {

    /** Thread that keeps adding elements to the shared queue, waiting while it is full. */
    class Producer extends Thread {
        private final Queue<Integer> queue;
        private final int maxSize;

        /**
         * @param queue   shared queue; also used as the monitor for wait/notify
         * @param maxSize number of elements at which the producer stops adding and waits
         */
        Producer(Queue<Integer> queue, int maxSize) {
            this.queue = queue;
            this.maxSize = maxSize;
        }

        /** Produces until interrupted; returns with the interrupt flag set. */
        @Override
        public void run() {
            synchronized (queue) {
                // The loop must stay inside the synchronized block: wait()/notifyAll()
                // require the monitor, and wait() releases it while this thread is parked.
                while (true) {
                    // For strictly alternating output, change the condition to !queue.isEmpty().
                    // ">=" (not "==") so a non-positive maxSize blocks instead of growing forever.
                    while (queue.size() >= maxSize) {
                        try {
                            queue.wait();
                        } catch (InterruptedException e) {
                            // Interruption is the stop request: keep the flag and leave.
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
                    System.out.println("生产1个");
                    queue.add(1);
                    // notifyAll so that, with several waiting threads, a consumer is always
                    // among the ones woken up.
                    queue.notifyAll();
                }
            }
        }
    }

    /** Thread that keeps removing elements from the shared queue, waiting while it is empty. */
    class Consumer extends Thread {
        private final Queue<Integer> queue;

        /**
         * @param queue shared queue; also used as the monitor for wait/notify
         */
        Consumer(Queue<Integer> queue) {
            this.queue = queue;
        }

        /** Consumes until interrupted; returns with the interrupt flag set. */
        @Override
        public void run() {
            synchronized (queue) {
                while (true) {
                    while (queue.isEmpty()) {
                        try {
                            queue.wait();
                        } catch (InterruptedException e) {
                            // Interruption is the stop request: keep the flag and leave.
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
                    System.out.println("消费1个");
                    queue.remove();
                    // Wake every waiter so a blocked producer is guaranteed to be notified.
                    queue.notifyAll();
                }
            }
        }
    }

    /** Starts one producer (capacity 10) and one consumer on a shared queue. */
    public static void main(String[] args) {
        Queue<Integer> queue = new LinkedList<>();
        ProducerConsumerTest test = new ProducerConsumerTest();
        Producer producer = test.new Producer(queue, 10);
        Consumer consumer = test.new Consumer(queue);
        producer.start();
        consumer.start();
    }
}
Lock + Condition
/**
 * Producer/consumer example using {@link ReentrantLock} and {@link Condition}.
 * <p>A single slot ({@code count} is 0 or 1) is handed back and forth, so "produce" and
 * "consume" lines are printed alternately.</p>
 */
public class ProducerConsumerLockTest {
    private final ReentrantLock lock = new ReentrantLock();
    // Separate wait sets: a signal meant for a consumer can never wake another producer
    // (and vice versa), which keeps the hand-off correct with several threads per role.
    private final Condition slotEmpty = lock.newCondition();
    private final Condition slotFilled = lock.newCondition();
    // Number of items currently in the slot; guarded by lock.
    private int count;

    /**
     * Produces items in an endless loop, waiting while the slot is still occupied.
     * Returns only when the calling thread is interrupted; the interrupt flag is left set.
     */
    public void set() {
        lock.lock();
        try {
            while (true) {
                while (count != 0) {
                    slotEmpty.await();
                }
                System.out.println(Thread.currentThread().getName() + "生产一个产品==");
                count++;
                slotFilled.signal();
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt status so the caller can see why the loop ended.
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Consumes items in an endless loop, waiting while the slot is empty.
     * Returns only when the calling thread is interrupted; the interrupt flag is left set.
     */
    public void get() {
        lock.lock();
        try {
            while (true) {
                while (count == 0) {
                    slotFilled.await();
                }
                System.out.println(Thread.currentThread().getName() + "消费一个产品");
                count--;
                slotEmpty.signal();
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt status so the caller can see why the loop ended.
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
    }

    /** Starts one producer thread and one consumer thread on a shared instance. */
    public static void main(String[] args) {
        ProducerConsumerLockTest service = new ProducerConsumerLockTest();
        Thread producerThread = new Thread(service::set, "producerThread");
        Thread consumerThread = new Thread(service::get, "consumerThread");
        producerThread.start();
        consumerThread.start();
    }
}
BlockingQueue
/**
 * Producer/consumer example using a {@link BlockingQueue}: one producer, several consumers.
 * <p>The queue does all blocking and hand-off; the workers only call {@code put}/{@code take}.
 * Every worker stops when its thread is interrupted.</p>
 */
public class BlockingQueueTest {

    /** Puts the value 1 into the queue in an endless loop, pausing 200 ms after each put. */
    class Producer implements Runnable {
        private final BlockingQueue<Integer> blockingQueue;

        /**
         * @param blockingQueue queue the produced elements are put into
         */
        Producer(BlockingQueue<Integer> blockingQueue) {
            this.blockingQueue = blockingQueue;
        }

        /** Runs until the thread is interrupted; returns with the interrupt flag set. */
        @Override
        public void run() {
            try {
                while (true) {
                    blockingQueue.put(1);
                    System.out.println(Thread.currentThread().getName()+" put one==");
                    Thread.sleep(200);
                }
            } catch (InterruptedException e) {
                // Interruption is the stop signal; keep the flag for whoever runs this task.
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Takes elements from the queue in an endless loop, pausing 200 ms after each take. */
    class Consumer implements Runnable {
        private final BlockingQueue<Integer> blockingQueue;

        /**
         * @param blockingQueue queue the elements are taken from
         */
        Consumer(BlockingQueue<Integer> blockingQueue) {
            this.blockingQueue = blockingQueue;
        }

        /** Runs until the thread is interrupted; returns with the interrupt flag set. */
        @Override
        public void run() {
            try {
                while (true) {
                    blockingQueue.take();
                    System.out.println(Thread.currentThread().getName()+" get one");
                    Thread.sleep(200);
                }
            } catch (InterruptedException e) {
                // Interruption is the stop signal; keep the flag for whoever runs this task.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Runs one producer and two consumers on a capacity-1 queue for ten seconds, then
     * interrupts the workers and waits for them to finish.
     */
    public static void main(String[] args) throws InterruptedException {
        BlockingQueueTest test = new BlockingQueueTest();
        BlockingQueue<Integer> blockingQueue = new LinkedBlockingDeque<>(1);
        Thread[] workers = {
            new Thread(test.new Producer(blockingQueue)),
            new Thread(test.new Consumer(blockingQueue)),
            new Thread(test.new Consumer(blockingQueue))
        };
        for (Thread worker : workers) {
            worker.start();
        }
        // Let the demo run, then shut down cooperatively instead of killing the JVM.
        Thread.sleep(10 * 1000);
        for (Thread worker : workers) {
            worker.interrupt();
        }
        for (Thread worker : workers) {
            worker.join();
        }
    }
}