Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

resource_control: support calibrate resource #42165

Merged
merged 13 commits into from
Mar 17, 2023
2 changes: 2 additions & 0 deletions executor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ go_library(
"bind.go",
"brie.go",
"builder.go",
"calibrate_resource.go",
"change.go",
"checksum.go",
"compact_table.go",
Expand Down Expand Up @@ -274,6 +275,7 @@ go_test(
"batch_point_get_test.go",
"benchmark_test.go",
"brie_test.go",
"calibrate_resource_test.go",
"charset_test.go",
"chunk_size_control_test.go",
"cluster_table_test.go",
Expand Down
2 changes: 2 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -837,6 +837,8 @@ func (b *executorBuilder) buildSimple(v *plannercore.Simple) Executor {
}
}
}
case *ast.CalibrateResourceStmt:
return b.buildCalibrateResource(v.Schema())
}
base := newBaseExecutor(b.ctx, v.Schema(), v.ID())
base.initCap = chunk.ZeroCapacity
Expand Down
183 changes: 183 additions & 0 deletions executor/calibrate_resource.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package executor

import (
"context"
"strconv"
"strings"

"github.com/docker/go-units"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/sqlexec"
)

// workloadBaseRUCostMap contains the base resource cost per 1 kv cpu,
// the data is calculated by the benchmark result.
var workloadBaseRUCostMap = map[string]*baseResourceCost{
"tpcc": {
tidbCPU: 0.6,
kvCPU: 0.15,
readBytes: units.MiB / 2, // 0.5MiB
writeBytes: units.MiB, // 1MiB
readReqCount: 300,
writeReqCount: 1750,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it means 1 core can provide 1750 request in here? maybe add more comments.

Copy link
Contributor Author

@glorv glorv Mar 16, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. It is based on benchmark result. I added comment on the baseResourceCost struct

},
}

func (b *executorBuilder) buildCalibrateResource(schema *expression.Schema) Executor {
return &calibrateResourceExec{
baseExecutor: newBaseExecutor(b.ctx, schema, 0),
}
}

type calibrateResourceExec struct {
baseExecutor
done bool
}

func (e *calibrateResourceExec) Next(ctx context.Context, req *chunk.Chunk) error {
req.Reset()
if e.done {
return nil
}
e.done = true

exec := e.ctx.(sqlexec.RestrictedSQLExecutor)
ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnOthers)

// first fetch the ru settings config.
ruCfg, err := getRUSettings(ctx, exec)
if err != nil {
return err
}

totalKVCPUQuota, err := getTiKVTotalCPUQuota(ctx, exec)
if err != nil {
return err
}
totalTiDBCPU, err := getTiDBTotalCPUQuota(ctx, exec)
if err != nil {
return err
}

workload := "tpcc"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about using a const or defined type?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

baseCost, ok := workloadBaseRUCostMap[workload]
if !ok {
return errors.Errorf("unknown workload '%s'", workload)
}

if totalTiDBCPU/baseCost.tidbCPU < totalKVCPUQuota {
totalKVCPUQuota = totalTiDBCPU / baseCost.tidbCPU
}
ruPerKVCPU := ruCfg.readBaseCost*float64(baseCost.readReqCount) +
ruCfg.readCostCPU*baseCost.kvCPU +
ruCfg.readCostPerByte*float64(baseCost.readBytes) +
ruCfg.writeBaseCost*float64(baseCost.writeReqCount) +
ruCfg.writeCostPerByte*float64(baseCost.writeBytes)
quota := totalKVCPUQuota * ruPerKVCPU
req.AppendUint64(0, uint64(quota))

return nil
}

// the resource cost of a specified workload per 1 tikv cpu
type baseResourceCost struct {
tidbCPU float64
// the kv CPU time for calculate RU, it's smaller than the actually cpu usage.
kvCPU float64
readBytes uint64
writeBytes uint64
readReqCount uint64
writeReqCount uint64
}

type ruConfig struct {
readBaseCost float64
writeBaseCost float64
readCostCPU float64
readCostPerByte float64
writeCostPerByte float64
}

func getRUSettings(ctx context.Context, exec sqlexec.RestrictedSQLExecutor) (*ruConfig, error) {
rows, fields, err := exec.ExecRestrictedSQL(ctx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession}, "SHOW CONFIG WHERE TYPE = 'pd' AND name like 'request_unit.%'")
if err != nil {
return nil, errors.Trace(err)
}
if len(rows) == 0 {
return nil, errors.New("PD request-unit config not found")
}
var nameIdx, valueIdx int
for i, f := range fields {
switch f.ColumnAsName.L {
case "instance":
//instanceIdx = i
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please clean it

case "name":
nameIdx = i
case "value":
valueIdx = i
}
}

