Go netpoll I/O 多路复用构建原生网络模型之源码深度解析

原创
2019/11/09 19:26
阅读数 0

导言

Go 基于 I/O multiplexing 和 goroutine 构建了一个简洁而高性能的原生网络模型(基于 Go 的I/O 多路复用 netpoll),提供了 goroutine-per-connection 这样简单的网络编程模式。在这种模式下,开发者使用的是同步的模式去编写异步的逻辑,极大地降低了开发者编写网络应用时的心智负担,且借助于 Go runtime scheduler 对 goroutines 的高效调度,这个原生网络模型不论从适用性还是性能上都足以满足绝大部分的应用场景。

然而,在工程性上能做到如此高的普适性和兼容性,最终暴露给开发者提供接口/模式如此简洁,其底层必然是基于非常复杂的封装,做了很多取舍,也有可能放弃了一些『极致』的设计和理念。事实上 netpoll底层就是基于 epoll/kqueue/iocp 这些系统调用来做封装的,最终暴露出 goroutine-per-connection 这样的极简的开发模式给使用者。

Go netpoll 在不同的操作系统,其底层使用的 I/O 多路复用技术也不一样,可以从 Go 源码目录结构和对应代码文件了解 Go 在不同平台下的网络 I/O 模式的实现。比如,在 Linux 系统下基于 epoll,freeBSD 系统下基于 kqueue,以及 Windows 系统下基于 iocp。

本文将基于 linux 平台来解析 Go netpoll 之 I/O 多路复用的底层是如何基于 epoll 封装实现的,从源码层层推进,全面而深度地解析 Go netpoll 的设计理念和实现原理,以及 Go 是如何利用 netpoll来构建它的原生网络模型的。主要涉及到的一些概念:I/O 模式、用户/内核空间、epoll、linux 源码、goroutine scheduler 等等,我会尽量简单地讲解,如果有对相关概念不熟悉的同学,还是希望能提前熟悉一下。

用户空间与内核空间

现在操作系统都是采用虚拟存储器,那么对 32 位操作系统而言,它的寻址空间(虚拟存储空间)为 4G(2 的 32 次方)。操作系统的核心是内核,独立于普通的应用程序,可以访问受保护的内存空间,也有访问底层硬件设备的所有权限。为了保证用户进程不能直接操作内核(kernel),保证内核的安全,操心系统将虚拟空间划分为两部分,一部分为内核空间,一部分为用户空间。针对 linux 操作系统而言,将最高的 1G 字节(从虚拟地址 0xC0000000 到 0xFFFFFFFF),供内核使用,称为内核空间,而将较低的 3G 字节(从虚拟地址 0x00000000 到0xBFFFFFFF),供各个进程使用,称为用户空间。

I/O 多路复用

在神作《UNIX 网络编程》里,总结归纳了 5 种 I/O 模型,包括同步和异步 I/O:

  • 阻塞 I/O (Blocking I/O)

  • 非阻塞 I/O (Nonblocking I/O)

  • I/O 多路复用 (I/O multiplexing)

  • 信号驱动 I/O (Signal driven I/O)

  • 异步 I/O (Asynchronous I/O)

操作系统上的 I/O 是用户空间和内核空间的数据交互,因此 I/O 操作通常包含以下两个步骤:

  1. 等待网络数据到达网卡(读就绪)/等待网卡可写(写就绪) –> 读取/写入到内核缓冲区

  2. 从内核缓冲区复制数据 –> 用户空间(读)/从用户空间复制数据 -> 内核缓冲区(写)

而判定一个 I/O 模型是同步还是异步,主要看第二步:数据在用户和内核空间之间复制的时候是不是会阻塞当前进程,如果会,则是同步 I/O,否则,就是异步 I/O。基于这个原则,这 5 种 I/O 模型中只有一种异步 I/O 模型:Asynchronous I/O,其余都是同步 I/O 模型。

这 5 种 I/O 模型的对比如下:

所谓 I/O 多路复用指的就是 select/poll/epoll 这一系列的多路选择器:支持单一线程同时监听多个文件描述符(I/O事件),阻塞等待,并在其中某个文件描述符可读写时收到通知。 I/O 复用其实复用的不是 I/O 连接,而是复用线程,让一个 thread of control 能够处理多个连接(I/O 事件)。

select & poll

  
    
  
  
  
  1. #include <sys/select.h>


  2. /* According to earlier standards */

  3. #include <sys/time.h>

  4. #include <sys/types.h>

  5. #include <unistd.h>


  6. int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);


  7. // 和 select 紧密结合的四个宏:

  8. void FD_CLR(int fd, fd_set *set);

  9. int FD_ISSET(int fd, fd_set *set);

  10. void FD_SET(int fd, fd_set *set);

  11. void FD_ZERO(fd_set *set);

select 是 epoll 之前 linux 使用的 I/O 事件驱动技术。

理解 select 的关键在于理解 fdset,为说明方便,取 fdset 长度为 1 字节,fdset 中的每一 bit 可以对应一个文件描述符 fd,则 1 字节长的 fdset 最大可以对应 8 个 fd。select 的调用过程如下:

  1. 执行 FD_ZERO(&set), 则 set 用位表示是 0000,0000

  2. 若 fd=5, 执行 FD_SET(fd, &set); 后 set 变为 0001,0000(第5位置为1)

  3. 再加入 fd=2, fd=1,则 set 变为 0001,0011

  4. 执行 select(6, &set, 0, 0, 0) 阻塞等待

  5. 若 fd=1, fd=2 上都发生可读事件,则 select 返回,此时 set 变为 0000,0011 (注意:没有事件发生的 fd=5 被清空)

