Node.js v10.6.0 文档


目录

stream - 流#

查看英文版参与翻译

稳定性: 2 - 稳定的

流(stream)是一种在 Node.js 中处理流式数据的抽象接口。 stream 模块提供了一些基础的 API,用于构建实现了流接口的对象。

Node.js 提供了多种流对象。 例如,发送到 HTTP 服务器的请求process.stdout 都是流的实例。

流可以是可读的、可写的、或是可读写的。 所有的流都是 EventEmitter 的实例。

stream 模块可以通过以下方式使用:

const stream = require('stream');

尽管理解流的工作方式很重要,但是 stream 模块本身主要用于开发者创建新类型的流实例。 对于以消费流对象为主的开发者,极少需要直接使用 stream 模块。

本文档的组织结构#

查看英文版参与翻译

本文档分为两个主要章节,外加其他注意事项作为第三章节。 第一章节阐述了在应用程序中使用流时需要的 API 要素。 第二章节阐述了实现新类型的流时需要的 API 要素。

流的类型#

查看英文版参与翻译

Node.js 中有四种基本的流类型:

另外本模块还包含了工具类函数 pipelinefinished

对象模式#

查看英文版参与翻译

所有 Node.js API 创建的流都是专门运作在字符串和 Buffer(或 Uint8Array)对象上。 当然,流的实现也可以使用其它类型的 JavaScript 值(除了 null,因为它在流中有特殊用途)。 这些流会以“对象模式”进行操作。

当创建流时,可以使用 objectMode 选项把流实例切换到对象模式。 试图将已经存在的流切换到对象模式是不安全的。

缓冲#

查看英文版参与翻译

可写流可读流都会在一个内部的缓冲器中存储数据,可以分别使用的 writable.writableBufferreadable.readableBuffer 来获取。

可缓冲的数据的数量取决于传入流构造函数的 highWaterMark 选项。 对于普通的流,highWaterMark 选项指定了字节的总数量。 对于以对象模式运作的流,highWaterMark 指定了对象的总数量。

当调用 stream.push(chunk) 时,数据会被缓冲在可读流中。 如果流的消费程序没有调用 stream.read(),则这些数据会停留在内部队列中,直到被消费。

一旦内部的可读缓冲的总大小达到 highWaterMark 指定的阈值时,流会暂时停止从底层资源读取数据,直到当前缓冲的数据被消费 (也就是说,流会停止调用内部的用于填充可读缓冲的 readable._read() 方法)。

当反复地调用 writable.write(chunk) 方法时,数据会被缓冲在可写流中。 当内部的可写缓冲的总大小小于 highWaterMark 设置的阈值时,调用 writable.write() 会返回 true。 一旦内部缓冲的大小达到或超过 highWaterMark 时,则会返回 false

stream API 的主要目标,特别是 stream.pipe() 方法,是为了限制数据的缓冲到可接受的程度,也就是读写速度不一致的源头与目的地不会压垮可用的内存。

因为 DuplexTransform 都是可读又可写的,所以它们各自维护着两个相互独立的内部缓冲器用于读取和写入, 这使得它们在维护数据流时,读取和写入两边可以各自独立地运作。 例如,net.Socket 实例是 Duplex 流,它的可读端可以消费从 socket 接收的数据,而可写端则可以将数据写入到 socket。 因为数据写入到 socket 的速度可能比接收数据的速度快或者慢,所以在读写两端独立地进行操作(或缓冲)就显得很重要了。

用于消费流的 API#

查看英文版参与翻译

几乎所有的 Node.js 应用都在某种程度上使用了流。 下面是一个例子,在 Node.js 应用程序中使用流实现了一个 HTTP 服务器:

const http = require('http');

const server = http.createServer((req, res) => {
  // req 是一个 http.IncomingMessage 实例,它是可读流。
  // res 是一个 http.ServerResponse 实例,它是可写流。

  let body = '';
  // 接收数据为 utf8 字符串,
  // 如果没有设置字符编码,则会接收到 Buffer 对象。
  req.setEncoding('utf8');

  // 如果添加了监听器,则可读流会触发 'data' 事件。
  req.on('data', (chunk) => {
    body += chunk;
  });

  // 'end' 事件表明整个请求体已被接收。 
  req.on('end', () => {
    try {
      const data = JSON.parse(body);
      // 响应一些信息给用户。
      res.write(typeof data);
      res.end();
    } catch (er) {
      // json 解析失败。
      res.statusCode = 400;
      return res.end(`错误: ${er.message}`);
    }
  });
});

server.listen(1337);

// $ curl localhost:1337 -d "{}"
// object
// $ curl localhost:1337 -d "\"foo\""
// string
// $ curl localhost:1337 -d "not json"
// 错误: Unexpected token o in JSON at position 1

可写流(比如例子中的 res)会暴露了一些方法,比如 write()end() 用于写入数据到流。

当数据可以从流读取时,可读流会使用 EventEmitter API 来通知应用程序。 从流读取数据的方式有很多种。

可写流可读流都通过多种方式使用 EventEmitter API 来通讯流的当前状态。

Duplex 流和 Transform 流都是可写又可读的。

对于只需写入数据到流或从流消费数据的应用程序,并不需要直接实现流的接口,通常也不需要调用 require('stream')

需要实现新类型的流的开发者可以参考用于实现流的API章节。

可写流#

查看英文版参与翻译

可写流是对数据要被写入的目的地的一种抽象。

可写流的例子包括:

上面的一些例子事实上是实现了可写流接口的 Duplex 流。

所有可写流都实现了 stream.Writable 类定义的接口。

尽管可写流的具体实例可能略有差别,但所有的可写流都遵循同一基本的使用模式,如以下例子所示:

const myStream = getWritableStreamSomehow();
myStream.write('一些数据');
myStream.write('再来一些数据');
myStream.end('完成写入数据');

stream.Writable 类#

查看英文版参与翻译

'close' 事件#

查看英文版参与翻译

当流或其底层资源(比如文件描述符)被关闭时,触发 'close' 事件。 该事件表明不会再触发其他事件,且不会再发生运算。

不是所有可写流都会触发 'close' 事件。

'drain' 事件#

查看英文版参与翻译

如果调用 stream.write(chunk) 方法返回 false,则在适合恢复写入数据到流时触发 'drain' 事件。

// 向提供的可写流中写入数据一百万次。
// 注意背压(back-pressure)。
function writeOneMillionTimes(writer, data, encoding, callback) {
  let i = 1000000;
  write();
  function write() {
    let ok = true;
    do {
      i--;
      if (i === 0) {
        // 最后一次。
        writer.write(data, encoding, callback);
      } else {
        // 检查是否可以继续写入。 
        // 不要传入 callback,因为写入还没有结束。
        ok = writer.write(data, encoding);
      }
    } while (i > 0 && ok);
    if (i > 0) {
      // 不得不提前停下!
      // 当 'drain' 事件触发后继续写入。
      writer.once('drain', write);
    }
  }
}
'error' 事件#

查看英文版参与翻译

当写入数据出错或使用管道出错时,触发 'error' 事件。 监听器回调函数被调用时会传入一个 Error 参数。

但触发 'error' 事件时,流还未被关闭。

'finish' 事件#

查看英文版参与翻译

调用 stream.end() 方法且缓冲数据都已经传给底层系统之后,触发 'finish' 事件。

