【Nginx源码解读】事件处理模块
【Nginx源码解读】事件处理模块
yangliucheng 发表于3个月前
【Nginx源码解读】事件处理模块
  • 发表于 3个月前
  • 阅读 17
  • 收藏 0
  • 点赞 0
  • 评论 0

Nginx的精髓就在于事件处理模块,代码量也非常大,在整理笔记时难免有所纰漏,希望能批评指正不足之处。

一、事件模块主流程讲解

二、介绍事件处理模型的初始化

 

一、事件模块主流程讲解

事件模块主要处理两类事件:① 定时任务 ② I/O事件。其中定时任务是指nginx通过ngx_add_timer添加的事件,原型函数如下:

ngx_event_timer.h

static ngx_inline void
ngx_event_add_timer(ngx_event_t *ev, ngx_msec_t timer)
{
	// 计算过期时间点
    // 过期时间 = 现在时间 + 过期时间
    key = ngx_current_msec + timer;
	
	// 设置超时时间到
	ev->timer.key = key;
	
	// 添加的红黑树
	ngx_rbtree_insert(&ngx_event_timer_rbtree, &ev->timer);
}


// 并设置了全局变量方便在任何地方调用
#define ngx_add_timer        ngx_event_add_timer

在此列举一个本文涉及的超时管理:

① 工作进程在争抢互斥锁时,因为只有一个进程能够获取锁,如果其他进程无法获取锁,不可能一直等待,所以设置accept_mutex_delay,超过了设定的事件就会发生超时处理,也就放弃争抢锁二选择去释放资源。

ngx_event_accept.c

void
ngx_event_accept(ngx_event_t *ev)
{
	if (ngx_use_accept_mutex) {
	// xxx
	}else{
	ngx_add_timer(ev, ecf->accept_mutex_delay);
	}
}

其次就是I/O事件了,很容易理解就是处理读写事件,那么I/O事件以epoll为例,先来看下nginx处理事件的主流程:

ngx_process_cycle.c

static void
ngx_worker_process_cycle(ngx_cycle_t *cycle, void *data)
{
	for ( ;; ) {
		// 不断循环处理事件
		ngx_process_events_and_timers(cycle);
	}
}

事件处理的入口在nginx进程模型的代码里面,工作进程会一直循环取处理事件,其中的阻塞点就是epoll_wait,处理完一轮事件后又循环这个过程,直到master进程发来关闭的信号。接下来就是真正的开始处理事件的地方:

