Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[译]有关Node.js Stream你所应该知道的 #12

Open
EasonYou opened this issue May 19, 2020 · 0 comments
Open

[译]有关Node.js Stream你所应该知道的 #12

EasonYou opened this issue May 19, 2020 · 0 comments

Comments

@EasonYou
Copy link
Owner

有关Node.js Stream你所应该知道的

[译] Node.js Streams: Everything you need to know

众所周知,Node.js的Stream非常难以运用且难以理解。现在我将告诉你一个好消息,这不再是个问题。

这些年来,开发者为了更容易地去运用stream模块,开发了非常多的代码包。但在这篇文章,我将把注意力集中在原生的Node.js stream API。

stream到底是什么?

stream是数据的集合————就像是字符串或者是数组。但与其不同的是,可能不会一次性全部获取到stream,也不需要常驻在内存中。这使得stream在处理大批量数据或者从外部的数据源一次性获取大块数据的时候会非常有用

然而,stream并不只能用于处理大容量的数据。它也可以使得我们的代码具有组合性。就像我们把强大的linux指令和其他的linux小指令用管道结合起来一样,我们也可以在Node中用stream将其串联起来。

const grep = ... // A stream for the grep output
const wc = ... // A stream for the wc input

grep.pipe(wc)

在Node中很多内建模块使用了stream的接口

image

上面这个列表列出了Node.js中一些用了可读(readable)和可写(writable)流的原生模块。这里面有些对象是即可读又可写的流,例如TCP sockets,zlib以及crypto等。

注意,里面有些对象也是密切相关的。当一个在客户端的HTTP响应是一个可读流的时候,它在服务端上会是一个可写流。这是因为在HTTP的情境中,我们基本上是从一个对象(http.IncomingMessage)中读并写入另一个对象中(http.ServerResponse)

我们也注意到,标准输入输出在涉及子进程时,具有相反的stream类型。这使得我们有一个非常简单的方法从主进程的标准输入输出通过管道来回传输。

stream的练习

理论很棒,但不能100%令人信服。现在来看下一个例子,它演示了在内存消耗方面,stream可以产生哪些不同的效果。

让我们生成一个大的文件

const fs = require('fs');
const file = fs.createWriteStream('./big.file');

for(let i=0; i<= 1e6; i++) {
  file.write('Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.\n');
}

file.end();

看看我是怎么创建一个大文件的,一个可读流(writable steam)。

fs模块可以用利用stream的接口来对文件进行读或写。在上面这个例子中,我们通过了可写流,把一百万行数据循环写到了big.file里面。

跑完上面这个脚本大概会生成一个400MB左右的文件。

这是一个专门针对处理big.file设计的一个简易服务:

const fs = require('fs');
const server = require('http').createServer();

server.on('request', (req, res) => {
  fs.readFile('./big.file', (err, data) => {
    if (err) throw err;
  
    res.end(data);
  });
});

server.listen(8000);

当服务收到了一个请求,会用异步的方法fs.readFile,对这个大文件进行处理。但我们好像没有阻塞时间轮询或者其他任何事情,所有东西开起来都很正常,是吗?

来我们来看看当我们请求这个服务的时候,同时监控下内存。

当我们把服务刚跑起来的时候,我们看到内存的占用是8.7MB:

image

接着我们请求服务,注意内存占用

image

内存占用直接飙升到了400多MB

在我们写到响应对象之前,我们只是单纯地把整个big.file的内容放到了内存中。这是性能非常差的。

HTTP的响应对象也是一个可写流。这意味着,如果我们对big.file的内容做一个可读流的改造,我们就可以让数据再两个读写流之间流动,而不需要消耗到400MB的内存。

Node的fs模块可以让我们通过createReadStream方法,创建一个可读流。现在我们把数据通过响应体进行流动传输:

const fs = require('fs');
const server = require('http').createServer();

server.on('request', (req, res) => {
  const src = fs.createReadStream('./big.file');
  src.pipe(res);
})

server.listen(8000);

现在当你连接了服务,神奇的事情发生了(看看内存消耗)

image

发生了什么?

