首页 > 代码库 > 多线程---线程间的通信

多线程---线程间的通信

1 . wait() 方法使当前执行代码的线程进行等待,将当前线程置入"预执行队列",并且在wait()方法所在处停止执行,直到接到通知或者中断。 在调用wait之前,线程必须获得该对象的对象级别锁,即只能在同步方法或者同步代码块中才能调用wait方法。 调用wait()方法后当前线程自动释放锁。在从wait()返回之前,线程与其他线程竞争重新获得锁。

2 . notify() 用来通知那些可能等待该对象的对象锁的其他线程,如果有多个线程等待,则由线程规划器随机选出其中一个呈wait状态的线程对其发出notify, 并使它获取该对象的对象锁,值得注意的是在执行notify方法后当前线程不会马上释放该对象锁,呈wait状态的线程也不能马上获得该对象锁,要等到执行notify方法的线程将线程执行完,也就是退出synchronized代码块后,当前线程才会释放锁,而呈wait状态的线程才可以获取该对象锁。当第一个获得了该对象锁的线程运行完毕之后它会释放掉该对象锁,此时如果对象没有再次使用notify方法,即便该对象已经空闲,其他wait状态等待的线程由于没有得到该对象的通知,还会阻塞在wait状态,直到这个对象发出一个notify或者notifyAll. notify方法也要在同步方法或者同步代码块中才能调用,线程也必须获得该对象的对象级别锁

3 . 执行完同步代码块就会释放对象的锁;在执行同步代码块的过程中,遇到异常而导致线程终止锁也会被释放;在执行同步代码块的过程中,执行了锁所属对象的wait()方法,这个线程将会释放对象锁,而此线程对象将会进入线程等待池中等待被唤醒。

4.方法join()的作用是等待线程对象被销毁。即使所属线程对象x正常执行run()方法中的任务而使当前线程z无限期的阻塞,等待x销毁后再继续执行z后面的代码。

public class Test extends Thread{

    @Override
    public void run() {
        int number = (int)(Math.random()*1000);
        System.out.println(Thread.currentThread().getName()+" thread will sleep " + number + "ms");
        try {
            Thread.sleep(number);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }


    public static void main(String[] args) throws InterruptedException {
        Test test = new Test();
        test.setName("test");
        test.start();
        test.join();
        System.out.println("main thread execute after test thread end");
    }
}

运行结果为:
test    thread will sleep 595ms
main thread execute after test thread end

如果不调用join方法输出结果为:
main thread execute after test thread end
test thread will sleep 94ms

 

5 .方法join(long millis)和方法sleep(long millis)都有使线程等待一段时间的作用,但是join方法内部是使用wait方法实现的,所以join方法具有释放锁的特点,而sleep方法不会释放锁

6 . 关键字synchronized和wait(),notify()/notifyAll()方法一起使用可以实现等待/通知机制,类ReentrantLock也可以实现同样的功能,但需要借助Condition对象来完成,Condition对象具有更好的灵活性,使用它可以实现多路通知功能,也就是一个Lock对象里面可以创建多个Condition(即对象监视器)对象实例,线程对象可以注册在指定的Condition中,从而有选择的进行通知,在线程调度上更灵活。而在使用notify()/notifyAll()方法进行通知的时候,被通知的线程却是由JVM随机选择的。

  • 使用synchronized和wait,notify实现等待通知机制
public class WaitNotify1 {

    public void waitMethod() {
        System.out.println("  wait time begin at " + System.currentTimeMillis() / 1000);
        synchronized (this) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("  wait time   end at " + System.currentTimeMillis() / 1000);
    }

    public void notifyMethod() {
        System.out.println("notify time begin at " + System.currentTimeMillis() / 1000);
        synchronized (this) {
            this.notify();
        }

        System.out.println("notify time   end at " + System.currentTimeMillis() / 1000);
    }

    public static void main(String[] args) throws InterruptedException {
        WaitNotify1 wn1 = new WaitNotify1();
        // 使用匿名内部类的方式启动两个线程分别调用wn1对象的waitMethod和notifyMethod
        new Thread(new Runnable() {
            @Override
            public void run() {
                wn1.waitMethod();

            }
        }).start();

        Thread.sleep(1000);

        new Thread(new Runnable() {
            @Override
            public void run() {
                wn1.notifyMethod();

            }
        }).start();

        /**
         * 打印结果为: 
         *   wait time begin at 1493194588 
         * notify time begin at 1493194589
         * notify time   end at 1493194589 
         *   wait time   end at 1493194589
         * 
         */

    }

}

 

  • 使用ReentrantLock和Condition对象实现等待-通知机制
public class WaitNotify2 {

    private Lock lock = new ReentrantLock();
    private Condition con = lock.newCondition();

