vitess 代码阅读之Context
Table of Contents
context之.context中.WithValue(ctx,key,value) 数据的存储与传递
vitess 代码中到处充斥着它的身影,初读时颇为不解,
仔细看了下context 的代码 ,大概明白Context 设计的用意。
作一简短的记录
学golang 之前, 有过一年多的erlang 开发经验,erlang 中也存在类似的
轻量级进程,等同于golang 中的goroutine,erlang 中的进程有各自的进程字典
进程字典只可以在本进程中访问,而golang 没有进程字典,要想存储,一般是放到map
中 ,对map 的访问需要加锁,速度自然会慢上一些。vitess 没有采用map 来存储这类信息
而是把数据作为参数在各个函数间传递,此种方式缺点是每
个函数都需要多一个参数用来,传递数据,翻一下vitess 的代码 就会发现好多函数的第一个参数
都是Context ,Context 中存储了各种类型的数据
至于Context 一个参数是如何来存储各种数据的 ,下面简单说明一下
先说一下 如何使用:
使用Context ,需要使用golang 的package 来配合, 基本上一个package 里定义一种Struct
此package 需要存储的信息都在这个Struct里, 然后Context 会存储Struct内的信息
假设有 package user
type User struct {...}
// 定义一种类型key ,注意这里是小写的 ,即在package 外部是无法访问到key
// 这样就可以保证对只能在package user 中使用key类型
// 每个package 中都定义一个相应的key ,
type key int
// Context 中会有各种类型的数据,每种数据有一个key
// 以userKey 为user.User对应的key,userKey 未导出, 即在package user
// 外部无法访问到userKey,对它的访问,使用下面提供的user.NewContext
// 和 user.FromContext 来存储和访问
// 不要直接使用这个变量
var userKey key = 0
// 把u 存储到Context 中, (使用上面定义的userKey变量作为key)
func NewContext(ctx context.Context, u *User) context.Context {
return context.WithValue(ctx, userKey, u)
}
// 从Context 中 取回userKey 对应的user.User
func FromContext(ctx context.Context) (*User, bool) {
u, ok := ctx.Value(userKey).(*User)
return u, ok
}
package main
import (
"golang.org/x/net/context"
)
func main() {
ctx := context.Background() // 最基础的一个Context ,里面什么都没存储
u := user.User{}
ctx = ctx.NewContext(ctx, &u) // 把 *user.User 类型的u 存一ctx 中
// 假如 还有另一个package 叫, group
g := group.Group{}
ctx = ctx.NewContext(ctx, &g) // 把 *group.Group 类型的g 存一ctx 中
// 现在 ctx 中存了user 和group 的数据,
// ctx 会做为参数在 各个函数中进行传递
// 例如在某个函数用, 你需要访问里面的user
// 通过下面这种方式来获得
u, ok := user.FromContext(ctx)
g, ok := group.Fromcontext(ctx)
// 注意到存到context 中的数据基本都是指针类型, 即需要修改数据时,只需要修改即可
// 不需要把指针重新存到context 中,似乎 若不存指针而是存struct结构的话, 反复的NewContext
// 会导致context 中的数据无限增大
}
context中 如何存储数据的,看下代码, 其实很简单
// Use context Values only for request-scoped data that transits processes and
// APIs, not for passing optional parameters to functions.
func WithValue(parent Context, key interface{}, val interface{}) Context {
return &valueCtx{parent, key, val}
}
// A valueCtx carries a key-value pair. It implements Value for that key and
// delegates all other calls to the embedded Context.
type valueCtx struct {
Context
key, val interface{}
}
func (c *valueCtx) String() string {
return fmt.Sprintf("%v.WithValue(%#v, %#v)", c.Context, c.key, c.val)
}
func (c *valueCtx) Value(key interface{}) interface{} {
if c.key == key {
return c.val
}
return c.Context.Value(key)
}
context 之 WithCancel 与 Done()
提供一个如何使用 WithCancel() 与Done()的示例
当某个goroutine A 等待别的goroutine B返回结果给他的时候
C 调cancel() 可以让A 不再继续等待,继续往下走
当然 cancel() 并不影响B 继续执行,只是给了b 提前退出的机会
下面的示例 访问 http://127.0.0.1:8080
后,等2秒钟会看到 cancel()被调用后的效果
package main
import (
"fmt"
"net/http"
"time"
"golang.org/x/net/context"
)
func doALongJob(r chan string) {
time.Sleep(time.Second * 5)
fmt.Println("job is done")
r <- "经历了漫长的过程,用户等到了想要的数据"
fmt.Println("doALongJob is end")
}
func handleTest(w http.ResponseWriter, req *http.Request) {
defer req.Body.Close()
ctx := context.Background()
var cancel context.CancelFunc
ctx, cancel = context.WithCancel(ctx)
defer cancel()
go func() {
// 我 (别的进程 ) 可以让本进程取消等待数据的返回
time.Sleep(2 * time.Second)
fmt.Println("等不及了, 不再等待")
// 如果此处没有调用cancel() ,浏览器上会一直等到 想过的结果
cancel()
}()
// 这里缓冲1个, 即 ,就算我放弃从resultChan 读数据,doALongJob处依然可以往里面写入一个数据,从而正常退出
var reusultChan chan string = make(chan string, 1)
go doALongJob(reusultChan)
select {
case <-ctx.Done():
fmt.Println("走到ctx.Done(),分支,说明用户不愿意继续等待了 with err:", ctx.Err())
w.Write([]byte("走到ctx.Done(),分支,说明用户不愿意继续等待了 with err:" + ctx.Err().Error()))
case result := <-reusultChan:
w.Write([]byte(result))
fmt.Println(result)
}
}
func main() {
http.HandleFunc("/", handleTest)
http.ListenAndServe(":8080", nil)
}
context 之WithCancel 使用注意事项,
假设有如下代码
parentCtx, parentCancel := WithCancel(context.Background())
childCtx, childCancel := WithCancel(parentCtx)
var reusultChan chan string = make(chan string, 1)
go doALongJob(reusultChan)
go func() {
time.Sleep(1 * time.Second)//只等一秒,一秒后 doALongJob 还不返回结果
//注意这里是对parentCancel()的调用,按照context设计的目的,
// 兼听childCtx.Done()的chan 也是可以收到parentCancel()被调用的事件的
parentCancel()
}()
select {
case <-childCtx.Done(): // 等待 childCtx 收到cancel事件
fmt.Println("canceled")
case result := <-reusultChan:
}
结论:当parentCtx的cancel() 被调用的时候,<-child.Done()是能收到事件的
并且,如果parentCtx的cancel()已经被调用,以之为父创建的childctx
调用 <-childctx.Done() 会立即返回,使用的时候一定要注意这一点
WithTimeout应该也会类似的问题。
所以 一个已经触发了cancel()事件的ctx,不再适合一值传递下去
当需要WithCancel()或WithTimeout时,应该从context.Background()来创建,
它这样设计的目的,应该是允许一个ctx可以兼听多个cancel事件, 但凡收到一个事件便不再等待
以下代码是把context中 cancel相关的代码抽取出来,并加了一些注释,以便了解调用流程
package main
import (
"context"
"fmt"
"net/http"
"sync"
"time"
)
// DeadlineExceeded is the error returned by context.Context.Err when the context's
// deadline passes.
// WithCancel returns a copy of parent with a new Done channel. The returned
// context's Done channel is closed when the returned cancel function is called
// or when the parent context's Done channel is closed, whichever happens first.
//
// Canceling this context releases resources associated with it, so code should
// call cancel as soon as the operations running in this context.Context complete.
func WithCancel(parent context.Context) (ctx context.Context, cancel context.CancelFunc) {
c := newCancelCtx(parent)
propagateCancel(parent, &c)
return &c, func() { c.cancel(true, context.Canceled) }
}
// newCancelCtx returns an initialized cancelCtx.
func newCancelCtx(parent context.Context) cancelCtx {
return cancelCtx{Context: parent}
}
// propagateCancel arranges for child to be canceled when parent is.
func propagateCancel(parent context.Context, child canceler) {
if parent.Done() == nil {
fmt.Println("parent is never canceled")
return // parent is never canceled
}
fmt.Println("checkparent")
if p, ok := parentCancelCtx(parent); ok {
p.mu.Lock()
if p.err != nil {
// parent has already been canceled
fmt.Println("checkparent parent has already been canceled")
child.cancel(false, p.err)
} else {
if p.children == nil {
p.children = make(map[canceler]struct{})
}
p.children[child] = struct{}{}
fmt.Println("checkparent put child into parent.childrens")
}
p.mu.Unlock()
} else {
fmt.Println("checkparent do not understand here ")
go func() {
select {
case <-parent.Done():
fmt.Println("candgo")
child.cancel(false, parent.Err())
case <-child.Done():
fmt.Println("done")
}
}()
}
}
// parentCancelCtx follows a chain of parent references until it finds a
// *cancelCtx. This function understands how each of the concrete types in this
// package represents its parent.
func parentCancelCtx(parent context.Context) (*cancelCtx, bool) {
for {
switch c := parent.(type) {
case *cancelCtx:
return c, true
default:
return nil, false
}
}
}
// removeChild removes a context from its parent.
func removeChild(parent context.Context, child canceler) {
p, ok := parentCancelCtx(parent)
if !ok {
return
}
p.mu.Lock()
if p.children != nil {
delete(p.children, child)
}
p.mu.Unlock()
}
// A canceler is a context type that can be canceled directly. The
// implementations are *cancelCtx and *timerCtx.
type canceler interface {
cancel(removeFromParent bool, err error)
Done() <-chan struct{}
}
// closedchan is a reusable closed channel.
var closedchan = make(chan struct{})
func init() {
close(closedchan)
}
// A cancelCtx can be canceled. When canceled, it also cancels any children
// that implement canceler.
type cancelCtx struct {
context.Context
mu sync.Mutex // protects following fields
done chan struct{} // created lazily, closed by first cancel call
children map[canceler]struct{} // set to nil by the first cancel call
err error // set to non-nil by the first cancel call
}
func (c *cancelCtx) Done() <-chan struct{} {
c.mu.Lock()
if c.done == nil {
c.done = make(chan struct{})
}
d := c.done
c.mu.Unlock()
return d
}
func (c *cancelCtx) Err() error {
c.mu.Lock()
err := c.err
c.mu.Unlock()
return err
}
func (c *cancelCtx) String() string {
return fmt.Sprintf("%v.WithCancel", c.Context)
}
// cancel closes c.done, cancels each of c's children, and, if
// removeFromParent is true, removes c from its parent's children.
func (c *cancelCtx) cancel(removeFromParent bool, err error) {
fmt.Println("cancel() is called removeFromParent:", removeFromParent)
if err == nil {
panic("context: internal error: missing cancel error")
}
c.mu.Lock()
if c.err != nil {
c.mu.Unlock()
fmt.Println("already canceled")
return // already canceled
}
c.err = err
if c.done == nil {
fmt.Println("nobody care whether the cancelCtx is done,so do not need call close chan to notify")
c.done = closedchan
} else {
fmt.Println("closeddone")
close(c.done)
}
for child := range c.children {
// NOTE: acquiring the child's lock while holding parent's lock.
child.cancel(false, err)
}
c.children = nil
c.mu.Unlock()
if removeFromParent {
removeChild(c.Context, c)
}
}
func doALongJob(r chan string) {
time.Sleep(time.Second * 5)
fmt.Println("job is done")
r <- "经历了漫长的过程,用户等到了想要的数据"
fmt.Println("doALongJob is end")
}
func handleTest(w http.ResponseWriter, req *http.Request) {
start := time.Now()
defer req.Body.Close()
ctx, cancel := WithCancel(context.Background())
fmt.Println("trying create child cancelctx")
ctx, cancel2 := WithCancel(ctx)
go func() {
// 我 (别的进程 ) 可以让本进程取消等待数据的返回
time.Sleep(1 * time.Second)
fmt.Println("parent.ancelctx() 不再等待")
fmt.Println("trying call parent.cancel()")
cancel()
}()
go func() {
// 我 (别的进程 ) 可以让本进程取消等待数据的返回
time.Sleep(2 * time.Second)
fmt.Println("child.cancel()is called,donot want to wait")
// 如果此处没有调用cancel() ,浏览器上会一直等到 想过的结果
cancel2()
}()
// 这里缓冲1个, 即 ,就算我放弃从resultChan 读数据,doALongJob处依然可以往里面写入一个数据,从而正常退出
var reusultChan chan string = make(chan string, 1)
go doALongJob(reusultChan)
select {
case <-ctx.Done():
fmt.Println("走到ctx.Done(),分支,说明用户不愿意继续等待了 with err:", ctx.Err())
w.Write([]byte("after seconds,got the result:" + time.Since(start).String()))
w.Write([]byte("走到ctx.Done(),分支,说明用户不愿意继续等待了 with err:" + ctx.Err().Error()))
case result := <-reusultChan:
w.Write([]byte("after seconds,got the result:" + time.Since(start).String()))
w.Write([]byte(result))
fmt.Println(result)
}
}
func main() {
http.HandleFunc("/", handleTest)
http.ListenAndServe(":8080", nil)
}
withTimeout
package main
import (
"fmt"
"net/http"
"time"
"context"
"math/rand"
)
func doALongJob(ctx context.Context, r chan string) {
time.Sleep(time.Second + time.Second*time.Duration(rand.Intn(5))) // 模拟做一个很费时的操作
fmt.Println("job is done")
select {
case <-ctx.Done(): // job done 后,发现已经超时,不必再将结果仍给r
close(r)
fmt.Println("doALongJob found that it already timeout")
case r <- "经历了漫长的过程,用户等到了想要的数据":
// 将结果扔给r,等对方接收
}
fmt.Println("doALongJob is end")
}
func handleTest(w http.ResponseWriter, req *http.Request) {
defer req.Body.Close()
ctx := context.Background()
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, time.Second*3)
defer cancel()
// 这里缓冲1个, 即 ,就算我放弃从resultChan 读数据,doALongJob处依然可以往里面写入一个数据,从而正常退出
var reusultChan chan string = make(chan string)
go doALongJob(ctx, reusultChan)
select {
case <-ctx.Done(): // 超时
fmt.Println("走到ctx.Done(),分支,说明用户不愿意继续等待了 with err:", ctx.Err())
w.Write([]byte("走到ctx.Done(),分支,说明用户不愿意继续等待了 with err:" + ctx.Err().Error()))
case result := <-reusultChan:
w.Write([]byte(result))
fmt.Println(result)
}
}
func main() {
http.HandleFunc("/", handleTest)
http.ListenAndServe(":8080", nil)
}