  • 阻塞式集合(Blocking Collection):这类集合包括添加和移除数据的方法。当集合已满或为空时,被调用的添加或者移除方法就不能立即被执行,那么调用这个方法的线程将被阻塞,一直到该方法可以被成功执行。
  • 非阻塞式集合(Non-Blocking Collection):这类集合也包括添加和移除数据的方法。如果方法不能立即执行,则返回null或抛出异常,但是调用这个方法的线程不会被阻塞。


  • 非阻塞式列表对应的实现类:ConcurrentLinkedDeque类;
  • 阻塞式列表对应的实现类:LinkedBlockingDeque类;
  • 用于数据生成或消费的阻塞式列表对应的实现类:LinkedTransferQueue类;
  • 按优先级排序列表元素的阻塞式列表对应的实现类:PriorityBlockingQueue类;
  • 带有延迟列表元素的阻塞式列表对应的实现类:DelayQueue类;
  • 非阻塞式可遍历映射对应的实现类:ConcurrentSkipListMap类;
  • 随机数字对应的实现类:ThreadLocalRandom类;
  • 原子变量对应的实现类:AtomicLong和AtomicIntegerArray类。

1. 使用非阻塞式线程安全列表




  • 添加大量的数据到一个列表中;
  • 从同一个列表中移除大量的数据。

1. 创建一个名为AddTask的类,实现Runnable接口。

import java.util.concurrent.ConcurrentLinkedDeque;public class AddTask implements Runnable {    private ConcurrentLinkedDeque<String> list;        public AddTask(ConcurrentLinkedDeque<String> list){        this.list = list;    }        @Override    public void run() {        String name = Thread.currentThread().getName();        for(int i=0;i<10000;i++){            list.add(name+": Element "+i);        }    }}

2. 创建名为PoolTask的类,并实现Runnable接口。

import java.util.concurrent.ConcurrentLinkedDeque;public class PollTask implements Runnable {    private ConcurrentLinkedDeque<String> list;        public PollTask(ConcurrentLinkedDeque<String> list){        this.list = list;    }        @Override    public void run() {        for(int i=0;i<5000;i++){            list.pollFirst();            list.pollLast();        }    }}

3. 实现范例的主类Main,并添加main()方法。

