Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#4474
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
CharlesCheung96 authored and ti-chi-bot committed Feb 8, 2022
1 parent d9af323 commit 35435c3
Show file tree
Hide file tree
Showing 8 changed files with 979 additions and 27 deletions.
150 changes: 150 additions & 0 deletions dm/pkg/etcdutil/etcdutil.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

// learn from https://github.com/pingcap/pd/blob/v3.0.5/pkg/etcdutil/etcdutil.go.

package etcdutil

import (
"context"
"crypto/tls"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"go.etcd.io/etcd/clientv3"
v3rpc "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
"go.uber.org/zap"

tcontext "github.com/pingcap/tiflow/dm/pkg/context"
"github.com/pingcap/tiflow/dm/pkg/log"
"github.com/pingcap/tiflow/dm/pkg/retry"
"github.com/pingcap/tiflow/pkg/errorutil"
)

const (
// DefaultDialTimeout is the maximum amount of time a dial will wait for a
// connection to setup. 30s is long enough for most of the network conditions.
DefaultDialTimeout = 30 * time.Second

// DefaultRequestTimeout 10s is long enough for most of etcd clusters.
DefaultRequestTimeout = 10 * time.Second

// DefaultRevokeLeaseTimeout is the maximum amount of time waiting for revoke etcd lease.
DefaultRevokeLeaseTimeout = 3 * time.Second
)

var etcdDefaultTxnRetryParam = retry.Params{
RetryCount: 5,
FirstRetryDuration: time.Second,
BackoffStrategy: retry.Stable,
IsRetryableFn: func(retryTime int, err error) bool {
return errorutil.IsRetryableEtcdError(err)
},
}

var etcdDefaultTxnStrategy = retry.FiniteRetryStrategy{}

// CreateClient creates an etcd client with some default config items.
func CreateClient(endpoints []string, tlsCfg *tls.Config) (*clientv3.Client, error) {
return clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: DefaultDialTimeout,
TLS: tlsCfg,
})
}

// ListMembers returns a list of internal etcd members.
func ListMembers(client *clientv3.Client) (*clientv3.MemberListResponse, error) {
ctx, cancel := context.WithTimeout(client.Ctx(), DefaultRequestTimeout)
defer cancel()
return client.MemberList(ctx)
}

// AddMember adds an etcd member.
func AddMember(client *clientv3.Client, peerAddrs []string) (*clientv3.MemberAddResponse, error) {
ctx, cancel := context.WithTimeout(client.Ctx(), DefaultRequestTimeout)
defer cancel()
return client.MemberAdd(ctx, peerAddrs)
}

// RemoveMember removes an etcd member by the given id.
func RemoveMember(client *clientv3.Client, id uint64) (*clientv3.MemberRemoveResponse, error) {
ctx, cancel := context.WithTimeout(client.Ctx(), DefaultRequestTimeout)
defer cancel()
return client.MemberRemove(ctx, id)
}

// DoOpsInOneTxnWithRetry do multiple etcd operations in one txn.
// TODO: add unit test to test encountered an retryable error first but then recovered.
func DoOpsInOneTxnWithRetry(cli *clientv3.Client, ops ...clientv3.Op) (*clientv3.TxnResponse, int64, error) {
ctx, cancel := context.WithTimeout(cli.Ctx(), DefaultRequestTimeout)
defer cancel()
tctx := tcontext.NewContext(ctx, log.L())
ret, _, err := etcdDefaultTxnStrategy.Apply(tctx, etcdDefaultTxnRetryParam, func(t *tcontext.Context) (ret interface{}, err error) {
resp, err := cli.Txn(ctx).Then(ops...).Commit()
if err != nil {
return nil, errors.Trace(err)
}
return resp, nil
})
if err != nil {
return nil, 0, err
}
resp := ret.(*clientv3.TxnResponse)
return resp, resp.Header.Revision, nil
}

// DoOpsInOneCmpsTxnWithRetry do multiple etcd operations in one txn and with comparisons.
func DoOpsInOneCmpsTxnWithRetry(cli *clientv3.Client, cmps []clientv3.Cmp, opsThen, opsElse []clientv3.Op) (*clientv3.TxnResponse, int64, error) {
ctx, cancel := context.WithTimeout(cli.Ctx(), DefaultRequestTimeout)
defer cancel()
tctx := tcontext.NewContext(ctx, log.L())

ret, _, err := etcdDefaultTxnStrategy.Apply(tctx, etcdDefaultTxnRetryParam, func(t *tcontext.Context) (ret interface{}, err error) {
failpoint.Inject("ErrNoSpace", func() {
tctx.L().Info("fail to do ops in etcd", zap.String("failpoint", "ErrNoSpace"))
failpoint.Return(nil, v3rpc.ErrNoSpace)
})
resp, err := cli.Txn(ctx).If(cmps...).Then(opsThen...).Else(opsElse...).Commit()
if err != nil {
return nil, err
}
return resp, nil
})
if err != nil {
return nil, 0, err
}
resp := ret.(*clientv3.TxnResponse)
return resp, resp.Header.Revision, nil
}

// IsRetryableError check whether error is retryable error for etcd to build again.
func IsRetryableError(err error) bool {
switch errors.Cause(err) {
case v3rpc.ErrCompacted, v3rpc.ErrNoLeader, v3rpc.ErrNoSpace, context.DeadlineExceeded:
return true
default:
return false
}
}

// IsLimitedRetryableError check whether error is retryable error for etcd to build again in a limited number of times.
func IsLimitedRetryableError(err error) bool {
switch errors.Cause(err) {
case v3rpc.ErrNoSpace, context.DeadlineExceeded:
return true
default:
return false
}
}
70 changes: 70 additions & 0 deletions pkg/errorutil/ignore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package errorutil

import (
dmysql "github.com/go-sql-driver/mysql"
"github.com/pingcap/errors"
tddl "github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/parser/mysql"
v3rpc "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
)

// IsIgnorableMySQLDDLError is used to check what error can be ignored
// we can get error code from:
// infoschema's error definition: https://github.com/pingcap/tidb/blob/master/infoschema/infoschema.go
// DDL's error definition: https://github.com/pingcap/tidb/blob/master/ddl/ddl.go
// tidb/mysql error code definition: https://github.com/pingcap/tidb/blob/master/mysql/errcode.go
func IsIgnorableMySQLDDLError(err error) bool {
err = errors.Cause(err)
mysqlErr, ok := err.(*dmysql.MySQLError)
if !ok {
return false
}

errCode := errors.ErrCode(mysqlErr.Number)
switch errCode {
case infoschema.ErrDatabaseExists.Code(), infoschema.ErrDatabaseDropExists.Code(),
infoschema.ErrTableExists.Code(), infoschema.ErrTableDropExists.Code(),
infoschema.ErrColumnExists.Code(), infoschema.ErrIndexExists.Code(),
infoschema.ErrKeyNotExists.Code(), tddl.ErrCantDropFieldOrKey.Code(),
mysql.ErrDupKeyName, mysql.ErrSameNamePartition,
mysql.ErrDropPartitionNonExistent, mysql.ErrMultiplePriKey:
return true
default:
return false
}
}

func IsRetryableEtcdError(err error) bool {
etcdErr := errors.Cause(err)

switch etcdErr {
// Etcd ResourceExhausted errors, may recover after some time
case v3rpc.ErrNoSpace, v3rpc.ErrTooManyRequests:
return true
// Etcd Unavailable errors, may be available after some time
// https://github.com/etcd-io/etcd/pull/9934/files#diff-6d8785d0c9eaf96bc3e2b29c36493c04R162-R167
// ErrStopped:
// one of the etcd nodes stopped from failure injection
// ErrNotCapable:
// capability check has not been done (in the beginning)
case v3rpc.ErrNoLeader, v3rpc.ErrLeaderChanged, v3rpc.ErrNotCapable, v3rpc.ErrStopped, v3rpc.ErrTimeout,
v3rpc.ErrTimeoutDueToLeaderFail, v3rpc.ErrGRPCTimeoutDueToConnectionLost, v3rpc.ErrUnhealthy:
return true
default:
return false
}
}
66 changes: 66 additions & 0 deletions pkg/errorutil/ignore_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package errorutil

import (
"errors"
"testing"

"github.com/go-sql-driver/mysql"
"github.com/pingcap/tidb/infoschema"
tmysql "github.com/pingcap/tidb/parser/mysql"
"github.com/stretchr/testify/require"
v3rpc "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
)

func newMysqlErr(number uint16, message string) *mysql.MySQLError {
return &mysql.MySQLError{
Number: number,
Message: message,
}
}

func TestIgnoreMysqlDDLError(t *testing.T) {
cases := []struct {
err error
ret bool
}{
{errors.New("raw error"), false},
{newMysqlErr(tmysql.ErrDupKeyName, "Error: Duplicate key name 'some_key'"), true},
{newMysqlErr(uint16(infoschema.ErrDatabaseExists.Code()), "Can't create database"), true},
{newMysqlErr(uint16(infoschema.ErrAccessDenied.Code()), "Access denied for user"), false},
}

for _, item := range cases {
require.Equal(t, item.ret, IsIgnorableMySQLDDLError(item.err))
}
}

