文档章节

nginx基于epoll模型事件驱动流程详解

爱宝贝丶
 爱宝贝丶
发布于 01/14 10:34
字数 6012
阅读 7.3K
收藏 72

        epoll是一种基于事件驱动的模型,其是nginx能够高效处理客户端请求的重要原因之一。从流程上来讲,epoll模型的使用主要分为三步:epoll句柄的创建,监听文件描述符的添加和等待事件的触发,本文将介绍nginx是如何基于这三个步骤实现客户端请求的高效处理的。

1. epoll模型介绍

        在介绍nginx的实现原理之前,我们首先需要介绍一下epoll模型的基本使用方式。epoll在使用的时候主要有三个方法:

// 创建epoll句柄
int epoll_create(int size);
// 往epoll句柄中添加需要进行监听的文件描述符
int epoll_ctl(int epfd,int op,int fd,struct epoll_event* event);
// 等待需要监听的文件描述符上对应的事件的发生
int epoll_wait(int epfd,struct epoll_event* events,int maxevents,int timeout);

        首先,我们会调用epoll_create()方法创建一个epoll实例的句柄,可以将这里的句柄理解为一个eventpoll结构体实例,而这个结构体中有一个红黑树和一个队列,红黑树中主要存储需要监听的文件描述符,而队列则是在所监听的文件描述符中有指定的事件发生时就会将这些事件添加到队列中,如下图所示为eventpoll的示意图:

        一般来说,这个epoll句柄在程序的整个运行周期中只会有一个,比如nginx每个worker进程就都只维护了一个epoll句柄。在创建完句柄之后,对于我们的程序监听的每一个端口,其实本质上也都是一个文件描述符,这个文件描述符上是可以发生Accept事件,也即接收到客户端请求的。因而,初始时,我们会将需要监听的端口对应的文件描述符通过epoll_ctl()方法添加到epoll句柄中。添加成功之后,这每一个监听的文件描述符就对应了eventpoll的红黑树中的一个节点。另外,在调用epoll_ctl()方法添加了文件描述符之后,会将其与相应的设备(网卡)进行关联,当设备驱动发生某个事件时,就会回调当前文件描述符的回调方法ep_poll_callback(),从而生成一个事件,并且将该事件添加到eventpoll的事件队列中。最后,当我们调用epoll_wait()方法时,就会从epoll句柄中获取对应的事件,本质上就是检查eventpoll的事件队列是否为空,如果有事件则将其返回,否则就会等待事件的发生。另外,对于epoll的使用,这里获取的事件一般都是Accept事件,而在处理这个事件的时候,会获取客户端的连接的句柄,这个句柄本质上也是一个文件描述符,此时我们则会将其继续通过epoll_ctl()方法添加到当前的epoll句柄中,以继续通过epoll_wait()方法等待其数据的读取和写入事件。

        通过这里我们可以看出,在epoll使用的过程中,会有两类文件描述符,一类是我们所监听的端口所对应的文件描述符,这类描述符我们一般监听其Accept事件,以等待客户端连接,另一类则是每个客户端连接所对应的一个文件描述符,而这里描述符我们一般监听其读写事件以接收和发送数据给客户端。

2. nginx中epoll实现方式

        在前面的文章中,我们讲解了nginx是如何初始化事件驱动框架的,其中讲到事件框架的一个核心模块的定义如下:

ngx_module_t ngx_event_core_module = {
    NGX_MODULE_V1,
    &ngx_event_core_module_ctx,            /* module context */
    ngx_event_core_commands,               /* module directives */
    NGX_EVENT_MODULE,                      /* module type */
    NULL,                                  /* init master */
    // 该方法主要是在master进程启动的过程中调用的,用于初始化时间模块
    ngx_event_module_init,                 /* init module */
    // 该方法是在各个worker进程启动之后调用的
    ngx_event_process_init,                /* init process */
    NULL,                                  /* init thread */
    NULL,                                  /* exit thread */
    NULL,                                  /* exit process */
    NULL,                                  /* exit master */
    NGX_MODULE_V1_PADDING
};

        这里我们需要特别注意一下ngx_event_process_init()方法,我们讲到,这个方法是在每个worker创建的时候进行初始化调用的,这里面就涉及到两个非常重要的调用:a. 进行对应的事件模型的初始化;b. 监听配置文件中指定的各个端口。如下是这两个步骤的主要代码:

