NodeJS 异步转同步

众所周知 JavaScript 是为异步设计的,写 JS 时也要尽量用异步的 API 以免阻塞主线程,但现实需求总是超出预想,最近就遇到了需要将异步代码转为同步的奇葩情况。

JS 的异步有两种,Callback 和 Promise,再加上同步就一共三种模式,它们之间大部分是可以转换的,比如:

const { promisify, callbackify } = require("util");

const sync = () => 123;
const callback = (cb) => cb(null, 123);
const promise = async () => 123;

const fn1 = callbackify(promise); // Promise 转 Callback
const fn2 = callbackify(sync); // Sync 转 Callback
const fn3 = promisify(callback); // Callback 转 Promise
const fn4 = async () => sync(); // Sync 转 Promise

三种模式两两组合一共有6种,但实际上能做到的只有4个,所以说是大部分可以转换,剩下的两个不能的是 Promise 转 Sync 和 Callback 转 Sync,因为 Node 并未提供把异步转换为同步的方法,完整的转换应该如下图所示:

图中红色的两个部分即是 Node 所欠缺的,本文就是讲解如何补上这两个函数,让三种模式形成完整的闭环。

方案选择

经过简单的搜索,发现解决这个问题有两种方法:

  1. 通过 Node 扩展调用底层的事件循环 API,手动执行所有异步任务,这类的代表是 abbr/deasync
  2. child_process模块里有同步的 API 可以等待进程退出,只要把函数放到子进程里运行然后传回结果即可,这类项目有 sync-rpcmake-synchronous

第二种局限性较大,首先进程之间的数据是独立的,这要求在子进程里运行的函数是一个纯函数;其次参数和返回值要序列化;最后启动进程是一个开销很大的操作,如果频繁调用对性能有影响。

所以我选择第一种,当然它也不是完美的,比如要使用 Node 内部的 API 兼容性可能有问题。

选定方案后,我研究了下 abbr/deasync 这个项目,发现它缺乏维护,Issues 和 PR 都没啥回应,代码还停留在 Node v0.x 时代,仅有针对回调的 API 而对 Promise 不友好,扩展使用的也是旧版 N-Api。另外还有一些毛病比如没有提供预编译二进制等等。

不对幸好它的代码很少,花一点时间理解了原理之后我决定把它重写一遍。

实现思路

思路可以说很简单,拿回调来讲就是先设置一个完成状态,然后调用异步函数,在传入的回调函数里设置该状态;因为函数是异步的所以立即返回,接下来搞一个循环不停执行 Event Loop 使异步任务得以运行,当完成状态被设置后退出循环并处理结果。

整个流程的关键就是手动执行 Event Loop,说到这个就要先谈谈事件模型了。

Node 事件模型

为了实现任务的优先级,Node 的底层循环并不是单一的队列,而是把任务划分成了很多种,其优先级也不一样,但总的来说可以看作两大类:宏任务(Macrotask)和微任务(Microtask)。

宏任务分为几个阶段,比如定时器、IO处理等,宏任务的优先级较低,在一次循环内新增的宏任务只能在下次循环时运行。

微任务最典型的就是Promise.then,它的的优先级较高,在一个宏任务完成后会运行所有微任务,如果一个微任务里又添加了新的微任务,则新加的也会在下一个宏任务前运行。

两种任务的区别就到这里,更深入的话就与本文无关了,感兴趣的可以自己搜索相关资料。

V8 引擎自带了一个 Event loop,但 Node 并不使用而是自己整了一套实现叫libuv,它提供了 Event loop 和一套相关的 API 用于运行宏任务。而对于微任务,Node 有一个内部 API process._tickCallback可以手动运行它们。所以只要调用他俩即可执行完整的事件循环。

代码

Event loop 在 JavaScript 层是没有办法访问的,所以只能靠 C++ 来写扩展。首先引入libuv的头文件uv.h,然后拿到当前的 loop 然后调用uv_run即可:

#define NAPI_VERSION 3

