首页 > 代码库 > Java队列集合的性能测试

Java队列集合的性能测试

同时开10个线程存入和取出100万的数据,结论如下:

DoubleBufferedQueue < ConcurrentLinkedQueue < ArrayBlockingQueue < LinkedBlockingQueue


执行结果如下:

100万 DoubleBufferedQueue入队时间:9510 出队时间:10771
100万 DoubleBufferedQueue入队时间:8169 出队时间:9789
1000万 DoubleBufferedQueue入队时间:98285 出队时间:101088
1000万 DoubleBufferedQueue入队时间:101859 出队时间:105964

100万 ConcurrentLinkedQueue入队时间:10557 出队时间:13716
100万 ConcurrentLinkedQueue入队时间:25298 出队时间:25332
1000万 ConcurrentLinkedQueue队列时间:121868 出队时间:136116
1000万 ConcurrentLinkedQueue队列时间:134306 出队时间:147893

100万 ArrayBlockingQueue入队时间:21080 出队时间:22025
100万 ArrayBlockingQueue入队时间:17689 出队时间:19654
1000万 ArrayBlockingQueue入队时间:194400 出队时间:205968
1000万 ArrayBlockingQueue入队时间:192268 出队时间:197982

100万 LinkedBlockingQueue入队时间:38236 出队时间:52555
100万 LinkedBlockingQueue入队时间:30646 出队时间:38573
1000万 LinkedBlockingQueue入队时间:375669 出队时间:391976
1000万 LinkedBlockingQueue入队时间:701363 出队时间:711217

 

doubleBufferedQueue:

package test.MoreThread.d;import java.util.ArrayList;import java.util.concurrent.Callable;import java.util.concurrent.ConcurrentLinkedQueue;import java.util.concurrent.ExecutionException;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Future;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import test.MoreThread.l.linkedBlockingQueue;import comrt.util.DoubleBufferedQueue;//DoubleBufferedQueue入队时间:9510  出队时间:10771//DoubleBufferedQueue入队时间:8169  出队时间:9789public class doubleBufferedQueue {    private static final Logger log = LoggerFactory            .getLogger(doubleBufferedQueue.class);    public final static int size1 = 1000000;    public static DoubleBufferedQueue<Object> queue = new DoubleBufferedQueue<Object>(            size1);    public final static int threadNumber = 10;    public static boolean isOver = false;    public static void main(String[] args) throws InterruptedException,            ExecutionException {//        long timestart = System.currentTimeMillis();        Thread thread1 = new Thread(new Runnable() {            public void run() {                ExecutorService executorService = Executors                        .newFixedThreadPool(threadNumber);                ArrayList<Future<Long>> results = new ArrayList<Future<Long>>();                for (int i = 0; i < threadNumber; i++) {                    Future<Long> future = executorService                            .submit(new ExecDoubleBufferedQueue());                    results.add(future);                }                long allTime = 0;                for (Future<Long> fs : results) {                    try {                        allTime += fs.get();                        // log.info("" + fs.get());                    } catch (InterruptedException e) {                        log.info("" + e);                        return;                    } catch (ExecutionException e) {                        log.info("" + e);                    } finally {                        executorService.shutdown();                    }                }                doubleBufferedQueue.isOver = true;                log.info("入队列总共执行时间:" + allTime);            }        });        thread1.start();        // log.info("主线程执行时间:" + (System.currentTimeMillis() - timestart));        // ------------------------------        Thread thread2 = new Thread(new Runnable() {            public void run() {                ExecutorService executorService2 = Executors                        .newFixedThreadPool(threadNumber);                ArrayList<Future<Long>> results_out = new ArrayList<Future<Long>>();                for (int i = 0; i < threadNumber; i++) {                    Future<Long> future = executorService2                            .submit(new ExecDoubleBufferedQueue_Out());                    results_out.add(future);                }                long allTime_out = 0;                for (Future<Long> fs : results_out) {                    try {                        allTime_out += fs.get();                        // log.info("" + fs.get());                    } catch (InterruptedException e) {                        log.info("" + e);                        return;                    } catch (ExecutionException e) {                        log.info("" + e);                    } finally {                        executorService2.shutdown();                    }                }                log.info("出队列总共执行时间:" + allTime_out);            }        });        thread2.start();    }}class ExecDoubleBufferedQueue implements Callable<Long> {    private static final Logger log = LoggerFactory            .getLogger(doubleBufferedQueue.class);    @Override    public Long call() throws Exception {        long time = System.currentTimeMillis();        for (int i = 0; i < doubleBufferedQueue.size1; i++) {            doubleBufferedQueue.queue.offer(i);        }        long time2 = System.currentTimeMillis() - time;        // log.info("执行时间:" + time2);        return time2;    }}class ExecDoubleBufferedQueue_Out implements Callable<Long> {    private static final Logger log = LoggerFactory            .getLogger(doubleBufferedQueue.class);    @Override    public Long call() throws Exception {        long time = System.currentTimeMillis();        while (!doubleBufferedQueue.isOver) {            doubleBufferedQueue.queue.poll();        }        long time2 = System.currentTimeMillis() - time;        // log.info("执行时间:" + time2);        return time2;    }}

 

