You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

13 KiB

14 | NioEndpoint组件Tomcat如何实现非阻塞I/O

UNIX系统下的I/O模型有5种同步阻塞I/O、同步非阻塞I/O、I/O多路复用、信号驱动I/O和异步I/O。这些名词我们好像都似曾相识但这些I/O通信模型有什么区别同步和阻塞似乎是一回事到底有什么不同等一下在这之前你是不是应该问自己一个终极问题什么是I/O为什么需要这些I/O模型

所谓的I/O就是计算机内存与外部设备之间拷贝数据的过程。我们知道CPU访问内存的速度远远高于外部设备因此CPU是先把外部设备的数据读到内存里然后再进行处理。请考虑一下这个场景当你的程序通过CPU向外部设备发出一个读指令时数据从外部设备拷贝到内存往往需要一段时间这个时候CPU没事干了你的程序是主动把CPU让给别人还是让CPU不停地查数据到了吗数据到了吗……

这就是I/O模型要解决的问题。今天我会先说说各种I/O模型的区别然后重点分析Tomcat的NioEndpoint组件是如何实现非阻塞I/O模型的。

Java I/O模型

对于一个网络I/O通信过程比如网络数据读取会涉及两个对象一个是调用这个I/O操作的用户线程另外一个就是操作系统内核。一个进程的地址空间分为用户空间和内核空间用户线程不能直接访问内核空间。

当用户线程发起I/O操作后网络数据读取操作会经历两个步骤

  • 用户线程等待内核将数据从网卡拷贝到内核空间。
  • 内核将数据从内核空间拷贝到用户空间。

各种I/O模型的区别就是它们实现这两个步骤的方式是不一样的。

同步阻塞I/O用户线程发起read调用后就阻塞了让出CPU。内核等待网卡数据到来把数据从网卡拷贝到内核空间接着把数据拷贝到用户空间再把用户线程叫醒。

同步非阻塞I/O用户线程不断的发起read调用数据没到内核空间时每次都返回失败直到数据到了内核空间这一次read调用后在等待数据从内核空间拷贝到用户空间这段时间里线程还是阻塞的等数据到了用户空间再把线程叫醒。

I/O多路复用用户线程的读取操作分成两步了线程先发起select调用目的是问内核数据准备好了吗等内核把数据准备好了用户线程再发起read调用。在等待数据从内核空间拷贝到用户空间这段时间里线程还是阻塞的。那为什么叫I/O多路复用呢因为一次select调用可以向内核查多个数据通道Channel的状态所以叫多路复用。

异步I/O用户线程发起read调用的同时注册一个回调函数read立即返回等内核将数据准备好后再调用指定的回调函数完成处理。在这个过程中用户线程一直没有阻塞。

NioEndpoint组件

Tomcat的NioEndpoint组件实现了I/O多路复用模型接下来我会介绍NioEndpoint的实现原理下一期我会介绍Tomcat如何实现异步I/O模型。

总体工作流程

我们知道对于Java的多路复用器的使用无非是两步

  1. 创建一个Selector在它身上注册各种感兴趣的事件然后调用select方法等待感兴趣的事情发生。

  2. 感兴趣的事情发生了比如可以读了这时便创建一个新的线程从Channel中读数据。

Tomcat的NioEndpoint组件虽然实现比较复杂但基本原理就是上面两步。我们先来看看它有哪些组件它一共包含LimitLatch、Acceptor、Poller、SocketProcessor和Executor共5个组件它们的工作过程如下图所示。

LimitLatch是连接控制器它负责控制最大连接数NIO模式下默认是10000达到这个阈值后连接请求被拒绝。

Acceptor跑在一个单独的线程里它在一个死循环里调用accept方法来接收新连接一旦有新的连接请求到来accept方法返回一个Channel对象接着把Channel对象交给Poller去处理。

Poller的本质是一个Selector也跑在单独线程里。Poller在内部维护一个Channel数组它在一个死循环里不断检测Channel的数据就绪状态一旦有Channel可读就生成一个SocketProcessor任务对象扔给Executor去处理。

