Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

resource_control: support calibrate resource #42165

Merged
merged 13 commits into from
Mar 17, 2023
2 changes: 2 additions & 0 deletions executor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ go_library(
"bind.go",
"brie.go",
"builder.go",
"calibrate_resource.go",
"change.go",
"checksum.go",
"compact_table.go",
Expand Down Expand Up @@ -274,6 +275,7 @@ go_test(
"batch_point_get_test.go",
"benchmark_test.go",
"brie_test.go",
"calibrate_resource_test.go",
"charset_test.go",
"chunk_size_control_test.go",
"cluster_table_test.go",
Expand Down
2 changes: 2 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -837,6 +837,8 @@ func (b *executorBuilder) buildSimple(v *plannercore.Simple) Executor {
}
}
}
case *ast.CalibrateResourceStmt:
return b.buildCalibrateResource(v.Schema())
case *ast.LoadDataActionStmt:
return &LoadDataActionExec{
baseExecutor: newBaseExecutor(b.ctx, nil, 0),
Expand Down
197 changes: 197 additions & 0 deletions executor/calibrate_resource.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package executor

import (
"context"
"strconv"
"strings"

"github.com/docker/go-units"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/sqlexec"
)

const (
// the workload name of TPC-C
workloadTpcc = "tpcc"
// the default workload to calculate the RU capacity.
defaultWorkload = workloadTpcc
)

// workloadBaseRUCostMap contains the base resource cost rate per 1 kv cpu within 1 second,
// the data is calculated from benchmark result, these data might not be very accurate,
// but is enough here because the maximum RU capacity is depend on both the cluster and
// the workload.
var workloadBaseRUCostMap = map[string]*baseResourceCost{
workloadTpcc: {
tidbCPU: 0.6,
kvCPU: 0.15,
readBytes: units.MiB / 2,
writeBytes: units.MiB,
readReqCount: 300,
writeReqCount: 1750,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it means 1 core can provide 1750 request in here? maybe add more comments.

Copy link
Contributor Author

@glorv glorv Mar 16, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. It is based on benchmark result. I added comment on the baseResourceCost struct

},
}

// the resource cost rate of a specified workload per 1 tikv cpu
type baseResourceCost struct {
// the average tikv cpu time, this is used to calculate whether tikv cpu
// or tidb cpu is the performance bottle neck.
tidbCPU float64
// the kv CPU time for calculate RU, it's smaller than the actually cpu usage.
kvCPU float64
// the read bytes rate per 1 tikv cpu.
readBytes uint64
// the write bytes rate per 1 tikv cpu.
writeBytes uint64
// the average tikv read request count per 1 tikv cpu.
readReqCount uint64
// the average tikv write request count per 1 tikv cpu.
writeReqCount uint64
}

func (b *executorBuilder) buildCalibrateResource(schema *expression.Schema) Executor {
return &calibrateResourceExec{
baseExecutor: newBaseExecutor(b.ctx, schema, 0),
}
}

type calibrateResourceExec struct {
baseExecutor
done bool
}

func (e *calibrateResourceExec) Next(ctx context.Context, req *chunk.Chunk) error {
req.Reset()
if e.done {
return nil
}
e.done = true

exec := e.ctx.(sqlexec.RestrictedSQLExecutor)
ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnOthers)

// first fetch the ru settings config.
ruCfg, err := getRUSettings(ctx, exec)
if err != nil {
return err
}

totalKVCPUQuota, err := getTiKVTotalCPUQuota(ctx, exec)
if err != nil {
return err
}
totalTiDBCPU, err := getTiDBTotalCPUQuota(ctx, exec)
if err != nil {
return err
}

// we only support TPC-C currently, will support more in the future.
workload := defaultWorkload
baseCost, ok := workloadBaseRUCostMap[workload]
if !ok {
return errors.Errorf("unknown workload '%s'", workload)
}

if totalTiDBCPU/baseCost.tidbCPU < totalKVCPUQuota {
totalKVCPUQuota = totalTiDBCPU / baseCost.tidbCPU
}
ruPerKVCPU := ruCfg.readBaseCost*float64(baseCost.readReqCount) +
ruCfg.readCostCPU*baseCost.kvCPU +
ruCfg.readCostPerByte*float64(baseCost.readBytes) +
ruCfg.writeBaseCost*float64(baseCost.writeReqCount) +
ruCfg.writeCostPerByte*float64(baseCost.writeBytes)
quota := totalKVCPUQuota * ruPerKVCPU
req.AppendUint64(0, uint64(quota))

return nil
}

type ruConfig struct {
readBaseCost float64
writeBaseCost float64
readCostCPU float64
readCostPerByte float64
writeCostPerByte float64
}

