首页 > 代码库 > java 线程 生产者-消费者与队列,任务间使用管道进行输入、输出 讲解示例 --thinking java4

java 线程 生产者-消费者与队列,任务间使用管道进行输入、输出 讲解示例 --thinking java4

package org.rui.thread.block2;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;

import org.rui.thread.LiftOff;

/**
 * Producer-consumer with a queue: a consumer task that repeatedly takes
 * {@link LiftOff} objects off a {@link BlockingQueue} and runs each one in
 * turn by calling its {@code run()} method directly.
 * 
 * @author lenovo
 * 
 */

class LiftOffRunner implements Runnable {

	private final BlockingQueue<LiftOff> rockets;

	/**
	 * @param b the queue this runner takes its tasks from
	 */
	public LiftOffRunner(BlockingQueue<LiftOff> b) {
		rockets = b;
	}

	/**
	 * Adds a task to the queue, waiting for space to become available if the
	 * queue is full.
	 * <p>
	 * If the calling thread is interrupted while waiting, the task is not
	 * added, the stack trace is printed and the thread's interrupt status is
	 * restored so the caller can still observe the interruption.
	 * 
	 * @param lo the task to enqueue
	 */
	public void add(LiftOff lo) {
		try {
			// put() blocks while the queue is full (unlike offer(), which
			// returns immediately).
			rockets.put(lo);
		} catch (InterruptedException e) {
			e.printStackTrace();
			// Catching InterruptedException clears the flag; set it again
			// instead of silently swallowing the interruption.
			Thread.currentThread().interrupt();
		}

	}

	/**
	 * Takes tasks from the queue and runs them until this thread is
	 * interrupted, either between tasks or while blocked in {@code take()}.
	 */
	@Override
	public void run() {

		try {
			while (!Thread.interrupted()) {
				// Retrieves and removes the head of the queue, waiting if
				// necessary until an element becomes available.
				LiftOff rocket = rockets.take();
				rocket.run();
			}

		} catch (InterruptedException e) {
			System.out.println("中断退出");
		}
		System.out.println("x exiting liftOffRunner");

	}
}

/**
 * Runs the same producer/consumer scenario against several
 * {@link BlockingQueue} implementations, pausing for the Enter key between
 * scenarios.
 */
public class TestBlockingQueues {

	// One shared reader for System.in. A BufferedReader reads ahead, so
	// creating a new one per prompt can swallow input meant for later prompts
	// (e.g. when stdin is piped or several lines are pasted at once).
	private static final BufferedReader STDIN = new BufferedReader(
			new InputStreamReader(System.in));

