diff --git a/cdc/capture/capture.go b/cdc/capture/capture.go index 40cc46d3f43..1c241ad67b3 100644 --- a/cdc/capture/capture.go +++ b/cdc/capture/capture.go @@ -115,9 +115,17 @@ func NewCapture4Test() *Capture { } func (c *Capture) reset(ctx context.Context) error { + conf := config.GetGlobalServerConfig() + sess, err := concurrency.NewSession(c.etcdClient.Client.Unwrap(), + concurrency.WithTTL(conf.CaptureSessionTTL)) + if err != nil { + return errors.Annotate( + cerror.WrapError(cerror.ErrNewCaptureFailed, err), + "create capture session") + } + c.captureMu.Lock() defer c.captureMu.Unlock() - conf := config.GetGlobalServerConfig() c.info = &model.CaptureInfo{ ID: uuid.New().String(), AdvertiseAddr: conf.AdvertiseAddr, @@ -128,13 +136,7 @@ func (c *Capture) reset(ctx context.Context) error { // It can't be handled even after it fails, so we ignore it. _ = c.session.Close() } - sess, err := concurrency.NewSession(c.etcdClient.Client.Unwrap(), - concurrency.WithTTL(conf.CaptureSessionTTL)) - if err != nil { - return errors.Annotate( - cerror.WrapError(cerror.ErrNewCaptureFailed, err), - "create capture session") - } + c.session = sess c.election = concurrency.NewElection(sess, etcd.CaptureOwnerKey) diff --git a/cdc/capture/capture_test.go b/cdc/capture/capture_test.go new file mode 100644 index 00000000000..66555659424 --- /dev/null +++ b/cdc/capture/capture_test.go @@ -0,0 +1,62 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package capture + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/pingcap/tiflow/pkg/etcd" + "github.com/stretchr/testify/require" + "go.etcd.io/etcd/clientv3" +) + +func TestReset(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + + // init etcd mocker + clientURL, etcdServer, err := etcd.SetupEmbedEtcd(t.TempDir()) + require.Nil(t, err) + etcdCli, err := clientv3.New(clientv3.Config{ + Endpoints: []string{clientURL.String()}, + Context: ctx, + DialTimeout: 3 * time.Second, + }) + require.NoError(t, err) + client := etcd.NewCDCEtcdClient(ctx, etcdCli) + // Close the client before the test function exits to prevent possible + // ctx leaks. + // Ref: https://github.com/grpc/grpc-go/blob/master/stream.go#L229 + defer client.Close() + + cp := NewCapture4Test() + cp.etcdClient = &client + + // simulate network isolation scenarios + etcdServer.Close() + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + err = cp.reset(ctx) + require.Regexp(t, ".*context canceled.*", err) + wg.Done() + }() + time.Sleep(100 * time.Millisecond) + info := cp.Info() + require.NotNil(t, info) + cancel() + wg.Wait() +} diff --git a/cdc/capture/http_handler.go b/cdc/capture/http_handler.go index eb5249455f0..0ce703008af 100644 --- a/cdc/capture/http_handler.go +++ b/cdc/capture/http_handler.go @@ -26,7 +26,6 @@ import ( "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/logutil" - "github.com/pingcap/tiflow/pkg/retry" "github.com/pingcap/tiflow/pkg/version" "github.com/tikv/client-go/v2/oracle" "go.uber.org/zap" @@ -41,8 +40,6 @@ const ( apiOpVarCaptureID = "capture_id" // forWardFromCapture is a header to be set when a request is forwarded from another capture forWardFromCapture = "TiCDC-ForwardFromCapture" - // getOwnerRetryMaxTime is the retry max time to get an owner - getOwnerRetryMaxTime = 3 ) // HTTPHandler is a HTTPHandler of capture @@ -751,16 +748,9 @@ func (h *HTTPHandler) forwardToOwner(c *gin.Context) { var owner *model.CaptureInfo // get owner - err := retry.Do(ctx, func() error { - o, err := h.capture.GetOwner(ctx) - if err != nil { - log.Info("get owner failed, retry later", zap.Error(err)) - return err - } - owner = o - return nil - }, retry.WithBackoffBaseDelay(300), retry.WithMaxTries(getOwnerRetryMaxTime)) + owner, err := h.capture.GetOwner(ctx) if err != nil { + log.Info("get owner failed", zap.Error(err)) _ = c.Error(err) return } diff --git a/pkg/etcd/client.go b/pkg/etcd/client.go index bb4f50c197c..692e2da1e73 100644 --- a/pkg/etcd/client.go +++ b/pkg/etcd/client.go @@ -52,9 +52,9 @@ const ( etcdRequestProgressDuration = 1 * time.Second // etcdWatchChBufferSize is arbitrarily specified, it will be modified in the future etcdWatchChBufferSize = 16 - // etcdTxnTimeoutDuration represents the timeout duration for committing a - // transaction to Etcd - etcdTxnTimeoutDuration = 30 * time.Second + // etcdClientTimeoutDuration represents the timeout duration for + // etcd client to execute a remote call + etcdClientTimeoutDuration = 30 * time.Second ) var ( @@ -103,20 +103,28 @@ func retryRPC(rpcName string, metric prometheus.Counter, etcdRPC func() error) e } // Put delegates request to clientv3.KV.Put -func (c *Client) Put(ctx context.Context, key, val string, opts ...clientv3.OpOption) (resp *clientv3.PutResponse, err error) { +func (c *Client) Put( + ctx context.Context, key, val string, opts ...clientv3.OpOption, +) (resp *clientv3.PutResponse, err error) { + putCtx, cancel := context.WithTimeout(ctx, etcdClientTimeoutDuration) + defer cancel() err = retryRPC(EtcdPut, c.metrics[EtcdPut], func() error { var inErr error - resp, inErr = c.cli.Put(ctx, key, val, opts...) + resp, inErr = c.cli.Put(putCtx, key, val, opts...) return inErr }) return } // Get delegates request to clientv3.KV.Get -func (c *Client) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (resp *clientv3.GetResponse, err error) { +func (c *Client) Get( + ctx context.Context, key string, opts ...clientv3.OpOption, +) (resp *clientv3.GetResponse, err error) { + getCtx, cancel := context.WithTimeout(ctx, etcdClientTimeoutDuration) + defer cancel() err = retryRPC(EtcdGet, c.metrics[EtcdGet], func() error { var inErr error - resp, inErr = c.cli.Get(ctx, key, opts...) + resp, inErr = c.cli.Get(getCtx, key, opts...) return inErr }) return @@ -127,14 +135,18 @@ func (c *Client) Delete(ctx context.Context, key string, opts ...clientv3.OpOpti if metric, ok := c.metrics[EtcdDel]; ok { metric.Inc() } + delCtx, cancel := context.WithTimeout(ctx, etcdClientTimeoutDuration) + defer cancel() // We don't retry on delete operation. It's dangerous. - return c.cli.Delete(ctx, key, opts...) + return c.cli.Delete(delCtx, key, opts...) } // Txn delegates request to clientv3.KV.Txn. The error returned can only be a non-retryable error, // such as context.Canceled, context.DeadlineExceeded, errors.ErrReachMaxTry. -func (c *Client) Txn(ctx context.Context, cmps []clientv3.Cmp, opsThen, opsElse []clientv3.Op) (resp *clientv3.TxnResponse, err error) { - txnCtx, cancel := context.WithTimeout(ctx, etcdTxnTimeoutDuration) +func (c *Client) Txn( + ctx context.Context, cmps []clientv3.Cmp, opsThen, opsElse []clientv3.Op, +) (resp *clientv3.TxnResponse, err error) { + txnCtx, cancel := context.WithTimeout(ctx, etcdClientTimeoutDuration) defer cancel() err = retryRPC(EtcdTxn, c.metrics[EtcdTxn], func() error { var inErr error @@ -145,10 +157,14 @@ func (c *Client) Txn(ctx context.Context, cmps []clientv3.Cmp, opsThen, opsElse } // Grant delegates request to clientv3.Lease.Grant -func (c *Client) Grant(ctx context.Context, ttl int64) (resp *clientv3.LeaseGrantResponse, err error) { +func (c *Client) Grant( + ctx context.Context, ttl int64, +) (resp *clientv3.LeaseGrantResponse, err error) { + grantCtx, cancel := context.WithTimeout(ctx, etcdClientTimeoutDuration) + defer cancel() err = retryRPC(EtcdGrant, c.metrics[EtcdGrant], func() error { var inErr error - resp, inErr = c.cli.Grant(ctx, ttl) + resp, inErr = c.cli.Grant(grantCtx, ttl) return inErr }) return @@ -177,20 +193,28 @@ func isRetryableError(rpcName string) retry.IsRetryable { } // Revoke delegates request to clientv3.Lease.Revoke -func (c *Client) Revoke(ctx context.Context, id clientv3.LeaseID) (resp *clientv3.LeaseRevokeResponse, err error) { +func (c *Client) Revoke( + ctx context.Context, id clientv3.LeaseID, +) (resp *clientv3.LeaseRevokeResponse, err error) { + revokeCtx, cancel := context.WithTimeout(ctx, etcdClientTimeoutDuration) + defer cancel() err = retryRPC(EtcdRevoke, c.metrics[EtcdRevoke], func() error { var inErr error - resp, inErr = c.cli.Revoke(ctx, id) + resp, inErr = c.cli.Revoke(revokeCtx, id) return inErr }) return } // TimeToLive delegates request to clientv3.Lease.TimeToLive -func (c *Client) TimeToLive(ctx context.Context, lease clientv3.LeaseID, opts ...clientv3.LeaseOption) (resp *clientv3.LeaseTimeToLiveResponse, err error) { +func (c *Client) TimeToLive( + ctx context.Context, lease clientv3.LeaseID, opts ...clientv3.LeaseOption, +) (resp *clientv3.LeaseTimeToLiveResponse, err error) { + timeToLiveCtx, cancel := context.WithTimeout(ctx, etcdClientTimeoutDuration) + defer cancel() err = retryRPC(EtcdRevoke, c.metrics[EtcdRevoke], func() error { var inErr error - resp, inErr = c.cli.TimeToLive(ctx, lease, opts...) + resp, inErr = c.cli.TimeToLive(timeToLiveCtx, lease, opts...) return inErr }) return