stream.compose(...streams)


稳定性: 1 - stream.compose 是实验性的。

¥Stability: 1 - stream.compose is experimental.

将两个或多个流组合成一个 Duplex 流,该流写入第一个流并从最后一个流读取。每个提供的流都使用 stream.pipeline 通过管道传输到下一个。如果任何流出错,则所有流都将被销毁,包括外部 Duplex 流。

¥Combines two or more streams into a Duplex stream that writes to the first stream and reads from the last. Each provided stream is piped into the next, using stream.pipeline. If any of the streams error then all are destroyed, including the outer Duplex stream.

因为 stream.compose 返回一个新流,该流又可以(并且应该)通过管道传输到其他流中,所以它启用了组合。相反,当将流传递给 stream.pipeline 时,通常第一个流是可读流,最后一个流是可写流,形成一个闭合回路。

¥Because stream.compose returns a new stream that in turn can (and should) be piped into other streams, it enables composition. In contrast, when passing streams to stream.pipeline, typically the first stream is a readable stream and the last a writable stream, forming a closed circuit.

如果传递 Function,它必须是采用 source Iterable 的工厂方法。

¥If passed a Function it must be a factory method taking a source Iterable.

import { compose, Transform } from 'node:stream';

const removeSpaces = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, String(chunk).replace(' ', ''));
  },
});

async function* toUpper(source) {
  for await (const chunk of source) {
    yield String(chunk).toUpperCase();
  }
}

let res = '';
for await (const buf of compose(removeSpaces, toUpper).end('hello world')) {
  res += buf;
}

console.log(res); // prints 'HELLOWORLD' 

stream.compose 可用于将异步迭代器、生成器和函数转换为流。

¥stream.compose can be used to convert async iterables, generators and functions into streams.

  • AsyncIterable 转换为可读的 Duplex。无法生成 null

    ¥AsyncIterable converts into a readable Duplex. Cannot yield null.

  • AsyncGeneratorFunction 转换为可读/可写的转换 Duplex。必须将源 AsyncIterable 作为第一个参数。无法生成 null

    ¥AsyncGeneratorFunction converts into a readable/writable transform Duplex. Must take a source AsyncIterable as first parameter. Cannot yield null.

  • AsyncFunction 转换为可写的 Duplex。必须返回 nullundefined

    ¥AsyncFunction converts into a writable Duplex. Must return either null or undefined.

import { compose } from 'node:stream';
import { finished } from 'node:stream/promises';

// Convert AsyncIterable into readable Duplex.
const s1 = compose(async function*() {
  yield 'Hello';
  yield 'World';
}());

// Convert AsyncGenerator into transform Duplex.
const s2 = compose(async function*(source) {
  for await (const chunk of source) {
    yield String(chunk).toUpperCase();
  }
});

let res = '';

// Convert AsyncFunction into writable Duplex.
const s3 = compose(async function(source) {
  for await (const chunk of source) {
    res += chunk;
  }
});

await finished(compose(s1, s2, s3));

console.log(res); // prints 'HELLOWORLD' 

stream.compose 作为操作符参见 readable.compose(stream)

¥See readable.compose(stream) for stream.compose as operator.