变换


🌐 Transforms

变换有两种形式:

🌐 Transforms come in two forms:

  • 无状态 -- 每批次调用一次函数 (chunks, options) => result。接收 Uint8Array[](或作为刷新信号的 null)和一个 options 对象。返回 Uint8Array[]null,或一个块的可迭代对象。
  • 有状态 -- 一个对象 { transform(source, options) },其中 transform 是一个生成器(同步或异步),接收整个上游可迭代对象和一个 options 对象,并生成输出。这种形式用于压缩、加密以及任何需要跨批处理缓冲的转换。

这两种形式都接收一个具有以下属性的 options 参数:

🌐 Both forms receive an options parameter with the following property:

  • options.signal <AbortSignal> 当管道被取消、遇到错误或消费者停止读取时,会触发的 AbortSignal。Transform 可以检查 signal.aborted 或监听 'abort' 事件以执行提前清理。

在源结束后发送刷新信号(null),给转换器一个机会来输出尾随数据(例如,压缩尾部)。

🌐 The flush signal (null) is sent after the source ends, giving transforms a chance to emit trailing data (e.g., compression footers).

// Stateless: uppercase transform
const upper = (chunks) => {
  if (chunks === null) return null; // flush
  return chunks.map((c) => new TextEncoder().encode(
    new TextDecoder().decode(c).toUpperCase(),
  ));
};

// Stateful: line splitter
const lines = {
  transform: async function*(source) {
    let partial = '';
    for await (const chunks of source) {
      if (chunks === null) {
        if (partial) yield [new TextEncoder().encode(partial)];
        continue;
      }
      for (const chunk of chunks) {
        const str = partial + new TextDecoder().decode(chunk);
        const parts = str.split('\n');
        partial = parts.pop();
        for (const line of parts) {
          yield [new TextEncoder().encode(`${line}\n`)];
        }
      }
    }
  },
};