Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

【node源码】stream源码阅读 #14

Open
EasonYou opened this issue May 19, 2020 · 0 comments
Open

【node源码】stream源码阅读 #14

EasonYou opened this issue May 19, 2020 · 0 comments

Comments

@EasonYou
Copy link
Owner

stream 流

Node.js中,有四种基本类型流

  • Writable,可写入流(fs.createWriteStream()
  • Readable,可读取数据的流(fs.createReadStream()
  • Duplex,可读又可写的流(net.Socket
  • Transfrom,在读写过程中可以修改或转换数据的Duplex流(zlib.createDeflate()

Node.js创建的流都是运作在字符串和buffer(或Unit8Array)上。会以对象模式进行操作

缓冲

  • 可读流和可写流都会在内部的缓冲器中存储数据,可以分别使用readable.readableBufferwritable.writableBuffer来获取
  • 可缓冲的数据大小取决于传入流构造函数的highWaterMark选项,对于普通的流,指定了字节总数,对于对象模式,指定了对象总数
  • 当可读缓冲大小达到了highWaterMark的阈值,会暂停从底层资源读取数据(readable._read()),知道当前缓冲数据被消费
  • 调用writable.write(chunk)时,数据被缓冲在可写流中,当缓冲大小小于highWaterMark的阈值,返回true,超过则返回false
  • stream的主要目标(特别是stream.pipe()),是为了限制数据的缓冲到可接受的程度,不会压垮内存

主要的使用方法参照译文有关Node.js Stream你所应该知道的

源码阅读

这里只ReadableWritable

Writable

这里,构造函数只是单纯覆盖了几个option的传入方法,方面后续调用

function Writable(options) {
  // ..
  // 初始化state
  this._writableState = new WritableState(options, this, isDuplex);
  // ..
  if (options) {
    // 覆盖调用方法
    this._write = options.write;
    this._writev = options.writev;
    this._destroy = options.destroy;
    this._final = options.final;
  }
  Stream.call(this, options);
}
Writable.prototype.write = function(chunk, encoding, cb) {
  const state = this._writableState;
  var ret = false;
  // 判断是否为buffer,不是的话需要进行转换
  const isBuf = !state.objectMode && Stream._isUint8Array(chunk);
  if (isBuf && !(chunk instanceof Buffer)) {
    chunk = Stream._uint8ArrayToBuffer(chunk);
  }
  // ...
  // 触发了end()方法,会调用write方法,此时进入writeAfterEnd
  if (state.ending)
    writeAfterEnd(this, cb);
  else if (isBuf || validChunk(this, state, chunk, cb)) {
    // 一般情况,进入writeOrBuffer
    state.pendingcb++;
    ret = writeOrBuffer(this, state, chunk, encoding, cb);
  }

  return ret;
};

// writeOrBuffer判断了一堆东西,最后调用了doWrite方法
function writeOrBuffer(stream, state, chunk, encoding, cb) {
  // ...
  if (state.writing || state.corked) {
    // ...
  } else {
    // 这里滴啊用doWrite
    doWrite(stream, state, false, len, chunk, encoding, cb);
  }
  return ret;
}

function doWrite(stream, state, writev, len, chunk, encoding, cb) {
  // ..
  if (state.destroyed)
    state.onwrite(new ERR_STREAM_DESTROYED('write'));
  else if (writev)
    // 如果有传入writev,调用我们传入的writev方法
    stream._writev(chunk, state.onwrite);
  else
    // 否则调用write方法
    stream._write(chunk, encoding, state.onwrite);
  state.sync = false;
}

可以看到,write方法最后就是调用了_writev/_write方法,其中,_writev会覆盖_write。而这两个方法正是我们传入的两个方法,在构造函数上做了绑定

Readable

// 构造函数
function Readable(options) {
  // ...
  // 获取可读流的状态初始化,基本是状态赋值,不细究内部运作
  this._readableState = new ReadableState(options, this, isDuplex);

  if (options) {
      // read方法覆盖
      this._read = options.read;
      // destroy方法覆盖
      this._destroy = options.destroy;
  }
  // 继承Stream
  // 实质上,Stream是个遗留类,基本不需要用到Stream的方法,这里只是单纯做个继承
  Stream.call(this, options);
}

可以看到,构造函数做了两件事情

  • 初始化可读流的状态
  • 覆盖传入的read方法以及destroy方法

read方法

要调用read方法,那么options中的read参数是必传的,否则无法在read方法中去调用

read方法做了这么几件事情

  • 判断是否有readable事件,是则触发
  • 如果已经结束,返回null
  • 如果需要读取数据,调用_read方法读取数据
  • 如果读取成功,就触发data事件
Readable.prototype.read = function(n) {
  // 如果传入的size值大于设置的highWaterMark获取一个合适的highWaterMark
  if (n > state.highWaterMark)
    state.highWaterMark = computeNewHighWaterMark(n);
  // ...
  // 如果有readable事件,处理readable事件
  if (n === 0 &&
      state.needReadable &&
      ((state.highWaterMark !== 0 ?
        state.length >= state.highWaterMark :
        state.length > 0) ||
       state.ended)) {
    if (state.length === 0 && state.ended)
      endReadable(this); // 触发readable事件结束
    else
      emitReadable(this); // 触发readable事件
    return null;
  }
  // 计算剩余数据量
  n = howMuchToRead(n, state);
  // 如果没有剩余数据,结束这个读取
  if (n === 0 && state.ended) {
    if (state.length === 0) endReadable(this);
    return null;
  }
  var doRead = state.needReadable;
  // 如果当前的数据小于highWaterMark,依然读取数据
  if (state.length === 0 || state.length - n < state.highWaterMark) {
    doRead = true;
  }

  // 读取标识已经结束的话
  if (state.ended || state.reading || state.destroyed) {
    doRead = false;
  } else if (doRead) {
    // ..
    // 调用read方法,这个方法可能会被传入的read方法覆盖
    this._read(state.highWaterMark);
    state.sync = false;
    // ..
  }
  // 触发data事件
  if (ret !== null)
    this.emit('data', ret);

  return ret;
};

push方法

push方法在推送null前,可以把数据保存在流中,以下是一个简单实例

const { Readable } = require('stream');

const inStream = new Readable({
  read() {}
});
inStream.on('data', (chunk) => {
  console.log(chunk.toString()) // 触发两次
})
inStream.push('ABCDEFGHIJKLM');
inStream.push('NOPQRSTUVWXYZ');

inStream.push(null); // No more data

inStream.pipe(process.stdout);

源码部分

// push方法主要是调用了readableAddChunk方法
Readable.prototype.push = function(chunk, encoding) {
  return readableAddChunk(this, chunk, encoding, false);
};

function readableAddChunk(stream, chunk, encoding, addToFront) {
  debug('readableAddChunk', chunk);
  const state = stream._readableState;

  // ..这部分代码做了是否是对象和字符串的判断,用于下面是否要检查合法性,忽略

  // 如果推送的是空的,禁止push
  if (chunk === null) {
    state.reading = false;
    onEofChunk(stream, state);
  } else {
    var er;
    // ..
    if (er) {
      errorOrDestroy(stream, er);
    } else if (state.objectMode || (chunk && chunk.length > 0)) { // 一般进入这个条件
      // 如果是对象,且不是buffer,则认为是Object,转换成buffer
      if (typeof chunk !== 'string' &&
          !state.objectMode &&
          !(chunk instanceof Buffer)) {
        chunk = Stream._uint8ArrayToBuffer(chunk);
      }

      if (addToFront) {
        // ..
      } else if (state.ended) {
        // ..
      } else if (state.destroyed) {
        return false;
      } else {
        // 正常情况
        state.reading = false;
        // 中间逻辑忽略,集中在addChunk方法中
        addChunk(stream, state, chunk, false);
      }
    } else if (!addToFront) {
      // ..
    }
  }
  return !state.ended &&
    (state.length < state.highWaterMark || state.length === 0);
}

这里发现push方法是调用的readableAddChunk方法,它的作用如下

  • 判断chunk的合法性
  • 如果chunk为null,则表示流的结束
  • 如果chunk都合法,则调用addChunk方法

addChunck中,会把push进去的数据,放到state的buffer中缓存起来

function addChunk(stream, state, chunk, addToFront) {
  // ..
    // 推入buffer
    state.length += state.objectMode ? 1 : chunk.length;
    if (addToFront)
      state.buffer.unshift(chunk);
    else
      state.buffer.push(chunk);

  // 在maybeReadMore里,会调用一个stream.read方法
  // 这个方法会触发data事件,所以,监听data的时候,push也可以触发该事件
  maybeReadMore(stream, state);
}

function maybeReadMore(stream, state) {
  if (!state.readingMore) {
    state.readingMore = true;
    process.nextTick(maybeReadMore_, stream, state);
  }
}
function maybeReadMore_(stream, state) {
  while (!state.reading && !state.ended &&
         (state.length < state.highWaterMark ||
          (state.flowing && state.length === 0))) {
    const len = state.length;
    stream.read(0); // 再次读取
    if (len === state.length)
      break;
  }
  state.readingMore = false;
}

pipe方法

把内部定义的方法细节隐藏掉,可以看到pipe方法的整体框架

  • 绑定目标可写流到state的pipes上
  • 可读流监听结束事件
  • 可写流监听pipe移除事件
  • 可读流监听data事件
  • 可写流监听error事件
  • 可写流监听close事件
  • 可写流监听finish事件
  • 可写流通知pipe事件
// pipe方法很长,我们吧里面的一些function的内部细节先隐藏掉
Readable.prototype.pipe = function(dest, pipeOpts) {
  const src = this;
  const state = this._readableState;

  switch (state.pipesCount) {
    // 这里隐藏其他代码,做用是绑定dest到state的pipie上
    state.pipes = dest;
  }
  state.pipesCount += 1;
  // ...

  // 监听结束结束
  const endFn = doEnd ? onend : unpipe;
  if (state.endEmitted)
    process.nextTick(endFn);
  else
    src.once('end', endFn);
  function onend() { /** ... */}
  function unpipe() { /** ... */}

  // dest监听移除pipe的事件
  dest.on('unpipe', onunpipe);
  function onunpipe(readable, unpipeInfo) { /** ... */}

  let ondrain;
  var cleanedUp = false;
  function cleanup() { /** ... */}

  // 监听data事件
  src.on('data', ondata);
  function ondata(chunk) { /** ... */}
  // If the dest has an error, then stop piping into it.
  // However, don't suppress the throwing behavior for this.
  function onerror(er) { /** ... */}

  // 监听dest的error事件,这里做了特殊处理,细节忽略
  prependListener(dest, 'error', onerror);

  // 监听dest的close事件
  function onclose() { /** ... */}
  dest.once('close', onclose);

  // 监听dest的finish事件
  function onfinish() { /** ... */}
  dest.once('finish', onfinish);

  // 通知dest的pipe事件
  dest.emit('pipe', src);
  // 在这里开始读取数据,开始读取,切换到流动模式
  if (!state.flowing) {
    debug('pipe resume');
    src.resume();
  }
  return dest;
};

下面把细节方法展开

当有数据流动的时候,触发了data方法,这是pipe实现的关键方法
这个方法非常简单,只是把可读流读取的data,写入了可写流,在这里实现了pipe的功能
相当于只是帮助我们把这一层,后面有做无更多数据时的一些兜底处理,监听drain事件,暂停读取数据等等

function ondata(chunk) {
  const ret = dest.write(chunk); // 直接写入chunk
  if (ret === false) {
    // ..
    if (!ondrain) {
      // ..监听drain事件
      dest.on('drain', ondrain);
    }
    // 暂停读取
    src.pause();
  }
}

首先是监听的结束事件,这里有两个点,一个是读取结束,一个是管道移除

// 管道移除
function onunpipe(readable, unpipeInfo) {
    // 这里做了各种判断,最后调用cleanup方法
}
function cleanup() {
    // 这里把pipe中,所有的方法都移除了
    dest.removeListener('close', onclose);
    dest.removeListener('finish', onfinish);
    if (ondrain) {
      dest.removeListener('drain', ondrain);
    }
    dest.removeListener('error', onerror);
    dest.removeListener('unpipe', onunpipe);
    src.removeListener('end', onend);
    src.removeListener('end', unpipe);
    src.removeListener('data', ondata);
    // 设置标志位
    cleanedUp = true;
    // ...
  }

其他的,不管是触发close还是error还是可读流的unpipe,都触发了unpipe方法
这个方法很简单,只是单纯地调用了可读流的unpipe

function unpipe() {
  src.unpipe(dest);
}

在方法的最后,调用了resume方法。从文档可以得知,resume方法将被暂停的可读流恢复触发 'data' 事件,并将流切换到流动模式

这个方法是开始文件流读取的关键,在这里开始流式读取数据

最后在stream.read(0)开始读取数据,这个内部会触发push方法,然后触发上面的maybeReadMore_方法,连续不断地读取数据

Readable.prototype.resume = function() {
  const state = this._readableState;
  if (!state.flowing) {
    // 调用resume方法
    state.flowing = !state.readableListening;
    resume(this, state);
  }
  return this;
};
// 在nextTick中,调用了resume_方法
function resume(stream, state) {
  if (!state.resumeScheduled) {
    state.resumeScheduled = true;
    process.nextTick(resume_, stream, state); // nextTick调用
  }
}
function resume_(stream, state) {
  debug('resume', state.reading);
  if (!state.reading) {
    stream.read(0);
  }

  state.resumeScheduled = false;
  stream.emit('resume');
  flow(stream);
  if (state.flowing && !state.reading)
    stream.read(0);
}

从fs的createReadStream和createWriteStream看stream类的调用

stream类,从根本上是一个EventEmiiter的一个对stream的定制的类。它并没有实际上的写入方法,只是封装了很多的事件和方法,让上层更易于去调用。

现在看看stream类的在fs上的createReadStreamcreateWriteStream的调用,来从代码层面看看stream类的应用场景

实例

这里,我们从一个文件读取数据,再通过pipe写入另一个文件,非常简单的几行代码

const { createReadStream, createWriteStream } = require('fs')
// 该文件必须存在
const read = createReadStream('./input.md')
// 写的文件可以不存在,会自动创建
const write = createWriteStream('./output.md')
// 最后通过pipe,来进行写入
read.pipe(write)

fsStream

fs的ReadStreamWriteStream通过lazyLoadStreams方法引

// 先定义两个参数
let ReadStream;
let WriteStream;

function lazyLoadStreams() {
  if (!ReadStream) { // 如果没有引用过
    // 引入并赋值
    ({ ReadStream, WriteStream } = require('internal/fs/streams'));
  }
}
// 以下的createReadStream和createWriteStream就是通过
// ReadStream和WriteStream来进行实例生成的
function createReadStream(path, options) {
  lazyLoadStreams();
  return new ReadStream(path, options);
}

function createWriteStream(path, options) {
  lazyLoadStreams();
  return new WriteStream(path, options);
}

ReadStream

构造函数做了以下几件事情

  • 设置highWaterMark
  • 继承Readable类
  • 记录路径和文件描述符等配置
  • 如果有设置开始和结束点,对它们进行检查
  • 开始读取,open方式是打开了一个文件流
  • 监听结束事件,一旦结束调用,则调用Readable类的destroy方法(_destroy覆写来做fs流的定制)
function ReadStream(path, options) {
  // ...
  if (options.highWaterMark === undefined)
    options.highWaterMark = 64 * 1024; // 默认的highWaterMark为64k
  // ...
  // 继承Stream的Readable类
  Readable.call(this, options);
  // 记录path
  this.path = toPathIfFileURL(path);
  // 记录文件描述符
  this.fd = options.fd === undefined ? null : options.fd;
  // ..
  // 如果start不是空的,检查start的合法性,并记录当前的检查点
  if (this.start !== undefined) {
    checkPosition(this.start, 'start');
    this.pos = this.start;
  }
  // 没设置edn,则默认无穷
  if (this.end === undefined) {
    this.end = Infinity;
  } else if (this.end !== Infinity) {
    checkPosition(this.end, 'end'); // 检查传入的结束点
    // ...
  }
  // 如果文件描述符合法,则调用open方法
  if (typeof this.fd !== 'number')
    this.open();
  // 监听end时间,如果结束,则调用destroy方法结束
  this.on('end', function() {
    if (this.autoClose) {
      this.destroy();
    }
  });
}

ReadStream.prototype._destroy = function(err, cb) {
  // ..其他代码省略,主要调用closeFsStream方法
  closeFsStream(this, cb, err);
};
function closeFsStream(stream, cb, err) {
  // 通过文件描述符调用fs的cloase方法
  fs.close(stream.fd, (er) => { /** ... */});
  // 文件描述符清空
  stream.fd = null;
}

下面主要来看下open方法

ReadStream.prototype.open = function() {
  fs.open(this.path, this.flags, this.mode, (er, fd) => {
    // ...
    // 记录文件描述符
    this.fd = fd;
    this.emit('open', fd); // 通知open方法
    this.emit('ready'); // 通知ready方法
    // 开始读文件的数据流
    // 在这里调用的是Stream的Readable类的_read()方法
    // 从上面的源码可知,ReadStream肯定重写了_read()方法
    this.read(); // 这里是箭头函数,所以this指向的是ReadStream
  });
};

// 从下面找到了_read方法,挂载在ReadStream的prototype上
// 从上面的源码可知,n传入的highWaterMark的值
ReadStream.prototype._read = function(n) {
  // ..
  // 这里可以看到有个allocNewPool的方法,通过这个方法可以控制内存池的大小,而不会被挤爆
  if (!pool || pool.length - pool.used < kMinPoolSpace) {
    allocNewPool(this.readableHighWaterMark); // 这里拿到的是highWaterMark的值
  }

  const thisPool = pool;
  // 取highWaterMark和内存池中可用值的最小值
  let toRead = MathMin(pool.length - pool.used, n);
  // 记录开始
  const start = pool.used;
  // 下面都是计算本次需要读的数据量大小
  if (this.pos !== undefined)
    toRead = MathMin(this.end - this.pos + 1, toRead);
  else
    toRead = MathMin(this.end - this.bytesRead + 1, toRead);
  // 如果toRead已经没有了,push个null进去以结束流的读取
  if (toRead <= 0)
    return this.push(null);
  // fs.read通过内建模块读取
  fs.read(this.fd, pool, pool.used, toRead, this.pos, (er, bytesRead) => {
    // ...
    let b = null;
    // ... 这里做了一堆的内存池的控制,忽略
    // 最终还是读到了数据并推入数据
    // 这里会触发maybeReadMore方法,引起数据的不断读取
    this.push(b);
  });

  // Move the pool positions, and internal position for reading.
  if (this.pos !== undefined)
    this.pos += toRead;

  pool.used = roundUpToMultipleOf8(pool.used + toRead);
};
// allocNewPool方法分配了buffer内存空间,记录了已用空间
function allocNewPool(poolSize) {
  if (poolFragments.length > 0)
    pool = poolFragments.pop();
  else
    pool = Buffer.allocUnsafe(poolSize);
  pool.used = 0;
}

WriteStream

WriteStream比较简单,只是继承了Writable,重写了write方法。用fs文件写入的方式,将流数据写入文件

function WriteStream(path, options) {
  // ..
  Writable.call(this, options);
  this.path = toPathIfFileURL(path);
  this.fd = options.fd === undefined ? null : options.fd;
  // ...
  if (typeof this.fd !== 'number')
    this.open(); // 打开文件
}

WriteStream.prototype.open = function() {
  fs.open(this.path, this.flags, this.mode, (er, fd) => {
    // ...
    this.fd = fd; // 记录文件操作符
    this.emit('open', fd); // 触发open事件
    this.emit('ready'); // 触发ready事件
  });
};

因为继承了Writable,内部的write方法直接调用了_write方法进行写入

举例在pipe上,有个可写流监听data事件,触发的write方法,也间接调用了_write方法

src.on('data', ondata);
function ondata(chunk) {
  const ret = dest.write(chunk); // 直接写入chunk
  // ...
}

WriteStream.prototype._write = function(data, encoding, cb) {
  // ...
  // 写入文件
  fs.write(this.fd, data, 0, data.length, this.pos, (er, bytes) => {
    // ...
  });
  // 记录文件位置
  if (this.pos !== undefined)
    this.pos += data.length;
};

结语

至此,只是把stream的一部分内部代码进行了分析,还有很多诸如duplex以及transform等等类未深入研究。

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant