首页 > 代码库 > 生产者和消费问题
生产者和消费问题
本文使用java语言借助java并发库去实现生产者和消费者问题。主要设计思路:1.物料池是共享容器;2.生产者只负责生产物料,添加到物料池中;3.消费者从池中获取物料。在这里使用ReenTranLock控制共享容器的同步,使用Conditona做线程间的通知,当物料池满的时候挂起生产者,并且唤醒消费者去消费池中物料,当池中无物料的时候,挂起消费者,唤醒生产者生产物料。
在编码之前我需要先对生产者、消费者、物料池做一个简单的分析:
1.消费者和生产者他们的任务都是单一的,消费者消费物料,生产者生产物料,消费者和生产者对外只需要记住是哪个物料池就行了。
2.共享数据控制同步应该同一个类中完成,这样控制方便,而且简单。
3.发出通知的应该物料池。因为只有它自己知道自己的状态,消费者和生产者才不会关心它。冷暖自知!!
在做了简单的分析之后,清楚了各个对象的功能。接下来就是设计了。涉及到具体的地方无会在代码中注释,就不在这干巴巴的说了。
核心-物料池
package com.autonavi.pc; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; /*** * 物料池 * @author 零下三度 * */ public class Pool { private ReentrantLock lock = new ReentrantLock(); private Condition putCondition = lock.newCondition(); private Condition getCondition = lock.newCondition(); private Product[] productPool = new Product[10]; private int size;//池中物料的数量 private int currPutIndex;//当前添加物料的索引 private int currGetIndex;//当前获取物料的索引 private Pool(){ size = 0; currPutIndex = 0; currGetIndex = 0; } public static Pool getPool(){ return new Pool(); } /** * 生产者向物料池添加一个商品 * @param product * @throws InterruptedException */ public void put(Product product) throws InterruptedException{ try{ lock.lock(); //如果物料池满了,则不再允许向物料池中添加物料。 while(size == productPool.length){ System.out.println("物料池已经满了,暂时不能添加产品了,请耐心等待.....当前池中物料为:"+size); putCondition.await(); } productPool[currPutIndex] = product; if(++currPutIndex == productPool.length){ currPutIndex = 0; } ++size; //注意:由于终端是共享资源,放在此处才能看到真正的测试结果过 System.out.println(product.toString()+"已经添加到物料池中,当前池中产品个数:"+size+",currPutIndex="+currPutIndex); //添加了物料,池中有可用的物料,通知消费者可以从池中获取物料 getCondition.signal(); }finally{ lock.unlock(); } } /*** * 消费者从物料池中获取一个商品。 * @return * @throws InterruptedException */ public Product get() throws InterruptedException{ try{ lock.lock(); //如果池中没有物料,则禁止消费者从池中获取物料 while(size == 0){ System.out.println("目前没有物料,暂时无法获取产品,请耐心等待.....当前池中物料数量:"+size); getCondition.await(); } Product p = productPool[currGetIndex]; productPool[currGetIndex] = null; if(++currGetIndex == productPool.length){ currGetIndex = 0; } --size; System.out.println("出库的是:"+p.toString()+"当前池中还有产品个数:"+size+",currGetIndex="+currGetIndex); putCondition.signal(); return p; }finally{ lock.unlock(); } } }
生产者:
package com.autonavi.pc; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; /*** * 生产者 * @author 零下三度 * */ public class Producers implements Runnable{ private AtomicLong id = new AtomicLong(0); private Pool pool; private String productName; public Producers(){ } public Producers(Pool pool,String productName,AtomicLong id){ this.id = id; this.pool = pool; this.productName = productName; } public Pool getPool() { return pool; } public void setPool(Pool pool) { this.pool = pool; } public String getProductName() { return productName; } public void setProductName(String productName) { this.productName = productName; } public AtomicLong getId() { return id; } public void setId(AtomicLong id) { this.id = id; } /*** * 生产者的工作就是生产商品,添加到物料吃中 * 至于什么时候停止,什么时候开始,是需要被人去给他消息的。 */ public void run() { Product p; try { while(true){ TimeUnit.MILLISECONDS.sleep(200); p = new Product(""+id.incrementAndGet(),productName); pool.put(p); } }catch (InterruptedException e) { e.printStackTrace(); } } public void start(){ Thread t = new Thread(this); t.start(); } }
消费者:
package com.autonavi.pc; import java.util.concurrent.TimeUnit; /*** * 消费者 * @author 零下三度 * */ public class Consumers implements Runnable{ private Pool pool; public Pool getPool() { return pool; } public void setPool(Pool pool) { this.pool = pool; } /*** * 消费者的工作就是消费物料池中的商品 */ public void run() { try { Product p; while(true){ TimeUnit.MILLISECONDS.sleep(200); p = pool.get(); } } catch (InterruptedException e) { e.printStackTrace(); } } public void start(){ Thread t = new Thread(this); t.start(); } }
物料-产品:
package com.autonavi.pc; /*** * 商品 * @author 零下三度 * */ public class Product { private String id; private String name; public Product(){ } public Product(String id, String name) { this.id = id; this.name = name; } public String getId() { return id; } public void setId(String id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } @Override public String toString() { String val = "id:"+id+"\n"+"name:"+name+"\n"; return val; } @Override public boolean equals(Object obj) { if(obj != null && obj instanceof Product){ Product p = (Product)obj; if(this.id != null && this.id.equals(p.getId())){ return true; } } return false; } @Override public int hashCode() { return id.hashCode(); } }
测试用例:
package com.autonavi.pc; import java.util.concurrent.atomic.AtomicLong; public class ProductsAndConsumersTest { public static void main(String[] args) throws InterruptedException { Pool pool = Pool.getPool(); Producers p = new Producers(pool,"产品A",new AtomicLong(0)); System.out.println("启动生产者"); p.start(); Consumers c = new Consumers(); c.setPool(pool); System.out.println("启动消费者"); c.start(); } }
生产者和消费问题
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。