Node.js v10.4.1 文档


目录

stream (流)#

查看英文版参与翻译

稳定性: 2 - 稳定的

流(stream)在 Node.js 中是处理流数据的抽象接口(abstract interface)。 stream 模块提供了基础的 API 。使用这些 API 可以很容易地来构建实现流接口的对象。

Node.js 提供了多种流对象。 例如, HTTP 请求process.stdout 就都是流的实例。

流可以是可读的、可写的,或是可读写的。所有的流都是 EventEmitter 的实例。

stream 模块可以通过以下方式引入:

const stream = require('stream');

尽管所有的 Node.js 用户都应该理解流的工作方式,这点很重要, 但是 stream 模块本身只对于那些需要创建新的流的实例的开发者最有用处。 对于主要是 消费 流的开发者来说,他们很少(如果有的话)需要直接使用 stream 模块。

本文档的组织#

查看英文版参与翻译

本文档主要分为两节,第三节是一些额外的注意事项。第一节阐述了在应用中和 使用 流相关的 API 。 第二节阐述了和 实现 新的流类型相关的 API 。

流的类型#

查看英文版参与翻译

Node.js 中有四种基本的流类型:

对象模式#

查看英文版参与翻译

所有使用 Node.js API 创建的流对象都只能操作 strings 和 Buffer(或 Uint8Array) 对象。但是,通过一些第三方流的实现,你依然能够处理其它类型的 JavaScript 值 (除了 null,它在流处理中有特殊意义)。 这些流被认为是工作在 “对象模式”(object mode)。

在创建流的实例时,可以通过 objectMode 选项使流的实例切换到对象模式。试图将已经存在的流切换到对象模式是不安全的。

缓冲#

查看英文版参与翻译

WritableReadable 流都会将数据存储到内部的缓冲器(buffer)中。这些缓冲器可以 通过相应的 writable._writableState.getBuffer()readable._readableState.buffer 来获取。

缓冲器的大小取决于传递给流构造函数的 highWaterMark 选项。 对于普通的流, highWaterMark 选项指定了总共的字节数。对于工作在对象模式的流, highWaterMark 指定了对象的总数。

当可读流的实现调用 stream.push(chunk) 方法时,数据被放到缓冲器中。如果流的消费者 没有调用 stream.read() 方法, 这些数据会始终存在于内部队列中,直到被消费。

当内部可读缓冲器的大小达到 highWaterMark 指定的阈值时,流会暂停从底层资源读取数据,直到当前 缓冲器的数据被消费 (也就是说, 流会在内部停止调用 readable._read() 来填充可读缓冲器)。

可写流通过反复调用 writable.write(chunk) 方法将数据放到缓冲器。 当内部可写缓冲器的总大小小于 highWaterMark 指定的阈值时, 调用 writable.write() 将返回true。 一旦内部缓冲器的大小达到或超过 highWaterMark ,调用 writable.write() 将返回 false

stream API 的关键目标, 尤其对于 stream.pipe() 方法, 就是限制缓冲器数据大小,以达到可接受的程度。这样,对于读写速度不匹配的源头和目标,就不会超出可用的内存大小。

DuplexTransform 都是可读写的。 在内部,它们都维护了 两个 相互独立的缓冲器用于读和写。 在维持了合理高效的数据流的同时,也使得对于读和写可以独立进行而互不影响。 例如, net.Socket 就是 Duplex 的实例,它的可读端可以消费从套接字(socket)中接收的数据, 可写端则可以将数据写入到套接字。 由于数据写入到套接字中的速度可能比从套接字接收数据的速度快或者慢, 在读写两端使用独立缓冲器,并进行独立操作就显得很重要了。

流消费者的 API#

查看英文版参与翻译

几乎所有的 Node.js 应用,不管多么简单,都在某种程度上使用了流。 下面是在 Node.js 应用中使用流实现的一个简单的 HTTP 服务器:

const http = require('http');

const server = http.createServer((req, res) => {
  // req 是 http.IncomingMessage 的实例,这是一个 Readable Stream
  // res 是 http.ServerResponse 的实例,这是一个 Writable Stream

  let body = '';
  // 接收数据为 utf8 字符串,
  // 如果没有设置字符编码,将接收到 Buffer 对象。
  req.setEncoding('utf8');

  // 如果监听了 'data' 事件,Readable streams 触发 'data' 事件 
  req.on('data', (chunk) => {
    body += chunk;
  });

  // end 事件表明整个 body 都接收完毕了 
  req.on('end', () => {
    try {
      const data = JSON.parse(body);
      // 发送一些信息给用户
      res.write(typeof data);
      res.end();
    } catch (er) {
      // json 数据解析失败 
      res.statusCode = 400;
      return res.end(`error: ${er.message}`);
    }
  });
});

server.listen(1337);

// $ curl localhost:1337 -d "{}"
// object
// $ curl localhost:1337 -d "\"foo\""
// string
// $ curl localhost:1337 -d "not json"
// error: Unexpected token o in JSON at position 1

Writable 流 (比如例子中的 res) 暴露了一些方法,比如 write()end() 。这些方法可以将数据写入到流中。

当流中的数据可以读取时,Readable 流使用 EventEmitter API 来通知应用。 这些数据可以使用多种方法从流中读取。

WritableReadable 流都使用了 EventEmitter API ,通过多种方式, 与流的当前状态进行交互。

DuplexTransform 都是同时满足 WritableReadable

对于只是简单写入数据到流和从流中消费数据的应用来说, 不要求直接实现流接口,通常也不需要调用 require('stream')

需要实现两种类型流的开发者可以参考 API for Stream Implementers

可写流#

查看英文版参与翻译

可写流是对数据写入'目的地'的一种抽象。

Writable 的例子包括了:

注意: 上面的某些例子事实上是 Duplex 流,只是实现了 Writable 接口。

所有 Writable 流都实现了 stream.Writable 类定义的接口。

尽管特定的 Writable 流的实现可能略有差别, 所有的 Writable streams 都可以按一种基本模式进行使用,如下面例子所示:

const myStream = getWritableStreamSomehow();
myStream.write('some data');
myStream.write('some more data');
myStream.end('done writing data');

stream.Writable 类#

查看英文版参与翻译

'close' 事件#

查看英文版参与翻译

'close' 事件将在流或其底层资源(比如一个文件)关闭后触发。'close' 事件触发后,该流将不会再触发任何事件。

不是所有可写流都会触发 'close' 事件。

'drain' 事件#

查看英文版参与翻译

如果调用 stream.write(chunk) 方法返回 false'drain' 事件会在适合恢复写入数据到流的时候触发。

// 向可写流中写入数据一百万次。
// 需要注意背压 (back-pressure)。
function writeOneMillionTimes(writer, data, encoding, callback) {
  let i = 1000000;
  write();
  function write() {
    let ok = true;
    do {
      i--;
      if (i === 0) {
        // 最后 一次
        writer.write(data, encoding, callback);
      } else {
        // 检查是否可以继续写入。 
        // 这里不要传递 callback, 因为写入还没有结束! 
        ok = writer.write(data, encoding);
      }
    } while (i > 0 && ok);
    if (i > 0) {
      // 不得不提前停下!
      // 当 'drain' 事件触发后继续写入  
      writer.once('drain', write);
    }
  }
}
'error' 事件#

查看英文版参与翻译

'error' 事件在写入数据出错或者使用管道出错时触发。事件发生时,回调函数仅会接收到一个 Error 参数。

