diff --git a/component/topsql/query/default_query.go b/component/topsql/query/default_query.go index 74e6a3e..04b1f4e 100644 --- a/component/topsql/query/default_query.go +++ b/component/topsql/query/default_query.go @@ -69,6 +69,52 @@ func (dq *DefaultQuery) Records(name string, startSecs, endSecs, windowSecs, top }) } +func (dq *DefaultQuery) SummaryBy(startSecs, endSecs, windowSecs, top int, instance, instanceType, by string, fill *[]SummaryByItem) error { + if startSecs > endSecs { + return nil + } + + // adjust start to make result align to end + alignStartSecs := endSecs - (endSecs-startSecs)/windowSecs*windowSecs + + var recordsResponse recordsMetricRespV2 + if err := dq.fetchRecordsFromTSDBBy(store.MetricNameCPUTime, alignStartSecs, endSecs, windowSecs, instance, instanceType, by, top, &recordsResponse); err != nil { + return err + } + if len(recordsResponse.Data.Results) == 0 { + return nil + } + + for _, result := range recordsResponse.Data.Results { + text := result.Metric[by].(string) + sumItem := SummaryByItem{ + Text: text, + } + if text == "other" { + sumItem.IsOther = true + } + valueSum := uint64(0) + for _, value := range result.Values { + if len(value) != 2 { + continue + } + + ts := uint64(value[0].(float64)) + v, err := strconv.ParseUint(value[1].(string), 10, 64) + if err != nil { + continue + } + + valueSum += v + sumItem.TimestampSec = append(sumItem.TimestampSec, ts) + sumItem.CPUTimeMs = append(sumItem.CPUTimeMs, v) + } + sumItem.CPUTimeMsSum = valueSum + *fill = append(*fill, sumItem) + } + return nil +} + func (dq *DefaultQuery) Summary(startSecs, endSecs, windowSecs, top int, instance, instanceType string, fill *[]SummaryItem) error { if startSecs > endSecs { return nil @@ -303,6 +349,41 @@ func (dq *DefaultQuery) fetchRecordsFromTSDB(name string, startSecs int, endSecs return json.Unmarshal(respR.Body.Bytes(), metricResponse) } +func (dq *DefaultQuery) fetchRecordsFromTSDBBy(name string, startSecs int, endSecs int, windowSecs int, instance, instanceType, by string, top int, metricResponse *recordsMetricRespV2) error { + if dq.vmselectHandler == nil { + return fmt.Errorf("empty query handler") + } + + bufResp := bytesP.Get() + header := headerP.Get() + + defer bytesP.Put(bufResp) + defer headerP.Put(header) + + req, err := http.NewRequest("GET", "/api/v1/query_range", nil) + if err != nil { + return err + } + reqQuery := req.URL.Query() + reqQuery.Set("query", fmt.Sprintf("topk_avg(%d, sum(sum_over_time(%s{instance=\"%s\", instance_type=\"%s\"}[%d])) by (%s), \"%s=other\")", top, name, instance, instanceType, windowSecs, by, by)) + reqQuery.Set("start", strconv.Itoa(startSecs)) + reqQuery.Set("end", strconv.Itoa(endSecs)) + reqQuery.Set("step", strconv.Itoa(windowSecs)) + reqQuery.Set("nocache", "1") + req.URL.RawQuery = reqQuery.Encode() + req.Header.Set("Accept", "application/json") + + respR := utils.NewRespWriter(bufResp, header) + dq.vmselectHandler(&respR, req) + + if statusOK := respR.Code >= 200 && respR.Code < 300; !statusOK { + errStr := respR.Body.String() + log.Warn("failed to fetch timeseries db", zap.String("error", errStr)) + return errors.New(errStr) + } + return json.Unmarshal(respR.Body.Bytes(), metricResponse) +} + func (dq *DefaultQuery) fetchInstancesFromTSDB(startSecs, endSecs int, fill *[]InstanceItem) error { if dq.vmselectHandler == nil { return fmt.Errorf("empty query handler") diff --git a/component/topsql/query/model.go b/component/topsql/query/model.go index 2969fbb..167046a 100644 --- a/component/topsql/query/model.go +++ b/component/topsql/query/model.go @@ -26,6 +26,14 @@ type RecordPlanItem struct { SQLDurationCount []uint64 `json:"sql_duration_count,omitempty"` } +type SummaryByItem struct { + Text string `json:"text"` + TimestampSec []uint64 `json:"timestamp_sec"` + CPUTimeMs []uint64 `json:"cpu_time_ms,omitempty"` + CPUTimeMsSum uint64 `json:"cpu_time_ms_sum"` + IsOther bool `json:"is_other"` +} + type SummaryItem struct { SQLDigest string `json:"sql_digest"` SQLText string `json:"sql_text"` @@ -76,6 +84,21 @@ type recordsMetricRespDataResultMetric struct { PlanDigest string `json:"plan_digest"` } +type recordsMetricRespV2 struct { + Status string `json:"status"` + Data recordsMetricRespDataV2 `json:"data"` +} + +type recordsMetricRespDataV2 struct { + ResultType string `json:"resultType"` + Results []recordsMetricRespDataResultV2 `json:"result"` +} + +type recordsMetricRespDataResultV2 struct { + Metric map[string]interface{} `json:"metric"` + Values []recordsMetricRespDataResultValue `json:"values"` +} + type recordsMetricRespDataResultValue = []interface{} type instancesMetricResp struct { diff --git a/component/topsql/query/query.go b/component/topsql/query/query.go index a6b8799..c088de5 100644 --- a/component/topsql/query/query.go +++ b/component/topsql/query/query.go @@ -3,6 +3,7 @@ package query type Query interface { Records(name string, startSecs, endSecs, windowSecs, top int, instance, instanceType string, fill *[]RecordItem) error Summary(startSecs, endSecs, windowSecs, top int, instance, instanceType string, fill *[]SummaryItem) error + SummaryBy(startSecs, endSecs, windowSecs, top int, instance, instanceType, by string, fill *[]SummaryByItem) error Instances(startSecs, endSecs int, fill *[]InstanceItem) error Close() } diff --git a/component/topsql/service/http.go b/component/topsql/service/http.go index 63d0bd3..30e3284 100644 --- a/component/topsql/service/http.go +++ b/component/topsql/service/http.go @@ -14,7 +14,8 @@ import ( var ( recordsP = recordsPool{} - summaryP = summaryPool{} + summaryBySqlP = summarySQLPool{} + summaryByItemP = summaryByItemPool{} instanceItemsP = InstanceItemsPool{} metricNames = []string{ @@ -79,7 +80,7 @@ func (s *Service) metricHandler(name string) gin.HandlerFunc { } func (s *Service) summaryHandler(c *gin.Context) { - start, end, windowSecs, top, instance, instanceType, err := parseAllParams(c) + start, end, windowSecs, top, instance, instanceType, groupBy, err := parseAllParams(c) if err != nil { c.JSON(http.StatusBadRequest, gin.H{ "status": "error", @@ -87,26 +88,71 @@ func (s *Service) summaryHandler(c *gin.Context) { }) return } - - items := summaryP.Get() - defer summaryP.Put(items) - err = s.query.Summary(start, end, windowSecs, top, instance, instanceType, items) - if err != nil { - c.JSON(http.StatusServiceUnavailable, gin.H{ - "status": "error", - "message": err.Error(), + switch groupBy { + case "sql", "": + items := summaryBySqlP.Get() + defer summaryBySqlP.Put(items) + err = s.query.Summary(start, end, windowSecs, top, instance, instanceType, items) + if err != nil { + c.JSON(http.StatusServiceUnavailable, gin.H{ + "status": "error", + "message": err.Error(), + }) + return + } + c.JSON(http.StatusOK, gin.H{ + "status": "ok", + "data": items, + }) + case "table": + if instanceType == "tidb" { + c.JSON(http.StatusBadRequest, gin.H{ + "status": "error", + "message": "table summary is not supported for tidb", + }) + return + } + items := summaryByItemP.Get() + defer summaryByItemP.Put(items) + err = s.query.SummaryBy(start, end, windowSecs, top, instance, instanceType, "table", items) + if err != nil { + c.JSON(http.StatusServiceUnavailable, gin.H{ + "status": "error", + "message": err.Error(), + }) + return + } + c.JSON(http.StatusOK, gin.H{ + "status": "ok", + "data_by": items, + }) + case "db": + if instanceType == "tidb" { + c.JSON(http.StatusBadRequest, gin.H{ + "status": "error", + "message": "table summary is not supported for tidb", + }) + return + } + items := summaryByItemP.Get() + defer summaryByItemP.Put(items) + err = s.query.SummaryBy(start, end, windowSecs, top, instance, instanceType, "db", items) + if err != nil { + c.JSON(http.StatusServiceUnavailable, gin.H{ + "status": "error", + "message": err.Error(), + }) + return + } + c.JSON(http.StatusOK, gin.H{ + "status": "ok", + "data_by": items, }) - return } - c.JSON(http.StatusOK, gin.H{ - "status": "ok", - "data": items, - }) } - func (s *Service) queryMetric(c *gin.Context, name string) { - start, end, windowSecs, top, instance, instanceType, err := parseAllParams(c) + start, end, windowSecs, top, instance, instanceType, _, err := parseAllParams(c) if err != nil { c.JSON(http.StatusBadRequest, gin.H{ "status": "error", @@ -132,7 +178,7 @@ func (s *Service) queryMetric(c *gin.Context, name string) { }) } -func parseAllParams(c *gin.Context) (start, end, windowSecs, top int, instance, instanceType string, err error) { +func parseAllParams(c *gin.Context) (start, end, windowSecs, top int, instance, instanceType string, groupBy string, err error) { instance = c.Query("instance") if len(instance) == 0 { err = errors.New("no instance") @@ -174,6 +220,7 @@ func parseAllParams(c *gin.Context) (start, end, windowSecs, top int, instance, } windowSecs = int(duration.Seconds()) + groupBy = c.Query("group_by") return } diff --git a/component/topsql/service/pools.go b/component/topsql/service/pools.go index c42e9d9..eb5418c 100644 --- a/component/topsql/service/pools.go +++ b/component/topsql/service/pools.go @@ -77,11 +77,11 @@ func (tip *recordsPool) Put(ti *[]query.RecordItem) { tip.p.Put(ti) } -type summaryPool struct { +type summarySQLPool struct { p sync.Pool } -func (sp *summaryPool) Get() *[]query.SummaryItem { +func (sp *summarySQLPool) Get() *[]query.SummaryItem { sv := sp.p.Get() if sv == nil { return &[]query.SummaryItem{} @@ -89,11 +89,28 @@ func (sp *summaryPool) Get() *[]query.SummaryItem { return sv.(*[]query.SummaryItem) } -func (sp *summaryPool) Put(s *[]query.SummaryItem) { +func (sp *summarySQLPool) Put(s *[]query.SummaryItem) { *s = (*s)[:0] sp.p.Put(s) } +type summaryByItemPool struct { + p sync.Pool +} + +func (tp *summaryByItemPool) Get() *[]query.SummaryByItem { + tv := tp.p.Get() + if tv == nil { + return &[]query.SummaryByItem{} + } + return tv.(*[]query.SummaryByItem) +} + +func (tp *summaryByItemPool) Put(t *[]query.SummaryByItem) { + *t = (*t)[:0] + tp.p.Put(t) +} + type InstanceItemsPool struct { p sync.Pool } diff --git a/component/topsql/topsql_test.go b/component/topsql/topsql_test.go index e8567f3..cf21244 100644 --- a/component/topsql/topsql_test.go +++ b/component/topsql/topsql_test.go @@ -875,6 +875,7 @@ func (s *testTopSQLSuite) TestTiKVSummary() { s.sortSummary(res) tsList = []uint64{testBaseTs + 22, testBaseTs + 32} timeWindow = float64(32 - 19 + 1) + s.Equal(res, []query.SummaryItem{{ SQLDigest: hex.EncodeToString([]byte("sql-0")), CPUTimeMs: sum( @@ -903,6 +904,7 @@ func (s *testTopSQLSuite) TestTiKVSummary() { ScanIndexesPerSec: sumf(data["sql-0"]["plan-0"]["index"].reads[2:4]) / timeWindow, }}, }, { + SQLDigest: hex.EncodeToString([]byte("sql-1")), CPUTimeMs: sum( data["sql-1"]["plan-0"]["index"].cpu[2:4], @@ -936,6 +938,7 @@ func (s *testTopSQLSuite) TestTiKVSummary() { ScanRecordsPerSec: sumf(data["sql-1"]["plan-2"]["record"].reads[2:4]) / timeWindow, }}, }, { + SQLDigest: hex.EncodeToString([]byte("sql-2")), CPUTimeMs: sum(data["sql-2"]["plan-0"]["record"].cpu[2:4]), ExecCountPerSec: sumf(data["sql-2"]["plan-0"]["record"].exec[2:4]) / timeWindow,