Skip to content

Commit

Permalink
resource_control: support calibrate resource (#42165)
Browse files Browse the repository at this point in the history
ref #38825
  • Loading branch information
glorv authored Mar 17, 2023
1 parent ed86b9c commit 9632aa6
Show file tree
Hide file tree
Showing 11 changed files with 10,191 additions and 9,804 deletions.
2 changes: 2 additions & 0 deletions executor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ go_library(
"bind.go",
"brie.go",
"builder.go",
"calibrate_resource.go",
"change.go",
"checksum.go",
"compact_table.go",
Expand Down Expand Up @@ -274,6 +275,7 @@ go_test(
"batch_point_get_test.go",
"benchmark_test.go",
"brie_test.go",
"calibrate_resource_test.go",
"charset_test.go",
"chunk_size_control_test.go",
"cluster_table_test.go",
Expand Down
2 changes: 2 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -838,6 +838,8 @@ func (b *executorBuilder) buildSimple(v *plannercore.Simple) Executor {
}
}
}
case *ast.CalibrateResourceStmt:
return b.buildCalibrateResource(v.Schema())
case *ast.LoadDataActionStmt:
return &LoadDataActionExec{
baseExecutor: newBaseExecutor(b.ctx, nil, 0),
Expand Down
197 changes: 197 additions & 0 deletions executor/calibrate_resource.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package executor

import (
"context"
"strconv"
"strings"

"github.com/docker/go-units"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/sqlexec"
)

const (
// workloadTpcc is the name of the TPC-C workload.
workloadTpcc = "tpcc"
// defaultWorkload is the workload used to calculate the RU capacity;
// it is currently the TPC-C workload.
defaultWorkload = workloadTpcc
)

// workloadBaseRUCostMap maps a workload name to its base resource cost rate per
// 1 kv cpu within 1 second.
// The data is calculated from benchmark results and might not be very accurate,
// but it is good enough here because the maximum RU capacity depends on both
// the cluster and the workload.
var workloadBaseRUCostMap = map[string]*baseResourceCost{
workloadTpcc: {
tidbCPU: 0.6,
kvCPU: 0.15,
readBytes: units.MiB / 2,
writeBytes: units.MiB,
readReqCount: 300,
writeReqCount: 1750,
},
}

// baseResourceCost describes the resource cost rate of a specific workload,
// normalized to 1 TiKV CPU.
type baseResourceCost struct {
	// tidbCPU is the average TiDB CPU time. The total TiDB CPU is divided by
	// it to decide whether the TiKV CPU or the TiDB CPU is the performance
	// bottleneck.
	tidbCPU float64
	// kvCPU is the KV CPU time used for calculating RU; it is smaller than the
	// actual CPU usage.
	kvCPU float64
	// readBytes and writeBytes are the read and write byte rates per 1 TiKV CPU.
	readBytes, writeBytes uint64
	// readReqCount and writeReqCount are the average TiKV read and write
	// request counts per 1 TiKV CPU.
	readReqCount, writeReqCount uint64
}

// buildCalibrateResource creates a calibrateResourceExec whose base executor is
// bound to the builder's session context and the given output schema.
func (b *executorBuilder) buildCalibrateResource(schema *expression.Schema) Executor {
	calibrateExec := &calibrateResourceExec{}
	calibrateExec.baseExecutor = newBaseExecutor(b.ctx, schema, 0)
	return calibrateExec
}

// calibrateResourceExec is the executor built for the CALIBRATE RESOURCE
// statement; its Next method produces a single estimated RU capacity value.
type calibrateResourceExec struct {
baseExecutor
// done is set on the first call to Next so that later calls return an
// empty chunk.
done bool
}

// Next computes the estimated RU capacity of the cluster and appends it to
// column 0 of req as a single uint64 value.
//
// The first call reads the PD request-unit settings, the total TiKV CPU quota
// and the total TiDB CPU (GOMAXPROCS) through restricted SQL, then combines
// them with the base cost of the default workload. Every later call only
// resets req and returns nil. Note that done is set before any query runs, so
// a failed first call is not retried by a later call.
//
// It returns an error when the session context cannot execute restricted SQL,
// when any of the underlying queries fails, or when the workload is unknown.
func (e *calibrateResourceExec) Next(ctx context.Context, req *chunk.Chunk) error {
	req.Reset()
	if e.done {
		return nil
	}
	e.done = true

	// Use a checked assertion so that an unexpected session context yields an
	// error instead of a panic.
	exec, ok := e.ctx.(sqlexec.RestrictedSQLExecutor)
	if !ok {
		return errors.New("session context does not support restricted SQL execution")
	}
	ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnOthers)

	// first fetch the ru settings config.
	ruCfg, err := getRUSettings(ctx, exec)
	if err != nil {
		return err
	}

	totalKVCPUQuota, err := getTiKVTotalCPUQuota(ctx, exec)
	if err != nil {
		return err
	}
	totalTiDBCPU, err := getTiDBTotalCPUQuota(ctx, exec)
	if err != nil {
		return err
	}

	// we only support TPC-C currently, will support more in the future.
	workload := defaultWorkload
	baseCost, ok := workloadBaseRUCostMap[workload]
	if !ok {
		return errors.Errorf("unknown workload '%s'", workload)
	}

	// If TiDB cannot drive all of the TiKV CPU for this workload, TiDB is the
	// bottleneck: cap the usable TiKV CPU at what the TiDB CPU can sustain.
	if kvCPUByTiDB := totalTiDBCPU / baseCost.tidbCPU; kvCPUByTiDB < totalKVCPUQuota {
		totalKVCPUQuota = kvCPUByTiDB
	}

	// NOTE(review): readCostCPU is read from "read-cpu-ms-cost", which looks like
	// a per-millisecond cost, while kvCPU appears to be expressed in CPU
	// seconds — confirm the units before relying on the CPU term.
	ruPerKVCPU := ruCfg.readBaseCost*float64(baseCost.readReqCount) +
		ruCfg.readCostCPU*baseCost.kvCPU +
		ruCfg.readCostPerByte*float64(baseCost.readBytes) +
		ruCfg.writeBaseCost*float64(baseCost.writeReqCount) +
		ruCfg.writeCostPerByte*float64(baseCost.writeBytes)
	quota := totalKVCPUQuota * ruPerKVCPU
	req.AppendUint64(0, uint64(quota))

	return nil
}

// ruConfig holds the request-unit cost factors used to convert resource usage
// into RU.
type ruConfig struct {
	// readBaseCost and writeBaseCost are the base costs charged per read and
	// per write request.
	readBaseCost, writeBaseCost float64
	// readCostCPU is the cost charged for CPU time spent on reads.
	readCostCPU float64
	// readCostPerByte and writeCostPerByte are the costs charged per byte read
	// and per byte written.
	readCostPerByte, writeCostPerByte float64
}

// getRUSettings loads the request-unit cost settings from the PD configuration
// by running SHOW CONFIG through exec.
//
// It returns an error if the query fails, if no config row matches, if the
// result set has no "name" or "value" column, or if the value of a recognized
// setting is not a valid float. Rows with unrecognized names are ignored, and
// settings that are absent from the result keep their zero value. When several
// rows carry the same setting, the last one wins.
func getRUSettings(ctx context.Context, exec sqlexec.RestrictedSQLExecutor) (*ruConfig, error) {
	const (
		ruConfigPrefix = "request-unit."
		// The hyphen is spelled out literally: '_' is a single-character
		// wildcard in LIKE and would also match unrelated config names.
		query = "SHOW CONFIG WHERE TYPE = 'pd' AND name like 'request-unit.%'"
	)

	rows, fields, err := exec.ExecRestrictedSQL(ctx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession}, query)
	if err != nil {
		return nil, errors.Trace(err)
	}
	if len(rows) == 0 {
		return nil, errors.New("PD request-unit config not found")
	}

	// Start from -1 so that a missing column is detected instead of silently
	// reading column 0.
	nameIdx, valueIdx := -1, -1
	for i, f := range fields {
		switch f.ColumnAsName.L {
		case "name":
			nameIdx = i
		case "value":
			valueIdx = i
		}
	}
	if nameIdx < 0 || valueIdx < 0 {
		return nil, errors.New("unexpected result columns of SHOW CONFIG: 'name' or 'value' is missing")
	}

	cfg := &ruConfig{}
	for _, row := range rows {
		name, ok := strings.CutPrefix(row.GetString(nameIdx), ruConfigPrefix)
		if !ok {
			continue
		}

		var target *float64
		switch name {
		case "read-base-cost":
			target = &cfg.readBaseCost
		case "read-cost-per-byte":
			target = &cfg.readCostPerByte
		case "read-cpu-ms-cost":
			target = &cfg.readCostCPU
		case "write-base-cost":
			target = &cfg.writeBaseCost
		case "write-cost-per-byte":
			target = &cfg.writeCostPerByte
		default:
			// Not a setting we use; do not fail on its (possibly non-numeric) value.
			continue
		}

		val, err := strconv.ParseFloat(row.GetString(valueIdx), 64)
		if err != nil {
			return nil, errors.Trace(err)
		}
		*target = val
	}

	return cfg, nil
}

