可迭代流


🌐 Iterable Streams

稳定性: 1 - 实验性

源代码: lib/stream/iter.js

node:stream/iter 模块提供了一个基于可迭代对象的流式 API,而不是基于事件驱动的 Readable/Writable/Transform 类层次结构,或 Web 流的 ReadableStream/WritableStream/TransformStream 接口。

🌐 The node:stream/iter module provides a streaming API built on iterables rather than the event-driven Readable/Writable/Transform class hierarchy, or the Web Streams ReadableStream/WritableStream/TransformStream interfaces.

此模块仅在启用 --experimental-stream-iter CLI 标志时可用。

🌐 This module is available only when the --experimental-stream-iter CLI flag is enabled.

流被表示为 AsyncIterable<Uint8Array[]>(异步)或 Iterable<Uint8Array[]>(同步)。没有可继承的基类——任何实现了可迭代协议的对象都可以参与。转换是普通函数或拥有 transform 方法的对象。

🌐 Streams are represented as AsyncIterable<Uint8Array[]> (async) or Iterable<Uint8Array[]> (sync). There are no base classes to extend -- any object implementing the iterable protocol can participate. Transforms are plain functions or objects with a transform method.

数据以批次(每次迭代 Uint8Array[])流动,以分摊异步操作的成本。

🌐 Data flows in batches (Uint8Array[] per iteration) to amortize the cost of async operations.

import { from, pull, text } from 'node:stream/iter';
import { compressGzip, decompressGzip } from 'node:zlib/iter';

// Compress and decompress a string, one pipeline stage per statement
const source = from('Hello, world!');
const compressed = pull(source, compressGzip());
const decompressed = pull(compressed, decompressGzip());
const result = await text(decompressed);
console.log(result); // 'Hello, world!'

const { from, pull, text } = require('node:stream/iter');
const { compressGzip, decompressGzip } = require('node:zlib/iter');

/**
 * Round-trips a string through gzip compression and decompression and logs
 * the recovered text.
 *
 * @returns {Promise<void>} Settles after the text has been logged.
 */
async function run() {
  // Build the pipeline one stage at a time: source -> gzip -> gunzip
  const source = from('Hello, world!');
  const gzipped = pull(source, compressGzip());
  const restored = pull(gzipped, decompressGzip());

  // Collect the restored stream into a single string
  const output = await text(restored);
  console.log(output); // 'Hello, world!'
}

run().catch(console.error);
import { open } from 'node:fs/promises';
import { text, pipeTo } from 'node:stream/iter';
import { compressGzip, decompressGzip } from 'node:zlib/iter';

// Read a file, compress, write to another file
const src = await open('input.txt', 'r');
try {
  const dst = await open('output.gz', 'w');
  // NOTE(review): assumes `autoClose: true` also releases `dst` when the
  // pipeline fails -- confirm against the writer() documentation.
  await pipeTo(src.pull(), compressGzip(), dst.writer({ autoClose: true }));
} finally {
  // `src` is pulled without `autoClose`, so close it here even when opening
  // the destination or running the pipeline rejects.
  await src.close();
}

// Read it back
const gz = await open('output.gz', 'r');
console.log(await text(gz.pull(decompressGzip(), { autoClose: true })));

const { open } = require('node:fs/promises');
const { text, pipeTo } = require('node:stream/iter');
const { compressGzip, decompressGzip } = require('node:zlib/iter');

/**
 * Gzips `input.txt` into `output.gz`, then reads the archive back and logs
 * its decompressed text.
 *
 * @returns {Promise<void>} Settles once the decompressed text has been logged;
 *   rejects if opening a file, piping, or reading back fails.
 */
async function run() {
  // Read a file, compress, write to another file
  const src = await open('input.txt', 'r');
  try {
    const dst = await open('output.gz', 'w');
    // NOTE(review): assumes `autoClose: true` also releases `dst` when the
    // pipeline fails -- confirm against the writer() documentation.
    await pipeTo(src.pull(), compressGzip(), dst.writer({ autoClose: true }));
  } finally {
    // `src` is pulled without `autoClose`, so close it here even when opening
    // the destination or running the pipeline rejects.
    await src.close();
  }

  // Read it back
  const gz = await open('output.gz', 'r');
  console.log(await text(gz.pull(decompressGzip(), { autoClose: true })));
}

run().catch(console.error);