stream.pipeline(streams, callback)
- 'streams' <Stream[]>
source<Stream> | <Iterable> | <AsyncIterable> | <Function>...转换<Stream> | <Function>source<AsyncIterable>- 返回:<AsyncIterable>
destination<Stream> | <Function>source<AsyncIterable>- 返回:<AsyncIterable> | <Promise>
callback<Function> 当流水线完全完成时调用。err<Error>val由destination返回的Promise的解析值。
- 返回:<Stream>
一个模块方法,用于在流和生成器之间进行传输,转发错误并正确清理,并在管道完成时提供回调。
🌐 A module method to pipe between streams and generators forwarding errors and properly cleaning up and provide a callback when the pipeline is complete.
const { pipeline } = require('node:stream');
const fs = require('node:fs');
const zlib = require('node:zlib');
// Use the pipeline API to easily pipe a series of streams
// together and get notified when the pipeline is fully done.
// A pipeline to gzip a potentially huge tar file efficiently:
pipeline(
fs.createReadStream('archive.tar'),
zlib.createGzip(),
fs.createWriteStream('archive.tar.gz'),
(err) => {
if (err) {
console.error('Pipeline failed.', err);
} else {
console.log('Pipeline succeeded.');
}
}
); pipeline API 提供了一个 Promise 版本,它也可以在最后一个参数接收一个带有 signal <AbortSignal> 属性的 options 对象。当 signal 被中止时,底层管道的 destroy 方法将被调用,并抛出一个 AbortError。
🌐 The pipeline API provides a promise version, which can also
receive an options argument as the last parameter with a
signal <AbortSignal> property. When the signal is aborted,
destroy will be called on the underlying pipeline, with an
AbortError.
const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');
const zlib = require('node:zlib');
async function run() {
await pipeline(
fs.createReadStream('archive.tar'),
zlib.createGzip(),
fs.createWriteStream('archive.tar.gz')
);
console.log('Pipeline succeeded.');
}
run().catch(console.error); 要使用 AbortSignal,请将其作为选项对象的一部分传入,作为最后一个参数:
🌐 To use an AbortSignal, pass it inside an options object,
as the last argument:
const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');
const zlib = require('node:zlib');
async function run() {
const ac = new AbortController();
const signal = ac.signal;
setTimeout(() => ac.abort(), 1);
await pipeline(
fs.createReadStream('archive.tar'),
zlib.createGzip(),
fs.createWriteStream('archive.tar.gz'),
{ signal },
);
}
run().catch(console.error); // AbortError pipeline API 也支持异步生成器:
🌐 The pipeline API also supports async generators:
const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');
async function run() {
await pipeline(
fs.createReadStream('lowercase.txt'),
async function* (source, { signal }) {
source.setEncoding('utf8'); // Work with strings rather than `Buffer`s.
for await (const chunk of source) {
yield await processChunk(chunk, { signal });
}
},
fs.createWriteStream('uppercase.txt')
);
console.log('Pipeline succeeded.');
}
run().catch(console.error); 记住要处理传递给异步生成器的 signal 参数。尤其是在异步生成器作为管道源(即第一个参数)或管道永远不会完成的情况下。
🌐 Remember to handle the signal argument passed into the async generator.
Especially in the case where the async generator is the source for the
pipeline (i.e. first argument) or the pipeline will never complete.
const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');
async function run() {
await pipeline(
async function* ({ signal }) {
await someLongRunningfn({ signal });
yield 'asd';
},
fs.createWriteStream('uppercase.txt')
);
console.log('Pipeline succeeded.');
}
run().catch(console.error); stream.pipeline() 会对除以下情况外的所有流调用 stream.destroy(err):
- 已经触发
'end'或'close'事件的Readable流。 - 已经触发
'finish'或'close'事件的Writable流。
stream.pipeline() 在 callback 被调用后,会在流上遗留悬挂的事件监听器。在流在失败后被重用的情况下,这可能导致事件监听器泄漏和错误被吞掉。
stream.pipeline() 在发生错误时会关闭所有流。
将 IncomingRequest 与 pipeline 一起使用可能会导致意外行为,因为它会在未发送预期响应的情况下销毁套接字。
请参阅下面的示例:
const fs = require('node:fs');
const http = require('node:http');
const { pipeline } = require('node:stream');
const server = http.createServer((req, res) => {
const fileStream = fs.createReadStream('./fileNotExist.txt');
pipeline(fileStream, res, (err) => {
if (err) {
console.log(err); // No such file
// this message can't be sent once `pipeline` already destroyed the socket
return res.end('error!!!');
}
});
});