首页 > 代码库 > JAVA多线程之生产者消费者

JAVA多线程之生产者消费者

生产者消费者并发编程:

假设仓库有10个仓位,分别有10个生产者和10个消费者,生产者不断生产产品,放入仓库的仓位中,而消费者则不断从仓库中获取产品,

如果仓库已满,则生产者要等待,等消费者消费后,空出仓位后,再继续放入产品。

反之如果仓库已空,则消费者要等待,等待生产者生产出产品后,再继续消费产品。

关于生产者、消费者有四种实现方式

1,wait,nofity方式

2,ReentrantLock锁的await()和signal()

3,阻塞队列的方式

4,Semaphore 信号量方式

 

下面分别以这四种方式实现

1,wait,nofity方式

package producer;

public class WareHouse {
    private Object[] space = new Object[10];
    private int currentPutIdx = 0;
    private int currentGetIdx = 0;
    private volatile int count = 0;

    public synchronized void putProduct(Object o) throws InterruptedException{
        while (count >= 10) { // 库位是否已满,如果满就等待
                wait();
        }
        if (currentPutIdx > (space.length - 1)) {
            currentPutIdx = 0;
        }
        space[currentPutIdx++] = o;
        synchronized (this) {
            count++;
        }
        System.out.println("当前线程:" + Thread.currentThread().getName()+ " 放入一个资源,当前资源数:" + count);
        notifyAll();
    }

    public synchronized void getProduct() throws InterruptedException{
        while (count <= 0) { // 已空,等待
                wait();
        }
        if (currentGetIdx > (space.length - 1)) {
            currentGetIdx = 0;
        }
        if (space[currentGetIdx] == null) {
            System.out.println("当前线程:" + Thread.currentThread().getName()+ " is null");
        } else {
            space[currentGetIdx++] = null;
            synchronized (this) {
                count--;
            }
            System.out.println("当前线程:" + Thread.currentThread().getName()+ " 得到一个资源,当前资源数:" + count);
            notifyAll();
        }
    }
}

 

package producer;

import java.util.Random;

public class Test {
    
    public static void main(String[] args) {
        WareHouse warehouse = new WareHouse();
        for(int i =0;i<1000;i++){
            new Thread(new Producer(warehouse), "Producer "+i).start();
        }
        for(int i =0;i<1000;i++){
            new Thread(new Consumer(warehouse), "Consumer "+i).start();
        }
    }
    
    static class Producer implements Runnable{
        private WareHouse factory;
        public Producer(WareHouse factory) {
            this.factory = factory;
        }
        public void run() {
            while(true){
                try {
                    Thread.sleep(new Random().nextInt(1000));
                    factory.putProduct(new Object());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    static class Consumer implements Runnable{
        private WareHouse factory;
        public Consumer(WareHouse factory){
            this.factory = factory;
        }
        public void run() {
            while(true){
                try {
                    Thread.sleep(new Random().nextInt(1000));
                    factory.getProduct();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

 

2,ReentrantLock锁的await()和signal()

package sort;

import java.util.LinkedList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ProducerConsumer {
    private LinkedList<Object> myList = new LinkedList<Object>();
    private int MAX = 10;
    private final Lock lock = new ReentrantLock();
    private final Condition full = lock.newCondition();
    private final Condition empty = lock.newCondition();

    public ProducerConsumer() {
    }

    public void start() {
        new Producer().start();
        new Consumer().start();
    }

    public static void main(String[] args) throws Exception {
        ProducerConsumer s2 = new ProducerConsumer();
        s2.start();
    }

    class Producer extends Thread {
        public void run() {
            while (true) {
                lock.lock();
                try {
                    while (myList.size() == MAX) {
                        System.out.println("warning: it‘s full!");
                        full.await();
                    }
                    Object o = new Object();
                    if (myList.add(o)) {
                        System.out.println("Producer: " + o);
                        empty.signal();
                    }
                } catch (InterruptedException ie) {
                    System.out.println("producer is interrupted!");
                } finally {
                    lock.unlock();
                }
            }
        }
    }

    class Consumer extends Thread {
        public void run() {
            while (true) {
                lock.lock();
                try {
                    while (myList.size() == 0) {
                        System.out.println("warning: it‘s empty!");
                        empty.await();
                    }
                    Object o = myList.removeLast();
                    System.out.println("Consumer: " + o);
                    full.signal();
                } catch (InterruptedException ie) {
                    System.out.println("consumer is interrupted!");
                } finally {
                    lock.unlock();
                }
            }
        }
    }

}

 

3,阻塞队列的方式

import java.util.concurrent.*;

public class ProducerConsumer {
    // 建立一个阻塞队列
    private LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue<Object>(10);

    public ProducerConsumer() {
    }

    public void start() {
        new Producer().start();
        new Consumer().start();
    }

    public static void main(String[] args) throws Exception {
        ProducerConsumer s3 = new ProducerConsumer();
        s3.start();
    }

    class Producer extends Thread {
        public void run() {
            while (true) {
                try {
                    Object o = new Object();
                    // 取出一个对象
                    queue.put(o);
                    System.out.println("Producer: " + o);
                } catch (InterruptedException e) {
                    System.out.println("producer is interrupted!");
                }
                // }
            }
        }
    }

    class Consumer extends Thread {
        public void run() {
            while (true) {
                try {
                    // 取出一个对象
                    Object o = queue.take();
                    System.out.println("Consumer: " + o);
                } catch (InterruptedException e) {
                    System.out.println("producer is interrupted!");
                }
                // }
            }
        }
    }

}

 

4,Semaphore 信号量方式

package com.test.current;

import java.util.concurrent.Semaphore;

public class TestSemaphore {
    public static void main(String[] args) {
        for (int i = 0; i <= 3; i++) {
            new Thread(new Producer()).start();
            new Thread(new Consumer()).start();
        }
    }
    //仓库  
    static WareHouse buffer = new WareHouse();  
    //生产者
    static class Producer implements Runnable{
        static int num = 1;
        public void run() {
            int n = num++;
            while(true){
                try{
                    buffer.put(n);
                    System.out.println(">"+n);
                    Thread.sleep(10);
                }catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        
    }
    
    static class Consumer implements Runnable{
        public void run() {
            while (true) {
                try {
                    System.out.println("<"+buffer.take());
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    static class WareHouse{
        //非满锁
        final Semaphore notFull = new Semaphore(10);
        //非空锁
        final Semaphore notEmpty = new Semaphore(0);
        //互斥锁
        final Semaphore mutex = new Semaphore(1);
        //库存容量
        final Object[] items = new Object[10];
        int putPosi, takePosi, count;
        public void put(Object x)throws InterruptedException{
            try{
                notFull.acquire();
                mutex.acquire();
                items[putPosi] = x;
                if (++putPosi == items.length) {
                    putPosi = 0;
                }
                count++;
            }finally{
                notEmpty.release();
                mutex.release();
            }
        }
        
        public Object take()throws InterruptedException{
            notEmpty.acquire();
            mutex.acquire();
            try{
                Object x = items[takePosi];
                if(++takePosi == items.length){
                    takePosi = 0;
                }
                --count;
                return x;
            }finally{
                notFull.release();
                mutex.release();
            }
            
        }
    }
}