介绍
Reactor 模式是一种非阻塞 IO(non-blocking IO) + IO 复用(IO multiplexing)模型, 程序
的基本结构是一个事件循环(event loop),以事件驱动(event-driven)和事件回调的方式实
现业务逻辑。
使用这种模式需要转换思维模式,把原来 主动调用 recv()来接收数据,主动调用 accept()
来接收新连接,主动调用 send()发送数据 的思路转换成 注册一个收数据的回调,收到数
据就会回调注册的函数;注册一个接收连接的回调,接收新的连接就会回掉函数,发送数据
时,只管往连接中写,低层库会负责无阻塞的发送
1
2
3
4
5
6
7
8
9
10
11
12
|
while (!quit) {
int timeout_ms = 1000;
int retval = poll(fds, nfds, timeout_ms);
if (retval < 0) {
// 处理错误, 回调用户的 error handler
} else {
// 处理到期的timer,回调用户的 timer handler
if (retval > 0) {
// 处理IO事件,回掉用户的 IO event handler
}
}
}
|
Reactor 的核心
Channel: 负责一个文件描述符的各个事件的处理。
Poller: IO multiplexing, 通过 Poller 拿到 IO 事件,具体可使用 poll,select,epoll 等。
EventLoop: 循环通过 Poller 拿到活动的 Channel,处理每个活动 Channel 上的事件。
1
2
3
4
5
6
7
8
|
void loop() {
while (!quit) {
activeChannels = Poller(timeout);
for (auto channel : activeChannels) {
channel.handleEvent()
}
}
}
|
上面是整个事件循环的伪代码,下面是一个时序图。
下面的伪代码利用上面的 loop()实现一个单次响应的定时器。
1
2
3
4
5
6
7
8
9
|
EventLoop loop; // 定义一个事件循环
int timer_fd = timerfd_create(...); // 创建一个fd, 表示时间
// 初始化时间
timerfd_settime(timerfd, time_long);
Channel channel(&loop, timer_fd); // 创建一个关心timer_fd 的channel
channel.setCallback(callback); // 设置回调函数
loop.loop(); // 循环开始
// 循环结束
close(timer_fd); // 关闭文件描述符
|
注:channel 结束时不会关闭它拥有的fd, fd 由上层管理。