拾遗笔记

vitess 代码阅读之Context

context之.context中.WithValue(ctx,key,value) 数据的存储与传递

vitess 代码中到处充斥着它的身影,初读时颇为不解,
仔细看了下context 的代码 ,大概明白Context 设计的用意。
作一简短的记录
学golang 之前, 有过一年多的erlang 开发经验,erlang 中也存在类似的
轻量级进程,等同于golang 中的goroutine,erlang 中的进程有各自的进程字典
进程字典只可以在本进程中访问,而golang 没有进程字典,要想存储,一般是放到map
中 ,对map 的访问需要加锁,速度自然会慢上一些。vitess 没有采用map 来存储这类信息
而是把数据作为参数在各个函数间传递,此种方式缺点是每
个函数都需要多一个参数用来,传递数据,翻一下vitess 的代码 就会发现好多函数的第一个参数
都是Context ,Context 中存储了各种类型的数据
至于Context 一个参数是如何来存储各种数据的 ,下面简单说明一下
先说一下 如何使用:

使用Context ,需要使用golang 的package 来配合, 基本上一个package 里定义一种Struct
此package 需要存储的信息都在这个Struct里, 然后Context 会存储Struct内的信息

假设有 package user

type User struct {...}

// 定义一种类型key ,注意这里是小写的 ,即在package 外部是无法访问到key
// 这样就可以保证对只能在package user 中使用key类型
// 每个package 中都定义一个相应的key ,
type key int

// Context 中会有各种类型的数据,每种数据有一个key
// 以userKey 为user.User对应的key,userKey 未导出, 即在package user
// 外部无法访问到userKey,对它的访问,使用下面提供的user.NewContext
// 和 user.FromContext 来存储和访问
// 不要直接使用这个变量
var userKey key = 0

// 把u 存储到Context 中, (使用上面定义的userKey变量作为key)
func NewContext(ctx context.Context, u *User) context.Context {
    return context.WithValue(ctx, userKey, u)
}

// 从Context 中 取回userKey 对应的user.User
func FromContext(ctx context.Context) (*User, bool) {
    u, ok := ctx.Value(userKey).(*User)
    return u, ok
}
package main

import (

    "golang.org/x/net/context"
)

func main() {
    ctx := context.Background() // 最基础的一个Context ,里面什么都没存储
    u := user.User{}
    ctx = ctx.NewContext(ctx, &u) // 把 *user.User 类型的u  存一ctx 中
    // 假如 还有另一个package 叫, group

    g := group.Group{}
    ctx = ctx.NewContext(ctx, &g) // 把 *group.Group 类型的g  存一ctx 中
    // 现在 ctx 中存了user 和group 的数据,
    // ctx 会做为参数在 各个函数中进行传递
    // 例如在某个函数用, 你需要访问里面的user
    // 通过下面这种方式来获得
    u, ok := user.FromContext(ctx)
    g, ok := group.Fromcontext(ctx)
    // 注意到存到context 中的数据基本都是指针类型, 即需要修改数据时,只需要修改即可
    // 不需要把指针重新存到context 中,似乎 若不存指针而是存struct结构的话, 反复的NewContext
    // 会导致context 中的数据无限增大
}

context中 如何存储数据的,看下代码, 其实很简单

// Use context Values only for request-scoped data that transits processes and
// APIs, not for passing optional parameters to functions.
func WithValue(parent Context, key interface{}, val interface{}) Context {
    return &valueCtx{parent, key, val}
}

// A valueCtx carries a key-value pair.  It implements Value for that key and
// delegates all other calls to the embedded Context.
type valueCtx struct {
    Context
    key, val interface{}
}

func (c *valueCtx) String() string {
    return fmt.Sprintf("%v.WithValue(%#v, %#v)", c.Context, c.key, c.val)
}

func (c *valueCtx) Value(key interface{}) interface{} {
    if c.key == key {
	return c.val
    }
    return c.Context.Value(key)
}

