Dubbo-go 源码笔记(二)客户端调用过程

简介: 有了上一篇文章《Dubbo-go 源码笔记(一)Server 端开启服务过程》的铺垫,可以类比客户端启动于服务端的启动过程。其中最大的区别是服务端通过 zk 注册服务,发布自己的ivkURL并订阅事件开启监听;而客户应该是通过zk注册组件,拿到需要调用的+ q z 5 #serviceURL,更新invoker并重写用户的RPCServiceo k X,从而实现对r S 8 m u x远程过程调用细节的封装。

Dubbo-go 源码笔记(二)客户端调用过程

配置文件和客户端源代码

1. client 配置文件

helloworld 提供的 demo:profiles/client.yaml。

registries :

"demoZk":

protocol: "zookeeper"

timeout : "3s"

address: "127.0.0.1:z ] $ S q { i d ~2181"

username: ""

password: ""

references:

"UserProvider":

# 可以指定多个registry,使用逗号隔开;不指定默认向所有注册中心注册

registry: "demoZk"

protocol : "dubbo"

interface : "com.ikurento.user.UserProvider"

cluster: "failover"

methods :

- name: "GetUser"

retries: 3

可看到配置文件与之前讨论过的 Server 端非常类似,其 refrences 部L R ~ 5 D } d分字段就q . * A I $ m d &是对当前服务要主调的服务的配e F 4置,其中详细说明了调用协议、注册协议、接口 id、调用方法、集群策略等,这些配置都会在之后与注册组件交互、重写 ivk、调用的过程中使用到。

2. 客户端使用框架源码

user.go:

fuA q r o znc init() {

config.SetConsumerService(userProvider)

hessian.RegisterPOJO(&User{})

}

main.go:

func main() {

hessian.RegisterPOJO(&User{})

config.Load()

time.Sleep(3e9) D Y } ` 2

printl! M in("\n\n\nsta8 p _ mrt to test dubbo")

user := &User{}

err := userProvi # `ider.GetUser(context.TODO(), []interface{}{"A001"}, user)

if err != nil {

panic(err)

}

println("rM w D q % Y m q )esponse result: %v\n", user)

initSignal()

}

在官网提供的 h[ A A G F Melloworld de y ) Y | % Z J Nemo 的源码中,可看到与服务端类似,在 userz ` ! [ N C.go 内注册了 rpc-service,以及需要 rpc 传输的结构体 user。

在 main 函数中,同样调用了b n 3 ; ] D | confiX X @ 8g.Load() 函数,L r G A v h P _之后就可以通过实现好的 rpc-service:userProvider 直接调用对应的功能函数,即可实现 rpc 调用。

可以猜X 6 X =到,从 hessian 注册结构、Sb $ L eetConsumerService,到调用函数 .GetUser() 期间,用户定义% [ | F的 rpc-service 也就是 userProvider 对应的函数被重写,重写后的 GetUser 函数已经包含实现了远程调用逻辑的 invoker。

= F j b X C ,下来,就要通过阅读源码,看看 dubbo-go 是如何做到的。

实现远程过程调用

1. 加载配置文件

// file: config/config_* D _ ( )loader.go :Load(~ J r r 0 K)

// Load Dubbo Init

fK a O - b l nunc Load() {

// ini 7 , Qt router

initRouter()

// init the global event dispatcher

ext( h 8 ` q P _ rension.SetAndInitGlobalDispatcher(GetBaseCo; & C { K =nfig().EventDispatcherType)

// start the metadata report if config% b F ] a S p R n seth L - H %

if err := starp 6 F 1 UtMetadataReport(k _ = 2 ! O t 1GetApv H H a / # `plicationConfig().MetadataType, GetBaseConfig().MetadataRepW q E wortCon1 | @ g Pfig); err != nil {

logger.Errorf("U n bProvider starts metadata re+ B Q H _ # K ;porg N 5 l }t error, and the error is {%#v}", err)

return

}Q w &

// refer9 + f d ~ P : eence config

loadConsumerConfig()

在 main 函数w 5 X $ | # G F中调用了 config.Load() 函数,进而调用了 loadCoY x insumerConfig,类似于之前讲到的 server 端配~ o G A h } lA O , P读入函数。

在 loadConsumerConfig 函数中,进行了三步操作:

// config/cod m y 7nfig_loader.go

func loadConsumerConfig() {

// 1 init other con1 T ~sumer config3 o # M 1 B 0 ~

con- S v | ? E N b 2ConfigType :=R 7 k s R a consumerConfig.ConfigType

for key, value :=h _ v 2 m g Q range extension.GetDefaultl & y YCo3 % $ O k 9 ( hnfigReader] j n() {}

checkApplicationName(consumerConfig.ApplicationConfig)

configCenterRefreshConsumer()

checkRegistries(consumerConfig.Registries, consumerConfig.Registry)

// 2 refer-implement-reference

for key, ref := range consumerConfig.Referenc- S / oes {

if ref.Generic {

genericService := NewGenericService(key)

S: ] } tetCon2 f ? - G e =sumerService(genericService)

}

rpcServi8 F N *ce := GetConsumerService(key)

ref.id = key

ref.t 9 & 7 ] g a !Refer(G 7 Y s r ! X # .rpcService)

ref.Implement(r3 O ZpcService)

}

