diff --git a/src/query/graphite/native/aggregation_functions.go b/src/query/graphite/native/aggregation_functions.go index b51acf90a2..c2561034b4 100644 --- a/src/query/graphite/native/aggregation_functions.go +++ b/src/query/graphite/native/aggregation_functions.go @@ -338,20 +338,44 @@ func sumSeriesWithWildcards( // aggregateWithWildcards splits the given set of series into sub-groupings // based on wildcard matches in the hierarchy, then aggregate the values in // each grouping based on the given function. +// Similar to combineSeriesWithWildcards function but more general, as it +// supports any aggregate functions while combineSeriesWithWildcards only +// support aggregation with existing ts.ConsolidationFunc. func aggregateWithWildcards( ctx *common.Context, series singlePathSpec, fname string, positions ...int, ) (ts.SeriesList, error) { - f, fexists := summarizeFuncs[fname] - if !fexists { - err := xerrors.NewInvalidParamsError(fmt.Errorf( - "invalid func %s", fname)) - return ts.NewSeriesList(), err + if len(series.Values) == 0 { + return ts.SeriesList(series), nil + } + + toAggregate := splitSeriesIntoSubgroups(series, positions) + + newSeries := make([]*ts.Series, 0, len(toAggregate)) + for name, toAggregateSeries := range toAggregate { + seriesList := ts.SeriesList{ + Values: toAggregateSeries, + Metadata: series.Metadata, + } + aggregated, err := aggregate(ctx, singlePathSpec(seriesList), fname) + if err != nil { + return ts.NewSeriesList(), err + } + renamedSeries := aggregated.Values[0].RenamedTo(name) + newSeries = append(newSeries, renamedSeries) } - return combineSeriesWithWildcards(ctx, series, positions, f.specificationFunc, f.consolidationFunc) + r := ts.SeriesList(series) + + r.Values = newSeries + + // Ranging over hash map to create results destroys + // any sort order on the incoming series list + r.SortApplied = false + + return r, nil } // combineSeriesWithWildcards splits the given set of series into sub-groupings @@ -368,6 +392,34 @@ func combineSeriesWithWildcards( return ts.SeriesList(series), nil } + toCombine := splitSeriesIntoSubgroups(series, positions) + + newSeries := make([]*ts.Series, 0, len(toCombine)) + for name, toCombineSeries := range toCombine { + seriesList := ts.SeriesList{ + Values: toCombineSeries, + Metadata: series.Metadata, + } + combined, err := combineSeries(ctx, multiplePathSpecs(seriesList), name, f) + if err != nil { + return ts.NewSeriesList(), err + } + combined.Values[0].Specification = sf(seriesList) + newSeries = append(newSeries, combined.Values...) + } + + r := ts.SeriesList(series) + + r.Values = newSeries + + // Ranging over hash map to create results destroys + // any sort order on the incoming series list + r.SortApplied = false + + return r, nil +} + +func splitSeriesIntoSubgroups(series singlePathSpec, positions []int) map[string][]*ts.Series { var ( toCombine = make(map[string][]*ts.Series) wildcards = make(map[int]struct{}) @@ -392,29 +444,7 @@ func combineSeriesWithWildcards( toCombine[newName] = append(toCombine[newName], series) } - newSeries := make([]*ts.Series, 0, len(toCombine)) - for name, combinedSeries := range toCombine { - seriesList := ts.SeriesList{ - Values: combinedSeries, - Metadata: series.Metadata, - } - combined, err := combineSeries(ctx, multiplePathSpecs(seriesList), name, f) - if err != nil { - return ts.NewSeriesList(), err - } - combined.Values[0].Specification = sf(seriesList) - newSeries = append(newSeries, combined.Values...) - } - - r := ts.SeriesList(series) - - r.Values = newSeries - - // Ranging over hash map to create results destroys - // any sort order on the incoming series list - r.SortApplied = false - - return r, nil + return toCombine } // splits a slice into chunks diff --git a/src/query/graphite/native/aggregation_functions_test.go b/src/query/graphite/native/aggregation_functions_test.go index 8e9af03045..4b0b9ed6a0 100644 --- a/src/query/graphite/native/aggregation_functions_test.go +++ b/src/query/graphite/native/aggregation_functions_test.go @@ -651,26 +651,50 @@ func TestAggregateWithWildcards(t *testing.T) { ) defer ctx.Close() - outSeries, err := aggregateWithWildcards(ctx, singlePathSpec{ - Values: inputs, - }, "sum", 1, 2) - require.NoError(t, err) - require.Equal(t, 2, len(outSeries.Values)) - - outSeries, _ = sortByName(ctx, singlePathSpec(outSeries), false, false) - - expectedOutputs := []struct { + type result struct { name string sumOfVals float64 + } + + tests := []struct { + fname string + nodes []int + expectedResults []result }{ - {"servers.status.400", 90 * 12}, - {"servers.status.500", 30 * 12}, + {"avg", []int{1, 2}, []result{ + {"servers.status.400", ((20 + 30 + 40) / 3) * 12}, + {"servers.status.500", ((2 + 4 + 6 + 8 + 10) / 5) * 12}, + }}, + {"max", []int{2, 4}, []result{ + {"servers.status.400", 40 * 12}, + {"servers.status.500", 10 * 12}, + }}, + {"min", []int{2, -1}, []result{ + {"servers.status.400", 20 * 12}, + {"servers.status.500", 2 * 12}, + }}, + {"median", []int{1, 2}, []result{ + {"servers.status.400", 30 * 12}, + {"servers.status.500", 6 * 12}, + }}, } - for i, expected := range expectedOutputs { - series := outSeries.Values[i] - assert.Equal(t, expected.name, series.Name()) - assert.Equal(t, expected.sumOfVals, series.SafeSum()) + for _, test := range tests { + outSeries, err := aggregateWithWildcards(ctx, singlePathSpec{ + Values: inputs, + }, test.fname, 1, 2) + require.NoError(t, err) + require.Equal(t, 2, len(outSeries.Values)) + + outSeries, _ = sortByName(ctx, singlePathSpec(outSeries), false, false) + + for i, expected := range test.expectedResults { + series := outSeries.Values[i] + assert.Equal(t, expected.name, series.Name(), "wrong name for %v %s (%d)", test.nodes, test.fname, i) + + assert.Equal(t, expected.sumOfVals, series.SafeSum(), + "wrong result for %v %s (%d)", test.nodes, test.fname, i) + } } }