Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[query] Fix snapping bug affecting "moving" function (movingMedian, movingAverage, etc.) #2694

Merged
merged 8 commits into from
Oct 5, 2020
6 changes: 5 additions & 1 deletion src/query/graphite/native/builtin_functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -1503,9 +1503,13 @@ func combineBootstrapWithOriginal(
return ts.NewSeriesList(), err
}
}
bootstrapEndStep := endTime.Truncate(original.Resolution())
if bootstrapEndStep.Before(endTime) {
bootstrapEndStep = bootstrapEndStep.Add(original.Resolution())
}
// NB(braskin): using bootstrap.Len() is incorrect as it will include all
// of the steps in the original timeseries, not just the steps up to the new end time
bootstrapLength := bootstrap.StepAtTime(endTime)
bootstrapLength := bootstrap.StepAtTime(bootstrapEndStep)
ratio := bootstrap.MillisPerStep() / original.MillisPerStep()
numBootstrapValues := bootstrapLength * ratio
numCombinedValues := numBootstrapValues + original.Len()
Expand Down
127 changes: 35 additions & 92 deletions src/query/graphite/native/builtin_functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (

"github.com/m3db/m3/src/query/block"
"github.com/m3db/m3/src/query/graphite/common"
"github.com/m3db/m3/src/query/graphite/context"
xctx "github.com/m3db/m3/src/query/graphite/context"
"github.com/m3db/m3/src/query/graphite/storage"
xtest "github.com/m3db/m3/src/query/graphite/testing"
Expand Down Expand Up @@ -721,6 +720,41 @@ func testGeneralFunction(t *testing.T, target, expectedName string, values, outp
common.CompareOutputsAndExpected(t, 60000, testGeneralFunctionStart, expected, res.Values)
}

func TestCombineBootstrapWithOriginal(t *testing.T) {
var (
contextStart = time.Date(2020, time.October, 5, 1, 15, 37, 884207922, time.UTC)
contextEnd = time.Date(2020, time.October, 5, 1, 18, 37, 884207922, time.UTC)
ctx = common.NewContext(common.ContextOptions{
Start: contextStart,
End: contextEnd,
Engine: NewEngine(&common.MovingFunctionStorage{}),
})

originalStart = time.Date(2020, time.October, 5, 1, 16, 00, 0, time.UTC)
originalValues = []float64{14, 15, 16, 17, 18}
originalSeriesListValues = []*ts.Series{ts.NewSeries(ctx, "original", originalStart, common.NewTestSeriesValues(ctx, 30000, originalValues))}
originalSeriesList = singlePathSpec{Values: originalSeriesListValues}

bootstrappedStart = time.Date(2020, time.October, 5, 1, 15, 00, 0, time.UTC)
bootstrappedValues = []float64{12, 13, 14, 15, 16, 17, 18}
bootstrappedSeriesListValues = []*ts.Series{ts.NewSeries(ctx, "original", bootstrappedStart, common.NewTestSeriesValues(ctx, 30000, bootstrappedValues))}
bootstrappedSeriesList = ts.NewSeriesList()

bootstrapStartTime = time.Date(2020, time.October, 5, 1, 14, 37, 884207922, time.UTC)
bootstrapEndTime = time.Date(2020, time.October, 5, 1, 15, 37, 884207922, time.UTC)

expectedValues = []float64{12, 13, 14, 15, 16, 17, 18}
expectedSeries = ts.NewSeries(ctx, "original", bootstrapStartTime, common.NewTestSeriesValues(ctx, 30000, expectedValues))
)
bootstrappedSeriesList.Values = bootstrappedSeriesListValues

defer ctx.Close()

output, err := combineBootstrapWithOriginal(ctx, bootstrapStartTime, bootstrapEndTime, bootstrappedSeriesList, originalSeriesList)
assert.Equal(t, output.Values[0], expectedSeries)
assert.Nil(t, err)
}

func TestMovingAverageSuccess(t *testing.T) {
values := []float64{12.0, 19.0, -10.0, math.NaN(), 10.0}
bootstrap := []float64{3.0, 4.0, 5.0}
Expand Down Expand Up @@ -2884,97 +2918,6 @@ func TestMovingAverage(t *testing.T) {
[]common.TestSeries{expected}, res.Values)
}

func TestMovingMedianInvalidLimits(t *testing.T) {
ctrl := xgomock.NewController(t)
defer ctrl.Finish()

store := storage.NewMockStorage(ctrl)
now := time.Now().Truncate(time.Hour)
engine := NewEngine(store)
startTime := now.Add(-3 * time.Minute)
endTime := now.Add(-time.Minute)
ctx := common.NewContext(common.ContextOptions{Start: startTime, End: endTime, Engine: engine})
defer ctx.Close()

stepSize := 60000
target := "movingMedian(foo.bar.q.zed, '1min')"
store.EXPECT().FetchByQuery(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
func(_ context.Context, q string, opts storage.FetchOptions) (*storage.FetchResult, error) {
startTime := opts.StartTime
ctx := context.New()
numSteps := int(opts.EndTime.Sub(startTime)/time.Millisecond) / stepSize
vals := ts.NewConstantValues(ctx, 0, numSteps, stepSize)
series := ts.NewSeries(ctx, "foo.bar.q.zed", opts.EndTime, vals)
return &storage.FetchResult{SeriesList: []*ts.Series{series}}, nil
}).Times(2)
expr, err := engine.Compile(target)
require.NoError(t, err)
res, err := expr.Execute(ctx)
require.NoError(t, err)
expected := common.TestSeries{
Name: "movingMedian(foo.bar.q.zed,\"1min\")",
Data: []float64{math.NaN(), 0.0},
}
common.CompareOutputsAndExpected(t, stepSize, endTime,
[]common.TestSeries{expected}, res.Values)
}

func TestMovingMismatchedLimits(t *testing.T) {
// NB: this tests the behavior when query limits do not snap exactly to data
// points. When limits do not snap exactly, the first point should be omitted.
for _, fn := range []string{"movingAverage", "movingMedian", "movingSum", "movingMax", "movingMin"} {
for i := time.Duration(0); i < time.Minute; i += time.Second {
testMovingFunctionInvalidLimits(t, fn, i)
}
}
}

func testMovingFunctionInvalidLimits(t *testing.T, fn string, offset time.Duration) {
ctrl := xgomock.NewController(t)
defer ctrl.Finish()

store := storage.NewMockStorage(ctrl)
now := time.Now().Truncate(time.Hour).Add(offset)
engine := NewEngine(store)
startTime := now.Add(-3 * time.Minute)
endTime := now.Add(-time.Minute)
ctx := common.NewContext(common.ContextOptions{Start: startTime, End: endTime, Engine: engine})
defer ctx.Close()

stepSize := 60000
target := fmt.Sprintf(`%s(timeShift(foo.bar.*.zed, '-1d'), '1min')`, fn)
store.EXPECT().FetchByQuery(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
buildTestSeriesFn(stepSize, "foo.bar.g.zed", "foo.bar.x.zed"),
).Times(2)
expr, err := engine.Compile(target)
require.NoError(t, err)
res, err := expr.Execute(ctx)
require.NoError(t, err)

expectedStart := startTime
expectedDataG := []float64{1, 1}
expectedDataX := []float64{2, 2}

if offset > 0 {
expectedStart = expectedStart.Add(time.Minute)
expectedDataG[0] = math.NaN()
expectedDataX[0] = math.NaN()
}

expected := []common.TestSeries{
{
Name: fmt.Sprintf(`%s(timeShift(foo.bar.g.zed, -1d),"1min")`, fn),
Data: expectedDataG,
},
{
Name: fmt.Sprintf(`%s(timeShift(foo.bar.x.zed, -1d),"1min")`, fn),
Data: expectedDataX,
},
}

common.CompareOutputsAndExpected(t, stepSize, expectedStart, expected, res.Values)
}

func TestLegendValue(t *testing.T) {
ctx := common.NewTestContext()
defer ctx.Close()
Expand Down