当客户端发出了对大文件的请求,我们一次只往流中放一个数据块,这意味着,我们完全不需要把所有的数据都放在内存中进行缓冲。内存占用仅仅提升到了25MB。

你可以把这个例子的文件大小扩展到极限。重新生成一个五百万行而不是一百万行的big.file,这将会是一个几乎为2GB的文件,这完完全全超过了Node的默认内存限制的大小。

如果你在默认的内存限制大小的条件下,尝试去用fs.readFile方法,你将无法进行操作。但是用fs.createReadStream,完全可以把2GB的数据通过流的形式传递给客户端。最棒的是,进程的内存占用也是差不多的。

Streams 101

在Node.js中stream用4个基础类型: 可读流Readable,可写流Writable,可读又可写流Duplex,读写过程中可修改或转换数据的Transform

  • 可读流Readable stream是一个数据源的抽象。例如fs.createReadStream
  • 可写流Writable stream 是写入对象的一个抽象。例如fs.createWriteStream
  • 可读可写流Duplex stream,例如TCP socket
  • Transform stream是在Duplex stream的基础上,可以对传输的数据进行修改。一个例子是可用来对目标数据进行gzip压缩的zlib.createGzip。你可以将transform stream想象成一个函数,输入是一个可写流,输出是一个可读流。你也许听说过,transform stream也被称作through streams

所有的流都是EventEmitter的实例。它们通过发送事件来进行读写文件。当然,我们可以通过pipe方法来简单地操作流数据。

pipe方法

你需要记住这个神奇的方法

readableSrc.pipe(writableDest)

在这行简单的代码中,我们从一个可读流读取源数据到从管道推入到目标,一个可写流中。源数据要求是一个可读流且目标要求是一个可写流。当然,他们也可以是可读可写流duplex/transform流。实际上,如果我们在两个duplex流之间来进行数据流动,我们也可以像在Linux中那样调用管道。

readableSrc
	.pipe(transformStream1)
  .pipe(transformStream2)
  .pipe(finalWritableDest)

pipe方法返回了一个可以让我们通过链式调用的目标流。对于流a(可读),b和c(双工)和d(可写),我们可以这么做:

// a可读流,b,c双工流,d可写流
a.pipe(b).pipe(c).pipe(d)
# 上面的语法跟下面的相同
a.pipe(b)
b.pipe(c)
c.pipe(d)

# 等同于在linux中的
$ a | b | c| d

pipe方法是消费stream最简单的方法。通常建议使用pipe方法或者事件来消费流数据,但要避免混合使用这两种方式。通常我们如果使用了pipe方法的话就不需要用事件了。但是如果你需要通过更多自定义的方式来消费流的话,事件的方式会是你想要的。

流事件

除了从一个可读流数据源读取数据再写到一个可写流目标对象,pipe方法也帮助我们自动管理了一些事情。例如,它帮忙处理了错误,文件结束,以及流之间的速度不统一的情况。

当然,流也可以直接被事件消费。这里有一些在pipe中做读写数据处理的用事件实现的等价代码:

// readable.pipe(writable)
readable.on('data', (chunk) => {
  writable.write(chunk);
});

readable.on('end', () => {
  writable.end();
});

这里列出了一些可读流和可写流中重要的事件和方法:

![1_HGXpeiF5-hJrOk_8tT2jFA](/Users/easonyou/Library/Mobile Documents/iCloudcomqinxiu~MarkLite/Documents/Node/Images/1_HGXpeiF5-hJrOk_8tT2jFA.png)

这些事件和方法会在某些不知情的情况下被关联起来了,因为我们经常去使用他们

可读流最重要的事件有

  • data事件,在流传递一个数据块给消费者的时候,会发送data事件
  • end事件,当流中已经没有更多数据需要被消费的时候,会发送这个事件

可写流最重要的事件有

  • drain事件,发送一个信号,来通知可写流还可以获取更多的数据
  • finish事件,当所有数据都刷到系统底层的时候,会发送这个时间

