Skip to content

Commit

Permalink
add chaos testing for advancer owner
Browse files Browse the repository at this point in the history
Signed-off-by: Wenqi Mou <[email protected]>
  • Loading branch information
Tristan1900 committed Dec 12, 2024
1 parent d6b313f commit 2e8a8eb
Show file tree
Hide file tree
Showing 7 changed files with 76 additions and 27 deletions.
13 changes: 4 additions & 9 deletions br/pkg/streamhelper/advancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ func (c *CheckpointAdvancer) HasTask() bool {
return c.task != nil
}

// HasSubscriber returns whether the advancer is associated with a subscriber.
func (c *CheckpointAdvancer) HasSubscribion() bool {
// HasSubscriptions returns whether the advancer is associated with a subscriber.
func (c *CheckpointAdvancer) HasSubscriptions() bool {
c.subscriberMu.Lock()
defer c.subscriberMu.Unlock()

Expand All @@ -117,7 +117,7 @@ func newCheckpointWithTS(ts uint64) *checkpoint {
}
}

func NewCheckpointWithSpan(s spans.Valued) *checkpoint {
func newCheckpointWithSpan(s spans.Valued) *checkpoint {
return &checkpoint{
StartKey: s.Key.StartKey,
EndKey: s.Key.EndKey,
Expand Down Expand Up @@ -270,11 +270,6 @@ func (c *CheckpointAdvancer) WithCheckpoints(f func(*spans.ValueSortedFull)) {
f(c.checkpoints)
}

// NewCheckpoints replaces the advancer's checkpoints with cps.
// It is only used for tests.
func (c *CheckpointAdvancer) NewCheckpoints(cps *spans.ValueSortedFull) {
c.checkpoints = cps
}

func (c *CheckpointAdvancer) fetchRegionHint(ctx context.Context, startKey []byte) string {
region, err := locateKeyOfRegion(ctx, c.env, startKey)
if err != nil {
Expand Down Expand Up @@ -473,7 +468,7 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error
}

func (c *CheckpointAdvancer) setCheckpoint(s spans.Valued) bool {
cp := NewCheckpointWithSpan(s)
cp := newCheckpointWithSpan(s)
if cp.TS < c.lastCheckpoint.TS {
log.Warn("failed to update global checkpoint: stale",
zap.Uint64("old", c.lastCheckpoint.TS), zap.Uint64("new", cp.TS))
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/streamhelper/advancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ func TestRemoveTaskAndFlush(t *testing.T) {
}, 10*time.Second, 100*time.Millisecond)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/br/pkg/streamhelper/subscription-handler-loop"))
require.Eventually(t, func() bool {
return !adv.HasSubscribion()
return !adv.HasSubscriptions()
}, 10*time.Second, 100*time.Millisecond)
}

Expand Down
28 changes: 19 additions & 9 deletions br/pkg/streamhelper/config/advancer_conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,19 @@ import (
// Flag names and default values for the checkpoint advancer configuration.
const (
// Names of the flags; see DefineFlagsForCheckpointAdvancerConfig for where
// they are registered.
flagBackoffTime = "backoff-time"
flagTickInterval = "tick-interval"
flagFullScanDiffTick = "full-scan-tick"
flagAdvancingByCache = "advancing-by-cache"
flagTryAdvanceThreshold = "try-advance-threshold"
flagCheckPointLagLimit = "check-point-lag-limit"

DefaultConsistencyCheckTick = 5
DefaultTryAdvanceThreshold = 4 * time.Minute
DefaultCheckPointLagLimit = 48 * time.Hour
DefaultBackOffTime = 5 * time.Second
DefaultTickInterval = 12 * time.Second
DefaultFullScanTick = 4
DefaultAdvanceByCache = true
// Flag used for chaos testing only.
// NOTE(review): the flag is spelled "advance-owner-resign-interval" while the
// matching Config field is tagged "advancer-owner-retire-interval" -- confirm
// the naming difference is intended.
flagOwnerRetireInterval = "advance-owner-resign-interval"

DefaultTryAdvanceThreshold = 4 * time.Minute
DefaultCheckPointLagLimit = 48 * time.Hour
DefaultBackOffTime = 5 * time.Second
DefaultTickInterval = 12 * time.Second

// Used for chaos testing. The default of 0 disables the periodic owner
// retirement.
DefaultAdvancerOwnerRetireInterval = 0
)

var (
Expand All @@ -38,6 +39,11 @@ type Config struct {
TryAdvanceThreshold time.Duration `toml:"try-advance-threshold" json:"try-advance-threshold"`
// The maximum lag could be tolerated for the checkpoint lag.
CheckPointLagLimit time.Duration `toml:"check-point-lag-limit" json:"check-point-lag-limit"`

// The following configs are used for chaos testing; it is better not to enable them in prod.
//
// The interval at which the advancer owner periodically retires itself, used for chaos testing.
AdvancerOwnerRetireInterval time.Duration `toml:"advancer-owner-retire-interval" json:"advancer-owner-retire-interval"`
}

func DefineFlagsForCheckpointAdvancerConfig(f *pflag.FlagSet) {
Expand All @@ -49,6 +55,10 @@ func DefineFlagsForCheckpointAdvancerConfig(f *pflag.FlagSet) {
"If the checkpoint lag is greater than how long, we would try to poll TiKV for checkpoints.")
f.Duration(flagCheckPointLagLimit, DefaultCheckPointLagLimit,
"The maximum lag could be tolerated for the checkpoint lag.")

// used for chaos testing
f.Duration(flagOwnerRetireInterval, DefaultAdvancerOwnerRetireInterval,
"The interval that the owner will retire itself")
}

func Default() Config {
Expand Down
18 changes: 14 additions & 4 deletions br/pkg/streamhelper/daemon/owner_daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,20 @@ type OwnerDaemon struct {

// When not `nil`, implies the daemon is running.
cancel context.CancelFunc

// Owner retire interval, used for chaos testing; enabling it in prod is not recommended.
// Defaults to 0, which disables it.
retireInterval time.Duration
ownerStartTime time.Time
}

// New creates a new owner daemon.
//
// daemon, manager and tickInterval are stored on the returned OwnerDaemon
// as-is. retireInternal is stored as the daemon's retireInterval; passing 0
// disables the periodic owner retirement, which is intended for chaos testing.
// NOTE(review): the parameter name "retireInternal" looks like a typo of
// "retireInterval".
func New(daemon Interface, manager owner.Manager, tickInterval time.Duration) *OwnerDaemon {
func New(daemon Interface, manager owner.Manager, tickInterval time.Duration, retireInternal time.Duration) *OwnerDaemon {
return &OwnerDaemon{
daemon: daemon,
manager: manager,
tickInterval: tickInterval,
daemon: daemon,
manager: manager,
tickInterval: tickInterval,
retireInterval: retireInternal,
}
}

Expand All @@ -56,12 +62,16 @@ func (od *OwnerDaemon) ownerTick(ctx context.Context) {
log.Info("daemon became owner", zap.String("id", od.manager.ID()), zap.String("daemon-id", od.daemon.Name()))
// Note: maybe save the context so we can cancel the tick when we are not owner?
od.daemon.OnBecomeOwner(cx)
od.ownerStartTime = time.Now()
}

// Tick anyway.
if err := od.daemon.OnTick(ctx); err != nil {
log.Warn("failed on tick", logutil.ShortError(err))
}
if od.retireInterval != 0 && time.Now().Sub(od.ownerStartTime) > od.retireInterval {
od.manager.RetireOwner()
}
}

// Begin starts the daemon.
Expand Down
38 changes: 36 additions & 2 deletions br/pkg/streamhelper/daemon/owner_daemon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func TestDaemon(t *testing.T) {
req := require.New(t)
app := newTestApp(t)
ow := owner.NewMockManager(ctx, "owner_daemon_test", nil, "owner_key")
d := daemon.New(app, ow, 100*time.Millisecond)
d := daemon.New(app, ow, 100*time.Millisecond, 0)

app.AssertService(req, false)
f, err := d.Begin(ctx)
Expand All @@ -149,10 +149,44 @@ func TestDaemon(t *testing.T) {
ow.RetireOwner()
req.False(ow.IsOwner())
app.AssertNotRunning(1 * time.Second)
ow.CampaignOwner()
req.Eventually(func() bool {
return ow.IsOwner()
}, 1*time.Second, 100*time.Millisecond)
app.AssertStart(1 * time.Second)
app.AssertTick(1 * time.Second)

// make sure chaos did not kick in so never retires
req.Neverf(func() bool {
return !ow.IsOwner()
}, 5*time.Second, 100*time.Millisecond, "should never retire")
}

// TestDaemonWithChaos checks that a daemon created with a non-zero retire
// interval drops ownership on its own and then becomes owner again.
func TestDaemonWithChaos(t *testing.T) {
	const (
		tick           = 100 * time.Millisecond
		retireInterval = 2 * time.Second
	)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	r := require.New(t)
	service := newTestApp(t)
	mgr := owner.NewMockManager(ctx, "owner_daemon_test", nil, "owner_key")
	ownerDaemon := daemon.New(service, mgr, tick, retireInterval)

	service.AssertService(r, false)
	loop, err := ownerDaemon.Begin(ctx)
	r.NoError(err)
	service.AssertService(r, true)
	go loop()

	notOwner := func() bool { return !mgr.IsOwner() }

	// Wait until the manager reports ownership.
	r.Eventually(mgr.IsOwner, time.Second, 100*time.Millisecond)

	// Wait for the chaos retire interval to kick in so ownership is dropped.
	r.Eventually(notOwner, 3*time.Second, 500*time.Millisecond)

	// Sanity check: ownership should be regained in the background.
	r.Eventually(mgr.IsOwner, 2*time.Second, 500*time.Millisecond)
}
2 changes: 1 addition & 1 deletion br/pkg/task/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -936,7 +936,7 @@ func RunStreamAdvancer(c context.Context, g glue.Glue, cmdName string, cfg *Stre
defer func() {
ownerMgr.Close()
}()
advancerd := daemon.New(advancer, ownerMgr, cfg.AdvancerCfg.TickDuration)
advancerd := daemon.New(advancer, ownerMgr, cfg.AdvancerCfg.TickDuration, cfg.AdvancerCfg.AdvancerOwnerRetireInterval)
loop, err := advancerd.Begin(ctx)
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion pkg/domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1557,7 +1557,7 @@ func (do *Domain) initLogBackup(ctx context.Context, pdClient pd.Client) error {
}
adv := streamhelper.NewCheckpointAdvancer(env)
do.brOwnerMgr = streamhelper.OwnerManagerForLogBackup(ctx, do.etcdClient)
do.logBackupAdvancer = daemon.New(adv, do.brOwnerMgr, adv.Config().TickDuration)
do.logBackupAdvancer = daemon.New(adv, do.brOwnerMgr, adv.Config())
loop, err := do.logBackupAdvancer.Begin(ctx)
if err != nil {
return err
Expand Down

0 comments on commit 2e8a8eb

Please sign in to comment.