select, poll, epoll的实现分析

2018/03/06 19:48
阅读数 15

  select, poll, epoll都是Linux上的IO多路复用机制.知其然知其所以然,为了更好地理解其底层实现,这几天我阅读了这三个系统调用的源码.

  以下源代码摘自Linux4.4.0内核.

 

  预备知识

  在了解IO多路复用技术之前,首先需要了解Linux内核的3个方面.

  1.等待队列waitqueue

  等待队列(@ include/linux/wait.h)的队列头(wait_queue_head_t)往往是资源生产者,队列成员(wait_queue_t)往往是资源消费者.当队列头的资源ready后,内核会逐个执行每个队列成员指定的回调函数,来通知它们该资源已经ready了.

  2.内核的poll机制

  被poll的fd,必须在实现上支持内核的poll技术,比如fd是某个字符设备,或者是一个socket,它必须实现file_operation中的poll操作,这个操作会给该fd分配一个等待队列头.主动poll该fd的进程必须分配一个等待队列成员,并将其添加到该fd的等待队列中去,同时指定该资源ready时的回调函数.拿socket举例,它必须实现一个poll操作,该操作是由发起轮询者(即监听它的进程)主动调用的.这个poll操作必须调用poll_wait(),后者会将发起轮询者作为等待队列成员添加到该socket的等待队列中.当该socket发生状态改变时,就会通过队列头逐个通知所有监听它的进程.

  3.epollfd

  epollfd其实质就是一个fd.

 

  select

  1.源码分析

