Skip to content

Commit

Permalink
[query] Implemented movingSum, movingMax, movingMin (graphite functio…
Browse files Browse the repository at this point in the history
…ns) (#2570)
  • Loading branch information
teddywahle authored Sep 14, 2020
1 parent e66e016 commit 07c034d
Show file tree
Hide file tree
Showing 4 changed files with 265 additions and 56 deletions.
47 changes: 47 additions & 0 deletions src/query/graphite/common/percentiles.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,53 @@ func SafeSort(input []float64) int {
return nans
}

// SafeSum returns the sum of the input slice the number of NaNs in the input.
func SafeSum(input []float64) (float64, int) {
nans := 0
sum := 0.0
for _, v := range input {
if !math.IsNaN(v) {
sum += v
} else {
nans += 1
}
}
return sum, nans
}

// SafeMax returns the maximum value of the input slice the number of NaNs in the input.
func SafeMax(input []float64) (float64, int) {
nans := 0
max := -math.MaxFloat64
for _, v := range input {
if math.IsNaN(v) {
nans++
continue
}
if v > max {
max = v
}
}
return max, nans
}

// SafeMin returns the minimum value of the input slice the number of NaNs in the input.
func SafeMin(input []float64) (float64, int) {
nans := 0
min := math.MaxFloat64
for _, v := range input {
if math.IsNaN(v) {
nans++
continue
}
if v < min {
min = v
}
}
return min, nans
}


// GetPercentile computes the percentile cut off for an array of floats
func GetPercentile(input []float64, percentile float64, interpolate bool) float64 {
nans := SafeSort(input)
Expand Down
187 changes: 134 additions & 53 deletions src/query/graphite/native/builtin_functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -1753,16 +1753,70 @@ func changed(ctx *common.Context, seriesList singlePathSpec) (ts.SeriesList, err
})
}

