首页 > 代码库 > JAVA学习第二十六课(多线程(六))- 多生产者多消费者问题

JAVA学习第二十六课(多线程(六))- 多生产者多消费者问题

多生产者多消费者问题

以生产馒头 消费馒头为例。


class Resource 
{
	private String name;
	private int count = 1;
	private boolean flag = false;

	public synchronized void set(String name)
	{
		if (flag) 
		{
				try {
				this.wait();
			} catch (Exception e) {
				// TODO: handle exception
			}
		}
		this.name = name + count;
		count++;
			System.out.println(Thread.currentThread().getName()+"---生产者----"+this.name);
			flag = true;
			notify();	
	}
	public synchronized void out()
	{
		if (!flag) {
			try {
				this.wait();
			} catch (Exception e) {
				// TODO: handle exception
			}
		}
		System.out.println(Thread.currentThread().getName()+"---------消费者--------"+this.name);
		flag = false;
		notify();
	}
}


class Producer implements Runnable
{
	private Resource r;
	Producer(Resource r)
	{
		this.r = r;
	}
	public void run()
	{
		while(true)
		{
			r.set("馒头");
		}
	}
}
class Consumer implements Runnable
{
	private Resource r;
	Consumer(Resource r)
	{
		this.r = r;
	}
	public void run()
	{
		while(true)
		{
			r.out();
		}
	}
}

public class Main 
{
	public static void main(String[] args)
	{
		Resource r = new Resource();
		Producer pro = new Producer(r);
		Consumer con = new Consumer(r);
		Thread t0 = new Thread(pro);
		Thread t1 = new Thread(pro);
		Thread t2 = new Thread(con);
		Thread t3 = new Thread(con);
		t0.start();
		t1.start();
		t2.start();
		t3.start();
	}
}


上述代码容易出现一个问题,就是同一个馒头被多次消费,或者有的馒头没有被消费。单生者,单消费者不会出现这个问题

原因:notify 唤醒的线程是任意一个,比如t0 t1都是等待状态,t0活了之后,t1也可能被唤醒,所以就会产生多个馒头而没有被消费的情况,针对这个问题,flag的判断可以改为循环,这样却容易造成了死锁情况:t0 t1被wait了,t2正常消费一次,flag = false,唤醒了t0,因为循环t2 t3被wait了,t0执行一次,flag = true,假设唤醒t1,判断t0进行wait,while,t1也进入了等待,死锁状态

PS:冻结状态的线程被唤醒后本次就不参与判断,向下执行,所以简单来说,多个馒头没有被消费问题的产生,是因为冻结状态的线程不再继续参与判断造成的,而死锁是因为循环判断造成的


多生产多消费问题解决:

notify,改为notifyAll,当前本类线程中的一个正常执行后,唤醒所有线程,当前同类线程因为while,自然会继续等待,对方任意一个线程执行一次,对方剩余线程继续等待,这样就实现了生成对应消费

class Resource 
{
	private String name;
	private int count = 1;
	private boolean flag = false;

	public synchronized void set(String name)
	{
		while (flag) 
		{
				try {
				this.wait();
			} catch (Exception e) {
				// TODO: handle exception
			}
		}
		this.name = name + count;
		count++;
			System.out.println(Thread.currentThread().getName()+"---生产者----"+this.name);
			flag = true;
			notifyAll();	
	}
	public synchronized void out()
	{
		while (!flag) {
			try {
				this.wait();
			} catch (Exception e) {
				// TODO: handle exception
			}
		}
		System.out.println(Thread.currentThread().getName()+"---------消费者--------"+this.name);
		flag = false;
		notifyAll();
	}
}

总结:

if判断,只会判断一次,容易造成不该唤醒的线程被唤醒了,出现问题

while判断,虽然解决了线程获取执行权后,是否要运行

notify,只能唤醒任意一个线程,但是唤醒本方线程,没有意义,且while+notify = 死锁

notifyAll,解决了本方线程一定会唤醒对方线程

所以,多生产多消费问题 = while + notifyAll,但是也造成了效率低的问题(本方不该唤醒,也被唤醒了)


新特性(JDK1.5升级)

如何不唤醒本方,只唤醒对方呢

void Fu()//前期
{
	synchronized (obj) //关于锁的操作是隐式的,只有锁自己知道
	{
		code....
	}
}
//后期升级,将锁这一事物封装成了对象 Lock L = new ReentrantLock();,将操作锁的隐式方式定义到了对象中void Fu()
{
	L.lock();//获取锁
	code..
	L.unlock();//释放锁
}

