You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

228 lines
13 KiB
Markdown

2 years ago
# 19 | CountDownLatch和CyclicBarrier如何让多线程步调一致
前几天老板突然匆匆忙忙过来,说对账系统最近越来越慢了,能不能快速优化一下。我了解了对账系统的业务后,发现还是挺简单的,用户通过在线商城下单,会生成电子订单,保存在订单库;之后物流会生成派送单给用户发货,派送单保存在派送单库。为了防止漏派送或者重复派送,对账系统每天还会校验是否存在异常订单。
对账系统的处理逻辑很简单,你可以参考下面的对账系统流程图。目前对账系统的处理逻辑是首先查询订单,然后查询派送单,之后对比订单和派送单,将差异写入差异库。
![](https://static001.geekbang.org/resource/image/06/fe/068418bdc371b8a1b4b740428a3b3ffe.png)
对账系统流程图
对账系统的代码抽象之后,也很简单,核心代码如下,就是在一个单线程里面循环查询订单、派送单,然后执行对账,最后将写入差异库。
```
while(存在未对账订单){
// 查询未对账订单
pos = getPOrders();
// 查询派送单
dos = getDOrders();
// 执行对账操作
diff = check(pos, dos);
// 差异写入差异库
save(diff);
}
```
## 利用并行优化对账系统
老板要我优化性能,那我就首先要找到这个对账系统的瓶颈所在。
目前的对账系统由于订单量和派送单量巨大所以查询未对账订单getPOrders()和查询派送单getDOrders()相对较慢,那有没有办法快速优化一下呢?目前对账系统是单线程执行的,图形化后是下图这个样子。对于串行化的系统,优化性能首先想到的是能否**利用多线程并行处理**。
![](https://static001.geekbang.org/resource/image/cd/a5/cd997c259e4165c046e79e766abfe2a5.png)
对账系统单线程执行示意图
所以这里你应该能够看出来这个对账系统里的瓶颈查询未对账订单getPOrders()和查询派送单getDOrders()是否可以并行处理呢显然是可以的因为这两个操作并没有先后顺序的依赖。这两个最耗时的操作并行之后执行过程如下图所示。对比一下单线程的执行示意图你会发现同等时间里并行执行的吞吐量近乎单线程的2倍优化效果还是相对明显的。
![](https://static001.geekbang.org/resource/image/a5/3b/a563c39ece918578ad2ff33ab5f3743b.png)
对账系统并行执行示意图
思路有了下面我们再来看看如何用代码实现。在下面的代码中我们创建了两个线程T1和T2并行执行查询未对账订单getPOrders()和查询派送单getDOrders()这两个操作。在主线程中执行对账操作check()和差异写入save()两个操作。不过需要注意的是主线程需要等待线程T1和T2执行完才能执行check()和save()这两个操作为此我们通过调用T1.join()和T2.join()来实现等待当T1和T2线程退出时调用T1.join()和T2.join()的主线程就会从阻塞态被唤醒从而执行之后的check()和save()。
```
while(存在未对账订单){
// 查询未对账订单
Thread T1 = new Thread(()->{
pos = getPOrders();
});
T1.start();
// 查询派送单
Thread T2 = new Thread(()->{
dos = getDOrders();
});
T2.start();
// 等待T1、T2结束
T1.join();
T2.join();
// 执行对账操作
diff = check(pos, dos);
// 差异写入差异库
save(diff);
}
```
## 用CountDownLatch实现线程等待
经过上面的优化之后基本上可以跟老板汇报收工了但还是有点美中不足相信你也发现了while循环里面每次都会创建新的线程而创建线程可是个耗时的操作。所以最好是创建出来的线程能够循环利用估计这时你已经想到线程池了是的线程池就能解决这个问题。
而下面的代码就是用线程池优化后的我们首先创建了一个固定大小为2的线程池之后在while循环里重复利用。一切看上去都很顺利但是有个问题好像无解了那就是主线程如何知道getPOrders()和getDOrders()这两个操作什么时候执行完。前面主线程通过调用线程T1和T2的join()方法来等待线程T1和T2退出但是在线程池的方案里线程根本就不会退出所以join()方法已经失效了。
```
// 创建2个线程的线程池
Executor executor =
Executors.newFixedThreadPool(2);
while(存在未对账订单){
// 查询未对账订单
executor.execute(()-> {
pos = getPOrders();
});
// 查询派送单
executor.execute(()-> {
dos = getDOrders();
});
/* ??如何实现等待??*/
// 执行对账操作
diff = check(pos, dos);
// 差异写入差异库
save(diff);
}
```
那如何解决这个问题呢你可以开动脑筋想出很多办法最直接的办法是弄一个计数器初始值设置成2当执行完`pos = getPOrders();`这个操作之后将计数器减1执行完`dos = getDOrders();`之后也将计数器减1在主线程里等待计数器等于0当计数器等于0时说明这两个查询操作执行完了。等待计数器等于0其实就是一个条件变量用管程实现起来也很简单。
不过我并不建议你在实际项目中去实现上面的方案因为Java并发包里已经提供了实现类似功能的工具类**CountDownLatch**我们直接使用就可以了。下面的代码示例中在while循环里面我们首先创建了一个CountDownLatch计数器的初始值等于2之后在`pos = getPOrders();`和`dos = getDOrders();`两条语句的后面对计数器执行减1操作这个对计数器减1的操作是通过调用 `latch.countDown();` 来实现的。在主线程中,我们通过调用 `latch.await()` 来实现对计数器等于0的等待。
```
// 创建2个线程的线程池
Executor executor =
Executors.newFixedThreadPool(2);
while(存在未对账订单){
// 计数器初始化为2
CountDownLatch latch =
new CountDownLatch(2);
// 查询未对账订单
executor.execute(()-> {
pos = getPOrders();
latch.countDown();
});
// 查询派送单
executor.execute(()-> {
dos = getDOrders();
latch.countDown();
});
// 等待两个查询操作结束
latch.await();
// 执行对账操作
diff = check(pos, dos);
// 差异写入差异库
save(diff);
}
```
## 进一步优化性能
经过上面的重重优化之后,长出一口气,终于可以交付了。不过在交付之前还需要再次审视一番,看看还有没有优化的余地,仔细看还是有的。
前面我们将getPOrders()和getDOrders()这两个查询操作并行了但这两个查询操作和对账操作check()、save()之间还是串行的。很显然,这两个查询操作和对账操作也是可以并行的,也就是说,在执行对账操作的时候,可以同时去执行下一轮的查询操作,这个过程可以形象化地表述为下面这幅示意图。
![](https://static001.geekbang.org/resource/image/e6/8b/e663d90f49d9666e618ac1370ccca58b.png)
完全并行执行示意图
那接下来我们再来思考一下如何实现这步优化,两次查询操作能够和对账操作并行,对账操作还依赖查询操作的结果,这明显有点生产者-消费者的意思,两次查询操作是生产者,对账操作是消费者。既然是生产者-消费者模型,那就需要有个队列,来保存生产者生产的数据,而消费者则从这个队列消费数据。
不过针对对账这个项目,我设计了两个队列,并且两个队列的元素之间还有对应关系。具体如下图所示,订单查询操作将订单查询结果插入订单队列,派送单查询操作将派送单插入派送单队列,这两个队列的元素之间是有一一对应的关系的。两个队列的好处是,对账操作可以每次从订单队列出一个元素,从派送单队列出一个元素,然后对这两个元素执行对账操作,这样数据一定不会乱掉。
![](https://static001.geekbang.org/resource/image/22/da/22e8ba1c04a3bc2605b98376ed6832da.png)
双队列示意图
下面再来看如何用双队列来实现完全的并行。一个最直接的想法是一个线程T1执行订单的查询工作一个线程T2执行派送单的查询工作当线程T1和T2都各自生产完1条数据的时候通知线程T3执行对账操作。这个想法虽看上去简单但其实还隐藏着一个条件那就是线程T1和线程T2的工作要步调一致不能一个跑得太快一个跑得太慢只有这样才能做到各自生产完1条数据的时候通知线程T3。
下面这幅图形象地描述了上面的意图线程T1和线程T2只有都生产完1条数据的时候才能一起向下执行也就是说线程T1和线程T2要互相等待步调要一致同时当线程T1和T2都生产完一条数据的时候还要能够通知线程T3执行对账操作。
![](https://static001.geekbang.org/resource/image/65/ad/6593a10a393d9310a8f864730f7426ad.png)
同步执行示意图
## 用CyclicBarrier实现线程同步
下面我们就来实现上面提到的方案。这个方案的难点有两个一个是线程T1和T2要做到步调一致另一个是要能够通知到线程T3。
你依然可以利用一个计数器来解决这两个难点计数器初始化为2线程T1和T2生产完一条数据都将计数器减1如果计数器大于0则线程T1或者T2等待。如果计数器等于0则通知线程T3并唤醒等待的线程T1或者T2与此同时将计数器重置为2这样线程T1和线程T2生产下一条数据的时候就可以继续使用这个计数器了。
同样还是建议你不要在实际项目中这么做因为Java并发包里也已经提供了相关的工具类**CyclicBarrier**。在下面的代码中我们首先创建了一个计数器初始值为2的CyclicBarrier你需要注意的是创建CyclicBarrier的时候我们还传入了一个回调函数当计数器减到0的时候会调用这个回调函数。
线程T1负责查询订单当查出一条时调用 `barrier.await()` 来将计数器减1同时等待计数器变成0线程T2负责查询派送单当查出一条时也调用 `barrier.await()` 来将计数器减1同时等待计数器变成0当T1和T2都调用 `barrier.await()` 的时候计数器会减到0此时T1和T2就可以执行下一条语句了同时会调用barrier的回调函数来执行对账操作。
非常值得一提的是CyclicBarrier的计数器有自动重置的功能当减到0的时候会自动重置你设置的初始值。这个功能用起来实在是太方便了。
```
// 订单队列
Vector<P> pos;
// 派送单队列
Vector<D> dos;
// 执行回调的线程池
Executor executor =
Executors.newFixedThreadPool(1);
final CyclicBarrier barrier =
new CyclicBarrier(2, ()->{
executor.execute(()->check());
});
void check(){
P p = pos.remove(0);
D d = dos.remove(0);
// 执行对账操作
diff = check(p, d);
// 差异写入差异库
save(diff);
}
void checkAll(){
// 循环查询订单库
Thread T1 = new Thread(()->{
while(存在未对账订单){
// 查询订单库
pos.add(getPOrders());
// 等待
barrier.await();
}
});
T1.start();
// 循环查询运单库
Thread T2 = new Thread(()->{
while(存在未对账订单){
// 查询运单库
dos.add(getDOrders());
// 等待
barrier.await();
}
});
T2.start();
}
```
## 总结
CountDownLatch和CyclicBarrier是Java并发包提供的两个非常易用的线程同步工具类这两个工具类用法的区别在这里还是有必要再强调一下**CountDownLatch主要用来解决一个线程等待多个线程的场景**,可以类比旅游团团长要等待所有的游客到齐才能去下一个景点;而**CyclicBarrier是一组线程之间互相等待**更像是几个驴友之间不离不弃。除此之外CountDownLatch的计数器是不能循环利用的也就是说一旦计数器减到0再有线程调用await(),该线程会直接通过。但**CyclicBarrier的计数器是可以循环利用的**而且具备自动重置的功能一旦计数器减到0会自动重置到你设置的初始值。除此之外CyclicBarrier还可以设置回调函数可以说是功能丰富。
本章的示例代码中有两处用到了线程池你现在只需要大概了解即可因为线程池相关的知识咱们专栏后面还会有详细介绍。另外线程池提供了Future特性我们也可以利用Future特性来实现线程之间的等待这个后面我们也会详细介绍。
## 课后思考
本章最后的示例代码中CyclicBarrier的回调函数我们使用了一个固定大小的线程池你觉得是否有必要呢
欢迎在留言区与我分享你的想法,也欢迎你在留言区记录你的思考过程。感谢阅读,如果你觉得这篇文章对你有帮助的话,也欢迎把它分享给更多的朋友。