// getTiKVTotalCPUQuota returns the sum of the tikv_cpu_quota metric values at
// the most recent timestamp available in METRICS_SCHEMA.
func getTiKVTotalCPUQuota(ctx context.Context, exec sqlexec.RestrictedSQLExecutor) (float64, error) {
	const (
		metricName = "tikv_cpu_quota"
		latestSum  = "SELECT SUM(value) FROM METRICS_SCHEMA.tikv_cpu_quota GROUP BY time ORDER BY time desc limit 1"
	)
	return getNumberFromMetrics(ctx, exec, latestSum, metricName)
}

// getTiDBTotalCPUQuota returns the sum of the tidb_server_maxprocs metric
// values at the most recent timestamp available in METRICS_SCHEMA.
func getTiDBTotalCPUQuota(ctx context.Context, exec sqlexec.RestrictedSQLExecutor) (float64, error) {
	const (
		metricName = "tidb_server_maxprocs"
		latestSum  = "SELECT SUM(value) FROM METRICS_SCHEMA.tidb_server_maxprocs GROUP BY time ORDER BY time desc limit 1"
	)
	return getNumberFromMetrics(ctx, exec, latestSum, metricName)
}

// getNumberFromMetrics runs query through exec and returns column 0 of the
// first result row as a float64.
//
// metrics is only used to build the error message. An error is returned when
// the query fails, when it yields no rows, or when the first value is NULL.
func getNumberFromMetrics(ctx context.Context, exec sqlexec.RestrictedSQLExecutor, query, metrics string) (float64, error) {
	rows, _, err := exec.ExecRestrictedSQL(ctx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession}, query)
	if err != nil {
		return 0.0, errors.Trace(err)
	}
	// An aggregate such as SUM yields NULL when it has no non-NULL input;
	// treat that as missing data instead of reading the NULL cell as a number.
	if len(rows) == 0 || rows[0].IsNull(0) {
		return 0.0, errors.Errorf("metrics '%s' is empty", metrics)
	}

	return rows[0].GetFloat64(0), nil
}
105 changes: 105 additions & 0 deletions executor/calibrate_resource_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package executor_test