func getRUSettings(ctx context.Context, exec sqlexec.RestrictedSQLExecutor) (*ruConfig, error) {
rows, fields, err := exec.ExecRestrictedSQL(ctx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession}, "SHOW CONFIG WHERE TYPE = 'pd' AND name like 'request_unit.%'")
if err != nil {
return nil, errors.Trace(err)
}
if len(rows) == 0 {
return nil, errors.New("PD request-unit config not found")
}
var nameIdx, valueIdx int
for i, f := range fields {
switch f.ColumnAsName.L {
case "name":
nameIdx = i
case "value":
valueIdx = i
}
}

cfg := &ruConfig{}
for _, row := range rows {
val, err := strconv.ParseFloat(row.GetString(valueIdx), 64)
if err != nil {
return nil, errors.Trace(err)
}
name, _ := strings.CutPrefix(row.GetString(nameIdx), "request-unit.")

switch name {
case "read-base-cost":
cfg.readBaseCost = val
case "read-cost-per-byte":
cfg.readCostPerByte = val
case "read-cpu-ms-cost":
cfg.readCostCPU = val
case "write-base-cost":
cfg.writeBaseCost = val
case "write-cost-per-byte":
cfg.writeCostPerByte = val
}
}

return cfg, nil
}

func getTiKVTotalCPUQuota(ctx context.Context, exec sqlexec.RestrictedSQLExecutor) (float64, error) {
query := "SELECT SUM(value) FROM METRICS_SCHEMA.tikv_cpu_quota GROUP BY time ORDER BY time desc limit 1"
return getNumberFromMetrics(ctx, exec, query, "tikv_cpu_quota")
}

func getTiDBTotalCPUQuota(ctx context.Context, exec sqlexec.RestrictedSQLExecutor) (float64, error) {
query := "SELECT SUM(value) FROM METRICS_SCHEMA.tidb_server_maxprocs GROUP BY time ORDER BY time desc limit 1"
return getNumberFromMetrics(ctx, exec, query, "tidb_server_maxprocs")
}

func getNumberFromMetrics(ctx context.Context, exec sqlexec.RestrictedSQLExecutor, query, metrics string) (float64, error) {
rows, _, err := exec.ExecRestrictedSQL(ctx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession}, query)
if err != nil {
return 0.0, errors.Trace(err)
}
if len(rows) == 0 {
return 0.0, errors.Errorf("metrics '%s' is empty", metrics)
}

return rows[0].GetFloat64(0), nil
}
105 changes: 105 additions & 0 deletions executor/calibrate_resource_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package executor_test

import (
"context"
"testing"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/types"
"github.com/stretchr/testify/require"
)