const writer = getWritableStreamSomehow();
for (let i = 0; i < 100; i++) {
  writer.write(`你好,#${i}!\n`);
}
writer.end('这是结尾\n');
writer.on('finish', () => {
  console.error('所有写入已完成。');
});
'pipe' 事件#

查看英文版参与翻译

当在可读流上调用 stream.pipe() 方法添加可写流到目标流向时,触发 'pipe' 事件。

const writer = getWritableStreamSomehow();
const reader = getReadableStreamSomehow();
writer.on('pipe', (src) => {
  console.error('有数据正通过管道流入写入器');
  assert.equal(src, reader);
});
reader.pipe(writer);
'unpipe' 事件#

查看英文版参与翻译

当在可读流上调用 stream.unpipe() 方法从目标流向中移除当前可写流时,触发 'unpipe' 事件。

当可读流通过管道流向可写流发生错误时,也会触发 'unpipe' 事件。

const writer = getWritableStreamSomehow();
const reader = getReadableStreamSomehow();
writer.on('unpipe', (src) => {
  console.error('已停止写入流的管道。');
  assert.equal(src, reader);
});
reader.pipe(writer);
reader.unpipe(writer);
writable.cork()#

查看英文版参与翻译

调用 writable.cork() 方法将强制所有写入数据都存放到内存中的缓冲区里。 直到调用 stream.uncork()stream.end() 方法时,缓冲区里的数据才会被输出。

在向流中写入大量小块数据(small chunks of data)时,内部缓冲区(internal buffer)可能失效,从而导致性能下降。writable.cork() 方法主要就是用来避免这种情况。 对于这种情况, 实现了 writable._writev() 方法的流可以对写入的数据进行缓冲,从而提高写入效率。

也可查看 writable.uncork()

writable.destroy([error])#

查看英文版参与翻译

摧毁这个流,并发出传过来的错误。当这个函数被调用后,这个写入流就结束了。 使用者不应该重写这个函数,而是重写 writable._destroy

writable.end([chunk][, encoding][, callback])#

查看英文版参与翻译

  • chunk <string> | <Buffer> | <Uint8Array> | <any> 可选的,需要写入的数据。对于非对象模式下的流, chunk 必须是字符串、或 Buffer、或 Uint8Array。对于对象模式下的流, chunk 可以是任意的 JavaScript 值,除了 null
  • encoding <string> 如果 chunk 是字符串,这里指定字符编码。
  • callback <Function> 可选的,流结束时的回调函数。

调用 writable.end() 方法表明接下来没有数据要被写入可写流。通过传入可选的 chunkencoding 参数,可以在关闭流之前再写入一段数据。如果传入了可选的 callback 函数,它将作为 'finish' 事件的回调函数。

在调用了 stream.end() 方法之后,再调用 stream.write() 方法将会导致错误。

// 写入 'hello, ' ,并用 'world!' 来结束写入。
const fs = require('fs');
const file = fs.createWriteStream('example.txt');
file.write('hello, ');
file.end('world!');
// 后面不允许再写入数据!
writable.setDefaultEncoding(encoding)#

查看英文版参与翻译

writable.setDefaultEncoding() 用于为可写流设置 encoding

writable.uncork()#

查看英文版参与翻译

writable.uncork() 将输出在 stream.cork() 方法被调用之后缓冲在内存中的所有数据。

如果使用 writable.cork()writable.uncork() 来管理写入缓存,建议使用 process.nextTick() 来延迟调用 writable.uncork() 方法。通过这种方式,可以对单个 Node.js 事件循环中调用的所有 writable.write() 方法进行批处理。

stream.cork();
stream.write('some ');
stream.write('data ');
process.nextTick(() => stream.uncork());

如果一个流多次调用了 writable.cork() 方法,那么也必须调用同样次数的 writable.uncork() 方法以输出缓冲区数据。

stream.cork();
stream.write('some ');
stream.cork();
stream.write('data ');
process.nextTick(() => {
  stream.uncork();
  // 之前的数据只有在 uncork() 被二次调用后才会输出
  stream.uncork();
});

也可查看 writable.cork()

writable.writableHighWaterMark#

查看英文版参与翻译

返回构造该可写流时传入的 highWaterMark 参数值。

writable.writableLength#

查看英文版参与翻译

这个属性包含了写入就绪队列的字节(或者对象)数,这个值提供了关于highWaterMark状态的内省数据。

writable.write(chunk[, encoding][, callback])#

查看英文版参与翻译

  • chunk <string> | <Buffer> | <Uint8Array> | <any> 要写入的数据。可选的。 对于非对象模式下的流, chunk 必须是字符串, Buffer 或者 Uint8Array。对于对象模式下的流,chunk 可以是除 null 外的任意 JavaScript 值。
  • encoding <string> 如果 chunk 是字符串,这里指定字符编码。
  • callback <Function> 缓冲数据输出时的回调函数。
  • 返回: <boolean> 如果流需要等待 'drain' 事件触发才能继续写入数据,这里将返回 false ; 否则返回 true

writable.write() 方法向流中写入数据,并在数据处理完成后调用 callback 。如果有错误发生, callback 不一定 以这个错误作为第一个参数并被调用。要确保可靠地检测到写入错误,应该监听 'error' 事件。

在确认了 chunk 后,如果内部缓冲区的大小小于创建流时设定的 highWaterMark 阈值,函数将返回 true 。 如果返回值为 false ,应该停止向流中写入数据,直到 'drain' 事件被触发。

当一个流不处在 drain 的状态, 对 write() 的调用会缓存数据块, 并且返回 false。 一旦所有当前所有缓存的数据块都排空了(被操作系统接受来进行输出), 那么 'drain' 事件就会被触发。 我们建议, 一旦 write() 返回 false, 在 'drain' 事件触发前, 不能写入任何数据块。 然而,当流不处在 'drain' 状态时, 调用 write() 是被允许的, Node.js 会缓存所有已经写入的数据块, 直到达到最大内存占用, 这时它会无条件中止。 甚至在它中止之前, 高内存占用将会导致差的垃圾回收器的性能和高的系统相对敏感性 (即使内存不再需要,也通常不会被释放回系统)。 如果远程的另一端没有读取数据, TCP sockets 可能永远也不会 drain , 所以写入到一个不会drain的socket可能会导致远程可利用的漏洞。

对于一个 Transform, 写入数据到一个不会drain的流尤其成问题, 因为 Transform 流默认被暂停, 直到它们被pipe或者被添加了 'data''readable' 事件处理函数。

如果将要被写入的数据可以根据需要生成或者取得,我们建议将逻辑封装为一个可读流并且使用 stream.pipe()。 但是如果调用 write() 优先, 那么可以使用 'drain' 事件来防止回压并且避免内存问题:

function write(data, cb) {
  if (!stream.write(data)) {
    stream.once('drain', cb);
  } else {
    process.nextTick(cb);
  }
}

// 在回调函数被执行后再进行其他的写入
write('hello', () => {
  console.log('write completed, do more writes now');
});

对象模式的写入流将忽略 encoding 参数。

可读流#

查看英文版参与翻译

可读流是对提供数据的来源的一种抽象。

可读流的例子包括:

所有的可读流都实现了 stream.Readable 类上定义的接口。

两种模式#

查看英文版参与翻译

