Node.js v6.10.3 文档


目录

stream (流)#

查看英文版 / 参与翻译

稳定性: 2 - 稳定的

流(stream)在 Node.js 中是处理流数据的抽象接口(abstract interface)。 stream 模块提供了基础的 API 。使用这些 API 可以很容易地来构建实现流接口的对象。

Node.js 提供了多种流对象。 例如, HTTP 请求process.stdout 就都是流的实例。

流可以是可读的、可写的,或是可读写的。所有的流都是 EventEmitter 的实例。

stream 模块可以通过以下方式引入:

const stream = require('stream');

尽管所有的 Node.js 用户都应该理解流的工作方式,这点很重要, 但是 stream 模块本身只对于那些需要创建新的流的实例的开发者最有用处。 对于主要是 消费 流的开发者来说,他们很少(如果有的话)需要直接使用 stream 模块。

本文档的组织#

查看英文版 / 参与翻译

本文档主要分为两节,第三节是一些额外的注意事项。第一节阐述了在应用中和 使用 流相关的 API 。 第二节阐述了和 实现 新的流类型相关的 API 。

流的类型#

查看英文版 / 参与翻译

Node.js 中有四种基本的流类型:

对象模式#

查看英文版 / 参与翻译

所有使用 Node.js API 创建的流对象都只能操作 strings 和 Buffer 对象。但是,通过一些第三方流的实现,你依然能够处理其它类型的 JavaScript 值 (除了 null,它在流处理中有特殊意义)。 这些流被认为是工作在 “对象模式”(object mode)。

在创建流的实例时,可以通过 objectMode 选项使流的实例切换到对象模式。试图将已经存在的流切换到对象模式是不安全的。

缓冲#

查看英文版 / 参与翻译

WritableReadable 流都会将数据存储到内部的缓存(buffer)中。这些缓存可以 通过相应的 writable._writableState.getBuffer()readable._readableState.buffer来获取。

缓存的大小取决于传递给流构造函数的 highWaterMark 选项。 对于普通的流, highWaterMark 选项指定了总共的字节数。对于工作在对象模式的流, highWaterMark 指定了对象的总数。

当可读流的实现调用 stream.push(chunk) 方法时,数据被放到缓存中。如果流的消费者 没有调用 stream.read() 方法, 这些数据会始终存在于内部队列中,直到被消费。

当内部可读缓存的大小达到 highWaterMark 指定的阈值时,流会暂停从底层资源读取数据,直到当前 缓存的数据被消费 (也就是说, 流会在内部停止调用 readable._read() 来填充可读缓存)。

可写流通过反复调用 writable.write(chunk) 方法将数据放到缓存。 当内部可写缓存的总大小小于 highWaterMark 指定的阈值时, 调用 writable.write() 将返回true。 一旦内部缓存的大小达到或超过 highWaterMark ,调用 writable.write() 将返回 false

stream API 的关键目标, 尤其对于 stream.pipe() 方法, 就是限制缓存数据大小,以达到可接受的程度。这样,对于读写速度不匹配的源头和目标,就不会超出可用的内存大小。

DuplexTransform 都是可读写的。 在内部,它们都维护了 两个 相互独立的缓存用于读和写。 在维持了合理高效的数据流的同时,也使得对于读和写可以独立进行而互不影响。 例如, net.Socket 就是 Duplex 的实例,它的可读端可以消费从套接字(socket)中接收的数据, 可写端则可以将数据写入到套接字。 由于数据写入到套接字中的速度可能比从套接字接收数据的速度快或者慢, 在读写两端使用独立缓存,并进行独立操作就显得很重要了。

流消费者的 API#

查看英文版 / 参与翻译

几乎所有的 Node.js 应用,不管多么简单,都在某种程度上使用了流。 下面是在 Node.js 应用中使用流实现的一个简单的 HTTP 服务器:

const http = require('http');

const server = http.createServer( (req, res) => {
  // req 是 http.IncomingMessage 的实例,这是一个 Readable Stream
  // res 是 http.ServerResponse 的实例,这是一个 Writable Stream

  let body = '';
  // 接收数据为 utf8 字符串,
  // 如果没有设置字符编码,将接收到 Buffer 对象。
  req.setEncoding('utf8');

  // 如果监听了 'data' 事件,Readable streams 触发 'data' 事件 
  req.on('data', (chunk) => {
    body += chunk;
  });

  // end 事件表明整个 body 都接收完毕了 
  req.on('end', () => {
    try {
      const data = JSON.parse(body);
      // 发送一些信息给用户
      res.write(typeof data);
      res.end();
    } catch (er) {
      // json 数据解析失败 
      res.statusCode = 400;
      return res.end(`error: ${er.message}`);
    }
  });
});

server.listen(1337);

// $ curl localhost:1337 -d '{}'
// object
// $ curl localhost:1337 -d '"foo"'
// string
// $ curl localhost:1337 -d 'not json'
// error: Unexpected token o

Writable 流 (比如例子中的 res) 暴露了一些方法,比如 write()end() 。这些方法可以将数据写入到流中。

当流中的数据可以读取时,Readable 流使用 EventEmitter API 来通知应用。 这些数据可以使用多种方法从流中读取。

WritableReadable 流都使用了 EventEmitter API ,通过多种方式, 与流的当前状态进行交互。

DuplexTransform 都是同时满足 WritableReadable

对于只是简单写入数据到流和从流中消费数据的应用来说, 不要求直接实现流接口,通常也不需要调用 require('stream')

需要实现两种类型流的开发者可以参考 API for Stream Implementers

可写流#

查看英文版 / 参与翻译

Writable streams 是 destination 的一种抽象,这种 destination 允许数据写入。

Writable 的例子包括了:

注意: 上面的某些例子事实上是 Duplex 流,只是实现了 Writable 接口。

所有 Writable 流都实现了 stream.Writable 类定义的接口。

尽管特定的 Writable 流的实现可能略有差别, 所有的 Writable streams 都可以按一种基本模式进行使用,如下面例子所示:

const myStream = getWritableStreamSomehow();
myStream.write('some data');
myStream.write('some more data');
myStream.end('done writing data');

stream.Writable 类#

查看英文版 / 参与翻译

'close' 事件#

查看英文版 / 参与翻译

'close' 事件将在流或其底层资源(比如一个文件)关闭后触发。'close' 事件触发后,该流将不会再触发任何事件。

不是所有可写流都会触发 'close' 事件。

