Miscellaneous Notes

An analysis of how asynchronous I/O is implemented in Go's net package

Using DialTCP as the example, this post traces the initialization flow down to epoll_create; throughout the analysis only the key code is kept.
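
For reference, the user-level call that kicks everything off is an ordinary net.DialTCP. A minimal, self-contained example (the address below is just a placeholder) looks like this:

    package main

    import (
        "fmt"
        "net"
    )

    func main() {
        // placeholder address; nothing needs to be listening for this to compile
        raddr, err := net.ResolveTCPAddr("tcp", "127.0.0.1:8080")
        if err != nil {
            panic(err)
        }
        conn, err := net.DialTCP("tcp", nil, raddr)
        if err != nil {
            fmt.Println("dial failed:", err)
            return
        }
        defer conn.Close()
        fmt.Println("connected:", conn.RemoteAddr())
    }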

Tracing epoll_create

Step 1: making the socket non-blocking with syscall.SetNonblock(s, true)

DialTCP()->dialTCP()->internetSocket()->socket()
Inside socket() the flow first goes down the sysSocket() branch; sysSocket() ends with syscall.SetNonblock(s, true), which puts the fd into non-blocking mode, i.e. all I/O on it is non-blocking.

  1. net/tcpsock.go/DialTCP

    func DialTCP(net string, laddr, raddr *TCPAddr) (*TCPConn, error) {
        c, err := dialTCP(context.Background(), net, laddr, raddr)
        // ... error wrapping omitted
        return c, err
    }
    
  2. net/tcpsock_posix.go (internetSocket itself lives in net/ipsock_posix.go)

    func dialTCP(ctx context.Context, net string, laddr, raddr *TCPAddr) (*TCPConn, error) {
        if testHookDialTCP != nil {
            return testHookDialTCP(ctx, net, laddr, raddr)
        }
        return doDialTCP(ctx, net, laddr, raddr)
    }

    func doDialTCP(ctx context.Context, net string, laddr, raddr *TCPAddr) (*TCPConn, error) {
        fd, err := internetSocket(ctx, net, laddr, raddr, syscall.SOCK_STREAM, 0, "dial")
        // ... error handling omitted
        return newTCPConn(fd), nil
    }

    func internetSocket(ctx context.Context, net string, laddr, raddr sockaddr, sotype, proto int, mode string) (fd *netFD, err error) {
        family, ipv6only := favoriteAddrFamily(net, laddr, raddr, mode)
        return socket(ctx, net, family, sotype, proto, ipv6only, laddr, raddr)
    }
    
    
  3. net/sock_posix.go

    // socket returns a network file descriptor that is ready for
    // asynchronous I/O using the network poller.
    func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr) (fd *netFD, err error) {
        s, err := sysSocket(family, sotype, proto) // sysSocket sets the fd to non-blocking
        if err != nil {
            return nil, err
        }
        // the important line: this creates the netFD struct
        if fd, err = newFD(s, family, sotype, net); err != nil {
            closeFunc(s)
            return nil, err
        }
        if err := fd.dial(ctx, laddr, raddr); err != nil { // call the dial method here
            fd.Close()
            return nil, err
        }

        return fd, nil
    }
    
  4. net/sock_cloexec.go

    func sysSocket(family, sotype, proto int) (int, error) {
        // ... socket creation elided: on newer kernels the fd is created with
        // SOCK_NONBLOCK|SOCK_CLOEXEC directly, otherwise this fallback path runs
        if err = syscall.SetNonblock(s, true); err != nil { // set the fd to non-blocking (see the standalone sketch after this list)
            closeFunc(s)
            return -1, os.NewSyscallError("setnonblock", err)
        }
        // ...
    }
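
The fallback path above boils down to a handful of raw syscalls. As a rough standalone sketch (not the actual sysSocket source, and with error handling abbreviated), the equivalent sequence is:

    package main

    import (
        "fmt"
        "syscall"
    )

    func main() {
        // create a plain TCP socket fd
        s, err := syscall.Socket(syscall.AF_INET, syscall.SOCK_STREAM, 0)
        if err != nil {
            panic(err)
        }
        defer syscall.Close(s)
        syscall.CloseOnExec(s)
        // the key step: put the fd into non-blocking mode, so reads and writes
        // return EAGAIN instead of parking the OS thread
        if err := syscall.SetNonblock(s, true); err != nil {
            panic(err)
        }
        fmt.Println("non-blocking socket fd:", s)
    }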
    

The epoll_create flow, i.e. how epoll is initialized

Continuing from the socket() function above:
inside socket() the flow goes fd.dial() -> fd.connect() -> fd.init().
Either way the flow reaches a call to fd.init(), where fd is a netFD struct.

  1. net/sock_posix.go

    // socket returns a network file descriptor that is ready for
    // asynchronous I/O using the network poller.
    func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr) (fd *netFD, err error) {
        s, err := sysSocket(family, sotype, proto) // sysSocket sets the fd to non-blocking
        if err != nil {
            return nil, err
        }
        // ...
        if err := fd.dial(ctx, laddr, raddr); err != nil { // call the dial method here
            fd.Close()
            return nil, err
        }

        return fd, nil
    }
    
    func (fd *netFD) dial(ctx context.Context, laddr, raddr sockaddr) error {
        // whichever branch this function takes, it ends up calling fd.init(); see the comments below
        if raddr != nil {
            if rsa, err = raddr.sockaddr(fd.family); err != nil {
                return err
            }
            if err := fd.connect(ctx, lsa, rsa); err != nil { // connect also calls fd.init()
                return err
            }
            fd.isConnected = true
        } else {
            if err := fd.init(); err != nil { // ends up at fd.init()
                return err
            }
        }
        return nil
    }

    func (fd *netFD) connect(ctx context.Context, la, ra syscall.Sockaddr) (ret error) {
        if err := fd.init(); err != nil { // here is the call to fd.init()
            return err
        }
        // ...
    }
    
  2. net/fd_unix.go: now look at netFD's init(), i.e. the fd.init() called above

    // Network file descriptor.
    type netFD struct {
        // locking/lifetime of sysfd + serialize access to Read and Write methods
        fdmu fdMutex
    
        // immutable until Close
        sysfd       int
        family      int
        sotype      int
        isConnected bool
        net         string
        laddr       Addr
        raddr       Addr
    
        // wait server
        pd pollDesc // this field is the important one
    }
    func (fd *netFD) init() error {
        // fd.pd is a pollDesc; its init() is shown below
        if err := fd.pd.init(fd); err != nil {
            return err
        }
        return nil
    }
    
    
    
  3. net/fd_poll_runtime.go (be careful to open the right file)

    // runtimeNano returns the current value of the runtime clock in nanoseconds.
    func runtimeNano() int64
    
    func runtime_pollServerInit()
    func runtime_pollOpen(fd uintptr) (uintptr, int)
    func runtime_pollClose(ctx uintptr)
    func runtime_pollWait(ctx uintptr, mode int) int
    func runtime_pollWaitCanceled(ctx uintptr, mode int) int
    func runtime_pollReset(ctx uintptr, mode int) int
    func runtime_pollSetDeadline(ctx uintptr, d int64, mode int)
    func runtime_pollUnblock(ctx uintptr)
    
    type pollDesc struct {
        runtimeCtx uintptr
    }
    
    var serverInit sync.Once
    
    func (pd *pollDesc) init(fd *netFD) error {
        // the real initialization starts here: sync.Once guarantees that
        // runtime_pollServerInit is called exactly once,
        // yet at the top of this file runtime_pollServerInit is only declared, with no body
        serverInit.Do(runtime_pollServerInit)
        // the line above is the epoll_create initialization;
        // the line below is the open/registration step for this particular fd

        // runtime_pollOpen is traced in the epoll_ctl section below
        ctx, errno := runtime_pollOpen(uintptr(fd.sysfd))
        if errno != 0 {
            return syscall.Errno(errno)
        }
        pd.runtimeCtx = ctx
        return nil
    }
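
A quick aside on the serverInit.Do line: this is just sync.Once at work, guaranteeing that the poll server is initialized exactly once no matter how many goroutines create sockets concurrently. A tiny self-contained demo of the same pattern (not runtime code; initPoller here is a made-up stand-in for runtime_pollServerInit):

    package main

    import (
        "fmt"
        "sync"
    )

    var serverInit sync.Once

    // stand-in for runtime_pollServerInit, e.g. the epoll_create call
    func initPoller() {
        fmt.Println("poller initialized")
    }

    func main() {
        var wg sync.WaitGroup
        for i := 0; i < 5; i++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                serverInit.Do(initPoller) // only the first caller actually runs initPoller
            }()
        }
        wg.Wait()
    }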
    

Tracing runtime_pollServerInit: this leads to the epollcreate call

In net/fd_poll_runtime.go, runtime_pollServerInit is only declared, with no body, so the trail seems to end there.
But that is not actually the case: grepping the whole source tree for runtime_pollServerInit gives the following results

./net/fd_poll_runtime.go:18:func runtime_pollServerInit()
./net/fd_poll_runtime.go:34: serverInit.Do(runtime_pollServerInit)
./runtime/netpoll.go:80://go:linkname net_runtime_pollServerInit net.runtime_pollServerInit
./runtime/netpoll.go:81:func net_runtime_pollServerInit() {

Open ./runtime/netpoll.go and around line 80 we find the following code:

    // judging from the go:linkname directive below, this function is tied to
    // net.runtime_pollServerInit: the toolchain links the two symbols together,
    // so for our purposes net.runtime_pollServerInit() == runtime.net_runtime_pollServerInit()
    //go:linkname net_runtime_pollServerInit net.runtime_pollServerInit
    func net_runtime_pollServerInit() {
        netpollinit()
        atomic.Store(&netpollInited, 1)
    }

runtime/netpoll_epoll.go

    var (
        epfd int32 = -1 // epoll descriptor
    )

    func netpollinit() {
        epfd = epollcreate1(_EPOLL_CLOEXEC)
        if epfd >= 0 {
            return
        }
        epfd = epollcreate(1024) // and here is the epollcreate call we were looking for
        if epfd >= 0 {
            closeonexec(epfd)
            return
        }
        println("netpollinit: failed to create epoll descriptor", -epfd)
        throw("netpollinit: failed to create descriptor")
    }
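
Outside the runtime, the same create-with-fallback dance can be sketched with the public syscall package (Linux-only; an illustration of the pattern, not the runtime's code):

    package main

    import (
        "fmt"
        "syscall"
    )

    func main() {
        // prefer epoll_create1, like netpollinit does
        epfd, err := syscall.EpollCreate1(syscall.EPOLL_CLOEXEC)
        if err != nil {
            // fallback for old kernels that lack epoll_create1
            epfd, err = syscall.EpollCreate(1024)
            if err != nil {
                panic(err)
            }
            syscall.CloseOnExec(epfd)
        }
        defer syscall.Close(epfd)
        fmt.Println("epoll fd:", epfd)
    }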

Tracing epoll_ctl

Let's go back to the pollDesc.init() code shown earlier.

  1. net/fd_poll_runtime.go (the same file as before)

    // runtimeNano returns the current value of the runtime clock in nanoseconds.
    func runtimeNano() int64
    
    func runtime_pollServerInit()
    func runtime_pollOpen(fd uintptr) (uintptr, int)
    func runtime_pollClose(ctx uintptr)
    func runtime_pollWait(ctx uintptr, mode int) int
    func runtime_pollWaitCanceled(ctx uintptr, mode int) int
    func runtime_pollReset(ctx uintptr, mode int) int
    func runtime_pollSetDeadline(ctx uintptr, d int64, mode int)
    func runtime_pollUnblock(ctx uintptr)
    
    type pollDesc struct {
        runtimeCtx uintptr
    }
    
    var serverInit sync.Once
    
    func (pd *pollDesc) init(fd *netFD) error {
        serverInit.Do(runtime_pollServerInit)
        // the line above is the epoll_create initialization;
        // the line below registers this particular fd, and it is
        // runtime_pollOpen that we trace next
        ctx, errno := runtime_pollOpen(uintptr(fd.sysfd)) // the key line
        if errno != 0 {
            return syscall.Errno(errno)
        }
        pd.runtimeCtx = ctx
        return nil
    }
    
  2. runtime/netpoll.go: by the same grep-for-go:linkname approach we find the function net_runtime_pollOpen

    //go:linkname net_runtime_pollOpen net.runtime_pollOpen
    func net_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
        pd := pollcache.alloc()
        // ... bookkeeping omitted
        errno := netpollopen(fd, pd)
        return pd, int(errno)
    }
    
  3. runtime/netpoll_epoll.go

    func netpollopen(fd uintptr, pd *pollDesc) int32 {
        var ev epollevent
        ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
        *(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
        // and here we find epollctl;
        // going any deeper leads into the per-OS assembly implementations of
        // epollctl, so the epoll_ctl trace ends here
        return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
    }
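
For completeness, here is roughly what such an EPOLL_CTL_ADD registration looks like when done by hand with the public syscall package (Linux-only; a loose analogue of netpollopen that uses a throwaway listener and plain level-triggered mode instead of the runtime's edge-triggered _EPOLLET):

    package main

    import (
        "fmt"
        "net"
        "syscall"
    )

    func main() {
        // any socket will do; a listener on an ephemeral port is the easiest to get
        ln, err := net.Listen("tcp", "127.0.0.1:0")
        if err != nil {
            panic(err)
        }
        defer ln.Close()
        f, err := ln.(*net.TCPListener).File() // dup of the listening socket's fd
        if err != nil {
            panic(err)
        }
        defer f.Close()
        fd := int(f.Fd())

        epfd, err := syscall.EpollCreate1(syscall.EPOLL_CLOEXEC)
        if err != nil {
            panic(err)
        }
        defer syscall.Close(epfd)

        // register interest in readability and peer close, the by-hand
        // counterpart of the epollctl call inside netpollopen
        ev := syscall.EpollEvent{Events: syscall.EPOLLIN | syscall.EPOLLRDHUP, Fd: int32(fd)}
        if err := syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, fd, &ev); err != nil {
            panic(err)
        }
        fmt.Println("registered fd", fd, "with epoll instance", epfd)
    }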
    

epoll_wait and Go's asynchronous I/O

In runtime/netpoll_epoll.go we find epollwait.
Before worrying about where it gets called from, let's look at the function itself.
The man page has this description:
http://man7.org/linux/man-pages/man2/epoll_wait.2.html

When successful, epoll_wait() returns the number of file descriptors
ready for the requested I/O, or zero if no file descriptor became
ready during the requested timeout milliseconds. When an error
occurs, epoll_wait() returns -1 and errno is set appropriately.

    // polls for ready network connections
    // returns list of goroutines that become runnable
    func netpoll(block bool) *g {
        if epfd == -1 {
            return nil
        }
        waitms := int32(-1) // -1 means block indefinitely
        if !block {
            waitms = 0 // return immediately, even if no event has occurred
        }
        var events [128]epollevent
        // Note on the block parameter: when block is true, waitms is -1, so
        // epollwait parks this thread in the kernel and the retry loop below keeps
        // waiting until at least one goroutine becomes runnable; when block is
        // false, waitms is 0, so epollwait returns at once with whatever happens
        // to be ready (possibly nothing) and the function does not retry.
    retry:
        n := epollwait(epfd, &events[0], int32(len(events)), waitms)
        if n < 0 {
            if n != -_EINTR {
                // EINTR: the call was interrupted by a signal handler before either
                // (1) any of the requested events occurred or (2) the timeout
                // expired; see signal(7).
                println("runtime: epollwait on fd", epfd, "failed with", -n)
                throw("epollwait failed")
            }
            goto retry
        }
        var gp guintptr
        for i := int32(0); i < n; i++ {
            ev := &events[i]
            if ev.events == 0 {
                continue
            }
            var mode int32
            if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
                mode += 'r'
            }
            if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
                mode += 'w'
            }
            if mode != 0 {
                pd := *(**pollDesc)(unsafe.Pointer(&ev.data))

                netpollready(&gp, pd, mode) // the fd is ready; the goroutines waiting on it can do I/O
            }
        }
        if block && gp == 0 {
            goto retry
        }
        return gp.ptr()
    }
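
The two waitms values map directly onto epoll_wait's msec argument: -1 parks the calling thread in the kernel until something is ready, 0 polls and returns at once. A minimal Linux-only illustration with the public syscall package (nothing is registered on this epoll instance, so the non-blocking poll simply reports zero events):

    package main

    import (
        "fmt"
        "syscall"
    )

    func main() {
        epfd, err := syscall.EpollCreate1(syscall.EPOLL_CLOEXEC)
        if err != nil {
            panic(err)
        }
        defer syscall.Close(epfd)

        events := make([]syscall.EpollEvent, 128)

        // msec = 0, like netpoll(false): check and return immediately
        n, err := syscall.EpollWait(epfd, events, 0)
        if err != nil && err != syscall.EINTR {
            panic(err)
        }
        fmt.Println("non-blocking poll returned", n, "events")

        // msec = -1 would be the netpoll(true) case:
        //   syscall.EpollWait(epfd, events, -1)
        // which blocks this thread until some registered fd becomes ready
    }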

Going any further down leads into goroutine scheduling.
netpoll shows up in runtime/proc.go, but I have not had time to study the scheduler yet, so instead of pasting code blindly I will leave that analysis for later.

sysmon, goroutine scheduling, and the epoll_wait-related code

Start from the entry function in runtime/proc.go;
we can see that it eventually reaches the sysmon function.

    func main() {
        g := getg()

        // Racectx of m0->g0 is used only as the parent of the main goroutine.
        // It must not be used for anything else.
        g.m.g0.racectx = 0

        // Max stack size is 1 GB on 64-bit, 250 MB on 32-bit.
        // Using decimal instead of binary GB and MB because
        // they look nicer in the stack overflow failure message.
        if sys.PtrSize == 8 {
            maxstacksize = 1000000000
        } else {
            maxstacksize = 250000000
        }

        // Allow newproc to start new Ms.
        mainStarted = true

        systemstack(func() {
            // the key call: newm starts a dedicated OS thread
            // whose job is to run the sysmon function
            newm(sysmon, nil)
        })
        // ...

Continuing with the relevant code in the sysmon function:

    func sysmon() {
        // inside there is an infinite for loop that sleeps for a while on each
        // iteration; in other words, this dedicated thread runs the loop body
        // periodically
        // ... (declarations of idle and delay omitted)
        for {
            if idle == 0 { // start with 20us sleep...
                delay = 20
            } else if idle > 50 { // start doubling the sleep after 1ms...
                delay *= 2
            }
            if delay > 10*1000 { // up to 10ms
                delay = 10 * 1000
            }
            usleep(delay)
            // ... only the netpoll-related code is kept
            // poll network if not polled for more than 10ms
            lastpoll := int64(atomic.Load64(&sched.lastpoll))
            now := nanotime()
            if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
                // if the netpoller has been initialized and more than 10ms have
                // passed since the last poll, call netpoll.
                // netpoll was analyzed above: it calls epoll_wait to find the fds
                // that are ready and returns the goroutines that have become
                // runnable, so that the scheduler can try to run them.
                atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
                gp := netpoll(false) // non-blocking - returns list of goroutines
                if gp != nil {
                    // Need to decrement number of idle locked M's
                    // (pretending that one more is running) before injectglist.
                    // Otherwise it can lead to the following situation:
                    // injectglist grabs all P's but before it starts M's to run the P's,
                    // another M returns from syscall, finishes running its G,
                    // observes that there is no work to do and no other running M's
                    // and reports deadlock.
                    incidlelocked(-1)
                    injectglist(gp)
                    incidlelocked(1)
                }
            }
            // ...
        }
    }
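
The pacing above (start at a 20µs sleep, back off, cap at 10ms, and poll the network whenever it has not been polled for more than 10ms) can be illustrated with a simplified standalone loop. This is a toy sketch of the pattern only, not the runtime's code; it doubles the delay on every iteration instead of tracking an idle counter:

    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        delay := 20 * time.Microsecond
        lastPoll := time.Now()
        for i := 0; i < 12; i++ {
            time.Sleep(delay)
            // poll the "network" if we have not done so for more than 10ms,
            // the same condition sysmon applies before calling netpoll(false)
            if time.Since(lastPoll) > 10*time.Millisecond {
                fmt.Println("would call netpoll(false) now, delay =", delay)
                lastPoll = time.Now()
            }
            // back off: double the sleep, capped at 10ms
            delay *= 2
            if delay > 10*time.Millisecond {
                delay = 10 * time.Millisecond
            }
        }
    }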