// @ fs/select.c
SYSCALL_DEFINE5(select, int, n, fd_set __user *, inp, fd_set __user *, outp,
		fd_set __user *, exp, struct timeval __user *, tvp)
{
	struct timespec end_time, *to = NULL;
	struct timeval tv;
	int ret;
	if (tvp) { // 如果timeout值不为NULL
		if (copy_from_user(&tv, tvp, sizeof(tv))) // 则把超时值从用户空间拷贝到内核空间.
			return -EFAULT;
		to = &end_time;
		// 计算timespec格式的未来timeout时间.
		if (poll_select_set_timeout(to,
				tv.tv_sec + (tv.tv_usec / USEC_PER_SEC),
				(tv.tv_usec % USEC_PER_SEC) * NSEC_PER_USEC))
			return -EINVAL;
	}
	ret = core_sys_select(n, inp, outp, exp, to); // 关键函数
	// poll_select_copy_remaining函数的作用是:
	// 如果有超时值,则把距离超时时间所剩的时间,从内核空间拷贝到用户空间.
	ret = poll_select_copy_remaining(&end_time, tvp, 1, ret);
	return ret;
}
int core_sys_select(int n, fd_set __user *inp, fd_set __user *outp,
			   fd_set __user *exp, struct timespec *end_time)
{
	fd_set_bits fds;
	/**
	 @ include/linux/poll.h
	 typedef struct {
		unsigned long *in, *out, *ex;
		unsigned long *res_in, *res_out, *res_ex;
	} fd_set_bits;
	**/
	// 该结构体内部定义的都是指针,指向描述符集合.
	void *bits;
	int ret, max_fds;
	unsigned int size;
	struct fdtable *fdt;
	/**
	 @ include/linux/fdtable.h
	 struct fdtable {
	     unsigned int max_fds;
		 struct file __rcu **fd;      // current fd array
		 unsigned long *close_on_exec;
		 unsigned long *open_fds;
		 unsigned long *full_fds_bits;
		 struct rcu_head rcu;
	 };
	 **/
	/* Allocate small arguments on the stack to save memory and be faster */
	long stack_fds[SELECT_STACK_ALLOC/sizeof(long)];
	/**
	 @ include/linux/poll.h
	 #define FRONTEND_STACK_ALLOC	256
	 #define SELECT_STACK_ALLOC	FRONTEND_STACK_ALLOC
	 **/
	 // 256 / 8 = 32
	 // 预先在stack中分配小部分的空间,以节省内存且更加快速
	ret = -EINVAL;
	if (n < 0)
		goto out_nofds;
	/* max_fds can increase, so grab it once to avoid race */
	rcu_read_lock();
	fdt = files_fdtable(current->files); // 获取当前进程的文件描述符表
	max_fds = fdt->max_fds;
	rcu_read_unlock();
	if (n > max_fds)  // 如果传入的n大于当前进程最大的文件描述符,则修改之.
		n = max_fds;
	/*
	 * We need 6 bitmaps (in/out/ex for both incoming and outgoing),
	 * since we used fdset we need to allocate memory in units of
	 * long-words.
	 */
	 /**
	  需要分配6个bitmap,
	  用户传入的in, out, ex,
	  以及返回给用户的res_in, res_out, res_ex.
	  **/
	size = FDS_BYTES(n); // 以一个描述符占一个bit来计算,求出传入的描述符需要多少个byte(以long为单位分配byte)。
	bits = stack_fds;
	if (size > sizeof(stack_fds) / 6) {
		// 因为一共有6个bitmap, 所以除以6,求得每个bitmap所占的byte个数,用以和size比较。
		/* Not enough space in on-stack array; must use kmalloc */
		ret = -ENOMEM;
		bits = kmalloc(6 * size, GFP_KERNEL); // 如果预先在stack分配的空间太小,则在heap上申请内存。
		if (!bits)
			goto out_nofds;
	}
	// fds结构体的妙用
	fds.in      = bits;
	fds.out     = bits +   size;
	fds.ex      = bits + 2*size;
	fds.res_in  = bits + 3*size;
	fds.res_out = bits + 4*size;
	fds.res_ex  = bits + 5*size;
	/**
	 @ include/linux/poll.h
	 static inline
	 int get_fd_set(unsigned long nr, void __user *ufdset, unsigned long *fdset)
	 {
	 	nr = FDS_BYTES(nr);
		if (ufdset)
		return copy_from_user(fdset, ufdset, nr) ? -EFAULT : 0;
		memset(fdset, 0, nr);
		return 0;
	}
	 get_fd_set的作用是调用copy_from_user函数,
	 把用户传入的fd_set从用户空间拷贝到内核空间。
	 **/
	if ((ret = get_fd_set(n, inp, fds.in)) ||
	    (ret = get_fd_set(n, outp, fds.out)) ||
	    (ret = get_fd_set(n, exp, fds.ex)))
		goto out;
	// 对返回给用户的fd_set初始化为零
	zero_fd_set(n, fds.res_in);
	zero_fd_set(n, fds.res_out);
	zero_fd_set(n, fds.res_ex);
	ret = do_select(n, &fds, end_time); // 关键函数,完成主要的工作。
	if (ret < 0) // 出错
		goto out;
	if (!ret) { // 无文件设备就绪,但超时或有未决信号。
		ret = -ERESTARTNOHAND;
		if (signal_pending(current))
			goto out;
		ret = 0;
	}
	// 把结果集合拷贝到用户空间.
	if (set_fd_set(n, inp, fds.res_in) ||
	    set_fd_set(n, outp, fds.res_out) ||
	    set_fd_set(n, exp, fds.res_ex))
		ret = -EFAULT;
out:
	if (bits != stack_fds) // 如果申请了heap上的内存,则释放之.
		kfree(bits);
out_nofds:
	return ret;
}
int do_select(int n, fd_set_bits *fds, struct timespec *end_time)
{
	ktime_t expire, *to = NULL;
	struct poll_wqueues table;
	/**
	 @ inclue/linux/poll.h
	 struct poll_wqueues {
	 	poll_table pt;
		struct poll_table_page *table;
		struct task_struct *polling_task;
		int triggered;
		int error;
		int inline_index;
		struct poll_table_entry inline_entries[N_INLINE_POLL_ENTRIES];
	};
	**/
	poll_table *wait;
	/**
	 @ inclue/linux/poll.h
	 typedef void (*poll_queue_proc)(struct file *, wait_queue_head_t *, struct poll_table_struct *);
	 typedef struct poll_table_struct {
		poll_queue_proc _qproc;
		unsigned long _key;
	} poll_table;
	**/
	int retval, i, timed_out = 0;
	unsigned long slack = 0;
	unsigned int busy_flag = net_busy_loop_on() ? POLL_BUSY_LOOP : 0; // TODO
	unsigned long busy_end = 0;
	rcu_read_lock(); // TODO
	retval = max_select_fd(n, fds); // 检查fds中fd的有效性(即要求打开),并获得当前最大的fd.
	rcu_read_unlock();
	if (retval < 0)
		return retval;
	n = retval;
	// 初始化poll_wqueues对象table,
	// 包括初始化poll_wqueues.poll_table._qproc函数指针为__pollwait.
	poll_initwait(&table);
	/**
	 @ fs/select.c
	 void poll_initwait(struct poll_wqueues *pwq)
	 {
		init_poll_funcptr(&pwq->pt, __pollwait);
		pwq->polling_task = current;
		pwq->triggered = 0;
		pwq->error = 0;
		pwq->table = NULL;
		pwq->inline_index = 0;
	}
	**/
	wait = &table.pt;
	// 如果用户传入的timeout不为NULL,但设定的时间为0,
	// 则设置poll_table._qproc函数指针为NULL,
	// 并设置timed_out = 1.
	if (end_time && !end_time->tv_sec && !end_time->tv_nsec) {
		wait->_qproc = NULL;
		timed_out = 1;
	}
	// 如果用户传入的timeout不为NULL,且设定的timeout时间不为0,
	// 则转换timeout时间.
	if (end_time && !timed_out)
		slack = select_estimate_accuracy(end_time);
	retval = 0;
	for (;;) {
		unsigned long *rinp, *routp, *rexp, *inp, *outp, *exp;
		bool can_busy_loop = false;
		inp = fds->in; outp = fds->out; exp = fds->ex;
		rinp = fds->res_in; routp = fds->res_out; rexp = fds->res_ex;
		for (i = 0; i < n; ++rinp, ++routp, ++rexp) {
			unsigned long in, out, ex, all_bits, bit = 1, mask, j;
			unsigned long res_in = 0, res_out = 0, res_ex = 0;
			// 将in, out, exception进行位或运算,得到all_bits.
			in = *inp++; out = *outp++; ex = *exp++;
			all_bits = in | out | ex;
			// 如果这BITS_PER_LONG个描述符不需要被监听,
			// 则continue到下一个32个描述符中的循环.
			if (all_bits == 0) {
				i += BITS_PER_LONG;
				continue;
			}
			// 本次这BITS_PER_LONG个描述符中有需要监听的.
			for (j = 0; j < BITS_PER_LONG; ++j, ++i, bit <<= 1) {
				struct fd f;
				if (i >= n) // 检测i是否超出了待监听的最大描述符.
					break;
				// 每次循环后bit左移一位,用来跳过不需要被监听的描述符.
				if (!(bit & all_bits))
					continue;
				f = fdget(i); // 获取file结构,并增加其引用计数.
				if (f.file) {
					const struct file_operations *f_op;
					f_op = f.file->f_op;
					mask = DEFAULT_POLLMASK;
					if (f_op->poll) {
						wait_key_set(wait, in, out,  // 设置当前描述符待监听的事件掩码.
							     bit, busy_flag);
						mask = (*f_op->poll)(f.file, wait);
						// f_op->poll函数执行如下:
						// 1, 调用poll_wait函数(在include/linux/poll.h);
						// 2, 检测当前描述符所对应的文件设备的状态,并返回状态掩码mask.
						// poll_wait函数的定义如下:
						/**
						 @ include/linux/poll.h
						 static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p)
						 {
						 	if (p && p->_qproc && wait_address)
							p->_qproc(filp, wait_address, p);
						}
						**/
						// 而由上文的poll_initwait函数可知,p->_qproc指向__pollwait函数.
						// __pollwait函数的定义如下:
						/**
						 @ fs/select.c
						 // __pollwait函数的作用是把当前进程添加到当前文件设备的等待队列中.
						 static void __pollwait(struct file *filp, wait_queue_head_t *wait_address, poll_table *p)
						 {
						 	struct poll_wqueues *pwq = container_of(p, struct poll_wqueues, pt);
							struct poll_table_entry *entry = poll_get_entry(pwq);
							if (!entry)
								return;
							entry->filp = get_file(filp);
							entry->wait_address = wait_address;
							entry->key = p->_key;
							init_waitqueue_func_entry(&entry->wait, pollwake);
							entry->wait.private = pwq;
							add_wait_queue(wait_address, &entry->wait);
						}
						**/
					}
					fdput(f); // 释放file结构指针,实质是减少其引用计数.
					// mask是执行f_op->poll函数后所返回的文件设备状态掩码.
					if ((mask & POLLIN_SET) && (in & bit)) {
						res_in |= bit; // 描述符对应的文件设备可读
						retval++;
						wait->_qproc = NULL; // 避免重复执行__pollwait函数
					}
					if ((mask & POLLOUT_SET) && (out & bit)) {
						res_out |= bit; // 描述符对应的文件设备可写
						retval++;
						wait->_qproc = NULL;
					}
					if ((mask & POLLEX_SET) && (ex & bit)) {
						res_ex |= bit; // 描述符对应的文件设备发生error
						retval++;
						wait->_qproc = NULL;
					}
					/* got something, stop busy polling */
					if (retval) {
						can_busy_loop = false;
						busy_flag = 0;
					/*
					 * only remember a returned
					 * POLL_BUSY_LOOP if we asked for it
					 */
					} else if (busy_flag & mask)
						can_busy_loop = true;
				}
			}
			// 根据f_op->poll函数的结果,设置rinp, routp, rexp
			if (res_in)
				*rinp = res_in;
			if (res_out)
				*routp = res_out;
			if (res_ex)
				*rexp = res_ex;
			cond_resched();
		}
		wait->_qproc = NULL; // 避免重复执行__pollwait函数
		// 如果有文件设备就绪,或超时,或有未决信号;
		// 亦或者发生错误,
		// 则退出大循环.
		if (retval || timed_out || signal_pending(current))
			break;
		if (table.error) {
			retval = table.error;
			break;
		}
		/* only if found POLL_BUSY_LOOP sockets && not out of time */
		if (can_busy_loop && !need_resched()) { // TODO
			if (!busy_end) {
				busy_end = busy_loop_end_time();
				continue;
			}
			if (!busy_loop_timeout(busy_end))
				continue;
		}
		busy_flag = 0;
		/*
		 * If this is the first loop and we have a timeout
		 * given, then we convert to ktime_t and set the to
		 * pointer to the expiry value.
		 */
		if (end_time && !to) {
			expire = timespec_to_ktime(*end_time); // 转换timeout时间.
			to = &expire;
		}
		// 第一次循环时,当前用户进程在这里进入睡眠.
		// 超时,poll_schedule_timeout()返回0;被唤醒时返回-EINTR.
		if (!poll_schedule_timeout(&table, TASK_INTERRUPTIBLE,
					   to, slack))
			timed_out = 1;
	}
	// 把当前进程从所有文件的等待队列中删掉,并回收内存.
	poll_freewait(&table);
	// 返回就绪的文件描述符的个数.
	return retval;
}

   2.实现过程

  一. 如果用户传入的超时值不为NULL,则把用户空间的timeout对象拷贝到内 核空间,并计算timespec格式的超时时间.
  二. 调用core_sys_select函数.过程如下:

  1. 根据传入的maxfds值,计算出保存所有文件描述符需要多少字节(1个字节占1bit),再根据字节数判断是在栈上还是堆上分配内存(如果字节数太大,则在堆上分配).一共需要分配6个bitmap,用户传入的in,out,ex,以及返回给用户的res_in,res_out,res_ex.
  2. 将3个用户传入的fdset(in, out, ex)从用户空间拷贝到内核空间,并将3个返回给用户的fdset(res_in, res_out, res_ex)初始化为0.
  3. 调用do_select函数.过程如下:
    3a. 检查传入的fds中fd的有效性.
    3b. 调用poll_initwait函数,以初始化poll_wqueues对象table,其中初始化poll_wqueues.poll_table._qproc函数指针为__pollwait.
    3c. 如果传入的超时值不为空,但设定的时间为0,则设置poll_table._qproc函数指针为NULL.
    3d. 进入大循环.
    3e. 将in, out, ex进行位或运算,得到all_bits.然后遍历all_bits中bit为1的文件描述符,根据当前进程的文件描述符表,获取与当前描述符对应的file结构指针f,并设  置当前描述符待监听的事件掩码,调用f_op->poll函数,该函数会把当前用户进程添加到当前描述符的等待队列中,并获得返回值mask.根据mask设置相应的  返回fdset,执行retval++,并设置wait->_qproc为NULL.
    3f. 遍历完所有的文件描述符后,设置wait->_qproc为NULL,并检查是否有文件设备就绪,或超时,或有未决信号,或发生错误,是则退出大循环,执行第8步.
    3g. 在第一次循环后,如果设置了超时值,用户进程会调用poll_schedule_timeout进入睡眠.
    3h. 最终,调用poll_freewait函数,把当前进程从所有文件的等待队列中删除,回收内存.并返回retval值.
  4. 把res_in, res_out, res_ex从内核空间拷贝到用户空间.并返回ret.

