How To Use Streams
Working with large amounts of data in Node.js applications can be a double-edged sword. The ability to handle massive amounts of data is extremely handy but can also lead to performance bottlenecks and memory exhaustion. Traditionally, developers tackled this challenge by reading the entire dataset into memory at once. This approach, while intuitive for smaller datasets, becomes inefficient and resource-intensive for large data (e.g., files, network requests…).

This is where Node.js streams come in. Streams offer a fundamentally different approach, allowing you to process data incrementally and optimize memory usage. By handling data in manageable chunks, streams empower you to build scalable applications that can efficiently tackle even the most daunting datasets. As popularly quoted, "streams are arrays over time."

In this guide, we give an overview of the Stream concept, history, and API, as well as some recommendations on how to use and operate them.
What are Node.js Streams?
Node.js streams offer a powerful abstraction for managing data flow in your applications. They excel at processing large datasets, such as reading or writing from files and network requests, without compromising performance.

This approach differs from loading the entire dataset into memory at once. Streams process data in chunks, significantly reducing memory usage. All streams in Node.js inherit from the `EventEmitter` class, allowing them to emit events at various stages of data processing. These streams can be readable, writable, or both, providing flexibility for different data-handling scenarios.
Event-Driven Architecture

Node.js thrives on an event-driven architecture, making it ideal for real-time I/O. This means consuming input as soon as it's available and sending output as soon as the application generates it. Streams seamlessly integrate with this approach, enabling continuous data processing.

They achieve this by emitting events at key stages. These events include signals for received data (the `data` event) and the stream's completion (the `end` event). Developers can listen to these events and execute custom logic accordingly. This event-driven nature makes streams highly efficient for processing data from external sources.
Why use Streams?

Streams provide three key advantages over other data-handling methods:

- Memory Efficiency: Streams process data incrementally, consuming and processing data in chunks rather than loading the entire dataset into memory. This is a major advantage when dealing with large datasets, as it significantly reduces memory usage and prevents memory-related performance issues.
- Improved Response Time: Streams allow for immediate data processing. When a chunk of data arrives, it can be processed without waiting for the entire payload or dataset to be received. This reduces latency and improves your application's overall responsiveness.
- Scalability for Real-Time Processing: By handling data in chunks, Node.js streams can efficiently handle large amounts of data with limited resources. This scalability makes streams ideal for applications that process high volumes of data in real time.

These advantages make streams a powerful tool for building high-performance, scalable Node.js applications, particularly when working with large datasets or real-time data processing.
Note on performance

If your application already has all the data readily available in memory, using streams might add unnecessary overhead and complexity, and slow down your application.
Stream history

This section is a reference for the history of streams in Node.js. Unless you're working with a codebase written for a Node.js version prior to 0.11.5 (2013), you will rarely encounter older versions of the streams API, but the terms might still be in use.

Streams 0

The first version of streams was released at the same time as Node.js. Although there wasn't a Stream class yet, different modules used the concept and implemented `read`/`write` functions. The `util.pump()` function was available to control the flow of data between streams.

Streams 1 (Classic)

With the release of Node v0.4.0 in 2011, the Stream class was introduced, as well as the `pipe()` method.

Streams 2

In 2012, with the release of Node v0.10.0, Streams 2 were unveiled. This update brought new stream subclasses, including Readable, Writable, Duplex, and Transform. Additionally, the `readable` event was added. To maintain backwards compatibility, streams could be switched to the old mode by adding a `data` event listener or calling the `pause()` or `resume()` methods.

Streams 3

In 2013, Streams 3 were released with Node v0.11.5 to address the problem of a stream having both `data` and `readable` event handlers. This removed the need to choose between 'current' and 'old' modes. Streams 3 is the current version of streams in Node.js.
Stream types

Readable

`Readable` is the class that we use to sequentially read a source of data. Typical examples of `Readable` streams in the Node.js API are `fs.ReadStream` when reading files, `http.IncomingMessage` when reading HTTP requests, and `process.stdin` when reading from the standard input.
Key Methods and Events

A readable stream operates with several core methods and events that allow fine control over data handling:

- `on('data')`: This event is triggered whenever data is available from the stream. It is very fast, as the stream pushes data as quickly as it can handle it, making it suitable for high-throughput scenarios.
- `on('end')`: Emitted when there is no more data to read from the stream. It signifies the completion of data delivery. This event is only fired when all the data from the stream has been consumed.
- `on('readable')`: This event is triggered when there is data available to read from the stream or when the end of the stream has been reached. It allows for more controlled data reading when needed.
- `on('close')`: This event is emitted when the stream and its underlying resources have been closed, and indicates that no more events will be emitted.
- `on('error')`: This event can be emitted at any point, signaling that there was an error while processing. A handler for this event can be used to avoid uncaught exceptions.
A demonstration of the use of these events can be seen in the following sections.
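Those sections focus on `data`, `readable`, and `end`. As a brief sketch of the remaining two events (the file name below is hypothetical), handling `error` and `close` could look like this:

```js
const fs = require('node:fs');

// Hypothetical path: the file may or may not exist.
const readStream = fs.createReadStream('maybe-missing.txt');

readStream.on('error', err => {
  // Without this handler, the error would surface as an uncaught exception.
  console.error('Stream error:', err.message);
});

readStream.on('close', () => {
  // Emitted once the stream and its underlying resources are released.
  console.log('Stream closed');
});
```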
Basic Readable Stream

Here's an example of a simple readable stream implementation that generates data dynamically:
```js
const { Readable } = require('node:stream');

class MyStream extends Readable {
  #count = 0;
  _read(size) {
    this.push(':-)');
    if (++this.#count === 5) {
      this.push(null);
    }
  }
}

const stream = new MyStream();

stream.on('data', chunk => {
  console.log(chunk.toString());
});
```
In this code, the `MyStream` class extends `Readable` and overrides the `_read` method to push the string ":-)" to the internal buffer. After pushing the string five times, it signals the end of the stream by pushing `null`. The `on('data')` event handler logs each chunk to the console as it is received.
Advanced Control with the readable Event

For even finer control over data flow, the `readable` event can be used. This event is more complex but provides better performance for certain applications by allowing explicit control over when data is read from the stream:
```js
const stream = new MyStream({
  highWaterMark: 1,
});

stream.on('readable', () => {
  console.count('>> readable event');
  let chunk;
  while ((chunk = stream.read()) !== null) {
    console.log(chunk.toString()); // Process the chunk
  }
});

stream.on('end', () => console.log('>> end event'));
```
Here, the `readable` event is used to manually pull data from the stream as needed. The loop inside the `readable` event handler continues to read data from the stream buffer until it returns `null`, indicating that the buffer is temporarily empty or the stream has ended. Setting `highWaterMark` to 1 keeps the buffer size small, triggering the `readable` event more frequently and allowing more granular control over the data flow.

With the previous code, you'll get an output like:
```text
>> readable event: 1
:-):-)
:-)
:-)
:-)
>> readable event: 2
>> readable event: 3
>> readable event: 4
>> end event
```
Let's try to digest that. When we attach the `on('readable')` event, it makes a first call to `read()` because that is what might trigger the emission of a `readable` event. After the emission of said event, we call `read` on the first iteration of the `while` loop. That's why we get the first two smileys in one row. After that, we keep calling `read` until `null` is pushed. Each call to `read` schedules the emission of a new `readable` event, but as we are in "flow" mode (i.e., using the `readable` event), the emission is scheduled for the `nextTick`. That's why we get them all at the end, when the synchronous code of the loop is finished.

NOTE: You can try to run the code with `NODE_DEBUG=stream` to see that `emitReadable` is triggered after each `push`.

If we want to see a `readable` event emitted before each smiley, we can wrap `push` in a `setImmediate` or `process.nextTick` like this:
```js
class MyStream extends Readable {
  #count = 0;
  _read(size) {
    setImmediate(() => {
      this.push(':-)');
      if (++this.#count === 5) {
        return this.push(null);
      }
    });
  }
}
```
And we'll get:
```text
>> readable event: 1
:-)
>> readable event: 2
:-)
>> readable event: 3
:-)
>> readable event: 4
:-)
>> readable event: 5
:-)
>> readable event: 6
>> end event
```
Writable

`Writable` streams are useful for creating files, uploading data, or any task that involves sequentially outputting data. While readable streams provide the source of data, writable streams in Node.js act as the destination for your data. Typical examples of writable streams in the Node.js API are `fs.WriteStream`, `process.stdout`, and `process.stderr`.
Key Methods and Events in Writable Streams

- `.write()`: This method is used to write a chunk of data to the stream. It handles the data by buffering it up to a defined limit (`highWaterMark`), and returns a boolean indicating whether more data can be written immediately.
- `.end()`: This method signals the end of the data writing process. It signals the stream to complete the write operation and potentially perform any necessary cleanup.
Creating a Writable

Here's an example of creating a writable stream that converts all incoming data to uppercase before writing it to the standard output:
```js
const { Writable } = require('node:stream');
const { once } = require('node:events');

class MyStream extends Writable {
  constructor() {
    super({ highWaterMark: 10 /* 10 bytes */ });
  }
  _write(data, encoding, cb) {
    process.stdout.write(data.toString().toUpperCase() + '\n', cb);
  }
}

const stream = new MyStream();

// Top-level await is not available in CommonJS, so the writing loop is
// wrapped in an async function.
(async () => {
  for (let i = 0; i < 10; i++) {
    const waitDrain = !stream.write('hello');
    if (waitDrain) {
      console.log('>> wait drain');
      await once(stream, 'drain');
    }
  }
  stream.end('world');
})();
```
In this code, `MyStream` is a custom `Writable` stream with a buffer capacity (`highWaterMark`) of 10 bytes. It overrides the `_write` method to convert data to uppercase before writing it out.

The loop attempts to write `hello` ten times to the stream. If the buffer fills up (`waitDrain` becomes `true`), it waits for a `drain` event before continuing, ensuring we do not overwhelm the stream's buffer.

The output will be:
```text
HELLO
>> wait drain
HELLO
HELLO
>> wait drain
HELLO
HELLO
>> wait drain
HELLO
HELLO
>> wait drain
HELLO
HELLO
>> wait drain
HELLO
WORLD
```
Duplex

`Duplex` streams implement both the readable and writable interfaces.

Key Methods and Events in Duplex Streams

Duplex streams implement all the methods and events described in Readable and Writable streams.

A good example of a duplex stream is the `Socket` class in the `net` module:
```js
const net = require('node:net');

// Create a TCP server
const server = net.createServer(socket => {
  socket.write('Hello from server!\n');

  socket.on('data', data => {
    console.log(`Client says: ${data.toString()}`);
  });

  // Handle client disconnection
  socket.on('end', () => {
    console.log('Client disconnected');
  });
});

// Start the server on port 8080
server.listen(8080, () => {
  console.log('Server listening on port 8080');
});
```
The previous code will open a TCP socket on port 8080, send `Hello from server!` to any connecting client, and log any data received.
```js
const net = require('node:net');

// Connect to the server at localhost:8080
const client = net.createConnection({ port: 8080 }, () => {
  client.write('Hello from client!\n');
});

client.on('data', data => {
  console.log(`Server says: ${data.toString()}`);
});

// Handle the server closing the connection
client.on('end', () => {
  console.log('Disconnected from server');
});
```
The previous code will connect to the TCP socket, send a `Hello from client` message, and log any received data.
Transform

`Transform` streams are duplex streams where the output is computed based on the input. As the name suggests, they are usually used between a readable and a writable stream to transform the data as it passes through.

Key Methods and Events in Transform Streams

Apart from all the methods and events in Duplex streams, there is:

- `_transform`: This function is called internally to handle the flow of data between the readable and writable parts. This MUST NOT be called by application code.
Creating a Transform Stream

To create a new transform stream, we can pass an `options` object to the `Transform` constructor, including a `transform` function that handles how the output data is computed from the input data using the `push` method.
```js
const { Transform } = require('node:stream');

const upper = new Transform({
  transform: function (data, enc, cb) {
    this.push(data.toString().toUpperCase());
    cb();
  },
});
```
This stream will take any input and output it in uppercase.
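As a quick illustration (a sketch using only the methods introduced above), you could feed the `upper` stream some data and listen for its output:

```js
// The transform's readable side emits the uppercased chunks.
upper.on('data', chunk => {
  console.log(chunk.toString()); // HELLO WORLD
});

upper.write('hello world');
upper.end();
```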
How to operate with streams

When working with streams, we usually want to read from a source and write to a destination, possibly needing some transformation of the data in between. The following sections will cover different ways to do so.

.pipe()

The `.pipe()` method connects one readable stream to a writable (or transform) stream. Although this seems like a simple way to achieve our goal, it delegates all error handling to the programmer, making it difficult to get right.

The following example shows a pipe trying to output the current file in uppercase to the console.
```js
const fs = require('node:fs');
const { Transform } = require('node:stream');

let errorCount = 0;
const upper = new Transform({
  transform: function (data, enc, cb) {
    if (errorCount === 10) {
      return cb(new Error('BOOM!'));
    }
    errorCount++;
    this.push(data.toString().toUpperCase());
    cb();
  },
});

const readStream = fs.createReadStream(__filename, { highWaterMark: 1 });
const writeStream = process.stdout;

readStream.pipe(upper).pipe(writeStream);

readStream.on('close', () => {
  console.log('Readable stream closed');
});

upper.on('close', () => {
  console.log('Transform stream closed');
});

upper.on('error', err => {
  console.error('\nError in transform stream:', err.message);
});

writeStream.on('close', () => {
  console.log('Writable stream closed');
});
```
After writing 10 characters, `upper` will return an error in the callback, which will cause the stream to close. However, the other streams won't be notified, resulting in memory leaks. The output will be:
```text
CONST FS =
Error in transform stream: BOOM!
Transform stream closed
```
pipeline()

To avoid the pitfalls and low-level complexity of the `.pipe()` method, in most cases it is recommended to use the `pipeline()` method. This method is a safer and more robust way to pipe streams together, handling errors and cleanup automatically.

The following example demonstrates how using `pipeline()` prevents the pitfalls of the previous example:
```js
const fs = require('node:fs');
const { Transform, pipeline } = require('node:stream');

let errorCount = 0;
const upper = new Transform({
  transform: function (data, enc, cb) {
    if (errorCount === 10) {
      return cb(new Error('BOOM!'));
    }
    errorCount++;
    this.push(data.toString().toUpperCase());
    cb();
  },
});

const readStream = fs.createReadStream(__filename, { highWaterMark: 1 });
const writeStream = process.stdout;

readStream.on('close', () => {
  console.log('Readable stream closed');
});

upper.on('close', () => {
  console.log('\nTransform stream closed');
});

writeStream.on('close', () => {
  console.log('Writable stream closed');
});

pipeline(readStream, upper, writeStream, err => {
  if (err) {
    return console.error('Pipeline error:', err.message);
  }
  console.log('Pipeline succeeded');
});
```
In this case, all streams will be closed with the following output:
```text
CONST FS =
Transform stream closed
Writable stream closed
Pipeline error: BOOM!
Readable stream closed
```
The `pipeline()` method also has an async version, exposed by the `node:stream/promises` module, which doesn't accept a callback but instead returns a promise that is rejected if the pipeline fails.
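A minimal sketch of that promise-based variant, mirroring the callback example above, might look like this:

```js
const fs = require('node:fs');
const { Transform } = require('node:stream');
const { pipeline } = require('node:stream/promises');

const upper = new Transform({
  transform: function (data, enc, cb) {
    this.push(data.toString().toUpperCase());
    cb();
  },
});

async function run() {
  try {
    // The rejected promise carries the same error the callback would receive.
    await pipeline(fs.createReadStream(__filename), upper, process.stdout);
    console.log('\nPipeline succeeded');
  } catch (err) {
    console.error('Pipeline error:', err.message);
  }
}

run();
```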
Async Iterators

Async iterators are recommended as the standard way of interfacing with the Streams API. Compared to all the stream primitives in both the Web and Node.js, async iterators are easier to understand and use, contributing to fewer bugs and more maintainable code. In recent versions of Node.js, async iterators have emerged as a more elegant and readable way to interact with streams. Building upon the foundation of events, async iterators provide a higher-level abstraction that simplifies stream consumption.

In Node.js, all readable streams are asynchronous iterables. This means you can use the `for await...of` syntax to loop through the stream's data as it becomes available, handling each piece of data with the efficiency and simplicity of asynchronous code.
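For instance, a minimal sketch of that syntax (again, reading this script's own file is only an illustrative choice) could be:

```js
const fs = require('node:fs');

async function printFile() {
  // Each chunk is awaited as the underlying file is read.
  for await (const chunk of fs.createReadStream(__filename)) {
    console.log(chunk.toString());
  }
}

printFile();
```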
Benefits of Using Async Iterators with Streams

Using async iterators with streams simplifies the handling of asynchronous data flows in several ways:

- Enhanced Readability: The code structure is cleaner and more readable, particularly when dealing with multiple asynchronous data sources.
- Error Handling: Async iterators allow straightforward error handling using try/catch blocks, akin to regular asynchronous functions.
- Flow Control: They inherently manage backpressure, as the consumer controls the flow by awaiting the next piece of data, allowing for more efficient memory usage and processing.

Async iterators offer a more modern and often more readable way to work with readable streams, especially when dealing with asynchronous data sources or when you prefer a more sequential, loop-based approach to data processing.

Here's an example demonstrating the use of async iterators with a readable stream:
```js
const fs = require('node:fs');
const { pipeline } = require('node:stream/promises');

// `import.meta.filename` and top-level await are ESM-only; in CommonJS we use
// `__filename` and wrap the call in an async function.
(async () => {
  await pipeline(
    fs.createReadStream(__filename),
    async function* (source) {
      for await (const chunk of source) {
        yield chunk.toString().toUpperCase();
      }
    },
    process.stdout
  );
})();
```
This code achieves the same result as the previous examples, without the need to define a new transform stream. The error from the previous examples has been removed for the sake of brevity. The async version of the pipeline has been used, and it should be wrapped in a `try...catch` block to handle possible errors.
Object mode

By default, streams can work with strings, `Buffer`, `TypedArray`, or `DataView`. If an arbitrary value different from these (e.g., an object) is pushed into a stream, a `TypeError` will be thrown. However, it is possible to work with objects by setting the `objectMode` option to `true`. This allows the stream to work with any JavaScript value, except for `null`, which is used to signal the end of the stream. This means you can `push` and `read` any value in a readable stream, and `write` any value in a writable stream.
```js
const { Readable } = require('node:stream');

const readable = Readable({
  objectMode: true,
  read() {
    this.push({ hello: 'world' });
    this.push(null);
  },
});
```
When working in object mode, it is important to remember that the `highWaterMark` option refers to the number of objects, not bytes.
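To see it in action, one possible way to consume the `readable` stream defined above (a sketch, nothing more) is:

```js
readable.on('data', obj => {
  // In object mode each chunk is the pushed value itself, not a Buffer.
  console.log(obj); // { hello: 'world' }
});
```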
Backpressure

When using streams, it is important to make sure the producer doesn't overwhelm the consumer. For this, a backpressure mechanism is used in all streams in the Node.js API, and implementers are responsible for maintaining that behavior.

In any scenario where the data buffer has exceeded the `highWaterMark` or the write queue is currently busy, `.write()` will return `false`.

When a `false` value is returned, the backpressure system kicks in. It pauses the incoming `Readable` stream from sending any data and waits until the consumer is ready again. Once the data buffer is emptied, a `'drain'` event is emitted to resume the incoming data flow.
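As a minimal sketch of that mechanism when wiring two streams together by hand (the file names are hypothetical, and this is roughly what `pipe()` and `pipeline()` already handle for you):

```js
const fs = require('node:fs');

// Hypothetical file names, purely for illustration.
const readable = fs.createReadStream('big-input.txt');
const writable = fs.createWriteStream('copy-of-input.txt');

readable.on('data', chunk => {
  if (!writable.write(chunk)) {
    // The destination buffer exceeded highWaterMark: pause the producer...
    readable.pause();
    // ...and resume once 'drain' signals the buffer has emptied.
    writable.once('drain', () => readable.resume());
  }
});

readable.on('end', () => writable.end());
```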
For a deeper understanding of backpressure, check the backpressure guide.
Streams vs Web streams

The stream concept is not exclusive to Node.js. In fact, Node.js has a different implementation of the stream concept called Web Streams, which implements the WHATWG Streams Standard. Although the concepts behind them are similar, it is important to be aware that they have different APIs and are not directly compatible.

Web Streams implement the `ReadableStream`, `WritableStream`, and `TransformStream` classes, which are homologous to Node.js's `Readable`, `Writable`, and `Transform` streams.
Interoperability of streams and Web Streams

Node.js provides utility functions to convert to and from Web Streams and Node.js streams. These functions are implemented as `toWeb` and `fromWeb` methods on each stream class.

The following example on the `Duplex` class demonstrates how to work with both readable and writable streams converted to Web Streams:
```js
const { Duplex } = require('node:stream');

const duplex = Duplex({
  read() {
    this.push('world');
    this.push(null);
  },
  write(chunk, encoding, callback) {
    console.log('writable', chunk);
    callback();
  },
});

const { readable, writable } = Duplex.toWeb(duplex);

writable.getWriter().write('hello');

readable
  .getReader()
  .read()
  .then(result => {
    console.log('readable', result.value);
  });
```
The helper functions are useful if you need to return a Web Stream from a Node.js module or vice versa. For regular consumption of streams, async iterators enable seamless interaction with both Node.js and Web Streams.
```js
const { pipeline } = require('node:stream/promises');

// Top-level await is ESM-only, so the fetch and pipeline are wrapped in an
// async function for CommonJS.
(async () => {
  const { body } = await fetch('https://nodejs.cn/api/stream.html');

  await pipeline(
    body,
    new TextDecoderStream(),
    async function* (source) {
      for await (const chunk of source) {
        yield chunk.toString().toUpperCase();
      }
    },
    process.stdout
  );
})();
```
Be aware that the fetch body is a `ReadableStream<Uint8Array>`, and therefore a `TextDecoderStream` is needed to work with the chunks as strings.
This work is derived from content published by Matteo Collina in Platformatic's Blog.