nodejs Stream使用中的陷阱

原创
2014/01/02 11:43
阅读数 8.6K

      最近公司有个专供下载文件的http服务器出现了内存泄露的问题,该服务器是用node写的,后来测试发现只有在下载很大文件的时候才会出现内存泄露的情况。最后干脆抓了一个profile看看,发现有很多等待发送的buff占用着内存,我的profile如下(怎么抓取profile,大家可以google一下):   

于是查看了一下发送数据的代码,如下:     

    var fReadStream = fs.createReadStream(filename);
    fReadStream.on('data', function (chunk) {
        res.write(chunk);
    });
    fReadStream.on('end', function () {
        res.end();
    });

开始觉得没有什么问题,于是在google上查了一下node http处理大文件的方法,结果发现有人使用pipe方法,于是将代码修改如下:  

    var fReadStream = fs.createReadStream(filename);
    fReadStream.pipe(res)

测试了一下,发现OK,但是还是不明白为什么会这样,于是研究一个一下pipe方法的代码,发现pipe有如下代码:

function pipeOnDrain(src) {//可写流可以执行写操作
  return function() {
    var dest = this;
    var state = src._readableState;
    state.awaitDrain--;
    if (state.awaitDrain === 0)
      flow(src);//写数据
  };
}

function flow(src) {//写操作函数
  var state = src._readableState;
  var chunk;
  state.awaitDrain = 0;

  function write(dest, i, list) {
    var written = dest.write(chunk);
    if (false === written) {//判断写数据是否成功
      state.awaitDrain++;//计数器
    }
  }

  while (state.pipesCount && null !== (chunk = src.read())) {

    if (state.pipesCount === 1)
      write(state.pipes, 0, null);
    else
      state.pipes.forEach(write);

    src.emit('data', chunk);

    // if anyone needs a drain, then we have to wait for that.
    if (state.awaitDrain > 0)
      return;
  }

  // if every destination was unpiped, either before entering this
  // function, or in the while loop, then stop flowing.
  //
  // NB: This is a pretty rare edge case.
  if (state.pipesCount === 0) {
    state.flowing = false;

    // if there were data event listeners added, then switch to old mode.
    if (EE.listenerCount(src, 'data') > 0)
      emitDataEvents(src);
    return;
  }

  // at this point, no one needed a drain, so we just ran out of data
  // on the next readable event, start it over again.
  state.ranOut = true;
}

原来pipe方法每次写数据的时候,都会判断是否写成功,如果写失败,会等待可写流触发"drain"事件,表示可写流可以继续写数据了,然后pipe才会继续写数据。

     这下明白了,我们第一次使用的代码没有判断res.write(chunk)是否执行成功,就继续写,这样如果文件比较大,而可写流的写速度比较慢的话,会导致大量的buff缓存在内存中,就会导致内存撑爆的情况。

总结:

      在使用流的过程中,一定要注意可读流和可写流读和写之间的平衡,负责会导致内存泄露,而pipe就实现了这样的功能。稍微研究了一下文档,发现stream类有pause()和resume()两个方法,这样的话我们也可以自己控制读写的平衡。代码如下:

var http = require("http");
var fs = require("fs");
var filename = "file.iso";

var serv = http.createServer(function (req, res) {
    var stat = fs.statSync(filename);
    res.writeHeader(200, {"Content-Length": stat.size});
    var fReadStream = fs.createReadStream(filename);
    fReadStream.on('data', function (chunk) {
        if(!res.write(chunk)){//判断写缓冲区是否写满(node的官方文档有对write方法返回值的说明)
            fReadStream.pause();//如果写缓冲区不可用,暂停读取数据
        }
    });
    fReadStream.on('end', function () {
        res.end();
    });
    res.on("drain", function () {//写缓冲区可用,会触发"drain"事件
        fReadStream.resume();//重新启动读取数据
    });
});

serv.listen(8888);


展开阅读全文
打赏
5
28 收藏
分享
加载中
frank21博主

引用来自“左岸佐”的评论

fReadStream.on('data', function (chunk) {
if(!res.write(chunk)){//判断写缓冲区是否写满(node的官方文档有对write方法返回值的说明)
fReadStream.pause();//如果写缓冲区不可用,暂停读取数据
}
});
问个问题,如果写缓存区满了,当前读取的chunk已经有了,又没有写成功,是不是就丢了
好问题,关于这个问题可以看看nodejs的官方文档 https://nodejs.org/api/stream.html#stream_class_stream_writable

The return value is true if the internal buffer is less than the highWaterMark configured when the stream was created after admitting chunk. If false is returned, further attempts to write data to the stream should stop until the 'drain' event is emitted.


大概的意思是"如果chunk被缓存以后,发现缓冲区超过预先设置的值,就会返回false", 而且按照文档来说,write永远不会丢掉数据,只会不断的往缓冲区里写数据,无论缓冲区是否超过设定的值,所以才会造成内存问题。
2018/02/02 17:46
回复
举报
fReadStream.on('data', function (chunk) {
if(!res.write(chunk)){//判断写缓冲区是否写满(node的官方文档有对write方法返回值的说明)
fReadStream.pause();//如果写缓冲区不可用,暂停读取数据
}
});
问个问题,如果写缓存区满了,当前读取的chunk已经有了,又没有写成功,是不是就丢了
2018/02/02 15:58
回复
举报
学习了, 谢谢👍
2017/01/08 21:17
回复
举报
frank21博主

引用来自“苏生不惑”的评论

可以完全代替on data吗

我修改了一下博文,最后我添加了自己控制读写平衡的方法。
2014/01/03 09:16
回复
举报
写的不错学习一下。很多人是用node的时候,直接从书上抄到一些sample代码,并未认真研究,所以考虑不够周全。认真研究文档还是很需要滴啊。
2014/01/02 20:41
回复
举报
可以完全代替on data吗
2014/01/02 20:16
回复
举报

引用来自“dqsun”的评论

引用来自“ajie0112”的评论

79写的不错!我们的项目正在进行中,后面也需要进行性能测试,我对你抓取profile的过程很感兴趣,这种图形化的展示效果请问是用什么工具分析的?

首先 npm -g install heapdump,然后再你的代码中添加 var heapdump = require('heapdump');,然后运行你的代码,给你的进程发送USR2(kill -USR2 pid)信号即可产生heapdump文件,载入chrom浏览器开发工具即可看见上面的效果。

谢谢!
2014/01/02 16:19
回复
举报
frank21博主

引用来自“ajie0112”的评论

79写的不错!我们的项目正在进行中,后面也需要进行性能测试,我对你抓取profile的过程很感兴趣,这种图形化的展示效果请问是用什么工具分析的?

首先 npm -g install heapdump,然后再你的代码中添加 var heapdump = require('heapdump');,然后运行你的代码,给你的进程发送USR2(kill -USR2 pid)信号即可产生heapdump文件,载入chrom浏览器开发工具即可看见上面的效果。
2014/01/02 16:07
回复
举报
79写的不错!我们的项目正在进行中,后面也需要进行性能测试,我对你抓取profile的过程很感兴趣,这种图形化的展示效果请问是用什么工具分析的?
2014/01/02 15:48
回复
举报
更多评论
打赏
9 评论
28 收藏
5
分享
返回顶部
顶部