三. 如果设有超时值,则把剩余的超时时间从内核空间拷贝到用户空间.

  3.小结

  从上述源代码的分析可见,select的低效体现在两个方面:

  1. 内存复制开销,每次调用select,都要把文件描述符集合从用户空间拷贝到内核空间,又从内核空间拷贝到用户空间.
  2. 遍历开销,每次调用select,都要在内核中遍历所有传入的文件描述符.

  另外select系统调用的第一个参数maxfdp1,其大小被限制在1024内(即FD_SETSIZE),可监听的文件数量有限,这也是select低效的原因.

 

  poll

  poll的实现与select大致相同,故不做赘述.区别有两点:

  1. 文件描述符集合的结构不同,poll使用的是pollfd,而select使用的是fd_set.
  2. poll没有限制可监听的文件数量.

 

  epoll

  1.源码分析

// @ fs/eventpoll.c
/*
 * This structure is stored inside the "private_data" member of the file
 * structure and represents the main data structure for the eventpoll
 * interface.
 */
 // 每创建一个epollfd,内核就会分配一个eventpoll与之对应,可以理解成内核态的epollfd.
struct eventpoll {
	/* Protect the access to this structure */
	spinlock_t lock;
	/*
	 * This mutex is used to ensure that files are not removed
	 * while epoll is using them. This is held during the event
	 * collection loop, the file cleanup path, the epoll file exit
	 * code and the ctl operations.
	 */
	 /**
	  添加,修改或删除监听fd的时候,以及epoll_wait返回,向用户空间传递数据时,都会持有这个互斥锁.
	  因此,在用户空间中执行epoll相关操作是线程安全的,内核已经做了保护.
	  **/
	struct mutex mtx;
	/* Wait queue used by sys_epoll_wait() */
	/**
	 等待队列头部.
	 当在该等待队列中的进程调用epoll_wait()时,会进入睡眠.
	 **/
	wait_queue_head_t wq;
	/* Wait queue used by file->poll() */
	/**
	 用于epollfd被f_op->poll()的时候
	 **/
	wait_queue_head_t poll_wait;
	/* List of ready file descriptors */
	/**
	 所有已经ready的epitem被存放在这个链表里
	 **/
	struct list_head rdllist;
	/* RB tree root used to store monitored fd structs */
	/**
	 所有待监听的epitem被存放在这个红黑树里
	 **/
	struct rb_root rbr;
	/*
	 * This is a single linked list that chains all the "struct epitem" that
	 * happened while transferring ready events to userspace w/out
	 * holding ->lock.
	 */
	 /**
	  当event转移到用户空间时,这个单链表存放着所有struct epitem
	  **/
	struct epitem *ovflist;
	/* wakeup_source used when ep_scan_ready_list is running */
	struct wakeup_source *ws; // TODO
	/* The user that created the eventpoll descriptor */
	/**
	 这里存放了一些用户变量,比如fd监听数量的最大值等
	 **/
	struct user_struct *user;
	struct file *file;
	/* used to optimize loop detection check */ // TODO
	int visited;
	struct list_head visited_list_link;
};
/*
 * Each file descriptor added to the eventpoll interface will
 * have an entry of this type linked to the "rbr" RB tree.
 * Avoid increasing the size of this struct, there can be many thousands
 * of these on a server and we do not want this to take another cache line.
 */
 // epitem表示一个被监听的fd
struct epitem {
	union {
		/* RB tree node links this structure to the eventpoll RB tree */
		/**
		 红黑树结点,当使用epoll_ctl()将一批fd加入到某个epollfd时,内核会分配一批epitem与fd一一对应,
		 并且以红黑树的形式来组织它们,tree的root被存放在struct eventpoll中.
		 **/
		struct rb_node rbn;
		/* Used to free the struct epitem */
		struct rcu_head rcu; // TODO
	};
	/* List header used to link this structure to the eventpoll ready list */
	/**
	 链表结点,所有已经ready的epitem都会被存放在eventpoll的rdllist链表中.
	 **/
	struct list_head rdllink;
	/*
	 * Works together "struct eventpoll"->ovflist in keeping the
	 * single linked chain of items.
	 */
	struct epitem *next; // 用于eventpoll的ovflist
	/* The file descriptor information this item refers to */
	/**
	 epitem对应的fd和struct file
	 **/
	struct epoll_filefd ffd;
	/* Number of active wait queue attached to poll operations */
	int nwait; // 当前epitem被加入到多少个等待队列中
	/* List containing poll wait queues */
	struct list_head pwqlist;
	/* The "container" of this item */
	/**
	 当前epitem属于那个eventpoll
	 **/
	struct eventpoll *ep;
	/* List header used to link this item to the "struct file" items list */
	struct list_head fllink;
	/* wakeup_source used when EPOLLWAKEUP is set */
	struct wakeup_source __rcu *ws;
	/* The structure that describe the interested events and the source fd */
	/**
	 当前epitem关心哪些event,这个数据是由执行epoll_ctl时从用户空间传递过来的
	 **/
	struct epoll_event event;
};
struct epoll_filefd {
	struct file *file;
	int fd;
} __packed;
/* Wait structure used by the poll hooks */
struct eppoll_entry {
	/* List header used to link this structure to the "struct epitem" */
	struct list_head llink;
	/* The "base" pointer is set to the container "struct epitem" */
	struct epitem *base;
	/*
	 * Wait queue item that will be linked to the target file wait
	 * queue head.
	 */
	wait_queue_t wait;
	/* The wait queue head that linked the "wait" wait queue item */
	wait_queue_head_t *whead;
};
/* Used by the ep_send_events() function as callback private data */
struct ep_send_events_data {
	int maxevents;
	struct epoll_event __user *events;
};
/**
 调用epoll_create()的实质,就是调用epoll_create1().
 **/
SYSCALL_DEFINE1(epoll_create, int, size)
{
	if (size <= 0)
		return -EINVAL;
	return sys_epoll_create1(0);
}
/*
 * Open an eventpoll file descriptor.
 */