context 之 WithCancel 与 Done()

提供一个如何使用 WithCancel() 与Done()的示例
当某个goroutine A 等待别的goroutine B返回结果给他的时候
C 调cancel() 可以让A 不再继续等待,继续往下走
当然 cancel() 并不影响B 继续执行,只是给了b 提前退出的机会
下面的示例 访问 http://127.0.0.1:8080
后,等2秒钟会看到 cancel()被调用后的效果

package main

import (
    "fmt"
    "net/http"
    "time"

    "golang.org/x/net/context"
)

func doALongJob(r chan string) {
    time.Sleep(time.Second * 5)
    fmt.Println("job is done")
    r <- "经历了漫长的过程,用户等到了想要的数据"
    fmt.Println("doALongJob is end")
}
func handleTest(w http.ResponseWriter, req *http.Request) {
    defer req.Body.Close()
    ctx := context.Background()
    var cancel context.CancelFunc
    ctx, cancel = context.WithCancel(ctx)
    defer cancel()
    go func() {
	// 我 (别的进程 ) 可以让本进程取消等待数据的返回
	time.Sleep(2 * time.Second)
	fmt.Println("等不及了, 不再等待")
	// 如果此处没有调用cancel() ,浏览器上会一直等到 想过的结果
	cancel()
    }()

    // 这里缓冲1个, 即 ,就算我放弃从resultChan 读数据,doALongJob处依然可以往里面写入一个数据,从而正常退出
    var reusultChan chan string = make(chan string, 1)
    go doALongJob(reusultChan)
    select {
    case <-ctx.Done():
	fmt.Println("走到ctx.Done(),分支,说明用户不愿意继续等待了 with err:", ctx.Err())
	w.Write([]byte("走到ctx.Done(),分支,说明用户不愿意继续等待了 with err:" + ctx.Err().Error()))
    case result := <-reusultChan:
	w.Write([]byte(result))
	fmt.Println(result)
    }

}
func main() {
    http.HandleFunc("/", handleTest)
    http.ListenAndServe(":8080", nil)
}

context 之WithCancel 使用注意事项,

假设有如下代码

parentCtx, parentCancel := WithCancel(context.Background())
childCtx, childCancel := WithCancel(parentCtx)
var reusultChan chan string = make(chan string, 1)
go doALongJob(reusultChan)
go func() {
    time.Sleep(1 * time.Second)//只等一秒,一秒后 doALongJob 还不返回结果
    //注意这里是对parentCancel()的调用,按照context设计的目的,
    // 兼听childCtx.Done()的chan 也是可以收到parentCancel()被调用的事件的
    parentCancel()
}()
select {
case <-childCtx.Done():				// 等待 childCtx 收到cancel事件
     fmt.Println("canceled")
case result := <-reusultChan:
}

结论:当parentCtx的cancel() 被调用的时候,<-child.Done()是能收到事件的
并且,如果parentCtx的cancel()已经被调用,以之为父创建的childctx
调用 <-childctx.Done() 会立即返回,使用的时候一定要注意这一点
WithTimeout应该也会类似的问题。
所以 一个已经触发了cancel()事件的ctx,不再适合一值传递下去
当需要WithCancel()或WithTimeout时,应该从context.Background()来创建,
它这样设计的目的,应该是允许一个ctx可以兼听多个cancel事件, 但凡收到一个事件便不再等待

以下代码是把context中 cancel相关的代码抽取出来,并加了一些注释,以便了解调用流程

package main

import (
	"context"
	"fmt"
	"net/http"
	"sync"
	"time"
)

// DeadlineExceeded is the error returned by context.Context.Err when the context's
// deadline passes.

// WithCancel returns a copy of parent with a new Done channel. The returned
// context's Done channel is closed when the returned cancel function is called
// or when the parent context's Done channel is closed, whichever happens first.
//
// Canceling this context releases resources associated with it, so code should
// call cancel as soon as the operations running in this context.Context complete.
func WithCancel(parent context.Context) (ctx context.Context, cancel context.CancelFunc) {
	c := newCancelCtx(parent)
	propagateCancel(parent, &c)
	return &c, func() { c.cancel(true, context.Canceled) }
}

