前面介绍了Readable
与Writable
的创建方法、使用方式,以及如何控制流中的数据类型,
其中多次提到“缓存”,这部分将探究一下这个“缓存”为何物,为什么会存在。
var readable = Readable({ highWaterMark: highWaterMark })
var state = readable._readableState
先介绍两个字段:
state.buffer
:Array
,每个元素对应push(data)
中的data
(可能进行过编码,见前面的解释)。state.length
:Number
,整个缓存的长度。 如果是objectMode
,与state.buffer.length
是一样的; 否则是state.buffer
中字节数的总和。
本文中一开始给出的大文件处理案例,要求不能将文件内容一次性全读进内存,
所以fs.createReadStream
创建的Readable
对象,底层会调用fs.read
去多次从底层文件中将数据读出,
内存中存储的便是一次读取的量。
因此,每次读取的数据需要放入缓存中,等待被消耗。
那什么时候会从底层文件中去读取一次?
简单来说,每次执行readable.read()
时,
如果state.length
低于highWaterMark
,
便会执行readable._read(highWaterMark)
从底层读取数据存入缓存中。
var Stream = require('stream')
var source = ['a', 'b', 'c']
var readable = Stream.Readable({
read: function () {
var data = source.shift() || null
console.log('buffer before push', this._readableState.buffer)
console.log('push', data)
this.push(data)
console.log('buffer after push', this._readableState.buffer)
console.log('--------------')
},
})
readable.on('data', function (data) {
console.log('consume', data)
})
输出:
⌘ node example/highWaterMark.js
buffer before push []
push a
buffer after push [ <Buffer 61> ]
--------------
buffer before push [ <Buffer 61> ]
push b
buffer after push [ <Buffer 61>, <Buffer 62> ]
--------------
consume <Buffer 61>
buffer before push [ <Buffer 62> ]
push c
buffer after push [ <Buffer 62>, <Buffer 63> ]
--------------
consume <Buffer 62>
buffer before push [ <Buffer 63> ]
push null
buffer after push [ <Buffer 63> ]
--------------
consume <Buffer 63>
在监听data
事件时,发生以下事情:
- 将回调放入事件队列中,与正常的事件监听无异
- 调用
read(0)
,进而引起_read
的调用。 实际效果等同于state.buffer.push('a')
- 调用
flow()
,试图将缓存读空。 效果等同于while (read()) read()
。 - 调用
read(0)
。 由于已调用过push(null)
,所以会直接调用endReadable
来结束流。
其中的flow
环节,就是源源不断产生数据的环节。
每次调用chunk = read()
时,先检查是否需要从底层读点数据到缓存中来
(当本次读取后,剩余的数据量小于highWaterMark
时,便需要),
如果需要,就调用_read(highWaterMark)
。
然后从state.buffer
中取出一定的数据chunk
。
objectMode
或flowing
模式时即为第一个元素。
如果chunk
不为null
,便emit('data', chunk)
。
于是事件回调被执行,数据被消耗。
var Stream = require('stream')
var source = ['a', 'b', 'c']
var readable = Stream.Readable({
read: function () {
var state = this._readableState
process.nextTick(function () {
var data = source.shift() || null
console.log('buffer before push', state.buffer)
console.log('push', data)
readable.push(data)
console.log('buffer after push', state.buffer)
console.log('- - - - - - - - - - - - - -')
})
},
})
readable.on('data', function (data) {
console.log('consume', data)
})
输出:
⌘ node example/highWaterMark-async.js
buffer before push []
push a
consume <Buffer 61>
buffer after push []
--------------
buffer before push []
push b
consume <Buffer 62>
buffer after push []
--------------
buffer before push []
push c
consume <Buffer 63>
buffer after push []
--------------
buffer before push []
push null
buffer after push []
--------------
对于在_read
中异步调用push
而言,只要push
前state.buffer
为空,
便可确定当前的数据即是下一个要求的数据,所以会直接emit('data', data)
,
因而,也便不会再写入缓存。
当然,只是这个简单的例子如此而已。
在emit('data')
后,会立即调用read(0)
,触发下一次的_read
调用。
于是,数据便源源不断的产生,直到push(null)
。
var writable = Writable({ highWaterMark: highWaterMark })
var state = writable._writableState
前面解释了Readable
中highWaterMark
的作用:
控制底层读取的速度。
Writable
中highWaterMark
的作用也是控制速度:
当state.length
大于highWaterMark
时,write(data)
会返回false
,
上游可以判断这个返回值,停止往writable
中写数据,
同时监听drain
事件触发再继续写。
Writable
的缓存实际是一个待写入数据队列,
state.length
也就是这个队列的长度。
每次底层的写操作完成时,检查state.length
,
如果为0,则有可能触发drain
事件。
这个“有可能”,便是之前出现了state.length
大于highWaterMark
的情况,
外面还在等待drain
事件。