写在最前
本次试图浅析探索Nodejs的Stream??橹卸杂赗eadable类的一部分实现(可写流也差不多)。其中会以可读流两种模式中的paused mode即暂停模式的表现形式来解读源码上的实现,为什么不分析flowing mode自然是因为这个模式是我们常用的其原理相比暂停模式下相对简单(其实是因为笔者总是喜欢关注一些边边角角的东西,不按套路出牌=。=),同时核心方法都是一样的,一通百通嘛,有兴趣的童鞋可以自己看下完整源码。
欢迎关注我的博客,不定期更新中——
生产者消费者问题
首先先明确为什么Nodejs要实现一个stream,这就要清楚关于生产者消费者问题的概念。
生产者消费者问题(英语:Producer-consumer problem),也称有限缓冲问题(英语:Bounded-buffer problem),是一个多线程同步问题的经典案例。该问题描述了共享固定大小缓冲区的两个线程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。
简单来说就是内存问题。与前端不同,后端对于内存还是相当敏感的,比如读取文件这种操作,如果文件很小就算了,但如果这个文件一个g呢?难道全读出来?这肯定是不可取的。通过流的形式读一部分写一部分慢慢处理才是一个可取的方式。PS:有关为什么使用stream欢迎大家百(谷)度(歌)一下。
实现一个可读流
现在我们将自己实现一个可读流,以此来方便观察之后数据的流动过程:
const Readable = require('stream').Readable;
// 实现一个可读流
class SubReadable extends Readable {
constructor(dataSource, options) {
super(options);
this.dataSource = dataSource;
}
// 文档提出必须通过_read方法调用push来实现对底层数据的读取
_read() {
console.log('阈值规定大?。?, arguments['0'] + ' bytes')
const data = this.dataSource.makeData()
let result = this.push(data)
if(data) console.log('添加数据大?。?, data.toString().length + ' bytes')
console.log('已缓存数据大小: ', subReadable._readableState.length + ' bytes')
console.log('超过阈值限制或数据推送完毕:', !result)
console.log('====================================')
}
}
// 模拟资源池
const dataSource = {
data: new Array(1000000).fill('1'),
// 每次读取时推送一定量数据
makeData() {
if (!dataSource.data.length) return null;
return dataSource.data.splice(dataSource.data.length - 5000).reduce((a,b) => a + '' + b)
}
//每次向缓存推5000字节数据
};
const subReadable = new SubReadable(dataSource);
至此subReadable便是我们实现的自定义可读流。
Paused Mode 暂停模式都做了什么?
先来看下整体的流程:
可读流会通过
_read()
方式从资源读取数据到缓存池,同时设置了一个阈值highWaterMark
,标记数据到缓存池大小的一个上限,这个阈值是会浮动的,最小值也是默认值为16384。当消费者监听了readable
事件之后,就可以显式调用read()
方法来读取数据。
触发暂停模式
通过注册readable事件以此来触发暂停模式:
subReadable.on('readable', () => {
console.log('缓存剩余数据大小: ', subReadable._readableState.length + ' byte')
console.log('------------------------------------')
})
可以发现当注册
readable
事件后可对流会从底层资源推送数据到缓存直到达到超过阈值或者底层数据全部加载完。
开始消费数据
调用read(n); n = 1000;
首先修改资源池大小data: new Array(10000).fill('1')
(方便打印数据),执行read(1000)每次读取1000字节资源读取资源:
subReadable.on('readable', () => {
let chunk = subReadable.read(1000)
if(chunk)
console.log(`读取 ${chunk.length} bytes数据`);
console.log('缓存剩余数据大小: ', subReadable._readableState.length + ' byte')
console.log('------------------------------------')
})
结果执行了两次读取数据,同时如果每次读取的字节少于缓存中的数据,则可读流不会再从资源加载新的数据。
无参调用read()
subReadable.on('readable', () => {
let chunk = subReadable.read()
if(chunk)
console.log(`读取 ${chunk.length} bytes数据`);
console.log('缓存剩余数据大小: ', subReadable._readableState.length + ' byte')
console.log('------------------------------------')
})
直接调用read()
后,会逐步读取完全部资源,至于每次读取多少下文会统一探讨。
小结
以上我们依次尝试了在实现可读流后触发暂停模式会发生的事情,接下来作者将会对以下几个可能有疑问的点进行探究:
- 为什么自己实现的可读流要实现
_read()
方法并在其中调用push()
- 触发暂停模式后缓存池如何被蓄满,以及为何会直接执行一次回调
- 无参调用
read()
与传入固定数据的区别
为什么自己实现的可读流要实现_read()
方法并在其中调用push()
Readable.prototype._read = function(n) {
this.emit('error', new errors.Error('ERR_STREAM_READ_NOT_IMPLEMENTED'));
}; //只是定义接口
Readable.prototype.read = function(n) {
...
var doRead = state.needReadable;
if (doRead) {
this._read(state.highWaterMark);
}
}
当我们调用subReadable.read()便会执行到上面的代码,可以发现,源码中
对于_read()
只是定义了一个接口,里面并没有具体实现,如果我们不自己定义那么就会报错。同时read()
中会执行它通过它调用push()
来从资源中读取数据,并且传入highWaterMark
,这个值你可以用也可以不用因为_read()
是我们自己实现的。
Readable.prototype.push = function(chunk, encoding) {
...
return readableAddChunk(this, chunk, encoding, false, skipChunkCheck);
};
从代码中可以看出,将底层资源推送到缓存中的核心操作是通过push,通过语义化也可以看出push方法中最后会进行添加新数据的操作。由于之后方法中嵌套很多,不一一展示,直接来看最后调用的方法:
// readableAddChunk最后会调用addChunk
function addChunk(stream, state, chunk, addToFront) {
...
state.buffer.push(chunk); //数据推送到buffer中
if (state.needReadable)//判断此属性值来看是否触发readable事件
emitReadable(stream);
maybeReadMore(stream, state);//可能会推送更多数据到缓存
}
我们可以看出,方法调用的最后确实执行了资源数据推送到缓存的操作。与此同时在会判断needReadable属性值来看是否触发readable回调事件。而这也为之后我们来分析为什么注册了readable事件之后会执行一次回调埋下了伏笔。最后调用maybeReadMore()则是蓄满缓存池的方法。
触发暂停模式后缓存池如何被蓄满
先来看下源码里是如何绑定的事件:
Readable.prototype.on = function(ev, fn) {
if (ev === 'data') {
...
} else if (ev === 'readable') {
const state = this._readableState;
state.needReadable = true;//设定属性为true,触发readable回调
...
process.nextTick(nReadingNextTick, this);
}
};
function nReadingNextTick(self) {
self.read(0);
}
//之后执行read(0) => _read() => push() => addChunk()
// => maybeReadMore()
maybeReadMore()中当缓存池存储大小小于阈值时则会一直调用read(0)不读取数据,但是会一直push底层资源到缓存:
function maybeReadMore_(stream, state) {
...
if (state.length < state.highWaterMark) {
stream.read(0);
}
}
绑定监听事件后为何会直接执行一次回调
上文提到过,绑定事件后会开始推送数据至缓存池,最后会执行到addChunk()方法,内部通过needReadable属性来判断是否触发readable事件。当你第一次绑定事件时会执行state.needReadable = true;,从而在最后推送数据后会执行触发readable的操作。
read()
与传入特定数值的区别
区别在执行read()方法的时候,会将参数n传入到下面这个函数中由它来计算现在应该应该读取多少数据:
function howMuchToRead(n, state) {
if (n <= 0 || (state.length === 0 && state.ended))
return 0;
if (state.objectMode)
return 1;
if (n !== n) {
// Only flow one buffer at a time
if (state.flowing && state.length)
return state.buffer.head.data.length;
else
return state.length;
}
// If we're asking for more than the current hwm, then raise the hwm.
if (n > state.highWaterMark)
state.highWaterMark = computeNewHighWaterMark(n);
if (n <= state.length)
return n;
// Don't have enough
if (!state.ended) { //传输没有结束都是false
state.needReadable = true;
return 0;
}
return state.length;
}
当直接调用read(),n参数则为NaN,当处于流动模式的时候n则为buffer头数据的长度,否则是整个缓存的数据长度。若为read(n)传入数字,大于当前的hwm时可以发现会重新计算一个hwm,与此同时如果已缓存的数据小于请求的数据量,那么将设置state.needReadable = true;
并返回0;
总结
第一次试图梳理源码的思路,一路写下来发现有很多想说但是又不知道怎么连贯的理清楚=。= 既然代码细节也有些说不清,不过最后还是进行一个核心思路的提炼:
核心方法:
- Readable.prototype.read()
- Readable.prototype.push(); push中可能会执行emitReadable();
核心属性:
- this.needReadable:通过此属性来决定是否触发回调
核心思路:
- 推送数据至缓存与读取缓存数据的操作均由read()控制(因为read内部既实现了push也实现了howMuchToread(),不同的是前者为read(0)即只推送不读??;后者为read()或read(n);
- 注册readable事件后,执行read(0)资源就被push到缓存中,直到达到highWaterMark
- 期间会触发回调函数,如果执行read()则相当于每次都会把缓存中的数据全部取出来,缓存中时刻没有数据只能继续从资源获取直到数据全部取出并读取完毕。若执行read(n)(n < highWaterMark),则只会取出2n的数据,同时缓存资源大于n时将会停止。因为此时会认为你每次只取n个数据,缓存中完全够用,所以就不会再更新数据也就不会再触发回调。
参考资料
最后
源码的边界情况比较多。作者如果哪里说错了请指正=。=
惯例po作者的博客,不定时更新中——
有问题欢迎在issues下交流。