Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[query] Implemented movingSum, movingMax, movingMin (graphite functions) #2570

Merged
merged 28 commits into from
Sep 14, 2020
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
4e901d7
Added the moving movingMin function
teddywahle Aug 27, 2020
f42685a
Merge branch 'master' into twahle-moving-min
teddywahle Aug 27, 2020
f0df4ce
worked on moving min
teddywahle Aug 27, 2020
bef6377
Merge branch 'twahle-moving-min' of https://github.com/teddywahle/m3 …
teddywahle Aug 27, 2020
8ce6dc3
more work on the moving min func
teddywahle Aug 27, 2020
30ef946
Merge branch 'master' into twahle-moving-min
teddywahle Aug 28, 2020
cf9288f
updated movingMedian
teddywahle Aug 28, 2020
50b0ae9
Merge branch 'twahle-moving-min' of https://github.com/teddywahle/m3 …
teddywahle Aug 28, 2020
1ebe629
Apply suggestions from code review
teddywahle Aug 28, 2020
ebaafce
testMovingFunction
teddywahle Aug 28, 2020
2a0643c
wrote test movingSum function
teddywahle Aug 28, 2020
53222b9
Merge branch 'master' into twahle-moving-min
teddywahle Aug 31, 2020
8c2b6bf
updated tests
teddywahle Aug 31, 2020
01792e7
Fixed up tests
teddywahle Aug 31, 2020
27f5d53
Merge branch 'master' into graphite-moving-sum
teddywahle Aug 31, 2020
b9e6a92
Merge branch 'master' into graphite-moving-sum
teddywahle Sep 1, 2020
91b2778
refactored movingMedian to reuse common code
theodorewahle Sep 1, 2020
c024c85
refactored movingMedian to allow creation of movingMax and movingMin
teddywahle Sep 1, 2020
a2698c2
added function comments
teddywahle Sep 1, 2020
7c1129d
Merge branch 'master' into graphite-moving-sum
teddywahle Sep 2, 2020
9c070b8
Merge branch 'master' into graphite-moving-sum
teddywahle Sep 3, 2020
a0807a4
Merge branch 'master' into graphite-moving-sum
teddywahle Sep 4, 2020
9877de0
Added pointer changes
teddywahle Sep 4, 2020
86e9b6a
Merge branch 'master' into graphite-moving-sum
teddywahle Sep 7, 2020
3dbc68e
Update src/query/graphite/common/percentiles.go
teddywahle Sep 7, 2020
2124c31
Merge branch 'master' into graphite-moving-sum
teddywahle Sep 8, 2020
3a201d6
Added testcase to engine_test
teddywahle Sep 8, 2020
1847586
Merge branch 'master' into graphite-moving-sum
teddywahle Sep 10, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions src/query/graphite/common/percentiles.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,53 @@ func SafeSort(input []float64) int {
return nans
}

// SafeSum returns the sum of the non-NaN values in input together with the
// number of NaN values that were skipped. An empty or all-NaN input yields a
// sum of 0.
func SafeSum(input []float64) (float64, int) {
	var (
		sum  float64
		nans int
	)
	for _, v := range input {
		// NaN would poison the running total, so count it and move on.
		if math.IsNaN(v) {
			nans++
			continue
		}
		sum += v
	}
	return sum, nans
}

// SafeMax returns the maximum value of the input slice and the number of NaNs in the input.
func SafeMax(input []float64) (float64, int) {
nans := 0
max := -math.MaxFloat64
for _, v := range input {
if !math.IsNaN(v) {
if v > max {
max = v
}
teddywahle marked this conversation as resolved.
Show resolved Hide resolved
} else {
nans += 1
}
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Go prefers to limit indentation wherever possible (since it's relative to complexity of the statement). Can simplify loop by following's advice of Effective Go "In the Go libraries, you'll find that when an if statement doesn't flow into the next statement—that is, the body ends in break, continue, goto, or return—the unnecessary else is omitted.":
https://golang.org/doc/effective_go.html#control-structures

e.g.

for _, v := range input {
  if math.IsNaN(v) {
    nans++
    continue
  }
  if v > max {
    max = v
  }
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done!

return max, nans
}

// SafeMin returns the minimum value of the input slice and the number of NaNs in the input.
func SafeMin(input []float64) (float64, int) {
nans := 0
min := math.MaxFloat64
for _, v := range input {
if !math.IsNaN(v) {
if v < min {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Go prefers to limit indentation wherever possible (since it's relative to complexity of the statement). Can simplify loop by following's advice of Effective Go "In the Go libraries, you'll find that when an if statement doesn't flow into the next statement—that is, the body ends in break, continue, goto, or return—the unnecessary else is omitted.":
https://golang.org/doc/effective_go.html#control-structures

e.g.

for _, v := range input {
  if math.IsNaN(v) {
    nans++
    continue
  }
  if v < min {
    min = v
  }
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done!

min = v
}
} else {
nans += 1
}
}
return min, nans
}


// GetPercentile computes the percentile cut off for an array of floats
func GetPercentile(input []float64, percentile float64, interpolate bool) float64 {
nans := SafeSort(input)
Expand Down
187 changes: 134 additions & 53 deletions src/query/graphite/native/builtin_functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -1591,16 +1591,70 @@ func changed(ctx *common.Context, seriesList singlePathSpec) (ts.SeriesList, err
})
}

// movingMedian takes one metric or a wildcard seriesList followed by a quoted string
// with a length of time like '1hour' or '5min'. Graphs the median of the preceding
// datapoints for each point on the graph. All previous datapoints are set to None at
// the beginning of the graph.
func movingMedian(ctx *common.Context, _ singlePathSpec, windowSize string) (*binaryContextShifter, error) {
interval, err := common.ParseInterval(windowSize)
// windowPointsLength returns the number of whole series steps that fit into
// interval, i.e. how many datapoints make up one moving window.
//
// A series whose step is zero yields 0 instead of panicking on an integer
// division by zero; newMovingBinaryTransform already rejects a non-positive
// window point count with an invalid-params error.
func windowPointsLength(series *ts.Series, interval time.Duration) int {
	step := time.Duration(series.MillisPerStep()) * time.Millisecond
	if step == 0 {
		return 0
	}
	return int(interval / step)
}

// movingImplementationFn computes a single output point of a moving-window
// function. It receives the window of values gathered for output index i and
// the number of points in that window, and stores its result in values at
// index i. The implementations in this file skip the write when every window
// entry is NaN.
type movingImplementationFn func(window []float64, values ts.MutableValues, windowPoints int, i int)

// movingMedianHelper stores the median of window into vals at index i. When
// all windowPoints entries are NaN nothing is written. For an even number of
// non-NaN entries the upper of the two middle values is used.
func movingMedianHelper(window []float64, vals ts.MutableValues, windowPoints int, i int) {
	nanCount := common.SafeSort(window)
	if nanCount >= windowPoints {
		return
	}

	// Step over the NaN entries and take the middle of what remains.
	// NOTE(review): relies on SafeSort ordering window in place with the NaNs
	// ahead of the numeric values — confirm against its implementation.
	middle := nanCount + (windowPoints-nanCount)/2
	vals.SetValueAt(i, window[middle])
}

// movingSumHelper stores the sum of the non-NaN entries of window into vals at
// index i. When all windowPoints entries are NaN nothing is written.
func movingSumHelper(window []float64, vals ts.MutableValues, windowPoints int, i int) {
	total, nanCount := common.SafeSum(window)
	if nanCount >= windowPoints {
		return
	}
	vals.SetValueAt(i, total)
}

// movingMaxHelper stores the largest non-NaN entry of window into vals at
// index i. When all windowPoints entries are NaN nothing is written.
func movingMaxHelper(window []float64, vals ts.MutableValues, windowPoints int, i int) {
	highest, nanCount := common.SafeMax(window)
	if nanCount >= windowPoints {
		return
	}
	vals.SetValueAt(i, highest)
}

// movingMinHelper stores the smallest non-NaN entry of window into vals at
// index i. When all windowPoints entries are NaN nothing is written.
func movingMinHelper(window []float64, vals ts.MutableValues, windowPoints int, i int) {
	lowest, nanCount := common.SafeMin(window)
	if nanCount >= windowPoints {
		return
	}
	vals.SetValueAt(i, lowest)
}



func newMovingBinaryTransform(
ctx *common.Context,
input singlePathSpec,
windowSizeValue genericInterface,
movingFunctionName string,
impl movingImplementationFn,
) (*binaryContextShifter, error) {
if len(input.Values) == 0 {
return nil, nil
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add some explicit tests to make sure this case is handled ok? Would add a test case to engine_test.go and inside TestExecute to make sure a binary transform correctly is handled if a nil binary context shifter is returned (i.e. send input values of zero).

I just spent 10-20mins finding if it was or not and not 100% confident about it. I see this which implies it should be:

	// context shifter ptr is nil, nothing to do here, return empty series.
	if result.IsNil() {
		return reflect.ValueOf(ts.NewSeriesList()), nil
	}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK.

So you said you want me to add "a test case to engine_test.go and inside TestExecute"

i added the test case to engine_test.go

but i dont quite understand your wording on the second part. were you saying that you also wanted me to modify TestExecute to test whether or not a binary transform is correctly handled if a nil binary context shifter is returned? why would I need to test for that in two places?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

resolved offline. Rob said this test was enough.

}

windowSize, err := parseWindowSize(windowSizeValue, input)
if err != nil {
return nil, err
}

interval := windowSize.deltaValue
if interval <= 0 {
return nil, common.ErrInvalidIntervalFormat
}
Expand All @@ -1613,64 +1667,88 @@ func movingMedian(ctx *common.Context, _ singlePathSpec, windowSize string) (*bi
}

bootstrapStartTime, bootstrapEndTime := ctx.StartTime.Add(-interval), ctx.StartTime
transformerFn := func(bootstrapped, original ts.SeriesList) (ts.SeriesList, error) {
bootstrapList, err := combineBootstrapWithOriginal(ctx,
bootstrapStartTime, bootstrapEndTime,
bootstrapped, singlePathSpec(original))
if err != nil {
return ts.NewSeriesList(), err
}

results := make([]*ts.Series, 0, original.Len())
for i, bootstrap := range bootstrapList.Values {
series := original.Values[i]
windowPoints := int(interval / (time.Duration(series.MillisPerStep()) * time.Millisecond))
if windowPoints <= 0 {
err := errors.NewInvalidParamsError(fmt.Errorf(
"non positive window points, windowSize=%s, stepSize=%d",
windowSize, series.MillisPerStep()))
return &binaryContextShifter{
ContextShiftFunc: contextShiftingFn,
BinaryTransformer: func(bootstrapped, original ts.SeriesList) (ts.SeriesList, error) {
bootstrapList, err := combineBootstrapWithOriginal(ctx,
bootstrapStartTime, bootstrapEndTime,
bootstrapped, singlePathSpec(original))
if err != nil {
return ts.NewSeriesList(), err
}
window := make([]float64, windowPoints)
util.Memset(window, math.NaN())
numSteps := series.Len()
offset := bootstrap.Len() - numSteps
vals := ts.NewValues(ctx, series.MillisPerStep(), numSteps)
for i := 0; i < numSteps; i++ {
for j := i + offset - windowPoints; j < i+offset; j++ {
if j < 0 || j >= bootstrap.Len() {
continue
}

idx := j - i - offset + windowPoints
if idx < 0 || idx > len(window)-1 {
continue
}

window[idx] = bootstrap.ValueAt(j)
results := make([]*ts.Series, 0, original.Len())
maxWindowPoints := 0
for i, _ := range bootstrapList.Values {
series := original.Values[i]
windowPoints := windowPointsLength(series, interval)
if windowPoints <= 0 {
err := errors.NewInvalidParamsError(fmt.Errorf(
"non positive window points, windowSize=%s, stepSize=%d",
windowSize.stringValue, series.MillisPerStep()))
return ts.NewSeriesList(), err
}
nans := common.SafeSort(window)
if nans < windowPoints {
index := (windowPoints - nans) / 2
median := window[nans+index]
vals.SetValueAt(i, median)
if windowPoints > maxWindowPoints {
maxWindowPoints = windowPoints
}
}
name := fmt.Sprintf("movingMedian(%s,%q)", series.Name(), windowSize)
newSeries := ts.NewSeries(ctx, name, series.StartTime(), vals)
results = append(results, newSeries)
}

original.Values = results
return original, nil
}
windowPoints := make([]float64, maxWindowPoints)
for i, bootstrap := range bootstrapList.Values {
series := original.Values[i]
currWindowPoints := windowPointsLength(series, interval)
window := windowPoints[:currWindowPoints]
util.Memset(window, math.NaN())
numSteps := series.Len()
offset := bootstrap.Len() - numSteps
vals := ts.NewValues(ctx, series.MillisPerStep(), numSteps)
for i := 0; i < numSteps; i++ {
for j := i + offset - currWindowPoints; j < i+offset; j++ {
if j < 0 || j >= bootstrap.Len() {
continue
}

return &binaryContextShifter{
ContextShiftFunc: contextShiftingFn,
BinaryTransformer: transformerFn,
idx := j - i - offset + currWindowPoints
if idx < 0 || idx > len(window)-1 {
continue
}

window[idx] = bootstrap.ValueAt(j)
}
impl(window, vals, currWindowPoints, i)
}
name := fmt.Sprintf("%s(%s,%s)", movingFunctionName, series.Name(), windowSize.stringValue)
newSeries := ts.NewSeries(ctx, name, series.StartTime(), vals)
results = append(results, newSeries)
}

original.Values = results
return original, nil
},
}, nil
}

// movingMedian calculates the moving median of a metric (or metrics) over a time interval.
// It delegates to newMovingBinaryTransform with movingMedianHelper as the
// per-window implementation; result series are named
// "movingMedian(<series>,<windowSize>)". A nil shifter and nil error are
// returned when input contains no series.
func movingMedian(ctx *common.Context, input singlePathSpec, windowSize genericInterface) (*binaryContextShifter, error) {
return newMovingBinaryTransform(ctx, input, windowSize, "movingMedian", movingMedianHelper)
}

// movingSum calculates the moving sum of a metric (or metrics) over a time interval.
// It delegates to newMovingBinaryTransform with movingSumHelper as the
// per-window implementation, so NaN datapoints are left out of each sum;
// result series are named "movingSum(<series>,<windowSize>)". A nil shifter
// and nil error are returned when input contains no series.
func movingSum(ctx *common.Context, input singlePathSpec, windowSize genericInterface) (*binaryContextShifter, error) {
return newMovingBinaryTransform(ctx, input, windowSize, "movingSum", movingSumHelper)
}

// movingMax calculates the moving maximum of a metric (or metrics) over a time interval.
// It delegates to newMovingBinaryTransform with movingMaxHelper as the
// per-window implementation, so NaN datapoints are ignored when picking the
// maximum; result series are named "movingMax(<series>,<windowSize>)". A nil
// shifter and nil error are returned when input contains no series.
func movingMax(ctx *common.Context, input singlePathSpec, windowSize genericInterface) (*binaryContextShifter, error) {
return newMovingBinaryTransform(ctx, input, windowSize, "movingMax", movingMaxHelper)
}

// movingMin calculates the moving minimum of a metric (or metrics) over a time interval.
// It delegates to newMovingBinaryTransform with movingMinHelper as the
// per-window implementation, so NaN datapoints are ignored when picking the
// minimum; result series are named "movingMin(<series>,<windowSize>)". A nil
// shifter and nil error are returned when input contains no series.
func movingMin(ctx *common.Context, input singlePathSpec, windowSize genericInterface) (*binaryContextShifter, error) {
return newMovingBinaryTransform(ctx, input, windowSize, "movingMin", movingMinHelper)
}


// legendValue takes one metric or a wildcard seriesList and a string in quotes.
// Appends a value to the metric name in the legend. Currently one or several of:
// "last", "avg", "total", "min", "max".
Expand Down Expand Up @@ -1920,6 +1998,9 @@ func init() {
MustRegisterFunction(mostDeviant)
MustRegisterFunction(movingAverage)
MustRegisterFunction(movingMedian)
MustRegisterFunction(movingSum)
MustRegisterFunction(movingMax)
MustRegisterFunction(movingMin)
MustRegisterFunction(multiplySeries)
MustRegisterFunction(nonNegativeDerivative).WithDefaultParams(map[uint8]interface{}{
2: math.NaN(), // maxValue
Expand Down
60 changes: 57 additions & 3 deletions src/query/graphite/native/builtin_functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -685,6 +685,57 @@ func TestMovingAverageError(t *testing.T) {
testMovingFunctionError(t, "movingAverage(foo.bar.baz, 0)")
}

// TestMovingSumSuccess runs movingSum with a '30s' window over a series that
// contains a NaN, first with three bootstrap points and then with bootstrap
// data that also includes the series itself. Each expected point is the sum of
// the three preceding datapoints, with NaN left out.
func TestMovingSumSuccess(t *testing.T) {
	const (
		target     = "movingSum(foo.bar.baz, '30s')"
		seriesName = "movingSum(foo.bar.baz,\"30s\")"
	)
	var (
		input   = []float64{12.0, 19.0, -10.0, math.NaN(), 10.0}
		history = []float64{3.0, 4.0, 5.0}
		// (3+4+5), (4+5+12), (5+12+19), (12+19-10), (19-10+NaN)
		want = []float64{12.0, 21.0, 36.0, 21.0, 9.0}
	)

	testMovingFunction(t, target, seriesName, input, history, want)
	// NOTE(review): the nil slices appear to exercise the empty-input path —
	// confirm against testMovingFunction.
	testMovingFunction(t, target, "movingSum(foo.bar.baz,3)", nil, nil, nil)

	fullHistory := []float64{3.0, 4.0, 5.0, 12.0, 19.0, -10.0, math.NaN(), 10.0}
	testMovingFunction(t, target, seriesName, input, fullHistory, want)
}

// TestMovingSumError feeds movingSum a negative and a zero window size through
// testMovingFunctionError (presumably asserting that evaluation fails).
func TestMovingSumError(t *testing.T) {
	for _, target := range []string{
		"movingSum(foo.bar.baz, '-30s')",
		"movingSum(foo.bar.baz, 0)",
	} {
		testMovingFunctionError(t, target)
	}
}

// TestMovingMaxSuccess runs movingMax with a '30s' window over a series that
// contains a NaN, first with three bootstrap points and then with bootstrap
// data that also includes the series itself. Each expected point is the
// maximum of the three preceding datapoints, with NaN ignored.
func TestMovingMaxSuccess(t *testing.T) {
	const (
		target     = "movingMax(foo.bar.baz, '30s')"
		seriesName = "movingMax(foo.bar.baz,\"30s\")"
	)
	var (
		input   = []float64{12.0, 19.0, -10.0, math.NaN(), 10.0}
		history = []float64{3.0, 4.0, 5.0}
		// max(3,4,5), max(4,5,12), max(5,12,19), max(12,19,-10), max(19,-10,NaN)
		want = []float64{5.0, 12.0, 19.0, 19.0, 19.0}
	)

	testMovingFunction(t, target, seriesName, input, history, want)
	// NOTE(review): the nil slices appear to exercise the empty-input path —
	// confirm against testMovingFunction.
	testMovingFunction(t, target, "movingMax(foo.bar.baz,3)", nil, nil, nil)

	fullHistory := []float64{3.0, 4.0, 5.0, 12.0, 19.0, -10.0, math.NaN(), 10.0}
	testMovingFunction(t, target, seriesName, input, fullHistory, want)
}

// TestMovingMaxError feeds movingMax a negative and a zero window size through
// testMovingFunctionError (presumably asserting that evaluation fails).
func TestMovingMaxError(t *testing.T) {
	for _, target := range []string{
		"movingMax(foo.bar.baz, '-30s')",
		"movingMax(foo.bar.baz, 0)",
	} {
		testMovingFunctionError(t, target)
	}
}

// TestMovingMinSuccess runs movingMin with a '30s' window over a series that
// contains a NaN, first with three bootstrap points and then with bootstrap
// data that also includes the series itself. Each expected point is the
// minimum of the three preceding datapoints, with NaN ignored.
func TestMovingMinSuccess(t *testing.T) {
	const (
		target     = "movingMin(foo.bar.baz, '30s')"
		seriesName = "movingMin(foo.bar.baz,\"30s\")"
	)
	var (
		input   = []float64{12.0, 19.0, -10.0, math.NaN(), 10.0}
		history = []float64{3.0, 4.0, 5.0}
		// min(3,4,5), min(4,5,12), min(5,12,19), min(12,19,-10), min(19,-10,NaN)
		want = []float64{3.0, 4.0, 5.0, -10.0, -10.0}
	)

	testMovingFunction(t, target, seriesName, input, history, want)
	// NOTE(review): the nil slices appear to exercise the empty-input path —
	// confirm against testMovingFunction.
	testMovingFunction(t, target, "movingMin(foo.bar.baz,3)", nil, nil, nil)

	fullHistory := []float64{3.0, 4.0, 5.0, 12.0, 19.0, -10.0, math.NaN(), 10.0}
	testMovingFunction(t, target, seriesName, input, fullHistory, want)
}

// TestMovingMinError feeds movingMin a negative and a zero window size through
// testMovingFunctionError (presumably asserting that evaluation fails).
func TestMovingMinError(t *testing.T) {
	for _, target := range []string{
		"movingMin(foo.bar.baz, '-30s')",
		"movingMin(foo.bar.baz, 0)",
	} {
		testMovingFunctionError(t, target)
	}
}

func TestIsNonNull(t *testing.T) {
ctx := common.NewTestContext()
defer ctx.Close()
Expand Down Expand Up @@ -2547,14 +2598,14 @@ func TestMovingMedianInvalidLimits(t *testing.T) {
func TestMovingMismatchedLimits(t *testing.T) {
// NB: this tests the behavior when query limits do not snap exactly to data
// points. When limits do not snap exactly, the first point should be omitted.
for _, fn := range []string{"movingAverage", "movingMedian"} {
for _, fn := range []string{"movingAverage", "movingMedian", "movingSum", "movingMax", "movingMin"} {
for i := time.Duration(0); i < time.Minute; i += time.Second {
testMovingAverageInvalidLimits(t, fn, i)
testMovingFunctionInvalidLimits(t, fn, i)
}
}
}

func testMovingAverageInvalidLimits(t *testing.T, fn string, offset time.Duration) {
func testMovingFunctionInvalidLimits(t *testing.T, fn string, offset time.Duration) {
ctrl := xgomock.NewController(t)
defer ctrl.Finish()

Expand Down Expand Up @@ -2961,6 +3012,9 @@ func TestFunctionsRegistered(t *testing.T) {
"mostDeviant",
"movingAverage",
"movingMedian",
"movingSum",
"movingMax",
"movingMin",
"multiplySeries",
"nonNegativeDerivative",
"nPercentile",
Expand Down