cfg := &ruConfig{}
for _, row := range rows {
val, err := strconv.ParseFloat(row.GetString(valueIdx), 64)
if err != nil {
return nil, errors.Trace(err)
}
name, _ := strings.CutPrefix(row.GetString(nameIdx), "request-unit.")

switch name {
case "read-base-cost":
cfg.readBaseCost = val
case "read-cost-per-byte":
cfg.readCostPerByte = val
case "read-cpu-ms-cost":
cfg.readCostCPU = val
case "write-base-cost":
cfg.writeBaseCost = val
case "write-cost-per-byte":
cfg.writeCostPerByte = val
}
}

return cfg, nil
}

func getTiKVTotalCPUQuota(ctx context.Context, exec sqlexec.RestrictedSQLExecutor) (float64, error) {
query := "SELECT SUM(value) FROM METRICS_SCHEMA.tikv_cpu_quota GROUP BY time ORDER BY time desc limit 1"
return getNumberFromMetrics(ctx, exec, query, "tikv_cpu_quota")
}

func getTiDBTotalCPUQuota(ctx context.Context, exec sqlexec.RestrictedSQLExecutor) (float64, error) {
query := "SELECT SUM(value) FROM METRICS_SCHEMA.tidb_server_maxprocs GROUP BY time ORDER BY time desc limit 1"
return getNumberFromMetrics(ctx, exec, query, "tidb_server_maxprocs")
}

func getNumberFromMetrics(ctx context.Context, exec sqlexec.RestrictedSQLExecutor, query, metrics string) (float64, error) {
rows, _, err := exec.ExecRestrictedSQL(ctx, []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession}, query)
if err != nil {
return 0.0, errors.Trace(err)
}
if len(rows) == 0 {
return 0.0, errors.Errorf("metrics '%s' is empty", metrics)
}

return rows[0].GetFloat64(0), nil
}
105 changes: 105 additions & 0 deletions executor/calibrate_resource_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package executor_test

import (
"context"
"testing"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/types"
"github.com/stretchr/testify/require"
)

