初探Node.js Stream中Readable类的内部实现

写在最前

本次试图浅析探索Nodejs的Stream??橹卸杂赗eadable类的一部分实现(可写流也差不多)。其中会以可读流两种模式中的paused mode即暂停模式的表现形式来解读源码上的实现,为什么不分析flowing mode自然是因为这个模式是我们常用的其原理相比暂停模式下相对简单(其实是因为笔者总是喜欢关注一些边边角角的东西,不按套路出牌=。=),同时核心方法都是一样的,一通百通嘛,有兴趣的童鞋可以自己看下完整源码

欢迎关注我的博客,不定期更新中——

生产者消费者问题

首先先明确为什么Nodejs要实现一个stream,这就要清楚关于生产者消费者问题的概念。

生产者消费者问题(英语:Producer-consumer problem),也称有限缓冲问题(英语:Bounded-buffer problem),是一个多线程同步问题的经典案例。该问题描述了共享固定大小缓冲区的两个线程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。

简单来说就是内存问题。与前端不同,后端对于内存还是相当敏感的,比如读取文件这种操作,如果文件很小就算了,但如果这个文件一个g呢?难道全读出来?这肯定是不可取的。通过流的形式读一部分写一部分慢慢处理才是一个可取的方式。PS:有关为什么使用stream欢迎大家百(谷)度(歌)一下。

实现一个可读流

现在我们将自己实现一个可读流,以此来方便观察之后数据的流动过程:

const Readable = require('stream').Readable;
// 实现一个可读流
class SubReadable extends Readable {
  constructor(dataSource, options) {
    super(options);
    this.dataSource = dataSource;
  }
  // 文档提出必须通过_read方法调用push来实现对底层数据的读取
  _read() {
    console.log('阈值规定大?。?, arguments['0'] + ' bytes')
    const data = this.dataSource.makeData()
    let result = this.push(data)
    if(data) console.log('添加数据大?。?, data.toString().length + ' bytes')
    console.log('已缓存数据大小: ', subReadable._readableState.length + ' bytes')
    console.log('超过阈值限制或数据推送完毕:', !result)
    console.log('====================================')
  }
}

// 模拟资源池
const dataSource = {
  data: new Array(1000000).fill('1'),
  // 每次读取时推送一定量数据
  makeData() {
    if (!dataSource.data.length) return null;
    return dataSource.data.splice(dataSource.data.length - 5000).reduce((a,b) => a + '' + b)
  }
  //每次向缓存推5000字节数据
};

const subReadable = new SubReadable(dataSource);

至此subReadable便是我们实现的自定义可读流。

Paused Mode 暂停模式都做了什么?

先来看下整体的流程:

image.png

可读流会通过_read()方式从资源读取数据到缓存池,同时设置了一个阈值highWaterMark,标记数据到缓存池大小的一个上限,这个阈值是会浮动的,最小值也是默认值为16384。当消费者监听了readable事件之后,就可以显式调用read()方法来读取数据。

触发暂停模式

通过注册readable事件以此来触发暂停模式:

subReadable.on('readable', () => {
    console.log('缓存剩余数据大小: ', subReadable._readableState.length + ' byte')
    console.log('------------------------------------')
})

image.png

可以发现当注册readable事件后可对流会从底层资源推送数据到缓存直到达到超过阈值或者底层数据全部加载完。

开始消费数据

调用read(n); n = 1000;

首先修改资源池大小data: new Array(10000).fill('1')(方便打印数据),执行read(1000)每次读取1000字节资源读取资源:

subReadable.on('readable', () => {
    let chunk = subReadable.read(1000)
    if(chunk) 
      console.log(`读取 ${chunk.length} bytes数据`);
    console.log('缓存剩余数据大小: ', subReadable._readableState.length + ' byte')
    console.log('------------------------------------')
})
image.png

结果执行了两次读取数据,同时如果每次读取的字节少于缓存中的数据,则可读流不会再从资源加载新的数据。

无参调用read()

subReadable.on('readable', () => {
    let chunk = subReadable.read()
    if(chunk) 
      console.log(`读取 ${chunk.length} bytes数据`);
    console.log('缓存剩余数据大小: ', subReadable._readableState.length + ' byte')
    console.log('------------------------------------')
})
image

直接调用read()后,会逐步读取完全部资源,至于每次读取多少下文会统一探讨。

小结

以上我们依次尝试了在实现可读流后触发暂停模式会发生的事情,接下来作者将会对以下几个可能有疑问的点进行探究:

  • 为什么自己实现的可读流要实现_read()方法并在其中调用push()
  • 触发暂停模式后缓存池如何被蓄满,以及为何会直接执行一次回调
  • 无参调用read()与传入固定数据的区别

为什么自己实现的可读流要实现_read()方法并在其中调用push()

Readable.prototype._read = function(n) {
  this.emit('error', new errors.Error('ERR_STREAM_READ_NOT_IMPLEMENTED'));
}; //只是定义接口
Readable.prototype.read = function(n) {
    ...
    var doRead = state.needReadable;
    if (doRead) {
        this._read(state.highWaterMark);
    }
}

当我们调用subReadable.read()便会执行到上面的代码,可以发现,源码中
对于_read()只是定义了一个接口,里面并没有具体实现,如果我们不自己定义那么就会报错。同时read()中会执行它通过它调用push()来从资源中读取数据,并且传入highWaterMark,这个值你可以用也可以不用因为_read()是我们自己实现的。

Readable.prototype.push = function(chunk, encoding) {
  ...
  return readableAddChunk(this, chunk, encoding, false, skipChunkCheck);
};

从代码中可以看出,将底层资源推送到缓存中的核心操作是通过push,通过语义化也可以看出push方法中最后会进行添加新数据的操作。由于之后方法中嵌套很多,不一一展示,直接来看最后调用的方法:

// readableAddChunk最后会调用addChunk
function addChunk(stream, state, chunk, addToFront) {
  ...
    state.buffer.push(chunk); //数据推送到buffer中
    if (state.needReadable)//判断此属性值来看是否触发readable事件
      emitReadable(stream);
    maybeReadMore(stream, state);//可能会推送更多数据到缓存
}

我们可以看出,方法调用的最后确实执行了资源数据推送到缓存的操作。与此同时在会判断needReadable属性值来看是否触发readable回调事件。而这也为之后我们来分析为什么注册了readable事件之后会执行一次回调埋下了伏笔。最后调用maybeReadMore()则是蓄满缓存池的方法。

触发暂停模式后缓存池如何被蓄满

先来看下源码里是如何绑定的事件:

Readable.prototype.on = function(ev, fn) {
  if (ev === 'data') {
    ...
  } else if (ev === 'readable') {
    const state = this._readableState;
    state.needReadable = true;//设定属性为true,触发readable回调
    ...
        process.nextTick(nReadingNextTick, this);
  }
};
function nReadingNextTick(self) {
  self.read(0);
}
//之后执行read(0) => _read() => push() => addChunk()
//        => maybeReadMore()

maybeReadMore()中当缓存池存储大小小于阈值时则会一直调用read(0)不读取数据,但是会一直push底层资源到缓存:

function maybeReadMore_(stream, state) {
...
  if (state.length < state.highWaterMark) {
    stream.read(0);
  }
}

绑定监听事件后为何会直接执行一次回调

上文提到过,绑定事件后会开始推送数据至缓存池,最后会执行到addChunk()方法,内部通过needReadable属性来判断是否触发readable事件。当你第一次绑定事件时会执行state.needReadable = true;,从而在最后推送数据后会执行触发readable的操作。

read()与传入特定数值的区别

区别在执行read()方法的时候,会将参数n传入到下面这个函数中由它来计算现在应该应该读取多少数据:

function howMuchToRead(n, state) {
  if (n <= 0 || (state.length === 0 && state.ended))
    return 0;
  if (state.objectMode)
    return 1;
  if (n !== n) {
    // Only flow one buffer at a time
    if (state.flowing && state.length)
      return state.buffer.head.data.length;
    else
      return state.length;
  }
  // If we're asking for more than the current hwm, then raise the hwm.
  if (n > state.highWaterMark)
    state.highWaterMark = computeNewHighWaterMark(n);
  if (n <= state.length)
    return n;
  // Don't have enough
  if (!state.ended) { //传输没有结束都是false
    state.needReadable = true;
    return 0;
  }
  return state.length;
}

当直接调用read(),n参数则为NaN,当处于流动模式的时候n则为buffer头数据的长度,否则是整个缓存的数据长度。若为read(n)传入数字,大于当前的hwm时可以发现会重新计算一个hwm,与此同时如果已缓存的数据小于请求的数据量,那么将设置state.needReadable = true;并返回0;

总结

第一次试图梳理源码的思路,一路写下来发现有很多想说但是又不知道怎么连贯的理清楚=。= 既然代码细节也有些说不清,不过最后还是进行一个核心思路的提炼:

核心方法:

  • Readable.prototype.read()
  • Readable.prototype.push(); push中可能会执行emitReadable();

核心属性:

  • this.needReadable:通过此属性来决定是否触发回调

核心思路:

  1. 推送数据至缓存与读取缓存数据的操作均由read()控制(因为read内部既实现了push也实现了howMuchToread(),不同的是前者为read(0)即只推送不读??;后者为read()或read(n);
  2. 注册readable事件后,执行read(0)资源就被push到缓存中,直到达到highWaterMark
  3. 期间会触发回调函数,如果执行read()则相当于每次都会把缓存中的数据全部取出来,缓存中时刻没有数据只能继续从资源获取直到数据全部取出并读取完毕。若执行read(n)(n < highWaterMark),则只会取出2n的数据,同时缓存资源大于n时将会停止。因为此时会认为你每次只取n个数据,缓存中完全够用,所以就不会再更新数据也就不会再触发回调。

参考资料

最后

源码的边界情况比较多。作者如果哪里说错了请指正=。=

惯例po作者的博客,不定时更新中——
有问题欢迎在issues下交流。

最后编辑于
?著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,100评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,308评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,718评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,275评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,376评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,454评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,464评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,248评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,686评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,974评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,150评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,817评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,484评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,140评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,374评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,012评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,041评论 2 351

推荐阅读更多精彩内容