基于上面的调用过程,可以得出 select 的特点:

  • 可监控的文件描述符个数取决于 sizeof(fdset) 的值。假设服务器上 sizeof(fdset)=512,每 bit 表示一个文件描述符,则服务器上支持的最大文件描述符是 512*8=4096。fd_set的大小调整可参考 【原创】技术系列之 网络模型(二) 中的模型 2,可以有效突破 select 可监控的文件描述符上限

  • 将 fd 加入 select 监控集的同时,还要再使用一个数据结构 array 保存放到 select 监控集中的 fd,一是用于在 select 返回后,array 作为源数据和 fdset 进行 FDISSET 判断。二是 select 返回后会把以前加入的但并无事件发生的 fd 清空,则每次开始 select 前都要重新从 array 取得 fd 逐一加入(FD_ZERO最先),扫描 array 的同时取得 fd 最大值 maxfd,用于 select 的第一个参数

  • 可见 select 模型必须在 select 前循环 array(加 fd,取 maxfd),select 返回后循环 array(FD_ISSET判断是否有事件发生)

所以,select 有如下的缺点:

  1. 最大并发数限制:使用 32 个整数的 32 位,即 32*32=1024 来标识 fd,虽然可修改,但是有以下第 2, 3 点的瓶颈

  2. 每次调用 select,都需要把 fd 集合从用户态拷贝到内核态,这个开销在 fd 很多时会很大

  3. 性能衰减严重:每次 kernel 都需要线性扫描整个 fd_set,所以随着监控的描述符 fd 数量增长,其 I/O 性能会线性下降

poll 的实现和 select 非常相似,只是描述 fd 集合的方式不同,poll 使用 pollfd 结构而不是 select 的 fd_set 结构,poll 解决了最大文件描述符数量限制的问题,但是同样需要从用户态拷贝所有的 fd 到内核态,也需要线性遍历所有的 fd 集合,所以它和 select 只是实现细节上的区分,并没有本质上的区别。

epoll

epoll 是 linux kernel 2.6 之后引入的新 I/O 事件驱动技术,I/O 多路复用的核心设计是 1 个线程处理所有连接的 等待消息准备好I/O 事件,这一点上 epoll 和 select&poll 是大同小异的。但 select&poll 预估错误了一件事,当数十万并发连接存在时,可能每一毫秒只有数百个活跃的连接,同时其余数十万连接在这一毫秒是非活跃的。select&poll 的使用方法是这样的: 返回的活跃连接==select(全部待监控的连接)

什么时候会调用 select&poll 呢?在你认为需要找出有报文到达的活跃连接时,就应该调用。所以,select&poll 在高并发时是会被频繁调用的。这样,这个频繁调用的方法就很有必要看看它是否有效率,因为,它的轻微效率损失都会被 高频二字所放大。它有效率损失吗?显而易见,全部待监控连接是数以十万计的,返回的只是数百个活跃连接,这本身就是无效率的表现。被放大后就会发现,处理并发上万个连接时,select&poll 就完全力不从心了。这个时候就该 epoll 上场了,epoll 通过一些新的设计和优化,基本上解决了 select&poll 的问题。

epoll 的 API 非常简洁,涉及到的只有 3 个系统调用:

  
    
  
  
  
  1. #include <sys/epoll.h>

  2. int epoll_create(int size); // int epoll_create1(int flags);

  3. int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

  4. int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

其中,epollcreate 创建一个 epoll 实例并返回 epollfd;epollctl 注册 file descriptor 等待的 I/O 事件(比如 EPOLLIN、EPOLLOUT 等) 到 epoll 实例上;epollwait 则是阻塞监听 epoll 实例上所有的 file descriptor 的 I/O 事件,它接收一个用户空间上的一块内存地址 (events 数组),kernel 会在有 I/O 事件发生的时候把文件描述符列表复制到这块内存地址上,然后 epollwait 解除阻塞并返回,最后用户空间上的程序就可以对相应的 fd 进行读写了:

  
    
  
  
  
  1. #include <unistd.h>

  2. ssize_t read(int fd, void *buf, size_t count);

  3. ssize_t write(int fd, const void *buf, size_t count);

epoll 的工作原理如下:

与 select&poll 相比,epoll 分清了高频调用和低频调用。例如,epollctl 相对来说就是不太频繁被调用的,而 epollwait 则是非常频繁被调用的。所以 epoll 利用 epollctl 来插入或者删除一个 fd,实现用户态到内核态的数据拷贝,这确保了每一个 fd 在其生命周期只需要被拷贝一次,而不是每次调用 epollwait 的时候都拷贝一次。 epoll_wait 则被设计成几乎没有入参的调用,相比 select&poll 需要把全部监听的 fd 集合从用户态拷贝至内核态的做法,epoll 的效率就高出了一大截。

在实现上 epoll 采用红黑树来存储所有监听的 fd,而红黑树本身插入和删除性能比较稳定,时间复杂度 O(logN)。通过 epollctl 函数添加进来的 fd 都会被放在红黑树的某个节点内,所以,重复添加是没有用的。当把 fd 添加进来的时候时候会完成关键的一步:该 fd 都会与相应的设备(网卡)驱动程序建立回调关系,也就是在内核中断处理程序为它注册一个回调函数,在 fd 相应的事件触发(中断)之后(设备就绪了),内核就会调用这个回调函数,该回调函数在内核中被称为: ep_poll_callback这个回调函数其实就是把这个 fd 添加到 rdllist 这个双向链表(就绪链表)中。epollwait 实际上就是去检查 rdlist 双向链表中是否有就绪的 fd,当 rdlist 为空(无就绪fd)时挂起当前进程,直到 rdlist 非空时进程才被唤醒并返回。

相比于 select&poll 调用时会将全部监听的 fd 从用户态空间拷贝至内核态空间并线性扫描一遍找出就绪的 fd 再返回到用户态,epoll_wait 则是直接返回已就绪 fd,因此 epoll 的 I/O 性能不会像 select&poll 那样随着监听的 fd 数量增加而出现线性衰减,是一个非常高效的 I/O 事件驱动技术。

