协程概念参考 协程的概念及实现 ,C++ 20 的协程为无栈协程
协程语法
定义协程
C++20 将以下三种函数当成协程
有 co_await 表达式
有 co_yield 表达式
有 co_return 表达式
协程约束:
- 参数不能为可变参数
- 不能有
return 表达式 - 不能为常量函数
- 不能为构造函数,析构函数和
main 函数
协程的执行
一个协程涉及以下对象:
- Promise 类型对象,协程内部通过它来提交协程结果和异常,这个和
std::promise 没有任何关系 - 协程句柄,协程外部使用,用来恢复协程执行和销毁协程资源
- 协程的上下文,使用堆分配内存,包括:
- promise 对象
- 协程参数
- 协程挂起点的状态
- 协程局部变量
协程执行流程:
- 使用
operator new 分配协程上下文 - 拷贝所有函数参数到协程上下文,值传递的参数会被移动或复制,引用传递的参数保持引用(可能出现悬垂引用)
- 调用 Promise 对象的构造函数
- 调用
promise.get_return_object() ,保持返回值在局部变量中。调用的结果在协程第一次挂起时返回 - 调用
promise.initial_suspend() ,并对结果执行 co_await - 当 co_await promise.initial_suspend() 恢复执行时,开始运行协程的函数体。
- 当协程运行到挂起点时:
- 之前获得的返回对象在必要时经过隐式转换为协程的返回类型后,返回给调用方或恢复方
- 当协程到达
co_return 时,执行下面操作:- 针对
co_return; 或 co_return expr 而 expr 的类型为 void, 调用 promise.return_void() - 针对
co_return expr 而 expr 类型不为 void 时,调用 promise.return_value(expr) - 以创建相反的顺序销毁局部变量
- 调用
promise.final_suspend() 并对结果执行 co_await
协程执行出现异常的处理:
- 捕获所有异常,调用
promise.unhandled_exception() - 调用
promise.final_suspend() 并对结果执行 co_await
协程上下文销毁动作(co_return ,异常或协程句柄被释放):
- 调用 promise 对象的析构函数
- 调用函数参数的析构函数
- 调用
operator delete 释放内存 - 返回到协程被调用或被恢复的地方
协程上下文内存分配优化:
- 协程生命周期比调用者生命周期短
- 协程栈的大小在调用点已知
这种情况,协程上下文会放到调用者的内存空间
Promise 类型
编译器通过协程的返回类型确定协程所用到的 promise 类型
比如协程声明为 task<void> (int f) ,对应的 promise 类型为 std::coroutine_traits<task<void>, int>::promise_typ
co_await
co_await expr 挂起一个协程,将控制流返回给调用者, co_await 只能出现在函数体中(包括 lambda 函数)
co_await 处理流程:
- 将
expr 用下面规则转换为 awaitable :- 如果 expr 是由初始挂起点、最终挂起点或 yield 表达式产生的,则 awaitable 对象就是 expr 本身
- 否则如果当前协程的 promise 类型有
await_transform 成员函数,则 awaitable 对象是 promise.await_transform(expr) - 否则 awaitable 对象就是
expr
- 然后通过以下方式获取
awaitable 对象,如果 expr 为纯右值,则 awaiter 为临时对象,否则 awaiter 为该值的引用- 如果重载了
operator co_await ,则调用对应的重载函数:- 对于成员函数,调用
awaitable.operator co_await() - 对于非成员函数,调用
operator co_await(static_case<Awaitable&&>(awaitable))
- 否则如果没有找到重载函数,对象就是
awaitble 本身 - 最后如果重载存在歧义,则编译失败
- 调用
awaiter.await_ready() ,如果结果为 false ,协程会被挂起:- 此时调用
awaiter.await_suspend(handle) ,handle 表示当前协程句柄。此函数职责在 executor 上调度或恢复协程,根据 await_suspend 返回类型处理:- 返回
void ,控制立即返回到调用者 - 返回
bool 值, true 控制返回到调用者, false 则恢复当前协程 - 返回另一个协程的协程句柄,则该句柄会被恢复执行(通过调用
handle.resume() ) - 抛出异常,当前协程恢复执行,异常重新抛出
- 最后无论协程是否会被挂起,
awaiter.await_resume() 都会被调用,其返回值为整个 co_await expr 的结果
co_yield
co_yield 会返回挂起当前协程并返回一个值给调用者,等价于 await promise.yield_value()
使用例子
计算阶乘
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
| #include <coroutine>
#include <cstdint>
#include <exception>
#include <iostream>
struct MyGenerator {
struct promise_type;
using handle_type = std::coroutine_handle<promise_type>;
struct promise_type {
uint64_t value_;
std::exception_ptr exception_;
MyGenerator get_return_object() {
return MyGenerator(handle_type::from_promise(*this));
}
std::suspend_always initial_suspend() { return {}; }
std::suspend_always final_suspend() noexcept { return {}; }
void unhandled_exception(void) { exception_ = std::current_exception(); }
std::suspend_always yield_value(uint64_t value) {
value_ = value;
return {};
}
void return_void() {}
};
handle_type handle_;
MyGenerator(handle_type handle) : handle_(handle) {}
~MyGenerator() { handle_.destroy(); };
uint64_t operator()() {
handle_();
return handle_.promise().value_;
}
};
MyGenerator Factorial(uint64_t n) {
uint64_t ans = 1;
for (int i = 1; i <= n; i++) {
ans *= i;
co_yield ans;
}
}
int main(int argc, char* argv[]) {
auto fac = Factorial(3);
for (int i = 1; i <= 3; i++) {
std::cout << i << " factorial: " << fac() << std::endl;
}
return 0;
}
|
1 factorial: 1
2 factorial: 2
3 factorial: 6
异步任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
| #include <coroutine>
#include <cstdint>
#include <exception>
#include <iostream>
#include <thread>
#include <chrono>
using namespace std::chrono_literals;
auto Timeout(std::jthread& t, int seconds) {
struct Awaitable {
std::jthread* t;
int seconds;
bool await_ready() {
return false;
}
void await_suspend(std::coroutine_handle<> handle) {
*t = std::jthread([handle, seconds = this->seconds]{
std::cout << "thread sleep for " << seconds << "s" << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(seconds));
std::cout << "thread sleep for " << seconds << "s finished" << std::endl;
// 任务完成,唤醒协程,resume 非线程安全,且不能在 await_suspend 返回前调用
handle.resume();
});
}
void await_resume() {}
};
return Awaitable{&t, seconds};
}
struct Task {
struct promise_type {
Task get_return_object() {
return {};
}
std::suspend_never initial_suspend() {
return {};
}
std::suspend_never final_suspend() noexcept {
return {};
}
void return_void() {}
void unhandled_exception() {}
};
};
Task AsyncSleep(std::jthread& t, int seconds) {
std::cout << "Timeout start" << std::endl;
co_await Timeout(t, seconds);
std::cout << "Timeout end" << std::endl;
}
int main(int argc, char* argv[]) {
std::jthread t;
std::cout << "before async sleep" << std::endl;
AsyncSleep(t, 2);
std::cout << "after async sleep" << std::endl;
return 0;
}
|
before async sleep
Timeout start
after async sleep
thread sleep for 2s
thread sleep for 2s finished
Timeout end