SYSCALL_DEFINE1(epoll_create1, int, flags)
{
	int error, fd;
	struct eventpoll *ep = NULL;
	struct file *file;
	/* Check the EPOLL_* constant for consistency.  */
	BUILD_BUG_ON(EPOLL_CLOEXEC != O_CLOEXEC);
	/**
	 对于epoll来说,目前唯一有效的FLAG是CLOSEXEC
	 **/
	if (flags & ~EPOLL_CLOEXEC)
		return -EINVAL;
	/*
	 * Create the internal data structure ("struct eventpoll").
	 */
	 /**
	 分配一个struct eventpoll,ep_alloc()的具体分析在下面
	 **/
	error = ep_alloc(&ep);
	if (error < 0)
		return error;
	/*
	 * Creates all the items needed to setup an eventpoll file. That is,
	 * a file structure and a free file descriptor.
	 */
	fd = get_unused_fd_flags(O_RDWR | (flags & O_CLOEXEC)); // TODO
	if (fd < 0) {
		error = fd;
		goto out_free_ep;
	}
	/**
	 创建一个匿名fd.
	 epollfd本身并不存在一个真正的文件与之对应,所以内核需要创建一个"虚拟"的文件,并为之分配
	 真正的struct file结构,并且具有真正的fd.
	 **/
	file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep,
				 O_RDWR | (flags & O_CLOEXEC));
	if (IS_ERR(file)) {
		error = PTR_ERR(file);
		goto out_free_fd;
	}
	ep->file = file;
	fd_install(fd, file);
	return fd;
out_free_fd:
	put_unused_fd(fd);
out_free_ep:
	ep_free(ep);
	return error;
}
/**
 分配一个eventpoll结构
 **/
static int ep_alloc(struct eventpoll **pep)
{
	int error;
	struct user_struct *user;
	struct eventpoll *ep;
	/**
	 获取当前用户的一些信息,比如最大监听fd数目
	 **/
	user = get_current_user();
	error = -ENOMEM;
	ep = kzalloc(sizeof(*ep), GFP_KERNEL); // 话说分配eventpoll对象是使用slab还是用buddy呢?TODO
	if (unlikely(!ep))
		goto free_uid;
	/**
	 初始化
	 **/
	spin_lock_init(&ep->lock);
	mutex_init(&ep->mtx);
	init_waitqueue_head(&ep->wq);
	init_waitqueue_head(&ep->poll_wait);
	INIT_LIST_HEAD(&ep->rdllist);
	ep->rbr = RB_ROOT;
	ep->ovflist = EP_UNACTIVE_PTR;
	ep->user = user;
	*pep = ep;
	return 0;
free_uid:
	free_uid(user);
	return error;
}
/*
 * The following function implements the controller interface for
 * the eventpoll file that enables the insertion/removal/change of
 * file descriptors inside the interest set.
 */
/**
 调用epool_ctl来添加要监听的fd.
 参数说明:
 epfd,即epollfd
 op,操作,ADD,MOD,DEL
 fd,需要监听的文件描述符
 event,关心的events
 **/
SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
		struct epoll_event __user *, event)
{
	int error;
	int full_check = 0;
	struct fd f, tf;
	struct eventpoll *ep;
	struct epitem *epi;
	struct epoll_event epds;
	struct eventpoll *tep = NULL;
	error = -EFAULT;
	/**
	 错误处理以及
	 将event从用户空间拷贝到内核空间.
	 **/
	if (ep_op_has_event(op) &&
	    copy_from_user(&epds, event, sizeof(struct epoll_event)))
		goto error_return;
	error = -EBADF;
	/**
	 获取epollfd的file结构,该结构在epoll_create1()中,由函数anon_inode_getfile()分配
	 **/
	f = fdget(epfd);
	if (!f.file)
		goto error_return;
	/* Get the "struct file *" for the target file */
	/**
	 获取待监听的fd的file结构
	 **/
	tf = fdget(fd);
	if (!tf.file)
		goto error_fput;
	/* The target file descriptor must support poll */
	error = -EPERM;
	/**
	 待监听的文件一定要支持poll.
	 话说什么情况下文件不支持poll呢?TODO
	 **/
	if (!tf.file->f_op->poll)
		goto error_tgt_fput;
	/* Check if EPOLLWAKEUP is allowed */
	if (ep_op_has_event(op))
		ep_take_care_of_epollwakeup(&epds);
	/*
	 * We have to check that the file structure underneath the file descriptor
	 * the user passed to us _is_ an eventpoll file. And also we do not permit
	 * adding an epoll file descriptor inside itself.
	 */
	error = -EINVAL;
	/**
	 epollfd不能监听自己
	 **/
	if (f.file == tf.file || !is_file_epoll(f.file))
		goto error_tgt_fput;
	/*
	 * At this point it is safe to assume that the "private_data" contains
	 * our own data structure.
	 */
	 /**
	  获取eventpoll结构,来自于epoll_create1()的分配
	  **/
	ep = f.file->private_data;
	/*
	 * When we insert an epoll file descriptor, inside another epoll file
	 * descriptor, there is the change of creating closed loops, which are
	 * better be handled here, than in more critical paths. While we are
	 * checking for loops we also determine the list of files reachable
	 * and hang them on the tfile_check_list, so we can check that we
	 * haven't created too many possible wakeup paths.
	 *
	 * We do not need to take the global 'epumutex' on EPOLL_CTL_ADD when
	 * the epoll file descriptor is attaching directly to a wakeup source,
	 * unless the epoll file descriptor is nested. The purpose of taking the
	 * 'epmutex' on add is to prevent complex toplogies such as loops and
	 * deep wakeup paths from forming in parallel through multiple
	 * EPOLL_CTL_ADD operations.
	 */
	 /**
	  以下操作可能会修改数据结构内容,锁
	  **/
	  // TODO
	mutex_lock_nested(&ep->mtx, 0);
	if (op == EPOLL_CTL_ADD) {
		if (!list_empty(&f.file->f_ep_links) ||
						is_file_epoll(tf.file)) {
			full_check = 1;
			mutex_unlock(&ep->mtx);
			mutex_lock(&epmutex);
			if (is_file_epoll(tf.file)) {
				error = -ELOOP;
				if (ep_loop_check(ep, tf.file) != 0) {
					clear_tfile_check_list();
					goto error_tgt_fput;
				}
			} else
				list_add(&tf.file->f_tfile_llink,
							&tfile_check_list);
			mutex_lock_nested(&ep->mtx, 0);
			if (is_file_epoll(tf.file)) {
				tep = tf.file->private_data;
				mutex_lock_nested(&tep->mtx, 1);
			}
		}
	}
	/*
	 * Try to lookup the file inside our RB tree, Since we grabbed "mtx"
	 * above, we can be sure to be able to use the item looked up by
	 * ep_find() till we release the mutex.
	 */
	 /**
	  对于每一个监听的fd,内核都有分配一个epitem结构,并且不允许重复分配,所以要查找该fd是否
	  已经存在.
	  ep_find()即在红黑树中查找,时间复杂度为O(lgN).
	  **/
	epi = ep_find(ep, tf.file, fd);
	error = -EINVAL;
	switch (op) {
		/**
		 首先关心添加
		 **/
	case EPOLL_CTL_ADD:
		if (!epi) {
			/**
			 如果ep_find()没有找到相关的epitem,证明是第一次插入.
			 在此可以看到,内核总会关心POLLERR和POLLHUP.
			 **/
			epds.events |= POLLERR | POLLHUP;
			/**
			 红黑树插入,ep_insert()的具体分析在下面
			 **/
			error = ep_insert(ep, &epds, tf.file, fd, full_check);
		} else
			/**
			 如果找到了,则是重复添加
			 **/
			error = -EEXIST;
		if (full_check) // TODO
			clear_tfile_check_list();
		break;
	case EPOLL_CTL_DEL:
			/**
			 删除
			 **/
		if (epi)
			error = ep_remove(ep, epi);
		else
			error = -ENOENT;
		break;
	case EPOLL_CTL_MOD:
			/**
			 修改
			 **/
		if (epi) {
			epds.events |= POLLERR | POLLHUP;
			error = ep_modify(ep, epi, &epds);
		} else
			error = -ENOENT;
		break;
	}
	if (tep != NULL)
		mutex_unlock(&tep->mtx);
	mutex_unlock(&ep->mtx); // 解锁
error_tgt_fput:
	if (full_check)
		mutex_unlock(&epmutex);
	fdput(tf);
error_fput:
	fdput(f);
error_return:
	return error;
}
/*
 * Must be called with "mtx" held.
 */
 /**
  ep_insert()在epoll_ctl()中被调用,其工作是往epollfd的红黑树中添加一个待监听fd.
  **/
