协程概念参考 协程的概念及实现 ,C++ 20 的协程为无栈协程

协程语法

定义协程

C++20 将以下三种函数当成协程

  1. co_await 表达式

  2. co_yield 表达式

  3. co_return 表达式

协程约束:

  • 参数不能为可变参数
  • 不能有 return 表达式
  • 不能为常量函数
  • 不能为构造函数,析构函数和 main 函数

协程的执行

一个协程涉及以下对象:

  • Promise 类型对象,协程内部通过它来提交协程结果和异常,这个和 std::promise 没有任何关系
  • 协程句柄,协程外部使用,用来恢复协程执行和销毁协程资源
  • 协程的上下文,使用堆分配内存,包括:
    • promise 对象
    • 协程参数
    • 协程挂起点的状态
    • 协程局部变量

协程执行流程:

  1. 使用 operator new 分配协程上下文
  2. 拷贝所有函数参数到协程上下文,值传递的参数会被移动或复制,引用传递的参数保持引用(可能出现悬垂引用)
  3. 调用 Promise 对象的构造函数
  4. 调用 promise.get_return_object() ,保持返回值在局部变量中。调用的结果在协程第一次挂起时返回
  5. 调用 promise.initial_suspend() ,并对结果执行 co_await
  6. 当 co_await promise.initial_suspend() 恢复执行时,开始运行协程的函数体。
  7. 当协程运行到挂起点时:
    • 之前获得的返回对象在必要时经过隐式转换为协程的返回类型后,返回给调用方或恢复方
  8. 当协程到达 co_return 时,执行下面操作:
    • 针对 co_return;co_return exprexpr 的类型为 void, 调用 promise.return_void()
    • 针对 co_return exprexpr 类型不为 void 时,调用 promise.return_value(expr)
    • 以创建相反的顺序销毁局部变量
    • 调用 promise.final_suspend() 并对结果执行 co_await

协程执行出现异常的处理:

  1. 捕获所有异常,调用 promise.unhandled_exception()
  2. 调用 promise.final_suspend() 并对结果执行 co_await

协程上下文销毁动作(co_return ,异常或协程句柄被释放):

  • 调用 promise 对象的析构函数
  • 调用函数参数的析构函数
  • 调用 operator delete 释放内存
  • 返回到协程被调用或被恢复的地方

协程上下文内存分配优化:

  • 协程生命周期比调用者生命周期短
  • 协程栈的大小在调用点已知

这种情况,协程上下文会放到调用者的内存空间

Promise 类型

编译器通过协程的返回类型确定协程所用到的 promise 类型

比如协程声明为 task<void> (int f) ,对应的 promise 类型为 std::coroutine_traits<task<void>, int>::promise_typ

co_await

co_await expr 挂起一个协程,将控制流返回给调用者, co_await 只能出现在函数体中(包括 lambda 函数)