// movingMedian takes one metric or a wildcard seriesList followed by a a quoted string
// with a length of time like '1hour' or '5min'. Graphs the median of the preceding
// datapoints for each point on the graph. All previous datapoints are set to None at
// the beginning of the graph.
func movingMedian(ctx *common.Context, _ singlePathSpec, windowSize string) (*binaryContextShifter, error) {
interval, err := common.ParseInterval(windowSize)
// windowPointsLength calculates the number of window points in a interval
func windowPointsLength(series *ts.Series, interval time.Duration) int {
return int(interval / (time.Duration(series.MillisPerStep()) * time.Millisecond))
}

type movingImplementationFn func(window []float64, values ts.MutableValues, windowPoints int, i int)

// movingMedianHelper given a slice of floats, calculates the median and assigns it into vals as index i
func movingMedianHelper(window []float64, vals ts.MutableValues, windowPoints int, i int) {
nans := common.SafeSort(window)

if nans < windowPoints {
index := (windowPoints - nans) / 2
median := window[nans+index]
vals.SetValueAt(i, median)
}
}

// movingSumHelper given a slice of floats, calculates the sum and assigns it into vals as index i
func movingSumHelper(window []float64, vals ts.MutableValues, windowPoints int, i int) {
sum, nans := common.SafeSum(window)

if nans < windowPoints {
vals.SetValueAt(i, sum)
}
}

// movingMaxHelper given a slice of floats, finds the max and assigns it into vals as index i
func movingMaxHelper(window []float64, vals ts.MutableValues, windowPoints int, i int) {
max, nans := common.SafeMax(window)

if nans < windowPoints {
vals.SetValueAt(i, max)
}
}

// movingMinHelper given a slice of floats, finds the min and assigns it into vals as index i
func movingMinHelper(window []float64, vals ts.MutableValues, windowPoints int, i int) {
min, nans := common.SafeMin(window)

if nans < windowPoints {
vals.SetValueAt(i, min)
}
}



func newMovingBinaryTransform(
ctx *common.Context,
input singlePathSpec,
windowSizeValue genericInterface,
movingFunctionName string,
impl movingImplementationFn,
) (*binaryContextShifter, error) {
if len(input.Values) == 0 {
return nil, nil
}

windowSize, err := parseWindowSize(windowSizeValue, input)
if err != nil {
return nil, err
}

interval := windowSize.deltaValue
if interval <= 0 {
return nil, common.ErrInvalidIntervalFormat
}
Expand All @@ -1775,64 +1829,88 @@ func movingMedian(ctx *common.Context, _ singlePathSpec, windowSize string) (*bi
}

bootstrapStartTime, bootstrapEndTime := ctx.StartTime.Add(-interval), ctx.StartTime
transformerFn := func(bootstrapped, original ts.SeriesList) (ts.SeriesList, error) {
bootstrapList, err := combineBootstrapWithOriginal(ctx,
bootstrapStartTime, bootstrapEndTime,
bootstrapped, singlePathSpec(original))
if err != nil {
return ts.NewSeriesList(), err
}

results := make([]*ts.Series, 0, original.Len())
for i, bootstrap := range bootstrapList.Values {
series := original.Values[i]
windowPoints := int(interval / (time.Duration(series.MillisPerStep()) * time.Millisecond))
if windowPoints <= 0 {
err := errors.NewInvalidParamsError(fmt.Errorf(
"non positive window points, windowSize=%s, stepSize=%d",
windowSize, series.MillisPerStep()))
return &binaryContextShifter{
ContextShiftFunc: contextShiftingFn,
BinaryTransformer: func(bootstrapped, original ts.SeriesList) (ts.SeriesList, error) {
bootstrapList, err := combineBootstrapWithOriginal(ctx,
bootstrapStartTime, bootstrapEndTime,
bootstrapped, singlePathSpec(original))
if err != nil {
return ts.NewSeriesList(), err
}
window := make([]float64, windowPoints)
util.Memset(window, math.NaN())
numSteps := series.Len()
offset := bootstrap.Len() - numSteps
vals := ts.NewValues(ctx, series.MillisPerStep(), numSteps)
for i := 0; i < numSteps; i++ {
for j := i + offset - windowPoints; j < i+offset; j++ {
if j < 0 || j >= bootstrap.Len() {
continue
}

idx := j - i - offset + windowPoints
if idx < 0 || idx > len(window)-1 {
continue
}

window[idx] = bootstrap.ValueAt(j)
results := make([]*ts.Series, 0, original.Len())
maxWindowPoints := 0
for i, _ := range bootstrapList.Values {
series := original.Values[i]
windowPoints := windowPointsLength(series, interval)
if windowPoints <= 0 {
err := errors.NewInvalidParamsError(fmt.Errorf(
"non positive window points, windowSize=%s, stepSize=%d",
windowSize.stringValue, series.MillisPerStep()))
return ts.NewSeriesList(), err
}
nans := common.SafeSort(window)
if nans < windowPoints {
index := (windowPoints - nans) / 2
median := window[nans+index]
vals.SetValueAt(i, median)
if windowPoints > maxWindowPoints {
maxWindowPoints = windowPoints
}
}
name := fmt.Sprintf("movingMedian(%s,%q)", series.Name(), windowSize)
newSeries := ts.NewSeries(ctx, name, series.StartTime(), vals)
results = append(results, newSeries)
}

original.Values = results
return original, nil
}
windowPoints := make([]float64, maxWindowPoints)
for i, bootstrap := range bootstrapList.Values {
series := original.Values[i]
currWindowPoints := windowPointsLength(series, interval)
window := windowPoints[:currWindowPoints]
util.Memset(window, math.NaN())
numSteps := series.Len()
offset := bootstrap.Len() - numSteps
vals := ts.NewValues(ctx, series.MillisPerStep(), numSteps)
for i := 0; i < numSteps; i++ {
for j := i + offset - currWindowPoints; j < i+offset; j++ {
if j < 0 || j >= bootstrap.Len() {
continue
}

return &binaryContextShifter{
ContextShiftFunc: contextShiftingFn,
BinaryTransformer: transformerFn,
idx := j - i - offset + currWindowPoints
if idx < 0 || idx > len(window)-1 {
continue
}

window[idx] = bootstrap.ValueAt(j)
}
impl(window, vals, currWindowPoints, i)
}
name := fmt.Sprintf("%s(%s,%s)", movingFunctionName, series.Name(), windowSize.stringValue)
newSeries := ts.NewSeries(ctx, name, series.StartTime(), vals)
results = append(results, newSeries)
}

original.Values = results
return original, nil
},
}, nil
}

// movingMedian calculates the moving median of a metric (or metrics) over a time interval.
func movingMedian(ctx *common.Context, input singlePathSpec, windowSize genericInterface) (*binaryContextShifter, error) {
return newMovingBinaryTransform(ctx, input, windowSize, "movingMedian", movingMedianHelper)
}

// movingSum calculates the moving sum of a metric (or metrics) over a time interval.
func movingSum(ctx *common.Context, input singlePathSpec, windowSize genericInterface) (*binaryContextShifter, error) {
return newMovingBinaryTransform(ctx, input, windowSize, "movingSum", movingSumHelper)
}

// movingMax calculates the moving maximum of a metric (or metrics) over a time interval.
func movingMax(ctx *common.Context, input singlePathSpec, windowSize genericInterface) (*binaryContextShifter, error) {
return newMovingBinaryTransform(ctx, input, windowSize, "movingMax", movingMaxHelper)
}

// movingMin calculates the moving minimum of a metric (or metrics) over a time interval.
func movingMin(ctx *common.Context, input singlePathSpec, windowSize genericInterface) (*binaryContextShifter, error) {
return newMovingBinaryTransform(ctx, input, windowSize, "movingMin", movingMinHelper)
}


// legendValue takes one metric or a wildcard seriesList and a string in quotes.
// Appends a value to the metric name in the legend. Currently one or several of:
// "last", "avg", "total", "min", "max".
Expand Down Expand Up @@ -2084,6 +2162,9 @@ func init() {
MustRegisterFunction(mostDeviant)
MustRegisterFunction(movingAverage)
MustRegisterFunction(movingMedian)
MustRegisterFunction(movingSum)
MustRegisterFunction(movingMax)
MustRegisterFunction(movingMin)
MustRegisterFunction(multiplySeries)
MustRegisterFunction(nonNegativeDerivative).WithDefaultParams(map[uint8]interface{}{
2: math.NaN(), // maxValue
Expand Down
60 changes: 57 additions & 3 deletions src/query/graphite/native/builtin_functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -758,6 +758,57 @@ func TestMovingAverageError(t *testing.T) {
testMovingFunctionError(t, "movingAverage(foo.bar.baz, 0)")
}

func TestMovingSumSuccess(t *testing.T) {
values := []float64{12.0, 19.0, -10.0, math.NaN(), 10.0}
bootstrap := []float64{3.0, 4.0, 5.0}
expected := []float64{12.0, 21.0, 36.0, 21.0, 9.0} // (3+4+5), (4+5+12), (5+12+19), (12+19-10), (19-10+Nan)

testMovingFunction(t, "movingSum(foo.bar.baz, '30s')", "movingSum(foo.bar.baz,\"30s\")", values, bootstrap, expected)
testMovingFunction(t, "movingSum(foo.bar.baz, '30s')", "movingSum(foo.bar.baz,3)", nil, nil, nil)

bootstrapEntireSeries := []float64{3.0, 4.0, 5.0, 12.0, 19.0, -10.0, math.NaN(), 10.0}
testMovingFunction(t, "movingSum(foo.bar.baz, '30s')", "movingSum(foo.bar.baz,\"30s\")", values, bootstrapEntireSeries, expected)
}

func TestMovingSumError(t *testing.T) {
testMovingFunctionError(t, "movingSum(foo.bar.baz, '-30s')")
testMovingFunctionError(t, "movingSum(foo.bar.baz, 0)")
}

func TestMovingMaxSuccess(t *testing.T) {
values := []float64{12.0, 19.0, -10.0, math.NaN(), 10.0}
bootstrap := []float64{3.0, 4.0, 5.0}
expected := []float64{5.0, 12.0, 19.0, 19.0, 19.0} // max(3,4,5), max(4,5,12), max(5,12,19), max(12,19,10), max(19,-10,NaN)

testMovingFunction(t, "movingMax(foo.bar.baz, '30s')", "movingMax(foo.bar.baz,\"30s\")", values, bootstrap, expected)
testMovingFunction(t, "movingMax(foo.bar.baz, '30s')", "movingMax(foo.bar.baz,3)", nil, nil, nil)

bootstrapEntireSeries := []float64{3.0, 4.0, 5.0, 12.0, 19.0, -10.0, math.NaN(), 10.0}
testMovingFunction(t, "movingMax(foo.bar.baz, '30s')", "movingMax(foo.bar.baz,\"30s\")", values, bootstrapEntireSeries, expected)
}

func TestMovingMaxError(t *testing.T) {
testMovingFunctionError(t, "movingMax(foo.bar.baz, '-30s')")
testMovingFunctionError(t, "movingMax(foo.bar.baz, 0)")
}

func TestMovingMinSuccess(t *testing.T) {
values := []float64{12.0, 19.0, -10.0, math.NaN(), 10.0}
bootstrap := []float64{3.0, 4.0, 5.0}
expected := []float64{3.0, 4.0, 5.0, -10.0, -10.0} // min(3,4,5), min(4,5,12), min(5,12,19), min(12,19,-10), min(19,-10,NaN)

testMovingFunction(t, "movingMin(foo.bar.baz, '30s')", "movingMin(foo.bar.baz,\"30s\")", values, bootstrap, expected)
testMovingFunction(t, "movingMin(foo.bar.baz, '30s')", "movingMin(foo.bar.baz,3)", nil, nil, nil)

bootstrapEntireSeries := []float64{3.0, 4.0, 5.0, 12.0, 19.0, -10.0, math.NaN(), 10.0}
testMovingFunction(t, "movingMin(foo.bar.baz, '30s')", "movingMin(foo.bar.baz,\"30s\")", values, bootstrapEntireSeries, expected)
}

func TestMovingMinError(t *testing.T) {
testMovingFunctionError(t, "movingMin(foo.bar.baz, '-30s')")
testMovingFunctionError(t, "movingMin(foo.bar.baz, 0)")
}

func TestIsNonNull(t *testing.T) {
ctx := common.NewTestContext()
defer ctx.Close()
Expand Down Expand Up @@ -2620,14 +2671,14 @@ func TestMovingMedianInvalidLimits(t *testing.T) {
func TestMovingMismatchedLimits(t *testing.T) {
// NB: this tests the behavior when query limits do not snap exactly to data
// points. When limits do not snap exactly, the first point should be omitted.
for _, fn := range []string{"movingAverage", "movingMedian"} {
for _, fn := range []string{"movingAverage", "movingMedian", "movingSum", "movingMax", "movingMin"} {
for i := time.Duration(0); i < time.Minute; i += time.Second {
testMovingAverageInvalidLimits(t, fn, i)
testMovingFunctionInvalidLimits(t, fn, i)
}
}
}

func testMovingAverageInvalidLimits(t *testing.T, fn string, offset time.Duration) {
func testMovingFunctionInvalidLimits(t *testing.T, fn string, offset time.Duration) {
ctrl := xgomock.NewController(t)
defer ctrl.Finish()

Expand Down Expand Up @@ -3099,6 +3150,9 @@ func TestFunctionsRegistered(t *testing.T) {
"mostDeviant",
"movingAverage",
"movingMedian",
"movingSum",
"movingMax",
"movingMin",
"multiplySeries",
"nonNegativeDerivative",
"nPercentile",
Expand Down
27 changes: 27 additions & 0 deletions src/query/graphite/native/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,3 +207,30 @@ func TestTracing(t *testing.T) {
assert.Equal(t, expected.Outputs, trace.Outputs, "incorrect outputs for trace %d", i)
}
}

func buildEmptyTestSeriesFn() func(context.Context, string, storage.FetchOptions) (*storage.FetchResult, error) {
return func(_ context.Context, q string, opts storage.FetchOptions) (*storage.FetchResult, error) {
series := make([]*ts.Series, 0, 0)
return &storage.FetchResult{SeriesList: series}, nil
}
}

func TestNilBinaryContextShifter(t *testing.T) {
ctrl := xgomock.NewController(t)
defer ctrl.Finish()

store := storage.NewMockStorage(ctrl)

engine := NewEngine(store)

ctx := common.NewContext(common.ContextOptions{Start: time.Now().Add(-1 * time.Hour), End: time.Now(), Engine: engine})

store.EXPECT().FetchByQuery(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
buildEmptyTestSeriesFn()).AnyTimes()

expr, err := engine.Compile("movingSum(foo.bar.q.zed, 30s)")
require.NoError(t, err)

_, err = expr.Execute(ctx)
require.NoError(t, err)
}

0 comments on commit 07c034d

Please sign in to comment.