事件和方法可以通过组合的方式,来自定义流的处理方式以及性能优化。我们可以用pipe/unpipe方法或者read/unshift/resume方法,来自定义一个可读流。我们可以使它成为pipe/unpie的终点或者单单对其用writed方法写入以及结束时调用end方法来消费一个可写流。

可读流的暂停和流动

可读流有两种主要的形式可以让我们来消费它们:

  • 它们也可以处于暂停模式
  • 或者是流动模式

这些模式有时候被称为pull(拉)模式或者是push(推)模式。

所有可读流默认都是从暂停模式开始,但它们可以在我们需要的时候很简单地在流动模式和暂停模式之间切换。有时候可以是自动地进行切换。

当一个可读流处于暂停模式,我们可以用read方法强行从流中读取数据,当然,如果可读流就是处于流动模式,数据会连续不断地流动,而我们主要监听事件就可以进行消费了。

在流动模式中,数据会因为没有消费者可以来处理数据而丢失。所以我们需要一个data事件句柄来处理在流动模式的可读流。事实上,只需要添加一个data事件就可以切换到流模式,只要把这个事件移除,就可以自动地切换回暂停模式。有一些比较旧的Node stream接口在底层已经做些这些兼容适配。

你也可以用resumepause方法来手动切换两种模式。

image

当正在消费的可读流使用了pipe方法,我们不用去担心这些模式,因为pipe方法会自动地管理它们

实现一个可写流

我们需要一个stream模块的writable构造方法来实现一个可写流。

const { Writable } = require('stream');

我们可以有很多方法来实现一个可写流。例如,继承Writable的构造方法

class myWritableStream extends Writable {}

当然,我更倾向于简化构造方法。我们只要创建一个Writable的实例方法,并传递一些参数过去。唯一一个必须需要的参数是一个write函数,它暴露出需要写入的数据块。

const { Writable } = require('stream');

const outStream = new Writable({
  write(chunk, encoding, callback) {
    console.log(chunck.toString());
    callback();
  }
})

process.stdin.pipe(outStream);

这个write方法有三个回参:

  • chunk一直会传递buffer,除非我们以不同的方式去配置流
  • encoding回参在这个这个场景中是必须的,但是我们经常会忽略它
  • callback是一个我们在完成处理数据块后需要调用的方法

outStream中,我们简单地把数据块以字符串的形式打印出来,然后在没有判断是否成功的情况下执行了回调方法。这是一个非常简单但不大有用的一个echo流,这会接受它所有收到的数据。

为了消费这个流,我们可以简单地用process.stdin,这是一个可读流。我们现在把process.stdin通过管道放到我们的outStream

当我们执行上面的代码,我们输入的到process.stdin的所有东西将会回流到使用了outSteam的输出行中。

这不是一个非常有用的流,因为它已经在底层实现了。这已经非常接近process.stdout的实现了。我们只要把stdin通过管道流向stdout,我们会通过下面这行代码,实现一模一样的回流功能。

process.stdin.pipe(process.stdout);

实现一个可读流

我们可以通过引用Readable接口生成实例来实现一个可读流,然后在流的配置参数中,加入read方法:

const { Readable } = require('stream');
const inStream = new Readable({
	read() {}
});

这个简单的方法实现一个可读流。我们只要直接把数据推入到我们的目标消费者去进行消费就可以了。

const { Readable } = require('stream');

const inStream = new Readable({
  read() {}
});

inStream.push('ABCDEFGHIJKLM');
inStream.push('NOPQRSTUVWXYZ');

inStream.push(null); // No more data

inStream.pipe(process.stdout);

当我们推送一个null,那就意味着我们想要发送一个流里已经没有更多数据的信号。

为了消费这个简单的可读流,我们可以把它简单地用管道流到可写流process.stdout

当我们跑上面的代码的时候,我们将会读到inStream的所有数据,然后再标准输出中输出。非常简单,单也依然不高效。

我们在基本上是流导入到process.stdout之前,就把数据推到了流中。有一个更好的方式是当消费者请求数据的时候按需推送数据。我们可以再read()方法中进行配置来实现。

const inStraem = new Readable({
  read(size){
    // there is a demand on the data... Someone wants to read it.
  }
})

当可读流在read方法时,这种实现方式可以把一部分的数据推入到流中。例如,我们可以以一个字母码为65(显示为A)的时候,推送一个单词到流中,然后在每个推送递增。

const inStream = new Readable({
  read(size) {
    this.push(String.fromCharCode(this.currentCharCode++));
    if (this.currentCharCode > 90) {
      this.push(null);
    }
  }
})
inStream.currentCharCode = 65;
inStream.pipe(process.stdout);

当消费者正在读者一个可读流,那么read方法将会继续触发,我们将会推送和更多的单词。当我们因为某些原因需要停下这个循环,就需要在当前的CharCode大于90(Z)的时候推送一个null给到目标。

这块逻辑等同效我们开始使用的简单代码,但是现在我们在用户要求时按需推送了数据。

实现一个Duplex/Transform流

我们可以通过Duplex流在同个对象上实现一个可读可写流。就像我们继承了两个接口一样。

这是用duplex流结合了可读可写流的例子:

const { Duplex } = require('stream');

const inoutStream = new Duplex({
  write(chunk, encoding, callback) {
    console.log(chunk.toString());
    callback();
  },
  read(size) {
    this.push(String.fromCharCode(this.currentCharCode++));
    if (this.currentCharCode > 90) {
      this.push(null);
    }
  }
})
inoutStream.currentCharCode = 65;
process.stdin.pipe(inoutStream).pipe(process.stdout);
// 这里等同于
inoutStream.pipe(process.stdout); // pipe调用read
process.stdin.pipe(inoutStream) // pipe调用witre

通过合并这两个方法的方式,我们可以用双工流来读从A到Z的单词,也可以用它来回流数据。我们通过管道把可读流stdin流到这个双工流,然后将双工流本身通过管道传输他自己到stdout流以看到单词A到Z。

理解双工流中可读模块和可写模块最重要的是完全独立地从读的模块操作到另外一个写的模块。这仅仅是把两个特性组合到了一个对象中。

tranform流是比双工流更有趣的,因为它的输出是它的输入的一个计算结果。

对于一个transform流,我们不需要事先一个读或写的方法,我们只要实现一个transform方法来结合读写方法。它具有读的方法,我们也可以使用它来推送数据。

这是一个简单的transfrom流用你输入的任何内容再转换为大写格式来回显。

const { Transform } = require('stream');

const upperCaseTr = new Transform({
  transform(chunk, encoding,callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  }
});

process.stdin.pipe(upperCaseTr).pipe(process.stdout);

在这个transofrm流中,我们完全像前面的双流实力那样使用它,我们只实现了一个transform方法。在那个方法,我们将数据块转换为它的大写版本,然后将该数据作为可读部分的推送。

流对象模式

在默认中,流需要的是Buffer或者String格式的数据。这里有一个objectMode参数,可以让我们设置任意一个JavaScript的对象。

这里有一个简单的例子来进行验证。下面的transform组合提供了一个将逗号分割的值字符串映射到JavaScript对象的功能。例如a,b,c,d变成{a:b, c:d}

const { Transform } = require('stream');

const commaSplitter = new Transform({
  readableObjectMode: true,
  
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().trim().split(','));
    callback();
  }
});

const arrayToObject = new Transform({
  readableObjectMode: true,
  writableObjectMode: true,
  
  transform(chunk, encoding, callback) {
    const obj = {};
    for(let i=0; i < chunk.length; i+=2) {
      obj[chunk[i]] = chunk[i+1];
    }
    this.push(obj);
    callback();
  }
});

const objectToString = new Transform({
  writableObjectMode: true,
  
  transform(chunk, encoding, callback) {
    this.push(JSON.stringify(chunk) + '\n');
    callback();
  }
});

process.stdin
  .pipe(commaSplitter)
  .pipe(arrayToObject)
  .pipe(objectToString)
  .pipe(process.stdout)

我们传递了一个字符串,通过了commaSplitter方法的处理之后放在了一个数组中作为一个可读数据([“a”, “b”, “c”, “d”])。在流上添加readableObjectMode标识是必要的,因为我们在那里推的是一个对象而不是一个字符串。

