Node.js v16.20.1 文档


网络流 API#

稳定性: 1 - 实验

WHATWG 流标准 的一个实现。

An implementation of the WHATWG Streams Standard.

import {
  ReadableStream,
  WritableStream,
  TransformStream,
} from 'node:stream/web';const {
  ReadableStream,
  WritableStream,
  TransformStream,
} = require('stream/web');

概述#

WHATWG 流标准(或 "网络流")定义了一个用于处理流数据的 API。 它类似于 Node.js API,但出现较晚,已成为跨许多 JavaScript 环境流式传输数据的 "standard" API。

The WHATWG Streams Standard (or "web streams") defines an API for handling streaming data. It is similar to the Node.js Streams API but emerged later and has become the "standard" API for streaming data across many JavaScript environments.

存在三种主要类型的对象:

There are three primary types of objects:

  • ReadableStream - 表示流式数据源。
  • WritableStream - 表示流数据的目的地。
  • TransformStream - 表示用于转换流数据的算法。

示例 ReadableStream#

此示例创建一个简单的 ReadableStream,它永远每秒推送一次当前的 performance.now() 时间戳。 异步迭代器用于从流中读取数据。

This example creates a simple ReadableStream that pushes the current performance.now() timestamp once every second forever. An async iterable is used to read the data from the stream.

import {
  ReadableStream
} from 'node:stream/web';

import {
  setInterval as every
} from 'node:timers/promises';

import {
  performance
} from 'node:perf_hooks';

const SECOND = 1000;

const stream = new ReadableStream({
  async start(controller) {
    for await (const _ of every(SECOND))
      controller.enqueue(performance.now());
  }
});

for await (const value of stream)
  console.log(value);const {
  ReadableStream
} = require('node:stream/web');

const {
  setInterval: every
} = require('node:timers/promises');

const {
  performance
} = require('node:perf_hooks');

const SECOND = 1000;

const stream = new ReadableStream({
  async start(controller) {
    for await (const _ of every(SECOND))
      controller.enqueue(performance.now());
  }
});

(async () => {
  for await (const value of stream)
    console.log(value);
})();

API#

类:ReadableStream#

new ReadableStream([underlyingSource [, strategy]])#

readableStream.locked#

readableStream.locked 属性默认为 false,当有活动读取器使用流的数据时切换为 true

The readableStream.locked property is false by default, and is switched to true while there is an active reader consuming the stream's data.

readableStream.cancel([reason])#
  • reason <any>
  • 返回: 一旦取消完成,undefined 就会兑现 promise。

readableStream.getReader([options])#
import { ReadableStream } from 'node:stream/web';

const stream = new ReadableStream();

const reader = stream.getReader();

console.log(await reader.read());const { ReadableStream } = require('node:stream/web');

const stream = new ReadableStream();

const reader = stream.getReader();

reader.read().then(console.log);

使 readableStream.locked 成为 true

Causes the readableStream.locked to be true.

readableStream.pipeThrough(transform[, options])#
  • transform <Object>
    • readable <ReadableStream> transform.writable 将向其推送可能修改的数据的 ReadableStream 是从该 ReadableStream 接收的。
    • writable <WritableStream> ReadableStream 的数据将写入的 WritableStream
  • options <Object>
    • preventAbort <boolean>true 时,此 ReadableStream 中的错误不会导致 transform.writable 中止。
    • preventCancel <boolean>true 时,目标 transform.writable 中的错误不会导致此 ReadableStream 被取消。
    • preventClose <boolean> true 时,关闭这个 ReadableStream 不会导致 transform.writable 关闭。
    • signal <AbortSignal> 允许使用 <AbortController> 取消数据传输。
  • 返回: <ReadableStream> 来自 transform.readable

将此 <ReadableStream> 连接到 transform 参数中提供的一对 <ReadableStream><WritableStream>,以便将来自此 <ReadableStream> 的数据写入 transform.writable,可能进行转换,然后推送到 transform.readable。 配置管道后,将返回 transform.readable

Connects this <ReadableStream> to the pair of <ReadableStream> and <WritableStream> provided in the transform argument such that the data from this <ReadableStream> is written in to transform.writable, possibly transformed, then pushed to transform.readable. Once the pipeline is configured, transform.readable is returned.

当管道操作处于活动状态时,使 readableStream.locked 变为 true

Causes the readableStream.locked to be true while the pipe operation is active.

import {
  ReadableStream,
  TransformStream,
} from 'node:stream/web';

const stream = new ReadableStream({
  start(controller) {
    controller.enqueue('a');
  },
});

const transform = new TransformStream({
  transform(chunk, controller) {
    controller.enqueue(chunk.toUpperCase());
  }
});

const transformedStream = stream.pipeThrough(transform);

for await (const chunk of transformedStream)
  console.log(chunk);const {
  ReadableStream,
  TransformStream,
} = require('node:stream/web');

const stream = new ReadableStream({
  start(controller) {
    controller.enqueue('a');
  },
});

const transform = new TransformStream({
  transform(chunk, controller) {
    controller.enqueue(chunk.toUpperCase());
  }
});

const transformedStream = stream.pipeThrough(transform);

(async () => {
  for await (const chunk of transformedStream)
    console.log(chunk);
})();

readableStream.pipeTo(destination, options)#
  • destination <WritableStream> 将写入此 ReadableStream 的数据的 <WritableStream>
  • options <Object>
    • preventAbort <boolean>true 时,此 ReadableStream 中的错误不会导致 destination 中止。
    • preventCancel <boolean>true 时,destination 中的错误不会导致此 ReadableStream 被取消。
    • preventClose <boolean> true 时,关闭这个 ReadableStream 不会导致 destination 关闭。
    • signal <AbortSignal> 允许使用 <AbortController> 取消数据传输。
  • 返回: undefined 兑现的 promise

当管道操作处于活动状态时,使 readableStream.locked 变为 true

Causes the readableStream.locked to be true while the pipe operation is active.

readableStream.tee()#

返回一对新的 <ReadableStream> 实例,此 ReadableStream 的数据将转发到该实例。 每个人都会收到相同的数据。

Returns a pair of new <ReadableStream> instances to which this ReadableStream's data will be forwarded. Each will receive the same data.

使 readableStream.locked 成为 true

Causes the readableStream.locked to be true.

readableStream.values([options])#

创建并返回可用于使用此 ReadableStream 的数据的异步迭代器。

Creates and returns an async iterator usable for consuming this ReadableStream's data.

当异步迭代器处于活动状态时,使 readableStream.locked 成为 true

Causes the readableStream.locked to be true while the async iterator is active.

import { Buffer } from 'node:buffer';

const stream = new ReadableStream(getSomeSource());

for await (const chunk of stream.values({ preventCancel: true }))
  console.log(Buffer.from(chunk).toString()); 

异步迭代#

<ReadableStream> 对象支持使用 for await 语法的异步迭代器协议。

The <ReadableStream> object supports the async iterator protocol using for await syntax.

import { Buffer } from 'node:buffer';

const stream = new ReadableStream(getSomeSource());

for await (const chunk of stream)
  console.log(Buffer.from(chunk).toString()); 

异步迭代器将消耗 <ReadableStream> 直到它终止。

The async iterator will consume the <ReadableStream> until it terminates.

默认情况下,如果异步迭代器提前退出(通过 breakreturnthrow),<ReadableStream> 将被关闭。 为防止 <ReadableStream> 自动关闭,使用 readableStream.values() 方法获取异步迭代器并将 preventCancel 选项设置为 true

By default, if the async iterator exits early (via either a break, return, or a throw), the <ReadableStream> will be closed. To prevent automatic closing of the <ReadableStream>, use the readableStream.values() method to acquire the async iterator and set the preventCancel option to true.

<ReadableStream> 不得锁定(即,它不得有现有的活动阅读器)。 在异步迭代期间,<ReadableStream> 将被锁定。

The <ReadableStream> must not be locked (that is, it must not have an existing active reader). During the async iteration, the <ReadableStream> will be locked.

postMessage() 一起转移#

可以使用 <MessagePort> 传输 <ReadableStream> 实例。

A <ReadableStream> instance can be transferred using a <MessagePort>.

const stream = new ReadableStream(getReadableSourceSomehow());

const { port1, port2 } = new MessageChannel();

port1.onmessage = ({ data }) => {
  data.getReader().read().then((chunk) => {
    console.log(chunk);
  });
};

port2.postMessage(stream, [stream]); 

类:ReadableStreamDefaultReader#

默认情况下,不带参数调用 readableStream.getReader() 将返回 ReadableStreamDefaultReader 的实例。 默认读取器将通过流传递的数据块视为不透明值,这允许 <ReadableStream> 通常使用任何 JavaScript 值。

By default, calling readableStream.getReader() with no arguments will return an instance of ReadableStreamDefaultReader. The default reader treats the chunks of data passed through the stream as opaque values, which allows the <ReadableStream> to work with generally any JavaScript value.

new ReadableStreamDefaultReader(stream)#

创建锁定到给定 <ReadableStream> 的新 <ReadableStreamDefaultReader>

Creates a new <ReadableStreamDefaultReader> that is locked to the given <ReadableStream>.

readableStreamDefaultReader.cancel([reason])#
  • reason <any>
  • 返回: undefined 兑现的 promise。

取消 <ReadableStream> 并返回一个在基础流被取消时完成的 promise。

Cancels the <ReadableStream> and returns a promise that is fulfilled when the underlying stream has been canceled.

readableStreamDefaultReader.closed#
  • 类型: <Promise> 当关联的 <ReadableStream> 关闭时用 undefined 完成,或者如果流错误或读取器的锁在流完成关闭之前被释放则被拒绝。

readableStreamDefaultReader.read()#

从底层 <ReadableStream> 请求下一个数据块,并返回一个 promise,一旦数据可用,该 promise 就会实现。

Requests the next chunk of data from the underlying <ReadableStream> and returns a promise that is fulfilled with the data once it is available.

readableStreamDefaultReader.releaseLock()#

释放此读者对底层 <ReadableStream> 的锁定。

Releases this reader's lock on the underlying <ReadableStream>.

类:ReadableStreamBYOBReader#

ReadableStreamBYOBReader 是面向字节的 <ReadableStream> 的替代消费者(在创建 ReadableStream 时将 underlyingSource.type 设置为 'bytes')。

The ReadableStreamBYOBReader is an alternative consumer for byte-oriented <ReadableStream>s (those that are created with underlyingSource.type set equal to 'bytes' when the ReadableStream was created).

BYOB 是 "带上你自己的缓冲区" 的缩写。 这是一种模式,可以更有效地读取面向字节的数据,避免多余的复制。

The BYOB is short for "bring your own buffer". This is a pattern that allows for more efficient reading of byte-oriented data that avoids extraneous copying.

import {
  open
} from 'node:fs/promises';

import {
  ReadableStream
} from 'node:stream/web';

import { Buffer } from 'node:buffer';

class Source {
  type = 'bytes';
  autoAllocateChunkSize = 1024;

  async start(controller) {
    this.file = await open(new URL(import.meta.url));
    this.controller = controller;
  }

  async pull(controller) {
    const view = controller.byobRequest?.view;
    const {
      bytesRead,
    } = await this.file.read({
      buffer: view,
      offset: view.byteOffset,
      length: view.byteLength
    });

    if (bytesRead === 0) {
      await this.file.close();
      this.controller.close();
    }
    controller.byobRequest.respond(bytesRead);
  }
}

const stream = new ReadableStream(new Source());

async function read(stream) {
  const reader = stream.getReader({ mode: 'byob' });

  const chunks = [];
  let result;
  do {
    result = await reader.read(Buffer.alloc(100));
    if (result.value !== undefined)
      chunks.push(Buffer.from(result.value));
  } while (!result.done);

  return Buffer.concat(chunks);
}

const data = await read(stream);
console.log(Buffer.from(data).toString()); 

new ReadableStreamBYOBReader(stream)#

创建锁定到给定 <ReadableStream> 的新 ReadableStreamBYOBReader

Creates a new ReadableStreamBYOBReader that is locked to the given <ReadableStream>.

readableStreamBYOBReader.cancel([reason])#
  • reason <any>
  • 返回: undefined 兑现的 promise。

取消 <ReadableStream> 并返回一个在基础流被取消时完成的 promise。

Cancels the <ReadableStream> and returns a promise that is fulfilled when the underlying stream has been canceled.

readableStreamBYOBReader.closed#
  • 类型: <Promise> 当关联的 <ReadableStream> 关闭时用 undefined 完成,或者如果流错误或读取器的锁在流完成关闭之前被释放则被拒绝。

readableStreamBYOBReader.read(view)#

从底层 <ReadableStream> 请求下一个数据块,并返回一个 promise,一旦数据可用,该 promise 就会实现。

Requests the next chunk of data from the underlying <ReadableStream> and returns a promise that is fulfilled with the data once it is available.

不要将池化的 <Buffer> 对象实例传递给此方法。 池化的 Buffer 对象是使用 Buffer.allocUnsafe()Buffer.from() 创建的,或者通常由各种 node:fs 模块回调返回。 这些类型的 Buffer 使用共享的底层 <ArrayBuffer> 对象,该对象包含来自所有池化的 Buffer 实例的所有数据。 当 Buffer<TypedArray><DataView> 传递给 readableStreamBYOBReader.read() 时,视图的底层 ArrayBuffer 被分离,使该 ArrayBuffer 上可能存在的所有现有视图无效。 这可能会给你的应用带来灾难性的后果。

Do not pass a pooled <Buffer> object instance in to this method. Pooled Buffer objects are created using Buffer.allocUnsafe(), or Buffer.from(), or are often returned by various node:fs module callbacks. These types of Buffers use a shared underlying <ArrayBuffer> object that contains all of the data from all of the pooled Buffer instances. When a Buffer, <TypedArray>, or <DataView> is passed in to readableStreamBYOBReader.read(), the view's underlying ArrayBuffer is detached, invalidating all existing views that may exist on that ArrayBuffer. This can have disastrous consequences for your application.

readableStreamBYOBReader.releaseLock()#

释放此读者对底层 <ReadableStream> 的锁定。

Releases this reader's lock on the underlying <ReadableStream>.

类:ReadableStreamDefaultController#

每个 <ReadableStream> 都有一个控制器,负责流队列的内部状态和管理。 ReadableStreamDefaultControllerReadableStream 的默认控制器实现,它不是面向字节的。

Every <ReadableStream> has a controller that is responsible for the internal state and management of the stream's queue. The ReadableStreamDefaultController is the default controller implementation for ReadableStreams that are not byte-oriented.

readableStreamDefaultController.close()#

关闭与此控制器关联的 <ReadableStream>

Closes the <ReadableStream> to which this controller is associated.

readableStreamDefaultController.desiredSize#

返回填充 <ReadableStream> 队列的剩余数据量。

Returns the amount of data remaining to fill the <ReadableStream>'s queue.

readableStreamDefaultController.enqueue(chunk)#

将新数据块附加到 <ReadableStream> 的队列。

Appends a new chunk of data to the <ReadableStream>'s queue.

readableStreamDefaultController.error(error)#

触发导致 <ReadableStream> 出错并关闭的错误信号。

Signals an error that causes the <ReadableStream> to error and close.

类:ReadableByteStreamController#

每个 <ReadableStream> 都有一个控制器,负责流队列的内部状态和管理。 ReadableByteStreamController 用于面向字节的 ReadableStream

Every <ReadableStream> has a controller that is responsible for the internal state and management of the stream's queue. The ReadableByteStreamController is for byte-oriented ReadableStreams.

readableByteStreamController.byobRequest#

readableByteStreamController.close()#

关闭与此控制器关联的 <ReadableStream>

Closes the <ReadableStream> to which this controller is associated.

readableByteStreamController.desiredSize#

返回填充 <ReadableStream> 队列的剩余数据量。

Returns the amount of data remaining to fill the <ReadableStream>'s queue.

readableByteStreamController.enqueue(chunk)#

将新数据块附加到 <ReadableStream> 的队列。

Appends a new chunk of data to the <ReadableStream>'s queue.

readableByteStreamController.error(error)#

触发导致 <ReadableStream> 出错并关闭的错误信号。

Signals an error that causes the <ReadableStream> to error and close.

类:ReadableStreamBYOBRequest#

在面向字节的流中使用 ReadableByteStreamController 和使用 ReadableStreamBYOBReader 时,readableByteStreamController.byobRequest 属性提供对表示当前读取请求的 ReadableStreamBYOBRequest 实例的访问。 该对象用于获取为读取请求填充而提供的 ArrayBuffer/TypedArray 的访问权限,并提供用于触发数据已提供信号的方法。

When using ReadableByteStreamController in byte-oriented streams, and when using the ReadableStreamBYOBReader, the readableByteStreamController.byobRequest property provides access to a ReadableStreamBYOBRequest instance that represents the current read request. The object is used to gain access to the ArrayBuffer/TypedArray that has been provided for the read request to fill, and provides methods for signaling that the data has been provided.

readableStreamBYOBRequest.respond(bytesWritten)#

表示已将 bytesWritten 个字节写入 readableStreamBYOBRequest.view

Signals that a bytesWritten number of bytes have been written to readableStreamBYOBRequest.view.

