readable.forEach(fn[, options])


稳定性: 1 - 实验性的

¥Stability: 1 - Experimental

  • fn <Function> | <AsyncFunction> 调用流的每个块的函数。

    ¥fn <Function> | <AsyncFunction> a function to call on each chunk of the stream.

    • data <any> 来自流的数据块。

      ¥data <any> a chunk of data from the stream.

    • options <Object>

      • signal <AbortSignal> 如果流被销毁则中止,允许提前中止 fn 调用。

        ¥signal <AbortSignal> aborted if the stream is destroyed allowing to abort the fn call early.

  • options <Object>

    • concurrency <number> 一次调用流的 fn 的最大并发调用数。默认值:1

      ¥concurrency <number> the maximum concurrent invocation of fn to call on the stream at once. Default: 1.

    • signal <AbortSignal> 如果信号中止,允许销毁流。

      ¥signal <AbortSignal> allows destroying the stream if the signal is aborted.

  • 返回:<Promise> 流完成时的 promise。

    ¥Returns: <Promise> a promise for when the stream has finished.

此方法允许迭代流。对于流中的每个块,将调用 fn 函数。如果 fn 函数返回一个 promise - 这个 promise 将会被 await

¥This method allows iterating a stream. For each chunk in the stream the fn function will be called. If the fn function returns a promise - that promise will be awaited.

此方法与 for await...of 循环的不同之处在于它可以选择并发处理块。此外,forEach 迭代只能通过传递 signal 选项并中止相关的 AbortController 来停止,而 for await...of 可以使用 breakreturn 停止。在任何一种情况下,流都将被销毁。

¥This method is different from for await...of loops in that it can optionally process chunks concurrently. In addition, a forEach iteration can only be stopped by having passed a signal option and aborting the related AbortController while for await...of can be stopped with break or return. In either case the stream will be destroyed.

该方法与监听 'data' 事件的不同之处在于,它使用底层机制中的 readable 事件,可以限制并发 fn 调用的数量。

¥This method is different from listening to the 'data' event in that it uses the readable event in the underlying machinary and can limit the number of concurrent fn calls.

import { Readable } from 'node:stream';
import { Resolver } from 'node:dns/promises';

// With a synchronous predicate.
for await (const chunk of Readable.from([1, 2, 3, 4]).filter((x) => x > 2)) {
  console.log(chunk); // 3, 4
}
// With an asynchronous predicate, making at most 2 queries at a time.
const resolver = new Resolver();
const dnsResults = Readable.from([
  'nodejs.org',
  'openjsf.org',
  'www.linuxfoundation.org',
]).map(async (domain) => {
  const { address } = await resolver.resolve4(domain, { ttl: true });
  return address;
}, { concurrency: 2 });
await dnsResults.forEach((result) => {
  // Logs result, similar to `for await (const result of dnsResults)`
  console.log(result);
});
console.log('done'); // Stream has finished