首页 > 代码库 > 漫谈并发编程(五):线程之间的协作
漫谈并发编程(五):线程之间的协作
wait()与notifyAll()
因此wait()释放锁。这就意味着还有一个任务能够获得这个锁。因此在该对象中的其它synchronized方法能够在wait()期间被调用,而其它的方法通常将会产生改变,而这样的改变正是使被挂起的任务又一次唤醒所感兴趣的变化。
- 在wait()期间对象锁是释放的
- 能够通过notify()、notifyAll(),或者令时间到期。从wait()中恢复运行。
这样的wait()将无限等待下去,直到线程接收到notify()或者notifyAll()消息。
考虑设计方式:1. 这样的东西能够单独被定义出来。 2. 在Object中提供该"东西"的实现。 明显另外一种方式要轻松方便很多。迁移性更强。其次。这样的东西可能不是线程安全的,所以须要锁来支持。
使用synchronized来进行同步的保护是理所应当,由于"东西"的实现就在Object中,其次使用synchronized的优点是一定程度能够避免由于锁不一致的情况下产生的wait()及notifyAll的不正确应。wait()在一把锁中释放了锁,和notifyAll在还有一把锁进行操作毫无相关。
class Car { private boolean waxOn = false; public synchronized void waxed() { waxOn = true; notifyAll( ); } public synchronized void buffed( ) { waxOn = false; notifyAll( ); } public synchronized void waitForWaxing( ) throws InterruptedException{ while(waxOn == false) wait( ); } public synchronized void waitForBuffing( ) throws InterruptedException { while(waxOn == true) wait( ); } } class WaxOn implements Runnable { private Car car; public WaxOn(Car c) { car = c;} public void run() { try { while(!Thread.interrupted()) { System.out.print(" Wax on!"); TimeUnit.MILLISECONDS.sleep(200); car.waxed(); car.waitForBuffing(); } } catch (InterruptedException e) { System.out.println("Exiting via interrupt"); } System.out.println("Ending Wax On task"); } } class WaxOff implements Runnable { private Car car; public WaxOff(Car c) {car = c;} public void run( ) { try { while(!Thread.interrupted()) { car.waitForWaxing(); System.out.print("Wax Off"); TimeUnit.MILLISECONDS.sleep(200); car.buffed(); } } catch(InterruptedException e) { System.out.println("Exiting via interrupt"); } System.out.println("Ending Wax Off task"); } } public class WaxOMatic { public static void main(String[] args) throws Exception{ Car car = new Car(); ExecutorService exec = Executors.newCachedThreadPool(); exec.execute(new WaxOff(car)); exec.execute(new WaxOn(car)); TimeUnit.SECONDS.sleep(5); exec.shutdownNow(); } }
这非常重要,由于:
- 你可能有多个任务出于同样的原因在等待一个锁,而第一个唤醒任务可能已经改变这样的状况(即使你没有这么做,有人也会通过继承你的类去这么做)。假设属于这样的情况,那么这个任务应该被再次挂起,直至其感兴趣的条件发生变化。
- 也有可能某些任务处于不同的原因在等待你的对象上锁(在这样的情况下必须使用(notifyAll))。在这样的情况下,你须要检查是否已经由正确的原因唤醒,假设不是,就再次调用wait()。
使用notify()时。在众多等待同一个锁的任务中仅仅有一个会被唤醒,因此假设你希望使用notify()就必须保证被唤醒的是恰当的任务。
另外,为了使用notify()。全部任务必须等待同样的条件,由于假设你有多个任务在等待不同的条件。那么你就不会知道是否唤醒的恰当的任务。假设使用notify(),当条件发生变化时,必须仅仅有一个任务能从中受益。最后,这些限制对全部可能存在的子类都必须总是起作用的。
假设这些规则中有不论什么一条不满足。那么你就必须使用notifyAll()而不是notify()。
用wait()和notifyAll()实现生产者消费者问题
并且须要注意的是不能使用Lock来限制资源的訪问。由于wait时无法释放该锁。假设还要限制在notifyAll时不能notifyAll到同类。那么实现这个问题还是有难度的。
class Meal { } class WaitPerson implements Runnable { private String name; private Restaurant restaurant; public WaitPerson(String name, Restaurant res) { this.name = name; this.restaurant = res; } @Override public void run() { try { while (!Thread.interrupted()) { synchronized (restaurant.waitPersons) { while (restaurant.meals.size() < 1) { restaurant.waitPersons.wait(); } } synchronized (restaurant.chefs) { if (restaurant.meals.size() >= 1) { restaurant.meals.poll(); restaurant.chefs.notifyAll(); System.out.println(name + " consumed a meal !"); } } } } catch (InterruptedException e) { System.out.println(name + " is ended via InterruptedException !"); return; } System.out.println(name + " is ended via InterruptedException !"); } } class Chef implements Runnable { private String name; private Restaurant restaurant; public Chef(String name, Restaurant res) { this.name = name; this.restaurant = res; } @Override public void run() { try { while (!Thread.interrupted()) { synchronized (restaurant.chefs) { while (restaurant.meals.size() > 10) { restaurant.chefs.wait(); } } synchronized (restaurant.waitPersons) { if (restaurant.meals.size() <= 10) { restaurant.meals.add(new Meal()); restaurant.waitPersons.notifyAll(); System.out.println(name + " produced a meal !"); } } } } catch (InterruptedException e) { System.out.println(name + " is ended via InterruptedException !"); return; } System.out.println(name + " is ended via InterruptedException !"); } } public class Restaurant { public Queue<Meal> meals = new ConcurrentLinkedQueue<Meal>(); public List<WaitPerson> waitPersons = new ArrayList<WaitPerson>(); public List<Chef> chefs = new ArrayList<Chef>(); public static void main(String[] args) throws InterruptedException { Restaurant res = new Restaurant(); ExecutorService exec = Executors.newCachedThreadPool(); Chef chef1 = new Chef("chef1", res); Chef chef2 = new Chef("chef2", res); res.chefs.add(chef1); res.chefs.add(chef2); exec.execute(chef1); exec.execute(chef2); WaitPerson waitPerson1 = new WaitPerson("waitPerson1", res); WaitPerson waitPerson2 = new WaitPerson("waitPerson2", res); res.waitPersons.add(waitPerson1); res.waitPersons.add(waitPerson2); exec.execute(waitPerson1); exec.execute(waitPerson2); // TimeUnit.MILLISECONDS.sleep(3000); // exec.shutdownNow(); } }
只是使用这样的方式实在是太晦涩了。生产者消费者问题的机制须要我们去控制,实际上,java并发类库为我们提供了这样的模型的实现,我们待会会用堵塞队列来重写这个问题。
使用显式的Lock和Condition对象
class Car { private boolean waxOn = false; private Lock lock = new ReentrantLock(); private Condition condition = lock.newCondition(); public void waxed() { lock.lock(); try { waxOn = true; condition.signalAll(); } finally { lock.unlock(); } } public void buffed( ) { lock.lock(); try { waxOn = false; condition.signalAll(); } finally { lock.unlock(); } } public void waitForWaxing( ) throws InterruptedException{ lock.lock(); try{ while(waxOn == false) condition.await(); } finally { lock.unlock(); } } public void waitForBuffing( ) throws InterruptedException { lock.lock(); try { while(waxOn == true) condition.await( ); } finally { lock.unlock(); } } }
使用BlockingQueue来解决生产者消费者问题
在java.util.concurrent.BlockingQueue接口中提供了这个队列,这个接口有大量的实现。你通常能够使用LinkedBlockingQueue。它是一个无界队列,还能够使用ArrayBlockingQueue,它具有固定的尺寸,因此你能够在它被堵塞之前,向当中放置有限数量的元素。
class Meal { } class WaitPerson implements Runnable { private String name; private RestaurantBlookingQueue restaurant; public WaitPerson(String name, RestaurantBlookingQueue res) { this.name = name; this.restaurant = res; } @Override public void run() { try { while (!Thread.interrupted()) { restaurant.meals.take(); System.out.println(name + "taked a Meal"); Thread.sleep(100); } } catch (InterruptedException e) { System.out.println(name + " is ended via InterruptedException !"); return; } System.out.println(name + " is ended via InterruptedException !"); } } class Chef implements Runnable { private String name; private RestaurantBlookingQueue restaurant; public Chef(String name, RestaurantBlookingQueue res) { this.name = name; this.restaurant = res; } @Override public void run() { try { while (!Thread.interrupted()) { restaurant.meals.put(new Meal()); System.out.println(this.name + "made a meal"); Thread.sleep(100); } } catch (InterruptedException e) { System.out.println(name + " is ended via InterruptedException !"); return; } System.out.println(name + " is ended via InterruptedException !"); } } public class RestaurantBlookingQueue { public BlockingQueue<Meal> meals = new ArrayBlockingQueue<Meal>(10); public List<WaitPerson> waitPersons = new ArrayList<WaitPerson>(); public List<Chef> chefs = new ArrayList<Chef>(); public static void main(String[] args) throws InterruptedException { RestaurantBlookingQueue res = new RestaurantBlookingQueue(); ExecutorService exec = Executors.newCachedThreadPool(); Chef chef1 = new Chef("chef1", res); Chef chef2 = new Chef("chef2", res); res.chefs.add(chef1); res.chefs.add(chef2); exec.execute(chef1); exec.execute(chef2); WaitPerson waitPerson1 = new WaitPerson("waitPerson1", res); WaitPerson waitPerson2 = new WaitPerson("waitPerson2", res); res.waitPersons.add(waitPerson1); res.waitPersons.add(waitPerson2); exec.execute(waitPerson1); exec.execute(waitPerson2); // TimeUnit.MILLISECONDS.sleep(3000); // exec.shutdownNow(); } }
任务间使用管道进行输入/输出
提供线程功能的类库以"管道"的形式对线程的输入/输出提供了支持。
它们在Java输入/输出类库中的相应物就是PipedWriter类(同意任务向管道写)和PipedReader类(同意不同任务从同一个管道读)。这个模型能够看成是"生产者-消费者"问题的变体。
管道基本是一个堵塞队列,存在于多个引入BlookingQueue之前的Java版本号。
class Sender implements Runnable { private Random rand = new Random(47); private PipedWriter out = new PipedWriter(); public PipedWriter getPipedWriter( ) {return out;} public void run( ) { try { while(true) { for(char c = ‘A‘ ; c <= ‘z‘; c++) { out.write(c); TimeUnit.MILLISECONDS.sleep( rand.nextInt(500)); } } } catch (IOException e) { System.out.println(e + " Sender write exception"); } catch (InterruptedException e) { System.out.println(e + " Sender sleep exception"); } } } class Receiver implements Runnable { private PipedReader in; public Receiver(Sender sender) throws IOException { in = new PipedReader(sender.getPipedWriter()); } public void run( ) { try { while(true) { System.out.print("Read: "+(char)in.read() + ", "); } } catch (IOException e) { System.out.println(e + " Receiver read exception"); } } } public class PipedIO { public static void main(String []args) throws Exception { Sender sender = new Sender( ); Receiver receiver = new Receiver( sender ); ExecutorService exec = Executors.newCachedThreadPool(); exec.execute(sender); exec.execute(receiver); TimeUnit.SECONDS.sleep( 4 ); exec.shutdownNow(); } }
死锁
除了逻辑上预防并发。我们还须要处理意外情况,比如获取到资源的线程中途挂掉。我们须要释放资源。在程序中即释放锁。在程序中能够通过try-catch实现。
漫谈并发编程(五):线程之间的协作