Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

calibrate: refactor metrics error (#44451) #45167

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 98 additions & 40 deletions executor/calibrate_resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/duration"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/sessiontxn/staleread"
"github.com/pingcap/tidb/util/chunk"
Expand Down Expand Up @@ -195,6 +196,11 @@ func (e *calibrateResourceExec) Next(ctx context.Context, req *chunk.Chunk) erro
return e.staticCalibrate(ctx, req, exec)
}

var (
errLowUsage = errors.Errorf("The workload in selected time window is too low, with which TiDB is unable to reach a capacity estimation; please select another time window with higher workload, or calibrate resource by hardware instead")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
errLowUsage = errors.Errorf("The workload in selected time window is too low, with which TiDB is unable to reach a capacity estimation; please select another time window with higher workload, or calibrate resource by hardware instead")
errLowUsage = errors.Errorf("Unable to estimate capacity due to low workload in the selected time window. Please choose another time window with higher workload or calibrate hardware resources instead.")

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a cherry-pick, it's not better to modify it.

errNoCPUQuotaMetrics = errors.Normalize("There is no CPU quota metrics, %v")
)

func (e *calibrateResourceExec) dynamicCalibrate(ctx context.Context, req *chunk.Chunk, exec sqlexec.RestrictedSQLExecutor) error {
startTs, endTs, err := e.parseCalibrateDuration()
if err != nil {
Expand All @@ -205,59 +211,72 @@ func (e *calibrateResourceExec) dynamicCalibrate(ctx context.Context, req *chunk

totalKVCPUQuota, err := getTiKVTotalCPUQuota(ctx, exec)
if err != nil {
return err
return errNoCPUQuotaMetrics.FastGenByArgs(err.Error())
}
totalTiDBCPU, err := getTiDBTotalCPUQuota(ctx, exec)
if err != nil {
return err
return errNoCPUQuotaMetrics.FastGenByArgs(err.Error())
}
rus, err := getRUPerSec(ctx, exec, startTime, endTime)
rus, err := getRUPerSec(ctx, e.ctx, exec, startTime, endTime)
if err != nil {
return err
}
tikvCPUs, err := getComponentCPUUsagePerSec(ctx, exec, "tikv", startTime, endTime)
tikvCPUs, err := getComponentCPUUsagePerSec(ctx, e.ctx, exec, "tikv", startTime, endTime)
if err != nil {
return err
}
tidbCPUs, err := getComponentCPUUsagePerSec(ctx, exec, "tidb", startTime, endTime)
tidbCPUs, err := getComponentCPUUsagePerSec(ctx, e.ctx, exec, "tidb", startTime, endTime)
if err != nil {
return err
}
quotas := make([]float64, 0)
lowCount := 0
for idx, ru := range rus {
if idx >= len(tikvCPUs) || idx >= len(tidbCPUs) {
for {
if rus.isEnd() || tikvCPUs.isEnd() || tidbCPUs.isEnd() {
break
}
tikvQuota, tidbQuota := tikvCPUs[idx]/totalKVCPUQuota, tidbCPUs[idx]/totalTiDBCPU
// make time point match
maxTime := rus.getTime()
if tikvCPUs.getTime().After(maxTime) {
maxTime = tikvCPUs.getTime()
}
if tidbCPUs.getTime().After(maxTime) {
maxTime = tidbCPUs.getTime()
}
if !rus.advance(maxTime) || !tikvCPUs.advance(maxTime) || !tidbCPUs.advance(maxTime) {
continue
}
tikvQuota, tidbQuota := tikvCPUs.getValue()/totalKVCPUQuota, tidbCPUs.getValue()/totalTiDBCPU
// If one of the two cpu usage is greater than the `valuableUsageThreshold`, we can accept it.
// And if both are greater than the `lowUsageThreshold`, we can also accept it.
if tikvQuota > valuableUsageThreshold || tidbQuota > valuableUsageThreshold {
quotas = append(quotas, ru/mathutil.Max(tikvQuota, tidbQuota))
quotas = append(quotas, rus.getValue()/mathutil.Max(tikvQuota, tidbQuota))
} else if tikvQuota < lowUsageThreshold || tidbQuota < lowUsageThreshold {
lowCount++
} else {
quotas = append(quotas, ru/mathutil.Max(tikvQuota, tidbQuota))
quotas = append(quotas, rus.getValue()/mathutil.Max(tikvQuota, tidbQuota))
}
rus.next()
tidbCPUs.next()
tikvCPUs.next()
}
if len(quotas) < 5 {
return errors.Errorf("There are too few metrics points available in selected time window")
return errLowUsage
}
if float64(len(quotas))/float64(len(quotas)+lowCount) > percentOfPass {
sort.Slice(quotas, func(i, j int) bool {
return quotas[i] > quotas[j]
})
lowerBound := int(math.Round(float64(len(quotas)) * discardRate))
upperBound := len(quotas) - lowerBound
sum := 0.
for i := lowerBound; i < upperBound; i++ {
sum += quotas[i]
}
quota := sum / float64(upperBound-lowerBound)
req.AppendUint64(0, uint64(quota))
} else {
return errors.Errorf("The workload in selected time window is too low, with which TiDB is unable to reach a capacity estimation; please select another time window with higher workload, or calibrate resource by hardware instead")
if float64(len(quotas))/float64(len(quotas)+lowCount) <= percentOfPass {
return errLowUsage
}
sort.Slice(quotas, func(i, j int) bool {
return quotas[i] > quotas[j]
})
lowerBound := int(math.Round(float64(len(quotas)) * discardRate))
upperBound := len(quotas) - lowerBound
sum := 0.
for i := lowerBound; i < upperBound; i++ {
sum += quotas[i]
}
quota := sum / float64(upperBound-lowerBound)
req.AppendUint64(0, uint64(quota))
return nil
}

Expand All @@ -272,11 +291,11 @@ func (e *calibrateResourceExec) staticCalibrate(ctx context.Context, req *chunk.

totalKVCPUQuota, err := getTiKVTotalCPUQuota(ctx, exec)
if err != nil {
return err
return errNoCPUQuotaMetrics.FastGenByArgs(err.Error())
}
totalTiDBCPU, err := getTiDBTotalCPUQuota(ctx, exec)
if err != nil {
return err
return errNoCPUQuotaMetrics.FastGenByArgs(err.Error())
}

// The default workload to calculate the RU capacity.
Expand Down Expand Up @@ -312,14 +331,51 @@ func getTiDBTotalCPUQuota(ctx context.Context, exec sqlexec.RestrictedSQLExecuto
return getNumberFromMetrics(ctx, exec, query, "tidb_server_maxprocs")
}

func getRUPerSec(ctx context.Context, exec sqlexec.RestrictedSQLExecutor, startTime, endTime string) ([]float64, error) {
query := fmt.Sprintf("SELECT value FROM METRICS_SCHEMA.resource_manager_resource_unit where time >= '%s' and time <= '%s' ORDER BY time desc", startTime, endTime)
return getValuesFromMetrics(ctx, exec, query, "resource_manager_resource_unit")
type timePointValue struct {
tp time.Time
val float64
}

type timeSeriesValues struct {
idx int
vals []*timePointValue
}

func (t *timeSeriesValues) isEnd() bool {
return t.idx >= len(t.vals)
}

func (t *timeSeriesValues) next() {
t.idx++
}

func (t *timeSeriesValues) getTime() time.Time {
return t.vals[t.idx].tp
}

func (t *timeSeriesValues) getValue() float64 {
return t.vals[t.idx].val
}

func (t *timeSeriesValues) advance(target time.Time) bool {
for ; t.idx < len(t.vals); t.idx++ {
// `target` is maximal time in other timeSeriesValues,
// so we should find the time which offset is less than 10s.
if t.vals[t.idx].tp.Add(time.Second * 10).After(target) {
return t.vals[t.idx].tp.Add(-time.Second * 10).Before(target)
}
}
return false
}

func getRUPerSec(ctx context.Context, sctx sessionctx.Context, exec sqlexec.RestrictedSQLExecutor, startTime, endTime string) (*timeSeriesValues, error) {
query := fmt.Sprintf("SELECT time, value FROM METRICS_SCHEMA.resource_manager_resource_unit where time >= '%s' and time <= '%s' ORDER BY time asc", startTime, endTime)
return getValuesFromMetrics(ctx, sctx, exec, query, "resource_manager_resource_unit")
}

func getComponentCPUUsagePerSec(ctx context.Context, exec sqlexec.RestrictedSQLExecutor, component, startTime, endTime string) ([]float64, error) {
query := fmt.Sprintf("SELECT sum(value) FROM METRICS_SCHEMA.process_cpu_usage where time >= '%s' and time <= '%s' and job like '%%%s' GROUP BY time ORDER BY time desc", startTime, endTime, component)
return getValuesFromMetrics(ctx, exec, query, "process_cpu_usage")
func getComponentCPUUsagePerSec(ctx context.Context, sctx sessionctx.Context, exec sqlexec.RestrictedSQLExecutor, component, startTime, endTime string) (*timeSeriesValues, error) {
query := fmt.Sprintf("SELECT time, sum(value) FROM METRICS_SCHEMA.process_cpu_usage where time >= '%s' and time <= '%s' and job like '%%%s' GROUP BY time ORDER BY time asc", startTime, endTime, component)
return getValuesFromMetrics(ctx, sctx, exec, query, "process_cpu_usage")
}

func getNumberFromMetrics(ctx context.Context, exec sqlexec.RestrictedSQLExecutor, query, metrics string) (float64, error) {
Expand All @@ -334,17 +390,19 @@ func getNumberFromMetrics(ctx context.Context, exec sqlexec.RestrictedSQLExecuto
return rows[0].GetFloat64(0), nil
}

func getValuesFromMetrics(ctx context.Context, exec sqlexec.RestrictedSQLExecutor, query, metrics string) ([]float64, error) {
func getValuesFromMetrics(ctx context.Context, sctx sessionctx.Context, exec sqlexec.RestrictedSQLExecutor, query, metrics string) (*timeSeriesValues, error) {
rows, _, err := exec.ExecRestrictedSQL(ctx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession}, query)
if err != nil {
return nil, errors.Trace(err)
}
if len(rows) == 0 {
return nil, errors.Errorf("metrics '%s' is empty", metrics)
}
ret := make([]float64, 0, len(rows))
ret := make([]*timePointValue, 0, len(rows))
for _, row := range rows {
ret = append(ret, row.GetFloat64(0))
if tp, err := row.GetTime(0).AdjustedGoTime(sctx.GetSessionVars().Location()); err == nil {
ret = append(ret, &timePointValue{
tp: tp,
val: row.GetFloat64(1),
})
}
}
return ret, nil
return &timeSeriesValues{idx: 0, vals: ret}, nil
}
Loading