在构建较复杂的系统时,通常将其拆解为功能独立的若干部分。这些部分的接口遵循一定的规范,通过某种方式相连,以共同完成较复杂的任务。
譬如,shell
通过管道 |
连接各部分,其输入输出的规范是文本流。
在 Node.js
中,内置的 Stream
模块也实现了类似功能,各部分通过 .pipe()
连接。
stream
种类分为以下四种:
js
const Stream = require('stream')
// 可读流
const Readable = Stream.Readable
// 可写流
const Writable = Stream.Writable
// 双工(可读可写流)
const Duplex = Stream.Duplex
// 转换(可读可写流 且支持transform转换)
const Transform = Stream.Transform
pipe()
zip
压缩(本例中实际代码同Transform
)stream
数据积压
js
const path = require('node:path')
const fs = require('node:fs')
const stream = require('node:stream')
// console.log(stream)
// `fs.createReadStream`创建一个`Readable`对象以读取`bigFile`的内容,并输出到标准输出
// 如果使用`fs.readFile`则可能由于文件过大而失败
fs.createReadStream(path.resolve(__dirname, './1.index.js')).pipe(process.stdout)
1.Readable
Readable
:用来读取数据,比如 fs.createReadStream()
。
js
// const stream = require('node:stream')
// const { Readable } = stream
// console.log(Readable)
const path = require('node:path')
const fs = require('node:fs')
const filePath = path.resolve(__dirname, './sample.txt')
fs.createReadStream(filePath).pipe(process.stdout)
// const onEnd = function() {
// process.stdout.write(']')
// }
// const fileStream = fs.createReadStream(filePath)
// fileStream.on('end', onEnd)
// fileStream.pipe(process.stdout)
// process.stdout.write('文件读取完成,文件内容是[')
// 文件读取完成,文件内容是[HelloWorld]
2.Writable
js
// const stream = require('node:stream')
// const { Writable } = stream
// console.log(Writable)
const path = require('node:path')
const fs = require('node:fs')
const filePath = path.resolve(__dirname, './sample.txt')
const stream = fs.createWriteStream(filePath)
stream.write('Hello World')
stream.end()
3.Duplex
Duplex
双工读写
js
const { Duplex } = require('node:stream')
const duplexStream = new Duplex({
write(chunk, encoding, callback) {
console.log(`Writing: ${chunk.toString()}`)
this.data.push(chunk)
callback()
},
read(size) {
if (this.data.length > 0) {
const chunk = this.data.shift()
this.push(chunk)
} else {
this.push(null) // Signal the end of the stream
}
}
})
duplexStream.data = []
// 写入数据
duplexStream.write('Hello, ')
duplexStream.write('world!')
duplexStream.end() // 结束写入
// 读取数据
duplexStream.on('data', (chunk) => {
console.log(`Reading: ${chunk.toString()}`)
})
duplexStream.on('end', () => {
console.log('No more data.')
})
4.Transform
js
const path = require('node:path')
const fs = require('node:fs')
const zlib = require('node:zlib')
const gzip = zlib.createGzip()
const filePath = path.resolve(__dirname, './sample.txt')
const outFilePath = path.resolve(__dirname, './outSample.txt.gz')
fs.createReadStream(filePath).pipe(gzip).pipe(fs.createWriteStream(outFilePath))
5.Pipe
在 Node.js
中,pipe
方法是流(stream
)模块中的一个重要方法,用于将一个可读流(readable stream
)的输出传递到一个可写流(writable stream
)中。
这使得处理数据流变得非常方便,尤其是在需要将数据从一个源传递到另一个目标的情况下,例如从文件读取数据并将其写入到另一个文件,或者在网络连接中传输数据。
js
const path = require('node:path')
const fs = require('node:fs')
// 创建可读流
const readableStream = fs.createReadStream(path.resolve(__dirname, 'sample.txt'))
// 创建可写流
const writableStream = fs.createWriteStream(path.resolve(__dirname, 'destination.txt'))
// 使用 pipe 方法将可读流的数据传递到可写流
readableStream.pipe(writableStream)
// 监听完成事件
writableStream.on('finish', () => {
console.log('Data has been piped and written to destination.txt')
})
6.backpressure
在 Node.js
的流(stream
)处理中,数据积压(backpressure
)是一个重要的概念。
数据积压是指当一个可写流(writable stream
)的速度比可读流(readable stream
)慢时,可写流无法及时处理从可读流传输过来的数据,导致数据在可读流中积压。
js
const fs = require('fs')
// 创建可读流
const readableStream = fs.createReadStream('source.txt')
// 创建可写流
const writableStream = fs.createWriteStream('destination.txt')
// 处理数据积压
readableStream.on('data', (chunk) => {
// 写入数据到可写流
const canWrite = writableStream.write(chunk)
// 如果返回 false,表示数据积压,暂停可读流
if (!canWrite) {
console.log('Backpressure detected: Pausing readable stream')
readableStream.pause()
}
})
// 当可写流缓冲区被清空时,恢复可读流
writableStream.on('drain', () => {
console.log('Drain event: Resuming readable stream')
readableStream.resume()
})
// 处理可读流结束事件
readableStream.on('end', () => {
console.log('Readable stream ended')
writableStream.end()
})
// 处理错误
readableStream.on('error', (err) => {
console.error('Readable stream error:', err)
})
writableStream.on('error', (err) => {
console.error('Writable stream error:', err)
})
writableStream.on('finish', () => {
console.log('Writable stream finished')
})