Go net 包中异步 IO 的实现原理分析
Table of Contents
以 DialTCP 为例，分析 epoll_create 的初始化流程（分析过程中只保留关键代码）
epoll_create 的跟踪
流程之一：设置 IO 非阻塞——通过 syscall.SetNonblock(s, true) 将 NonBlock 设为 true
调用链：DialTCP() -> dialTCP() -> doDialTCP() -> internetSocket() -> socket()
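在 socket() 函数内首先会调用 sysSocket()，其中执行 syscall.SetNonblock(s, true)，将 NonBlock 设为 true，即该 socket 上的 IO 是非阻塞的。（注：在较新的 Linux 内核上，sysSocket 通常直接以 SOCK_NONBLOCK|SOCK_CLOEXEC 标志创建 socket，SetNonblock 只是内核不支持这些标志时的回退路径，具体以所读 Go 版本的源码为准。）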
net/tcpsock.go/DialTCP
// DialTCP is the public entry point of the call chain traced in this article:
// it forwards to dialTCP with a background context.
// (Excerpt: error handling and the return statement are omitted in this listing.)
func DialTCP(net string, laddr, raddr *TCPAddr) (*TCPConn, error) {
	c, err := dialTCP(context.Background(), net, laddr, raddr)
}
net/tcpsock_posix.go（其中 internetSocket 位于 net/ipsock_posix.go）
// dialTCP calls testHookDialTCP when it is non-nil and otherwise
// delegates to doDialTCP.
func dialTCP(ctx context.Context, net string, laddr, raddr *TCPAddr) (*TCPConn, error) {
	if testHookDialTCP != nil {
		return testHookDialTCP(ctx, net, laddr, raddr)
	}
	return doDialTCP(ctx, net, laddr, raddr)
}

// doDialTCP creates a stream socket (SOCK_STREAM) through internetSocket and
// wraps the resulting netFD in a TCPConn.
// (Excerpt: the check of the err returned by internetSocket is omitted here.)
func doDialTCP(ctx context.Context, net string, laddr, raddr *TCPAddr) (*TCPConn, error) {
	fd, err := internetSocket(ctx, net, laddr, raddr, syscall.SOCK_STREAM, 0, "dial")
	return newTCPConn(fd), nil
}

// internetSocket chooses the address family (and whether the socket is
// IPv6-only) from the network and addresses, then hands off to socket().
func internetSocket(ctx context.Context, net string, laddr, raddr sockaddr, sotype, proto int, mode string) (fd *netFD, err error) {
	family, ipv6only := favoriteAddrFamily(net, laddr, raddr, mode)
	return socket(ctx, net, family, sotype, proto, ipv6only, laddr, raddr)
}
net/sock_posix.go
// socket returns a network file descriptor that is ready for
// asynchronous I/O using the network poller.
//
// The three steps traced in this article are:
//  1. sysSocket creates the raw socket and puts it into non-blocking mode;
//  2. newFD wraps the raw descriptor in a netFD;
//  3. fd.dial connects/initializes it, which ends in fd.init().
func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr) (fd *netFD, err error) {
	s, err := sysSocket(family, sotype, proto) // the socket is set to non-blocking I/O in here
	if err != nil {
		return nil, err
	}
	// Key step: build the netFD structure around the raw descriptor s.
	if fd, err = newFD(s, family, sotype, net); err != nil {
		closeFunc(s)
		return nil, err
	}
	if err := fd.dial(ctx, laddr, raddr); err != nil { // dial; on failure the fd is closed
		fd.Close()
		return nil, err
	}
	return fd, nil
}
net/sock_cloexec.go
// sysSocket creates the raw socket and switches it to non-blocking mode.
// (Excerpt: only the SetNonblock step is shown; the code that creates s and
// declares err, and the final successful return, are omitted in this listing.)
// NOTE(review): in the full source this SetNonblock call may be a fallback
// used only when the socket cannot be created with SOCK_NONBLOCK directly;
// confirm against the Go version being read.
func sysSocket(family, sotype, proto int) (int, error) {
	if err = syscall.SetNonblock(s, true); err != nil { // make I/O on s non-blocking
		closeFunc(s)
		return -1, os.NewSyscallError("setnonblock", err)
	}
}
epoll_create 的流程，即 epoll 的初始化
接着看上面的 socket() 函数。
socket() 内部的调用链为：fd.dial() -> fd.connect() -> fd.init()
可以看到，流程最终会调用 fd.init()，这里的 fd 是一个 netFD 结构体。
net/sock_posix.go
// socket returns a network file descriptor that is ready for
// asynchronous I/O using the network poller.
// (Abridged: the newFD step that creates fd is omitted here to focus on dial.)
func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr) (fd *netFD, err error) {
	s, err := sysSocket(family, sotype, proto) // the socket is set to non-blocking I/O in here
	if err != nil {
		return nil, err
	}
	if err := fd.dial(ctx, laddr, raddr); err != nil { // dial; on failure the fd is closed
		fd.Close()
		return nil, err
	}
	return fd, nil
}

// dial connects the socket when a remote address is given and otherwise only
// initializes it. Both branches end up in fd.init(): the raddr != nil branch
// through fd.connect, the else branch directly.
// (Excerpt: the declarations of rsa and lsa are omitted in this listing.)
func (fd *netFD) dial(ctx context.Context, laddr, raddr sockaddr) error {
	if raddr != nil {
		if rsa, err = raddr.sockaddr(fd.family); err != nil {
			return err
		}
		if err := fd.connect(ctx, lsa, rsa); err != nil {
			return err
		}
		fd.isConnected = true
	} else {
		if err := fd.init(); err != nil { // ends up in fd.init()
			return err
		}
	}
	return nil
}

// connect connects the socket to ra. For this article the relevant part is
// that it calls fd.init() first.
// (Listing truncated by the author: the rest of the body is omitted.)
func (fd *netFD) connect(ctx context.Context, la, ra syscall.Sockaddr) (ret error) {
	if err := fd.init(); err != nil { // this is the call that reaches fd.init()
		return err
	}
net/fd_unix.go：接下来看 netFD 结构体的 init() 方法，也就是上面调用的 fd.init()。
// Network file descriptor.
type netFD struct {
	// locking/lifetime of sysfd + serialize access to Read and Write methods
	fdmu fdMutex

	// immutable until Close
	sysfd       int
	family      int
	sotype      int
	isConnected bool
	net         string
	laddr       Addr
	raddr       Addr

	// wait server
	pd pollDesc // key field for this article: this fd's handle into the network poller
}

// init initializes fd.pd (a pollDesc), which connects this descriptor to the
// network poller; see pollDesc.init in net/fd_poll_runtime.go.
func (fd *netFD) init() error {
	if err := fd.pd.init(fd); err != nil {
		return err
	}
	return nil
}
net/fd_poll_runtime.go（注意找准文件）
// runtimeNano returns the current value of the runtime clock in nanoseconds. func runtimeNano() int64 func runtime_pollServerInit() func runtime_pollOpen(fd uintptr) (uintptr, int) func runtime_pollClose(ctx uintptr) func runtime_pollWait(ctx uintptr, mode int) int func runtime_pollWaitCanceled(ctx uintptr, mode int) int func runtime_pollReset(ctx uintptr, mode int) int func runtime_pollSetDeadline(ctx uintptr, d int64, mode int) func runtime_pollUnblock(ctx uintptr) type pollDesc struct { runtimeCtx uintptr } var serverInit sync.Once func (pd *pollDesc) init(fd *netFD) error { // 这里的代码开始进入真正的初始化,通过sync.Once来保证runtime_pollServerInit这个函数只会被调用一次 //文件开头却只是声明了runtime_pollServerInit一个空函数 serverInit.Do(runtime_pollServerInit) 上面是epoll_create的初始化 下面这句则是本fd的open过程 //runtime_pollOpen的跟踪,直接看本文 runtime_pollOpen分支 ctx, errno := runtime_pollOpen(uintptr(fd.sysfd)) if errno != 0 { return syscall.Errno(errno) } pd.runtimeCtx = ctx return nil }
runtime_pollServerInit 的跟踪：这个过程最终会跟踪到 epollcreate 的调用
net/fd_poll_runtime.go 文件中的 runtime_pollServerInit 只有函数声明、没有函数体（并不是空函数），看起来跟踪到此就断了。
然而事实并非如此。用 grep 在整个源码树中搜索 runtime_pollServerInit，会得到以下结果：
./net/fd_poll_runtime.go:18:func runtime_pollServerInit()
./net/fd_poll_runtime.go:34: serverInit.Do(runtime_pollServerInit)
./runtime/netpoll.go:80://go:linkname net_runtime_pollServerInit net.runtime_pollServerInit
./runtime/netpoll.go:81:func net_runtime_pollServerInit() {
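打开 ./runtime/netpoll.go，在第 80 行左右可以找到如下代码。其中的 //go:linkname 指令告诉编译器：runtime 包中的 net_runtime_pollServerInit 就是 net 包中 runtime_pollServerInit 的实现，因此 net 包才可以只声明、不实现这个函数：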
// net_runtime_pollServerInit implements net.runtime_pollServerInit.
//
// The //go:linkname directive below binds this function to the symbol
// net.runtime_pollServerInit. That is why package net can declare that
// function without a body: calling net.runtime_pollServerInit() runs this
// code, so effectively net.runtime_pollServerInit() == runtime.net_runtime_pollServerInit().
// It initializes the platform poller (netpollinit; epoll_create on Linux) and
// then atomically stores 1 into netpollInited.
//go:linkname net_runtime_pollServerInit net.runtime_pollServerInit
func net_runtime_pollServerInit() {
	netpollinit()
	atomic.Store(&netpollInited, 1)
}
runtime/netpoll_epoll.go
var (
	epfd int32 = -1 // epoll descriptor; stays -1 until netpollinit succeeds
)

// netpollinit creates the process-wide epoll instance and stores it in epfd.
//
// It first tries epoll_create1(EPOLL_CLOEXEC), which sets close-on-exec in the
// same call. If that fails, it falls back to epoll_create(1024) followed by an
// explicit closeonexec. If both fail, the runtime aborts with throw.
// NOTE(review): the wrappers appear to return a negated errno on failure,
// which is why -epfd is printed; confirm.
func netpollinit() {
	epfd = epollcreate1(_EPOLL_CLOEXEC)
	if epfd >= 0 {
		return
	}
	epfd = epollcreate(1024) // here is the epoll_create call this trace was looking for
	if epfd >= 0 {
		closeonexec(epfd)
		return
	}
	println("netpollinit: failed to create epoll descriptor", -epfd)
	throw("netpollinit: failed to create descriptor")
}
epoll_ctl 的跟踪
我们再次回到上文提到的 pollDesc.init() 的代码：
net/fd_poll_runtime.go（注意找准文件）
// runtimeNano returns the current value of the runtime clock in nanoseconds. func runtimeNano() int64 func runtime_pollServerInit() func runtime_pollOpen(fd uintptr) (uintptr, int) func runtime_pollClose(ctx uintptr) func runtime_pollWait(ctx uintptr, mode int) int func runtime_pollWaitCanceled(ctx uintptr, mode int) int func runtime_pollReset(ctx uintptr, mode int) int func runtime_pollSetDeadline(ctx uintptr, d int64, mode int) func runtime_pollUnblock(ctx uintptr) type pollDesc struct { runtimeCtx uintptr } var serverInit sync.Once func (pd *pollDesc) init(fd *netFD) error { serverInit.Do(runtime_pollServerInit) 上面是epoll_create的初始化 下面这句则是本fd的open过程 //runtime_pollOpen的跟踪, ctx, errno := runtime_pollOpen(uintptr(fd.sysfd)) //重点这一行 if errno != 0 { return syscall.Errno(errno) } pd.runtimeCtx = ctx return nil }
runtime/netpoll.go：用相同的方式（搜索 go:linkname），我们找到了 net_runtime_pollOpen 这个函数
// net_runtime_pollOpen implements net.runtime_pollOpen; it is bound to that
// symbol by the //go:linkname directive below.
//
// It obtains a pollDesc from pollcache and registers fd with the OS poller
// through netpollopen (epoll_ctl on Linux). Package net declares this
// function as runtime_pollOpen(fd uintptr) (uintptr, int), so the *pollDesc
// returned here reaches net as an opaque uintptr context.
// (Excerpt: the declaration of errno is omitted in this listing.)
//go:linkname net_runtime_pollOpen net.runtime_pollOpen
func net_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
	pd := pollcache.alloc()
	errno = netpollopen(fd, pd)
	return pd, int(errno)
}
runtime/netpoll_epoll.go
// netpollopen registers fd with the process-wide epoll instance epfd.
//
// The fd is watched for readability (EPOLLIN), writability (EPOLLOUT) and
// peer shutdown of the write half (EPOLLRDHUP), in edge-triggered mode
// (EPOLLET). The runtime pollDesc pointer is stored in ev.data, so an event
// reported by epoll_wait can later be mapped back to its pollDesc.
// NOTE(review): epollctl appears to return a negated errno, hence the
// leading minus, so 0 means success; confirm.
func netpollopen(fd uintptr, pd *pollDesc) int32 {
	var ev epollevent
	ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
	*(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
	// This is the epoll_ctl(EPOLL_CTL_ADD) call. Below this point epollctl is
	// implemented per operating system in assembly, so the trace ends here.
	return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}
epoll_wait 与 Go 的异步 IO
在 runtime/netpoll_epoll.go 文件中，我们找到了 epollwait 的调用
先不管哪些地方会调用它，先分析一下 netpoll 这个函数
epoll_wait 的 man 手册中有这样一段描述：
http://man7.org/linux/man-pages/man2/epoll_wait.2.html
When successful, epoll_wait() returns the number of file descriptors
ready for the requested I/O, or zero if no file descriptor became
ready during the requested timeout milliseconds. When an error
occurs, epoll_wait() returns -1 and errno is set appropriately.
// polls for ready network connections // returns list of goroutines that become runnable func netpoll(block bool) *g { if epfd == -1 { return nil } waitms := int32(-1) //-1 表示无限期的block if !block { waitms = 0 //马上返回,即使没有任何事件发生 } var events [128]epollevent 我的理解的(不知道对不对): 下面这段代码 基本可以确定无论传入参数block 值为何 都在一值在epollwait这里等待 直到收到内核相应的处理结果才会返回,差别仅仅是block在内核还是block 在这里 retry: n := epollwait(epfd, &events[0], int32(len(events)), waitms) // if n < 0 { if n != -_EINTR { # EINTR The call was interrupted by a signal handler before either (1) # any of the requested events occurred or (2) the timeout # expired; see signal(7). println("runtime: epollwait on fd", epfd, "failed with", -n) throw("epollwait failed") } goto retry } var gp guintptr for i := int32(0); i < n; i++ { ev := &events[i] if ev.events == 0 { continue } var mode int32 if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 { mode += 'r' } if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 { mode += 'w' } if mode != 0 { pd := *(**pollDesc)(unsafe.Pointer(&ev.data)) netpollready(&gp, pd, mode) //内核数据准备就继,可以进行IO了 } } if block && gp == 0 { goto retry } return gp.ptr() }
再往下分析就是 goroutine 的调度了。
在 runtime/proc.go 这个文件中可以看到 netpoll 的调用。goroutine 的调度暂时还没时间看，先不贴代码了，留待以后分析。
sysmon、goroutine 调度与 epoll_wait 的相关代码分析
从 runtime/proc.go 文件中的入口函数 main 入手，
可以看到它会启动 sysmon 函数：
// main is the runtime entry function in runtime/proc.go.
// (Excerpt: only the beginning is shown; the rest of the body is omitted.)
func main() {
	g := getg()

	// Racectx of m0->g0 is used only as the parent of the main goroutine.
	// It must not be used for anything else.
	g.m.g0.racectx = 0

	// Max stack size is 1 GB on 64-bit, 250 MB on 32-bit.
	// Using decimal instead of binary GB and MB because
	// they look nicer in the stack overflow failure message.
	if sys.PtrSize == 8 {
		maxstacksize = 1000000000
	} else {
		maxstacksize = 250000000
	}

	// Allow newproc to start new Ms.
	mainStarted = true

	systemstack(func() {
		// Key line for this article: newm starts a new M (a separate OS
		// thread) that runs the code in sysmon.
		newm(sysmon, nil)
	})
继续看 sysmon 函数的相关代码：
func sysmon() { //可以看到其内部会有一个无限for循环中定期sleep一定的时长 //也就是说这个线程会每隔一定的时长就会执行一次for里面的代码 for { // if idle == 0 { // start with 20us sleep... delay = 20 } else if idle > 50 { // start doubling the sleep after 1ms... delay *= 2 } if delay > 10*1000 { // up to 10ms delay = 10 * 1000 } usleep(delay) //... 只保留与netpull相关的代码 // poll network if not polled for more than 10ms lastpoll := int64(atomic.Load64(&sched.lastpoll)) now := nanotime() if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now { // 这里检查netpoll 是否初始化,如果初始化,会调用netpoll函数 // netpoll这个函数我们上面分析过,里面会调用epoll_wait 函数来检查io是否就续 // 并将就续的goroutine返回,以便尝试对这些goroutine进行调度 atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now)) gp := netpoll(false) // non-blocking - returns list of goroutines if gp != nil { // Need to decrement number of idle locked M's // (pretending that one more is running) before injectglist. // Otherwise it can lead to the following situation: // injectglist grabs all P's but before it starts M's to run the P's, // another M returns from syscall, finishes running its G, // observes that there is no work to do and no other running M's // and reports deadlock. incidlelocked(-1) injectglist(gp) incidlelocked(1) } }