- assert 断言
- async_hooks 异步钩子
- async_hooks/context 异步上下文
- buffer 缓冲区
- C++插件
- C/C++插件(使用 Node-API)
- C++嵌入器
- child_process 子进程
- cluster 集群
- CLI 命令行
- console 控制台
- Corepack 核心包
- crypto 加密
- crypto/webcrypto 网络加密
- debugger 调试器
- deprecation 弃用
- dgram 数据报
- diagnostics_channel 诊断通道
- dns 域名服务器
- domain 域
- Error 错误
- events 事件触发器
- fs 文件系统
- global 全局变量
- http 超文本传输协议
- http2 超文本传输协议 2.0
- https 安全超文本传输协议
- inspector 检查器
- Intl 国际化
- module 模块
- module/cjs CommonJS 模块
- module/esm ECMAScript 模块
- module/package 包模块
- module/typescript TS 模块
- net 网络
- os 操作系统
- path 路径
- perf_hooks 性能钩子
- permission 权限
- process 进程
- punycode 域名代码
- querystring 查询字符串
- readline 逐行读取
- repl 交互式解释器
- report 诊断报告
- sea 单个可执行应用程序
Node.js v23.5.0 文档
- Node.js v23.5.0
-
目录
- 异步上下文跟踪
- 介绍
- 类:
AsyncLocalStorage
new AsyncLocalStorage()
- 静态方法:
AsyncLocalStorage.bind(fn)
- 静态方法:
AsyncLocalStorage.snapshot()
asyncLocalStorage.disable()
asyncLocalStorage.getStore()
asyncLocalStorage.enterWith(store)
asyncLocalStorage.run(store, callback[, ...args])
asyncLocalStorage.exit(callback[, ...args])
- 与
async/await
一起使用 - 故障排除:上下文丢失
- 类:
AsyncResource
new AsyncResource(type[, options])
- 静态方法:
AsyncResource.bind(fn[, type[, thisArg]])
asyncResource.bind(fn[, thisArg])
asyncResource.runInAsyncScope(fn[, thisArg, ...args])
asyncResource.emitDestroy()
asyncResource.asyncId()
asyncResource.triggerAsyncId()
- 将
AsyncResource
用于Worker
线程池 - 将
AsyncResource
与EventEmitter
集成
- 异步上下文跟踪
-
导航
- assert 断言
- async_hooks 异步钩子
- async_hooks/context 异步上下文
- buffer 缓冲区
- C++插件
- C/C++插件(使用 Node-API)
- C++嵌入器
- child_process 子进程
- cluster 集群
- CLI 命令行
- console 控制台
- Corepack 核心包
- crypto 加密
- crypto/webcrypto 网络加密
- debugger 调试器
- deprecation 弃用
- dgram 数据报
- diagnostics_channel 诊断通道
- dns 域名服务器
- domain 域
- Error 错误
- events 事件触发器
- fs 文件系统
- global 全局变量
- http 超文本传输协议
- http2 超文本传输协议 2.0
- https 安全超文本传输协议
- inspector 检查器
- Intl 国际化
- module 模块
- module/cjs CommonJS 模块
- module/esm ECMAScript 模块
- module/package 包模块
- module/typescript TS 模块
- net 网络
- os 操作系统
- path 路径
- perf_hooks 性能钩子
- permission 权限
- process 进程
- punycode 域名代码
- querystring 查询字符串
- readline 逐行读取
- repl 交互式解释器
- report 诊断报告
- sea 单个可执行应用程序
- 其他版本
异步上下文跟踪#
¥Asynchronous context tracking
¥Stability: 2 - Stable
源代码: lib/async_hooks.js
介绍#
¥Introduction
这些类用于关联状态并在整个回调和 promise 链中传播它。它们允许在 Web 请求的整个生命周期或任何其他异步持续时间内存储数据。它类似于其他语言中的线程本地存储。
¥These classes are used to associate state and propagate it throughout callbacks and promise chains. They allow storing data throughout the lifetime of a web request or any other asynchronous duration. It is similar to thread-local storage in other languages.
AsyncLocalStorage
和 AsyncResource
类是 node:async_hooks
模块的一部分:
¥The AsyncLocalStorage
and AsyncResource
classes are part of the
node:async_hooks
module:
import { AsyncLocalStorage, AsyncResource } from 'node:async_hooks';
const { AsyncLocalStorage, AsyncResource } = require('node:async_hooks');
类:AsyncLocalStorage
#
¥Class: AsyncLocalStorage
此类创建通过异步操作保持一致的存储。
¥This class creates stores that stay coherent through asynchronous operations.
虽然你可以在 node:async_hooks
模块之上创建自己的实现,但 AsyncLocalStorage
应该是首选,因为它是一种高性能且内存安全的实现,涉及实现起来并不明显的重要优化。
¥While you can create your own implementation on top of the node:async_hooks
module, AsyncLocalStorage
should be preferred as it is a performant and memory
safe implementation that involves significant optimizations that are non-obvious
to implement.
以下示例使用 AsyncLocalStorage
构建一个简单的日志器,它为传入的 HTTP 请求分配 ID,并将它们包含在每个请求中记录的消息中。
¥The following example uses AsyncLocalStorage
to build a simple logger
that assigns IDs to incoming HTTP requests and includes them in messages
logged within each request.
import http from 'node:http';
import { AsyncLocalStorage } from 'node:async_hooks';
const asyncLocalStorage = new AsyncLocalStorage();
function logWithId(msg) {
const id = asyncLocalStorage.getStore();
console.log(`${id !== undefined ? id : '-'}:`, msg);
}
let idSeq = 0;
http.createServer((req, res) => {
asyncLocalStorage.run(idSeq++, () => {
logWithId('start');
// Imagine any chain of async operations here
setImmediate(() => {
logWithId('finish');
res.end();
});
});
}).listen(8080);
http.get('http://localhost:8080');
http.get('http://localhost:8080');
// Prints:
// 0: start
// 1: start
// 0: finish
// 1: finish
const http = require('node:http');
const { AsyncLocalStorage } = require('node:async_hooks');
const asyncLocalStorage = new AsyncLocalStorage();
function logWithId(msg) {
const id = asyncLocalStorage.getStore();
console.log(`${id !== undefined ? id : '-'}:`, msg);
}
let idSeq = 0;
http.createServer((req, res) => {
asyncLocalStorage.run(idSeq++, () => {
logWithId('start');
// Imagine any chain of async operations here
setImmediate(() => {
logWithId('finish');
res.end();
});
});
}).listen(8080);
http.get('http://localhost:8080');
http.get('http://localhost:8080');
// Prints:
// 0: start
// 1: start
// 0: finish
// 1: finish
AsyncLocalStorage
的每个实例都维护一个独立的存储上下文。多个实例可以安全地同时存在,而不会有干扰彼此数据的风险。
¥Each instance of AsyncLocalStorage
maintains an independent storage context.
Multiple instances can safely exist simultaneously without risk of interfering
with each other's data.
new AsyncLocalStorage()
#
创建 AsyncLocalStorage
的新实例。Store 仅在 run()
调用内或 enterWith()
调用后提供。
¥Creates a new instance of AsyncLocalStorage
. Store is only provided within a
run()
call or after an enterWith()
call.
静态方法:AsyncLocalStorage.bind(fn)
#
¥Static method: AsyncLocalStorage.bind(fn)
¥Stability: 1 - Experimental
-
fn
<Function> 绑定到当前执行上下文的函数。¥
fn
<Function> The function to bind to the current execution context. -
返回:<Function> 在捕获的执行上下文中调用
fn
的新函数。¥Returns: <Function> A new function that calls
fn
within the captured execution context.
将给定函数绑定到当前执行上下文。
¥Binds the given function to the current execution context.
静态方法:AsyncLocalStorage.snapshot()
#
¥Static method: AsyncLocalStorage.snapshot()
¥Stability: 1 - Experimental
-
返回:<Function> 带有签名
(fn: (...args) : R, ...args) : R
的新函数。¥Returns: <Function> A new function with the signature
(fn: (...args) : R, ...args) : R
.
捕获当前执行上下文并返回一个接受函数作为参数的函数。每当调用返回的函数时,它都会在捕获的上下文中调用传递给它的函数。
¥Captures the current execution context and returns a function that accepts a function as an argument. Whenever the returned function is called, it calls the function passed to it within the captured context.
const asyncLocalStorage = new AsyncLocalStorage();
const runInAsyncScope = asyncLocalStorage.run(123, () => AsyncLocalStorage.snapshot());
const result = asyncLocalStorage.run(321, () => runInAsyncScope(() => asyncLocalStorage.getStore()));
console.log(result); // returns 123
AsyncLocalStorage.snapshot() 可以代替 AsyncResource 用于简单的异步上下文跟踪目的,例如:
¥AsyncLocalStorage.snapshot() can replace the use of AsyncResource for simple async context tracking purposes, for example:
class Foo {
#runInAsyncScope = AsyncLocalStorage.snapshot();
get() { return this.#runInAsyncScope(() => asyncLocalStorage.getStore()); }
}
const foo = asyncLocalStorage.run(123, () => new Foo());
console.log(asyncLocalStorage.run(321, () => foo.get())); // returns 123
asyncLocalStorage.disable()
#
¥Stability: 1 - Experimental
禁用 AsyncLocalStorage
的实例。对 asyncLocalStorage.getStore()
的所有后续调用都将返回 undefined
,直到再次调用 asyncLocalStorage.run()
或 asyncLocalStorage.enterWith()
。
¥Disables the instance of AsyncLocalStorage
. All subsequent calls
to asyncLocalStorage.getStore()
will return undefined
until
asyncLocalStorage.run()
or asyncLocalStorage.enterWith()
is called again.
调用 asyncLocalStorage.disable()
时,将退出所有当前链接到该实例的上下文。
¥When calling asyncLocalStorage.disable()
, all current contexts linked to the
instance will be exited.
在可以对 asyncLocalStorage
进行垃圾回收之前,需要调用 asyncLocalStorage.disable()
。这不适用于 asyncLocalStorage
提供的存储,因为这些对象与相应的异步资源一起被垃圾回收。
¥Calling asyncLocalStorage.disable()
is required before the
asyncLocalStorage
can be garbage collected. This does not apply to stores
provided by the asyncLocalStorage
, as those objects are garbage collected
along with the corresponding async resources.
当 asyncLocalStorage
在当前进程中不再使用时使用此方法。
¥Use this method when the asyncLocalStorage
is not in use anymore
in the current process.
asyncLocalStorage.getStore()
#
返回当前存储。如果在通过调用 asyncLocalStorage.run()
或 asyncLocalStorage.enterWith()
初始化的异步上下文之外调用,它将返回 undefined
。
¥Returns the current store.
If called outside of an asynchronous context initialized by
calling asyncLocalStorage.run()
or asyncLocalStorage.enterWith()
, it
returns undefined
.
asyncLocalStorage.enterWith(store)
#
¥Stability: 1 - Experimental
store
<any>
转换为当前同步执行的剩余部分的上下文,然后通过任何后续异步调用持久保存存储。
¥Transitions into the context for the remainder of the current synchronous execution and then persists the store through any following asynchronous calls.
示例:
¥Example:
const store = { id: 1 };
// Replaces previous store with the given store object
asyncLocalStorage.enterWith(store);
asyncLocalStorage.getStore(); // Returns the store object
someAsyncOperation(() => {
asyncLocalStorage.getStore(); // Returns the same object
});
此转换将持续整个同步执行。这意味着,例如,如果在事件处理程序中输入上下文,则后续事件处理程序也将在该上下文中运行,除非使用 AsyncResource
专门绑定到另一个上下文。这就是为什么 run()
应该优于 enterWith()
的原因,除非有充分的理由使用后一种方法。
¥This transition will continue for the entire synchronous execution.
This means that if, for example, the context is entered within an event
handler subsequent event handlers will also run within that context unless
specifically bound to another context with an AsyncResource
. That is why
run()
should be preferred over enterWith()
unless there are strong reasons
to use the latter method.
const store = { id: 1 };
emitter.on('my-event', () => {
asyncLocalStorage.enterWith(store);
});
emitter.on('my-event', () => {
asyncLocalStorage.getStore(); // Returns the same object
});
asyncLocalStorage.getStore(); // Returns undefined
emitter.emit('my-event');
asyncLocalStorage.getStore(); // Returns the same object
asyncLocalStorage.run(store, callback[, ...args])
#
-
store
<any> -
callback
<Function> -
...args
<any>
在上下文中同步运行函数并返回其返回值。在回调函数之外无法访问该存储。在回调中创建的任何异步操作都可以访问该存储。
¥Runs a function synchronously within a context and returns its return value. The store is not accessible outside of the callback function. The store is accessible to any asynchronous operations created within the callback.
可选的 args
被传递给回调函数。
¥The optional args
are passed to the callback function.
如果回调函数抛出错误,则 run()
也会抛出该错误。堆栈跟踪不受此调用的影响,上下文已退出。
¥If the callback function throws an error, the error is thrown by run()
too.
The stacktrace is not impacted by this call and the context is exited.
示例:
¥Example:
const store = { id: 2 };
try {
asyncLocalStorage.run(store, () => {
asyncLocalStorage.getStore(); // Returns the store object
setTimeout(() => {
asyncLocalStorage.getStore(); // Returns the store object
}, 200);
throw new Error();
});
} catch (e) {
asyncLocalStorage.getStore(); // Returns undefined
// The error will be caught here
}
asyncLocalStorage.exit(callback[, ...args])
#
¥Stability: 1 - Experimental
-
callback
<Function> -
...args
<any>
在上下文之外同步运行函数并返回其返回值。该存储在回调函数或回调中创建的异步操作中不可访问。在回调函数内完成的任何 getStore()
调用将始终返回 undefined
。
¥Runs a function synchronously outside of a context and returns its
return value. The store is not accessible within the callback function or
the asynchronous operations created within the callback. Any getStore()
call done within the callback function will always return undefined
.
可选的 args
被传递给回调函数。
¥The optional args
are passed to the callback function.
如果回调函数抛出错误,则 exit()
也会抛出该错误。堆栈跟踪不受此调用的影响,并且重新进入上下文。
¥If the callback function throws an error, the error is thrown by exit()
too.
The stacktrace is not impacted by this call and the context is re-entered.
示例:
¥Example:
// Within a call to run
try {
asyncLocalStorage.getStore(); // Returns the store object or value
asyncLocalStorage.exit(() => {
asyncLocalStorage.getStore(); // Returns undefined
throw new Error();
});
} catch (e) {
asyncLocalStorage.getStore(); // Returns the same object or value
// The error will be caught here
}
与 async/await
一起使用#
¥Usage with async/await
如果在异步函数中,只有一个 await
调用在上下文中运行,则应使用以下模式:
¥If, within an async function, only one await
call is to run within a context,
the following pattern should be used:
async function fn() {
await asyncLocalStorage.run(new Map(), () => {
asyncLocalStorage.getStore().set('key', value);
return foo(); // The return value of foo will be awaited
});
}
本例中 store 只在回调函数和 foo
调用的函数中可用。在 run
之外,调用 getStore
将返回 undefined
。
¥In this example, the store is only available in the callback function and the
functions called by foo
. Outside of run
, calling getStore
will return
undefined
.
故障排除:上下文丢失#
¥Troubleshooting: Context loss
在大多数情况下,AsyncLocalStorage
可以正常工作。在极少数情况下,当前存储会在其中一个异步操作中丢失。
¥In most cases, AsyncLocalStorage
works without issues. In rare situations, the
current store is lost in one of the asynchronous operations.
如果你的代码是基于回调的,则使用 util.promisify()
对其进行 promise 就足够了,因此它可以开始使用原生 promise。
¥If your code is callback-based, it is enough to promisify it with
util.promisify()
so it starts working with native promises.
如果你需要使用基于回调的 API,或者你的代码采用自定义的 thenable 实现,请使用 AsyncResource
类将异步操作与正确的执行上下文相关联。通过在你怀疑导致上下文丢失的调用之后记录 asyncLocalStorage.getStore()
的内容,找到导致上下文丢失的函数调用。当代码记录 undefined
时,调用的最后一个回调可能是上下文丢失的原因。
¥If you need to use a callback-based API or your code assumes
a custom thenable implementation, use the AsyncResource
class
to associate the asynchronous operation with the correct execution context.
Find the function call responsible for the context loss by logging the content
of asyncLocalStorage.getStore()
after the calls you suspect are responsible
for the loss. When the code logs undefined
, the last callback called is
probably responsible for the context loss.
类:AsyncResource
#
¥Class: AsyncResource
AsyncResource
类旨在通过嵌入器的异步资源进行扩展。使用它,用户可以轻松触发自己资源的生命周期事件。
¥The class AsyncResource
is designed to be extended by the embedder's async
resources. Using this, users can easily trigger the lifetime events of their
own resources.
init
钩子将在实例化 AsyncResource
时触发。
¥The init
hook will trigger when an AsyncResource
is instantiated.
以下是 AsyncResource
API 的概述。
¥The following is an overview of the AsyncResource
API.
import { AsyncResource, executionAsyncId } from 'node:async_hooks';
// AsyncResource() is meant to be extended. Instantiating a
// new AsyncResource() also triggers init. If triggerAsyncId is omitted then
// async_hook.executionAsyncId() is used.
const asyncResource = new AsyncResource(
type, { triggerAsyncId: executionAsyncId(), requireManualDestroy: false },
);
// Run a function in the execution context of the resource. This will
// * establish the context of the resource
// * trigger the AsyncHooks before callbacks
// * call the provided function `fn` with the supplied arguments
// * trigger the AsyncHooks after callbacks
// * restore the original execution context
asyncResource.runInAsyncScope(fn, thisArg, ...args);
// Call AsyncHooks destroy callbacks.
asyncResource.emitDestroy();
// Return the unique ID assigned to the AsyncResource instance.
asyncResource.asyncId();
// Return the trigger ID for the AsyncResource instance.
asyncResource.triggerAsyncId();
const { AsyncResource, executionAsyncId } = require('node:async_hooks');
// AsyncResource() is meant to be extended. Instantiating a
// new AsyncResource() also triggers init. If triggerAsyncId is omitted then
// async_hook.executionAsyncId() is used.
const asyncResource = new AsyncResource(
type, { triggerAsyncId: executionAsyncId(), requireManualDestroy: false },
);
// Run a function in the execution context of the resource. This will
// * establish the context of the resource
// * trigger the AsyncHooks before callbacks
// * call the provided function `fn` with the supplied arguments
// * trigger the AsyncHooks after callbacks
// * restore the original execution context
asyncResource.runInAsyncScope(fn, thisArg, ...args);
// Call AsyncHooks destroy callbacks.
asyncResource.emitDestroy();
// Return the unique ID assigned to the AsyncResource instance.
asyncResource.asyncId();
// Return the trigger ID for the AsyncResource instance.
asyncResource.triggerAsyncId();
new AsyncResource(type[, options])
#
-
type
<string> 异步事件的类型。¥
type
<string> The type of async event. -
options
<Object>-
triggerAsyncId
<number> 创建此异步事件的执行上下文的 ID。默认值:executionAsyncId()
。¥
triggerAsyncId
<number> The ID of the execution context that created this async event. Default:executionAsyncId()
. -
requireManualDestroy
<boolean> 如果设置为true
,则当对象被垃圾回收时禁用emitDestroy
。这通常不需要设置(即使手动调用emitDestroy
),除非检索到资源的asyncId
并调用敏感 API 的emitDestroy
。当设置为false
时,则只有在至少有一个活动的destroy
钩子时才会调用emitDestroy
垃圾回收。默认值:false
。¥
requireManualDestroy
<boolean> If set totrue
, disablesemitDestroy
when the object is garbage collected. This usually does not need to be set (even ifemitDestroy
is called manually), unless the resource'sasyncId
is retrieved and the sensitive API'semitDestroy
is called with it. When set tofalse
, theemitDestroy
call on garbage collection will only take place if there is at least one activedestroy
hook. Default:false
.
-
用法示例:
¥Example usage:
class DBQuery extends AsyncResource {
constructor(db) {
super('DBQuery');
this.db = db;
}
getInfo(query, callback) {
this.db.get(query, (err, data) => {
this.runInAsyncScope(callback, null, err, data);
});
}
close() {
this.db = null;
this.emitDestroy();
}
}
静态方法:AsyncResource.bind(fn[, type[, thisArg]])
#
¥Static method: AsyncResource.bind(fn[, type[, thisArg]])
-
fn
<Function> 绑定到当前执行上下文的函数。¥
fn
<Function> The function to bind to the current execution context. -
type
<string> 与底层AsyncResource
关联的可选名称。¥
type
<string> An optional name to associate with the underlyingAsyncResource
. -
thisArg
<any>
将给定函数绑定到当前执行上下文。
¥Binds the given function to the current execution context.
asyncResource.bind(fn[, thisArg])
#
-
fn
<Function> 绑定到当前AsyncResource
的函数。¥
fn
<Function> The function to bind to the currentAsyncResource
. -
thisArg
<any>
将要执行的给定函数绑定到此 AsyncResource
的范围。
¥Binds the given function to execute to this AsyncResource
's scope.
asyncResource.runInAsyncScope(fn[, thisArg, ...args])
#
-
fn
<Function> 在此异步资源的执行上下文中调用的函数。¥
fn
<Function> The function to call in the execution context of this async resource. -
thisArg
<any> 用于函数调用的接收器。¥
thisArg
<any> The receiver to be used for the function call. -
...args
<any> 传递给函数的可选参数。¥
...args
<any> Optional arguments to pass to the function.
在异步资源的执行上下文中使用提供的参数调用提供的函数。这将建立上下文,在回调前触发 AsyncHooks,调用函数,在回调后触发 AsyncHooks,然后恢复原来的执行上下文。
¥Call the provided function with the provided arguments in the execution context of the async resource. This will establish the context, trigger the AsyncHooks before callbacks, call the function, trigger the AsyncHooks after callbacks, and then restore the original execution context.
asyncResource.emitDestroy()
#
-
返回:<AsyncResource>
asyncResource
的引用。¥Returns: <AsyncResource> A reference to
asyncResource
.
调用所有的 destroy
钩子。这应该只被调用一次。如果多次调用,则会报错。这必须手动调用。如果资源留给 GC 收集,则永远不会调用 destroy
钩子。
¥Call all destroy
hooks. This should only ever be called once. An error will
be thrown if it is called more than once. This must be manually called. If
the resource is left to be collected by the GC then the destroy
hooks will
never be called.
asyncResource.asyncId()
#
asyncResource.triggerAsyncId()
#
-
返回:<number> 传给
AsyncResource
构造函数的同一个triggerAsyncId
。¥Returns: <number> The same
triggerAsyncId
that is passed to theAsyncResource
constructor.
将 AsyncResource
用于 Worker
线程池#
¥Using AsyncResource
for a Worker
thread pool
以下示例显示如何使用 AsyncResource
类为 Worker
池正确提供异步跟踪。其他资源池,例如数据库连接池,可以遵循类似的模型。
¥The following example shows how to use the AsyncResource
class to properly
provide async tracking for a Worker
pool. Other resource pools, such as
database connection pools, can follow a similar model.
假设任务是将两个数字相加,使用名为 task_processor.js
的文件,其内容如下:
¥Assuming that the task is adding two numbers, using a file named
task_processor.js
with the following content:
import { parentPort } from 'node:worker_threads';
parentPort.on('message', (task) => {
parentPort.postMessage(task.a + task.b);
});
const { parentPort } = require('node:worker_threads');
parentPort.on('message', (task) => {
parentPort.postMessage(task.a + task.b);
});
它周围的工作池可以使用以下结构:
¥a Worker pool around it could use the following structure:
import { AsyncResource } from 'node:async_hooks';
import { EventEmitter } from 'node:events';
import { Worker } from 'node:worker_threads';
const kTaskInfo = Symbol('kTaskInfo');
const kWorkerFreedEvent = Symbol('kWorkerFreedEvent');
class WorkerPoolTaskInfo extends AsyncResource {
constructor(callback) {
super('WorkerPoolTaskInfo');
this.callback = callback;
}
done(err, result) {
this.runInAsyncScope(this.callback, null, err, result);
this.emitDestroy(); // `TaskInfo`s are used only once.
}
}
export default class WorkerPool extends EventEmitter {
constructor(numThreads) {
super();
this.numThreads = numThreads;
this.workers = [];
this.freeWorkers = [];
this.tasks = [];
for (let i = 0; i < numThreads; i++)
this.addNewWorker();
// Any time the kWorkerFreedEvent is emitted, dispatch
// the next task pending in the queue, if any.
this.on(kWorkerFreedEvent, () => {
if (this.tasks.length > 0) {
const { task, callback } = this.tasks.shift();
this.runTask(task, callback);
}
});
}
addNewWorker() {
const worker = new Worker(new URL('task_processor.js', import.meta.url));
worker.on('message', (result) => {
// In case of success: Call the callback that was passed to `runTask`,
// remove the `TaskInfo` associated with the Worker, and mark it as free
// again.
worker[kTaskInfo].done(null, result);
worker[kTaskInfo] = null;
this.freeWorkers.push(worker);
this.emit(kWorkerFreedEvent);
});
worker.on('error', (err) => {
// In case of an uncaught exception: Call the callback that was passed to
// `runTask` with the error.
if (worker[kTaskInfo])
worker[kTaskInfo].done(err, null);
else
this.emit('error', err);
// Remove the worker from the list and start a new Worker to replace the
// current one.
this.workers.splice(this.workers.indexOf(worker), 1);
this.addNewWorker();
});
this.workers.push(worker);
this.freeWorkers.push(worker);
this.emit(kWorkerFreedEvent);
}
runTask(task, callback) {
if (this.freeWorkers.length === 0) {
// No free threads, wait until a worker thread becomes free.
this.tasks.push({ task, callback });
return;
}
const worker = this.freeWorkers.pop();
worker[kTaskInfo] = new WorkerPoolTaskInfo(callback);
worker.postMessage(task);
}
close() {
for (const worker of this.workers) worker.terminate();
}
}
const { AsyncResource } = require('node:async_hooks');
const { EventEmitter } = require('node:events');
const path = require('node:path');
const { Worker } = require('node:worker_threads');
const kTaskInfo = Symbol('kTaskInfo');
const kWorkerFreedEvent = Symbol('kWorkerFreedEvent');
class WorkerPoolTaskInfo extends AsyncResource {
constructor(callback) {
super('WorkerPoolTaskInfo');
this.callback = callback;
}
done(err, result) {
this.runInAsyncScope(this.callback, null, err, result);
this.emitDestroy(); // `TaskInfo`s are used only once.
}
}
class WorkerPool extends EventEmitter {
constructor(numThreads) {
super();
this.numThreads = numThreads;
this.workers = [];
this.freeWorkers = [];
this.tasks = [];
for (let i = 0; i < numThreads; i++)
this.addNewWorker();
// Any time the kWorkerFreedEvent is emitted, dispatch
// the next task pending in the queue, if any.
this.on(kWorkerFreedEvent, () => {
if (this.tasks.length > 0) {
const { task, callback } = this.tasks.shift();
this.runTask(task, callback);
}
});
}
addNewWorker() {
const worker = new Worker(path.resolve(__dirname, 'task_processor.js'));
worker.on('message', (result) => {
// In case of success: Call the callback that was passed to `runTask`,
// remove the `TaskInfo` associated with the Worker, and mark it as free
// again.
worker[kTaskInfo].done(null, result);
worker[kTaskInfo] = null;
this.freeWorkers.push(worker);
this.emit(kWorkerFreedEvent);
});
worker.on('error', (err) => {
// In case of an uncaught exception: Call the callback that was passed to
// `runTask` with the error.
if (worker[kTaskInfo])
worker[kTaskInfo].done(err, null);
else
this.emit('error', err);
// Remove the worker from the list and start a new Worker to replace the
// current one.
this.workers.splice(this.workers.indexOf(worker), 1);
this.addNewWorker();
});
this.workers.push(worker);
this.freeWorkers.push(worker);
this.emit(kWorkerFreedEvent);
}
runTask(task, callback) {
if (this.freeWorkers.length === 0) {
// No free threads, wait until a worker thread becomes free.
this.tasks.push({ task, callback });
return;
}
const worker = this.freeWorkers.pop();
worker[kTaskInfo] = new WorkerPoolTaskInfo(callback);
worker.postMessage(task);
}
close() {
for (const worker of this.workers) worker.terminate();
}
}
module.exports = WorkerPool;
如果没有 WorkerPoolTaskInfo
对象添加的显式跟踪,回调似乎与各个 Worker
对象相关联。但是,Worker
的创建与任务的创建无关,并且不提供有关任务计划时间的信息。
¥Without the explicit tracking added by the WorkerPoolTaskInfo
objects,
it would appear that the callbacks are associated with the individual Worker
objects. However, the creation of the Worker
s is not associated with the
creation of the tasks and does not provide information about when tasks
were scheduled.
该池可以按如下方式使用:
¥This pool could be used as follows:
import WorkerPool from './worker_pool.js';
import os from 'node:os';
const pool = new WorkerPool(os.availableParallelism());
let finished = 0;
for (let i = 0; i < 10; i++) {
pool.runTask({ a: 42, b: 100 }, (err, result) => {
console.log(i, err, result);
if (++finished === 10)
pool.close();
});
}
const WorkerPool = require('./worker_pool.js');
const os = require('node:os');
const pool = new WorkerPool(os.availableParallelism());
let finished = 0;
for (let i = 0; i < 10; i++) {
pool.runTask({ a: 42, b: 100 }, (err, result) => {
console.log(i, err, result);
if (++finished === 10)
pool.close();
});
}
将 AsyncResource
与 EventEmitter
集成#
¥Integrating AsyncResource
with EventEmitter
由 EventEmitter
触发的事件监听器可能在与调用 eventEmitter.on()
时处于活动状态的执行上下文不同的执行上下文中运行。
¥Event listeners triggered by an EventEmitter
may be run in a different
execution context than the one that was active when eventEmitter.on()
was
called.
以下示例显示如何使用 AsyncResource
类将事件监听器与正确的执行上下文正确关联。相同的方法可以应用于 Stream
或类似的事件驱动类。
¥The following example shows how to use the AsyncResource
class to properly
associate an event listener with the correct execution context. The same
approach can be applied to a Stream
or a similar event-driven class.
import { createServer } from 'node:http';
import { AsyncResource, executionAsyncId } from 'node:async_hooks';
const server = createServer((req, res) => {
req.on('close', AsyncResource.bind(() => {
// Execution context is bound to the current outer scope.
}));
req.on('close', () => {
// Execution context is bound to the scope that caused 'close' to emit.
});
res.end();
}).listen(3000);
const { createServer } = require('node:http');
const { AsyncResource, executionAsyncId } = require('node:async_hooks');
const server = createServer((req, res) => {
req.on('close', AsyncResource.bind(() => {
// Execution context is bound to the current outer scope.
}));
req.on('close', () => {
// Execution context is bound to the scope that caused 'close' to emit.
});
res.end();
}).listen(3000);