'drain' 事件#

查看英文版 / 参与翻译

如果调用 stream.write(chunk) 方法返回 false,流将在适当的时机触发 'drain' 事件,这时才可以继续向流中写入数据。

// 向可写流中写入数据一百万次。
// 需要注意背压 (back-pressure)。
function writeOneMillionTimes(writer, data, encoding, callback) {
  let i = 1000000;
  write();
  function write() {
    var ok = true;
    do {
      i--;
      if (i === 0) {
        // 最后 一次
        writer.write(data, encoding, callback);
      } else {
        // 检查是否可以继续写入。 
        // 这里不要传递 callback, 因为写入还没有结束! 
        ok = writer.write(data, encoding);
      }
    } while (i > 0 && ok);
    if (i > 0) {
      // 这里提前停下了, 
      // 'drain' 事件触发后才可以继续写入  
      writer.once('drain', write);
    }
  }
}
'error' 事件#

查看英文版 / 参与翻译

'error' 事件在写入数据出错或者使用管道出错时触发。事件发生时,回调函数仅会接收到一个 Error 参数。

注意: 'error' 事件发生时,流并不会关闭。

'finish' 事件#

查看英文版 / 参与翻译

在调用了 stream.end() 方法,且缓冲区数据都已经传给底层系统(underlying system)之后, 'finish' 事件将被触发。

const writer = getWritableStreamSomehow();
for (var i = 0; i < 100; i ++) {
  writer.write(`hello, #${i}!\n`);
}
writer.end('This is the end\n');
writer.on('finish', () => {
  console.error('All writes are now complete.');
});
'pipe' 事件#

查看英文版 / 参与翻译

  • src <stream.Readable> 输出到目标可写流(writable)的源流(source stream)

在可读流(readable stream)上调用 stream.pipe() 方法,并在目标流向 (destinations) 中添加当前可写流 ( writable ) 时,将会在可写流上触发 'pipe' 事件。

const writer = getWritableStreamSomehow();
const reader = getReadableStreamSomehow();
writer.on('pipe', (src) => {
  console.error('something is piping into the writer');
  assert.equal(src, reader);
});
reader.pipe(writer);
'unpipe' 事件#

查看英文版 / 参与翻译

Readable 上调用 stream.unpipe() 方法,从目标流向中移除当前 Writable 时,将会触发 'unpipe' 事件。

const writer = getWritableStreamSomehow();
const reader = getReadableStreamSomehow();
writer.on('unpipe', (src) => {
  console.error('Something has stopped piping into the writer.');
  assert.equal(src, reader);
});
reader.pipe(writer);
reader.unpipe(writer);
writable.cork()#

查看英文版 / 参与翻译

调用 writable.cork() 方法将强制所有写入数据都内存中的缓冲区里。 直到调用 stream.uncork()stream.end() 方法时,缓冲区里的数据才会被输出。

在向流中写入大量小块数据(small chunks of data)时,内部缓冲区(internal buffer)可能失效,从而导致性能下降。writable.cork() 方法主要就是用来避免这种情况。 对于这种情况, 实现了 writable._writev() 方法的流可以对写入的数据进行缓冲,从而提高写入效率。

也可查看 writable.uncork()

writable.end([chunk][, encoding][, callback])#

查看英文版 / 参与翻译

  • chunk <String> | <Buffer> | <any> 可选的,需要写入的数据。对于非对象模式下的流, chunk 必须是字符串或者 Buffer。对于对象模式下的流, chunk 可以是任意的 JavaScript 值,除了 null
  • encoding <String> 如果 chunk 是字符串,这里指定字符编码。
  • callback <Function> 可选的,流结束时的回调函数。

调用 writable.end() 方法表明接下来没有数据要被写入 Writable。通过传入可选的 chunkencoding 参数,可以在关闭流之前再写入一段数据。如果传入了可选的 callback 函数,它将作为 'finish' 事件的回调函数。

在调用了 stream.end() 方法之后,再调用 stream.write() 方法将会导致错误。

// 写入 'hello, ' ,并用 'world!' 来结束写入
const file = fs.createWriteStream('example.txt');
file.write('hello, ');
file.end('world!');
// 后面不允许再写入数据!
writable.setDefaultEncoding(encoding)#

查看英文版 / 参与翻译

  • encoding <String> 新的默认编码
  • 返回: this

writable.setDefaultEncoding() 用于为 Writable 设置 encoding

writable.uncork()#

查看英文版 / 参与翻译

writable.uncork() 将输出在 stream.cork() 方法被调用之后缓冲在内存中的所有数据。

如果使用 writable.cork()writable.uncork() 来管理写入缓存,建议使用 process.nextTick() 来延迟调用 writable.uncork() 方法。通过这种方式,可以对单个 Node.js 事件循环中调用的所有 writable.write() 方法进行批处理。

stream.cork();
stream.write('some ');
stream.write('data ');
process.nextTick(() => stream.uncork());

如果一个流多次调用了 writable.cork() 方法,那么也必须调用同样次数的 writable.uncork() 方法以输出缓冲区数据。

stream.cork();
stream.write('some ');
stream.cork();
stream.write('data ');
process.nextTick(() => {
  stream.uncork();
  // 之前的数据只有在 uncork() 被二次调用后才会输出
  stream.uncork();
});

也可查看 writable.cork()

writable.write(chunk[, encoding][, callback])#

查看英文版 / 参与翻译

  • chunk <String> | <Buffer> 要写入的数据
  • encoding <String> 如果 chunk 是字符串,这里指定字符编码
  • callback <Function> 缓冲数据输出时的回调函数
  • 返回: <Boolean> 如果流需要等待 'drain' 事件触发才能继续写入数据,这里将返回 false ; 否则返回 true

writable.write() 方法向流中写入数据,并在数据处理完成后调用 callback 。如果有错误发生, callback 不一定 会接收到这个错误作为第一个参数。要确保可靠地检测到写入错误,应该监听 'error' 事件。

在确认了 chunk 后,如果内部缓冲区的大小小于创建流时设定的 highWaterMark 阈值,函数将返回 true 。 如果返回值为 false ,应该停止向流中写入数据,直到 'drain' 事件被触发。