由于使用 epoll 的 I/O 多路复用需要用户进程自己负责 I/O 读写,从用户进程的角度看,读写过程是阻塞的,所以 select&poll&epoll 本质上都是同步 I/O 模型,而像 Windows 的 IOCP 这一类的异步 I/O,只需要在调用 WSARecv 或 WSASend 方法读写数据的时候把用户空间的内存 buffer 提交给 kernel,kernel 负责数据在用户空间和内核空间拷贝,完成之后就会通知用户进程,整个过程不需要用户进程参与,所以是真正的异步 I/O。

延伸

另外,我看到有些文章说 epoll 之所以性能高是因为利用了 linux 的 mmap 内存映射让内核和用户进程共享了一片物理内存,用来存放就绪 fd 列表和它们的数据 buffer,所以用户进程在 epoll_wait返回之后用户进程就可以直接从共享内存那里读取/写入数据了,这让我很疑惑,因为首先看 epoll_wait的函数声明:

  
    
  
  
  
  1. int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

第二个参数:就绪事件列表,是需要在用户空间分配内存然后再传给 epoll_wait的,如果内核会用 mmap 设置共享内存,直接传递一个指针进去就行了,根本不需要在用户态分配内存,多此一举。其次,内核和用户进程通过 mmap 共享内存是一件极度危险的事情,内核无法确定这块共享内存什么时候会被回收,而且这样也会赋予用户进程直接操作内核数据的权限和入口,非常容易出现大的系统漏洞,因此一般极少会这么做。所以我很怀疑 epoll 是不是真的在 linux kernel 里用了 mmap,我就去看了下最新版本(5.3.9)的 linux kernel 源码:

  
    
  
  
  
  1. /*

  2. * Implement the event wait interface for the eventpoll file. It is the kernel

  3. * part of the user space epoll_wait(2).

  4. */

  5. static int do_epoll_wait(int epfd, struct epoll_event __user *events,

  6. int maxevents, int timeout)

  7. {

  8. // ...


  9. /* Time to fish for events ... */

  10. error = ep_poll(ep, events, maxevents, timeout);

  11. }


  12. // 如果 epoll_wait 入参时设定 timeout == 0, 那么直接通过 ep_events_available 判断当前是否有用户感兴趣的事件发生,如果有则通过 ep_send_events 进行处理

  13. // 如果设置 timeout > 0,并且当前没有用户关注的事件发生,则进行休眠,并添加到 ep->wq 等待队列的头部;对等待事件描述符设置 WQ_FLAG_EXCLUSIVE 标志

  14. // ep_poll 被事件唤醒后会重新检查是否有关注事件,如果对应的事件已经被抢走,那么 ep_poll 会继续休眠等待

  15. static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,

  16. int maxevents, long timeout)

  17. {

  18. // ...


  19. send_events:

  20. /*

  21. * Try to transfer events to user space. In case we get 0 events and

  22. * there's still timeout left over, we go trying again in search of

  23. * more luck.

  24. */


  25. // 如果一切正常, 有 event 发生, 就开始准备数据 copy 给用户空间了

  26. // 如果有就绪的事件发生,那么就调用 ep_send_events 将就绪的事件 copy 到用户态内存中,

  27. // 然后返回到用户态,否则判断是否超时,如果没有超时就继续等待就绪事件发生,如果超时就返回用户态。

  28. // 从 ep_poll 函数的实现可以看到,如果有就绪事件发生,则调用 ep_send_events 函数做进一步处理

  29. if (!res && eavail &&

  30. !(res = ep_send_events(ep, events, maxevents)) && !timed_out)

  31. goto fetch_events;


  32. // ...

  33. }


  34. // ep_send_events 函数是用来向用户空间拷贝就绪 fd 列表的,它将用户传入的就绪 fd 列表内存简单封装到

  35. // ep_send_events_data 结构中,然后调用 ep_scan_ready_list 将就绪队列中的事件写入用户空间的内存;

  36. // 用户进程就可以访问到这些数据进行处理

  37. static int ep_send_events(struct eventpoll *ep,

  38. struct epoll_event __user *events, int maxevents)

  39. {

  40. struct ep_send_events_data esed;


  41. esed.maxevents = maxevents;

  42. esed.events = events;

  43. // 调用 ep_scan_ready_list 函数检查 epoll 实例 eventpoll 中的 rdllist 就绪链表,

  44. // 并注册一个回调函数 ep_send_events_proc,如果有就绪 fd,则调用 ep_send_events_proc 进行处理

  45. ep_scan_ready_list(ep, ep_send_events_proc, &esed, 0, false);

  46. return esed.res;

  47. }


  48. // 调用 ep_scan_ready_list 的时候会传递指向 ep_send_events_proc 函数的函数指针作为回调函数,

  49. // 一旦有就绪 fd,就会调用 ep_send_events_proc 函数

  50. static __poll_t ep_send_events_proc(struct eventpoll *ep, struct list_head *head,

  51. void *priv)

  52. {

  53. // ...


  54. /*

  55. * If the event mask intersect the caller-requested one,

  56. * deliver the event to userspace. Again, ep_scan_ready_list()

  57. * is holding ep->mtx, so no operations coming from userspace

  58. * can change the item.

  59. */

  60. revents = ep_item_poll(epi, &pt, 1);

  61. // 如果 revents 为 0,说明没有就绪的事件,跳过,否则就将就绪事件拷贝到用户态内存中

  62. if (!revents)

  63. continue;

  64. // 将当前就绪的事件和用户进程传入的数据都通过 __put_user 拷贝回用户空间,

  65. // 也就是调用 epoll_wait 之时用户进程传入的 fd 列表的内存

  66. if (__put_user(revents, &uevent->events) ||

  67. __put_user(epi->event.data, &uevent->data)) {

  68. list_add(&epi->rdllink, head);

  69. ep_pm_stay_awake(epi);

  70. if (!esed->res)

  71. esed->res = -EFAULT;

  72. return 0;

  73. }


  74. // ...

  75. }

