Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[m3query] Add graphite function - aggregate #2584

Merged
merged 18 commits into from
Oct 7, 2020
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 111 additions & 13 deletions src/query/graphite/native/aggregation_functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ import (
"math"
"strings"

"github.com/m3db/m3/src/query/graphite/common"
"github.com/m3db/m3/src/query/block"
"github.com/m3db/m3/src/query/graphite/common"
"github.com/m3db/m3/src/query/graphite/errors"
"github.com/m3db/m3/src/query/graphite/ts"
)
Expand All @@ -38,7 +38,7 @@ func wrapPathExpr(wrapper string, series ts.SeriesList) string {
// sumSeries adds metrics together and returns the sum at each datapoint.
// If the time series have different intervals, the coarsest interval will be used.
// The name of the result is built by wrapPathExpr from sumSeriesFnName and the
// input series.
func sumSeries(ctx *common.Context, series multiplePathSpecs) (ts.SeriesList, error) {
	return combineSeries(ctx, series, wrapPathExpr(sumSeriesFnName, ts.SeriesList(series)), ts.Sum)
}

// diffSeries subtracts all but the first series from the first series.
Expand All @@ -63,31 +63,96 @@ func diffSeries(ctx *common.Context, series multiplePathSpecs) (ts.SeriesList, e
}
}

return combineSeries(ctx, transformedSeries, wrapPathExpr("diffSeries", ts.SeriesList(series)), ts.Sum)
return combineSeries(ctx, transformedSeries, wrapPathExpr(diffSeriesFnName, ts.SeriesList(series)), ts.Sum)
}

// multiplySeries multiplies metrics together and returns the product at each datapoint.
// If the time series have different intervals, the coarsest interval will be used.
// The name of the result is built by wrapPathExpr from multiplySeriesFnName and
// the input series.
func multiplySeries(ctx *common.Context, series multiplePathSpecs) (ts.SeriesList, error) {
	return combineSeries(ctx, series, wrapPathExpr(multiplySeriesFnName, ts.SeriesList(series)), ts.Mul)
}

// averageSeries takes a list of series and returns a new series containing the
// average of all values at each datapoint.
// The name of the result is built by wrapPathExpr from averageSeriesFnName and
// the input series.
func averageSeries(ctx *common.Context, series multiplePathSpecs) (ts.SeriesList, error) {
	return combineSeries(ctx, series, wrapPathExpr(averageSeriesFnName, ts.SeriesList(series)), ts.Avg)
}

// minSeries takes a list of series and returns a new series containing the
// minimum value across the series at each datapoint.
// The name of the result is built by wrapPathExpr from minSeriesFnName and the
// input series.
func minSeries(ctx *common.Context, series multiplePathSpecs) (ts.SeriesList, error) {
	return combineSeries(ctx, series, wrapPathExpr(minSeriesFnName, ts.SeriesList(series)), ts.Min)
}

// maxSeries takes a list of series and returns a new series containing the
// maximum value across the series at each datapoint.
// The name of the result is built by wrapPathExpr from maxSeriesFnName and the
// input series.
func maxSeries(ctx *common.Context, series multiplePathSpecs) (ts.SeriesList, error) {
	return combineSeries(ctx, series, wrapPathExpr(maxSeriesFnName, ts.SeriesList(series)), ts.Max)
}

// lastSeries takes a list of series and returns a new series containing the
// last value at each datapoint.
// The name of the result is built by wrapPathExpr from lastSeriesFnName, in
// line with the other *Series combinators in this file, which all wrap their
// inputs with their own "<fn>Series" name rather than the short alias.
func lastSeries(ctx *common.Context, series multiplePathSpecs) (ts.SeriesList, error) {
	return combineSeries(ctx, series, wrapPathExpr(lastSeriesFnName, ts.SeriesList(series)), ts.Last)
}

