From 44bf00b3254ef762c3966d9c920d968cee6b7ec1 Mon Sep 17 00:00:00 2001
From: Richard Artoul
Date: Mon, 10 Sep 2018 12:50:41 -0400
Subject: [PATCH 01/19] Refactor commit log bootstrapper to only return that
 it can satisfy everything if the node is Available or Leaving for that
 shard, and add uninitialized source

---
 .../storage/bootstrap/bootstrapper/uninitialized/source_test.go | 1 -
 1 file changed, 1 deletion(-)

diff --git a/src/dbnode/storage/bootstrap/bootstrapper/uninitialized/source_test.go b/src/dbnode/storage/bootstrap/bootstrapper/uninitialized/source_test.go
index be20411eea..99df96a825 100644
--- a/src/dbnode/storage/bootstrap/bootstrapper/uninitialized/source_test.go
+++ b/src/dbnode/storage/bootstrap/bootstrapper/uninitialized/source_test.go
@@ -33,7 +33,6 @@ import (
 	"github.com/m3db/m3x/ident"
 	"github.com/m3db/m3x/instrument"
 	xtime "github.com/m3db/m3x/time"
-	"github.com/stretchr/testify/require"
 )

From 606049200bcc4fc45fe5aec42276e264e4bee865 Mon Sep 17 00:00:00 2001
From: Richard Artoul
Date: Mon, 10 Sep 2018 15:42:02 -0400
Subject: [PATCH 02/19] Fix lint issues

---
 .../storage/bootstrap/bootstrapper/uninitialized/source_test.go | 1 +
 1 file changed, 1 insertion(+)

diff --git a/src/dbnode/storage/bootstrap/bootstrapper/uninitialized/source_test.go b/src/dbnode/storage/bootstrap/bootstrapper/uninitialized/source_test.go
index 99df96a825..be20411eea 100644
--- a/src/dbnode/storage/bootstrap/bootstrapper/uninitialized/source_test.go
+++ b/src/dbnode/storage/bootstrap/bootstrapper/uninitialized/source_test.go
@@ -33,6 +33,7 @@ import (
 	"github.com/m3db/m3x/ident"
 	"github.com/m3db/m3x/instrument"
 	xtime "github.com/m3db/m3x/time"
+	"github.com/stretchr/testify/require"
 )

From 77f94c82984b394eccfe72b90972584766ebb19f Mon Sep 17 00:00:00 2001
From: Richard Artoul
Date: Wed, 12 Sep 2018 15:40:28 -0400
Subject: [PATCH 03/19] Working implementation

---
 .../cluster_add_one_node_commitlog_test.go    | 260 ++++++++++++++++++
 .../bootstrap/bootstrapper/fs/source.go       |   4 +-
 .../bootstrap/bootstrapper/peers/source.go    |  57 ++--
 src/dbnode/storage/bootstrap/process.go       |  15 +-
 src/dbnode/storage/bootstrap/run_options.go   |  18 +-
 src/dbnode/storage/bootstrap/types.go         |  20 +-
 6 files changed, 331 insertions(+), 43 deletions(-)
 create mode 100644 src/dbnode/integration/cluster_add_one_node_commitlog_test.go

diff --git a/src/dbnode/integration/cluster_add_one_node_commitlog_test.go b/src/dbnode/integration/cluster_add_one_node_commitlog_test.go
new file mode 100644
index 0000000000..519b1c53da
--- /dev/null
+++ b/src/dbnode/integration/cluster_add_one_node_commitlog_test.go
@@ -0,0 +1,260 @@
+// +build integration

+// Copyright (c) 2016 Uber Technologies, Inc.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package integration + +import ( + "strconv" + "testing" + "time" + + "github.com/m3db/m3/src/dbnode/integration/fake" + "github.com/m3db/m3/src/dbnode/integration/generate" + "github.com/m3db/m3/src/dbnode/retention" + "github.com/m3db/m3/src/dbnode/storage/namespace" + "github.com/m3db/m3/src/dbnode/topology" + "github.com/m3db/m3/src/dbnode/topology/testutil" + "github.com/m3db/m3cluster/services" + "github.com/m3db/m3cluster/shard" + "github.com/m3db/m3x/ident" + xlog "github.com/m3db/m3x/log" + xtime "github.com/m3db/m3x/time" + + "github.com/stretchr/testify/require" +) + +func TestClusterAddOneNodeCommitlog(t *testing.T) { + if testing.Short() { + t.SkipNow() + } + + // Test setups + log := xlog.SimpleLogger + + namesp, err := namespace.NewMetadata(testNamespaces[0], + namespace.NewOptions().SetRetentionOptions( + retention.NewOptions(). + SetRetentionPeriod(6*time.Hour). + SetBlockSize(2*time.Hour). + SetBufferPast(10*time.Minute). + SetBufferFuture(2*time.Minute))) + require.NoError(t, err) + opts := newTestOptions(t). + SetNamespaces([]namespace.Metadata{namesp}) + + minShard := uint32(0) + maxShard := uint32(opts.NumShards()) - uint32(1) + midShard := uint32((maxShard - minShard) / 2) + + instances := struct { + start []services.ServiceInstance + add []services.ServiceInstance + added []services.ServiceInstance + }{ + start: []services.ServiceInstance{ + node(t, 0, newClusterShardsRange(minShard, maxShard, shard.Available)), + node(t, 1, newClusterEmptyShardsRange()), + }, + + add: []services.ServiceInstance{ + node(t, 0, concatShards( + newClusterShardsRange(minShard, midShard, shard.Available), + newClusterShardsRange(midShard+1, maxShard, shard.Leaving))), + node(t, 1, newClusterShardsRange(midShard+1, maxShard, shard.Initializing)), + }, + added: []services.ServiceInstance{ + node(t, 0, newClusterShardsRange(minShard, midShard, shard.Available)), + node(t, 1, newClusterShardsRange(midShard+1, maxShard, shard.Available)), + }, + } + + svc := fake.NewM3ClusterService(). + SetInstances(instances.start). + SetReplication(services.NewServiceReplication().SetReplicas(1)). + SetSharding(services.NewServiceSharding().SetNumShards(opts.NumShards())) + + svcs := fake.NewM3ClusterServices() + svcs.RegisterService("m3db", svc) + + topoOpts := topology.NewDynamicOptions(). + SetConfigServiceClient(fake.NewM3ClusterClient(svcs, nil)) + topoInit := topology.NewDynamicInitializer(topoOpts) + setupOpts := []bootstrappableTestSetupOptions{ + { + disablePeersBootstrapper: true, + topologyInitializer: topoInit, + }, + { + disablePeersBootstrapper: false, + topologyInitializer: topoInit, + }, + } + setups, closeFn := newDefaultBootstrappableTestSetups(t, opts, setupOpts) + defer closeFn() + + // Write test data for first node + topo, err := topoInit.Init() + require.NoError(t, err) + ids := []idShard{} + + // Boilerplate code to find two ID's that hash to the first half of the + // shards, and one ID that hashes to the second half of the shards. 
+ shardSet := topo.Get().ShardSet() + i := 0 + numFirstHalf := 0 + numSecondHalf := 0 + for { + if numFirstHalf == 2 && numSecondHalf == 1 { + break + } + idStr := strconv.Itoa(i) + shard := shardSet.Lookup(ident.StringID(idStr)) + if shard < midShard && numFirstHalf < 2 { + ids = append(ids, idShard{str: idStr, shard: shard}) + numFirstHalf++ + } + if shard > midShard && numSecondHalf < 1 { + ids = append(ids, idShard{idStr, shard}) + numSecondHalf++ + } + i++ + } + + for _, id := range ids { + // Verify IDs will map to halves of the shard space + require.Equal(t, id.shard, shardSet.Lookup(ident.StringID(id.str))) + } + + now := setups[0].getNowFn() + blockSize := namesp.Options().RetentionOptions().BlockSize() + seriesMaps := generate.BlocksByStart([]generate.BlockConfig{ + {IDs: []string{ids[0].str, ids[1].str}, NumPoints: 180, Start: now.Add(-blockSize)}, + {IDs: []string{ids[0].str, ids[2].str}, NumPoints: 90, Start: now}, + }) + err = writeTestDataToDisk(namesp, setups[0], seriesMaps) + require.NoError(t, err) + + // Prepare verification of data on nodes + expectedSeriesMaps := make([]map[xtime.UnixNano]generate.SeriesBlock, 2) + expectedSeriesIDs := make([]map[string]struct{}, 2) + for i := range expectedSeriesMaps { + expectedSeriesMaps[i] = make(map[xtime.UnixNano]generate.SeriesBlock) + expectedSeriesIDs[i] = make(map[string]struct{}) + } + for start, series := range seriesMaps { + list := make([]generate.SeriesBlock, 2) + for j := range series { + if shardSet.Lookup(series[j].ID) < midShard+1 { + list[0] = append(list[0], series[j]) + } else { + list[1] = append(list[1], series[j]) + } + } + for i := range expectedSeriesMaps { + if len(list[i]) > 0 { + expectedSeriesMaps[i][start] = list[i] + } + } + } + for i := range expectedSeriesMaps { + for _, series := range expectedSeriesMaps[i] { + for _, elem := range series { + expectedSeriesIDs[i][elem.ID.String()] = struct{}{} + } + } + } + require.Equal(t, 2, len(expectedSeriesIDs[0])) + require.Equal(t, 1, len(expectedSeriesIDs[1])) + + // Start the first server with filesystem bootstrapper + require.NoError(t, setups[0].startServer()) + + // Start the last server with peers and filesystem bootstrappers, no shards + // are assigned at first + require.NoError(t, setups[1].startServer()) + log.Debug("servers are now up") + + // Stop the servers at test completion + defer func() { + log.Debug("servers closing") + setups.parallel(func(s *testSetup) { + require.NoError(t, s.stopServer()) + }) + log.Debug("servers are now down") + }() + + // Bootstrap the new shards + log.Debug("resharding to initialize shards on second node") + svc.SetInstances(instances.add) + svcs.NotifyServiceUpdate("m3db") + waitUntilHasBootstrappedShardsExactly(setups[1].db, testutil.Uint32Range(midShard+1, maxShard)) + + log.Debug("waiting for shards to be marked initialized") + allMarkedAvailable := func( + fakePlacementService fake.M3ClusterPlacementService, + instanceID string, + shards []shard.Shard, + ) bool { + markedAvailable := fakePlacementService.InstanceShardsMarkedAvailable() + if len(markedAvailable) != 1 { + return false + } + if len(markedAvailable[instanceID]) != len(shards) { + return false + } + marked := shard.NewShards(nil) + for _, id := range markedAvailable[instanceID] { + marked.Add(shard.NewShard(id).SetState(shard.Available)) + } + for _, shard := range shards { + if !marked.Contains(shard.ID()) { + return false + } + } + return true + } + + fps := svcs.FakePlacementService() + shouldMark := instances.add[1].Shards().All() + for 
!allMarkedAvailable(fps, "testhost1", shouldMark) { + time.Sleep(100 * time.Millisecond) + } + log.Debug("all shards marked as initialized") + + // Shed the old shards from the first node + log.Debug("resharding to shed shards from first node") + svc.SetInstances(instances.added) + svcs.NotifyServiceUpdate("m3db") + waitUntilHasBootstrappedShardsExactly(setups[0].db, testutil.Uint32Range(minShard, midShard)) + waitUntilHasBootstrappedShardsExactly(setups[1].db, testutil.Uint32Range(midShard+1, maxShard)) + + log.Debug("verifying data in servers matches expected data set") + + // Verify in-memory data match what we expect + for i := range setups { + verifySeriesMaps(t, setups[i], namesp.ID(), expectedSeriesMaps[i]) + } + + require.NoError(t, setups[1].stopServer()) + startServerWithNewInspection(t, opts, setups[1]) + verifySeriesMaps(t, setups[1], namesp.ID(), expectedSeriesMaps[1]) +} diff --git a/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go b/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go index 15b6f77ae9..73b697b6c4 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go @@ -220,7 +220,7 @@ func (s *fileSystemSource) enqueueReaders( // Close the readers ch if and only if all readers are enqueued defer close(readersCh) - indexIncrementalBootstrap := run == bootstrapIndexRunType && runOpts.Incremental() + indexIncrementalBootstrap := run == bootstrapIndexRunType && runOpts.IncrementalConfig().Enabled if !indexIncrementalBootstrap { // Normal run, open readers s.enqueueReadersGroupedByBlockSize(ns, run, runOpts, @@ -594,7 +594,7 @@ func (s *fileSystemSource) loadShardReadersDataIntoShardResult( } } - incremental := runOpts.Incremental() + incremental := runOpts.IncrementalConfig().Enabled noneRemaining := remainingRanges.IsEmpty() if run == bootstrapIndexRunType && incremental && noneRemaining { err := s.incrementalBootstrapIndexSegment(ns, requestedRanges, runResult) diff --git a/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go b/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go index ef12104442..1641d3d2b9 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go @@ -101,8 +101,9 @@ func (s *peersSource) ReadData( persistFlush persist.DataFlush incremental = false seriesCachePolicy = s.opts.ResultOptions().SeriesCachePolicy() + incrementalConfig = opts.IncrementalConfig() ) - if opts.Incremental() && seriesCachePolicy != series.CacheAll { + if incrementalConfig.Enabled && seriesCachePolicy != series.CacheAll { retrieverMgr := s.opts.DatabaseBlockRetrieverManager() persistManager := s.opts.PersistManager() @@ -166,7 +167,7 @@ func (s *peersSource) ReadData( ).Infof("peers bootstrapper bootstrapping shards for ranges") if incremental { go s.startIncrementalQueueWorkerLoop( - incrementalWorkerDoneCh, incrementalQueue, persistFlush, result, &resultLock) + opts, incrementalWorkerDoneCh, incrementalQueue, persistFlush, result, &resultLock) } workers := xsync.NewWorkerPool(concurrency) @@ -206,6 +207,7 @@ func (s *peersSource) ReadData( // is closed, and the worker has completed flushing all the remaining entries, it will close the // provided doneCh so that callers can block until everything has been successfully flushed. 
func (s *peersSource) startIncrementalQueueWorkerLoop( + opts bootstrap.RunOptions, doneCh chan struct{}, incrementalQueue chan incrementalFlush, persistFlush persist.DataFlush, @@ -215,7 +217,7 @@ func (s *peersSource) startIncrementalQueueWorkerLoop( // If performing an incremental bootstrap then flush one // at a time as shard results are gathered for flush := range incrementalQueue { - err := s.incrementalFlush(persistFlush, flush.nsMetadata, flush.shard, + err := s.incrementalFlush(opts, persistFlush, flush.nsMetadata, flush.shard, flush.shardRetrieverMgr, flush.shardResult, flush.timeRange) if err == nil { // Safe to add to the shared bootstrap result now @@ -344,6 +346,7 @@ func (s *peersSource) logFetchBootstrapBlocksFromPeersOutcome( // a huge memory spike caused by adding lots of unused series to the Shard // object and then immediately evicting them in the next tick. func (s *peersSource) incrementalFlush( + opts bootstrap.RunOptions, flush persist.DataFlush, nsMetadata namespace.Metadata, shard uint32, @@ -357,6 +360,7 @@ func (s *peersSource) incrementalFlush( shardRetriever = shardRetrieverMgr.ShardRetriever(shard) tmpCtx = context.NewContext() seriesCachePolicy = s.opts.ResultOptions().SeriesCachePolicy() + incrementalConfig = opts.IncrementalConfig() ) if seriesCachePolicy == series.CacheAllMetadata && shardRetriever == nil { return fmt.Errorf("shard retriever missing for shard: %d", shard) @@ -365,6 +369,7 @@ func (s *peersSource) incrementalFlush( for start := tr.Start; start.Before(tr.End); start = start.Add(blockSize) { prepareOpts := persist.DataPrepareOptions{ NamespaceMetadata: nsMetadata, + FileSetType: incrementalConfig.FileSetType, Shard: shard, BlockStart: start, // If we've peer bootstrapped this shard/block combination AND the fileset @@ -422,28 +427,34 @@ func (s *peersSource) incrementalFlush( break } - switch seriesCachePolicy { - case series.CacheAll: - // Leave the blocks in the shard result, we need to return all blocks - // so we can cache in memory - case series.CacheAllMetadata: - // NB(r): We can now make the flushed blocks retrievable, note that we - // explicitly perform another loop here and lookup the block again - // to avoid a large expensive allocation to hold onto the blocks - // that we just flushed that would have to be pooled. - // We are explicitly trading CPU time here for lower GC pressure. - metadata := block.RetrievableBlockMetadata{ - ID: s.ID, - Length: bl.Len(), - Checksum: checksum, + switch incrementalConfig.FileSetType { + case persist.FileSetFlushType: + switch seriesCachePolicy { + case series.CacheAll: + // Leave the blocks in the shard result, we need to return all blocks + // so we can cache in memory + case series.CacheAllMetadata: + // NB(r): We can now make the flushed blocks retrievable, note that we + // explicitly perform another loop here and lookup the block again + // to avoid a large expensive allocation to hold onto the blocks + // that we just flushed that would have to be pooled. + // We are explicitly trading CPU time here for lower GC pressure. 
+ metadata := block.RetrievableBlockMetadata{ + ID: s.ID, + Length: bl.Len(), + Checksum: checksum, + } + bl.ResetRetrievable(start, blockSize, shardRetriever, metadata) + default: + // Not caching the series or metadata in memory so finalize the block, + // better to do this as we loop through to make blocks return to the + // pool earlier than at the end of this flush cycle + s.Blocks.RemoveBlockAt(start) + bl.Close() } - bl.ResetRetrievable(start, blockSize, shardRetriever, metadata) + case persist.FileSetSnapshotType: default: - // Not caching the series or metadata in memory so finalize the block, - // better to do this as we loop through to make blocks return to the - // pool earlier than at the end of this flush cycle - s.Blocks.RemoveBlockAt(start) - bl.Close() + panic(fmt.Sprintf("unknown FileSetFileType: %v", incrementalConfig.FileSetType)) } } diff --git a/src/dbnode/storage/bootstrap/process.go b/src/dbnode/storage/bootstrap/process.go index 9f184a11f6..0d10e029a9 100644 --- a/src/dbnode/storage/bootstrap/process.go +++ b/src/dbnode/storage/bootstrap/process.go @@ -26,6 +26,7 @@ import ( "github.com/m3db/m3/src/dbnode/client" "github.com/m3db/m3/src/dbnode/clock" + "github.com/m3db/m3/src/dbnode/persist" "github.com/m3db/m3/src/dbnode/retention" "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" "github.com/m3db/m3/src/dbnode/storage/namespace" @@ -342,12 +343,18 @@ func (b bootstrapProcess) targetRanges( // cause the process to OOM. return []TargetRange{ { - Range: xtime.Range{Start: start, End: midPoint}, - RunOptions: b.newRunOptions().SetIncremental(true), + Range: xtime.Range{Start: start, End: midPoint}, + RunOptions: b.newRunOptions().SetIncrementalConfig(IncrementalConfig{ + Enabled: true, + FileSetType: persist.FileSetFlushType, + }), }, { - Range: xtime.Range{Start: midPoint, End: cutover}, - RunOptions: b.newRunOptions().SetIncremental(false), + Range: xtime.Range{Start: midPoint, End: cutover}, + RunOptions: b.newRunOptions().SetIncrementalConfig(IncrementalConfig{ + Enabled: true, + FileSetType: persist.FileSetSnapshotType, + }), }, } } diff --git a/src/dbnode/storage/bootstrap/run_options.go b/src/dbnode/storage/bootstrap/run_options.go index aed9864f69..7d278660e3 100644 --- a/src/dbnode/storage/bootstrap/run_options.go +++ b/src/dbnode/storage/bootstrap/run_options.go @@ -22,14 +22,16 @@ package bootstrap import "github.com/m3db/m3/src/dbnode/topology" -const ( +var ( // defaultIncremental declares the intent to by default not perform an // incremental bootstrap. 
- defaultIncremental = false + defaultIncremental = IncrementalConfig{ + Enabled: false, + } ) type runOptions struct { - incremental bool + incrementalConfig IncrementalConfig cacheSeriesMetadata bool initialTopologyState *topology.StateSnapshot } @@ -37,20 +39,20 @@ type runOptions struct { // NewRunOptions creates new bootstrap run options func NewRunOptions() RunOptions { return &runOptions{ - incremental: defaultIncremental, + incrementalConfig: defaultIncremental, cacheSeriesMetadata: defaultCacheSeriesMetadata, initialTopologyState: nil, } } -func (o *runOptions) SetIncremental(value bool) RunOptions { +func (o *runOptions) SetIncrementalConfig(value IncrementalConfig) RunOptions { opts := *o - opts.incremental = value + opts.incrementalConfig = value return &opts } -func (o *runOptions) Incremental() bool { - return o.incremental +func (o *runOptions) IncrementalConfig() IncrementalConfig { + return o.incrementalConfig } func (o *runOptions) SetCacheSeriesMetadata(value bool) RunOptions { diff --git a/src/dbnode/storage/bootstrap/types.go b/src/dbnode/storage/bootstrap/types.go index 56dfc15468..99c6045968 100644 --- a/src/dbnode/storage/bootstrap/types.go +++ b/src/dbnode/storage/bootstrap/types.go @@ -24,6 +24,7 @@ import ( "time" "github.com/m3db/m3/src/dbnode/client" + "github.com/m3db/m3/src/dbnode/persist" "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" "github.com/m3db/m3/src/dbnode/storage/namespace" "github.com/m3db/m3/src/dbnode/topology" @@ -68,6 +69,15 @@ type TargetRange struct { RunOptions RunOptions } +// IncrementalConfig is the configuration for an incremental bootstrap. +type IncrementalConfig struct { + // Whether this bootstrap should incrementally persist data to disk. + Enabled bool + // If enabled, what type of files should be generated during the incremental + // process. + FileSetType persist.FileSetType +} + // ProcessOptions is a set of options for a bootstrap provider. type ProcessOptions interface { // SetCacheSeriesMetadata sets whether bootstrappers created by this @@ -90,13 +100,11 @@ type ProcessOptions interface { // RunOptions is a set of options for a bootstrap run. type RunOptions interface { - // SetIncremental sets whether this bootstrap should be an incremental - // that saves intermediate results to durable storage or not. - SetIncremental(value bool) RunOptions + // SetIncrementalConfig sets incremental configuration for this bootstrap. + SetIncrementalConfig(value IncrementalConfig) RunOptions - // Incremental returns whether this bootstrap should be an incremental - // that saves intermediate results to durable storage or not. - Incremental() bool + // IncrementalConfig returns the incremental configuration for this bootstrap. + IncrementalConfig() IncrementalConfig // SetCacheSeriesMetadata sets whether bootstrappers created by this // provider should cache series metadata between runs. 
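
Patch 03 above is the heart of the series: the bootstrap process now performs two incremental runs, flushing blocks that have fallen out of the active window to regular filesets while writing the still-active window out as snapshot filesets, so that a restarted node can recover everything from the commit log bootstrapper. The following standalone sketch is not part of the patch series (the main function is purely illustrative); it shows how the IncrementalConfig API introduced above is driven, using only types these patches define:

package main

import (
	"fmt"

	"github.com/m3db/m3/src/dbnode/persist"
	"github.com/m3db/m3/src/dbnode/storage/bootstrap"
)

func main() {
	// Sealed blocks: persist them as flush filesets while bootstrapping so
	// they never need to be held in memory all at once.
	flushOpts := bootstrap.NewRunOptions().SetIncrementalConfig(bootstrap.IncrementalConfig{
		Enabled:     true,
		FileSetType: persist.FileSetFlushType,
	})

	// Active blocks: keep them in memory for reads and writes, but also write
	// snapshot filesets so the commit log bootstrapper can replay them after
	// a restart.
	snapshotOpts := bootstrap.NewRunOptions().SetIncrementalConfig(bootstrap.IncrementalConfig{
		Enabled:     true,
		FileSetType: persist.FileSetSnapshotType,
	})

	fmt.Println(flushOpts.IncrementalConfig(), snapshotOpts.IncrementalConfig())
}

This mirrors the two TargetRange entries that patch 03 builds in targetRanges in process.go.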
From 1220e9e641ee1e2d4cb4abb8c94a46b3a6aae49e Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Thu, 13 Sep 2018 13:51:20 -0400 Subject: [PATCH 04/19] fix lint issues --- src/cmd/tools/verify_commitlogs/main/main.go | 2 +- .../storage/bootstrap/bootstrap_mock.go | 26 +++++++++---------- .../bootstrap/bootstrapper/base_test.go | 3 ++- .../commitlog/source_data_test.go | 2 +- .../bootstrapper/fs/source_data_test.go | 19 +++++++------- .../bootstrapper/fs/source_index_test.go | 7 +++-- .../bootstrapper/peers/source_data_test.go | 12 +++++---- 7 files changed, 39 insertions(+), 32 deletions(-) diff --git a/src/cmd/tools/verify_commitlogs/main/main.go b/src/cmd/tools/verify_commitlogs/main/main.go index 5cc3f7155e..0d2fc42212 100644 --- a/src/cmd/tools/verify_commitlogs/main/main.go +++ b/src/cmd/tools/verify_commitlogs/main/main.go @@ -255,7 +255,7 @@ func main() { nsID := ident.StringID(namespaceStr) runOpts := bootstrap.NewRunOptions(). // Dont save intermediate results - SetIncremental(false) + SetIncrementalConfig(bootstrap.IncrementalConfig{Enabled: false}) nsMetadata, err := namespace.NewMetadata(nsID, namespace.NewOptions().SetRetentionOptions(retentionOpts)) if err != nil { log.Fatal(err.Error()) diff --git a/src/dbnode/storage/bootstrap/bootstrap_mock.go b/src/dbnode/storage/bootstrap/bootstrap_mock.go index 153b234c6b..caffb700d6 100644 --- a/src/dbnode/storage/bootstrap/bootstrap_mock.go +++ b/src/dbnode/storage/bootstrap/bootstrap_mock.go @@ -236,28 +236,28 @@ func (m *MockRunOptions) EXPECT() *MockRunOptionsMockRecorder { return m.recorder } -// SetIncremental mocks base method -func (m *MockRunOptions) SetIncremental(value bool) RunOptions { - ret := m.ctrl.Call(m, "SetIncremental", value) +// SetIncrementalConfig mocks base method +func (m *MockRunOptions) SetIncrementalConfig(value IncrementalConfig) RunOptions { + ret := m.ctrl.Call(m, "SetIncrementalConfig", value) ret0, _ := ret[0].(RunOptions) return ret0 } -// SetIncremental indicates an expected call of SetIncremental -func (mr *MockRunOptionsMockRecorder) SetIncremental(value interface{}) *gomock.Call { - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetIncremental", reflect.TypeOf((*MockRunOptions)(nil).SetIncremental), value) +// SetIncrementalConfig indicates an expected call of SetIncrementalConfig +func (mr *MockRunOptionsMockRecorder) SetIncrementalConfig(value interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetIncrementalConfig", reflect.TypeOf((*MockRunOptions)(nil).SetIncrementalConfig), value) } -// Incremental mocks base method -func (m *MockRunOptions) Incremental() bool { - ret := m.ctrl.Call(m, "Incremental") - ret0, _ := ret[0].(bool) +// IncrementalConfig mocks base method +func (m *MockRunOptions) IncrementalConfig() IncrementalConfig { + ret := m.ctrl.Call(m, "IncrementalConfig") + ret0, _ := ret[0].(IncrementalConfig) return ret0 } -// Incremental indicates an expected call of Incremental -func (mr *MockRunOptionsMockRecorder) Incremental() *gomock.Call { - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Incremental", reflect.TypeOf((*MockRunOptions)(nil).Incremental)) +// IncrementalConfig indicates an expected call of IncrementalConfig +func (mr *MockRunOptionsMockRecorder) IncrementalConfig() *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IncrementalConfig", reflect.TypeOf((*MockRunOptions)(nil).IncrementalConfig)) } // SetCacheSeriesMetadata mocks base method diff --git 
a/src/dbnode/storage/bootstrap/bootstrapper/base_test.go b/src/dbnode/storage/bootstrap/bootstrapper/base_test.go index 6a50f694cd..6722ba9098 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/base_test.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/base_test.go @@ -44,7 +44,8 @@ var ( testNamespaceID = ident.StringID("testNamespace") testTargetStart = time.Now() testShard = uint32(0) - testDefaultRunOpts = bootstrap.NewRunOptions().SetIncremental(false) + testDefaultRunOpts = bootstrap.NewRunOptions(). + SetIncrementalConfig(bootstrap.IncrementalConfig{Enabled: false}) ) func testNsMetadata(t *testing.T) namespace.Metadata { diff --git a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_data_test.go b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_data_test.go index f47cb335f4..9a68b8a2ee 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_data_test.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_data_test.go @@ -50,7 +50,7 @@ import ( var ( testNamespaceID = ident.StringID("testnamespace") - testDefaultRunOpts = bootstrap.NewRunOptions().SetIncremental(true) + testDefaultRunOpts = bootstrap.NewRunOptions() minCommitLogRetention = 10 * time.Minute ) diff --git a/src/dbnode/storage/bootstrap/bootstrapper/fs/source_data_test.go b/src/dbnode/storage/bootstrap/bootstrapper/fs/source_data_test.go index 6a760c1484..711abbf46d 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/fs/source_data_test.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/fs/source_data_test.go @@ -48,15 +48,16 @@ import ( ) var ( - testShard = uint32(0) - testNs1ID = ident.StringID("testNs") - testBlockSize = 2 * time.Hour - testIndexBlockSize = 4 * time.Hour - testStart = time.Now().Truncate(testBlockSize) - testFileMode = os.FileMode(0666) - testDirMode = os.ModeDir | os.FileMode(0755) - testWriterBufferSize = 10 - testDefaultRunOpts = bootstrap.NewRunOptions().SetIncremental(false) + testShard = uint32(0) + testNs1ID = ident.StringID("testNs") + testBlockSize = 2 * time.Hour + testIndexBlockSize = 4 * time.Hour + testStart = time.Now().Truncate(testBlockSize) + testFileMode = os.FileMode(0666) + testDirMode = os.ModeDir | os.FileMode(0755) + testWriterBufferSize = 10 + testDefaultRunOpts = bootstrap.NewRunOptions(). + SetIncrementalConfig(bootstrap.IncrementalConfig{Enabled: false}) testDefaultResultOpts = result.NewOptions().SetSeriesCachePolicy(series.CacheAll) testDefaultOpts = NewOptions().SetResultOptions(testDefaultResultOpts) ) diff --git a/src/dbnode/storage/bootstrap/bootstrapper/fs/source_index_test.go b/src/dbnode/storage/bootstrap/bootstrapper/fs/source_index_test.go index deee9d4ef3..8dd0f55d5c 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/fs/source_index_test.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/fs/source_index_test.go @@ -27,6 +27,7 @@ import ( "github.com/m3db/m3/src/dbnode/persist" "github.com/m3db/m3/src/dbnode/persist/fs" + "github.com/m3db/m3/src/dbnode/storage/bootstrap" "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" "github.com/m3db/m3/src/dbnode/storage/index/convert" "github.com/m3db/m3/src/dbnode/storage/namespace" @@ -284,7 +285,8 @@ func TestBootstrapIndexIncremental(t *testing.T) { scope := tally.NewTestScope("", nil) opts = opts.SetInstrumentOptions(opts.InstrumentOptions().SetMetricsScope(scope)) - runOpts := testDefaultRunOpts.SetIncremental(true) + runOpts := testDefaultRunOpts. 
+ SetIncrementalConfig(bootstrap.IncrementalConfig{Enabled: true}) src := newFileSystemSource(opts).(*fileSystemSource) res, err := src.ReadIndex(testNsMetadata(t), times.shardTimeRanges, @@ -353,7 +355,8 @@ func TestBootstrapIndexIncrementalPrefersPersistedIndexBlocks(t *testing.T) { scope := tally.NewTestScope("", nil) opts = opts.SetInstrumentOptions(opts.InstrumentOptions().SetMetricsScope(scope)) - runOpts := testDefaultRunOpts.SetIncremental(true) + runOpts := testDefaultRunOpts. + SetIncrementalConfig(bootstrap.IncrementalConfig{Enabled: true}) src := newFileSystemSource(opts).(*fileSystemSource) res, err := src.ReadIndex(testNsMetadata(t), times.shardTimeRanges, diff --git a/src/dbnode/storage/bootstrap/bootstrapper/peers/source_data_test.go b/src/dbnode/storage/bootstrap/bootstrapper/peers/source_data_test.go index 5a2d136d50..4c1134307e 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/peers/source_data_test.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/peers/source_data_test.go @@ -58,11 +58,13 @@ var ( return ns } - testDefaultRunOpts = bootstrap.NewRunOptions().SetIncremental(false) - testIncrementalRunOpts = bootstrap.NewRunOptions().SetIncremental(true) - testBlockOpts = block.NewOptions() - testDefaultResultOpts = result.NewOptions().SetSeriesCachePolicy(series.CacheAll) - testDefaultOpts = NewOptions(). + testDefaultRunOpts = bootstrap.NewRunOptions(). + SetIncrementalConfig(bootstrap.IncrementalConfig{Enabled: false}) + testIncrementalRunOpts = bootstrap.NewRunOptions(). + SetIncrementalConfig(bootstrap.IncrementalConfig{Enabled: true}) + testBlockOpts = block.NewOptions() + testDefaultResultOpts = result.NewOptions().SetSeriesCachePolicy(series.CacheAll) + testDefaultOpts = NewOptions(). SetResultOptions(testDefaultResultOpts) ) From 554bdfc5ca2ab3788056cd5106ac0fca6581e1c4 Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Thu, 13 Sep 2018 13:56:02 -0400 Subject: [PATCH 05/19] Fix copyright year --- src/dbnode/integration/cluster_add_one_node_commitlog_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dbnode/integration/cluster_add_one_node_commitlog_test.go b/src/dbnode/integration/cluster_add_one_node_commitlog_test.go index 519b1c53da..c8d62aad58 100644 --- a/src/dbnode/integration/cluster_add_one_node_commitlog_test.go +++ b/src/dbnode/integration/cluster_add_one_node_commitlog_test.go @@ -1,6 +1,6 @@ // +build integration -// Copyright (c) 2016 Uber Technologies, Inc. +// Copyright (c) 2018 Uber Technologies, Inc. 
// // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal From e6ef6fedee9c2c19be98730a98342212f41b6e08 Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Thu, 13 Sep 2018 14:10:33 -0400 Subject: [PATCH 06/19] Simplify integration test --- .../cluster_add_one_node_commitlog_test.go | 227 +----------------- .../integration/cluster_add_one_node_test.go | 13 + 2 files changed, 14 insertions(+), 226 deletions(-) diff --git a/src/dbnode/integration/cluster_add_one_node_commitlog_test.go b/src/dbnode/integration/cluster_add_one_node_commitlog_test.go index c8d62aad58..90b12bd808 100644 --- a/src/dbnode/integration/cluster_add_one_node_commitlog_test.go +++ b/src/dbnode/integration/cluster_add_one_node_commitlog_test.go @@ -23,23 +23,7 @@ package integration import ( - "strconv" "testing" - "time" - - "github.com/m3db/m3/src/dbnode/integration/fake" - "github.com/m3db/m3/src/dbnode/integration/generate" - "github.com/m3db/m3/src/dbnode/retention" - "github.com/m3db/m3/src/dbnode/storage/namespace" - "github.com/m3db/m3/src/dbnode/topology" - "github.com/m3db/m3/src/dbnode/topology/testutil" - "github.com/m3db/m3cluster/services" - "github.com/m3db/m3cluster/shard" - "github.com/m3db/m3x/ident" - xlog "github.com/m3db/m3x/log" - xtime "github.com/m3db/m3x/time" - - "github.com/stretchr/testify/require" ) func TestClusterAddOneNodeCommitlog(t *testing.T) { @@ -47,214 +31,5 @@ func TestClusterAddOneNodeCommitlog(t *testing.T) { t.SkipNow() } - // Test setups - log := xlog.SimpleLogger - - namesp, err := namespace.NewMetadata(testNamespaces[0], - namespace.NewOptions().SetRetentionOptions( - retention.NewOptions(). - SetRetentionPeriod(6*time.Hour). - SetBlockSize(2*time.Hour). - SetBufferPast(10*time.Minute). - SetBufferFuture(2*time.Minute))) - require.NoError(t, err) - opts := newTestOptions(t). - SetNamespaces([]namespace.Metadata{namesp}) - - minShard := uint32(0) - maxShard := uint32(opts.NumShards()) - uint32(1) - midShard := uint32((maxShard - minShard) / 2) - - instances := struct { - start []services.ServiceInstance - add []services.ServiceInstance - added []services.ServiceInstance - }{ - start: []services.ServiceInstance{ - node(t, 0, newClusterShardsRange(minShard, maxShard, shard.Available)), - node(t, 1, newClusterEmptyShardsRange()), - }, - - add: []services.ServiceInstance{ - node(t, 0, concatShards( - newClusterShardsRange(minShard, midShard, shard.Available), - newClusterShardsRange(midShard+1, maxShard, shard.Leaving))), - node(t, 1, newClusterShardsRange(midShard+1, maxShard, shard.Initializing)), - }, - added: []services.ServiceInstance{ - node(t, 0, newClusterShardsRange(minShard, midShard, shard.Available)), - node(t, 1, newClusterShardsRange(midShard+1, maxShard, shard.Available)), - }, - } - - svc := fake.NewM3ClusterService(). - SetInstances(instances.start). - SetReplication(services.NewServiceReplication().SetReplicas(1)). - SetSharding(services.NewServiceSharding().SetNumShards(opts.NumShards())) - - svcs := fake.NewM3ClusterServices() - svcs.RegisterService("m3db", svc) - - topoOpts := topology.NewDynamicOptions(). 
- SetConfigServiceClient(fake.NewM3ClusterClient(svcs, nil)) - topoInit := topology.NewDynamicInitializer(topoOpts) - setupOpts := []bootstrappableTestSetupOptions{ - { - disablePeersBootstrapper: true, - topologyInitializer: topoInit, - }, - { - disablePeersBootstrapper: false, - topologyInitializer: topoInit, - }, - } - setups, closeFn := newDefaultBootstrappableTestSetups(t, opts, setupOpts) - defer closeFn() - - // Write test data for first node - topo, err := topoInit.Init() - require.NoError(t, err) - ids := []idShard{} - - // Boilerplate code to find two ID's that hash to the first half of the - // shards, and one ID that hashes to the second half of the shards. - shardSet := topo.Get().ShardSet() - i := 0 - numFirstHalf := 0 - numSecondHalf := 0 - for { - if numFirstHalf == 2 && numSecondHalf == 1 { - break - } - idStr := strconv.Itoa(i) - shard := shardSet.Lookup(ident.StringID(idStr)) - if shard < midShard && numFirstHalf < 2 { - ids = append(ids, idShard{str: idStr, shard: shard}) - numFirstHalf++ - } - if shard > midShard && numSecondHalf < 1 { - ids = append(ids, idShard{idStr, shard}) - numSecondHalf++ - } - i++ - } - - for _, id := range ids { - // Verify IDs will map to halves of the shard space - require.Equal(t, id.shard, shardSet.Lookup(ident.StringID(id.str))) - } - - now := setups[0].getNowFn() - blockSize := namesp.Options().RetentionOptions().BlockSize() - seriesMaps := generate.BlocksByStart([]generate.BlockConfig{ - {IDs: []string{ids[0].str, ids[1].str}, NumPoints: 180, Start: now.Add(-blockSize)}, - {IDs: []string{ids[0].str, ids[2].str}, NumPoints: 90, Start: now}, - }) - err = writeTestDataToDisk(namesp, setups[0], seriesMaps) - require.NoError(t, err) - - // Prepare verification of data on nodes - expectedSeriesMaps := make([]map[xtime.UnixNano]generate.SeriesBlock, 2) - expectedSeriesIDs := make([]map[string]struct{}, 2) - for i := range expectedSeriesMaps { - expectedSeriesMaps[i] = make(map[xtime.UnixNano]generate.SeriesBlock) - expectedSeriesIDs[i] = make(map[string]struct{}) - } - for start, series := range seriesMaps { - list := make([]generate.SeriesBlock, 2) - for j := range series { - if shardSet.Lookup(series[j].ID) < midShard+1 { - list[0] = append(list[0], series[j]) - } else { - list[1] = append(list[1], series[j]) - } - } - for i := range expectedSeriesMaps { - if len(list[i]) > 0 { - expectedSeriesMaps[i][start] = list[i] - } - } - } - for i := range expectedSeriesMaps { - for _, series := range expectedSeriesMaps[i] { - for _, elem := range series { - expectedSeriesIDs[i][elem.ID.String()] = struct{}{} - } - } - } - require.Equal(t, 2, len(expectedSeriesIDs[0])) - require.Equal(t, 1, len(expectedSeriesIDs[1])) - - // Start the first server with filesystem bootstrapper - require.NoError(t, setups[0].startServer()) - - // Start the last server with peers and filesystem bootstrappers, no shards - // are assigned at first - require.NoError(t, setups[1].startServer()) - log.Debug("servers are now up") - - // Stop the servers at test completion - defer func() { - log.Debug("servers closing") - setups.parallel(func(s *testSetup) { - require.NoError(t, s.stopServer()) - }) - log.Debug("servers are now down") - }() - - // Bootstrap the new shards - log.Debug("resharding to initialize shards on second node") - svc.SetInstances(instances.add) - svcs.NotifyServiceUpdate("m3db") - waitUntilHasBootstrappedShardsExactly(setups[1].db, testutil.Uint32Range(midShard+1, maxShard)) - - log.Debug("waiting for shards to be marked initialized") - allMarkedAvailable 
:= func( - fakePlacementService fake.M3ClusterPlacementService, - instanceID string, - shards []shard.Shard, - ) bool { - markedAvailable := fakePlacementService.InstanceShardsMarkedAvailable() - if len(markedAvailable) != 1 { - return false - } - if len(markedAvailable[instanceID]) != len(shards) { - return false - } - marked := shard.NewShards(nil) - for _, id := range markedAvailable[instanceID] { - marked.Add(shard.NewShard(id).SetState(shard.Available)) - } - for _, shard := range shards { - if !marked.Contains(shard.ID()) { - return false - } - } - return true - } - - fps := svcs.FakePlacementService() - shouldMark := instances.add[1].Shards().All() - for !allMarkedAvailable(fps, "testhost1", shouldMark) { - time.Sleep(100 * time.Millisecond) - } - log.Debug("all shards marked as initialized") - - // Shed the old shards from the first node - log.Debug("resharding to shed shards from first node") - svc.SetInstances(instances.added) - svcs.NotifyServiceUpdate("m3db") - waitUntilHasBootstrappedShardsExactly(setups[0].db, testutil.Uint32Range(minShard, midShard)) - waitUntilHasBootstrappedShardsExactly(setups[1].db, testutil.Uint32Range(midShard+1, maxShard)) - - log.Debug("verifying data in servers matches expected data set") - - // Verify in-memory data match what we expect - for i := range setups { - verifySeriesMaps(t, setups[i], namesp.ID(), expectedSeriesMaps[i]) - } - - require.NoError(t, setups[1].stopServer()) - startServerWithNewInspection(t, opts, setups[1]) - verifySeriesMaps(t, setups[1], namesp.ID(), expectedSeriesMaps[1]) + testClusterAddOneNode(t, true) } diff --git a/src/dbnode/integration/cluster_add_one_node_test.go b/src/dbnode/integration/cluster_add_one_node_test.go index 65ea749cfe..9c8b6cda40 100644 --- a/src/dbnode/integration/cluster_add_one_node_test.go +++ b/src/dbnode/integration/cluster_add_one_node_test.go @@ -48,6 +48,10 @@ type idShard struct { } func TestClusterAddOneNode(t *testing.T) { + testClusterAddOneNode(t, false) +} + +func testClusterAddOneNode(t *testing.T, verifyCommitlogCanBootstrapAfterNodeJoin bool) { if testing.Short() { t.SkipNow() } @@ -258,4 +262,13 @@ func TestClusterAddOneNode(t *testing.T) { for i := range setups { verifySeriesMaps(t, setups[i], namesp.ID(), expectedSeriesMaps[i]) } + + if verifyCommitlogCanBootstrapAfterNodeJoin { + // Verify that the node that joined the cluster can immediately bootstrap + // the data it streamed from its peers from the commit log as soon as + // the bootstrapping process completes. + require.NoError(t, setups[1].stopServer()) + startServerWithNewInspection(t, opts, setups[1]) + verifySeriesMaps(t, setups[1], namesp.ID(), expectedSeriesMaps[1]) + } } From f1370e7668c80d0c14aa8322418d735ac969178e Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Thu, 13 Sep 2018 14:12:00 -0400 Subject: [PATCH 07/19] Change name of var --- src/dbnode/storage/bootstrap/run_options.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/dbnode/storage/bootstrap/run_options.go b/src/dbnode/storage/bootstrap/run_options.go index 7d278660e3..92b53bf9af 100644 --- a/src/dbnode/storage/bootstrap/run_options.go +++ b/src/dbnode/storage/bootstrap/run_options.go @@ -23,9 +23,9 @@ package bootstrap import "github.com/m3db/m3/src/dbnode/topology" var ( - // defaultIncremental declares the intent to by default not perform an + // defaultIncrementalConfig declares the intent to by default not perform an // incremental bootstrap. 
- defaultIncremental = IncrementalConfig{ + defaultIncrementalConfig = IncrementalConfig{ Enabled: false, } ) @@ -39,7 +39,7 @@ type runOptions struct { // NewRunOptions creates new bootstrap run options func NewRunOptions() RunOptions { return &runOptions{ - incrementalConfig: defaultIncremental, + incrementalConfig: defaultIncrementalConfig, cacheSeriesMetadata: defaultCacheSeriesMetadata, initialTopologyState: nil, } From b0ef427c1223f42bcb72df54b40a0fb2553f5772 Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Mon, 17 Sep 2018 16:42:00 -0400 Subject: [PATCH 08/19] Remove panic and rename incrementalconfig to persistconfig --- src/cmd/tools/verify_commitlogs/main/main.go | 2 +- .../storage/bootstrap/bootstrapper/peers/source.go | 12 ++++++------ src/dbnode/storage/bootstrap/types.go | 12 ++++++------ 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/src/cmd/tools/verify_commitlogs/main/main.go b/src/cmd/tools/verify_commitlogs/main/main.go index 0d2fc42212..925edc7370 100644 --- a/src/cmd/tools/verify_commitlogs/main/main.go +++ b/src/cmd/tools/verify_commitlogs/main/main.go @@ -255,7 +255,7 @@ func main() { nsID := ident.StringID(namespaceStr) runOpts := bootstrap.NewRunOptions(). // Dont save intermediate results - SetIncrementalConfig(bootstrap.IncrementalConfig{Enabled: false}) + SetPersistConfig(bootstrap.PersistConfig{Enabled: false}) nsMetadata, err := namespace.NewMetadata(nsID, namespace.NewOptions().SetRetentionOptions(retentionOpts)) if err != nil { log.Fatal(err.Error()) diff --git a/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go b/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go index 1641d3d2b9..98f11d2c2b 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go @@ -101,9 +101,9 @@ func (s *peersSource) ReadData( persistFlush persist.DataFlush incremental = false seriesCachePolicy = s.opts.ResultOptions().SeriesCachePolicy() - incrementalConfig = opts.IncrementalConfig() + persistConfig = opts.PersistConfig() ) - if incrementalConfig.Enabled && seriesCachePolicy != series.CacheAll { + if persistConfig.Enabled && seriesCachePolicy != series.CacheAll { retrieverMgr := s.opts.DatabaseBlockRetrieverManager() persistManager := s.opts.PersistManager() @@ -360,7 +360,7 @@ func (s *peersSource) incrementalFlush( shardRetriever = shardRetrieverMgr.ShardRetriever(shard) tmpCtx = context.NewContext() seriesCachePolicy = s.opts.ResultOptions().SeriesCachePolicy() - incrementalConfig = opts.IncrementalConfig() + persistConfig = opts.PersistConfig() ) if seriesCachePolicy == series.CacheAllMetadata && shardRetriever == nil { return fmt.Errorf("shard retriever missing for shard: %d", shard) @@ -369,7 +369,7 @@ func (s *peersSource) incrementalFlush( for start := tr.Start; start.Before(tr.End); start = start.Add(blockSize) { prepareOpts := persist.DataPrepareOptions{ NamespaceMetadata: nsMetadata, - FileSetType: incrementalConfig.FileSetType, + FileSetType: persistConfig.FileSetType, Shard: shard, BlockStart: start, // If we've peer bootstrapped this shard/block combination AND the fileset @@ -427,7 +427,7 @@ func (s *peersSource) incrementalFlush( break } - switch incrementalConfig.FileSetType { + switch persistConfig.FileSetType { case persist.FileSetFlushType: switch seriesCachePolicy { case series.CacheAll: @@ -454,7 +454,7 @@ func (s *peersSource) incrementalFlush( } case persist.FileSetSnapshotType: default: - panic(fmt.Sprintf("unknown FileSetFileType: %v", 
incrementalConfig.FileSetType)) + return fmt.Errorf("unknown FileSetFileType: %v", persistConfig.FileSetType) } } diff --git a/src/dbnode/storage/bootstrap/types.go b/src/dbnode/storage/bootstrap/types.go index 99c6045968..eb800bdf01 100644 --- a/src/dbnode/storage/bootstrap/types.go +++ b/src/dbnode/storage/bootstrap/types.go @@ -69,8 +69,8 @@ type TargetRange struct { RunOptions RunOptions } -// IncrementalConfig is the configuration for an incremental bootstrap. -type IncrementalConfig struct { +// PersistConfig is the configuration for an incremental bootstrap. +type PersistConfig struct { // Whether this bootstrap should incrementally persist data to disk. Enabled bool // If enabled, what type of files should be generated during the incremental @@ -100,11 +100,11 @@ type ProcessOptions interface { // RunOptions is a set of options for a bootstrap run. type RunOptions interface { - // SetIncrementalConfig sets incremental configuration for this bootstrap. - SetIncrementalConfig(value IncrementalConfig) RunOptions + // SetPersistConfig sets incremental configuration for this bootstrap. + SetPersistConfig(value PersistConfig) RunOptions - // IncrementalConfig returns the incremental configuration for this bootstrap. - IncrementalConfig() IncrementalConfig + // PersistConfig returns the incremental configuration for this bootstrap. + PersistConfig() PersistConfig // SetCacheSeriesMetadata sets whether bootstrappers created by this // provider should cache series metadata between runs. From 3636222bfa3499c77118b84692ccc643bcdff75c Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Mon, 17 Sep 2018 17:14:13 -0400 Subject: [PATCH 09/19] Regen mocks and fix naming --- .../storage/bootstrap/bootstrap_mock.go | 26 +++++++++---------- src/dbnode/storage/bootstrap/process.go | 4 +-- src/dbnode/storage/bootstrap/run_options.go | 16 ++++++------ 3 files changed, 23 insertions(+), 23 deletions(-) diff --git a/src/dbnode/storage/bootstrap/bootstrap_mock.go b/src/dbnode/storage/bootstrap/bootstrap_mock.go index caffb700d6..20e63574c1 100644 --- a/src/dbnode/storage/bootstrap/bootstrap_mock.go +++ b/src/dbnode/storage/bootstrap/bootstrap_mock.go @@ -236,28 +236,28 @@ func (m *MockRunOptions) EXPECT() *MockRunOptionsMockRecorder { return m.recorder } -// SetIncrementalConfig mocks base method -func (m *MockRunOptions) SetIncrementalConfig(value IncrementalConfig) RunOptions { - ret := m.ctrl.Call(m, "SetIncrementalConfig", value) +// SetPersistConfig mocks base method +func (m *MockRunOptions) SetPersistConfig(value PersistConfig) RunOptions { + ret := m.ctrl.Call(m, "SetPersistConfig", value) ret0, _ := ret[0].(RunOptions) return ret0 } -// SetIncrementalConfig indicates an expected call of SetIncrementalConfig -func (mr *MockRunOptionsMockRecorder) SetIncrementalConfig(value interface{}) *gomock.Call { - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetIncrementalConfig", reflect.TypeOf((*MockRunOptions)(nil).SetIncrementalConfig), value) +// SetPersistConfig indicates an expected call of SetPersistConfig +func (mr *MockRunOptionsMockRecorder) SetPersistConfig(value interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetPersistConfig", reflect.TypeOf((*MockRunOptions)(nil).SetPersistConfig), value) } -// IncrementalConfig mocks base method -func (m *MockRunOptions) IncrementalConfig() IncrementalConfig { - ret := m.ctrl.Call(m, "IncrementalConfig") - ret0, _ := ret[0].(IncrementalConfig) +// PersistConfig mocks base method +func (m *MockRunOptions) 
PersistConfig() PersistConfig { + ret := m.ctrl.Call(m, "PersistConfig") + ret0, _ := ret[0].(PersistConfig) return ret0 } -// IncrementalConfig indicates an expected call of IncrementalConfig -func (mr *MockRunOptionsMockRecorder) IncrementalConfig() *gomock.Call { - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IncrementalConfig", reflect.TypeOf((*MockRunOptions)(nil).IncrementalConfig)) +// PersistConfig indicates an expected call of PersistConfig +func (mr *MockRunOptionsMockRecorder) PersistConfig() *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PersistConfig", reflect.TypeOf((*MockRunOptions)(nil).PersistConfig)) } // SetCacheSeriesMetadata mocks base method diff --git a/src/dbnode/storage/bootstrap/process.go b/src/dbnode/storage/bootstrap/process.go index 0d10e029a9..e2dab74a19 100644 --- a/src/dbnode/storage/bootstrap/process.go +++ b/src/dbnode/storage/bootstrap/process.go @@ -344,14 +344,14 @@ func (b bootstrapProcess) targetRanges( return []TargetRange{ { Range: xtime.Range{Start: start, End: midPoint}, - RunOptions: b.newRunOptions().SetIncrementalConfig(IncrementalConfig{ + RunOptions: b.newRunOptions().SetPersistConfig(PersistConfig{ Enabled: true, FileSetType: persist.FileSetFlushType, }), }, { Range: xtime.Range{Start: midPoint, End: cutover}, - RunOptions: b.newRunOptions().SetIncrementalConfig(IncrementalConfig{ + RunOptions: b.newRunOptions().SetPersistConfig(PersistConfig{ Enabled: true, FileSetType: persist.FileSetSnapshotType, }), diff --git a/src/dbnode/storage/bootstrap/run_options.go b/src/dbnode/storage/bootstrap/run_options.go index 92b53bf9af..d44fc29ff5 100644 --- a/src/dbnode/storage/bootstrap/run_options.go +++ b/src/dbnode/storage/bootstrap/run_options.go @@ -23,15 +23,15 @@ package bootstrap import "github.com/m3db/m3/src/dbnode/topology" var ( - // defaultIncrementalConfig declares the intent to by default not perform an + // defaultPersistConfig declares the intent to by default not perform an // incremental bootstrap. 
- defaultIncrementalConfig = IncrementalConfig{ + defaultPersistConfig = PersistConfig{ Enabled: false, } ) type runOptions struct { - incrementalConfig IncrementalConfig + persistConfig PersistConfig cacheSeriesMetadata bool initialTopologyState *topology.StateSnapshot } @@ -39,20 +39,20 @@ type runOptions struct { // NewRunOptions creates new bootstrap run options func NewRunOptions() RunOptions { return &runOptions{ - incrementalConfig: defaultIncrementalConfig, + persistConfig: defaultPersistConfig, cacheSeriesMetadata: defaultCacheSeriesMetadata, initialTopologyState: nil, } } -func (o *runOptions) SetIncrementalConfig(value IncrementalConfig) RunOptions { +func (o *runOptions) SetPersistConfig(value PersistConfig) RunOptions { opts := *o - opts.incrementalConfig = value + opts.persistConfig = value return &opts } -func (o *runOptions) IncrementalConfig() IncrementalConfig { - return o.incrementalConfig +func (o *runOptions) PersistConfig() PersistConfig { + return o.persistConfig } func (o *runOptions) SetCacheSeriesMetadata(value bool) RunOptions { From 45793701add65ae63db7cf9ac8c1b4c6375a8c8d Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Mon, 17 Sep 2018 17:27:57 -0400 Subject: [PATCH 10/19] Rename --- src/dbnode/storage/bootstrap/bootstrapper/fs/source.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go b/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go index 73b697b6c4..1c3e438966 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go @@ -220,7 +220,7 @@ func (s *fileSystemSource) enqueueReaders( // Close the readers ch if and only if all readers are enqueued defer close(readersCh) - indexIncrementalBootstrap := run == bootstrapIndexRunType && runOpts.IncrementalConfig().Enabled + indexIncrementalBootstrap := run == bootstrapIndexRunType && runOpts.PersistConfig().Enabled if !indexIncrementalBootstrap { // Normal run, open readers s.enqueueReadersGroupedByBlockSize(ns, run, runOpts, @@ -594,7 +594,7 @@ func (s *fileSystemSource) loadShardReadersDataIntoShardResult( } } - incremental := runOpts.IncrementalConfig().Enabled + incremental := runOpts.PersistConfig().Enabled noneRemaining := remainingRanges.IsEmpty() if run == bootstrapIndexRunType && incremental && noneRemaining { err := s.incrementalBootstrapIndexSegment(ns, requestedRanges, runResult) From 760640bb58319af2979af3ca9719f0e65a7dbe29 Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Tue, 18 Sep 2018 10:22:05 -0400 Subject: [PATCH 11/19] More renaming --- src/dbnode/storage/bootstrap/bootstrapper/base_test.go | 2 +- .../storage/bootstrap/bootstrapper/fs/source_data_test.go | 2 +- .../storage/bootstrap/bootstrapper/fs/source_index_test.go | 4 ++-- .../storage/bootstrap/bootstrapper/peers/source_data_test.go | 4 ++-- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/dbnode/storage/bootstrap/bootstrapper/base_test.go b/src/dbnode/storage/bootstrap/bootstrapper/base_test.go index 6722ba9098..df5762d9d3 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/base_test.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/base_test.go @@ -45,7 +45,7 @@ var ( testTargetStart = time.Now() testShard = uint32(0) testDefaultRunOpts = bootstrap.NewRunOptions(). 
- SetIncrementalConfig(bootstrap.IncrementalConfig{Enabled: false}) + SetPersistConfig(bootstrap.PersistConfig{Enabled: false}) ) func testNsMetadata(t *testing.T) namespace.Metadata { diff --git a/src/dbnode/storage/bootstrap/bootstrapper/fs/source_data_test.go b/src/dbnode/storage/bootstrap/bootstrapper/fs/source_data_test.go index 711abbf46d..3abdafe67c 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/fs/source_data_test.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/fs/source_data_test.go @@ -57,7 +57,7 @@ var ( testDirMode = os.ModeDir | os.FileMode(0755) testWriterBufferSize = 10 testDefaultRunOpts = bootstrap.NewRunOptions(). - SetIncrementalConfig(bootstrap.IncrementalConfig{Enabled: false}) + SetPersistConfig(bootstrap.PersistConfig{Enabled: false}) testDefaultResultOpts = result.NewOptions().SetSeriesCachePolicy(series.CacheAll) testDefaultOpts = NewOptions().SetResultOptions(testDefaultResultOpts) ) diff --git a/src/dbnode/storage/bootstrap/bootstrapper/fs/source_index_test.go b/src/dbnode/storage/bootstrap/bootstrapper/fs/source_index_test.go index 8dd0f55d5c..500fa3278a 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/fs/source_index_test.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/fs/source_index_test.go @@ -286,7 +286,7 @@ func TestBootstrapIndexIncremental(t *testing.T) { opts = opts.SetInstrumentOptions(opts.InstrumentOptions().SetMetricsScope(scope)) runOpts := testDefaultRunOpts. - SetIncrementalConfig(bootstrap.IncrementalConfig{Enabled: true}) + SetPersistConfig(bootstrap.PersistConfig{Enabled: true}) src := newFileSystemSource(opts).(*fileSystemSource) res, err := src.ReadIndex(testNsMetadata(t), times.shardTimeRanges, @@ -356,7 +356,7 @@ func TestBootstrapIndexIncrementalPrefersPersistedIndexBlocks(t *testing.T) { opts = opts.SetInstrumentOptions(opts.InstrumentOptions().SetMetricsScope(scope)) runOpts := testDefaultRunOpts. - SetIncrementalConfig(bootstrap.IncrementalConfig{Enabled: true}) + SetPersistConfig(bootstrap.PersistConfig{Enabled: true}) src := newFileSystemSource(opts).(*fileSystemSource) res, err := src.ReadIndex(testNsMetadata(t), times.shardTimeRanges, diff --git a/src/dbnode/storage/bootstrap/bootstrapper/peers/source_data_test.go b/src/dbnode/storage/bootstrap/bootstrapper/peers/source_data_test.go index 4c1134307e..50096f06db 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/peers/source_data_test.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/peers/source_data_test.go @@ -59,9 +59,9 @@ var ( } testDefaultRunOpts = bootstrap.NewRunOptions(). - SetIncrementalConfig(bootstrap.IncrementalConfig{Enabled: false}) + SetPersistConfig(bootstrap.PersistConfig{Enabled: false}) testIncrementalRunOpts = bootstrap.NewRunOptions(). - SetIncrementalConfig(bootstrap.IncrementalConfig{Enabled: true}) + SetPersistConfig(bootstrap.PersistConfig{Enabled: true}) testBlockOpts = block.NewOptions() testDefaultResultOpts = result.NewOptions().SetSeriesCachePolicy(series.CacheAll) testDefaultOpts = NewOptions(). 
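
With the rename in patches 08 through 11 complete, every caller expresses persistence intent through PersistConfig rather than a bare incremental flag. Below is a minimal sketch, not part of the patches, of guarding against an unknown fileset type before using the config; validatePersistConfig is an illustrative stand-in for the validateRunOpts helper that a later patch adds to the peers source, and it reuses the error message from that patch.

package main

import (
	"fmt"

	"github.com/m3db/m3/src/dbnode/persist"
	"github.com/m3db/m3/src/dbnode/storage/bootstrap"
)

// validatePersistConfig rejects any fileset type other than the two the
// bootstrappers know how to produce.
func validatePersistConfig(cfg bootstrap.PersistConfig) error {
	switch cfg.FileSetType {
	case persist.FileSetFlushType, persist.FileSetSnapshotType:
		return nil
	default:
		return fmt.Errorf("unknown persist config fileset file type: %v", cfg.FileSetType)
	}
}

func main() {
	opts := bootstrap.NewRunOptions().SetPersistConfig(bootstrap.PersistConfig{
		Enabled:     true,
		FileSetType: persist.FileSetSnapshotType,
	})
	fmt.Println(validatePersistConfig(opts.PersistConfig())) // prints <nil>
}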
From 228a26e8d16cb951d878f6c17475ed4fd20eabec Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Tue, 18 Sep 2018 10:41:25 -0400 Subject: [PATCH 12/19] Add explanatory comment --- src/dbnode/storage/bootstrap/bootstrapper/peers/source.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go b/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go index 98f11d2c2b..ed8555251d 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go @@ -453,6 +453,13 @@ func (s *peersSource) incrementalFlush( bl.Close() } case persist.FileSetSnapshotType: + // Unlike the FileSetFlushType scenario, in this case the caching + // strategy doesn't matter. Even if the LRU/RecentlyRead strategies are + // enabled, we still need to keep all the data in memory long enough for it + // to be flushed because the snapshot that we wrote out earlier will only ever + // be used if the node goes down again before we have a chance to flush the data + // from memory AND the commit log bootstrapper is set before the peers bootstrapper + // in the bootstrappers configuration. default: return fmt.Errorf("unknown FileSetFileType: %v", persistConfig.FileSetType) } From 1281f763d5805a398f0c5d70892b10bc2b262258 Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Tue, 18 Sep 2018 10:43:06 -0400 Subject: [PATCH 13/19] More comments --- src/dbnode/storage/bootstrap/process.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/src/dbnode/storage/bootstrap/process.go b/src/dbnode/storage/bootstrap/process.go index e2dab74a19..cbba8cae16 100644 --- a/src/dbnode/storage/bootstrap/process.go +++ b/src/dbnode/storage/bootstrap/process.go @@ -345,14 +345,21 @@ func (b bootstrapProcess) targetRanges( { Range: xtime.Range{Start: start, End: midPoint}, RunOptions: b.newRunOptions().SetPersistConfig(PersistConfig{ - Enabled: true, + Enabled: true, + // These blocks are no longer active, so we want to flush them + // to disk as we receive them so that we don't hold too much + // data in memory at once. FileSetType: persist.FileSetFlushType, }), }, { Range: xtime.Range{Start: midPoint, End: cutover}, RunOptions: b.newRunOptions().SetPersistConfig(PersistConfig{ - Enabled: true, + Enabled: true, + // These blocks are still active so we'll have to keep them + // in memory, but we want to snapshot them as we receive them + // so that once bootstrapping completes we can still recover + // from just the commit log bootstrapper. FileSetType: persist.FileSetSnapshotType, }), }, From f65a4e4819282aa27b4386c039437f7fc6b9aab9 Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Tue, 18 Sep 2018 13:43:03 -0400 Subject: [PATCH 14/19] Add test for invalid fileset type in persistconfig --- .../bootstrap/bootstrapper/peers/source.go | 23 +++++++++++++ .../bootstrapper/peers/source_test.go | 34 +++++++++++++++++++ 2 files changed, 57 insertions(+) diff --git a/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go b/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go index ed8555251d..c6eafabd38 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go @@ -82,6 +82,8 @@ func (s *peersSource) AvailableData( shardsTimeRanges result.ShardTimeRanges, runOpts bootstrap.RunOptions, ) result.ShardTimeRanges { + // TODO: Call validateRunOpts here when we modify this interface + // to support returning errors. 
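	// As a rough sketch only (a hypothetical future signature, not part of
	// this patch), the TODO above amounts to something like:
	//
	//	func (s *peersSource) AvailableData(
	//		nsMetadata namespace.Metadata,
	//		shardsTimeRanges result.ShardTimeRanges,
	//		runOpts bootstrap.RunOptions,
	//	) (result.ShardTimeRanges, error) {
	//		if err := s.validateRunOpts(runOpts); err != nil {
	//			return nil, err
	//		}
	//		return s.peerAvailability(nsMetadata, shardsTimeRanges, runOpts), nil
	//	}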
return s.peerAvailability(nsMetadata, shardsTimeRanges, runOpts) } @@ -90,6 +92,10 @@ func (s *peersSource) ReadData( shardsTimeRanges result.ShardTimeRanges, opts bootstrap.RunOptions, ) (result.DataBootstrapResult, error) { + if err := s.validateRunOpts(opts); err != nil { + return nil, err + } + if shardsTimeRanges.IsEmpty() { return result.NewDataBootstrapResult(), nil } @@ -461,6 +467,7 @@ func (s *peersSource) incrementalFlush( // from memory AND the commit log bootstrapper is set before the peers bootstrapper // in the bootstrappers configuration. default: + // panic("wtf") return fmt.Errorf("unknown FileSetFileType: %v", persistConfig.FileSetType) } } @@ -544,6 +551,8 @@ func (s *peersSource) AvailableIndex( shardsTimeRanges result.ShardTimeRanges, runOpts bootstrap.RunOptions, ) result.ShardTimeRanges { + // TODO: Call validateRunOpts here when we modify this interface + // to support returning errors. return s.peerAvailability(nsMetadata, shardsTimeRanges, runOpts) } @@ -552,6 +561,10 @@ func (s *peersSource) ReadIndex( shardsTimeRanges result.ShardTimeRanges, opts bootstrap.RunOptions, ) (result.IndexBootstrapResult, error) { + if err := s.validateRunOpts(opts); err != nil { + return nil, err + } + // FOLLOWUP(r): Try to reuse any metadata fetched during the ReadData(...) // call rather than going to the network again r := result.NewIndexBootstrapResult() @@ -804,3 +817,13 @@ func (s *peersSource) markIndexResultErrorAsUnfulfilled( } r.Add(result.IndexBlock{}, unfulfilled) } + +func (s *peersSource) validateRunOpts(runOpts bootstrap.RunOptions) error { + persistConfig := runOpts.PersistConfig() + if persistConfig.FileSetType != persist.FileSetFlushType && + persistConfig.FileSetType != persist.FileSetSnapshotType { + return fmt.Errorf("unknown persist config fileset file type: %v", persistConfig.FileSetType) + } + + return nil +} diff --git a/src/dbnode/storage/bootstrap/bootstrapper/peers/source_test.go b/src/dbnode/storage/bootstrap/bootstrapper/peers/source_test.go index 419ca8d6a4..087b007866 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/peers/source_test.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/peers/source_test.go @@ -21,10 +21,12 @@ package peers import ( + "strings" "testing" "time" m3dbruntime "github.com/m3db/m3/src/dbnode/runtime" + "github.com/m3db/m3/src/dbnode/storage/bootstrap" "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" "github.com/m3db/m3/src/dbnode/topology" tu "github.com/m3db/m3/src/dbnode/topology/testutil" @@ -158,3 +160,35 @@ func TestPeersSourceAvailableDataAndIndex(t *testing.T) { }) } } + +func TestPeersSourceReturnsErrorIfUnknownIncrementalFileSetType(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + var ( + testNsMd = testNamespaceMetadata(t) + resultOpts = testDefaultResultOpts + opts = testDefaultOpts.SetResultOptions(resultOpts) + ropts = testNsMd.Options().RetentionOptions() + + start = time.Now().Add(-ropts.RetentionPeriod()).Truncate(ropts.BlockSize()) + end = start.Add(2 * ropts.BlockSize()) + ) + + src, err := newPeersSource(opts) + require.NoError(t, err) + + target := result.ShardTimeRanges{ + 0: xtime.NewRanges(xtime.Range{Start: start, End: end}), + 1: xtime.NewRanges(xtime.Range{Start: start, End: end}), + } + + runOpts := testIncrementalRunOpts.SetPersistConfig(bootstrap.PersistConfig{Enabled: true, FileSetType: 999}) + _, err = src.ReadData(testNsMd, target, runOpts) + require.Error(t, err) + require.True(t, strings.Contains(err.Error(), "unknown persist config fileset file 
type")) + + _, err = src.ReadIndex(testNsMd, target, runOpts) + require.Error(t, err) + require.True(t, strings.Contains(err.Error(), "unknown persist config fileset file type")) +} From 73528612b3b0e135654b1d4144bb2da9d6982525 Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Tue, 18 Sep 2018 13:59:31 -0400 Subject: [PATCH 15/19] Make fs bootstrapper ignore snapshot persists --- .../storage/bootstrap/bootstrapper/fs/source.go | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go b/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go index 1c3e438966..a83820a9a0 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go @@ -220,7 +220,7 @@ func (s *fileSystemSource) enqueueReaders( // Close the readers ch if and only if all readers are enqueued defer close(readersCh) - indexIncrementalBootstrap := run == bootstrapIndexRunType && runOpts.PersistConfig().Enabled + indexIncrementalBootstrap := run == bootstrapIndexRunType && s.isIncrementalBootstrap(runOpts) if !indexIncrementalBootstrap { // Normal run, open readers s.enqueueReadersGroupedByBlockSize(ns, run, runOpts, @@ -594,8 +594,10 @@ func (s *fileSystemSource) loadShardReadersDataIntoShardResult( } } - incremental := runOpts.PersistConfig().Enabled - noneRemaining := remainingRanges.IsEmpty() + var ( + incremental = s.isIncrementalBootstrap(runOpts) + noneRemaining = remainingRanges.IsEmpty() + ) if run == bootstrapIndexRunType && incremental && noneRemaining { err := s.incrementalBootstrapIndexSegment(ns, requestedRanges, runResult) if err != nil { @@ -1155,6 +1157,11 @@ func (s *fileSystemSource) bootstrapFromIndexPersistedBlocks( return res, nil } +func (s *fileSystemSource) isIncrementalBootstrap(runOpts bootstrap.RunOptions) bool { + persistConfig := runOpts.PersistConfig() + return persistConfig.Enabled && persistConfig.FileSetType == persist.FileSetFlushType +} + type timeWindowReaders struct { ranges result.ShardTimeRanges readers map[shardID]shardReaders From a2270674a12b92ec520cec846c51182d0a0863e4 Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Tue, 18 Sep 2018 14:03:15 -0400 Subject: [PATCH 16/19] Add test to make sure fs skips snapshot persist --- .../bootstrapper/fs/source_index_test.go | 54 +++++++++++++++++++ 1 file changed, 54 insertions(+) diff --git a/src/dbnode/storage/bootstrap/bootstrapper/fs/source_index_test.go b/src/dbnode/storage/bootstrap/bootstrapper/fs/source_index_test.go index 500fa3278a..ca4e5db7ed 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/fs/source_index_test.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/fs/source_index_test.go @@ -334,6 +334,60 @@ func TestBootstrapIndexIncremental(t *testing.T) { require.Equal(t, int64(1), counters["fs-bootstrapper.persist-index-blocks-write+"].Value()) } +func TestBootstrapIndexIgnoresIncrementalIfSnapshotType(t *testing.T) { + dir := createTempDir(t) + defer os.RemoveAll(dir) + + times := newTestBootstrapIndexTimes(testTimesOptions{ + numBlocks: 2, + }) + + writeTSDBGoodTaggedSeriesDataFiles(t, dir, testNs1ID, times.start) + + opts := newTestOptionsWithPersistManager(t, dir) + scope := tally.NewTestScope("", nil) + opts = opts.SetInstrumentOptions(opts.InstrumentOptions().SetMetricsScope(scope)) + + runOpts := testDefaultRunOpts. 
+		SetPersistConfig(bootstrap.PersistConfig{
+			Enabled:     true,
+			FileSetType: persist.FileSetSnapshotType,
+		})
+
+	src := newFileSystemSource(opts).(*fileSystemSource)
+	res, err := src.ReadIndex(testNsMetadata(t), times.shardTimeRanges,
+		runOpts)
+	require.NoError(t, err)
+
+	// Check that no segments were written out
+	infoFiles := fs.ReadIndexInfoFiles(src.fsopts.FilePathPrefix(), testNs1ID,
+		src.fsopts.InfoReaderBufferSize())
+	require.Equal(t, 0, len(infoFiles))
+
+	indexResults := res.IndexResults()
+
+	// Check that both segments are mutable
+	block, ok := indexResults[xtime.ToUnixNano(times.start)]
+	require.True(t, ok)
+	require.Equal(t, 1, len(block.Segments()))
+	_, mutable := block.Segments()[0].(segment.MutableSegment)
+	require.True(t, mutable)
+
+	block, ok = indexResults[xtime.ToUnixNano(times.start.Add(testIndexBlockSize))]
+	require.True(t, ok)
+	require.Equal(t, 1, len(block.Segments()))
+	_, mutable = block.Segments()[0].(segment.MutableSegment)
+	require.True(t, mutable)
+
+	// Validate results
+	validateGoodTaggedSeries(t, times.start, indexResults)
+
+	// Validate that no index blocks were read from disk and that no files were written out
+	counters := scope.Snapshot().Counters()
+	require.Equal(t, int64(0), counters["fs-bootstrapper.persist-index-blocks-read+"].Value())
+	require.Equal(t, int64(0), counters["fs-bootstrapper.persist-index-blocks-write+"].Value())
+}
+
 func TestBootstrapIndexIncrementalPrefersPersistedIndexBlocks(t *testing.T) {
 	dir := createTempDir(t)
 	defer os.RemoveAll(dir)

From 1b4b26990a26853da088fd3b6d768335b2a4b123 Mon Sep 17 00:00:00 2001
From: Richard Artoul
Date: Tue, 18 Sep 2018 14:13:42 -0400
Subject: [PATCH 17/19] Remove panic

---
 src/dbnode/storage/bootstrap/bootstrapper/peers/source.go | 1 -
 1 file changed, 1 deletion(-)

diff --git a/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go b/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go
index c6eafabd38..4d24afaf67 100644
--- a/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go
+++ b/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go
@@ -467,7 +467,6 @@ func (s *peersSource) incrementalFlush(
 		// from memory AND the commit log bootstrapper is set before the peers bootstrapper
 		// in the bootstrappers configuration.
 		default:
-			// panic("wtf")
 			return fmt.Errorf("unknown FileSetFileType: %v", persistConfig.FileSetType)
 		}
 	}

From cedc702994a6fa5ec4d540baad467f900ba125c9 Mon Sep 17 00:00:00 2001
From: Richard Artoul
Date: Tue, 18 Sep 2018 14:14:08 -0400
Subject: [PATCH 18/19] Remove commented out code and add comments

---
 src/dbnode/storage/bootstrap/bootstrapper/peers/source.go | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go b/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go
index 4d24afaf67..56473c0061 100644
--- a/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go
+++ b/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go
@@ -467,6 +467,7 @@ func (s *peersSource) incrementalFlush(
 		// from memory AND the commit log bootstrapper is set before the peers bootstrapper
 		// in the bootstrappers configuration.
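	// For orientation, the switch this hunk modifies has roughly the following
	// shape (a simplified sketch of the surrounding code, not the literal
	// patch context):
	//
	//	switch persistConfig.FileSetType {
	//	case persist.FileSetFlushType:
	//		// Block is no longer mutable: flush it to disk and, depending on
	//		// the series caching policy, evict it from memory.
	//	case persist.FileSetSnapshotType:
	//		// Block is still mutable: keep it in memory; the snapshot written
	//		// earlier is only a crash-recovery safety net for the commit log
	//		// bootstrapper.
	//	default:
	//		return fmt.Errorf("unknown FileSetFileType: %v", persistConfig.FileSetType)
	//	}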
default: + // Should never happen return fmt.Errorf("unknown FileSetFileType: %v", persistConfig.FileSetType) } } @@ -821,6 +822,7 @@ func (s *peersSource) validateRunOpts(runOpts bootstrap.RunOptions) error { persistConfig := runOpts.PersistConfig() if persistConfig.FileSetType != persist.FileSetFlushType && persistConfig.FileSetType != persist.FileSetSnapshotType { + // Should never happen return fmt.Errorf("unknown persist config fileset file type: %v", persistConfig.FileSetType) } From 0b378cd379f5a462fadd2f9c2ff00b0e22e66a37 Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Tue, 18 Sep 2018 15:21:59 -0400 Subject: [PATCH 19/19] Rename all the things --- src/dbnode/integration/integration.go | 2 +- .../storage/bootstrap/bootstrapper/README.md | 2 +- .../bootstrap/bootstrapper/fs/source.go | 22 ++--- .../bootstrapper/fs/source_index_test.go | 8 +- .../bootstrap/bootstrapper/fs/types.go | 4 +- .../bootstrap/bootstrapper/peers/options.go | 28 +++--- .../bootstrap/bootstrapper/peers/source.go | 97 +++++++++---------- .../bootstrapper/peers/source_data_test.go | 10 +- .../bootstrapper/peers/source_test.go | 4 +- .../bootstrap/bootstrapper/peers/types.go | 38 +++++--- src/dbnode/storage/bootstrap/process.go | 2 +- src/dbnode/storage/bootstrap/run_options.go | 4 +- src/dbnode/storage/bootstrap/types.go | 13 +-- 13 files changed, 120 insertions(+), 114 deletions(-) diff --git a/src/dbnode/integration/integration.go b/src/dbnode/integration/integration.go index 5068cd01de..5f7bdb69a1 100644 --- a/src/dbnode/integration/integration.go +++ b/src/dbnode/integration/integration.go @@ -250,7 +250,7 @@ func newDefaultBootstrappableTestSetups( SetAdminClient(adminClient). SetFetchBlocksMetadataEndpointVersion(setupOpts[i].fetchBlocksMetadataEndpointVersion). // DatabaseBlockRetrieverManager and PersistManager need to be set or we will never execute - // the incremental path + // the persist bootstrapping path SetDatabaseBlockRetrieverManager(setup.storageOpts.DatabaseBlockRetrieverManager()). SetPersistManager(setup.storageOpts.PersistManager()). SetRuntimeOptionsManager(runtimeOptsMgr) diff --git a/src/dbnode/storage/bootstrap/bootstrapper/README.md b/src/dbnode/storage/bootstrap/bootstrapper/README.md index 8e1f59dddf..2be0917566 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/README.md +++ b/src/dbnode/storage/bootstrap/bootstrapper/README.md @@ -22,4 +22,4 @@ The peers bootstrapper similarly bootstraps all the data from peers that the fil For the recently read policy the filesystem bootstrapper will simply fulfill the time ranges requested matching without actually loading the series and blocks from the files it discovers. This relies on data been fetched lazily from the filesystem when data is required for a series that does not live on heap. -The peers bootstrapper will bootstrap all time ranges requested, and if performing an incremental bootstrap for a time range will write the data to disk and then remove the results from memory. An incremental bootstrap is used for any data that is immutable at the time that bootstrapping commences. For time ranges that are mutable the series and blocks are returned directly as a result from the bootstrapper. +The peers bootstrapper will bootstrap all time ranges requested, and if performing a bootstrap with persistence enabled for a time range, will write the data to disk and then remove the results from memory. A bootstrap with persistence enabled is used for any data that is immutable at the time that bootstrapping commences. 
For time ranges that are mutable the peer bootstrapper will still write the data out to disk in a durable manner, but in the form of a snapshot, and the series and blocks will still be returned directly as a result from the bootstrapper. This enables the commit log bootstrapper to recover the data in case the node shuts down before the in-memory data can be flushed. diff --git a/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go b/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go index a83820a9a0..1dbd78604e 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go @@ -220,15 +220,15 @@ func (s *fileSystemSource) enqueueReaders( // Close the readers ch if and only if all readers are enqueued defer close(readersCh) - indexIncrementalBootstrap := run == bootstrapIndexRunType && s.isIncrementalBootstrap(runOpts) - if !indexIncrementalBootstrap { + shouldPersistIndexBootstrap := run == bootstrapIndexRunType && s.shouldPersist(runOpts) + if !shouldPersistIndexBootstrap { // Normal run, open readers s.enqueueReadersGroupedByBlockSize(ns, run, runOpts, shardsTimeRanges, readerPool, readersCh) return } - // If the run is an index bootstrap using incremental bootstrapping + // If the run is an index bootstrap with the persist configuration enabled // then we need to write out the metadata into FSTs that we store on disk, // to avoid creating any one single huge FST at once we bucket the // shards into number of buckets @@ -595,11 +595,11 @@ func (s *fileSystemSource) loadShardReadersDataIntoShardResult( } var ( - incremental = s.isIncrementalBootstrap(runOpts) + shouldPersist = s.shouldPersist(runOpts) noneRemaining = remainingRanges.IsEmpty() ) - if run == bootstrapIndexRunType && incremental && noneRemaining { - err := s.incrementalBootstrapIndexSegment(ns, requestedRanges, runResult) + if run == bootstrapIndexRunType && shouldPersist && noneRemaining { + err := s.persistBootstrapIndexSegment(ns, requestedRanges, runResult) if err != nil { iopts := s.opts.ResultOptions().InstrumentOptions() log := instrument.EmitInvariantViolationAndGetLogger(iopts) @@ -607,7 +607,7 @@ func (s *fileSystemSource) loadShardReadersDataIntoShardResult( xlog.NewField("namespace", ns.ID().String()), xlog.NewField("requestedRanges", requestedRanges.String()), xlog.NewField("error", err.Error()), - ).Error("incremental fs index bootstrap failed") + ).Error("persist fs index bootstrap failed") } } @@ -757,12 +757,12 @@ func (s *fileSystemSource) readNextEntryAndIndex( return err } -func (s *fileSystemSource) incrementalBootstrapIndexSegment( +func (s *fileSystemSource) persistBootstrapIndexSegment( ns namespace.Metadata, requestedRanges result.ShardTimeRanges, runResult *runResult, ) error { - // If we're performing an index run in incremental mode + // If we're performing an index run with persistence enabled // determine if we covered a full block exactly (which should // occur since we always group readers by block size) min, max := requestedRanges.MinMax() @@ -841,7 +841,7 @@ func (s *fileSystemSource) incrementalBootstrapIndexSegment( requireFulfilled.Subtract(fulfilled) exactStartEnd := min.Equal(blockStart) && max.Equal(blockStart.Add(blockSize)) if !exactStartEnd || !requireFulfilled.IsEmpty() { - return fmt.Errorf("incremental fs index bootstrap invalid ranges to persist: expected=%v, actual=%v, fulfilled=%v", + return fmt.Errorf("persistent fs index bootstrap invalid ranges to persist: expected=%v, actual=%v, fulfilled=%v", 
expectedRanges.String(), requestedRanges.String(), fulfilled.String()) } @@ -1157,7 +1157,7 @@ func (s *fileSystemSource) bootstrapFromIndexPersistedBlocks( return res, nil } -func (s *fileSystemSource) isIncrementalBootstrap(runOpts bootstrap.RunOptions) bool { +func (s *fileSystemSource) shouldPersist(runOpts bootstrap.RunOptions) bool { persistConfig := runOpts.PersistConfig() return persistConfig.Enabled && persistConfig.FileSetType == persist.FileSetFlushType } diff --git a/src/dbnode/storage/bootstrap/bootstrapper/fs/source_index_test.go b/src/dbnode/storage/bootstrap/bootstrapper/fs/source_index_test.go index ca4e5db7ed..bccd65189a 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/fs/source_index_test.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/fs/source_index_test.go @@ -271,7 +271,7 @@ func TestBootstrapIndex(t *testing.T) { validateGoodTaggedSeries(t, times.start, indexResults) } -func TestBootstrapIndexIncremental(t *testing.T) { +func TestBootstrapIndexWithPersist(t *testing.T) { dir := createTempDir(t) defer os.RemoveAll(dir) @@ -334,7 +334,7 @@ func TestBootstrapIndexIncremental(t *testing.T) { require.Equal(t, int64(1), counters["fs-bootstrapper.persist-index-blocks-write+"].Value()) } -func TestBootstrapIndexIgnoresIncrementalIfSnapshotType(t *testing.T) { +func TestBootstrapIndexIgnoresPersistConfigIfSnapshotType(t *testing.T) { dir := createTempDir(t) defer os.RemoveAll(dir) @@ -388,7 +388,7 @@ func TestBootstrapIndexIgnoresIncrementalIfSnapshotType(t *testing.T) { require.Equal(t, int64(0), counters["fs-bootstrapper.persist-index-blocks-write+"].Value()) } -func TestBootstrapIndexIncrementalPrefersPersistedIndexBlocks(t *testing.T) { +func TestBootstrapIndexWithPersistPrefersPersistedIndexBlocks(t *testing.T) { dir := createTempDir(t) defer os.RemoveAll(dir) @@ -438,7 +438,7 @@ func TestBootstrapIndexIncrementalPrefersPersistedIndexBlocks(t *testing.T) { validateGoodTaggedSeries(t, times.start, indexResults) // Validate that read the block and no blocks were written - // (ensure incremental didn't write it back out again) + // (ensure persist config didn't write it back out again) counters := scope.Snapshot().Counters() require.Equal(t, int64(1), counters["fs-bootstrapper.persist-index-blocks-read+"].Value()) require.Equal(t, int64(0), counters["fs-bootstrapper.persist-index-blocks-write+"].Value()) diff --git a/src/dbnode/storage/bootstrap/bootstrapper/fs/types.go b/src/dbnode/storage/bootstrap/bootstrapper/fs/types.go index cb037cc21d..dd32487655 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/fs/types.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/fs/types.go @@ -54,11 +54,11 @@ type Options interface { FilesystemOptions() fs.Options // SetPersistManager sets the persistence manager used to flush blocks - // when performing an incremental bootstrap run. + // when performing a bootstrap run with persistence enabled. SetPersistManager(value persist.Manager) Options // PersistManager returns the persistence manager used to flush blocks - // when performing an incremental bootstrap run. + // when performing a bootstrap run with persistence enabled. 
PersistManager() persist.Manager // SetBoostrapDataNumProcessors sets the number of processors for CPU-bound diff --git a/src/dbnode/storage/bootstrap/bootstrapper/peers/options.go b/src/dbnode/storage/bootstrap/bootstrapper/peers/options.go index 9ec8d9ccbd..43b83a2b93 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/peers/options.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/peers/options.go @@ -34,8 +34,8 @@ import ( var ( defaultDefaultShardConcurrency = runtime.NumCPU() - defaultIncrementalShardConcurrency = int(math.Max(1, float64(runtime.NumCPU())/2)) - defaultIncrementalPersistMaxQueueSize = 0 + defaultShardPersistenceConcurrency = int(math.Max(1, float64(runtime.NumCPU())/2)) + defaultPersistenceMaxQueueSize = 0 defaultFetchBlocksMetadataEndpointVersion = client.FetchBlocksMetadataEndpointV1 ) @@ -49,8 +49,8 @@ type options struct { resultOpts result.Options client client.AdminClient defaultShardConcurrency int - incrementalShardConcurrency int - incrementalPersistMaxQueueSize int + shardPersistenceConcurrency int + persistenceMaxQueueSize int persistManager persist.Manager blockRetrieverManager block.DatabaseBlockRetrieverManager fetchBlocksMetadataEndpointVersion client.FetchBlocksMetadataEndpointVersion @@ -62,8 +62,8 @@ func NewOptions() Options { return &options{ resultOpts: result.NewOptions(), defaultShardConcurrency: defaultDefaultShardConcurrency, - incrementalShardConcurrency: defaultIncrementalShardConcurrency, - incrementalPersistMaxQueueSize: defaultIncrementalPersistMaxQueueSize, + shardPersistenceConcurrency: defaultShardPersistenceConcurrency, + persistenceMaxQueueSize: defaultPersistenceMaxQueueSize, fetchBlocksMetadataEndpointVersion: defaultFetchBlocksMetadataEndpointVersion, } } @@ -111,24 +111,24 @@ func (o *options) DefaultShardConcurrency() int { return o.defaultShardConcurrency } -func (o *options) SetIncrementalShardConcurrency(value int) Options { +func (o *options) SetShardPersistenceConcurrency(value int) Options { opts := *o - opts.incrementalShardConcurrency = value + opts.shardPersistenceConcurrency = value return &opts } -func (o *options) IncrementalShardConcurrency() int { - return o.incrementalShardConcurrency +func (o *options) ShardPersistenceConcurrency() int { + return o.shardPersistenceConcurrency } -func (o *options) SetIncrementalPersistMaxQueueSize(value int) Options { +func (o *options) SetPersistenceMaxQueueSize(value int) Options { opts := *o - opts.incrementalPersistMaxQueueSize = value + opts.persistenceMaxQueueSize = value return &opts } -func (o *options) IncrementalPersistMaxQueueSize() int { - return o.incrementalPersistMaxQueueSize +func (o *options) PersistenceMaxQueueSize() int { + return o.persistenceMaxQueueSize } func (o *options) SetPersistManager(value persist.Manager) Options { diff --git a/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go b/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go index 56473c0061..316b34cd8a 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go @@ -48,7 +48,7 @@ type peersSource struct { nowFn clock.NowFn } -type incrementalFlush struct { +type persistenceFlush struct { nsMetadata namespace.Metadata shard uint32 shardRetrieverMgr block.DatabaseShardBlockRetrieverManager @@ -105,7 +105,7 @@ func (s *peersSource) ReadData( blockRetriever block.DatabaseBlockRetriever shardRetrieverMgr block.DatabaseShardBlockRetrieverManager persistFlush persist.DataFlush - incremental = false + 
shouldPersist = false seriesCachePolicy = s.opts.ResultOptions().SeriesCachePolicy() persistConfig = opts.PersistConfig() ) @@ -115,10 +115,10 @@ func (s *peersSource) ReadData( // Neither of these should ever happen if seriesCachePolicy != series.CacheAll && retrieverMgr == nil { - s.log.Fatal("tried to perform incremental flush without retriever manager") + s.log.Fatal("tried to perform a bootstrap with persistence without retriever manager") } if seriesCachePolicy != series.CacheAll && persistManager == nil { - s.log.Fatal("tried to perform incremental flush without persist manager") + s.log.Fatal("tried to perform a bootstrap with persistence without persist manager") } s.log.WithFields( @@ -137,7 +137,7 @@ func (s *peersSource) ReadData( defer persist.DoneData() - incremental = true + shouldPersist = true blockRetriever = r shardRetrieverMgr = block.NewDatabaseShardBlockRetrieverManager(r) persistFlush = persist @@ -154,26 +154,26 @@ func (s *peersSource) ReadData( var ( resultLock sync.Mutex wg sync.WaitGroup - incrementalWorkerDoneCh = make(chan struct{}) - incrementalMaxQueue = s.opts.IncrementalPersistMaxQueueSize() - incrementalQueue = make(chan incrementalFlush, incrementalMaxQueue) + persistenceWorkerDoneCh = make(chan struct{}) + persistenceMaxQueueSize = s.opts.PersistenceMaxQueueSize() + persistenceQueue = make(chan persistenceFlush, persistenceMaxQueueSize) resultOpts = s.opts.ResultOptions() count = len(shardsTimeRanges) concurrency = s.opts.DefaultShardConcurrency() blockSize = nsMetadata.Options().RetentionOptions().BlockSize() ) - if incremental { - concurrency = s.opts.IncrementalShardConcurrency() + if shouldPersist { + concurrency = s.opts.ShardPersistenceConcurrency() } s.log.WithFields( xlog.NewField("shards", count), xlog.NewField("concurrency", concurrency), - xlog.NewField("incremental", incremental), + xlog.NewField("shouldPersist", shouldPersist), ).Infof("peers bootstrapper bootstrapping shards for ranges") - if incremental { - go s.startIncrementalQueueWorkerLoop( - opts, incrementalWorkerDoneCh, incrementalQueue, persistFlush, result, &resultLock) + if shouldPersist { + go s.startPersistenceQueueWorkerLoop( + opts, persistenceWorkerDoneCh, persistenceQueue, persistFlush, result, &resultLock) } workers := xsync.NewWorkerPool(concurrency) @@ -184,20 +184,20 @@ func (s *peersSource) ReadData( workers.Go(func() { defer wg.Done() s.fetchBootstrapBlocksFromPeers(shard, ranges, nsMetadata, session, - resultOpts, result, &resultLock, incremental, incrementalQueue, + resultOpts, result, &resultLock, shouldPersist, persistenceQueue, shardRetrieverMgr, blockSize) }) } wg.Wait() - close(incrementalQueue) - if incremental { - // Wait for the incrementalQueueWorker to finish incrementally flushing everything - <-incrementalWorkerDoneCh + close(persistenceQueue) + if shouldPersist { + // Wait for the persistenceQueueWorker to finish flushing everything + <-persistenceWorkerDoneCh } - if incremental { - // Now cache the incremental results + if shouldPersist { + // Now cache the flushed results err := s.cacheShardIndices(shardsTimeRanges, blockRetriever) if err != nil { return nil, err @@ -207,38 +207,38 @@ func (s *peersSource) ReadData( return result, nil } -// startIncrementalQueueWorkerLoop is meant to be run in its own goroutine, and it creates a worker that -// loops through the incrementalQueue and performs an incrementalFlush for each entry, ensuring that -// no more than one incremental flush is ever happening at once. 
Once the incrementalQueue channel
+// startPersistenceQueueWorkerLoop is meant to be run in its own goroutine, and it creates a worker that
+// loops through the persistenceQueue and performs a flush for each entry, ensuring that
+// no more than one flush is ever happening at once. Once the persistenceQueue channel
 // is closed, and the worker has completed flushing all the remaining entries, it will close the
 // provided doneCh so that callers can block until everything has been successfully flushed.
-func (s *peersSource) startIncrementalQueueWorkerLoop(
+func (s *peersSource) startPersistenceQueueWorkerLoop(
 	opts bootstrap.RunOptions,
 	doneCh chan struct{},
-	incrementalQueue chan incrementalFlush,
+	persistenceQueue chan persistenceFlush,
 	persistFlush persist.DataFlush,
 	bootstrapResult result.DataBootstrapResult,
 	lock *sync.Mutex,
 ) {
-	// If performing an incremental bootstrap then flush one
-	// at a time as shard results are gathered
-	for flush := range incrementalQueue {
-		err := s.incrementalFlush(opts, persistFlush, flush.nsMetadata, flush.shard,
+	// If performing a bootstrap with persistence enabled then flush one
+	// at a time as shard results are gathered.
+	for flush := range persistenceQueue {
+		err := s.flush(opts, persistFlush, flush.nsMetadata, flush.shard,
 			flush.shardRetrieverMgr, flush.shardResult, flush.timeRange)
 		if err == nil {
-			// Safe to add to the shared bootstrap result now
+			// Safe to add to the shared bootstrap result now.
 			lock.Lock()
 			bootstrapResult.Add(flush.shard, flush.shardResult, xtime.Ranges{})
 			lock.Unlock()
 			continue
 		}

-		// Remove results and make unfulfilled if an error occurred
+		// Remove results and make unfulfilled if an error occurred.
 		s.log.WithFields(
 			xlog.NewField("error", err.Error()),
-		).Errorf("peers bootstrapper incremental flush encountered error")
+		).Errorf("peers bootstrapper bootstrap with persistence flush encountered error")

-		// Make unfulfilled
+		// Make unfulfilled.
 		lock.Lock()
 		bootstrapResult.Add(flush.shard, nil, xtime.NewRanges(flush.timeRange))
 		lock.Unlock()
@@ -248,10 +248,10 @@ func (s *peersSource) startIncrementalQueueWorkerLoop(

 // fetchBootstrapBlocksFromPeers loops through all the provided ranges for a given shard and
 // fetches all the bootstrap blocks from the appropriate peers.
-// Non-incremental case: Immediately add the results to the bootstrap result
-// Incremental case: Don't add the results yet, but push an incrementalFlush into the
-// incremental queue. The incrementalQueue worker will eventually
-// add the results once its performed the incremental flush.
+// Persistence disabled case: Immediately add the results to the bootstrap result
+// Persistence enabled case: Don't add the results yet, but push a flush into the
+// persistenceQueue. The persistenceQueue worker will eventually
+// add the results once it has performed the flush.
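// The queueing pattern used above, reduced to its essentials (an illustrative
// sketch, simplified from the real code): a bounded channel of persistenceFlush
// work items, a single consumer so that at most one flush runs at a time, and a
// done channel the caller can block on until the queue is fully drained.
//
//	persistenceQueue := make(chan persistenceFlush, s.opts.PersistenceMaxQueueSize())
//	doneCh := make(chan struct{})
//	go func() {
//		defer close(doneCh)
//		for flush := range persistenceQueue {
//			// Flush one shard/time-range result to disk at a time.
//		}
//	}()
//	// ... concurrent shard fetchers enqueue persistenceFlush values ...
//	close(persistenceQueue) // all producers are done
//	<-doneCh                // wait until everything has been flushed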
func (s *peersSource) fetchBootstrapBlocksFromPeers( shard uint32, ranges xtime.Ranges, @@ -260,8 +260,8 @@ func (s *peersSource) fetchBootstrapBlocksFromPeers( bopts result.Options, bootstrapResult result.DataBootstrapResult, lock *sync.Mutex, - incremental bool, - incrementalQueue chan incrementalFlush, + shouldPersist bool, + persistenceQueue chan persistenceFlush, shardRetrieverMgr block.DatabaseShardBlockRetrieverManager, blockSize time.Duration, ) { @@ -285,8 +285,8 @@ func (s *peersSource) fetchBootstrapBlocksFromPeers( continue } - if incremental { - incrementalQueue <- incrementalFlush{ + if shouldPersist { + persistenceQueue <- persistenceFlush{ nsMetadata: nsMetadata, shard: shard, shardRetrieverMgr: shardRetrieverMgr, @@ -296,7 +296,7 @@ func (s *peersSource) fetchBootstrapBlocksFromPeers( continue } - // If not waiting to incremental flush, add straight away to bootstrap result + // If not waiting to flush, add straight away to bootstrap result lock.Lock() bootstrapResult.Add(shard, shardResult, xtime.Ranges{}) lock.Unlock() @@ -333,10 +333,9 @@ func (s *peersSource) logFetchBootstrapBlocksFromPeersOutcome( } } -// incrementalFlush is used to incrementally flush peer-bootstrapped shards -// to disk as they finish so that we're not (necessarily) holding everything -// in memory at once. -// incrementalFlush starts by looping through every block in a timerange for +// flush is used to flush peer-bootstrapped shards to disk as they finish so +// that we're not (necessarily) holding everything in memory at once. +// flush starts by looping through every block in a timerange for // a given shard, and then subsequently looping through every series in that // shard/block and flushing it to disk. Depending on the series caching policy, // the series will either be held in memory, or removed from memory once @@ -351,7 +350,7 @@ func (s *peersSource) logFetchBootstrapBlocksFromPeersOutcome( // (since all their corresponding blocks have been removed anyways) to prevent // a huge memory spike caused by adding lots of unused series to the Shard // object and then immediately evicting them in the next tick. -func (s *peersSource) incrementalFlush( +func (s *peersSource) flush( opts bootstrap.RunOptions, flush persist.DataFlush, nsMetadata namespace.Metadata, @@ -537,7 +536,7 @@ func (s *peersSource) cacheShardIndices( shardsTimeRanges result.ShardTimeRanges, blockRetriever block.DatabaseBlockRetriever, ) error { - // Now cache the incremental results + // Now cache the flushed results shards := make([]uint32, 0, len(shardsTimeRanges)) for shard := range shardsTimeRanges { shards = append(shards, shard) diff --git a/src/dbnode/storage/bootstrap/bootstrapper/peers/source_data_test.go b/src/dbnode/storage/bootstrap/bootstrapper/peers/source_data_test.go index 50096f06db..05a39c4c68 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/peers/source_data_test.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/peers/source_data_test.go @@ -60,7 +60,7 @@ var ( testDefaultRunOpts = bootstrap.NewRunOptions(). SetPersistConfig(bootstrap.PersistConfig{Enabled: false}) - testIncrementalRunOpts = bootstrap.NewRunOptions(). + testRunOptsWithPersist = bootstrap.NewRunOptions(). 
		SetPersistConfig(bootstrap.PersistConfig{Enabled: true})
 	testBlockOpts         = block.NewOptions()
 	testDefaultResultOpts = result.NewOptions().SetSeriesCachePolicy(series.CacheAll)
@@ -229,7 +229,7 @@ func TestPeersSourceReturnsFulfilledAndUnfulfilled(t *testing.T) {
 	require.Equal(t, ropts.BlockSize(), block.BlockSize())
 }

-func TestPeersSourceIncrementalRun(t *testing.T) {
+func TestPeersSourceRunWithPersist(t *testing.T) {
 	for _, cachePolicy := range []series.CachePolicy{
 		series.CacheAllMetadata,
 		series.CacheRecentlyRead,
@@ -399,7 +399,7 @@ func TestPeersSourceIncrementalRun(t *testing.T) {
 		1: xtime.NewRanges(xtime.Range{Start: start, End: end}),
 	}

-	r, err := src.ReadData(testNsMd, target, testIncrementalRunOpts)
+	r, err := src.ReadData(testNsMd, target, testRunOptsWithPersist)
 	assert.NoError(t, err)

 	require.True(t, r.Unfulfilled()[0].IsEmpty())
@@ -446,7 +446,7 @@ func TestPeersSourceIncrementalRun(t *testing.T) {
 	}
 }

-func TestPeersSourceMarksUnfulfilledOnIncrementalFlushErrors(t *testing.T) {
+func TestPeersSourceMarksUnfulfilledOnPersistenceErrors(t *testing.T) {
 	ctrl := gomock.NewController(t)
 	defer ctrl.Finish()

@@ -738,7 +738,7 @@ func TestPeersSourceMarksUnfulfilledOnIncrementalFlushErrors(t *testing.T) {
 			AddRange(xtime.Range{Start: midway, End: end}),
 	}

-	r, err := src.ReadData(testNsMd, target, testIncrementalRunOpts)
+	r, err := src.ReadData(testNsMd, target, testRunOptsWithPersist)
 	assert.NoError(t, err)

 	assert.Equal(t, 0, len(r.ShardResults()))
diff --git a/src/dbnode/storage/bootstrap/bootstrapper/peers/source_test.go b/src/dbnode/storage/bootstrap/bootstrapper/peers/source_test.go
index 087b007866..c75d6b39fc 100644
--- a/src/dbnode/storage/bootstrap/bootstrapper/peers/source_test.go
+++ b/src/dbnode/storage/bootstrap/bootstrapper/peers/source_test.go
@@ -161,7 +161,7 @@ func TestPeersSourceAvailableDataAndIndex(t *testing.T) {
 	}
 }

-func TestPeersSourceReturnsErrorIfUnknownIncrementalFileSetType(t *testing.T) {
+func TestPeersSourceReturnsErrorIfUnknownPersistenceFileSetType(t *testing.T) {
 	ctrl := gomock.NewController(t)
 	defer ctrl.Finish()

@@ -183,7 +183,7 @@ func TestPeersSourceReturnsErrorIfUnknownIncrementalFileSetType(t *testing.T) {
 		1: xtime.NewRanges(xtime.Range{Start: start, End: end}),
 	}

-	runOpts := testIncrementalRunOpts.SetPersistConfig(bootstrap.PersistConfig{Enabled: true, FileSetType: 999})
+	runOpts := testRunOptsWithPersist.SetPersistConfig(bootstrap.PersistConfig{Enabled: true, FileSetType: 999})
 	_, err = src.ReadData(testNsMd, target, runOpts)
 	require.Error(t, err)
 	require.True(t, strings.Contains(err.Error(), "unknown persist config fileset file type"))
diff --git a/src/dbnode/storage/bootstrap/bootstrapper/peers/types.go b/src/dbnode/storage/bootstrap/bootstrapper/peers/types.go
index 18d05dbf8b..a01b4453ec 100644
--- a/src/dbnode/storage/bootstrap/bootstrapper/peers/types.go
+++ b/src/dbnode/storage/bootstrap/bootstrapper/peers/types.go
@@ -46,47 +46,53 @@ type Options interface {
 	AdminClient() client.AdminClient

 	// SetDefaultShardConcurrency sets the concurrency for
-	// bootstrapping shards when performing a non-incremental bootstrap.
+	// bootstrapping shards when performing a bootstrap with
+	// persistence disabled.
 	SetDefaultShardConcurrency(value int) Options

 	// DefaultShardConcurrency returns the concurrency for
-	// bootstrapping shards when performing a non-incremental bootstrap.
+	// bootstrapping shards when performing a bootstrap with
+	// persistence disabled.
DefaultShardConcurrency() int - // SetIncrementalShardConcurrency sets the concurrency for - // bootstrapping shards when performing an incremental bootstrap. - SetIncrementalShardConcurrency(value int) Options + // SetShardPersistenceConcurrency sets the concurrency for + // bootstrapping shards when performing a bootstrap with + // persistence enabled. + SetShardPersistenceConcurrency(value int) Options - // IncrementalShardConcurrency returns the concurrency for - // bootstrapping shards when performing an incremental bootstrap. - IncrementalShardConcurrency() int + // ShardPersistenceConcurrency returns the concurrency for + // bootstrapping shards when performing a bootstrap with + // persistence enabled. + ShardPersistenceConcurrency() int - // SetIncrementalPersistMaxQueueSize sets the max queue for + // SetPersistenceMaxQueueSize sets the max queue for // bootstrapping shards waiting in line to persist without blocking // the concurrent shard fetchers. - SetIncrementalPersistMaxQueueSize(value int) Options + SetPersistenceMaxQueueSize(value int) Options - // IncrementalPersistMaxQueueSize returns the max queue for + // PersistenceMaxQueueSize returns the max queue for // bootstrapping shards waiting in line to persist without blocking // the concurrent shard fetchers. - IncrementalPersistMaxQueueSize() int + PersistenceMaxQueueSize() int // SetPersistManager sets the persistence manager used to flush blocks - // when performing an incremental bootstrap run. + // when performing a bootstrap with persistence. SetPersistManager(value persist.Manager) Options // PersistManager returns the persistence manager used to flush blocks - // when performing an incremental bootstrap run. + // when performing a bootstrap with persistence. PersistManager() persist.Manager // SetDatabaseBlockRetrieverManager sets the block retriever manager to - // pass to newly flushed blocks when performing an incremental bootstrap run. + // pass to newly flushed blocks when performing a bootstrap run with + // persistence enabled. SetDatabaseBlockRetrieverManager( value block.DatabaseBlockRetrieverManager, ) Options // NewBlockRetrieverFn returns the block retriever manager to - // pass to newly flushed blocks when performing an incremental bootstrap run. + // pass to newly flushed blocks when performing a bootstrap run with + // persistence enabled. DatabaseBlockRetrieverManager() block.DatabaseBlockRetrieverManager // SetFetchBlocksMetadataEndpointVersion sets the version of the fetch blocks diff --git a/src/dbnode/storage/bootstrap/process.go b/src/dbnode/storage/bootstrap/process.go index cbba8cae16..b29295fb63 100644 --- a/src/dbnode/storage/bootstrap/process.go +++ b/src/dbnode/storage/bootstrap/process.go @@ -338,7 +338,7 @@ func (b bootstrapProcess) targetRanges( Add(opts.blockSize) // NB(r): We want the large initial time range bootstrapped to - // bootstrap incrementally so we don't keep the full raw + // bootstrap with persistence so we don't keep the full raw // data in process until we finish bootstrapping which could // cause the process to OOM. return []TargetRange{ diff --git a/src/dbnode/storage/bootstrap/run_options.go b/src/dbnode/storage/bootstrap/run_options.go index d44fc29ff5..ce6014eb96 100644 --- a/src/dbnode/storage/bootstrap/run_options.go +++ b/src/dbnode/storage/bootstrap/run_options.go @@ -23,8 +23,8 @@ package bootstrap import "github.com/m3db/m3/src/dbnode/topology" var ( - // defaultPersistConfig declares the intent to by default not perform an - // incremental bootstrap. 
+	// defaultPersistConfig declares the intent, by default, to perform
+	// a bootstrap without persistence enabled.
 	defaultPersistConfig = PersistConfig{
 		Enabled: false,
 	}
diff --git a/src/dbnode/storage/bootstrap/types.go b/src/dbnode/storage/bootstrap/types.go
index eb800bdf01..a81f416922 100644
--- a/src/dbnode/storage/bootstrap/types.go
+++ b/src/dbnode/storage/bootstrap/types.go
@@ -69,12 +69,13 @@ type TargetRange struct {
 	RunOptions RunOptions
 }

-// PersistConfig is the configuration for an incremental bootstrap.
+// PersistConfig is the configuration for a bootstrap with persistence.
 type PersistConfig struct {
-	// Whether this bootstrap should incrementally persist data to disk.
+	// If enabled, bootstrappers are allowed to write out bootstrapped data
+	// to disk on their own instead of just returning the result in-memory.
 	Enabled bool
-	// If enabled, what type of files should be generated during the incremental
-	// process.
+	// If enabled, what type of persistence files should be generated during
+	// the process.
 	FileSetType persist.FileSetType
 }

@@ -100,10 +101,10 @@ type ProcessOptions interface {

 // RunOptions is a set of options for a bootstrap run.
 type RunOptions interface {
-	// SetPersistConfig sets incremental configuration for this bootstrap.
+	// SetPersistConfig sets persistence configuration for this bootstrap.
 	SetPersistConfig(value PersistConfig) RunOptions

-	// PersistConfig returns the incremental configuration for this bootstrap.
+	// PersistConfig returns the persistence configuration for this bootstrap.
 	PersistConfig() PersistConfig

 	// SetCacheSeriesMetadata sets whether bootstrappers created by this