    public void waitMethod() {

        try {
            lock.lock();// 获得对象锁
            System.out.println("  wait time begin at " + System.currentTimeMillis() / 1000);
            con.await();// 线程进入等待
            System.out.println("  wait time   end at " + System.currentTimeMillis() / 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }


    public void notityMethod(){
        try {
            lock.lock();
            System.out.println("notify time begin at " + System.currentTimeMillis() / 1000);
            con.signal();// 唤醒线程
            System.out.println("notify time   end at " + System.currentTimeMillis() / 1000);
        } finally{
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        WaitNotify2 wn2 = new WaitNotify2();
        Thread4 thread = new Thread4(wn2);
        thread.start();
        Thread.sleep(1000);
        wn2.notityMethod();
    }


    /**
     * 打印结果为: 
     *   wait time begin at 1493194430 
     * notify time begin at 1493194431
     * notify time   end at 1493194431 
     *   wait time   end at 1493194431
     * 
     */

}

class Thread4 extends Thread {
    private WaitNotify2 wn2;

    public Thread4(WaitNotify2 wn2) {
        this.wn2 = wn2;
    }

    @Override
    public void run() {
        wn2.waitMethod();
    }

}

 

7 . 生产者-消费者模式:
(1) 使用synchronized与wait,notify实现生产者-消费者模式

public class ProducersAndConsumers1 {

    private List<String> list = new ArrayList<>();

    public synchronized void produce() {
        try {
            while (list.size() == 1) {
                System.out.println(Thread.currentThread().getName() + " begin waiting ");
                this.wait();
                System.out.println(Thread.currentThread().getName() + " end   waiting ");
            }
            list.add("nulo");
            this.notifyAll();
            System.out.println("produced by " + Thread.currentThread().getName() + ",now size = " + list.size());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public synchronized String consume() {
        String value = "";
        try {
            while (list.size() == 0) {
                System.out.println(Thread.currentThread().getName() + " begin waiting ");
                this.wait();
                System.out.println(Thread.currentThread().getName() + " end   waiting ");
            }
            value = list.get(0);
            list.remove(0);
            this.notifyAll();
            System.out.println(
                    "consumed by " + Thread.currentThread().getName() + ",now size = " + list.size());
        } catch (Exception e) {
            e.printStackTrace();
        }
        return value;
    }

    public static void main(String[] args) throws InterruptedException {
        ProducersAndConsumers1 pool = new ProducersAndConsumers1();

        // 一生产一消费
        new ProducerThread("producer",pool).start();
        new ConsumerThread("consumer",pool).start();

        /**
         * 打印结果如下: 
         * consumer begin waiting 
         * producer end waiting 
         * produced by producer,now size = 1 
         * producer begin waiting 
         * consumer end waiting
         * consumed by consumer,now size = 0 
         * consumer begin waiting 
         * producer end waiting
         * produced by producer,now size = 1
         * producer begin waiting
         * consumer end   waiting 
         * consumed by consumer,now size = 0 
         * consumer begin waiting 
         * 
         */





        // 一生产多消费
        /*new ProducerThread("producer",pool).start();
        new ConsumerThread("consumer1",pool).start();
        new ConsumerThread("consumer2",pool).start();*/

        // 多生产一消费
        /*new ProducerThread("producer1",pool).start();
        new ProducerThread("producer2",pool).start();
        new ConsumerThread("consumer",pool).start();*/

        // 多生产多消费
        /*new ProducerThread("producer1",pool).start();
        new ProducerThread("producer2",pool).start();
        new ConsumerThread("consumer1",pool).start();
        new ConsumerThread("consumer2",pool).start();*/


    }
}

/**
 * 
 * 生产者线程
 * 
 * @Description
 * @author niepei
 * @date 2017年4月26日 下午4:44:17
 * @version V1.3.1
 */
class ProducerThread extends Thread {
    private ProducersAndConsumers1 pool;

    public ProducerThread(String name,ProducersAndConsumers1 pool) {
        super(name);
        this.pool = pool;
    }

    @Override
    public void run() {
        while (true) {
            pool.produce();
        }

    }
}

/**
 * 
 * 消费者线程
 * 
 * @Description
 * @author niepei
 * @date 2017年4月26日 下午4:44:29
 * @version V1.3.1
 */
class ConsumerThread extends Thread {
    private ProducersAndConsumers1 pool;

    public ConsumerThread(String name,ProducersAndConsumers1 pool) {
        super(name);
        this.pool = pool;
    }

    @Override
    public void run() {
        while (true) {
            pool.consume();
        }

    }
}

 

(2) 使用ReentrantLock和Condition对象实现生产者-消费者模式

public class ProducersAndConsumers2 {

    private List<String> list = new ArrayList<>();

    private Lock lock = new ReentrantLock();

    private Condition con = lock.newCondition();

    public void produce(){
        try {
            lock.lock();
            while(list.size()==1){
                System.out.println(Thread.currentThread().getName() + " begin waiting ");
                con.await();
                System.out.println(Thread.currentThread().getName() + "   end waiting ");
            }
            list.add("nulo");
            con.signalAll();
            System.out.println("produced by " + Thread.currentThread().getName() + ",now size = " + list.size());
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }

    public String consume(){
        String value = "";
        try {
            lock.lock();
            while(list.size()==0){
                System.out.println(Thread.currentThread().getName() + " begin waiting ");
                con.await();
                System.out.println(Thread.currentThread().getName() + "   end waiting ");
            }
            list.remove(0);
            con.signalAll();
            System.out.println("consumed by " + Thread.currentThread().getName() + ",now size = " + list.size());
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
        return value;
    }

    public static void main(String[] args) throws InterruptedException {
        ProducersAndConsumers1 pool = new ProducersAndConsumers1();

        // 一生产一消费
        new ProducerThread("producer",pool).start();
        new ConsumerThread("consumer",pool).start();

        /**
         * 打印结果如下: 
         * consumer begin waiting 
         * producer end waiting 
         * produced by producer,now size = 1 
         * producer begin waiting 
         * consumer end waiting
         * consumed by consumer,now size = 0 
         * consumer begin waiting 
         * producer end waiting
         * produced by producer,now size = 1
         * producer begin waiting
         * consumer end   waiting 
         * consumed by consumer,now size = 0 
         * consumer begin waiting 
         */


    }

}


/**
 * 
 * 生产者线程
 * 
 * @Description
 * @author niepei
 * @date 2017年4月26日 下午4:44:17
 * @version V1.3.1
 */
class ProducerThread extends Thread {
    private ProducersAndConsumers1 pool;

    public ProducerThread(String name,ProducersAndConsumers1 pool) {
        super(name);
        this.pool = pool;
    }

    @Override
    public void run() {
        while (true) {
            pool.produce();
        }

    }
}

/**
 * 
 * 消费者线程
 * 
 * @Description
 * @author niepei
 * @date 2017年4月26日 下午4:44:29
 * @version V1.3.1
 */
class ConsumerThread extends Thread {
    private ProducersAndConsumers1 pool;

    public ConsumerThread(String name,ProducersAndConsumers1 pool) {
        super(name);
        this.pool = pool;
    }

    @Override
    public void run() {
        while (true) {
            pool.consume();
        }

    }
}

 

8 . 类ThreadLocal可以使每个线程绑定自己的值,它解决的是变量在各个线程之间的隔离问题,也就是不同线程拥有自己的值,不同线程中的值是可以放入ThreadLocal中保存的

public class UseThreadLocal {

    public static ThreadLocal<String> THREAD_LOCAL = new ThreadLocal<>();

    private ThreadLocal<String> threadLocal = new ThreadLocal<>();

    public static void main(String[] args) throws InterruptedException {
        function1();
        /**
         * 打印结果如下: 
         * THREAD_LOCAL in <main> is null 
         * THREAD_LOCAL in <test> is null 
         * THREAD_LOCAL in <test> is test 
         * THREAD_LOCAL in <main> is main          
         */


        new UseThreadLocal().function2();
        /**
         * 打印结果如下: 
         * threadLocal in <main> is null
         * threadLocal in <test> is null
         * threadLocal in <test> is test
         * threadLocal in <main> is main
         */

    }

    public static void function1() throws InterruptedException {
        System.out.println("THREAD_LOCAL in <" + Thread.currentThread().getName() + "> is " + THREAD_LOCAL.get());
        THREAD_LOCAL.set("main");
        Thread a = new Thread("test") {
            @Override
            public void run() {
                System.out.println(
                        "THREAD_LOCAL in <" + Thread.currentThread().getName() + "> is " + THREAD_LOCAL.get());
                THREAD_LOCAL.set("test");
                System.out.println(
                        "THREAD_LOCAL in <" + Thread.currentThread().getName() + "> is " + THREAD_LOCAL.get());
            }
        };
        a.start();
        a.join();
        System.out.println("THREAD_LOCAL in <" + Thread.currentThread().getName() + "> is " + THREAD_LOCAL.get());
    }

    public void function2() throws InterruptedException {

        System.out.println("threadLocal in <" + Thread.currentThread().getName() + "> is " + threadLocal.get());
        threadLocal.set("main");
        Thread a = new Thread("test") {
            @Override
            public void run() {
                System.out.println("threadLocal in <" + Thread.currentThread().getName() + "> is " + threadLocal.get());
                threadLocal.set("test");
                System.out.println("threadLocal in <" + Thread.currentThread().getName() + "> is " + threadLocal.get());
            }
        };
        a.start();
        a.join();
        System.out.println("threadLocal in <" + Thread.currentThread().getName() + "> is " + threadLocal.get());
    }

}

 

多线程---线程间的通信