Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

br: refactor error handle mechanism to tolerant unexpect kv errors. #48646

Merged
merged 16 commits into from
Dec 8, 2023
50 changes: 12 additions & 38 deletions br/pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1185,17 +1185,18 @@ func OnBackupResponse(
backupTS uint64,
lockResolver *txnlock.LockResolver,
resp *backuppb.BackupResponse,
errContext *utils.ErrorContext,
) (*backuppb.BackupResponse, int, error) {
log.Debug("OnBackupResponse", zap.Reflect("resp", resp))
if resp.Error == nil {
return resp, 0, nil
}
backoffMs := 0
switch v := resp.Error.Detail.(type) {

err := resp.Error
switch v := err.Detail.(type) {
case *backuppb.Error_KvError:
if lockErr := v.KvError.Locked; lockErr != nil {
// Try to resolve lock.
log.Warn("backup occur kv error", zap.Reflect("error", v))
msBeforeExpired, err1 := lockResolver.ResolveLocks(
bo, backupTS, []*txnlock.Lock{txnlock.NewLock(lockErr)})
if err1 != nil {
Expand All @@ -1206,44 +1207,16 @@ func OnBackupResponse(
}
return nil, backoffMs, nil
}
// Backup should not meet error other than KeyLocked.
log.Error("unexpect kv error", zap.Reflect("KvError", v.KvError))
return nil, backoffMs, errors.Annotatef(berrors.ErrKVUnknown, "storeID: %d OnBackupResponse error %v", storeID, v)

case *backuppb.Error_RegionError:
regionErr := v.RegionError
// Ignore following errors.
if !(regionErr.EpochNotMatch != nil ||
regionErr.NotLeader != nil ||
regionErr.RegionNotFound != nil ||
regionErr.ServerIsBusy != nil ||
regionErr.StaleCommand != nil ||
regionErr.StoreNotMatch != nil ||
regionErr.ReadIndexNotReady != nil ||
regionErr.ProposalInMergingMode != nil) {
log.Error("unexpect region error", zap.Reflect("RegionError", regionErr))
return nil, backoffMs, errors.Annotatef(berrors.ErrKVUnknown, "storeID: %d OnBackupResponse error %v", storeID, v)
}
log.Warn("backup occur region error",
zap.Reflect("RegionError", regionErr),
zap.Uint64("storeID", storeID))
// TODO: a better backoff.
backoffMs = 1000 /* 1s */
return nil, backoffMs, nil
case *backuppb.Error_ClusterIdError:
log.Error("backup occur cluster ID error", zap.Reflect("error", v), zap.Uint64("storeID", storeID))
return nil, 0, errors.Annotatef(berrors.ErrKVClusterIDMismatch, "%v on storeID: %d", resp.Error, storeID)
default:
// UNSAFE! TODO: use meaningful error code instead of unstructured message to find failed to write error.
if utils.MessageIsRetryableStorageError(resp.GetError().GetMsg()) {
log.Warn("backup occur storage error", zap.String("error", resp.GetError().GetMsg()))
// back off 3000ms, for S3 is 99.99% available (i.e. the max outage time would less than 52.56mins per year),
// this time would be probably enough for s3 to resume.
res := errContext.HandleError(resp.Error, storeID)
switch res.Strategy {
case utils.GiveUpStrategy:
return nil, 0, errors.Annotatef(berrors.ErrKVUnknown, "storeID: %d OnBackupResponse error %s", storeID, res.Reason)
case utils.RetryStrategy:
return nil, 3000, nil
}
log.Error("backup occur unknown error", zap.String("error", resp.Error.GetMsg()), zap.Uint64("storeID", storeID))
return nil, 0, errors.Annotatef(berrors.ErrKVUnknown, "%v on storeID: %d", resp.Error, storeID)
}
return nil, 3000, errors.Annotatef(berrors.ErrKVUnknown, "unreachable")
}

func (bc *Client) handleFineGrained(
Expand Down Expand Up @@ -1273,12 +1246,13 @@ func (bc *Client) handleFineGrained(
}
hasProgress := false
backoffMill := 0
errContext := utils.NewErrorContext("handleFineGrainedBackup", 10)
err = SendBackup(
ctx, storeID, client, req,
// Handle responses with the same backoffer.
func(resp *backuppb.BackupResponse) error {
response, shouldBackoff, err1 :=
OnBackupResponse(storeID, bo, req.EndVersion, lockResolver, resp)
OnBackupResponse(storeID, bo, req.EndVersion, lockResolver, resp, errContext)
if err1 != nil {
return err1
}
Expand Down
19 changes: 10 additions & 9 deletions br/pkg/backup/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/pingcap/tidb/br/pkg/mock"
"github.com/pingcap/tidb/br/pkg/pdutil"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/tablecodec"
Expand Down Expand Up @@ -230,20 +231,20 @@ func TestOnBackupRegionErrorResponse(t *testing.T) {
}

cases := []Case{
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{NotLeader: &errorpb.NotLeader{}}), exceptedBackoffMs: 1000, exceptedErr: false},
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{RegionNotFound: &errorpb.RegionNotFound{}}), exceptedBackoffMs: 1000, exceptedErr: false},
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{NotLeader: &errorpb.NotLeader{}}), exceptedBackoffMs: 3000, exceptedErr: false},
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{RegionNotFound: &errorpb.RegionNotFound{}}), exceptedBackoffMs: 3000, exceptedErr: false},
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{KeyNotInRegion: &errorpb.KeyNotInRegion{}}), exceptedBackoffMs: 0, exceptedErr: true},
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{}}), exceptedBackoffMs: 1000, exceptedErr: false},
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{ServerIsBusy: &errorpb.ServerIsBusy{}}), exceptedBackoffMs: 1000, exceptedErr: false},
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{StaleCommand: &errorpb.StaleCommand{}}), exceptedBackoffMs: 1000, exceptedErr: false},
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{StoreNotMatch: &errorpb.StoreNotMatch{}}), exceptedBackoffMs: 1000, exceptedErr: false},
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{}}), exceptedBackoffMs: 3000, exceptedErr: false},
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{ServerIsBusy: &errorpb.ServerIsBusy{}}), exceptedBackoffMs: 3000, exceptedErr: false},
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{StaleCommand: &errorpb.StaleCommand{}}), exceptedBackoffMs: 3000, exceptedErr: false},
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{StoreNotMatch: &errorpb.StoreNotMatch{}}), exceptedBackoffMs: 3000, exceptedErr: false},
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{RaftEntryTooLarge: &errorpb.RaftEntryTooLarge{}}), exceptedBackoffMs: 0, exceptedErr: true},
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{ReadIndexNotReady: &errorpb.ReadIndexNotReady{}}), exceptedBackoffMs: 1000, exceptedErr: false},
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{ProposalInMergingMode: &errorpb.ProposalInMergingMode{}}), exceptedBackoffMs: 1000, exceptedErr: false},
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{ReadIndexNotReady: &errorpb.ReadIndexNotReady{}}), exceptedBackoffMs: 3000, exceptedErr: false},
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{ProposalInMergingMode: &errorpb.ProposalInMergingMode{}}), exceptedBackoffMs: 3000, exceptedErr: false},
}
for _, cs := range cases {
t.Log(cs)
_, backoffMs, err := backup.OnBackupResponse(cs.storeID, cs.bo, cs.backupTS, cs.lockResolver, cs.resp)
_, backoffMs, err := backup.OnBackupResponse(cs.storeID, cs.bo, cs.backupTS, cs.lockResolver, cs.resp, utils.NewErrorContext("test", 1))
require.Equal(t, cs.exceptedBackoffMs, backoffMs)
if cs.exceptedErr {
require.Error(t, err)
Expand Down
39 changes: 9 additions & 30 deletions br/pkg/backup/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ package backup

import (
"context"
"fmt"
"sync"

"github.com/opentracing/opentracing-go"
Expand Down Expand Up @@ -73,6 +72,7 @@ func (push *pushDown) pushBackup(
})

wg := new(sync.WaitGroup)
errContext := utils.NewErrorContext("pushBackup", 10)
for _, s := range stores {
store := s
storeID := s.GetId()
Expand Down Expand Up @@ -183,35 +183,10 @@ func (push *pushDown) pushBackup(
progressCallBack(RegionUnit)
} else {
errPb := resp.GetError()
switch v := errPb.Detail.(type) {
case *backuppb.Error_KvError:
logutil.CL(ctx).Warn("backup occur kv error", zap.Reflect("error", v))

case *backuppb.Error_RegionError:
logutil.CL(ctx).Warn("backup occur region error", zap.Reflect("error", v))

case *backuppb.Error_ClusterIdError:
logutil.CL(ctx).Error("backup occur cluster ID error", zap.Reflect("error", v))
return errors.Annotatef(berrors.ErrKVClusterIDMismatch, "%v", errPb)
default:
if utils.MessageIsRetryableStorageError(errPb.GetMsg()) {
logutil.CL(ctx).Warn("backup occur storage error", zap.String("error", errPb.GetMsg()))
continue
}
var errMsg string
if utils.MessageIsNotFoundStorageError(errPb.GetMsg()) {
errMsg = fmt.Sprintf("File or directory not found on TiKV Node (store id: %v; Address: %s). "+
"work around:please ensure br and tikv nodes share a same storage and the user of br and tikv has same uid.",
store.GetId(), redact.String(store.GetAddress()))
logutil.CL(ctx).Error("", zap.String("error", berrors.ErrKVStorage.Error()+": "+errMsg))
}
if utils.MessageIsPermissionDeniedStorageError(errPb.GetMsg()) {
errMsg = fmt.Sprintf("I/O permission denied error occurs on TiKV Node(store id: %v; Address: %s). "+
"work around:please ensure tikv has permission to read from & write to the storage.",
store.GetId(), redact.String(store.GetAddress()))
logutil.CL(ctx).Error("", zap.String("error", berrors.ErrKVStorage.Error()+": "+errMsg))
}

res := errContext.HandleIgnorableError(errPb, store.GetId())
switch res.Strategy {
case utils.GiveUpStrategy:
errMsg := res.Reason
if len(errMsg) <= 0 {
errMsg = errPb.Msg
}
Expand All @@ -220,6 +195,10 @@ func (push *pushDown) pushBackup(
redact.String(store.GetAddress()),
errMsg,
)
default:
// other type just continue for next response
// and finally handle the range in fineGrainedBackup
continue
}
}
case err := <-push.errCh:
Expand Down
2 changes: 2 additions & 0 deletions br/pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ var (
ErrBackupInvalidRange = errors.Normalize("backup range invalid", errors.RFCCodeText("BR:Backup:ErrBackupInvalidRange"))
ErrBackupNoLeader = errors.Normalize("backup no leader", errors.RFCCodeText("BR:Backup:ErrBackupNoLeader"))
ErrBackupGCSafepointExceeded = errors.Normalize("backup GC safepoint exceeded", errors.RFCCodeText("BR:Backup:ErrBackupGCSafepointExceeded"))
ErrBackupKeyIsLocked = errors.Normalize("backup key is locked", errors.RFCCodeText("BR:Backup:ErrBackupKeyIsLocked"))
ErrBackupRegion = errors.Normalize("backup region error", errors.RFCCodeText("BR:Backup:ErrBackupRegion"))

ErrRestoreModeMismatch = errors.Normalize("restore mode mismatch", errors.RFCCodeText("BR:Restore:ErrRestoreModeMismatch"))
ErrRestoreRangeMismatch = errors.Normalize("restore range mismatch", errors.RFCCodeText("BR:Restore:ErrRestoreRangeMismatch"))
Expand Down
3 changes: 2 additions & 1 deletion br/pkg/utils/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ go_test(
],
embed = [":utils"],
flaky = True,
shard_count = 34,
shard_count = 37,
deps = [
"//br/pkg/errors",
"//br/pkg/metautil",
Expand All @@ -114,6 +114,7 @@ go_test(
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/brpb",
"@com_github_pingcap_kvproto//pkg/encryptionpb",
"@com_github_pingcap_kvproto//pkg/errorpb",
"@com_github_pingcap_kvproto//pkg/import_sstpb",
"@com_github_pingcap_kvproto//pkg/metapb",
"@com_github_stretchr_testify//require",
Expand Down
20 changes: 13 additions & 7 deletions br/pkg/utils/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,28 +120,34 @@ type importerBackoffer struct {
attempt int
delayTime time.Duration
maxDelayTime time.Duration
errContext *ErrorContext
}

// NewBackoffer creates a new controller regulating a truncated exponential backoff.
func NewBackoffer(attempt int, delayTime, maxDelayTime time.Duration) Backoffer {
func NewBackoffer(attempt int, delayTime, maxDelayTime time.Duration, errContext *ErrorContext) Backoffer {
return &importerBackoffer{
attempt: attempt,
delayTime: delayTime,
maxDelayTime: maxDelayTime,
errContext: errContext,
}
}

func NewImportSSTBackoffer() Backoffer {
return NewBackoffer(importSSTRetryTimes, importSSTWaitInterval, importSSTMaxWaitInterval)
errContext := NewErrorContext("import sst", 3)
return NewBackoffer(importSSTRetryTimes, importSSTWaitInterval, importSSTMaxWaitInterval, errContext)
}

func NewDownloadSSTBackoffer() Backoffer {
return NewBackoffer(downloadSSTRetryTimes, downloadSSTWaitInterval, downloadSSTMaxWaitInterval)
errContext := NewErrorContext("download sst", 3)
return NewBackoffer(downloadSSTRetryTimes, downloadSSTWaitInterval, downloadSSTMaxWaitInterval, errContext)
}

func (bo *importerBackoffer) NextBackoff(err error) time.Duration {
log.Warn("retry to import ssts", zap.Int("attempt", bo.attempt), zap.Error(err))
if MessageIsRetryableStorageError(err.Error()) {
// we don't care storeID here.
res := bo.errContext.HandleErrorMsg(err.Error(), 0)
if res.Strategy == RetryStrategy {
bo.delayTime = 2 * bo.delayTime
bo.attempt--
} else {
Expand All @@ -151,7 +157,7 @@ func (bo *importerBackoffer) NextBackoff(err error) time.Duration {
bo.delayTime = 2 * bo.delayTime
bo.attempt--
case berrors.ErrKVRangeIsEmpty, berrors.ErrKVRewriteRuleNotFound:
// Excepted error, finish the operation
// Expected error, finish the operation
bo.delayTime = 0
bo.attempt = 0
default:
Expand All @@ -160,10 +166,10 @@ func (bo *importerBackoffer) NextBackoff(err error) time.Duration {
bo.delayTime = 2 * bo.delayTime
bo.attempt--
default:
// Unexcepted error
// Unexpected error
bo.delayTime = 0
bo.attempt = 0
log.Warn("unexcepted error, stop to retry", zap.Error(err))
log.Warn("unexpected error, stop retrying", zap.Error(err))
}
}
}
Expand Down
26 changes: 22 additions & 4 deletions br/pkg/utils/backoff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"testing"
"time"

"github.com/pingcap/errors"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/stretchr/testify/require"
Expand All @@ -18,7 +19,7 @@ import (

func TestBackoffWithSuccess(t *testing.T) {
var counter int
backoffer := utils.NewBackoffer(10, time.Nanosecond, time.Nanosecond)
backoffer := utils.NewBackoffer(10, time.Nanosecond, time.Nanosecond, utils.NewDefaultContext())
err := utils.WithRetry(context.Background(), func() error {
defer func() { counter++ }()
switch counter {
Expand All @@ -35,9 +36,26 @@ func TestBackoffWithSuccess(t *testing.T) {
require.NoError(t, err)
}

func TestBackoffWithUnknowneErrorSuccess(t *testing.T) {
var counter int
backoffer := utils.NewBackoffer(10, time.Nanosecond, time.Nanosecond, utils.NewDefaultContext())
err := utils.WithRetry(context.Background(), func() error {
defer func() { counter++ }()
switch counter {
case 0:
return errors.New("unknown error: not in the allow list")
case 1:
return berrors.ErrKVEpochNotMatch
}
return nil
}, backoffer)
require.Equal(t, 3, counter)
require.NoError(t, err)
}

func TestBackoffWithFatalError(t *testing.T) {
var counter int
backoffer := utils.NewBackoffer(10, time.Nanosecond, time.Nanosecond)
backoffer := utils.NewBackoffer(10, time.Nanosecond, time.Nanosecond, utils.NewDefaultContext())
gRPCError := status.Error(codes.Unavailable, "transport is closing")
err := utils.WithRetry(context.Background(), func() error {
defer func() { counter++ }()
Expand Down Expand Up @@ -65,7 +83,7 @@ func TestBackoffWithFatalError(t *testing.T) {
func TestBackoffWithFatalRawGRPCError(t *testing.T) {
var counter int
canceledError := status.Error(codes.Canceled, "context canceled")
backoffer := utils.NewBackoffer(10, time.Nanosecond, time.Nanosecond)
backoffer := utils.NewBackoffer(10, time.Nanosecond, time.Nanosecond, utils.NewDefaultContext())
err := utils.WithRetry(context.Background(), func() error {
defer func() { counter++ }()
return canceledError // nolint:wrapcheck
Expand All @@ -76,7 +94,7 @@ func TestBackoffWithFatalRawGRPCError(t *testing.T) {

func TestBackoffWithRetryableError(t *testing.T) {
var counter int
backoffer := utils.NewBackoffer(10, time.Nanosecond, time.Nanosecond)
backoffer := utils.NewBackoffer(10, time.Nanosecond, time.Nanosecond, utils.NewDefaultContext())
err := utils.WithRetry(context.Background(), func() error {
defer func() { counter++ }()
return berrors.ErrKVEpochNotMatch
Expand Down
6 changes: 3 additions & 3 deletions br/pkg/utils/permission.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ var (
permissionDeniedMsg = "permissiondenied"
)

// MessageIsNotFoundStorageError checks whether the message returning from TiKV is "NotFound" storage I/O error
func MessageIsNotFoundStorageError(msg string) bool {
// messageIsNotFoundStorageError checks whether the message returning from TiKV is "NotFound" storage I/O error
func messageIsNotFoundStorageError(msg string) bool {
msgLower := strings.ToLower(msg)
return strings.Contains(msgLower, "io") && strings.Contains(msgLower, ioNotFoundMsg)
}

// MessageIsPermissionDeniedStorageError checks whether the message returning from TiKV is "PermissionDenied" storage I/O error
func MessageIsPermissionDeniedStorageError(msg string) bool {
func messageIsPermissionDeniedStorageError(msg string) bool {
msgLower := strings.ToLower(msg)
return strings.Contains(msgLower, permissionDeniedMsg)
}
Loading
Loading