注意: 'error' 事件发生时,流并不会关闭。

'finish' 事件#

查看英文版参与翻译

在调用了 stream.end() 方法,且缓冲区数据都已经传给底层系统(underlying system)之后, 'finish' 事件将被触发。

const writer = getWritableStreamSomehow();
for (let i = 0; i < 100; i++) {
  writer.write(`hello, #${i}!\n`);
}
writer.end('This is the end\n');
writer.on('finish', () => {
  console.error('All writes are now complete.');
});
'pipe' 事件#

查看英文版参与翻译

  • src <stream.Readable> 输出到目标可写流(writable)的源流(source stream)

在可读流(readable stream)上调用 stream.pipe() 方法,并在目标流向 (destinations) 中添加当前可写流 ( writable ) 时,将会在可写流上触发 'pipe' 事件。

const writer = getWritableStreamSomehow();
const reader = getReadableStreamSomehow();
writer.on('pipe', (src) => {
  console.error('something is piping into the writer');
  assert.equal(src, reader);
});
reader.pipe(writer);
'unpipe' 事件#

查看英文版参与翻译

Readable 上调用 stream.unpipe() 方法,从目标流向中移除当前 Writable 时,将会触发 'unpipe' 事件。

const writer = getWritableStreamSomehow();
const reader = getReadableStreamSomehow();
writer.on('unpipe', (src) => {
  console.error('Something has stopped piping into the writer.');
  assert.equal(src, reader);
});
reader.pipe(writer);
reader.unpipe(writer);
writable.cork()#

查看英文版参与翻译

调用 writable.cork() 方法将强制所有写入数据都存放到内存中的缓冲区里。 直到调用 stream.uncork()stream.end() 方法时,缓冲区里的数据才会被输出。

在向流中写入大量小块数据(small chunks of data)时,内部缓冲区(internal buffer)可能失效,从而导致性能下降。writable.cork() 方法主要就是用来避免这种情况。 对于这种情况, 实现了 writable._writev() 方法的流可以对写入的数据进行缓冲,从而提高写入效率。

也可查看 writable.uncork()

writable.destroy([error])#

查看英文版参与翻译

  • 返回: this

摧毁这个流,并发出传过来的错误。当这个函数被调用后,这个写入流就结束了。 使用者不应该重写这个函数,而是重写 writable._destroy

writable.end([chunk][, encoding][, callback])#

查看英文版参与翻译

  • chunk <string> | <Buffer> | <Uint8Array> | <any> 可选的,需要写入的数据。对于非对象模式下的流, chunk 必须是字符串、或 Buffer、或 Uint8Array。对于对象模式下的流, chunk 可以是任意的 JavaScript 值,除了 null
  • encoding <string> 如果 chunk 是字符串,这里指定字符编码。
  • callback <Function> 可选的,流结束时的回调函数。

调用 writable.end() 方法表明接下来没有数据要被写入 Writable。通过传入可选的 chunkencoding 参数,可以在关闭流之前再写入一段数据。如果传入了可选的 callback 函数,它将作为 'finish' 事件的回调函数。

在调用了 stream.end() 方法之后,再调用 stream.write() 方法将会导致错误。

// 写入 'hello, ' ,并用 'world!' 来结束写入
const file = fs.createWriteStream('example.txt');
file.write('hello, ');
file.end('world!');
// 后面不允许再写入数据!
writable.setDefaultEncoding(encoding)#

查看英文版参与翻译

  • encoding <string> 新的默认编码
  • 返回: this

writable.setDefaultEncoding() 用于为 Writable 设置 encoding

writable.uncork()#

查看英文版参与翻译

writable.uncork() 将输出在 stream.cork() 方法被调用之后缓冲在内存中的所有数据。

如果使用 writable.cork()writable.uncork() 来管理写入缓存,建议使用 process.nextTick() 来延迟调用 writable.uncork() 方法。通过这种方式,可以对单个 Node.js 事件循环中调用的所有 writable.write() 方法进行批处理。

stream.cork();
stream.write('some ');
stream.write('data ');
process.nextTick(() => stream.uncork());

如果一个流多次调用了 writable.cork() 方法,那么也必须调用同样次数的 writable.uncork() 方法以输出缓冲区数据。

stream.cork();
stream.write('some ');
stream.cork();
stream.write('data ');
process.nextTick(() => {
  stream.uncork();
  // 之前的数据只有在 uncork() 被二次调用后才会输出
  stream.uncork();
});

也可查看 writable.cork()

writable.writableHighWaterMark#

查看英文版参与翻译

返回构造该可写流时传入的 highWaterMark 参数值。

writable.writableLength#

查看英文版参与翻译

这个属性包含了写入就绪队列的字节(或者对象)数,这个值提供了关于highWaterMark状 态的内省数据。

writable.write(chunk[, encoding][, callback])#

查看英文版参与翻译

  • chunk <string> | <Buffer> | <Uint8Array> | <any> 要写入的数据。可选的。 对于非对象模式下的流, chunk 必须是字符串, Buffer 或者 Uint8Array。对于对象模式下的流,chunk 可以是除 null 外的任意 JavaScript 值。
  • encoding <string> 如果 chunk 是字符串,这里指定字符编码
  • callback <Function> 缓冲数据输出时的回调函数
  • 返回: <boolean> 如果流需要等待 'drain' 事件触发才能继续写入数据,这里将返回 false ; 否则返回 true

writable.write() 方法向流中写入数据,并在数据处理完成后调用 callback 。如果有错误发生, callback 不一定 以这个错误作为第一个参数并被调用。要确保可靠地检测到写入错误,应该监听 'error' 事件。

在确认了 chunk 后,如果内部缓冲区的大小小于创建流时设定的 highWaterMark 阈值,函数将返回 true 。 如果返回值为 false ,应该停止向流中写入数据,直到 'drain' 事件被触发。

当一个流不处在 drain 的状态, 对 write() 的调用会缓存数据块, 并且返回 false。 一旦所有当前所有缓存的数据块都排空了(被操作系统接受来进行输出), 那么 'drain' 事件就会被触发。 我们建议, 一旦 write() 返回 false, 在 'drain' 事件触发前, 不能写入任何数据块。 然而,当流不处在 'drain' 状态时, 调用 write() 是被允许的, Node.js 会缓存所有已经写入的数据块, 直到达到最大内存占用, 这时它会无条件中止。 甚至在它中止之前, 高内存占用将会导致差的垃圾回收器的性能和高的系统相对敏感性 (即使内存不再需要,也通常不会被释放回系统)。 如果远程的另一端没有读取数据, TCP sockets 可能永远也不会 drain , 所以写入到一个不会drain的socket可能会导致远程可利用的漏洞。

对于一个 Transform, 写入数据到一个不会drain的流尤其成问题, 因为 Transform 流默认被暂停, 直到它们被pipe或者被添加了 'data''readable' 事件处理函数。

如果将要被写入的数据可以根据需要生成或者取得,我们建议将逻辑封装为一个 Readable 流并且使用 stream.pipe()。 但是如果调用 write() 优先, 那么可以使用 'drain' 事件来防止回压并且避免内存问题:

function write(data, cb) {
  if (!stream.write(data)) {
    stream.once('drain', cb);
  } else {
    process.nextTick(cb);
  }
}

// 在回调函数被执行后再进行其他的写入
write('hello', () => {
  console.log('write completed, do more writes now');
});

