Node.js v22.12.0 documentation
Web Streams API#
Stability: 2 - Stable
An implementation of the WHATWG Streams Standard.
Overview#
The WHATWG Streams Standard (or "web streams") defines an API for handling streaming data. It is similar to the Node.js Streams API but emerged later and has become the "standard" API for streaming data across many JavaScript environments.
There are three primary types of objects:
- ReadableStream - Represents a source of streaming data.
- WritableStream - Represents a destination for streaming data.
- TransformStream - Represents an algorithm for transforming streaming data.
Example ReadableStream#
This example creates a simple ReadableStream that pushes the current performance.now() timestamp once every second forever. An async iterable is used to read the data from the stream.
// ESM
import {
  ReadableStream,
} from 'node:stream/web';
import {
  setInterval as every,
} from 'node:timers/promises';
import {
  performance,
} from 'node:perf_hooks';

const SECOND = 1000;

const stream = new ReadableStream({
  async start(controller) {
    for await (const _ of every(SECOND))
      controller.enqueue(performance.now());
  },
});

for await (const value of stream)
  console.log(value);

// CommonJS
const {
  ReadableStream,
} = require('node:stream/web');
const {
  setInterval: every,
} = require('node:timers/promises');
const {
  performance,
} = require('node:perf_hooks');

const SECOND = 1000;

const stream = new ReadableStream({
  async start(controller) {
    for await (const _ of every(SECOND))
      controller.enqueue(performance.now());
  },
});

(async () => {
  for await (const value of stream)
    console.log(value);
})();
API#
Class: ReadableStream#
new ReadableStream([underlyingSource [, strategy]])#
- underlyingSource <Object>
  - start <Function> A user-defined function that is invoked immediately when the ReadableStream is created.
    - controller <ReadableStreamDefaultController> | <ReadableByteStreamController>
    - Returns: undefined or a promise fulfilled with undefined.
  - pull <Function> A user-defined function that is called repeatedly when the ReadableStream internal queue is not full. The operation may be sync or async. If async, the function will not be called again until the previously returned promise is fulfilled.
    - controller <ReadableStreamDefaultController> | <ReadableByteStreamController>
    - Returns: A promise fulfilled with undefined.
  - cancel <Function> A user-defined function that is called when the ReadableStream is canceled.
    - reason <any>
    - Returns: A promise fulfilled with undefined.
  - type <string> Must be 'bytes' or undefined.
  - autoAllocateChunkSize <number> Used only when type is equal to 'bytes'. When set to a non-zero value, a view buffer is automatically allocated to ReadableByteStreamController.byobRequest. When not set, one must use the stream's internal queues to transfer data via the default reader ReadableStreamDefaultReader.
- strategy <Object>
  - highWaterMark <number> The maximum internal queue size before backpressure is applied.
  - size <Function> A user-defined function used to identify the size of each chunk of data.
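The interaction between pull and strategy can be sketched as follows. This is a minimal illustration, not a canonical pattern: createCounterStream and collect are hypothetical helper names, and the code assumes the global ReadableStream that recent Node.js versions expose.

```javascript
// Hypothetical pull-based source that emits the integers 1..limit.
function createCounterStream(limit) {
  let current = 0;
  return new ReadableStream({
    // pull() is invoked whenever the internal queue has room.
    pull(controller) {
      current += 1;
      controller.enqueue(current);
      if (current === limit) controller.close();
    },
  }, {
    // Allow at most two queued chunks before backpressure pauses pull().
    highWaterMark: 2,
    // Every chunk counts as one unit toward highWaterMark.
    size() { return 1; },
  });
}

// Hypothetical helper: drains a stream into an array.
async function collect(stream) {
  const chunks = [];
  for await (const chunk of stream) chunks.push(chunk);
  return chunks;
}

collect(createCounterStream(5)).then((chunks) => console.log(chunks));
// Prints: [ 1, 2, 3, 4, 5 ]
```

Because the source is pull-based, values are produced only as fast as the consumer drains the queue.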
readableStream.locked#
- Type: <boolean> Set to true if there is an active reader for this <ReadableStream>.
The readableStream.locked property is false by default, and is switched to true while there is an active reader consuming the stream's data.
readableStream.cancel([reason])#
- reason <any>
- Returns: A promise fulfilled with undefined once cancelation has been completed.
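As a rough sketch of how cancelation reaches the underlying source, the example below records the reason passed to the source's cancel() callback. The function name cancelDemo and the reason string are illustrative assumptions; the global ReadableStream is assumed to be available.

```javascript
async function cancelDemo() {
  let received;
  const stream = new ReadableStream({
    // Called with the reason given to stream.cancel().
    cancel(reason) {
      received = reason;
    },
  });
  // The returned promise is fulfilled with undefined once cancelation completes.
  const result = await stream.cancel('no longer needed');
  return { received, result };
}

cancelDemo().then(({ received, result }) => {
  console.log(received); // Prints: no longer needed
  console.log(result);   // Prints: undefined
});
```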
readableStream.getReader([options])#
- options <Object>
- Returns: <ReadableStreamDefaultReader> | <ReadableStreamBYOBReader>
// ESM
import { ReadableStream } from 'node:stream/web';

const stream = new ReadableStream();
const reader = stream.getReader();
console.log(await reader.read());

// CommonJS
const { ReadableStream } = require('node:stream/web');

const stream = new ReadableStream();
const reader = stream.getReader();
reader.read().then(console.log);
Causes readableStream.locked to be true.
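The effect of getReader() on the locked property can be observed directly. The following is a small sketch (observeLock is a hypothetical name) that assumes the global ReadableStream:

```javascript
// Reports `locked` before acquiring, while holding, and after releasing a reader.
function observeLock() {
  const stream = new ReadableStream();
  const states = [stream.locked];   // no reader yet
  const reader = stream.getReader();
  states.push(stream.locked);       // reader is active
  reader.releaseLock();
  states.push(stream.locked);       // lock released
  return states;
}

console.log(observeLock()); // Prints: [ false, true, false ]
```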
readableStream.pipeThrough(transform[, options])#
- transform <Object>
  - readable <ReadableStream> The ReadableStream to which transform.writable will push the potentially modified data it receives from this ReadableStream.
  - writable <WritableStream> The WritableStream to which this ReadableStream's data will be written.
- options <Object>
  - preventAbort <boolean> When true, errors in this ReadableStream will not cause transform.writable to be aborted.
  - preventCancel <boolean> When true, errors in the destination transform.writable do not cause this ReadableStream to be canceled.
  - preventClose <boolean> When true, closing this ReadableStream does not cause transform.writable to be closed.
  - signal <AbortSignal> Allows the transfer of data to be canceled using an <AbortController>.
- Returns: <ReadableStream> From transform.readable.
Connects this <ReadableStream> to the pair of <ReadableStream> and <WritableStream> provided in the transform argument such that the data from this <ReadableStream> is written to transform.writable, possibly transformed, then pushed to transform.readable. Once the pipeline is configured, transform.readable is returned.
Causes readableStream.locked to be true while the pipe operation is active.
// ESM
import {
  ReadableStream,
  TransformStream,
} from 'node:stream/web';

const stream = new ReadableStream({
  start(controller) {
    controller.enqueue('a');
  },
});

const transform = new TransformStream({
  transform(chunk, controller) {
    controller.enqueue(chunk.toUpperCase());
  },
});

const transformedStream = stream.pipeThrough(transform);

for await (const chunk of transformedStream)
  console.log(chunk);
  // Prints: A

// CommonJS
const {
  ReadableStream,
  TransformStream,
} = require('node:stream/web');

const stream = new ReadableStream({
  start(controller) {
    controller.enqueue('a');
  },
});

const transform = new TransformStream({
  transform(chunk, controller) {
    controller.enqueue(chunk.toUpperCase());
  },
});

const transformedStream = stream.pipeThrough(transform);

(async () => {
  for await (const chunk of transformedStream)
    console.log(chunk);
    // Prints: A
})();
readableStream.pipeTo(destination[, options])#
- destination <WritableStream> A <WritableStream> to which this ReadableStream's data will be written.
- options <Object>
  - preventAbort <boolean> When true, errors in this ReadableStream will not cause destination to be aborted.
  - preventCancel <boolean> When true, errors in the destination will not cause this ReadableStream to be canceled.
  - preventClose <boolean> When true, closing this ReadableStream does not cause destination to be closed.
  - signal <AbortSignal> Allows the transfer of data to be canceled using an <AbortController>.
- Returns: A promise fulfilled with undefined.
Causes readableStream.locked to be true while the pipe operation is active.
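One use of preventClose is writing several sources into the same destination in sequence. The sketch below is illustrative only: streamOf and concatInto are hypothetical helpers, and the global ReadableStream and WritableStream are assumed.

```javascript
// Hypothetical helper: a stream that emits the given chunks and then closes.
function streamOf(...chunks) {
  return new ReadableStream({
    start(controller) {
      for (const chunk of chunks) controller.enqueue(chunk);
      controller.close();
    },
  });
}

// Pipes each source into one destination, closing it only after the last source.
async function concatInto(sources) {
  const received = [];
  const destination = new WritableStream({
    write(chunk) {
      received.push(chunk);
    },
  });
  for (let i = 0; i < sources.length; i++) {
    const isLast = i === sources.length - 1;
    // preventClose keeps the destination open for the next source.
    await sources[i].pipeTo(destination, { preventClose: !isLast });
  }
  return received;
}

concatInto([streamOf('a', 'b'), streamOf('c')])
  .then((received) => console.log(received));
// Prints: [ 'a', 'b', 'c' ]
```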
readableStream.tee()#
- Returns: <ReadableStream[]>
Returns a pair of new <ReadableStream> instances to which this ReadableStream's data will be forwarded. Each will receive the same data.
Causes readableStream.locked to be true.
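A short sketch of tee() in use: both branches are drained concurrently and each yields the same chunks, while the original stream stays locked. The name teeDemo is hypothetical and the global ReadableStream is assumed.

```javascript
async function teeDemo() {
  const stream = new ReadableStream({
    start(controller) {
      controller.enqueue('x');
      controller.enqueue('y');
      controller.close();
    },
  });
  const [left, right] = stream.tee();

  // Drains one branch into an array.
  const readAll = async (branch) => {
    const chunks = [];
    for await (const chunk of branch) chunks.push(chunk);
    return chunks;
  };

  const [a, b] = await Promise.all([readAll(left), readAll(right)]);
  return { a, b, locked: stream.locked };
}

teeDemo().then((result) => console.log(result));
// Prints: { a: [ 'x', 'y' ], b: [ 'x', 'y' ], locked: true }
```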
readableStream.values([options])#
- options <Object>
  - preventCancel <boolean> When true, prevents the <ReadableStream> from being closed when the async iterator abruptly terminates. Default: false.
Creates and returns an async iterator usable for consuming this ReadableStream's data.
Causes readableStream.locked to be true while the async iterator is active.
import { Buffer } from 'node:buffer';

// getSomeSource() is a placeholder for an underlying source object.
const stream = new ReadableStream(getSomeSource());

for await (const chunk of stream.values({ preventCancel: true }))
  console.log(Buffer.from(chunk).toString());
Async Iteration#
The <ReadableStream> object supports the async iterator protocol using for await syntax.
import { Buffer } from 'node:buffer';

// getSomeSource() is a placeholder for an underlying source object.
const stream = new ReadableStream(getSomeSource());

for await (const chunk of stream)
  console.log(Buffer.from(chunk).toString());
The async iterator will consume the <ReadableStream> until it terminates.
By default, if the async iterator exits early (via a break, return, or throw), the <ReadableStream> will be closed. To prevent automatic closing of the <ReadableStream>, use the readableStream.values() method to acquire the async iterator and set the preventCancel option to true.
The <ReadableStream> must not be locked (that is, it must not have an existing active reader). During the async iteration, the <ReadableStream> will be locked.
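The difference preventCancel makes can be sketched by leaving a loop early and then resuming iteration on the same stream. The name resumeDemo is hypothetical; the global ReadableStream is assumed.

```javascript
async function resumeDemo() {
  const stream = new ReadableStream({
    start(controller) {
      controller.enqueue(1);
      controller.enqueue(2);
      controller.enqueue(3);
      controller.close();
    },
  });

  const firstPass = [];
  for await (const chunk of stream.values({ preventCancel: true })) {
    firstPass.push(chunk);
    break; // Early exit: the lock is released but the stream is not canceled.
  }

  // The stream is unlocked again, so the remaining chunks can still be read.
  const rest = [];
  for await (const chunk of stream) rest.push(chunk);

  return { firstPass, rest };
}

resumeDemo().then((result) => console.log(result));
// Prints: { firstPass: [ 1 ], rest: [ 2, 3 ] }
```

Without preventCancel, the break would cancel the stream and the second loop would receive no chunks.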
Transferring with postMessage()#
A <ReadableStream> instance can be transferred using a <MessagePort>.
// getReadableSourceSomehow() is a placeholder for an underlying source object.
const stream = new ReadableStream(getReadableSourceSomehow());

const { port1, port2 } = new MessageChannel();

port1.onmessage = ({ data }) => {
  data.getReader().read().then((chunk) => {
    console.log(chunk);
  });
};

port2.postMessage(stream, [stream]);
ReadableStream.from(iterable)#
- iterable <Iterable> Object implementing the Symbol.asyncIterator or Symbol.iterator iterable protocol.
A utility method that creates a new <ReadableStream> from an iterable.
// ESM
import { ReadableStream } from 'node:stream/web';

async function* asyncIterableGenerator() {
  yield 'a';
  yield 'b';
  yield 'c';
}

const stream = ReadableStream.from(asyncIterableGenerator());

for await (const chunk of stream)
  console.log(chunk); // Prints: 'a', 'b', 'c'

// CommonJS
const { ReadableStream } = require('node:stream/web');

async function* asyncIterableGenerator() {
  yield 'a';
  yield 'b';
  yield 'c';
}

(async () => {
  const stream = ReadableStream.from(asyncIterableGenerator());

  for await (const chunk of stream)
    console.log(chunk); // Prints: 'a', 'b', 'c'
})();
Class: ReadableStreamDefaultReader#
By default, calling readableStream.getReader() with no arguments will return an instance of ReadableStreamDefaultReader. The default reader treats the chunks of data passed through the stream as opaque values, which allows the <ReadableStream> to work with generally any JavaScript value.
new ReadableStreamDefaultReader(stream)#
- stream <ReadableStream>
Creates a new <ReadableStreamDefaultReader> that is locked to the given <ReadableStream>.
readableStreamDefaultReader.cancel([reason])#
- reason <any>
- Returns: A promise fulfilled with undefined.
Cancels the <ReadableStream> and returns a promise that is fulfilled when the underlying stream has been canceled.
readableStreamDefaultReader.closed#
- Type: <Promise> Fulfilled with undefined when the associated <ReadableStream> is closed, or rejected if the stream errors or the reader's lock is released before the stream finishes closing.
readableStreamDefaultReader.read()#
Requests the next chunk of data from the underlying <ReadableStream> and returns a promise that is fulfilled with the data once it is available.
readableStreamDefaultReader.releaseLock()#
Releases this reader's lock on the underlying <ReadableStream>.
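A manual read loop with a default reader might look like the sketch below. The helper name readAll is hypothetical, and the global ReadableStream is assumed. Each read() result is an object with value and done properties.

```javascript
// Hypothetical helper: reads every chunk with a default reader.
async function readAll(stream) {
  const reader = stream.getReader();
  const chunks = [];
  try {
    while (true) {
      const { value, done } = await reader.read();
      if (done) break; // The stream has closed; no more chunks.
      chunks.push(value);
    }
  } finally {
    // Always release the lock so the stream can be used elsewhere.
    reader.releaseLock();
  }
  return chunks;
}

const letters = new ReadableStream({
  start(controller) {
    controller.enqueue('x');
    controller.enqueue('y');
    controller.close();
  },
});

readAll(letters).then((chunks) => console.log(chunks));
// Prints: [ 'x', 'y' ]
```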
Class: ReadableStreamBYOBReader#
The ReadableStreamBYOBReader is an alternative consumer for byte-oriented <ReadableStream>s (those that are created with underlyingSource.type set equal to 'bytes' when the ReadableStream was created).
BYOB is short for "bring your own buffer". This is a pattern that allows for more efficient reading of byte-oriented data that avoids extraneous copying.
import {
  open,
} from 'node:fs/promises';
import {
  ReadableStream,
} from 'node:stream/web';
import { Buffer } from 'node:buffer';

class Source {
  type = 'bytes';
  autoAllocateChunkSize = 1024;

  async start(controller) {
    this.file = await open(new URL(import.meta.url));
    this.controller = controller;
  }

  async pull(controller) {
    const view = controller.byobRequest?.view;
    const {
      bytesRead,
    } = await this.file.read({
      buffer: view,
      offset: view.byteOffset,
      length: view.byteLength,
    });

    if (bytesRead === 0) {
      await this.file.close();
      this.controller.close();
    }
    controller.byobRequest.respond(bytesRead);
  }
}

const stream = new ReadableStream(new Source());

async function read(stream) {
  const reader = stream.getReader({ mode: 'byob' });

  const chunks = [];
  let result;
  do {
    result = await reader.read(Buffer.alloc(100));
    if (result.value !== undefined)
      chunks.push(Buffer.from(result.value));
  } while (!result.done);

  return Buffer.concat(chunks);
}

const data = await read(stream);
console.log(Buffer.from(data).toString());
new ReadableStreamBYOBReader(stream)#
- stream <ReadableStream>
Creates a new ReadableStreamBYOBReader that is locked to the given <ReadableStream>.
readableStreamBYOBReader.cancel([reason])#
- reason <any>
- Returns: A promise fulfilled with undefined.
Cancels the <ReadableStream> and returns a promise that is fulfilled when the underlying stream has been canceled.
readableStreamBYOBReader.closed#
- Type: <Promise> Fulfilled with undefined when the associated <ReadableStream> is closed, or rejected if the stream errors or the reader's lock is released before the stream finishes closing.
readableStreamBYOBReader.read(view[, options])#
- view <Buffer> | <TypedArray> | <DataView>
- options <Object>
- Returns: A promise fulfilled with an object:
  - value <TypedArray> | <DataView>
  - done <boolean>
Requests the next chunk of data from the underlying <ReadableStream> and returns a promise that is fulfilled with the data once it is available.
Do not pass a pooled <Buffer> object instance in to this method. Pooled Buffer objects are created using Buffer.allocUnsafe() or Buffer.from(), or are often returned by various node:fs module callbacks. These types of Buffers use a shared underlying <ArrayBuffer> object that contains all of the data from all of the pooled Buffer instances. When a Buffer, <TypedArray>, or <DataView> is passed in to readableStreamBYOBReader.read(), the view's underlying ArrayBuffer is detached, invalidating all existing views that may exist on that ArrayBuffer. This can have disastrous consequences for your application.
readableStreamBYOBReader.releaseLock()#
Releases this reader's lock on the underlying <ReadableStream>.
Class: ReadableStreamDefaultController#
Every <ReadableStream> has a controller that is responsible for the internal state and management of the stream's queue. The ReadableStreamDefaultController is the default controller implementation for ReadableStreams that are not byte-oriented.
readableStreamDefaultController.close()#
Closes the <ReadableStream> to which this controller is associated.
readableStreamDefaultController.desiredSize#
Returns the amount of data remaining to fill the <ReadableStream>'s queue.
readableStreamDefaultController.enqueue([chunk])#
- chunk <any>
Appends a new chunk of data to the <ReadableStream>'s queue.
readableStreamDefaultController.error([error])#
- error <any>
Signals an error that causes the <ReadableStream> to error and close.
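How desiredSize shrinks as chunks are enqueued can be sketched as follows. The name desiredSizeDemo is hypothetical; the example assumes the global ReadableStream and relies on the default size() counting each chunk as 1.

```javascript
// Records controller.desiredSize while filling a queue with highWaterMark 3.
function desiredSizeDemo() {
  const sizes = [];
  new ReadableStream({
    // start() runs synchronously during construction.
    start(controller) {
      sizes.push(controller.desiredSize); // empty queue
      controller.enqueue('a');
      sizes.push(controller.desiredSize); // one chunk queued
      controller.enqueue('b');
      controller.enqueue('c');
      sizes.push(controller.desiredSize); // queue is full
      controller.close();
    },
  }, { highWaterMark: 3 });
  return sizes;
}

console.log(desiredSizeDemo()); // Prints: [ 3, 2, 0 ]
```

A source can use a desiredSize of zero or less as a cue to stop producing until the consumer catches up.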
Class: ReadableByteStreamController#
Every <ReadableStream> has a controller that is responsible for the internal state and management of the stream's queue. The ReadableByteStreamController is for byte-oriented ReadableStreams.
readableByteStreamController.byobRequest#
readableByteStreamController.close()#
Closes the <ReadableStream> to which this controller is associated.
readableByteStreamController.desiredSize#
Returns the amount of data remaining to fill the <ReadableStream>'s queue.
readableByteStreamController.enqueue(chunk)#
- chunk <Buffer> | <TypedArray> | <DataView>
Appends a new chunk of data to the <ReadableStream>'s queue.
readableByteStreamController.error([error])#
- error <any>
Signals an error that causes the <ReadableStream> to error and close.
Class: ReadableStreamBYOBRequest#
When using ReadableByteStreamController in byte-oriented streams, and when using the ReadableStreamBYOBReader, the readableByteStreamController.byobRequest property provides access to a ReadableStreamBYOBRequest instance that represents the current read request. The object is used to gain access to the ArrayBuffer/TypedArray that has been provided for the read request to fill, and provides methods for signaling that the data has been provided.
readableStreamBYOBRequest.respond(bytesWritten)#
- bytesWritten <number>
Signals that a bytesWritten number of bytes have been written to readableStreamBYOBRequest.view.
readableStreamBYOBRequest.respondWithNewView(view)#
- view <Buffer> | <TypedArray> | <DataView>
Signals that the request has been fulfilled with bytes written to a new Buffer, TypedArray, or DataView.
readableStreamBYOBRequest.view#
- Type: <Buffer> | <TypedArray> | <DataView>
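The request/respond cycle can be sketched with an in-memory byte source. This is an illustrative sketch, not the only way to serve BYOB reads: createByteStream and readText are hypothetical helpers, the 4-byte read buffer is arbitrary, and the global ReadableStream, TextEncoder, and TextDecoder are assumed.

```javascript
// Hypothetical byte source that serves the UTF-8 bytes of `text`.
function createByteStream(text) {
  const payload = new TextEncoder().encode(text);
  let offset = 0;
  return new ReadableStream({
    type: 'bytes',
    pull(controller) {
      const request = controller.byobRequest;
      const { buffer, byteOffset, byteLength } = request.view;
      // Write directly into the buffer supplied by the reader.
      const target = new Uint8Array(buffer, byteOffset, byteLength);
      const chunk = payload.subarray(offset, offset + target.byteLength);
      target.set(chunk);
      offset += chunk.byteLength;
      // Nothing left: close first, then respond with zero bytes.
      if (chunk.byteLength === 0) controller.close();
      request.respond(chunk.byteLength);
    },
  });
}

// Hypothetical helper: reads the stream 4 bytes at a time with a BYOB reader.
async function readText(stream) {
  const reader = stream.getReader({ mode: 'byob' });
  const decoder = new TextDecoder();
  let text = '';
  while (true) {
    // A fresh, non-pooled buffer is passed on every call because the
    // previous one is detached once it has been handed to read().
    const { value, done } = await reader.read(new Uint8Array(4));
    if (done) break;
    text += decoder.decode(value, { stream: true });
  }
  return text + decoder.decode();
}

readText(createByteStream('hello')).then((text) => console.log(text));
// Prints: hello
```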
Class: WritableStream#
The WritableStream is a destination to which stream data is sent.
import {
  WritableStream,
} from 'node:stream/web';

const stream = new WritableStream({
  write(chunk) {
    console.log(chunk);
  },
});

await stream.getWriter().write('Hello World');
new WritableStream([underlyingSink[, strategy]])#
- underlyingSink <Object>
  - start <Function> A user-defined function that is invoked immediately when the WritableStream is created.
    - controller <WritableStreamDefaultController>
    - Returns: undefined or a promise fulfilled with undefined.
  - write <Function> A user-defined function that is invoked when a chunk of data has been written to the WritableStream.
    - chunk <any>
    - controller <WritableStreamDefaultController>
    - Returns: A promise fulfilled with undefined.
  - close <Function> A user-defined function that is called when the WritableStream is closed.
    - Returns: A promise fulfilled with undefined.
  - abort <Function> A user-defined function that is called to abruptly close the WritableStream.
    - reason <any>
    - Returns: A promise fulfilled with undefined.
  - type <any> The type option is reserved for future use and must be undefined.
- strategy <Object>
  - highWaterMark <number> The maximum internal queue size before backpressure is applied.
  - size <Function> A user-defined function used to identify the size of each chunk of data.
writableStream.abort([reason])#
- reason <any>
- Returns: A promise fulfilled with undefined.
Abruptly terminates the WritableStream. All queued writes will be canceled with their associated promises rejected.
writableStream.close()#
- Returns: A promise fulfilled with undefined.
Closes the WritableStream when no additional writes are expected.
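A brief sketch of abort() reaching the underlying sink: the sink's abort() callback receives the reason passed by the caller. The name abortDemo and the reason string are illustrative assumptions; the global WritableStream is assumed.

```javascript
async function abortDemo() {
  let abortReason;
  const stream = new WritableStream({
    // Called when the stream is abruptly terminated.
    abort(reason) {
      abortReason = reason;
    },
  });
  // Resolves once the sink's abort() callback has completed.
  await stream.abort('shutting down');
  return abortReason;
}

abortDemo().then((reason) => console.log(reason));
// Prints: shutting down
```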
writableStream.getWriter()#
Creates and returns a new writer instance that can be used to write data into the WritableStream.
writableStream.locked#
The writableStream.locked property is false by default, and is switched to true while there is an active writer attached to this WritableStream.
Transferring with postMessage()#
A <WritableStream> instance can be transferred using a <MessagePort>.
// getWritableSinkSomehow() is a placeholder for an underlying sink object.
const stream = new WritableStream(getWritableSinkSomehow());

const { port1, port2 } = new MessageChannel();

port1.onmessage = ({ data }) => {
  data.getWriter().write('hello');
};

port2.postMessage(stream, [stream]);
Class: WritableStreamDefaultWriter#
new WritableStreamDefaultWriter(stream)#
- stream <WritableStream>
Creates a new WritableStreamDefaultWriter that is locked to the given WritableStream.
writableStreamDefaultWriter.abort([reason])#
- reason <any>
- Returns: A promise fulfilled with undefined.
Abruptly terminates the WritableStream. All queued writes will be canceled with their associated promises rejected.
writableStreamDefaultWriter.close()#
- Returns: A promise fulfilled with undefined.
Closes the WritableStream when no additional writes are expected.
writableStreamDefaultWriter.closed#
- Type: <Promise> Fulfilled with undefined when the associated <WritableStream> is closed, or rejected if the stream errors or the writer's lock is released before the stream finishes closing.
writableStreamDefaultWriter.desiredSize#
The amount of data required to fill the <WritableStream>'s queue.
writableStreamDefaultWriter.ready#
- Type: <Promise> Fulfilled with undefined when the writer is ready to be used.
writableStreamDefaultWriter.releaseLock()#
Releases this writer's lock on the underlying <WritableStream>.
writableStreamDefaultWriter.write([chunk])#
- chunk <any>
- Returns: A promise fulfilled with undefined.
Appends a new chunk of data to the <WritableStream>'s queue.
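A typical writer lifecycle (wait for ready, write, close, release the lock) can be sketched as below. writeAll and writerDemo are hypothetical names, and the global WritableStream is assumed.

```javascript
// Hypothetical helper: writes every chunk while respecting backpressure.
async function writeAll(stream, chunks) {
  const writer = stream.getWriter();
  try {
    for (const chunk of chunks) {
      await writer.ready;        // Wait until the queue has room.
      await writer.write(chunk); // Resolves once the sink has handled the chunk.
    }
    await writer.close();
  } finally {
    writer.releaseLock();
  }
}

async function writerDemo() {
  const received = [];
  const sink = new WritableStream({
    write(chunk) {
      received.push(chunk);
    },
  }, { highWaterMark: 2 });
  await writeAll(sink, ['a', 'b', 'c']);
  return received;
}

writerDemo().then((received) => console.log(received));
// Prints: [ 'a', 'b', 'c' ]
```

Awaiting each write() keeps the sketch simple; a producer that wants more throughput could await only writer.ready and handle the write() promises separately.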
Class: WritableStreamDefaultController#
The WritableStreamDefaultController manages the <WritableStream>'s internal state.
writableStreamDefaultController.error([error])#
- error <any>
Called by user-code to signal that an error has occurred while processing the WritableStream data. When called, the <WritableStream> will be aborted, with currently pending writes canceled.
writableStreamDefaultController.signal#
- Type: <AbortSignal> An AbortSignal that can be used to cancel pending write or close operations when a <WritableStream> is aborted.
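One way a sink might use controller.signal is to abandon a slow write when the stream is aborted. The sketch below is an assumption-laden illustration: signalDemo is a hypothetical name, the never-finishing write stands in for slow I/O, and the global WritableStream is assumed.

```javascript
async function signalDemo() {
  let sawAbort = false;
  let markStarted;
  const started = new Promise((resolve) => { markStarted = resolve; });

  const stream = new WritableStream({
    write(chunk, controller) {
      markStarted();
      // Simulates a write that only finishes when the stream is aborted.
      return new Promise((resolve) => {
        controller.signal.addEventListener('abort', () => {
          sawAbort = true;
          resolve();
        }, { once: true });
      });
    },
  });

  const writer = stream.getWriter();
  // The outcome of the interrupted write is not needed here.
  writer.write('slow chunk').catch(() => {});
  await started;                 // Make sure the write is in flight.
  await writer.abort('timeout'); // Triggers the signal's 'abort' event.
  return sawAbort;
}

signalDemo().then((sawAbort) => console.log(sawAbort));
// Prints: true
```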
Class: TransformStream#
A TransformStream consists of a <ReadableStream> and a <WritableStream> that are connected such that the data written to the WritableStream is received, and potentially transformed, before being pushed into the ReadableStream's queue.
import {
  TransformStream,
} from 'node:stream/web';

const transform = new TransformStream({
  transform(chunk, controller) {
    controller.enqueue(chunk.toUpperCase());
  },
});

await Promise.all([
  transform.writable.getWriter().write('A'),
  transform.readable.getReader().read(),
]);
new TransformStream([transformer[, writableStrategy[, readableStrategy]]])#
- transformer <Object>
  - start <Function> A user-defined function that is invoked immediately when the TransformStream is created.
    - controller <TransformStreamDefaultController>
    - Returns: undefined or a promise fulfilled with undefined.
  - transform <Function> A user-defined function that receives, and potentially modifies, a chunk of data written to transformStream.writable, before forwarding that on to transformStream.readable.
    - chunk <any>
    - controller <TransformStreamDefaultController>
    - Returns: A promise fulfilled with undefined.
  - flush <Function> A user-defined function that is called immediately before the writable side of the TransformStream is closed, signaling the end of the transformation process.
    - controller <TransformStreamDefaultController>
    - Returns: A promise fulfilled with undefined.
  - readableType <any> The readableType option is reserved for future use and must be undefined.
  - writableType <any> The writableType option is reserved for future use and must be undefined.
- writableStrategy <Object>
  - highWaterMark <number> The maximum internal queue size before backpressure is applied.
  - size <Function> A user-defined function used to identify the size of each chunk of data.
- readableStrategy <Object>
  - highWaterMark <number> The maximum internal queue size before backpressure is applied.
  - size <Function> A user-defined function used to identify the size of each chunk of data.
transformStream.readable
- Type: <ReadableStream>
transformStream.writable
- Type: <WritableStream>
Transferring with postMessage()
A <TransformStream> instance can be transferred using a <MessagePort>.
const stream = new TransformStream();
const { port1, port2 } = new MessageChannel();

port1.onmessage = ({ data }) => {
  const { writable, readable } = data;
  // ...
};

port2.postMessage(stream, [stream]);
Class: TransformStreamDefaultController
The TransformStreamDefaultController manages the internal state of the TransformStream.
transformStreamDefaultController.desiredSize
The amount of data required to fill the readable side's queue.
transformStreamDefaultController.enqueue([chunk])
- chunk <any>
Appends a chunk of data to the readable side's queue.
transformStreamDefaultController.error([reason])
- reason <any>
Signals to both the readable and writable side that an error has occurred while processing the transform data, causing both sides to be abruptly closed.
transformStreamDefaultController.terminate()
Closes the readable side of the transform and causes the writable side to be abruptly closed with an error.
Class: ByteLengthQueuingStrategy
new ByteLengthQueuingStrategy(init)
byteLengthQueuingStrategy.highWaterMark
byteLengthQueuingStrategy.size
- Type: <Function>
Class: CountQueuingStrategy
new CountQueuingStrategy(init)
countQueuingStrategy.highWaterMark
countQueuingStrategy.size
- Type: <Function>
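The two strategies differ only in how they measure a chunk. The following sketch assumes that init is an object with a highWaterMark property, as in the WHATWG Streams Standard. It shows the size each strategy reports for the same chunk, and how a strategy can be passed to a stream constructor.

```javascript
const {
  ByteLengthQueuingStrategy,
  CountQueuingStrategy,
  WritableStream,
} = require('node:stream/web');

const byCount = new CountQueuingStrategy({ highWaterMark: 3 });
const byBytes = new ByteLengthQueuingStrategy({ highWaterMark: 1024 });

// size is the function a stream calls to measure each queued chunk.
const chunk = new Uint8Array(10);
const countSize = byCount.size(chunk); // Every chunk counts as 1.
const byteSize = byBytes.size(chunk);  // Measured by chunk.byteLength.

// A strategy is passed as the second argument of a stream constructor.
const writable = new WritableStream({}, byCount);
const initialDesiredSize = writable.getWriter().desiredSize;

console.log(byCount.highWaterMark, countSize); // 3 1
console.log(byBytes.highWaterMark, byteSize);  // 1024 10
console.log(initialDesiredSize);               // 3
```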
Class: TextEncoderStream
new TextEncoderStream()
Creates a new TextEncoderStream instance.
textEncoderStream.encoding
The encoding supported by the TextEncoderStream instance.
textEncoderStream.readable
- Type: <ReadableStream>
textEncoderStream.writable
- Type: <WritableStream>
Class: TextDecoderStream
new TextDecoderStream([encoding[, options]])
- encoding <string> Identifies the encoding that this TextDecoderStream instance supports. Default: 'utf-8'.
- options <Object>
  - fatal <boolean> true if decoding failures are fatal.
  - ignoreBOM <boolean> When true, the TextDecoderStream will include the byte order mark in the decoded result. When false, the byte order mark will be removed from the output. This option is only used when encoding is 'utf-8', 'utf-16be', or 'utf-16le'. Default: false.
Creates a new TextDecoderStream instance.
textDecoderStream.encoding
The encoding supported by the TextDecoderStream instance.
textDecoderStream.fatal
The value will be true if decoding errors result in a TypeError being thrown.
textDecoderStream.ignoreBOM
The value will be true if the decoding result will include the byte order mark.
textDecoderStream.readable
- Type: <ReadableStream>
textDecoderStream.writable
- Type: <WritableStream>
Class: CompressionStream
new CompressionStream(format)
- format <string> One of 'deflate', 'deflate-raw', or 'gzip'.
compressionStream.readable
- Type: <ReadableStream>
compressionStream.writable
- Type: <WritableStream>
Class: DecompressionStream
new DecompressionStream(format)
- format <string> One of 'deflate', 'deflate-raw', or 'gzip'.
decompressionStream.readable
- Type: <ReadableStream>
decompressionStream.writable
- Type: <WritableStream>
Utility Consumers
The utility consumer functions provide common options for consuming streams.
They are accessed using:
// ESM
import {
  arrayBuffer,
  blob,
  buffer,
  json,
  text,
} from 'node:stream/consumers';

// CommonJS
const {
  arrayBuffer,
  blob,
  buffer,
  json,
  text,
} = require('node:stream/consumers');
streamConsumers.arrayBuffer(stream)
- stream <ReadableStream> | <stream.Readable> | <AsyncIterator>
- Returns: <Promise> Fulfills with an ArrayBuffer containing the full contents of the stream.
// ESM
import { arrayBuffer } from 'node:stream/consumers';
import { Readable } from 'node:stream';
import { TextEncoder } from 'node:util';

const encoder = new TextEncoder();
const dataArray = encoder.encode('hello world from consumers!');
const readable = Readable.from(dataArray);
const data = await arrayBuffer(readable);
console.log(`from readable: ${data.byteLength}`);
// Prints: from readable: 76

// CommonJS
const { arrayBuffer } = require('node:stream/consumers');
const { Readable } = require('node:stream');
const { TextEncoder } = require('node:util');

const encoder = new TextEncoder();
const dataArray = encoder.encode('hello world from consumers!');
const readable = Readable.from(dataArray);
arrayBuffer(readable).then((data) => {
  console.log(`from readable: ${data.byteLength}`);
  // Prints: from readable: 76
});
streamConsumers.blob(stream)
- stream <ReadableStream> | <stream.Readable> | <AsyncIterator>
- Returns: <Promise> Fulfills with a <Blob> containing the full contents of the stream.
// ESM
import { blob } from 'node:stream/consumers';

const dataBlob = new Blob(['hello world from consumers!']);
const readable = dataBlob.stream();
const data = await blob(readable);
console.log(`from readable: ${data.size}`);
// Prints: from readable: 27

// CommonJS
const { blob } = require('node:stream/consumers');

const dataBlob = new Blob(['hello world from consumers!']);
const readable = dataBlob.stream();
blob(readable).then((data) => {
  console.log(`from readable: ${data.size}`);
  // Prints: from readable: 27
});
streamConsumers.buffer(stream)
- stream <ReadableStream> | <stream.Readable> | <AsyncIterator>
- Returns: <Promise> Fulfills with a <Buffer> containing the full contents of the stream.
// ESM
import { buffer } from 'node:stream/consumers';
import { Readable } from 'node:stream';
import { Buffer } from 'node:buffer';

const dataBuffer = Buffer.from('hello world from consumers!');
const readable = Readable.from(dataBuffer);
const data = await buffer(readable);
console.log(`from readable: ${data.length}`);
// Prints: from readable: 27

// CommonJS
const { buffer } = require('node:stream/consumers');
const { Readable } = require('node:stream');
const { Buffer } = require('node:buffer');

const dataBuffer = Buffer.from('hello world from consumers!');
const readable = Readable.from(dataBuffer);
buffer(readable).then((data) => {
  console.log(`from readable: ${data.length}`);
  // Prints: from readable: 27
});
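The example above delivers the data as a single chunk. As a further sketch, the same consumer can be given a stream that emits several chunks, which it joins into one Buffer. The variable names chunked and joined are only illustrative.

```javascript
const { buffer } = require('node:stream/consumers');
const { Readable } = require('node:stream');

// Readable.from() emits each array element as a separate chunk.
const chunked = Readable.from([
  Buffer.from('hello '),
  Buffer.from('world '),
  Buffer.from('from consumers!'),
]);

// buffer() waits for the stream to end and joins all chunks.
const joined = buffer(chunked);

joined.then((data) => {
  console.log(`${data.length} bytes: ${data.toString('utf8')}`);
  // Prints: 27 bytes: hello world from consumers!
});
```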
streamConsumers.json(stream)
- stream <ReadableStream> | <stream.Readable> | <AsyncIterator>
- Returns: <Promise> Fulfills with the contents of the stream parsed as a UTF-8 encoded string that is then passed through JSON.parse().
// ESM
import { json } from 'node:stream/consumers';
import { Readable } from 'node:stream';

const items = Array.from(
  {
    length: 100,
  },
  () => ({
    message: 'hello world from consumers!',
  }),
);
const readable = Readable.from(JSON.stringify(items));
const data = await json(readable);
console.log(`from readable: ${data.length}`);
// Prints: from readable: 100

// CommonJS
const { json } = require('node:stream/consumers');
const { Readable } = require('node:stream');

const items = Array.from(
  {
    length: 100,
  },
  () => ({
    message: 'hello world from consumers!',
  }),
);
const readable = Readable.from(JSON.stringify(items));
json(readable).then((data) => {
  console.log(`from readable: ${data.length}`);
  // Prints: from readable: 100
});
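Because the stream argument may also be an async iterator, the source does not have to be a stream object at all. The following sketch uses a hypothetical async generator named fragments that yields one JSON document in arbitrary pieces. The pieces are joined before parsing, so they do not need to be valid JSON on their own.

```javascript
const { json } = require('node:stream/consumers');

// Hypothetical source: yields a single JSON document in string fragments.
async function* fragments() {
  yield '{"items":[1,';
  yield '2,3],"do';
  yield 'ne":true}';
}

// json() collects all fragments, then parses the combined text once.
const parsed = json(fragments());

parsed.then((data) => {
  console.log(data.items.length, data.done);
  // Prints: 3 true
});
```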
streamConsumers.text(stream)
- stream <ReadableStream> | <stream.Readable> | <AsyncIterator>
- Returns: <Promise> Fulfills with the contents of the stream parsed as a UTF-8 encoded string.
// ESM
import { text } from 'node:stream/consumers';
import { Readable } from 'node:stream';

const readable = Readable.from('Hello world from consumers!');
const data = await text(readable);
console.log(`from readable: ${data.length}`);
// Prints: from readable: 27

// CommonJS
const { text } = require('node:stream/consumers');
const { Readable } = require('node:stream');

const readable = Readable.from('Hello world from consumers!');
text(readable).then((data) => {
  console.log(`from readable: ${data.length}`);
  // Prints: from readable: 27
});
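The consumers also accept a web <ReadableStream>, including one that carries raw bytes. In the sketch below, a hypothetical helper named readSplitText builds such a stream by hand and splits the two UTF-8 bytes of 'é' across chunks, to show that the result is decoded as one string rather than chunk by chunk.

```javascript
const { text } = require('node:stream/consumers');
const { ReadableStream } = require('node:stream/web');

// Hypothetical helper: streams the UTF-8 bytes of 'héllo' in two chunks.
function readSplitText() {
  const stream = new ReadableStream({
    start(controller) {
      // 'é' is encoded as C3 A9; the chunk boundary falls between the two bytes.
      controller.enqueue(new Uint8Array([0x68, 0xc3]));
      controller.enqueue(new Uint8Array([0xa9, 0x6c, 0x6c, 0x6f]));
      controller.close();
    },
  });
  return text(stream);
}

readSplitText().then((data) => {
  console.log(`${data} (${data.length} characters)`);
  // Prints: héllo (5 characters)
});
```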