You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

15 KiB

22 | 非阻塞I/O提升性能的加速器

你好我是盛延敏这里是网络编程实战第22讲欢迎回来。

在性能篇的前两讲中我分别介绍了select和poll两种不同的I/O多路复用技术。在接下来的这一讲中我将带大家进入非阻塞I/O模式的世界。事实上非阻塞I/O配合I/O多路复用是高性能网络编程中的常见技术。

阻塞 VS 非阻塞

当应用程序调用阻塞I/O完成某个操作时应用程序会被挂起等待内核完成操作感觉上应用程序像是被“阻塞”了一样。实际上内核所做的事情是将CPU时间切换给其他有需要的进程网络应用程序在这种情况下就会得不到CPU时间做该做的事情。

非阻塞I/O则不然当应用程序调用非阻塞I/O完成某个操作时内核立即返回不会把CPU时间切换给其他进程应用程序在返回后可以得到足够的CPU时间继续完成其他事情。

如果拿去书店买书举例子阻塞I/O对应什么场景呢 你去了书店,告诉老板(内核)你想要某本书,然后你就一直在那里等着,直到书店老板翻箱倒柜找到你想要的书,有可能还要帮你联系全城其它分店。注意,这个过程中你一直滞留在书店等待老板的回复,好像在书店老板这里"阻塞"住了。

那么非阻塞I/O呢你去了书店问老板有没你心仪的那本书老板查了下电脑告诉你没有你就悻悻离开了。一周以后你又来这个书店再问这个老板老板一查有了于是你买了这本书。注意这个过程中你没有被阻塞而是在不断轮询。

但轮询的效率太低了于是你向老板提议“老板到货给我打电话吧我再来付钱取书。”这就是前面讲到的I/O多路复用。

再进一步你连去书店取书也想省了得了让老板代劳吧你留下地址付了书费让老板到货时寄给你你直接在家里拿到就可以看了。这就是我们将会在第30讲中讲到的异步I/O。

这几个I/O模型再加上进程、线程模型构成了整个网络编程的知识核心。

按照使用场景非阻塞I/O可以被用到读操作、写操作、接收连接操作和发起连接操作上。接下来我们对它们一一解读。

非阻塞I/O

读操作

如果套接字对应的接收缓冲区没有数据可读在非阻塞情况下read调用会立即返回一般返回EWOULDBLOCK或EAGAIN出错信息。在这种情况下出错信息是需要小心处理比如后面再次调用read操作而不是直接作为错误直接返回。这就好像去书店买书没买到离开一样需要不断进行又一次轮询处理。

写操作

不知道你有没有注意到在阻塞I/O情况下write函数返回的字节数和输入的参数总是一样的。如果返回值总是和输入的数据大小一样write等写入函数还需要定义返回值吗我不知道你是不是和我一样刚接触到这一部分知识的时候有这种困惑。

这里就要引出我们所说的非阻塞I/O。在非阻塞I/O的情况下如果套接字的发送缓冲区已达到了极限不能容纳更多的字节那么操作系统内核会尽最大可能从应用程序拷贝数据到发送缓冲区中并立即从write等函数调用中返回。可想而知在拷贝动作发生的瞬间有可能一个字符也没拷贝有可能所有请求字符都被拷贝完成那么这个时候就需要返回一个数值告诉应用程序到底有多少数据被成功拷贝到了发送缓冲区中应用程序需要再次调用write函数以输出未完成拷贝的字节。

write等函数是可以同时作用到阻塞I/O和非阻塞I/O上的为了复用一个函数处理非阻塞和阻塞I/O多种情况设计出了写入返回值并用这个返回值表示实际写入的数据大小。

也就是说非阻塞I/O和阻塞I/O处理的方式是不一样的。

非阻塞I/O需要这样拷贝→返回→再拷贝→再返回。

而阻塞I/O需要这样拷贝→直到所有数据拷贝至发送缓冲区完成→返回。

不过在实战中你可以不用区别阻塞和非阻塞I/O使用循环的方式来写入数据就好了。只不过在阻塞I/O的情况下循环只执行一次就结束了。

我在前面的章节中已经介绍了类似的方案你可以看到writen函数的实现。

/* 向文件描述符fd写入n字节数 */
ssize_t writen(int fd, const void * data, size_t n)
{
    size_t      nleft;
    ssize_t     nwritten;
    const char  *ptr;

    ptr = data;
    nleft = n;
    //如果还有数据没被拷贝完成,就一直循环
    while (nleft > 0) {
        if ( (nwritten = write(fd, ptr, nleft)) <= 0) {
           /* 这里EAGAIN是非阻塞non-blocking情况下通知我们再次调用write() */
            if (nwritten < 0 && errno == EAGAIN)
                nwritten = 0;      
            else
                return -1;         /* 出错退出 */
        }

        /* 指针增大,剩下字节数变小*/
        nleft -= nwritten;
        ptr   += nwritten;
    }
    return n;
}

