首页 > 代码库 > 生产者-消费者问题【Java实现】

生产者-消费者问题【Java实现】

 综合示例,演示有限长度字符序列缓冲区的并发读写, 或者称 生产者 - 消费者问题。错漏之处, 恳请指出 ^_^

 

/**  * PCProblem :  * 模拟生产者-消费者问题, 生产者产生字符并写入字符序列缓冲区, 消费者从缓冲区取走字符 *  * @author shuqin1984 2011-08-05 *  */package threadprogramming.basic.simulation;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.TimeUnit;public class PCProblem {		public static void main(String[] args) {				System.out.println(" ---- Thread main starts up ---- ");				// 模拟 生产者 - 消费者 任务				SharedCharBuffer sharedBuffer = new SharedCharBuffer(10);		ExecutorService es = Executors.newCachedThreadPool();		for (int i=1; i <= 10; i++) {			es.execute(new ProducerThread(i, sharedBuffer));			es.execute(new ConsumerThread(i, sharedBuffer));		}		es.shutdown();					// 运行 5 秒后终止模拟 				try {			TimeUnit.SECONDS.sleep(5);		} catch (InterruptedException e) {			e.printStackTrace();		}				ProducerThread.cancel();		ConsumerThread.cancel();		es.shutdownNow();				System.out.println("Time to be over.");			}}

  生产者: Producer.java

  

/** * ProducerThread: 生产者线程 */package threadprogramming.basic.simulation;import java.util.Random;import java.util.concurrent.TimeUnit;public class ProducerThread extends Thread {	 	private static String str = "abc1defg2hijk3lmno4pqrs5tuvwx6yz" +    "AB7CDEF8GHIJK9LMNO0PQR_STU*VWXYZ";		private static volatile boolean endflag = false;		private final int id;		private SharedCharBuffer buffer;		public ProducerThread(int id, SharedCharBuffer buffer) {		this.id = id;		this.buffer = buffer;	}		public static void cancel() {		endflag = true;	}		public boolean isCanceled() {		return endflag == true;	}		/**	 * 生产者任务: 只要任务不取消,且缓冲区不满,就往缓冲区中字符	 */	public void run()	{		while (!isCanceled()  && !Thread.interrupted()) {			synchronized (buffer) {				while (buffer.isFull()) {					  // 缓冲区已满,生产者必须等待				    try {						buffer.wait();					} catch (InterruptedException e) {						System.out.println(this + " Interrupted.");					} 				}				char ch = produce();				System.out.println(TimeIndicator.getcurrTime() + ":\t" + this + " 准备写缓冲区:" + ch);				buffer.write(ch);				System.out.println(TimeIndicator.getcurrTime() + ":\t" + this + " :\t\t\t" + buffer);				buffer.notifyAll();			}			try {				TimeUnit.MILLISECONDS.sleep(100);			} catch (InterruptedException e) {				System.out.println(this + " Interrupted.");			}		}		System.out.println("Exit from: " + this);	}		public char produce()	{		Random rand = new Random(); 		return str.charAt(rand.nextInt(64));	}	    public String toString()    {    	return "P[" + id + "]";    }}

   消费者: 

/** * ConsumerThread:  消费者线程 *  */package threadprogramming.basic.simulation;import java.util.concurrent.TimeUnit;public class ConsumerThread implements Runnable {		private static volatile boolean endflag = false;		private final int id;		private SharedCharBuffer buffer;		public ConsumerThread(int id, SharedCharBuffer buffer) {		this.id = id;		this.buffer = buffer;	}	public static void cancel() {		endflag = true;	}		public boolean isCanceled() {		return endflag == true;	}		/**	 * consume:		 * 当缓冲区buffer中有字符时,就取出字符显示【相当于消费者】。	 * 	 */	public char consume() {					return buffer.fetch();	}		/**	 * 消费者任务: 只要任务不取消,且缓冲区不被置空,就从缓冲区中取出字符消费。	 */	public void run() {							while (!isCanceled() && !Thread.interrupted()) {				synchronized (buffer) {				while (buffer.isEmpty()) {					try {						buffer.wait();					} catch (InterruptedException e) {						System.out.println(this + " Interrupted.");					}				}				System.out.println(TimeIndicator.getcurrTime() + ":\t" + this + " 取出字符: " + consume());				System.out.println(TimeIndicator.getcurrTime() + ":\t" + this + " :\t\t\t" + buffer);				buffer.notifyAll();			}			try {				TimeUnit.MILLISECONDS.sleep(100);			} catch (InterruptedException e) {				System.out.println(this + " Interrupted.");			}		}    		System.out.println("Exit from: " + this);	}		public String toString() {		return "C[" + id + "]";	}}

  有限字符缓冲区: SharedCharBuffer.java

/** * SharedCharBuffer: 有限长度字符缓冲区 *  */package threadprogramming.basic.simulation;public class SharedCharBuffer {		private char[] charBuffer;        // 用来生产和消费的有限长度字符缓冲区	private int  front;               // 将要取字符的位置下标	private int  rear;                // 将要写字符的位置下标		public SharedCharBuffer(int capacity) {				if (charBuffer == null) {			charBuffer = new char[capacity];		}			front = rear = 0;	}		/**	 * 判断缓冲区是否已满,满则生产者等待	 */	public synchronized boolean isFull()	{		return (rear+1) % charBuffer.length == front;	}		/**	 * 判断缓冲区是否为空,空则消费者等待	 */	public synchronized boolean isEmpty()	{		return front == rear;	}		/**	 * write: 将给定字符写入缓冲区中【改变了缓冲区内容】	 * synchronized 关键字用于实现互斥访问缓冲区	 * @param ch character that will be written into the buffer.	 * 	 */	public synchronized void write(char ch) {				      charBuffer[rear] = ch;	      rear = (rear+1) % charBuffer.length;          	}		/**	 * read: 读取缓冲区中给定位置的字符【不改变缓冲区内容】	 * synchronized 关键字用于实现互斥访问缓冲区	 * 	 */	public synchronized char read(int index) {		 		return charBuffer[index]; 	}		/**	 * fetch: 取出缓冲区给定位置的字符【改变了缓冲区内容】	 * synchronized 关键字用于实现互斥访问缓冲区	 *	 */	public synchronized char fetch() {  		char ch = charBuffer[front];		front = (front + 1) % charBuffer.length;		return ch;		}		/**	 * getStringOfBuffer: 缓冲区内容的字符串表示	 * @return  string representation of the buffer‘s contents	 * 	 */    public synchronized String toString() {				if (isEmpty()) {			return "缓冲区为空!";		}				StringBuilder bufferstr = new StringBuilder("缓冲区内容: ");			int i = front;		while ((i+1)% charBuffer.length != rear) {			bufferstr.append(charBuffer[i]);			i = (i+1) % charBuffer.length;		}		bufferstr.append(charBuffer[i]);		return bufferstr.toString();			}}

  

生产者-消费者问题【Java实现】