// 3 wait for invoker is available, if wa0 F Y C 6 b * jit over default 3s, then panic

for {}

}

  1. 检查配置文件并将配置写入内存
  2. 在 for 循环内部,依次引用(refp Z 1 6 Q fer)并且实例化(implement)每个被调 reference
  3. 等待三秒钟所有 invoker 就绪n [ a k

其中重要的就是 for 循环里面的引用和实例化,两步操作,会在接下来展开讨论。

至此,配置已经被写入了框架。

2. 获取远程 Servico 6 r ee URL,实现可供调用的 invoker

上述的 ref.Refer 完成的就是这部分的操作。

Dubbo-go 源码笔记(二)客户端调用过程

图(一)

1)构造注册 url

和 server 端类似,存在注册 url 和服务 url,dubbo 习惯将服务 url 作为注册 url 的 sub。

// file: config/referencj [ w % 1 j )e_config.go: Refer()

func (c *Refx ] ? 8 AerenceConfig) Refer(_ interface{}) {

//(一T m : C + # p ,)配置url参数(serviceUrl),将会d | p H v k 作为sub

cfgURL := com$ * e v p 4 B /mon.NewURLWithOptions(

common.WithPath(c.id),

common.WithProf a N A r 0 ] R }tocol(c.Protocol),

common.Wik A . w l y & TthParams(c.getUrlMap()),

common.WithParamsValue(constant.BEAN_NAME_KEY, c.id),

// (二)注册地址可以通过url格式给定,也可以通过配置格式给定

// 这一步的意义就是配置->提取信息生成URL

if c.Url != "" {// 用户给定url信息,可以是点对点的地址,也可以是注册中心的地址

// 1. user specified URL, could be pee8 J 7 Sr-to-peer address, or register center's address.

urlStrings := gxstrings.RegSplit(c.Url, "\\s*[;]+\\s*")

for _, urlSt. ; s Y ) br := rang( * L y D y ,e urlStrings {

serviceUrl, err := common.NewURL(urlStr)

}

} else {// 配置读入注册中心的信息

// assemble SubURL from register center's configuration mode

// 这是注册url,D l h T I pprotocol = rn : 9 t b S [egistrq K d Qy,包含了zk的{ / N 9 a ~ 1用户名、密码、ip等等

c.urls = loadRegistries(c.Regi[ D Z i F bstry, consumerConfig.Registries, common.CONSUMER)

// set url to regUrls

for _, regUrl := range c.urls {

regUrl.SubURL = cfgURL// regUrl的subUa 4 ; _ F ] 5 h DRl` d D D f } U存当前配置url

}

}

//至此,无论通过什么形式,已经拿到了全部的regURL

// (三)获取registryProtocol实例,调用其RefeY , * M m r ^ #r方法,传入新构建好的regURL

if len(c.urls) == 1 {

// 这一步Q W I C 2 2访问到registry/protocol/protocol.go reg# J 3 j * }isU 7 # ZtryProtocol.Refer

// 这里是registry

c.invoker = extension.GetProtocoP i Ol(cC o ^ y J K r t (.urls[0].Protocol).Refer(*c.urlsh Q % o 9 T ~ A )[0])f e [ V o T

} else@ , T _ 6 ( L x {

// 如果有多个注T 4 f c r 7 v z R册中心7 + t $ F 7,即有多个invoker,则采取集群策略

invokh } h Cers := make([]protocop 5 A m v b 0 Jl.Invoker, 0, len(c.urls))

}

这个函数中,已经处理完, B * j从 Register 配置到 RegisterURL 的转换,即图(一)中部分:

Dubbo-go 源码笔记(二)客户端调用过程

接下来,已经拿到的 url 将被传递给 RegistryProtocol,进一步 refer。

2)registryP- J 0 Q , L = } {rotocol 获取到 zkRegistry 实例,进一步^ a d h $ N 4 Refer

// filew j P: registry/protocol/protocol.go: Refer

// Refer provider service from registry center

// 拿到的是配置文件registries的url,他能够生成一个invoker = 指向目的addr,以供客户端直接调用。

fuw e u l M @ # L &nc (proto *re: : p 9 5 4 rgistryProtocol) RefH e @er(url c+ c @ n m | m (ommon.URL) protocol.Invoker {

var registryUrl = url

// 这里拿到的是referenceConfig^ ~ + k L A n,serviceUrI ) o = ^ = Sl里面包f p r [ r含了[ P f c , VReference的所有信息,包含interfaceName、method等等

var serviceUrl = reg s T v SgistryUrl.SubURL

if registryUrl.= F k YProtocol == constant.REGISTRYv 3 ; ! l D 1_PROTOCOL {Y U F E : E a K G// registryUrl.Proto = "registry"

protocol := registryUrl.Gez ) X = ] 5 [tParam(cD E P D T Y ~onstant.REGISTRY_KEY, "q t I 6")

registryUrl.Protocol = protocol//替换成了具体的值,比如"zookeeper"

}