concurrentLinkedQueue:

package test.MoreThread.c;import java.util.ArrayList;import java.util.concurrent.Callable;import java.util.concurrent.ConcurrentLinkedQueue;import java.util.concurrent.ExecutionException;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Future;import org.slf4j.Logger;import org.slf4j.LoggerFactory;//ConcurrentLinkedQueue入队时间:10557  出队时间:13716//ConcurrentLinkedQueue入队时间:25298  出队时间:25332public class concurrentLinkedQueue {    private static final Logger log = LoggerFactory            .getLogger(concurrentLinkedQueue.class);    public static ConcurrentLinkedQueue<Object> queue = new ConcurrentLinkedQueue<Object>();    public final static int size1 = 1000000;    public final static int threadNumber = 10;    public static boolean isOver = false;    public static void main(String[] args) throws InterruptedException,            ExecutionException {        // long timestart = System.currentTimeMillis();        Thread thread1 = new Thread(new Runnable() {            public void run() {                ExecutorService executorService = Executors                        .newFixedThreadPool(threadNumber);                ArrayList<Future<Long>> results = new ArrayList<Future<Long>>();                for (int i = 0; i < threadNumber; i++) {                    Future<Long> future = executorService.submit(new Exec());                    results.add(future);                }                long allTime = 0;                for (Future<Long> fs : results) {                    try {                        allTime += fs.get();//                        log.info("" + fs.get());                    } catch (InterruptedException e) {                        log.info("" + e);                        return;                    } catch (ExecutionException e) {                        log.info("" + e);                    } finally {                        executorService.shutdown();                    }                }                concurrentLinkedQueue.isOver = true;                log.info("队列总共执行时间:" + allTime);            }        });        thread1.start();        // ------------------------------        Thread thread2 = new Thread(new Runnable() {            public void run() {                ExecutorService executorService2 = Executors                        .newFixedThreadPool(threadNumber);                ArrayList<Future<Long>> results_out = new ArrayList<Future<Long>>();                for (int i = 0; i < threadNumber; i++) {                    Future<Long> future = executorService2                            .submit(new Exec_Out());                    results_out.add(future);                }                long allTime_out = 0;                for (Future<Long> fs : results_out) {                    try {                        allTime_out += fs.get();                        // log.info("" + fs.get());                    } catch (InterruptedException e) {                        log.info("" + e);                        return;                    } catch (ExecutionException e) {                        log.info("" + e);                    } finally {                        executorService2.shutdown();                    }                }                log.info("出队列总共执行时间:" + allTime_out);            }        });        thread2.start();        // log.info("主线程执行时间:" + (System.currentTimeMillis() - timestart));    }}class Exec implements Callable<Long> {    private static final Logger log = LoggerFactory            .getLogger(concurrentLinkedQueue.class);    @Override    public Long call() throws Exception {        long time = System.currentTimeMillis();        for (int i = 0; i < concurrentLinkedQueue.size1; i++) {            concurrentLinkedQueue.queue.offer(i);        }        long time2 = System.currentTimeMillis() - time;//        log.info("执行时间:" + time2);        return time2;    }}class Exec_Out implements Callable<Long> {    private static final Logger log = LoggerFactory            .getLogger(concurrentLinkedQueue.class);    @Override    public Long call() throws Exception {        long time = System.currentTimeMillis();        while (!concurrentLinkedQueue.isOver) {            concurrentLinkedQueue.queue.poll();        }        long time2 = System.currentTimeMillis() - time;        // log.info("执行时间:" + time2);        return time2;    }}

 

arrayBlockingQueue:

package test.MoreThread.a;import java.util.ArrayList;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.Callable;import java.util.concurrent.ExecutionException;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Future;import org.slf4j.Logger;import org.slf4j.LoggerFactory;//ArrayBlockingQueue入队时间:21080  出队时间:22025//ArrayBlockingQueue入队时间:17689  出队时间:19654public class arrayBlockingQueue {    private static final Logger log = LoggerFactory            .getLogger(arrayBlockingQueue.class);    public final static int size1 = 1000000;    public static ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<Object>(            size1);    public final static int threadNumber = 10;    public static boolean isOver = false;    public static void main(String[] args) throws InterruptedException,            ExecutionException {        // long timestart = System.currentTimeMillis();        Thread thread1 = new Thread(new Runnable() {            public void run() {                ExecutorService executorService = Executors                        .newFixedThreadPool(threadNumber);                ArrayList<Future<Long>> results = new ArrayList<Future<Long>>();                for (int i = 0; i < threadNumber; i++) {                    Future<Long> future = executorService                            .submit(new ExecArrayBlockingQueue());                    results.add(future);                }                long allTime = 0;                for (Future<Long> fs : results) {                    try {                        allTime += fs.get();                        // log.info("" + fs.get());                    } catch (InterruptedException e) {                        log.info("" + e);                        return;                    } catch (ExecutionException e) {                        log.info("" + e);                    } finally {                        executorService.shutdown();                    }                }                arrayBlockingQueue.isOver = true;                log.info("队列总共执行时间:" + allTime);            }        });        thread1.start();        // log.info("主线程执行时间:" + (System.currentTimeMillis() - timestart));        // ------------------------------        Thread thread2 = new Thread(new Runnable() {            public void run() {                ExecutorService executorService2 = Executors                        .newFixedThreadPool(threadNumber);                ArrayList<Future<Long>> results_out = new ArrayList<Future<Long>>();                for (int i = 0; i < threadNumber; i++) {                    Future<Long> future = executorService2                            .submit(new ExecArrayBlockingQueue_Out());                    results_out.add(future);                }                long allTime_out = 0;                for (Future<Long> fs : results_out) {                    try {                        allTime_out += fs.get();                        // log.info("" + fs.get());                    } catch (InterruptedException e) {                        log.info("" + e);                        return;                    } catch (ExecutionException e) {                        log.info("" + e);                    } finally {                        executorService2.shutdown();                    }                }                log.info("出队列总共执行时间:" + allTime_out);            }        });        thread2.start();    }}class ExecArrayBlockingQueue implements Callable<Long> {    private static final Logger log = LoggerFactory            .getLogger(arrayBlockingQueue.class);    @Override    public Long call() throws Exception {        long time = System.currentTimeMillis();        for (int i = 0; i < arrayBlockingQueue.size1; i++) {            arrayBlockingQueue.queue.offer(i);        }        long time2 = System.currentTimeMillis() - time;        // log.info("执行时间:" + time2);        return time2;    }}class ExecArrayBlockingQueue_Out implements Callable<Long> {    private static final Logger log = LoggerFactory            .getLogger(arrayBlockingQueue.class);    @Override    public Long call() throws Exception {        long time = System.currentTimeMillis();        while (!arrayBlockingQueue.isOver) {            arrayBlockingQueue.queue.poll();        }        long time2 = System.currentTimeMillis() - time;        // log.info("执行时间:" + time2);        return time2;    }}

 

linkedBlockingQueue:

package test.MoreThread.l;import java.util.ArrayList;import java.util.Vector;import java.util.concurrent.Callable;import java.util.concurrent.ConcurrentLinkedQueue;import java.util.concurrent.ExecutionException;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Future;import java.util.concurrent.LinkedBlockingQueue;import org.slf4j.Logger;import org.slf4j.LoggerFactory;//LinkedBlockingQueue入队时间:38236  出队时间:52555//LinkedBlockingQueue入队时间:30646  出队时间:38573public class linkedBlockingQueue {    private static final Logger log = LoggerFactory            .getLogger(linkedBlockingQueue.class);    public final static int size1 = 1000000;    public static LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue<Object>(            size1);    public final static int threadNumber = 10;    public static boolean isOver = false;    public static void main(String[] args) throws InterruptedException,            ExecutionException {        long timestart = System.currentTimeMillis();        Thread thread1 = new Thread(new Runnable() {            public void run() {                ExecutorService executorService = Executors                        .newFixedThreadPool(threadNumber);                ArrayList<Future<Long>> results = new ArrayList<Future<Long>>();                for (int i = 0; i < threadNumber; i++) {                    Future<Long> future = executorService                            .submit(new ExecLinkedBlockingQueue());                    results.add(future);                }                                long allTime = 0;                for (Future<Long> fs : results) {                    try {                        allTime += fs.get();                        // log.info("" + fs.get());                    } catch (InterruptedException e) {                        log.info("" + e);                        return;                    } catch (ExecutionException e) {                        log.info("" + e);                    } finally {                        executorService.shutdown();                    }                }                linkedBlockingQueue.isOver = true;                log.info("入队列总共执行时间:" + allTime);            }        });        thread1.start();        // log.info("主线程执行时间:" + (System.currentTimeMillis() - timestart));//        System.out.println(linkedBlockingQueue.queue.size());        // ------------------------------        Thread thread2 = new Thread(new Runnable() {            public void run() {                ExecutorService executorService2 = Executors                        .newFixedThreadPool(threadNumber);                ArrayList<Future<Long>> results_out = new ArrayList<Future<Long>>();                for (int i = 0; i < threadNumber; i++) {                    Future<Long> future = executorService2                            .submit(new ExecLinkedBlockingQueue_Out());                    results_out.add(future);                }                long allTime_out = 0;                for (Future<Long> fs : results_out) {                    try {                        allTime_out += fs.get();                        // log.info("" + fs.get());                    } catch (InterruptedException e) {                        log.info("" + e);                        return;                    } catch (ExecutionException e) {                        log.info("" + e);                    } finally {                        executorService2.shutdown();                    }                }                log.info("出队列总共执行时间:" + allTime_out);            }        });        thread2.start();    }}class ExecLinkedBlockingQueue implements Callable<Long> {    private static final Logger log = LoggerFactory            .getLogger(linkedBlockingQueue.class);    @Override    public Long call() throws Exception {        long time = System.currentTimeMillis();        for (int i = 0; i < linkedBlockingQueue.size1; i++) {            linkedBlockingQueue.queue.offer(i);        }        long time2 = System.currentTimeMillis() - time;        // log.info("执行时间:" + time2);        return time2;    }}class ExecLinkedBlockingQueue_Out implements Callable<Long> {    private static final Logger log = LoggerFactory            .getLogger(linkedBlockingQueue.class);    @Override    public Long call() throws Exception {        long time = System.currentTimeMillis();        while (!linkedBlockingQueue.isOver) {            linkedBlockingQueue.queue.poll();        }        long time2 = System.currentTimeMillis() - time;        // log.info("执行时间:" + time2);        return time2;    }}

 

DoubleBufferedQueue双缓冲队列

