Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

br: refactor error handle mechanism to tolerant unexpect kv errors. (#48646) #49274

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 12 additions & 38 deletions br/pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1110,17 +1110,18 @@
backupTS uint64,
lockResolver *txnlock.LockResolver,
resp *backuppb.BackupResponse,
errContext *utils.ErrorContext,
) (*backuppb.BackupResponse, int, error) {
log.Debug("OnBackupResponse", zap.Reflect("resp", resp))
if resp.Error == nil {
return resp, 0, nil
}
backoffMs := 0
switch v := resp.Error.Detail.(type) {

err := resp.Error
switch v := err.Detail.(type) {
case *backuppb.Error_KvError:
if lockErr := v.KvError.Locked; lockErr != nil {
// Try to resolve lock.
log.Warn("backup occur kv error", zap.Reflect("error", v))
msBeforeExpired, err1 := lockResolver.ResolveLocks(
bo, backupTS, []*txnlock.Lock{txnlock.NewLock(lockErr)})
if err1 != nil {
Expand All @@ -1131,44 +1132,16 @@
}
return nil, backoffMs, nil
}
// Backup should not meet error other than KeyLocked.
log.Error("unexpect kv error", zap.Reflect("KvError", v.KvError))
return nil, backoffMs, errors.Annotatef(berrors.ErrKVUnknown, "storeID: %d OnBackupResponse error %v", storeID, v)

case *backuppb.Error_RegionError:
regionErr := v.RegionError
// Ignore following errors.
if !(regionErr.EpochNotMatch != nil ||
regionErr.NotLeader != nil ||
regionErr.RegionNotFound != nil ||
regionErr.ServerIsBusy != nil ||
regionErr.StaleCommand != nil ||
regionErr.StoreNotMatch != nil ||
regionErr.ReadIndexNotReady != nil ||
regionErr.ProposalInMergingMode != nil) {
log.Error("unexpect region error", zap.Reflect("RegionError", regionErr))
return nil, backoffMs, errors.Annotatef(berrors.ErrKVUnknown, "storeID: %d OnBackupResponse error %v", storeID, v)
}
log.Warn("backup occur region error",
zap.Reflect("RegionError", regionErr),
zap.Uint64("storeID", storeID))
// TODO: a better backoff.
backoffMs = 1000 /* 1s */
return nil, backoffMs, nil
case *backuppb.Error_ClusterIdError:
log.Error("backup occur cluster ID error", zap.Reflect("error", v), zap.Uint64("storeID", storeID))
return nil, 0, errors.Annotatef(berrors.ErrKVClusterIDMismatch, "%v on storeID: %d", resp.Error, storeID)
default:
// UNSAFE! TODO: use meaningful error code instead of unstructured message to find failed to write error.
if utils.MessageIsRetryableStorageError(resp.GetError().GetMsg()) {
log.Warn("backup occur storage error", zap.String("error", resp.GetError().GetMsg()))
// back off 3000ms, for S3 is 99.99% available (i.e. the max outage time would less than 52.56mins per year),
// this time would be probably enough for s3 to resume.
res := errContext.HandleError(resp.Error, storeID)
switch res.Strategy {
case utils.GiveUpStrategy:
return nil, 0, errors.Annotatef(berrors.ErrKVUnknown, "storeID: %d OnBackupResponse error %s", storeID, res.Reason)
case utils.RetryStrategy:
return nil, 3000, nil
}
log.Error("backup occur unknown error", zap.String("error", resp.Error.GetMsg()), zap.Uint64("storeID", storeID))
return nil, 0, errors.Annotatef(berrors.ErrKVUnknown, "%v on storeID: %d", resp.Error, storeID)
}
return nil, 3000, errors.Annotatef(berrors.ErrKVUnknown, "unreachable")

Check warning on line 1144 in br/pkg/backup/client.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/backup/client.go#L1144

Added line #L1144 was not covered by tests
}

func (bc *Client) handleFineGrained(
Expand Down Expand Up @@ -1197,12 +1170,13 @@
}
hasProgress := false
backoffMill := 0
errContext := utils.NewErrorContext("handleFineGrainedBackup", 10)

Check warning on line 1173 in br/pkg/backup/client.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/backup/client.go#L1173

Added line #L1173 was not covered by tests
err = SendBackup(
ctx, storeID, client, req,
// Handle responses with the same backoffer.
func(resp *backuppb.BackupResponse) error {
response, shouldBackoff, err1 :=
OnBackupResponse(storeID, bo, req.EndVersion, lockResolver, resp)
OnBackupResponse(storeID, bo, req.EndVersion, lockResolver, resp, errContext)

Check warning on line 1179 in br/pkg/backup/client.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/backup/client.go#L1179

Added line #L1179 was not covered by tests
if err1 != nil {
return err1
}
Expand Down
19 changes: 10 additions & 9 deletions br/pkg/backup/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/pingcap/tidb/br/pkg/mock"
"github.com/pingcap/tidb/br/pkg/pdutil"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/tablecodec"
Expand Down Expand Up @@ -230,20 +231,20 @@ func TestOnBackupRegionErrorResponse(t *testing.T) {
}

cases := []Case{
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{NotLeader: &errorpb.NotLeader{}}), exceptedBackoffMs: 1000, exceptedErr: false},
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{RegionNotFound: &errorpb.RegionNotFound{}}), exceptedBackoffMs: 1000, exceptedErr: false},
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{NotLeader: &errorpb.NotLeader{}}), exceptedBackoffMs: 3000, exceptedErr: false},
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{RegionNotFound: &errorpb.RegionNotFound{}}), exceptedBackoffMs: 3000, exceptedErr: false},
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{KeyNotInRegion: &errorpb.KeyNotInRegion{}}), exceptedBackoffMs: 0, exceptedErr: true},
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{}}), exceptedBackoffMs: 1000, exceptedErr: false},
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{ServerIsBusy: &errorpb.ServerIsBusy{}}), exceptedBackoffMs: 1000, exceptedErr: false},
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{StaleCommand: &errorpb.StaleCommand{}}), exceptedBackoffMs: 1000, exceptedErr: false},
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{StoreNotMatch: &errorpb.StoreNotMatch{}}), exceptedBackoffMs: 1000, exceptedErr: false},
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{}}), exceptedBackoffMs: 3000, exceptedErr: false},
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{ServerIsBusy: &errorpb.ServerIsBusy{}}), exceptedBackoffMs: 3000, exceptedErr: false},
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{StaleCommand: &errorpb.StaleCommand{}}), exceptedBackoffMs: 3000, exceptedErr: false},
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{StoreNotMatch: &errorpb.StoreNotMatch{}}), exceptedBackoffMs: 3000, exceptedErr: false},
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{RaftEntryTooLarge: &errorpb.RaftEntryTooLarge{}}), exceptedBackoffMs: 0, exceptedErr: true},
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{ReadIndexNotReady: &errorpb.ReadIndexNotReady{}}), exceptedBackoffMs: 1000, exceptedErr: false},
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{ProposalInMergingMode: &errorpb.ProposalInMergingMode{}}), exceptedBackoffMs: 1000, exceptedErr: false},
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{ReadIndexNotReady: &errorpb.ReadIndexNotReady{}}), exceptedBackoffMs: 3000, exceptedErr: false},
{storeID: 1, backupTS: 421123291611137, resp: newBackupRegionErrorResp(&errorpb.Error{ProposalInMergingMode: &errorpb.ProposalInMergingMode{}}), exceptedBackoffMs: 3000, exceptedErr: false},
}
for _, cs := range cases {
t.Log(cs)
_, backoffMs, err := backup.OnBackupResponse(cs.storeID, cs.bo, cs.backupTS, cs.lockResolver, cs.resp)
_, backoffMs, err := backup.OnBackupResponse(cs.storeID, cs.bo, cs.backupTS, cs.lockResolver, cs.resp, utils.NewErrorContext("test", 1))
require.Equal(t, cs.exceptedBackoffMs, backoffMs)
if cs.exceptedErr {
require.Error(t, err)
Expand Down
39 changes: 9 additions & 30 deletions br/pkg/backup/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

import (
"context"
"fmt"
"sync"

"github.com/opentracing/opentracing-go"
Expand Down Expand Up @@ -73,6 +72,7 @@
})

wg := new(sync.WaitGroup)
errContext := utils.NewErrorContext("pushBackup", 10)

Check warning on line 75 in br/pkg/backup/push.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/backup/push.go#L75

Added line #L75 was not covered by tests
for _, s := range stores {
store := s
storeID := s.GetId()
Expand Down Expand Up @@ -182,35 +182,10 @@
progressCallBack(RegionUnit)
} else {
errPb := resp.GetError()
switch v := errPb.Detail.(type) {
case *backuppb.Error_KvError:
logutil.CL(ctx).Warn("backup occur kv error", zap.Reflect("error", v))

case *backuppb.Error_RegionError:
logutil.CL(ctx).Warn("backup occur region error", zap.Reflect("error", v))

case *backuppb.Error_ClusterIdError:
logutil.CL(ctx).Error("backup occur cluster ID error", zap.Reflect("error", v))
return errors.Annotatef(berrors.ErrKVClusterIDMismatch, "%v", errPb)
default:
if utils.MessageIsRetryableStorageError(errPb.GetMsg()) {
logutil.CL(ctx).Warn("backup occur storage error", zap.String("error", errPb.GetMsg()))
continue
}
var errMsg string
if utils.MessageIsNotFoundStorageError(errPb.GetMsg()) {
errMsg = fmt.Sprintf("File or directory not found on TiKV Node (store id: %v; Address: %s). "+
"work around:please ensure br and tikv nodes share a same storage and the user of br and tikv has same uid.",
store.GetId(), redact.String(store.GetAddress()))
logutil.CL(ctx).Error("", zap.String("error", berrors.ErrKVStorage.Error()+": "+errMsg))
}
if utils.MessageIsPermissionDeniedStorageError(errPb.GetMsg()) {
errMsg = fmt.Sprintf("I/O permission denied error occurs on TiKV Node(store id: %v; Address: %s). "+
"work around:please ensure tikv has permission to read from & write to the storage.",
store.GetId(), redact.String(store.GetAddress()))
logutil.CL(ctx).Error("", zap.String("error", berrors.ErrKVStorage.Error()+": "+errMsg))
}

res := errContext.HandleIgnorableError(errPb, store.GetId())
switch res.Strategy {
case utils.GiveUpStrategy:
errMsg := res.Reason

Check warning on line 188 in br/pkg/backup/push.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/backup/push.go#L185-L188

Added lines #L185 - L188 were not covered by tests
if len(errMsg) <= 0 {
errMsg = errPb.Msg
}
Expand All @@ -219,6 +194,10 @@
redact.String(store.GetAddress()),
errMsg,
)
default:
// other type just continue for next response
// and finally handle the range in fineGrainedBackup
continue

Check warning on line 200 in br/pkg/backup/push.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/backup/push.go#L197-L200

Added lines #L197 - L200 were not covered by tests
}
}
case err := <-push.errCh:
Expand Down
2 changes: 2 additions & 0 deletions br/pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ var (
ErrBackupInvalidRange = errors.Normalize("backup range invalid", errors.RFCCodeText("BR:Backup:ErrBackupInvalidRange"))
ErrBackupNoLeader = errors.Normalize("backup no leader", errors.RFCCodeText("BR:Backup:ErrBackupNoLeader"))
ErrBackupGCSafepointExceeded = errors.Normalize("backup GC safepoint exceeded", errors.RFCCodeText("BR:Backup:ErrBackupGCSafepointExceeded"))
ErrBackupKeyIsLocked = errors.Normalize("backup key is locked", errors.RFCCodeText("BR:Backup:ErrBackupKeyIsLocked"))
ErrBackupRegion = errors.Normalize("backup region error", errors.RFCCodeText("BR:Backup:ErrBackupRegion"))

ErrRestoreModeMismatch = errors.Normalize("restore mode mismatch", errors.RFCCodeText("BR:Restore:ErrRestoreModeMismatch"))
ErrRestoreRangeMismatch = errors.Normalize("restore range mismatch", errors.RFCCodeText("BR:Restore:ErrRestoreRangeMismatch"))
Expand Down
1 change: 1 addition & 0 deletions br/pkg/utils/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ go_test(
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_kvproto//pkg/brpb",
"@com_github_pingcap_kvproto//pkg/encryptionpb",
"@com_github_pingcap_kvproto//pkg/errorpb",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_pd_client//:client",
Expand Down
20 changes: 13 additions & 7 deletions br/pkg/utils/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,28 +111,34 @@ type importerBackoffer struct {
attempt int
delayTime time.Duration
maxDelayTime time.Duration
errContext *ErrorContext
}

// NewBackoffer creates a new controller regulating a truncated exponential backoff.
func NewBackoffer(attempt int, delayTime, maxDelayTime time.Duration) Backoffer {
func NewBackoffer(attempt int, delayTime, maxDelayTime time.Duration, errContext *ErrorContext) Backoffer {
return &importerBackoffer{
attempt: attempt,
delayTime: delayTime,
maxDelayTime: maxDelayTime,
errContext: errContext,
}
}

func NewImportSSTBackoffer() Backoffer {
return NewBackoffer(importSSTRetryTimes, importSSTWaitInterval, importSSTMaxWaitInterval)
errContext := NewErrorContext("import sst", 3)
return NewBackoffer(importSSTRetryTimes, importSSTWaitInterval, importSSTMaxWaitInterval, errContext)
}

func NewDownloadSSTBackoffer() Backoffer {
return NewBackoffer(downloadSSTRetryTimes, downloadSSTWaitInterval, downloadSSTMaxWaitInterval)
errContext := NewErrorContext("download sst", 3)
return NewBackoffer(downloadSSTRetryTimes, downloadSSTWaitInterval, downloadSSTMaxWaitInterval, errContext)
}

func (bo *importerBackoffer) NextBackoff(err error) time.Duration {
log.Warn("retry to import ssts", zap.Int("attempt", bo.attempt), zap.Error(err))
if MessageIsRetryableStorageError(err.Error()) {
// we don't care storeID here.
res := bo.errContext.HandleErrorMsg(err.Error(), 0)
if res.Strategy == RetryStrategy {
bo.delayTime = 2 * bo.delayTime
bo.attempt--
} else {
Expand All @@ -142,7 +148,7 @@ func (bo *importerBackoffer) NextBackoff(err error) time.Duration {
bo.delayTime = 2 * bo.delayTime
bo.attempt--
case berrors.ErrKVRangeIsEmpty, berrors.ErrKVRewriteRuleNotFound:
// Excepted error, finish the operation
// Expected error, finish the operation
bo.delayTime = 0
bo.attempt = 0
default:
Expand All @@ -151,10 +157,10 @@ func (bo *importerBackoffer) NextBackoff(err error) time.Duration {
bo.delayTime = 2 * bo.delayTime
bo.attempt--
default:
// Unexcepted error
// Unexpected error
bo.delayTime = 0
bo.attempt = 0
log.Warn("unexcepted error, stop to retry", zap.Error(err))
log.Warn("unexpected error, stop retrying", zap.Error(err))
}
}
}
Expand Down
26 changes: 22 additions & 4 deletions br/pkg/utils/backoff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"testing"
"time"

"github.com/pingcap/errors"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/stretchr/testify/require"
Expand All @@ -18,7 +19,7 @@ import (

func TestBackoffWithSuccess(t *testing.T) {
var counter int
backoffer := utils.NewBackoffer(10, time.Nanosecond, time.Nanosecond)
backoffer := utils.NewBackoffer(10, time.Nanosecond, time.Nanosecond, utils.NewDefaultContext())
err := utils.WithRetry(context.Background(), func() error {
defer func() { counter++ }()
switch counter {
Expand All @@ -35,9 +36,26 @@ func TestBackoffWithSuccess(t *testing.T) {
require.NoError(t, err)
}

func TestBackoffWithUnknowneErrorSuccess(t *testing.T) {
var counter int
backoffer := utils.NewBackoffer(10, time.Nanosecond, time.Nanosecond, utils.NewDefaultContext())
err := utils.WithRetry(context.Background(), func() error {
defer func() { counter++ }()
switch counter {
case 0:
return errors.New("unknown error: not in the allow list")
case 1:
return berrors.ErrKVEpochNotMatch
}
return nil
}, backoffer)
require.Equal(t, 3, counter)
require.NoError(t, err)
}

func TestBackoffWithFatalError(t *testing.T) {
var counter int
backoffer := utils.NewBackoffer(10, time.Nanosecond, time.Nanosecond)
backoffer := utils.NewBackoffer(10, time.Nanosecond, time.Nanosecond, utils.NewDefaultContext())
gRPCError := status.Error(codes.Unavailable, "transport is closing")
err := utils.WithRetry(context.Background(), func() error {
defer func() { counter++ }()
Expand Down Expand Up @@ -65,7 +83,7 @@ func TestBackoffWithFatalError(t *testing.T) {
func TestBackoffWithFatalRawGRPCError(t *testing.T) {
var counter int
canceledError := status.Error(codes.Canceled, "context canceled")
backoffer := utils.NewBackoffer(10, time.Nanosecond, time.Nanosecond)
backoffer := utils.NewBackoffer(10, time.Nanosecond, time.Nanosecond, utils.NewDefaultContext())
err := utils.WithRetry(context.Background(), func() error {
defer func() { counter++ }()
return canceledError // nolint:wrapcheck
Expand All @@ -76,7 +94,7 @@ func TestBackoffWithFatalRawGRPCError(t *testing.T) {

func TestBackoffWithRetryableError(t *testing.T) {
var counter int
backoffer := utils.NewBackoffer(10, time.Nanosecond, time.Nanosecond)
backoffer := utils.NewBackoffer(10, time.Nanosecond, time.Nanosecond, utils.NewDefaultContext())
err := utils.WithRetry(context.Background(), func() error {
defer func() { counter++ }()
return berrors.ErrKVEpochNotMatch
Expand Down
6 changes: 3 additions & 3 deletions br/pkg/utils/permission.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ var (
permissionDeniedMsg = "permissiondenied"
)

// MessageIsNotFoundStorageError checks whether the message returning from TiKV is "NotFound" storage I/O error
func MessageIsNotFoundStorageError(msg string) bool {
// messageIsNotFoundStorageError checks whether the message returning from TiKV is "NotFound" storage I/O error
func messageIsNotFoundStorageError(msg string) bool {
msgLower := strings.ToLower(msg)
return strings.Contains(msgLower, "io") && strings.Contains(msgLower, ioNotFoundMsg)
}

// MessageIsPermissionDeniedStorageError checks whether the message returning from TiKV is "PermissionDenied" storage I/O error
func MessageIsPermissionDeniedStorageError(msg string) bool {
func messageIsPermissionDeniedStorageError(msg string) bool {
msgLower := strings.ToLower(msg)
return strings.Contains(msgLower, permissionDeniedMsg)
}
Loading
Loading