diff --git a/cdc/capture/capture.go b/cdc/capture/capture.go index 0db5c21f2c3..ded1c9e381b 100644 --- a/cdc/capture/capture.go +++ b/cdc/capture/capture.go @@ -80,9 +80,17 @@ func NewCapture(pdClient pd.Client, kvStorage tidbkv.Storage, etcdClient *kv.CDC } func (c *Capture) reset(ctx context.Context) error { + conf := config.GetGlobalServerConfig() + sess, err := concurrency.NewSession(c.EtcdClient.Client.Unwrap(), + concurrency.WithTTL(conf.CaptureSessionTTL)) + if err != nil { + return errors.Annotate( + cerror.WrapError(cerror.ErrNewCaptureFailed, err), + "create capture session") + } + c.captureMu.Lock() defer c.captureMu.Unlock() - conf := config.GetGlobalServerConfig() c.info = &model.CaptureInfo{ ID: uuid.New().String(), AdvertiseAddr: conf.AdvertiseAddr, @@ -93,11 +101,14 @@ func (c *Capture) reset(ctx context.Context) error { // It can't be handled even after it fails, so we ignore it. _ = c.session.Close() } +<<<<<<< HEAD sess, err := concurrency.NewSession(c.etcdClient.Client.Unwrap(), concurrency.WithTTL(conf.CaptureSessionTTL)) if err != nil { return errors.Annotate(cerror.WrapError(cerror.ErrNewCaptureFailed, err), "create capture session") } +======= +>>>>>>> 7fb7097e1 (capture(ticdc): fix the problem that openapi is blocked when pd is abnormal (#4788)) c.session = sess c.election = concurrency.NewElection(sess, kv.CaptureOwnerKey) diff --git a/cdc/capture/capture_test.go b/cdc/capture/capture_test.go new file mode 100644 index 00000000000..4840bf80291 --- /dev/null +++ b/cdc/capture/capture_test.go @@ -0,0 +1,68 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package capture + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/pingcap/tiflow/pkg/etcd" + "github.com/stretchr/testify/require" + "go.etcd.io/etcd/client/pkg/v3/logutil" + clientv3 "go.etcd.io/etcd/client/v3" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +func TestReset(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + + // init etcd mocker + clientURL, etcdServer, err := etcd.SetupEmbedEtcd(t.TempDir()) + require.Nil(t, err) + logConfig := logutil.DefaultZapLoggerConfig + logConfig.Level = zap.NewAtomicLevelAt(zapcore.DebugLevel) + etcdCli, err := clientv3.New(clientv3.Config{ + Endpoints: []string{clientURL.String()}, + Context: ctx, + LogConfig: &logConfig, + DialTimeout: 3 * time.Second, + }) + require.NoError(t, err) + client := etcd.NewCDCEtcdClient(ctx, etcdCli) + // Close the client before the test function exits to prevent possible + // ctx leaks. + // Ref: https://github.com/grpc/grpc-go/blob/master/stream.go#L229 + defer client.Close() + + cp := NewCapture4Test(nil) + cp.EtcdClient = &client + + // simulate network isolation scenarios + etcdServer.Close() + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + err = cp.reset(ctx) + require.Regexp(t, ".*context canceled.*", err) + wg.Done() + }() + time.Sleep(100 * time.Millisecond) + info := cp.Info() + require.NotNil(t, info) + cancel() + wg.Wait() +} diff --git a/cdc/capture/http_handler.go b/cdc/capture/http_handler.go index c1446b7c551..6e1fe45da6f 100644 --- a/cdc/capture/http_handler.go +++ b/cdc/capture/http_handler.go @@ -27,7 +27,6 @@ import ( "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/logutil" - "github.com/pingcap/tiflow/pkg/retry" "github.com/pingcap/tiflow/pkg/version" "github.com/tikv/client-go/v2/oracle" "go.uber.org/zap" @@ -42,8 +41,6 @@ const ( apiOpVarCaptureID = "capture_id" // forWardFromCapture is a header to be set when a request is forwarded from another capture forWardFromCapture = "TiCDC-ForwardFromCapture" - // getOwnerRetryMaxTime is the retry max time to get an owner - getOwnerRetryMaxTime = 3 ) // HTTPHandler is a HTTPHandler of capture @@ -698,6 +695,7 @@ func (h *HTTPHandler) forwardToOwner(c *gin.Context) { var owner *model.CaptureInfo // get owner +<<<<<<< HEAD:cdc/capture/http_handler.go err := retry.Do(ctx, func() error { o, err := h.capture.GetOwner(ctx) if err != nil { @@ -707,7 +705,11 @@ func (h *HTTPHandler) forwardToOwner(c *gin.Context) { owner = o return nil }, retry.WithBackoffBaseDelay(300), retry.WithMaxTries(getOwnerRetryMaxTime)) +======= + owner, err := h.capture.GetOwnerCaptureInfo(ctx) +>>>>>>> 7fb7097e1 (capture(ticdc): fix the problem that openapi is blocked when pd is abnormal (#4788)):cdc/api/open.go if err != nil { + log.Info("get owner failed", zap.Error(err)) _ = c.Error(err) return } diff --git a/cdc/capture/main_test.go b/cdc/capture/main_test.go index 78239f32f59..6847b86e0b4 100644 --- a/cdc/capture/main_test.go +++ b/cdc/capture/main_test.go @@ -1,4 +1,8 @@ +<<<<<<< HEAD // Copyright 2021 PingCAP, Inc. +======= +// Copyright 2022 PingCAP, Inc. +>>>>>>> 7fb7097e1 (capture(ticdc): fix the problem that openapi is blocked when pd is abnormal (#4788)) // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -16,6 +20,7 @@ package capture import ( "testing" +<<<<<<< HEAD "go.uber.org/goleak" ) @@ -27,4 +32,11 @@ func TestMain(m *testing.M) { } goleak.VerifyTestMain(m, opts...) +======= + "github.com/pingcap/tiflow/pkg/leakutil" +) + +func TestMain(m *testing.M) { + leakutil.SetUpLeakTest(m) +>>>>>>> 7fb7097e1 (capture(ticdc): fix the problem that openapi is blocked when pd is abnormal (#4788)) } diff --git a/pkg/etcd/client.go b/pkg/etcd/client.go index 3d128f7a77a..4a812d43ab9 100644 --- a/pkg/etcd/client.go +++ b/pkg/etcd/client.go @@ -52,9 +52,9 @@ const ( etcdRequestProgressDuration = 1 * time.Second // etcdWatchChBufferSize is arbitrarily specified, it will be modified in the future etcdWatchChBufferSize = 16 - // etcdTxnTimeoutDuration represents the timeout duration for committing a - // transaction to Etcd - etcdTxnTimeoutDuration = 30 * time.Second + // etcdClientTimeoutDuration represents the timeout duration for + // etcd client to execute a remote call + etcdClientTimeoutDuration = 30 * time.Second ) var ( @@ -105,21 +105,39 @@ func retryRPC(rpcName string, metric prometheus.Counter, etcdRPC func() error) e }, retry.WithBackoffBaseDelay(backoffBaseDelayInMs), retry.WithBackoffMaxDelay(backoffMaxDelayInMs), retry.WithMaxTries(maxTries), retry.WithIsRetryableErr(isRetryableError(rpcName))) } +<<<<<<< HEAD // Put delegates request to clientv3.KV.Put func (c *Client) Put(ctx context.Context, key, val string, opts ...clientv3.OpOption) (resp *clientv3.PutResponse, err error) { +======= +// Put delegates request to clientV3.KV.Put +func (c *Client) Put( + ctx context.Context, key, val string, opts ...clientV3.OpOption, +) (resp *clientV3.PutResponse, err error) { + putCtx, cancel := context.WithTimeout(ctx, etcdClientTimeoutDuration) + defer cancel() +>>>>>>> 7fb7097e1 (capture(ticdc): fix the problem that openapi is blocked when pd is abnormal (#4788)) err = retryRPC(EtcdPut, c.metrics[EtcdPut], func() error { var inErr error - resp, inErr = c.cli.Put(ctx, key, val, opts...) + resp, inErr = c.cli.Put(putCtx, key, val, opts...) return inErr }) return } +<<<<<<< HEAD // Get delegates request to clientv3.KV.Get func (c *Client) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (resp *clientv3.GetResponse, err error) { +======= +// Get delegates request to clientV3.KV.Get +func (c *Client) Get( + ctx context.Context, key string, opts ...clientV3.OpOption, +) (resp *clientV3.GetResponse, err error) { + getCtx, cancel := context.WithTimeout(ctx, etcdClientTimeoutDuration) + defer cancel() +>>>>>>> 7fb7097e1 (capture(ticdc): fix the problem that openapi is blocked when pd is abnormal (#4788)) err = retryRPC(EtcdGet, c.metrics[EtcdGet], func() error { var inErr error - resp, inErr = c.cli.Get(ctx, key, opts...) + resp, inErr = c.cli.Get(getCtx, key, opts...) return inErr }) return @@ -130,8 +148,15 @@ func (c *Client) Delete(ctx context.Context, key string, opts ...clientv3.OpOpti if metric, ok := c.metrics[EtcdTxn]; ok { metric.Inc() } +<<<<<<< HEAD // We don't retry on delete operatoin. It's dangerous. return c.cli.Delete(ctx, key, opts...) +======= + delCtx, cancel := context.WithTimeout(ctx, etcdClientTimeoutDuration) + defer cancel() + // We don't retry on delete operation. It's dangerous. + return c.cli.Delete(delCtx, key, opts...) +>>>>>>> 7fb7097e1 (capture(ticdc): fix the problem that openapi is blocked when pd is abnormal (#4788)) } // TxnWithoutRetry delegates request to clientv3.KV.Txn @@ -144,8 +169,15 @@ func (c *Client) TxnWithoutRetry(ctx context.Context) clientv3.Txn { // Txn delegates request to clientv3.KV.Txn. The error returned can only be a non-retryable error, // such as context.Canceled, context.DeadlineExceeded, errors.ErrReachMaxTry. +<<<<<<< HEAD func (c *Client) Txn(ctx context.Context, cmps []clientv3.Cmp, opsThen, opsElse []clientv3.Op) (resp *clientv3.TxnResponse, err error) { txnCtx, cancel := context.WithTimeout(ctx, etcdTxnTimeoutDuration) +======= +func (c *Client) Txn( + ctx context.Context, cmps []clientV3.Cmp, opsThen, opsElse []clientV3.Op, +) (resp *clientV3.TxnResponse, err error) { + txnCtx, cancel := context.WithTimeout(ctx, etcdClientTimeoutDuration) +>>>>>>> 7fb7097e1 (capture(ticdc): fix the problem that openapi is blocked when pd is abnormal (#4788)) defer cancel() err = retryRPC(EtcdTxn, c.metrics[EtcdTxn], func() error { var inErr error @@ -155,11 +187,20 @@ func (c *Client) Txn(ctx context.Context, cmps []clientv3.Cmp, opsThen, opsElse return } +<<<<<<< HEAD // Grant delegates request to clientv3.Lease.Grant func (c *Client) Grant(ctx context.Context, ttl int64) (resp *clientv3.LeaseGrantResponse, err error) { +======= +// Grant delegates request to clientV3.Lease.Grant +func (c *Client) Grant( + ctx context.Context, ttl int64, +) (resp *clientV3.LeaseGrantResponse, err error) { + grantCtx, cancel := context.WithTimeout(ctx, etcdClientTimeoutDuration) + defer cancel() +>>>>>>> 7fb7097e1 (capture(ticdc): fix the problem that openapi is blocked when pd is abnormal (#4788)) err = retryRPC(EtcdGrant, c.metrics[EtcdGrant], func() error { var inErr error - resp, inErr = c.cli.Grant(ctx, ttl) + resp, inErr = c.cli.Grant(grantCtx, ttl) return inErr }) return @@ -187,21 +228,39 @@ func isRetryableError(rpcName string) retry.IsRetryable { } } +<<<<<<< HEAD // Revoke delegates request to clientv3.Lease.Revoke func (c *Client) Revoke(ctx context.Context, id clientv3.LeaseID) (resp *clientv3.LeaseRevokeResponse, err error) { +======= +// Revoke delegates request to clientV3.Lease.Revoke +func (c *Client) Revoke( + ctx context.Context, id clientV3.LeaseID, +) (resp *clientV3.LeaseRevokeResponse, err error) { + revokeCtx, cancel := context.WithTimeout(ctx, etcdClientTimeoutDuration) + defer cancel() +>>>>>>> 7fb7097e1 (capture(ticdc): fix the problem that openapi is blocked when pd is abnormal (#4788)) err = retryRPC(EtcdRevoke, c.metrics[EtcdRevoke], func() error { var inErr error - resp, inErr = c.cli.Revoke(ctx, id) + resp, inErr = c.cli.Revoke(revokeCtx, id) return inErr }) return } +<<<<<<< HEAD // TimeToLive delegates request to clientv3.Lease.TimeToLive func (c *Client) TimeToLive(ctx context.Context, lease clientv3.LeaseID, opts ...clientv3.LeaseOption) (resp *clientv3.LeaseTimeToLiveResponse, err error) { +======= +// TimeToLive delegates request to clientV3.Lease.TimeToLive +func (c *Client) TimeToLive( + ctx context.Context, lease clientV3.LeaseID, opts ...clientV3.LeaseOption, +) (resp *clientV3.LeaseTimeToLiveResponse, err error) { + timeToLiveCtx, cancel := context.WithTimeout(ctx, etcdClientTimeoutDuration) + defer cancel() +>>>>>>> 7fb7097e1 (capture(ticdc): fix the problem that openapi is blocked when pd is abnormal (#4788)) err = retryRPC(EtcdRevoke, c.metrics[EtcdRevoke], func() error { var inErr error - resp, inErr = c.cli.TimeToLive(ctx, lease, opts...) + resp, inErr = c.cli.TimeToLive(timeToLiveCtx, lease, opts...) return inErr }) return