While a stream is not draining, calls to write() will buffer chunk, and return false. Once all currently buffered chunks are drained (accepted for delivery by the operating system), the 'drain' event will be emitted. It is recommended that once write() returns false, no more chunks be written until the 'drain' event is emitted. While calling write() on a stream that is not draining is allowed, Node.js will buffer all written chunks until maximum memory usage occurs, at which point it will abort unconditionally. Even before it aborts, high memory usage will cause poor garbage collector performance and high RSS (which is not typically released back to the system, even after the memory is no longer required). Since TCP sockets may never drain if the remote peer does not read the data, writing a socket that is not draining may lead to a remotely exploitable vulnerability.

Writing data while the stream is not draining is particularly problematic for a Transform, because the Transform streams are paused by default until they are piped or an 'data' or 'readable' event handler is added.

If the data to be written can be generated or fetched on demand, it is recommended to encapsulate the logic into a Readable and use stream.pipe(). However, if calling write() is preferred, it is possible to respect backpressure and avoid memory issues using the the 'drain' event:

function write (data, cb) {
  if (!stream.write(data)) {
    stream.once('drain', cb)
  } else {
    process.nextTick(cb)
  }
}

// Wait for cb to be called before doing any other write.
write('hello', () => {
  console.log('write completed, do more writes now')
})

对象模式的写入流将忽略 encoding 参数。

可读流#

查看英文版 / 参与翻译

可读流(Readable streams)是对提供数据的 源头 (source)的抽象。

可读流的例子包括:

所有的 Readable 都实现了 stream.Readable 类定义的接口。

两种模式#

查看英文版 / 参与翻译

可读流事实上工作在下面两种模式之一:flowing 和 paused 。

在 flowing 模式下, 可读流自动从系统底层读取数据,并通过 EventEmitter 接口的事件尽快将数据提供给应用。

在 paused 模式下,必须显式调用 stream.read() 方法来从流中读取数据片段。

所有初始工作模式为 paused 的 Readable 流,可以通过下面三种途径切换到 flowing 模式:

可读流可以通过下面途径切换到 paused 模式:

  • 如果不存在管道目标(pipe destination),可以通过调用 stream.pause() 方法实现。
  • 如果存在管道目标,可以通过取消 'data' 事件监听,并调用 stream.unpipe() 方法移除所有管道目标来实现。

这里需要记住的重要概念就是,可读流需要先为其提供消费或忽略数据的机制,才能开始提供数据。如果消费机制被禁用或取消,可读流将 尝试 停止生成数据。

注意: 为了向后兼容,取消 'data' 事件监听并 不会 自动将流暂停。同时,如果存在管道目标(pipe destination),且目标状态变为可以接收数据(drain and ask for more data),调用了 stream.pause() 方法也并不保证流会一直 保持 暂停状态。

注意: 如果 Readable 切换到 flowing 模式,且没有消费者处理流中的数据,这些数据将会丢失。 比如, 调用了 readable.resume() 方法却没有监听 'data' 事件,或是取消了 'data' 事件监听,就有可能出现这种情况。

三种状态#

查看英文版 / 参与翻译

可读流的“两种操作模式”是一种简单抽象。它抽象了在可读流实现(Readable stream implementation)内部发生的复杂的状态管理过程。

在任意时刻,任意可读流应确切处于下面三种状态之一:

  • readable._readableState.flowing = null
  • readable._readableState.flowing = false
  • readable._readableState.flowing = true

readable._readableState.flowingnull,由于不存在数据消费者,可读流将不会产生数据。

如果监听 'data' 事件,调用 readable.pipe() 方法,或者调用 readable.resume() 方法, readable._readableState.flowing 的值将会变为 true 。这时,随着数据生成,可读流开始频繁触发事件。

调用 readable.pause() 方法, readable.unpipe() 方法, 或者接收 “背压”(back pressure), 将导致 readable._readableState.flowing 值变为 false。 这将暂停事件流,但 不会 暂停数据生成。

readable._readableState.flowing 值为 false 时, 数据可能堆积到流的内部缓存中。

选择一种#

查看英文版 / 参与翻译

可读流 API 的演化贯穿了多个 Node.js 版本,提供了多种方法来消费流数据。通常开发者应该选择其中 一种 来消费数据,而 不应该 在单个流使用多种方法来消费数据。

对于大多数用户,建议使用 readable.pipe() 方法来消费流数据,因为它是最简单的一种实现。开发者如果要精细地控制数据传递和产生的过程,可以使用 EventEmitterreadable.pause()/readable.resume() 提供的 API 。

stream.Readable 类#

查看英文版 / 参与翻译

'close' 事件#

查看英文版 / 参与翻译

'close' 事件将在流或其底层资源(比如一个文件)关闭后触发。'close' 事件触发后,该流将不会再触发任何事件。

不是所有 Readable 都会触发 'close' 事件。

'data' 事件#

查看英文版 / 参与翻译

  • chunk <Buffer> | <String> | <any> 数据片段。对于非对象模式的可读流,这是一个字符串或者 Buffer。 对于对象模式的可读流,这可以是除 null 以外的任意类型 JavaScript 值。

'data' 事件会在流将数据传递给消费者时触发。当流转换到 flowing 模式时会触发该事件。调用 readable.pipe()readable.resume() 方法,或为 'data' 事件添加回调可以将流转换到 flowing 模式。 'data' 事件也会在调用 readable.read() 方法并有数据返回时触发。

在没有明确暂停的流上添加 'data' 事件监听会将流转换为 flowing 模式。 数据会在可用时尽快传递给下个流程。

如果调用 readable.setEncoding() 方法明确为流指定了默认编码,回调函数将接收到一个字符串,否则接收到的数据将是一个 Buffer 实例。

const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes of data.`);
});
'end' 事件#

查看英文版 / 参与翻译

'end' 事件将在流中再没有数据可供消费时触发。

注意'end' 事件只有在数据被完全消费后 才会触发 。 可以在数据被完全消费后,通过将流转换到 flowing 模式, 或反复调用 stream.read() 方法来实现这一点。

const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes of data.`);
});
readable.on('end', () => {
  console.log('There will be no more data.');
});
'error' 事件#

查看英文版 / 参与翻译

'error' 事件可以在任何时候在可读流实现(Readable implementation)上触发。 通常,这会在底层系统内部出错从而不能产生数据,或当流的实现试图传递错误数据时发生。

回调函数将接收到一个 Error 对象。

'readable' 事件#

查看英文版 / 参与翻译

'readable' 事件将在流中有数据可供读取时触发。在某些情况下,为 'readable' 事件添加回调将会导致一些数据被读取到内部缓存中。