static int ep_insert(struct eventpoll *ep, struct epoll_event *event,
		     struct file *tfile, int fd, int full_check)
{
	int error, revents, pwake = 0;
	unsigned long flags;
	long user_watches;
	struct epitem *epi;
	struct ep_pqueue epq;
	/**
	 struct ep_pqueue的定义如下:
	 @ fs/eventpoll.c
	 // Wrapper struct used by poll queueing
	 struct ep_pqueue {
		 poll_table pt;
		 struct epitem *epi;
	 };
	 **/
	/**
	 查看是否达到当前用户的最大监听数
	 **/
	user_watches = atomic_long_read(&ep->user->epoll_watches);
	if (unlikely(user_watches >= max_user_watches))
		return -ENOSPC;
	/**
	 从slab中分配一个epitem
	 **/
	if (!(epi = kmem_cache_alloc(epi_cache, GFP_KERNEL)))
		return -ENOMEM;
	/* Item initialization follow here ... */
	/**
	 相关数据成员的初始化
	 **/
	INIT_LIST_HEAD(&epi->rdllink);
	INIT_LIST_HEAD(&epi->fllink);
	INIT_LIST_HEAD(&epi->pwqlist);
	epi->ep = ep;
	/**
	 在该epitem中保存待监听的fd和它的file结构.
	 **/
	ep_set_ffd(&epi->ffd, tfile, fd);
	epi->event = *event;
	epi->nwait = 0;
	epi->next = EP_UNACTIVE_PTR;
	if (epi->event.events & EPOLLWAKEUP) {
		error = ep_create_wakeup_source(epi);
		if (error)
			goto error_create_wakeup_source;
	} else {
		RCU_INIT_POINTER(epi->ws, NULL);
	}
	/* Initialize the poll table using the queue callback */
	epq.epi = epi;
	/**
	 初始化一个poll_table,
	 其实质是指定调用poll_wait()时(不是epoll_wait)的回调函数,以及我们关心哪些event.
	 ep_ptable_queue_proc()就是我们的回调函数,初值是所有event都关心.
	 ep_ptable_queue_proc()的具体分析在下面.
	 **/
	init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
	/*
	 * Attach the item to the poll hooks and get current event bits.
	 * We can safely use the file* here because its usage count has
	 * been increased by the caller of this function. Note that after
	 * this operation completes, the poll callback can start hitting
	 * the new item.
	 */
	revents = ep_item_poll(epi, &epq.pt);
	/**
	 ep_item_poll()的定义如下:
	 @ fs/eventpoll.c
	 static inline unsigned int ep_item_poll(struct epitem *epi, poll_table *pt)
	 {
		pt->_key = epi->event.events;
		return epi->ffd.file->f_op->poll(epi->ffd.file, pt) & epi->event.events;
	}
	**/
	/**
	  f_op->poll()一般来说只是个wrapper,它会调用真正的poll实现.
	  拿UDP的socket来举例,调用流程如下:
	  f_op->poll(),sock_poll(),udp_poll(),datagram_poll(),sock_poll_wait(),
	  最后调用到上面指定的ep_ptable_queue_proc().
	  完成这一步,该epitem就跟这个socket关联起来了,当后者有状态变化时,会通过ep_poll_callback()
	  来通知.
	  所以,f_op->poll()做了两件事情:
	  1.将该epitem和这个待监听的fd关联起来;
	  2.查询这个待监听的fd是否已经有event已经ready了,有的话就将event返回.
	  **/
	/*
	 * We have to check if something went wrong during the poll wait queue
	 * install process. Namely an allocation for a wait queue failed due
	 * high memory pressure.
	 */
	error = -ENOMEM;
	if (epi->nwait < 0)
		goto error_unregister;
	/* Add the current item to the list of active epoll hook for this file */
	/**
	 把每个文件和对应的epitem关联起来
	 **/
	spin_lock(&tfile->f_lock);
	list_add_tail_rcu(&epi->fllink, &tfile->f_ep_links);
	spin_unlock(&tfile->f_lock);
	/*
	 * Add the current item to the RB tree. All RB tree operations are
	 * protected by "mtx", and ep_insert() is called with "mtx" held.
	 */
	 /**
	  将epitem插入到eventpoll的红黑树中
	  **/
	ep_rbtree_insert(ep, epi);
	/* now check if we've created too many backpaths */
	error = -EINVAL;
	if (full_check && reverse_path_check())
		goto error_remove_epi;
	/* We have to drop the new item inside our item list to keep track of it */
	spin_lock_irqsave(&ep->lock, flags); // TODO
	/* If the file is already "ready" we drop it inside the ready list */
	/**
	 在这里,如果待监听的fd已经有事件发生,就去处理一下
	 **/
	if ((revents & event->events) && !ep_is_linked(&epi->rdllink)) {
		/**
		 将当前的epitem加入到ready list中去
		 **/
		list_add_tail(&epi->rdllink, &ep->rdllist);
		ep_pm_stay_awake(epi);
		/* Notify waiting tasks that events are available */
		/**
		 哪个进程在调用epoll_wait(),就唤醒它
		 **/
		if (waitqueue_active(&ep->wq))
			wake_up_locked(&ep->wq);
		/**
		 先不通知对eventpoll进行poll的进程
		 **/
		if (waitqueue_active(&ep->poll_wait))
			pwake++;
	}
	spin_unlock_irqrestore(&ep->lock, flags);
	atomic_long_inc(&ep->user->epoll_watches);
	/* We have to call this outside the lock */
	if (pwake)
	/**
	 安全地通知对eventpoll进行poll的进程
	 **/
		ep_poll_safewake(&ep->poll_wait);
	return 0;
error_remove_epi:
	spin_lock(&tfile->f_lock);
	list_del_rcu(&epi->fllink);
	spin_unlock(&tfile->f_lock);
	rb_erase(&epi->rbn, &ep->rbr);
error_unregister:
	ep_unregister_pollwait(ep, epi);
	/*
	 * We need to do this because an event could have been arrived on some
	 * allocated wait queue. Note that we don't care about the ep->ovflist
	 * list, since that is used/cleaned only inside a section bound by "mtx".
	 * And ep_insert() is called with "mtx" held.
	 */
	spin_lock_irqsave(&ep->lock, flags);
	if (ep_is_linked(&epi->rdllink))
		list_del_init(&epi->rdllink);
	spin_unlock_irqrestore(&ep->lock, flags);
	wakeup_source_unregister(ep_wakeup_source(epi));
error_create_wakeup_source:
	kmem_cache_free(epi_cache, epi);
	return error;
}
/*
 * This is the callback that is used to add our wait queue to the
 * target file wakeup lists.
 */
 /**
  该函数在调用f_op->poll()时被调用.
  其作用是当epoll主动poll某个待监听fd时,将epitem和该fd关联起来.
  关联的方法是使用等待队列.
  **/
