Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[query] Implemented the Graphite integralByInterval function #2596

Merged
merged 32 commits into from
Sep 21, 2020
Merged
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
4e901d7
Added the moving movingMin function
teddywahle Aug 27, 2020
f42685a
Merge branch 'master' into twahle-moving-min
teddywahle Aug 27, 2020
f0df4ce
worked on moving min
teddywahle Aug 27, 2020
bef6377
Merge branch 'twahle-moving-min' of https://github.com/teddywahle/m3 …
teddywahle Aug 27, 2020
8ce6dc3
more work on the moving min func
teddywahle Aug 27, 2020
30ef946
Merge branch 'master' into twahle-moving-min
teddywahle Aug 28, 2020
cf9288f
updated movingMedian
teddywahle Aug 28, 2020
50b0ae9
Merge branch 'twahle-moving-min' of https://github.com/teddywahle/m3 …
teddywahle Aug 28, 2020
1ebe629
Apply suggestions from code review
teddywahle Aug 28, 2020
ebaafce
testMovingFunction
teddywahle Aug 28, 2020
2a0643c
wrote test movingSum function
teddywahle Aug 28, 2020
53222b9
Merge branch 'master' into twahle-moving-min
teddywahle Aug 31, 2020
f45d288
Merge branch 'master' into master
teddywahle Sep 4, 2020
12a47a7
Apply suggestions from code review
teddywahle Sep 4, 2020
67b5378
Apply suggestions from code review
teddywahle Sep 4, 2020
fabdacb
finished basic logic, just need to write tests
teddywahle Sep 4, 2020
58d7914
Merge branch 'master' into graphite-integral-by-interval
teddywahle Sep 4, 2020
70fc626
completed testing for integralByInteral
teddywahle Sep 4, 2020
9083619
Merge branch 'master' into graphite-integral-by-interval
teddywahle Sep 7, 2020
26f1190
Merge branch 'master' into graphite-integral-by-interval
teddywahle Sep 8, 2020
0f425b9
Update src/query/graphite/native/builtin_functions.go
teddywahle Sep 11, 2020
78a46a3
Merge branch 'master' into graphite-integral-by-interval
teddywahle Sep 11, 2020
deb483e
made variables more Go-like
teddywahle Sep 11, 2020
9bde29f
Merge branch 'master' into graphite-integral-by-interval
teddywahle Sep 21, 2020
a912b30
Update src/query/graphite/native/builtin_functions.go
teddywahle Sep 21, 2020
03ad7d5
Apply suggestions from code review
teddywahle Sep 21, 2020
970ed5c
Ran go fmt
teddywahle Sep 21, 2020
b0155f4
Added proper test case from graphite-web source code
teddywahle Sep 21, 2020
6f85d3c
Apply suggestions from code review
teddywahle Sep 21, 2020
678690a
Apply suggestions from code review
teddywahle Sep 21, 2020
6cabbd4
fixed formatting
teddywahle Sep 21, 2020
54d6307
Merge branch 'master' into graphite-integral-by-interval
teddywahle Sep 21, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 47 additions & 9 deletions src/query/graphite/native/builtin_functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func delay(
func delayValuesHelper(ctx *common.Context, series *ts.Series, steps int) ts.Values {
output := ts.NewValues(ctx, series.MillisPerStep(), series.Len())
for i := steps; i < series.Len(); i++ {
output.SetValueAt(i, series.ValueAt(i - steps))
output.SetValueAt(i, series.ValueAt(i-steps))
}
return output
}
Expand All @@ -281,7 +281,7 @@ func delayValuesHelper(ctx *common.Context, series *ts.Series, steps int) ts.Val
// Useful for filtering out a part of a series of data from a wider range of data.
func timeSlice(ctx *common.Context, inputPath singlePathSpec, start string, end string) (ts.SeriesList, error) {
var (
now = time.Now()
now = time.Now()
tzOffsetForAbsoluteTime time.Duration
)
startTime, err := graphite.ParseTime(start, now, tzOffsetForAbsoluteTime)
Expand Down Expand Up @@ -633,8 +633,8 @@ func lowestCurrent(_ *common.Context, input singlePathSpec, n int) (ts.SeriesLis
type windowSizeFunc func(stepSize int) int

type windowSizeParsed struct {
deltaValue time.Duration
stringValue string
deltaValue time.Duration
stringValue string
windowSizeFunc windowSizeFunc
}

Expand Down Expand Up @@ -841,7 +841,7 @@ func exponentialMovingAverage(ctx *common.Context, input singlePathSpec, windowS
curr := bootstrap.ValueAt(i + offset)
if !math.IsNaN(curr) {
// formula: ema(current) = constant * (Current Value) + (1 - constant) * ema(previous)
ema = emaConstant * curr + (1 - emaConstant) * ema
ema = emaConstant*curr + (1-emaConstant)*ema
vals.SetValueAt(i, ema)
} else {
vals.SetValueAt(i, math.NaN())
Expand Down Expand Up @@ -1086,6 +1086,46 @@ func integral(ctx *common.Context, input singlePathSpec) (ts.SeriesList, error)
return r, nil
}

// integralByInterval will do the same as integral funcion, except it resets the total to 0
// at the given time in the parameter “from”. Useful for finding totals per hour/day/week.
func integralByInterval(ctx *common.Context, input singlePathSpec, intervalString string) (ts.SeriesList, error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need to be added to builtin_functions.go?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added

intervalUnit, err := common.ParseInterval(intervalString)
if err != nil {
return ts.NewSeriesList(), err
}
results := make([]*ts.Series, 0, len(input.Values))

for _, series := range input.Values {
var (
stepsPerInterval = intervalUnit.Milliseconds() / int64(series.MillisPerStep())
outVals = ts.NewValues(ctx, series.MillisPerStep(), series.Len())
stepCounter int64 = 0
teddywahle marked this conversation as resolved.
Show resolved Hide resolved
currentSum float64 = 0.0
teddywahle marked this conversation as resolved.
Show resolved Hide resolved
)

for i := 0; i < series.Len(); i++ {
if stepCounter == stepsPerInterval {
// startNewInterval
stepCounter = 0
currentSum = 0.0
}
n := series.ValueAt(i)
if !math.IsNaN(n) {
currentSum += n
}
outVals.SetValueAt(i, currentSum)
stepCounter += 1
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it correct to increment step counter if the value is NaN?

Copy link
Contributor Author

@teddywahle teddywahle Sep 21, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed offline. Answer is yes according to graphite-web source code.

}

newName := fmt.Sprintf("integralByInterval(%s, %s)", series.Name(), intervalString)
results = append(results, ts.NewSeries(ctx, newName, series.StartTime(), outVals))
}

r := ts.SeriesList(input)
r.Values = results
return r, nil
}

// This is the opposite of the integral function. This is useful for taking a
// running total metric and calculating the delta between subsequent data
// points.
Expand Down Expand Up @@ -1798,8 +1838,6 @@ func movingMinHelper(window []float64, vals ts.MutableValues, windowPoints int,
}
}



func newMovingBinaryTransform(
ctx *common.Context,
input singlePathSpec,
Expand Down Expand Up @@ -1830,7 +1868,7 @@ func newMovingBinaryTransform(

bootstrapStartTime, bootstrapEndTime := ctx.StartTime.Add(-interval), ctx.StartTime
return &binaryContextShifter{
ContextShiftFunc: contextShiftingFn,
ContextShiftFunc: contextShiftingFn,
BinaryTransformer: func(bootstrapped, original ts.SeriesList) (ts.SeriesList, error) {
bootstrapList, err := combineBootstrapWithOriginal(ctx,
bootstrapStartTime, bootstrapEndTime,
Expand Down Expand Up @@ -1910,7 +1948,6 @@ func movingMin(ctx *common.Context, input singlePathSpec, windowSize genericInte
return newMovingBinaryTransform(ctx, input, windowSize, "movingMin", movingMinHelper)
}


// legendValue takes one metric or a wildcard seriesList and a string in quotes.
// Appends a value to the metric name in the legend. Currently one or several of:
// "last", "avg", "total", "min", "max".
Expand Down Expand Up @@ -2145,6 +2182,7 @@ func init() {
MustRegisterFunction(holtWintersForecast)
MustRegisterFunction(identity)
MustRegisterFunction(integral)
MustRegisterFunction(integralByInterval)
MustRegisterFunction(isNonNull)
MustRegisterFunction(keepLastValue).WithDefaultParams(map[uint8]interface{}{
2: -1, // limit
Expand Down
63 changes: 49 additions & 14 deletions src/query/graphite/native/builtin_functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -643,8 +643,8 @@ func testMovingFunction(t *testing.T, target, expectedName string, values, boots
}

var (
testGeneralFunctionStart = time.Now().Add(time.Minute * -11).Truncate(time.Minute)
testGeneralFunctionEnd = time.Now().Add(time.Minute * -3).Truncate(time.Minute)
testGeneralFunctionStart = time.Now().Add(time.Minute * -11).Truncate(time.Minute)
testGeneralFunctionEnd = time.Now().Add(time.Minute * -3).Truncate(time.Minute)
)

// testGeneralFunction is a copy of testMovingFunction but without any logic for bootstrapping values
Expand All @@ -654,8 +654,8 @@ func testGeneralFunction(t *testing.T, target, expectedName string, values, outp

engine := NewEngine(
&common.MovingFunctionStorage{
StepMillis: 60000,
Values: values,
StepMillis: 60000,
Values: values,
},
)
phonyContext := common.NewContext(common.ContextOptions{
Expand Down Expand Up @@ -694,11 +694,11 @@ func TestMovingAverageSuccess(t *testing.T) {

func TestExponentialMovingAverageSuccess(t *testing.T) {
tests := []struct {
target string
target string
expectedName string
bootstrap []float64
inputs []float64
expected []float64
bootstrap []float64
inputs []float64
expected []float64
}{
{
"exponentialMovingAverage(foo.bar.baz, 3)",
Expand Down Expand Up @@ -1823,6 +1823,40 @@ func TestIntegral(t *testing.T) {
}
}

/*
seriesList = self._gen_series_list_with_data(key='test',start=0,end=600,step=60,data=[None, 1, 2, 3, 4, 5, None, 6, 7, 8])
expected = [TimeSeries("integralByInterval(test,'2min')", 0, 600, 60, [0, 1, 2, 5, 4, 9, 0, 6, 7, 15])]
*/
func TestIntegralByInterval(t *testing.T) {
ctx := common.NewTestContext()
defer ctx.Close()

invals := []float64{
math.NaN(), 1, 2, 3, 4, 5, math.NaN(), 6, 7, 8,
}

outvals := []float64{
0, 1, 2, 5, 4, 9, 0, 6, 7, 15,
}

series := ts.NewSeries(ctx, "hello", time.Now(),
common.NewTestSeriesValues(ctx, 60000, invals))

r, err := integralByInterval(ctx, singlePathSpec{
Values: []*ts.Series{series},
}, "2min")
require.NoError(t, err)

output := r.Values
require.Equal(t, 1, len(output))
assert.Equal(t, "integralByInterval(hello, 2min)", output[0].Name())
assert.Equal(t, series.StartTime(), output[0].StartTime())
require.Equal(t, len(outvals), output[0].Len())
for i, expected := range outvals {
xtest.Equalish(t, expected, output[0].ValueAt(i), "incorrect value at %d", i)
}
}

func TestDerivative(t *testing.T) {
ctx := common.NewTestContext()
defer ctx.Close()
Expand Down Expand Up @@ -2983,8 +3017,8 @@ func TestDelay(t *testing.T) {
}

var (
testDelayStart = time.Now().Truncate(time.Minute)
testDelayEnd = testMovingFunctionEnd.Add(time.Minute)
testDelayStart = time.Now().Truncate(time.Minute)
testDelayEnd = testMovingFunctionEnd.Add(time.Minute)
)

func testDelay(t *testing.T, target, expectedName string, values, output []float64) {
Expand All @@ -2993,8 +3027,8 @@ func testDelay(t *testing.T, target, expectedName string, values, output []float

engine := NewEngine(
&common.MovingFunctionStorage{
StepMillis: 10000,
Values: values,
StepMillis: 10000,
Values: values,
},
)
phonyContext := common.NewContext(common.ContextOptions{
Expand All @@ -3020,8 +3054,8 @@ func testDelay(t *testing.T, target, expectedName string, values, output []float
}

func TestTimeSlice(t *testing.T) {
values := []float64{math.NaN(),1.0,2.0,3.0,math.NaN(),5.0,6.0,math.NaN(),7.0,8.0,9.0}
expected := []float64{math.NaN(),math.NaN(),math.NaN(),3.0,math.NaN(),5.0,6.0,math.NaN(),7.0,math.NaN(),math.NaN()}
values := []float64{math.NaN(), 1.0, 2.0, 3.0, math.NaN(), 5.0, 6.0, math.NaN(), 7.0, 8.0, 9.0}
expected := []float64{math.NaN(), math.NaN(), math.NaN(), 3.0, math.NaN(), 5.0, 6.0, math.NaN(), 7.0, math.NaN(), math.NaN()}

testGeneralFunction(t, "timeSlice(foo.bar.baz, '-9min','-3min')", "timeSlice(foo.bar.baz, -9min, -3min)", values, expected)
}
Expand Down Expand Up @@ -3134,6 +3168,7 @@ func TestFunctionsRegistered(t *testing.T) {
"holtWintersForecast",
"identity",
"integral",
"integralByInterval",
"isNonNull",
"keepLastValue",
"legendValue",
Expand Down