package comrt.util;import java.util.AbstractQueue;import java.util.Collection;import java.util.Iterator;import java.util.concurrent.BlockingQueue;import java.util.concurrent.TimeUnit;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.ReentrantLock;import org.slf4j.Logger;import org.slf4j.LoggerFactory;//双缓冲队列,线程安全public class DoubleBufferedQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {    private static final long serialVersionUID = 1011398447523020L;    public static final int DEFAULT_QUEUE_CAPACITY = 5000000;    public static final long DEFAULT_MAX_TIMEOUT = 0;    public static final long DEFAULT_MAX_COUNT = 10;    private Logger logger =    LoggerFactory.getLogger(DoubleBufferedQueue.class.getName());    /** The queued items */    private ReentrantLock readLock;    // 写锁    private ReentrantLock writeLock;    // 是否满    private Condition notFull;    private Condition awake;    // 读写数组    private transient E[] writeArray;    private transient E[] readArray;    // 读写计数    private volatile int writeCount;    private volatile int readCount;    // 写数组下标指针    private int writeArrayTP;    private int writeArrayHP;    // 读数组下标指针    private int readArrayTP;    private int readArrayHP;    private int capacity;    public DoubleBufferedQueue(int capacity) {        // 默认        this.capacity = DEFAULT_QUEUE_CAPACITY;        if (capacity > 0) {            this.capacity = capacity;        }        readArray = (E[]) new Object[capacity];        writeArray = (E[]) new Object[capacity];        readLock = new ReentrantLock();        writeLock = new ReentrantLock();        notFull = writeLock.newCondition();        awake = writeLock.newCondition();    }    private void insert(E e) {        writeArray[writeArrayTP] = e;        ++writeArrayTP;        ++writeCount;    }    private E extract() {        E e = readArray[readArrayHP];        readArray[readArrayHP] = null;        ++readArrayHP;        --readCount;        return e;    }    /**     * switch condition: read queue is empty && write queue is not empty     *      * Notice:This function can only be invoked after readLock is grabbed,or may     * cause dead lock     *      * @param timeout     * @param isInfinite     *            : whether need to wait forever until some other thread awake     *            it     * @return     * @throws InterruptedException     */    private long queueSwap(long timeout, boolean isInfinite) throws InterruptedException {        writeLock.lock();        try {            if (writeCount <= 0) {                // logger.debug("Write Count:" + writeCount                // + ", Write Queue is empty, do not switch!");                try {                    // logger.debug("Queue is empty, need wait....");                    if (isInfinite && timeout <= 0) {                        awake.await();                        return -1;                    } else if (timeout > 0) {                        return awake.awaitNanos(timeout);                    } else {                        return 0;                    }                } catch (InterruptedException ie) {                    awake.signal();                    throw ie;                }            } else {                E[] tmpArray = readArray;                readArray = writeArray;                writeArray = tmpArray;                readCount = writeCount;                readArrayHP = 0;                readArrayTP = writeArrayTP;                writeCount = 0;                writeArrayHP = readArrayHP;                writeArrayTP = 0;                notFull.signal();                // logger.debug("Queue switch successfully!");                return 0;            }        } finally {            writeLock.unlock();        }    }    @Override    public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {        if (e == null) {            throw new NullPointerException();        }        long nanoTime = 0;                if (timeout > 0) {            nanoTime = unit.toNanos(timeout);        }                writeLock.lockInterruptibly();                try {            for (int i = 0; i < DEFAULT_MAX_COUNT; i++) {                if (writeCount < writeArray.length) {                    insert(e);                    if (writeCount == 1) {                        awake.signal();                    }                    return true;                }                // Time out                if (nanoTime <= 0) {                    // logger.debug("offer wait time out!");                    return false;                }                // keep waiting                try {                    // logger.debug("Queue is full, need wait....");                    nanoTime = notFull.awaitNanos(nanoTime);                } catch (InterruptedException ie) {                    notFull.signal();                    throw ie;                }            }        } finally {            writeLock.unlock();        }        return false;    }    //    @Override    public E poll(long timeout, TimeUnit unit) throws InterruptedException {        long nanoTime = 0;                if (timeout > 0) {            nanoTime = unit.toNanos(timeout);        }                readLock.lockInterruptibly();        try {            if (nanoTime > 0) {                for (int i = 0; i < DEFAULT_MAX_COUNT; i++) {                    if (readCount > 0) {                        return extract();                    }                    if (nanoTime <= 0) {                        // logger.debug("poll time out!");                        return null;                    }                    nanoTime = queueSwap(nanoTime, false);                }            } else {                if (readCount > 0) {                    return extract();                }                queueSwap(nanoTime, false);                if (readCount > 0) {                    return extract();                }             }        } finally {            readLock.unlock();        }        return null;    }    // 等待500毫秒    @Override    public E poll() {        E ret = null;        try {            ret = poll(DEFAULT_MAX_TIMEOUT, TimeUnit.MILLISECONDS);        } catch (Exception e) {            ret = null;        }        return ret;    }    // 查看    @Override    public E peek() {        E e = null;        readLock.lock();        try {            if (readCount > 0) {                e = readArray[readArrayHP];            }        } finally {            readLock.unlock();        }        return e;    }    // 默认500毫秒    @Override    public boolean offer(E e) {        boolean ret = false;        try {            ret = offer(e, DEFAULT_MAX_TIMEOUT, TimeUnit.MILLISECONDS);        } catch (Exception e2) {            ret = false;        }        return ret;    }    @Override    public void put(E e) throws InterruptedException {        // never need to // block        offer(e, DEFAULT_MAX_TIMEOUT, TimeUnit.MILLISECONDS);                                                                     }    @Override    public E take() throws InterruptedException {        return poll(DEFAULT_MAX_TIMEOUT, TimeUnit.MILLISECONDS);    }    @Override    public int remainingCapacity() {        return this.capacity;    }    @Override    public int drainTo(Collection<? super E> c) {        return 0;    }    @Override    public int drainTo(Collection<? super E> c, int maxElements) {        return 0;    }    @Override    public Iterator<E> iterator() {        return null;    }    // 当前读队列中还有多少个    @Override    public int size() {        int size = 0;        readLock.lock();        try {            size = readCount;        } finally {            readLock.unlock();        }        return size;    }        /**     * 当前已写入的队列大小     * */    public int WriteSize() {        int size = 0;        writeLock.lock();        try {            size = writeCount;        } finally {            writeLock.unlock();        }        return size;    }    public int unsafeReadSize() {        return readCount;    }    public int unsafeWriteSize() {        return writeCount;    }        public int capacity() {        return capacity;    }        public String toMemString() {        return "--read: " + readCount + "/" + capacity + "--write: " + writeCount + "/" + capacity;    }    // 清理    /*     * public void clear() { readLock.lock(); writeLock.lock(); try { readCount     * = 0; readArrayHP = 0; writeCount = 0; writeArrayTP = 0;     * //logger.debug("Queue clear successfully!"); } finally {     * writeLock.unlock(); readLock.unlock(); } }     */}

 

Java队列集合的性能测试