Backpressuring in Streams

A general problem called backpressure occurs during data handling: data builds up behind a buffer during transfer. When the receiving end of the transfer performs complex operations, or is slower for whatever reason, data from the incoming source tends to accumulate, like a clog.

To solve this problem, there must be a delegation system in place to ensure a smooth flow of data from one source to another. Different communities have resolved this issue in ways specific to their programs; Unix pipes and TCP sockets are good examples, and the technique is often referred to as flow control. In Node.js, streams have been the adopted solution.

The purpose of this guide is to detail what backpressure is and how exactly streams address it in Node.js' source code. The second part of the guide introduces suggested best practices to ensure your application's code is safe and optimized when implementing streams.

We assume a little familiarity with the general definition of backpressure, Buffer, and EventEmitter in Node.js, as well as some experience with Stream. If you haven't read through those docs, it's a good idea to take a look at the API documentation first, as it will help expand your understanding while reading this guide.

The Problem with Data Handling

In a computer system, data is transferred from one process to another through pipes, sockets, and signals. In Node.js, we find a similar mechanism called Stream. Streams are great! They do so much for Node.js, and almost every part of the internal codebase utilizes that module. As a developer, you are more than encouraged to use them too!

const readline = require('node:readline');

// process.stdin and process.stdout are both instances of Streams.
const rl = readline.createInterface({
  input: process.stdin,
  output: process.stdout,
});

rl.question('Why should you use streams? ', answer => {
  console.log(`Maybe it's ${answer}, maybe it's because they are awesome! :)`);

  rl.close();
});

A good example of why the backpressure mechanism implemented through streams is a great optimization comes from comparing system tools with Node.js' Stream implementation.

In one scenario, we will take a large file (approximately 9 GB) and compress it using the familiar zip(1) tool.

zip The.Matrix.1080p.mkv.zip The.Matrix.1080p.mkv

While that will take a few minutes to complete, in another shell we may run a script that uses Node.js' zlib module, which wraps around another compression tool, gzip(1).

const gzip = require('node:zlib').createGzip();
const fs = require('node:fs');

const inp = fs.createReadStream('The.Matrix.1080p.mkv');
const out = fs.createWriteStream('The.Matrix.1080p.mkv.gz');

inp.pipe(gzip).pipe(out);

To test the results, try opening each compressed file. The file compressed by the zip(1) tool will notify you that the file is corrupt, whereas the compression finished by Stream will decompress without error.

In this example, we use .pipe() to get the data source from one end to the other. However, notice there are no proper error handlers attached. If a chunk of data were to fail to be properly received, the Readable source or gzip stream would not be destroyed. pump is a utility tool that properly destroys all the streams in a pipeline if one of them fails or closes, and is a must-have in this case!

pump is only necessary for Node.js 8.x or earlier; Node.js 10.x and later provide pipeline as a replacement for pump. It is a module method that pipes between streams, forwards errors, cleans up properly, and provides a callback when the pipeline is complete.

Here is an example of using pipeline:

const { pipeline } = require('node:stream');
const fs = require('node:fs');
const zlib = require('node:zlib');

// Use the pipeline API to easily pipe a series of streams
// together and get notified when the pipeline is fully done.
// A pipeline to gzip a potentially huge video file efficiently:

pipeline(
  fs.createReadStream('The.Matrix.1080p.mkv'),
  zlib.createGzip(),
  fs.createWriteStream('The.Matrix.1080p.mkv.gz'),
  err => {
    if (err) {
      console.error('Pipeline failed', err);
    } else {
      console.log('Pipeline succeeded');
    }
  }
);

You can also use the stream/promises module to use pipeline with async / await:

const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');
const zlib = require('node:zlib');

async function run() {
  try {
    await pipeline(
      fs.createReadStream('The.Matrix.1080p.mkv'),
      zlib.createGzip(),
      fs.createWriteStream('The.Matrix.1080p.mkv.gz')
    );
    console.log('Pipeline succeeded');
  } catch (err) {
    console.error('Pipeline failed', err);
  }
}

run();

Too Much Data, Too Quickly

There are instances where a Readable stream might give data to the Writable much too quickly, far faster than the consumer can handle!

When that occurs, the consumer begins to queue all the chunks of data for later consumption. The write queue gets longer and longer, and because of this more data must be kept in memory until the entire process has completed.

Writing to a disk is a lot slower than reading from a disk. Thus, when we are trying to compress a file and write it to our hard disk, backpressure will occur because the write to disk cannot keep up with the speed of the read.

// Secretly the stream is saying: "whoa, whoa! hang on, this is way too much!"
// Data will begin to build up on the read side of the data buffer as
// `write` tries to keep up with the incoming data flow.
inp.pipe(gzip).pipe(out);

This is why a backpressure mechanism is important. If a backpressure system were not present, the process would use up your system's memory, effectively slowing down other processes and monopolizing a large part of your system until completion.

This results in a few things:

  • Slowing down all other current processes

  • A very overworked garbage collector

  • Memory exhaustion
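The unbounded queue described above can be observed on a small scale. The following is a minimal sketch, not taken from Node.js core: the names slowSink and ignoredSignals, the 1 KiB limit, and the 512-byte chunks are assumptions chosen for illustration. It keeps writing without checking the return value of .write() and then reports how much data is queued.

```javascript
const { Writable } = require('node:stream');

// A deliberately slow consumer (hypothetical): each chunk is acknowledged
// on a later turn of the event loop, so data piles up in the meantime.
const slowSink = new Writable({
  highWaterMark: 1024, // small limit chosen for illustration
  write(chunk, encoding, callback) {
    setImmediate(callback);
  },
});

const chunk = Buffer.alloc(512);
let ignoredSignals = 0;

// Keep writing without ever acting on the return value of .write().
for (let i = 0; i < 100; i++) {
  if (!slowSink.write(chunk)) ignoredSignals++;
}

console.log('bytes queued:', slowSink.writableLength);
console.log('limit:', slowSink.writableHighWaterMark);
console.log('ignored false returns:', ignoredSignals);
```

The queued byte count ends up well above the configured limit, which is the same effect, in miniature, that exhausts memory when a large file is pushed through a consumer that cannot keep up.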

In the following examples, we will take out the return value of the .write() function and change it to true, which effectively disables backpressure support in Node.js core. In any reference to the 'modified' binary, we are talking about running the node binary without the return ret; line, and with return true; in its place.

Excess Drag on Garbage Collection

Let's take a look at a quick benchmark. Using the same example from above, we ran a few time trials to get an average time for both binaries.

   trial (#)  | `node` binary (ms) | modified `node` binary (ms)
=================================================================
      1       |      56924         |           55011
      2       |      52686         |           55869
      3       |      59479         |           54043
      4       |      54473         |           55229
      5       |      52933         |           59723
=================================================================
average time: |      55299         |           55975

Both take around a minute to run, so there's not much of a difference at all, but let's take a closer look to confirm whether our suspicions are correct. We use the tracing tool dtrace to evaluate what's happening with the V8 garbage collector.

The GC (garbage collector) measured time indicates the interval of a full cycle of a single sweep done by the garbage collector:

approx. time (ms) | GC (ms) | modified GC (ms)
=================================================
          0       |    0    |      0
          1       |    0    |      0
         40       |    0    |      2
        170       |    3    |      1
        300       |    3    |      1

         *             *           *

         *             *           *

         *             *           *

      39000       |    6    |     26
      42000       |    6    |     21
      47000       |    5    |     32
      50000       |    8    |     28
      54000       |    6    |     35

While the two processes start the same and seem to work the GC at the same rate, it becomes evident that after a few seconds with a properly working backpressure system in place, the GC load is spread across consistent intervals of 4-8 milliseconds until the end of the data transfer.

However, when a backpressure system is not in place, the V8 garbage collection starts to drag out. With the normal binary, the GC fires approximately 75 times in a minute, whereas with the modified binary it fires only 36 times.

This is the slow and gradual debt accumulating from growing memory usage. As data gets transferred without a backpressure system in place, more memory is used for each chunk transfer.

The more memory that is allocated, the more the GC has to take care of in one sweep. The bigger the sweep, the more the GC needs to decide what can be freed up, and scanning for detached pointers in a larger memory space consumes more computing power.

Memory Exhaustion

To determine the memory consumption of each binary, we clocked each process with /usr/bin/time -lp sudo ./node ./backpressure-example/zlib.js individually.

This is the output on the normal binary:

Respecting the return value of .write()
=============================================
real        58.88
user        56.79
sys          8.79
  87810048  maximum resident set size
         0  average shared memory size
         0  average unshared data size
         0  average unshared stack size
     19427  page reclaims
      3134  page faults
         0  swaps
         5  block input operations
       194  block output operations
         0  messages sent
         0  messages received
         1  signals received
        12  voluntary context switches
    666037  involuntary context switches

The maximum resident set size, that is, the peak amount of memory the process held in RAM, turns out to be approximately 87.81 MB.

And now changing the return value of the .write() function, we get:

Without respecting the return value of .write():
==================================================
real        54.48
user        53.15
sys          7.43
1524965376  maximum resident set size
         0  average shared memory size
         0  average unshared data size
         0  average unshared stack size
    373617  page reclaims
      3139  page faults
         0  swaps
        18  block input operations
       199  block output operations
         0  messages sent
         0  messages received
         1  signals received
        25  voluntary context switches
    629566  involuntary context switches

The maximum resident set size turns out to be approximately 1.52 GB.

Without streams in place to delegate the backpressure, more than an order of magnitude more memory is allocated, a huge margin of difference for the same process!

This experiment shows how optimized and cost-effective Node.js' backpressure mechanism is for your computing system. Now, let's do a breakdown of how it works!

How Does Backpressure Resolve These Issues?

There are different functions to transfer data from one process to another. In Node.js, there is an internal built-in function called .pipe(). There are other packages out there you can use too! Ultimately though, at the basic level of this process, we have two separate components: the source of the data and the consumer.

When .pipe() is called from the source, it signals to the consumer that there is data to be transferred. The pipe function helps to set up the appropriate backpressure closures for the event triggers.

In Node.js the source is a Readable stream and the consumer is a Writable stream (either may be replaced by a Duplex or a Transform stream, but that is out of scope for this guide).

The moment that backpressure is triggered can be narrowed exactly to the return value of a Writable's .write() function. This return value is determined by a few conditions, of course.

In any scenario where the data buffer has reached or exceeded the highWaterMark, or the write queue is currently busy, .write() will return false.
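The return value of .write() can be seen directly with a very small buffer limit. This is a minimal sketch under assumed values: the name sink and the 4-byte highWaterMark are chosen only for illustration, and the consumer acknowledges chunks asynchronously so that written data stays queued.

```javascript
const { Writable } = require('node:stream');

const sink = new Writable({
  highWaterMark: 4, // bytes; tiny on purpose
  write(chunk, encoding, callback) {
    setImmediate(callback); // acknowledge later so data stays queued
  },
});

const first = sink.write('abc'); // 3 bytes queued, still below the limit
const second = sink.write('de'); // 5 bytes queued, the limit is passed

console.log(first, second); // true false

// 'drain' signals that the queue has emptied and writing may continue.
sink.on('drain', () => console.log('drained, safe to write again'));
```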

When a false value is returned, the backpressure system kicks in. It pauses the incoming Readable stream from sending any data and waits until the consumer is ready again. Once the data buffer is emptied, a 'drain' event is emitted and the incoming data flow resumes.

Once the queue is finished, backpressure allows data to be sent again. The space in memory that was being used frees itself up and prepares for the next batch of data.

This effectively allows a fixed amount of memory to be used at any given time for a .pipe() function. There will be no memory leakage and no infinite buffering, and the garbage collector will only have to deal with one area in memory!
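When writing to a Writable by hand rather than through .pipe(), the same pause-and-resume cycle has to be reproduced in application code. The sketch below shows one way to do that; writeAll is a hypothetical helper, not a Node.js API, and the 2-byte highWaterMark is an assumption that forces a wait after every chunk.

```javascript
const { Writable } = require('node:stream');
const { once } = require('node:events');

// Hypothetical helper: writes every chunk, and whenever .write() returns
// false it waits for the Writable to emit 'drain' before continuing.
async function writeAll(writable, chunks) {
  let waits = 0;
  for (const chunk of chunks) {
    if (!writable.write(chunk)) {
      waits++;
      await once(writable, 'drain');
    }
  }
  writable.end();
  await once(writable, 'finish');
  return waits;
}

const received = [];
const sink = new Writable({
  highWaterMark: 2, // tiny limit so every 2-byte chunk fills the buffer
  write(chunk, encoding, callback) {
    received.push(chunk.toString());
    setImmediate(callback);
  },
});

const done = writeAll(sink, ['aa', 'bb', 'cc']);
done.then((waits) => console.log(received.join(''), 'drain waits:', waits));
```

Because the producer only continues after 'drain', at most one chunk is queued at a time, so memory use stays bounded regardless of how many chunks are written.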

So, if backpressure is so important, why have you (probably) not heard of it? Well, the answer is simple: Node.js does all of this automatically for you.

That's so great! But also not so great when we are trying to understand how to implement our custom streams.

In most machines, there is a byte size that determines when a buffer is full (which will vary across different machines). Node.js allows you to set a custom highWaterMark, but commonly the default is 16 KiB (16384 bytes), or 16 objects for objectMode streams; the default can differ between Node.js versions. In instances where you might want to raise that value, go for it, but do so with caution!
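As a brief illustration of setting a custom highWaterMark, the sketch below configures one byte-mode Writable and one objectMode Readable. The names bigBuffer and objects, and the chosen limits, are assumptions for the example only.

```javascript
const { Readable, Writable } = require('node:stream');

// Byte-mode stream: highWaterMark is measured in bytes.
const bigBuffer = new Writable({
  highWaterMark: 64 * 1024,
  write(chunk, encoding, callback) {
    callback();
  },
});

// objectMode stream: highWaterMark is measured in objects, not bytes.
const objects = new Readable({
  objectMode: true,
  highWaterMark: 4,
  read() {},
});

console.log(bigBuffer.writableHighWaterMark); // 65536
console.log(objects.readableHighWaterMark); // 4
```

A larger limit trades memory for fewer pauses, which is why raising it deserves some caution.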

Lifecycle of .pipe()

To achieve a better understanding of backpressure, here is a flowchart of the lifecycle of a Readable stream being piped into a Writable stream:

                                                     +===================+
                         x-->  Piping functions   +-->   src.pipe(dest)  |
                         x     are set up during     |===================|
                         x     the .pipe method.     |  Event callbacks  |
  +===============+      x                           |-------------------|
  |   Your Data   |      x     They exist outside    | .on('close', cb)  |
  +=======+=======+      x     the data flow, but    | .on('data', cb)   |
          |              x     importantly attach    | .on('drain', cb)  |
          |              x     events, and their     | .on('unpipe', cb) |
+---------v---------+    x     respective callbacks. | .on('error', cb)  |
|  Readable Stream  +----+                           | .on('finish', cb) |
+-^-------^-------^-+    |                           | .on('end', cb)    |
  ^       |       ^      |                           +-------------------+
  |       |       |      |
  |       ^       |      |
  ^       ^       ^      |    +-------------------+         +=================+
  ^       |       ^      +---->  Writable Stream  +--------->  .write(chunk)  |
  |       |       |           +-------------------+         +=======+=========+
  |       |       |                                                 |
  |       ^       |                              +------------------v---------+
  ^       |       +-> if (!chunk)                |    Is this chunk too big?  |
  ^       |       |     emit .end();             |    Is the queue busy?      |
  |       |       +-> else                       +-------+----------------+---+
  |       ^       |     emit .write();                   |                |
  |       ^       ^                                   +--v---+        +---v---+
  |       |       ^-----------------------------------<  No  |        |  Yes  |
  ^       |                                           +------+        +---v---+
  ^       |                                                               |
  |       ^               emit .pause();          +=================+     |
  |       ^---------------^-----------------------+  return false;  <-----+---+
  |                                               +=================+         |
  |                                                                           |
  ^            when queue is empty     +============+                         |
  ^------------^-----------------------<  Buffering |                         |
               |                       |============|                         |
               +> emit .drain();       |  ^Buffer^  |                         |
               +> emit .resume();      +------------+                         |
                                       |  ^Buffer^  |                         |
                                       +------------+   add chunk to queue    |
                                       |            <---^---------------------<
                                       +============+

If you are setting up a pipeline to chain together a few streams to manipulate your data, you will most likely be implementing a Transform stream.

In this case, the output from your Readable stream enters the Transform and is piped into the Writable.

Readable.pipe(Transformable).pipe(Writable);

Backpressure is applied automatically, but note that both the incoming and outgoing highWaterMark of the Transform stream may be adjusted, and doing so affects the backpressure system.
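A Transform has a writable (incoming) side and a readable (outgoing) side, and each can be given its own limit. The following sketch assumes an upper-casing transform named upperCase with arbitrarily small limits, purely to show where the two options go.

```javascript
const { Transform } = require('node:stream');

const upperCase = new Transform({
  writableHighWaterMark: 8, // incoming side: data written into the transform
  readableHighWaterMark: 2, // outgoing side: data waiting to be read from it
  transform(chunk, encoding, callback) {
    callback(null, chunk.toString().toUpperCase());
  },
});

console.log(upperCase.writableHighWaterMark, upperCase.readableHighWaterMark); // 8 2
```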

Backpressure Guidelines

Since Node.js v0.10, the Stream class has offered the ability to modify the behavior of .read() or .write() by using the underscore versions of these respective functions (._read() and ._write()).

There are guidelines documented for implementing Readable streams and implementing Writable streams. We will assume you've read these over, and the next section will go a little bit more in-depth.

Rules to Abide By When Implementing Custom Streams

The golden rule of streams is to always respect backpressure. What constitutes best practice is non-contradictory practice. So long as you are careful to avoid behaviors that conflict with internal backpressure support, you can be sure you're following good practice.

In general,

  1. Never .push() if you are not asked.

  2. Never call .write() after it returns false; wait for 'drain' instead.

  3. Streams change between different Node.js versions and between the libraries you use. Be careful and test things.

In regard to point 3, an incredibly useful package for building browser streams is readable-stream. Rod Vagg has written a great blog post describing the utility of this library. In short, it provides a type of automated graceful degradation for Readable streams, and supports older versions of browsers and Node.js.

Rules specific to Readable Streams

So far, we have taken a look at how .write() affects backpressure and have focused much on the Writable stream. Because of Node.js' functionality, data is technically flowing downstream from Readable to Writable. However, as we can observe in any transmission of data, matter, or energy, the source is just as important as the destination, and the Readable stream is vital to how backpressure is handled.

Both these processes rely on one another to communicate effectively. If the Readable ignores the Writable stream's request to stop sending data, it can be just as problematic as when .write()'s return value is incorrect.

So, as well as respecting the .write() return value, we must also respect the return value of .push() used in the ._read() method. If .push() returns false, the stream will stop reading from the source. Otherwise, it will continue without pause.

Here is an example of bad practice using .push():

// This is problematic as it completely ignores the return value from the push
// which may be a signal for backpressure from the destination stream!
class MyReadable extends Readable {
  _read(size) {
    let chunk;
    while (null !== (chunk = getNextChunk())) {
      this.push(chunk);
    }
  }
}
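For contrast, here is a minimal sketch of a Readable that does respect the return value of .push(). CountingReadable, its limit argument, and the 2-byte highWaterMark are hypothetical, chosen only to make the stop-and-continue behavior easy to follow.

```javascript
const { Readable } = require('node:stream');

// Hypothetical source that emits the numbers 1..limit as strings.
class CountingReadable extends Readable {
  constructor(limit, options) {
    super(options);
    this.limit = limit;
    this.next = 1;
  }

  _read(size) {
    let keepGoing = true;
    // Stop pushing as soon as .push() returns false; _read() will be
    // called again once the consumer has made room in the buffer.
    while (keepGoing && this.next <= this.limit) {
      keepGoing = this.push(String(this.next++));
    }
    if (this.next > this.limit) this.push(null); // no more data
  }
}

const counter = new CountingReadable(5, { highWaterMark: 2 });
const seen = [];
counter.on('data', (chunk) => seen.push(chunk.toString()));
counter.on('end', () => console.log(seen.join(',')));
```

The loop produces data only while the internal buffer has room, so the amount held in memory is bounded by the stream's highWaterMark rather than by the size of the source.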

Additionally, from outside the custom stream, there are pitfalls to ignoring backpressure. In this counter-example of good practice, the application's code forces data through whenever it is available (signaled by the 'data' event):

// This ignores the backpressure mechanisms Node.js has set in place,
// and unconditionally pushes through data, regardless if the
// destination stream is ready for it or not.
readable.on('data', data => writable.write(data));
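One way to repair the handler above is to pause the source whenever .write() returns false and resume it on 'drain'. The sketch below is an illustration under assumptions: the sample data, the 1-byte highWaterMark, and the written array exist only to make the example self-contained. In practice, .pipe() or pipeline() performs this bookkeeping for you.

```javascript
const { Readable, Writable } = require('node:stream');

const readable = Readable.from(['a', 'b', 'c', 'd']);
const written = [];
const writable = new Writable({
  highWaterMark: 1, // tiny limit so that every write signals backpressure
  write(chunk, encoding, callback) {
    written.push(chunk.toString());
    setImmediate(callback);
  },
});

readable.on('data', (data) => {
  // Respect the return value: stop reading until the destination drains.
  if (!writable.write(data)) {
    readable.pause();
    writable.once('drain', () => readable.resume());
  }
});
readable.on('end', () => writable.end());
writable.on('finish', () => console.log(written.join('')));
```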

Here's an example of using .push() with a Readable stream.

const { Readable } = require('node:stream');

// Create a custom Readable stream
const myReadableStream = new Readable({
  objectMode: true,
  read(size) {
    // Push some data onto the stream
    this.push({ message: 'Hello, world!' });
    this.push(null); // Mark the end of the stream
  },
});

// Consume the stream
myReadableStream.on('data', chunk => {
  console.log(chunk);
});

// Output:
// { message: 'Hello, world!' }

In this example, we create a custom Readable stream that pushes a single object onto the stream using .push(). The ._read() method is called when the consumer is ready for more data, and in this case we immediately push some data onto the stream and mark the end of the stream by pushing null.

We then consume the stream by listening for the 'data' event and logging each chunk of data that is pushed onto the stream. In this case, we only push a single chunk of data onto the stream, so we only see one log message.

Rules specific to Writable Streams

Recall that .write() may return true or false depending on some conditions. Luckily for us, when building our own Writable stream, the stream state machine will handle our callbacks and determine when to handle backpressure and optimize the flow of data for us.

However, when we want to use a Writable directly, we must respect the .write() return value and pay close attention to these conditions:

  • If the write queue is busy, .write() will return false.

  • If the data chunk is too large, .write() will return false (the limit is indicated by the variable highWaterMark).

// This writable is invalid because callback() may be called more than once
// for the same chunk. Without a return statement for each callback prior to
// the last, there is a great chance multiple callbacks will be called.
class MyWritable extends Writable {
  _write(chunk, encoding, callback) {
    if (chunk.toString().indexOf('a') >= 0) callback();
    else if (chunk.toString().indexOf('b') >= 0) callback();
    callback();
  }
}

// The proper way to write this would be:
if (chunk.toString().indexOf('a') >= 0) return callback();
if (chunk.toString().indexOf('b') >= 0) return callback();
callback();

There are also some things to look out for when implementing ._writev(). The function is coupled with .cork(), but there is a common mistake when writing:

// Using .uncork() twice here makes two calls on the C++ layer, rendering the
// cork/uncork technique useless.
ws.cork();
ws.write('hello ');
ws.write('world ');
ws.uncork();

ws.cork();
ws.write('from ');
ws.write('Matteo');
ws.uncork();

// The correct way to write this is to utilize process.nextTick(), which defers
// each .uncork() call until the current operation has completed.
ws.cork();
ws.write('hello ');
ws.write('world ');
process.nextTick(doUncork, ws);

ws.cork();
ws.write('from ');
ws.write('Matteo');
process.nextTick(doUncork, ws);

// As a global function.
function doUncork(stream) {
  stream.uncork();
}

.cork() can be called as many times as we want; we just need to be careful to call .uncork() the same number of times to make it flow again.
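The matching of .cork() and .uncork() calls can be checked with the writableCorked property, which reports how many .uncork() calls are still outstanding. This is a minimal sketch; the stream ws, the batches array, and the variable names are assumptions for illustration.

```javascript
const { Writable } = require('node:stream');

const batches = [];
const ws = new Writable({
  // Fallback for a single buffered chunk.
  write(chunk, encoding, callback) {
    batches.push([chunk.toString()]);
    callback();
  },
  // Receives all chunks that were buffered while the stream was corked.
  writev(chunks, callback) {
    batches.push(chunks.map(({ chunk }) => chunk.toString()));
    callback();
  },
});

ws.cork();
ws.cork();
ws.write('hello ');
ws.write('world');
const corkedBefore = ws.writableCorked; // 2

ws.uncork();
const corkedAfterOne = ws.writableCorked; // 1, data is still held back

ws.uncork();
const corkedAfterTwo = ws.writableCorked; // 0, the stream flows again

console.log(corkedBefore, corkedAfterOne, corkedAfterTwo);
```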

Conclusion

Streams are an often-used module in Node.js. They are important to the internal structure and, for developers, a way to expand and connect across the Node.js modules ecosystem.

Hopefully, you will now be able to troubleshoot and safely code your own Writable and Readable streams with backpressure in mind, and share your knowledge with colleagues and friends.

Be sure to read up more on Stream for other API functions to help improve and unleash your streaming capabilities when building an application with Node.js.