stream.pipeline(streams, callback)


模块方法,用于在流和生成器之间进行管道转发错误并正确清理并在管道完成时提供回调。

const { pipeline } = require('stream');
const fs = require('fs');
const zlib = require('zlib');

// 使用管道 API 可以轻松地将一系列流传输到一起,
// 并在管道完全完成时收到通知。

// 有效地 gzip 潜在巨大的 tar 文件的管道:

pipeline(
  fs.createReadStream('archive.tar'),
  zlib.createGzip(),
  fs.createWriteStream('archive.tar.gz'),
  (err) => {
    if (err) {
      console.error('Pipeline failed.', err);
    } else {
      console.log('Pipeline succeeded.');
    }
  }
);

pipeline API 也是可 promise 化的:

const pipeline = util.promisify(stream.pipeline);

async function run() {
  await pipeline(
    fs.createReadStream('archive.tar'),
    zlib.createGzip(),
    fs.createWriteStream('archive.tar.gz')
  );
  console.log('Pipeline succeeded.');
}

run().catch(console.error);

pipeline API 还支持异步生成器:

const pipeline = util.promisify(stream.pipeline);
const fs = require('fs');

async function run() {
  await pipeline(
    fs.createReadStream('lowercase.txt'),
    async function* (source) {
      source.setEncoding('utf8');  // 使用字符串而不是 `Buffer`。
      for await (const chunk of source) {
        yield chunk.toUpperCase();
      }
    },
    fs.createWriteStream('uppercase.txt')
  );
  console.log('Pipeline succeeded.');
}

run().catch(console.error);

stream.pipeline() 将在所有流上调用 stream.destroy(err),除了:

  • 已触发 'end''close'Readable 流。
  • 已触发 'finish''close'Writable 流。

在调用 callback 后,stream.pipeline() 在流上留下悬空事件监听器。 在失败后重用流的情况下,这可能会导致事件监听器泄漏和吞下错误。

A module method to pipe between streams and generators forwarding errors and properly cleaning up and provide a callback when the pipeline is complete.

const { pipeline } = require('stream');
const fs = require('fs');
const zlib = require('zlib');

// Use the pipeline API to easily pipe a series of streams
// together and get notified when the pipeline is fully done.

// A pipeline to gzip a potentially huge tar file efficiently:

pipeline(
  fs.createReadStream('archive.tar'),
  zlib.createGzip(),
  fs.createWriteStream('archive.tar.gz'),
  (err) => {
    if (err) {
      console.error('Pipeline failed.', err);
    } else {
      console.log('Pipeline succeeded.');
    }
  }
);

The pipeline API is promisify-able as well:

const pipeline = util.promisify(stream.pipeline);

async function run() {
  await pipeline(
    fs.createReadStream('archive.tar'),
    zlib.createGzip(),
    fs.createWriteStream('archive.tar.gz')
  );
  console.log('Pipeline succeeded.');
}

run().catch(console.error);

The pipeline API also supports async generators:

const pipeline = util.promisify(stream.pipeline);
const fs = require('fs');

async function run() {
  await pipeline(
    fs.createReadStream('lowercase.txt'),
    async function* (source) {
      source.setEncoding('utf8');  // Work with strings rather than `Buffer`s.
      for await (const chunk of source) {
        yield chunk.toUpperCase();
      }
    },
    fs.createWriteStream('uppercase.txt')
  );
  console.log('Pipeline succeeded.');
}

run().catch(console.error);

stream.pipeline() will call stream.destroy(err) on all streams except:

  • Readable streams which have emitted 'end' or 'close'.
  • Writable streams which have emitted 'finish' or 'close'.

stream.pipeline() leaves dangling event listeners on the streams after the callback has been invoked. In the case of reuse of streams after failure, this can cause event listener leaks and swallowed errors.