// newCancelCtx returns an initialized cancelCtx.
func newCancelCtx(parent context.Context) cancelCtx {
	return cancelCtx{Context: parent}
}

// propagateCancel arranges for child to be canceled when parent is.
func propagateCancel(parent context.Context, child canceler) {
	if parent.Done() == nil {
		fmt.Println("parent is never canceled")
		return // parent is never canceled
	}
	fmt.Println("checkparent")
	if p, ok := parentCancelCtx(parent); ok {
		p.mu.Lock()
		if p.err != nil {
			// parent has already been canceled
			fmt.Println("checkparent parent has already been canceled")
			child.cancel(false, p.err)
		} else {
			if p.children == nil {
				p.children = make(map[canceler]struct{})
			}
			p.children[child] = struct{}{}
			fmt.Println("checkparent put child into parent.childrens")
		}
		p.mu.Unlock()
	} else {
		fmt.Println("checkparent  do not understand here ")
		go func() {
			select {
			case <-parent.Done():
				fmt.Println("candgo")
				child.cancel(false, parent.Err())
			case <-child.Done():
				fmt.Println("done")
			}
		}()
	}
}

// parentCancelCtx follows a chain of parent references until it finds a
// *cancelCtx. This function understands how each of the concrete types in this
// package represents its parent.
func parentCancelCtx(parent context.Context) (*cancelCtx, bool) {
	for {
		switch c := parent.(type) {
		case *cancelCtx:
			return c, true
		default:
			return nil, false
		}
	}
}

// removeChild removes a context from its parent.
func removeChild(parent context.Context, child canceler) {
	p, ok := parentCancelCtx(parent)
	if !ok {
		return
	}
	p.mu.Lock()
	if p.children != nil {
		delete(p.children, child)
	}
	p.mu.Unlock()
}

// A canceler is a context type that can be canceled directly. The
// implementations are *cancelCtx and *timerCtx.
type canceler interface {
	cancel(removeFromParent bool, err error)
	Done() <-chan struct{}
}

// closedchan is a reusable closed channel.
var closedchan = make(chan struct{})

func init() {
	close(closedchan)
}

// A cancelCtx can be canceled. When canceled, it also cancels any children
// that implement canceler.
type cancelCtx struct {
	context.Context

	mu       sync.Mutex            // protects following fields
	done     chan struct{}         // created lazily, closed by first cancel call
	children map[canceler]struct{} // set to nil by the first cancel call
	err      error                 // set to non-nil by the first cancel call
}

func (c *cancelCtx) Done() <-chan struct{} {
	c.mu.Lock()
	if c.done == nil {
		c.done = make(chan struct{})
	}
	d := c.done
	c.mu.Unlock()
	return d
}

func (c *cancelCtx) Err() error {
	c.mu.Lock()
	err := c.err
	c.mu.Unlock()
	return err
}

func (c *cancelCtx) String() string {
	return fmt.Sprintf("%v.WithCancel", c.Context)
}

// cancel closes c.done, cancels each of c's children, and, if
// removeFromParent is true, removes c from its parent's children.
func (c *cancelCtx) cancel(removeFromParent bool, err error) {
	fmt.Println("cancel() is called removeFromParent:", removeFromParent)
	if err == nil {
		panic("context: internal error: missing cancel error")
	}
	c.mu.Lock()
	if c.err != nil {
		c.mu.Unlock()
		fmt.Println("already canceled")
		return // already canceled
	}
	c.err = err
	if c.done == nil {
		fmt.Println("nobody care whether the cancelCtx is done,so do not need call close chan to notify")
		c.done = closedchan
	} else {
		fmt.Println("closeddone")
		close(c.done)
	}
	for child := range c.children {
		// NOTE: acquiring the child's lock while holding parent's lock.
		child.cancel(false, err)
	}
	c.children = nil
	c.mu.Unlock()

	if removeFromParent {
		removeChild(c.Context, c)
	}
}