static ngx_int_t ngx_event_process_init(ngx_cycle_t *cycle) {
  // 省略部分代码....
  
  // 在nginx.conf配置文件的events{}配置块中需要使用use指令指定当前使用的事件模型,
  // 此时就会将所使用的事件模型的索引号存储在ecf->use中,下面的代码就是通过这种方式获取当前
  // 所指定的事件模型所对应的模块的,然后调用该模块的actions.init()方法初始化该事件模型
  for (m = 0; cycle->modules[m]; m++) {
    if (cycle->modules[m]->type != NGX_EVENT_MODULE) {
      continue;
    }

    // ecf->use存储了所选用的事件模型的模块序号,这里是找到该模块
    if (cycle->modules[m]->ctx_index != ecf->use) {
      continue;
    }

    // module即为所选用的事件模型对应的模块
    module = cycle->modules[m]->ctx;

    // 调用指定事件模型的初始化方法
    if (module->actions.init(cycle, ngx_timer_resolution) != NGX_OK) {
      exit(2);
    }

    break;
  }

  // 省略部分代码...
  
  ls = cycle->listening.elts;
  for (i = 0; i < cycle->listening.nelts; i++) {

#if (NGX_HAVE_REUSEPORT)
    if (ls[i].reuseport && ls[i].worker != ngx_worker) {
      continue;
    }
#endif

    // 这里是为当前所监听的每一个端口都绑定一个ngx_connection_t结构体
    c = ngx_get_connection(ls[i].fd, cycle->log);

    if (c == NULL) {
      return NGX_ERROR;
    }

    rev = c->read;

    // SOCK_STREAM表示TCP,一般都是TCP,也就是说在接收到客户端的accept事件之后,
    // 就会调用ngx_event_accept()方法处理该事件
    rev->handler = (c->type == SOCK_STREAM) ? ngx_event_accept : ngx_event_recvmsg;

    if ((ngx_event_flags & NGX_USE_EPOLL_EVENT) && ccf->worker_processes > 1) {
        if (ngx_add_event(rev, NGX_READ_EVENT, NGX_EXCLUSIVE_EVENT) == NGX_ERROR) {
            return NGX_ERROR;
        }

        continue;
    }
  }

  return NGX_OK;
}

        对这里的代码主要完成了如下几部分的工作:

  • 首先找到所使用的事件模型模块,然后调用其init()方法初始化该模型,这个方法里主要做了两件事,一个是通过epoll_create()方法创建一个epoll句柄,该句柄是当前worker进程运行的一个基础;另一个是为全局变量ngx_event_actions进行了赋值,即:

    // 这里将epoll相关的事件操作方法赋值给ngx_event_actions,
    // 也就是说后续有相关的事件发生则都会使用epoll相关的方法
    ngx_event_actions = ngx_epoll_module_ctx.actions;
    

    这个赋值的调用是非常重要的,在赋值之后,nginx所定义的几个方法宏就都是使用的epoll模块中所指定的方法,这里的几个宏定义如下:

    #define ngx_process_events   ngx_event_actions.process_events
    #define ngx_done_events      ngx_event_actions.done
    
    #define ngx_add_event        ngx_event_actions.add
    #define ngx_del_event        ngx_event_actions.del
    #define ngx_add_conn         ngx_event_actions.add_conn
    #define ngx_del_conn         ngx_event_actions.del_conn
    

    而这里的ngx_epoll_module_ctx.actions结构体的定义如下:

    {
      // 对应于ngx_event_actions_t中的add方法
      ngx_epoll_add_event,             /* add an event */
      // 对应于ngx_event_actions_t中的del方法
      ngx_epoll_del_event,             /* delete an event */
      // 对应于ngx_event_actions_t中的enable方法,与add方法一致
      ngx_epoll_add_event,             /* enable an event */
      // 对应于ngx_event_actions_t中的disable方法,与del方法一致
      ngx_epoll_del_event,             /* disable an event */
      // 对应于ngx_event_actions_t中的add_conn方法
      ngx_epoll_add_connection,        /* add an connection */
      // 对应于ngx_event_actions_t中的del_conn方法
      ngx_epoll_del_connection,        /* delete an connection */
      #if (NGX_HAVE_EVENTFD)
      ngx_epoll_notify,                /* trigger a notify */
      #else
      NULL,                            /* trigger a notify */
      #endif
      // 对应于ngx_event_actions_t中的process_events方法
      ngx_epoll_process_events,        /* process the events */
      // 对应于ngx_event_actions_t中的init方法
      ngx_epoll_init,                  /* init the events */
      // 对应于ngx_event_actions_t中的done方法
      ngx_epoll_done,                  /* done the events */
    }
    

    由此,就可以看出nginx出色的设计方式了,通过我们所选用的事件模型,就可以动态的为ngx_add_event()等宏指定所实现的子模块了。

  • 上面的方法完成的第二个主要的工作就是遍历所有监听的端口,获取其描述符,然后通过ngx_add_event()方法将其添加到epoll句柄中以监听其客户端连接事件。从这里就可以感觉到比较巧妙了,因为上面一步中正好对epoll模块进行了初始化,并且设置了ngx_add_event()宏的实现方法,而这里就使用到了这里设置的方法,该方法本质上就是通过epoll_ctl()方法将当前监听的socket描述符添加到epoll句柄中;

  • 最后就是上面的方法在遍历所有监听的端口的时候,为每个连接的accept事件添加的回调方法是ngx_event_accept(),通过前面我们对epoll模型的使用方式的介绍,我们大概可以理解,这里的ngx_event_accept()方法的主要作用是将当前accept到的客户端连接的句柄通过epoll_ctl()方法添加到当前epoll句柄中,以继续监听其读写事件;

        这里我们首先看一下上面第一点中介绍的module->actions.init(cycle, ngx_timer_resolution)方法调用时是如何初始化epoll模块的。由于是epoll模块,这里的init()方法指向的就是ngx_epoll_init()方法,如下是该方法的源码:

static ngx_int_t ngx_epoll_init(ngx_cycle_t *cycle, ngx_msec_t timer) {
  ngx_epoll_conf_t *epcf;

  // 获取解析得到的ngx_epoll_conf_t结构体
  epcf = ngx_event_get_conf(cycle->conf_ctx, ngx_epoll_module);

  if (ep == -1) {
    // 创建eventpoll结构体,将创建得到的文件描述符返回
    ep = epoll_create(cycle->connection_n / 2);

    // ep==-1表示创建失败
    if (ep == -1) {
      ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,
                    "epoll_create() failed");
      return NGX_ERROR;
    }
  }

  // 如果nevents小于epcf->events,说明event_list数组的长度不够,因而需要重新申请内存空间
  if (nevents < epcf->events) {
    if (event_list) {
      ngx_free(event_list);
    }

    // 为event_list重新申请内存空间
    event_list = ngx_alloc(sizeof(struct epoll_event) * epcf->events, cycle->log);
    if (event_list == NULL) {
      return NGX_ERROR;
    }
  }

  // 将nevents更新为配置文件中指定的大小
  nevents = epcf->events;

  ngx_io = ngx_os_io;

  // 这里将epoll相关的事件操作方法赋值给ngx_event_actions,也就是说后续有相关的事件发生则
  // 都会使用epoll相关的方法
  ngx_event_actions = ngx_epoll_module_ctx.actions;

  // 这里NGX_USE_CLEAR_EVENT指的是使用ET模式来使用epoll,默认使用ET模式,
  // 而NGX_USE_LEVEL_EVENT表示使用LE模式来使用epoll
#if (NGX_HAVE_CLEAR_EVENT)
  ngx_event_flags = NGX_USE_CLEAR_EVENT
                    #else
                    ngx_event_flags = NGX_USE_LEVEL_EVENT
                    #endif
                        // NGX_USE_GREEDY_EVENT表示每次拉取事件是都尝试拉取最多的事件
                    | NGX_USE_GREEDY_EVENT
                    | NGX_USE_EPOLL_EVENT;

  return NGX_OK;
}

        可以看到,这里的ngx_epoll_init()方法主要的作用有两个:a. 通过epoll_create()方法创建一个epoll句柄;b. 设置ngx_event_actions属性所指向的方法的实现,从而确定ngx_add_event()等宏的实现方法。下面我们来看一下ngx_add_event()是如何将需要监听的文件描述符添加到epoll句柄中的:

static ngx_int_t ngx_epoll_add_event(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags) {
  int op;
  uint32_t events, prev;
  ngx_event_t *e;
  ngx_connection_t *c;
  struct epoll_event ee;

  // ev->data在使用的过程中存储的是当前对应的ngx_connection_t,如果是free_connection,
  // 则存储的是下一个节点的指针
  c = ev->data;

  // 事件类型
  events = (uint32_t) event;

  // 如果是读事件
  if (event == NGX_READ_EVENT) {
    e = c->write;
    prev = EPOLLOUT;
#if (NGX_READ_EVENT != EPOLLIN | EPOLLRDHUP)
    events = EPOLLIN | EPOLLRDHUP;  // 设置读事件类型
#endif

  } else {
    e = c->read;
    prev = EPOLLIN | EPOLLRDHUP;
#if (NGX_WRITE_EVENT != EPOLLOUT)
    events = EPOLLOUT;  // 设置写事件类型
#endif
  }

  // 根据active标志位确定是否为活跃事件,以决定到底是修改还是添加事件
  if (e->active) {
    op = EPOLL_CTL_MOD; // 类型为修改事件
    events |= prev;

  } else {
    op = EPOLL_CTL_ADD; // 类型为添加事件
  }

#if (NGX_HAVE_EPOLLEXCLUSIVE && NGX_HAVE_EPOLLRDHUP)
  if (flags & NGX_EXCLUSIVE_EVENT) {
      events &= ~EPOLLRDHUP;
  }
#endif

  // 将flags参数指定的事件添加到监听列表中
  ee.events = events | (uint32_t) flags;
  // 这里是将connection指针的最后一位赋值为ev->instance,然后将其赋值给事件的ptr属性,通过这种方式检测事件是否过期
  ee.data.ptr = (void *) ((uintptr_t) c | ev->instance);

  ngx_log_debug3(NGX_LOG_DEBUG_EVENT, ev->log, 0,
                 "epoll add event: fd:%d op:%d ev:%08XD",
                 c->fd, op, ee.events);

  // 将事件添加到epoll句柄中
  if (epoll_ctl(ep, op, c->fd, &ee) == -1) {
    ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_errno,
                  "epoll_ctl(%d, %d) failed", op, c->fd);
    return NGX_ERROR;
  }

  // 将事件标记为活跃状态
  ev->active = 1;
#if 0
  ev->oneshot = (flags & NGX_ONESHOT_EVENT) ? 1 : 0;