co_await 处理流程:

  1. expr 用下面规则转换为 awaitable
    • 如果 expr 是由初始挂起点、最终挂起点或 yield 表达式产生的,则 awaitable 对象就是 expr 本身
    • 否则如果当前协程的 promise 类型有 await_transform 成员函数,则 awaitable 对象是 promise.await_transform(expr)
    • 否则 awaitable 对象就是 expr
  2. 然后通过以下方式获取 awaitable 对象,如果 expr 为纯右值,则 awaiter 为临时对象,否则 awaiter 为该值的引用
    • 如果重载了 operator co_await ,则调用对应的重载函数:
      • 对于成员函数,调用 awaitable.operator co_await()
      • 对于非成员函数,调用 operator co_await(static_case<Awaitable&&>(awaitable))
    • 否则如果没有找到重载函数,对象就是 awaitble 本身
    • 最后如果重载存在歧义,则编译失败
  3. 调用 awaiter.await_ready() ,如果结果为 false ,协程会被挂起:
    • 此时调用 awaiter.await_suspend(handle) ,handle 表示当前协程句柄。此函数职责在 executor 上调度或恢复协程,根据 await_suspend 返回类型处理:
      • 返回 void ,控制立即返回到调用者
      • 返回 bool 值, true 控制返回到调用者, false 则恢复当前协程
      • 返回另一个协程的协程句柄,则该句柄会被恢复执行(通过调用 handle.resume()
      • 抛出异常,当前协程恢复执行,异常重新抛出
  4. 最后无论协程是否会被挂起, awaiter.await_resume() 都会被调用,其返回值为整个 co_await expr 的结果

co_yield

co_yield 会返回挂起当前协程并返回一个值给调用者,等价于 await promise.yield_value()

使用例子

计算阶乘

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
#include <coroutine>
#include <cstdint>
#include <exception>
#include <iostream>

struct MyGenerator {

    struct promise_type;
    using handle_type = std::coroutine_handle<promise_type>;

    struct promise_type {
        uint64_t value_;
        std::exception_ptr exception_;

        MyGenerator get_return_object() {
            return MyGenerator(handle_type::from_promise(*this));
        }

        std::suspend_always initial_suspend() { return {}; }
        std::suspend_always final_suspend() noexcept { return {}; }
        void unhandled_exception(void) { exception_ = std::current_exception(); }

        std::suspend_always yield_value(uint64_t value) {
            value_ = value;
            return {};
        }

        void return_void() {}

    };

    handle_type handle_;

    MyGenerator(handle_type handle) : handle_(handle) {}
    ~MyGenerator() { handle_.destroy(); };

    uint64_t operator()() {
        handle_();
        return handle_.promise().value_;
    }
};


MyGenerator Factorial(uint64_t n) {
    uint64_t ans = 1;
    for (int i = 1; i <= n; i++) {
        ans *= i;
        co_yield ans;
    }
}

int main(int argc, char* argv[]) {

    auto fac = Factorial(3);
    for (int i = 1; i <= 3; i++) {
        std::cout << i << " factorial: " << fac() << std::endl;
    }
    return 0;
}

1 factorial: 1 2 factorial: 2 3 factorial: 6

异步任务

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
#include <coroutine>
#include <cstdint>
#include <exception>
#include <iostream>
#include <thread>
#include <chrono>

using namespace std::chrono_literals;

auto Timeout(std::jthread& t, int seconds) {
    struct Awaitable {
        std::jthread* t;
        int seconds;

        bool await_ready() {
            return false;
        }

        void await_suspend(std::coroutine_handle<> handle) {
            *t = std::jthread([handle, seconds = this->seconds]{
                std::cout << "thread sleep for " << seconds << "s" << std::endl;
                std::this_thread::sleep_for(std::chrono::seconds(seconds));
                std::cout << "thread sleep for " << seconds << "s finished" << std::endl;
                // 任务完成,唤醒协程,resume 非线程安全,且不能在 await_suspend 返回前调用
                handle.resume();
            });
        }
        void await_resume() {}
    };

    return Awaitable{&t, seconds};
}

struct Task {
    struct promise_type {
        Task get_return_object() {
            return {};
        }
        std::suspend_never initial_suspend() {
            return {};
        }
        std::suspend_never final_suspend() noexcept {
            return {};
        }
        void return_void() {}
        void unhandled_exception() {}
    };
};

Task AsyncSleep(std::jthread& t, int seconds) {
    std::cout << "Timeout start" << std::endl;
    co_await Timeout(t, seconds);
    std::cout << "Timeout end" << std::endl;
}

int main(int argc, char* argv[]) {

    std::jthread t;
    std::cout << "before async sleep" << std::endl;
    AsyncSleep(t, 2);
    std::cout << "after async sleep" << std::endl;

    return 0;
}

before async sleep Timeout start after async sleep thread sleep for 2s thread sleep for 2s finished Timeout end