简介
有时候 C++ 插件会执行非常耗时的任务,为了避免阻塞 Node.js 的事件循环,我们应该异步的运行耗时的任务。
对于 C++ 写的 Node.js 插件,有两种方式完成耗时的异步任务:
- 依赖 Node.js 的异步能力,Node.js 会起多个线程,其中一个主线程运行事件循环,其它线程为工作线程,来执行耗时任务。
- 在 C++ 里自己起一个线程运行耗时任务。
一般的异步任务可以用第一种方式完成,但是 Node.js 起的工作线程有限,且还需要运行其它任务,如果需要开更多的线程可以使用第二种方式。
有时我们不得不用第二种方式,比如我们用一个存在的 C++ 库时,需要监听它从另一个线程发送事件,这种情况就需要用第二种方式来将消息发送到 Node.js 的主线程。
下面来介绍这两种方法具体怎么做。
使用 Node.js 的工作线程
node-addon-api 提供了 AsyncWorker 这个抽象类来完成异步任务,AsyncWorker 的流程如下图所示:
start
:New AsyncWorker;
->
:调用 Queue;
-> Node.js 调用
:执行 OnExecute;
if (执行是否成功) then (成功)
:执行 OnOK;
else (失败)
:执行 OnError;
endif
:执行完成;
if (调用 SupressDestruct) then (是)
:执行完成;
else (否)
:delete AsyncWorker;
endif
end
代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
| #include <napi.h>
unsigned long long Fibonacci(int n) {
unsigned long long a = 0, b = 1;
for (int i = 0; i < n; i++) {
unsigned long long tmp = a;
a = b;
b = tmp + b;
}
return a;
}
class FibonacciAsyncWorker : public Napi::AsyncWorker {
public:
FibonacciAsyncWorker(Napi::Function& callback, int n)
: AsyncWorker(callback), n_(n) {}
void Execute() override {
result = Fibonacci(n_);
}
void OnOK() override {
Callback().Call({Env().Null(), Napi::Number::New(Env(), result)});
}
private:
int n_;
unsigned long long result;
};
Napi::Value ExportFibonacci(const Napi::CallbackInfo& info) {
int n = info[0].As<Napi::Number>();
Napi::Function callback = info[1].As<Napi::Function>();
FibonacciAsyncWorker* worker = new FibonacciAsyncWorker(callback, n);
worker->Queue();
return Napi::String::New(info.Env(), "start calc fibonacci");
}
Napi::Object Init(Napi::Env env, Napi::Object exports) {
exports["runFibonacciWorker"] = Napi::Function::New(env, ExportFibonacci, std::string("runFibonacciWorker"));
return exports;
}
NODE_API_MODULE(fibonacci, Init);
|
1
2
3
4
5
6
7
8
9
10
11
| const fibonacci = require("./build/fibonacci.node")
message = fibonacci.runFibonacciWorker(20, (error, result) => {
if (error) {
console.log("worker return error")
} else {
console.log("result: ", result)
}
})
console.log(message)
|
输出结果如下:
start calc fibonacci
result: 6765
使用 C++ 线程
使用 C++ 线程最关键的还是任务执行完成后如何调用 Node.js 的回调函数,同样 node-addon-api
提供了 ThreadSafeFunction 这个类来完成 C++ 线程和 Node.js 主线程的通信。
ThreadSafeFunction::New()
会创建一个持久的引用对象,这个对象会有一个 JavaScript 回调函数,这个函数可以被多个线程异步的调用。
ThreadSafeFunction 在两种情况下会被析构掉:
- 所有用到
ThreadSafeFunction
对象的线程都调用了该对象的 Release()
方法。 - 调用
ThreadSafeFunction
对象的 BlockingCall
或 NonBlockingCall
方法返回 napi_closing
状态。
Release()
方法应该最后被调用,因为它被调用后就不能保证 ThreadSafeFunction
没有析构。
成员方法
构造函数
创建一个空的 ThreadSafeFunction
实例
1
| Napi::Function::ThreadSafeFunction();
|
从存在的 ThreadSafeFunction
中创建实例
1
| Napi::ThreadSafeFunction::ThreadSafeFunction(napi_threadsafe_function tsfn);
|
这样创建的实例只能调用 Blocking(void*) / NonBlocking(void*)
,=tsfn= 携带的数据不能再使用。
New
根据传入参数,创建一个实例。
1
2
3
4
5
6
7
8
9
| New(napi_env env,
const Function& callback,
const Object& resource,
ResourceString resourceName,
size_t maxQueueSize,
size_t initialThreadCount,
ContextType* context,
Finalizer finalizeCallback,
FinalizerDataType* data);
|
env
: 构造 ThreadSafeFunction
的 napi_env
环境callback
: 将从其它线程调用的回调[optional] resource
: 和异步任务关联的对象,该对象传递给可能的 async_hooks
init 钩子resourceName
: 资源标识符,用于诊断信息maxQueueSize
: 队列的最大值,0 表示不限制initialThreadCount
: 使用此函数的初始线程数量,包括主线程[optional] context
: 附加数据[optional]finalizeCallback
: ThreadSafeFunction
析构时执行的回调,这个回调在主线程进行[optional] data
: 传给 finalizeCallback
的数据
Acquire
显示的表明一个新的线程将使用 ThreadSafeFunction
对象。
1
| napi_status Napi::ThreadSafeFunction::Acquire()
|
返回值:
napi_ok
: 成功请求napi_closing
: 请求失败
Release
显示表明当前线程不再使用 ThreadSafeFunction
对象。
1
| napi_status Napi::ThreadSafeFunction::Release()
|
返回值:
napi_ok
: 成功 releasenapi_invalid_arg
: ThreadSafeFunction
对象的线程计数为 0napi_generic_failure
: release 时出现错误
Abort
终止 ThreadSafeFunction
对象的使用,除了 Release 的所有 API 都会返回 napi_closing
1
| napi_status Napi::ThreadSafeFunction::Abort()
|
BlockingCall/NonBlockingCall
调用 JS 函数,使用阻塞或者非阻塞的机制
BlockingCall()
: 将会阻塞直到队列有额外的空间。如果 maxQueueSize
为 0, 这个函数不会阻塞。NonBlockingCall()
: 如果队列已满,返回 napi_queue_full
用 New
创建的 ThreadSafeFunction
只能用下面的重载
1
2
3
| napi_status Napi::ThreadSafeFunction::BlockingCall(DataType* data, Callback callback) const
napi_status Napi::ThreadSafeFunction::NonBlockingCall(DataType* data, Callback callback) const
|
[optional] data
: 传递给 callback
的数据[optional] callback
: 在主线程被调用的 C++ 函数。这个 callback
会收到 ThreadSafeFunction
的
JS 回调和数据类型指针作为参数。简单来讲,=bacllback= 的格式为 void operator()(Napi::Env env, Function jsCallback, DataType* data)
。
返回值:
napi_ok
: 成功加到队列中napi_queue_full
: 队列已满napi_closing
: ThreadSafeFunction
不再接受调用napi_invalid_arg
: ThreadSafeFunction
已经关闭napi_generic_faulure
: 出现错误
代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
| #include <napi.h>
#include <thread>
std::thread native_thread;
Napi::ThreadSafeFunction tsfn;
unsigned long long Fibonacci(int n) {
unsigned long long a = 0, b = 1;
for (int i = 0; i < n; i++) {
unsigned long long tmp = a;
a = b;
b = tmp + b;
}
return a;
}
Napi::Value ExportFibonacci(const Napi::CallbackInfo& info) {
int n = info[0].As<Napi::Number>();
Napi::Function callback = info[1].As<Napi::Function>();
// 创建一个 ThreadSafeFunction
tsfn = Napi::ThreadSafeFunction::New(
info.Env(),
callback, // js 的回调
"Resource name",
0, // 无限制
1, //
[](Napi::Env) {
native_thread.join();
});
native_thread = std::thread([n]{
auto callback = [](Napi::Env env, Napi::Function js_callback, unsigned long long* result) {
js_callback.Call({env.Null(), Napi::Number::New(env, *result)});
delete result;
};
unsigned long long* value = new unsigned long long(Fibonacci(n));
napi_status status = tsfn.BlockingCall(value, callback);
if (status != napi_ok) {
// 处理错误
}
tsfn.Release();
});
return Napi::String::New(info.Env(), "start calc fibonacci with native thread");
}
Napi::Object Init(Napi::Env env, Napi::Object exports) {
exports["runFibonacciWorker"] = Napi::Function::New(env, ExportFibonacci, std::string("runFibonacciWorker"));
return exports;
}
NODE_API_MODULE(fibonacci, Init);
|