Node.js v23.6.0 documentation


Asynchronous context tracking

Stability: 2 - Stable

Source Code: lib/async_hooks.js

Introduction

These classes are used to associate state and propagate it throughout callbacks and promise chains. They allow storing data throughout the lifetime of a web request or any other asynchronous duration. This is similar to thread-local storage in other languages.

The AsyncLocalStorage and AsyncResource classes are part of the node:async_hooks module:

import { AsyncLocalStorage, AsyncResource } from 'node:async_hooks';

const { AsyncLocalStorage, AsyncResource } = require('node:async_hooks');

Class: AsyncLocalStorage

This class creates stores that stay coherent through asynchronous operations.

While you can create your own implementation on top of the node:async_hooks module, AsyncLocalStorage should be preferred as it is a performant and memory-safe implementation that involves significant optimizations that are non-obvious to implement.

The following example uses AsyncLocalStorage to build a simple logger that assigns IDs to incoming HTTP requests and includes them in messages logged within each request.

import http from 'node:http';
import { AsyncLocalStorage } from 'node:async_hooks';

const asyncLocalStorage = new AsyncLocalStorage();

function logWithId(msg) {
  const id = asyncLocalStorage.getStore();
  console.log(`${id !== undefined ? id : '-'}:`, msg);
}

let idSeq = 0;
http.createServer((req, res) => {
  asyncLocalStorage.run(idSeq++, () => {
    logWithId('start');
    // Imagine any chain of async operations here
    setImmediate(() => {
      logWithId('finish');
      res.end();
    });
  });
}).listen(8080);

http.get('http://localhost:8080');
http.get('http://localhost:8080');
// Prints:
//   0: start
//   1: start
//   0: finish
//   1: finish

const http = require('node:http');
const { AsyncLocalStorage } = require('node:async_hooks');

const asyncLocalStorage = new AsyncLocalStorage();

function logWithId(msg) {
  const id = asyncLocalStorage.getStore();
  console.log(`${id !== undefined ? id : '-'}:`, msg);
}

let idSeq = 0;
http.createServer((req, res) => {
  asyncLocalStorage.run(idSeq++, () => {
    logWithId('start');
    // Imagine any chain of async operations here
    setImmediate(() => {
      logWithId('finish');
      res.end();
    });
  });
}).listen(8080);

http.get('http://localhost:8080');
http.get('http://localhost:8080');
// Prints:
//   0: start
//   1: start
//   0: finish
//   1: finish

Each instance of AsyncLocalStorage maintains an independent storage context. Multiple instances can safely exist simultaneously without risk of interfering with each other's data.
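
The independence of instances can be illustrated with a minimal sketch. The names requestStorage and userStorage and the stored values are assumptions chosen for illustration only:

```javascript
const { AsyncLocalStorage } = require('node:async_hooks');

// Two unrelated storages, for example one per library.
const requestStorage = new AsyncLocalStorage();
const userStorage = new AsyncLocalStorage();

const observed = requestStorage.run({ requestId: 7 }, () =>
  userStorage.run({ name: 'alice' }, () => ({
    // Each instance returns only the store it was given.
    request: requestStorage.getStore(),
    user: userStorage.getStore(),
  })));

console.log(observed);
// Prints: { request: { requestId: 7 }, user: { name: 'alice' } }

// Outside of both run() calls neither instance has a store.
console.log(requestStorage.getStore(), userStorage.getStore());
// Prints: undefined undefined
```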

new AsyncLocalStorage()

Creates a new instance of AsyncLocalStorage. A store is only provided within a run() call or after an enterWith() call.

Static method: AsyncLocalStorage.bind(fn)

Stability: 1 - Experimental

  • fn <Function> The function to bind to the current execution context.

  • Returns: <Function> A new function that calls fn within the captured execution context.

Binds the given function to the current execution context.
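
As a rough illustration of what binding means here, the sketch below binds a function while one store is active and calls it while another is active. The store values 'request-1' and 'request-2' are placeholders, not part of the API:

```javascript
const { AsyncLocalStorage } = require('node:async_hooks');

const asyncLocalStorage = new AsyncLocalStorage();

// Bind a function while the store is 'request-1'.
const boundGetStore = asyncLocalStorage.run('request-1', () =>
  AsyncLocalStorage.bind(() => asyncLocalStorage.getStore()));

// Call the bound function while a different store is active.
const seen = asyncLocalStorage.run('request-2', () => boundGetStore());

console.log(seen); // Prints: request-1
```

The bound function observes the store that was active when bind() was called, not the one active at call time.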

Static method: AsyncLocalStorage.snapshot()

Stability: 1 - Experimental

  • Returns: <Function> A new function with the signature (fn: (...args) : R, ...args) : R.

Captures the current execution context and returns a function that accepts a function as an argument. Whenever the returned function is called, it calls the function passed to it within the captured context.

const asyncLocalStorage = new AsyncLocalStorage();
const runInAsyncScope = asyncLocalStorage.run(123, () => AsyncLocalStorage.snapshot());
const result = asyncLocalStorage.run(321, () => runInAsyncScope(() => asyncLocalStorage.getStore()));
console.log(result);  // returns 123 

AsyncLocalStorage.snapshot() can replace the use of AsyncResource for simple async context tracking purposes, for example:

class Foo {
  #runInAsyncScope = AsyncLocalStorage.snapshot();

  get() { return this.#runInAsyncScope(() => asyncLocalStorage.getStore()); }
}

const foo = asyncLocalStorage.run(123, () => new Foo());
console.log(asyncLocalStorage.run(321, () => foo.get())); // returns 123 

asyncLocalStorage.disable()

Stability: 1 - Experimental

Disables the instance of AsyncLocalStorage. All subsequent calls to asyncLocalStorage.getStore() will return undefined until asyncLocalStorage.run() or asyncLocalStorage.enterWith() is called again.

When calling asyncLocalStorage.disable(), all current contexts linked to the instance will be exited.

Calling asyncLocalStorage.disable() is required before the asyncLocalStorage can be garbage collected. This does not apply to stores provided by the asyncLocalStorage, as those objects are garbage collected along with the corresponding async resources.

Use this method when the asyncLocalStorage is no longer in use in the current process.
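
The behavior described above can be sketched as follows. This is only an illustration under the documented semantics; the store values and the seen array are assumptions made for the example:

```javascript
const { AsyncLocalStorage } = require('node:async_hooks');

const asyncLocalStorage = new AsyncLocalStorage();
const seen = [];

asyncLocalStorage.run('active', () => {
  seen.push(asyncLocalStorage.getStore()); // 'active'
  asyncLocalStorage.disable();
  seen.push(asyncLocalStorage.getStore()); // undefined after disable()
});

// A later run() call makes the instance usable again.
seen.push(asyncLocalStorage.run('again', () => asyncLocalStorage.getStore()));

console.log(seen); // Prints: [ 'active', undefined, 'again' ]
```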

asyncLocalStorage.getStore()

Returns the current store. If called outside of an asynchronous context initialized by calling asyncLocalStorage.run() or asyncLocalStorage.enterWith(), it returns undefined.

asyncLocalStorage.enterWith(store)

Stability: 1 - Experimental

Transitions into the context for the remainder of the current synchronous execution and then persists the store through any following asynchronous calls.

Example:

const store = { id: 1 };
// Replaces previous store with the given store object
asyncLocalStorage.enterWith(store);
asyncLocalStorage.getStore(); // Returns the store object
someAsyncOperation(() => {
  asyncLocalStorage.getStore(); // Returns the same object
}); 

This transition will continue for the entire synchronous execution. This means that if, for example, the context is entered within an event handler, subsequent event handlers will also run within that context unless specifically bound to another context with an AsyncResource. That is why run() should be preferred over enterWith() unless there are strong reasons to use the latter method.

const store = { id: 1 };

emitter.on('my-event', () => {
  asyncLocalStorage.enterWith(store);
});
emitter.on('my-event', () => {
  asyncLocalStorage.getStore(); // Returns the same object
});

asyncLocalStorage.getStore(); // Returns undefined
emitter.emit('my-event');
asyncLocalStorage.getStore(); // Returns the same object 

asyncLocalStorage.run(store, callback[, ...args])

Runs a function synchronously within a context and returns its return value. The store is not accessible outside of the callback function. The store is accessible to any asynchronous operations created within the callback.

The optional args are passed to the callback function.

If the callback function throws an error, the error is thrown by run() too. The stack trace is not impacted by this call and the context is exited.

Example:

const store = { id: 2 };
try {
  asyncLocalStorage.run(store, () => {
    asyncLocalStorage.getStore(); // Returns the store object
    setTimeout(() => {
      asyncLocalStorage.getStore(); // Returns the store object
    }, 200);
    throw new Error();
  });
} catch (e) {
  asyncLocalStorage.getStore(); // Returns undefined
  // The error will be caught here
} 
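
The argument forwarding and return value of run() can be shown with a small sketch. The describe function and its arguments are hypothetical and exist only for this illustration:

```javascript
const { AsyncLocalStorage } = require('node:async_hooks');

const asyncLocalStorage = new AsyncLocalStorage();

// Hypothetical callback: combines the store with the forwarded arguments.
function describe(greeting, name) {
  return `${greeting}, ${name} (request ${asyncLocalStorage.getStore().id})`;
}

// 'hello' and 'world' are passed through to describe().
const message = asyncLocalStorage.run({ id: 2 }, describe, 'hello', 'world');

console.log(message); // Prints: hello, world (request 2)
console.log(asyncLocalStorage.getStore()); // Prints: undefined
```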

asyncLocalStorage.exit(callback[, ...args])

Stability: 1 - Experimental

Runs a function synchronously outside of a context and returns its return value. The store is not accessible within the callback function or the asynchronous operations created within the callback. Any getStore() call done within the callback function will always return undefined.

The optional args are passed to the callback function.

If the callback function throws an error, the error is thrown by exit() too. The stack trace is not impacted by this call and the context is re-entered.

Example:

// Within a call to run
try {
  asyncLocalStorage.getStore(); // Returns the store object or value
  asyncLocalStorage.exit(() => {
    asyncLocalStorage.getStore(); // Returns undefined
    throw new Error();
  });
} catch (e) {
  asyncLocalStorage.getStore(); // Returns the same object or value
  // The error will be caught here
} 
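
A self-contained variant of the example above, written as a sketch that also shows the forwarded argument and the return value of exit(). The store value 'outer' and the label string are placeholders:

```javascript
const { AsyncLocalStorage } = require('node:async_hooks');

const asyncLocalStorage = new AsyncLocalStorage();

const result = asyncLocalStorage.run('outer', () => {
  // The second argument to exit() is forwarded to the callback as `label`.
  const insideExit = asyncLocalStorage.exit(
    (label) => `${label}: ${asyncLocalStorage.getStore()}`,
    'inside exit',
  );
  // Once exit() returns, the surrounding context is active again.
  const afterExit = asyncLocalStorage.getStore();
  return { insideExit, afterExit };
});

console.log(result);
// Prints: { insideExit: 'inside exit: undefined', afterExit: 'outer' }
```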

Usage with async/await

If, within an async function, only one await call is to run within a context, the following pattern should be used:

async function fn() {
  await asyncLocalStorage.run(new Map(), () => {
    asyncLocalStorage.getStore().set('key', value);
    return foo(); // The return value of foo will be awaited
  });
} 

In this example, the store is only available in the callback function and the functions called by foo. Outside of run, calling getStore will return undefined.
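
A runnable sketch of the same pattern follows. Here foo is a hypothetical stand-in that awaits a short timer before reading the store, and the key and value strings are placeholders:

```javascript
const { AsyncLocalStorage } = require('node:async_hooks');
const { setTimeout: sleep } = require('node:timers/promises');

const asyncLocalStorage = new AsyncLocalStorage();

// Hypothetical stand-in for `foo`: reads the store after an await.
async function foo() {
  await sleep(10);
  return asyncLocalStorage.getStore().get('key');
}

async function fn() {
  const inside = await asyncLocalStorage.run(new Map(), () => {
    asyncLocalStorage.getStore().set('key', 'value');
    return foo(); // The promise returned by foo is awaited by fn.
  });
  // Back in fn, outside of run(), there is no store.
  const outside = asyncLocalStorage.getStore();
  return { inside, outside };
}

const pending = fn();
pending.then(console.log); // Prints: { inside: 'value', outside: undefined }
```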

Troubleshooting: Context loss

In most cases, AsyncLocalStorage works without issues. In rare situations, the current store is lost in one of the asynchronous operations.

If your code is callback-based, it is enough to promisify it with util.promisify() so it starts working with native promises.

If you need to use a callback-based API or your code assumes a custom thenable implementation, use the AsyncResource class to associate the asynchronous operation with the correct execution context. Find the function call responsible for the context loss by logging the content of asyncLocalStorage.getStore() after the calls you suspect are responsible for the loss. When the code logs undefined, the last callback called is probably responsible for the context loss.
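
One way this can look in practice is sketched below. The queue, enqueue, flush, and enqueueWithContext helpers are hypothetical and stand in for a callback-based API that invokes callbacks from an unrelated context:

```javascript
const { AsyncLocalStorage, AsyncResource } = require('node:async_hooks');

const asyncLocalStorage = new AsyncLocalStorage();

// Hypothetical callback-based API: callbacks are queued and later invoked
// from whatever context calls flush(), so the caller's store is lost.
const queue = [];
function enqueue(callback) { queue.push(callback); }
function flush() { return queue.splice(0).map((callback) => callback()); }

// Wrapper that ties each callback to the context active at enqueue time.
function enqueueWithContext(callback) {
  const resource = new AsyncResource('QueuedCallback');
  enqueue(() => {
    try {
      return resource.runInAsyncScope(callback);
    } finally {
      resource.emitDestroy(); // Each resource is used only once.
    }
  });
}

asyncLocalStorage.run('request-1', () => {
  enqueue(() => asyncLocalStorage.getStore());            // Context is lost.
  enqueueWithContext(() => asyncLocalStorage.getStore()); // Context is kept.
});

const results = flush(); // Runs outside of any run() call.
console.log(results); // Prints: [ undefined, 'request-1' ]
```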

Class: AsyncResource

The class AsyncResource is designed to be extended by the embedder's async resources. Using this, users can easily trigger the lifetime events of their own resources.

The init hook will trigger when an AsyncResource is instantiated.

The following is an overview of the AsyncResource API.

import { AsyncResource, executionAsyncId } from 'node:async_hooks';

// AsyncResource() is meant to be extended. Instantiating a
// new AsyncResource() also triggers init. If triggerAsyncId is omitted then
// async_hooks.executionAsyncId() is used.
const asyncResource = new AsyncResource(
  type, { triggerAsyncId: executionAsyncId(), requireManualDestroy: false },
);

// Run a function in the execution context of the resource. This will
// * establish the context of the resource
// * trigger the AsyncHooks before callbacks
// * call the provided function `fn` with the supplied arguments
// * trigger the AsyncHooks after callbacks
// * restore the original execution context
asyncResource.runInAsyncScope(fn, thisArg, ...args);

// Call AsyncHooks destroy callbacks.
asyncResource.emitDestroy();

// Return the unique ID assigned to the AsyncResource instance.
asyncResource.asyncId();

// Return the trigger ID for the AsyncResource instance.
asyncResource.triggerAsyncId();

const { AsyncResource, executionAsyncId } = require('node:async_hooks');

// AsyncResource() is meant to be extended. Instantiating a
// new AsyncResource() also triggers init. If triggerAsyncId is omitted then
// async_hooks.executionAsyncId() is used.
const asyncResource = new AsyncResource(
  type, { triggerAsyncId: executionAsyncId(), requireManualDestroy: false },
);

// Run a function in the execution context of the resource. This will
// * establish the context of the resource
// * trigger the AsyncHooks before callbacks
// * call the provided function `fn` with the supplied arguments
// * trigger the AsyncHooks after callbacks
// * restore the original execution context
asyncResource.runInAsyncScope(fn, thisArg, ...args);

// Call AsyncHooks destroy callbacks.
asyncResource.emitDestroy();

// Return the unique ID assigned to the AsyncResource instance.
asyncResource.asyncId();

// Return the trigger ID for the AsyncResource instance.
asyncResource.triggerAsyncId();

new AsyncResource(type[, options])

  • type <string> The type of async event.

  • options <Object>

    • triggerAsyncId <number> The ID of the execution context that created this async event. Default: executionAsyncId().

    • requireManualDestroy <boolean> If set to true, disables emitDestroy when the object is garbage collected. This usually does not need to be set (even if emitDestroy is called manually), unless the resource's asyncId is retrieved and the sensitive API's emitDestroy is called with it. When set to false, the emitDestroy call on garbage collection will only take place if there is at least one active destroy hook. Default: false.

Example usage:

class DBQuery extends AsyncResource {
  constructor(db) {
    super('DBQuery');
    this.db = db;
  }

  getInfo(query, callback) {
    this.db.get(query, (err, data) => {
      this.runInAsyncScope(callback, null, err, data);
    });
  }

  close() {
    this.db = null;
    this.emitDestroy();
  }
} 

Static method: AsyncResource.bind(fn[, type[, thisArg]])

  • fn <Function> The function to bind to the current execution context.

  • type <string> An optional name to associate with the underlying AsyncResource.

  • thisArg <any>

Binds the given function to the current execution context.
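
The effect is easiest to observe together with AsyncLocalStorage. In the sketch below, the type name 'StoreReader' and the store values are arbitrary choices made for illustration:

```javascript
const { AsyncLocalStorage, AsyncResource } = require('node:async_hooks');

const asyncLocalStorage = new AsyncLocalStorage();

const getStoreLater = asyncLocalStorage.run('creation context', () =>
  // 'StoreReader' is an arbitrary type name chosen for this sketch.
  AsyncResource.bind(() => asyncLocalStorage.getStore(), 'StoreReader'));

// The bound function is called while a different store is active...
const fromOtherContext = asyncLocalStorage.run('calling context', getStoreLater);
console.log(fromOtherContext); // Prints: creation context

// ...and while no store is active at all.
console.log(getStoreLater()); // Prints: creation context
```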

asyncResource.bind(fn[, thisArg])

  • fn <Function> The function to bind to the current AsyncResource.

  • thisArg <any>

Binds the given function so that it executes in this AsyncResource's scope.
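
A minimal sketch of the instance method, assuming an arbitrary type name 'Example' and a hypothetical counter object used as thisArg:

```javascript
const { AsyncResource, executionAsyncId } = require('node:async_hooks');

// 'Example' is an arbitrary type name chosen for this sketch.
const resource = new AsyncResource('Example');

// thisArg is given explicitly, so `this` inside the function is `counter`.
const counter = { count: 0 };
const increment = resource.bind(function(step) {
  this.count += step;
  return executionAsyncId();
}, counter);

const idInside = increment(5);

console.log(counter.count); // Prints: 5
// While the bound function runs, the resource is the current execution context.
console.log(idInside === resource.asyncId()); // Prints: true
```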

asyncResource.runInAsyncScope(fn[, thisArg, ...args])

  • fn <Function> The function to call in the execution context of this async resource.

  • thisArg <any> The receiver to be used for the function call.

  • ...args <any> Optional arguments to pass to the function.

Calls the provided function with the provided arguments in the execution context of the async resource. This will establish the context, trigger the AsyncHooks before callbacks, call the function, trigger the AsyncHooks after callbacks, and then restore the original execution context.
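
The sequence above can be observed through executionAsyncId(). The following is a sketch only; the add function and the type name 'Example' are assumptions made for illustration:

```javascript
const { AsyncResource, executionAsyncId } = require('node:async_hooks');

const resource = new AsyncResource('Example');

// Hypothetical function: reports the execution context it runs in.
function add(a, b) {
  return { sum: a + b, idInside: executionAsyncId() };
}

const idBefore = executionAsyncId();
const { sum, idInside } = resource.runInAsyncScope(add, null, 2, 3);
const idAfter = executionAsyncId();

console.log(sum); // Prints: 5
console.log(idInside === resource.asyncId()); // Prints: true
console.log(idBefore === idAfter); // Prints: true

resource.emitDestroy();
```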

asyncResource.emitDestroy()

Call all destroy hooks. This should only ever be called once. An error will be thrown if it is called more than once. This must be manually called. If the resource is left to be collected by the GC then the destroy hooks will never be called.

asyncResource.asyncId()

  • Returns: <number> The unique asyncId assigned to the resource.

asyncResource.triggerAsyncId()

  • Returns: <number> The same triggerAsyncId that is passed to the AsyncResource constructor.
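
The relationship between the two IDs can be sketched as follows. The type names 'Parent' and 'Child' are arbitrary, and the first check assumes the code runs outside any scope that overrides the default trigger ID:

```javascript
const { AsyncResource, executionAsyncId } = require('node:async_hooks');

const parent = new AsyncResource('Parent');
// Without options, the trigger is the context that created the resource.
console.log(parent.triggerAsyncId() === executionAsyncId()); // Prints: true

// An explicit triggerAsyncId is reported back unchanged.
const child = new AsyncResource('Child', { triggerAsyncId: parent.asyncId() });
console.log(child.triggerAsyncId() === parent.asyncId()); // Prints: true

// Every resource receives its own asyncId.
console.log(child.asyncId() !== parent.asyncId()); // Prints: true
```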

Using AsyncResource for a Worker thread pool

The following example shows how to use the AsyncResource class to properly provide async tracking for a Worker pool. Other resource pools, such as database connection pools, can follow a similar model.

Assume that the task is adding two numbers, using a file named task_processor.js with the following content:

import { parentPort } from 'node:worker_threads';
parentPort.on('message', (task) => {
  parentPort.postMessage(task.a + task.b);
});

const { parentPort } = require('node:worker_threads');
parentPort.on('message', (task) => {
  parentPort.postMessage(task.a + task.b);
});

A Worker pool around it could use the following structure:

import { AsyncResource } from 'node:async_hooks';
import { EventEmitter } from 'node:events';
import { Worker } from 'node:worker_threads';

const kTaskInfo = Symbol('kTaskInfo');
const kWorkerFreedEvent = Symbol('kWorkerFreedEvent');

class WorkerPoolTaskInfo extends AsyncResource {
  constructor(callback) {
    super('WorkerPoolTaskInfo');
    this.callback = callback;
  }

  done(err, result) {
    this.runInAsyncScope(this.callback, null, err, result);
    this.emitDestroy();  // `TaskInfo`s are used only once.
  }
}

export default class WorkerPool extends EventEmitter {
  constructor(numThreads) {
    super();
    this.numThreads = numThreads;
    this.workers = [];
    this.freeWorkers = [];
    this.tasks = [];

    for (let i = 0; i < numThreads; i++)
      this.addNewWorker();

    // Any time the kWorkerFreedEvent is emitted, dispatch
    // the next task pending in the queue, if any.
    this.on(kWorkerFreedEvent, () => {
      if (this.tasks.length > 0) {
        const { task, callback } = this.tasks.shift();
        this.runTask(task, callback);
      }
    });
  }

  addNewWorker() {
    const worker = new Worker(new URL('task_processor.js', import.meta.url));
    worker.on('message', (result) => {
      // In case of success: Call the callback that was passed to `runTask`,
      // remove the `TaskInfo` associated with the Worker, and mark it as free
      // again.
      worker[kTaskInfo].done(null, result);
      worker[kTaskInfo] = null;
      this.freeWorkers.push(worker);
      this.emit(kWorkerFreedEvent);
    });
    worker.on('error', (err) => {
      // In case of an uncaught exception: Call the callback that was passed to
      // `runTask` with the error.
      if (worker[kTaskInfo])
        worker[kTaskInfo].done(err, null);
      else
        this.emit('error', err);
      // Remove the worker from the list and start a new Worker to replace the
      // current one.
      this.workers.splice(this.workers.indexOf(worker), 1);
      this.addNewWorker();
    });
    this.workers.push(worker);
    this.freeWorkers.push(worker);
    this.emit(kWorkerFreedEvent);
  }

  runTask(task, callback) {
    if (this.freeWorkers.length === 0) {
      // No free threads, wait until a worker thread becomes free.
      this.tasks.push({ task, callback });
      return;
    }

    const worker = this.freeWorkers.pop();
    worker[kTaskInfo] = new WorkerPoolTaskInfo(callback);
    worker.postMessage(task);
  }

  close() {
    for (const worker of this.workers) worker.terminate();
  }
}

const { AsyncResource } = require('node:async_hooks');
const { EventEmitter } = require('node:events');
const path = require('node:path');
const { Worker } = require('node:worker_threads');

const kTaskInfo = Symbol('kTaskInfo');
const kWorkerFreedEvent = Symbol('kWorkerFreedEvent');

class WorkerPoolTaskInfo extends AsyncResource {
  constructor(callback) {
    super('WorkerPoolTaskInfo');
    this.callback = callback;
  }

  done(err, result) {
    this.runInAsyncScope(this.callback, null, err, result);
    this.emitDestroy();  // `TaskInfo`s are used only once.
  }
}

class WorkerPool extends EventEmitter {
  constructor(numThreads) {
    super();
    this.numThreads = numThreads;
    this.workers = [];
    this.freeWorkers = [];
    this.tasks = [];

    for (let i = 0; i < numThreads; i++)
      this.addNewWorker();

    // Any time the kWorkerFreedEvent is emitted, dispatch
    // the next task pending in the queue, if any.
    this.on(kWorkerFreedEvent, () => {
      if (this.tasks.length > 0) {
        const { task, callback } = this.tasks.shift();
        this.runTask(task, callback);
      }
    });
  }

  addNewWorker() {
    const worker = new Worker(path.resolve(__dirname, 'task_processor.js'));
    worker.on('message', (result) => {
      // In case of success: Call the callback that was passed to `runTask`,
      // remove the `TaskInfo` associated with the Worker, and mark it as free
      // again.
      worker[kTaskInfo].done(null, result);
      worker[kTaskInfo] = null;
      this.freeWorkers.push(worker);
      this.emit(kWorkerFreedEvent);
    });
    worker.on('error', (err) => {
      // In case of an uncaught exception: Call the callback that was passed to
      // `runTask` with the error.
      if (worker[kTaskInfo])
        worker[kTaskInfo].done(err, null);
      else
        this.emit('error', err);
      // Remove the worker from the list and start a new Worker to replace the
      // current one.
      this.workers.splice(this.workers.indexOf(worker), 1);
      this.addNewWorker();
    });
    this.workers.push(worker);
    this.freeWorkers.push(worker);
    this.emit(kWorkerFreedEvent);
  }

  runTask(task, callback) {
    if (this.freeWorkers.length === 0) {
      // No free threads, wait until a worker thread becomes free.
      this.tasks.push({ task, callback });
      return;
    }

    const worker = this.freeWorkers.pop();
    worker[kTaskInfo] = new WorkerPoolTaskInfo(callback);
    worker.postMessage(task);
  }

  close() {
    for (const worker of this.workers) worker.terminate();
  }
}

module.exports = WorkerPool;

Without the explicit tracking added by the WorkerPoolTaskInfo objects, it would appear that the callbacks are associated with the individual Worker objects. However, the creation of the Workers is not associated with the creation of the tasks and does not provide information about when tasks were scheduled.

This pool could be used as follows:

import WorkerPool from './worker_pool.js';
import os from 'node:os';

const pool = new WorkerPool(os.availableParallelism());

let finished = 0;
for (let i = 0; i < 10; i++) {
  pool.runTask({ a: 42, b: 100 }, (err, result) => {
    console.log(i, err, result);
    if (++finished === 10)
      pool.close();
  });
}

const WorkerPool = require('./worker_pool.js');
const os = require('node:os');

const pool = new WorkerPool(os.availableParallelism());

let finished = 0;
for (let i = 0; i < 10; i++) {
  pool.runTask({ a: 42, b: 100 }, (err, result) => {
    console.log(i, err, result);
    if (++finished === 10)
      pool.close();
  });
}

Integrating AsyncResource with EventEmitter

Event listeners triggered by an EventEmitter may be run in a different execution context than the one that was active when eventEmitter.on() was called.

The following example shows how to use the AsyncResource class to associate an event listener with the correct execution context. The same approach can be applied to a Stream or a similar event-driven class.

import { createServer } from 'node:http';
import { AsyncResource, executionAsyncId } from 'node:async_hooks';

const server = createServer((req, res) => {
  req.on('close', AsyncResource.bind(() => {
    // Execution context is bound to the current outer scope.
  }));
  req.on('close', () => {
    // Execution context is bound to the scope that caused 'close' to emit.
  });
  res.end();
}).listen(3000);

const { createServer } = require('node:http');
const { AsyncResource, executionAsyncId } = require('node:async_hooks');

const server = createServer((req, res) => {
  req.on('close', AsyncResource.bind(() => {
    // Execution context is bound to the current outer scope.
  }));
  req.on('close', () => {
    // Execution context is bound to the scope that caused 'close' to emit.
  });
  res.end();
}).listen(3000);
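
The difference between the two listeners can be made visible without a server by combining a plain EventEmitter with AsyncLocalStorage. This is a sketch only; the 'ping' event name, the store values, and the seen array are assumptions chosen for illustration:

```javascript
const { AsyncLocalStorage, AsyncResource } = require('node:async_hooks');
const { EventEmitter } = require('node:events');

const asyncLocalStorage = new AsyncLocalStorage();
const emitter = new EventEmitter();
const seen = [];

// Register both listeners while the store is 'registration'.
asyncLocalStorage.run('registration', () => {
  emitter.on('ping', AsyncResource.bind(() => {
    seen.push(['bound', asyncLocalStorage.getStore()]);
  }));
  emitter.on('ping', () => {
    seen.push(['unbound', asyncLocalStorage.getStore()]);
  });
});

// Emit while the store is 'emission'.
asyncLocalStorage.run('emission', () => emitter.emit('ping'));

console.log(seen);
// Prints: [ [ 'bound', 'registration' ], [ 'unbound', 'emission' ] ]
```

The bound listener sees the store from registration time, while the plain listener sees the store of the code that emitted the event.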