func doALongJob(r chan string) {
	time.Sleep(time.Second * 5)
	fmt.Println("job is done")
	r <- "经历了漫长的过程,用户等到了想要的数据"
	fmt.Println("doALongJob is end")
}
func handleTest(w http.ResponseWriter, req *http.Request) {
	start := time.Now()
	defer req.Body.Close()
	ctx, cancel := WithCancel(context.Background())
	fmt.Println("trying create child cancelctx")
	ctx, cancel2 := WithCancel(ctx)
	go func() {
		// 我 (别的进程 ) 可以让本进程取消等待数据的返回
		time.Sleep(1 * time.Second)
		fmt.Println("parent.ancelctx() 不再等待")
		fmt.Println("trying call parent.cancel()")
		cancel()
	}()
	go func() {
		// 我 (别的进程 ) 可以让本进程取消等待数据的返回
		time.Sleep(2 * time.Second)
		fmt.Println("child.cancel()is called,donot want to wait")
		// 如果此处没有调用cancel() ,浏览器上会一直等到 想过的结果
		cancel2()
	}()

	// 这里缓冲1个, 即 ,就算我放弃从resultChan 读数据,doALongJob处依然可以往里面写入一个数据,从而正常退出
	var reusultChan chan string = make(chan string, 1)
	go doALongJob(reusultChan)
	select {
	case <-ctx.Done():
		fmt.Println("走到ctx.Done(),分支,说明用户不愿意继续等待了 with err:", ctx.Err())
		w.Write([]byte("after seconds,got the result:" + time.Since(start).String()))
		w.Write([]byte("走到ctx.Done(),分支,说明用户不愿意继续等待了 with err:" + ctx.Err().Error()))
	case result := <-reusultChan:
		w.Write([]byte("after seconds,got the result:" + time.Since(start).String()))
		w.Write([]byte(result))
		fmt.Println(result)
	}

}
func main() {
	http.HandleFunc("/", handleTest)
	http.ListenAndServe(":8080", nil)
}

withTimeout

package main

import (
    "fmt"
    "net/http"
    "time"

    "context"
    "math/rand"
)

func doALongJob(ctx context.Context, r chan string) {
    time.Sleep(time.Second + time.Second*time.Duration(rand.Intn(5))) // 模拟做一个很费时的操作
    fmt.Println("job is done")
    select {
    case <-ctx.Done(): // job done 后,发现已经超时,不必再将结果仍给r
	close(r)
	fmt.Println("doALongJob found that it already timeout")
    case r <- "经历了漫长的过程,用户等到了想要的数据":
	// 将结果扔给r,等对方接收
    }
    fmt.Println("doALongJob is end")

}

func handleTest(w http.ResponseWriter, req *http.Request) {
    defer req.Body.Close()
    ctx := context.Background()
    var cancel context.CancelFunc
    ctx, cancel = context.WithTimeout(ctx, time.Second*3)
    defer cancel()

    // 这里缓冲1个, 即 ,就算我放弃从resultChan 读数据,doALongJob处依然可以往里面写入一个数据,从而正常退出
    var reusultChan chan string = make(chan string)
    go doALongJob(ctx, reusultChan)
    select {
    case <-ctx.Done(): // 超时
	fmt.Println("走到ctx.Done(),分支,说明用户不愿意继续等待了 with err:", ctx.Err())
	w.Write([]byte("走到ctx.Done(),分支,说明用户不愿意继续等待了 with err:" + ctx.Err().Error()))
    case result := <-reusultChan:
	w.Write([]byte(result))
	fmt.Println(result)
    }

}

func main() {
    http.HandleFunc("/", handleTest)
    http.ListenAndServe(":8080", nil)
}

Comments

comments powered by Disqus