Welcome

首页 / 脚本样式 / JavaScript / 浅谈Node.js:理解stream

Stream在node.js中是一个抽象的接口,基于EventEmitter,也是一种Buffer的高级封装,用来处理流数据。流模块便是提供各种API让我们可以很简单的使用Stream。

流分为四种类型,如下所示:
  • Readable,可读流
  • Writable,可写流
  • Duplex,读写流
  • Transform,扩展的Duplex,可修改写入的数据
1、Readable可读流
通过stream.Readable可创建一个可读流,它有两种模式:暂停和流动。

在流动模式下,将自动从下游系统读取数据并使用data事件输出;暂停模式下,必须显示调用stream.read()方法读取数据,并触发data事件。

所有的可读流最开始都是暂停模式,可以通过以下方法切换到流动模式:
  • 监听"data"事件
  • 调用stream.resume()方法
  • 调用stream.pipe()方法将数据输出到一个可写流Writable
同样地,也可以切换到暂停模式,有两种方法:
  • 如果没有设置pipe目标,调用stream.pause()方法即可。
  • 如果设置了pipe目标,则需要移除所有的data监听和调用stream.unpipe()方法
在Readable对象中有一个_readableSate的对象,通过该对象可以得知流当前处于什么模式,如下所示:
  • readable._readableState.flowing = null,没有数据消费者,流不产生数据
  • readable._readableState.flowing = true,处于流动模式
  • readable._readableState.flowing = false,处于暂停模式
为什么使用流取数据

对于小文件,使用fs.readFile()方法读取数据更方便,但需要读取大文件的时候,比如几G大小的文件,使用该方法将消耗大量的内存,甚至使程序崩溃。这种情况下,使用流来处理是更合适的,采用分段读取,便不会造成内存的"爆仓"问题。
data事件

在stream提供数据块给消费者时触发,有可能是切换到流动模式的时候,也有可能是调用readable.read()方法且有有效数据块的时候,使用如下所示:
const fs = require("fs");const rs = fs.createReadStream("./appbak.js");var chunkArr = [],chunkLen = 0;rs.on("data",(chunk)=>{chunkArr.push(chunk);chunkLen+=chunk.length;});rs.on("end",(chunk)=>{console.log(Buffer.concat(chunkArr,chunkLen).toString());});
readable事件