static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,
				 poll_table *pt)
{
	struct epitem *epi = ep_item_from_epqueue(pt);
	struct eppoll_entry *pwq;
	/**
	 @ fs/eventpoll.c
	 // Wait structure used by the poll hooks
	 struct eppoll_entry {
		// List header used to link this structure to the "struct epitem"
		struct list_head llink;
		// The "base" pointer is set to the container "struct epitem"
		struct epitem *base;
	 	// Wait queue item that will be linked to the target file wait
	    // queue head.
		wait_queue_t wait;
		// The wait queue head that linked the "wait" wait queue item
		wait_queue_head_t *whead;
	};
	**/
	if (epi->nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) {
		/**
		 初始化等待队列,指定ep_poll_callback()为唤醒时的回调函数.
		 当监听的fd发生状态改变时,即队列头被唤醒时,指定的回调函数会被调用.
		 **/
		init_waitqueue_func_entry(&pwq->wait, ep_poll_callback); // ep_poll_callback()的具体分析在下面
		pwq->whead = whead;
		pwq->base = epi;
		add_wait_queue(whead, &pwq->wait);
		list_add_tail(&pwq->llink, &epi->pwqlist);
		epi->nwait++;
	} else {
		/* We have to signal that an error occurred */
		epi->nwait = -1;
	}
}
/*
 * This is the callback that is passed to the wait queue wakeup
 * mechanism. It is called by the stored file descriptors when they
 * have events to report.
 */
 /**
  这是一个关键的回调函数.
  当被监听的fd发生状态改变时,该函数会被调用.
  参数key指向events.
  **/
static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key)
{
	int pwake = 0;
	unsigned long flags;
	struct epitem *epi = ep_item_from_wait(wait); // 从等待队列获取epitem
	struct eventpoll *ep = epi->ep;
	spin_lock_irqsave(&ep->lock, flags);
	/*
	 * If the event mask does not contain any poll(2) event, we consider the
	 * descriptor to be disabled. This condition is likely the effect of the
	 * EPOLLONESHOT bit that disables the descriptor when an event is received,
	 * until the next EPOLL_CTL_MOD will be issued.
	 */
	if (!(epi->event.events & ~EP_PRIVATE_BITS))
		goto out_unlock;
	/*
	 * Check the events coming with the callback. At this stage, not
	 * every device reports the events in the "key" parameter of the
	 * callback. We need to be able to handle both cases here, hence the
	 * test for "key" != NULL before the event match test.
	 */
	 /**
	  没有我们关心的event
	  **/
	if (key && !((unsigned long) key & epi->event.events))
		goto out_unlock;
	/*
	 * If we are transferring events to userspace, we can hold no locks
	 * (because we're accessing user memory, and because of linux f_op->poll()
	 * semantics). All the events that happen during that period of time are
	 * chained in ep->ovflist and requeued later on.
	 */
	 /**
	  如果该函数被调用时,epoll_wait()已经返回了,
	  即此时应用程序已经在循环中获取events了,
	  这种情况下,内核将此刻发生状态改变的epitem用一个单独的链表保存起来,并且在下一次epoll_wait()
	  时返回给用户.这个单独的链表就是ovflist.
	  */
	if (unlikely(ep->ovflist != EP_UNACTIVE_PTR)) {
		if (epi->next == EP_UNACTIVE_PTR) {
			epi->next = ep->ovflist;
			ep->ovflist = epi;
			if (epi->ws) {
				/*
				 * Activate ep->ws since epi->ws may get
				 * deactivated at any time.
				 */
				__pm_stay_awake(ep->ws);
			}
		}
		goto out_unlock;
	}
	/* If this file is already in the ready list we exit soon */
	/**
	 将当前epitem添加到ready list中
	 **/
	if (!ep_is_linked(&epi->rdllink)) {
		list_add_tail(&epi->rdllink, &ep->rdllist);
		ep_pm_stay_awake_rcu(epi);
	}
	/*
	 * Wake up ( if active ) both the eventpoll wait list and the ->poll()
	 * wait list.
	 */
	 /**
	  唤醒调用epoll_wait()的进程
	  **/
	if (waitqueue_active(&ep->wq))
		wake_up_locked(&ep->wq);
	/**
	 先不通知对eventpoll进行poll的进程
	 **/
	if (waitqueue_active(&ep->poll_wait))
		pwake++;
out_unlock:
	spin_unlock_irqrestore(&ep->lock, flags);
	/* We have to call this outside the lock */
	if (pwake)
	/**
	 安全地通知对eventpoll进行poll的进程
	 **/
		ep_poll_safewake(&ep->poll_wait);
	if ((unsigned long)key & POLLFREE) {
		/*
		 * If we race with ep_remove_wait_queue() it can miss
		 * ->whead = NULL and do another remove_wait_queue() after
		 * us, so we can't use __remove_wait_queue().
		 */
		list_del_init(&wait->task_list);
		/*
		 * ->whead != NULL protects us from the race with ep_free()
		 * or ep_remove(), ep_remove_wait_queue() takes whead->lock
		 * held by the caller. Once we nullify it, nothing protects
		 * ep/epi or even wait.
		 */
		smp_store_release(&ep_pwq_from_wait(wait)->whead, NULL);
	}
	return 1;
}
/*
 * Implement the event wait interface for the eventpoll file. It is the kernel
 * part of the user space epoll_wait(2).
 */
SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events,
		int, maxevents, int, timeout)
{
	int error;
	struct fd f;
	struct eventpoll *ep;
	/* The maximum number of event must be greater than zero */
	if (maxevents <= 0 || maxevents > EP_MAX_EVENTS)
		return -EINVAL;
	/* Verify that the area passed by the user is writeable */
	/**
	 内核要验证这一段用户空间的内存是不是有效的,可写的.
	 **/
	if (!access_ok(VERIFY_WRITE, events, maxevents * sizeof(struct epoll_event)))
		return -EFAULT;
	/* Get the "struct file *" for the eventpoll file */
	/**
	 获取epollfd的file结构
	 **/
	f = fdget(epfd);
	if (!f.file)
		return -EBADF;
	/*
	 * We have to check that the file structure underneath the fd
	 * the user passed to us _is_ an eventpoll file.
	 */
	error = -EINVAL;
	/**
	 检查它是不是一个真正的epollfd
	 **/
	if (!is_file_epoll(f.file))
		goto error_fput;
	/*
	 * At this point it is safe to assume that the "private_data" contains
	 * our own data structure.
	 */
	 /**
	  获取eventpoll结构
	  **/
	ep = f.file->private_data;
	/* Time to fish for events ... */
	/**
	 睡眠,等待事件到来.
	 ep_poll()的具体分析在下面.
	 **/
	error = ep_poll(ep, events, maxevents, timeout);
error_fput:
	fdput(f);
	return error;
}
/**
* ep_poll - Retrieves ready events, and delivers them to the caller supplied
*           event buffer.
*
* @ep: Pointer to the eventpoll context.
* @events: Pointer to the userspace buffer where the ready events should be
*          stored.
* @maxevents: Size (in terms of number of events) of the caller event buffer.
* @timeout: Maximum timeout for the ready events fetch operation, in
*           milliseconds. If the @timeout is zero, the function will not block,
*           while if the @timeout is less than zero, the function will block
*           until at least one event has been retrieved (or an error
*           occurred).
*
* Returns: Returns the number of ready events which have been fetched, or an
*          error code, in case of error.
*/
/**
执行epoll_wait()的进程在该函数进入休眠状态.
**/
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
	int maxevents, long timeout)
	{
		int res = 0, eavail, timed_out = 0;
		unsigned long flags;
		long slack = 0;
		wait_queue_t wait;
		ktime_t expires, *to = NULL;
		if (timeout > 0) {
			/**
			计算睡眠时间
			**/
			struct timespec end_time = ep_set_mstimeout(timeout);
			slack = select_estimate_accuracy(&end_time);
			to = &expires;
			*to = timespec_to_ktime(end_time);
		} else if (timeout == 0) {
			/**
			已经超时,直接检查ready list
			**/
			/*
			* Avoid the unnecessary trip to the wait queue loop, if the
			* caller specified a non blocking operation.
			*/
			timed_out = 1;
			spin_lock_irqsave(&ep->lock, flags);
			goto check_events;
		}
		fetch_events:
		spin_lock_irqsave(&ep->lock, flags);
	/**
	 没有可用的事件,即ready list和ovflist均为空.
	 **/
	if (!ep_events_available(ep)) {
		/*
		 * We don't have any available event to return to the caller.
		 * We need to sleep here, and we will be wake up by
		 * ep_poll_callback() when events will become available.
		 */
		 /**
		  初始化一个等待队列成员,current是当前进程.
		  然后把该等待队列成员添加到ep的等待队列中,即当前进程把自己添加到等待队列中.
		  **/
		init_waitqueue_entry(&wait, current);
		__add_wait_queue_exclusive(&ep->wq, &wait);
		for (;;) {
			/*
			 * We don't want to sleep if the ep_poll_callback() sends us
			 * a wakeup in between. That's why we set the task state
			 * to TASK_INTERRUPTIBLE before doing the checks.
			 */
			 /**
			  将当前进程的状态设置为睡眠时可以被信号唤醒.
			  仅仅是状态设置,还没有睡眠.
			  **/
			set_current_state(TASK_INTERRUPTIBLE);
			/**
			 如果此时,ready list已经有成员了,或者已经超时,则不进入睡眠.
			 **/
			if (ep_events_available(ep) || timed_out)
				break;
			/**
			 如果有信号产生,不进入睡眠.
			 **/
			if (signal_pending(current)) {
				res = -EINTR;
				break;
			}
			spin_unlock_irqrestore(&ep->lock, flags);
			/**
			 挂起当前进程,等待被唤醒或超时
			 **/
			if (!schedule_hrtimeout_range(to, slack, HRTIMER_MODE_ABS))
				timed_out = 1;
			spin_lock_irqsave(&ep->lock, flags);
		}
		__remove_wait_queue(&ep->wq, &wait); // 把当前进程从该epollfd的等待队列中删除.
		__set_current_state(TASK_RUNNING); // 将当前进程的状态设置为可运行.
	}
check_events:
	/* Is it worth to try to dig for events ? */
	eavail = ep_events_available(ep);
	spin_unlock_irqrestore(&ep->lock, flags);
	/*
	 * Try to transfer events to user space. In case we get 0 events and
	 * there's still timeout left over, we go trying again in search of
	 * more luck.
	 */
	 /**
	  如果一切正常,并且有event发生,则拷贝数据给用户空间
	  **/
	  // ep_send_events()的具体分析在下面
	if (!res && eavail &&
	    !(res = ep_send_events(ep, events, maxevents)) && !timed_out)
		goto fetch_events;
	return res;
}
static int ep_send_events(struct eventpoll *ep,
			  struct epoll_event __user *events, int maxevents)
{
	struct ep_send_events_data esed;
	/**
	 @ fs/eventpoll.c
	 // Used by the ep_send_events() function as callback private data
	 struct ep_send_events_data {
	 	int maxevents;
		struct epoll_event __user *events;
	};
	**/
	esed.maxevents = maxevents;
	esed.events = events;
	// ep_scan_ready_list()的具体分析在下面
	return ep_scan_ready_list(ep, ep_send_events_proc, &esed, 0, false);
}
/**
 * ep_scan_ready_list - Scans the ready list in a way that makes possible for
 *                      the scan code, to call f_op->poll(). Also allows for
 *                      O(NumReady) performance.
 *
 * @ep: Pointer to the epoll private data structure.
 * @sproc: Pointer to the scan callback.
 * @priv: Private opaque data passed to the @sproc callback.
 * @depth: The current depth of recursive f_op->poll calls.
 * @ep_locked: caller already holds ep->mtx
 *
 * Returns: The same integer error code returned by the @sproc callback.
 */
