Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

resource_control: support dynamic calibrate resource #43098

Merged
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -842,6 +842,7 @@ func (b *executorBuilder) buildSimple(v *plannercore.Simple) Executor {
return &calibrateResourceExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), 0),
workloadType: s.Tp,
optionList: s.DynamicCalibrateResourceOptionList,
}
case *ast.LoadDataActionStmt:
return &LoadDataActionExec{
Expand Down
251 changes: 220 additions & 31 deletions executor/calibrate_resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,25 @@ package executor

import (
"context"
"fmt"
"math"
"sort"
"strconv"
"strings"
"time"

"github.com/docker/go-units"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/duration"
"github.com/pingcap/tidb/sessiontxn/staleread"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/mathutil"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/tikv/client-go/v2/oracle"
"go.uber.org/zap"
)

// workloadBaseRUCostMap contains the base resource cost rate per 1 kv cpu within 1 second,
Expand Down Expand Up @@ -83,13 +93,78 @@ type baseResourceCost struct {
writeReqCount uint64
}

const lowUsageThreshold = 10.
const percentOfPass = 0.9
const discardRate = 0.1

CabinfeverB marked this conversation as resolved.
Show resolved Hide resolved
type calibrateResourceExec struct {
baseExecutor

optionList []*ast.DynamicCalibrateResourceOption
workloadType ast.CalibrateResourceType
done bool
}

func (e *calibrateResourceExec) checkDynamicCalibrateOptions() (string, string, error) {
var startTime, endTime time.Time
var startTimeStr, endTimeStr string
checkDurationFn := func() error {
// check the duration
duration := endTime.Sub(startTime)
if duration > time.Hour*24 {
CabinfeverB marked this conversation as resolved.
Show resolved Hide resolved
return errors.Errorf("the duration of calibration is too long.")
}
if duration < time.Minute*10 {
return errors.Errorf("the duration of calibration is too short.")
}
return nil
}
if len(e.optionList) == 2 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like to replace this if-else with something like following:

var start, end, dur *ptr
for _, op := range e.optionList[0] {
  ...
}
if duration == nil { ...default for duration}
if start == nil { ...default-for-start }
if end == nil { ...default-for-end }

validate_start_end_duration()

...rest logics

And Since The static and dynamic branch have little common logic with each other, please wrap both of them with a separate function to avoid too long if..else.. block

if e.optionList[0].Tp != ast.CalibrateStartTime || (e.optionList[1].Tp != ast.CalibrateEndTime && e.optionList[1].Tp != ast.CalibrateDuration) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

e.optionList[1].Tp != ast.CalibrateEndTime && e.optionList[1].Tp != ast.CalibrateDuration
I'm not sure why the same parameter would have && judgment twice

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because we're trying to determine if it's wrong. The expression to assert truth is e.optionList[1].Tp == ast.CalibrateEndTime || e.optionList[1].Tp == ast.CalibrateDuration

return "", "", errors.Errorf("dynamic calibarate options are not matched, please input start time and end time or duration is optional.")
}
// check start time and end time whether validated
startTs, err := staleread.CalculateAsOfTsExpr(e.ctx, e.optionList[0].Ts)
if err != nil {
return "", "", err
}
startTime = oracle.GetTimeFromTS(startTs)
if e.optionList[1].Tp == ast.CalibrateEndTime {
endTs, err := staleread.CalculateAsOfTsExpr(e.ctx, e.optionList[1].Ts)
if err != nil {
return "", "", err
}
endTime = oracle.GetTimeFromTS(endTs)
} else {
duration, err := duration.ParseDuration(e.optionList[1].StrValue)
if err != nil {
return "", "", err
}
endTime = startTime.Add(duration)
}

} else if len(e.optionList) == 1 {
if e.optionList[0].Tp != ast.CalibrateStartTime {
return "", "", errors.Errorf("dynamic calibarate options are not matched, please input start time and end time is optional.")
}
// check start time whether validated
startTs, err := staleread.CalculateAsOfTsExpr(e.ctx, e.optionList[0].Ts)
if err != nil {
return "", "", err
}
startTime = oracle.GetTimeFromTS(startTs)
endTime = time.Now()
} else {
return "", "", errors.Errorf("dynamic calibarate options are too much.")
}
startTimeStr = startTime.In(e.ctx.GetSessionVars().Location()).Format("2006-01-02 15:04:05")
endTimeStr = endTime.In(e.ctx.GetSessionVars().Location()).Format("2006-01-02 15:04:05")
if err := checkDurationFn(); err != nil {
return "", "", err
}
logutil.BgLogger().Info("checkDynamicCalibrateOptions", zap.String("startTimeStr", startTimeStr), zap.String("endTimeStr", endTimeStr))
return startTimeStr, endTimeStr, nil
}

