diff --git a/src/cmd/services/m3dbnode/config/bootstrap.go b/src/cmd/services/m3dbnode/config/bootstrap.go index 108b9dc824..09d618b82c 100644 --- a/src/cmd/services/m3dbnode/config/bootstrap.go +++ b/src/cmd/services/m3dbnode/config/bootstrap.go @@ -153,6 +153,34 @@ type BootstrapConfiguration struct { // IndexSegmentConcurrency determines the concurrency for building index // segments. IndexSegmentConcurrency *int `yaml:"indexSegmentConcurrency"` + + // Verify specifies verification checks. + Verify *BootstrapVerifyConfiguration `yaml:"verify"` +} + +// VerifyOrDefault returns verify configuration or default. +func (bsc BootstrapConfiguration) VerifyOrDefault() BootstrapVerifyConfiguration { + if bsc.Verify == nil { + return BootstrapVerifyConfiguration{} + } + + return *bsc.Verify +} + +// BootstrapVerifyConfiguration outlines verification checks to enable +// during a bootstrap. +type BootstrapVerifyConfiguration struct { + VerifyIndexSegments *bool `yaml:"verifyIndexSegments"` +} + +// VerifyIndexSegmentsOrDefault returns whether to verify index segments +// or use default value. +func (c BootstrapVerifyConfiguration) VerifyIndexSegmentsOrDefault() bool { + if c.VerifyIndexSegments == nil { + return false + } + + return *c.VerifyIndexSegments } // BootstrapFilesystemConfiguration specifies config for the fs bootstrapper. @@ -288,11 +316,13 @@ func (bsc BootstrapConfiguration) New( SetFilesystemOptions(fsOpts). SetIndexOptions(opts.IndexOptions()). SetPersistManager(opts.PersistManager()). + SetIndexClaimsManager(opts.IndexClaimsManager()). SetCompactor(compactor). SetRuntimeOptionsManager(opts.RuntimeOptionsManager()). SetIdentifierPool(opts.IdentifierPool()). SetMigrationOptions(fsCfg.migration().NewOptions()). - SetStorageOptions(opts) + SetStorageOptions(opts). 
+ SetIndexSegmentsVerify(bsc.VerifyOrDefault().VerifyIndexSegmentsOrDefault()) if v := bsc.IndexSegmentConcurrency; v != nil { fsbOpts = fsbOpts.SetIndexSegmentConcurrency(*v) } @@ -329,6 +359,7 @@ func (bsc BootstrapConfiguration) New( SetIndexOptions(opts.IndexOptions()). SetAdminClient(adminClient). SetPersistManager(opts.PersistManager()). + SetIndexClaimsManager(opts.IndexClaimsManager()). SetCompactor(compactor). SetRuntimeOptionsManager(opts.RuntimeOptionsManager()). SetContextPool(opts.ContextPool()). diff --git a/src/cmd/services/m3dbnode/config/config_test.go b/src/cmd/services/m3dbnode/config/config_test.go index 5c98086e00..c41a099429 100644 --- a/src/cmd/services/m3dbnode/config/config_test.go +++ b/src/cmd/services/m3dbnode/config/config_test.go @@ -418,6 +418,7 @@ func TestConfiguration(t *testing.T) { peers: null cacheSeriesMetadata: null indexSegmentConcurrency: null + verify: null blockRetrieve: null cache: series: null diff --git a/src/dbnode/integration/integration.go b/src/dbnode/integration/integration.go index 31883a58fb..a5837fa673 100644 --- a/src/dbnode/integration/integration.go +++ b/src/dbnode/integration/integration.go @@ -272,9 +272,10 @@ func newDefaultBootstrappableTestSetups( SetAdminClient(adminClient). SetIndexOptions(storageIdxOpts). SetFilesystemOptions(fsOpts). - // DatabaseBlockRetrieverManager and PersistManager need to be set or we will never execute + // PersistManager needs to be set or we will never execute // the persist bootstrapping path SetPersistManager(setup.StorageOpts().PersistManager()). + SetIndexClaimsManager(setup.StorageOpts().IndexClaimsManager()). SetCompactor(newCompactor(t, storageIdxOpts)). SetRuntimeOptionsManager(runtimeOptsMgr). SetContextPool(setup.StorageOpts().ContextPool()) @@ -285,13 +286,14 @@ persistMgr, err := persistfs.NewPersistManager(fsOpts) require.NoError(t, err) - + icm := persistfs.NewIndexClaimsManager(fsOpts) bfsOpts := bfs.NewOptions(). 
SetResultOptions(bsOpts). SetFilesystemOptions(fsOpts). SetIndexOptions(storageIdxOpts). SetCompactor(newCompactor(t, storageIdxOpts)). - SetPersistManager(persistMgr) + SetPersistManager(persistMgr). + SetIndexClaimsManager(icm) fsBootstrapper, err := bfs.NewFileSystemBootstrapperProvider(bfsOpts, finalBootstrapper) require.NoError(t, err) diff --git a/src/dbnode/integration/setup.go b/src/dbnode/integration/setup.go index b14123c4d3..19d391acf2 100644 --- a/src/dbnode/integration/setup.go +++ b/src/dbnode/integration/setup.go @@ -380,7 +380,8 @@ func NewTestSetup( if fsOpts == nil { fsOpts = fs.NewOptions(). - SetFilePathPrefix(filePathPrefix) + SetFilePathPrefix(filePathPrefix). + SetClockOptions(storageOpts.ClockOptions()) } storageOpts = storageOpts.SetCommitLogOptions( @@ -394,6 +395,10 @@ func NewTestSetup( } storageOpts = storageOpts.SetPersistManager(pm) + // Set up index claims manager + icm := fs.NewIndexClaimsManager(fsOpts) + storageOpts = storageOpts.SetIndexClaimsManager(icm) + // Set up repair options storageOpts = storageOpts. SetRepairOptions(storageOpts.RepairOptions(). @@ -931,6 +936,7 @@ func (ts *testSetup) InitializeBootstrappers(opts InitializeBootstrappersOptions if err != nil { return err } + icm := fs.NewIndexClaimsManager(fsOpts) storageIdxOpts := storageOpts.IndexOptions() compactor, err := newCompactorWithErr(storageIdxOpts) if err != nil { @@ -941,6 +947,7 @@ func (ts *testSetup) InitializeBootstrappers(opts InitializeBootstrappersOptions SetFilesystemOptions(fsOpts). SetIndexOptions(storageIdxOpts). SetPersistManager(persistMgr). + SetIndexClaimsManager(icm). 
SetCompactor(compactor) bs, err = bfs.NewFileSystemBootstrapperProvider(bfsOpts, bs) if err != nil { diff --git a/src/dbnode/persist/fs/index_claims_manager.go b/src/dbnode/persist/fs/index_claims_manager.go new file mode 100644 index 0000000000..e7d42f5cc8 --- /dev/null +++ b/src/dbnode/persist/fs/index_claims_manager.go @@ -0,0 +1,128 @@ +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
+ +package fs + +import ( + "errors" + "sync" + "time" + + "github.com/m3db/m3/src/dbnode/namespace" + "github.com/m3db/m3/src/dbnode/retention" + "github.com/m3db/m3/src/x/clock" + "github.com/m3db/m3/src/x/ident" + xtime "github.com/m3db/m3/src/x/time" +) + +var errOutOfRetentionClaim = errors.New("out of retention index volume claim") + +type indexClaimsManager struct { + sync.Mutex + + filePathPrefix string + nowFn clock.NowFn + nextIndexFileSetVolumeIndexFn nextIndexFileSetVolumeIndexFn + + // Map of ns ID string -> blockStart -> volumeIndexClaim. + volumeIndexClaims map[string]map[xtime.UnixNano]volumeIndexClaim +} + +type volumeIndexClaim struct { + volumeIndex int +} + +// NewIndexClaimsManager returns an instance of the index claim manager. This manages +// concurrent claims for volume indices per ns and block start. +// NB(bodu): There should be only a single shared index claim manager among all threads +// writing index data filesets. +func NewIndexClaimsManager(opts Options) IndexClaimsManager { + return &indexClaimsManager{ + filePathPrefix: opts.FilePathPrefix(), + nowFn: opts.ClockOptions().NowFn(), + volumeIndexClaims: make(map[string]map[xtime.UnixNano]volumeIndexClaim), + nextIndexFileSetVolumeIndexFn: NextIndexFileSetVolumeIndex, + } +} + +func (i *indexClaimsManager) ClaimNextIndexFileSetVolumeIndex( + md namespace.Metadata, + blockStart time.Time, +) (int, error) { + i.Lock() + earliestBlockStart := retention.FlushTimeStartForRetentionPeriod( + md.Options().RetentionOptions().RetentionPeriod(), + md.Options().IndexOptions().BlockSize(), + i.nowFn(), + ) + defer func() { + i.deleteOutOfRetentionEntriesWithLock(md.ID(), earliestBlockStart) + i.Unlock() + }() + + // Reject out of retention claims. 
+ if blockStart.Before(earliestBlockStart) { + return 0, errOutOfRetentionClaim + } + + volumeIndexClaimsByBlockStart, ok := i.volumeIndexClaims[md.ID().String()] + if !ok { + volumeIndexClaimsByBlockStart = make(map[xtime.UnixNano]volumeIndexClaim) + i.volumeIndexClaims[md.ID().String()] = volumeIndexClaimsByBlockStart + } + + blockStartUnixNanos := xtime.ToUnixNano(blockStart) + if curr, ok := volumeIndexClaimsByBlockStart[blockStartUnixNanos]; ok { + // Already had a previous claim, return the next claim. + next := curr + next.volumeIndex++ + volumeIndexClaimsByBlockStart[blockStartUnixNanos] = next + return next.volumeIndex, nil + } + + volumeIndex, err := i.nextIndexFileSetVolumeIndexFn(i.filePathPrefix, md.ID(), + blockStart) + if err != nil { + return 0, err + } + volumeIndexClaimsByBlockStart[blockStartUnixNanos] = volumeIndexClaim{ + volumeIndex: volumeIndex, + } + return volumeIndex, nil +} + +func (i *indexClaimsManager) deleteOutOfRetentionEntriesWithLock( + nsID ident.ID, + earliestBlockStart time.Time, +) { + earliestBlockStartUnixNanos := xtime.ToUnixNano(earliestBlockStart) + // ns ID already exists at this point since the delete call is deferred. + for blockStart := range i.volumeIndexClaims[nsID.String()] { + if blockStart.Before(earliestBlockStartUnixNanos) { + delete(i.volumeIndexClaims[nsID.String()], blockStart) + } + } +} + +type nextIndexFileSetVolumeIndexFn func( + filePathPrefix string, + namespace ident.ID, + blockStart time.Time, +) (int, error) diff --git a/src/dbnode/persist/fs/index_claims_manager_test.go b/src/dbnode/persist/fs/index_claims_manager_test.go new file mode 100644 index 0000000000..de7755e271 --- /dev/null +++ b/src/dbnode/persist/fs/index_claims_manager_test.go @@ -0,0 +1,115 @@ +// Copyright (c) 2020 Uber Technologies, Inc. 
+// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package fs + +import ( + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/m3db/m3/src/dbnode/namespace" + "github.com/m3db/m3/src/x/ident" + xtime "github.com/m3db/m3/src/x/time" +) + +func TestIndexClaimsManagerConcurrentClaims(t *testing.T) { + mgr, ok := NewIndexClaimsManager(NewOptions()).(*indexClaimsManager) + require.True(t, ok) + + // Always return 0 for starting volume index for testing purposes. 
+ mgr.nextIndexFileSetVolumeIndexFn = func( + filePathPrefix string, + namespace ident.ID, + blockStart time.Time, + ) (int, error) { + return 0, nil + } + + md, err := namespace.NewMetadata(ident.StringID("foo"), namespace.NewOptions()) + require.NoError(t, err) + + var ( + m sync.Map + wg sync.WaitGroup + blockSize = md.Options().IndexOptions().BlockSize() + blockStart = time.Now().Truncate(blockSize) + ) + for i := 0; i < 10; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for j := 0; j < 100; j++ { + volumeIndex, err := mgr.ClaimNextIndexFileSetVolumeIndex( + md, + blockStart, + ) + require.NoError(t, err) + _, loaded := m.LoadOrStore(volumeIndex, true) + // volume index should not have been previously stored or + // there are conflicting volume indices. + require.False(t, loaded) + } + }() + } + wg.Wait() +} + +// TestIndexClaimsManagerOutOfRetention ensure that we both reject and delete out of +// retention index claims. +func TestIndexClaimsManagerOutOfRetention(t *testing.T) { + mgr, ok := NewIndexClaimsManager(NewOptions()).(*indexClaimsManager) + require.True(t, ok) + + // Always return 0 for starting volume index for testing purposes. + mgr.nextIndexFileSetVolumeIndexFn = func( + filePathPrefix string, + namespace ident.ID, + blockStart time.Time, + ) (int, error) { + return 0, nil + } + + md, err := namespace.NewMetadata(ident.StringID("foo"), namespace.NewOptions()) + blockSize := md.Options().IndexOptions().BlockSize() + blockStart := time.Now().Truncate(blockSize) + require.NoError(t, err) + + _, err = mgr.ClaimNextIndexFileSetVolumeIndex( + md, + blockStart, + ) + require.NoError(t, err) + + now := mgr.nowFn().Add(md.Options().RetentionOptions().RetentionPeriod()). + Add(blockSize) + mgr.nowFn = func() time.Time { return now } + _, err = mgr.ClaimNextIndexFileSetVolumeIndex( + md, + blockStart, + ) + require.Equal(t, errOutOfRetentionClaim, err) + + // Verify that the out of retention entry has been deleted as well. 
+ _, ok = mgr.volumeIndexClaims[md.ID().String()][xtime.ToUnixNano(blockStart)] + require.False(t, ok) +} diff --git a/src/dbnode/persist/fs/index_read_segments.go b/src/dbnode/persist/fs/index_read_segments.go index 73a398b792..d3c1fcb668 100644 --- a/src/dbnode/persist/fs/index_read_segments.go +++ b/src/dbnode/persist/fs/index_read_segments.go @@ -22,6 +22,7 @@ package fs import ( "errors" + "fmt" "io" "github.com/m3db/m3/src/m3ninx/index/segment" @@ -84,12 +85,6 @@ func ReadIndexSegments( success = false ) - if validate { - if err = reader.Validate(); err != nil { - return ReadIndexSegmentsResult{}, err - } - } - // Need to do this to guarantee we release all resources in case of failure. defer func() { if !success { @@ -123,6 +118,13 @@ func ReadIndexSegments( segments = append(segments, seg) } + // Note: need to validate after all segment file sets read. + if validate { + if err = reader.Validate(); err != nil { + return ReadIndexSegmentsResult{}, fmt.Errorf("failed to validate index segments: %w", err) + } + } + // Indicate we don't need the defer() above to release any resources, as we are // transferring ownership to the caller. 
success = true diff --git a/src/dbnode/persist/fs/migration/migration_test.go b/src/dbnode/persist/fs/migration/migration_test.go index 8d81f14428..d8a654314a 100644 --- a/src/dbnode/persist/fs/migration/migration_test.go +++ b/src/dbnode/persist/fs/migration/migration_test.go @@ -50,17 +50,17 @@ func TestToVersion1_1Run(t *testing.T) { defer os.RemoveAll(dir) var shard uint32 = 1 - nsId := ident.StringID("foo") + nsID := ident.StringID("foo") // Write unmigrated fileset to disk - fsOpts := writeUnmigratedData(t, filePathPrefix, nsId, shard) + fsOpts := writeUnmigratedData(t, filePathPrefix, nsID, shard) // Read info file of just written fileset - results := fs.ReadInfoFiles(filePathPrefix, nsId, shard, + results := fs.ReadInfoFiles(filePathPrefix, nsID, shard, fsOpts.InfoReaderBufferSize(), fsOpts.DecodingOptions(), persist.FileSetFlushType) require.Equal(t, 1, len(results)) infoFileResult := results[0] - indexFd := openFile(t, fsOpts, nsId, shard, infoFileResult, "index") + indexFd := openFile(t, fsOpts, nsID, shard, infoFileResult, "index") oldBytes, err := ioutil.ReadAll(indexFd) require.NoError(t, err) @@ -68,8 +68,9 @@ func TestToVersion1_1Run(t *testing.T) { pm, err := fs.NewPersistManager( fsOpts.SetEncodingOptions(msgpack.DefaultLegacyEncodingOptions)) // Set encoder to most up-to-date version require.NoError(t, err) + icm := fs.NewIndexClaimsManager(fsOpts) - md, err := namespace.NewMetadata(nsId, namespace.NewOptions()) + md, err := namespace.NewMetadata(nsID, namespace.NewOptions()) require.NoError(t, err) plCache, closer, err := index.NewPostingsListCache(1, index.PostingsListCacheOptions{ @@ -83,6 +84,7 @@ func TestToVersion1_1Run(t *testing.T) { SetNamespaceMetadata(md). SetStorageOptions(storage.NewOptions(). SetPersistManager(pm). + SetIndexClaimsManager(icm). SetNamespaceInitializer(namespace.NewStaticInitializer([]namespace.Metadata{md})). SetRepairEnabled(false). SetIndexOptions(index.NewOptions(). 
@@ -99,7 +101,7 @@ func TestToVersion1_1Run(t *testing.T) { require.NoError(t, err) // Read new info file and make sure it matches results returned by task - newInfoFd := openFile(t, fsOpts, nsId, shard, updatedInfoFile, "info") + newInfoFd := openFile(t, fsOpts, nsID, shard, updatedInfoFile, "info") newInfoBytes, err := ioutil.ReadAll(newInfoFd) require.NoError(t, err) @@ -111,7 +113,7 @@ func TestToVersion1_1Run(t *testing.T) { require.Equal(t, updatedInfoFile.Info, info) // Read the index entries of new volume set - indexFd = openFile(t, fsOpts, nsId, shard, updatedInfoFile, "index") + indexFd = openFile(t, fsOpts, nsID, shard, updatedInfoFile, "index") newBytes, err := ioutil.ReadAll(indexFd) require.NoError(t, err) @@ -129,18 +131,23 @@ func TestToVersion1_1Run(t *testing.T) { func openFile( t *testing.T, fsOpts fs.Options, - nsId ident.ID, + nsID ident.ID, shard uint32, infoFileResult fs.ReadInfoFileResult, fileType string, ) *os.File { indexFd, err := os.Open(path.Join(fsOpts.FilePathPrefix(), fmt.Sprintf("data/%s/%d/fileset-%d-%d-%s.db", - nsId.String(), shard, infoFileResult.Info.BlockStart, infoFileResult.Info.VolumeIndex, fileType))) + nsID.String(), shard, infoFileResult.Info.BlockStart, infoFileResult.Info.VolumeIndex, fileType))) require.NoError(t, err) return indexFd } -func writeUnmigratedData(t *testing.T, filePathPrefix string, nsId ident.ID, shard uint32) fs.Options { +func writeUnmigratedData( + t *testing.T, + filePathPrefix string, + nsID ident.ID, + shard uint32, +) fs.Options { // Use encoding options that will not generate entry level checksums eOpts := msgpack.LegacyEncodingOptions{EncodeLegacyIndexEntryVersion: msgpack.LegacyEncodingIndexEntryVersionV2} @@ -154,7 +161,7 @@ func writeUnmigratedData(t *testing.T, filePathPrefix string, nsId ident.ID, sha blockStart := time.Now().Truncate(time.Hour) writerOpts := fs.DataWriterOpenOptions{ Identifier: fs.FileSetFileIdentifier{ - Namespace: nsId, + Namespace: nsID, Shard: shard, BlockStart: 
blockStart, VolumeIndex: 0, diff --git a/src/dbnode/persist/fs/persist_manager.go b/src/dbnode/persist/fs/persist_manager.go index 8d201a298b..1051301a69 100644 --- a/src/dbnode/persist/fs/persist_manager.go +++ b/src/dbnode/persist/fs/persist_manager.go @@ -255,22 +255,12 @@ func (pm *persistManager) PrepareIndex(opts persist.IndexPrepareOptions) (persis return prepared, errPersistManagerCannotPrepareIndexNotPersisting } - // NB(prateek): unlike data flush files, we allow multiple index flush files for a single block start. - // As a result of this, every time we persist index flush data, we have to compute the volume index - // to uniquely identify a single FileSetFile on disk. - - // work out the volume index for the next Index FileSetFile for the given namespace/blockstart - volumeIndex, err := NextIndexFileSetVolumeIndex(pm.opts.FilePathPrefix(), nsMetadata.ID(), blockStart) - if err != nil { - return prepared, err - } - // we now have all the identifier needed to uniquely specificy a single Index FileSetFile on disk. 
fileSetID := FileSetFileIdentifier{ FileSetContentType: persist.FileSetIndexContentType, Namespace: nsID, BlockStart: blockStart, - VolumeIndex: volumeIndex, + VolumeIndex: opts.VolumeIndex, } blockSize := nsMetadata.Options().IndexOptions().BlockSize() idxWriterOpts := IndexWriterOpenOptions{ diff --git a/src/dbnode/persist/fs/persist_manager_test.go b/src/dbnode/persist/fs/persist_manager_test.go index b582680671..cd0f39386d 100644 --- a/src/dbnode/persist/fs/persist_manager_test.go +++ b/src/dbnode/persist/fs/persist_manager_test.go @@ -346,10 +346,17 @@ func TestPersistenceManagerPrepareIndexFileExists(t *testing.T) { segWriter.EXPECT().Reset(nil) assert.NoError(t, flush.DoneIndex()) }() + volumeIndex, err := NextIndexFileSetVolumeIndex( + pm.filePathPrefix, + testNs1ID, + blockStart, + ) + require.NoError(t, err) prepareOpts := persist.IndexPrepareOptions{ NamespaceMetadata: testNs1Metadata(t), BlockStart: blockStart, + VolumeIndex: volumeIndex, } writer.EXPECT().Open(xtest.CmpMatcher( IndexWriterOpenOptions{ diff --git a/src/dbnode/persist/fs/types.go b/src/dbnode/persist/fs/types.go index d043f9b3ce..55a3e70874 100644 --- a/src/dbnode/persist/fs/types.go +++ b/src/dbnode/persist/fs/types.go @@ -719,3 +719,12 @@ type CrossBlockIterator interface { // Reset resets the iterator to the given block records. Reset(records []BlockRecord) } + +// IndexClaimsManager manages concurrent claims to volume indices per ns and block start. +// This allows multiple threads to safely increment the volume index. 
+type IndexClaimsManager interface { + ClaimNextIndexFileSetVolumeIndex( + md namespace.Metadata, + blockStart time.Time, + ) (int, error) +} diff --git a/src/dbnode/persist/types.go b/src/dbnode/persist/types.go index a24d4040b1..f93f28bb56 100644 --- a/src/dbnode/persist/types.go +++ b/src/dbnode/persist/types.go @@ -271,6 +271,7 @@ type IndexPrepareOptions struct { FileSetType FileSetType Shards map[uint32]struct{} IndexVolumeType idxpersist.IndexVolumeType + VolumeIndex int } // DataPrepareSnapshotOptions is the options struct for the Prepare method that contains diff --git a/src/dbnode/server/server.go b/src/dbnode/server/server.go index 3e619835ca..31be18cfb0 100644 --- a/src/dbnode/server/server.go +++ b/src/dbnode/server/server.go @@ -612,6 +612,10 @@ func Run(runOpts RunOptions) { } opts = opts.SetPersistManager(pm) + // Set the index claims manager + icm := fs.NewIndexClaimsManager(fsopts) + opts = opts.SetIndexClaimsManager(icm) + var ( envCfgResults environment.ConfigureResults ) diff --git a/src/dbnode/storage/bootstrap/bootstrapper/fs/options.go b/src/dbnode/storage/bootstrap/bootstrapper/fs/options.go index 89e4787aa2..5790b957d7 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/fs/options.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/fs/options.go @@ -40,11 +40,12 @@ import ( ) var ( - errPersistManagerNotSet = errors.New("persist manager not set") - errCompactorNotSet = errors.New("compactor not set") - errIndexOptionsNotSet = errors.New("index options not set") - errFilesystemOptionsNotSet = errors.New("filesystem options not set") - errMigrationOptionsNotSet = errors.New("migration options not set") + errPersistManagerNotSet = errors.New("persist manager not set") + errIndexClaimsManagerNotSet = errors.New("index claims manager not set") + errCompactorNotSet = errors.New("compactor not set") + errIndexOptionsNotSet = errors.New("index options not set") + errFilesystemOptionsNotSet = errors.New("filesystem options not set") + 
errMigrationOptionsNotSet = errors.New("migration options not set") // NB(r): Bootstrapping data doesn't use large amounts of memory // that won't be released, so its fine to do this as fast as possible. @@ -60,6 +61,9 @@ var ( // defaultIndexSegmentConcurrency defines the default index segment building concurrency. defaultIndexSegmentConcurrency = 1 + + // defaultIndexSegmentsVerify defines default for index segments validation. + defaultIndexSegmentsVerify = false ) type options struct { @@ -68,8 +72,10 @@ type options struct { fsOpts fs.Options indexOpts index.Options persistManager persist.Manager + indexClaimsManager fs.IndexClaimsManager compactor *compaction.Compactor indexSegmentConcurrency int + indexSegmentsVerify bool runtimeOptsMgr runtime.OptionsManager identifierPool ident.Pool migrationOpts migration.Options @@ -88,6 +94,7 @@ func NewOptions() Options { instrumentOpts: instrument.NewOptions(), resultOpts: result.NewOptions(), indexSegmentConcurrency: defaultIndexSegmentConcurrency, + indexSegmentsVerify: defaultIndexSegmentsVerify, runtimeOptsMgr: runtime.NewOptionsManager(), identifierPool: idPool, migrationOpts: migration.NewOptions(), @@ -99,6 +106,9 @@ func (o *options) Validate() error { if o.persistManager == nil { return errPersistManagerNotSet } + if o.indexClaimsManager == nil { + return errIndexClaimsManagerNotSet + } if o.compactor == nil { return errCompactorNotSet } @@ -170,6 +180,16 @@ func (o *options) PersistManager() persist.Manager { return o.persistManager } +func (o *options) SetIndexClaimsManager(value fs.IndexClaimsManager) Options { + opts := *o + opts.indexClaimsManager = value + return &opts +} + +func (o *options) IndexClaimsManager() fs.IndexClaimsManager { + return o.indexClaimsManager +} + func (o *options) SetCompactor(value *compaction.Compactor) Options { opts := *o opts.compactor = value @@ -190,6 +210,16 @@ func (o *options) IndexSegmentConcurrency() int { return o.indexSegmentConcurrency } +func (o *options) 
SetIndexSegmentsVerify(value bool) Options { + opts := *o + opts.indexSegmentsVerify = value + return &opts +} + +func (o *options) IndexSegmentsVerify() bool { + return o.indexSegmentsVerify +} + func (o *options) SetRuntimeOptionsManager(value runtime.OptionsManager) Options { opts := *o opts.runtimeOptsMgr = value diff --git a/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go b/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go index 90706d74d3..a9a790d15f 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go @@ -329,9 +329,7 @@ func (s *fileSystemSource) bootstrapFromReaders( persistManager *bootstrapper.SharedPersistManager, compactor *bootstrapper.SharedCompactor, ) { - var ( - resultOpts = s.opts.ResultOptions() - ) + resultOpts := s.opts.ResultOptions() for timeWindowReaders := range readersCh { // NB(bodu): Since we are re-using the same builder for all bootstrapped index blocks, @@ -586,6 +584,7 @@ func (s *fileSystemSource) loadShardReadersDataIntoShardResult( requestedRanges, builder.Builder(), persistManager, + s.opts.IndexClaimsManager(), s.opts.ResultOptions(), existingIndexBlock.Fulfilled(), blockStart, @@ -982,12 +981,22 @@ func (s *fileSystemSource) bootstrapFromIndexPersistedBlocks( continue } + fsOpts := s.fsopts + verify := s.opts.IndexSegmentsVerify() + if verify { + // Make sure for this call to read index segments + // to validate the index segment. + // If fails validation will rebuild since missing from + // fulfilled range. 
+ fsOpts = fsOpts.SetIndexReaderAutovalidateIndexSegments(true) + } + readResult, err := fs.ReadIndexSegments(fs.ReadIndexSegmentsOptions{ ReaderOptions: fs.IndexReaderOpenOptions{ Identifier: infoFile.ID, FileSetType: persist.FileSetFlushType, }, - FilesystemOptions: s.fsopts, + FilesystemOptions: fsOpts, }) if err != nil { s.log.Error("unable to read segments from index fileset", diff --git a/src/dbnode/storage/bootstrap/bootstrapper/fs/source_data_test.go b/src/dbnode/storage/bootstrap/bootstrapper/fs/source_data_test.go index f67c75c82c..6c0c30b3a3 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/fs/source_data_test.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/fs/source_data_test.go @@ -95,11 +95,13 @@ func newTestOptions(t require.TestingT, filePathPrefix string) Options { fsOpts := newTestFsOptions(filePathPrefix) pm, err := fs.NewPersistManager(fsOpts) require.NoError(t, err) + icm := fs.NewIndexClaimsManager(fsOpts) return testDefaultOpts. SetCompactor(compactor). SetIndexOptions(idxOpts). SetFilesystemOptions(fsOpts). - SetPersistManager(pm) + SetPersistManager(pm). + SetIndexClaimsManager(icm) } func newTestOptionsWithPersistManager(t require.TestingT, filePathPrefix string) Options { @@ -936,7 +938,8 @@ func TestReadRunMigrations(t *testing.T) { writeGoodFilesWithFsOpts(t, testNs1ID, testShard, newTestFsOptions(dir).SetEncodingOptions(eOpts)) opts := newTestOptions(t, dir) - sOpts, closer := newTestStorageOptions(t, opts.PersistManager()) + icm := fs.NewIndexClaimsManager(opts.FilesystemOptions()) + sOpts, closer := newTestStorageOptions(t, opts.PersistManager(), icm) defer closer() src, err := newFileSystemSource(opts. 
@@ -949,7 +952,11 @@ func TestReadRunMigrations(t *testing.T) { validateReadResults(t, src, dir, testShardTimeRanges()) } -func newTestStorageOptions(t *testing.T, pm persist.Manager) (storage.Options, index.Closer) { +func newTestStorageOptions( + t *testing.T, + pm persist.Manager, + icm fs.IndexClaimsManager, +) (storage.Options, index.Closer) { plCache, closer, err := index.NewPostingsListCache(1, index.PostingsListCacheOptions{ InstrumentOptions: instrument.NewOptions(), }) @@ -960,6 +967,7 @@ func newTestStorageOptions(t *testing.T, pm persist.Manager) (storage.Options, i return storage.NewOptions(). SetPersistManager(pm). + SetIndexClaimsManager(icm). SetNamespaceInitializer(namespace.NewStaticInitializer([]namespace.Metadata{md})). SetRepairEnabled(false). SetIndexOptions(index.NewOptions(). diff --git a/src/dbnode/storage/bootstrap/bootstrapper/fs/types.go b/src/dbnode/storage/bootstrap/bootstrapper/fs/types.go index 778f133b52..253561338b 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/fs/types.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/fs/types.go @@ -64,6 +64,13 @@ type Options interface { // when performing a bootstrap run with persistence enabled. PersistManager() persist.Manager + // SetIndexClaimsManager sets the index claims manager. + SetIndexClaimsManager(value fs.IndexClaimsManager) Options + + // IndexClaimsManager returns the index claims manager. It's used to manage + // concurrent claims for volume indices per ns and block start. + IndexClaimsManager() fs.IndexClaimsManager + // SetCompactor sets the compactor used to compact segment builders into segments. SetCompactor(value *compaction.Compactor) Options @@ -78,6 +85,14 @@ type Options interface { // building index segments. IndexSegmentConcurrency() int + // SetIndexSegmentsVerify sets the value for whether to verify bootstrapped + // index segments. 
+ SetIndexSegmentsVerify(value bool) Options + + // IndexSegmentsVerify returns the value for whether to verify bootstrapped + // index segments. + IndexSegmentsVerify() bool + // SetRuntimeOptionsManager sets the runtime options manager. SetRuntimeOptionsManager(value runtime.OptionsManager) Options diff --git a/src/dbnode/storage/bootstrap/bootstrapper/peers/options.go b/src/dbnode/storage/bootstrap/bootstrapper/peers/options.go index cf7856d7d2..882f4366e5 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/peers/options.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/peers/options.go @@ -61,6 +61,7 @@ var ( var ( errAdminClientNotSet = errors.New("admin client not set") errPersistManagerNotSet = errors.New("persist manager not set") + errIndexClaimsManagerNotSet = errors.New("index claims manager not set") errCompactorNotSet = errors.New("compactor not set") errIndexOptionsNotSet = errors.New("index options not set") errFilesystemOptionsNotSet = errors.New("filesystem options not set") @@ -76,6 +77,7 @@ type options struct { indexSegmentConcurrency int persistenceMaxQueueSize int persistManager persist.Manager + indexClaimsManager fs.IndexClaimsManager runtimeOptionsManager m3dbruntime.OptionsManager contextPool context.Pool fsOpts fs.Options @@ -106,6 +108,9 @@ func (o *options) Validate() error { if o.persistManager == nil { return errPersistManagerNotSet } + if o.indexClaimsManager == nil { + return errIndexClaimsManagerNotSet + } if o.compactor == nil { return errCompactorNotSet } @@ -204,6 +209,16 @@ func (o *options) PersistManager() persist.Manager { return o.persistManager } +func (o *options) SetIndexClaimsManager(value fs.IndexClaimsManager) Options { + opts := *o + opts.indexClaimsManager = value + return &opts +} + +func (o *options) IndexClaimsManager() fs.IndexClaimsManager { + return o.indexClaimsManager +} + func (o *options) SetCompactor(value *compaction.Compactor) Options { opts := *o opts.compactor = value diff --git 
a/src/dbnode/storage/bootstrap/bootstrapper/peers/peers_test.go b/src/dbnode/storage/bootstrap/bootstrapper/peers/peers_test.go index 0f2310c010..0f8bce23fb 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/peers/peers_test.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/peers/peers_test.go @@ -60,12 +60,15 @@ func TestNewPeersBootstrapper(t *testing.T) { }) require.NoError(t, err) + fsOpts := fs.NewOptions() + icm := fs.NewIndexClaimsManager(fsOpts) opts := NewOptions(). SetFilesystemOptions(fs.NewOptions()). SetIndexOptions(idxOpts). SetAdminClient(client.NewMockAdminClient(ctrl)). SetPersistManager(persist.NewMockManager(ctrl)). - SetFilesystemOptions(fs.NewOptions()). + SetIndexClaimsManager(icm). + SetFilesystemOptions(fsOpts). SetCompactor(compactor). SetRuntimeOptionsManager(runtime.NewMockOptionsManager(ctrl)) diff --git a/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go b/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go index d4f7e05560..e9768fb0f5 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go @@ -929,6 +929,7 @@ func (s *peersSource) processReaders( requestedRanges, builder.Builder(), persistManager, + s.opts.IndexClaimsManager(), s.opts.ResultOptions(), existingIndexBlock.Fulfilled(), blockStart, diff --git a/src/dbnode/storage/bootstrap/bootstrapper/peers/source_data_test.go b/src/dbnode/storage/bootstrap/bootstrapper/peers/source_data_test.go index d297c5a31e..b2baf0e1a1 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/peers/source_data_test.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/peers/source_data_test.go @@ -98,11 +98,14 @@ func newTestDefaultOpts(t *testing.T, ctrl *gomock.Controller) Options { }, }) require.NoError(t, err) + fsOpts := fs.NewOptions() + icm := fs.NewIndexClaimsManager(fsOpts) return NewOptions(). SetResultOptions(testDefaultResultOpts). SetPersistManager(persist.NewMockManager(ctrl)). 
+ SetIndexClaimsManager(icm). SetAdminClient(client.NewMockAdminClient(ctrl)). - SetFilesystemOptions(fs.NewOptions()). + SetFilesystemOptions(fsOpts). SetCompactor(compactor). SetIndexOptions(idxOpts). SetAdminClient(newValidMockClient(t, ctrl)). @@ -623,7 +626,6 @@ func TestPeersSourceMarksUnfulfilledOnPersistenceErrors(t *testing.T) { PrepareData(prepareOpts). Return(persist.PreparedDataPersist{ Persist: func(metadata persist.Metadata, segment ts.Segment, checksum uint32) error { - panic("wat") assert.Fail(t, "not expecting to flush shard 0 at start + block size") return nil }, diff --git a/src/dbnode/storage/bootstrap/bootstrapper/peers/source_index_test.go b/src/dbnode/storage/bootstrap/bootstrapper/peers/source_index_test.go index 420b32655b..c2c75167e2 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/peers/source_index_test.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/peers/source_index_test.go @@ -157,6 +157,9 @@ func TestBootstrapIndex(t *testing.T) { require.NoError(t, err) opts = opts.SetPersistManager(pm) + icm := fs.NewIndexClaimsManager(opts.FilesystemOptions()) + opts = opts.SetIndexClaimsManager(icm) + blockSize := 2 * time.Hour indexBlockSize := 2 * blockSize diff --git a/src/dbnode/storage/bootstrap/bootstrapper/peers/types.go b/src/dbnode/storage/bootstrap/bootstrapper/peers/types.go index 1ea5793a63..cdf518f334 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/peers/types.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/peers/types.go @@ -104,6 +104,13 @@ type Options interface { // when performing a bootstrap with persistence. PersistManager() persist.Manager + // SetIndexClaimsManager sets the index claims manager. + SetIndexClaimsManager(value fs.IndexClaimsManager) Options + + // IndexClaimsManager returns the index claims manager. It's used to manage + // concurrent claims for volume indices per ns and block start. 
+ IndexClaimsManager() fs.IndexClaimsManager + // SetCompactor sets the compactor used to compact segment builders into segments. SetCompactor(value *compaction.Compactor) Options diff --git a/src/dbnode/storage/bootstrap/bootstrapper/persist.go b/src/dbnode/storage/bootstrap/bootstrapper/persist.go index acb908c7f0..cb7219af19 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/persist.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/persist.go @@ -27,6 +27,7 @@ import ( "github.com/m3db/m3/src/dbnode/namespace" "github.com/m3db/m3/src/dbnode/persist" + "github.com/m3db/m3/src/dbnode/persist/fs" "github.com/m3db/m3/src/dbnode/retention" "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" "github.com/m3db/m3/src/dbnode/storage/index/compaction" @@ -58,6 +59,7 @@ func PersistBootstrapIndexSegment( requestedRanges result.ShardTimeRanges, builder segment.DocumentsBuilder, persistManager *SharedPersistManager, + indexClaimsManager fs.IndexClaimsManager, resultOpts result.Options, fulfilled result.ShardTimeRanges, blockStart time.Time, @@ -111,6 +113,7 @@ func PersistBootstrapIndexSegment( shards, builder, persistManager, + indexClaimsManager, requestedRanges, expectedRanges, fulfilled, @@ -124,6 +127,7 @@ func persistBootstrapIndexSegment( shards map[uint32]struct{}, builder segment.DocumentsBuilder, persistManager *SharedPersistManager, + indexClaimsManager fs.IndexClaimsManager, requestedRanges result.ShardTimeRanges, expectedRanges result.ShardTimeRanges, fulfilled result.ShardTimeRanges, @@ -160,6 +164,14 @@ func persistBootstrapIndexSegment( } }() + volumeIndex, err := indexClaimsManager.ClaimNextIndexFileSetVolumeIndex( + ns, + blockStart, + ) + if err != nil { + return result.IndexBlock{}, fmt.Errorf("failed to claim next index volume index: %w", err) + } + preparedPersist, err := flush.PrepareIndex(persist.IndexPrepareOptions{ NamespaceMetadata: ns, BlockStart: blockStart, @@ -167,6 +179,7 @@ func persistBootstrapIndexSegment( Shards: shards, // 
NB(bodu): Assume default volume type when persisted bootstrapped index data. IndexVolumeType: idxpersist.DefaultIndexVolumeType, + VolumeIndex: volumeIndex, }) if err != nil { return result.IndexBlock{}, err diff --git a/src/dbnode/storage/index.go b/src/dbnode/storage/index.go index 6a61795e8b..c811175b59 100644 --- a/src/dbnode/storage/index.go +++ b/src/dbnode/storage/index.go @@ -1131,6 +1131,14 @@ func (i *nsIndex) flushBlock( allShards[shard.ID()] = struct{}{} } + volumeIndex, err := i.opts.IndexClaimsManager().ClaimNextIndexFileSetVolumeIndex( + i.nsMetadata, + indexBlock.StartTime(), + ) + if err != nil { + return nil, fmt.Errorf("failed to claim next index volume index: %w", err) + } + preparedPersist, err := flush.PrepareIndex(persist.IndexPrepareOptions{ NamespaceMetadata: i.nsMetadata, BlockStart: indexBlock.StartTime(), @@ -1138,6 +1146,7 @@ func (i *nsIndex) flushBlock( Shards: allShards, // NB(bodu): By default, we always write to the "default" index volume type. IndexVolumeType: idxpersist.DefaultIndexVolumeType, + VolumeIndex: volumeIndex, }) if err != nil { return nil, err diff --git a/src/dbnode/storage/options.go b/src/dbnode/storage/options.go index 43dbca3de9..999de9a99e 100644 --- a/src/dbnode/storage/options.go +++ b/src/dbnode/storage/options.go @@ -98,6 +98,7 @@ var ( errRepairOptionsNotSet = errors.New("repair enabled but repair options are not set") errIndexOptionsNotSet = errors.New("index enabled but index options are not set") errPersistManagerNotSet = errors.New("persist manager is not set") + errIndexClaimsManagerNotSet = errors.New("index claims manager is not set") errBlockLeaserNotSet = errors.New("block leaser is not set") errOnColdFlushNotSet = errors.New("on cold flush is not set, requires at least a no-op implementation") ) @@ -142,6 +143,7 @@ type options struct { newDecoderFn encoding.NewDecoderFn bootstrapProcessProvider bootstrap.ProcessProvider persistManager persist.Manager + indexClaimsManager fs.IndexClaimsManager 
blockRetrieverManager block.DatabaseBlockRetrieverManager poolOpts pool.ObjectPoolOptions contextPool context.Pool @@ -293,6 +295,11 @@ func (o *options) Validate() error { return errPersistManagerNotSet } + // validate that index claims manager is present + if o.indexClaimsManager == nil { + return errIndexClaimsManagerNotSet + } + // validate series cache policy if err := series.ValidateCachePolicy(o.seriesCachePolicy); err != nil { return err @@ -550,6 +557,16 @@ func (o *options) PersistManager() persist.Manager { return o.persistManager } +func (o *options) SetIndexClaimsManager(value fs.IndexClaimsManager) Options { + opts := *o + opts.indexClaimsManager = value + return &opts +} + +func (o *options) IndexClaimsManager() fs.IndexClaimsManager { + return o.indexClaimsManager +} + func (o *options) SetDatabaseBlockRetrieverManager(value block.DatabaseBlockRetrieverManager) Options { opts := *o opts.blockRetrieverManager = value diff --git a/src/dbnode/storage/storage_mock.go b/src/dbnode/storage/storage_mock.go index d6a3987454..b11da30f9c 100644 --- a/src/dbnode/storage/storage_mock.go +++ b/src/dbnode/storage/storage_mock.go @@ -4124,6 +4124,34 @@ func (mr *MockOptionsMockRecorder) PersistManager() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PersistManager", reflect.TypeOf((*MockOptions)(nil).PersistManager)) } +// SetIndexClaimsManager mocks base method +func (m *MockOptions) SetIndexClaimsManager(value fs.IndexClaimsManager) Options { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SetIndexClaimsManager", value) + ret0, _ := ret[0].(Options) + return ret0 +} + +// SetIndexClaimsManager indicates an expected call of SetIndexClaimsManager +func (mr *MockOptionsMockRecorder) SetIndexClaimsManager(value interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetIndexClaimsManager", reflect.TypeOf((*MockOptions)(nil).SetIndexClaimsManager), value) +} + +// IndexClaimsManager mocks base 
method +func (m *MockOptions) IndexClaimsManager() fs.IndexClaimsManager { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "IndexClaimsManager") + ret0, _ := ret[0].(fs.IndexClaimsManager) + return ret0 +} + +// IndexClaimsManager indicates an expected call of IndexClaimsManager +func (mr *MockOptionsMockRecorder) IndexClaimsManager() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IndexClaimsManager", reflect.TypeOf((*MockOptions)(nil).IndexClaimsManager)) +} + // SetDatabaseBlockRetrieverManager mocks base method func (m *MockOptions) SetDatabaseBlockRetrieverManager(value block.DatabaseBlockRetrieverManager) Options { m.ctrl.T.Helper() diff --git a/src/dbnode/storage/types.go b/src/dbnode/storage/types.go index eae5765473..185ac9897e 100644 --- a/src/dbnode/storage/types.go +++ b/src/dbnode/storage/types.go @@ -1125,6 +1125,12 @@ type Options interface { // PersistManager returns the persistence manager. PersistManager() persist.Manager + // SetIndexClaimsManager sets the index claims manager. + SetIndexClaimsManager(value fs.IndexClaimsManager) Options + + // IndexClaimsManager returns the index claims manager. + IndexClaimsManager() fs.IndexClaimsManager + // SetDatabaseBlockRetrieverManager sets the block retriever manager to // use when bootstrapping retrievable blocks instead of blocks // containing data. diff --git a/src/dbnode/storage/util.go b/src/dbnode/storage/util.go index 335e67d1c4..527e562ab8 100644 --- a/src/dbnode/storage/util.go +++ b/src/dbnode/storage/util.go @@ -80,7 +80,11 @@ func DefaultTestOptions() Options { SetBlockLeaseManager(blockLeaseManager) }) - return defaultTestOptions + // Needs a unique index claims manager each time as it tracks volume indices via in mem claims that + // should be different per test. 
+ fsOpts := defaultTestOptions.CommitLogOptions().FilesystemOptions() + icm := fs.NewIndexClaimsManager(fsOpts) + return defaultTestOptions.SetIndexClaimsManager(icm) } // numIntervals returns the number of intervals between [start, end] for a given