From 8584d9a5827a298de0ad0f5c663153202875fd6c Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Sat, 28 Mar 2020 16:51:12 -0400 Subject: [PATCH] [dbnode] Use invariant errors to fail CI on flush errors in integration tests (#2217) * [dbnode] Fail builds with flush errors in integration tests * [dbnode] DB Close Wait for Fs Processes (#2229) * [dbnode] Only read data from disk when flushing index segments. --- .../bootstrapping_crash_recovery.md | 2 +- docs/operational_guide/placement.md | 2 +- src/dbnode/storage/block/types.go | 3 +- src/dbnode/storage/cleanup.go | 56 ++++----- src/dbnode/storage/database.go | 6 + src/dbnode/storage/fs.go | 15 ++- src/dbnode/storage/index.go | 11 +- src/dbnode/storage/index_test.go | 6 +- src/dbnode/storage/mediator.go | 112 +++++++++++++----- src/dbnode/storage/shard.go | 7 +- src/dbnode/storage/storage_mock.go | 12 ++ src/dbnode/storage/types.go | 3 + 12 files changed, 158 insertions(+), 77 deletions(-) diff --git a/docs/operational_guide/bootstrapping_crash_recovery.md b/docs/operational_guide/bootstrapping_crash_recovery.md index 936c1aa3cb..b350622b64 100644 --- a/docs/operational_guide/bootstrapping_crash_recovery.md +++ b/docs/operational_guide/bootstrapping_crash_recovery.md @@ -71,7 +71,7 @@ In this case, the `peers` bootstrapper running on node A will not be able to ful └─────────────────────────┘ └───────────────────────┘ └──────────────────────┘ ``` -Note that a bootstrap consistency level of `majority` is the default value, but can be modified by changing the value of the key `m3db.client.bootstrap-consistency-level` in [etcd](https://coreos.com/etcd/) to one of: `none`, `one`, `unstrict_majority` (attempt to read from majority, but settle for less if any errors occur), `majority` (strict majority), and `all`. For example, if an entire cluster with a replication factor of 3 was restarted simultaneously, all the nodes would get stuck in an infinite loop trying to peer bootstrap from each other and not achieving majority until an operator modified this value. Note that this can happen even if all the shards were in the `Available` state because M3DB nodes will reject all read requests for a shard until they have bootstrapped that shard (which has to happen everytime the node is restarted). +Note that a bootstrap consistency level of `majority` is the default value, but can be modified by changing the value of the key `m3db.client.bootstrap-consistency-level` in [etcd](https://etcd.io/) to one of: `none`, `one`, `unstrict_majority` (attempt to read from majority, but settle for less if any errors occur), `majority` (strict majority), and `all`. For example, if an entire cluster with a replication factor of 3 was restarted simultaneously, all the nodes would get stuck in an infinite loop trying to peer bootstrap from each other and not achieving majority until an operator modified this value. Note that this can happen even if all the shards were in the `Available` state because M3DB nodes will reject all read requests for a shard until they have bootstrapped that shard (which has to happen every time the node is restarted). **Note**: Any bootstrappers configuration that does not include the `peers` bootstrapper will be unable to handle dynamic placement changes of any kind. 
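Editor's note: the consistency-level semantics in the doc paragraph above can be made concrete with a small sketch. This is only an illustrative model of the documented levels, not M3DB's actual client implementation; every name in it is hypothetical.

```go
package main

import "fmt"

// ConsistencyLevel models the documented bootstrap consistency levels.
type ConsistencyLevel int

const (
	None ConsistencyLevel = iota
	One
	UnstrictMajority
	Majority
	All
)

// satisfied reports whether `success` successful peer reads out of `replicas`
// meet the given level. `errored` matters only for unstrict_majority, which
// attempts majority but settles for fewer successes when some peers errored.
func satisfied(level ConsistencyLevel, replicas, success, errored int) bool {
	majority := replicas/2 + 1
	switch level {
	case None:
		return true
	case One:
		return success >= 1
	case UnstrictMajority:
		return success >= majority || (errored > 0 && success >= 1)
	case Majority:
		return success >= majority
	case All:
		return success == replicas
	}
	return false
}

func main() {
	// An RF=3 cluster restarted simultaneously: only one replica readable,
	// the other two peers error out.
	fmt.Println(satisfied(Majority, 3, 1, 2))         // false: stuck until an operator intervenes
	fmt.Println(satisfied(UnstrictMajority, 3, 1, 2)) // true: settles for less because errors occurred
}
```

This is why temporarily lowering the key to `unstrict_majority` breaks the restart deadlock the doc describes.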
diff --git a/docs/operational_guide/placement.md b/docs/operational_guide/placement.md index 1c5f25ff37..ff36241ffe 100644 --- a/docs/operational_guide/placement.md +++ b/docs/operational_guide/placement.md @@ -6,7 +6,7 @@ A M3DB cluster has exactly one Placement. That placement maps the cluster's shard replicas to nodes. A cluster also has 0 or more namespaces (analogous to tables in other databases), and each node serves every namespace for the shards it owns. In other words, if the cluster topology states that node A owns shards 1, 2, and 3 then node A will own shards 1, 2, 3 for all configured namespaces in the cluster. -M3DB stores its placement (mapping of which NODES are responsible for which shards) in [etcd](https://coreos.com/etcd/). There are three possible states that each node/shard pair can be in: +M3DB stores its placement (mapping of which NODES are responsible for which shards) in [etcd](https://etcd.io/). There are three possible states that each node/shard pair can be in: 1. `Initializing` 2. `Available` diff --git a/src/dbnode/storage/block/types.go b/src/dbnode/storage/block/types.go index 132b1fb20c..f63c7b5cc0 100644 --- a/src/dbnode/storage/block/types.go +++ b/src/dbnode/storage/block/types.go @@ -1,4 +1,4 @@ -// Copyright (c) 2016 Uber Technologies, Inc. +// Copyright (c) 2020 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal @@ -79,6 +79,7 @@ type FetchBlocksMetadataOptions struct { IncludeSizes bool IncludeChecksums bool IncludeLastRead bool + OnlyDisk bool } // FetchBlockMetadataResult captures the block start time, the block size, and any errors encountered diff --git a/src/dbnode/storage/cleanup.go b/src/dbnode/storage/cleanup.go index 00cf1a418f..93298d427d 100644 --- a/src/dbnode/storage/cleanup.go +++ b/src/dbnode/storage/cleanup.go @@ -133,33 +133,38 @@ func (m *cleanupManager) Cleanup(t time.Time) error { m.Unlock() }() + namespaces, err := m.database.GetOwnedNamespaces() + if err != nil { + return err + } + multiErr := xerrors.NewMultiError() - if err := m.cleanupDataFiles(t); err != nil { + if err := m.cleanupDataFiles(t, namespaces); err != nil { multiErr = multiErr.Add(fmt.Errorf( "encountered errors when cleaning up data files for %v: %v", t, err)) } - if err := m.cleanupExpiredIndexFiles(t); err != nil { + if err := m.cleanupExpiredIndexFiles(t, namespaces); err != nil { multiErr = multiErr.Add(fmt.Errorf( "encountered errors when cleaning up index files for %v: %v", t, err)) } - if err := m.deleteInactiveDataFiles(); err != nil { + if err := m.deleteInactiveDataFiles(namespaces); err != nil { multiErr = multiErr.Add(fmt.Errorf( "encountered errors when deleting inactive data files for %v: %v", t, err)) } - if err := m.deleteInactiveDataSnapshotFiles(); err != nil { + if err := m.deleteInactiveDataSnapshotFiles(namespaces); err != nil { multiErr = multiErr.Add(fmt.Errorf( "encountered errors when deleting inactive snapshot files for %v: %v", t, err)) } - if err := m.deleteInactiveNamespaceFiles(); err != nil { + if err := m.deleteInactiveNamespaceFiles(namespaces); err != nil { multiErr = multiErr.Add(fmt.Errorf( "encountered errors when deleting inactive namespace files for %v: %v", t, err)) } - if err := m.cleanupSnapshotsAndCommitlogs(); err != nil { + if err := m.cleanupSnapshotsAndCommitlogs(namespaces); err != nil { multiErr = multiErr.Add(fmt.Errorf( "encountered errors when cleaning up snapshot and 
commitlog files: %v", err)) } @@ -179,14 +184,10 @@ func (m *cleanupManager) Report() { } } -func (m *cleanupManager) deleteInactiveNamespaceFiles() error { +func (m *cleanupManager) deleteInactiveNamespaceFiles(namespaces []databaseNamespace) error { var namespaceDirNames []string filePathPrefix := m.database.Options().CommitLogOptions().FilesystemOptions().FilePathPrefix() dataDirPath := fs.DataDirPath(filePathPrefix) - namespaces, err := m.database.GetOwnedNamespaces() - if err != nil { - return err - } for _, n := range namespaces { namespaceDirNames = append(namespaceDirNames, n.ID().String()) @@ -197,23 +198,19 @@ func (m *cleanupManager) deleteInactiveNamespaceFiles() error { // deleteInactiveDataFiles will delete data files for shards that the node no longer owns // which can occur in the case of topology changes -func (m *cleanupManager) deleteInactiveDataFiles() error { - return m.deleteInactiveDataFileSetFiles(fs.NamespaceDataDirPath) +func (m *cleanupManager) deleteInactiveDataFiles(namespaces []databaseNamespace) error { + return m.deleteInactiveDataFileSetFiles(fs.NamespaceDataDirPath, namespaces) } // deleteInactiveDataSnapshotFiles will delete snapshot files for shards that the node no longer owns // which can occur in the case of topology changes -func (m *cleanupManager) deleteInactiveDataSnapshotFiles() error { - return m.deleteInactiveDataFileSetFiles(fs.NamespaceSnapshotsDirPath) +func (m *cleanupManager) deleteInactiveDataSnapshotFiles(namespaces []databaseNamespace) error { + return m.deleteInactiveDataFileSetFiles(fs.NamespaceSnapshotsDirPath, namespaces) } -func (m *cleanupManager) deleteInactiveDataFileSetFiles(filesetFilesDirPathFn func(string, ident.ID) string) error { +func (m *cleanupManager) deleteInactiveDataFileSetFiles(filesetFilesDirPathFn func(string, ident.ID) string, namespaces []databaseNamespace) error { multiErr := xerrors.NewMultiError() filePathPrefix := m.database.Options().CommitLogOptions().FilesystemOptions().FilePathPrefix() - namespaces, err := m.database.GetOwnedNamespaces() - if err != nil { - return err - } for _, n := range namespaces { var activeShards []string namespaceDirPath := filesetFilesDirPathFn(filePathPrefix, n.ID()) @@ -227,12 +224,8 @@ func (m *cleanupManager) deleteInactiveDataFileSetFiles(filesetFilesDirPathFn fu return multiErr.FinalError() } -func (m *cleanupManager) cleanupDataFiles(t time.Time) error { +func (m *cleanupManager) cleanupDataFiles(t time.Time, namespaces []databaseNamespace) error { multiErr := xerrors.NewMultiError() - namespaces, err := m.database.GetOwnedNamespaces() - if err != nil { - return err - } for _, n := range namespaces { if !n.Options().CleanupEnabled() { continue @@ -245,11 +238,7 @@ func (m *cleanupManager) cleanupDataFiles(t time.Time) error { return multiErr.FinalError() } -func (m *cleanupManager) cleanupExpiredIndexFiles(t time.Time) error { - namespaces, err := m.database.GetOwnedNamespaces() - if err != nil { - return err - } +func (m *cleanupManager) cleanupExpiredIndexFiles(t time.Time, namespaces []databaseNamespace) error { multiErr := xerrors.NewMultiError() for _, n := range namespaces { if !n.Options().CleanupEnabled() || !n.Options().IndexOptions().Enabled() { @@ -317,17 +306,12 @@ func (m *cleanupManager) cleanupCompactedNamespaceDataFiles(shards []databaseSha // 9. Delete all corrupt commitlog files (ignoring any commitlog files being actively written to.) // // This process is also modeled formally in TLA+ in the file `SnapshotsSpec.tla`. 
-func (m *cleanupManager) cleanupSnapshotsAndCommitlogs() (finalErr error) { +func (m *cleanupManager) cleanupSnapshotsAndCommitlogs(namespaces []databaseNamespace) (finalErr error) { logger := m.opts.InstrumentOptions().Logger().With( zap.String("comment", "partial/corrupt files are expected as result of a restart (this is ok)"), ) - namespaces, err := m.database.GetOwnedNamespaces() - if err != nil { - return err - } - fsOpts := m.opts.CommitLogOptions().FilesystemOptions() snapshotMetadatas, snapshotMetadataErrorsWithPaths, err := m.snapshotMetadataFilesFn(fsOpts) if err != nil { diff --git a/src/dbnode/storage/database.go b/src/dbnode/storage/database.go index c0c132ff5a..185c2d9ed7 100644 --- a/src/dbnode/storage/database.go +++ b/src/dbnode/storage/database.go @@ -537,6 +537,9 @@ func (d *db) terminateWithLock() error { } func (d *db) Terminate() error { + // NB(bodu): Wait for fs processes to finish. + d.mediator.WaitForFileSystemProcesses() + d.Lock() defer d.Unlock() @@ -544,6 +547,9 @@ func (d *db) Terminate() error { } func (d *db) Close() error { + // NB(bodu): Wait for fs processes to finish. + d.mediator.WaitForFileSystemProcesses() + d.Lock() defer d.Unlock() diff --git a/src/dbnode/storage/fs.go b/src/dbnode/storage/fs.go index a4d85683dc..63cce44717 100644 --- a/src/dbnode/storage/fs.go +++ b/src/dbnode/storage/fs.go @@ -25,6 +25,7 @@ import ( "time" "github.com/m3db/m3/src/dbnode/persist/fs/commitlog" + "github.com/m3db/m3/src/x/instrument" "go.uber.org/zap" ) @@ -153,11 +154,21 @@ func (m *fileSystemManager) Run( // NB(xichen): perform data cleanup and flushing sequentially to minimize the impact of disk seeks. flushFn := func() { + // NB(r): Use invariant here since flush errors were introduced + // and not caught in CI or integration tests. + // When an invariant violation occurs in CI tests it panics so as + // to fail the build. if err := m.Cleanup(t); err != nil { - m.log.Error("error when cleaning up data", zap.Time("time", t), zap.Error(err)) + instrument.EmitAndLogInvariantViolation(m.opts.InstrumentOptions(), + func(l *zap.Logger) { + l.Error("error when cleaning up data", zap.Time("time", t), zap.Error(err)) + }) } if err := m.Flush(t); err != nil { - m.log.Error("error when flushing data", zap.Time("time", t), zap.Error(err)) + instrument.EmitAndLogInvariantViolation(m.opts.InstrumentOptions(), + func(l *zap.Logger) { + l.Error("error when flushing data", zap.Time("time", t), zap.Error(err)) + }) } m.Lock() m.status = fileOpNotStarted diff --git a/src/dbnode/storage/index.go b/src/dbnode/storage/index.go index 626eadcca1..ee7d7f4509 100644 --- a/src/dbnode/storage/index.go +++ b/src/dbnode/storage/index.go @@ -1,4 +1,4 @@ -// Copyright (c) 2018 Uber Technologies, Inc. +// Copyright (c) 2020 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal @@ -903,7 +903,14 @@ func (i *nsIndex) flushBlockSegment( first = false var ( - opts = block.FetchBlocksMetadataOptions{} + opts = block.FetchBlocksMetadataOptions{ + // NB(bodu): There is a lag between when data gets flushed + // to disk and when it gets removed from memory during the next + // Tick. In this case, the same series can exist both on disk + // and in memory at the same time resulting in duplicate series + // IDs. Only read data from disk when flushing index segments. 
+ OnlyDisk: true, + } limit = defaultFlushReadDataBlocksBatchSize results block.FetchBlocksMetadataResults err error diff --git a/src/dbnode/storage/index_test.go b/src/dbnode/storage/index_test.go index 86f02fe014..cdd81e936f 100644 --- a/src/dbnode/storage/index_test.go +++ b/src/dbnode/storage/index_test.go @@ -151,7 +151,7 @@ func TestNamespaceIndexFlushSuccess(t *testing.T) { results.EXPECT().Results().Return(nil) results.EXPECT().Close() mockShard.EXPECT().FetchBlocksMetadataV2(gomock.Any(), blockTime, blockTime.Add(test.indexBlockSize), - gomock.Any(), gomock.Any(), block.FetchBlocksMetadataOptions{}).Return(results, nil, nil) + gomock.Any(), gomock.Any(), block.FetchBlocksMetadataOptions{OnlyDisk: true}).Return(results, nil, nil) mockBlock.EXPECT().AddResults(gomock.Any()).Return(nil) mockBlock.EXPECT().EvictMutableSegments().Return(nil) @@ -260,13 +260,13 @@ func TestNamespaceIndexFlushSuccessMultipleShards(t *testing.T) { results1.EXPECT().Results().Return(nil) results1.EXPECT().Close() mockShard1.EXPECT().FetchBlocksMetadataV2(gomock.Any(), blockTime, blockTime.Add(test.indexBlockSize), - gomock.Any(), gomock.Any(), block.FetchBlocksMetadataOptions{}).Return(results1, nil, nil) + gomock.Any(), gomock.Any(), block.FetchBlocksMetadataOptions{OnlyDisk: true}).Return(results1, nil, nil) results2 := block.NewMockFetchBlocksMetadataResults(ctrl) results2.EXPECT().Results().Return(nil) results2.EXPECT().Close() mockShard2.EXPECT().FetchBlocksMetadataV2(gomock.Any(), blockTime, blockTime.Add(test.indexBlockSize), - gomock.Any(), gomock.Any(), block.FetchBlocksMetadataOptions{}).Return(results2, nil, nil) + gomock.Any(), gomock.Any(), block.FetchBlocksMetadataOptions{OnlyDisk: true}).Return(results2, nil, nil) mockBlock.EXPECT().AddResults(gomock.Any()).Return(nil) mockBlock.EXPECT().EvictMutableSegments().Return(nil) diff --git a/src/dbnode/storage/mediator.go b/src/dbnode/storage/mediator.go index 71f343e871..26b27f151c 100644 --- a/src/dbnode/storage/mediator.go +++ b/src/dbnode/storage/mediator.go @@ -33,15 +33,22 @@ import ( "go.uber.org/zap" ) -type mediatorState int +type ( + mediatorState int + fileSystemProcessesState int +) const ( - fileOpCheckInterval = time.Second - tickCheckInterval = 5 * time.Second + fileOpCheckInterval = time.Second + tickCheckInterval = 5 * time.Second + fileSystemProcessesCheckInterval = 100 * time.Millisecond mediatorNotOpen mediatorState = iota mediatorOpen mediatorClosed + + fileSystemProcessesIdle fileSystemProcessesState = iota + fileSystemProcessesBusy ) var ( @@ -76,13 +83,14 @@ type mediator struct { databaseTickManager databaseRepairer - opts Options - nowFn clock.NowFn - sleepFn sleepFn - metrics mediatorMetrics - state mediatorState - mediatorTimeBarrier mediatorTimeBarrier - closedCh chan struct{} + opts Options + nowFn clock.NowFn + sleepFn sleepFn + metrics mediatorMetrics + state mediatorState + fileSystemProcessesState fileSystemProcessesState + mediatorTimeBarrier mediatorTimeBarrier + closedCh chan struct{} } // TODO(r): Consider renaming "databaseMediator" to "databaseCoordinator" @@ -94,14 +102,15 @@ func newMediator(database database, commitlog commitlog.CommitLog, opts Options) nowFn = opts.ClockOptions().NowFn() ) d := &mediator{ - database: database, - opts: opts, - nowFn: opts.ClockOptions().NowFn(), - sleepFn: time.Sleep, - metrics: newMediatorMetrics(scope), - state: mediatorNotOpen, - mediatorTimeBarrier: newMediatorTimeBarrier(nowFn, iOpts), - closedCh: make(chan struct{}), + database: database, + opts: opts, + nowFn: 
opts.ClockOptions().NowFn(), + sleepFn: time.Sleep, + metrics: newMediatorMetrics(scope), + state: mediatorNotOpen, + fileSystemProcessesState: fileSystemProcessesIdle, + mediatorTimeBarrier: newMediatorTimeBarrier(nowFn, iOpts), + closedCh: make(chan struct{}), } fsm := newFileSystemManager(database, commitlog, opts) @@ -129,7 +138,7 @@ func (m *mediator) Open() error { } m.state = mediatorOpen go m.reportLoop() - go m.ongoingFilesystemProcesses() + go m.ongoingFileSystemProcesses() go m.ongoingTick() m.databaseRepairer.Start() return nil @@ -153,6 +162,18 @@ func (m *mediator) Report() { m.databaseFileSystemManager.Report() } +func (m *mediator) WaitForFileSystemProcesses() { + m.RLock() + fileSystemProcessesState := m.fileSystemProcessesState + m.RUnlock() + for fileSystemProcessesState == fileSystemProcessesBusy { + m.sleepFn(fileSystemProcessesCheckInterval) + m.RLock() + fileSystemProcessesState = m.fileSystemProcessesState + m.RUnlock() + } +} + func (m *mediator) Close() error { m.Lock() defer m.Unlock() @@ -182,22 +203,20 @@ func (m *mediator) Close() error { // is potentially still on disk (if it hasn't been cleaned up yet). // // See comment over mediatorTimeBarrier for more details on how this is implemented. -func (m *mediator) ongoingFilesystemProcesses() { - log := m.opts.InstrumentOptions().Logger() +func (m *mediator) ongoingFileSystemProcesses() { for { select { case <-m.closedCh: return default: m.sleepFn(tickCheckInterval) - // See comment over mediatorTimeBarrier for an explanation of this logic. - mediatorTime, err := m.mediatorTimeBarrier.fsProcessesWait() - if err != nil { - log.Error("error within ongoingFilesystemProcesses waiting for next mediatorTime", zap.Error(err)) - continue + + // Check if the mediator is already closed. + if !m.isOpen() { + return } - m.databaseFileSystemManager.Run(mediatorTime, syncRun, noForce) + m.runFileSystemProcesses() } } } @@ -214,22 +233,49 @@ func (m *mediator) ongoingTick() { default: m.sleepFn(tickCheckInterval) + // Check if the mediator is already closed. + if !m.isOpen() { + return + } + // See comment over mediatorTimeBarrier for an explanation of this logic. newMediatorTime, err := m.mediatorTimeBarrier.maybeRelease() if err != nil { - log.Error( - "ongoing tick was unable to release time barrier", zap.Error(err)) + log.Error("ongoing tick was unable to release time barrier", zap.Error(err)) continue } mediatorTime = newMediatorTime - if err := m.Tick(force, mediatorTime); err != nil { + // NB(bodu): We may still hit a db closed error here since the db does not wait upon + // completion of ticks. + if err := m.Tick(force, mediatorTime); err != nil && err != errDatabaseIsClosed { log.Error("error within tick", zap.Error(err)) } } } } +func (m *mediator) runFileSystemProcesses() { + m.Lock() + m.fileSystemProcessesState = fileSystemProcessesBusy + m.Unlock() + defer func() { + m.Lock() + m.fileSystemProcessesState = fileSystemProcessesIdle + m.Unlock() + }() + + // See comment over mediatorTimeBarrier for an explanation of this logic. 
+ log := m.opts.InstrumentOptions().Logger() + mediatorTime, err := m.mediatorTimeBarrier.fsProcessesWait() + if err != nil { + log.Error("error within ongoingFileSystemProcesses waiting for next mediatorTime", zap.Error(err)) + return + } + + m.databaseFileSystemManager.Run(mediatorTime, syncRun, noForce) +} + func (m *mediator) reportLoop() { interval := m.opts.InstrumentOptions().ReportInterval() t := time.NewTicker(interval) @@ -245,6 +291,12 @@ func (m *mediator) reportLoop() { } } +func (m *mediator) isOpen() bool { + m.RLock() + defer m.RUnlock() + return m.state == mediatorOpen +} + // mediatorTimeBarrier is used to prevent the tick process and the filesystem processes from ever running // concurrently with an inconsistent view of time. Each time the filesystem processes want to run they first // register for the next barrier by calling fsProcessesWait(). Once a tick completes it will call maybeRelease() diff --git a/src/dbnode/storage/shard.go b/src/dbnode/storage/shard.go index 58f2174bf0..37fd0c30ee 100644 --- a/src/dbnode/storage/shard.go +++ b/src/dbnode/storage/shard.go @@ -1,4 +1,4 @@ -// Copyright (c) 2016 Uber Technologies, Inc. +// Copyright (c) 2020 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal @@ -1649,6 +1649,11 @@ func (s *dbShard) FetchBlocksMetadataV2( if err := proto.Unmarshal(encodedPageToken, token); err != nil { return nil, nil, xerrors.NewInvalidParamsError(errShardInvalidPageToken) } + } else { + // NB(bodu): Allow callers to specify that they only want results from disk. + if opts.OnlyDisk { + token.FlushedSeriesPhase = &pagetoken.PageToken_FlushedSeriesPhase{} + } } // NB(r): If returning mixed in memory and disk results, then we return anything diff --git a/src/dbnode/storage/storage_mock.go b/src/dbnode/storage/storage_mock.go index ac700625b3..51218219cf 100644 --- a/src/dbnode/storage/storage_mock.go +++ b/src/dbnode/storage/storage_mock.go @@ -2804,6 +2804,18 @@ func (mr *MockdatabaseMediatorMockRecorder) Tick(forceType, startTime interface{ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Tick", reflect.TypeOf((*MockdatabaseMediator)(nil).Tick), forceType, startTime) } +// WaitForFileSystemProcesses mocks base method +func (m *MockdatabaseMediator) WaitForFileSystemProcesses() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "WaitForFileSystemProcesses") +} + +// WaitForFileSystemProcesses indicates an expected call of WaitForFileSystemProcesses +func (mr *MockdatabaseMediatorMockRecorder) WaitForFileSystemProcesses() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WaitForFileSystemProcesses", reflect.TypeOf((*MockdatabaseMediator)(nil).WaitForFileSystemProcesses)) +} + // Repair mocks base method func (m *MockdatabaseMediator) Repair() error { m.ctrl.T.Helper() diff --git a/src/dbnode/storage/types.go b/src/dbnode/storage/types.go index 4f0c14cf99..7d8c9f572c 100644 --- a/src/dbnode/storage/types.go +++ b/src/dbnode/storage/types.go @@ -797,6 +797,9 @@ type databaseMediator interface { // Tick performs a tick. Tick(forceType forceType, startTime time.Time) error + // WaitForFileSystemProcesses waits for any ongoing fs processes to finish. + WaitForFileSystemProcesses() + // Repair repairs the database. Repair() error
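Editor's note: the new `WaitForFileSystemProcesses` hook added above boils down to a poll-until-idle loop over RWMutex-guarded state. Below is a minimal runnable sketch of that pattern under stated assumptions: `mediator` here is a toy type, not the real one, and only the 100ms check interval mirrors `fileSystemProcessesCheckInterval`.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

const checkInterval = 100 * time.Millisecond

type state int

const (
	idle state = iota
	busy
)

type mediator struct {
	sync.RWMutex
	fsState state
}

// runFileSystemProcesses marks the mediator busy for the duration of work,
// as runFileSystemProcesses does around databaseFileSystemManager.Run.
func (m *mediator) runFileSystemProcesses(work func()) {
	m.Lock()
	m.fsState = busy
	m.Unlock()
	defer func() {
		m.Lock()
		m.fsState = idle
		m.Unlock()
	}()
	work()
}

// WaitForFileSystemProcesses blocks until no filesystem process is running,
// matching how db.Close and db.Terminate drain in-flight flushes and
// cleanups before tearing the database down.
func (m *mediator) WaitForFileSystemProcesses() {
	for {
		m.RLock()
		s := m.fsState
		m.RUnlock()
		if s == idle {
			return
		}
		time.Sleep(checkInterval)
	}
}

func main() {
	m := &mediator{}
	go m.runFileSystemProcesses(func() { time.Sleep(300 * time.Millisecond) })
	time.Sleep(50 * time.Millisecond) // let the worker flag itself busy first
	m.WaitForFileSystemProcesses()
	fmt.Println("filesystem processes drained; safe to close")
}
```

The polling design keeps `Close`/`Terminate` decoupled from the filesystem manager: the waiter never holds the mediator lock across a flush, it only samples the state flag between sleeps.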