简介

有时候 C++ 插件会执行非常耗时的任务,为了避免阻塞 Node.js 的事件循环,我们应该异步的运行耗时的任务。

对于 C++ 写的 Node.js 插件,有两种方式完成耗时的异步任务:

  1. 依赖 Node.js 的异步能力,Node.js 会起多个线程,其中一个主线程运行事件循环,其它线程为工作线程,来执行耗时任务。
  2. 在 C++ 里自己起一个线程运行耗时任务。

一般的异步任务可以用第一种方式完成,但是 Node.js 起的工作线程有限,且还需要运行其它任务,如果需要开更多的线程可以使用第二种方式。

有时我们不得不用第二种方式,比如我们用一个存在的 C++ 库时,需要监听它从另一个线程发送事件,这种情况就需要用第二种方式来将消息发送到 Node.js 的主线程。

下面来介绍这两种方法具体怎么做。

使用 Node.js 的工作线程

node-addon-api 提供了 AsyncWorker 这个抽象类来完成异步任务,AsyncWorker 的流程如下图所示:

start
:New AsyncWorker;
->
:调用 Queue;
-> Node.js 调用
:执行 OnExecute;
if (执行是否成功) then (成功)
        :执行 OnOK;
else (失败)
        :执行 OnError;
endif
:执行完成;
if (调用 SupressDestruct) then (是)
        :执行完成;
else (否)
        :delete AsyncWorker;
endif
end

代码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
#include <napi.h>

unsigned long long Fibonacci(int n) {
    unsigned long long a = 0, b = 1;
    for (int i = 0; i < n; i++) {
        unsigned long long tmp = a;
        a = b;
        b = tmp + b;
    }
    return a;
}

class FibonacciAsyncWorker : public Napi::AsyncWorker {
    public:
    FibonacciAsyncWorker(Napi::Function& callback, int n)
    : AsyncWorker(callback), n_(n) {}

    void Execute() override {
        result = Fibonacci(n_);
    }

    void OnOK() override {
        Callback().Call({Env().Null(), Napi::Number::New(Env(), result)});
    }

    private:
    int n_;
    unsigned long long result;
};

Napi::Value ExportFibonacci(const Napi::CallbackInfo& info) {
    int n = info[0].As<Napi::Number>();
    Napi::Function callback = info[1].As<Napi::Function>();
    FibonacciAsyncWorker* worker = new FibonacciAsyncWorker(callback, n);
    worker->Queue();
    return Napi::String::New(info.Env(), "start calc fibonacci");
}

Napi::Object Init(Napi::Env env, Napi::Object exports) {
    exports["runFibonacciWorker"] = Napi::Function::New(env, ExportFibonacci, std::string("runFibonacciWorker"));
    return exports;
}

NODE_API_MODULE(fibonacci, Init);
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
const fibonacci = require("./build/fibonacci.node")

message = fibonacci.runFibonacciWorker(20, (error, result) => {
    if (error) {
        console.log("worker return error")
    } else {
        console.log("result: ", result)
    }
})

console.log(message)

输出结果如下:

start calc fibonacci result: 6765

使用 C++ 线程

使用 C++ 线程最关键的还是任务执行完成后如何调用 Node.js 的回调函数,同样 node-addon-api 提供了 ThreadSafeFunction 这个类来完成 C++ 线程和 Node.js 主线程的通信。

ThreadSafeFunction::New() 会创建一个持久的引用对象,这个对象会有一个 JavaScript 回调函数,这个函数可以被多个线程异步的调用。

ThreadSafeFunction 在两种情况下会被析构掉:

  1. 所有用到 ThreadSafeFunction 对象的线程都调用了该对象的 Release() 方法。
  2. 调用 ThreadSafeFunction 对象的 BlockingCallNonBlockingCall 方法返回 napi_closing 状态。

Release() 方法应该最后被调用,因为它被调用后就不能保证 ThreadSafeFunction 没有析构。

成员方法

构造函数

  1. 创建一个空的 ThreadSafeFunction 实例

    1
    
    Napi::Function::ThreadSafeFunction();
    
  2. 从存在的 ThreadSafeFunction 中创建实例

1
Napi::ThreadSafeFunction::ThreadSafeFunction(napi_threadsafe_function tsfn);

这样创建的实例只能调用 Blocking(void*) / NonBlocking(void*) ,=tsfn= 携带的数据不能再使用。

New

根据传入参数,创建一个实例。

