stream.compose(...streams)
-
streams<Stream[]> | <Iterable[]> | <AsyncIterable[]> | <Function[]> | <ReadableStream[]> | <WritableStream[]> | <TransformStream[]> -
¥Returns: <stream.Duplex>
将两个或多个流组合成一个 Duplex 流,该流写入第一个流并从最后一个流读取。每个提供的流都使用 stream.pipeline 通过管道传输到下一个。如果任何流出错,则所有流都将被销毁,包括外部 Duplex 流。
¥Combines two or more streams into a Duplex stream that writes to the
first stream and reads from the last. Each provided stream is piped into
the next, using stream.pipeline. If any of the streams error then all
are destroyed, including the outer Duplex stream.
因为 stream.compose 返回一个新流,该流又可以(并且应该)通过管道传输到其他流中,所以它启用了组合。相反,当将流传递给 stream.pipeline 时,通常第一个流是可读流,最后一个流是可写流,形成一个闭合回路。
¥Because stream.compose returns a new stream that in turn can (and
should) be piped into other streams, it enables composition. In contrast,
when passing streams to stream.pipeline, typically the first stream is
a readable stream and the last a writable stream, forming a closed
circuit.
如果传递 Function,它必须是采用 source Iterable 的工厂方法。
¥If passed a Function it must be a factory method taking a source
Iterable.
import { compose, Transform } from 'node:stream';
const removeSpaces = new Transform({
transform(chunk, encoding, callback) {
callback(null, String(chunk).replace(' ', ''));
},
});
async function* toUpper(source) {
for await (const chunk of source) {
yield String(chunk).toUpperCase();
}
}
let res = '';
for await (const buf of compose(removeSpaces, toUpper).end('hello world')) {
res += buf;
}
console.log(res); // prints 'HELLOWORLD' stream.compose 可用于将异步迭代器、生成器和函数转换为流。
¥stream.compose can be used to convert async iterables, generators and
functions into streams.
-
AsyncIterable转换为可读的Duplex。无法生成null。¥
AsyncIterableconverts into a readableDuplex. Cannot yieldnull. -
AsyncGeneratorFunction转换为可读/可写的转换Duplex。必须将源AsyncIterable作为第一个参数。无法生成null。¥
AsyncGeneratorFunctionconverts into a readable/writable transformDuplex. Must take a sourceAsyncIterableas first parameter. Cannot yieldnull. -
AsyncFunction转换为可写的Duplex。必须返回null或undefined。¥
AsyncFunctionconverts into a writableDuplex. Must return eithernullorundefined.
import { compose } from 'node:stream';
import { finished } from 'node:stream/promises';
// Convert AsyncIterable into readable Duplex.
const s1 = compose(async function*() {
yield 'Hello';
yield 'World';
}());
// Convert AsyncGenerator into transform Duplex.
const s2 = compose(async function*(source) {
for await (const chunk of source) {
yield String(chunk).toUpperCase();
}
});
let res = '';
// Convert AsyncFunction into writable Duplex.
const s3 = compose(async function(source) {
for await (const chunk of source) {
res += chunk;
}
});
await finished(compose(s1, s2, s3));
console.log(res); // prints 'HELLOWORLD' 将 stream.compose 作为操作符参见 readable.compose(stream)。
¥See readable.compose(stream) for stream.compose as operator.