Executor就是线程池负责运行SocketProcessor任务类SocketProcessor的run方法会调用Http11Processor来读取和解析请求数据。我们知道Http11Processor是应用层协议的封装它会调用容器获得响应再把响应通过Channel写出。

接下来我详细介绍一下各组件的设计特点。

LimitLatch

LimitLatch用来控制连接个数当连接数到达最大时阻塞线程直到后续组件处理完一个连接后将连接数减1。请你注意到达最大连接数后操作系统底层还是会接收客户端连接但用户层已经不再接收。LimitLatch的核心代码如下

public class LimitLatch {
    private class Sync extends AbstractQueuedSynchronizer {
     
        @Override
        protected int tryAcquireShared() {
            long newCount = count.incrementAndGet();
            if (newCount > limit) {
                count.decrementAndGet();
                return -1;
            } else {
                return 1;
            }
        }

        @Override
        protected boolean tryReleaseShared(int arg) {
            count.decrementAndGet();
            return true;
        }
    }

    private final Sync sync;
    private final AtomicLong count;
    private volatile long limit;
    
    //线程调用这个方法来获得接收新连接的许可,线程可能被阻塞
    public void countUpOrAwait() throws InterruptedException {
      sync.acquireSharedInterruptibly(1);
    }

    //调用这个方法来释放一个连接许可,那么前面阻塞的线程可能被唤醒
    public long countDown() {
      sync.releaseShared(0);
      long result = getCount();
      return result;
   }
}

从上面的代码我们看到LimitLatch内步定义了内部类Sync而Sync扩展了AQSAQS是Java并发包中的一个核心类它在内部维护一个状态和一个线程队列可以用来控制线程什么时候挂起,什么时候唤醒。我们可以扩展它来实现自己的同步器实际上Java并发包里的锁和条件变量等等都是通过AQS来实现的而这里的LimitLatch也不例外。

理解上面的代码时有两个要点:

  1. 用户线程通过调用LimitLatch的countUpOrAwait方法来拿到锁如果暂时无法获取这个线程会被阻塞到AQS的队列中。那AQS怎么知道是阻塞还是不阻塞用户线程呢其实这是由AQS的使用者来决定的也就是内部类Sync来决定的因为Sync类重写了AQS的tryAcquireShared()方法。它的实现逻辑是如果当前连接数count小于limit线程能获取锁返回1否则返回-1。

  2. 如何用户线程被阻塞到了AQS的队列那什么时候唤醒呢同样是由Sync内部类决定Sync重写了AQS的tryReleaseShared()方法,其实就是当一个连接请求处理完了,这时又可以接收一个新连接了,这样前面阻塞的线程将会被唤醒。

其实你会发现AQS就是一个骨架抽象类它帮我们搭了个架子用来控制线程的阻塞和唤醒。具体什么时候阻塞、什么时候唤醒由你来决定。我们还注意到当前线程数被定义成原子变量AtomicLong而limit变量用volatile关键字来修饰这些并发编程的实际运用。

Acceptor

Acceptor实现了Runnable接口因此可以跑在单独线程里。一个端口号只能对应一个ServerSocketChannel因此这个ServerSocketChannel是在多个Acceptor线程之间共享的它是Endpoint的属性由Endpoint完成初始化和端口绑定。初始化过程如下

serverSock = ServerSocketChannel.open();
serverSock.socket().bind(addr,getAcceptCount());
serverSock.configureBlocking(true);

从上面的初始化代码我们可以看到两个关键信息:

  1. bind方法的第二个参数表示操作系统的等待队列长度我在上面提到当应用层面的连接数到达最大值时操作系统可以继续接收连接那么操作系统能继续接收的最大连接数就是这个队列长度可以通过acceptCount参数配置默认是100。

  2. ServerSocketChannel被设置成阻塞模式也就是说它是以阻塞的方式接收连接的。

