diff --git a/executor/calibrate_resource.go b/executor/calibrate_resource.go index a9a1f05b3e1be..600f1f89a482e 100644 --- a/executor/calibrate_resource.go +++ b/executor/calibrate_resource.go @@ -15,14 +15,22 @@ package executor import ( + "bufio" "context" + "encoding/base64" "fmt" + "io" "math" + "net/http" + "runtime" "sort" + "strconv" + "strings" "time" "github.com/docker/go-units" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/parser/ast" @@ -30,6 +38,7 @@ import ( "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/sessiontxn/staleread" + "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/mathutil" "github.com/pingcap/tidb/util/sqlexec" @@ -121,6 +130,13 @@ const ( // duration Indicates the supported calibration duration maxDuration = time.Hour * 24 minDuration = time.Minute + + // serverTypeTiDB is tidb's instance type name + serverTypeTiDB = "tidb" + // serverTypeTiKV is tikv's instance type name + serverTypeTiKV = "tikv" + // serverTypeTiFlash is tiflash's instance type name + serverTypeTiFlash = "tiflash" ) type calibrateResourceExec struct { @@ -187,13 +203,14 @@ func (e *calibrateResourceExec) Next(ctx context.Context, req *chunk.Chunk) erro return nil } e.done = true - - exec := e.ctx.(sqlexec.RestrictedSQLExecutor) + if !variable.EnableResourceControl.Load() { + return infoschema.ErrResourceGroupSupportDisabled + } ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnOthers) if len(e.optionList) > 0 { - return e.dynamicCalibrate(ctx, req, exec) + return e.dynamicCalibrate(ctx, req) } - return e.staticCalibrate(ctx, req, exec) + return e.staticCalibrate(req) } var ( @@ -201,19 +218,24 @@ var ( errNoCPUQuotaMetrics = errors.Normalize("There is no CPU quota metrics, %v") ) -func (e *calibrateResourceExec) dynamicCalibrate(ctx context.Context, req *chunk.Chunk, exec sqlexec.RestrictedSQLExecutor) error { +func (e *calibrateResourceExec) dynamicCalibrate(ctx context.Context, req *chunk.Chunk) error { + exec := e.ctx.(sqlexec.RestrictedSQLExecutor) startTs, endTs, err := e.parseCalibrateDuration(ctx) if err != nil { return err } + serverInfos, err := infoschema.GetClusterServerInfo(e.ctx) + if err != nil { + return err + } startTime := startTs.In(e.ctx.GetSessionVars().Location()).Format("2006-01-02 15:04:05") endTime := endTs.In(e.ctx.GetSessionVars().Location()).Format("2006-01-02 15:04:05") - totalKVCPUQuota, err := getTiKVTotalCPUQuota(ctx, exec) + totalKVCPUQuota, err := getTiKVTotalCPUQuota(serverInfos) if err != nil { return errNoCPUQuotaMetrics.FastGenByArgs(err.Error()) } - totalTiDBCPU, err := getTiDBTotalCPUQuota(ctx, exec) + totalTiDBCPU, err := getTiDBTotalCPUQuota(serverInfos) if err != nil { return errNoCPUQuotaMetrics.FastGenByArgs(err.Error()) } @@ -277,20 +299,21 @@ func (e *calibrateResourceExec) dynamicCalibrate(ctx context.Context, req *chunk return nil } -func (e *calibrateResourceExec) staticCalibrate(ctx context.Context, req *chunk.Chunk, exec sqlexec.RestrictedSQLExecutor) error { - if !variable.EnableResourceControl.Load() { - return infoschema.ErrResourceGroupSupportDisabled - } +func (e *calibrateResourceExec) staticCalibrate(req *chunk.Chunk) error { // first fetch the ru settings config. if resourceGroupCtl == nil { return errors.New("resource group controller is not initialized") } + clusterInfo, err := infoschema.GetClusterServerInfo(e.ctx) + if err != nil { + return err + } - totalKVCPUQuota, err := getTiKVTotalCPUQuota(ctx, exec) + totalKVCPUQuota, err := getTiKVTotalCPUQuota(clusterInfo) if err != nil { return errNoCPUQuotaMetrics.FastGenByArgs(err.Error()) } - totalTiDBCPU, err := getTiDBTotalCPUQuota(ctx, exec) + totalTiDBCPUQuota, err := getTiDBTotalCPUQuota(clusterInfo) if err != nil { return errNoCPUQuotaMetrics.FastGenByArgs(err.Error()) } @@ -304,8 +327,8 @@ func (e *calibrateResourceExec) staticCalibrate(ctx context.Context, req *chunk. return errors.Errorf("unknown workload '%T'", e.workloadType) } - if totalTiDBCPU/baseCost.tidbToKVCPURatio < totalKVCPUQuota { - totalKVCPUQuota = totalTiDBCPU / baseCost.tidbToKVCPURatio + if totalTiDBCPUQuota/baseCost.tidbToKVCPURatio < totalKVCPUQuota { + totalKVCPUQuota = totalTiDBCPUQuota / baseCost.tidbToKVCPURatio } ruCfg := resourceGroupCtl.GetConfig() ruPerKVCPU := float64(ruCfg.ReadBaseCost)*float64(baseCost.readReqCount) + @@ -318,14 +341,27 @@ func (e *calibrateResourceExec) staticCalibrate(ctx context.Context, req *chunk. return nil } -func getTiKVTotalCPUQuota(ctx context.Context, exec sqlexec.RestrictedSQLExecutor) (float64, error) { - query := "SELECT SUM(value) FROM METRICS_SCHEMA.tikv_cpu_quota GROUP BY time ORDER BY time desc limit 1" - return getNumberFromMetrics(ctx, exec, query, "tikv_cpu_quota") +func getTiDBTotalCPUQuota(clusterInfo []infoschema.ServerInfo) (float64, error) { + cpuQuota := float64(runtime.GOMAXPROCS(0)) + failpoint.Inject("mockGOMAXPROCS", func(val failpoint.Value) { + if val != nil { + cpuQuota = float64(val.(int)) + } + }) + instanceNum := count(clusterInfo, serverTypeTiDB) + return cpuQuota * float64(instanceNum), nil } -func getTiDBTotalCPUQuota(ctx context.Context, exec sqlexec.RestrictedSQLExecutor) (float64, error) { - query := "SELECT SUM(value) FROM METRICS_SCHEMA.tidb_server_maxprocs GROUP BY time ORDER BY time desc limit 1" - return getNumberFromMetrics(ctx, exec, query, "tidb_server_maxprocs") +func getTiKVTotalCPUQuota(clusterInfo []infoschema.ServerInfo) (float64, error) { + instanceNum := count(clusterInfo, serverTypeTiKV) + if instanceNum == 0 { + return 0.0, errors.New("no server with type 'tikv' is found") + } + cpuQuota, err := fetchServerCPUQuota(clusterInfo, serverTypeTiKV, "tikv_server_cpu_cores_quota") + if err != nil { + return 0.0, err + } + return cpuQuota * float64(instanceNum), nil } type timePointValue struct { @@ -403,3 +439,94 @@ func getValuesFromMetrics(ctx context.Context, sctx sessionctx.Context, exec sql } return &timeSeriesValues{idx: 0, vals: ret}, nil } + +func count(clusterInfo []infoschema.ServerInfo, ty string) int { + num := 0 + for _, e := range clusterInfo { + if e.ServerType == ty { + num++ + } + } + return num +} + +func fetchServerCPUQuota(serverInfos []infoschema.ServerInfo, serverType string, metricName string) (float64, error) { + var cpuQuota float64 + err := fetchStoreMetrics(serverInfos, serverType, func(addr string, resp *http.Response) error { + if resp.StatusCode != http.StatusOK { + return errors.Errorf("request %s failed: %s", addr, resp.Status) + } + scanner := bufio.NewScanner(resp.Body) + for scanner.Scan() { + line := scanner.Text() + if !strings.HasPrefix(line, metricName) { + continue + } + // the metrics format is like following: + // tikv_server_cpu_cores_quota 8 + quota, err := strconv.ParseFloat(line[len(metricName)+1:], 64) + if err == nil { + cpuQuota = quota + } + return errors.Trace(err) + } + return errors.Errorf("metrics '%s' not found from server '%s'", metricName, addr) + }) + return cpuQuota, err +} + +func fetchStoreMetrics(serversInfo []infoschema.ServerInfo, serverType string, onResp func(string, *http.Response) error) error { + var firstErr error + for _, srv := range serversInfo { + if srv.ServerType != serverType { + continue + } + if len(srv.StatusAddr) == 0 { + continue + } + url := fmt.Sprintf("%s://%s/metrics", util.InternalHTTPSchema(), srv.StatusAddr) + req, err := http.NewRequest(http.MethodGet, url, nil) + if err != nil { + return err + } + var resp *http.Response + failpoint.Inject("mockMetricsResponse", func(val failpoint.Value) { + if val != nil { + data, _ := base64.StdEncoding.DecodeString(val.(string)) + resp = &http.Response{ + StatusCode: http.StatusOK, + Body: noopCloserWrapper{ + Reader: strings.NewReader(string(data)), + }, + } + } + }) + if resp == nil { + var err1 error + // ignore false positive go line, can't use defer here because it's in a loop. + //nolint:bodyclose + resp, err1 = util.InternalHTTPClient().Do(req) + if err1 != nil { + if firstErr == nil { + firstErr = err1 + } + continue + } + } + err = onResp(srv.Address, resp) + resp.Body.Close() + return err + } + if firstErr == nil { + firstErr = errors.Errorf("no server with type '%s' is found", serverType) + } + return firstErr +} + +type noopCloserWrapper struct { + io.Reader +} + +func (noopCloserWrapper) Close() error { + return nil +} diff --git a/executor/calibrate_resource_test.go b/executor/calibrate_resource_test.go index 51380e0a29013..86c60ee7da458 100644 --- a/executor/calibrate_resource_test.go +++ b/executor/calibrate_resource_test.go @@ -16,7 +16,9 @@ package executor_test import ( "context" + "encoding/base64" "encoding/json" + "strings" "testing" "github.com/pingcap/errors" @@ -76,12 +78,27 @@ func TestCalibrateResource(t *testing.T) { require.NoError(t, err) require.NotNil(t, rs) err = rs.Next(context.Background(), rs.NewChunk(nil)) - require.ErrorContains(t, err, "query metric error: pd unavailable") + require.ErrorContains(t, err, "no server with type 'tikv' is found") // error sql _, err = tk.Exec("CALIBRATE RESOURCE WORKLOAD tpcc START_TIME '2020-02-12 10:35:00'") require.Error(t, err) + // Mock for cluster info + // information_schema.cluster_config + instances := []string{ + "pd,127.0.0.1:32379,127.0.0.1:32380,mock-version,mock-githash,0", + "tidb,127.0.0.1:34000,30080,mock-version,mock-githash,1001", + "tikv,127.0.0.1:30160,30180,mock-version,mock-githash,0", + "tikv,127.0.0.1:30161,30181,mock-version,mock-githash,0", + "tikv,127.0.0.1:30162,30182,mock-version,mock-githash,0", + } + fpExpr := `return("` + strings.Join(instances, ";") + `")` + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/infoschema/mockClusterInfo", fpExpr)) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/infoschema/mockClusterInfo")) + }() + // Mock for metric table data. fpName := "github.com/pingcap/tidb/executor/mockMetricsTableData" require.NoError(t, failpoint.Enable(fpName, "return")) @@ -95,30 +112,28 @@ func TestCalibrateResource(t *testing.T) { return time } + metricsData := `# HELP process_cpu_seconds_total Total user and system CPU time spent in seconds. +# TYPE process_cpu_seconds_total counter +process_cpu_seconds_total 49943 +# HELP tikv_server_cpu_cores_quota Total CPU cores quota for TiKV server +# TYPE tikv_server_cpu_cores_quota gauge +tikv_server_cpu_cores_quota 8 +` + // failpoint doesn't support string contains whitespaces and newline + encodedData := base64.StdEncoding.EncodeToString([]byte(metricsData)) + fpExpr = `return("` + encodedData + `")` + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/mockMetricsResponse", fpExpr)) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/mockGOMAXPROCS", "return(40)")) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/mockGOMAXPROCS")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/mockMetricsResponse")) + }() mockData := make(map[string][][]types.Datum) ctx := context.WithValue(context.Background(), "__mockMetricsTableData", mockData) ctx = failpoint.WithHook(ctx, func(_ context.Context, fpname string) bool { return fpName == fpname }) - rs, err = tk.Exec("CALIBRATE RESOURCE") - require.NoError(t, err) - require.NotNil(t, rs) - err = rs.Next(ctx, rs.NewChunk(nil)) - // because when mock metrics is empty, error is always `pd unavailable`, don't check detail. - require.ErrorContains(t, err, "There is no CPU quota metrics, query metric error: pd unavailable") - - mockData["tikv_cpu_quota"] = [][]types.Datum{ - types.MakeDatums(datetime("2020-02-12 10:35:00"), "tikv-0", 8.0), - types.MakeDatums(datetime("2020-02-12 10:35:00"), "tikv-1", 8.0), - types.MakeDatums(datetime("2020-02-12 10:35:00"), "tikv-2", 8.0), - types.MakeDatums(datetime("2020-02-12 10:36:00"), "tikv-0", 8.0), - types.MakeDatums(datetime("2020-02-12 10:36:00"), "tikv-1", 8.0), - types.MakeDatums(datetime("2020-02-12 10:36:00"), "tikv-2", 8.0), - } - mockData["tidb_server_maxprocs"] = [][]types.Datum{ - types.MakeDatums(datetime("2020-02-12 10:35:00"), "tidb-0", 40.0), - types.MakeDatums(datetime("2020-02-12 10:36:00"), "tidb-0", 40.0), - } + tk.MustQueryWithContext(ctx, "CALIBRATE RESOURCE").Check(testkit.Rows("69768")) tk.MustQueryWithContext(ctx, "CALIBRATE RESOURCE WORKLOAD TPCC").Check(testkit.Rows("69768")) tk.MustQueryWithContext(ctx, "CALIBRATE RESOURCE WORKLOAD OLTP_READ_WRITE").Check(testkit.Rows("55823")) @@ -126,9 +141,7 @@ func TestCalibrateResource(t *testing.T) { tk.MustQueryWithContext(ctx, "CALIBRATE RESOURCE WORKLOAD OLTP_WRITE_ONLY").Check(testkit.Rows("109776")) // change total tidb cpu to less than tikv_cpu_quota - mockData["tidb_server_maxprocs"] = [][]types.Datum{ - types.MakeDatums(datetime("2020-02-12 10:35:00"), "tidb-0", 8.0), - } + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/mockGOMAXPROCS", "return(8)")) tk.MustQueryWithContext(ctx, "CALIBRATE RESOURCE").Check(testkit.Rows("38760")) // construct data for dynamic calibrate