func (e *calibrateResourceExec) Next(ctx context.Context, req *chunk.Chunk) error {
req.Reset()
if e.done {
Expand All @@ -99,41 +174,121 @@ func (e *calibrateResourceExec) Next(ctx context.Context, req *chunk.Chunk) erro

exec := e.ctx.(sqlexec.RestrictedSQLExecutor)
ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnOthers)
if len(e.optionList) > 0 {
startTime, endTime, err := e.checkDynamicCalibrateOptions()
if err != nil {
return err
}
totalKVCPUQuota, err := getTiKVTotalCPUQuota(ctx, exec)
logutil.BgLogger().Info("checkDynamicCalibrateOptions", zap.Float64("totalKVCPUQuota", totalKVCPUQuota), zap.Error(err))
CabinfeverB marked this conversation as resolved.
Show resolved Hide resolved
CabinfeverB marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}
totalTiDBCPU, err := getTiDBTotalCPUQuota(ctx, exec)
logutil.BgLogger().Info("checkDynamicCalibrateOptions", zap.Float64("totalTiDBCPU", totalTiDBCPU), zap.Error(err))
if err != nil {
return err
}
rus, err := getRUPerSec(ctx, exec, startTime, endTime)
logutil.BgLogger().Info("checkDynamicCalibrateOptions", zap.Any("rus", rus), zap.Error(err))
if err != nil {
return err
}
tikvCPUs, err := getTiKVCPUUsagePerSec(ctx, exec, startTime, endTime)
logutil.BgLogger().Info("checkDynamicCalibrateOptions", zap.Any("tikvCPUs", tikvCPUs), zap.Error(err))
if err != nil {
return err
}
tidbCPUs, err := getTiDBCPUUsagePerSec(ctx, exec, startTime, endTime)
logutil.BgLogger().Info("checkDynamicCalibrateOptions", zap.Any("tidbCPUs", tidbCPUs), zap.Error(err))
if err != nil {
return err
}
quotas := make([]float64, 0)
lowCount := 0
tidbCPULowCount := 0
tikvCPULowCOunt := 0
for idx, ru := range rus {
if idx >= len(tikvCPUs) || idx >= len(tidbCPUs) {
break
}
tikvQuota := totalKVCPUQuota / tikvCPUs[idx]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if tikvCPUs[idx] == 0 here, I think check tikvCPUs[idx]/totalKVCPUQuota as the cpu usage percentage is a more ergonomic way

tidbQuota := totalTiDBCPU / tidbCPUs[idx]
if tikvQuota > lowUsageThreshold {
lowCount++
tikvCPULowCOunt++
if tidbQuota > lowUsageThreshold {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think if one of the two cpu usage is greater than the lowUsageThreshold, we should keep it. Maybe there are cluster topologies that tidb cpu quota >> tikv cpu
quota or vice verse, then no samples can be valid here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about add valuableUsageThreshold? If one of the two cpu usage is greater than the valuableUsageThreshold, we can accept it

tidbCPULowCount++
}
} else if tidbQuota > lowUsageThreshold {
lowCount++
tidbCPULowCount++
} else {
quotas = append(quotas, ru*mathutil.Min(tikvQuota, tidbQuota))
}
}
if len(quotas) < 5 {
return errors.Errorf("there are too few metrics points available.")
}
if float64(len(quotas))/float64(len(quotas)+lowCount) > percentOfPass {
sort.Slice(quotas, func(i, j int) bool {
return quotas[i] > quotas[j]
})
lowerBound := int(math.Round(float64(len(quotas)) * float64(discardRate)))
upperBound := len(quotas) - lowerBound
sum := 0.
for i := lowerBound; i < upperBound; i++ {
sum += quotas[i]
}
quota := sum / float64(upperBound-lowerBound)
req.AppendUint64(0, uint64(quota))
} else {
if tidbCPULowCount > 0 && tikvCPULowCOunt > 0 {
return errors.Errorf("The CPU utilizations of TiDB and TiKV are less than one tenth in some of the time.")
} else if tidbCPULowCount > 0 {
errors.Errorf("The CPU utilization of TiDB is less than one tenth in some of the time.")
} else {
errors.Errorf("The CPU utilization of TiKV is less than one tenth in some of the time.")
}
}

// first fetch the ru settings config.
ruCfg, err := getRUSettings(ctx, exec)
if err != nil {
return err
}
} else { // first fetch the ru settings config.
ruCfg, err := getRUSettings(ctx, exec)
if err != nil {
return err
}

totalKVCPUQuota, err := getTiKVTotalCPUQuota(ctx, exec)
if err != nil {
return err
}
totalTiDBCPU, err := getTiDBTotalCPUQuota(ctx, exec)
if err != nil {
return err
}
totalKVCPUQuota, err := getTiKVTotalCPUQuota(ctx, exec)
if err != nil {
return err
}
totalTiDBCPU, err := getTiDBTotalCPUQuota(ctx, exec)
if err != nil {
return err
}

// The default workload to calculate the RU capacity.
if e.workloadType == ast.WorkloadNone {
e.workloadType = ast.TPCC
}
baseCost, ok := workloadBaseRUCostMap[e.workloadType]
if !ok {
return errors.Errorf("unknown workload '%T'", e.workloadType)
}
// we only support TPC-C currently, will support more in the future.
// The default workload to calculate the RU capacity.
if e.workloadType == ast.WorkloadNone {
e.workloadType = ast.TPCC
}
baseCost, ok := workloadBaseRUCostMap[e.workloadType]
if !ok {
return errors.Errorf("unknown workload '%T'", e.workloadType)
}

if totalTiDBCPU/baseCost.tidbCPU < totalKVCPUQuota {
totalKVCPUQuota = totalTiDBCPU / baseCost.tidbCPU
}
ruPerKVCPU := ruCfg.readBaseCost*float64(baseCost.readReqCount) +
ruCfg.readCostCPU*baseCost.kvCPU +
ruCfg.readCostPerByte*float64(baseCost.readBytes) +
ruCfg.writeBaseCost*float64(baseCost.writeReqCount) +
ruCfg.writeCostPerByte*float64(baseCost.writeBytes)
quota := totalKVCPUQuota * ruPerKVCPU
req.AppendUint64(0, uint64(quota))

if totalTiDBCPU/baseCost.tidbCPU < totalKVCPUQuota {
totalKVCPUQuota = totalTiDBCPU / baseCost.tidbCPU
}
ruPerKVCPU := ruCfg.readBaseCost*float64(baseCost.readReqCount) +
ruCfg.readCostCPU*baseCost.kvCPU +
ruCfg.readCostPerByte*float64(baseCost.readBytes) +
ruCfg.writeBaseCost*float64(baseCost.writeReqCount) +
ruCfg.writeCostPerByte*float64(baseCost.writeBytes)
quota := totalKVCPUQuota * ruPerKVCPU
req.AppendUint64(0, uint64(quota))

return nil
}
Expand Down Expand Up @@ -199,6 +354,24 @@ func getTiDBTotalCPUQuota(ctx context.Context, exec sqlexec.RestrictedSQLExecuto
return getNumberFromMetrics(ctx, exec, query, "tidb_server_maxprocs")
}