do_epoll_wait开始层层跳转,我们可以很清楚地看到最后内核是通过 __put_user函数把就绪 fd 列表和事件返回到用户空间,而 __put_user正是内核用来拷贝数据到用户空间的标准函数。此外,我并没有在 linux kernel 的源码中和 epoll 相关的代码里找到 mmap 系统调用做内存映射的逻辑,所以基本可以得出结论:epoll 在 linux kernel 里并没有使用 mmap 来做用户空间和内核空间的内存共享,所以那些说 epoll 使用了 mmap 的文章都是误解。

Non-blocking I/O

什么叫非阻塞 I/O,顾名思义就是:所有 I/O 操作都是立刻返回而不会阻塞当前用户进程。I/O 多路复用通常情况下需要和非阻塞 I/O 搭配使用,否则可能会产生意想不到的问题。比如,epoll 的 ET(边缘触发) 模式下,如果不使用非阻塞 I/O,有极大的概率会导致阻塞 event-loop 线程,从而降低吞吐量,甚至导致 bug。

Linux 下,我们可以通过 fcntl 系统调用来设置 O_NONBLOCK标志位,从而把 socket 设置成 non-blocking。当对一个 non-blocking socket 执行读操作时,流程是这个样子的:

当用户进程发出 read 操作时,如果 kernel 中的数据还没有准备好,那么它并不会 block 用户进程,而是立刻返回一个 EAGAIN error。从用户进程角度讲 ,它发起一个 read 操作后,并不需要等待,而是马上就得到了一个结果。用户进程判断结果是一个 error 时,它就知道数据还没有准备好,于是它可以再次发送 read 操作。一旦 kernel 中的数据准备好了,并且又再次收到了用户进程的 system call,那么它马上就将数据拷贝到了用户内存,然后返回。

所以,non-blocking I/O 的特点是用户进程需要不断的主动询问 kernel 数据好了没有。

Go netpoll

一个典型的 Go TCP server:

  
    
  
  
  
  1. package main


  2. import (

  3. "fmt"

  4. "net"

  5. )


  6. func main() {

  7. listen, err := net.Listen("tcp", ":8888")

  8. if err != nil {

  9. fmt.Println("listen error: ", err)

  10. return

  11. }


  12. for {

  13. conn, err := listen.Accept()

  14. if err != nil {

  15. fmt.Println("accept error: ", err)

  16. break

  17. }


  18. // start a new goroutine to handle the new connection

  19. go HandleConn(conn)

  20. }

  21. }

  22. func HandleConn(conn net.Conn) {

  23. defer conn.Close()

  24. packet := make([]byte, 1024)

  25. for {

  26. // 如果没有可读数据,也就是读 buffer 为空,则阻塞

  27. _, _ = conn.Read(packet)

  28. // 同理,不可写则阻塞

  29. _, _ = conn.Write(packet)

  30. }

  31. }

上面是一个基于 Go 原生网络模型(基于 netpoll)编写的一个 TCP server,模式是 goroutine-per-connection,在这种模式下,开发者使用的是同步的模式去编写异步的逻辑而且对于开发者来说 I/O 是否阻塞是无感知的,也就是说开发者无需考虑 goroutines 甚至更底层的线程、进程的调度和上下文切换。而 Go netpoll 最底层的事件驱动技术肯定是基于 epoll/kqueue/iocp 这一类的 I/O 事件驱动技术,只不过是把这些调度和上下文切换的工作转移到了 runtime 的 Go scheduler,让它来负责调度 goroutines,从而极大地降低了程序员的心智负担!

Go netpoll 核心

Go netpoll 通过在底层对 epoll/kqueue/iocp 的封装,从而实现了使用同步编程模式达到异步执行的效果。总结来说,所有的网络操作都以网络描述符 netFD 为中心实现。netFD 与底层 PollDesc 结构绑定,当在一个 netFD 上读写遇到 EAGAIN 错误时,就将当前 goroutine 存储到这个 netFD 对应的 PollDesc 中,同时调用 gopark 把当前 goroutine 给 park 住,直到这个 netFD 上再次发生读写事件,才将此 goroutine 给 ready 激活重新运行。显然,在底层通知 goroutine 再次发生读写等事件的方式就是 epoll/kqueue/iocp 等事件驱动机制。

接下来我们通过分析最新的 Go 源码(v1.13.4),解读一下整个 netpoll 的运行流程。

上面的示例代码中相关的在源码里的几个数据结构和方法:

  
    
  
  
  
  1. // TCPListener is a TCP network listener. Clients should typically

  2. // use variables of type Listener instead of assuming TCP.

  3. type TCPListener struct {

  4. fd *netFD

  5. lc ListenConfig

  6. }


  7. // Accept implements the Accept method in the Listener interface; it

  8. // waits for the next call and returns a generic Conn.

  9. func (l *TCPListener) Accept() (Conn, error) {

  10. if !l.ok() {

  11. return nil, syscall.EINVAL

  12. }

  13. c, err := l.accept()

  14. if err != nil {

  15. return nil, &OpError{Op: "accept", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err}

  16. }

  17. return c, nil

  18. }


  19. func (ln *TCPListener) accept() (*TCPConn, error) {

  20. fd, err := ln.fd.accept()

  21. if err != nil {

  22. return nil, err

  23. }

  24. tc := newTCPConn(fd)

  25. if ln.lc.KeepAlive >= 0 {

  26. setKeepAlive(fd, true)

  27. ka := ln.lc.KeepAlive

  28. if ln.lc.KeepAlive == 0 {

  29. ka = defaultTCPKeepAlive

  30. }

  31. setKeepAlivePeriod(fd, ka)

  32. }

  33. return tc, nil

  34. }


  35. // TCPConn is an implementation of the Conn interface for TCP network

  36. // connections.

  37. type TCPConn struct {

  38. conn

  39. }


  40. // Conn

  41. type conn struct {

  42. fd *netFD

  43. }


  44. type conn struct {

  45. fd *netFD

  46. }


  47. func (c *conn) ok() bool { return c != nil && c.fd != nil }


  48. // Implementation of the Conn interface.


  49. // Read implements the Conn Read method.

  50. func (c *conn) Read(b []byte) (int, error) {

  51. if !c.ok() {

  52. return 0, syscall.EINVAL

  53. }

  54. n, err := c.fd.Read(b)

  55. if err != nil && err != io.EOF {

  56. err = &OpError{Op: "read", Net: c.fd.net, Source: c.fd.laddr, Addr: c.fd.raddr, Err: err}

  57. }

  58. return n, err

  59. }


  60. // Write implements the Conn Write method.

  61. func (c *conn) Write(b []byte) (int, error) {

  62. if !c.ok() {

  63. return 0, syscall.EINVAL

  64. }

  65. n, err := c.fd.Write(b)

  66. if err != nil {

  67. err = &OpError{Op: "write", Net: c.fd.net, Source: c.fd.laddr, Addr: c.fd.raddr, Err: err}

  68. }

  69. return n, err

  70. }

netFD

net.Listen("tcp",":8888") 方法返回了一个 *TCPListener,它是一个实现了 net.Listener 接口的 struct,而通过 listen.Accept() 接收的新连接 *TCPConn 则是一个实现了 net.Conn 接口的 struct,它内嵌了 net.conn struct。仔细阅读上面的源码可以发现,不管是 Listener 的 Accept 还是 Conn 的 Read/Write 方法,都是基于一个 netFD 的数据结构的操作, netFD 是一个网络描述符,类似于 Linux 的文件描述符的概念,netFD 中包含一个 poll.FD 数据结构,而 poll.FD 中包含两个重要的数据结构 Sysfd 和 pollDesc,前者是真正的系统文件描述符,后者对是底层事件驱动的封装,所有的读写超时等操作都是通过调用后者的对应方法实现的。

  1. 服务端的 netFD 在 listen时会创建 epoll 的实例,并将 listenerFD 加入 epoll 的事件队列

  2. netFD 在 accept时将返回的 connFD 也加入 epoll 的事件队列

  3. netFD 在读写时出现 syscall.EAGAIN错误,通过 pollDesc 的 waitRead 方法将当前的 goroutine park 住,直到 ready,从 pollDesc 的 waitRead中返回

  
    
  
  
  
  1. // Network file descriptor.

  2. type netFD struct {

  3. pfd poll.FD


  4. // immutable until Close

  5. family int

  6. sotype int

  7. isConnected bool // handshake completed or use of association with peer

  8. net string

  9. laddr Addr

  10. raddr Addr

  11. }


  12. // FD is a file descriptor. The net and os packages use this type as a

  13. // field of a larger type representing a network connection or OS file.

  14. type FD struct {

  15. // Lock sysfd and serialize access to Read and Write methods.

  16. fdmu fdMutex


  17. // System file descriptor. Immutable until Close.

  18. Sysfd int


  19. // I/O poller.

  20. pd pollDesc


  21. // Writev cache.

  22. iovecs *[]syscall.Iovec


  23. // Semaphore signaled when file is closed.

  24. csema uint32


  25. // Non-zero if this file has been set to blocking mode.

  26. isBlocking uint32


  27. // Whether this is a streaming descriptor, as opposed to a

  28. // packet-based descriptor like a UDP socket. Immutable.

  29. IsStream bool


  30. // Whether a zero byte read indicates EOF. This is false for a

  31. // message based socket connection.

  32. ZeroReadIsEOF bool


  33. // Whether this is a file rather than a network socket.

  34. isFile bool

  35. }

pollDesc

前面提到了 pollDesc 是底层事件驱动的封装,netFD 通过它来完成各种 I/O 相关的操作,它的定义如下:

  
    
  
  
  
  1. type pollDesc struct {

  2. runtimeCtx uintptr

  3. }

这里的 struct 只包含了一个指针,而通过 pollDesc 的 init 方法,我们可以找到它具体的定义是在 runtime.pollDesc这里:

  
    
  
  
  
  1. func (pd *pollDesc) init(fd *FD) error {

  2. serverInit.Do(runtime_pollServerInit)

  3. ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))

  4. if errno != 0 {

  5. if ctx != 0 {

  6. runtime_pollUnblock(ctx)

  7. runtime_pollClose(ctx)

  8. }

  9. return syscall.Errno(errno)

  10. }

  11. pd.runtimeCtx = ctx

  12. return nil

  13. }


  14. // Network poller descriptor.

  15. //

  16. // No heap pointers.

  17. //

  18. //go:notinheap

  19. type pollDesc struct {

  20. link *pollDesc // in pollcache, protected by pollcache.lock


  21. // The lock protects pollOpen, pollSetDeadline, pollUnblock and deadlineimpl operations.

  22. // This fully covers seq, rt and wt variables. fd is constant throughout the PollDesc lifetime.

  23. // pollReset, pollWait, pollWaitCanceled and runtime·netpollready (IO readiness notification)

  24. // proceed w/o taking the lock. So closing, everr, rg, rd, wg and wd are manipulated

  25. // in a lock-free way by all operations.

  26. // NOTE(dvyukov): the following code uses uintptr to store *g (rg/wg),

  27. // that will blow up when GC starts moving objects.

  28. lock mutex // protects the following fields

  29. fd uintptr

  30. closing bool

  31. everr bool // marks event scanning error happened

  32. user uint32 // user settable cookie

  33. rseq uintptr // protects from stale read timers

  34. rg uintptr // pdReady, pdWait, G waiting for read or nil

  35. rt timer // read deadline timer (set if rt.f != nil)

  36. rd int64 // read deadline

  37. wseq uintptr // protects from stale write timers

  38. wg uintptr // pdReady, pdWait, G waiting for write or nil

  39. wt timer // write deadline timer

  40. wd int64 // write deadline

  41. }

runtime.pollDesc包含自身类型的一个指针,用来保存下一个 runtime.pollDesc的地址,以此来实现链表,可以减少数据结构的大小,所有的 runtime.pollDesc保存在 runtime.pollCache结构中,定义如下:

  
    
  
  
  
  1. type pollCache struct {

  2. lock mutex

  3. first *pollDesc

  4. // PollDesc objects must be type-stable,

  5. // because we can get ready notification from epoll/kqueue

  6. // after the descriptor is closed/reused.

  7. // Stale notifications are detected using seq variable,

  8. // seq is incremented when deadlines are changed or descriptor is reused.

  9. }

net.Listen

调用 net.Listen之后,底层会通过 Linux 的系统调用 socket 方法创建一个 fd 分配给 listener,并用以来初始化 listener 的 netFD,接着调用 netFD 的 listenStream方法完成对 socket 的 bind&listen 操作以及对 netFD 的初始化(主要是对 netFD 里的 pollDesc 的初始化),相关源码如下:

  
    
  
  
  
  1. // 调用 linux 系统调用 socket 创建 listener fd 并设置为为阻塞 I/O

  2. s, err := socketFunc(family, sotype|syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC, proto)

  3. // On Linux the SOCK_NONBLOCK and SOCK_CLOEXEC flags were

  4. // introduced in 2.6.27 kernel and on FreeBSD both flags were

  5. // introduced in 10 kernel. If we get an EINVAL error on Linux

  6. // or EPROTONOSUPPORT error on FreeBSD, fall back to using

  7. // socket without them.


  8. socketFunc func(int, int, int) (int, error) = syscall.Socket


  9. // 用上面创建的 listener fd 初始化 listener netFD

  10. if fd, err = newFD(s, family, sotype, net); err != nil {

  11. poll.CloseFunc(s)

  12. return nil, err

  13. }


  14. // 对 listener fd 进行 bind&listen 操作,并且调用 init 方法完成初始化

  15. func (fd *netFD) listenStream(laddr sockaddr, backlog int, ctrlFn func(string, string, syscall.RawConn) error) error {

  16. // ...


  17. // 完成绑定操作

  18. if err = syscall.Bind(fd.pfd.Sysfd, lsa); err != nil {

  19. return os.NewSyscallError("bind", err)

  20. }


  21. // 完成监听操作

  22. if err = listenFunc(fd.pfd.Sysfd, backlog); err != nil {

  23. return os.NewSyscallError("listen", err)

  24. }


  25. // 调用 init,内部会调用 poll.FD.Init,最后调用 pollDesc.init

  26. if err = fd.init(); err != nil {

  27. return err

  28. }

  29. lsa, _ = syscall.Getsockname(fd.pfd.Sysfd)

  30. fd.setAddr(fd.addrFunc()(lsa), nil)

  31. return nil

  32. }


  33. // 使用 sync.Once 来确保一个 listener 只持有一个 epoll 实例

  34. var serverInit sync.Once


  35. // netFD.init 会调用 poll.FD.Init 并最终调用到 pollDesc.init,

  36. // 它会创建 epoll 实例并把 listener fd 加入监听队列

  37. func (pd *pollDesc) init(fd *FD) error {

  38. // runtime_pollServerInit 内部调用了 netpollinit 来创建 epoll 实例

  39. serverInit.Do(runtime_pollServerInit)


  40. // runtime_pollOpen 内部调用了 netpollopen 来将 listener fd 注册到 epoll 实例中,

  41. // 另外,它会初始化一个 pollDesc 并返回

  42. ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))

  43. if errno != 0 {

  44. if ctx != 0 {

  45. runtime_pollUnblock(ctx)

  46. runtime_pollClose(ctx)

  47. }

  48. return syscall.Errno(errno)

  49. }

  50. // 把真正初始化完成的 pollDesc 实例赋值给当前的 pollDesc 代表自身的指针,后续使用直接通过该指针操作

  51. pd.runtimeCtx = ctx

  52. return nil

  53. }

我们前面提到的 epoll 的三个基本调用,Go 在源码里实现了对那三个调用的封装:

  
    
  
  
  
  1. #include <sys/epoll.h>

  2. int epoll_create(int size);

  3. int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

  4. int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);


  5. // Go 对上面三个调用的封装

  6. func netpollinit()

  7. func netpollopen(fd uintptr, pd *pollDesc) int32

  8. func netpoll(block bool) gList

netFD 就是通过这三个封装来对 epoll 进行创建实例、注册 fd 和等待事件操作的。

Listener.Accept()

Listener.Accept()接收来自客户端的新连接,具体还是调用 netFD.accept方法来完成这个功能:

  
    
  
  
  
  1. // Accept implements the Accept method in the Listener interface; it

  2. // waits for the next call and returns a generic Conn.

  3. func (l *TCPListener) Accept() (Conn, error) {

  4. if !l.ok() {

  5. return nil, syscall.EINVAL

  6. }

  7. c, err := l.accept()

  8. if err != nil {

  9. return nil, &OpError{Op: "accept", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err}

  10. }

  11. return c, nil

  12. }


  13. func (ln *TCPListener) accept() (*TCPConn, error) {

  14. fd, err := ln.fd.accept()

  15. if err != nil {

  16. return nil, err

  17. }

  18. tc := newTCPConn(fd)

  19. if ln.lc.KeepAlive >= 0 {

  20. setKeepAlive(fd, true)

  21. ka := ln.lc.KeepAlive

  22. if ln.lc.KeepAlive == 0 {

  23. ka = defaultTCPKeepAlive

  24. }

  25. setKeepAlivePeriod(fd, ka)

  26. }

  27. return tc, nil

  28. }

