Node.js in Practice总结2

原创
2016/12/17 11:36
阅读数 411

Streams

Streams概述

什么时候使用Streams

无论我们使用fs.readFileSync还是fs.readFile进行文件的读取, 我们都要将文件全部读取进内存中. 如果文件特别的大, 那么解决方案应该是对大数据一块一块的读取, 即读取完一块数据后, 要求继续读取下一块数据.

屏幕快照 2016-11-20 下午1.52.07

内建Streams

读取单个文件

假设我们使用fs.readFile进行单文件的读取:

var fs = require('fs');

fs.readFile(__filename, (err, chunk) => {
  if (err) return console.error(err);
  if (chunk) {
    console.log(chunk.toString());
  }
});

如果文件过大, 甚至文件大于0x3FFFFFFF(Node.js最大的缓存大小)怎么办? 这时候我们应该使用流来处理数据.

var fs = require('fs');

fs.createReadStream(__filename).pipe(process.stdout);

错误处理

因为stream是继承于EventEmitter, 所以它同样监听error事件, 用于错误处理.

var fs = require('fs');

var stream = fs.createReadStream('not-found');

stream.on('error', (err) => {
  console.trace();
  console.error('Stack:', err.stack);
  console.error('The error raised was:', err);
});

使用Stream base classes

正确的继承stream base classes

Readable: 输入流

Writable: 输出流

Transform: 解析数据时候改变数据

Duplex: 输入输出流

PassThrough: 测试, 分析, 检查数据

实现一个readable stream

我们可以通过继承于stream.Readable并且实现_read(size)方法, 来实现一个readable.stream.

在_read中, 需要执行push将数据读取出, 终止读取则push(null).

var fs = require('fs');
var ReadStream = require('stream').Readable;

class MyRead extends ReadStream {
  constructor(options) {
    super(options);
  }
  _read(size) {
    this.push('hello\n');
    this.push(null);
  }
}

var myRead = new MyRead();
myRead.pipe(process.stdout);

 

实现一个writable stream

通过继承stream.Writable并实现_write方法, 来实现一个输出流.

var fs = require('fs');
var WriteStream = require('stream').Writable;

class MyWrite extends WriteStream {
  constructor(options) {
    super(options);
  }
  _write(chunk, encoding, cb) {
    process.stdout.write(chunk);
    cb();
  }
}

var myWrite = new MyWrite();
process.stdin.pipe(myWrite);

实现一个duplex streams

继承stream.Duplex并且实现_read/_write方法.

var fs = require('fs');
var Duplex = require('stream').Duplex;

class MyDuplex extends Duplex {
  constructor(options) {
    super(options);
    this.waiting = false;
  }
  _read(size) {
    if (!this.waiting) {
      this.push('Feed me data! >');
      this.waiting = true;
    }
  }
  _write(chunk, encoding, cb) {
    this.waiting = false;
    this.push(chunk);
    cb();
  }
}

var myDuplex = new MyDuplex();

process.stdin.pipe(myDuplex).pipe(process.stdout);

实现一个transform streams

继承stream.Transform并实现_transform方法.

var fs = require('fs');
var Transform = require('stream').Transform;

class MyTransform extends Transform {
  constructor(options) {
    super(options);
  }
  _transform(chunk, encoding, cb) {
    this.push(chunk + ':::\n');
    cb();
  }
}

var myTransform = new MyTransform();

process.stdin.pipe(myTransform).pipe(process.stdout);

文件系统: 同步和异步操作文件

fs模块概述

fs实际上是POSIX的包装模块:

屏幕快照 2016-11-20 下午7.15.38

POSIX方法提供了一些底层的操作, 例如:

var fs = require('fs');
var assert = require('assert');

var fd = fs.openSync('./file.txt', 'w+');
var writeBuf = Buffer.from('some data to write');
fs.writeSync(fd, writeBuf, 0, writeBuf.length, 0);

var readBuf = Buffer.alloc(writeBuf.length);
fs.readSync(fd, readBuf, 0, writeBuf.length, 0);
assert.equal(writeBuf.toString(), readBuf.toString());

fs.closeSync(fd);

Streaming

fs模块提供fs.createReadStream/fs.createWriteStream功能模块. 它们分别可创建一个输入输出流, 例如可用于pipe.

var fs = require('fs');

var read = fs.createReadStream('./file.txt');
var write = fs.createWriteStream('./out.txt');
read.pipe(write);

Bulk file I/O

fs模块也提供了fs.readFile/fs.writeFile/fs.appendFile, 用于将文件全部读取.

var fs = require('fs');
fs.readFile('./file.txt', function(err, buf) {
  console.log(buf.toString());
});

File watching

fs模块提供了fs.watch/fa.watchFile, 来观察文件是否被改变.

文件操作

同步异步操作文件

考虑存在config.json文件, 我们分别使用同步和异步读取文件:

var fs = require('fs');
// 异步
fs.readFile('./config.json', function(err, buf) {
  if (err) throw err;
  var config = JSON.parse(buf.toString());
  doThisThing(config);
});

// 同步
try {
  var config = JSON.parse(fs.readFileSync('./config.json').toString());
  doThisThing(config);
} catch (err) {
  console.error(err);
}

备注: try/catch不可用于异步编程.

文件描述符的使用