#endif

  return NGX_OK;
}

        这里的ngx_add_event()方法本质上是比较简单的,就是将当前的ngx_event_t转换为一个epoll_event结构体,并且会设置该结构体中需要监听的事件类型,然后通过epoll_ctl()方法将当前epoll_event添加到epoll句柄中。

        在前面的ngx_event_process_init()方法中,nginx通过ngx_add_event()方法将各个监听的端口的描述符添加到epoll句柄中之后,就会开始监听这些描述符上的accept连接事件,如果有客户端连接请求,此时就会回调ngx_event_accept()方法处理该请求,我们来看一下该方法是如何处理客户端建立连接的请求的:

/**
 * 当客户端有accept事件到达时,将调用此方法处理该事件
 */
void ngx_event_accept(ngx_event_t *ev) {
  socklen_t socklen;
  ngx_err_t err;
  ngx_log_t *log;
  ngx_uint_t level;
  ngx_socket_t s;
  ngx_event_t *rev, *wev;
  ngx_sockaddr_t sa;
  ngx_listening_t *ls;
  ngx_connection_t *c, *lc;
  ngx_event_conf_t *ecf;
#if (NGX_HAVE_ACCEPT4)
  static ngx_uint_t  use_accept4 = 1;
#endif

  if (ev->timedout) {
    // 如果当前事件超时了,则继续将其添加到epoll句柄中以监听accept事件
    if (ngx_enable_accept_events((ngx_cycle_t *) ngx_cycle) != NGX_OK) {
      return;
    }

    ev->timedout = 0;
  }

  // 获取解析event核心配置结构体
  ecf = ngx_event_get_conf(ngx_cycle->conf_ctx, ngx_event_core_module);

  if (!(ngx_event_flags & NGX_USE_KQUEUE_EVENT)) {
    ev->available = ecf->multi_accept;
  }

  lc = ev->data;
  ls = lc->listening;
  ev->ready = 0;

  do {
    socklen = sizeof(ngx_sockaddr_t);

#if (NGX_HAVE_ACCEPT4)
    if (use_accept4) {
        s = accept4(lc->fd, &sa.sockaddr, &socklen, SOCK_NONBLOCK);
    } else {
        s = accept(lc->fd, &sa.sockaddr, &socklen);
    }
#else
    // 这里lc->fd指向的是监听的文件句柄,调用accept()获取客户端的连接,并且将其存储到sa.sockaddr中
    s = accept(lc->fd, &sa.sockaddr, &socklen);
#endif

    // 检查当前进程获取的连接个数是否超过了最大可用连接数的7/8,是则不再继续接收连接
    ngx_accept_disabled = ngx_cycle->connection_n / 8 - ngx_cycle->free_connection_n;

    // 获取新的连接
    c = ngx_get_connection(s, ev->log);

    // 获取连接失败则直接返回
    if (c == NULL) {
      if (ngx_close_socket(s) == -1) {
        ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno,
                      ngx_close_socket_n
                          " failed");
      }

      return;
    }

    // 标记当前为TCP连接
    c->type = SOCK_STREAM;

    // 为当前连接创建连接池
    c->pool = ngx_create_pool(ls->pool_size, ev->log);
    if (c->pool == NULL) {
      ngx_close_accepted_connection(c);
      return;
    }

    // 更新socklen的长度
    if (socklen > (socklen_t) sizeof(ngx_sockaddr_t)) {
      socklen = sizeof(ngx_sockaddr_t);
    }

    // 为sockaddr申请内存空间,并且将客户端连接地址复制到c->sockaddr中
    c->sockaddr = ngx_palloc(c->pool, socklen);
    if (c->sockaddr == NULL) {
      ngx_close_accepted_connection(c);
      return;
    }

    ngx_memcpy(c->sockaddr, &sa, socklen);

    // 申请ngx_log_t结构体的内存空间
    log = ngx_palloc(c->pool, sizeof(ngx_log_t));
    if (log == NULL) {
      ngx_close_accepted_connection(c);
      return;
    }

    /* set a blocking mode for iocp and non-blocking mode for others */

    if (ngx_inherited_nonblocking) {
      if (ngx_event_flags & NGX_USE_IOCP_EVENT) {
        // 将连接设置为阻塞模式
        if (ngx_blocking(s) == -1) {
          ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno,
                        ngx_blocking_n
                            " failed");
          ngx_close_accepted_connection(c);
          return;
        }
      }

    } else {
      if (!(ngx_event_flags & NGX_USE_IOCP_EVENT)) {
        // 将连接设置为非阻塞模式
        if (ngx_nonblocking(s) == -1) {
          ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_socket_errno,
                        ngx_nonblocking_n
                            " failed");
          ngx_close_accepted_connection(c);
          return;
        }
      }
    }

    *log = ls->log;

    // 设置连接的基本属性
    c->recv = ngx_recv;
    c->send = ngx_send;
    c->recv_chain = ngx_recv_chain;
    c->send_chain = ngx_send_chain;

    c->log = log;
    c->pool->log = log;

    c->socklen = socklen;
    c->listening = ls;
    c->local_sockaddr = ls->sockaddr;
    c->local_socklen = ls->socklen;