import (
"context"
"testing"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/types"
"github.com/stretchr/testify/require"
)

// TestCalibrateResource runs the CALIBRATE RESOURCE statement in three stages:
// without any request-unit config, with config but without metrics data, and
// finally with mocked metrics data, for which the exact result is checked.
func TestCalibrateResource(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

// confItems and confErr are captured by confFunc, so items appended later in
// the test are returned by subsequent calls of the hook.
var confItems [][]types.Datum
var confErr error
var confFunc executor.TestShowClusterConfigFunc = func() ([][]types.Datum, error) {
return confItems, confErr
}
tk.Session().SetValue(executor.TestShowClusterConfigKey, confFunc)
// strs2Items converts strings into one row of string datums.
strs2Items := func(strs ...string) []types.Datum {
items := make([]types.Datum, 0, len(strs))
for _, s := range strs {
items = append(items, types.NewStringDatum(s))
}
return items
}

// empty request-unit config error
rs, err := tk.Exec("CALIBRATE RESOURCE")
require.NoError(t, err)
require.NotNil(t, rs)
err = rs.Next(context.Background(), rs.NewChunk(nil))
require.ErrorContains(t, err, "PD request-unit config not found")

// Provide all five request-unit settings that the executor reads.
confItems = append(confItems, strs2Items("pd", "127.0.0.1:2379", "request-unit.read-base-cost", "0.25"))
confItems = append(confItems, strs2Items("pd", "127.0.0.1:2379", "request-unit.read-cost-per-byte", "0.0000152587890625"))
confItems = append(confItems, strs2Items("pd", "127.0.0.1:2379", "request-unit.read-cpu-ms-cost", "0.3333333333333333"))
confItems = append(confItems, strs2Items("pd", "127.0.0.1:2379", "request-unit.write-base-cost", "1"))
confItems = append(confItems, strs2Items("pd", "127.0.0.1:2379", "request-unit.write-cost-per-byte", "0.0009765625"))

// The config is present now, but the metrics tables are not mocked yet, so
// the statement is expected to fail with a metric query error.
rs, err = tk.Exec("CALIBRATE RESOURCE")
require.NoError(t, err)
require.NotNil(t, rs)
err = rs.Next(context.Background(), rs.NewChunk(nil))
require.ErrorContains(t, err, "query metric error: pd unavailable")

// Mock for metric table data.
fpName := "github.com/pingcap/tidb/executor/mockMetricsTableData"
require.NoError(t, failpoint.Enable(fpName, "return"))
defer func() {
require.NoError(t, failpoint.Disable(fpName))
}()

// datetime parses s as a DATETIME value and fails the test on error.
datetime := func(s string) types.Time {
time, err := types.ParseTime(tk.Session().GetSessionVars().StmtCtx, s, mysql.TypeDatetime, types.MaxFsp, nil)
require.NoError(t, err)
return time
}

// Three TiKV instances with a quota of 8 each (24 in total per timestamp)
// and one TiDB instance with 40.
mockData := map[string][][]types.Datum{
"tikv_cpu_quota": {
types.MakeDatums(datetime("2020-02-12 10:35:00"), "tikv-0", 8.0),
types.MakeDatums(datetime("2020-02-12 10:35:00"), "tikv-1", 8.0),
types.MakeDatums(datetime("2020-02-12 10:35:00"), "tikv-2", 8.0),
types.MakeDatums(datetime("2020-02-12 10:36:00"), "tikv-0", 8.0),
types.MakeDatums(datetime("2020-02-12 10:36:00"), "tikv-1", 8.0),
types.MakeDatums(datetime("2020-02-12 10:36:00"), "tikv-2", 8.0),
},
"tidb_server_maxprocs": {
types.MakeDatums(datetime("2020-02-12 10:35:00"), "tidb-0", 40.0),
types.MakeDatums(datetime("2020-02-12 10:36:00"), "tidb-0", 40.0),
},
}
// The mock data travels in the context; the hook enables only this
// test's failpoint for queries run with ctx.
ctx := context.WithValue(context.Background(), "__mockMetricsTableData", mockData)
ctx = failpoint.WithHook(ctx, func(_ context.Context, fpname string) bool {
return fpName == fpname
})
// The TiKV quota (24) is the limit here: 24 * 2857.05 RU per KV CPU, truncated.
tk.MustQueryWithContext(ctx, "CALIBRATE RESOURCE").Check(testkit.Rows("68569"))

// change total tidb cpu to less than tikv_cpu_quota
mockData["tidb_server_maxprocs"] = [][]types.Datum{
types.MakeDatums(datetime("2020-02-12 10:35:00"), "tidb-0", 8.0),
}
// TiDB is the limit now: (8 / 0.6) * 2857.05 RU per KV CPU, truncated.
tk.MustQueryWithContext(ctx, "CALIBRATE RESOURCE").Check(testkit.Rows("38094"))
}
10 changes: 10 additions & 0 deletions infoschema/metric_table_def.go
Original file line number Diff line number Diff line change
Expand Up @@ -900,6 +900,11 @@ var MetricTableMap = map[string]MetricTableDef{
Labels: []string{"instance", "type"},
Comment: "The available or capacity size of each TiKV instance",
},
"tikv_cpu_quota": {
PromQL: "tikv_server_cpu_cores_quota{$LABEL_CONDITIONS}",
Labels: []string{"instance"},
Comment: "The Total CPU quota of each TiKV instance",
},
"tikv_thread_cpu": {
PromQL: `sum(rate(tikv_thread_cpu_seconds_total{$LABEL_CONDITIONS}[$RANGE_DURATION])) by (instance,name)`,
Labels: []string{"instance", "name"},
Expand Down Expand Up @@ -2713,6 +2718,11 @@ var MetricTableMap = map[string]MetricTableDef{
Labels: []string{"instance", "type", "sql_type"},
Comment: "The total time of transaction execution durations, including retry(second)",
},
"tidb_server_maxprocs": {
PromQL: "tidb_server_maxprocs{$LABEL_CONDITIONS}",
Labels: []string{"instance"},
Comment: "The Total CPU quota of each TiDB instance",
},
"tikv_raftstore_append_log_total_count": {
PromQL: "sum(increase(tikv_raftstore_append_log_duration_seconds_count{$LABEL_CONDITIONS}[$RANGE_DURATION])) by (instance)",
Labels: []string{"instance"},
Expand Down
21 changes: 21 additions & 0 deletions parser/ast/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -3649,3 +3649,24 @@ func (n *SetResourceGroupStmt) Accept(v Visitor) (Node, bool) {
n = newNode.(*SetResourceGroupStmt)
return v.Leave(n)
}

// CalibrateResourceStmt is a statement to fetch the cluster RU capacity.
// It carries no fields besides the embedded stmtNode.
type CalibrateResourceStmt struct {
stmtNode
}

// Restore implements Node interface.
// It writes the keyword text "CALIBRATE RESOURCE" to ctx and always returns nil.
func (n *CalibrateResourceStmt) Restore(ctx *format.RestoreCtx) error {
ctx.WriteKeyWord("CALIBRATE RESOURCE")
return nil
}

// Accept implements Node Accept interface.
// The statement has no child nodes, so after Enter the visitor only needs to
// Leave: with the node returned by Enter as-is when children are skipped, and
// with that node converted back to *CalibrateResourceStmt otherwise.
func (n *CalibrateResourceStmt) Accept(v Visitor) (Node, bool) {
	entered, skip := v.Enter(n)
	if !skip {
		n = entered.(*CalibrateResourceStmt)
		return v.Leave(n)
	}
	return v.Leave(entered)
}
1 change: 1 addition & 0 deletions parser/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ var tokenMap = map[string]int{
"BY": by,
"BYTE": byteType,
"CACHE": cache,
"CALIBRATE": calibrate,
"CALL": call,
"CANCEL": cancel,
"CAPTURE": capture,
Expand Down
Loading

0 comments on commit 9632aa6

Please sign in to comment.