static int ep_scan_ready_list(struct eventpoll *ep,
			      int (*sproc)(struct eventpoll *,
					   struct list_head *, void *),
			      void *priv, int depth, bool ep_locked)
{
	int error, pwake = 0;
	unsigned long flags;
	struct epitem *epi, *nepi;
	LIST_HEAD(txlist);
	/*
	 * We need to lock this because we could be hit by
	 * eventpoll_release_file() and epoll_ctl().
	 */
	if (!ep_locked)
		mutex_lock_nested(&ep->mtx, depth);
	/*
	 * Steal the ready list, and re-init the original one to the
	 * empty list. Also, set ep->ovflist to NULL so that events
	 * happening while looping w/out locks, are not lost. We cannot
	 * have the poll callback to queue directly on ep->rdllist,
	 * because we want the "sproc" callback to be able to do it
	 * in a lockless way.
	 */
	spin_lock_irqsave(&ep->lock, flags);
	/**
	 将ready list上的epitem(即监听事件发生状态改变的epitem)移动到txlist,
	 并且将ready list清空.
	 **/
	list_splice_init(&ep->rdllist, &txlist);
	/**
	 改变ovflist的值.
	 在上面的ep_poll_callback()中可以看到,如果ovflist != EP_UNACTIVE_PTR,当等待队列成员被激活时,
	 就会将对应的epitem加入到ep->ovflist中,否则加入到ep->rdllist中.
	 所以这里是为了防止把新来的发生状态改变的epitem加入到ready list中.
	 **/
	ep->ovflist = NULL;
	spin_unlock_irqrestore(&ep->lock, flags);
	/*
	 * Now call the callback function.
	 */
	 /**
	  调用扫描函数处理txlist.
	  该扫描函数就是ep_send_events_proc.具体分析在下面.
	  **/
	error = (*sproc)(ep, &txlist, priv);
	spin_lock_irqsave(&ep->lock, flags);
	/*
	 * During the time we spent inside the "sproc" callback, some
	 * other events might have been queued by the poll callback.
	 * We re-insert them inside the main ready-list here.
	 */
	 /**
	  在调用sproc()期间,可能会有新的事件发生(被添加到ovflist中),遍历这些发生新事件的epitem,
	  将它们插入到ready list中.
	  **/
	for (nepi = ep->ovflist; (epi = nepi) != NULL;
	     nepi = epi->next, epi->next = EP_UNACTIVE_PTR) {
		/**
		 @ fs/eventpoll.c
		 #define EP_UNACTIVE_PTR ((void *) -1L)
		 **/
		/*
		 * We need to check if the item is already in the list.
		 * During the "sproc" callback execution time, items are
		 * queued into ->ovflist but the "txlist" might already
		 * contain them, and the list_splice() below takes care of them.
		 */
		 /**
		  epitem不在ready list?插入!
		  **/
		if (!ep_is_linked(&epi->rdllink)) {
			list_add_tail(&epi->rdllink, &ep->rdllist);
			ep_pm_stay_awake(epi);
		}
	}
	/*
	 * We need to set back ep->ovflist to EP_UNACTIVE_PTR, so that after
	 * releasing the lock, events will be queued in the normal way inside
	 * ep->rdllist.
	 */
	 /**
	  还原ovflist的状态
	  **/
	ep->ovflist = EP_UNACTIVE_PTR;
	/*
	 * Quickly re-inject items left on "txlist".
	 */
	 /**
	  将上次没有处理完的epitem,重新插入到ready list中.
	  **/
	list_splice(&txlist, &ep->rdllist);
	__pm_relax(ep->ws);
	/**
	 如果ready list不为空,唤醒.
	 **/
	if (!list_empty(&ep->rdllist)) {
		/*
		 * Wake up (if active) both the eventpoll wait list and
		 * the ->poll() wait list (delayed after we release the lock).
		 */
		if (waitqueue_active(&ep->wq))
			wake_up_locked(&ep->wq);
		if (waitqueue_active(&ep->poll_wait))
			pwake++;
	}
	spin_unlock_irqrestore(&ep->lock, flags);
	if (!ep_locked)
		mutex_unlock(&ep->mtx);
	/* We have to call this outside the lock */
	if (pwake)
		ep_poll_safewake(&ep->poll_wait);
	return error;
}
/**
 该函数作为callback在ep_scan_ready_list()中被调用.
 head是一个链表头,链接着已经ready了的epitem.
 这个链表不是eventpoll的ready list,而是上面函数中的txlist.
 **/