// standardDeviationHelper returns the population standard deviation of values
// (the variance is divided by the number of samples, not by n-1).
// NaN entries are skipped. If values is empty or holds only NaNs, the result
// is NaN.
func standardDeviationHelper(values []float64) float64 {
	var count, sum float64
	for _, value := range values {
		if !math.IsNaN(value) {
			sum += value
			count++
		}
	}

	// Make the "no usable samples" result explicit instead of relying on
	// 0/0 evaluating to NaN further down.
	if count == 0 {
		return math.NaN()
	}
	avg := sum / count

	// Second pass: accumulate the squared distance of each sample from the mean.
	var m2 float64
	for _, value := range values {
		if !math.IsNaN(value) {
			diff := value - avg
			m2 += diff * diff
		}
	}

	variance := m2 / count

	return math.Sqrt(variance)
}

// stddevSeries takes a list of series and returns a new series containing the
// standard deviation at each datapoint
// At step n, stddevSeries will make a list of every series' nth value,
// and calculate the standard deviation of that list.
// The output is a seriesList containing 1 series
func stddevSeries(ctx *common.Context, seriesList multiplePathSpecs) (ts.SeriesList, error) {
if len(seriesList.Values) == 0 {
return ts.NewSeriesList(), nil
}

firstSeries := seriesList.Values[0]
numSteps := firstSeries.Len()
values := ts.NewValues(ctx, firstSeries.MillisPerStep(), numSteps)
valuesAtTime := make([]float64, 0, numSteps)
for i := 0; i < numSteps; i++ {
valuesAtTime = valuesAtTime[:0]
if l := seriesList.Len(); l != numSteps {
return nil, fmt.Errorf("mismatched series length, expected %d, got %d", numSteps, l)
}
for _, series := range seriesList.Values {
Copy link
Collaborator

@arnikola arnikola Oct 5, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is actually a little dangerous since series within the same query, series may (potentially) have different resolutions here, e.g.

If you have foo.real.time aggregated to the 10d:1m namespace, but foo.capacity.planning is aggregated only to 1y:1d, a query for foo.*.* will return both, and they'd have very different amounts of data.

Luckily, this is unlikely to come up unless you have any non-overlapping metrics in multiple Graphite namespaces, and I doubt this issue is local to this function only; wouldn't be surprised to see the same issue in some of the existing functions. Perhaps this should just do a fail-fast sanity check like this for now?

if l := seriesList.Len(); l != numSteps {
 return nil, fmt.Errorf("mismatched series length, expected %d, got %d", numSteps, l)
} 

If you're interested, it will probably be some downsampling we perform around here: https://github.com/m3db/m3/blob/master/src/query/graphite/storage/m3_wrapper.go#L212 to downsample all series to the largest possible resolution using something like ts.LTTB(series, start, end, newStep)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good, added this check.

teddywahle marked this conversation as resolved.
Show resolved Hide resolved
valuesAtTime = append(valuesAtTime, series.ValueAt(i))
}
values.SetValueAt(i, standardDeviationHelper(valuesAtTime))
}

name := wrapPathExpr(stddevSeriesFnName, ts.SeriesList(seriesList))
output := ts.NewSeries(ctx, name, firstSeries.StartTime(), values)
return ts.SeriesList{
Values: []*ts.Series{output},
Metadata: seriesList.Metadata,
}, nil
}

func divideSeriesHelper(ctx *common.Context, dividendSeries, divisorSeries *ts.Series, metadata block.ResultMetadata) (*ts.Series, error) {
Expand All @@ -111,7 +176,7 @@ func divideSeriesHelper(ctx *common.Context, dividendSeries, divisorSeries *ts.S
vals.SetValueAt(i, value)
}
}