当我们获取了数据,然后通过管道推动到了arrayToObject流的。我们需要一个writableObjectMode标识,来使流可接受一个对象。它也会推送一个对象,这也是我们需要readabaleObjectMode标识的原因。最后一个objectToString流接收一个对象,但推送的是一个字符串,所以我们只需要一个writableObjectMode标志。可读的部分知识一个普通的字符串(转为字符串后的对象)

image

Node的内建transform流

Node有一些非常有用的内建transfrom流。例如zlibcrypto

这是一个使用zlib.createGzip()方法来结合可读可写流来实现一个生成压缩文件的功能:

const fs = require('fs')
const zlib = require('zlib')
const file = process.argv[2]

fs.createReadStream(file)
	.pipe(zlib.createGzip())
	.pipe(fs.createWriteStream(file + '.gz'))

你可以用脚本来压缩你传入的文件。我们将该文件的可读流导入到zlib的内置transfom流中`,然后将其导入新的gzip压缩文件的可写流。

很酷的事情是,如果需要,我们可以将它跟事件结合起来。例如,我想要用户看到脚本运行中的进度以及结束信息。当pipe方法返回一个目标流,我们也可以连接事件处理方法的注册:

const fs = require('fs');
const zlib = require('zlib');
const file = process.argv[2];

fs.createReadStream(file)
  .pipe(zlib.createGzip())
  .on('data', () => process.stdout.write('.'))
  .pipe(fs.createWriteStream(file + '.zz'))
  .on('finish', () => console.log('Done'));

所以通过pipe方法,我们可以有一个简单的方法去消费流,但我们还可以再需要的地方使用事件进一步对流的交互进行定制。

pipe方法最棒的是,我们可以用它以一个更具可读性的方式来跟我们的程序一块一块地进行合体。例如,我们可以不监听上面的数据事件,而是简单地穿件一个转换流来通知进度,并且将on滴啊用替换成另一个pipe的调用

const fs = require('fs');
const zlib = require('zlib');
const file = process.argv[2];

const { Transform } = require('stream');

const reportProgress = new Transform({
  transform(chunk, encoding, callback) {
    process.stdout.write('.');
    callback(null, chunk);
  }
});

fs.createReadStream(file)
  .pipe(zlib.createGzip())
  .pipe(reportProgress)
  .pipe(fs.createWriteStream(file + '.zz'))
  .on('finish', () => console.log('Done'));

这个汇报进度的流只是简单地对流进行传递,但它在标准输出中显示了进度。注意我是怎么去如何使用回调函数中的第二个参数来将数据推入到transform方法中的。这相当于先推送数据。

组合流的应用是无穷尽的。例如,如果我们需要在压缩前后文件前后需要对其进行加密,我们只需要按我们的需要的顺序传递到另一个trasform流。我们可以用Node的crypto模块:

const crypto = require('crypto');
fs.createReadStream(file)
	.pipe(zlib.createGzip())
	.pipe(crypto.createCipher('aes192', 'a_secret'))
	.pipe(reportProgress)
	.pipe(fs.createWriteStream(file + '.zz'))
	.on('finish', () => console.log('Done'));

上面的脚本压缩并加密传递的文件,只有拥有该秘钥的人才可以使用输出的文件。我们不能直接用普通的解压工具进行解压,因为这是加密的。

为了真正对上面压缩的文件解压,我们需要使用相反的流来执行cryptozlib,这很简单:

fs.createReadStream(file)
  .pipe(crypto.createDecipher('aes192', 'a_secret'))
  .pipe(zlib.createGunzip())
  .pipe(reportProgress)
  .pipe(fs.createWriteStream(file.slice(0, -3)))
  .on('finish', () => console.log('Done'));

假设通过文件是压缩版,上面的代码将创建一个读取流,管到加密createDecipher()流(使用相同的秘密),管的输出到zlib createGunzip()流,然后把东西写出来回到一个文件没有扩展的部分。

这就是我要做的所有事情。 谢谢阅读!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant