- assert 断言
- async_hooks 异步钩子
- async_hooks/context 异步上下文
- buffer 缓冲区
- C++插件
- C/C++插件(使用Node-API)
- C++嵌入器
- child_process 子进程
- cluster 集群
- CLI 命令行
- console 控制台
- Corepack 核心包
- crypto 加密
- crypto/webcrypto 网络加密
- debugger 调试器
- deprecation 弃用
- dgram 数据报
- diagnostics_channel 诊断通道
- dns 域名服务器
- domain 域
- Error 错误
- events 事件触发器
- fs 文件系统
- global 全局变量
- http 超文本传输协议
- http2 超文本传输协议2.0
- https 安全超文本传输协议
- inspector 检查器
- Intl 国际化
- module 模块
- module/cjs CommonJS模块
- module/esm ECMAScript模块
- module/package 包模块
- net 网络
- os 操作系统
- path 路径
- perf_hooks 性能钩子
- permission 权限
- process 进程
- punycode 域名代码
- querystring 查询字符串
- readline 逐行读取
- repl 交互式解释器
- report 诊断报告
- sea 单个可执行应用程序
- stream 流
- stream/web 网络流
- string_decoder 字符串解码器
- test 测试
- timers 定时器
- tls 安全传输层
- trace_events 跟踪事件
- tty 终端
- url 网址
- util 实用工具
- v8 引擎
- vm 虚拟机
- wasi 网络汇编系统接口
- worker_threads 工作线程
- zlib 压缩
Node.js v20.7.0 文档
- Node.js v20.7.0
-
►
目录
- 网络流 API
- 概述
- API
- 类:
ReadableStream
new ReadableStream([underlyingSource [, strategy]])
readableStream.locked
readableStream.cancel([reason])
readableStream.getReader([options])
readableStream.pipeThrough(transform[, options])
readableStream.pipeTo(destination[, options])
readableStream.tee()
readableStream.values([options])
- 异步迭代
- 与
postMessage()
一起转移
ReadableStream.from(iterable)
- 类:
ReadableStreamDefaultReader
- 类:
ReadableStreamBYOBReader
- 类:
ReadableStreamDefaultController
- 类:
ReadableByteStreamController
- 类:
ReadableStreamBYOBRequest
- 类:
WritableStream
- 类:
WritableStreamDefaultWriter
new WritableStreamDefaultWriter(stream)
writableStreamDefaultWriter.abort([reason])
writableStreamDefaultWriter.close()
writableStreamDefaultWriter.closed
writableStreamDefaultWriter.desiredSize
writableStreamDefaultWriter.ready
writableStreamDefaultWriter.releaseLock()
writableStreamDefaultWriter.write([chunk])
- 类:
WritableStreamDefaultController
- 类:
TransformStream
- 类:
TransformStreamDefaultController
- 类:
ByteLengthQueuingStrategy
- 类:
CountQueuingStrategy
- 类:
TextEncoderStream
- 类:
TextDecoderStream
- 类:
CompressionStream
- 类:
DecompressionStream
- 实用工具消费者
- 类:
- 网络流 API
-
►
导航
- assert 断言
- async_hooks 异步钩子
- async_hooks/context 异步上下文
- buffer 缓冲区
- C++插件
- C/C++插件(使用Node-API)
- C++嵌入器
- child_process 子进程
- cluster 集群
- CLI 命令行
- console 控制台
- Corepack 核心包
- crypto 加密
- crypto/webcrypto 网络加密
- debugger 调试器
- deprecation 弃用
- dgram 数据报
- diagnostics_channel 诊断通道
- dns 域名服务器
- domain 域
- Error 错误
- events 事件触发器
- fs 文件系统
- global 全局变量
- http 超文本传输协议
- http2 超文本传输协议2.0
- https 安全超文本传输协议
- inspector 检查器
- Intl 国际化
- module 模块
- module/cjs CommonJS模块
- module/esm ECMAScript模块
- module/package 包模块
- net 网络
- os 操作系统
- path 路径
- perf_hooks 性能钩子
- permission 权限
- process 进程
- punycode 域名代码
- querystring 查询字符串
- readline 逐行读取
- repl 交互式解释器
- report 诊断报告
- sea 单个可执行应用程序
- stream 流
- stream/web 网络流
- string_decoder 字符串解码器
- test 测试
- timers 定时器
- tls 安全传输层
- trace_events 跟踪事件
- tty 终端
- url 网址
- util 实用工具
- v8 引擎
- vm 虚拟机
- wasi 网络汇编系统接口
- worker_threads 工作线程
- zlib 压缩
- ► 其他版本
目录
- 网络流 API
- 概述
- API
- 类:
ReadableStream
new ReadableStream([underlyingSource [, strategy]])
readableStream.locked
readableStream.cancel([reason])
readableStream.getReader([options])
readableStream.pipeThrough(transform[, options])
readableStream.pipeTo(destination[, options])
readableStream.tee()
readableStream.values([options])
- 异步迭代
- 与
postMessage()
一起转移
ReadableStream.from(iterable)
- 类:
ReadableStreamDefaultReader
- 类:
ReadableStreamBYOBReader
- 类:
ReadableStreamDefaultController
- 类:
ReadableByteStreamController
- 类:
ReadableStreamBYOBRequest
- 类:
WritableStream
- 类:
WritableStreamDefaultWriter
new WritableStreamDefaultWriter(stream)
writableStreamDefaultWriter.abort([reason])
writableStreamDefaultWriter.close()
writableStreamDefaultWriter.closed
writableStreamDefaultWriter.desiredSize
writableStreamDefaultWriter.ready
writableStreamDefaultWriter.releaseLock()
writableStreamDefaultWriter.write([chunk])
- 类:
WritableStreamDefaultController
- 类:
TransformStream
- 类:
TransformStreamDefaultController
- 类:
ByteLengthQueuingStrategy
- 类:
CountQueuingStrategy
- 类:
TextEncoderStream
- 类:
TextDecoderStream
- 类:
CompressionStream
- 类:
DecompressionStream
- 实用工具消费者
- 类:
网络流 API#
WHATWG 流标准 的一个实现。
概述#
WHATWG 流标准(或 "网络流")定义了一个用于处理流数据的 API。 它类似于 Node.js 流 API,但出现较晚,已成为跨许多 JavaScript 环境流式传输数据的 "standard" API。
存在三种主要类型的对象:
ReadableStream
- 表示流式数据源。WritableStream
- 表示流数据的目的地。TransformStream
- 表示用于转换流数据的算法。
示例 ReadableStream
#
此示例创建一个简单的 ReadableStream
,它永远每秒推送一次当前的 performance.now()
时间戳。 异步迭代器用于从流中读取数据。
import {
ReadableStream,
} from 'node:stream/web';
import {
setInterval as every,
} from 'node:timers/promises';
import {
performance,
} from 'node:perf_hooks';
const SECOND = 1000;
const stream = new ReadableStream({
async start(controller) {
for await (const _ of every(SECOND))
controller.enqueue(performance.now());
},
});
for await (const value of stream)
console.log(value);
const {
ReadableStream,
} = require('node:stream/web');
const {
setInterval: every,
} = require('node:timers/promises');
const {
performance,
} = require('node:perf_hooks');
const SECOND = 1000;
const stream = new ReadableStream({
async start(controller) {
for await (const _ of every(SECOND))
controller.enqueue(performance.now());
},
});
(async () => {
for await (const value of stream)
console.log(value);
})();
API#
类:ReadableStream
#
new ReadableStream([underlyingSource [, strategy]])
#
underlyingSource
<Object>start
<Function> 创建ReadableStream
时立即调用的用户定义函数。controller
<ReadableStreamDefaultController> | <ReadableByteStreamController>- 返回:
undefined
或用undefined
实现的 promise。
pull
<Function> 当ReadableStream
内部队列未满时重复调用的用户定义函数。 该操作可以是同步的或异步的。 如果是异步的,该函数将不会被再次调用,直到先前返回的 promise 得到满足。controller
<ReadableStreamDefaultController> | <ReadableByteStreamController>- 返回:
undefined
兑现的 promise。
cancel
<Function> 取消ReadableStream
时调用的用户定义函数。reason
<any>- 返回:
undefined
兑现的 promise。
type
<string> 必须是'bytes'
或undefined
。autoAllocateChunkSize
<number> 仅在type
等于'bytes'
时使用。 当设置为非零值时,视图缓冲区会自动分配给ReadableByteStreamController.byobRequest
。 如果未设置,则必须使用流的内部队列通过默认读取器ReadableStreamDefaultReader
传输数据。
strategy
<Object>highWaterMark
<number> 应用背压之前的最大内部队列大小。size
<Function> 一个用户定义的函数,用于标识每个数据块的大小。
readableStream.locked
#
- 类型: <boolean> 如果此 <ReadableStream> 有活动的阅读器,则设置为
true
。
readableStream.locked
属性默认为 false
,当有活动读取器使用流的数据时切换为 true
。
readableStream.cancel([reason])
#
reason
<any>- 返回: 一旦取消完成,
undefined
就会兑现 promise。
readableStream.getReader([options])
#
options
<Object>mode
<string>'byob'
或undefined
- 返回: <ReadableStreamDefaultReader> | <ReadableStreamBYOBReader>
import { ReadableStream } from 'node:stream/web';
const stream = new ReadableStream();
const reader = stream.getReader();
console.log(await reader.read());
const { ReadableStream } = require('node:stream/web');
const stream = new ReadableStream();
const reader = stream.getReader();
reader.read().then(console.log);
使 readableStream.locked
成为 true
。
readableStream.pipeThrough(transform[, options])
#
transform
<Object>readable
<ReadableStream>transform.writable
将向其推送可能修改的数据的ReadableStream
是从该ReadableStream
接收的。writable
<WritableStream>ReadableStream
的数据将写入的WritableStream
。
options
<Object>preventAbort
<boolean> 当true
时,此ReadableStream
中的错误不会导致transform.writable
中止。preventCancel
<boolean> 当true
时,目标transform.writable
中的错误不会导致此ReadableStream
被取消。preventClose
<boolean>true
时,关闭这个ReadableStream
不会导致transform.writable
关闭。signal
<AbortSignal> 允许使用 <AbortController> 取消数据传输。
- 返回: <ReadableStream> 来自
transform.readable
。
将此 <ReadableStream> 连接到 transform
参数中提供的一对 <ReadableStream> 和 <WritableStream>,以便将来自此 <ReadableStream> 的数据写入 transform.writable
,可能进行转换,然后推送到 transform.readable
。 配置管道后,将返回 transform.readable
。
当管道操作处于活动状态时,使 readableStream.locked
变为 true
。
import {
ReadableStream,
TransformStream,
} from 'node:stream/web';
const stream = new ReadableStream({
start(controller) {
controller.enqueue('a');
},
});
const transform = new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
},
});
const transformedStream = stream.pipeThrough(transform);
for await (const chunk of transformedStream)
console.log(chunk);
// Prints: A
const {
ReadableStream,
TransformStream,
} = require('node:stream/web');
const stream = new ReadableStream({
start(controller) {
controller.enqueue('a');
},
});
const transform = new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
},
});
const transformedStream = stream.pipeThrough(transform);
(async () => {
for await (const chunk of transformedStream)
console.log(chunk);
// Prints: A
})();
readableStream.pipeTo(destination[, options])
#
destination
<WritableStream> 将写入此ReadableStream
的数据的 <WritableStream>。options
<Object>preventAbort
<boolean> 当true
时,此ReadableStream
中的错误不会导致destination
中止。preventCancel
<boolean> 当true
时,destination
中的错误不会导致此ReadableStream
被取消。preventClose
<boolean>true
时,关闭这个ReadableStream
不会导致destination
关闭。signal
<AbortSignal> 允许使用 <AbortController> 取消数据传输。
- 返回:
undefined
兑现的 promise
当管道操作处于活动状态时,使 readableStream.locked
变为 true
。
readableStream.tee()
#
返回一对新的 <ReadableStream> 实例,此 ReadableStream
的数据将转发到该实例。 每个人都会收到相同的数据。
使 readableStream.locked
成为 true
。
readableStream.values([options])
#
options
<Object>preventCancel
<boolean> 当true
时,防止在异步迭代器突然终止时关闭 <ReadableStream>。 默认:false
。
创建并返回可用于使用此 ReadableStream
的数据的异步迭代器。
当异步迭代器处于活动状态时,使 readableStream.locked
成为 true
。
import { Buffer } from 'node:buffer';
const stream = new ReadableStream(getSomeSource());
for await (const chunk of stream.values({ preventCancel: true }))
console.log(Buffer.from(chunk).toString());
异步迭代#
<ReadableStream> 对象支持使用 for await
语法的异步迭代器协议。
import { Buffer } from 'node:buffer';
const stream = new ReadableStream(getSomeSource());
for await (const chunk of stream)
console.log(Buffer.from(chunk).toString());
异步迭代器将消耗 <ReadableStream> 直到它终止。
默认情况下,如果异步迭代器提前退出(通过 break
、return
或 throw
),<ReadableStream> 将被关闭。 为防止 <ReadableStream> 自动关闭,使用 readableStream.values()
方法获取异步迭代器并将 preventCancel
选项设置为 true
。
<ReadableStream> 不得锁定(即,它不得有现有的活动阅读器)。 在异步迭代期间,<ReadableStream> 将被锁定。
与 postMessage()
一起转移#
可以使用 <MessagePort> 传输 <ReadableStream> 实例。
const stream = new ReadableStream(getReadableSourceSomehow());
const { port1, port2 } = new MessageChannel();
port1.onmessage = ({ data }) => {
data.getReader().read().then((chunk) => {
console.log(chunk);
});
};
port2.postMessage(stream, [stream]);
ReadableStream.from(iterable)
#
iterable
<Iterable> 实现Symbol.asyncIterator
或Symbol.iterator
可迭代协议的对象。
一种从可迭代对象创建新 <ReadableStream> 的实用方法。
import { ReadableStream } from 'node:stream/web';
async function* asyncIterableGenerator() {
yield 'a';
yield 'b';
yield 'c';
}
const stream = ReadableStream.from(asyncIterableGenerator());
for await (const chunk of stream)
console.log(chunk); // Prints: 'a', 'b', 'c'
const { ReadableStream } = require('node:stream/web');
async function* asyncIterableGenerator() {
yield 'a';
yield 'b';
yield 'c';
}
(async () => {
const stream = ReadableStream.from(asyncIterableGenerator());
for await (const chunk of stream)
console.log(chunk); // Prints: 'a', 'b', 'c'
})();
类:ReadableStreamDefaultReader
#
默认情况下,不带参数调用 readableStream.getReader()
将返回 ReadableStreamDefaultReader
的实例。 默认读取器将通过流传递的数据块视为不透明值,这允许 <ReadableStream> 通常使用任何 JavaScript 值。
new ReadableStreamDefaultReader(stream)
#
stream
<ReadableStream>
创建锁定到给定 <ReadableStream> 的新 <ReadableStreamDefaultReader>。
readableStreamDefaultReader.cancel([reason])
#
reason
<any>- 返回:
undefined
兑现的 promise。
取消 <ReadableStream> 并返回一个在基础流被取消时完成的 promise。
readableStreamDefaultReader.closed
#
- 类型: <Promise> 当关联的 <ReadableStream> 关闭时用
undefined
完成,或者如果流错误或读取器的锁在流完成关闭之前被释放则被拒绝。
readableStreamDefaultReader.read()
#
- 返回: 用对象实现的 promise:
value
<ArrayBuffer>done
<boolean>
从底层 <ReadableStream> 请求下一个数据块,并返回一个 promise,一旦数据可用,该 promise 就会实现。
readableStreamDefaultReader.releaseLock()
#
释放此读者对底层 <ReadableStream> 的锁定。
类:ReadableStreamBYOBReader
#
ReadableStreamBYOBReader
是面向字节的 <ReadableStream> 的替代消费者(在创建 ReadableStream
时将 underlyingSource.type
设置为 'bytes'
)。
BYOB
是 "带上你自己的缓冲区" 的缩写。 这是一种模式,可以更有效地读取面向字节的数据,避免多余的复制。
import {
open,
} from 'node:fs/promises';
import {
ReadableStream,
} from 'node:stream/web';
import { Buffer } from 'node:buffer';
class Source {
type = 'bytes';
autoAllocateChunkSize = 1024;
async start(controller) {
this.file = await open(new URL(import.meta.url));
this.controller = controller;
}
async pull(controller) {
const view = controller.byobRequest?.view;
const {
bytesRead,
} = await this.file.read({
buffer: view,
offset: view.byteOffset,
length: view.byteLength,
});
if (bytesRead === 0) {
await this.file.close();
this.controller.close();
}
controller.byobRequest.respond(bytesRead);
}
}
const stream = new ReadableStream(new Source());
async function read(stream) {
const reader = stream.getReader({ mode: 'byob' });
const chunks = [];
let result;
do {
result = await reader.read(Buffer.alloc(100));
if (result.value !== undefined)
chunks.push(Buffer.from(result.value));
} while (!result.done);
return Buffer.concat(chunks);
}
const data = await read(stream);
console.log(Buffer.from(data).toString());
new ReadableStreamBYOBReader(stream)
#
stream
<ReadableStream>
创建锁定到给定 <ReadableStream> 的新 ReadableStreamBYOBReader
。
readableStreamBYOBReader.cancel([reason])
#
reason
<any>- 返回:
undefined
兑现的 promise。
取消 <ReadableStream> 并返回一个在基础流被取消时完成的 promise。
readableStreamBYOBReader.closed
#
- 类型: <Promise> 当关联的 <ReadableStream> 关闭时用
undefined
完成,或者如果流错误或读取器的锁在流完成关闭之前被释放则被拒绝。
readableStreamBYOBReader.read(view)
#
view
<Buffer> | <TypedArray> | <DataView>- 返回: 用对象实现的 promise:
value
<ArrayBuffer>done
<boolean>
从底层 <ReadableStream> 请求下一个数据块,并返回一个 promise,一旦数据可用,该 promise 就会实现。
不要将池化的 <Buffer> 对象实例传递给此方法。
池化的 Buffer
对象是使用 Buffer.allocUnsafe()
或 Buffer.from()
创建的,或者通常由各种 node:fs
模块回调返回。 这些类型的 Buffer
使用共享的底层 <ArrayBuffer> 对象,该对象包含来自所有池化的 Buffer
实例的所有数据。 当 Buffer
、<TypedArray> 或 <DataView> 传递给 readableStreamBYOBReader.read()
时,视图的底层 ArrayBuffer
被分离,使该 ArrayBuffer
上可能存在的所有现有视图无效。 这可能会给你的应用带来灾难性的后果。
readableStreamBYOBReader.releaseLock()
#
释放此读者对底层 <ReadableStream> 的锁定。
类:ReadableStreamDefaultController
#
每个 <ReadableStream> 都有一个控制器,负责流队列的内部状态和管理。 ReadableStreamDefaultController
是 ReadableStream
的默认控制器实现,它不是面向字节的。
readableStreamDefaultController.close()
#
关闭与此控制器关联的 <ReadableStream>。
readableStreamDefaultController.desiredSize
#
- 类型: <number>
返回填充 <ReadableStream> 队列的剩余数据量。
readableStreamDefaultController.enqueue([chunk])
#
chunk
<any>
将新数据块附加到 <ReadableStream> 的队列。
readableStreamDefaultController.error([error])
#
error
<any>
触发导致 <ReadableStream> 出错并关闭的错误信号。
类:ReadableByteStreamController
#
每个 <ReadableStream> 都有一个控制器,负责流队列的内部状态和管理。 ReadableByteStreamController
用于面向字节的 ReadableStream
。
readableByteStreamController.byobRequest
#
readableByteStreamController.close()
#
关闭与此控制器关联的 <ReadableStream>。
readableByteStreamController.desiredSize
#
- 类型: <number>
返回填充 <ReadableStream> 队列的剩余数据量。
readableByteStreamController.enqueue(chunk)
#
chunk
: <Buffer> | <TypedArray> | <DataView>
将新数据块附加到 <ReadableStream> 的队列。
readableByteStreamController.error([error])
#
error
<any>
触发导致 <ReadableStream> 出错并关闭的错误信号。
类:ReadableStreamBYOBRequest
#
在面向字节的流中使用 ReadableByteStreamController
和使用 ReadableStreamBYOBReader
时,readableByteStreamController.byobRequest
属性提供对表示当前读取请求的 ReadableStreamBYOBRequest
实例的访问。 该对象用于获取为读取请求填充而提供的 ArrayBuffer
/TypedArray
的访问权限,并提供用于触发数据已提供信号的方法。
readableStreamBYOBRequest.respond(bytesWritten)
#
bytesWritten
<number>
表示已将 bytesWritten
个字节写入 readableStreamBYOBRequest.view
。
readableStreamBYOBRequest.respondWithNewView(view)
#
view
<Buffer> | <TypedArray> | <DataView>
表示请求已通过写入新的 Buffer
、TypedArray
或 DataView
的字节得到满足。
readableStreamBYOBRequest.view
#
- 类型: <Buffer> | <TypedArray> | <DataView>
类:WritableStream
#
WritableStream
是流数据发送到的目的地。
import {
WritableStream,
} from 'node:stream/web';
const stream = new WritableStream({
write(chunk) {
console.log(chunk);
},
});
await stream.getWriter().write('Hello World');
new WritableStream([underlyingSink[, strategy]])
#
underlyingSink
<Object>start
<Function> 创建WritableStream
时立即调用的用户定义函数。controller
<WritableStreamDefaultController>- 返回:
undefined
或用undefined
实现的 promise。
write
<Function> 将数据块写入WritableStream
时调用的用户定义函数。chunk
<any>controller
<WritableStreamDefaultController>- 返回:
undefined
兑现的 promise。
close
<Function>WritableStream
关闭时调用的用户定义函数。- 返回:
undefined
兑现的 promise。
- 返回:
abort
<Function> 调用以突然关闭WritableStream
的用户定义函数。reason
<any>- 返回:
undefined
兑现的 promise。
type
<any>type
选项保留供将来使用,必须未定义。
strategy
<Object>highWaterMark
<number> 应用背压之前的最大内部队列大小。size
<Function> 一个用户定义的函数,用于标识每个数据块的大小。
writableStream.abort([reason])
#
reason
<any>- 返回:
undefined
兑现的 promise。
突然终止 WritableStream
。 所有排队的写入都将被取消,并拒绝相关的 promise。
writableStream.close()
#
- 返回:
undefined
兑现的 promise。
当不需要额外写入时关闭 WritableStream
。
writableStream.getWriter()
#
创建并返回一个新的写入器实例,可用于将数据写入 WritableStream
。
writableStream.locked
#
- 类型: <boolean>
writableStream.locked
属性默认为 false
,当有活动写入器附加到此 WritableStream
时切换为 true
。
使用 postMessage() 传输#
可以使用 <MessagePort> 传输 <WritableStream> 实例。
const stream = new WritableStream(getWritableSinkSomehow());
const { port1, port2 } = new MessageChannel();
port1.onmessage = ({ data }) => {
data.getWriter().write('hello');
};
port2.postMessage(stream, [stream]);
类:WritableStreamDefaultWriter
#
new WritableStreamDefaultWriter(stream)
#
stream
<WritableStream>
创建锁定到给定 WritableStream
的新 WritableStreamDefaultWriter
。
writableStreamDefaultWriter.abort([reason])
#
reason
<any>- 返回:
undefined
兑现的 promise。
突然终止 WritableStream
。 所有排队的写入都将被取消,并拒绝相关的 promise。
writableStreamDefaultWriter.close()
#
- 返回:
undefined
兑现的 promise。
当不需要额外写入时关闭 WritableStream
。
writableStreamDefaultWriter.closed
#
- 类型: <Promise> 当关联的 <WritableStream> 关闭时用
undefined
完成,或者如果在流完成关闭之前流错误或作者的锁被释放则拒绝。
writableStreamDefaultWriter.desiredSize
#
- 类型: <number>
填充 <WritableStream> 队列所需的数据量。
writableStreamDefaultWriter.ready
#
- 类型: <Promise> 当写入器准备好使用时,用
undefined
完成。
writableStreamDefaultWriter.releaseLock()
#
释放作者对底层 <ReadableStream> 的锁定。
writableStreamDefaultWriter.write([chunk])
#
chunk
: <any>- 返回:
undefined
兑现的 promise。
将新数据块附加到 <WritableStream> 的队列。
类:WritableStreamDefaultController
#
WritableStreamDefaultController
管理 <WritableStream> 的内部状态。
writableStreamDefaultController.error([error])
#
error
<any>
由用户代码调用以表示在处理 WritableStream
数据时发生了错误。 调用时,<WritableStream> 将被中止,当前挂起的写入被取消。
writableStreamDefaultController.signal
#
- 类型: <AbortSignal>
AbortSignal
可用于在 <WritableStream> 中止时取消挂起的写入或关闭操作。
类:TransformStream
#
TransformStream
由 <ReadableStream> 和 <WritableStream> 组成,它们相互连接,以便在将写入 WritableStream
的数据推送到 ReadableStream
的队列之前接收并可能进行转换。
import {
TransformStream,
} from 'node:stream/web';
const transform = new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
},
});
await Promise.all([
transform.writable.getWriter().write('A'),
transform.readable.getReader().read(),
]);
new TransformStream([transformer[, writableStrategy[, readableStrategy]]])
#
transformer
<Object>start
<Function> 创建TransformStream
时立即调用的用户定义函数。controller
<TransformStreamDefaultController>- 返回:
undefined
或用undefined
实现的 promise
transform
<Function> 一个用户定义的函数,它接收并可能修改写入transformStream.writable
的数据块,然后再将其转发到transformStream.readable
。chunk
<any>controller
<TransformStreamDefaultController>- 返回:
undefined
兑现的 promise。
flush
<Function> 在TransformStream
的可写端关闭之前立即调用的用户定义函数,表示转换过程结束。controller
<TransformStreamDefaultController>- 返回:
undefined
兑现的 promise。
readableType
<any>readableType
选项保留供将来使用,必须是undefined
。writableType
<any>writableType
选项保留供将来使用,必须是undefined
。
writableStrategy
<Object>highWaterMark
<number> 应用背压之前的最大内部队列大小。size
<Function> 一个用户定义的函数,用于标识每个数据块的大小。
readableStrategy
<Object>highWaterMark
<number> 应用背压之前的最大内部队列大小。size
<Function> 一个用户定义的函数,用于标识每个数据块的大小。
transformStream.readable
#
- 类型: <ReadableStream>
transformStream.writable
#
- 类型: <WritableStream>
使用 postMessage() 传输#
可以使用 <MessagePort> 传输 <TransformStream> 实例。
const stream = new TransformStream();
const { port1, port2 } = new MessageChannel();
port1.onmessage = ({ data }) => {
const { writable, readable } = data;
// ...
};
port2.postMessage(stream, [stream]);
类:TransformStreamDefaultController
#
TransformStreamDefaultController
管理 TransformStream
的内部状态。
transformStreamDefaultController.desiredSize
#
- 类型: <number>
填充可读端队列所需的数据量。
transformStreamDefaultController.enqueue([chunk])
#
chunk
<any>
将一大块数据附加到可读端的队列中。
transformStreamDefaultController.error([reason])
#
reason
<any>
向可读和可写端触发信号,表明在处理转换数据时发生了错误,导致双方突然关闭。
transformStreamDefaultController.terminate()
#
关闭传输的可读端并导致可写端因错误而突然关闭。
类:ByteLengthQueuingStrategy
#
new ByteLengthQueuingStrategy(init)
#
byteLengthQueuingStrategy.highWaterMark
#
- 类型: <number>
byteLengthQueuingStrategy.size
#
- 类型: <Function>
类:CountQueuingStrategy
#
new CountQueuingStrategy(init)
#
countQueuingStrategy.highWaterMark
#
- 类型: <number>
countQueuingStrategy.size
#
- 类型: <Function>
类:TextEncoderStream
#
new TextEncoderStream()
#
创建新的 TextEncoderStream
实例。
textEncoderStream.encoding
#
- 类型: <string>
TextEncoderStream
实例支持的编码。
textEncoderStream.readable
#
- 类型: <ReadableStream>
textEncoderStream.writable
#
- 类型: <WritableStream>
类:TextDecoderStream
#
new TextDecoderStream([encoding[, options]])
#
创建新的 TextDecoderStream
实例。
textDecoderStream.encoding
#
- 类型: <string>
TextDecoderStream
实例支持的编码。
textDecoderStream.fatal
#
- 类型: <boolean>
如果解码错误导致抛出 TypeError
,则该值将为 true
。
textDecoderStream.ignoreBOM
#
- 类型: <boolean>
如果解码结果将包含字节顺序标记,则该值将为 true
。
textDecoderStream.readable
#
- 类型: <ReadableStream>
textDecoderStream.writable
#
- 类型: <WritableStream>
类:CompressionStream
#
new CompressionStream(format)
#
format
<string>'deflate'
或'gzip'
之一。
compressionStream.readable
#
- 类型: <ReadableStream>
compressionStream.writable
#
- 类型: <WritableStream>
类:DecompressionStream
#
new DecompressionStream(format)
#
format
<string>'deflate'
或'gzip'
之一。
decompressionStream.readable
#
- 类型: <ReadableStream>
decompressionStream.writable
#
- 类型: <WritableStream>
实用工具消费者#
实用程序消费者函数提供了用于消费流的通用选项。
使用以下方式访问它们:
import {
arrayBuffer,
blob,
buffer,
json,
text,
} from 'node:stream/consumers';
const {
arrayBuffer,
blob,
buffer,
json,
text,
} = require('node:stream/consumers');
streamConsumers.arrayBuffer(stream)
#
stream
<ReadableStream> | <stream.Readable> | <AsyncIterator>- 返回: <Promise> 使用包含流的全部内容的
ArrayBuffer
来实现。
import { arrayBuffer } from 'node:stream/consumers';
import { Readable } from 'node:stream';
import { TextEncoder } from 'node:util';
const encoder = new TextEncoder();
const dataArray = encoder.encode('hello world from consumers!');
const readable = Readable.from(dataArray);
const data = await arrayBuffer(readable);
console.log(`from readable: ${data.byteLength}`);
// Prints: from readable: 76
const { arrayBuffer } = require('node:stream/consumers');
const { Readable } = require('node:stream');
const { TextEncoder } = require('node:util');
const encoder = new TextEncoder();
const dataArray = encoder.encode('hello world from consumers!');
const readable = Readable.from(dataArray);
arrayBuffer(readable).then((data) => {
console.log(`from readable: ${data.byteLength}`);
// Prints: from readable: 76
});
streamConsumers.blob(stream)
#
stream
<ReadableStream> | <stream.Readable> | <AsyncIterator>- 返回: <Promise> 用包含流的全部内容的 <Blob> 来实现。
import { blob } from 'node:stream/consumers';
const dataBlob = new Blob(['hello world from consumers!']);
const readable = dataBlob.stream();
const data = await blob(readable);
console.log(`from readable: ${data.size}`);
// Prints: from readable: 27
const { blob } = require('node:stream/consumers');
const dataBlob = new Blob(['hello world from consumers!']);
const readable = dataBlob.stream();
blob(readable).then((data) => {
console.log(`from readable: ${data.size}`);
// Prints: from readable: 27
});
streamConsumers.buffer(stream)
#
stream
<ReadableStream> | <stream.Readable> | <AsyncIterator>- 返回: <Promise> 用包含流的全部内容的 <Buffer> 来实现。
import { buffer } from 'node:stream/consumers';
import { Readable } from 'node:stream';
import { Buffer } from 'node:buffer';
const dataBuffer = Buffer.from('hello world from consumers!');
const readable = Readable.from(dataBuffer);
const data = await buffer(readable);
console.log(`from readable: ${data.length}`);
// Prints: from readable: 27
const { buffer } = require('node:stream/consumers');
const { Readable } = require('node:stream');
const { Buffer } = require('node:buffer');
const dataBuffer = Buffer.from('hello world from consumers!');
const readable = Readable.from(dataBuffer);
buffer(readable).then((data) => {
console.log(`from readable: ${data.length}`);
// Prints: from readable: 27
});
streamConsumers.json(stream)
#
stream
<ReadableStream> | <stream.Readable> | <AsyncIterator>- 返回: <Promise> 将流的内容解析为 UTF-8 编码字符串,然后通过
JSON.parse()
传递。
import { json } from 'node:stream/consumers';
import { Readable } from 'node:stream';
const items = Array.from(
{
length: 100,
},
() => ({
message: 'hello world from consumers!',
}),
);
const readable = Readable.from(JSON.stringify(items));
const data = await json(readable);
console.log(`from readable: ${data.length}`);
// Prints: from readable: 100
const { json } = require('node:stream/consumers');
const { Readable } = require('node:stream');
const items = Array.from(
{
length: 100,
},
() => ({
message: 'hello world from consumers!',
}),
);
const readable = Readable.from(JSON.stringify(items));
json(readable).then((data) => {
console.log(`from readable: ${data.length}`);
// Prints: from readable: 100
});
streamConsumers.text(stream)
#
stream
<ReadableStream> | <stream.Readable> | <AsyncIterator>- 返回: <Promise> 满足解析为 UTF-8 编码字符串的流的内容。
import { text } from 'node:stream/consumers';
import { Readable } from 'node:stream';
const readable = Readable.from('Hello world from consumers!');
const data = await text(readable);
console.log(`from readable: ${data.length}`);
// Prints: from readable: 27
const { text } = require('node:stream/consumers');
const { Readable } = require('node:stream');
const readable = Readable.from('Hello world from consumers!');
text(readable).then((data) => {
console.log(`from readable: ${data.length}`);
// Prints: from readable: 27
});