Skip to content

在构建较复杂的系统时,通常将其拆解为功能独立的若干部分。这些部分的接口遵循一定的规范,通过某种方式相连,以共同完成较复杂的任务。

譬如,shell 通过管道 | 连接各部分,其输入输出的规范是文本流。

Node.js 中,内置的 Stream 模块也实现了类似功能,各部分通过 .pipe() 连接。

  1. stream 种类分为以下四种:
js
const Stream = require('stream')
// 可读流
const Readable = Stream.Readable
// 可写流
const Writable = Stream.Writable
// 双工(可读可写流)
const Duplex = Stream.Duplex
// 转换(可读可写流 且支持transform转换)
const Transform = Stream.Transform
  1. pipe()

  2. zip压缩(本例中实际代码同Transform

  3. stream数据积压

js
const path = require('node:path')
const fs = require('node:fs')
const stream = require('node:stream')

// console.log(stream)

// `fs.createReadStream`创建一个`Readable`对象以读取`bigFile`的内容,并输出到标准输出
// 如果使用`fs.readFile`则可能由于文件过大而失败
fs.createReadStream(path.resolve(__dirname, './1.index.js')).pipe(process.stdout)

1.Readable

Readable:用来读取数据,比如 fs.createReadStream()

js
// const stream = require('node:stream')

// const { Readable } = stream

// console.log(Readable)

const path = require('node:path')
const fs = require('node:fs')
const filePath = path.resolve(__dirname, './sample.txt')

fs.createReadStream(filePath).pipe(process.stdout)

// const onEnd = function() {
// 	process.stdout.write(']')	
// }
// const fileStream = fs.createReadStream(filePath)
// fileStream.on('end', onEnd)
// fileStream.pipe(process.stdout)
// process.stdout.write('文件读取完成,文件内容是[')

// 文件读取完成,文件内容是[HelloWorld]

2.Writable

js
// const stream = require('node:stream')

// const { Writable } = stream

// console.log(Writable)

const path = require('node:path')
const fs = require('node:fs')
const filePath = path.resolve(__dirname, './sample.txt')

const stream = fs.createWriteStream(filePath)
stream.write('Hello World')
stream.end()

3.Duplex

Duplex 双工读写

js
const { Duplex } = require('node:stream')

const duplexStream = new Duplex({
  write(chunk, encoding, callback) {
    console.log(`Writing: ${chunk.toString()}`)
    this.data.push(chunk)
    callback()
  },
  read(size) {
    if (this.data.length > 0) {
      const chunk = this.data.shift()
      this.push(chunk)
    } else {
      this.push(null) // Signal the end of the stream
    }
  }
})

duplexStream.data = []

// 写入数据
duplexStream.write('Hello, ')
duplexStream.write('world!')
duplexStream.end() // 结束写入

// 读取数据
duplexStream.on('data', (chunk) => {
  console.log(`Reading: ${chunk.toString()}`)
})

duplexStream.on('end', () => {
  console.log('No more data.')
})

4.Transform

js
const path = require('node:path')
const fs = require('node:fs')
const zlib = require('node:zlib')

const gzip = zlib.createGzip()
const filePath = path.resolve(__dirname, './sample.txt')
const outFilePath = path.resolve(__dirname, './outSample.txt.gz')

fs.createReadStream(filePath).pipe(gzip).pipe(fs.createWriteStream(outFilePath))

5.Pipe

Node.js 中,pipe 方法是流(stream)模块中的一个重要方法,用于将一个可读流(readable stream)的输出传递到一个可写流(writable stream)中。

这使得处理数据流变得非常方便,尤其是在需要将数据从一个源传递到另一个目标的情况下,例如从文件读取数据并将其写入到另一个文件,或者在网络连接中传输数据。

js
const path = require('node:path')
const fs = require('node:fs')

// 创建可读流
const readableStream = fs.createReadStream(path.resolve(__dirname, 'sample.txt'))

// 创建可写流
const writableStream = fs.createWriteStream(path.resolve(__dirname, 'destination.txt'))

// 使用 pipe 方法将可读流的数据传递到可写流
readableStream.pipe(writableStream)

// 监听完成事件
writableStream.on('finish', () => {
  console.log('Data has been piped and written to destination.txt')
})

6.backpressure

Node.js 的流(stream)处理中,数据积压(backpressure)是一个重要的概念。

数据积压是指当一个可写流(writable stream)的速度比可读流(readable stream)慢时,可写流无法及时处理从可读流传输过来的数据,导致数据在可读流中积压。

js
const fs = require('fs')

// 创建可读流
const readableStream = fs.createReadStream('source.txt')

// 创建可写流
const writableStream = fs.createWriteStream('destination.txt')

// 处理数据积压
readableStream.on('data', (chunk) => {
  // 写入数据到可写流
  const canWrite = writableStream.write(chunk)

  // 如果返回 false,表示数据积压,暂停可读流
  if (!canWrite) {
    console.log('Backpressure detected: Pausing readable stream')
    readableStream.pause()
  }
})

// 当可写流缓冲区被清空时,恢复可读流
writableStream.on('drain', () => {
  console.log('Drain event: Resuming readable stream')
  readableStream.resume()
})

// 处理可读流结束事件
readableStream.on('end', () => {
  console.log('Readable stream ended')
  writableStream.end()
})

// 处理错误
readableStream.on('error', (err) => {
  console.error('Readable stream error:', err)
})

writableStream.on('error', (err) => {
  console.error('Writable stream error:', err)
})

writableStream.on('finish', () => {
  console.log('Writable stream finished')
})