对象模式的写入流将忽略 encoding 参数。

可读流#

查看英文版参与翻译

可读流(Readable streams)是对提供数据的 源头 (source)的抽象。

可读流的例子包括:

所有的 Readable 都实现了 stream.Readable 类定义的接口。

两种模式#

查看英文版参与翻译

可读流事实上工作在下面两种模式之一:flowing 和 paused 。

在 flowing 模式下, 可读流自动从系统底层读取数据,并通过 EventEmitter 接口的事件尽快将数据提供给应用。

在 paused 模式下,必须显式调用 stream.read() 方法来从流中读取数据片段。

所有初始工作模式为 paused 的 Readable 流,可以通过下面三种途径切换到 flowing 模式:

可读流可以通过下面途径切换到 paused 模式:

  • 如果不存在管道目标(pipe destination),可以通过调用 stream.pause() 方法实现。
  • 如果存在管道目标,可以通过取消 'data' 事件监听,并调用 stream.unpipe() 方法移除所有管道目标来实现。

这里需要记住的重要概念就是,可读流需要先为其提供消费或忽略数据的机制,才能开始提供数据。如果消费机制被禁用或取消,可读流将 尝试 停止生成数据。

注意: 为了向后兼容,取消 'data' 事件监听并 不会 自动将流暂停。同时,如果存在管道目标(pipe destination),且目标状态变为可以接收数据(drain and ask for more data),调用了 stream.pause() 方法也并不保证流会一直 保持 暂停状态。

注意: 如果 Readable 切换到 flowing 模式,且没有消费者处理流中的数据,这些数据将会丢失。 比如, 调用了 readable.resume() 方法却没有监听 'data' 事件,或是取消了 'data' 事件监听,就有可能出现这种情况。

三种状态#

查看英文版参与翻译

可读流的“两种操作模式”是一种简单抽象。它抽象了在可读流实现(Readable stream implementation)内部发生的复杂的状态管理过程。

在任意时刻,任意可读流应确切处于下面三种状态之一:

  • readable._readableState.flowing = null
  • readable._readableState.flowing = false
  • readable._readableState.flowing = true

readable._readableState.flowingnull,由于不存在数据消费者,可读流将不会产生数据。 在这个状态下,监听 'data' 事件,调用 readable.pipe() 方法,或者调用 readable.resume() 方法, readable._readableState.flowing 的值将会变为 true 。这时,随着数据生成,可读流开始频繁触发事件。

调用 readable.pause() 方法, readable.unpipe() 方法, 或者接收 “背压”(back pressure), 将导致 readable._readableState.flowing 值变为 false。 这将暂停事件流,但 不会 暂停数据生成。 在这种情况下,为 'data' 事件设置监听函数不会导致 readable._readableState.flowing 变为 true

const { PassThrough, Writable } = require('stream');
const pass = new PassThrough();
const writable = new Writable();

pass.pipe(writable);
pass.unpipe(writable);
// flowing 现在为 false

pass.on('data', (chunk) => { console.log(chunk.toString()); });
pass.write('ok'); // 不会触发 'data' 事件
pass.resume(); // 只有被调用了才会触发 'data' 事件

readable._readableState.flowing 值为 false 时, 数据可能堆积到流的内部缓存中。

选择一种#

查看英文版参与翻译

可读流 API 的演化贯穿了多个 Node.js 版本,提供了多种方法来消费流数据。通常开发者应该选择其中 一种 来消费数据,而 不应该 在单个流使用多种方法来消费数据。

对于大多数用户,建议使用 readable.pipe() 方法来消费流数据,因为它是最简单的一种实现。开发者如果要精细地控制数据传递和产生的过程,可以使用 EventEmitterreadable.pause()/readable.resume() 提供的 API 。

stream.Readable 类#

查看英文版参与翻译

'close' 事件#

查看英文版参与翻译

'close' 事件将在流或其底层资源(比如一个文件)关闭后触发。'close' 事件触发后,该流将不会再触发任何事件。

不是所有 Readable 都会触发 'close' 事件。

'data' 事件#

查看英文版参与翻译

  • chunk <Buffer> | <string> | <any> 数据片段。对于非对象模式的可读流,这是一个字符串或者 Buffer。 对于对象模式的可读流,这可以是除 null 以外的任意类型 JavaScript 值。

'data' 事件会在流将数据传递给消费者时触发。当流转换到 flowing 模式时会触发该事件。调用 readable.pipe()readable.resume() 方法,或为 'data' 事件添加回调可以将流转换到 flowing 模式。 'data' 事件也会在调用 readable.read() 方法并有数据返回时触发。

在没有明确暂停的流上添加 'data' 事件监听会将流转换为 flowing 模式。 数据会在可用时尽快传递给下个流程。

如果调用 readable.setEncoding() 方法明确为流指定了默认编码,回调函数将接收到一个字符串,否则接收到的数据将是一个 Buffer 实例。

const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes of data.`);
});
'end' 事件#

查看英文版参与翻译

'end' 事件将在流中再没有数据可供消费时触发。

注意'end' 事件只有在数据被完全消费后 才会触发 。 可以在数据被完全消费后,通过将流转换到 flowing 模式, 或反复调用 stream.read() 方法来实现这一点。

const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes of data.`);
});
readable.on('end', () => {
  console.log('There will be no more data.');
});
'error' 事件#

查看英文版参与翻译

'error' 事件可以在任何时候在可读流实现(Readable implementation)上触发。 通常,这会在底层系统内部出错从而不能产生数据,或当流的实现试图传递错误数据时发生。

回调函数将接收到一个 Error 对象。

'readable' 事件#

查看英文版参与翻译

'readable' 事件将在流中有数据可供读取时触发。在某些情况下,为 'readable' 事件添加回调将会导致一些数据被读取到内部缓存中。

const readable = getReadableStreamSomehow();
readable.on('readable', () => {
  // 有一些数据可读了
});

当到达流数据尾部时, 'readable' 事件也会触发。触发顺序在 'end' 事件之前。

事实上, 'readable' 事件表明流有了新的动态:要么是有了新的数据,要么是到了流的尾部。 对于前者, stream.read() 将返回可用的数据。而对于后者, stream.read() 将返回 null。 例如,下面的例子中的 foo.txt 是一个空文件:

const fs = require('fs');
const rr = fs.createReadStream('foo.txt');
rr.on('readable', () => {
  console.log(`readable: ${rr.read()}`);
});
rr.on('end', () => {
  console.log('end');
});

上面脚本的输出如下:

$ node test.js
readable: null
end

注意: 通常情况下,readable.pipe() 方法和 'data' 事件机制比 'readable' 事件更容易理解。然而处理 'readable'事件可能造成吞吐量升高。

readable.destroy([error])#

查看英文版参与翻译

销毁流,并且触发error事件。然后,可读流将释放所有的内部资源。

开发者不应该覆盖这个方法,应该覆盖readable._destroy方法。

readable.isPaused()#

查看英文版参与翻译

readable.isPaused() 方法返回可读流的当前操作状态。 该方法主要是在 readable.pipe() 方法的底层机制中用到。大多数情况下,没有必要直接使用该方法。

const readable = new stream.Readable();

readable.isPaused(); // === false
readable.pause();
readable.isPaused(); // === true
readable.resume();
readable.isPaused(); // === false
readable.pause()#

查看英文版参与翻译

  • 返回: this