// 接口对象

var reg registry.Registry

// (一)实例化接口对象,缓存策略

if regI, loaded := proto.registries.Load(o T , b & lregistryUrl.Key()); !loaded {

// 缓存中不存在当前registry,新建一个regp = h [ ;

reg = getRegistry(&registryUrl)

// 缓存起来

proto.registries.Store(registryUrl.Key(), reg)

} else {

reg = regI.(registry.Registry)

}

/P K N/ 到这里,获取到了! R o z V a ) Dreg实例 zookeeper的registry

//(二)根据Register的实例zkRegistry和传入的regURL新建一个direcm K G j f 8 % =tory

// 这一步存在复杂的异步逻辑,从注册中心拿到了目的service的真实addr,获取了invoker并放入directory,

// 这一步将在下面详细给出步H 6 c + G B 1

// new registry directory for store serviceE 7 9 a url from registry

directory, err := extension.GetDefaultRegistryDirectory(&registc ; m - D iryR O ZUrl, reg)

if err != nil {

logger.Errorf("consumer service %v create registry= m ? B z a dir: ! Tectory error, error message is %s, and will return nilh ; # G ) 5 / T invoker!",

serviceUrl.String(), err.Error())

return nil

}

// (三)DoRegister 在zk上注册当前client s# z P 3 ~ Eervice

err = reg.Register(*serviceUrl)

if err != nil {

logger.Errorf("consumeq , N 2 q 6r service %v register registry %v error, error message is %s",

service+ R H $ _ | }Url.String(), regiO H ] l G ^ J 3stryUrl.String(), err.Er. f s D Rror()). C u O a - Z R !

}

// (四)nj 4 aew cluster invoker,将directory写入集群,获得具有集群策略的invoker

cluster := extension.GetCluster(serviceUrl.GetPar* s H | ^ o C {am(constant.CLUSTER_KEY, consu A Wtant.DEFAULT_CLUSTER))

invoker := cluster.Join(directory)

// invoker保存

proto.invokers = append(proto.invokers, invoker)

return invokers ? - k k / w

}

可详细阅读上述注释,这个函数完成了从 url 到 invoker 的全部过程:

(一)首先获得 Registry 对象,默认是之. { G Z i s `前实例化的 zkRegistry,和之前 serve{ Z W tr 获取 Registry 的处理很类似。

(二)通过构造一个新的 dire= z m 6 wctory,异步拿到之前在 zk 上注册的 server 端信息,生成 invoker。

(三)在 zk. v 3 3 K - 上注册当前 service。

(四)集群策略,获得最终 invoker。

这一步完成了图(一)中所有余下的绝大多数操作,接下来就需要详细地查看 directory 的构造过程。

3)构造 directory(包含较复杂的异步操作)

Dubbo-go 源码笔记(二)客户端调用过程

图(二)

上述的 extension.GetDefaultReg+ O * y Z Ki- = L L r X x *stryDirectory(&registryUrl, reg) 函数,本质上调用了已经注册好的 NewRegistryDirectol b 0 R % ]ry 函数:

/// Z s fH P o L l S Dile: registry/directory/directory.go: NewRegistryDirectory()

// NewReN ! H ! [ q u W DgistryDirectorM n # ? m +y will create a new Rem s I s hgistryDirectory

// 这个函数作为default注册在extension上面

// url为注册url,reg为zookeeper registry

func NewRegistryDirectory(url *common.URL, registry registry.Registry) (cluster.Directory, error) {

if url.Sub+ 4 eURL == nil {

return nil, perrors.Errorf("url is invalid, suburl can not be niY F W l")

}

dir := &RegistryDirectory{

BaseDF @ P D { Y iirectory: directory.NewBaseDirectory(url),

cacheInvoker% o 2 8 U Ds: []protoco@ ! { 8 T F F %l.Invoker{},

cacheInvokersMap: &sync.Map{},

serviceType: u, K 2 E T g `rl.S@ x * OubURL.Service(),

registry: registry,

}

dir.consumerConfigurationListeneA m i x z ~r = newConsumerConfigurationListener(dir)

go dir.4 m = m 0 t J nsu$ H o z ? ? _ tbscribe(url.SubURL)

return dir, nil

}

首先构造了一个注册 directory,开启协程调用其 subscribe 函数,传入 serviceURL。

这个 directory 目前包含了对应的 zkRegistry,以及传入的 URL,它的 cacheInvokers 部分是空的。

进入 dir.subscribe(url.SubURL) 这个异步函数:

/ file: registry/directory/directory.go: subsct ; 1 Q S 4 u }ribe()

// subscribe fror U + O a S Mm registry

func (dir *RegistryDirectory) subscribe(url *common.URL) {

// 增加两个监听,

dir.consumerConfigurationListener.addNotifyListe4 K 2 Y b a -ner(dir)

dir.referencn s - [ 2eConfigurationListener = newReferenceConfigurationL0 O / 8 :istener(dir, url)

// subscribe调用

dir.registry.Subscribe(url, dir)

}

重点来了,它调用了 zkRN B 1egistry 的 Subscribe 方法Y 2 ) $ d f 2 & S,与此同时将自己作为H i ^ ) P R ConfigLisH S T C d otener 传入。

