An Analysis of the Asynchronous I/O Implementation in Go's net Package
Using DialTCP as an example, we trace the initialization flow down to epoll_create; only the key code is kept throughout the analysis.
Tracing epoll_create
Step one of the flow: setting I/O non-blocking with syscall.SetNonblock(s, true)
DialTCP()->dialTCP()->internetSocket()->socket()
Inside socket(), the first branch taken is sysSocket() -> syscall.SetNonblock(s, true), which puts the fd into non-blocking mode.
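For reference, this is the user-facing API whose internals we are about to trace. A minimal, hedged usage sketch (the remote address is just a placeholder):

package main

import (
	"log"
	"net"
)

func main() {
	// Resolve a remote address; "example.com:80" is only a placeholder.
	raddr, err := net.ResolveTCPAddr("tcp", "example.com:80")
	if err != nil {
		log.Fatal(err)
	}
	// DialTCP is the entry point of the call chain traced below.
	conn, err := net.DialTCP("tcp", nil, raddr)
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()
	// conn.Read/conn.Write look blocking to the caller, but the fd
	// underneath is non-blocking and multiplexed by the runtime poller.
}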
net/tcpsock.go
func DialTCP(net string, laddr, raddr *TCPAddr) (*TCPConn, error) {
	c, err := dialTCP(context.Background(), net, laddr, raddr)
	// error handling trimmed
	return c, nil
}

net/tcpsock_posix.go

func dialTCP(ctx context.Context, net string, laddr, raddr *TCPAddr) (*TCPConn, error) {
	if testHookDialTCP != nil {
		return testHookDialTCP(ctx, net, laddr, raddr)
	}
	return doDialTCP(ctx, net, laddr, raddr)
}

func doDialTCP(ctx context.Context, net string, laddr, raddr *TCPAddr) (*TCPConn, error) {
	fd, err := internetSocket(ctx, net, laddr, raddr, syscall.SOCK_STREAM, 0, "dial")
	// error handling trimmed
	return newTCPConn(fd), nil
}

net/ipsock_posix.go

func internetSocket(ctx context.Context, net string, laddr, raddr sockaddr, sotype, proto int, mode string) (fd *netFD, err error) {
	family, ipv6only := favoriteAddrFamily(net, laddr, raddr, mode)
	return socket(ctx, net, family, sotype, proto, ipv6only, laddr, raddr)
}

net/sock_posix.go

// socket returns a network file descriptor that is ready for
// asynchronous I/O using the network poller.
func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr) (fd *netFD, err error) {
	s, err := sysSocket(family, sotype, proto) // sets the fd to non-blocking, see sysSocket below
	if err != nil {
		return nil, err
	}
	// The important line: this creates the netFD struct.
	if fd, err = newFD(s, family, sotype, net); err != nil {
		closeFunc(s)
		return nil, err
	}
	if err := fd.dial(ctx, laddr, raddr); err != nil { // the dial call, traced in the next section
		fd.Close()
		return nil, err
	}
	return fd, nil
}

net/sock_cloexec.go

func sysSocket(family, sotype, proto int) (int, error) {
	// ... socket creation trimmed ...
	if err = syscall.SetNonblock(s, true); err != nil { // here the fd is set to non-blocking
		closeFunc(s)
		return -1, os.NewSyscallError("setnonblock", err)
	}
	// ...
}
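To see what non-blocking mode actually buys us, here is a small user-space sketch (Unix-only, not from the net package): after syscall.SetNonblock, reading a socket that has no data fails immediately with EAGAIN instead of blocking. It is exactly this EAGAIN that the runtime later turns into "park the goroutine and wait for epoll".

package main

import (
	"fmt"
	"syscall"
)

func main() {
	// A connected pair of stream sockets (AF_UNIX, to keep the sketch
	// self-contained; the TCP fd created by sysSocket behaves the same).
	fds, err := syscall.Socketpair(syscall.AF_UNIX, syscall.SOCK_STREAM, 0)
	if err != nil {
		panic(err)
	}
	defer syscall.Close(fds[0])
	defer syscall.Close(fds[1])

	// The same call sysSocket makes on every new socket.
	if err := syscall.SetNonblock(fds[0], true); err != nil {
		panic(err)
	}

	// Nothing has been written to the other end, so a blocking read would
	// hang here; the non-blocking read returns EAGAIN at once.
	buf := make([]byte, 16)
	_, err = syscall.Read(fds[0], buf)
	fmt.Println(err == syscall.EAGAIN) // prints: true
}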
The epoll_create flow, i.e. how epoll is initialized
Continuing from the socket() function above: the flow inside socket() is fd.dial() -> fd.connect() -> fd.init().
You can see the flow ends up at the fd.init() call, where fd is a netFD struct.
net/sock_posix.go
// socket returns a network file descriptor that is ready for
// asynchronous I/O using the network poller.
func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr) (fd *netFD, err error) {
	s, err := sysSocket(family, sotype, proto) // sets the fd to non-blocking, as shown above
	if err != nil {
		return nil, err
	}
	if err := fd.dial(ctx, laddr, raddr); err != nil { // the dial call
		fd.Close()
		return nil, err
	}
	return fd, nil
}

// Whichever branch dial takes, it ends up calling fd.init(); see the comments below.
func (fd *netFD) dial(ctx context.Context, laddr, raddr sockaddr) error {
	if raddr != nil {
		if rsa, err = raddr.sockaddr(fd.family); err != nil {
			return err
		}
		if err := fd.connect(ctx, lsa, rsa); err != nil { // connect also calls fd.init()
			return err
		}
		fd.isConnected = true
	} else {
		if err := fd.init(); err != nil { // ends up at fd.init()
			return err
		}
	}
	return nil
}

func (fd *netFD) connect(ctx context.Context, la, ra syscall.Sockaddr) (ret error) {
	if err := fd.init(); err != nil { // ends up at fd.init()
		return err
	}
	// ...
}

Now let's look at the netFD struct and its init() method, the fd.init() called above.

net/fd_unix.go
// Network file descriptor.
type netFD struct {
	// locking/lifetime of sysfd + serialize access to Read and Write methods
	fdmu fdMutex

	// immutable until Close
	sysfd       int
	family      int
	sotype      int
	isConnected bool
	net         string
	laddr       Addr
	raddr       Addr

	// wait server
	pd pollDesc // this struct is the important part
}

func (fd *netFD) init() error {
	// fd.pd is a pollDesc; its init() is shown below
	if err := fd.pd.init(fd); err != nil {
		return err
	}
	return nil
}

net/fd_poll_runtime.go (note the file we have moved into)
// runtimeNano returns the current value of the runtime clock in nanoseconds.
func runtimeNano() int64

func runtime_pollServerInit()
func runtime_pollOpen(fd uintptr) (uintptr, int)
func runtime_pollClose(ctx uintptr)
func runtime_pollWait(ctx uintptr, mode int) int
func runtime_pollWaitCanceled(ctx uintptr, mode int) int
func runtime_pollReset(ctx uintptr, mode int) int
func runtime_pollSetDeadline(ctx uintptr, d int64, mode int)
func runtime_pollUnblock(ctx uintptr)

type pollDesc struct {
	runtimeCtx uintptr
}

var serverInit sync.Once

func (pd *pollDesc) init(fd *netFD) error {
	// Here the real initialization begins. sync.Once guarantees that
	// runtime_pollServerInit is called only once, even though the top of
	// this file merely declares runtime_pollServerInit without a body.
	// This line is the epoll_create initialization.
	serverInit.Do(runtime_pollServerInit)
	// The next line registers this particular fd with the poller; for the
	// trace of runtime_pollOpen, see the epoll_ctl section of this article.
	ctx, errno := runtime_pollOpen(uintptr(fd.sysfd))
	if errno != 0 {
		return syscall.Errno(errno)
	}
	pd.runtimeCtx = ctx
	return nil
}
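The sync.Once guard is what makes the epoll instance process-wide and created exactly once, no matter how many fds call pd.init concurrently. A minimal sketch of the pattern (pollInit is a hypothetical stand-in for runtime_pollServerInit):

package main

import (
	"fmt"
	"sync"
)

var serverInit sync.Once

// pollInit stands in for runtime_pollServerInit, which would call
// epoll_create under the hood.
func pollInit() {
	fmt.Println("epoll instance created (this line prints exactly once)")
}

func main() {
	var wg sync.WaitGroup
	// Many fds initializing concurrently, as happens with pd.init(fd).
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			serverInit.Do(pollInit) // only the first caller runs pollInit
		}()
	}
	wg.Wait()
}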
Tracing runtime_pollServerInit, which leads to the epollcreate call
runtime_pollServerInit is declared in net/fd_poll_runtime.go without a body, so at first glance the trail seems to end there.
But that is not actually the case: grepping the whole source tree for runtime_pollServerInit gives the following results:
./net/fd_poll_runtime.go:18:func runtime_pollServerInit()
./net/fd_poll_runtime.go:34: serverInit.Do(runtime_pollServerInit)
./runtime/netpoll.go:80://go:linkname net_runtime_pollServerInit net.runtime_pollServerInit
./runtime/netpoll.go:81:func net_runtime_pollServerInit() {
Opening ./runtime/netpoll.go, we find the following code around line 80:
// The go:linkname directive below ties this function to the body-less
// declaration in package net: the linker resolves net.runtime_pollServerInit
// to this runtime function, so effectively
// net.runtime_pollServerInit() == runtime.net_runtime_pollServerInit()
//go:linkname net_runtime_pollServerInit net.runtime_pollServerInit
func net_runtime_pollServerInit() {
	netpollinit()
	atomic.Store(&netpollInited, 1)
}
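go:linkname is a compiler/linker directive, so no extra compile-time magic is involved: the body-less declaration in package net is simply linked to this runtime function. The same trick can be reproduced in a toy program; a hedged sketch that pulls runtime.nanotime into package main (note: the package also needs an empty .s file so the compiler accepts a declaration without a body):

package main

import (
	_ "unsafe" // blank import required for //go:linkname
)

// Declared without a body, just like runtime_pollServerInit in
// net/fd_poll_runtime.go; the directive binds it to a runtime symbol.
//go:linkname nanotime runtime.nanotime
func nanotime() int64

func main() {
	println("runtime clock (ns):", nanotime())
}

Back to the trace: netpollinit is implemented in the platform-specific file below.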
runtime/netpoll_epoll.go
var (
	epfd int32 = -1 // epoll descriptor
)

func netpollinit() {
	epfd = epollcreate1(_EPOLL_CLOEXEC)
	if epfd >= 0 {
		return
	}
	epfd = epollcreate(1024) // and here is the epollcreate call itself
	if epfd >= 0 {
		closeonexec(epfd)
		return
	}
	println("netpollinit: failed to create epoll descriptor", -epfd)
	throw("netpollinit: failed to create descriptor")
}
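The same create-with-fallback logic can be reproduced in user space with package syscall. A hedged, Linux-only sketch (not runtime code):

package main

import (
	"fmt"
	"syscall"
)

// createEpoll mirrors netpollinit: prefer epoll_create1 with CLOEXEC and
// fall back to epoll_create(1024) plus a separate close-on-exec step.
func createEpoll() (int, error) {
	epfd, err := syscall.EpollCreate1(syscall.EPOLL_CLOEXEC)
	if err == nil {
		return epfd, nil
	}
	// Kernels older than 2.6.27 lack epoll_create1.
	epfd, err = syscall.EpollCreate(1024)
	if err != nil {
		return -1, err
	}
	syscall.CloseOnExec(epfd)
	return epfd, nil
}

func main() {
	epfd, err := createEpoll()
	if err != nil {
		panic(err)
	}
	defer syscall.Close(epfd)
	fmt.Println("epoll fd:", epfd)
}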
Tracing epoll_ctl
Let us return to the pollDesc.init() code shown earlier.
net/fd_poll_runtime.go (the same file as before)
func (pd *pollDesc) init(fd *netFD) error {
	// epoll_create initialization, already traced above
	serverInit.Do(runtime_pollServerInit)
	// This time we follow runtime_pollOpen, which registers this fd with the poller.
	ctx, errno := runtime_pollOpen(uintptr(fd.sysfd)) // the key line
	if errno != 0 {
		return syscall.Errno(errno)
	}
	pd.runtimeCtx = ctx
	return nil
}

runtime/netpoll.go

In the same way as before (grep plus go:linkname), we find the function net_runtime_pollOpen:
//go:linkname net_runtime_pollOpen net.runtime_pollOpen
func net_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
	pd := pollcache.alloc()
	// ... bookkeeping trimmed ...
	errno := netpollopen(fd, pd)
	return pd, int(errno)
}

runtime/netpoll_epoll.go
func netpollopen(fd uintptr, pd *pollDesc) int32 {
	var ev epollevent
	ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
	*(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
	// Here is epollctl. Going further leads into the per-OS assembly
	// implementations of epollctl, so the epoll_ctl trace ends here.
	return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}
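The user-space equivalent of this registration is syscall.EpollCtl. A hedged, Linux-only sketch; epfd comes from a createEpoll-style call as above, and since package syscall declares EPOLLET as a negative constant, a local positive copy is used for the uint32 Events field:

package main

import "syscall"

// EPOLLET is 1<<31; package syscall declares it as a negative int
// constant, which cannot be assigned to the uint32 Events field directly.
const EPOLLET = 1 << 31

// registerFd mirrors netpollopen: add the fd to the epoll set, interested
// in read, write and peer-shutdown events, in edge-triggered mode.
func registerFd(epfd, connFd int) error {
	var ev syscall.EpollEvent
	ev.Events = syscall.EPOLLIN | syscall.EPOLLOUT | syscall.EPOLLRDHUP | EPOLLET
	// The runtime stashes a *pollDesc pointer in ev.data; in user space
	// we can only store the fd itself.
	ev.Fd = int32(connFd)
	return syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, connFd, &ev)
}

func main() {
	epfd, err := syscall.EpollCreate1(syscall.EPOLL_CLOEXEC)
	if err != nil {
		panic(err)
	}
	defer syscall.Close(epfd)

	fds, err := syscall.Socketpair(syscall.AF_UNIX, syscall.SOCK_STREAM, 0)
	if err != nil {
		panic(err)
	}
	defer syscall.Close(fds[0])
	defer syscall.Close(fds[1])

	if err := registerFd(epfd, fds[0]); err != nil {
		panic(err)
	}
	println("registered fd", fds[0], "with epoll fd", epfd)
}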
epoll_wait and Go's asynchronous I/O
In runtime/netpoll_epoll.go we find the epollwait call.
Before worrying about which code paths reach it, let's analyze the function itself.
The man page describes epoll_wait as follows:
http://man7.org/linux/man-pages/man2/epoll_wait.2.html
When successful, epoll_wait() returns the number of file descriptors
ready for the requested I/O, or zero if no file descriptor became
ready during the requested timeout milliseconds. When an error
occurs, epoll_wait() returns -1 and errno is set appropriately.
// polls for ready network connections
// returns list of goroutines that become runnable
func netpoll(block bool) *g {
	if epfd == -1 {
		return nil
	}
	waitms := int32(-1) // -1 means block indefinitely
	if !block {
		waitms = 0 // return immediately, even if no events have occurred
	}
	var events [128]epollevent
	// Note on the block parameter: when block is true, the loop below keeps
	// retrying epollwait until at least one goroutine becomes runnable; when
	// block is false (waitms == 0), epollwait returns immediately and so does
	// netpoll, even if nothing is ready yet (see the "if block && gp == 0"
	// check at the bottom).
retry:
	n := epollwait(epfd, &events[0], int32(len(events)), waitms)
	if n < 0 {
		// EINTR: the call was interrupted by a signal handler before either
		// (1) any of the requested events occurred or (2) the timeout
		// expired; see signal(7). That is the only error we retry on.
		if n != -_EINTR {
			println("runtime: epollwait on fd", epfd, "failed with", -n)
			throw("epollwait failed")
		}
		goto retry
	}
	var gp guintptr
	for i := int32(0); i < n; i++ {
		ev := &events[i]
		if ev.events == 0 {
			continue
		}
		var mode int32
		if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
			mode += 'r'
		}
		if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
			mode += 'w'
		}
		if mode != 0 {
			pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
			netpollready(&gp, pd, mode) // the kernel says data is ready; I/O can proceed
		}
	}
	if block && gp == 0 {
		goto retry
	}
	return gp.ptr()
}
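The same wait-and-retry pattern can be written in user space: call epoll_wait, retry on EINTR, and map the returned event bits to read/write readiness just as netpoll does. A hedged, Linux-only sketch (a for/continue loop stands in for the goto retry; epfd is assumed to be set up as in the earlier sketches):

package main

import (
	"fmt"
	"syscall"
)

// waitReady mirrors the core of netpoll: wait for events (msec -1 blocks
// indefinitely, 0 returns immediately), retry when a signal interrupts the
// call, then classify each event as read and/or write readiness.
func waitReady(epfd, waitms int) {
	var events [128]syscall.EpollEvent
	for {
		n, err := syscall.EpollWait(epfd, events[:], waitms)
		if err != nil {
			if err == syscall.EINTR {
				continue // interrupted by a signal handler: retry, like netpoll
			}
			panic(err)
		}
		for i := 0; i < n; i++ {
			ev := &events[i]
			var mode string
			if ev.Events&(syscall.EPOLLIN|syscall.EPOLLRDHUP|syscall.EPOLLHUP|syscall.EPOLLERR) != 0 {
				mode += "r" // readable (or peer closed / error)
			}
			if ev.Events&(syscall.EPOLLOUT|syscall.EPOLLHUP|syscall.EPOLLERR) != 0 {
				mode += "w" // writable (or error)
			}
			if mode != "" {
				fmt.Println("fd", ev.Fd, "ready for", mode)
				// here the runtime would mark the parked goroutines runnable
			}
		}
		return
	}
}

func main() {
	epfd, err := syscall.EpollCreate1(syscall.EPOLL_CLOEXEC)
	if err != nil {
		panic(err)
	}
	defer syscall.Close(epfd)
	waitReady(epfd, 0) // non-blocking poll, like netpoll(false)
}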
Going further down leads into goroutine scheduling. netpoll shows up in runtime/proc.go, but I have not had time to study the scheduler yet, so rather than pasting code blindly I will leave it for a later analysis.
sysmon, goroutine scheduling, and the code around epoll_wait
Starting from the entry function in runtime/proc.go, we can see that it reaches the sysmon function:
func main() {
	g := getg()

	// Racectx of m0->g0 is used only as the parent of the main goroutine.
	// It must not be used for anything else.
	g.m.g0.racectx = 0

	// Max stack size is 1 GB on 64-bit, 250 MB on 32-bit.
	// Using decimal instead of binary GB and MB because
	// they look nicer in the stack overflow failure message.
	if sys.PtrSize == 8 {
		maxstacksize = 1000000000
	} else {
		maxstacksize = 250000000
	}

	// Allow newproc to start new Ms.
	mainStarted = true

	systemstack(func() {
		// The key call: newm starts a dedicated OS thread
		// that runs the sysmon function.
		newm(sysmon, nil)
	})
Now the relevant parts of the sysmon function:
func sysmon() {
	// Inside there is an infinite for loop that sleeps for a while on each
	// iteration; in other words, this thread runs the loop body once every
	// so often.
	for {
		if idle == 0 { // start with 20us sleep...
			delay = 20
		} else if idle > 50 { // start doubling the sleep after 1ms...
			delay *= 2
		}
		if delay > 10*1000 { // up to 10ms
			delay = 10 * 1000
		}
		usleep(delay)

		// ... only the netpoll-related code is kept ...

		// poll network if not polled for more than 10ms
		lastpoll := int64(atomic.Load64(&sched.lastpoll))
		now := nanotime()
		if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
			// If the netpoller has been initialized and has not been polled
			// for more than 10ms, call netpoll. As analyzed above, netpoll
			// calls epoll_wait to check which fds are ready for I/O and
			// returns the goroutines that became runnable, so that the
			// scheduler can try to run them.
			atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
			gp := netpoll(false) // non-blocking - returns list of goroutines
			if gp != nil {
				// Need to decrement number of idle locked M's
				// (pretending that one more is running) before injectglist.
				// Otherwise it can lead to the following situation:
				// injectglist grabs all P's but before it starts M's to run the P's,
				// another M returns from syscall, finishes running its G,
				// observes that there is no work to do and no other running M's
				// and reports deadlock.
				incidlelocked(-1)
				injectglist(gp)
				incidlelocked(1)
			}
		}
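The shape of sysmon's loop, a periodic poll whose sleep starts at 20us and doubles up to a 10ms cap once the monitor has been idle long enough, is easy to reproduce as a user-space sketch (the loop is bounded here and the body only counts idle cycles; in the runtime it would retake Ps and call netpoll):

package main

import (
	"fmt"
	"time"
)

func main() {
	idle := 0 // consecutive cycles that found no work
	var delay time.Duration

	for i := 0; i < 60; i++ { // sysmon loops forever; bounded for the sketch
		if idle == 0 {
			delay = 20 * time.Microsecond // start with a 20us sleep
		} else if idle > 50 {
			delay *= 2 // start doubling after enough idle cycles
		}
		if delay > 10*time.Millisecond {
			delay = 10 * time.Millisecond // cap at 10ms
		}
		time.Sleep(delay)

		// In the runtime this is where sysmon checks sched.lastpoll and
		// calls netpoll(false) if the network has not been polled for 10ms.
		idle++ // pretend every cycle found nothing to do
	}
	fmt.Println("final delay:", delay)
}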