下面我通过一张表来总结一下read和write在阻塞模式和非阻塞模式下的不同行为特性


关于read和write还有几个结论你需要把握住

  1. read总是在接收缓冲区有数据时就立即返回不是等到应用程序给定的数据充满才返回。当接收缓冲区为空时阻塞模式会等待非阻塞模式立即返回-1并有EWOULDBLOCK或EAGAIN错误。
  2. 和read不同阻塞模式下write只有在发送缓冲区足以容纳应用程序的输出字节时才返回而非阻塞模式下则是能写入多少就写入多少并返回实际写入的字节数。
  3. 阻塞模式下的write有个特例, 就是对方主动关闭了套接字这个时候write调用会立即返回并通过返回值告诉应用程序实际写入的字节数如果再次对这样的套接字进行write操作就会返回失败。失败是通过返回值-1来通知到应用程序的。

accept

当accept和I/O多路复用select、poll等一起配合使用时如果在监听套接字上触发事件说明有连接建立完成此时调用accept肯定可以返回已连接套接字。这样看来似乎把监听套接字设置为非阻塞没有任何好处。

为了说明这个问题我们构建一个客户端程序其中最关键的是一旦连接建立设置SO_LINGER套接字选项把l_onoff标志设置为1把l_linger时间设置为0。这样连接被关闭时TCP套接字上将会发送一个RST。

struct linger ling;
ling.l_onoff = 1; 
ling.l_linger = 0;
setsockopt(socket_fd, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling));
close(socket_fd);

服务器端使用select I/O多路复用不过监听套接字仍然是blocking的。如果监听套接字上有事件发生休眠5秒以便模拟高并发场景下的情形。

if (FD_ISSET(listen_fd, &readset)) {
    printf("listening socket readable\n");
    sleep(5);
    struct sockaddr_storage ss;
    socklen_t slen = sizeof(ss);
    int fd = accept(listen_fd, (struct sockaddr *) &ss, &slen);

这里的休眠时间非常关键这样在监听套接字上有可读事件发生时并没有马上调用accept。由于客户端发生了RST分节该连接被接收端内核从自己的已完成队列中删除了此时再调用accept由于没有已完成连接假设没有其他已完成连接accept一直阻塞更为严重的是该线程再也没有机会对其他I/O事件进行分发相当于该服务器无法对其他I/O进行服务。

如果我们将监听套接字设为非阻塞上述的情形就不会再发生。只不过对于accept的返回值需要正确地处理各种看似异常的错误例如忽略EWOULDBLOCK、EAGAIN等。

这个例子给我们的启发是一定要将监听套接字设置为非阻塞的尽管这里休眠时间5秒有点夸张但是在极端情况下处理不当的服务器程序是有可能碰到例子所阐述的情况为了让服务器程序在极端情况下工作正常这点工作还是非常值得的。

connect

在非阻塞TCP套接字上调用connect函数会立即返回一个EINPROGRESS错误。TCP三次握手会正常进行应用程序可以继续做其他初始化的事情。当该连接建立成功或者失败时通过I/O多路复用select、poll等可以进行连接的状态检测。

非阻塞I/O + select多路复用

我在这里给出了一个非阻塞I/O搭配select多路复用的例子。

#define MAX_LINE 1024
#define FD_INIT_SIZE 128

char rot13_char(char c) {
    if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M'))
        return c + 13;
    else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z'))
        return c - 13;
    else
        return c;
}

//数据缓冲区
struct Buffer {
    int connect_fd;  //连接字
    char buffer[MAX_LINE];  //实际缓冲
    size_t writeIndex;      //缓冲写入位置
    size_t readIndex;       //缓冲读取位置
    int readable;           //是否可以读
};

struct Buffer *alloc_Buffer() {
    struct Buffer *buffer = malloc(sizeof(struct Buffer));
    if (!buffer)
        return NULL;
    buffer->connect_fd = 0;
    buffer->writeIndex = buffer->readIndex = buffer->readable = 0;
    return buffer;
}

void free_Buffer(struct Buffer *buffer) {
    free(buffer);
}

int onSocketRead(int fd, struct Buffer *buffer) {
    char buf[1024];
    int i;
    ssize_t result;
    while (1) {
        result = recv(fd, buf, sizeof(buf), 0);
        if (result <= 0)
            break;

        for (i = 0; i < result; ++i) {
            if (buffer->writeIndex < sizeof(buffer->buffer))
                buffer->buffer[buffer->writeIndex++] = rot13_char(buf[i]);
            if (buf[i] == '\n') {
                buffer->readable = 1;  //缓冲区可以读
            }
        }
    }

    if (result == 0) {
        return 1;
    } else if (result < 0) {
        if (errno == EAGAIN)
            return 0;
        return -1;
    }

    return 0;
}

int onSocketWrite(int fd, struct Buffer *buffer) {
    while (buffer->readIndex < buffer->writeIndex) {
        ssize_t result = send(fd, buffer->buffer + buffer->readIndex, buffer->writeIndex - buffer->readIndex, 0);
        if (result < 0) {
            if (errno == EAGAIN)
                return 0;
            return -1;
        }

        buffer->readIndex += result;
    }

    if (buffer->readIndex == buffer->writeIndex)
        buffer->readIndex = buffer->writeIndex = 0;

    buffer->readable = 0;

    return 0;
}

int main(int argc, char **argv) {
    int listen_fd;
    int i, maxfd;

    struct Buffer *buffer[FD_INIT_SIZE];
    for (i = 0; i < FD_INIT_SIZE; ++i) {
        buffer[i] = alloc_Buffer();
    }

    listen_fd = tcp_nonblocking_server_listen(SERV_PORT);

    fd_set readset, writeset, exset;
    FD_ZERO(&readset);
    FD_ZERO(&writeset);
    FD_ZERO(&exset);

    while (1) {
        maxfd = listen_fd;

        FD_ZERO(&readset);
        FD_ZERO(&writeset);
        FD_ZERO(&exset);

        // listener加入readset
        FD_SET(listen_fd, &readset);

        for (i = 0; i < FD_INIT_SIZE; ++i) {
            if (buffer[i]->connect_fd > 0) {
                if (buffer[i]->connect_fd > maxfd)
                    maxfd = buffer[i]->connect_fd;
                FD_SET(buffer[i]->connect_fd, &readset);
                if (buffer[i]->readable) {
                    FD_SET(buffer[i]->connect_fd, &writeset);
                }
            }
        }

        if (select(maxfd + 1, &readset, &writeset, &exset, NULL) < 0) {
            error(1, errno, "select error");
        }

        if (FD_ISSET(listen_fd, &readset)) {
            printf("listening socket readable\n");
            sleep(5);
            struct sockaddr_storage ss;
            socklen_t slen = sizeof(ss);
           int fd = accept(listen_fd, (struct sockaddr *) &ss, &slen);
            if (fd < 0) {
                error(1, errno, "accept failed");
            } else if (fd > FD_INIT_SIZE) {
                error(1, 0, "too many connections");
                close(fd);
            } else {
                make_nonblocking(fd);
                if (buffer[fd]->connect_fd == 0) {
                    buffer[fd]->connect_fd = fd;
                } else {
                    error(1, 0, "too many connections");
                }
            }
        }

        for (i = 0; i < maxfd + 1; ++i) {
            int r = 0;
            if (i == listen_fd)
                continue;

            if (FD_ISSET(i, &readset)) {
                r = onSocketRead(i, buffer[i]);
            }
            if (r == 0 && FD_ISSET(i, &writeset)) {
                r = onSocketWrite(i, buffer[i]);
            }
            if (r) {
                buffer[i]->connect_fd = 0;
                close(i);
            }
        }
    }
}

第93行调用fcntl将监听套接字设置为非阻塞。

fcntl(fd, F_SETFL, O_NONBLOCK);

第121行调用select进行I/O事件分发处理。

131-142行在处理新的连接套接字注意这里也把连接套接字设置为非阻塞的。

151-156行在处理连接套接字上的I/O读写事件这里我们抽象了一个Buffer对象Buffer对象使用了readIndex和writeIndex分别表示当前缓冲的读写位置。

实验

启动该服务器:

$./nonblockingserver

使用多个telnet客户端连接该服务器可以验证交互正常。

$telnet 127.0.0.1 43211
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
fasfasfasf
snfsnfsnfs

总结

非阻塞I/O可以使用在read、write、accept、connect等多种不同的场景在非阻塞I/O下使用轮询的方式引起CPU占用率高所以一般将非阻塞I/O和I/O多路复用技术select、poll等搭配使用在非阻塞I/O事件发生时再调用对应事件的处理函数。这种方式极大地提高了程序的健壮性和稳定性是Linux下高性能网络编程的首选。

思考题

给你布置两道思考题:

第一道程序中第133行这个判断说明了什么如果要改进的话你有什么想法

else if (fd > FD_INIT_SIZE) {
    error(1, 0, "too many connections");
    close(fd);

第二道你可以仔细阅读一下数据读写部分Buffer的代码你觉得用一个Buffer对象而不是两个的目的是什么

欢迎在评论区写下你的思考,我会和你一起交流,也欢迎把这篇文章分享给你的朋友或者同事,一起交流一下。