首页 > 代码库 > 从生产者消费者窥探线程同步(上)

从生产者消费者窥探线程同步(上)

欢迎转载,转载请注明出处。尊重他人的一丢丢努力,谢谢啦!
阅读本篇之后,如果你觉得说得还有点道理,那不妨先戳一下从生产者消费者窥探线程同步(下) ,两篇一起嚼才更好呢。

最近复习了下生产者消费者模式,虽然对它不太陌生,但要说认认真真地实现,还真从来没有过,这里将它总结一下,有不妥或者见识不到之处,欢迎留言指出。

为什么要使用

大概基于以下2点:
(1)可以实现解耦
大多数设计模式,都会创造出一个第三者来担任解耦角色。比如末班模式的模板类,工厂模式的工厂类等。而消费者观察者模式则是使用拥塞队列来给两者解耦的。解耦之后,生产者和消费者就是两个相对独立的个体,他们之间不再进行直接的交互,而是通过拥塞队列来中转完成。
(2)线程安全
既然提到了拥塞队列,肯定就少不了并发问题,就少不了线程安全。更具体一点来说,整个安全控制要做到以下6点:

1 同一时间只有一个生产者生产;
2 同一时间只有一个消费者消费;
3 生产的同时不能消费;
4 消费的同时不能生产;
5 拥塞队列为空,不能消费;
6 拥塞队列为满,不能生产;

总之,就是一个时间点,只能进行一种活动。

实现方法

从它的特点来看,要想通过不同的实现方式,必然要在线程安全这一块花点心思。代码层面的线程同步,主要有三种实现方式:阻塞,非阻塞和一些不需要同步方案的代码(本身就是安全的)。而在这三种方式中,使用最多的恐怕要数阻塞方式,也就是互斥同步,网上一些博文对这个概念似乎有偏颇之嫌,这里先明确一下两个概念:互斥和同步。

所谓的互斥,就是互斥同步(下文简称互斥),它是实现同步的一种阻塞方案,互斥是方法,同步是目的。它们两个并不是并列关系,而应该算是一种因果关系。

互斥的实现方式包括临界区Critical Section,互斥量Metux,信号量Semaphore,当然还有伟大的synchronized、以及Java 5以后提供的Lock等等。

网上大多数实现都是synchronized、Lock、BlockingQueue这三总方式,毋庸置疑,这三种方式确实用的比较多。值得指出的是,通过Semaphore和Metux的PV操作,同样可以达到目的。

实现

前面已经说过,设计模式大多数都是奔着解耦去的,能使一团糟糕的代码变得条理清晰。在上代码不妨先来看一下程序结构:

技术分享
主要四个文件构成,各自的作用如名字所示:
(1)首先得有产品吧,不然生产毛线,对应Product;
(2)有了产品,自然就有生产者和消费者,对应Producer和Consumer,实质是两个线程;
(3)有了生产者消费者,如何实现他们之间的交互,也就是怎么解决何时生产何时该消费呢,这就用到了前面说过的拥塞队列,对应StorageQueue。

惯例,到了能上代码就不说话环节。

//定义产品
public class Product {
    long id;
    String name;

    public Product(long id, String name) {
        super();
        this.id = id;
        this.name = name;
    }

    @Override
    public String toString() {
        return "产品 详情:[id= " + id + " , name= " + name + "]";
    }
}

//消费者
public class Consumer extends Thread {
    int num;

    public Consumer(int num) {
        super();
        this.num = num;
    }

    @Override
    public void run() {
        // TODO Auto-generated method stub
        super.run();
        //注意,不同的实现,这里需要更改为对应的仓库
        StorageLock.consume(num);
    }
}

//生产者
public class Producer extends Thread {
    int num;
    public Producer(int num) {
        // TODO Auto-generated constructor stub
        this.num = num;
    }

    @Override
    public void run() {
        // TODO Auto-generated method stub
        super.run();
        //注意,不同的实现,这里需要更改为对应的仓库
        StorageLock.produce(num);
    }
}

(1)首先用最简单的BlockQueue实现

核心:拥塞控制完全交给BlockQueue来实现,这个队列内部用到了可重入锁的await()和singal()方法,队列满时,再存放就阻塞;队列空时,再取就阻塞。