任何一个文件操作, 都可通过文件描述符来操作, 默认情况下, 标准输入, 输出和错误分别对应0,1,2.

var fs = require('fs');
var fd = fs.openSync('./file.txt', 'r');
var buf = fs.readFileSync(fd);
console.log(buf.toString());
fs.closeSync(fd);

文件锁的操作

Node.js并未原生支持文件锁的操作. 一般我们可以通过以下两种方法达到类似锁的操作: 使用exclusive标志来创建锁文件, 使用mkdir来创建锁文件.

var fs = require('fs');
fs.open('file.txt', 'wx', function(err) {
  if (err) return console.error(err);
});

存在'x'标志情况下, 如果文件存在, 则抛出异常.

同理, 我们可以创建一个之前不存在的新目录, 在新目录下进行文件的读写, 也可以达到类似的锁功能.

文件递归操作

可查看fs的API, 通过readdir来读取文件夹, 通过fs.stat来判断当前路径为文件还是目录.

var fs = require('fs');
function readFileName(filename, dir_path) {
  fs.stat(dir_path + '/' + filename, (err, stats) => {
    if (err) return console.error(err);
    if (stats.isFile()) console.log(dir_path + '/' + filename);
    else if (stats.isDirectory()) {
      fs.readdir(dir_path + '/' + filename, (err, files) => {
        if (err) return console.error(err);
        for (let i = 0; i < files.length; i++) {
          readFileName(files[i], dir_path + '/' + filename);
        }
      });
    }
  });
}

readFileName('bin', '/usr');

观察文件/文件夹的变动

可使用fs.watch/fs.watchFile来观察文件/文件夹是否变动.

var fs = require('fs');

fs.watch('./watchdir', console.log);
fs.watchFile('./watchdir', console.log);

打开两个窗口, 一个运行上例代码, 一个在watchdir目录内执行touch/mv等操作, 则可看到效果.

网络

网络概述

基本网络层次

屏幕快照 2016-11-25 下午7.14.31

TCP/IP

在IP协议中, 一个host由一个IP地址标识. 在Node.js中由net模块产生TCP连接.

但IP协议不能保证数据传输的完整性, 所以需要加入TCP传输协议.

UDP

数据包是UDP中基本的单元. UDP是不能保证数据传输的完整性.

TCP客户端和服务端

在Node.js中, 创建服务端很简单, 使用net模块, 创建服务器, 监听端口即可:

var net = require('net');
var clients = 0;

var server = net.createServer((client) => {
  clients++;
  var clientId = clients;
  console.log('Client connected:', clientId);

  client.on('end', () => {
    console.log('Client disconnected:', clientId);
  });

  client.write('Welcome client:' + clientId + '\r\n');
  client.pipe(client);
});

server.listen(8000, () => {
  console.log('Server started on port 8000');
});

这里简单介绍一下可能难理解的一段代码:

client.pipe(client);

因为TCP/IP是一个duplex, 即既可输入, 又可输出的stream.

leicj@leicj:~/test$ telnet localhost 8000
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
Welcome client:1
hello world
hello world
i love this wrold
i love this wrold

如果我们想要编写一个低时延的实时应用, 则我们需要socket.setNoDelay()去开启TCP_NODELAY.

由于Nagle's算法的存在, 小数据包会累积成大的数据包, 然后发送出去.

var net = require('net');

var server = net.createServer((c) => {
  // 设置非延时
  c.setNoDelay(true);
  c.write('377375042377373001', 'binary');
  console.log('server connected');
  c.on('end', () => {
    console.log('server disconnected');
    // 当没有客户端连接时候, 自动关闭
    server.unref();
  });
  c.on('data', (data) => {
    process.stdout.write(data.toString());
    c.write(data.toString());
  });
});

server.listen(8000, () => {
  console.log('Server started on port 8000');
});

UDP客户端和服务端

我们尝试使用UDP进行文件的传输. 我们可以使用dgram模块来创建数据包, 通过socket.send来发送数据.

var dgram = require('dgram'),
  fs = require('fs'),
  port = 41230,
  defaultSize = 16;

function Client(remoteIP) {
  var inStream = fs.createReadStream(__filename),
    socket = dgram.createSocket('udp4');

  inStream.on('readable', () => {
    sendData();
  });

  function sendData() {
    var msg = inStream.read(defaultSize);
    if (!msg) return socket.unref();
    socket.send(msg, 0, msg.length, port, remoteIP, (err, bytes) => {
      sendData();
    });
  }
}

function Server() {
  var socket = dgram.createSocket('udp4');

  socket.on('message', (msg, rinfo) => {
    process.stdout.write(msg.toString());
  });

  socket.on('listening', () => {
    console.log('Server ready:', socket.address());
  });

  socket.bind(port);
}

if (process.argv[2] === 'client') {
  new Client(process.argv[3]);
} else {
  new Server();
}

对上述代码进行简单的解释:

  1. 对于UDP来说, 使用dgram.createSocket('udp4')创建一个socket对象, 使用socket.sendData进行数据的传输.
  2. 作为服务器的socket, 则不仅仅需要绑定端口(socket.bind(port)), 还会监听两个信息: message为接收数据时候的事件, 而listening为服务器做好接收数据.
展开阅读全文
加载中
点击引领话题📣 发布并加入讨论🔥
打赏
0 评论
1 收藏
0
分享
返回顶部
顶部