Skip to content

Commit

Permalink
br/streamhelper: fix panic while removing task (pingcap#50869) (pingc…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Feb 27, 2024
1 parent 02089bc commit 8a48675
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 10 deletions.
3 changes: 1 addition & 2 deletions br/pkg/streamhelper/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,7 @@ go_test(
"subscription_test.go",
],
flaky = True,
race = "on",
shard_count = 21,
shard_count = 22,
deps = [
":streamhelper",
"//br/pkg/errors",
Expand Down
46 changes: 38 additions & 8 deletions br/pkg/streamhelper/advancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,22 @@ type CheckpointAdvancer struct {
subscriberMu sync.Mutex
}

// HasTask returns whether the advancer has been bound to a task.
func (c *CheckpointAdvancer) HasTask() bool {
c.taskMu.Lock()
defer c.taskMu.Unlock()

return c.task != nil
}

// HasSubscriber returns whether the advancer is associated with a subscriber.
func (c *CheckpointAdvancer) HasSubscribion() bool {
c.subscriberMu.Lock()
defer c.subscriberMu.Unlock()

return c.subscriber != nil && len(c.subscriber.subscriptions) > 0
}

// checkpoint represents the TS with specific range.
// it's only used in advancer.go.
type checkpoint struct {
Expand Down Expand Up @@ -361,6 +377,12 @@ func (c *CheckpointAdvancer) StartTaskListener(ctx context.Context) {
}()
}

func (c *CheckpointAdvancer) setCheckpoints(cps *spans.ValueSortedFull) {
c.checkpointsMu.Lock()
c.checkpoints = cps
c.checkpointsMu.Unlock()
}

func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error {
c.taskMu.Lock()
defer c.taskMu.Unlock()
Expand All @@ -369,7 +391,7 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error
utils.LogBackupTaskCountInc()
c.task = e.Info
c.taskRange = spans.Collapse(len(e.Ranges), func(i int) kv.KeyRange { return e.Ranges[i] })
c.checkpoints = spans.Sorted(spans.NewFullWith(e.Ranges, 0))
c.setCheckpoints(spans.Sorted(spans.NewFullWith(e.Ranges, 0)))
c.lastCheckpoint = newCheckpointWithTS(e.Info.StartTs)
p, err := c.env.BlockGCUntil(ctx, c.task.StartTs)
if err != nil {
Expand All @@ -381,12 +403,12 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error
utils.LogBackupTaskCountDec()
c.task = nil
c.taskRange = nil
c.checkpoints = nil
// This would be synced by `taskMu`, perhaps we'd better rename that to `tickMu`.
// Do the null check because some of test cases won't equip the advancer with subscriber.
if c.subscriber != nil {
c.subscriber.Clear()
}
c.setCheckpoints(nil)
if err := c.env.ClearV3GlobalCheckpointForTask(ctx, e.Name); err != nil {
log.Warn("failed to clear global checkpoint", logutil.ShortError(err))
}
Expand Down Expand Up @@ -448,8 +470,10 @@ func (c *CheckpointAdvancer) SpawnSubscriptionHandler(ctx context.Context) {
defer c.subscriberMu.Unlock()
c.subscriber = NewSubscriber(c.env, c.env, WithMasterContext(ctx))
es := c.subscriber.Events()
log.Info("Subscription handler spawned.", zap.String("category", "log backup subscription manager"))

go func() {
defer utils.CatchAndLogPanic()
for {
select {
case <-ctx.Done():
Expand All @@ -458,12 +482,18 @@ func (c *CheckpointAdvancer) SpawnSubscriptionHandler(ctx context.Context) {
if !ok {
return
}
c.checkpointsMu.Lock()
log.Debug("Accepting region flush event.",
zap.Stringer("range", logutil.StringifyRange(event.Key)),
zap.Uint64("checkpoint", event.Value))
c.checkpoints.Merge(event)
c.checkpointsMu.Unlock()
failpoint.Inject("subscription-handler-loop", func() {})
c.WithCheckpoints(func(vsf *spans.ValueSortedFull) {
if vsf == nil {
log.Warn("Span tree not found, perhaps stale event of removed tasks.",
zap.String("category", "log backup subscription manager"))
return
}
log.Debug("Accepting region flush event.",
zap.Stringer("range", logutil.StringifyRange(event.Key)),
zap.Uint64("checkpoint", event.Value))
vsf.Merge(event)
})
}
}
}()
Expand Down
26 changes: 26 additions & 0 deletions br/pkg/streamhelper/advancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,3 +437,29 @@ func TestOwnerDropped(t *testing.T) {
require.Equal(t, vsf.MinValue(), cp)
})
}

// TestRemoveTaskAndFlush tests the bug has been described in #50839.
func TestRemoveTaskAndFlush(t *testing.T) {
log.SetLevel(zapcore.DebugLevel)
ctx := context.Background()
c := createFakeCluster(t, 4, true)
installSubscribeSupport(c)
env := &testEnv{
fakeCluster: c,
testCtx: t,
}
adv := streamhelper.NewCheckpointAdvancer(env)
adv.StartTaskListener(ctx)
adv.SpawnSubscriptionHandler(ctx)
require.NoError(t, adv.OnTick(ctx))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/br/pkg/streamhelper/subscription-handler-loop", "pause"))
c.flushAll()
env.unregisterTask()
require.Eventually(t, func() bool {
return !adv.HasTask()
}, 10*time.Second, 100*time.Millisecond)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/br/pkg/streamhelper/subscription-handler-loop"))
require.Eventually(t, func() bool {
return !adv.HasSubscribion()
}, 10*time.Second, 100*time.Millisecond)
}
14 changes: 14 additions & 0 deletions br/pkg/utils/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,3 +130,17 @@ func PanicToErr(err *error) {
log.Warn("PanicToErr: panicked, recovering and returning error", zap.StackSkip("stack", 1), logutil.ShortError(*err))
}
}

// CatchAndLogPanic recovers when the execution get panicked, and log the panic.
// generally, this would be used with `defer`, like:
//
// func foo() {
// defer utils.CatchAndLogPanic()
// maybePanic()
// }
func CatchAndLogPanic() {
item := recover()
if item != nil {
log.Warn("CatchAndLogPanic: panicked, but ignored.", zap.StackSkip("stack", 1), zap.Any("panic", item))
}
}
1 change: 1 addition & 0 deletions tablecodec/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ go_test(
"main_test.go",
"tablecodec_test.go",
],
data = glob(["testdata/**"]),
embed = [":tablecodec"],
flaky = True,
shard_count = 23,
Expand Down

0 comments on commit 8a48675

Please sign in to comment.