ServerSocketChannel通过accept()接受新的连接accept()方法返回获得SocketChannel对象然后将SocketChannel对象封装在一个PollerEvent对象中并将PollerEvent对象压入Poller的Queue里这是个典型的“生产者-消费者”模式Acceptor与Poller线程之间通过Queue通信。

Poller

Poller本质是一个Selector它内部维护一个Queue这个Queue定义如下

private final SynchronizedQueue<PollerEvent> events = new SynchronizedQueue<>();

SynchronizedQueue的方法比如offer、poll、size和clear方法都使用了synchronized关键字进行修饰用来保证同一时刻只有一个Acceptor线程对Queue进行读写。同时有多个Poller线程在运行每个Poller线程都有自己的Queue。每个Poller线程可能同时被多个Acceptor线程调用来注册PollerEvent。同样Poller的个数可以通过pollers参数配置。

Poller不断的通过内部的Selector对象向内核查询Channel的状态一旦可读就生成任务类SocketProcessor交给Executor去处理。Poller的另一个重要任务是循环遍历检查自己所管理的SocketChannel是否已经超时如果有超时就关闭这个SocketChannel。

SocketProcessor

我们知道Poller会创建SocketProcessor任务类交给线程池处理而SocketProcessor实现了Runnable接口用来定义Executor中线程所执行的任务主要就是调用Http11Processor组件来处理请求。Http11Processor读取Channel的数据来生成ServletRequest对象这里请你注意

Http11Processor并不是直接读取Channel的。这是因为Tomcat支持同步非阻塞I/O模型和异步I/O模型在Java API中相应的Channel类也是不一样的比如有AsynchronousSocketChannel和SocketChannel为了对Http11Processor屏蔽这些差异Tomcat设计了一个包装类叫作SocketWrapperHttp11Processor只调用SocketWrapper的方法去读写数据。

Executor

Executor是Tomcat定制版的线程池它负责创建真正干活的工作线程干什么活呢就是执行SocketProcessor的run方法也就是解析请求并通过容器来处理请求最终会调用到我们的Servlet。后面我会用专门的篇幅介绍Tomcat怎么扩展和使用Java原生的线程池。

高并发思路

在弄清楚NioEndpoint的实现原理后我们来考虑一个重要的问题怎么把这个过程做到高并发呢

高并发就是能快速地处理大量的请求需要合理设计线程模型让CPU忙起来尽量不要让线程阻塞因为一阻塞CPU就闲下来了。另外就是有多少任务就用相应规模的线程数去处理。我们注意到NioEndpoint要完成三件事情接收连接、检测I/O事件以及处理请求那么最核心的就是把这三件事情分开用不同规模的线程数去处理比如用专门的线程组去跑Acceptor并且Acceptor的个数可以配置用专门的线程组去跑PollerPoller的个数也可以配置最后具体任务的执行也由专门的线程池来处理也可以配置线程池的大小。

本期精华

I/O模型是为了解决内存和外部设备速度差异的问题。我们平时说的阻塞或非阻塞是指应用程序在发起I/O操作时是立即返回还是等待。而同步和异步,是指应用程序在与内核通信时,数据从内核空间到应用空间的拷贝,是由内核主动发起还是由应用程序来触发。

在Tomcat中Endpoint组件的主要工作就是处理I/O而NioEndpoint利用Java NIO API实现了多路复用I/O模型。其中关键的一点是读写数据的线程自己不会阻塞在I/O等待上而是把这个工作交给Selector。同时Tomcat在这个过程中运用到了很多Java并发编程技术比如AQS、原子类、并发容器线程池等都值得我们去细细品味。

课后思考

Tomcat的NioEndpoint组件的名字中有NIONIO是非阻塞的意思似乎说的是同步非阻塞I/O模型但是NioEndpoint又是调用Java的的Selector来实现的我们知道Selector指的是I/O多路复用器也就是我们说的I/O多路复用模型这不是矛盾了吗

不知道今天的内容你消化得如何?如果还有疑问,请大胆的在留言区提问,也欢迎你把你的课后思考和心得记录下来,与我和其他同学一起讨论。如果你觉得今天有所收获,欢迎你把它分享给你的朋友。