diff --git a/src/cmd/services/m3dbnode/config/config.go b/src/cmd/services/m3dbnode/config/config.go index 90d67ffa7a..3617c32bf9 100644 --- a/src/cmd/services/m3dbnode/config/config.go +++ b/src/cmd/services/m3dbnode/config/config.go @@ -32,6 +32,7 @@ import ( coordinatorcfg "github.com/m3db/m3/src/cmd/services/m3query/config" "github.com/m3db/m3/src/dbnode/client" "github.com/m3db/m3/src/dbnode/environment" + "github.com/m3db/m3/src/dbnode/storage/series" "github.com/m3db/m3/src/x/config/hostid" "github.com/m3db/m3/src/x/instrument" xlog "github.com/m3db/m3/src/x/log" @@ -70,6 +71,9 @@ type DBConfiguration struct { // Index configuration. Index IndexConfiguration `yaml:"index"` + // Transforms configuration. + Transforms TransformConfiguration `yaml:"transforms"` + // Logging configuration. Logging xlog.Configuration `yaml:"logging"` @@ -165,6 +169,10 @@ func (c *DBConfiguration) InitDefaultsAndValidate() error { return err } + if err := c.Transforms.Validate(); err != nil { + return err + } + return nil } @@ -173,8 +181,42 @@ type IndexConfiguration struct { // MaxQueryIDsConcurrency controls the maximum number of outstanding QueryID // requests that can be serviced concurrently. Limiting the concurrency is // important to prevent index queries from overloading the database entirely - // as they are very CPU-intensive (regex and FST matching.) + // as they are very CPU-intensive (regex and FST matching). MaxQueryIDsConcurrency int `yaml:"maxQueryIDsConcurrency" validate:"min=0"` + + // ForwardIndexProbability determines the likelihood that an incoming write is + // written to the next block, when arriving close to the block boundary. + // + // NB: this is an optimization which lessens pressure on the index around + // block boundaries by eagerly writing the series to the next block + // preemptively. + ForwardIndexProbability float64 `yaml:"forwardIndexProbability" validate:"min=0.0,max=1.0"` + + // ForwardIndexThreshold determines the threshold for forward writes, as a + // fraction of the given namespace's bufferFuture. + // + // NB: this is an optimization which lessens pressure on the index around + // block boundaries by eagerly writing the series to the next block + // preemptively. + ForwardIndexThreshold float64 `yaml:"forwardIndexThreshold" validate:"min=0.0,max=1.0"` +} + +// TransformConfiguration contains configuration options that can transform +// incoming writes. +type TransformConfiguration struct { + // TruncateBy determines what type of truncatation is applied to incoming + // writes. + TruncateBy series.TruncateType `yaml:"truncateBy"` + // ForcedValue determines what to set all incoming write values to. + ForcedValue *float64 `yaml:"forceValue"` +} + +func (c *TransformConfiguration) Validate() error { + if c == nil { + return nil + } + + return c.TruncateBy.Validate() } // TickConfiguration is the tick configuration for background processing of diff --git a/src/cmd/services/m3dbnode/config/config_test.go b/src/cmd/services/m3dbnode/config/config_test.go index 1ee0a1bde5..29a71fefd6 100644 --- a/src/cmd/services/m3dbnode/config/config_test.go +++ b/src/cmd/services/m3dbnode/config/config_test.go @@ -331,6 +331,11 @@ func TestConfiguration(t *testing.T) { expected := `db: index: maxQueryIDsConcurrency: 0 + forwardIndexProbability: 0 + forwardIndexThreshold: 0 + transforms: + truncateBy: 0 + forceValue: null logging: file: /var/log/m3dbnode.log level: info diff --git a/src/dbnode/server/server.go b/src/dbnode/server/server.go index 5d517e40b7..4e696f1f8f 100644 --- a/src/dbnode/server/server.go +++ b/src/dbnode/server/server.go @@ -1278,6 +1278,17 @@ func withEncodingAndPoolingOptions( aggregateQueryResultsPool := index.NewAggregateResultsPool( poolOptions(policy.IndexResultsPool, scope.SubScope("index-aggregate-results-pool"))) + // Set value transformation options. + opts = opts.SetTruncateType(cfg.Transforms.TruncateBy) + forcedValue := cfg.Transforms.ForcedValue + if forcedValue != nil { + opts = opts.SetWriteTransformOptions(series.WriteTransformOptions{ + ForceValueEnabled: true, + ForceValue: *forcedValue, + }) + } + + // Set index options. indexOpts := opts.IndexOptions(). SetInstrumentOptions(iopts). SetMemSegmentOptions( @@ -1294,7 +1305,9 @@ func withEncodingAndPoolingOptions( SetIdentifierPool(identifierPool). SetCheckedBytesPool(bytesPool). SetQueryResultsPool(queryResultsPool). - SetAggregateResultsPool(aggregateQueryResultsPool) + SetAggregateResultsPool(aggregateQueryResultsPool). + SetForwardIndexProbability(cfg.Index.ForwardIndexProbability). + SetForwardIndexThreshold(cfg.Index.ForwardIndexThreshold) queryResultsPool.Init(func() index.QueryResults { // NB(r): Need to initialize after setting the index opts so diff --git a/src/dbnode/storage/index/index_mock.go b/src/dbnode/storage/index/index_mock.go index eb40eb016f..a09657e6ca 100644 --- a/src/dbnode/storage/index/index_mock.go +++ b/src/dbnode/storage/index/index_mock.go @@ -1452,3 +1452,59 @@ func (mr *MockOptionsMockRecorder) ReadThroughSegmentOptions() *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReadThroughSegmentOptions", reflect.TypeOf((*MockOptions)(nil).ReadThroughSegmentOptions)) } + +// SetForwardIndexProbability mocks base method +func (m *MockOptions) SetForwardIndexProbability(value float64) Options { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SetForwardIndexProbability", value) + ret0, _ := ret[0].(Options) + return ret0 +} + +// SetForwardIndexProbability indicates an expected call of SetForwardIndexProbability +func (mr *MockOptionsMockRecorder) SetForwardIndexProbability(value interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetForwardIndexProbability", reflect.TypeOf((*MockOptions)(nil).SetForwardIndexProbability), value) +} + +// ForwardIndexProbability mocks base method +func (m *MockOptions) ForwardIndexProbability() float64 { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ForwardIndexProbability") + ret0, _ := ret[0].(float64) + return ret0 +} + +// ForwardIndexProbability indicates an expected call of ForwardIndexProbability +func (mr *MockOptionsMockRecorder) ForwardIndexProbability() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ForwardIndexProbability", reflect.TypeOf((*MockOptions)(nil).ForwardIndexProbability)) +} + +// SetForwardIndexThreshold mocks base method +func (m *MockOptions) SetForwardIndexThreshold(value float64) Options { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SetForwardIndexThreshold", value) + ret0, _ := ret[0].(Options) + return ret0 +} + +// SetForwardIndexThreshold indicates an expected call of SetForwardIndexThreshold +func (mr *MockOptionsMockRecorder) SetForwardIndexThreshold(value interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetForwardIndexThreshold", reflect.TypeOf((*MockOptions)(nil).SetForwardIndexThreshold), value) +} + +// ForwardIndexThreshold mocks base method +func (m *MockOptions) ForwardIndexThreshold() float64 { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ForwardIndexThreshold") + ret0, _ := ret[0].(float64) + return ret0 +} + +// ForwardIndexThreshold indicates an expected call of ForwardIndexThreshold +func (mr *MockOptionsMockRecorder) ForwardIndexThreshold() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ForwardIndexThreshold", reflect.TypeOf((*MockOptions)(nil).ForwardIndexThreshold)) +} diff --git a/src/dbnode/storage/index/options.go b/src/dbnode/storage/index/options.go index 07f505adbf..f572df5c37 100644 --- a/src/dbnode/storage/index/options.go +++ b/src/dbnode/storage/index/options.go @@ -100,6 +100,8 @@ func init() { // nolint: maligned type opts struct { + forwardIndexThreshold float64 + forwardIndexProbability float64 insertMode InsertMode clockOpts clock.Options instrumentOpts instrument.Options @@ -379,3 +381,23 @@ func (o *opts) SetReadThroughSegmentOptions(value ReadThroughSegmentOptions) Opt func (o *opts) ReadThroughSegmentOptions() ReadThroughSegmentOptions { return o.readThroughSegmentOptions } + +func (o *opts) SetForwardIndexProbability(value float64) Options { + opts := *o + opts.forwardIndexProbability = value + return &opts +} + +func (o *opts) ForwardIndexProbability() float64 { + return o.forwardIndexProbability +} + +func (o *opts) SetForwardIndexThreshold(value float64) Options { + opts := *o + opts.forwardIndexThreshold = value + return &opts +} + +func (o *opts) ForwardIndexThreshold() float64 { + return o.forwardIndexThreshold +} diff --git a/src/dbnode/storage/index/types.go b/src/dbnode/storage/index/types.go index c1fae3ee2d..687a10071f 100644 --- a/src/dbnode/storage/index/types.go +++ b/src/dbnode/storage/index/types.go @@ -850,4 +850,17 @@ type Options interface { // ReadThroughSegmentOptions returns the read through segment cache options. ReadThroughSegmentOptions() ReadThroughSegmentOptions + + // SetForwardIndexProbability sets the probability chance for forward writes. + SetForwardIndexProbability(value float64) Options + + // ForwardIndexProbability returns the probability chance for forward writes. + ForwardIndexProbability() float64 + + // SetForwardIndexProbability sets the threshold for forward writes as a + // fraction of the bufferFuture. + SetForwardIndexThreshold(value float64) Options + + // ForwardIndexProbability returns the threshold for forward writes. + ForwardIndexThreshold() float64 } diff --git a/src/dbnode/storage/namespace.go b/src/dbnode/storage/namespace.go index 289b9802b4..8ecaa79cce 100644 --- a/src/dbnode/storage/namespace.go +++ b/src/dbnode/storage/namespace.go @@ -591,8 +591,11 @@ func (n *dbNamespace) Write( n.metrics.write.ReportError(n.nowFn().Sub(callStart)) return ts.Series{}, false, err } + opts := series.WriteOptions{ + TruncateType: n.opts.TruncateType(), + } series, wasWritten, err := shard.Write(ctx, id, timestamp, - value, unit, annotation, series.WriteOptions{}) + value, unit, annotation, opts) n.metrics.write.ReportSuccessOrError(err, n.nowFn().Sub(callStart)) return series, wasWritten, err } @@ -616,8 +619,11 @@ func (n *dbNamespace) WriteTagged( n.metrics.writeTagged.ReportError(n.nowFn().Sub(callStart)) return ts.Series{}, false, err } + opts := series.WriteOptions{ + TruncateType: n.opts.TruncateType(), + } series, wasWritten, err := shard.WriteTagged(ctx, id, tags, timestamp, - value, unit, annotation, series.WriteOptions{}) + value, unit, annotation, opts) n.metrics.writeTagged.ReportSuccessOrError(err, n.nowFn().Sub(callStart)) return series, wasWritten, err } diff --git a/src/dbnode/storage/namespace_test.go b/src/dbnode/storage/namespace_test.go index 98619d51a3..302f876f24 100644 --- a/src/dbnode/storage/namespace_test.go +++ b/src/dbnode/storage/namespace_test.go @@ -38,6 +38,7 @@ import ( "github.com/m3db/m3/src/dbnode/storage/index" "github.com/m3db/m3/src/dbnode/storage/namespace" "github.com/m3db/m3/src/dbnode/storage/repair" + "github.com/m3db/m3/src/dbnode/storage/series" "github.com/m3db/m3/src/dbnode/tracepoint" "github.com/m3db/m3/src/dbnode/ts" xmetrics "github.com/m3db/m3/src/dbnode/x/metrics" @@ -97,7 +98,25 @@ func newTestNamespaceWithIDOpts( return ns.(*dbNamespace), closer } -func newTestNamespaceWithIndex(t *testing.T, index namespaceIndex) (*dbNamespace, closerFn) { +func newTestNamespaceWithOpts( + t *testing.T, + dopts Options, +) (*dbNamespace, closerFn) { + nsID, opts := defaultTestNs1ID, defaultTestNs1Opts + metadata := newTestNamespaceMetadataWithIDOpts(t, nsID, opts) + hashFn := func(identifier ident.ID) uint32 { return testShardIDs[0].ID() } + shardSet, err := sharding.NewShardSet(testShardIDs, hashFn) + require.NoError(t, err) + ns, err := newDatabaseNamespace(metadata, shardSet, nil, nil, nil, dopts) + require.NoError(t, err) + closer := dopts.RuntimeOptionsManager().Close + return ns.(*dbNamespace), closer +} + +func newTestNamespaceWithIndex( + t *testing.T, + index namespaceIndex, +) (*dbNamespace, closerFn) { ns, closer := newTestNamespace(t) if index != nil { ns.reverseIndex = index @@ -105,6 +124,20 @@ func newTestNamespaceWithIndex(t *testing.T, index namespaceIndex) (*dbNamespace return ns, closer } +func newTestNamespaceWithTruncateType( + t *testing.T, + index namespaceIndex, + truncateType series.TruncateType, +) (*dbNamespace, closerFn) { + opts := testDatabaseOptions(). + SetRuntimeOptionsManager(runtime.NewOptionsManager()). + SetTruncateType(truncateType) + + ns, closer := newTestNamespaceWithOpts(t, opts) + ns.reverseIndex = index + return ns, closer +} + func TestNamespaceName(t *testing.T) { ns, closer := newTestNamespace(t) defer closer() @@ -180,23 +213,29 @@ func TestNamespaceWriteShardOwned(t *testing.T) { unit := xtime.Second ant := []byte(nil) - ns, closer := newTestNamespace(t) - defer closer() - shard := NewMockdatabaseShard(ctrl) - shard.EXPECT().Write(ctx, id, now, val, unit, ant, gomock.Any()). - Return(ts.Series{}, true, nil).Times(1) - shard.EXPECT().Write(ctx, id, now, val, unit, ant, gomock.Any()). - Return(ts.Series{}, false, nil).Times(1) + truncateTypes := []series.TruncateType{series.TypeBlock, series.TypeNone} + for _, truncateType := range truncateTypes { + ns, closer := newTestNamespaceWithTruncateType(t, nil, truncateType) + defer closer() + shard := NewMockdatabaseShard(ctrl) + opts := series.WriteOptions{ + TruncateType: truncateType, + } + shard.EXPECT().Write(ctx, id, now, val, unit, ant, opts). + Return(ts.Series{}, true, nil).Times(1) + shard.EXPECT().Write(ctx, id, now, val, unit, ant, opts). + Return(ts.Series{}, false, nil).Times(1) - ns.shards[testShardIDs[0].ID()] = shard + ns.shards[testShardIDs[0].ID()] = shard - _, wasWritten, err := ns.Write(ctx, id, now, val, unit, ant) - require.NoError(t, err) - require.True(t, wasWritten) + _, wasWritten, err := ns.Write(ctx, id, now, val, unit, ant) + require.NoError(t, err) + require.True(t, wasWritten) - _, wasWritten, err = ns.Write(ctx, id, now, val, unit, ant) - require.NoError(t, err) - require.False(t, wasWritten) + _, wasWritten, err = ns.Write(ctx, id, now, val, unit, ant) + require.NoError(t, err) + require.False(t, wasWritten) + } } func TestNamespaceReadEncodedShardNotOwned(t *testing.T) { @@ -1064,34 +1103,43 @@ func TestNamespaceIndexInsert(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - idx := NewMocknamespaceIndex(ctrl) - ns, closer := newTestNamespaceWithIndex(t, idx) - defer closer() + truncateTypes := []series.TruncateType{series.TypeBlock, series.TypeNone} + for _, truncateType := range truncateTypes { + idx := NewMocknamespaceIndex(ctrl) - ctx := context.NewContext() - now := time.Now() + ns, closer := newTestNamespaceWithTruncateType(t, idx, truncateType) + ns.reverseIndex = idx + defer closer() - shard := NewMockdatabaseShard(ctrl) - shard.EXPECT().WriteTagged(ctx, ident.NewIDMatcher("a"), ident.EmptyTagIterator, - now, 1.0, xtime.Second, nil, gomock.Any()).Return(ts.Series{}, true, nil) - shard.EXPECT().WriteTagged(ctx, ident.NewIDMatcher("a"), ident.EmptyTagIterator, - now, 1.0, xtime.Second, nil, gomock.Any()).Return(ts.Series{}, false, nil) + ctx := context.NewContext() + now := time.Now() - ns.shards[testShardIDs[0].ID()] = shard - - _, wasWritten, err := ns.WriteTagged(ctx, ident.StringID("a"), - ident.EmptyTagIterator, now, 1.0, xtime.Second, nil) - require.NoError(t, err) - require.True(t, wasWritten) - - _, wasWritten, err = ns.WriteTagged(ctx, ident.StringID("a"), - ident.EmptyTagIterator, now, 1.0, xtime.Second, nil) - require.NoError(t, err) - require.False(t, wasWritten) + shard := NewMockdatabaseShard(ctrl) - shard.EXPECT().Close() - idx.EXPECT().Close().Return(nil) - require.NoError(t, ns.Close()) + opts := series.WriteOptions{ + TruncateType: truncateType, + } + shard.EXPECT().WriteTagged(ctx, ident.NewIDMatcher("a"), ident.EmptyTagIterator, + now, 1.0, xtime.Second, nil, opts).Return(ts.Series{}, true, nil) + shard.EXPECT().WriteTagged(ctx, ident.NewIDMatcher("a"), ident.EmptyTagIterator, + now, 1.0, xtime.Second, nil, opts).Return(ts.Series{}, false, nil) + + ns.shards[testShardIDs[0].ID()] = shard + + _, wasWritten, err := ns.WriteTagged(ctx, ident.StringID("a"), + ident.EmptyTagIterator, now, 1.0, xtime.Second, nil) + require.NoError(t, err) + require.True(t, wasWritten) + + _, wasWritten, err = ns.WriteTagged(ctx, ident.StringID("a"), + ident.EmptyTagIterator, now, 1.0, xtime.Second, nil) + require.NoError(t, err) + require.False(t, wasWritten) + + shard.EXPECT().Close() + idx.EXPECT().Close().Return(nil) + require.NoError(t, ns.Close()) + } } func TestNamespaceIndexQuery(t *testing.T) { diff --git a/src/dbnode/storage/options.go b/src/dbnode/storage/options.go index 4e117aa1a2..7890dc1237 100644 --- a/src/dbnode/storage/options.go +++ b/src/dbnode/storage/options.go @@ -118,6 +118,8 @@ type options struct { errThresholdForLoad int64 indexingEnabled bool repairEnabled bool + truncateType series.TruncateType + transformOptions series.WriteTransformOptions indexOpts index.Options repairOpts repair.Options newEncoderFn encoding.NewEncoderFn @@ -354,6 +356,28 @@ func (o *options) RepairEnabled() bool { return o.repairEnabled } +func (o *options) SetTruncateType(value series.TruncateType) Options { + opts := *o + opts.truncateType = value + return &opts +} + +func (o *options) TruncateType() series.TruncateType { + return o.truncateType +} + +func (o *options) SetWriteTransformOptions( + value series.WriteTransformOptions, +) Options { + opts := *o + opts.transformOptions = value + return &opts +} + +func (o *options) WriteTransformOptions() series.WriteTransformOptions { + return o.transformOptions +} + func (o *options) SetRepairOptions(value repair.Options) Options { opts := *o opts.repairOpts = value diff --git a/src/dbnode/storage/series/buffer.go b/src/dbnode/storage/series/buffer.go index f870f30afe..1545764ef4 100644 --- a/src/dbnode/storage/series/buffer.go +++ b/src/dbnode/storage/series/buffer.go @@ -254,6 +254,15 @@ func (b *dbBuffer) Write( blockStart := timestamp.Truncate(b.blockSize) buckets := b.bucketVersionsAtCreate(blockStart) b.putBucketVersionsInCache(buckets) + + if wOpts.TruncateType == TypeBlock { + timestamp = blockStart + } + + if wOpts.TransformOptions.ForceValueEnabled { + value = wOpts.TransformOptions.ForceValue + } + return buckets.write(timestamp, value, unit, annotation, writeType) } diff --git a/src/dbnode/storage/series/buffer_test.go b/src/dbnode/storage/series/buffer_test.go index e0c2ee0f84..d4b12cd3f7 100644 --- a/src/dbnode/storage/series/buffer_test.go +++ b/src/dbnode/storage/series/buffer_test.go @@ -475,6 +475,54 @@ func TestBufferBucketDuplicatePointsNotWrittenButUpserted(t *testing.T) { assertSegmentValuesEqual(t, expected, []xio.SegmentReader{stream}, opts) } +func TestIndexedBufferWriteOnlyWritesSinglePoint(t *testing.T) { + opts := newBufferTestOptions() + rops := opts.RetentionOptions() + curr := time.Now().Truncate(rops.BlockSize()) + opts = opts.SetClockOptions(opts.ClockOptions().SetNowFn(func() time.Time { + return curr + })) + buffer := newDatabaseBuffer().(*dbBuffer) + buffer.Reset(opts) + + data := []value{ + {curr.Add(secs(1)), 1, xtime.Second, nil}, + {curr.Add(secs(2)), 2, xtime.Second, nil}, + {curr.Add(secs(3)), 3, xtime.Second, nil}, + } + + forceValue := 1.0 + for i, v := range data { + ctx := context.NewContext() + writeOpts := WriteOptions{ + TruncateType: TypeBlock, + TransformOptions: WriteTransformOptions{ + ForceValueEnabled: true, + ForceValue: forceValue, + }, + } + wasWritten, err := buffer.Write(ctx, v.timestamp, v.value, v.unit, + v.annotation, writeOpts) + require.NoError(t, err) + expectedWrite := i == 0 + require.Equal(t, expectedWrite, wasWritten) + ctx.Close() + } + + ctx := context.NewContext() + defer ctx.Close() + + results, err := buffer.ReadEncoded(ctx, timeZero, timeDistantFuture) + assert.NoError(t, err) + assert.NotNil(t, results) + + ex := []value{ + {curr, forceValue, xtime.Second, nil}, + } + + assertValuesEqual(t, ex, results, opts) +} + func TestBufferFetchBlocks(t *testing.T) { b, opts, expected := newTestBufferBucketsWithData(t) ctx := opts.ContextPool().Get() diff --git a/src/dbnode/storage/series/truncate_type.go b/src/dbnode/storage/series/truncate_type.go new file mode 100644 index 0000000000..8a2665fda6 --- /dev/null +++ b/src/dbnode/storage/series/truncate_type.go @@ -0,0 +1,81 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package series + +import ( + "fmt" +) + +// TruncateType determines the scheme for truncating transforms. +type TruncateType uint8 + +const ( + TypeNone TruncateType = iota + TypeBlock +) + +var validTruncationTypes = []TruncateType{ + // TypeNone indicates that no truncation occurs. + TypeNone, + // TypeBlock truncates incoming writes to the block boundary immediately + // preceeding this point's timestamp. + TypeBlock, +} + +// Validate validates that the scheme type is valid. +func (t TruncateType) Validate() error { + if t >= TypeNone && t <= TypeBlock { + return nil + } + + return fmt.Errorf("invalid truncation type: '%v' valid types are: %v", + t, validTruncationTypes) +} + +func (t TruncateType) String() string { + switch t { + case TypeNone: + return "none" + case TypeBlock: + return "block" + default: + // Should never get here. + return "unknown" + } +} + +// UnmarshalYAML unmarshals a stored truncation type. +func (t *TruncateType) UnmarshalYAML(unmarshal func(interface{}) error) error { + var str string + if err := unmarshal(&str); err != nil { + return err + } + + for _, valid := range validTruncationTypes { + if str == valid.String() { + *t = valid + return nil + } + } + + *t = TypeNone + return nil +} diff --git a/src/dbnode/storage/series/truncate_type_test.go b/src/dbnode/storage/series/truncate_type_test.go new file mode 100644 index 0000000000..3a75e3df6e --- /dev/null +++ b/src/dbnode/storage/series/truncate_type_test.go @@ -0,0 +1,67 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package series + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + yaml "gopkg.in/yaml.v2" +) + +func TestTruncateTypeValidation(t *testing.T) { + err := TypeNone.Validate() + assert.NoError(t, err) + err = TypeBlock.Validate() + assert.NoError(t, err) + err = TruncateType(4).Validate() + assert.Error(t, err) +} + +func TestTruncateTypeUnmarshalYAML(t *testing.T) { + type config struct { + Type TruncateType `yaml:"type"` + } + + validParseSchemes := []TruncateType{ + TypeNone, + TypeBlock, + } + + for _, value := range validParseSchemes { + str := fmt.Sprintf("type: %s\n", value.String()) + + var cfg config + require.NoError(t, yaml.Unmarshal([]byte(str), &cfg)) + + assert.Equal(t, value, cfg.Type) + } + + var cfg config + // Bad type marshalls to TypeNone. + require.NoError(t, yaml.Unmarshal([]byte("type: not_a_known_type\n"), &cfg)) + assert.Equal(t, TypeNone, cfg.Type) + + require.NoError(t, yaml.Unmarshal([]byte(""), &cfg)) + assert.Equal(t, TypeNone, cfg.Type) +} diff --git a/src/dbnode/storage/series/types.go b/src/dbnode/storage/series/types.go index 7e4f65e6f3..95e8dec377 100644 --- a/src/dbnode/storage/series/types.go +++ b/src/dbnode/storage/series/types.go @@ -350,7 +350,21 @@ const ( // what write type makes sense for bootstraps. const BootstrapWriteType = WarmWrite +// WriteTransformOptions describes transforms to run on incoming writes. +type WriteTransformOptions struct { + // ForceValueEnabled indicates if the values for incoming writes + // should be forced to `ForceValue`. + ForceValueEnabled bool + // ForceValue is the value that incoming writes should be forced to. + ForceValue float64 +} + // WriteOptions provides a set of options for a write. type WriteOptions struct { + // SchemaDesc is the schema description. SchemaDesc namespace.SchemaDescr + // TruncateType is the truncation type for incoming writes. + TruncateType TruncateType + // TransformOptions describes transformation options for incoming writes. + TransformOptions WriteTransformOptions } diff --git a/src/dbnode/storage/shard.go b/src/dbnode/storage/shard.go index a68985cbe8..45f3fba264 100644 --- a/src/dbnode/storage/shard.go +++ b/src/dbnode/storage/shard.go @@ -1081,7 +1081,6 @@ func (s *dbShard) newShardEntry( default: return nil, errNewShardEntryTagsTypeInvalid - } series := s.seriesPool.Get() diff --git a/src/dbnode/storage/storage_mock.go b/src/dbnode/storage/storage_mock.go index ea10e88c51..50e138a2c5 100644 --- a/src/dbnode/storage/storage_mock.go +++ b/src/dbnode/storage/storage_mock.go @@ -3020,6 +3020,62 @@ func (mr *MockOptionsMockRecorder) IndexOptions() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IndexOptions", reflect.TypeOf((*MockOptions)(nil).IndexOptions)) } +// SetTruncateType mocks base method +func (m *MockOptions) SetTruncateType(value series.TruncateType) Options { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SetTruncateType", value) + ret0, _ := ret[0].(Options) + return ret0 +} + +// SetTruncateType indicates an expected call of SetTruncateType +func (mr *MockOptionsMockRecorder) SetTruncateType(value interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetTruncateType", reflect.TypeOf((*MockOptions)(nil).SetTruncateType), value) +} + +// TruncateType mocks base method +func (m *MockOptions) TruncateType() series.TruncateType { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "TruncateType") + ret0, _ := ret[0].(series.TruncateType) + return ret0 +} + +// TruncateType indicates an expected call of TruncateType +func (mr *MockOptionsMockRecorder) TruncateType() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TruncateType", reflect.TypeOf((*MockOptions)(nil).TruncateType)) +} + +// SetWriteTransformOptions mocks base method +func (m *MockOptions) SetWriteTransformOptions(value series.WriteTransformOptions) Options { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SetWriteTransformOptions", value) + ret0, _ := ret[0].(Options) + return ret0 +} + +// SetWriteTransformOptions indicates an expected call of SetWriteTransformOptions +func (mr *MockOptionsMockRecorder) SetWriteTransformOptions(value interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetWriteTransformOptions", reflect.TypeOf((*MockOptions)(nil).SetWriteTransformOptions), value) +} + +// WriteTransformOptions mocks base method +func (m *MockOptions) WriteTransformOptions() series.WriteTransformOptions { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "WriteTransformOptions") + ret0, _ := ret[0].(series.WriteTransformOptions) + return ret0 +} + +// WriteTransformOptions indicates an expected call of WriteTransformOptions +func (mr *MockOptionsMockRecorder) WriteTransformOptions() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WriteTransformOptions", reflect.TypeOf((*MockOptions)(nil).WriteTransformOptions)) +} + // SetRepairEnabled mocks base method func (m *MockOptions) SetRepairEnabled(b bool) Options { m.ctrl.T.Helper() diff --git a/src/dbnode/storage/types.go b/src/dbnode/storage/types.go index 6fab06e1eb..ef3413a5a0 100644 --- a/src/dbnode/storage/types.go +++ b/src/dbnode/storage/types.go @@ -782,6 +782,20 @@ type Options interface { // IndexOptions returns the indexing options. IndexOptions() index.Options + // SetTruncateType sets the truncation type for the database. + SetTruncateType(value series.TruncateType) Options + + // TruncateType returns the truncation type for the database. + TruncateType() series.TruncateType + + // SetWriteTransformOptions sets options for transforming incoming writes + // to the database. + SetWriteTransformOptions(value series.WriteTransformOptions) Options + + // WriteTransformOptions returns the options for transforming incoming writes + // to the database. + WriteTransformOptions() series.WriteTransformOptions + // SetRepairEnabled sets whether or not to enable the repair. SetRepairEnabled(b bool) Options