Node.js v26.0.0 Documentation
Iterable Streams#>
Source Code: lib/stream/iter.js
The node:stream/iter module provides a streaming API built on iterables rather than the event-driven Readable/Writable/Transform class hierarchy, or the Web Streams ReadableStream/WritableStream/TransformStream interfaces.
This module is available only when the --experimental-stream-iter CLI flag is enabled.
Streams are represented as AsyncIterable<Uint8Array[]> (async) or Iterable<Uint8Array[]> (sync). There are no base classes to extend -- any object implementing the iterable protocol can participate. Transforms are plain functions or objects with a transform method.
Data flows in batches (Uint8Array[] per iteration) to amortize the cost of async operations.
import { from, pull, text } from 'node:stream/iter';
import { compressGzip, decompressGzip } from 'node:zlib/iter';
// Compress and decompress a string
const compressed = pull(from('Hello, world!'), compressGzip());
const result = await text(pull(compressed, decompressGzip()));
console.log(result); // 'Hello, world!'
const { from, pull, text } = require('node:stream/iter');
const { compressGzip, decompressGzip } = require('node:zlib/iter');
async function run() {
// Compress and decompress a string
const compressed = pull(from('Hello, world!'), compressGzip());
const result = await text(pull(compressed, decompressGzip()));
console.log(result); // 'Hello, world!'
}
run().catch(console.error);
import { open } from 'node:fs/promises';
import { text, pipeTo } from 'node:stream/iter';
import { compressGzip, decompressGzip } from 'node:zlib/iter';
// Read a file, compress, write to another file
const src = await open('input.txt', 'r');
const dst = await open('output.gz', 'w');
await pipeTo(src.pull(), compressGzip(), dst.writer({ autoClose: true }));
await src.close();
// Read it back
const gz = await open('output.gz', 'r');
console.log(await text(gz.pull(decompressGzip(), { autoClose: true })));
const { open } = require('node:fs/promises');
const { text, pipeTo } = require('node:stream/iter');
const { compressGzip, decompressGzip } = require('node:zlib/iter');
async function run() {
// Read a file, compress, write to another file
const src = await open('input.txt', 'r');
const dst = await open('output.gz', 'w');
await pipeTo(src.pull(), compressGzip(), dst.writer({ autoClose: true }));
await src.close();
// Read it back
const gz = await open('output.gz', 'r');
console.log(await text(gz.pull(decompressGzip(), { autoClose: true })));
}
run().catch(console.error);
Concepts#>
Byte streams#>
All data in this API is represented as Uint8Array bytes. Strings are automatically UTF-8 encoded when passed to from(), push(), or pipeTo(). This removes ambiguity around encodings and enables zero-copy transfers between streams and native code.
Batching#>
Each iteration yields a batch -- an array of Uint8Array chunks (Uint8Array[]). Batching amortizes the cost of await and Promise creation across multiple chunks. A consumer that processes one chunk at a time can simply iterate the inner array:
for await (const batch of source) {
for (const chunk of batch) {
handle(chunk);
}
}
async function run() {
for await (const batch of source) {
for (const chunk of batch) {
handle(chunk);
}
}
}
Transforms#>
Transforms come in two forms:
- Stateless -- a function (chunks, options) => result called once per batch. It receives Uint8Array[] (or null as the flush signal) and an options object, and returns Uint8Array[], null, or an iterable of chunks.
- Stateful -- an object { transform(source, options) }, where transform is a generator (sync or async) that receives the entire upstream iterable and an options object, and yields output. This form is used for compression, encryption, and any transform that must buffer across batches.
Both forms receive an options parameter with the following property:
- options.signal <AbortSignal> An AbortSignal that fires when the pipeline is canceled, errors, or the consumer stops reading. Transforms can check signal.aborted or listen for the 'abort' event to perform early cleanup.
The flush signal (null) is sent after the source ends, giving transforms a chance to emit trailing data (e.g., compression footers).
// Stateless: uppercase transform
const upper = (chunks) => {
if (chunks === null) return null; // flush
return chunks.map((c) => new TextEncoder().encode(
new TextDecoder().decode(c).toUpperCase(),
));
};
// Stateful: line splitter
const lines = {
transform: async function*(source) {
let partial = '';
for await (const chunks of source) {
if (chunks === null) {
if (partial) yield [new TextEncoder().encode(partial)];
continue;
}
for (const chunk of chunks) {
const str = partial + new TextDecoder().decode(chunk);
const parts = str.split('\n');
partial = parts.pop();
for (const line of parts) {
yield [new TextEncoder().encode(`${line}\n`)];
}
}
}
},
};
Pull vs. push#>
The API supports two models:
- Pull -- data flows on demand. pull() and pullSync() create lazy pipelines that read from the source only as the consumer iterates.
- Push -- data is written explicitly. push() creates a writer/readable pair with backpressure. The writer pushes data in; the readable side is consumed as an async iterable.
Backpressure#>
Pull streams have natural backpressure -- the consumer drives the pace, so the source is never read faster than the consumer can process. Push streams need explicit backpressure because the producer and consumer run independently. The highWaterMark and backpressure options on push(), broadcast(), and share() control how this works.
The two-buffer model#>
Push streams use a two-part buffering system. Think of it like a bucket (slots) being filled through a hose (pending writes), with a float valve that closes when the bucket is full:
highWaterMark (e.g., 3)
|
Producer v
| +---------+
v | |
[ write() ] ----+ +--->| slots |---> Consumer pulls
[ write() ] | | | (bucket)| for await (...)
[ write() ] v | +---------+
+--------+ ^
| pending| |
| writes | float valve
| (hose) | (backpressure)
+--------+
^
|
'strict' mode limits this too!
- Slots (the bucket) -- data staged for the consumer, up to highWaterMark. When the consumer pulls, it drains all slots at once as a single batch.
- Pending writes (the hose) -- writes waiting for slot space. As the consumer drains, pending writes are promoted into the freed slots and their promises resolve.
How each policy uses these buffers:
| Policy | Slots cap | Pending writes cap |
|---|---|---|
| 'strict' | highWaterMark | highWaterMark |
| 'block' | highWaterMark | unbounded |
| 'drop-oldest' | highWaterMark | n/a (never waits) |
| 'drop-newest' | highWaterMark | n/a (never waits) |
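As a rough illustration of the policies, here is a toy slots buffer in plain JavaScript (a sketch of the accounting only, using strings as stand-in chunks; not the module's implementation):

```javascript
// Toy slots buffer with a highWaterMark cap, comparing eviction policies.
function makeBuffer(highWaterMark, policy) {
  const slots = [];
  return {
    write(chunk) {
      if (slots.length < highWaterMark) {
        slots.push(chunk);
        return true;
      }
      if (policy === 'drop-oldest') {
        slots.shift();     // evict the oldest chunk
        slots.push(chunk); // accept the new one
        return true;
      }
      if (policy === 'drop-newest') return true; // silently discard
      return false; // 'strict'/'block': the caller must wait
    },
    drain() { return slots.splice(0); }, // consumer takes all slots at once
  };
}

const oldest = makeBuffer(3, 'drop-oldest');
['a', 'b', 'c', 'd'].forEach((c) => oldest.write(c));
console.log(oldest.drain()); // [ 'b', 'c', 'd' ] -- 'a' was evicted

const newest = makeBuffer(3, 'drop-newest');
['a', 'b', 'c', 'd'].forEach((c) => newest.write(c));
console.log(newest.drain()); // [ 'a', 'b', 'c' ] -- 'd' was discarded
```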
Strict (default)#>
Strict mode catches "fire-and-forget" patterns where the producer calls write() without awaiting, which would cause unbounded memory growth. It limits both the slots buffer and the pending writes queue to highWaterMark.
If you properly await each write, you can only ever have one pending write at a time (yours), so you never hit the pending writes limit. Unawaited writes accumulate in the pending queue and throw once it overflows:
import { push, text } from 'node:stream/iter';
const { writer, readable } = push({ highWaterMark: 16 });
// Consumer must run concurrently -- without it, the first write
// that fills the buffer blocks the producer forever.
const consuming = text(readable);
// GOOD: awaited writes. The producer waits for the consumer to
// make room when the buffer is full.
for (const item of dataset) {
await writer.write(item);
}
await writer.end();
console.log(await consuming);
const { push, text } = require('node:stream/iter');
async function run() {
const { writer, readable } = push({ highWaterMark: 16 });
// Consumer must run concurrently -- without it, the first write
// that fills the buffer blocks the producer forever.
const consuming = text(readable);
// GOOD: awaited writes. The producer waits for the consumer to
// make room when the buffer is full.
for (const item of dataset) {
await writer.write(item);
}
await writer.end();
console.log(await consuming);
}
run().catch(console.error);
Forgetting to await will eventually throw:
// BAD: fire-and-forget. Strict mode throws once both buffers fill.
for (const item of dataset) {
writer.write(item); // Not awaited -- queues without bound
}
// --> throws "Backpressure violation: too many pending writes"
Block#>
Block mode caps slots at highWaterMark but places no limit on the pending writes queue. Awaited writes block until the consumer makes room, just like strict mode. The difference is that unawaited writes silently queue forever instead of throwing -- a potential memory leak if the producer forgets to await.
This is the mode that existing Node.js classic streams and Web Streams default to. Use it when you control the producer and know it awaits properly, or when migrating code from those APIs.
import { push, text } from 'node:stream/iter';
const { writer, readable } = push({
highWaterMark: 16,
backpressure: 'block',
});
const consuming = text(readable);
// Safe -- awaited writes block until the consumer reads.
for (const item of dataset) {
await writer.write(item);
}
await writer.end();
console.log(await consuming);
const { push, text } = require('node:stream/iter');
async function run() {
const { writer, readable } = push({
highWaterMark: 16,
backpressure: 'block',
});
const consuming = text(readable);
// Safe -- awaited writes block until the consumer reads.
for (const item of dataset) {
await writer.write(item);
}
await writer.end();
console.log(await consuming);
}
run().catch(console.error);
Drop-oldest#>
Writes never wait. When the slots buffer is full, the oldest buffered chunk is evicted to make room for the incoming write. The consumer always sees the most recent data. Useful for live feeds, telemetry, or any scenario where stale data is less valuable than current data.
import { push } from 'node:stream/iter';
// Keep only the 5 most recent readings
const { writer, readable } = push({
highWaterMark: 5,
backpressure: 'drop-oldest',
});
const { push } = require('node:stream/iter');
// Keep only the 5 most recent readings
const { writer, readable } = push({
highWaterMark: 5,
backpressure: 'drop-oldest',
});
Drop-newest#>
Writes never wait. When the slots buffer is full, the incoming write is silently discarded. The consumer processes what is already buffered without being overwhelmed by new data. Useful for rate-limiting or shedding load under pressure.
import { push } from 'node:stream/iter';
// Accept up to 10 buffered items; discard anything beyond that
const { writer, readable } = push({
highWaterMark: 10,
backpressure: 'drop-newest',
});
const { push } = require('node:stream/iter');
// Accept up to 10 buffered items; discard anything beyond that
const { writer, readable } = push({
highWaterMark: 10,
backpressure: 'drop-newest',
});
Writer interface#>
A writer is any object conforming to the Writer interface. Only write() is required; all other methods are optional.
Each async method has a synchronous *Sync counterpart designed for a try-fallback pattern: attempt the fast synchronous path first, and fall back to the async version only when the synchronous call indicates it could not complete:
if (!writer.writeSync(chunk)) await writer.write(chunk);
if (!writer.writevSync(chunks)) await writer.writev(chunks);
if (writer.endSync() < 0) await writer.end();
writer.fail(err); // Always synchronous, no fallback needed
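Since only write() is required, a minimal in-memory writer can be a plain object; the sketch below (a hypothetical helper, not part of the API) also implements writeSync and endSync so the synchronous fast path applies:

```javascript
// Minimal Writer sketch: collects chunks in memory, never backpressured.
function memoryWriter() {
  const chunks = [];
  return {
    chunks,
    writeSync(chunk) { chunks.push(chunk); return true; }, // sync fast path
    async write(chunk) { chunks.push(chunk); },            // async fallback
    endSync() { return chunks.reduce((n, c) => n + c.length, 0); },
    fail(reason) { chunks.length = 0; }, // discard buffered data on error
  };
}

const w = memoryWriter();
w.writeSync(new Uint8Array([104, 105])); // 'hi'
console.log(w.endSync()); // 2 -- total bytes written
```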
writer.desiredSize#>
The number of buffer slots available before the high water mark is reached. Returns null if the writer is closed or the consumer has disconnected.
The value is always non-negative.
writer.end([options])#>
- options <Object>
  - signal <AbortSignal> Cancels this operation only. The signal cancels the pending end() call; it does not fail the writer itself.
- Returns: Promise fulfilled with the total number of bytes written.
Signal that no more data will be written.
writer.endSync()#>
- Returns: <number> The total number of bytes written, or -1 if the writer is not open.
Synchronous variant of writer.end(). Returns -1 if the writer is already closed or errored. Can be used in a try-fallback pattern:
const result = writer.endSync();
if (result < 0) {
writer.end();
}
writer.fail(reason)#>
- reason <any>
Put the writer into a terminal error state. If the writer is already closed or errored, this is a no-op. Unlike write() and end(), fail() is unconditionally synchronous because failing a writer is a pure state transition with no async work to perform.
writer.write(chunk[, options])#>
- chunk <Uint8Array> | <string>
- options <Object>
  - signal <AbortSignal> Cancels this write only. The signal cancels the pending write() call; it does not fail the writer itself.
- Returns: Promise
Write a chunk. The promise resolves when buffer space is available.
writer.writeSync(chunk)#>
- chunk <Uint8Array> | <string>
- Returns: <boolean> true if the write was accepted, false if the buffer is full.
Synchronous write. Does not block; returns false if backpressure is active.
writer.writev(chunks[, options])#>
- chunks <Uint8Array[]> | <string[]>
- options <Object>
  - signal <AbortSignal> Cancels this write only. The signal cancels the pending writev() call; it does not fail the writer itself.
- Returns: Promise
Write multiple chunks as a single batch.
writer.writevSync(chunks)#>
- chunks <Uint8Array[]> | <string[]>
- Returns: <boolean> true if the write was accepted, false if the buffer is full.
Synchronous batch write.
The stream/iter module#>
All functions are available both as named exports and as properties of the Stream namespace object:
// Named exports
import { from, pull, bytes, Stream } from 'node:stream/iter';
// Namespace access
Stream.from('hello');
// Named exports
const { from, pull, bytes, Stream } = require('node:stream/iter');
// Namespace access
Stream.from('hello');
Including the node: prefix on the module specifier is optional.
Sources#>
from(input)#>
- input <string> | <ArrayBuffer> | <ArrayBufferView> | <Iterable> | <AsyncIterable> | <Object> Must not be null or undefined.
- Returns: AsyncIterable
Create an async byte stream from the given input. Strings are UTF-8 encoded. ArrayBuffer and ArrayBufferView values are wrapped as Uint8Array. Arrays and iterables are recursively flattened and normalized.
Objects implementing Symbol.for('Stream.toAsyncStreamable') or Symbol.for('Stream.toStreamable') are converted via those protocols. The toAsyncStreamable protocol takes precedence over toStreamable, which takes precedence over the iteration protocols (Symbol.asyncIterator, Symbol.iterator).
import { Buffer } from 'node:buffer';
import { from, text } from 'node:stream/iter';
console.log(await text(from('hello'))); // 'hello'
console.log(await text(from(Buffer.from('hello')))); // 'hello'
const { Buffer } = require('node:buffer');
const { from, text } = require('node:stream/iter');
async function run() {
console.log(await text(from('hello'))); // 'hello'
console.log(await text(from(Buffer.from('hello')))); // 'hello'
}
run().catch(console.error);
fromSync(input)#>
- input <string> | <ArrayBuffer> | <ArrayBufferView> | <Iterable> | <Object> Must not be null or undefined.
- Returns: Iterable
Synchronous version of from(). Returns a sync iterable. Cannot accept async iterables or promises. Objects implementing Symbol.for('Stream.toStreamable') are converted via that protocol (it takes precedence over Symbol.iterator). The toAsyncStreamable protocol is ignored entirely.
import { fromSync, textSync } from 'node:stream/iter';
console.log(textSync(fromSync('hello'))); // 'hello'
const { fromSync, textSync } = require('node:stream/iter');
console.log(textSync(fromSync('hello'))); // 'hello'
Pipelines#>
pipeTo(source[, ...transforms], writer[, options])#>
- source <AsyncIterable> | <Iterable> The data source.
- ...transforms <Function> | <Object> Zero or more transforms to apply.
- writer <Object> A destination with a write(chunk) method.
- options <Object>
  - signal <AbortSignal> Aborts the pipeline.
  - preventClose <boolean> If true, do not call writer.end() when the source ends. Default: false.
  - preventFail <boolean> If true, do not call writer.fail() on error. Default: false.
- Returns: Promise fulfilled with the total number of bytes written.
Pipe a source through transforms into a writer. If the writer has a writev(chunks) method, entire batches are passed in a single call (enabling scatter/gather I/O).
If the writer implements the optional *Sync methods (writeSync, writevSync, endSync), pipeTo() will attempt the synchronous methods first as a fast path, and fall back to the async versions only when the sync methods indicate they cannot complete (e.g., backpressure or waiting for the next tick). fail() is always called synchronously.
import { from, pipeTo } from 'node:stream/iter';
import { compressGzip } from 'node:zlib/iter';
import { open } from 'node:fs/promises';
const fh = await open('output.gz', 'w');
const totalBytes = await pipeTo(
from('Hello, world!'),
compressGzip(),
fh.writer({ autoClose: true }),
);
const { from, pipeTo } = require('node:stream/iter');
const { compressGzip } = require('node:zlib/iter');
const { open } = require('node:fs/promises');
async function run() {
const fh = await open('output.gz', 'w');
const totalBytes = await pipeTo(
from('Hello, world!'),
compressGzip(),
fh.writer({ autoClose: true }),
);
}
run().catch(console.error);
pipeToSync(source[, ...transforms], writer[, options])#>
- source <Iterable> A synchronous data source.
- ...transforms <Function> | <Object> Zero or more synchronous transforms.
- writer <Object> A destination with a write(chunk) method.
- options <Object>
- Returns: <number> The total number of bytes written.
Synchronous version of pipeTo(). The source, all transforms, and the writer must be synchronous. Cannot accept async iterables or promises.
The writer must have the *Sync methods (writeSync, writevSync, endSync) and fail() for this to work.
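A writer usable with pipeToSync() therefore needs the full synchronous surface; an in-memory sketch (a hypothetical helper under those requirements, not part of the module) could look like this:

```javascript
// Sync-capable writer: every method pipeToSync() relies on is synchronous.
function syncCollector() {
  const parts = [];
  let total = 0;
  return {
    parts,
    write(chunk) { parts.push(chunk); total += chunk.length; },
    writeSync(chunk) { parts.push(chunk); total += chunk.length; return true; },
    writevSync(chunks) { for (const c of chunks) this.writeSync(c); return true; },
    endSync() { return total; }, // total bytes written
    fail(reason) { parts.length = 0; total = 0; }, // discard on error
  };
}

const w = syncCollector();
w.writevSync([new Uint8Array([1, 2]), new Uint8Array([3])]);
console.log(w.endSync()); // 3
```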
pull(source[, ...transforms][, options])#>
- source <AsyncIterable> | <Iterable> The data source.
- ...transforms <Function> | <Object> Zero or more transforms to apply.
- options <Object>
  - signal <AbortSignal> Aborts the pipeline.
- Returns: AsyncIterable
Create a lazy async pipeline. Data is not read from source until the returned iterable is consumed. Transforms are applied in order.
import { from, pull, text } from 'node:stream/iter';
const asciiUpper = (chunks) => {
if (chunks === null) return null;
return chunks.map((c) => {
for (let i = 0; i < c.length; i++) {
c[i] -= (c[i] >= 97 && c[i] <= 122) * 32;
}
return c;
});
};
const result = pull(from('hello'), asciiUpper);
console.log(await text(result)); // 'HELLO'
const { from, pull, text } = require('node:stream/iter');
const asciiUpper = (chunks) => {
if (chunks === null) return null;
return chunks.map((c) => {
for (let i = 0; i < c.length; i++) {
c[i] -= (c[i] >= 97 && c[i] <= 122) * 32;
}
return c;
});
};
async function run() {
const result = pull(from('hello'), asciiUpper);
console.log(await text(result)); // 'HELLO'
}
run().catch(console.error);
Using an AbortSignal:
import { pull } from 'node:stream/iter';
const ac = new AbortController();
const result = pull(source, transform, { signal: ac.signal });
ac.abort(); // Pipeline throws AbortError on next iteration
const { pull } = require('node:stream/iter');
const ac = new AbortController();
const result = pull(source, transform, { signal: ac.signal });
ac.abort(); // Pipeline throws AbortError on next iteration
pullSync(source[, ...transforms])#>
- source <Iterable> A synchronous data source.
- ...transforms <Function> | <Object> Zero or more synchronous transforms.
- Returns: Iterable
Synchronous version of pull(). All transforms must be synchronous.
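The laziness of a sync pipeline can be modeled with ordinary generator composition; this conceptual sketch in plain JavaScript (not the module's code) also shows the null flush signal reaching a stateless transform:

```javascript
// A lazy sync pipeline: nothing runs until the result is iterated.
function* source() {
  yield [new Uint8Array([104, 105])]; // one batch: 'hi'
}

function* applyTransform(src, transform) {
  for (const batch of src) {
    const out = transform(batch);
    if (out !== null) yield out;
  }
  const flushed = transform(null); // flush signal after the source ends
  if (flushed !== null) yield flushed;
}

// Stateless transform: repeat each chunk's bytes twice.
const double = (chunks) =>
  chunks === null ? null : chunks.map((c) => Uint8Array.from([...c, ...c]));

const out = [...applyTransform(source(), double)];
console.log(out[0][0]); // Uint8Array [104, 105, 104, 105]
```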
Push streams#>
push([...transforms][, options])#>
- ...transforms <Function> | <Object> Optional transforms applied to the readable side.
- options <Object>
  - highWaterMark <number> Maximum buffered slots before backpressure applies. Must be >= 1; values below 1 are clamped to 1. Default: 4.
  - backpressure <string> Backpressure policy: 'strict', 'block', 'drop-oldest', or 'drop-newest'. Default: 'strict'.
  - signal <AbortSignal> Aborts the stream.
- Returns: <Object>
  - writer PushWriter The writer side.
  - readable AsyncIterable The readable side.
Create a push stream with backpressure. The writer pushes data in; the readable side is consumed as an async iterable.
import { push, text } from 'node:stream/iter';
const { writer, readable } = push();
// Producer and consumer must run concurrently. With strict backpressure
// (the default), awaited writes block until the consumer reads.
const producing = (async () => {
await writer.write('hello');
await writer.write(' world');
await writer.end();
})();
console.log(await text(readable)); // 'hello world'
await producing;
const { push, text } = require('node:stream/iter');
async function run() {
const { writer, readable } = push();
// Producer and consumer must run concurrently. With strict backpressure
// (the default), awaited writes block until the consumer reads.
const producing = (async () => {
await writer.write('hello');
await writer.write(' world');
await writer.end();
})();
console.log(await text(readable)); // 'hello world'
await producing;
}
run().catch(console.error);
The writer returned by push() conforms to the [Writer interface][].
Duplex channels#>
duplex([options])#>
Create a pair of connected duplex channels for bidirectional communication, similar to socketpair(). Data written to one channel's writer appears in the other channel's readable.
Each channel has:
- writer -- a [Writer interface][] object for sending data to the peer.
- readable -- an AsyncIterable<Uint8Array[]> for reading data from the peer.
- close() -- closes this side of the channel (idempotent).
- [Symbol.asyncDispose]() -- async disposal support for await using.
import { duplex, text } from 'node:stream/iter';
const [client, server] = duplex();
// Server echoes back, then ends its writer so the client's read completes
const serving = (async () => {
for await (const chunks of server.readable) {
await server.writer.writev(chunks);
}
await server.writer.end();
})();
await client.writer.write('hello');
await client.writer.end();
console.log(await text(client.readable)); // 'hello' -- echoed by the server
await serving;
const { duplex, text } = require('node:stream/iter');
async function run() {
const [client, server] = duplex();
// Server echoes back, then ends its writer so the client's read completes
const serving = (async () => {
for await (const chunks of server.readable) {
await server.writer.writev(chunks);
}
await server.writer.end();
})();
await client.writer.write('hello');
await client.writer.end();
console.log(await text(client.readable)); // 'hello' -- echoed by the server
await serving;
}
run().catch(console.error);
Consumers#>
array(source[, options])#>
- source AsyncIterable | Iterable
- options <Object>
  - signal <AbortSignal>
  - limit <number> Maximum number of bytes to consume. Throws ERR_OUT_OF_RANGE if the total collected exceeds the limit.
- Returns: Promise
Collect all chunks as an array of Uint8Array values (without concatenating).
arrayBuffer(source[, options])#>
- source AsyncIterable | Iterable
- options <Object>
  - signal <AbortSignal>
  - limit <number> Maximum number of bytes to consume. Throws ERR_OUT_OF_RANGE if the total collected exceeds the limit.
- Returns: Promise
Collect all bytes into an ArrayBuffer.
arrayBufferSync(source[, options])#>
- source Iterable
- options <Object>
  - limit <number> Maximum number of bytes to consume. Throws ERR_OUT_OF_RANGE if the total collected exceeds the limit.
- Returns: <ArrayBuffer>
Synchronous version of arrayBuffer().
arraySync(source[, options])#>
- source Iterable
- options <Object>
  - limit <number> Maximum number of bytes to consume. Throws ERR_OUT_OF_RANGE if the total collected exceeds the limit.
- Returns: <Uint8Array[]>
Synchronous version of array().
bytes(source[, options])#>
- source AsyncIterable | Iterable
- options <Object>
  - signal <AbortSignal>
  - limit <number> Maximum number of bytes to consume. Throws ERR_OUT_OF_RANGE if the total collected exceeds the limit.
- Returns: Promise
Collect all bytes from a stream into a single Uint8Array.
import { from, bytes } from 'node:stream/iter';
const data = await bytes(from('hello'));
console.log(data); // Uint8Array(5) [ 104, 101, 108, 108, 111 ]
const { from, bytes } = require('node:stream/iter');
async function run() {
const data = await bytes(from('hello'));
console.log(data); // Uint8Array(5) [ 104, 101, 108, 108, 111 ]
}
run().catch(console.error);
bytesSync(source[, options])#>
- source Iterable
- options <Object>
  - limit <number> Maximum number of bytes to consume. Throws ERR_OUT_OF_RANGE if the total collected exceeds the limit.
- Returns: <Uint8Array>
Synchronous version of bytes().
text(source[, options])#>
- source AsyncIterable | Iterable
- options <Object>
  - encoding <string> The text encoding. Default: 'utf-8'.
  - signal <AbortSignal>
  - limit <number> Maximum number of bytes to consume. Throws ERR_OUT_OF_RANGE if the total collected exceeds the limit.
- Returns: Promise
Collect all bytes and decode as text.
import { from, text } from 'node:stream/iter';
console.log(await text(from('hello'))); // 'hello'
const { from, text } = require('node:stream/iter');
async function run() {
console.log(await text(from('hello'))); // 'hello'
}
run().catch(console.error);
textSync(source[, options])#>
Synchronous version of text().
Utilities#>
ondrain(drainable)#>
Wait for a drainable writer's backpressure to clear. Returns a promise that resolves to true when the writer can accept more data, or null if the object does not implement the drainable protocol.
import { push, ondrain, text } from 'node:stream/iter';
const { writer, readable } = push({ highWaterMark: 2 });
writer.writeSync('a');
writer.writeSync('b');
// Start consuming so the buffer can actually drain
const consuming = text(readable);
// Buffer is full -- wait for drain
const canWrite = await ondrain(writer);
if (canWrite) {
await writer.write('c');
}
await writer.end();
await consuming;
const { push, ondrain, text } = require('node:stream/iter');
async function run() {
const { writer, readable } = push({ highWaterMark: 2 });
writer.writeSync('a');
writer.writeSync('b');
// Start consuming so the buffer can actually drain
const consuming = text(readable);
// Buffer is full -- wait for drain
const canWrite = await ondrain(writer);
if (canWrite) {
await writer.write('c');
}
await writer.end();
await consuming;
}
run().catch(console.error);
merge(...sources[, options])#>
- ...sources AsyncIterable | Iterable Two or more iterables.
- options <Object>
  - signal <AbortSignal>
- Returns: AsyncIterable
Merge multiple async iterables by yielding batches in temporal order (whichever source produces data first). All sources are consumed concurrently.
import { from, merge, text } from 'node:stream/iter';
const merged = merge(from('hello '), from('world'));
console.log(await text(merged)); // Order depends on timing
const { from, merge, text } = require('node:stream/iter');
async function run() {
const merged = merge(from('hello '), from('world'));
console.log(await text(merged)); // Order depends on timing
}
run().catch(console.error);
tap(callback)#>
- callback <Function> (chunks) => void, called once per batch.
- Returns: <Function> A stateless transform.
Create a pass-through transform that observes batches without modifying them. Useful for logging, metrics, or debugging.
import { from, pull, text, tap } from 'node:stream/iter';
const result = pull(
from('hello'),
tap((chunks) => console.log('Batch size:', chunks.length)),
);
console.log(await text(result));
const { from, pull, text, tap } = require('node:stream/iter');
async function run() {
const result = pull(
from('hello'),
tap((chunks) => console.log('Batch size:', chunks.length)),
);
console.log(await text(result));
}
run().catch(console.error);
tap() deliberately does not prevent the callback from modifying chunks in place; its return value, however, is ignored.
tapSync(callback)#>
- callback <Function>
- Returns: <Function>
Synchronous version of tap().
Multi-consumer#>
broadcast([options])#>
- options <Object>
  - highWaterMark <number> Buffer size in slots. Must be >= 1; values below 1 are clamped to 1. Default: 16.
  - backpressure <string> 'strict', 'block', 'drop-oldest', or 'drop-newest'. Default: 'strict'.
  - signal <AbortSignal>
- Returns: <Object>
  - writer BroadcastWriter
  - broadcast Broadcast
Create a push-model multi-consumer broadcast channel. A single writer pushes data to multiple consumers. Each consumer has an independent cursor into a shared buffer.
import { broadcast, text } from 'node:stream/iter';
const { writer, broadcast: bc } = broadcast();
// Create consumers before writing
const c1 = bc.push(); // Consumer 1
const c2 = bc.push(); // Consumer 2
// Producer and consumers must run concurrently. Awaited writes
// block when the buffer fills until consumers read.
const producing = (async () => {
await writer.write('hello');
await writer.end();
})();
const [r1, r2] = await Promise.all([text(c1), text(c2)]);
console.log(r1); // 'hello'
console.log(r2); // 'hello'
await producing;
const { broadcast, text } = require('node:stream/iter');
async function run() {
const { writer, broadcast: bc } = broadcast();
// Create consumers before writing
const c1 = bc.push(); // Consumer 1
const c2 = bc.push(); // Consumer 2
// Producer and consumers must run concurrently. Awaited writes
// block when the buffer fills until consumers read.
const producing = (async () => {
await writer.write('hello');
await writer.end();
})();
const [r1, r2] = await Promise.all([text(c1), text(c2)]);
console.log(r1); // 'hello'
console.log(r2); // 'hello'
await producing;
}
run().catch(console.error);
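The backpressure modes accepted by broadcast() can be illustrated with a plain bounded array. This is only a sketch of the drop policies, not the module's implementation; in particular, the real 'strict' and 'block' modes make awaited writes wait for consumers to drain the buffer, which a synchronous sketch can only signal:

```javascript
// Illustrative sketch of backpressure policies over a bounded buffer with
// `highWaterMark` slots. Not the module's implementation.
function writeWithPolicy(buffer, highWaterMark, chunk, policy) {
  if (buffer.length < highWaterMark) {
    buffer.push(chunk);
    return true; // accepted immediately
  }
  switch (policy) {
    case 'drop-oldest':
      buffer.shift();   // evict the oldest chunk to make room
      buffer.push(chunk);
      return true;
    case 'drop-newest':
      return false;     // discard the incoming chunk
    default:            // 'strict' / 'block': the write must wait for drain
      return 'would-block';
  }
}

console.log(writeWithPolicy(['a', 'b'], 2, 'c', 'drop-newest')); // false
const buf = ['a', 'b'];
writeWithPolicy(buf, 2, 'c', 'drop-oldest');
console.log(buf); // [ 'b', 'c' ]
console.log(writeWithPolicy(['a', 'b'], 2, 'c', 'block')); // 'would-block'
```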
broadcast.bufferSize#>
The number of chunks currently buffered.
broadcast.cancel([reason])#>
reason <Error>
Cancel the broadcast. All consumers receive an error.
broadcast.consumerCount#>
The number of active consumers.
broadcast.push([...transforms][, options])#>
...transforms <Function> | <Object>
options <Object>
- signal <AbortSignal>
- Returns: AsyncIterable
Create a new consumer. Each consumer receives all data written to the broadcast from the point of subscription onward. Optional transforms are applied to this consumer's view of the data.
broadcast[Symbol.dispose]()#>
Alias for broadcast.cancel().
Broadcast.from(input[, options])#>
input <AsyncIterable> | <Iterable>
options <Object> Same as broadcast().
- Returns: <Object> { writer, broadcast }
Create a Broadcast from an existing source. The source is consumed automatically and pushed to all subscribers.
share(source[, options])#>
source <AsyncIterable> The source to share.
options <Object>
- Returns: Share
Create a pull-model multi-consumer shared stream. Unlike broadcast(), the source is only read when a consumer pulls. Multiple consumers share a single buffer.
import { from, share, text } from 'node:stream/iter';
const shared = share(from('hello'));
const c1 = shared.pull();
const c2 = shared.pull();
// Consume concurrently to avoid deadlock with small buffers.
const [r1, r2] = await Promise.all([text(c1), text(c2)]);
console.log(r1); // 'hello'
console.log(r2); // 'hello'
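The pull-model contract above (the source advances only when a consumer asks, and consumers share one buffer via independent cursors) can be sketched with a counting generator. This is a simplified illustration, not the module's internals:

```javascript
// Illustrative sketch of pull-model sharing: per-consumer cursors into one
// shared buffer, with the upstream source read lazily and exactly once.
let produced = 0;
function* source() {
  while (true) {
    produced++;
    yield produced;
  }
}

const it = source();
const buffer = [];
const cursors = new Map(); // consumer id -> index into the shared buffer

function pull(id) {
  const pos = cursors.get(id) ?? 0;
  if (pos === buffer.length) buffer.push(it.next().value); // lazy source read
  cursors.set(id, pos + 1);
  return buffer[pos];
}

console.log(produced);   // 0 -- nothing read until a consumer pulls
console.log(pull('c1')); // 1
console.log(pull('c2')); // 1 -- served from the shared buffer, no re-read
console.log(produced);   // 1
```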
share.bufferSize#>
The number of chunks currently buffered.
share.cancel([reason])#>
reason <Error>
Cancel the share. All consumers receive an error.
share.consumerCount#>
The number of active consumers.
share.pull([...transforms][, options])#>
...transforms <Function> | <Object>
options <Object>
- signal <AbortSignal>
- Returns: AsyncIterable
Create a new consumer of the shared source.
share[Symbol.dispose]()#>
Alias for share.cancel().
Share.from(input[, options])#>
input <AsyncIterable>
options <Object> Same as share().
- Returns: Share
Create a Share from an existing source.
shareSync(source[, options])#>
source <Iterable> The synchronous source to share.
options <Object>
- Returns: SyncShare
Synchronous version of share().
SyncShare.fromSync(input[, options])#>
input <Iterable>
options <Object>
- Returns: SyncShare
Synchronous version of Share.from().
Compression and decompression transforms#>
Compression and decompression transforms for use with pull(), pullSync(), pipeTo(), and pipeToSync() are available via the node:zlib/iter module. See the node:zlib/iter documentation for details.
Protocol symbols#>
These well-known symbols allow third-party objects to participate in the streaming protocol without importing from node:stream/iter directly.
Stream.broadcastProtocol#>
- Value: Symbol.for('Stream.broadcastProtocol')
The value must be a function. When called by Broadcast.from(), it receives the options passed to Broadcast.from() and must return an object conforming to the Broadcast interface. The implementation is fully custom; it can manage consumers, buffering, and backpressure however it wants.
import { Broadcast, text } from 'node:stream/iter';
// This example defers to the built-in Broadcast, but a custom
// implementation could use any mechanism.
class MessageBus {
#broadcast;
#writer;
constructor() {
const { writer, broadcast } = Broadcast();
this.#writer = writer;
this.#broadcast = broadcast;
}
[Symbol.for('Stream.broadcastProtocol')](options) {
return this.#broadcast;
}
send(data) {
this.#writer.write(new TextEncoder().encode(data));
}
close() {
this.#writer.end();
}
}
const bus = new MessageBus();
const { broadcast } = Broadcast.from(bus);
const consumer = broadcast.push();
bus.send('hello');
bus.close();
console.log(await text(consumer)); // 'hello'
Stream.drainableProtocol#>
- Value: Symbol.for('Stream.drainableProtocol')
Implement this to make a writer compatible with ondrain(). The method should return a promise that resolves when backpressure clears, or null if there is no backpressure.
import { ondrain } from 'node:stream/iter';
class CustomWriter {
#queue = [];
#drain = null;
#closed = false;
[Symbol.for('Stream.drainableProtocol')]() {
if (this.#closed) return null;
if (this.#queue.length < 3) return Promise.resolve(true);
this.#drain ??= Promise.withResolvers();
return this.#drain.promise;
}
write(chunk) {
this.#queue.push(chunk);
}
flush() {
this.#queue.length = 0;
this.#drain?.resolve(true);
this.#drain = null;
}
close() {
this.#closed = true;
}
}
const writer = new CustomWriter();
const ready = ondrain(writer);
console.log(ready); // Promise { true } -- no backpressure
Stream.shareProtocol#>
- Value: Symbol.for('Stream.shareProtocol')
The value must be a function. When called by Share.from(), it receives the options passed to Share.from() and must return an object conforming to the Share interface. The implementation is fully custom; it can manage the shared source, consumers, buffering, and backpressure however it wants.
import { share, Share, text } from 'node:stream/iter';
// This example defers to the built-in share(), but a custom
// implementation could use any mechanism.
class DataPool {
#share;
constructor(source) {
this.#share = share(source);
}
[Symbol.for('Stream.shareProtocol')](options) {
return this.#share;
}
}
const pool = new DataPool(
(async function* () {
yield 'hello';
})(),
);
const shared = Share.from(pool);
const consumer = shared.pull();
console.log(await text(consumer)); // 'hello'
Stream.shareSyncProtocol#>
- Value: Symbol.for('Stream.shareSyncProtocol')
The value must be a function. When called by SyncShare.fromSync(), it receives the options passed to SyncShare.fromSync() and must return an object conforming to the SyncShare interface. The implementation is fully custom; it can manage the shared source, consumers, and buffering however it wants.
import { shareSync, SyncShare, textSync } from 'node:stream/iter';
// This example defers to the built-in shareSync(), but a custom
// implementation could use any mechanism.
class SyncDataPool {
#share;
constructor(source) {
this.#share = shareSync(source);
}
[Symbol.for('Stream.shareSyncProtocol')](options) {
return this.#share;
}
}
const encoder = new TextEncoder();
const pool = new SyncDataPool(
function* () {
yield [encoder.encode('hello')];
}(),
);
const shared = SyncShare.fromSync(pool);
const consumer = shared.pull();
console.log(textSync(consumer)); // 'hello'
Stream.toAsyncStreamable#>
- Value: Symbol.for('Stream.toAsyncStreamable')
The value must be a function that converts the object into a streamable value. When the object is encountered anywhere in the streaming pipeline (as a source passed to from(), or as a value returned from a transform), this method is called to produce the actual data. It may return (or resolve to) any streamable value: a string, Uint8Array, AsyncIterable, Iterable, or another streamable object.
import { from, text } from 'node:stream/iter';
class Greeting {
#name;
constructor(name) {
this.#name = name;
}
[Symbol.for('Stream.toAsyncStreamable')]() {
return `hello ${this.#name}`;
}
}
const stream = from(new Greeting('world'));
console.log(await text(stream)); // 'hello world'
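Because the protocol method may itself return another streamable object, resolution is recursive. The helper below (`resolveStreamable`, a hypothetical name; the real resolution logic is internal to from()) sketches how a pipeline might unwrap nested streamables until a plain value remains:

```javascript
// Illustrative sketch of recursive streamable resolution. Not the module's
// implementation: from() performs this internally.
const kToAsyncStreamable = Symbol.for('Stream.toAsyncStreamable');

async function resolveStreamable(value) {
  // Keep unwrapping while the value implements the protocol.
  while (value != null && typeof value[kToAsyncStreamable] === 'function') {
    value = await value[kToAsyncStreamable]();
  }
  return value;
}

const inner = { [kToAsyncStreamable]: () => 'hello world' };
const outer = { [kToAsyncStreamable]: async () => inner };

resolveStreamable(outer).then(console.log); // 'hello world'
```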
Stream.toStreamable#>
- Value: Symbol.for('Stream.toStreamable')
The value must be a function that synchronously converts the object into a streamable value. When the object is encountered anywhere in the streaming pipeline (as a source passed to fromSync(), or as a value returned from a sync transform), this method is called to produce the actual data. It must synchronously return a streamable value: a string, Uint8Array, or Iterable.
import { fromSync, textSync } from 'node:stream/iter';
class Greeting {
#name;
constructor(name) {
this.#name = name;
}
[Symbol.for('Stream.toStreamable')]() {
return `hello ${this.#name}`;
}
}
const stream = fromSync(new Greeting('world'));
console.log(textSync(stream)); // 'hello world'