func getRUPerSec(ctx context.Context, exec sqlexec.RestrictedSQLExecutor, startTime, endTime string) ([]float64, error) {
query := fmt.Sprintf("SELECT value FROM METRICS_SCHEMA.resource_manager_resource_unit where time >= '%s' and time <= '%s' ORDER BY time desc", startTime, endTime)
logutil.BgLogger().Info("getRUPerSec", zap.String("query", query))
CabinfeverB marked this conversation as resolved.
Show resolved Hide resolved
return getValuesFromMetrics(ctx, exec, query, "resource_manager_resource_unit")
}

func getTiDBCPUUsagePerSec(ctx context.Context, exec sqlexec.RestrictedSQLExecutor, startTime, endTime string) ([]float64, error) {
query := fmt.Sprintf("SELECT sum(value) FROM METRICS_SCHEMA.process_cpu_usage where time >= '%s' and time <= '%s' and job like '%%tidb' GROUP BY time ORDER BY time desc", startTime, endTime)
logutil.BgLogger().Info("getTiDBCPUUsagePerSec", zap.String("getTiDBCPUUsagePerSec", query))
return getValuesFromMetrics(ctx, exec, query, "process_cpu_usage")
}

func getTiKVCPUUsagePerSec(ctx context.Context, exec sqlexec.RestrictedSQLExecutor, startTime, endTime string) ([]float64, error) {
query := fmt.Sprintf("SELECT sum(value) FROM METRICS_SCHEMA.process_cpu_usage where time >= '%s' and time <= '%s' and job like '%%tikv' GROUP BY time ORDER BY time desc", startTime, endTime)
logutil.BgLogger().Info("getTiKVCPUUsagePerSec", zap.String("getTiKVCPUUsagePerSec", query))
return getValuesFromMetrics(ctx, exec, query, "process_cpu_usage")
}

func getNumberFromMetrics(ctx context.Context, exec sqlexec.RestrictedSQLExecutor, query, metrics string) (float64, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe can uniform case of words in each statement.

rows, _, err := exec.ExecRestrictedSQL(ctx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession}, query)
if err != nil {
Expand All @@ -210,3 +383,19 @@ func getNumberFromMetrics(ctx context.Context, exec sqlexec.RestrictedSQLExecuto

return rows[0].GetFloat64(0), nil
}

func getValuesFromMetrics(ctx context.Context, exec sqlexec.RestrictedSQLExecutor, query, metrics string) ([]float64, error) {
rows, _, err := exec.ExecRestrictedSQL(ctx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession}, query)
if err != nil {
logutil.BgLogger().Info("getValuesFromMetrics", zap.Error(err))
CabinfeverB marked this conversation as resolved.
Show resolved Hide resolved
return nil, errors.Trace(err)
}
if len(rows) == 0 {
return nil, errors.Errorf("metrics '%s' is empty", metrics)
}
ret := make([]float64, 0, len(rows))
for _, row := range rows {
ret = append(ret, row.GetFloat64(0))
}
return ret, nil
}
Loading