我认为这种传入 listener 的设计模式非u I { s s z常值得学习,而且很有 java 的味道。

针对等待 zk 返回订阅信息这样的异步操作,需要传入一个V c W Listener,这个/ B a Q s ( Listener 需要实现 Notify 方法,进而在作为参数9 ~ h m 1 p & f传入内部之后,可以被异步地调用 Notify,将内部触发的异步事件c K 3 I“传递出来”,再进一步处理加工。

层层的 L= Q G Q istener 事件链,能将传入的原始 servic6 meURL 通过 zkConn 发送给 zk 服务,获取到服务端注册好的 url 对应的二进制信息。

而 Notify 回调链,则将G ( #这串 byte[] 一步一步解析、加工;以事件的形式向外传递,最终落到 director2 3 ` V M - 4 } Uy 上的时候,已经是成型的 newInvokers 了。

具体细节不L j C @ 1 _ t % 7再以源码形式展示,可参照上图查r n # h l阅源码。

至此已经拿到了 server 端注册好的真实 invor q . R 9 ( 0ker。

完成了图2 t $ n ^(一)中的部分:

Dubbo-go 源码笔记(二)客户端调用过程

4)构造带有集群策略的 clusterinvoker

* u 7过上述操作,已经拿到了 server 端 Invokers,放入了 directory 的 cacheinvokers 数组里面缓存。

后续的操作对应本文: E w # F U k %从 url 到 invoker 的过程的最后一步,由 direc. V i itory 生成带有特性集群策略的 invoker。

/T ^ j 6 l $ @ :/ (四)new cluster invoker,将director- p Ly写入集群,获得具有集群策略的invoker

clusA / H u [ +ter := ext$ z Z Y 7ension.GetCluster(serviceUrl.GetParam(constant.CLUSTER_KEY, constant.DEFAULT_CLUSTER))

invoker := cluster.Join(div I b 5rectory)

123

Join 函数的实现就是如下函数:

// file: cluster/cluster_iA A i p . ) lmpl/failover_cluster_invokers.go: newFailoverClusterInvoker()

func newFailoverClusterInvoker(directory cluster.Directory) protocol.Invoker {

return &failoverClusterInvoker{

baseClustB e 6 ZerInvoker: newBaseClusterl y j 2 h w = o gInvoker(directory),

}

}

12345

dubbo-go 框架默认选择 failover 策略,既然返回了一个 invoker,我们查看一下 fa= Z DiloverClusterInvoker 的 InvV ; B A eoker 方法,看它是如何将集群策略A 4 { = 1 [封装到 Inv; I s Y z 4 Zoker 函数内部的:

// file: clus~ : 1 |ter/cluster_impl/failover_cluster_invo5 ^ 5 f _ nkers3 & P } O O.go: Invoker()

// Invoker 函数

func (invoker *failoverClusterInvoker) Invoke(ctx context.Conte; i E z 8 s F O 9xt, invocation protocol.Invocation) protocol.Result {

//调用List方法拿到directory缓存的所有invokers

invokers := invoker.directory.List(in^ + f / ? nvocation)

if err := invoker.checkInvokers(inw K j 1vokers, invocation); err != na Q c 1il {// 检查是否可以实现调用

return &prQ I : Ro` 3 ~ Ztocol.RPCRes} & M | ! *ult{Err: err}

}

// 获取来自用户方向传入的

methodName := invocation.MethodName()

retries := getRetries(invokers, methodName)

loadBalance := getLoadBalance(invo. o u ukers[0], invocation8 M % | p)

for i := 0; i <= retriev I C ! H [ x xs; i++ {

// 重要!这里是集群策略的体现,失败后R , w重试!

//Reselect before retryS U l z to avoid a change of candidate `invokers`z H J ! G &.

//NOTE:[ L ] T q if `invokers`a 5 ~ } C ! s D changed, then `invoked` also lose accuracy.

if i > 0 {

if err := invoker.checkWhetherDestroyed(); err != nil0 B ] L J l B {

return &protocol.RPCResult{Eu z N C a vrr1 Z X j: err}

}

invokers = invoker.directory.List(invocation)5 [ O

if err := invoker.checkInvokers(invokers, invocation); err != nil {

return &protocol.RPCResult{Err: err}

}

}

// 这里是负载均衡策略的体现!选择特定ivk进行调用。

ivk := inQ e U ) N Vvoker.doSelect(loadBalance, invocation, invokers, invoked)$ = = _

if ivk == nil {

continue

}

invoked = append(invoked,K ! k ivk)

//DO INVOKE

result = ivk.Invoke(ctx, invocat` 3 ~ E & /ion)= F z : | r h

if result.E4 % e | qrror() != nil {

providers = append(providers, ivk.GetUrl().Key())

continue

}

return result

}

}

看了很多 Invoke 函数的实现,所有类似的 Invoker 函数都包含两个方向:一个是用户方向的 invcation;一个是函数方向的底层 invokers。

A l s } H i集群策略的 invoke 函数本身作为l 6 x ~ K接线员,把 invop L N 4 ;cation 一步步解析,根据调用需求和集群策s } 8 o ) 略,选择特定的 invoker 来执行。

proxy 函数也是这样,一个是用户方向的 ins[] reflect.Type, 一个是函数方向的 invoker。

proxy 函数负责将 ins 转换为 invocation,调用对应 invoker 的 invoker 函数,实现连通。

而出于g Y u这样的设计,可以在一* R z S步步 Invoker 封装的过程中,每个 Invoker 只关心自己负责操作的部分,从而使整个调用栈解耦。

妙啊!!!

至此,我们理解了 failoverClusterInvoker 的 Invoke 函数实现,x Y ~ X y & q R也正是和这个集群策略 InvJ B ` * 2 1 D woker 被返回,接受来自上方的调用。z x x v

已完成图(一)v g W 2中的:

Dubbo-go 源码笔记(二)客户端调用过程

5)在 z; 0 z Eookeeper 上注册当前 client

拿到 invokers 后,可以回到这个函8 w - f数了:

// f| * 1 C 4 file: config/refrence_config.go: Refer()

if len(c.urls) == 1 {

// 这一步访问到re+ : ( =gistry/protocol/protocol.go registryProtocol.Refer

c.invoker = extension.GetProtocol(c.urls[0].Protocol).R5 u _ j U @efer(*c.urls[0])

// (一)拿到了真实的invokers

} else {

// 如果有多个注册中心,即有多个ine D r S y 1 |voker,则采取集群策略

invokers := make([]protocol.Invoker, 0, len(c.urls))

cluB * Mster := extension.GetCluE ! w S R 2 Pster(hitClu)

// If 'zone-aware' policy select, the invoker wrap sequence would be:

// ZoneAwareClusterInvoker(StaticDirectory) ->

// FailoverClusterInvoker(RegistryDirectory, routing happens here) -> Invoker

c.invoker = cluster.Join(directory.NewStaticDi/ @ k s h d f qrectory(invokers))

}

// (二)create proxy,为函数配置代理

if c.Async {

callbaX s [ Ock := Ge3 P M ^ AtCallback(c.id)

c.pxy = extension.GetProxyFactory(consumerConfig.Pr2 t h NoxyFactorq t e %y).GetAsyncProxy(c.invokt M k s $ 5 y #er, callback, cfgURL)

} else {

// 这里c.b & X x zinvoker已经是目的addr了

c.pxy = extension.GetProxyFactory(cW v M RonsumerConfig.ProxyFactory)._ p ! J # N 0 + *GetProxy(c.invoker, cfgURL)

}

我们有了可以打通的 invokers,但还不能直接调用,因为 invoker 的入参是 invocation,而调用函数使用的是具体的参数列表,需要通过一层 proxy 来规范入参和出参。

接下来新建一个默认 proxy,放置在 c.proxy 内,以供后续使用。

至此,完成了图(一; | 7)中最后的操作:

Dubbo-go 源码笔记(二)客户端调用过程

3. 将v y ? % # 4 4 W C调用逻辑W ) f Q r R m K S以代理函数的形式写入 rpc-service

上面完成了 config.Refer 操作,回到:

config/config_loadeZ O n } Z `r.go: loadConsuL ? E ; } h O :merConfig()

Dubbo-go 源码笔记(二)客户端调用过程

下一个重要的函数是 Implement,它的操作较为简单:旨在使用上面生成的 c.proxy 代理,链接用户自己定义的 r! F } h y W * | )pcService 到 clusterInvoker 的信息传输。

