文档章节

Node.js Design Patterns--3. Coding with Streams

fzyz_sb
 fzyz_sb
发布于 2017/01/17 15:06
字数 1096
阅读 28
收藏 0

流的重要性

针对Node.js平台来说, I/O的操作必须是实时的. 针对Stream来说, 只要数据有效则即可进行传输, 而非等到数据全部到达.

空间性能

Node.js默认最多能缓存0x3FFFFFFF字节(稍微小于1GB). 考虑我们将一个文件压缩:

var fs = require('fs');
var zlib = require('zlib');
var file = process.argv[2];

fs.readFile(file, (err, buffer) => {
  zlib.gzip(buffer, (err, buffer) => {
    fs.writeFile(file + '.gz', buffer, (err) => {
      console.log('File successfully compressed');
    });
  });
});

如果我们压缩一个大于1GB的文件, 则会报错:

RangeError: File size is greater than possible Buffer: 0x3FFFFFFF bytes

但是, 如果使用流做处理, 则不存在此问题:

var fs = require('fs');
var zlib = require('zlib');
var file = process.argv[2];

fs.createReadStream(file)
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream(file + '.gz'))
  .on('finish', function() {
    console.log('File successfully compressed');
  });

使用pipe一个重大的优势是: 可组合.

备注: 在Node.js版本7.1.0中, 我使用了1.74G的视频文件进行测试, 结果依旧压缩成功, 并不存在最大缓冲区的问题.

时间性能

考虑一个实际例子: 我们从客户端将文件压缩往服务端发送文件, 服务端解压后存储为同名文件.

服务端gzipReceive.js:

var http = require('http');
var fs = require('fs');
var zlib = require('zlib');
var server = http.createServer((req, res) => {
  var filename = req.headers.filename;
  console.log('File request received: ' + filename);
  req.pipe(zlib.createGunzip())
    .pipe(fs.createWriteStream(filename))
    .on('finish', () => {
      res.writeHead(201, {'Content-Type': 'text/plain'});
      res.end('That\'s it\n');
      console.log('File saved: ' + filename);
    });
});

server.listen(3000, () => {
  console.log('Listening');
});

客户端gzipSend.js:

var fs = require('fs');
var zlib = require('zlib');
var http = require('http');
var path = require('path');
var file = process.argv[2];
var server = process.argv[3];

var options = {
  hostname: server,
  port: 3000,
  path: '/',
  method: 'PUT',
  headers: {
    filename: path.basename(file),
    'Content-Type': 'application/octet-stream',
    'Content-Encoding': 'gzip'
  }
};

var req = http.request(options, (res) => {
  console.log('Server response: ' + res.statusCode);
});

fs.createReadStream(file)
  .pipe(zlib.createGzip())
  .pipe(req)
  .on('finish', () => {
    console.log('File successfully sent');
  });

服务端运行:

leicj@leicj:~/test$ node gzipReceive.js
Listening
File request received: test.txt
File saved: test.txt

客户端运行:

leicj@leicj:~/test$ node gzipSend.js ./file/test.txt localhost
File successfully sent
Server response: 201

考虑如果使用Buffer来传递文件, 那么必须得等文件全部读取完毕才进行传输, 而stream则可做到边读取边传输, 如下图所示:

 

流入门

Readable streams

读取流有两种基本的模式: non-flowing和flowing.

non-flowing模式

non-flowing最基本的就是读取特定字节的数据: readable.read([size])

process.stdin.on('readable', () => {
  var chunk;
  console.log('New data available');
  while ((chunk = process.stdin.read(200)) !== null) {
    console.log('Chunk read: (' + chunk.length + ')"' + chunk.toString() + '"');
  }
})
.on('end', () => {
  process.stdout.write('End of stream\n');
});

运行程序, 输出:

leicj@leicj:~/test$ cat test.js | node test.js
New data available
Chunk read: (200)"process.stdin.on('readable', () => {
  var chunk;
  console.log('New data available');
  while ((chunk = process.stdin.read(200)) !== null) {
    console.log('Chunk read: (' + chunk.length + ')"' + ch"
New data available
Chunk read: (95)"unk.toString() + '"');
  }
})
.on('end', () => {
  process.stdout.write('End of stream\n');
});"
End of stream

flow模式

flow模式最基本在于监听data来获取数据:

process.stdin.on('data', (chunk) => {
  console.log('New data available');
  console.log('Chunk read: (' + chunk.length + ')"' + chunk.toString() + '"');
})
.on('end', () => {
  process.stdout.write('End of stream\n');
});

运行程序, 输出:

leicj@leicj:~/test$ cat test.js | node test.js
New data available
Chunk read: (222)"process.stdin.on('data', (chunk) => {
  console.log('New data available');
  console.log('Chunk read: (' + chunk.length + ')"' + chunk.toString() + '"');
})
.on('end', () => {
  process.stdout.write('End of stream\n');
});"
End of stream

实现一个Readable stream

var Readable = require('stream').Readable;
class MyReadable extends Readable {
  constructor(options) {
    super(options);
  }
  _read(size) {
    this.push('hello');
    this.push(null);
  }
}

var myReadable = new MyReadable();
myReadable.pipe(process.stdout);

Writable streams

Writable streams中用于写入的函数格式为:

writable.write(chunk, [encoding], [callback]);

而如果没有数据可写入, 则调用:

writable.end([chunk], [encoding], [callback]);

实现一个Writable streams:

var Writable = require('stream').Writable;

class MyWritable extends Writable {
  constructor(options) {
    super(options);
  }
  _write(chunk, encoding, callback) {
    process.stdout.write('output: ' + chunk.toString());
    callback();
  }
}

var myWritable = new MyWritable();
process.stdin.pipe(myWritable);