1
2
3
4
5
6
7
8
9
New(napi_env env,
    const Function& callback,
    const Object& resource,
    ResourceString resourceName,
    size_t maxQueueSize,
    size_t initialThreadCount,
    ContextType* context,
    Finalizer finalizeCallback,
    FinalizerDataType* data);
  • env: 构造 ThreadSafeFunctionnapi_env 环境
  • callback: 将从其它线程调用的回调
  • [optional] resource: 和异步任务关联的对象,该对象传递给可能的 async_hooks init 钩子
  • resourceName: 资源标识符,用于诊断信息
  • maxQueueSize: 队列的最大值,0 表示不限制
  • initialThreadCount: 使用此函数的初始线程数量,包括主线程
  • [optional] context: 附加数据
  • [optional]finalizeCallback: ThreadSafeFunction 析构时执行的回调,这个回调在主线程进行
  • [optional] data: 传给 finalizeCallback 的数据

Acquire

显示的表明一个新的线程将使用 ThreadSafeFunction 对象。

1
napi_status Napi::ThreadSafeFunction::Acquire()

返回值:

  • napi_ok: 成功请求
  • napi_closing: 请求失败

Release

显示表明当前线程不再使用 ThreadSafeFunction 对象。

1
napi_status Napi::ThreadSafeFunction::Release()

返回值:

  • napi_ok: 成功 release
  • napi_invalid_arg: ThreadSafeFunction 对象的线程计数为 0
  • napi_generic_failure: release 时出现错误

Abort

终止 ThreadSafeFunction 对象的使用,除了 Release 的所有 API 都会返回 napi_closing

1
napi_status Napi::ThreadSafeFunction::Abort()

BlockingCall/NonBlockingCall

调用 JS 函数,使用阻塞或者非阻塞的机制

  • BlockingCall(): 将会阻塞直到队列有额外的空间。如果 maxQueueSize 为 0, 这个函数不会阻塞。
  • NonBlockingCall(): 如果队列已满,返回 napi_queue_full

New 创建的 ThreadSafeFunction 只能用下面的重载

1
2
3
napi_status Napi::ThreadSafeFunction::BlockingCall(DataType* data, Callback callback) const

napi_status Napi::ThreadSafeFunction::NonBlockingCall(DataType* data, Callback callback) const
  • [optional] data: 传递给 callback 的数据
  • [optional] callback: 在主线程被调用的 C++ 函数。这个 callback 会收到 ThreadSafeFunction 的 JS 回调和数据类型指针作为参数。简单来讲,=bacllback= 的格式为 void operator()(Napi::Env env, Function jsCallback, DataType* data)

返回值:

  • napi_ok: 成功加到队列中
  • napi_queue_full: 队列已满
  • napi_closing: ThreadSafeFunction 不再接受调用
  • napi_invalid_arg: ThreadSafeFunction 已经关闭
  • napi_generic_faulure: 出现错误

代码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
#include <napi.h>
#include <thread>

std::thread native_thread;
Napi::ThreadSafeFunction tsfn;

unsigned long long Fibonacci(int n) {
    unsigned long long a = 0, b = 1;
    for (int i = 0; i < n; i++) {
        unsigned long long tmp = a;
        a = b;
        b = tmp + b;
    }
    return a;
}

Napi::Value ExportFibonacci(const Napi::CallbackInfo& info) {
    int n = info[0].As<Napi::Number>();
    Napi::Function callback = info[1].As<Napi::Function>();

    // 创建一个 ThreadSafeFunction
    tsfn = Napi::ThreadSafeFunction::New(
        info.Env(),
        callback, // js 的回调
        "Resource name",
        0, // 无限制
        1, //
        [](Napi::Env) {
            native_thread.join();
        });

    native_thread = std::thread([n]{
        auto callback = [](Napi::Env env, Napi::Function js_callback, unsigned long long* result) {
            js_callback.Call({env.Null(), Napi::Number::New(env, *result)});
            delete result;
        };
        unsigned long long* value = new unsigned long long(Fibonacci(n));

        napi_status status = tsfn.BlockingCall(value, callback);
        if (status != napi_ok) {
            // 处理错误

        }
        tsfn.Release();
    });

    return Napi::String::New(info.Env(), "start calc fibonacci with native thread");
}

Napi::Object Init(Napi::Env env, Napi::Object exports) {
    exports["runFibonacciWorker"] = Napi::Function::New(env, ExportFibonacci, std::string("runFibonacciWorker"));
    return exports;
}

NODE_API_MODULE(fibonacci, Init);