简介
有时候 C++ 插件会执行非常耗时的任务,为了避免阻塞 Node.js 的事件循环,我们应该异步的运行耗时的任务。
对于 C++ 写的 Node.js 插件,有两种方式完成耗时的异步任务:
- 依赖 Node.js 的异步能力,Node.js 会起多个线程,其中一个主线程运行事件循环,其它线程为工作线程,来执行耗时任务。
 - 在 C++ 里自己起一个线程运行耗时任务。
 
一般的异步任务可以用第一种方式完成,但是 Node.js 起的工作线程有限,且还需要运行其它任务,如果需要开更多的线程可以使用第二种方式。
有时我们不得不用第二种方式,比如我们用一个存在的 C++ 库时,需要监听它从另一个线程发送事件,这种情况就需要用第二种方式来将消息发送到 Node.js 的主线程。
下面来介绍这两种方法具体怎么做。
使用 Node.js 的工作线程
node-addon-api 提供了 AsyncWorker 这个抽象类来完成异步任务,AsyncWorker 的流程如下图所示:
start
:New AsyncWorker;
->
:调用 Queue;
-> Node.js 调用
:执行 OnExecute;
if (执行是否成功) then (成功)
        :执行 OnOK;
else (失败)
        :执行 OnError;
endif
:执行完成;
if (调用 SupressDestruct) then (是)
        :执行完成;
else (否)
        :delete AsyncWorker;
endif
end
代码
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
  | #include <napi.h>
unsigned long long Fibonacci(int n) {
    unsigned long long a = 0, b = 1;
    for (int i = 0; i < n; i++) {
        unsigned long long tmp = a;
        a = b;
        b = tmp + b;
    }
    return a;
}
class FibonacciAsyncWorker : public Napi::AsyncWorker {
    public:
    FibonacciAsyncWorker(Napi::Function& callback, int n)
    : AsyncWorker(callback), n_(n) {}
    void Execute() override {
        result = Fibonacci(n_);
    }
    void OnOK() override {
        Callback().Call({Env().Null(), Napi::Number::New(Env(), result)});
    }
    private:
    int n_;
    unsigned long long result;
};
Napi::Value ExportFibonacci(const Napi::CallbackInfo& info) {
    int n = info[0].As<Napi::Number>();
    Napi::Function callback = info[1].As<Napi::Function>();
    FibonacciAsyncWorker* worker = new FibonacciAsyncWorker(callback, n);
    worker->Queue();
    return Napi::String::New(info.Env(), "start calc fibonacci");
}
Napi::Object Init(Napi::Env env, Napi::Object exports) {
    exports["runFibonacciWorker"] = Napi::Function::New(env, ExportFibonacci, std::string("runFibonacciWorker"));
    return exports;
}
NODE_API_MODULE(fibonacci, Init);
  | 
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
  | const fibonacci = require("./build/fibonacci.node")
message = fibonacci.runFibonacciWorker(20, (error, result) => {
    if (error) {
        console.log("worker return error")
    } else {
        console.log("result: ", result)
    }
})
console.log(message)
  | 
输出结果如下:
start calc fibonacci
result: 6765
使用 C++ 线程
使用 C++ 线程最关键的还是任务执行完成后如何调用 Node.js 的回调函数,同样 node-addon-api
提供了 ThreadSafeFunction 这个类来完成 C++ 线程和 Node.js 主线程的通信。
ThreadSafeFunction::New() 会创建一个持久的引用对象,这个对象会有一个 JavaScript 回调函数,这个函数可以被多个线程异步的调用。
ThreadSafeFunction 在两种情况下会被析构掉:
- 所有用到 
ThreadSafeFunction 对象的线程都调用了该对象的 Release() 方法。 - 调用 
ThreadSafeFunction 对象的 BlockingCall 或 NonBlockingCall 方法返回 napi_closing 状态。 
Release() 方法应该最后被调用,因为它被调用后就不能保证 ThreadSafeFunction 没有析构。
成员方法
构造函数
创建一个空的 ThreadSafeFunction 实例
1
  | Napi::Function::ThreadSafeFunction();
  | 
从存在的 ThreadSafeFunction 中创建实例
1
  | Napi::ThreadSafeFunction::ThreadSafeFunction(napi_threadsafe_function tsfn);
  | 