netFD.accept方法里再调用 poll.FD.Accept,最后会使用 linux 的系统调用 accept来完成新连接的接收,并且会把 accept 的 socket 设置成非阻塞 I/O 模式:

  
    
  
  
  
  1. // Accept wraps the accept network call.

  2. func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {

  3. if err := fd.readLock(); err != nil {

  4. return -1, nil, "", err

  5. }

  6. defer fd.readUnlock()


  7. if err := fd.pd.prepareRead(fd.isFile); err != nil {

  8. return -1, nil, "", err

  9. }

  10. for {

  11. // 使用 linux 系统调用 accept 接收新连接,创建对应的 socket

  12. s, rsa, errcall, err := accept(fd.Sysfd)

  13. // 因为 listener fd 在创建的时候已经设置成非阻塞的了,

  14. // 所以 accept 方法会直接返回,不管有没有新连接到来;如果 err == nil 则表示正常建立新连接,直接返回

  15. if err == nil {

  16. return s, rsa, "", err

  17. }

  18. // 如果 err != nil,则判断 err == syscall.EAGAIN,符合条件则进入 pollDesc.waitRead 方法

  19. switch err {

  20. case syscall.EAGAIN:

  21. if fd.pd.pollable() {

  22. // 如果当前没有发生期待的 I/O 事件,那么 waitRead 会通过 park goroutine 让逻辑 block 在这里

  23. if err = fd.pd.waitRead(fd.isFile); err == nil {

  24. continue

  25. }

  26. }

  27. case syscall.ECONNABORTED:

  28. // This means that a socket on the listen

  29. // queue was closed before we Accept()ed it;

  30. // it's a silly error, so try again.

  31. continue

  32. }

  33. return -1, nil, errcall, err

  34. }

  35. }


  36. // 使用 linux 的 accept 系统调用接收新连接并把这个 socket fd 设置成非阻塞 I/O

  37. ns, sa, err := Accept4Func(s, syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC)

  38. // On Linux the accept4 system call was introduced in 2.6.28

  39. // kernel and on FreeBSD it was introduced in 10 kernel. If we

  40. // get an ENOSYS error on both Linux and FreeBSD, or EINVAL

  41. // error on Linux, fall back to using accept.


  42. // Accept4Func is used to hook the accept4 call.

  43. var Accept4Func func(int, int) (int, syscall.Sockaddr, error) = syscall.Accept4

pollDesc.waitRead方法主要负责检测当前这个 pollDesc 的上层 netFD 对应的 fd 是否有『期待的』I/O 事件发生,如果有就直接返回,否则就 park 住当前的 goroutine 并持续等待直至对应的 fd 上发生可读/可写或者其他『期待的』I/O 事件为止,然后它就会返回到外层的 for 循环,让 goroutine 继续执行逻辑。

Conn.Read/Conn.Write

我们先来看看 Conn.Read方法是如何实现的,原理其实和 Listener.Accept 是一样的,具体调用链还是首先调用 conn 的 netFD.Read,然后内部再调用 poll.FD.Read,最后使用 linux 的系统调用 read: syscall.Read完成数据读取:

  
    
  
  
  
  1. // Implementation of the Conn interface.


  2. // Read implements the Conn Read method.

  3. func (c *conn) Read(b []byte) (int, error) {

  4. if !c.ok() {

  5. return 0, syscall.EINVAL

  6. }

  7. n, err := c.fd.Read(b)

  8. if err != nil && err != io.EOF {

  9. err = &OpError{Op: "read", Net: c.fd.net, Source: c.fd.laddr, Addr: c.fd.raddr, Err: err}

  10. }

  11. return n, err

  12. }


  13. func (fd *netFD) Read(p []byte) (n int, err error) {

  14. n, err = fd.pfd.Read(p)

  15. runtime.KeepAlive(fd)

  16. return n, wrapSyscallError("read", err)

  17. }


  18. // Read implements io.Reader.

  19. func (fd *FD) Read(p []byte) (int, error) {

  20. if err := fd.readLock(); err != nil {

  21. return 0, err

  22. }

  23. defer fd.readUnlock()

  24. if len(p) == 0 {

  25. // If the caller wanted a zero byte read, return immediately

  26. // without trying (but after acquiring the readLock).

  27. // Otherwise syscall.Read returns 0, nil which looks like

  28. // io.EOF.

  29. // TODO(bradfitz): make it wait for readability? (Issue 15735)

  30. return 0, nil

  31. }

  32. if err := fd.pd.prepareRead(fd.isFile); err != nil {

  33. return 0, err

  34. }

  35. if fd.IsStream && len(p) > maxRW {

  36. p = p[:maxRW]

  37. }

  38. for {

  39. // 尝试从该 socket 读取数据,因为 socket 在被 listener accept 的时候设置成了非阻塞 I/O,

  40. // 所以这里同样也是直接返回,不管有没有可读的数据

  41. n, err := syscall.Read(fd.Sysfd, p)

  42. if err != nil {

  43. n = 0

  44. // err == syscall.EAGAIN 表示当前没有期待的 I/O 事件发生,也就是 socket 不可读

  45. if err == syscall.EAGAIN && fd.pd.pollable() {

  46. // 如果当前没有发生期待的 I/O 事件,那么 waitRead 会通过 park goroutine 让逻辑 block 在这里

  47. if err = fd.pd.waitRead(fd.isFile); err == nil {

  48. continue

  49. }

  50. }


  51. // On MacOS we can see EINTR here if the user

  52. // pressed ^Z. See issue #22838.

  53. if runtime.GOOS == "darwin" && err == syscall.EINTR {

  54. continue

  55. }

  56. }

  57. err = fd.eofError(n, err)

  58. return n, err

  59. }

  60. }

