Skip to content

Commit

Permalink
resource control: fix unsafe usage of timer.Reset
Browse files Browse the repository at this point in the history
Signed-off-by: lhy1024 <[email protected]>
  • Loading branch information
lhy1024 committed Dec 6, 2024
1 parent da0000a commit 384f53d
Showing 1 changed file with 36 additions and 0 deletions.
36 changes: 36 additions & 0 deletions client/resource_group/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,15 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
watchMetaChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, opt.WithRev(metaRevision), opt.WithPrefix(), opt.WithPrevKV())
if err != nil {
log.Warn("watch resource group meta failed", zap.Error(err))
// Stop the timer if it's not stopped.
if !watchRetryTimer.Stop() {
select {
case <-watchRetryTimer.C: // try to drain from the channel
default:
}
}
// We need be careful here, see more details in the comments of Timer.Reset.
// https://pkg.go.dev/time@master#Timer.Reset
watchRetryTimer.Reset(watchRetryInterval)
failpoint.Inject("watchStreamError", func() {
watchRetryTimer.Reset(20 * time.Millisecond)
Expand All @@ -310,6 +319,15 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
watchConfigChannel, err = c.provider.Watch(ctx, pd.ControllerConfigPathPrefixBytes, opt.WithRev(cfgRevision), opt.WithPrefix())
if err != nil {
log.Warn("watch resource group config failed", zap.Error(err))
// Stop the timer if it's not stopped.
if !watchRetryTimer.Stop() {
select {
case <-watchRetryTimer.C: // try to drain from the channel
default:
}
}
// We need be careful here, see more details in the comments of Timer.Reset.
// https://pkg.go.dev/time@master#Timer.Reset
watchRetryTimer.Reset(watchRetryInterval)
}
}
Expand Down Expand Up @@ -344,6 +362,15 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
})
if !ok {
watchMetaChannel = nil
// Stop the timer if it's not stopped.
if !watchRetryTimer.Stop() {
select {
case <-watchRetryTimer.C: // try to drain from the channel
default:
}
}
// We need be careful here, see more details in the comments of Timer.Reset.
// https://pkg.go.dev/time@master#Timer.Reset
watchRetryTimer.Reset(watchRetryInterval)
failpoint.Inject("watchStreamError", func() {
watchRetryTimer.Reset(20 * time.Millisecond)
Expand Down Expand Up @@ -395,6 +422,15 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
case resp, ok := <-watchConfigChannel:
if !ok {
watchConfigChannel = nil
// Stop the timer if it's not stopped.
if !watchRetryTimer.Stop() {
select {
case <-watchRetryTimer.C: // try to drain from the channel
default:
}
}
// We need be careful here, see more details in the comments of Timer.Reset.
// https://pkg.go.dev/time@master#Timer.Reset
watchRetryTimer.Reset(watchRetryInterval)
failpoint.Inject("watchStreamError", func() {
watchRetryTimer.Reset(20 * time.Millisecond)
Expand Down

0 comments on commit 384f53d

Please sign in to comment.