vitess 代码阅读之Context
Table of Contents
context之.context中.WithValue(ctx,key,value) 数据的存储与传递
vitess 代码中到处充斥着它的身影,初读时颇为不解,
仔细看了下context 的代码 ,大概明白Context 设计的用意。
作一简短的记录
学golang 之前, 有过一年多的erlang 开发经验,erlang 中也存在类似的
轻量级进程,等同于golang 中的goroutine,erlang 中的进程有各自的进程字典
进程字典只可以在本进程中访问,而golang 没有进程字典,要想存储,一般是放到map
中 ,对map 的访问需要加锁,速度自然会慢上一些。vitess 没有采用map 来存储这类信息
而是把数据作为参数在各个函数间传递,此种方式缺点是每
个函数都需要多一个参数用来,传递数据,翻一下vitess 的代码 就会发现好多函数的第一个参数
都是Context ,Context 中存储了各种类型的数据
至于Context 一个参数是如何来存储各种数据的 ,下面简单说明一下
先说一下 如何使用:
使用Context ,需要使用golang 的package 来配合, 基本上一个package 里定义一种Struct
此package 需要存储的信息都在这个Struct里, 然后Context 会存储Struct内的信息
假设有 package user
type User struct {...} // 定义一种类型key ,注意这里是小写的 ,即在package 外部是无法访问到key // 这样就可以保证对只能在package user 中使用key类型 // 每个package 中都定义一个相应的key , type key int // Context 中会有各种类型的数据,每种数据有一个key // 以userKey 为user.User对应的key,userKey 未导出, 即在package user // 外部无法访问到userKey,对它的访问,使用下面提供的user.NewContext // 和 user.FromContext 来存储和访问 // 不要直接使用这个变量 var userKey key = 0 // 把u 存储到Context 中, (使用上面定义的userKey变量作为key) func NewContext(ctx context.Context, u *User) context.Context { return context.WithValue(ctx, userKey, u) } // 从Context 中 取回userKey 对应的user.User func FromContext(ctx context.Context) (*User, bool) { u, ok := ctx.Value(userKey).(*User) return u, ok }
package main import ( "golang.org/x/net/context" ) func main() { ctx := context.Background() // 最基础的一个Context ,里面什么都没存储 u := user.User{} ctx = ctx.NewContext(ctx, &u) // 把 *user.User 类型的u 存一ctx 中 // 假如 还有另一个package 叫, group g := group.Group{} ctx = ctx.NewContext(ctx, &g) // 把 *group.Group 类型的g 存一ctx 中 // 现在 ctx 中存了user 和group 的数据, // ctx 会做为参数在 各个函数中进行传递 // 例如在某个函数用, 你需要访问里面的user // 通过下面这种方式来获得 u, ok := user.FromContext(ctx) g, ok := group.Fromcontext(ctx) // 注意到存到context 中的数据基本都是指针类型, 即需要修改数据时,只需要修改即可 // 不需要把指针重新存到context 中,似乎 若不存指针而是存struct结构的话, 反复的NewContext // 会导致context 中的数据无限增大 }
context中 如何存储数据的,看下代码, 其实很简单
// Use context Values only for request-scoped data that transits processes and // APIs, not for passing optional parameters to functions. func WithValue(parent Context, key interface{}, val interface{}) Context { return &valueCtx{parent, key, val} } // A valueCtx carries a key-value pair. It implements Value for that key and // delegates all other calls to the embedded Context. type valueCtx struct { Context key, val interface{} } func (c *valueCtx) String() string { return fmt.Sprintf("%v.WithValue(%#v, %#v)", c.Context, c.key, c.val) } func (c *valueCtx) Value(key interface{}) interface{} { if c.key == key { return c.val } return c.Context.Value(key) }
context 之 WithCancel 与 Done()
提供一个如何使用 WithCancel() 与Done()的示例
当某个goroutine A 等待别的goroutine B返回结果给他的时候
C 调cancel() 可以让A 不再继续等待,继续往下走
当然 cancel() 并不影响B 继续执行,只是给了b 提前退出的机会
下面的示例 访问 http://127.0.0.1:8080
后,等2秒钟会看到 cancel()被调用后的效果
package main import ( "fmt" "net/http" "time" "golang.org/x/net/context" ) func doALongJob(r chan string) { time.Sleep(time.Second * 5) fmt.Println("job is done") r <- "经历了漫长的过程,用户等到了想要的数据" fmt.Println("doALongJob is end") } func handleTest(w http.ResponseWriter, req *http.Request) { defer req.Body.Close() ctx := context.Background() var cancel context.CancelFunc ctx, cancel = context.WithCancel(ctx) defer cancel() go func() { // 我 (别的进程 ) 可以让本进程取消等待数据的返回 time.Sleep(2 * time.Second) fmt.Println("等不及了, 不再等待") // 如果此处没有调用cancel() ,浏览器上会一直等到 想过的结果 cancel() }() // 这里缓冲1个, 即 ,就算我放弃从resultChan 读数据,doALongJob处依然可以往里面写入一个数据,从而正常退出 var reusultChan chan string = make(chan string, 1) go doALongJob(reusultChan) select { case <-ctx.Done(): fmt.Println("走到ctx.Done(),分支,说明用户不愿意继续等待了 with err:", ctx.Err()) w.Write([]byte("走到ctx.Done(),分支,说明用户不愿意继续等待了 with err:" + ctx.Err().Error())) case result := <-reusultChan: w.Write([]byte(result)) fmt.Println(result) } } func main() { http.HandleFunc("/", handleTest) http.ListenAndServe(":8080", nil) }
context 之WithCancel 使用注意事项,
假设有如下代码
parentCtx, parentCancel := WithCancel(context.Background()) childCtx, childCancel := WithCancel(parentCtx) var reusultChan chan string = make(chan string, 1) go doALongJob(reusultChan) go func() { time.Sleep(1 * time.Second)//只等一秒,一秒后 doALongJob 还不返回结果 //注意这里是对parentCancel()的调用,按照context设计的目的, // 兼听childCtx.Done()的chan 也是可以收到parentCancel()被调用的事件的 parentCancel() }() select { case <-childCtx.Done(): // 等待 childCtx 收到cancel事件 fmt.Println("canceled") case result := <-reusultChan: }
结论:当parentCtx的cancel() 被调用的时候,<-child.Done()是能收到事件的
并且,如果parentCtx的cancel()已经被调用,以之为父创建的childctx
调用 <-childctx.Done() 会立即返回,使用的时候一定要注意这一点
WithTimeout应该也会类似的问题。
所以 一个已经触发了cancel()事件的ctx,不再适合一值传递下去
当需要WithCancel()或WithTimeout时,应该从context.Background()来创建,
它这样设计的目的,应该是允许一个ctx可以兼听多个cancel事件, 但凡收到一个事件便不再等待
以下代码是把context中 cancel相关的代码抽取出来,并加了一些注释,以便了解调用流程
package main import ( "context" "fmt" "net/http" "sync" "time" ) // DeadlineExceeded is the error returned by context.Context.Err when the context's // deadline passes. // WithCancel returns a copy of parent with a new Done channel. The returned // context's Done channel is closed when the returned cancel function is called // or when the parent context's Done channel is closed, whichever happens first. // // Canceling this context releases resources associated with it, so code should // call cancel as soon as the operations running in this context.Context complete. func WithCancel(parent context.Context) (ctx context.Context, cancel context.CancelFunc) { c := newCancelCtx(parent) propagateCancel(parent, &c) return &c, func() { c.cancel(true, context.Canceled) } } // newCancelCtx returns an initialized cancelCtx. func newCancelCtx(parent context.Context) cancelCtx { return cancelCtx{Context: parent} } // propagateCancel arranges for child to be canceled when parent is. func propagateCancel(parent context.Context, child canceler) { if parent.Done() == nil { fmt.Println("parent is never canceled") return // parent is never canceled } fmt.Println("checkparent") if p, ok := parentCancelCtx(parent); ok { p.mu.Lock() if p.err != nil { // parent has already been canceled fmt.Println("checkparent parent has already been canceled") child.cancel(false, p.err) } else { if p.children == nil { p.children = make(map[canceler]struct{}) } p.children[child] = struct{}{} fmt.Println("checkparent put child into parent.childrens") } p.mu.Unlock() } else { fmt.Println("checkparent do not understand here ") go func() { select { case <-parent.Done(): fmt.Println("candgo") child.cancel(false, parent.Err()) case <-child.Done(): fmt.Println("done") } }() } } // parentCancelCtx follows a chain of parent references until it finds a // *cancelCtx. This function understands how each of the concrete types in this // package represents its parent. func parentCancelCtx(parent context.Context) (*cancelCtx, bool) { for { switch c := parent.(type) { case *cancelCtx: return c, true default: return nil, false } } } // removeChild removes a context from its parent. func removeChild(parent context.Context, child canceler) { p, ok := parentCancelCtx(parent) if !ok { return } p.mu.Lock() if p.children != nil { delete(p.children, child) } p.mu.Unlock() } // A canceler is a context type that can be canceled directly. The // implementations are *cancelCtx and *timerCtx. type canceler interface { cancel(removeFromParent bool, err error) Done() <-chan struct{} } // closedchan is a reusable closed channel. var closedchan = make(chan struct{}) func init() { close(closedchan) } // A cancelCtx can be canceled. When canceled, it also cancels any children // that implement canceler. type cancelCtx struct { context.Context mu sync.Mutex // protects following fields done chan struct{} // created lazily, closed by first cancel call children map[canceler]struct{} // set to nil by the first cancel call err error // set to non-nil by the first cancel call } func (c *cancelCtx) Done() <-chan struct{} { c.mu.Lock() if c.done == nil { c.done = make(chan struct{}) } d := c.done c.mu.Unlock() return d } func (c *cancelCtx) Err() error { c.mu.Lock() err := c.err c.mu.Unlock() return err } func (c *cancelCtx) String() string { return fmt.Sprintf("%v.WithCancel", c.Context) } // cancel closes c.done, cancels each of c's children, and, if // removeFromParent is true, removes c from its parent's children. func (c *cancelCtx) cancel(removeFromParent bool, err error) { fmt.Println("cancel() is called removeFromParent:", removeFromParent) if err == nil { panic("context: internal error: missing cancel error") } c.mu.Lock() if c.err != nil { c.mu.Unlock() fmt.Println("already canceled") return // already canceled } c.err = err if c.done == nil { fmt.Println("nobody care whether the cancelCtx is done,so do not need call close chan to notify") c.done = closedchan } else { fmt.Println("closeddone") close(c.done) } for child := range c.children { // NOTE: acquiring the child's lock while holding parent's lock. child.cancel(false, err) } c.children = nil c.mu.Unlock() if removeFromParent { removeChild(c.Context, c) } } func doALongJob(r chan string) { time.Sleep(time.Second * 5) fmt.Println("job is done") r <- "经历了漫长的过程,用户等到了想要的数据" fmt.Println("doALongJob is end") } func handleTest(w http.ResponseWriter, req *http.Request) { start := time.Now() defer req.Body.Close() ctx, cancel := WithCancel(context.Background()) fmt.Println("trying create child cancelctx") ctx, cancel2 := WithCancel(ctx) go func() { // 我 (别的进程 ) 可以让本进程取消等待数据的返回 time.Sleep(1 * time.Second) fmt.Println("parent.ancelctx() 不再等待") fmt.Println("trying call parent.cancel()") cancel() }() go func() { // 我 (别的进程 ) 可以让本进程取消等待数据的返回 time.Sleep(2 * time.Second) fmt.Println("child.cancel()is called,donot want to wait") // 如果此处没有调用cancel() ,浏览器上会一直等到 想过的结果 cancel2() }() // 这里缓冲1个, 即 ,就算我放弃从resultChan 读数据,doALongJob处依然可以往里面写入一个数据,从而正常退出 var reusultChan chan string = make(chan string, 1) go doALongJob(reusultChan) select { case <-ctx.Done(): fmt.Println("走到ctx.Done(),分支,说明用户不愿意继续等待了 with err:", ctx.Err()) w.Write([]byte("after seconds,got the result:" + time.Since(start).String())) w.Write([]byte("走到ctx.Done(),分支,说明用户不愿意继续等待了 with err:" + ctx.Err().Error())) case result := <-reusultChan: w.Write([]byte("after seconds,got the result:" + time.Since(start).String())) w.Write([]byte(result)) fmt.Println(result) } } func main() { http.HandleFunc("/", handleTest) http.ListenAndServe(":8080", nil) }
withTimeout
package main import ( "fmt" "net/http" "time" "context" "math/rand" ) func doALongJob(ctx context.Context, r chan string) { time.Sleep(time.Second + time.Second*time.Duration(rand.Intn(5))) // 模拟做一个很费时的操作 fmt.Println("job is done") select { case <-ctx.Done(): // job done 后,发现已经超时,不必再将结果仍给r close(r) fmt.Println("doALongJob found that it already timeout") case r <- "经历了漫长的过程,用户等到了想要的数据": // 将结果扔给r,等对方接收 } fmt.Println("doALongJob is end") } func handleTest(w http.ResponseWriter, req *http.Request) { defer req.Body.Close() ctx := context.Background() var cancel context.CancelFunc ctx, cancel = context.WithTimeout(ctx, time.Second*3) defer cancel() // 这里缓冲1个, 即 ,就算我放弃从resultChan 读数据,doALongJob处依然可以往里面写入一个数据,从而正常退出 var reusultChan chan string = make(chan string) go doALongJob(ctx, reusultChan) select { case <-ctx.Done(): // 超时 fmt.Println("走到ctx.Done(),分支,说明用户不愿意继续等待了 with err:", ctx.Err()) w.Write([]byte("走到ctx.Done(),分支,说明用户不愿意继续等待了 with err:" + ctx.Err().Error())) case result := <-reusultChan: w.Write([]byte(result)) fmt.Println(result) } } func main() { http.HandleFunc("/", handleTest) http.ListenAndServe(":8080", nil) }