这样创建的实例只能调用 Blocking(void*) / NonBlocking(void*) ,=tsfn= 携带的数据不能再使用。
New
根据传入参数,创建一个实例。
1
2
3
4
5
6
7
8
9
  | New(napi_env env,
    const Function& callback,
    const Object& resource,
    ResourceString resourceName,
    size_t maxQueueSize,
    size_t initialThreadCount,
    ContextType* context,
    Finalizer finalizeCallback,
    FinalizerDataType* data);
  | 
env: 构造 ThreadSafeFunction 的 napi_env 环境callback: 将从其它线程调用的回调[optional] resource: 和异步任务关联的对象,该对象传递给可能的 async_hooks init 钩子resourceName: 资源标识符,用于诊断信息maxQueueSize: 队列的最大值,0 表示不限制initialThreadCount: 使用此函数的初始线程数量,包括主线程[optional] context: 附加数据[optional]finalizeCallback: ThreadSafeFunction 析构时执行的回调,这个回调在主线程进行[optional] data: 传给 finalizeCallback 的数据
Acquire
显示的表明一个新的线程将使用 ThreadSafeFunction 对象。
1
  | napi_status Napi::ThreadSafeFunction::Acquire()
  | 
返回值:
napi_ok: 成功请求napi_closing: 请求失败
Release
显示表明当前线程不再使用 ThreadSafeFunction 对象。
1
  | napi_status Napi::ThreadSafeFunction::Release()
  | 
返回值:
napi_ok: 成功 releasenapi_invalid_arg: ThreadSafeFunction 对象的线程计数为 0napi_generic_failure: release 时出现错误
Abort
终止 ThreadSafeFunction 对象的使用,除了 Release 的所有 API 都会返回 napi_closing
1
  | napi_status Napi::ThreadSafeFunction::Abort()
  | 
BlockingCall/NonBlockingCall
调用 JS 函数,使用阻塞或者非阻塞的机制
BlockingCall(): 将会阻塞直到队列有额外的空间。如果 maxQueueSize 为 0, 这个函数不会阻塞。NonBlockingCall(): 如果队列已满,返回 napi_queue_full
用 New 创建的 ThreadSafeFunction 只能用下面的重载
1
2
3
  | napi_status Napi::ThreadSafeFunction::BlockingCall(DataType* data, Callback callback) const
napi_status Napi::ThreadSafeFunction::NonBlockingCall(DataType* data, Callback callback) const
  | 
[optional] data: 传递给 callback 的数据[optional] callback: 在主线程被调用的 C++ 函数。这个 callback 会收到 ThreadSafeFunction 的
JS 回调和数据类型指针作为参数。简单来讲,=bacllback= 的格式为 void operator()(Napi::Env env, Function jsCallback, DataType* data) 。
返回值:
napi_ok: 成功加到队列中napi_queue_full: 队列已满napi_closing: ThreadSafeFunction 不再接受调用napi_invalid_arg: ThreadSafeFunction 已经关闭napi_generic_faulure: 出现错误
代码
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
  | #include <napi.h>
#include <thread>
std::thread native_thread;
Napi::ThreadSafeFunction tsfn;
unsigned long long Fibonacci(int n) {
    unsigned long long a = 0, b = 1;
    for (int i = 0; i < n; i++) {
        unsigned long long tmp = a;
        a = b;
        b = tmp + b;
    }
    return a;
}
Napi::Value ExportFibonacci(const Napi::CallbackInfo& info) {
    int n = info[0].As<Napi::Number>();
    Napi::Function callback = info[1].As<Napi::Function>();
    // 创建一个 ThreadSafeFunction
    tsfn = Napi::ThreadSafeFunction::New(
        info.Env(),
        callback, // js 的回调
        "Resource name",
        0, // 无限制
        1, //
        [](Napi::Env) {
            native_thread.join();
        });
    native_thread = std::thread([n]{
        auto callback = [](Napi::Env env, Napi::Function js_callback, unsigned long long* result) {
            js_callback.Call({env.Null(), Napi::Number::New(env, *result)});
            delete result;
        };
        unsigned long long* value = new unsigned long long(Fibonacci(n));
        napi_status status = tsfn.BlockingCall(value, callback);
        if (status != napi_ok) {
            // 处理错误
        }
        tsfn.Release();
    });
    return Napi::String::New(info.Env(), "start calc fibonacci with native thread");
}
Napi::Object Init(Napi::Env env, Napi::Object exports) {
    exports["runFibonacciWorker"] = Napi::Function::New(env, ExportFibonacci, std::string("runFibonacciWorker"));
    return exports;
}
NODE_API_MODULE(fibonacci, Init);
  |