基于事件的并发

到目前为止,我们在写并发性的时候,好像构建并发应用程序的唯一方法就是使用线程。就像生活中的许多事情一样,这并不完全正确。具体来说,基于图形用户界面的应用程序和某些类型的互联网服务器经常使用不同风格的并发编程。这种风格被称为基于事件的并发,已在一些现代系统中流行起来,包括 node.js 等服务器端框架,但其根源在于 C/UNIX 系统,我们将在下文讨论。

基于事件的并发解决了两个方面的问题。首先,在多线程应用程序中正确管理并发性是一项挑战;正如我们所讨论的,可能会出现锁丢失、死锁和其他令人讨厌的问题。其次,在多线程应用程序中,开发人员几乎无法控制特定时刻的调度;相反,程序员只需创建线程,然后寄希望于底层操作系统以合理的方式在可用 CPU 上调度这些线程。由于很难构建一个通用的调度程序,在所有情况下都能很好地处理所有工作负载,操作系统有时会以非最佳的方式调度工作。因此,我们的关键是:

我们如何在不使用线程的情况下构建并发服务器,从而保留对并发的控制并避免一些似乎困扰多线程应用程序的问题?

1 基本思想:事件循环

如上所述,我们将使用的基本方法称为基于事件的并发。这种方法非常简单:只需等待某件事情(即 “事件”)发生;当事件发生时,检查事件的类型,并完成所需的少量工作(可能包括发出 I/O 请求,或安排其他事件的未来处理等)。就是这样!

在了解细节之前,我们先来看看典型的基于事件的服务器是什么样的。此类应用程序基于一个简单的结构,即事件循环。事件循环的伪代码是这样的:

1
2
3
4
5
while (1) {
    events = getEvents();
    for (e in events)
        processEvent(e);
}

其实就是这么简单。主循环只需等待事件发生(在上面的代码中调用 getEvents()),然后对返回的每个事件逐个进行处理;处理每个事件的代码称为事件处理程序。重要的是,当处理程序处理事件时,它是系统中发生的唯一活动;因此,决定下一步处理哪个事件就相当于调度。这种对调度的显式控制是基于事件的方法的基本优势之一。

但是,上述讨论给我们留下了一个更大的问题:基于事件的服务器究竟如何确定哪些事件正在发生,尤其是在网络和磁盘 I/O 方面?具体来说,事件服务器如何判断信息是否已经到达?