可读流实质上运作于流动中(flowing)或已暂停(paused)两种模式之一。

在 flowing 模式中,数据自动地从底层的系统被读取,并通过 EventEmitter 接口的事件尽可能快地被提供给应用程序。

在 paused 模式中,必须显式调用 stream.read() 方法来从流中读取数据片段。

所有可读流都开始于 paused 模式,可以通过以下方式切换到 flowing 模式:

可读流可以通过以下方式切换回 paused 模式:

  • 如果没有管道目标,调用 stream.pause() 方法。
  • 如果有管道目标,移除所有管道目标。调用 stream.unpipe() 方法可以移除多个管道目标。

需要记住的重要概念是,只有提供了消费或忽略数据的机制后,可读流才会产生数据。 如果消费的机制被禁用或移除,则可读流会停止产生数据。

为了向后兼容,移除 'data' 事件处理函数不会自动地暂停流。 如果存在管道目标,一旦目标变为 drain 状态并请求接收数据时,则调用 stream.pause() 也不能保证流会保持暂停状态。

如果可读流切换到 flowing 模式,且没有可用的消费函数处理数据,则这些数据将会丢失。 例如,当调用 readable.resume() 方法时,没有监听 'data' 事件或 'data' 事件的处理函数从流中被移除了。

三种状态#

查看英文版参与翻译

可读流运作的两种模式是对发生在可读流中更加复杂的内部状态管理的一种简化的抽象。

在任意时刻,任一可读流会处于以下三种状态之一:

  • readable.readableFlowing = null
  • readable.readableFlowing = false
  • readable.readableFlowing = true

readable.readableFlowingnull 时,没有提供消费流数据的机制,所以流不会产生数据。 在这个状态下,监听 'data' 事件、调用 readable.pipe() 方法、或调用 readable.resume() 方法, 则 readable.readableFlowing 会变成 true ,可读流开始主动地产生数据触发事件。

调用 readable.pause()readable.unpipe()、或接收背压,则 readable.readableFlowing 会被设为 false,暂时停止事件流动但不会停止数据的生成。 在这个状态下,为 'data' 事件设置监听器不会使 readable.readableFlowing 变成 true

const { PassThrough, Writable } = require('stream');
const pass = new PassThrough();
const writable = new Writable();

pass.pipe(writable);
pass.unpipe(writable);
// readableFlowing 现在为 false。

pass.on('data', (chunk) => { console.log(chunk.toString()); });
pass.write('ok'); // 不会触发 'data' 事件。
pass.resume(); // 必须调用它才会触发 'data' 事件。

readable.readableFlowingfalse 时,数据可能会堆积在流的内部缓冲中。

选择一种#

查看英文版参与翻译

可读流 API 的演化贯穿了多个 Node.js 版本,提供了多种方法来消费流数据。通常开发者应该选择其中 一种 来消费数据,而 不应该 在单个流使用多种方法来消费数据。

对于大多数用户,建议使用 readable.pipe() 方法来消费流数据,因为它是最简单的一种实现。开发者如果要精细地控制数据传递和产生的过程,可以使用 EventEmitterreadable.pause()/readable.resume() 提供的 API 。

stream.Readable 类#

查看英文版参与翻译

'close' 事件#

查看英文版参与翻译

'close' 事件将在流或其底层资源(比如一个文件)关闭后触发。'close' 事件触发后,该流将不会再触发任何事件。

不是所有 [Readable][] 都会触发 'close' 事件。

'data' 事件#

查看英文版参与翻译

  • chunk <Buffer> | <string> | <any> 数据片段。对于非对象模式的可读流,这是一个字符串或者 Buffer。 对于对象模式的可读流,这可以是除 null 以外的任意类型 JavaScript 值。

'data' 事件会在流将数据传递给消费者时触发。当流转换到 flowing 模式时会触发该事件。调用 readable.pipe()readable.resume() 方法,或为 'data' 事件添加回调可以将流转换到 flowing 模式。 'data' 事件也会在调用 readable.read() 方法并有数据返回时触发。

在没有明确暂停的流上添加 'data' 事件监听会将流转换为 flowing 模式。 数据会在可用时尽快传递给下个流程。

如果调用 readable.setEncoding() 方法明确为流指定了默认编码,回调函数将接收到一个字符串,否则接收到的数据将是一个 Buffer 实例。

const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes of data.`);
});
'end' 事件#

查看英文版参与翻译

'end' 事件将在流中再没有数据可供消费时触发。

注意'end' 事件只有在数据被完全消费后 才会触发 。 可以通过将流转换到 flowing 模式, 或反复调用 stream.read() 方法来实现这一点。

const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes of data.`);
});
readable.on('end', () => {
  console.log('There will be no more data.');
});
'error' 事件#

查看英文版参与翻译

'error' 事件可以在任何时候在可读流实现(Readable implementation)上触发。 通常,这会在底层系统内部出错从而不能产生数据,或当流的实现试图传递错误数据时发生。

回调函数将接收到一个 Error 对象。

'readable' 事件#

查看英文版参与翻译

'readable' 事件将在流中有数据可供读取时触发。在某些情况下,为 'readable' 事件添加回调将会导致一些数据被读取到内部缓存中。

const readable = getReadableStreamSomehow();
readable.on('readable', () => {
  // 有一些数据可读了
});

当到达流数据尾部时, 'readable' 事件也会触发。触发顺序在 'end' 事件之前。

事实上, 'readable' 事件表明流有了新的动态:要么是有了新的数据,要么是到了流的尾部。 对于前者, stream.read() 将返回可用的数据。而对于后者, stream.read() 将返回 null。 例如,下面的例子中的 foo.txt 是一个空文件:

const fs = require('fs');
const rr = fs.createReadStream('foo.txt');
rr.on('readable', () => {
  console.log(`readable: ${rr.read()}`);
});
rr.on('end', () => {
  console.log('end');
});

上面脚本的输出如下:

$ node test.js
readable: null
end

注意: 通常情况下,readable.pipe() 方法和 'data' 事件机制比 'readable' 事件更容易理解。然而处理 'readable'事件可能造成吞吐量升高。

readable.destroy([error])#

查看英文版参与翻译

销毁流,并且触发error事件。然后,可读流将释放所有的内部资源。

开发者不应该覆盖这个方法,应该覆盖readable._destroy方法。

readable.isPaused()#

查看英文版参与翻译

readable.isPaused() 方法返回可读流的当前操作状态。 该方法主要是在 readable.pipe() 方法的底层机制中用到。大多数情况下,没有必要直接使用该方法。

const readable = new stream.Readable();

readable.isPaused(); // === false
readable.pause();
readable.isPaused(); // === true
readable.resume();
readable.isPaused(); // === false
readable.pause()#

查看英文版参与翻译

  • 返回: this

readable.pause() 方法将会使 flowing 模式的流停止触发 'data' 事件, 进而切出 flowing 模式。任何可用的数据都将保存在内部缓存中。

const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes of data.`);
  readable.pause();
  console.log('There will be no additional data for 1 second.');
  setTimeout(() => {
    console.log('Now data will start flowing again.');
    readable.resume();
  }, 1000);
});
readable.pipe(destination[, options])#

查看英文版参与翻译

readable.pipe() 绑定一个 [Writable][] 到 readable 上, 将可写流自动切换到 flowing 模式并将所有数据传给绑定的 [Writable][]。数据流将被自动管理。这样,即使是可读流较快,目标可写流也不会超负荷(overwhelmed)。

下面例子将 readable 中的所有数据通过管道传递给名为 file.txt 的文件:

const readable = getReadableStreamSomehow();
const writable = fs.createWriteStream('file.txt');
// readable 中的所有数据都传给了 'file.txt'
readable.pipe(writable);

可以在单个可读流上绑定多个可写流。

readable.pipe() 方法返回 目标流 的引用,这样就可以对流进行链式地管道操作:

const r = fs.createReadStream('file.txt');
const z = zlib.createGzip();
const w = fs.createWriteStream('file.txt.gz');
r.pipe(z).pipe(w);

默认情况下,当源可读流(the source Readable stream)触发 'end' 事件时,目标流也会调用 stream.end() 方法从而结束写入。要禁用这一默认行为, end 选项应该指定为 false, 这将使目标流保持打开, 如下面例子所示:

reader.pipe(writer, { end: false });
reader.on('end', () => {
  writer.end('Goodbye\n');
});

这里有一点要警惕,如果可读流在处理时发生错误,目标可写流 不会 自动关闭。 如果发生错误,需要 手动 关闭所有流以避免内存泄漏。

注意:不管对 process.stderrprocess.stdout 指定什么选项,它们都是直到 Node.js 进程退出才关闭。

readable.read([size])#

查看英文版参与翻译

readable.read()方法从内部缓冲区中抽出并返回一些数据。 如果没有可读的数据,返回null。readable.read()方法默认数据将作为“Buffer”对象返回 ,除非已经使用readable.setEncoding()方法设置编码或流运行在对象模式。

可选的size参数指定要读取的特定数量的字节。如果size字节不可读,将返回null除非流已经结束,在这种情况下所有保留在内部缓冲区的数据将被返回。

如果没有指定size参数,则内部缓冲区包含的所有数据将返回。

readable.read()方法只应该在暂停模式下的可读流上运行。在流模式下,readable.read()自动调用直到内部缓冲区的数据完全耗尽。

const readable = getReadableStreamSomehow();
readable.on('readable', () => {
  let chunk;
  while (null !== (chunk = readable.read())) {
    console.log(`Received ${chunk.length} bytes of data.`);
  }
});

一般来说,建议开发人员避免使用'readable'事件和readable.read()方法,使用readable.pipe()'data'事件代替。

无论size参数的值是什么,对象模式中的可读流将始终返回调用readable.read(size)的单个项目。

注意:如果readable.read()方法返回一个数据块,那么一个'data'事件也将被发送。

注意:在已经被发出的'end'事件后调用stream.read([size])事件将返回null。不会抛出运行时错误。

readable.readableHighWaterMark#

查看英文版参与翻译

返回构造该可读流时传入的 'highWaterMark' 属性。

readable.readableLength#

查看英文版参与翻译

This property contains the number of bytes (or objects) in the queue ready to be read. The value provides introspection data regarding the status of the highWaterMark.

readable.resume()#

查看英文版参与翻译

  • 返回: this

readable.resume() 方法会重新触发 'data' 事件, 将暂停模式切换到流动模式。

readable.resume() 方法可以用来充分使用流中的数据,而不用实际处理任何数据,如以下示例所示:

getReadableStreamSomehow()
  .resume()
  .on('end', () => {
    console.log('Reached the end, but did not read anything.');
  });
readable.setEncoding(encoding)#

查看英文版参与翻译

  • encoding <string> 要使用的编码
  • Returns: this

readble.setEncoding() 方法会为从可读流读入的数据设置字符编码

默认返回Buffer对象。设置编码会使得该流数据返回指定编码的字符串而不是Buffer对象。例如,调用readable.setEncoding('utf-8')会使得输出数据作为UTF-8数据解析,并作为字符串返回。调用readable.setEncoding('hex')使得数据被编码成16进制字符串格式。

可读流会妥善处理多字节字符,如果仅仅直接从流中取出Buffer对象,很可能会导致错误解码。

const readable = getReadableStreamSomehow();
readable.setEncoding('utf8');
readable.on('data', (chunk) => {
  assert.equal(typeof chunk, 'string');
  console.log('got %d characters of string data', chunk.length);
});
readable.unpipe([destination])#

查看英文版参与翻译

readable.unpipe() 方法将之前通过stream.pipe()方法绑定的流分离

如果 destination 没有传入, 则所有绑定的流都会被分离.

如果传入 destination, 但它没有被pipe()绑定过,则该方法不作为.

const readable = getReadableStreamSomehow();
const writable = fs.createWriteStream('file.txt');
// All the data from readable goes into 'file.txt',
// but only for the first second
readable.pipe(writable);
setTimeout(() => {
  console.log('Stop writing to file.txt');
  readable.unpipe(writable);
  console.log('Manually close the file stream');
  writable.end();
}, 1000);
readable.unshift(chunk)#

查看英文版参与翻译

  • chunk <Buffer> | <Uint8Array> | <string> | <any> 数据块移动到可读队列底部。对于不以对象模式运行的流,chunk 必须是字符串, Buffer 或者 Uint8Array。对于对象流, chunk 任何非null的值。

readable.unshift() 方法会把一块数据压回到Buffer内部。 这在如下特定情形下有用: 代码正在消费一个数据流,已经"乐观地"拉取了数据。 又需要"反悔-消费"一些数据,以便这些数据可以传给其他人用。

注意: 'end' 事件已经触发或者运行时错误抛出后,stream.unshift(chunk) 方法不能被调用。

使用 stream.unshift() 的开发者一般需要换一下思路,考虑用一个[Transform][] 流替代. 更多信息请查看API for Stream Implementers部分。

// Pull off a header delimited by \n\n
// use unshift() if we get too much
// Call the callback with (error, header, stream)
const { StringDecoder } = require('string_decoder');
function parseHeader(stream, callback) {
  stream.on('error', callback);
  stream.on('readable', onReadable);
  const decoder = new StringDecoder('utf8');
  let header = '';
  function onReadable() {
    let chunk;
    while (null !== (chunk = stream.read())) {
      const str = decoder.write(chunk);
      if (str.match(/\n\n/)) {
        // found the header boundary
        const split = str.split(/\n\n/);
        header += split.shift();
        const remaining = split.join('\n\n');
        const buf = Buffer.from(remaining, 'utf8');
        stream.removeListener('error', callback);
        // remove the readable listener before unshifting
        stream.removeListener('readable', onReadable);
        if (buf.length)
          stream.unshift(buf);
        // now the body of the message can be read from the stream.
        callback(null, header, stream);
      } else {
        // still reading the header.
        header += str;
      }
    }
  }
}

注意: 不像 stream.push(chunk)stream.unshift(chunk)在重置流的内部读取状态时是不会结束读取过程。 如果在读取过程中调用 readable.unshift() 则会导致异常 (例如:即来自自定义流上的 stream._read()内部方法上的实现)。 应该在调用 readable.unshift()方法之后适当调用 stream.push('') 来重置读取状态,执行读取的过程中最好避免调用 readable.unshift()方法。

readable.wrap(stream)#

查看英文版参与翻译

  • stream <Stream> 一个老版本的readable stream

Node.js在v0.10版本之前的流没有实现当前定义的所有流模块的API.(查看更多兼容性信息 Compatibility )

当使用老版本的Node.js库来触发'data'事件和stream.pause()方法仅是建议性的, readable.wrap()方法能创建一个把老版本的流作为数据源的[Readable][] stream。

几乎没有必要使用readable.wrap(),但是这个方法已经为老版本的Node.js应用和一些库提供了方便。

例子:

const { OldReader } = require('./old-api-module.js');
const { Readable } = require('stream');
const oreader = new OldReader();
const myReader = new Readable().wrap(oreader);

myReader.on('readable', () => {
  myReader.read(); // etc.
});
readable[Symbol.asyncIterator]()#

查看英文版参与翻译

Stability: 1 - Experimental
const fs = require('fs');

async function print(readable) {
  readable.setEncoding('utf8');
  let data = '';
  for await (const k of readable) {
    data += k;
  }
  console.log(data);
}

print(fs.createReadStream('file')).catch(console.log);

If the loop terminates with a break or a throw, the stream will be destroyed. In other terms, iterating over a stream will consume the stream fully. The stream will be read in chunks of size equal to the highWaterMark option. In the code example above, data will be in a single chunk if the file has less then 64kb of data because no highWaterMark option is provided to fs.createReadStream().

Duplex 流与 Transform 流#

stream.Duplex 类#

查看英文版参与翻译

Duplex 流是同时实现了 [Readable][] 和 [Writable][] 接口的流。

Duplex 流的实例包括了:

stream.Transform 类#

查看英文版参与翻译

变换流(Transform streams) 是一种 [Duplex][] 流。它的输出与输入是通过某种方式关联的。和所有 [Duplex][] 流一样,变换流同时实现了 [Readable][] 和 [Writable][] 接口。

变换流的实例包括:

transform.destroy([error])#

查看英文版参与翻译

销毁这个流,发射'error'事件。 调用这个之后,变换流会释放全部内部资源 实现者不应该重载此方法,而应该实现readable._destroyTransform的默认_destroy实现也发射'close'事件。

stream.finished(stream, callback)#

查看英文版参与翻译

  • stream <Stream> 一个可读或可写的流
  • callback <Function> 一个回调函数,可以带有一个错误信息参数,也可没有

使用此函数,以在一个流不再可读、可写或发生了错误、提前关闭等事件时获得通知。

const { finished } = require('stream');

const rs = fs.createReadStream('archive.tar');

finished(rs, (err) => {
  if (err) {
    console.error('流发生错误', err);
  } else {
    console.log('流已读完');
  }
});

rs.resume(); // 将流读完

在处理流的提前销毁(如被抛弃的HTTP请求)等错误事件时特别有用,此时流不会触发 'end''finish' 事件。

finished API 也可做成承诺:

const finished = util.promisify(stream.finished);

const rs = fs.createReadStream('archive.tar');

async function run() {
  await finished(rs);
  console.log('流已读完');
}

run().catch(console.error);
rs.resume(); // 将流读完

stream.pipeline(...streams[, callback])#

查看英文版参与翻译

  • ...streams <Stream> 两个或多个要用管道连接的流
  • callback <Function> 一个回调函数,可以带有一个错误信息参数

该模块方法用于在多个流之间架设管道,可以自动传递错误和完成扫尾工作,并且可在管道架设完成时提供一个回调函数:

const { pipeline } = require('stream');
const fs = require('fs');
const zlib = require('zlib');

// 使用 pipeline API 轻松连接多个流
// 并在管道完成时获得通知

// 使用pipeline高效压缩一个可能很大的tar文件:

pipeline(
  fs.createReadStream('archive.tar'),
  zlib.createGzip(),
  fs.createWriteStream('archive.tar.gz'),
  (err) => {
    if (err) {
      console.error('管道架设失败', err);
    } else {
      console.log('管道架设成功');
    }
  }
);

pipeline API 也可做成承诺:

const pipeline = util.promisify(stream.pipeline);

async function run() {
  await pipeline(
    fs.createReadStream('archive.tar'),
    zlib.createGzip(),
    fs.createWriteStream('archive.tar.gz')
  );
  console.log('管道架设成功');
}

run().catch(console.error);

用于实现流的 API#

查看英文版参与翻译

stream模块API的设计是为了让JavaScript的原型继承模式可以简单的实现流。

首先,一个流开发者可能声明了一个JavaScript类并且继承四个基本流类中的一个(stream.Writeablestream.Readablestream.Duplex,或者stream.Transform),确保他们调用合适的父类构造函数:

const { Writable } = require('stream');

class MyWritable extends Writable {
  constructor(options) {
    super(options);
    // ...
  }
}

新的流类必须实现一个或多个特定的方法,根据所创建的流类型,如下图所示:

用例 实现的方法
只读流 Readable _read
只写流 writable _write_writev_final
可读可写流 Duplex _read_write_writev_final
操作写数据,然后读结果 Transform _transform_flush_final

注意:实现流的代码里面不应该出现调用“public”方法的地方因为这些方法是给使用者使用的(流使用者部分的API所述)。这样做可能会导致使用流的应用程序代码产生不利的副作用。

Simplified Construction#

查看英文版参与翻译

对于许多简单的案例,它是有可能在不依赖继承的情况下创建流。这可以直接创建流实例,通过流基础类stream.Writablestream.Readablestream.Duplex,或者stream.Transform传入对象完成,对象包含合适的方法作为构造函数选项。

const { Writable } = require('stream');

const myWritable = new Writable({
  write(chunk, encoding, callback) {
    // ...
  }
});

Implementing a Writable Stream#

查看英文版参与翻译

这个stream.Writable类被用于实现可写流。

自定义可写流必须调用new stream.Writable([options])构造函数并且实现writable._write()方法。writable._writev()方法也是可以实现的。

Constructor: new stream.Writable([options])#

查看英文版参与翻译

例如:

const { Writable } = require('stream');

class MyWritable extends Writable {
  constructor(options) {
    // Calls the stream.Writable() constructor
    super(options);
    // ...
  }
}

或者,使用ES6之前的语法来创建构造函数:

const { Writable } = require('stream');
const util = require('util');

function MyWritable(options) {
  if (!(this instanceof MyWritable))
    return new MyWritable(options);
  Writable.call(this, options);
}
util.inherits(MyWritable, Writable);

或者,使用简化的构造函数方法:

const { Writable } = require('stream');

const myWritable = new Writable({
  write(chunk, encoding, callback) {
    // ...
  },
  writev(chunks, callback) {
    // ...
  }
});

writable._write(chunk, encoding, callback)#

查看英文版参与翻译

  • chunk <Buffer> | <string> | <any> 要写的块。会一直作为缓冲区,除非decodeStrings选项设置为false或者流以对象模式运行。
  • encoding <string> 如果块是字符串,那么encoding就是该字符串的字符编码。 如果块是Buffer,或者是流在对象模式下运行,encoding可能被忽略。
  • callback <Function> 调用此函数(err参数可选)当块处理完成时。

所有可写流实现必须提供一个 writable._write() 方法将数据发送到底层资源。

注意:[Transform][] 流提供自己实现的writable._write()

注意:此函数不得直接由应用程序代码调用。 它应该由子类实现,并由内部的Writable类方法调用。

必须调用callback方法来表示写完成成功或失败,出现错误。callback第一个参数必须是Error对象如果调用失败,成功时为null

所有writable._write()被调用并且callback被调用将导致要缓冲的写入数据。 一旦调用callback,流将会执行'drain'事件。 如果想让流实现一次能够处理多个数据块,writable._writev()方法应该被实现。

如果在构造函数选项中设置decodeStrings属性,那么chunk可能是一个字符串而不是一个缓冲区,encodeing将会表示字符串的字符编码。 这是为了支持对某些字符串具有优化处理的实现数据编码。 如果decodeStrings属性显式设置为falseencoding参数可以安全地忽略,chunk将保持不变传递给.write()的对象。

writable._write()方法前缀为下划线,因为它是在定义它的类的内部,不应该直接调用用户程序。

writable._writev(chunks, callback)#

查看英文版参与翻译

  • chunks <Array> 要写的块 每个块都有以下格式:{chunk:...,encoding:...}
  • callback <Function> 一个回调函数(可选地带有一个错误参数)在提供的块的处理完成时被调用。

:此函数不得直接通过应用程序代码调用。 它应由子类实现,并由内部Writable进行调用类方法。

writable._writev()方法能够一次处理多个数据块的流除了writable._write()之外。如果实现,该方法将缓存的所有数据块写入队列。

writable._writev()方法前缀为下划线,因为它是定义它的类的内部,不应该由用户程序直接调用。

writable._destroy(err, callback)#

查看英文版参与翻译

通过 writable.destroy() 方法调用_destroy()。它可以被子类覆盖,但不能直接调用。

writable._final(callback)#

查看英文版参与翻译

  • callback <Function> 在完成写入所有剩余数据时调用该函数(err参数可选)。

_final()方法不能直接调用。 应该由子类负责实现,如果是,将仅可以由内部的Writable类方法进行调用。

这个可选的函数将在流关闭之前被调用, 直到callback回调函数执行完成才触发finish事件。这对于关闭资源或在流结束之前写入缓冲数据很有用。

Errors While Writing#

查看英文版参与翻译

建议在处理writable._write()writable._writev()方法期间发生的错误时传给回调函数的第一个参数来处理。这将导致Writable触发error事件。从writable._write()中抛出一个错误可能会导致意外和不一致的行为,具体取决于如何使用流。使用回调可确保对错误进行一致且可预测的处理。

const { Writable } = require('stream');

const myWritable = new Writable({
  write(chunk, encoding, callback) {
    if (chunk.toString().indexOf('a') >= 0) {
      callback(new Error('chunk is invalid'));
    } else {
      callback();
    }
  }
});

一个可写流的例子#

查看英文版参与翻译

下面说明了一个相当简单(有点无意义)的可写流实现。虽然这个具体的可写流实例没有任何真正的特殊用途,但该示例说明了一个自定义流实例所需要的元素:

const { Writable } = require('stream');

class MyWritable extends Writable {
  constructor(options) {
    super(options);
    // ...
  }

  _write(chunk, encoding, callback) {
    if (chunk.toString().indexOf('a') >= 0) {
      callback(new Error('chunk is invalid'));
    } else {
      callback();
    }
  }
}

Decoding buffers in a Writable Stream#

查看英文版参与翻译

解码buffers是一个常见任务,比如用变换流处理字符串输入。 当用多字节字符编码方式(比如UTF-8)时,这是一个微不足道的过程。 下面的例子展示了如何用StringDecoder 和 [Writable][]解码多字节字符串。

const { Writable } = require('stream');
const { StringDecoder } = require('string_decoder');

class StringWritable extends Writable {
  constructor(options) {
    super(options);
    const state = this._writableState;
    this._decoder = new StringDecoder(state.defaultEncoding);
    this.data = '';
  }
  _write(chunk, encoding, callback) {
    if (encoding === 'buffer') {
      chunk = this._decoder.write(chunk);
    }
    this.data += chunk;
    callback();
  }
  _final(callback) {
    this.data += this._decoder.end();
    callback();
  }
}

const euro = [[0xE2, 0x82], [0xAC]].map(Buffer.from);
const w = new StringWritable();

w.write('currency: ');
w.write(euro[0]);
w.end(euro[1]);

console.log(w.data); // currency: €

Implementing a Readable Stream#

查看英文版参与翻译

stream.Readable 类扩展并实现了[Readable][]。

用户实现的自定义可读流 必须 调用new stream.Readable([options]) 构造函数并且实现readable._read()方法。

new stream.Readable([options])#

查看英文版参与翻译

  • options <Object>
    • highWaterMark <number> 从底层资源读取数据并存储在内部缓冲区中的最大字节数。默认16384 (16kb), 或者 16对应objectMode流模式。
    • encoding <string> 指定解析的字符编码格式. 默认 为null
    • objectMode <boolean> 一个对象的流。 这意味着 stream.read(n) 返回的是一个单一的对象而不是n个字节的缓冲区。默认 false
    • read <Function>stream._read()方法的实现。
    • destroy <Function>stream._destroy() 方法的实现。

例如:

const { Readable } = require('stream');

class MyReadable extends Readable {
  constructor(options) {
    // Calls the stream.Readable(options) constructor
    super(options);
    // ...
  }
}

或者使用ES6语法:

const { Readable } = require('stream');
const util = require('util');

function MyReadable(options) {
  if (!(this instanceof MyReadable))
    return new MyReadable(options);
  Readable.call(this, options);
}
util.inherits(MyReadable, Readable);

或者使用简化的构造方法:

const { Readable } = require('stream');

const myReadable = new Readable({
  read(size) {
    // ...
  }
});

readable._read(size)#

查看英文版参与翻译

  • size <number> 异步读取的字节数。

注意: 这个函数不能直接被应用程序代码调用。 它应由子类实现,并仅能由Readable对象内部方法调用。

所有实现可读流的实例必须实现readable._read() 方法去获得底层的数据资源。

readable._read() 被调用,如果读取的数据是可用的,应该在最开始的实现的时候使用this.push(dataChunk)方法将该数据推入读取队列。_read() 应该一直读取资源直到推送数据方法readable.push()返回false的时候停止。想再次调用_read()方法,需要再次往可读流里面push数据。

注意:一旦readable._read()方法被调用,只有在 readable.push()方法被调用之后,才能再次被调用。

size 可选参数。_read()方法是一个实现读取数据的单操作,设置size参数来确定要读取数据的大小。 其他的实现可能会忽略这个参数,只要数据可用就提供数据。 不需要等到stream.push(chunk)方法推入一定size的数据后才能调用。

readable._read()方法的前缀是一个下划线,因为它是在定义它的类的内部,不应该被直接调用用户程序。

readable._destroy(err, callback)#

查看英文版参与翻译

  • err <Error> 错误。
  • callback <Function> 回调函数,第一个参数为err参数。

_destroy()需通过readable.destroy()方法调用。它可以被子类覆盖,但不能直接调用。

readable.push(chunk[, encoding])#

查看英文版参与翻译

  • chunk <Buffer> | <Uint8Array> | <string> | <null> | <any> 压入读队列的数据块。 对于没有处在object mode的流来说,chunk必须是一个字符串,BufferUint8Array; 对object mode 的流来说,chunk可以使任何JavaScript值。
  • encoding <string> 字符串数据块的编码方式. 必须是可用的Buffer编码方式,例如'utf8''ascii'
  • 返回 <boolean> 如果多余的数据块可能会继续压入,那么返回true; 否则返回 false.

chunk是一个Buffer, Uint8Array或者string时, 这个数据块(chunk)会被添加到内部队列供使用者消费。 在没有数据可写入后,给chunk传了null发出流结束(EOF)的信号。

当可读流处在传输模式下,'data'事件触发时,可以通过 调用readable.read() 方法读出来数据,这数据是用readable.push()添加的。

readable.push()方法被设计得尽可能的灵活。 比如,当封装一个有'暂停/恢复'机制和带数据回调的底层source的时候, 那么这个底层的source可以被常规的可读流实例封装。就像下面的例子一样。

// source 是一个有readStop()和 readStart()方法的对象。
// 有数据就调`ondata`成员函数;
// 数据结束就调`onend`成员函数。

class SourceWrapper extends Readable {
  constructor(options) {
    super(options);

    this._source = getLowlevelSourceObject();

    // Every time there's data, push it into the internal buffer.
    this._source.ondata = (chunk) => {
      // if push() returns false, then stop reading from source
      if (!this.push(chunk))
        this._source.readStop();
    };

    // When the source ends, push the EOF-signaling `null` chunk
    this._source.onend = () => {
      this.push(null);
    };
  }
  // _read will be called when the stream wants to pull more data in
  // the advisory size argument is ignored in this case.
  _read(size) {
    this._source.readStart();
  }
}

注意: readable.push()方法是为了让可读流实现者调用的, 而且只来自readable._read()方法内部。

Errors While Reading#

查看英文版参与翻译

建议在调用 readable._read() 方法时发生的错误应该执行触发 'error' 事件,而不是抛出异常错误。从readable._read()中抛出错误可能会导致意外的和不一致的行为,具体取决于流是以流还是暂停模式运行。 使用“错误”事件可确保一致且可预测的错误处理。

const { Readable } = require('stream');

const myReadable = new Readable({
  read(size) {
    if (checkSomeErrorCondition()) {
      process.nextTick(() => this.emit('error', err));
      return;
    }
    // do some work
  }
});

一个数流的例子#

查看英文版参与翻译

以下是可读流的一个基本例子,触发数字1到1,000,000升序,然后结束

const { Readable } = require('stream');

class Counter extends Readable {
  constructor(opt) {
    super(opt);
    this._max = 1000000;
    this._index = 1;
  }

  _read() {
    const i = this._index++;
    if (i > this._max)
      this.push(null);
    else {
      const str = '' + i;
      const buf = Buffer.from(str, 'ascii');
      this.push(buf);
    }
  }
}

Implementing a Duplex Stream#

查看英文版参与翻译

双工流(可读可写流)是可读流可写流的实现,例如TCP套接字连接。

因为JavaScript不支持多重继承,所以stream.Duplex类被扩展以实现双工流(而不是扩展stream.Readablestream.Writable类)。

注意。stream.Duplex类原型继承来自stream.Readable和寄生的stream.Writable,但是instanceof将会在这两个基础类上正确工作,由于stream.Writable覆盖了 Symbol.hasInstance方法。

自定义双工流必须通过new stream.Duplex([options])构造函数并实现readable._read()writable._write()方法。

new stream.Duplex(options)#

查看英文版参与翻译

  • options <Object> 传给可读和可写流的构造函数,还有如下字段:
    • allowHalfOpen <boolean> 默认是true。如果设置为false, 那么当读端停止时,写端自动停止。
    • readableObjectMode <boolean> 默认是 false。会为流的读端设置objectMode。如果 objectModetrue,那就没有任何用。
    • writableObjectMode <boolean> 默认是 false。会为流的写端设置objectMode。如果 objectModetrue,那就没有任何用。
    • readableHighWaterMark <number> 设置 highWaterMark 可读流的缓冲区大小。 如果已经设置 highWaterMarkreadableHighWaterMark不起作用。
    • writableHighWaterMark <number> 设置 highWaterMark 可写流缓冲区大小。如果设置了highWaterMarkwritableHighWaterMark不起作用。

例如:

const { Duplex } = require('stream');

class MyDuplex extends Duplex {
  constructor(options) {
    super(options);
    // ...
  }
}

或者, 使用ES6之前的语法来创建构造函数:

const { Duplex } = require('stream');
const util = require('util');

function MyDuplex(options) {
  if (!(this instanceof MyDuplex))
    return new MyDuplex(options);
  Duplex.call(this, options);
}
util.inherits(MyDuplex, Duplex);

又或者, 用简化的构造函数:

const { Duplex } = require('stream');

const myDuplex = new Duplex({
  read(size) {
    // ...
  },
  write(chunk, encoding, callback) {
    // ...
  }
});

An Example Duplex Stream#

查看英文版参与翻译

下面是一个可读可写流包装了一个假定的可读可写的底层源对象, 尽管用了一个与Node.js流不兼容的API。

下面是一个简单的例子, 在一个可读可写流中,来的buffers通过[Writable][] 接口写入数据,再通过[Readable][]接口读回数据。

const { Duplex } = require('stream');
const kSource = Symbol('source');

class MyDuplex extends Duplex {
  constructor(source, options) {
    super(options);
    this[kSource] = source;
  }

  _write(chunk, encoding, callback) {
    // The underlying source only deals with strings
    if (Buffer.isBuffer(chunk))
      chunk = chunk.toString();
    this[kSource].writeSomeData(chunk);
    callback();
  }

  _read(size) {
    this[kSource].fetchSomeData(size, (data, encoding) => {
      this.push(Buffer.from(data, encoding));
    });
  }
}

尽管在一个对象实例中共存,读端和写端却是相互独立于彼此,这是可读可写流最为重要的一点。

Object Mode Duplex Streams#

查看英文版参与翻译

对可读可写流来说,objectMode可以通过readableObjectModewritableObjectMode选项 来分别设置读端和写端。

比如下面的例子。 创建了一个变换流(一种可读可写流)。 在写端接收JavaScript数字,在读端转换为16进制字符串。

const { Transform } = require('stream');

// All Transform streams are also Duplex Streams
const myTransform = new Transform({
  writableObjectMode: true,

  transform(chunk, encoding, callback) {
    // Coerce the chunk to a number if necessary
    chunk |= 0;

    // Transform the chunk into something else.
    const data = chunk.toString(16);

    // Push the data onto the readable queue.
    callback(null, '0'.repeat(data.length % 2) + data);
  }
});

myTransform.setEncoding('ascii');
myTransform.on('data', (chunk) => console.log(chunk));

myTransform.write(1);
// Prints: 01
myTransform.write(10);
// Prints: 0a
myTransform.write(100);
// Prints: 64

Implementing a Transform Stream#

查看英文版参与翻译

一个[Transform][]流是一种[Duplex][]流,输入经过[Transform][]流,做某种计算然后输出。 比如 zlib流和crypto流会做压缩,加密和解密数据。

注意: 输出流的大小,有多少数据包,到达时间都不一定非要和输入流一样。 比如,一个哈希流再输入结束时永远只会输出单个数据块; 而一个zlib流的输出,可能比输入大得多或小得多。

stream.Transform 类被扩展了,实现了一个[Transform][]流。

stream.Transform类最初继承自stream.Duplex,并且实现了它自己版本的writable._write()readable._read()方法。 一般地,变换流必须实现transform._transform() 方法; 而transform._flush() 方法是非必须的。

注意: 用变换流时要注意,如果读端输出没有被消费,那么往写数据可能会引起写端暂停。

new stream.Transform([options])#

查看英文版参与翻译

例如:

const { Transform } = require('stream');

class MyTransform extends Transform {
  constructor(options) {
    super(options);
    // ...
  }
}

或者,用ES6构造方法:

const { Transform } = require('stream');
const util = require('util');

function MyTransform(options) {
  if (!(this instanceof MyTransform))
    return new MyTransform(options);
  Transform.call(this, options);
}
util.inherits(MyTransform, Transform);

又或者, 用简化构造方法:

const { Transform } = require('stream');

const myTransform = new Transform({
  transform(chunk, encoding, callback) {
    // ...
  }
});

Events: 'finish' and 'end'#

查看英文版参与翻译

'finish'事件来自stream.Writable'end'事件来自stream.Readable类。

在调用了stream.end()并且stream._transform()处理了全部数据块之后, 'finish'事件触发。

transform._flush()中的回调函数被调用之后,所有数据已经输出,此时,'end'事件触发

transform._flush(callback)#

查看英文版参与翻译

  • callback <Function> 一个当剩余的数据被冲刷后被调用的回调函数(error参数和data可选)。

Note: 应用程序代码禁止直接调用这个函数。它应该由子类来实现,并且只能被内部可读流的类方法调用。

在某些情况下,转换操作需要在流的末尾发射一块额外的数据。例如,zlib 压缩流会存储一种优先用于压缩输出的内部状态。但是在流结束的时候,那段额外的数据需要被冲刷才能完成数据压缩。

自定义的 [Transform][] 实现,可以实现transform._flush()方法。在没有更多的要写的数据被消耗时,会调用这个方法,但是在发射'end' 事件之前会发出可读流结束的信号。

transform._flush() 实现里,readable.push()方法会在适当的时候调用零次或者多次。callback在冲刷操作完成的时候一定会被调用。

transform._flush() 方法前缀为下划线,因为它是在定义它的类的内部,绝不应该被用户程序直接调用。

transform._transform(chunk, encoding, callback)#

查看英文版参与翻译

  • chunk <Buffer> | <string> | <any> 被转换的数据块。它总是一个buffer除非在option中配置decodeStringfalse或者当前流处在object mode下。
  • encoding <string> 如果 chunk 是字符串,那么encoding就是该字符串的字符编码。如果块是Buffer,它是一个特殊的值'buffer',这种情况encoding可以被忽略。
  • callback <Function> 当块被处理完成时调用此函数(包含error和data参数)。

注意:此函数不得直接由应用程序代码调用。它应该由子类实现,并由内部的Readable类方法调用。

所有的变换流的执行必须提供一个_transform()方法接收输入并提供输出。transform._transform()的实现会处理写入的字节,做某种计算并输出,然后使用readable.push()方法把这个输出传递到可读流。

从一个单个输入数据块可能会调用零次或多次transform.push()方法,调用次数取决于每次把多少数据做为一个输出结果。

有可能从任意给定输入的数据块中没有产生任何输出。

callback 会在当前数据被完全消费之后调用。在处理过程输入的过程中如果出错了,第一个参数是一个错误对象,没有出错Error参数则为null。如果传递第二个参数,它会被转发到readable.push()中。就像下面的例子:

transform.prototype._transform = function(data, encoding, callback) {
  this.push(data);
  callback();
};

transform.prototype._transform = function(data, encoding, callback) {
  callback(null, data);
};

transform._transform() 方法前缀为下划线,因为它是定义在它的类的内部,不应该直接被用户程序调用。

transform._transform() 方法永远不能并行调用;流使用了队列机制,不论同步或者异步情况下,都必须先调用callback之后才能接收下一个数据。

Class: stream.PassThrough#

查看英文版参与翻译

stream.PassThrough类是一个极简[Transform][]流,只是把输入字节原样不动直接输出。 一开始的目的是用来做例子和测试,但是有时也作为某些新颖的流的基本组成部分起作用。

其他注意事项#

Compatibility with Older Node.js Versions#

查看英文版参与翻译

在v0.10之前的Node.js版本中,可读流接口更简单,但功能更弱,功能更少。

  • 相比需要等待stream.read() 方法调用之后才触发,'data' 事件自己会立即触发。 需要执行一定量工作来决定如何处理数据的应用程序需要将读取数据存储到缓冲区中,以便数据不会丢失。
  • The stream.pause() 方法是建议性的,而不是保证。 这意味着即使流处于暂停状态,仍然需要准备接收data事件。

在Node.js v0.10中,添加了[Readable][]类。 为了向后兼容较旧的Node.js程序,当添加data事件处理程序或调用stream.resume()方法时,可读流将切换到“流动模式”。 结果是,即使不使用新的stream.read()方法和'readable'事件,也不必担心丢失'data'块。

虽然大多数应用程序将继续正常工作,但在以下情况下会引入极端情况:

  • 未添加'data' 事件监听。
  • 未调用stream.resume()方法。
  • 通过管道没有传送到任何可写的目的地。

例如,请考虑以下代码:

// WARNING!  BROKEN!
net.createServer((socket) => {

  // we add an 'end' method, but never consume the data
  socket.on('end', () => {
    // It will never get here.
    socket.end('The message was received but was not processed.\n');
  });

}).listen(1337);

在v0.10之前的Node.js版本中,传入的消息数据将会是简单地丢弃。 但是,在Node.js v0.10及更高版本中,套接字仍然存在永远停顿。

在这种情况下的解决方法是调用stream.resume() 开始读取数据:

// Workaround
net.createServer((socket) => {

  socket.on('end', () => {
    socket.end('The message was received but was not processed.\n');
  });

  // start the flow of data, discarding it.
  socket.resume();

}).listen(1337);

除了新的可读流切换到流动模式之外,pre-v0.10风格的流可以使用包装在Readable类中readable.wrap() 方法。

readable.read(0)#

查看英文版参与翻译

在某些情况下,需要有机制来触发刷新基础可读流, 而没有实际消费任何数据。在这种情况下,可以调用readable.read(0),返回null

如果内部读取缓冲区低于highWaterMark,并且该流目前未读取,则调用stream.read(0)将触发调用底层 stream._read()方法。

虽然大多数应用程序几乎都不需要这样做,但Node.js中会出现这种情况,尤其是在可读流类内部。

readable.push('')#

查看英文版参与翻译

不推荐使用readable.push('')

向一个不处在object mode的流压入一个BufferUint8Array0字节字符串,会产生有趣的副作用。 因为调用了readable.push(),所以会停止读进程。 然而,因为参数是一个空字符串,没有可读的数据添加到可读buffer, 所以没有可以供用户消费的数据。

highWaterMark discrepancy after calling readable.setEncoding()#

查看英文版参与翻译

调用 readable.setEncoding() 会改变 highWaterMark 属性在非对象模式中的作用。

一般而言,我们直接将缓冲器存储的 字节数highWaterMark 相比较。然而在调用 setEncoding() 之后,程序会将缓冲器中存储的 字符数highWaterMark 相比较。

在通常情况下,如使用 latin1ascii 时,这不成问题。但在处理可能含有多字节字符的字符串时,此行为需要当心。