	/**
	 * Blocks until one line has been read from standard input (or the input
	 * has ended). The line itself is discarded.
	 */
	static void getkey() {
		try {
			// Reading a whole line compensates for Windows/Linux differences
			// in what the Enter key produces.
			STDIN.readLine();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	/**
	 * Prints {@code message} and then waits for a line of input.
	 * 
	 * @param message the prompt to print
	 */
	static void getkey(String message) {
		System.out.println(message);
		getkey();
	}

	/**
	 * Starts a {@link LiftOffRunner} thread on {@code queue}, enqueues five
	 * {@code LiftOff(5)} tasks, waits for Enter and then interrupts the
	 * runner thread. The method does not wait for the runner thread to
	 * finish.
	 * 
	 * @param msg   label printed for this scenario
	 * @param queue the queue implementation under test
	 */
	static void tets(String msg, BlockingQueue<LiftOff> queue) {
		System.out.println(msg);
		LiftOffRunner runner = new LiftOffRunner(queue);

		// Start the consumer thread.
		Thread t = new Thread(runner);
		t.start();

		for (int i = 0; i < 5; i++) {
			// Hand a task to the runner's queue; this may block when the
			// queue is bounded and currently full.
			runner.add(new LiftOff(5));
		}

		// Wait for console input before stopping the consumer.
		getkey("press 'enter' (" + msg + ")");
		t.interrupt();
		System.out.println(" 完了 " + msg + "test");

	}

	public static void main(String[] args) {
		// Unbounded capacity.
		tets("LinkedBlockingQueue", new LinkedBlockingQueue<LiftOff>());
		// Fixed capacity of 3.
		tets("ArrayBlockingQueue", new ArrayBlockingQueue<LiftOff>(3));
		// No internal capacity: each put waits for a matching take.
		tets("SynchronousQueue", new SynchronousQueue<LiftOff>());

	}

}


package org.rui.thread.block2;

import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Toast BlockingQueue example: a slice of toast identified by a fixed id,
 * carrying a status that starts as {@code DRY} and is changed by
 * {@link #butter()} and {@link #jam()}.
 * 
 * @author lenovo
 *
 */

class Toast {
	/** Status of a slice: dry, buttered, or jammed. */
	public enum Status {
		DRY,
		BUTTERED,
		JAMMED
	}

	private final int id;
	private Status status;

	/**
	 * Creates a dry slice.
	 * 
	 * @param idn identifier reported by {@link #getId()}
	 */
	public Toast(int idn) {
		this.id = idn;
		this.status = Status.DRY;
	}

	/** @return the identifier given at construction */
	public int getId() {
		return this.id;
	}

	/** @return the current status */
	public Status getStatus() {
		return this.status;
	}

	/** Sets the status to {@code BUTTERED}. */
	public void butter() {
		this.status = Status.BUTTERED;
	}

	/** Sets the status to {@code JAMMED}. */
	public void jam() {
		this.status = Status.JAMMED;
	}

	/** @return text of the form {@code "Toast <id>:<status>"} */
	@Override
	public String toString() {
		StringBuilder text = new StringBuilder("Toast ");
		text.append(this.id).append(':').append(this.status);
		return text.toString();
	}
}

/**
 * Toast queue: a {@link LinkedBlockingQueue} whose element type is fixed to
 * {@link Toast}. It declares no members of its own and exists to give the
 * queue type a short name.
 * 
 * @author lenovo
 * 
 */
class ToastQueue extends LinkedBlockingQueue<Toast> {
}

/**
 * Producer task: after a pause of 100-599 ms, creates a new {@link Toast}
 * with the next sequential id, prints it and puts it on the given queue.
 * Repeats until the thread is interrupted.
 */
class Toaster implements Runnable {
	private ToastQueue toastQueue;
	private int count = 0;
	private Random rand = new Random(47);

	/**
	 * @param tq queue that receives the newly made toast
	 */
	public Toaster(ToastQueue tq) {
		this.toastQueue = tq;
	}

	@Override
	public void run() {
		try {
			for (;;) {
				// Thread.interrupted() also clears the flag, as in a
				// while-condition check.
				if (Thread.interrupted()) {
					break;
				}
				long pauseMillis = 100 + rand.nextInt(500);
				TimeUnit.MILLISECONDS.sleep(pauseMillis);
				// Make the toast.
				Toast slice = new Toast(count);
				count++;
				System.out.println(slice);
				// Hand it to the next stage.
				toastQueue.put(slice);
			}
		} catch (InterruptedException e) {
			System.out.println("Toaster interrupted");
		}
		System.out.println("toaster off");
	}
}

/**
 * Applies butter to toast: takes each slice from the dry queue, butters it,
 * prints it and puts it on the buttered queue, until interrupted.
 */
class Butterer implements Runnable {
	private ToastQueue dryQueue;
	private ToastQueue butteredQueue;

	/**
	 * @param dry      queue the dry slices are taken from
	 * @param buttered queue the buttered slices are put on
	 */
	public Butterer(ToastQueue dry, ToastQueue buttered) {
		this.dryQueue = dry;
		this.butteredQueue = buttered;
	}

	@Override
	public void run() {
		try {
			while (true) {
				if (Thread.interrupted()) {
					break;
				}
				// Blocks until the next piece of toast is available.
				Toast slice = dryQueue.take();
				slice.butter();
				System.out.println(slice);
				butteredQueue.put(slice);
			}
		} catch (InterruptedException e) {
			System.out.println("涂黄油 interrupted");
		}
		System.out.println("涂黄油 off");
	}

}

/**
 * Applies jam to buttered toast: takes each slice from the buttered queue,
 * jams it, prints it and puts it on the finished queue, until interrupted.
 */
class Jammer implements Runnable {
	private ToastQueue butteredQueue;
	private ToastQueue finishedQueue;

	/**
	 * @param butteredQueue queue the buttered slices are taken from
	 * @param finishedQueue queue the jammed slices are put on
	 */
	public Jammer(ToastQueue butteredQueue, ToastQueue finishedQueue) {
		this.finishedQueue = finishedQueue;
		this.butteredQueue = butteredQueue;
	}

	@Override
	public void run() {
		try {
			for (; !Thread.interrupted();) {
				// Blocks until the next piece of toast is available.
				Toast slice = butteredQueue.take();
				slice.jam();
				System.out.println(slice);
				finishedQueue.put(slice);
			}
		} catch (InterruptedException e) {
			System.out.println("涂果酱 interrupted");
		}
		System.out.println("涂果酱 off");
	}

}

/**
 * Consumes the toast: takes slices from the finished queue and checks that
 * they arrive in id order (0, 1, 2, ...) and that each one is jammed. A slice
 * that fails either check is reported and the JVM exits with status 1.
 */
class Eater implements Runnable {
	private ToastQueue finishedQueue;
	private int counter = 0;

	/**
	 * @param finished queue the finished slices are taken from
	 */
	public Eater(ToastQueue finished) {
		this.finishedQueue = finished;
	}

	@Override
	public void run() {
		try {
			while (!Thread.interrupted()) {
				Toast slice = finishedQueue.take();
				// The expected id advances for every slice taken, whether or
				// not the slice passes the checks.
				int expectedId = counter++;
				boolean inOrder = slice.getId() == expectedId;
				boolean jammed = slice.getStatus() == Toast.Status.JAMMED;
				if (inOrder && jammed) {
					System.out.println("吃!" + slice);
					continue;
				}
				// Out of order or not fully prepared: report and abort.
				System.out.println("===>>>>error" + slice);
				System.exit(1);
			}
		} catch (InterruptedException e) {
			System.out.println("食者 interrupted");
		}
		System.out.println(" 食者 off");
	}
}

/**
 * Entry point: wires a Toaster, Butterer, Jammer and Eater together through
 * three toast queues, lets them run for five seconds and then interrupts
 * them all.
 * 
 * @author lenovo
 * 
 */
public class ToastOMatic {

	public static void main(String[] args) throws InterruptedException {
		ToastQueue dry = new ToastQueue();
		ToastQueue buttered = new ToastQueue();
		ToastQueue finished = new ToastQueue();

		// Pipeline stages in submission order: toast, butter, jam, eat.
		Runnable[] stages = {
				new Toaster(dry),
				new Butterer(dry, buttered),
				new Jammer(buttered, finished),
				new Eater(finished)
		};

		ExecutorService pool = Executors.newCachedThreadPool();
		for (Runnable stage : stages) {
			pool.execute(stage);
		}

		TimeUnit.SECONDS.sleep(5);
		// Interrupts every running stage.
		pool.shutdownNow();

	}
}
/**output:
 Toast 0:DRY
Toast 0:BUTTERED
Toast 0:JAMMED
吃!Toast 0:JAMMED
Toast 1:DRY
Toast 1:BUTTERED
Toast 1:JAMMED
吃!Toast 1:JAMMED
Toast 2:DRY
Toast 2:BUTTERED
Toast 2:JAMMED
吃!Toast 2:JAMMED
...
...
Toast 10:DRY
Toast 10:BUTTERED
Toast 10:JAMMED
吃!Toast 10:JAMMED
Toast 11:DRY
Toast 11:BUTTERED
Toast 11:JAMMED
吃!Toast 11:JAMMED
Toast 12:DRY
Toast 12:BUTTERED
Toast 12:JAMMED
吃!Toast 12:JAMMED
Toast 13:DRY
Toast 13:BUTTERED
Toast 13:JAMMED
吃!Toast 13:JAMMED
Toast 14:DRY
Toast 14:BUTTERED
Toast 14:JAMMED
吃!Toast 14:JAMMED
食者 interrupted
Toaster interrupted
 食者 off
涂果酱 interrupted
涂果酱 off
涂黄油 interrupted
涂黄油 off
toaster off

 */

package org.rui.thread.block2;

import java.io.IOException;
import java.io.PipedReader;
import java.io.PipedWriter;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Using pipes for input/output between tasks: the sending side. Writes the
 * characters 'A' through 'z' (in character-code order, then starting over at
 * 'A') to a {@link PipedWriter}, pausing 0-499 ms after each character,
 * until a write fails or the thread is interrupted during a pause.
 * 
 * @author lenovo
 * 
 */
class Sender implements Runnable {
	private Random rand = new Random(47);
	private PipedWriter out = new PipedWriter();

	/** @return the writer end of the pipe, for a reader to connect to */
	public PipedWriter getPipedWriter() {
		return this.out;
	}

	@Override
	public void run() {
		try {
			char next = 'A';
			for (;;) {
				out.write(next);
				int pauseMillis = rand.nextInt(500);
				TimeUnit.MILLISECONDS.sleep(pauseMillis);
				// Wrap around after 'z'.
				next = (next == 'z') ? 'A' : (char) (next + 1);
			}
		} catch (IOException e) {
			System.out.println(e + " sender write Exception");
		} catch (InterruptedException e) {
			System.out.println(e + " sender sleep interrupted");
		}

	}

}

/**
 * Receiving side of the pipe: connects a {@link PipedReader} to a
 * {@link Sender}'s writer and prints every character read from it.
 */
class Receiver implements Runnable {

	private final PipedReader in;

	/**
	 * @param sender the task whose piped writer this reader connects to
	 * @throws IOException if the reader cannot be connected to the writer
	 */
	public Receiver(Sender sender) throws IOException {
		in = new PipedReader(sender.getPipedWriter());
	}

	/**
	 * Reads and prints characters until the pipe reports end of stream or a
	 * read fails with an {@link IOException}.
	 */
	@Override
	public void run() {
		try {
			int c;
			// read() blocks until a character is there. It returns -1 at end
			// of stream; stop then instead of printing (char) -1 forever.
			while ((c = in.read()) != -1) {
				System.out.println("Read:" + (char) c + ",");

			}
		} catch (IOException e) {
			System.out.println(e+"receiver read execption");
		}

	}

}

/**
 * Entry point for the piped I/O example: runs a Sender and a Receiver
 * connected to it on a thread pool for four seconds, then interrupts both.
 */
public class PipedIO {
	public static void main(String[] args) throws IOException, InterruptedException {
		Sender sender = new Sender();
		// The receiver connects its reader to the sender's writer.
		Receiver receiver = new Receiver(sender);

		ExecutorService pool = Executors.newCachedThreadPool();
		Runnable[] tasks = { sender, receiver };
		for (Runnable task : tasks) {
			pool.execute(task);
		}

		TimeUnit.SECONDS.sleep(4);
		pool.shutdownNow();

	}
}

/**output:
Read:A,
Read:B,
Read:C,
Read:D,
Read:E,
Read:F,
Read:G,
Read:H,
Read:I,
Read:J,
Read:K,
Read:L,
Read:M,
Read:N,
Read:O,
Read:P,
java.lang.InterruptedException: sleep interrupted sender sleep interrupted
Read:Q,
java.io.IOException: Write end deadreceiver read execption

 */















java 线程 生产者-消费者与队列,任务间使用管道进行输入、输出 讲解示例 --thinking java4