Skip to content

Commit

Permalink
Merge pull request #1045 from apache/feature/dubbo_invoker_reqnum
Browse files Browse the repository at this point in the history
Imp: destroy invoker smoothly
  • Loading branch information
AlexStocks authored Feb 22, 2021
2 parents d72fcd4 + d84ece5 commit 7d52dd1
Show file tree
Hide file tree
Showing 6 changed files with 195 additions and 61 deletions.
90 changes: 53 additions & 37 deletions protocol/dubbo/dubbo_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,11 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
)

import (
"github.com/opentracing/opentracing-go"
perrors "github.com/pkg/errors"
)

import (
Expand All @@ -41,28 +39,20 @@ import (
"github.com/apache/dubbo-go/remoting"
)

var (
// ErrNoReply
ErrNoReply = perrors.New("request need @response")
// ErrDestroyedInvoker
ErrDestroyedInvoker = perrors.New("request Destroyed invoker")
)

var (
attachmentKey = []string{constant.INTERFACE_KEY, constant.GROUP_KEY, constant.TOKEN_KEY, constant.TIMEOUT_KEY,
constant.VERSION_KEY}
)

// DubboInvoker is implement of protocol.Invoker. A dubboInvoker refer to one service and ip.
// DubboInvoker is implement of protocol.Invoker. A dubboInvoker refers to one service and ip.
type DubboInvoker struct {
protocol.BaseInvoker
// the exchange layer, it is focus on network communication.
client *remoting.ExchangeClient
quitOnce sync.Once
clientGuard *sync.RWMutex
client *remoting.ExchangeClient
quitOnce sync.Once
// timeout for service(interface) level.
timeout time.Duration
// Used to record the number of requests. -1 represent this DubboInvoker is destroyed
reqNum int64
}

// NewDubboInvoker constructor
Expand All @@ -73,12 +63,28 @@ func NewDubboInvoker(url *common.URL, client *remoting.ExchangeClient) *DubboInv
if t, err := time.ParseDuration(requestTimeoutStr); err == nil {
requestTimeout = t
}
return &DubboInvoker{
di := &DubboInvoker{
BaseInvoker: *protocol.NewBaseInvoker(url),
clientGuard: &sync.RWMutex{},
client: client,
reqNum: 0,
timeout: requestTimeout,
}

return di
}

func (di *DubboInvoker) setClient(client *remoting.ExchangeClient) {
di.clientGuard.Lock()
defer di.clientGuard.Unlock()

di.client = client
}

func (di *DubboInvoker) getClient() *remoting.ExchangeClient {
di.clientGuard.RLock()
defer di.clientGuard.RUnlock()

return di.client
}

// Invoke call remoting.
Expand All @@ -87,15 +93,30 @@ func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocati
err error
result protocol.RPCResult
)
if atomic.LoadInt64(&di.reqNum) < 0 {
if !di.BaseInvoker.IsAvailable() {
// Generally, the case will not happen, because the invoker has been removed
// from the invoker list before destroy,so no new request will enter the destroyed invoker
logger.Warnf("this dubboInvoker is destroyed")
result.Err = ErrDestroyedInvoker
result.Err = protocol.ErrDestroyedInvoker
return &result
}

di.clientGuard.RLock()
defer di.clientGuard.RUnlock()

if di.client == nil {
result.Err = protocol.ErrClientClosed
logger.Debugf("result.Err: %v", result.Err)
return &result
}

if !di.BaseInvoker.IsAvailable() {
// Generally, the case will not happen, because the invoker has been removed
// from the invoker list before destroy,so no new request will enter the destroyed invoker
logger.Warnf("this dubboInvoker is destroying")
result.Err = protocol.ErrDestroyedInvoker
return &result
}
atomic.AddInt64(&(di.reqNum), 1)
defer atomic.AddInt64(&(di.reqNum), -1)

inv := invocation.(*invocation_impl.RPCInvocation)
// init param
Expand Down Expand Up @@ -125,14 +146,13 @@ func (di *DubboInvoker) Invoke(ctx context.Context, invocation protocol.Invocati
timeout := di.getTimeout(inv)
if async {
if callBack, ok := inv.CallBack().(func(response common.CallbackResponse)); ok {
//result.Err = di.client.AsyncCall(NewRequest(url.Location, url, inv.MethodName(), inv.Arguments(), inv.Attachments()), callBack, response)
result.Err = di.client.AsyncRequest(&invocation, url, timeout, callBack, rest)
} else {
result.Err = di.client.Send(&invocation, url, timeout)
}
} else {
if inv.Reply() == nil {
result.Err = ErrNoReply
result.Err = protocol.ErrNoReply
} else {
result.Err = di.client.Request(&invocation, url, timeout, rest)
}
Expand Down Expand Up @@ -162,27 +182,23 @@ func (di *DubboInvoker) getTimeout(invocation *invocation_impl.RPCInvocation) ti
}

func (di *DubboInvoker) IsAvailable() bool {
return di.client.IsAvailable()
client := di.getClient()
if client != nil {
return client.IsAvailable()
}

return false
}

// Destroy destroy dubbo client invoker.
func (di *DubboInvoker) Destroy() {
di.quitOnce.Do(func() {
for {
if di.reqNum == 0 {
di.reqNum = -1
logger.Infof("dubboInvoker is destroyed,url:{%s}", di.GetUrl().Key())
di.BaseInvoker.Destroy()
if di.client != nil {
di.client.Close()
di.client = nil
}
break
}
logger.Warnf("DubboInvoker is to be destroyed, wait {%v} req end,url:{%s}", di.reqNum, di.GetUrl().Key())
time.Sleep(1 * time.Second)
di.BaseInvoker.Destroy()
client := di.getClient()
if client != nil {
di.setClient(nil)
client.Close()
}

})
}

Expand Down
73 changes: 63 additions & 10 deletions protocol/grpc/grpc_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,40 +24,82 @@ import (
)

import (
hessian2 "github.com/apache/dubbo-go-hessian2"
"github.com/pkg/errors"
"google.golang.org/grpc/connectivity"
)

import (
hessian2 "github.com/apache/dubbo-go-hessian2"

"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/protocol"
)

var errNoReply = errors.New("request need @response")
var (
errNoReply = errors.New("request need @response")
)

// nolint
type GrpcInvoker struct {
protocol.BaseInvoker
quitOnce sync.Once
client *Client
quitOnce sync.Once
clientGuard *sync.RWMutex
client *Client
}

// NewGrpcInvoker returns a Grpc invoker instance
func NewGrpcInvoker(url *common.URL, client *Client) *GrpcInvoker {
return &GrpcInvoker{
BaseInvoker: *protocol.NewBaseInvoker(url),
clientGuard: &sync.RWMutex{},
client: client,
}
}

func (gi *GrpcInvoker) setClient(client *Client) {
gi.clientGuard.Lock()
defer gi.clientGuard.Unlock()

gi.client = client
}

func (gi *GrpcInvoker) getClient() *Client {
gi.clientGuard.RLock()
defer gi.clientGuard.RUnlock()

return gi.client
}

// Invoke is used to call service method by invocation
func (gi *GrpcInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result {
var (
result protocol.RPCResult
)

if !gi.BaseInvoker.IsAvailable() {
// Generally, the case will not happen, because the invoker has been removed
// from the invoker list before destroy,so no new request will enter the destroyed invoker
logger.Warnf("this grpcInvoker is destroyed")
result.Err = protocol.ErrDestroyedInvoker
return &result
}

gi.clientGuard.RLock()
defer gi.clientGuard.RUnlock()

if gi.client == nil {
result.Err = protocol.ErrClientClosed
return &result
}

if !gi.BaseInvoker.IsAvailable() {
// Generally, the case will not happen, because the invoker has been removed
// from the invoker list before destroy,so no new request will enter the destroyed invoker
logger.Warnf("this grpcInvoker is destroying")
result.Err = protocol.ErrDestroyedInvoker
return &result
}

if invocation.Reply() == nil {
result.Err = errNoReply
}
Expand All @@ -83,21 +125,32 @@ func (gi *GrpcInvoker) Invoke(ctx context.Context, invocation protocol.Invocatio

// IsAvailable get available status
func (gi *GrpcInvoker) IsAvailable() bool {
return gi.BaseInvoker.IsAvailable() && gi.client.GetState() != connectivity.Shutdown
client := gi.getClient()
if client != nil {
return gi.BaseInvoker.IsAvailable() && client.GetState() != connectivity.Shutdown
}

return false
}

// IsDestroyed get destroyed status
func (gi *GrpcInvoker) IsDestroyed() bool {
return gi.BaseInvoker.IsDestroyed() && gi.client.GetState() == connectivity.Shutdown
client := gi.getClient()
if client != nil {
return gi.BaseInvoker.IsDestroyed() && client.GetState() == connectivity.Shutdown
}

return false
}

// Destroy will destroy gRPC's invoker and client, so it is only called once
func (gi *GrpcInvoker) Destroy() {
gi.quitOnce.Do(func() {
gi.BaseInvoker.Destroy()

if gi.client != nil {
_ = gi.client.Close()
client := gi.getClient()
if client != nil {
gi.setClient(nil)
client.Close()
}
})
}
38 changes: 27 additions & 11 deletions protocol/invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,25 @@ import (
"context"
)

import (
perrors "github.com/pkg/errors"
uatomic "go.uber.org/atomic"
)

import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/logger"
)

var (
// ErrClientClosed means client has clossed.
ErrClientClosed = perrors.New("remoting client has closed")
// ErrNoReply
ErrNoReply = perrors.New("request need @response")
// ErrDestroyedInvoker
ErrDestroyedInvoker = perrors.New("request Destroyed invoker")
)

// Invoker the service invocation interface for the consumer
//go:generate mockgen -source invoker.go -destination mock/mock_invoker.go -self_package github.com/apache/dubbo-go/protocol/mock --package mock Invoker
// Extension - Invoker
Expand All @@ -42,17 +56,19 @@ type Invoker interface {
// BaseInvoker provides default invoker implement
type BaseInvoker struct {
url *common.URL
available bool
destroyed bool
available uatomic.Bool
destroyed uatomic.Bool
}

// NewBaseInvoker creates a new BaseInvoker
func NewBaseInvoker(url *common.URL) *BaseInvoker {
return &BaseInvoker{
url: url,
available: true,
destroyed: false,
ivk := &BaseInvoker{
url: url,
}
ivk.available.Store(true)
ivk.destroyed.Store(false)

return ivk
}

// GetUrl gets base invoker URL
Expand All @@ -62,12 +78,12 @@ func (bi *BaseInvoker) GetUrl() *common.URL {

// IsAvailable gets available flag
func (bi *BaseInvoker) IsAvailable() bool {
return bi.available
return bi.available.Load()
}

// IsDestroyed gets destroyed flag
func (bi *BaseInvoker) IsDestroyed() bool {
return bi.destroyed
return bi.destroyed.Load()
}

// Invoke provides default invoker implement
Expand All @@ -77,7 +93,7 @@ func (bi *BaseInvoker) Invoke(context context.Context, invocation Invocation) Re

// Destroy changes available and destroyed flag
func (bi *BaseInvoker) Destroy() {
logger.Infof("Destroy invoker: %s", bi.GetUrl().String())
bi.destroyed = true
bi.available = false
logger.Infof("Destroy invoker: %s", bi.GetUrl())
bi.destroyed.Store(true)
bi.available.Store(false)
}
Loading

0 comments on commit 7d52dd1

Please sign in to comment.