func TestCalibrateResource(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

var confItems [][]types.Datum
var confErr error
var confFunc executor.TestShowClusterConfigFunc = func() ([][]types.Datum, error) {
return confItems, confErr
}
tk.Session().SetValue(executor.TestShowClusterConfigKey, confFunc)
strs2Items := func(strs ...string) []types.Datum {
items := make([]types.Datum, 0, len(strs))
for _, s := range strs {
items = append(items, types.NewStringDatum(s))
}
return items
}

// empty requet-unit config error
rs, err := tk.Exec("CALIBRATE RESOURCE")
require.NoError(t, err)
require.NotNil(t, rs)
err = rs.Next(context.Background(), rs.NewChunk(nil))
require.ErrorContains(t, err, "PD request-unit config not found")

confItems = append(confItems, strs2Items("pd", "127.0.0.1:2379", "request-unit.read-base-cost", "0.25"))
confItems = append(confItems, strs2Items("pd", "127.0.0.1:2379", "request-unit.read-cost-per-byte", "0.0000152587890625"))
confItems = append(confItems, strs2Items("pd", "127.0.0.1:2379", "request-unit.read-cpu-ms-cost", "0.3333333333333333"))
confItems = append(confItems, strs2Items("pd", "127.0.0.1:2379", "request-unit.write-base-cost", "1"))
confItems = append(confItems, strs2Items("pd", "127.0.0.1:2379", "request-unit.write-cost-per-byte", "0.0009765625"))

// empty metrics error
rs, err = tk.Exec("CALIBRATE RESOURCE")
require.NoError(t, err)
require.NotNil(t, rs)
err = rs.Next(context.Background(), rs.NewChunk(nil))
require.ErrorContains(t, err, "query metric error: pd unavailable")

// Mock for metric table data.
fpName := "github.com/pingcap/tidb/executor/mockMetricsTableData"
require.NoError(t, failpoint.Enable(fpName, "return"))
defer func() {
require.NoError(t, failpoint.Disable(fpName))
}()

datetime := func(s string) types.Time {
time, err := types.ParseTime(tk.Session().GetSessionVars().StmtCtx, s, mysql.TypeDatetime, types.MaxFsp, nil)
require.NoError(t, err)
return time
}

mockData := map[string][][]types.Datum{
"tikv_cpu_quota": {
types.MakeDatums(datetime("2020-02-12 10:35:00"), "tikv-0", 8.0),
types.MakeDatums(datetime("2020-02-12 10:35:00"), "tikv-1", 8.0),
types.MakeDatums(datetime("2020-02-12 10:35:00"), "tikv-2", 8.0),
types.MakeDatums(datetime("2020-02-12 10:36:00"), "tikv-0", 8.0),
types.MakeDatums(datetime("2020-02-12 10:36:00"), "tikv-1", 8.0),
types.MakeDatums(datetime("2020-02-12 10:36:00"), "tikv-2", 8.0),
},
"tidb_server_maxprocs": {
types.MakeDatums(datetime("2020-02-12 10:35:00"), "tidb-0", 40.0),
types.MakeDatums(datetime("2020-02-12 10:36:00"), "tidb-0", 40.0),
},
}
ctx := context.WithValue(context.Background(), "__mockMetricsTableData", mockData)
ctx = failpoint.WithHook(ctx, func(_ context.Context, fpname string) bool {
return fpName == fpname
})
tk.MustQueryWithContext(ctx, "CALIBRATE RESOURCE").Check(testkit.Rows("68569"))

// change total tidb cpu to less than tikv_cpu_quota
mockData["tidb_server_maxprocs"] = [][]types.Datum{
types.MakeDatums(datetime("2020-02-12 10:35:00"), "tidb-0", 8.0),
}
tk.MustQueryWithContext(ctx, "CALIBRATE RESOURCE").Check(testkit.Rows("38094"))
}
10 changes: 10 additions & 0 deletions infoschema/metric_table_def.go
Original file line number Diff line number Diff line change
Expand Up @@ -900,6 +900,11 @@ var MetricTableMap = map[string]MetricTableDef{
Labels: []string{"instance", "type"},
Comment: "The available or capacity size of each TiKV instance",
},
"tikv_cpu_quota": {
PromQL: "tikv_server_cpu_cores_quota{$LABEL_CONDITIONS}",
Labels: []string{"instance"},
Comment: "The Total CPU quota of each TiKV instance",
},
"tikv_thread_cpu": {
PromQL: `sum(rate(tikv_thread_cpu_seconds_total{$LABEL_CONDITIONS}[$RANGE_DURATION])) by (instance,name)`,
Labels: []string{"instance", "name"},
Expand Down Expand Up @@ -2713,6 +2718,11 @@ var MetricTableMap = map[string]MetricTableDef{
Labels: []string{"instance", "type", "sql_type"},
Comment: "The total time of transaction execution durations, including retry(second)",
},
"tidb_server_maxprocs": {
PromQL: "tidb_server_maxprocs{$LABEL_CONDITIONS}",
Labels: []string{"instance"},
Comment: "The Total CPU quota of each TiDB instance",
},
"tikv_raftstore_append_log_total_count": {
PromQL: "sum(increase(tikv_raftstore_append_log_duration_seconds_count{$LABEL_CONDITIONS}[$RANGE_DURATION])) by (instance)",
Labels: []string{"instance"},
Expand Down
21 changes: 21 additions & 0 deletions parser/ast/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -3649,3 +3649,24 @@ func (n *SetResourceGroupStmt) Accept(v Visitor) (Node, bool) {
n = newNode.(*SetResourceGroupStmt)
return v.Leave(n)
}

// CalibrateResourceStmt is a statement to fetch the cluster RU capacity
type CalibrateResourceStmt struct {
stmtNode
}

// Restore implements Node interface.
func (n *CalibrateResourceStmt) Restore(ctx *format.RestoreCtx) error {
ctx.WriteKeyWord("CALIBRATE RESOURCE")
return nil
}

// Accept implements Node Accept interface.
func (n *CalibrateResourceStmt) Accept(v Visitor) (Node, bool) {
newNode, skipChildren := v.Enter(n)
if skipChildren {
return v.Leave(newNode)
}
n = newNode.(*CalibrateResourceStmt)
return v.Leave(n)
}
1 change: 1 addition & 0 deletions parser/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ var tokenMap = map[string]int{
"BY": by,
"BYTE": byteType,
"CACHE": cache,
"CALIBRATE": calibrate,
"CALL": call,
"CANCEL": cancel,
"CAPTURE": capture,
Expand Down
Loading