// The individual series will be named divideSeries(X, X), even if it is generated by divideSeriesLists
// Based on Graphite source code (link below)
// https://github.com/graphite-project/graphite-web/blob/17a34e7966f7a46eded30c2362765c74eea899cb/webapp/graphite/render/functions.py#L901
Expand All @@ -137,7 +202,7 @@ func divideSeries(ctx *common.Context, dividendSeriesList, divisorSeriesList sin
divisorSeries := divisorSeriesList.Values[0]
results := make([]*ts.Series, len(dividendSeriesList.Values))
for idx, dividendSeries := range dividendSeriesList.Values {
metadata := divisorSeriesList.Metadata.CombineMetadata(dividendSeriesList.Metadata)
metadata := divisorSeriesList.Metadata.CombineMetadata(dividendSeriesList.Metadata)
quotientSeries, err := divideSeriesHelper(ctx, dividendSeries, divisorSeries, metadata)
if err != nil {
return ts.NewSeriesList(), err
Expand All @@ -150,7 +215,6 @@ func divideSeries(ctx *common.Context, dividendSeriesList, divisorSeriesList sin
return r, nil
}


// divideSeriesLists divides one series list by another series list
func divideSeriesLists(ctx *common.Context, dividendSeriesList, divisorSeriesList singlePathSpec) (ts.SeriesList, error) {
if len(dividendSeriesList.Values) != len(divisorSeriesList.Values) {
Expand All @@ -161,7 +225,7 @@ func divideSeriesLists(ctx *common.Context, dividendSeriesList, divisorSeriesLis
results := make([]*ts.Series, len(dividendSeriesList.Values))
for idx, dividendSeries := range dividendSeriesList.Values {
divisorSeries := divisorSeriesList.Values[idx]
metadata := divisorSeriesList.Metadata.CombineMetadata(dividendSeriesList.Metadata)
metadata := divisorSeriesList.Metadata.CombineMetadata(dividendSeriesList.Metadata)
quotientSeries, err := divideSeriesHelper(ctx, dividendSeries, divisorSeries, metadata)
if err != nil {
return ts.NewSeriesList(), err
Expand All @@ -174,6 +238,40 @@ func divideSeriesLists(ctx *common.Context, dividendSeriesList, divisorSeriesLis
return r, nil
}

// aggregate takes a list of series and returns a new series containing the
// value aggregated across the series at each datapoint using the specified function.
// This function can be used with aggregation functionsL average (or avg), avg_zero,
// median, sum (or total), min, max, diff, stddev, count,
// range (or rangeOf), multiply & last (or current).
func aggregate(ctx *common.Context, series singlePathSpec, fname string) (ts.SeriesList, error) {
switch fname {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Believe you're missing support for a few here? Namely:

  • avg_zero
  • median
  • stddev
  • range (or rangeOf)
  • last (or current)

As per:
https://graphite.readthedocs.io/en/latest/functions.html#graphite.render.functions.aggregate

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I was just checking the implemented methods in the aggregation_function.go, forgot to check the whole list in buildin_function.go. (stddev and range seems are implemented already) Will update the support for methods that are already implemented.

For other aggregate functions that are not implemented, I will update the PR to document them for now. and do some query internally to see if any of them are actually being used, if used, I will implement them.

case emptyFnName, sumFnName, sumSeriesFnName, totalFnName:
return sumSeries(ctx, multiplePathSpecs(series))
case minFnName, minSeriesFnName:
return minSeries(ctx, multiplePathSpecs(series))
case maxFnName, maxSeriesFnName:
return maxSeries(ctx, multiplePathSpecs(series))
case avgFnName, averageFnName, averageSeriesFnName:
return averageSeries(ctx, multiplePathSpecs(series))
case multiplyFnName, multiplySeriesFnName:
return multiplySeries(ctx, multiplePathSpecs(series))
case diffFnName, diffSeriesFnName:
return diffSeries(ctx, multiplePathSpecs(series))
case countFnName, countSeriesFnName:
return countSeries(ctx, multiplePathSpecs(series))
case rangeFnName, rangeOfFnName, rangeOfSeriesFnName:
return rangeOfSeries(ctx, series)
case lastFnName, currentFnName:
return lastSeries(ctx, multiplePathSpecs(series))
case stddevFnName, stdevFnName, stddevSeriesFnName:
return stddevSeries(ctx, multiplePathSpecs(series))
default:
// Median: the movingMedian() method already implemented is returning an series non compatible result. skip support for now.
// avg_zero is not implemented, skip support for now unless later identified actual use cases.
return ts.NewSeriesList(), errors.NewInvalidParamsError(fmt.Errorf("invalid func %s", fname))
}
}

// averageSeriesWithWildcards splits the given set of series into sub-groupings
// based on wildcard matches in the hierarchy, then averages the values in each
// grouping
Expand Down Expand Up @@ -355,7 +453,7 @@ func groupByNodes(ctx *common.Context, series singlePathSpec, fname string, node

func applyFnToMetaSeries(ctx *common.Context, series singlePathSpec, metaSeries map[string][]*ts.Series, fname string) (ts.SeriesList, error) {
if fname == "" {
fname = "sum"
fname = sumFnName
}

f, fexists := summarizeFuncs[fname]
Expand Down Expand Up @@ -524,7 +622,7 @@ func weightedAverage(
// countSeries draws a horizontal line representing the number of nodes found in the seriesList.
func countSeries(ctx *common.Context, seriesList multiplePathSpecs) (ts.SeriesList, error) {
count, err := common.Count(ctx, ts.SeriesList(seriesList), func(series ts.SeriesList) string {
return wrapPathExpr("countSeries", series)
return wrapPathExpr(countSeriesFnName, series)
})
if err != nil {
return ts.NewSeriesList(), err
Expand Down
42 changes: 41 additions & 1 deletion src/query/graphite/native/aggregation_functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/m3db/m3/src/query/graphite/context"
"github.com/m3db/m3/src/query/graphite/storage"
"github.com/m3db/m3/src/query/graphite/ts"
xgomock "github.com/m3db/m3/src/x/test"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -134,6 +135,46 @@ func TestSumSeries(t *testing.T) {
}, 15.0, 28.0, 30.0, 17.0, "invalid sum value for step %d")
}

// TestStdDevSeries verifies that stddevSeries produces a single series named
// after its inputs whose datapoints are the per-step standard deviation
// across the input series.
func TestStdDevSeries(t *testing.T) {
	ctrl := xgomock.NewController(t)
	defer ctrl.Finish()

	// Fail loudly on a malformed fixture timestamp instead of silently
	// continuing with the zero time.
	start, err := time.Parse(time.RFC1123, "Mon, 27 Jul 2015 19:41:19 GMT")
	require.NoError(t, err)
	end, err := time.Parse(time.RFC1123, "Mon, 27 Jul 2015 19:43:19 GMT")
	require.NoError(t, err)

	var (
		store         = storage.NewMockStorage(ctrl)
		engine        = NewEngine(store)
		ctx           = common.NewContext(common.ContextOptions{Start: start, End: end, Engine: engine})
		millisPerStep = 60000
		inputs        = []*ts.Series{
			ts.NewSeries(ctx, "servers.s2", start,
				common.NewTestSeriesValues(ctx, millisPerStep, []float64{10, 20, 30})),
			ts.NewSeries(ctx, "servers.s1", start,
				common.NewTestSeriesValues(ctx, millisPerStep, []float64{90, 80, 70})),
		}
	)
	defer ctx.Close()

	// For each step the two inputs sit symmetrically around 50, so the
	// standard deviation equals their distance from 50: 40, 30 and 20.
	expectedResults := []common.TestSeries{
		{
			Name: "stddevSeries(servers.s2,servers.s1)",
			Data: []float64{40, 30, 20},
		},
	}
	result, err := stddevSeries(ctx, multiplePathSpecs{
		Values: inputs,
	})
	require.NoError(t, err)
	common.CompareOutputsAndExpected(t, 60000, start, expectedResults, result.Values)
}

func TestAggregate(t *testing.T) {
testAggregatedSeries(t, func(ctx *common.Context, series multiplePathSpecs) (ts.SeriesList, error) {
return aggregate(ctx, singlePathSpec(series), "sum")
}, 15.0, 28.0, 30.0, 17.0, "invalid sum value for step %d")

testAggregatedSeries(t, func(ctx *common.Context, series multiplePathSpecs) (ts.SeriesList, error) {
return aggregate(ctx, singlePathSpec(series), "maxSeries")
}, 15.0, 15.0, 17.0, 17.0, "invalid max value for step %d")
}

type mockEngine struct {
fn func(
ctx context.Context,
Expand Down Expand Up @@ -314,7 +355,6 @@ func TestDivideSeriesLists(t *testing.T) {
require.Error(t, err)
}


func TestAverageSeriesWithWildcards(t *testing.T) {
ctx, _ := newConsolidationTestSeries()
defer ctx.Close()
Expand Down
2 changes: 2 additions & 0 deletions src/query/graphite/native/builtin_functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -2255,6 +2255,7 @@ func threshold(ctx *common.Context, value float64, label string, color string) (
func init() {
// functions - in alpha ordering
MustRegisterFunction(absolute)
MustRegisterFunction(aggregate)
MustRegisterFunction(aggregateLine).WithDefaultParams(map[uint8]interface{}{
2: "avg", // f
})
Expand Down Expand Up @@ -2365,6 +2366,7 @@ func init() {
MustRegisterFunction(stdev).WithDefaultParams(map[uint8]interface{}{
3: 0.1, // windowTolerance
})
MustRegisterFunction(stddevSeries)
MustRegisterFunction(substr).WithDefaultParams(map[uint8]interface{}{
2: 0, // start
3: 0, // stop
Expand Down
2 changes: 2 additions & 0 deletions src/query/graphite/native/builtin_functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3322,6 +3322,7 @@ func TestFunctionsRegistered(t *testing.T) {
fnames := []string{
"abs",
"absolute",
"aggregate",
"aggregateLine",
"alias",
"aliasByMetric",
Expand Down Expand Up @@ -3409,6 +3410,7 @@ func TestFunctionsRegistered(t *testing.T) {
"sortByTotal",
"squareRoot",
"stdev",
"stddevSeries",
"substr",
"sum",
"sumSeries",
Expand Down
30 changes: 30 additions & 0 deletions src/query/graphite/native/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,36 @@ var (
functions = map[string]*Function{}
)

// Names of graphite functions and of their short aliases, shared so that
// callers do not repeat the string literals. This is not the complete list of
// graphite functions; extend it on demand.
const (
// average
averageFnName = "average"
averageSeriesFnName = "averageSeries"
avgFnName = "avg"
// count
countFnName = "count"
countSeriesFnName = "countSeries"
// last (current is an alias)
currentFnName = "current"
// diff
diffFnName = "diff"
diffSeriesFnName = "diffSeries"
// emptyFnName is the name seen when no function name was supplied.
emptyFnName = ""
lastFnName = "last"
lastSeriesFnName = "lastSeries"
// max / min
maxFnName = "max"
maxSeriesFnName = "maxSeries"
minFnName = "min"
minSeriesFnName = "minSeries"
// multiply
multiplyFnName = "multiply"
multiplySeriesFnName = "multiplySeries"
// range
rangeFnName = "range"
rangeOfFnName = "rangeOf"
rangeOfSeriesFnName = "rangeOfSeries"
// standard deviation (both the "stdev" and "stddev" spellings)
stdevFnName = "stdev"
stddevFnName = "stddev"
stddevSeriesFnName = "stddevSeries"
// sum (total is an alias)
sumFnName = "sum"
sumSeriesFnName = "sumSeries"
totalFnName = "total"
)

// registerFunction is used to register a function under a specific name
func registerFunction(f interface{}) (*Function, error) {
fn, err := buildFunction(f)
Expand Down