首页 > 代码库 > Java并发原语——线程、互斥与同步
Java并发原语——线程、互斥与同步
本文将介绍:
- Java线程基本操作(创建、等待等)
- Java线程同步原语(同步、互斥)
Java线程基本操作
Java的线程API以java.lang.Thread类提供,线程的基本操作被封装为为Thread类的方法,其中常用的方法是:
方法 | 说明 | |
void | start() | 启动线程 |
void | join() | 等待线程结束 |
创建(启动)线程
Java中,创建线程的过程分为两步:
- 创建可执行(Runnable)的线程对象;
- 调用它的start()方法;
- 继承(extends)Thread类,重载run()方法;
- 实现(implements)Runnable接口(实现run()方法);
两种创建线程的对象的代码实例如下:
继承Thread类
继承Thread类创建线程,如下:
class ExtendsThread extends Thread { @Override public void run() { for (int i = 0; i < 100; ++i) { System.out.print("*"); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } } public class TestExtendsThread { public static void main(String[] args) { // 1.创建线程对象 Thread backThread = new ExtendsThread(); // 2.启动线程 backThread.start(); for(int i=0; i < 100; ++i) { System.out.print("#"); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } }该程序打印出的*和#是交替的;这说明backThread的run()和主线程同时在执行!当然,如果一个线程的代码不是多次重复使用,可以将该线程写成“匿名内部类”的形式:
public class TestExtendsThread { public static void main(String[] args) { new Thread() { public void run() { for (int i = 0; i < 100; ++i) { System.out.print("*"); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } }.start(); for (int i = 0; i < 100; ++i) { System.out.print("#"); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } }
实现Runnable接口
Java中创建线程对象的另一种方法是:实现Runnable接口,再用具体类的实例作为Thread的参数构造线程,代码如下:
class RunnableImpl implements Runnable { @Override public void run() { for(int i=0; i < 100; ++i) { System.out.print("*"); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } } public class TestImlementsRunnable { public static void main(String[] args) { Runnable callback = new RunnableImpl(); Thread backThread = new Thread(callback); backThread.start(); // 启动线程 for(int i=0; i < 100; ++i) { System.out.print("#"); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } }类似地,RunnableImpl若是不被复用,也可写成“匿名内部类”的形式:
public class TestImlementsRunnable { public static void main(String[] args) { new Thread(new Runnable() { @Override public void run() { for(int i=0; i < 100; ++i) { System.out.print("*"); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } }).start(); for(int i=0; i < 100; ++i) { System.out.print("#"); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } }
这两种方法都实现了run()方法,而Thread的start()方法会调用传入的Runnable对象的run()方法(或是调用自己的run方法)。 run()在这里的作用就是为新线程提供一个入口,或者说run描述了新线程将来要“干什么”;相当于一些C库的回调函数。
等待线程结束
Thread的join()方法提供了“等待线程结束”的功能,Java的主线程默认会等待其他线程的结束。Thread.join()提供的是:一个线程等待另一个线程的功能;例如,在main方法(主线程)中调用 backThread.join();则主线程将会在调用处等待,直到backThread执行完毕。如下代码是典型的start和join的使用顺序:
// in main() Runnable r = new Runnable() { public void run() { // ... } }; Thread back = new Thread(r); back.start(); back.join();这段代码对应的序列图如下:
start()的作用是启动一个线程(程序执行流),使得调用处的执行流程一分为二;而join()的作用则与start相反,使得两个执行流程“合二为一”,如下图所示:
两个线程和几个方法执行时间的先后关系,执行流程先“一分为二”和“合二为一”。
互斥
Java的互斥语义由synchronized关键字提供,具体有两种:
- synchronized代码块
- synchronized方法
为什么需要互斥?
由于本文的定位为多线程编程入门,所以顺便介绍一下为什么会有互斥问题。
猜测下面的程序的输出:
public class NonAtomic { static int count = 0; public static void main(String[] args) { Thread back = new Thread() { @Override public void run() { for(int i=0; i<10000; ++i) { ++count; } } }; back.start(); for(int i=0; i<10000; ++i) { ++count; } try { back.join(); // wait for back thread finish. } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(count); } }这个程序并不能像想象中的那样输出20000,而总是小了一些。为什么会这样?因为++count;操作并不是“原子性”的,即不是一条指令就能完成的功能。在多数体系结构上,实现内存中的整数“自增”操作至少需要三步:
- 从内存中读数据到寄存器
- 在寄存器内加一
- 写回内存
在这幅图中,A、B两个线程同时对value执行“自增”,预期的value值应该是11,而实际的value值却是10。
由此可见,要保证多线程环境下“自增”操作的正确性,就必须保证以上三个操作“一次性执行”而不被其他线程干扰,这就是所谓的“原子性”。
synchronized代码块
synchronized代码块的形式如下:
synchronized(obj) { // do something. }这段代码保证了花括号内代码的“原子性”,就是说两个线程同时执行这一代码块的时候会表现出“要么都不执行,要么全部执行”的特性,即“互斥执行”。两个使用同一obj的synchronized代码块也同样具有“互斥执行”的特性。
只需将上面的NonAtomic稍作修改:
// static int count = 0; 后加一行: static Object lock = new Object(); // ++count改为: synchronized(lock) { ++count; }就能保证程序的输出为20000。
synchronized方法
synchronized代码块通常是方法内的一部分,如果整个方法体都用synchronized锁定,且对象this,如果整个方法体都需要用synchronized(this)锁定,那么也可以用synchronized关键字修饰这个方法。
就是说,这个方法:
public synchronized void someMethod() { // do something... }等价于:
public void someMethod() { synchronized(this) { // do something... } }
同步
通俗地说,“同步”就是保证两个线程事件的时序(先后)关系,这在多线程环境下非常有用。例如,两个线程A, B正在执行一系列工作Ai, Bi,现在想要使得Ap发生在Bq之前,就需要使用“同步原语”:
支持“同步”操作的调用叫做“同步原语”,在多数《操作系统》教材中,这种原语通常被定义为条件变量(condition variable)。
Java的同步原语为java.lang.Object类的几个方法:
- wait() 等待通知,该调用会阻塞当前线程。
- notify() 发出通知,如果有多个线程阻塞在该obj上,该调用会唤醒一个(阻塞)等待该obj的线程。
- notifyAll()发出通知,如果有多个线程阻塞在该obj上,该调用会唤醒所有(阻塞)等待该obj的线程。
notifyAll()通常用于通知“状态改变”,例如,一个多线程测试程序中,多个后台线程被创建后,全都等待主线程发出“开始测试”的命令,此时主线程可用notifyAll()通知各个测试线程。
例如如下代码,模拟运动员起跑过程:首先,发令员等待个运动员就绪;然后发令员一声枪响,所有运动员起跑;
public class TestStartRunning { static final int NUM_ATHLETES = 10; static int readyCount = 0; static Object ready = new Object(); static Object start = new Object(); public static void main(String[] args) { Thread[] athletes = new Thread[NUM_ATHLETES]; // 创建运动员 for (int i = 0; i < athletes.length; ++i) { final int num = i; athletes[i] = new Thread() { @Override public void run() { System.out.println(Thread.currentThread().getName() + " ready!"); synchronized (ready) { ++readyCount; ready.notify(); // 通知发令员,“I'm ready!” } // 等待发令枪响 try { synchronized (start) { start.wait(); } } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " go!"); } }; } // 运动员上场 for (int i = 0; i < athletes.length; ++i) athletes[i].start(); // 主线程充当裁判员角色 try { synchronized (ready) { // 等待所有运动员就位 while (readyCount < athletes.length) { ready.wait(); } } } catch (Exception e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " START!"); synchronized (start) { start.notifyAll(); // 打响发令枪 } } }
信号丢失
wait/notify/notifyAll提供了一种线程间事件通知的方式,但这种通知并不能被有效的“记住”;所以,就存在通知丢失(notify missing)的可能——发出通知的线程先notify,接收通知的线程后wait,此时这个事先发出的通知就会丢失。在POSIX规范上,叫做信号丢失;由于现在的多数操作系统(LINUX,Mac,Unix)都遵循POSIX;所以“信号丢失”这个词使用的更广泛。
如下是一个演示通知丢失的代码:
public class TestNotifyMissing { static Object cond = new Object(); public static void main(String[] args) { new Thread() { public void run() { try { Thread.sleep(1000); System.out.println("[back] wait for notify..."); synchronized (cond) { cond.wait(); } } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("[back] wakeup"); } }.start(); System.out.println("[main] notify"); synchronized (cond) { cond.notify(); } } }这个程序不能正常退出,后台线程因为错过了主线程发出的通知而一直在后台等待,程序也不会输出“[back] wake up”。
通俗地说,wait/notify只是一种口头交流,如果你没有听到,就会错过(而不像邮件、公告板,你收到通知的时间可以比别人发出的时间晚)。
如何避免通知丢失呢?由于notify本身不具备“记忆”,所以可以使用额外的变量作为“公告板”;在notify之前修改这个“公告板”;这样,即便其他线程调用wait的时间晚于notify的时间,也能看到写在“公共板”上的通知。
这同时也解释了另外一个语言设计上的问题:为什么Java的wait和notify端都必须要用synchronized锁定?首先,这不是语法级别的规定,不这么写也能编译通过,只是运行时会抛异常;这是JVM的一种运行时安全检查机制,这种机制是在提醒我们——应该使用额外的变量来防止产生通知丢失。例如刚才的NotifyMissing只需稍作修改就能够正常结束
public class TestNotifyMissingSolution { static boolean notified = false; // +++++ static Object cond = new Object(); public static void main(String[] args) { new Thread() { public void run() { try { Thread.sleep(1000); System.out.println("[back] wait for notify..."); synchronized (cond) { while(!notified) // +++++ cond.wait(); } } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("[back] wakeup"); } }.start(); System.out.println("[main] notify"); synchronized (cond) { notified = true; // +++++ cond.notify(); } System.out.println("[main] notified"); } }
虚假唤醒
在例子TestNotifyMissingSolution中,cond.wait()前添加if(!notified),也能够正常运行;但这种做法与文档中给出的while(...)不同,文档中同时指出了虚假唤醒(Spurious Wakeup)的概念。虚假唤醒在《Programming with POSIX Threads》中的解释是::当一个线程wait在某个条件变量上,这个条件变量上没发生broadcast(相当于notifyAll)或signal(相当于notify)调用,wait也又可能返回。虚假唤醒听起来很奇怪,但是在多核系统上,使条件唤醒完全可预测可能导致多数条件变量操作变慢。"
为了防止虚假唤醒,需要在wait返回后继续检查某个条件是否达成,所有通常wait端的条件写为while而不是if,在Java中通常是:
// 等待线程: synchronized(cond) { while(!done) { cond.wait(); } } // 唤醒线程: doSth(); synchronized(cond) { done = true; cond.notify(); }
总结
在<操作系统>的概念中,提供“互斥语义”的叫互斥器(Mutex),提供同步语义的叫条件变量(Condition Variable)。而在Java中,synchronized关键字和java.lang.Object提供了互斥量(mutex)语义,java.lang.Object的wait/notify/notifyAll则提供了条件变量语义。
另外,多线程环境下对象的回收是十分困难的,Java运行环境的垃圾回收(Garbage Collection,GC)功能减轻了程序员的负担。
参考
Java 1.6 apidocs Thread,http://tool.oschina.net/uploads/apidocs/jdk-zh/java/lang/Thread.html
《Java Concurrency in Practice》(中译本名为《Java并发实践》)
Spurious Wakeup -- Wikipedia,http://en.wikipedia.org/wiki/Spurious_wakeup
多线程编程中条件变量和虚假唤醒(spurious wakeup)的讨论,http://siwind.iteye.com/blog/1469216
Java并发原语——线程、互斥与同步