Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

br: add errch buf for checkpoint #40166

Merged
merged 12 commits into from
Jan 4, 2023
15 changes: 10 additions & 5 deletions br/pkg/checkpoint/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ func StartCheckpointRunnerForTest(ctx context.Context, storage storage.ExternalS

appendCh: make(chan *CheckpointMessage),
metaCh: make(chan map[string]*RangeGroups),
errCh: make(chan error),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is the count 1 of buffer length enough to deal this issue, if the multiple error is sent to errCh ?
Can we close the errCh before receiver is closed ?

Copy link
Contributor Author

@Leavrth Leavrth Jan 3, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error incurs only from the flush stage, which is wrapped in a goroutine. Once an error incurs, this goroutine sends back the error and exits, so no error will occur again.

errCh: make(chan error, 1),
}

runner.startCheckpointLoop(ctx, tick)
Expand All @@ -258,7 +258,7 @@ func StartCheckpointRunner(ctx context.Context, storage storage.ExternalStorage,

appendCh: make(chan *CheckpointMessage),
metaCh: make(chan map[string]*RangeGroups),
errCh: make(chan error),
errCh: make(chan error, 1),
}

runner.startCheckpointLoop(ctx, tickDuration)
Expand Down Expand Up @@ -344,6 +344,11 @@ func (r *CheckpointRunner) startCheckpointRunner(ctx context.Context, wg *sync.W
return errCh
}

func (r *CheckpointRunner) sendError(err error) {
log.Error("stop checkpoint runner due to error", zap.Error(err))
r.errCh <- err
Leavrth marked this conversation as resolved.
Show resolved Hide resolved
}

func (r *CheckpointRunner) startCheckpointLoop(ctx context.Context, tickDuration time.Duration) {
r.wg.Add(1)
checkpointLoop := func(ctx context.Context) {
Expand All @@ -360,14 +365,14 @@ func (r *CheckpointRunner) startCheckpointLoop(ctx context.Context, tickDuration
return
case <-ticker.C:
if err := r.flushMeta(ctx, errCh); err != nil {
r.errCh <- err
r.sendError(err)
return
}
case msg, ok := <-r.appendCh:
if !ok {
log.Info("stop checkpoint runner")
if err := r.flushMeta(ctx, errCh); err != nil {
r.errCh <- err
r.sendError(err)
}
// close the channel to flush worker
// and wait it to consumes all the metas
Expand All @@ -386,7 +391,7 @@ func (r *CheckpointRunner) startCheckpointLoop(ctx context.Context, tickDuration
groups.Groups = append(groups.Groups, msg.Group)
case err := <-errCh:
// pass flush worker's error back
r.errCh <- err
r.sendError(err)
return
}
}
Expand Down
1 change: 1 addition & 0 deletions br/pkg/task/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,7 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig
defer func() {
// don't reset the gc-safe-point if checkpoint mode is used and backup is not finished
if cfg.UseCheckpoint && !gcSafePointKeeperRemovable {
log.Info("skip removing gc-safepoint keeper for next retry", zap.String("gc-id", sp.ID))
return
}
log.Info("start to remove gc-safepoint keeper")
Expand Down