
Commit 276e41e

Merge remote-tracking branch 'upstream/release-7.5' into cherry-pick-7443-to-release-7.5

CabinfeverB committed Dec 25, 2023
2 parents c479cf9 + c9c9979
Showing 39 changed files with 624 additions and 201 deletions.
2 changes: 1 addition & 1 deletion client/errs/errno.go
@@ -54,7 +54,7 @@ var (
ErrClientGetMultiResponse = errors.Normalize("get invalid value response %v, must only one", errors.RFCCodeText("PD:client:ErrClientGetMultiResponse"))
ErrClientGetServingEndpoint = errors.Normalize("get serving endpoint failed", errors.RFCCodeText("PD:client:ErrClientGetServingEndpoint"))
ErrClientFindGroupByKeyspaceID = errors.Normalize("can't find keyspace group by keyspace id", errors.RFCCodeText("PD:client:ErrClientFindGroupByKeyspaceID"))
ErrClientWatchGCSafePointV2Stream = errors.Normalize("watch gc safe point v2 stream failed, %s", errors.RFCCodeText("PD:client:ErrClientWatchGCSafePointV2Stream"))
ErrClientWatchGCSafePointV2Stream = errors.Normalize("watch gc safe point v2 stream failed", errors.RFCCodeText("PD:client:ErrClientWatchGCSafePointV2Stream"))
)

// grpcutil errors
2 changes: 1 addition & 1 deletion client/errs/errs.go
@@ -27,7 +27,7 @@ func ZapError(err error, causeError ...error) zap.Field {
}
if e, ok := err.(*errors.Error); ok {
if len(causeError) >= 1 {
err = e.Wrap(causeError[0]).FastGenWithCause()
err = e.Wrap(causeError[0])
} else {
err = e.FastGenByArgs()
}
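
For illustration only (not part of this commit): a minimal sketch of how a call site is affected by the ZapError change above. The cause is now wrapped into the normalized error rather than flattened into its message via FastGenWithCause. Identifiers come from the diffs above; the surrounding program is assumed.

package main

import (
	"errors"

	"github.com/pingcap/log"
	"github.com/tikv/pd/client/errs"
)

func main() {
	cause := errors.New("rpc error: stream closed")
	// With the change above, ZapError wraps the cause into the normalized error,
	// so the logged field carries the cause rather than a flattened message string.
	log.Error("watch gc safe point v2 stream failed",
		errs.ZapError(errs.ErrClientWatchGCSafePointV2Stream, cause))
}
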
2 changes: 1 addition & 1 deletion client/go.mod
@@ -13,6 +13,7 @@ require (
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/prometheus/client_golang v1.11.1
github.com/stretchr/testify v1.8.2
go.uber.org/atomic v1.10.0
go.uber.org/goleak v1.1.11
go.uber.org/zap v1.24.0
golang.org/x/exp v0.0.0-20230711005742-c3f37128e5a4
@@ -31,7 +32,6 @@ require (
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.26.0 // indirect
github.com/prometheus/procfs v0.6.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/sys v0.13.0 // indirect
4 changes: 4 additions & 0 deletions client/resource_group/controller/config.go
@@ -88,6 +88,9 @@ type Config struct {
// RequestUnit is the configuration determines the coefficients of the RRU and WRU cost.
// This configuration should be modified carefully.
RequestUnit RequestUnitConfig `toml:"request-unit" json:"request-unit"`

// EnableControllerTraceLog is to control whether resource control client enable trace.
EnableControllerTraceLog bool `toml:"enable-controller-trace-log" json:"enable-controller-trace-log,string"`
}

// DefaultConfig returns the default resource manager controller configuration.
@@ -96,6 +99,7 @@ func DefaultConfig() *Config {
DegradedModeWaitDuration: NewDuration(defaultDegradedModeWaitDuration),
LTBMaxWaitDuration: NewDuration(defaultMaxWaitDuration),
RequestUnit: DefaultRequestUnitConfig(),
EnableControllerTraceLog: false,
}
}

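Not part of the commit: a small sketch of flipping the new switch programmatically. DefaultConfig and the field name come from the diff above; the package alias and surrounding program are assumptions. The option can equally be set through the `enable-controller-trace-log` TOML/JSON key declared in the struct tag.

package main

import (
	"fmt"

	rgctl "github.com/tikv/pd/client/resource_group/controller"
)

func main() {
	cfg := rgctl.DefaultConfig()
	fmt.Println(cfg.EnableControllerTraceLog) // false by default, per DefaultConfig above
	cfg.EnableControllerTraceLog = true       // turn on controller trace logging
}
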
53 changes: 35 additions & 18 deletions client/resource_group/controller/controller.go
@@ -32,6 +32,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/errs"
atomicutil "go.uber.org/atomic"
"go.uber.org/zap"
"golang.org/x/exp/slices"
)
Expand All @@ -54,6 +55,14 @@ const (
lowToken selectType = 1
)

var enableControllerTraceLog = atomicutil.NewBool(false)

func logControllerTrace(msg string, fields ...zap.Field) {
if enableControllerTraceLog.Load() {
log.Info(msg, fields...)
}
}

// ResourceGroupKVInterceptor is used as quota limit controller for resource group using kv store.
type ResourceGroupKVInterceptor interface {
// OnRequestWait is used to check whether resource group has enough tokens. It maybe needs to wait some time.
Expand Down Expand Up @@ -369,6 +378,9 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
}
copyCfg := *c.ruConfig
c.safeRuConfig.Store(&copyCfg)
if enableControllerTraceLog.Load() != config.EnableControllerTraceLog {
enableControllerTraceLog.Store(config.EnableControllerTraceLog)
}
log.Info("load resource controller config after config changed", zap.Reflect("config", config), zap.Reflect("ruConfig", c.ruConfig))
}

Expand Down Expand Up @@ -505,7 +517,7 @@ func (c *ResourceGroupsController) sendTokenBucketRequests(ctx context.Context,
c.responseDeadlineCh = c.run.responseDeadline.C
}
go func() {
log.Debug("[resource group controller] send token bucket request", zap.Time("now", now), zap.Any("req", req.Requests), zap.String("source", source))
logControllerTrace("[resource group controller] send token bucket request", zap.Time("now", now), zap.Any("req", req.Requests), zap.String("source", source))
resp, err := c.provider.AcquireTokenBuckets(ctx, req)
latency := time.Since(now)
if err != nil {
Expand All @@ -518,7 +530,7 @@ func (c *ResourceGroupsController) sendTokenBucketRequests(ctx context.Context,
} else {
successfulTokenRequestDuration.Observe(latency.Seconds())
}
log.Debug("[resource group controller] token bucket response", zap.Time("now", time.Now()), zap.Any("resp", resp), zap.String("source", source), zap.Duration("latency", latency))
logControllerTrace("[resource group controller] token bucket response", zap.Time("now", time.Now()), zap.Any("resp", resp), zap.String("source", source), zap.Duration("latency", latency))
c.tokenResponseChan <- resp
}()
}
Expand Down Expand Up @@ -603,10 +615,11 @@ type groupCostController struct {
calculators []ResourceCalculator
handleRespFunc func(*rmpb.TokenBucketResponse)

successfulRequestDuration prometheus.Observer
requestRetryCounter prometheus.Counter
failedRequestCounter prometheus.Counter
tokenRequestCounter prometheus.Counter
successfulRequestDuration prometheus.Observer
failedLimitReserveDuration prometheus.Observer
requestRetryCounter prometheus.Counter
failedRequestCounter prometheus.Counter
tokenRequestCounter prometheus.Counter

mu struct {
sync.Mutex
@@ -696,14 +709,15 @@ func newGroupCostController(
return nil, errs.ErrClientResourceGroupConfigUnavailable.FastGenByArgs("not supports the resource type")
}
gc := &groupCostController{
meta: group,
name: group.Name,
mainCfg: mainCfg,
mode: group.GetMode(),
successfulRequestDuration: successfulRequestDuration.WithLabelValues(group.Name),
failedRequestCounter: failedRequestCounter.WithLabelValues(group.Name),
requestRetryCounter: requestRetryCounter.WithLabelValues(group.Name),
tokenRequestCounter: resourceGroupTokenRequestCounter.WithLabelValues(group.Name),
meta: group,
name: group.Name,
mainCfg: mainCfg,
mode: group.GetMode(),
successfulRequestDuration: successfulRequestDuration.WithLabelValues(group.Name),
failedLimitReserveDuration: failedLimitReserveDuration.WithLabelValues(group.Name),
failedRequestCounter: failedRequestCounter.WithLabelValues(group.Name),
requestRetryCounter: requestRetryCounter.WithLabelValues(group.Name),
tokenRequestCounter: resourceGroupTokenRequestCounter.WithLabelValues(group.Name),
calculators: []ResourceCalculator{
newKVCalculator(mainCfg),
newSQLCalculator(mainCfg),
Expand Down Expand Up @@ -805,7 +819,7 @@ func (gc *groupCostController) updateRunState() {
}
*gc.run.consumption = *gc.mu.consumption
gc.mu.Unlock()
log.Debug("[resource group controller] update run state", zap.Any("request-unit-consumption", gc.run.consumption))
logControllerTrace("[resource group controller] update run state", zap.Any("request-unit-consumption", gc.run.consumption))
gc.run.now = newTime
}

Expand Down Expand Up @@ -886,7 +900,7 @@ func (gc *groupCostController) updateAvgRaWResourcePerSec() {
if !gc.calcAvg(counter, getRawResourceValueFromConsumption(gc.run.consumption, typ)) {
continue
}
log.Debug("[resource group controller] update avg raw resource per sec", zap.String("name", gc.name), zap.String("type", rmpb.RawResourceType_name[int32(typ)]), zap.Float64("avg-ru-per-sec", counter.avgRUPerSec))
logControllerTrace("[resource group controller] update avg raw resource per sec", zap.String("name", gc.name), zap.String("type", rmpb.RawResourceType_name[int32(typ)]), zap.Float64("avg-ru-per-sec", counter.avgRUPerSec))
}
gc.burstable.Store(isBurstable)
}
Expand All @@ -900,7 +914,7 @@ func (gc *groupCostController) updateAvgRUPerSec() {
if !gc.calcAvg(counter, getRUValueFromConsumption(gc.run.consumption, typ)) {
continue
}
log.Debug("[resource group controller] update avg ru per sec", zap.String("name", gc.name), zap.String("type", rmpb.RequestUnitType_name[int32(typ)]), zap.Float64("avg-ru-per-sec", counter.avgRUPerSec))
logControllerTrace("[resource group controller] update avg ru per sec", zap.String("name", gc.name), zap.String("type", rmpb.RequestUnitType_name[int32(typ)]), zap.Float64("avg-ru-per-sec", counter.avgRUPerSec))
}
gc.burstable.Store(isBurstable)
}
Expand Down Expand Up @@ -1220,6 +1234,9 @@ func (gc *groupCostController) onRequestWait(
}
if err != nil {
gc.failedRequestCounter.Inc()
if d.Seconds() > 0 {
gc.failedLimitReserveDuration.Observe(d.Seconds())
}
gc.mu.Lock()
sub(gc.mu.consumption, delta)
gc.mu.Unlock()
@@ -1245,7 +1262,7 @@
*gc.mu.storeCounter[info.StoreID()] = *gc.mu.globalCounter
gc.mu.Unlock()

return delta, penalty, gc.meta.Priority, nil
return delta, penalty, gc.getMeta().GetPriority(), nil
}

func (gc *groupCostController) onResponse(
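
As a standalone illustration (not from the commit) of the gating pattern the controller now uses: an atomic boolean is checked before each trace log, so verbosity can be raised or lowered on a live process when the config watcher applies a change, without touching the global log level. The names below are hypothetical.

package main

import (
	"fmt"

	atomicutil "go.uber.org/atomic"
)

// traceEnabled mirrors the role of enableControllerTraceLog above.
var traceEnabled = atomicutil.NewBool(false)

func logTrace(msg string) {
	if traceEnabled.Load() {
		fmt.Println("trace:", msg)
	}
}

func main() {
	logTrace("suppressed while tracing is off")
	traceEnabled.Store(true) // e.g. applied when a config reload sets the flag
	logTrace("emitted once tracing is on")
}
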
31 changes: 20 additions & 11 deletions client/resource_group/controller/limiter.go
@@ -122,10 +122,11 @@ func NewLimiterWithCfg(now time.Time, cfg tokenBucketReconfigureArgs, lowTokensN
// A Reservation holds information about events that are permitted by a Limiter to happen after a delay.
// A Reservation may be canceled, which may enable the Limiter to permit additional events.
type Reservation struct {
ok bool
lim *Limiter
tokens float64
timeToAct time.Time
ok bool
lim *Limiter
tokens float64
timeToAct time.Time
needWaitDurtion time.Duration
// This is the Limit at reservation time, it can change later.
limit Limit
}
@@ -301,7 +302,7 @@ func (lim *Limiter) Reconfigure(now time.Time,
) {
lim.mu.Lock()
defer lim.mu.Unlock()
log.Debug("[resource group controller] before reconfigure", zap.Float64("old-tokens", lim.tokens), zap.Float64("old-rate", float64(lim.limit)), zap.Float64("old-notify-threshold", args.NotifyThreshold), zap.Int64("old-burst", lim.burst))
logControllerTrace("[resource group controller] before reconfigure", zap.Float64("old-tokens", lim.tokens), zap.Float64("old-rate", float64(lim.limit)), zap.Float64("old-notify-threshold", args.NotifyThreshold), zap.Int64("old-burst", lim.burst))
if args.NewBurst < 0 {
lim.last = now
lim.tokens = args.NewTokens
Expand All @@ -317,7 +318,7 @@ func (lim *Limiter) Reconfigure(now time.Time,
opt(lim)
}
lim.maybeNotify()
log.Debug("[resource group controller] after reconfigure", zap.Float64("tokens", lim.tokens), zap.Float64("rate", float64(lim.limit)), zap.Float64("notify-threshold", args.NotifyThreshold), zap.Int64("burst", lim.burst))
logControllerTrace("[resource group controller] after reconfigure", zap.Float64("tokens", lim.tokens), zap.Float64("rate", float64(lim.limit)), zap.Float64("notify-threshold", args.NotifyThreshold), zap.Int64("burst", lim.burst))
}

// AvailableTokens decreases the amount of tokens currently available.
@@ -358,9 +359,10 @@ func (lim *Limiter) reserveN(now time.Time, n float64, maxFutureReserve time.Dur

// Prepare reservation
r := Reservation{
ok: ok,
lim: lim,
limit: lim.limit,
ok: ok,
lim: lim,
limit: lim.limit,
needWaitDurtion: waitDuration,
}
if ok {
r.tokens = n
@@ -372,7 +374,14 @@
lim.tokens = tokens
lim.maybeNotify()
} else {
log.Debug("[resource group controller]", zap.Float64("current-tokens", lim.tokens), zap.Float64("current-rate", float64(lim.limit)), zap.Float64("request-tokens", n), zap.Int64("burst", lim.burst), zap.Int("remaining-notify-times", lim.remainingNotifyTimes))
log.Warn("[resource group controller] cannot reserve enough tokens",
zap.Duration("need-wait-duration", waitDuration),
zap.Duration("max-wait-duration", maxFutureReserve),
zap.Float64("current-ltb-tokens", lim.tokens),
zap.Float64("current-ltb-rate", float64(lim.limit)),
zap.Float64("request-tokens", n),
zap.Int64("burst", lim.burst),
zap.Int("remaining-notify-times", lim.remainingNotifyTimes))
lim.last = last
if lim.limit == 0 {
lim.notify()
@@ -452,7 +461,7 @@ func WaitReservations(ctx context.Context, now time.Time, reservations []*Reserv
for _, res := range reservations {
if !res.ok {
cancel()
return 0, errs.ErrClientResourceGroupThrottled
return res.needWaitDurtion, errs.ErrClientResourceGroupThrottled
}
delay := res.DelayFrom(now)
if delay > longestDelayDuration {
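
A hedged caller-side sketch (not in the commit): with the change above, a rejected reservation now reports the wait it would have required instead of zero, which is what onRequestWait feeds into the new failed-reservation histogram. The helper below is hypothetical and assumed to live in the controller package next to the types shown above.

package controller

import (
	"context"
	"time"
)

// waitOrRecordFailure is a hypothetical helper showing how the duration returned on
// failure can feed the failed-reservation histogram declared in metrics.go below.
func waitOrRecordFailure(ctx context.Context, groupName string, rs []*Reservation) error {
	d, err := WaitReservations(ctx, time.Now(), rs)
	if err != nil {
		// err is errs.ErrClientResourceGroupThrottled when some reservation was not ok;
		// d now carries that reservation's needed wait instead of always zero.
		if d.Seconds() > 0 {
			failedLimitReserveDuration.WithLabelValues(groupName).Observe(d.Seconds())
		}
		return err
	}
	return nil
}
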
2 changes: 1 addition & 1 deletion client/resource_group/controller/limiter_test.go
@@ -161,7 +161,7 @@ func TestCancel(t *testing.T) {
checkTokens(re, lim1, t2, 7)
checkTokens(re, lim2, t2, 2)
d, err := WaitReservations(ctx, t2, []*Reservation{r1, r2})
re.Equal(d, time.Duration(0))
re.Equal(d, 4*time.Second)
re.Error(err)
checkTokens(re, lim1, t3, 13)
checkTokens(re, lim2, t3, 3)
10 changes: 10 additions & 0 deletions client/resource_group/controller/metrics.go
@@ -42,6 +42,15 @@ var (
Help: "Bucketed histogram of wait duration of successful request.",
}, []string{resourceGroupNameLabel})

failedLimitReserveDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: requestSubsystem,
Name: "limit_reserve_time_failed",
Buckets: []float64{.005, .01, .05, .1, .5, 1, 5, 10, 20, 25, 30}, // 0.005 ~ 30
Help: "Bucketed histogram of wait duration of failed request.",
}, []string{resourceGroupNameLabel})

failedRequestCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
@@ -85,6 +94,7 @@ func init() {
prometheus.MustRegister(resourceGroupStatusGauge)
prometheus.MustRegister(successfulRequestDuration)
prometheus.MustRegister(failedRequestCounter)
prometheus.MustRegister(failedLimitReserveDuration)
prometheus.MustRegister(requestRetryCounter)
prometheus.MustRegister(tokenRequestDuration)
prometheus.MustRegister(resourceGroupTokenRequestCounter)
6 changes: 3 additions & 3 deletions client/tso_dispatcher.go
@@ -412,7 +412,7 @@ tsoBatchLoop:
} else {
log.Error("[tso] fetch pending tso requests error",
zap.String("dc-location", dc),
errs.ZapError(errs.ErrClientGetTSO.FastGenByArgs("when fetch pending tso requests"), err))
errs.ZapError(errs.ErrClientGetTSO, err))
}
return
}
Expand Down Expand Up @@ -495,10 +495,10 @@ tsoBatchLoop:
default:
}
c.svcDiscovery.ScheduleCheckMemberChanged()
log.Error("[tso] getTS error",
log.Error("[tso] getTS error after processing requests",
zap.String("dc-location", dc),
zap.String("stream-addr", streamAddr),
errs.ZapError(errs.ErrClientGetTSO.FastGenByArgs("after processing requests"), err))
errs.ZapError(errs.ErrClientGetTSO, err))
// Set `stream` to nil and remove this stream from the `connectionCtxs` due to error.
connectionCtxs.Delete(streamAddr)
cancel()
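
For illustration only (not part of the commit): a minimal sketch of the adjusted logging pattern in the dispatcher. Since the ErrClientGetTSO template no longer takes a %v argument (see errors.toml below), the descriptive suffix moves into the log message and the normalized error is passed to ZapError unformatted. The standalone program and the cause value are assumptions.

package main

import (
	"github.com/pingcap/errors"
	"github.com/pingcap/log"
	"github.com/tikv/pd/client/errs"
)

func main() {
	cause := errors.New("rpc unavailable")
	// Context text lives in the message; the error template stays argument-free.
	log.Error("[tso] getTS error after processing requests",
		errs.ZapError(errs.ErrClientGetTSO, cause))
}
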
2 changes: 1 addition & 1 deletion errors.toml
@@ -78,7 +78,7 @@ get min TSO failed, %v

["PD:client:ErrClientGetTSO"]
error = '''
get TSO failed, %v
get TSO failed
'''

["PD:client:ErrClientGetTSOTimeout"]
(Diffs for the remaining changed files are not shown here.)