Java线程池实验:ArrayBlockingQueue和LinkedBlockingQueue吞吐量测试
二者区别
ArrayBlockingQueue内部使用1个锁来控制队列项的插入、取出操作,而LinkedBlockingQueue则是使用了2个锁来控制,一个名为putLock,另一个是takeLock,但是锁的本质都是ReentrantLock。
Array里面使用的时一个锁,不管put还是take行为,都可能被这个锁卡住,而Linked里面put和take是两个锁,put只会被put行为卡住,而不会被take卡住,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,因此吞吐性能自然强于Array。
Linked queues typically have higher throughput than array-based queues but less predictable performance in most concurrent applications.
吞吐量实验
下面的两段程序只有这一处不一样:
使用生产者消费者模型去测试队列的吞吐量,用一个线程池生产任务放到 任务队列中,另一个线程池从任务队列中拿到并消费任务。
生产者和消费者线程池内部的队列都是ArrayBlockingQueue,而联系生产者和消费者的任务队列一个是ArrayBlockingQueue, 一个是LinkedBlockingDeque。
BlockingQueue DATA_QUEUE = new ArrayBlockingQueue(taskNum);BlockingQueue DATA_QUEUE = new LinkedBlockingDeque(taskNum);
package chapter09;import java.util.concurrent.*;public class AQueuethroughputTest { public static void main(String[] args) throws InterruptedException { int taskNum = 1000000; BlockingQueue DATA_QUEUE = new ArrayBlockingQueue(taskNum); ThreadPoolExecutor producer = new ThreadPoolExecutor(4,4, 0, TimeUnit.MILLISECONDS,new ArrayBlockingQueue(taskNum)); for (int i = 0; i < taskNum; i++) { int finalI = i+1; producer.execute(new Runnable() { @Override public void run() { DATA_QUEUE.add(finalI); System.out.println("已添加任务,任务序列号为: "+finalI); } }); } int coreNum = Runtime.getRuntime().availableProcessors(); System.out.println("CPU核数为 " + coreNum); ThreadPoolExecutor arrayPool = new ThreadPoolExecutor(coreNum,coreNum, 0, TimeUnit.MILLISECONDS,new ArrayBlockingQueue(taskNum)); // 需要改为taskNum 因为此时任务队列的 put 还在进行 CountDownLatch latch = new CountDownLatch(taskNum); long startTime = System.currentTimeMillis(); for (int i = 0; i < taskNum; i++) { int num = DATA_QUEUE.take(); arrayPool.submit(new Runnable() { @Override public void run() { System.out.println("==已处理任务,任务序列号是: " + num+"=="); latch.countDown(); } }); } latch.await(); arrayPool.shutdown(); System.out.println("任务执行完毕"); long endTime = System.currentTimeMillis(); System.out.println("程序运行时间:" + (endTime - startTime) + "ms"); System.out.println("吞吐量tps :"+ (double)taskNum/(endTime - startTime)*1000); }}
package chapter09;import java.util.concurrent.*;public class LQueuethroughputTest { public static void main(String[] args) throws InterruptedException { int taskNum = 1000000; BlockingQueue DATA_QUEUE = new LinkedBlockingDeque(taskNum); ThreadPoolExecutor producer = new ThreadPoolExecutor(4,4, 0, TimeUnit.MILLISECONDS,new ArrayBlockingQueue(taskNum)); for (int i = 0; i < taskNum; i++) { int finalI = i+1; producer.execute(new Runnable() { @Override public void run() { DATA_QUEUE.add(finalI); System.out.println("已添加任务,任务序列号为: "+finalI); } }); } int coreNum = Runtime.getRuntime().availableProcessors(); System.out.println("CPU核数为 " + coreNum); ThreadPoolExecutor arrayPool = new ThreadPoolExecutor(coreNum,coreNum, 0, TimeUnit.MILLISECONDS,new ArrayBlockingQueue(taskNum)); // 需要改为taskNum 因为此时任务队列的 put 还在进行 CountDownLatch latch = new CountDownLatch(taskNum); long startTime = System.currentTimeMillis(); for (int i = 0; i < taskNum; i++) { int num = DATA_QUEUE.take(); arrayPool.submit(new Runnable() { @Override public void run() { System.out.println("==已处理任务,任务序列号是: " + num+"=="); latch.countDown(); } }); } latch.await(); arrayPool.shutdown(); System.out.println("任务执行完毕"); long endTime = System.currentTimeMillis(); System.out.println("程序运行时间:" + (endTime - startTime) + "ms"); System.out.println("吞吐量tps :"+ (double)taskNum/(endTime - startTime)*1000); }}
结果展示
ArrayBlockingQueue:
LinkedBlockingDeque:
创作打卡挑战赛
赢取流量/现金/CSDN周边激励大奖