Skip to content

Commit

Permalink
br: refactor sendBackup (pingcap#31264)
Browse files Browse the repository at this point in the history
  • Loading branch information
3pointer authored Jan 25, 2022
1 parent 2e39aac commit bb42313
Showing 1 changed file with 72 additions and 69 deletions.
141 changes: 72 additions & 69 deletions br/pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -887,6 +887,68 @@ func (bc *Client) handleFineGrained(
return backoffMill, nil
}

// doSendBackup opens a single backup stream on client for req and hands every
// response received from that stream to respFn.
//
// It returns nil once the stream ends with io.EOF. An error from opening the
// stream or from Recv is returned unchanged, so the caller can classify it;
// an error from respFn stops receiving and is returned wrapped by errors.Trace.
func doSendBackup(
	ctx context.Context,
	client backuppb.BackupClient,
	req backuppb.BackupRequest,
	respFn func(*backuppb.BackupResponse) error,
) error {
	// Test hook: optionally creates the file named by the failpoint value to
	// signal an external shell, then sleeps for 3 seconds before the request
	// is sent.
	failpoint.Inject("hint-backup-start", func(v failpoint.Value) {
		logutil.CL(ctx).Info("failpoint hint-backup-start injected, " +
			"process will notify the shell.")
		if sigFile, ok := v.(string); ok {
			file, err := os.Create(sigFile)
			if err != nil {
				log.Warn("failed to create file for notifying, skipping notify", zap.Error(err))
			}
			if file != nil {
				file.Close()
			}
		}
		time.Sleep(3 * time.Second)
	})

	// Tie the stream to a context owned by this call. CloseSend only closes
	// the sending side; if we return before Recv has reported an error (for
	// example because respFn failed), cancelling the context is what lets
	// gRPC release the stream instead of keeping it until the caller's
	// context ends.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	bCli, err := client.Backup(ctx, &req)
	// Test hooks: replace the result of opening the stream with a fixed
	// Unavailable / Unknown status error.
	failpoint.Inject("reset-retryable-error", func(val failpoint.Value) {
		if val.(bool) {
			logutil.CL(ctx).Debug("failpoint reset-retryable-error injected.")
			err = status.Error(codes.Unavailable, "Unavailable error")
		}
	})
	failpoint.Inject("reset-not-retryable-error", func(val failpoint.Value) {
		if val.(bool) {
			logutil.CL(ctx).Debug("failpoint reset-not-retryable-error injected.")
			err = status.Error(codes.Unknown, "Your server was haunted hence doesn't work, meow :3")
		}
	})
	if err != nil {
		return err
	}
	defer func() {
		// Best effort: the outcome of this call is already decided by the
		// time the deferred function runs, so the error is ignored.
		_ = bCli.CloseSend()
	}()

	for {
		resp, err := bCli.Recv()
		if err != nil {
			// io.EOF marks the normal end of the stream.
			if errors.Cause(err) == io.EOF { // nolint:errorlint
				logutil.CL(ctx).Debug("backup streaming finish",
					logutil.Key("backup-start-key", req.GetStartKey()),
					logutil.Key("backup-end-key", req.GetEndKey()))
				return nil
			}
			return err
		}
		// TODO: handle errors in the resp.
		logutil.CL(ctx).Debug("range backed up",
			logutil.Key("small-range-start-key", resp.GetStartKey()),
			logutil.Key("small-range-end-key", resp.GetEndKey()))
		err = respFn(resp)
		if err != nil {
			return errors.Trace(err)
		}
	}
}

// SendBackup sends a backup request to the given store.
// It stops receiving responses if respFn returns an error.
func SendBackup(
Expand All @@ -908,40 +970,15 @@ func SendBackup(
}

var errReset error
backupLoop:
var errBackup error

for retry := 0; retry < backupRetryTimes; retry++ {
logutil.CL(ctx).Info("try backup",
zap.Int("retry time", retry),
)
failpoint.Inject("hint-backup-start", func(v failpoint.Value) {
logutil.CL(ctx).Info("failpoint hint-backup-start injected, " +
"process will notify the shell.")
if sigFile, ok := v.(string); ok {
file, err := os.Create(sigFile)
if err != nil {
log.Warn("failed to create file for notifying, skipping notify", zap.Error(err))
}
if file != nil {
file.Close()
}
}
time.Sleep(3 * time.Second)
})
bcli, err := client.Backup(ctx, &req)
failpoint.Inject("reset-retryable-error", func(val failpoint.Value) {
if val.(bool) {
logutil.CL(ctx).Debug("failpoint reset-retryable-error injected.")
err = status.Error(codes.Unavailable, "Unavailable error")
}
})
failpoint.Inject("reset-not-retryable-error", func(val failpoint.Value) {
if val.(bool) {
logutil.CL(ctx).Debug("failpoint reset-not-retryable-error injected.")
err = status.Error(codes.Unknown, "Your server was haunted hence doesn't work, meow :3")
}
})
if err != nil {
if isRetryableError(err) {
errBackup = doSendBackup(ctx, client, req, respFn)
if errBackup != nil {
if isRetryableError(errBackup) {
time.Sleep(3 * time.Second)
client, errReset = resetFn()
if errReset != nil {
Expand All @@ -950,45 +987,11 @@ backupLoop:
}
continue
}
logutil.CL(ctx).Error("fail to backup", zap.Uint64("StoreID", storeID),
zap.Int("retry time", retry))
return berrors.ErrFailedToConnect.Wrap(err).GenWithStack("failed to create backup stream to store %d", storeID)
}

for {
resp, err := bcli.Recv()
if err != nil {
if errors.Cause(err) == io.EOF { // nolint:errorlint
logutil.CL(ctx).Info("backup streaming finish",
zap.Int("retry-time", retry))
_ = bcli.CloseSend()
break backupLoop
}
if isRetryableError(err) {
time.Sleep(3 * time.Second)
// current tikv is unavailable
client, errReset = resetFn()
if errReset != nil {
_ = bcli.CloseSend()
return errors.Annotatef(errReset, "failed to reset recv connection on store:%d "+
"please check the tikv status", storeID)
}
_ = bcli.CloseSend()
break
}
_ = bcli.CloseSend()
return berrors.ErrFailedToConnect.Wrap(err).GenWithStack("failed to connect to store: %d with retry times:%d", storeID, retry)
}

// TODO: handle errors in the resp.
logutil.CL(ctx).Info("range backed up",
logutil.Key("small-range-start-key", resp.GetStartKey()),
logutil.Key("small-range-end-key", resp.GetEndKey()))
err = respFn(resp)
if err != nil {
_ = bcli.CloseSend()
return errors.Trace(err)
}
logutil.CL(ctx).Error("fail to backup", zap.Uint64("StoreID", storeID), zap.Int("retry", retry))
return berrors.ErrFailedToConnect.Wrap(errBackup).GenWithStack("failed to create backup stream to store %d", storeID)
} else {
// finish backup
break
}
}
return nil
Expand Down

0 comments on commit bb42313

Please sign in to comment.