// 开始处理事件
void
ngx_process_events_and_timers(ngx_cycle_t *cycle)
{
    ngx_uint_t  flags;
    ngx_msec_t  timer, delta;

    // ngx_timer_resolution 用于决定使用何种超时策略
    // 如果ngx_timer_resolution非零0,则设置timer未无限大
    // 另外ngx_timer_resolution还有作用就是控制gettimeofday调用的频率,不过在x86_64系统中影响已经可忽略了
    // 此时采用定时方案,在规定的时间,默认500ms,对红黑树中的元素进行一次扫描
    // 并处理超时的节点
    if (ngx_timer_resolution) {
        // 先设置时间为无限待,后面会设置未500
        timer = NGX_TIMER_INFINITE;
        flags = 0;

    } else {
        // 如果ngx_timer_resolution为0
        // 采用超时的方案,首先计算出最快超时的时间,然后等待这个时间段取处理超时的事件,
        // 处理完超时任务,再次计算下一次的超时时间,不断地循环处理。

        // 查询红黑树,返回超时时间最小的节点,即最左边的元素
        /**
         * while (node->left != sentinel) {
              node = node->left;
           }
        */
        // nginx通过红黑树来维护所有的time节点
        // 将超时检测时间设置未最快发生超时的事件对象的超时时刻和当前时刻的差
        // timer = (ngx_msec_int_t) (node->key - ngx_current_msec);
        timer = ngx_event_find_timer();
        flags = NGX_UPDATE_TIME;

#if (NGX_WIN32)

        /* handle signals from master in case of network inactivity */
        // 如果timer是无限或者大于500,则设置未500
        // 即如果选择定时方案,会设置定时时间为500ms
        if (timer == NGX_TIMER_INFINITE || timer > 500) {
            timer = 500;
        }

#endif
    }
    // 处理惊群现象(nginx是一个master,多个work竞争请求)
    // 惊群现象是指多个线程/进程同时监听一个socket事件(nginx是多进程程序,共享80端口),
    // 当事件发生时,会唤醒所有等待的线程/进程,争抢事件
    // 但是最后只有一个线程/进程可以读取事件成功
    // 其他进程/线程争抢失败后重新等待或者其他操作,这种浪费资源的现象叫惊群

    // 设置通过Accept互斥锁来解决惊群现象
    // 当nginx配置文件中worker的数量大于1时
    // 且配置文件打开了accept_mutex时,ngx_use_accept_mutex会设为1
    /*
    * nginx配置
    events {
        accept_mutex on;   #设置网路连接序列化,防止惊群现象发生,默认为on
        multi_accept on;  #设置一个进程是否同时接受多个网络连接,默认为off
        #use epoll;      #事件驱动模型,select|poll|kqueue|epoll|resig|/dev/poll|eventport
        worker_connections  1024;    #最大连接数,默认为512
    }
    */
    if (ngx_use_accept_mutex) {
        // ngx_accept_disabled = ngx_cycle->connection_n / 8 - ngx_cycle->free_connection_n;
        // 其中connection_n表示当前工作进程最大可承受连接数,free_connection_n表现当前空闲连接数
        // 假设当前连接数为x,那么ngx_cycle->connection_n / 8 - connection_n + x => x - 7/8 * connection_n
        // 即当连接数超过最大数的7/8时,ngx_accept_disabled的值将大于0时,当前work处理的连接已经达到饱和
        // 此时work不会竞争新的连接,nginx会任务当前函数已经经历了一轮事件处理
        // 即相应的负载减少了一点,并对ngx_accept_disabled自减
        if (ngx_accept_disabled > 0) {
            ngx_accept_disabled--;

        } else {
            // 如果活动连接数还没有达到饱和,则获取accept锁
            // 多个work,只有一个可以得到锁,
            // 且获取锁的过程不是阻塞,获取成功后,ngx_accept_mutex_held会设为1
            if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) {
                return;
            }
            // 如果ngx_accept_mutex_held = 1表示成功获取到锁
            if (ngx_accept_mutex_held) {
                // 抢到了accept锁的进程会被加上了NGX_POST_EVENTS标志
                // 并加入到post_events队列里
                // 设置标志,延迟处理事件
                flags |= NGX_POST_EVENTS;

            } else {
                if (timer == NGX_TIMER_INFINITE
                    || timer > ngx_accept_mutex_delay)
                {
                    timer = ngx_accept_mutex_delay;
                }
            }
        }
    }

    delta = ngx_current_msec;
    // 获得互斥锁的进程开始处理请求
    // 主要调用epoll_wait等待事件
    // 并根据事件类型加入到队列ngx_posted_accept_events或者ngx_posted_events
    (void) ngx_process_events(cycle, timer, flags);

    // delta就是调用ngx_process_events消耗的时间
    // ngx_current_msec的值是通过ngx_time_update函数得到的
    // 如果不执行ngx_time_update,则delta依然为0
    delta = ngx_current_msec - delta;

    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                   "timer delta: %M", delta);

    // 处理新建连接事件
    ngx_event_process_posted(cycle, &ngx_posted_accept_events);

    // accept事件一旦处理完成,当前进程就会释放互斥锁
    if (ngx_accept_mutex_held) {
        // 释放锁
        ngx_shmtx_unlock(&ngx_accept_mutex);
    }
    
    if (delta) {
        // 不断的从红黑树的节点中取出时间值最小的,判断是否超时
        // 如果超时就执行他们的事件函数,直到最小的时间不超时
        // 处理超时事件
        ngx_event_expire_timers();
    }
    // 释放锁后开始处理ngx_posted_events队列中的普通读写事件
    ngx_event_process_posted(cycle, &ngx_posted_events);
}

上一段代码较长,这里解读下多做了什么操作:

1.  首先是根据ngx_timer_resolution的值判断当前定时策略,这个值是在nginx.conf文件可选配置:

timer_resolution 10ms

 如果timer_resolution 非0,则选择定时检查方案,即设置定时时间,500ms,隔这个时间去红黑树中检查时候有事件超时,如果有则处理超时事件。如果ngx_timer_resolution为0,则采用超时检测方案,首先先计算最快超时的时间 timer = ngx_event_find_timer(),然后到了timer这个时间去处理超时事件,接着在计算一次timer,依次循环处理超时事件。

2.  根据ngx_use_accept_mutex判断是否打开了互斥锁配置,如果打开了互斥锁配置则开始争抢锁,否则的话直接处理事件。首先先了解下争抢锁的过程,其核心就是函数ngx_trylock_accept_mutex:

// 各个工作线程会尝试去获取锁
ngx_int_t
ngx_trylock_accept_mutex(ngx_cycle_t *cycle)
{
    // 非阻塞的方式获取锁,返回1表示获取成功,0表示获取失败
    if (ngx_shmtx_trylock(&ngx_accept_mutex)) {

        ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                       "accept mutex locked");

        if (ngx_accept_mutex_held && ngx_accept_events == 0) {
            return NGX_OK;
        }
        // 将cycle->listening中的端口信息添加到epoll事件中
        // 即把监听socket加入到epoll中进行监听
        if (ngx_enable_accept_events(cycle) == NGX_ERROR) {
            ngx_shmtx_unlock(&ngx_accept_mutex);
            return NGX_ERROR;
        }

        // 监听完成后会获取到
        ngx_accept_events = 0;
        ngx_accept_mutex_held = 1;

        return NGX_OK;
    }

    // 获取锁失败的情况
    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                   "accept mutex lock failed: %ui", ngx_accept_mutex_held);
    // 获取锁失败:本来就拥有锁的情况
    if (ngx_accept_mutex_held) {
        // 本来拥有锁就直接将监听套接口从自身的事件监听机制删除
        if (ngx_disable_accept_events(cycle, 0) == NGX_ERROR) {
            return NGX_ERROR;
        }
        // 拥有锁标志置位0
        ngx_accept_mutex_held = 0;
    }

    return NGX_OK;
}

代码较累赘,还是画个图说明下流程:

       

拿到锁后就开始处理事件了。开始处理之前先 flags |= NGX_POST_EVENTS延迟事件实际处理的事件,那么是如何延迟的呢?实际是在函数ngx_epoll_process_events中:

static ngx_int_t
ngx_epoll_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags)
{
	if (flags & NGX_POST_EVENTS) {
		queue = rev->accept ? &ngx_posted_accept_events
							: &ngx_posted_events;

		ngx_post_event(rev, queue);

	} else {
		rev->handler(rev);
	}
}

我们已经直到影响flags的值是变量ngx_timer_resolution,其中有如下这一段代码:

void
ngx_process_events_and_timers(ngx_cycle_t *cycle)
{
    if (ngx_timer_resolution) {
        flags = 0;
    } else {
        flags = NGX_UPDATE_TIME;
	}	
}

flags被设置为0或者1,所以执行 flags |= NGX_POST_EVENTS会导致flags & NGX_POST_EVENTS为非零,所以来自epoll_wait的事件会被缓存在队列里面。

3.  接下来的操作就是要处理事件了,按照2中争抢锁的过程,抢到锁的进程会用于事件的监听套接口,调用ngx_process_events方法处理事件,实际上是调用了ngx_epoll_module.c中的ngx_epoll_process_events方法:

ngx_epoll_module.c

static ngx_int_t
ngx_epoll_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags)
{
	events = epoll_wait(ep, event_list, (int) nevents, timer);
	// 定时方案:回调函数会调用ngx_timer_signal_handler方法,并设置ngx_event_timer_alarm = 1,如果没有执行回
    //         调函数,则不会执行ngx_time_update。
	// 超时方案:flags = 1,则flags & NGX_UPDATE_TIME一定为真,所以每次会调用该方法
	if (flags & NGX_UPDATE_TIME || ngx_event_timer_alarm) {
		// 更新ngx_current_msec的值
		ngx_time_update();
	}
	// 缓存事件到队列
	if (flags & NGX_POST_EVENTS) {
		queue = rev->accept ? &ngx_posted_accept_events
							: &ngx_posted_events;

		ngx_post_event(rev, queue);

	} else {
		rev->handler(rev);
	}
}

代码中可以看出,其中accept事件(即监听端口上的可读事件)会被缓存到队列ngx_posted_accept_events,普通事件会被缓存到队列ngx_posted_events。

4. 缓存完事件,接下来就是处理新建连接事件(accept事件),因为当前进程已经监听了某个客户端的端口,该端口的请求中的可读事件先要处理下,该读的数据读完,即处理队列ngx_posted_accept_events中的新建连接事件,如果在处理新建连接期间还有新的请求连接事件,会阻塞,等待下次进程获取锁后读取。读完可读事件后就执行解锁操作ngx_shmtx_unlock。

5. 锁释放完之后就处理连接套接口之后的连接事件了,即保存在队列ngx_posted_events中的事件。

void
ngx_event_process_posted(ngx_cycle_t *cycle, ngx_queue_t *posted)
{
    ngx_queue_t  *q;
    ngx_event_t  *ev;

    while (!ngx_queue_empty(posted)) {

        q = ngx_queue_head(posted);
        ev = ngx_queue_data(q, ngx_event_t, queue);

        ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                      "posted event %p", ev);

        ngx_delete_posted_event(ev);

        ev->handler(ev);
    }
}

可以看出,就是不断遍历队列,调用对应的handler处理事件。

 

二、介绍事件处理模型的初始化

因为我们上面的讲解都是以epoll为例的,接下来就解释下ngnix如何来选择事件处理机制,首先先看下nginx.conf中的events模块配置:

events
{
	// 选择使用的事件处理机制,这里使用epoll
    use epoll;
	
	// 是否同时接受多个网络请求
	multi_accept on; 
	
	// 是否激活互斥锁
	accept_mutex on; 
	
	// 设置最大可用连接数
	worker_connections 65535;
	
	// 配置http连接的超时时间
    keepalive_timeout 60;
	
	// 客户端request请求中header的缓存大小
    client_header_buffer_size 4k;
	
	// 静态文件的缓存大小和缓存时间,比如html/css/image
    open_file_cache max=65535 inactive=60s;
	
	// 设置每次检查缓存有效性的时间间隔
    open_file_cache_valid 80s;
	
	// 静态文件有效缓存时间内最少使用次数
    open_file_cache_min_uses 1;
    
	// 设置是否允许缓存错误信息
    open_file_cache_errors on;
}

配置比较详细,其实相关的也就是 use epoll配置。

首先nginx定义了统一的事件处理接口,封装了各种事件处理机制(epoll/poll/select等)的执行函数:

ngx_event.h

typedef struct {

    // 将一个事件(读事件/写事件)添加到事件驱动机制
    ngx_int_t  (*add)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags);

    // 将一个事件(读事件/写事件)从事件驱动中上删除
    ngx_int_t  (*del)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags);

    // 启用一个已经添加的事件(代码暂时未使用)
    ngx_int_t  (*enable)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags);

    // 禁用一个已经添加的事件(代码暂时未使用)
    ngx_int_t  (*disable)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags);

    // 将指定连接关联的描述符加入到多路复用监控里
    ngx_int_t  (*add_conn)(ngx_connection_t *c);

    // 从多路复用监控里删除指定连接关联的描述符
    ngx_int_t  (*del_conn)(ngx_connection_t *c, ngx_uint_t flags);

    // 仅在多线程环境下调用,目前未使用
    ngx_int_t  (*notify)(ngx_event_handler_pt handler);

    // 阻塞等待发生,并对发生的事件逐个处理
    ngx_int_t  (*process_events)(ngx_cycle_t *cycle, ngx_msec_t timer,ngx_uint_t flags);

    // 初始化事件驱动模块
    ngx_int_t  (*init)(ngx_cycle_t *cycle, ngx_msec_t timer);

    // 回收资源
    void       (*done)(ngx_cycle_t *cycle);

} ngx_event_actions_t;

对封装的接口定义全局的变量,以及该接口函数也定义相应的全局函数

ngx_event_actions_t   ngx_event_actions

ngx_process_events   ngx_event_actions.process_events
ngx_done_events      ngx_event_actions.done
ngx_add_event        ngx_event_actions.add
ngx_del_event        ngx_event_actions.del
ngx_add_conn         ngx_event_actions.add_conn
ngx_del_conn         ngx_event_actions.del_conn
ngx_notify           ngx_event_actions.notify

所以添加一个事件(读/写)只需要调用ngx_add_event(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags)。那么重点就是如何对ngx_event_actions进行赋值?搜索event/modules会发现在各种事件处理机制的模块里面都存在对ngx_event_actions的赋值:

D:\mycode\nginx\src\event\modules\ngx_devpoll_module.c:
  186:     ngx_event_actions = ngx_devpoll_module_ctx.actions;

D:\mycode\nginx\src\event\modules\ngx_epoll_module.c:
  189: ngx_event_actions = ngx_epoll_module_ctx.actions

D:\mycode\nginx\src\event\modules\ngx_eventport_module.c:
  279:     ngx_event_actions = ngx_eventport_module_ctx.actions;

D:\mycode\nginx\src\event\modules\ngx_kqueue_module.c:
  224:     ngx_event_actions = ngx_kqueue_module_ctx.actions;

D:\mycode\nginx\src\event\modules\ngx_poll_module.c:
   96:     ngx_event_actions = ngx_poll_module_ctx.actions;

D:\mycode\nginx\src\event\modules\ngx_select_module.c:
  105:     ngx_event_actions = ngx_select_module_ctx.actions;

D:\mycode\nginx\src\event\modules\ngx_win32_select_module.c:
  106:     ngx_event_actions = ngx_select_module_ctx.actions;



而系统采用哪个赋值语句取决于用户在events模块的配置中的use epoll

具体初始化的过程:

static char *
ngx_event_core_init_conf(ngx_cycle_t *cycle, void *conf)
{
	ngx_conf_init_uint_value(ecf->use, module->ctx_index);
}


#define ngx_conf_init_size_value(conf, default)                             
// 用户没有指定则设置默认的值
if (conf == NGX_CONF_UNSET_SIZE) {                                       
    conf = default;                                                      
}

以上配置项通过函数ecf = ngx_event_get_conf(cycle->conf_ctx, ngx_event_core_module)获取到,即ecf->use的值是epoll值的序号,而序号的设置在ngx_module.c中:

ngx_int_t
ngx_preinit_modules(void)
{
    ngx_uint_t  i;
    for (i = 0; ngx_modules[i]; i++) {
        ngx_modules[i]->index = i;
        ngx_modules[i]->name = ngx_module_names[i];
    }
    ngx_modules_n = i;
    ngx_max_module = ngx_modules_n + NGX_MAX_DYNAMIC_MODULES;
    return NGX_OK;
}

在ngx_preinit_modules会循环遍历所有ngx_modules中的值,并对模块的index值赋值,用于标识这个模块,所以这里存在疑问的是ngx_modules的值来自于哪里?查看源码,发现在nginx.c中存在赋值语句 modules = ngx_dlsym(handle, "ngx_modules"):

ngx_event.c:
ngx_event_process_init(ngx_cycle_t *cycle)
{
	// 返回events模块的配置集合
	ecf = ngx_event_get_conf(cycle->conf_ctx, ngx_event_core_module);
	// 遍历所有模块
	for (m = 0; cycle->modules[m]; m++) {
        if (cycle->modules[m]->type != NGX_EVENT_MODULE) {
            continue;
        }
		// 如果不等于ecf->use的值就继续遍历
        if (cycle->modules[m]->ctx_index != ecf->use) {
            continue;
        }
		//找到用户指定的事件处理机制
        module = cycle->modules[m]->ctx;
		// 执行该模块的初始化函数
        if (module->actions.init(cycle, ngx_timer_resolution) != NGX_OK) {
            /* fatal */
            exit(2);
        }
        break;
    }
}

所以我们可以知道nginx对事件具体的处理逻辑定义在event/modules各个处理机制的模块里,我们取其中一个为例(ngx_epoll_module.c),在上面的赋值语句ngx_event_actions = ngx_epoll_module_ctx.actions,其中ngx_event_actions封装了nginx统一的事件处理调用函数,而ngx_epoll_module_ctx则定义了epoll模块的上下文信息,是ngx_event_module_t类型的静态变量,ngx_epoll_module.c中有如下定义:

typedef struct {
    ngx_str_t              *name;
    void                 *(*create_conf)(ngx_cycle_t *cycle);
    char                 *(*init_conf)(ngx_cycle_t *cycle, void *conf);
    ngx_event_actions_t     actions;
} ngx_event_module_t;

具体的赋值语句就是定义静态变量ngx_epoll_module_ctx:

static ngx_event_module_t  ngx_epoll_module_ctx = {
    &epoll_name,
    ngx_epoll_create_conf,               /* create configuration */
    ngx_epoll_init_conf,                 /* init configuration */
	// 对actions的赋值
    {
        // 添加事件(读/写事件)
        ngx_epoll_add_event,             /* add an event */
        ngx_epoll_del_event,             /* delete an event */
        ngx_epoll_add_event,             /* enable an event */
        ngx_epoll_del_event,             /* disable an event */
        ngx_epoll_add_connection,        /* add an connection */
        ngx_epoll_del_connection,        /* delete an connection */
#if (NGX_HAVE_EVENTFD)
        ngx_epoll_notify,                /* trigger a notify */
#else
        NULL,                            /* trigger a notify */
#endif
        ngx_epoll_process_events,        /* process the events */
        ngx_epoll_init,                  /* init the events */
        ngx_epoll_done,                  /* done the events */
    }
};

到这里,就已经通过ngx_event_actions = ngx_epoll_module_ctx.actions赋值语句将epoll的处理函数赋值给了nginx的全局变量ngx_event_actions,所以当我们调用全局函数ngx_add_event(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags)实际上就是调用ngx_epoll_add_event。

共有 人打赏支持
粉丝 0
博文 2
码字总数 5369
×
yangliucheng
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
* 金额(元)
¥1 ¥5 ¥10 ¥20 其他金额
打赏人
留言
* 支付类型
微信扫码支付
打赏金额:
已支付成功
打赏金额: