diff --git a/src/aggregator/aggregator/aggregator_mock.go b/src/aggregator/aggregator/aggregator_mock.go index 2f52191a6f..5107239a52 100644 --- a/src/aggregator/aggregator/aggregator_mock.go +++ b/src/aggregator/aggregator/aggregator_mock.go @@ -1,7 +1,7 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: github.com/m3db/m3/src/aggregator/aggregator (interfaces: ElectionManager,FlushTimesManager,PlacementManager) +// Source: github.com/m3db/m3/src/aggregator/aggregator (interfaces: Aggregator,ElectionManager,FlushTimesManager,PlacementManager) -// Copyright (c) 2019 Uber Technologies, Inc. +// Copyright (c) 2020 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal @@ -31,11 +31,164 @@ import ( "github.com/m3db/m3/src/aggregator/generated/proto/flush" "github.com/m3db/m3/src/cluster/placement" "github.com/m3db/m3/src/cluster/shard" + "github.com/m3db/m3/src/metrics/metadata" + "github.com/m3db/m3/src/metrics/metric/aggregated" + "github.com/m3db/m3/src/metrics/metric/unaggregated" + "github.com/m3db/m3/src/metrics/policy" "github.com/m3db/m3/src/x/watch" "github.com/golang/mock/gomock" ) +// MockAggregator is a mock of Aggregator interface +type MockAggregator struct { + ctrl *gomock.Controller + recorder *MockAggregatorMockRecorder +} + +// MockAggregatorMockRecorder is the mock recorder for MockAggregator +type MockAggregatorMockRecorder struct { + mock *MockAggregator +} + +// NewMockAggregator creates a new mock instance +func NewMockAggregator(ctrl *gomock.Controller) *MockAggregator { + mock := &MockAggregator{ctrl: ctrl} + mock.recorder = &MockAggregatorMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockAggregator) EXPECT() *MockAggregatorMockRecorder { + return m.recorder +} + +// AddForwarded mocks base method +func (m *MockAggregator) AddForwarded(arg0 aggregated.ForwardedMetric, arg1 metadata.ForwardMetadata) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AddForwarded", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// AddForwarded indicates an expected call of AddForwarded +func (mr *MockAggregatorMockRecorder) AddForwarded(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddForwarded", reflect.TypeOf((*MockAggregator)(nil).AddForwarded), arg0, arg1) +} + +// AddPassthrough mocks base method +func (m *MockAggregator) AddPassthrough(arg0 aggregated.Metric, arg1 policy.StoragePolicy) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AddPassthrough", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// AddPassthrough indicates an expected call of AddPassthrough +func (mr *MockAggregatorMockRecorder) AddPassthrough(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddPassthrough", reflect.TypeOf((*MockAggregator)(nil).AddPassthrough), arg0, arg1) +} + +// AddTimed mocks base method +func (m *MockAggregator) AddTimed(arg0 aggregated.Metric, arg1 metadata.TimedMetadata) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AddTimed", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// AddTimed indicates an expected call of AddTimed +func (mr *MockAggregatorMockRecorder) AddTimed(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddTimed", reflect.TypeOf((*MockAggregator)(nil).AddTimed), arg0, arg1) +} + +// AddTimedWithStagedMetadatas mocks base method +func (m *MockAggregator) AddTimedWithStagedMetadatas(arg0 aggregated.Metric, arg1 metadata.StagedMetadatas) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AddTimedWithStagedMetadatas", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// AddTimedWithStagedMetadatas indicates an expected call of AddTimedWithStagedMetadatas +func (mr *MockAggregatorMockRecorder) AddTimedWithStagedMetadatas(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddTimedWithStagedMetadatas", reflect.TypeOf((*MockAggregator)(nil).AddTimedWithStagedMetadatas), arg0, arg1) +} + +// AddUntimed mocks base method +func (m *MockAggregator) AddUntimed(arg0 unaggregated.MetricUnion, arg1 metadata.StagedMetadatas) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AddUntimed", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// AddUntimed indicates an expected call of AddUntimed +func (mr *MockAggregatorMockRecorder) AddUntimed(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddUntimed", reflect.TypeOf((*MockAggregator)(nil).AddUntimed), arg0, arg1) +} + +// Close mocks base method +func (m *MockAggregator) Close() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Close") + ret0, _ := ret[0].(error) + return ret0 +} + +// Close indicates an expected call of Close +func (mr *MockAggregatorMockRecorder) Close() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockAggregator)(nil).Close)) +} + +// Open mocks base method +func (m *MockAggregator) Open() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Open") + ret0, _ := ret[0].(error) + return ret0 +} + +// Open indicates an expected call of Open +func (mr *MockAggregatorMockRecorder) Open() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Open", reflect.TypeOf((*MockAggregator)(nil).Open)) +} + +// Resign mocks base method +func (m *MockAggregator) Resign() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Resign") + ret0, _ := ret[0].(error) + return ret0 +} + +// Resign indicates an expected call of Resign +func (mr *MockAggregatorMockRecorder) Resign() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Resign", reflect.TypeOf((*MockAggregator)(nil).Resign)) +} + +// Status mocks base method +func (m *MockAggregator) Status() RuntimeStatus { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Status") + ret0, _ := ret[0].(RuntimeStatus) + return ret0 +} + +// Status indicates an expected call of Status +func (mr *MockAggregatorMockRecorder) Status() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Status", reflect.TypeOf((*MockAggregator)(nil).Status)) +} + // MockElectionManager is a mock of ElectionManager interface type MockElectionManager struct { ctrl *gomock.Controller diff --git a/src/aggregator/generated/mocks/generate.go b/src/aggregator/generated/mocks/generate.go index f733f62adc..1b16756579 100644 --- a/src/aggregator/generated/mocks/generate.go +++ b/src/aggregator/generated/mocks/generate.go @@ -19,7 +19,7 @@ // THE SOFTWARE. // mockgen rules for generating mocks for exported interfaces (reflection mode). -//go:generate sh -c "mockgen -package=aggregator github.com/m3db/m3/src/aggregator/aggregator ElectionManager,FlushTimesManager,PlacementManager | genclean -pkg github.com/m3db/m3/src/aggregator/aggregator -out $GOPATH/src/github.com/m3db/m3/src/aggregator/aggregator/aggregator_mock.go" +//go:generate sh -c "mockgen -package=aggregator github.com/m3db/m3/src/aggregator/aggregator Aggregator,ElectionManager,FlushTimesManager,PlacementManager | genclean -pkg github.com/m3db/m3/src/aggregator/aggregator -out $GOPATH/src/github.com/m3db/m3/src/aggregator/aggregator/aggregator_mock.go" //go:generate sh -c "mockgen -package=client github.com/m3db/m3/src/aggregator/client Client,AdminClient | genclean -pkg github.com/m3db/m3/src/aggregator/client -out $GOPATH/src/github.com/m3db/m3/src/aggregator/client/client_mock.go" //go:generate sh -c "mockgen -package=handler github.com/m3db/m3/src/aggregator/aggregator/handler Handler | genclean -pkg github.com/m3db/m3/src/aggregator/aggregator/handler -out $GOPATH/src/github.com/m3db/m3/src/aggregator/aggregator/handler/handler_mock.go" //go:generate sh -c "mockgen -package=runtime github.com/m3db/m3/src/aggregator/runtime OptionsWatcher | genclean -pkg github.com/m3db/m3/src/aggregator/runtime -out $GOPATH/src/github.com/m3db/m3/src/aggregator/runtime/runtime_mock.go" diff --git a/src/cmd/services/m3coordinator/downsample/flush_handler_test.go b/src/cmd/services/m3coordinator/downsample/flush_handler_test.go index 202067c2ab..70c6d96f0b 100644 --- a/src/cmd/services/m3coordinator/downsample/flush_handler_test.go +++ b/src/cmd/services/m3coordinator/downsample/flush_handler_test.go @@ -22,6 +22,7 @@ package downsample import ( "bytes" + "sync" "testing" "github.com/m3db/m3/src/metrics/metric/aggregated" @@ -29,7 +30,9 @@ import ( "github.com/m3db/m3/src/metrics/policy" "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/storage/mock" + "github.com/m3db/m3/src/x/ident" "github.com/m3db/m3/src/x/instrument" + "github.com/m3db/m3/src/x/pool" "github.com/m3db/m3/src/x/serialize" xsync "github.com/m3db/m3/src/x/sync" xtest "github.com/m3db/m3/src/x/test" @@ -41,7 +44,7 @@ import ( ) func TestDownsamplerFlushHandlerCopiesTags(t *testing.T) { - ctrl := gomock.NewController(t) + ctrl := xtest.NewController(t) defer ctrl.Finish() store := mock.NewMockStorage() @@ -104,3 +107,106 @@ func TestDownsamplerFlushHandlerCopiesTags(t *testing.T) { assert.False(t, xtest.ByteSlicesBackedBySameData(tagName, tag.Name)) assert.False(t, xtest.ByteSlicesBackedBySameData(tagValue, tag.Value)) } + +func graphiteTags( + t *testing.T, first string, encPool serialize.TagEncoderPool) []byte { + enc := encPool.Get() + defer enc.Finalize() + + err := enc.Encode(ident.MustNewTagStringsIterator( + "__g0__", first, + "__g1__", "y", + "__g2__", "z", + string(MetricsOptionIDSchemeTagName), string(GraphiteIDSchemeTagValue), + )) + + require.NoError(t, err) + data, ok := enc.Data() + require.True(t, ok) + return append(make([]byte, 0, data.Len()), data.Bytes()...) +} + +func TestDownsamplerFlushHandlerHighConcurrencyNoTagMixing(t *testing.T) { + ctrl := xtest.NewController(t) + defer ctrl.Finish() + + store := mock.NewMockStorage() + + size := 10 + decodeOpts := serialize.NewTagDecoderOptions(serialize.TagDecoderOptionsConfig{ + CheckBytesWrapperPoolSize: &size, + }) + + poolOpts := pool.NewObjectPoolOptions() + tagDecoderPool := serialize.NewTagDecoderPool(decodeOpts, poolOpts) + tagDecoderPool.Init() + + pool := serialize.NewMetricTagsIteratorPool(tagDecoderPool, poolOpts) + pool.Init() + + workers := xsync.NewWorkerPool(1) + workers.Init() + + instrumentOpts := instrument.NewOptions() + + handler := newDownsamplerFlushHandler(store, pool, + workers, models.NewTagOptions(), instrumentOpts) + writer, err := handler.NewWriter(tally.NoopScope) + require.NoError(t, err) + + encodeOpts := serialize.NewTagEncoderOptions() + encPool := serialize.NewTagEncoderPool(encodeOpts, poolOpts) + encPool.Init() + + xBytes := graphiteTags(t, "x", encPool) + fooBytes := graphiteTags(t, "foo", encPool) + + var wg sync.WaitGroup + for i := 0; i < 100; i++ { + wg.Add(1) + xData := append(make([]byte, 0, len(xBytes)), xBytes...) + fooData := append(make([]byte, 0, len(fooBytes)), fooBytes...) + go func() { + defer wg.Done() + err := writer.Write(aggregated.ChunkedMetricWithStoragePolicy{ + ChunkedMetric: aggregated.ChunkedMetric{ + ChunkedID: id.ChunkedID{Data: xData}, + TimeNanos: 123, + Value: 42.42, + }, + StoragePolicy: policy.MustParseStoragePolicy("1s:1d"), + }) + require.NoError(t, err) + + err = writer.Write(aggregated.ChunkedMetricWithStoragePolicy{ + ChunkedMetric: aggregated.ChunkedMetric{ + ChunkedID: id.ChunkedID{Data: fooData}, + TimeNanos: 123, + Value: 42.42, + }, + StoragePolicy: policy.MustParseStoragePolicy("1s:1d"), + }) + require.NoError(t, err) + }() + } + + wg.Wait() + // Wait for flush + err = writer.Flush() + require.NoError(t, err) + + // Inspect the write + writes := store.Writes() + require.Equal(t, 200, len(writes)) + + seenMap := make(map[string]int, 10) + for _, w := range writes { + str := w.Tags().String() + seenMap[str] = seenMap[str] + 1 + } + + assert.Equal(t, map[string]int{ + "__g0__: foo, __g1__: y, __g2__: z": 100, + "__g0__: x, __g1__: y, __g2__: z": 100, + }, seenMap) +} diff --git a/src/cmd/services/m3coordinator/downsample/metrics_appender.go b/src/cmd/services/m3coordinator/downsample/metrics_appender.go index 54b7bcf022..4f82fbd2ea 100644 --- a/src/cmd/services/m3coordinator/downsample/metrics_appender.go +++ b/src/cmd/services/m3coordinator/downsample/metrics_appender.go @@ -65,7 +65,10 @@ func newMetricsAppenderPool(opts pool.ObjectPoolOptions) *metricsAppenderPool { } func (p *metricsAppenderPool) Get() *metricsAppender { - return p.pool.Get().(*metricsAppender) + appender := p.pool.Get().(*metricsAppender) + // NB: reset appender. + appender.NextMetric() + return appender } func (p *metricsAppenderPool) Put(v *metricsAppender) { @@ -177,7 +180,6 @@ func (a *metricsAppender) SamplesAppender(opts SampleAppenderOptions) (SamplesAp a.multiSamplesAppender.reset() unownedID := data.Bytes() - // Match policies and rollups and build samples appender id := a.metricTagsIteratorPool.Get() id.Reset(unownedID) @@ -353,7 +355,6 @@ func (a *metricsAppender) SamplesAppender(opts SampleAppenderOptions) (SamplesAp a.debugLogMatch("downsampler applying matched rollup rule", debugLogMatchOptions{Meta: rollup.Metadatas, RollupID: rollup.ID}) - a.multiSamplesAppender.addSamplesAppender(samplesAppender{ agg: a.agg, clientRemote: a.clientRemote, diff --git a/src/cmd/services/m3coordinator/downsample/metrics_appender_test.go b/src/cmd/services/m3coordinator/downsample/metrics_appender_test.go new file mode 100644 index 0000000000..af3463c592 --- /dev/null +++ b/src/cmd/services/m3coordinator/downsample/metrics_appender_test.go @@ -0,0 +1,156 @@ +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package downsample + +import ( + "errors" + "fmt" + "testing" + + "github.com/m3db/m3/src/aggregator/aggregator" + "github.com/m3db/m3/src/metrics/matcher" + "github.com/m3db/m3/src/metrics/metadata" + "github.com/m3db/m3/src/metrics/metric/id" + "github.com/m3db/m3/src/metrics/metric/unaggregated" + "github.com/m3db/m3/src/metrics/rules" + "github.com/m3db/m3/src/x/checked" + "github.com/m3db/m3/src/x/pool" + "github.com/m3db/m3/src/x/serialize" + xtest "github.com/m3db/m3/src/x/test" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestSamplesAppenderPoolResetsTagsAcrossSamples(t *testing.T) { + ctrl := xtest.NewController(t) + defer ctrl.Finish() + + count := 3 + + poolOpts := pool.NewObjectPoolOptions().SetSize(1) + appenderPool := newMetricsAppenderPool(poolOpts) + + tagEncoderPool := serialize.NewTagEncoderPool(serialize.NewTagEncoderOptions(), + poolOpts) + tagEncoderPool.Init() + + size := 1 + tagDecoderPool := serialize.NewTagDecoderPool( + serialize.NewTagDecoderOptions(serialize.TagDecoderOptionsConfig{ + CheckBytesWrapperPoolSize: &size, + }), poolOpts) + tagDecoderPool.Init() + + metricTagsIteratorPool := serialize.NewMetricTagsIteratorPool(tagDecoderPool, poolOpts) + metricTagsIteratorPool.Init() + + for i := 0; i < count; i++ { + matcher := matcher.NewMockMatcher(ctrl) + matcher.EXPECT().ForwardMatch(gomock.Any(), gomock.Any(), gomock.Any()). + DoAndReturn(func(encodedID id.ID, _, _ int64) rules.MatchResult { + // NB: ensure tags are cleared correctly between runs. + bs := encodedID.Bytes() + + decoder := tagDecoderPool.Get() + decoder.Reset(checked.NewBytes(bs, nil)) + + var id string + for decoder.Next() { + tag := decoder.Current() + tagStr := fmt.Sprintf("%s-%s", tag.Name.String(), tag.Value.String()) + if len(id) == 0 { + id = tagStr + } else { + id = fmt.Sprintf("%s,%s", id, tagStr) + } + } + + decoder.Close() + return rules.NewMatchResult(1, 1, + metadata.StagedMetadatas{}, + []rules.IDWithMetadatas{ + { + ID: []byte(id), + Metadatas: metadata.StagedMetadatas{}, + }, + }, + ) + }) + + appender := appenderPool.Get() + agg := aggregator.NewMockAggregator(ctrl) + appender.reset(metricsAppenderOptions{ + tagEncoderPool: tagEncoderPool, + metricTagsIteratorPool: metricTagsIteratorPool, + matcher: matcher, + agg: agg, + }) + name := []byte(fmt.Sprint("foo", i)) + value := []byte(fmt.Sprint("bar", i)) + appender.AddTag(name, value) + a, err := appender.SamplesAppender(SampleAppenderOptions{}) + require.NoError(t, err) + + agg.EXPECT().AddUntimed(gomock.Any(), gomock.Any()).DoAndReturn( + func(u unaggregated.MetricUnion, _ metadata.StagedMetadatas) error { + if u.CounterVal != int64(i) { + return errors.New("wrong counter value") + } + + // NB: expected ID is generated into human-readable form + // from tags in ForwardMatch mock above. + expected := fmt.Sprintf("foo%d-bar%d", i, i) + if expected != u.ID.String() { + // NB: if this fails, appender is holding state after Finalize. + return fmt.Errorf("expected ID %s, got %s", expected, u.ID.String()) + } + + return nil + }, + ) + + require.NoError(t, a.SamplesAppender.AppendCounterSample(int64(i))) + + assert.False(t, a.IsDropPolicyApplied) + appender.Finalize() + } +} + +func TestSamplesAppenderPoolResetsTagSimple(t *testing.T) { + ctrl := xtest.NewController(t) + defer ctrl.Finish() + + poolOpts := pool.NewObjectPoolOptions().SetSize(1) + appenderPool := newMetricsAppenderPool(poolOpts) + + appender := appenderPool.Get() + appender.AddTag([]byte("foo"), []byte("bar")) + assert.Equal(t, 1, len(appender.originalTags.names)) + assert.Equal(t, 1, len(appender.originalTags.values)) + appender.Finalize() + + // NB: getting a new appender from the pool yields a clean appender. + appender = appenderPool.Get() + assert.Nil(t, appender.originalTags) + appender.Finalize() +} diff --git a/src/query/generated/mocks/generate.go b/src/query/generated/mocks/generate.go index b92d9a6d98..95393fd6e6 100644 --- a/src/query/generated/mocks/generate.go +++ b/src/query/generated/mocks/generate.go @@ -28,6 +28,7 @@ //go:generate sh -c "mockgen -package=transform -destination=$GOPATH/src/$PACKAGE/src/query/executor/transform/types_mock.go $PACKAGE/src/query/executor/transform OpNode" //go:generate sh -c "mockgen -package=executor -destination=$GOPATH/src/$PACKAGE/src/query/executor/types_mock.go $PACKAGE/src/query/executor Engine" //go:generate sh -c "mockgen -package=cost -destination=$GOPATH/src/github.com/m3db/m3/src/query/cost/cost_mock.go $PACKAGE/src/query/cost ChainedEnforcer,ChainedReporter" +//go:generate sh -c "mockgen -package=storage -destination=$GOPATH/src/$PACKAGE/src/query/graphite/storage/storage_mock.go $PACKAGE/src/query/graphite/storage Storage" // mockgen rules for generating mocks for unexported interfaces (file mode). //go:generate sh -c "mockgen -package=m3ql -destination=$GOPATH/src/github.com/m3db/m3/src/query/parser/m3ql/types_mock.go -source=$GOPATH/src/github.com/m3db/m3/src/query/parser/m3ql/types.go" diff --git a/src/query/graphite/common/test_util.go b/src/query/graphite/common/test_util.go index 893cf4718a..f1d9a95441 100644 --- a/src/query/graphite/common/test_util.go +++ b/src/query/graphite/common/test_util.go @@ -21,6 +21,7 @@ package common import ( + "fmt" "math" "testing" "time" @@ -43,7 +44,7 @@ type TestSeries struct { // NewTestContext creates a new test context. func NewTestContext() *Context { - now := time.Now() + now := time.Now().Truncate(time.Hour) return NewContext(ContextOptions{Start: now.Add(-time.Hour), End: now}) } @@ -95,15 +96,20 @@ func CompareOutputsAndExpected(t *testing.T, step int, start time.Time, expected a := actual[i] require.Equal(t, expected[i].Name, a.Name()) assert.Equal(t, step, a.MillisPerStep(), a.Name()+": MillisPerStep in expected series do not match MillisPerStep in actual") - assert.Equal(t, start, a.StartTime(), a.Name()+": StartTime in expected series does not match StartTime in actual") + diff := time.Duration(math.Abs(float64(start.Sub(a.StartTime())))) + assert.True(t, diff < time.Millisecond, + fmt.Sprintf("%s: StartTime in expected series (%v) does not match StartTime in actual (%v), diff %v", + a.Name(), start, a.StartTime(), diff)) e := expected[i].Data require.Equal(t, len(e), a.Len(), a.Name()+": length of expected series does not match length of actual") for step := 0; step < a.Len(); step++ { v := a.ValueAt(step) if math.IsNaN(e[step]) { - assert.True(t, math.IsNaN(v), a.Name()+": invalid value for step %d/%d, should be NaN but is %v", step, a.Len(), v) + assert.True(t, math.IsNaN(v), fmt.Sprintf("%s: invalid value for step %d/%d, should be NaN but is %v", a.Name(), 1+step, a.Len(), v)) + } else if math.IsNaN(v) { + assert.Fail(t, fmt.Sprintf("%s: invalid value for step %d/%d, should be %v but is NaN ", a.Name(), 1+step, a.Len(), e[step])) } else { - xtest.InDeltaWithNaNs(t, e[step], v, 0.0001, a.Name()+": invalid value for %d/%d", step, a.Len()) + xtest.InDeltaWithNaNs(t, e[step], v, 0.0001, a.Name()+": invalid value for %d/%d", 1+step, a.Len()) } } } diff --git a/src/query/graphite/common/transform_test.go b/src/query/graphite/common/transform_test.go index c1d782f381..6c58c90044 100644 --- a/src/query/graphite/common/transform_test.go +++ b/src/query/graphite/common/transform_test.go @@ -368,7 +368,7 @@ func TestPerSecond(t *testing.T) { TestSeries{Name: "foo | perSecond", Data: []float64{nan, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0}}, TestSeries{Name: "foo | perSecond", Data: []float64{nan, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0}}, TestSeries{Name: "foo | perSecond", Data: []float64{nan, 1.0, 1.0, 1.0, 1.0, nan, 1.0, 1.0, 1.0, 1.0}}, - TestSeries{Name: "foo | perSecond", Data: []float64{nan, nan, nan, nan, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0}}, + TestSeries{Name: "foo | perSecond", Data: []float64{nan, nan, nan, nan, 1.0, nan, 1.0, 1.0, 1.0, 1.0}}, TestSeries{Name: "foo | perSecond", Data: []float64{nan, 1.0, 1.0, nan, nan, nan, 1.0, 1.0, 1.0, 1.0}}, } input := ts.SeriesList{Values: inputSeries} diff --git a/src/query/graphite/native/builtin_functions.go b/src/query/graphite/native/builtin_functions.go index e10ba3c856..def2514987 100644 --- a/src/query/graphite/native/builtin_functions.go +++ b/src/query/graphite/native/builtin_functions.go @@ -33,6 +33,7 @@ import ( "github.com/m3db/m3/src/query/graphite/common" "github.com/m3db/m3/src/query/graphite/errors" "github.com/m3db/m3/src/query/graphite/ts" + "github.com/m3db/m3/src/query/util" ) const ( @@ -577,7 +578,9 @@ func movingAverage(ctx *common.Context, input singlePathSpec, windowSizeValue ge interval)) return nil, err } - wf = func(stepSize int) int { return int(int64(delta/time.Millisecond) / int64(stepSize)) } + wf = func(stepSize int) int { + return int(int64(delta/time.Millisecond) / int64(stepSize)) + } ws = fmt.Sprintf("%q", windowSizeValue) delta = interval case float64: @@ -635,14 +638,17 @@ func movingAverage(ctx *common.Context, input singlePathSpec, windowSizeValue ge vals := ts.NewValues(ctx, series.MillisPerStep(), numSteps) sum := 0.0 num := 0 + firstPoint := false for i := 0; i < numSteps; i++ { - // skip if the number of points received is less than the number of points - // in the lookback window. - if offset < windowPoints { - continue - } - if i == 0 { + // NB: skip if the number of points received is less than the number + // of points in the lookback window. + if !firstPoint { + firstPoint = true for j := offset - windowPoints; j < offset; j++ { + if j < 0 { + continue + } + v := bootstrap.ValueAt(j) if !math.IsNaN(v) { sum += v @@ -650,17 +656,20 @@ func movingAverage(ctx *common.Context, input singlePathSpec, windowSizeValue ge } } } else { - prev := bootstrap.ValueAt(i + offset - windowPoints - 1) - next := bootstrap.ValueAt(i + offset - 1) - if !math.IsNaN(prev) { - sum -= prev - num-- + if i+offset-windowPoints > 0 { + prev := bootstrap.ValueAt(i + offset - windowPoints - 1) + if !math.IsNaN(prev) { + sum -= prev + num-- + } } + next := bootstrap.ValueAt(i + offset - 1) if !math.IsNaN(next) { sum += next num++ } } + if num > 0 { vals.SetValueAt(i, sum/float64(num)) } @@ -1578,6 +1587,7 @@ func movingMedian(ctx *common.Context, _ singlePathSpec, windowSize string) (*bi if err != nil { return nil, err } + if interval <= 0 { return nil, common.ErrInvalidIntervalFormat } @@ -1609,12 +1619,22 @@ func movingMedian(ctx *common.Context, _ singlePathSpec, windowSize string) (*bi return ts.NewSeriesList(), err } window := make([]float64, windowPoints) + util.Memset(window, math.NaN()) numSteps := series.Len() offset := bootstrap.Len() - numSteps vals := ts.NewValues(ctx, series.MillisPerStep(), numSteps) for i := 0; i < numSteps; i++ { for j := i + offset - windowPoints; j < i+offset; j++ { - window[j-i-offset+windowPoints] = bootstrap.ValueAt(j) + if j < 0 || j >= bootstrap.Len() { + continue + } + + idx := j - i - offset + windowPoints + if idx < 0 || idx > len(window)-1 { + continue + } + + window[idx] = bootstrap.ValueAt(j) } nans := common.SafeSort(window) if nans < windowPoints { diff --git a/src/query/graphite/native/builtin_functions_test.go b/src/query/graphite/native/builtin_functions_test.go index 6b76608f90..6a68bf59be 100644 --- a/src/query/graphite/native/builtin_functions_test.go +++ b/src/query/graphite/native/builtin_functions_test.go @@ -28,11 +28,14 @@ import ( "github.com/m3db/m3/src/query/block" "github.com/m3db/m3/src/query/graphite/common" + "github.com/m3db/m3/src/query/graphite/context" xctx "github.com/m3db/m3/src/query/graphite/context" "github.com/m3db/m3/src/query/graphite/storage" xtest "github.com/m3db/m3/src/query/graphite/testing" "github.com/m3db/m3/src/query/graphite/ts" + xgomock "github.com/m3db/m3/src/x/test" + "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -1983,11 +1986,11 @@ func TestHoltWintersForecast(t *testing.T) { "foo", now, 1000, - []float64{4.0, 5.0, 6.0}, + []float64{4, 5.0, 6.0}, 3 * time.Second, now, 1000, - []float64{4.0, 4.0, 4.10035}, + []float64{math.NaN(), 4.0, 4.10035}, }, } @@ -1998,6 +2001,7 @@ func TestHoltWintersForecast(t *testing.T) { input.startTime, common.NewTestSeriesValues(ctx, input.stepInMilli, input.values), ) + results, err := holtWintersForecastInternal(ctx, singlePathSpec{ Values: []*ts.Series{series}, }, input.duration) @@ -2006,6 +2010,7 @@ func TestHoltWintersForecast(t *testing.T) { Data: input.output, } require.Nil(t, err) + common.CompareOutputsAndExpected(t, input.newStep, input.newStartTime, []common.TestSeries{expected}, results.Values) } @@ -2040,10 +2045,10 @@ func TestHoltWintersConfidenceBands(t *testing.T) { 3 * time.Second, now, 1000, - []float64{0.4787, 3.7, 3.5305}, + []float64{math.NaN(), 3.7, 3.5305}, now, 1000, - []float64{2.1039, 4.3, 4.6702}, + []float64{math.NaN(), 4.3, 4.6702}, }, } @@ -2448,13 +2453,13 @@ func TestChanged(t *testing.T) { expected, results.Values) } -// TODO: re-enable -// nolint -func testMovingMedian(t *testing.T) { - now := time.Now() - engine := NewEngine( - testStorage, - ) +func TestMovingMedian(t *testing.T) { + ctrl := xgomock.NewController(t) + defer ctrl.Finish() + + store := storage.NewMockStorage(ctrl) + now := time.Now().Truncate(time.Hour) + engine := NewEngine(store) startTime := now.Add(-3 * time.Minute) endTime := now.Add(-time.Minute) ctx := common.NewContext(common.ContextOptions{Start: startTime, End: endTime, Engine: engine}) @@ -2462,6 +2467,8 @@ func testMovingMedian(t *testing.T) { stepSize := 60000 target := "movingMedian(foo.bar.q.zed, '1min')" + store.EXPECT().FetchByQuery(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + buildTestSeriesFn(stepSize, "foo.bar.q.zed")).Times(2) expr, err := engine.Compile(target) require.NoError(t, err) res, err := expr.Execute(ctx) @@ -2474,6 +2481,125 @@ func testMovingMedian(t *testing.T) { []common.TestSeries{expected}, res.Values) } +func TestMovingAverage(t *testing.T) { + ctrl := xgomock.NewController(t) + defer ctrl.Finish() + + store := storage.NewMockStorage(ctrl) + now := time.Now().Truncate(time.Hour) + engine := NewEngine(store) + startTime := now.Add(-3 * time.Minute) + endTime := now.Add(-1 * time.Minute) + ctx := common.NewContext(common.ContextOptions{Start: startTime, End: endTime, Engine: engine}) + defer ctx.Close() + + stepSize := 60000 + target := `movingAverage(timeShift(foo.bar.g.zed, '-1d'), '1min')` + store.EXPECT().FetchByQuery(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + buildTestSeriesFn(stepSize, "foo.bar.g.zed")).Times(2) + expr, err := engine.Compile(target) + require.NoError(t, err) + res, err := expr.Execute(ctx) + require.NoError(t, err) + expected := common.TestSeries{ + Name: `movingAverage(timeShift(foo.bar.g.zed, -1d),"1min")`, + Data: []float64{1, 1}, + } + common.CompareOutputsAndExpected(t, stepSize, startTime, + []common.TestSeries{expected}, res.Values) +} + +func TestMovingMedianInvalidLimits(t *testing.T) { + ctrl := xgomock.NewController(t) + defer ctrl.Finish() + + store := storage.NewMockStorage(ctrl) + now := time.Now().Truncate(time.Hour) + engine := NewEngine(store) + startTime := now.Add(-3 * time.Minute) + endTime := now.Add(-time.Minute) + ctx := common.NewContext(common.ContextOptions{Start: startTime, End: endTime, Engine: engine}) + defer ctx.Close() + + stepSize := 60000 + target := "movingMedian(foo.bar.q.zed, '1min')" + store.EXPECT().FetchByQuery(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(_ context.Context, q string, opts storage.FetchOptions) (*storage.FetchResult, error) { + startTime := opts.StartTime + ctx := context.New() + numSteps := int(opts.EndTime.Sub(startTime)/time.Millisecond) / stepSize + vals := ts.NewConstantValues(ctx, 0, numSteps, stepSize) + series := ts.NewSeries(ctx, "foo.bar.q.zed", opts.EndTime, vals) + return &storage.FetchResult{SeriesList: []*ts.Series{series}}, nil + }).Times(2) + expr, err := engine.Compile(target) + require.NoError(t, err) + res, err := expr.Execute(ctx) + require.NoError(t, err) + expected := common.TestSeries{ + Name: "movingMedian(foo.bar.q.zed,\"1min\")", + Data: []float64{math.NaN(), 0.0}, + } + common.CompareOutputsAndExpected(t, stepSize, endTime, + []common.TestSeries{expected}, res.Values) +} + +func TestMovingMismatchedLimits(t *testing.T) { + // NB: this tests the behavior when query limits do not snap exactly to data + // points. When limits do not snap exactly, the first point should be omitted. + for _, fn := range []string{"movingAverage", "movingMedian"} { + for i := time.Duration(0); i < time.Minute; i += time.Second { + testMovingAverageInvalidLimits(t, fn, i) + } + } +} + +func testMovingAverageInvalidLimits(t *testing.T, fn string, offset time.Duration) { + ctrl := xgomock.NewController(t) + defer ctrl.Finish() + + store := storage.NewMockStorage(ctrl) + now := time.Now().Truncate(time.Hour).Add(offset) + engine := NewEngine(store) + startTime := now.Add(-3 * time.Minute) + endTime := now.Add(-time.Minute) + ctx := common.NewContext(common.ContextOptions{Start: startTime, End: endTime, Engine: engine}) + defer ctx.Close() + + stepSize := 60000 + target := fmt.Sprintf(`%s(timeShift(foo.bar.*.zed, '-1d'), '1min')`, fn) + store.EXPECT().FetchByQuery(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + buildTestSeriesFn(stepSize, "foo.bar.g.zed", "foo.bar.x.zed"), + ).Times(2) + expr, err := engine.Compile(target) + require.NoError(t, err) + res, err := expr.Execute(ctx) + require.NoError(t, err) + + expectedStart := startTime + expectedDataG := []float64{1, 1} + expectedDataX := []float64{2, 2} + + if offset > 0 { + expectedStart = expectedStart.Add(time.Minute) + expectedDataG[0] = math.NaN() + expectedDataX[0] = math.NaN() + } + + expected := []common.TestSeries{ + { + Name: fmt.Sprintf(`%s(timeShift(foo.bar.g.zed, -1d),"1min")`, fn), + Data: expectedDataG, + }, + { + Name: fmt.Sprintf(`%s(timeShift(foo.bar.x.zed, -1d),"1min")`, fn), + Data: expectedDataX, + }, + } + + common.CompareOutputsAndExpected(t, stepSize, expectedStart, expected, res.Values) +} + func TestLegendValue(t *testing.T) { ctx := common.NewTestContext() defer ctx.Close() @@ -2680,13 +2806,13 @@ func TestTimeFunction(t *testing.T) { []common.TestSeries{expected}, results.Values) } -// TODO arnikola reenable -// nolint -func testTimeShift(t *testing.T) { - now := time.Now() - engine := NewEngine( - testStorage, - ) +func TestTimeShift(t *testing.T) { + ctrl := xgomock.NewController(t) + defer ctrl.Finish() + + store := storage.NewMockStorage(ctrl) + now := time.Now().Truncate(time.Hour) + engine := NewEngine(store) startTime := now.Add(-3 * time.Minute) endTime := now.Add(-time.Minute) ctx := common.NewContext(common.ContextOptions{ @@ -2698,6 +2824,10 @@ func testTimeShift(t *testing.T) { stepSize := 60000 target := "timeShift(foo.bar.q.zed, '1min', false)" + + store.EXPECT().FetchByQuery(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + buildTestSeriesFn(stepSize, "foo.bar.q.zed")) + expr, err := engine.Compile(target) require.NoError(t, err) res, err := expr.Execute(ctx) diff --git a/src/query/graphite/native/engine_test.go b/src/query/graphite/native/engine_test.go index d9e822a143..78cf64034a 100644 --- a/src/query/graphite/native/engine_test.go +++ b/src/query/graphite/native/engine_test.go @@ -24,30 +24,62 @@ import ( "testing" "time" - "github.com/m3db/m3/src/metrics/policy" "github.com/m3db/m3/src/query/graphite/common" + "github.com/m3db/m3/src/query/graphite/context" "github.com/m3db/m3/src/query/graphite/storage" - xtime "github.com/m3db/m3/src/x/time" + "github.com/m3db/m3/src/query/graphite/ts" + xgomock "github.com/m3db/m3/src/x/test" + "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) -// nolint type queryTestResult struct { - name string - max float64 + series string + expected string + max float64 } -// nolint type queryTest struct { query string ordered bool results []queryTestResult } +func snapStartToStepSize(t time.Time, stepSize int) time.Time { + step := time.Duration(stepSize) * time.Millisecond + if truncated := t.Truncate(step); truncated.Before(t) { + return t.Add(step) + } + + return t +} + +func testSeries(name string, stepSize int, val float64, opts storage.FetchOptions) *ts.Series { + ctx := context.New() + numSteps := int(opts.EndTime.Sub(opts.StartTime)/time.Millisecond) / stepSize + vals := ts.NewConstantValues(ctx, val, numSteps, stepSize) + firstPoint := snapStartToStepSize(opts.StartTime, stepSize) + return ts.NewSeries(ctx, name, firstPoint, vals) +} + +func buildTestSeriesFn( + stepSize int, + id ...string, +) func(context.Context, string, storage.FetchOptions) (*storage.FetchResult, error) { + return func(_ context.Context, q string, opts storage.FetchOptions) (*storage.FetchResult, error) { + series := make([]*ts.Series, 0, len(id)) + for _, name := range id { + val := testValues[name] + series = append(series, testSeries(name, stepSize, val, opts)) + } + + return &storage.FetchResult{SeriesList: series}, nil + } +} + var ( - // nolint testValues = map[string]float64{ "foo.bar.q.zed": 0, "foo.bar.g.zed": 1, @@ -57,42 +89,56 @@ var ( "chicago.cake": 5, "los_angeles.cake": 6, } - - // nolint - testPolicy = policy.NewStoragePolicy(10*time.Second, xtime.Second, 48*time.Hour) - // testTSDB = makeTSDB(testPolicy) - // nolint - testStorage storage.Storage //= nil - // local.NewLocalStorage(local.Options{ - // Database: testTSDB, - // Workers: workers, - // Scope: metrics.None, - // PolicyResolver: resolver.NewStaticResolver(testIndex, testPolicy), - // }) ) -// TODO arnikola reenable -// nolint -func testExecute(t *testing.T) { - engine := NewEngine( - testStorage, - ) +func newTestStorage(ctrl *gomock.Controller) storage.Storage { + store := storage.NewMockStorage(ctrl) + store.EXPECT().FetchByQuery(gomock.Any(), gomock.Any(), gomock.Any()). + DoAndReturn( + func( + ctx context.Context, + query string, + opts storage.FetchOptions, + ) (*storage.FetchResult, error) { + return &storage.FetchResult{}, nil + }) + + return store +} + +func TestExecute(t *testing.T) { + ctrl := xgomock.NewController(t) + defer ctrl.Finish() + + store := storage.NewMockStorage(ctrl) + engine := NewEngine(store) + tests := []queryTest{ - {"foo.bar.q.zed", true, []queryTestResult{{"foo.bar.q.zed", 0}}}, + {"foo.bar.q.zed", true, []queryTestResult{{"foo.bar.q.zed", "foo.bar.q.zed", 0}}}, {"foo.bar.*.zed", false, []queryTestResult{ - {"foo.bar.q.zed", 0}, - {"foo.bar.g.zed", 1}, - {"foo.bar.x.zed", 2}}, + {"foo.bar.q.zed", "foo.bar.q.zed", 0}, + {"foo.bar.g.zed", "foo.bar.g.zed", 1}, + {"foo.bar.x.zed", "foo.bar.x.zed", 2}}, }, {"sortByName(aliasByNode(foo.bar.*.zed, 0, 2))", true, []queryTestResult{ - {"foo.g", 1}, - {"foo.q", 0}, - {"foo.x", 2}, + {"foo.bar.g.zed", "foo.g", 1}, + {"foo.bar.q.zed", "foo.q", 0}, + {"foo.bar.x.zed", "foo.x", 2}, }}, } ctx := common.NewContext(common.ContextOptions{Start: time.Now().Add(-1 * time.Hour), End: time.Now(), Engine: engine}) for _, test := range tests { + + stepSize := 60000 + queries := make([]string, 0, len(test.results)) + for _, r := range test.results { + queries = append(queries, r.series) + } + + store.EXPECT().FetchByQuery(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + buildTestSeriesFn(stepSize, queries...)) + expr, err := engine.Compile(test.query) require.Nil(t, err) @@ -102,7 +148,7 @@ func testExecute(t *testing.T) { for i := range test.results { if test.ordered { - assert.Equal(t, test.results[i].name, results.Values[i].Name(), + assert.Equal(t, test.results[i].expected, results.Values[i].Name(), "invalid result %d for %s", i, test.query) assert.Equal(t, test.results[i].max, results.Values[i].CalcStatistics().Max, "invalid result %d for %s", i, test.query) @@ -111,12 +157,13 @@ func testExecute(t *testing.T) { } } -// TODO arnikola reenable -// nolint -func testTracing(t *testing.T) { - engine := NewEngine( - testStorage, - ) +func TestTracing(t *testing.T) { + ctrl := xgomock.NewController(t) + defer ctrl.Finish() + + store := storage.NewMockStorage(ctrl) + + engine := NewEngine(store) var traces []common.Trace ctx := common.NewContext(common.ContextOptions{Start: time.Now().Add(-1 * time.Hour), End: time.Now(), Engine: engine}) @@ -124,6 +171,11 @@ func testTracing(t *testing.T) { traces = append(traces, t) } + stepSize := 60000 + store.EXPECT().FetchByQuery(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + buildTestSeriesFn(stepSize, "foo.bar.q.zed", "foo.bar.g.zed", + "foo.bar.x.zed")) + expr, err := engine.Compile("groupByNode(sortByName(aliasByNode(foo.bar.*.zed, 0, 2)), 0, 'sumSeries')") require.NoError(t, err) @@ -155,21 +207,3 @@ func testTracing(t *testing.T) { assert.Equal(t, expected.Outputs, trace.Outputs, "incorrect outputs for trace %d", i) } } - -// func makeTSDB(policy policy.StoragePolicy) tsdb.Database { -// var ( -// now = time.Now().Truncate(time.Second * 10) -// testTSDB = nil //FIXME mocktsdb.New() -// ctx = context.New() -// ) - -// defer ctx.Close() - -// for name, val := range testValues { -// for t := now.Add(-time.Hour * 2); t.Before(now.Add(time.Hour)); t = t.Add(time.Second * 10) { -// testTSDB.WriteRaw(ctx, name, t, val, policy) -// } -// } - -// return testIndex, testTSDB -// } diff --git a/src/query/graphite/storage/storage_mock.go b/src/query/graphite/storage/storage_mock.go new file mode 100644 index 0000000000..bfe66c6d2e --- /dev/null +++ b/src/query/graphite/storage/storage_mock.go @@ -0,0 +1,71 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/m3db/m3/src/query/graphite/storage (interfaces: Storage) + +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +// Package storage is a generated GoMock package. +package storage + +import ( + "reflect" + + "github.com/m3db/m3/src/query/graphite/context" + + "github.com/golang/mock/gomock" +) + +// MockStorage is a mock of Storage interface +type MockStorage struct { + ctrl *gomock.Controller + recorder *MockStorageMockRecorder +} + +// MockStorageMockRecorder is the mock recorder for MockStorage +type MockStorageMockRecorder struct { + mock *MockStorage +} + +// NewMockStorage creates a new mock instance +func NewMockStorage(ctrl *gomock.Controller) *MockStorage { + mock := &MockStorage{ctrl: ctrl} + mock.recorder = &MockStorageMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockStorage) EXPECT() *MockStorageMockRecorder { + return m.recorder +} + +// FetchByQuery mocks base method +func (m *MockStorage) FetchByQuery(arg0 context.Context, arg1 string, arg2 FetchOptions) (*FetchResult, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "FetchByQuery", arg0, arg1, arg2) + ret0, _ := ret[0].(*FetchResult) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// FetchByQuery indicates an expected call of FetchByQuery +func (mr *MockStorageMockRecorder) FetchByQuery(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchByQuery", reflect.TypeOf((*MockStorage)(nil).FetchByQuery), arg0, arg1, arg2) +} diff --git a/src/query/graphite/ts/series.go b/src/query/graphite/ts/series.go index beacee25b1..2b74713341 100644 --- a/src/query/graphite/ts/series.go +++ b/src/query/graphite/ts/series.go @@ -136,7 +136,12 @@ func (b *Series) Resolution() time.Duration { // StepAtTime returns the step within the block containing the given time func (b *Series) StepAtTime(t time.Time) int { - return int(t.UnixNano()/1000000-b.startTime.UnixNano()/1000000) / b.vals.MillisPerStep() + step := int(t.UnixNano()/1000000-b.startTime.UnixNano()/1000000) / b.vals.MillisPerStep() + if step < 0 { + return 0 + } + + return step } // StartTimeForStep returns the time at which the given step starts diff --git a/src/query/models/tags.go b/src/query/models/tags.go index 15dd124c42..17595e9396 100644 --- a/src/query/models/tags.go +++ b/src/query/models/tags.go @@ -275,8 +275,8 @@ func (t Tags) Validate() error { } if !tags.Less(i-1, i) { - return fmt.Errorf("tags out of order: '%s' appears after '%s'", - tags.Tags[i-1].Name, tags.Tags[i].Name) + return fmt.Errorf("graphite tags out of order: '%s' appears after"+ + " '%s', tags: %v", tags.Tags[i-1].Name, tags.Tags[i].Name, tags.Tags) } prev := tags.Tags[i-1] @@ -303,8 +303,8 @@ func (t Tags) Validate() error { prev := t.Tags[i-1] cmp := bytes.Compare(prev.Name, t.Tags[i].Name) if cmp > 0 { - return fmt.Errorf("tags out of order: '%s' appears after '%s'", - prev.Name, tag.Name) + return fmt.Errorf("tags out of order: '%s' appears after '%s', tags: %v", + prev.Name, tag.Name, t.Tags) } if cmp == 0 { return fmt.Errorf("tags duplicate: '%s' appears more than once in '%s'",