import java.util.concurrent.ConcurrentLinkedDeque;import java.util.concurrent.ForkJoinPool;import java.util.concurrent.TimeUnit;public class Main {    public static void main(String[] args) {            ConcurrentLinkedDeque<String> list = new ConcurrentLinkedDeque<>();        //创建线程数组threads,它包含100个线程        Thread[] threads = new Thread[100];        for(int i=0;i<threads.length;i++){            AddTask task = new AddTask(list);            threads[i] = new Thread(task);            threads[i].start();        }        System.out.printf("Main: %d AddTask threads have been launched\n", threads.length);        //使用join()方法等待线程完成        try {            for(int i=0;i<threads.length;i++){                threads[i].join();            }        } catch (InterruptedException e) {            // TODO Auto-generated catch block            e.printStackTrace();        }        System.out.printf("Main: Size of the List: %d\n", list.size());        //创建100个PollTask对象及其对应的线程        for(int i=0;i<threads.length;i++){            PollTask task = new PollTask(list);            threads[i] = new Thread(task);            threads[i].start();        }        System.out.printf("Main: %d PollTask threads have been launched\n", threads.length);        //使用join()方法等待线程完成        try {            for(int i=0;i<threads.length;i++){                threads[i].join();            }        } catch (InterruptedException e) {            // TODO Auto-generated catch block            e.printStackTrace();        }        System.out.printf("Main: Size of the List: %d\n", list.size());    }}

4. 程序运行结果如下

Main: 100 AddTask threads have been launchedMain: Size of the List: 1000000Main: 100 PollTask threads have been launchedMain: Size of the List: 0


2. 使用阻塞式线程安全列表




  • 添加数据到一个列表中;
  • 从一个列表中移除数据。

1. 创建名为Client的类,并实现Runnable接口。

import java.util.Date;import java.util.concurrent.LinkedBlockingDeque;import java.util.concurrent.TimeUnit;public class Client implements Runnable {    private LinkedBlockingDeque<String> requestList;        public Client(LinkedBlockingDeque<String> requestList){        this.requestList = requestList;    }        @Override    public void run() {        try {            for(int i=0;i<3;i++){                for(int j=0;j<5;j++){                    StringBuilder sb = new StringBuilder();                    sb.append(i);                    sb.append(":");                    sb.append(j);                    requestList.put(sb.toString());                    System.out.printf("Client: %s at %s.\n", sb.toString(), new Date());                }                TimeUnit.SECONDS.sleep(2);            }        } catch (InterruptedException e) {            // TODO Auto-generated catch block            e.printStackTrace();        }        System.out.println("Client: End\n");    }}

2. 创建范例的主类Main,并添加main()方法。

import java.util.Date;import java.util.concurrent.LinkedBlockingDeque;public class Main {    public static void main(String[] args) {        //指定固定容量        LinkedBlockingDeque<String> list = new LinkedBlockingDeque<String>(3);        Client client = new Client(list);        Thread thread = new Thread(client);        thread.start();        try {            for(int i=0;i<5;i++){                for(int j=0;j<3;j++){                    String request = list.take();                    System.out.printf("Main: Request: %s at %s. Size: %d\n", request, new Date(), list.size());                }                Thread.sleep(300);            }        } catch (InterruptedException e) {            // TODO Auto-generated catch block            e.printStackTrace();        }        System.out.println("Main: End of the program.");    }}

3. 程序运行结果如下

3. 使用按优先级排序的阻塞式线程安全列表






1. 创建名为Event的类并实现Comparable接口,指定Comparable接口的泛型参数是Event类。

public class Event implements Comparable<Event>{    private int thread; //存放创建了Event的线程    private int priority;        public Event(int thread, int priority){        this.thread = thread;        this.priority = priority;    }        @Override    public int compareTo(Event o) {        if(this.priority>o.priority)            return -1;        if(this.priority<o.priority)            return 1;        return 0;    }    public int getThread() {        return thread;    }    public int getPriority() {        return priority;    }        }

2. 创建一个名为Task的类,实现Runnable接口。

import java.util.concurrent.PriorityBlockingQueue;public class Task implements Runnable {    private int id;    private PriorityBlockingQueue<Event> queue;        public Task(int id, PriorityBlockingQueue<Event> queue){        this.id = id;        this.queue = queue;    }        @Override    public void run() {        for(int i=0;i<1000;i++){            Event event = new Event(id, i);            queue.add(event);        }            }        }

3. 创建范例的主类Main,并实现main()方法。

import java.util.concurrent.PriorityBlockingQueue;public class Main {    public static void main(String[] args) {        PriorityBlockingQueue<Event> queue = new PriorityBlockingQueue<>();        Thread taskThreads[] = new Thread[5];        for(int i=0;i<taskThreads.length;i++){            Task task = new Task(i, queue);            taskThreads[i] = new Thread(task);        }        //启动线程        for(int i=0;i<taskThreads.length;i++){            taskThreads[i].start();        }        //等待线程执行结束        try {            for(int i=0;i<taskThreads.length;i++){                taskThreads[i].join();            }        } catch (InterruptedException e) {            // TODO Auto-generated catch block            e.printStackTrace();        }        System.out.printf("Main: Queue Size: %d\n", queue.size());        for(int i=0;i<taskThreads.length*1000;i++){            Event event = queue.poll();            System.out.printf("Thread %s: Priority %d\n", event.getThread(), event.getPriority());        }        System.out.printf("Main: Queue Size: %d\n", queue.size());        System.out.println("Main: End of the program.");    }}

4. 程序运行结果如下

 4. 使用带有延迟元素的线程安全列表

  Java API提供了一种用于并发应用的有趣的数据结构,即DelayQueue类。这个类可以存放带有激活日期的元素。当调用方法从队列中返回或提取元素时,未来的元素日期将被忽略。这些元素对于这些方法是不可见的。


  • compareTo(Delayed o):Delayed接口继承了Comparable接口,因此有了这个方法。如果当前对象的延迟值小于参数对象的值,将返回一个小于0的值;如果当前对象的延迟值大于参数对象的延迟值,将返回一个大于0的值;如果两者的延迟值相等则返回0。
  • getDelay(TimeUnit unit):这个方法返回到激活日期的剩余时间,单位由单位参数指定。


1. 创建名为Event的类并实现Delayed接口。

import java.util.Date;import java.util.concurrent.Delayed;import java.util.concurrent.TimeUnit;public class Event implements Delayed{    private Date startDate;        public Event(Date startDate){        this.startDate = startDate;    }        @Override    public int compareTo(Delayed o) {        long result = this.getDelay(TimeUnit.NANOSECONDS)-o.getDelay(TimeUnit.NANOSECONDS);        if(result<0)            return -1;        if(result>0)            return 1;        return 0;    }    @Override    public long getDelay(TimeUnit unit) {        Date now = new Date();        long diff = startDate.getTime()-now.getTime();        return unit.convert(diff, TimeUnit.MILLISECONDS);    }        }

2. 创建名为Task的类,并实现Runnable接口。

import java.util.Date;import java.util.concurrent.DelayQueue;public class Task implements Runnable {    private int id;    private DelayQueue<Event> queue;        public Task(int id, DelayQueue<Event> queue){        this.id = id;        this.queue = queue;    }        @Override    public void run() {        Date now = new Date();        Date delay = new Date();        delay.setTime(now.getTime()+id*1000);        System.out.printf("Thread %s: %s\n", id, delay);        for(int i=0;i<100;i++){            Event event = new Event(delay);            queue.add(event);        }    }        }

3. 创建范例的主类Main,并添加main()方法。

import java.util.Date;import java.util.concurrent.DelayQueue;import java.util.concurrent.TimeUnit;public class Main {    public static void main(String[] args) {        DelayQueue<Event> queue = new DelayQueue<>();        Thread threads[] = new Thread[5];        for(int i=0;i<threads.length;i++){            Task task = new Task(i+1, queue);            threads[i] = new Thread(task);        }        for(int i=0;i<threads.length;i++){            threads[i].start();        }        try {            for(int i=0;i<threads.length;i++){                threads[i].join();            }        } catch (InterruptedException e) {            // TODO Auto-generated catch block            e.printStackTrace();        }        try {            while(queue.size()>0){                int counter = 0;                Event event;                do{                    event = queue.poll();                    if(event!=null)                        counter++;                }while(event!=null);                System.out.printf("At %s you have read %d events\n", new Date(),counter);                Thread.sleep(500);            }        } catch (InterruptedException e) {            // TODO Auto-generated catch block            e.printStackTrace();        }        System.out.println("Main: End.");    }}

4. 程序运行结果如下

5. 使用线程安全可遍历映射

  Java API提供了一种用于并发应用的有趣的数据结构,即ConcurrentNavigableMap接口及其实现类。实现这个接口的类以如下两个部分存放元素:

  • 一个键值(Key),它是元素的标识并且是唯一的。
  • 元素其它部分数据。


  Java API也提供了一个实现ConcurrentSkipListMap接口的类,ConcurrentSkipListMap接口实现了与ConcurrentNavigableMap接口有相同行为的一个非阻塞式列表。从内部实现机制来讲,它使用了一个Skip List来存放数据。Skip List是基于并发列表的数据结构,效率与二叉树相近。有了它,就有了一个数据结构,比如有序列表在添加、搜索或删除元素时耗费更少的访问时间。

  备注:Skip List由William Pugh在1990年引入,详见http://www.cs.umd.edu/~pugh/。



1. 创建名为Contact的类。

public class Contact {    private String name;    private String phone;        public Contact(String name, String phone){        this.name = name;        this.phone = phone;    }    public String getName() {        return name;    }    public String getPhone() {        return phone;    }        }

2. 创建名为Task的类,并实现Runnable接口。

import java.util.concurrent.ConcurrentSkipListMap;public class Task implements Runnable {    private String id;    private ConcurrentSkipListMap<String, Contact> map;        public Task(String id, ConcurrentSkipListMap<String, Contact> map){        this.id = id;        this.map = map;    }        @Override    public void run() {        for(int i=0;i<1000;i++){            Contact contact = new Contact(id, String.valueOf(i+1000));            map.put(id+contact.getPhone(), contact);        }    }        }

3. 实现范例的主类Main,并添加main()方法。

import java.util.Map;import java.util.concurrent.ConcurrentNavigableMap;import java.util.concurrent.ConcurrentSkipListMap;public class Main {    public static void main(String[] args) {        ConcurrentSkipListMap<String, Contact> map = new ConcurrentSkipListMap<>();        Thread threads[] = new Thread[26];        int counter = 0;        for(char i=‘A‘;i<=‘Z‘;i++){            Task task = new Task(String.valueOf(i), map);            threads[counter] = new Thread(task);            threads[counter].start();            counter++;        }        try {            for(int i=0;i<26;i++){                threads[i].join();            }        } catch (InterruptedException e) {            // TODO Auto-generated catch block            e.printStackTrace();        }        System.out.printf("Main: Size of the map: %d\n", map.size());        Map.Entry<String, Contact> element;        Contact contact;        //输出第一个实体        element = map.firstEntry();        contact = element.getValue();        System.out.printf("Main: First Entry: %s: %s\n", contact.getName(), contact.getPhone());        //输出最后一个实体        element = map.lastEntry();        contact = element.getValue();        System.out.printf("Main: Last Entry: %s: %s\n", contact.getName(), contact.getPhone());        //使用subMap()取得map的一个子映射,并输出到控制台。        System.out.printf("Main: Submap from A1996 to B1002: \n");        ConcurrentNavigableMap<String, Contact>  summap = map.subMap("A1996", "B1002");        do{            element = summap.pollFirstEntry();            if(element!=null){                contact = element.getValue();                System.out.printf("%s: %s\n", contact.getName(), contact.getPhone());            }        }while(element!=null);    }}

4. 程序运行结果如下

Main: Size of the map: 26000Main: First Entry: A: 1000Main: Last Entry: Z: 1999Main: Submap from A1996 to B1002: A: 1996A: 1997A: 1998A: 1999B: 1000B: 1001

6. 生成并发随机数

  Java并发API提供了一个特殊的类用以在并发程序中生成伪随机数(Pseudo-Random Number),即Java 7新引入的ThreadLocalRandom类。它是线程本地变量。每个生成随机数的线程都有一个不同的生成器,但是都在同一类中被管理,对程序员来说是透明的。相比于使用共享的Random对象为所有线程生成随机数,这种机制具有更好的性能。


1. 创建名为TaskLocalRandom的类并实现Runnable接口。

import java.util.concurrent.ThreadLocalRandom;public class TaskLocalRandom implements Runnable {        //实现类构造器,使用current()方法为当前线程初始化随机数生成器    public TaskLocalRandom(){        ThreadLocalRandom.current();    }        @Override    public void run() {        String name = Thread.currentThread().getName();        for(int i=0;i<10;i++){            System.out.printf("%s: %d\n", name, ThreadLocalRandom.current().nextInt(10));        }            }}

2. 创建本范例的主类Main,并实现main()方法。

public class Main {    public static void main(String[] args) {        Thread threads[] = new Thread[3];        for(int i=0;i<3;i++){            TaskLocalRandom task = new TaskLocalRandom();            threads[i] = new Thread(task);            threads[i].start();        }    }}

3. 程序运行结果如下

 7. 使用原子变量

  原子变量(Atomic Variable)是从Java 5开始引入的,它提供了单个变量上的原子操作。在编译程序时,Java代码中的每个变量、每个操作都将被转换成机器可以理解的指令。例如,当给一个变量赋值时,在Java代码中只使用一个指令,但是编译这个程序时,指令被转换成JVM语言中的不同指令。当多个线程共享同一个变量时,就会发生数据不一致的错误。

  为了避免这类错误,Java引入了原子变量。当一个线程在对原子变量操作时,如果其他线程也试图对同一原子变量执行操作,原子变量的实现类提供了一套机制来检查操作是否在一步内完成。一般来说,这个操作先获取变量值,然后在本地改变变量的值,然后试图用这个改变的值去替换之前的值。如果之前的值没有被其他线程改变,就可以执行这个替换操作。否则,方法将再执行这个操作。这种操作称之为CAS原子操作(Compare and Set)。



1. 创建名为Account的类来模拟银行账户。

import java.util.concurrent.atomic.AtomicLong;public class Account {    //存放账户余额    private AtomicLong balance;        public Account(){        balance = new AtomicLong();    }    public long getBalance() {        return balance.get();    }    public void setBalance(long balance) {        this.balance.set(balance);    }    //增加余额    public void addAccount(long amount){        this.balance.getAndAdd(amount);    }    //减少余额    public void substractAmount(long amount){        this.balance.getAndAdd(-amount);    }}

2. 创建一个名为Company的类并实现Runnable接口。这个类模拟公司的付款。

public class Company implements Runnable {    private Account account;        public Company(Account account){        this.account = account;    }        @Override    public void run() {        for(int i=0;i<10;i++){            account.addAccount(1000);        }    }}

3. 创建名为Bank的类并实现Runnable接口。这个类模拟从账户中取钱。

public class Bank implements Runnable {    private Account account;        public Bank(Account account){        this.account = account;    }        @Override    public void run() {        for(int i=0;i<10;i++){            account.substractAmount(1000);        }    }}

4. 创建名为Main的主类,并实现main()方法。

public class Main {    public static void main(String[] args) {        Account account = new Account();        account.setBalance(1000);        Company company = new Company(account);        Thread companyThread = new Thread(company);        Bank bank = new Bank(account);        Thread bankThread = new Thread(bank);        System.out.printf("Account : Initial Balance: %d\n", account.getBalance());        companyThread.start();        bankThread.start();        //等待所有线程执行结束        try {            companyThread.join();            bankThread.join();            System.out.printf("Account : Final Balance: %d\n", account.getBalance());        } catch (InterruptedException e) {            // TODO Auto-generated catch block            e.printStackTrace();        }    }}

5. 程序运行结果如下

Account : Initial Balance: 1000Account : Final Balance: 1000


8. 使用原子数组


  • 死锁:一个线程被阻塞,并且试图获得的锁正被其他线程使用,但其他线程永远不会释放这个锁。这种情况使得应用不会继续执行,并且永远不会结束。
  • 即使只有一个线程访问共享对象,它仍然需要执行必须的代码来获取和释放锁。

  针对这种情况,为了提供更优的性能,Java于是引入了比较和交换操作(Compare-and-Swap Operation)。这个操作使用以下三步修改变量的值。

  1. 取得变量值,即变量的旧值。
  2. 在一个临时变量中修改变量值,即变量的新值。
  3. 如果上面获得的变量旧值与当前变量值相等,就用新值替换旧值。如果已有其他线程修改了这个变量的值,上面获得的变量的旧值就可能与当前变量值不同。



  Java也引入了原子数组(Atomic Array)提供对integer或long数字数组的原子操作。本节将学习如何使用AtomicIntegerArray类的原子数组。

1. 创建名为Incrementer的类实现Runnable接口。

import java.util.concurrent.atomic.AtomicIntegerArray;public class Incrementer implements Runnable {    private AtomicIntegerArray vector;        public Incrementer(AtomicIntegerArray vector){        this.vector = vector;    }        @Override    public void run() {        for(int i=0;i<vector.length();i++){            vector.getAndIncrement(i);        }    }}

2. 创建名为Decrementer的类并实现Runnable接口。

import java.util.concurrent.atomic.AtomicIntegerArray;public class Decrementer implements Runnable {    private AtomicIntegerArray vector;        public Decrementer(AtomicIntegerArray vector){        this.vector = vector;    }        @Override    public void run() {        for(int i=0;i<vector.length();i++){            this.vector.getAndDecrement(i);        }    }}

3. 创建范例的主类Main,并实现main()方法。

import java.util.concurrent.atomic.AtomicIntegerArray;public class Main {    public static void main(String[] args) {        //创建有1000个元素的原子数组        final int THREADS = 1000;        AtomicIntegerArray vector = new AtomicIntegerArray(THREADS);        Incrementer incrementer = new Incrementer(vector);        Decrementer decrementer = new Decrementer(vector);        Thread threadIncrementer[] = new Thread[THREADS];        Thread threadDecrementer[] = new Thread[THREADS];        for(int i=0;i<THREADS;i++){            threadIncrementer[i] = new Thread(incrementer);            threadDecrementer[i] = new Thread(decrementer);            threadIncrementer[i].start();            threadDecrementer[i].start();        }        try {            for(int i=0;i<THREADS;i++){                threadIncrementer[i].join();                threadDecrementer[i].join();            }        } catch (InterruptedException e) {            // TODO Auto-generated catch block            e.printStackTrace();        }        for(int i=0;i<vector.length();i++){            if(vector.get(i)!=0)                System.out.printf("Vector[%d] : %d\n", i, vector.get(i));        }        System.out.println("Main: End of the example");    }}

4. 程序运行结果如下

Main: End of the example



  • get(int i):返回数组中由参数指定位置的值。
  • set(int i, int newValue):设置由参数指定位置的新值。