const readable = getReadableStreamSomehow();
readable.on('readable', () => {
  // 有一些数据可读了
});

当到达流数据尾部时, 'readable' 事件也会触发。触发顺序在 'end' 事件之前。

事实上, 'readable' 事件表明流有了新的动态:要么是有了新的数据,要么是到了流的尾部。 对于前者, stream.read() 将返回可用的数据。而对于后者, stream.read() 将返回 null。 例如,下面的例子中的 foo.txt 是一个空文件:

const fs = require('fs');
const rr = fs.createReadStream('foo.txt');
rr.on('readable', () => {
  console.log('readable:', rr.read());
});
rr.on('end', () => {
  console.log('end');
});

上面交脚本的输出如下:

$ node test.js
readable: null
end

注意: 通常情况下, 应该使用 readable.pipe() 方法和 'data' 事件机制,而不是 'readable' 事件。

readable.isPaused()#

查看英文版 / 参与翻译

readable.isPaused() 方法返回可读流的当前操作状态。 该方法主要是在 readable.pipe() 方法的底层机制中用到。大多数情况下,没有必要直接使用该方法。

const readable = new stream.Readable

readable.isPaused() // === false
readable.pause()
readable.isPaused() // === true
readable.resume()
readable.isPaused() // === false
readable.pause()#

查看英文版 / 参与翻译

  • 返回: this

readable.pause() 方法将会使 flowing 模式的流停止触发 'data' 事件, 进而切出 flowing 模式。任何可用的数据都将保存在内部缓存中。

