首页 > 代码库 > JAVA多线程(七)模式-Producer Consumer

JAVA多线程(七)模式-Producer Consumer

Producer Consumer

生产者创建数据,通过中介控制流量并安全传递给消费者。

适用环境

生产者生产数据的速度与消费者处理数据的速度不一致,中介者通过缓存和阻塞对消费者的数据压力进行调整。

样例

4生产者生产产品,放入市场,2消费者消费。

产品

package ProducerConsumer;

public class Product {
	private String prdId=null;
	public Product(String prdId) {
		this.prdId=prdId;
	}
	public String getOrderId(){
		return this.prdId;
	}
}

市场

package ProducerConsumer;

import java.util.LinkedList;

public class Mart {
	private final LinkedList<Product> items=new LinkedList<Product>();
	private final int size;
	private int state=0;
	
	public Mart(int size){
		this.size=size;
	}
	
	public synchronized void put(Product prd){
		while(items.size()>=size){
			try {
				wait(100);
			} catch (InterruptedException e) {
			}
		}
		items.add(prd);
		notifyAll();
	}
		
	public synchronized Product get(){
		while(items.size()<=0&&state==0){
			try {
				wait(100);
			} catch (InterruptedException e) {
			}
		}
		Product ret=null;		
		ret=items.poll();
		notifyAll();
		return ret;
	}
	
	public synchronized boolean isEmpty(){
		if(items.size()==0){
			return true;
		}
		return false;
	}
	
	public synchronized void setState(int st){
		this.state=st;
	}
}

生产者

package ProducerConsumer;

import java.util.UUID;

public class Producer implements Runnable{
	
	private final String myName;
	private final Mart mart;
	private int state=0;
	
	public Producer(String myName,Mart mart){
		this.myName=myName;
		this.mart=mart;
	}
	
	@Override
	public void run() {
		while(state==0){
			Product p=new Product(UUID.randomUUID().toString());
			mart.put(p);
			try {
				Thread.sleep(100);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			System.out.println(this.myName+" 生产的产品:"+p.getOrderId()+" 已经进入市场。");
		}
		System.out.println(this.myName+" 已停止生产。");
	}
	
	public void setState(int st){
		this.state=st;
	}

}

消费者

package ProducerConsumer;

public class Consumer implements Runnable {
	private final String myName;
	private final Mart mart;
	private int state=0;
	
	public Consumer(String myName,Mart mart){
		this.myName=myName;
		this.mart=mart;
	}
	
	@Override
	public void run() {
		while(state==0){
			Product p=mart.get();
			if(p!=null){
				try {
					Thread.sleep(200);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				System.out.println(this.myName+" 消费了产品:"+p.getOrderId()+"。");
			}
		}
		System.out.println(this.myName+" 已停止消费。");
	}
	
	public void setState(int st){
		this.state=st;
	}
}

测试类

package ProducerConsumer;

public class Test {

	public static void main(String[] args) {
		System.out.println("主线开始。");
		Mart m=new Mart(10);
		Producer p1=new Producer("p1",m);
		Producer p2=new Producer("p2",m);
		Producer p3=new Producer("p3",m);
		Producer p4=new Producer("p4",m);
		
		Consumer c1=new Consumer("c1",m);
		Consumer c2=new Consumer("c2",m);
		
		Thread ctd1=new Thread(c1);
		Thread ctd2=new Thread(c2);
		
		Thread ptd1=new Thread(p1);
		Thread ptd2=new Thread(p2);
		Thread ptd3=new Thread(p3);
		Thread ptd4=new Thread(p4);
		
		
		
		ptd1.start();
		ptd2.start();
		ptd3.start();
		ptd4.start();
		
		try {
			System.out.println("主线程休眠5秒。");
			Thread.sleep(5000);
			System.out.println("主线程休眠结束。");
			System.out.println("市场到达饱和,生产者被阻塞,开启消费者。");
			ctd1.start();
			ctd2.start();			
			System.out.println("主线程再次休眠5秒。");
			Thread.sleep(5000);
			System.out.println("主线程休眠结束。");
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		System.out.println("设置生产者毒丸。");
		p1.setState(-1);
		p2.setState(-1);
		p3.setState(-1);
		p4.setState(-1);
		System.out.println("等待队列清空。");
		while(true){
			if(!m.isEmpty()){
				try {
					Thread.sleep(1000);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}else{
				break;
			}
		}
		System.out.println("队列已清空。");
		m.setState(-1);
		System.out.println("设置消费者毒丸。");
		c1.setState(-1);
		c2.setState(-1);
		
	}

}

本文出自 “JAVA技术栈笔记” 博客,请务必保留此出处http://stroll.blog.51cto.com/11038467/1856913

JAVA多线程(七)模式-Producer Consumer