From bb423133557f9aed2d4f0822a6d8e53e83b5fe31 Mon Sep 17 00:00:00 2001 From: 3pointer Date: Tue, 25 Jan 2022 18:22:11 +0800 Subject: [PATCH] br: refactor sendBackup (#31264) ref pingcap/tidb#31374 --- br/pkg/backup/client.go | 141 ++++++++++++++++++++-------------------- 1 file changed, 72 insertions(+), 69 deletions(-) diff --git a/br/pkg/backup/client.go b/br/pkg/backup/client.go index c8cf668c04010..2b6e2c6877040 100644 --- a/br/pkg/backup/client.go +++ b/br/pkg/backup/client.go @@ -887,6 +887,68 @@ func (bc *Client) handleFineGrained( return backoffMill, nil } +func doSendBackup( + ctx context.Context, + client backuppb.BackupClient, + req backuppb.BackupRequest, + respFn func(*backuppb.BackupResponse) error, +) error { + failpoint.Inject("hint-backup-start", func(v failpoint.Value) { + logutil.CL(ctx).Info("failpoint hint-backup-start injected, " + + "process will notify the shell.") + if sigFile, ok := v.(string); ok { + file, err := os.Create(sigFile) + if err != nil { + log.Warn("failed to create file for notifying, skipping notify", zap.Error(err)) + } + if file != nil { + file.Close() + } + } + time.Sleep(3 * time.Second) + }) + bCli, err := client.Backup(ctx, &req) + failpoint.Inject("reset-retryable-error", func(val failpoint.Value) { + if val.(bool) { + logutil.CL(ctx).Debug("failpoint reset-retryable-error injected.") + err = status.Error(codes.Unavailable, "Unavailable error") + } + }) + failpoint.Inject("reset-not-retryable-error", func(val failpoint.Value) { + if val.(bool) { + logutil.CL(ctx).Debug("failpoint reset-not-retryable-error injected.") + err = status.Error(codes.Unknown, "Your server was haunted hence doesn't work, meow :3") + } + }) + if err != nil { + return err + } + defer func() { + _ = bCli.CloseSend() + }() + + for { + resp, err := bCli.Recv() + if err != nil { + if errors.Cause(err) == io.EOF { // nolint:errorlint + logutil.CL(ctx).Debug("backup streaming finish", + logutil.Key("backup-start-key", req.GetStartKey()), + logutil.Key("backup-end-key", req.GetEndKey())) + return nil + } + return err + } + // TODO: handle errors in the resp. + logutil.CL(ctx).Debug("range backed up", + logutil.Key("small-range-start-key", resp.GetStartKey()), + logutil.Key("small-range-end-key", resp.GetEndKey())) + err = respFn(resp) + if err != nil { + return errors.Trace(err) + } + } +} + // SendBackup send backup request to the given store. // Stop receiving response if respFn returns error. func SendBackup( @@ -908,40 +970,15 @@ func SendBackup( } var errReset error -backupLoop: + var errBackup error + for retry := 0; retry < backupRetryTimes; retry++ { logutil.CL(ctx).Info("try backup", zap.Int("retry time", retry), ) - failpoint.Inject("hint-backup-start", func(v failpoint.Value) { - logutil.CL(ctx).Info("failpoint hint-backup-start injected, " + - "process will notify the shell.") - if sigFile, ok := v.(string); ok { - file, err := os.Create(sigFile) - if err != nil { - log.Warn("failed to create file for notifying, skipping notify", zap.Error(err)) - } - if file != nil { - file.Close() - } - } - time.Sleep(3 * time.Second) - }) - bcli, err := client.Backup(ctx, &req) - failpoint.Inject("reset-retryable-error", func(val failpoint.Value) { - if val.(bool) { - logutil.CL(ctx).Debug("failpoint reset-retryable-error injected.") - err = status.Error(codes.Unavailable, "Unavailable error") - } - }) - failpoint.Inject("reset-not-retryable-error", func(val failpoint.Value) { - if val.(bool) { - logutil.CL(ctx).Debug("failpoint reset-not-retryable-error injected.") - err = status.Error(codes.Unknown, "Your server was haunted hence doesn't work, meow :3") - } - }) - if err != nil { - if isRetryableError(err) { + errBackup = doSendBackup(ctx, client, req, respFn) + if errBackup != nil { + if isRetryableError(errBackup) { time.Sleep(3 * time.Second) client, errReset = resetFn() if errReset != nil { @@ -950,45 +987,11 @@ backupLoop: } continue } - logutil.CL(ctx).Error("fail to backup", zap.Uint64("StoreID", storeID), - zap.Int("retry time", retry)) - return berrors.ErrFailedToConnect.Wrap(err).GenWithStack("failed to create backup stream to store %d", storeID) - } - - for { - resp, err := bcli.Recv() - if err != nil { - if errors.Cause(err) == io.EOF { // nolint:errorlint - logutil.CL(ctx).Info("backup streaming finish", - zap.Int("retry-time", retry)) - _ = bcli.CloseSend() - break backupLoop - } - if isRetryableError(err) { - time.Sleep(3 * time.Second) - // current tikv is unavailable - client, errReset = resetFn() - if errReset != nil { - _ = bcli.CloseSend() - return errors.Annotatef(errReset, "failed to reset recv connection on store:%d "+ - "please check the tikv status", storeID) - } - _ = bcli.CloseSend() - break - } - _ = bcli.CloseSend() - return berrors.ErrFailedToConnect.Wrap(err).GenWithStack("failed to connect to store: %d with retry times:%d", storeID, retry) - } - - // TODO: handle errors in the resp. - logutil.CL(ctx).Info("range backed up", - logutil.Key("small-range-start-key", resp.GetStartKey()), - logutil.Key("small-range-end-key", resp.GetEndKey())) - err = respFn(resp) - if err != nil { - _ = bcli.CloseSend() - return errors.Trace(err) - } + logutil.CL(ctx).Error("fail to backup", zap.Uint64("StoreID", storeID), zap.Int("retry", retry)) + return berrors.ErrFailedToConnect.Wrap(errBackup).GenWithStack("failed to create backup stream to store %d", storeID) + } else { + // finish backup + break } } return nil