You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

248 lines
13 KiB
Markdown

2 years ago
# 第20讲 | 并发包中的ConcurrentLinkedQueue和LinkedBlockingQueue有什么区别
在上一讲中我分析了Java并发包中的部分内容今天我来介绍一下线程安全队列。Java标准库提供了非常多的线程安全队列很容易混淆。
今天我要问你的问题是并发包中的ConcurrentLinkedQueue和LinkedBlockingQueue有什么区别
## 典型回答
有时候我们把并发包下面的所有容器都习惯叫作并发容器但是严格来讲类似ConcurrentLinkedQueue这种“Concurrent\*”容器,才是真正代表并发。
关于问题中它们的区别:
* Concurrent类型基于lock-free在常见的多线程访问场景一般可以提供较高吞吐量。
* 而LinkedBlockingQueue内部则是基于锁并提供了BlockingQueue的等待性方法。
不知道你有没有注意到java.util.concurrent包提供的容器Queue、List、Set、Map从命名上可以大概区分为Concurrent\*、CopyOnWrite_和Blocking_等三类同样是线程安全容器可以简单认为
* Concurrent类型没有类似CopyOnWrite之类容器相对较重的修改开销。
* 但是凡事都是有代价的Concurrent往往提供了较低的遍历一致性。你可以这样理解所谓的弱一致性例如当利用迭代器遍历时如果容器发生修改迭代器仍然可以继续进行遍历。
* 与弱一致性对应的就是我介绍过的同步容器常见的行为“fail-fast”也就是检测到容器在遍历过程中发生了修改则抛出ConcurrentModificationException不再继续遍历。
* 弱一致性的另外一个体现是size等操作准确性是有限的未必是100%准确。
* 与此同时,读取的性能具有一定的不确定性。
## 考点分析
今天的问题是又是一个引子,考察你是否了解并发包内部不同容器实现的设计目的和实现区别。
队列是非常重要的数据结构我们日常开发中很多线程间数据传递都要依赖于它Executor框架提供的各种线程池同样无法离开队列。面试官可以从不同角度考察比如
* 哪些队列是有界的,哪些是无界的?(很多同学反馈了这个问题)
* 针对特定场景需求,如何选择合适的队列实现?
* 从源码的角度,常见的线程安全队列是如何实现的,并进行了哪些改进以提高性能表现?
为了能更好地理解这一讲需要你掌握一些基本的队列本身和数据结构方面知识如果这方面知识比较薄弱《数据结构与算法分析》是一本比较全面的参考书专栏还是尽量专注于Java领域的特性。
## 知识扩展
**线程安全队列一览**
我在[专栏第8讲](http://time.geekbang.org/column/article/7810)中介绍过常见的集合中如LinkedList是个Deque只不过不是线程安全的。下面这张图是Java并发类库提供的各种各样的**线程安全**队列实现,注意,图中并未将非线程安全部分包含进来。
![](https://static001.geekbang.org/resource/image/79/79/791750d6fe7ef88ecb3897e1d029f079.png)
我们可以从不同的角度进行分类,从基本的数据结构的角度分析,有两个特别的[Deque](https://docs.oracle.com/javase/9/docs/api/java/util/Deque.html)实现ConcurrentLinkedDeque和LinkedBlockingDeque。Deque的侧重点是支持对队列头尾都进行插入和删除所以提供了特定的方法如:
* 尾部插入时需要的[addLast(e)](https://docs.oracle.com/javase/9/docs/api/java/util/Deque.html#addLast-E-)、[offerLast(e)](https://docs.oracle.com/javase/9/docs/api/java/util/Deque.html#offerLast-E-)。
* 尾部删除所需要的[removeLast()](https://docs.oracle.com/javase/9/docs/api/java/util/Deque.html#removeLast--)、[pollLast()](https://docs.oracle.com/javase/9/docs/api/java/util/Deque.html#pollLast--)。
从上面这些角度能够理解ConcurrentLinkedDeque和LinkedBlockingQueue的主要功能区别也就足够日常开发的需要了。但是如果我们深入一些通常会更加关注下面这些方面。
从行为特征来看绝大部分Queue都是实现了BlockingQueue接口。在常规队列操作基础上Blocking意味着其提供了特定的等待性操作获取时take等待元素进队或者插入时put等待队列出现空位。
```
/**
* 获取并移除队列头结点,如果必要,其会等待直到队列出现元素
*/
E take() throws InterruptedException;
/**
* 插入元素,如果队列已满,则等待直到队列出现空闲空间
  …
*/
void put(E e) throws InterruptedException;
```
另一个BlockingQueue经常被考察的点就是是否有界Bounded、Unbounded这一点也往往会影响我们在应用开发中的选择我这里简单总结一下。
* ArrayBlockingQueue是最典型的的有界队列其内部以final的数组保存数据数组的大小就决定了队列的边界所以我们在创建ArrayBlockingQueue时都要指定容量
```
public ArrayBlockingQueue(int capacity, boolean fair)
```
* LinkedBlockingQueue容易被误解为无边界但其实其行为和内部代码都是基于有界的逻辑实现的只不过如果我们没有在创建队列时就指定容量那么其容量限制就自动被设置为Integer.MAX\_VALUE成为了无界队列。
* SynchronousQueue这是一个非常奇葩的队列实现每个删除操作都要等待插入操作反之每个插入操作也都要等待删除动作。那么这个队列的容量是多少呢是1吗其实不是的其内部容量是0。
* PriorityBlockingQueue是无边界的优先队列虽然严格意义上来讲其大小总归是要受系统资源影响。
* DelayedQueue和LinkedTransferQueue同样是无边界的队列。对于无边界的队列有一个自然的结果就是put操作永远也不会发生其他BlockingQueue的那种等待情况。
如果我们分析不同队列的底层实现BlockingQueue基本都是基于锁实现一起来看看典型的LinkedBlockingQueue。
```
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
```
我在介绍ReentrantLock的条件变量用法的时候分析过ArrayBlockingQueue不知道你有没有注意到其条件变量与LinkedBlockingQueue版本的实现是有区别的。notEmpty、notFull都是同一个再入锁的条件变量而LinkedBlockingQueue则改进了锁操作的粒度头、尾操作使用不同的锁所以在通用场景下它的吞吐量相对要更好一些。
下面的take方法与ArrayBlockingQueue中的实现也是有不同的由于其内部结构是链表需要自己维护元素数量值请参考下面的代码。
```
public E take() throws InterruptedException {
   final E x;
   final int c;
   final AtomicInteger count = this.count;
   final ReentrantLock takeLock = this.takeLock;
   takeLock.lockInterruptibly();
   try {
       while (count.get() == 0) {
           notEmpty.await();
       }
       x = dequeue();
       c = count.getAndDecrement();
       if (c > 1)
           notEmpty.signal();
   } finally {
       takeLock.unlock();
   }
   if (c == capacity)
       signalNotFull();
   return x;
}
```
类似ConcurrentLinkedQueue等则是基于CAS的无锁技术不需要在每个操作时使用锁所以扩展性表现要更加优异。
相对比较另类的SynchronousQueue在Java 6中其实现发生了非常大的变化利用CAS替换掉了原本基于锁的逻辑同步开销比较小。它是Executors.newCachedThreadPool()的默认队列。
**队列使用场景与典型用例**
在实际开发中我提到过Queue被广泛使用在生产者-消费者场景比如利用BlockingQueue来实现由于其提供的等待机制我们可以少操心很多协调工作你可以参考下面样例代码
```
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class ConsumerProducer {
   public static final String EXIT_MSG  = "Good bye!";
   public static void main(String[] args) {
// 使用较小的队列,以更好地在输出中展示其影响
       BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
       Producer producer = new Producer(queue);
       Consumer consumer = new Consumer(queue);
       new Thread(producer).start();
       new Thread(consumer).start();
   }
   static class Producer implements Runnable {
       private BlockingQueue<String> queue;
       public Producer(BlockingQueue<String> q) {
           this.queue = q;
       }
       @Override
       public void run() {
           for (int i = 0; i < 20; i++) {
               try{
                   Thread.sleep(5L);
                   String msg = "Message" + i;
                   System.out.println("Produced new item: " + msg);
                   queue.put(msg);
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }
           }
           try {
               System.out.println("Time to say good bye!");
               queue.put(EXIT_MSG);
           } catch (InterruptedException e) {
               e.printStackTrace();
           }
       }
   }
   static class Consumer implements Runnable{
       private BlockingQueue<String> queue;
       public Consumer(BlockingQueue<String> q){
           this.queue=q;
       }
       @Override
       public void run() {
           try{
               String msg;
               while(!EXIT_MSG.equalsIgnoreCase( (msg = queue.take()))){
                   System.out.println("Consumed item: " + msg);
                   Thread.sleep(10L);
               }
               System.out.println("Got exit message, bye!");
           }catch(InterruptedException e) {
               e.printStackTrace();
           }
       }
   }
}
```
上面是一个典型的生产者-消费者样例如果使用非Blocking的队列那么我们就要自己去实现轮询、条件判断如检查poll返回值是否null等逻辑如果没有特别的场景要求Blocking实现起来代码更加简单、直观。
前面介绍了各种队列实现,在日常的应用开发中,如何进行选择呢?
以LinkedBlockingQueue、ArrayBlockingQueue和SynchronousQueue为例我们一起来分析一下根据需求可以从很多方面考量
* 考虑应用场景中对队列边界的要求。ArrayBlockingQueue是有明确的容量限制的而LinkedBlockingQueue则取决于我们是否在创建时指定SynchronousQueue则干脆不能缓存任何元素。
* 从空间利用角度数组结构的ArrayBlockingQueue要比LinkedBlockingQueue紧凑因为其不需要创建所谓节点但是其初始分配阶段就需要一段连续的空间所以初始内存需求更大。
* 通用场景中LinkedBlockingQueue的吞吐量一般优于ArrayBlockingQueue因为它实现了更加细粒度的锁操作。
* ArrayBlockingQueue实现比较简单性能更好预测属于表现稳定的“选手”。
* 如果我们需要实现的是两个线程之间接力性handoff的场景按照[专栏上一讲](http://time.geekbang.org/column/article/9373)的例子你可能会选择CountDownLatch但是[SynchronousQueue](http://www.baeldung.com/java-synchronous-queue)也是完美符合这种场景的,而且线程间协调和数据传输统一起来,代码更加规范。
* 可能令人意外的是很多时候SynchronousQueue的性能表现往往大大超过其他实现尤其是在队列元素较小的场景。
今天我分析了Java中让人眼花缭乱的各种线程安全队列试图从几个角度让每个队列的特点更加明确进而希望减少你在日常工作中使用时的困扰。
## 一课一练
关于今天我们讨论的题目你做到心中有数了吗? 今天的内容侧重于Java自身的角度面试官也可能从算法的角度来考察所以今天留给你的思考题是指定某种结构比如栈用它实现一个BlockingQueue实现思路是怎样的呢
请你在留言区写写你对这个问题的思考,我会选出经过认真思考的留言,送给你一份学习奖励礼券,欢迎你与我一起讨论。
你的朋友是不是也在准备面试呢?你可以“请朋友读”,把今天的题目分享给好友,或许你能帮到他。