文档章节

[转]Nodejs进程间通信

o
 osc_y8yehimr
发布于 2019/03/20 13:52
字数 3083
阅读 35
收藏 0

钉钉、微博极速扩容黑科技,点击观看阿里云弹性计算年度发布会!>>>

本文转自:http://www.cnblogs.com/rubyxie/articles/8949417.html

一.场景

Node运行在单线程下,但这并不意味着无法利用多核/多机下多进程的优势

事实上,Node最初从设计上就考虑了分布式网络场景:

Node is a single-threaded, single-process system which enforces shared-nothing design with OS process boundaries. It has rather good libraries for networking. I believe this to be a basis for designing very large distributed programs. The “nodes” need to be organized: given a communication protocol, told how to connect to each other. In the next couple months we are working on libraries for Node that allow these networks.

P.S.关于Node之所以叫Node,见 Why is Node.js named Node.js?

二.创建进程

通信方式与进程产生方式有关,而Node有4种创建进程的方式: spawn() , exec() , execFile() 和 fork()

spawn

const { spawn } = require('child_process'); const child = spawn('pwd'); // 带参数的形式 // const child = spawn('find', ['.', '-type', 'f']);

spawn() 返回 ChildProcess 实例, ChildProcess 同样基于事件机制(EventEmitter API),提供了一些事件:

  • exit :子进程退出时触发,可以得知进程退出状态( code 和 signal 

  • disconnect :父进程调用 child.disconnect() 时触发

  • error :子进程创建失败,或被 kill 时触发

  • close :子进程的 stdio 流(标准输入输出流)关闭时触发

  • message :子进程通过 process.send() 发送消息时触发,父子进程之间可以通过这种 内置的消息机制通信

可以通过 child.stdin , child.stdout 和 child.stderr 访问子进程的 stdio 流,这些流被关闭的时,子进程会触发 close 事件

P.S. close 与 exit 的区别主要体现在多进程共享同一 stdio 流的场景,某个进程退出了并不意味着 stdio 流被关闭了

在子进程中, stdout/stderr 具有Readable特性,而 stdin 具有Writable特性, 与主进程的情况正好相反 

child.stdout.on('data', (data) => { console.log(`child stdout:\n${data}`); }); child.stderr.on('data', (data) => { console.error(`child stderr:\n${data}`); });

利用进程 stdio 流的管道特性,就可以完成更复杂的事情,例如:

const { spawn } = require('child_process'); const find = spawn('find', ['.', '-type', 'f']); const wc = spawn('wc', ['-l']); find.stdout.pipe(wc.stdin); wc.stdout.on('data', (data) => { console.log(`Number of files ${data}`); });

作用等价于 find . -type f | wc -,递归统计当前目录文件数量

IPC选项

另外,通过 spawn() 方法的 stdio 选项可以建立IPC机制:

const { spawn } = require('child_process'); const child = spawn('node', ['./ipc-child.js'], { stdio: [null, null, null, 'ipc'] }); child.on('message', (m) => { console.log(m); }); child.send('Here Here'); // ./ipc-child.js process.on('message', (m) => { process.send(`< ${m}`); process.send('> 不要回答x3'); });

关于 spawn() 的IPC选项的详细信息,请查看 options.stdio

exec

spawn() 方法默认不会创建shell去执行传入的命令(所以 性能上稍微好一点 ),而 exec() 方法会创建一个shell。另外, exec() 不是基于stream的,而是把传入命令的执行结果暂存到buffer中,再整个传递给回调函数

exec() 方法的特点是 完全支持shell语法 ,可以直接传入任意shell脚本,例如:

const { exec } = require('child_process'); exec('find . -type f | wc -l', (err, stdout, stderr) => { if (err) { console.error(`exec error: ${err}`); return; } console.log(`Number of files ${stdout}`); });

但 exec() 方法也因此存在 命令注入 的安全风险,在含有用户输入等动态内容的场景要特别注意。所以, exec() 方法的适用场景是:希望直接使用shell语法,并且预期输出数据量不大(不存在内存压力)

那么,有没有既支持shell语法,还具有stream IO优势的方式?

有。 两全其美的方式 如下:

const { spawn } = require('child_process'); const child = spawn('find . -type f | wc -l', { shell: true }); child.stdout.pipe(process.stdout);

开启 spawn() 的 shell 选项,并通过 pipe() 方法把子进程的标准输出简单地接到当前进程的标准输入上,以便看到命令执行结果。实际上还有更容易的方式:

const { spawn } = require('child_process'); process.stdout.on('data', (data) => { console.log(data); }); const child = spawn('find . -type f | wc -l', { shell: true, stdio: 'inherit' });

stdio: 'inherit' 允许子进程继承当前进程的标准输入输出(共享 stdin , stdout 和 stderr ),所以上例能够通过监听当前进程 process.stdout 的 data 事件拿到子进程的输出结果

另外,除了 stdio 和 shell 选项, spawn() 还支持一些其它选项,如:

const child = spawn('find . -type f | wc -l', {  stdio: 'inherit',  shell: true, // 修改环境变量,默认process.env  env: { HOME: '/tmp/xxx' }, // 改变当前工作目录  cwd: '/tmp', // 作为独立进程存在  detached: true });

注意 , env 选项除了以环境变量形式向子进程传递数据外,还可以用来实现沙箱式的环境变量隔离,默认把 process.env 作为子进程的环境变量集,子进程与当前进程一样能够访问所有环境变量,如果像上例中指定自定义对象作为子进程的环境变量集,子进程就无法访问其它环境变量

所以,想要增/删环境变量的话,需要这样做:

var spawn_env = JSON.parse(JSON.stringify(process.env)); // remove those env vars delete spawn_env.ATOM_SHELL_INTERNAL_RUN_AS_NODE; delete spawn_env.ELECTRON_RUN_AS_NODE; var sp = spawn(command, ['.'], {cwd: cwd, env: spawn_env});

detached 选项更有意思:

const { spawn } = require('child_process'); const child = spawn('node', ['stuff.js'], { detached: true, stdio: 'ignore' }); child.unref();

以这种方式创建的独立进程行为取决于操作系统,Windows上detached子进程将拥有自己的console窗口,而Linux上该进程会 创建新的process group (这个特性可以用来管理子进程族,实现类似于 tree-kill 的特性)

unref() 方法用来断绝关系,这样“父”进程可以独立退出(不会导致子进程跟着退出),但要注意这时子进程的 stdio 也应该独立于“父”进程,否则“父”进程退出后子进程仍会受到影响

execFile

const { execFile } = require('child_process'); const child = execFile('node', ['--version'], (error, stdout, stderr) => { if (error) { throw error; } console.log(stdout); });

与 exec() 方法类似,但不通过shell来执行(所以性能稍好一点),所以要求传入 可执行文件 。Windows下某些文件无法直接执行,比如 .bat 和 .cmd ,这些文件就不能用 execFile() 来执行,只能借助 exec() 或开启了 shell 选项的 spawn()

P.S.与 exec() 一样也 不是基于stream的 ,同样存在输出数据量风险

xxxSync

spawn , exec 和 execFile 都有对应的同步阻塞版本,一直等到子进程退出

const { spawnSync, execSync, execFileSync, } = require('child_process');

同步方法用来简化脚本任务,比如启动流程,其它时候应该避免使用这些方法

fork

fork() 是 spawn() 的变体,用来创建Node进程,最大的特点是父子进程自带通信机制(IPC管道):

The child_process.fork() method is a special case of child_process.spawn() used specifically to spawn new Node.js processes. Like child_process.spawn(), a ChildProcess object is returned. The returned ChildProcess will have an additional communication channel built-in that allows messages to be passed back and forth between the parent and child. See subprocess.send() for details.

例如:

var n = child_process.fork('./child.js'); n.on('message', function(m) { console.log('PARENT got message:', m); }); n.send({ hello: 'world' }); // ./child.js process.on('message', function(m) { console.log('CHILD got message:', m); }); process.send({ foo: 'bar' });

因为 fork() 自带通信机制的优势,尤其适合用来拆分耗时逻辑,例如:

const http = require('http'); const longComputation = () => { let sum = 0; for (let i = 0; i < 1e9; i++) { sum += i; }; return sum; }; const server = http.createServer(); server.on('request', (req, res) => { if (req.url === '/compute') { const sum = longComputation(); return res.end(`Sum is ${sum}`); } else { res.end('Ok') } }); server.listen(3000);

这样做的致命问题是一旦有人访问 /compute ,后续请求都无法及时处理,因为事件循环还被 longComputation 阻塞着,直到耗时计算结束才能恢复服务能力

为了避免耗时操作阻塞主进程的事件循环,可以把 longComputation() 拆分到子进程中:

// compute.js
const longComputation = () => { let sum = 0; for (let i = 0; i < 1e9; i++) { sum += i; }; return sum; }; // 开关,收到消息才开始做 process.on('message', (msg) => { const sum = longComputation(); process.send(sum); });

主进程开启子进程执行 longComputation 

const http = require('http'); const { fork } = require('child_process'); const server = http.createServer(); server.on('request', (req, res) => { if (req.url === '/compute') { const compute = fork('compute.js'); compute.send('start'); compute.on('message', sum => { res.end(`Sum is ${sum}`); }); } else { res.end('Ok') } }); server.listen(3000);

主进程的事件循环不会再被耗时计算阻塞,但进程数量还需要进一步限制,否则资源被进程消耗殆尽时服务能力仍会受到影响

P.S.实际上, cluster 模块就是对多进程服务能力的封装, 思路与这个简单示例类似

三.通信方式

1.通过stdin/stdout传递json

stdin/stdout and a JSON payload

最直接的通信方式,拿到子进程的handle后,可以访问其 stdio 流,然后约定一种 message 格式开始愉快地通信:

const { spawn } = require('child_process'); child = spawn('node', ['./stdio-child.js']); child.stdout.setEncoding('utf8'); // 父进程-发 child.stdin.write(JSON.stringify({ type: 'handshake', payload: '你好吖' })); // 父进程-收 child.stdout.on('data', function (chunk) { let data = chunk.toString(); let message = JSON.parse(data); console.log(`${message.type} ${message.payload}`); });

子进程与之类似:

// ./stdio-child.js
// 子进程-收 process.stdin.on('data', (chunk) => { let data = chunk.toString(); let message = JSON.parse(data); switch (message.type) { case 'handshake': // 子进程-发 process.stdout.write(JSON.stringify({ type: 'message', payload: message.payload + ' : hoho' })); break; default: break; } });

P.S.VS Code进程间通信就采用了这种方式,具体见 access electron API from vscode extension

明显的 限制 是需要拿到“子”进程的handle,两个完全独立的进程之间无法通过这种方式来通信(比如跨应用,甚至跨机器的场景)

P.S.关于stream及pipe的详细信息,请查看 Node中的流

2.原生IPC支持

如 spawn() 及 fork() 的例子,进程之间可以借助内置的IPC机制通信

父进程:

  • process.on('message') 

  • child.send() 

子进程:

  • process.on('message') 

  • process.send() 

限制同上,同样要有一方能够拿到另一方的handle才行

3.sockets

借助网络来完成进程间通信, 不仅能跨进程,还能跨机器

node-ipc 就采用这种方案,例如:

// server
const ipc=require('../../../node-ipc'); ipc.config.id = 'world'; ipc.config.retry= 1500; ipc.config.maxConnections=1; ipc.serveNet( function(){ ipc.server.on( 'message', function(data,socket){ ipc.log('got a message : ', data); ipc.server.emit( socket, 'message', data+' world!' ); } ); ipc.server.on( 'socket.disconnected', function(data,socket){ console.log('DISCONNECTED\n\n',arguments); } ); } ); ipc.server.on( 'error', function(err){ ipc.log('Got an ERROR!',err); } ); ipc.server.start(); // client const ipc=require('node-ipc'); ipc.config.id = 'hello'; ipc.config.retry= 1500; ipc.connectToNet( 'world', function(){ ipc.of.world.on( 'connect', function(){ ipc.log('## connected to world ##', ipc.config.delay); ipc.of.world.emit( 'message', 'hello' ); } ); ipc.of.world.on( 'disconnect', function(){ ipc.log('disconnected from world'); } ); ipc.of.world.on( 'message', function(data){ ipc.log('got a message from world : ', data); } ); } );

P.S.更多示例见 RIAEvangelist/node-ipc

当然,单机场景下通过网络来完成进程间通信有些浪费性能,但网络通信的 优势 在于跨环境的兼容性与更进一步的RPC场景

4.message queue

父子进程都通过外部消息机制来通信,跨进程的能力取决于MQ支持

即进程间不直接通信,而是通过中间层(MQ), 加一个控制层 就能获得更多灵活性和优势:

  • 稳定性:消息机制提供了强大的稳定性保证,比如确认送达(消息回执ACK),失败重发/防止多发等等

  • 优先级控制:允许调整消息响应次序

  • 离线能力:消息可以被缓存

  • 事务性消息处理:把关联消息组合成事务,保证其送达顺序及完整性

P.S.不好实现?包一层能解决吗,不行就包两层……

比较受欢迎的有 smrchy/rsmq ,例如:

// init
RedisSMQ = require("rsmq"); rsmq = new RedisSMQ( {host: "127.0.0.1", port: 6379, ns: "rsmq"} ); // create queue rsmq.createQueue({qname:"myqueue"}, function (err, resp) { if (resp===1) { console.log("queue created") } }); // send message rsmq.sendMessage({qname:"myqueue", message:"Hello World"}, function (err, resp) { if (resp) { console.log("Message sent. ID:", resp); } }); // receive message rsmq.receiveMessage({qname:"myqueue"}, function (err, resp) { if (resp.id) { console.log("Message received.", resp) } else { console.log("No messages for me...") } });

会起一个Redis server,基本原理如下:

Using a shared Redis server multiple Node.js processes can send / receive messages.

消息的收/发/缓存/持久化依靠Redis提供的能力,在此基础上实现完整的队列机制

5.Redis

基本思路与message queue类似:

Use Redis as a message bus/broker.

Redis自带 Pub/Sub机制 (即发布-订阅模式),适用于简单的通信场景,比如一对一或一对多并且 不关注消息可靠性 的场景

另外,Redis有list结构,可以用作消息队列,以此提高消息可靠性。一般做法是生产者 LPUSH 消息,消费者 BRPOP 消息。适用于要求消息可靠性的简单通信场景,但缺点是消息不具状态,且没有ACK机制,无法满足复杂的通信需求

P.S.Redis的Pub/Sub示例见 What’s the most efficient node.js inter-process communication library/method?

四.总结

Node进程间通信有4种方式:

  • 通过stdin/stdout传递json:最直接的方式,适用于能够拿到“子”进程handle的场景,适用于关联进程之间通信,无法跨机器

  • Node原生IPC支持:最native(地道?)的方式,比上一种“正规”一些,具有同样的局限性

  • 通过sockets:最通用的方式,有良好的跨环境能力,但存在网络的性能损耗

  • 借助message queue:最强大的方式,既然要通信,场景还复杂,不妨扩展出一层消息中间件,漂亮地解决各种通信问题

 
o
粉丝 0
博文 500
码字总数 0
作品 0
私信 提问
加载中
请先登录后再评论。
Node 异步通信解决方案 - crosscom

crosscom crosscom是nodejs进程间及浏览器非同域页面间的Callback方式及Promise方式异步通信解决方案。 支持的环境 nodejs环境 浏览器requirejs环境 浏览器原生JavaScript环境 nodejs环境与浏...

开发者王晨旭
2017/12/24
278
0
使用 ASP.NET Core 作为 mediasoup 的信令服务器

一、概述 (图片来源:李超) 的服务端由两部分构成: 1、使用 编写的作为子进程的媒体层 (, , 等)。可执行文件在 或 上为 ,在 上为 。 2、使用 () 编写的、基于 的用于与 mediasoup-worker 进...

osc_6tgtqi6v
06/22
4
0
Nodejs 进阶:解答 Cluster 模块的几个疑问

敬请关注「Nodejs技术栈」微信公众号,获取优质文章 本文分享自微信公众号 - Nodejs技术栈(NodejsRoadmap)。 如有侵权,请联系 support@oschina.cn 删除。 本文参与“OSC源创计划”,欢迎正...

五月君
03/12
0
0
理解Egg.js中的多进程模型(egg-cluster)

https://juejin.im/post/5e3640ea6fb9a030111f79b0 背景 我们知道,js是单线程的,意味着一个nodejs进程只能运行在单个cpu上面。nodejs在io处理方面是非常优秀的,但是在密集运算型应用中仍然...

osc_uie90flw
03/19
1
0
理解Egg.js中的多进程模型(egg-cluster)

https://juejin.im/post/5e3640ea6fb9a030111f79b0 背景 我们知道,js是单线程的,意味着一个nodejs进程只能运行在单个cpu上面。nodejs在io处理方面是非常优秀的,但是在密集运算型应用中仍然...

osc_8adtko4d
03/19
22
0

没有更多内容

加载失败,请刷新页面

加载更多

2020-2021 设计趋势ISUX报告 · 用户体验篇

据说点击蓝色字体关注同学都升职加薪了 前言 身为用户体验设计师,无时无刻不被世界上的新事物冲刷着认知——互联网红利下降带来变化莫测的商业动向、循着摩尔定律野蛮生长日新月异的新技术、...

静电1983
07/04
0
0
当查询的数据来自多个数据源,有哪些好的分页策略?

概述 在业务系统开发中,尤其是后台管理系统,列表页展示的数据来自多个数据源,列表页需要支持分页,怎么解决? 问题 如上图,数据源可能来自不同 DB 数据库,可能来自不同 API 接口,也可能...

新亮笔记
03/14
9
0
花最少的时间点亮OLED之RT-Thread u8g2之(DIY一个小小天气站+万年历)

准备花几天时间DIY一个小小天气站+万年历,一来可以送给好友,二来也是蹦着熟悉RT-Thread的目的去学习,以提高自己的工作效率,指不定哪天就用上了,总之技多不压身嘛! 1、什么是u8g2? u8g...

Aladdin-Wang
07/10
0
0
HTML5 不得不看的本地存储 LocalStorage

用过HTML5 本地存储和sessionStorage的,你就感觉html5很强大,比cookie和session好用很多,下面让我们来学习这个知识吧... 最早的Cookies自然是大家都知道,问题主要就是太小,大概也就4KB...

胡哥有话说
2016/08/16
0
0
Oracle SQL自动化审核工具的实现

作者介绍 梁铭图,新炬网络首席架构师,十多年数据库运维、数据库设计、数据治理以及系统规划建设经验,拥有Oracle OCM、Togaf企业架构师(鉴定级)、IBM CATE等认证,曾获dbaplus年度MVP以及...

osc_sezkegv6
24分钟前
8
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部