拾遗笔记

go net包里的异步IO实现原理分析

以 DialTCP 为例,分析 epoll_create 的初始化流程;分析过程中只保留关键代码

epoll_create 的跟踪

流程之 设置IO非阻塞 syscall.SetNonblock(s, true)设置NonBlock 为true,

DialTCP()->dialTCP()->internetSocket()->socket()
在socket()函数内首先会走到这个分支 sysSocket()->syscall.SetNonblock(s, true); 这里设置NonBlock 为true,即IO是非阻塞的

  1. net/tcpsock.go/DialTCP
    func DialTCP(net string, laddr, raddr *TCPAddr) (*TCPConn, error) {
        c, err := dialTCP(context.Background(), net, laddr, raddr)
    }
    
  2. net/tcpsockopt_posix.go
           func dialTCP(ctx context.Context, net string, laddr, raddr *TCPAddr) (*TCPConn, error) {
    	   if testHookDialTCP != nil {
    	       return testHookDialTCP(ctx, net, laddr, raddr)
    	   }
    	   return doDialTCP(ctx, net, laddr, raddr)
           }
    
           func doDialTCP(ctx context.Context, net string, laddr, raddr *TCPAddr) (*TCPConn, error) {
    	   fd, err := internetSocket(ctx, net, laddr, raddr, syscall.SOCK_STREAM, 0, "dial")
    
    	   return newTCPConn(fd), nil
           }
    func internetSocket(ctx context.Context, net string, laddr, raddr sockaddr, sotype, proto int, mode string) (fd *netFD, err error) {
        family, ipv6only := favoriteAddrFamily(net, laddr, raddr, mode)
        return socket(ctx, net, family, sotype, proto, ipv6only, laddr, raddr)
    }
    
  3. net/sock_posix.go
    // socket returns a network file descriptor that is ready for
    // asynchronous I/O using the network poller.
    func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr) (fd *netFD, err error) {
        s, err := sysSocket(family, sotype, proto)  // 里面设置NonBlock 为true,即IO是非阻塞的
        if err != nil {
    	return nil, err
        }
        //重要的一行,这里创建了 netFD结构体,
        if fd, err = newFD(s, family, sotype, net); err != nil {
    	closeFunc(s)
    	return nil, err
        }
         if err := fd.dial(ctx, laddr, raddr); err != nil { //这里call dial 函数
    	 fd.Close()
    	 return nil, err
         }
    
    
        return fd, nil
    }
    
  4. net/sock_cloexec.go
    func sysSocket(family, sotype, proto int) (int, error) {
        if err = syscall.SetNonblock(s, true); err != nil { //这里设置NonBlock 为true,即IO是非阻塞的
    	closeFunc(s)
    	return -1, os.NewSyscallError("setnonblock", err)
        }
    }
    

epoll_create的流程 即epoll的初始化

接着上面的socket()函数看
然后socket()内的流程 -> fd.dial()->fd.connect()->fd.init()
会看到 流程会走到fd.init()的调用,这里的fd 是一个netFD结构

  1. net/sock_posix.go
    // socket returns a network file descriptor that is ready for
    // asynchronous I/O using the network poller.
    func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr) (fd *netFD, err error) {
        s, err := sysSocket(family, sotype, proto)  // 里面设置NonBlock 为true,即IO是非阻塞的
        if err != nil {
    	return nil, err
        }
    	if err := fd.dial(ctx, laddr, raddr); err != nil { //这里call dial 函数
    	    fd.Close()
    	    return nil, err
    	}
    
    
        return fd, nil
    }
    
    func (fd *netFD) dial(ctx context.Context, laddr, raddr sockaddr) error {
        //这个函数 无论哪个分支 最终都会走到 fd.init()  下面有注释
        if raddr != nil {
    	if rsa, err = raddr.sockaddr(fd.family); err != nil {
    	    return err
    	}
    	if err := fd.connect(ctx, lsa, rsa); err != nil {
    	    return err
    	}
    	fd.isConnected = true
        } else {
    	if err := fd.init(); err != nil {//最终会走到 fd.init()
    	    return err
    	}
        }
        return nil
    }
    func (fd *netFD) connect(ctx context.Context, la, ra syscall.Sockaddr) (ret error) {
        if err := fd.init(); err != nil { // 最终会call到 fd.init()
    	return err
        }
    
  2. net/fd_unix.go 然后看看netFD结构体的init(),就是上面的fd.init()
    // Network file descriptor.
    type netFD struct {
        // locking/lifetime of sysfd + serialize access to Read and Write methods
        fdmu fdMutex
    
        // immutable until Close
        sysfd       int
        family      int
        sotype      int
        isConnected bool
        net         string
        laddr       Addr
        raddr       Addr
    
        // wait server
        pd pollDesc //这个结构体很重要
    }
    func (fd *netFD) init() error {
        //fd.pd 是个pollDesc类型,具体的pd.init()见下面
        if err := fd.pd.init(fd); err != nil {
    	return err
        }
        return nil
    }
    
  3. net/fd_poll_runtime.go 注意找到文件
    // runtimeNano returns the current value of the runtime clock in nanoseconds.
    func runtimeNano() int64
    
    func runtime_pollServerInit()
    func runtime_pollOpen(fd uintptr) (uintptr, int)
    func runtime_pollClose(ctx uintptr)
    func runtime_pollWait(ctx uintptr, mode int) int
    func runtime_pollWaitCanceled(ctx uintptr, mode int) int
    func runtime_pollReset(ctx uintptr, mode int) int
    func runtime_pollSetDeadline(ctx uintptr, d int64, mode int)
    func runtime_pollUnblock(ctx uintptr)
    
    type pollDesc struct {
        runtimeCtx uintptr
    }
    
    var serverInit sync.Once
    
    func (pd *pollDesc) init(fd *netFD) error {
        // 这里的代码开始进入真正的初始化,通过sync.Once来保证runtime_pollServerInit这个函数只会被调用一次
        //文件开头却只是声明了runtime_pollServerInit一个空函数
        serverInit.Do(runtime_pollServerInit)
         // 上面的 serverInit.Do(...) 是 epoll_create 的一次性初始化
         // 下面这句则是本 fd 的注册(open)过程
    
         //runtime_pollOpen的跟踪,直接看本文 runtime_pollOpen分支
        ctx, errno := runtime_pollOpen(uintptr(fd.sysfd))
        if errno != 0 {
    	return syscall.Errno(errno)
        }
        pd.runtimeCtx = ctx
        return nil
    }
    

runtime_pollServerInit跟踪 这个过程会跟踪到epollcreate的调用

net/fd_poll_runtime.go 文件中声明的 runtime_pollServerInit 是个只有声明、没有函数体的函数,看起来似乎跟踪到此就结束了
然而事实并非如此,用 grep 在整个代码中搜索 runtime_pollServerInit 会有以下结果

./net/fd_poll_runtime.go:18:func runtime_pollServerInit()
./net/fd_poll_runtime.go:34: serverInit.Do(runtime_pollServerInit)
./runtime/netpoll.go:80://go:linkname net_runtime_pollServerInit net.runtime_pollServerInit
./runtime/netpoll.go:81:func net_runtime_pollServerInit() {

我们打开./runtime/netpoll.go找到80行左右有如下代码

  ///go:linkname 是编译器/链接器指示符:它把本函数的符号关联到 net.runtime_pollServerInit,
  //即 net 包中那个没有函数体的声明在链接期会被解析到这里的实现,
  //因此 net.runtime_pollServerInit() 实际执行的就是 runtime.net_runtime_pollServerInit()
//go:linkname net_runtime_pollServerInit net.runtime_pollServerInit
func net_runtime_pollServerInit() {
    netpollinit()
    atomic.Store(&netpollInited, 1)
}

runtime/netpoll_epoll.go

  var (
      epfd int32 = -1 // epoll descriptor
  )

func netpollinit() {
    epfd = epollcreate1(_EPOLL_CLOEXEC)
    if epfd >= 0 {
	return
    }
    epfd = epollcreate(1024) // 看到这里就看到了epollcreate的身影了
    if epfd >= 0 {
	closeonexec(epfd)
	return
    }
    println("netpollinit: failed to create epoll descriptor", -epfd)
    throw("netpollinit: failed to create descriptor")
}

epoll_ctrl的跟踪

我们再次回到上文中提到的一块代码pollDesc.init()的过程

  1. net/fd_poll_runtime.go 注意找到文件
    // runtimeNano returns the current value of the runtime clock in nanoseconds.
    func runtimeNano() int64
    
    func runtime_pollServerInit()
    func runtime_pollOpen(fd uintptr) (uintptr, int)
    func runtime_pollClose(ctx uintptr)
    func runtime_pollWait(ctx uintptr, mode int) int
    func runtime_pollWaitCanceled(ctx uintptr, mode int) int
    func runtime_pollReset(ctx uintptr, mode int) int
    func runtime_pollSetDeadline(ctx uintptr, d int64, mode int)
    func runtime_pollUnblock(ctx uintptr)
    
    type pollDesc struct {
        runtimeCtx uintptr
    }
    
    var serverInit sync.Once
    
    func (pd *pollDesc) init(fd *netFD) error {
        serverInit.Do(runtime_pollServerInit)
    	// 上面的 serverInit.Do(...) 是 epoll_create 的一次性初始化
    	// 下面这句则是本 fd 的注册(open)过程
    	//runtime_pollOpen的跟踪,
        ctx, errno := runtime_pollOpen(uintptr(fd.sysfd)) //重点这一行
        if errno != 0 {
    	return syscall.Errno(errno)
        }
        pd.runtimeCtx = ctx
        return nil
    }
    
  2. runtime/netpoll.go 相同的方式,我们找到了net_runtime_pollOpen这个函数
    //go:linkname net_runtime_pollOpen net.runtime_pollOpen
    func net_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
        pd := pollcache.alloc()
        errno = netpollopen(fd, pd)
        return pd, int(errno)
    }
    
  3. runtime/netpoll_epoll.go
    func netpollopen(fd uintptr, pd *pollDesc) int32 {
        var ev epollevent
        ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
        *(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
        //分析到这里,我们找到epollctl的身影
        //再往下分析就分析到各种操作系统使用汇编实现epollctl的过程了,epollctl到此结束
        return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
    
    }
    

epoll_wait 与golang 的异步IO

runtime/netpoll_epoll.go文件中 我们找到了 epollwait的身影
先不管哪些地方会调用到这,先分析下这个函数
文档中有这样一段描述
http://man7.org/linux/man-pages/man2/epoll_wait.2.html

When successful, epoll_wait() returns the number of file descriptors
ready for the requested I/O, or zero if no file descriptor became
ready during the requested timeout milliseconds. When an error
occurs, epoll_wait() returns -1 and errno is set appropriately.

    // polls for ready network connections
    // returns list of goroutines that become runnable
    func netpoll(block bool) *g {
	if epfd == -1 {
	    return nil
	}
	waitms := int32(-1) //-1 表示无限期的block
	if !block {
	    waitms = 0 //马上返回,即使没有任何事件发生
	}
	var events [128]epollevent
   我的理解的(不知道对不对):
  下面这段代码 基本可以确定无论传入参数block 值为何 都在一值在epollwait这里等待
  直到收到内核相应的处理结果才会返回,差别仅仅是block在内核还是block 在这里
    retry:
	n := epollwait(epfd, &events[0], int32(len(events)), waitms)   //
	if n < 0 {
	    if n != -_EINTR {
		// EINTR  The call was interrupted by a signal handler before either (1)
		// any of the requested events occurred or (2) the timeout
		// expired; see signal(7).
		println("runtime: epollwait on fd", epfd, "failed with", -n)
		throw("epollwait failed")
	    }
	    goto retry
	}
	var gp guintptr
	for i := int32(0); i < n; i++ {
	    ev := &events[i]
	    if ev.events == 0 {
		continue
	    }
	    var mode int32
	    if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
		mode += 'r'
	    }
	    if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
		mode += 'w'
	    }
	    if mode != 0 {
		pd := *(**pollDesc)(unsafe.Pointer(&ev.data))

		netpollready(&gp, pd, mode) //内核数据准备就绪,可以进行IO了
	    }
	}
	if block && gp == 0 {
	    goto retry
	}
	return gp.ptr()
    }

再往下分析就是 goroutines的调度分析了,
在runtime.proc.go这个文件中可以看到netpoll的身影,goroutines的调度 暂时还没时间看, 先不乱贴代码了,留待以后分析

Comments

comments powered by Disqus