func TestIsRetryableEtcdError(t *testing.T) {
cases := []struct {
err error
ret bool
}{
{nil, false},
{v3rpc.ErrCorrupt, false},

{v3rpc.ErrGRPCTimeoutDueToConnectionLost, true},
{v3rpc.ErrTimeoutDueToLeaderFail, true},
{v3rpc.ErrNoSpace, true},
}

for _, item := range cases {
require.Equal(t, item.ret, IsRetryableEtcdError(item.err))
}
}
50 changes: 37 additions & 13 deletions pkg/etcd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/log"
cerrors "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/errorutil"
"github.com/pingcap/tiflow/pkg/retry"
"github.com/prometheus/client_golang/prometheus"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
v3rpc "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
)
Expand All @@ -51,6 +52,15 @@ const (
etcdRequestProgressDuration = 1 * time.Second
// etcdWatchChBufferSize is arbitrarily specified, it will be modified in the future
etcdWatchChBufferSize = 16
// etcdTxnTimeoutDuration represents the timeout duration for committing a
// transaction to Etcd
etcdTxnTimeoutDuration = 30 * time.Second
)

var (
TxnEmptyCmps = []clientv3.Cmp{}
TxnEmptyOpsThen = []clientv3.Op{}
TxnEmptyOpsElse = []clientv3.Op{}
)

// set to var instead of const for mocking the value to speedup test
Expand Down Expand Up @@ -121,12 +131,17 @@ func (c *Client) Delete(ctx context.Context, key string, opts ...clientv3.OpOpti
return c.cli.Delete(ctx, key, opts...)
}

// Txn delegates request to clientv3.KV.Txn
func (c *Client) Txn(ctx context.Context) clientv3.Txn {
if metric, ok := c.metrics[EtcdTxn]; ok {
metric.Inc()
}
return c.cli.Txn(ctx)
// Txn delegates request to clientv3.KV.Txn. The error returned can only be a non-retryable error,
// such as context.Canceled, context.DeadlineExceeded, errors.ErrReachMaxTry.
func (c *Client) Txn(ctx context.Context, cmps []clientv3.Cmp, opsThen, opsElse []clientv3.Op) (resp *clientv3.TxnResponse, err error) {
txnCtx, cancel := context.WithTimeout(ctx, etcdTxnTimeoutDuration)
defer cancel()
err = retryRPC(EtcdTxn, c.metrics[EtcdTxn], func() error {
var inErr error
resp, inErr = c.cli.Txn(txnCtx).If(cmps...).Then(opsThen...).Else(opsElse...).Commit()
return inErr
})
return
}

// Grant delegates request to clientv3.Lease.Grant
Expand All @@ -144,11 +159,17 @@ func isRetryableError(rpcName string) retry.IsRetryable {
if !cerrors.IsRetryableError(err) {
return false
}
if rpcName == EtcdRevoke {
if etcdErr, ok := err.(rpctypes.EtcdError); ok && etcdErr.Code() == codes.NotFound {
// it means the etcd lease is already expired or revoked

switch rpcName {
case EtcdRevoke:
if etcdErr, ok := err.(v3rpc.EtcdError); ok && etcdErr.Code() == codes.NotFound {
// It means the etcd lease is already expired or revoked
return false
}
case EtcdTxn:
return errorutil.IsRetryableEtcdError(err)
default:
// For other types of operation, we retry directly without handling errors
}

return true
Expand Down Expand Up @@ -193,7 +214,10 @@ func (c *Client) WatchWithChan(ctx context.Context, outCh chan<- clientv3.WatchR
lastRevision := getRevisionFromWatchOpts(opts...)

watchCtx, cancel := context.WithCancel(ctx)
defer cancel()
defer func() {
// Using closures to handle changes to the cancel function
cancel()
}()
watchCh := c.cli.Watch(watchCtx, key, opts...)

ticker := c.clock.Ticker(etcdRequestProgressDuration)
Expand All @@ -203,7 +227,6 @@ func (c *Client) WatchWithChan(ctx context.Context, outCh chan<- clientv3.WatchR
for {
select {
case <-ctx.Done():
cancel()
return
case response := <-watchCh:
lastReceivedResponseTime = c.clock.Now()
Expand All @@ -217,7 +240,6 @@ func (c *Client) WatchWithChan(ctx context.Context, outCh chan<- clientv3.WatchR
for {
select {
case <-ctx.Done():
cancel()
return
case outCh <- response: // it may block here
break Loop
Expand All @@ -242,6 +264,8 @@ func (c *Client) WatchWithChan(ctx context.Context, outCh chan<- clientv3.WatchR
zap.String("role", role))
cancel()
watchCtx, cancel = context.WithCancel(ctx)
// to avoid possible context leak warning from govet
_ = cancel
watchCh = c.cli.Watch(watchCtx, key, clientv3.WithPrefix(), clientv3.WithRev(lastRevision))
// we need to reset lastReceivedResponseTime after reset Watch
lastReceivedResponseTime = c.clock.Now()
Expand Down
Loading

0 comments on commit 35435c3

Please sign in to comment.