文档章节

Swoole源码学习记录(六)——Pipe管道

摇滚哈哈狗
 摇滚哈哈狗
发布于 2015/09/24 21:15
字数 2205
阅读 29
收藏 0

Swoole版本:1.7.4-stable

Pipe(管道)用于进程之间的数据交互,Linux系统本身提供了pipe函数用于创建一个半双工通信管道,而在swoole中也利用eventfd和unix sock封装了两种管道,使得进程间的通信更加灵活。

Swoole提供了一个基本结构体swPipe用于操作不同类型的Pipe,其声明在swoole.h文件的315 – 326 行,声明如下:

typedef struct  _swPipe
{
         void *object;
         int blocking;
         double timeout;
 
         int (*read)(struct _swPipe *, void*recv, int length);
         int (*write)(struct _swPipe *, void*send, int length);
         int (*getFd)(struct _swPipe *, intisWriteFd);
         int (*close)(struct _swPipe *);
}swPipe;

其中,首部的void *object指向具体的管道结构体(Base,Eventfd,Unsock),intblocking标记该管道是否是阻塞类型,double timeout为阻塞的超时时间,余下四个函数用于对管道进行读写、关闭等操作。


一.基于pipe函数的PipeBase

前面提到过,Linux本身提供了一个pipe函数用于创建一个管道,swoole基于这个函数进行了一层封装用于简化操作,并命名为PipeBase,其结构体声明在PipeBase.c文件中,声明如下:

typedef struct  _swPipeBase
{
         int pipes[2];
} swPipeBase;

其中的 int pipes[2] 用于存放pipe的读端fd和写端fd。

创建一个PipeBase管道的函数声明在swoole.h文件的328行,声明如下:

int swPipeBase_create(swPipe *p, int blocking);

第二个参数标记该管道是否阻塞。该函数具体定义在PipeBase.c文件中,其核心代码如下:

swPipeBase *object = sw_malloc(sizeof(swPipeBase));
         if (object == NULL)
         {
                   return -1;
         }
         p->blocking = blocking;
         ret = pipe(object->pipes);
         if (ret < 0)
         {
                   swWarn("pipe createfail. Error: %s[%d]", strerror(errno), errno);
                   return -1;
         }
         else
         {
                   //Nonblock
                   if (blocking == 0)
                   {
                            swSetNonBlock(object->pipes[0]);
                            swSetNonBlock(object->pipes[1]);
                   }
                   else
                   {
                            p->timeout = -1;
                   }
 
                   p->object = object;
                   p->read = swPipeBase_read;
                   p->write =swPipeBase_write;
                   p->getFd =swPipeBase_getFd;
                   p->close =swPipeBase_close;
         }

源码解释:创建一个PipeBase结构体,指定管道阻塞类型为blocking参数,并调用pipe函数创建一个linuxpipe,获取pipe fd。如果创建管道成功,若管道为非阻塞模式,则调用swSetNonBlock函数(声明于swoole.h文件的641行,定义于Base.c文件的507 – 529行,该函数的分析在本章末尾)设置两个fd为非阻塞;若管道为阻塞模式,则指定timeout为-1.

PipeBase有四个操作函数分别用于读、写、获取描述符以及关闭管道。其声明如下:

static int swPipeBase_read(swPipe *p, void *data, int length);
static int swPipeBase_write(swPipe *p, void *data, int length);
static int swPipeBase_getFd(swPipe *p, int isWriteFd);
static int swPipeBase_close(swPipe *p);

1.      swPipeBase_read函数核心源码如下:

if (p->blocking == 1 && p->timeout > 0)
{
	if (swSocket_wait(object->pipes[0], p->timeout * 1000,SW_EVENT_READ) < 0)
	{
        	 return SW_ERR;
	}
}
return read(object->pipes[0], data, length);

源码解释:如果该管道为阻塞模式,且指定了timeout时间,则调用swSocket_wait函数(定义于Base.c文件中的236 – 268行,该函数分析在本章末尾)执行等待,若超时,则返回SW_ERR;否则,直接调用read方法读取数据(此时,若管道为非阻塞模式,因已经指定了fd的options,所以read方法不会阻塞;若为阻塞模式,则read函数会一直阻塞到有数据可读)

2.      swPipeBase_write函数直接调用write函数写出数据

3.      swPipeBase_getFd函数根据 isWriteFd判定返回 写fd 或者 读fd

4.      swPipeBase_close函数调用close函数关闭描述符并释放swPipeBase内存。

二.基于eventfd函数的PipeEventfd

Eventfd是Linux提供的一个事件提醒(eventnotification)功能,eventfd函数创建一个对象用于事件的等待/提醒,可以从内核态发送通知到用户态。一个eventfd创建的对象包括一个无符号的64bit整型计数器,该计数器被内核持有。

eventfd函数的第二个参数用于指定flags,其中EFD_NONBLOCK指定eventfd是否非阻塞,EFD_SEMAPHORE用于指定eventfd的读写效果。

Swoole用一个结构体swPipeEventfd代表一个eventfd类型的管道,其声明如下:

typedef struct _swPipeEventfd
{
         int event_fd;
}swPipeEventfd;

其中event_fd就是用eventfd函数获得的描述符。

创建一个swPipeEventfd的函数声明在swoole.h的329行,声明如下:

int swPipeEventfd_create(swPipe *p, int blocking, int semaphore, int timeout);

其中第三个参数semaphore标记该管道是否仅用于提供通知,第四个参数timeout指定阻塞超时的时间。

该函数具体定义在PipeEventfd.c文件,其核心代码如下:

swPipeEventfd *object =sw_malloc(sizeof(swPipeEventfd));
         if (object == NULL)
         {
                   return -1;
         }
 
         flag = EFD_NONBLOCK;
 
         if (blocking == 1)
         {
                   if (timeout > 0)
                   {
                            flag = 0;
                            p->timeout = -1;
                   }
                   else
                   {
                            p->timeout =timeout;
                   }
         }
 
#ifdefEFD_SEMAPHORE
         if (semaphore == 1)
         {
                   flag |= EFD_SEMAPHORE;
         }
#endif
 
         p->blocking = blocking;
         efd = eventfd(0, flag);

源码解释:创建一个swPipeEventfd对象,默认设置管道为非阻塞的,若参数blocking指定管道为阻塞,则指定timeout的值。如果指定了semaphore参数,则将EFD_SEMAPHORE加入flags中。最后调用eventfd函数获取event_fd.

这里需要特别说明EFD_SEMAPHORE参数。如果一个eventfd被指定了该参数,当eventfd的计数器不为0时,对eventfd的read操作会读到一个8byte长度的整数1,然后计数器的值减1;如果没有指定该参数,当eventfd的计数器不为0时,对eventfd的read操作会将计数器的值读出(8byte的整数),并将计数器置0.

PipeEventfd同样有四个操作函数分别用于读、写、获取描述符以及关闭管道。其声明如下

static int swPipeEventfd_read(swPipe *p, void *data, int length);
static int swPipeEventfd_write(swPipe *p, void *data, int length);
static int swPipeEventfd_getFd(swPipe *p, int isWriteFd);
static int swPipeEventfd_close(swPipe *p);

1.      swPipeEventfd_read核心源码如下:

if (p->blocking == 1 && p->timeout > 0)
         {
                   if(swSocket_wait(object->event_fd, p->timeout * 1000, SW_EVENT_READ) <0)
                   {
                            return SW_ERR;
                   }
         }
 
         while (1)
         {
                   ret =read(object->event_fd, data, sizeof(uint64_t));
                   if (ret < 0 &&errno == EINTR)
                   {
                            continue;
                   }
                   break;
         }

源码解释:如果管道为可阻塞且timeout大于0,则调用swSocket_wait函数(定义于Base.c文件中的236 – 268行,该函数分析在本章末尾)执行等待。若超时,则返回SW_ERR;否则,循环调用read函数直到读取到数据。

2.      swPipeEventfd_write函数循环调用write函数直到写入数据成功。

3.      swPipeEventfd_getFd函数返回event_fd

4.      swPipeEventfd_close函数调用close关闭event_fd并释放内存。

三.基于unix socket的PipeUnsock

PipeUnsock使用了socketpair函数和AF_UNIX(Unix Socket)来创建一个全双工的“管道”。这其实类似于Linux的pipe函数,不同的是文件描述符fd换成了套接字描述符sockfd,半双工通讯变成了全双工通讯。

swPipeUnsock结构体声明在PipeUnsock.c文件中,其声明如下:

</pre><p><pre name="code" class="cpp">typedef struct _swPipeUnsock
{
         int socks[2];
} swPipeUnsock;

其中socks用于存放两个socket套接字

创建一个swPipeUnsock的函数声明在swoole.h文件的330行,声明如下:

int swPipeUnsock_create(swPipe *p, intblocking, int protocol);

其中第二个参数指定socket使用的协议族(TCP/UDP)

其核心代码如下:

p->blocking =blocking;
         ret= socketpair(AF_UNIX, protocol, 0, object->socks);
         if(ret < 0)
         {
                   swWarn("socketpair()failed. Error: %s [%d]", strerror(errno), errno);
                   returnSW_ERR;
         }
         else
         {
                   //Nonblock
                   if(blocking == 0)
                   {
                            swSetNonBlock(object->socks[0]);
                            swSetNonBlock(object->socks[1]);
                   }
 
                   intsbsize = SwooleG.unixsock_buffer_size;
                   setsockopt(object->socks[1],SOL_SOCKET, SO_SNDBUF, &sbsize, sizeof(sbsize));
                   setsockopt(object->socks[1],SOL_SOCKET, SO_RCVBUF, &sbsize, sizeof(sbsize));
                   setsockopt(object->socks[0],SOL_SOCKET, SO_SNDBUF, &sbsize, sizeof(sbsize));
                   setsockopt(object->socks[0],SOL_SOCKET, SO_RCVBUF, &sbsize, sizeof(sbsize));
         }

源码解释:设置管道阻塞类型,并调用socketpair获取两个套接字,并指定套接字类型为AF_UNIX,如果管道为非阻塞类型,则调用swSetNonBlock函数设置套接字属性。然后指定socket buffer 的大小(SwooleG是一个swServerG结构体,用于存放一些全局变量,该对象声明在Server.c中,在此不作分析),并调用setsockopt函数设置套接字选项。

swPipeUnsock的操作函数和swPipeBase基本一样,在此不再分析。

 

至此,Swoole的Pipe模块已分析完成。


附录:Base.c中相关函数的分析

1.     swSetNonBlock

函数原型:

void swSetNonBlock(int sock); // swoole.h  641行

核心代码:(Base.c  507 - 529行)

do
{
           opts =fcntl(sock, F_GETFL);
}
while(opts <0&& errno == EINTR);
if (opts < 0)
{
           swWarn("fcntl(sock,GETFL)fail");
}
opts = opts |O_NONBLOCK;
do
{
           ret =fcntl(sock, F_SETFL, opts);
}
while(ret <0&& errno == EINTR);

源码解释:调用fcntl函数,通过F_GETFL命令获得描述符的状态flags(没有一个比较好的翻译……百度翻译成旗标);将O_NONBLOCK(非阻塞标记)加入flags,并通过fcntl函数的F_SETFL命令设置描述符的状态flags。

2.     swSocket_wait

函数原型:

int swSocket_wait(int fd, inttimeout_ms, int events)  // swoole.h  648行

核心代码:(Base.c  236 – 268行)

structpollfd event;
event.fd = fd;
event.events = 0;
 
if (events & SW_EVENT_READ)
{
           event.events |= POLLIN;
}
if (events & SW_EVENT_WRITE)
{
           event.events |= POLLOUT;
}
while (1)
{
           int ret = poll(&event, 1,timeout_ms);
           if (ret == 0)
           {
                    return SW_ERR;
           }
           else if (ret < 0 && errno!= EINTR)
           {
                    swWarn("poll() failed.Error: %s[%d]", strerror(errno), errno);
                    return SW_ERR;
           }
           else
           {
                    return SW_OK;
           }
}

源码解释:创建 pollfd 结构体,指定监听的fd,并根据events的值指定需要监听的事件(读事件or写事件)。接着在循环中调用poll函数监听,并指定poll函数的超时时间为timeout_ms参数。若成功监听到事件,则返回SW_OK,否则返回SW_ERR。


版权声明:本文为博主原创文章,未经博主允许不得转载。

本文转载自:http://blog.csdn.net/ldy3243942/article/details/38848239

摇滚哈哈狗
粉丝 14
博文 226
码字总数 28445
作品 0
深圳
程序员
私信 提问
Linux学习笔记——管道PIPE

管道:当从一个进程连接数据流到另一个进程时,使用术语管道(pipe)。 # include int pipe(int filedes[2]); //创建管道 pipe()说明: 返回值:0成功,-1出错。 如果调用成功,则进程此时由了...

长平狐
2013/01/06
72
0
Swoole 1.9 正式版发布,增加多项新特性

Swoole 1.9 正式版发布了。 PHP的异步、并行、高性能网络通信引擎Swoole 已发布 1.9 正式版。新版本增加了多项新特性,修复了多个已知问题。1.9版本是100%向下兼容1.8的,用户可无缝升级。 ...

matyhtf
2016/11/22
6K
20
RxJS的另外四种实现方式(后记)—— 同时实现管道和链式编程

目录 RxJS的另外四种实现方式(序) RxJS的另外四种实现方式(一)——代码最小的库 RxJS的另外四种实现方式(二)——代码最小的库(续) RxJS的另外四种实现方式(三)——性能最高的库 Rx...

一个灰
2018/09/24
68
0
RxJS进阶——关于流的理解和应用

RxJS是微软公司推出的响应式编程的JavaScript库。 对于它的学习,最开始我的理解是把它当成是 能优雅地解决异步问题的lodash。 随着学习的深入,发现它采用了订阅者模式,其中也带有纯函数的...

genetalks_大数据
04/01
0
0
[Linux]管道的创建和使用

一、管道由函数pipe创建,只能提供单向的数据传递的数据传送。 格式: #include <unistd.h> int pipe(int fd[2]); fd为两个文件描述符:fd[0]用来读,fd[1]用来写。 1.父子进程的单向通信方式...

亭子happy
2012/09/14
156
0

没有更多内容

加载失败,请刷新页面

加载更多

Spring Boot + Mybatis-Plus 集成与使用(二)

前言: 本章节介绍MyBatis-Puls的CRUD使用。在开始之前,先简单讲解下上章节关于Spring Boot是如何自动配置MyBatis-Plus。 一、自动配置 当Spring Boot应用从主方法main()启动后,首先加载S...

伴学编程
今天
7
0
用最通俗的方法讲spring [一] ──── AOP

@[TOC](用最通俗的方法讲spring [一] ──── AOP) 写这个系列的目的(可以跳过不看) 自己写这个系列的目的,是因为自己是个比较笨的人,我曾一度怀疑自己的智商不适合干编程这个行业.因为在我...

小贼贼子
今天
7
0
Flutter系列之在 macOS 上安装和配置 Flutter 开发环境

本文为Flutter开发环境在macOS下安装全过程: 一、系统配置要求 想要安装并运行 Flutter,你的开发环境需要最低满足以下要求: 操作系统:macOS(64位) 磁盘空间:700 MB(不包含 IDE 或其余...

過愙
今天
6
0
OSChina 周六乱弹 —— 早上儿子问我他是怎么来的

Osc乱弹歌单(2019)请戳(这里) 【今日歌曲】 @凉小生 :#今日歌曲推荐# 少点戾气,愿你和这个世界温柔以待。中岛美嘉的单曲《僕が死のうと思ったのは (曾经我也想过一了百了)》 《僕が死の...

小小编辑
今天
2.5K
16
Excption与Error包结构,OOM 你遇到过哪些情况,SOF 你遇到过哪些情况

Throwable 是 Java 中所有错误与异常的超类,Throwable 包含两个子类,Error 与 Exception 。用于指示发生了异常情况。 Java 抛出的 Throwable 可以分成三种类型。 被检查异常(checked Exc...

Garphy
今天
42
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部