static int ep_send_events_proc(struct eventpoll *ep, struct list_head *head,
			       void *priv)
{
	struct ep_send_events_data *esed = priv;
	int eventcnt;
	unsigned int revents;
	struct epitem *epi;
	struct epoll_event __user *uevent;
	struct wakeup_source *ws;
	poll_table pt;
	init_poll_funcptr(&pt, NULL);
	/*
	 * We can loop without lock because we are passed a task private list.
	 * Items cannot vanish during the loop because ep_scan_ready_list() is
	 * holding "mtx" during this call.
	 */
	 /**
	  遍历整个链表
	  **/
	for (eventcnt = 0, uevent = esed->events;
	     !list_empty(head) && eventcnt < esed->maxevents;) {
		/**
		 取出第一个结点
		 **/
		epi = list_first_entry(head, struct epitem, rdllink);
		/*
		 * Activate ep->ws before deactivating epi->ws to prevent
		 * triggering auto-suspend here (in case we reactive epi->ws
		 * below).
		 *
		 * This could be rearranged to delay the deactivation of epi->ws
		 * instead, but then epi->ws would temporarily be out of sync
		 * with ep_is_linked().
		 */
		 // TODO
		ws = ep_wakeup_source(epi);
		if (ws) {
			if (ws->active)
				__pm_stay_awake(ep->ws);
			__pm_relax(ws);
		}
		/**
		 从ready list中删除该结点
		 **/
		list_del_init(&epi->rdllink);
		/**
		 获取ready事件掩码
		 **/
		revents = ep_item_poll(epi, &pt);
		/**
		 ep_item_poll()的具体分析在上面的ep_insert()中.
		 **/
		/*
		 * If the event mask intersect the caller-requested one,
		 * deliver the event to userspace. Again, ep_scan_ready_list()
		 * is holding "mtx", so no operations coming from userspace
		 * can change the item.
		 */
		if (revents) {
			/**
			 将ready事件和用户传入的数据都拷贝到用户空间
			 **/
			if (__put_user(revents, &uevent->events) ||
			    __put_user(epi->event.data, &uevent->data)) {
				list_add(&epi->rdllink, head);
				ep_pm_stay_awake(epi);
				return eventcnt ? eventcnt : -EFAULT;
			}
			eventcnt++;
			uevent++;
			if (epi->event.events & EPOLLONESHOT)
				epi->event.events &= EP_PRIVATE_BITS;
			else if (!(epi->event.events & EPOLLET)) {
				/**
				 边缘触发(ET)和水平触发(LT)的区别:
				 如果是ET,就绪epitem不会再次被加入到ready list中,除非fd再次发生状态改变,ep_poll_callback被调用.
				 如果是LT,不论是否还有有效的事件和数据,epitem都会被再次加入到ready list中,在下次epoll_wait()时会
				  立即返回,并通知用户空间.当然如果这个被监听的fd确实没有事件和数据,epoll_wait()会返回一个0.
				  **/
				/*
				 * If this file has been added with Level
				 * Trigger mode, we need to insert back inside
				 * the ready list, so that the next call to
				 * epoll_wait() will check again the events
				 * availability. At this point, no one can insert
				 * into ep->rdllist besides us. The epoll_ctl()
				 * callers are locked out by
				 * ep_scan_ready_list() holding "mtx" and the
				 * poll callback will queue them in ep->ovflist.
				 */
				list_add_tail(&epi->rdllink, &ep->rdllist);
				ep_pm_stay_awake(epi);
			}
		}
	}
	return eventcnt;
}
/**
 该函数在epollfd被close时调用,其工作是释放一些资源.
 **/
static void ep_free(struct eventpoll *ep)
{
	struct rb_node *rbp;
	struct epitem *epi;
	/* We need to release all tasks waiting for these file */
	if (waitqueue_active(&ep->poll_wait))
		ep_poll_safewake(&ep->poll_wait);
	/*
	 * We need to lock this because we could be hit by
	 * eventpoll_release_file() while we're freeing the "struct eventpoll".
	 * We do not need to hold "ep->mtx" here because the epoll file
	 * is on the way to be removed and no one has references to it
	 * anymore. The only hit might come from eventpoll_release_file() but
	 * holding "epmutex" is sufficient here.
	 */
	mutex_lock(&epmutex);
	/*
	 * Walks through the whole tree by unregistering poll callbacks.
	 */
	for (rbp = rb_first(&ep->rbr); rbp; rbp = rb_next(rbp)) {
		epi = rb_entry(rbp, struct epitem, rbn);
		ep_unregister_pollwait(ep, epi);
		cond_resched();
	}
	/*
	 * Walks through the whole tree by freeing each "struct epitem". At this
	 * point we are sure no poll callbacks will be lingering around, and also by
	 * holding "epmutex" we can be sure that no file cleanup code will hit
	 * us during this operation. So we can avoid the lock on "ep->lock".
	 * We do not need to lock ep->mtx, either, we only do it to prevent
	 * a lockdep warning.
	 */
	mutex_lock(&ep->mtx);
	/**
	 在epoll_ctl()中被添加的监听fd,在这里被关闭.
	 **/
	while ((rbp = rb_first(&ep->rbr)) != NULL) {
		epi = rb_entry(rbp, struct epitem, rbn);
		ep_remove(ep, epi);
		cond_resched();
	}
	mutex_unlock(&ep->mtx);
	mutex_unlock(&epmutex);
	mutex_destroy(&ep->mtx);
	free_uid(ep->user);
	wakeup_source_unregister(ep->ws);
	kfree(ep);
}

  2.实现过程

  一. epoll_create

  1. 调用ep_alloc()来创建一个struct eventpoll对象.ep_alloc()的执行过程如下:
    1a. 获取当前用户的一些信息.
    1b. 分配一个struct eventpoll对象.
    1c. 初始化相关数据成员,如等待队列,就绪链表,红黑树.
  2. 创建一个匿名fd和与之对应的struct file对象.
  3. 将该eventpoll和该file关联起来,eventpoll对象保存在file对象的private_data指针中.

  二. epoll_ctl

  1. 将event拷贝到内核空间.
  2. 判断加入的fd是否支持poll操作.
  3. 根据用户传入的op参数,以及是否在eventpoll的红黑树中找到该fd的结点,来执行相应的操作(插入,删除,修改).拿插入举例,执行ep_insert():
    3a. 在slab缓存中分配一个epitem对象,并初始化相关数据成员,如保存待监听的fd和它的file结构.
    3b. 指定调用poll_wait()时(再次强调,不是epoll_wait)时的回调函数,用于数据就绪时唤醒进程.(其实质是初始化文件的等待队列,将进程加入到等待队列).
    3c. 到此该epitem就和这个待监听的fd关联起来了.
    3d. 将该epitem插入到eventpoll的红黑树中.

  三. epoll_wait

  1. 调用ep_poll():
    1a. 计算睡眠时间(如果有).
    1b. 判断eventpoll的就绪链表是否为空,不为空则直接处理而不是睡眠.
    1c. 将当前进程添加到eventpoll的等待队列中.
    1d. 进入循环.
    1e. 将当前进程设置成TASK_INTERRUPTIBLE状态,然后判断是否有信号到来,如果没有,则进入睡眠.
    1f. 如果超时或被信号唤醒,则跳出循环.
    1g. 将当前进程从等待队列中删除,并把其状态设置成TASK_RUNNING.
    1h. 将数据拷贝给用户空间.拷贝的过程是先把ready list转移到中间链表,然后遍历中间链表拷贝到用户空间,并且判断每个结点是否水平触发,是则再次插入
    到ready list.

  3.小结

  从上述分析可见,相对于select/poll,epoll的高效体现在:

  1. fd只拷贝一次,即调用epoll_ctl()时把fd拷贝到内核空间.
  2. epoll轮询的是就绪链表,而不是所有fd.

  最后

  本以为两天时间就可以分析完这些源码并完成这篇博文,谁知道花了整整一个星期.不过收获匪浅.
  在此留下两个问题:

  1. 因为目前我的知识面有限,上述源码留下一些TODO,日后会一一完善.
  2. 话说epoll有什么缺点呢?

 

展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部