From 5044861773d506e96ea3d8f5169d175e480c81d0 Mon Sep 17 00:00:00 2001 From: Nityananda Gohain Date: Wed, 20 Nov 2024 22:51:29 +0530 Subject: [PATCH 1/2] fix: remove service overview API (#6495) --- .../app/clickhouseReader/reader.go | 124 --------- pkg/query-service/app/http_handler.go | 17 -- pkg/query-service/app/parser.go | 245 ------------------ pkg/query-service/interfaces/interface.go | 1 - 4 files changed, 387 deletions(-) diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index 7e57e5ccaf..cd597eca24 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -642,130 +642,6 @@ func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.G } return &serviceItems, nil } - -func (r *ClickHouseReader) GetServiceOverview(ctx context.Context, queryParams *model.GetServiceOverviewParams, skipConfig *model.SkipConfig) (*[]model.ServiceOverviewItem, *model.ApiError) { - - topLevelOps, apiErr := r.GetTopLevelOperations(ctx, skipConfig, *queryParams.Start, *queryParams.End, nil) - if apiErr != nil { - return nil, apiErr - } - ops, ok := (*topLevelOps)[queryParams.ServiceName] - if !ok { - return nil, &model.ApiError{Typ: model.ErrorNotFound, Err: fmt.Errorf("service not found")} - } - - namedArgs := []interface{}{ - clickhouse.Named("interval", strconv.Itoa(int(queryParams.StepSeconds/60))), - clickhouse.Named("start", strconv.FormatInt(queryParams.Start.UnixNano(), 10)), - clickhouse.Named("end", strconv.FormatInt(queryParams.End.UnixNano(), 10)), - clickhouse.Named("serviceName", queryParams.ServiceName), - clickhouse.Named("names", ops), - } - - serviceOverviewItems := []model.ServiceOverviewItem{} - - query := fmt.Sprintf(` - SELECT - toStartOfInterval(timestamp, INTERVAL @interval minute) as time, - quantile(0.99)(durationNano) as p99, - quantile(0.95)(durationNano) as p95, - quantile(0.50)(durationNano) as p50, - count(*) 
as numCalls - FROM %s.%s - WHERE serviceName = @serviceName AND name In @names AND timestamp>= @start AND timestamp<= @end`, - r.TraceDB, r.indexTable, - ) - args := []interface{}{} - args = append(args, namedArgs...) - - // create TagQuery from TagQueryParams - tags := createTagQueryFromTagQueryParams(queryParams.Tags) - subQuery, argsSubQuery, errStatus := buildQueryWithTagParams(ctx, tags) - query += subQuery - args = append(args, argsSubQuery...) - if errStatus != nil { - return nil, errStatus - } - query += " GROUP BY time ORDER BY time DESC" - err := r.db.Select(ctx, &serviceOverviewItems, query, args...) - - zap.L().Debug("running query", zap.String("query", query)) - - if err != nil { - zap.L().Error("Error in processing sql query", zap.Error(err)) - return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query")} - } - - serviceErrorItems := []model.ServiceErrorItem{} - - query = fmt.Sprintf(` - SELECT - toStartOfInterval(timestamp, INTERVAL @interval minute) as time, - count(*) as numErrors - FROM %s.%s - WHERE serviceName = @serviceName AND name In @names AND timestamp>= @start AND timestamp<= @end AND statusCode=2`, - r.TraceDB, r.indexTable, - ) - args = []interface{}{} - args = append(args, namedArgs...) - subQuery, argsSubQuery, errStatus = buildQueryWithTagParams(ctx, tags) - query += subQuery - args = append(args, argsSubQuery...) - if errStatus != nil { - return nil, errStatus - } - query += " GROUP BY time ORDER BY time DESC" - err = r.db.Select(ctx, &serviceErrorItems, query, args...) 
- - if err != nil { - zap.L().Error("Error in processing sql query", zap.Error(err)) - return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query")} - } - - m := make(map[int64]int) - - for j := range serviceErrorItems { - m[int64(serviceErrorItems[j].Time.UnixNano())] = int(serviceErrorItems[j].NumErrors) - } - - for i := range serviceOverviewItems { - serviceOverviewItems[i].Timestamp = int64(serviceOverviewItems[i].Time.UnixNano()) - - if val, ok := m[serviceOverviewItems[i].Timestamp]; ok { - serviceOverviewItems[i].NumErrors = uint64(val) - } - serviceOverviewItems[i].ErrorRate = float64(serviceOverviewItems[i].NumErrors) * 100 / float64(serviceOverviewItems[i].NumCalls) - serviceOverviewItems[i].CallRate = float64(serviceOverviewItems[i].NumCalls) / float64(queryParams.StepSeconds) - } - - return &serviceOverviewItems, nil -} - -func buildFilterArrayQuery(_ context.Context, excludeMap map[string]struct{}, params []string, filter string, query *string, args []interface{}) []interface{} { - for i, e := range params { - filterKey := filter + String(5) - if i == 0 && i == len(params)-1 { - if _, ok := excludeMap[filter]; ok { - *query += fmt.Sprintf(" AND NOT (%s=@%s)", filter, filterKey) - } else { - *query += fmt.Sprintf(" AND (%s=@%s)", filter, filterKey) - } - } else if i == 0 && i != len(params)-1 { - if _, ok := excludeMap[filter]; ok { - *query += fmt.Sprintf(" AND NOT (%s=@%s", filter, filterKey) - } else { - *query += fmt.Sprintf(" AND (%s=@%s", filter, filterKey) - } - } else if i != 0 && i == len(params)-1 { - *query += fmt.Sprintf(" OR %s=@%s)", filter, filterKey) - } else { - *query += fmt.Sprintf(" OR %s=@%s", filter, filterKey) - } - args = append(args, clickhouse.Named(filterKey, e)) - } - return args -} - func getStatusFilters(query string, statusParams []string, excludeMap map[string]struct{}) string { // status can only be two and if both are selected than they are equivalent to none selected diff --git 
a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index 2be68ede43..5fb604d7a9 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -508,7 +508,6 @@ func (aH *APIHandler) RegisterRoutes(router *mux.Router, am *AuthMiddleware) { // router.HandleFunc("/api/v1/get_percentiles", aH.getApplicationPercentiles).Methods(http.MethodGet) router.HandleFunc("/api/v1/services", am.ViewAccess(aH.getServices)).Methods(http.MethodPost) router.HandleFunc("/api/v1/services/list", am.ViewAccess(aH.getServicesList)).Methods(http.MethodGet) - router.HandleFunc("/api/v1/service/overview", am.ViewAccess(aH.getServiceOverview)).Methods(http.MethodPost) router.HandleFunc("/api/v1/service/top_operations", am.ViewAccess(aH.getTopOperations)).Methods(http.MethodPost) router.HandleFunc("/api/v1/service/top_level_operations", am.ViewAccess(aH.getServicesTopLevelOps)).Methods(http.MethodPost) router.HandleFunc("/api/v1/traces/{traceId}", am.ViewAccess(aH.SearchTraces)).Methods(http.MethodGet) @@ -1632,22 +1631,6 @@ func (aH *APIHandler) getUsage(w http.ResponseWriter, r *http.Request) { } -func (aH *APIHandler) getServiceOverview(w http.ResponseWriter, r *http.Request) { - - query, err := parseGetServiceOverviewRequest(r) - if aH.HandleError(w, err, http.StatusBadRequest) { - return - } - - result, apiErr := aH.reader.GetServiceOverview(r.Context(), query, aH.skipConfig) - if apiErr != nil && aH.HandleError(w, apiErr.Err, http.StatusInternalServerError) { - return - } - - aH.WriteJSON(w, r, result) - -} - func (aH *APIHandler) getServicesTopLevelOps(w http.ResponseWriter, r *http.Request) { var start, end time.Time diff --git a/pkg/query-service/app/parser.go b/pkg/query-service/app/parser.go index fcf6944234..8cfc621662 100644 --- a/pkg/query-service/app/parser.go +++ b/pkg/query-service/app/parser.go @@ -195,28 +195,6 @@ func parseGetUsageRequest(r *http.Request) (*model.GetUsageParams, error) { } -func 
parseGetServiceOverviewRequest(r *http.Request) (*model.GetServiceOverviewParams, error) { - - var postData *model.GetServiceOverviewParams - err := json.NewDecoder(r.Body).Decode(&postData) - - if err != nil { - return nil, err - } - - postData.Start, err = parseTimeStr(postData.StartTime, "start") - if err != nil { - return nil, err - } - postData.End, err = parseTimeMinusBufferStr(postData.EndTime, "end") - if err != nil { - return nil, err - } - - postData.Period = fmt.Sprintf("PT%dM", postData.StepSeconds/60) - return postData, nil -} - func parseGetServicesRequest(r *http.Request) (*model.GetServicesParams, error) { var postData *model.GetServicesParams @@ -289,229 +267,6 @@ func DoesExistInSlice(item string, list []string) bool { } return false } - -func parseSpanFilterRequestBody(r *http.Request) (*model.SpanFilterParams, error) { - - var postData *model.SpanFilterParams - err := json.NewDecoder(r.Body).Decode(&postData) - - if err != nil { - return nil, err - } - - postData.Start, err = parseTimeStr(postData.StartStr, "start") - if err != nil { - return nil, err - } - postData.End, err = parseTimeMinusBufferStr(postData.EndStr, "end") - if err != nil { - return nil, err - } - - return postData, nil -} - -func parseFilteredSpansRequest(r *http.Request, aH *APIHandler) (*model.GetFilteredSpansParams, error) { - - var postData *model.GetFilteredSpansParams - err := json.NewDecoder(r.Body).Decode(&postData) - - if err != nil { - return nil, err - } - - postData.Start, err = parseTimeStr(postData.StartStr, "start") - if err != nil { - return nil, err - } - postData.End, err = parseTimeMinusBufferStr(postData.EndStr, "end") - if err != nil { - return nil, err - } - - if postData.Limit == 0 { - postData.Limit = 10 - } - - if len(postData.Order) != 0 { - if postData.Order != baseconstants.Ascending && postData.Order != baseconstants.Descending { - return nil, errors.New("order param is not in correct format") - } - if postData.OrderParam != baseconstants.Duration 
&& postData.OrderParam != baseconstants.Timestamp { - return nil, errors.New("order param is not in correct format") - } - if postData.OrderParam == baseconstants.Duration && !aH.CheckFeature(baseconstants.DurationSort) { - return nil, model.ErrFeatureUnavailable{Key: baseconstants.DurationSort} - } else if postData.OrderParam == baseconstants.Timestamp && !aH.CheckFeature(baseconstants.TimestampSort) { - return nil, model.ErrFeatureUnavailable{Key: baseconstants.TimestampSort} - } - } - tags, err := extractTagKeys(postData.Tags) - if err != nil { - return nil, err - } - postData.Tags = tags - return postData, nil -} - -func parseFilteredSpanAggregatesRequest(r *http.Request) (*model.GetFilteredSpanAggregatesParams, error) { - - var postData *model.GetFilteredSpanAggregatesParams - err := json.NewDecoder(r.Body).Decode(&postData) - - if err != nil { - return nil, err - } - - postData.Start, err = parseTimeStr(postData.StartStr, "start") - if err != nil { - return nil, err - } - postData.End, err = parseTimeMinusBufferStr(postData.EndStr, "end") - if err != nil { - return nil, err - } - - step := postData.StepSeconds - if step == 0 { - return nil, errors.New("step param missing in query") - } - - function := postData.Function - if len(function) == 0 { - return nil, errors.New("function param missing in query") - } else { - if !DoesExistInSlice(function, allowedFunctions) { - return nil, fmt.Errorf("given function: %s is not allowed in query", function) - } - } - - var dimension, aggregationOption string - - switch function { - case "count": - dimension = "calls" - aggregationOption = "count" - case "ratePerSec": - dimension = "calls" - aggregationOption = "rate_per_sec" - case "avg": - dimension = "duration" - aggregationOption = "avg" - case "sum": - dimension = "duration" - aggregationOption = "sum" - case "p50": - dimension = "duration" - aggregationOption = "p50" - case "p90": - dimension = "duration" - aggregationOption = "p90" - case "p95": - dimension = 
"duration" - aggregationOption = "p95" - case "p99": - dimension = "duration" - aggregationOption = "p99" - case "min": - dimension = "duration" - aggregationOption = "min" - case "max": - dimension = "duration" - aggregationOption = "max" - } - - postData.AggregationOption = aggregationOption - postData.Dimension = dimension - tags, err := extractTagKeys(postData.Tags) - if err != nil { - return nil, err - } - postData.Tags = tags - - return postData, nil -} - -func extractTagKeys(tags []model.TagQueryParam) ([]model.TagQueryParam, error) { - newTags := make([]model.TagQueryParam, 0) - if len(tags) != 0 { - for _, tag := range tags { - customStr := strings.Split(tag.Key, ".(") - if len(customStr) < 2 { - return nil, fmt.Errorf("TagKey param is not valid in query") - } else { - tag.Key = customStr[0] - } - if tag.Operator == model.ExistsOperator || tag.Operator == model.NotExistsOperator { - if customStr[1] == string(model.TagTypeString)+")" { - tag.StringValues = []string{" "} - } else if customStr[1] == string(model.TagTypeBool)+")" { - tag.BoolValues = []bool{true} - } else if customStr[1] == string(model.TagTypeNumber)+")" { - tag.NumberValues = []float64{0} - } else { - return nil, fmt.Errorf("TagKey param is not valid in query") - } - } - newTags = append(newTags, tag) - } - } - return newTags, nil -} - -func parseTagFilterRequest(r *http.Request) (*model.TagFilterParams, error) { - var postData *model.TagFilterParams - err := json.NewDecoder(r.Body).Decode(&postData) - - if err != nil { - return nil, err - } - - postData.Start, err = parseTimeStr(postData.StartStr, "start") - if err != nil { - return nil, err - } - postData.End, err = parseTimeMinusBufferStr(postData.EndStr, "end") - if err != nil { - return nil, err - } - - return postData, nil - -} - -func parseTagValueRequest(r *http.Request) (*model.TagFilterParams, error) { - var postData *model.TagFilterParams - err := json.NewDecoder(r.Body).Decode(&postData) - - if err != nil { - return nil, err - } 
- if postData.TagKey == (model.TagKey{}) { - return nil, fmt.Errorf("TagKey param missing in query") - } - - if postData.TagKey.Type != model.TagTypeString && postData.TagKey.Type != model.TagTypeBool && postData.TagKey.Type != model.TagTypeNumber { - return nil, fmt.Errorf("tag keys type %s is not supported", postData.TagKey.Type) - } - - if postData.Limit == 0 { - postData.Limit = 100 - } - - postData.Start, err = parseTimeStr(postData.StartStr, "start") - if err != nil { - return nil, err - } - postData.End, err = parseTimeMinusBufferStr(postData.EndStr, "end") - if err != nil { - return nil, err - } - - return postData, nil - -} - func parseListErrorsRequest(r *http.Request) (*model.ListErrorsParams, error) { var allowedOrderParams = []string{"exceptionType", "exceptionCount", "firstSeen", "lastSeen", "serviceName"} diff --git a/pkg/query-service/interfaces/interface.go b/pkg/query-service/interfaces/interface.go index 10c718aa28..1cf32ede02 100644 --- a/pkg/query-service/interfaces/interface.go +++ b/pkg/query-service/interfaces/interface.go @@ -16,7 +16,6 @@ import ( type Reader interface { GetInstantQueryMetricsResult(ctx context.Context, query *model.InstantQueryMetricsParams) (*promql.Result, *stats.QueryStats, *model.ApiError) GetQueryRangeResult(ctx context.Context, query *model.QueryRangeParams) (*promql.Result, *stats.QueryStats, *model.ApiError) - GetServiceOverview(ctx context.Context, query *model.GetServiceOverviewParams, skipConfig *model.SkipConfig) (*[]model.ServiceOverviewItem, *model.ApiError) GetTopLevelOperations(ctx context.Context, skipConfig *model.SkipConfig, start, end time.Time, services []string) (*map[string][]string, *model.ApiError) GetServices(ctx context.Context, query *model.GetServicesParams, skipConfig *model.SkipConfig) (*[]model.ServiceItem, *model.ApiError) GetTopOperations(ctx context.Context, query *model.GetTopOperationsParams) (*[]model.TopOperationsItem, *model.ApiError) From d43adc24ef4a1d8154b0765ae94d2b5aa2b1a8c3 
Mon Sep 17 00:00:00 2001 From: Nityananda Gohain Date: Wed, 20 Nov 2024 23:35:44 +0530 Subject: [PATCH 2/2] feat: update clickhouse reader to support new trace schema (#6479) * feat: update clickhouse reader to support new trace schema * fix: minor fixes * fix: address comments * fix: add changes to overview function * fix: add changes to overview function * fix: use hardcoded true * fix: address comments --- .../app/clickhouseReader/options.go | 18 + .../app/clickhouseReader/reader.go | 516 +++++++++++++++++- pkg/query-service/constants/constants.go | 14 + pkg/query-service/model/trace.go | 1 + 4 files changed, 525 insertions(+), 24 deletions(-) diff --git a/pkg/query-service/app/clickhouseReader/options.go b/pkg/query-service/app/clickhouseReader/options.go index 695bef8570..25eea0c7ff 100644 --- a/pkg/query-service/app/clickhouseReader/options.go +++ b/pkg/query-service/app/clickhouseReader/options.go @@ -22,6 +22,7 @@ const ( defaultTraceDB string = "signoz_traces" defaultOperationsTable string = "distributed_signoz_operations" defaultIndexTable string = "distributed_signoz_index_v2" + defaultLocalIndexTable string = "signoz_index_v2" defaultErrorTable string = "distributed_signoz_error_index_v2" defaultDurationTable string = "distributed_durationSort" defaultUsageExplorerTable string = "distributed_usage_explorer" @@ -45,6 +46,11 @@ const ( defaultLogsTableV2 string = "distributed_logs_v2" defaultLogsResourceLocalTableV2 string = "logs_v2_resource" defaultLogsResourceTableV2 string = "distributed_logs_v2_resource" + + defaultTraceIndexTableV3 string = "distributed_signoz_index_v3" + defaultTraceLocalTableName string = "signoz_index_v3" + defaultTraceResourceTableV3 string = "distributed_traces_v3_resource" + defaultTraceSummaryTable string = "distributed_trace_summary" ) // NamespaceConfig is Clickhouse's internal configuration data @@ -58,6 +64,7 @@ type namespaceConfig struct { TraceDB string OperationsTable string IndexTable string + LocalIndexTable string 
DurationTable string UsageExplorerTable string SpansTable string @@ -82,6 +89,11 @@ type namespaceConfig struct { LogsTableV2 string LogsResourceLocalTableV2 string LogsResourceTableV2 string + + TraceIndexTableV3 string + TraceLocalTableNameV3 string + TraceResourceTableV3 string + TraceSummaryTable string } // Connecto defines how to connect to the database @@ -150,6 +162,7 @@ func NewOptions( TraceDB: defaultTraceDB, OperationsTable: defaultOperationsTable, IndexTable: defaultIndexTable, + LocalIndexTable: defaultLocalIndexTable, ErrorTable: defaultErrorTable, DurationTable: defaultDurationTable, UsageExplorerTable: defaultUsageExplorerTable, @@ -174,6 +187,11 @@ func NewOptions( LogsLocalTableV2: defaultLogsLocalTableV2, LogsResourceTableV2: defaultLogsResourceTableV2, LogsResourceLocalTableV2: defaultLogsResourceLocalTableV2, + + TraceIndexTableV3: defaultTraceIndexTableV3, + TraceLocalTableNameV3: defaultTraceLocalTableName, + TraceResourceTableV3: defaultTraceResourceTableV3, + TraceSummaryTable: defaultTraceSummaryTable, }, others: make(map[string]*namespaceConfig, len(otherNamespaces)), } diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index cd597eca24..b7fd02383d 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -145,9 +145,16 @@ type ClickHouseReader struct { liveTailRefreshSeconds int cluster string - useLogsNewSchema bool + useLogsNewSchema bool + useTraceNewSchema bool + logsTableName string logsLocalTableName string + + traceTableName string + traceLocalTableName string + traceResourceTableV3 string + traceSummaryTable string } // NewTraceReader returns a TraceReader for the database @@ -160,6 +167,7 @@ func NewReader( dialTimeout time.Duration, cluster string, useLogsNewSchema bool, + // useTraceNewSchema bool, // TODO: uncomment this in integration PR ) *ClickHouseReader { datasource := os.Getenv("ClickHouseUrl") @@ 
-181,6 +189,7 @@ func NewReaderFromClickhouseConnection( featureFlag interfaces.FeatureLookup, cluster string, useLogsNewSchema bool, + // useTraceNewSchema bool, ) *ClickHouseReader { alertManager, err := am.New() if err != nil { @@ -218,6 +227,14 @@ func NewReaderFromClickhouseConnection( logsLocalTableName = options.primary.LogsLocalTableV2 } + traceTableName := options.primary.IndexTable + traceLocalTableName := options.primary.LocalIndexTable + // TODO: uncomment this in integration PR + // if useTraceNewSchema { + // traceTableName = options.primary.TraceIndexTableV3 + // traceLocalTableName = options.primary.TraceLocalTableNameV3 + // } + return &ClickHouseReader{ db: wrap, localDB: localDB, @@ -253,6 +270,12 @@ func NewReaderFromClickhouseConnection( logsResourceLocalTableV2: options.primary.LogsResourceLocalTableV2, logsTableName: logsTableName, logsLocalTableName: logsLocalTableName, + + // useTraceNewSchema: useTraceNewSchema, + traceLocalTableName: traceLocalTableName, + traceTableName: traceTableName, + traceResourceTableV3: options.primary.TraceResourceTableV3, + traceSummaryTable: options.primary.TraceSummaryTable, } } @@ -465,7 +488,11 @@ func (r *ClickHouseReader) GetQueryRangeResult(ctx context.Context, query *model func (r *ClickHouseReader) GetServicesList(ctx context.Context) (*[]string, error) { services := []string{} - query := fmt.Sprintf(`SELECT DISTINCT serviceName FROM %s.%s WHERE toDate(timestamp) > now() - INTERVAL 1 DAY`, r.TraceDB, r.indexTable) + query := fmt.Sprintf(`SELECT DISTINCT serviceName FROM %s.%s WHERE toDate(timestamp) > now() - INTERVAL 1 DAY`, r.TraceDB, r.traceTableName) + + if r.useTraceNewSchema { + query = fmt.Sprintf(`SELECT DISTINCT serviceName FROM %s.%s WHERE ts_bucket_start > (toUnixTimestamp(now() - INTERVAL 1 DAY) - 1800) AND toDate(timestamp) > now() - INTERVAL 1 DAY`, r.TraceDB, r.traceTableName) + } rows, err := r.db.Query(ctx, query) @@ -574,14 +601,14 @@ func (r *ClickHouseReader) GetServices(ctx 
context.Context, queryParams *model.G count(*) as numCalls FROM %s.%s WHERE serviceName = @serviceName AND name In @names AND timestamp>= @start AND timestamp<= @end`, - r.TraceDB, r.indexTable, + r.TraceDB, r.traceTableName, ) errorQuery := fmt.Sprintf( `SELECT count(*) as numErrors FROM %s.%s WHERE serviceName = @serviceName AND name In @names AND timestamp>= @start AND timestamp<= @end AND statusCode=2`, - r.TraceDB, r.indexTable, + r.TraceDB, r.traceTableName, ) args := []interface{}{} @@ -591,6 +618,18 @@ func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.G clickhouse.Named("serviceName", svc), clickhouse.Named("names", ops), ) + + if r.useTraceNewSchema { + resourceBucketFilter := fmt.Sprintf(constants.TraceResourceBucketFilterWithServiceName, r.TraceDB, r.traceResourceTableV3) + query += resourceBucketFilter + errorQuery += resourceBucketFilter + args = append(args, + clickhouse.Named("start_bucket", strconv.FormatInt(queryParams.Start.Unix()-1800, 10)), + clickhouse.Named("end_bucket", strconv.FormatInt(queryParams.End.Unix(), 10)), + clickhouse.Named("labelFilter", "%service.name%"+strings.ToLower(utils.QuoteEscapedStringForContains(svc, true))+"%"), + ) + } + // create TagQuery from TagQueryParams tags := createTagQueryFromTagQueryParams(queryParams.Tags) subQuery, argsSubQuery, errStatus := buildQueryWithTagParams(ctx, tags) @@ -642,6 +681,7 @@ func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.G } return &serviceItems, nil } + func getStatusFilters(query string, statusParams []string, excludeMap map[string]struct{}) string { // status can only be two and if both are selected than they are equivalent to none selected @@ -863,8 +903,19 @@ func (r *ClickHouseReader) GetTopOperations(ctx context.Context, queryParams *mo name FROM %s.%s WHERE serviceName = @serviceName AND timestamp>= @start AND timestamp<= @end`, - r.TraceDB, r.indexTable, + r.TraceDB, r.traceTableName, ) + + if r.useTraceNewSchema { + 
resourceBucketFilter := fmt.Sprintf(constants.TraceResourceBucketFilterWithServiceName, r.TraceDB, r.traceResourceTableV3) + query += resourceBucketFilter + namedArgs = append(namedArgs, + clickhouse.Named("start_bucket", strconv.FormatInt(queryParams.Start.Unix()-1800, 10)), + clickhouse.Named("end_bucket", strconv.FormatInt(queryParams.End.Unix(), 10)), + clickhouse.Named("labelFilter", "%service.name%"+strings.ToLower(utils.QuoteEscapedStringForContains(queryParams.ServiceName, true))+"%"), + ) + } + args := []interface{}{} args = append(args, namedArgs...) // create TagQuery from TagQueryParams @@ -930,10 +981,140 @@ func (r *ClickHouseReader) GetUsage(ctx context.Context, queryParams *model.GetU return &usageItems, nil } +func (r *ClickHouseReader) SearchTracesV2(ctx context.Context, params *model.SearchTracesParams) (*[]model.SearchSpansResult, error) { + searchSpansResult := []model.SearchSpansResult{ + { + Columns: []string{"__time", "SpanId", "TraceId", "ServiceName", "Name", "Kind", "DurationNano", "TagsKeys", "TagsValues", "References", "Events", "HasError", "StatusMessage", "StatusCodeString", "SpanKind"}, + IsSubTree: false, + Events: make([][]interface{}, 0), + }, + } + + var traceSummary model.TraceSummary + summaryQuery := fmt.Sprintf("SELECT * from %s.%s WHERE trace_id=$1", r.TraceDB, r.traceSummaryTable) + err := r.db.QueryRow(ctx, summaryQuery, params.TraceID).Scan(&traceSummary.TraceID, &traceSummary.Start, &traceSummary.End, &traceSummary.NumSpans) + if err != nil { + if err == sql.ErrNoRows { + return &searchSpansResult, nil + } + zap.L().Error("Error in processing sql query", zap.Error(err)) + return nil, fmt.Errorf("error in processing sql query") + } + + if traceSummary.NumSpans > uint64(params.MaxSpansInTrace) { + zap.L().Error("Max spans allowed in a trace limit reached", zap.Int("MaxSpansInTrace", params.MaxSpansInTrace), + zap.Uint64("Count", traceSummary.NumSpans)) + userEmail, err := auth.GetEmailFromJwt(ctx) + if err == nil { + data 
:= map[string]interface{}{ + "traceSize": traceSummary.NumSpans, + "maxSpansInTraceLimit": params.MaxSpansInTrace, + } + telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_MAX_SPANS_ALLOWED_LIMIT_REACHED, data, userEmail, true, false) + } + return nil, fmt.Errorf("max spans allowed in trace limit reached, please contact support for more details") + } + + userEmail, err := auth.GetEmailFromJwt(ctx) + if err == nil { + data := map[string]interface{}{ + "traceSize": traceSummary.NumSpans, + } + telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_TRACE_DETAIL_API, data, userEmail, true, false) + } + + var startTime, endTime, durationNano uint64 + var searchScanResponses []model.SpanItemV2 + + query := fmt.Sprintf("SELECT timestamp, duration_nano, span_id, trace_id, has_error, kind, resource_string_service$$name, name, references, attributes_string, attributes_number, attributes_bool, resources_string, events, status_message, status_code_string, kind_string FROM %s.%s WHERE trace_id=$1 and ts_bucket_start>=$2 and ts_bucket_start<=$3", r.TraceDB, r.traceTableName) + + start := time.Now() + + err = r.db.Select(ctx, &searchScanResponses, query, params.TraceID, strconv.FormatInt(traceSummary.Start.Unix()-1800, 10), strconv.FormatInt(traceSummary.End.Unix(), 10)) + + zap.L().Info(query) + + if err != nil { + zap.L().Error("Error in processing sql query", zap.Error(err)) + return nil, fmt.Errorf("error in processing sql query") + } + end := time.Now() + zap.L().Debug("getTraceSQLQuery took: ", zap.Duration("duration", end.Sub(start))) + + searchSpansResult[0].Events = make([][]interface{}, len(searchScanResponses)) + + searchSpanResponses := []model.SearchSpanResponseItem{} + start = time.Now() + for _, item := range searchScanResponses { + ref := []model.OtelSpanRef{} + err := json.Unmarshal([]byte(item.References), &ref) + if err != nil { + zap.L().Error("Error unmarshalling references", zap.Error(err)) + return nil, err + } + + // merge attributes_number 
and attributes_bool to attributes_string + for k, v := range item.Attributes_bool { + item.Attributes_string[k] = fmt.Sprintf("%v", v) + } + for k, v := range item.Attributes_number { + item.Attributes_string[k] = fmt.Sprintf("%v", v) + } + for k, v := range item.Resources_string { + item.Attributes_string[k] = v + } + + jsonItem := model.SearchSpanResponseItem{ + SpanID: item.SpanID, + TraceID: item.TraceID, + ServiceName: item.ServiceName, + Name: item.Name, + Kind: int32(item.Kind), + DurationNano: int64(item.DurationNano), + HasError: item.HasError, + StatusMessage: item.StatusMessage, + StatusCodeString: item.StatusCodeString, + SpanKind: item.SpanKind, + References: ref, + Events: item.Events, + TagMap: item.Attributes_string, + } + + jsonItem.TimeUnixNano = uint64(item.TimeUnixNano.UnixNano() / 1000000) + + searchSpanResponses = append(searchSpanResponses, jsonItem) + if startTime == 0 || jsonItem.TimeUnixNano < startTime { + startTime = jsonItem.TimeUnixNano + } + if endTime == 0 || jsonItem.TimeUnixNano > endTime { + endTime = jsonItem.TimeUnixNano + } + if durationNano == 0 || uint64(jsonItem.DurationNano) > durationNano { + durationNano = uint64(jsonItem.DurationNano) + } + } + end = time.Now() + zap.L().Debug("getTraceSQLQuery unmarshal took: ", zap.Duration("duration", end.Sub(start))) + + for i, item := range searchSpanResponses { + spanEvents := item.GetValues() + searchSpansResult[0].Events[i] = spanEvents + } + + searchSpansResult[0].StartTimestampMillis = startTime - (durationNano / 1000000) + searchSpansResult[0].EndTimestampMillis = endTime + (durationNano / 1000000) + + return &searchSpansResult, nil +} + func (r *ClickHouseReader) SearchTraces(ctx context.Context, params *model.SearchTracesParams, smartTraceAlgorithm func(payload []model.SearchSpanResponseItem, targetSpanId string, levelUp int, levelDown int, spanLimit int) ([]model.SearchSpansResult, error)) (*[]model.SearchSpansResult, error) { + if r.useTraceNewSchema { + return 
r.SearchTracesV2(ctx, params) + } + var countSpans uint64 countQuery := fmt.Sprintf("SELECT count() as count from %s.%s WHERE traceID=$1", r.TraceDB, r.SpansTable) err := r.db.QueryRow(ctx, countQuery, params.TraceID).Scan(&countSpans) @@ -2011,7 +2192,7 @@ func (r *ClickHouseReader) GetTotalSpans(ctx context.Context) (uint64, error) { var totalSpans uint64 - queryStr := fmt.Sprintf("SELECT count() from %s.%s;", signozTraceDBName, signozTraceTableName) + queryStr := fmt.Sprintf("SELECT count() from %s.%s;", signozTraceDBName, r.traceTableName) r.db.QueryRow(ctx, queryStr).Scan(&totalSpans) return totalSpans, nil @@ -2022,7 +2203,9 @@ func (r *ClickHouseReader) GetSpansInLastHeartBeatInterval(ctx context.Context, var spansInLastHeartBeatInterval uint64 queryStr := fmt.Sprintf("SELECT count() from %s.%s where timestamp > toUnixTimestamp(now()-toIntervalMinute(%d));", signozTraceDBName, signozSpansTable, int(interval.Minutes())) - + if r.useTraceNewSchema { + queryStr = fmt.Sprintf("SELECT count() from %s.%s where ts_bucket_start >= toUInt64(toUnixTimestamp(now() - toIntervalMinute(%d))) - 1800 and timestamp > toUnixTimestamp(now()-toIntervalMinute(%d));", signozTraceDBName, r.traceTableName, int(interval.Minutes()), int(interval.Minutes())) + } r.db.QueryRow(ctx, queryStr).Scan(&spansInLastHeartBeatInterval) return spansInLastHeartBeatInterval, nil @@ -2141,11 +2324,17 @@ func (r *ClickHouseReader) GetLogsInfoInLastHeartBeatInterval(ctx context.Contex } func (r *ClickHouseReader) GetTagsInfoInLastHeartBeatInterval(ctx context.Context, interval time.Duration) (*model.TagsInfo, error) { - queryStr := fmt.Sprintf(`select serviceName, stringTagMap['deployment.environment'] as env, stringTagMap['telemetry.sdk.language'] as language from %s.%s where timestamp > toUnixTimestamp(now()-toIntervalMinute(%d)) - group by serviceName, env, language;`, r.TraceDB, r.indexTable, int(interval.Minutes())) + group by serviceName, env, language;`, r.TraceDB, r.traceTableName, 
int(interval.Minutes())) + + if r.useTraceNewSchema { + queryStr = fmt.Sprintf(`select serviceName, resources_string['deployment.environment'] as env, + resources_string['telemetry.sdk.language'] as language from %s.%s + where timestamp > toUnixTimestamp(now()-toIntervalMinute(%d)) + group by serviceName, env, language;`, r.TraceDB, r.traceTableName, int(interval.Minutes())) + } tagTelemetryDataList := []model.TagTelemetryData{} err := r.db.Select(ctx, &tagTelemetryDataList, queryStr) @@ -3603,7 +3792,102 @@ func (r *ClickHouseReader) CheckClickHouse(ctx context.Context) error { return nil } +func (r *ClickHouseReader) GetTraceAggregateAttributesV2(ctx context.Context, req *v3.AggregateAttributeRequest) (*v3.AggregateAttributeResponse, error) { + var query string + var err error + var rows driver.Rows + var response v3.AggregateAttributeResponse + var stringAllowed bool + + where := "" + switch req.Operator { + case + v3.AggregateOperatorCountDistinct, + v3.AggregateOperatorCount: + where = "tagKey ILIKE $1" + stringAllowed = true + case + v3.AggregateOperatorRateSum, + v3.AggregateOperatorRateMax, + v3.AggregateOperatorRateAvg, + v3.AggregateOperatorRate, + v3.AggregateOperatorRateMin, + v3.AggregateOperatorP05, + v3.AggregateOperatorP10, + v3.AggregateOperatorP20, + v3.AggregateOperatorP25, + v3.AggregateOperatorP50, + v3.AggregateOperatorP75, + v3.AggregateOperatorP90, + v3.AggregateOperatorP95, + v3.AggregateOperatorP99, + v3.AggregateOperatorAvg, + v3.AggregateOperatorSum, + v3.AggregateOperatorMin, + v3.AggregateOperatorMax: + where = "tagKey ILIKE $1 AND dataType='float64'" + stringAllowed = false + case + v3.AggregateOperatorNoOp: + return &v3.AggregateAttributeResponse{}, nil + default: + return nil, fmt.Errorf("unsupported aggregate operator") + } + query = fmt.Sprintf("SELECT DISTINCT(tagKey), tagType, dataType FROM %s.%s WHERE %s", r.TraceDB, r.spanAttributeTable, where) + if req.Limit != 0 { + query = query + fmt.Sprintf(" LIMIT %d;", req.Limit) + } + 
rows, err = r.db.Query(ctx, query, fmt.Sprintf("%%%s%%", req.SearchText)) + + if err != nil { + zap.L().Error("Error while executing query", zap.Error(err)) + return nil, fmt.Errorf("error while executing query: %s", err.Error()) + } + defer rows.Close() + + statements := []model.ShowCreateTableStatement{} + query = fmt.Sprintf("SHOW CREATE TABLE %s.%s", r.TraceDB, r.traceLocalTableName) + err = r.db.Select(ctx, &statements, query) + if err != nil { + return nil, fmt.Errorf("error while fetching trace schema: %s", err.Error()) + } + + var tagKey string + var dataType string + var tagType string + for rows.Next() { + if err := rows.Scan(&tagKey, &tagType, &dataType); err != nil { + return nil, fmt.Errorf("error while scanning rows: %s", err.Error()) + } + key := v3.AttributeKey{ + Key: tagKey, + DataType: v3.AttributeKeyDataType(dataType), + Type: v3.AttributeKeyType(tagType), + IsColumn: isColumn(true, statements[0].Statement, tagType, tagKey, dataType), + } + + if _, ok := constants.DeprecatedStaticFieldsTraces[tagKey]; !ok { + response.AttributeKeys = append(response.AttributeKeys, key) + } + } + + // add the new static fields + for _, field := range constants.NewStaticFieldsTraces { + if (!stringAllowed && field.DataType == v3.AttributeKeyDataTypeString) || (v3.AttributeKey{} == field) { + continue + } else if len(req.SearchText) == 0 || strings.Contains(field.Key, req.SearchText) { + response.AttributeKeys = append(response.AttributeKeys, field) + } + } + + return &response, nil +} + func (r *ClickHouseReader) GetTraceAggregateAttributes(ctx context.Context, req *v3.AggregateAttributeRequest) (*v3.AggregateAttributeResponse, error) { + if r.useTraceNewSchema { + return r.GetTraceAggregateAttributesV2(ctx, req) + } + var query string var err error var rows driver.Rows @@ -3660,8 +3944,6 @@ func (r *ClickHouseReader) GetTraceAggregateAttributes(ctx context.Context, req if err := rows.Scan(&tagKey, &tagType, &dataType, &isColumn); err != nil { return nil, 
fmt.Errorf("error while scanning rows: %s", err.Error()) } - // TODO: Remove this once the column name are updated in the table - tagKey = tempHandleFixedColumns(tagKey) key := v3.AttributeKey{ Key: tagKey, DataType: v3.AttributeKeyDataType(dataType), @@ -3673,7 +3955,69 @@ func (r *ClickHouseReader) GetTraceAggregateAttributes(ctx context.Context, req return &response, nil } +func (r *ClickHouseReader) GetTraceAttributeKeysV2(ctx context.Context, req *v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error) { + + var query string + var err error + var rows driver.Rows + var response v3.FilterAttributeKeyResponse + + query = fmt.Sprintf("SELECT DISTINCT(tagKey), tagType, dataType FROM %s.%s WHERE tagKey ILIKE $1 LIMIT $2", r.TraceDB, r.spanAttributeTable) + + rows, err = r.db.Query(ctx, query, fmt.Sprintf("%%%s%%", req.SearchText), req.Limit) + + if err != nil { + zap.L().Error("Error while executing query", zap.Error(err)) + return nil, fmt.Errorf("error while executing query: %s", err.Error()) + } + defer rows.Close() + + statements := []model.ShowCreateTableStatement{} + query = fmt.Sprintf("SHOW CREATE TABLE %s.%s", r.TraceDB, r.traceLocalTableName) + err = r.db.Select(ctx, &statements, query) + if err != nil { + return nil, fmt.Errorf("error while fetching trace schema: %s", err.Error()) + } + + var tagKey string + var dataType string + var tagType string + for rows.Next() { + if err := rows.Scan(&tagKey, &tagType, &dataType); err != nil { + return nil, fmt.Errorf("error while scanning rows: %s", err.Error()) + } + key := v3.AttributeKey{ + Key: tagKey, + DataType: v3.AttributeKeyDataType(dataType), + Type: v3.AttributeKeyType(tagType), + IsColumn: isColumn(true, statements[0].Statement, tagType, tagKey, dataType), + } + + // don't send deprecated static fields + // this is added so that once the old tenants are moved to new schema, + the old attributes are not sent to the frontend autocomplete + if _, ok := 
constants.DeprecatedStaticFieldsTraces[tagKey]; !ok { + response.AttributeKeys = append(response.AttributeKeys, key) + } + } + + // add the new static fields + for _, f := range constants.NewStaticFieldsTraces { + if (v3.AttributeKey{} == f) { + continue + } + if len(req.SearchText) == 0 || strings.Contains(f.Key, req.SearchText) { + response.AttributeKeys = append(response.AttributeKeys, f) + } + } + + return &response, nil +} + func (r *ClickHouseReader) GetTraceAttributeKeys(ctx context.Context, req *v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error) { + if r.useTraceNewSchema { + return r.GetTraceAttributeKeysV2(ctx, req) + } var query string var err error @@ -3701,8 +4045,6 @@ func (r *ClickHouseReader) GetTraceAttributeKeys(ctx context.Context, req *v3.Fi if err := rows.Scan(&tagKey, &tagType, &dataType, &isColumn); err != nil { return nil, fmt.Errorf("error while scanning rows: %s", err.Error()) } - // TODO: Remove this once the column name are updated in the table - tagKey = tempHandleFixedColumns(tagKey) key := v3.AttributeKey{ Key: tagKey, DataType: v3.AttributeKeyDataType(dataType), @@ -3714,21 +4056,96 @@ func (r *ClickHouseReader) GetTraceAttributeKeys(ctx context.Context, req *v3.Fi return &response, nil } -// tempHandleFixedColumns is a temporary function to handle the fixed columns whose name has been changed in AttributeKeys Table -func tempHandleFixedColumns(tagKey string) string { - switch { - case tagKey == "traceId": - tagKey = "traceID" - case tagKey == "spanId": - tagKey = "spanID" - case tagKey == "parentSpanId": - tagKey = "parentSpanID" - } - return tagKey +func (r *ClickHouseReader) GetTraceAttributeValuesV2(ctx context.Context, req *v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error) { + var query string + var filterValueColumn string + var err error + var rows driver.Rows + var attributeValues v3.FilterAttributeValueResponse + + // if dataType or tagType is not present return empty response + if 
len(req.FilterAttributeKeyDataType) == 0 || len(req.TagType) == 0 { + // add data type if it's a top level key + if k, ok := constants.StaticFieldsTraces[req.FilterAttributeKey]; ok { + req.FilterAttributeKeyDataType = k.DataType + } else { + return &v3.FilterAttributeValueResponse{}, nil + } + } + + // if data type is bool, return true and false + if req.FilterAttributeKeyDataType == v3.AttributeKeyDataTypeBool { + return &v3.FilterAttributeValueResponse{ + BoolAttributeValues: []bool{true, false}, + }, nil + } + + query = "select distinct" + switch req.FilterAttributeKeyDataType { + case v3.AttributeKeyDataTypeFloat64: + filterValueColumn = "float64TagValue" + case v3.AttributeKeyDataTypeString: + filterValueColumn = "stringTagValue" + } + + searchText := fmt.Sprintf("%%%s%%", req.SearchText) + + // check if the tagKey is a topLevelColumn + // here we are using StaticFieldsTraces instead of NewStaticFieldsTraces as we want to consider old columns as well. + if _, ok := constants.StaticFieldsTraces[req.FilterAttributeKey]; ok { + // query the column for the last 48 hours + filterValueColumnWhere := req.FilterAttributeKey + selectKey := req.FilterAttributeKey + if req.FilterAttributeKeyDataType != v3.AttributeKeyDataTypeString { + filterValueColumnWhere = fmt.Sprintf("toString(%s)", req.FilterAttributeKey) + selectKey = fmt.Sprintf("toInt64(%s)", req.FilterAttributeKey) + } + + // TODO(nitya): remove 48 hour limit in future after checking the perf/resource implications + query = fmt.Sprintf("select distinct %s from %s.%s where ts_bucket_start >= toUInt64(toUnixTimestamp(now() - INTERVAL 48 HOUR)) AND timestamp >= toDateTime64(now() - INTERVAL 48 HOUR, 9) and %s ILIKE $1 limit $2", selectKey, r.TraceDB, r.traceTableName, filterValueColumnWhere) + rows, err = r.db.Query(ctx, query, searchText, req.Limit) + } else { + filterValueColumnWhere := filterValueColumn + if req.FilterAttributeKeyDataType != v3.AttributeKeyDataTypeString { + filterValueColumnWhere = 
fmt.Sprintf("toString(%s)", filterValueColumn) + } + query = fmt.Sprintf("select distinct %s from %s.%s where tagKey=$1 and %s ILIKE $2 and tagType=$3 limit $4", filterValueColumn, r.TraceDB, r.spanAttributeTable, filterValueColumnWhere) + rows, err = r.db.Query(ctx, query, req.FilterAttributeKey, searchText, req.TagType, req.Limit) + } + + if err != nil { + zap.L().Error("Error while executing query", zap.Error(err)) + return nil, fmt.Errorf("error while executing query: %s", err.Error()) + } + defer rows.Close() + + var strAttributeValue string + var float64AttributeValue sql.NullFloat64 + for rows.Next() { + switch req.FilterAttributeKeyDataType { + case v3.AttributeKeyDataTypeFloat64: + if err := rows.Scan(&float64AttributeValue); err != nil { + return nil, fmt.Errorf("error while scanning rows: %s", err.Error()) + } + if float64AttributeValue.Valid { + attributeValues.NumberAttributeValues = append(attributeValues.NumberAttributeValues, float64AttributeValue.Float64) + } + case v3.AttributeKeyDataTypeString: + if err := rows.Scan(&strAttributeValue); err != nil { + return nil, fmt.Errorf("error while scanning rows: %s", err.Error()) + } + attributeValues.StringAttributeValues = append(attributeValues.StringAttributeValues, strAttributeValue) + } + } + + return &attributeValues, nil } func (r *ClickHouseReader) GetTraceAttributeValues(ctx context.Context, req *v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error) { + if r.useTraceNewSchema { + return r.GetTraceAttributeValuesV2(ctx, req) + } var query string var err error var rows driver.Rows @@ -3781,7 +4198,58 @@ func (r *ClickHouseReader) GetTraceAttributeValues(ctx context.Context, req *v3. 
return &attributeValues, nil } +func (r *ClickHouseReader) GetSpanAttributeKeysV2(ctx context.Context) (map[string]v3.AttributeKey, error) { + var query string + var err error + var rows driver.Rows + response := map[string]v3.AttributeKey{} + + query = fmt.Sprintf("SELECT DISTINCT(tagKey), tagType, dataType FROM %s.%s", r.TraceDB, r.spanAttributesKeysTable) + + rows, err = r.db.Query(ctx, query) + if err != nil { + zap.L().Error("Error while executing query", zap.Error(err)) + return nil, fmt.Errorf("error while executing query: %s", err.Error()) + } + defer rows.Close() + + statements := []model.ShowCreateTableStatement{} + query = fmt.Sprintf("SHOW CREATE TABLE %s.%s", r.TraceDB, r.traceTableName) + err = r.db.Select(ctx, &statements, query) + if err != nil { + return nil, fmt.Errorf("error while fetching trace schema: %s", err.Error()) + } + + var tagKey string + var dataType string + var tagType string + for rows.Next() { + if err := rows.Scan(&tagKey, &tagType, &dataType); err != nil { + return nil, fmt.Errorf("error while scanning rows: %s", err.Error()) + } + key := v3.AttributeKey{ + Key: tagKey, + DataType: v3.AttributeKeyDataType(dataType), + Type: v3.AttributeKeyType(tagType), + IsColumn: isColumn(true, statements[0].Statement, tagType, tagKey, dataType), + } + + name := tagKey + "##" + tagType + "##" + strings.ToLower(dataType) + response[name] = key + } + + for _, key := range constants.StaticFieldsTraces { + name := key.Key + "##" + key.Type.String() + "##" + strings.ToLower(key.DataType.String()) + response[name] = key + } + + return response, nil +} + func (r *ClickHouseReader) GetSpanAttributeKeys(ctx context.Context) (map[string]v3.AttributeKey, error) { + if r.useTraceNewSchema { + return r.GetSpanAttributeKeysV2(ctx) + } var query string var err error var rows driver.Rows diff --git a/pkg/query-service/constants/constants.go b/pkg/query-service/constants/constants.go index 7072f13173..b66f60dfa9 100644 --- 
a/pkg/query-service/constants/constants.go +++ b/pkg/query-service/constants/constants.go @@ -718,3 +718,17 @@ func init() { } const TRACE_V4_MAX_PAGINATION_LIMIT = 10000 + +const TraceResourceBucketFilterWithServiceName = ` + AND ( + resource_fingerprint GLOBAL IN + ( + SELECT fingerprint FROM %s.%s + WHERE + seen_at_ts_bucket_start >= @start_bucket AND seen_at_ts_bucket_start <= @end_bucket AND + simpleJSONExtractString(labels, 'service.name') = @serviceName AND + labels like @labelFilter + ) + ) + AND ts_bucket_start >= @start_bucket AND ts_bucket_start <= @end_bucket + ` diff --git a/pkg/query-service/model/trace.go b/pkg/query-service/model/trace.go index d334e1cfe7..e8d3d70ac2 100644 --- a/pkg/query-service/model/trace.go +++ b/pkg/query-service/model/trace.go @@ -15,6 +15,7 @@ type SpanItemV2 struct { Attributes_string map[string]string `ch:"attributes_string"` Attributes_number map[string]float64 `ch:"attributes_number"` Attributes_bool map[string]bool `ch:"attributes_bool"` + Resources_string map[string]string `ch:"resources_string"` Events []string `ch:"events"` StatusMessage string `ch:"status_message"` StatusCodeString string `ch:"status_code_string"`