readable.forEach(fn[, options])

稳定性: 1 - 实验

此方法允许迭代流。 对于流中的每个块,将调用 fn 函数。 如果 fn 函数返回 promise,则该 Promise 将被 await

此方法与 for await...of 循环不同,它可以选择性地同时处理块。 此外,forEach 迭代只能通过传入 signal 选项并中止相关 AbortController 来停止,而 for await...of 可以通过 breakreturn 停止。 无论哪种情况,流都将被销毁。

此方法与监听 'data' 事件不同,它使用底层机器中的 readable 事件,可以限制 fn 并发调用的数量。

import { Readable } from 'node:stream';
import { Resolver } from 'node:dns/promises';

// 使用同步谓词。
for await (const chunk of Readable.from([1, 2, 3, 4]).filter((x) => x > 2)) {
  console.log(chunk); // 3, 4
// 使用异步谓词,单次最多进行 2 个查询。
const resolver = new Resolver();
const dnsResults = await Readable.from([
]).map(async (domain) => {
  const { address } = await resolver.resolve4(domain, { ttl: true });
  return address;
}, { concurrency: 2 });
await dnsResults.forEach((result) => {
  // 记录结果,类似于 `for await (const result of dnsResults)`
console.log('done'); // 流已结束

Stability: 1 - Experimental

  • fn <Function> | <AsyncFunction> a function to call on each chunk of the stream.
    • data <any> a chunk of data from the stream.
    • options <Object>
      • signal <AbortSignal> aborted if the stream is destroyed allowing to abort the fn call early.
  • options <Object>
    • concurrency <number> the maximum concurrent invocation of fn to call on the stream at once. Default: 1.
    • signal <AbortSignal> allows destroying the stream if the signal is aborted.
  • Returns: <Promise> a promise for when the stream has finished.

This method allows iterating a stream. For each chunk in the stream the fn function will be called. If the fn function returns a promise - that promise will be awaited.

This method is different from for await...of loops in that it can optionally process chunks concurrently. In addition, a forEach iteration can only be stopped by having passed a signal option and aborting the related AbortController while for await...of can be stopped with break or return. In either case the stream will be destroyed.

This method is different from listening to the 'data' event in that it uses the readable event in the underlying machinary and can limit the number of concurrent fn calls.

import { Readable } from 'node:stream';
import { Resolver } from 'node:dns/promises';

// With a synchronous predicate.
for await (const chunk of Readable.from([1, 2, 3, 4]).filter((x) => x > 2)) {
  console.log(chunk); // 3, 4
// With an asynchronous predicate, making at most 2 queries at a time.
const resolver = new Resolver();
const dnsResults = await Readable.from([
]).map(async (domain) => {
  const { address } = await resolver.resolve4(domain, { ttl: true });
  return address;
}, { concurrency: 2 });
await dnsResults.forEach((result) => {
  // Logs result, similar to `for await (const result of dnsResults)`
console.log('done'); // Stream has finished