NodeJS 异步转同步
众所周知 JavaScript 是为异步设计的,写 JS 时也要尽量用异步的 API 以免阻塞主线程,但现实需求总是超出预想,最近就遇到了需要将异步代码转为同步的奇葩情况。
JS 的异步有两种,Callback 和 Promise,再加上同步就一共三种模式,它们之间大部分是可以转换的,比如:
const { promisify, callbackify } = require("util");
const sync = () => 123;
const callback = (cb) => cb(null, 123);
const promise = async () => 123;
const fn1 = callbackify(promise); // Promise 转 Callback
const fn2 = callbackify(sync); // Sync 转 Callback
const fn3 = promisify(callback); // Callback 转 Promise
const fn4 = async () => sync(); // Sync 转 Promise
三种模式两两组合一共有6种,但实际上能做到的只有4个,所以说是大部分可以转换,剩下的两个不能的是 Promise 转 Sync 和 Callback 转 Sync,因为 Node 并未提供把异步转换为同步的方法,完整的转换应该如下图所示:
图中红色的两个部分即是 Node 所欠缺的,本文就是讲解如何补上这两个函数,让三种模式形成完整的闭环。
方案选择
经过简单的搜索,发现解决这个问题有两种方法:
- 通过 Node 扩展调用底层的事件循环 API,手动执行所有异步任务,这类的代表是 abbr/deasync。
child_process
模块里有同步的 API 可以等待进程退出,只要把函数放到子进程里运行然后传回结果即可,这类项目有 sync-rpc、make-synchronous。
第二种局限性较大,首先进程之间的数据是独立的,这要求在子进程里运行的函数是一个纯函数;其次参数和返回值要序列化;最后启动进程是一个开销很大的操作,如果频繁调用对性能有影响。
所以我选择第一种,当然它也不是完美的,比如要使用 Node 内部的 API 兼容性可能有问题。
选定方案后,我研究了下 abbr/deasync 这个项目,发现它缺乏维护,Issues 和 PR 都没啥回应,代码还停留在 Node v0.x 时代,仅有针对回调的 API 而对 Promise 不友好,扩展使用的也是旧版 N-Api。另外还有一些毛病比如没有提供预编译二进制等等。
不对幸好它的代码很少,花一点时间理解了原理之后我决定把它重写一遍。
实现思路
思路可以说很简单,拿回调来讲就是先设置一个完成状态,然后调用异步函数,在传入的回调函数里设置该状态;因为函数是异步的所以立即返回,接下来搞一个循环不停执行 Event Loop 使异步任务得以运行,当完成状态被设置后退出循环并处理结果。
整个流程的关键就是手动执行 Event Loop,说到这个就要先谈谈事件模型了。
Node 事件模型
为了实现任务的优先级,Node 的底层循环并不是单一的队列,而是把任务划分成了很多种,其优先级也不一样,但总的来说可以看作两大类:宏任务(Macrotask)和微任务(Microtask)。
宏任务分为几个阶段,比如定时器、IO处理等,宏任务的优先级较低,在一次循环内新增的宏任务只能在下次循环时运行。
微任务最典型的就是Promise.then
,它的的优先级较高,在一个宏任务完成后会运行所有微任务,如果一个微任务里又添加了新的微任务,则新加的也会在下一个宏任务前运行。
两种任务的区别就到这里,更深入的话就与本文无关了,感兴趣的可以自己搜索相关资料。
V8 引擎自带了一个 Event loop,但 Node 并不使用而是自己整了一套实现叫libuv
,它提供了 Event loop 和一套相关的 API 用于运行宏任务。而对于微任务,Node 有一个内部 API process._tickCallback
可以手动运行它们。所以只要调用他俩即可执行完整的事件循环。
代码
Event loop 在 JavaScript 层是没有办法访问的,所以只能靠 C++ 来写扩展。首先引入libuv
的头文件uv.h
,然后拿到当前的 loop
然后调用uv_run
即可:
#define NAPI_VERSION 3
#include <uv.h>
#include <node_api.h>
napi_value Run(napi_env env, napi_callback_info info) {
uv_loop_s* loop;
napi_get_uv_event_loop(env, &loop);
uv_run(loop, UV_RUN_ONCE);
return nullptr;
}
// 导出 run 函数
napi_value Init(napi_env env, napi_value exports) {
napi_value fn_run;
napi_create_function(env, "run", NAPI_AUTO_LENGTH, Run, NULL, &fn_run);
napi_set_named_property(env, exports, "run", fn_run);
return exports;
}
NAPI_MODULE(NODE_GYP_MODULE_NAME, Init)
因为使用的都是最基本的 API 所以设置 NAPI 级别为 3,这个级别兼容所有 Node 版本,uv_run
的第二个参数表明只运行一轮。
编译Node.js原生模块需要配置binding.gyp
,文件应放在包的根目录,然后执行node-gyp rebuild --ensure
即可
{
"targets": [
{
"target_name": "binding",
"sources": [ "src/binding.cc" ]
}
]
}
node-gyp rebuild --ensure
// or
node-gyp configure
node-gyp build
循环执行 Event Loop 的函数如下,跟事件模型一样先微任务再宏任务,因为异步函数也可能只有微任务,所以在执行宏任务前又检测一次避免多余的调用:
import { run } from "./build/Release/binding.node";
function loopWhile(pred: () => boolean) {
while (pred()) {
process._tickCallback();
if (pred()) run();
}
}
有了执行事件循环的函数,接下来就能构造用于回调式异步的转换代码:
type Fn<T, A extends any[], R> = (this: T, ...args: A) => R;
// 定义完成状态:运行中、成功、异常。
const State = {
Pending: 0,
Fulfilled: 1,
Rejected: 2,
};
// callback to sync
export function deasync<T, R = any>(fn: Fn<T, any[], void>) {
return function (this: T, ...args: any[]) {
let state = State.Pending;
let valueOrError: unknown;
// 在回调中设置状态和结果。
args.push((err: unknown, res: R) => {
if (err) {
valueOrError = err;
state = State.Rejected;
} else {
valueOrError = res;
state = State.Fulfilled;
}
});
fn.apply(this, args);
// 不停执行 Event Loop 直到状态被设置,此时会阻塞下面代码的执行
loopWhile(() => state === State.Pending);
// 处理结果,返回或者抛异常。
if (state === State.Rejected) {
throw valueOrError;
} else {
return valueOrError as R;
}
};
}
就这么简单,这个函数即可满足最开始的需求,比如将dns
模块转成同步:
import dns from "dns";
import { deasync } from "@kaciras/deasync";
const dnsResolveSync = deasync<void, string>(dns.resolve);
console.log(dnsResolveSync("google.com"));
运行后打印出 Google 的 IP 地址。
搞定了回调再说说现在的主流 Promise,对于Promise 形式的异步同样可以用这种方法:
// 判断对象是不是 Promise
function isThenable<T>(value: any): value is PromiseLike<T> {
return typeof value.then === "function";
}
export function awaitSync<T>(promise: PromiseLike<T> | T) {
let state = State.Pending;
let valueOrError: unknown;
if (!isThenable(promise)) {
return promise;
}
promise.then(res => {
valueOrError = res;
state = State.Fulfilled;
}, err => {
valueOrError = err;
state = State.Rejected;
});
loopWhile(() => state === State.Pending);
if (state === State.Rejected) {
throw valueOrError;
} else {
return valueOrError as T;
}
}
awaitSync
顾名思义就相当于同步的await
,用法示例:
const { awaitSync } = require("@kaciras/deasync");
const promise = new Promise(resolve => setTimeout(resolve, 1000)).then(() => "wake up");
// 暂停 1 秒后输出 "wake up"
console.log(awaitSync(promise));