#if (NGX_HAVE_UNIX_DOMAIN)
    if (c->sockaddr->sa_family == AF_UNIX) {
      c->tcp_nopush = NGX_TCP_NOPUSH_DISABLED;
      c->tcp_nodelay = NGX_TCP_NODELAY_DISABLED;
#if (NGX_SOLARIS)
      /* Solaris's sendfilev() supports AF_NCA, AF_INET, and AF_INET6 */
      c->sendfile = 0;
#endif
    }
#endif

    rev = c->read;
    wev = c->write;

    wev->ready = 1;

    if (ngx_event_flags & NGX_USE_IOCP_EVENT) {
      rev->ready = 1;
    }

    if (ev->deferred_accept) {
      rev->ready = 1;
#if (NGX_HAVE_KQUEUE || NGX_HAVE_EPOLLRDHUP)
      rev->available = 1;
#endif
    }

    rev->log = log;
    wev->log = log;

    // 更新连接使用次数
    c->number = ngx_atomic_fetch_add(ngx_connection_counter, 1);

    // 将网络地址更新为字符串形式的地址
    if (ls->addr_ntop) {
      c->addr_text.data = ngx_pnalloc(c->pool, ls->addr_text_max_len);
      if (c->addr_text.data == NULL) {
        ngx_close_accepted_connection(c);
        return;
      }

      c->addr_text.len = ngx_sock_ntop(c->sockaddr, c->socklen,
                                       c->addr_text.data,
                                       ls->addr_text_max_len, 0);
      if (c->addr_text.len == 0) {
        ngx_close_accepted_connection(c);
        return;
      }
    }

#if (NGX_DEBUG)
    {
    ngx_str_t  addr;
    u_char     text[NGX_SOCKADDR_STRLEN];

    ngx_debug_accepted_connection(ecf, c);

    if (log->log_level & NGX_LOG_DEBUG_EVENT) {
        addr.data = text;
        addr.len = ngx_sock_ntop(c->sockaddr, c->socklen, text,
                                 NGX_SOCKADDR_STRLEN, 1);

        ngx_log_debug3(NGX_LOG_DEBUG_EVENT, log, 0,
                       "*%uA accept: %V fd:%d", c->number, &addr, s);
    }

    }
#endif

    // 将当前连接添加到epoll句柄中进行监控
    if (ngx_add_conn && (ngx_event_flags & NGX_USE_EPOLL_EVENT) == 0) {
      if (ngx_add_conn(c) == NGX_ERROR) {
        ngx_close_accepted_connection(c);
        return;
      }
    }

    log->data = NULL;
    log->handler = NULL;

    // 建立新连接之后的回调方法
    ls->handler(c);

    if (ngx_event_flags & NGX_USE_KQUEUE_EVENT) {
      ev->available--;
    }

  } while (ev->available);
}

        这里客户端连接的建立过程主要可以分为如下几个步骤:

  • 首先调用accept()方法获取当前客户端建立的连接,并且将其地址信息保存到结构体sa中;
  • 接着通过调用ngx_get_connection()方法获取一个ngx_connection_t结构体以对应当前获取到的客户端连接,并且会初始化该结构体的各个属性;
  • 调用ngx_add_conn()方法将当前方法添加到epoll句柄中,这里的添加过程本质上就是通过epoll_ctl()方法将当前客户端的连接的文件描述符添加到epoll句柄中,以监听其读写事件;

        如此我们就讲解了从epoll句柄的创建,到指定的端口的监听,接着处理客户端连接,并且将客户端连接对应的文件描述符继续添加到epoll句柄中以监听读写事件的流程。下面我们继续来看一下nginx是如何等待所监听的这些句柄上的事件的发生的,也即整个事件框架的驱动程序。worker进程对于事件的处理,主要在ngx_process_events_and_timers()方法中,如下是该方法的源码:

void ngx_process_events_and_timers(ngx_cycle_t *cycle) {
	// 尝试获取共享锁
  if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) {
    return;
  }

  // 这里开始处理事件,对于kqueue模型,其指向的是ngx_kqueue_process_events()方法,
  // 而对于epoll模型,其指向的是ngx_epoll_process_events()方法
  // 这个方法的主要作用是,在对应的事件模型中获取事件列表,然后将事件添加到ngx_posted_accept_events
  // 队列或者ngx_posted_events队列中
  (void) ngx_process_events(cycle, timer, flags);

  // 这里开始处理accept事件,将其交由ngx_event_accept.c的ngx_event_accept()方法处理;
  ngx_event_process_posted(cycle, &ngx_posted_accept_events);

  // 开始释放锁
  if (ngx_accept_mutex_held) {
    ngx_shmtx_unlock(&ngx_accept_mutex);
  }

  // 如果不需要在事件队列中进行处理,则直接处理该事件
  // 对于事件的处理,如果是accept事件,则将其交由ngx_event_accept.c的ngx_event_accept()方法处理;
  // 如果是读事件,则将其交由ngx_http_request.c的ngx_http_wait_request_handler()方法处理;
  // 对于处理完成的事件,最后会交由ngx_http_request.c的ngx_http_keepalive_handler()方法处理。

  // 这里开始处理除accept事件外的其他事件
  ngx_event_process_posted(cycle, &ngx_posted_events);
}

        这里的ngx_process_events_and_timers()方法我们省略了大部分代码,只留下了主要的流程。简而言之,其主要实现了如下几个步骤的工作:

  • 获取共享锁,以得到获取客户端连接的权限;
  • 调用ngx_process_events()方法监听epoll句柄中各个文件描述符的事件,并且处理这些事件。在前面我们讲到,nginx在调用epoll模块的init()方法时,初始化了ngx_event_actions属性的值,将其指向了epoll模块所实现的方法,这里就包括ngx_process_events方法宏所对应的方法,也即ngx_epoll_process_events()方法,因而这里其实就可以理解,ngx_epoll_process_events()方法本质上就是调用epoll_wait()方法等待epoll句柄上所监听的事件的发生;
  • 处理ngx_posted_accept_events队列中的事件,这些事件其实就是前面讲到的客户端建立连接的事件,在ngx_epoll_process_events()方法中获取到事件之后,会判断其是accept事件还是读写事件,如果是accept事件,就会将其添加到ngx_posted_accept_events队列中,如果是读写事件,就会将其添加到ngx_posted_events队列中;
  • 释放共享锁,以让其他的worker进程可以获取锁,从而接收客户端连接;
  • 处理ngx_posted_events队列中的事件,也即客户端连接的读写事件。从这里就可以看出nginx高性能的一个原因,其将accept事件和读写事件放到了两个不同的队列中,accept事件是必须在锁内部处理的,而读写事件则可以异步于accept事件,这提高了nginx处理客户端请求的能力。

        下面我们来看一下ngx_epoll_process_events()方法是如何处理epoll句柄中的事件的:

static ngx_int_t ngx_epoll_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags) {
  int events;
  uint32_t revents;
  ngx_int_t instance, i;
  ngx_uint_t level;
  ngx_err_t err;
  ngx_event_t *rev, *wev;
  ngx_queue_t *queue;
  ngx_connection_t *c;

  /* NGX_TIMER_INFINITE == INFTIM */

  ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                 "epoll timer: %M", timer);

  // 通过epoll_wait()方法进行事件的获取,获取到的事件将存放在event_list中,并且会将获取的事件个数返回
  events = epoll_wait(ep, event_list, (int) nevents, timer);

  err = (events == -1) ? ngx_errno : 0;

  // 这里的ngx_event_timer_alarm是通过一个定时器任务来触发的,在定时器中会将其置为1,
  // 从而实现定期更新nginx缓存的时间的目的
  if (flags & NGX_UPDATE_TIME || ngx_event_timer_alarm) {
    ngx_time_update();
  }

  if (err) {
    if (err == NGX_EINTR) {

      if (ngx_event_timer_alarm) {
        ngx_event_timer_alarm = 0;
        return NGX_OK;
      }

      level = NGX_LOG_INFO;

    } else {
      level = NGX_LOG_ALERT;
    }

    ngx_log_error(level, cycle->log, err, "epoll_wait() failed");
    return NGX_ERROR;
  }

  // 获取的事件个数为0
  if (events == 0) {
    // 如果当前时间类型不为NGX_TIMER_INFINITE,说明获取事件超时了,则直接返回
    if (timer != NGX_TIMER_INFINITE) {
      return NGX_OK;
    }

    // 这里说明时间类型为NGX_TIMER_INFINITE,但是却返回了0个事件,说明epoll_wait()调用出现了问题
    ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
                  "epoll_wait() returned no events without timeout");
    return NGX_ERROR;
  }

  // 遍历各个事件
  for (i = 0; i < events; i++) {
    // 每个事件的data.ptr中存储了当前事件对应的connection对象
    c = event_list[i].data.ptr;

    // 获取事件中存储的instance的值
    instance = (uintptr_t) c & 1;
    // 获取connection指针地址值
    c = (ngx_connection_t *) ((uintptr_t) c & (uintptr_t) ~1);

    // 获取读事件结构体
    rev = c->read;

    // 如果当前连接的文件描述符为-1,获取其instance不等于当前事件的instance,
    // 说明该连接已经过期了,则不对该事件进行处理
    if (c->fd == -1 || rev->instance != instance) {

      /*
       * the stale event from a file descriptor
       * that was just closed in this iteration
       */

      ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                     "epoll: stale event %p", c);
      continue;
    }

    // 获取当前事件监听的类型
    revents = event_list[i].events;

    ngx_log_debug3(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                   "epoll: fd:%d ev:%04XD d:%p",
                   c->fd, revents, event_list[i].data.ptr);

    // 如果事件发生错误,则打印相应的日志
    if (revents & (EPOLLERR | EPOLLHUP)) {
      ngx_log_debug2(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                     "epoll_wait() error on fd:%d ev:%04XD",
                     c->fd, revents);

      /*
       * if the error events were returned, add EPOLLIN and EPOLLOUT
       * to handle the events at least in one active handler
       */

      revents |= EPOLLIN | EPOLLOUT;
    }

