readable.filter(fn[, options])


稳定性: 1 - 实验性

  • fn <Function> | <AsyncFunction> 一个用于从流中过滤块的函数。
    • data <any> 从流中获取一块数据。
    • options <Object>
      • signal <AbortSignal> 如果流被销毁则中止,从而允许提前中止 fn 调用。
  • options <Object>
    • concurrency <number>fn 在流上同时调用的最大并发次数。默认值: 1
    • highWaterMark <number> 在等待用户消费过滤后的项目时要缓冲的项目数量。默认值: concurrency * 2 - 1
    • signal <AbortSignal> 允许在信号被中止时销毁流。
  • 返回:<Readable> 使用谓词 fn 过滤后的流。

此方法允许对流进行过滤。对于流中的每个数据块,将调用 fn 函数,如果它返回一个真值,该数据块将被传递到结果流中。如果 fn 函数返回一个 promise,那么该 promise 将被 await

【This method allows filtering the stream. For each chunk in the stream the fn function will be called and if it returns a truthy value, the chunk will be passed to the result stream. If the fn function returns a promise - that promise will be awaited.】

import { Readable } from 'node:stream';
import { Resolver } from 'node:dns/promises';

// With a synchronous predicate.
for await (const chunk of Readable.from([1, 2, 3, 4]).filter((x) => x > 2)) {
  console.log(chunk); // 3, 4
}
// With an asynchronous predicate, making at most 2 queries at a time.
const resolver = new Resolver();
const dnsResults = Readable.from([
  'nodejs.org',
  'openjsf.org',
  'www.linuxfoundation.org',
]).filter(async (domain) => {
  const { address } = await resolver.resolve4(domain, { ttl: true });
  return address.ttl > 60;
}, { concurrency: 2 });
for await (const result of dnsResults) {
  // Logs domains with more than 60 seconds on the resolved dns record.
  console.log(result);
}