From 4348ec50e3c54bc5ab175dde0e5fd3e4fd3b6dbc Mon Sep 17 00:00:00 2001 From: Evan Yin Date: Wed, 2 Sep 2020 17:35:15 -0700 Subject: [PATCH 01/10] [m3query] Add graphite function - aggregate --- .../graphite/native/aggregation_functions.go | 39 +++++++++++++++---- .../native/aggregation_functions_test.go | 10 +++++ .../graphite/native/builtin_functions.go | 1 + src/query/graphite/native/functions.go | 20 ++++++++++ 4 files changed, 62 insertions(+), 8 deletions(-) diff --git a/src/query/graphite/native/aggregation_functions.go b/src/query/graphite/native/aggregation_functions.go index 5ea1949431..55bc6415a8 100644 --- a/src/query/graphite/native/aggregation_functions.go +++ b/src/query/graphite/native/aggregation_functions.go @@ -37,7 +37,7 @@ func wrapPathExpr(wrapper string, series ts.SeriesList) string { // sumSeries adds metrics together and returns the sum at each datapoint. // If the time series have different intervals, the coarsest interval will be used. func sumSeries(ctx *common.Context, series multiplePathSpecs) (ts.SeriesList, error) { - return combineSeries(ctx, series, wrapPathExpr("sumSeries", ts.SeriesList(series)), ts.Sum) + return combineSeries(ctx, series, wrapPathExpr(SumSeries, ts.SeriesList(series)), ts.Sum) } // diffSeries subtracts all but the first series from the first series. @@ -62,31 +62,31 @@ func diffSeries(ctx *common.Context, series multiplePathSpecs) (ts.SeriesList, e } } - return combineSeries(ctx, transformedSeries, wrapPathExpr("diffSeries", ts.SeriesList(series)), ts.Sum) + return combineSeries(ctx, transformedSeries, wrapPathExpr(DiffSeries, ts.SeriesList(series)), ts.Sum) } // multiplySeries multiplies metrics together and returns the product at each datapoint. // If the time series have different intervals, the coarsest interval will be used. func multiplySeries(ctx *common.Context, series multiplePathSpecs) (ts.SeriesList, error) { - return combineSeries(ctx, series, wrapPathExpr("multiplySeries", ts.SeriesList(series)), ts.Mul) + return combineSeries(ctx, series, wrapPathExpr(MultiplySeries, ts.SeriesList(series)), ts.Mul) } // averageSeries takes a list of series and returns a new series containing the // average of all values at each datapoint. func averageSeries(ctx *common.Context, series multiplePathSpecs) (ts.SeriesList, error) { - return combineSeries(ctx, series, wrapPathExpr("averageSeries", ts.SeriesList(series)), ts.Avg) + return combineSeries(ctx, series, wrapPathExpr(AverageSeries, ts.SeriesList(series)), ts.Avg) } // minSeries takes a list of series and returns a new series containing the // minimum value across the series at each datapoint func minSeries(ctx *common.Context, series multiplePathSpecs) (ts.SeriesList, error) { - return combineSeries(ctx, series, wrapPathExpr("minSeries", ts.SeriesList(series)), ts.Min) + return combineSeries(ctx, series, wrapPathExpr(MinSeries, ts.SeriesList(series)), ts.Min) } // maxSeries takes a list of series and returns a new series containing the // maximum value across the series at each datapoint func maxSeries(ctx *common.Context, series multiplePathSpecs) (ts.SeriesList, error) { - return combineSeries(ctx, series, wrapPathExpr("maxSeries", ts.SeriesList(series)), ts.Max) + return combineSeries(ctx, series, wrapPathExpr(MaxSeries, ts.SeriesList(series)), ts.Max) } // divideSeries divides one series list by another series @@ -135,6 +135,29 @@ func divideSeries(ctx *common.Context, dividendSeriesList, divisorSeriesList sin return r, nil } +// aggregate takes a list of series and returns a new series containing the +// value aggregated across the series at each datapoint using the specified function. +func aggregate(ctx *common.Context, series multiplePathSpecs, fname string) (ts.SeriesList, error) { + switch fname { + case Empty, Sum, SumSeries, Total: + return sumSeries(ctx, series) + case Min, MinSeries: + return minSeries(ctx, series) + case Max, MaxSeries: + return maxSeries(ctx, series) + case Avg, Average, AverageSeries: + return averageSeries(ctx,series) + case Multiply, MultiplySeries: + return multiplySeries(ctx, series) + case Diff, DiffSeries: + return diffSeries(ctx, series) + case Count, CountSeries: + return countSeries(ctx, series) + default: + return ts.NewSeriesList(), errors.NewInvalidParamsError(fmt.Errorf("invalid func %s", fname)) + } +} + // averageSeriesWithWildcards splits the given set of series into sub-groupings // based on wildcard matches in the hierarchy, then averages the values in each // grouping @@ -316,7 +339,7 @@ func groupByNodes(ctx *common.Context, series singlePathSpec, fname string, node func applyFnToMetaSeries(ctx *common.Context, series singlePathSpec, metaSeries map[string][]*ts.Series, fname string) (ts.SeriesList, error) { if fname == "" { - fname = "sum" + fname = Sum } f, fexists := summarizeFuncs[fname] @@ -485,7 +508,7 @@ func weightedAverage( // countSeries draws a horizontal line representing the number of nodes found in the seriesList. func countSeries(ctx *common.Context, seriesList multiplePathSpecs) (ts.SeriesList, error) { count, err := common.Count(ctx, ts.SeriesList(seriesList), func(series ts.SeriesList) string { - return wrapPathExpr("countSeries", series) + return wrapPathExpr(CountSeries, series) }) if err != nil { return ts.NewSeriesList(), err diff --git a/src/query/graphite/native/aggregation_functions_test.go b/src/query/graphite/native/aggregation_functions_test.go index cb19232d5b..48f65ac72f 100644 --- a/src/query/graphite/native/aggregation_functions_test.go +++ b/src/query/graphite/native/aggregation_functions_test.go @@ -134,6 +134,16 @@ func TestSumSeries(t *testing.T) { }, 15.0, 28.0, 30.0, 17.0, "invalid sum value for step %d") } +func TestAggregate(t *testing.T) { + testAggregatedSeries(t, func(ctx *common.Context, series multiplePathSpecs) (ts.SeriesList, error) { + return aggregate(ctx, series, "sum") + }, 15.0, 28.0, 30.0, 17.0, "invalid sum value for step %d") + + testAggregatedSeries(t, func(ctx *common.Context, series multiplePathSpecs) (ts.SeriesList, error) { + return aggregate(ctx, series, "maxSeries") + }, 15.0, 15.0, 17.0, 17.0, "invalid max value for step %d") +} + type mockEngine struct { fn func( ctx context.Context, diff --git a/src/query/graphite/native/builtin_functions.go b/src/query/graphite/native/builtin_functions.go index 40bce34c6a..e2961ce147 100644 --- a/src/query/graphite/native/builtin_functions.go +++ b/src/query/graphite/native/builtin_functions.go @@ -1862,6 +1862,7 @@ func threshold(ctx *common.Context, value float64, label string, color string) ( func init() { // functions - in alpha ordering MustRegisterFunction(absolute) + MustRegisterFunction(aggregate) MustRegisterFunction(aggregateLine).WithDefaultParams(map[uint8]interface{}{ 2: "avg", // f }) diff --git a/src/query/graphite/native/functions.go b/src/query/graphite/native/functions.go index ba17d76195..a812ad2c63 100644 --- a/src/query/graphite/native/functions.go +++ b/src/query/graphite/native/functions.go @@ -39,6 +39,26 @@ var ( funcMut sync.RWMutex functions = map[string]*Function{} ) +// list of graphite function name strings. (not whole list, update on-demand) +const ( + Average = "average" + AverageSeries = "averageSeries" + Avg = "avg" + Count = "count" + CountSeries = "countSeries" + Diff = "diff" + DiffSeries = "diffSeries" + Empty = "" + Max = "max" + MaxSeries = "maxSeries" + Min = "min" + MinSeries = "minSeries" + Multiply = "multiply" + MultiplySeries = "multiplySeries" + Sum = "sum" + SumSeries = "sumSeries" + Total = "total" +) // registerFunction is used to register a function under a specific name func registerFunction(f interface{}) (*Function, error) { From 36c0c65b99603b2cec2d50d485bab1e290d9b040 Mon Sep 17 00:00:00 2001 From: Evan Yin Date: Wed, 2 Sep 2020 22:19:33 -0700 Subject: [PATCH 02/10] Fix input type --- .../graphite/native/aggregation_functions.go | 16 ++++++++-------- .../native/aggregation_functions_test.go | 4 ++-- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/src/query/graphite/native/aggregation_functions.go b/src/query/graphite/native/aggregation_functions.go index 55bc6415a8..60c7691ce8 100644 --- a/src/query/graphite/native/aggregation_functions.go +++ b/src/query/graphite/native/aggregation_functions.go @@ -137,22 +137,22 @@ func divideSeries(ctx *common.Context, dividendSeriesList, divisorSeriesList sin // aggregate takes a list of series and returns a new series containing the // value aggregated across the series at each datapoint using the specified function. -func aggregate(ctx *common.Context, series multiplePathSpecs, fname string) (ts.SeriesList, error) { +func aggregate(ctx *common.Context, series singlePathSpec, fname string) (ts.SeriesList, error) { switch fname { case Empty, Sum, SumSeries, Total: - return sumSeries(ctx, series) + return sumSeries(ctx, multiplePathSpecs(series)) case Min, MinSeries: - return minSeries(ctx, series) + return minSeries(ctx, multiplePathSpecs(series)) case Max, MaxSeries: - return maxSeries(ctx, series) + return maxSeries(ctx, multiplePathSpecs(series)) case Avg, Average, AverageSeries: - return averageSeries(ctx,series) + return averageSeries(ctx,multiplePathSpecs(series)) case Multiply, MultiplySeries: - return multiplySeries(ctx, series) + return multiplySeries(ctx, multiplePathSpecs(series)) case Diff, DiffSeries: - return diffSeries(ctx, series) + return diffSeries(ctx, multiplePathSpecs(series)) case Count, CountSeries: - return countSeries(ctx, series) + return countSeries(ctx, multiplePathSpecs(series)) default: return ts.NewSeriesList(), errors.NewInvalidParamsError(fmt.Errorf("invalid func %s", fname)) } diff --git a/src/query/graphite/native/aggregation_functions_test.go b/src/query/graphite/native/aggregation_functions_test.go index 48f65ac72f..0bc219cb8d 100644 --- a/src/query/graphite/native/aggregation_functions_test.go +++ b/src/query/graphite/native/aggregation_functions_test.go @@ -136,11 +136,11 @@ func TestSumSeries(t *testing.T) { func TestAggregate(t *testing.T) { testAggregatedSeries(t, func(ctx *common.Context, series multiplePathSpecs) (ts.SeriesList, error) { - return aggregate(ctx, series, "sum") + return aggregate(ctx, singlePathSpec(series), "sum") }, 15.0, 28.0, 30.0, 17.0, "invalid sum value for step %d") testAggregatedSeries(t, func(ctx *common.Context, series multiplePathSpecs) (ts.SeriesList, error) { - return aggregate(ctx, series, "maxSeries") + return aggregate(ctx, singlePathSpec(series), "maxSeries") }, 15.0, 15.0, 17.0, 17.0, "invalid max value for step %d") } From a219d4013f8afe7ff9913ffaab3f75e0a74f08e2 Mon Sep 17 00:00:00 2001 From: Evan Yin Date: Fri, 4 Sep 2020 01:51:11 -0700 Subject: [PATCH 03/10] address feedback --- src/query/graphite/native/aggregation_functions.go | 12 +++++++++++- src/query/graphite/native/functions.go | 8 ++++++++ 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/src/query/graphite/native/aggregation_functions.go b/src/query/graphite/native/aggregation_functions.go index 60c7691ce8..cae4abe929 100644 --- a/src/query/graphite/native/aggregation_functions.go +++ b/src/query/graphite/native/aggregation_functions.go @@ -146,14 +146,24 @@ func aggregate(ctx *common.Context, series singlePathSpec, fname string) (ts.Ser case Max, MaxSeries: return maxSeries(ctx, multiplePathSpecs(series)) case Avg, Average, AverageSeries: - return averageSeries(ctx,multiplePathSpecs(series)) + return averageSeries(ctx, multiplePathSpecs(series)) case Multiply, MultiplySeries: return multiplySeries(ctx, multiplePathSpecs(series)) case Diff, DiffSeries: return diffSeries(ctx, multiplePathSpecs(series)) case Count, CountSeries: return countSeries(ctx, multiplePathSpecs(series)) + case Range, RangeOf, RangeOfSeries: + return rangeOfSeries(ctx, series) + case Last, KeepLastValue: + return keepLastValue(ctx, series, -1) // using default value, same as graphite-web python implementation. + case Stdev, Stddev: + // default value for points in python implementation not clear, using an arbitrary 5 data points value should works fine. + // default value for null value ratio is same as graphite-web python implementation. + return stdev(ctx, series, 5, 0.1) default: + // Median: the movingMedian() method already implemented is returning an series non compatible result. skip support for now. + // avg_zero is not implemented, skip support for now unless later identified actual use cases. return ts.NewSeriesList(), errors.NewInvalidParamsError(fmt.Errorf("invalid func %s", fname)) } } diff --git a/src/query/graphite/native/functions.go b/src/query/graphite/native/functions.go index a812ad2c63..ba7edc1765 100644 --- a/src/query/graphite/native/functions.go +++ b/src/query/graphite/native/functions.go @@ -39,6 +39,7 @@ var ( funcMut sync.RWMutex functions = map[string]*Function{} ) + // list of graphite function name strings. (not whole list, update on-demand) const ( Average = "average" @@ -49,12 +50,19 @@ const ( Diff = "diff" DiffSeries = "diffSeries" Empty = "" + Last = "last" + KeepLastValue = "keepLastValue" Max = "max" MaxSeries = "maxSeries" Min = "min" MinSeries = "minSeries" Multiply = "multiply" MultiplySeries = "multiplySeries" + Range = "range" + RangeOf = "rangeOf" + RangeOfSeries = "rangeOfSeries" + Stdev = "stdev" + Stddev = "stddev" Sum = "sum" SumSeries = "sumSeries" Total = "total" From d96f486f4cedb8afb66a511046d56f0d386e002e Mon Sep 17 00:00:00 2001 From: teddywahle Date: Wed, 23 Sep 2020 13:36:52 -0700 Subject: [PATCH 04/10] Updated aggregate() --- .../graphite/native/aggregation_functions.go | 72 ++++++++++++++++--- .../native/aggregation_functions_test.go | 1 - .../graphite/native/builtin_functions_test.go | 1 + src/query/graphite/native/functions.go | 4 +- 4 files changed, 66 insertions(+), 12 deletions(-) diff --git a/src/query/graphite/native/aggregation_functions.go b/src/query/graphite/native/aggregation_functions.go index 9918ba03f6..4e5e60bfd2 100644 --- a/src/query/graphite/native/aggregation_functions.go +++ b/src/query/graphite/native/aggregation_functions.go @@ -25,8 +25,8 @@ import ( "math" "strings" - "github.com/m3db/m3/src/query/graphite/common" "github.com/m3db/m3/src/query/block" + "github.com/m3db/m3/src/query/graphite/common" "github.com/m3db/m3/src/query/graphite/errors" "github.com/m3db/m3/src/query/graphite/ts" ) @@ -90,6 +90,61 @@ func maxSeries(ctx *common.Context, series multiplePathSpecs) (ts.SeriesList, er return combineSeries(ctx, series, wrapPathExpr(MaxSeries, ts.SeriesList(series)), ts.Max) } +// lastSeries takes a list of series and returns a new series containing the +// last value at each datapoint +func lastSeries(ctx *common.Context, series multiplePathSpecs) (ts.SeriesList, error) { + return combineSeries(ctx, series, wrapPathExpr(LastSeries, ts.SeriesList(series)), ts.Last) +} + +// standardDeviationHelper returns the standard deviation of a slice of a []float64 +func standardDeviationHelper(values []float64) float64 { + count := float64(0) + sum := float64(0) + + for _, value := range values { + if !math.IsNaN(value) { + sum += value + count += 1 + } + } + avg := sum / count + + m2 := float64(0) + for _, value := range values { + if !math.IsNaN(value) { + diff := value - avg + m2 += diff * diff + } + } + + variance := m2 / count + + return math.Sqrt(variance) +} +func stDevSeries(ctx *common.Context, seriesList multiplePathSpecs) (ts.SeriesList, error) { + if len(seriesList.Values) == 0 { + return ts.NewSeriesList(), nil + } + + firstSeries := seriesList.Values[0] + numSteps := firstSeries.Len() + values := ts.NewValues(ctx, firstSeries.MillisPerStep(), numSteps) + for i := 0; i < numSteps; i++ { + valuesAtTime := []float64{} + for _, series := range seriesList.Values { + valuesAtTime = append(valuesAtTime, series.ValueAt(i)) + } + values.SetValueAt(i, standardDeviationHelper(valuesAtTime)) + } + + name := wrapPathExpr(StddevSeries, ts.SeriesList(seriesList)) + output := ts.NewSeries(ctx, name, firstSeries.StartTime(), values) + return ts.SeriesList{ + Values: []*ts.Series{output}, + Metadata: seriesList.Metadata, + }, nil +} + func divideSeriesHelper(ctx *common.Context, dividendSeries, divisorSeries *ts.Series, metadata block.ResultMetadata) (*ts.Series, error) { normalized, minBegin, _, lcmMillisPerStep, err := common.Normalize(ctx, ts.SeriesList{ Values: []*ts.Series{dividendSeries, divisorSeries}, @@ -111,7 +166,7 @@ func divideSeriesHelper(ctx *common.Context, dividendSeries, divisorSeries *ts.S vals.SetValueAt(i, value) } } - + // The individual series will be named divideSeries(X, X), even if it is generated by divideSeriesLists // Based on Graphite source code (link below) // https://github.com/graphite-project/graphite-web/blob/17a34e7966f7a46eded30c2362765c74eea899cb/webapp/graphite/render/functions.py#L901 @@ -137,7 +192,7 @@ func divideSeries(ctx *common.Context, dividendSeriesList, divisorSeriesList sin divisorSeries := divisorSeriesList.Values[0] results := make([]*ts.Series, len(dividendSeriesList.Values)) for idx, dividendSeries := range dividendSeriesList.Values { - metadata := divisorSeriesList.Metadata.CombineMetadata(dividendSeriesList.Metadata) + metadata := divisorSeriesList.Metadata.CombineMetadata(dividendSeriesList.Metadata) quotientSeries, err := divideSeriesHelper(ctx, dividendSeries, divisorSeries, metadata) if err != nil { return ts.NewSeriesList(), err @@ -150,7 +205,6 @@ func divideSeries(ctx *common.Context, dividendSeriesList, divisorSeriesList sin return r, nil } - // divideSeriesLists divides one series list by another series list func divideSeriesLists(ctx *common.Context, dividendSeriesList, divisorSeriesList singlePathSpec) (ts.SeriesList, error) { if len(dividendSeriesList.Values) != len(divisorSeriesList.Values) { @@ -161,7 +215,7 @@ func divideSeriesLists(ctx *common.Context, dividendSeriesList, divisorSeriesLis results := make([]*ts.Series, len(dividendSeriesList.Values)) for idx, dividendSeries := range dividendSeriesList.Values { divisorSeries := divisorSeriesList.Values[idx] - metadata := divisorSeriesList.Metadata.CombineMetadata(dividendSeriesList.Metadata) + metadata := divisorSeriesList.Metadata.CombineMetadata(dividendSeriesList.Metadata) quotientSeries, err := divideSeriesHelper(ctx, dividendSeries, divisorSeries, metadata) if err != nil { return ts.NewSeriesList(), err @@ -194,12 +248,10 @@ func aggregate(ctx *common.Context, series singlePathSpec, fname string) (ts.Ser return countSeries(ctx, multiplePathSpecs(series)) case Range, RangeOf, RangeOfSeries: return rangeOfSeries(ctx, series) - case Last, KeepLastValue: - return keepLastValue(ctx, series, -1) // using default value, same as graphite-web python implementation. + case Last, Current: + return lastSeries(ctx, multiplePathSpecs(series)) case Stdev, Stddev: - // default value for points in python implementation not clear, using an arbitrary 5 data points value should works fine. - // default value for null value ratio is same as graphite-web python implementation. - return stdev(ctx, series, 5, 0.1) + return stDevSeries(ctx, multiplePathSpecs(series)) default: // Median: the movingMedian() method already implemented is returning an series non compatible result. skip support for now. // avg_zero is not implemented, skip support for now unless later identified actual use cases. diff --git a/src/query/graphite/native/aggregation_functions_test.go b/src/query/graphite/native/aggregation_functions_test.go index da6050de29..191428a751 100644 --- a/src/query/graphite/native/aggregation_functions_test.go +++ b/src/query/graphite/native/aggregation_functions_test.go @@ -324,7 +324,6 @@ func TestDivideSeriesLists(t *testing.T) { require.Error(t, err) } - func TestAverageSeriesWithWildcards(t *testing.T) { ctx, _ := newConsolidationTestSeries() defer ctx.Close() diff --git a/src/query/graphite/native/builtin_functions_test.go b/src/query/graphite/native/builtin_functions_test.go index fc3d343572..db1e1e7f60 100644 --- a/src/query/graphite/native/builtin_functions_test.go +++ b/src/query/graphite/native/builtin_functions_test.go @@ -3129,6 +3129,7 @@ func TestFunctionsRegistered(t *testing.T) { fnames := []string{ "abs", "absolute", + "aggregate", "aggregateLine", "alias", "aliasByMetric", diff --git a/src/query/graphite/native/functions.go b/src/query/graphite/native/functions.go index 3881924962..524b78a778 100644 --- a/src/query/graphite/native/functions.go +++ b/src/query/graphite/native/functions.go @@ -47,11 +47,12 @@ const ( Avg = "avg" Count = "count" CountSeries = "countSeries" + Current = "current" Diff = "diff" DiffSeries = "diffSeries" Empty = "" Last = "last" - KeepLastValue = "keepLastValue" + LastSeries = "lastSeries" Max = "max" MaxSeries = "maxSeries" Min = "min" @@ -63,6 +64,7 @@ const ( RangeOfSeries = "rangeOfSeries" Stdev = "stdev" Stddev = "stddev" + StddevSeries = "stddevSeries" Sum = "sum" SumSeries = "sumSeries" Total = "total" From 2c3c182836f83c096b0c7d8bf63728d8d36b5ffe Mon Sep 17 00:00:00 2001 From: teddywahle Date: Thu, 1 Oct 2020 11:28:43 -0700 Subject: [PATCH 05/10] added stddevSeries function properly --- .../graphite/native/aggregation_functions.go | 13 ++++++-- .../native/aggregation_functions_test.go | 31 +++++++++++++++++++ .../graphite/native/builtin_functions.go | 1 + .../graphite/native/builtin_functions_test.go | 1 + 4 files changed, 44 insertions(+), 2 deletions(-) diff --git a/src/query/graphite/native/aggregation_functions.go b/src/query/graphite/native/aggregation_functions.go index 4e5e60bfd2..efe3e7799d 100644 --- a/src/query/graphite/native/aggregation_functions.go +++ b/src/query/graphite/native/aggregation_functions.go @@ -121,7 +121,13 @@ func standardDeviationHelper(values []float64) float64 { return math.Sqrt(variance) } -func stDevSeries(ctx *common.Context, seriesList multiplePathSpecs) (ts.SeriesList, error) { + +// stddevSeries takes a list of series and returns a new series containing the +// standard deviation at each datapoint +// At step n, stddevSeries will make a list of every series' nth value, +// and calculate the standard deviation of that list. +// The output is a seriesList containing 1 series +func stddevSeries(ctx *common.Context, seriesList multiplePathSpecs) (ts.SeriesList, error) { if len(seriesList.Values) == 0 { return ts.NewSeriesList(), nil } @@ -230,6 +236,9 @@ func divideSeriesLists(ctx *common.Context, dividendSeriesList, divisorSeriesLis // aggregate takes a list of series and returns a new series containing the // value aggregated across the series at each datapoint using the specified function. +// This function can be used with aggregation functionsL average (or avg), avg_zero, +// median, sum (or total), min, max, diff, stddev, count, +// range (or rangeOf), multiply & last (or current). func aggregate(ctx *common.Context, series singlePathSpec, fname string) (ts.SeriesList, error) { switch fname { case Empty, Sum, SumSeries, Total: @@ -251,7 +260,7 @@ func aggregate(ctx *common.Context, series singlePathSpec, fname string) (ts.Ser case Last, Current: return lastSeries(ctx, multiplePathSpecs(series)) case Stdev, Stddev: - return stDevSeries(ctx, multiplePathSpecs(series)) + return stddevSeries(ctx, multiplePathSpecs(series)) default: // Median: the movingMedian() method already implemented is returning an series non compatible result. skip support for now. // avg_zero is not implemented, skip support for now unless later identified actual use cases. diff --git a/src/query/graphite/native/aggregation_functions_test.go b/src/query/graphite/native/aggregation_functions_test.go index 191428a751..f96c011f2f 100644 --- a/src/query/graphite/native/aggregation_functions_test.go +++ b/src/query/graphite/native/aggregation_functions_test.go @@ -32,6 +32,7 @@ import ( "github.com/m3db/m3/src/query/graphite/context" "github.com/m3db/m3/src/query/graphite/storage" "github.com/m3db/m3/src/query/graphite/ts" + xgomock "github.com/m3db/m3/src/x/test" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -134,6 +135,36 @@ func TestSumSeries(t *testing.T) { }, 15.0, 28.0, 30.0, 17.0, "invalid sum value for step %d") } +func TestStdDevSeries(t *testing.T) { + var ( + ctrl = xgomock.NewController(t) + store = storage.NewMockStorage(ctrl) + engine = NewEngine(store) + start, _ = time.Parse(time.RFC1123, "Mon, 27 Jul 2015 19:41:19 GMT") + end, _ = time.Parse(time.RFC1123, "Mon, 27 Jul 2015 19:43:19 GMT") + ctx = common.NewContext(common.ContextOptions{Start: start, End: end, Engine: engine}) + millisPerStep = 60000 + inputs = []*ts.Series{ + ts.NewSeries(ctx, "servers.s2", start, + common.NewTestSeriesValues(ctx, millisPerStep, []float64{10, 20, 30})), + ts.NewSeries(ctx, "servers.s1", start, + common.NewTestSeriesValues(ctx, millisPerStep, []float64{90, 80, 70})), + } + ) + + expectedResults := []common.TestSeries{ + { + Name: "stddevSeries(servers.s2,servers.s1)", + Data: []float64{40, 30, 20}, + }, + } + result, err := stddevSeries(ctx, multiplePathSpecs{ + Values: inputs, + }) + require.NoError(t, err) + common.CompareOutputsAndExpected(t, 60000, start, expectedResults, result.Values) +} + func TestAggregate(t *testing.T) { testAggregatedSeries(t, func(ctx *common.Context, series multiplePathSpecs) (ts.SeriesList, error) { return aggregate(ctx, singlePathSpec(series), "sum") diff --git a/src/query/graphite/native/builtin_functions.go b/src/query/graphite/native/builtin_functions.go index 73061beaa9..860c45bd6e 100644 --- a/src/query/graphite/native/builtin_functions.go +++ b/src/query/graphite/native/builtin_functions.go @@ -2362,6 +2362,7 @@ func init() { MustRegisterFunction(stdev).WithDefaultParams(map[uint8]interface{}{ 3: 0.1, // windowTolerance }) + MustRegisterFunction(stddevSeries) MustRegisterFunction(substr).WithDefaultParams(map[uint8]interface{}{ 2: 0, // start 3: 0, // stop diff --git a/src/query/graphite/native/builtin_functions_test.go b/src/query/graphite/native/builtin_functions_test.go index 96ebd27ff3..648415789e 100644 --- a/src/query/graphite/native/builtin_functions_test.go +++ b/src/query/graphite/native/builtin_functions_test.go @@ -3467,6 +3467,7 @@ func TestFunctionsRegistered(t *testing.T) { "sortByTotal", "squareRoot", "stdev", + "stddevSeries", "substr", "sum", "sumSeries", From 88e6f039848f88cdf40f243f75d887b03fd87623 Mon Sep 17 00:00:00 2001 From: teddywahle <69990143+teddywahle@users.noreply.github.com> Date: Mon, 5 Oct 2020 14:35:03 -0700 Subject: [PATCH 06/10] Apply suggestions from code review --- .../graphite/native/aggregation_functions.go | 9 ++-- src/query/graphite/native/functions.go | 52 +++++++++---------- 2 files changed, 31 insertions(+), 30 deletions(-) diff --git a/src/query/graphite/native/aggregation_functions.go b/src/query/graphite/native/aggregation_functions.go index efe3e7799d..c4f5d1e20e 100644 --- a/src/query/graphite/native/aggregation_functions.go +++ b/src/query/graphite/native/aggregation_functions.go @@ -98,15 +98,15 @@ func lastSeries(ctx *common.Context, series multiplePathSpecs) (ts.SeriesList, e // standardDeviationHelper returns the standard deviation of a slice of a []float64 func standardDeviationHelper(values []float64) float64 { - count := float64(0) - sum := float64(0) + var count, sum float64 for _, value := range values { if !math.IsNaN(value) { sum += value - count += 1 + count++ } } + if count == 0 { return math.NaN() } avg := sum / count m2 := float64(0) @@ -135,8 +135,9 @@ func stddevSeries(ctx *common.Context, seriesList multiplePathSpecs) (ts.SeriesL firstSeries := seriesList.Values[0] numSteps := firstSeries.Len() values := ts.NewValues(ctx, firstSeries.MillisPerStep(), numSteps) + valuesAtTime := make([]float64, 0, numSteps) for i := 0; i < numSteps; i++ { - valuesAtTime := []float64{} + valuesAtTime = valuesAtTime[:0] for _, series := range seriesList.Values { valuesAtTime = append(valuesAtTime, series.ValueAt(i)) } diff --git a/src/query/graphite/native/functions.go b/src/query/graphite/native/functions.go index 524b78a778..28f5bb6412 100644 --- a/src/query/graphite/native/functions.go +++ b/src/query/graphite/native/functions.go @@ -42,32 +42,32 @@ var ( // list of graphite function name strings. (not whole list, update on-demand) const ( - Average = "average" - AverageSeries = "averageSeries" - Avg = "avg" - Count = "count" - CountSeries = "countSeries" - Current = "current" - Diff = "diff" - DiffSeries = "diffSeries" - Empty = "" - Last = "last" - LastSeries = "lastSeries" - Max = "max" - MaxSeries = "maxSeries" - Min = "min" - MinSeries = "minSeries" - Multiply = "multiply" - MultiplySeries = "multiplySeries" - Range = "range" - RangeOf = "rangeOf" - RangeOfSeries = "rangeOfSeries" - Stdev = "stdev" - Stddev = "stddev" - StddevSeries = "stddevSeries" - Sum = "sum" - SumSeries = "sumSeries" - Total = "total" + average = "average" + averageSeries = "averageSeries" + avg = "avg" + count = "count" + countSeries = "countSeries" + current = "current" + diff = "diff" + diffSeries = "diffSeries" + empty = "" + last = "last" + lastSeries = "lastSeries" + max = "max" + maxSeries = "maxSeries" + min = "min" + minSeries = "minSeries" + multiply = "multiply" + multiplySeries = "multiplySeries" + range = "range" + rangeOf = "rangeOf" + rangeOfSeries = "rangeOfSeries" + stdev = "stdev" + stddev = "stddev" + stddevSeries = "stddevSeries" + sum = "sum" + sumSeries = "sumSeries" + total = "total" ) // registerFunction is used to register a function under a specific name From ee55448f0a1f0c173ce0ce17c237f8c3d21a4e5e Mon Sep 17 00:00:00 2001 From: teddywahle Date: Mon, 5 Oct 2020 15:06:34 -0700 Subject: [PATCH 07/10] updated all function names --- .../graphite/native/aggregation_functions.go | 40 +++++++------- src/query/graphite/native/functions.go | 52 +++++++++---------- 2 files changed, 46 insertions(+), 46 deletions(-) diff --git a/src/query/graphite/native/aggregation_functions.go b/src/query/graphite/native/aggregation_functions.go index c4f5d1e20e..ab575824e7 100644 --- a/src/query/graphite/native/aggregation_functions.go +++ b/src/query/graphite/native/aggregation_functions.go @@ -38,7 +38,7 @@ func wrapPathExpr(wrapper string, series ts.SeriesList) string { // sumSeries adds metrics together and returns the sum at each datapoint. // If the time series have different intervals, the coarsest interval will be used. func sumSeries(ctx *common.Context, series multiplePathSpecs) (ts.SeriesList, error) { - return combineSeries(ctx, series, wrapPathExpr(SumSeries, ts.SeriesList(series)), ts.Sum) + return combineSeries(ctx, series, wrapPathExpr(sumSeriesFnName, ts.SeriesList(series)), ts.Sum) } // diffSeries subtracts all but the first series from the first series. @@ -63,37 +63,37 @@ func diffSeries(ctx *common.Context, series multiplePathSpecs) (ts.SeriesList, e } } - return combineSeries(ctx, transformedSeries, wrapPathExpr(DiffSeries, ts.SeriesList(series)), ts.Sum) + return combineSeries(ctx, transformedSeries, wrapPathExpr(diffSeriesFnName, ts.SeriesList(series)), ts.Sum) } // multiplySeries multiplies metrics together and returns the product at each datapoint. // If the time series have different intervals, the coarsest interval will be used. func multiplySeries(ctx *common.Context, series multiplePathSpecs) (ts.SeriesList, error) { - return combineSeries(ctx, series, wrapPathExpr(MultiplySeries, ts.SeriesList(series)), ts.Mul) + return combineSeries(ctx, series, wrapPathExpr(multiplySeriesFnName, ts.SeriesList(series)), ts.Mul) } // averageSeries takes a list of series and returns a new series containing the // average of all values at each datapoint. func averageSeries(ctx *common.Context, series multiplePathSpecs) (ts.SeriesList, error) { - return combineSeries(ctx, series, wrapPathExpr(AverageSeries, ts.SeriesList(series)), ts.Avg) + return combineSeries(ctx, series, wrapPathExpr(averageSeriesFnName, ts.SeriesList(series)), ts.Avg) } // minSeries takes a list of series and returns a new series containing the // minimum value across the series at each datapoint func minSeries(ctx *common.Context, series multiplePathSpecs) (ts.SeriesList, error) { - return combineSeries(ctx, series, wrapPathExpr(MinSeries, ts.SeriesList(series)), ts.Min) + return combineSeries(ctx, series, wrapPathExpr(minSeriesFnName, ts.SeriesList(series)), ts.Min) } // maxSeries takes a list of series and returns a new series containing the // maximum value across the series at each datapoint func maxSeries(ctx *common.Context, series multiplePathSpecs) (ts.SeriesList, error) { - return combineSeries(ctx, series, wrapPathExpr(MaxSeries, ts.SeriesList(series)), ts.Max) + return combineSeries(ctx, series, wrapPathExpr(maxSeriesFnName, ts.SeriesList(series)), ts.Max) } // lastSeries takes a list of series and returns a new series containing the // last value at each datapoint func lastSeries(ctx *common.Context, series multiplePathSpecs) (ts.SeriesList, error) { - return combineSeries(ctx, series, wrapPathExpr(LastSeries, ts.SeriesList(series)), ts.Last) + return combineSeries(ctx, series, wrapPathExpr(lastFnName, ts.SeriesList(series)), ts.Last) } // standardDeviationHelper returns the standard deviation of a slice of a []float64 @@ -144,7 +144,7 @@ func stddevSeries(ctx *common.Context, seriesList multiplePathSpecs) (ts.SeriesL values.SetValueAt(i, standardDeviationHelper(valuesAtTime)) } - name := wrapPathExpr(StddevSeries, ts.SeriesList(seriesList)) + name := wrapPathExpr(stddevSeriesFnName, ts.SeriesList(seriesList)) output := ts.NewSeries(ctx, name, firstSeries.StartTime(), values) return ts.SeriesList{ Values: []*ts.Series{output}, @@ -242,25 +242,25 @@ func divideSeriesLists(ctx *common.Context, dividendSeriesList, divisorSeriesLis // range (or rangeOf), multiply & last (or current). func aggregate(ctx *common.Context, series singlePathSpec, fname string) (ts.SeriesList, error) { switch fname { - case Empty, Sum, SumSeries, Total: + case emptyFnName, sumFnName, sumSeriesFnName, totalFnName: return sumSeries(ctx, multiplePathSpecs(series)) - case Min, MinSeries: + case minFnName, minSeriesFnName: return minSeries(ctx, multiplePathSpecs(series)) - case Max, MaxSeries: + case maxFnName, maxSeriesFnName: return maxSeries(ctx, multiplePathSpecs(series)) - case Avg, Average, AverageSeries: + case avgFnName, averageFnName, averageSeriesFnName: return averageSeries(ctx, multiplePathSpecs(series)) - case Multiply, MultiplySeries: + case multiplyFnName, multiplySeriesFnName: return multiplySeries(ctx, multiplePathSpecs(series)) - case Diff, DiffSeries: + case diffFnName, diffSeriesFnName: return diffSeries(ctx, multiplePathSpecs(series)) - case Count, CountSeries: + case countFnName, countSeriesFnName: return countSeries(ctx, multiplePathSpecs(series)) - case Range, RangeOf, RangeOfSeries: + case rangeFnName, rangeOfFnName, rangeOfSeriesFnName: return rangeOfSeries(ctx, series) - case Last, Current: + case lastFnName, currentFnName: return lastSeries(ctx, multiplePathSpecs(series)) - case Stdev, Stddev: + case stddevFnName, stdevFnName, stddevSeriesFnName: return stddevSeries(ctx, multiplePathSpecs(series)) default: // Median: the movingMedian() method already implemented is returning an series non compatible result. skip support for now. @@ -450,7 +450,7 @@ func groupByNodes(ctx *common.Context, series singlePathSpec, fname string, node func applyFnToMetaSeries(ctx *common.Context, series singlePathSpec, metaSeries map[string][]*ts.Series, fname string) (ts.SeriesList, error) { if fname == "" { - fname = Sum + fname = sumFnName } f, fexists := summarizeFuncs[fname] @@ -619,7 +619,7 @@ func weightedAverage( // countSeries draws a horizontal line representing the number of nodes found in the seriesList. func countSeries(ctx *common.Context, seriesList multiplePathSpecs) (ts.SeriesList, error) { count, err := common.Count(ctx, ts.SeriesList(seriesList), func(series ts.SeriesList) string { - return wrapPathExpr(CountSeries, series) + return wrapPathExpr(countSeriesFnName, series) }) if err != nil { return ts.NewSeriesList(), err diff --git a/src/query/graphite/native/functions.go b/src/query/graphite/native/functions.go index 28f5bb6412..76c08c5a08 100644 --- a/src/query/graphite/native/functions.go +++ b/src/query/graphite/native/functions.go @@ -42,32 +42,32 @@ var ( // list of graphite function name strings. (not whole list, update on-demand) const ( - average = "average" - averageSeries = "averageSeries" - avg = "avg" - count = "count" - countSeries = "countSeries" - current = "current" - diff = "diff" - diffSeries = "diffSeries" - empty = "" - last = "last" - lastSeries = "lastSeries" - max = "max" - maxSeries = "maxSeries" - min = "min" - minSeries = "minSeries" - multiply = "multiply" - multiplySeries = "multiplySeries" - range = "range" - rangeOf = "rangeOf" - rangeOfSeries = "rangeOfSeries" - stdev = "stdev" - stddev = "stddev" - stddevSeries = "stddevSeries" - sum = "sum" - sumSeries = "sumSeries" - total = "total" + averageFnName = "average" + averageSeriesFnName = "averageSeries" + avgFnName = "avg" + countFnName = "count" + countSeriesFnName = "countSeries" + currentFnName = "current" + diffFnName = "diff" + diffSeriesFnName = "diffSeries" + emptyFnName = "" + lastFnName = "last" + lastSeriesFnName = "lastSeries" + maxFnName = "max" + maxSeriesFnName = "maxSeries" + minFnName = "min" + minSeriesFnName = "minSeries" + multiplyFnName = "multiply" + multiplySeriesFnName = "multiplySeries" + rangeFnName = "range" + rangeOfFnName = "rangeOf" + rangeOfSeriesFnName = "rangeOfSeries" + stdevFnName = "stdev" + stddevFnName = "stddev" + stddevSeriesFnName = "stddevSeries" + sumFnName = "sum" + sumSeriesFnName = "sumSeries" + totalFnName = "total" ) // registerFunction is used to register a function under a specific name From 68f4878236159dd2bb64108163724e00ebe0a516 Mon Sep 17 00:00:00 2001 From: teddywahle <69990143+teddywahle@users.noreply.github.com> Date: Mon, 5 Oct 2020 15:10:38 -0700 Subject: [PATCH 08/10] Apply suggestions from code review --- src/query/graphite/native/aggregation_functions.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/query/graphite/native/aggregation_functions.go b/src/query/graphite/native/aggregation_functions.go index ab575824e7..2ee9395673 100644 --- a/src/query/graphite/native/aggregation_functions.go +++ b/src/query/graphite/native/aggregation_functions.go @@ -138,6 +138,9 @@ func stddevSeries(ctx *common.Context, seriesList multiplePathSpecs) (ts.SeriesL valuesAtTime := make([]float64, 0, numSteps) for i := 0; i < numSteps; i++ { valuesAtTime = valuesAtTime[:0] + if l := seriesList.Len(); l != numSteps { + return nil, fmt.Errorf("mismatched series length, expected %d, got %d", numSteps, l) +} for _, series := range seriesList.Values { valuesAtTime = append(valuesAtTime, series.ValueAt(i)) } From 2d34acee877868c681f4e10d3c93e676ba182ab1 Mon Sep 17 00:00:00 2001 From: teddywahle Date: Mon, 5 Oct 2020 15:20:55 -0700 Subject: [PATCH 09/10] added check --- src/query/graphite/native/aggregation_functions.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/query/graphite/native/aggregation_functions.go b/src/query/graphite/native/aggregation_functions.go index 72233039f5..5cb128c9dd 100644 --- a/src/query/graphite/native/aggregation_functions.go +++ b/src/query/graphite/native/aggregation_functions.go @@ -110,7 +110,9 @@ func standardDeviationHelper(values []float64) float64 { count++ } } - if count == 0 { return math.NaN() } + if count == 0 { + return math.NaN() + } avg := sum / count m2 := float64(0) @@ -142,10 +144,10 @@ func stddevSeries(ctx *common.Context, seriesList multiplePathSpecs) (ts.SeriesL valuesAtTime := make([]float64, 0, numSteps) for i := 0; i < numSteps; i++ { valuesAtTime = valuesAtTime[:0] - if l := seriesList.Len(); l != numSteps { - return nil, fmt.Errorf("mismatched series length, expected %d, got %d", numSteps, l) -} for _, series := range seriesList.Values { + if l := series.Len(); l != numSteps { + return ts.NewSeriesList(), fmt.Errorf("mismatched series length, expected %d, got %d", numSteps, l) + } valuesAtTime = append(valuesAtTime, series.ValueAt(i)) } values.SetValueAt(i, standardDeviationHelper(valuesAtTime)) From 186830eb2d1d9bdf94b42653e3a741a6794bd61c Mon Sep 17 00:00:00 2001 From: teddywahle Date: Mon, 5 Oct 2020 16:34:09 -0700 Subject: [PATCH 10/10] updated join path nit --- src/query/graphite/native/aggregation_functions.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/query/graphite/native/aggregation_functions.go b/src/query/graphite/native/aggregation_functions.go index 5cb128c9dd..bdc1262e30 100644 --- a/src/query/graphite/native/aggregation_functions.go +++ b/src/query/graphite/native/aggregation_functions.go @@ -97,7 +97,7 @@ func maxSeries(ctx *common.Context, series multiplePathSpecs) (ts.SeriesList, er // lastSeries takes a list of series and returns a new series containing the // last value at each datapoint func lastSeries(ctx *common.Context, series multiplePathSpecs) (ts.SeriesList, error) { - return combineSeries(ctx, series, wrapPathExpr(lastFnName, ts.SeriesList(series)), ts.Last) + return combineSeries(ctx, series, joinPathExpr(ts.SeriesList(series)), ts.Last) } // standardDeviationHelper returns the standard deviation of a slice of a []float64