值得注意,BlockQueue有两套(实际上是三套)存取的方法,分别是put()和take()、offer()和poll()。它们对着应不同的处理策略,说白了就是当队列满时,调用put()方法会阻塞,一直等到队列有空闲然后将元素放进去。而后者offer()不会等待,而是直接丢弃,返回false,它看起来更像是add()方法的线程安全版!!!自己动手写的时候,一定要注意。

public class StorageQueue {
    public static Integer MAX = 50;
    public static ArrayBlockingQueue<Product> list = new ArrayBlockingQueue<>(
            MAX);

    public static void main(String[] args) {
        // TODO Auto-generated method stub
        ExecutorService s = Executors.newCachedThreadPool();
        for (int i = 0; i < 20; i++) {
            s.submit(new Producer(2));
            s.submit(new Consumer(1));
        }
    }

    public static void produce(Integer num) {
        if (list.size() == MAX) {
            System.out.println(Thread.currentThread().getName()
                    + " 我是生产,我在等待... ");
        }
        try {
            for (int i = 0; i < num; i++) {
                list.put(new Product(i, ""));
            }
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + " 库存: "
                + list.size());
    }

    public static void consume(Integer num) {
        if (list.size() == 0) {
            System.out.println(Thread.currentThread().getName()
                    + " 我是消费,我在等待... ");
        }
        try {
            for (int i = 0; i < num; i++) {
                list.take();
            }
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + " 库存: "
                + list.size());
    }
}

这是部分输出,考虑到篇幅,就不全部贴上了,后文附有源码下载链接,感兴趣的可自行下载运行:

pool-1-thread-1 库存: 2
pool-1-thread-2 库存: 1
pool-1-thread-4 库存: 0
pool-1-thread-5 库存: 3
pool-1-thread-1 库存: 1
pool-1-thread-2 库存: 2
pool-1-thread-6 库存: 2
...

(2)其次是经典的synchronized实现

核心:没什么好说的,主要是使用对象的notify()和wait()来实现线程间通信。可以用同步方法或者同步代码块,这里采用的是同步代码块。

public class StorageSync {
    public static Integer MAX = 50;
    public static List<Product> list = new ArrayList<>();

    public static void main(String[] args) {
        // TODO Auto-generated method stub
        ExecutorService s = Executors.newCachedThreadPool();
        for (int i = 0; i < 100; i++) {
            s.submit(new Producer(50));
            s.submit(new Consumer(5));
        }
    }

