变换
🌐 Transforms
变换有两种形式:
🌐 Transforms come in two forms:
- 无状态 -- 每批次调用一次函数
(chunks, options) => result。接收Uint8Array[](或作为刷新信号的null)和一个options对象。返回Uint8Array[]、null,或一个块的可迭代对象。 - 有状态 -- 一个对象
{ transform(source, options) },其中transform是一个生成器(同步或异步),接收整个上游可迭代对象和一个options对象,并生成输出。这种形式用于压缩、加密以及任何需要跨批处理缓冲的转换。
这两种形式都接收一个具有以下属性的 options 参数:
🌐 Both forms receive an options parameter with the following property:
options.signal<AbortSignal> 当管道被取消、遇到错误或消费者停止读取时,会触发的 AbortSignal。Transform 可以检查signal.aborted或监听'abort'事件以执行提前清理。
在源结束后发送刷新信号(null),给转换器一个机会来输出尾随数据(例如,压缩尾部)。
🌐 The flush signal (null) is sent after the source ends, giving transforms
a chance to emit trailing data (e.g., compression footers).
// Stateless: uppercase transform
const upper = (chunks) => {
if (chunks === null) return null; // flush
return chunks.map((c) => new TextEncoder().encode(
new TextDecoder().decode(c).toUpperCase(),
));
};
// Stateful: line splitter
const lines = {
transform: async function*(source) {
let partial = '';
for await (const chunks of source) {
if (chunks === null) {
if (partial) yield [new TextEncoder().encode(partial)];
continue;
}
for (const chunk of chunks) {
const str = partial + new TextDecoder().decode(chunk);
const parts = str.split('\n');
partial = parts.pop();
for (const line of parts) {
yield [new TextEncoder().encode(`${line}\n`)];
}
}
}
},
};