接口Lock

Lock 实现提供了比使用 synchronized 方法和语句可获得的更广泛的锁定操作。此实现允许更灵活的结构,可以具有差别很大的属性,可以支持多个相关的 Condition 对象。

ConditionObject 监视器方法(waitnotifynotifyAll)分解成截然不同的对象,以便通过将这些对象与任意 Lock 实现组合使用,为每个对象提供多个等待 set(wait-set)。其中,Lock 替代了 synchronized 方法和语句的使用,Condition 替代了 Object 监视器方法的使用。

Lock l = ...; 
     l.lock();
     try {
         // access the resource protected by this lock
     } finally {
         l.unlock();
     }


Lock接口:替代了同步代码块或同步函数,将同步锁的隐式操作变成现实操作,更为灵活,可以一个锁加多个监视器

方法:

L.lock();//获取锁

L.unlock();//释放锁,一般与finally代码块连用

Condition接口:实现了Object中wait(),notify(),notifyAll()方法,将这些监视器方法进行了单独的封装,变成Condition监视器对象,可以任意锁进行组合

await()  相当于 wait

signal() 相当于 notify

signalAll(); 相当于 notifyAll

import java.util.concurrent.locks.*;

class Resource 
{
	private String name;
	private int count = 1;
	private boolean flag = false;
	//创建一个锁
	Lock L = new ReentrantLock();
	//通过已有的锁,获取该锁监视器对象(Condition)
	//Condition con = L.newCondition();
	//通过已有的锁,获取两组监视器,一个监视生产者,一个监视消费者
	Condition pro_con = L.newCondition();
	Condition consu_con = L.newCondition();
	
	public  void set(String name)
	{
		L.lock();
		try {
					while (flag) 
					{
						try {
							pro_con.await();//线程冻结
						}catch (Exception e) {
							// TODO: handle exception
						}
					}
					this.name = name + count;
					count++;
				System.out.println(Thread.currentThread().getName()+"---生产者5.0----"+this.name);
					flag = true;
					//con.signalAll();//唤醒所有线程
					consu_con.signal();//唤醒消费者线程,不用All
		} catch (Exception e) {
			// TODO: handle exception
		}
		finally
		{
			L.unlock();//释放锁
		}
		
	}
	public void out()
	{
		L.lock();
		try {
						while (!flag) {
					try {
						consu_con.await();
					} catch (Exception e) {
						// TODO: handle exception
					}
				}
		System.out.println(Thread.currentThread().getName()+"---------消费者5.0--------"+this.name);
				flag = false;
				//con.signalAll();
				pro_con.signal();
		} catch (Exception e) {
			// TODO: handle exception
		}
		finally
		{
			L.unlock();
		}
	
	}
}


class Producer implements Runnable
{
	private Resource r;
	Producer(Resource r)
	{
		this.r = r;
	}
	public void run()
	{
		while(true)
		{
			r.set("馒头");
		}
	}
}
class Consumer implements Runnable
{
	private Resource r;
	Consumer(Resource r)
	{
		this.r = r;
	}
	public void run()
	{
		while(true)
		{
			r.out();
		}
	}
}

public class Main 
{
	public static void main(String[] args)
	{
		Resource r = new Resource();
		Producer pro = new Producer(r);
		Consumer con = new Consumer(r);
		Thread t0 = new Thread(pro);
		Thread t1 = new Thread(pro);
		Thread t2 = new Thread(con);
		Thread t3 = new Thread(con);
		t0.start();
		t1.start();
		t2.start();
		t3.start();
	}
}

实际开发是这样的代码

class BoundedBuffer {//缓冲区
   final Lock lock = new ReentrantLock();
   final Condition notFull  = lock.newCondition(); 
   final Condition notEmpty = lock.newCondition(); 

   final Object[] items = new Object[100];//创建一个大的容器
   int putptr, takeptr, count;

   public void put(Object x) throws InterruptedException {
     lock.lock();
     try {
       while (count == items.length) 
         notFull.await();
       items[putptr] = x; 
       if (++putptr == items.length) putptr = 0;
       ++count;
       notEmpty.signal();
     } finally {
       lock.unlock();
     }
   }

   public Object take() throws InterruptedException {
     lock.lock();
     try {
       while (count == 0) 
         notEmpty.await();
       Object x = items[takeptr]; 
       if (++takeptr == items.length) takeptr = 0;
       --count;
       notFull.signal();
       return x;
     } finally {
       lock.unlock();
     }
   } 
 }


JAVA学习第二十六课(多线程(六))- 多生产者多消费者问题