文档章节

事件库之Redis自己的事件模型-ae

C_Z
 C_Z
发布于 2013/09/13 10:26
字数 3773
阅读 1.7W
收藏 16

#Redis自己的事件模型 ae

##1.Redis的事件模型库

大家到网上Google“Redis libevent”就可以搜到Redis为什么没有选择libevent以及libev为其事件模型库,而是自己写了一个事件模型。 从代码中可以看到它主要支持了epoll、select、kqueue、以及基于Solaris的event ports。主要提供了对两种类型的事件驱动:

  1. IO事件(文件事件),包括有IO的读事件和写事件。
  2. 定时器事件,包括有一次性定时器和循环定时器。

##2.使用示例

这里写了一个由标准输入的读事件驱动的echo服务例子,同时用一个5秒的循环定时器每个5秒打印一次服务器状态。这里用了epoll为底层 事件接口。具体的代码抽取可以从Redis的源码中抽取"ae.c"、“ae.h”、"ae_select.c"、“ae_epoll.c”、"ae_evport.c"这几个文件,通过 ae.c中的宏::

#define HAVE_EPOLL 1    // illustrate to use epoll
#ifdef HAVE_EVPORT
#   include "ae_evport.c"
#else
#ifdef HAVE_EPOLL
#    include "ae_epoll.c"
#else
#ifdef HAVE_KQUEUE
#    include "ae_kqueue.c"
#else
#    include "ae_select.c"
#endif
#endif
#endif

这里主要是分析Redis的事件模型的封装,因此对于其对socket的包装以及内存管理都不做分析。故采用标准输入,同时需要将这些文件中 的内存管理接口"zmalloc()"以及"zfree()"替换成C库中的“malloc()”还有"free()"。可以使用sed或者vim的%s做替换操作。

将主程序贴在这里::

#include "ae.h"

#include <stdio.h>
#include <assert.h>
#include <unistd.h>
#include <sys/time.h>


#define MAXFD 5


void loop_init(struct aeEventLoop *l) 
{
        puts("I'm loop_init!!! \n");
}

void file_cb(struct aeEventLoop *l,int fd,void *data,int mask)
{
        char buf[51] ={0};
        read(fd,buf,51);
        printf("I'm file_cb ,here [EventLoop: %p],[fd : %d],[data: %p],[mask: %d] \n",l,fd,data,mask);
        printf("get %s",buf);
}

int time_cb(struct aeEventLoop *l,long long id,void *data)
{
        printf("now is %ld\n",time(NULL));
        printf("I'm time_cb,here [EventLoop: %p],[id : %lld],[data: %p] \n",l,id,data);
        return 5*1000;

}

void fin_cb(struct aeEventLoop *l,void *data)
{
        puts("call the unknow final function \n");
}

int main(int argc,char *argv[])
{
        aeEventLoop *l; 
        char *msg = "Here std say:";
        char *user_data = malloc(50*sizeof(char));
        if(! user_data)
                assert( ("user_data malloc error",user_data) );
        memset(user_data,'\0',50);
        memcpy(user_data,msg,sizeof(msg));

        l = aeCreateEventLoop(MAXFD);
        aeSetBeforeSleepProc(l,loop_init);
        int res;
        res = aeCreateFileEvent(l,STDIN_FILENO,AE_READABLE,file_cb,user_data);
        printf("create file event is ok? [%d]\n",res);
        res = aeCreateTimeEvent(l,5*1000,time_cb,NULL,fin_cb);
        printf("create time event is ok? [%d]\n",!res);

        aeMain(l);

        puts("Everything is ok !!!\n");
return 0;
}

没有什么逻辑,就是注册一个标准输入的读事件,和一个定时器事件。这里要说明的就是在ae.h中定义了读、写、定时器等回调函数的类型::

typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);
typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData); 
typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData); 
typedef void aeBeforeSleepProc(struct aeEventLoop *eventLoop);  

按这个类型定义回调函数就可以。其中asFileProc和aeTimeProc比较容易理解,就是IO读写事件和定时器事件的回调函数。这里要注意了,如果 定义的定时器回调函数返回值为正数,那么表示该定时器是一个循环定时器,即在第一次执行完后添加定时器事件时给定的延迟后不删除定时器, 在延迟该返回值时间(单位是毫秒)后再次执行该定时器。所以就要注意,比如要每5秒执行一个操作,那么在添加定时器时要给定其定时时间为 5000毫秒,同时在该定时器的回调函数中也要返回5000.

然后aeBeforeSleepProc回调函数比较的扑朔迷离,从Sleep上不容易理解,其实想到select和epoll这些机制的作用就可以想到了,这个函数是在 poll之前执行,从源码中看到就是在每个处理事件的循环开始出执行的。而aeEventFinalizerProc单从名字就更难理解,从源码中看到它是在删除 定时器事件时候执行的。

clientData比较好理解,就和在epoll中的ptr指针的作用一样。主要可以存放用户对每个事件上附加的数据。

事件循环的入口函数是aeMain(),将创建好的aeEventLoop传入就可以了。

使用起来很简单,对于不是很复杂或者对接入层要求不高的应用可以一试。

##3.ae.c分析

Redis的ae(姑且这么称呼Redis用的事件模型库的名字)主要逻辑在文件“ae.c”中,其中根据使用的系统事件接口分别选择包含"ae_epoll.c"或其他 文件。用到的主要数据结构在文件“ae.h”中定义。下面用一个不规范的UML类图表示了几个主要数据结构之间的关系,其中连在一起的表示一个数组或者 箭头表示的链表。这么画主要是帮助理解。

类图

下面根据上面的示例程序一一做说明。

###3.1 主要数据结构的创建

####3.1.1 aeCreateEventLoop

首先要创建一个aeCreateEventLoop对象。该对象需要一个最大文件描述符作为参数setSize,这个参数的意义需要了解ae的数据存放结构。从上面的图可以看到 在aeEventLoop结构中有两个数组(其实就是服务器程序惯用提前分配好内存然后用index映射到相应位置的做法),这两个数组的大小就是这里的参数值。 ae会创建一个 setSize*sizeof(aeFileEvent) 以及一个 setSize*siezeof(aeFiredEvent) 大小的内存,用文件描述符作为其索引。这一可以达到0(1)的速度找到事件数据所在位置。那么这个大小定位多少合适呢?在Linux个中,文件描述符是个有限的资源,当打开一个文件时就会消耗一个文件描述符,当关闭该文件描述符或者程序结束时会释放该文件描述符资源,从而供其他文件打开操作使用。当文件描述符超过最大值后,打开文件就会出错。那么这个最大值是多少呢?可以通过/proc/sys/fs/file-max看到系统支持的最大的文件描述符数。通过 ulimit -n 可以看到当前用户能打开的最大的文件描述符。在我这里的一台8g内存的机器上,系统支持最大的文件描述是365146。而在这台64bit的机器上 sizeof(aeFiredEvent) + sizeof(aeFileEvent) 大小为40byte。按系统最大支持的文件描述符来算,固定消耗内存为14.6M。这样以文件描述符作为数组的下标来索引,虽然这样的哈希在接入量不大的情况下会有大量的浪费。但是最多也就浪费14M 的内存,因此这样的设计是可取的。

在做好这些准备后还要准备系统提供的事件模型接口。这里以epoll为例,其他的可以自行查阅源码了解。ae首先提供了一个统一的结构名aeApiState,可以想象成c++中接口。在包装epoll的aeApiState中有一个epfd表示epoll占用的fd,一个epoll_event *events,其实也是一个aeApiState数组::

其和aeFiredEvent相对应,当epoll_wait()返回时,会将pending的文件描述符的信息放在aeFiredEvent数组中,包括有fd,以及mask事件类型,此时的aeFiredEvent不是以fd作为下标的,而是把这个数组当成一个缓冲区,存放一次epoll_wait()返回的所有fd,同时用epoll_event数组存放了epoll_wait()返回中的epoll_data数据,用其数据可以填充aeFiredEvent数组的内容供ae使用找到pending d的aeFileEvent对象。并在下一次进入epoll_wait()前处理完。这样完成了对epoll数据封装。

typedef struct aeApiState {
    int epfd;
    struct epoll_event *events;
} aeApiState;

3.1.2 aeCreateFileEvent

创建IO事件时需要指定要要注册的文件的文件描述符fd,以及要监听的事件类型mask。ae会先通过fd找到其对应的aeCreateFileEvent对象所在内存位置::

typedef struct aeFileEvent {
    int mask; /* one of AE_(READABLE|WRITABLE) */
    aeFileProc *rfileProc;  
    aeFileProc *wfileProc;  
    void *clientData;   
} aeFileEvent;

然后添加其要监听的事件类型mask fe->mask |= mask;,接着回根据要监听的类型添加其读事件或者写事件的回调函数,即aeFileProc。同时更新maxfd以备后用,如在select中的最大fd的指定。

在创建文件事件的过程中还要通过宏判断后include进来的底层事件模型接口来注册IO事件。这里和上面一样以epoll为例,其他的事件模型也类似。通过aeApiAddEvent将文件描述符fd和事件类型mask传给epoll操作。首先通过fd为下标找到aeCreateFileEvent对应的位置,然后取得epoll的epfd.通过EPOLL_CTL_ADD和EPOLL_CTL_MOD来加入或者修改epoll在该fd上事件的类型。

####3.1.3 aeCreateTimeEvent

ae的定时器是用一个单链表来管理的,将定时器依次从head插入到单链表中。插入的过程中会取得未来的墙上时间作为其超时的时刻。即将当前时间加上添加定时器时给定的延迟时间。定时器结构如下。并设置超时以及注销定时器时的回调函数还用clientData::

typedef struct aeTimeEvent {
    long long id; /* time event identifier. */ 
    long when_sec; /* seconds */                 
    long when_ms; /* milliseconds */           
    aeTimeProc *timeProc;                      
    aeEventFinalizerProc *finalizerProc;        
    void *clientData;                           
    struct aeTimeEvent *next;                   
} aeTimeEvent;

###3.2 事件循环

####3.2.1 aeMain入口函数

ae事件循环的基本机构就是用一个无限循环,然后再循环中去检测各个事件的发生。当然这里不是完全意义上的轮询,因为循环里面封装了epoll,select等事件驱动机制::

while (!eventLoop->stop) {
    if (eventLoop->beforesleep != NULL)
        eventLoop->beforesleep(eventLoop);
    aeProcessEvents(eventLoop, AE_ALL_EVENTS);
}

这里的beforesleep就是上文中叙述过的,进入一次循环之前做的操作。后面会说到定时的过程其实也就是一个epoll或者select模拟的sleep过程,而等待事件到来也是“sleep”在epoll或者select上。所以这个叫名字感觉也算贴切。当然这里是YY一下。不过可以帮助理解。

####3.2.2 aeProcessEvents

ae中最主要的逻辑应该也就是事件的处理了。从上面知道aeProcessEvents是处理事件的入口。在进入事件处理函数时,首先若没有任何事件则立即返回::

/* Nothing to do? return ASAP */
if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

这里注释中说的ASAP我不太理解表示的啥意思,望高人指点。

然后判断是否有定时器事件,如果有那么就去取得最近的一个将超时定时器的时间减去当前时间作为epoll或者select等事件接口的超时时间。该寻找过程就是通过遍历单链表得来的。这样指定超时时间,在有IO事件pending时可以处理IO事件,同时若没有则可以保证从epoll或者select中返回去处理定时器事件。不过这里也可以不注册定时器事件然后将事件的flags与上AE_DONT_WAIT,那么就会在poll中一直等待IO时间的到来。

在获得事件接口的超时时间后,用其调用封装事件接口的函数aeApiPoll。这里依旧以epoll作为示例。其将首先获得apidata,然后从中获得epoll的文件描述符epfd,并用events指针指向的数组内存以及超时时间调用epoll的epoll_wait().在上面已经描述了,epoll()返回时会将结果放在epoll_event数组中同时返回新的文件描述符。通过对返回数据的事件类型做判断可以填充到aeFiredEvent中fd和事件类型信息。

然后返回到ae的逻辑中,通过遍历aeFiredEvent数组取得fd可以找到pending事件的aeFileEvent,然后根据事件的类型去调用用户定义的IO回调函数。

当epoll或者select超时返回时并注册了定时器事件时,通过processTimeEvents进入去处理超时事件::

/* If the system clock is moved to the future, and then set back to the
 * right value, time events may be delayed in a random way. Often this
 * means that scheduled operations will not be performed soon enough.
 *
 * Here we try to detect system clock skews, and force all the time
 * events to be processed ASAP when this happens: the idea is that
 * processing events earlier is less dangerous than delaying them
 * indefinitely, and practice suggests it is. */  
if (now < eventLoop->lastTime) {  
    te = eventLoop->timeEventHead;
    while(te) {
        te->when_sec = 0;
        te = te->next;
    }
}

这里的注释说明了这么做的意义,其实就是如果系统事件变更了,就将所有的定时器时间设为0,让他在本次循环中超时并被执行。

当一个定时器被处理的时候,此时可能会加入新的定时,比如在定时器处理函数中加入新的定时器。而此时仅应该处理上一个时间段的状态,不应该在本次循环中去处理新的定时器。因此ae在EventLoop中加入了一个timeEventNextId的成员表示此次循环中最大的定时器id+1,这样在遍历定时器列表时,先保存最大的定时器id,然后遍历过程过滤掉定时器列表可能加入新的定时器即可::

    if (te->id > maxId) {
        te = te->next;   
        continue;
    }

这里定时器的逻辑是若单链表中的定时器时间比当前时间晚就执行定时器注册的回调函数。如果该回调函数返回正值,那么就更新定时器时间为该值之后,从而可以循环执行定时器。如果该回调函数返回AE_NOMORE,那么在执行完回调函数后注销该定时器。

###3.3 清理工作

####3.3.1 注销IO事件

注销IO事件不是以aeFileEvent为单位而是该IO事件加上其监听的事件类型为对象,因此其接口为aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask)。其首先通过fd找到去掉aeFileEvent对象,然后获得已有的mask,对其进行减操作后,构成fd上新的mask事件类型。通过修改epoll或者select中注册的IO事件来完成。这里以epoll为例,会根据该文件描述符上是否还有待等待的事件类型分别调用epoll_ctr的EPOLL_CTL_MOD或者EPOLL_CTL_DEL命令。

####3.3.2注销Timer时间

注销定时器事件的操作比较暴力,直接遍历链表,找到定时器id匹配的项,使用单链表删除操作进行删除。这里再删除之前会调用定时器上的finalizerProc。

####3.3.3注销aeEventLooop

最后注销aeEventLooop就是对相关内存的释放。

##4.总结

分析到这就结束了。感觉ae比较的直观。主要提供了一个IO事件和定时器事件的事件驱动模型。定时器的单链表逻辑可以再改进,比如用最小堆或者Timing-Wheel等著名的定时器解决方法。这样的一个模型用select可以跨到Windows上。因此用这套东西写的server再客户端测试的时候,也可以复用接入层。

© 著作权归作者所有

下一篇: 学着用Sed
C_Z

C_Z

粉丝 42
博文 7
码字总数 18356
作品 0
深圳
程序员
私信 提问
加载中

评论(11)

一叶舟troy
一叶舟troy
你好 楼主
平时写代码 我们也是 用for 循环轮询 容纳后epoll 呀redis也什么特别的吗?
一叶舟troy
一叶舟troy
for( ; ; )
{
nfds = epoll_wait(epfd,events,20,500);
C_Z
C_Z 博主

引用来自“logbird”的评论

偷了楼主的图片 放在自己博客当作笔记了。。楼主见谅
没事。共同学习嘛
logbird
logbird
偷了楼主的图片 放在自己博客当作笔记了。。楼主见谅
岁月漫步
岁月漫步
ASAP --- as soon as possible 尽快
是这个意思,哈哈,源码中很多地方用到这个了
__eric__
__eric__
ASAP as soon as possible
徐顺
徐顺

引用来自“chenjing1989”的评论

引用来自“C_Z”的评论

引用来自“chenjing1989”的评论

你好,我最近也在看redis的源代码,能否留个邮箱给我,我有些不懂可以请教你!!另外:你的那个流程图是什么软件画的啊。0

我邮箱:cz.theng@gmail.com。不过我没有去研读Redis其他部分。是因为看Libev的时候看到Redis自己没有使用这样的网络事件库,所以去看了下。推荐你搜下国内有个研读Reids的(http://www.redisbook.com/en/latest/ )。这个图是用Dia画的(http://zh.wikipedia.org/wiki/Dia)

谢谢啦!现在在研读整个源代码,但是我下的是比较早版本的,容易理解些。呵呵!!
我也在研读Redis源码,一起交流,xushun007@gmail.com
c
chenjing1989

引用来自“C_Z”的评论

引用来自“chenjing1989”的评论

你好,我最近也在看redis的源代码,能否留个邮箱给我,我有些不懂可以请教你!!另外:你的那个流程图是什么软件画的啊。0

我邮箱:cz.theng@gmail.com。不过我没有去研读Redis其他部分。是因为看Libev的时候看到Redis自己没有使用这样的网络事件库,所以去看了下。推荐你搜下国内有个研读Reids的(http://www.redisbook.com/en/latest/ )。这个图是用Dia画的(http://zh.wikipedia.org/wiki/Dia)

谢谢啦!现在在研读整个源代码,但是我下的是比较早版本的,容易理解些。呵呵!!
C_Z
C_Z 博主

引用来自“chenjing1989”的评论

你好,我最近也在看redis的源代码,能否留个邮箱给我,我有些不懂可以请教你!!另外:你的那个流程图是什么软件画的啊。0

我邮箱:cz.theng@gmail.com。不过我没有去研读Redis其他部分。是因为看Libev的时候看到Redis自己没有使用这样的网络事件库,所以去看了下。推荐你搜下国内有个研读Reids的(http://www.redisbook.com/en/latest/ )。这个图是用Dia画的(http://zh.wikipedia.org/wiki/Dia)
c
chenjing1989
你好,我最近也在看redis的源代码,能否留个邮箱给我,我有些不懂可以请教你!!另外:你的那个流程图是什么软件画的啊。0
Redis 2.8.9源码 - ae模块

本文为作者原创,转载请注明出处:http://my.oschina.net/fuckphp/blog/505956 Redis 的 ae模块的代码主要分布在 ae.c ae.h 还有 ae*.c 中,分别实现了epoll、evport、kqueue、select几种网络...

logbird
2015/09/14
148
0
并发服务器:Redis案例研究分析

1.事件处理库 Redis 最初发布于 2009 年,它最牛逼的一件事情大概就是它的速度 —— 它能够处理大量的并发客户端连接。需要特别指出的是,它是用一个单线程来完成的,而且还不对保存在内存中...

问题终结者
2018/06/03
27
0
Redis 的事件驱动库结构

原文地址:http://blog.ddup.us/?p=114 这是一篇翻译文章,原文见这里。Redis实现了它自己的事件库。事件库的实现在ae.c文件中。要弄明白Redis事件库是如何工作的最好的方法就是弄明白Red...

红薯
2011/08/14
1.7K
0
学习笔记-Redis设计与实现-事件

Redis服务器是一个事件驱动程序,服务器需要处理以下两类事件: 文件事件(file event):Redis服务器通过套接字与客户端(或者其他Redis服务器)进行连接,而文件事件就是服务器对套接字操作的抽...

技术小阿哥
2017/11/27
0
0
Redis——事件

问题: Redis 是单线程的,怎么实现的多个客户端的连接访问? Redis服务器是一个事件驱动程序,服务器需要处理一下两类事件: * 文件事件(file event): Redis服务器通过套接字与客户端(或者其...

nao
2016/05/18
155
0

没有更多内容

加载失败,请刷新页面

加载更多

在两个日期之间查找对象MongoDB

我一直在围绕在mongodb中存储推文,每个对象看起来像这样: {"_id" : ObjectId("4c02c58de500fe1be1000005"),"contributors" : null,"text" : "Hello world","user" : { "following......

javail
18分钟前
35
0
《aelf经济和治理白皮书》重磅发布:为DAPP提供治理高效、价值驱动的生态环境

2020年2月17日,aelf正式发布《aelf经济和治理白皮书》,这是aelf继项目白皮书后,在aelf网络经济模型和治理模式方面的权威论述。 《aelf经济和治理白皮书》描述了aelf生态中各个角色及利益的...

AELF开发者社区
29分钟前
44
0
EditText的首字母大写

我正在开发一个小小的个人待办事项列表应用程序,到目前为止,一切都运行良好。 我想知道一个小怪癖。 每当我去添加一个新项目时,我都会看到一个带有EditText视图的Dialog。 当我选择EditT...

技术盛宴
33分钟前
30
0
战疫 | 高德工程师如何在3天上线“医护专车”

新冠状病毒肺炎疫情突袭,无数医护人员放弃与家人团聚,明知凶险,仍然奋战在一线。但因为武汉公交、地铁、网约车停运,医护人员上下班很难。白衣天使疾呼打车难。 (截图摘自《财经国家周刊...

amap_tech
41分钟前
41
0
img在IE中无法按比例显示

在IE浏览器中使用img标签当给img标签设置width:98%时,显示时还是会把img的原始高度显示出来 解决方式给父标签设置width,但width不能使用100%需要指定一个值 <div style="width:900px;"> ...

有理想的鸭子
41分钟前
45
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部