介绍

Reactor 模式是一种非阻塞 IO(non-blocking IO) + IO 复用(IO multiplexing)模型, 程序的基本结构是一个事件循环(event loop),以事件驱动(event-driven)和事件回调的方式实现业务逻辑。

使用这种模式需要转换思维模式,把原来 主动调用 recv()来接收数据,主动调用 accept() 来接收新连接,主动调用 send()发送数据 的思路转换成 注册一个收数据的回调,收到数 据就会回调注册的函数;注册一个接收连接的回调,接收新的连接就会回掉函数,发送数据 时,只管往连接中写,低层库会负责无阻塞的发送

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
while (!quit) {
  int timeout_ms = 1000;
  int retval = poll(fds, nfds, timeout_ms);
  if (retval < 0) {
    // 处理错误, 回调用户的 error handler
  } else {
    // 处理到期的timer,回调用户的 timer handler
    if (retval > 0) {
      // 处理IO事件,回掉用户的 IO event handler
    }
  }
}

Reactor 的核心

Channel: 负责一个文件描述符的各个事件的处理。 Poller: IO multiplexing, 通过 Poller 拿到 IO 事件,具体可使用 poll,select,epoll 等。 EventLoop: 循环通过 Poller 拿到活动的 Channel,处理每个活动 Channel 上的事件。

1
2
3
4
5
6
7
8
void loop() {
  while (!quit) {
    activeChannels = Poller(timeout);
    for (auto channel : activeChannels) {
      channel.handleEvent()
    }
  }
}

上面是整个事件循环的伪代码,下面是一个时序图。

actor client
entity EventLoop
Entity Poller
Entity ChannelA
Entity ChannelB

client -> EventLoop : loop()
EventLoop -> Poller : poll()
Poller --> EventLoop : activeChannels
EventLoop -> ChannelA : handleEvent()
ChannelA -> : user callback
EventLoop -> ChannelB : handleEvent()
ChannelB -> : user callback

下面的伪代码利用上面的 loop()实现一个单次响应的定时器。

1
2
3
4
5
6
7
8
9
EventLoop loop; // 定义一个事件循环
int timer_fd = timerfd_create(...); // 创建一个fd, 表示时间
// 初始化时间
timerfd_settime(timerfd, time_long);
Channel channel(&loop, timer_fd); // 创建一个关心timer_fd 的channel
channel.setCallback(callback); // 设置回调函数
loop.loop(); // 循环开始
// 循环结束
close(timer_fd); // 关闭文件描述符

注:channel 结束时不会关闭它拥有的 fd, fd 由上层管理。