函数较长,只选取了重G m = . 6 + ) t 0要的部分:

// file: common/proxy/proxy.go: Implement()

// Implement

// proxy implement

// In consumer, RPCService like:

// type XxxProvider struct {

// Y1 d p Xyy func(ctx context.Context, args []interface{}, rspw W ` ( O N ( *Zzz) error

// }

// Implement 实现的A e 4过程,就是proxy根据函数名和返回值,通过调用invoker 构造出拥有远程调用逻辑的代理函数

// 将当前rpc所有可供调用的函数注册到proxy.rpc内

func (p *Proxy) Implement(v common.RPCService) {

// makeDubboCallProxy 这是一个构造代理函数,这个函数的返回值是func(in []reflect.Value) []reflect.Value 这样一个函数

// 这个被返回的函数是请求实现的载^ Z F - [ u C , 1体,由他来发起调E H K H J u用获取结果

makeDubboCallProxy := func(methodName string, outs []reflect.Type) func(in []reflect.Value) []reflect.Van X F . Jlue {

return func(in []reflect.Value) []reflect.Value {

// 根据methodName和outs的类型,构造这样一个函数,这个函数能将in 输入的value转换为输出的value

// 这个函数具体的实现如下:

// 目前拿到了 methodName、所有入参的interface和value,出参数reply

// (一)根据这些生成一个 rpcinvocation

inv = invocation_impl.NH J A 4ewRPCInvocationWithOptions(

invocation_impl.WithMethodH X ; cNa6 y {me(methodName),

invocation_impl.WithArguments(inIArr),

invocation_impl.WithReply(reply.Interface()),

invocation_impl.Withl 0 b u A t g nCallBack(p.callBack),

invocation_impl.Wit3 J G & L AhParameterValues(I W ` ?inVArr))

for k, value := range p.attachments {

inv.SetAttachments(k, value)

}

// add user setAttachment

at/ 1 = om := invCtx.Value(constant.AttachmentKey) // 如果传入的ctx里面有attachmentF K i - O + ^ A 1,也要写入inv

if m, ok := atm.(map[string]string); ok {

for k,s D _ - value := ran! 7 X G f n G i }ge m {

inv.SetAttachments(k, value)

}

}

// 至此构造inv完毕

// (二)触发Invoker 之前已经将cluster_invoker放入proxy,使用Invoke方法,通过getty远程过程调用

result := p.invokeM G M A [.Invoke(invCtx, inv)

// 如果有attachment,则加入

if len(result.At} z ( ^ gtachments()) > 0 {

invCtx = context.WithValue(invCtx, constant.AttachmentKey, result.Attachments())

}

}

}

numField := valueOfEle4 _ _m.NumField()

f? Y i C J Ior i := 0; i < numField; i++ {

t := typeOL ; { ;f.Field(i)

methodName := tG j 7 k ! T H | ?.Tag.Get("dubbo")

if methodName == "" {

methodName = t.Name

}

f := valu l G ~ v SeOfElem.Field(i)

if f.Kind() == refle? K K ` h g Y ct.Func &&f Y O s Z N 0amp; f.IsValid() && f.CanS_ R F Z H Jet() { /y l Q / i z/ 针对于每个函数

outNum := t.Type.NumOut()

// 规定函数输出只能有1/2个

if outNum != 1 && outNumy ` & K R z # 0 != 2 {

logger.Warnf("me X Othod %s of mtype %v has wrong number of in out parameters %d; needsp 0 o _ t D / exactly 1/2",

t.Name, t.Type.String(), outNum)

continue

}

// The latest return type of theS j m method must be error.

// 规定最后一个返P F o 6 O G -回值一定是error

if returnType := t.Type.Out(outNum - 1); returnType != ty Y D 3 . 0 / - {ypError {

logger.Warnf("the latest re: 9 I b M t MturU 4 9n type %s of method %q is not error", retg h S [ k ] IurnType, t.Name)

continue

}

// 获取到所有的出参类型,E = Z 2 a G 9放到数组里

var funcOuts = make([]reflect.Type, ok j @ kutNum)

for i := 0; i < outNum; i++ {

funcOuts[i] = t.Type.Out(i)

}

// dO 7 Ho method proxy here:

// (三)调用make函数,传入函数名和返回值,获得能调用o ! + w ] A ` %远程的proxL ) 1 p d Z iy,将这个proxy替换掉原来的函数位置

f.Set(reflect.MakeFunc(f.Type(), makeDubboCallProxy(methodName, funcOuts)))

logger.Debugf("set method [%s]",T ( } methodName)

}

}

}

正如之前所说,proxy 的作用是将用户定义的函数参数列表,转化为抽象的 invo2 } Y N I = dcation 传入 Invoker,进行调用。

其中已标明有三处较为重要的地方:

  1. 在代理函数中实现由参数列表生成 Invocation 的逻辑
  2. 在代理函数实现调用 InvokZ p t q z u 0 ;er 的逻辑
  3. 将代理函数替换为原始 rpc-service 对应函数

至此,也就解决了一开始的问题:

/Y 4 ~ 2 p 9/ file: client.go: mZ { U : m s +ain()

config.Load()

user := &User{}

err := userProvk m q = %ider.GetUser(context6 _ y l e.TODO(), []interface{}{"A001"}, user)

这里直接调用用户定义的 rpcService 的函数 GetUser,此处实际调用的是经过重写入的函数代理,所以就能实现远程调用了。

从 client 到 server 的 invoker 嵌套链- 小结

在阅读 dubbo-go 源码的过程中,我们能够发现一条清晰的 invoker-proxy 嵌套链,希望能够通过图的形式来展现:g I S

Dubbo-go 源码笔记(二)客户端调用过程

作者:李志信 (GitHubID LaurenceLiZhixin),中山大h L 3 n W O D r学软件工程专业在校学生,擅长使用 Java/Go 语言,专注于云原生和微服务等技术方向。

本文为阿里云原创内容,未经允许不得转载