diff --git a/client/resource_group/controller/controller.go b/client/resource_group/controller/controller.go index 55b2bffb5c5..0874097f12b 100755 --- a/client/resource_group/controller/controller.go +++ b/client/resource_group/controller/controller.go @@ -32,6 +32,7 @@ import ( "github.com/prometheus/client_golang/prometheus" pd "github.com/tikv/pd/client" "github.com/tikv/pd/client/errs" + "github.com/tikv/pd/client/utils" atomicutil "go.uber.org/atomic" "go.uber.org/zap" "golang.org/x/exp/slices" @@ -291,15 +292,7 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { watchMetaChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, pd.WithRev(metaRevision), pd.WithPrefix(), pd.WithPrevKV()) if err != nil { log.Warn("watch resource group meta failed", zap.Error(err)) - // Stop the timer if it's not stopped. - if !watchRetryTimer.Stop() { - select { - case <-watchRetryTimer.C: // try to drain from the channel - default: - } - } - // We need be careful here, see more details in the comments of Timer.Reset. - // https://pkg.go.dev/time@master#Timer.Reset + utils.DrainAndStopTimer(watchRetryTimer) watchRetryTimer.Reset(watchRetryInterval) failpoint.Inject("watchStreamError", func() { watchRetryTimer.Reset(20 * time.Millisecond) @@ -310,15 +303,7 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { watchConfigChannel, err = c.provider.Watch(ctx, pd.ControllerConfigPathPrefixBytes, pd.WithRev(cfgRevision), pd.WithPrefix()) if err != nil { log.Warn("watch resource group config failed", zap.Error(err)) - // Stop the timer if it's not stopped. - if !watchRetryTimer.Stop() { - select { - case <-watchRetryTimer.C: // try to drain from the channel - default: - } - } - // We need be careful here, see more details in the comments of Timer.Reset. - // https://pkg.go.dev/time@master#Timer.Reset + utils.DrainAndStopTimer(watchRetryTimer) watchRetryTimer.Reset(watchRetryInterval) } } @@ -353,15 +338,7 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { }) if !ok { watchMetaChannel = nil - // Stop the timer if it's not stopped. - if !watchRetryTimer.Stop() { - select { - case <-watchRetryTimer.C: // try to drain from the channel - default: - } - } - // We need be careful here, see more details in the comments of Timer.Reset. - // https://pkg.go.dev/time@master#Timer.Reset + utils.DrainAndStopTimer(watchRetryTimer) watchRetryTimer.Reset(watchRetryInterval) failpoint.Inject("watchStreamError", func() { watchRetryTimer.Reset(20 * time.Millisecond) @@ -398,15 +375,7 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { case resp, ok := <-watchConfigChannel: if !ok { watchConfigChannel = nil - // Stop the timer if it's not stopped. - if !watchRetryTimer.Stop() { - select { - case <-watchRetryTimer.C: // try to drain from the channel - default: - } - } - // We need be careful here, see more details in the comments of Timer.Reset. - // https://pkg.go.dev/time@master#Timer.Reset + utils.DrainAndStopTimer(watchRetryTimer) watchRetryTimer.Reset(watchRetryInterval) failpoint.Inject("watchStreamError", func() { watchRetryTimer.Reset(20 * time.Millisecond) diff --git a/client/tso_dispatcher.go b/client/tso_dispatcher.go index eb89f892f75..5160604b38c 100644 --- a/client/tso_dispatcher.go +++ b/client/tso_dispatcher.go @@ -31,6 +31,7 @@ import ( "github.com/tikv/pd/client/retry" "github.com/tikv/pd/client/timerpool" "github.com/tikv/pd/client/tsoutil" + "github.com/tikv/pd/client/utils" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -432,15 +433,7 @@ tsoBatchLoop: if maxBatchWaitInterval >= 0 { tbc.adjustBestBatchSize() } - // Stop the timer if it's not stopped. - if !streamLoopTimer.Stop() { - select { - case <-streamLoopTimer.C: // try to drain from the channel - default: - } - } - // We need be careful here, see more details in the comments of Timer.Reset. - // https://pkg.go.dev/time@master#Timer.Reset + utils.DrainAndStopTimer(streamLoopTimer) streamLoopTimer.Reset(c.option.timeout) // Choose a stream to send the TSO gRPC request. streamChoosingLoop: diff --git a/client/utils/utils.go b/client/utils/utils.go new file mode 100644 index 00000000000..1c4eaa9bd5d --- /dev/null +++ b/client/utils/utils.go @@ -0,0 +1,29 @@ +// Copyright 2024 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package utils + +import "time" + +// DrainAndStopTimer is used to drain and stop timer. +// We need be careful here, see more details in the comments of Timer.Reset. +// https://pkg.go.dev/time@master#Timer.Reset +func DrainAndStopTimer(t *time.Timer) { + // Stop the timer if it's not stopped. + if !t.Stop() { + select { + case <-t.C: // try to drain from the channel + default: + } + } +}