Skip to content

Commit

Permalink
using lock instead atomic reqNum
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexStocks committed Feb 11, 2021
1 parent 1f3b81b commit f15b14e
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 89 deletions.
58 changes: 22 additions & 36 deletions protocol/dubbo/dubbo_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,14 @@ func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocati
return &result
}

di.AddInvokerTimes(1)
defer di.AddInvokerTimes(-1)
di.clientGuard.RLock()
defer di.clientGuard.RUnlock()

if di.client == nil {
result.Err = protocol.ErrClientClosed
logger.Debugf("result.Err: %v", result.Err)
return &result
}

if !di.BaseInvoker.IsAvailable() {
// Generally, the case will not happen, because the invoker has been removed
Expand Down Expand Up @@ -138,22 +144,17 @@ func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocati
//response := NewResponse(inv.Reply(), nil)
rest := &protocol.RPCResult{}
timeout := di.getTimeout(inv)
client := di.getClient()
if client == nil {
result.Err = protocol.ErrClientClosed
if async {
if callBack, ok := inv.CallBack().(func(response common.CallbackResponse)); ok {
result.Err = di.client.AsyncRequest(&invocation, url, timeout, callBack, rest)
} else {
result.Err = di.client.Send(&invocation, url, timeout)
}
} else {
if async {
if callBack, ok := inv.CallBack().(func(response common.CallbackResponse)); ok {
result.Err = client.AsyncRequest(&invocation, url, timeout, callBack, rest)
} else {
result.Err = client.Send(&invocation, url, timeout)
}
if inv.Reply() == nil {
result.Err = protocol.ErrNoReply
} else {
if inv.Reply() == nil {
result.Err = protocol.ErrNoReply
} else {
result.Err = client.Request(&invocation, url, timeout, rest)
}
result.Err = di.client.Request(&invocation, url, timeout, rest)
}
}
if result.Err == nil {
Expand Down Expand Up @@ -192,26 +193,11 @@ func (di *DubboInvoker) IsAvailable() bool {
// Destroy destroy dubbo client invoker.
func (di *DubboInvoker) Destroy() {
di.quitOnce.Do(func() {
di.BaseInvoker.Stop()
var times int64
for {
times = di.BaseInvoker.InvokeTimes()
if times == 0 {
di.BaseInvoker.AddInvokerTimes(-1)
logger.Infof("dubboInvoker is destroyed, url:{%s}", di.GetUrl().Key())
di.BaseInvoker.Destroy()
client := di.getClient()
if client != nil {
di.setClient(nil)
client.Close()
}
break
} else if times < 0 {
logger.Infof("impossible log: dubboInvoker has destroyed, url:{%s}", di.GetUrl().Key())
break
}
logger.Warnf("DubboInvoker is to be destroyed, wait {%v} req end,url:{%s}", times, di.GetUrl().Key())
time.Sleep(1 * time.Second)
di.BaseInvoker.Destroy()
client := di.getClient()
if client != nil {
di.setClient(nil)
client.Close()
}
})
}
Expand Down
35 changes: 9 additions & 26 deletions protocol/grpc/grpc_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"context"
"reflect"
"sync"
"time"
)

import (
Expand Down Expand Up @@ -85,11 +84,10 @@ func (gi *GrpcInvoker) Invoke(ctx context.Context, invocation protocol.Invocatio
return &result
}

gi.AddInvokerTimes(1)
defer gi.AddInvokerTimes(-1)
gi.clientGuard.RLock()
defer gi.clientGuard.RUnlock()

client := gi.getClient()
if client == nil {
if gi.client == nil {
result.Err = protocol.ErrClientClosed
return &result
}
Expand All @@ -111,7 +109,7 @@ func (gi *GrpcInvoker) Invoke(ctx context.Context, invocation protocol.Invocatio
in = append(in, invocation.ParameterValues()...)

methodName := invocation.MethodName()
method := client.invoker.MethodByName(methodName)
method := gi.client.invoker.MethodByName(methodName)
res := method.Call(in)

result.Rest = res[0]
Expand Down Expand Up @@ -148,26 +146,11 @@ func (gi *GrpcInvoker) IsDestroyed() bool {
// Destroy will destroy gRPC's invoker and client, so it is only called once
func (gi *GrpcInvoker) Destroy() {
gi.quitOnce.Do(func() {
gi.BaseInvoker.Stop()
var times int64
for {
times = gi.BaseInvoker.InvokeTimes()
if times == 0 {
gi.BaseInvoker.AddInvokerTimes(-1)
logger.Infof("grpcInvoker is destroyed, url:{%s}", gi.GetUrl().Key())
gi.BaseInvoker.Destroy()
client := gi.getClient()
if client != nil {
gi.setClient(nil)
client.Close()
}
break
} else if times < 0 {
logger.Infof("impossible log: grpcInvoker has destroyed, url:{%s}", gi.GetUrl().Key())
break
}
logger.Warnf("GrpcInvoker is to be destroyed, wait {%v} req end, url:{%s}", times, gi.GetUrl().Key())
time.Sleep(1 * time.Second)
gi.BaseInvoker.Destroy()
client := gi.getClient()
if client != nil {
gi.setClient(nil)
client.Close()
}
})
}
19 changes: 0 additions & 19 deletions protocol/invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,6 @@ type BaseInvoker struct {
url *common.URL
available uatomic.Bool
destroyed uatomic.Bool
// Used to record the number of requests. -1 represent this invoker is destroyed
ivkNum uatomic.Int64
}

// NewBaseInvoker creates a new BaseInvoker
Expand All @@ -69,7 +67,6 @@ func NewBaseInvoker(url *common.URL) *BaseInvoker {
}
ivk.available.Store(true)
ivk.destroyed.Store(false)
ivk.ivkNum.Store(0)

return ivk
}
Expand All @@ -89,27 +86,11 @@ func (bi *BaseInvoker) IsDestroyed() bool {
return bi.destroyed.Load()
}

// InvokeTimes atomically loads the wrapped value and return the invoke times.
func (bi *BaseInvoker) InvokeTimes() int64 {
return bi.ivkNum.Load()
}

// AddInvokerTimes atomically adds to the wrapped int64 and returns the new value.
func (bi *BaseInvoker) AddInvokerTimes(num int64) int64 {
return bi.ivkNum.Add(num)
}

// Invoke provides default invoker implement
func (bi *BaseInvoker) Invoke(context context.Context, invocation Invocation) Result {
return &RPCResult{}
}

// Stop changes available flag
func (bi *BaseInvoker) Stop() {
logger.Infof("Stop invoker: %s", bi.GetUrl())
bi.available.Store(false)
}

// Destroy changes available and destroyed flag
func (bi *BaseInvoker) Destroy() {
logger.Infof("Destroy invoker: %s", bi.GetUrl())
Expand Down
8 changes: 0 additions & 8 deletions protocol/invoker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,6 @@ func TestBaseInvoker(t *testing.T) {
assert.NotNil(t, ivk.GetUrl())
assert.True(t, ivk.IsAvailable())
assert.False(t, ivk.IsDestroyed())
assert.Zero(t, ivk.InvokeTimes())

ivk.AddInvokerTimes(1)
assert.True(t, ivk.InvokeTimes() == 1)

ivk.Stop()
assert.False(t, ivk.IsAvailable())
assert.False(t, ivk.IsDestroyed())

ivk.Destroy()
assert.False(t, ivk.IsAvailable())
Expand Down
5 changes: 5 additions & 0 deletions registry/nacos/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,11 @@ func (nr *nacosRegistry) subscribe(conf *common.URL) (registry.Listener, error)

// subscribe from registry
func (nr *nacosRegistry) Subscribe(url *common.URL, notifyListener registry.NotifyListener) error {
role, _ := strconv.Atoi(nr.URL.GetParam(constant.ROLE_KEY, ""))
if role != common.CONSUMER {
return nil
}

for {
if !nr.IsAvailable() {
logger.Warnf("event listener game over.")
Expand Down

0 comments on commit f15b14e

Please sign in to comment.