Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[query] Implemented movingSum, movingMax, movingMin (graphite functions) #2570

Merged
merged 28 commits into from
Sep 14, 2020
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
4e901d7
Added the moving movingMin function
teddywahle Aug 27, 2020
f42685a
Merge branch 'master' into twahle-moving-min
teddywahle Aug 27, 2020
f0df4ce
worked on moving min
teddywahle Aug 27, 2020
bef6377
Merge branch 'twahle-moving-min' of https://github.com/teddywahle/m3 …
teddywahle Aug 27, 2020
8ce6dc3
more work on the moving min func
teddywahle Aug 27, 2020
30ef946
Merge branch 'master' into twahle-moving-min
teddywahle Aug 28, 2020
cf9288f
updated movingMedian
teddywahle Aug 28, 2020
50b0ae9
Merge branch 'twahle-moving-min' of https://github.com/teddywahle/m3 …
teddywahle Aug 28, 2020
1ebe629
Apply suggestions from code review
teddywahle Aug 28, 2020
ebaafce
testMovingFunction
teddywahle Aug 28, 2020
2a0643c
wrote test movingSum function
teddywahle Aug 28, 2020
53222b9
Merge branch 'master' into twahle-moving-min
teddywahle Aug 31, 2020
8c2b6bf
updated tests
teddywahle Aug 31, 2020
01792e7
Fixed up tests
teddywahle Aug 31, 2020
27f5d53
Merge branch 'master' into graphite-moving-sum
teddywahle Aug 31, 2020
b9e6a92
Merge branch 'master' into graphite-moving-sum
teddywahle Sep 1, 2020
91b2778
refactored movingMedian to reuse common code
theodorewahle Sep 1, 2020
c024c85
refactored movingMedian to allow creation of movingMax and movingMin
teddywahle Sep 1, 2020
a2698c2
added function comments
teddywahle Sep 1, 2020
7c1129d
Merge branch 'master' into graphite-moving-sum
teddywahle Sep 2, 2020
9c070b8
Merge branch 'master' into graphite-moving-sum
teddywahle Sep 3, 2020
a0807a4
Merge branch 'master' into graphite-moving-sum
teddywahle Sep 4, 2020
9877de0
Added pointer changes
teddywahle Sep 4, 2020
86e9b6a
Merge branch 'master' into graphite-moving-sum
teddywahle Sep 7, 2020
3dbc68e
Update src/query/graphite/common/percentiles.go
teddywahle Sep 7, 2020
2124c31
Merge branch 'master' into graphite-moving-sum
teddywahle Sep 8, 2020
3a201d6
Added testcase to engine_test
teddywahle Sep 8, 2020
1847586
Merge branch 'master' into graphite-moving-sum
teddywahle Sep 10, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/query/graphite/common/percentiles.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,16 @@ func SafeSort(input []float64) int {
return nans
}

func SafeSum(input []float64) float64 {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Need comments for exported methods.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added

sum := 0.0
for _, v := range input {
if !math.IsNaN(v) {
sum += v
}
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Go prefers to limit indentation wherever possible (since it's relative to complexity of the statement). Can simplify loop by following's advice of Effective Go "In the Go libraries, you'll find that when an if statement doesn't flow into the next statement—that is, the body ends in break, continue, goto, or return—the unnecessary else is omitted.":
https://golang.org/doc/effective_go.html#control-structures

e.g.

for _, v := range input {
  if math.IsNaN(v) {
    nans++
    continue
  }
  if v > max {
    max = v
  }
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done!

return sum
}

// GetPercentile computes the percentile cut off for an array of floats
func GetPercentile(input []float64, percentile float64, interpolate bool) float64 {
nans := SafeSort(input)
Expand Down
78 changes: 78 additions & 0 deletions src/query/graphite/native/builtin_functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -1658,6 +1658,83 @@ func movingMedian(ctx *common.Context, _ singlePathSpec, windowSize string) (*bi
}, nil
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Remove empty line here, can check err immediately.

// movingSum takes one metric or a wildcard seriesList followed by a a quoted string
// with a length of time like '1hour' or '5min'. Graphs the sum of the preceding
// datapoints for each point on the graph. All previous datapoints are set to None at
// the beginning of the graph.
func movingSum(ctx *common.Context, _ singlePathSpec, windowSize string) (*binaryContextShifter, error) {
interval, err := common.ParseInterval(windowSize)
if err != nil {
return nil, err
}

if interval <= 0 {
return nil, common.ErrInvalidIntervalFormat
}

contextShiftingFn := func(c *common.Context) *common.Context {
opts := common.NewChildContextOptions()
opts.AdjustTimeRange(0, 0, interval, 0)
childCtx := c.NewChildContext(opts)
return childCtx
}

bootstrapStartTime, bootstrapEndTime := ctx.StartTime.Add(-interval), ctx.StartTime
transformerFn := func(bootstrapped, original ts.SeriesList) (ts.SeriesList, error) {
bootstrapList, err := combineBootstrapWithOriginal(ctx,
bootstrapStartTime, bootstrapEndTime,
bootstrapped, singlePathSpec(original))
if err != nil {
return ts.NewSeriesList(), err
}

results := make([]*ts.Series, 0, original.Len())

for i, bootstrap := range bootstrapList.Values {
series := original.Values[i]
windowPoints := int(interval / (time.Duration(series.MillisPerStep()) * time.Millisecond))
if windowPoints <= 0 {
err := errors.NewInvalidParamsError(fmt.Errorf(
"non positive window points, windowSize=%s, stepSize=%d",
windowSize, series.MillisPerStep()))
return ts.NewSeriesList(), err
}
window := make([]float64, windowPoints)
Copy link
Collaborator

@robskillington robskillington Aug 31, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Would probably loop through all series before this loop and find the "max window points" required by a series, then reuse that in the inner loop so we don't need to allocate this intermediate large slice per series.

Perhaps we can update movingAverage and movingMedian to do that too?

e.g.

	results := make([]*ts.Series, 0, original.Len())
	maxWindowPoints := 0
	for i, bootstrap := range bootstrapList.Values {
		series := original.Values[i]
		windowPoints := windowPointsLength(series, interval)
		if windowPoints > maxWindowPoints {
			maxWindowPoints = windowPoints
		}
	}
	
	windowPoints := make([]float64, maxWindowPoints)
	for i, bootstrap := range bootstrapList.Values {
		series := original.Values[i]
		currWindowPoints := windowPointsLength(series, interval)
		window := windowPoints[:currWindowPoints]
		util.Memset(window, math.NaN())
		// .. existing code
	}

And a common func now:

// ported windowPointsLength to a dedicated function
func windowPointsLength(series *ts.Series, interval time.Duration) int {
	return int(interval / (time.Duration(series.MillisPerStep()) * time.Millisecond))
}

util.Memset(window, math.NaN())
numSteps := series.Len()
offset := bootstrap.Len() - numSteps
vals := ts.NewValues(ctx, series.MillisPerStep(), numSteps)
for i := 0; i < numSteps; i++ {
for j := i + offset - windowPoints; j < i+offset; j++ {
if j < 0 || j >= bootstrap.Len() {
continue
}

idx := j - i - offset + windowPoints
if idx < 0 || idx > len(window)-1 {
continue
}

window[idx] = bootstrap.ValueAt(j)
}
vals.SetValueAt(i, common.SafeSum(window))
}
name := fmt.Sprintf("movingSum(%s,%q)", series.Name(), windowSize)
newSeries := ts.NewSeries(ctx, name, series.StartTime(), vals)
results = append(results, newSeries)
}

original.Values = results
return original, nil
}

return &binaryContextShifter{
ContextShiftFunc: contextShiftingFn,
BinaryTransformer: transformerFn,
Copy link
Collaborator

@robskillington robskillington Aug 31, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that I've read the common code, I think the only things that differ is stuff like this:

// movingMedian
				nans := common.SafeSort(window)
				if nans < windowPoints {
					index := (windowPoints - nans) / 2
					median := window[nans+index]
					vals.SetValueAt(i, median)
				}
// movingSum
				vals.SetValueAt(i, common.SafeSum(window))

And the naming of the series:

// movingMedian
name := fmt.Sprintf("movingMedian(%s,%q)", series.Name(), windowSize)
// movingSum
name := fmt.Sprintf("movingSum(%s,%q)", series.Name(), windowSize)

Can we extract all the common into a new method? i.e.

type movingImplFn func(window float64, values ts.MutableValues)

func newMovingBinaryTransform(movingFunctionName string, impl movingImplFn) *binaryContextShifter {
	interval, err := common.ParseInterval(windowSize)
	if err != nil {
		return nil, err
	}

	if interval <= 0 {
		return nil, common.ErrInvalidIntervalFormat
	}

	contextShiftingFn := func(c *common.Context) *common.Context {
		opts := common.NewChildContextOptions()
		opts.AdjustTimeRange(0, 0, interval, 0)
		childCtx := c.NewChildContext(opts)
		return childCtx
	}

	bootstrapStartTime, bootstrapEndTime := ctx.StartTime.Add(-interval), ctx.StartTime
	return &binaryContextShifter{
		ContextShiftFunc:  contextShiftingFn,
		BinaryTransformer: func(bootstrapped, original ts.SeriesList) (ts.SeriesList, error) {
			bootstrapList, err := combineBootstrapWithOriginal(ctx,
				bootstrapStartTime, bootstrapEndTime,
				bootstrapped, singlePathSpec(original))
			if err != nil {
				return ts.NewSeriesList(), err
			}

			results := make([]*ts.Series, 0, original.Len())
			maxWindowPoints := 0
			for i, bootstrap := range bootstrapList.Values {
				series := original.Values[i]
				windowPoints := windowPointsLength(series, interval)
				if windowPoints <= 0 {
					err := errors.NewInvalidParamsError(fmt.Errorf(
						"non positive window points, windowSize=%s, stepSize=%d",
						windowSize, series.MillisPerStep()))
					return ts.NewSeriesList(), err
				}
				if windowPoints > maxWindowPoints {
					maxWindowPoints = windowPoints
				}
			}
			
			windowPoints := make([]float64, maxWindowPoints)
			for i, bootstrap := range bootstrapList.Values {
				series := original.Values[i]
				currWindowPoints := windowPointsLength(series, interval)
				window := windowPoints[:currWindowPoints]
				util.Memset(window, math.NaN())
				numSteps := series.Len()
				offset := bootstrap.Len() - numSteps
				vals := ts.NewValues(ctx, series.MillisPerStep(), numSteps)
				for i := 0; i < numSteps; i++ {
					for j := i + offset - windowPoints; j < i+offset; j++ {
						if j < 0 || j >= bootstrap.Len() {
							continue
						}

						idx := j - i - offset + windowPoints
						if idx < 0 || idx > len(window)-1 {
							continue
						}

						window[idx] = bootstrap.ValueAt(j)
					}
					impl(window, vals)
				}
				name := fmt.Sprintf("%s(%s,%q)", movingFunctionName, series.Name(), windowSize)
				newSeries := ts.NewSeries(ctx, name, series.StartTime(), vals)
				results = append(results, newSeries)
			}

			original.Values = results
			return original, nil
		},
	}
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see that movingAverage differs a fair bit, but at least we could reuse between movingMedian and movingSum, etc?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done, great idea!

}, nil
}


// legendValue takes one metric or a wildcard seriesList and a string in quotes.
// Appends a value to the metric name in the legend. Currently one or several of:
// "last", "avg", "total", "min", "max".
Expand Down Expand Up @@ -1906,6 +1983,7 @@ func init() {
MustRegisterFunction(mostDeviant)
MustRegisterFunction(movingAverage)
MustRegisterFunction(movingMedian)
MustRegisterFunction(movingSum)
MustRegisterFunction(multiplySeries)
MustRegisterFunction(nonNegativeDerivative).WithDefaultParams(map[uint8]interface{}{
2: math.NaN(), // maxValue
Expand Down
23 changes: 20 additions & 3 deletions src/query/graphite/native/builtin_functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -685,6 +685,22 @@ func TestMovingAverageError(t *testing.T) {
testMovingFunctionError(t, "movingAverage(foo.bar.baz, 0)")
}

func TestMovingSumSuccess(t *testing.T) {
values := []float64{12.0, 19.0, -10.0, math.NaN(), 10.0}
bootstrap := []float64{3.0, 4.0, 5.0}
expected := []float64{12.0, 21.0, 36.0, 21.0, 9.0} // (3+4+5), (4+5+12), (5+12+19), (12+19-10), (19-10+Nan)

testMovingFunction(t, "movingSum(foo.bar.baz, '30s')", "movingSum(foo.bar.baz,\"30s\")", values, bootstrap, expected)
testMovingFunction(t, "movingSum(foo.bar.baz, '30s')", "movingSum(foo.bar.baz,3)", nil, nil, nil)

bootstrapEntireSeries := []float64{3.0, 4.0, 5.0, 12.0, 19.0, -10.0, math.NaN(), 10.0}
testMovingFunction(t, "movingSum(foo.bar.baz, '30s')", "movingSum(foo.bar.baz,\"30s\")", values, bootstrapEntireSeries, expected)
}

func TestMovingSumError(t *testing.T) {
testMovingFunctionError(t, "movingSum(foo.bar.baz, '-30s')")
}

func TestIsNonNull(t *testing.T) {
ctx := common.NewTestContext()
defer ctx.Close()
Expand Down Expand Up @@ -2547,14 +2563,14 @@ func TestMovingMedianInvalidLimits(t *testing.T) {
func TestMovingMismatchedLimits(t *testing.T) {
// NB: this tests the behavior when query limits do not snap exactly to data
// points. When limits do not snap exactly, the first point should be omitted.
for _, fn := range []string{"movingAverage", "movingMedian"} {
for _, fn := range []string{"movingAverage", "movingMedian", "movingSum"} {
for i := time.Duration(0); i < time.Minute; i += time.Second {
testMovingAverageInvalidLimits(t, fn, i)
testMovingFunctionInvalidLimits(t, fn, i)
}
}
}

func testMovingAverageInvalidLimits(t *testing.T, fn string, offset time.Duration) {
func testMovingFunctionInvalidLimits(t *testing.T, fn string, offset time.Duration) {
ctrl := xgomock.NewController(t)
defer ctrl.Finish()

Expand Down Expand Up @@ -2960,6 +2976,7 @@ func TestFunctionsRegistered(t *testing.T) {
"mostDeviant",
"movingAverage",
"movingMedian",
"movingSum",
"multiplySeries",
"nonNegativeDerivative",
"nPercentile",
Expand Down