首页 > 代码库 > 生产者-消费者问题【Java实现】
生产者-消费者问题【Java实现】
综合示例,演示有限长度字符序列缓冲区的并发读写, 或者称 生产者 - 消费者问题。错漏之处, 恳请指出 ^_^
/** * PCProblem : * 模拟生产者-消费者问题, 生产者产生字符并写入字符序列缓冲区, 消费者从缓冲区取走字符 * * @author shuqin1984 2011-08-05 * */package threadprogramming.basic.simulation;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.TimeUnit;public class PCProblem { public static void main(String[] args) { System.out.println(" ---- Thread main starts up ---- "); // 模拟 生产者 - 消费者 任务 SharedCharBuffer sharedBuffer = new SharedCharBuffer(10); ExecutorService es = Executors.newCachedThreadPool(); for (int i=1; i <= 10; i++) { es.execute(new ProducerThread(i, sharedBuffer)); es.execute(new ConsumerThread(i, sharedBuffer)); } es.shutdown(); // 运行 5 秒后终止模拟 try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } ProducerThread.cancel(); ConsumerThread.cancel(); es.shutdownNow(); System.out.println("Time to be over."); }}
生产者: Producer.java
/** * ProducerThread: 生产者线程 */package threadprogramming.basic.simulation;import java.util.Random;import java.util.concurrent.TimeUnit;public class ProducerThread extends Thread { private static String str = "abc1defg2hijk3lmno4pqrs5tuvwx6yz" + "AB7CDEF8GHIJK9LMNO0PQR_STU*VWXYZ"; private static volatile boolean endflag = false; private final int id; private SharedCharBuffer buffer; public ProducerThread(int id, SharedCharBuffer buffer) { this.id = id; this.buffer = buffer; } public static void cancel() { endflag = true; } public boolean isCanceled() { return endflag == true; } /** * 生产者任务: 只要任务不取消,且缓冲区不满,就往缓冲区中字符 */ public void run() { while (!isCanceled() && !Thread.interrupted()) { synchronized (buffer) { while (buffer.isFull()) { // 缓冲区已满,生产者必须等待 try { buffer.wait(); } catch (InterruptedException e) { System.out.println(this + " Interrupted."); } } char ch = produce(); System.out.println(TimeIndicator.getcurrTime() + ":\t" + this + " 准备写缓冲区:" + ch); buffer.write(ch); System.out.println(TimeIndicator.getcurrTime() + ":\t" + this + " :\t\t\t" + buffer); buffer.notifyAll(); } try { TimeUnit.MILLISECONDS.sleep(100); } catch (InterruptedException e) { System.out.println(this + " Interrupted."); } } System.out.println("Exit from: " + this); } public char produce() { Random rand = new Random(); return str.charAt(rand.nextInt(64)); } public String toString() { return "P[" + id + "]"; }}
消费者:
/** * ConsumerThread: 消费者线程 * */package threadprogramming.basic.simulation;import java.util.concurrent.TimeUnit;public class ConsumerThread implements Runnable { private static volatile boolean endflag = false; private final int id; private SharedCharBuffer buffer; public ConsumerThread(int id, SharedCharBuffer buffer) { this.id = id; this.buffer = buffer; } public static void cancel() { endflag = true; } public boolean isCanceled() { return endflag == true; } /** * consume: * 当缓冲区buffer中有字符时,就取出字符显示【相当于消费者】。 * */ public char consume() { return buffer.fetch(); } /** * 消费者任务: 只要任务不取消,且缓冲区不被置空,就从缓冲区中取出字符消费。 */ public void run() { while (!isCanceled() && !Thread.interrupted()) { synchronized (buffer) { while (buffer.isEmpty()) { try { buffer.wait(); } catch (InterruptedException e) { System.out.println(this + " Interrupted."); } } System.out.println(TimeIndicator.getcurrTime() + ":\t" + this + " 取出字符: " + consume()); System.out.println(TimeIndicator.getcurrTime() + ":\t" + this + " :\t\t\t" + buffer); buffer.notifyAll(); } try { TimeUnit.MILLISECONDS.sleep(100); } catch (InterruptedException e) { System.out.println(this + " Interrupted."); } } System.out.println("Exit from: " + this); } public String toString() { return "C[" + id + "]"; }}
有限字符缓冲区: SharedCharBuffer.java
/** * SharedCharBuffer: 有限长度字符缓冲区 * */package threadprogramming.basic.simulation;public class SharedCharBuffer { private char[] charBuffer; // 用来生产和消费的有限长度字符缓冲区 private int front; // 将要取字符的位置下标 private int rear; // 将要写字符的位置下标 public SharedCharBuffer(int capacity) { if (charBuffer == null) { charBuffer = new char[capacity]; } front = rear = 0; } /** * 判断缓冲区是否已满,满则生产者等待 */ public synchronized boolean isFull() { return (rear+1) % charBuffer.length == front; } /** * 判断缓冲区是否为空,空则消费者等待 */ public synchronized boolean isEmpty() { return front == rear; } /** * write: 将给定字符写入缓冲区中【改变了缓冲区内容】 * synchronized 关键字用于实现互斥访问缓冲区 * @param ch character that will be written into the buffer. * */ public synchronized void write(char ch) { charBuffer[rear] = ch; rear = (rear+1) % charBuffer.length; } /** * read: 读取缓冲区中给定位置的字符【不改变缓冲区内容】 * synchronized 关键字用于实现互斥访问缓冲区 * */ public synchronized char read(int index) { return charBuffer[index]; } /** * fetch: 取出缓冲区给定位置的字符【改变了缓冲区内容】 * synchronized 关键字用于实现互斥访问缓冲区 * */ public synchronized char fetch() { char ch = charBuffer[front]; front = (front + 1) % charBuffer.length; return ch; } /** * getStringOfBuffer: 缓冲区内容的字符串表示 * @return string representation of the buffer‘s contents * */ public synchronized String toString() { if (isEmpty()) { return "缓冲区为空!"; } StringBuilder bufferstr = new StringBuilder("缓冲区内容: "); int i = front; while ((i+1)% charBuffer.length != rear) { bufferstr.append(charBuffer[i]); i = (i+1) % charBuffer.length; } bufferstr.append(charBuffer[i]); return bufferstr.toString(); }}
生产者-消费者问题【Java实现】
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。