diff --git a/src/query/graphite/common/percentiles.go b/src/query/graphite/common/percentiles.go index 1a526fefde..62ed41a84c 100644 --- a/src/query/graphite/common/percentiles.go +++ b/src/query/graphite/common/percentiles.go @@ -71,6 +71,53 @@ func SafeSort(input []float64) int { return nans } +// SafeSum returns the sum of the input slice the number of NaNs in the input. +func SafeSum(input []float64) (float64, int) { + nans := 0 + sum := 0.0 + for _, v := range input { + if !math.IsNaN(v) { + sum += v + } else { + nans += 1 + } + } + return sum, nans +} + +// SafeMax returns the maximum value of the input slice the number of NaNs in the input. +func SafeMax(input []float64) (float64, int) { + nans := 0 + max := -math.MaxFloat64 + for _, v := range input { + if math.IsNaN(v) { + nans++ + continue + } + if v > max { + max = v + } + } + return max, nans +} + +// SafeMin returns the minimum value of the input slice the number of NaNs in the input. +func SafeMin(input []float64) (float64, int) { + nans := 0 + min := math.MaxFloat64 + for _, v := range input { + if math.IsNaN(v) { + nans++ + continue + } + if v < min { + min = v + } + } + return min, nans +} + + // GetPercentile computes the percentile cut off for an array of floats func GetPercentile(input []float64, percentile float64, interpolate bool) float64 { nans := SafeSort(input) diff --git a/src/query/graphite/native/builtin_functions.go b/src/query/graphite/native/builtin_functions.go index 4a904f4932..88df7f5527 100644 --- a/src/query/graphite/native/builtin_functions.go +++ b/src/query/graphite/native/builtin_functions.go @@ -1667,16 +1667,70 @@ func changed(ctx *common.Context, seriesList singlePathSpec) (ts.SeriesList, err }) } -// movingMedian takes one metric or a wildcard seriesList followed by a a quoted string -// with a length of time like '1hour' or '5min'. Graphs the median of the preceding -// datapoints for each point on the graph. All previous datapoints are set to None at -// the beginning of the graph. -func movingMedian(ctx *common.Context, _ singlePathSpec, windowSize string) (*binaryContextShifter, error) { - interval, err := common.ParseInterval(windowSize) +// windowPointsLength calculates the number of window points in a interval +func windowPointsLength(series *ts.Series, interval time.Duration) int { + return int(interval / (time.Duration(series.MillisPerStep()) * time.Millisecond)) +} + +type movingImplementationFn func(window []float64, values ts.MutableValues, windowPoints int, i int) + +// movingMedianHelper given a slice of floats, calculates the median and assigns it into vals as index i +func movingMedianHelper(window []float64, vals ts.MutableValues, windowPoints int, i int) { + nans := common.SafeSort(window) + + if nans < windowPoints { + index := (windowPoints - nans) / 2 + median := window[nans+index] + vals.SetValueAt(i, median) + } +} + +// movingSumHelper given a slice of floats, calculates the sum and assigns it into vals as index i +func movingSumHelper(window []float64, vals ts.MutableValues, windowPoints int, i int) { + sum, nans := common.SafeSum(window) + + if nans < windowPoints { + vals.SetValueAt(i, sum) + } +} + +// movingMaxHelper given a slice of floats, finds the max and assigns it into vals as index i +func movingMaxHelper(window []float64, vals ts.MutableValues, windowPoints int, i int) { + max, nans := common.SafeMax(window) + + if nans < windowPoints { + vals.SetValueAt(i, max) + } +} + +// movingMinHelper given a slice of floats, finds the min and assigns it into vals as index i +func movingMinHelper(window []float64, vals ts.MutableValues, windowPoints int, i int) { + min, nans := common.SafeMin(window) + + if nans < windowPoints { + vals.SetValueAt(i, min) + } +} + + + +func newMovingBinaryTransform( + ctx *common.Context, + input singlePathSpec, + windowSizeValue genericInterface, + movingFunctionName string, + impl movingImplementationFn, +) (*binaryContextShifter, error) { + if len(input.Values) == 0 { + return nil, nil + } + + windowSize, err := parseWindowSize(windowSizeValue, input) if err != nil { return nil, err } + interval := windowSize.deltaValue if interval <= 0 { return nil, common.ErrInvalidIntervalFormat } @@ -1689,64 +1743,88 @@ func movingMedian(ctx *common.Context, _ singlePathSpec, windowSize string) (*bi } bootstrapStartTime, bootstrapEndTime := ctx.StartTime.Add(-interval), ctx.StartTime - transformerFn := func(bootstrapped, original ts.SeriesList) (ts.SeriesList, error) { - bootstrapList, err := combineBootstrapWithOriginal(ctx, - bootstrapStartTime, bootstrapEndTime, - bootstrapped, singlePathSpec(original)) - if err != nil { - return ts.NewSeriesList(), err - } - - results := make([]*ts.Series, 0, original.Len()) - for i, bootstrap := range bootstrapList.Values { - series := original.Values[i] - windowPoints := int(interval / (time.Duration(series.MillisPerStep()) * time.Millisecond)) - if windowPoints <= 0 { - err := errors.NewInvalidParamsError(fmt.Errorf( - "non positive window points, windowSize=%s, stepSize=%d", - windowSize, series.MillisPerStep())) + return &binaryContextShifter{ + ContextShiftFunc: contextShiftingFn, + BinaryTransformer: func(bootstrapped, original ts.SeriesList) (ts.SeriesList, error) { + bootstrapList, err := combineBootstrapWithOriginal(ctx, + bootstrapStartTime, bootstrapEndTime, + bootstrapped, singlePathSpec(original)) + if err != nil { return ts.NewSeriesList(), err } - window := make([]float64, windowPoints) - util.Memset(window, math.NaN()) - numSteps := series.Len() - offset := bootstrap.Len() - numSteps - vals := ts.NewValues(ctx, series.MillisPerStep(), numSteps) - for i := 0; i < numSteps; i++ { - for j := i + offset - windowPoints; j < i+offset; j++ { - if j < 0 || j >= bootstrap.Len() { - continue - } - - idx := j - i - offset + windowPoints - if idx < 0 || idx > len(window)-1 { - continue - } - window[idx] = bootstrap.ValueAt(j) + results := make([]*ts.Series, 0, original.Len()) + maxWindowPoints := 0 + for i, _ := range bootstrapList.Values { + series := original.Values[i] + windowPoints := windowPointsLength(series, interval) + if windowPoints <= 0 { + err := errors.NewInvalidParamsError(fmt.Errorf( + "non positive window points, windowSize=%s, stepSize=%d", + windowSize.stringValue, series.MillisPerStep())) + return ts.NewSeriesList(), err } - nans := common.SafeSort(window) - if nans < windowPoints { - index := (windowPoints - nans) / 2 - median := window[nans+index] - vals.SetValueAt(i, median) + if windowPoints > maxWindowPoints { + maxWindowPoints = windowPoints } } - name := fmt.Sprintf("movingMedian(%s,%q)", series.Name(), windowSize) - newSeries := ts.NewSeries(ctx, name, series.StartTime(), vals) - results = append(results, newSeries) - } - original.Values = results - return original, nil - } + windowPoints := make([]float64, maxWindowPoints) + for i, bootstrap := range bootstrapList.Values { + series := original.Values[i] + currWindowPoints := windowPointsLength(series, interval) + window := windowPoints[:currWindowPoints] + util.Memset(window, math.NaN()) + numSteps := series.Len() + offset := bootstrap.Len() - numSteps + vals := ts.NewValues(ctx, series.MillisPerStep(), numSteps) + for i := 0; i < numSteps; i++ { + for j := i + offset - currWindowPoints; j < i+offset; j++ { + if j < 0 || j >= bootstrap.Len() { + continue + } - return &binaryContextShifter{ - ContextShiftFunc: contextShiftingFn, - BinaryTransformer: transformerFn, + idx := j - i - offset + currWindowPoints + if idx < 0 || idx > len(window)-1 { + continue + } + + window[idx] = bootstrap.ValueAt(j) + } + impl(window, vals, currWindowPoints, i) + } + name := fmt.Sprintf("%s(%s,%s)", movingFunctionName, series.Name(), windowSize.stringValue) + newSeries := ts.NewSeries(ctx, name, series.StartTime(), vals) + results = append(results, newSeries) + } + + original.Values = results + return original, nil + }, }, nil } +// movingMedian calculates the moving median of a metric (or metrics) over a time interval. +func movingMedian(ctx *common.Context, input singlePathSpec, windowSize genericInterface) (*binaryContextShifter, error) { + return newMovingBinaryTransform(ctx, input, windowSize, "movingMedian", movingMedianHelper) +} + +// movingSum calculates the moving sum of a metric (or metrics) over a time interval. +func movingSum(ctx *common.Context, input singlePathSpec, windowSize genericInterface) (*binaryContextShifter, error) { + return newMovingBinaryTransform(ctx, input, windowSize, "movingSum", movingSumHelper) +} + +// movingMax calculates the moving maximum of a metric (or metrics) over a time interval. +func movingMax(ctx *common.Context, input singlePathSpec, windowSize genericInterface) (*binaryContextShifter, error) { + return newMovingBinaryTransform(ctx, input, windowSize, "movingMax", movingMaxHelper) +} + +// movingMin calculates the moving minimum of a metric (or metrics) over a time interval. +func movingMin(ctx *common.Context, input singlePathSpec, windowSize genericInterface) (*binaryContextShifter, error) { + return newMovingBinaryTransform(ctx, input, windowSize, "movingMin", movingMinHelper) +} + + // legendValue takes one metric or a wildcard seriesList and a string in quotes. // Appends a value to the metric name in the legend. Currently one or several of: // "last", "avg", "total", "min", "max". @@ -1997,6 +2075,9 @@ func init() { MustRegisterFunction(mostDeviant) MustRegisterFunction(movingAverage) MustRegisterFunction(movingMedian) + MustRegisterFunction(movingSum) + MustRegisterFunction(movingMax) + MustRegisterFunction(movingMin) MustRegisterFunction(multiplySeries) MustRegisterFunction(nonNegativeDerivative).WithDefaultParams(map[uint8]interface{}{ 2: math.NaN(), // maxValue diff --git a/src/query/graphite/native/builtin_functions_test.go b/src/query/graphite/native/builtin_functions_test.go index 8becf9fe8e..0cfa8cbf69 100644 --- a/src/query/graphite/native/builtin_functions_test.go +++ b/src/query/graphite/native/builtin_functions_test.go @@ -722,6 +722,57 @@ func TestMovingAverageError(t *testing.T) { testMovingFunctionError(t, "movingAverage(foo.bar.baz, 0)") } +func TestMovingSumSuccess(t *testing.T) { + values := []float64{12.0, 19.0, -10.0, math.NaN(), 10.0} + bootstrap := []float64{3.0, 4.0, 5.0} + expected := []float64{12.0, 21.0, 36.0, 21.0, 9.0} // (3+4+5), (4+5+12), (5+12+19), (12+19-10), (19-10+Nan) + + testMovingFunction(t, "movingSum(foo.bar.baz, '30s')", "movingSum(foo.bar.baz,\"30s\")", values, bootstrap, expected) + testMovingFunction(t, "movingSum(foo.bar.baz, '30s')", "movingSum(foo.bar.baz,3)", nil, nil, nil) + + bootstrapEntireSeries := []float64{3.0, 4.0, 5.0, 12.0, 19.0, -10.0, math.NaN(), 10.0} + testMovingFunction(t, "movingSum(foo.bar.baz, '30s')", "movingSum(foo.bar.baz,\"30s\")", values, bootstrapEntireSeries, expected) +} + +func TestMovingSumError(t *testing.T) { + testMovingFunctionError(t, "movingSum(foo.bar.baz, '-30s')") + testMovingFunctionError(t, "movingSum(foo.bar.baz, 0)") +} + +func TestMovingMaxSuccess(t *testing.T) { + values := []float64{12.0, 19.0, -10.0, math.NaN(), 10.0} + bootstrap := []float64{3.0, 4.0, 5.0} + expected := []float64{5.0, 12.0, 19.0, 19.0, 19.0} // max(3,4,5), max(4,5,12), max(5,12,19), max(12,19,10), max(19,-10,NaN) + + testMovingFunction(t, "movingMax(foo.bar.baz, '30s')", "movingMax(foo.bar.baz,\"30s\")", values, bootstrap, expected) + testMovingFunction(t, "movingMax(foo.bar.baz, '30s')", "movingMax(foo.bar.baz,3)", nil, nil, nil) + + bootstrapEntireSeries := []float64{3.0, 4.0, 5.0, 12.0, 19.0, -10.0, math.NaN(), 10.0} + testMovingFunction(t, "movingMax(foo.bar.baz, '30s')", "movingMax(foo.bar.baz,\"30s\")", values, bootstrapEntireSeries, expected) +} + +func TestMovingMaxError(t *testing.T) { + testMovingFunctionError(t, "movingMax(foo.bar.baz, '-30s')") + testMovingFunctionError(t, "movingMax(foo.bar.baz, 0)") +} + +func TestMovingMinSuccess(t *testing.T) { + values := []float64{12.0, 19.0, -10.0, math.NaN(), 10.0} + bootstrap := []float64{3.0, 4.0, 5.0} + expected := []float64{3.0, 4.0, 5.0, -10.0, -10.0} // min(3,4,5), min(4,5,12), min(5,12,19), min(12,19,-10), min(19,-10,NaN) + + testMovingFunction(t, "movingMin(foo.bar.baz, '30s')", "movingMin(foo.bar.baz,\"30s\")", values, bootstrap, expected) + testMovingFunction(t, "movingMin(foo.bar.baz, '30s')", "movingMin(foo.bar.baz,3)", nil, nil, nil) + + bootstrapEntireSeries := []float64{3.0, 4.0, 5.0, 12.0, 19.0, -10.0, math.NaN(), 10.0} + testMovingFunction(t, "movingMin(foo.bar.baz, '30s')", "movingMin(foo.bar.baz,\"30s\")", values, bootstrapEntireSeries, expected) +} + +func TestMovingMinError(t *testing.T) { + testMovingFunctionError(t, "movingMin(foo.bar.baz, '-30s')") + testMovingFunctionError(t, "movingMin(foo.bar.baz, 0)") +} + func TestIsNonNull(t *testing.T) { ctx := common.NewTestContext() defer ctx.Close() @@ -2584,14 +2635,14 @@ func TestMovingMedianInvalidLimits(t *testing.T) { func TestMovingMismatchedLimits(t *testing.T) { // NB: this tests the behavior when query limits do not snap exactly to data // points. When limits do not snap exactly, the first point should be omitted. - for _, fn := range []string{"movingAverage", "movingMedian"} { + for _, fn := range []string{"movingAverage", "movingMedian", "movingSum", "movingMax", "movingMin"} { for i := time.Duration(0); i < time.Minute; i += time.Second { - testMovingAverageInvalidLimits(t, fn, i) + testMovingFunctionInvalidLimits(t, fn, i) } } } -func testMovingAverageInvalidLimits(t *testing.T, fn string, offset time.Duration) { +func testMovingFunctionInvalidLimits(t *testing.T, fn string, offset time.Duration) { ctrl := xgomock.NewController(t) defer ctrl.Finish() @@ -3061,6 +3112,9 @@ func TestFunctionsRegistered(t *testing.T) { "mostDeviant", "movingAverage", "movingMedian", + "movingSum", + "movingMax", + "movingMin", "multiplySeries", "nonNegativeDerivative", "nPercentile", diff --git a/src/query/graphite/native/engine_test.go b/src/query/graphite/native/engine_test.go index 78cf64034a..cc4f9d229a 100644 --- a/src/query/graphite/native/engine_test.go +++ b/src/query/graphite/native/engine_test.go @@ -207,3 +207,30 @@ func TestTracing(t *testing.T) { assert.Equal(t, expected.Outputs, trace.Outputs, "incorrect outputs for trace %d", i) } } + +func buildEmptyTestSeriesFn() func(context.Context, string, storage.FetchOptions) (*storage.FetchResult, error) { + return func(_ context.Context, q string, opts storage.FetchOptions) (*storage.FetchResult, error) { + series := make([]*ts.Series, 0, 0) + return &storage.FetchResult{SeriesList: series}, nil + } +} + +func TestNilBinaryContextShifter(t *testing.T) { + ctrl := xgomock.NewController(t) + defer ctrl.Finish() + + store := storage.NewMockStorage(ctrl) + + engine := NewEngine(store) + + ctx := common.NewContext(common.ContextOptions{Start: time.Now().Add(-1 * time.Hour), End: time.Now(), Engine: engine}) + + store.EXPECT().FetchByQuery(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + buildEmptyTestSeriesFn()).AnyTimes() + + expr, err := engine.Compile("movingSum(foo.bar.q.zed, 30s)") + require.NoError(t, err) + + _, err = expr.Execute(ctx) + require.NoError(t, err) +}