Duplex streams

var Duplex = require('stream').Duplex;

class MyDuplex extends Duplex {
  constructor(options) {
    super(options);
    this.waiting = false;
  }
  _read(size) {
    if (!this.waiting) {
      this.push('Feed me data! >');
      this.waiting = true;
    }
  }
  _write(chunk, encoding, cb) {
    this.waiting = false;
    this.push(chunk);
    cb();
  }
}

var myDuplex = new MyDuplex();
process.stdin.pipe(myDuplex).pipe(process.stdout);

Transform streams

var Transform = require('stream').Transform;

class MyTransform extends Transform {
  constructor(options) {
    super(options);
  }
  _transform(data, encoding, callback) {
    this.push('output:' + data);
    callback();
  }
}

var myTransform = new MyTransform();
process.stdin.pipe(myTransform).pipe(process.stdout);

 

使用流进行异步控制

Sequential execution

在编写类似Writable/Duplex/Transform的情况下, _transform需要执行callback才能继续下去, 这样可以保证所处理的流是顺序的.

下例为顺序读取多个文件, 写入到目标文件中:

var fs = require('fs');

function concatFile(desc, files, cb) {
  if (files.length === 0) return cb();
  var input = fs.createReadStream(files[0]);
  var output = fs.createWriteStream(desc, {flags: 'a'});
  input.pipe(output);
  output.on('finish', (err) => {
    if (err) return console.error(err);
    concatFile(desc, files.slice(1), cb);
  });
}

var desc = process.argv[2];
var files = process.argv.slice(3);

concatFile(desc, files, () => {
  console.log('File concatenated successfully');
});

 

Unordered parallel execution

考虑如下特殊的情况, 我们读取多个文件, 并且输出, 但我们并不考虑其顺序问题:

var fs = require('fs');
var files = process.argv.slice(2);

files.forEach((item) => {
  fs.createReadStream(item).pipe(process.stdout);
});

如果测试的文件过小, 则还是顺序输出. 当文件过大时候, 会发现多个文件混合输出到终端.

Unordered limited parallel execution

 

Piping模式

Combining streams

 

Forking streams

 

Merging streams

 

Multiplexing and demultiplexing

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

© 著作权归作者所有

fzyz_sb
粉丝 415
博文 209
码字总数 447144
作品 0
武汉
程序员
私信 提问
Node.js Streams 基础总结

前段时间遇到项目上需要请求资源方获取opus编码的音频文件,然后置入ogg容器中传输给前端标准化播放器进行播放的需求。流程模式是,通过服务上建立的socket连接不断接收资源方传送的文件块。...

Azurewarth
2018/12/05
0
0
nodejs Stream使用手册

介绍 本文介绍了使用 node.js streams 开发程序的基本方法。 ![cc-by-3.0" alt="cc-by-3.0">![cc-by-3.0" /> Doug McIlroy. October 11, 1964 *** 最早接触Stream是从早期的unix开始的数十年......

frank21
2014/01/11
5.2K
0
Node v6.9.1 (LTS) 发布,Javascript 运行环境

Node v6.9.1 (LTS) 发布了。Node.js 是一套用来编写高性能网络服务器的 JavaScript 工具包,Node.js 是一个基于 Chrome V8 引擎的 JavaScript 运行环境。本次更新如下: 值得关注的更新 stre...

局长
2016/10/20
3.2K
13
Node.js v0.12.0 (Stable) 发布

Node.js v0.12.0 (Stable) 发布,此版本相比 0.10 版本有较大的改进,详细改进请看 wiki。请注意,此版本包括 API 更新,需要更新大量依赖。 此版本值得关注的特性如下: Streams 3 The Stre...

oschina
2015/02/07
5.2K
19
Node.js 8.1.3 发布,JavaScript 运行时

Node.js 8.1.3 已发布,Node.js 是一个基于 Chrome V8 引擎的 JavaScript 运行环境,它使用了一个事件驱动、非阻塞式 I/O 的模型,使其轻量又高效。 值得关注的变更 Stream:已经修复了 模块...

局长
2017/06/30
1K
5

没有更多内容

加载失败,请刷新页面

加载更多

mysql概览

学习知识,首先要有一个总体的认识。以下为mysql概览 1-架构图 2-Detail csdn |简书 | 头条 | SegmentFault 思否 | 掘金 | 开源中国 |

程序员深夜写bug
39分钟前
2
0
golang微服务框架go-micro 入门笔记2.2 micro工具之微应用利器micro web

micro web micro 功能非常强大,本文将详细阐述micro web 命令行的功能 阅读本文前你可能需要进行如下知识储备 golang分布式微服务框架go-micro 入门笔记1:搭建go-micro环境, golang微服务框架...

非正式解决方案
今天
3
0
前端——使用base64编码在页面嵌入图片

因为页面中插入一个图片都要写明图片的路径——相对路径或者绝对路径。而除了具体的网站图片的图片地址,如果是在自己电脑文件夹里的图片,当我们的HTML文件在别人电脑上打开的时候图片则由于...

被毒打的程序猿
今天
2
0
Flutter 系列之Dart语言概述

Dart语言与其他语言究竟有什么不同呢?在已有的编程语言经验的基础上,我们该如何快速上手呢?本篇文章从编程语言中最重要的组成部分,也就是基础语法与类型变量出发,一起来学习Dart吧 一、...

過愙
今天
2
0
rime设置为默认简体

转载 https://github.com/ModerRAS/ModerRAS.github.io/blob/master/_posts/2018-11-07-rime%E8%AE%BE%E7%BD%AE%E4%B8%BA%E9%BB%98%E8%AE%A4%E7%AE%80%E4%BD%93.md 写在开始 我的Arch Linux上......

zhenruyan
今天
5
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部