stream.compose(...streams)


Stability: 1 - stream.compose is experimental.

Combines two or more streams into a Duplex stream that writes to the first stream and reads from the last. Each provided stream is piped into the next, using stream.pipeline. If any of the streams errors, all of them are destroyed, including the outer Duplex stream.
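
The error behavior described above can be illustrated with a minimal sketch. The stream names `first`, `failing`, and `outer` and the `'boom'` error are made up for this illustration, and the sketch assumes an ES module context so that top-level `await` is available.

```javascript
import { compose, PassThrough, Transform } from 'node:stream';
import { finished } from 'node:stream/promises';

// Hypothetical first stage: passes data through unchanged.
const first = new PassThrough();

// Hypothetical second stage: fails on the first chunk it receives.
const failing = new Transform({
  transform(chunk, encoding, callback) {
    callback(new Error('boom'));
  },
});

// The outer Duplex writes to `first` and reads from `failing`.
const outer = compose(first, failing);

outer.end('data');

let caught;
try {
  // Rejects once the outer Duplex is destroyed with the inner error.
  await finished(outer);
} catch (err) {
  caught = err;
}

console.log(caught.message); // prints 'boom'
console.log(first.destroyed, failing.destroyed, outer.destroyed); // prints 'true true true'
```

Waiting on the outer stream is enough here: the error raised inside `failing` surfaces on `outer`, so callers do not need to attach error handlers to each inner stream.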

Because stream.compose returns a new stream that in turn can (and should) be piped into other streams, it enables composition. In contrast, when passing streams to stream.pipeline, typically the first stream is a readable stream and the last a writable stream, forming a closed circuit.
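
As a rough sketch of that difference, the composed Duplex below is treated as one reusable stage, and stream.pipeline then closes the circuit around it. The names `trim`, `toUpper`, `normalize`, and `sink` are hypothetical and exist only for this illustration; an ES module context is assumed for top-level `await`.

```javascript
import { compose, Readable, Transform, Writable } from 'node:stream';
import { pipeline } from 'node:stream/promises';

// Hypothetical stage: trims surrounding whitespace from each chunk.
const trim = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, String(chunk).trim());
  },
});

// Hypothetical stage: upper-cases each chunk.
async function* toUpper(source) {
  for await (const chunk of source) {
    yield String(chunk).toUpperCase();
  }
}

// compose() returns one Duplex, so both stages can be passed around as a unit.
const normalize = compose(trim, toUpper);

// Collects everything that reaches the end of the pipeline.
const chunks = [];
const sink = new Writable({
  write(chunk, encoding, callback) {
    chunks.push(String(chunk));
    callback();
  },
});

// pipeline() forms the closed circuit: readable source, composed Duplex, writable sink.
await pipeline(Readable.from([' foo ', ' bar ']), normalize, sink);

console.log(chunks.join('')); // prints 'FOOBAR'
```

A composed stream can only be wired up once, so this sketch builds `normalize` and uses it in a single pipeline; a factory function returning a fresh `compose(...)` result would be one way to reuse the same combination elsewhere.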

If passed a Function, it must be a factory method taking a source Iterable.

import { compose, Transform } from 'node:stream';

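// Transform that strips every space character from each chunk.
const removeSpaces = new Transform({
  transform(chunk, encoding, callback) {
    // replaceAll() removes all spaces; replace(' ', '') would remove only the first.
    callback(null, String(chunk).replaceAll(' ', ''));
  },
});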

async function* toUpper(source) {
  for await (const chunk of source) {
    yield String(chunk).toUpperCase();
  }
}

let res = '';
for await (const buf of compose(removeSpaces, toUpper).end('hello world')) {
  res += buf;
}

console.log(res); // prints 'HELLOWORLD'

stream.compose can be used to convert async iterables, generators and functions into streams.

  • AsyncIterable converts into a readable Duplex. Cannot yield null.
  • AsyncGeneratorFunction converts into a readable/writable transform Duplex. Must take a source AsyncIterable as first parameter. Cannot yield null.
  • AsyncFunction converts into a writable Duplex. Must return either null or undefined.

import { compose } from 'node:stream';
import { finished } from 'node:stream/promises';

// Convert AsyncIterable into readable Duplex.
const s1 = compose(async function*() {
  yield 'Hello';
  yield 'World';
}());

// Convert AsyncGeneratorFunction into transform Duplex.
const s2 = compose(async function*(source) {
  for await (const chunk of source) {
    yield String(chunk).toUpperCase();
  }
});

let res = '';

// Convert AsyncFunction into writable Duplex.
const s3 = compose(async function(source) {
  for await (const chunk of source) {
    res += chunk;
  }
});

await finished(compose(s1, s2, s3));

console.log(res); // prints 'HELLOWORLD'