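Node.js v12.2.0 Documentation

Stream#

Stability: 2 - Stable

A stream is an abstract interface for working with streaming data in Node.js. The stream module is used to build objects that implement the stream interface.

Node.js provides many stream objects. For instance, a request to an HTTP server and process.stdout are both stream instances.

Streams can be readable, writable, or both. All streams are instances of EventEmitter.

The stream module can be accessed using: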

const stream = require('stream');

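While it is important to understand how streams work, the stream module itself is most useful for developers who are creating new types of stream instances. Developers who are primarily consuming stream objects will rarely need to use the stream module directly.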

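Organization of this Document#

This document is divided into two primary sections, plus a third section for additional notes. The first section explains the parts of the stream API that are required to use streams within an application. The second section explains the parts of the API that are required to implement new types of streams.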

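Types of Streams#

There are four fundamental stream types within Node.js:

  • Writable: streams to which data can be written.
  • Readable: streams from which data can be read.
  • Duplex: streams that are both Readable and Writable.
  • Transform: Duplex streams that can modify or transform the data as it is written and read.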

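Object Mode#

All streams created by Node.js APIs operate exclusively on strings and Buffer (or Uint8Array) objects. It is possible, however, for stream implementations to work with other types of JavaScript values (with the exception of null). Such streams are considered to operate in "object mode".

Stream instances are switched into object mode using the objectMode option when the stream is created. Attempting to switch an existing stream into object mode is not safe.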
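As a minimal sketch of the objectMode option, the following creates a Readable in object mode and reads one value back. The names objectStream, bufferedCount and firstItem are illustrative only, and the no-op read() is an assumption made so that data can be pushed by hand.

```javascript
const { Readable } = require('stream');

// A Readable created in object mode. The no-op read() is a placeholder:
// this sketch pushes its data manually instead of pulling from a source.
const objectStream = new Readable({ objectMode: true, read() {} });

objectStream.push({ id: 1 });
objectStream.push({ id: 2 });

// In object mode the buffered length counts objects, not bytes.
const bufferedCount = objectStream.readableLength;

// Each read() call returns exactly one of the pushed values.
const firstItem = objectStream.read();
console.log(bufferedCount, firstItem);
```

The option is passed to the constructor because, as noted above, switching an existing stream into object mode is not safe.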

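Buffering#

Both Writable and Readable streams store data in an internal buffer that can be retrieved using writable.writableBuffer or readable.readableBuffer, respectively.

The amount of data potentially buffered depends on the highWaterMark option passed into the stream's constructor. For normal streams, the highWaterMark option specifies a total number of bytes. For streams operating in object mode, the highWaterMark specifies a total number of objects.

Data is buffered in Readable streams when the implementation calls stream.push(chunk). If the consumer of the stream does not call stream.read(), the data will sit in the internal queue until it is consumed.

Once the total size of the internal read buffer reaches the threshold specified by highWaterMark, the stream will temporarily stop reading data from the underlying resource until the data currently buffered can be consumed (that is, the stream will stop calling the internal readable._read() method that is used to fill the read buffer).

Data is buffered in Writable streams when the writable.write(chunk) method is called. While the total size of the internal write buffer is below the threshold set by highWaterMark, calls to writable.write() will return true. Once the size of the internal buffer reaches or exceeds the highWaterMark, false will be returned.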
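The threshold behaviour can be sketched as follows. The slowWriter stream, its 4-byte highWaterMark and the use of setImmediate() to simulate a slow destination are all assumptions chosen for illustration.

```javascript
const { Writable } = require('stream');

// Hypothetical slow destination: each chunk is acknowledged on a later
// turn of the event loop, so written data stays buffered for a while.
const slowWriter = new Writable({
  highWaterMark: 4, // bytes, because this stream is not in object mode
  write(chunk, encoding, callback) {
    setImmediate(callback);
  }
});

const firstResult = slowWriter.write('ab');  // 2 bytes pending, below 4
const secondResult = slowWriter.write('cd'); // 4 bytes pending, threshold reached
console.log(firstResult, secondResult);

slowWriter.once('drain', () => {
  console.log('buffer drained, safe to write again');
});
```

A producer that respects the second return value would stop writing at that point and wait for 'drain'.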

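A key goal of the stream API, particularly the stream.pipe() method, is to limit the buffering of data to acceptable levels so that sources and destinations of differing speeds will not overwhelm the available memory.

Because Duplex and Transform streams are both Readable and Writable, each maintains two separate internal buffers used for reading and writing, allowing each side to operate independently of the other while maintaining the flow of data. For example, net.Socket instances are Duplex streams whose Readable side allows consumption of data received from the socket and whose Writable side allows writing data to the socket. Because data may be written to the socket at a faster or slower rate than data is received, it is important for each side to operate (and buffer) independently of the other.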

API for Stream Consumers#

Almost all Node.js applications, no matter how simple, use streams in some manner. The following is an example of using streams in a Node.js application that implements an HTTP server:

const http = require('http');

const server = http.createServer((req, res) => {
  // `req` is an http.IncomingMessage, which is a Readable Stream
  // `res` is an http.ServerResponse, which is a Writable Stream

  let body = '';
  // Get the data as utf8 strings.
  // If an encoding is not set, Buffer objects will be received.
  req.setEncoding('utf8');

  // Readable streams emit 'data' events once a listener is added
  req.on('data', (chunk) => {
    body += chunk;
  });

  // The 'end' event indicates that the entire body has been received
  req.on('end', () => {
    try {
      const data = JSON.parse(body);
      // Write back something interesting to the user:
      res.write(typeof data);
      res.end();
    } catch (er) {
      // uh oh! bad json!
      res.statusCode = 400;
      return res.end(`error: ${er.message}`);
    }
  });
});

server.listen(1337);

// $ curl localhost:1337 -d "{}"
// object
// $ curl localhost:1337 -d "\"foo\""
// string
// $ curl localhost:1337 -d "not json"
// error: Unexpected token o in JSON at position 1

Writable streams (such as res in the example) expose methods such as write() and end() that are used to write data onto the stream.

Readable streams use the EventEmitter API for notifying application code when data is available to be read off the stream. That available data can be read from the stream in multiple ways.

Both Writable and Readable streams use the EventEmitter API in various ways to communicate the current state of the stream.

Duplex and Transform streams are both Writable and Readable.

Applications that are either writing data to or consuming data from a stream are not required to implement the stream interfaces directly and will generally have no reason to call require('stream').

Developers wishing to implement new types of streams should refer to the section API for Stream Implementers.

Writable Streams#

Writable streams are an abstraction for a destination to which data is written.

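Examples of Writable streams include:

  • HTTP requests, on the client
  • HTTP responses, on the server
  • fs write streams
  • zlib streams
  • crypto streams
  • TCP sockets
  • child process stdin
  • process.stdout, process.stderr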

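Some of these examples are actually Duplex streams that implement the Writable interface.

All Writable streams implement the interface defined by the stream.Writable class.

While specific instances of Writable streams may differ in various ways, all Writable streams follow the same fundamental usage pattern as illustrated in the example below:

const myStream = getWritableStreamSomehow();
myStream.write('some data');
myStream.write('some more data');
myStream.end('done writing data');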

Class: stream.Writable#

Event: 'close'#

The 'close' event is emitted when the stream and any of its underlying resources (a file descriptor, for example) have been closed. The event indicates that no more events will be emitted, and no further computation will occur.

A Writable stream will always emit the 'close' event if it is created with the emitClose option.

Event: 'drain'#

If a call to stream.write(chunk) returns false, the 'drain' event will be emitted when it is appropriate to resume writing data to the stream.

// Write the data to the supplied writable stream one million times.
// Be attentive to back-pressure.
function writeOneMillionTimes(writer, data, encoding, callback) {
  let i = 1000000;
  write();
  function write() {
    let ok = true;
    do {
      i--;
      if (i === 0) {
        // last time!
        writer.write(data, encoding, callback);
      } else {
        // See if we should continue, or wait.
        // Don't pass the callback, because we're not done yet.
        ok = writer.write(data, encoding);
      }
    } while (i > 0 && ok);
    if (i > 0) {
      // had to stop early!
      // write some more once it drains
      writer.once('drain', write);
    }
  }
}
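
Event: 'error'#

The 'error' event is emitted if an error occurs while writing data.

The stream is not closed when the 'error' event is emitted.

Event: 'finish'#

The 'finish' event is emitted after the stream.end() method has been called, and all data has been flushed to the underlying system.

const writer = getWritableStreamSomehow();
for (let i = 0; i < 100; i++) {
  writer.write(`hello, #${i}!\n`);
}
writer.end('This is the end\n');
writer.on('finish', () => {
  console.error('All writes are now complete.');
});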
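
Event: 'pipe'#

The 'pipe' event is emitted when the stream.pipe() method is called on a readable stream.

const assert = require('assert');
const writer = getWritableStreamSomehow();
const reader = getReadableStreamSomehow();
writer.on('pipe', (src) => {
  console.error('Something is piping into the writer.');
  assert.equal(src, reader);
});
reader.pipe(writer);

Event: 'unpipe'#

The 'unpipe' event is emitted when the stream.unpipe() method is called on a readable stream.

The 'unpipe' event is also emitted if an error occurs while a Readable stream is piping into this Writable stream.

const assert = require('assert');
const writer = getWritableStreamSomehow();
const reader = getReadableStreamSomehow();
writer.on('unpipe', (src) => {
  console.error('Something has stopped piping into the writer.');
  assert.equal(src, reader);
});
reader.pipe(writer);
reader.unpipe(writer);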
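
writable.cork()#

The writable.cork() method forces all written data to be buffered in memory. The buffered data will be flushed when either the stream.uncork() or stream.end() method is called.

The primary intent of writable.cork() is to avoid the performance penalty that can occur when many small chunks of data are written to a stream and the internal buffering becomes ineffective. In such situations, implementations that implement the writable._writev() method can perform buffered writes in a more optimized manner.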
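The interaction between cork() and a writev implementation might look like the sketch below. The batchingWriter stream and the batches array are hypothetical names used only to record how the chunks arrive.

```javascript
const { Writable } = require('stream');

const batches = [];

const batchingWriter = new Writable({
  // Used when only a single chunk is pending.
  write(chunk, encoding, callback) {
    batches.push([chunk.toString()]);
    callback();
  },
  // Used when several chunks are buffered; each entry is { chunk, encoding }.
  writev(chunks, callback) {
    batches.push(chunks.map((entry) => entry.chunk.toString()));
    callback();
  }
});

batchingWriter.cork();
batchingWriter.write('some ');
batchingWriter.write('data ');
batchingWriter.uncork(); // both buffered chunks are handed over together

console.log(batches);
```

The sketch calls uncork() directly to stay short; in application code, deferring it with process.nextTick() lets every write made in the current tick join the same batch.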

writable.destroy([error])#

  • error <Error> Optional, an error to emit with 'error' event.
  • Returns: <this>

Destroy the stream. Optionally emit an 'error' event, and emit a 'close' event unless emitClose is set to false. After this call, the writable stream has ended and subsequent calls to write() or end() will result in an ERR_STREAM_DESTROYED error. This is a destructive and immediate way to destroy a stream. Previous calls to write() may not have drained, and may trigger an ERR_STREAM_DESTROYED error. Use end() instead of destroy() if data should flush before close, or wait for the 'drain' event before destroying the stream. Implementors should not override this method, but instead implement writable._destroy().

writable.end([chunk][, encoding][, callback])#

  • chunk <string> | <Buffer> | <Uint8Array> | <any> Optional data to write. For streams not operating in object mode, chunk must be a string, Buffer or Uint8Array. For object mode streams, chunk may be any JavaScript value other than null.
  • encoding <string> The encoding if chunk is a string
  • callback <Function> Optional callback for when the stream is finished
  • Returns: <this>

Calling the writable.end() method signals that no more data will be written to the Writable. The optional chunk and encoding arguments allow one final additional chunk of data to be written immediately before closing the stream. If provided, the optional callback function is attached as a listener for the 'finish' event.

Calling the stream.write() method after calling stream.end() will raise an error.

// Write 'hello, ' and then end with 'world!'
const fs = require('fs');
const file = fs.createWriteStream('example.txt');
file.write('hello, ');
file.end('world!');
// Writing more now is not allowed!
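
writable.setDefaultEncoding(encoding)#

The writable.setDefaultEncoding() method sets the default encoding for a Writable stream.

writable.uncork()#

The writable.uncork() method flushes all data buffered since stream.cork() was called.

When using writable.cork() and writable.uncork() to manage the buffering of writes to a stream, it is recommended that calls to writable.uncork() be deferred using process.nextTick(). Doing so allows batching of all writable.write() calls that occur within a given Node.js event loop phase.

stream.cork();
stream.write('some ');
stream.write('data ');
process.nextTick(() => stream.uncork());

If the writable.cork() method is called multiple times on a stream, writable.uncork() must be called the same number of times to flush the buffered data.

stream.cork();
stream.write('some ');
stream.cork();
stream.write('data ');
process.nextTick(() => {
  stream.uncork();
  // The data will not be flushed until uncork() is called a second time.
  stream.uncork();
});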

writable.writable#

Is true if it is safe to call writable.write().

writable.writableHighWaterMark#

Returns the value of highWaterMark passed when constructing this Writable.

writable.writableLength#

This property contains the number of bytes (or objects) in the queue ready to be written.

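writable.write(chunk[, encoding][, callback])#

  • chunk <string> | <Buffer> | <Uint8Array> | <any> Data to write. For streams not operating in object mode, chunk must be a string, Buffer or Uint8Array. For object mode streams, chunk may be any JavaScript value other than null.
  • encoding <string> The encoding, if chunk is a string.
  • callback <Function> Callback for when this chunk of data is flushed.
  • Returns: <boolean> false if the stream wishes for the calling code to wait for the 'drain' event to be emitted before continuing to write additional data; otherwise true.

The writable.write() method writes some data to the stream, and calls the supplied callback once the data has been fully handled. If an error occurs, the callback may or may not be called. To reliably detect write errors, add a listener for the 'error' event.

The return value is true if the internal buffer is less than the highWaterMark configured when the stream was created after admitting chunk. If false is returned, further attempts to write data to the stream should stop until the 'drain' event is emitted.

While a stream is not draining, calls to write() will buffer chunk, and return false. Once all currently buffered chunks are drained (accepted for delivery by the operating system), the 'drain' event will be emitted. It is recommended that once write() returns false, no more chunks be written until the 'drain' event is emitted. While calling write() on a stream that is not draining is allowed, Node.js will buffer all written chunks until maximum memory usage occurs, at which point it will abort unconditionally. Even before it aborts, high memory usage will cause poor garbage collector performance and high RSS (which is not typically released back to the system, even after the memory is no longer required). Since TCP sockets may never drain if the remote peer does not read the data, writing to a socket that is not draining may lead to a remotely exploitable vulnerability.

Writing data while the stream is not draining is particularly problematic for a Transform, because Transform streams are paused by default until they are piped or a 'data' or 'readable' event handler is added.

If the data to be written can be generated or fetched on demand, it is recommended to encapsulate the logic into a Readable and use stream.pipe(). However, if calling write() is preferred, it is possible to respect backpressure and avoid memory issues using the 'drain' event: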

function write(data, cb) {
  if (!stream.write(data)) {
    stream.once('drain', cb);
  } else {
    process.nextTick(cb);
  }
}

// Wait for cb to be called before doing any other write.
write('hello', () => {
  console.log('Write completed, do more writes now.');
});

Readable Streams#

Readable streams are an abstraction for a source from which data is consumed.

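Examples of Readable streams include:

  • HTTP responses, on the client
  • HTTP requests, on the server
  • fs read streams
  • zlib streams
  • crypto streams
  • TCP sockets
  • child process stdout and stderr
  • process.stdin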

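All Readable streams implement the interface defined by the stream.Readable class.

Two Reading Modes#

Readable streams effectively operate in one of two modes: flowing and paused.

  • In flowing mode, data is read from the underlying system automatically and provided to an application as quickly as possible using events via the EventEmitter interface.
  • In paused mode, the stream.read() method must be called explicitly to read chunks of data from the stream.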

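All Readable streams begin in paused mode but can be switched to flowing mode in one of the following ways:

  • Adding a 'data' event handler.
  • Calling the stream.resume() method.
  • Calling the stream.pipe() method to send the data to a Writable.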

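The Readable can switch back to paused mode using one of the following:

  • If there are no pipe destinations, by calling the stream.pause() method.
  • If there are pipe destinations, by removing all pipe destinations. Multiple pipe destinations may be removed by calling the stream.unpipe() method.

The important concept to remember is that a Readable will not generate data until a mechanism for either consuming or ignoring that data is provided. If the consuming mechanism is disabled or taken away, the Readable will stop generating data.

For backward compatibility reasons, removing 'data' event handlers will not automatically pause the stream. Also, if there are piped destinations, then calling stream.pause() will not guarantee that the stream will remain paused once those destinations drain and ask for more data.

If a Readable is switched into flowing mode and there are no consumers available to handle the data, that data will be lost. This can occur, for instance, when the readable.resume() method is called without a listener attached to the 'data' event, or when a 'data' event handler is removed from the stream.

Adding a 'readable' event handler automatically makes the stream stop flowing, and the data has to be consumed via readable.read(). If the 'readable' event handler is removed, then the stream will start flowing again if there is a 'data' event handler.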

Three States#

The "two modes" of operation for a Readable stream are a simplified abstraction for the more complicated internal state management that is happening within the Readable stream implementation.

Specifically, at any given point in time, every Readable is in one of three possible states:

  • readable.readableFlowing === null
  • readable.readableFlowing === false
  • readable.readableFlowing === true

When readable.readableFlowing is null, no mechanism for consuming the stream's data is provided. Therefore, the stream will not generate data. While in this state, attaching a listener for the 'data' event, calling the readable.pipe() method, or calling the readable.resume() method will switch readable.readableFlowing to true, causing the Readable to begin actively emitting events as data is generated.

Calling readable.pause(), readable.unpipe(), or receiving backpressure will cause the readable.readableFlowing to be set as false, temporarily halting the flowing of events but not halting the generation of data. While in this state, attaching a listener for the 'data' event will not switch readable.readableFlowing to true.

const { PassThrough, Writable } = require('stream');
const pass = new PassThrough();
const writable = new Writable();

pass.pipe(writable);
pass.unpipe(writable);
// readableFlowing is now false

pass.on('data', (chunk) => { console.log(chunk.toString()); });
pass.write('ok');  // Will not emit 'data'
pass.resume();     // Must be called to make stream emit 'data'

While readable.readableFlowing is false, data may be accumulating within the stream's internal buffer.
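The three values of readable.readableFlowing can be observed directly. This is a small sketch; the source stream and the observedStates array are illustrative names, and the no-op read() stands in for a real data source.

```javascript
const { Readable } = require('stream');

const source = new Readable({ read() {} }); // placeholder read()
const observedStates = [];

observedStates.push(source.readableFlowing); // no consumer attached yet
source.resume();
observedStates.push(source.readableFlowing); // flowing
source.pause();
observedStates.push(source.readableFlowing); // explicitly paused

console.log(observedStates);
```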

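Choose One API Style#

The Readable stream API evolved across multiple Node.js versions and provides multiple methods of consuming stream data. In general, developers should choose one of the methods of consuming data and should never use multiple methods to consume data from a single stream. Specifically, using a combination of on('data'), on('readable'), pipe(), or async iterators could lead to unintuitive behavior.

Use of the readable.pipe() method is recommended for most users as it provides the easiest way of consuming stream data. Developers that require more fine-grained control over the transfer and generation of data can use the EventEmitter and readable.on('readable')/readable.read() or the readable.pause()/readable.resume() APIs.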

Class: stream.Readable#

Event: 'close'#

The 'close' event is emitted when the stream and any of its underlying resources (a file descriptor, for example) have been closed. The event indicates that no more events will be emitted, and no further computation will occur.

A Readable stream will always emit the 'close' event if it is created with the emitClose option.

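Event: 'data'#

  • chunk <Buffer> | <string> | <any> The chunk of data. For streams that are not operating in object mode, the chunk will be either a string or Buffer. For streams that are in object mode, the chunk can be any JavaScript value other than null.

The 'data' event is emitted whenever the stream is relinquishing ownership of a chunk of data to a consumer. The stream switches into flowing mode when readable.pipe() or readable.resume() is called, or when a listener callback is attached to the 'data' event. The 'data' event will also be emitted whenever the readable.read() method is called and a chunk of data is available to be returned.

The listener callback will be passed the chunk of data as a string if a default encoding has been specified for the stream using the readable.setEncoding() method; otherwise the data will be passed as a Buffer.

const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes of data.`);
});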
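
Event: 'end'#

The 'end' event is emitted when there is no more data to be consumed from the stream.

The 'end' event will not be emitted unless the data is completely consumed. This can be accomplished by switching the stream into flowing mode, or by calling stream.read() repeatedly until all data has been consumed.

const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes of data.`);
});
readable.on('end', () => {
  console.log('There will be no more data.');
});

Event: 'error'#

The 'error' event is emitted when the stream is unable to generate data due to an underlying internal failure, or when a stream implementation attempts to push an invalid chunk of data.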

Event: 'pause'#

The 'pause' event is emitted when stream.pause() is called and readableFlowing is not false.

Event: 'readable'#

The 'readable' event is emitted when there is data available to be read from the stream. In some cases, attaching a listener for the 'readable' event will cause some amount of data to be read into an internal buffer.

const readable = getReadableStreamSomehow();
readable.on('readable', function() {
  // There is some data to read now
  let data;

  while (data = this.read()) {
    console.log(data);
  }
});

The 'readable' event will also be emitted once the end of the stream data has been reached but before the 'end' event is emitted.

Effectively, the 'readable' event indicates that the stream has new information: either new data is available or the end of the stream has been reached. In the former case, stream.read() will return the available data. In the latter case, stream.read() will return null. For instance, in the following example, foo.txt is an empty file:

const fs = require('fs');
const rr = fs.createReadStream('foo.txt');
rr.on('readable', () => {
  console.log(`readable: ${rr.read()}`);
});
rr.on('end', () => {
  console.log('end');
});

The output of running this script is:

$ node test.js
readable: null
end

In general, the readable.pipe() and 'data' event mechanisms are easier to understand than the 'readable' event. However, handling 'readable' might result in increased throughput.

If both 'readable' and 'data' are used at the same time, 'readable' takes precedence in controlling the flow, i.e. 'data' will be emitted only when stream.read() is called. The readableFlowing property would become false. If there are 'data' listeners when 'readable' is removed, the stream will start flowing, i.e. 'data' events will be emitted without calling .resume().

Event: 'resume'#

The 'resume' event is emitted when stream.resume() is called and readableFlowing is not true.

readable.destroy([error])#

  • error <Error> Error which will be passed as payload in 'error' event
  • Returns: <this>

Destroy the stream. Optionally emit an 'error' event, and emit a 'close' event unless emitClose is set to false. After this call, the readable stream will release any internal resources and subsequent calls to push() will be ignored. Implementors should not override this method, but instead implement readable._destroy().

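readable.isPaused()#

The readable.isPaused() method returns the current operating state of the Readable. This is used primarily by the mechanism that underlies the readable.pipe() method. In most typical cases, there will be no reason to use this method directly.

const stream = require('stream');
// A no-op read() is supplied so that resume() does not fail on the
// unimplemented _read() method.
const readable = new stream.Readable({ read() {} });

readable.isPaused(); // === false
readable.pause();
readable.isPaused(); // === true
readable.resume();
readable.isPaused(); // === false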
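
readable.pause()#

The readable.pause() method will cause a stream in flowing mode to stop emitting 'data' events, switching out of flowing mode. Any data that becomes available will remain in the internal buffer.

const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes of data.`);
  readable.pause();
  console.log('There will be no additional data for 1 second.');
  setTimeout(() => {
    console.log('Now data will start flowing again.');
    readable.resume();
  }, 1000);
});

The readable.pause() method has no effect if there is a 'readable' event listener.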

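readable.pipe(destination[, options])#

The readable.pipe() method attaches a Writable stream to the readable, causing it to switch automatically into flowing mode and push all of its data to the attached Writable. The flow of data will be automatically managed so that the destination Writable stream is not overwhelmed by a faster Readable stream.

The following example pipes all of the data from the readable into a file named file.txt:

const fs = require('fs');
const readable = getReadableStreamSomehow();
const writable = fs.createWriteStream('file.txt');
// All the data from readable goes into 'file.txt'.
readable.pipe(writable);

It is possible to attach multiple Writable streams to a single Readable stream.

The readable.pipe() method returns a reference to the destination stream, making it possible to set up chains of piped streams:

const fs = require('fs');
const zlib = require('zlib');
const r = fs.createReadStream('file.txt');
const z = zlib.createGzip();
const w = fs.createWriteStream('file.txt.gz');
r.pipe(z).pipe(w);

By default, stream.end() is called on the destination Writable stream when the source Readable stream emits 'end', so that the destination is no longer writable. To disable this default behavior, the end option can be passed as false, causing the destination stream to remain open:

reader.pipe(writer, { end: false });
reader.on('end', () => {
  writer.end('The end');
});

One important caveat is that if the Readable stream emits an error during processing, the Writable destination is not closed automatically. If an error occurs, it will be necessary to manually close each stream in order to prevent memory leaks.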

readable.read([size])#

The readable.read() method pulls some data out of the internal buffer and returns it. If no data is available to be read, null is returned. By default, the data will be returned as a Buffer object unless an encoding has been specified using the readable.setEncoding() method or the stream is operating in object mode.

The optional size argument specifies a specific number of bytes to read. If size bytes are not available to be read, null will be returned unless the stream has ended, in which case all of the data remaining in the internal buffer will be returned.

If the size argument is not specified, all of the data contained in the internal buffer will be returned.
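The effect of the size argument can be sketched as follows. The source stream is hypothetical, its no-op read() is an assumption that lets the data be pushed by hand, and push(null) is used to signal the end of the stream.

```javascript
const { Readable } = require('stream');

const source = new Readable({ read() {} }); // placeholder read()
source.push(Buffer.from('abcdef'));

// Two bytes are requested and six are buffered.
const firstTwo = source.read(2);

// Ten bytes are requested but only four remain and the stream is still open.
const tooMany = source.read(10);

// Signal end-of-stream; now the remainder is returned despite the size.
source.push(null);
const remainder = source.read(10);

console.log(String(firstTwo), tooMany, String(remainder));
```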

The readable.read() method should only be called on Readable streams operating in paused mode. In flowing mode, readable.read() is called automatically until the internal buffer is fully drained.

const readable = getReadableStreamSomehow();
readable.on('readable', () => {
  let chunk;
  while (null !== (chunk = readable.read())) {
    console.log(`Received ${chunk.length} bytes of data.`);
  }
});

Note that the while loop is necessary when processing data with readable.read(). Only after readable.read() returns null, 'readable' will be emitted.

A Readable stream in object mode will always return a single item from a call to readable.read(size), regardless of the value of the size argument.

If the readable.read() method returns a chunk of data, a 'data' event will also be emitted.

Calling stream.read([size]) after the 'end' event has been emitted will return null. No runtime error will be raised.

readable.readable#

Is true if it is safe to call readable.read().

readable.readableHighWaterMark#

Returns the value of highWaterMark passed when constructing this Readable.

readable.readableLength#

This property contains the number of bytes (or objects) in the queue ready to be read.

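readable.resume()#

The readable.resume() method causes an explicitly paused Readable stream to resume emitting 'data' events, switching the stream into flowing mode.

The readable.resume() method can be used to fully consume the data from a stream without actually processing any of that data:

getReadableStreamSomehow()
  .resume()
  .on('end', () => {
    console.log('Reached the end, but did not read anything.');
  });

The readable.resume() method has no effect if there is a 'readable' event listener.

readable.setEncoding(encoding)#

The readable.setEncoding() method sets the character encoding for data read from the Readable stream.

By default, no encoding is assigned and stream data will be returned as Buffer objects. Setting an encoding causes the stream data to be returned as strings of the specified encoding rather than as Buffer objects. For instance, calling readable.setEncoding('utf-8') will cause the output data to be interpreted as UTF-8 data and passed as strings. Calling readable.setEncoding('hex') will cause the data to be encoded in hexadecimal string format.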

const assert = require('assert');
const readable = getReadableStreamSomehow();
readable.setEncoding('utf8');
readable.on('data', (chunk) => {
  assert.equal(typeof chunk, 'string');
  console.log('Got %d characters of string data.', chunk.length);
});
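
For the 'hex' case, a minimal sketch could look like this. The source stream and its no-op read() are assumptions made so that a known Buffer can be pushed by hand.

```javascript
const { Readable } = require('stream');

const source = new Readable({ read() {} }); // placeholder read()
source.setEncoding('hex');
source.push(Buffer.from('hi'));

// The buffered bytes 0x68 0x69 come back as a hexadecimal string.
const hexChunk = source.read();
console.log(hexChunk);
```
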
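
readable.unpipe([destination])#

The readable.unpipe() method detaches a Writable stream previously attached using the stream.pipe() method.

If the destination is not specified, then all pipes are detached.

If the destination is specified, but no pipe is set up for it, then the method does nothing.

const fs = require('fs');
const readable = getReadableStreamSomehow();
const writable = fs.createWriteStream('file.txt');
// All the data from readable goes into 'file.txt',
// but only for the first second.
readable.pipe(writable);
setTimeout(() => {
  console.log('Stop writing to file.txt.');
  readable.unpipe(writable);
  console.log('Manually close the file stream.');
  writable.end();
}, 1000);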

readable.unshift(chunk)#

  • chunk <Buffer> | <Uint8Array> | <string> | <any> Chunk of data to unshift onto the read queue. For streams not operating in object mode, chunk must be a string, Buffer or Uint8Array. For object mode streams, chunk may be any JavaScript value other than null.

The readable.unshift() method pushes a chunk of data back into the internal buffer. This is useful in certain situations where a stream is being consumed by code that needs to "un-consume" some amount of data that it has optimistically pulled out of the source, so that the data can be passed on to some other party.

The stream.unshift(chunk) method cannot be called after the 'end' event has been emitted or a runtime error will be thrown.

Developers using stream.unshift() often should consider switching to use of a Transform stream instead. See the API for Stream Implementers section for more information.

// Pull off a header delimited by \n\n
// use unshift() if we get too much
// Call the callback with (error, header, stream)
const { StringDecoder } = require('string_decoder');
function parseHeader(stream, callback) {
  stream.on('error', callback);
  stream.on('readable', onReadable);
  const decoder = new StringDecoder('utf8');
  let header = '';
  function onReadable() {
    let chunk;
    while (null !== (chunk = stream.read())) {
      const str = decoder.write(chunk);
      if (str.match(/\n\n/)) {
        // Found the header boundary
        const split = str.split(/\n\n/);
        header += split.shift();
        const remaining = split.join('\n\n');
        const buf = Buffer.from(remaining, 'utf8');
        stream.removeListener('error', callback);
        // Remove the 'readable' listener before unshifting
        stream.removeListener('readable', onReadable);
        if (buf.length)
          stream.unshift(buf);
        // Now the body of the message can be read from the stream.
        callback(null, header, stream);
      } else {
        // Still reading the header.
        header += str;
      }
    }
  }
}

Unlike stream.push(chunk), stream.unshift(chunk) will not end the reading process by resetting the internal reading state of the stream. This can cause unexpected results if readable.unshift() is called during a read (i.e. from within a stream._read() implementation on a custom stream). Following the call to readable.unshift() with an immediate stream.push('') will reset the reading state appropriately, however it is best to simply avoid calling readable.unshift() while in the process of performing a read.

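readable.wrap(stream)#

Prior to Node.js v0.10, streams did not implement the entire stream module API as it is currently defined. (See Compatibility for more information.)

When using an older Node.js library that emits 'data' events and has a stream.pause() method that is advisory only, the readable.wrap() method can be used to create a Readable stream that uses the old stream as its data source.

It will rarely be necessary to use readable.wrap(); the method is provided mainly for interacting with older Node.js applications and libraries.

For example:

const { OldReader } = require('./old-api-module.js');
const { Readable } = require('stream');
const oreader = new OldReader();
const myReader = new Readable().wrap(oreader);

myReader.on('readable', () => {
  myReader.read(); // etc.
});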

readable[Symbol.asyncIterator]()#

Stability: 2 - Stable

const fs = require('fs');

async function print(readable) {
  readable.setEncoding('utf8');
  let data = '';
  for await (const k of readable) {
    data += k;
  }
  console.log(data);
}

print(fs.createReadStream('file')).catch(console.log);

If the loop terminates with a break or a throw, the stream will be destroyed. In other terms, iterating over a stream will consume the stream fully. The stream will be read in chunks of size equal to the highWaterMark option. In the code example above, data will be in a single chunk if the file has less than 64kb of data because no highWaterMark option is provided to fs.createReadStream().

Duplex and Transform Streams#

Class: stream.Duplex#

Duplex streams are streams that implement both the Readable and Writable interfaces.

Examples of Duplex streams include:

  • TCP sockets
  • zlib streams
  • crypto streams

Class: stream.Transform#

Transform streams are Duplex streams where the output is in some way related to the input. Like all Duplex streams, Transform streams implement both the Readable and Writable interfaces.
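To illustrate how the output of a Transform relates to its input, here is a small sketch of an upper-casing stream. The upperCase stream is a hypothetical example built with the transform constructor option, not one of the built-in Transform streams.

```javascript
const { Transform } = require('stream');

const upperCase = new Transform({
  transform(chunk, encoding, callback) {
    // The output is derived directly from the input chunk.
    callback(null, chunk.toString().toUpperCase());
  }
});

upperCase.write('abc');          // Writable side
const result = upperCase.read(); // Readable side
console.log(result.toString());
```

Data goes in through the Writable side and the transformed result is read from the Readable side of the same object.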

Examples of Transform streams include:

  • zlib streams
  • crypto streams

transform.destroy([error])#

Destroy the stream, and optionally emit an 'error' event. After this call, the transform stream will release any internal resources. Implementors should not override this method, but instead implement readable._destroy(). The default implementation of _destroy() for Transform also emits 'close' unless emitClose is set to false.

stream.finished(stream[, options], callback)#

  • stream <Stream> A readable and/or writable stream.
  • options <Object>

    • error <boolean> If set to false, then a call to emit('error', err) is not treated as finished. Default: true.
    • readable <boolean> When set to false, the callback will be called when the stream ends even though the stream might still be readable. Default: true.
    • writable <boolean> When set to false, the callback will be called when the stream ends even though the stream might still be writable. Default: true.
  • callback <Function> A callback function that takes an optional error argument.

A function to get notified when a stream is no longer readable, writable or has experienced an error or a premature close event.

const { finished } = require('stream');
const fs = require('fs');

const rs = fs.createReadStream('archive.tar');

finished(rs, (err) => {
  if (err) {
    console.error('Stream failed.', err);
  } else {
    console.log('Stream is done reading.');
  }
});

rs.resume(); // drain the stream

Especially useful in error handling scenarios where a stream is destroyed prematurely (like an aborted HTTP request), and will not emit 'end' or 'finish'.

The finished API is promisify-able as well:

const util = require('util');
const stream = require('stream');
const fs = require('fs');

const finished = util.promisify(stream.finished);

const rs = fs.createReadStream('archive.tar');

async function run() {
  await finished(rs);
  console.log('Stream is done reading.');
}

run().catch(console.error);
rs.resume(); // drain the stream

stream.pipeline(...streams, callback)#

  • ...streams <Stream> Two or more streams to pipe between.
  • callback <Function> Called when the pipeline is fully done.

A module method to pipe between streams, forwarding errors, cleaning up properly, and providing a callback when the pipeline is complete.

const { pipeline } = require('stream');
const fs = require('fs');
const zlib = require('zlib');

// Use the pipeline API to easily pipe a series of streams
// together and get notified when the pipeline is fully done.

// A pipeline to gzip a potentially huge tar file efficiently:

pipeline(
  fs.createReadStream('archive.tar'),
  zlib.createGzip(),
  fs.createWriteStream('archive.tar.gz'),
  (err) => {
    if (err) {
      console.error('Pipeline failed.', err);
    } else {
      console.log('Pipeline succeeded.');
    }
  }
);

The pipeline API is promisify-able as well:

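const util = require('util');
const stream = require('stream');
const fs = require('fs');
const zlib = require('zlib');

const pipeline = util.promisify(stream.pipeline);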

async function run() {
  await pipeline(
    fs.createReadStream('archive.tar'),
    zlib.createGzip(),
    fs.createWriteStream('archive.tar.gz')
  );
  console.log('Pipeline succeeded.');
}

run().catch(console.error);

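API for Stream Implementers#

The stream module API has been designed to make it possible to easily implement streams using JavaScript's prototypal inheritance model.

First, a stream developer would declare a new JavaScript class that extends one of the four basic stream classes (stream.Writable, stream.Readable, stream.Duplex, or stream.Transform), making sure to call the appropriate parent class constructor: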

const { Writable } = require('stream');

class MyWritable extends Writable {
  constructor(options) {
    super(options);
    // ...
  }
}

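The new stream class must then implement one or more specific methods, depending on the type of stream being created, as detailed in the table below:

Use-case                                       Class      Method(s) to implement
Reading only                                   Readable   _read
Writing only                                   Writable   _write, _writev, _final
Reading and writing                            Duplex     _read, _write, _writev, _final
Operate on written data, then read the result  Transform  _transform, _flush, _final

The implementation code for a stream should never call the "public" methods of a stream that are intended for use by consumers (as described in the API for Stream Consumers section). Doing so may lead to adverse side effects in application code consuming the stream.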

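Simplified Construction#

For many simple cases, it is possible to construct a stream without relying on inheritance. This can be accomplished by directly creating instances of the stream.Writable, stream.Readable, stream.Duplex or stream.Transform objects and passing the appropriate methods as constructor options.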

const { Writable } = require('stream');

const myWritable = new Writable({
  write(chunk, encoding, callback) {
    // ...
  }
});

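Implementing a Writable Stream#

The stream.Writable class is extended to implement a Writable stream.

Custom Writable streams must call the new stream.Writable([options]) constructor and implement the writable._write() method. The writable._writev() method may also be implemented.

new stream.Writable([options])#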

  • options <Object>

    • highWaterMark <number> Buffer level when stream.write() starts returning false. Default: 16384 (16kb), or 16 for objectMode streams.
    • decodeStrings <boolean> Whether to encode strings passed to stream.write() to Buffers (with the encoding specified in the stream.write() call) before passing them to stream._write(). Other types of data are not converted (i.e. Buffers are not decoded into strings). Setting to false will prevent strings from being converted. Default: true.
    • defaultEncoding <string> The default encoding that is used when no encoding is specified as an argument to stream.write(). Default: 'utf8'.
    • objectMode <boolean> Whether or not the stream.write(anyObj) is a valid operation. When set, it becomes possible to write JavaScript values other than string, Buffer or Uint8Array if supported by the stream implementation. Default: false.
    • emitClose <boolean> Whether or not the stream should emit 'close' after it has been destroyed. Default: true.
    • write <Function> Implementation for the stream._write() method.
    • writev <Function> Implementation for the stream._writev() method.
    • destroy <Function> Implementation for the stream._destroy() method.
    • final <Function> Implementation for the stream._final() method.
    • autoDestroy <boolean> Whether this stream should automatically call .destroy() on itself after ending. Default: false.

const { Writable } = require('stream');

class MyWritable extends Writable {
  constructor(options) {
    // Calls the stream.Writable() constructor
    super(options);
    // ...
  }
}

Or, when using pre-ES6 style constructors:

const { Writable } = require('stream');
const util = require('util');

function MyWritable(options) {
  if (!(this instanceof MyWritable))
    return new MyWritable(options);
  Writable.call(this, options);
}
util.inherits(MyWritable, Writable);

Or, using the Simplified Constructor approach:

const { Writable } = require('stream');

const myWritable = new Writable({
  write(chunk, encoding, callback) {
    // ...
  },
  writev(chunks, callback) {
    // ...
  }
});

writable._write(chunk, encoding, callback)#

  • chunk <Buffer> | <string> | <any> The Buffer to be written, converted from the string passed to stream.write(). If the stream's decodeStrings option is false or the stream is operating in object mode, the chunk will not be converted and will be whatever was passed to stream.write().
  • encoding <string> If the chunk is a string, then encoding is the character encoding of that string. If chunk is a Buffer, or if the stream is operating in object mode, encoding may be ignored.
  • callback <Function> Call this function (optionally with an error argument) when processing is complete for the supplied chunk.

All Writable stream implementations must provide a writable._write() method to send data to the underlying resource.

Transform streams provide their own implementation of the writable._write().

This function MUST NOT be called by application code directly. It should be implemented by child classes, and called by the internal Writable class methods only.

The callback method must be called to signal either that the write completed successfully or failed with an error. The first argument passed to the callback must be the Error object if the call failed or null if the write succeeded.

All calls to writable.write() that occur between the time writable._write() is called and the callback is called will cause the written data to be buffered. When the callback is invoked, the stream might emit a 'drain' event. If a stream implementation is capable of processing multiple chunks of data at once, the writable._writev() method should be implemented.

If the decodeStrings property is explicitly set to false in the constructor options, then chunk will remain the same object that is passed to .write(), and may be a string rather than a Buffer. This is to support implementations that have an optimized handling for certain string data encodings. In that case, the encoding argument will indicate the character encoding of the string. Otherwise, the encoding argument can be safely ignored.

The writable._write() method is prefixed with an underscore because it is internal to the class that defines it, and should never be called directly by user programs.
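
The buffering and 'drain' behavior described above can be sketched as follows. This is a minimal illustration, not a reference implementation: the SlowSink class and its stored array are hypothetical names, and the asynchronous resource is simulated with setImmediate().

```javascript
const { Writable } = require('stream');

// Hypothetical sink that "stores" each chunk asynchronously.
class SlowSink extends Writable {
  constructor(options) {
    super(options);
    this.stored = [];
  }

  _write(chunk, encoding, callback) {
    // Simulate an asynchronous resource and report completion
    // through the callback (null means the write succeeded).
    setImmediate(() => {
      this.stored.push(chunk.toString());
      callback(null);
    });
  }
}

const sink = new SlowSink({ highWaterMark: 4 });

// 'ab' (2 bytes) keeps the buffered total below the 4-byte highWaterMark.
const first = sink.write('ab');
// 'cd' brings the buffered total to 4 bytes, so write() returns false.
const second = sink.write('cd');

console.log(first, second); // true false

sink.once('drain', () => {
  // Both pending chunks have been processed; writing may continue.
  sink.end('ef');
});
```

The second chunk is queued internally while the first _write() call is still pending, which is why _write() never has to handle overlapping calls.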

writable._writev(chunks, callback)#

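  • chunks <Object[]> The chunks to be written. Each chunk has the following format: { chunk: ..., encoding: ... }.
  • callback <Function> A callback function to be invoked when processing is complete for all of the supplied chunks.

This function MUST NOT be called by application code directly. It should be implemented by child classes, and called by the internal Writable class methods only.

The writable._writev() method can process multiple chunks of data at once. If implemented, the method will be called with all chunks of data currently buffered in the write queue.

The writable._writev() method is prefixed with an underscore because it is internal to the class that defines it, and should never be called directly by user programs.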
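
As a rough sketch of how a batch reaches writev(), the example below corks the stream so that several writes are buffered and then flushed together. The batches array and the batchingWritable name are illustrative assumptions, not part of the stream API.

```javascript
const { Writable } = require('stream');

// Records each call made to the underlying "resource".
const batches = [];

const batchingWritable = new Writable({
  write(chunk, encoding, callback) {
    // Used when only a single chunk needs to be written.
    batches.push([chunk.toString()]);
    callback();
  },
  writev(chunks, callback) {
    // Each entry has the shape { chunk, encoding }.
    batches.push(chunks.map(({ chunk }) => chunk.toString()));
    callback();
  }
});

// cork() forces the following writes into the internal buffer;
// uncork() flushes them, handing all buffered chunks to writev().
batchingWritable.cork();
batchingWritable.write('a');
batchingWritable.write('b');
batchingWritable.write('c');
batchingWritable.uncork();

console.log(batches); // [ [ 'a', 'b', 'c' ] ]
```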

writable._destroy(err, callback)#

The _destroy() method is called by writable.destroy(). It can be overridden by child classes, but it must not be called directly.
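
A minimal sketch of overriding _destroy() to release a resource is shown below. The ResourceWritable class and its resource field are hypothetical; a real stream might hold a file descriptor or a socket instead.

```javascript
const { Writable } = require('stream');

class ResourceWritable extends Writable {
  constructor(options) {
    super(options);
    // Hypothetical resource handle owned by this stream.
    this.resource = { open: true };
  }

  _write(chunk, encoding, callback) {
    callback();
  }

  _destroy(err, callback) {
    // Release the resource, then forward the original error (if any).
    this.resource.open = false;
    callback(err);
  }
}

const resourceWritable = new ResourceWritable();

// Application code calls destroy(); _destroy() is invoked internally.
resourceWritable.destroy();

console.log(resourceWritable.resource.open); // false
console.log(resourceWritable.destroyed);     // true
```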

writable._final(callback)#

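  • callback <Function> A callback function to call when all remaining data has been written.

The _final() method must not be called directly. It should be implemented by child classes, and called by the internal Writable class methods only.

This function is called before the stream closes, and the 'finish' event is emitted only after callback has been called. It is useful for closing resources or writing buffered data before the stream ends.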
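
The ordering described above can be observed with a small sketch that records each step. The events array and the finalWritable name are assumptions made for illustration only.

```javascript
const { Writable } = require('stream');

const events = [];

const finalWritable = new Writable({
  write(chunk, encoding, callback) {
    events.push(`write:${chunk.toString()}`);
    callback();
  },
  final(callback) {
    // Runs after the last write has completed and before 'finish'.
    events.push('final');
    callback();
  }
});

finalWritable.on('finish', () => {
  events.push('finish');
  console.log(events.join(' -> '));
  // write:a -> write:b -> final -> finish
});

finalWritable.write('a');
finalWritable.end('b');
```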

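Errors While Writing#

Errors that occur while writable._write() and writable._writev() are executing should be reported by invoking the callback and passing the error as the first argument. This causes the Writable to emit an 'error' event. Throwing an Error from within writable._write() can result in unexpected behavior. Using the callback ensures consistent and predictable handling of errors.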

const { Writable } = require('stream');

const myWritable = new Writable({
  write(chunk, encoding, callback) {
    if (chunk.toString().indexOf('a') >= 0) {
      callback(new Error('chunk is invalid'));
    } else {
      callback();
    }
  }
});

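An Example Writable Stream#

The following illustrates a rather simplistic (and somewhat pointless) custom Writable stream implementation. While this specific Writable stream instance has no real practical use, the example illustrates each of the required elements of a custom Writable stream instance: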

const { Writable } = require('stream');

class MyWritable extends Writable {
  _write(chunk, encoding, callback) {
    if (chunk.toString().indexOf('a') >= 0) {
      callback(new Error('chunk is invalid'));
    } else {
      callback();
    }
  }
}

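Decoding buffers in a Writable Stream#

Decoding buffers is a common task, for instance, when using transformers whose input is a string. It matters particularly with a multi-byte character encoding such as UTF-8, because a single character may be split across two chunks. The following example shows how to decode multi-byte strings using StringDecoder and Writable.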

const { Writable } = require('stream');
const { StringDecoder } = require('string_decoder');

class StringWritable extends Writable {
  constructor(options) {
    super(options);
    this._decoder = new StringDecoder(options && options.defaultEncoding);
    this.data = '';
  }
  _write(chunk, encoding, callback) {
    if (encoding === 'buffer') {
      chunk = this._decoder.write(chunk);
    }
    this.data += chunk;
    callback();
  }
  _final(callback) {
    this.data += this._decoder.end();
    callback();
  }
}

const euro = [[0xE2, 0x82], [0xAC]].map(Buffer.from);
const w = new StringWritable();

w.write('currency: ');
w.write(euro[0]);
w.end(euro[1]);

console.log(w.data); // currency: €

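Implementing a Readable Stream#

The stream.Readable class is extended to implement a Readable stream.

Custom Readable streams must call the new stream.Readable([options]) constructor and implement the readable._read() method.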

new stream.Readable([options])#

  • options <Object>

    • highWaterMark <number> The maximum number of bytes to store in the internal buffer before ceasing to read from the underlying resource. Default: 16384 (16kb), or 16 for objectMode streams.
    • encoding <string> If specified, then buffers will be decoded to strings using the specified encoding. Default: null.
    • objectMode <boolean> Whether this stream should behave as a stream of objects. Meaning that stream.read(n) returns a single value instead of a Buffer of size n. Default: false.
    • read <Function> Implementation for the stream._read() method.
    • destroy <Function> Implementation for the stream._destroy() method.
    • autoDestroy <boolean> Whether this stream should automatically call .destroy() on itself after ending. Default: false.

const { Readable } = require('stream');

class MyReadable extends Readable {
  constructor(options) {
    // Calls the stream.Readable(options) constructor
    super(options);
    // ...
  }
}

Or, when using pre-ES6 style constructors:

const { Readable } = require('stream');
const util = require('util');

function MyReadable(options) {
  if (!(this instanceof MyReadable))
    return new MyReadable(options);
  Readable.call(this, options);
}
util.inherits(MyReadable, Readable);

Or, using the Simplified Constructor approach:

const { Readable } = require('stream');

const myReadable = new Readable({
  read(size) {
    // ...
  }
});

readable._read(size)#

  • size <number> Number of bytes to read asynchronously

This function MUST NOT be called by application code directly. It should be implemented by child classes, and called by the internal Readable class methods only.

All Readable stream implementations must provide an implementation of the readable._read() method to fetch data from the underlying resource.

When readable._read() is called, if data is available from the resource, the implementation should begin pushing that data into the read queue using the this.push(dataChunk) method. _read() should continue reading from the resource and pushing data until readable.push() returns false. Only when _read() is called again after it has stopped should it resume pushing additional data onto the queue.

Once the readable._read() method has been called, it will not be called again until the readable.push() method is called.

The size argument is advisory. Implementations where a "read" is a single operation that returns data can use the size argument to determine how much data to fetch. Other implementations may ignore this argument and simply provide data whenever it becomes available. There is no need to "wait" until size bytes are available before calling stream.push(chunk).

The readable._read() method is prefixed with an underscore because it is internal to the class that defines it, and should never be called directly by user programs.
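
The push-until-false contract described above can be sketched with an in-memory source. The ArraySource class and its _items field are hypothetical stand-ins for a real underlying resource, and the advisory size argument is ignored.

```javascript
const { Readable } = require('stream');

// Hypothetical in-memory source standing in for an underlying resource.
class ArraySource extends Readable {
  constructor(items, options) {
    super(options);
    this._items = items.slice();
  }

  _read(size) {
    // Keep pushing while data is available and push() returns true.
    while (this._items.length > 0) {
      if (!this.push(this._items.shift())) {
        // The internal buffer is full; resume on the next _read() call.
        return;
      }
    }
    // Nothing left to read: signal end-of-stream.
    this.push(null);
  }
}

const source = new ArraySource(['one', 'two', 'three']);
const received = [];

source.on('data', (chunk) => received.push(chunk));
source.on('end', () => {
  console.log(Buffer.concat(received).toString()); // onetwothree
});
```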

readable._destroy(err, callback)#

The _destroy() method is called by readable.destroy(). It can be overridden by child classes, but it must not be called directly.

readable.push(chunk[, encoding])#

  • chunk <Buffer> | <Uint8Array> | <string> | <null> | <any> Chunk of data to push into the read queue. For streams not operating in object mode, chunk must be a string, Buffer or Uint8Array. For object mode streams, chunk may be any JavaScript value.
  • encoding <string> Encoding of string chunks. Must be a valid Buffer encoding, such as 'utf8' or 'ascii'.
  • Returns: <boolean> true if additional chunks of data may continue to be pushed; false otherwise.

When chunk is a Buffer, Uint8Array or string, the chunk of data will be added to the internal queue for users of the stream to consume. Passing chunk as null signals the end of the stream (EOF), after which no more data can be written.

When the Readable is operating in paused mode, the data added with readable.push() can be read out by calling the readable.read() method when the 'readable' event is emitted.

When the Readable is operating in flowing mode, the data added with readable.push() will be delivered by emitting a 'data' event.

The readable.push() method is designed to be as flexible as possible. For example, when wrapping a lower-level source that provides some form of pause/resume mechanism, and a data callback, the low-level source can be wrapped by the custom Readable instance:

// `_source` is an object with readStop() and readStart() methods,
// and an `ondata` member that gets called when it has data, and
// an `onend` member that gets called when the data is over.

class SourceWrapper extends Readable {
  constructor(options) {
    super(options);

    this._source = getLowLevelSourceObject();

    // Every time there's data, push it into the internal buffer.
    this._source.ondata = (chunk) => {
      // If push() returns false, then stop reading from source
      if (!this.push(chunk))
        this._source.readStop();
    };

    // When the source ends, push the EOF-signaling `null` chunk
    this._source.onend = () => {
      this.push(null);
    };
  }
  // _read will be called when the stream wants to pull more data in
  // the advisory size argument is ignored in this case.
  _read(size) {
    this._source.readStart();
  }
}

The readable.push() method is intended to be called only by Readable implementers, and only from within the readable._read() method.

For streams not operating in object mode, if the chunk parameter of readable.push() is undefined, it will be treated as empty string or buffer. See readable.push('') for more information.

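Errors While Reading#

Errors that occur while readable._read() is executing should be reported by emitting an 'error' event rather than by throwing. Throwing an Error from within readable._read() can result in unexpected behavior. Using the 'error' event ensures consistent and predictable handling of errors.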

const { Readable } = require('stream');

const myReadable = new Readable({
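  read(size) {
    // checkSomeErrorCondition() is a placeholder that returns an Error
    // object when something went wrong, or a falsy value otherwise.
    const err = checkSomeErrorCondition();
    if (err) {
      process.nextTick(() => this.emit('error', err));
      return;
    }
    // Do some work.
  }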
});

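An Example Counting Stream#

The following is a basic example of a Readable stream that emits the numerals from 1 to 1,000,000 in ascending order, and then ends: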

const { Readable } = require('stream');

class Counter extends Readable {
  constructor(opt) {
    super(opt);
    this._max = 1000000;
    this._index = 1;
  }

  _read() {
    const i = this._index++;
    if (i > this._max)
      this.push(null);
    else {
      const str = String(i);
      const buf = Buffer.from(str, 'ascii');
      this.push(buf);
    }
  }
}

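Implementing a Duplex Stream#

A Duplex stream is one that implements both Readable and Writable, such as a TCP socket connection. Because JavaScript does not support multiple inheritance, the stream.Duplex class is extended to implement a Duplex stream (as opposed to extending the stream.Readable and stream.Writable classes).

The stream.Duplex class prototypically inherits from stream.Readable and parasitically from stream.Writable, but instanceof works properly for both base classes because Symbol.hasInstance is overridden on stream.Writable.

Custom Duplex streams must call the new stream.Duplex([options]) constructor and implement both the readable._read() and writable._write() methods.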

new stream.Duplex(options)#

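  • options <Object> Passed to both the Writable and Readable constructors. Also has the following fields:

    • allowHalfOpen <boolean> If set to false, then the stream will automatically end the writable side when the readable side ends. Default: true.
    • readableObjectMode <boolean> Sets objectMode for the readable side of the stream. Has no effect if objectMode is true. Default: false.
    • writableObjectMode <boolean> Sets objectMode for the writable side of the stream. Has no effect if objectMode is true. Default: false.
    • readableHighWaterMark <number> Sets highWaterMark for the readable side of the stream. Has no effect if highWaterMark is provided.
    • writableHighWaterMark <number> Sets highWaterMark for the writable side of the stream. Has no effect if highWaterMark is provided.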

const { Duplex } = require('stream');

class MyDuplex extends Duplex {
  constructor(options) {
    super(options);
    // ...
  }
}

Or, when using pre-ES6 style constructors:

const { Duplex } = require('stream');
const util = require('util');

function MyDuplex(options) {
  if (!(this instanceof MyDuplex))
    return new MyDuplex(options);
  Duplex.call(this, options);
}
util.inherits(MyDuplex, Duplex);

Or, using the Simplified Constructor approach:

const { Duplex } = require('stream');

const myDuplex = new Duplex({
  read(size) {
    // ...
  },
  write(chunk, encoding, callback) {
    // ...
  }
});
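
As a small sketch of the per-side options listed above, the following Duplex uses different highWaterMark values for its two sides and puts only the readable side into object mode. The variable name and the chosen values are arbitrary, and the read and write implementations are deliberately empty placeholders.

```javascript
const { Duplex } = require('stream');

const configuredDuplex = new Duplex({
  readableObjectMode: true,    // The readable side carries arbitrary values.
  readableHighWaterMark: 4,    // Counted in objects on the readable side.
  writableHighWaterMark: 1024, // Counted in bytes on the writable side.
  read(size) {
    // Placeholder: a real implementation would push data here.
  },
  write(chunk, encoding, callback) {
    // Placeholder: accept and discard the chunk.
    callback();
  }
});

console.log(configuredDuplex.readableHighWaterMark); // 4
console.log(configuredDuplex.writableHighWaterMark); // 1024
```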

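An Example Duplex Stream#

The following illustrates a simple example of a Duplex stream that wraps a hypothetical lower-level source object to which data can be written and from which data can be read.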

const { Duplex } = require('stream');
const kSource = Symbol('source');

class MyDuplex extends Duplex {
  constructor(source, options) {
    super(options);
    this[kSource] = source;
  }

  _write(chunk, encoding, callback) {
    // The underlying source only deals with strings.
    if (Buffer.isBuffer(chunk))
      chunk = chunk.toString();
    this[kSource].writeSomeData(chunk);
    callback();
  }

  _read(size) {
    this[kSource].fetchSomeData(size, (data, encoding) => {
      this.push(Buffer.from(data, encoding));
    });
  }
}

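The most important aspect of a Duplex stream is that the Readable and Writable sides operate independently of one another despite co-existing within a single object instance.

Object Mode Duplex Streams#

For Duplex streams, objectMode can be set exclusively for either the Readable or Writable side using the readableObjectMode and writableObjectMode options respectively.

In the following example, a new Transform stream (which is a type of Duplex stream) is created with an object mode Writable side that accepts JavaScript numbers, which are converted to hexadecimal strings on the Readable side.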

const { Transform } = require('stream');

// All Transform streams are also Duplex streams.
const myTransform = new Transform({
  writableObjectMode: true,

  transform(chunk, encoding, callback) {
    // Coerce the chunk to a number if needed.
    chunk |= 0;

    // Convert the chunk to a hexadecimal string.
    const data = chunk.toString(16);

    // Push the data onto the readable queue.
    callback(null, '0'.repeat(data.length % 2) + data);
  }
});

myTransform.setEncoding('ascii');
myTransform.on('data', (chunk) => console.log(chunk));

myTransform.write(1);
// Prints: 01
myTransform.write(10);
// Prints: 0a
myTransform.write(100);
// Prints: 64

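Implementing a Transform Stream#

A Transform stream is a Duplex stream where the output is computed in some way from the input. Examples include zlib streams or crypto streams that compress, encrypt, or decrypt data.

There is no requirement that the output be the same size as the input or contain the same number of chunks. For example, a Hash stream only ever produces a single chunk of output, which is provided when the input ends, while a zlib stream produces output that may be much larger or much smaller than its input. The stream.Transform class is extended to implement a Transform stream.

The stream.Transform class inherits from stream.Duplex and implements its own versions of the writable._write() and readable._read() methods. Custom Transform implementations must implement the transform._transform() method and may also implement the transform._flush() method.

Care must be taken when using Transform streams: if the output on the Readable side is not consumed, data written to the stream can cause the Writable side to become paused.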

new stream.Transform([options])#

const { Transform } = require('stream');

class MyTransform extends Transform {
  constructor(options) {
    super(options);
    // ...
  }
}

Or, when using pre-ES6 style constructors:

const { Transform } = require('stream');
const util = require('util');

function MyTransform(options) {
  if (!(this instanceof MyTransform))
    return new MyTransform(options);
  Transform.call(this, options);
}
util.inherits(MyTransform, Transform);

Or, using the Simplified Constructor approach:

const { Transform } = require('stream');

const myTransform = new Transform({
  transform(chunk, encoding, callback) {
    // ...
  }
});

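Events: 'finish' and 'end'#

The 'finish' and 'end' events come from the stream.Writable and stream.Readable classes, respectively. The 'finish' event is emitted after stream.end() is called and all chunks have been processed by stream._transform(). The 'end' event is emitted after all data has been output, which occurs after the callback in transform._flush() has been called.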
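
The two events can be watched on a single Transform with a sketch such as the one below. The record() helper and the observed array are illustrative assumptions. The sketch deliberately makes no claim about the relative order of 'finish' and 'end'; it only shows that 'finish' follows the last transform() call and that 'end' follows flush().

```javascript
const { Transform } = require('stream');

const observed = [];

// Hypothetical helper that logs a step and remembers it.
function record(step) {
  observed.push(step);
  console.log(step);
}

const upperCase = new Transform({
  transform(chunk, encoding, callback) {
    record('transform');
    callback(null, chunk.toString().toUpperCase());
  },
  flush(callback) {
    record('flush');
    callback();
  }
});

upperCase.on('finish', () => record('finish'));
upperCase.on('end', () => record('end'));

// Consume the readable side; without a consumer, 'end' is never emitted.
upperCase.resume();

upperCase.write('a');
upperCase.end('b');
```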

transform._flush(callback)#

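  • callback <Function> A callback function to be called when the remaining data has been flushed.

This function MUST NOT be called by application code directly. It should be implemented by child classes, and called by the internal Readable class methods only.

In some cases, a transform operation may need to emit an additional bit of data at the end of the stream. For example, a zlib compression stream stores some internal state used to optimize its output. When the stream ends, that additional data needs to be flushed so that the compressed data is complete.

Custom Transform implementations may implement the transform._flush() method. It is called when there is no more written data to be consumed, but before the 'end' event is emitted to signal the end of the Readable stream.

Within the transform._flush() implementation, the readable.push() method may be called zero or more times, as appropriate. The callback function must be called when the flush operation is complete.

The transform._flush() method is prefixed with an underscore because it is internal to the class that defines it, and should never be called directly by user programs.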
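
A typical use of _flush() is emitting data that was held back while waiting for more input. The sketch below splits text into lines and keeps the trailing partial line until the stream ends. The LineSplitter class is hypothetical, and the sketch assumes ASCII input so that chunk.toString() never splits a multi-byte character.

```javascript
const { Transform } = require('stream');

// Hypothetical line splitter: emits one string per complete line.
class LineSplitter extends Transform {
  constructor(options) {
    // Emit each line as a separate value on the readable side.
    super({ ...options, readableObjectMode: true });
    this._pending = '';
  }

  _transform(chunk, encoding, callback) {
    const lines = (this._pending + chunk.toString()).split('\n');
    // The last element is an incomplete line; keep it for later.
    this._pending = lines.pop();
    for (const line of lines) this.push(line);
    callback();
  }

  _flush(callback) {
    // No more input will arrive, so emit whatever is left.
    if (this._pending.length > 0) this.push(this._pending);
    callback();
  }
}

const splitter = new LineSplitter();
const lines = [];

splitter.on('data', (line) => lines.push(line));
splitter.on('end', () => console.log(lines)); // [ 'alpha', 'beta', 'gamma' ]

splitter.write('alpha\nbe');
splitter.end('ta\ngamma');
```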

transform._transform(chunk, encoding, callback)#

  • chunk <Buffer> | <string> | <any> The Buffer to be transformed, converted from the string passed to stream.write(). If the stream's decodeStrings option is false or the stream is operating in object mode, the chunk will not be converted and will be whatever was passed to stream.write().
  • encoding <string> If the chunk is a string, then this is the encoding type. If chunk is a buffer, then this is the special value 'buffer' and can be ignored.
  • callback <Function> A callback function (optionally with an error argument and data) to be called after the supplied chunk has been processed.

This function MUST NOT be called by application code directly. It should be implemented by child classes, and called by the internal Readable class methods only.

All Transform stream implementations must provide a _transform() method to accept input and produce output. The transform._transform() implementation handles the bytes being written, computes an output, then passes that output off to the readable portion using the readable.push() method.

The transform.push() method may be called zero or more times to generate output from a single input chunk, depending on how much is to be output as a result of the chunk.

It is possible that no output is generated from any given chunk of input data.

The callback function must be called only when the current chunk is completely consumed. The first argument passed to the callback must be an Error object if an error occurred while processing the input or null otherwise. If a second argument is passed to the callback, it will be forwarded on to the readable.push() method. In other words, the following are equivalent:

transform.prototype._transform = function(data, encoding, callback) {
  this.push(data);
  callback();
};

transform.prototype._transform = function(data, encoding, callback) {
  callback(null, data);
};

The transform._transform() method is prefixed with an underscore because it is internal to the class that defines it, and should never be called directly by user programs.

transform._transform() is never called in parallel; streams implement a queue mechanism, and to receive the next chunk, callback must be called, either synchronously or asynchronously.
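
The queueing behavior can be checked with a sketch that counts how many transform() calls are in flight at once. The counters and the 5 ms delay are arbitrary choices made for illustration.

```javascript
const { Transform } = require('stream');

let active = 0;
let maxActive = 0;

const sequential = new Transform({
  transform(chunk, encoding, callback) {
    active++;
    maxActive = Math.max(maxActive, active);
    // Complete asynchronously; the next chunk is not delivered
    // until this callback has been invoked.
    setTimeout(() => {
      active--;
      callback(null, chunk);
    }, 5);
  }
});

// Consume the output so the stream can run to completion.
sequential.resume();
sequential.on('end', () => {
  console.log(`max concurrent transform calls: ${maxActive}`);
  // max concurrent transform calls: 1
});

sequential.write('a');
sequential.write('b');
sequential.end('c');
```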

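Class: stream.PassThrough#

The stream.PassThrough class is a trivial implementation of a Transform stream that simply passes the input bytes across to the output. It is primarily intended for examples and testing, but it is sometimes useful as a building block for novel sorts of streams.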
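
One possible use of PassThrough, sketched below, is as a tap that observes data without changing it. The tap, byteCount, and seen names are assumptions made for this example.

```javascript
const { PassThrough } = require('stream');

const tap = new PassThrough();
const seen = [];
let byteCount = 0;

tap.on('data', (chunk) => {
  // The chunk arrives unchanged; only inspect it.
  byteCount += chunk.length;
  seen.push(chunk);
});

tap.on('end', () => {
  console.log(Buffer.concat(seen).toString()); // hello world
  console.log(byteCount);                      // 11
});

tap.write('hello ');
tap.end('world');
```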

Additional Notes#

Compatibility with Older Node.js Versions#

Prior to Node.js 0.10, the Readable stream interface was simpler, but also less powerful and less useful.

  • Rather than waiting for calls to the stream.read() method, 'data' events would begin emitting immediately. Applications that would need to perform some amount of work to decide how to handle data were required to store read data into buffers so the data would not be lost.
  • The stream.pause() method was advisory, rather than guaranteed. This meant that it was still necessary to be prepared to receive 'data' events even when the stream was in a paused state.

In Node.js 0.10, the Readable class was added. For backward compatibility with older Node.js programs, Readable streams switch into "flowing mode" when a 'data' event handler is added, or when the stream.resume() method is called. The effect is that, even when not using the new stream.read() method and 'readable' event, it is no longer necessary to worry about losing 'data' chunks.

While most applications will continue to function normally, this introduces an edge case in the following conditions:

  • No 'data' event listener is added.
  • The stream.resume() method is never called.
  • The stream is not piped to any writable destination.

For example, consider the following code:

// WARNING!  BROKEN!
net.createServer((socket) => {

  // We add an 'end' listener, but never consume the data
  socket.on('end', () => {
    // It will never get here.
    socket.end('The message was received but was not processed.\n');
  });

}).listen(1337);

Prior to Node.js 0.10, the incoming message data would be simply discarded. However, in Node.js 0.10 and beyond, the socket remains paused forever.

The workaround in this situation is to call the stream.resume() method to begin the flow of data:

// Workaround
net.createServer((socket) => {
  socket.on('end', () => {
    socket.end('The message was received but was not processed.\n');
  });

  // Start the flow of data, discarding it.
  socket.resume();
}).listen(1337);

In addition to new Readable streams switching into flowing mode, pre-0.10 style streams can be wrapped in a Readable class using the readable.wrap() method.

readable.read(0)#

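In some cases, it is necessary to trigger a refresh of the underlying readable stream mechanisms without actually consuming any data. In such cases, it is possible to call readable.read(0), which returns null.

If the internal read buffer is below the highWaterMark, and the stream is not currently reading, then calling stream.read(0) triggers a low-level stream._read() call.

While most applications will almost never need to do this, there are situations within Node.js where this is done, particularly in the Readable stream class internals.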
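
The effect of read(0) can be observed with a sketch that counts calls to the read implementation. The idle stream and the readCalls counter are hypothetical, and the read implementation intentionally pushes nothing.

```javascript
const { Readable } = require('stream');

let readCalls = 0;

const idle = new Readable({
  read(size) {
    // A real implementation would fetch data here; this sketch only counts.
    readCalls++;
  }
});

// The buffer is empty (below highWaterMark) and no read is in progress,
// so read(0) triggers the underlying read() without consuming anything.
const result = idle.read(0);

console.log(result);    // null
console.log(readCalls); // 1
```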

readable.push('')#

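Use of readable.push('') is not recommended.

Pushing a zero-byte string, Buffer or Uint8Array to a stream that is not in object mode has a side effect. Because it is a call to readable.push(), the call will end the reading process. However, because the argument is an empty string, no data is added to the readable buffer, so there is nothing for a user to consume.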

highWaterMark discrepancy after calling readable.setEncoding()#

The use of readable.setEncoding() will change the behavior of how the highWaterMark operates in non-object mode.

Typically, the size of the current buffer is measured against the highWaterMark in bytes. However, after setEncoding() is called, the comparison function will begin to measure the buffer's size in characters.

This is not a problem in common cases with latin1 or ascii. But it is advised to be mindful about this behavior when working with strings that could contain multi-byte characters.
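
The difference can be made visible by pushing the same multi-byte data into two streams and comparing readable.readableLength. This is a minimal sketch; the asBytes and asChars names are assumptions, and the empty read implementations are placeholders.

```javascript
const { Readable } = require('stream');

// Two characters that occupy six bytes in UTF-8.
const euros = Buffer.from('€€');

// Without setEncoding(), the buffered size is measured in bytes.
const asBytes = new Readable({ read() {} });
asBytes.push(euros);

// After setEncoding(), the buffered size is measured in characters.
const asChars = new Readable({ read() {} });
asChars.setEncoding('utf8');
asChars.push(euros);

console.log(asBytes.readableLength); // 6
console.log(asChars.readableLength); // 2
```

With multi-byte text, a stream that has an encoding set can therefore hold more bytes than its highWaterMark value suggests.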