readable.reduce(fn[, initial[, options]])
fn<Function> | <AsyncFunction> 是一个在流中的每个数据块上调用的归约函数。previous<any> 上一次调用fn获得的值,如果指定了initial值则使用该值,否则使用流的第一个块。data<any> 从流中获取一块数据。options<Object>signal<AbortSignal> 如果流被销毁则中止,从而允许提前中止fn调用。
initial<any> 用于归约的初始值。options<Object>signal<AbortSignal> 允许在信号被中止时销毁流。
- 返回:<Promise> 一个用于归约最终值的 Promise。
此方法按顺序对流的每个块调用 fn,将前一个元素计算的结果传递给它。它返回一个表示归约最终值的 Promise。
【This method calls fn on each chunk of the stream in order, passing it the
result from the calculation on the previous element. It returns a promise for
the final value of the reduction.】
如果未提供 initial 值,则流的第一个数据块将用作初始值。如果流为空,Promise 将被拒绝,并返回一个 TypeError,其 code 属性为 ERR_INVALID_ARGS。
【If no initial value is supplied the first chunk of the stream is used as the
initial value. If the stream is empty, the promise is rejected with a
TypeError with the ERR_INVALID_ARGS code property.】
import { Readable } from 'node:stream';
import { readdir, stat } from 'node:fs/promises';
import { join } from 'node:path';
const directoryPath = './src';
const filesInDir = await readdir(directoryPath);
const folderSize = await Readable.from(filesInDir)
.reduce(async (totalSize, file) => {
const { size } = await stat(join(directoryPath, file));
return totalSize + size;
}, 0);
console.log(folderSize); reducer 函数逐元素迭代流,这意味着没有 concurrency 参数或并行处理。要并发执行 reduce,你可以将异步函数提取到 readable.map 方法中。
【The reducer function iterates the stream element-by-element which means that
there is no concurrency parameter or parallelism. To perform a reduce
concurrently, you can extract the async function to readable.map method.】
import { Readable } from 'node:stream';
import { readdir, stat } from 'node:fs/promises';
import { join } from 'node:path';
const directoryPath = './src';
const filesInDir = await readdir(directoryPath);
const folderSize = await Readable.from(filesInDir)
.map((file) => stat(join(directoryPath, file)), { concurrency: 2 })
.reduce((totalSize, { size }) => totalSize + size, 0);
console.log(folderSize);