readable.pause() 方法将会使 flowing 模式的流停止触发 'data' 事件, 进而切出 flowing 模式。任何可用的数据都将保存在内部缓存中。

const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes of data.`);
  readable.pause();
  console.log('There will be no additional data for 1 second.');
  setTimeout(() => {
    console.log('Now data will start flowing again.');
    readable.resume();
  }, 1000);
});
readable.pipe(destination[, options])#

查看英文版参与翻译

readable.pipe() 绑定一个 Writablereadable 上, 将可写流自动切换到 flowing 模式并将所有数据传给绑定的 Writable。数据流将被自动管理。这样,即使是可读流较快,目标可写流也不会超负荷(overwhelmed)。

下面例子将 readable 中的所有数据通过管道传递给名为 file.txt 的文件:

const readable = getReadableStreamSomehow();
const writable = fs.createWriteStream('file.txt');
// readable 中的所有数据都传给了 'file.txt'
readable.pipe(writable);

可以在单个可读流上绑定多个可写流。

readable.pipe() 方法返回 目标流 的引用,这样就可以对流进行链式地管道操作:

const r = fs.createReadStream('file.txt');
const z = zlib.createGzip();
const w = fs.createWriteStream('file.txt.gz');
r.pipe(z).pipe(w);

默认情况下,当源可读流(the source Readable stream)触发 'end' 事件时,目标流也会调用 stream.end() 方法从而结束写入。要禁用这一默认行为, end 选项应该指定为 false, 这将使目标流保持打开, 如下面例子所示:

reader.pipe(writer, { end: false });
reader.on('end', () => {
  writer.end('Goodbye\n');
});

这里有一点要警惕,如果可读流在处理时发生错误,目标可写流 不会 自动关闭。 如果发生错误,需要 手动 关闭所有流以避免内存泄漏。

注意:不管对 process.stderrprocess.stdout 指定什么选项,它们都是直到 Node.js 进程退出才关闭。

readable.read([size])#

查看英文版参与翻译

readable.read()方法从内部缓冲区中抽出并返回一些数据。 如果没有可读的数据,返回null。readable.read()方法默认数据将作为“Buffer”对象返回 ,除非已经使用readable.setEncoding()方法设置编码或流运行在对象模式。

可选的size参数指定要读取的特定数量的字节。如果size字节不可读,将返回null除非流已经结束,在这种情况下所有保留在内部缓冲区的数据将被返回。

如果没有指定size参数,则内部缓冲区包含的所有数据将返回。

readable.read()方法只应该在暂停模式下的可读流上运行。在流模式下,readable.read()自动调用直到内部缓冲区的数据完全耗尽。

const readable = getReadableStreamSomehow();
readable.on('readable', () => {
  let chunk;
  while (null !== (chunk = readable.read())) {
    console.log(`Received ${chunk.length} bytes of data.`);
  }
});

一般来说,建议开发人员避免使用'readable'事件和readable.read()方法,使用readable.pipe()'data'事件代替。

无论size参数的值是什么,对象模式中的可读流将始终返回调用readable.read(size)的单个项目。

注意:如果readable.read()方法返回一个数据块,那么一个'data'事件也将被发送。

注意:在已经被发出的'end'事件后调用stream.read([size])事件将返回null。不会抛出运行时错误。

readable.readableHighWaterMark#

查看英文版参与翻译

返回构造该可读流时传入的 'highWaterMark' 属性。

readable.readableLength#

查看英文版参与翻译

This property contains the number of bytes (or objects) in the queue ready to be read. The value provides introspection data regarding the status of the highWaterMark.

readable.resume()#

查看英文版参与翻译

  • 返回: this

readable.resume() 方法会重新触发 'data' 事件, 将暂停模式切换到流动模式。

readable.resume() 方法可以用来充分使用流中的数据,而不用实际处理任何数据,如以下示例所示:

getReadableStreamSomehow()
  .resume()
  .on('end', () => {
    console.log('Reached the end, but did not read anything.');
  });
readable.setEncoding(encoding)#

查看英文版参与翻译

  • encoding <string> 要使用的编码
  • Returns: this

readble.setEncoding() 方法会为从可读流读入的数据设置字符编码

默认返回Buffer对象。设置编码会使得该流数据返回指定编码的字符串而不是Buffer对象。例如,调用readable.setEncoding('utf-8')会使得输出数据作为UTF-8数据解析,并作为字符串返回。调用readable.setEncoding('hex')使得数据被编码成16进制字符串格式。

可读流会妥善处理多字节字符,如果仅仅直接从流中取出Buffer对象,很可能会导致错误解码。

const readable = getReadableStreamSomehow();
readable.setEncoding('utf8');
readable.on('data', (chunk) => {
  assert.equal(typeof chunk, 'string');
  console.log('got %d characters of string data', chunk.length);
});
readable.unpipe([destination])#

查看英文版参与翻译

readable.unpipe() 方法将之前通过stream.pipe()方法绑定的流分离

如果 destination 没有传入, 则所有绑定的流都会被分离.

如果传入 destination, 但它没有被pipe()绑定过,则该方法不作为.

const readable = getReadableStreamSomehow();
const writable = fs.createWriteStream('file.txt');
// All the data from readable goes into 'file.txt',
// but only for the first second
readable.pipe(writable);
setTimeout(() => {
  console.log('Stop writing to file.txt');
  readable.unpipe(writable);
  console.log('Manually close the file stream');
  writable.end();
}, 1000);
readable.unshift(chunk)#

查看英文版参与翻译

  • chunk <Buffer> | <Uint8Array> | <string> | <any> 数据块移动到可读队列底部。对于不以对象模式运行的流,chunk 必须是字符串, Buffer 或者 Uint8Array。对于对象流, chunk 任何非null的值。

readable.unshift() 方法会把一块数据压回到Buffer内部。 这在如下特定情形下有用: 代码正在消费一个数据流,已经"乐观地"拉取了数据。 又需要"反悔-消费"一些数据,以便这些数据可以传给其他人用。

注意: 'end' 事件已经触发或者运行时错误抛出后,stream.unshift(chunk) 方法不能被调用。

使用 stream.unshift() 的开发者一般需要换一下思路,考虑用一个Transform 流替代. 更多信息请查看API for Stream Implementers部分。

// Pull off a header delimited by \n\n
// use unshift() if we get too much
// Call the callback with (error, header, stream)
const { StringDecoder } = require('string_decoder');
function parseHeader(stream, callback) {
  stream.on('error', callback);
  stream.on('readable', onReadable);
  const decoder = new StringDecoder('utf8');
  let header = '';
  function onReadable() {
    let chunk;
    while (null !== (chunk = stream.read())) {
      const str = decoder.write(chunk);
      if (str.match(/\n\n/)) {
        // found the header boundary
        const split = str.split(/\n\n/);
        header += split.shift();
        const remaining = split.join('\n\n');
        const buf = Buffer.from(remaining, 'utf8');
        stream.removeListener('error', callback);
        // remove the readable listener before unshifting
        stream.removeListener('readable', onReadable);
        if (buf.length)
          stream.unshift(buf);
        // now the body of the message can be read from the stream.
        callback(null, header, stream);
      } else {
        // still reading the header.
        header += str;
      }
    }
  }
}

注意: 不像 stream.push(chunk)stream.unshift(chunk)在重置流的内部读取状态时是不会结束读取过程。 如果在读取过程中调用 readable.unshift() 则会导致异常 (例如:即来自自定义流上的 stream._read()内部方法上的实现)。 应该在调用 readable.unshift()方法之后适当调用 stream.push('') 来重置读取状态,执行读取的过程中最好避免调用 readable.unshift()方法。

readable.wrap(stream)#

查看英文版参与翻译

  • stream <Stream> 一个老版本的readable stream

Node.js在v0.10版本之前的流没有实现当前定义的所有流模块的API.(查看更多兼容性信息 Compatibility )

当使用老版本的Node.js库来触发'data'事件和stream.pause()方法仅是建议性的, readable.wrap()方法能创建一个把老版本的流作为数据源的Readable stream。

几乎没有必要使用readable.wrap(),但是这个方法已经为老版本的Node.js应用和一些库提供了方便。

例子:

const { OldReader } = require('./old-api-module.js');
const { Readable } = require('stream');
const oreader = new OldReader();
const myReader = new Readable().wrap(oreader);

myReader.on('readable', () => {
  myReader.read(); // etc.
});
readable[Symbol.asyncIterator]()#

查看英文版参与翻译

Stability: 1 - Experimental
const fs = require('fs');

async function print(readable) {
  readable.setEncoding('utf8');
  let data = '';
  for await (const k of readable) {
    data += k;
  }
  console.log(data);
}

print(fs.createReadStream('file')).catch(console.log);

If the loop terminates with a break or a throw, the stream will be destroyed. In other terms, iterating over a stream will consume the stream fully. The stream will be read in chunks of size equal to the highWaterMark option. In the code example above, data will be in a single chunk if the file has less then 64kb of data because no highWaterMark option is provided to fs.createReadStream().

Duplex 流与 Transform 流#

stream.Duplex 类#

查看英文版参与翻译

Duplex 流是同时实现了 ReadableWritable 接口的流。

Duplex 流的实例包括了:

stream.Transform 类#

查看英文版参与翻译

变换流(Transform streams) 是一种 Duplex 流。它的输出与输入是通过某种方式关联的。和所有 Duplex 流一样,变换流同时实现了 ReadableWritable 接口。

变换流的实例包括:

transform.destroy([error])#

查看英文版参与翻译

销毁这个流,发射'error'事件。 调用这个之后,变换流会释放全部内部资源 实现者不应该重载此方法,而应该实现readable._destroyTransform的默认_destroy实现也发射'close'事件。

stream.finished(stream, callback)#

查看英文版参与翻译

  • stream <Stream> 一个可读或可写的流
  • callback <Function> 一个回调函数,可以带有一个错误信息参数,也可没有

使用此函数,以在一个流不再可读、可写或发生了错误、提前关闭等事件时获得通知。

const { finished } = require('stream');

const rs = fs.createReadStream('archive.tar');

finished(rs, (err) => {
  if (err) {
    console.error('流发生错误', err);
  } else {
    console.log('流已读完');
  }
});

rs.resume(); // 将流读完

在处理流的提前销毁(如被抛弃的HTTP请求)等错误事件时特别有用,此时流不会触发 'end''finish' 事件。

finished API 也可做成承诺:

const finished = util.promisify(stream.finished);

const rs = fs.createReadStream('archive.tar');

async function run() {
  await finished(rs);
  console.log('流已读完');
}

run().catch(console.error);
rs.resume(); // 将流读完

stream.pipeline(...streams[, callback])#

查看英文版参与翻译

  • ...streams <Stream> 两个或多个要用管道连接的流
  • callback <Function> 一个回调函数,可以带有一个错误信息参数

该模块方法用于在多个流之间架设管道,可以自动传递错误和完成扫尾工作,并且可在管道架设完成时提供一个回调函数:

const { pipeline } = require('stream');
const fs = require('fs');
const zlib = require('zlib');

// 使用 pipeline API 轻松连接多个流
// 并在管道完成时获得通知

// 使用pipeline高效压缩一个可能很大的tar文件:

pipeline(
  fs.createReadStream('archive.tar'),
  zlib.createGzip(),
  fs.createWriteStream('archive.tar.gz'),
  (err) => {
    if (err) {
      console.error('管道架设失败', err);
    } else {
      console.log('管道架设成功');
    }
  }
);

pipeline API 也可做成承诺:

const pipeline = util.promisify(stream.pipeline);

async function run() {
  await pipeline(
    fs.createReadStream('archive.tar'),
    zlib.createGzip(),
    fs.createWriteStream('archive.tar.gz')
  );
  console.log('管道架设成功');
}

run().catch(console.error);

API for Stream Implementers#

查看英文版参与翻译

stream模块API的设计是为了让JavaScript的原型继承模式可以简单的实现流。

首先,一个流开发者可能声明了一个JavaScript类并且继承四个基本流类中的一个(stream.Writeablestream.Readablestream.Duplex,或者stream.Transform),确保他们调用合适的父类构造函数:

const { Writable } = require('stream');

class MyWritable extends Writable {
  constructor(options) {
    super(options);
    // ...
  }
}

新的流类必须实现一个或多个特定的方法,根据所创建的流类型,如下图所示:

用例 实现的方法
只读流 Readable _read
只写流 writable _write_writev_final
可读可写流 Duplex _read_write_writev_final
操作写数据,然后读结果 Transform _transform_flush_final

注意:实现流的代码里面不应该出现调用“public”方法的地方因为这些方法是给使用者使用的(流使用者部分的API所述)。这样做可能会导致使用流的应用程序代码产生不利的副作用。

Simplified Construction#

查看英文版参与翻译

对于许多简单的案例,它是有可能在不依赖继承的情况下创建流。这可以直接创建流实例,通过流基础类stream.Writablestream.Readablestream.Duplex,或者stream.Transform传入对象完成,对象包含合适的方法作为构造函数选项。

const { Writable } = require('stream');

const myWritable = new Writable({
  write(chunk, encoding, callback) {
    // ...
  }
});

Implementing a Writable Stream#

查看英文版参与翻译

这个stream.Writable类被用于实现可写流。

自定义可写流必须调用new stream.Writable([options])构造函数并且实现writable._write()方法。writable._writev()方法也是可以实现的。

Constructor: new stream.Writable([options])#

查看英文版参与翻译

例如:

const { Writable } = require('stream');

class MyWritable extends Writable {
  constructor(options) {
    // Calls the stream.Writable() constructor
    super(options);
    // ...
  }
}

或者,使用ES6之前的语法来创建构造函数:

const { Writable } = require('stream');
const util = require('util');

function MyWritable(options) {
  if (!(this instanceof MyWritable))
    return new MyWritable(options);
  Writable.call(this, options);
}
util.inherits(MyWritable, Writable);

或者,使用简化的构造函数方法:

const { Writable } = require('stream');

const myWritable = new Writable({
  write(chunk, encoding, callback) {
    // ...
  },
  writev(chunks, callback) {
    // ...
  }
});

writable._write(chunk, encoding, callback)#

查看英文版参与翻译

  • chunk <Buffer> | <string> | <any> 要写的块。会一直作为缓冲区,除非decodeStrings选项设置为false或者流以对象模式运行。
  • encoding <string> 如果块是字符串,那么encoding就是该字符串的字符编码。 如果块是Buffer,或者是流在对象模式下运行,encoding可能被忽略。
  • callback <Function> 调用此函数(err参数可选)当块处理完成时。

所有可写流实现必须提供一个 writable._write() 方法将数据发送到底层资源。

注意Transform 流提供自己实现的writable._write()

注意:此函数不得直接由应用程序代码调用。 它应该由子类实现,并由内部的Writable类方法调用。

必须调用callback方法来表示写完成成功或失败,出现错误。callback第一个参数必须是Error对象如果调用失败,成功时为null

所有writable._write()被调用并且callback被调用将导致要缓冲的写入数据。 一旦调用callback,流将会执行'drain'事件。 如果想让流实现一次能够处理多个数据块,writable._writev()方法应该被实现。

如果在构造函数选项中设置decodeStrings属性,那么chunk可能是一个字符串而不是一个缓冲区,encodeing将会表示字符串的字符编码。 这是为了支持对某些字符串具有优化处理的实现数据编码。 如果decodeStrings属性显式设置为falseencoding参数可以安全地忽略,chunk将保持不变传递给.write()的对象。

writable._write()方法前缀为下划线,因为它是在定义它的类的内部,不应该直接调用用户程序。

writable._writev(chunks, callback)#

查看英文版参与翻译

  • chunks <Array> 要写的块 每个块都有以下格式:{chunk:...,encoding:...}
  • callback <Function> 一个回调函数(可选地带有一个错误参数)在提供的块的处理完成时被调用。

:此函数不得直接通过应用程序代码调用。 它应由子类实现,并由内部Writable进行调用类方法。

writable._writev()方法能够一次处理多个数据块的流除了writable._write()之外。如果实现,该方法将缓存的所有数据块写入队列。

writable._writev()方法前缀为下划线,因为它是定义它的类的内部,不应该由用户程序直接调用。

writable._destroy(err, callback)#

查看英文版参与翻译

通过 writable.destroy() 方法调用_destroy()。它可以被子类覆盖,但不能直接调用。

writable._final(callback)#

查看英文版参与翻译

  • callback <Function> 在完成写入所有剩余数据时调用该函数(err参数可选)。

_final()方法不能直接调用。 应该由子类负责实现,如果是,将仅可以由内部的Writable类方法进行调用。

这个可选的函数将在流关闭之前被调用, 直到callback回调函数执行完成才触发finish事件。这对于关闭资源或在流结束之前写入缓冲数据很有用。

Errors While Writing#

查看英文版参与翻译

建议在处理writable._write()writable._writev()方法期间发生的错误时传给回调函数的第一个参数来处理。这将导致Writable触发error事件。从writable._write()中抛出一个错误可能会导致意外和不一致的行为,具体取决于如何使用流。使用回调可确保对错误进行一致且可预测的处理。

const { Writable } = require('stream');

const myWritable = new Writable({
  write(chunk, encoding, callback) {
    if (chunk.toString().indexOf('a') >= 0) {
      callback(new Error('chunk is invalid'));
    } else {
      callback();
    }
  }
});

一个可写流的例子#

查看英文版参与翻译

下面说明了一个相当简单(有点无意义)的可写流实现。虽然这个具体的可写流实例没有任何真正的特殊用途,但该示例说明了一个自定义流实例所需要的元素:

const { Writable } = require('stream');

class MyWritable extends Writable {
  constructor(options) {
    super(options);
    // ...
  }

  _write(chunk, encoding, callback) {
    if (chunk.toString().indexOf('a') >= 0) {
      callback(new Error('chunk is invalid'));
    } else {
      callback();
    }
  }
}

Decoding buffers in a Writable Stream#

查看英文版参与翻译

解码buffers是一个常见任务,比如用变换流处理字符串输入。 当用多字节字符编码方式(比如UTF-8)时,这是一个微不足道的过程。 下面的例子展示了如何用StringDecoderWritable解码多字节字符串。

const { Writable } = require('stream');
const { StringDecoder } = require('string_decoder');

class StringWritable extends Writable {
  constructor(options) {
    super(options);
    const state = this._writableState;
    this._decoder = new StringDecoder(state.defaultEncoding);
    this.data = '';
  }
  _write(chunk, encoding, callback) {
    if (encoding === 'buffer') {
      chunk = this._decoder.write(chunk);
    }
    this.data += chunk;
    callback();
  }
  _final(callback) {
    this.data += this._decoder.end();
    callback();
  }
}

const euro = [[0xE2, 0x82], [0xAC]].map(Buffer.from);
const w = new StringWritable();

w.write('currency: ');
w.write(euro[0]);
w.end(euro[1]);

console.log(w.data); // currency: €

Implementing a Readable Stream#

查看英文版参与翻译

stream.Readable 类扩展并实现了Readable

用户实现的自定义可读流 必须 调用new stream.Readable([options]) 构造函数并且实现readable._read()方法。

new stream.Readable([options])#

查看英文版参与翻译

  • options <Object>
    • highWaterMark <number> 从底层资源读取数据并存储在内部缓冲区中的最大字节数。默认16384 (16kb), 或者 16对应objectMode流模式。
    • encoding <string> 指定解析的字符编码格式. 默认 为null
    • objectMode <boolean> 一个对象的流。 这意味着 stream.read(n) 返回的是一个单一的对象而不是n个字节的缓冲区。默认 false
    • read <Function>stream._read()方法的实现。
    • destroy <Function>stream._destroy() 方法的实现。

例如:

const { Readable } = require('stream');

class MyReadable extends Readable {
  constructor(options) {
    // Calls the stream.Readable(options) constructor
    super(options);
    // ...
  }
}

或者使用ES6语法:

const { Readable } = require('stream');
const util = require('util');

function MyReadable(options) {
  if (!(this instanceof MyReadable))
    return new MyReadable(options);
  Readable.call(this, options);
}
util.inherits(MyReadable, Readable);

或者使用简化的构造方法:

const { Readable } = require('stream');

const myReadable = new Readable({
  read(size) {
    // ...
  }
});

readable._read(size)#

查看英文版参与翻译

  • size <number> 异步读取的字节数。

注意: 这个函数不能直接被应用程序代码调用。 它应由子类实现,并仅能由Readable对象内部方法调用。

所有实现可读流的实例必须实现readable._read() 方法去获得底层的数据资源。

readable._read() 被调用,如果读取的数据是可用的,应该在最开始的实现的时候使用this.push(dataChunk)方法将该数据推入读取队列。_read() 应该一直读取资源直到推送数据方法readable.push()返回false的时候停止。想再次调用_read()方法,需要再次往可读流里面push数据。

注意:一旦readable._read()方法被调用,只有在 readable.push()方法被调用之后,才能再次被调用。

size 可选参数。_read()方法是一个实现读取数据的单操作,设置size参数来确定要读取数据的大小。 其他的实现可能会忽略这个参数,只要数据可用就提供数据。 不需要等到stream.push(chunk)方法推入一定size的数据后才能调用。

readable._read()方法的前缀是一个下划线,因为它是在定义它的类的内部,不应该被直接调用用户程序。

readable._destroy(err, callback)#

查看英文版参与翻译

  • err <Error> 错误。
  • callback <Function> 回调函数,第一个参数为err参数。

_destroy()需通过readable.destroy()方法调用。它可以被子类覆盖,但不能直接调用。

readable.push(chunk[, encoding])#

查看英文版参与翻译

  • chunk <Buffer> | <Uint8Array> | <string> | <null> | <any> 压入读队列的数据块。 对于没有处在object mode的流来说,chunk必须是一个字符串,BufferUint8Array; 对object mode 的流来说,chunk可以使任何JavaScript值。
  • encoding <string> 字符串数据块的编码方式. 必须是可用的Buffer编码方式,例如'utf8''ascii'
  • 返回 <boolean> 如果多余的数据块可能会继续压入,那么返回true; 否则返回 false.

chunk是一个Buffer, Uint8Array或者string时, 这个数据块(chunk)会被添加到内部队列供使用者消费。 在没有数据可写入后,给chunk传了null发出流结束(EOF)的信号。

当可读流处在传输模式下,'data'事件触发时,可以通过 调用readable.read() 方法读出来数据,这数据是用readable.push()添加的。

readable.push()方法被设计得尽可能的灵活。 比如,当封装一个有'暂停/恢复'机制和带数据回调的底层source的时候, 那么这个底层的source可以被常规的可读流实例封装。就像下面的例子一样。

// source 是一个有readStop()和 readStart()方法的对象。
// 有数据就调`ondata`成员函数;
// 数据结束就调`onend`成员函数。

class SourceWrapper extends Readable {
  constructor(options) {
    super(options);

    this._source = getLowlevelSourceObject();

    // Every time there's data, push it into the internal buffer.
    this._source.ondata = (chunk) => {
      // if push() returns false, then stop reading from source
      if (!this.push(chunk))
        this._source.readStop();
    };

    // When the source ends, push the EOF-signaling `null` chunk
    this._source.onend = () => {
      this.push(null);
    };
  }
  // _read will be called when the stream wants to pull more data in
  // the advisory size argument is ignored in this case.
  _read(size) {
    this._source.readStart();
  }
}

注意: readable.push()方法是为了让可读流实现者调用的, 而且只来自readable._read()方法内部。

Errors While Reading#

查看英文版参与翻译

建议在调用 readable._read() 方法时发生的错误应该执行触发 'error' 事件,而不是抛出异常错误。从readable._read()中抛出错误可能会导致意外的和不一致的行为,具体取决于流是以流还是暂停模式运行。 使用“错误”事件可确保一致且可预测的错误处理。

const { Readable } = require('stream');

const myReadable = new Readable({
  read(size) {
    if (checkSomeErrorCondition()) {
      process.nextTick(() => this.emit('error', err));
      return;
    }
    // do some work
  }
});

一个数流的例子#

查看英文版参与翻译

以下是可读流的一个基本例子,触发数字1到1,000,000升序,然后结束

const { Readable } = require('stream');

class Counter extends Readable {
  constructor(opt) {
    super(opt);
    this._max = 1000000;
    this._index = 1;
  }

  _read() {
    const i = this._index++;
    if (i > this._max)
      this.push(null);
    else {
      const str = '' + i;
      const buf = Buffer.from(str, 'ascii');
      this.push(buf);
    }
  }
}

Implementing a Duplex Stream#

查看英文版参与翻译

双工流(可读可写流)是可读流可写流的实现,例如TCP套接字连接。

因为JavaScript不支持多重继承,所以stream.Duplex类被扩展以实现双工流(而不是扩展stream.Readablestream.Writable类)。

注意。stream.Duplex类原型继承来自stream.Readable和寄生的stream.Writable,但是instanceof将会在这两个基础类上正确工作,由于stream.Writable覆盖了 Symbol.hasInstance方法。

自定义双工流必须通过new stream.Duplex([options])构造函数并实现readable._read()writable._write()方法。

new stream.Duplex(options)#

查看英文版参与翻译

  • options <Object> 传给可读和可写流的构造函数,还有如下字段:
    • allowHalfOpen <boolean> 默认是true。如果设置为false, 那么当读端停止时,写端自动停止。
    • readableObjectMode <boolean> 默认是 false。会为流的读端设置objectMode。如果 objectModetrue,那就没有任何用。
    • writableObjectMode <boolean> 默认是 false。会为流的写端设置objectMode。如果 objectModetrue,那就没有任何用。
    • readableHighWaterMark <number> 设置 highWaterMark 可读流的缓冲区大小。 如果已经设置 highWaterMarkreadableHighWaterMark不起作用。
    • writableHighWaterMark <number> 设置 highWaterMark 可写流缓冲区大小。如果设置了highWaterMarkwritableHighWaterMark不起作用。

例如:

const { Duplex } = require('stream');

class MyDuplex extends Duplex {
  constructor(options) {
    super(options);
    // ...
  }
}

或者, 使用ES6之前的语法来创建构造函数:

const { Duplex } = require('stream');
const util = require('util');

function MyDuplex(options) {
  if (!(this instanceof MyDuplex))
    return new MyDuplex(options);
  Duplex.call(this, options);
}
util.inherits(MyDuplex, Duplex);

又或者, 用简化的构造函数:

const { Duplex } = require('stream');

const myDuplex = new Duplex({
  read(size) {
    // ...
  },
  write(chunk, encoding, callback) {
    // ...
  }
});

An Example Duplex Stream#

查看英文版参与翻译

下面是一个可读可写流包装了一个假定的可读可写的底层源对象, 尽管用了一个与Node.js流不兼容的API。

下面是一个简单的例子, 在一个可读可写流中,来的buffers通过Writable 接口写入数据,再通过Readable接口读回数据。

const { Duplex } = require('stream');
const kSource = Symbol('source');

class MyDuplex extends Duplex {
  constructor(source, options) {
    super(options);
    this[kSource] = source;
  }

  _write(chunk, encoding, callback) {
    // The underlying source only deals with strings
    if (Buffer.isBuffer(chunk))
      chunk = chunk.toString();
    this[kSource].writeSomeData(chunk);
    callback();
  }

  _read(size) {
    this[kSource].fetchSomeData(size, (data, encoding) => {
      this.push(Buffer.from(data, encoding));
    });
  }
}

尽管在一个对象实例中共存,读端和写端却是相互独立于彼此,这是可读可写流最为重要的一点。

Object Mode Duplex Streams#

查看英文版参与翻译

对可读可写流来说,objectMode可以通过readableObjectModewritableObjectMode选项 来分别设置读端和写端。

比如下面的例子。 创建了一个变换流(一种可读可写流)。 在写端接收JavaScript数字,在读端转换为16进制字符串。

const { Transform } = require('stream');

// All Transform streams are also Duplex Streams
const myTransform = new Transform({
  writableObjectMode: true,

  transform(chunk, encoding, callback) {
    // Coerce the chunk to a number if necessary
    chunk |= 0;

    // Transform the chunk into something else.
    const data = chunk.toString(16);

    // Push the data onto the readable queue.
    callback(null, '0'.repeat(data.length % 2) + data);
  }
});

myTransform.setEncoding('ascii');
myTransform.on('data', (chunk) => console.log(chunk));

myTransform.write(1);
// Prints: 01
myTransform.write(10);
// Prints: 0a
myTransform.write(100);
// Prints: 64

Implementing a Transform Stream#

查看英文版参与翻译

一个Transform流是一种Duplex流,输入经过Transform流,做某种计算然后输出。 比如 zlib流和crypto流会做压缩,加密和解密数据。

注意: 输出流的大小,有多少数据包,到达时间都不一定非要和输入流一样。 比如,一个哈希流再输入结束时永远只会输出单个数据块; 而一个zlib流的输出,可能比输入大得多或小得多。

stream.Transform 类被扩展了,实现了一个Transform流。

stream.Transform类最初继承自stream.Duplex,并且实现了它自己版本的writable._write()readable._read()方法。 一般地,变换流必须实现transform._transform() 方法; 而transform._flush() 方法是非必须的。

注意: 用变换流时要注意,如果读端输出没有被消费,那么往写数据可能会引起写端暂停。

new stream.Transform([options])#

查看英文版参与翻译

例如:

const { Transform } = require('stream');

class MyTransform extends Transform {
  constructor(options) {
    super(options);
    // ...
  }
}

或者,用ES6构造方法:

const { Transform } = require('stream');
const util = require('util');

function MyTransform(options) {
  if (!(this instanceof MyTransform))
    return new MyTransform(options);
  Transform.call(this, options);
}
util.inherits(MyTransform, Transform);

又或者, 用简化构造方法:

const { Transform } = require('stream');

const myTransform = new Transform({
  transform(chunk, encoding, callback) {
    // ...
  }
});

Events: 'finish' and 'end'#

查看英文版参与翻译

'finish'事件来自stream.Writable'end'事件来自stream.Readable类。

在调用了stream.end()并且stream._transform()处理了全部数据块之后, 'finish'事件触发。

transform._flush()中的回调函数被调用之后,所有数据已经输出,此时,'end'事件触发

transform._flush(callback)#

查看英文版参与翻译

  • callback <Function> 一个当剩余的数据被冲刷后被调用的回调函数(error参数和data可选)。

Note: 应用程序代码禁止直接调用这个函数。它应该由子类来实现,并且只能被内部可读流的类方法调用。

在某些情况下,转换操作需要在流的末尾发射一块额外的数据。例如,zlib 压缩流会存储一种优先用于压缩输出的内部状态。但是在流结束的时候,那段额外的数据需要被冲刷才能完成数据压缩。

自定义的 Transform 实现,可以实现transform._flush()方法。在没有更多的要写的数据被消耗时,会调用这个方法,但是在发射'end' 事件之前会发出可读流结束的信号。

transform._flush() 实现里,readable.push()方法会在适当的时候调用零次或者多次。callback在冲刷操作完成的时候一定会被调用。

transform._flush() 方法前缀为下划线,因为它是在定义它的类的内部,绝不应该被用户程序直接调用。

transform._transform(chunk, encoding, callback)#

查看英文版参与翻译

  • chunk <Buffer> | <string> | <any> 被转换的数据块。它总是一个buffer除非在option中配置decodeStringfalse或者当前流处在object mode下。
  • encoding <string> 如果 chunk 是字符串,那么encoding就是该字符串的字符编码。如果块是Buffer,它是一个特殊的值'buffer',这种情况encoding可以被忽略。
  • callback <Function> 当块被处理完成时调用此函数(包含error和data参数)。

注意:此函数不得直接由应用程序代码调用。它应该由子类实现,并由内部的Readable类方法调用。

所有的变换流的执行必须提供一个_transform()方法接收输入并提供输出。transform._transform()的实现会处理写入的字节,做某种计算并输出,然后使用readable.push()方法把这个输出传递到可读流。

从一个单个输入数据块可能会调用零次或多次transform.push()方法,调用次数取决于每次把多少数据做为一个输出结果。

有可能从任意给定输入的数据块中没有产生任何输出。

callback 会在当前数据被完全消费之后调用。在处理过程输入的过程中如果出错了,第一个参数是一个错误对象,没有出错Error参数则为null。如果传递第二个参数,它会被转发到readable.push()中。就像下面的例子:

transform.prototype._transform = function(data, encoding, callback) {
  this.push(data);
  callback();
};

transform.prototype._transform = function(data, encoding, callback) {
  callback(null, data);
};

transform._transform() 方法前缀为下划线,因为它是定义在它的类的内部,不应该直接被用户程序调用。

transform._transform() 方法永远不能并行调用;流使用了队列机制,不论同步或者异步情况下,都必须先调用callback之后才能接收下一个数据。

Class: stream.PassThrough#

查看英文版参与翻译

stream.PassThrough类是一个极简Transform流,只是把输入字节原样不动直接输出。 一开始的目的是用来做例子和测试,但是有时也作为某些新颖的流的基本组成部分起作用。

Additional Notes#

Compatibility with Older Node.js Versions#

查看英文版参与翻译

在v0.10之前的Node.js版本中,可读流接口更简单,但功能更弱,功能更少。

  • 相比需要等待stream.read() 方法调用之后才触发,'data' 事件自己会立即触发。 需要执行一定量工作来决定如何处理数据的应用程序需要将读取数据存储到缓冲区中,以便数据不会丢失。
  • The stream.pause() 方法是建议性的,而不是保证。 这意味着即使流处于暂停状态,仍然需要准备接收data事件。

在Node.js v0.10中,添加了Readable类。 为了向后兼容较旧的Node.js程序,当添加data事件处理程序或调用stream.resume()方法时,可读流将切换到“流动模式”。 结果是,即使不使用新的stream.read()方法和'readable'事件,也不必担心丢失'data'块。

虽然大多数应用程序将继续正常工作,但在以下情况下会引入极端情况:

  • 未添加'data' 事件监听。
  • 未调用stream.resume()方法。
  • 通过管道没有传送到任何可写的目的地。

例如,请考虑以下代码:

// WARNING!  BROKEN!
net.createServer((socket) => {

  // we add an 'end' method, but never consume the data
  socket.on('end', () => {
    // It will never get here.
    socket.end('The message was received but was not processed.\n');
  });

}).listen(1337);

在v0.10之前的Node.js版本中,传入的消息数据将会是简单地丢弃。 但是,在Node.js v0.10及更高版本中,套接字仍然存在永远停顿。

在这种情况下的解决方法是调用stream.resume() 开始读取数据:

// Workaround
net.createServer((socket) => {

  socket.on('end', () => {
    socket.end('The message was received but was not processed.\n');
  });

  // start the flow of data, discarding it.
  socket.resume();

}).listen(1337);

除了新的可读流切换到流动模式之外,pre-v0.10风格的流可以使用包装在Readable类中readable.wrap() 方法。

readable.read(0)#

查看英文版参与翻译

在某些情况下,需要有机制来触发刷新基础可读流, 而没有实际消费任何数据。在这种情况下,可以调用readable.read(0),返回null

如果内部读取缓冲区低于highWaterMark,并且该流目前未读取,则调用stream.read(0)将触发调用底层 stream._read()方法。

虽然大多数应用程序几乎都不需要这样做,但Node.js中会出现这种情况,尤其是在可读流类内部。

readable.push('')#

查看英文版参与翻译

不推荐使用readable.push('')

向一个不处在object mode的流压入一个BufferUint8Array0字节字符串,会产生有趣的副作用。 因为调用了readable.push(),所以会停止读进程。 然而,因为参数是一个空字符串,没有可读的数据添加到可读buffer, 所以没有可以供用户消费的数据。

highWaterMark discrepancy after calling readable.setEncoding()#

查看英文版参与翻译

调用 readable.setEncoding() 会改变 highWaterMark 属性在非对象模式中的作用。

一般而言,我们直接将缓冲器存储的 字节数highWaterMark 相比较。然而在调用 setEncoding() 之后,程序会将缓冲器中存储的 字符数highWaterMark 相比较。

在通常情况下,如使用 latin1ascii 时,这不成问题。但在处理可能含有多字节字符的字符串时,此行为需要当心。