func TestCalibrateResource(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

var confItems [][]types.Datum
var confErr error
var confFunc executor.TestShowClusterConfigFunc = func() ([][]types.Datum, error) {
return confItems, confErr
}
tk.Session().SetValue(executor.TestShowClusterConfigKey, confFunc)
strs2Items := func(strs ...string) []types.Datum {
items := make([]types.Datum, 0, len(strs))
for _, s := range strs {
items = append(items, types.NewStringDatum(s))
}
return items
}

// empty requet-unit config error
rs, err := tk.Exec("CALIBRATE RESOURCE")
require.NoError(t, err)
require.NotNil(t, rs)
err = rs.Next(context.Background(), rs.NewChunk(nil))
require.ErrorContains(t, err, "PD request-unit config not found")

confItems = append(confItems, strs2Items("pd", "127.0.0.1:2379", "request-unit.read-base-cost", "0.25"))
confItems = append(confItems, strs2Items("pd", "127.0.0.1:2379", "request-unit.read-cost-per-byte", "0.0000152587890625"))
confItems = append(confItems, strs2Items("pd", "127.0.0.1:2379", "request-unit.read-cpu-ms-cost", "0.3333333333333333"))
confItems = append(confItems, strs2Items("pd", "127.0.0.1:2379", "request-unit.write-base-cost", "1"))
confItems = append(confItems, strs2Items("pd", "127.0.0.1:2379", "request-unit.write-cost-per-byte", "0.0009765625"))

// empty metrics error
rs, err = tk.Exec("CALIBRATE RESOURCE")
require.NoError(t, err)
require.NotNil(t, rs)
err = rs.Next(context.Background(), rs.NewChunk(nil))
require.ErrorContains(t, err, "query metric error: pd unavailable")

// Mock for metric table data.
fpName := "github.com/pingcap/tidb/executor/mockMetricsTableData"
require.NoError(t, failpoint.Enable(fpName, "return"))
defer func() {
require.NoError(t, failpoint.Disable(fpName))
}()

datetime := func(s string) types.Time {
time, err := types.ParseTime(tk.Session().GetSessionVars().StmtCtx, s, mysql.TypeDatetime, types.MaxFsp, nil)
require.NoError(t, err)
return time
}

mockData := map[string][][]types.Datum{
"tikv_cpu_quota": {
types.MakeDatums(datetime("2020-02-12 10:35:00"), "tikv-0", 8.0),
types.MakeDatums(datetime("2020-02-12 10:35:00"), "tikv-1", 8.0),
types.MakeDatums(datetime("2020-02-12 10:35:00"), "tikv-2", 8.0),
types.MakeDatums(datetime("2020-02-12 10:36:00"), "tikv-0", 8.0),
types.MakeDatums(datetime("2020-02-12 10:36:00"), "tikv-1", 8.0),
types.MakeDatums(datetime("2020-02-12 10:36:00"), "tikv-2", 8.0),
},
"tidb_server_maxprocs": {
types.MakeDatums(datetime("2020-02-12 10:35:00"), "tidb-0", 40.0),
types.MakeDatums(datetime("2020-02-12 10:36:00"), "tidb-0", 40.0),
},
}
ctx := context.WithValue(context.Background(), "__mockMetricsTableData", mockData)
ctx = failpoint.WithHook(ctx, func(_ context.Context, fpname string) bool {
return fpName == fpname
})
tk.MustQueryWithContext(ctx, "CALIBRATE RESOURCE").Check(testkit.Rows("68569"))

// change total tidb cpu to less than tikv_cpu_quota
mockData["tidb_server_maxprocs"] = [][]types.Datum{
types.MakeDatums(datetime("2020-02-12 10:35:00"), "tidb-0", 8.0),
}
tk.MustQueryWithContext(ctx, "CALIBRATE RESOURCE").Check(testkit.Rows("38094"))
}
10 changes: 10 additions & 0 deletions infoschema/metric_table_def.go
Original file line number Diff line number Diff line change
Expand Up @@ -900,6 +900,11 @@ var MetricTableMap = map[string]MetricTableDef{
Labels: []string{"instance", "type"},
Comment: "The available or capacity size of each TiKV instance",
},
"tikv_cpu_quota": {
PromQL: "tikv_server_cpu_cores_quota{$LABEL_CONDITIONS}",
Labels: []string{"instance"},
Comment: "The Total CPU quota of each TiKV instance",
},
"tikv_thread_cpu": {
PromQL: `sum(rate(tikv_thread_cpu_seconds_total{$LABEL_CONDITIONS}[$RANGE_DURATION])) by (instance,name)`,
Labels: []string{"instance", "name"},
Expand Down Expand Up @@ -2713,6 +2718,11 @@ var MetricTableMap = map[string]MetricTableDef{
Labels: []string{"instance", "type", "sql_type"},
Comment: "The total time of transaction execution durations, including retry(second)",
},
"tidb_server_maxprocs": {
PromQL: "tidb_server_maxprocs{$LABEL_CONDITIONS}",
Labels: []string{"instance"},
Comment: "The Total CPU quota of each TiDB instance",
},
"tikv_raftstore_append_log_total_count": {
PromQL: "sum(increase(tikv_raftstore_append_log_duration_seconds_count{$LABEL_CONDITIONS}[$RANGE_DURATION])) by (instance)",
Labels: []string{"instance"},
Expand Down
21 changes: 21 additions & 0 deletions parser/ast/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -3649,3 +3649,24 @@ func (n *SetResourceGroupStmt) Accept(v Visitor) (Node, bool) {
n = newNode.(*SetResourceGroupStmt)
return v.Leave(n)
}

// CalibrateResourceStmt is a statement to fetch the cluster RU capacity
type CalibrateResourceStmt struct {
stmtNode
}

// Restore implements Node interface.
func (n *CalibrateResourceStmt) Restore(ctx *format.RestoreCtx) error {
ctx.WriteKeyWord("CALIBRATE RESOURCE")
return nil
}

// Accept implements Node Accept interface.
func (n *CalibrateResourceStmt) Accept(v Visitor) (Node, bool) {
newNode, skipChildren := v.Enter(n)
if skipChildren {
return v.Leave(newNode)
}
n = newNode.(*CalibrateResourceStmt)
return v.Leave(n)
}
1 change: 1 addition & 0 deletions parser/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ var tokenMap = map[string]int{
"BY": by,
"BYTE": byteType,
"CACHE": cache,
"CALIBRATE": calibrate,
"CALL": call,
"CANCEL": cancel,
"CAPTURE": capture,
Expand Down
Loading