当流中有可用数据能被读取时触发,分为两种,新的可用的数据和到达流的末尾,前者stream.read()方法返回可用数据,后者返回null,如下所示:
const rs = fs.createReadStream("./appbak.js");var chunkArr = [],chunkLen = 0;rs.on("readable",()=>{var chunk = null;//这里需要判断是否到了流的末尾if((chunk = rs.read()) !== null){chunkArr.push(chunk);chunkLen+=chunk.length;}});rs.on("end",(chunk)=>{console.log(Buffer.concat(chunkArr,chunkLen).toString());});
pause和resume方法
stream.pause()方法让流进入暂停模式,并停止"data"事件触发,stream.resume()方法使流进入流动模式,并恢复"data"事件触发,也可以用来消费所有数据,如下所示:
const rs = fs.createReadStream("./下载.png");rs.on("data",(chunk)=>{console.log(`接收到${chunk.length}字节数据...`);rs.pause();console.log(`数据接收将暂停1.5秒.`);setTimeout(()=>{rs.resume();},1000);});rs.on("end",(chunk)=>{console.log(`数据接收完毕`);});
pipe(destination[, options])方法
pipe()方法绑定一个可写流到可读流上,并自动切换到流动模式,将所有数据输出到可写流,以及做好了数据流的管理,不会发生数据丢失的问题,使用如下所示:
const rs = fs.createReadStream("./app.js");rs.pipe(process.stdout);
以上介绍了多种可读流的数据消费的方法,但对于一个可读流,最好只选择其中的一种,推荐使用pipe()方法。
2、Writable可写流
所有的可写流都是基于stream.Writable类创建的,创建之后便可将数据写入该流中。

write(chunk[, encoding][, callback])方法

write()方法向可写流中写入数据,参数含义:
  • chunk,字符串或buffer
  • encoding,若chunk为字符串,则是chunk的编码
  • callback,当前chunk数据写入磁盘时的回调函数
该方法的返回值为布尔值,如果为false,则表示需要写入的数据块被缓存并且此时缓存的大小超出highWaterMark阀值,否则为true。
 使用如下所示:
const ws = fs.createWriteStream("./test.txt");ws.write("nihao","utf8",()=>{process.stdout.write("this chunk is flushed.");});ws.end("done.")
背压机制

如果可写流的写入速度跟不上可读流的读取速度,write方法添加的数据将被缓存,逐渐增多,导致占用大量内存。我们希望的是消耗一个数据,再去读取一个数据,这样内存就维持在一个水平上。如何做到这一点?可以利用write方法的返回值来判断可写流的缓存状态和"drain"事件,及时切换可读流的模式,如下所示:
function copy(src,dest){src = path.resolve(src);dest = path.resolve(dest);const rs = fs.createReadStream(src);const ws = fs.createWriteStream(dest);console.log("正在复制中...");const stime = +new Date();rs.on("data",(chunk)=>{if(null === ws.write(chunk)){rs.pause();}});ws.on("drain",()=>{rs.resume();});rs.on("end",()=>{const etime = +new Date();console.log(`已完成,用时:${(etime-stime)/1000}秒`);ws.end();});function calcProgress(){}}copy("./CSS权威指南 第3版.pdf","./javascript.pdf");
drain事件

如果Writable.write()方法返回false,则drain事件将会被触发,上面的背压机制已经使用了该事件。
finish事件

在调用stream.end()方法之后且所有缓存区的数据都被写入到下游系统,就会触发该事件,如下所示:
const ws = fs.createWriteStream("./alphabet.txt");const alphabetStr = "abcdefghijklmnopqrstuvwxyz";ws.on("finish",()=>{console.log("done.");});for(let letter of alphabetStr.split()){ws.write(letter);}ws.end();//必须调用
end([chunk][, encoding][, callback])方法
end()方法被调用之后,便不能再调用stream.write()方法写入数据,负责将抛出错误。
3、Duplex读写流
Duplex流同时实现了Readable与Writable类的接口,既是可读流,也是可写流。例如"zlib streams"、"crypto streams"、"TCP sockets"等都是Duplex流。
4、Transform流
Duplex流的扩展,区别在于,Transform流自动将写入端的数据变换后添加到可读端。例如:"zlib streams"、"crypto streams"等都是Transform流。
5、四种流的实现
stream模块提供的API可以让我们很简单的实现流,该模块使用require("stream")引用,我们只要继承四种流中的一个基类(stream.Writable, stream.Readable, stream.Duplex, or stream.Transform),然后实现它的接口就可以了,需要实现的接口如下所示:

| Use-case | Class | Method(s) to implement |
 | ------------- |-------------| -----|
 | Reading only | Readable | _read |
 | Writing only | Writable | _write, _writev |
 | Reading and writing | Duplex | _read, _write, _writev |
 | Operate on written data, then read the result | Transform | _transform, _flush |
Readable流实现

如上所示,我们只要继承Readable类并实现_read接口即可,,如下所示:
const Readable = require("stream").Readable;const util = require("util");const alphabetArr = "abcdefghijklmnopqrstuvwxyz".split();/*function AbReadable(){if(!this instanceof AbReadable){return new AbReadable();}Readable.call(this);}util.inherits(AbReadable,Readable);AbReadable.prototype._read = function(){if(!alphabetArr.length){this.push(null);}else{this.push(alphabetArr.shift());}};const abReadable = new AbReadable();abReadable.pipe(process.stdout);*//*class AbReadable extends Readable{constructor(){super();}_read(){if(!alphabetArr.length){this.push(null);}else{this.push(alphabetArr.shift());}}}const abReadable = new AbReadable();abReadable.pipe(process.stdout);*//*const abReadable = new Readable({read(){if(!alphabetArr.length){this.push(null);}else{this.push(alphabetArr.shift());}}});abReadable.pipe(process.stdout);*/const abReadable = Readable();abReadable._read = function(){if (!alphabetArr.length) {this.push(null);} else {this.push(alphabetArr.shift());}}abReadable.pipe(process.stdout);
以上代码使用了四种方法创建一个Readable可读流,必须实现_read()方法,以及用到了readable.push()方法,该方法的作用是将指定的数据添加到读取队列。

Writable流实现

我们只要继承Writable类并实现_write或_writev接口,如下所示(只使用两种方法):
/*class MyWritable extends Writable{constructor(){super();}_write(chunk,encoding,callback){process.stdout.write(chunk);callback();}}const myWritable = new MyWritable();*/const myWritable = new Writable({write(chunk,encoding,callback){process.stdout.write(chunk);callback();}});myWritable.on("finish",()=>{process.stdout.write("done");})myWritable.write("a");myWritable.write("b");myWritable.write("c");myWritable.end();
Duplex流实现

实现Duplex流,需要继承Duplex类,并实现_read和_write接口,如下所示:
class MyDuplex extends Duplex{constructor(){super();this.source = [];}_read(){if (!this.source.length) {this.push(null);} else {this.push(this.source.shift());}}_write(chunk,encoding,cb){this.source.push(chunk);cb();}}const myDuplex = new MyDuplex();myDuplex.on("finish",()=>{process.stdout.write("write done.")});myDuplex.on("end",()=>{process.stdout.write("read done.")});myDuplex.write("
a
");myDuplex.write("c
");myDuplex.end("b
");myDuplex.pipe(process.stdout);
上面的代码实现了_read()方法,可作为可读流来使用,同时实现了_write()方法,又可作为可写流来使用。

Transform流实现

实现Transform流,需要继承Transform类,并实现_transform接口,如下所示:
class MyTransform extends Transform{constructor(){super();}_transform(chunk, encoding, callback){chunk = (chunk+"").toUpperCase();callback(null,chunk);}}const myTransform = new MyTransform();myTransform.write("hello world!");myTransform.end();myTransform.pipe(process.stdout);
上面代码中的_transform()方法,其第一个参数,要么为error,要么为null,第二个参数将被自动转发给readable.push()方法,因此该方法也可以使用如下写法:
_transform(chunk, encoding, callback){chunk = (chunk+"").toUpperCase()this.push(chunk)callback();}
Object Mode流实现

我们知道流中的数据默认都是Buffer类型,可读流的数据进入流中便被转换成buffer,然后被消耗,可写流写入数据时,底层调用也将其转化为buffer。但将构造函数的objectMode选择设置为true,便可产生原样的数据,如下所示:
const rs = Readable();rs.push("a");rs.push("b");rs.push(null);rs.on("data",(chunk)=>{console.log(chunk);});//<Buffer 61>与<Buffer 62>const rs1 = Readable({objectMode:!0});rs1.push("a");rs1.push("b");rs1.push(null);rs1.on("data",(chunk)=>{console.log(chunk);});//a与b
下面利用Transform流实现一个简单的CSS压缩工具,如下所示:
function minify(src,dest){const transform = new Transform({transform(chunk,encoding,cb){cb(null,(chunk.toString()).replace(/[s
	]/g,""));}});fs.createReadStream(src,{encoding:"utf8"}).pipe(transform).pipe(fs.createWriteStream(dest));}minify("./reset.css","./reset.min.css");
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。