#if 0
    if (revents & ~(EPOLLIN|EPOLLOUT|EPOLLERR|EPOLLHUP)) {
        ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
                      "strange epoll_wait() events fd:%d ev:%04XD",
                      c->fd, revents);
    }
#endif

    // 如果当前是读事件,并且事件是活跃的
    if ((revents & EPOLLIN) && rev->active) {

#if (NGX_HAVE_EPOLLRDHUP)
      if (revents & EPOLLRDHUP) {
          rev->pending_eof = 1;
      }

      rev->available = 1;
#endif

      // 将事件标记为就绪状态
      rev->ready = 1;

      // 默认是开启了NGX_POST_EVENTS开关的
      if (flags & NGX_POST_EVENTS) {
        // 如果当前是accept事件,则将其添加到ngx_posted_accept_events队列中,
        // 如果是读写事件,则将其添加到ngx_posted_events队列中
        queue = rev->accept ? &ngx_posted_accept_events
                            : &ngx_posted_events;

        ngx_post_event(rev, queue);

      } else {
        // 如果不需要分离accept和读写事件,则直接处理该事件
        rev->handler(rev);
      }
    }

    // 获取写事件结构体
    wev = c->write;

    if ((revents & EPOLLOUT) && wev->active) {

      // 如果当前连接的文件描述符为-1,获取其instance不等于当前事件的instance,
      // 说明该连接已经过期了,则不对该事件进行处理
      if (c->fd == -1 || wev->instance != instance) {

        /*
         * the stale event from a file descriptor
         * that was just closed in this iteration
         */

        ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                       "epoll: stale event %p", c);
        continue;
      }

      // 将当前事件标记为就绪状态
      wev->ready = 1;
#if (NGX_THREADS)
      wev->complete = 1;
#endif

      // 由于是写事件,并且需要标记为了NGX_POST_EVENTS状态,
      // 因而将其直接添加到ngx_posted_events队列中,否则直接处理该事件
      if (flags & NGX_POST_EVENTS) {
        ngx_post_event(wev, &ngx_posted_events);

      } else {
        wev->handler(wev);
      }
    }
  }

  return NGX_OK;
}

        这里ngx_epoll_process_events()方法首先就是调用epoll_wait()方法获取所监听的句柄的事件,然后遍历获取的事件,根据事件的类型,如果是accept事件,则添加到ngx_posted_accept_events队列中,如果是读写事件,则添加到ngx_posted_events队列中,而队列中事件的处理,则在上面介绍的ngx_process_events_and_timers()方法中进行。

4. 小结

        本文首先对epoll模型的实现原理进行了讲解,然后从源码的层面对nginx是如何基于epoll模型实现事件驱动模式的原理进行了讲解。

5. 广告

       读者朋友如果觉得本文还不错,可以点击下面的广告链接,这可以为作者带来一定的收入,从而激励作者创作更好的文章,非常感谢!

在项目开发过程中,企业会有很多的任务、需求、缺陷等需要进行管理,CORNERSTONE 提供敏捷、任务、需求、缺陷、测试管理、WIKI、共享文件和日历等功能模块,帮助企业完成团队协作和敏捷开发中的项目管理需求;更有甘特图、看板、思维导图、燃尽图等多维度视图,帮助企业全面把控项目情况。

© 著作权归作者所有

爱宝贝丶

爱宝贝丶

粉丝 385
博文 154
码字总数 556915
作品 0
武汉
程序员
私信 提问
加载中

评论(8)

