diff --git a/src/cmd/tools/verify_commitlogs/main/main.go b/src/cmd/tools/verify_commitlogs/main/main.go index 5cc3f7155e..925edc7370 100644 --- a/src/cmd/tools/verify_commitlogs/main/main.go +++ b/src/cmd/tools/verify_commitlogs/main/main.go @@ -255,7 +255,7 @@ func main() { nsID := ident.StringID(namespaceStr) runOpts := bootstrap.NewRunOptions(). // Dont save intermediate results - SetIncremental(false) + SetPersistConfig(bootstrap.PersistConfig{Enabled: false}) nsMetadata, err := namespace.NewMetadata(nsID, namespace.NewOptions().SetRetentionOptions(retentionOpts)) if err != nil { log.Fatal(err.Error()) diff --git a/src/dbnode/integration/cluster_add_one_node_commitlog_test.go b/src/dbnode/integration/cluster_add_one_node_commitlog_test.go new file mode 100644 index 0000000000..90b12bd808 --- /dev/null +++ b/src/dbnode/integration/cluster_add_one_node_commitlog_test.go @@ -0,0 +1,35 @@ +// +build integration + +// Copyright (c) 2018 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package integration + +import ( + "testing" +) + +func TestClusterAddOneNodeCommitlog(t *testing.T) { + if testing.Short() { + t.SkipNow() + } + + testClusterAddOneNode(t, true) +} diff --git a/src/dbnode/integration/cluster_add_one_node_test.go b/src/dbnode/integration/cluster_add_one_node_test.go index 65ea749cfe..9c8b6cda40 100644 --- a/src/dbnode/integration/cluster_add_one_node_test.go +++ b/src/dbnode/integration/cluster_add_one_node_test.go @@ -48,6 +48,10 @@ type idShard struct { } func TestClusterAddOneNode(t *testing.T) { + testClusterAddOneNode(t, false) +} + +func testClusterAddOneNode(t *testing.T, verifyCommitlogCanBootstrapAfterNodeJoin bool) { if testing.Short() { t.SkipNow() } @@ -258,4 +262,13 @@ func TestClusterAddOneNode(t *testing.T) { for i := range setups { verifySeriesMaps(t, setups[i], namesp.ID(), expectedSeriesMaps[i]) } + + if verifyCommitlogCanBootstrapAfterNodeJoin { + // Verify that the node that joined the cluster can immediately bootstrap + // the data it streamed from its peers from the commit log as soon as + // the bootstrapping process completes. 
+ require.NoError(t, setups[1].stopServer()) + startServerWithNewInspection(t, opts, setups[1]) + verifySeriesMaps(t, setups[1], namesp.ID(), expectedSeriesMaps[1]) + } } diff --git a/src/dbnode/integration/integration.go b/src/dbnode/integration/integration.go index 5068cd01de..5f7bdb69a1 100644 --- a/src/dbnode/integration/integration.go +++ b/src/dbnode/integration/integration.go @@ -250,7 +250,7 @@ func newDefaultBootstrappableTestSetups( SetAdminClient(adminClient). SetFetchBlocksMetadataEndpointVersion(setupOpts[i].fetchBlocksMetadataEndpointVersion). // DatabaseBlockRetrieverManager and PersistManager need to be set or we will never execute - // the incremental path + // the persist bootstrapping path SetDatabaseBlockRetrieverManager(setup.storageOpts.DatabaseBlockRetrieverManager()). SetPersistManager(setup.storageOpts.PersistManager()). SetRuntimeOptionsManager(runtimeOptsMgr) diff --git a/src/dbnode/storage/bootstrap/bootstrap_mock.go b/src/dbnode/storage/bootstrap/bootstrap_mock.go index 153b234c6b..20e63574c1 100644 --- a/src/dbnode/storage/bootstrap/bootstrap_mock.go +++ b/src/dbnode/storage/bootstrap/bootstrap_mock.go @@ -236,28 +236,28 @@ func (m *MockRunOptions) EXPECT() *MockRunOptionsMockRecorder { return m.recorder } -// SetIncremental mocks base method -func (m *MockRunOptions) SetIncremental(value bool) RunOptions { - ret := m.ctrl.Call(m, "SetIncremental", value) +// SetPersistConfig mocks base method +func (m *MockRunOptions) SetPersistConfig(value PersistConfig) RunOptions { + ret := m.ctrl.Call(m, "SetPersistConfig", value) ret0, _ := ret[0].(RunOptions) return ret0 } -// SetIncremental indicates an expected call of SetIncremental -func (mr *MockRunOptionsMockRecorder) SetIncremental(value interface{}) *gomock.Call { - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetIncremental", reflect.TypeOf((*MockRunOptions)(nil).SetIncremental), value) +// SetPersistConfig indicates an expected call of SetPersistConfig +func (mr *MockRunOptionsMockRecorder) SetPersistConfig(value interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetPersistConfig", reflect.TypeOf((*MockRunOptions)(nil).SetPersistConfig), value) } -// Incremental mocks base method -func (m *MockRunOptions) Incremental() bool { - ret := m.ctrl.Call(m, "Incremental") - ret0, _ := ret[0].(bool) +// PersistConfig mocks base method +func (m *MockRunOptions) PersistConfig() PersistConfig { + ret := m.ctrl.Call(m, "PersistConfig") + ret0, _ := ret[0].(PersistConfig) return ret0 } -// Incremental indicates an expected call of Incremental -func (mr *MockRunOptionsMockRecorder) Incremental() *gomock.Call { - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Incremental", reflect.TypeOf((*MockRunOptions)(nil).Incremental)) +// PersistConfig indicates an expected call of PersistConfig +func (mr *MockRunOptionsMockRecorder) PersistConfig() *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PersistConfig", reflect.TypeOf((*MockRunOptions)(nil).PersistConfig)) } // SetCacheSeriesMetadata mocks base method diff --git a/src/dbnode/storage/bootstrap/bootstrapper/README.md b/src/dbnode/storage/bootstrap/bootstrapper/README.md index 8e1f59dddf..2be0917566 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/README.md +++ b/src/dbnode/storage/bootstrap/bootstrapper/README.md @@ -22,4 +22,4 @@ The peers bootstrapper similarly bootstraps all the data from peers that the fil For the recently read policy the filesystem bootstrapper will simply fulfill the time 
ranges requested matching without actually loading the series and blocks from the files it discovers. This relies on data been fetched lazily from the filesystem when data is required for a series that does not live on heap. -The peers bootstrapper will bootstrap all time ranges requested, and if performing an incremental bootstrap for a time range will write the data to disk and then remove the results from memory. An incremental bootstrap is used for any data that is immutable at the time that bootstrapping commences. For time ranges that are mutable the series and blocks are returned directly as a result from the bootstrapper. +The peers bootstrapper will bootstrap all time ranges requested, and if performing a bootstrap with persistence enabled for a time range, will write the data to disk and then remove the results from memory. A bootstrap with persistence enabled is used for any data that is immutable at the time that bootstrapping commences. For time ranges that are mutable the peer bootstrapper will still write the data out to disk in a durable manner, but in the form of a snapshot, and the series and blocks will still be returned directly as a result from the bootstrapper. This enables the commit log bootstrapper to recover the data in case the node shuts down before the in-memory data can be flushed. diff --git a/src/dbnode/storage/bootstrap/bootstrapper/base_test.go b/src/dbnode/storage/bootstrap/bootstrapper/base_test.go index 6a50f694cd..df5762d9d3 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/base_test.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/base_test.go @@ -44,7 +44,8 @@ var ( testNamespaceID = ident.StringID("testNamespace") testTargetStart = time.Now() testShard = uint32(0) - testDefaultRunOpts = bootstrap.NewRunOptions().SetIncremental(false) + testDefaultRunOpts = bootstrap.NewRunOptions(). 
+ SetPersistConfig(bootstrap.PersistConfig{Enabled: false}) ) func testNsMetadata(t *testing.T) namespace.Metadata { diff --git a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_data_test.go b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_data_test.go index f47cb335f4..9a68b8a2ee 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_data_test.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_data_test.go @@ -50,7 +50,7 @@ import ( var ( testNamespaceID = ident.StringID("testnamespace") - testDefaultRunOpts = bootstrap.NewRunOptions().SetIncremental(true) + testDefaultRunOpts = bootstrap.NewRunOptions() minCommitLogRetention = 10 * time.Minute ) diff --git a/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go b/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go index 15b6f77ae9..1dbd78604e 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go @@ -220,15 +220,15 @@ func (s *fileSystemSource) enqueueReaders( // Close the readers ch if and only if all readers are enqueued defer close(readersCh) - indexIncrementalBootstrap := run == bootstrapIndexRunType && runOpts.Incremental() - if !indexIncrementalBootstrap { + shouldPersistIndexBootstrap := run == bootstrapIndexRunType && s.shouldPersist(runOpts) + if !shouldPersistIndexBootstrap { // Normal run, open readers s.enqueueReadersGroupedByBlockSize(ns, run, runOpts, shardsTimeRanges, readerPool, readersCh) return } - // If the run is an index bootstrap using incremental bootstrapping + // If the run is an index bootstrap with the persist configuration enabled // then we need to write out the metadata into FSTs that we store on disk, // to avoid creating any one single huge FST at once we bucket the // shards into number of buckets @@ -594,10 +594,12 @@ func (s *fileSystemSource) loadShardReadersDataIntoShardResult( } } - incremental := runOpts.Incremental() - noneRemaining := remainingRanges.IsEmpty() - if run == bootstrapIndexRunType && incremental && noneRemaining { - err := s.incrementalBootstrapIndexSegment(ns, requestedRanges, runResult) + var ( + shouldPersist = s.shouldPersist(runOpts) + noneRemaining = remainingRanges.IsEmpty() + ) + if run == bootstrapIndexRunType && shouldPersist && noneRemaining { + err := s.persistBootstrapIndexSegment(ns, requestedRanges, runResult) if err != nil { iopts := s.opts.ResultOptions().InstrumentOptions() log := instrument.EmitInvariantViolationAndGetLogger(iopts) @@ -605,7 +607,7 @@ func (s *fileSystemSource) loadShardReadersDataIntoShardResult( xlog.NewField("namespace", ns.ID().String()), xlog.NewField("requestedRanges", requestedRanges.String()), xlog.NewField("error", err.Error()), - ).Error("incremental fs index bootstrap failed") + ).Error("persist fs index bootstrap failed") } } @@ -755,12 +757,12 @@ func (s *fileSystemSource) readNextEntryAndIndex( return err } -func (s *fileSystemSource) incrementalBootstrapIndexSegment( +func (s *fileSystemSource) persistBootstrapIndexSegment( ns namespace.Metadata, requestedRanges result.ShardTimeRanges, runResult *runResult, ) error { - // If we're performing an index run in incremental mode + // If we're performing an index run with persistence enabled // determine if we covered a full block exactly (which should // occur since we always group readers by block size) min, max := requestedRanges.MinMax() @@ -839,7 +841,7 @@ func (s *fileSystemSource) incrementalBootstrapIndexSegment( requireFulfilled.Subtract(fulfilled) 
exactStartEnd := min.Equal(blockStart) && max.Equal(blockStart.Add(blockSize)) if !exactStartEnd || !requireFulfilled.IsEmpty() { - return fmt.Errorf("incremental fs index bootstrap invalid ranges to persist: expected=%v, actual=%v, fulfilled=%v", + return fmt.Errorf("persistent fs index bootstrap invalid ranges to persist: expected=%v, actual=%v, fulfilled=%v", expectedRanges.String(), requestedRanges.String(), fulfilled.String()) } @@ -1155,6 +1157,11 @@ func (s *fileSystemSource) bootstrapFromIndexPersistedBlocks( return res, nil } +func (s *fileSystemSource) shouldPersist(runOpts bootstrap.RunOptions) bool { + persistConfig := runOpts.PersistConfig() + return persistConfig.Enabled && persistConfig.FileSetType == persist.FileSetFlushType +} + type timeWindowReaders struct { ranges result.ShardTimeRanges readers map[shardID]shardReaders diff --git a/src/dbnode/storage/bootstrap/bootstrapper/fs/source_data_test.go b/src/dbnode/storage/bootstrap/bootstrapper/fs/source_data_test.go index 6a760c1484..3abdafe67c 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/fs/source_data_test.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/fs/source_data_test.go @@ -48,15 +48,16 @@ import ( ) var ( - testShard = uint32(0) - testNs1ID = ident.StringID("testNs") - testBlockSize = 2 * time.Hour - testIndexBlockSize = 4 * time.Hour - testStart = time.Now().Truncate(testBlockSize) - testFileMode = os.FileMode(0666) - testDirMode = os.ModeDir | os.FileMode(0755) - testWriterBufferSize = 10 - testDefaultRunOpts = bootstrap.NewRunOptions().SetIncremental(false) + testShard = uint32(0) + testNs1ID = ident.StringID("testNs") + testBlockSize = 2 * time.Hour + testIndexBlockSize = 4 * time.Hour + testStart = time.Now().Truncate(testBlockSize) + testFileMode = os.FileMode(0666) + testDirMode = os.ModeDir | os.FileMode(0755) + testWriterBufferSize = 10 + testDefaultRunOpts = bootstrap.NewRunOptions(). + SetPersistConfig(bootstrap.PersistConfig{Enabled: false}) testDefaultResultOpts = result.NewOptions().SetSeriesCachePolicy(series.CacheAll) testDefaultOpts = NewOptions().SetResultOptions(testDefaultResultOpts) ) diff --git a/src/dbnode/storage/bootstrap/bootstrapper/fs/source_index_test.go b/src/dbnode/storage/bootstrap/bootstrapper/fs/source_index_test.go index deee9d4ef3..bccd65189a 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/fs/source_index_test.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/fs/source_index_test.go @@ -27,6 +27,7 @@ import ( "github.com/m3db/m3/src/dbnode/persist" "github.com/m3db/m3/src/dbnode/persist/fs" + "github.com/m3db/m3/src/dbnode/storage/bootstrap" "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" "github.com/m3db/m3/src/dbnode/storage/index/convert" "github.com/m3db/m3/src/dbnode/storage/namespace" @@ -270,7 +271,7 @@ func TestBootstrapIndex(t *testing.T) { validateGoodTaggedSeries(t, times.start, indexResults) } -func TestBootstrapIndexIncremental(t *testing.T) { +func TestBootstrapIndexWithPersist(t *testing.T) { dir := createTempDir(t) defer os.RemoveAll(dir) @@ -284,7 +285,8 @@ func TestBootstrapIndexIncremental(t *testing.T) { scope := tally.NewTestScope("", nil) opts = opts.SetInstrumentOptions(opts.InstrumentOptions().SetMetricsScope(scope)) - runOpts := testDefaultRunOpts.SetIncremental(true) + runOpts := testDefaultRunOpts. 
+ SetPersistConfig(bootstrap.PersistConfig{Enabled: true}) src := newFileSystemSource(opts).(*fileSystemSource) res, err := src.ReadIndex(testNsMetadata(t), times.shardTimeRanges, @@ -332,7 +334,61 @@ func TestBootstrapIndexIncremental(t *testing.T) { require.Equal(t, int64(1), counters["fs-bootstrapper.persist-index-blocks-write+"].Value()) } -func TestBootstrapIndexIncrementalPrefersPersistedIndexBlocks(t *testing.T) { +func TestBootstrapIndexIgnoresPersistConfigIfSnapshotType(t *testing.T) { + dir := createTempDir(t) + defer os.RemoveAll(dir) + + times := newTestBootstrapIndexTimes(testTimesOptions{ + numBlocks: 2, + }) + + writeTSDBGoodTaggedSeriesDataFiles(t, dir, testNs1ID, times.start) + + opts := newTestOptionsWithPersistManager(t, dir) + scope := tally.NewTestScope("", nil) + opts = opts.SetInstrumentOptions(opts.InstrumentOptions().SetMetricsScope(scope)) + + runOpts := testDefaultRunOpts. + SetPersistConfig(bootstrap.PersistConfig{ + Enabled: true, + FileSetType: persist.FileSetSnapshotType, + }) + + src := newFileSystemSource(opts).(*fileSystemSource) + res, err := src.ReadIndex(testNsMetadata(t), times.shardTimeRanges, + runOpts) + require.NoError(t, err) + + // Check that no segments were written out + infoFiles := fs.ReadIndexInfoFiles(src.fsopts.FilePathPrefix(), testNs1ID, + src.fsopts.InfoReaderBufferSize()) + require.Equal(t, 0, len(infoFiles)) + + indexResults := res.IndexResults() + + // Check that both segments are mutable + block, ok := indexResults[xtime.ToUnixNano(times.start)] + require.True(t, ok) + require.Equal(t, 1, len(block.Segments())) + _, mutable := block.Segments()[0].(segment.MutableSegment) + require.True(t, mutable) + + block, ok = indexResults[xtime.ToUnixNano(times.start.Add(testIndexBlockSize))] + require.True(t, ok) + require.Equal(t, 1, len(block.Segments())) + _, mutable = block.Segments()[0].(segment.MutableSegment) + require.True(t, mutable) + + // Validate results + validateGoodTaggedSeries(t, times.start, indexResults) + + // Validate that no index blocks were read from disk and that no files were written out + counters := scope.Snapshot().Counters() + require.Equal(t, int64(0), counters["fs-bootstrapper.persist-index-blocks-read+"].Value()) + require.Equal(t, int64(0), counters["fs-bootstrapper.persist-index-blocks-write+"].Value()) +} + +func TestBootstrapIndexWithPersistPrefersPersistedIndexBlocks(t *testing.T) { dir := createTempDir(t) defer os.RemoveAll(dir) @@ -353,7 +409,8 @@ func TestBootstrapIndexIncrementalPrefersPersistedIndexBlocks(t *testing.T) { scope := tally.NewTestScope("", nil) opts = opts.SetInstrumentOptions(opts.InstrumentOptions().SetMetricsScope(scope)) - runOpts := testDefaultRunOpts.SetIncremental(true) + runOpts := testDefaultRunOpts. 
+ SetPersistConfig(bootstrap.PersistConfig{Enabled: true}) src := newFileSystemSource(opts).(*fileSystemSource) res, err := src.ReadIndex(testNsMetadata(t), times.shardTimeRanges, @@ -381,7 +438,7 @@ func TestBootstrapIndexIncrementalPrefersPersistedIndexBlocks(t *testing.T) { validateGoodTaggedSeries(t, times.start, indexResults) // Validate that read the block and no blocks were written - // (ensure incremental didn't write it back out again) + // (ensure persist config didn't write it back out again) counters := scope.Snapshot().Counters() require.Equal(t, int64(1), counters["fs-bootstrapper.persist-index-blocks-read+"].Value()) require.Equal(t, int64(0), counters["fs-bootstrapper.persist-index-blocks-write+"].Value()) diff --git a/src/dbnode/storage/bootstrap/bootstrapper/fs/types.go b/src/dbnode/storage/bootstrap/bootstrapper/fs/types.go index cb037cc21d..dd32487655 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/fs/types.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/fs/types.go @@ -54,11 +54,11 @@ type Options interface { FilesystemOptions() fs.Options // SetPersistManager sets the persistence manager used to flush blocks - // when performing an incremental bootstrap run. + // when performing a bootstrap run with persistence enabled. SetPersistManager(value persist.Manager) Options // PersistManager returns the persistence manager used to flush blocks - // when performing an incremental bootstrap run. + // when performing a bootstrap run with persistence enabled. PersistManager() persist.Manager // SetBoostrapDataNumProcessors sets the number of processors for CPU-bound diff --git a/src/dbnode/storage/bootstrap/bootstrapper/peers/options.go b/src/dbnode/storage/bootstrap/bootstrapper/peers/options.go index 9ec8d9ccbd..43b83a2b93 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/peers/options.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/peers/options.go @@ -34,8 +34,8 @@ import ( var ( defaultDefaultShardConcurrency = runtime.NumCPU() - defaultIncrementalShardConcurrency = int(math.Max(1, float64(runtime.NumCPU())/2)) - defaultIncrementalPersistMaxQueueSize = 0 + defaultShardPersistenceConcurrency = int(math.Max(1, float64(runtime.NumCPU())/2)) + defaultPersistenceMaxQueueSize = 0 defaultFetchBlocksMetadataEndpointVersion = client.FetchBlocksMetadataEndpointV1 ) @@ -49,8 +49,8 @@ type options struct { resultOpts result.Options client client.AdminClient defaultShardConcurrency int - incrementalShardConcurrency int - incrementalPersistMaxQueueSize int + shardPersistenceConcurrency int + persistenceMaxQueueSize int persistManager persist.Manager blockRetrieverManager block.DatabaseBlockRetrieverManager fetchBlocksMetadataEndpointVersion client.FetchBlocksMetadataEndpointVersion @@ -62,8 +62,8 @@ func NewOptions() Options { return &options{ resultOpts: result.NewOptions(), defaultShardConcurrency: defaultDefaultShardConcurrency, - incrementalShardConcurrency: defaultIncrementalShardConcurrency, - incrementalPersistMaxQueueSize: defaultIncrementalPersistMaxQueueSize, + shardPersistenceConcurrency: defaultShardPersistenceConcurrency, + persistenceMaxQueueSize: defaultPersistenceMaxQueueSize, fetchBlocksMetadataEndpointVersion: defaultFetchBlocksMetadataEndpointVersion, } } @@ -111,24 +111,24 @@ func (o *options) DefaultShardConcurrency() int { return o.defaultShardConcurrency } -func (o *options) SetIncrementalShardConcurrency(value int) Options { +func (o *options) SetShardPersistenceConcurrency(value int) Options { opts := *o - opts.incrementalShardConcurrency = 
value + opts.shardPersistenceConcurrency = value return &opts } -func (o *options) IncrementalShardConcurrency() int { - return o.incrementalShardConcurrency +func (o *options) ShardPersistenceConcurrency() int { + return o.shardPersistenceConcurrency } -func (o *options) SetIncrementalPersistMaxQueueSize(value int) Options { +func (o *options) SetPersistenceMaxQueueSize(value int) Options { opts := *o - opts.incrementalPersistMaxQueueSize = value + opts.persistenceMaxQueueSize = value return &opts } -func (o *options) IncrementalPersistMaxQueueSize() int { - return o.incrementalPersistMaxQueueSize +func (o *options) PersistenceMaxQueueSize() int { + return o.persistenceMaxQueueSize } func (o *options) SetPersistManager(value persist.Manager) Options { diff --git a/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go b/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go index ef12104442..316b34cd8a 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go @@ -48,7 +48,7 @@ type peersSource struct { nowFn clock.NowFn } -type incrementalFlush struct { +type persistenceFlush struct { nsMetadata namespace.Metadata shard uint32 shardRetrieverMgr block.DatabaseShardBlockRetrieverManager @@ -82,6 +82,8 @@ func (s *peersSource) AvailableData( shardsTimeRanges result.ShardTimeRanges, runOpts bootstrap.RunOptions, ) result.ShardTimeRanges { + // TODO: Call validateRunOpts here when we modify this interface + // to support returning errors. return s.peerAvailability(nsMetadata, shardsTimeRanges, runOpts) } @@ -90,6 +92,10 @@ func (s *peersSource) ReadData( shardsTimeRanges result.ShardTimeRanges, opts bootstrap.RunOptions, ) (result.DataBootstrapResult, error) { + if err := s.validateRunOpts(opts); err != nil { + return nil, err + } + if shardsTimeRanges.IsEmpty() { return result.NewDataBootstrapResult(), nil } @@ -99,19 +105,20 @@ func (s *peersSource) ReadData( blockRetriever block.DatabaseBlockRetriever shardRetrieverMgr block.DatabaseShardBlockRetrieverManager persistFlush persist.DataFlush - incremental = false + shouldPersist = false seriesCachePolicy = s.opts.ResultOptions().SeriesCachePolicy() + persistConfig = opts.PersistConfig() ) - if opts.Incremental() && seriesCachePolicy != series.CacheAll { + if persistConfig.Enabled && seriesCachePolicy != series.CacheAll { retrieverMgr := s.opts.DatabaseBlockRetrieverManager() persistManager := s.opts.PersistManager() // Neither of these should ever happen if seriesCachePolicy != series.CacheAll && retrieverMgr == nil { - s.log.Fatal("tried to perform incremental flush without retriever manager") + s.log.Fatal("tried to perform a bootstrap with persistence without retriever manager") } if seriesCachePolicy != series.CacheAll && persistManager == nil { - s.log.Fatal("tried to perform incremental flush without persist manager") + s.log.Fatal("tried to perform a bootstrap with persistence without persist manager") } s.log.WithFields( @@ -130,7 +137,7 @@ func (s *peersSource) ReadData( defer persist.DoneData() - incremental = true + shouldPersist = true blockRetriever = r shardRetrieverMgr = block.NewDatabaseShardBlockRetrieverManager(r) persistFlush = persist @@ -147,26 +154,26 @@ func (s *peersSource) ReadData( var ( resultLock sync.Mutex wg sync.WaitGroup - incrementalWorkerDoneCh = make(chan struct{}) - incrementalMaxQueue = s.opts.IncrementalPersistMaxQueueSize() - incrementalQueue = make(chan incrementalFlush, incrementalMaxQueue) + persistenceWorkerDoneCh = 
make(chan struct{}) + persistenceMaxQueueSize = s.opts.PersistenceMaxQueueSize() + persistenceQueue = make(chan persistenceFlush, persistenceMaxQueueSize) resultOpts = s.opts.ResultOptions() count = len(shardsTimeRanges) concurrency = s.opts.DefaultShardConcurrency() blockSize = nsMetadata.Options().RetentionOptions().BlockSize() ) - if incremental { - concurrency = s.opts.IncrementalShardConcurrency() + if shouldPersist { + concurrency = s.opts.ShardPersistenceConcurrency() } s.log.WithFields( xlog.NewField("shards", count), xlog.NewField("concurrency", concurrency), - xlog.NewField("incremental", incremental), + xlog.NewField("shouldPersist", shouldPersist), ).Infof("peers bootstrapper bootstrapping shards for ranges") - if incremental { - go s.startIncrementalQueueWorkerLoop( - incrementalWorkerDoneCh, incrementalQueue, persistFlush, result, &resultLock) + if shouldPersist { + go s.startPersistenceQueueWorkerLoop( + opts, persistenceWorkerDoneCh, persistenceQueue, persistFlush, result, &resultLock) } workers := xsync.NewWorkerPool(concurrency) @@ -177,20 +184,20 @@ func (s *peersSource) ReadData( workers.Go(func() { defer wg.Done() s.fetchBootstrapBlocksFromPeers(shard, ranges, nsMetadata, session, - resultOpts, result, &resultLock, incremental, incrementalQueue, + resultOpts, result, &resultLock, shouldPersist, persistenceQueue, shardRetrieverMgr, blockSize) }) } wg.Wait() - close(incrementalQueue) - if incremental { - // Wait for the incrementalQueueWorker to finish incrementally flushing everything - <-incrementalWorkerDoneCh + close(persistenceQueue) + if shouldPersist { + // Wait for the persistenceQueueWorker to finish flushing everything + <-persistenceWorkerDoneCh } - if incremental { - // Now cache the incremental results + if shouldPersist { + // Now cache the flushed results err := s.cacheShardIndices(shardsTimeRanges, blockRetriever) if err != nil { return nil, err @@ -200,37 +207,38 @@ func (s *peersSource) ReadData( return result, nil } -// startIncrementalQueueWorkerLoop is meant to be run in its own goroutine, and it creates a worker that -// loops through the incrementalQueue and performs an incrementalFlush for each entry, ensuring that -// no more than one incremental flush is ever happening at once. Once the incrementalQueue channel +// startPersistenceQueueWorkerLoop is meant to be run in its own goroutine, and it creates a worker that +// loops through the persistenceQueue and performs a flush for each entry, ensuring that +// no more than one flush is ever happening at once. Once the persistenceQueue channel // is closed, and the worker has completed flushing all the remaining entries, it will close the // provided doneCh so that callers can block until everything has been successfully flushed. -func (s *peersSource) startIncrementalQueueWorkerLoop( +func (s *peersSource) startPersistenceQueueWorkerLoop( + opts bootstrap.RunOptions, doneCh chan struct{}, - incrementalQueue chan incrementalFlush, + persistenceQueue chan persistenceFlush, persistFlush persist.DataFlush, bootstrapResult result.DataBootstrapResult, lock *sync.Mutex, ) { - // If performing an incremental bootstrap then flush one - // at a time as shard results are gathered - for flush := range incrementalQueue { - err := s.incrementalFlush(persistFlush, flush.nsMetadata, flush.shard, + // If performing a bootstrap with persistence enabled then flush one + // at a time as shard results are gathered. 
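// Illustrative sketch, separate from the patch itself: the worker loop below follows a
// bounded-queue/single-worker pattern -- many concurrent shard fetchers enqueue flush
// requests while exactly one goroutine drains the channel, so at most one flush touches
// disk at a time and bootstrapped data doesn't pile up in memory. The flushRequest type
// and doFlush function here are placeholders for illustration, not the actual M3DB types.
package main

import "fmt"

type flushRequest struct{ shard uint32 }

func doFlush(req flushRequest) error {
	fmt.Println("flushing shard", req.shard)
	return nil
}

func main() {
	queue := make(chan flushRequest) // unbuffered, mirroring the default max queue size of 0
	doneCh := make(chan struct{})

	// Single worker: serializes flushes so disk I/O and memory stay bounded.
	go func() {
		defer close(doneCh)
		for req := range queue {
			if err := doFlush(req); err != nil {
				// A real implementation would mark the affected time range unfulfilled here.
				fmt.Println("flush failed:", err)
			}
		}
	}()

	// Producers (the per-shard fetchers) hand work to the worker as results arrive.
	for shard := uint32(0); shard < 4; shard++ {
		queue <- flushRequest{shard: shard}
	}

	close(queue)
	<-doneCh // wait until every queued flush has completed
}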
+ for flush := range persistenceQueue { + err := s.flush(opts, persistFlush, flush.nsMetadata, flush.shard, flush.shardRetrieverMgr, flush.shardResult, flush.timeRange) if err == nil { - // Safe to add to the shared bootstrap result now + // Safe to add to the shared bootstrap result now. lock.Lock() bootstrapResult.Add(flush.shard, flush.shardResult, xtime.Ranges{}) lock.Unlock() continue } - // Remove results and make unfulfilled if an error occurred + // Remove results and make unfulfilled if an error occurred. s.log.WithFields( xlog.NewField("error", err.Error()), - ).Errorf("peers bootstrapper incremental flush encountered error") + ).Errorf("peers bootstrapper bootstrap with persistence flush encountered error") - // Make unfulfilled + // Make unfulfilled. lock.Lock() bootstrapResult.Add(flush.shard, nil, xtime.NewRanges(flush.timeRange)) lock.Unlock() @@ -240,10 +248,10 @@ func (s *peersSource) startIncrementalQueueWorkerLoop( // fetchBootstrapBlocksFromPeers loops through all the provided ranges for a given shard and // fetches all the bootstrap blocks from the appropriate peers. -// Non-incremental case: Immediately add the results to the bootstrap result -// Incremental case: Don't add the results yet, but push an incrementalFlush into the -// incremental queue. The incrementalQueue worker will eventually -// add the results once its performed the incremental flush. +// Persistence disabled case: Immediately add the results to the bootstrap result +// Persistence enabled case: Don't add the results yet, but push a flush into the +// persistenceQueue. The persistenceQueue worker will eventually +// add the results once it has performed the flush. func (s *peersSource) fetchBootstrapBlocksFromPeers( shard uint32, ranges xtime.Ranges, @@ -252,8 +260,8 @@ func (s *peersSource) fetchBootstrapBlocksFromPeers( bopts result.Options, bootstrapResult result.DataBootstrapResult, lock *sync.Mutex, - incremental bool, - incrementalQueue chan incrementalFlush, + shouldPersist bool, + persistenceQueue chan persistenceFlush, shardRetrieverMgr block.DatabaseShardBlockRetrieverManager, blockSize time.Duration, ) { @@ -277,8 +285,8 @@ func (s *peersSource) fetchBootstrapBlocksFromPeers( continue } - if incremental { - incrementalQueue <- incrementalFlush{ + if shouldPersist { + persistenceQueue <- persistenceFlush{ nsMetadata: nsMetadata, shard: shard, shardRetrieverMgr: shardRetrieverMgr, @@ -288,7 +296,7 @@ func (s *peersSource) fetchBootstrapBlocksFromPeers( continue } - // If not waiting to incremental flush, add straight away to bootstrap result + // If not waiting to flush, add straight away to bootstrap result lock.Lock() bootstrapResult.Add(shard, shardResult, xtime.Ranges{}) lock.Unlock() @@ -325,10 +333,9 @@ func (s *peersSource) logFetchBootstrapBlocksFromPeersOutcome( } } -// incrementalFlush is used to incrementally flush peer-bootstrapped shards -// to disk as they finish so that we're not (necessarily) holding everything -// in memory at once. -// incrementalFlush starts by looping through every block in a timerange for +// flush is used to flush peer-bootstrapped shards to disk as they finish so +// that we're not (necessarily) holding everything in memory at once. +// flush starts by looping through every block in a timerange for // a given shard, and then subsequently looping through every series in that // shard/block and flushing it to disk. 
Depending on the series caching policy, // the series will either be held in memory, or removed from memory once @@ -343,7 +350,8 @@ func (s *peersSource) logFetchBootstrapBlocksFromPeersOutcome( // (since all their corresponding blocks have been removed anyways) to prevent // a huge memory spike caused by adding lots of unused series to the Shard // object and then immediately evicting them in the next tick. -func (s *peersSource) incrementalFlush( +func (s *peersSource) flush( + opts bootstrap.RunOptions, flush persist.DataFlush, nsMetadata namespace.Metadata, shard uint32, @@ -357,6 +365,7 @@ func (s *peersSource) incrementalFlush( shardRetriever = shardRetrieverMgr.ShardRetriever(shard) tmpCtx = context.NewContext() seriesCachePolicy = s.opts.ResultOptions().SeriesCachePolicy() + persistConfig = opts.PersistConfig() ) if seriesCachePolicy == series.CacheAllMetadata && shardRetriever == nil { return fmt.Errorf("shard retriever missing for shard: %d", shard) @@ -365,6 +374,7 @@ func (s *peersSource) incrementalFlush( for start := tr.Start; start.Before(tr.End); start = start.Add(blockSize) { prepareOpts := persist.DataPrepareOptions{ NamespaceMetadata: nsMetadata, + FileSetType: persistConfig.FileSetType, Shard: shard, BlockStart: start, // If we've peer bootstrapped this shard/block combination AND the fileset @@ -422,28 +432,42 @@ func (s *peersSource) incrementalFlush( break } - switch seriesCachePolicy { - case series.CacheAll: - // Leave the blocks in the shard result, we need to return all blocks - // so we can cache in memory - case series.CacheAllMetadata: - // NB(r): We can now make the flushed blocks retrievable, note that we - // explicitly perform another loop here and lookup the block again - // to avoid a large expensive allocation to hold onto the blocks - // that we just flushed that would have to be pooled. - // We are explicitly trading CPU time here for lower GC pressure. - metadata := block.RetrievableBlockMetadata{ - ID: s.ID, - Length: bl.Len(), - Checksum: checksum, + switch persistConfig.FileSetType { + case persist.FileSetFlushType: + switch seriesCachePolicy { + case series.CacheAll: + // Leave the blocks in the shard result, we need to return all blocks + // so we can cache in memory + case series.CacheAllMetadata: + // NB(r): We can now make the flushed blocks retrievable, note that we + // explicitly perform another loop here and lookup the block again + // to avoid a large expensive allocation to hold onto the blocks + // that we just flushed that would have to be pooled. + // We are explicitly trading CPU time here for lower GC pressure. + metadata := block.RetrievableBlockMetadata{ + ID: s.ID, + Length: bl.Len(), + Checksum: checksum, + } + bl.ResetRetrievable(start, blockSize, shardRetriever, metadata) + default: + // Not caching the series or metadata in memory so finalize the block, + // better to do this as we loop through to make blocks return to the + // pool earlier than at the end of this flush cycle + s.Blocks.RemoveBlockAt(start) + bl.Close() } - bl.ResetRetrievable(start, blockSize, shardRetriever, metadata) + case persist.FileSetSnapshotType: + // Unlike the FileSetFlushType scenario, in this case the caching + // strategy doesn't matter. 
Even if the LRU/RecentlyRead strategies are + // enabled, we still need to keep all the data in memory long enough for it + // to be flushed because the snapshot that we wrote out earlier will only ever + // be used if the node goes down again before we have a chance to flush the data + // from memory AND the commit log bootstrapper is set before the peers bootstrapper + // in the bootstrappers configuration. default: - // Not caching the series or metadata in memory so finalize the block, - // better to do this as we loop through to make blocks return to the - // pool earlier than at the end of this flush cycle - s.Blocks.RemoveBlockAt(start) - bl.Close() + // Should never happen + return fmt.Errorf("unknown FileSetFileType: %v", persistConfig.FileSetType) } } @@ -512,7 +536,7 @@ func (s *peersSource) cacheShardIndices( shardsTimeRanges result.ShardTimeRanges, blockRetriever block.DatabaseBlockRetriever, ) error { - // Now cache the incremental results + // Now cache the flushed results shards := make([]uint32, 0, len(shardsTimeRanges)) for shard := range shardsTimeRanges { shards = append(shards, shard) @@ -526,6 +550,8 @@ func (s *peersSource) AvailableIndex( shardsTimeRanges result.ShardTimeRanges, runOpts bootstrap.RunOptions, ) result.ShardTimeRanges { + // TODO: Call validateRunOpts here when we modify this interface + // to support returning errors. return s.peerAvailability(nsMetadata, shardsTimeRanges, runOpts) } @@ -534,6 +560,10 @@ func (s *peersSource) ReadIndex( shardsTimeRanges result.ShardTimeRanges, opts bootstrap.RunOptions, ) (result.IndexBootstrapResult, error) { + if err := s.validateRunOpts(opts); err != nil { + return nil, err + } + // FOLLOWUP(r): Try to reuse any metadata fetched during the ReadData(...) // call rather than going to the network again r := result.NewIndexBootstrapResult() @@ -786,3 +816,14 @@ func (s *peersSource) markIndexResultErrorAsUnfulfilled( } r.Add(result.IndexBlock{}, unfulfilled) } + +func (s *peersSource) validateRunOpts(runOpts bootstrap.RunOptions) error { + persistConfig := runOpts.PersistConfig() + if persistConfig.FileSetType != persist.FileSetFlushType && + persistConfig.FileSetType != persist.FileSetSnapshotType { + // Should never happen + return fmt.Errorf("unknown persist config fileset file type: %v", persistConfig.FileSetType) + } + + return nil +} diff --git a/src/dbnode/storage/bootstrap/bootstrapper/peers/source_data_test.go b/src/dbnode/storage/bootstrap/bootstrapper/peers/source_data_test.go index 5a2d136d50..05a39c4c68 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/peers/source_data_test.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/peers/source_data_test.go @@ -58,11 +58,13 @@ var ( return ns } - testDefaultRunOpts = bootstrap.NewRunOptions().SetIncremental(false) - testIncrementalRunOpts = bootstrap.NewRunOptions().SetIncremental(true) - testBlockOpts = block.NewOptions() - testDefaultResultOpts = result.NewOptions().SetSeriesCachePolicy(series.CacheAll) - testDefaultOpts = NewOptions(). + testDefaultRunOpts = bootstrap.NewRunOptions(). + SetPersistConfig(bootstrap.PersistConfig{Enabled: false}) + testRunOptsWithPersist = bootstrap.NewRunOptions(). + SetPersistConfig(bootstrap.PersistConfig{Enabled: true}) + testBlockOpts = block.NewOptions() + testDefaultResultOpts = result.NewOptions().SetSeriesCachePolicy(series.CacheAll) + testDefaultOpts = NewOptions(). 
SetResultOptions(testDefaultResultOpts) ) @@ -227,7 +229,7 @@ func TestPeersSourceReturnsFulfilledAndUnfulfilled(t *testing.T) { require.Equal(t, ropts.BlockSize(), block.BlockSize()) } -func TestPeersSourceIncrementalRun(t *testing.T) { +func TestPeersSourceRunWithPersist(t *testing.T) { for _, cachePolicy := range []series.CachePolicy{ series.CacheAllMetadata, series.CacheRecentlyRead, @@ -397,7 +399,7 @@ func TestPeersSourceIncrementalRun(t *testing.T) { 1: xtime.NewRanges(xtime.Range{Start: start, End: end}), } - r, err := src.ReadData(testNsMd, target, testIncrementalRunOpts) + r, err := src.ReadData(testNsMd, target, testRunOptsWithPersist) assert.NoError(t, err) require.True(t, r.Unfulfilled()[0].IsEmpty()) @@ -444,7 +446,7 @@ func TestPeersSourceIncrementalRun(t *testing.T) { } } -func TestPeersSourceMarksUnfulfilledOnIncrementalFlushErrors(t *testing.T) { +func TestPeersSourceMarksUnfulfilledOnPersistenceErrors(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -736,7 +738,7 @@ func TestPeersSourceMarksUnfulfilledOnIncrementalFlushErrors(t *testing.T) { AddRange(xtime.Range{Start: midway, End: end}), } - r, err := src.ReadData(testNsMd, target, testIncrementalRunOpts) + r, err := src.ReadData(testNsMd, target, testRunOptsWithPersist) assert.NoError(t, err) assert.Equal(t, 0, len(r.ShardResults())) diff --git a/src/dbnode/storage/bootstrap/bootstrapper/peers/source_test.go b/src/dbnode/storage/bootstrap/bootstrapper/peers/source_test.go index 419ca8d6a4..c75d6b39fc 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/peers/source_test.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/peers/source_test.go @@ -21,10 +21,12 @@ package peers import ( + "strings" "testing" "time" m3dbruntime "github.com/m3db/m3/src/dbnode/runtime" + "github.com/m3db/m3/src/dbnode/storage/bootstrap" "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" "github.com/m3db/m3/src/dbnode/topology" tu "github.com/m3db/m3/src/dbnode/topology/testutil" @@ -158,3 +160,35 @@ func TestPeersSourceAvailableDataAndIndex(t *testing.T) { }) } } + +func TestPeersSourceReturnsErrorIfUnknownPersistenceFileSetType(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + var ( + testNsMd = testNamespaceMetadata(t) + resultOpts = testDefaultResultOpts + opts = testDefaultOpts.SetResultOptions(resultOpts) + ropts = testNsMd.Options().RetentionOptions() + + start = time.Now().Add(-ropts.RetentionPeriod()).Truncate(ropts.BlockSize()) + end = start.Add(2 * ropts.BlockSize()) + ) + + src, err := newPeersSource(opts) + require.NoError(t, err) + + target := result.ShardTimeRanges{ + 0: xtime.NewRanges(xtime.Range{Start: start, End: end}), + 1: xtime.NewRanges(xtime.Range{Start: start, End: end}), + } + + runOpts := testRunOptsWithPersist.SetPersistConfig(bootstrap.PersistConfig{Enabled: true, FileSetType: 999}) + _, err = src.ReadData(testNsMd, target, runOpts) + require.Error(t, err) + require.True(t, strings.Contains(err.Error(), "unknown persist config fileset file type")) + + _, err = src.ReadIndex(testNsMd, target, runOpts) + require.Error(t, err) + require.True(t, strings.Contains(err.Error(), "unknown persist config fileset file type")) +} diff --git a/src/dbnode/storage/bootstrap/bootstrapper/peers/types.go b/src/dbnode/storage/bootstrap/bootstrapper/peers/types.go index 18d05dbf8b..a01b4453ec 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/peers/types.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/peers/types.go @@ -46,47 +46,53 @@ type Options interface { 
AdminClient() client.AdminClient // SetDefaultShardConcurrency sets the concurrency for - // bootstrapping shards when performing a non-incremental bootstrap. + // bootstrapping shards when performing a bootstrap with + // persistence disabled. SetDefaultShardConcurrency(value int) Options // DefaultShardConcurrency returns the concurrency for - // bootstrapping shards when performing a non-incremental bootstrap. + // bootstrapping shards when performing a bootstrap with + // persistence disabled. DefaultShardConcurrency() int - // SetIncrementalShardConcurrency sets the concurrency for - // bootstrapping shards when performing an incremental bootstrap. - SetIncrementalShardConcurrency(value int) Options + // SetShardPersistenceConcurrency sets the concurrency for + // bootstrapping shards when performing a bootstrap with + // persistence enabled. + SetShardPersistenceConcurrency(value int) Options - // IncrementalShardConcurrency returns the concurrency for - // bootstrapping shards when performing an incremental bootstrap. - IncrementalShardConcurrency() int + // ShardPersistenceConcurrency returns the concurrency for + // bootstrapping shards when performing a bootstrap with + // persistence enabled. + ShardPersistenceConcurrency() int - // SetIncrementalPersistMaxQueueSize sets the max queue for + // SetPersistenceMaxQueueSize sets the max queue for // bootstrapping shards waiting in line to persist without blocking // the concurrent shard fetchers. - SetIncrementalPersistMaxQueueSize(value int) Options + SetPersistenceMaxQueueSize(value int) Options - // IncrementalPersistMaxQueueSize returns the max queue for + // PersistenceMaxQueueSize returns the max queue for // bootstrapping shards waiting in line to persist without blocking // the concurrent shard fetchers. - IncrementalPersistMaxQueueSize() int + PersistenceMaxQueueSize() int // SetPersistManager sets the persistence manager used to flush blocks - // when performing an incremental bootstrap run. + // when performing a bootstrap with persistence. SetPersistManager(value persist.Manager) Options // PersistManager returns the persistence manager used to flush blocks - // when performing an incremental bootstrap run. + // when performing a bootstrap with persistence. PersistManager() persist.Manager // SetDatabaseBlockRetrieverManager sets the block retriever manager to - // pass to newly flushed blocks when performing an incremental bootstrap run. + // pass to newly flushed blocks when performing a bootstrap run with + // persistence enabled. SetDatabaseBlockRetrieverManager( value block.DatabaseBlockRetrieverManager, ) Options // NewBlockRetrieverFn returns the block retriever manager to - // pass to newly flushed blocks when performing an incremental bootstrap run. + // pass to newly flushed blocks when performing a bootstrap run with + // persistence enabled. 
DatabaseBlockRetrieverManager() block.DatabaseBlockRetrieverManager // SetFetchBlocksMetadataEndpointVersion sets the version of the fetch blocks diff --git a/src/dbnode/storage/bootstrap/process.go b/src/dbnode/storage/bootstrap/process.go index 9f184a11f6..b29295fb63 100644 --- a/src/dbnode/storage/bootstrap/process.go +++ b/src/dbnode/storage/bootstrap/process.go @@ -26,6 +26,7 @@ import ( "github.com/m3db/m3/src/dbnode/client" "github.com/m3db/m3/src/dbnode/clock" + "github.com/m3db/m3/src/dbnode/persist" "github.com/m3db/m3/src/dbnode/retention" "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" "github.com/m3db/m3/src/dbnode/storage/namespace" @@ -337,17 +338,30 @@ func (b bootstrapProcess) targetRanges( Add(opts.blockSize) // NB(r): We want the large initial time range bootstrapped to - // bootstrap incrementally so we don't keep the full raw + // bootstrap with persistence so we don't keep the full raw // data in process until we finish bootstrapping which could // cause the process to OOM. return []TargetRange{ { - Range: xtime.Range{Start: start, End: midPoint}, - RunOptions: b.newRunOptions().SetIncremental(true), + Range: xtime.Range{Start: start, End: midPoint}, + RunOptions: b.newRunOptions().SetPersistConfig(PersistConfig{ + Enabled: true, + // These blocks are no longer active, so we want to flush them + // to disk as we receive them so that we don't hold too much + // data in memory at once. + FileSetType: persist.FileSetFlushType, + }), }, { - Range: xtime.Range{Start: midPoint, End: cutover}, - RunOptions: b.newRunOptions().SetIncremental(false), + Range: xtime.Range{Start: midPoint, End: cutover}, + RunOptions: b.newRunOptions().SetPersistConfig(PersistConfig{ + Enabled: true, + // These blocks are still active so we'll have to keep them + // in memory, but we want to snapshot them as we receive them + // so that once bootstrapping completes we can still recover + // from just the commit log bootstrapper. + FileSetType: persist.FileSetSnapshotType, + }), }, } } diff --git a/src/dbnode/storage/bootstrap/run_options.go b/src/dbnode/storage/bootstrap/run_options.go index aed9864f69..ce6014eb96 100644 --- a/src/dbnode/storage/bootstrap/run_options.go +++ b/src/dbnode/storage/bootstrap/run_options.go @@ -22,14 +22,16 @@ package bootstrap import "github.com/m3db/m3/src/dbnode/topology" -const ( - // defaultIncremental declares the intent to by default not perform an - // incremental bootstrap. - defaultIncremental = false +var ( + // defaultPersistConfig declares the intent to, by default, perform + // a bootstrap without persistence enabled. 
+ defaultPersistConfig = PersistConfig{ + Enabled: false, + } ) type runOptions struct { - incremental bool + persistConfig PersistConfig cacheSeriesMetadata bool initialTopologyState *topology.StateSnapshot } @@ -37,20 +39,20 @@ type runOptions struct { // NewRunOptions creates new bootstrap run options func NewRunOptions() RunOptions { return &runOptions{ - incremental: defaultIncremental, + persistConfig: defaultPersistConfig, cacheSeriesMetadata: defaultCacheSeriesMetadata, initialTopologyState: nil, } } -func (o *runOptions) SetIncremental(value bool) RunOptions { +func (o *runOptions) SetPersistConfig(value PersistConfig) RunOptions { opts := *o - opts.incremental = value + opts.persistConfig = value return &opts } -func (o *runOptions) Incremental() bool { - return o.incremental +func (o *runOptions) PersistConfig() PersistConfig { + return o.persistConfig } func (o *runOptions) SetCacheSeriesMetadata(value bool) RunOptions { diff --git a/src/dbnode/storage/bootstrap/types.go b/src/dbnode/storage/bootstrap/types.go index 56dfc15468..a81f416922 100644 --- a/src/dbnode/storage/bootstrap/types.go +++ b/src/dbnode/storage/bootstrap/types.go @@ -24,6 +24,7 @@ import ( "time" "github.com/m3db/m3/src/dbnode/client" + "github.com/m3db/m3/src/dbnode/persist" "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" "github.com/m3db/m3/src/dbnode/storage/namespace" "github.com/m3db/m3/src/dbnode/topology" @@ -68,6 +69,16 @@ type TargetRange struct { RunOptions RunOptions } +// PersistConfig is the configuration for a bootstrap with persistence. +type PersistConfig struct { + // If enabled bootstrappers are allowed to write out bootstrapped data + // to disk on their own instead of just returning result in-memory. + Enabled bool + // If enabled, what type of persistence files should be generated during + // the process. + FileSetType persist.FileSetType +} + // ProcessOptions is a set of options for a bootstrap provider. type ProcessOptions interface { // SetCacheSeriesMetadata sets whether bootstrappers created by this @@ -90,13 +101,11 @@ type ProcessOptions interface { // RunOptions is a set of options for a bootstrap run. type RunOptions interface { - // SetIncremental sets whether this bootstrap should be an incremental - // that saves intermediate results to durable storage or not. - SetIncremental(value bool) RunOptions + // SetPersistConfig sets persistence configuration for this bootstrap. + SetPersistConfig(value PersistConfig) RunOptions - // Incremental returns whether this bootstrap should be an incremental - // that saves intermediate results to durable storage or not. - Incremental() bool + // PersistConfig returns the persistence configuration for this bootstrap. + PersistConfig() PersistConfig // SetCacheSeriesMetadata sets whether bootstrappers created by this // provider should cache series metadata between runs.
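// Illustrative usage of the PersistConfig run options introduced above, mirroring how
// bootstrapProcess.targetRanges splits a bootstrap: older, immutable blocks are persisted
// as regular flush filesets, while the most recent, still-mutable blocks are persisted as
// snapshots so the commit log bootstrapper can recover them if the node crashes before
// its first flush. A sketch only; the surrounding time-range plumbing is omitted and the
// package name is a placeholder.
package example

import (
	"github.com/m3db/m3/src/dbnode/persist"
	"github.com/m3db/m3/src/dbnode/storage/bootstrap"
)

func bootstrapRunOptions() (older, recent bootstrap.RunOptions) {
	// Blocks that are no longer being written to: flush them to disk as they
	// are received so the full raw data never has to be held in memory.
	older = bootstrap.NewRunOptions().SetPersistConfig(bootstrap.PersistConfig{
		Enabled:     true,
		FileSetType: persist.FileSetFlushType,
	})

	// Blocks that are still active: keep them in memory for serving, but also
	// write them out as snapshots so they remain recoverable from the commit
	// log bootstrapper after a restart.
	recent = bootstrap.NewRunOptions().SetPersistConfig(bootstrap.PersistConfig{
		Enabled:     true,
		FileSetType: persist.FileSetSnapshotType,
	})
	return older, recent
}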