From f2ebf5c3f96e70b31a538b5dac27676e4f33dd06 Mon Sep 17 00:00:00 2001 From: teddywahle <69990143+teddywahle@users.noreply.github.com> Date: Mon, 21 Sep 2020 12:33:37 -0700 Subject: [PATCH] [query] Implemented the Graphite `integralByInterval` function (#2596) --- .../graphite/native/builtin_functions.go | 56 ++++++++++++++--- .../graphite/native/builtin_functions_test.go | 63 ++++++++++++++----- 2 files changed, 96 insertions(+), 23 deletions(-) diff --git a/src/query/graphite/native/builtin_functions.go b/src/query/graphite/native/builtin_functions.go index 19c43d0458..367c0f76ed 100644 --- a/src/query/graphite/native/builtin_functions.go +++ b/src/query/graphite/native/builtin_functions.go @@ -271,7 +271,7 @@ func delay( func delayValuesHelper(ctx *common.Context, series *ts.Series, steps int) ts.Values { output := ts.NewValues(ctx, series.MillisPerStep(), series.Len()) for i := steps; i < series.Len(); i++ { - output.SetValueAt(i, series.ValueAt(i - steps)) + output.SetValueAt(i, series.ValueAt(i-steps)) } return output } @@ -281,7 +281,7 @@ func delayValuesHelper(ctx *common.Context, series *ts.Series, steps int) ts.Val // Useful for filtering out a part of a series of data from a wider range of data. func timeSlice(ctx *common.Context, inputPath singlePathSpec, start string, end string) (ts.SeriesList, error) { var ( - now = time.Now() + now = time.Now() tzOffsetForAbsoluteTime time.Duration ) startTime, err := graphite.ParseTime(start, now, tzOffsetForAbsoluteTime) @@ -633,8 +633,8 @@ func lowestCurrent(_ *common.Context, input singlePathSpec, n int) (ts.SeriesLis type windowSizeFunc func(stepSize int) int type windowSizeParsed struct { - deltaValue time.Duration - stringValue string + deltaValue time.Duration + stringValue string windowSizeFunc windowSizeFunc } @@ -841,7 +841,7 @@ func exponentialMovingAverage(ctx *common.Context, input singlePathSpec, windowS curr := bootstrap.ValueAt(i + offset) if !math.IsNaN(curr) { // formula: ema(current) = constant * (Current Value) + (1 - constant) * ema(previous) - ema = emaConstant * curr + (1 - emaConstant) * ema + ema = emaConstant*curr + (1-emaConstant)*ema vals.SetValueAt(i, ema) } else { vals.SetValueAt(i, math.NaN()) @@ -1086,6 +1086,46 @@ func integral(ctx *common.Context, input singlePathSpec) (ts.SeriesList, error) return r, nil } +// integralByInterval will do the same as integral funcion, except it resets the total to 0 +// at the given time in the parameter “from”. Useful for finding totals per hour/day/week. +func integralByInterval(ctx *common.Context, input singlePathSpec, intervalString string) (ts.SeriesList, error) { + intervalUnit, err := common.ParseInterval(intervalString) + if err != nil { + return ts.NewSeriesList(), err + } + results := make([]*ts.Series, 0, len(input.Values)) + + for _, series := range input.Values { + var ( + stepsPerInterval = intervalUnit.Milliseconds() / int64(series.MillisPerStep()) + outVals = ts.NewValues(ctx, series.MillisPerStep(), series.Len()) + stepCounter int64 + currentSum float64 + ) + + for i := 0; i < series.Len(); i++ { + if stepCounter == stepsPerInterval { + // startNewInterval + stepCounter = 0 + currentSum = 0.0 + } + n := series.ValueAt(i) + if !math.IsNaN(n) { + currentSum += n + } + outVals.SetValueAt(i, currentSum) + stepCounter += 1 + } + + newName := fmt.Sprintf("integralByInterval(%s, %s)", series.Name(), intervalString) + results = append(results, ts.NewSeries(ctx, newName, series.StartTime(), outVals)) + } + + r := ts.SeriesList(input) + r.Values = results + return r, nil +} + // This is the opposite of the integral function. This is useful for taking a // running total metric and calculating the delta between subsequent data // points. @@ -1798,8 +1838,6 @@ func movingMinHelper(window []float64, vals ts.MutableValues, windowPoints int, } } - - func newMovingBinaryTransform( ctx *common.Context, input singlePathSpec, @@ -1830,7 +1868,7 @@ func newMovingBinaryTransform( bootstrapStartTime, bootstrapEndTime := ctx.StartTime.Add(-interval), ctx.StartTime return &binaryContextShifter{ - ContextShiftFunc: contextShiftingFn, + ContextShiftFunc: contextShiftingFn, BinaryTransformer: func(bootstrapped, original ts.SeriesList) (ts.SeriesList, error) { bootstrapList, err := combineBootstrapWithOriginal(ctx, bootstrapStartTime, bootstrapEndTime, @@ -1910,7 +1948,6 @@ func movingMin(ctx *common.Context, input singlePathSpec, windowSize genericInte return newMovingBinaryTransform(ctx, input, windowSize, "movingMin", movingMinHelper) } - // legendValue takes one metric or a wildcard seriesList and a string in quotes. // Appends a value to the metric name in the legend. Currently one or several of: // "last", "avg", "total", "min", "max". @@ -2145,6 +2182,7 @@ func init() { MustRegisterFunction(holtWintersForecast) MustRegisterFunction(identity) MustRegisterFunction(integral) + MustRegisterFunction(integralByInterval) MustRegisterFunction(isNonNull) MustRegisterFunction(keepLastValue).WithDefaultParams(map[uint8]interface{}{ 2: -1, // limit diff --git a/src/query/graphite/native/builtin_functions_test.go b/src/query/graphite/native/builtin_functions_test.go index cd7a4017ca..fc3d343572 100644 --- a/src/query/graphite/native/builtin_functions_test.go +++ b/src/query/graphite/native/builtin_functions_test.go @@ -643,8 +643,8 @@ func testMovingFunction(t *testing.T, target, expectedName string, values, boots } var ( - testGeneralFunctionStart = time.Now().Add(time.Minute * -11).Truncate(time.Minute) - testGeneralFunctionEnd = time.Now().Add(time.Minute * -3).Truncate(time.Minute) + testGeneralFunctionStart = time.Now().Add(time.Minute * -11).Truncate(time.Minute) + testGeneralFunctionEnd = time.Now().Add(time.Minute * -3).Truncate(time.Minute) ) // testGeneralFunction is a copy of testMovingFunction but without any logic for bootstrapping values @@ -654,8 +654,8 @@ func testGeneralFunction(t *testing.T, target, expectedName string, values, outp engine := NewEngine( &common.MovingFunctionStorage{ - StepMillis: 60000, - Values: values, + StepMillis: 60000, + Values: values, }, ) phonyContext := common.NewContext(common.ContextOptions{ @@ -694,11 +694,11 @@ func TestMovingAverageSuccess(t *testing.T) { func TestExponentialMovingAverageSuccess(t *testing.T) { tests := []struct { - target string + target string expectedName string - bootstrap []float64 - inputs []float64 - expected []float64 + bootstrap []float64 + inputs []float64 + expected []float64 }{ { "exponentialMovingAverage(foo.bar.baz, 3)", @@ -1823,6 +1823,40 @@ func TestIntegral(t *testing.T) { } } +/* + seriesList = self._gen_series_list_with_data(key='test',start=0,end=600,step=60,data=[None, 1, 2, 3, 4, 5, None, 6, 7, 8]) + expected = [TimeSeries("integralByInterval(test,'2min')", 0, 600, 60, [0, 1, 2, 5, 4, 9, 0, 6, 7, 15])] +*/ +func TestIntegralByInterval(t *testing.T) { + ctx := common.NewTestContext() + defer ctx.Close() + + invals := []float64{ + math.NaN(), 1, 2, 3, 4, 5, math.NaN(), 6, 7, 8, + } + + outvals := []float64{ + 0, 1, 2, 5, 4, 9, 0, 6, 7, 15, + } + + series := ts.NewSeries(ctx, "hello", time.Now(), + common.NewTestSeriesValues(ctx, 60000, invals)) + + r, err := integralByInterval(ctx, singlePathSpec{ + Values: []*ts.Series{series}, + }, "2min") + require.NoError(t, err) + + output := r.Values + require.Equal(t, 1, len(output)) + assert.Equal(t, "integralByInterval(hello, 2min)", output[0].Name()) + assert.Equal(t, series.StartTime(), output[0].StartTime()) + require.Equal(t, len(outvals), output[0].Len()) + for i, expected := range outvals { + xtest.Equalish(t, expected, output[0].ValueAt(i), "incorrect value at %d", i) + } +} + func TestDerivative(t *testing.T) { ctx := common.NewTestContext() defer ctx.Close() @@ -2983,8 +3017,8 @@ func TestDelay(t *testing.T) { } var ( - testDelayStart = time.Now().Truncate(time.Minute) - testDelayEnd = testMovingFunctionEnd.Add(time.Minute) + testDelayStart = time.Now().Truncate(time.Minute) + testDelayEnd = testMovingFunctionEnd.Add(time.Minute) ) func testDelay(t *testing.T, target, expectedName string, values, output []float64) { @@ -2993,8 +3027,8 @@ func testDelay(t *testing.T, target, expectedName string, values, output []float engine := NewEngine( &common.MovingFunctionStorage{ - StepMillis: 10000, - Values: values, + StepMillis: 10000, + Values: values, }, ) phonyContext := common.NewContext(common.ContextOptions{ @@ -3020,8 +3054,8 @@ func testDelay(t *testing.T, target, expectedName string, values, output []float } func TestTimeSlice(t *testing.T) { - values := []float64{math.NaN(),1.0,2.0,3.0,math.NaN(),5.0,6.0,math.NaN(),7.0,8.0,9.0} - expected := []float64{math.NaN(),math.NaN(),math.NaN(),3.0,math.NaN(),5.0,6.0,math.NaN(),7.0,math.NaN(),math.NaN()} + values := []float64{math.NaN(), 1.0, 2.0, 3.0, math.NaN(), 5.0, 6.0, math.NaN(), 7.0, 8.0, 9.0} + expected := []float64{math.NaN(), math.NaN(), math.NaN(), 3.0, math.NaN(), 5.0, 6.0, math.NaN(), 7.0, math.NaN(), math.NaN()} testGeneralFunction(t, "timeSlice(foo.bar.baz, '-9min','-3min')", "timeSlice(foo.bar.baz, -9min, -3min)", values, expected) } @@ -3134,6 +3168,7 @@ func TestFunctionsRegistered(t *testing.T) { "holtWintersForecast", "identity", "integral", + "integralByInterval", "isNonNull", "keepLastValue", "legendValue",