stream.compose(...streams)
stream.compose is experimental.
- streams <Stream[]> | <Iterable[]> | <AsyncIterable[]> | <Function[]>
- Returns: <stream.Duplex>
Combines two or more streams into a Duplex stream that writes to the first stream and reads from the last. Each provided stream is piped into the next, using stream.pipeline. If any of the streams error then all are destroyed, including the outer Duplex stream.
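The error propagation described above can be observed with a minimal sketch. The names `first`, `last`, and `outer` are illustrative only and not part of the API; the sketch assumes a Node.js version where `stream.compose` is available and ES modules with top-level `await`.

```javascript
import { compose, PassThrough } from 'node:stream';
import { finished } from 'node:stream/promises';

// Two inner streams and the outer Duplex that wraps them.
const first = new PassThrough();
const last = new PassThrough();
const outer = compose(first, last);

let caught;
try {
  // Start watching the outer stream before triggering the failure.
  const done = finished(outer);
  // Simulate a failure in one of the inner streams.
  first.destroy(new Error('boom'));
  await done;
} catch (err) {
  // finished() rejects because the outer Duplex was destroyed with an error.
  caught = err;
}

// Report whether each stream ended up destroyed.
console.log({
  first: first.destroyed,
  last: last.destroyed,
  outer: outer.destroyed,
});
```

Watching the outer stream with `finished()` is one way to learn about failures anywhere in the chain without attaching a listener to every inner stream.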
Because stream.compose returns a new stream that in turn can (and should) be piped into other streams, it enables composition. In contrast, when passing streams to stream.pipeline, typically the first stream is a readable stream and the last a writable stream, forming a closed circuit.
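To illustrate the difference, the following sketch builds an open-ended Duplex with `compose` and then closes the circuit with `pipeline` from `node:stream/promises`. The helper name `makeNormalizer` and the sample input are hypothetical and chosen only for illustration.

```javascript
import { compose, Readable, Transform } from 'node:stream';
import { pipeline } from 'node:stream/promises';

// Hypothetical factory: returns a fresh composed Duplex on every call,
// since a stream cannot be reused once it has ended.
function makeNormalizer() {
  const trim = new Transform({
    transform(chunk, encoding, callback) {
      callback(null, String(chunk).trim());
    },
  });
  async function* toUpper(source) {
    for await (const chunk of source) {
      yield String(chunk).toUpperCase();
    }
  }
  // Open-ended: writes go to `trim`, reads come from `toUpper`.
  return compose(trim, toUpper);
}

let res = '';
// pipeline() supplies the readable start and the consuming end,
// with the composed Duplex acting as a single stage in the middle.
await pipeline(
  Readable.from(['  hello ', ' world  ']),
  makeNormalizer(),
  async function(source) {
    for await (const chunk of source) {
      res += chunk;
    }
  },
);
console.log(res); // prints 'HELLOWORLD'
```

The result is accumulated into one string rather than an array because chunk boundaries are not guaranteed to be preserved across non-object-mode stages.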
If passed a Function, it must be a factory method taking a source Iterable.
import { compose, Transform } from 'node:stream';

// Transform stream that strips every space from each chunk.
const removeSpaces = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, String(chunk).replaceAll(' ', ''));
  },
});

// Factory function: receives the upstream source and yields transformed chunks.
async function* toUpper(source) {
  for await (const chunk of source) {
    yield String(chunk).toUpperCase();
  }
}

let res = '';
for await (const buf of compose(removeSpaces, toUpper).end('hello world')) {
  res += buf;
}

console.log(res); // prints 'HELLOWORLD'
stream.compose can be used to convert async iterables, generators and functions into streams.
- AsyncIterable converts into a readable Duplex. Cannot yield null.
- AsyncGeneratorFunction converts into a readable/writable transform Duplex. Must take a source AsyncIterable as first parameter. Cannot yield null.
- AsyncFunction converts into a writable Duplex. Must return either null or undefined.
import { compose } from 'node:stream';
import { finished } from 'node:stream/promises';

// Convert AsyncIterable into readable Duplex.
const s1 = compose(async function*() {
  yield 'Hello';
  yield 'World';
}());

// Convert AsyncGenerator into transform Duplex.
const s2 = compose(async function*(source) {
  for await (const chunk of source) {
    yield String(chunk).toUpperCase();
  }
});

let res = '';

// Convert AsyncFunction into writable Duplex.
const s3 = compose(async function(source) {
  for await (const chunk of source) {
    res += chunk;
  }
});

await finished(compose(s1, s2, s3));

console.log(res); // prints 'HELLOWORLD'
See readable.compose(stream) for stream.compose as an operator.
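As a rough sketch of the operator form, the example below calls `compose` as a method on a readable stream. It assumes a Node.js release that provides `readable.compose()` (itself experimental); the generator name `toUpper` and the input values are illustrative only.

```javascript
import { Readable } from 'node:stream';

// Factory function applied to the readable it is chained onto.
async function* toUpper(source) {
  for await (const chunk of source) {
    yield String(chunk).toUpperCase();
  }
}

let res = '';
// The readable acts as the first stream; toUpper is piped after it.
for await (const chunk of Readable.from(['hello', 'world']).compose(toUpper)) {
  res += chunk;
}
console.log(res); // prints 'HELLOWORLD'
```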