readableStreamBYOBRequest.respondWithNewView(view)#

表示请求已通过写入新的 BufferTypedArrayDataView 的字节得到满足。

Signals that the request has been fulfilled with bytes written to a new Buffer, TypedArray, or DataView.

readableStreamBYOBRequest.view#

类:WritableStream#

WritableStream 是流数据发送到的目的地。

The WritableStream is a destination to which stream data is sent.

import {
  WritableStream
} from 'node:stream/web';

const stream = new WritableStream({
  write(chunk) {
    console.log(chunk);
  }
});

await stream.getWriter().write('Hello World'); 

new WritableStream([underlyingSink[, strategy]])#
  • underlyingSink <Object>
    • start <Function> 创建 WritableStream 时立即调用的用户定义函数。
    • write <Function> 将数据块写入 WritableStream 时调用的用户定义函数。
    • close <Function> WritableStream 关闭时调用的用户定义函数。
      • 返回: undefined 兑现的 promise。
    • abort <Function> 调用以突然关闭 WritableStream 的用户定义函数。
      • reason <any>
      • 返回: undefined 兑现的 promise。
    • type <any> type 选项保留供将来使用,必须未定义。
  • strategy <Object>
    • highWaterMark <number> 应用背压之前的最大内部队列大小。
    • size <Function> 一个用户定义的函数,用于标识每个数据块的大小。

writableStream.abort([reason])#
  • reason <any>
  • 返回: undefined 兑现的 promise。

突然终止 WritableStream。 所有排队的写入都将被取消,并拒绝相关的 promise。

Abruptly terminates the WritableStream. All queued writes will be canceled with their associated promises rejected.

writableStream.close()#
  • 返回: undefined 兑现的 promise。

当不需要额外写入时关闭 WritableStream

Closes the WritableStream when no additional writes are expected.

writableStream.getWriter()#

创建并创建一个新的写入器实例,可用于将数据写入 WritableStream

Creates and creates a new writer instance that can be used to write data into the WritableStream.

writableStream.locked#

writableStream.locked 属性默认为 false,当有活动写入器附加到此 WritableStream 时切换为 true

The writableStream.locked property is false by default, and is switched to true while there is an active writer attached to this WritableStream.

使用 postMessage() 传输#

可以使用 <MessagePort> 传输 <WritableStream> 实例。

A <WritableStream> instance can be transferred using a <MessagePort>.

const stream = new WritableStream(getWritableSinkSomehow());

const { port1, port2 } = new MessageChannel();

port1.onmessage = ({ data }) => {
  data.getWriter().write('hello');
};

port2.postMessage(stream, [stream]); 

类:WritableStreamDefaultWriter#

new WritableStreamDefaultWriter(stream)#

创建锁定到给定 WritableStream 的新 WritableStreamDefaultWriter

Creates a new WritableStreamDefaultWriter that is locked to the given WritableStream.

writableStreamDefaultWriter.abort([reason])#
  • reason <any>
  • 返回: undefined 兑现的 promise。

突然终止 WritableStream。 所有排队的写入都将被取消,并拒绝相关的 promise。

Abruptly terminates the WritableStream. All queued writes will be canceled with their associated promises rejected.

writableStreamDefaultWriter.close()#
  • 返回: undefined 兑现的 promise。

当不需要额外写入时关闭 WritableStream

Closes the WritableStream when no additional writes are expected.

writableStreamDefaultWriter.closed#
  • 类型: <Promise> 当关联的 <WritableStream> 关闭时用 undefined 完成,或者如果在流完成关闭之前流错误或作者的锁被释放则拒绝。

writableStreamDefaultWriter.desiredSize#

填充 <WritableStream> 队列所需的数据量。

The amount of data required to fill the <WritableStream>'s queue.

writableStreamDefaultWriter.ready#
  • type: 当编写器准备好使用时,undefined 实现的 promise。

writableStreamDefaultWriter.releaseLock()#

释放作者对底层 <ReadableStream> 的锁定。

Releases this writer's lock on the underlying <ReadableStream>.

writableStreamDefaultWriter.write([chunk])#
  • chunk: <any>
  • 返回: undefined 兑现的 promise。

将新数据块附加到 <WritableStream> 的队列。

Appends a new chunk of data to the <WritableStream>'s queue.

类:WritableStreamDefaultController#

WritableStreamDefaultController 管理 <WritableStream> 的内部状态。