    public static void produce(Integer num) {
        synchronized (list) {
            AtomicInteger m = new AtomicInteger(0);
            while (list.size() + num > MAX) {
                // if (list.size() + num > MAX) {
                try {
                    m.addAndGet(1);
                    System.out.println(Thread.currentThread().getName() + " 阻塞"
                            + " m: " + m);
                    System.out.println("要生产的数量:" + num + "\t库存量:" + list.size()
                            + "\t暂时不能执行生产任务!");
                    list.wait();
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
            // else {
            for (int i = 0; i < num; i++) {
                list.add(new Product(i, ""));
            }
            System.out.println(Thread.currentThread().getName() + " m: " + m);
            System.out.println("已经生产数:" + num + "\t现仓储量为:" + list.size());
            list.notify();
            // }
        }
    }

    public static void consume(Integer num) {
        synchronized (list) {
            // if (num > list.size()) {
            while (num > list.size()) {
                try {
                    System.out
                            .println(Thread.currentThread().getName() + " 阻塞");
                    System.out.println("要消费的数量:" + num + "\t库存量:" + list.size()
                            + "\t暂时不能执行消费任务!");
                    list.wait();
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
            // } else {
            for (int i = 0; i < num; i++) {
                list.remove(0);
            }
            System.out.println(Thread.currentThread().getName());
            System.out.println("已经消费数:" + num + "\t现仓储量为:" + list.size());
            list.notifyAll();
            // }
        }
    }
}

部分运行结果:

pool-1-thread-1 m: 0
已经生产数:50    现仓储量为:50
pool-1-thread-1 阻塞 m: 1
要生产的数量:50   库存量:50  暂时不能执行生产任务!
pool-1-thread-2
已经消费数:5 现仓储量为:45
pool-1-thread-1 阻塞 m: 2
要生产的数量:50   库存量:45  暂时不能执行生产任务!
pool-1-thread-2
已经消费数:5 现仓储量为:40
pool-1-thread-1 阻塞 m: 3
要生产的数量:50   库存量:40  暂时不能执行生产任务!
pool-1-thread-2 阻塞 m: 1
要生产的数量:50   库存量:40  暂时不能执行生产任务!
pool-1-thread-65
已经消费数:5 现仓储量为:35
...

注意代码中的AtomicInteger m是我用来跟踪线程状态的变量,表次该线程阻塞的次数,完全可以删去。

不知道你有没有这样的疑问:代码中使用了这样的循环语句while (list.size() + num > MAX),为什么不用if(list.size() + num > MAX)来判断呢?这里怎么看都应该是个顺序控制,而不应该是个循环呀?再说了,在执行到list.wait();之后,线程不是阻塞了吗?后面的for循环语句自然就不会执行,为什么还要用while()来循环判断,岂不多余?

乍一听,上面的分析确实”蛮有道理”,而且我相信,大多数人第一次写的时候,很容易就想到if上来了。我们不妨先顺着这思路写一下,看看有什么后果。将代码中的if语句屏蔽去掉。

public static void produce(Integer num) {
        synchronized (list) {
            AtomicInteger m = new AtomicInteger(0);
            // while (list.size() + num > MAX) {
            if (list.size() + num > MAX) {
                try {
                    m.addAndGet(1);
                    System.out.println(Thread.currentThread().getName() + " 阻塞"
                            + " m: " + m);
                    System.out.println("要生产的数量:" + num + "\t库存量:" + list.size()
                            + "\t暂时不能执行生产任务!");
                    list.wait();
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
            // else {
            for (int i = 0; i < num; i++) {
                list.add(new Product(i, ""));
            }
            System.out.println(Thread.currentThread().getName() + " m: " + m);
            System.out.println("已经生产数:" + num + "\t现仓储量为:" + list.size());
            list.notify();
            // }
        }
    }

    public static void consume(Integer num) {
        synchronized (list) {
            if (num > list.size()) {
                // while (num > list.size()) {
                try {
                    System.out
                            .println(Thread.currentThread().getName() + " 阻塞");
                    System.out.println("要消费的数量:" + num + "\t库存量:" + list.size()
                            + "\t暂时不能执行消费任务!");
                    list.wait();
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
            // } else {
            for (int i = 0; i < num; i++) {
                list.remove(0);
            }
            System.out.println(Thread.currentThread().getName());
            System.out.println("已经消费数:" + num + "\t现仓储量为:" + list.size());
            list.notifyAll();
            // }
        }
    }

部分运行结果:

pool-1-thread-1 m: 0
已经生产数:50    现仓储量为:50
pool-1-thread-2
已经消费数:5 现仓储量为:45
pool-1-thread-2 阻塞 m: 1
要生产的数量:50   库存量:45  暂时不能执行生产任务!
pool-1-thread-1 阻塞 m: 1
要生产的数量:50   库存量:45  暂时不能执行生产任务!
pool-1-thread-3 阻塞 m: 1
要生产的数量:50   库存量:45  暂时不能执行生产任务!**
pool-1-thread-4
已经消费数:5 现仓储量为:40
pool-1-thread-3 m: 1
已经生产数:50    现仓储量为:90**
pool-1-thread-1 m: 1
已经生产数:50    现仓储量为:140
pool-1-thread-2 m: 1
已经生产数:50    现仓储量为:190
...

what?丑旦,你的仓库容量都飙到190啦?这还了得!汗…

冷静下来,先分析一下原因。不妨从有代表性的pool-1-thread-3入手,根据输出的结果来看,第一下pool-1-thread-3阻塞的时候,它的输出是正常的,即打印了要生产的数量:50 库存量:45 暂时不能执行生产任务!,然而第二次阻塞的时候,明知道仓库容不下,Mr pool-1-thread-3 先生还是不听话地生产了50个,一下子就爆仓了。

为什会这样呢?让我们暂且回到代码的结构上看一下。

技术分享

根据这个图,再来重现一下上面的情况。

我们的Mr pool-1-thread-3 先生运气不太好,仓库已经满了,不能大展拳脚进行生产,只得乖乖滴执行了list.wait();,放弃手中所持有的同步锁,目前处于阻塞状态(第一次阻塞)。注意虽然Mr pool-1-thread-3 先生已经放弃了同步锁,但他此时仍然停留在方法内部,只是暂时丧失了获得锁的权利,直到一个notify来临。得,那Mr pool-1-thread-3 先生,你先凉快一会儿去吧。

接下来,该我们的Miss pool-1-thread-4 小姐登场。仓库已满,只是消费的时候。Miss pool-1-thread-4 小姐毫不客气地消费了5个资源之后,爽快地抛了一个notifyAll()。

处于阻塞状态的每一双眼睛立刻闪现出欲望的光芒。原来救世主是你,我们的Mr pool-1-thread-3 先生心下大喜,立刻伸出双手。哎,你别说,再搓比的屌丝也有春天,再倒霉的线程也有狗屎运的时候。托总管JVM的福,Mr pool-1-thread-3 先生拿到了这把锁,他立刻从原地(也就是list.wait()这一行)出发,高歌猛进。

不好,一下子就进到了for循环里面,仓库就崩塌了,悲剧就发生了…
更可怕的时,Mr pool-1-thread-3 先生完全没有意识到自己闯下的大祸,在一顿愉快的生产之后,竟然也大手一挥,扮演起救世主的角色,继续抛出notify(),后面的情况可想而知。

反应快的同学可能要说了,为什么不在for循环前面加上一个else呢?
这是另外一个问题了。
不妨先把代码修改一下,看看有什么现象。

技术分享

确实,一切”正常”,并没有爆仓,所有的生产、消费好像都没什么问题。但是细心的话,你会发现我这里打印的”m”的值,要么是0要么是1,永远不会有别的值。而使用while的则不一样。

至于这种想现象,还要回到代码上来看。

同样的,我们Mr pool-1-thread-3 先生拿到了Miss pool-1-thread-4 小姐释放的同步锁,他立刻从原地(也就是list.wait()这一行)出发,高歌猛进。

不好,下面是else分支,我进不去啊,而下面也没有可让我执行的语句了,Mr pool-1-thread-3 先生暗暗叫苦,吐血三升,倒地而亡,当然在蹬腿之前,他还要把自己辛苦拿到的锁释放出来…

这样也没什么问题呃,Mr pool-1-thread-3 先生生产不了,还有下一个Mr pool-1-thread-3 先生呢?

真的是这样吗?

Miss pool-1-thread-4 小姐发出的唤醒,本意是给那些阻塞在生产线,希望继续生产的优质男们,结果优质男们拿到锁之后并不能生产,而是直接挂了。对,就是挂了。而最终执行生产的都是那些一开始并没有被阻塞起来的线程。

这样看起来,有点类似于非阻塞的同步控制但并不是(遗忘的请自行回到篇首复习,说白了,就是一个线程先进行操作,如果没有发生竞争,那就成功了;如果发生竞争,这个线程就不断地重试,直到成功)。事实上,这种做法,选择的是一种抛弃策略,就是一个线程无法生产,那就放弃它,让下一个线程来尝试生产,直到仓库存满为止。造成的后果,就是资源浪费,想生产的不能保证都能生产到,该消费的也不保证都能消费到。

反观,while()则巧妙地化解了这个问题。

依然是,我们Mr pool-1-thread-3 先生拿到了Miss pool-1-thread-4 小姐释放的同步锁,他立刻从原地(也就是list.wait()这一行)出发,高歌猛进。

不好,Mr pool-1-thread-3 先生发现自己依然处在循环之中,要想出去,必须得满足判断条件。于是,他开始计算list.size() + num > MAX是否成立,运气好的话(不成立),跳出循环,开始愉快地生产。运气不好(成立),依然被圈在while()里,只得再次执行了list.wait();,释放同步锁,等待下一个救世主的到来…

所以我们也能看到,使用while()的程序输出的m值是不确定的,而且一个线程对应的m值,会呈现出增长的态势,也说它的状态是唤醒–等待–唤醒–等待…想生产而不得,委屈ing…也从侧面吻合了我们的分析。JDK源码中类似的并发控制,也都是用的while(),所有以后就放心地使用它吧。

如果你对这个地方的写法还有疑问,一定要自己把程序跑起来,对着输出分析一下。

后两种实现,我们放在下一篇讲。

<script type="text/javascript"> $(function () { $(‘pre.prettyprint code‘).each(function () { var lines = $(this).text().split(‘\n‘).length; var $numbering = $(‘
    ‘).addClass(‘pre-numbering‘).hide(); $(this).addClass(‘has-numbering‘).parent().append($numbering); for (i = 1; i <= lines; i++) { $numbering.append($(‘
  • ‘).text(i)); }; $numbering.fadeIn(1700); }); }); </script>

    从生产者消费者窥探线程同步(上)