Skip to content

Commit

Permalink
resource_control: make metrics time point match in dynamic calibrate (p…
Browse files Browse the repository at this point in the history
  • Loading branch information
CabinfeverB committed Jul 5, 2023
1 parent ba851d5 commit 370db24
Show file tree
Hide file tree
Showing 2 changed files with 283 additions and 21 deletions.
93 changes: 75 additions & 18 deletions executor/calibrate_resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/duration"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/sessiontxn/staleread"
"github.com/pingcap/tidb/util/chunk"
Expand Down Expand Up @@ -211,34 +212,48 @@ func (e *calibrateResourceExec) dynamicCalibrate(ctx context.Context, req *chunk
if err != nil {
return err
}
rus, err := getRUPerSec(ctx, exec, startTime, endTime)
rus, err := getRUPerSec(ctx, e.ctx, exec, startTime, endTime)
if err != nil {
return err
}
tikvCPUs, err := getComponentCPUUsagePerSec(ctx, exec, "tikv", startTime, endTime)
tikvCPUs, err := getComponentCPUUsagePerSec(ctx, e.ctx, exec, "tikv", startTime, endTime)
if err != nil {
return err
}
tidbCPUs, err := getComponentCPUUsagePerSec(ctx, exec, "tidb", startTime, endTime)
tidbCPUs, err := getComponentCPUUsagePerSec(ctx, e.ctx, exec, "tidb", startTime, endTime)
if err != nil {
return err
}
quotas := make([]float64, 0)
lowCount := 0
for idx, ru := range rus {
if idx >= len(tikvCPUs) || idx >= len(tidbCPUs) {
for {
if rus.isEnd() || tikvCPUs.isEnd() || tidbCPUs.isEnd() {
break
}
tikvQuota, tidbQuota := tikvCPUs[idx]/totalKVCPUQuota, tidbCPUs[idx]/totalTiDBCPU
// make time point match
maxTime := rus.getTime()
if tikvCPUs.getTime().After(maxTime) {
maxTime = tikvCPUs.getTime()
}
if tidbCPUs.getTime().After(maxTime) {
maxTime = tidbCPUs.getTime()
}
if !rus.advance(maxTime) || !tikvCPUs.advance(maxTime) || !tidbCPUs.advance(maxTime) {
continue
}
tikvQuota, tidbQuota := tikvCPUs.getValue()/totalKVCPUQuota, tidbCPUs.getValue()/totalTiDBCPU
// If one of the two cpu usage is greater than the `valuableUsageThreshold`, we can accept it.
// And if both are greater than the `lowUsageThreshold`, we can also accept it.
if tikvQuota > valuableUsageThreshold || tidbQuota > valuableUsageThreshold {
quotas = append(quotas, ru/mathutil.Max(tikvQuota, tidbQuota))
quotas = append(quotas, rus.getValue()/mathutil.Max(tikvQuota, tidbQuota))
} else if tikvQuota < lowUsageThreshold || tidbQuota < lowUsageThreshold {
lowCount++
} else {
quotas = append(quotas, ru/mathutil.Max(tikvQuota, tidbQuota))
quotas = append(quotas, rus.getValue()/mathutil.Max(tikvQuota, tidbQuota))
}
rus.next()
tidbCPUs.next()
tikvCPUs.next()
}
if len(quotas) < 5 {
return errors.Errorf("There are too few metrics points available in selected time window")
Expand Down Expand Up @@ -312,14 +327,51 @@ func getTiDBTotalCPUQuota(ctx context.Context, exec sqlexec.RestrictedSQLExecuto
return getNumberFromMetrics(ctx, exec, query, "tidb_server_maxprocs")
}

func getRUPerSec(ctx context.Context, exec sqlexec.RestrictedSQLExecutor, startTime, endTime string) ([]float64, error) {
query := fmt.Sprintf("SELECT value FROM METRICS_SCHEMA.resource_manager_resource_unit where time >= '%s' and time <= '%s' ORDER BY time desc", startTime, endTime)
return getValuesFromMetrics(ctx, exec, query, "resource_manager_resource_unit")
type (
	// timePointValue is a single metrics sample: a timestamp and the value
	// recorded at that timestamp.
	timePointValue struct {
		tp  time.Time
		val float64
	}

	// timeSeriesValues is a forward-only cursor over a slice of samples.
	// idx is the position of the current sample inside vals.
	timeSeriesValues struct {
		idx  int
		vals []*timePointValue
	}
)

// current returns the sample under the cursor. The index is not checked, so
// it panics when the cursor is already past the last sample.
func (ts *timeSeriesValues) current() *timePointValue {
	return ts.vals[ts.idx]
}

// isEnd reports whether the cursor has moved past the last sample.
func (ts *timeSeriesValues) isEnd() bool {
	return len(ts.vals) <= ts.idx
}

// next moves the cursor one sample forward. No bounds check is performed.
func (ts *timeSeriesValues) next() {
	ts.idx++
}

// getTime returns the timestamp of the current sample.
// Callers must make sure isEnd() is false first; the access is unchecked.
func (ts *timeSeriesValues) getTime() time.Time {
	return ts.current().tp
}

// getValue returns the value of the current sample.
// Callers must make sure isEnd() is false first; the access is unchecked.
func (ts *timeSeriesValues) getValue() float64 {
	return ts.current().val
}

// advance moves the cursor forward until it reaches the first sample whose
// timestamp is later than target-10s, and leaves the cursor on that sample.
// It returns true only when that sample is also earlier than target+10s,
// i.e. the sample lies strictly within 10 seconds of target.
// If every remaining sample is at or before target-10s, the cursor ends up
// past the last sample and advance returns false.
func (ts *timeSeriesValues) advance(target time.Time) bool {
	// Maximum distance between a sample and target for the two to be
	// treated as the same time point.
	const window = time.Second * 10

	for ts.idx < len(ts.vals) {
		sampleTime := ts.vals[ts.idx].tp
		if sampleTime.Add(window).After(target) {
			// First sample that is not too old; it matches only if it is
			// not too new either.
			return sampleTime.Add(-window).Before(target)
		}
		ts.idx++
	}
	return false
}

func getComponentCPUUsagePerSec(ctx context.Context, exec sqlexec.RestrictedSQLExecutor, component, startTime, endTime string) ([]float64, error) {
query := fmt.Sprintf("SELECT sum(value) FROM METRICS_SCHEMA.process_cpu_usage where time >= '%s' and time <= '%s' and job like '%%%s' GROUP BY time ORDER BY time desc", startTime, endTime, component)
return getValuesFromMetrics(ctx, exec, query, "process_cpu_usage")
// getRUPerSec reads the `time` and `value` columns of
// METRICS_SCHEMA.resource_manager_resource_unit for rows whose time lies in
// [startTime, endTime], ordered by time ascending. The query is executed and
// converted by getValuesFromMetrics, whose result and error are returned as-is.
// startTime and endTime are interpolated into the SQL text verbatim.
func getRUPerSec(ctx context.Context, sctx sessionctx.Context, exec sqlexec.RestrictedSQLExecutor, startTime, endTime string) (*timeSeriesValues, error) {
	const (
		metricsName = "resource_manager_resource_unit"
		queryFormat = "SELECT time, value FROM METRICS_SCHEMA.resource_manager_resource_unit where time >= '%s' and time <= '%s' ORDER BY time asc"
	)
	return getValuesFromMetrics(ctx, sctx, exec, fmt.Sprintf(queryFormat, startTime, endTime), metricsName)
}

// getComponentCPUUsagePerSec reads METRICS_SCHEMA.process_cpu_usage for rows
// whose time lies in [startTime, endTime] and whose job ends with component
// (`job like '%<component>'`). Values are summed per time point and ordered by
// time ascending. The query is executed and converted by getValuesFromMetrics,
// whose result and error are returned as-is.
// component, startTime and endTime are interpolated into the SQL text verbatim.
func getComponentCPUUsagePerSec(ctx context.Context, sctx sessionctx.Context, exec sqlexec.RestrictedSQLExecutor, component, startTime, endTime string) (*timeSeriesValues, error) {
	const (
		metricsName = "process_cpu_usage"
		queryFormat = "SELECT time, sum(value) FROM METRICS_SCHEMA.process_cpu_usage where time >= '%s' and time <= '%s' and job like '%%%s' GROUP BY time ORDER BY time asc"
	)
	return getValuesFromMetrics(ctx, sctx, exec, fmt.Sprintf(queryFormat, startTime, endTime, component), metricsName)
}

func getNumberFromMetrics(ctx context.Context, exec sqlexec.RestrictedSQLExecutor, query, metrics string) (float64, error) {
Expand All @@ -334,17 +386,22 @@ func getNumberFromMetrics(ctx context.Context, exec sqlexec.RestrictedSQLExecuto
return rows[0].GetFloat64(0), nil
}

func getValuesFromMetrics(ctx context.Context, exec sqlexec.RestrictedSQLExecutor, query, metrics string) ([]float64, error) {
func getValuesFromMetrics(ctx context.Context, sctx sessionctx.Context, exec sqlexec.RestrictedSQLExecutor, query, metrics string) (*timeSeriesValues, error) {
rows, _, err := exec.ExecRestrictedSQL(ctx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession}, query)
if err != nil {
return nil, errors.Trace(err)
}
if len(rows) == 0 {
return nil, errors.Errorf("metrics '%s' is empty", metrics)
}
ret := make([]float64, 0, len(rows))
ret := make([]*timePointValue, 0, len(rows))
for _, row := range rows {
ret = append(ret, row.GetFloat64(0))
if tp, err := row.GetTime(0).AdjustedGoTime(sctx.GetSessionVars().Location()); err == nil {
ret = append(ret, &timePointValue{
tp: tp,
val: row.GetFloat64(1),
})
}
}
return ret, nil
return &timeSeriesValues{idx: 0, vals: ret}, nil
}
Loading

0 comments on commit 370db24

Please sign in to comment.