Skip to content

Commit

Permalink
etcd/client(ticdc): add retry operation for etcd transaction api (pin…
Browse files Browse the repository at this point in the history
…gcap#4248) (pingcap#4474)

close pingcap#4248

Signed-off-by: Neil Shen <[email protected]>
  • Loading branch information
CharlesCheung96 authored and overvenus committed Feb 21, 2022
1 parent 3778f0b commit 621295d
Show file tree
Hide file tree
Showing 8 changed files with 204 additions and 57 deletions.
18 changes: 2 additions & 16 deletions dm/pkg/etcdutil/etcdutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
tcontext "github.com/pingcap/tiflow/dm/pkg/context"
"github.com/pingcap/tiflow/dm/pkg/log"
"github.com/pingcap/tiflow/dm/pkg/retry"
"github.com/pingcap/tiflow/pkg/errorutil"
)

const (
Expand All @@ -48,22 +49,7 @@ var etcdDefaultTxnRetryParam = retry.Params{
FirstRetryDuration: time.Second,
BackoffStrategy: retry.Stable,
IsRetryableFn: func(retryTime int, err error) bool {
switch err {
// Etcd ResourceExhausted errors, may recover after some time
case v3rpc.ErrNoSpace, v3rpc.ErrTooManyRequests:
return true
// Etcd Unavailable errors, may be available after some time
// https://github.com/etcd-io/etcd/pull/9934/files#diff-6d8785d0c9eaf96bc3e2b29c36493c04R162-R167
// ErrStopped:
// one of the etcd nodes stopped from failure injection
// ErrNotCapable:
// capability check has not been done (in the beginning)
case v3rpc.ErrNoLeader, v3rpc.ErrLeaderChanged, v3rpc.ErrNotCapable, v3rpc.ErrStopped, v3rpc.ErrTimeout,
v3rpc.ErrTimeoutDueToLeaderFail, v3rpc.ErrGRPCTimeoutDueToConnectionLost, v3rpc.ErrUnhealthy:
return true
default:
return false
}
return errorutil.IsRetryableEtcdError(err)
},
}

Expand Down
22 changes: 22 additions & 0 deletions pkg/errorutil/ignore.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
tddl "github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/parser/mysql"
v3rpc "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
)

// IsIgnorableMySQLDDLError is used to check what error can be ignored
Expand Down Expand Up @@ -46,3 +47,24 @@ func IsIgnorableMySQLDDLError(err error) bool {
return false
}
}

func IsRetryableEtcdError(err error) bool {
etcdErr := errors.Cause(err)

switch etcdErr {
// Etcd ResourceExhausted errors, may recover after some time
case v3rpc.ErrNoSpace, v3rpc.ErrTooManyRequests:
return true
// Etcd Unavailable errors, may be available after some time
// https://github.com/etcd-io/etcd/pull/9934/files#diff-6d8785d0c9eaf96bc3e2b29c36493c04R162-R167
// ErrStopped:
// one of the etcd nodes stopped from failure injection
// ErrNotCapable:
// capability check has not been done (in the beginning)
case v3rpc.ErrNoLeader, v3rpc.ErrLeaderChanged, v3rpc.ErrNotCapable, v3rpc.ErrStopped, v3rpc.ErrTimeout,
v3rpc.ErrTimeoutDueToLeaderFail, v3rpc.ErrGRPCTimeoutDueToConnectionLost, v3rpc.ErrUnhealthy:
return true
default:
return false
}
}
23 changes: 21 additions & 2 deletions pkg/errorutil/ignore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ import (
"github.com/go-sql-driver/mysql"
"github.com/pingcap/tidb/infoschema"
tmysql "github.com/pingcap/tidb/parser/mysql"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
v3rpc "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
)

func newMysqlErr(number uint16, message string) *mysql.MySQLError {
Expand All @@ -42,6 +43,24 @@ func TestIgnoreMysqlDDLError(t *testing.T) {
}

for _, item := range cases {
assert.Equal(t, item.ret, IsIgnorableMySQLDDLError(item.err))
require.Equal(t, item.ret, IsIgnorableMySQLDDLError(item.err))
}
}

func TestIsRetryableEtcdError(t *testing.T) {
cases := []struct {
err error
ret bool
}{
{nil, false},
{v3rpc.ErrCorrupt, false},

{v3rpc.ErrGRPCTimeoutDueToConnectionLost, true},
{v3rpc.ErrTimeoutDueToLeaderFail, true},
{v3rpc.ErrNoSpace, true},
}

for _, item := range cases {
require.Equal(t, item.ret, IsRetryableEtcdError(item.err))
}
}
52 changes: 38 additions & 14 deletions pkg/etcd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/log"
cerrors "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/errorutil"
"github.com/pingcap/tiflow/pkg/retry"
"github.com/prometheus/client_golang/prometheus"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
v3rpc "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
)
Expand All @@ -51,6 +52,15 @@ const (
etcdRequestProgressDuration = 1 * time.Second
// etcdWatchChBufferSize is arbitrarily specified, it will be modified in the future
etcdWatchChBufferSize = 16
// etcdTxnTimeoutDuration represents the timeout duration for committing a
// transaction to Etcd
etcdTxnTimeoutDuration = 30 * time.Second
)

var (
TxnEmptyCmps = []clientv3.Cmp{}
TxnEmptyOpsThen = []clientv3.Op{}
TxnEmptyOpsElse = []clientv3.Op{}
)

// set to var instead of const for mocking the value to speedup test
Expand Down Expand Up @@ -121,12 +131,17 @@ func (c *Client) Delete(ctx context.Context, key string, opts ...clientv3.OpOpti
return c.cli.Delete(ctx, key, opts...)
}

// Txn delegates request to clientv3.KV.Txn
func (c *Client) Txn(ctx context.Context) clientv3.Txn {
if metric, ok := c.metrics[EtcdTxn]; ok {
metric.Inc()
}
return c.cli.Txn(ctx)
// Txn delegates request to clientv3.KV.Txn. The error returned can only be a non-retryable error,
// such as context.Canceled, context.DeadlineExceeded, errors.ErrReachMaxTry.
func (c *Client) Txn(ctx context.Context, cmps []clientv3.Cmp, opsThen, opsElse []clientv3.Op) (resp *clientv3.TxnResponse, err error) {
txnCtx, cancel := context.WithTimeout(ctx, etcdTxnTimeoutDuration)
defer cancel()
err = retryRPC(EtcdTxn, c.metrics[EtcdTxn], func() error {
var inErr error
resp, inErr = c.cli.Txn(txnCtx).If(cmps...).Then(opsThen...).Else(opsElse...).Commit()
return inErr
})
return
}

// Grant delegates request to clientv3.Lease.Grant
Expand All @@ -144,11 +159,17 @@ func isRetryableError(rpcName string) retry.IsRetryable {
if !cerrors.IsRetryableError(err) {
return false
}
if rpcName == EtcdRevoke {
if etcdErr, ok := err.(rpctypes.EtcdError); ok && etcdErr.Code() == codes.NotFound {
// it means the etcd lease is already expired or revoked

switch rpcName {
case EtcdRevoke:
if etcdErr, ok := err.(v3rpc.EtcdError); ok && etcdErr.Code() == codes.NotFound {
// It means the etcd lease is already expired or revoked
return false
}
case EtcdTxn:
return errorutil.IsRetryableEtcdError(err)
default:
// For other types of operation, we retry directly without handling errors
}

return true
Expand Down Expand Up @@ -190,7 +211,10 @@ func (c *Client) WatchWithChan(ctx context.Context, outCh chan<- clientv3.WatchR
}()
var lastRevision int64
watchCtx, cancel := context.WithCancel(ctx)
defer cancel()
defer func() {
// Using closures to handle changes to the cancel function
cancel()
}()
watchCh := c.cli.Watch(watchCtx, key, opts...)

ticker := c.clock.Ticker(etcdRequestProgressDuration)
Expand All @@ -200,7 +224,6 @@ func (c *Client) WatchWithChan(ctx context.Context, outCh chan<- clientv3.WatchR
for {
select {
case <-ctx.Done():
cancel()
return
case response := <-watchCh:
lastReceivedResponseTime = c.clock.Now()
Expand All @@ -214,7 +237,6 @@ func (c *Client) WatchWithChan(ctx context.Context, outCh chan<- clientv3.WatchR
for {
select {
case <-ctx.Done():
cancel()
return
case outCh <- response: // it may block here
break Loop
Expand All @@ -238,7 +260,9 @@ func (c *Client) WatchWithChan(ctx context.Context, outCh chan<- clientv3.WatchR
zap.String("role", role))
cancel()
watchCtx, cancel = context.WithCancel(ctx)
watchCh = c.cli.Watch(watchCtx, key, clientv3.WithPrefix(), clientv3.WithRev(lastRevision+1))
// to avoid possible context leak warning from govet
_ = cancel
watchCh = c.cli.Watch(watchCtx, key, clientv3.WithPrefix(), clientv3.WithRev(lastRevision))
// we need to reset lastReceivedResponseTime after reset Watch
lastReceivedResponseTime = c.clock.Now()
}
Expand Down
85 changes: 79 additions & 6 deletions pkg/etcd/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@ package etcd

import (
"context"
"testing"
"time"

"github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/tiflow/pkg/util/testleak"
"github.com/stretchr/testify/require"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
)

type clientSuite struct{}
Expand All @@ -44,8 +47,11 @@ func (m *mockClient) Put(ctx context.Context, key, val string, opts ...clientv3.
return nil, errors.New("mock error")
}

func (s *clientSuite) TestRetry(c *check.C) {
defer testleak.AfterTest(c)()
func (m *mockClient) Txn(ctx context.Context) clientv3.Txn {
return &mockTxn{ctx: ctx}
}

func TestRetry(t *testing.T) {
originValue := maxTries
// to speedup the test
maxTries = 2
Expand All @@ -54,12 +60,38 @@ func (s *clientSuite) TestRetry(c *check.C) {
cli.KV = &mockClient{}
retrycli := Wrap(cli, nil)
get, err := retrycli.Get(context.TODO(), "")
c.Assert(err, check.IsNil)
c.Assert(get, check.NotNil)
require.Nil(t, err)
require.NotNil(t, get)

_, err = retrycli.Put(context.TODO(), "", "")
c.Assert(err, check.NotNil)
c.Assert(errors.Cause(err), check.ErrorMatches, "mock error", check.Commentf("err:%v", err.Error()))
require.NotNil(t, err)
require.Containsf(t, errors.Cause(err).Error(), "mock error", "err:%v", err.Error())

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Test Txn case
// case 0: normal
rsp, err := retrycli.Txn(ctx, nil, nil, nil)
require.Nil(t, err)
require.False(t, rsp.Succeeded)

// case 1: errors.ErrReachMaxTry
_, err = retrycli.Txn(ctx, TxnEmptyCmps, nil, nil)
require.Regexp(t, ".*CDC:ErrReachMaxTry.*", err)

// case 2: errors.ErrReachMaxTry
_, err = retrycli.Txn(ctx, nil, TxnEmptyOpsThen, nil)
require.Regexp(t, ".*CDC:ErrReachMaxTry.*", err)

// case 3: context.DeadlineExceeded
_, err = retrycli.Txn(ctx, TxnEmptyCmps, TxnEmptyOpsThen, nil)
require.Equal(t, context.DeadlineExceeded, err)

// other case: mock error
_, err = retrycli.Txn(ctx, TxnEmptyCmps, TxnEmptyOpsThen, TxnEmptyOpsElse)
require.Containsf(t, errors.Cause(err).Error(), "mock error", "err:%v", err.Error())

maxTries = originValue
}

Expand Down Expand Up @@ -90,3 +122,44 @@ func (s *etcdSuite) TestDelegateLease(c *check.C) {
c.Assert(err, check.IsNil)
c.Assert(ttlResp.TTL, check.Equals, int64(-1))
}

type mockTxn struct {
ctx context.Context
mode int
}

func (txn *mockTxn) If(cs ...clientv3.Cmp) clientv3.Txn {
if cs != nil {
txn.mode += 1
}
return txn
}

func (txn *mockTxn) Then(ops ...clientv3.Op) clientv3.Txn {
if ops != nil {
txn.mode += 1 << 1
}
return txn
}

func (txn *mockTxn) Else(ops ...clientv3.Op) clientv3.Txn {
if ops != nil {
txn.mode += 1 << 2
}
return txn
}

func (txn *mockTxn) Commit() (*clientv3.TxnResponse, error) {
switch txn.mode {
case 0:
return &clientv3.TxnResponse{}, nil
case 1:
return nil, rpctypes.ErrNoSpace
case 2:
return nil, rpctypes.ErrTimeoutDueToLeaderFail
case 3:
return nil, context.DeadlineExceeded
default:
return nil, errors.New("mock error")
}
}
17 changes: 12 additions & 5 deletions pkg/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,12 +304,15 @@ func (c CDCEtcdClient) CreateChangefeedInfo(ctx context.Context, info *model.Cha
if err != nil {
return errors.Trace(err)
}
resp, err := c.Client.Txn(ctx).If(

cmps := []clientv3.Cmp{
clientv3.Compare(clientv3.ModRevision(infoKey), "=", 0),
clientv3.Compare(clientv3.ModRevision(jobKey), "=", 0),
).Then(
}
opsThen := []clientv3.Op{
clientv3.OpPut(infoKey, value),
).Commit()
}
resp, err := c.Client.Txn(ctx, cmps, opsThen, TxnEmptyOpsElse)
if err != nil {
return cerror.WrapError(cerror.ErrPDEtcdAPIError, err)
}
Expand Down Expand Up @@ -504,10 +507,14 @@ func (c CDCEtcdClient) PutTaskPositionOnChange(
}

key := GetEtcdKeyTaskPosition(changefeedID, captureID)
resp, err := c.Client.Txn(ctx).If(
cmps := []clientv3.Cmp{
clientv3.Compare(clientv3.ModRevision(key), ">", 0),
clientv3.Compare(clientv3.Value(key), "=", data),
).Else(clientv3.OpPut(key, data)).Commit()
}
opsElse := []clientv3.Op{
clientv3.OpPut(key, data),
}
resp, err := c.Client.Txn(ctx, cmps, TxnEmptyOpsThen, opsElse)
if err != nil {
return false, cerror.WrapError(cerror.ErrPDEtcdAPIError, err)
}
Expand Down
Loading

0 comments on commit 621295d

Please sign in to comment.