From 8f31691122c19ae24c697fd10ac2e3fc95ef6b3f Mon Sep 17 00:00:00 2001 From: Artem Nikolayevsky Date: Tue, 30 Apr 2019 14:02:02 -0400 Subject: [PATCH 1/8] [db] Add an option to use db writes in index mode --- src/dbnode/storage/namespace.go | 20 +++- src/dbnode/storage/namespace/index_options.go | 15 ++- src/dbnode/storage/namespace/types.go | 6 + src/dbnode/storage/namespace_test.go | 108 +++++++++++------- src/dbnode/storage/series/buffer.go | 11 ++ src/dbnode/storage/series/buffer_test.go | 40 +++++++ src/dbnode/storage/series/types.go | 1 + 7 files changed, 155 insertions(+), 46 deletions(-) diff --git a/src/dbnode/storage/namespace.go b/src/dbnode/storage/namespace.go index 289b9802b4..3abfee339f 100644 --- a/src/dbnode/storage/namespace.go +++ b/src/dbnode/storage/namespace.go @@ -101,6 +101,7 @@ type dbNamespace struct { sync.RWMutex closed bool + useAsIndex bool shutdownCh chan struct{} id ident.ID shardSet sharding.ShardSet @@ -285,12 +286,16 @@ func newDatabaseNamespace( opts Options, ) (databaseNamespace, error) { var ( - nopts = metadata.Options() - id = metadata.ID() + nopts = metadata.Options() + id = metadata.ID() + useAsIndex bool ) if !nopts.WritesToCommitLog() { commitLogWriter = commitLogWriteNoOp } + if nopts.IndexOptions().UseAsIndex() { + useAsIndex = true + } iops := opts.InstrumentOptions() logger := iops.Logger().With(zap.String("namespace", id.String())) @@ -335,6 +340,7 @@ func newDatabaseNamespace( opts: opts, metadata: metadata, nopts: nopts, + useAsIndex: useAsIndex, schemaRegistry: metadata.Options().SchemaRegistry(), seriesOpts: seriesOpts, nowFn: opts.ClockOptions().NowFn(), @@ -591,8 +597,11 @@ func (n *dbNamespace) Write( n.metrics.write.ReportError(n.nowFn().Sub(callStart)) return ts.Series{}, false, err } + opts := series.WriteOptions{ + UseAsIndex: n.useAsIndex, + } series, wasWritten, err := shard.Write(ctx, id, timestamp, - value, unit, annotation, series.WriteOptions{}) + value, unit, annotation, opts) n.metrics.write.ReportSuccessOrError(err, n.nowFn().Sub(callStart)) return series, wasWritten, err } @@ -616,8 +625,11 @@ func (n *dbNamespace) WriteTagged( n.metrics.writeTagged.ReportError(n.nowFn().Sub(callStart)) return ts.Series{}, false, err } + opts := series.WriteOptions{ + UseAsIndex: n.useAsIndex, + } series, wasWritten, err := shard.WriteTagged(ctx, id, tags, timestamp, - value, unit, annotation, series.WriteOptions{}) + value, unit, annotation, opts) n.metrics.writeTagged.ReportSuccessOrError(err, n.nowFn().Sub(callStart)) return series, wasWritten, err } diff --git a/src/dbnode/storage/namespace/index_options.go b/src/dbnode/storage/namespace/index_options.go index aca2e4c8a5..96cc75a1ed 100644 --- a/src/dbnode/storage/namespace/index_options.go +++ b/src/dbnode/storage/namespace/index_options.go @@ -33,8 +33,9 @@ var ( ) type indexOpts struct { - enabled bool - blockSize time.Duration + enabled bool + useAsIndex bool + blockSize time.Duration } // NewIndexOptions returns a new IndexOptions. @@ -69,3 +70,13 @@ func (i *indexOpts) SetBlockSize(value time.Duration) IndexOptions { func (i *indexOpts) BlockSize() time.Duration { return i.blockSize } + +func (i *indexOpts) SetUseAsIndex(b bool) IndexOptions { + io := *i + io.useAsIndex = b + return &io +} + +func (i *indexOpts) UseAsIndex() bool { + return i.useAsIndex +} diff --git a/src/dbnode/storage/namespace/types.go b/src/dbnode/storage/namespace/types.go index dac4448174..b6b213cd0c 100644 --- a/src/dbnode/storage/namespace/types.go +++ b/src/dbnode/storage/namespace/types.go @@ -114,6 +114,12 @@ type IndexOptions interface { // BlockSize returns the block size. BlockSize() time.Duration + + // SetUseAsIndex sets whether or not this namespace is being used as an index. + SetUseAsIndex(b bool) IndexOptions + + // UseAsIndex returns whether this namespace is being used as an index. + UseAsIndex() bool } // SchemaDescr describes the schema for a complex type value. diff --git a/src/dbnode/storage/namespace_test.go b/src/dbnode/storage/namespace_test.go index 98619d51a3..b48288bfe3 100644 --- a/src/dbnode/storage/namespace_test.go +++ b/src/dbnode/storage/namespace_test.go @@ -38,6 +38,7 @@ import ( "github.com/m3db/m3/src/dbnode/storage/index" "github.com/m3db/m3/src/dbnode/storage/namespace" "github.com/m3db/m3/src/dbnode/storage/repair" + "github.com/m3db/m3/src/dbnode/storage/series" "github.com/m3db/m3/src/dbnode/tracepoint" "github.com/m3db/m3/src/dbnode/ts" xmetrics "github.com/m3db/m3/src/dbnode/x/metrics" @@ -97,7 +98,10 @@ func newTestNamespaceWithIDOpts( return ns.(*dbNamespace), closer } -func newTestNamespaceWithIndex(t *testing.T, index namespaceIndex) (*dbNamespace, closerFn) { +func newTestNamespaceWithIndex( + t *testing.T, + index namespaceIndex, +) (*dbNamespace, closerFn) { ns, closer := newTestNamespace(t) if index != nil { ns.reverseIndex = index @@ -105,6 +109,18 @@ func newTestNamespaceWithIndex(t *testing.T, index namespaceIndex) (*dbNamespace return ns, closer } +func newTestNamespaceWithUseAsIndex( + t *testing.T, + index namespaceIndex, + useAsIndex bool, +) (*dbNamespace, closerFn) { + nsOpts := defaultTestNs1Opts + nsOpts = nsOpts.SetIndexOptions(nsOpts.IndexOptions().SetUseAsIndex(useAsIndex)) + ns, closer := newTestNamespaceWithIDOpts(t, defaultTestNs1ID, nsOpts) + ns.reverseIndex = index + return ns, closer +} + func TestNamespaceName(t *testing.T) { ns, closer := newTestNamespace(t) defer closer() @@ -180,23 +196,28 @@ func TestNamespaceWriteShardOwned(t *testing.T) { unit := xtime.Second ant := []byte(nil) - ns, closer := newTestNamespace(t) - defer closer() - shard := NewMockdatabaseShard(ctrl) - shard.EXPECT().Write(ctx, id, now, val, unit, ant, gomock.Any()). - Return(ts.Series{}, true, nil).Times(1) - shard.EXPECT().Write(ctx, id, now, val, unit, ant, gomock.Any()). - Return(ts.Series{}, false, nil).Times(1) + for _, useAsIndex := range []bool{true, false} { + ns, closer := newTestNamespaceWithUseAsIndex(t, nil, useAsIndex) + defer closer() + shard := NewMockdatabaseShard(ctrl) + opts := series.WriteOptions{ + UseAsIndex: useAsIndex, + } + shard.EXPECT().Write(ctx, id, now, val, unit, ant, opts). + Return(ts.Series{}, true, nil).Times(1) + shard.EXPECT().Write(ctx, id, now, val, unit, ant, opts). + Return(ts.Series{}, false, nil).Times(1) - ns.shards[testShardIDs[0].ID()] = shard + ns.shards[testShardIDs[0].ID()] = shard - _, wasWritten, err := ns.Write(ctx, id, now, val, unit, ant) - require.NoError(t, err) - require.True(t, wasWritten) + _, wasWritten, err := ns.Write(ctx, id, now, val, unit, ant) + require.NoError(t, err) + require.True(t, wasWritten) - _, wasWritten, err = ns.Write(ctx, id, now, val, unit, ant) - require.NoError(t, err) - require.False(t, wasWritten) + _, wasWritten, err = ns.Write(ctx, id, now, val, unit, ant) + require.NoError(t, err) + require.False(t, wasWritten) + } } func TestNamespaceReadEncodedShardNotOwned(t *testing.T) { @@ -1064,34 +1085,41 @@ func TestNamespaceIndexInsert(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - idx := NewMocknamespaceIndex(ctrl) - ns, closer := newTestNamespaceWithIndex(t, idx) - defer closer() + for _, useAsIndex := range []bool{true, false} { + idx := NewMocknamespaceIndex(ctrl) + ns, closer := newTestNamespaceWithUseAsIndex(t, idx, useAsIndex) + ns.reverseIndex = idx + defer closer() - ctx := context.NewContext() - now := time.Now() + ctx := context.NewContext() + now := time.Now() - shard := NewMockdatabaseShard(ctrl) - shard.EXPECT().WriteTagged(ctx, ident.NewIDMatcher("a"), ident.EmptyTagIterator, - now, 1.0, xtime.Second, nil, gomock.Any()).Return(ts.Series{}, true, nil) - shard.EXPECT().WriteTagged(ctx, ident.NewIDMatcher("a"), ident.EmptyTagIterator, - now, 1.0, xtime.Second, nil, gomock.Any()).Return(ts.Series{}, false, nil) - - ns.shards[testShardIDs[0].ID()] = shard - - _, wasWritten, err := ns.WriteTagged(ctx, ident.StringID("a"), - ident.EmptyTagIterator, now, 1.0, xtime.Second, nil) - require.NoError(t, err) - require.True(t, wasWritten) - - _, wasWritten, err = ns.WriteTagged(ctx, ident.StringID("a"), - ident.EmptyTagIterator, now, 1.0, xtime.Second, nil) - require.NoError(t, err) - require.False(t, wasWritten) + shard := NewMockdatabaseShard(ctrl) - shard.EXPECT().Close() - idx.EXPECT().Close().Return(nil) - require.NoError(t, ns.Close()) + opts := series.WriteOptions{ + UseAsIndex: useAsIndex, + } + shard.EXPECT().WriteTagged(ctx, ident.NewIDMatcher("a"), ident.EmptyTagIterator, + now, 1.0, xtime.Second, nil, opts).Return(ts.Series{}, true, nil) + shard.EXPECT().WriteTagged(ctx, ident.NewIDMatcher("a"), ident.EmptyTagIterator, + now, 1.0, xtime.Second, nil, opts).Return(ts.Series{}, false, nil) + + ns.shards[testShardIDs[0].ID()] = shard + + _, wasWritten, err := ns.WriteTagged(ctx, ident.StringID("a"), + ident.EmptyTagIterator, now, 1.0, xtime.Second, nil) + require.NoError(t, err) + require.True(t, wasWritten) + + _, wasWritten, err = ns.WriteTagged(ctx, ident.StringID("a"), + ident.EmptyTagIterator, now, 1.0, xtime.Second, nil) + require.NoError(t, err) + require.False(t, wasWritten) + + shard.EXPECT().Close() + idx.EXPECT().Close().Return(nil) + require.NoError(t, ns.Close()) + } } func TestNamespaceIndexQuery(t *testing.T) { diff --git a/src/dbnode/storage/series/buffer.go b/src/dbnode/storage/series/buffer.go index f870f30afe..0c2fd7afca 100644 --- a/src/dbnode/storage/series/buffer.go +++ b/src/dbnode/storage/series/buffer.go @@ -60,6 +60,10 @@ const ( // enabled to see if this is a sane number. evictedTimesArraySize = 8 writableBucketVer = 0 + + // NB: this is the value all points will be truncated to. + truncatedValue float64 = 0 + truncatedUnit = xtime.Second ) type databaseBuffer interface { @@ -254,6 +258,13 @@ func (b *dbBuffer) Write( blockStart := timestamp.Truncate(b.blockSize) buckets := b.bucketVersionsAtCreate(blockStart) b.putBucketVersionsInCache(buckets) + + if wOpts.UseAsIndex { + timestamp = blockStart + value = truncatedValue + unit = truncatedUnit + } + return buckets.write(timestamp, value, unit, annotation, writeType) } diff --git a/src/dbnode/storage/series/buffer_test.go b/src/dbnode/storage/series/buffer_test.go index e0c2ee0f84..8f7c34d5a8 100644 --- a/src/dbnode/storage/series/buffer_test.go +++ b/src/dbnode/storage/series/buffer_test.go @@ -475,6 +475,46 @@ func TestBufferBucketDuplicatePointsNotWrittenButUpserted(t *testing.T) { assertSegmentValuesEqual(t, expected, []xio.SegmentReader{stream}, opts) } +func TestIndexedBufferWriteOnlyWritesSinglePoint(t *testing.T) { + opts := newBufferTestOptions() + rops := opts.RetentionOptions() + curr := time.Now().Truncate(rops.BlockSize()) + opts = opts.SetClockOptions(opts.ClockOptions().SetNowFn(func() time.Time { + return curr + })) + buffer := newDatabaseBuffer().(*dbBuffer) + buffer.Reset(opts) + + data := []value{ + {curr.Add(secs(1)), 1, xtime.Second, nil}, + {curr.Add(secs(2)), 2, xtime.Second, nil}, + {curr.Add(secs(3)), 3, xtime.Second, nil}, + } + + for i, v := range data { + ctx := context.NewContext() + wasWritten, err := buffer.Write(ctx, v.timestamp, v.value, v.unit, + v.annotation, WriteOptions{UseAsIndex: true}) + require.NoError(t, err) + expectedWrite := i == 0 + require.Equal(t, expectedWrite, wasWritten) + ctx.Close() + } + + ctx := context.NewContext() + defer ctx.Close() + + results, err := buffer.ReadEncoded(ctx, timeZero, timeDistantFuture) + assert.NoError(t, err) + assert.NotNil(t, results) + + ex := []value{ + {curr, truncatedValue, truncatedUnit, nil}, + } + + assertValuesEqual(t, ex, results, opts) +} + func TestBufferFetchBlocks(t *testing.T) { b, opts, expected := newTestBufferBucketsWithData(t) ctx := opts.ContextPool().Get() diff --git a/src/dbnode/storage/series/types.go b/src/dbnode/storage/series/types.go index 7e4f65e6f3..0ef48626fc 100644 --- a/src/dbnode/storage/series/types.go +++ b/src/dbnode/storage/series/types.go @@ -353,4 +353,5 @@ const BootstrapWriteType = WarmWrite // WriteOptions provides a set of options for a write. type WriteOptions struct { SchemaDesc namespace.SchemaDescr + UseAsIndex bool } From ef8059eb97c14351a5a5f5c26df687f827905384 Mon Sep 17 00:00:00 2001 From: Artem Nikolayevsky Date: Tue, 30 Apr 2019 14:33:34 -0400 Subject: [PATCH 2/8] Update mock --- .../storage/namespace/namespace_mock.go | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/src/dbnode/storage/namespace/namespace_mock.go b/src/dbnode/storage/namespace/namespace_mock.go index cebd6c3ac3..907ffb7e65 100644 --- a/src/dbnode/storage/namespace/namespace_mock.go +++ b/src/dbnode/storage/namespace/namespace_mock.go @@ -460,6 +460,34 @@ func (mr *MockIndexOptionsMockRecorder) BlockSize() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BlockSize", reflect.TypeOf((*MockIndexOptions)(nil).BlockSize)) } +// SetUseAsIndex mocks base method +func (m *MockIndexOptions) SetUseAsIndex(b bool) IndexOptions { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SetUseAsIndex", b) + ret0, _ := ret[0].(IndexOptions) + return ret0 +} + +// SetUseAsIndex indicates an expected call of SetUseAsIndex +func (mr *MockIndexOptionsMockRecorder) SetUseAsIndex(b interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetUseAsIndex", reflect.TypeOf((*MockIndexOptions)(nil).SetUseAsIndex), b) +} + +// UseAsIndex mocks base method +func (m *MockIndexOptions) UseAsIndex() bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "UseAsIndex") + ret0, _ := ret[0].(bool) + return ret0 +} + +// UseAsIndex indicates an expected call of UseAsIndex +func (mr *MockIndexOptionsMockRecorder) UseAsIndex() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UseAsIndex", reflect.TypeOf((*MockIndexOptions)(nil).UseAsIndex)) +} + // MockSchemaDescr is a mock of SchemaDescr interface type MockSchemaDescr struct { ctrl *gomock.Controller From 145ad6d75ceb7b3708e5ffaf0f8ec6edc76c364b Mon Sep 17 00:00:00 2001 From: Artem Nikolayevsky Date: Wed, 1 May 2019 11:33:36 -0400 Subject: [PATCH 3/8] PR response; updating truncation options --- src/cmd/services/m3dbnode/config/config.go | 37 +++++++- .../services/m3dbnode/config/config_test.go | 4 + src/dbnode/storage/namespace/index_options.go | 30 +++++-- .../storage/namespace/index_options_test.go | 16 ++++ .../storage/namespace/namespace_mock.go | 82 +++++++++++++++--- src/dbnode/storage/namespace/options.go | 11 +++ src/dbnode/storage/namespace/truncate_type.go | 86 +++++++++++++++++++ .../storage/namespace/truncate_type_test.go | 66 ++++++++++++++ src/dbnode/storage/namespace/types.go | 21 ++++- src/dbnode/storage/options.go | 10 +++ 10 files changed, 337 insertions(+), 26 deletions(-) create mode 100644 src/dbnode/storage/namespace/truncate_type.go create mode 100644 src/dbnode/storage/namespace/truncate_type_test.go diff --git a/src/cmd/services/m3dbnode/config/config.go b/src/cmd/services/m3dbnode/config/config.go index 90d67ffa7a..6c005ed91b 100644 --- a/src/cmd/services/m3dbnode/config/config.go +++ b/src/cmd/services/m3dbnode/config/config.go @@ -32,6 +32,7 @@ import ( coordinatorcfg "github.com/m3db/m3/src/cmd/services/m3query/config" "github.com/m3db/m3/src/dbnode/client" "github.com/m3db/m3/src/dbnode/environment" + "github.com/m3db/m3/src/dbnode/storage/namespace" "github.com/m3db/m3/src/x/config/hostid" "github.com/m3db/m3/src/x/instrument" xlog "github.com/m3db/m3/src/x/log" @@ -70,6 +71,9 @@ type DBConfiguration struct { // Index configuration. Index IndexConfiguration `yaml:"index"` + // Transforms configuration. + Transforms TransformConfiguration `yaml:"transforms"` + // Logging configuration. Logging xlog.Configuration `yaml:"logging"` @@ -173,8 +177,39 @@ type IndexConfiguration struct { // MaxQueryIDsConcurrency controls the maximum number of outstanding QueryID // requests that can be serviced concurrently. Limiting the concurrency is // important to prevent index queries from overloading the database entirely - // as they are very CPU-intensive (regex and FST matching.) + // as they are very CPU-intensive (regex and FST matching). MaxQueryIDsConcurrency int `yaml:"maxQueryIDsConcurrency" validate:"min=0"` + + // ForwardIndexProbability determines the likelihood that an incoming write is + // written to the next block, when arriving close to the block boundary. + // + // NB: this is an optimization which lessens pressure on the index around + // block boundaries by eagerly writing the series to the next block + // preemptively. + ForwardIndexProbability float64 `yaml:"forwardIndexProbability" validate:"min=0.0,max=1.0"` + + // ForwardIndexThreshold determines the threshold for forward writes, as a + // fraction of the given namespace's bufferFuture. + // + // NB: this is an optimization which lessens pressure on the index around + // block boundaries by eagerly writing the series to the next block + // preemptively. + ForwardIndexThreshold float64 `yaml:"forwardIndexThreshold" validate:"min=0.0,max=1.0"` +} + +// TransformConfiguration contains configuration options that can transform +// incoming writes. +type TransformConfiguration struct { + // TruncateBy determines what type of truncatation + TruncateBy namespace.TruncateType `yaml:"truncateBy"` +} + +func (c *TransformConfiguration) Validate() error { + if c == nil { + return nil + } + + return c.TruncateBy.Validate() } // TickConfiguration is the tick configuration for background processing of diff --git a/src/cmd/services/m3dbnode/config/config_test.go b/src/cmd/services/m3dbnode/config/config_test.go index 1ee0a1bde5..e164af4096 100644 --- a/src/cmd/services/m3dbnode/config/config_test.go +++ b/src/cmd/services/m3dbnode/config/config_test.go @@ -331,6 +331,10 @@ func TestConfiguration(t *testing.T) { expected := `db: index: maxQueryIDsConcurrency: 0 + forwardIndexProbability: 0 + forwardIndexThreshold: 0 + transforms: + truncateBy: 0 logging: file: /var/log/m3dbnode.log level: info diff --git a/src/dbnode/storage/namespace/index_options.go b/src/dbnode/storage/namespace/index_options.go index 96cc75a1ed..1bc5c1cee3 100644 --- a/src/dbnode/storage/namespace/index_options.go +++ b/src/dbnode/storage/namespace/index_options.go @@ -33,9 +33,11 @@ var ( ) type indexOpts struct { - enabled bool - useAsIndex bool - blockSize time.Duration + enabled bool + blockSize time.Duration + + forwardIndexThreshold float64 + forwardIndexProbability float64 } // NewIndexOptions returns a new IndexOptions. @@ -48,7 +50,9 @@ func NewIndexOptions() IndexOptions { func (i *indexOpts) Equal(value IndexOptions) bool { return i.Enabled() == value.Enabled() && - i.BlockSize() == value.BlockSize() + i.BlockSize() == value.BlockSize() && + i.ForwardIndexProbability() == value.ForwardIndexProbability() && + i.ForwardIndexThreshold() == value.ForwardIndexThreshold() } func (i *indexOpts) SetEnabled(value bool) IndexOptions { @@ -71,12 +75,22 @@ func (i *indexOpts) BlockSize() time.Duration { return i.blockSize } -func (i *indexOpts) SetUseAsIndex(b bool) IndexOptions { +func (i *indexOpts) SetForwardIndexProbability(value float64) IndexOptions { + io := *i + io.forwardIndexProbability = value + return &io +} + +func (i *indexOpts) ForwardIndexProbability() float64 { + return i.forwardIndexProbability +} + +func (i *indexOpts) SetForwardIndexThreshold(value float64) IndexOptions { io := *i - io.useAsIndex = b + io.forwardIndexThreshold = value return &io } -func (i *indexOpts) UseAsIndex() bool { - return i.useAsIndex +func (i *indexOpts) ForwardIndexThreshold() float64 { + return i.forwardIndexThreshold } diff --git a/src/dbnode/storage/namespace/index_options_test.go b/src/dbnode/storage/namespace/index_options_test.go index 633aadc12f..5c8b94be81 100644 --- a/src/dbnode/storage/namespace/index_options_test.go +++ b/src/dbnode/storage/namespace/index_options_test.go @@ -33,6 +33,10 @@ func TestIndexOptionsEqual(t *testing.T) { require.False(t, opts.SetEnabled(true).Equal(opts.SetEnabled(false))) require.False(t, opts.SetBlockSize(time.Hour).Equal( opts.SetBlockSize(time.Hour*2))) + require.False(t, opts.SetForwardIndexProbability(0.0). + Equal(opts.SetForwardIndexProbability(0.1))) + require.False(t, opts.SetForwardIndexThreshold(0.0). + Equal(opts.SetForwardIndexThreshold(0.1))) } func TestIndexOptionsEnabled(t *testing.T) { @@ -45,3 +49,15 @@ func TestIndexOptionsBlockSize(t *testing.T) { opts := NewIndexOptions() require.Equal(t, time.Hour, opts.SetBlockSize(time.Hour).BlockSize()) } + +func TestIndexOptionsForwardIndexThreshold(t *testing.T) { + threshold := 0.3 + opts := NewIndexOptions().SetForwardIndexThreshold(threshold) + require.Equal(t, threshold, opts.ForwardIndexThreshold()) +} + +func TestIndexOptionsForwardIndexProbability(t *testing.T) { + threshold := 0.3 + opts := NewIndexOptions().SetForwardIndexProbability(threshold) + require.Equal(t, threshold, opts.ForwardIndexProbability()) +} diff --git a/src/dbnode/storage/namespace/namespace_mock.go b/src/dbnode/storage/namespace/namespace_mock.go index 907ffb7e65..18fd1117d8 100644 --- a/src/dbnode/storage/namespace/namespace_mock.go +++ b/src/dbnode/storage/namespace/namespace_mock.go @@ -339,6 +339,34 @@ func (mr *MockOptionsMockRecorder) IndexOptions() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IndexOptions", reflect.TypeOf((*MockOptions)(nil).IndexOptions)) } +// SetTruncateType mocks base method +func (m *MockOptions) SetTruncateType(value TruncateType) Options { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SetTruncateType", value) + ret0, _ := ret[0].(Options) + return ret0 +} + +// SetTruncateType indicates an expected call of SetTruncateType +func (mr *MockOptionsMockRecorder) SetTruncateType(value interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetTruncateType", reflect.TypeOf((*MockOptions)(nil).SetTruncateType), value) +} + +// TruncateType mocks base method +func (m *MockOptions) TruncateType() TruncateType { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "TruncateType") + ret0, _ := ret[0].(TruncateType) + return ret0 +} + +// TruncateType indicates an expected call of TruncateType +func (mr *MockOptionsMockRecorder) TruncateType() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TruncateType", reflect.TypeOf((*MockOptions)(nil).TruncateType)) +} + // SetSchemaRegistry mocks base method func (m *MockOptions) SetSchemaRegistry(value SchemaRegistry) Options { m.ctrl.T.Helper() @@ -460,32 +488,60 @@ func (mr *MockIndexOptionsMockRecorder) BlockSize() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BlockSize", reflect.TypeOf((*MockIndexOptions)(nil).BlockSize)) } -// SetUseAsIndex mocks base method -func (m *MockIndexOptions) SetUseAsIndex(b bool) IndexOptions { +// SetForwardIndexProbability mocks base method +func (m *MockIndexOptions) SetForwardIndexProbability(value float64) IndexOptions { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "SetUseAsIndex", b) + ret := m.ctrl.Call(m, "SetForwardIndexProbability", value) ret0, _ := ret[0].(IndexOptions) return ret0 } -// SetUseAsIndex indicates an expected call of SetUseAsIndex -func (mr *MockIndexOptionsMockRecorder) SetUseAsIndex(b interface{}) *gomock.Call { +// SetForwardIndexProbability indicates an expected call of SetForwardIndexProbability +func (mr *MockIndexOptionsMockRecorder) SetForwardIndexProbability(value interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetUseAsIndex", reflect.TypeOf((*MockIndexOptions)(nil).SetUseAsIndex), b) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetForwardIndexProbability", reflect.TypeOf((*MockIndexOptions)(nil).SetForwardIndexProbability), value) } -// UseAsIndex mocks base method -func (m *MockIndexOptions) UseAsIndex() bool { +// ForwardIndexProbability mocks base method +func (m *MockIndexOptions) ForwardIndexProbability() float64 { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "UseAsIndex") - ret0, _ := ret[0].(bool) + ret := m.ctrl.Call(m, "ForwardIndexProbability") + ret0, _ := ret[0].(float64) + return ret0 +} + +// ForwardIndexProbability indicates an expected call of ForwardIndexProbability +func (mr *MockIndexOptionsMockRecorder) ForwardIndexProbability() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ForwardIndexProbability", reflect.TypeOf((*MockIndexOptions)(nil).ForwardIndexProbability)) +} + +// SetForwardIndexThreshold mocks base method +func (m *MockIndexOptions) SetForwardIndexThreshold(value float64) IndexOptions { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SetForwardIndexThreshold", value) + ret0, _ := ret[0].(IndexOptions) + return ret0 +} + +// SetForwardIndexThreshold indicates an expected call of SetForwardIndexThreshold +func (mr *MockIndexOptionsMockRecorder) SetForwardIndexThreshold(value interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetForwardIndexThreshold", reflect.TypeOf((*MockIndexOptions)(nil).SetForwardIndexThreshold), value) +} + +// ForwardIndexThreshold mocks base method +func (m *MockIndexOptions) ForwardIndexThreshold() float64 { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ForwardIndexThreshold") + ret0, _ := ret[0].(float64) return ret0 } -// UseAsIndex indicates an expected call of UseAsIndex -func (mr *MockIndexOptionsMockRecorder) UseAsIndex() *gomock.Call { +// ForwardIndexThreshold indicates an expected call of ForwardIndexThreshold +func (mr *MockIndexOptionsMockRecorder) ForwardIndexThreshold() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UseAsIndex", reflect.TypeOf((*MockIndexOptions)(nil).UseAsIndex)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ForwardIndexThreshold", reflect.TypeOf((*MockIndexOptions)(nil).ForwardIndexThreshold)) } // MockSchemaDescr is a mock of SchemaDescr interface diff --git a/src/dbnode/storage/namespace/options.go b/src/dbnode/storage/namespace/options.go index 849237dfad..0ce744083f 100644 --- a/src/dbnode/storage/namespace/options.go +++ b/src/dbnode/storage/namespace/options.go @@ -63,6 +63,7 @@ type options struct { cleanupEnabled bool repairEnabled bool coldWritesEnabled bool + truncateType TruncateType retentionOpts retention.Options indexOpts IndexOptions schemaReg SchemaRegistry @@ -212,6 +213,16 @@ func (o *options) IndexOptions() IndexOptions { return o.indexOpts } +func (o *options) SetTruncateType(t TruncateType) Options { + opts := *o + opts.truncateType = t + return &opts +} + +func (o *options) TruncateType() TruncateType { + return o.truncateType +} + func (o *options) SetSchemaRegistry(value SchemaRegistry) Options { opts := *o opts.schemaReg = value diff --git a/src/dbnode/storage/namespace/truncate_type.go b/src/dbnode/storage/namespace/truncate_type.go new file mode 100644 index 0000000000..312d458af9 --- /dev/null +++ b/src/dbnode/storage/namespace/truncate_type.go @@ -0,0 +1,86 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package namespace + +import ( + "fmt" +) + +// TruncateType determines the scheme for truncating transforms. +type TruncateType uint8 + +const ( + TypeNone TruncateType = iota + TypeBlock +) + +var validTruncationTypes = []TruncateType{ + TypeNone, + TypeBlock, +} + +// Validate validates that the scheme type is valid. +func (t TruncateType) Validate() error { + if t == TypeNone { + return nil + } + + if t >= TypeNone && t <= TypeBlock { + return nil + } + + return fmt.Errorf("invalid truncation type: '%v' valid types are: %v", + t, validTruncationTypes) +} + +func (t TruncateType) String() string { + switch t { + case TypeNone: + return "none" + case TypeBlock: + return "block" + default: + // Should never get here. + return "unknown" + } +} + +// UnmarshalYAML unmarshals a stored merics type. +func (t *TruncateType) UnmarshalYAML(unmarshal func(interface{}) error) error { + var str string + if err := unmarshal(&str); err != nil { + return err + } + + if str == "" { + *t = TypeNone + } + + for _, valid := range validTruncationTypes { + if str == valid.String() { + *t = valid + return nil + } + } + + return fmt.Errorf("invalid truncation type: '%s' valid types are: %v", + str, validTruncationTypes) +} diff --git a/src/dbnode/storage/namespace/truncate_type_test.go b/src/dbnode/storage/namespace/truncate_type_test.go new file mode 100644 index 0000000000..0a93817283 --- /dev/null +++ b/src/dbnode/storage/namespace/truncate_type_test.go @@ -0,0 +1,66 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package namespace + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + yaml "gopkg.in/yaml.v2" +) + +func TestTruncateTypeValidation(t *testing.T) { + err := TypeNone.Validate() + assert.NoError(t, err) + err = TypeBlock.Validate() + assert.NoError(t, err) + err = TruncateType(4).Validate() + assert.Error(t, err) +} + +func TestTruncateTypeUnmarshalYAML(t *testing.T) { + type config struct { + Type TruncateType `yaml:"type"` + } + + validParseSchemes := []TruncateType{ + TypeNone, + TypeBlock, + } + + for _, value := range validParseSchemes { + str := fmt.Sprintf("type: %s\n", value.String()) + + var cfg config + require.NoError(t, yaml.Unmarshal([]byte(str), &cfg)) + + assert.Equal(t, value, cfg.Type) + } + + var cfg config + // Bad type fails. + require.Error(t, yaml.Unmarshal([]byte("type: not_a_known_type\n"), &cfg)) + + require.NoError(t, yaml.Unmarshal([]byte(""), &cfg)) + assert.Equal(t, TypeNone, cfg.Type) +} diff --git a/src/dbnode/storage/namespace/types.go b/src/dbnode/storage/namespace/types.go index b6b213cd0c..3327c59191 100644 --- a/src/dbnode/storage/namespace/types.go +++ b/src/dbnode/storage/namespace/types.go @@ -91,6 +91,12 @@ type Options interface { // IndexOptions returns the IndexOptions. IndexOptions() IndexOptions + // SetTruncateType sets the truncation type for this namespace. + SetTruncateType(value TruncateType) Options + + // TruncateType returns the truncation type for this namespace. + TruncateType() TruncateType + // SetSchemaRegistry sets the schema registry for this namespace. SetSchemaRegistry(value SchemaRegistry) Options @@ -115,11 +121,18 @@ type IndexOptions interface { // BlockSize returns the block size. BlockSize() time.Duration - // SetUseAsIndex sets whether or not this namespace is being used as an index. - SetUseAsIndex(b bool) IndexOptions + // SetForwardIndexProbability sets the probability chance for forward writes. + SetForwardIndexProbability(value float64) IndexOptions + + // ForwardIndexProbability returns the probability chance for forward writes. + ForwardIndexProbability() float64 + + // SetForwardIndexProbability sets the threshold for forward writes as a + // fraction of the bufferFuture. + SetForwardIndexThreshold(value float64) IndexOptions - // UseAsIndex returns whether this namespace is being used as an index. - UseAsIndex() bool + // ForwardIndexProbability returns the threshold for forward writes. + ForwardIndexThreshold() float64 } // SchemaDescr describes the schema for a complex type value. diff --git a/src/dbnode/storage/options.go b/src/dbnode/storage/options.go index 4e117aa1a2..484bfbce68 100644 --- a/src/dbnode/storage/options.go +++ b/src/dbnode/storage/options.go @@ -354,6 +354,16 @@ func (o *options) RepairEnabled() bool { return o.repairEnabled } +func (o *options) SetIndexOptions(b bool) Options { + opts := *o + opts.useAsIndex = b + return &opts +} + +func (o *options) UseAsIndex() bool { + return o.useAsIndex +} + func (o *options) SetRepairOptions(value repair.Options) Options { opts := *o opts.repairOpts = value From 28c082885f9c050d4cd341b90877d1643c140088 Mon Sep 17 00:00:00 2001 From: Artem Nikolayevsky Date: Wed, 1 May 2019 14:20:43 -0400 Subject: [PATCH 4/8] Refactoring options, plumbing config to options --- src/cmd/services/m3dbnode/config/config.go | 8 +++- src/dbnode/server/server.go | 7 +++- src/dbnode/storage/index/options.go | 22 ++++++++++ src/dbnode/storage/index/types.go | 13 ++++++ src/dbnode/storage/namespace.go | 14 ++----- src/dbnode/storage/namespace/index_options.go | 26 +----------- .../storage/namespace/index_options_test.go | 16 -------- .../storage/namespace/namespace_mock.go | 28 ------------- src/dbnode/storage/namespace/options.go | 11 ----- src/dbnode/storage/namespace/types.go | 19 --------- src/dbnode/storage/namespace_test.go | 40 ++++++++++++++----- src/dbnode/storage/options.go | 9 +++-- src/dbnode/storage/series/buffer.go | 2 +- src/dbnode/storage/series/buffer_test.go | 2 +- .../{namespace => series}/truncate_type.go | 2 +- .../truncate_type_test.go | 2 +- src/dbnode/storage/series/types.go | 4 +- src/dbnode/storage/shard.go | 1 - src/dbnode/storage/types.go | 6 +++ 19 files changed, 98 insertions(+), 134 deletions(-) rename src/dbnode/storage/{namespace => series}/truncate_type.go (99%) rename src/dbnode/storage/{namespace => series}/truncate_type_test.go (99%) diff --git a/src/cmd/services/m3dbnode/config/config.go b/src/cmd/services/m3dbnode/config/config.go index 6c005ed91b..92c70ca263 100644 --- a/src/cmd/services/m3dbnode/config/config.go +++ b/src/cmd/services/m3dbnode/config/config.go @@ -32,7 +32,7 @@ import ( coordinatorcfg "github.com/m3db/m3/src/cmd/services/m3query/config" "github.com/m3db/m3/src/dbnode/client" "github.com/m3db/m3/src/dbnode/environment" - "github.com/m3db/m3/src/dbnode/storage/namespace" + "github.com/m3db/m3/src/dbnode/storage/series" "github.com/m3db/m3/src/x/config/hostid" "github.com/m3db/m3/src/x/instrument" xlog "github.com/m3db/m3/src/x/log" @@ -169,6 +169,10 @@ func (c *DBConfiguration) InitDefaultsAndValidate() error { return err } + if err := c.Transforms.Validate(); err != nil { + return err + } + return nil } @@ -201,7 +205,7 @@ type IndexConfiguration struct { // incoming writes. type TransformConfiguration struct { // TruncateBy determines what type of truncatation - TruncateBy namespace.TruncateType `yaml:"truncateBy"` + TruncateBy series.TruncateType `yaml:"truncateBy"` } func (c *TransformConfiguration) Validate() error { diff --git a/src/dbnode/server/server.go b/src/dbnode/server/server.go index 6c0c204529..2efc49f22e 100644 --- a/src/dbnode/server/server.go +++ b/src/dbnode/server/server.go @@ -342,7 +342,6 @@ func Run(runOpts RunOptions) { CacheRegexp: plCacheConfig.CacheRegexpOrDefault(), CacheTerms: plCacheConfig.CacheTermsOrDefault(), }) - opts = opts.SetIndexOptions(indexOpts) if tick := cfg.Tick; tick != nil { runtimeOpts = runtimeOpts. @@ -1281,6 +1280,8 @@ func withEncodingAndPoolingOptions( aggregateQueryResultsPool := index.NewAggregateResultsPool( poolOptions(policy.IndexResultsPool, scope.SubScope("index-aggregate-results-pool"))) + // Set index options. + opts = opts.SetTruncateType(cfg.Transforms.TruncateBy) indexOpts := opts.IndexOptions(). SetInstrumentOptions(iopts). SetMemSegmentOptions( @@ -1297,7 +1298,9 @@ func withEncodingAndPoolingOptions( SetIdentifierPool(identifierPool). SetCheckedBytesPool(bytesPool). SetQueryResultsPool(queryResultsPool). - SetAggregateResultsPool(aggregateQueryResultsPool) + SetAggregateResultsPool(aggregateQueryResultsPool). + SetForwardIndexProbability(cfg.Index.ForwardIndexProbability). + SetForwardIndexThreshold(cfg.Index.ForwardIndexThreshold) queryResultsPool.Init(func() index.QueryResults { // NB(r): Need to initialize after setting the index opts so diff --git a/src/dbnode/storage/index/options.go b/src/dbnode/storage/index/options.go index 07f505adbf..f572df5c37 100644 --- a/src/dbnode/storage/index/options.go +++ b/src/dbnode/storage/index/options.go @@ -100,6 +100,8 @@ func init() { // nolint: maligned type opts struct { + forwardIndexThreshold float64 + forwardIndexProbability float64 insertMode InsertMode clockOpts clock.Options instrumentOpts instrument.Options @@ -379,3 +381,23 @@ func (o *opts) SetReadThroughSegmentOptions(value ReadThroughSegmentOptions) Opt func (o *opts) ReadThroughSegmentOptions() ReadThroughSegmentOptions { return o.readThroughSegmentOptions } + +func (o *opts) SetForwardIndexProbability(value float64) Options { + opts := *o + opts.forwardIndexProbability = value + return &opts +} + +func (o *opts) ForwardIndexProbability() float64 { + return o.forwardIndexProbability +} + +func (o *opts) SetForwardIndexThreshold(value float64) Options { + opts := *o + opts.forwardIndexThreshold = value + return &opts +} + +func (o *opts) ForwardIndexThreshold() float64 { + return o.forwardIndexThreshold +} diff --git a/src/dbnode/storage/index/types.go b/src/dbnode/storage/index/types.go index c1fae3ee2d..687a10071f 100644 --- a/src/dbnode/storage/index/types.go +++ b/src/dbnode/storage/index/types.go @@ -850,4 +850,17 @@ type Options interface { // ReadThroughSegmentOptions returns the read through segment cache options. ReadThroughSegmentOptions() ReadThroughSegmentOptions + + // SetForwardIndexProbability sets the probability chance for forward writes. + SetForwardIndexProbability(value float64) Options + + // ForwardIndexProbability returns the probability chance for forward writes. + ForwardIndexProbability() float64 + + // SetForwardIndexProbability sets the threshold for forward writes as a + // fraction of the bufferFuture. + SetForwardIndexThreshold(value float64) Options + + // ForwardIndexProbability returns the threshold for forward writes. + ForwardIndexThreshold() float64 } diff --git a/src/dbnode/storage/namespace.go b/src/dbnode/storage/namespace.go index 3abfee339f..8ecaa79cce 100644 --- a/src/dbnode/storage/namespace.go +++ b/src/dbnode/storage/namespace.go @@ -101,7 +101,6 @@ type dbNamespace struct { sync.RWMutex closed bool - useAsIndex bool shutdownCh chan struct{} id ident.ID shardSet sharding.ShardSet @@ -286,16 +285,12 @@ func newDatabaseNamespace( opts Options, ) (databaseNamespace, error) { var ( - nopts = metadata.Options() - id = metadata.ID() - useAsIndex bool + nopts = metadata.Options() + id = metadata.ID() ) if !nopts.WritesToCommitLog() { commitLogWriter = commitLogWriteNoOp } - if nopts.IndexOptions().UseAsIndex() { - useAsIndex = true - } iops := opts.InstrumentOptions() logger := iops.Logger().With(zap.String("namespace", id.String())) @@ -340,7 +335,6 @@ func newDatabaseNamespace( opts: opts, metadata: metadata, nopts: nopts, - useAsIndex: useAsIndex, schemaRegistry: metadata.Options().SchemaRegistry(), seriesOpts: seriesOpts, nowFn: opts.ClockOptions().NowFn(), @@ -598,7 +592,7 @@ func (n *dbNamespace) Write( return ts.Series{}, false, err } opts := series.WriteOptions{ - UseAsIndex: n.useAsIndex, + TruncateType: n.opts.TruncateType(), } series, wasWritten, err := shard.Write(ctx, id, timestamp, value, unit, annotation, opts) @@ -626,7 +620,7 @@ func (n *dbNamespace) WriteTagged( return ts.Series{}, false, err } opts := series.WriteOptions{ - UseAsIndex: n.useAsIndex, + TruncateType: n.opts.TruncateType(), } series, wasWritten, err := shard.WriteTagged(ctx, id, tags, timestamp, value, unit, annotation, opts) diff --git a/src/dbnode/storage/namespace/index_options.go b/src/dbnode/storage/namespace/index_options.go index 1bc5c1cee3..c50b547c97 100644 --- a/src/dbnode/storage/namespace/index_options.go +++ b/src/dbnode/storage/namespace/index_options.go @@ -35,9 +35,6 @@ var ( type indexOpts struct { enabled bool blockSize time.Duration - - forwardIndexThreshold float64 - forwardIndexProbability float64 } // NewIndexOptions returns a new IndexOptions. @@ -50,9 +47,7 @@ func NewIndexOptions() IndexOptions { func (i *indexOpts) Equal(value IndexOptions) bool { return i.Enabled() == value.Enabled() && - i.BlockSize() == value.BlockSize() && - i.ForwardIndexProbability() == value.ForwardIndexProbability() && - i.ForwardIndexThreshold() == value.ForwardIndexThreshold() + i.BlockSize() == value.BlockSize() } func (i *indexOpts) SetEnabled(value bool) IndexOptions { @@ -75,22 +70,3 @@ func (i *indexOpts) BlockSize() time.Duration { return i.blockSize } -func (i *indexOpts) SetForwardIndexProbability(value float64) IndexOptions { - io := *i - io.forwardIndexProbability = value - return &io -} - -func (i *indexOpts) ForwardIndexProbability() float64 { - return i.forwardIndexProbability -} - -func (i *indexOpts) SetForwardIndexThreshold(value float64) IndexOptions { - io := *i - io.forwardIndexThreshold = value - return &io -} - -func (i *indexOpts) ForwardIndexThreshold() float64 { - return i.forwardIndexThreshold -} diff --git a/src/dbnode/storage/namespace/index_options_test.go b/src/dbnode/storage/namespace/index_options_test.go index 5c8b94be81..633aadc12f 100644 --- a/src/dbnode/storage/namespace/index_options_test.go +++ b/src/dbnode/storage/namespace/index_options_test.go @@ -33,10 +33,6 @@ func TestIndexOptionsEqual(t *testing.T) { require.False(t, opts.SetEnabled(true).Equal(opts.SetEnabled(false))) require.False(t, opts.SetBlockSize(time.Hour).Equal( opts.SetBlockSize(time.Hour*2))) - require.False(t, opts.SetForwardIndexProbability(0.0). - Equal(opts.SetForwardIndexProbability(0.1))) - require.False(t, opts.SetForwardIndexThreshold(0.0). - Equal(opts.SetForwardIndexThreshold(0.1))) } func TestIndexOptionsEnabled(t *testing.T) { @@ -49,15 +45,3 @@ func TestIndexOptionsBlockSize(t *testing.T) { opts := NewIndexOptions() require.Equal(t, time.Hour, opts.SetBlockSize(time.Hour).BlockSize()) } - -func TestIndexOptionsForwardIndexThreshold(t *testing.T) { - threshold := 0.3 - opts := NewIndexOptions().SetForwardIndexThreshold(threshold) - require.Equal(t, threshold, opts.ForwardIndexThreshold()) -} - -func TestIndexOptionsForwardIndexProbability(t *testing.T) { - threshold := 0.3 - opts := NewIndexOptions().SetForwardIndexProbability(threshold) - require.Equal(t, threshold, opts.ForwardIndexProbability()) -} diff --git a/src/dbnode/storage/namespace/namespace_mock.go b/src/dbnode/storage/namespace/namespace_mock.go index 18fd1117d8..08c6269011 100644 --- a/src/dbnode/storage/namespace/namespace_mock.go +++ b/src/dbnode/storage/namespace/namespace_mock.go @@ -339,34 +339,6 @@ func (mr *MockOptionsMockRecorder) IndexOptions() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IndexOptions", reflect.TypeOf((*MockOptions)(nil).IndexOptions)) } -// SetTruncateType mocks base method -func (m *MockOptions) SetTruncateType(value TruncateType) Options { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "SetTruncateType", value) - ret0, _ := ret[0].(Options) - return ret0 -} - -// SetTruncateType indicates an expected call of SetTruncateType -func (mr *MockOptionsMockRecorder) SetTruncateType(value interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetTruncateType", reflect.TypeOf((*MockOptions)(nil).SetTruncateType), value) -} - -// TruncateType mocks base method -func (m *MockOptions) TruncateType() TruncateType { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "TruncateType") - ret0, _ := ret[0].(TruncateType) - return ret0 -} - -// TruncateType indicates an expected call of TruncateType -func (mr *MockOptionsMockRecorder) TruncateType() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TruncateType", reflect.TypeOf((*MockOptions)(nil).TruncateType)) -} - // SetSchemaRegistry mocks base method func (m *MockOptions) SetSchemaRegistry(value SchemaRegistry) Options { m.ctrl.T.Helper() diff --git a/src/dbnode/storage/namespace/options.go b/src/dbnode/storage/namespace/options.go index 0ce744083f..849237dfad 100644 --- a/src/dbnode/storage/namespace/options.go +++ b/src/dbnode/storage/namespace/options.go @@ -63,7 +63,6 @@ type options struct { cleanupEnabled bool repairEnabled bool coldWritesEnabled bool - truncateType TruncateType retentionOpts retention.Options indexOpts IndexOptions schemaReg SchemaRegistry @@ -213,16 +212,6 @@ func (o *options) IndexOptions() IndexOptions { return o.indexOpts } -func (o *options) SetTruncateType(t TruncateType) Options { - opts := *o - opts.truncateType = t - return &opts -} - -func (o *options) TruncateType() TruncateType { - return o.truncateType -} - func (o *options) SetSchemaRegistry(value SchemaRegistry) Options { opts := *o opts.schemaReg = value diff --git a/src/dbnode/storage/namespace/types.go b/src/dbnode/storage/namespace/types.go index 3327c59191..dac4448174 100644 --- a/src/dbnode/storage/namespace/types.go +++ b/src/dbnode/storage/namespace/types.go @@ -91,12 +91,6 @@ type Options interface { // IndexOptions returns the IndexOptions. IndexOptions() IndexOptions - // SetTruncateType sets the truncation type for this namespace. - SetTruncateType(value TruncateType) Options - - // TruncateType returns the truncation type for this namespace. - TruncateType() TruncateType - // SetSchemaRegistry sets the schema registry for this namespace. SetSchemaRegistry(value SchemaRegistry) Options @@ -120,19 +114,6 @@ type IndexOptions interface { // BlockSize returns the block size. BlockSize() time.Duration - - // SetForwardIndexProbability sets the probability chance for forward writes. - SetForwardIndexProbability(value float64) IndexOptions - - // ForwardIndexProbability returns the probability chance for forward writes. - ForwardIndexProbability() float64 - - // SetForwardIndexProbability sets the threshold for forward writes as a - // fraction of the bufferFuture. - SetForwardIndexThreshold(value float64) IndexOptions - - // ForwardIndexProbability returns the threshold for forward writes. - ForwardIndexThreshold() float64 } // SchemaDescr describes the schema for a complex type value. diff --git a/src/dbnode/storage/namespace_test.go b/src/dbnode/storage/namespace_test.go index b48288bfe3..b33a09c68e 100644 --- a/src/dbnode/storage/namespace_test.go +++ b/src/dbnode/storage/namespace_test.go @@ -98,6 +98,21 @@ func newTestNamespaceWithIDOpts( return ns.(*dbNamespace), closer } +func newTestNamespaceWithOpts( + t *testing.T, + dopts Options, +) (*dbNamespace, closerFn) { + nsID, opts := defaultTestNs1ID, defaultTestNs1Opts + metadata := newTestNamespaceMetadataWithIDOpts(t, nsID, opts) + hashFn := func(identifier ident.ID) uint32 { return testShardIDs[0].ID() } + shardSet, err := sharding.NewShardSet(testShardIDs, hashFn) + require.NoError(t, err) + ns, err := newDatabaseNamespace(metadata, shardSet, nil, nil, nil, dopts) + require.NoError(t, err) + closer := dopts.RuntimeOptionsManager().Close + return ns.(*dbNamespace), closer +} + func newTestNamespaceWithIndex( t *testing.T, index namespaceIndex, @@ -112,11 +127,13 @@ func newTestNamespaceWithIndex( func newTestNamespaceWithUseAsIndex( t *testing.T, index namespaceIndex, - useAsIndex bool, + truncateType series.TruncateType, ) (*dbNamespace, closerFn) { - nsOpts := defaultTestNs1Opts - nsOpts = nsOpts.SetIndexOptions(nsOpts.IndexOptions().SetUseAsIndex(useAsIndex)) - ns, closer := newTestNamespaceWithIDOpts(t, defaultTestNs1ID, nsOpts) + opts := testDatabaseOptions(). + SetRuntimeOptionsManager(runtime.NewOptionsManager()). + SetTruncateType(truncateType) + + ns, closer := newTestNamespaceWithOpts(t, opts) ns.reverseIndex = index return ns, closer } @@ -196,12 +213,13 @@ func TestNamespaceWriteShardOwned(t *testing.T) { unit := xtime.Second ant := []byte(nil) - for _, useAsIndex := range []bool{true, false} { - ns, closer := newTestNamespaceWithUseAsIndex(t, nil, useAsIndex) + truncateTypes := []series.TruncateType{series.TypeBlock, series.TypeNone} + for _, truncateType := range truncateTypes { + ns, closer := newTestNamespaceWithUseAsIndex(t, nil, truncateType) defer closer() shard := NewMockdatabaseShard(ctrl) opts := series.WriteOptions{ - UseAsIndex: useAsIndex, + TruncateType: truncateType, } shard.EXPECT().Write(ctx, id, now, val, unit, ant, opts). Return(ts.Series{}, true, nil).Times(1) @@ -1085,9 +1103,11 @@ func TestNamespaceIndexInsert(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - for _, useAsIndex := range []bool{true, false} { + truncateTypes := []series.TruncateType{series.TypeBlock, series.TypeNone} + for _, truncateType := range truncateTypes { idx := NewMocknamespaceIndex(ctrl) - ns, closer := newTestNamespaceWithUseAsIndex(t, idx, useAsIndex) + + ns, closer := newTestNamespaceWithUseAsIndex(t, idx, truncateType) ns.reverseIndex = idx defer closer() @@ -1097,7 +1117,7 @@ func TestNamespaceIndexInsert(t *testing.T) { shard := NewMockdatabaseShard(ctrl) opts := series.WriteOptions{ - UseAsIndex: useAsIndex, + TruncateType: truncateType, } shard.EXPECT().WriteTagged(ctx, ident.NewIDMatcher("a"), ident.EmptyTagIterator, now, 1.0, xtime.Second, nil, opts).Return(ts.Series{}, true, nil) diff --git a/src/dbnode/storage/options.go b/src/dbnode/storage/options.go index 484bfbce68..ce2aa92581 100644 --- a/src/dbnode/storage/options.go +++ b/src/dbnode/storage/options.go @@ -118,6 +118,7 @@ type options struct { errThresholdForLoad int64 indexingEnabled bool repairEnabled bool + truncateType series.TruncateType indexOpts index.Options repairOpts repair.Options newEncoderFn encoding.NewEncoderFn @@ -354,14 +355,14 @@ func (o *options) RepairEnabled() bool { return o.repairEnabled } -func (o *options) SetIndexOptions(b bool) Options { +func (o *options) SetTruncateType(value series.TruncateType) Options { opts := *o - opts.useAsIndex = b + opts.truncateType = value return &opts } -func (o *options) UseAsIndex() bool { - return o.useAsIndex +func (o *options) TruncateType() series.TruncateType { + return o.truncateType } func (o *options) SetRepairOptions(value repair.Options) Options { diff --git a/src/dbnode/storage/series/buffer.go b/src/dbnode/storage/series/buffer.go index 0c2fd7afca..6676ac22d0 100644 --- a/src/dbnode/storage/series/buffer.go +++ b/src/dbnode/storage/series/buffer.go @@ -259,7 +259,7 @@ func (b *dbBuffer) Write( buckets := b.bucketVersionsAtCreate(blockStart) b.putBucketVersionsInCache(buckets) - if wOpts.UseAsIndex { + if wOpts.TruncateType == TypeBlock { timestamp = blockStart value = truncatedValue unit = truncatedUnit diff --git a/src/dbnode/storage/series/buffer_test.go b/src/dbnode/storage/series/buffer_test.go index 8f7c34d5a8..9ab28c46dd 100644 --- a/src/dbnode/storage/series/buffer_test.go +++ b/src/dbnode/storage/series/buffer_test.go @@ -494,7 +494,7 @@ func TestIndexedBufferWriteOnlyWritesSinglePoint(t *testing.T) { for i, v := range data { ctx := context.NewContext() wasWritten, err := buffer.Write(ctx, v.timestamp, v.value, v.unit, - v.annotation, WriteOptions{UseAsIndex: true}) + v.annotation, WriteOptions{TruncateType: TypeBlock}) require.NoError(t, err) expectedWrite := i == 0 require.Equal(t, expectedWrite, wasWritten) diff --git a/src/dbnode/storage/namespace/truncate_type.go b/src/dbnode/storage/series/truncate_type.go similarity index 99% rename from src/dbnode/storage/namespace/truncate_type.go rename to src/dbnode/storage/series/truncate_type.go index 312d458af9..e5d9056ee7 100644 --- a/src/dbnode/storage/namespace/truncate_type.go +++ b/src/dbnode/storage/series/truncate_type.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -package namespace +package series import ( "fmt" diff --git a/src/dbnode/storage/namespace/truncate_type_test.go b/src/dbnode/storage/series/truncate_type_test.go similarity index 99% rename from src/dbnode/storage/namespace/truncate_type_test.go rename to src/dbnode/storage/series/truncate_type_test.go index 0a93817283..ec761bfdb5 100644 --- a/src/dbnode/storage/namespace/truncate_type_test.go +++ b/src/dbnode/storage/series/truncate_type_test.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -package namespace +package series import ( "fmt" diff --git a/src/dbnode/storage/series/types.go b/src/dbnode/storage/series/types.go index 0ef48626fc..bbbec4e4d2 100644 --- a/src/dbnode/storage/series/types.go +++ b/src/dbnode/storage/series/types.go @@ -352,6 +352,6 @@ const BootstrapWriteType = WarmWrite // WriteOptions provides a set of options for a write. type WriteOptions struct { - SchemaDesc namespace.SchemaDescr - UseAsIndex bool + SchemaDesc namespace.SchemaDescr + TruncateType TruncateType } diff --git a/src/dbnode/storage/shard.go b/src/dbnode/storage/shard.go index a68985cbe8..45f3fba264 100644 --- a/src/dbnode/storage/shard.go +++ b/src/dbnode/storage/shard.go @@ -1081,7 +1081,6 @@ func (s *dbShard) newShardEntry( default: return nil, errNewShardEntryTagsTypeInvalid - } series := s.seriesPool.Get() diff --git a/src/dbnode/storage/types.go b/src/dbnode/storage/types.go index 6fab06e1eb..ec26ea7978 100644 --- a/src/dbnode/storage/types.go +++ b/src/dbnode/storage/types.go @@ -782,6 +782,12 @@ type Options interface { // IndexOptions returns the indexing options. IndexOptions() index.Options + // SetTruncateType sets the truncation type for the database. + SetTruncateType(value series.TruncateType) Options + + // TruncateType returns the truncation type for the database. + TruncateType() series.TruncateType + // SetRepairEnabled sets whether or not to enable the repair. SetRepairEnabled(b bool) Options From e6c6c55539ec519a7d25bfff9ecf303837316c02 Mon Sep 17 00:00:00 2001 From: Artem Nikolayevsky Date: Wed, 1 May 2019 17:58:30 -0400 Subject: [PATCH 5/8] PR response --- scripts/development/m3_stack/m3dbnode.yml | 8 ++++++++ src/cmd/services/m3dbnode/config/config.go | 5 ++++- src/cmd/services/m3dbnode/config/config_test.go | 1 + src/dbnode/server/server.go | 11 ++++++++++- src/dbnode/storage/namespace_test.go | 6 +++--- src/dbnode/storage/options.go | 13 +++++++++++++ src/dbnode/storage/series/buffer.go | 10 ++++------ src/dbnode/storage/series/buffer_test.go | 12 ++++++++++-- src/dbnode/storage/series/types.go | 15 ++++++++++++++- src/dbnode/storage/types.go | 8 ++++++++ 10 files changed, 75 insertions(+), 14 deletions(-) diff --git a/scripts/development/m3_stack/m3dbnode.yml b/scripts/development/m3_stack/m3dbnode.yml index 08ed122e29..559c8e4c67 100644 --- a/scripts/development/m3_stack/m3dbnode.yml +++ b/scripts/development/m3_stack/m3dbnode.yml @@ -1,4 +1,12 @@ db: + transforms: + truncateBy: block + truncateDuration: 1h + + index: + forwardIndexThreshold: 0.1 + forwardIndexProbability: 0.1 + logging: level: info diff --git a/src/cmd/services/m3dbnode/config/config.go b/src/cmd/services/m3dbnode/config/config.go index 92c70ca263..3617c32bf9 100644 --- a/src/cmd/services/m3dbnode/config/config.go +++ b/src/cmd/services/m3dbnode/config/config.go @@ -204,8 +204,11 @@ type IndexConfiguration struct { // TransformConfiguration contains configuration options that can transform // incoming writes. type TransformConfiguration struct { - // TruncateBy determines what type of truncatation + // TruncateBy determines what type of truncatation is applied to incoming + // writes. TruncateBy series.TruncateType `yaml:"truncateBy"` + // ForcedValue determines what to set all incoming write values to. + ForcedValue *float64 `yaml:"forceValue"` } func (c *TransformConfiguration) Validate() error { diff --git a/src/cmd/services/m3dbnode/config/config_test.go b/src/cmd/services/m3dbnode/config/config_test.go index e164af4096..29a71fefd6 100644 --- a/src/cmd/services/m3dbnode/config/config_test.go +++ b/src/cmd/services/m3dbnode/config/config_test.go @@ -335,6 +335,7 @@ func TestConfiguration(t *testing.T) { forwardIndexThreshold: 0 transforms: truncateBy: 0 + forceValue: null logging: file: /var/log/m3dbnode.log level: info diff --git a/src/dbnode/server/server.go b/src/dbnode/server/server.go index 2efc49f22e..8766feb497 100644 --- a/src/dbnode/server/server.go +++ b/src/dbnode/server/server.go @@ -1280,8 +1280,17 @@ func withEncodingAndPoolingOptions( aggregateQueryResultsPool := index.NewAggregateResultsPool( poolOptions(policy.IndexResultsPool, scope.SubScope("index-aggregate-results-pool"))) - // Set index options. + // Set value transformation options. opts = opts.SetTruncateType(cfg.Transforms.TruncateBy) + forcedValue := cfg.Transforms.ForcedValue + if forcedValue != nil { + opts = opts.SetWriteTransformOptions(series.WriteTransformOptions{ + ForceValueEnabled: true, + ForceValue: *forcedValue, + }) + } + + // Set index options. indexOpts := opts.IndexOptions(). SetInstrumentOptions(iopts). SetMemSegmentOptions( diff --git a/src/dbnode/storage/namespace_test.go b/src/dbnode/storage/namespace_test.go index b33a09c68e..302f876f24 100644 --- a/src/dbnode/storage/namespace_test.go +++ b/src/dbnode/storage/namespace_test.go @@ -124,7 +124,7 @@ func newTestNamespaceWithIndex( return ns, closer } -func newTestNamespaceWithUseAsIndex( +func newTestNamespaceWithTruncateType( t *testing.T, index namespaceIndex, truncateType series.TruncateType, @@ -215,7 +215,7 @@ func TestNamespaceWriteShardOwned(t *testing.T) { truncateTypes := []series.TruncateType{series.TypeBlock, series.TypeNone} for _, truncateType := range truncateTypes { - ns, closer := newTestNamespaceWithUseAsIndex(t, nil, truncateType) + ns, closer := newTestNamespaceWithTruncateType(t, nil, truncateType) defer closer() shard := NewMockdatabaseShard(ctrl) opts := series.WriteOptions{ @@ -1107,7 +1107,7 @@ func TestNamespaceIndexInsert(t *testing.T) { for _, truncateType := range truncateTypes { idx := NewMocknamespaceIndex(ctrl) - ns, closer := newTestNamespaceWithUseAsIndex(t, idx, truncateType) + ns, closer := newTestNamespaceWithTruncateType(t, idx, truncateType) ns.reverseIndex = idx defer closer() diff --git a/src/dbnode/storage/options.go b/src/dbnode/storage/options.go index ce2aa92581..7890dc1237 100644 --- a/src/dbnode/storage/options.go +++ b/src/dbnode/storage/options.go @@ -119,6 +119,7 @@ type options struct { indexingEnabled bool repairEnabled bool truncateType series.TruncateType + transformOptions series.WriteTransformOptions indexOpts index.Options repairOpts repair.Options newEncoderFn encoding.NewEncoderFn @@ -365,6 +366,18 @@ func (o *options) TruncateType() series.TruncateType { return o.truncateType } +func (o *options) SetWriteTransformOptions( + value series.WriteTransformOptions, +) Options { + opts := *o + opts.transformOptions = value + return &opts +} + +func (o *options) WriteTransformOptions() series.WriteTransformOptions { + return o.transformOptions +} + func (o *options) SetRepairOptions(value repair.Options) Options { opts := *o opts.repairOpts = value diff --git a/src/dbnode/storage/series/buffer.go b/src/dbnode/storage/series/buffer.go index 6676ac22d0..1545764ef4 100644 --- a/src/dbnode/storage/series/buffer.go +++ b/src/dbnode/storage/series/buffer.go @@ -60,10 +60,6 @@ const ( // enabled to see if this is a sane number. evictedTimesArraySize = 8 writableBucketVer = 0 - - // NB: this is the value all points will be truncated to. - truncatedValue float64 = 0 - truncatedUnit = xtime.Second ) type databaseBuffer interface { @@ -261,8 +257,10 @@ func (b *dbBuffer) Write( if wOpts.TruncateType == TypeBlock { timestamp = blockStart - value = truncatedValue - unit = truncatedUnit + } + + if wOpts.TransformOptions.ForceValueEnabled { + value = wOpts.TransformOptions.ForceValue } return buckets.write(timestamp, value, unit, annotation, writeType) diff --git a/src/dbnode/storage/series/buffer_test.go b/src/dbnode/storage/series/buffer_test.go index 9ab28c46dd..d4b12cd3f7 100644 --- a/src/dbnode/storage/series/buffer_test.go +++ b/src/dbnode/storage/series/buffer_test.go @@ -491,10 +491,18 @@ func TestIndexedBufferWriteOnlyWritesSinglePoint(t *testing.T) { {curr.Add(secs(3)), 3, xtime.Second, nil}, } + forceValue := 1.0 for i, v := range data { ctx := context.NewContext() + writeOpts := WriteOptions{ + TruncateType: TypeBlock, + TransformOptions: WriteTransformOptions{ + ForceValueEnabled: true, + ForceValue: forceValue, + }, + } wasWritten, err := buffer.Write(ctx, v.timestamp, v.value, v.unit, - v.annotation, WriteOptions{TruncateType: TypeBlock}) + v.annotation, writeOpts) require.NoError(t, err) expectedWrite := i == 0 require.Equal(t, expectedWrite, wasWritten) @@ -509,7 +517,7 @@ func TestIndexedBufferWriteOnlyWritesSinglePoint(t *testing.T) { assert.NotNil(t, results) ex := []value{ - {curr, truncatedValue, truncatedUnit, nil}, + {curr, forceValue, xtime.Second, nil}, } assertValuesEqual(t, ex, results, opts) diff --git a/src/dbnode/storage/series/types.go b/src/dbnode/storage/series/types.go index bbbec4e4d2..95e8dec377 100644 --- a/src/dbnode/storage/series/types.go +++ b/src/dbnode/storage/series/types.go @@ -350,8 +350,21 @@ const ( // what write type makes sense for bootstraps. const BootstrapWriteType = WarmWrite +// WriteTransformOptions describes transforms to run on incoming writes. +type WriteTransformOptions struct { + // ForceValueEnabled indicates if the values for incoming writes + // should be forced to `ForceValue`. + ForceValueEnabled bool + // ForceValue is the value that incoming writes should be forced to. + ForceValue float64 +} + // WriteOptions provides a set of options for a write. type WriteOptions struct { - SchemaDesc namespace.SchemaDescr + // SchemaDesc is the schema description. + SchemaDesc namespace.SchemaDescr + // TruncateType is the truncation type for incoming writes. TruncateType TruncateType + // TransformOptions describes transformation options for incoming writes. + TransformOptions WriteTransformOptions } diff --git a/src/dbnode/storage/types.go b/src/dbnode/storage/types.go index ec26ea7978..ef3413a5a0 100644 --- a/src/dbnode/storage/types.go +++ b/src/dbnode/storage/types.go @@ -788,6 +788,14 @@ type Options interface { // TruncateType returns the truncation type for the database. TruncateType() series.TruncateType + // SetWriteTransformOptions sets options for transforming incoming writes + // to the database. + SetWriteTransformOptions(value series.WriteTransformOptions) Options + + // WriteTransformOptions returns the options for transforming incoming writes + // to the database. + WriteTransformOptions() series.WriteTransformOptions + // SetRepairEnabled sets whether or not to enable the repair. SetRepairEnabled(b bool) Options From ff64c38967e79937683b6c3c5de22d155a6728b9 Mon Sep 17 00:00:00 2001 From: Artem Nikolayevsky Date: Wed, 1 May 2019 17:59:57 -0400 Subject: [PATCH 6/8] Update mocks --- src/dbnode/storage/index/index_mock.go | 56 +++++++++++++++++++ .../storage/namespace/namespace_mock.go | 56 ------------------- src/dbnode/storage/storage_mock.go | 56 +++++++++++++++++++ 3 files changed, 112 insertions(+), 56 deletions(-) diff --git a/src/dbnode/storage/index/index_mock.go b/src/dbnode/storage/index/index_mock.go index eb40eb016f..a09657e6ca 100644 --- a/src/dbnode/storage/index/index_mock.go +++ b/src/dbnode/storage/index/index_mock.go @@ -1452,3 +1452,59 @@ func (mr *MockOptionsMockRecorder) ReadThroughSegmentOptions() *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReadThroughSegmentOptions", reflect.TypeOf((*MockOptions)(nil).ReadThroughSegmentOptions)) } + +// SetForwardIndexProbability mocks base method +func (m *MockOptions) SetForwardIndexProbability(value float64) Options { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SetForwardIndexProbability", value) + ret0, _ := ret[0].(Options) + return ret0 +} + +// SetForwardIndexProbability indicates an expected call of SetForwardIndexProbability +func (mr *MockOptionsMockRecorder) SetForwardIndexProbability(value interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetForwardIndexProbability", reflect.TypeOf((*MockOptions)(nil).SetForwardIndexProbability), value) +} + +// ForwardIndexProbability mocks base method +func (m *MockOptions) ForwardIndexProbability() float64 { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ForwardIndexProbability") + ret0, _ := ret[0].(float64) + return ret0 +} + +// ForwardIndexProbability indicates an expected call of ForwardIndexProbability +func (mr *MockOptionsMockRecorder) ForwardIndexProbability() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ForwardIndexProbability", reflect.TypeOf((*MockOptions)(nil).ForwardIndexProbability)) +} + +// SetForwardIndexThreshold mocks base method +func (m *MockOptions) SetForwardIndexThreshold(value float64) Options { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SetForwardIndexThreshold", value) + ret0, _ := ret[0].(Options) + return ret0 +} + +// SetForwardIndexThreshold indicates an expected call of SetForwardIndexThreshold +func (mr *MockOptionsMockRecorder) SetForwardIndexThreshold(value interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetForwardIndexThreshold", reflect.TypeOf((*MockOptions)(nil).SetForwardIndexThreshold), value) +} + +// ForwardIndexThreshold mocks base method +func (m *MockOptions) ForwardIndexThreshold() float64 { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ForwardIndexThreshold") + ret0, _ := ret[0].(float64) + return ret0 +} + +// ForwardIndexThreshold indicates an expected call of ForwardIndexThreshold +func (mr *MockOptionsMockRecorder) ForwardIndexThreshold() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ForwardIndexThreshold", reflect.TypeOf((*MockOptions)(nil).ForwardIndexThreshold)) +} diff --git a/src/dbnode/storage/namespace/namespace_mock.go b/src/dbnode/storage/namespace/namespace_mock.go index 08c6269011..cebd6c3ac3 100644 --- a/src/dbnode/storage/namespace/namespace_mock.go +++ b/src/dbnode/storage/namespace/namespace_mock.go @@ -460,62 +460,6 @@ func (mr *MockIndexOptionsMockRecorder) BlockSize() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BlockSize", reflect.TypeOf((*MockIndexOptions)(nil).BlockSize)) } -// SetForwardIndexProbability mocks base method -func (m *MockIndexOptions) SetForwardIndexProbability(value float64) IndexOptions { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "SetForwardIndexProbability", value) - ret0, _ := ret[0].(IndexOptions) - return ret0 -} - -// SetForwardIndexProbability indicates an expected call of SetForwardIndexProbability -func (mr *MockIndexOptionsMockRecorder) SetForwardIndexProbability(value interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetForwardIndexProbability", reflect.TypeOf((*MockIndexOptions)(nil).SetForwardIndexProbability), value) -} - -// ForwardIndexProbability mocks base method -func (m *MockIndexOptions) ForwardIndexProbability() float64 { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ForwardIndexProbability") - ret0, _ := ret[0].(float64) - return ret0 -} - -// ForwardIndexProbability indicates an expected call of ForwardIndexProbability -func (mr *MockIndexOptionsMockRecorder) ForwardIndexProbability() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ForwardIndexProbability", reflect.TypeOf((*MockIndexOptions)(nil).ForwardIndexProbability)) -} - -// SetForwardIndexThreshold mocks base method -func (m *MockIndexOptions) SetForwardIndexThreshold(value float64) IndexOptions { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "SetForwardIndexThreshold", value) - ret0, _ := ret[0].(IndexOptions) - return ret0 -} - -// SetForwardIndexThreshold indicates an expected call of SetForwardIndexThreshold -func (mr *MockIndexOptionsMockRecorder) SetForwardIndexThreshold(value interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetForwardIndexThreshold", reflect.TypeOf((*MockIndexOptions)(nil).SetForwardIndexThreshold), value) -} - -// ForwardIndexThreshold mocks base method -func (m *MockIndexOptions) ForwardIndexThreshold() float64 { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ForwardIndexThreshold") - ret0, _ := ret[0].(float64) - return ret0 -} - -// ForwardIndexThreshold indicates an expected call of ForwardIndexThreshold -func (mr *MockIndexOptionsMockRecorder) ForwardIndexThreshold() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ForwardIndexThreshold", reflect.TypeOf((*MockIndexOptions)(nil).ForwardIndexThreshold)) -} - // MockSchemaDescr is a mock of SchemaDescr interface type MockSchemaDescr struct { ctrl *gomock.Controller diff --git a/src/dbnode/storage/storage_mock.go b/src/dbnode/storage/storage_mock.go index ea10e88c51..50e138a2c5 100644 --- a/src/dbnode/storage/storage_mock.go +++ b/src/dbnode/storage/storage_mock.go @@ -3020,6 +3020,62 @@ func (mr *MockOptionsMockRecorder) IndexOptions() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IndexOptions", reflect.TypeOf((*MockOptions)(nil).IndexOptions)) } +// SetTruncateType mocks base method +func (m *MockOptions) SetTruncateType(value series.TruncateType) Options { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SetTruncateType", value) + ret0, _ := ret[0].(Options) + return ret0 +} + +// SetTruncateType indicates an expected call of SetTruncateType +func (mr *MockOptionsMockRecorder) SetTruncateType(value interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetTruncateType", reflect.TypeOf((*MockOptions)(nil).SetTruncateType), value) +} + +// TruncateType mocks base method +func (m *MockOptions) TruncateType() series.TruncateType { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "TruncateType") + ret0, _ := ret[0].(series.TruncateType) + return ret0 +} + +// TruncateType indicates an expected call of TruncateType +func (mr *MockOptionsMockRecorder) TruncateType() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TruncateType", reflect.TypeOf((*MockOptions)(nil).TruncateType)) +} + +// SetWriteTransformOptions mocks base method +func (m *MockOptions) SetWriteTransformOptions(value series.WriteTransformOptions) Options { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SetWriteTransformOptions", value) + ret0, _ := ret[0].(Options) + return ret0 +} + +// SetWriteTransformOptions indicates an expected call of SetWriteTransformOptions +func (mr *MockOptionsMockRecorder) SetWriteTransformOptions(value interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetWriteTransformOptions", reflect.TypeOf((*MockOptions)(nil).SetWriteTransformOptions), value) +} + +// WriteTransformOptions mocks base method +func (m *MockOptions) WriteTransformOptions() series.WriteTransformOptions { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "WriteTransformOptions") + ret0, _ := ret[0].(series.WriteTransformOptions) + return ret0 +} + +// WriteTransformOptions indicates an expected call of WriteTransformOptions +func (mr *MockOptionsMockRecorder) WriteTransformOptions() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WriteTransformOptions", reflect.TypeOf((*MockOptions)(nil).WriteTransformOptions)) +} + // SetRepairEnabled mocks base method func (m *MockOptions) SetRepairEnabled(b bool) Options { m.ctrl.T.Helper() From c001f9ed400272e6ebb14100adc33e9dec102efa Mon Sep 17 00:00:00 2001 From: Artem Nikolayevsky Date: Thu, 2 May 2019 12:52:23 -0400 Subject: [PATCH 7/8] Fixing integration tests --- scripts/development/m3_stack/m3dbnode.yml | 8 -------- src/dbnode/storage/namespace/index_options.go | 1 - src/dbnode/storage/series/truncate_type.go | 17 ++++++----------- src/dbnode/storage/series/truncate_type_test.go | 5 +++-- 4 files changed, 9 insertions(+), 22 deletions(-) diff --git a/scripts/development/m3_stack/m3dbnode.yml b/scripts/development/m3_stack/m3dbnode.yml index 559c8e4c67..08ed122e29 100644 --- a/scripts/development/m3_stack/m3dbnode.yml +++ b/scripts/development/m3_stack/m3dbnode.yml @@ -1,12 +1,4 @@ db: - transforms: - truncateBy: block - truncateDuration: 1h - - index: - forwardIndexThreshold: 0.1 - forwardIndexProbability: 0.1 - logging: level: info diff --git a/src/dbnode/storage/namespace/index_options.go b/src/dbnode/storage/namespace/index_options.go index c50b547c97..aca2e4c8a5 100644 --- a/src/dbnode/storage/namespace/index_options.go +++ b/src/dbnode/storage/namespace/index_options.go @@ -69,4 +69,3 @@ func (i *indexOpts) SetBlockSize(value time.Duration) IndexOptions { func (i *indexOpts) BlockSize() time.Duration { return i.blockSize } - diff --git a/src/dbnode/storage/series/truncate_type.go b/src/dbnode/storage/series/truncate_type.go index e5d9056ee7..8a2665fda6 100644 --- a/src/dbnode/storage/series/truncate_type.go +++ b/src/dbnode/storage/series/truncate_type.go @@ -33,16 +33,15 @@ const ( ) var validTruncationTypes = []TruncateType{ + // TypeNone indicates that no truncation occurs. TypeNone, + // TypeBlock truncates incoming writes to the block boundary immediately + // preceeding this point's timestamp. TypeBlock, } // Validate validates that the scheme type is valid. func (t TruncateType) Validate() error { - if t == TypeNone { - return nil - } - if t >= TypeNone && t <= TypeBlock { return nil } @@ -63,17 +62,13 @@ func (t TruncateType) String() string { } } -// UnmarshalYAML unmarshals a stored merics type. +// UnmarshalYAML unmarshals a stored truncation type. func (t *TruncateType) UnmarshalYAML(unmarshal func(interface{}) error) error { var str string if err := unmarshal(&str); err != nil { return err } - if str == "" { - *t = TypeNone - } - for _, valid := range validTruncationTypes { if str == valid.String() { *t = valid @@ -81,6 +76,6 @@ func (t *TruncateType) UnmarshalYAML(unmarshal func(interface{}) error) error { } } - return fmt.Errorf("invalid truncation type: '%s' valid types are: %v", - str, validTruncationTypes) + *t = TypeNone + return nil } diff --git a/src/dbnode/storage/series/truncate_type_test.go b/src/dbnode/storage/series/truncate_type_test.go index ec761bfdb5..3a75e3df6e 100644 --- a/src/dbnode/storage/series/truncate_type_test.go +++ b/src/dbnode/storage/series/truncate_type_test.go @@ -58,8 +58,9 @@ func TestTruncateTypeUnmarshalYAML(t *testing.T) { } var cfg config - // Bad type fails. - require.Error(t, yaml.Unmarshal([]byte("type: not_a_known_type\n"), &cfg)) + // Bad type marshalls to TypeNone. + require.NoError(t, yaml.Unmarshal([]byte("type: not_a_known_type\n"), &cfg)) + assert.Equal(t, TypeNone, cfg.Type) require.NoError(t, yaml.Unmarshal([]byte(""), &cfg)) assert.Equal(t, TypeNone, cfg.Type) From c9253647f06703dbd7a6d27dd57310b5f8942691 Mon Sep 17 00:00:00 2001 From: Artem Nikolayevsky Date: Thu, 2 May 2019 13:09:46 -0400 Subject: [PATCH 8/8] Fix break to index options --- src/dbnode/server/server.go | 1 + 1 file changed, 1 insertion(+) diff --git a/src/dbnode/server/server.go b/src/dbnode/server/server.go index b45b30b2f1..4e696f1f8f 100644 --- a/src/dbnode/server/server.go +++ b/src/dbnode/server/server.go @@ -339,6 +339,7 @@ func Run(runOpts RunOptions) { CacheRegexp: plCacheConfig.CacheRegexpOrDefault(), CacheTerms: plCacheConfig.CacheTermsOrDefault(), }) + opts = opts.SetIndexOptions(indexOpts) if tick := cfg.Tick; tick != nil { runtimeOpts = runtimeOpts.