Node.js v24.13.0 documentation

Stream

Source code: lib/stream.js

A stream is an abstract interface for working with streaming data in Node.js. The node:stream module provides an API for implementing the stream interface.

There are many stream objects provided by Node.js. For instance, a request to an HTTP server and process.stdout are both stream instances.

Streams can be readable, writable, or both. All streams are instances of EventEmitter.

To access the node:stream module:

const stream = require('node:stream'); 

The node:stream module is useful for creating new types of stream instances. It is usually not necessary to use the node:stream module to consume streams.

Organization of this document

This document contains two primary sections and a third section for notes. The first section explains how to use existing streams within an application. The second section explains how to create new types of streams.

Types of streams

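There are four fundamental stream types within Node.js:

  • Writable: streams to which data can be written (for example, fs.createWriteStream()).
  • Readable: streams from which data can be read (for example, fs.createReadStream()).
  • Duplex: streams that are both Readable and Writable (for example, net.Socket).
  • Transform: Duplex streams that can modify or transform the data as it is written and read (for example, zlib.createDeflate()).

Additionally, this module includes the utility functions stream.duplexPair(), stream.pipeline(), stream.finished(), stream.Readable.from(), and stream.addAbortSignal().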

Streams Promises API

The stream/promises API provides an alternative set of asynchronous utility functions for streams that return Promise objects rather than using callbacks. The API is accessible via require('node:stream/promises') or require('node:stream').promises.

stream.pipeline(streams[, options])

stream.pipeline(source[, ...transforms], destination[, options])

const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');
const zlib = require('node:zlib');

async function run() {
  await pipeline(
    fs.createReadStream('archive.tar'),
    zlib.createGzip(),
    fs.createWriteStream('archive.tar.gz'),
  );
  console.log('Pipeline succeeded.');
}

run().catch(console.error);

// The same example as an ES module:
import { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';
import { createGzip } from 'node:zlib';

await pipeline(
  createReadStream('archive.tar'),
  createGzip(),
  createWriteStream('archive.tar.gz'),
);
console.log('Pipeline succeeded.');

To use an AbortSignal, pass it inside an options object as the last argument. When the signal is aborted, destroy will be called on the underlying pipeline with an AbortError.

const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');
const zlib = require('node:zlib');

async function run() {
  const ac = new AbortController();
  const signal = ac.signal;

  setImmediate(() => ac.abort());
  await pipeline(
    fs.createReadStream('archive.tar'),
    zlib.createGzip(),
    fs.createWriteStream('archive.tar.gz'),
    { signal },
  );
}

run().catch(console.error); // AbortError

// The same example as an ES module:
import { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';
import { createGzip } from 'node:zlib';

const ac = new AbortController();
const { signal } = ac;
setImmediate(() => ac.abort());
try {
  await pipeline(
    createReadStream('archive.tar'),
    createGzip(),
    createWriteStream('archive.tar.gz'),
    { signal },
  );
} catch (err) {
  console.error(err); // AbortError
}

The pipeline API also supports async generators:

const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');

async function run() {
  await pipeline(
    fs.createReadStream('lowercase.txt'),
    async function* (source, { signal }) {
      source.setEncoding('utf8');  // Work with strings rather than `Buffer`s.
      for await (const chunk of source) {
        yield await processChunk(chunk, { signal });
      }
    },
    fs.createWriteStream('uppercase.txt'),
  );
  console.log('Pipeline succeeded.');
}

run().catch(console.error);

// The same example as an ES module:
import { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';

await pipeline(
  createReadStream('lowercase.txt'),
  async function* (source, { signal }) {
    source.setEncoding('utf8');  // Work with strings rather than `Buffer`s.
    for await (const chunk of source) {
      yield await processChunk(chunk, { signal });
    }
  },
  createWriteStream('uppercase.txt'),
);
console.log('Pipeline succeeded.');

Remember to handle the signal argument passed into the async generator, especially when the async generator is the source for the pipeline (i.e. the first argument). Otherwise the pipeline will never complete.

const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');

async function run() {
  await pipeline(
    async function* ({ signal }) {
      await someLongRunningfn({ signal });
      yield 'asd';
    },
    fs.createWriteStream('uppercase.txt'),
  );
  console.log('Pipeline succeeded.');
}

run().catch(console.error);

// The same example as an ES module:
import { pipeline } from 'node:stream/promises';
import fs from 'node:fs';
await pipeline(
  async function* ({ signal }) {
    await someLongRunningfn({ signal });
    yield 'asd';
  },
  fs.createWriteStream('uppercase.txt'),
);
console.log('Pipeline succeeded.');

The pipeline API also provides a callback version.
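A minimal sketch of the callback form, assuming an in-memory source and destination in place of real files. The names `received` and `sink` are hypothetical and exist only for this illustration.

```javascript
const { pipeline, Readable, Writable } = require('node:stream');

const received = [];

// An in-memory destination that records every chunk it is given.
const sink = new Writable({
  objectMode: true,
  write(chunk, encoding, callback) {
    received.push(chunk);
    callback();
  },
});

// The last argument is a callback that is called once the pipeline has
// completed; `err` is set if any stream in the pipeline failed.
pipeline(Readable.from(['a', 'b', 'c']), sink, (err) => {
  if (err) {
    console.error('Pipeline failed.', err);
  } else {
    console.log('Pipeline succeeded.', received);
  }
});
```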

stream.finished(stream[, options])

const { finished } = require('node:stream/promises');
const fs = require('node:fs');

const rs = fs.createReadStream('archive.tar');

async function run() {
  await finished(rs);
  console.log('Stream is done reading.');
}

run().catch(console.error);
rs.resume(); // Drain the stream.

// The same example as an ES module:
import { finished } from 'node:stream/promises';
import { createReadStream } from 'node:fs';

const rs = createReadStream('archive.tar');

async function run() {
  await finished(rs);
  console.log('Stream is done reading.');
}

run().catch(console.error);
rs.resume(); // Drain the stream.

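The finished API also provides a callback version.

stream.finished() leaves dangling event listeners (in particular 'error', 'end', 'finish' and 'close') after the returned promise is resolved or rejected. This is deliberate: it prevents unexpected 'error' events (caused by incorrect stream implementations) from crashing the process. If this behavior is not wanted, set options.cleanup to true: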

await finished(rs, { cleanup: true }); 

Object mode

All streams created by Node.js APIs operate exclusively on strings, <Buffer>, <TypedArray> and <DataView> objects:

  • Strings and Buffers are the most common types used with streams.
  • TypedArray and DataView let you handle binary data with types such as Int32Array or Uint8Array. When you write a TypedArray or DataView to a stream, Node.js processes the raw bytes.

It is possible, however, for stream implementations to work with other types of JavaScript values (with the exception of null, which serves a special purpose within streams). Such streams are considered to operate in "object mode".

Stream instances are switched into object mode using the objectMode option when the stream is created. Attempting to switch an existing stream into object mode is not safe.
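The following sketch shows one way to create an object-mode stream, assuming a Readable with no underlying source whose data is pushed by hand. The variable names are illustrative.

```javascript
const { Readable } = require('node:stream');

// Object mode is selected once, at construction time.
const objects = new Readable({
  objectMode: true,
  read() {}, // No underlying source; data is pushed manually below.
});

objects.push({ id: 1 });
objects.push({ id: 2 });
objects.push(null); // null signals end-of-stream, so it can never be a chunk.

// In object mode each read() call returns a single pushed value.
const first = objects.read();
const second = objects.read();
console.log(first, second, objects.readableObjectMode);
```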

Buffering

Both Writable and Readable streams will store data in an internal buffer.

The amount of data potentially buffered depends on the highWaterMark option passed into the stream's constructor. For normal streams, the highWaterMark option specifies a total number of bytes. For streams operating in object mode, the highWaterMark specifies a total number of objects. For streams operating on (but not decoding) strings, the highWaterMark specifies a total number of UTF-16 code units.

Data is buffered in Readable streams when the implementation calls stream.push(chunk). If the consumer of the stream does not call stream.read(), the data will sit in the internal queue until it is consumed.

Once the total size of the internal read buffer reaches the threshold specified by highWaterMark, the stream will temporarily stop reading data from the underlying resource until the data currently buffered can be consumed (that is, the stream will stop calling the internal readable._read() method that is used to fill the read buffer).

Data is buffered in Writable streams when the writable.write(chunk) method is called repeatedly. While the total size of the internal write buffer is below the threshold set by highWaterMark, calls to writable.write() will return true. Once the size of the internal buffer reaches or exceeds the highWaterMark, false will be returned.
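To make the threshold behavior concrete, here is a small sketch assuming a deliberately slow Writable with a 4-byte highWaterMark. The stream and variable names are hypothetical.

```javascript
const { Writable } = require('node:stream');

const slow = new Writable({
  highWaterMark: 4,
  write(chunk, encoding, callback) {
    // Simulate a slow destination: complete each write on a later tick,
    // so the written bytes stay counted in the internal buffer for now.
    setImmediate(callback);
  },
});

const firstResult = slow.write('ab');  // 2 bytes pending, below the threshold
const secondResult = slow.write('cd'); // 4 bytes pending, threshold reached

console.log(firstResult, secondResult, slow.writableLength);
```

The second call still accepts the data; the false return value is only a signal that the caller should wait for 'drain' before writing more.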

A key goal of the stream API, particularly the stream.pipe() method, is to limit the buffering of data to acceptable levels such that sources and destinations of differing speeds will not overwhelm the available memory.

The highWaterMark option is a threshold, not a limit: it dictates the amount of data that a stream buffers before it stops asking for more data. It does not enforce a strict memory limitation in general. Specific stream implementations may choose to enforce stricter limits but doing so is optional.

Because Duplex and Transform streams are both Readable and Writable, each maintains two separate internal buffers used for reading and writing, allowing each side to operate independently of the other while maintaining an appropriate and efficient flow of data. For example, net.Socket instances are Duplex streams whose Readable side allows consumption of data received from the socket and whose Writable side allows writing data to the socket. Because data may be written to the socket at a faster or slower rate than data is received, each side should operate (and buffer) independently of the other.
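The independence of the two buffers can be observed with a sketch like the one below, which assumes a bare Duplex whose sides are given different thresholds through the readableHighWaterMark and writableHighWaterMark constructor options.

```javascript
const { Duplex } = require('node:stream');

const duplex = new Duplex({
  readableHighWaterMark: 8,
  writableHighWaterMark: 32,
  read() {}, // Data for the readable side is pushed manually below.
  write(chunk, encoding, callback) {
    setImmediate(callback); // Keep written data pending for a moment.
  },
});

duplex.push('abc');    // Buffers 3 bytes on the readable side.
duplex.write('hello'); // Buffers 5 bytes on the writable side.

console.log(duplex.readableLength, duplex.writableLength);
```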

The mechanics of the internal buffering are an internal implementation detail and may be changed at any time. However, for certain advanced implementations, the internal buffers can be retrieved using writable.writableBuffer or readable.readableBuffer. Use of these undocumented properties is discouraged.

API for stream consumers

Almost all Node.js applications, no matter how simple, use streams in some manner. The following is an example of using streams in a Node.js application that implements an HTTP server:

const http = require('node:http');

const server = http.createServer((req, res) => {
  // `req` is an http.IncomingMessage, which is a readable stream.
  // `res` is an http.ServerResponse, which is a writable stream.

  let body = '';
  // Get the data as utf8 strings.
  // If an encoding is not set, Buffer objects will be received.
  req.setEncoding('utf8');

  // Readable streams emit 'data' events once a listener is added.
  req.on('data', (chunk) => {
    body += chunk;
  });

  // The 'end' event indicates that the entire body has been received.
  req.on('end', () => {
    try {
      const data = JSON.parse(body);
      // Write back something interesting to the user:
      res.write(typeof data);
      res.end();
    } catch (er) {
      // uh oh! bad json!
      res.statusCode = 400;
      return res.end(`error: ${er.message}`);
    }
  });
});

server.listen(1337);

// $ curl localhost:1337 -d "{}"
// object
// $ curl localhost:1337 -d "\"foo\""
// string
// $ curl localhost:1337 -d "not json"
// error: Unexpected token 'o', "not json" is not valid JSON 

Writable streams (such as res in the example) expose methods such as write() and end() that are used to write data onto the stream.

Readable streams use the EventEmitter API to notify application code when data is available to be read off the stream. That available data can be read from the stream in multiple ways.

Both Writable and Readable streams use the EventEmitter API in various ways to communicate the current state of the stream.

Duplex and Transform streams are both Writable and Readable.

Applications that are either writing data to or consuming data from a stream are not required to implement the stream interfaces directly and will generally have no reason to call require('node:stream').

Developers wishing to implement new types of streams should refer to the section API for stream implementers.

Writable streams

Writable streams are an abstraction for a destination to which data is written.

Examples of Writable streams include:

  • HTTP requests, on the client
  • HTTP responses, on the server
  • fs write streams
  • zlib streams
  • crypto streams
  • TCP sockets
  • child process stdin
  • process.stdout, process.stderr

Some of these examples are actually Duplex streams that implement the Writable interface.

All Writable streams implement the interface defined by the stream.Writable class.

While specific instances of Writable streams may differ in various ways, all Writable streams follow the same fundamental usage pattern as illustrated in the example below:

const myStream = getWritableStreamSomehow();
myStream.write('some data');
myStream.write('some more data');
myStream.end('done writing data'); 

Class: stream.Writable

Event: 'close'

The 'close' event is emitted when the stream and any of its underlying resources (a file descriptor, for example) have been closed. The event indicates that no more events will be emitted, and no further computation will occur.

A Writable stream will always emit the 'close' event if it is created with the emitClose option.

Event: 'drain'

If a call to stream.write(chunk) returns false, the 'drain' event will be emitted when it is appropriate to resume writing data to the stream.

// Write the data to the supplied writable stream one million times.
// Be attentive to back-pressure.
function writeOneMillionTimes(writer, data, encoding, callback) {
  let i = 1000000;
  write();
  function write() {
    let ok = true;
    do {
      i--;
      if (i === 0) {
        // Last time!
        writer.write(data, encoding, callback);
      } else {
        // See if we should continue, or wait.
        // Don't pass the callback, because we're not done yet.
        ok = writer.write(data, encoding);
      }
    } while (i > 0 && ok);
    if (i > 0) {
      // Had to stop early!
      // Write some more once it drains.
      writer.once('drain', write);
    }
  }
} 

Event: 'error'

The 'error' event is emitted if an error occurred while writing or piping data. The listener callback is passed a single Error argument when called.

The stream is closed when the 'error' event is emitted unless the autoDestroy option was set to false when creating the stream.

After 'error', no further events other than 'close' should be emitted (including 'error' events).
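The event ordering described above can be sketched as follows, assuming a Writable whose write implementation always fails. The 'disk full' error and the variable names are made up for the illustration.

```javascript
const { Writable } = require('node:stream');

const failing = new Writable({
  write(chunk, encoding, callback) {
    // Hypothetical failure reported by the underlying destination.
    callback(new Error('disk full'));
  },
});

const events = [];
failing.on('error', (err) => events.push(`error: ${err.message}`));
failing.on('close', () => events.push('close'));

failing.write('data');

// With the default autoDestroy setting, 'error' is followed by 'close'.
setImmediate(() => console.log(events));
```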

Event: 'finish'

The 'finish' event is emitted after the stream.end() method has been called, and all data has been flushed to the underlying system.

const writer = getWritableStreamSomehow();
for (let i = 0; i < 100; i++) {
  writer.write(`hello, #${i}!\n`);
}
writer.on('finish', () => {
  console.log('All writes are now complete.');
});
writer.end('This is the end\n'); 

Event: 'pipe'

The 'pipe' event is emitted when the stream.pipe() method is called on a readable stream, adding this writable to its set of destinations.

const assert = require('node:assert');

const writer = getWritableStreamSomehow();
const reader = getReadableStreamSomehow();
writer.on('pipe', (src) => {
  console.log('Something is piping into the writer.');
  assert.equal(src, reader);
});
reader.pipe(writer); 

Event: 'unpipe'

The 'unpipe' event is emitted when the stream.unpipe() method is called on a Readable stream, removing this Writable from its set of destinations.

This is also emitted in case this Writable stream emits an error when a Readable stream pipes into it.

const assert = require('node:assert');

const writer = getWritableStreamSomehow();
const reader = getReadableStreamSomehow();
writer.on('unpipe', (src) => {
  console.log('Something has stopped piping into the writer.');
  assert.equal(src, reader);
});
reader.pipe(writer);
reader.unpipe(writer); 

writable.cork()

The writable.cork() method forces all written data to be buffered in memory. The buffered data will be flushed when either the stream.uncork() or stream.end() methods are called.

The primary intent of writable.cork() is to accommodate a situation in which several small chunks are written to the stream in rapid succession. Instead of immediately forwarding them to the underlying destination, writable.cork() buffers all the chunks until writable.uncork() is called, which will pass them all to writable._writev(), if present. This prevents a head-of-line blocking situation where data is being buffered while waiting for the first small chunk to be processed. However, use of writable.cork() without implementing writable._writev() may have an adverse effect on throughput.

See also: writable.uncork(), writable._writev().
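A minimal sketch of the batching effect, assuming a Writable that implements only writev and records each batch it receives. The names `batches` and `batching` are hypothetical.

```javascript
const { Writable } = require('node:stream');

const batches = [];

const batching = new Writable({
  // writev receives every buffered chunk at once as { chunk, encoding } pairs.
  writev(chunks, callback) {
    batches.push(chunks.map(({ chunk }) => chunk.toString()));
    callback();
  },
});

batching.cork();
batching.write('a');
batching.write('b');
batching.write('c');
batching.uncork(); // Flushes the three buffered chunks in a single writev call.

console.log(batches);
```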

writable.destroy([error])
  • error <Error> Optional, an error to emit with the 'error' event.
  • Returns: <this>

Destroy the stream. Optionally emit an 'error' event, and emit a 'close' event (unless emitClose is set to false). After this call, the writable stream has ended and subsequent calls to write() or end() will result in an ERR_STREAM_DESTROYED error. This is a destructive and immediate way to destroy a stream. Previous calls to write() may not have drained, and may trigger an ERR_STREAM_DESTROYED error. Use end() instead of destroy if data should flush before close, or wait for the 'drain' event before destroying the stream.

const { Writable } = require('node:stream');

const myStream = new Writable();

const fooErr = new Error('foo error');
myStream.destroy(fooErr);
myStream.on('error', (fooErr) => console.error(fooErr.message)); // foo error

const { Writable } = require('node:stream');

const myStream = new Writable();

myStream.destroy();
myStream.on('error', function wontHappen() {});

const { Writable } = require('node:stream');

const myStream = new Writable();
myStream.destroy();

myStream.write('foo', (error) => console.error(error.code));
// ERR_STREAM_DESTROYED

Once destroy() has been called any further calls will be a no-op and no further errors except from _destroy() may be emitted as 'error'.

Implementors should not override this method, but instead implement writable._destroy().

writable.closed

Is true after 'close' has been emitted.

writable.destroyed

Is true after writable.destroy() has been called.

const { Writable } = require('node:stream');

const myStream = new Writable();

console.log(myStream.destroyed); // false
myStream.destroy();
console.log(myStream.destroyed); // true 

writable.end([chunk[, encoding]][, callback])

Calling the writable.end() method signals that no more data will be written to the Writable. The optional chunk and encoding arguments allow one final additional chunk of data to be written immediately before closing the stream.

Calling the stream.write() method after calling stream.end() will raise an error.

// Write 'hello, ' and then end with 'world!'.
const fs = require('node:fs');
const file = fs.createWriteStream('example.txt');
file.write('hello, ');
file.end('world!');
// Writing more now is not allowed! 
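
To illustrate the write-after-end rule, the sketch below assumes a trivial in-memory Writable and records the error code delivered to the write callback. The names `sink` and `writeErrorCode` are hypothetical.

```javascript
const { Writable } = require('node:stream');

const sink = new Writable({
  write(chunk, encoding, callback) {
    callback();
  },
});

let writeErrorCode;

// The same error is also emitted as 'error'; listen so it is not unhandled.
sink.on('error', () => {});

sink.end('last chunk');
sink.write('too late', (err) => {
  writeErrorCode = err.code;
  console.log(writeErrorCode);
});
```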

writable.setDefaultEncoding(encoding)

The writable.setDefaultEncoding() method sets the default encoding for a Writable stream.
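As an illustration, the sketch below assumes an in-memory Writable and sets 'hex' as the default, so that strings written without an explicit encoding are decoded as hexadecimal. The names are illustrative.

```javascript
const { Writable } = require('node:stream');

const chunks = [];

const collector = new Writable({
  write(chunk, encoding, callback) {
    chunks.push(chunk); // Strings arrive here already decoded into Buffers.
    callback();
  },
});

collector.setDefaultEncoding('hex');
collector.write('68656c6c6f'); // Interpreted as hex because no encoding is given.

const decoded = chunks[0].toString('utf8');
console.log(decoded);
```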

writable.uncork()

The writable.uncork() method flushes all data buffered since stream.cork() was called.

When using writable.cork() and writable.uncork() to manage the buffering of writes to a stream, defer calls to writable.uncork() using process.nextTick(). Doing so allows batching of all writable.write() calls that occur within a given Node.js event loop phase.

stream.cork();
stream.write('some ');
stream.write('data ');
process.nextTick(() => stream.uncork()); 

If the writable.cork() method is called multiple times on a stream, writable.uncork() must be called the same number of times to flush the buffered data.

stream.cork();
stream.write('some ');
stream.cork();
stream.write('data ');
process.nextTick(() => {
  stream.uncork();
  // The data will not be flushed until uncork() is called a second time.
  stream.uncork();
}); 

See also: writable.cork().

writable.writable

Is true if it is safe to call writable.write(), which means the stream has not been destroyed, errored, or ended.

writable.writableAborted

Returns whether the stream was destroyed or errored before emitting 'finish'.

writable.writableEnded

Is true after writable.end() has been called. This property does not indicate whether the data has been flushed; use writable.writableFinished for that.

writable.writableCorked

Number of times writable.uncork() needs to be called in order to fully uncork the stream.
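A short sketch of how the counter moves, assuming a trivial Writable. The variable names are illustrative.

```javascript
const { Writable } = require('node:stream');

const corked = new Writable({
  write(chunk, encoding, callback) {
    callback();
  },
});

corked.cork();
corked.cork();
const afterTwoCorks = corked.writableCorked;   // Two uncork() calls needed.

corked.uncork();
const afterOneUncork = corked.writableCorked;  // One uncork() call still needed.

corked.uncork();
const afterTwoUncorks = corked.writableCorked; // Fully uncorked.

console.log(afterTwoCorks, afterOneUncork, afterTwoUncorks);
```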

writable.errored

Returns the error if the stream has been destroyed with an error.

writable.writableFinished

Is set to true immediately before the 'finish' event is emitted.
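The difference between writableEnded and writableFinished can be seen in a sketch like this one, which assumes a Writable that completes its writes on a later tick so that data is still pending right after end() is called.

```javascript
const { Writable } = require('node:stream');

const pending = new Writable({
  write(chunk, encoding, callback) {
    setImmediate(callback); // The write completes on a later tick.
  },
});

pending.write('some data');
pending.end();

// end() has been called, but the data has not been flushed yet.
const endedNow = pending.writableEnded;
const finishedNow = pending.writableFinished;

let finishedInHandler;
pending.on('finish', () => {
  finishedInHandler = pending.writableFinished;
  console.log(endedNow, finishedNow, finishedInHandler);
});
```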

writable.writableHighWaterMark

Returns the value of highWaterMark passed when creating this Writable.

writable.writableLength

This property contains the number of bytes (or objects) in the queue ready to be written. The value provides introspection data regarding the status of the highWaterMark.

writable.writableNeedDrain

Is true if the stream's buffer has been full and the stream will emit 'drain'.

writable.writableObjectMode

Getter for the property objectMode of a given Writable stream.

writable[Symbol.asyncDispose]()

Calls writable.destroy() with an AbortError and returns a promise that fulfills when the stream is finished.

writable.write(chunk[, encoding][, callback])
  • chunk <string> | <Buffer> | <TypedArray> | <DataView> | <any> Optional data to write. For streams not operating in object mode, chunk must be a <string>, <Buffer>, <TypedArray> or <DataView>. For object mode streams, chunk may be any JavaScript value other than null.
  • encoding <string> | <null> The encoding, if chunk is a string. Default: 'utf8'
  • callback <Function> Callback for when this chunk of data is flushed.
  • Returns: <boolean> false if the stream wishes for the calling code to wait for the 'drain' event to be emitted before continuing to write additional data; otherwise true.

The writable.write() method writes some data to the stream, and calls the supplied callback once the data has been fully handled. If an error occurs, the callback will be called with the error as its first argument. The callback is called asynchronously and before 'error' is emitted.

The return value is true if the internal buffer is less than the highWaterMark configured when the stream was created after admitting chunk. If false is returned, further attempts to write data to the stream should stop until the 'drain' event is emitted.

While a stream is not draining, calls to write() will buffer chunk, and return false. Once all currently buffered chunks are drained (accepted for delivery by the operating system), the 'drain' event will be emitted. Once write() returns false, do not write more chunks until the 'drain' event is emitted. While calling write() on a stream that is not draining is allowed, Node.js will buffer all written chunks until maximum memory usage occurs, at which point it will abort unconditionally. Even before it aborts, high memory usage will cause poor garbage collector performance and high RSS (which is not typically released back to the system, even after the memory is no longer required). Since TCP sockets may never drain if the remote peer does not read the data, writing to a socket that is not draining may lead to a remotely exploitable vulnerability.

Writing data while the stream is not draining is particularly problematic for a Transform, because Transform streams are paused by default until they are piped or a 'data' or 'readable' event handler is added.

If the data to be written can be generated or fetched on demand, it is recommended to encapsulate the logic into a Readable and use stream.pipe(). However, if calling write() is preferred, it is possible to respect backpressure and avoid memory issues using the 'drain' event:

function write(data, cb) {
  if (!stream.write(data)) {
    stream.once('drain', cb);
  } else {
    process.nextTick(cb);
  }
}

// Wait for cb to be called before doing any other write.
write('hello', () => {
  console.log('Write completed, do more writes now.');
}); 
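
The same backpressure pattern can also be written with promises. The sketch below is one possible approach, assuming events.once() to wait for 'drain' and finished() from node:stream/promises to wait for completion; `writeAll`, `slowSink`, and `written` are hypothetical names.

```javascript
const { once } = require('node:events');
const { Writable } = require('node:stream');
const { finished } = require('node:stream/promises');

// Write every chunk in order, pausing whenever write() returns false.
async function writeAll(writable, chunks) {
  for (const chunk of chunks) {
    if (!writable.write(chunk)) {
      await once(writable, 'drain');
    }
  }
  writable.end();
  await finished(writable);
}

const written = [];
const slowSink = new Writable({
  highWaterMark: 2, // Small threshold so that every 2-byte write reports "full".
  write(chunk, encoding, callback) {
    written.push(chunk.toString());
    setImmediate(callback);
  },
});

const done = writeAll(slowSink, ['aa', 'bb', 'cc']);
done.then(() => console.log(written.join('')));
```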

A Writable stream in object mode will always ignore the encoding argument.

Readable streams

Readable streams are an abstraction for a source from which data is consumed.

Examples of Readable streams include:

  • HTTP responses, on the client
  • HTTP requests, on the server
  • fs read streams
  • zlib streams
  • crypto streams
  • TCP sockets
  • child process stdout and stderr
  • process.stdin

All Readable streams implement the interface defined by the stream.Readable class.

Two reading modes

Readable streams effectively operate in one of two modes: flowing and paused. These modes are separate from object mode. A Readable stream can be in object mode or not, regardless of whether it is in flowing mode or paused mode.

  • In flowing mode, data is read from the underlying system automatically and provided to an application as quickly as possible using events via the EventEmitter interface.
  • In paused mode, the stream.read() method must be called explicitly to read chunks of data from the stream.

All Readable streams begin in paused mode but can be switched to flowing mode in one of the following ways:

  • Adding a 'data' event handler.
  • Calling the stream.resume() method.
  • Calling the stream.pipe() method to send the data to a Writable.

The Readable can switch back to paused mode using one of the following:

  • If there are no pipe destinations, by calling the stream.pause() method.
  • If there are pipe destinations, by removing all pipe destinations. Multiple pipe destinations may be removed by calling the stream.unpipe() method.

The important concept to remember is that a Readable will not generate data until a mechanism for either consuming or ignoring that data is provided. If the consuming mechanism is disabled or taken away, the Readable will attempt to stop generating the data.

For backward compatibility reasons, removing 'data' event handlers will not automatically pause the stream. Also, if there are piped destinations, then calling stream.pause() will not guarantee that the stream will remain paused once those destinations drain and ask for more data.

If a Readable is switched into flowing mode and there are no consumers available to handle the data, that data will be lost. This can occur, for instance, when the readable.resume() method is called without a listener attached to the 'data' event, or when a 'data' event handler is removed from the stream.

Adding a 'readable' event handler automatically makes the stream stop flowing, and the data has to be consumed via readable.read(). If the 'readable' event handler is removed, then the stream will start flowing again if there is a 'data' event handler.
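A sketch of paused-mode consumption with a 'readable' handler, assuming a Readable whose data is pushed by hand. The names `source` and `parts` are illustrative.

```javascript
const { Readable } = require('node:stream');

const source = new Readable({
  encoding: 'utf8', // Deliver strings rather than Buffers.
  read() {},        // Data is pushed manually below.
});

source.push('hello ');
source.push('world');
source.push(null);

const parts = [];

// Attaching a 'readable' handler keeps the stream out of flowing mode,
// so every chunk has to be pulled explicitly with read().
source.on('readable', () => {
  let chunk;
  while ((chunk = source.read()) !== null) {
    parts.push(chunk);
  }
});
const flowingAfterListener = source.readableFlowing;

source.on('end', () => console.log(parts.join('')));
```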

三种状态#>

【Three states】

Readable 流的“两种模式”操作是对 Readable 流实现中更复杂的内部状态管理的一种简化抽象。

【The "two modes" of operation for a Readable stream are a simplified abstraction for the more complicated internal state management that is happening within the Readable stream implementation.】

具体来说,在任意时刻,每个 Readable 都处于三种可能状态之一:

【Specifically, at any given point in time, every Readable is in one of three possible states:】

  • readable.readableFlowing === null
  • readable.readableFlowing === false
  • readable.readableFlowing === true

当 readable.readableFlowing 为 null 时,不会提供用于消费流数据的机制。因此,流将不会生成数据。在这种状态下,附加 'data' 事件的监听器、调用 readable.pipe() 方法或调用 readable.resume() 方法将会把 readable.readableFlowing 改为 true,使 Readable 开始在生成数据时主动触发事件。

【When readable.readableFlowing is null, no mechanism for consuming the stream's data is provided. Therefore, the stream will not generate data. While in this state, attaching a listener for the 'data' event, calling the readable.pipe() method, or calling the readable.resume() method will switch readable.readableFlowing to true, causing the Readable to begin actively emitting events as data is generated.】
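
The state transitions described above can be observed directly. The following is a minimal sketch, assuming a bare Readable whose read() does nothing (a placeholder source chosen only for illustration); it records readableFlowing after each call.

```javascript
const { Readable } = require('node:stream');

// Placeholder source: read() is a no-op, so no data is ever produced.
const readable = new Readable({ read() {} });

const states = [readable.readableFlowing]; // No consumer yet: null.

readable.on('data', () => {});             // Attaching 'data' starts the flow.
states.push(readable.readableFlowing);

readable.pause();                          // Explicit pause.
states.push(readable.readableFlowing);

readable.resume();                         // Explicit resume.
states.push(readable.readableFlowing);

console.log(states); // [ null, true, false, true ]
```

The property changes synchronously with each call, which is why it can be sampled immediately.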

调用 readable.pause()、readable.unpipe() 或遇到背压(backpressure)会导致 readable.readableFlowing 被设置为 false,暂时停止事件的流动,但不会停止数据的生成。在这种状态下,附加 'data' 事件的监听器不会将 readable.readableFlowing 切换为 true。

【Calling readable.pause(), readable.unpipe(), or receiving backpressure will cause the readable.readableFlowing to be set as false, temporarily halting the flowing of events but not halting the generation of data. While in this state, attaching a listener for the 'data' event will not switch readable.readableFlowing to true.】

const { PassThrough, Writable } = require('node:stream');
const pass = new PassThrough();
const writable = new Writable();

pass.pipe(writable);
pass.unpipe(writable);
// readableFlowing is now false.

pass.on('data', (chunk) => { console.log(chunk.toString()); });
// readableFlowing is still false.
pass.write('ok');  // Will not emit 'data'.
pass.resume();     // Must be called to make stream emit 'data'.
// readableFlowing is now true. 

当 readable.readableFlowing 为 false 时,数据可能会在流的内部缓冲区中积累。

【While readable.readableFlowing is false, data may be accumulating within the stream's internal buffer.】

选择一种 API 风格#>

【Choose one API style】

Readable 流 API 在多个 Node.js 版本中不断发展,提供了多种消费流数据的方法。通常,开发者应选择一种数据消费方法,并且绝不应同时使用多种方法从单个流中读取数据。具体来说,结合使用 on('data')、on('readable')、pipe() 或异步迭代器可能会导致不可预期的行为。

【The Readable stream API evolved across multiple Node.js versions and provides multiple methods of consuming stream data. In general, developers should choose one of the methods of consuming data and should never use multiple methods to consume data from a single stream. Specifically, using a combination of on('data'), on('readable'), pipe(), or async iterators could lead to unintuitive behavior.】

类:stream.Readable#>

【Class: stream.Readable】

事件:'close'#>

【Event: 'close'】

'close' 事件在流及其底层资源(例如文件描述符)被关闭时触发。该事件表示不会再发出更多事件,也不会进行进一步的计算。

【The 'close' event is emitted when the stream and any of its underlying resources (a file descriptor, for example) have been closed. The event indicates that no more events will be emitted, and no further computation will occur.】

如果使用 emitClose 选项创建 Readable 流,它总是会发出 'close' 事件。

【A Readable stream will always emit the 'close' event if it is created with the emitClose option.】

事件:'data'#>

【Event: 'data'】

  • chunk <Buffer> | <string> | <any> 数据块。对于不以对象模式运行的流,数据块将是字符串或 Buffer。对于以对象模式运行的流,数据块可以是任意 JavaScript 值,但不能是 null

'data' 事件会在流将数据块的所有权交给消费者时触发。每当通过调用 readable.pipe()、readable.resume() 或为 'data' 事件附加监听回调将流切换为流动模式时,都可能发生这种情况。每当调用 readable.read() 方法且有可返回的数据块时,'data' 事件也会被触发。

【The 'data' event is emitted whenever the stream is relinquishing ownership of a chunk of data to a consumer. This may occur whenever the stream is switched in flowing mode by calling readable.pipe(), readable.resume(), or by attaching a listener callback to the 'data' event. The 'data' event will also be emitted whenever the readable.read() method is called and a chunk of data is available to be returned.】

将 'data' 事件监听器附加到未被显式暂停的流上,会使流切换到流动模式。此后,数据一旦可用就会立即传递。

【Attaching a 'data' event listener to a stream that has not been explicitly paused will switch the stream into flowing mode. Data will then be passed as soon as it is available.】

如果流使用 readable.setEncoding() 方法指定了默认编码,监听器回调将以字符串形式接收数据块;否则,数据将以 Buffer 形式传递。

【The listener callback will be passed the chunk of data as a string if a default encoding has been specified for the stream using the readable.setEncoding() method; otherwise the data will be passed as a Buffer.】

const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes of data.`);
}); 

事件:'end'#>

【Event: 'end'】

当流中没有更多数据可供消费时,会触发 'end' 事件。

【The 'end' event is emitted when there is no more data to be consumed from the stream.】

除非数据被完全消耗,否则不会触发 'end' 事件。这可以通过将流切换到流动模式,或重复调用 stream.read(),直到所有数据被消耗完来实现。

【The 'end' event will not be emitted unless the data is completely consumed. This can be accomplished by switching the stream into flowing mode, or by calling stream.read() repeatedly until all data has been consumed.】

const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes of data.`);
});
readable.on('end', () => {
  console.log('There will be no more data.');
}); 

事件:'error'#>

【Event: 'error'】

Readable 实现的 'error' 事件可能在任何时候被触发。通常,如果底层流由于内部故障无法生成数据,或者当流实现尝试推送无效的数据块时,就可能发生这种情况。

【The 'error' event may be emitted by a Readable implementation at any time. Typically, this may occur if the underlying stream is unable to generate data due to an underlying internal failure, or when a stream implementation attempts to push an invalid chunk of data.】

监听器回调将会接收到一个单独的 Error 对象。

【The listener callback will be passed a single Error object.】

事件:'pause'#>

【Event: 'pause'】

当调用 stream.pause() 且 readableFlowing 不为 false 时,会触发 'pause' 事件。

【The 'pause' event is emitted when stream.pause() is called and readableFlowing is not false.】

事件:'readable'#>

【Event: 'readable'】

'readable' 事件在流中有可读取的数据时触发,直到达到配置的高水位标记(state.highWaterMark)。实际上,它表示流的缓冲区中有新的信息。如果缓冲区中有数据,可以调用 stream.read() 来获取这些数据。此外,当流的末端已经到达时,也可能会触发 'readable' 事件。

【The 'readable' event is emitted when there is data available to be read from the stream, up to the configured high water mark (state.highWaterMark). Effectively, it indicates that the stream has new information within the buffer. If data is available within this buffer, stream.read() can be called to retrieve that data. Additionally, the 'readable' event may also be emitted when the end of the stream has been reached.】

const readable = getReadableStreamSomehow();
readable.on('readable', function() {
  // There is some data to read now.
  let data;

  while ((data = this.read()) !== null) {
    console.log(data);
  }
}); 

如果已经到达流的末端,调用 stream.read() 将返回 null 并触发 'end' 事件。如果根本没有任何数据可供读取,这同样适用。例如,在下面的例子中,foo.txt 是一个空文件:

【If the end of the stream has been reached, calling stream.read() will return null and trigger the 'end' event. This is also true if there never was any data to be read. For instance, in the following example, foo.txt is an empty file:】

const fs = require('node:fs');
const rr = fs.createReadStream('foo.txt');
rr.on('readable', () => {
  console.log(`readable: ${rr.read()}`);
});
rr.on('end', () => {
  console.log('end');
}); 

运行此脚本的输出是:

【The output of running this script is:】

$ node test.js
readable: null
end 

在某些情况下,为 'readable' 事件附加监听器会导致一定量的数据被读取到内部缓冲区中。

【In some cases, attaching a listener for the 'readable' event will cause some amount of data to be read into an internal buffer.】

总体而言,readable.pipe() 和 'data' 事件机制比 'readable' 事件更容易理解。然而,处理 'readable' 可能会提高吞吐量。

【In general, the readable.pipe() and 'data' event mechanisms are easier to understand than the 'readable' event. However, handling 'readable' might result in increased throughput.】

如果同时使用 'readable' 和 'data','readable' 在控制流程时具有优先权,即只有在调用 stream.read() 时才会触发 'data'。readableFlowing 属性将变为 false。如果在移除 'readable' 时存在 'data' 监听器,流将开始流动,即无需调用 .resume() 就会触发 'data' 事件。

【If both 'readable' and 'data' are used at the same time, 'readable' takes precedence in controlling the flow, i.e. 'data' will be emitted only when stream.read() is called. The readableFlowing property would become false. If there are 'data' listeners when 'readable' is removed, the stream will start flowing, i.e. 'data' events will be emitted without calling .resume().】
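
The precedence rule can be checked with a small sketch. It assumes a bare Readable with a no-op read() (a placeholder source for illustration) and samples readableFlowing while both listeners are attached, then again after the 'readable' listener has been removed. The second sample is taken on a later turn of the event loop, since the stream updates its state asynchronously after a listener is removed.

```javascript
const { Readable } = require('node:stream');

// Placeholder source that never produces data.
const readable = new Readable({ read() {} });

const onReadable = () => {};
readable.on('readable', onReadable);
readable.on('data', () => {});

// 'readable' takes precedence, so the stream is not flowing.
const whileBoth = readable.readableFlowing;

readable.removeListener('readable', onReadable);

// Sample again once the stream has reacted to the removal.
const flowingStates = new Promise((resolve) => {
  setImmediate(() => resolve([whileBoth, readable.readableFlowing]));
});

flowingStates.then((states) => console.log(states)); // [ false, true ]
```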

事件:'resume'#>

【Event: 'resume'】

当调用 stream.resume() 且 readableFlowing 不为 true 时,会触发 'resume' 事件。

【The 'resume' event is emitted when stream.resume() is called and readableFlowing is not true.】
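
The conditions on the 'pause' and 'resume' events mean that repeated calls do not produce repeated events. A minimal sketch, assuming a bare Readable with a no-op read() and a hypothetical tick() helper that waits one turn of the event loop so that any scheduled event has been delivered:

```javascript
const { Readable } = require('node:stream');

// Placeholder source that never produces data.
const readable = new Readable({ read() {} });

const events = [];
readable.on('pause', () => events.push('pause'));
readable.on('resume', () => events.push('resume'));

// Hypothetical helper: wait for one turn of the event loop.
const tick = () => new Promise((resolve) => setImmediate(resolve));

async function run() {
  readable.resume(); // readableFlowing is null, so 'resume' is emitted.
  readable.resume(); // Already flowing: no second event.
  await tick();
  readable.pause();  // readableFlowing is true, so 'pause' is emitted.
  readable.pause();  // Already paused: no second event.
  await tick();
  return events;
}

const recorded = run();
recorded.then((list) => console.log(list)); // [ 'resume', 'pause' ]
```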

readable.destroy([error])#>
  • error <Error> 将作为 'error' 事件的负载传递的错误
  • 返回:<this>

销毁流。可选择触发一个 'error' 事件,并触发一个 'close' 事件(除非 emitClose 设置为 false)。在此调用之后,可读流将释放任何内部资源,后续对 push() 的调用将被忽略。

【Destroy the stream. Optionally emit an 'error' event, and emit a 'close' event (unless emitClose is set to false). After this call, the readable stream will release any internal resources and subsequent calls to push() will be ignored.】

一旦调用了 destroy(),任何后续调用都将不再起作用,并且除了 _destroy() 外,不会再触发任何 'error' 错误。

【Once destroy() has been called any further calls will be a no-op and no further errors except from _destroy() may be emitted as 'error'.】

实现者不应重写此方法,而应实现 readable._destroy()

【Implementors should not override this method, but instead implement readable._destroy().】
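
As a rough illustration of the sequence described above, the sketch below destroys a stream with an error and records the events that follow. The bare Readable and the error message are placeholders chosen for the example.

```javascript
const { Readable } = require('node:stream');

// Placeholder source that never produces data.
const readable = new Readable({ read() {} });

const events = [];
readable.on('error', (err) => events.push(`error: ${err.message}`));

const closed = new Promise((resolve) => {
  readable.on('close', () => {
    events.push('close');
    resolve(events);
  });
});

readable.destroy(new Error('boom'));
const destroyedRightAway = readable.destroyed; // true, set synchronously.

closed.then((list) => console.log(list)); // [ 'error: boom', 'close' ]
```

An 'error' listener is attached before calling destroy(); without one, the emitted 'error' event would be thrown as an uncaught exception.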

readable.closed#>

'close' 事件被触发后为 true

【Is true after 'close' has been emitted.】

readable.destroyed#>

在调用 readable.destroy() 之后为 true

【Is true after readable.destroy() has been called.】

readable.isPaused()#>

readable.isPaused() 方法返回 Readable 当前的运行状态。这个方法主要用于支持 readable.pipe() 方法的机制。在大多数典型情况下,通常没有理由直接使用该方法。

【The readable.isPaused() method returns the current operating state of the Readable. This is used primarily by the mechanism that underlies the readable.pipe() method. In most typical cases, there will be no reason to use this method directly.】

const stream = require('node:stream');
const readable = new stream.Readable();

readable.isPaused(); // === false
readable.pause();
readable.isPaused(); // === true
readable.resume();
readable.isPaused(); // === false 
readable.pause()#>

readable.pause() 方法会导致处于流动模式的流停止发出 'data' 事件,并退出流动模式。任何可用的数据将保留在内部缓冲区中。

【The readable.pause() method will cause a stream in flowing mode to stop emitting 'data' events, switching out of flowing mode. Any data that becomes available will remain in the internal buffer.】

const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes of data.`);
  readable.pause();
  console.log('There will be no additional data for 1 second.');
  setTimeout(() => {
    console.log('Now data will start flowing again.');
    readable.resume();
  }, 1000);
}); 

如果存在 'readable' 事件监听器,readable.pause() 方法将不起作用。

【The readable.pause() method has no effect if there is a 'readable' event listener.】

readable.pipe(destination[, options])#>

readable.pipe() 方法将 Writable 流附加到 readable 上,使其自动切换到流动模式,并将其所有数据推送到附加的 Writable。数据的流动将被自动管理,以确保目标 Writable 流不会被速度更快的 Readable 流压垮。

【The readable.pipe() method attaches a Writable stream to the readable, causing it to switch automatically into flowing mode and push all of its data to the attached Writable. The flow of data will be automatically managed so that the destination Writable stream is not overwhelmed by a faster Readable stream.】

以下示例将所有来自 readable 的数据导入名为 file.txt 的文件:

【The following example pipes all of the data from the readable into a file named file.txt:】

const fs = require('node:fs');
const readable = getReadableStreamSomehow();
const writable = fs.createWriteStream('file.txt');
// All the data from readable goes into 'file.txt'.
readable.pipe(writable); 

可以将多个 Writable 流附加到同一个 Readable 流。

【It is possible to attach multiple Writable streams to a single Readable stream.】

readable.pipe() 方法返回对 destination 流的引用,从而可以建立管道流链:

【The readable.pipe() method returns a reference to the destination stream making it possible to set up chains of piped streams:】

const fs = require('node:fs');
const zlib = require('node:zlib');
const r = fs.createReadStream('file.txt');
const z = zlib.createGzip();
const w = fs.createWriteStream('file.txt.gz');
r.pipe(z).pipe(w); 

默认情况下,当源 Readable 流触发 'end' 时,stream.end() 会在目标 Writable 流上被调用,从而使目标流不再可写。要禁用此默认行为,可以将 end 选项传递为 false,使目标流保持打开状态:

【By default, stream.end() is called on the destination Writable stream when the source Readable stream emits 'end', so that the destination is no longer writable. To disable this default behavior, the end option can be passed as false, causing the destination stream to remain open:】

reader.pipe(writer, { end: false });
reader.on('end', () => {
  writer.end('Goodbye\n');
}); 
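
A self-contained variant of the same pattern, sketched with in-memory streams instead of the undefined reader and writer above. Readable.from() and PassThrough are used here only as stand-ins for a real source and destination.

```javascript
const { PassThrough, Readable } = require('node:stream');

const source = Readable.from(['a', 'b']);
const destination = new PassThrough();
destination.setEncoding('utf8');

let output = '';
destination.on('data', (chunk) => { output += chunk; });
const finished = new Promise((resolve) => {
  destination.on('end', () => resolve(output));
});

// Keep the destination open when the source ends...
source.pipe(destination, { end: false });
// ...so that a trailer can still be written before closing it manually.
source.on('end', () => destination.end('!'));

finished.then((text) => console.log(text)); // ab!
```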

一个重要的注意事项是,如果 Readable 流在处理过程中发出错误,Writable 目的地 不会自动关闭。如果发生错误,需要 手动 关闭每个流,以防止内存泄漏。

【One important caveat is that if the Readable stream emits an error during processing, the Writable destination is not closed automatically. If an error occurs, it will be necessary to manually close each stream in order to prevent memory leaks.】
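
One way to avoid the manual cleanup is stream.pipeline(), listed among this module's utility functions, which destroys the streams it connects when one of them fails. The sketch below is an illustration under assumptions: the failing source and the discarding sink are placeholders, and the error message is made up.

```javascript
const { pipeline, Readable, Writable } = require('node:stream');

// Placeholder source that fails as soon as it is asked for data.
const source = new Readable({
  read() {
    this.destroy(new Error('source failed'));
  },
});

// Placeholder sink that discards everything it receives.
const sink = new Writable({
  write(chunk, encoding, callback) {
    callback();
  },
});

const outcome = new Promise((resolve) => {
  pipeline(source, sink, (err) => {
    resolve({ message: err.message, sinkDestroyed: sink.destroyed });
  });
});

outcome.then((result) => console.log(result));
```

With readable.pipe() alone, the sink in this example would have been left open after the source failed.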

process.stderr 和 process.stdout 这两个 Writable 流在 Node.js 进程退出之前从不关闭,无论指定了哪些选项。

【The process.stderr and process.stdout Writable streams are never closed until the Node.js process exits, regardless of the specified options.】

readable.read([size])#>

readable.read() 方法从内部缓冲区读取数据并返回。如果没有可读取的数据,则返回 null。默认情况下,数据会作为 Buffer 对象返回,除非通过 readable.setEncoding() 方法指定了编码,或者流正在以对象模式运行。

【The readable.read() method reads data out of the internal buffer and returns it. If no data is available to be read, null is returned. By default, the data is returned as a Buffer object unless an encoding has been specified using the readable.setEncoding() method or the stream is operating in object mode.】

可选的 size 参数指定要读取的字节数。如果没有足够的 size 字节可供读取,将返回 null,_除非_流已结束,在这种情况下,将返回内部缓冲区中剩余的所有数据。

【The optional size argument specifies a specific number of bytes to read. If size bytes are not available to be read, null will be returned unless the stream has ended, in which case all of the data remaining in the internal buffer will be returned.】

如果未指定 size 参数,将返回内部缓冲区中包含的所有数据。

【If the size argument is not specified, all of the data contained in the internal buffer will be returned.】

size 参数必须小于或等于 1 GiB。

【The size argument must be less than or equal to 1 GiB.】
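
The effect of the size argument, including the end-of-stream case, can be sketched as follows. The six-byte payload and the request size of 4 are arbitrary values chosen for illustration.

```javascript
const { Readable } = require('node:stream');

const readable = new Readable({ read() {} });
readable.push(Buffer.from('abcdef')); // 6 bytes buffered.
readable.push(null);                  // Signal the end of the stream.

const pieces = [];
const done = new Promise((resolve) => {
  readable.on('readable', () => {
    let chunk;
    // Ask for 4 bytes at a time.
    while ((chunk = readable.read(4)) !== null) {
      pieces.push(chunk.toString());
    }
  });
  readable.on('end', () => resolve(pieces));
});

done.then((list) => console.log(list)); // [ 'abcd', 'ef' ]
```

The second read(4) returns the remaining two bytes rather than null because the stream has already ended.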

readable.read() 方法应仅在处于暂停模式的 Readable 流上调用。在流动模式下,readable.read() 会自动调用,直到内部缓冲区完全清空。

【The readable.read() method should only be called on Readable streams operating in paused mode. In flowing mode, readable.read() is called automatically until the internal buffer is fully drained.】

const readable = getReadableStreamSomehow();

// 'readable' may be triggered multiple times as data is buffered in
readable.on('readable', () => {
  let chunk;
  console.log('Stream is readable (new data received in buffer)');
  // Use a loop to make sure we read all currently available data
  while (null !== (chunk = readable.read())) {
    console.log(`Read ${chunk.length} bytes of data...`);
  }
});

// 'end' will be triggered once when there is no more data available
readable.on('end', () => {
  console.log('Reached end of stream.');
}); 

每次调用 readable.read() 都会返回一块数据或 null,表示此刻没有更多数据可供读取。这些数据块不会自动拼接。由于一次 read() 调用不会返回所有数据,因此可能需要使用 while 循环来持续读取数据块,直到获取所有数据。在读取大文件时,.read() 可能会暂时返回 null,表示它已消耗所有缓冲的内容,但可能还有更多数据尚未缓冲。在这种情况下,一旦缓冲区中有更多数据,就会触发一个新的 'readable' 事件,而 'end' 事件则表示数据传输结束。

【Each call to readable.read() returns a chunk of data or null, signifying that there's no more data to read at that moment. These chunks aren't automatically concatenated. Because a single read() call does not return all the data, using a while loop may be necessary to continuously read chunks until all data is retrieved. When reading a large file, .read() might return null temporarily, indicating that it has consumed all buffered content but there may be more data yet to be buffered. In such cases, a new 'readable' event is emitted once there's more data in the buffer, and the 'end' event signifies the end of data transmission.】

因此,要从 readable 中读取文件的全部内容,就有必要在多个 'readable' 事件中收集数据块:

【Therefore to read a file's whole contents from a readable, it is necessary to collect chunks across multiple 'readable' events:】

const chunks = [];

readable.on('readable', () => {
  let chunk;
  while (null !== (chunk = readable.read())) {
    chunks.push(chunk);
  }
});

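readable.on('end', () => {
  // Concatenate the Buffers before decoding so that multi-byte characters
  // split across chunks are decoded correctly (assumes setEncoding() was not called).
  const content = Buffer.concat(chunks).toString();
}); 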

Readable 流在对象模式下,无论 size 参数的值是多少,从调用 readable.read(size) 返回的总是单个项目。

【A Readable stream in object mode will always return a single item from a call to readable.read(size), regardless of the value of the size argument.】
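
A short sketch of this behavior, using a bare object-mode Readable and two placeholder objects pushed by hand:

```javascript
const { Readable } = require('node:stream');

const readable = new Readable({ objectMode: true, read() {} });
readable.push({ id: 1 });
readable.push({ id: 2 });

// The size argument is ignored in object mode: one item per call.
const first = readable.read(10);
const second = readable.read(10);
const third = readable.read(10); // Nothing left in the buffer.

console.log(first, second, third); // { id: 1 } { id: 2 } null
```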

如果 readable.read() 方法返回一段数据,也会触发 'data' 事件。

【If the readable.read() method returns a chunk of data, a 'data' event will also be emitted.】

'end' 事件被触发后调用 stream.read([size]) 将返回 null。不会引发运行时错误。

【Calling stream.read([size]) after the 'end' event has been emitted will return null. No runtime error will be raised.】

readable.readable#>

如果调用 readable.read() 是安全的,则为 true,这意味着该流尚未被销毁,也未触发 'error' 或 'end' 事件。

【Is true if it is safe to call readable.read(), which means the stream has not been destroyed or emitted 'error' or 'end'.】

readable.readableAborted#>

返回流在发出 'end' 之前是否已被销毁或发生错误。

【Returns whether the stream was destroyed or errored before emitting 'end'.】

readable.readableDidRead#>

返回 'data' 是否已被触发。

【Returns whether 'data' has been emitted.】

readable.readableEncoding#>

用于获取给定 Readable 流的 encoding 属性的 getter。可以使用 readable.setEncoding() 方法设置 encoding 属性。

【Getter for the property encoding of a given Readable stream. The encoding property can be set using the readable.setEncoding() method.】

readable.readableEnded#>

'end' 事件被触发时变为 true

【Becomes true when 'end' event is emitted.】

readable.errored#>

如果流因错误而被销毁,则返回错误。

【Returns error if the stream has been destroyed with an error.】

readable.readableFlowing#>

该属性反映了 Readable 流的当前状态,如 三种状态 节所述。

【This property reflects the current state of a Readable stream as described in the Three states section.】

readable.readableHighWaterMark#>

返回在创建此 Readable 时传入的 highWaterMark 的值。

【Returns the value of highWaterMark passed when creating this Readable.】

readable.readableLength#>

此属性包含队列中准备读取的字节数(或对象数)。该值提供了关于 highWaterMark 状态的内部监控数据。

【This property contains the number of bytes (or objects) in the queue ready to be read. The value provides introspection data regarding the status of the highWaterMark.】
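
The relationship between readableLength and highWaterMark can be sketched as below. The example calls readable.push() directly, which is normally done by a stream implementation rather than a consumer; the 8-byte highWaterMark and the strings are arbitrary illustration values.

```javascript
const { Readable } = require('node:stream');

const readable = new Readable({ highWaterMark: 8, read() {} });

const firstPush = readable.push('hello');         // 5 bytes buffered, below the mark.
const lengthAfterFirst = readable.readableLength;

const secondPush = readable.push('world!');       // 11 bytes buffered, mark reached.
const lengthAfterSecond = readable.readableLength;

console.log(readable.readableHighWaterMark);      // 8
console.log(firstPush, lengthAfterFirst);         // true 5
console.log(secondPush, lengthAfterSecond);       // false 11
```

push() returning false is the signal to the implementation that the buffer has reached highWaterMark and that it should stop pushing until more data is requested.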

readable.readableObjectMode#>

获取给定 Readable 流的 objectMode 属性的值。

【Getter for the property objectMode of a given Readable stream.】

readable.resume()#>

readable.resume() 方法会使显式暂停的 Readable 流恢复触发 'data' 事件,将流切换到流动模式。

【The readable.resume() method causes an explicitly paused Readable stream to resume emitting 'data' events, switching the stream into flowing mode.】

readable.resume() 方法可以用来完全读取流中的数据,而无需实际处理这些数据:

【The readable.resume() method can be used to fully consume the data from a stream without actually processing any of that data:】

getReadableStreamSomehow()
  .resume()
  .on('end', () => {
    console.log('Reached the end, but did not read anything.');
  }); 

如果存在 'readable' 事件监听器,readable.resume() 方法将不起作用。

【The readable.resume() method has no effect if there is a 'readable' event listener.】

readable.setEncoding(encoding)#>

readable.setEncoding() 方法用于设置从 Readable 流读取的数据的字符编码。

【The readable.setEncoding() method sets the character encoding for data read from the Readable stream.】

默认情况下,不会指定任何编码,流数据将以 Buffer 对象的形式返回。设置编码会导致流数据以指定编码的字符串形式返回,而不是 Buffer 对象。例如,调用 readable.setEncoding('utf8') 会使输出数据被解释为 UTF-8 数据,并作为字符串传递。调用 readable.setEncoding('hex') 会使数据以十六进制字符串格式编码。

【By default, no encoding is assigned and stream data will be returned as Buffer objects. Setting an encoding causes the stream data to be returned as strings of the specified encoding rather than as Buffer objects. For instance, calling readable.setEncoding('utf8') will cause the output data to be interpreted as UTF-8 data, and passed as strings. Calling readable.setEncoding('hex') will cause the data to be encoded in hexadecimal string format.】

Readable 流将正确处理通过流传递的多字节字符,如果仅作为 Buffer 对象从流中获取,这些字符可能会被错误解码。

【The Readable stream will properly handle multi-byte characters delivered through the stream that would otherwise become improperly decoded if simply pulled from the stream as Buffer objects.】
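
To make the multi-byte point concrete, the sketch below splits the three-byte UTF-8 encoding of the euro sign across two chunks. The chunks are pushed by hand into a bare Readable, which stands in for a real source that happens to split its data at that point.

```javascript
const { Readable } = require('node:stream');

const euro = Buffer.from('€'); // Three bytes in UTF-8.

const readable = new Readable({ read() {} });
readable.setEncoding('utf8');

const received = [];
readable.on('data', (chunk) => received.push(chunk));
const finished = new Promise((resolve) => {
  readable.on('end', () => resolve(received.join('')));
});

// Deliver the character in two pieces, then end the stream.
readable.push(euro.subarray(0, 1));
readable.push(euro.subarray(1));
readable.push(null);

finished.then((text) => console.log(text)); // €
```

Decoding each chunk separately with chunk.toString() would instead produce replacement characters, because neither piece is valid UTF-8 on its own.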

const assert = require('node:assert');
const readable = getReadableStreamSomehow();
readable.setEncoding('utf8');
readable.on('data', (chunk) => {
  assert.equal(typeof chunk, 'string');
  console.log('Got %d characters of string data:', chunk.length);
}); 
readable.unpipe([destination])#>

readable.unpipe() 方法会分离之前使用 stream.pipe() 方法附加的 Writable 流。

【The readable.unpipe() method detaches a Writable stream previously attached using the stream.pipe() method.】

如果未指定 destination,则会分离所有管道。

【If the destination is not specified, then all pipes are detached.】

如果指定了 destination,但没有为其设置管道,则该方法不会执行任何操作。

【If the destination is specified, but no pipe is set up for it, then the method does nothing.】

const fs = require('node:fs');
const readable = getReadableStreamSomehow();
const writable = fs.createWriteStream('file.txt');
// All the data from readable goes into 'file.txt',
// but only for the first second.
readable.pipe(writable);
setTimeout(() => {
  console.log('Stop writing to file.txt.');
  readable.unpipe(writable);
  console.log('Manually close the file stream.');
  writable.end();
}, 1000); 
readable.unshift(chunk[, encoding])#>

chunk 传递为 null 表示流的结束(EOF),其行为与 readable.push(null) 相同,此后无法再写入数据。EOF 信号会放在缓冲区的末尾,任何已缓冲的数据仍会被刷新。

【Passing chunk as null signals the end of the stream (EOF) and behaves the same as readable.push(null), after which no more data can be written. The EOF signal is put at the end of the buffer and any buffered data will still be flushed.】

readable.unshift() 方法将一块数据推回内部缓冲区。这在某些情况下非常有用,例如流正在被某些代码消费,而这些代码需要“取消消费”它乐观地从源中提取的一些数据,以便这些数据可以传递给其他方。

【The readable.unshift() method pushes a chunk of data back into the internal buffer. This is useful in certain situations where a stream is being consumed by code that needs to "un-consume" some amount of data that it has optimistically pulled out of the source, so that the data can be passed on to some other party.】

stream.unshift(chunk) 方法在 'end' 事件被触发后不能调用,否则会抛出运行时错误。

【The stream.unshift(chunk) method cannot be called after the 'end' event has been emitted or a runtime error will be thrown.】

经常使用 stream.unshift() 的开发者应考虑改用 Transform 流。有关更多信息,请参阅 流实现者的 API 部分。

【Developers using stream.unshift() often should consider switching to use of a Transform stream instead. See the API for stream implementers section for more information.】

// Pull off a header delimited by \n\n.
// Use unshift() if we get too much.
// Call the callback with (error, header, stream).
const { StringDecoder } = require('node:string_decoder');
function parseHeader(stream, callback) {
  stream.on('error', callback);
  stream.on('readable', onReadable);
  const decoder = new StringDecoder('utf8');
  let header = '';
  function onReadable() {
    let chunk;
    while (null !== (chunk = stream.read())) {
      const str = decoder.write(chunk);
      if (str.includes('\n\n')) {
        // Found the header boundary.
        const split = str.split(/\n\n/);
        header += split.shift();
        const remaining = split.join('\n\n');
        const buf = Buffer.from(remaining, 'utf8');
        stream.removeListener('error', callback);
        // Remove the 'readable' listener before unshifting.
        stream.removeListener('readable', onReadable);
        if (buf.length)
          stream.unshift(buf);
        // Now the body of the message can be read from the stream.
        callback(null, header, stream);
        return;
      }
      // Still reading the header.
      header += str;
    }
  }
} 

stream.push(chunk) 不同,stream.unshift(chunk) 不会通过重置流的内部读取状态来结束读取过程。如果在读取过程中(例如在自定义流的 stream._read() 实现中)调用 readable.unshift(),可能会导致意外结果。在调用 readable.unshift() 后立即进行 stream.push('') 会适当地重置读取状态,但最好在执行读取操作时避免调用 readable.unshift()

【Unlike stream.push(chunk), stream.unshift(chunk) will not end the reading process by resetting the internal reading state of the stream. This can cause unexpected results if readable.unshift() is called during a read (i.e. from within a stream._read() implementation on a custom stream). Following the call to readable.unshift() with an immediate stream.push('') will reset the reading state appropriately, however it is best to simply avoid calling readable.unshift() while in the process of performing a read.】

readable.wrap(stream)#>

在 Node.js 0.10 之前,流并没有实现当前定义的整个 node:stream 模块 API。(有关更多信息,请参见 兼容性。)

【Prior to Node.js 0.10, streams did not implement the entire node:stream module API as it is currently defined. (See Compatibility for more information.)】

在使用一个发出 'data' 事件并且具有仅供参考的 stream.pause() 方法的旧 Node.js 库时,可以使用 readable.wrap() 方法创建一个 Readable 流,该流以旧流作为其数据源。

【When using an older Node.js library that emits 'data' events and has a stream.pause() method that is advisory only, the readable.wrap() method can be used to create a Readable stream that uses the old stream as its data source.】

很少需要使用 readable.wrap(),但提供该方法是为了方便与旧版 Node.js 应用和库的交互。

【It will rarely be necessary to use readable.wrap() but the method has been provided as a convenience for interacting with older Node.js applications and libraries.】

const { OldReader } = require('./old-api-module.js');
const { Readable } = require('node:stream');
const oreader = new OldReader();
const myReader = new Readable().wrap(oreader);

myReader.on('readable', () => {
  myReader.read(); // etc.
}); 
readable[Symbol.asyncIterator]()#>
const fs = require('node:fs');

async function print(readable) {
  readable.setEncoding('utf8');
  let data = '';
  for await (const chunk of readable) {
    data += chunk;
  }
  console.log(data);
}

print(fs.createReadStream('file')).catch(console.error); 

如果循环通过 break、return 或 throw 终止,流将会被销毁。换句话说,对流进行迭代将会完全消耗该流。流会以大小等于 highWaterMark 选项的块读取。在上面的代码示例中,如果文件数据少于 64 KiB,由于没有为 fs.createReadStream() 提供 highWaterMark 选项,数据将会在一个单独的块中读取。

【If the loop terminates with a break, return, or a throw, the stream will be destroyed. In other terms, iterating over a stream will consume the stream fully. The stream will be read in chunks of size equal to the highWaterMark option. In the code example above, data will be in a single chunk if the file has less than 64 KiB of data because no highWaterMark option is provided to fs.createReadStream().】

readable[Symbol.asyncDispose]()#>

使用 AbortError 调用 readable.destroy(),并返回一个在流完成时兑现的 promise。

【Calls readable.destroy() with an AbortError and returns a promise that fulfills when the stream is finished.】

readable.compose(stream[, options])#>
import { Readable } from 'node:stream';

async function* splitToWords(source) {
  for await (const chunk of source) {
    const words = String(chunk).split(' ');

    for (const word of words) {
      yield word;
    }
  }
}

const wordsStream = Readable.from(['this is', 'compose as operator']).compose(splitToWords);
const words = await wordsStream.toArray();

console.log(words); // prints ['this', 'is', 'compose', 'as', 'operator'] 

有关更多信息,请参见 stream.compose。

【See stream.compose for more information.】

readable.iterator([options])#>
  • options <Object>
    • destroyOnReturn <boolean> 当设置为 false 时,对异步迭代器调用 return,或在使用 for await...of 迭代时通过 break、return 或 throw 退出,不会销毁流。默认值: true。
  • 返回:<AsyncIterator> 用于消费流。

此方法创建的迭代器允许用户选择:当 for await...of 循环因 return、break 或 throw 退出时是否取消销毁流,以及当流在迭代过程中发出错误时迭代器是否应销毁该流。

【The iterator created by this method gives users the option to cancel the destruction of the stream if the for await...of loop is exited by return, break, or throw, or if the iterator should destroy the stream if the stream emitted an error during iteration.】

const { Readable } = require('node:stream');

async function printIterator(readable) {
  for await (const chunk of readable.iterator({ destroyOnReturn: false })) {
    console.log(chunk); // 1
    break;
  }

  console.log(readable.destroyed); // false

  for await (const chunk of readable.iterator({ destroyOnReturn: false })) {
    console.log(chunk); // Will print 2 and then 3
  }

  console.log(readable.destroyed); // True, stream was totally consumed
}

async function printSymbolAsyncIterator(readable) {
  for await (const chunk of readable) {
    console.log(chunk); // 1
    break;
  }

  console.log(readable.destroyed); // true
}

async function showBoth() {
  await printIterator(Readable.from([1, 2, 3]));
  await printSymbolAsyncIterator(Readable.from([1, 2, 3]));
}

showBoth(); 
readable.map(fn[, options])#>

稳定性: 1 - 实验性

  • fn <Function> | <AsyncFunction> 是一个可以对流中的每个区块进行映射的函数。
    • data <any> 从流中获取一块数据。
    • options <Object>
      • signal <AbortSignal> 如果流被销毁则中止,从而允许提前中止 fn 调用。
  • options <Object>
    • concurrency <number> 在流上同时调用 fn 的最大并发次数。默认值: 1。
    • highWaterMark <number> 在等待用户消费映射后的项时要缓冲的项目数量。默认值: concurrency * 2 - 1
    • signal <AbortSignal> 允许在信号被中止时销毁流。
  • 返回:<Readable> 一个使用函数 fn 映射的流。

此方法允许对流进行映射。fn 函数会对流中的每个块调用一次。如果 fn 函数返回一个 promise,该 promise 会在被传递到结果流之前被 await

【This method allows mapping over the stream. The fn function will be called for every chunk in the stream. If the fn function returns a promise - that promise will be awaited before being passed to the result stream.】

import { Readable } from 'node:stream';
import { Resolver } from 'node:dns/promises';

// With a synchronous mapper.
for await (const chunk of Readable.from([1, 2, 3, 4]).map((x) => x * 2)) {
  console.log(chunk); // 2, 4, 6, 8
}
// With an asynchronous mapper, making at most 2 queries at a time.
const resolver = new Resolver();
const dnsResults = Readable.from([
  'nodejs.org',
  'openjsf.org',
  'www.linuxfoundation.org',
]).map((domain) => resolver.resolve4(domain), { concurrency: 2 });
for await (const result of dnsResults) {
  console.log(result); // Logs the DNS result of resolver.resolve4.
} 
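
For trying out map() without network access, here is a minimal CommonJS sketch. The sleep() delay stands in for real asynchronous work, and the concurrency value is an arbitrary choice for illustration.

```javascript
const { Readable } = require('node:stream');
const { setTimeout: sleep } = require('node:timers/promises');

// At most two mapper calls are in flight at any time.
const doubled = Readable.from([1, 2, 3, 4])
  .map(async (n) => {
    await sleep(10); // Stand-in for real asynchronous work.
    return n * 2;
  }, { concurrency: 2 })
  .toArray();

doubled.then((values) => console.log(values)); // [ 2, 4, 6, 8 ]
```

The results arrive in the order of the source items even though the mapper calls overlap.
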
readable.filter(fn[, options])#>

稳定性: 1 - 实验性

  • fn <Function> | <AsyncFunction> 一个用于从流中过滤块的函数。
    • data <any> 从流中获取一块数据。
    • options <Object>
      • signal <AbortSignal> 如果流被销毁则中止,从而允许提前中止 fn 调用。
  • options <Object>
    • concurrency <number> 在流上同时调用 fn 的最大并发次数。默认值: 1。
    • highWaterMark <number> 在等待用户消费过滤后的项目时要缓冲的项目数量。默认值: concurrency * 2 - 1
    • signal <AbortSignal> 允许在信号被中止时销毁流。
  • 返回:<Readable> 使用谓词 fn 过滤后的流。

此方法允许对流进行过滤。对于流中的每个数据块,将调用 fn 函数,如果它返回一个真值,该数据块将被传递到结果流中。如果 fn 函数返回一个 promise,那么该 promise 将被 await

【This method allows filtering the stream. For each chunk in the stream the fn function will be called and if it returns a truthy value, the chunk will be passed to the result stream. If the fn function returns a promise - that promise will be awaited.】

import { Readable } from 'node:stream';
import { Resolver } from 'node:dns/promises';

// With a synchronous predicate.
for await (const chunk of Readable.from([1, 2, 3, 4]).filter((x) => x > 2)) {
  console.log(chunk); // 3, 4
}
// With an asynchronous predicate, making at most 2 queries at a time.
const resolver = new Resolver();
const dnsResults = Readable.from([
  'nodejs.org',
  'openjsf.org',
  'www.linuxfoundation.org',
]).filter(async (domain) => {
  // With { ttl: true }, resolve4() resolves to an array of { address, ttl } records.
  const [record] = await resolver.resolve4(domain, { ttl: true });
  return record.ttl > 60;
}, { concurrency: 2 });
for await (const result of dnsResults) {
  // Logs domains with more than 60 seconds on the resolved dns record.
  console.log(result);
} 
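
A network-free sketch of filter() with an asynchronous predicate, written in CommonJS. The sleep() delay and the concurrency value are placeholders for illustration.

```javascript
const { Readable } = require('node:stream');
const { setTimeout: sleep } = require('node:timers/promises');

// Keep only the even numbers; up to three predicate calls run at once.
const evens = Readable.from([1, 2, 3, 4, 5, 6])
  .filter(async (n) => {
    await sleep(5); // Stand-in for an asynchronous check.
    return n % 2 === 0;
  }, { concurrency: 3 })
  .toArray();

evens.then((values) => console.log(values)); // [ 2, 4, 6 ]
```
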
readable.forEach(fn[, options])#>

稳定性: 1 - 实验性

  • fn <Function> | <AsyncFunction> 是一个在流的每个数据块上调用的函数。
    • data <any> 从流中获取一块数据。
    • options <Object>
      • signal <AbortSignal> 如果流被销毁则中止,从而允许提前中止 fn 调用。
  • options <Object>
    • concurrency <number> 在流上同时调用 fn 的最大并发次数。默认值: 1。
    • signal <AbortSignal> 允许在信号被中止时销毁流。
  • 返回:<Promise> 一个表示流已完成的承诺。

此方法允许迭代一个流。对于流中的每个块,将调用 fn 函数。如果 fn 函数返回一个 Promise,该 Promise 将被 await

【This method allows iterating a stream. For each chunk in the stream the fn function will be called. If the fn function returns a promise - that promise will be awaited.】

这种方法不同于 for await...of 循环,因为它可以选择并发处理数据块。此外,forEach 迭代只能通过传递 signal 选项并中止相关的 AbortController 来停止,而 for await...of 可以通过 break 或 return 来停止。在任何情况下,流都将被销毁。

【This method is different from for await...of loops in that it can optionally process chunks concurrently. In addition, a forEach iteration can only be stopped by having passed a signal option and aborting the related AbortController while for await...of can be stopped with break or return. In either case the stream will be destroyed.】

这种方法与监听 'data' 事件不同,它使用底层机制中的 readable 事件,并且可以限制并发 fn 调用的数量。

【This method is different from listening to the 'data' event in that it uses the readable event in the underlying machinery and can limit the number of concurrent fn calls.】

import { Readable } from 'node:stream';
import { Resolver } from 'node:dns/promises';

// With a synchronous predicate.
for await (const chunk of Readable.from([1, 2, 3, 4]).filter((x) => x > 2)) {
  console.log(chunk); // 3, 4
}
// With an asynchronous predicate, making at most 2 queries at a time.
const resolver = new Resolver();
const dnsResults = Readable.from([
  'nodejs.org',
  'openjsf.org',
  'www.linuxfoundation.org',
]).map(async (domain) => {
  // With `ttl: true`, resolve4() resolves to an array of { address, ttl } records.
  const [{ address }] = await resolver.resolve4(domain, { ttl: true });
  return address;
}, { concurrency: 2 });
await dnsResults.forEach((result) => {
  // Logs result, similar to `for await (const result of dnsResults)`
  console.log(result);
});
console.log('done'); // Stream has finished 

readable.toArray([options])

Stability: 1 - Experimental

  • options <Object>
    • signal <AbortSignal> allows cancelling the toArray operation if the signal is aborted.
  • Returns: <Promise> a promise containing an array with the contents of the stream.

This method allows easily obtaining the contents of a stream.

As this method reads the entire stream into memory, it negates the benefits of streams. It is intended for interoperability and convenience, not as the primary way to consume streams.

import { Readable } from 'node:stream';
import { Resolver } from 'node:dns/promises';

await Readable.from([1, 2, 3, 4]).toArray(); // [1, 2, 3, 4]

const resolver = new Resolver();

// Make dns queries concurrently using .map and collect
// the results into an array using toArray
const dnsResults = await Readable.from([
  'nodejs.org',
  'openjsf.org',
  'www.linuxfoundation.org',
]).map(async (domain) => {
  // With `ttl: true`, resolve4() resolves to an array of { address, ttl } records.
  const [{ address }] = await resolver.resolve4(domain, { ttl: true });
  return address;
}, { concurrency: 2 }).toArray(); 

readable.some(fn[, options])

Stability: 1 - Experimental

  • fn <Function> | <AsyncFunction> a function to call on each chunk of the stream.
    • data <any> a chunk of data from the stream.
    • options <Object>
      • signal <AbortSignal> aborted if the stream is destroyed, allowing the fn call to be aborted early.
  • options <Object>
    • concurrency <number> the maximum number of concurrent invocations of fn on the stream. Default: 1.
    • signal <AbortSignal> allows destroying the stream if the signal is aborted.
  • Returns: <Promise> a promise evaluating to true if fn returned a truthy value for at least one of the chunks.

This method is similar to Array.prototype.some and calls fn on each chunk in the stream until the awaited return value is true (or any truthy value). Once the awaited return value of an fn call on a chunk is truthy, the stream is destroyed and the promise is fulfilled with true. If none of the fn calls on the chunks return a truthy value, the promise is fulfilled with false.

import { Readable } from 'node:stream';
import { stat } from 'node:fs/promises';

// With a synchronous predicate.
await Readable.from([1, 2, 3, 4]).some((x) => x > 2); // true
await Readable.from([1, 2, 3, 4]).some((x) => x < 0); // false

// With an asynchronous predicate, making at most 2 file checks at a time.
const anyBigFile = await Readable.from([
  'file1',
  'file2',
  'file3',
]).some(async (fileName) => {
  const stats = await stat(fileName);
  return stats.size > 1024 * 1024;
}, { concurrency: 2 });
console.log(anyBigFile); // `true` if any file in the list is bigger than 1MB
console.log('done'); // Stream has finished 

readable.find(fn[, options])

Stability: 1 - Experimental

  • fn <Function> | <AsyncFunction> a function to call on each chunk of the stream.
    • data <any> a chunk of data from the stream.
    • options <Object>
      • signal <AbortSignal> aborted if the stream is destroyed, allowing the fn call to be aborted early.
  • options <Object>
    • concurrency <number> the maximum number of concurrent invocations of fn on the stream. Default: 1.
    • signal <AbortSignal> allows destroying the stream if the signal is aborted.
  • Returns: <Promise> a promise evaluating to the first chunk for which fn evaluated to a truthy value, or undefined if no such chunk was found.

This method is similar to Array.prototype.find and calls fn on each chunk in the stream to find a chunk for which fn returns a truthy value. Once the awaited return value of an fn call is truthy, the stream is destroyed and the promise is fulfilled with the chunk for which fn returned a truthy value. If all of the fn calls on the chunks return a falsy value, the promise is fulfilled with undefined.

import { Readable } from 'node:stream';
import { stat } from 'node:fs/promises';

// With a synchronous predicate.
await Readable.from([1, 2, 3, 4]).find((x) => x > 2); // 3
await Readable.from([1, 2, 3, 4]).find((x) => x > 0); // 1
await Readable.from([1, 2, 3, 4]).find((x) => x > 10); // undefined

// With an asynchronous predicate, making at most 2 file checks at a time.
const foundBigFile = await Readable.from([
  'file1',
  'file2',
  'file3',
]).find(async (fileName) => {
  const stats = await stat(fileName);
  return stats.size > 1024 * 1024;
}, { concurrency: 2 });
console.log(foundBigFile); // File name of large file, if any file in the list is bigger than 1MB
console.log('done'); // Stream has finished 

readable.every(fn[, options])

Stability: 1 - Experimental

  • fn <Function> | <AsyncFunction> a function to call on each chunk of the stream.
    • data <any> a chunk of data from the stream.
    • options <Object>
      • signal <AbortSignal> aborted if the stream is destroyed, allowing the fn call to be aborted early.
  • options <Object>
    • concurrency <number> the maximum number of concurrent invocations of fn on the stream. Default: 1.
    • signal <AbortSignal> allows destroying the stream if the signal is aborted.
  • Returns: <Promise> a promise evaluating to true if fn returned a truthy value for all of the chunks.

This method is similar to Array.prototype.every and calls fn on each chunk in the stream to check whether all awaited return values are truthy. Once the awaited return value of an fn call on a chunk is falsy, the stream is destroyed and the promise is fulfilled with false. If all of the fn calls on the chunks return a truthy value, the promise is fulfilled with true.

import { Readable } from 'node:stream';
import { stat } from 'node:fs/promises';

// With a synchronous predicate.
await Readable.from([1, 2, 3, 4]).every((x) => x > 2); // false
await Readable.from([1, 2, 3, 4]).every((x) => x > 0); // true

// With an asynchronous predicate, making at most 2 file checks at a time.
const allBigFiles = await Readable.from([
  'file1',
  'file2',
  'file3',
]).every(async (fileName) => {
  const stats = await stat(fileName);
  return stats.size > 1024 * 1024;
}, { concurrency: 2 });
// `true` if all files in the list are bigger than 1MiB
console.log(allBigFiles);
console.log('done'); // Stream has finished 

readable.flatMap(fn[, options])

Stability: 1 - Experimental

This method returns a new stream by applying the given callback to each chunk of the stream and then flattening the result.

It is possible to return a stream or another iterable or async iterable from fn, and the resulting streams will be merged (flattened) into the returned stream.

import { Readable } from 'node:stream';
import { createReadStream } from 'node:fs';

// With a synchronous mapper.
for await (const chunk of Readable.from([1, 2, 3, 4]).flatMap((x) => [x, x])) {
  console.log(chunk); // 1, 1, 2, 2, 3, 3, 4, 4
}
// With an asynchronous mapper, combine the contents of 4 files
const concatResult = Readable.from([
  './1.mjs',
  './2.mjs',
  './3.mjs',
  './4.mjs',
]).flatMap((fileName) => createReadStream(fileName));
for await (const result of concatResult) {
  // This will contain the contents (all chunks) of all 4 files
  console.log(result);
} 

readable.drop(limit[, options])

Stability: 1 - Experimental

This method returns a new stream with the first limit chunks dropped.

import { Readable } from 'node:stream';

await Readable.from([1, 2, 3, 4]).drop(2).toArray(); // [3, 4] 

readable.take(limit[, options])

Stability: 1 - Experimental

This method returns a new stream with the first limit chunks.

import { Readable } from 'node:stream';

await Readable.from([1, 2, 3, 4]).take(2).toArray(); // [1, 2] 
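
Because drop() and take() each return a new stream, they can be chained. The following is a minimal sketch, not an official example, of paging through a source; the page() helper and its parameters are hypothetical names introduced only for illustration.

```javascript
const { Readable } = require('node:stream');

// Hypothetical helper: returns one "page" of chunks from an iterable source.
async function page(items, pageNumber, pageSize) {
  return Readable.from(items)
    .drop(pageNumber * pageSize) // skip the chunks that belong to earlier pages
    .take(pageSize) // keep at most one page worth of chunks
    .toArray();
}

const pageResult = page(['a', 'b', 'c', 'd', 'e'], 1, 2);
pageResult.then((chunks) => console.log(chunks)); // [ 'c', 'd' ]
```

Both operators are marked experimental above, so this pattern is best treated as a convenience rather than a stable contract.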

readable.reduce(fn[, initial[, options]])

Stability: 1 - Experimental

  • fn <Function> | <AsyncFunction> a reducer function to call over every chunk in the stream.
    • previous <any> the value obtained from the last call to fn, or the initial value if specified, or the first chunk of the stream otherwise.
    • data <any> a chunk of data from the stream.
    • options <Object>
      • signal <AbortSignal> aborted if the stream is destroyed, allowing the fn call to be aborted early.
  • initial <any> the initial value to use in the reduction.
  • options <Object>
  • Returns: <Promise> a promise for the final value of the reduction.

This method calls fn on each chunk of the stream in order, passing it the result from the calculation on the previous element. It returns a promise for the final value of the reduction.

If no initial value is supplied, the first chunk of the stream is used as the initial value. If the stream is empty, the promise is rejected with a TypeError with the ERR_INVALID_ARGS code property.

import { Readable } from 'node:stream';
import { readdir, stat } from 'node:fs/promises';
import { join } from 'node:path';

const directoryPath = './src';
const filesInDir = await readdir(directoryPath);

const folderSize = await Readable.from(filesInDir)
  .reduce(async (totalSize, file) => {
    const { size } = await stat(join(directoryPath, file));
    return totalSize + size;
  }, 0);

console.log(folderSize); 
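
The case where no initial value is supplied can be sketched as follows. This is an illustrative sketch only; reduceWithoutInitial() is a hypothetical helper, and the example inspects only that a rejection occurs rather than asserting a particular error code.

```javascript
const { Readable } = require('node:stream');

// Hypothetical helper showing reduce() with and without chunks to reduce.
async function reduceWithoutInitial() {
  // No initial value: the first chunk (1) seeds the accumulator.
  const sum = await Readable.from([1, 2, 3]).reduce((total, n) => total + n);

  // Empty stream and no initial value: the returned promise rejects.
  let rejection = null;
  try {
    await Readable.from([]).reduce((total, n) => total + n);
  } catch (err) {
    rejection = err;
  }
  return { sum, rejection };
}

const reduceResult = reduceWithoutInitial();
reduceResult.then(({ sum, rejection }) => {
  console.log(sum); // 6
  console.log(rejection.name, rejection.code);
});
```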

The reducer function iterates the stream element by element, which means that there is no concurrency parameter or parallelism. To perform a reduce concurrently, extract the async function into a readable.map call.

import { Readable } from 'node:stream';
import { readdir, stat } from 'node:fs/promises';
import { join } from 'node:path';

const directoryPath = './src';
const filesInDir = await readdir(directoryPath);

const folderSize = await Readable.from(filesInDir)
  .map((file) => stat(join(directoryPath, file)), { concurrency: 2 })
  .reduce((totalSize, { size }) => totalSize + size, 0);

console.log(folderSize); 

Duplex and transform streams

Class: stream.Duplex

Duplex streams are streams that implement both the Readable and Writable interfaces.

Examples of Duplex streams include TCP sockets, zlib streams, and crypto streams.

duplex.allowHalfOpen

If false, the stream will automatically end the writable side when the readable side ends. Set initially by the allowHalfOpen constructor option, which defaults to true.

This can be changed manually to change the half-open behavior of an existing Duplex stream instance, but it must be changed before the 'end' event is emitted.
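
The effect of the option can be observed with a small experiment. The sketch below is illustrative only: writableEndedAfterReadEnd() is a hypothetical helper that builds a trivial Duplex whose readable side ends immediately, then reports whether the writable side was ended shortly afterwards.

```javascript
const { Duplex } = require('node:stream');

// Resolves with duplex.writableEnded shortly after the readable side ends.
function writableEndedAfterReadEnd(allowHalfOpen) {
  return new Promise((resolve) => {
    const duplex = new Duplex({
      allowHalfOpen,
      read() { this.push(null); }, // the readable side ends immediately
      write(chunk, encoding, callback) { callback(); },
    });
    duplex.on('end', () => {
      // Give the stream a chance to end the writable side automatically.
      setImmediate(() => resolve(duplex.writableEnded));
    });
    duplex.resume(); // start flowing so that 'end' is emitted
  });
}

const halfOpenResults = Promise.all([
  writableEndedAfterReadEnd(true),
  writableEndedAfterReadEnd(false),
]);
halfOpenResults.then(([halfOpen, notHalfOpen]) => {
  console.log(halfOpen); // false: the writable side stays open
  console.log(notHalfOpen); // true: the writable side was ended automatically
});
```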

Class: stream.Transform

Transform streams are Duplex streams where the output is in some way related to the input. Like all Duplex streams, Transform streams implement both the Readable and Writable interfaces.

Examples of Transform streams include zlib streams and crypto streams.

transform.destroy([error])

Destroy the stream, and optionally emit an 'error' event. After this call, the transform stream releases any internal resources. Implementors should not override this method, but instead implement readable._destroy(). The default implementation of _destroy() for Transform also emits 'close' unless emitClose is set to false.

Once destroy() has been called, any further calls will be a no-op and no further errors except from _destroy() may be emitted as 'error'.

stream.duplexPair([options])

  • options <Object> a value to pass to both Duplex constructors, to set options such as buffering.
  • Returns: <Array> of two Duplex instances.

The utility function duplexPair returns an Array with two items, each being a Duplex stream connected to the other side:

const [ sideA, sideB ] = duplexPair(); 

Whatever is written to one stream is made readable on the other. It provides behavior analogous to a network connection, where the data written by the client becomes readable by the server, and vice versa.

The Duplex streams are symmetrical; one or the other may be used without any difference in behavior.

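stream.finished(stream[, options], callback)

  • stream <Stream> | <ReadableStream> | <WritableStream> a readable and/or writable stream or web stream.
  • options <Object>
    • error <boolean> if set to false, a call to emit('error', err) is not treated as finished. Default: true.
    • readable <boolean> when set to false, the callback is called when the stream ends even though the stream might still be readable. Default: true.
    • writable <boolean> when set to false, the callback is called when the stream ends even though the stream might still be writable. Default: true.
    • signal <AbortSignal> allows aborting the wait for the stream to finish. The underlying stream is not aborted if the signal is aborted. The callback is called with an AbortError. All registered listeners added by this function are also removed.
  • callback <Function> a callback function that takes an optional error argument.
  • Returns: <Function> a cleanup function which removes all registered listeners.

A function to get notified when a stream is no longer readable or writable, or has experienced an error or a premature close event.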

const { finished } = require('node:stream');
const fs = require('node:fs');

const rs = fs.createReadStream('archive.tar');

finished(rs, (err) => {
  if (err) {
    console.error('Stream failed.', err);
  } else {
    console.log('Stream is done reading.');
  }
});

rs.resume(); // Drain the stream. 

This is especially useful in error handling scenarios where a stream is destroyed prematurely (like an aborted HTTP request) and will not emit 'end' or 'finish'.

The finished API also provides a promise version.

stream.finished() leaves dangling event listeners (in particular 'error', 'end', 'finish' and 'close') after callback has been invoked. The reason for this is so that unexpected 'error' events (due to incorrect stream implementations) do not cause unexpected crashes. If this behavior is unwanted, the returned cleanup function needs to be invoked in the callback:

const cleanup = finished(rs, (err) => {
  cleanup();
  // ...
}); 
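
For the promise version mentioned above, a minimal sketch could look like the following. It assumes the finished() export of node:stream/promises; the drain() helper is a hypothetical name used only here.

```javascript
const { Readable } = require('node:stream');
const { finished } = require('node:stream/promises');

// Hypothetical helper: consumes a readable stream and counts its chunks.
async function drain(readable) {
  let chunks = 0;
  readable.on('data', () => { chunks += 1; });
  await finished(readable); // rejects if the stream errors or closes early
  return chunks;
}

const drained = drain(Readable.from(['a', 'b', 'c']));
drained.then((count) => console.log(count)); // 3
```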

stream.pipeline(source[, ...transforms], destination, callback)

stream.pipeline(streams, callback)

A module method to pipe between streams and generators, forwarding errors, cleaning up properly, and providing a callback when the pipeline is complete.

const { pipeline } = require('node:stream');
const fs = require('node:fs');
const zlib = require('node:zlib');

// Use the pipeline API to easily pipe a series of streams
// together and get notified when the pipeline is fully done.

// A pipeline to gzip a potentially huge tar file efficiently:

pipeline(
  fs.createReadStream('archive.tar'),
  zlib.createGzip(),
  fs.createWriteStream('archive.tar.gz'),
  (err) => {
    if (err) {
      console.error('Pipeline failed.', err);
    } else {
      console.log('Pipeline succeeded.');
    }
  },
); 

The pipeline API also provides a promise version.
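
As a hedged illustration of the promise version combined with generators, the sketch below pipes an in-memory source through an async generator and into an async function. The run() helper and the sample data are assumptions made for this example.

```javascript
const { Readable } = require('node:stream');
const { pipeline } = require('node:stream/promises');

// Hypothetical helper: upper-cases every chunk and collects the results.
async function run() {
  const output = [];
  await pipeline(
    Readable.from(['hello', 'streams']),
    // Transform step written as an async generator.
    async function* (source) {
      for await (const chunk of source) {
        yield String(chunk).toUpperCase();
      }
    },
    // Final step written as an async function that consumes its source.
    async function (source) {
      for await (const chunk of source) {
        output.push(chunk);
      }
    },
  );
  return output;
}

const pipelineResult = run();
pipelineResult.then((output) => console.log(output)); // [ 'HELLO', 'STREAMS' ]
```

If any step throws, the promise returned by pipeline() rejects, so a single try/catch around the await is enough to handle errors from every stage.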

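stream.pipeline() will call stream.destroy(err) on all streams except:

  • Readable streams which have emitted 'end' or 'close'.
  • Writable streams which have emitted 'finish' or 'close'.

stream.pipeline() leaves dangling event listeners on the streams after the callback has been invoked. If streams are reused after a failure, this can cause event listener leaks and swallowed errors. If the last stream is readable, dangling event listeners are removed so that the last stream can be consumed later.

stream.pipeline() closes all the streams when an error is raised. Using IncomingRequest with pipeline could lead to unexpected behavior, because it destroys the socket without sending the expected response. See the example below: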

const fs = require('node:fs');
const http = require('node:http');
const { pipeline } = require('node:stream');

const server = http.createServer((req, res) => {
  const fileStream = fs.createReadStream('./fileNotExist.txt');
  pipeline(fileStream, res, (err) => {
    if (err) {
      console.log(err); // No such file
      // this message can't be sent once `pipeline` already destroyed the socket
      return res.end('error!!!');
    }
  });
}); 

stream.compose(...streams)

Stability: 1 - stream.compose is experimental.

Combines two or more streams into a Duplex stream that writes to the first stream and reads from the last. Each provided stream is piped into the next, using stream.pipeline. If any of the streams error, then all are destroyed, including the outer Duplex stream.

Because stream.compose returns a new stream that in turn can (and should) be piped into other streams, it enables composition. In contrast, when passing streams to stream.pipeline, typically the first stream is a readable stream and the last a writable stream, forming a closed circuit.

If passed a Function, it must be a factory method taking a source Iterable.

import { compose, Transform } from 'node:stream';

const removeSpaces = new Transform({
  transform(chunk, encoding, callback) {
    // replaceAll() removes every space in the chunk, not only the first one.
    callback(null, String(chunk).replaceAll(' ', ''));
  },
});

async function* toUpper(source) {
  for await (const chunk of source) {
    yield String(chunk).toUpperCase();
  }
}

let res = '';
for await (const buf of compose(removeSpaces, toUpper).end('hello world')) {
  res += buf;
}

console.log(res); // prints 'HELLOWORLD' 

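stream.compose can be used to convert async iterables, generators and functions into streams.

  • AsyncIterable converts into a readable Duplex. Cannot yield null.
  • AsyncGeneratorFunction converts into a readable/writable transform Duplex. Must take a source AsyncIterable as first parameter. Cannot yield null.
  • AsyncFunction converts into a writable Duplex. Must return either null or undefined.
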
import { compose } from 'node:stream';
import { finished } from 'node:stream/promises';

// Convert AsyncIterable into readable Duplex.
const s1 = compose(async function*() {
  yield 'Hello';
  yield 'World';
}());

// Convert AsyncGenerator into transform Duplex.
const s2 = compose(async function*(source) {
  for await (const chunk of source) {
    yield String(chunk).toUpperCase();
  }
});

let res = '';

// Convert AsyncFunction into writable Duplex.
const s3 = compose(async function(source) {
  for await (const chunk of source) {
    res += chunk;
  }
});

await finished(compose(s1, s2, s3));

console.log(res); // prints 'HELLOWORLD' 

See readable.compose(stream) for stream.compose as an operator.

stream.isErrored(stream)

Returns whether the stream has encountered an error.

stream.isReadable(stream)

Returns whether the stream is readable.

stream.isWritable(stream)

Returns whether the stream is writable.

stream.Readable.from(iterable[, options])

  • iterable <Iterable> an object implementing the Symbol.asyncIterator or Symbol.iterator iterable protocol. Emits an 'error' event if a null value is passed.
  • options <Object> options provided to new stream.Readable([options]). By default, Readable.from() sets options.objectMode to true, unless this is explicitly opted out of by setting options.objectMode to false.
  • Returns: <stream.Readable>

A utility method for creating readable streams out of iterators.

const { Readable } = require('node:stream');

async function * generate() {
  yield 'hello';
  yield 'streams';
}

const readable = Readable.from(generate());

readable.on('data', (chunk) => {
  console.log(chunk);
}); 

For performance reasons, calling Readable.from(string) or Readable.from(buffer) does not iterate the string or buffer the way other iterables are iterated; the whole value is emitted instead.
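
The difference between passing a string and passing an array can be sketched as follows. The collect() helper is a hypothetical name introduced for this example.

```javascript
const { Readable } = require('node:stream');

// Hypothetical helper: gathers every chunk of a readable into an array.
async function collect(readable) {
  const chunks = [];
  for await (const chunk of readable) chunks.push(chunk);
  return chunks;
}

const fromString = collect(Readable.from('hello'));
const fromArray = collect(Readable.from(['h', 'i']));

fromString.then((chunks) => console.log(chunks)); // [ 'hello' ]
fromArray.then((chunks) => console.log(chunks)); // [ 'h', 'i' ]
```

The string arrives as one chunk, whereas each array element becomes its own chunk.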

If an Iterable object containing promises is passed as an argument, it might result in an unhandled rejection.

const { Readable } = require('node:stream');

Readable.from([
  // Pass the settle function and its argument to setTimeout so the timer applies.
  new Promise((resolve) => setTimeout(resolve, 1500, '1')),
  new Promise((_, reject) => setTimeout(reject, 1000, new Error('2'))), // Unhandled rejection
]); 

stream.Readable.fromWeb(readableStream[, options])

stream.Readable.isDisturbed(stream)

Returns whether the stream has been read from or cancelled.

stream.Readable.toWeb(streamReadable[, options])

  • streamReadable <stream.Readable>
  • options <Object>
    • strategy <Object>
      • highWaterMark <number> the maximum internal queue size (of the created ReadableStream) before backpressure is applied when reading from the given stream.Readable. If no value is provided, it is taken from the given stream.Readable.
      • size <Function> a function that returns the size of the given chunk of data. If no value is provided, the size is 1 for all chunks.
  • Returns: <ReadableStream>
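
A minimal sketch of the conversion, assuming an in-memory source, might look like this. The readAll() helper is hypothetical, and the optional strategy argument is left at its defaults.

```javascript
const { Readable } = require('node:stream');

// Hypothetical helper: converts a Node.js readable to a web ReadableStream
// and reads it to completion with a default reader.
async function readAll(nodeReadable) {
  const reader = Readable.toWeb(nodeReadable).getReader();
  const chunks = [];
  for (;;) {
    const { value, done } = await reader.read();
    if (done) break;
    chunks.push(value);
  }
  return chunks;
}

const webChunks = readAll(Readable.from(['hello', 'world']));
webChunks.then((chunks) => console.log(chunks)); // [ 'hello', 'world' ]
```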

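stream.Writable.fromWeb(writableStream[, options])

stream.Writable.toWeb(streamWritable)

stream.Duplex.from(src)

A utility method for creating duplex streams.

  • Stream converts a writable stream into a writable Duplex and a readable stream into a readable Duplex.
  • Blob converts into a readable Duplex.
  • string converts into a readable Duplex.
  • ArrayBuffer converts into a readable Duplex.
  • AsyncIterable converts into a readable Duplex. Cannot yield null.
  • AsyncGeneratorFunction converts into a readable/writable transform Duplex. Must take a source AsyncIterable as first parameter. Cannot yield null.
  • AsyncFunction converts into a writable Duplex. Must return either null or undefined.
  • Object ({ writable, readable }) converts readable and writable into Stream and then combines them into a Duplex that writes to the writable and reads from the readable.
  • Promise converts into a readable Duplex. The value null is ignored.
  • ReadableStream converts into a readable Duplex.
  • WritableStream converts into a writable Duplex.
  • Returns: <stream.Duplex>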
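
One of the conversions listed above, the AsyncGeneratorFunction case, can be sketched as follows. The shout() helper and the sample words are assumptions made for illustration.

```javascript
const { Duplex, Readable } = require('node:stream');
const { pipeline } = require('node:stream/promises');

// Hypothetical helper: upper-cases words through a Duplex built from a generator.
async function shout(words) {
  // AsyncGeneratorFunction -> readable/writable transform Duplex.
  const upper = Duplex.from(async function* (source) {
    for await (const chunk of source) {
      yield String(chunk).toUpperCase();
    }
  });

  const output = [];
  await pipeline(Readable.from(words), upper, async (source) => {
    for await (const chunk of source) output.push(chunk);
  });
  return output;
}

const shouted = shout(['hello', 'world']);
shouted.then((output) => console.log(output)); // [ 'HELLO', 'WORLD' ]
```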

If an Iterable object containing promises is passed as an argument, it might result in an unhandled rejection.

const { Duplex } = require('node:stream');

Duplex.from([
  // Pass the settle function and its argument to setTimeout so the timer applies.
  new Promise((resolve) => setTimeout(resolve, 1500, '1')),
  new Promise((_, reject) => setTimeout(reject, 1000, new Error('2'))), // Unhandled rejection
]); 

stream.Duplex.fromWeb(pair[, options])

import { Duplex } from 'node:stream';
import {
  ReadableStream,
  WritableStream,
} from 'node:stream/web';

const readable = new ReadableStream({
  start(controller) {
    controller.enqueue('world');
  },
});

const writable = new WritableStream({
  write(chunk) {
    console.log('writable', chunk);
  },
});

const pair = {
  readable,
  writable,
};
const duplex = Duplex.fromWeb(pair, { encoding: 'utf8', objectMode: true });

duplex.write('hello');

for await (const chunk of duplex) {
  console.log('readable', chunk);
}

// The same example using CommonJS:
const { Duplex } = require('node:stream');
const {
  ReadableStream,
  WritableStream,
} = require('node:stream/web');

const readable = new ReadableStream({
  start(controller) {
    controller.enqueue('world');
  },
});

const writable = new WritableStream({
  write(chunk) {
    console.log('writable', chunk);
  },
});

const pair = {
  readable,
  writable,
};
const duplex = Duplex.fromWeb(pair, { encoding: 'utf8', objectMode: true });

duplex.write('hello');
duplex.once('readable', () => console.log('readable', duplex.read()));

stream.Duplex.toWeb(streamDuplex)

import { Duplex } from 'node:stream';

const duplex = Duplex({
  objectMode: true,
  read() {
    this.push('world');
    this.push(null);
  },
  write(chunk, encoding, callback) {
    console.log('writable', chunk);
    callback();
  },
});

const { readable, writable } = Duplex.toWeb(duplex);
writable.getWriter().write('hello');

const { value } = await readable.getReader().read();
console.log('readable', value);

// The same example using CommonJS:
const { Duplex } = require('node:stream');

const duplex = Duplex({
  objectMode: true,
  read() {
    this.push('world');
    this.push(null);
  },
  write(chunk, encoding, callback) {
    console.log('writable', chunk);
    callback();
  },
});

const { readable, writable } = Duplex.toWeb(duplex);
writable.getWriter().write('hello');

readable.getReader().read().then((result) => {
  console.log('readable', result.value);
});

stream.addAbortSignal(signal, stream)

Attaches an AbortSignal to a readable or writable stream. This lets code control stream destruction using an AbortController.

Calling abort on the AbortController corresponding to the passed AbortSignal behaves the same way as calling .destroy(new AbortError()) on the stream, and controller.error(new AbortError()) for web streams.

const fs = require('node:fs');
const { addAbortSignal } = require('node:stream');

const controller = new AbortController();
const read = addAbortSignal(
  controller.signal,
  fs.createReadStream('object.json'),
);
// Later, abort the operation closing the stream
controller.abort(); 

Or using an AbortSignal with a readable stream as an async iterable:

const controller = new AbortController();
setTimeout(() => controller.abort(), 10_000); // set a timeout
const stream = addAbortSignal(
  controller.signal,
  fs.createReadStream('object.json'),
);
(async () => {
  try {
    for await (const chunk of stream) {
      await process(chunk);
    }
  } catch (e) {
    if (e.name === 'AbortError') {
      // The operation was cancelled
    } else {
      throw e;
    }
  }
})(); 

Or using an AbortSignal with a ReadableStream:

const controller = new AbortController();
const rs = new ReadableStream({
  start(controller) {
    controller.enqueue('hello');
    controller.enqueue('world');
    controller.close();
  },
});

addAbortSignal(controller.signal, rs);

finished(rs, (err) => {
  if (err) {
    if (err.name === 'AbortError') {
      // The operation was cancelled
    }
  }
});

const reader = rs.getReader();

reader.read().then(({ value, done }) => {
  console.log(value); // hello
  console.log(done); // false
  controller.abort();
}); 

stream.getDefaultHighWaterMark(objectMode)

Returns the default highWaterMark used by streams. Defaults to 65536 (64 KiB), or 16 for objectMode.

stream.setDefaultHighWaterMark(objectMode, value)

Sets the default highWaterMark used by streams.
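
The two functions can be used together to change the default temporarily. The sketch below is illustrative; the value 32 * 1024 is an arbitrary choice, and the original default is restored at the end.

```javascript
const {
  getDefaultHighWaterMark, setDefaultHighWaterMark, Readable,
} = require('node:stream');

const objectModeDefault = getDefaultHighWaterMark(true);
const previousByteDefault = getDefaultHighWaterMark(false);

// Streams created after this call pick up the new byte-mode default.
setDefaultHighWaterMark(false, 32 * 1024);
const created = new Readable({ read() {} });
console.log(created.readableHighWaterMark); // 32768

// Restore the previous default so that other code is unaffected.
setDefaultHighWaterMark(false, previousByteDefault);
console.log(objectModeDefault); // 16
```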

API for stream implementers

The node:stream module API has been designed to make it possible to easily implement streams using JavaScript's prototypal inheritance model.

First, a stream developer declares a new JavaScript class that extends one of the four basic stream classes (stream.Writable, stream.Readable, stream.Duplex, or stream.Transform), making sure to call the appropriate parent class constructor:

const { Writable } = require('node:stream');

class MyWritable extends Writable {
  constructor({ highWaterMark, ...options }) {
    super({ highWaterMark });
    // ...
  }
} 

When extending streams, keep in mind which options the user can and should provide before forwarding these to the base constructor. For example, if the implementation makes assumptions about the autoDestroy and emitClose options, do not allow the user to override them. Be explicit about which options are forwarded instead of implicitly forwarding all options.

The new stream class must then implement one or more specific methods, depending on the type of stream being created, as detailed in the chart below:

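Use-case                                       Class      Method(s) to implement
Reading only                                   Readable   _read()
Writing only                                   Writable   _write(), _writev(), _final()
Reading and writing                            Duplex     _read(), _write(), _writev(), _final()
Operate on written data, then read the result  Transform  _transform(), _flush(), _final()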
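
As a concrete instance of the Transform row, here is a minimal sketch of a subclass that implements _transform() and _flush(). The UpperCase class and the shoutText() helper are hypothetical names used only for this illustration.

```javascript
const { Transform } = require('node:stream');

// Hypothetical Transform: upper-cases text and appends '!' when flushed.
class UpperCase extends Transform {
  _transform(chunk, encoding, callback) {
    // Pass the transformed chunk to the readable side.
    callback(null, chunk.toString().toUpperCase());
  }

  _flush(callback) {
    // Called once all written data has been transformed.
    callback(null, '!');
  }
}

// Hypothetical helper: runs a string through the transform and collects output.
function shoutText(text) {
  return new Promise((resolve, reject) => {
    const upper = new UpperCase();
    let output = '';
    upper.on('data', (chunk) => { output += chunk; });
    upper.on('end', () => resolve(output));
    upper.on('error', reject);
    upper.end(text);
  });
}

const shoutResult = shoutText('hello');
shoutResult.then((output) => console.log(output)); // HELLO!
```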

The implementation code for a stream should never call the "public" methods of a stream that are intended for use by consumers (as described in the API for stream consumers section). Doing so may lead to adverse side effects in application code consuming the stream.

Avoid overriding public methods such as write(), end(), cork(), uncork(), read() and destroy(), or emitting internal events such as 'error', 'data', 'end', 'finish' and 'close' through .emit(). Doing so can break current and future stream invariants, leading to behavior and/or compatibility issues with other streams, stream utilities, and user expectations.

Simplified construction

For many simple cases, it is possible to create a stream without relying on inheritance. This can be accomplished by directly creating instances of the stream.Writable, stream.Readable, stream.Duplex, or stream.Transform objects and passing appropriate methods as constructor options.

const { Writable } = require('node:stream');

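const myWritable = new Writable({
  construct(callback) {
    // Initialize state and load resources...
    callback(); // Signal that initialization has finished.
  },
  write(chunk, encoding, callback) {
    // ...
    callback(); // Signal that the chunk has been processed.
  },
  destroy(err, callback) {
    // Free resources...
    callback(err);
  },
}); 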

Implementing a writable stream

The stream.Writable class is extended to implement a Writable stream.

Custom Writable streams must call the new stream.Writable([options]) constructor and implement the writable._write() and/or writable._writev() method.

new stream.Writable([options])

const { Writable } = require('node:stream');

class MyWritable extends Writable {
  constructor(options) {
    // Calls the stream.Writable() constructor.
    super(options);
    // ...
  }
} 

Or, when using pre-ES6 style constructors:

const { Writable } = require('node:stream');
const util = require('node:util');

function MyWritable(options) {
  if (!(this instanceof MyWritable))
    return new MyWritable(options);
  Writable.call(this, options);
}
util.inherits(MyWritable, Writable); 

Or, using the simplified constructor approach:

const { Writable } = require('node:stream');

const myWritable = new Writable({
  write(chunk, encoding, callback) {
    // ...
  },
  writev(chunks, callback) {
    // ...
  },
}); 

Calling abort on the AbortController corresponding to the passed AbortSignal behaves the same way as calling .destroy(new AbortError()) on the writable stream.

const { Writable } = require('node:stream');

const controller = new AbortController();
const myWritable = new Writable({
  write(chunk, encoding, callback) {
    // ...
  },
  writev(chunks, callback) {
    // ...
  },
  signal: controller.signal,
});
// Later, abort the operation closing the stream
controller.abort(); 

writable._construct(callback)

  • callback <Function> call this function (optionally with an error argument) when the stream has finished initializing.

The _construct() method MUST NOT be called directly. It may be implemented by child classes, and if so, will be called by the internal Writable class methods only.

This optional function is called in a tick after the stream constructor has returned, delaying any _write(), _final() and _destroy() calls until callback is called. This is useful for initializing state or asynchronously initializing resources before the stream can be used.

const { Writable } = require('node:stream');
const fs = require('node:fs');

class WriteStream extends Writable {
  constructor(filename) {
    super();
    this.filename = filename;
    this.fd = null;
  }
  _construct(callback) {
    fs.open(this.filename, 'w', (err, fd) => {
      if (err) {
        callback(err);
      } else {
        this.fd = fd;
        callback();
      }
    });
  }
  _write(chunk, encoding, callback) {
    fs.write(this.fd, chunk, callback);
  }
  _destroy(err, callback) {
    if (this.fd) {
      fs.close(this.fd, (er) => callback(er || err));
    } else {
      callback(err);
    }
  }
} 
writable._write(chunk, encoding, callback)
  • chunk <Buffer> | <string> | <any> The Buffer to be written, converted from the string passed to stream.write(). If the stream's decodeStrings option is false or the stream is operating in object mode, the chunk will not be converted and will be whatever was passed to stream.write().
  • encoding <string> If the chunk is a string, then encoding is the character encoding of that string. If chunk is a Buffer, or if the stream is operating in object mode, encoding may be ignored.
  • callback <Function> Call this function (optionally with an error argument) when processing is complete for the supplied chunk.

All Writable stream implementations must provide a writable._write() and/or writable._writev() method to send data to the underlying resource.

Transform streams provide their own implementation of writable._write().

This function MUST NOT be called by application code directly. It should be implemented by child classes, and called by the internal Writable class methods only.

The callback function must be called, either synchronously inside writable._write() or asynchronously (that is, on a different tick), to signal that the write either completed successfully or failed with an error. The first argument passed to the callback must be the Error object if the call failed, or null if the write succeeded.

All calls to writable.write() that occur between the time writable._write() is called and the time callback is called will cause the written data to be buffered. When the callback is invoked, the stream might emit a 'drain' event. If a stream implementation is capable of processing multiple chunks of data at once, the writable._writev() method should be implemented.

If the decodeStrings property is explicitly set to false in the constructor options, then chunk will remain the same object that is passed to .write(), and may be a string rather than a Buffer. This is to support implementations that have optimized handling for certain string data encodings. In that case, the encoding argument will indicate the character encoding of the string. Otherwise, the encoding argument can be safely ignored.

The writable._write() method is prefixed with an underscore because it is internal to the class that defines it, and should never be called directly by user programs.
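The buffering and 'drain' behavior described above can be observed with a deliberately slow sink. The following is a minimal sketch, not a definitive implementation: the names slowSink and stored, the use of setImmediate() to simulate an asynchronous resource, and the tiny highWaterMark of 4 bytes are all assumptions made for illustration.

```javascript
const { Writable } = require('node:stream');

const stored = [];
const slowSink = new Writable({
  highWaterMark: 4, // Tiny threshold (in bytes) so buffering is easy to observe.
  write(chunk, encoding, callback) {
    // Complete the write on a later tick, as an asynchronous resource would.
    setImmediate(() => {
      stored.push(chunk.toString());
      callback(); // No argument: the write succeeded.
    });
  },
});

const okFirst = slowSink.write('ab');  // true: 2 bytes queued, below the threshold
const okSecond = slowSink.write('cd'); // false: 4 bytes queued, threshold reached
console.log(okFirst, okSecond);

slowSink.on('drain', () => {
  // Both chunks have been passed to write() and acknowledged by now.
  console.log('drained:', stored);
});
```

The second write() call happens while the first chunk is still being processed, so that chunk is held in the internal buffer until callback is invoked for the first one.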

writable._writev(chunks, callback)
  • chunks <Object[]> The data to be written. The value is an array of <Object> that each represent a discrete chunk of data to write. The properties of these objects are:
    • chunk <Buffer> | <string> A buffer instance or string containing the data to be written. The chunk will be a string if the Writable was created with the decodeStrings option set to false and a string was passed to write().
    • encoding <string> The character encoding of the chunk. If chunk is a Buffer, the encoding will be 'buffer'.
  • callback <Function> A callback function (optionally with an error argument) to be invoked when processing is complete for the supplied chunks.

This function MUST NOT be called by application code directly. It should be implemented by child classes, and called by the internal Writable class methods only.

The writable._writev() method may be implemented in addition to, or as an alternative to, writable._write() in stream implementations that are capable of processing multiple chunks of data at once. If implemented and if there is buffered data from previous writes, _writev() will be called instead of _write().

The writable._writev() method is prefixed with an underscore because it is internal to the class that defines it, and should never be called directly by user programs.
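As an illustration of when _writev() receives several chunks at once, the sketch below uses writable.cork() and writable.uncork() to force writes to be buffered. The names batchingSink and batches are hypothetical, and only writev is supplied here, which is one of the permitted combinations.

```javascript
const { Writable } = require('node:stream');

const batches = [];
const batchingSink = new Writable({
  // Each element of `chunks` has the shape { chunk, encoding }.
  writev(chunks, callback) {
    batches.push(chunks.map(({ chunk }) => chunk.toString()));
    callback();
  },
});

batchingSink.cork();   // Buffer the following writes instead of flushing them.
batchingSink.write('a');
batchingSink.write('b');
batchingSink.write('c');
batchingSink.uncork(); // Flush: the buffered chunks arrive in a single writev() call.

console.log(batches);
```

Handling the buffered chunks in one call is mainly useful when the underlying resource has a vectored or bulk write operation.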

writable._destroy(err, callback)
  • err <Error> A possible error.
  • callback <Function> A callback function that takes an optional error argument.

The _destroy() method is called by writable.destroy(). It can be overridden by child classes but it must not be called directly.

writable._final(callback)
  • callback <Function> Call this function (optionally with an error argument) when finished writing any remaining data.

The _final() method must not be called directly. It may be implemented by child classes, and if so, will be called by the internal Writable class methods only.

This optional function will be called before the stream closes, delaying the 'finish' event until callback is called. This is useful to close resources or write buffered data before a stream ends.
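A minimal sketch of using _final() to write out data that is still buffered when the stream ends. The BatchingSink class, its batchSize argument, and the storeBatch() helper are hypothetical names chosen for this illustration; the stream runs in object mode so that each written value is one item.

```javascript
const { Writable } = require('node:stream');

// Groups written items into fixed-size batches.
class BatchingSink extends Writable {
  constructor(batchSize, options) {
    super({ ...options, objectMode: true });
    this.batchSize = batchSize;
    this.pending = [];
    this.batches = [];
  }
  storeBatch() {
    this.batches.push(this.pending);
    this.pending = [];
  }
  _write(item, encoding, callback) {
    this.pending.push(item);
    if (this.pending.length >= this.batchSize) this.storeBatch();
    callback();
  }
  _final(callback) {
    // Runs after end() is called and before 'finish' is emitted.
    if (this.pending.length > 0) this.storeBatch();
    callback();
  }
}

const sink = new BatchingSink(2);
sink.on('finish', () => console.log(sink.batches)); // [ [ 1, 2 ], [ 3 ] ]
sink.write(1);
sink.write(2);
sink.end(3);
```

Without _final(), the last incomplete batch would never be stored, because no further _write() call arrives to trigger it.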

Errors while writing

Errors occurring during the processing of the writable._write(), writable._writev() and writable._final() methods must be propagated by invoking the callback and passing the error as the first argument. Throwing an Error from within these methods or manually emitting an 'error' event results in undefined behavior.

If a Readable stream pipes into a Writable stream when Writable emits an error, the Readable stream will be unpiped.

const { Writable } = require('node:stream');

const myWritable = new Writable({
  write(chunk, encoding, callback) {
    if (chunk.toString().indexOf('a') >= 0) {
      callback(new Error('chunk is invalid'));
    } else {
      callback();
    }
  },
}); 
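
To show where an error passed to callback ends up, the following sketch attaches an 'error' listener and per-write callbacks to a similar stream. The names strictSink and events are hypothetical, and the relative order in which the entries are recorded is not something this sketch relies on.

```javascript
const { Writable } = require('node:stream');

const strictSink = new Writable({
  write(chunk, encoding, callback) {
    // chunk is a Buffer here because decodeStrings defaults to true.
    if (chunk.includes('a')) {
      callback(new Error('chunk is invalid'));
    } else {
      callback();
    }
  },
});

const events = [];
strictSink.on('error', (err) => events.push(`error event: ${err.message}`));
strictSink.on('close', () => console.log(events));

strictSink.write('xyz', (err) => {
  events.push(`first write: ${err ? err.message : 'ok'}`);
});
strictSink.write('abc', (err) => {
  events.push(`second write: ${err ? err.message : 'ok'}`);
});
```

The error reaches both the callback given to the failing write() call and the stream's 'error' listeners, after which the stream is destroyed.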

An example writable stream

The following illustrates a rather simplistic (and somewhat pointless) custom Writable stream implementation. While this specific Writable stream instance is of no particular practical use, the example illustrates each of the required elements of a custom Writable stream instance:

const { Writable } = require('node:stream');

class MyWritable extends Writable {
  _write(chunk, encoding, callback) {
    if (chunk.toString().indexOf('a') >= 0) {
      callback(new Error('chunk is invalid'));
    } else {
      callback();
    }
  }
} 

Decoding buffers in a writable stream

Decoding buffers is a common task, for instance, when using transformers whose input is a string. This is not a trivial process when using a multi-byte character encoding, such as UTF-8. The following example shows how to decode multi-byte strings using StringDecoder and Writable.

const { Writable } = require('node:stream');
const { StringDecoder } = require('node:string_decoder');

class StringWritable extends Writable {
  constructor(options) {
    super(options);
    this._decoder = new StringDecoder(options?.defaultEncoding);
    this.data = '';
  }
  _write(chunk, encoding, callback) {
    if (encoding === 'buffer') {
      chunk = this._decoder.write(chunk);
    }
    this.data += chunk;
    callback();
  }
  _final(callback) {
    this.data += this._decoder.end();
    callback();
  }
}

const euro = [[0xE2, 0x82], [0xAC]].map(Buffer.from);
const w = new StringWritable();

w.write('currency: ');
w.write(euro[0]);
w.end(euro[1]);

console.log(w.data); // currency: € 

Implementing a readable stream

The stream.Readable class is extended to implement a Readable stream.

Custom Readable streams must call the new stream.Readable([options]) constructor and implement the readable._read() method.

new stream.Readable([options])
  • options <Object>
    • highWaterMark <number> The maximum number of bytes to store in the internal buffer before ceasing to read from the underlying resource. Default: 65536 (64 KiB), or 16 for objectMode streams.
    • encoding <string> If specified, then buffers will be decoded to strings using the specified encoding. Default: null.
    • objectMode <boolean> Whether this stream should behave as a stream of objects, meaning that stream.read(n) returns a single value instead of a Buffer of size n. Default: false.
    • emitClose <boolean> Whether or not the stream should emit 'close' after it has been destroyed. Default: true.
    • read <Function> Implementation for the stream._read() method.
    • destroy <Function> Implementation for the stream._destroy() method.
    • construct <Function> Implementation for the stream._construct() method.
    • autoDestroy <boolean> Whether this stream should automatically call .destroy() on itself after ending. Default: true.
    • signal <AbortSignal> A signal representing possible cancellation.
const { Readable } = require('node:stream');

class MyReadable extends Readable {
  constructor(options) {
    // Calls the stream.Readable(options) constructor.
    super(options);
    // ...
  }
} 

Or, when using pre-ES6 style constructors:

const { Readable } = require('node:stream');
const util = require('node:util');

function MyReadable(options) {
  if (!(this instanceof MyReadable))
    return new MyReadable(options);
  Readable.call(this, options);
}
util.inherits(MyReadable, Readable); 

Or, using the simplified constructor approach:

const { Readable } = require('node:stream');

const myReadable = new Readable({
  read(size) {
    // ...
  },
}); 

Calling abort on the AbortController corresponding to the passed AbortSignal behaves the same way as calling .destroy(new AbortError()) on the created readable stream.

const { Readable } = require('node:stream');
const controller = new AbortController();
const read = new Readable({
  read(size) {
    // ...
  },
  signal: controller.signal,
});
// Later, abort the operation closing the stream
controller.abort(); 
readable._construct(callback)
  • callback <Function> Call this function (optionally with an error argument) when the stream has finished initializing.

The _construct() method MUST NOT be called directly. It may be implemented by child classes, and if so, will be called by the internal Readable class methods only.

This optional function will be scheduled in the next tick by the stream constructor, delaying any _read() and _destroy() calls until callback is called. This is useful to initialize state or asynchronously initialize resources before the stream can be used.

const { Readable } = require('node:stream');
const fs = require('node:fs');

class ReadStream extends Readable {
  constructor(filename) {
    super();
    this.filename = filename;
    this.fd = null;
  }
  _construct(callback) {
    fs.open(this.filename, (err, fd) => {
      if (err) {
        callback(err);
      } else {
        this.fd = fd;
        callback();
      }
    });
  }
  _read(n) {
    const buf = Buffer.alloc(n);
    fs.read(this.fd, buf, 0, n, null, (err, bytesRead) => {
      if (err) {
        this.destroy(err);
      } else {
        this.push(bytesRead > 0 ? buf.subarray(0, bytesRead) : null);
      }
    });
  }
  _destroy(err, callback) {
    if (this.fd) {
      fs.close(this.fd, (er) => callback(er || err));
    } else {
      callback(err);
    }
  }
} 
readable._read(size)
  • size <number> Number of bytes to read asynchronously

This function MUST NOT be called by application code directly. It should be implemented by child classes, and called by the internal Readable class methods only.

All Readable stream implementations must provide an implementation of the readable._read() method to fetch data from the underlying resource.

When readable._read() is called, if data is available from the resource, the implementation should begin pushing that data into the read queue using the this.push(dataChunk) method. _read() will be called again after each call to this.push(dataChunk) once the stream is ready to accept more data. _read() may continue reading from the resource and pushing data until readable.push() returns false. Only when _read() is called again after it has stopped should it resume pushing additional data into the queue.

Once the readable._read() method has been called, it will not be called again until more data is pushed through the readable.push() method. Empty data such as empty buffers and strings will not cause readable._read() to be called.

The size argument is advisory. Implementations where a "read" is a single operation that returns data can use the size argument to determine how much data to fetch. Other implementations may ignore this argument and simply provide data whenever it becomes available. There is no need to "wait" until size bytes are available before calling stream.push(chunk).

The readable._read() method is prefixed with an underscore because it is internal to the class that defines it, and should never be called directly by user programs.
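The push-until-false contract described above can be sketched with an in-memory source. The ArraySource class and its items argument are hypothetical and exist only for this illustration; the advisory size argument is ignored.

```javascript
const { Readable } = require('node:stream');

// Serves the strings of an array, one chunk per item.
class ArraySource extends Readable {
  constructor(items, options) {
    super(options);
    this.items = items.slice();
  }
  _read(size) {
    // Keep pushing until the read queue is full or the data runs out.
    while (this.items.length > 0) {
      if (!this.push(this.items.shift())) {
        return; // Stop here; _read() is called again when there is room.
      }
    }
    this.push(null); // No more data: signal the end of the stream.
  }
}

const source = new ArraySource(['a', 'b', 'c']);
const received = [];
source.setEncoding('utf8');
source.on('data', (chunk) => received.push(chunk));
source.on('end', () => console.log(received.join(''))); // abc
```

Returning as soon as push() reports false is what lets a slow consumer limit how much data the stream reads ahead.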

readable._destroy(err, callback)
  • err <Error> A possible error.
  • callback <Function> A callback function that takes an optional error argument.

The _destroy() method is called by readable.destroy(). It can be overridden by child classes but it must not be called directly.

readable.push(chunk[, encoding])

When chunk is a <Buffer>, <TypedArray>, <DataView> or <string>, the chunk of data will be added to the internal queue for users of the stream to consume. Passing chunk as null signals the end of the stream (EOF), after which no more data can be written.

When the Readable is operating in paused mode, the data added with readable.push() can be read out by calling the readable.read() method when the 'readable' event is emitted.

When the Readable is operating in flowing mode, the data added with readable.push() will be delivered by emitting a 'data' event.

The readable.push() method is designed to be as flexible as possible. For example, when wrapping a lower-level source that provides some form of pause/resume mechanism and a data callback, the low-level source can be wrapped by the custom Readable instance:

const { Readable } = require('node:stream');

// `_source` is an object with readStop() and readStart() methods,
// and an `ondata` member that gets called when it has data, and
// an `onend` member that gets called when the data is over.

class SourceWrapper extends Readable {
  constructor(options) {
    super(options);

    this._source = getLowLevelSourceObject();

    // Every time there's data, push it into the internal buffer.
    this._source.ondata = (chunk) => {
      // If push() returns false, then stop reading from source.
      if (!this.push(chunk))
        this._source.readStop();
    };

    // When the source ends, push the EOF-signaling `null` chunk.
    this._source.onend = () => {
      this.push(null);
    };
  }
  // _read() will be called when the stream wants to pull more data in.
  // The advisory size argument is ignored in this case.
  _read(size) {
    this._source.readStart();
  }
} 

The readable.push() method is used to push the content into the internal buffer. It can be driven by the readable._read() method.

For streams not operating in object mode, if the chunk parameter of readable.push() is undefined, it will be treated as an empty string or buffer. See readable.push('') for more information.

Errors while reading

Errors occurring during processing of readable._read() must be propagated through the readable.destroy(err) method. Throwing an Error from within readable._read() or manually emitting an 'error' event results in undefined behavior.

const { Readable } = require('node:stream');

const myReadable = new Readable({
  read(size) {
    const err = checkSomeErrorCondition();
    if (err) {
      this.destroy(err);
    } else {
      // Do some work.
    }
  },
}); 

An example counting stream

The following is a basic example of a Readable stream that emits the numerals from 1 to 1,000,000 in ascending order, and then ends.

const { Readable } = require('node:stream');

class Counter extends Readable {
  constructor(opt) {
    super(opt);
    this._max = 1000000;
    this._index = 1;
  }

  _read() {
    const i = this._index++;
    if (i > this._max)
      this.push(null);
    else {
      const str = String(i);
      const buf = Buffer.from(str, 'ascii');
      this.push(buf);
    }
  }
} 

Implementing a duplex stream

A Duplex stream is one that implements both Readable and Writable, such as a TCP socket connection.

Because JavaScript does not have support for multiple inheritance, the stream.Duplex class is extended to implement a Duplex stream (as opposed to extending the stream.Readable and stream.Writable classes).

The stream.Duplex class prototypically inherits from stream.Readable and parasitically from stream.Writable, but instanceof will work properly for both base classes due to overriding Symbol.hasInstance on stream.Writable.

Custom Duplex streams must call the new stream.Duplex([options]) constructor and implement both the readable._read() and writable._write() methods.

new stream.Duplex(options)
  • options <Object> Passed to both Writable and Readable constructors. Also has the following fields:
    • allowHalfOpen <boolean> If set to false, then the stream will automatically end the writable side when the readable side ends. Default: true.
    • readable <boolean> Sets whether the Duplex should be readable. Default: true.
    • writable <boolean> Sets whether the Duplex should be writable. Default: true.
    • readableObjectMode <boolean> Sets objectMode for the readable side of the stream. Has no effect if objectMode is true. Default: false.
    • writableObjectMode <boolean> Sets objectMode for the writable side of the stream. Has no effect if objectMode is true. Default: false.
    • readableHighWaterMark <number> Sets highWaterMark for the readable side of the stream. Has no effect if highWaterMark is provided.
    • writableHighWaterMark <number> Sets highWaterMark for the writable side of the stream. Has no effect if highWaterMark is provided.
const { Duplex } = require('node:stream');

class MyDuplex extends Duplex {
  constructor(options) {
    super(options);
    // ...
  }
} 

Or, when using pre-ES6 style constructors:

const { Duplex } = require('node:stream');
const util = require('node:util');

function MyDuplex(options) {
  if (!(this instanceof MyDuplex))
    return new MyDuplex(options);
  Duplex.call(this, options);
}
util.inherits(MyDuplex, Duplex); 

Or, using the simplified constructor approach:

const { Duplex } = require('node:stream');

const myDuplex = new Duplex({
  read(size) {
    // ...
  },
  write(chunk, encoding, callback) {
    // ...
  },
}); 

When using pipeline:

const { Transform, pipeline } = require('node:stream');
const fs = require('node:fs');

pipeline(
  fs.createReadStream('object.json')
    .setEncoding('utf8'),
  new Transform({
    decodeStrings: false, // Accept string input rather than Buffers
    construct(callback) {
      this.data = '';
      callback();
    },
    transform(chunk, encoding, callback) {
      this.data += chunk;
      callback();
    },
    flush(callback) {
      try {
        // Make sure is valid json.
        JSON.parse(this.data);
        this.push(this.data);
        callback();
      } catch (err) {
        callback(err);
      }
    },
  }),
  fs.createWriteStream('valid-object.json'),
  (err) => {
    if (err) {
      console.error('failed', err);
    } else {
      console.log('completed');
    }
  },
); 

An example duplex stream

The following illustrates a simple example of a Duplex stream that wraps a hypothetical lower-level source object to which data can be written, and from which data can be read, albeit using an API that is not compatible with Node.js streams. The stream buffers incoming written data via the Writable interface, and that data is read back out via the Readable interface.

const { Duplex } = require('node:stream');
const kSource = Symbol('source');

class MyDuplex extends Duplex {
  constructor(source, options) {
    super(options);
    this[kSource] = source;
  }

  _write(chunk, encoding, callback) {
    // The underlying source only deals with strings.
    if (Buffer.isBuffer(chunk))
      chunk = chunk.toString();
    this[kSource].writeSomeData(chunk);
    callback();
  }

  _read(size) {
    this[kSource].fetchSomeData(size, (data, encoding) => {
      this.push(Buffer.from(data, encoding));
    });
  }
} 

The most important aspect of a Duplex stream is that the Readable and Writable sides operate independently of one another despite co-existing within a single object instance.
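The independence of the two sides can be seen in a small sketch where the data read from the stream has nothing to do with the data written to it. The names twoSided and written, and the literal strings, are assumptions made for this illustration.

```javascript
const { Duplex } = require('node:stream');

const written = [];
const twoSided = new Duplex({
  // Readable side: produces its own data, unrelated to what is written.
  read() {
    this.push('from the readable side');
    this.push(null);
  },
  // Writable side: records whatever is written.
  write(chunk, encoding, callback) {
    written.push(chunk.toString());
    callback();
  },
});

twoSided.setEncoding('utf8');
twoSided.on('data', (chunk) => console.log('read:', chunk));
twoSided.on('finish', () => console.log('written:', written));
twoSided.end('to the writable side');
```

Ending the Writable side with end() does not end the Readable side; each side finishes on its own ('finish' for writing, 'end' for reading).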

Object mode duplex streams

For Duplex streams, objectMode can be set exclusively for either the Readable or Writable side using the readableObjectMode and writableObjectMode options respectively.

In the following example, for instance, a new Transform stream (which is a type of Duplex stream) is created that has an object mode Writable side that accepts JavaScript numbers that are converted to hexadecimal strings on the Readable side.

const { Transform } = require('node:stream');

// All Transform streams are also Duplex Streams.
const myTransform = new Transform({
  writableObjectMode: true,

  transform(chunk, encoding, callback) {
    // Coerce the chunk to a number if necessary.
    chunk |= 0;

    // Transform the chunk into something else.
    const data = chunk.toString(16);

    // Push the data onto the readable queue.
    callback(null, '0'.repeat(data.length % 2) + data);
  },
});

myTransform.setEncoding('ascii');
myTransform.on('data', (chunk) => console.log(chunk));

myTransform.write(1);
// Prints: 01
myTransform.write(10);
// Prints: 0a
myTransform.write(100);
// Prints: 64 

Implementing a transform stream

A Transform stream is a Duplex stream where the output is computed in some way from the input. Examples include zlib streams or crypto streams that compress, encrypt, or decrypt data.

There is no requirement that the output be the same size as the input, the same number of chunks, or arrive at the same time. For example, a Hash stream will only ever have a single chunk of output which is provided when the input is ended. A zlib stream will produce output that is either much smaller or much larger than its input.

The stream.Transform class is extended to implement a Transform stream.

The stream.Transform class prototypically inherits from stream.Duplex and implements its own versions of the writable._write() and readable._read() methods. Custom Transform implementations must implement the transform._transform() method and may also implement the transform._flush() method.

Care must be taken when using Transform streams in that data written to the stream can cause the Writable side of the stream to become paused if the output on the Readable side is not consumed.
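The pausing behavior can be demonstrated with a sketch in which nothing reads from the Transform at first. The name upper, the 16-byte thresholds, and the chunk contents are assumptions for illustration; the exact number of writes accepted before backpressure depends on the thresholds and chunk sizes, so it is printed rather than assumed.

```javascript
const { Transform } = require('node:stream');

const upper = new Transform({
  // Small thresholds (in bytes) so that both queues fill up quickly.
  readableHighWaterMark: 16,
  writableHighWaterMark: 16,
  transform(chunk, encoding, callback) {
    callback(null, chunk.toString().toUpperCase());
  },
});

// Nothing consumes the output yet, so transformed data piles up on the
// Readable side and write() eventually reports backpressure.
let writes = 1;
while (upper.write('abcd')) writes++;
console.log(`write() returned false after ${writes} calls`);

upper.once('drain', () => console.log('drained once the output was consumed'));
upper.resume(); // Start consuming (and discarding) the output.
```

In practice this means a Transform should always have a consumer attached, for example through stream.pipeline(), before large amounts of data are written to it.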

new stream.Transform([options])
const { Transform } = require('node:stream');

class MyTransform extends Transform {
  constructor(options) {
    super(options);
    // ...
  }
} 

Or, when using pre-ES6 style constructors:

const { Transform } = require('node:stream');
const util = require('node:util');

function MyTransform(options) {
  if (!(this instanceof MyTransform))
    return new MyTransform(options);
  Transform.call(this, options);
}
util.inherits(MyTransform, Transform); 

Or, using the simplified constructor approach:

const { Transform } = require('node:stream');

const myTransform = new Transform({
  transform(chunk, encoding, callback) {
    // ...
  },
}); 

Event: 'end'

The 'end' event is from the stream.Readable class. The 'end' event is emitted after all data has been output, which occurs after the callback in transform._flush() has been called. In the case of an error, 'end' should not be emitted.

Event: 'finish'

The 'finish' event is from the stream.Writable class. The 'finish' event is emitted after stream.end() is called and all chunks have been processed by stream._transform(). In the case of an error, 'finish' should not be emitted.

transform._flush(callback)
  • callback <Function> A callback function (optionally with an error argument and data) to be called when remaining data has been flushed.

This function MUST NOT be called by application code directly. It should be implemented by child classes, and called by the internal Readable class methods only.

In some cases, a transform operation may need to emit an additional bit of data at the end of the stream. For example, a zlib compression stream will store an amount of internal state used to optimally compress the output. When the stream ends, however, that additional data needs to be flushed so that the compressed data will be complete.

Custom Transform implementations may implement the transform._flush() method. This will be called when there is no more written data to be consumed, but before the 'end' event is emitted signaling the end of the Readable stream.

Within the transform._flush() implementation, the transform.push() method may be called zero or more times, as appropriate. The callback function must be called when the flush operation is complete.

The transform._flush() method is prefixed with an underscore because it is internal to the class that defines it, and should never be called directly by user programs.
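A minimal sketch of a Transform that needs _flush(): it splits text into lines and must emit the final, unterminated line once the input ends. The LineSplitter class and its tail property are hypothetical names; the sketch also assumes input where no multi-byte character is split across chunks, since it calls chunk.toString() on each chunk separately.

```javascript
const { Transform } = require('node:stream');

class LineSplitter extends Transform {
  constructor(options) {
    // Emit each line as one object-mode chunk on the Readable side.
    super({ ...options, readableObjectMode: true });
    this.tail = '';
  }
  _transform(chunk, encoding, callback) {
    const parts = (this.tail + chunk.toString()).split('\n');
    this.tail = parts.pop(); // Possibly incomplete last line.
    for (const line of parts) this.push(line);
    callback();
  }
  _flush(callback) {
    // No more input will arrive: emit whatever is left over.
    if (this.tail !== '') this.push(this.tail);
    callback();
  }
}

const splitter = new LineSplitter();
const lines = [];
splitter.on('data', (line) => lines.push(line));
splitter.on('end', () => console.log(lines)); // [ 'alpha', 'beta', 'gamma' ]
splitter.write('alpha\nbe');
splitter.end('ta\ngamma');
```

Because 'end' is only emitted after the _flush() callback runs, consumers see the trailing line before the stream ends.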

transform._transform(chunk, encoding, callback)
  • chunk <Buffer> | <string> | <any> The Buffer to be transformed, converted from the string passed to stream.write(). If the stream's decodeStrings option is false or the stream is operating in object mode, the chunk will not be converted and will be whatever was passed to stream.write().
  • encoding <string> If the chunk is a string, then this is the encoding type. If chunk is a buffer, then this is the special value 'buffer'. Ignore it in that case.
  • callback <Function> A callback function (optionally with an error argument and data) to be called after the supplied chunk has been processed.

This function MUST NOT be called by application code directly. It should be implemented by child classes, and called by the internal Readable class methods only.

All Transform stream implementations must provide a _transform() method to accept input and produce output. The transform._transform() implementation handles the bytes being written, computes an output, then passes that output off to the readable portion using the transform.push() method.

The transform.push() method may be called zero or more times to generate output from a single input chunk, depending on how much is to be output as a result of the chunk.

It is possible that no output is generated from any given chunk of input data.

The callback function must be called only when the current chunk is completely consumed. The first argument passed to the callback must be an Error object if an error occurred while processing the input or null otherwise. If a second argument is passed to the callback, it will be forwarded on to the transform.push() method, but only if the first argument is falsy. In other words, the following are equivalent:

transform.prototype._transform = function(data, encoding, callback) {
  this.push(data);
  callback();
};

transform.prototype._transform = function(data, encoding, callback) {
  callback(null, data);
}; 

The transform._transform() method is prefixed with an underscore because it is internal to the class that defines it, and should never be called directly by user programs.

transform._transform() is never called in parallel; streams implement a queue mechanism, and to receive the next chunk, callback must be called, either synchronously or asynchronously.

Class: stream.PassThrough

The stream.PassThrough class is a trivial implementation of a Transform stream that simply passes the input bytes across to the output. Its purpose is primarily for examples and testing, but there are some use cases where stream.PassThrough is useful as a building block for novel sorts of streams.
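As a small illustration of the "building block" role, the sketch below uses a PassThrough as a tap that counts bytes while leaving the data unchanged. The names tap and byteCount are hypothetical.

```javascript
const { PassThrough } = require('node:stream');

const tap = new PassThrough();
let byteCount = 0;

// The chunks come out exactly as they went in; only their size is observed.
tap.on('data', (chunk) => { byteCount += chunk.length; });
tap.on('end', () => console.log(`${byteCount} bytes passed through`)); // 11 bytes

tap.write('hello ');
tap.end('world');
```

The same tap could be placed between two other streams to observe traffic without altering it.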

补充注意#>

【Additional notes】

流与异步生成器和异步迭代器的兼容性#>

【Streams compatibility with async generators and async iterators】

随着 JavaScript 中对异步生成器和迭代器的支持,异步生成器现在实际上已经成为一种一流的语言级流构造。

【With the support of async generators and iterators in JavaScript, async generators are effectively a first-class language-level stream construct at this point.】

Some common interop cases of using Node.js streams with async generators and async iterators are provided below.

Consuming readable streams with async iterators

(async function() {
  for await (const chunk of readable) {
    console.log(chunk);
  }
})(); 

Async iterators register a permanent error handler on the stream to prevent any unhandled post-destroy errors.
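When a stream is consumed with for await, a stream error surfaces as an exception thrown from the loop, so it can be handled with an ordinary try/catch. The following is a minimal sketch; collect() and failing() are hypothetical helpers written for this illustration.

```javascript
const { Readable } = require('node:stream');

// Reads a stream to the end, returning the chunks seen and any error.
async function collect(readable) {
  const chunks = [];
  try {
    for await (const chunk of readable) {
      chunks.push(chunk);
    }
  } catch (err) {
    // An error emitted by the stream rejects the iteration.
    return { chunks, error: err };
  }
  return { chunks, error: null };
}

// A source that fails part-way through.
async function* failing() {
  yield 'x';
  throw new Error('boom');
}

const ok = collect(Readable.from(['a', 'b']));
const failed = collect(Readable.from(failing()));

ok.then((result) => console.log(result.chunks));
failed.then((result) => console.log(result.error.message));
```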

Creating readable streams with async generators

A Node.js readable stream can be created from an asynchronous generator using the Readable.from() utility method:

const { Readable } = require('node:stream');

const ac = new AbortController();
const signal = ac.signal;

async function * generate() {
  yield 'a';
  await someLongRunningFn({ signal });
  yield 'b';
  yield 'c';
}

const readable = Readable.from(generate());
readable.on('close', () => {
  ac.abort();
});

readable.on('data', (chunk) => {
  console.log(chunk);
}); 

Piping to writable streams from async iterators

When writing to a writable stream from an async iterator, ensure correct handling of backpressure and errors. stream.pipeline() abstracts away the handling of backpressure and backpressure-related errors:

const fs = require('node:fs');
const { pipeline } = require('node:stream');
const { pipeline: pipelinePromise } = require('node:stream/promises');

const writable = fs.createWriteStream('./file');

const ac = new AbortController();
const signal = ac.signal;

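// createIterator() is a placeholder for any function that returns an
// async iterator and honors the abort signal.
const iterator = createIterator({ signal });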

// Callback Pattern
pipeline(iterator, writable, (err, value) => {
  if (err) {
    console.error(err);
  } else {
    console.log(value, 'value returned');
  }
}).on('close', () => {
  ac.abort();
});

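// Promise Pattern (an alternative to the callback pattern above;
// use one or the other, not both on the same iterator and writable)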
pipelinePromise(iterator, writable)
  .then((value) => {
    console.log(value, 'value returned');
  })
  .catch((err) => {
    console.error(err);
    ac.abort();
  }); 
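For a version that can be run without a file or an external iterator, the sketch below pipes an async generator into an in-memory Writable. The numbers() generator and the sink are assumptions made for illustration; the small highWaterMark and the deferred callback are only there to make the destination slow enough for backpressure to matter.

```javascript
const { Writable } = require('node:stream');
const { pipeline } = require('node:stream/promises');

// Hypothetical source: an async generator yielding three lines.
async function* numbers() {
  for (let i = 1; i <= 3; i++) {
    yield `${i}\n`;
  }
}

const received = [];
const sink = new Writable({
  highWaterMark: 1,
  write(chunk, encoding, callback) {
    received.push(chunk.toString());
    // Acknowledge the chunk later, simulating a slow destination.
    setImmediate(callback);
  },
});

// pipeline() waits for the sink before pulling more from the generator.
const piped = pipeline(numbers(), sink).then(() => received.join(''));

piped.then((text) => process.stdout.write(text));
```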

Compatibility with older Node.js versions

Prior to Node.js 0.10, the Readable stream interface was simpler, but also less powerful and less useful.

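  • Rather than waiting for calls to the stream.read() method, 'data' events would begin emitting immediately. Applications that needed to perform some amount of work to decide how to handle data were required to store read data into buffers so the data would not be lost.
  • The stream.pause() method was advisory, rather than guaranteed. This meant that it was still necessary to be prepared to receive 'data' events even when the stream was in a paused state.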

In Node.js 0.10, the Readable class was added. For backward compatibility with older Node.js programs, Readable streams switch into "flowing mode" when a 'data' event handler is added, or when the stream.resume() method is called. The effect is that, even when not using the new stream.read() method and 'readable' event, it is no longer necessary to worry about losing 'data' chunks.
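The mode switch can be observed through the readable.readableFlowing property. This is a small sketch; the states array is just a local used to record the values.

```javascript
const { Readable } = require('node:stream');

const readable = Readable.from(['a', 'b']);
const states = [];

states.push(readable.readableFlowing); // no consumer attached yet
readable.on('data', () => {});         // adding a 'data' handler starts the flow
states.push(readable.readableFlowing);
readable.pause();
states.push(readable.readableFlowing);
readable.resume();
states.push(readable.readableFlowing);

console.log(states); // [ null, true, false, true ]
```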

While most applications will continue to function normally, this introduces an edge case in the following conditions:

  • No 'data' event listener is added.
  • The stream.resume() method is never called.
  • The stream is not piped to any writable destination.

For example, consider the following code:

// WARNING!  BROKEN!
const net = require('node:net');

net.createServer((socket) => {
  // We add an 'end' listener, but never consume the data.
  socket.on('end', () => {
    // It will never get here.
    socket.end('The message was received but was not processed.\n');
  });
}).listen(1337);

Prior to Node.js 0.10, the incoming message data would be simply discarded. However, in Node.js 0.10 and beyond, the socket remains paused forever.

The workaround in this situation is to call the stream.resume() method to begin the flow of data:

// Workaround.
const net = require('node:net');

net.createServer((socket) => {
  socket.on('end', () => {
    socket.end('The message was received but was not processed.\n');
  });

  // Start the flow of data, discarding it.
  socket.resume();
}).listen(1337);

In addition to new Readable streams switching into flowing mode, pre-0.10 style streams can be wrapped in a Readable class using the readable.wrap() method.
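As a rough illustration of readable.wrap(), the sketch below wraps a hand-written old-style source: an EventEmitter that emits 'data' and 'end' and exposes advisory pause() and resume() methods. OldStyleSource is a hypothetical stand-in for a legacy library stream, not a real Node.js class.

```javascript
const { EventEmitter } = require('node:events');
const { Readable } = require('node:stream');

// Hypothetical pre-0.10 style stream: events only, no read() method.
class OldStyleSource extends EventEmitter {
  pause() {}   // advisory in the old interface
  resume() {}
}

const legacy = new OldStyleSource();

// wrap() uses the old stream as the data source of a modern Readable.
const wrapped = new Readable().wrap(legacy);

const collected = (async () => {
  let text = '';
  for await (const chunk of wrapped) {
    text += chunk;
  }
  return text;
})();

// Emit data the way an old-style stream would.
setImmediate(() => {
  legacy.emit('data', Buffer.from('old '));
  legacy.emit('data', Buffer.from('data'));
  legacy.emit('end');
});

collected.then((text) => console.log(text));
```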

readable.read(0)

There are some cases where it is necessary to trigger a refresh of the underlying readable stream mechanisms, without actually consuming any data. In such cases, it is possible to call readable.read(0), which will always return null.

If the internal read buffer is below the highWaterMark, and the stream is not currently reading, then calling stream.read(0) will trigger a low-level stream._read() call.
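A minimal sketch of this behavior, using a custom Readable whose read() implementation only counts how often it is invoked (readCalls is an illustrative counter, not part of the API):

```javascript
const { Readable } = require('node:stream');

let readCalls = 0;
const readable = new Readable({
  // Never pushes anything, so the internal buffer stays empty.
  read() {
    readCalls++;
  },
});

const before = readCalls;
const result = readable.read(0); // consumes nothing

console.log(before, readCalls, result); // 0 1 null
```

Because this read() never pushes, the stream remains in the "currently reading" state afterwards, so a second read(0) would not be expected to trigger another _read() call.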

While most applications will almost never need to do this, there are situations within Node.js where this is done, particularly in the Readable stream class internals.

readable.push('')

Use of readable.push('') is not recommended.

Pushing a zero-byte <string>, <Buffer>, <TypedArray> or <DataView> to a stream that is not in object mode has an interesting side effect. Because it is a call to readable.push(), the call will end the reading process. However, because the argument is an empty string, no data is added to the readable buffer so there is nothing for a user to consume.

highWaterMark discrepancy after calling readable.setEncoding()

The use of readable.setEncoding() will change the behavior of how the highWaterMark operates in non-object mode.

Typically, the size of the current buffer is measured against the highWaterMark in bytes. However, after setEncoding() is called, the comparison function will begin to measure the buffer's size in characters.

This is not a problem in common cases with latin1 or ascii. But it is advised to be mindful about this behavior when working with strings that could contain multi-byte characters.
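The difference can be seen by comparing readable.readableLength with and without an encoding set. In this sketch, bufferedLength() is a hypothetical helper and the sample text is three copies of U+00E9, each of which occupies two bytes in UTF-8.

```javascript
const { Readable } = require('node:stream');

// Pushes the same UTF-8 bytes and reports how much the stream
// considers to be buffered.
function bufferedLength(encoding) {
  const readable = new Readable({ read() {} });
  if (encoding) {
    readable.setEncoding(encoding);
  }
  readable.push(Buffer.from('\u00e9'.repeat(3), 'utf8'));
  return readable.readableLength;
}

const inBytes = bufferedLength();       // measured in bytes
const inChars = bufferedLength('utf8'); // measured in characters

console.log(inBytes, inChars); // 6 3
```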
