go | go 技术论坛-大发黄金版app下载
在服务端开发中我们经常会在一个服务中发起请求调用其他服务。很多服务主要逻辑就是根据产品逻辑调用各个rpc请求,再把各个请求的结果组合在一起返回。业内把专注于开发这类服务的开发者称为api boy。这类服务的特点是io密集,耗时主要是rpc请求。所以优化这类服务的耗时就是优化rpc请求的串并行关系。那么怎样的串并行关系才是最优的呢?其实很简单,只要做到下面两点就可以了。
- 所有逻辑都在依赖数据准备好的第一时间开始跑,每个rpc请求都在请求数据准备好的第一时间发起。
- 相同的数据只请求一次。
于是我们就把一个全局的问题分解成各个局部的问题,就可以分而治之,逐个击破。本文主要讨论如何在go语言开发的服务中管理rpc调用,实现上述两点。
下面我们先讨论几种情况。
func func1() {
resp := rpc()
handle1(resp)
}
func func2() {
resp := rpc()
handle2(resp)
}
func case1() {
go func1()
go func2()
}
func1与func2分别调用同一个rpc获得同一个结果,然后分别对结果做处理。这里func1与func2应该合并调用一次rpc,减少下游的压力。一个问题是这里并不确定func1与func2谁会先发起rpc,解决方法是使用sync.once。
func func1(resp *resp) {
resp = rpc()
}
func func2(resp *resp) {
handle(resp)
}
func case2() {
var resp resp
go func1(&resp)
go func2(&resp)
}
这里func2在handle之前必须保证func1已经生产出resp。在go,这一般用channel来实现。这里不能用sync.once是因为func2没有生产能力,func2调用sync.once会导致rpc调用不了。
我想做一个通用的包裹来包住resp,做一个通用的数据获取接口获取resp,使其能囊括上面两种情况。于是我混合了sync.once与channel,写出了下面的代码。核心是提供了initandget接口来生产数据,get接口来获取数据。
type datagetter[t any] struct {
data t
dataonce sync.once
selfonce sync.once
isready chan struct{}
}
func (d *datagetter[t]) initandget(initfunc func(*t)) *t {
d.initself()
d.dataonce.do(func() {
doinit(initfunc)
})
<-d.isready
return &d.data
}
func (d *datagetter[t]) get() *t {
d.initself()
<-d.isready
return &d.data
}
func (d *datagetter[t]) initself() {
d.selfonce.do(func() {
d.isready = make(chan struct{})
})
}
func (d *datagetter[t]) doinit(initfunc func(*t)) {
defer func() {
recover()
close(d.isready)
}()
if initfunc != nil {
initfunc(&d.data)
}
}
于是上面两个case可作如下改写。
func func1(resp *datagetter[resp]) {
handle1(resp.initandget(func(r *resp) {
*r = rpc()
}))
}
func func2(resp *datagetter[resp]) {
handle2(resp.initandget(func(r *resp) {
*r = rpc()
}))
}
func case1() {
var resp datagetter[resp]
go func1(&resp)
go func2(&resp)
}
func func1(resp *datagetter[resp]) {
resp.initandget(func(r *resp) {
*r = rpc()
})
}
func func2(resp *datagetter[resp]) {
handle(resp.get())
}
func case2() {
var resp datagetter[resp]
go func1(&resp)
go func2(&resp)
}
看起来满足需求了。但很快我便发现,get接口非常危险。我们看下面case3,我把case2的func1加了个if。
func func1(resp *datagetter[resp]) {
if condition() {
resp.initandget(func(r *resp) {
*r = rpc()
})
}
}
func func2(resp *datagetter[resp]) {
handle(resp.get())
}
func case3() {
var resp datagetter[resp]
go func1(&resp)
go func2(&resp)
}
如果condition()为false,func1没有生产数据,func2就会无限等待。我们要加强get接口的安全性。首先会想到的是加一个超时,这是应该的,但是超时只是最后的兜底,它不会帮你把数据生产出来。我们要做的是加以限制,从逻辑上避免出现有消费但没有生产数据的情况。
我先简单修一下case3。
func func1(resp *datagetter[resp]) {
defer resp.initandget(nil)
if condition() {
resp.initandget(func(r *resp) {
*r = rpc()
})
}
}
func func2(resp *datagetter[resp]) {
handle(resp.get())
}
func case3() {
var resp datagetter[resp]
go func1(&resp)
go func2(&resp)
}
我在func1中加了一行defer,表示如果condition()为false,resp就不会生产了,func2的handle就不会永远阻塞,但是要处理没数据的情况。这个fix处理了永远阻塞的情况,但前提是func1一定会运行。
func func1(resp *datagetter[resp]) {
if condition() {
resp.initandget(func(r *resp) {
*r = rpc()
})
}
}
func func2(resp *datagetter[resp]) {
handle(resp.get())
}
func func3(resp *datagetter[resp]) {
defer resp.initandget(nil)
if condition2() {
func1(resp)
}
}
func case4() {
var resp datagetter[resp]
go func3(&resp)
go func2(&resp)
}
在case4中,func1并不是一定会运行的,func3才是一定会运行的,所以defer语句挪到func3。这样产生一个问题,rpc代码在func1不在func3,看func3代码时会觉得莫名其妙。更要命的是,除了梳理代码,没有任何方式来检测我这个defer写的位置究竟对不对?defer究竟应该写在func1还是func3还是别的函数?有没有漏写?写错的话依然会造成无限等待。怎么办呢?
有一个比较简单的case是func2的handle知道,如果resp有数据,resp一定会在hanle之前调用initandget,那么handle只需要再调用一次initandget(nil)就可以保证不无限等待了。由于只有同步操作能保证先后顺序,为了方便handle与resp的获取在不同协程上进行,我在datagetter增加了一个asyncinit接口。
func (d *datagetter[t]) asyncinit(initfunc func(*t)) {
d.initself()
d.dataonce.do(func() {
go doinit(initfunc)
})
}
其实这个asyncinit的实现是有问题的,如果在asyncinit之前被调用了initandget,asyncinit就会同步等到initandget结束,不符合ayncinit给人的感觉。这里为了便于理解先这样写着,本文最后会给出修正后的完整代码。case5是一个典型的应用场景。
func case5() {
var resp datagetter[resp]
resp.asyncinit(func(r *resp) {
*r = rpc()
})
handle(resp.initandget(nil))
}
对于更一般的情况怎么办呢?于是我借鉴别的编程语言,发现了一个叫promise的东西,我决定把的std::promise抄过来。promise的思想是,我在get的时候,要指定一个promise,这个promise并不是说保证会生产数据,而是说保证这个数据只会在promise的内生产,也就是说,如果promise的生命周期结束了,那么数据就不会再生产。更准确地说,我对promise提出了下面几个要求。
- promise在get之前必须已初始化。
- promise的生命周期一定会结束,不会无限等待。
- 如果数据会生产,则一定会在promise的生命周期结束之前调用initandget或asyncinit。
promise用channel来实现就好了,用make来表示promise的初始化,用close来表示promise生命周期的结束。加上上文提到的要加个超时,因为datagetter作为一个通用的东西,并不知道超时多少是合理的,于是就交给外部来设定,直接采用context的超时。于是get接口就改成下面这样。
func (d *datagetter[t]) get(ctx context.context, promise chan struct{}) *t {
d.initself()
select {
case <-promise:
return d.initandget(nil)
case <-d.isready:
return &d.data
case <-ctx.done():
return d.initandget(nil)
}
}
case4就改写成下面这样。
func func1(resp *datagetter[resp]) {
if condition() {
resp.initandget(func(r *resp) {
*r = rpc()
})
}
}
func func2(ctx, context.context, resp *datagetter[resp], func3promise chan struct{}) {
handle(resp.get(ctx, func3promise))
}
func func3(resp *datagetter[resp], func3promise chan struct{}) {
defer close(func3promise)
if condition2() {
func1(resp)
}
}
func case4() {
ctx, cancel := context.withtimeout(context.background(), time.second)
defer cancel()
var resp datagetter[resp]
func3promise := make(chan struct{})
go func3(&resp, func3promise)
go func2(ctx, &resp, func3promise)
}
这样做的好处是:
- func3promise是只跟func3有关,跟resp无关的,即使有resp2、resp3等也可以用func3promise。在func3 close func3promise也不会觉得奇怪。
- promise跟resp的生产无关,跟消费有关,是get的时候选择的。生产可以无压力地写生产代码,不用担心某个ifelse没覆盖到。不同的地方get的时候可以选择不同的promise。
- 如果get选错promise,promise在get的时候没有初始化,那么程序会panic。我认为panic是比较好定位的错误,因为可以看堆栈。
缺点是get的时候确实需要谨慎选择promise,使promise满足上面说的promise的几个条件。一般在生产与消费分叉之前令promise初始化,比如case4函数,在分叉之后的生产方的第一个函数进行close,比如func3。
有时候并不能简单按上面的promise规则,比如下面的case6,我把case4中func3中的func1调用改为了go func1。
func func1(resp *datagetter[resp]) {
if condition() {
resp.initandget(func(r *resp) {
*r = rpc()
})
}
}
func func2(ctx, context.context, resp *datagetter[resp], func3promise chan struct{}) {
handle(resp.get(ctx, func3promise))
}
func func3(resp *datagetter[resp], func3promise chan struct{}) {
defer close(func3promise)
if condition2() {
go func1(resp)
}
}
func case6() {
ctx, cancel := context.withtimeout(context.background(), time.second)
defer cancel()
var resp datagetter[resp]
func3promise := make(chan struct{})
go func3(&resp, func3promise)
go func2(ctx, &resp, func3promise)
}
于是就出问题了,func3promise被close的时候可能func1还没被调用,导致resp的生产在func3promise生命周期之后,不满足promise定义。怎么办呢?我又把目光投向c ,发现还有个std::move的语义,于是我又把它抄过来了。go语言里一般没有move的概念,我抄什么呢?抄析构函数调用的时机。在c 里,如果一个promise被move到新的变量,那么原变量就不会执行析构函数。我就抄这个逻辑。下面是改写后的case6。
func func1(resp *datagetter[resp], func1promise chan struct{}) {
defer close(func1promise)
if condition() {
resp.initandget(func(r *resp) {
*r = rpc()
})
}
}
func func2(ctx, context.context, resp *datagetter[resp], func1promise chan struct{}) {
handle(resp.get(ctx, func1promise))
}
func func3(resp *datagetter[resp], func1promise chan struct{}) {
isfun1promisemoved := false
defer func() {
if !isfun1promisemoved {
close(func1promise)
}
}()
if condition2() {
isfun1promisemoved = true
go func1(resp, func1promise)
}
}
func case6() {
ctx, cancel := context.withtimeout(context.background(), time.second)
defer cancel()
var resp datagetter[resp]
func1promise := make(chan struct{})
go func3(&resp, func1promise)
go func2(ctx, &resp, func1promise)
}
问题是解决了,但是,这怎么这么复杂?而且并没有很通用。比如下面的case7就不行了,func1被调用多次,其中第一次并不发起rpc,第二次才发起。
func func1(resp *datagetter[resp], func1promise chan struct{}, condition bool) {
defer close(func1promise)
if condition {
resp.initandget(func(r *resp) {
*r = rpc()
})
}
}
func func2(ctx, context.context, resp *datagetter[resp], func1promise chan struct{}) {
handle(resp.get(ctx, func1promise))
}
func func3(resp *datagetter[resp], func1promise chan struct{}) {
isfun1promisemoved := false
defer func() {
if !isfun1promisemoved {
close(func1promise)
}
}()
if condition2() {
isfun1promisemoved = true
go func1(resp, func1promise, false)
}
if condition3() {
isfun1promisemoved = true
go func1(resp, func1promise, true)
}
}
func case7() {
ctx, cancel := context.withtimeout(context.background(), time.second)
defer cancel()
var resp datagetter[resp]
func1promise := make(chan struct{})
go func3(&resp, func1promise)
go func2(ctx, &resp, func1promise)
}
于是我又想了一想,发现我可能抄错作业了,有一个更无脑的语义可以用:func3及在func3生命周期里发起的协程的总的生命周期。于是case7改写成下面这样。
func func1(resp *datagetter[resp], func1promise chan struct{}, condition bool) {
defer close(func1promise)
if condition {
resp.initandget(func(r *resp) {
*r = rpc()
})
}
}
func func2(ctx, context.context, resp *datagetter[resp], func3promise chan struct{}) {
handle(resp.get(ctx, func3promise))
}
func func3(resp *datagetter[resp], func3promise chan struct{}) {
subpromises := make([]chan struct{}, 0)
defer func() {
go func() {
for _, promise := range subpromises {
<-promise
}
close(func3promise)
}()
}()
if condition2() {
func1promise := make(chan struct{})
subpromises = append(subpromises, func1promise)
go func1(resp, func1promise, false)
}
if condition3() {
func1promise := make(chan struct{})
subpromises = append(subpromises, func1promise)
go func1(resp, func1promise, true)
}
}
func case7() {
ctx, cancel := context.withtimeout(context.background(), time.second)
defer cancel()
var resp datagetter[resp]
func3promise := make(chan struct{})
go func3(&resp, func3promise)
go func2(ctx, &resp, func3promise)
}
这个语义就无脑多了,每见到一个可能生产数据的函数调用就加一个subpromise就行了。
总的来说,promise就是要划定一个范围,从生产者消费者分开之前开始,到所有可能发生生产的路径的范围。大多数时候,我们只需要无脑划定一个很粗的范围就足够了,比如case4,case5,case7(case4可看作promise在get调用的时候close掉)。有时候可能真的需要一些很精细的操作去划定一个很精确的范围,不过我暂时还没遇到过,就这样吧,等我遇到了再来补充。
另外提一句,我后来尴尬地发现,其实用datagetter包住数据的方式是很反人类的,因为这改变了生产者的代码。写生产代码的时候可能并不知道哪个数据需要用datagetter包着,等到写消费者的时候才知道。所以,像sync.once这样,在数据之外自行创建一个变量才是更好的方式。所以直接用datagetter[struct{}]就好了。这样的话实际上不需要用泛型了,可以支持更低版本的go。
写了这么多,可能大家会疑惑这跟开头说的最优串并行有什么关系啊?其实我写了这么多,就是为了降低门槛,让大家可以随心所欲地在程序的各个地方起协程发起rpc,同时可以随心所欲地获取其他协程的rpc结果。为了达到全局最优,我的方法不是宏观管控,而是做好每一个微观逻辑。一个典型的新需求的开发流程是这样的:在一个合适的地方起一个协程进入新需求的流程,看看新需求依赖的数据在旧代码的哪里生产,在生产的地方加上datagetter,然后在新需求的协程里等待结果,等到结果马上开始生产新需求的数据,并加上datagetter,新需求的数据需要合入旧逻辑来改变旧逻辑的输出,那么在需要改变旧逻辑的地方,用datagetter去等待新需求的数据,一但等到了马上开始改变。你看,这不就满足了最优串并行的要求了吗?如果旧逻辑是全局最优的,那么加入新需求后的逻辑也依然是全局最优的。这可维护性拉满了啊。
下面是datagetter完整的代码,修复了上面说的asyncinit的问题,去除了泛型。
type datagetterstate = int32
const (
datagetterstatenotstarted datagetterstate = 0
datagetterstatestarted datagetterstate = 1
datagetterstatedone datagetterstate = 2
)
type datagetter struct {
state datagetterstate
selfonce sync.once
isready chan struct{}
}
func (d *datagetter) asyncinit(initfunc func()) {
if !d.ismyturn() {
return
}
d.initself()
go d.doinit(initfunc)
}
func (d *datagetter) ismyturn() bool {
return atomic.compareandswapint32(&d.state, datagetterstatenotstarted, datagetterstatestarted)
}
func (d *datagetter) get(ctx context.context, promise chan struct{}) {
if atomic.loadint32(&d.state) == datagetterstatedone {
return
}
d.initself()
select {
case <-promise:
d.initandget(nil)
return
case <-d.isready:
return
case <-ctx.done():
d.initandget(nil)
return
}
}
func (d *datagetter) initandget(initfunc func()) {
if atomic.loadint32(&d.state) == datagetterstatedone {
return
}
d.initself()
if d.ismyturn() {
d.doinit(initfunc)
}
<-d.isready
}
func (d *datagetter) doinit(initfunc func()) {
defer func() {
recover()
close(d.isready)
atomic.storeint32(&d.state, datagetterstatedone)
}()
if initfunc != nil {
initfunc()
}
}
func (d *datagetter) initself() {
d.selfonce.do(func() {
d.isready = make(chan struct{})
})
}
本作品采用《cc 协议》,转载必须注明作者和本文链接