You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

13 KiB

17 | Executor组件Tomcat如何扩展Java线程池

在开发中我们经常会碰到“池”的概念比如数据库连接池、内存池、线程池、常量池等。为什么需要“池”呢程序运行的本质就是通过使用系统资源CPU、内存、网络、磁盘等来完成信息的处理比如在JVM中创建一个对象实例需要消耗CPU和内存资源如果你的程序需要频繁创建大量的对象并且这些对象的存活时间短就意味着需要进行频繁销毁那么很有可能这部分代码会成为性能的瓶颈。

而“池”就是用来解决这个问题的简单来说对象池就是把用过的对象保存起来等下一次需要这种对象的时候直接从对象池中拿出来重复使用避免频繁地创建和销毁。在Java中万物皆对象线程也是一个对象Java线程是对操作系统线程的封装创建Java线程也需要消耗系统资源因此就有了线程池。JDK中提供了线程池的默认实现我们也可以通过扩展Java原生线程池来实现自己的线程池。

同样为了提高处理能力和并发度Web容器一般会把处理请求的工作放到线程池里来执行Tomcat扩展了原生的Java线程池来满足Web容器高并发的需求下面我们就来学习一下Java线程池的原理以及Tomcat是如何扩展Java线程池的。

Java线程池

简单的说Java线程池里内部维护一个线程数组和一个任务队列当任务处理不过来的时就把任务放到队列里慢慢处理。

ThreadPoolExecutor

我们先来看看Java线程池核心类ThreadPoolExecutor的构造函数你需要知道ThreadPoolExecutor是如何使用这些参数的这是理解Java线程工作原理的关键。

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)

每次提交任务时,如果线程数还没达到核心线程数corePoolSize,线程池就创建新线程来执行。当线程数达到corePoolSize后,新增的任务就放到工作队列workQueue里,而线程池中的线程则努力地从workQueue里拉活来干也就是调用poll方法来获取任务。

如果任务很多,并且workQueue是个有界队列,队列可能会满,此时线程池就会紧急创建新的临时线程来救场,如果总的线程数达到了最大线程数maximumPoolSize,则不能再创建新的临时线程了,转而执行拒绝策略handler,比如抛出异常或者由调用者线程来执行任务等。

如果高峰过去了线程池比较闲了怎么办临时线程使用pollkeepAliveTime, unit方法从工作队列中拉活干请注意poll方法设置了超时时间如果超时了仍然两手空空没拉到活表明它太闲了这个线程会被销毁回收。

那还有一个参数threadFactory是用来做什么的呢?通过它你可以扩展原生的线程工厂,比如给创建出来的线程取个有意义的名字。

FixedThreadPool/CachedThreadPool

Java提供了一些默认的线程池实现比如FixedThreadPool和CachedThreadPool它们的本质就是给ThreadPoolExecutor设置了不同的参数是定制版的ThreadPoolExecutor。

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                 new LinkedBlockingQueue<Runnable>());
}

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

从上面的代码你可以看到:

  • FixedThreadPool有固定长度nThreads的线程数组,忙不过来时会把任务放到无限长的队列里,这是因为LinkedBlockingQueue默认是一个无界队列
  • CachedThreadPool的maximumPoolSize参数值是Integer.MAX_VALUE,因此它对线程个数不做限制,忙不过来时无限创建临时线程,闲下来时再回收。它的任务队列是SynchronousQueue表明队列长度为0。

Tomcat线程池

跟FixedThreadPool/CachedThreadPool一样Tomcat的线程池也是一个定制版的ThreadPoolExecutor。

定制版的ThreadPoolExecutor

通过比较FixedThreadPool和CachedThreadPool我们发现它们传给ThreadPoolExecutor的参数有两个关键点

  • 是否限制线程个数。
  • 是否限制队列长度。

对于Tomcat来说这两个资源都需要限制也就是说要对高并发进行控制否则CPU和内存有资源耗尽的风险。因此Tomcat传入的参数是这样的

//定制版的任务队列
taskqueue = new TaskQueue(maxQueueSize);

//定制版的线程工厂
TaskThreadFactory tf = new TaskThreadFactory(namePrefix,daemon,getThreadPriority());

//定制版的线程池
executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), maxIdleTime, TimeUnit.MILLISECONDS,taskqueue, tf);

你可以看到其中的两个关键点:

  • Tomcat有自己的定制版任务队列和线程工厂并且可以限制任务队列的长度它的最大长度是maxQueueSize。
  • Tomcat对线程数也有限制设置了核心线程数minSpareThreads和最大线程池数maxThreads

除了资源限制以外Tomcat线程池还定制自己的任务处理流程。我们知道Java原生线程池的任务处理逻辑比较简单

  1. 前corePoolSize个任务时来一个任务就创建一个新线程。
  2. 后面再来任务,就把任务添加到任务队列里让所有的线程去抢,如果队列满了就创建临时线程。
  3. 如果总线程数达到maximumPoolSize执行拒绝策略。

Tomcat线程池扩展了原生的ThreadPoolExecutor通过重写execute方法实现了自己的任务处理逻辑

  1. 前corePoolSize个任务时来一个任务就创建一个新线程。
  2. 再来任务的话,就把任务添加到任务队列里让所有的线程去抢,如果队列满了就创建临时线程。
  3. 如果总线程数达到maximumPoolSize则继续尝试把任务添加到任务队列中去。
  4. 如果缓冲队列也满了,插入失败,执行拒绝策略。

观察Tomcat线程池和Java原生线程池的区别其实就是在第3步Tomcat在线程总数达到最大数时不是立即执行拒绝策略而是再尝试向任务队列添加任务添加失败后再执行拒绝策略。那具体如何实现呢其实很简单我们来看一下Tomcat线程池的execute方法的核心代码。

public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor {
  
  ...
  
  public void execute(Runnable command, long timeout, TimeUnit unit) {
      submittedCount.incrementAndGet();
      try {
          //调用Java原生线程池的execute去执行任务
          super.execute(command);
      } catch (RejectedExecutionException rx) {
         //如果总线程数达到maximumPoolSizeJava原生线程池执行拒绝策略
          if (super.getQueue() instanceof TaskQueue) {
              final TaskQueue queue = (TaskQueue)super.getQueue();
              try {
                  //继续尝试把任务放到任务队列中去
                  if (!queue.force(command, timeout, unit)) {
                      submittedCount.decrementAndGet();
                      //如果缓冲队列也满了,插入失败,执行拒绝策略。
                      throw new RejectedExecutionException("...");
                  }
              } 
          }
      }
}

从这个方法你可以看到Tomcat线程池的execute方法会调用Java原生线程池的execute去执行任务如果总线程数达到maximumPoolSizeJava原生线程池的execute方法会抛出RejectedExecutionException异常但是这个异常会被Tomcat线程池的execute方法捕获到并继续尝试把这个任务放到任务队列中去如果任务队列也满了再执行拒绝策略。

定制版的任务队列

细心的你有没有发现在Tomcat线程池的execute方法最开始有这么一行

submittedCount.incrementAndGet();

这行代码的意思把submittedCount这个原子变量加一并且在任务执行失败抛出拒绝异常时将这个原子变量减一

submittedCount.decrementAndGet();

其实Tomcat线程池是用这个变量submittedCount来维护已经提交到了线程池但是还没有执行完的任务个数。Tomcat为什么要维护这个变量呢这跟Tomcat的定制版的任务队列有关。Tomcat的任务队列TaskQueue扩展了Java中的LinkedBlockingQueue我们知道LinkedBlockingQueue默认情况下长度是没有限制的除非给它一个capacity。因此Tomcat给了它一个capacityTaskQueue的构造函数中有个整型的参数capacityTaskQueue将capacity传给父类LinkedBlockingQueue的构造函数。

public class TaskQueue extends LinkedBlockingQueue<Runnable> {

  public TaskQueue(int capacity) {
      super(capacity);
  }
  ...
}

这个capacity参数是通过Tomcat的maxQueueSize参数来设置的但问题是默认情况下maxQueueSize的值是Integer.MAX_VALUE,等于没有限制,这样就带来一个问题:当前线程数达到核心线程数之后,再来任务的话线程池会把任务添加到任务队列,并且总是会成功,这样永远不会有机会创建新线程了。

为了解决这个问题TaskQueue重写了LinkedBlockingQueue的offer方法在合适的时机返回false返回false表示任务添加失败这时线程池会创建新的线程。那什么是合适的时机呢请看下面offer方法的核心源码

public class TaskQueue extends LinkedBlockingQueue<Runnable> {

  ...
   @Override
  //线程池调用任务队列的方法时,当前线程数肯定已经大于核心线程数了
  public boolean offer(Runnable o) {

      //如果线程数已经到了最大值,不能创建新线程了,只能把任务添加到任务队列。
      if (parent.getPoolSize() == parent.getMaximumPoolSize()) 
          return super.offer(o);
          
      //执行到这里,表明当前线程数大于核心线程数,并且小于最大线程数。
      //表明是可以创建新线程的,那到底要不要创建呢?分两种情况:
      
      //1. 如果已提交的任务数小于当前线程数,表示还有空闲线程,无需创建新线程
      if (parent.getSubmittedCount()<=(parent.getPoolSize())) 
          return super.offer(o);
          
      //2. 如果已提交的任务数大于当前线程数线程不够用了返回false去创建新线程
      if (parent.getPoolSize()<parent.getMaximumPoolSize()) 
          return false;
          
      //默认情况下总是把任务添加到任务队列
      return super.offer(o);
  }
  
}

从上面的代码我们看到只有当前线程数大于核心线程数、小于最大线程数并且已提交的任务个数大于当前线程数时也就是说线程不够用了但是线程数又没达到极限才会去创建新的线程。这就是为什么Tomcat需要维护已提交任务数这个变量它的目的就是在任务队列的长度无限制的情况下,让线程池有机会创建新的线程

当然默认情况下Tomcat的任务队列是没有限制的你可以通过设置maxQueueSize参数来限制任务队列的长度。

本期精华

池化的目的是为了避免频繁地创建和销毁对象减少对系统资源的消耗。Java提供了默认的线程池实现我们也可以扩展Java原生的线程池来实现定制自己的线程池Tomcat就是这么做的。Tomcat扩展了Java线程池的核心类ThreadPoolExecutor并重写了它的execute方法定制了自己的任务处理流程。同时Tomcat还实现了定制版的任务队列重写了offer方法使得在任务队列长度无限制的情况下线程池仍然有机会创建新的线程。

课后思考

请你再仔细看看Tomcat的定制版任务队列TaskQueue的offer方法它多次调用了getPoolSize方法但是这个方法是有锁的锁会引起线程上下文切换而损耗性能请问这段代码可以如何优化呢

不知道今天的内容你消化得如何?如果还有疑问,请大胆的在留言区提问,也欢迎你把你的课后思考和心得记录下来,与我和其他同学一起讨论。如果你觉得今天有所收获,欢迎你把它分享给你的朋友。