开源中国首席罗纳尔多
开源中国首席罗纳尔多
有跟tomcat开线程的方式对比吗?
爱宝贝丶
爱宝贝丶 博主
https://my.oschina.net/zhangxufeng/blog/3151282 这篇文章介绍nginx惊群问题,虽然现在已经在内核层面解决了,但是通过这个就可以看出来Tomcat(包括netty)与nginx使用epoll模型的异同了。单纯从请求接收方面来讲,Tomcat采用的是一个单线程accept客户端请求,然后将请求的read和write事件分发到其他的线程上,这也正是Reactor模型的处理模式,但是read和write事件的处理与用户业务流程的处理都是使用的同一个线程,如果业务处理较慢,就会到这这个本来只需要专门处理read和write事件的线程被业务代码阻塞了。对于nginx而言,首先其是使用多个worker进程(每个进程实际上也只维护了一个线程)来监听客户端请求的accept事件,这在并发度上要比Tomcat的单线程处理accept事件要强。而且nginx在转发请求到上游服务器的时候,也是使用的异步事件的方式,也就是说上游服务器在处理请求的时候nginx的worker进程还是可以继续处理其他的事件的,这种方式的效率比Tomcat要强很多。
m
middle_wind
你多看看就了解了,不过我感觉最难的还是配置文件解析那部分,至今未看懂
爱宝贝丶
爱宝贝丶 博主
配置文件解析,其实也比较好理解,多看看源码,nginx的配置文件的解析分为三种方式:file、token和block。所谓的file指的是当前需要解析的是一个文件,初始状态解析的时候就是file。然后在解析的过程中,nginx每次都只会解析一个token,所谓的token指的是以一个分号结尾的完整一句配置,根据这个这个token的第一个配置名称,nginx会查找所有的ngx_command_t结构体,判断这个结构体的name属性值是不是与解析出来的这个token的名称相同(当然,还有其他的诸如类型的判断),如果相同,就说明这个结构体是用于解析这个token的。这个时候就会调用这个结构体的set属性所指定的方法解析当前token。所谓的block解析,就是解析的配置块,比如http{}配置块。同理,http这个名称也可以理解为一个名称,nginx会找到与这个名称对应的ngx_command_t结构体。然后调用这个结构体的set方法解析这个配置块,而在解析的过程中,对于http配置块的子配置项,则会在这个set方法里进行递归的调用。
爱宝贝丶
爱宝贝丶 博主
可能你感觉比较难理解的是nginx解析各个配置之后各个结构体的组织方式,尤其是http模块的结构体的嵌套方式,这部分确实比较难理解,这一块的博文我已经写了,会在后续进行讲解。
emn星宿
emn星宿
可能我刚加入开源吧,不熟悉
爱宝贝丶
爱宝贝丶 博主
如果不是c或c++开发人员,对于nginx源码的理解会相对吃力一些,不过这个不用太担心,如果能够坚持下来,nginx源码理解起来还是非常有趣的。建议先学习一下nginx的基本使用方式,然后看一下《深入理解nginx》这本书,在看的过程中也要一边看源码,并且调试源码,不然单纯只是看书可能会有种云里雾里的感觉。
emn星宿
emn星宿
我一脸懵逼的看完
事件驱动模型

事件驱动模型一般是由事件收集器、事件发送器和事件处理器三部分组成基本单元组成。 一、select库   select库是各个版本的linux和windows平台都支持的基本事件驱动模型库,并且在接口的定义...

豆芽菜橙
2017/08/28
0
0
【充电】《Nginx核心知识100讲》信号、nginx事件、同步&异步、阻塞&非阻塞

极客专栏《Nginx核心知识100讲》20-32小节的笔记 nginx 请求处理流程 1.nginx请求处理流程 传输层状态机:处理TCP、UDP HTTP状态机:应用层 MAIL状态机:处理邮件 为什么用状态机? 是因为使...

言十年
2018/12/28
0
0
nginx的io复用、阻塞非阻塞、同步非同步、apache与nginx

摘抄自博客园rikewang博客,方便自己查找阅读!!!! http://www.cnblogs.com/wxl-dede/p/5134636.html 同步异步,阻塞非阻塞 和nginx的IO模型 同步与异步 同步和异步关注的是消息通信机制 ...

AELY木
2017/07/23
0
0
同步异步 ——Nginx与Apache

同步异步,阻塞非阻塞 和nginx的IO模型 同步与异步 同步和异步关注的是消息通信机制 (synchronous communication/ asynchronous communication)。所谓同步,就是在发出一个调用时,在没有得到...

wujunqi1996
2018/06/28
0
0
Nginx为什么比Apache Httpd高效:原理篇

Nginx才短短几年,就拿下了web服务器大笔江山,众所周知,Nginx在处理大并发静态请求方面,效率明显高于httpd,甚至能轻松解决C10K问题。下面我们就来聊聊Web服务器背后的一些原理。 一、进程...

小杨_Ivan
2017/02/09
0
0

没有更多内容

加载失败,请刷新页面

加载更多

laravel 多条件查询 闭包写法

laravel 多条件查询 闭包写法 直接上代码 1: 比如我要查询 符合条件的 部门和 用户 DB::table('user')->where('user_id',20)->whereIn('d_id',[82,83])->get(); 可以使用下面这种方法替换 ......

李佳顺
22分钟前
21
0
springboot实现热部署

一、前言 在实际开发过程中,每次修改代码就得将项目重启,重新部署,对于一些大型应用来说,重启时间需要花费大量的时间成本。对于一个后端开发者来说,重启过程确实很难受。在java开发领域...

素小暖OSC
22分钟前
98
0
为什么要选择开源的直播源码开发直播系统?

相信大家在购买直播源码的过程中,肯定都会咨询过是否开源这个问题。对于懂技术的人来说,开源的意思非常好理解,而对于不懂技术的人来说,开源可能是个非常难以理解的词汇。在这里跟大家简单...

图玩智能科技
24分钟前
26
0
真的在Windows中杀死一个进程

偶尔,Windows机器上的程序会发疯,只是挂起。 所以我将调用任务管理器并点击“结束进程”按钮。 但是,这并不总是有效; 如果我尝试了足够多次,那么它通常会最终死亡,但我真的希望能够立即...

技术盛宴
29分钟前
56
0
使用低代码平台 - 危险的赌注

低代码应用平台(LCAP - low code application platforms)在多样、复杂的现代软件开发情势下应运而生。依据Gartner(高德纳,全球最具权威的IT研究与顾问咨询公司)的数据,Mendix 是这方面...

CUBAChinaTeam
30分钟前
46
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部