How To Use Streams
Working with large amounts of data in Node.js applications can be a double-edged sword. The ability to handle massive amounts of data is extremely handy but can also lead to performance bottlenecks and memory exhaustion. Traditionally, developers tackled this challenge by reading the entire dataset into memory at once. This approach, while intuitive for smaller datasets, becomes inefficient and resource-intensive for large data (e.g., files, network requests…).
This is where Node.js streams come in. Streams offer a fundamentally different approach, allowing you to process data incrementally and optimize memory usage. By handling data in manageable chunks, streams empower you to build scalable applications that can efficiently tackle even the most daunting datasets. As popularly quoted, "streams are arrays over time."
In this guide, we give an overview of the Stream concept, history, and API, as well as some recommendations on how to use and operate them.
What are Node.js Streams?
Node.js streams offer a powerful abstraction for managing data flow in your applications. They excel at processing large datasets, such as reading or writing from files and network requests, without compromising performance.
This approach differs from loading the entire dataset into memory at once. Streams process data in chunks, significantly reducing memory usage. All streams in Node.js inherit from the EventEmitter class, allowing them to emit events at various stages of data processing. These streams can be readable, writable, or both, providing flexibility for different data-handling scenarios.
Event-Driven Architecture
Node.js thrives on an event-driven architecture, making it ideal for real-time I/O. This means consuming input as soon as it's available and sending output as soon as the application generates it. Streams seamlessly integrate with this approach, enabling continuous data processing.
They achieve this by emitting events at key stages. These events include signals for received data (the data event) and for the stream's completion (the end event). Developers can listen to these events and execute custom logic accordingly. This event-driven nature makes streams highly efficient for processing data from external sources.
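As a minimal sketch of this pattern (the file name data.txt is a placeholder for any local file), a readable file stream emits a data event for each chunk read and an end event once the source is exhausted:
const fs = require('node:fs');

// 'data.txt' is a hypothetical input file; any local file will do.
const stream = fs.createReadStream('data.txt');

stream.on('data', chunk => {
  console.log(`Received ${chunk.length} bytes`);
});

stream.on('end', () => {
  console.log('No more data');
});

stream.on('error', err => {
  console.error('Stream error:', err.message);
});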
Why use Streams?
Streams provide three key advantages over other data-handling methods:
- Memory efficiency: Streams process data incrementally, consuming and handling it in chunks instead of loading the entire dataset into memory. This is a major advantage when working with large datasets, as it significantly reduces memory usage and prevents memory-related performance problems.
- Improved response time: Streams allow data to be processed as it arrives. A chunk can be handled without waiting for the entire payload or dataset to be received, which reduces latency and improves the overall responsiveness of the application.
- Scalability for real-time processing: By processing data in chunks, Node.js streams can efficiently handle large amounts of data with limited resources. This scalability makes them ideal for applications that process high volumes of data in real time.
These advantages make streams a powerful tool for building high-performance, scalable Node.js applications, particularly when working with large datasets or real-time data processing.
Note on performance
If your application already has all of its data readily available in memory, using streams might add unnecessary overhead and complexity and slow your application down.
Stream history
This section is a reference for the history of streams in Node.js. Unless you're working with a codebase written for a Node.js version prior to 0.11.5 (2013), you will rarely encounter older versions of the streams API, but the terms might still be in use.
Streams 0
The first version of streams was released at the same time as Node.js. Although there wasn't a Stream class yet, different modules used the concept and implemented the read/write functions. The util.pump() function was available to control the flow of data between streams.
Streams 1 (Classic)
With the release of Node v0.4.0 in 2011, the Stream class was introduced, as well as the pipe() method.
Streams 2
In 2012, with the release of Node v0.10.0, Streams 2 were unveiled. This update brought new stream subclasses, including Readable, Writable, Duplex, and Transform. Additionally, the readable event was added. To maintain backwards compatibility, streams could be switched to the old mode by adding a data event listener or calling the pause() or resume() methods.
Streams 3
In 2013, Streams 3 was released with Node v0.11.5 to address the problem of a stream having both data and readable event handlers. This removed the need to choose between 'current' and 'old' modes. Streams 3 is the current version of streams in Node.js.
Stream types
Readable
Readable is the class we use to sequentially read a source of data. Typical examples of Readable streams in the Node.js API are fs.ReadStream when reading files, http.IncomingMessage when reading HTTP requests, and process.stdin when reading from the standard input.
Key Methods and Events
A readable stream operates with several core methods and events that allow fine control over data handling:
- on('data'): Emitted whenever data is available from the stream. It is very fast, as the stream pushes data as quickly as it can, making it suitable for high-throughput scenarios.
- on('end'): Emitted when there is no more data to read from the stream. It signals the completion of data delivery and fires only after all the data in the stream has been consumed.
- on('readable'): Emitted when there is data available to read from the stream or when the end of the stream has been reached. It allows more controlled reading of the data when needed.
- on('close'): Emitted when the stream and its underlying resources have been closed, indicating that no more events will be emitted.
- on('error'): Can be emitted at any point to signal that an error occurred during processing. A handler for this event can be set to avoid uncaught exceptions.
A demonstration of the use of these events can be seen in the following sections.
Basic Readable Stream
Here's an example of a simple readable stream implementation that generates data dynamically:
const { Readable } = require('node:stream');

class MyStream extends Readable {
  #count = 0;
  _read(size) {
    this.push(':-)');
    if (++this.#count === 5) {
      this.push(null);
    }
  }
}

const stream = new MyStream();

stream.on('data', chunk => {
  console.log(chunk.toString());
});
In this code, the MyStream class extends Readable and overrides the _read() method to push the string ":-)" to the internal buffer. After pushing the string five times, it signals the end of the stream by pushing null. The on('data') event handler logs each chunk to the console as it is received.
Advanced Control with the readable Event
For even finer control over data flow, the readable event can be used. This event is more complex but provides better performance for certain applications by allowing explicit control over when data is read from the stream:
const stream = new MyStream({
  highWaterMark: 1,
});

stream.on('readable', () => {
  console.count('>> readable event');
  let chunk;
  while ((chunk = stream.read()) !== null) {
    console.log(chunk.toString()); // Process the chunk
  }
});

stream.on('end', () => console.log('>> end event'));
Here, the readable event is used to manually pull data from the stream as needed. The loop inside the readable event handler keeps reading data from the stream buffer until read() returns null, indicating that the buffer is temporarily empty or the stream has ended. Setting highWaterMark to 1 keeps the buffer small, triggering the readable event more frequently and allowing more granular control over the data flow.
With the previous code, you'll get an output like:
>> readable event: 1
:-):-)
:-)
:-)
:-)
>> readable event: 2
>> readable event: 3
>> readable event: 4
>> end event
Let's try to digest that. When we attach the on('readable') event handler, it triggers a first call to read(), because that is what might cause a readable event to be emitted. After that event is emitted, we call read() on the first iteration of the while loop. That's why we get the first two smileys in one row. After that, we keep calling read() until null is pushed. Each call to read() schedules the emission of a new readable event, but since we are consuming via the readable event, the emission is deferred to the next tick. That's why we get them all at the end, once the synchronous code of the loop has finished.
NOTE: You can run the code with NODE_DEBUG=stream to see that emitReadable is triggered after each push.
If we want to see a readable event before each smiley, we can wrap push in a setImmediate or process.nextTick call like this:
class MyStream extends Readable {
  #count = 0;
  _read(size) {
    setImmediate(() => {
      this.push(':-)');
      if (++this.#count === 5) {
        return this.push(null);
      }
    });
  }
}
And we'll get:
>> readable event: 1
:-)
>> readable event: 2
:-)
>> readable event: 3
:-)
>> readable event: 4
:-)
>> readable event: 5
:-)
>> readable event: 6
>> end event
Writable
Writable streams are useful for creating files, uploading data, or any task that involves sequentially outputting data. While readable streams provide the source of data, writable streams in Node.js act as the destination for your data. Typical examples of writable streams in the Node.js API are fs.WriteStream, process.stdout, and process.stderr.
Key Methods and Events in Writable Streams
- .write(): This method writes a chunk of data to the stream. It buffers the data up to a defined limit (highWaterMark) and returns a boolean indicating whether more data can be written immediately.
- .end(): This method signals the end of the writing process. It tells the stream to finish the write operation and perform any necessary cleanup.
Creating a Writable
Here's an example of creating a writable stream that converts all incoming data to uppercase before writing it to the standard output:
const { once } = require('node:events');
const { Writable } = require('node:stream');

class MyStream extends Writable {
  constructor() {
    super({ highWaterMark: 10 /* 10 bytes */ });
  }
  _write(data, encode, cb) {
    process.stdout.write(data.toString().toUpperCase() + '\n', cb);
  }
}

async function main() {
  const stream = new MyStream();

  for (let i = 0; i < 10; i++) {
    const waitDrain = !stream.write('hello');

    if (waitDrain) {
      console.log('>> wait drain');
      await once(stream, 'drain');
    }
  }

  stream.end('world');
}

// Call the async function
main().catch(console.error);
In this code, MyStream is a custom Writable stream with a buffer capacity (highWaterMark) of 10 bytes. It overrides the _write method to convert data to uppercase before writing it out.
The loop attempts to write hello ten times to the stream. If the buffer fills up (waitDrain becomes true), it waits for a drain event before continuing, ensuring we do not overwhelm the stream's buffer.
The output will be:
HELLO
>> wait drain
HELLO
HELLO
>> wait drain
HELLO
HELLO
>> wait drain
HELLO
HELLO
>> wait drain
HELLO
HELLO
>> wait drain
HELLO
WORLD
Duplex
Duplex streams implement both the Readable and Writable interfaces.
Key Methods and Events in Duplex Streams
Duplex streams implement all the methods and events described in Readable and Writable streams.
A good example of a duplex stream is the Socket class in the net module:
const net = require('node:net');

// Create a TCP server
const server = net.createServer(socket => {
  socket.write('Hello from server!\n');

  socket.on('data', data => {
    console.log(`Client says: ${data.toString()}`);
  });

  // Handle client disconnection
  socket.on('end', () => {
    console.log('Client disconnected');
  });
});

// Start the server on port 8080
server.listen(8080, () => {
  console.log('Server listening on port 8080');
});
The previous code will open a TCP socket on port 8080, send Hello from server! to any connecting client, and log any data received.
const net = require('node:net');

// Connect to the server at localhost:8080
const client = net.createConnection({ port: 8080 }, () => {
  client.write('Hello from client!\n');
});

client.on('data', data => {
  console.log(`Server says: ${data.toString()}`);
});

// Handle the server closing the connection
client.on('end', () => {
  console.log('Disconnected from server');
});
The previous code will connect to the TCP socket, send a Hello from client message, and log any received data.
Transform
Transform streams are duplex streams whose output is computed from the input. As the name suggests, they are commonly placed between a readable and a writable stream to transform the data as it passes through.
Key Methods and Events in Transform Streams
Apart from all the methods and events in Duplex streams, there is:
- _transform: Called internally to handle the flow of data between the readable and writable parts of the stream. It must never be called by application code.
Creating a Transform Stream
To create a new transform stream, we can pass an options object to the Transform constructor, including a transform function that handles how the output data is computed from the input data using the push method.
const { Transform } = require('node:stream');

const upper = new Transform({
  transform(data, enc, cb) {
    this.push(data.toString().toUpperCase());
    cb();
  },
});
This stream will take any input and output it in uppercase.
How to operate with streams
When working with streams, we usually want to read from a source and write to a destination, possibly needing some transformation of the data in between. The following sections will cover different ways to do so.
.pipe()
The .pipe() method connects a readable stream to a writable (or transform) stream. Although this seems like a simple way to achieve our goal, it delegates all error handling to the programmer, making it difficult to get right.
The following example shows a pipe trying to output the current file in uppercase to the console.
const fs = require('node:fs');
const { Transform } = require('node:stream');

let errorCount = 0;
const upper = new Transform({
  transform(data, enc, cb) {
    if (errorCount === 10) {
      return cb(new Error('BOOM!'));
    }
    errorCount++;
    this.push(data.toString().toUpperCase());
    cb();
  },
});

const readStream = fs.createReadStream(__filename, { highWaterMark: 1 });
const writeStream = process.stdout;

readStream.pipe(upper).pipe(writeStream);

readStream.on('close', () => {
  console.log('Readable stream closed');
});

upper.on('close', () => {
  console.log('Transform stream closed');
});

upper.on('error', err => {
  console.error('\nError in transform stream:', err.message);
});

writeStream.on('close', () => {
  console.log('Writable stream closed');
});
After writing 10 characters, upper will return an error in the callback, which will cause the stream to close. However, the other streams won't be notified, resulting in memory leaks. The output will be:
CONST FS =
Error in transform stream: BOOM!
Transform stream closed
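Getting this right with .pipe() alone means wiring the teardown by hand. A rough sketch of the kind of manual error propagation required (reusing the streams from the example above; this is essentially what pipeline(), covered next, automates):
readStream.pipe(upper).pipe(writeStream);

// Manually tear down the other streams when one fails,
// so no stream is left open and buffering.
upper.on('error', err => {
  readStream.destroy(err);
});

readStream.on('error', err => {
  upper.destroy(err);
});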
pipeline()
To avoid the pitfalls and low-level complexity of the .pipe() method, in most cases it is recommended to use the pipeline() method. This method is a safer and more robust way to pipe streams together, handling errors and cleanup automatically.
The following example demonstrates how using pipeline() prevents the pitfalls of the previous example:
const fs = require('node:fs');
const { Transform, pipeline } = require('node:stream');

let errorCount = 0;
const upper = new Transform({
  transform(data, enc, cb) {
    if (errorCount === 10) {
      return cb(new Error('BOOM!'));
    }
    errorCount++;
    this.push(data.toString().toUpperCase());
    cb();
  },
});

const readStream = fs.createReadStream(__filename, { highWaterMark: 1 });
const writeStream = process.stdout;

readStream.on('close', () => {
  console.log('Readable stream closed');
});

upper.on('close', () => {
  console.log('\nTransform stream closed');
});

writeStream.on('close', () => {
  console.log('Writable stream closed');
});

pipeline(readStream, upper, writeStream, err => {
  if (err) {
    return console.error('Pipeline error:', err.message);
  }
  console.log('Pipeline succeeded');
});
In this case, all streams will be closed, with the following output:
CONST FS =
Transform stream closed
Writable stream closed
Pipeline error: BOOM!
Readable stream closed
The pipeline() method also has an async pipeline() version, which doesn't accept a callback but instead returns a promise that is rejected if the pipeline fails.
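A minimal sketch of the promise-based form (assuming the upper transform and imports from the previous example): the awaited call can be wrapped in a try...catch block:
const fs = require('node:fs');
const { pipeline } = require('node:stream/promises');

async function run() {
  try {
    // pipeline() returns a promise that rejects if any stream
    // in the chain emits an error.
    await pipeline(fs.createReadStream(__filename), upper, process.stdout);
    console.log('Pipeline succeeded');
  } catch (err) {
    console.error('Pipeline error:', err.message);
  }
}

run();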
Async Iterators
Async iterators are recommended as the standard way of interfacing with the Streams API. Compared to all the stream primitives in both the Web and Node.js, async iterators are easier to understand and use, contributing to fewer bugs and more maintainable code. In recent versions of Node.js, async iterators have emerged as a more elegant and readable way to interact with streams. Building upon the foundation of events, async iterators provide a higher-level abstraction that simplifies stream consumption.
In Node.js, all readable streams are asynchronous iterables. This means you can use the for await...of syntax to loop through the stream's data as it becomes available, handling each piece of data with the efficiency and simplicity of asynchronous code.
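As a minimal sketch of that syntax, Readable.from() (which builds a readable stream from any iterable) gives us a stream we can consume directly with for await...of:
const { Readable } = require('node:stream');

async function consume() {
  const stream = Readable.from([':-)', ':-)', ':-)']);

  // Each chunk is awaited as it becomes available.
  for await (const chunk of stream) {
    console.log(chunk.toString());
  }
  console.log('done');
}

consume().catch(console.error);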
Benefits of Using Async Iterators with Streams
Using async iterators with streams simplifies the handling of asynchronous data flows in several ways:
- Enhanced readability: The code structure is cleaner and more readable, particularly when dealing with multiple asynchronous data sources.
- Error handling: Async iterators allow straightforward error handling with try/catch blocks, just like in regular asynchronous functions.
- Flow control: They inherently manage backpressure, since the consumer controls the flow by awaiting the next piece of data, leading to more efficient memory usage and processing.
Async iterators offer a more modern and often more readable way to work with readable streams, especially when dealing with asynchronous data sources or when you prefer a more sequential, loop-based approach to data processing.
Here's an example demonstrating the use of async iterators with a readable stream:
const fs = require('node:fs');
const { pipeline } = require('node:stream/promises');

async function main() {
  await pipeline(
    fs.createReadStream(__filename),
    async function* (source) {
      for await (let chunk of source) {
        yield chunk.toString().toUpperCase();
      }
    },
    process.stdout
  );
}

main().catch(console.error);
This code achieves the same result as the previous examples without the need to define a new transform stream. The error from the previous examples has been removed for the sake of brevity. The async version of pipeline has been used here; it should be wrapped in a try...catch block to handle possible errors.
Object mode
By default, streams can work with strings, Buffer, TypedArray, or DataView. If an arbitrary value different from these (e.g., an object) is pushed into a stream, a TypeError will be thrown. However, it is possible to work with objects by setting the objectMode option to true. This allows the stream to work with any JavaScript value, except for null, which is used to signal the end of the stream. This means you can push and read any value in a readable stream, and write any value in a writable stream.
const { Readable } = require('node:stream');

const readable = new Readable({
  objectMode: true,
  read() {
    this.push({ hello: 'world' });
    this.push(null);
  },
});
When working in object mode, it is important to remember that the highWaterMark option refers to the number of objects, not bytes.
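A small sketch of that distinction (the limit of 2 objects here is arbitrary): write() starts returning false as soon as two objects are buffered, regardless of how many bytes they contain:
const { Writable } = require('node:stream');

const writable = new Writable({
  objectMode: true,
  highWaterMark: 2, // counted in objects, not bytes
  write(chunk, encoding, callback) {
    // Defer the callback so the writes actually queue up.
    setImmediate(callback);
  },
});

console.log(writable.write({ n: 1 })); // true: below the limit
console.log(writable.write({ n: 2 })); // false: buffer is now full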
Backpressure
When using streams, it is important to make sure the producer doesn't overwhelm the consumer. For this, a backpressure mechanism is used in all streams in the Node.js API, and implementers are responsible for maintaining that behavior.
In any scenario where the data buffer has exceeded the highWaterMark or the write queue is currently busy, .write() will return false.
When a false value is returned, the backpressure system kicks in. It pauses the incoming Readable stream from sending any data and waits until the consumer is ready again. Once the data buffer is emptied, a 'drain' event is emitted to resume the incoming data flow.
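A minimal sketch of a producer that respects this contract (the file names are illustrative): pause the source when write() returns false and resume it on 'drain':
const fs = require('node:fs');

const source = fs.createReadStream('big-input.txt'); // hypothetical input
const destination = fs.createWriteStream('copy.txt'); // hypothetical output

source.on('data', chunk => {
  // write() returns false once the destination buffer
  // exceeds its highWaterMark.
  if (!destination.write(chunk)) {
    source.pause();
    destination.once('drain', () => source.resume());
  }
});

source.on('end', () => destination.end());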
For a deeper understanding of backpressure, check the backpressure guide.
Streams vs Web streams
The stream concept is not exclusive to Node.js. In fact, Node.js has a different implementation of the stream concept called Web Streams, which implements the WHATWG Streams Standard. Although the concepts behind them are similar, it is important to be aware that they have different APIs and are not directly compatible.
Web Streams implement the ReadableStream, WritableStream, and TransformStream classes, which are homologous to the Readable, Writable, and Transform streams of Node.js.
Interoperability of streams and Web Streams
Node.js provides utility functions to convert to and from Web Streams and Node.js streams. These functions are implemented as toWeb and fromWeb methods on each stream class.
The following example in the Duplex class demonstrates how to work with both readable and writable streams converted to Web Streams:
const { Duplex } = require('node:stream');

const duplex = new Duplex({
  read() {
    this.push('world');
    this.push(null);
  },
  write(chunk, encoding, callback) {
    console.log('writable', chunk);
    callback();
  },
});

const { readable, writable } = Duplex.toWeb(duplex);
writable.getWriter().write('hello');

readable
  .getReader()
  .read()
  .then(result => {
    console.log('readable', result.value);
  });
The helper functions are useful if you need to return a Web Stream from a Node.js module or vice versa. For regular consumption of streams, async iterators enable seamless interaction with both Node.js and Web Streams.
const { pipeline } = require('node:stream/promises');

async function main() {
  const { body } = await fetch('https://nodejs.cn/api/stream.html');

  await pipeline(
    body,
    new TextDecoderStream(),
    async function* (source) {
      for await (const chunk of source) {
        yield chunk.toString().toUpperCase();
      }
    },
    process.stdout
  );
}

main().catch(console.error);
Be aware that the fetch body is a ReadableStream<Uint8Array>, and therefore a TextDecoderStream is needed to work with the chunks as strings.
This work is derived from content published by Matteo Collina on Platformatic's Blog.