diff --git a/protocol/dubbo/dubbo_invoker.go b/protocol/dubbo/dubbo_invoker.go index 6dcf2568fa..2d1ef1f169 100644 --- a/protocol/dubbo/dubbo_invoker.go +++ b/protocol/dubbo/dubbo_invoker.go @@ -20,6 +20,8 @@ package dubbo import ( "strconv" "sync" + "sync/atomic" + "time" ) import ( @@ -34,7 +36,11 @@ import ( invocation_impl "github.com/apache/dubbo-go/protocol/invocation" ) -var Err_No_Reply = perrors.New("request need @response") +var ( + // ErrNoReply ... + ErrNoReply = perrors.New("request need @response") + ErrDestroyedInvoker = perrors.New("request Destroyed invoker") +) var ( attachmentKey = []string{constant.INTERFACE_KEY, constant.GROUP_KEY, constant.TOKEN_KEY, constant.TIMEOUT_KEY} @@ -44,12 +50,15 @@ type DubboInvoker struct { protocol.BaseInvoker client *Client quitOnce sync.Once + // Used to record the number of requests. -1 represent this DubboInvoker is destroyed + reqNum int64 } func NewDubboInvoker(url common.URL, client *Client) *DubboInvoker { return &DubboInvoker{ BaseInvoker: *protocol.NewBaseInvoker(url), client: client, + reqNum: 0, } } @@ -59,6 +68,15 @@ func (di *DubboInvoker) Invoke(invocation protocol.Invocation) protocol.Result { err error result protocol.RPCResult ) + if di.reqNum < 0 { + // Generally, the case will not happen, because the invoker has been removed + // from the invoker list before destroy,so no new request will enter the destroyed invoker + logger.Warnf("this dubboInvoker is destroyed") + result.Err = ErrDestroyedInvoker + return &result + } + atomic.AddInt64(&(di.reqNum), 1) + defer atomic.AddInt64(&(di.reqNum), -1) inv := invocation.(*invocation_impl.RPCInvocation) for _, k := range attachmentKey { @@ -82,7 +100,7 @@ func (di *DubboInvoker) Invoke(invocation protocol.Invocation) protocol.Result { } } else { if inv.Reply() == nil { - result.Err = Err_No_Reply + result.Err = ErrNoReply } else { result.Err = di.client.Call(NewRequest(url.Location, url, inv.MethodName(), inv.Arguments(), inv.Attachments()), response) } @@ -98,10 +116,20 @@ func (di *DubboInvoker) Invoke(invocation protocol.Invocation) protocol.Result { func (di *DubboInvoker) Destroy() { di.quitOnce.Do(func() { - di.BaseInvoker.Destroy() - - if di.client != nil { - di.client.Close() + for { + if di.reqNum == 0 { + di.reqNum = -1 + logger.Info("dubboInvoker is destroyed,url:{%s}", di.GetUrl().Key()) + di.BaseInvoker.Destroy() + if di.client != nil { + di.client.Close() + di.client = nil + } + break + } + logger.Warnf("DubboInvoker is to be destroyed, wait {%v} req end,url:{%s}", di.reqNum, di.GetUrl().Key()) + time.Sleep(1 * time.Second) } + }) } diff --git a/registry/directory/directory.go b/registry/directory/directory.go index e88c611f6f..d60a340ac1 100644 --- a/registry/directory/directory.go +++ b/registry/directory/directory.go @@ -106,7 +106,10 @@ func (dir *registryDirectory) update(res *registry.ServiceEvent) { } func (dir *registryDirectory) refreshInvokers(res *registry.ServiceEvent) { - var url *common.URL + var ( + url *common.URL + oldInvoker protocol.Invoker = nil + ) //judge is override or others if res != nil { url = &res.Service @@ -123,10 +126,10 @@ func (dir *registryDirectory) refreshInvokers(res *registry.ServiceEvent) { switch res.Action { case remoting.EventTypeAdd, remoting.EventTypeUpdate: //dir.cacheService.EventTypeAdd(res.Path, dir.serviceTTL) - dir.cacheInvoker(url) + oldInvoker = dir.cacheInvoker(url) case remoting.EventTypeDel: //dir.cacheService.EventTypeDel(res.Path, dir.serviceTTL) - dir.uncacheInvoker(url) + oldInvoker = dir.uncacheInvoker(url) logger.Infof("selector delete service url{%s}", res.Service) default: return @@ -135,8 +138,14 @@ func (dir *registryDirectory) refreshInvokers(res *registry.ServiceEvent) { newInvokers := dir.toGroupInvokers() dir.listenerLock.Lock() - defer dir.listenerLock.Unlock() dir.cacheInvokers = newInvokers + dir.listenerLock.Unlock() + // After dir.cacheInvokers is updated,destroy the oldInvoker + // Ensure that no request will enter the oldInvoker + if oldInvoker != nil { + oldInvoker.Destroy() + } + } func (dir *registryDirectory) toGroupInvokers() []protocol.Invoker { @@ -174,12 +183,18 @@ func (dir *registryDirectory) toGroupInvokers() []protocol.Invoker { return groupInvokersList } -func (dir *registryDirectory) uncacheInvoker(url *common.URL) { +// uncacheInvoker return abandoned Invoker,if no Invoker to be abandoned,return nil +func (dir *registryDirectory) uncacheInvoker(url *common.URL) protocol.Invoker { logger.Debugf("service will be deleted in cache invokers: invokers key is %s!", url.Key()) - dir.cacheInvokersMap.Delete(url.Key()) + if cacheInvoker, ok := dir.cacheInvokersMap.Load(url.Key()); ok { + dir.cacheInvokersMap.Delete(url.Key()) + return cacheInvoker.(protocol.Invoker) + } + return nil } -func (dir *registryDirectory) cacheInvoker(url *common.URL) { +// cacheInvoker return abandoned Invoker,if no Invoker to be abandoned,return nil +func (dir *registryDirectory) cacheInvoker(url *common.URL) protocol.Invoker { dir.overrideUrl(dir.GetDirectoryUrl()) referenceUrl := dir.GetDirectoryUrl().SubURL @@ -190,7 +205,7 @@ func (dir *registryDirectory) cacheInvoker(url *common.URL) { } if url == nil { logger.Error("URL is nil ,pls check if service url is subscribe successfully!") - return + return nil } //check the url's protocol is equal to the protocol which is configured in reference config or referenceUrl is not care about protocol if url.Protocol == referenceUrl.Protocol || referenceUrl.Protocol == "" { @@ -207,10 +222,11 @@ func (dir *registryDirectory) cacheInvoker(url *common.URL) { newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(*newUrl) if newInvoker != nil { dir.cacheInvokersMap.Store(newUrl.Key(), newInvoker) - cacheInvoker.(protocol.Invoker).Destroy() + return cacheInvoker.(protocol.Invoker) } } } + return nil } //select the protocol invokers from the directory @@ -235,10 +251,11 @@ func (dir *registryDirectory) IsAvailable() bool { func (dir *registryDirectory) Destroy() { //TODO:unregister & unsubscribe dir.BaseDirectory.Destroy(func() { - for _, ivk := range dir.cacheInvokers { + invokers := dir.cacheInvokers + dir.cacheInvokers = []protocol.Invoker{} + for _, ivk := range invokers { ivk.Destroy() } - dir.cacheInvokers = []protocol.Invoker{} }) }