Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[query] Fix graphite functions, aggregation bug #2549

Merged
merged 20 commits into from
Aug 23, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/query/generated/mocks/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
//go:generate sh -c "mockgen -package=transform -destination=$GOPATH/src/$PACKAGE/src/query/executor/transform/types_mock.go $PACKAGE/src/query/executor/transform OpNode"
//go:generate sh -c "mockgen -package=executor -destination=$GOPATH/src/$PACKAGE/src/query/executor/types_mock.go $PACKAGE/src/query/executor Engine"
//go:generate sh -c "mockgen -package=cost -destination=$GOPATH/src/github.com/m3db/m3/src/query/cost/cost_mock.go $PACKAGE/src/query/cost ChainedEnforcer,ChainedReporter"
//go:generate sh -c "mockgen -package=storage -destination=$GOPATH/src/$PACKAGE/src/query/graphite/storage/storage_mock.go $PACKAGE/src/query/graphite/storage Storage"

// mockgen rules for generating mocks for unexported interfaces (file mode).
//go:generate sh -c "mockgen -package=m3ql -destination=$GOPATH/src/github.com/m3db/m3/src/query/parser/m3ql/types_mock.go -source=$GOPATH/src/github.com/m3db/m3/src/query/parser/m3ql/types.go"
Expand Down
12 changes: 11 additions & 1 deletion src/query/graphite/native/builtin_functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -1578,6 +1578,7 @@ func movingMedian(ctx *common.Context, _ singlePathSpec, windowSize string) (*bi
if err != nil {
return nil, err
}

if interval <= 0 {
return nil, common.ErrInvalidIntervalFormat
}
Expand Down Expand Up @@ -1614,7 +1615,16 @@ func movingMedian(ctx *common.Context, _ singlePathSpec, windowSize string) (*bi
vals := ts.NewValues(ctx, series.MillisPerStep(), numSteps)
for i := 0; i < numSteps; i++ {
for j := i + offset - windowPoints; j < i+offset; j++ {
window[j-i-offset+windowPoints] = bootstrap.ValueAt(j)
if j < 0 || j >= bootstrap.Len()-1 {
continue
}

idx := j - i - offset + windowPoints
if idx < 0 || idx >= len(window)-1 {
continue
}

window[idx] = bootstrap.ValueAt(j)
}
nans := common.SafeSort(window)
if nans < windowPoints {
Expand Down
72 changes: 58 additions & 14 deletions src/query/graphite/native/builtin_functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,15 @@ import (
"testing"
"time"

"github.com/golang/mock/gomock"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: third party imports last?

"github.com/m3db/m3/src/query/block"
"github.com/m3db/m3/src/query/graphite/common"
"github.com/m3db/m3/src/query/graphite/context"
xctx "github.com/m3db/m3/src/query/graphite/context"
"github.com/m3db/m3/src/query/graphite/storage"
xtest "github.com/m3db/m3/src/query/graphite/testing"
"github.com/m3db/m3/src/query/graphite/ts"
xgomock "github.com/m3db/m3/src/x/test"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: third party imports last?


"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -2448,20 +2451,22 @@ func TestChanged(t *testing.T) {
expected, results.Values)
}

// TODO: re-enable
// nolint
func testMovingMedian(t *testing.T) {
now := time.Now()
engine := NewEngine(
testStorage,
)
func TestMovingMedian(t *testing.T) {
ctrl := xgomock.NewController(t)
defer ctrl.Finish()

store := storage.NewMockStorage(ctrl)
now := time.Now().Truncate(time.Hour)
engine := NewEngine(store)
startTime := now.Add(-3 * time.Minute)
endTime := now.Add(-time.Minute)
ctx := common.NewContext(common.ContextOptions{Start: startTime, End: endTime, Engine: engine})
defer ctx.Close()

stepSize := 60000
target := "movingMedian(foo.bar.q.zed, '1min')"
store.EXPECT().FetchByQuery(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
buildTestSeriesFn(stepSize, "foo.bar.q.zed")).Times(2)
expr, err := engine.Compile(target)
require.NoError(t, err)
res, err := expr.Execute(ctx)
Expand All @@ -2474,6 +2479,41 @@ func testMovingMedian(t *testing.T) {
[]common.TestSeries{expected}, res.Values)
}

func TestMovingMedianInvalidLimits(t *testing.T) {
ctrl := xgomock.NewController(t)
defer ctrl.Finish()

store := storage.NewMockStorage(ctrl)
now := time.Now().Truncate(time.Hour)
engine := NewEngine(store)
startTime := now.Add(-3 * time.Minute)
endTime := now.Add(-time.Minute)
ctx := common.NewContext(common.ContextOptions{Start: startTime, End: endTime, Engine: engine})
defer ctx.Close()

stepSize := 60000
target := "movingMedian(foo.bar.q.zed, '1min')"
store.EXPECT().FetchByQuery(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
func(_ context.Context, q string, opts storage.FetchOptions) (*storage.FetchResult, error) {
startTime := opts.StartTime
ctx := context.New()
numSteps := int(opts.EndTime.Sub(startTime)/time.Millisecond) / stepSize
vals := ts.NewConstantValues(ctx, 0, numSteps, stepSize)
series := ts.NewSeries(ctx, "foo.bar.q.zed", opts.EndTime, vals)
return &storage.FetchResult{SeriesList: []*ts.Series{series}}, nil
}).Times(2)
expr, err := engine.Compile(target)
require.NoError(t, err)
res, err := expr.Execute(ctx)
require.NoError(t, err)
expected := common.TestSeries{
Name: "movingMedian(foo.bar.q.zed,\"1min\")",
Data: []float64{0.0, 0.0},
}
common.CompareOutputsAndExpected(t, stepSize, endTime,
[]common.TestSeries{expected}, res.Values)
}

func TestLegendValue(t *testing.T) {
ctx := common.NewTestContext()
defer ctx.Close()
Expand Down Expand Up @@ -2680,13 +2720,13 @@ func TestTimeFunction(t *testing.T) {
[]common.TestSeries{expected}, results.Values)
}

// TODO arnikola reenable
// nolint
func testTimeShift(t *testing.T) {
now := time.Now()
engine := NewEngine(
testStorage,
)
func TestTimeShift(t *testing.T) {
ctrl := xgomock.NewController(t)
defer ctrl.Finish()

store := storage.NewMockStorage(ctrl)
now := time.Now().Truncate(time.Hour)
engine := NewEngine(store)
startTime := now.Add(-3 * time.Minute)
endTime := now.Add(-time.Minute)
ctx := common.NewContext(common.ContextOptions{
Expand All @@ -2698,6 +2738,10 @@ func testTimeShift(t *testing.T) {

stepSize := 60000
target := "timeShift(foo.bar.q.zed, '1min', false)"

store.EXPECT().FetchByQuery(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
buildTestSeriesFn(stepSize, "foo.bar.q.zed"))

expr, err := engine.Compile(target)
require.NoError(t, err)
res, err := expr.Execute(ctx)
Expand Down
135 changes: 81 additions & 54 deletions src/query/graphite/native/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,22 @@ import (
"testing"
"time"

"github.com/m3db/m3/src/metrics/policy"
"github.com/golang/mock/gomock"
"github.com/m3db/m3/src/query/graphite/common"
"github.com/m3db/m3/src/query/graphite/context"
"github.com/m3db/m3/src/query/graphite/storage"
xtime "github.com/m3db/m3/src/x/time"
"github.com/m3db/m3/src/query/graphite/ts"
xgomock "github.com/m3db/m3/src/x/test"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// nolint
type queryTestResult struct {
name string
max float64
series string
expected string
max float64
}

// nolint
Expand All @@ -46,8 +49,29 @@ type queryTest struct {
results []queryTestResult
}

func testSeries(name string, stepSize int, val float64, opts storage.FetchOptions) *ts.Series {
ctx := context.New()
numSteps := int(opts.EndTime.Sub(opts.StartTime)/time.Millisecond) / stepSize
vals := ts.NewConstantValues(ctx, val, numSteps, stepSize)
return ts.NewSeries(ctx, name, opts.StartTime, vals)
}

func buildTestSeriesFn(
stepSize int,
id ...string,
) func(context.Context, string, storage.FetchOptions) (*storage.FetchResult, error) {
return func(_ context.Context, q string, opts storage.FetchOptions) (*storage.FetchResult, error) {
series := make([]*ts.Series, 0, len(id))
for _, name := range id {
val := testValues[name]
series = append(series, testSeries(name, stepSize, val, opts))
}

return &storage.FetchResult{SeriesList: series}, nil
}
}

var (
// nolint
testValues = map[string]float64{
"foo.bar.q.zed": 0,
"foo.bar.g.zed": 1,
Expand All @@ -57,42 +81,57 @@ var (
"chicago.cake": 5,
"los_angeles.cake": 6,
}

// nolint
testPolicy = policy.NewStoragePolicy(10*time.Second, xtime.Second, 48*time.Hour)
// testTSDB = makeTSDB(testPolicy)
// nolint
testStorage storage.Storage //= nil
// local.NewLocalStorage(local.Options{
// Database: testTSDB,
// Workers: workers,
// Scope: metrics.None,
// PolicyResolver: resolver.NewStaticResolver(testIndex, testPolicy),
// })
)

// TODO arnikola reenable
// nolint
func testExecute(t *testing.T) {
engine := NewEngine(
testStorage,
)
func newTestStorage(ctrl *gomock.Controller) storage.Storage {
store := storage.NewMockStorage(ctrl)
store.EXPECT().FetchByQuery(gomock.Any(), gomock.Any(), gomock.Any()).
DoAndReturn(
func(
ctx context.Context,
query string,
opts storage.FetchOptions,
) (*storage.FetchResult, error) {
return &storage.FetchResult{}, nil
})

return store
}

func TestExecute(t *testing.T) {
ctrl := xgomock.NewController(t)
defer ctrl.Finish()

store := storage.NewMockStorage(ctrl)
engine := NewEngine(store)

tests := []queryTest{
{"foo.bar.q.zed", true, []queryTestResult{{"foo.bar.q.zed", 0}}},
{"foo.bar.q.zed", true, []queryTestResult{{"foo.bar.q.zed", "foo.bar.q.zed", 0}}},
{"foo.bar.*.zed", false, []queryTestResult{
{"foo.bar.q.zed", 0},
{"foo.bar.g.zed", 1},
{"foo.bar.x.zed", 2}},
{"foo.bar.q.zed", "foo.bar.q.zed", 0},
{"foo.bar.g.zed", "foo.bar.g.zed", 1},
{"foo.bar.x.zed", "foo.bar.x.zed", 2}},
},
{"sortByName(aliasByNode(foo.bar.*.zed, 0, 2))", true, []queryTestResult{
{"foo.g", 1},
{"foo.q", 0},
{"foo.x", 2},
{"foo.bar.g.zed", "foo.g", 1},
{"foo.bar.q.zed", "foo.q", 0},
{"foo.bar.x.zed", "foo.x", 2},
}},
}

ctx := common.NewContext(common.ContextOptions{Start: time.Now().Add(-1 * time.Hour), End: time.Now(), Engine: engine})
for _, test := range tests {

stepSize := 60000
queries := make([]string, 0, len(test.results))
for _, r := range test.results {
queries = append(queries, r.series)
}

store.EXPECT().FetchByQuery(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
buildTestSeriesFn(stepSize, queries...))

expr, err := engine.Compile(test.query)
require.Nil(t, err)

Expand All @@ -102,7 +141,7 @@ func testExecute(t *testing.T) {

for i := range test.results {
if test.ordered {
assert.Equal(t, test.results[i].name, results.Values[i].Name(),
assert.Equal(t, test.results[i].expected, results.Values[i].Name(),
"invalid result %d for %s", i, test.query)
assert.Equal(t, test.results[i].max, results.Values[i].CalcStatistics().Max,
"invalid result %d for %s", i, test.query)
Expand All @@ -111,19 +150,25 @@ func testExecute(t *testing.T) {
}
}

// TODO arnikola reenable
// nolint
func testTracing(t *testing.T) {
engine := NewEngine(
testStorage,
)
func TestTracing(t *testing.T) {
ctrl := xgomock.NewController(t)
defer ctrl.Finish()

store := storage.NewMockStorage(ctrl)

engine := NewEngine(store)
var traces []common.Trace

ctx := common.NewContext(common.ContextOptions{Start: time.Now().Add(-1 * time.Hour), End: time.Now(), Engine: engine})
ctx.Trace = func(t common.Trace) {
traces = append(traces, t)
}

stepSize := 60000
store.EXPECT().FetchByQuery(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
buildTestSeriesFn(stepSize, "foo.bar.q.zed", "foo.bar.g.zed",
"foo.bar.x.zed"))

expr, err := engine.Compile("groupByNode(sortByName(aliasByNode(foo.bar.*.zed, 0, 2)), 0, 'sumSeries')")
require.NoError(t, err)

Expand Down Expand Up @@ -155,21 +200,3 @@ func testTracing(t *testing.T) {
assert.Equal(t, expected.Outputs, trace.Outputs, "incorrect outputs for trace %d", i)
}
}

// func makeTSDB(policy policy.StoragePolicy) tsdb.Database {
// var (
// now = time.Now().Truncate(time.Second * 10)
// testTSDB = nil //FIXME mocktsdb.New()
// ctx = context.New()
// )

// defer ctx.Close()

// for name, val := range testValues {
// for t := now.Add(-time.Hour * 2); t.Before(now.Add(time.Hour)); t = t.Add(time.Second * 10) {
// testTSDB.WriteRaw(ctx, name, t, val, policy)
// }
// }

// return testIndex, testTSDB
// }
Loading