diff --git a/br/pkg/checkpoint/BUILD.bazel b/br/pkg/checkpoint/BUILD.bazel index baae284d545fb..76a30d72885be 100644 --- a/br/pkg/checkpoint/BUILD.bazel +++ b/br/pkg/checkpoint/BUILD.bazel @@ -6,6 +6,7 @@ go_library( importpath = "github.com/pingcap/tidb/br/pkg/checkpoint", visibility = ["//visibility:public"], deps = [ + "//br/pkg/logutil", "//br/pkg/metautil", "//br/pkg/rtree", "//br/pkg/storage", diff --git a/br/pkg/checkpoint/checkpoint.go b/br/pkg/checkpoint/checkpoint.go index bbe2d4e44d1df..c462e78e949ad 100644 --- a/br/pkg/checkpoint/checkpoint.go +++ b/br/pkg/checkpoint/checkpoint.go @@ -29,6 +29,7 @@ import ( "github.com/pingcap/errors" backuppb "github.com/pingcap/kvproto/pkg/brpb" "github.com/pingcap/log" + "github.com/pingcap/tidb/br/pkg/logutil" "github.com/pingcap/tidb/br/pkg/metautil" "github.com/pingcap/tidb/br/pkg/rtree" "github.com/pingcap/tidb/br/pkg/storage" @@ -240,7 +241,7 @@ func StartCheckpointRunnerForTest(ctx context.Context, storage storage.ExternalS appendCh: make(chan *CheckpointMessage), metaCh: make(chan map[string]*RangeGroups), - errCh: make(chan error), + errCh: make(chan error, 1), } runner.startCheckpointLoop(ctx, tick) @@ -258,7 +259,7 @@ func StartCheckpointRunner(ctx context.Context, storage storage.ExternalStorage, appendCh: make(chan *CheckpointMessage), metaCh: make(chan map[string]*RangeGroups), - errCh: make(chan error), + errCh: make(chan error, 1), } runner.startCheckpointLoop(ctx, tickDuration) @@ -344,6 +345,14 @@ func (r *CheckpointRunner) startCheckpointRunner(ctx context.Context, wg *sync.W return errCh } +func (r *CheckpointRunner) sendError(err error) { + select { + case r.errCh <- err: + default: + log.Error("errCh is blocked", logutil.ShortError(err)) + } +} + func (r *CheckpointRunner) startCheckpointLoop(ctx context.Context, tickDuration time.Duration) { r.wg.Add(1) checkpointLoop := func(ctx context.Context) { @@ -360,14 +369,14 @@ func (r *CheckpointRunner) startCheckpointLoop(ctx context.Context, tickDuration return case <-ticker.C: if err := r.flushMeta(ctx, errCh); err != nil { - r.errCh <- err + r.sendError(err) return } case msg, ok := <-r.appendCh: if !ok { log.Info("stop checkpoint runner") if err := r.flushMeta(ctx, errCh); err != nil { - r.errCh <- err + r.sendError(err) } // close the channel to flush worker // and wait it to consumes all the metas @@ -386,7 +395,7 @@ func (r *CheckpointRunner) startCheckpointLoop(ctx context.Context, tickDuration groups.Groups = append(groups.Groups, msg.Group) case err := <-errCh: // pass flush worker's error back - r.errCh <- err + r.sendError(err) return } } diff --git a/br/pkg/task/backup.go b/br/pkg/task/backup.go index 451fa5f6ba572..63e3f43ae5d30 100644 --- a/br/pkg/task/backup.go +++ b/br/pkg/task/backup.go @@ -427,6 +427,7 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig defer func() { // don't reset the gc-safe-point if checkpoint mode is used and backup is not finished if cfg.UseCheckpoint && !gcSafePointKeeperRemovable { + log.Info("skip removing gc-safepoint keeper for next retry", zap.String("gc-id", sp.ID)) return } log.Info("start to remove gc-safepoint keeper")