#include <uv.h>
#include <node_api.h>

napi_value Run(napi_env env, napi_callback_info info) {
	uv_loop_s* loop;
	napi_get_uv_event_loop(env, &loop);
	uv_run(loop, UV_RUN_ONCE);
	return nullptr;
}

// 导出 run 函数
napi_value Init(napi_env env, napi_value exports) {
	napi_value fn_run;
	napi_create_function(env, "run", NAPI_AUTO_LENGTH, Run, NULL, &fn_run);
	napi_set_named_property(env, exports, "run", fn_run);
	return exports;
}

NAPI_MODULE(NODE_GYP_MODULE_NAME, Init)

因为使用的都是最基本的 API 所以设置 NAPI 级别为 3,这个级别兼容所有 Node 版本,uv_run的第二个参数表明只运行一轮。

编译Node.js原生模块需要配置binding.gyp,文件应放在包的根目录,然后执行node-gyp rebuild --ensure即可

{
  "targets": [
    {
      "target_name": "binding",
      "sources": [ "src/binding.cc" ]
    }
  ]
}
node-gyp rebuild --ensure
// or
node-gyp configure
node-gyp build

循环执行 Event Loop 的函数如下,跟事件模型一样先微任务再宏任务,因为异步函数也可能只有微任务,所以在执行宏任务前又检测一次避免多余的调用:

import { run } from "./build/Release/binding.node";

function loopWhile(pred: () => boolean) {
	while (pred()) {
		process._tickCallback();
		if (pred()) run();
	}
}

有了执行事件循环的函数,接下来就能构造用于回调式异步的转换代码:

type Fn<T, A extends any[], R> = (this: T, ...args: A) => R;

// 定义完成状态:运行中、成功、异常。
const State = {
	Pending: 0,
	Fulfilled: 1,
	Rejected: 2,
};

//	callback to sync
export function deasync<T, R = any>(fn: Fn<T, any[], void>) {

	return function (this: T, ...args: any[]) {
		let state = State.Pending;
		let valueOrError: unknown;

		// 在回调中设置状态和结果。
		args.push((err: unknown, res: R) => {
			if (err) {
				valueOrError = err;
				state = State.Rejected;
			} else {
				valueOrError = res;
				state = State.Fulfilled;
			}
		});

		fn.apply(this, args);

		// 不停执行 Event Loop 直到状态被设置,此时会阻塞下面代码的执行
		loopWhile(() => state === State.Pending);

		// 处理结果,返回或者抛异常。
		if (state === State.Rejected) {
			throw valueOrError;
		} else {
			return valueOrError as R;
		}
	};
}

就这么简单,这个函数即可满足最开始的需求,比如将dns模块转成同步:

import dns from "dns";
import { deasync } from "@kaciras/deasync";

const dnsResolveSync = deasync<void, string>(dns.resolve);
console.log(dnsResolveSync("google.com"));

运行后打印出 Google 的 IP 地址。

搞定了回调再说说现在的主流 Promise,对于Promise 形式的异步同样可以用这种方法:

// 判断对象是不是 Promise
function isThenable<T>(value: any): value is PromiseLike<T> {
	return typeof value.then === "function";
}

export function awaitSync<T>(promise: PromiseLike<T> | T) {
	let state = State.Pending;
	let valueOrError: unknown;

	if (!isThenable(promise)) {
		return promise;
	}

	promise.then(res => {
		valueOrError = res;
		state = State.Fulfilled;
	}, err => {
		valueOrError = err;
		state = State.Rejected;
	});

	loopWhile(() => state === State.Pending);

	if (state === State.Rejected) {
		throw valueOrError;
	} else {
		return valueOrError as T;
	}
}

awaitSync 顾名思义就相当于同步的await,用法示例:

const { awaitSync } = require("@kaciras/deasync");

const promise = new Promise(resolve => setTimeout(resolve, 1000)).then(() => "wake up");

// 暂停 1 秒后输出 "wake up"
console.log(awaitSync(promise));