conn.Writeconn.Read的原理是一致的,它也是通过类似 pollDesc.waitReadpollDesc.waitWrite来 park 住 goroutine 直至期待的 I/O 事件发生才返回,而 pollDesc.waitWrite的内部实现原理和 pollDesc.waitRead是一样的,都是基于 runtime_pollWait,这里就不再赘述。

pollDesc.waitRead

pollDesc.waitRead内部调用了 runtime_pollWait来达成无 I/O 事件时 park 住 goroutine 的目的:

  
    
  
  
  
  1. //go:linkname poll_runtime_pollWait internal/poll.runtime_pollWait

  2. func poll_runtime_pollWait(pd *pollDesc, mode int) int {

  3. err := netpollcheckerr(pd, int32(mode))

  4. if err != 0 {

  5. return err

  6. }

  7. // As for now only Solaris, illumos, and AIX use level-triggered IO.

  8. if GOOS == "solaris" || GOOS == "illumos" || GOOS == "aix" {

  9. netpollarm(pd, mode)

  10. }

  11. // 进入 netpollblock 并且判断是否有期待的 I/O 事件发生,这里的 for 循环是为了一直等到 io ready

  12. for !netpollblock(pd, int32(mode), false) {

  13. err = netpollcheckerr(pd, int32(mode))

  14. if err != 0 {

  15. return err

  16. }

  17. // Can happen if timeout has fired and unblocked us,

  18. // but before we had a chance to run, timeout has been reset.

  19. // Pretend it has not happened and retry.

  20. }

  21. return 0

  22. }


  23. // returns true if IO is ready, or false if timedout or closed

  24. // waitio - wait only for completed IO, ignore errors

  25. func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {

  26. // gpp 代表了 io ready 事件,这里会根据 mode 的值决定是赋值为读等待还是写等待事件

  27. gpp := &pd.rg

  28. if mode == 'w' {

  29. gpp = &pd.wg

  30. }


  31. // set the gpp semaphore to WAIT

  32. // 这个 for 循环是为了等待 io ready 或者 io wait

  33. for {

  34. old := *gpp

  35. // gpp == pdReady 表示此时已有期待的 I/O 事件发生,

  36. // 可以直接返回 unblock 当前 goroutine 并执行响应的 I/O 操作

  37. if old == pdReady {

  38. *gpp = 0

  39. return true

  40. }

  41. if old != 0 {

  42. throw("runtime: double wait")

  43. }

  44. // 如果没有期待的 I/O 事件发生,则通过原子操作把 gpp 的值置为 pdWait 并退出 for 循环

  45. if atomic.Casuintptr(gpp, 0, pdWait) {

  46. break

  47. }

  48. }


  49. // need to recheck error states after setting gpp to WAIT

  50. // this is necessary because runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl

  51. // do the opposite: store to closing/rd/wd, membarrier, load of rg/wg


  52. // waitio 此时是 false,netpollcheckerr 方法当前 pollDesc 对应的 fd 是否是正常的,

  53. //通常来说 netpollcheckerr(pd, mode) == 0 是成立的,所以这里会执行 gopark 把当前 goroutine 给 park住,直至对应的 fd 上发生可读/可写或者其他『期待的』I/O 事件为止,然后 unpark 返回

  54. if waitio || netpollcheckerr(pd, mode) == 0 {

  55. gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)

  56. }

  57. // be careful to not lose concurrent READY notification

  58. old := atomic.Xchguintptr(gpp, 0)

  59. if old > pdWait {

  60. throw("runtime: corrupted polldesc")

  61. }

  62. return old == pdReady

  63. }

netpoll

前面已经从源码的角度分析完了 netpoll 是如何通过 park goroutine 从而达到阻塞 Accept/Read/Write 的效果,而通过调用 gopark,goroutine 会被放置在某个等待队列中(如 channel 的 waitq ,此时 G 的状态由 _Grunning_Gwaitting),因此G必须被手动唤醒(通过 goready ),否则会丢失任务,应用层阻塞通常使用这种方式。

所以,最后还有一个非常关键的问题是:当 I/O 事件发生之后,netpoll 是通过什么方式唤醒那些在 I/O wait 的 goroutine 的?答案是通过 epoll_wait,在 Go 源码中的 src/runtime/netpoll_epoll.go文件中有一个 func netpoll(blockbool)gList 方法,它会内部调用 epoll_wait获取就绪的 fd 列表,并将每个 fd 对应的 goroutine 添加到链表返回

  
    
  
  
  
  1. // polls for ready network connections

  2. // returns list of goroutines that become runnable

  3. func netpoll(block bool) gList {

  4. if epfd == -1 {

  5. return gList{}

  6. }

  7. waitms := int32(-1)

  8. // 是否以阻塞模式调用 epoll_wait

  9. if !block {

  10. waitms = 0

  11. }

  12. var events [128]epollevent

  13. retry:

  14. // 获取就绪的 fd 列表

  15. n := epollwait(epfd, &events[0], int32(len(events)), waitms)

  16. if n < 0 {

  17. if n != -_EINTR {

  18. println("runtime: epollwait on fd", epfd, "failed with", -n)

  19. throw("runtime: netpoll failed")

  20. }

  21. goto retry

  22. }

  23. var toRun gList

  24. for i := int32(0); i < n; i++ {

  25. ev := &events[i]

  26. if ev.events == 0 {

  27. continue

  28. }

  29. var mode int32

  30. if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {

  31. mode += 'r'

  32. }

  33. if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {