Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[query] Implemented the Graphite interpolate function #2650

Merged
merged 8 commits into from
Sep 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions src/query/graphite/native/builtin_functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -1072,6 +1072,64 @@ func logarithm(ctx *common.Context, input singlePathSpec, base int) (ts.SeriesLi
return r, nil
}

// interpolate takes one metric or a wildcard seriesList, and optionally a limit to the number of ‘None’ values
// to skip over. Continues the line with the last received value when gaps (‘None’ values)
// appear in your data, rather than breaking your line.
//
// interpolate will not interpolate at the beginning or end of a series, only in the middle
func interpolate(ctx *common.Context, input singlePathSpec, limit int) (ts.SeriesList, error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can we explicitly call out that it won't interpolate at the end or the start? Graphite isn't very clear on this in their docs but would be useful to have here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added

output := make([]*ts.Series, 0, len(input.Values))
for _, series := range input.Values {
var (
consecutiveNaNs = 0
numSteps = series.Len()
vals = ts.NewValues(ctx, series.MillisPerStep(), numSteps)
firstNonNan = false
)

for i := 0; i < numSteps; i++ {
value := series.ValueAt(i)
vals.SetValueAt(i, value)

if math.IsNaN(value) {
if !firstNonNan {
continue
}

consecutiveNaNs++
if limit >= 0 && consecutiveNaNs > limit {
consecutiveNaNs = 0
}

continue
} else {
firstNonNan = true
}

if consecutiveNaNs == 0 {
// have a value but no need to interpolate
continue
}

interpolated := series.ValueAt(i - consecutiveNaNs - 1)
interpolateStep := (value - interpolated) / float64(consecutiveNaNs+1)
for index := i - consecutiveNaNs; index < i; index++ {
interpolated = interpolated + interpolateStep
vals.SetValueAt(index, interpolated)
}

consecutiveNaNs = 0
}

name := fmt.Sprintf("interpolate(%s)", series.Name())
newSeries := ts.NewSeries(ctx, name, series.StartTime(), vals)
output = append(output, newSeries)
}
r := ts.SeriesList(input)
r.Values = output
return r, nil
}

// group takes an arbitrary number of pathspecs and adds them to a single timeseries array.
// This function is used to pass multiple pathspecs to a function which only takes one
func group(_ *common.Context, input multiplePathSpecs) (ts.SeriesList, error) {
Expand Down Expand Up @@ -2243,6 +2301,9 @@ func init() {
MustRegisterFunction(identity)
MustRegisterFunction(integral)
MustRegisterFunction(integralByInterval)
MustRegisterFunction(interpolate).WithDefaultParams(map[uint8]interface{}{
2: -1, // limit
})
MustRegisterFunction(isNonNull)
MustRegisterFunction(keepLastValue).WithDefaultParams(map[uint8]interface{}{
2: -1, // limit
Expand Down
66 changes: 62 additions & 4 deletions src/query/graphite/native/builtin_functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1983,10 +1983,67 @@ func TestIntegral(t *testing.T) {
}
}

/*
seriesList = self._gen_series_list_with_data(key='test',start=0,end=600,step=60,data=[None, 1, 2, 3, 4, 5, None, 6, 7, 8])
expected = [TimeSeries("integralByInterval(test,'2min')", 0, 600, 60, [0, 1, 2, 5, 4, 9, 0, 6, 7, 15])]
*/
func TestInterpolate(t *testing.T) {
ctx := common.NewTestContext()
defer ctx.Close()

tests := []struct {
values []float64
output []float64
limit int
}{
{
Copy link
Collaborator

@arnikola arnikola Sep 27, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: mind adding a test where there's more nans than limit, and maybe one with a bunch of preceeding nans (to flex the i==0 case?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added both

[]float64{1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0, 17.0, 18.0, 19.0, 20.0},
[]float64{1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0, 17.0, 18.0, 19.0, 20.0},
-1,
},
{
[]float64{math.NaN(), 2.0, math.NaN(), 4.0, math.NaN(), 6.0, math.NaN(), 8.0, math.NaN(), 10.0, math.NaN(), 12.0, math.NaN(), 14.0, math.NaN(), 16.0, math.NaN(), 18.0, math.NaN(), 20.0},
[]float64{math.NaN(), 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0, 17.0, 18.0, 19.0, 20.0},
-1,
},
{
[]float64{1.0, 2.0, math.NaN(), math.NaN(), math.NaN(), 6.0, 7.0, 8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0, 17.0, math.NaN(), math.NaN(), math.NaN()},
[]float64{1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0, 17.0, math.NaN(), math.NaN(), math.NaN()},
-1,
},
{
[]float64{1.0, 2.0, 3.0, 4.0, math.NaN(), 6.0, math.NaN(), math.NaN(), 9.0, 10.0, 11.0, math.NaN(), 13.0, math.NaN(), math.NaN(), math.NaN(), math.NaN(), 18.0, 19.0, 20.0},
[]float64{1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0, 17.0, 18.0, 19.0, 20.0},
-1,
},
{
[]float64{1.0, 2.0, math.NaN(), math.NaN(), math.NaN(), 6.0, 7.0, 8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0, 17.0, 18.0, math.NaN(), math.NaN()},
[]float64{1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0, 17.0, 18.0, math.NaN(), math.NaN()},
-1,
},
{
[]float64{1.0, 2.0, math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN(), 8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0, 17.0, 18.0, math.NaN(), math.NaN()},
[]float64{1.0, 2.0, math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN(), 8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0, 17.0, 18.0, math.NaN(), math.NaN()},
3,
},
{
[]float64{math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN(), 6.0, 7.0, 8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0, 17.0, 18.0, 19.0, 20.0},
[]float64{math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN(), 6.0, 7.0, 8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0, 17.0, 18.0, 19.0, 20.0},
-1,
},
}

start := time.Now()
step := 100
for _, test := range tests {
input := []common.TestSeries{{"foo", test.values}}
expected := []common.TestSeries{{"interpolate(foo)", test.output}}
timeSeries := generateSeriesList(ctx, start, input, step)
output, err := interpolate(ctx, singlePathSpec{
Values: timeSeries,
}, test.limit)
require.NoError(t, err)
common.CompareOutputsAndExpected(t, step, start,
expected, output.Values)
}
}

func TestIntegralByInterval(t *testing.T) {
ctx := common.NewTestContext()
defer ctx.Close()
Expand Down Expand Up @@ -3365,6 +3422,7 @@ func TestFunctionsRegistered(t *testing.T) {
"identity",
"integral",
"integralByInterval",
"interpolate",
"isNonNull",
"keepLastValue",
"legendValue",
Expand Down