new stream.Duplex(options)


  • options <Object> 传给 WritableReadable 构造函数。 还具有以下字段:
    • allowHalfOpen <boolean> 如果设置为 false,则流将在可读端结束时自动结束可写端。 默认值: true
    • readable <boolean> 设置 Duplex 是否可读。 默认值: true
    • writable <boolean> 设置 Duplex 是否可写。 默认值: true
    • readableObjectMode <boolean> 为流的可读端设置 objectMode。 如果 objectModetrue,则无效。 默认值: false
    • writableObjectMode <boolean> 为流的可写端设置 objectMode。 如果 objectModetrue,则无效。 默认值: false
    • readableHighWaterMark <number> 为流的可读端设置 highWaterMark。 如果提供 highWaterMark,则无效。
    • writableHighWaterMark <number> 为流的可写端设置 highWaterMark。 如果提供 highWaterMark,则无效。
const { Duplex } = require('node:stream');

class MyDuplex extends Duplex {
  constructor(options) {
    super(options);
    // ...
  }
}

或者,当使用 ES6 之前的风格构造函数时:

const { Duplex } = require('node:stream');
const util = require('node:util');

function MyDuplex(options) {
  if (!(this instanceof MyDuplex))
    return new MyDuplex(options);
  Duplex.call(this, options);
}
util.inherits(MyDuplex, Duplex);

或者,使用简化的构造函数方法:

const { Duplex } = require('node:stream');

const myDuplex = new Duplex({
  read(size) {
    // ...
  },
  write(chunk, encoding, callback) {
    // ...
  },
});

当使用管道时:

const { Transform, pipeline } = require('node:stream');
const fs = require('node:fs');

pipeline(
  fs.createReadStream('object.json')
    .setEncoding('utf8'),
  new Transform({
    decodeStrings: false, // 接受字符串输入而不是缓冲区
    construct(callback) {
      this.data = '';
      callback();
    },
    transform(chunk, encoding, callback) {
      this.data += chunk;
      callback();
    },
    flush(callback) {
      try {
        // 确保是有效的 json。
        JSON.parse(this.data);
        this.push(this.data);
        callback();
      } catch (err) {
        callback(err);
      }
    },
  }),
  fs.createWriteStream('valid-object.json'),
  (err) => {
    if (err) {
      console.error('failed', err);
    } else {
      console.log('completed');
    }
  },
);
  • options <Object> Passed to both Writable and Readable constructors. Also has the following fields:
    • allowHalfOpen <boolean> If set to false, then the stream will automatically end the writable side when the readable side ends. Default: true.
    • readable <boolean> Sets whether the Duplex should be readable. Default: true.
    • writable <boolean> Sets whether the Duplex should be writable. Default: true.
    • readableObjectMode <boolean> Sets objectMode for readable side of the stream. Has no effect if objectMode is true. Default: false.
    • writableObjectMode <boolean> Sets objectMode for writable side of the stream. Has no effect if objectMode is true. Default: false.
    • readableHighWaterMark <number> Sets highWaterMark for the readable side of the stream. Has no effect if highWaterMark is provided.
    • writableHighWaterMark <number> Sets highWaterMark for the writable side of the stream. Has no effect if highWaterMark is provided.
const { Duplex } = require('node:stream');

class MyDuplex extends Duplex {
  constructor(options) {
    super(options);
    // ...
  }
}

Or, when using pre-ES6 style constructors:

const { Duplex } = require('node:stream');
const util = require('node:util');

function MyDuplex(options) {
  if (!(this instanceof MyDuplex))
    return new MyDuplex(options);
  Duplex.call(this, options);
}
util.inherits(MyDuplex, Duplex);

Or, using the simplified constructor approach:

const { Duplex } = require('node:stream');

const myDuplex = new Duplex({
  read(size) {
    // ...
  },
  write(chunk, encoding, callback) {
    // ...
  },
});

When using pipeline:

const { Transform, pipeline } = require('node:stream');
const fs = require('node:fs');

pipeline(
  fs.createReadStream('object.json')
    .setEncoding('utf8'),
  new Transform({
    decodeStrings: false, // Accept string input rather than Buffers
    construct(callback) {
      this.data = '';
      callback();
    },
    transform(chunk, encoding, callback) {
      this.data += chunk;
      callback();
    },
    flush(callback) {
      try {
        // Make sure is valid json.
        JSON.parse(this.data);
        this.push(this.data);
        callback();
      } catch (err) {
        callback(err);
      }
    },
  }),
  fs.createWriteStream('valid-object.json'),
  (err) => {
    if (err) {
      console.error('failed', err);
    } else {
      console.log('completed');
    }
  },
);