From 4c02107af70d9896e3713dfefca98112b8818ae7 Mon Sep 17 00:00:00 2001 From: glorv Date: Tue, 5 Dec 2023 13:08:55 +0800 Subject: [PATCH 1/9] fetch metrics from store instead of prometheus --- .../calibrateresource/calibrate_resource.go | 209 +++++++++++++++--- .../calibrate_resource_test.go | 96 ++++---- 2 files changed, 224 insertions(+), 81 deletions(-) diff --git a/pkg/executor/internal/calibrateresource/calibrate_resource.go b/pkg/executor/internal/calibrateresource/calibrate_resource.go index 1c8ddcd76fa82..f86aa46ce344c 100644 --- a/pkg/executor/internal/calibrateresource/calibrate_resource.go +++ b/pkg/executor/internal/calibrateresource/calibrate_resource.go @@ -15,10 +15,17 @@ package calibrateresource import ( + "bufio" "context" + "encoding/base64" "fmt" + "io" "math" + "net/http" + "runtime" "sort" + "strconv" + "strings" "time" "github.com/docker/go-units" @@ -31,9 +38,11 @@ import ( "github.com/pingcap/tidb/pkg/parser/ast" "github.com/pingcap/tidb/pkg/parser/duration" "github.com/pingcap/tidb/pkg/parser/model" + "github.com/pingcap/tidb/pkg/parser/terror" "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/sessionctx/variable" "github.com/pingcap/tidb/pkg/sessiontxn/staleread" + "github.com/pingcap/tidb/pkg/util" "github.com/pingcap/tidb/pkg/util/chunk" "github.com/pingcap/tidb/pkg/util/sqlexec" "github.com/tikv/client-go/v2/oracle" @@ -81,6 +90,15 @@ var ( } ) +const ( + // serverTypeTiDB is tidb's instance type name + serverTypeTiDB = "tidb" + // serverTypeTiDB is tikv's instance type name + serverTypeTiKV = "tikv" + // serverTypeTiDB is tiflash's instance type name + serverTypeTiFlash = "tiflash" +) + // the resource cost rate of a specified workload per 1 tikv cpu. type baseResourceCost struct { // represents the average ratio of TiDB CPU time to TiKV CPU time, this is used to calculate whether tikv cpu @@ -235,13 +253,14 @@ func (e *Executor) Next(ctx context.Context, req *chunk.Chunk) error { return nil } e.done = true - - exec := e.Ctx().(sqlexec.RestrictedSQLExecutor) + if !variable.EnableResourceControl.Load() { + return infoschema.ErrResourceGroupSupportDisabled + } ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnOthers) if len(e.OptionList) > 0 { - return e.dynamicCalibrate(ctx, req, exec) + return e.dynamicCalibrate(ctx, req) } - return e.staticCalibrate(ctx, req, exec) + return e.staticCalibrate(ctx, req) } var ( @@ -249,29 +268,40 @@ var ( errNoCPUQuotaMetrics = errors.Normalize("There is no CPU quota metrics, %v") ) -func (e *Executor) dynamicCalibrate(ctx context.Context, req *chunk.Chunk, exec sqlexec.RestrictedSQLExecutor) error { +func (e *Executor) dynamicCalibrate(ctx context.Context, req *chunk.Chunk) error { + exec := e.Ctx().(sqlexec.RestrictedSQLExecutor) startTs, endTs, err := e.parseCalibrateDuration(ctx) if err != nil { return err } - tidbQuota, err1 := e.getTiDBQuota(ctx, exec, startTs, endTs) - tiflashQuota, err2 := e.getTiFlashQuota(ctx, exec, startTs, endTs) + clusterInfo, err := infoschema.GetClusterServerInfo(e.Ctx()) + if err != nil { + return err + } + tidbQuota, err1 := e.getTiDBQuota(ctx, exec, clusterInfo, startTs, endTs) + tiflashQuota, err2 := e.getTiFlashQuota(ctx, exec, clusterInfo, startTs, endTs) if err1 != nil && err2 != nil { return err1 } + req.AppendUint64(0, uint64(tidbQuota+tiflashQuota)) return nil } -func (e *Executor) getTiDBQuota(ctx context.Context, exec sqlexec.RestrictedSQLExecutor, startTs, endTs time.Time) (float64, error) { +func (e *Executor) getTiDBQuota( + ctx context.Context, + exec 
sqlexec.RestrictedSQLExecutor, + serverInfos []infoschema.ServerInfo, + startTs, endTs time.Time, +) (float64, error) { startTime := startTs.In(e.Ctx().GetSessionVars().Location()).Format(time.DateTime) endTime := endTs.In(e.Ctx().GetSessionVars().Location()).Format(time.DateTime) - totalKVCPUQuota, err := getTiKVTotalCPUQuota(ctx, exec) + totalKVCPUQuota, err := getTiKVTotalCPUQuota(ctx, serverInfos) if err != nil { return 0, errNoCPUQuotaMetrics.FastGenByArgs(err.Error()) } - totalTiDBCPU, err := getTiDBTotalCPUQuota(ctx, exec) + totalTiDBCPU, err := getTiDBTotalCPUQuota(ctx, serverInfos) if err != nil { return 0, errNoCPUQuotaMetrics.FastGenByArgs(err.Error()) } @@ -368,12 +398,17 @@ func setupQuotas(quotas []float64) (float64, error) { return sum / float64(upperBound-lowerBound), nil } -func (e *Executor) getTiFlashQuota(ctx context.Context, exec sqlexec.RestrictedSQLExecutor, startTs, endTs time.Time) (float64, error) { +func (e *Executor) getTiFlashQuota( + ctx context.Context, + exec sqlexec.RestrictedSQLExecutor, + serverInfos []infoschema.ServerInfo, + startTs, endTs time.Time, +) (float64, error) { startTime := startTs.In(e.Ctx().GetSessionVars().Location()).Format(time.DateTime) endTime := endTs.In(e.Ctx().GetSessionVars().Location()).Format(time.DateTime) quotas := make([]float64, 0) - totalTiFlashLogicalCores, err := getTiFlashLogicalCores(ctx, exec) + totalTiFlashLogicalCores, err := getTiFlashLogicalCores(ctx, serverInfos) if err != nil { return 0, errNoCPUQuotaMetrics.FastGenByArgs(err.Error()) } @@ -407,25 +442,26 @@ func (e *Executor) getTiFlashQuota(ctx context.Context, exec sqlexec.RestrictedS return setupQuotas(quotas) } -func (e *Executor) staticCalibrate(ctx context.Context, req *chunk.Chunk, exec sqlexec.RestrictedSQLExecutor) error { - if !variable.EnableResourceControl.Load() { - return infoschema.ErrResourceGroupSupportDisabled - } +func (e *Executor) staticCalibrate(ctx context.Context, req *chunk.Chunk) error { resourceGroupCtl := domain.GetDomain(e.Ctx()).ResourceGroupsController() // first fetch the ru settings config. 
if resourceGroupCtl == nil { return errors.New("resource group controller is not initialized") } + clusterInfo, err := infoschema.GetClusterServerInfo(e.Ctx()) + if err != nil { + return err + } ruCfg := resourceGroupCtl.GetConfig() if e.WorkloadType == ast.TPCH10 { - return staticCalibrateTpch10(ctx, req, exec, ruCfg) + return staticCalibrateTpch10(ctx, req, clusterInfo, ruCfg) } - totalKVCPUQuota, err := getTiKVTotalCPUQuota(ctx, exec) + totalKVCPUQuota, err := getTiKVTotalCPUQuota(ctx, clusterInfo) if err != nil { return errNoCPUQuotaMetrics.FastGenByArgs(err.Error()) } - totalTiDBCPU, err := getTiDBTotalCPUQuota(ctx, exec) + totalTiDBCPUQuota, err := getTiDBTotalCPUQuota(ctx, clusterInfo) if err != nil { return errNoCPUQuotaMetrics.FastGenByArgs(err.Error()) } @@ -439,8 +475,8 @@ func (e *Executor) staticCalibrate(ctx context.Context, req *chunk.Chunk, exec s return errors.Errorf("unknown workload '%T'", e.WorkloadType) } - if totalTiDBCPU/baseCost.tidbToKVCPURatio < totalKVCPUQuota { - totalKVCPUQuota = totalTiDBCPU / baseCost.tidbToKVCPURatio + if totalTiDBCPUQuota/baseCost.tidbToKVCPURatio < totalKVCPUQuota { + totalKVCPUQuota = totalTiDBCPUQuota / baseCost.tidbToKVCPURatio } ruPerKVCPU := float64(ruCfg.ReadBaseCost)*float64(baseCost.readReqCount) + float64(ruCfg.CPUMsCost)*baseCost.kvCPU*1000 + // convert to ms @@ -452,14 +488,14 @@ func (e *Executor) staticCalibrate(ctx context.Context, req *chunk.Chunk, exec s return nil } -func staticCalibrateTpch10(ctx context.Context, req *chunk.Chunk, exec sqlexec.RestrictedSQLExecutor, ruCfg *resourceControlClient.RUConfig) error { +func staticCalibrateTpch10(ctx context.Context, req *chunk.Chunk, clusterInfo []infoschema.ServerInfo, ruCfg *resourceControlClient.RUConfig) error { // TPCH10 only considers the resource usage of the TiFlash including cpu and read bytes. Others are ignored. 
// cpu usage: 105494.666484 / 20 / 20 = 263.74 // read bytes: 401799161689.0 / 20 / 20 = 1004497904.22 const cpuTimePerCPUPerSec float64 = 263.74 const readBytesPerCPUPerSec float64 = 1004497904.22 ruPerCPU := float64(ruCfg.CPUMsCost)*cpuTimePerCPUPerSec + float64(ruCfg.ReadBytesCost)*readBytesPerCPUPerSec - totalTiFlashLogicalCores, err := getTiFlashLogicalCores(ctx, exec) + totalTiFlashLogicalCores, err := getTiFlashLogicalCores(ctx, clusterInfo) if err != nil { return err } @@ -468,19 +504,39 @@ func staticCalibrateTpch10(ctx context.Context, req *chunk.Chunk, exec sqlexec.R return nil } -func getTiKVTotalCPUQuota(ctx context.Context, exec sqlexec.RestrictedSQLExecutor) (float64, error) { - query := "SELECT SUM(value) FROM METRICS_SCHEMA.tikv_cpu_quota GROUP BY time ORDER BY time desc limit 1" - return getNumberFromMetrics(ctx, exec, query, "tikv_cpu_quota") +func getTiDBTotalCPUQuota(ctx context.Context, clusterInfo []infoschema.ServerInfo) (float64, error) { + cpuQuota := float64(runtime.GOMAXPROCS(0)) + failpoint.Inject("mockGOMAXPROCS", func(val failpoint.Value) { + if val != nil { + cpuQuota = float64(val.(int)) + } + }) + instanceNum := count(clusterInfo, serverTypeTiDB) + return cpuQuota * float64(instanceNum), nil } -func getTiDBTotalCPUQuota(ctx context.Context, exec sqlexec.RestrictedSQLExecutor) (float64, error) { - query := "SELECT SUM(value) FROM METRICS_SCHEMA.tidb_server_maxprocs GROUP BY time ORDER BY time desc limit 1" - return getNumberFromMetrics(ctx, exec, query, "tidb_server_maxprocs") +func getTiKVTotalCPUQuota(ctx context.Context, clusterInfo []infoschema.ServerInfo) (float64, error) { + instanceNum := count(clusterInfo, serverTypeTiKV) + if instanceNum == 0 { + return 0.0, errors.New("no server with type 'tikv' is found") + } + cpuQuota, err := fetchServerCPUQuota(clusterInfo, serverTypeTiKV, "tikv_server_cpu_cores_quota") + if err != nil { + return 0.0, err + } + return cpuQuota * float64(instanceNum), nil } -func getTiFlashLogicalCores(ctx context.Context, exec sqlexec.RestrictedSQLExecutor) (float64, error) { - query := "SELECT SUM(value) FROM METRICS_SCHEMA.tiflash_cpu_quota GROUP BY time ORDER BY time desc limit 1" - return getNumberFromMetrics(ctx, exec, query, "tiflash_cpu_quota") +func getTiFlashLogicalCores(ctx context.Context, clusterInfo []infoschema.ServerInfo) (float64, error) { + instanceNum := count(clusterInfo, serverTypeTiFlash) + if instanceNum == 0 { + return 0.0, nil + } + cpuQuota, err := fetchServerCPUQuota(clusterInfo, serverTypeTiFlash, "tiflash_proxy_tikv_server_cpu_cores_quota") + if err != nil { + return 0.0, err + } + return cpuQuota * float64(instanceNum), nil } func getTiFlashRUPerSec(ctx context.Context, sctx sessionctx.Context, exec sqlexec.RestrictedSQLExecutor, startTime, endTime string) (*timeSeriesValues, error) { @@ -568,3 +624,92 @@ func getValuesFromMetrics(ctx context.Context, sctx sessionctx.Context, exec sql } return &timeSeriesValues{idx: 0, vals: ret}, nil } + +func count(clusterInfo []infoschema.ServerInfo, ty string) int { + num := 0 + for _, e := range clusterInfo { + if e.ServerType == ty { + num++ + } + } + return num +} + +func fetchServerCPUQuota(serverInfos []infoschema.ServerInfo, serverType string, metricName string) (float64, error) { + var cpuQuota float64 + err := fetchStoreMetrics(serverInfos, serverType, func(addr string, reader io.Reader) error { + scanner := bufio.NewScanner(reader) + for scanner.Scan() { + line := scanner.Text() + if !strings.HasPrefix(line, metricName) { + continue + } + // the 
metrics format is like following: + // tikv_server_cpu_cores_quota 8 + quota, err := strconv.ParseFloat(line[len(metricName)+1:], 64) + if err == nil { + cpuQuota = quota + } + return errors.Trace(err) + } + return errors.Errorf("metrics '%s' not found from server '%s'", metricName, addr) + }) + return cpuQuota, err +} + +func fetchStoreMetrics(serversInfo []infoschema.ServerInfo, serverType string, onResp func(string, io.Reader) error) error { + var firstErr error + for _, srv := range serversInfo { + if srv.ServerType != serverType { + continue + } + if len(srv.StatusAddr) == 0 { + continue + } + url := fmt.Sprintf("%s://%s/metrics", util.InternalHTTPSchema(), srv.StatusAddr) + req, err := http.NewRequest(http.MethodGet, url, nil) + if err != nil { + return err + } + var resp *http.Response + failpoint.Inject("mockMetricsResponse", func(val failpoint.Value) { + if val != nil { + data, _ := base64.StdEncoding.DecodeString(val.(string)) + resp = &http.Response{ + StatusCode: http.StatusOK, + Body: dummyReaderCloser{ + Reader: strings.NewReader(string(data)), + }, + } + } + }) + if resp == nil { + var err1 error + resp, err1 = util.InternalHTTPClient().Do(req) + if err1 != nil { + firstErr = err1 + continue + } + defer func() { + terror.Log(resp.Body.Close()) + }() + } + + if resp.StatusCode != http.StatusOK { + return errors.Errorf("request %s failed: %s", url, resp.Status) + } + return onResp(srv.Address, resp.Body) + } + if firstErr == nil { + firstErr = errors.Errorf("no server with type '%s' is found", serverType) + } + return firstErr +} + +type dummyReaderCloser struct { + io.Reader +} + +func (r dummyReaderCloser) Close() error { + return nil +} diff --git a/pkg/executor/internal/calibrateresource/calibrate_resource_test.go b/pkg/executor/internal/calibrateresource/calibrate_resource_test.go index 6fb9467454c7e..c0030b004ca08 100644 --- a/pkg/executor/internal/calibrateresource/calibrate_resource_test.go +++ b/pkg/executor/internal/calibrateresource/calibrate_resource_test.go @@ -17,7 +17,9 @@ package calibrateresource_test import ( "bytes" "context" + "encoding/base64" "encoding/json" + "strings" "testing" "time" @@ -73,12 +75,27 @@ func TestCalibrateResource(t *testing.T) { require.NoError(t, err) require.NotNil(t, rs) err = rs.Next(context.Background(), rs.NewChunk(nil)) - require.ErrorContains(t, err, "query metric error: pd unavailable") + require.ErrorContains(t, err, "no server with type 'tikv' is found") // error sql _, err = tk.Exec("CALIBRATE RESOURCE WORKLOAD tpcc START_TIME '2020-02-12 10:35:00'") require.Error(t, err) + // Mock for cluster info + // information_schema.cluster_config + instances := []string{ + "pd,127.0.0.1:32379,127.0.0.1:32380,mock-version,mock-githash,0", + "tidb,127.0.0.1:34000,30080,mock-version,mock-githash,1001", + "tikv,127.0.0.1:30160,30180,mock-version,mock-githash,0", + "tikv,127.0.0.1:30161,30181,mock-version,mock-githash,0", + "tikv,127.0.0.1:30162,30182,mock-version,mock-githash,0", + } + fpExpr := `return("` + strings.Join(instances, ";") + `")` + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/infoschema/mockClusterInfo", fpExpr)) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/infoschema/mockClusterInfo")) + }() + // Mock for metric table data. 
fpName := "github.com/pingcap/tidb/pkg/executor/mockMetricsTableData" require.NoError(t, failpoint.Enable(fpName, "return")) @@ -99,30 +116,34 @@ func TestCalibrateResource(t *testing.T) { return time } + metricsData := `# HELP process_cpu_seconds_total Total user and system CPU time spent in seconds. +# TYPE process_cpu_seconds_total counter +process_cpu_seconds_total 49943 +# HELP tikv_server_cpu_cores_quota Total CPU cores quota for TiKV server +# TYPE tikv_server_cpu_cores_quota gauge +tikv_server_cpu_cores_quota 8 +# HELP tiflash_proxy_tikv_scheduler_write_flow The write flow passed through at scheduler level. +# TYPE tiflash_proxy_tikv_scheduler_write_flow gauge +tiflash_proxy_tikv_scheduler_write_flow 0 +# HELP tiflash_proxy_tikv_server_cpu_cores_quota Total CPU cores quota for TiKV server +# TYPE tiflash_proxy_tikv_server_cpu_cores_quota gauge +tiflash_proxy_tikv_server_cpu_cores_quota 20 +` + // failpoint doesn't support string contains whitespaces and newline + encodedData := base64.StdEncoding.EncodeToString([]byte(metricsData)) + fpExpr = `return("` + encodedData + `")` + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/executor/internal/calibrateresource/mockMetricsResponse", fpExpr)) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/executor/internal/calibrateresource/mockGOMAXPROCS", "return(40)")) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/executor/internal/calibrateresource/mockGOMAXPROCS")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/executor/internal/calibrateresource/mockMetricsResponse")) + }() mockData := make(map[string][][]types.Datum) ctx := context.WithValue(context.Background(), "__mockMetricsTableData", mockData) ctx = failpoint.WithHook(ctx, func(_ context.Context, fpname string) bool { return fpName == fpname }) - rs, err = tk.Exec("CALIBRATE RESOURCE") - require.NoError(t, err) - require.NotNil(t, rs) - err = rs.Next(ctx, rs.NewChunk(nil)) - // because when mock metrics is empty, error is always `pd unavailable`, don't check detail. 
- require.ErrorContains(t, err, "There is no CPU quota metrics, query metric error: pd unavailable") - - mockData["tikv_cpu_quota"] = [][]types.Datum{ - types.MakeDatums(datetime("2020-02-12 10:35:00"), "tikv-0", 8.0), - types.MakeDatums(datetime("2020-02-12 10:35:00"), "tikv-1", 8.0), - types.MakeDatums(datetime("2020-02-12 10:35:00"), "tikv-2", 8.0), - types.MakeDatums(datetime("2020-02-12 10:36:00"), "tikv-0", 8.0), - types.MakeDatums(datetime("2020-02-12 10:36:00"), "tikv-1", 8.0), - types.MakeDatums(datetime("2020-02-12 10:36:00"), "tikv-2", 8.0), - } - mockData["tidb_server_maxprocs"] = [][]types.Datum{ - types.MakeDatums(datetime("2020-02-12 10:35:00"), "tidb-0", 40.0), - types.MakeDatums(datetime("2020-02-12 10:36:00"), "tidb-0", 40.0), - } + tk.MustQueryWithContext(ctx, "CALIBRATE RESOURCE").Check(testkit.Rows("69768")) tk.MustQueryWithContext(ctx, "CALIBRATE RESOURCE WORKLOAD TPCC").Check(testkit.Rows("69768")) tk.MustQueryWithContext(ctx, "CALIBRATE RESOURCE WORKLOAD OLTP_READ_WRITE").Check(testkit.Rows("55823")) @@ -130,9 +151,7 @@ func TestCalibrateResource(t *testing.T) { tk.MustQueryWithContext(ctx, "CALIBRATE RESOURCE WORKLOAD OLTP_WRITE_ONLY").Check(testkit.Rows("109776")) // change total tidb cpu to less than tikv_cpu_quota - mockData["tidb_server_maxprocs"] = [][]types.Datum{ - types.MakeDatums(datetime("2020-02-12 10:35:00"), "tidb-0", 8.0), - } + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/executor/internal/calibrateresource/mockGOMAXPROCS", "return(8)")) tk.MustQueryWithContext(ctx, "CALIBRATE RESOURCE").Check(testkit.Rows("38760")) ru1 := [][]types.Datum{ @@ -707,19 +726,11 @@ func TestCalibrateResource(t *testing.T) { types.MakeDatums(datetime("2023-09-19 20:00:39.329000"), "127.0.0.1:10080", 20.0), } - mockData["tikv_cpu_quota"] = [][]types.Datum{ - types.MakeDatums(datetime("2023-09-19 19:50:39.330000"), "127.0.0.1:20180", 20.0), - types.MakeDatums(datetime("2023-09-19 19:51:39.330000"), "127.0.0.1:20180", 20.0), - types.MakeDatums(datetime("2023-09-19 19:52:39.330000"), "127.0.0.1:20180", 20.0), - types.MakeDatums(datetime("2023-09-19 19:53:39.330000"), "127.0.0.1:20180", 20.0), - types.MakeDatums(datetime("2023-09-19 19:54:39.330000"), "127.0.0.1:20180", 20.0), - types.MakeDatums(datetime("2023-09-19 19:55:39.330000"), "127.0.0.1:20180", 20.0), - types.MakeDatums(datetime("2023-09-19 19:56:39.330000"), "127.0.0.1:20180", 20.0), - types.MakeDatums(datetime("2023-09-19 19:57:39.330000"), "127.0.0.1:20180", 20.0), - types.MakeDatums(datetime("2023-09-19 19:58:39.330000"), "127.0.0.1:20180", 20.0), - types.MakeDatums(datetime("2023-09-19 19:59:39.330000"), "127.0.0.1:20180", 20.0), - types.MakeDatums(datetime("2023-09-19 20:00:39.330000"), "127.0.0.1:20180", 20.0), - } + // change mock for cluster info, add tiflash + instances = append(instances, "tiflash,127.0.0.1:3930,33940,mock-version,mock-githash,0") + fpExpr = `return("` + strings.Join(instances, ";") + `")` + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/infoschema/mockClusterInfo", fpExpr)) + rs, err = tk.Exec("CALIBRATE RESOURCE START_TIME '2023-09-19 19:50:39' DURATION '10m'") require.NoError(t, err) require.NotNil(t, rs) @@ -754,19 +765,6 @@ func TestCalibrateResource(t *testing.T) { types.MakeDatums(datetime("2023-09-19 20:00:39.318000"), 659167.0340155548), } - mockData["tiflash_cpu_quota"] = [][]types.Datum{ - types.MakeDatums(datetime("2023-09-19 19:50:39.502000"), "127.0.0.1:8234 ", 20.0), - types.MakeDatums(datetime("2023-09-19 19:51:39.502000"), 
"127.0.0.1:8234 ", 20.0), - types.MakeDatums(datetime("2023-09-19 19:52:39.502000"), "127.0.0.1:8234 ", 20.0), - types.MakeDatums(datetime("2023-09-19 19:53:39.502000"), "127.0.0.1:8234 ", 20.0), - types.MakeDatums(datetime("2023-09-19 19:54:39.502000"), "127.0.0.1:8234 ", 20.0), - types.MakeDatums(datetime("2023-09-19 19:55:39.502000"), "127.0.0.1:8234 ", 20.0), - types.MakeDatums(datetime("2023-09-19 19:56:39.502000"), "127.0.0.1:8234 ", 20.0), - types.MakeDatums(datetime("2023-09-19 19:57:39.502000"), "127.0.0.1:8234 ", 20.0), - types.MakeDatums(datetime("2023-09-19 19:58:39.502000"), "127.0.0.1:8234 ", 20.0), - types.MakeDatums(datetime("2023-09-19 19:59:39.502000"), "127.0.0.1:8234 ", 20.0), - types.MakeDatums(datetime("2023-09-19 20:00:39.502000"), "127.0.0.1:8234 ", 20.0), - } tk.MustQueryWithContext(ctx, "CALIBRATE RESOURCE START_TIME '2023-09-19 19:50:39' DURATION '10m'").Check(testkit.Rows("729439")) delete(mockData, "process_cpu_usage") From ba500bff668b253874ab123ce46db47fdc9cfe36 Mon Sep 17 00:00:00 2001 From: glorv Date: Tue, 5 Dec 2023 14:07:12 +0800 Subject: [PATCH 2/9] fix --- .../internal/calibrateresource/calibrate_resource.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/pkg/executor/internal/calibrateresource/calibrate_resource.go b/pkg/executor/internal/calibrateresource/calibrate_resource.go index f86aa46ce344c..bfa06962add93 100644 --- a/pkg/executor/internal/calibrateresource/calibrate_resource.go +++ b/pkg/executor/internal/calibrateresource/calibrate_resource.go @@ -38,7 +38,6 @@ import ( "github.com/pingcap/tidb/pkg/parser/ast" "github.com/pingcap/tidb/pkg/parser/duration" "github.com/pingcap/tidb/pkg/parser/model" - "github.com/pingcap/tidb/pkg/parser/terror" "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/sessionctx/variable" "github.com/pingcap/tidb/pkg/sessiontxn/staleread" @@ -690,9 +689,7 @@ func fetchStoreMetrics(serversInfo []infoschema.ServerInfo, serverType string, o firstErr = err1 continue } - defer func() { - terror.Log(resp.Body.Close()) - }() + defer resp.Body.Close() } if resp.StatusCode != http.StatusOK { From 8fdba2c67f19e2f012ff141b9ff39df8c670f280 Mon Sep 17 00:00:00 2001 From: glorv Date: Tue, 5 Dec 2023 15:51:34 +0800 Subject: [PATCH 3/9] fix build --- pkg/executor/internal/calibrateresource/BUILD.bazel | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/executor/internal/calibrateresource/BUILD.bazel b/pkg/executor/internal/calibrateresource/BUILD.bazel index 579adff6fceab..c7ecb061ff33d 100644 --- a/pkg/executor/internal/calibrateresource/BUILD.bazel +++ b/pkg/executor/internal/calibrateresource/BUILD.bazel @@ -16,6 +16,7 @@ go_library( "//pkg/sessionctx", "//pkg/sessionctx/variable", "//pkg/sessiontxn/staleread", + "//pkg/util", "//pkg/util/chunk", "//pkg/util/sqlexec", "@com_github_docker_go_units//:go-units", From 73334eece6279cc7b7d06dc491baaf276da9952d Mon Sep 17 00:00:00 2001 From: glorv Date: Tue, 5 Dec 2023 17:58:34 +0800 Subject: [PATCH 4/9] change name --- .../internal/calibrateresource/calibrate_resource.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/executor/internal/calibrateresource/calibrate_resource.go b/pkg/executor/internal/calibrateresource/calibrate_resource.go index bfa06962add93..1c5927643c3ed 100644 --- a/pkg/executor/internal/calibrateresource/calibrate_resource.go +++ b/pkg/executor/internal/calibrateresource/calibrate_resource.go @@ -103,7 +103,7 @@ type baseResourceCost struct { // represents the average ratio of TiDB CPU time to 
TiKV CPU time, this is used to calculate whether tikv cpu // or tidb cpu is the performance bottle neck. tidbToKVCPURatio float64 - // the kv CPU time for calculate RU, it's smaller than the actual cpu usage. The unit is seconds. + // the kv CPU time for calcula[te R]U, it's smaller than the actual cpu usage. The unit is seconds. kvCPU float64 // the read bytes rate per 1 tikv cpu. readBytes uint64 @@ -676,7 +676,7 @@ func fetchStoreMetrics(serversInfo []infoschema.ServerInfo, serverType string, o data, _ := base64.StdEncoding.DecodeString(val.(string)) resp = &http.Response{ StatusCode: http.StatusOK, - Body: dummyReaderCloser{ + Body: noopCloserWrapper{ Reader: strings.NewReader(string(data)), }, } @@ -703,10 +703,10 @@ func fetchStoreMetrics(serversInfo []infoschema.ServerInfo, serverType string, o return firstErr } -type dummyReaderCloser struct { +type noopCloserWrapper struct { io.Reader } -func (r dummyReaderCloser) Close() error { +func (r noopCloserWrapper) Close() error { return nil } From 98325df5c8f63b8263874f265807920f6d3eae02 Mon Sep 17 00:00:00 2001 From: glorv Date: Tue, 5 Dec 2023 18:20:50 +0800 Subject: [PATCH 5/9] fix lint --- .../calibrateresource/calibrate_resource.go | 47 ++++++++++--------- 1 file changed, 24 insertions(+), 23 deletions(-) diff --git a/pkg/executor/internal/calibrateresource/calibrate_resource.go b/pkg/executor/internal/calibrateresource/calibrate_resource.go index 1c5927643c3ed..c41b5499abb9a 100644 --- a/pkg/executor/internal/calibrateresource/calibrate_resource.go +++ b/pkg/executor/internal/calibrateresource/calibrate_resource.go @@ -259,7 +259,7 @@ func (e *Executor) Next(ctx context.Context, req *chunk.Chunk) error { if len(e.OptionList) > 0 { return e.dynamicCalibrate(ctx, req) } - return e.staticCalibrate(ctx, req) + return e.staticCalibrate(req) } var ( @@ -296,11 +296,11 @@ func (e *Executor) getTiDBQuota( startTime := startTs.In(e.Ctx().GetSessionVars().Location()).Format(time.DateTime) endTime := endTs.In(e.Ctx().GetSessionVars().Location()).Format(time.DateTime) - totalKVCPUQuota, err := getTiKVTotalCPUQuota(ctx, serverInfos) + totalKVCPUQuota, err := getTiKVTotalCPUQuota(serverInfos) if err != nil { return 0, errNoCPUQuotaMetrics.FastGenByArgs(err.Error()) } - totalTiDBCPU, err := getTiDBTotalCPUQuota(ctx, serverInfos) + totalTiDBCPU, err := getTiDBTotalCPUQuota(serverInfos) if err != nil { return 0, errNoCPUQuotaMetrics.FastGenByArgs(err.Error()) } @@ -407,7 +407,7 @@ func (e *Executor) getTiFlashQuota( endTime := endTs.In(e.Ctx().GetSessionVars().Location()).Format(time.DateTime) quotas := make([]float64, 0) - totalTiFlashLogicalCores, err := getTiFlashLogicalCores(ctx, serverInfos) + totalTiFlashLogicalCores, err := getTiFlashLogicalCores(serverInfos) if err != nil { return 0, errNoCPUQuotaMetrics.FastGenByArgs(err.Error()) } @@ -441,7 +441,7 @@ func (e *Executor) getTiFlashQuota( return setupQuotas(quotas) } -func (e *Executor) staticCalibrate(ctx context.Context, req *chunk.Chunk) error { +func (e *Executor) staticCalibrate(req *chunk.Chunk) error { resourceGroupCtl := domain.GetDomain(e.Ctx()).ResourceGroupsController() // first fetch the ru settings config. 
if resourceGroupCtl == nil { @@ -453,14 +453,14 @@ func (e *Executor) staticCalibrate(ctx context.Context, req *chunk.Chunk) error } ruCfg := resourceGroupCtl.GetConfig() if e.WorkloadType == ast.TPCH10 { - return staticCalibrateTpch10(ctx, req, clusterInfo, ruCfg) + return staticCalibrateTpch10(req, clusterInfo, ruCfg) } - totalKVCPUQuota, err := getTiKVTotalCPUQuota(ctx, clusterInfo) + totalKVCPUQuota, err := getTiKVTotalCPUQuota(clusterInfo) if err != nil { return errNoCPUQuotaMetrics.FastGenByArgs(err.Error()) } - totalTiDBCPUQuota, err := getTiDBTotalCPUQuota(ctx, clusterInfo) + totalTiDBCPUQuota, err := getTiDBTotalCPUQuota(clusterInfo) if err != nil { return errNoCPUQuotaMetrics.FastGenByArgs(err.Error()) } @@ -487,14 +487,14 @@ func (e *Executor) staticCalibrate(ctx context.Context, req *chunk.Chunk) error return nil } -func staticCalibrateTpch10(ctx context.Context, req *chunk.Chunk, clusterInfo []infoschema.ServerInfo, ruCfg *resourceControlClient.RUConfig) error { +func staticCalibrateTpch10(req *chunk.Chunk, clusterInfo []infoschema.ServerInfo, ruCfg *resourceControlClient.RUConfig) error { // TPCH10 only considers the resource usage of the TiFlash including cpu and read bytes. Others are ignored. // cpu usage: 105494.666484 / 20 / 20 = 263.74 // read bytes: 401799161689.0 / 20 / 20 = 1004497904.22 const cpuTimePerCPUPerSec float64 = 263.74 const readBytesPerCPUPerSec float64 = 1004497904.22 ruPerCPU := float64(ruCfg.CPUMsCost)*cpuTimePerCPUPerSec + float64(ruCfg.ReadBytesCost)*readBytesPerCPUPerSec - totalTiFlashLogicalCores, err := getTiFlashLogicalCores(ctx, clusterInfo) + totalTiFlashLogicalCores, err := getTiFlashLogicalCores(clusterInfo) if err != nil { return err } @@ -503,7 +503,7 @@ func staticCalibrateTpch10(ctx context.Context, req *chunk.Chunk, clusterInfo [] return nil } -func getTiDBTotalCPUQuota(ctx context.Context, clusterInfo []infoschema.ServerInfo) (float64, error) { +func getTiDBTotalCPUQuota(clusterInfo []infoschema.ServerInfo) (float64, error) { cpuQuota := float64(runtime.GOMAXPROCS(0)) failpoint.Inject("mockGOMAXPROCS", func(val failpoint.Value) { if val != nil { @@ -514,7 +514,7 @@ func getTiDBTotalCPUQuota(ctx context.Context, clusterInfo []infoschema.ServerIn return cpuQuota * float64(instanceNum), nil } -func getTiKVTotalCPUQuota(ctx context.Context, clusterInfo []infoschema.ServerInfo) (float64, error) { +func getTiKVTotalCPUQuota(clusterInfo []infoschema.ServerInfo) (float64, error) { instanceNum := count(clusterInfo, serverTypeTiKV) if instanceNum == 0 { return 0.0, errors.New("no server with type 'tikv' is found") @@ -526,7 +526,7 @@ func getTiKVTotalCPUQuota(ctx context.Context, clusterInfo []infoschema.ServerIn return cpuQuota * float64(instanceNum), nil } -func getTiFlashLogicalCores(ctx context.Context, clusterInfo []infoschema.ServerInfo) (float64, error) { +func getTiFlashLogicalCores(clusterInfo []infoschema.ServerInfo) (float64, error) { instanceNum := count(clusterInfo, serverTypeTiFlash) if instanceNum == 0 { return 0.0, nil @@ -636,8 +636,11 @@ func count(clusterInfo []infoschema.ServerInfo, ty string) int { func fetchServerCPUQuota(serverInfos []infoschema.ServerInfo, serverType string, metricName string) (float64, error) { var cpuQuota float64 - err := fetchStoreMetrics(serverInfos, serverType, func(addr string, reader io.Reader) error { - scanner := bufio.NewScanner(reader) + err := fetchStoreMetrics(serverInfos, serverType, func(addr string, resp *http.Response) error { + if resp.StatusCode != http.StatusOK { + return 
errors.Errorf("request %s failed: %s", addr, resp.Status) + } + scanner := bufio.NewScanner(resp.Body) for scanner.Scan() { line := scanner.Text() if !strings.HasPrefix(line, metricName) { @@ -656,7 +659,7 @@ func fetchServerCPUQuota(serverInfos []infoschema.ServerInfo, serverType string, return cpuQuota, err } -func fetchStoreMetrics(serversInfo []infoschema.ServerInfo, serverType string, onResp func(string, io.Reader) error) error { +func fetchStoreMetrics(serversInfo []infoschema.ServerInfo, serverType string, onResp func(string, *http.Response) error) error { var firstErr error for _, srv := range serversInfo { if srv.ServerType != serverType { @@ -685,17 +688,15 @@ func fetchStoreMetrics(serversInfo []infoschema.ServerInfo, serverType string, o if resp == nil { var err1 error resp, err1 = util.InternalHTTPClient().Do(req) - if err1 != nil { + if err1 != nil && firstErr == nil { firstErr = err1 continue } - defer resp.Body.Close() } + err = onResp(srv.Address, resp) + resp.Body.Close() + return err - if resp.StatusCode != http.StatusOK { - return errors.Errorf("request %s failed: %s", url, resp.Status) - } - return onResp(srv.Address, resp.Body) } if firstErr == nil { firstErr = errors.Errorf("no server with type '%s' is found", serverType) @@ -707,6 +708,6 @@ type noopCloserWrapper struct { io.Reader } -func (r noopCloserWrapper) Close() error { +func (noopCloserWrapper) Close() error { return nil } From 64e5c8bb5fd53d5b00416c0cfc84929a960b5c56 Mon Sep 17 00:00:00 2001 From: glorv Date: Tue, 5 Dec 2023 18:36:19 +0800 Subject: [PATCH 6/9] fix close resp body --- .../internal/calibrateresource/calibrate_resource.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/executor/internal/calibrateresource/calibrate_resource.go b/pkg/executor/internal/calibrateresource/calibrate_resource.go index c41b5499abb9a..d380b379274e3 100644 --- a/pkg/executor/internal/calibrateresource/calibrate_resource.go +++ b/pkg/executor/internal/calibrateresource/calibrate_resource.go @@ -688,8 +688,10 @@ func fetchStoreMetrics(serversInfo []infoschema.ServerInfo, serverType string, o if resp == nil { var err1 error resp, err1 = util.InternalHTTPClient().Do(req) - if err1 != nil && firstErr == nil { - firstErr = err1 + if err1 != nil { + if firstErr == nil { + firstErr = err1 + } continue } } From 605cde6cae70963d35699c29d847b0c26db56fa4 Mon Sep 17 00:00:00 2001 From: glorv Date: Tue, 5 Dec 2023 18:45:48 +0800 Subject: [PATCH 7/9] fix fmt --- pkg/executor/internal/calibrateresource/calibrate_resource.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/executor/internal/calibrateresource/calibrate_resource.go b/pkg/executor/internal/calibrateresource/calibrate_resource.go index d380b379274e3..8830ac0cbe0a6 100644 --- a/pkg/executor/internal/calibrateresource/calibrate_resource.go +++ b/pkg/executor/internal/calibrateresource/calibrate_resource.go @@ -698,7 +698,6 @@ func fetchStoreMetrics(serversInfo []infoschema.ServerInfo, serverType string, o err = onResp(srv.Address, resp) resp.Body.Close() return err - } if firstErr == nil { firstErr = errors.Errorf("no server with type '%s' is found", serverType) From 58767c75220d2a72ac355907dd6f4f350ef47182 Mon Sep 17 00:00:00 2001 From: glorv Date: Wed, 6 Dec 2023 10:32:58 +0800 Subject: [PATCH 8/9] ignore false-positive lint --- pkg/executor/internal/calibrateresource/calibrate_resource.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/executor/internal/calibrateresource/calibrate_resource.go 
b/pkg/executor/internal/calibrateresource/calibrate_resource.go
index 8830ac0cbe0a6..592a196f9ed88 100644
--- a/pkg/executor/internal/calibrateresource/calibrate_resource.go
+++ b/pkg/executor/internal/calibrateresource/calibrate_resource.go
@@ -687,6 +687,8 @@ func fetchStoreMetrics(serversInfo []infoschema.ServerInfo, serverType string, o
 		})
 		if resp == nil {
 			var err1 error
+			// ignore false-positive go lint, can't use defer here because it's in a loop.
+			//nolint:bodyclose
 			resp, err1 = util.InternalHTTPClient().Do(req)
 			if err1 != nil {
 				if firstErr == nil {

From 338bdf78d96adc8e21d437010694a7acb35498c0 Mon Sep 17 00:00:00 2001
From: glorv
Date: Wed, 6 Dec 2023 11:17:23 +0800
Subject: [PATCH 9/9] fix comment typo

---
 .../internal/calibrateresource/calibrate_resource.go       | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/pkg/executor/internal/calibrateresource/calibrate_resource.go b/pkg/executor/internal/calibrateresource/calibrate_resource.go
index 592a196f9ed88..0d3a34c60f0ee 100644
--- a/pkg/executor/internal/calibrateresource/calibrate_resource.go
+++ b/pkg/executor/internal/calibrateresource/calibrate_resource.go
@@ -92,9 +92,9 @@ var (
 const (
 	// serverTypeTiDB is tidb's instance type name
 	serverTypeTiDB = "tidb"
-	// serverTypeTiDB is tikv's instance type name
+	// serverTypeTiKV is tikv's instance type name
 	serverTypeTiKV = "tikv"
-	// serverTypeTiDB is tiflash's instance type name
+	// serverTypeTiFlash is tiflash's instance type name
 	serverTypeTiFlash = "tiflash"
 )
@@ -103,7 +103,7 @@ type baseResourceCost struct {
 	// represents the average ratio of TiDB CPU time to TiKV CPU time, this is used to calculate whether tikv cpu
 	// or tidb cpu is the performance bottle neck.
 	tidbToKVCPURatio float64
-	// the kv CPU time for calcula[te R]U, it's smaller than the actual cpu usage. The unit is seconds.
+	// the kv CPU time for calculate RU, it's smaller than the actual cpu usage. The unit is seconds.
 	kvCPU float64
 	// the read bytes rate per 1 tikv cpu.
 	readBytes uint64
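
A note on the core mechanism, since the diff spreads it across several hunks: before this series, CALIBRATE RESOURCE read CPU quotas through METRICS_SCHEMA queries such as "SELECT SUM(value) FROM METRICS_SCHEMA.tikv_cpu_quota ...", which proxy PD/Prometheus and fail with "query metric error: pd unavailable" whenever Prometheus is absent. After the series, each store's own status port serves the numbers: the executor lists instances via infoschema.GetClusterServerInfo, GETs <schema>://<status-addr>/metrics, and scans the Prometheus text exposition for one gauge. The sketch below is a minimal standalone approximation of that flow, not the TiDB code itself; it assumes plain HTTP instead of the TLS-aware util.InternalHTTPClient, a hard-coded address instead of resolved cluster info, and an unlabeled gauge as in the test fixture. fetchCPUQuota is an illustrative name, not a TiDB helper.

package main

import (
	"bufio"
	"fmt"
	"net/http"
	"strconv"
	"strings"
)

// fetchCPUQuota GETs a store's /metrics endpoint and returns the value of the
// first sample whose name matches metricName. For an exposition line such as
// "tikv_server_cpu_cores_quota 8" it returns 8. As in the patch, a plain
// prefix match suffices because the quota gauges carry no labels.
func fetchCPUQuota(statusAddr, metricName string) (float64, error) {
	resp, err := http.Get(fmt.Sprintf("http://%s/metrics", statusAddr))
	if err != nil {
		return 0, err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return 0, fmt.Errorf("request %s failed: %s", statusAddr, resp.Status)
	}
	scanner := bufio.NewScanner(resp.Body)
	for scanner.Scan() {
		line := scanner.Text()
		// "# HELP" / "# TYPE" comments and unrelated samples fail this test.
		if !strings.HasPrefix(line, metricName) {
			continue
		}
		// the gauge value follows the metric name after a single space.
		return strconv.ParseFloat(strings.TrimSpace(line[len(metricName):]), 64)
	}
	if err := scanner.Err(); err != nil {
		return 0, err
	}
	return 0, fmt.Errorf("metric %q not found on %s", metricName, statusAddr)
}

func main() {
	// stand-in for one StatusAddr returned by the cluster info.
	quota, err := fetchCPUQuota("127.0.0.1:20180", "tikv_server_cpu_cores_quota")
	if err != nil {
		fmt.Println("fetch failed:", err)
		return
	}
	// as in getTiKVTotalCPUQuota, the cluster-wide figure is the per-instance
	// quota multiplied by the number of instances of that server type.
	const instanceNum = 3
	fmt.Printf("total tikv cpu quota: %.1f\n", quota*float64(instanceNum))
}

One design point from the later commits is worth keeping in mind when adapting this: the sketch fetches once, so defer is idiomatic, but the real fetchStoreMetrics issues the request inside a loop over servers, where a deferred close would accumulate until the function returns. That is why patch 5/9 switches to an explicit resp.Body.Close() after the callback and patch 8/9 adds //nolint:bodyclose for the linter's resulting false positive.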
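
The failpoint mock in patch 1/9 also relies on a small adapter worth isolating: http.Response.Body must be an io.ReadCloser, so the canned metrics text (a strings.Reader) is wrapped in a closer that does nothing. Below is a minimal reproduction of that wrapper (renamed to noopCloserWrapper in patch 4/9); the canned payload here is shortened for illustration.

package main

import (
	"fmt"
	"io"
	"net/http"
	"strings"
)

// noopCloserWrapper adapts a plain io.Reader into the io.ReadCloser that
// http.Response.Body requires. Close is a no-op because a strings.Reader
// holds no connection or file descriptor to release.
type noopCloserWrapper struct {
	io.Reader
}

func (noopCloserWrapper) Close() error { return nil }

func main() {
	// build a fake response the way the mockMetricsResponse failpoint does.
	resp := &http.Response{
		StatusCode: http.StatusOK,
		Body:       noopCloserWrapper{Reader: strings.NewReader("tikv_server_cpu_cores_quota 8\n")},
	}
	body, _ := io.ReadAll(resp.Body)
	_ = resp.Body.Close() // safe: the wrapper's Close does nothing
	fmt.Print(string(body))
}

The standard library's io.NopCloser would serve the same purpose here; the patch simply spells out the one-line wrapper locally.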