The WritableStreamDefaultController manage's the <WritableStream>'s internal state.

writableStreamDefaultController.abortReason#
  • 类型: <any> reason 值传递给 writableStream.abort()

writableStreamDefaultController.error(error)#

由用户代码调用以表示在处理 WritableStream 数据时发生了错误。 调用时,<WritableStream> 将被中止,当前挂起的写入被取消。

Called by user-code to signal that an error has occurred while processing the WritableStream data. When called, the <WritableStream> will be aborted, with currently pending writes canceled.

writableStreamDefaultController.signal#

类:TransformStream#

TransformStream<ReadableStream><WritableStream> 组成,它们相互连接,以便在将写入 WritableStream 的数据推送到 ReadableStream 的队列之前接收并可能进行转换。

A TransformStream consists of a <ReadableStream> and a <WritableStream> that are connected such that the data written to the WritableStream is received, and potentially transformed, before being pushed into the ReadableStream's queue.

import {
  TransformStream
} from 'node:stream/web';

const transform = new TransformStream({
  transform(chunk, controller) {
    controller.enqueue(chunk.toUpperCase());
  }
});

await Promise.all([
  transform.writable.getWriter().write('A'),
  transform.readable.getReader().read(),
]); 

new TransformStream([transformer[, writableStrategy[, readableStrategy]]])#
  • transformer <Object>
    • start <Function> 创建 TransformStream 时立即调用的用户定义函数。
    • transform <Function> 一个用户定义的函数,它接收并可能修改写入 transformStream.writable 的数据块,然后再将其转发到 transformStream.readable
    • flush <Function>TransformStream 的可写端关闭之前立即调用的用户定义函数,表示转换过程结束。
    • readableType <any> readableType 选项保留供将来使用,必须是 undefined
    • writableType <any> writableType 选项保留供将来使用,必须是 undefined
  • writableStrategy <Object>
    • highWaterMark <number> 应用背压之前的最大内部队列大小。
    • size <Function> 一个用户定义的函数,用于标识每个数据块的大小。
  • readableStrategy <Object>
    • highWaterMark <number> 应用背压之前的最大内部队列大小。
    • size <Function> 一个用户定义的函数,用于标识每个数据块的大小。

transformStream.readable#

transformStream.writable#

使用 postMessage() 传输#

可以使用 <MessagePort> 传输 <TransformStream> 实例。

A <TransformStream> instance can be transferred using a <MessagePort>.

const stream = new TransformStream();

const { port1, port2 } = new MessageChannel();

port1.onmessage = ({ data }) => {
  const { writable, readable } = data;
  // ...
};

port2.postMessage(stream, [stream]); 

类:TransformStreamDefaultController#

TransformStreamDefaultController 管理 TransformStream 的内部状态。

The TransformStreamDefaultController manages the internal state of the TransformStream.

transformStreamDefaultController.desiredSize#

填充可读端队列所需的数据量。

The amount of data required to fill the readable side's queue.

transformStreamDefaultController.enqueue([chunk])#

将一大块数据附加到可读端的队列中。

Appends a chunk of data to the readable side's queue.

transformStreamDefaultController.error([reason])#

向可读和可写端触发信号,表明在处理转换数据时发生了错误,导致双方突然关闭。

Signals to both the readable and writable side that an error has occurred while processing the transform data, causing both sides to be abruptly closed.

transformStreamDefaultController.terminate()#

关闭传输的可读端并导致可写端因错误而突然关闭。

Closes the readable side of the transport and causes the writable side to be abruptly closed with an error.

类:ByteLengthQueuingStrategy#

new ByteLengthQueuingStrategy(options)#

byteLengthQueuingStrategy.highWaterMark#

byteLengthQueuingStrategy.size#

类:CountQueuingStrategy#

new CountQueuingStrategy(options)#

countQueuingStrategy.highWaterMark#

countQueuingStrategy.size#

类:TextEncoderStream#

new TextEncoderStream()#

创建新的 TextEncoderStream 实例。

Creates a new TextEncoderStream instance.

textEncoderStream.encoding#

TextEncoderStream 实例支持的编码。

The encoding supported by the TextEncoderStream instance.

textEncoderStream.readable#

textEncoderStream.writable#

类:TextDecoderStream#