const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes of data.`);
  readable.pause();
  console.log('There will be no additional data for 1 second.');
  setTimeout(() => {
    console.log('Now data will start flowing again.');
    readable.resume();
  }, 1000);
});
readable.pipe(destination[, options])#

查看英文版 / 参与翻译

readable.pipe() 绑定一个 Writablereadable 上, 将可写流自动切换到 flowing 模式并将所有数据传给绑定的 Writable。数据流将被自动管理。这样,即使是可读流较快,目标可写流也不会超负荷(overwhelmed)。

下面例子将 readable 中的所有数据通过管道传递给名为 file.txt 的文件:

const readable = getReadableStreamSomehow();
const writable = fs.createWriteStream('file.txt');
// readable 中的所有数据都传给了 'file.txt'
readable.pipe(writable);

可以在单个可读流上绑定多个可写流。

readable.pipe() 方法返回 目标流 的引用,这样就可以对流进行链式地管道操作:

const r = fs.createReadStream('file.txt');
const z = zlib.createGzip();
const w = fs.createWriteStream('file.txt.gz');
r.pipe(z).pipe(w);

默认情况下,当源可读流(the source Readable stream)触发 'end' 事件时,目标流也会调用 stream.end() 方法从而结束写入。要禁用这一默认行为, end 选项应该指定为 false, 这将使目标流保持打开, 如下面例子所示:

reader.pipe(writer, { end: false });
reader.on('end', () => {
  writer.end('Goodbye\n');
});

这里有一点要警惕,如果可读流在处理时发生错误,目标可写流 不会 自动关闭。 如果发生错误,需要 手动 关闭所有流以避免内存泄漏。

注意:不管对 process.stderrprocess.stdout 指定什么选项,它们都是直到 Node.js 进程退出才关闭。

readable.read([size])#

查看英文版 / 参与翻译

The readable.read() method pulls some data out of the internal buffer and returns it. If no data available to be read, null is returned. By default, the data will be returned as a Buffer object unless an encoding has been specified using the readable.setEncoding() method or the stream is operating in object mode.

The optional size argument specifies a specific number of bytes to read. If size bytes are not available to be read, null will be returned unless the stream has ended, in which case all of the data remaining in the internal buffer will be returned (even if it exceeds size bytes).

If the size argument is not specified, all of the data contained in the internal buffer will be returned.

The readable.read() method should only be called on Readable streams operating in paused mode. In flowing mode, readable.read() is called automatically until the internal buffer is fully drained.

const readable = getReadableStreamSomehow();
readable.on('readable', () => {
  var chunk;
  while (null !== (chunk = readable.read())) {
    console.log(`Received ${chunk.length} bytes of data.`);
  }
});

In general, it is recommended that developers avoid the use of the 'readable' event and the readable.read() method in favor of using either readable.pipe() or the 'data' event.

A Readable stream in object mode will always return a single item from a call to readable.read(size), regardless of the value of the size argument.

Note: If the readable.read() method returns a chunk of data, a 'data' event will also be emitted.

Note: Calling stream.read([size]) after the 'end' event has been emitted will return null. No runtime error will be raised.

readable.resume()#

查看英文版 / 参与翻译

  • Returns: this

The readable.resume() method causes an explicitly paused Readable stream to resume emitting 'data' events, switching the stream into flowing mode.

The readable.resume() method can be used to fully consume the data from a stream without actually processing any of that data as illustrated in the following example:

getReadableStreamSomehow()
  .resume()
  .on('end', () => {
    console.log('Reached the end, but did not read anything.');
  });
readable.setEncoding(encoding)#

查看英文版 / 参与翻译

  • encoding <String> 要使用的编码
  • Returns: this

readble.setEncoding() 方法会为从可读流读入的数据设置默认字符编码

设置编码会使得该流数据返回指定编码的字符串而不是Buffer对象。例如,调用readable.setEncoding('utf-8')会使得输出数据作为UTF-8数据解析,并作为字符串返回。调用readable.setEncoding('hex')使得数据被编码成16进制字符串格式。

可读流会妥善处理多字节字符,如果仅仅直接从流中取出Buffer对象,很可能会导致错误解码。

调用readable.setEncoding(null)可以禁止编码。该方法在处理二进制数据或大字节字符串分割为许多块时非常有用。

const readable = getReadableStreamSomehow();
readable.setEncoding('utf8');
readable.on('data', (chunk) => {
  assert.equal(typeof chunk, 'string');
  console.log('got %d characters of string data', chunk.length);
});
readable.unpipe([destination])#

查看英文版 / 参与翻译

The readable.unpipe() method detaches a Writable stream previously attached using the stream.pipe() method.

If the destination is not specified, then all pipes are detached.

If the destination is specified, but no pipe is set up for it, then the method does nothing.

const readable = getReadableStreamSomehow();
const writable = fs.createWriteStream('file.txt');
// All the data from readable goes into 'file.txt',
// but only for the first second
readable.pipe(writable);
setTimeout(() => {
  console.log('Stop writing to file.txt');
  readable.unpipe(writable);
  console.log('Manually close the file stream');
  writable.end();
}, 1000);
readable.unshift(chunk)#

查看英文版 / 参与翻译

The readable.unshift() method pushes a chunk of data back into the internal buffer. This is useful in certain situations where a stream is being consumed by code that needs to "un-consume" some amount of data that it has optimistically pulled out of the source, so that the data can be passed on to some other party.

Note: The stream.unshift(chunk) method cannot be called after the 'end' event has been emitted or a runtime error will be thrown.

Developers using stream.unshift() often should consider switching to use of a Transform stream instead. See the API for Stream Implementers section for more information.

// Pull off a header delimited by \n\n
// use unshift() if we get too much
// Call the callback with (error, header, stream)
const StringDecoder = require('string_decoder').StringDecoder;
function parseHeader(stream, callback) {
  stream.on('error', callback);
  stream.on('readable', onReadable);
  const decoder = new StringDecoder('utf8');
  var header = '';
  function onReadable() {
    var chunk;
    while (null !== (chunk = stream.read())) {
      var str = decoder.write(chunk);
      if (str.match(/\n\n/)) {
        // found the header boundary
        var split = str.split(/\n\n/);
        header += split.shift();
        const remaining = split.join('\n\n');
        const buf = Buffer.from(remaining, 'utf8');
        stream.removeListener('error', callback);
        // remove the readable listener before unshifting
        stream.removeListener('readable', onReadable);
        if (buf.length)
          stream.unshift(buf);
        // now the body of the message can be read from the stream.
        callback(null, header, stream);
      } else {
        // still reading the header.
        header += str;
      }
    }
  }
}

Note: Unlike stream.push(chunk), stream.unshift(chunk) will not end the reading process by resetting the internal reading state of the stream. This can cause unexpected results if readable.unshift() is called during a read (i.e. from within a stream._read() implementation on a custom stream). Following the call to readable.unshift() with an immediate stream.push('') will reset the reading state appropriately, however it is best to simply avoid calling readable.unshift() while in the process of performing a read.

readable.wrap(stream)#

查看英文版 / 参与翻译

  • stream <Stream> An "old style" readable stream

Versions of Node.js prior to v0.10 had streams that did not implement the entire stream module API as it is currently defined. (See Compatibility for more information.)

When using an older Node.js library that emits 'data' events and has a stream.pause() method that is advisory only, the readable.wrap() method can be used to create a Readable stream that uses the old stream as its data source.

It will rarely be necessary to use readable.wrap() but the method has been provided as a convenience for interacting with older Node.js applications and libraries.

For example:

const OldReader = require('./old-api-module.js').OldReader;
const Readable = require('stream').Readable;
const oreader = new OldReader;
const myReader = new Readable().wrap(oreader);

myReader.on('readable', () => {
  myReader.read(); // etc.
});

Duplex 流与 Transform 流#

stream.Duplex 类#

查看英文版 / 参与翻译

Duplex 流是同时实现了 ReadableWritable 接口的流。

Duplex 流的实例包括了:

stream.Transform 类#

查看英文版 / 参与翻译

变换流(Transform streams) 是一种 Duplex 流。它的输出与输入是通过某种方式关联的。和所有 Duplex 流一样,变换流同时实现了 ReadableWritable 接口。

变换流的实例包括:

API for Stream Implementers#

查看英文版 / 参与翻译

The stream module API has been designed to make it possible to easily implement streams using JavaScript's prototypal inheritance model.

First, a stream developer would declare a new JavaScript class that extends one of the four basic stream classes (stream.Writable, stream.Readable, stream.Duplex, or stream.Transform), making sure the call the appropriate parent class constructor:

const Writable = require('stream').Writable;

class MyWritable extends Writable {
  constructor(options) {
    super(options);
  }
}

The new stream class must then implement one or more specific methods, depending on the type of stream being created, as detailed in the chart below:

Use-case

Class

Method(s) to implement

Reading only

Readable

_read

Writing only

Writable

_write, _writev

Reading and writing

Duplex

_read, _write, _writev

Operate on written data, then read the result

Transform

_transform, _flush

Note: The implementation code for a stream should never call the "public" methods of a stream that are intended for use by consumers (as described in the API for Stream Consumers section). Doing so may lead to adverse side effects in application code consuming the stream.

Simplified Construction#

查看英文版 / 参与翻译

For many simple cases, it is possible to construct a stream without relying on inheritance. This can be accomplished by directly creating instances of the stream.Writable, stream.Readable, stream.Duplex or stream.Transform objects and passing appropriate methods as constructor options.

For example:

const Writable = require('stream').Writable;

const myWritable = new Writable({
  write(chunk, encoding, callback) {
    // ...
  }
});

Implementing a Writable Stream#

查看英文版 / 参与翻译

The stream.Writable class is extended to implement a Writable stream.

Custom Writable streams must call the new stream.Writable([options]) constructor and implement the writable._write() method. The writable._writev() method may also be implemented.

Constructor: new stream.Writable([options])#

查看英文版 / 参与翻译

For example:

const Writable = require('stream').Writable;

class MyWritable extends Writable {
  constructor(options) {
    // Calls the stream.Writable() constructor
    super(options);
  }
}

Or, when using pre-ES6 style constructors:

const Writable = require('stream').Writable;
const util = require('util');

function MyWritable(options) {
  if (!(this instanceof MyWritable))
    return new MyWritable(options);
  Writable.call(this, options);
}
util.inherits(MyWritable, Writable);

Or, using the Simplified Constructor approach:

const Writable = require('stream').Writable;

const myWritable = new Writable({
  write(chunk, encoding, callback) {
    // ...
  },
  writev(chunks, callback) {
    // ...
  }
});

writable._write(chunk, encoding, callback)#

查看英文版 / 参与翻译

  • chunk <Buffer> | <String> The chunk to be written. Will always be a buffer unless the decodeStrings option was set to false.
  • encoding <String> If the chunk is a string, then encoding is the character encoding of that string. If chunk is a Buffer, or if the stream is operating in object mode, encoding may be ignored.
  • callback <Function> Call this function (optionally with an error argument) when processing is complete for the supplied chunk.

All Writable stream implementations must provide a writable._write() method to send data to the underlying resource.

Note: Transform streams provide their own implementation of the writable._write().

Note: This function MUST NOT be called by application code directly. It should be implemented by child classes, and called only by the internal Writable class methods only.

The callback method must be called to signal either that the write completed successfully or failed with an error. The first argument passed to the callback must be the Error object if the call failed or null if the write succeeded.

It is important to note that all calls to writable.write() that occur between the time writable._write() is called and the callback is called will cause the written data to be buffered. Once the callback is invoked, the stream will emit a 'drain' event. If a stream implementation is capable of processing multiple chunks of data at once, the writable._writev() method should be implemented.

If the decodeStrings property is set in the constructor options, then chunk may be a string rather than a Buffer, and encoding will indicate the character encoding of the string. This is to support implementations that have an optimized handling for certain string data encodings. If the decodeStrings property is explicitly set to false, the encoding argument can be safely ignored, and chunk will remain the same object that is passed to .write().

The writable._write() method is prefixed with an underscore because it is internal to the class that defines it, and should never be called directly by user programs.

writable._writev(chunks, callback)#

查看英文版 / 参与翻译

  • chunks <Array> The chunks to be written. Each chunk has following format: { chunk: ..., encoding: ... }.
  • callback <Function> A callback function (optionally with an error argument) to be invoked when processing is complete for the supplied chunks.

Note: This function MUST NOT be called by application code directly. It should be implemented by child classes, and called only by the internal Writable class methods only.

The writable._writev() method may be implemented in addition to writable._write() in stream implementations that are capable of processing multiple chunks of data at once. If implemented, the method will be called with all chunks of data currently buffered in the write queue.

The writable._writev() method is prefixed with an underscore because it is internal to the class that defines it, and should never be called directly by user programs.

Errors While Writing#

查看英文版 / 参与翻译

It is recommended that errors occurring during the processing of the writable._write() and writable._writev() methods are reported by invoking the callback and passing the error as the first argument. This will cause an 'error' event to be emitted by the Writable. Throwing an Error from within writable._write() can result in unexpected and inconsistent behavior depending on how the stream is being used. Using the callback ensures consistent and predictable handling of errors.

const Writable = require('stream').Writable;

const myWritable = new Writable({
  write(chunk, encoding, callback) {
    if (chunk.toString().indexOf('a') >= 0) {
      callback(new Error('chunk is invalid'));
    } else {
      callback();
    }
  }
});

一个可写流的例子#

查看英文版 / 参与翻译

下面说明了一个相当简单(有点无意义)的可写流实现。虽然这个具体的可写流实例没有任何真正的特殊用途,但该示例说明了一个自定义流实例所需要的元素:

const Writable = require('stream').Writable;

class MyWritable extends Writable {
  constructor(options) {
    super(options);
  }

  _write(chunk, encoding, callback) {
    if (chunk.toString().indexOf('a') >= 0) {
      callback(new Error('chunk is invalid'));
    } else {
      callback();
    }
  }
}

Implementing a Readable Stream#

查看英文版 / 参与翻译

The stream.Readable class is extended to implement a Readable stream.

Custom Readable streams must call the new stream.Readable([options]) constructor and implement the readable._read() method.

new stream.Readable([options])#

查看英文版 / 参与翻译

  • options <Object>
    • highWaterMark <Number> The maximum number of bytes to store in the internal buffer before ceasing to read from the underlying resource. Defaults to 16384 (16kb), or 16 for objectMode streams
    • encoding <String> If specified, then buffers will be decoded to strings using the specified encoding. Defaults to null
    • objectMode <Boolean> Whether this stream should behave as a stream of objects. Meaning that stream.read(n) returns a single value instead of a Buffer of size n. Defaults to false
    • read <Function> Implementation for the stream._read() method.

For example:

const Readable = require('stream').Readable;

class MyReadable extends Readable {
  constructor(options) {
    // Calls the stream.Readable(options) constructor
    super(options);
  }
}

Or, when using pre-ES6 style constructors:

const Readable = require('stream').Readable;
const util = require('util');

function MyReadable(options) {
  if (!(this instanceof MyReadable))
    return new MyReadable(options);
  Readable.call(this, options);
}
util.inherits(MyReadable, Readable);

Or, using the Simplified Constructor approach:

const Readable = require('stream').Readable;

const myReadable = new Readable({
  read(size) {
    // ...
  }
});

readable._read(size)#

查看英文版 / 参与翻译

  • size <Number> Number of bytes to read asynchronously

Note: This function MUST NOT be called by application code directly. It should be implemented by child classes, and called only by the internal Readable class methods only.

All Readable stream implementations must provide an implementation of the readable._read() method to fetch data from the underlying resource.

When readable._read() is called, if data is available from the resource, the implementation should begin pushing that data into the read queue using the this.push(dataChunk) method. _read() should continue reading from the resource and pushing data until readable.push() returns false. Only when _read() is called again after it has stopped should it resume pushing additional data onto the queue.

Note: Once the readable._read() method has been called, it will not be called again until the readable.push() method is called.

The size argument is advisory. For implementations where a "read" is a single operation that returns data can use the size argument to determine how much data to fetch. Other implementations may ignore this argument and simply provide data whenever it becomes available. There is no need to "wait" until size bytes are available before calling stream.push(chunk).

The readable._read() method is prefixed with an underscore because it is internal to the class that defines it, and should never be called directly by user programs.

readable.push(chunk[, encoding])#

查看英文版 / 参与翻译

  • chunk <Buffer> | <Null> | <String> Chunk of data to push into the read queue
  • encoding <String> Encoding of String chunks. Must be a valid Buffer encoding, such as 'utf8' or 'ascii'
  • Returns <Boolean> true if additional chunks of data may continued to be pushed; false otherwise.

When chunk is a Buffer or string, the chunk of data will be added to the internal queue for users of the stream to consume. Passing chunk as null signals the end of the stream (EOF), after which no more data can be written.

When the Readable is operating in paused mode, the data added with readable.push() can be read out by calling the readable.read() method when the 'readable' event is emitted.

When the Readable is operating in flowing mode, the data added with readable.push() will be delivered by emitting a 'data' event.

The readable.push() method is designed to be as flexible as possible. For example, when wrapping a lower-level source that provides some form of pause/resume mechanism, and a data callback, the low-level source can be wrapped by the custom Readable instance as illustrated in the following example:

// source is an object with readStop() and readStart() methods,
// and an `ondata` member that gets called when it has data, and
// an `onend` member that gets called when the data is over.

class SourceWrapper extends Readable {
  constructor(options) {
    super(options);

    this._source = getLowlevelSourceObject();

    // Every time there's data, push it into the internal buffer.
    this._source.ondata = (chunk) => {
      // if push() returns false, then stop reading from source
      if (!this.push(chunk))
        this._source.readStop();
    };

    // When the source ends, push the EOF-signaling `null` chunk
    this._source.onend = () => {
      this.push(null);
    };
  }
  // _read will be called when the stream wants to pull more data in
  // the advisory size argument is ignored in this case.
  _read(size) {
    this._source.readStart();
  }
}

Note: The readable.push() method is intended be called only by Readable Implementers, and only from within the readable._read() method.

Errors While Reading#

查看英文版 / 参与翻译

It is recommended that errors occurring during the processing of the readable._read() method are emitted using the 'error' event rather than being thrown. Throwing an Error from within readable._read() can result in unexpected and inconsistent behavior depending on whether the stream is operating in flowing or paused mode. Using the 'error' event ensures consistent and predictable handling of errors.

const Readable = require('stream').Readable;

const myReadable = new Readable({
  read(size) {
    if (checkSomeErrorCondition()) {
      process.nextTick(() => this.emit('error', err));
      return;
    }
    // do some work
  }
});

一个数流的例子#

查看英文版 / 参与翻译

以下是可读流的一个基本例子,触发数字1到1,000,000升序,然后结束

const Readable = require('stream').Readable;

class Counter extends Readable {
  constructor(opt) {
    super(opt);
    this._max = 1000000;
    this._index = 1;
  }

  _read() {
    var i = this._index++;
    if (i > this._max)
      this.push(null);
    else {
      var str = '' + i;
      var buf = Buffer.from(str, 'ascii');
      this.push(buf);
    }
  }
}

Implementing a Duplex Stream#

查看英文版 / 参与翻译

A Duplex stream is one that implements both Readable and Writable, such as a TCP socket connection.

Because JavaScript does not have support for multiple inheritance, the stream.Duplex class is extended to implement a Duplex stream (as opposed to extending the stream.Readable and stream.Writable classes).

Note: The stream.Duplex class prototypically inherits from stream.Readable and parasitically from stream.Writable, but instanceof will work properly for both base classes due to overriding Symbol.hasInstance on stream.Writable.

Custom Duplex streams must call the new stream.Duplex([options]) constructor and implement both the readable._read() and writable._write() methods.

new stream.Duplex(options)#

查看英文版 / 参与翻译

  • options <Object> Passed to both Writable and Readable constructors. Also has the following fields:
    • allowHalfOpen <Boolean> Defaults to true. If set to false, then the stream will automatically end the readable side when the writable side ends and vice versa.
    • readableObjectMode <Boolean> Defaults to false. Sets objectMode for readable side of the stream. Has no effect if objectMode is true.
    • writableObjectMode <Boolean> Defaults to false. Sets objectMode for writable side of the stream. Has no effect if objectMode is true.

For example:

const Duplex = require('stream').Duplex;

class MyDuplex extends Duplex {
  constructor(options) {
    super(options);
  }
}

Or, when using pre-ES6 style constructors:

const Duplex = require('stream').Duplex;
const util = require('util');

function MyDuplex(options) {
  if (!(this instanceof MyDuplex))
    return new MyDuplex(options);
  Duplex.call(this, options);
}
util.inherits(MyDuplex, Duplex);

Or, using the Simplified Constructor approach:

const Duplex = require('stream').Duplex;

const myDuplex = new Duplex({
  read(size) {
    // ...
  },
  write(chunk, encoding, callback) {
    // ...
  }
});

An Example Duplex Stream#

查看英文版 / 参与翻译

The following illustrates a simple example of a Duplex stream that wraps a hypothetical lower-level source object to which data can be written, and from which data can be read, albeit using an API that is not compatible with Node.js streams. The following illustrates a simple example of a Duplex stream that buffers incoming written data via the Writable interface that is read back out via the Readable interface.

const Duplex = require('stream').Duplex;
const kSource = Symbol('source');

class MyDuplex extends Duplex {
  constructor(source, options) {
    super(options);
    this[kSource] = source;
  }

  _write(chunk, encoding, callback) {
    // The underlying source only deals with strings
    if (Buffer.isBuffer(chunk))
      chunk = chunk.toString();
    this[kSource].writeSomeData(chunk);
    callback();
  }

  _read(size) {
    this[kSource].fetchSomeData(size, (data, encoding) => {
      this.push(Buffer.from(data, encoding));
    });
  }
}

The most important aspect of a Duplex stream is that the Readable and Writable sides operate independently of one another despite co-existing within a single object instance.

Object Mode Duplex Streams#

查看英文版 / 参与翻译

For Duplex streams, objectMode can be set exclusively for either the Readable or Writable side using the readableObjectMode and writableObjectMode options respectively.

In the following example, for instance, a new Transform stream (which is a type of Duplex stream) is created that has an object mode Writable side that accepts JavaScript numbers that are converted to hexadecimal strings on the Readable side.

const Transform = require('stream').Transform;

// All Transform streams are also Duplex Streams
const myTransform = new Transform({
  writableObjectMode: true,

  transform(chunk, encoding, callback) {
    // Coerce the chunk to a number if necessary
    chunk |= 0;

    // Transform the chunk into something else.
    const data = chunk.toString(16);

    // Push the data onto the readable queue.
    callback(null, '0'.repeat(data.length % 2) + data);
  }
});

myTransform.setEncoding('ascii');
myTransform.on('data', (chunk) => console.log(chunk));

myTransform.write(1);
// Prints: 01
myTransform.write(10);
// Prints: 0a
myTransform.write(100);
// Prints: 64

Implementing a Transform Stream#

查看英文版 / 参与翻译

A Transform stream is a Duplex stream where the output is computed in some way from the input. Examples include zlib streams or crypto streams that compress, encrypt, or decrypt data.

Note: There is no requirement that the output be the same size as the input, the same number of chunks, or arrive at the same time. For example, a Hash stream will only ever have a single chunk of output which is provided when the input is ended. A zlib stream will produce output that is either much smaller or much larger than its input.

The stream.Transform class is extended to implement a Transform stream.

The stream.Transform class prototypically inherits from stream.Duplex and implements its own versions of the writable._write() and readable._read() methods. Custom Transform implementations must implement the transform._transform() method and may also implement the transform._flush() method.

Note: Care must be taken when using Transform streams in that data written to the stream can cause the Writable side of the stream to become paused if the output on the Readable side is not consumed.

new stream.Transform([options])#

查看英文版 / 参与翻译

For example:

const Transform = require('stream').Transform;

class MyTransform extends Transform {
  constructor(options) {
    super(options);
  }
}

Or, when using pre-ES6 style constructors:

const Transform = require('stream').Transform;
const util = require('util');

function MyTransform(options) {
  if (!(this instanceof MyTransform))
    return new MyTransform(options);
  Transform.call(this, options);
}
util.inherits(MyTransform, Transform);

Or, using the Simplified Constructor approach:

const Transform = require('stream').Transform;

const myTransform = new Transform({
  transform(chunk, encoding, callback) {
    // ...
  }
});

Events: 'finish' and 'end'#

查看英文版 / 参与翻译

The 'finish' and 'end' events are from the stream.Writable and stream.Readable classes, respectively. The 'finish' event is emitted after stream.end() is called and all chunks have been processed by stream._transform(). The 'end' event is emitted after all data has been output, which occurs after the callback in transform._flush() has been called.

transform._flush(callback)#

查看英文版 / 参与翻译

  • callback <Function> A callback function (optionally with an error argument) to be called when remaining data has been flushed.

Note: This function MUST NOT be called by application code directly. It should be implemented by child classes, and called only by the internal Readable class methods only.

In some cases, a transform operation may need to emit an additional bit of data at the end of the stream. For example, a zlib compression stream will store an amount of internal state used to optimally compress the output. When the stream ends, however, that additional data needs to be flushed so that the compressed data will be complete.

Custom Transform implementations may implement the transform._flush() method. This will be called when there is no more written data to be consumed, but before the 'end' event is emitted signaling the end of the Readable stream.

Within the transform._flush() implementation, the readable.push() method may be called zero or more times, as appropriate. The callback function must be called when the flush operation is complete.

The transform._flush() method is prefixed with an underscore because it is internal to the class that defines it, and should never be called directly by user programs.

transform._transform(chunk, encoding, callback)#

查看英文版 / 参与翻译

  • chunk <Buffer> | <String> The chunk to be transformed. Will always be a buffer unless the decodeStrings option was set to false.
  • encoding <String> If the chunk is a string, then this is the encoding type. If chunk is a buffer, then this is the special value - 'buffer', ignore it in this case.
  • callback <Function> A callback function (optionally with an error argument and data) to be called after the supplied chunk has been processed.

Note: This function MUST NOT be called by application code directly. It should be implemented by child classes, and called only by the internal Readable class methods only.

All Transform stream implementations must provide a _transform() method to accept input and produce output. The transform._transform() implementation handles the bytes being written, computes an output, then passes that output off to the readable portion using the readable.push() method.

The transform.push() method may be called zero or more times to generate output from a single input chunk, depending on how much is to be output as a result of the chunk.

It is possible that no output is generated from any given chunk of input data.

The callback function must be called only when the current chunk is completely consumed. The first argument passed to the callback must be an Error object if an error occurred while processing the input or null otherwise. If a second argument is passed to the callback, it will be forwarded on to the readable.push() method. In other words the following are equivalent:

transform.prototype._transform = function (data, encoding, callback) {
  this.push(data);
  callback();
};

transform.prototype._transform = function (data, encoding, callback) {
  callback(null, data);
};

The transform._transform() method is prefixed with an underscore because it is internal to the class that defines it, and should never be called directly by user programs.

Class: stream.PassThrough#

查看英文版 / 参与翻译

The stream.PassThrough class is a trivial implementation of a Transform stream that simply passes the input bytes across to the output. Its purpose is primarily for examples and testing, but there are some use cases where stream.PassThrough is useful as a building block for novel sorts of streams.

Additional Notes#

Compatibility with Older Node.js Versions#

查看英文版 / 参与翻译

In versions of Node.js prior to v0.10, the Readable stream interface was simpler, but also less powerful and less useful.

  • Rather than waiting for calls the stream.read() method, 'data' events would begin emitting immediately. Applications that would need to perform some amount of work to decide how to handle data were required to store read data into buffers so the data would not be lost.
  • The stream.pause() method was advisory, rather than guaranteed. This meant that it was still necessary to be prepared to receive 'data' events even when the stream was in a paused state.

In Node.js v0.10, the Readable class was added. For backwards compatibility with older Node.js programs, Readable streams switch into "flowing mode" when a 'data' event handler is added, or when the stream.resume() method is called. The effect is that, even when not using the new stream.read() method and 'readable' event, it is no longer necessary to worry about losing 'data' chunks.

While most applications will continue to function normally, this introduces an edge case in the following conditions:

  • No 'data' event listener is added.
  • The stream.resume() method is never called.
  • The stream is not piped to any writable destination.

For example, consider the following code:

// WARNING!  BROKEN!
net.createServer((socket) => {

  // we add an 'end' method, but never consume the data
  socket.on('end', () => {
    // It will never get here.
    socket.end('The message was received but was not processed.\n');
  });

}).listen(1337);

In versions of Node.js prior to v0.10, the incoming message data would be simply discarded. However, in Node.js v0.10 and beyond, the socket remains paused forever.

The workaround in this situation is to call the stream.resume() method to begin the flow of data:

// Workaround
net.createServer((socket) => {

  socket.on('end', () => {
    socket.end('The message was received but was not processed.\n');
  });

  // start the flow of data, discarding it.
  socket.resume();

}).listen(1337);

In addition to new Readable streams switching into flowing mode, pre-v0.10 style streams can be wrapped in a Readable class using the readable.wrap() method.

readable.read(0)#

查看英文版 / 参与翻译

There are some cases where it is necessary to trigger a refresh of the underlying readable stream mechanisms, without actually consuming any data. In such cases, it is possible to call readable.read(0), which will always return null.

If the internal read buffer is below the highWaterMark, and the stream is not currently reading, then calling stream.read(0) will trigger a low-level stream._read() call.

While most applications will almost never need to do this, there are situations within Node.js where this is done, particularly in the Readable stream class internals.

readable.push('')#

查看英文版 / 参与翻译

Use of readable.push('') is not recommended.

Pushing a zero-byte string or Buffer to a stream that is not in object mode has an interesting side effect. Because it is a call to readable.push(), the call will end the reading process. However, because the argument is an empty string, no data is added to the readable buffer so there is nothing for a user to consume.