diff --git a/cdc/kv/etcd.go b/cdc/kv/etcd.go index 86dd0843680..41025ee22dc 100644 --- a/cdc/kv/etcd.go +++ b/cdc/kv/etcd.go @@ -327,7 +327,7 @@ func (c CDCEtcdClient) CreateChangefeedInfo(ctx context.Context, info *model.Cha if err != nil { return errors.Trace(err) } - resp, err := c.Client.Txn(ctx).If( + resp, err := c.Client.TxnWithoutRetry(ctx).If( clientv3.Compare(clientv3.ModRevision(infoKey), "=", 0), clientv3.Compare(clientv3.ModRevision(jobKey), "=", 0), ).Then( @@ -674,7 +674,7 @@ func (c CDCEtcdClient) AtomicPutTaskStatus( return errors.Trace(err) } - resp, err := c.Client.Txn(ctx).If(writeCmp).Then( + resp, err := c.Client.TxnWithoutRetry(ctx).If(writeCmp).Then( clientv3.OpPut(key, value), ).Commit() if err != nil { @@ -731,7 +731,7 @@ func (c CDCEtcdClient) PutTaskPositionOnChange( } key := GetEtcdKeyTaskPosition(changefeedID, captureID) - resp, err := c.Client.Txn(ctx).If( + resp, err := c.Client.TxnWithoutRetry(ctx).If( clientv3.Compare(clientv3.ModRevision(key), ">", 0), clientv3.Compare(clientv3.Value(key), "=", data), ).Else(clientv3.OpPut(key, data)).Commit() @@ -799,7 +799,7 @@ func (c CDCEtcdClient) SetChangeFeedStatusTTL( // PutAllChangeFeedStatus puts ChangeFeedStatus of each changefeed into etcd func (c CDCEtcdClient) PutAllChangeFeedStatus(ctx context.Context, infos map[model.ChangeFeedID]*model.ChangeFeedStatus) error { var ( - txn = c.Client.Txn(ctx) + txn = c.Client.TxnWithoutRetry(ctx) ops = make([]clientv3.Op, 0, embed.DefaultMaxTxnOps) ) for changefeedID, info := range infos { @@ -814,7 +814,7 @@ func (c CDCEtcdClient) PutAllChangeFeedStatus(ctx context.Context, infos map[mod if err != nil { return cerror.WrapError(cerror.ErrPDEtcdAPIError, err) } - txn = c.Client.Txn(ctx) + txn = c.Client.TxnWithoutRetry(ctx) ops = ops[:0] } } diff --git a/go.mod b/go.mod index a41faa6afb7..f80f19a40a2 100644 --- a/go.mod +++ b/go.mod @@ -43,6 +43,7 @@ require ( github.com/pingcap/parser v0.0.0-20210831085004-b5390aa83f65 github.com/pingcap/tidb v1.1.0-beta.0.20210907130457-cd8fb24c5f7e github.com/pingcap/tidb-tools v5.0.3+incompatible + github.com/pingcap/tidb/parser v0.0.0-20220224040743-5af053e9d314 github.com/prometheus/client_golang v1.5.1 github.com/r3labs/diff v1.1.0 github.com/spf13/cobra v1.0.0 diff --git a/go.sum b/go.sum index 8aa2ba09cb5..a21482b7b01 100644 --- a/go.sum +++ b/go.sum @@ -673,6 +673,8 @@ github.com/pingcap/tidb-dashboard v0.0.0-20210312062513-eef5d6404638/go.mod h1:O github.com/pingcap/tidb-dashboard v0.0.0-20210716172320-2226872e3296/go.mod h1:OCXbZTBTIMRcIt0jFsuCakZP+goYRv6IjawKbwLS2TQ= github.com/pingcap/tidb-tools v5.0.3+incompatible h1:vYMrW9ux+3HRMeRZ1fUOjy2nyiodtuVyAyK270EKBEs= github.com/pingcap/tidb-tools v5.0.3+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM= +github.com/pingcap/tidb/parser v0.0.0-20220224040743-5af053e9d314 h1:acXc/9Fs1/L78dKmMlENohrBXTLq6pXB7FYJ4L/gnAo= +github.com/pingcap/tidb/parser v0.0.0-20220224040743-5af053e9d314/go.mod h1:ElJiub4lRy6UZDb+0JHDkGEdr6aOli+ykhyej7VCLoI= github.com/pingcap/tipb v0.0.0-20210708040514-0f154bb0dc0f h1:q6WgGOeY+hbkvtKLyi6nAew7Ptl5vXyeI61VJuJdXnQ= github.com/pingcap/tipb v0.0.0-20210708040514-0f154bb0dc0f/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -939,6 +941,7 @@ golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897 h1:pLI5jrR7OSLijeIDcmRxNm golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/exp v0.0.0-20181106170214-d68db9428509/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190125153040-c74c464bbbf2/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -1295,8 +1298,21 @@ honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9 honnef.co/go/tools v0.2.0 h1:ws8AfbgTX3oIczLPNPCu5166oBg9ST2vNs0rcht+mDE= honnef.co/go/tools v0.2.0/go.mod h1:lPVVZ2BS5TfnjLyizF7o7hv7j9/L+8cZY2hLyjP9cGY= k8s.io/klog v1.0.0/go.mod h1:4Bi6QPql/J/LkTDqv7R/cd3hPo4k2DG6Ptcz060Ez5I= -modernc.org/mathutil v1.2.2 h1:+yFk8hBprV+4c0U9GjFtL+dV3N8hOJ8JCituQcMShFY= +modernc.org/fileutil v1.0.0/go.mod h1:JHsWpkrk/CnVV1H/eGlFf85BEpfkrp56ro8nojIq9Q8= +modernc.org/golex v1.0.1/go.mod h1:QCA53QtsT1NdGkaZZkF5ezFwk4IXh4BGNafAARTC254= +modernc.org/lex v1.0.0/go.mod h1:G6rxMTy3cH2iA0iXL/HRRv4Znu8MK4higxph/lE7ypk= +modernc.org/lexer v1.0.0/go.mod h1:F/Dld0YKYdZCLQ7bD0USbWL4YKCyTDRDHiDTOs0q0vk= +modernc.org/mathutil v1.0.0/go.mod h1:wU0vUrJsVWBZ4P6e7xtFJEhFSNsfRLJ8H458uRjg03k= modernc.org/mathutil v1.2.2/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E= +modernc.org/mathutil v1.4.1 h1:ij3fYGe8zBF4Vu+g0oT7mB06r8sqGWKuJu1yXeR4by8= +modernc.org/mathutil v1.4.1/go.mod h1:mZW8CKdRPY1v87qxC/wUdX5O1qDzXMP5TH3wjfpga6E= +modernc.org/parser v1.0.0/go.mod h1:H20AntYJ2cHHL6MHthJ8LZzXCdDCHMWt1KZXtIMjejA= +modernc.org/parser v1.0.2/go.mod h1:TXNq3HABP3HMaqLK7brD1fLA/LfN0KS6JxZn71QdDqs= +modernc.org/scanner v1.0.1/go.mod h1:OIzD2ZtjYk6yTuyqZr57FmifbM9fIH74SumloSsajuE= +modernc.org/sortutil v1.0.0/go.mod h1:1QO0q8IlIlmjBIwm6t/7sof874+xCfZouyqZMLIAtxM= +modernc.org/strutil v1.0.0/go.mod h1:lstksw84oURvj9y3tn8lGvRxyRC1S2+g5uuIzNfIOBs= +modernc.org/strutil v1.1.0/go.mod h1:lstksw84oURvj9y3tn8lGvRxyRC1S2+g5uuIzNfIOBs= +modernc.org/y v1.0.1/go.mod h1:Ho86I+LVHEI+LYXoUKlmOMAM1JTXOCfj8qi1T8PsClE= moul.io/zapgorm2 v1.1.0/go.mod h1:emRfKjNqSzVj5lcgasBdovIXY1jSOwFz2GQZn1Rddks= rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4= diff --git a/pkg/errorutil/ignore.go b/pkg/errorutil/ignore.go new file mode 100644 index 00000000000..05ecf2ad36d --- /dev/null +++ b/pkg/errorutil/ignore.go @@ -0,0 +1,70 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package errorutil + +import ( + dmysql "github.com/go-sql-driver/mysql" + "github.com/pingcap/errors" + tddl "github.com/pingcap/tidb/ddl" + "github.com/pingcap/tidb/infoschema" + "github.com/pingcap/tidb/parser/mysql" + v3rpc "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" +) + +// IsIgnorableMySQLDDLError is used to check what error can be ignored +// we can get error code from: +// infoschema's error definition: https://github.com/pingcap/tidb/blob/master/infoschema/infoschema.go +// DDL's error definition: https://github.com/pingcap/tidb/blob/master/ddl/ddl.go +// tidb/mysql error code definition: https://github.com/pingcap/tidb/blob/master/mysql/errcode.go +func IsIgnorableMySQLDDLError(err error) bool { + err = errors.Cause(err) + mysqlErr, ok := err.(*dmysql.MySQLError) + if !ok { + return false + } + + errCode := errors.ErrCode(mysqlErr.Number) + switch errCode { + case infoschema.ErrDatabaseExists.Code(), infoschema.ErrDatabaseDropExists.Code(), + infoschema.ErrTableExists.Code(), infoschema.ErrTableDropExists.Code(), + infoschema.ErrColumnExists.Code(), infoschema.ErrIndexExists.Code(), + infoschema.ErrKeyNotExists.Code(), tddl.ErrCantDropFieldOrKey.Code(), + mysql.ErrDupKeyName, mysql.ErrSameNamePartition, + mysql.ErrDropPartitionNonExistent, mysql.ErrMultiplePriKey: + return true + default: + return false + } +} + +func IsRetryableEtcdError(err error) bool { + etcdErr := errors.Cause(err) + + switch etcdErr { + // Etcd ResourceExhausted errors, may recover after some time + case v3rpc.ErrNoSpace, v3rpc.ErrTooManyRequests: + return true + // Etcd Unavailable errors, may be available after some time + // https://github.com/etcd-io/etcd/pull/9934/files#diff-6d8785d0c9eaf96bc3e2b29c36493c04R162-R167 + // ErrStopped: + // one of the etcd nodes stopped from failure injection + // ErrNotCapable: + // capability check has not been done (in the beginning) + case v3rpc.ErrNoLeader, v3rpc.ErrLeaderChanged, v3rpc.ErrNotCapable, v3rpc.ErrStopped, v3rpc.ErrTimeout, + v3rpc.ErrTimeoutDueToLeaderFail, v3rpc.ErrGRPCTimeoutDueToConnectionLost, v3rpc.ErrUnhealthy: + return true + default: + return false + } +} diff --git a/pkg/errorutil/ignore_test.go b/pkg/errorutil/ignore_test.go new file mode 100644 index 00000000000..825bf7d91b6 --- /dev/null +++ b/pkg/errorutil/ignore_test.go @@ -0,0 +1,66 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package errorutil + +import ( + "errors" + "testing" + + "github.com/go-sql-driver/mysql" + "github.com/pingcap/tidb/infoschema" + tmysql "github.com/pingcap/tidb/parser/mysql" + "github.com/stretchr/testify/require" + v3rpc "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" +) + +func newMysqlErr(number uint16, message string) *mysql.MySQLError { + return &mysql.MySQLError{ + Number: number, + Message: message, + } +} + +func TestIgnoreMysqlDDLError(t *testing.T) { + cases := []struct { + err error + ret bool + }{ + {errors.New("raw error"), false}, + {newMysqlErr(tmysql.ErrDupKeyName, "Error: Duplicate key name 'some_key'"), true}, + {newMysqlErr(uint16(infoschema.ErrDatabaseExists.Code()), "Can't create database"), true}, + {newMysqlErr(uint16(infoschema.ErrAccessDenied.Code()), "Access denied for user"), false}, + } + + for _, item := range cases { + require.Equal(t, item.ret, IsIgnorableMySQLDDLError(item.err)) + } +} + +func TestIsRetryableEtcdError(t *testing.T) { + cases := []struct { + err error + ret bool + }{ + {nil, false}, + {v3rpc.ErrCorrupt, false}, + + {v3rpc.ErrGRPCTimeoutDueToConnectionLost, true}, + {v3rpc.ErrTimeoutDueToLeaderFail, true}, + {v3rpc.ErrNoSpace, true}, + } + + for _, item := range cases { + require.Equal(t, item.ret, IsRetryableEtcdError(item.err)) + } +} diff --git a/pkg/etcd/client.go b/pkg/etcd/client.go index 14568025ee8..5ffd971bcbb 100644 --- a/pkg/etcd/client.go +++ b/pkg/etcd/client.go @@ -21,10 +21,11 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" cerrors "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/errorutil" "github.com/pingcap/tiflow/pkg/retry" "github.com/prometheus/client_golang/prometheus" "go.etcd.io/etcd/clientv3" - "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" + v3rpc "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" "go.uber.org/zap" "google.golang.org/grpc/codes" ) @@ -51,6 +52,15 @@ const ( etcdRequestProgressDuration = 1 * time.Second // etcdWatchChBufferSize is arbitrarily specified, it will be modified in the future etcdWatchChBufferSize = 16 + // etcdTxnTimeoutDuration represents the timeout duration for committing a + // transaction to Etcd + etcdTxnTimeoutDuration = 30 * time.Second +) + +var ( + TxnEmptyCmps = []clientv3.Cmp{} + TxnEmptyOpsThen = []clientv3.Op{} + TxnEmptyOpsElse = []clientv3.Op{} ) // set to var instead of const for mocking the value to speedup test @@ -122,13 +132,26 @@ func (c *Client) Delete(ctx context.Context, key string, opts ...clientv3.OpOpti } // Txn delegates request to clientv3.KV.Txn -func (c *Client) Txn(ctx context.Context) clientv3.Txn { +func (c *Client) TxnWithoutRetry(ctx context.Context) clientv3.Txn { if metric, ok := c.metrics[EtcdTxn]; ok { metric.Inc() } return c.cli.Txn(ctx) } +// Txn delegates request to clientv3.KV.Txn. The error returned can only be a non-retryable error, +// such as context.Canceled, context.DeadlineExceeded, errors.ErrReachMaxTry. +func (c *Client) Txn(ctx context.Context, cmps []clientv3.Cmp, opsThen, opsElse []clientv3.Op) (resp *clientv3.TxnResponse, err error) { + txnCtx, cancel := context.WithTimeout(ctx, etcdTxnTimeoutDuration) + defer cancel() + err = retryRPC(EtcdTxn, c.metrics[EtcdTxn], func() error { + var inErr error + resp, inErr = c.cli.Txn(txnCtx).If(cmps...).Then(opsThen...).Else(opsElse...).Commit() + return inErr + }) + return +} + // Grant delegates request to clientv3.Lease.Grant func (c *Client) Grant(ctx context.Context, ttl int64) (resp *clientv3.LeaseGrantResponse, err error) { err = retryRPC(EtcdGrant, c.metrics[EtcdGrant], func() error { @@ -144,11 +167,17 @@ func isRetryableError(rpcName string) retry.IsRetryable { if !cerrors.IsRetryableError(err) { return false } - if rpcName == EtcdRevoke { - if etcdErr, ok := err.(rpctypes.EtcdError); ok && etcdErr.Code() == codes.NotFound { - // it means the etcd lease is already expired or revoked + + switch rpcName { + case EtcdRevoke: + if etcdErr, ok := err.(v3rpc.EtcdError); ok && etcdErr.Code() == codes.NotFound { + // It means the etcd lease is already expired or revoked return false } + case EtcdTxn: + return errorutil.IsRetryableEtcdError(err) + default: + // For other types of operation, we retry directly without handling errors } return true @@ -190,7 +219,10 @@ func (c *Client) WatchWithChan(ctx context.Context, outCh chan<- clientv3.WatchR }() var lastRevision int64 watchCtx, cancel := context.WithCancel(ctx) - defer cancel() + defer func() { + // Using closures to handle changes to the cancel function + cancel() + }() watchCh := c.cli.Watch(watchCtx, key, opts...) ticker := c.clock.Ticker(etcdRequestProgressDuration) @@ -200,7 +232,6 @@ func (c *Client) WatchWithChan(ctx context.Context, outCh chan<- clientv3.WatchR for { select { case <-ctx.Done(): - cancel() return case response := <-watchCh: lastReceivedResponseTime = c.clock.Now() @@ -214,7 +245,6 @@ func (c *Client) WatchWithChan(ctx context.Context, outCh chan<- clientv3.WatchR for { select { case <-ctx.Done(): - cancel() return case outCh <- response: // it may block here break Loop diff --git a/pkg/etcd/client_test.go b/pkg/etcd/client_test.go index cda7f2fcc4f..72a295affb3 100644 --- a/pkg/etcd/client_test.go +++ b/pkg/etcd/client_test.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/tiflow/pkg/util/testleak" "go.etcd.io/etcd/clientv3" + "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" ) type clientSuite struct { @@ -46,6 +47,10 @@ func (m *mockClient) Put(ctx context.Context, key, val string, opts ...clientv3. return nil, errors.New("mock error") } +func (m *mockClient) Txn(ctx context.Context) clientv3.Txn { + return &mockTxn{ctx: ctx} +} + type mockWatcher struct { clientv3.Watcher watchCh chan clientv3.WatchResponse @@ -79,6 +84,32 @@ func (s *clientSuite) TestRetry(c *check.C) { _, err = retrycli.Put(context.TODO(), "", "") c.Assert(err, check.NotNil) c.Assert(errors.Cause(err), check.ErrorMatches, "mock error", check.Commentf("err:%v", err.Error())) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Test Txn case + // case 0: normal + rsp, err := retrycli.Txn(ctx, nil, nil, nil) + c.Assert(err, check.IsNil) + c.Assert(rsp.Succeeded, check.IsFalse) + + // case 1: errors.ErrReachMaxTry + _, err = retrycli.Txn(ctx, TxnEmptyCmps, nil, nil) + c.Assert(err, check.ErrorMatches, ".*CDC:ErrReachMaxTry.*") + + // case 2: errors.ErrReachMaxTry + _, err = retrycli.Txn(ctx, nil, TxnEmptyOpsThen, nil) + c.Assert(err, check.ErrorMatches, ".*CDC:ErrReachMaxTry.*") + + // case 3: context.DeadlineExceeded + _, err = retrycli.Txn(ctx, TxnEmptyCmps, TxnEmptyOpsThen, nil) + c.Assert(err, check.Equals, context.DeadlineExceeded) + + // other case: mock error + _, err = retrycli.Txn(ctx, TxnEmptyCmps, TxnEmptyOpsThen, TxnEmptyOpsElse) + c.Assert(errors.Cause(err), check.ErrorMatches, "mock error", check.Commentf("err:%v", err.Error())) + maxTries = originValue } @@ -219,3 +250,44 @@ func (s *etcdSuite) TestOutChBlocked(c *check.C) { c.Check(sentRes, check.DeepEquals, receivedRes) } + +type mockTxn struct { + ctx context.Context + mode int +} + +func (txn *mockTxn) If(cs ...clientv3.Cmp) clientv3.Txn { + if cs != nil { + txn.mode += 1 + } + return txn +} + +func (txn *mockTxn) Then(ops ...clientv3.Op) clientv3.Txn { + if ops != nil { + txn.mode += 1 << 1 + } + return txn +} + +func (txn *mockTxn) Else(ops ...clientv3.Op) clientv3.Txn { + if ops != nil { + txn.mode += 1 << 2 + } + return txn +} + +func (txn *mockTxn) Commit() (*clientv3.TxnResponse, error) { + switch txn.mode { + case 0: + return &clientv3.TxnResponse{}, nil + case 1: + return nil, rpctypes.ErrNoSpace + case 2: + return nil, rpctypes.ErrTimeoutDueToLeaderFail + case 3: + return nil, context.DeadlineExceeded + default: + return nil, errors.New("mock error") + } +} diff --git a/pkg/orchestrator/etcd_worker.go b/pkg/orchestrator/etcd_worker.go index edb53c4c49e..c1fd11d50a4 100644 --- a/pkg/orchestrator/etcd_worker.go +++ b/pkg/orchestrator/etcd_worker.go @@ -28,6 +28,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/clientv3/concurrency" + "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" "go.etcd.io/etcd/etcdserver/etcdserverpb" "go.etcd.io/etcd/mvcc/mvccpb" "go.uber.org/zap" @@ -205,10 +206,10 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session, if len(pendingPatches) > 0 { // Here we have some patches yet to be uploaded to Etcd. pendingPatches, err = worker.applyPatchGroups(ctx, pendingPatches) + if isRetryableError(err) { + continue + } if err != nil { - if cerrors.ErrEtcdTryAgain.Equal(errors.Cause(err)) { - continue - } return errors.Trace(err) } } else { @@ -257,6 +258,18 @@ func (worker *EtcdWorker) Run(ctx context.Context, session *concurrency.Session, } } +func isRetryableError(err error) bool { + err = errors.Cause(err) + if cerrors.ErrEtcdTryAgain.Equal(err) || + context.DeadlineExceeded == err { + return true + } + // When encountering an abnormal connection with etcd, the worker will keep retrying + // until the session is done. + _, ok := err.(rpctypes.EtcdError) + return ok +} + func (worker *EtcdWorker) handleEvent(_ context.Context, event *clientv3.Event) { if worker.isDeleteCounterKey(event.Kv.Key) { switch event.Type { @@ -351,7 +364,7 @@ func (worker *EtcdWorker) commitChangedState(ctx context.Context, changedState m } cmps := make([]clientv3.Cmp, 0, len(changedState)) - ops := make([]clientv3.Op, 0, len(changedState)) + opsThen := make([]clientv3.Op, 0, len(changedState)) hasDelete := false for key, value := range changedState { @@ -373,11 +386,11 @@ func (worker *EtcdWorker) commitChangedState(ctx context.Context, changedState m op = clientv3.OpDelete(key.String()) hasDelete = true } - ops = append(ops, op) + opsThen = append(opsThen, op) } if hasDelete { - ops = append(ops, clientv3.OpPut(worker.prefix.String()+deletionCounterKey, fmt.Sprint(worker.deleteCounter+1))) + opsThen = append(opsThen, clientv3.OpPut(worker.prefix.String()+deletionCounterKey, fmt.Sprint(worker.deleteCounter+1))) } if worker.deleteCounter > 0 { cmps = append(cmps, clientv3.Compare(clientv3.Value(worker.prefix.String()+deletionCounterKey), "=", fmt.Sprint(worker.deleteCounter))) @@ -389,10 +402,7 @@ func (worker *EtcdWorker) commitChangedState(ctx context.Context, changedState m worker.metrics.metricEtcdTxnSize.Observe(float64(size)) startTime := time.Now() - - txnCtx, cancel := context.WithTimeout(ctx, etcdTxnTimeoutDuration) - resp, err := worker.client.Txn(txnCtx).If(cmps...).Then(ops...).Commit() - cancel() + resp, err := worker.client.Txn(ctx, cmps, opsThen, etcd.TxnEmptyOpsElse) // For testing the situation where we have a progress notification that // has the same revision as the committed Etcd transaction. @@ -411,7 +421,7 @@ func (worker *EtcdWorker) commitChangedState(ctx context.Context, changedState m return errors.Trace(err) } - logEtcdOps(ops, resp.Succeeded) + logEtcdOps(opsThen, resp.Succeeded) if resp.Succeeded { worker.barrierRev = resp.Header.GetRevision() return nil diff --git a/pkg/orchestrator/etcd_worker_test.go b/pkg/orchestrator/etcd_worker_test.go index 0040b9e362a..d51de27bfe3 100644 --- a/pkg/orchestrator/etcd_worker_test.go +++ b/pkg/orchestrator/etcd_worker_test.go @@ -32,6 +32,7 @@ import ( "github.com/pingcap/tiflow/pkg/util/testleak" "github.com/prometheus/client_golang/prometheus" "go.etcd.io/etcd/clientv3" + "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" "go.uber.org/zap" "golang.org/x/sync/errgroup" ) @@ -767,3 +768,11 @@ func (s *etcdWorkerSuite) TestModifyAfterDelete(c *check.C) { _ = cli1.Unwrap().Close() _ = cli2.Unwrap().Close() } + +func (s *etcdWorkerSuite) TestRetryableError(c *check.C) { + defer testleak.AfterTest(c)() + c.Check(isRetryableError(cerrors.ErrEtcdTryAgain), check.IsTrue) + c.Check(isRetryableError(cerrors.ErrReachMaxTry.Wrap(rpctypes.ErrTimeoutDueToLeaderFail)), check.IsTrue) + c.Check(isRetryableError(errors.Trace(context.DeadlineExceeded)), check.IsTrue) + c.Check(isRetryableError(context.Canceled), check.IsFalse) +}