Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
Signed-off-by: lhy1024 <[email protected]>
  • Loading branch information
lhy1024 committed Dec 9, 2024
1 parent b0514ff commit b8093f4
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 45 deletions.
41 changes: 5 additions & 36 deletions client/resource_group/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/utils"
atomicutil "go.uber.org/atomic"
"go.uber.org/zap"
"golang.org/x/exp/slices"
Expand Down Expand Up @@ -291,15 +292,7 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
watchMetaChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, pd.WithRev(metaRevision), pd.WithPrefix(), pd.WithPrevKV())
if err != nil {
log.Warn("watch resource group meta failed", zap.Error(err))
// Stop the timer if it's not stopped.
if !watchRetryTimer.Stop() {
select {
case <-watchRetryTimer.C: // try to drain from the channel
default:
}
}
// We need be careful here, see more details in the comments of Timer.Reset.
// https://pkg.go.dev/time@master#Timer.Reset
utils.DrainAndStopTimer(watchRetryTimer)
watchRetryTimer.Reset(watchRetryInterval)
failpoint.Inject("watchStreamError", func() {
watchRetryTimer.Reset(20 * time.Millisecond)
Expand All @@ -310,15 +303,7 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
watchConfigChannel, err = c.provider.Watch(ctx, pd.ControllerConfigPathPrefixBytes, pd.WithRev(cfgRevision), pd.WithPrefix())
if err != nil {
log.Warn("watch resource group config failed", zap.Error(err))
// Stop the timer if it's not stopped.
if !watchRetryTimer.Stop() {
select {
case <-watchRetryTimer.C: // try to drain from the channel
default:
}
}
// We need be careful here, see more details in the comments of Timer.Reset.
// https://pkg.go.dev/time@master#Timer.Reset
utils.DrainAndStopTimer(watchRetryTimer)
watchRetryTimer.Reset(watchRetryInterval)
}
}
Expand Down Expand Up @@ -353,15 +338,7 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
})
if !ok {
watchMetaChannel = nil
// Stop the timer if it's not stopped.
if !watchRetryTimer.Stop() {
select {
case <-watchRetryTimer.C: // try to drain from the channel
default:
}
}
// We need be careful here, see more details in the comments of Timer.Reset.
// https://pkg.go.dev/time@master#Timer.Reset
utils.DrainAndStopTimer(watchRetryTimer)
watchRetryTimer.Reset(watchRetryInterval)
failpoint.Inject("watchStreamError", func() {
watchRetryTimer.Reset(20 * time.Millisecond)
Expand Down Expand Up @@ -398,15 +375,7 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
case resp, ok := <-watchConfigChannel:
if !ok {
watchConfigChannel = nil
// Stop the timer if it's not stopped.
if !watchRetryTimer.Stop() {
select {
case <-watchRetryTimer.C: // try to drain from the channel
default:
}
}
// We need be careful here, see more details in the comments of Timer.Reset.
// https://pkg.go.dev/time@master#Timer.Reset
utils.DrainAndStopTimer(watchRetryTimer)
watchRetryTimer.Reset(watchRetryInterval)
failpoint.Inject("watchStreamError", func() {
watchRetryTimer.Reset(20 * time.Millisecond)
Expand Down
11 changes: 2 additions & 9 deletions client/tso_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/tikv/pd/client/retry"
"github.com/tikv/pd/client/timerpool"
"github.com/tikv/pd/client/tsoutil"
"github.com/tikv/pd/client/utils"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -432,15 +433,7 @@ tsoBatchLoop:
if maxBatchWaitInterval >= 0 {
tbc.adjustBestBatchSize()
}
// Stop the timer if it's not stopped.
if !streamLoopTimer.Stop() {
select {
case <-streamLoopTimer.C: // try to drain from the channel
default:
}
}
// We need be careful here, see more details in the comments of Timer.Reset.
// https://pkg.go.dev/time@master#Timer.Reset
utils.DrainAndStopTimer(streamLoopTimer)
streamLoopTimer.Reset(c.option.timeout)
// Choose a stream to send the TSO gRPC request.
streamChoosingLoop:
Expand Down
29 changes: 29 additions & 0 deletions client/utils/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright 2024 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package utils

import "time"

// DrainAndStopTimer is used to drain and stop timer.
// We need be careful here, see more details in the comments of Timer.Reset.
// https://pkg.go.dev/time@master#Timer.Reset
func DrainAndStopTimer(t *time.Timer) {
// Stop the timer if it's not stopped.
if !t.Stop() {
select {
case <-t.C: // try to drain from the channel
default:
}
}
}

0 comments on commit b8093f4

Please sign in to comment.