Skip to content

Commit

Permalink
pkg: Filter groupByLabels earlier
Browse files Browse the repository at this point in the history
Once we're building the FrostDB query we don't care about the rendering flame graphs anymore and therefore we only pass the group by labels into the selectMerge function. Excluding the possible group by features of flame graphs
  • Loading branch information
metalmatze committed Oct 14, 2024
1 parent df9be69 commit f96fbb6
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 22 deletions.
26 changes: 7 additions & 19 deletions pkg/parcacol/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -1347,13 +1347,13 @@ func (q *Querier) QueryMerge(
ctx context.Context,
query string,
start, end time.Time,
groupByColumns []string,
groupByLabels []string,
invertCallStacks bool,
) (profile.Profile, error) {
ctx, span := q.tracer.Start(ctx, "Querier/QueryMerge")
defer span.End()

records, valueColumn, queryParts, err := q.selectMerge(ctx, query, start, end, groupByColumns)
records, valueColumn, queryParts, err := q.selectMerge(ctx, query, start, end, groupByLabels)
if err != nil {
return profile.Profile{}, err
}
Expand Down Expand Up @@ -1385,7 +1385,7 @@ func (q *Querier) selectMerge(
query string,
startTime,
endTime time.Time,
groupByColumns []string,
groupByLabels []string,
) ([]arrow.Record, string, QueryParts, error) {
ctx, span := q.tracer.Start(ctx, "Querier/selectMerge")
defer span.End()
Expand All @@ -1408,18 +1408,13 @@ func (q *Querier) selectMerge(
)

totalSum := logicalplan.Sum(logicalplan.Col(profile.ColumnValue))
durationSum := logicalplan.Sum(logicalplan.Col(profile.ColumnDuration))

columnsGroupBy := []logicalplan.Expr{
logicalplan.Col(profile.ColumnStacktrace),
}

for _, col := range groupByColumns {
// We only want to group by `labels.` prefixed columns.
// There can be some columns like `function_name` that are virtual and don't actually exist.
if strings.HasPrefix(col, profile.ColumnLabelsPrefix) {
columnsGroupBy = append(columnsGroupBy, logicalplan.Col(col))
}
for _, col := range groupByLabels {
columnsGroupBy = append(columnsGroupBy, logicalplan.Col(col))
}

var valueCol logicalplan.Expr = logicalplan.Col(profile.ColumnValue)
Expand All @@ -1445,19 +1440,12 @@ func (q *Querier) selectMerge(
}

if queryParts.Delta {
// Only for cpu and nanoseconds do we first project the ColumnDuration.
// We then use the aggregation function to sum(duration) for each stacktraces.
// The final project then takes the sum(value) / sum(duration) to get to the per second value.
firstProject = append(firstProject,
logicalplan.Col(profile.ColumnDuration),
)
finalProject = append(finalProject,
logicalplan.Div(
logicalplan.Convert(totalSum, arrow.PrimitiveTypes.Float64),
logicalplan.Convert(durationSum, arrow.PrimitiveTypes.Float64),
logicalplan.Literal(float64(endTime.Sub(startTime).Nanoseconds())),
).Alias(ValuePerSecond),
)
columnsAggregations = append(columnsAggregations, durationSum)
}

records := []arrow.Record{}
Expand All @@ -1478,8 +1466,8 @@ func (q *Querier) selectMerge(
return nil, "", queryParts, err
}

queryParts.Meta.Timestamp = start
queryParts.Meta.SampleType = resultType
queryParts.Meta.Timestamp = start

return records, "sum(value)", queryParts, nil
}
Expand Down
8 changes: 5 additions & 3 deletions pkg/query/columnquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,9 +241,11 @@ func (q *ColumnQueryAPI) Query(ctx context.Context, req *pb.QueryRequest) (*pb.Q
FlamegraphFieldMappingFile: {},
FlamegraphFieldFunctionFileName: {},
}
groupByLabels := make([]string, 0, len(groupBy))
for _, f := range groupBy {
if strings.HasPrefix(f, FlamegraphFieldLabels+".") {
// We allow to group by columns prefixed with labels.
// Add label to the groupByLabels passed to FrostDB
groupByLabels = append(groupByLabels, f)
continue
}
if _, allowed := allowedGroupBy[f]; !allowed {
Expand All @@ -270,7 +272,7 @@ func (q *ColumnQueryAPI) Query(ctx context.Context, req *pb.QueryRequest) (*pb.Q
p, err = q.selectMerge(
ctx,
req.GetMerge(),
groupBy,
groupByLabels,
isInvert,
)
}
Expand Down Expand Up @@ -354,7 +356,7 @@ func (q *ColumnQueryAPI) Query(ctx context.Context, req *pb.QueryRequest) (*pb.Q
req.GetReportType(),
req.GetNodeTrimThreshold(),
filtered,
groupBy,
groupByLabels,
req.GetSourceReference(),
source,
isDiff,
Expand Down

0 comments on commit f96fbb6

Please sign in to comment.