From a4cee97b6cf94d5558eb51169d6ad811005d61d8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=CE=BBinas?= Date: Thu, 22 Apr 2021 16:59:41 +0300 Subject: [PATCH] [dbnode] Shards assignment improvements during cluster topology changes (#3425) * use `IsBootstrappedAndDurable()` instead of `IsBootstrapped()` otherwise warm and cold flushes might fail because some shards might still be not bootstrapped. * do not run file ops (cold and warm flush) when new shards are being assigned. check for bootstrapped shards when doing cold flush cleanups. * update unit test to validate handling of not bootstrapped shards. * removed `IsBootstrapped` method arg (boolean args are a code smell), it is better to make a check before calling cleanup. * reduce locking on a db level when new shards are assigned. * can use read lock for `d.hasReceivedNewShardsWithLock()` * Enqueue assignShardSet fn when received update from topology so that shards get assigned when file ops are not running. * need to set `lastReceivedNewShards` when received new shards immediately so that `IsBootstrappedAndDurable()` won't return true when db was previously bootstrapped and new bootstrap is enqueued. * cleaned up some code. * ensure that bootstrap is started when new shards are assigned. * added BootstrapEnqueue(). * updated logging levels. * more test coverage. * removed invariant violation * linter fix * set bootstrap result value directly. * changes after review * fixed failing tests. 
--- src/dbnode/storage/bootstrap.go | 29 +++- .../storage/bootstrap_instrumentation.go | 6 + src/dbnode/storage/bootstrap_test.go | 19 ++- src/dbnode/storage/cleanup.go | 26 ++-- src/dbnode/storage/cleanup_test.go | 54 ++------ src/dbnode/storage/cluster/database.go | 17 +-- src/dbnode/storage/coldflush.go | 14 +- src/dbnode/storage/coldflush_test.go | 53 +++++++ src/dbnode/storage/database.go | 44 ++++-- src/dbnode/storage/database_test.go | 44 +++++- src/dbnode/storage/fs.go | 56 +++----- src/dbnode/storage/fs_test.go | 32 ++++- src/dbnode/storage/mediator.go | 89 ++++++++---- src/dbnode/storage/mediator_test.go | 50 +++++++ src/dbnode/storage/namespace.go | 19 ++- src/dbnode/storage/storage_mock.go | 131 +++++++++++++++--- src/dbnode/storage/types.go | 69 ++++++++- 17 files changed, 573 insertions(+), 179 deletions(-) diff --git a/src/dbnode/storage/bootstrap.go b/src/dbnode/storage/bootstrap.go index c616e8cc01..9cf5cb3e79 100644 --- a/src/dbnode/storage/bootstrap.go +++ b/src/dbnode/storage/bootstrap.go @@ -116,7 +116,22 @@ func (m *bootstrapManager) LastBootstrapCompletionTime() (xtime.UnixNano, bool) return bsTime, bsTime > 0 } +func (m *bootstrapManager) BootstrapEnqueue() *BootstrapAsyncResult { + bootstrapAsyncResult := newBootstrapAsyncResult() + go func(r *BootstrapAsyncResult) { + if result, err := m.startBootstrap(r); err != nil && !result.AlreadyBootstrapping { + m.instrumentation.emitAndLogInvariantViolation(err, "error bootstrapping") + } + }(bootstrapAsyncResult) + return bootstrapAsyncResult +} + func (m *bootstrapManager) Bootstrap() (BootstrapResult, error) { + bootstrapAsyncResult := newBootstrapAsyncResult() + return m.startBootstrap(bootstrapAsyncResult) +} + +func (m *bootstrapManager) startBootstrap(asyncResult *BootstrapAsyncResult) (BootstrapResult, error) { m.Lock() switch m.state { case Bootstrapping: @@ -128,7 +143,11 @@ func (m *bootstrapManager) Bootstrap() (BootstrapResult, error) { // reshard occurs and we need to bootstrap more 
shards. m.hasPending = true m.Unlock() - return BootstrapResult{AlreadyBootstrapping: true}, errBootstrapEnqueued + result := BootstrapResult{AlreadyBootstrapping: true} + asyncResult.bootstrapResult = result + asyncResult.bootstrapStarted.Done() + asyncResult.bootstrapCompleted.Done() + return result, errBootstrapEnqueued default: m.state = Bootstrapping } @@ -138,8 +157,14 @@ func (m *bootstrapManager) Bootstrap() (BootstrapResult, error) { m.mediator.DisableFileOpsAndWait() defer m.mediator.EnableFileOps() - // Keep performing bootstraps until none pending and no error returned. var result BootstrapResult + asyncResult.bootstrapStarted.Done() + defer func() { + asyncResult.bootstrapResult = result + asyncResult.bootstrapCompleted.Done() + }() + + // Keep performing bootstraps until none pending and no error returned. for i := 0; true; i++ { // NB(r): Decouple implementation of bootstrap so can override in tests. bootstrapErr := m.bootstrapFn() diff --git a/src/dbnode/storage/bootstrap_instrumentation.go b/src/dbnode/storage/bootstrap_instrumentation.go index 4a5f2421a7..0fbd90a60b 100644 --- a/src/dbnode/storage/bootstrap_instrumentation.go +++ b/src/dbnode/storage/bootstrap_instrumentation.go @@ -191,3 +191,9 @@ func (i *bootstrapInstrumentation) setIsBootstrappedAndDurable(isBootstrappedAnd } i.durableStatus.Update(status) } + +func (i *bootstrapInstrumentation) emitAndLogInvariantViolation(err error, msg string) { + instrument.EmitAndLogInvariantViolation(i.opts.InstrumentOptions(), func(l *zap.Logger) { + l.Error(msg, zap.Error(err)) + }) +} diff --git a/src/dbnode/storage/bootstrap_test.go b/src/dbnode/storage/bootstrap_test.go index 154ed70520..526961f92d 100644 --- a/src/dbnode/storage/bootstrap_test.go +++ b/src/dbnode/storage/bootstrap_test.go @@ -39,6 +39,14 @@ import ( ) func TestDatabaseBootstrapWithBootstrapError(t *testing.T) { + testDatabaseBootstrapWithBootstrapError(t, false) +} + +func TestDatabaseBootstrapEnqueueWithBootstrapError(t 
*testing.T) { + testDatabaseBootstrapWithBootstrapError(t, true) +} + +func testDatabaseBootstrapWithBootstrapError(t *testing.T, async bool) { ctrl := gomock.NewController(xtest.Reporter{T: t}) defer ctrl.Finish() @@ -88,9 +96,16 @@ func TestDatabaseBootstrapWithBootstrapError(t *testing.T) { require.Equal(t, BootstrapNotStarted, bsm.state) - result, err := bsm.Bootstrap() + var result BootstrapResult + if async { + asyncResult := bsm.BootstrapEnqueue() + asyncResult.WaitForStart() + result = asyncResult.Result() + } else { + result, err = bsm.Bootstrap() + require.NoError(t, err) + } - require.NoError(t, err) require.Equal(t, Bootstrapped, bsm.state) require.Equal(t, 1, len(result.ErrorsBootstrap)) require.Equal(t, "an error", result.ErrorsBootstrap[0].Error()) diff --git a/src/dbnode/storage/cleanup.go b/src/dbnode/storage/cleanup.go index 96dde48304..f51897bcef 100644 --- a/src/dbnode/storage/cleanup.go +++ b/src/dbnode/storage/cleanup.go @@ -127,13 +127,7 @@ func newCleanupManager( } } -func (m *cleanupManager) WarmFlushCleanup(t time.Time, isBootstrapped bool) error { - // Don't perform any cleanup if we are not boostrapped yet. - if !isBootstrapped { - m.logger.Debug("database is still bootstrapping, terminating cleanup") - return nil - } - +func (m *cleanupManager) WarmFlushCleanup(t time.Time) error { m.Lock() m.warmFlushCleanupInProgress = true m.Unlock() @@ -178,13 +172,7 @@ func (m *cleanupManager) WarmFlushCleanup(t time.Time, isBootstrapped bool) erro return multiErr.FinalError() } -func (m *cleanupManager) ColdFlushCleanup(t time.Time, isBootstrapped bool) error { - // Don't perform any cleanup if we are not boostrapped yet. 
- if !isBootstrapped { - m.logger.Debug("database is still bootstrapping, terminating cleanup") - return nil - } - +func (m *cleanupManager) ColdFlushCleanup(t time.Time) error { m.Lock() m.coldFlushCleanupInProgress = true m.Unlock() @@ -262,6 +250,7 @@ func (m *cleanupManager) deleteInactiveDataFileSetFiles(filesetFilesDirPathFn fu for _, n := range namespaces { var activeShards []string namespaceDirPath := filesetFilesDirPathFn(filePathPrefix, n.ID()) + // NB(linasn) This should list ALL shards because it will delete dirs for the shards NOT LISTED below. for _, s := range n.OwnedShards() { shard := fmt.Sprintf("%d", s.ID()) activeShards = append(activeShards, shard) @@ -321,6 +310,9 @@ func (m *cleanupManager) cleanupDuplicateIndexFiles(namespaces []databaseNamespa func (m *cleanupManager) cleanupExpiredNamespaceDataFiles(earliestToRetain time.Time, shards []databaseShard) error { multiErr := xerrors.NewMultiError() for _, shard := range shards { + if !shard.IsBootstrapped() { + continue + } if err := shard.CleanupExpiredFileSets(earliestToRetain); err != nil { multiErr = multiErr.Add(err) } @@ -332,6 +324,9 @@ func (m *cleanupManager) cleanupExpiredNamespaceDataFiles(earliestToRetain time. 
func (m *cleanupManager) cleanupCompactedNamespaceDataFiles(shards []databaseShard) error { multiErr := xerrors.NewMultiError() for _, shard := range shards { + if !shard.IsBootstrapped() { + continue + } if err := shard.CleanupCompactedFileSets(); err != nil { multiErr = multiErr.Add(err) } @@ -425,6 +420,9 @@ func (m *cleanupManager) cleanupSnapshotsAndCommitlogs(namespaces []databaseName for _, ns := range namespaces { for _, s := range ns.OwnedShards() { + if !s.IsBootstrapped() { + continue + } shardSnapshots, err := m.snapshotFilesFn(fsOpts.FilePathPrefix(), ns.ID(), s.ID()) if err != nil { multiErr = multiErr.Add(fmt.Errorf("err reading snapshot files for ns: %s and shard: %d, err: %v", ns.ID(), s.ID(), err)) diff --git a/src/dbnode/storage/cleanup_test.go b/src/dbnode/storage/cleanup_test.go index acc15dc251..470e52ea82 100644 --- a/src/dbnode/storage/cleanup_test.go +++ b/src/dbnode/storage/cleanup_test.go @@ -286,6 +286,7 @@ func TestCleanupManagerCleanupCommitlogsAndSnapshots(t *testing.T) { for i := 0; i < 3; i++ { shard := NewMockdatabaseShard(ctrl) shard.EXPECT().ID().Return(uint32(i)).AnyTimes() + shard.EXPECT().IsBootstrapped().Return(true).AnyTimes() shard.EXPECT().CleanupExpiredFileSets(gomock.Any()).Return(nil).AnyTimes() shard.EXPECT().CleanupCompactedFileSets().Return(nil).AnyTimes() @@ -318,7 +319,7 @@ func TestCleanupManagerCleanupCommitlogsAndSnapshots(t *testing.T) { return nil } - err := cleanup(mgr, ts, true) + err := cleanup(mgr, ts) if tc.expectErr { require.Error(t, err) } else { @@ -361,36 +362,7 @@ func TestCleanupManagerNamespaceCleanupBootstrapped(t *testing.T) { mgr := newCleanupManager(db, newNoopFakeActiveLogs(), tally.NoopScope).(*cleanupManager) idx.EXPECT().CleanupExpiredFileSets(ts).Return(nil) idx.EXPECT().CleanupDuplicateFileSets().Return(nil) - require.NoError(t, cleanup(mgr, ts, true)) -} - -func TestCleanupManagerNamespaceCleanupNotBootstrapped(t *testing.T) { - ctrl := gomock.NewController(xtest.Reporter{T: t}) - defer 
ctrl.Finish() - - ts := timeFor(36000) - rOpts := retentionOptions. - SetRetentionPeriod(21600 * time.Second). - SetBlockSize(3600 * time.Second) - nsOpts := namespaceOptions. - SetRetentionOptions(rOpts). - SetCleanupEnabled(true). - SetIndexOptions(namespace.NewIndexOptions(). - SetEnabled(true). - SetBlockSize(7200 * time.Second)) - - ns := NewMockdatabaseNamespace(ctrl) - ns.EXPECT().ID().Return(ident.StringID("ns")).AnyTimes() - ns.EXPECT().Options().Return(nsOpts).AnyTimes() - ns.EXPECT().NeedsFlush(gomock.Any(), gomock.Any()).Return(false, nil).AnyTimes() - ns.EXPECT().OwnedShards().Return(nil).AnyTimes() - - nses := []databaseNamespace{ns} - db := newMockdatabase(ctrl, ns) - db.EXPECT().OwnedNamespaces().Return(nses, nil).AnyTimes() - - mgr := newCleanupManager(db, newNoopFakeActiveLogs(), tally.NoopScope).(*cleanupManager) - require.NoError(t, cleanup(mgr, ts, false)) + require.NoError(t, cleanup(mgr, ts)) } // Test NS doesn't cleanup when flag is present @@ -423,7 +395,7 @@ func TestCleanupManagerDoesntNeedCleanup(t *testing.T) { return nil } - require.NoError(t, cleanup(mgr, ts, true)) + require.NoError(t, cleanup(mgr, ts)) } func TestCleanupDataAndSnapshotFileSetFiles(t *testing.T) { @@ -436,11 +408,15 @@ func TestCleanupDataAndSnapshotFileSetFiles(t *testing.T) { ns.EXPECT().Options().Return(nsOpts).AnyTimes() shard := NewMockdatabaseShard(ctrl) + shardNotBootstrapped := NewMockdatabaseShard(ctrl) + shardNotBootstrapped.EXPECT().IsBootstrapped().Return(false).AnyTimes() + shardNotBootstrapped.EXPECT().ID().Return(uint32(1)).AnyTimes() expectedEarliestToRetain := retention.FlushTimeStart(ns.Options().RetentionOptions(), ts) + shard.EXPECT().IsBootstrapped().Return(true).AnyTimes() shard.EXPECT().CleanupExpiredFileSets(expectedEarliestToRetain).Return(nil) shard.EXPECT().CleanupCompactedFileSets().Return(nil) shard.EXPECT().ID().Return(uint32(0)).AnyTimes() - ns.EXPECT().OwnedShards().Return([]databaseShard{shard}).AnyTimes() + 
ns.EXPECT().OwnedShards().Return([]databaseShard{shard, shardNotBootstrapped}).AnyTimes() ns.EXPECT().ID().Return(ident.StringID("nsID")).AnyTimes() ns.EXPECT().NeedsFlush(gomock.Any(), gomock.Any()).Return(false, nil).AnyTimes() namespaces := []databaseNamespace{ns} @@ -449,7 +425,7 @@ func TestCleanupDataAndSnapshotFileSetFiles(t *testing.T) { db.EXPECT().OwnedNamespaces().Return(namespaces, nil).AnyTimes() mgr := newCleanupManager(db, newNoopFakeActiveLogs(), tally.NoopScope).(*cleanupManager) - require.NoError(t, cleanup(mgr, ts, true)) + require.NoError(t, cleanup(mgr, ts)) } type deleteInactiveDirectoriesCall struct { @@ -469,6 +445,7 @@ func TestDeleteInactiveDataAndSnapshotFileSetFiles(t *testing.T) { shard := NewMockdatabaseShard(ctrl) shard.EXPECT().ID().Return(uint32(0)).AnyTimes() + shard.EXPECT().IsBootstrapped().Return(true).AnyTimes() ns.EXPECT().OwnedShards().Return([]databaseShard{shard}).AnyTimes() ns.EXPECT().ID().Return(ident.StringID("nsID")).AnyTimes() ns.EXPECT().NeedsFlush(gomock.Any(), gomock.Any()).Return(false, nil).AnyTimes() @@ -488,7 +465,7 @@ func TestDeleteInactiveDataAndSnapshotFileSetFiles(t *testing.T) { } mgr.deleteInactiveDirectoriesFn = deleteInactiveDirectoriesFn - require.NoError(t, cleanup(mgr, ts, true)) + require.NoError(t, cleanup(mgr, ts)) expectedCalls := []deleteInactiveDirectoriesCall{ deleteInactiveDirectoriesCall{ @@ -533,7 +510,7 @@ func TestCleanupManagerPropagatesOwnedNamespacesError(t *testing.T) { require.NoError(t, db.Open()) require.NoError(t, db.Terminate()) - require.Error(t, cleanup(mgr, ts, true)) + require.Error(t, cleanup(mgr, ts)) } func timeFor(s int64) time.Time { @@ -561,10 +538,9 @@ func newFakeActiveLogs(activeLogs persist.CommitLogFiles) fakeActiveLogs { func cleanup( mgr databaseCleanupManager, t time.Time, - isBootstrapped bool, ) error { multiErr := xerrors.NewMultiError() - multiErr = multiErr.Add(mgr.WarmFlushCleanup(t, isBootstrapped)) - multiErr = multiErr.Add(mgr.ColdFlushCleanup(t, 
isBootstrapped)) + multiErr = multiErr.Add(mgr.WarmFlushCleanup(t)) + multiErr = multiErr.Add(mgr.ColdFlushCleanup(t)) return multiErr.FinalError() } diff --git a/src/dbnode/storage/cluster/database.go b/src/dbnode/storage/cluster/database.go index 280d0e498c..0e082be647 100644 --- a/src/dbnode/storage/cluster/database.go +++ b/src/dbnode/storage/cluster/database.go @@ -30,8 +30,6 @@ import ( "github.com/m3db/m3/src/dbnode/sharding" "github.com/m3db/m3/src/dbnode/storage" "github.com/m3db/m3/src/dbnode/topology" - "github.com/m3db/m3/src/x/instrument" - "github.com/uber-go/tally" "go.uber.org/zap" ) @@ -408,14 +406,13 @@ func (d *clusterDB) analyzeAndReportShardStates() { for id := range d.initializing { count := d.bootstrapCount[id] if count != len(namespaces) { - // Should never happen if bootstrapped and durable. - instrument.EmitAndLogInvariantViolation(d.opts.InstrumentOptions(), func(l *zap.Logger) { - l.With( - zap.Uint32("shard", id), - zap.Int("count", count), - zap.Int("numNamespaces", len(namespaces)), - ).Error("database indicated that it was bootstrapped and durable, but number of bootstrapped shards did not match number of namespaces") - }) + // This could temporarily occur due to the race condition, e.g. database was bootstrapped and durable + // at the time we checked but then new shards were assigned which are still not bootstrapped. 
+ d.log.Debug("database indicated that it was bootstrapped and durable, "+ + "but number of bootstrapped shards did not match number of namespaces", + zap.Uint32("shard", id), + zap.Int("count", count), + zap.Int("numNamespaces", len(namespaces))) continue } diff --git a/src/dbnode/storage/coldflush.go b/src/dbnode/storage/coldflush.go index 9229b5de8b..a505c28748 100644 --- a/src/dbnode/storage/coldflush.go +++ b/src/dbnode/storage/coldflush.go @@ -101,12 +101,20 @@ func (m *coldFlushManager) Run(t time.Time) bool { m.status = fileOpInProgress m.Unlock() + defer func() { + m.Lock() + m.status = fileOpNotStarted + m.Unlock() + }() + + m.log.Debug("starting cold flush", zap.Time("time", t)) + // NB(xichen): perform data cleanup and flushing sequentially to minimize the impact of disk seeks. // NB(r): Use invariant here since flush errors were introduced // and not caught in CI or integration tests. // When an invariant occurs in CI tests it panics so as to fail // the build. - if err := m.ColdFlushCleanup(t, m.database.IsBootstrapped()); err != nil { + if err := m.ColdFlushCleanup(t); err != nil { instrument.EmitAndLogInvariantViolation(m.opts.InstrumentOptions(), func(l *zap.Logger) { l.Error("error when cleaning up cold flush data", zap.Time("time", t), zap.Error(err)) @@ -118,9 +126,7 @@ func (m *coldFlushManager) Run(t time.Time) bool { l.Error("error when cold flushing data", zap.Time("time", t), zap.Error(err)) }) } - m.Lock() - m.status = fileOpNotStarted - m.Unlock() + m.log.Debug("completed cold flush", zap.Time("time", t)) return true } diff --git a/src/dbnode/storage/coldflush_test.go b/src/dbnode/storage/coldflush_test.go index 55eb3d355c..a22bd05c34 100644 --- a/src/dbnode/storage/coldflush_test.go +++ b/src/dbnode/storage/coldflush_test.go @@ -28,6 +28,7 @@ import ( "github.com/golang/mock/gomock" "github.com/m3db/m3/src/dbnode/persist" + "github.com/m3db/m3/src/x/instrument" "github.com/stretchr/testify/require" ) @@ -116,3 +117,55 @@ func 
TestColdFlushManagerFlushDoneFlushError(t *testing.T) { require.EqualError(t, fakeErr, cfm.coldFlush().Error()) } + +func TestColdFlushManagerSkipRun(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + database := newMockdatabase(ctrl) + cfm := newColdFlushManager(database, nil, DefaultTestOptions()) + + database.EXPECT().IsBootstrapped().Return(false) + require.False(t, cfm.Run(time.Now())) +} + +func TestColdFlushManagerRunCleanupPanic(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + database := newMockdatabase(ctrl) + database.EXPECT().IsBootstrapped().Return(true) + + cm := NewMockdatabaseCleanupManager(ctrl) + cfm := newColdFlushManager(database, nil, DefaultTestOptions()) + mgr := cfm.(*coldFlushManager) + mgr.databaseCleanupManager = cm + + ts := time.Now() + gomock.InOrder( + cm.EXPECT().ColdFlushCleanup(ts).Return(errors.New("cleanup error")), + ) + + defer instrument.SetShouldPanicEnvironmentVariable(true)() + require.Panics(t, func() { mgr.Run(ts) }) +} + +func TestColdFlushManagerRunFlushPanic(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + database := newMockdatabase(ctrl) + database.EXPECT().IsBootstrapped().Return(true) + database.EXPECT().OwnedNamespaces().Return(nil, errors.New("flush error")) + + cm := NewMockdatabaseCleanupManager(ctrl) + cfm := newColdFlushManager(database, nil, DefaultTestOptions()) + mgr := cfm.(*coldFlushManager) + mgr.databaseCleanupManager = cm + + ts := time.Now() + gomock.InOrder( + cm.EXPECT().ColdFlushCleanup(ts).Return(nil), + ) + + defer instrument.SetShouldPanicEnvironmentVariable(true)() + require.Panics(t, func() { mgr.Run(ts) }) +} diff --git a/src/dbnode/storage/database.go b/src/dbnode/storage/database.go index 00e353d64e..96070adf8a 100644 --- a/src/dbnode/storage/database.go +++ b/src/dbnode/storage/database.go @@ -453,14 +453,38 @@ func (d *db) Options() Options { func (d *db) AssignShardSet(shardSet sharding.ShardSet) { d.Lock() - 
defer d.Unlock() - receivedNewShards := d.hasReceivedNewShardsWithLock(shardSet) - - d.shardSet = shardSet if receivedNewShards { d.lastReceivedNewShards = d.nowFn() } + d.Unlock() + + if !d.mediator.IsOpen() { + d.assignShardSet(shardSet) + return + } + + if err := d.mediator.EnqueueMutuallyExclusiveFn(func() { + d.assignShardSet(shardSet) + }); err != nil { + // should not happen. + instrument.EmitAndLogInvariantViolation(d.opts.InstrumentOptions(), + func(l *zap.Logger) { + l.Error("failed to enqueue assignShardSet fn", + zap.Error(err), + zap.Uint32s("shards", shardSet.AllIDs())) + }) + } +} + +func (d *db) assignShardSet(shardSet sharding.ShardSet) { + d.Lock() + defer d.Unlock() + + d.log.Info("assigning shards", zap.Uint32s("shards", shardSet.AllIDs())) + + receivedNewShards := d.hasReceivedNewShardsWithLock(shardSet) + d.shardSet = shardSet for _, elem := range d.namespaces.Iter() { ns := elem.Value() @@ -516,14 +540,10 @@ func (d *db) queueBootstrapWithLock() { // the non-clustered database bootstrapped by assigning it shardsets which will trigger new // bootstraps since d.bootstraps > 0 will be true. if d.bootstraps > 0 { - // NB(r): Trigger another bootstrap, if already bootstrapping this will - // enqueue a new bootstrap to execute before the current bootstrap - // completes. - go func() { - if result, err := d.mediator.Bootstrap(); err != nil && !result.AlreadyBootstrapping { - d.log.Error("error bootstrapping", zap.Error(err)) - } - }() + bootstrapAsyncResult := d.mediator.BootstrapEnqueue() + // NB(linasn): We need to wait for the bootstrap to start and set its state to Bootstrapping in order + // to safely run fileOps in mediator later. 
+ bootstrapAsyncResult.WaitForStart() } } diff --git a/src/dbnode/storage/database_test.go b/src/dbnode/storage/database_test.go index 418de14121..efe5f17b13 100644 --- a/src/dbnode/storage/database_test.go +++ b/src/dbnode/storage/database_test.go @@ -436,6 +436,11 @@ func TestDatabaseAssignShardSetBehaviorNoNewShards(t *testing.T) { // Set a mock mediator to be certain that bootstrap is not called when // no new shards are assigned. mediator := NewMockdatabaseMediator(ctrl) + mediator.EXPECT().IsOpen().Return(true) + mediator.EXPECT().EnqueueMutuallyExclusiveFn(gomock.Any()).DoAndReturn(func(fn func()) error { + fn() + return nil + }) d.mediator = mediator var ns []*MockdatabaseNamespace @@ -470,7 +475,14 @@ func TestDatabaseBootstrappedAssignShardSet(t *testing.T) { ns := dbAddNewMockNamespace(ctrl, d, "testns") mediator := NewMockdatabaseMediator(ctrl) - mediator.EXPECT().Bootstrap().Return(BootstrapResult{}, nil) + mediator.EXPECT().IsOpen().Return(true) + mediator.EXPECT().EnqueueMutuallyExclusiveFn(gomock.Any()).DoAndReturn(func(fn func()) error { + fn() + return nil + }) + mediator.EXPECT().Bootstrap().DoAndReturn(func() (BootstrapResult, error) { + return BootstrapResult{}, nil + }) d.mediator = mediator assert.NoError(t, d.Bootstrap()) @@ -484,8 +496,11 @@ func TestDatabaseBootstrappedAssignShardSet(t *testing.T) { var wg sync.WaitGroup wg.Add(1) - mediator.EXPECT().Bootstrap().Return(BootstrapResult{}, nil).Do(func() { + mediator.EXPECT().BootstrapEnqueue().DoAndReturn(func() *BootstrapAsyncResult { + asyncResult := newBootstrapAsyncResult() + asyncResult.bootstrapStarted = &wg wg.Done() + return asyncResult }) d.AssignShardSet(shardSet) @@ -493,6 +508,31 @@ func TestDatabaseBootstrappedAssignShardSet(t *testing.T) { wg.Wait() } +func TestDatabaseAssignShardSetShouldPanic(t *testing.T) { + ctrl := xtest.NewController(t) + defer ctrl.Finish() + + d, mapCh, _ := defaultTestDatabase(t, ctrl, Bootstrapped) + defer func() { + close(mapCh) + }() + + mediator 
:= NewMockdatabaseMediator(ctrl) + mediator.EXPECT().IsOpen().Return(true) + mediator.EXPECT().EnqueueMutuallyExclusiveFn(gomock.Any()).Return(errors.New("unknown error")) + d.mediator = mediator + + shards := append(sharding.NewShards([]uint32{0, 1}, shard.Available), + sharding.NewShards([]uint32{2}, shard.Initializing)...) + shardSet, err := sharding.NewShardSet(shards, nil) + require.NoError(t, err) + + defer instrument.SetShouldPanicEnvironmentVariable(true)() + require.Panics(t, func() { + d.AssignShardSet(shardSet) + }) +} + func TestDatabaseRemoveNamespace(t *testing.T) { ctrl := xtest.NewController(t) defer ctrl.Finish() diff --git a/src/dbnode/storage/fs.go b/src/dbnode/storage/fs.go index e6363f75f0..03e3ded37a 100644 --- a/src/dbnode/storage/fs.go +++ b/src/dbnode/storage/fs.go @@ -69,13 +69,6 @@ type fileOpState struct { NumFailures int } -type runType int - -const ( - syncRun runType = iota - asyncRun -) - type forceType int const ( @@ -139,47 +132,42 @@ func (m *fileSystemManager) Status() fileOpStatus { return status } -func (m *fileSystemManager) Run( - t time.Time, - runType runType, - forceType forceType, -) bool { +func (m *fileSystemManager) Run(t time.Time) bool { m.Lock() - if forceType == noForce && !m.shouldRunWithLock() { + if !m.shouldRunWithLock() { m.Unlock() return false } m.status = fileOpInProgress m.Unlock() + defer func() { + m.Lock() + m.status = fileOpNotStarted + m.Unlock() + }() + + m.log.Debug("starting warm flush", zap.Time("time", t)) + // NB(xichen): perform data cleanup and flushing sequentially to minimize the impact of disk seeks. - flushFn := func() { + if err := m.WarmFlushCleanup(t); err != nil { // NB(r): Use invariant here since flush errors were introduced // and not caught in CI or integration tests. // When an invariant occurs in CI tests it panics so as to fail // the build. 
- if err := m.WarmFlushCleanup(t, m.database.IsBootstrapped()); err != nil { - instrument.EmitAndLogInvariantViolation(m.opts.InstrumentOptions(), - func(l *zap.Logger) { - l.Error("error when cleaning up data", zap.Time("time", t), zap.Error(err)) - }) - } - if err := m.Flush(t); err != nil { - instrument.EmitAndLogInvariantViolation(m.opts.InstrumentOptions(), - func(l *zap.Logger) { - l.Error("error when flushing data", zap.Time("time", t), zap.Error(err)) - }) - } - m.Lock() - m.status = fileOpNotStarted - m.Unlock() + instrument.EmitAndLogInvariantViolation(m.opts.InstrumentOptions(), + func(l *zap.Logger) { + l.Error("error when cleaning up data", zap.Time("time", t), zap.Error(err)) + }) } - - if runType == syncRun { - flushFn() - } else { - go flushFn() + if err := m.Flush(t); err != nil { + instrument.EmitAndLogInvariantViolation(m.opts.InstrumentOptions(), + func(l *zap.Logger) { + l.Error("error when flushing data", zap.Time("time", t), zap.Error(err)) + }) } + m.log.Debug("completed warm flush", zap.Time("time", t)) + return true } diff --git a/src/dbnode/storage/fs_test.go b/src/dbnode/storage/fs_test.go index 9ae349bf1b..10186d1644 100644 --- a/src/dbnode/storage/fs_test.go +++ b/src/dbnode/storage/fs_test.go @@ -39,8 +39,9 @@ func TestFileSystemManagerShouldRunDuringBootstrap(t *testing.T) { fsm := newFileSystemManager(database, nil, DefaultTestOptions()) mgr := fsm.(*fileSystemManager) - database.EXPECT().IsBootstrapped().Return(false) + database.EXPECT().IsBootstrapped().Return(false).Times(2) require.False(t, mgr.shouldRunWithLock()) + require.False(t, mgr.Run(time.Now())) database.EXPECT().IsBootstrapped().Return(true) require.True(t, mgr.shouldRunWithLock()) @@ -72,7 +73,7 @@ func TestFileSystemManagerShouldRunEnableDisable(t *testing.T) { require.True(t, mgr.shouldRunWithLock()) } -func TestFileSystemManagerRun(t *testing.T) { +func TestFileSystemManagerRunCleanupPanic(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() database 
:= newMockdatabase(ctrl) @@ -87,9 +88,32 @@ func TestFileSystemManagerRun(t *testing.T) { ts := time.Now() gomock.InOrder( - cm.EXPECT().WarmFlushCleanup(ts, true).Return(errors.New("foo")), + cm.EXPECT().WarmFlushCleanup(ts).Return(errors.New("foo")), ) defer instrument.SetShouldPanicEnvironmentVariable(true)() - require.Panics(t, func() { mgr.Run(ts, syncRun, noForce) }) + require.Panics(t, func() { mgr.Run(ts) }) +} + +func TestFileSystemManagerRunFlushPanic(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + database := newMockdatabase(ctrl) + database.EXPECT().IsBootstrapped().Return(true).AnyTimes() + + fm := NewMockdatabaseFlushManager(ctrl) + cm := NewMockdatabaseCleanupManager(ctrl) + fsm := newFileSystemManager(database, nil, DefaultTestOptions()) + mgr := fsm.(*fileSystemManager) + mgr.databaseFlushManager = fm + mgr.databaseCleanupManager = cm + + ts := time.Now() + gomock.InOrder( + cm.EXPECT().WarmFlushCleanup(ts).Return(nil), + fm.EXPECT().Flush(ts).Return(errors.New("flush error")), + ) + + defer instrument.SetShouldPanicEnvironmentVariable(true)() + require.Panics(t, func() { mgr.Run(ts) }) } diff --git a/src/dbnode/storage/mediator.go b/src/dbnode/storage/mediator.go index efe70159e7..974a1298c7 100644 --- a/src/dbnode/storage/mediator.go +++ b/src/dbnode/storage/mediator.go @@ -39,7 +39,8 @@ type ( ) const ( - fileOpCheckInterval = time.Second + fileOpCheckInterval = time.Second + defaultExternalChannelSize = 8 mediatorNotOpen mediatorState = iota mediatorOpen @@ -85,6 +86,7 @@ type mediator struct { mediatorTimeBarrier mediatorTimeBarrier closedCh chan struct{} tickInterval time.Duration + fileOpsProcesses []FileOpsProcess backgroundProcesses []BackgroundProcess } @@ -97,19 +99,22 @@ func newMediator(database database, commitlog commitlog.CommitLog, opts Options) nowFn = opts.ClockOptions().NowFn() ) d := &mediator{ - database: database, - opts: opts, - nowFn: opts.ClockOptions().NowFn(), - sleepFn: time.Sleep, - metrics: 
newMediatorMetrics(scope), - state: mediatorNotOpen, - mediatorTimeBarrier: newMediatorTimeBarrier(nowFn, iOpts), - closedCh: make(chan struct{}), - tickInterval: opts.MediatorTickInterval(), + database: database, + opts: opts, + nowFn: opts.ClockOptions().NowFn(), + sleepFn: time.Sleep, + metrics: newMediatorMetrics(scope), + state: mediatorNotOpen, + closedCh: make(chan struct{}), + tickInterval: opts.MediatorTickInterval(), } - fsm := newFileSystemManager(database, commitlog, opts) d.databaseFileSystemManager = fsm + d.fileOpsProcesses = []FileOpsProcess{ + FileOpsProcessFn(d.ongoingFileSystemProcesses), + FileOpsProcessFn(d.ongoingColdFlushProcesses), + } + d.mediatorTimeBarrier = newMediatorTimeBarrier(nowFn, iOpts, len(d.fileOpsProcesses)) // NB(bodu): Cold flush needs its own persist manager now // that its running in its own thread. @@ -138,6 +143,18 @@ func (m *mediator) RegisterBackgroundProcess(process BackgroundProcess) error { return nil } +func (m *mediator) EnqueueMutuallyExclusiveFn(fn func()) error { + m.RLock() + if m.state != mediatorOpen { + m.RUnlock() + return errMediatorNotOpen + } + m.RUnlock() + + m.mediatorTimeBarrier.externalFnCh <- fn + return nil +} + func (m *mediator) Open() error { m.Lock() defer m.Unlock() @@ -147,8 +164,9 @@ func (m *mediator) Open() error { m.state = mediatorOpen go m.reportLoop() - go m.ongoingFileSystemProcesses() - go m.ongoingColdFlushProcesses() + for _, fileOpsProcess := range m.fileOpsProcesses { + go fileOpsProcess.Start() + } go m.ongoingTick() for _, process := range m.backgroundProcesses { @@ -232,7 +250,7 @@ func (m *mediator) ongoingFileSystemProcesses() { m.sleepFn(m.tickInterval) // Check if the mediator is already closed. - if !m.isOpen() { + if !m.IsOpen() { return } @@ -253,7 +271,7 @@ func (m *mediator) ongoingColdFlushProcesses() { m.sleepFn(m.tickInterval) // Check if the mediator is already closed. 
- if !m.isOpen() { + if !m.IsOpen() { return } @@ -275,7 +293,7 @@ func (m *mediator) ongoingTick() { m.sleepFn(m.tickInterval) // Check if the mediator is already closed. - if !m.isOpen() { + if !m.IsOpen() { return } @@ -305,7 +323,7 @@ func (m *mediator) runFileSystemProcesses() { return } - m.databaseFileSystemManager.Run(mediatorTime, syncRun, noForce) + m.databaseFileSystemManager.Run(mediatorTime) } func (m *mediator) runColdFlushProcesses() { @@ -335,7 +353,7 @@ func (m *mediator) reportLoop() { } } -func (m *mediator) isOpen() bool { +func (m *mediator) IsOpen() bool { m.RLock() defer m.RUnlock() return m.state == mediatorOpen @@ -413,10 +431,12 @@ type mediatorTimeBarrier struct { // by the mutex. mediatorTime time.Time numFsProcessesWaiting int + numMaxWaiters int - nowFn func() time.Time - iOpts instrument.Options - releaseCh chan time.Time + nowFn func() time.Time + iOpts instrument.Options + releaseCh chan time.Time + externalFnCh chan func() } // initialMediatorTime should only be used to obtain the initial time for @@ -468,17 +488,34 @@ func (b *mediatorTimeBarrier) maybeRelease() (time.Time, error) { } b.mediatorTime = newMediatorTime + + // If all waiters are waiting, we can safely call mutually exclusive external functions. + if numWaiters == b.numMaxWaiters { + // Drain the channel. 
+ Loop: + for { + select { + case fn := <-b.externalFnCh: + fn() + default: + break Loop + } + } + } + for i := 0; i < numWaiters; i++ { b.releaseCh <- b.mediatorTime } return b.mediatorTime, nil } -func newMediatorTimeBarrier(nowFn func() time.Time, iOpts instrument.Options) mediatorTimeBarrier { +func newMediatorTimeBarrier(nowFn func() time.Time, iOpts instrument.Options, maxWaiters int) mediatorTimeBarrier { return mediatorTimeBarrier{ - mediatorTime: nowFn(), - nowFn: nowFn, - iOpts: iOpts, - releaseCh: make(chan time.Time, 0), + mediatorTime: nowFn(), + nowFn: nowFn, + iOpts: iOpts, + numMaxWaiters: maxWaiters, + releaseCh: make(chan time.Time), + externalFnCh: make(chan func(), defaultExternalChannelSize), } } diff --git a/src/dbnode/storage/mediator_test.go b/src/dbnode/storage/mediator_test.go index 53807529fe..10b24fdab9 100644 --- a/src/dbnode/storage/mediator_test.go +++ b/src/dbnode/storage/mediator_test.go @@ -21,6 +21,7 @@ package storage import ( + "sync" "testing" "time" @@ -116,3 +117,52 @@ func TestDatabaseMediatorDisableFileOpsAndWait(t *testing.T) { m.DisableFileOpsAndWait() require.Equal(t, 3, len(slept)) } + +func TestDatabaseMediatorEnqueueMutuallyExclusiveFnAndExecute(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + opts := DefaultTestOptions() + opts = opts. + SetMediatorTickInterval(time.Millisecond * 100). 
+ SetBootstrapProcessProvider(nil) + + db := NewMockdatabase(ctrl) + db.EXPECT().Options().Return(opts).AnyTimes() + db.EXPECT().IsBootstrapped().Return(true).AnyTimes() + db.EXPECT().IsBootstrappedAndDurable().Return(true).AnyTimes() + + med, err := newMediator(db, nil, opts) + require.NoError(t, err) + m := med.(*mediator) + + tm := NewMockdatabaseTickManager(ctrl) + tm.EXPECT().Tick(force, gomock.Any()).Return(nil).AnyTimes() + m.databaseTickManager = tm + fsm := NewMockdatabaseFileSystemManager(ctrl) + fsm.EXPECT().Run(gomock.Any()).Return(true).AnyTimes() + fsm.EXPECT().Report().AnyTimes() + m.databaseFileSystemManager = fsm + cfm := NewMockdatabaseColdFlushManager(ctrl) + cfm.EXPECT().Run(gomock.Any()).Return(true).AnyTimes() + cfm.EXPECT().Report().AnyTimes() + m.databaseColdFlushManager = cfm + + require.NoError(t, med.Open()) + defer func() { + require.NoError(t, med.Close()) + }() + + fsm.EXPECT().Status().Return(fileOpNotStarted).AnyTimes() + cfm.EXPECT().Status().Return(fileOpNotStarted).AnyTimes() + + var wg sync.WaitGroup + wg.Add(1) + require.NoError(t, med.EnqueueMutuallyExclusiveFn(func() { + defer wg.Done() + require.Equal(t, fileOpNotStarted, m.databaseFileSystemManager.Status()) + require.Equal(t, fileOpNotStarted, m.databaseColdFlushManager.Status()) + })) + + wg.Wait() +} diff --git a/src/dbnode/storage/namespace.go b/src/dbnode/storage/namespace.go index c5a7d8b6f4..ac05a4acb0 100644 --- a/src/dbnode/storage/namespace.go +++ b/src/dbnode/storage/namespace.go @@ -491,9 +491,10 @@ func (n *dbNamespace) assignShardSet( opts assignShardSetOptions, ) { var ( - incoming = make(map[uint32]struct{}, len(shardSet.All())) - existing []databaseShard - closing []databaseShard + incoming = make(map[uint32]struct{}, len(shardSet.All())) + createdShardIds []uint32 + existing []databaseShard + closing []databaseShard ) for _, shard := range shardSet.AllIDs() { incoming[shard] = struct{}{} @@ -525,6 +526,7 @@ func (n *dbNamespace) assignShardSet( 
n.shards[shard] = newDatabaseShard(metadata, shard, n.blockRetriever, n.namespaceReaderMgr, n.increasingIndex, n.reverseIndex, opts.needsBootstrap, n.opts, n.seriesOpts) + createdShardIds = append(createdShardIds, shard) // NB(bodu): We only record shard add metrics for shards created in non // initial assignments. if !opts.initialAssignment { @@ -532,6 +534,14 @@ func (n *dbNamespace) assignShardSet( } } + if len(createdShardIds) > 0 { + n.log.Info("created new shards", + zap.Stringer("namespace", n.ID()), + zap.Uint32s("shards", createdShardIds), + zap.Bool("initialAssignment", opts.initialAssignment), + zap.Bool("needsBootstrap", opts.needsBootstrap)) + } + if idx := n.reverseIndex; idx != nil { idx.AssignShardSet(shardSet) } @@ -1045,6 +1055,9 @@ func (n *dbNamespace) Bootstrap( } if !bootstrapped { // NB(r): Not bootstrapped in this bootstrap run. + n.log.Debug("skipping already bootstrapped shard", + zap.Uint32("shard", shardID), + zap.Stringer("namespace", n.id)) continue } diff --git a/src/dbnode/storage/storage_mock.go b/src/dbnode/storage/storage_mock.go index 4052e4f7cf..3337110eaa 100644 --- a/src/dbnode/storage/storage_mock.go +++ b/src/dbnode/storage/storage_mock.go @@ -2855,6 +2855,20 @@ func (mr *MockdatabaseBootstrapManagerMockRecorder) Bootstrap() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Bootstrap", reflect.TypeOf((*MockdatabaseBootstrapManager)(nil).Bootstrap)) } +// BootstrapEnqueue mocks base method +func (m *MockdatabaseBootstrapManager) BootstrapEnqueue() *BootstrapAsyncResult { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "BootstrapEnqueue") + ret0, _ := ret[0].(*BootstrapAsyncResult) + return ret0 +} + +// BootstrapEnqueue indicates an expected call of BootstrapEnqueue +func (mr *MockdatabaseBootstrapManagerMockRecorder) BootstrapEnqueue() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BootstrapEnqueue", 
reflect.TypeOf((*MockdatabaseBootstrapManager)(nil).BootstrapEnqueue)) +} + // Report mocks base method func (m *MockdatabaseBootstrapManager) Report() { m.ctrl.T.Helper() @@ -2955,31 +2969,31 @@ func (m *MockdatabaseCleanupManager) EXPECT() *MockdatabaseCleanupManagerMockRec } // WarmFlushCleanup mocks base method -func (m *MockdatabaseCleanupManager) WarmFlushCleanup(t time.Time, isBootstrapped bool) error { +func (m *MockdatabaseCleanupManager) WarmFlushCleanup(t time.Time) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "WarmFlushCleanup", t, isBootstrapped) + ret := m.ctrl.Call(m, "WarmFlushCleanup", t) ret0, _ := ret[0].(error) return ret0 } // WarmFlushCleanup indicates an expected call of WarmFlushCleanup -func (mr *MockdatabaseCleanupManagerMockRecorder) WarmFlushCleanup(t, isBootstrapped interface{}) *gomock.Call { +func (mr *MockdatabaseCleanupManagerMockRecorder) WarmFlushCleanup(t interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WarmFlushCleanup", reflect.TypeOf((*MockdatabaseCleanupManager)(nil).WarmFlushCleanup), t, isBootstrapped) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WarmFlushCleanup", reflect.TypeOf((*MockdatabaseCleanupManager)(nil).WarmFlushCleanup), t) } // ColdFlushCleanup mocks base method -func (m *MockdatabaseCleanupManager) ColdFlushCleanup(t time.Time, isBootstrapped bool) error { +func (m *MockdatabaseCleanupManager) ColdFlushCleanup(t time.Time) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ColdFlushCleanup", t, isBootstrapped) + ret := m.ctrl.Call(m, "ColdFlushCleanup", t) ret0, _ := ret[0].(error) return ret0 } // ColdFlushCleanup indicates an expected call of ColdFlushCleanup -func (mr *MockdatabaseCleanupManagerMockRecorder) ColdFlushCleanup(t, isBootstrapped interface{}) *gomock.Call { +func (mr *MockdatabaseCleanupManagerMockRecorder) ColdFlushCleanup(t interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return 
mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ColdFlushCleanup", reflect.TypeOf((*MockdatabaseCleanupManager)(nil).ColdFlushCleanup), t, isBootstrapped) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ColdFlushCleanup", reflect.TypeOf((*MockdatabaseCleanupManager)(nil).ColdFlushCleanup), t) } // Report mocks base method @@ -3074,17 +3088,17 @@ func (mr *MockdatabaseFileSystemManagerMockRecorder) Status() *gomock.Call { } // Run mocks base method -func (m *MockdatabaseFileSystemManager) Run(t time.Time, runType runType, forceType forceType) bool { +func (m *MockdatabaseFileSystemManager) Run(t time.Time) bool { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Run", t, runType, forceType) + ret := m.ctrl.Call(m, "Run", t) ret0, _ := ret[0].(bool) return ret0 } // Run indicates an expected call of Run -func (mr *MockdatabaseFileSystemManagerMockRecorder) Run(t, runType, forceType interface{}) *gomock.Call { +func (mr *MockdatabaseFileSystemManagerMockRecorder) Run(t interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Run", reflect.TypeOf((*MockdatabaseFileSystemManager)(nil).Run), t, runType, forceType) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Run", reflect.TypeOf((*MockdatabaseFileSystemManager)(nil).Run), t) } // Report mocks base method @@ -3138,31 +3152,31 @@ func (m *MockdatabaseColdFlushManager) EXPECT() *MockdatabaseColdFlushManagerMoc } // WarmFlushCleanup mocks base method -func (m *MockdatabaseColdFlushManager) WarmFlushCleanup(t time.Time, isBootstrapped bool) error { +func (m *MockdatabaseColdFlushManager) WarmFlushCleanup(t time.Time) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "WarmFlushCleanup", t, isBootstrapped) + ret := m.ctrl.Call(m, "WarmFlushCleanup", t) ret0, _ := ret[0].(error) return ret0 } // WarmFlushCleanup indicates an expected call of WarmFlushCleanup -func (mr *MockdatabaseColdFlushManagerMockRecorder) WarmFlushCleanup(t, isBootstrapped interface{}) 
*gomock.Call { +func (mr *MockdatabaseColdFlushManagerMockRecorder) WarmFlushCleanup(t interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WarmFlushCleanup", reflect.TypeOf((*MockdatabaseColdFlushManager)(nil).WarmFlushCleanup), t, isBootstrapped) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WarmFlushCleanup", reflect.TypeOf((*MockdatabaseColdFlushManager)(nil).WarmFlushCleanup), t) } // ColdFlushCleanup mocks base method -func (m *MockdatabaseColdFlushManager) ColdFlushCleanup(t time.Time, isBootstrapped bool) error { +func (m *MockdatabaseColdFlushManager) ColdFlushCleanup(t time.Time) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ColdFlushCleanup", t, isBootstrapped) + ret := m.ctrl.Call(m, "ColdFlushCleanup", t) ret0, _ := ret[0].(error) return ret0 } // ColdFlushCleanup indicates an expected call of ColdFlushCleanup -func (mr *MockdatabaseColdFlushManagerMockRecorder) ColdFlushCleanup(t, isBootstrapped interface{}) *gomock.Call { +func (mr *MockdatabaseColdFlushManagerMockRecorder) ColdFlushCleanup(t interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ColdFlushCleanup", reflect.TypeOf((*MockdatabaseColdFlushManager)(nil).ColdFlushCleanup), t, isBootstrapped) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ColdFlushCleanup", reflect.TypeOf((*MockdatabaseColdFlushManager)(nil).ColdFlushCleanup), t) } // Report mocks base method @@ -3344,6 +3358,41 @@ func (mr *MockBackgroundProcessMockRecorder) Report() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Report", reflect.TypeOf((*MockBackgroundProcess)(nil).Report)) } +// MockFileOpsProcess is a mock of FileOpsProcess interface +type MockFileOpsProcess struct { + ctrl *gomock.Controller + recorder *MockFileOpsProcessMockRecorder +} + +// MockFileOpsProcessMockRecorder is the mock recorder for MockFileOpsProcess +type MockFileOpsProcessMockRecorder struct { 
+ mock *MockFileOpsProcess +} + +// NewMockFileOpsProcess creates a new mock instance +func NewMockFileOpsProcess(ctrl *gomock.Controller) *MockFileOpsProcess { + mock := &MockFileOpsProcess{ctrl: ctrl} + mock.recorder = &MockFileOpsProcessMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockFileOpsProcess) EXPECT() *MockFileOpsProcessMockRecorder { + return m.recorder +} + +// Start mocks base method +func (m *MockFileOpsProcess) Start() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Start") +} + +// Start indicates an expected call of Start +func (mr *MockFileOpsProcessMockRecorder) Start() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Start", reflect.TypeOf((*MockFileOpsProcess)(nil).Start)) +} + // MockdatabaseRepairer is a mock of databaseRepairer interface type MockdatabaseRepairer struct { ctrl *gomock.Controller @@ -3519,6 +3568,20 @@ func (mr *MockdatabaseMediatorMockRecorder) IsBootstrapped() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsBootstrapped", reflect.TypeOf((*MockdatabaseMediator)(nil).IsBootstrapped)) } +// IsOpen mocks base method +func (m *MockdatabaseMediator) IsOpen() bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "IsOpen") + ret0, _ := ret[0].(bool) + return ret0 +} + +// IsOpen indicates an expected call of IsOpen +func (mr *MockdatabaseMediatorMockRecorder) IsOpen() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsOpen", reflect.TypeOf((*MockdatabaseMediator)(nil).IsOpen)) +} + // LastBootstrapCompletionTime mocks base method func (m *MockdatabaseMediator) LastBootstrapCompletionTime() (time0.UnixNano, bool) { m.ctrl.T.Helper() @@ -3549,6 +3612,20 @@ func (mr *MockdatabaseMediatorMockRecorder) Bootstrap() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Bootstrap", 
reflect.TypeOf((*MockdatabaseMediator)(nil).Bootstrap)) } +// BootstrapEnqueue mocks base method +func (m *MockdatabaseMediator) BootstrapEnqueue() *BootstrapAsyncResult { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "BootstrapEnqueue") + ret0, _ := ret[0].(*BootstrapAsyncResult) + return ret0 +} + +// BootstrapEnqueue indicates an expected call of BootstrapEnqueue +func (mr *MockdatabaseMediatorMockRecorder) BootstrapEnqueue() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BootstrapEnqueue", reflect.TypeOf((*MockdatabaseMediator)(nil).BootstrapEnqueue)) +} + // DisableFileOpsAndWait mocks base method func (m *MockdatabaseMediator) DisableFileOpsAndWait() { m.ctrl.T.Helper() @@ -3628,6 +3705,20 @@ func (mr *MockdatabaseMediatorMockRecorder) LastSuccessfulSnapshotStartTime() *g return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LastSuccessfulSnapshotStartTime", reflect.TypeOf((*MockdatabaseMediator)(nil).LastSuccessfulSnapshotStartTime)) } +// EnqueueMutuallyExclusiveFn mocks base method +func (m *MockdatabaseMediator) EnqueueMutuallyExclusiveFn(fn func()) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "EnqueueMutuallyExclusiveFn", fn) + ret0, _ := ret[0].(error) + return ret0 +} + +// EnqueueMutuallyExclusiveFn indicates an expected call of EnqueueMutuallyExclusiveFn +func (mr *MockdatabaseMediatorMockRecorder) EnqueueMutuallyExclusiveFn(fn interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EnqueueMutuallyExclusiveFn", reflect.TypeOf((*MockdatabaseMediator)(nil).EnqueueMutuallyExclusiveFn), fn) +} + // MockColdFlushNsOpts is a mock of ColdFlushNsOpts interface type MockColdFlushNsOpts struct { ctrl *gomock.Controller diff --git a/src/dbnode/storage/types.go b/src/dbnode/storage/types.go index eca40f3f59..266d0d0723 100644 --- a/src/dbnode/storage/types.go +++ b/src/dbnode/storage/types.go @@ -822,6 +822,9 @@ type databaseBootstrapManager interface 
{ // Bootstrap performs bootstrapping for all namespaces and shards owned. Bootstrap() (BootstrapResult, error) + // BootstrapEnqueue performs bootstrapping asynchronously for all namespaces and shards owned. + BootstrapEnqueue() *BootstrapAsyncResult + // Report reports runtime information. Report() } @@ -832,6 +835,37 @@ type BootstrapResult struct { AlreadyBootstrapping bool } +// BootstrapAsyncResult is a bootstrap async result. +type BootstrapAsyncResult struct { + bootstrapStarted *sync.WaitGroup + bootstrapCompleted *sync.WaitGroup + bootstrapResult BootstrapResult +} + +func newBootstrapAsyncResult() *BootstrapAsyncResult { + var ( + wgStarted sync.WaitGroup + wgCompleted sync.WaitGroup + ) + wgStarted.Add(1) + wgCompleted.Add(1) + return &BootstrapAsyncResult{ + bootstrapStarted: &wgStarted, + bootstrapCompleted: &wgCompleted, + } +} + +// Result will wait for bootstrap to complete and return BootstrapResult. +func (b *BootstrapAsyncResult) Result() BootstrapResult { + b.bootstrapCompleted.Wait() + return b.bootstrapResult +} + +// WaitForStart waits until bootstrap has been started. +func (b *BootstrapAsyncResult) WaitForStart() { + b.bootstrapStarted.Wait() +} + // databaseFlushManager manages flushing in-memory data to persistent storage. type databaseFlushManager interface { // Flush flushes in-memory data to persistent storage. @@ -850,10 +884,10 @@ type databaseFlushManager interface { // and cleaning up certain types of data concurrently w/ either can be problematic. type databaseCleanupManager interface { // WarmFlushCleanup cleans up data not needed in the persistent storage before a warm flush. - WarmFlushCleanup(t time.Time, isBootstrapped bool) error + WarmFlushCleanup(t time.Time) error // ColdFlushCleanup cleans up data not needed in the persistent storage before a cold flush. - ColdFlushCleanup(t time.Time, isBootstrapped bool) error + ColdFlushCleanup(t time.Time) error // Report reports runtime information. 
Report() @@ -876,11 +910,7 @@ type databaseFileSystemManager interface { // Run attempts to perform all filesystem-related operations, // returning true if those operations are performed, and false otherwise. - Run( - t time.Time, - runType runType, - forceType forceType, - ) bool + Run(t time.Time) bool // Report reports runtime information. Report() @@ -936,6 +966,21 @@ type BackgroundProcess interface { Report() } +// FileOpsProcess is a background process that is run by the database. +type FileOpsProcess interface { + // Start launches the FileOpsProcess to run asynchronously. + Start() +} + +// FileOpsProcessFn is a file ops process function. +type FileOpsProcessFn func() + +// Start starts file ops process function. +func (f FileOpsProcessFn) Start() { + // delegate to the anonymous function. + f() +} + // databaseRepairer repairs in-memory database data. type databaseRepairer interface { BackgroundProcess @@ -963,6 +1008,9 @@ type databaseMediator interface { // IsBootstrapped returns whether the database is bootstrapped. IsBootstrapped() bool + // IsOpen returns whether mediator is opened. + IsOpen() bool + // LastBootstrapCompletionTime returns the last bootstrap completion time, // if any. LastBootstrapCompletionTime() (xtime.UnixNano, bool) @@ -970,6 +1018,9 @@ type databaseMediator interface { // Bootstrap bootstraps the database with file operations performed at the end. Bootstrap() (BootstrapResult, error) + // BootstrapEnqueue bootstraps the database asynchronously with file operations performed at the end. + BootstrapEnqueue() *BootstrapAsyncResult + // DisableFileOpsAndWait disables file operations. DisableFileOpsAndWait() @@ -988,6 +1039,10 @@ type databaseMediator interface { // LastSuccessfulSnapshotStartTime returns the start time of the last // successful snapshot, if any. 
LastSuccessfulSnapshotStartTime() (xtime.UnixNano, bool) + + // EnqueueMutuallyExclusiveFn enqueues function to be executed mutually exclusively, + // when file operations are idle. + EnqueueMutuallyExclusiveFn(fn func()) error } // ColdFlushNsOpts are options for OnColdFlush.ColdFlushNamespace.