Skip to content

Commit

Permalink
destroy invoker smoothly
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexStocks committed Feb 10, 2021
1 parent ad31d97 commit 1f3b81b
Show file tree
Hide file tree
Showing 5 changed files with 248 additions and 65 deletions.
104 changes: 65 additions & 39 deletions protocol/dubbo/dubbo_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ import (

import (
"github.com/opentracing/opentracing-go"
perrors "github.com/pkg/errors"
uatomic "go.uber.org/atomic"
)

import (
Expand All @@ -41,13 +39,6 @@ import (
"github.com/apache/dubbo-go/remoting"
)

var (
// ErrNoReply
ErrNoReply = perrors.New("request need @response")
// ErrDestroyedInvoker
ErrDestroyedInvoker = perrors.New("request Destroyed invoker")
)

var (
attachmentKey = []string{constant.INTERFACE_KEY, constant.GROUP_KEY, constant.TOKEN_KEY, constant.TIMEOUT_KEY,
constant.VERSION_KEY}
Expand All @@ -57,13 +48,11 @@ var (
type DubboInvoker struct {
protocol.BaseInvoker
// the exchange layer, it is focus on network communication.
client *remoting.ExchangeClient
quitOnce sync.Once
clientGuard *sync.RWMutex
client *remoting.ExchangeClient
quitOnce sync.Once
// timeout for service(interface) level.
timeout time.Duration
// Used to record the number of requests. -1 represent this DubboInvoker is destroyed
reqNum uatomic.Int64
closed uatomic.Bool
}

// NewDubboInvoker constructor
Expand All @@ -76,30 +65,52 @@ func NewDubboInvoker(url *common.URL, client *remoting.ExchangeClient) *DubboInv
}
di := &DubboInvoker{
BaseInvoker: *protocol.NewBaseInvoker(url),
clientGuard: &sync.RWMutex{},
client: client,
timeout: requestTimeout,
}
di.reqNum.Store(0)
di.closed.Store(false)

return di
}

func (di *DubboInvoker) setClient(client *remoting.ExchangeClient) {
di.clientGuard.Lock()
defer di.clientGuard.Unlock()

di.client = client
}

func (di *DubboInvoker) getClient() *remoting.ExchangeClient {
di.clientGuard.RLock()
defer di.clientGuard.RUnlock()

return di.client
}

// Invoke call remoting.
func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
var (
err error
result protocol.RPCResult
)
if di.closed.Load() {
if !di.BaseInvoker.IsAvailable() {
// Generally, the case will not happen, because the invoker has been removed
// from the invoker list before destroy,so no new request will enter the destroyed invoker
logger.Warnf("this dubboInvoker is destroyed")
result.Err = ErrDestroyedInvoker
result.Err = protocol.ErrDestroyedInvoker
return &result
}

di.AddInvokerTimes(1)
defer di.AddInvokerTimes(-1)

if !di.BaseInvoker.IsAvailable() {
// Generally, the case will not happen, because the invoker has been removed
// from the invoker list before destroy,so no new request will enter the destroyed invoker
logger.Warnf("this dubboInvoker is destroying")
result.Err = protocol.ErrDestroyedInvoker
return &result
}
di.reqNum.Add(1)
defer di.reqNum.Add(-1)

inv := invocation.(*invocation_impl.RPCInvocation)
// init param
Expand Down Expand Up @@ -127,18 +138,22 @@ func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocati
//response := NewResponse(inv.Reply(), nil)
rest := &protocol.RPCResult{}
timeout := di.getTimeout(inv)
if async {
if callBack, ok := inv.CallBack().(func(response common.CallbackResponse)); ok {
//result.Err = di.client.AsyncCall(NewRequest(url.Location, url, inv.MethodName(), inv.Arguments(), inv.Attachments()), callBack, response)
result.Err = di.client.AsyncRequest(&invocation, url, timeout, callBack, rest)
} else {
result.Err = di.client.Send(&invocation, url, timeout)
}
client := di.getClient()
if client == nil {
result.Err = protocol.ErrClientClosed
} else {
if inv.Reply() == nil {
result.Err = ErrNoReply
if async {
if callBack, ok := inv.CallBack().(func(response common.CallbackResponse)); ok {
result.Err = client.AsyncRequest(&invocation, url, timeout, callBack, rest)
} else {
result.Err = client.Send(&invocation, url, timeout)
}
} else {
result.Err = di.client.Request(&invocation, url, timeout, rest)
if inv.Reply() == nil {
result.Err = protocol.ErrNoReply
} else {
result.Err = client.Request(&invocation, url, timeout, rest)
}
}
}
if result.Err == nil {
Expand Down Expand Up @@ -166,25 +181,36 @@ func (di *DubboInvoker) getTimeout(invocation *invocation_impl.RPCInvocation) ti
}

func (di *DubboInvoker) IsAvailable() bool {
return di.client.IsAvailable()
client := di.getClient()
if client != nil {
return client.IsAvailable()
}

return false
}

// Destroy destroy dubbo client invoker.
func (di *DubboInvoker) Destroy() {
di.quitOnce.Do(func() {
di.closed.Store(true)
di.BaseInvoker.Stop()
var times int64
for {
if di.reqNum.Load() == 0 {
di.reqNum.Add(-1)
logger.Infof("dubboInvoker is destroyed,url:{%s}", di.GetUrl().Key())
times = di.BaseInvoker.InvokeTimes()
if times == 0 {
di.BaseInvoker.AddInvokerTimes(-1)
logger.Infof("dubboInvoker is destroyed, url:{%s}", di.GetUrl().Key())
di.BaseInvoker.Destroy()
if di.client != nil {
di.client.Close()
di.client = nil
client := di.getClient()
if client != nil {
di.setClient(nil)
client.Close()
}
break
} else if times < 0 {
logger.Infof("impossible log: dubboInvoker has destroyed, url:{%s}", di.GetUrl().Key())
break
}
logger.Warnf("DubboInvoker is to be destroyed, wait {%v} req end,url:{%s}", di.reqNum.Load(), di.GetUrl().Key())
logger.Warnf("DubboInvoker is to be destroyed, wait {%v} req end,url:{%s}", times, di.GetUrl().Key())
time.Sleep(1 * time.Second)
}
})
Expand Down
94 changes: 82 additions & 12 deletions protocol/grpc/grpc_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,43 +21,87 @@ import (
"context"
"reflect"
"sync"
"time"
)

import (
hessian2 "github.com/apache/dubbo-go-hessian2"
"github.com/pkg/errors"
"google.golang.org/grpc/connectivity"
)

import (
hessian2 "github.com/apache/dubbo-go-hessian2"

"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/protocol"
)

var errNoReply = errors.New("request need @response")
var (
errNoReply = errors.New("request need @response")
)

// nolint
type GrpcInvoker struct {
protocol.BaseInvoker
quitOnce sync.Once
client *Client
quitOnce sync.Once
clientGuard *sync.RWMutex
client *Client
}

// NewGrpcInvoker returns a Grpc invoker instance
func NewGrpcInvoker(url *common.URL, client *Client) *GrpcInvoker {
return &GrpcInvoker{
BaseInvoker: *protocol.NewBaseInvoker(url),
clientGuard: &sync.RWMutex{},
client: client,
}
}

func (gi *GrpcInvoker) setClient(client *Client) {
gi.clientGuard.Lock()
defer gi.clientGuard.Unlock()

gi.client = client
}

func (gi *GrpcInvoker) getClient() *Client {
gi.clientGuard.RLock()
defer gi.clientGuard.RUnlock()

return gi.client
}

// Invoke is used to call service method by invocation
func (gi *GrpcInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
var (
result protocol.RPCResult
)

if !gi.BaseInvoker.IsAvailable() {
// Generally, the case will not happen, because the invoker has been removed
// from the invoker list before destroy,so no new request will enter the destroyed invoker
logger.Warnf("this grpcInvoker is destroyed")
result.Err = protocol.ErrDestroyedInvoker
return &result
}

gi.AddInvokerTimes(1)
defer gi.AddInvokerTimes(-1)

client := gi.getClient()
if client == nil {
result.Err = protocol.ErrClientClosed
return &result
}

if !gi.BaseInvoker.IsAvailable() {
// Generally, the case will not happen, because the invoker has been removed
// from the invoker list before destroy,so no new request will enter the destroyed invoker
logger.Warnf("this grpcInvoker is destroying")
result.Err = protocol.ErrDestroyedInvoker
return &result
}

if invocation.Reply() == nil {
result.Err = errNoReply
}
Expand All @@ -67,7 +111,7 @@ func (gi *GrpcInvoker) Invoke(ctx context.Context, invocation protocol.Invocatio
in = append(in, invocation.ParameterValues()...)

methodName := invocation.MethodName()
method := gi.client.invoker.MethodByName(methodName)
method := client.invoker.MethodByName(methodName)
res := method.Call(in)

result.Rest = res[0]
Expand All @@ -83,21 +127,47 @@ func (gi *GrpcInvoker) Invoke(ctx context.Context, invocation protocol.Invocatio

// IsAvailable get available status
func (gi *GrpcInvoker) IsAvailable() bool {
return gi.BaseInvoker.IsAvailable() && gi.client.GetState() != connectivity.Shutdown
client := gi.getClient()
if client != nil {
return gi.BaseInvoker.IsAvailable() && client.GetState() != connectivity.Shutdown
}

return false
}

// IsDestroyed get destroyed status
func (gi *GrpcInvoker) IsDestroyed() bool {
return gi.BaseInvoker.IsDestroyed() && gi.client.GetState() == connectivity.Shutdown
client := gi.getClient()
if client != nil {
return gi.BaseInvoker.IsDestroyed() && client.GetState() == connectivity.Shutdown
}

return false
}

// Destroy will destroy gRPC's invoker and client, so it is only called once
func (gi *GrpcInvoker) Destroy() {
gi.quitOnce.Do(func() {
gi.BaseInvoker.Destroy()

if gi.client != nil {
_ = gi.client.Close()
gi.BaseInvoker.Stop()
var times int64
for {
times = gi.BaseInvoker.InvokeTimes()
if times == 0 {
gi.BaseInvoker.AddInvokerTimes(-1)
logger.Infof("grpcInvoker is destroyed, url:{%s}", gi.GetUrl().Key())
gi.BaseInvoker.Destroy()
client := gi.getClient()
if client != nil {
gi.setClient(nil)
client.Close()
}
break
} else if times < 0 {
logger.Infof("impossible log: grpcInvoker has destroyed, url:{%s}", gi.GetUrl().Key())
break
}
logger.Warnf("GrpcInvoker is to be destroyed, wait {%v} req end, url:{%s}", times, gi.GetUrl().Key())
time.Sleep(1 * time.Second)
}
})
}
Loading

0 comments on commit 1f3b81b

Please sign in to comment.