Backpressuring in Streams

There is a general problem that occurs during data handling called backpressure: a buildup of data behind a buffer during data transfer. When the receiving end of the transfer has complex operations, or is slower for whatever reason, there is a tendency for data from the incoming source to accumulate, like a clog.

To solve this problem, there must be a delegation system in place to ensure a smooth flow of data from one source to another. Different communities have resolved this issue uniquely to their programs; Unix pipes and TCP sockets are good examples, and this is often referred to as flow control. In Node.js, streams have been the adopted solution.

The purpose of this guide is to further detail what backpressure is, and how exactly streams address this in Node.js' source code. The second part of the guide will introduce suggested best practices to ensure your application's code is safe and optimized when implementing streams.

We assume a little familiarity with the general definition of backpressure, Buffer, and EventEmitters in Node.js, as well as some experience with Stream. If you haven't read through those docs, it's not a bad idea to take a look at the API documentation first, as it will help expand your understanding while reading this guide.

The Problem with Data Handling

In a computer system, data is transferred from one process to another through pipes, sockets, and signals. In Node.js, we find a similar mechanism called Stream. Streams are great! They do so much for Node.js, and almost every part of the internal codebase utilizes that module. As a developer, you are more than encouraged to use them too!

const readline = require('node:readline');

// process.stdin and process.stdout are both instances of Streams.
const rl = readline.createInterface({
  input: process.stdin,
  output: process.stdout,
});

rl.question('Why should you use streams? ', answer => {
  console.log(`Maybe it's ${answer}, maybe it's because they are awesome! :)`);

  rl.close();
});

A good example of why the backpressure mechanism implemented through streams is a great optimization can be demonstrated by comparing the internal system tools from Node.js' Stream implementation.

In one scenario, we will take a large file (approximately 9 GB) and compress it using the familiar zip(1) tool.

zip The.Matrix.1080p.mkv

While that will take a few minutes to complete, in another shell we may run a script that uses Node.js' module zlib, which wraps another compression tool, gzip(1).

const fs = require('node:fs');
const gzip = require('node:zlib').createGzip();

const inp = fs.createReadStream('The.Matrix.1080p.mkv');
const out = fs.createWriteStream('The.Matrix.1080p.mkv.gz');

inp.pipe(gzip).pipe(out);

To test the results, try opening each compressed file. The file compressed by the zip(1) tool will notify you that it is corrupt, whereas the compression finished by Stream will decompress without error.

In this example, we're using .pipe() to pass the data source from one end to the other. However, notice there are no proper error handlers attached. If a chunk of data were to fail to be properly received, the Readable source or the gzip stream would not be destroyed. pump is a utility tool that properly destroys all the streams in a pipeline if one of them fails or closes, and is a must-have in this case!

pump is only necessary for Node.js 8.x or earlier; for Node.js 10.x and later, pipeline was introduced to replace it. This is a module method to pipe between streams, forwarding errors, properly cleaning up, and providing a callback when the pipeline is complete.

Here is an example of using pipeline:

const fs = require('node:fs');
const { pipeline } = require('node:stream');
const zlib = require('node:zlib');

// Use the pipeline API to easily pipe a series of streams
// together and get notified when the pipeline is fully done.
// A pipeline to gzip a potentially huge video file efficiently:

pipeline(
  fs.createReadStream('The.Matrix.1080p.mkv'),
  zlib.createGzip(),
  fs.createWriteStream('The.Matrix.1080p.mkv.gz'),
  err => {
    if (err) {
      console.error('Pipeline failed', err);
    } else {
      console.log('Pipeline succeeded');
    }
  }
);

You can also use the stream/promises module to use pipeline with async / await:

const fs = require('node:fs');
const { pipeline } = require('node:stream/promises');
const zlib = require('node:zlib');

async function run() {
  try {
    await pipeline(
      fs.createReadStream('The.Matrix.1080p.mkv'),
      zlib.createGzip(),
      fs.createWriteStream('The.Matrix.1080p.mkv.gz')
    );
    console.log('Pipeline succeeded');
  } catch (err) {
    console.error('Pipeline failed', err);
  }
}

Too Much Data, Too Quickly

There are instances where a Readable stream might give data to the Writable much too quickly, much more than the consumer can handle!

When that occurs, the consumer will begin to queue all the chunks of data for later consumption. The write queue will get longer and longer, and because of this, more data must be kept in memory until the entire process has been completed.

Writing to a disk is a lot slower than reading from one; thus, when we are trying to compress a file and write it to our hard disk, backpressure will occur because the write side will not be able to keep up with the speed of the read.

// Secretly the stream is saying: "whoa, whoa! hang on, this is way too much!"
// Data will begin to build up on the read side of the data buffer as
// `write` tries to keep up with the incoming data flow.
inp.pipe(gzip).pipe(outputFile);

This is why a backpressure mechanism is important. If a backpressure system were not present, the process would use up your system's memory, effectively slowing down other processes and monopolizing a large part of your system until completion.

This results in a few things:

  • Slowing down all other current processes
  • A very overworked garbage collector
  • Memory exhaustion

In the following examples, we will take out the return value of the .write() function and change it to true, which effectively disables backpressure support in Node.js core. In any reference to the 'modified' binary, we are talking about running the node binary without the return ret; line, and with it replaced by return true;.

Excess Drag on Garbage Collection

Let's take a look at a quick benchmark. Using the same example from above, we ran a few time trials to get a median time for both binaries.

   trial (#)  | `node` binary (ms) | modified `node` binary (ms)
=================================================================
      1       |      56924         |           55011
      2       |      52686         |           55869
      3       |      59479         |           54043
      4       |      54473         |           55229
      5       |      52933         |           59723
=================================================================
average time: |      55299         |           55975

Both take around a minute to run, so there's not much of a difference at all, but let's take a closer look to confirm whether our suspicions are correct. We use the Linux tool dtrace to evaluate what's happening with the V8 garbage collector.

The GC (garbage collector) measured time indicates the intervals of a full cycle of a single sweep done by the garbage collector:

approx. time (ms) | GC (ms) | modified GC (ms)
=================================================
          0       |    0    |      0
          1       |    0    |      0
         40       |    0    |      2
        170       |    3    |      1
        300       |    3    |      1

         *             *           *
         *             *           *
         *             *           *

      39000       |    6    |     26
      42000       |    6    |     21
      47000       |    5    |     32
      50000       |    8    |     28
      54000       |    6    |     35

While the two processes start the same and seem to work the GC at the same rate, it becomes evident that after a few seconds with a properly working backpressure system in place, the GC load is spread across consistent intervals of 4-8 milliseconds until the end of the data transfer.

However, when a backpressure system is not in place, the V8 garbage collection starts to drag out. The normal binary fires the GC approximately 75 times in a minute, whereas the modified binary fires it only 36 times.

This is the slow and gradual debt accumulating from growing memory usage. As data gets transferred, without a backpressure system in place, more memory is used for each chunk transfer.

The more memory that is allocated, the more the GC has to take care of in one sweep. The bigger the sweep, the more the GC needs to decide what can be freed up, and scanning for detached pointers in a larger memory space will consume more computing power.

Memory Exhaustion

To determine the memory consumption of each binary, we've clocked each process with /usr/bin/time -lp sudo ./node ./backpressure-example/zlib.js individually.

This is the output for the normal binary:

Respecting the return value of .write()
=============================================
real        58.88
user        56.79
sys          8.79
  87810048  maximum resident set size
         0  average shared memory size
         0  average unshared data size
         0  average unshared stack size
     19427  page reclaims
      3134  page faults
         0  swaps
         5  block input operations
       194  block output operations
         0  messages sent
         0  messages received
         1  signals received
        12  voluntary context switches
    666037  involuntary context switches

The maximum byte size occupied by virtual memory turns out to be approximately 87.81 MB.

Now, changing the return value of the .write() function, we get:

Without respecting the return value of .write():
==================================================
real        54.48
user        53.15
sys          7.43
1524965376  maximum resident set size
         0  average shared memory size
         0  average unshared data size
         0  average unshared stack size
    373617  page reclaims
      3139  page faults
         0  swaps
        18  block input operations
       199  block output operations
         0  messages sent
         0  messages received
         1  signals received
        25  voluntary context switches
    629566  involuntary context switches

The maximum byte size occupied by virtual memory turns out to be approximately 1.52 GB.

Without streams in place to delegate the backpressure, there is an order of magnitude more memory space being allocated, a huge margin of difference between the same process!

This experiment shows how optimized and cost-effective Node.js' backpressure mechanism is for your computing system. Now, let's do a breakdown of how it works!

How Does Backpressure Resolve These Issues?

There are different functions to transfer data from one process to another. In Node.js, there is an internal built-in function called .pipe(). There are other packages out there you can use too! Ultimately though, at the basic level of this process, we have two separate components: the source of the data and the consumer.

When .pipe() is called from the source, it signals to the consumer that there is data to be transferred. The pipe function helps to set up the appropriate backpressure closures for the event triggers.
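To make that concrete, here is a heavily simplified sketch of the kind of event wiring .pipe() sets up between a source and a destination. This is not Node.js' actual implementation, and the file names are hypothetical, for illustration only:

const fs = require('node:fs');

const src = fs.createReadStream('input.txt');
const dest = fs.createWriteStream('output.txt');

src.on('data', chunk => {
  // If the destination's buffer is full, .write() returns false
  // and we pause the source: this is the backpressure signal.
  if (!dest.write(chunk)) {
    src.pause();
  }
});

// Once the destination drains its buffer, the source may resume.
dest.on('drain', () => src.resume());

src.on('end', () => dest.end());

The real implementation also handles errors, unpiping, and cleanup, which is why you should reach for .pipe() or pipeline() rather than hand-rolling this wiring yourself.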

In Node.js, the source is a Readable stream and the consumer is the Writable stream (both of these may be interchanged with a Duplex or a Transform stream, but that is out of scope for this guide).

The moment that backpressure is triggered can be narrowed down exactly to the return value of a Writable's .write() function. This return value is determined by a few conditions, of course.

In any scenario where the data buffer has exceeded the highWaterMark or the write queue is currently busy, .write() will return false.

When a false value is returned, the backpressure system kicks in. It will pause the incoming Readable stream from sending any data and wait until the consumer is ready again. Once the data buffer is emptied, a 'drain' event will be emitted and the incoming data flow will resume.

Once the queue is finished, backpressure will allow data to be sent again. The space in memory that was being used will free itself up and prepare for the next batch of data.
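As a minimal sketch of that contract (the file name and the writeMany helper are hypothetical, not part of this guide): stop writing when .write() returns false, and continue only after 'drain' fires:

const { createWriteStream } = require('node:fs');

function writeMany(writable, chunks, done) {
  let i = 0;
  function writeNext() {
    while (i < chunks.length) {
      const ok = writable.write(chunks[i++]);
      if (!ok) {
        // Buffer is past highWaterMark: wait for the queue to empty.
        writable.once('drain', writeNext);
        return;
      }
    }
    done();
  }
  writeNext();
}

const out = createWriteStream('output.log');
writeMany(out, Array.from({ length: 100000 }, () => 'some data\n'), () =>
  out.end()
);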

This effectively allows a fixed amount of memory to be used at any given time for a .pipe() function. There will be no memory leakage and no infinite buffering, and the garbage collector will only have to deal with one area in memory!

So, if backpressure is so important, why have you (probably) not heard of it? Well, the answer is simple: Node.js does all of this automatically for you.

That's great! But also not so great when we are trying to understand how to implement our own custom streams.

On most machines, there is a byte size that determines when a buffer is full (which will vary across different machines). Node.js allows you to set your own custom highWaterMark, but commonly, the default is set to 16kb (16384, or 16 for objectMode streams). In instances where you might want to raise that value, go for it, but do so with caution!
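For instance, here is a hedged sketch of raising that value on a single Writable (the 64 KiB figure is an arbitrary choice for illustration, not a recommendation):

const { Writable } = require('node:stream');

const ws = new Writable({
  highWaterMark: 64 * 1024, // buffer up to 64 KiB before .write() returns false
  write(chunk, encoding, callback) {
    callback();
  },
});

console.log(ws.writableHighWaterMark); // 65536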

Lifecycle of .pipe()

To achieve a better understanding of backpressure, here is a flow-chart of the lifecycle of a Readable stream being piped into a Writable stream:

                                                     +===================+
                         x-->  Piping functions   +-->   src.pipe(dest)  |
                         x     are set up during     |===================|
                         x     the .pipe method.     |  Event callbacks  |
  +===============+      x                           |-------------------|
  |   Your Data   |      x     They exist outside    | .on('close', cb)  |
  +=======+=======+      x     the data flow, but    | .on('data', cb)   |
          |              x     importantly attach    | .on('drain', cb)  |
          |              x     events, and their     | .on('unpipe', cb) |
+---------v---------+    x     respective callbacks. | .on('error', cb)  |
|  Readable Stream  +----+                           | .on('finish', cb) |
+-^-------^-------^-+    |                           | .on('end', cb)    |
  ^       |       ^      |                           +-------------------+
  |       |       |      |
  |       ^       |      |
  ^       ^       ^      |    +-------------------+         +=================+
  ^       |       ^      +---->  Writable Stream  +--------->  .write(chunk)  |
  |       |       |           +-------------------+         +=======+=========+
  |       |       |                                                 |
  |       ^       |                              +------------------v---------+
  ^       |       +-> if (!chunk)                |    Is this chunk too big?  |
  ^       |       |     emit .end();             |    Is the queue busy?      |
  |       |       +-> else                       +-------+----------------+---+
  |       ^       |     emit .write();                   |                |
  |       ^       ^                                   +--v---+        +---v---+
  |       |       ^-----------------------------------<  No  |        |  Yes  |
  ^       |                                           +------+        +---v---+
  ^       |                                                               |
  |       ^               emit .pause();          +=================+     |
  |       ^---------------^-----------------------+  return false;  <-----+---+
  |                                               +=================+         |
  |                                                                           |
  ^            when queue is empty     +============+                         |
  ^------------^-----------------------<  Buffering |                         |
               |                       |============|                         |
               +> emit .drain();       |  ^Buffer^  |                         |
               +> emit .resume();      +------------+                         |
                                       |  ^Buffer^  |                         |
                                       +------------+   add chunk to queue    |
                                       |            <---^---------------------<
                                       +============+

If you are setting up a pipeline to chain together a few streams to manipulate your data, you will most likely be implementing a Transform stream.

In this case, the output from your Readable stream will enter the Transform and will pipe into the Writable.

Readable.pipe(Transformable).pipe(Writable);

Backpressure will be automatically applied, but note that both the incoming and outgoing highWaterMark of the Transform stream may be manipulated and will affect the backpressure system.
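For example, here is a sketch of a Transform whose two watermarks are set explicitly (the values and the upper-casing behavior are purely illustrative):

const { Transform } = require('node:stream');

const upperCase = new Transform({
  writableHighWaterMark: 16384, // buffer limit for chunks coming in
  readableHighWaterMark: 16384, // buffer limit for chunks going out
  transform(chunk, encoding, callback) {
    callback(null, chunk.toString().toUpperCase());
  },
});

Both sides take part in the backpressure negotiation: the writable side against the upstream Readable, and the readable side against the downstream Writable.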

Backpressure Guidelines

Since Node.js v0.10, the Stream class has offered the ability to modify the behavior of .read() or .write() by using the underscore versions of these respective functions (._read() and ._write()).
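A minimal sketch of that pattern (the DevNull class is hypothetical): you implement the underscore method, while consumers keep calling the public .write(), and the stream state machine manages the buffering and backpressure around it:

const { Writable } = require('node:stream');

class DevNull extends Writable {
  _write(chunk, encoding, callback) {
    // Discard the chunk, then invoke the callback so the internal
    // write queue can advance.
    callback();
  }
}

const sink = new DevNull();
sink.write('ignored'); // callers still use the public API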

There are guidelines documented for implementing Readable streams and implementing Writable streams. We will assume you've read these over, and the next section will go a little more in-depth.

Rules to Abide By When Implementing Custom Streams

The golden rule of streams is to always respect backpressure. What constitutes best practice is non-contradictory practice. So long as you are careful to avoid behaviors that conflict with internal backpressure support, you can be sure you're following good practice.

In general,

  1. Never .push() if you are not asked.
  2. Never call .write() after it returns false; wait for the 'drain' event instead.
  3. Streams change between different Node.js versions, and the libraries you use. Be careful and test things.

Regarding point 3, an incredibly useful package for building browser streams is readable-stream. Rodd Vagg has written a great blog post describing the utility of this library. In short, it provides a type of automated graceful degradation for Readable streams, and supports older versions of browsers and Node.js.

Rules Specific to Readable Streams

So far, we have taken a look at how .write() affects backpressure and have focused mostly on the Writable stream. Because of Node.js' functionality, data is technically flowing downstream from Readable to Writable. However, as we can observe in any transmission of data, matter, or energy, the source is just as important as the destination, and the Readable stream is vital to how backpressure is handled.

Both of these processes rely on one another to communicate effectively; if the Readable ignores when the Writable stream asks it to stop sending in data, it can be just as problematic as an incorrect .write() return value.

So, as well as respecting the .write() return value, we must also respect the return value of .push() used in the ._read() method. If .push() returns a false value, the stream will stop reading from the source. Otherwise, it will continue without pause.

Here is an example of bad practice using .push():

// This is problematic as it completely ignores the return value from the push
// which may be a signal for backpressure from the destination stream!
class MyReadable extends Readable {
  _read(size) {
    let chunk;
    while (null !== (chunk = getNextChunk())) {
      this.push(chunk);
    }
  }
}

Here is an example of good practice, where the Readable stream respects backpressure by checking the return value of this.push():

class MyReadable extends Readable {
  _read(size) {
    let chunk;
    let canPush = true;
    while (canPush && null !== (chunk = getNextChunk())) {
      canPush = this.push(chunk);
    }
  }
}

Additionally, from outside the custom stream, there are pitfalls to ignoring backpressure. In this counter-example of good practice, the application's code forces data through whenever it is available (signaled by the 'data' event):

// This ignores the backpressure mechanisms Node.js has set in place,
// and unconditionally pushes through data, regardless if the
// destination stream is ready for it or not.
readable.on('data', data => writable.write(data));

Here's an example of using .push() with a Readable stream.

const { Readable } = require('node:stream');

// Create a custom Readable stream
const myStream = new Readable({
  objectMode: true,
  read() {
    // Push some data onto the stream
    this.push({ message: 'Hello, world!' });
    this.push(null); // Mark the end of the stream
  },
});

// Consume the stream
myStream.on('data', chunk => {
  console.log(chunk);
});

// Output:
// { message: 'Hello, world!' }

In this example, we create a custom Readable stream that pushes a single object onto the stream using .push(). The ._read() method is called when the stream is ready to consume data; in this case, we immediately push some data onto the stream and mark the end of the stream by pushing null.

We then consume the stream by listening for the 'data' event and logging each chunk of data that is pushed onto the stream. In this case, we only push a single chunk of data onto the stream, so we only see one log message.

Rules Specific to Writable Streams

Recall that a .write() may return true or false depending on some conditions. Luckily for us, when building our own Writable stream, the stream state machine will handle our callbacks and determine when to handle backpressure and optimize the flow of data for us.

However, when we want to use a Writable directly, we must respect the .write() return value and pay close attention to these conditions:

// This writable is invalid because of the async nature of JavaScript callbacks.
// Without a return statement for each callback prior to the last,
// there is a great chance multiple callbacks will be called.
class MyWritable extends Writable {
  _write(chunk, encoding, callback) {
    if (chunk.toString().indexOf('a') >= 0) {
      callback();
    } else if (chunk.toString().indexOf('b') >= 0) {
      callback();
    }
    callback();
  }
}

// The proper way to write this would be:
if (chunk.contains('a')) {
  return callback();
}

if (chunk.contains('b')) {
  return callback();
}
callback();

There are also some things to look out for when implementing ._writev(). The function is coupled with .cork(), but there is a common mistake when writing:

// Using .uncork() twice here makes two calls on the C++ layer, rendering the
// cork/uncork technique useless.
ws.cork();
ws.write('hello ');
ws.write('world ');
ws.uncork();

ws.cork();
ws.write('from ');
ws.write('Matteo');
ws.uncork();

// The correct way to write this is to utilize process.nextTick(), which fires
// on the next event loop.
ws.cork();
ws.write('hello ');
ws.write('world ');
process.nextTick(doUncork, ws);

ws.cork();
ws.write('from ');
ws.write('Matteo');
process.nextTick(doUncork, ws);

// As a global function.
function doUncork(stream) {
  stream.uncork();
}

.cork() can be called as many times as we want; we just need to be careful to call .uncork() the same number of times to make the data flow again.
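Here is a small sketch of that counting behavior (the stream construction and file name are hypothetical):

const { createWriteStream } = require('node:fs');
const ws2 = createWriteStream('corked.txt');

ws2.cork();
ws2.cork();
ws2.write('buffered ');
ws2.uncork(); // one cork still outstanding: nothing is flushed yet
ws2.uncork(); // counter reaches zero: buffered writes are flushed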

Conclusion

Streams are an often-used module in Node.js. They are important to the internal structure and, for developers, to expand and connect across the Node.js modules ecosystem.

Hopefully, you will now be able to troubleshoot and safely code your own Writable and Readable streams with backpressure in mind, and share your knowledge with colleagues and friends.

Be sure to read up more on Stream for other API functions to help improve and unleash your streaming capabilities when building an application with Node.js.