2 一个重要的API:select()(或者poll()

考虑到这个基本事件循环,接下来我们必须解决如何接收事件的问题。在大多数系统中,可以通过 select()poll() 系统调用使用一个基本 API。

这些接口使程序能够简单地检查是否有任何需要处理的传入 I/O。例如,想象一下一个网络应用(比如一个 Web 服务器)希望检查是否有任何网络数据包已经到达以便对其进行服务。这些系统调用让你正好做到了这一点。以 select() 为例。手册页(在 Mac 上)描述了该 API 的方式:

1
2
3
4
5
int select(int nfds, 
           fd_set *restrict readfds, 
           fd_set *restrict writefds, 
           fd_set *restrict errorfds, 
           struct timeval *restrict timeout);

手册页中的实际描述: select() 检查其地址在 readfdswritefdserrorfds中传递的 I/O 描述符集,以查看其中的某些描述符是否已准备好读取、准备好写入或有异常条件待定。检查每个集合中的第一个 nfds 描述符,即检查描述符集合中从 0nfds-1 的描述符。返回时,select() 将给定的描述符集替换为由已准备好执行请求的操作的描述符组成的子集。 select() 返回所有集合中就绪描述符的总数。

关于 select() 的几点。首先,请注意,它可以让您检查描述符是否可以读取和写入;前者让服务器确定新数据包已到达并需要处理,而后者让服务知道何时可以回复(即出站队列未满)。

其次,注意超时参数。这里的一种常见用法是将超时设置为 NULL,这会导致 select() 无限期地阻塞,直到某个描述符准备就绪。然而,更强大的服务器通常会指定某种超时;一种常见的技术是将超时设置为零,从而使用对 select() 的调用来立即返回。

poll() 系统调用非常相似。有关详细信息,请参阅其手册页。不管怎样,这些基本原语为我们提供了一种构建非阻塞事件循环的方法,它只需检查传入的数据包,从套接字中读取消息,并根据需要进行回复。

3 使用select()

为了更具体地说明这一点,我们来看看如何使用 select() 查看哪些网络描述符上有传入的报文。下面这段代码显示了一个简单的示例。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
#include <stdio.h>
#include <stdlib.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>

int main(void) {
    // open and set up a bunch of sockets (not shown)
    // main loop
    while (1) {
        // initialize the fd_set to all zero
        fd_set readFDs;
        FD_ZERO(&readFDs);

        // now set the bits for the descriptors
        // this server is interested in
        // (for simplicity, all of them from min to max)
        int fd;
        for (fd = minFD; fd < maxFD; fd++)
            FD_SET(fd, &readFDs);

        // do the select
        int rc = select(maxFD+1, &readFDs, NULL, NULL, NULL);

        // check which actually have data using FD_ISSET()
        for (fd = minFD; fd < maxFD; fd++)
            if (FD_ISSET(fd, &readFDs))
                processFD(fd);
    }
}

这段代码其实相当简单易懂。在初始化之后,服务器进入一个无限循环。在循环内部,它首先使用 FD_ZERO() 宏清除文件描述符集,然后使用 FD_SET()minFDmaxFD 的所有文件描述符都包含在文件描述符集中。例如,这组描述符可能代表服务器正在关注的所有网络套接字。最后,服务器调用 select() 查看哪些连接上有可用数据。然后,在一个循环中使用 FD_ISSET(),事件服务器就能看到哪些描述符已准备好数据,并处理传入的数据。

当然,真正的服务器要比这复杂得多,需要在发送消息、发出磁盘 I/O 和许多其他细节时使用逻辑。

4 为什么更简单?不需要锁

有了单 CPU 和基于事件的应用程序,并发程序中的问题就不复存在了。具体来说,由于一次只处理一个事件,因此无需获取或释放锁;基于事件的服务器不会被其他线程中断,因为它是绝对的单线程。因此,线程程序中常见的并发问题在基于事件的基本方法中并不存在。

TIP:不要阻塞基于事件的服务器

基于事件的服务器可以对任务调度进行细粒度的控制。然而,为了维持这种控制,不能进行任何阻止调用者执行的调用;不遵守此设计技巧将导致基于事件的服务器被阻塞,会发生一系列的严重问题。

5 问题:阻塞系统调用

到目前为止,基于事件的编程听起来不错,对吗?你只需编写一个简单的循环,然后在事件发生时进行处理。你甚至不需要考虑锁定问题!但是有一个问题:如果一个事件要求你发出一个可能会阻塞的系统调用怎么办?

例如,假设客户端向服务器发出请求,要求从磁盘读取文件,并将文件内容返回给请求客户端(就像简单的 HTTP 请求一样)。要处理这样的请求,某个事件处理程序最终必须发出 open() 系统调用来打开文件,然后再发出一系列 read() 调用来读取文件。当文件被读入内存后,服务器可能会开始向客户端发送结果。

open()read() 调用都可能向存储系统发出 I/O 请求(当所需的元数据或数据不在内存中时),因此可能需要很长时间才能提供服务。对于基于线程的服务器来说,这不是问题:当发出 I/O 请求的线程暂停(等待 I/O 完成)时,其他线程可以运行,从而使服务器取得进展。事实上,I/O 和其他计算的这种自然重叠正是基于线程的编程非常自然和简单的原因。

然而,在基于事件的方法中,没有其他线程可以运行:只有主事件循环。这就意味着,如果事件处理程序发出的调用阻塞,整个服务器就会这样做:阻塞,直到调用完成。当事件循环阻塞时,系统就会处于闲置状态,从而造成巨大的潜在资源浪费。因此,在基于事件的系统中,我们有一条必须遵守的规则:不允许阻塞调用。

6 解决方案:异步 I/O

为了克服这一限制,许多现代操作系统引入了向磁盘系统发出 I/O 请求的新方法,一般称为异步 I/O。这些接口使应用程序能够发出 I/O 请求,并在 I/O 完成之前立即将控制权返回给调用者;其他接口使应用程序能够确定各种 I/O 是否已完成。

例如,让我们看看 Mac 提供的接口(其他系统也有类似的 API)。应用程序接口围绕着一个基本结构,即struct aiocbAIO 控制块(常用术语)。该结构的简化版本如下(更多信息请参阅手册):

1
2
3
4
5
6
struct aiocb {
    int aio_fildes; /* File descriptor */
    off_t aio_offset; /* File offset */
    volatile void *aio_buf; /* Location of buffer */
    size_t aio_nbytes; /* Length of transfer */
};

要对文件进行异步读取,应用程序应首先在此结构中填入相关信息:要读取文件的文件描述符(aio_fildes)、文件中的偏移量(aio_offset)以及请求的长度(aio_nbytes),最后是读取结果应复制到的目标内存位置(aio_buf)。

填入此结构后,应用程序必须发出异步调用来读取文件;在 Mac 上,此 API 只是异步读取 API:

1
int aio_read(struct aiocb *aiocbp);

该调用会尝试发出 I/O;如果成功,它就会立即返回,应用程序(即基于事件的服务器)可以继续工作。

不过,我们还必须解决最后一个难题。我们如何判断 I/O 是否已完成,从而确定缓冲区(aio_buf 指向的缓冲区)中已包含所请求的数据?这就需要最后一个 API。在 Mac 上,它被称为 aio_error()。该 API 如下所示:

1
int aio_error(const struct aiocb *aiocbp);

该系统调用检查 aiocbp 引用的请求是否已完成。如果是,则例程返回成功(用零表示);如果不是,则返回 EINPROGRESS。因此,对于每个未完成的异步 I/O,应用程序可以通过调用 aio_error() 定期轮询系统,以确定所述 I/O 是否尚未完成

您可能已经注意到的一件事是检查 I/O 是否已完成是一件很痛苦的事情;如果一个程序在给定时间点发出了数十或数百个 I/O,它是否应该简单地重复检查每个 I/O,或者先等待一会儿,或者……?

为了解决这个问题,一些系统提供了一种基于中断的方法。此方法使用 UNIX 信号来通知应用程序异步 I/O 何时完成,从而无需重复询问系统。

在没有异步 I/O 的系统中,纯粹基于事件的方法无法实现。不过,聪明的研究人员已经推导出了能很好代替它们的方法。例如,Pai 等人 描述了一种混合方法,其中事件用于处理网络数据包,线程池用于管理未完成的 I/O。详情请阅读他们的论文

UNIX 信号

在所有现代 UNIX 变体中,都有一个被称为信号的庞大而迷人的基础架构。最简单来说,信号提供了一种与进程通信的方式。具体来说,信号可以传递给应用程序;传递信号时,应用程序会停止正在进行的任何操作,以运行信号处理程序(即应用程序中处理该信号的代码)。处理完成后,进程将恢复之前的行为。

每个信号都有一个名称,如 HUP(挂起)、INT(中断)、SEGV(分段违规)等;详情请查看手册页面:man signal。有趣的是,有时内核本身也会发出信号。例如,当你的程序遇到分段违规时,操作系统会向其发送 SIGSEGV(在信号名称前加上 SIG 是很常见的);如果你的程序被配置为捕获该信号,你实际上可以运行一些代码来响应这种错误的程序行为(这对调试很有用)。当一个信号被发送到一个未配置为处理该信号的进程时,一些默认行为将被执行;对于 SEGV,该进程将被杀死。

下面是一个进入无限循环的简单程序,但它首先设置了一个信号处理程序来捕获 SIGHUP

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
#include <stdio.h>
#include <signal.h>

void handle(int arg) {
    printf("stop wakin’ me up...\n");
}

int main(int argc, char *argv[]) {
    signal(SIGHUP, handle);
    while (1)
        ; // doin’ nothin’ except catchin’ some sigs
    return 0;
}

你可以使用 kill 命令行工具向它发送信号。这样做会中断程序中的主 while 循环,并运行处理程序代码 handle()

1
2
3
4
❯ ./loop_signal&
[1] 66420
kill -HUP 66420
stop wakin’ me up...

7 另一个问题:状态管理

基于事件的方法的另一个问题是,这类代码的编写通常比传统的基于线程的代码复杂。原因如下:当一个事件处理程序发出异步 I/O 时,它必须打包一些程序状态,供下一个事件处理程序在 I/O 最终完成时使用;而在基于线程的程序中不需要这项额外工作,因为程序所需的状态在线程的栈中。Adya 等人将这项工作称为人工栈管理,它是基于事件的编程的基础。

为了更具体地说明这一点,让我们来看一个简单的例子:基于线程的服务器需要从文件描述符(fd)中读取数据,并在完成后将从文件中读取的数据写入网络套接字描述符(sd)。代码(忽略错误检查)如下:

1
2
int rc = read(fd, buffer, size);
rc = write(sd, buffer, size);

正如你所看到的,在多线程程序中,做这样的工作是轻而易举的;当 read() 最终返回时,代码会立即知道要写入哪个套接字,因为该信息就在线程的堆栈中(在变量 sd 中)。

而在基于事件的系统中,情况就没那么简单了。要执行同样的任务,我们首先要使用上述 AIO 调用异步发出读取指令。假设我们使用 aio_error() 调用定期检查读取是否完成;当该调用通知我们读取完成时,基于事件的服务器如何知道该做什么?

正如 Adya 等人所描述的那样,解决办法是使用一种古老的编程语言结构,即 continuation。虽然听起来很复杂,但其实想法很简单:基本上,在某个数据结构中记录完成处理该事件所需的信息;当事件发生时(即磁盘 I/O 完成时),查找所需的信息并处理该事件。

在这种特殊情况下,解决方案是在某种数据结构(如哈希表)中记录套接字描述符 (sd),并以文件描述符 (fd) 为索引。磁盘 I/O 完成后,事件处理程序将使用文件描述符查找continuation,并将套接字描述符的值返回给调用者。此时(最后),服务器就可以进行最后的工作,将数据写入套接字。

8 事件仍有哪些困难?

基于事件的方法还有一些其他困难需要提及。例如,当系统从单核 CPU 转向多核 CPU 时,基于事件的方法的一些简便性就消失了。具体来说,为了利用一个以上的 CPU,事件服务器必须并行运行多个事件处理程序;这样做时,通常的同步问题(如临界区)就会出现,必须采用通常的解决方案(如锁)。因此,在现代多核系统上,不加锁的简单事件处理已不再可能。

基于事件的方法的另一个问题是,它不能很好地与分页等某些类型的系统活动集成。例如,如果事件处理程序发生页面故障,它就会阻塞,因此服务器在页面故障完成之前不会取得进展。尽管服务器在结构上已经避免了显式阻塞,但页面故障导致的这种隐式阻塞是难以避免的,因此在普遍存在时会导致严重的性能问题。

第三个问题是,随着时间的推移,基于事件的代码可能难以管理,因为各种例程的确切语义会发生变化。例如,如果一个例程从非阻塞变为阻塞,那么调用该例程的事件处理程序也必须随之改变,以适应其新的性质,即把自己撕成两半。由于阻塞对于基于事件的服务器来说是灾难性的,因此程序员必须时刻注意每个事件所使用的应用程序接口在语义上的变化。

最后,虽然异步磁盘 I/O 现在已经可以在大多数平台上实现,但它需要很长的时间才能实现,而且它与异步网络 I/O 的集成方式也不像你想象的那么简单和统一。例如,虽然我们希望使用 select() 接口来管理所有未完成的 I/O,但通常需要将用于网络的 select() 和用于磁盘 I/O 的 AIO 调用结合起来使用。


相关内容

Buy me a coffee~
HeZephyr 支付宝支付宝
HeZephyr 微信微信
0%