Skip to content

Commit

Permalink
capture(ticdc): fix the problem that openapi is blocked when pd is ab…
Browse files Browse the repository at this point in the history
…normal (pingcap#4788)

close pingcap#4778
  • Loading branch information
CharlesCheung96 committed Apr 14, 2022
1 parent 96dc222 commit ddc1fd3
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 40 deletions.
16 changes: 10 additions & 6 deletions cdc/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,17 @@ func NewCapture(pdClient pd.Client, kvStorage tidbkv.Storage, etcdClient *kv.CDC
}

func (c *Capture) reset(ctx context.Context) error {
conf := config.GetGlobalServerConfig()
sess, err := concurrency.NewSession(c.etcdClient.Client.Unwrap(),
concurrency.WithTTL(conf.CaptureSessionTTL))
if err != nil {
return errors.Annotate(
cerror.WrapError(cerror.ErrNewCaptureFailed, err),
"create capture session")
}

c.captureMu.Lock()
defer c.captureMu.Unlock()
conf := config.GetGlobalServerConfig()
c.info = &model.CaptureInfo{
ID: uuid.New().String(),
AdvertiseAddr: conf.AdvertiseAddr,
Expand All @@ -93,11 +101,7 @@ func (c *Capture) reset(ctx context.Context) error {
// It can't be handled even after it fails, so we ignore it.
_ = c.session.Close()
}
sess, err := concurrency.NewSession(c.etcdClient.Client.Unwrap(),
concurrency.WithTTL(conf.CaptureSessionTTL))
if err != nil {
return errors.Annotate(cerror.WrapError(cerror.ErrNewCaptureFailed, err), "create capture session")
}

c.session = sess
c.election = concurrency.NewElection(sess, kv.CaptureOwnerKey)

Expand Down
14 changes: 2 additions & 12 deletions cdc/capture/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/logutil"
"github.com/pingcap/tiflow/pkg/retry"
"github.com/pingcap/tiflow/pkg/version"
"github.com/tikv/client-go/v2/oracle"
"go.uber.org/zap"
Expand All @@ -42,8 +41,6 @@ const (
apiOpVarCaptureID = "capture_id"
// forWardFromCapture is a header to be set when a request is forwarded from another capture
forWardFromCapture = "TiCDC-ForwardFromCapture"
// getOwnerRetryMaxTime is the retry max time to get an owner
getOwnerRetryMaxTime = 3
)

// HTTPHandler is a HTTPHandler of capture
Expand Down Expand Up @@ -698,16 +695,9 @@ func (h *HTTPHandler) forwardToOwner(c *gin.Context) {

var owner *model.CaptureInfo
// get owner
err := retry.Do(ctx, func() error {
o, err := h.capture.GetOwner(ctx)
if err != nil {
log.Info("get owner failed, retry later", zap.Error(err))
return err
}
owner = o
return nil
}, retry.WithBackoffBaseDelay(300), retry.WithMaxTries(getOwnerRetryMaxTime))
owner, err := h.capture.GetOwner(ctx)
if err != nil {
log.Info("get owner failed", zap.Error(err))
_ = c.Error(err)
return
}
Expand Down
4 changes: 4 additions & 0 deletions cdc/capture/main_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
<<<<<<< HEAD
// Copyright 2021 PingCAP, Inc.
=======
// Copyright 2022 PingCAP, Inc.
>>>>>>> 7fb7097e1 (capture(ticdc): fix the problem that openapi is blocked when pd is abnormal (#4788))
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down
68 changes: 46 additions & 22 deletions pkg/etcd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"github.com/pingcap/tiflow/pkg/errorutil"
"github.com/pingcap/tiflow/pkg/retry"
"github.com/prometheus/client_golang/prometheus"
"go.etcd.io/etcd/clientv3"
clientv3 "go.etcd.io/etcd/client/v3"
v3rpc "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -52,9 +52,9 @@ const (
etcdRequestProgressDuration = 1 * time.Second
// etcdWatchChBufferSize is arbitrarily specified, it will be modified in the future
etcdWatchChBufferSize = 16
// etcdTxnTimeoutDuration represents the timeout duration for committing a
// transaction to Etcd
etcdTxnTimeoutDuration = 30 * time.Second
// etcdClientTimeoutDuration represents the timeout duration for
// etcd client to execute a remote call
etcdClientTimeoutDuration = 30 * time.Second
)

var (
Expand Down Expand Up @@ -105,21 +105,29 @@ func retryRPC(rpcName string, metric prometheus.Counter, etcdRPC func() error) e
}, retry.WithBackoffBaseDelay(backoffBaseDelayInMs), retry.WithBackoffMaxDelay(backoffMaxDelayInMs), retry.WithMaxTries(maxTries), retry.WithIsRetryableErr(isRetryableError(rpcName)))
}

// Put delegates request to clientv3.KV.Put
func (c *Client) Put(ctx context.Context, key, val string, opts ...clientv3.OpOption) (resp *clientv3.PutResponse, err error) {
// Put delegates request to clientV3.KV.Put
func (c *Client) Put(
ctx context.Context, key, val string, opts ...clientV3.OpOption,
) (resp *clientV3.PutResponse, err error) {
putCtx, cancel := context.WithTimeout(ctx, etcdClientTimeoutDuration)
defer cancel()
err = retryRPC(EtcdPut, c.metrics[EtcdPut], func() error {
var inErr error
resp, inErr = c.cli.Put(ctx, key, val, opts...)
resp, inErr = c.cli.Put(putCtx, key, val, opts...)
return inErr
})
return
}

// Get delegates request to clientv3.KV.Get
func (c *Client) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (resp *clientv3.GetResponse, err error) {
// Get delegates request to clientV3.KV.Get
func (c *Client) Get(
ctx context.Context, key string, opts ...clientV3.OpOption,
) (resp *clientV3.GetResponse, err error) {
getCtx, cancel := context.WithTimeout(ctx, etcdClientTimeoutDuration)
defer cancel()
err = retryRPC(EtcdGet, c.metrics[EtcdGet], func() error {
var inErr error
resp, inErr = c.cli.Get(ctx, key, opts...)
resp, inErr = c.cli.Get(getCtx, key, opts...)
return inErr
})
return
Expand All @@ -130,8 +138,10 @@ func (c *Client) Delete(ctx context.Context, key string, opts ...clientv3.OpOpti
if metric, ok := c.metrics[EtcdDel]; ok {
metric.Inc()
}
delCtx, cancel := context.WithTimeout(ctx, etcdClientTimeoutDuration)
defer cancel()
// We don't retry on delete operation. It's dangerous.
return c.cli.Delete(ctx, key, opts...)
return c.cli.Delete(delCtx, key, opts...)
}

// TxnWithoutRetry delegates request to clientv3.KV.Txn
Expand All @@ -144,8 +154,10 @@ func (c *Client) TxnWithoutRetry(ctx context.Context) clientv3.Txn {

// Txn delegates request to clientv3.KV.Txn. The error returned can only be a non-retryable error,
// such as context.Canceled, context.DeadlineExceeded, errors.ErrReachMaxTry.
func (c *Client) Txn(ctx context.Context, cmps []clientv3.Cmp, opsThen, opsElse []clientv3.Op) (resp *clientv3.TxnResponse, err error) {
txnCtx, cancel := context.WithTimeout(ctx, etcdTxnTimeoutDuration)
func (c *Client) Txn(
ctx context.Context, cmps []clientV3.Cmp, opsThen, opsElse []clientV3.Op,
) (resp *clientV3.TxnResponse, err error) {
txnCtx, cancel := context.WithTimeout(ctx, etcdClientTimeoutDuration)
defer cancel()
err = retryRPC(EtcdTxn, c.metrics[EtcdTxn], func() error {
var inErr error
Expand All @@ -155,11 +167,15 @@ func (c *Client) Txn(ctx context.Context, cmps []clientv3.Cmp, opsThen, opsElse
return
}

// Grant delegates request to clientv3.Lease.Grant
func (c *Client) Grant(ctx context.Context, ttl int64) (resp *clientv3.LeaseGrantResponse, err error) {
// Grant delegates request to clientV3.Lease.Grant
func (c *Client) Grant(
ctx context.Context, ttl int64,
) (resp *clientV3.LeaseGrantResponse, err error) {
grantCtx, cancel := context.WithTimeout(ctx, etcdClientTimeoutDuration)
defer cancel()
err = retryRPC(EtcdGrant, c.metrics[EtcdGrant], func() error {
var inErr error
resp, inErr = c.cli.Grant(ctx, ttl)
resp, inErr = c.cli.Grant(grantCtx, ttl)
return inErr
})
return
Expand Down Expand Up @@ -187,21 +203,29 @@ func isRetryableError(rpcName string) retry.IsRetryable {
}
}

// Revoke delegates request to clientv3.Lease.Revoke
func (c *Client) Revoke(ctx context.Context, id clientv3.LeaseID) (resp *clientv3.LeaseRevokeResponse, err error) {
// Revoke delegates request to clientV3.Lease.Revoke
func (c *Client) Revoke(
ctx context.Context, id clientV3.LeaseID,
) (resp *clientV3.LeaseRevokeResponse, err error) {
revokeCtx, cancel := context.WithTimeout(ctx, etcdClientTimeoutDuration)
defer cancel()
err = retryRPC(EtcdRevoke, c.metrics[EtcdRevoke], func() error {
var inErr error
resp, inErr = c.cli.Revoke(ctx, id)
resp, inErr = c.cli.Revoke(revokeCtx, id)
return inErr
})
return
}

// TimeToLive delegates request to clientv3.Lease.TimeToLive
func (c *Client) TimeToLive(ctx context.Context, lease clientv3.LeaseID, opts ...clientv3.LeaseOption) (resp *clientv3.LeaseTimeToLiveResponse, err error) {
// TimeToLive delegates request to clientV3.Lease.TimeToLive
func (c *Client) TimeToLive(
ctx context.Context, lease clientV3.LeaseID, opts ...clientV3.LeaseOption,
) (resp *clientV3.LeaseTimeToLiveResponse, err error) {
timeToLiveCtx, cancel := context.WithTimeout(ctx, etcdClientTimeoutDuration)
defer cancel()
err = retryRPC(EtcdRevoke, c.metrics[EtcdRevoke], func() error {
var inErr error
resp, inErr = c.cli.TimeToLive(ctx, lease, opts...)
resp, inErr = c.cli.TimeToLive(timeToLiveCtx, lease, opts...)
return inErr
})
return
Expand Down

0 comments on commit ddc1fd3

Please sign in to comment.