diff --git a/cdc/capture/capture.go b/cdc/capture/capture.go index 535d59e7185..acb15de8de6 100644 --- a/cdc/capture/capture.go +++ b/cdc/capture/capture.go @@ -118,9 +118,17 @@ func NewCapture4Test(o owner.Owner) *Capture { } func (c *Capture) reset(ctx context.Context) error { + conf := config.GetGlobalServerConfig() + sess, err := concurrency.NewSession(c.EtcdClient.Client.Unwrap(), + concurrency.WithTTL(conf.CaptureSessionTTL)) + if err != nil { + return errors.Annotate( + cerror.WrapError(cerror.ErrNewCaptureFailed, err), + "create capture session") + } + c.captureMu.Lock() defer c.captureMu.Unlock() - conf := config.GetGlobalServerConfig() c.info = &model.CaptureInfo{ ID: uuid.New().String(), AdvertiseAddr: conf.AdvertiseAddr, @@ -131,13 +139,6 @@ func (c *Capture) reset(ctx context.Context) error { // It can't be handled even after it fails, so we ignore it. _ = c.session.Close() } - sess, err := concurrency.NewSession(c.EtcdClient.Client.Unwrap(), - concurrency.WithTTL(conf.CaptureSessionTTL)) - if err != nil { - return errors.Annotate( - cerror.WrapError(cerror.ErrNewCaptureFailed, err), - "create capture session") - } c.session = sess c.election = concurrency.NewElection(sess, etcd.CaptureOwnerKey) @@ -605,6 +606,8 @@ func (c *Capture) WriteDebugInfo(ctx context.Context, w io.Writer) { if c.processorManager != nil { fmt.Fprintf(w, "\n\n*** processors info ***:\n\n") c.processorManager.WriteDebugInfo(ctx, w, doneM) + } else { + close(doneM) } // wait the debug info printed wait(doneM) diff --git a/cdc/capture/capture_test.go b/cdc/capture/capture_test.go new file mode 100644 index 00000000000..febc47e8f41 --- /dev/null +++ b/cdc/capture/capture_test.go @@ -0,0 +1,68 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package capture + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/pingcap/tiflow/pkg/etcd" + "github.com/stretchr/testify/require" + "go.etcd.io/etcd/client/pkg/v3/logutil" + clientv3 "go.etcd.io/etcd/client/v3" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +func TestReset(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + + // init etcd mocker + clientURL, etcdServer, err := etcd.SetupEmbedEtcd(t.TempDir()) + require.Nil(t, err) + logConfig := logutil.DefaultZapLoggerConfig + logConfig.Level = zap.NewAtomicLevelAt(zapcore.DebugLevel) + etcdCli, err := clientv3.New(clientv3.Config{ + Endpoints: []string{clientURL.String()}, + Context: ctx, + LogConfig: &logConfig, + DialTimeout: 3 * time.Second, + }) + require.NoError(t, err) + client := etcd.NewCDCEtcdClient(ctx, etcdCli) + // Close the client before the test function exits to prevent possible + // ctx leaks. + // Ref: https://github.com/grpc/grpc-go/blob/master/stream.go#L229 + defer client.Close() + + cp := NewCapture4Test(nil) + cp.EtcdClient = &client + + // simulate network isolation scenarios + etcdServer.Close() + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + err = cp.reset(ctx) + require.Regexp(t, ".*context canceled.*", err) + wg.Done() + }() + time.Sleep(100 * time.Millisecond) + info := cp.Info() + require.NotNil(t, info) + cancel() + wg.Wait() +} diff --git a/cdc/capture/main_test.go b/cdc/capture/main_test.go new file mode 100644 index 00000000000..eae9fd1aedb --- /dev/null +++ b/cdc/capture/main_test.go @@ -0,0 +1,24 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package capture + +import ( + "testing" + + "github.com/pingcap/tiflow/pkg/leakutil" +) + +func TestMain(m *testing.M) { + leakutil.SetUpLeakTest(m) +} diff --git a/pkg/etcd/client.go b/pkg/etcd/client.go index 5d15d7ff39d..8cc631cf8a1 100644 --- a/pkg/etcd/client.go +++ b/pkg/etcd/client.go @@ -53,9 +53,12 @@ const ( etcdRequestProgressDuration = 1 * time.Second // etcdWatchChBufferSize is arbitrarily specified, it will be modified in the future etcdWatchChBufferSize = 16 - // etcdTxnTimeoutDuration represents the timeout duration for committing a - // transaction to Etcd - etcdTxnTimeoutDuration = 30 * time.Second + // etcdClientTimeoutWithRetry represents the timeout duration for + // etcd client to perform Get, Put and Txn operations + etcdClientTimeoutWithRetry = 5 * time.Second + // etcdClientTimeoutWithoutRetry represents the timeout duration for + // etcd client to perform Del operations + etcdClientTimeoutWithoutRetry = 30 * time.Second ) var ( @@ -111,8 +114,10 @@ func retryRPC(rpcName string, metric prometheus.Counter, etcdRPC func() error) e func (c *Client) Put(ctx context.Context, key, val string, opts ...clientV3.OpOption) (resp *clientV3.PutResponse, err error) { err = retryRPC(EtcdPut, c.metrics[EtcdPut], func() error { + putCtx, cancel := context.WithTimeout(ctx, etcdClientTimeoutWithRetry) + defer cancel() var inErr error - resp, inErr = c.cli.Put(ctx, key, val, opts...) + resp, inErr = c.cli.Put(putCtx, key, val, opts...) return inErr }) return @@ -122,8 +127,10 @@ func (c *Client) Put(ctx context.Context, key, val string, func (c *Client) Get(ctx context.Context, key string, opts ...clientV3.OpOption) (resp *clientV3.GetResponse, err error) { err = retryRPC(EtcdGet, c.metrics[EtcdGet], func() error { + getCtx, cancel := context.WithTimeout(ctx, etcdClientTimeoutWithRetry) + defer cancel() var inErr error - resp, inErr = c.cli.Get(ctx, key, opts...) + resp, inErr = c.cli.Get(getCtx, key, opts...) return inErr }) return @@ -135,17 +142,19 @@ func (c *Client) Delete(ctx context.Context, key string, if metric, ok := c.metrics[EtcdDel]; ok { metric.Inc() } + delCtx, cancel := context.WithTimeout(ctx, etcdClientTimeoutWithoutRetry) + defer cancel() // We don't retry on delete operation. It's dangerous. - return c.cli.Delete(ctx, key, opts...) + return c.cli.Delete(delCtx, key, opts...) } // Txn delegates request to clientV3.KV.Txn. The error returned can only be a non-retryable error, // such as context.Canceled, context.DeadlineExceeded, errors.ErrReachMaxTry. func (c *Client) Txn(ctx context.Context, cmps []clientV3.Cmp, opsThen, opsElse []clientV3.Op) (resp *clientV3.TxnResponse, err error) { - txnCtx, cancel := context.WithTimeout(ctx, etcdTxnTimeoutDuration) - defer cancel() err = retryRPC(EtcdTxn, c.metrics[EtcdTxn], func() error { + txnCtx, cancel := context.WithTimeout(ctx, etcdClientTimeoutWithRetry) + defer cancel() var inErr error resp, inErr = c.cli.Txn(txnCtx).If(cmps...).Then(opsThen...).Else(opsElse...).Commit() return inErr @@ -157,8 +166,10 @@ func (c *Client) Txn(ctx context.Context, func (c *Client) Grant(ctx context.Context, ttl int64) (resp *clientV3.LeaseGrantResponse, err error) { err = retryRPC(EtcdGrant, c.metrics[EtcdGrant], func() error { + grantCtx, cancel := context.WithTimeout(ctx, etcdClientTimeoutWithoutRetry) + defer cancel() var inErr error - resp, inErr = c.cli.Grant(ctx, ttl) + resp, inErr = c.cli.Grant(grantCtx, ttl) return inErr }) return @@ -190,8 +201,10 @@ func isRetryableError(rpcName string) retry.IsRetryable { func (c *Client) Revoke(ctx context.Context, id clientV3.LeaseID) (resp *clientV3.LeaseRevokeResponse, err error) { err = retryRPC(EtcdRevoke, c.metrics[EtcdRevoke], func() error { + revokeCtx, cancel := context.WithTimeout(ctx, etcdClientTimeoutWithoutRetry) + defer cancel() var inErr error - resp, inErr = c.cli.Revoke(ctx, id) + resp, inErr = c.cli.Revoke(revokeCtx, id) return inErr }) return @@ -201,8 +214,10 @@ func (c *Client) Revoke(ctx context.Context, func (c *Client) TimeToLive(ctx context.Context, lease clientV3.LeaseID, opts ...clientV3.LeaseOption) (resp *clientV3.LeaseTimeToLiveResponse, err error) { err = retryRPC(EtcdRevoke, c.metrics[EtcdRevoke], func() error { + timeToLiveCtx, cancel := context.WithTimeout(ctx, etcdClientTimeoutWithoutRetry) + defer cancel() var inErr error - resp, inErr = c.cli.TimeToLive(ctx, lease, opts...) + resp, inErr = c.cli.TimeToLive(timeToLiveCtx, lease, opts...) return inErr }) return