首页 > 代码库 > 17.并发类容器设计

17.并发类容器设计

并发类容器设计

    1.ConcurrentHashMap:代替散列普通的hashTable,添加了复合操作支持。

  1. private ConcurrentHashMap<String, Object> resultMap = new ConcurrentHashMap<String, Object>();
  2. for (Map.Entry<String, Object> m : resultMap.entrySet()) {
  3. count += (Long) m.getValue();
  4. }


     2.CopyOnWriteArrayList:代替了voctor,并发的CopyOnWriteArraySet,以及并发的queue。


    3.ConcurrentLikedQueue:(高性能队列)、无阻塞形式、基于链表的无界队列,该队列不允许为空、先进先出原则。

  1. package demo6;
  2. import java.util.concurrent.ConcurrentLinkedQueue;
  3. /**高性能队列、无阻塞
  4. */
  5. public class DemoConcurrentLinkedQueue {
  6. /**
  7. * 说明:list与Queue的区别
  8. * 1.队列先进先出
  9. * 2.list集合
  10. */
  11. /*
  12. 使用take()函数,如果队列中没有数据,则线程wait释放CPU,而poll()则不会等待,直接返回null;同样,空间耗尽时offer()函数不会等待,直接返回false,
  13. 而put()则会wait,因此如果你使用while(true)来获得队列元素,千万别用poll(),CPU会100%的.
  14. */
  15. public static void main(String[] args){
  16. ConcurrentLinkedQueue<String> mq = new ConcurrentLinkedQueue<String>();
  17. mq.add("小明");
  18. mq.add("王老吉");
  19. mq.add("大宝");
  20. mq.add("哈士奇");
  21. System.err.println(mq.poll());//1.取出该元素并删除
  22. System.err.println(mq.size());
  23. System.err.println(mq.peek());//1.取出不删除
  24. System.err.println(mq.size());
  25. }
  26. }
  27. 输出:
  28. 小明3王老吉3

    4.LinkedBlockingQueue:(阻塞形式队列)、连链表构成、采用分离锁、读写分离、无界队列、可并行生产、消费。

    

  1. BlockingQueue<String> mq2 = new LinkedBlockingQueue<String>(5);//有界、无界
  2. mq2.add("1+");
  3. mq2.add("2+");
  4. mq2.add("3+");
  5. mq2.add("4+");
  6. mq2.add("5+");
  7. for (Iterator iterator = mq2.iterator();iterator.hasNext();){
  8. String str = (String) iterator.next();
  9. System.err.println(str);
  10. }
  11. List<String> list = new ArrayList<>();
  12. System.err.println("批量获取:"+mq2.drainTo(list,3));
  13. for (String s:list){


    5.ArrayBlockingQueue:基于数组的实现阻塞形式队列、自定义长度、生产、消费不能并行执行,可先进先出、先进后出。

    

  1. BlockingQueue<String> mq1 = new ArrayBlockingQueue<String>(3);
  2. mq1.add("a");
  3. mq1.put("b");
  4. mq1.add("e");


    6.PriorityBlockingQueue:基于优先级的阻塞形式,使用公平分离锁实现,优先级有comparable对象决定,实现此接口,没取一个队列排列一次队列

  1. package demo6;
  2. public class Task implements Comparable<Task> {
  3. private int id;
  4. private String name;
  5. public int getId() {
  6. return id;
  7. }
  8. public void setId(int id) {
  9. this.id = id;
  10. }
  11. public String getName() {
  12. return name;
  13. }
  14. public void setName(String name) {
  15. this.name = name;
  16. }
  17. @Override
  18. public int compareTo(Task o) {
  19. return this.id > o.id ? 1 : (this.id < o.id ? -1 : 0);
  20. }
  21. @Override
  22. public String toString() {
  23. return "Task{" +
  24. "id=" + id +
  25. ", name=‘" + name + ‘\‘‘ +
  26. ‘}‘;
  27. }
  28. }
  1. package demo6;
  2. import java.util.concurrent.PriorityBlockingQueue;
  3. public class DemoPriorityBlockingQueue {
  4. public static void main(String[] args) throws InterruptedException {
  5. PriorityBlockingQueue<Task> priorityBlockingQueue = new PriorityBlockingQueue<Task>();
  6. /**
  7. * 每取一个队列,就排一次序
  8. */
  9. Task t1 = new Task();
  10. t1.setId(3);
  11. t1.setName(" 任务1");
  12. Task t2 = new Task();
  13. t2.setId(6);
  14. t2.setName(" 任务2");
  15. Task t3 = new Task();
  16. t3.setId(1);
  17. t3.setName(" 任务3");
  18. priorityBlockingQueue.add(t1);
  19. priorityBlockingQueue.add(t2);
  20. priorityBlockingQueue.add(t3);
  21. /* for (Task q:priorityBlockingQueue){
  22. System.err.println(q.getId()+"\t"+q.getName());
  23. }*/
  24. System.err.println("容器:"+priorityBlockingQueue);
  25. System.err.println(priorityBlockingQueue.poll().getId());
  26. System.err.println("容器:"+priorityBlockingQueue);
  27. System.err.println(priorityBlockingQueue.poll().getId());
  28. System.err.println("容器:"+priorityBlockingQueue);
  29. System.err.println(priorityBlockingQueue.poll().getId());
  30. }
  31. }
  32. 输出
  33. 容器:[Task{id=1, name=‘ 任务3‘}, Task{id=6, name=‘ 任务2‘}, Task{id=3, name=‘ 任务1‘}]1容器:[Task{id=3, name=‘ 任务1‘}, Task{id=6, name=‘ 任务2‘}]3容器:[Task{id=6, name=‘ 任务2‘}]6

    7.synchronizeQueue:无缓冲的队列,生产消费的数据直接被消费者offer,不可直接使用add方法,必须先take方法在前,调用处于阻塞状态,等待add()方法的通知。

  1. package demo6;
  2. import java.util.concurrent.SynchronousQueue;
  3. public class DemoSynchronousQueue {
  4. public static void main(String[] args) {
  5. final SynchronousQueue<String> synchronousQueue = new SynchronousQueue<String>();
  6. /**
  7. * 不可直接使用add()方法,必须先有take方法在前调用(处于阻塞状态,等待add的通知激活),add()方法在后,
  8. */
  9. Thread t1 = new Thread(new Runnable() {
  10. @Override
  11. public void run() {
  12. try {
  13. System.err.println(synchronousQueue.take());
  14. } catch (InterruptedException e) {
  15. e.printStackTrace();
  16. }
  17. }
  18. }, "T1");
  19. t1.start();
  20. Thread t2 = new Thread(new Runnable() {
  21. @Override
  22. public void run() {
  23. synchronousQueue.add("哈哈哈哈");
  24. }
  25. }, "T2");
  26. t2.start();
  27. }
  28. }

    8.delayQueue:带有延迟时间的队列。

  1. package demo6;
  2. import java.util.concurrent.Delayed;
  3. import java.util.concurrent.TimeUnit;
  4. public class WangMin implements Delayed {
  5. private String name;
  6. private String id;
  7. /*结束时间*/
  8. private long endTime;
  9. public WangMin(String name, String id, long endTime) {
  10. this.name = name;
  11. this.id = id;
  12. this.endTime = endTime;
  13. }
  14. public String getName() {
  15. return name;
  16. }
  17. public void setName(String name) {
  18. this.name = name;
  19. }
  20. public String getId() {
  21. return id;
  22. }
  23. public void setId(String id) {
  24. this.id = id;
  25. }
  26. public long getEndTime() {
  27. return endTime;
  28. }
  29. public void setEndTime(long endTime) {
  30. this.endTime = endTime;
  31. }
  32. private TimeUnit timeUnit = TimeUnit.SECONDS;
  33. /*用来判断是否时间已经到期*/
  34. @Override
  35. public long getDelay(TimeUnit unit) {
  36. return this.endTime - System.currentTimeMillis();
  37. }
  38. /*相互排序比较时间*/
  39. @Override
  40. public int compareTo(Delayed o) {
  41. WangMin wangMin = (WangMin) o;
  42. return this.getDelay(this.timeUnit) - wangMin.getDelay(this.timeUnit) > 0 ? 1 : 0;
  43. }
  44. }
  1. package demo6;
  2. import java.util.concurrent.DelayQueue;
  3. public class WangBa implements Runnable{
  4. private DelayQueue<WangMin> queue = new DelayQueue<WangMin>();
  5. public boolean yinye = true;
  6. public void startShangWang(String name, String id, int money) {
  7. WangMin wangMin = new WangMin(name,id,1000*money+System.currentTimeMillis());
  8. System.err.println("网络机器:"+wangMin.getName()+"\t身份证:"+wangMin.getId()+"\t金额:"+money+"\t开始上网。。。");
  9. queue.add(wangMin);
  10. }
  11. public void endShangWang(WangMin wangMin) {
  12. System.err.println("网络机器:"+wangMin.getName()+"\t身份证:"+wangMin.getId()+"\t下机了....");
  13. }
  14. @Override
  15. public void run() {
  16. while (yinye){
  17. try {
  18. WangMin min = queue.take();
  19. endShangWang(min);
  20. } catch (InterruptedException e) {
  21. e.printStackTrace();
  22. }
  23. }
  24. }
  25. public static void main(String[] args){
  26. System.err.println("网吧开始...");
  27. WangBa wangBa = new WangBa();
  28. Thread sw = new Thread(wangBa);
  29. sw.start();
  30. wangBa.startShangWang("亚瑟1","xdwdasdasda12324323",5);
  31. wangBa.startShangWang("亚瑟2","xdwdasdasda12324323",20);
  32. wangBa.startShangWang("亚瑟3","xdwdasdasda12324323",15);
  33. }
  34. }
  35. 输出:
  36. 网吧开始...网络机器:亚瑟1 身份证:xdwdasdasda12324323 金额:5 开始上网。。。网络机器:亚瑟2 身份证:xdwdasdasda12324323 金额:20 开始上网。。。网络机器:亚瑟3 身份证:xdwdasdasda12324323 金额:15 开始上网。。。网络机器:亚瑟1 身份证:xdwdasdasda12324323 下机了....网络机器:亚瑟3 身份证:xdwdasdasda12324323 下机了....网络机器:亚瑟2 身份证:xdwdasdasda12324323 下机了....

常用方法

    1.add() 添加一个元素。

    2.offer(队列,时间秒,时间范围单位)

    3.poll()取出元素并删除,否返回null。

    4.peck()取出,否返回false。

    5.tace()取出,如果队列无数据直接释放cpu。


并发类:

    1. ConcurrentLikedQueue         高性能、无阻塞、无界

    2.BlockingQueue    阻塞形式队列



null


17.并发类容器设计