From 9a1b1d87b0cba41a617873f3edaaeae8843fdfb1 Mon Sep 17 00:00:00 2001 From: CharlesCheung <61726649+CharlesCheung96@users.noreply.github.com> Date: Sat, 2 Apr 2022 18:36:29 +0800 Subject: [PATCH] capture(ticdc): fix the problem that openapi is blocked when pd is abnormal (#4788) close pingcap/tiflow#4778 --- cdc/capture/capture.go | 16 +++++++---- cdc/capture/http_handler.go | 14 ++-------- pkg/etcd/client.go | 56 ++++++++++++++++++++++++++----------- 3 files changed, 52 insertions(+), 34 deletions(-) diff --git a/cdc/capture/capture.go b/cdc/capture/capture.go index 0db5c21f2c3..510eda7e027 100644 --- a/cdc/capture/capture.go +++ b/cdc/capture/capture.go @@ -80,9 +80,17 @@ func NewCapture(pdClient pd.Client, kvStorage tidbkv.Storage, etcdClient *kv.CDC } func (c *Capture) reset(ctx context.Context) error { + conf := config.GetGlobalServerConfig() + sess, err := concurrency.NewSession(c.etcdClient.Client.Unwrap(), + concurrency.WithTTL(conf.CaptureSessionTTL)) + if err != nil { + return errors.Annotate( + cerror.WrapError(cerror.ErrNewCaptureFailed, err), + "create capture session") + } + c.captureMu.Lock() defer c.captureMu.Unlock() - conf := config.GetGlobalServerConfig() c.info = &model.CaptureInfo{ ID: uuid.New().String(), AdvertiseAddr: conf.AdvertiseAddr, @@ -93,11 +101,7 @@ func (c *Capture) reset(ctx context.Context) error { // It can't be handled even after it fails, so we ignore it. _ = c.session.Close() } - sess, err := concurrency.NewSession(c.etcdClient.Client.Unwrap(), - concurrency.WithTTL(conf.CaptureSessionTTL)) - if err != nil { - return errors.Annotate(cerror.WrapError(cerror.ErrNewCaptureFailed, err), "create capture session") - } + c.session = sess c.election = concurrency.NewElection(sess, kv.CaptureOwnerKey) diff --git a/cdc/capture/http_handler.go b/cdc/capture/http_handler.go index c1446b7c551..92d0e9772e6 100644 --- a/cdc/capture/http_handler.go +++ b/cdc/capture/http_handler.go @@ -27,7 +27,6 @@ import ( "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/logutil" - "github.com/pingcap/tiflow/pkg/retry" "github.com/pingcap/tiflow/pkg/version" "github.com/tikv/client-go/v2/oracle" "go.uber.org/zap" @@ -42,8 +41,6 @@ const ( apiOpVarCaptureID = "capture_id" // forWardFromCapture is a header to be set when a request is forwarded from another capture forWardFromCapture = "TiCDC-ForwardFromCapture" - // getOwnerRetryMaxTime is the retry max time to get an owner - getOwnerRetryMaxTime = 3 ) // HTTPHandler is a HTTPHandler of capture @@ -698,16 +695,9 @@ func (h *HTTPHandler) forwardToOwner(c *gin.Context) { var owner *model.CaptureInfo // get owner - err := retry.Do(ctx, func() error { - o, err := h.capture.GetOwner(ctx) - if err != nil { - log.Info("get owner failed, retry later", zap.Error(err)) - return err - } - owner = o - return nil - }, retry.WithBackoffBaseDelay(300), retry.WithMaxTries(getOwnerRetryMaxTime)) + owner, err := h.capture.GetOwner(ctx) if err != nil { + log.Info("get owner failed", zap.Error(err)) _ = c.Error(err) return } diff --git a/pkg/etcd/client.go b/pkg/etcd/client.go index 9c0642778de..07ce471df8f 100644 --- a/pkg/etcd/client.go +++ b/pkg/etcd/client.go @@ -52,9 +52,9 @@ const ( etcdRequestProgressDuration = 1 * time.Second // etcdWatchChBufferSize is arbitrarily specified, it will be modified in the future etcdWatchChBufferSize = 16 - // etcdTxnTimeoutDuration represents the timeout duration for committing a - // transaction to Etcd - etcdTxnTimeoutDuration = 30 * time.Second + // etcdClientTimeoutDuration represents the timeout duration for + // etcd client to execute a remote call + etcdClientTimeoutDuration = 30 * time.Second ) var ( @@ -106,20 +106,28 @@ func retryRPC(rpcName string, metric prometheus.Counter, etcdRPC func() error) e } // Put delegates request to clientv3.KV.Put -func (c *Client) Put(ctx context.Context, key, val string, opts ...clientv3.OpOption) (resp *clientv3.PutResponse, err error) { +func (c *Client) Put( + ctx context.Context, key, val string, opts ...clientv3.OpOption, +) (resp *clientv3.PutResponse, err error) { + putCtx, cancel := context.WithTimeout(ctx, etcdClientTimeoutDuration) + defer cancel() err = retryRPC(EtcdPut, c.metrics[EtcdPut], func() error { var inErr error - resp, inErr = c.cli.Put(ctx, key, val, opts...) + resp, inErr = c.cli.Put(putCtx, key, val, opts...) return inErr }) return } // Get delegates request to clientv3.KV.Get -func (c *Client) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (resp *clientv3.GetResponse, err error) { +func (c *Client) Get( + ctx context.Context, key string, opts ...clientv3.OpOption, +) (resp *clientv3.GetResponse, err error) { + getCtx, cancel := context.WithTimeout(ctx, etcdClientTimeoutDuration) + defer cancel() err = retryRPC(EtcdGet, c.metrics[EtcdGet], func() error { var inErr error - resp, inErr = c.cli.Get(ctx, key, opts...) + resp, inErr = c.cli.Get(getCtx, key, opts...) return inErr }) return @@ -130,8 +138,10 @@ func (c *Client) Delete(ctx context.Context, key string, opts ...clientv3.OpOpti if metric, ok := c.metrics[EtcdDel]; ok { metric.Inc() } + delCtx, cancel := context.WithTimeout(ctx, etcdClientTimeoutDuration) + defer cancel() // We don't retry on delete operation. It's dangerous. - return c.cli.Delete(ctx, key, opts...) + return c.cli.Delete(delCtx, key, opts...) } // TxnWithoutRetry delegates request to clientv3.KV.Txn @@ -144,8 +154,10 @@ func (c *Client) TxnWithoutRetry(ctx context.Context) clientv3.Txn { // Txn delegates request to clientv3.KV.Txn. The error returned can only be a non-retryable error, // such as context.Canceled, context.DeadlineExceeded, errors.ErrReachMaxTry. -func (c *Client) Txn(ctx context.Context, cmps []clientv3.Cmp, opsThen, opsElse []clientv3.Op) (resp *clientv3.TxnResponse, err error) { - txnCtx, cancel := context.WithTimeout(ctx, etcdTxnTimeoutDuration) +func (c *Client) Txn( + ctx context.Context, cmps []clientv3.Cmp, opsThen, opsElse []clientv3.Op, +) (resp *clientv3.TxnResponse, err error) { + txnCtx, cancel := context.WithTimeout(ctx, etcdClientTimeoutDuration) defer cancel() err = retryRPC(EtcdTxn, c.metrics[EtcdTxn], func() error { var inErr error @@ -156,10 +168,14 @@ func (c *Client) Txn(ctx context.Context, cmps []clientv3.Cmp, opsThen, opsElse } // Grant delegates request to clientv3.Lease.Grant -func (c *Client) Grant(ctx context.Context, ttl int64) (resp *clientv3.LeaseGrantResponse, err error) { +func (c *Client) Grant( + ctx context.Context, ttl int64, +) (resp *clientv3.LeaseGrantResponse, err error) { + grantCtx, cancel := context.WithTimeout(ctx, etcdClientTimeoutDuration) + defer cancel() err = retryRPC(EtcdGrant, c.metrics[EtcdGrant], func() error { var inErr error - resp, inErr = c.cli.Grant(ctx, ttl) + resp, inErr = c.cli.Grant(grantCtx, ttl) return inErr }) return @@ -188,20 +204,28 @@ func isRetryableError(rpcName string) retry.IsRetryable { } // Revoke delegates request to clientv3.Lease.Revoke -func (c *Client) Revoke(ctx context.Context, id clientv3.LeaseID) (resp *clientv3.LeaseRevokeResponse, err error) { +func (c *Client) Revoke( + ctx context.Context, id clientv3.LeaseID, +) (resp *clientv3.LeaseRevokeResponse, err error) { + revokeCtx, cancel := context.WithTimeout(ctx, etcdClientTimeoutDuration) + defer cancel() err = retryRPC(EtcdRevoke, c.metrics[EtcdRevoke], func() error { var inErr error - resp, inErr = c.cli.Revoke(ctx, id) + resp, inErr = c.cli.Revoke(revokeCtx, id) return inErr }) return } // TimeToLive delegates request to clientv3.Lease.TimeToLive -func (c *Client) TimeToLive(ctx context.Context, lease clientv3.LeaseID, opts ...clientv3.LeaseOption) (resp *clientv3.LeaseTimeToLiveResponse, err error) { +func (c *Client) TimeToLive( + ctx context.Context, lease clientv3.LeaseID, opts ...clientv3.LeaseOption, +) (resp *clientv3.LeaseTimeToLiveResponse, err error) { + timeToLiveCtx, cancel := context.WithTimeout(ctx, etcdClientTimeoutDuration) + defer cancel() err = retryRPC(EtcdRevoke, c.metrics[EtcdRevoke], func() error { var inErr error - resp, inErr = c.cli.TimeToLive(ctx, lease, opts...) + resp, inErr = c.cli.TimeToLive(timeToLiveCtx, lease, opts...) return inErr }) return