Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[query] Fix bugs in aggregateWithWildcards and update it to support more aggr… #3469

Merged
merged 7 commits into from
May 6, 2021
Merged
88 changes: 59 additions & 29 deletions src/query/graphite/native/aggregation_functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,20 +338,44 @@ func sumSeriesWithWildcards(
// aggregateWithWildcards splits the given set of series into sub-groupings
// based on wildcard matches in the hierarchy, then aggregate the values in
// each grouping based on the given function.
// Similar to combineSeriesWithWildcards function but more general, as it
// supports any aggregate functions while combineSeriesWithWildcards only
// support aggregation with existing ts.ConsolidationFunc.
func aggregateWithWildcards(
ctx *common.Context,
series singlePathSpec,
fname string,
positions ...int,
) (ts.SeriesList, error) {
f, fexists := summarizeFuncs[fname]
if !fexists {
err := xerrors.NewInvalidParamsError(fmt.Errorf(
"invalid func %s", fname))
return ts.NewSeriesList(), err
if len(series.Values) == 0 {
return ts.SeriesList(series), nil
}

toAggregate := splitSeriesIntoSubgroups(series, positions)

newSeries := make([]*ts.Series, 0, len(toAggregate))
for name, toAggregateSeries := range toAggregate {
seriesList := ts.SeriesList{
Values: toAggregateSeries,
Metadata: series.Metadata,
}
aggregated, err := aggregate(ctx, singlePathSpec(seriesList), fname)
if err != nil {
return ts.NewSeriesList(), err
}
renamedSeries := aggregated.Values[0].RenamedTo(name)
newSeries = append(newSeries, renamedSeries)
}

return combineSeriesWithWildcards(ctx, series, positions, f.specificationFunc, f.consolidationFunc)
r := ts.SeriesList(series)

r.Values = newSeries

// Ranging over hash map to create results destroys
// any sort order on the incoming series list
r.SortApplied = false

return r, nil
}

// combineSeriesWithWildcards splits the given set of series into sub-groupings
Expand All @@ -368,6 +392,34 @@ func combineSeriesWithWildcards(
return ts.SeriesList(series), nil
}

toCombine := splitSeriesIntoSubgroups(series, positions)

newSeries := make([]*ts.Series, 0, len(toCombine))
for name, toCombineSeries := range toCombine {
seriesList := ts.SeriesList{
Values: toCombineSeries,
Metadata: series.Metadata,
}
combined, err := combineSeries(ctx, multiplePathSpecs(seriesList), name, f)
if err != nil {
return ts.NewSeriesList(), err
}
combined.Values[0].Specification = sf(seriesList)
newSeries = append(newSeries, combined.Values...)
}

r := ts.SeriesList(series)

r.Values = newSeries

// Ranging over hash map to create results destroys
// any sort order on the incoming series list
r.SortApplied = false

return r, nil
}

func splitSeriesIntoSubgroups(series singlePathSpec, positions []int) map[string][]*ts.Series {
var (
toCombine = make(map[string][]*ts.Series)
wildcards = make(map[int]struct{})
Expand All @@ -392,29 +444,7 @@ func combineSeriesWithWildcards(
toCombine[newName] = append(toCombine[newName], series)
}

newSeries := make([]*ts.Series, 0, len(toCombine))
for name, combinedSeries := range toCombine {
seriesList := ts.SeriesList{
Values: combinedSeries,
Metadata: series.Metadata,
}
combined, err := combineSeries(ctx, multiplePathSpecs(seriesList), name, f)
if err != nil {
return ts.NewSeriesList(), err
}
combined.Values[0].Specification = sf(seriesList)
newSeries = append(newSeries, combined.Values...)
}

r := ts.SeriesList(series)

r.Values = newSeries

// Ranging over hash map to create results destroys
// any sort order on the incoming series list
r.SortApplied = false

return r, nil
return toCombine
}

// splits a slice into chunks
Expand Down
54 changes: 39 additions & 15 deletions src/query/graphite/native/aggregation_functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -651,26 +651,50 @@ func TestAggregateWithWildcards(t *testing.T) {
)
defer ctx.Close()

outSeries, err := aggregateWithWildcards(ctx, singlePathSpec{
Values: inputs,
}, "sum", 1, 2)
require.NoError(t, err)
require.Equal(t, 2, len(outSeries.Values))

outSeries, _ = sortByName(ctx, singlePathSpec(outSeries), false, false)

expectedOutputs := []struct {
type result struct {
name string
sumOfVals float64
}

tests := []struct {
fname string
nodes []int
expectedResults []result
}{
{"servers.status.400", 90 * 12},
{"servers.status.500", 30 * 12},
{"avg", []int{1, 2}, []result{
{"servers.status.400", ((20 + 30 + 40) / 3) * 12},
{"servers.status.500", ((2 + 4 + 6 + 8 + 10) / 5) * 12},
}},
{"max", []int{2, 4}, []result{
{"servers.status.400", 40 * 12},
{"servers.status.500", 10 * 12},
}},
{"min", []int{2, -1}, []result{
{"servers.status.400", 20 * 12},
{"servers.status.500", 2 * 12},
}},
{"median", []int{1, 2}, []result{
{"servers.status.400", 30 * 12},
{"servers.status.500", 6 * 12},
}},
}

for i, expected := range expectedOutputs {
series := outSeries.Values[i]
assert.Equal(t, expected.name, series.Name())
assert.Equal(t, expected.sumOfVals, series.SafeSum())
for _, test := range tests {
outSeries, err := aggregateWithWildcards(ctx, singlePathSpec{
Values: inputs,
}, test.fname, 1, 2)
require.NoError(t, err)
require.Equal(t, 2, len(outSeries.Values))

outSeries, _ = sortByName(ctx, singlePathSpec(outSeries), false, false)

for i, expected := range test.expectedResults {
series := outSeries.Values[i]
assert.Equal(t, expected.name, series.Name(), "wrong name for %v %s (%d)", test.nodes, test.fname, i)

assert.Equal(t, expected.sumOfVals, series.SafeSum(),
"wrong result for %v %s (%d)", test.nodes, test.fname, i)
}
}
}

Expand Down