- assert断言
- async_hooks异步钩子
- async_hooks/context异步上下文
- buffer缓冲区
- C++插件
- C/C++插件(使用Node-API)
- C++嵌入器
- child_process子进程
- cluster集群
- CLI命令行
- console控制台
- Corepack核心包
- crypto加密
- crypto/webcrypto网络加密
- debugger调试器
- deprecation弃用
- dgram数据报
- diagnostics_channel诊断通道
- dns域名服务器
- domain域
- Error错误
- events事件触发器
- fs文件系统
- global全局变量
- http超文本传输协议
- http2超文本传输协议2.0
- https安全超文本传输协议
- inspector检查器
- Intl国际化
- module模块
- module/cjsCommonJS模块
- module/esmECMAScript模块
- module/package包模块
- net网络
- os操作系统
- path路径
- perf_hooks性能钩子
- permission权限
- policy安全策略
- process进程
- punycode域名代码
- querystring查询字符串
- readline逐行读取
- repl交互式解释器
- report诊断报告
- stream流
- stream/web网络流
- string_decoder字符串解码器
- test测试
- timers定时器
- tls安全传输层
- trace_events跟踪事件
- tty终端
- url网址
- util实用工具
- v8引擎
- vm虚拟机
- wasi网络汇编系统接口
- worker_threads工作线程
- zlib压缩
Node.js v16.18.1 文档
- Node.js 16.18.1
-
►
目录
- stream/web 网络流
- 概述
- API
ReadableStream
类new ReadableStream([underlyingSource [, strategy]])
readableStream.locked
readableStream.cancel([reason])
readableStream.getReader([options])
readableStream.pipeThrough(transform[, options])
readableStream.pipeTo(destination, options)
readableStream.tee()
readableStream.values([options])
- 异步迭代
- 使用 postMessage() 传输
ReadableStreamDefaultReader
类ReadableStreamBYOBReader
类ReadableStreamDefaultController
类ReadableByteStreamController
类ReadableStreamBYOBRequest
类WritableStream
类WritableStreamDefaultWriter
类new WritableStreamDefaultWriter(stream)
writableStreamDefaultWriter.abort([reason])
writableStreamDefaultWriter.close()
writableStreamDefaultWriter.closed
writableStreamDefaultWriter.desiredSize
writableStreamDefaultWriter.ready
writableStreamDefaultWriter.releaseLock()
writableStreamDefaultWriter.write([chunk])
WritableStreamDefaultController
类TransformStream
类TransformStreamDefaultController
类ByteLengthQueuingStrategy
类CountQueuingStrategy
类TextEncoderStream
类TextDecoderStream
类- 实用消费者
- stream/web 网络流
-
►
索引
- assert 断言
- async_hooks 异步钩子
- async_hooks/context 异步上下文
- buffer 缓冲区
- C++插件
- C/C++插件(使用Node-API)
- C++嵌入器
- child_process 子进程
- cluster 集群
- CLI 命令行
- console 控制台
- Corepack 核心包
- crypto 加密
- crypto/webcrypto 网络加密
- debugger 调试器
- deprecation 弃用
- dgram 数据报
- diagnostics_channel 诊断通道
- dns 域名服务器
- domain 域
- Error 错误
- events 事件触发器
- fs 文件系统
- global 全局变量
- http 超文本传输协议
- http2 超文本传输协议2.0
- https 安全超文本传输协议
- inspector 检查器
- Intl 国际化
- module 模块
- module/cjs CommonJS模块
- module/esm ECMAScript模块
- module/package 包模块
- net 网络
- os 操作系统
- path 路径
- perf_hooks 性能钩子
- permission 权限
- policy 安全策略
- process 进程
- punycode 域名代码
- querystring 查询字符串
- readline 逐行读取
- repl 交互式解释器
- report 诊断报告
- stream 流
- stream/web 网络流
- string_decoder 字符串解码器
- test 测试
- timers 定时器
- tls 安全传输层
- trace_events 跟踪事件
- tty 终端
- url 网址
- util 实用工具
- v8 引擎
- vm 虚拟机
- wasi 网络汇编系统接口
- worker_threads 工作线程
- zlib 压缩
- ► 其他版本
- 文档搜索
目录
- stream/web 网络流
- 概述
- API
ReadableStream
类new ReadableStream([underlyingSource [, strategy]])
readableStream.locked
readableStream.cancel([reason])
readableStream.getReader([options])
readableStream.pipeThrough(transform[, options])
readableStream.pipeTo(destination, options)
readableStream.tee()
readableStream.values([options])
- 异步迭代
- 使用 postMessage() 传输
ReadableStreamDefaultReader
类ReadableStreamBYOBReader
类ReadableStreamDefaultController
类ReadableByteStreamController
类ReadableStreamBYOBRequest
类WritableStream
类WritableStreamDefaultWriter
类new WritableStreamDefaultWriter(stream)
writableStreamDefaultWriter.abort([reason])
writableStreamDefaultWriter.close()
writableStreamDefaultWriter.closed
writableStreamDefaultWriter.desiredSize
writableStreamDefaultWriter.ready
writableStreamDefaultWriter.releaseLock()
writableStreamDefaultWriter.write([chunk])
WritableStreamDefaultController
类TransformStream
类TransformStreamDefaultController
类ByteLengthQueuingStrategy
类CountQueuingStrategy
类TextEncoderStream
类TextDecoderStream
类- 实用消费者
stream/web 网络流#
An implementation of the WHATWG Streams Standard.
import {
ReadableStream,
WritableStream,
TransformStream,
} from 'node:stream/web';
const {
ReadableStream,
WritableStream,
TransformStream,
} = require('stream/web');
概述#
The WHATWG Streams Standard (or "web streams") defines an API for handling streaming data. It is similar to the Node.js Streams API but emerged later and has become the "standard" API for streaming data across many JavaScript environments.
There are three primary types of objects:
ReadableStream
- Represents a source of streaming data.WritableStream
- Represents a destination for streaming data.TransformStream
- Represents an algorithm for transforming streaming data.
ReadableStream 示例#
This example creates a simple ReadableStream
that pushes the current
performance.now()
timestamp once every second forever. An async iterable
is used to read the data from the stream.
import {
ReadableStream
} from 'node:stream/web';
import {
setInterval as every
} from 'node:timers/promises';
import {
performance
} from 'node:perf_hooks';
const SECOND = 1000;
const stream = new ReadableStream({
async start(controller) {
for await (const _ of every(SECOND))
controller.enqueue(performance.now());
}
});
for await (const value of stream)
console.log(value);
const {
ReadableStream
} = require('node:stream/web');
const {
setInterval: every
} = require('node:timers/promises');
const {
performance
} = require('node:perf_hooks');
const SECOND = 1000;
const stream = new ReadableStream({
async start(controller) {
for await (const _ of every(SECOND))
controller.enqueue(performance.now());
}
});
(async () => {
for await (const value of stream)
console.log(value);
})();
API#
ReadableStream
类#
new ReadableStream([underlyingSource [, strategy]])
#
underlyingSource
<Object>start
<Function> A user-defined function that is invoked immediately when theReadableStream
is created.controller
<ReadableStreamDefaultController> | <ReadableByteStreamController>- 返回:
undefined
or a promise fulfilled withundefined
.
pull
<Function> A user-defined function that is called repeatedly when theReadableStream
internal queue is not full. The operation may be sync or async. If async, the function will not be called again until the previously returned promise is fulfilled.controller
<ReadableStreamDefaultController> | <ReadableByteStreamController>- 返回: A promise fulfilled with
undefined
.
cancel
<Function> A user-defined function that is called when theReadableStream
is canceled.reason
<any>- 返回: A promise fulfilled with
undefined
.
type
<string> Must be'bytes'
orundefined
.autoAllocateChunkSize
<number> Used only whentype
is equal to'bytes'
.
strategy
<Object>highWaterMark
<number> The maximum internal queue size before backpressure is applied.size
<Function> A user-defined function used to identify the size of each chunk of data.
readableStream.locked
#
- 类型: <boolean> Set to
true
if there is an active reader for this <ReadableStream>.
The readableStream.locked
property is false
by default, and is
switched to true
while there is an active reader consuming the
stream's data.
readableStream.cancel([reason])
#
reason
<any>- 返回: A promise fulfilled with
undefined
once cancelation has been completed.
readableStream.getReader([options])
#
options
<Object>mode
<string>'byob'
orundefined
- 返回: <ReadableStreamDefaultReader> | <ReadableStreamBYOBReader>
import { ReadableStream } from 'node:stream/web';
const stream = new ReadableStream();
const reader = stream.getReader();
console.log(await reader.read());
const { ReadableStream } = require('node:stream/web');
const stream = new ReadableStream();
const reader = stream.getReader();
reader.read().then(console.log);
Causes the readableStream.locked
to be true
.
readableStream.pipeThrough(transform[, options])
#
transform
<Object>readable
<ReadableStream> TheReadableStream
to whichtransform.writable
will push the potentially modified data is receives from thisReadableStream
.writable
<WritableStream> TheWritableStream
to which thisReadableStream
's data will be written.
options
<Object>preventAbort
<boolean> Whentrue
, errors in thisReadableStream
will not causetransform.writable
to be aborted.preventCancel
<boolean> Whentrue
, errors in the destinationtransform.writable
do not cause thisReadableStream
to be canceled.preventClose
<boolean> Whentrue
, closing thisReadableStream
does not causetransform.writable
to be closed.signal
<AbortSignal> Allows the transfer of data to be canceled using an <AbortController>.
- 返回: <ReadableStream> From
transform.readable
.
Connects this <ReadableStream> to the pair of <ReadableStream> and
<WritableStream> provided in the transform
argument such that the
data from this <ReadableStream> is written in to transform.writable
,
possibly transformed, then pushed to transform.readable
. Once the
pipeline is configured, transform.readable
is returned.
Causes the readableStream.locked
to be true
while the pipe operation
is active.
import {
ReadableStream,
TransformStream,
} from 'node:stream/web';
const stream = new ReadableStream({
start(controller) {
controller.enqueue('a');
},
});
const transform = new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
}
});
const transformedStream = stream.pipeThrough(transform);
for await (const chunk of transformedStream)
console.log(chunk);
const {
ReadableStream,
TransformStream,
} = require('node:stream/web');
const stream = new ReadableStream({
start(controller) {
controller.enqueue('a');
},
});
const transform = new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
}
});
const transformedStream = stream.pipeThrough(transform);
(async () => {
for await (const chunk of transformedStream)
console.log(chunk);
})();
readableStream.pipeTo(destination, options)
#
destination
<WritableStream> A <WritableStream> to which thisReadableStream
's data will be written.options
<Object>preventAbort
<boolean> Whentrue
, errors in thisReadableStream
will not causedestination
to be aborted.preventCancel
<boolean> Whentrue
, errors in thedestination
will not cause thisReadableStream
to be canceled.preventClose
<boolean> Whentrue
, closing thisReadableStream
does not causedestination
to be closed.signal
<AbortSignal> Allows the transfer of data to be canceled using an <AbortController>.
- 返回: A promise fulfilled with
undefined
Causes the readableStream.locked
to be true
while the pipe operation
is active.
readableStream.tee()
#
Returns a pair of new <ReadableStream> instances to which this
ReadableStream
's data will be forwarded. Each will receive the
same data.
Causes the readableStream.locked
to be true
.
readableStream.values([options])
#
options
<Object>preventCancel
<boolean> Whentrue
, prevents the <ReadableStream> from being closed when the async iterator abruptly terminates. 默认值:false
.
Creates and returns an async iterator usable for consuming this
ReadableStream
's data.
Causes the readableStream.locked
to be true
while the async iterator
is active.
import { Buffer } from 'node:buffer';
const stream = new ReadableStream(getSomeSource());
for await (const chunk of stream.values({ preventCancel: true }))
console.log(Buffer.from(chunk).toString());
异步迭代#
The <ReadableStream> object supports the async iterator protocol using
for await
syntax.
import { Buffer } from 'node:buffer';
const stream = new ReadableStream(getSomeSource());
for await (const chunk of stream)
console.log(Buffer.from(chunk).toString());
The async iterator will consume the <ReadableStream> until it terminates.
By default, if the async iterator exits early (via either a break
,
return
, or a throw
), the <ReadableStream> will be closed. To prevent
automatic closing of the <ReadableStream>, use the readableStream.values()
method to acquire the async iterator and set the preventCancel
option to
true
.
The <ReadableStream> must not be locked (that is, it must not have an existing active reader). During the async iteration, the <ReadableStream> will be locked.
使用 postMessage() 传输#
A <ReadableStream> instance can be transferred using a <MessagePort>.
const stream = new ReadableStream(getReadableSourceSomehow());
const { port1, port2 } = new MessageChannel();
port1.onmessage = ({ data }) => {
data.getReader().read().then((chunk) => {
console.log(chunk);
});
};
port2.postMessage(stream, [stream]);
ReadableStreamDefaultReader
类#
By default, calling readableStream.getReader()
with no arguments
will return an instance of ReadableStreamDefaultReader
. The default
reader treats the chunks of data passed through the stream as opaque
values, which allows the <ReadableStream> to work with generally any
JavaScript value.
new ReadableStreamDefaultReader(stream)
#
stream
<ReadableStream>
Creates a new <ReadableStreamDefaultReader> that is locked to the given <ReadableStream>.
readableStreamDefaultReader.cancel([reason])
#
reason
<any>- 返回: A promise fulfilled with
undefined
.
Cancels the <ReadableStream> and returns a promise that is fulfilled when the underlying stream has been canceled.
readableStreamDefaultReader.closed
#
- 类型: <Promise> Fulfilled with
undefined
when the associated <ReadableStream> is closed or rejected if the stream errors or the reader's lock is released before the stream finishes closing.
readableStreamDefaultReader.read()
#
- 返回: A promise fulfilled with an object:
value
<ArrayBuffer>done
<boolean>
Requests the next chunk of data from the underlying <ReadableStream> and returns a promise that is fulfilled with the data once it is available.
readableStreamDefaultReader.releaseLock()
#
Releases this reader's lock on the underlying <ReadableStream>.
ReadableStreamBYOBReader
类#
The ReadableStreamBYOBReader
is an alternative consumer for
byte-oriented <ReadableStream>s (those that are created with
underlyingSource.type
set equal to 'bytes'
when the
ReadableStream
was created).
The BYOB
is short for "bring your own buffer". This is a
pattern that allows for more efficient reading of byte-oriented
data that avoids extraneous copying.
import {
open
} from 'node:fs/promises';
import {
ReadableStream
} from 'node:stream/web';
import { Buffer } from 'node:buffer';
class Source {
type = 'bytes';
autoAllocateChunkSize = 1024;
async start(controller) {
this.file = await open(new URL(import.meta.url));
this.controller = controller;
}
async pull(controller) {
const view = controller.byobRequest?.view;
const {
bytesRead,
} = await this.file.read({
buffer: view,
offset: view.byteOffset,
length: view.byteLength
});
if (bytesRead === 0) {
await this.file.close();
this.controller.close();
}
controller.byobRequest.respond(bytesRead);
}
}
const stream = new ReadableStream(new Source());
async function read(stream) {
const reader = stream.getReader({ mode: 'byob' });
const chunks = [];
let result;
do {
result = await reader.read(Buffer.alloc(100));
if (result.value !== undefined)
chunks.push(Buffer.from(result.value));
} while (!result.done);
return Buffer.concat(chunks);
}
const data = await read(stream);
console.log(Buffer.from(data).toString());
new ReadableStreamBYOBReader(stream)
#
stream
<ReadableStream>
Creates a new ReadableStreamBYOBReader
that is locked to the
given <ReadableStream>.
readableStreamBYOBReader.cancel([reason])
#
reason
<any>- 返回: A promise fulfilled with
undefined
.
Cancels the <ReadableStream> and returns a promise that is fulfilled when the underlying stream has been canceled.
readableStreamBYOBReader.closed
#
- 类型: <Promise> Fulfilled with
undefined
when the associated <ReadableStream> is closed or rejected if the stream errors or the reader's lock is released before the stream finishes closing.
readableStreamBYOBReader.read(view)
#
view
<Buffer> | <TypedArray> | <DataView>- 返回: A promise fulfilled with an object:
value
<ArrayBuffer>done
<boolean>
Requests the next chunk of data from the underlying <ReadableStream> and returns a promise that is fulfilled with the data once it is available.
Do not pass a pooled <Buffer> object instance in to this method.
Pooled Buffer
objects are created using Buffer.allocUnsafe()
,
or Buffer.from()
, or are often returned by various node:fs
module
callbacks. These types of Buffer
s use a shared underlying
<ArrayBuffer> object that contains all of the data from all of
the pooled Buffer
instances. When a Buffer
, <TypedArray>,
or <DataView> is passed in to readableStreamBYOBReader.read()
,
the view's underlying ArrayBuffer
is detached, invalidating
all existing views that may exist on that ArrayBuffer
. This
can have disastrous consequences for your application.
readableStreamBYOBReader.releaseLock()
#
Releases this reader's lock on the underlying <ReadableStream>.
ReadableStreamDefaultController
类#
Every <ReadableStream> has a controller that is responsible for
the internal state and management of the stream's queue. The
ReadableStreamDefaultController
is the default controller
implementation for ReadableStream
s that are not byte-oriented.
readableStreamDefaultController.close()
#
Closes the <ReadableStream> to which this controller is associated.
readableStreamDefaultController.desiredSize
#
- 类型: <number>
Returns the amount of data remaining to fill the <ReadableStream>'s queue.
readableStreamDefaultController.enqueue(chunk)
#
chunk
<any>
Appends a new chunk of data to the <ReadableStream>'s queue.
readableStreamDefaultController.error(error)
#
error
<any>
Signals an error that causes the <ReadableStream> to error and close.
ReadableByteStreamController
类#
Every <ReadableStream> has a controller that is responsible for
the internal state and management of the stream's queue. The
ReadableByteStreamController
is for byte-oriented ReadableStream
s.
readableByteStreamController.byobRequest
#
readableByteStreamController.close()
#
Closes the <ReadableStream> to which this controller is associated.
readableByteStreamController.desiredSize
#
- 类型: <number>
Returns the amount of data remaining to fill the <ReadableStream>'s queue.
readableByteStreamController.enqueue(chunk)
#
chunk
: <Buffer> | <TypedArray> | <DataView>
Appends a new chunk of data to the <ReadableStream>'s queue.
readableByteStreamController.error(error)
#
error
<any>
Signals an error that causes the <ReadableStream> to error and close.
ReadableStreamBYOBRequest
类#
When using ReadableByteStreamController
in byte-oriented
streams, and when using the ReadableStreamBYOBReader
,
the readableByteStreamController.byobRequest
property
provides access to a ReadableStreamBYOBRequest
instance
that represents the current read request. The object
is used to gain access to the ArrayBuffer
/TypedArray
that has been provided for the read request to fill,
and provides methods for signaling that the data has
been provided.
readableStreamBYOBRequest.respond(bytesWritten)
#
bytesWritten
<number>
Signals that a bytesWritten
number of bytes have been written
to readableStreamBYOBRequest.view
.
readableStreamBYOBRequest.respondWithNewView(view)
#
view
<Buffer> | <TypedArray> | <DataView>
Signals that the request has been fulfilled with bytes written
to a new Buffer
, TypedArray
, or DataView
.
readableStreamBYOBRequest.view
#
- 类型: <Buffer> | <TypedArray> | <DataView>
WritableStream
类#
The WritableStream
is a destination to which stream data is sent.
import {
WritableStream
} from 'node:stream/web';
const stream = new WritableStream({
write(chunk) {
console.log(chunk);
}
});
await stream.getWriter().write('Hello World');
new WritableStream([underlyingSink[, strategy]])
#
underlyingSink
<Object>start
<Function> A user-defined function that is invoked immediately when theWritableStream
is created.controller
<WritableStreamDefaultController>- 返回:
undefined
or a promise fulfilled withundefined
.
write
<Function> A user-defined function that is invoked when a chunk of data has been written to theWritableStream
.chunk
<any>controller
<WritableStreamDefaultController>- 返回: A promise fulfilled with
undefined
.
close
<Function> A user-defined function that is called when theWritableStream
is closed.- 返回: A promise fulfilled with
undefined
.
- 返回: A promise fulfilled with
abort
<Function> A user-defined function that is called to abruptly close theWritableStream
.reason
<any>- 返回: A promise fulfilled with
undefined
.
type
<any> Thetype
option is reserved for future use and must be undefined.
strategy
<Object>highWaterMark
<number> The maximum internal queue size before backpressure is applied.size
<Function> A user-defined function used to identify the size of each chunk of data.
writableStream.abort([reason])
#
reason
<any>- 返回: A promise fulfilled with
undefined
.
Abruptly terminates the WritableStream
. All queued writes will be
canceled with their associated promises rejected.
writableStream.close()
#
- 返回: A promise fulfilled with
undefined
.
Closes the WritableStream
when no additional writes are expected.
writableStream.getWriter()
#
Creates and creates a new writer instance that can be used to write
data into the WritableStream
.
writableStream.locked
#
- 类型: <boolean>
The writableStream.locked
property is false
by default, and is
switched to true
while there is an active writer attached to this
WritableStream
.
使用 postMessage() 传输#
A <WritableStream> instance can be transferred using a <MessagePort>.
const stream = new WritableStream(getWritableSinkSomehow());
const { port1, port2 } = new MessageChannel();
port1.onmessage = ({ data }) => {
data.getWriter().write('hello');
};
port2.postMessage(stream, [stream]);
WritableStreamDefaultWriter
类#
new WritableStreamDefaultWriter(stream)
#
stream
<WritableStream>
Creates a new WritableStreamDefaultWriter
that is locked to the given
WritableStream
.
writableStreamDefaultWriter.abort([reason])
#
reason
<any>- 返回: A promise fulfilled with
undefined
.
Abruptly terminates the WritableStream
. All queued writes will be
canceled with their associated promises rejected.
writableStreamDefaultWriter.close()
#
- 返回: A promise fulfilled with
undefined
.
Closes the WritableStream
when no additional writes are expected.
writableStreamDefaultWriter.closed
#
- 类型: <Promise> Fulfilled with
undefined
when the associated <WritableStream> is closed or rejected if the stream errors or the writer's lock is released before the stream finishes closing.
writableStreamDefaultWriter.desiredSize
#
- 类型: <number>
The amount of data required to fill the <WritableStream>'s queue.
writableStreamDefaultWriter.ready
#
- type: A promise that is fulfilled with
undefined
when the writer is ready to be used.
writableStreamDefaultWriter.releaseLock()
#
Releases this writer's lock on the underlying <ReadableStream>.
writableStreamDefaultWriter.write([chunk])
#
chunk
: <any>- 返回: A promise fulfilled with
undefined
.
Appends a new chunk of data to the <WritableStream>'s queue.
WritableStreamDefaultController
类#
The WritableStreamDefaultController
manage's the <WritableStream>'s
internal state.
writableStreamDefaultController.abortReason
#
- 类型: <any> The
reason
value passed towritableStream.abort()
.
writableStreamDefaultController.error(error)
#
error
<any>
Called by user-code to signal that an error has occurred while processing
the WritableStream
data. When called, the <WritableStream> will be aborted,
with currently pending writes canceled.
writableStreamDefaultController.signal
#
- 类型: <AbortSignal> An
AbortSignal
that can be used to cancel pending write or close operations when a <WritableStream> is aborted.
TransformStream
类#
A TransformStream
consists of a <ReadableStream> and a <WritableStream> that
are connected such that the data written to the WritableStream
is received,
and potentially transformed, before being pushed into the ReadableStream
's
queue.
import {
TransformStream
} from 'node:stream/web';
const transform = new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
}
});
await Promise.all([
transform.writable.getWriter().write('A'),
transform.readable.getReader().read(),
]);
new TransformStream([transformer[, writableStrategy[, readableStrategy]]])
#
transformer
<Object>start
<Function> A user-defined function that is invoked immediately when theTransformStream
is created.controller
<TransformStreamDefaultController>- 返回:
undefined
or a promise fulfilled withundefined
transform
<Function> A user-defined function that receives, and potentially modifies, a chunk of data written totransformStream.writable
, before forwarding that on totransformStream.readable
.chunk
<any>controller
<TransformStreamDefaultController>- 返回: A promise fulfilled with
undefined
.
flush
<Function> A user-defined function that is called immediately before the writable side of theTransformStream
is closed, signaling the end of the transformation process.controller
<TransformStreamDefaultController>- 返回: A promise fulfilled with
undefined
.
readableType
<any> thereadableType
option is reserved for future use and must beundefined
.writableType
<any> thewritableType
option is reserved for future use and must beundefined
.
writableStrategy
<Object>highWaterMark
<number> The maximum internal queue size before backpressure is applied.size
<Function> A user-defined function used to identify the size of each chunk of data.
readableStrategy
<Object>highWaterMark
<number> The maximum internal queue size before backpressure is applied.size
<Function> A user-defined function used to identify the size of each chunk of data.
transformStream.readable
#
- 类型: <ReadableStream>
transformStream.writable
#
- 类型: <WritableStream>
使用 postMessage() 传输#
A <TransformStream> instance can be transferred using a <MessagePort>.
const stream = new TransformStream();
const { port1, port2 } = new MessageChannel();
port1.onmessage = ({ data }) => {
const { writable, readable } = data;
// ...
};
port2.postMessage(stream, [stream]);
TransformStreamDefaultController
类#
The TransformStreamDefaultController
manages the internal state
of the TransformStream
.
transformStreamDefaultController.desiredSize
#
- 类型: <number>
The amount of data required to fill the readable side's queue.
transformStreamDefaultController.enqueue([chunk])
#
chunk
<any>
Appends a chunk of data to the readable side's queue.
transformStreamDefaultController.error([reason])
#
reason
<any>
Signals to both the readable and writable side that an error has occurred while processing the transform data, causing both sides to be abruptly closed.
transformStreamDefaultController.terminate()
#
Closes the readable side of the transport and causes the writable side to be abruptly closed with an error.
ByteLengthQueuingStrategy
类#
new ByteLengthQueuingStrategy(options)
#
byteLengthQueuingStrategy.highWaterMark
#
- 类型: <number>
byteLengthQueuingStrategy.size
#
- 类型: <Function>
CountQueuingStrategy
类#
new CountQueuingStrategy(options)
#
countQueuingStrategy.highWaterMark
#
- 类型: <number>
countQueuingStrategy.size
#
- 类型: <Function>
TextEncoderStream
类#
new TextEncoderStream()
#
Creates a new TextEncoderStream
instance.
textEncoderStream.encoding
#
- 类型: <string>
The encoding supported by the TextEncoderStream
instance.
textEncoderStream.readable
#
- 类型: <ReadableStream>
textEncoderStream.writable
#
- 类型: <WritableStream>
TextDecoderStream
类#
new TextDecoderStream([encoding[, options]])
#
encoding
<string> Identifies theencoding
that thisTextDecoder
instance supports. 默认值:'utf-8'
。options
<Object>fatal
<boolean>true
if decoding failures are fatal.ignoreBOM
<boolean> Whentrue
, theTextDecoderStream
will include the byte order mark in the decoded result. Whenfalse
, the byte order mark will be removed from the output. This option is only used whenencoding
is'utf-8'
,'utf-16be'
, or'utf-16le'
. 默认值:false
。
Creates a new TextDecoderStream
instance.
textDecoderStream.encoding
#
- 类型: <string>
The encoding supported by the TextDecoderStream
instance.
textDecoderStream.fatal
#
- 类型: <boolean>
The value will be true
if decoding errors result in a TypeError
being
thrown.
textDecoderStream.ignoreBOM
#
- 类型: <boolean>
The value will be true
if the decoding result will include the byte order
mark.
textDecoderStream.readable
#
- 类型: <ReadableStream>
textDecoderStream.writable
#
- 类型: <WritableStream>
实用消费者#
The utility consumer functions provide common options for consuming streams.
They are accessed using:
import {
arrayBuffer,
blob,
buffer,
json,
text,
} from 'node:stream/consumers';
const {
arrayBuffer,
blob,
buffer,
json,
text,
} = require('node:stream/consumers');
streamConsumers.arrayBuffer(stream)
#
stream
<ReadableStream> | <stream.Readable> | <AsyncIterator>- 返回: <Promise> Fulfills with an
ArrayBuffer
containing the full contents of the stream.
import { buffer as arrayBuffer } from 'node:stream/consumers';
import { Readable } from 'node:stream';
import { TextEncoder } from 'node:util';
const encoder = new TextEncoder();
const dataArray = encoder.encode('hello world from consumers!');
const readable = Readable.from(dataArray);
const data = await arrayBuffer(readable);
console.log(`from readable: ${data.byteLength}`);
const { arrayBuffer } = require('node:stream/consumers');
const { Readable } = require('stream');
const { TextEncoder } = require('util');
const encoder = new TextEncoder();
const dataArray = encoder.encode(['hello world from consumers!']);
const readable = Readable.from(dataArray);
arrayBuffer(readable).then((data) => {
console.log(`from readable: ${data.byteLength}`);
});
streamConsumers.blob(stream)
#
stream
<ReadableStream> | <stream.Readable> | <AsyncIterator>- 返回: <Promise> Fulfills with a <Blob> containing the full contents of the stream.
import { blob } from 'node:stream/consumers';
const dataBlob = new Blob(['hello world from consumers!']);
const readable = dataBlob.stream();
const data = await blob(readable);
console.log(`from readable: ${data.size}`);
const { blob } = require('node:stream/consumers');
const dataBlob = new Blob(['hello world from consumers!']);
const readable = dataBlob.stream();
blob(readable).then((data) => {
console.log(`from readable: ${data.size}`);
});
streamConsumers.buffer(stream)
#
stream
<ReadableStream> | <stream.Readable> | <AsyncIterator>- 返回: <Promise> Fulfills with a <Buffer> containing the full contents of the stream.
import { buffer } from 'node:stream/consumers';
import { Readable } from 'node:stream';
import { Buffer } from 'node:buffer';
const dataBuffer = Buffer.from('hello world from consumers!');
const readable = Readable.from(dataBuffer);
const data = await buffer(readable);
console.log(`from readable: ${data.length}`);
const { buffer } = require('node:stream/consumers');
const { Readable } = require('node:stream');
const { Buffer } = require('node:buffer');
const dataBuffer = Buffer.from('hello world from consumers!');
const readable = Readable.from(dataBuffer);
buffer(readable).then((data) => {
console.log(`from readable: ${data.length}`);
});
streamConsumers.json(stream)
#
stream
<ReadableStream> | <stream.Readable> | <AsyncIterator>- 返回: <Promise> Fulfills with the contents of the stream parsed as a
UTF-8 encoded string that is then passed through
JSON.parse()
.
import { json } from 'node:stream/consumers';
import { Readable } from 'node:stream';
const items = Array.from(
{
length: 100
},
() => ({
message: 'hello world from consumers!'
})
);
const readable = Readable.from(JSON.stringify(items));
const data = await json(readable);
console.log(`from readable: ${data.length}`);
const { json } = require('node:stream/consumers');
const { Readable } = require('node:stream');
const items = Array.from(
{
length: 100
},
() => ({
message: 'hello world from consumers!'
})
);
const readable = Readable.from(JSON.stringify(items));
json(readable).then((data) => {
console.log(`from readable: ${data.length}`);
});
streamConsumers.text(stream)
#
stream
<ReadableStream> | <stream.Readable> | <AsyncIterator>- 返回: <Promise> Fulfills with the contents of the stream parsed as a UTF-8 encoded string.
import { json, text, blob, buffer } from 'node:stream/consumers';
import { Readable } from 'node:stream';
const readable = Readable.from('Hello world from consumers!');
const data = await text(readable);
console.log(`from readable: ${data.length}`);
const { text } = require('node:stream/consumers');
const { Readable } = require('node:stream');
const readable = Readable.from('Hello world from consumers!');
text(readable).then((data) => {
console.log(`from readable: ${data.length}`);
});