new TextDecoderStream([encoding[, options]])#
  • encoding <string> 标识此 TextDecoder 实例支持的 encoding默认值: 'utf-8'
  • options <Object>
    • fatal <boolean> 如果解码失败是致命的,则为 true
    • ignoreBOM <boolean>true 时,TextDecoderStream 将在解码结果中包含字节顺序标记。 当 false 时,字节顺序标记将从输出中删除。 此选项仅在 encoding'utf-8''utf-16be''utf-16le' 时使用。 默认值: false

创建新的 TextDecoderStream 实例。

Creates a new TextDecoderStream instance.

textDecoderStream.encoding#

TextDecoderStream 实例支持的编码。

The encoding supported by the TextDecoderStream instance.

textDecoderStream.fatal#

如果解码错误导致抛出 TypeError,则该值将为 true

The value will be true if decoding errors result in a TypeError being thrown.

textDecoderStream.ignoreBOM#

如果解码结果将包含字节顺序标记,则该值将为 true

The value will be true if the decoding result will include the byte order mark.

textDecoderStream.readable#

textDecoderStream.writable#

实用工具消费者#

实用程序消费者函数提供了用于消费流的通用选项。

The utility consumer functions provide common options for consuming streams.

使用以下方式访问它们:

They are accessed using:

import {
  arrayBuffer,
  blob,
  buffer,
  json,
  text,
} from 'node:stream/consumers';const {
  arrayBuffer,
  blob,
  buffer,
  json,
  text,
} = require('node:stream/consumers');

streamConsumers.arrayBuffer(stream)#
import { buffer as arrayBuffer } from 'node:stream/consumers';
import { Readable } from 'node:stream';
import { TextEncoder } from 'node:util';

const encoder = new TextEncoder();
const dataArray = encoder.encode('hello world from consumers!');

const readable = Readable.from(dataArray);
const data = await arrayBuffer(readable);
console.log(`from readable: ${data.byteLength}`);const { arrayBuffer } = require('node:stream/consumers');
const { Readable } = require('stream');
const { TextEncoder } = require('util');

const encoder = new TextEncoder();
const dataArray = encoder.encode(['hello world from consumers!']);
const readable = Readable.from(dataArray);
arrayBuffer(readable).then((data) => {
  console.log(`from readable: ${data.byteLength}`);
});

streamConsumers.blob(stream)#
import { blob } from 'node:stream/consumers';

const dataBlob = new Blob(['hello world from consumers!']);

const readable = dataBlob.stream();
const data = await blob(readable);
console.log(`from readable: ${data.size}`);const { blob } = require('node:stream/consumers');

const dataBlob = new Blob(['hello world from consumers!']);

const readable = dataBlob.stream();
blob(readable).then((data) => {
  console.log(`from readable: ${data.size}`);
});

streamConsumers.buffer(stream)#
import { buffer } from 'node:stream/consumers';
import { Readable } from 'node:stream';
import { Buffer } from 'node:buffer';

const dataBuffer = Buffer.from('hello world from consumers!');

const readable = Readable.from(dataBuffer);
const data = await buffer(readable);
console.log(`from readable: ${data.length}`);const { buffer } = require('node:stream/consumers');
const { Readable } = require('node:stream');
const { Buffer } = require('node:buffer');

const dataBuffer = Buffer.from('hello world from consumers!');

const readable = Readable.from(dataBuffer);
buffer(readable).then((data) => {
  console.log(`from readable: ${data.length}`);
});

streamConsumers.json(stream)#
import { json } from 'node:stream/consumers';
import { Readable } from 'node:stream';

const items = Array.from(
  {
    length: 100
  },
  () => ({
    message: 'hello world from consumers!'
  })
);

const readable = Readable.from(JSON.stringify(items));
const data = await json(readable);
console.log(`from readable: ${data.length}`);const { json } = require('node:stream/consumers');
const { Readable } = require('node:stream');

const items = Array.from(
  {
    length: 100
  },
  () => ({
    message: 'hello world from consumers!'
  })
);

const readable = Readable.from(JSON.stringify(items));
json(readable).then((data) => {
  console.log(`from readable: ${data.length}`);
});

streamConsumers.text(stream)#
import { json, text, blob, buffer } from 'node:stream/consumers';
import { Readable } from 'node:stream';

const readable = Readable.from('Hello world from consumers!');
const data = await text(readable);
console.log(`from readable: ${data.length}`);const { text } = require('node:stream/consumers');
const { Readable } = require('node:stream');

const readable = Readable.from('Hello world from consumers!');
text(readable).then((data) => {
  console.log(`from readable: ${data.length}`);
});