From 4fe5430d6b11acc8dda934eb658e8dd1e52b8198 Mon Sep 17 00:00:00 2001 From: Linas Naginionis Date: Tue, 30 Mar 2021 11:51:02 +0300 Subject: [PATCH 01/18] use `IsBootstrappedAndDurable()` instead of `IsBootstrapped()` otherwise warm and cold flushes might fail because some shards might still be not bootstrapped. --- src/dbnode/storage/coldflush.go | 4 ++-- src/dbnode/storage/coldflush_test.go | 2 +- src/dbnode/storage/fs.go | 4 ++-- src/dbnode/storage/fs_test.go | 10 +++++----- 4 files changed, 10 insertions(+), 10 deletions(-) diff --git a/src/dbnode/storage/coldflush.go b/src/dbnode/storage/coldflush.go index 9229b5de8b..319ab41693 100644 --- a/src/dbnode/storage/coldflush.go +++ b/src/dbnode/storage/coldflush.go @@ -106,7 +106,7 @@ func (m *coldFlushManager) Run(t time.Time) bool { // and not caught in CI or integration tests. // When an invariant occurs in CI tests it panics so as to fail // the build. - if err := m.ColdFlushCleanup(t, m.database.IsBootstrapped()); err != nil { + if err := m.ColdFlushCleanup(t, m.database.IsBootstrappedAndDurable()); err != nil { instrument.EmitAndLogInvariantViolation(m.opts.InstrumentOptions(), func(l *zap.Logger) { l.Error("error when cleaning up cold flush data", zap.Time("time", t), zap.Error(err)) @@ -195,7 +195,7 @@ func (m *coldFlushManager) Report() { } func (m *coldFlushManager) shouldRunWithLock() bool { - return m.enabled && m.status != fileOpInProgress && m.database.IsBootstrapped() + return m.enabled && m.status != fileOpInProgress && m.database.IsBootstrappedAndDurable() } type coldFlushNsOpts struct { diff --git a/src/dbnode/storage/coldflush_test.go b/src/dbnode/storage/coldflush_test.go index 55eb3d355c..a6863b9750 100644 --- a/src/dbnode/storage/coldflush_test.go +++ b/src/dbnode/storage/coldflush_test.go @@ -57,7 +57,7 @@ func TestColdFlushManagerFlushAlreadyInProgress(t *testing.T) { testOpts := DefaultTestOptions().SetPersistManager(mockPersistManager) db := newMockdatabase(ctrl) db.EXPECT().Options().Return(testOpts).AnyTimes() - db.EXPECT().IsBootstrapped().Return(true).AnyTimes() + db.EXPECT().IsBootstrappedAndDurable().Return(true).AnyTimes() db.EXPECT().OwnedNamespaces().Return(nil, nil).AnyTimes() cfm := newColdFlushManager(db, mockPersistManager, testOpts).(*coldFlushManager) diff --git a/src/dbnode/storage/fs.go b/src/dbnode/storage/fs.go index e6363f75f0..3286cfe68b 100644 --- a/src/dbnode/storage/fs.go +++ b/src/dbnode/storage/fs.go @@ -158,7 +158,7 @@ func (m *fileSystemManager) Run( // and not caught in CI or integration tests. // When an invariant occurs in CI tests it panics so as to fail // the build. - if err := m.WarmFlushCleanup(t, m.database.IsBootstrapped()); err != nil { + if err := m.WarmFlushCleanup(t, m.database.IsBootstrappedAndDurable()); err != nil { instrument.EmitAndLogInvariantViolation(m.opts.InstrumentOptions(), func(l *zap.Logger) { l.Error("error when cleaning up data", zap.Time("time", t), zap.Error(err)) @@ -189,5 +189,5 @@ func (m *fileSystemManager) Report() { } func (m *fileSystemManager) shouldRunWithLock() bool { - return m.enabled && m.status != fileOpInProgress && m.database.IsBootstrapped() + return m.enabled && m.status != fileOpInProgress && m.database.IsBootstrappedAndDurable() } diff --git a/src/dbnode/storage/fs_test.go b/src/dbnode/storage/fs_test.go index 9ae349bf1b..1e601b2089 100644 --- a/src/dbnode/storage/fs_test.go +++ b/src/dbnode/storage/fs_test.go @@ -39,10 +39,10 @@ func TestFileSystemManagerShouldRunDuringBootstrap(t *testing.T) { fsm := newFileSystemManager(database, nil, DefaultTestOptions()) mgr := fsm.(*fileSystemManager) - database.EXPECT().IsBootstrapped().Return(false) + database.EXPECT().IsBootstrappedAndDurable().Return(false) require.False(t, mgr.shouldRunWithLock()) - database.EXPECT().IsBootstrapped().Return(true) + database.EXPECT().IsBootstrappedAndDurable().Return(true) require.True(t, mgr.shouldRunWithLock()) } @@ -52,7 +52,7 @@ func TestFileSystemManagerShouldRunWhileRunning(t *testing.T) { database := newMockdatabase(ctrl) fsm := newFileSystemManager(database, nil, DefaultTestOptions()) mgr := fsm.(*fileSystemManager) - database.EXPECT().IsBootstrapped().Return(true) + database.EXPECT().IsBootstrappedAndDurable().Return(true) require.True(t, mgr.shouldRunWithLock()) mgr.status = fileOpInProgress require.False(t, mgr.shouldRunWithLock()) @@ -64,7 +64,7 @@ func TestFileSystemManagerShouldRunEnableDisable(t *testing.T) { database := newMockdatabase(ctrl) fsm := newFileSystemManager(database, nil, DefaultTestOptions()) mgr := fsm.(*fileSystemManager) - database.EXPECT().IsBootstrapped().Return(true).AnyTimes() + database.EXPECT().IsBootstrappedAndDurable().Return(true).AnyTimes() require.True(t, mgr.shouldRunWithLock()) require.NotEqual(t, fileOpInProgress, mgr.Disable()) require.False(t, mgr.shouldRunWithLock()) @@ -76,7 +76,7 @@ func TestFileSystemManagerRun(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() database := newMockdatabase(ctrl) - database.EXPECT().IsBootstrapped().Return(true).AnyTimes() + database.EXPECT().IsBootstrappedAndDurable().Return(true).AnyTimes() fm := NewMockdatabaseFlushManager(ctrl) cm := NewMockdatabaseCleanupManager(ctrl) From b4ac31a5f6e082d72fccbefcd58e34a2ff2c6ea2 Mon Sep 17 00:00:00 2001 From: Linas Naginionis Date: Thu, 1 Apr 2021 16:16:51 +0300 Subject: [PATCH 02/18] do not run file ops (cold and warm flush) when new shards are being assigned. check for bootstrapped shards when doing cold flush cleanups. --- src/dbnode/storage/cleanup.go | 9 +++++++++ src/dbnode/storage/cleanup_test.go | 3 +++ src/dbnode/storage/coldflush.go | 4 ++-- src/dbnode/storage/coldflush_test.go | 2 +- src/dbnode/storage/database.go | 14 +++++++++++--- src/dbnode/storage/database_test.go | 1 + src/dbnode/storage/fs.go | 4 ++-- src/dbnode/storage/fs_test.go | 10 +++++----- src/dbnode/storage/namespace.go | 3 +++ 9 files changed, 37 insertions(+), 13 deletions(-) diff --git a/src/dbnode/storage/cleanup.go b/src/dbnode/storage/cleanup.go index 96dde48304..4060e36d19 100644 --- a/src/dbnode/storage/cleanup.go +++ b/src/dbnode/storage/cleanup.go @@ -263,6 +263,9 @@ func (m *cleanupManager) deleteInactiveDataFileSetFiles(filesetFilesDirPathFn fu var activeShards []string namespaceDirPath := filesetFilesDirPathFn(filePathPrefix, n.ID()) for _, s := range n.OwnedShards() { + if !s.IsBootstrapped() { + continue + } shard := fmt.Sprintf("%d", s.ID()) activeShards = append(activeShards, shard) } @@ -321,6 +324,9 @@ func (m *cleanupManager) cleanupDuplicateIndexFiles(namespaces []databaseNamespa func (m *cleanupManager) cleanupExpiredNamespaceDataFiles(earliestToRetain time.Time, shards []databaseShard) error { multiErr := xerrors.NewMultiError() for _, shard := range shards { + if !shard.IsBootstrapped() { + continue + } if err := shard.CleanupExpiredFileSets(earliestToRetain); err != nil { multiErr = multiErr.Add(err) } @@ -332,6 +338,9 @@ func (m *cleanupManager) cleanupExpiredNamespaceDataFiles(earliestToRetain time. func (m *cleanupManager) cleanupCompactedNamespaceDataFiles(shards []databaseShard) error { multiErr := xerrors.NewMultiError() for _, shard := range shards { + if !shard.IsBootstrapped() { + continue + } if err := shard.CleanupCompactedFileSets(); err != nil { multiErr = multiErr.Add(err) } diff --git a/src/dbnode/storage/cleanup_test.go b/src/dbnode/storage/cleanup_test.go index acc15dc251..578773d05c 100644 --- a/src/dbnode/storage/cleanup_test.go +++ b/src/dbnode/storage/cleanup_test.go @@ -286,6 +286,7 @@ func TestCleanupManagerCleanupCommitlogsAndSnapshots(t *testing.T) { for i := 0; i < 3; i++ { shard := NewMockdatabaseShard(ctrl) shard.EXPECT().ID().Return(uint32(i)).AnyTimes() + shard.EXPECT().IsBootstrapped().Return(true).AnyTimes() shard.EXPECT().CleanupExpiredFileSets(gomock.Any()).Return(nil).AnyTimes() shard.EXPECT().CleanupCompactedFileSets().Return(nil).AnyTimes() @@ -437,6 +438,7 @@ func TestCleanupDataAndSnapshotFileSetFiles(t *testing.T) { shard := NewMockdatabaseShard(ctrl) expectedEarliestToRetain := retention.FlushTimeStart(ns.Options().RetentionOptions(), ts) + shard.EXPECT().IsBootstrapped().Return(true).AnyTimes() shard.EXPECT().CleanupExpiredFileSets(expectedEarliestToRetain).Return(nil) shard.EXPECT().CleanupCompactedFileSets().Return(nil) shard.EXPECT().ID().Return(uint32(0)).AnyTimes() @@ -469,6 +471,7 @@ func TestDeleteInactiveDataAndSnapshotFileSetFiles(t *testing.T) { shard := NewMockdatabaseShard(ctrl) shard.EXPECT().ID().Return(uint32(0)).AnyTimes() + shard.EXPECT().IsBootstrapped().Return(true).AnyTimes() ns.EXPECT().OwnedShards().Return([]databaseShard{shard}).AnyTimes() ns.EXPECT().ID().Return(ident.StringID("nsID")).AnyTimes() ns.EXPECT().NeedsFlush(gomock.Any(), gomock.Any()).Return(false, nil).AnyTimes() diff --git a/src/dbnode/storage/coldflush.go b/src/dbnode/storage/coldflush.go index 319ab41693..9229b5de8b 100644 --- a/src/dbnode/storage/coldflush.go +++ b/src/dbnode/storage/coldflush.go @@ -106,7 +106,7 @@ func (m *coldFlushManager) Run(t time.Time) bool { // and not caught in CI or integration tests. // When an invariant occurs in CI tests it panics so as to fail // the build. - if err := m.ColdFlushCleanup(t, m.database.IsBootstrappedAndDurable()); err != nil { + if err := m.ColdFlushCleanup(t, m.database.IsBootstrapped()); err != nil { instrument.EmitAndLogInvariantViolation(m.opts.InstrumentOptions(), func(l *zap.Logger) { l.Error("error when cleaning up cold flush data", zap.Time("time", t), zap.Error(err)) @@ -195,7 +195,7 @@ func (m *coldFlushManager) Report() { } func (m *coldFlushManager) shouldRunWithLock() bool { - return m.enabled && m.status != fileOpInProgress && m.database.IsBootstrappedAndDurable() + return m.enabled && m.status != fileOpInProgress && m.database.IsBootstrapped() } type coldFlushNsOpts struct { diff --git a/src/dbnode/storage/coldflush_test.go b/src/dbnode/storage/coldflush_test.go index a6863b9750..55eb3d355c 100644 --- a/src/dbnode/storage/coldflush_test.go +++ b/src/dbnode/storage/coldflush_test.go @@ -57,7 +57,7 @@ func TestColdFlushManagerFlushAlreadyInProgress(t *testing.T) { testOpts := DefaultTestOptions().SetPersistManager(mockPersistManager) db := newMockdatabase(ctrl) db.EXPECT().Options().Return(testOpts).AnyTimes() - db.EXPECT().IsBootstrappedAndDurable().Return(true).AnyTimes() + db.EXPECT().IsBootstrapped().Return(true).AnyTimes() db.EXPECT().OwnedNamespaces().Return(nil, nil).AnyTimes() cfm := newColdFlushManager(db, mockPersistManager, testOpts).(*coldFlushManager) diff --git a/src/dbnode/storage/database.go b/src/dbnode/storage/database.go index 00e353d64e..5c5dbf7290 100644 --- a/src/dbnode/storage/database.go +++ b/src/dbnode/storage/database.go @@ -460,6 +460,9 @@ func (d *db) AssignShardSet(shardSet sharding.ShardSet) { d.shardSet = shardSet if receivedNewShards { d.lastReceivedNewShards = d.nowFn() + // we need to disable file ops so that warm/cold flush is not running during shards update. + // bootstrap will enable file ops when it completes. + d.mediator.DisableFileOpsAndWait() } for _, elem := range d.namespaces.Iter() { @@ -474,7 +477,10 @@ func (d *db) AssignShardSet(shardSet sharding.ShardSet) { // // These small bootstraps can significantly delay topology changes as they prevent // the nodes from marking themselves as bootstrapped and durable, for example. - d.queueBootstrapWithLock() + if !d.queueBootstrapWithLock() { + // enable file ops if bootstrap was not queued. + d.mediator.EnableFileOps() + } } } @@ -507,7 +513,7 @@ func (d *db) ShardSet() sharding.ShardSet { return shardSet } -func (d *db) queueBootstrapWithLock() { +func (d *db) queueBootstrapWithLock() bool { // Only perform a bootstrap if at least one bootstrap has already occurred. This enables // the ability to open the clustered database and assign shardsets to the non-clustered // database when it receives an initial topology (as well as topology changes) without @@ -515,7 +521,8 @@ func (d *db) queueBootstrapWithLock() { // call to Bootstrap(). After that initial bootstrap, the clustered database will keep // the non-clustered database bootstrapped by assigning it shardsets which will trigger new // bootstraps since d.bootstraps > 0 will be true. - if d.bootstraps > 0 { + enqueued := d.bootstraps > 0 + if enqueued { // NB(r): Trigger another bootstrap, if already bootstrapping this will // enqueue a new bootstrap to execute before the current bootstrap // completes. @@ -525,6 +532,7 @@ func (d *db) queueBootstrapWithLock() { } }() } + return enqueued } func (d *db) Namespace(id ident.ID) (Namespace, bool) { diff --git a/src/dbnode/storage/database_test.go b/src/dbnode/storage/database_test.go index 418de14121..b4624c639d 100644 --- a/src/dbnode/storage/database_test.go +++ b/src/dbnode/storage/database_test.go @@ -487,6 +487,7 @@ func TestDatabaseBootstrappedAssignShardSet(t *testing.T) { mediator.EXPECT().Bootstrap().Return(BootstrapResult{}, nil).Do(func() { wg.Done() }) + mediator.EXPECT().DisableFileOpsAndWait() d.AssignShardSet(shardSet) diff --git a/src/dbnode/storage/fs.go b/src/dbnode/storage/fs.go index 3286cfe68b..e6363f75f0 100644 --- a/src/dbnode/storage/fs.go +++ b/src/dbnode/storage/fs.go @@ -158,7 +158,7 @@ func (m *fileSystemManager) Run( // and not caught in CI or integration tests. // When an invariant occurs in CI tests it panics so as to fail // the build. - if err := m.WarmFlushCleanup(t, m.database.IsBootstrappedAndDurable()); err != nil { + if err := m.WarmFlushCleanup(t, m.database.IsBootstrapped()); err != nil { instrument.EmitAndLogInvariantViolation(m.opts.InstrumentOptions(), func(l *zap.Logger) { l.Error("error when cleaning up data", zap.Time("time", t), zap.Error(err)) @@ -189,5 +189,5 @@ func (m *fileSystemManager) Report() { } func (m *fileSystemManager) shouldRunWithLock() bool { - return m.enabled && m.status != fileOpInProgress && m.database.IsBootstrappedAndDurable() + return m.enabled && m.status != fileOpInProgress && m.database.IsBootstrapped() } diff --git a/src/dbnode/storage/fs_test.go b/src/dbnode/storage/fs_test.go index 1e601b2089..9ae349bf1b 100644 --- a/src/dbnode/storage/fs_test.go +++ b/src/dbnode/storage/fs_test.go @@ -39,10 +39,10 @@ func TestFileSystemManagerShouldRunDuringBootstrap(t *testing.T) { fsm := newFileSystemManager(database, nil, DefaultTestOptions()) mgr := fsm.(*fileSystemManager) - database.EXPECT().IsBootstrappedAndDurable().Return(false) + database.EXPECT().IsBootstrapped().Return(false) require.False(t, mgr.shouldRunWithLock()) - database.EXPECT().IsBootstrappedAndDurable().Return(true) + database.EXPECT().IsBootstrapped().Return(true) require.True(t, mgr.shouldRunWithLock()) } @@ -52,7 +52,7 @@ func TestFileSystemManagerShouldRunWhileRunning(t *testing.T) { database := newMockdatabase(ctrl) fsm := newFileSystemManager(database, nil, DefaultTestOptions()) mgr := fsm.(*fileSystemManager) - database.EXPECT().IsBootstrappedAndDurable().Return(true) + database.EXPECT().IsBootstrapped().Return(true) require.True(t, mgr.shouldRunWithLock()) mgr.status = fileOpInProgress require.False(t, mgr.shouldRunWithLock()) @@ -64,7 +64,7 @@ func TestFileSystemManagerShouldRunEnableDisable(t *testing.T) { database := newMockdatabase(ctrl) fsm := newFileSystemManager(database, nil, DefaultTestOptions()) mgr := fsm.(*fileSystemManager) - database.EXPECT().IsBootstrappedAndDurable().Return(true).AnyTimes() + database.EXPECT().IsBootstrapped().Return(true).AnyTimes() require.True(t, mgr.shouldRunWithLock()) require.NotEqual(t, fileOpInProgress, mgr.Disable()) require.False(t, mgr.shouldRunWithLock()) @@ -76,7 +76,7 @@ func TestFileSystemManagerRun(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() database := newMockdatabase(ctrl) - database.EXPECT().IsBootstrappedAndDurable().Return(true).AnyTimes() + database.EXPECT().IsBootstrapped().Return(true).AnyTimes() fm := NewMockdatabaseFlushManager(ctrl) cm := NewMockdatabaseCleanupManager(ctrl) diff --git a/src/dbnode/storage/namespace.go b/src/dbnode/storage/namespace.go index c5a7d8b6f4..737514fe74 100644 --- a/src/dbnode/storage/namespace.go +++ b/src/dbnode/storage/namespace.go @@ -1045,6 +1045,9 @@ func (n *dbNamespace) Bootstrap( } if !bootstrapped { // NB(r): Not bootstrapped in this bootstrap run. + n.log.Warn("skipping shard", + zap.Uint32("shard", shardID), + zap.Stringer("namespace", n.id)) continue } From 7c4a9bc2e14dcde6e326e7c2bf34265085ee35e3 Mon Sep 17 00:00:00 2001 From: Linas Naginionis Date: Tue, 6 Apr 2021 11:31:26 +0300 Subject: [PATCH 03/18] update unit test to validate handling of not bootstrapped shards. --- src/dbnode/storage/cleanup_test.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/dbnode/storage/cleanup_test.go b/src/dbnode/storage/cleanup_test.go index 578773d05c..6fa74fa611 100644 --- a/src/dbnode/storage/cleanup_test.go +++ b/src/dbnode/storage/cleanup_test.go @@ -437,12 +437,14 @@ func TestCleanupDataAndSnapshotFileSetFiles(t *testing.T) { ns.EXPECT().Options().Return(nsOpts).AnyTimes() shard := NewMockdatabaseShard(ctrl) + shardNotBootstrapped := NewMockdatabaseShard(ctrl) + shardNotBootstrapped.EXPECT().IsBootstrapped().Return(false).AnyTimes() expectedEarliestToRetain := retention.FlushTimeStart(ns.Options().RetentionOptions(), ts) shard.EXPECT().IsBootstrapped().Return(true).AnyTimes() shard.EXPECT().CleanupExpiredFileSets(expectedEarliestToRetain).Return(nil) shard.EXPECT().CleanupCompactedFileSets().Return(nil) shard.EXPECT().ID().Return(uint32(0)).AnyTimes() - ns.EXPECT().OwnedShards().Return([]databaseShard{shard}).AnyTimes() + ns.EXPECT().OwnedShards().Return([]databaseShard{shard, shardNotBootstrapped}).AnyTimes() ns.EXPECT().ID().Return(ident.StringID("nsID")).AnyTimes() ns.EXPECT().NeedsFlush(gomock.Any(), gomock.Any()).Return(false, nil).AnyTimes() namespaces := []databaseNamespace{ns} From 02e86ec9b26df695803840a6c1389f1f370b9c76 Mon Sep 17 00:00:00 2001 From: Linas Naginionis Date: Wed, 7 Apr 2021 09:27:39 +0300 Subject: [PATCH 04/18] removed `IsBootstrapped` method arg (boolean args are a code smell), it is better to make a check before calling cleanup. --- src/dbnode/storage/cleanup.go | 16 ++--------- src/dbnode/storage/cleanup_test.go | 46 ++++++------------------------ src/dbnode/storage/coldflush.go | 17 ++++++++--- src/dbnode/storage/database.go | 4 +-- src/dbnode/storage/fs.go | 16 ++++++++--- src/dbnode/storage/fs_test.go | 2 +- src/dbnode/storage/namespace.go | 2 +- src/dbnode/storage/storage_mock.go | 32 ++++++++++----------- src/dbnode/storage/types.go | 4 +-- 9 files changed, 57 insertions(+), 82 deletions(-) diff --git a/src/dbnode/storage/cleanup.go b/src/dbnode/storage/cleanup.go index 4060e36d19..1fb115a97f 100644 --- a/src/dbnode/storage/cleanup.go +++ b/src/dbnode/storage/cleanup.go @@ -127,13 +127,7 @@ func newCleanupManager( } } -func (m *cleanupManager) WarmFlushCleanup(t time.Time, isBootstrapped bool) error { - // Don't perform any cleanup if we are not boostrapped yet. - if !isBootstrapped { - m.logger.Debug("database is still bootstrapping, terminating cleanup") - return nil - } - +func (m *cleanupManager) WarmFlushCleanup(t time.Time) error { m.Lock() m.warmFlushCleanupInProgress = true m.Unlock() @@ -178,13 +172,7 @@ func (m *cleanupManager) WarmFlushCleanup(t time.Time, isBootstrapped bool) erro return multiErr.FinalError() } -func (m *cleanupManager) ColdFlushCleanup(t time.Time, isBootstrapped bool) error { - // Don't perform any cleanup if we are not boostrapped yet. - if !isBootstrapped { - m.logger.Debug("database is still bootstrapping, terminating cleanup") - return nil - } - +func (m *cleanupManager) ColdFlushCleanup(t time.Time) error { m.Lock() m.coldFlushCleanupInProgress = true m.Unlock() diff --git a/src/dbnode/storage/cleanup_test.go b/src/dbnode/storage/cleanup_test.go index 6fa74fa611..b3dec9b9a9 100644 --- a/src/dbnode/storage/cleanup_test.go +++ b/src/dbnode/storage/cleanup_test.go @@ -319,7 +319,7 @@ func TestCleanupManagerCleanupCommitlogsAndSnapshots(t *testing.T) { return nil } - err := cleanup(mgr, ts, true) + err := cleanup(mgr, ts) if tc.expectErr { require.Error(t, err) } else { @@ -362,36 +362,7 @@ func TestCleanupManagerNamespaceCleanupBootstrapped(t *testing.T) { mgr := newCleanupManager(db, newNoopFakeActiveLogs(), tally.NoopScope).(*cleanupManager) idx.EXPECT().CleanupExpiredFileSets(ts).Return(nil) idx.EXPECT().CleanupDuplicateFileSets().Return(nil) - require.NoError(t, cleanup(mgr, ts, true)) -} - -func TestCleanupManagerNamespaceCleanupNotBootstrapped(t *testing.T) { - ctrl := gomock.NewController(xtest.Reporter{T: t}) - defer ctrl.Finish() - - ts := timeFor(36000) - rOpts := retentionOptions. - SetRetentionPeriod(21600 * time.Second). - SetBlockSize(3600 * time.Second) - nsOpts := namespaceOptions. - SetRetentionOptions(rOpts). - SetCleanupEnabled(true). - SetIndexOptions(namespace.NewIndexOptions(). - SetEnabled(true). - SetBlockSize(7200 * time.Second)) - - ns := NewMockdatabaseNamespace(ctrl) - ns.EXPECT().ID().Return(ident.StringID("ns")).AnyTimes() - ns.EXPECT().Options().Return(nsOpts).AnyTimes() - ns.EXPECT().NeedsFlush(gomock.Any(), gomock.Any()).Return(false, nil).AnyTimes() - ns.EXPECT().OwnedShards().Return(nil).AnyTimes() - - nses := []databaseNamespace{ns} - db := newMockdatabase(ctrl, ns) - db.EXPECT().OwnedNamespaces().Return(nses, nil).AnyTimes() - - mgr := newCleanupManager(db, newNoopFakeActiveLogs(), tally.NoopScope).(*cleanupManager) - require.NoError(t, cleanup(mgr, ts, false)) + require.NoError(t, cleanup(mgr, ts)) } // Test NS doesn't cleanup when flag is present @@ -424,7 +395,7 @@ func TestCleanupManagerDoesntNeedCleanup(t *testing.T) { return nil } - require.NoError(t, cleanup(mgr, ts, true)) + require.NoError(t, cleanup(mgr, ts)) } func TestCleanupDataAndSnapshotFileSetFiles(t *testing.T) { @@ -453,7 +424,7 @@ func TestCleanupDataAndSnapshotFileSetFiles(t *testing.T) { db.EXPECT().OwnedNamespaces().Return(namespaces, nil).AnyTimes() mgr := newCleanupManager(db, newNoopFakeActiveLogs(), tally.NoopScope).(*cleanupManager) - require.NoError(t, cleanup(mgr, ts, true)) + require.NoError(t, cleanup(mgr, ts)) } type deleteInactiveDirectoriesCall struct { @@ -493,7 +464,7 @@ func TestDeleteInactiveDataAndSnapshotFileSetFiles(t *testing.T) { } mgr.deleteInactiveDirectoriesFn = deleteInactiveDirectoriesFn - require.NoError(t, cleanup(mgr, ts, true)) + require.NoError(t, cleanup(mgr, ts)) expectedCalls := []deleteInactiveDirectoriesCall{ deleteInactiveDirectoriesCall{ @@ -538,7 +509,7 @@ func TestCleanupManagerPropagatesOwnedNamespacesError(t *testing.T) { require.NoError(t, db.Open()) require.NoError(t, db.Terminate()) - require.Error(t, cleanup(mgr, ts, true)) + require.Error(t, cleanup(mgr, ts)) } func timeFor(s int64) time.Time { @@ -566,10 +537,9 @@ func newFakeActiveLogs(activeLogs persist.CommitLogFiles) fakeActiveLogs { func cleanup( mgr databaseCleanupManager, t time.Time, - isBootstrapped bool, ) error { multiErr := xerrors.NewMultiError() - multiErr = multiErr.Add(mgr.WarmFlushCleanup(t, isBootstrapped)) - multiErr = multiErr.Add(mgr.ColdFlushCleanup(t, isBootstrapped)) + multiErr = multiErr.Add(mgr.WarmFlushCleanup(t)) + multiErr = multiErr.Add(mgr.ColdFlushCleanup(t)) return multiErr.FinalError() } diff --git a/src/dbnode/storage/coldflush.go b/src/dbnode/storage/coldflush.go index 9229b5de8b..208b71a197 100644 --- a/src/dbnode/storage/coldflush.go +++ b/src/dbnode/storage/coldflush.go @@ -101,12 +101,23 @@ func (m *coldFlushManager) Run(t time.Time) bool { m.status = fileOpInProgress m.Unlock() + defer func() { + m.Lock() + m.status = fileOpNotStarted + m.Unlock() + }() + + if !m.database.IsBootstrapped() { + m.log.Debug("database is still bootstrapping, terminating cold flush") + return true + } + // NB(xichen): perform data cleanup and flushing sequentially to minimize the impact of disk seeks. // NB(r): Use invariant here since flush errors were introduced // and not caught in CI or integration tests. // When an invariant occurs in CI tests it panics so as to fail // the build. - if err := m.ColdFlushCleanup(t, m.database.IsBootstrapped()); err != nil { + if err := m.ColdFlushCleanup(t); err != nil { instrument.EmitAndLogInvariantViolation(m.opts.InstrumentOptions(), func(l *zap.Logger) { l.Error("error when cleaning up cold flush data", zap.Time("time", t), zap.Error(err)) @@ -118,9 +129,7 @@ func (m *coldFlushManager) Run(t time.Time) bool { l.Error("error when cold flushing data", zap.Time("time", t), zap.Error(err)) }) } - m.Lock() - m.status = fileOpNotStarted - m.Unlock() + return true } diff --git a/src/dbnode/storage/database.go b/src/dbnode/storage/database.go index 5c5dbf7290..8fa3a1dcba 100644 --- a/src/dbnode/storage/database.go +++ b/src/dbnode/storage/database.go @@ -460,8 +460,8 @@ func (d *db) AssignShardSet(shardSet sharding.ShardSet) { d.shardSet = shardSet if receivedNewShards { d.lastReceivedNewShards = d.nowFn() - // we need to disable file ops so that warm/cold flush is not running during shards update. - // bootstrap will enable file ops when it completes. + // We need to disable file ops so that warm/cold flush is not running during shards update. + // Bootstrap will enable file ops when it completes. d.mediator.DisableFileOpsAndWait() } diff --git a/src/dbnode/storage/fs.go b/src/dbnode/storage/fs.go index e6363f75f0..7c458f82b3 100644 --- a/src/dbnode/storage/fs.go +++ b/src/dbnode/storage/fs.go @@ -154,11 +154,22 @@ func (m *fileSystemManager) Run( // NB(xichen): perform data cleanup and flushing sequentially to minimize the impact of disk seeks. flushFn := func() { + defer func() { + m.Lock() + m.status = fileOpNotStarted + m.Unlock() + }() + + if !m.database.IsBootstrapped() { + m.log.Debug("database is still bootstrapping, terminating warm flush") + return + } + // NB(r): Use invariant here since flush errors were introduced // and not caught in CI or integration tests. // When an invariant occurs in CI tests it panics so as to fail // the build. - if err := m.WarmFlushCleanup(t, m.database.IsBootstrapped()); err != nil { + if err := m.WarmFlushCleanup(t); err != nil { instrument.EmitAndLogInvariantViolation(m.opts.InstrumentOptions(), func(l *zap.Logger) { l.Error("error when cleaning up data", zap.Time("time", t), zap.Error(err)) @@ -170,9 +181,6 @@ func (m *fileSystemManager) Run( l.Error("error when flushing data", zap.Time("time", t), zap.Error(err)) }) } - m.Lock() - m.status = fileOpNotStarted - m.Unlock() } if runType == syncRun { diff --git a/src/dbnode/storage/fs_test.go b/src/dbnode/storage/fs_test.go index 9ae349bf1b..45310708b2 100644 --- a/src/dbnode/storage/fs_test.go +++ b/src/dbnode/storage/fs_test.go @@ -87,7 +87,7 @@ func TestFileSystemManagerRun(t *testing.T) { ts := time.Now() gomock.InOrder( - cm.EXPECT().WarmFlushCleanup(ts, true).Return(errors.New("foo")), + cm.EXPECT().WarmFlushCleanup(ts).Return(errors.New("foo")), ) defer instrument.SetShouldPanicEnvironmentVariable(true)() diff --git a/src/dbnode/storage/namespace.go b/src/dbnode/storage/namespace.go index 737514fe74..70fb3829f6 100644 --- a/src/dbnode/storage/namespace.go +++ b/src/dbnode/storage/namespace.go @@ -1045,7 +1045,7 @@ func (n *dbNamespace) Bootstrap( } if !bootstrapped { // NB(r): Not bootstrapped in this bootstrap run. - n.log.Warn("skipping shard", + n.log.Warn("skipping not bootstrapped shard", zap.Uint32("shard", shardID), zap.Stringer("namespace", n.id)) continue diff --git a/src/dbnode/storage/storage_mock.go b/src/dbnode/storage/storage_mock.go index 4052e4f7cf..5e1480da28 100644 --- a/src/dbnode/storage/storage_mock.go +++ b/src/dbnode/storage/storage_mock.go @@ -2955,31 +2955,31 @@ func (m *MockdatabaseCleanupManager) EXPECT() *MockdatabaseCleanupManagerMockRec } // WarmFlushCleanup mocks base method -func (m *MockdatabaseCleanupManager) WarmFlushCleanup(t time.Time, isBootstrapped bool) error { +func (m *MockdatabaseCleanupManager) WarmFlushCleanup(t time.Time) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "WarmFlushCleanup", t, isBootstrapped) + ret := m.ctrl.Call(m, "WarmFlushCleanup", t) ret0, _ := ret[0].(error) return ret0 } // WarmFlushCleanup indicates an expected call of WarmFlushCleanup -func (mr *MockdatabaseCleanupManagerMockRecorder) WarmFlushCleanup(t, isBootstrapped interface{}) *gomock.Call { +func (mr *MockdatabaseCleanupManagerMockRecorder) WarmFlushCleanup(t interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WarmFlushCleanup", reflect.TypeOf((*MockdatabaseCleanupManager)(nil).WarmFlushCleanup), t, isBootstrapped) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WarmFlushCleanup", reflect.TypeOf((*MockdatabaseCleanupManager)(nil).WarmFlushCleanup), t) } // ColdFlushCleanup mocks base method -func (m *MockdatabaseCleanupManager) ColdFlushCleanup(t time.Time, isBootstrapped bool) error { +func (m *MockdatabaseCleanupManager) ColdFlushCleanup(t time.Time) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ColdFlushCleanup", t, isBootstrapped) + ret := m.ctrl.Call(m, "ColdFlushCleanup", t) ret0, _ := ret[0].(error) return ret0 } // ColdFlushCleanup indicates an expected call of ColdFlushCleanup -func (mr *MockdatabaseCleanupManagerMockRecorder) ColdFlushCleanup(t, isBootstrapped interface{}) *gomock.Call { +func (mr *MockdatabaseCleanupManagerMockRecorder) ColdFlushCleanup(t interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ColdFlushCleanup", reflect.TypeOf((*MockdatabaseCleanupManager)(nil).ColdFlushCleanup), t, isBootstrapped) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ColdFlushCleanup", reflect.TypeOf((*MockdatabaseCleanupManager)(nil).ColdFlushCleanup), t) } // Report mocks base method @@ -3138,31 +3138,31 @@ func (m *MockdatabaseColdFlushManager) EXPECT() *MockdatabaseColdFlushManagerMoc } // WarmFlushCleanup mocks base method -func (m *MockdatabaseColdFlushManager) WarmFlushCleanup(t time.Time, isBootstrapped bool) error { +func (m *MockdatabaseColdFlushManager) WarmFlushCleanup(t time.Time) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "WarmFlushCleanup", t, isBootstrapped) + ret := m.ctrl.Call(m, "WarmFlushCleanup", t) ret0, _ := ret[0].(error) return ret0 } // WarmFlushCleanup indicates an expected call of WarmFlushCleanup -func (mr *MockdatabaseColdFlushManagerMockRecorder) WarmFlushCleanup(t, isBootstrapped interface{}) *gomock.Call { +func (mr *MockdatabaseColdFlushManagerMockRecorder) WarmFlushCleanup(t interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WarmFlushCleanup", reflect.TypeOf((*MockdatabaseColdFlushManager)(nil).WarmFlushCleanup), t, isBootstrapped) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WarmFlushCleanup", reflect.TypeOf((*MockdatabaseColdFlushManager)(nil).WarmFlushCleanup), t) } // ColdFlushCleanup mocks base method -func (m *MockdatabaseColdFlushManager) ColdFlushCleanup(t time.Time, isBootstrapped bool) error { +func (m *MockdatabaseColdFlushManager) ColdFlushCleanup(t time.Time) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ColdFlushCleanup", t, isBootstrapped) + ret := m.ctrl.Call(m, "ColdFlushCleanup", t) ret0, _ := ret[0].(error) return ret0 } // ColdFlushCleanup indicates an expected call of ColdFlushCleanup -func (mr *MockdatabaseColdFlushManagerMockRecorder) ColdFlushCleanup(t, isBootstrapped interface{}) *gomock.Call { +func (mr *MockdatabaseColdFlushManagerMockRecorder) ColdFlushCleanup(t interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ColdFlushCleanup", reflect.TypeOf((*MockdatabaseColdFlushManager)(nil).ColdFlushCleanup), t, isBootstrapped) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ColdFlushCleanup", reflect.TypeOf((*MockdatabaseColdFlushManager)(nil).ColdFlushCleanup), t) } // Report mocks base method diff --git a/src/dbnode/storage/types.go b/src/dbnode/storage/types.go index eca40f3f59..b440953820 100644 --- a/src/dbnode/storage/types.go +++ b/src/dbnode/storage/types.go @@ -850,10 +850,10 @@ type databaseFlushManager interface { // and cleaning up certain types of data concurrently w/ either can be problematic. type databaseCleanupManager interface { // WarmFlushCleanup cleans up data not needed in the persistent storage before a warm flush. - WarmFlushCleanup(t time.Time, isBootstrapped bool) error + WarmFlushCleanup(t time.Time) error // ColdFlushCleanup cleans up data not needed in the persistent storage before a cold flush. - ColdFlushCleanup(t time.Time, isBootstrapped bool) error + ColdFlushCleanup(t time.Time) error // Report reports runtime information. Report() From 4a21659a294fe9c7662355f6680894c86dda7a9e Mon Sep 17 00:00:00 2001 From: Linas Naginionis Date: Wed, 7 Apr 2021 14:55:50 +0300 Subject: [PATCH 05/18] reduce locking on a db level when new shards are assigned. --- src/dbnode/storage/cleanup.go | 3 +++ src/dbnode/storage/database.go | 11 ++++++----- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/src/dbnode/storage/cleanup.go b/src/dbnode/storage/cleanup.go index 1fb115a97f..eb1dbff4ee 100644 --- a/src/dbnode/storage/cleanup.go +++ b/src/dbnode/storage/cleanup.go @@ -422,6 +422,9 @@ func (m *cleanupManager) cleanupSnapshotsAndCommitlogs(namespaces []databaseName for _, ns := range namespaces { for _, s := range ns.OwnedShards() { + if !s.IsBootstrapped() { + continue + } shardSnapshots, err := m.snapshotFilesFn(fsOpts.FilePathPrefix(), ns.ID(), s.ID()) if err != nil { multiErr = multiErr.Add(fmt.Errorf("err reading snapshot files for ns: %s and shard: %d, err: %v", ns.ID(), s.ID(), err)) diff --git a/src/dbnode/storage/database.go b/src/dbnode/storage/database.go index 8fa3a1dcba..d21416d30a 100644 --- a/src/dbnode/storage/database.go +++ b/src/dbnode/storage/database.go @@ -453,24 +453,25 @@ func (d *db) Options() Options { func (d *db) AssignShardSet(shardSet sharding.ShardSet) { d.Lock() - defer d.Unlock() - receivedNewShards := d.hasReceivedNewShardsWithLock(shardSet) + d.Unlock() - d.shardSet = shardSet if receivedNewShards { - d.lastReceivedNewShards = d.nowFn() // We need to disable file ops so that warm/cold flush is not running during shards update. // Bootstrap will enable file ops when it completes. d.mediator.DisableFileOpsAndWait() } + d.Lock() + defer d.Unlock() + d.shardSet = shardSet for _, elem := range d.namespaces.Iter() { ns := elem.Value() ns.AssignShardSet(shardSet) } if receivedNewShards { + d.lastReceivedNewShards = d.nowFn() // Only trigger a bootstrap if the node received new shards otherwise // the nodes will perform lots of small bootstraps (that accomplish nothing) // during topology changes as other nodes mark their shards as available. @@ -478,7 +479,7 @@ func (d *db) AssignShardSet(shardSet sharding.ShardSet) { // These small bootstraps can significantly delay topology changes as they prevent // the nodes from marking themselves as bootstrapped and durable, for example. if !d.queueBootstrapWithLock() { - // enable file ops if bootstrap was not queued. + // Enable file ops if bootstrap was not queued. d.mediator.EnableFileOps() } } From 83d2c7ce7d34e1c5a6fb82cf3526378c091cb6ab Mon Sep 17 00:00:00 2001 From: Linas Naginionis Date: Wed, 7 Apr 2021 15:13:58 +0300 Subject: [PATCH 06/18] can use read lock for `d.hasReceivedNewShardsWithLock()` --- src/dbnode/storage/database.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/dbnode/storage/database.go b/src/dbnode/storage/database.go index d21416d30a..4c8c1ec0c1 100644 --- a/src/dbnode/storage/database.go +++ b/src/dbnode/storage/database.go @@ -452,9 +452,9 @@ func (d *db) Options() Options { } func (d *db) AssignShardSet(shardSet sharding.ShardSet) { - d.Lock() + d.RLock() receivedNewShards := d.hasReceivedNewShardsWithLock(shardSet) - d.Unlock() + d.RUnlock() if receivedNewShards { // We need to disable file ops so that warm/cold flush is not running during shards update. From 56177c327ef3407684877aee9bcd981828904f60 Mon Sep 17 00:00:00 2001 From: Linas Naginionis Date: Thu, 15 Apr 2021 00:43:26 +0300 Subject: [PATCH 07/18] Enqueue assignShardSet fn when received update from topology so that shards get assigned when file ops are not running. --- src/dbnode/storage/coldflush.go | 4 +- src/dbnode/storage/database.go | 45 ++++++++++------- src/dbnode/storage/database_test.go | 9 +++- src/dbnode/storage/fs.go | 3 ++ src/dbnode/storage/mediator.go | 75 +++++++++++++++++++++-------- src/dbnode/storage/namespace.go | 16 ++++-- src/dbnode/storage/storage_mock.go | 14 ++++++ src/dbnode/storage/types.go | 4 ++ 8 files changed, 127 insertions(+), 43 deletions(-) diff --git a/src/dbnode/storage/coldflush.go b/src/dbnode/storage/coldflush.go index 208b71a197..a976ee82f9 100644 --- a/src/dbnode/storage/coldflush.go +++ b/src/dbnode/storage/coldflush.go @@ -112,6 +112,8 @@ func (m *coldFlushManager) Run(t time.Time) bool { return true } + m.log.Info("starting cold flush") + // NB(xichen): perform data cleanup and flushing sequentially to minimize the impact of disk seeks. // NB(r): Use invariant here since flush errors were introduced // and not caught in CI or integration tests. @@ -129,7 +131,7 @@ func (m *coldFlushManager) Run(t time.Time) bool { l.Error("error when cold flushing data", zap.Time("time", t), zap.Error(err)) }) } - + m.log.Info("completed cold flush") return true } diff --git a/src/dbnode/storage/database.go b/src/dbnode/storage/database.go index 4c8c1ec0c1..2e87bb0213 100644 --- a/src/dbnode/storage/database.go +++ b/src/dbnode/storage/database.go @@ -452,36 +452,49 @@ func (d *db) Options() Options { } func (d *db) AssignShardSet(shardSet sharding.ShardSet) { - d.RLock() - receivedNewShards := d.hasReceivedNewShardsWithLock(shardSet) - d.RUnlock() - - if receivedNewShards { - // We need to disable file ops so that warm/cold flush is not running during shards update. - // Bootstrap will enable file ops when it completes. - d.mediator.DisableFileOpsAndWait() + if err := d.mediator.EnqueueMutuallyExclusiveFn(func() { + d.assignShardSet(shardSet) + }); err != nil { + if errors.Is(err, errMediatorNotOpen) { + // initial assignment. + d.assignShardSet(shardSet) + } else { + // should not happen. + instrument.EmitAndLogInvariantViolation(d.opts.InstrumentOptions(), + func(l *zap.Logger) { + l.Error("failed to enqueue assignShardSet fn", + zap.Error(err), + zap.Uint32s("shards", shardSet.AllIDs())) + }) + } } +} +func (d *db) assignShardSet(shardSet sharding.ShardSet) { d.Lock() defer d.Unlock() + + d.log.Info("assigning shards", zap.Uint32s("shards", shardSet.AllIDs())) + + receivedNewShards := d.hasReceivedNewShardsWithLock(shardSet) d.shardSet = shardSet + if receivedNewShards { + d.lastReceivedNewShards = d.nowFn() + } + for _, elem := range d.namespaces.Iter() { ns := elem.Value() ns.AssignShardSet(shardSet) } if receivedNewShards { - d.lastReceivedNewShards = d.nowFn() // Only trigger a bootstrap if the node received new shards otherwise // the nodes will perform lots of small bootstraps (that accomplish nothing) // during topology changes as other nodes mark their shards as available. // // These small bootstraps can significantly delay topology changes as they prevent // the nodes from marking themselves as bootstrapped and durable, for example. - if !d.queueBootstrapWithLock() { - // Enable file ops if bootstrap was not queued. - d.mediator.EnableFileOps() - } + d.queueBootstrapWithLock() } } @@ -514,7 +527,7 @@ func (d *db) ShardSet() sharding.ShardSet { return shardSet } -func (d *db) queueBootstrapWithLock() bool { +func (d *db) queueBootstrapWithLock() { // Only perform a bootstrap if at least one bootstrap has already occurred. This enables // the ability to open the clustered database and assign shardsets to the non-clustered // database when it receives an initial topology (as well as topology changes) without @@ -522,8 +535,7 @@ func (d *db) queueBootstrapWithLock() bool { // call to Bootstrap(). After that initial bootstrap, the clustered database will keep // the non-clustered database bootstrapped by assigning it shardsets which will trigger new // bootstraps since d.bootstraps > 0 will be true. - enqueued := d.bootstraps > 0 - if enqueued { + if d.bootstraps > 0 { // NB(r): Trigger another bootstrap, if already bootstrapping this will // enqueue a new bootstrap to execute before the current bootstrap // completes. @@ -533,7 +545,6 @@ func (d *db) queueBootstrapWithLock() bool { } }() } - return enqueued } func (d *db) Namespace(id ident.ID) (Namespace, bool) { diff --git a/src/dbnode/storage/database_test.go b/src/dbnode/storage/database_test.go index b4624c639d..d8ad6b7533 100644 --- a/src/dbnode/storage/database_test.go +++ b/src/dbnode/storage/database_test.go @@ -436,6 +436,10 @@ func TestDatabaseAssignShardSetBehaviorNoNewShards(t *testing.T) { // Set a mock mediator to be certain that bootstrap is not called when // no new shards are assigned. mediator := NewMockdatabaseMediator(ctrl) + mediator.EXPECT().EnqueueMutuallyExclusiveFn(gomock.Any()).DoAndReturn(func(fn func()) error { + fn() + return nil + }) d.mediator = mediator var ns []*MockdatabaseNamespace @@ -470,6 +474,10 @@ func TestDatabaseBootstrappedAssignShardSet(t *testing.T) { ns := dbAddNewMockNamespace(ctrl, d, "testns") mediator := NewMockdatabaseMediator(ctrl) + mediator.EXPECT().EnqueueMutuallyExclusiveFn(gomock.Any()).DoAndReturn(func(fn func()) error { + fn() + return nil + }) mediator.EXPECT().Bootstrap().Return(BootstrapResult{}, nil) d.mediator = mediator @@ -487,7 +495,6 @@ func TestDatabaseBootstrappedAssignShardSet(t *testing.T) { mediator.EXPECT().Bootstrap().Return(BootstrapResult{}, nil).Do(func() { wg.Done() }) - mediator.EXPECT().DisableFileOpsAndWait() d.AssignShardSet(shardSet) diff --git a/src/dbnode/storage/fs.go b/src/dbnode/storage/fs.go index 7c458f82b3..715936e5e0 100644 --- a/src/dbnode/storage/fs.go +++ b/src/dbnode/storage/fs.go @@ -165,6 +165,8 @@ func (m *fileSystemManager) Run( return } + m.log.Info("starting warm flush") + // NB(r): Use invariant here since flush errors were introduced // and not caught in CI or integration tests. // When an invariant occurs in CI tests it panics so as to fail @@ -181,6 +183,7 @@ func (m *fileSystemManager) Run( l.Error("error when flushing data", zap.Time("time", t), zap.Error(err)) }) } + m.log.Info("completed warm flush") } if runType == syncRun { diff --git a/src/dbnode/storage/mediator.go b/src/dbnode/storage/mediator.go index efe70159e7..2137b0921e 100644 --- a/src/dbnode/storage/mediator.go +++ b/src/dbnode/storage/mediator.go @@ -39,7 +39,8 @@ type ( ) const ( - fileOpCheckInterval = time.Second + fileOpCheckInterval = time.Second + defaultExternalChannelSize = 8 mediatorNotOpen mediatorState = iota mediatorOpen @@ -85,6 +86,7 @@ type mediator struct { mediatorTimeBarrier mediatorTimeBarrier closedCh chan struct{} tickInterval time.Duration + fileOpsProcesses []func() backgroundProcesses []BackgroundProcess } @@ -97,20 +99,24 @@ func newMediator(database database, commitlog commitlog.CommitLog, opts Options) nowFn = opts.ClockOptions().NowFn() ) d := &mediator{ - database: database, - opts: opts, - nowFn: opts.ClockOptions().NowFn(), - sleepFn: time.Sleep, - metrics: newMediatorMetrics(scope), - state: mediatorNotOpen, - mediatorTimeBarrier: newMediatorTimeBarrier(nowFn, iOpts), - closedCh: make(chan struct{}), - tickInterval: opts.MediatorTickInterval(), + database: database, + opts: opts, + nowFn: opts.ClockOptions().NowFn(), + sleepFn: time.Sleep, + metrics: newMediatorMetrics(scope), + state: mediatorNotOpen, + closedCh: make(chan struct{}), + tickInterval: opts.MediatorTickInterval(), } - fsm := newFileSystemManager(database, commitlog, opts) d.databaseFileSystemManager = fsm + d.fileOpsProcesses = []func(){ + d.ongoingFileSystemProcesses, + d.ongoingColdFlushProcesses, + } + d.mediatorTimeBarrier = newMediatorTimeBarrier(nowFn, iOpts, len(d.fileOpsProcesses)) + // NB(bodu): Cold flush needs its own persist manager now // that its running in its own thread. fsOpts := opts.CommitLogOptions().FilesystemOptions() @@ -138,6 +144,18 @@ func (m *mediator) RegisterBackgroundProcess(process BackgroundProcess) error { return nil } +func (m *mediator) EnqueueMutuallyExclusiveFn(fn func()) error { + m.RLock() + if m.state != mediatorOpen { + m.RUnlock() + return errMediatorNotOpen + } + m.RUnlock() + + m.mediatorTimeBarrier.externalFnCh <- fn + return nil +} + func (m *mediator) Open() error { m.Lock() defer m.Unlock() @@ -147,8 +165,9 @@ func (m *mediator) Open() error { m.state = mediatorOpen go m.reportLoop() - go m.ongoingFileSystemProcesses() - go m.ongoingColdFlushProcesses() + for _, fileOpsProcess := range m.fileOpsProcesses { + go fileOpsProcess() + } go m.ongoingTick() for _, process := range m.backgroundProcesses { @@ -413,10 +432,12 @@ type mediatorTimeBarrier struct { // by the mutex. mediatorTime time.Time numFsProcessesWaiting int + numMaxWaiters int - nowFn func() time.Time - iOpts instrument.Options - releaseCh chan time.Time + nowFn func() time.Time + iOpts instrument.Options + releaseCh chan time.Time + externalFnCh chan func() } // initialMediatorTime should only be used to obtain the initial time for @@ -468,17 +489,29 @@ func (b *mediatorTimeBarrier) maybeRelease() (time.Time, error) { } b.mediatorTime = newMediatorTime + + // If all waiters are waiting, we can safely call mutually exclusive external functions. + if numWaiters == b.numMaxWaiters { + // Drain the channel. + for len(b.externalFnCh) > 0 { + externalFn := <-b.externalFnCh + externalFn() + } + } + for i := 0; i < numWaiters; i++ { b.releaseCh <- b.mediatorTime } return b.mediatorTime, nil } -func newMediatorTimeBarrier(nowFn func() time.Time, iOpts instrument.Options) mediatorTimeBarrier { +func newMediatorTimeBarrier(nowFn func() time.Time, iOpts instrument.Options, maxWaiters int) mediatorTimeBarrier { return mediatorTimeBarrier{ - mediatorTime: nowFn(), - nowFn: nowFn, - iOpts: iOpts, - releaseCh: make(chan time.Time, 0), + mediatorTime: nowFn(), + nowFn: nowFn, + iOpts: iOpts, + numMaxWaiters: maxWaiters, + releaseCh: make(chan time.Time, 0), + externalFnCh: make(chan func(), defaultExternalChannelSize), } } diff --git a/src/dbnode/storage/namespace.go b/src/dbnode/storage/namespace.go index 70fb3829f6..e82f39b78d 100644 --- a/src/dbnode/storage/namespace.go +++ b/src/dbnode/storage/namespace.go @@ -491,9 +491,10 @@ func (n *dbNamespace) assignShardSet( opts assignShardSetOptions, ) { var ( - incoming = make(map[uint32]struct{}, len(shardSet.All())) - existing []databaseShard - closing []databaseShard + incoming = make(map[uint32]struct{}, len(shardSet.All())) + createdShardIds []uint32 + existing []databaseShard + closing []databaseShard ) for _, shard := range shardSet.AllIDs() { incoming[shard] = struct{}{} @@ -525,6 +526,7 @@ func (n *dbNamespace) assignShardSet( n.shards[shard] = newDatabaseShard(metadata, shard, n.blockRetriever, n.namespaceReaderMgr, n.increasingIndex, n.reverseIndex, opts.needsBootstrap, n.opts, n.seriesOpts) + createdShardIds = append(createdShardIds, shard) // NB(bodu): We only record shard add metrics for shards created in non // initial assignments. if !opts.initialAssignment { @@ -532,6 +534,14 @@ func (n *dbNamespace) assignShardSet( } } + if len(createdShardIds) > 0 { + n.log.Info("created new shards", + zap.Stringer("namespace", n.ID()), + zap.Uint32s("shards", createdShardIds), + zap.Bool("initialAssignment", opts.initialAssignment), + zap.Bool("needsBootstrap", opts.needsBootstrap)) + } + if idx := n.reverseIndex; idx != nil { idx.AssignShardSet(shardSet) } diff --git a/src/dbnode/storage/storage_mock.go b/src/dbnode/storage/storage_mock.go index 5e1480da28..c51d2fbe80 100644 --- a/src/dbnode/storage/storage_mock.go +++ b/src/dbnode/storage/storage_mock.go @@ -3628,6 +3628,20 @@ func (mr *MockdatabaseMediatorMockRecorder) LastSuccessfulSnapshotStartTime() *g return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LastSuccessfulSnapshotStartTime", reflect.TypeOf((*MockdatabaseMediator)(nil).LastSuccessfulSnapshotStartTime)) } +// EnqueueMutuallyExclusiveFn mocks base method +func (m *MockdatabaseMediator) EnqueueMutuallyExclusiveFn(fn func()) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "EnqueueMutuallyExclusiveFn", fn) + ret0, _ := ret[0].(error) + return ret0 +} + +// EnqueueMutuallyExclusiveFn indicates an expected call of EnqueueMutuallyExclusiveFn +func (mr *MockdatabaseMediatorMockRecorder) EnqueueMutuallyExclusiveFn(fn interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EnqueueMutuallyExclusiveFn", reflect.TypeOf((*MockdatabaseMediator)(nil).EnqueueMutuallyExclusiveFn), fn) +} + // MockColdFlushNsOpts is a mock of ColdFlushNsOpts interface type MockColdFlushNsOpts struct { ctrl *gomock.Controller diff --git a/src/dbnode/storage/types.go b/src/dbnode/storage/types.go index b440953820..d67cafd745 100644 --- a/src/dbnode/storage/types.go +++ b/src/dbnode/storage/types.go @@ -988,6 +988,10 @@ type databaseMediator interface { // LastSuccessfulSnapshotStartTime returns the start time of the last // successful snapshot, if any. LastSuccessfulSnapshotStartTime() (xtime.UnixNano, bool) + + // EnqueueMutuallyExclusiveFn enqueues function to be executed mutually exclusively, + // when file operations are idle. + EnqueueMutuallyExclusiveFn(fn func()) error } // ColdFlushNsOpts are options for OnColdFlush.ColdFlushNamespace. From 006065eac6403395e05dffa9a9e196d2582bae70 Mon Sep 17 00:00:00 2001 From: Linas Naginionis Date: Thu, 15 Apr 2021 10:17:09 +0300 Subject: [PATCH 08/18] need to set `lastReceivedNewShards` when received new shards immediately so that `IsBootstrappedAndDurable()` won't return true when db was previously bootstrapped and new bootstrap is enqueued. --- src/dbnode/storage/database.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/dbnode/storage/database.go b/src/dbnode/storage/database.go index 2e87bb0213..9ff55f7ff9 100644 --- a/src/dbnode/storage/database.go +++ b/src/dbnode/storage/database.go @@ -452,6 +452,13 @@ func (d *db) Options() Options { } func (d *db) AssignShardSet(shardSet sharding.ShardSet) { + d.Lock() + receivedNewShards := d.hasReceivedNewShardsWithLock(shardSet) + if receivedNewShards { + d.lastReceivedNewShards = d.nowFn() + } + d.Unlock() + if err := d.mediator.EnqueueMutuallyExclusiveFn(func() { d.assignShardSet(shardSet) }); err != nil { @@ -478,9 +485,6 @@ func (d *db) assignShardSet(shardSet sharding.ShardSet) { receivedNewShards := d.hasReceivedNewShardsWithLock(shardSet) d.shardSet = shardSet - if receivedNewShards { - d.lastReceivedNewShards = d.nowFn() - } for _, elem := range d.namespaces.Iter() { ns := elem.Value() From 3dfeb14d2e8c28ef2ef25eae5cb7bc6f09e55946 Mon Sep 17 00:00:00 2001 From: Linas Naginionis Date: Thu, 15 Apr 2021 11:28:24 +0300 Subject: [PATCH 09/18] cleaned up some code. --- src/dbnode/storage/cleanup.go | 4 +- src/dbnode/storage/cleanup_test.go | 1 + src/dbnode/storage/coldflush.go | 5 --- src/dbnode/storage/fs.go | 65 ++++++++++-------------------- src/dbnode/storage/fs_test.go | 2 +- src/dbnode/storage/mediator.go | 13 +++--- src/dbnode/storage/storage_mock.go | 8 ++-- src/dbnode/storage/types.go | 21 +++++++--- 8 files changed, 50 insertions(+), 69 deletions(-) diff --git a/src/dbnode/storage/cleanup.go b/src/dbnode/storage/cleanup.go index eb1dbff4ee..f51897bcef 100644 --- a/src/dbnode/storage/cleanup.go +++ b/src/dbnode/storage/cleanup.go @@ -250,10 +250,8 @@ func (m *cleanupManager) deleteInactiveDataFileSetFiles(filesetFilesDirPathFn fu for _, n := range namespaces { var activeShards []string namespaceDirPath := filesetFilesDirPathFn(filePathPrefix, n.ID()) + // NB(linasn) This should list ALL shards because it will delete dirs for the shards NOT LISTED below. for _, s := range n.OwnedShards() { - if !s.IsBootstrapped() { - continue - } shard := fmt.Sprintf("%d", s.ID()) activeShards = append(activeShards, shard) } diff --git a/src/dbnode/storage/cleanup_test.go b/src/dbnode/storage/cleanup_test.go index b3dec9b9a9..470e52ea82 100644 --- a/src/dbnode/storage/cleanup_test.go +++ b/src/dbnode/storage/cleanup_test.go @@ -410,6 +410,7 @@ func TestCleanupDataAndSnapshotFileSetFiles(t *testing.T) { shard := NewMockdatabaseShard(ctrl) shardNotBootstrapped := NewMockdatabaseShard(ctrl) shardNotBootstrapped.EXPECT().IsBootstrapped().Return(false).AnyTimes() + shardNotBootstrapped.EXPECT().ID().Return(uint32(1)).AnyTimes() expectedEarliestToRetain := retention.FlushTimeStart(ns.Options().RetentionOptions(), ts) shard.EXPECT().IsBootstrapped().Return(true).AnyTimes() shard.EXPECT().CleanupExpiredFileSets(expectedEarliestToRetain).Return(nil) diff --git a/src/dbnode/storage/coldflush.go b/src/dbnode/storage/coldflush.go index a976ee82f9..da5d8ecb2c 100644 --- a/src/dbnode/storage/coldflush.go +++ b/src/dbnode/storage/coldflush.go @@ -107,11 +107,6 @@ func (m *coldFlushManager) Run(t time.Time) bool { m.Unlock() }() - if !m.database.IsBootstrapped() { - m.log.Debug("database is still bootstrapping, terminating cold flush") - return true - } - m.log.Info("starting cold flush") // NB(xichen): perform data cleanup and flushing sequentially to minimize the impact of disk seeks. diff --git a/src/dbnode/storage/fs.go b/src/dbnode/storage/fs.go index 715936e5e0..171e90376c 100644 --- a/src/dbnode/storage/fs.go +++ b/src/dbnode/storage/fs.go @@ -69,13 +69,6 @@ type fileOpState struct { NumFailures int } -type runType int - -const ( - syncRun runType = iota - asyncRun -) - type forceType int const ( @@ -139,58 +132,42 @@ func (m *fileSystemManager) Status() fileOpStatus { return status } -func (m *fileSystemManager) Run( - t time.Time, - runType runType, - forceType forceType, -) bool { +func (m *fileSystemManager) Run(t time.Time) bool { m.Lock() - if forceType == noForce && !m.shouldRunWithLock() { + if !m.shouldRunWithLock() { m.Unlock() return false } m.status = fileOpInProgress m.Unlock() - // NB(xichen): perform data cleanup and flushing sequentially to minimize the impact of disk seeks. - flushFn := func() { - defer func() { - m.Lock() - m.status = fileOpNotStarted - m.Unlock() - }() - - if !m.database.IsBootstrapped() { - m.log.Debug("database is still bootstrapping, terminating warm flush") - return - } + defer func() { + m.Lock() + m.status = fileOpNotStarted + m.Unlock() + }() - m.log.Info("starting warm flush") + m.log.Info("starting warm flush") + // NB(xichen): perform data cleanup and flushing sequentially to minimize the impact of disk seeks. + if err := m.WarmFlushCleanup(t); err != nil { // NB(r): Use invariant here since flush errors were introduced // and not caught in CI or integration tests. // When an invariant occurs in CI tests it panics so as to fail // the build. - if err := m.WarmFlushCleanup(t); err != nil { - instrument.EmitAndLogInvariantViolation(m.opts.InstrumentOptions(), - func(l *zap.Logger) { - l.Error("error when cleaning up data", zap.Time("time", t), zap.Error(err)) - }) - } - if err := m.Flush(t); err != nil { - instrument.EmitAndLogInvariantViolation(m.opts.InstrumentOptions(), - func(l *zap.Logger) { - l.Error("error when flushing data", zap.Time("time", t), zap.Error(err)) - }) - } - m.log.Info("completed warm flush") + instrument.EmitAndLogInvariantViolation(m.opts.InstrumentOptions(), + func(l *zap.Logger) { + l.Error("error when cleaning up data", zap.Time("time", t), zap.Error(err)) + }) } - - if runType == syncRun { - flushFn() - } else { - go flushFn() + if err := m.Flush(t); err != nil { + instrument.EmitAndLogInvariantViolation(m.opts.InstrumentOptions(), + func(l *zap.Logger) { + l.Error("error when flushing data", zap.Time("time", t), zap.Error(err)) + }) } + m.log.Info("completed warm flush") + return true } diff --git a/src/dbnode/storage/fs_test.go b/src/dbnode/storage/fs_test.go index 45310708b2..cfb45e427d 100644 --- a/src/dbnode/storage/fs_test.go +++ b/src/dbnode/storage/fs_test.go @@ -91,5 +91,5 @@ func TestFileSystemManagerRun(t *testing.T) { ) defer instrument.SetShouldPanicEnvironmentVariable(true)() - require.Panics(t, func() { mgr.Run(ts, syncRun, noForce) }) + require.Panics(t, func() { mgr.Run(ts) }) } diff --git a/src/dbnode/storage/mediator.go b/src/dbnode/storage/mediator.go index 2137b0921e..f43ede129f 100644 --- a/src/dbnode/storage/mediator.go +++ b/src/dbnode/storage/mediator.go @@ -86,7 +86,7 @@ type mediator struct { mediatorTimeBarrier mediatorTimeBarrier closedCh chan struct{} tickInterval time.Duration - fileOpsProcesses []func() + fileOpsProcesses []FileOpsProcess backgroundProcesses []BackgroundProcess } @@ -110,10 +110,9 @@ func newMediator(database database, commitlog commitlog.CommitLog, opts Options) } fsm := newFileSystemManager(database, commitlog, opts) d.databaseFileSystemManager = fsm - - d.fileOpsProcesses = []func(){ - d.ongoingFileSystemProcesses, - d.ongoingColdFlushProcesses, + d.fileOpsProcesses = []FileOpsProcess{ + FileOpsProcessFn(d.ongoingFileSystemProcesses), + FileOpsProcessFn(d.ongoingColdFlushProcesses), } d.mediatorTimeBarrier = newMediatorTimeBarrier(nowFn, iOpts, len(d.fileOpsProcesses)) @@ -166,7 +165,7 @@ func (m *mediator) Open() error { go m.reportLoop() for _, fileOpsProcess := range m.fileOpsProcesses { - go fileOpsProcess() + go fileOpsProcess.Start() } go m.ongoingTick() @@ -324,7 +323,7 @@ func (m *mediator) runFileSystemProcesses() { return } - m.databaseFileSystemManager.Run(mediatorTime, syncRun, noForce) + m.databaseFileSystemManager.Run(mediatorTime) } func (m *mediator) runColdFlushProcesses() { diff --git a/src/dbnode/storage/storage_mock.go b/src/dbnode/storage/storage_mock.go index c51d2fbe80..9076cb6549 100644 --- a/src/dbnode/storage/storage_mock.go +++ b/src/dbnode/storage/storage_mock.go @@ -3074,17 +3074,17 @@ func (mr *MockdatabaseFileSystemManagerMockRecorder) Status() *gomock.Call { } // Run mocks base method -func (m *MockdatabaseFileSystemManager) Run(t time.Time, runType runType, forceType forceType) bool { +func (m *MockdatabaseFileSystemManager) Run(t time.Time) bool { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Run", t, runType, forceType) + ret := m.ctrl.Call(m, "Run", t) ret0, _ := ret[0].(bool) return ret0 } // Run indicates an expected call of Run -func (mr *MockdatabaseFileSystemManagerMockRecorder) Run(t, runType, forceType interface{}) *gomock.Call { +func (mr *MockdatabaseFileSystemManagerMockRecorder) Run(t interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Run", reflect.TypeOf((*MockdatabaseFileSystemManager)(nil).Run), t, runType, forceType) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Run", reflect.TypeOf((*MockdatabaseFileSystemManager)(nil).Run), t) } // Report mocks base method diff --git a/src/dbnode/storage/types.go b/src/dbnode/storage/types.go index d67cafd745..373816fd0b 100644 --- a/src/dbnode/storage/types.go +++ b/src/dbnode/storage/types.go @@ -876,11 +876,7 @@ type databaseFileSystemManager interface { // Run attempts to perform all filesystem-related operations, // returning true if those operations are performed, and false otherwise. - Run( - t time.Time, - runType runType, - forceType forceType, - ) bool + Run(t time.Time) bool // Report reports runtime information. Report() @@ -936,6 +932,21 @@ type BackgroundProcess interface { Report() } +// FileOpsProcess is a background process that is run by the database. +type FileOpsProcess interface { + // Start launches the FileOpsProcess to run asynchronously. + Start() +} + +// FileOpsProcessFn is a file ops process function. +type FileOpsProcessFn func() + +// Start starts file ops process function. +func (f FileOpsProcessFn) Start() { + // delegate to the anonymous function. + f() +} + // databaseRepairer repairs in-memory database data. type databaseRepairer interface { BackgroundProcess From 4bc496ae21786ed4a309abbdf39026889302440f Mon Sep 17 00:00:00 2001 From: Linas Naginionis Date: Thu, 15 Apr 2021 13:58:18 +0300 Subject: [PATCH 10/18] ensure that bootstrap is started when new shards are assigned. --- src/dbnode/storage/bootstrap.go | 5 ++- src/dbnode/storage/bootstrap_test.go | 25 +++++++++++--- src/dbnode/storage/database.go | 9 +++-- src/dbnode/storage/database_test.go | 9 +++-- src/dbnode/storage/storage_mock.go | 51 +++++++++++++++++++++++----- src/dbnode/storage/types.go | 4 +-- 6 files changed, 83 insertions(+), 20 deletions(-) diff --git a/src/dbnode/storage/bootstrap.go b/src/dbnode/storage/bootstrap.go index c616e8cc01..1b7e15c0d4 100644 --- a/src/dbnode/storage/bootstrap.go +++ b/src/dbnode/storage/bootstrap.go @@ -116,7 +116,7 @@ func (m *bootstrapManager) LastBootstrapCompletionTime() (xtime.UnixNano, bool) return bsTime, bsTime > 0 } -func (m *bootstrapManager) Bootstrap() (BootstrapResult, error) { +func (m *bootstrapManager) Bootstrap(wgBootstrapStarted *sync.WaitGroup) (BootstrapResult, error) { m.Lock() switch m.state { case Bootstrapping: @@ -128,6 +128,7 @@ func (m *bootstrapManager) Bootstrap() (BootstrapResult, error) { // reshard occurs and we need to bootstrap more shards. m.hasPending = true m.Unlock() + wgBootstrapStarted.Done() return BootstrapResult{AlreadyBootstrapping: true}, errBootstrapEnqueued default: m.state = Bootstrapping @@ -138,6 +139,8 @@ func (m *bootstrapManager) Bootstrap() (BootstrapResult, error) { m.mediator.DisableFileOpsAndWait() defer m.mediator.EnableFileOps() + wgBootstrapStarted.Done() + // Keep performing bootstraps until none pending and no error returned. var result BootstrapResult for i := 0; true; i++ { diff --git a/src/dbnode/storage/bootstrap_test.go b/src/dbnode/storage/bootstrap_test.go index 154ed70520..4a775d29aa 100644 --- a/src/dbnode/storage/bootstrap_test.go +++ b/src/dbnode/storage/bootstrap_test.go @@ -88,8 +88,11 @@ func TestDatabaseBootstrapWithBootstrapError(t *testing.T) { require.Equal(t, BootstrapNotStarted, bsm.state) - result, err := bsm.Bootstrap() + var wg sync.WaitGroup + wg.Add(1) + result, err := bsm.Bootstrap(&wg) + wg.Wait() require.NoError(t, err) require.Equal(t, Bootstrapped, bsm.state) require.Equal(t, 1, len(result.ErrorsBootstrap)) @@ -130,7 +133,10 @@ func TestDatabaseBootstrapSubsequentCallsQueued(t *testing.T) { defer wg.Done() // Enqueue the second bootstrap - _, err := bsm.Bootstrap() + var wgBs sync.WaitGroup + wgBs.Add(1) + _, err := bsm.Bootstrap(&wgBs) + wgBs.Wait() assert.Error(t, err) assert.Equal(t, errBootstrapEnqueued, err) assert.False(t, bsm.IsBootstrapped()) @@ -150,7 +156,10 @@ func TestDatabaseBootstrapSubsequentCallsQueued(t *testing.T) { Return([]databaseNamespace{ns}, nil). Times(2) - _, err = bsm.Bootstrap() + var wgBs sync.WaitGroup + wgBs.Add(1) + _, err = bsm.Bootstrap(&wgBs) + wgBs.Wait() require.Nil(t, err) } @@ -203,7 +212,10 @@ func TestDatabaseBootstrapBootstrapHooks(t *testing.T) { defer wg.Done() // Enqueue the second bootstrap - _, err := bsm.Bootstrap() + var wgBs sync.WaitGroup + wgBs.Add(1) + _, err := bsm.Bootstrap(&wgBs) + wgBs.Wait() assert.Error(t, err) assert.Equal(t, errBootstrapEnqueued, err) assert.False(t, bsm.IsBootstrapped()) @@ -225,6 +237,9 @@ func TestDatabaseBootstrapBootstrapHooks(t *testing.T) { Return(namespaces, nil). Times(2) - _, err := bsm.Bootstrap() + var wg sync.WaitGroup + wg.Add(1) + _, err := bsm.Bootstrap(&wg) + wg.Wait() require.Nil(t, err) } diff --git a/src/dbnode/storage/database.go b/src/dbnode/storage/database.go index 9ff55f7ff9..2a0c69632d 100644 --- a/src/dbnode/storage/database.go +++ b/src/dbnode/storage/database.go @@ -539,16 +539,19 @@ func (d *db) queueBootstrapWithLock() { // call to Bootstrap(). After that initial bootstrap, the clustered database will keep // the non-clustered database bootstrapped by assigning it shardsets which will trigger new // bootstraps since d.bootstraps > 0 will be true. + var wg sync.WaitGroup if d.bootstraps > 0 { + wg.Add(1) // NB(r): Trigger another bootstrap, if already bootstrapping this will // enqueue a new bootstrap to execute before the current bootstrap // completes. go func() { - if result, err := d.mediator.Bootstrap(); err != nil && !result.AlreadyBootstrapping { + if result, err := d.mediator.Bootstrap(&wg); err != nil && !result.AlreadyBootstrapping { d.log.Error("error bootstrapping", zap.Error(err)) } }() } + wg.Wait() } func (d *db) Namespace(id ident.ID) (Namespace, bool) { @@ -1130,7 +1133,9 @@ func (d *db) Bootstrap() error { d.Lock() d.bootstraps++ d.Unlock() - _, err := d.mediator.Bootstrap() + var wg sync.WaitGroup + wg.Add(1) + _, err := d.mediator.Bootstrap(&wg) return err } diff --git a/src/dbnode/storage/database_test.go b/src/dbnode/storage/database_test.go index d8ad6b7533..33fb19ea2e 100644 --- a/src/dbnode/storage/database_test.go +++ b/src/dbnode/storage/database_test.go @@ -478,7 +478,10 @@ func TestDatabaseBootstrappedAssignShardSet(t *testing.T) { fn() return nil }) - mediator.EXPECT().Bootstrap().Return(BootstrapResult{}, nil) + mediator.EXPECT().Bootstrap(gomock.Any()).DoAndReturn(func(wg *sync.WaitGroup) (BootstrapResult, error) { + wg.Done() + return BootstrapResult{}, nil + }) d.mediator = mediator assert.NoError(t, d.Bootstrap()) @@ -492,8 +495,10 @@ func TestDatabaseBootstrappedAssignShardSet(t *testing.T) { var wg sync.WaitGroup wg.Add(1) - mediator.EXPECT().Bootstrap().Return(BootstrapResult{}, nil).Do(func() { + mediator.EXPECT().Bootstrap(gomock.Any()).DoAndReturn(func(wgBs *sync.WaitGroup) (BootstrapResult, error) { + wgBs.Done() wg.Done() + return BootstrapResult{}, nil }) d.AssignShardSet(shardSet) diff --git a/src/dbnode/storage/storage_mock.go b/src/dbnode/storage/storage_mock.go index 9076cb6549..b9b8399fbb 100644 --- a/src/dbnode/storage/storage_mock.go +++ b/src/dbnode/storage/storage_mock.go @@ -2841,18 +2841,18 @@ func (mr *MockdatabaseBootstrapManagerMockRecorder) LastBootstrapCompletionTime( } // Bootstrap mocks base method -func (m *MockdatabaseBootstrapManager) Bootstrap() (BootstrapResult, error) { +func (m *MockdatabaseBootstrapManager) Bootstrap(wgBootstrapStarted *sync.WaitGroup) (BootstrapResult, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Bootstrap") + ret := m.ctrl.Call(m, "Bootstrap", wgBootstrapStarted) ret0, _ := ret[0].(BootstrapResult) ret1, _ := ret[1].(error) return ret0, ret1 } // Bootstrap indicates an expected call of Bootstrap -func (mr *MockdatabaseBootstrapManagerMockRecorder) Bootstrap() *gomock.Call { +func (mr *MockdatabaseBootstrapManagerMockRecorder) Bootstrap(wgBootstrapStarted interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Bootstrap", reflect.TypeOf((*MockdatabaseBootstrapManager)(nil).Bootstrap)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Bootstrap", reflect.TypeOf((*MockdatabaseBootstrapManager)(nil).Bootstrap), wgBootstrapStarted) } // Report mocks base method @@ -3344,6 +3344,41 @@ func (mr *MockBackgroundProcessMockRecorder) Report() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Report", reflect.TypeOf((*MockBackgroundProcess)(nil).Report)) } +// MockFileOpsProcess is a mock of FileOpsProcess interface +type MockFileOpsProcess struct { + ctrl *gomock.Controller + recorder *MockFileOpsProcessMockRecorder +} + +// MockFileOpsProcessMockRecorder is the mock recorder for MockFileOpsProcess +type MockFileOpsProcessMockRecorder struct { + mock *MockFileOpsProcess +} + +// NewMockFileOpsProcess creates a new mock instance +func NewMockFileOpsProcess(ctrl *gomock.Controller) *MockFileOpsProcess { + mock := &MockFileOpsProcess{ctrl: ctrl} + mock.recorder = &MockFileOpsProcessMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockFileOpsProcess) EXPECT() *MockFileOpsProcessMockRecorder { + return m.recorder +} + +// Start mocks base method +func (m *MockFileOpsProcess) Start() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Start") +} + +// Start indicates an expected call of Start +func (mr *MockFileOpsProcessMockRecorder) Start() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Start", reflect.TypeOf((*MockFileOpsProcess)(nil).Start)) +} + // MockdatabaseRepairer is a mock of databaseRepairer interface type MockdatabaseRepairer struct { ctrl *gomock.Controller @@ -3535,18 +3570,18 @@ func (mr *MockdatabaseMediatorMockRecorder) LastBootstrapCompletionTime() *gomoc } // Bootstrap mocks base method -func (m *MockdatabaseMediator) Bootstrap() (BootstrapResult, error) { +func (m *MockdatabaseMediator) Bootstrap(wgBootstrapStarted *sync.WaitGroup) (BootstrapResult, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Bootstrap") + ret := m.ctrl.Call(m, "Bootstrap", wgBootstrapStarted) ret0, _ := ret[0].(BootstrapResult) ret1, _ := ret[1].(error) return ret0, ret1 } // Bootstrap indicates an expected call of Bootstrap -func (mr *MockdatabaseMediatorMockRecorder) Bootstrap() *gomock.Call { +func (mr *MockdatabaseMediatorMockRecorder) Bootstrap(wgBootstrapStarted interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Bootstrap", reflect.TypeOf((*MockdatabaseMediator)(nil).Bootstrap)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Bootstrap", reflect.TypeOf((*MockdatabaseMediator)(nil).Bootstrap), wgBootstrapStarted) } // DisableFileOpsAndWait mocks base method diff --git a/src/dbnode/storage/types.go b/src/dbnode/storage/types.go index 373816fd0b..95ebcd2c73 100644 --- a/src/dbnode/storage/types.go +++ b/src/dbnode/storage/types.go @@ -820,7 +820,7 @@ type databaseBootstrapManager interface { LastBootstrapCompletionTime() (xtime.UnixNano, bool) // Bootstrap performs bootstrapping for all namespaces and shards owned. - Bootstrap() (BootstrapResult, error) + Bootstrap(wgBootstrapStarted *sync.WaitGroup) (BootstrapResult, error) // Report reports runtime information. Report() @@ -979,7 +979,7 @@ type databaseMediator interface { LastBootstrapCompletionTime() (xtime.UnixNano, bool) // Bootstrap bootstraps the database with file operations performed at the end. - Bootstrap() (BootstrapResult, error) + Bootstrap(wgBootstrapStarted *sync.WaitGroup) (BootstrapResult, error) // DisableFileOpsAndWait disables file operations. DisableFileOpsAndWait() From 6befcdf4c6445839c7d00be2191ca1befab28671 Mon Sep 17 00:00:00 2001 From: Linas Naginionis Date: Thu, 15 Apr 2021 18:44:56 +0300 Subject: [PATCH 11/18] added BootstrapEnqueue(). --- src/dbnode/storage/bootstrap.go | 32 ++++++++++++++++++-- src/dbnode/storage/bootstrap_test.go | 42 +++++++++++++------------- src/dbnode/storage/database.go | 19 ++++-------- src/dbnode/storage/database_test.go | 10 +++---- src/dbnode/storage/mediator.go | 2 +- src/dbnode/storage/storage_mock.go | 44 +++++++++++++++++++++++----- src/dbnode/storage/types.go | 41 ++++++++++++++++++++++++-- 7 files changed, 136 insertions(+), 54 deletions(-) diff --git a/src/dbnode/storage/bootstrap.go b/src/dbnode/storage/bootstrap.go index 1b7e15c0d4..831621ed80 100644 --- a/src/dbnode/storage/bootstrap.go +++ b/src/dbnode/storage/bootstrap.go @@ -26,10 +26,13 @@ import ( "sync" "time" + "go.uber.org/zap" + "github.com/m3db/m3/src/dbnode/storage/bootstrap" "github.com/m3db/m3/src/x/clock" "github.com/m3db/m3/src/x/context" xerrors "github.com/m3db/m3/src/x/errors" + "github.com/m3db/m3/src/x/instrument" xtime "github.com/m3db/m3/src/x/time" ) @@ -116,7 +119,25 @@ func (m *bootstrapManager) LastBootstrapCompletionTime() (xtime.UnixNano, bool) return bsTime, bsTime > 0 } -func (m *bootstrapManager) Bootstrap(wgBootstrapStarted *sync.WaitGroup) (BootstrapResult, error) { +func (m *bootstrapManager) BootstrapEnqueue() *BootstrapAsyncResult { + bootstrapAsyncResult := newBootstrapAsyncResult() + go func(r *BootstrapAsyncResult) { + if result, err := m.startBootstrap(r); err != nil && !result.AlreadyBootstrapping { + instrument.EmitAndLogInvariantViolation(m.instrumentation.opts.InstrumentOptions(), + func(l *zap.Logger) { + l.Error("error bootstrapping", zap.Error(err)) + }) + } + }(bootstrapAsyncResult) + return bootstrapAsyncResult +} + +func (m *bootstrapManager) Bootstrap() (BootstrapResult, error) { + bootstrapAsyncResult := newBootstrapAsyncResult() + return m.startBootstrap(bootstrapAsyncResult) +} + +func (m *bootstrapManager) startBootstrap(asyncResult *BootstrapAsyncResult) (BootstrapResult, error) { m.Lock() switch m.state { case Bootstrapping: @@ -128,7 +149,8 @@ func (m *bootstrapManager) Bootstrap(wgBootstrapStarted *sync.WaitGroup) (Bootst // reshard occurs and we need to bootstrap more shards. m.hasPending = true m.Unlock() - wgBootstrapStarted.Done() + asyncResult.bootstrapStarted.Done() + asyncResult.bootstrapCompleted.Done() return BootstrapResult{AlreadyBootstrapping: true}, errBootstrapEnqueued default: m.state = Bootstrapping @@ -139,7 +161,7 @@ func (m *bootstrapManager) Bootstrap(wgBootstrapStarted *sync.WaitGroup) (Bootst m.mediator.DisableFileOpsAndWait() defer m.mediator.EnableFileOps() - wgBootstrapStarted.Done() + asyncResult.bootstrapStarted.Done() // Keep performing bootstraps until none pending and no error returned. var result BootstrapResult @@ -188,6 +210,10 @@ func (m *bootstrapManager) Bootstrap(wgBootstrapStarted *sync.WaitGroup) (Bootst m.lastBootstrapCompletionTime = xtime.ToUnixNano(m.nowFn()) m.state = Bootstrapped m.Unlock() + asyncResult.bootstrapResultFn = func() BootstrapResult { + return result + } + asyncResult.bootstrapCompleted.Done() return result, nil } diff --git a/src/dbnode/storage/bootstrap_test.go b/src/dbnode/storage/bootstrap_test.go index 4a775d29aa..526961f92d 100644 --- a/src/dbnode/storage/bootstrap_test.go +++ b/src/dbnode/storage/bootstrap_test.go @@ -39,6 +39,14 @@ import ( ) func TestDatabaseBootstrapWithBootstrapError(t *testing.T) { + testDatabaseBootstrapWithBootstrapError(t, false) +} + +func TestDatabaseBootstrapEnqueueWithBootstrapError(t *testing.T) { + testDatabaseBootstrapWithBootstrapError(t, true) +} + +func testDatabaseBootstrapWithBootstrapError(t *testing.T, async bool) { ctrl := gomock.NewController(xtest.Reporter{T: t}) defer ctrl.Finish() @@ -88,12 +96,16 @@ func TestDatabaseBootstrapWithBootstrapError(t *testing.T) { require.Equal(t, BootstrapNotStarted, bsm.state) - var wg sync.WaitGroup - wg.Add(1) - result, err := bsm.Bootstrap(&wg) + var result BootstrapResult + if async { + asyncResult := bsm.BootstrapEnqueue() + asyncResult.WaitForStart() + result = asyncResult.Result() + } else { + result, err = bsm.Bootstrap() + require.NoError(t, err) + } - wg.Wait() - require.NoError(t, err) require.Equal(t, Bootstrapped, bsm.state) require.Equal(t, 1, len(result.ErrorsBootstrap)) require.Equal(t, "an error", result.ErrorsBootstrap[0].Error()) @@ -133,10 +145,7 @@ func TestDatabaseBootstrapSubsequentCallsQueued(t *testing.T) { defer wg.Done() // Enqueue the second bootstrap - var wgBs sync.WaitGroup - wgBs.Add(1) - _, err := bsm.Bootstrap(&wgBs) - wgBs.Wait() + _, err := bsm.Bootstrap() assert.Error(t, err) assert.Equal(t, errBootstrapEnqueued, err) assert.False(t, bsm.IsBootstrapped()) @@ -156,10 +165,7 @@ func TestDatabaseBootstrapSubsequentCallsQueued(t *testing.T) { Return([]databaseNamespace{ns}, nil). Times(2) - var wgBs sync.WaitGroup - wgBs.Add(1) - _, err = bsm.Bootstrap(&wgBs) - wgBs.Wait() + _, err = bsm.Bootstrap() require.Nil(t, err) } @@ -212,10 +218,7 @@ func TestDatabaseBootstrapBootstrapHooks(t *testing.T) { defer wg.Done() // Enqueue the second bootstrap - var wgBs sync.WaitGroup - wgBs.Add(1) - _, err := bsm.Bootstrap(&wgBs) - wgBs.Wait() + _, err := bsm.Bootstrap() assert.Error(t, err) assert.Equal(t, errBootstrapEnqueued, err) assert.False(t, bsm.IsBootstrapped()) @@ -237,9 +240,6 @@ func TestDatabaseBootstrapBootstrapHooks(t *testing.T) { Return(namespaces, nil). Times(2) - var wg sync.WaitGroup - wg.Add(1) - _, err := bsm.Bootstrap(&wg) - wg.Wait() + _, err := bsm.Bootstrap() require.Nil(t, err) } diff --git a/src/dbnode/storage/database.go b/src/dbnode/storage/database.go index 2a0c69632d..5eec7161bd 100644 --- a/src/dbnode/storage/database.go +++ b/src/dbnode/storage/database.go @@ -539,19 +539,12 @@ func (d *db) queueBootstrapWithLock() { // call to Bootstrap(). After that initial bootstrap, the clustered database will keep // the non-clustered database bootstrapped by assigning it shardsets which will trigger new // bootstraps since d.bootstraps > 0 will be true. - var wg sync.WaitGroup if d.bootstraps > 0 { - wg.Add(1) - // NB(r): Trigger another bootstrap, if already bootstrapping this will - // enqueue a new bootstrap to execute before the current bootstrap - // completes. - go func() { - if result, err := d.mediator.Bootstrap(&wg); err != nil && !result.AlreadyBootstrapping { - d.log.Error("error bootstrapping", zap.Error(err)) - } - }() + bootstrapAsyncResult := d.mediator.BootstrapEnqueue() + // NB(linasn): We need to wait for the bootstrap to start and set it's state to Bootstrapping in order + // to safely run fileOps in mediator later. + bootstrapAsyncResult.WaitForStart() } - wg.Wait() } func (d *db) Namespace(id ident.ID) (Namespace, bool) { @@ -1133,9 +1126,7 @@ func (d *db) Bootstrap() error { d.Lock() d.bootstraps++ d.Unlock() - var wg sync.WaitGroup - wg.Add(1) - _, err := d.mediator.Bootstrap(&wg) + _, err := d.mediator.Bootstrap() return err } diff --git a/src/dbnode/storage/database_test.go b/src/dbnode/storage/database_test.go index 33fb19ea2e..858797846b 100644 --- a/src/dbnode/storage/database_test.go +++ b/src/dbnode/storage/database_test.go @@ -478,8 +478,7 @@ func TestDatabaseBootstrappedAssignShardSet(t *testing.T) { fn() return nil }) - mediator.EXPECT().Bootstrap(gomock.Any()).DoAndReturn(func(wg *sync.WaitGroup) (BootstrapResult, error) { - wg.Done() + mediator.EXPECT().Bootstrap().DoAndReturn(func() (BootstrapResult, error) { return BootstrapResult{}, nil }) d.mediator = mediator @@ -495,10 +494,11 @@ func TestDatabaseBootstrappedAssignShardSet(t *testing.T) { var wg sync.WaitGroup wg.Add(1) - mediator.EXPECT().Bootstrap(gomock.Any()).DoAndReturn(func(wgBs *sync.WaitGroup) (BootstrapResult, error) { - wgBs.Done() + mediator.EXPECT().BootstrapEnqueue().DoAndReturn(func() *BootstrapAsyncResult { + asyncResult := newBootstrapAsyncResult() + asyncResult.bootstrapStarted = &wg wg.Done() - return BootstrapResult{}, nil + return asyncResult }) d.AssignShardSet(shardSet) diff --git a/src/dbnode/storage/mediator.go b/src/dbnode/storage/mediator.go index f43ede129f..bdb2570ba6 100644 --- a/src/dbnode/storage/mediator.go +++ b/src/dbnode/storage/mediator.go @@ -510,7 +510,7 @@ func newMediatorTimeBarrier(nowFn func() time.Time, iOpts instrument.Options, ma nowFn: nowFn, iOpts: iOpts, numMaxWaiters: maxWaiters, - releaseCh: make(chan time.Time, 0), + releaseCh: make(chan time.Time), externalFnCh: make(chan func(), defaultExternalChannelSize), } } diff --git a/src/dbnode/storage/storage_mock.go b/src/dbnode/storage/storage_mock.go index b9b8399fbb..c28f8507d4 100644 --- a/src/dbnode/storage/storage_mock.go +++ b/src/dbnode/storage/storage_mock.go @@ -2841,18 +2841,32 @@ func (mr *MockdatabaseBootstrapManagerMockRecorder) LastBootstrapCompletionTime( } // Bootstrap mocks base method -func (m *MockdatabaseBootstrapManager) Bootstrap(wgBootstrapStarted *sync.WaitGroup) (BootstrapResult, error) { +func (m *MockdatabaseBootstrapManager) Bootstrap() (BootstrapResult, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Bootstrap", wgBootstrapStarted) + ret := m.ctrl.Call(m, "Bootstrap") ret0, _ := ret[0].(BootstrapResult) ret1, _ := ret[1].(error) return ret0, ret1 } // Bootstrap indicates an expected call of Bootstrap -func (mr *MockdatabaseBootstrapManagerMockRecorder) Bootstrap(wgBootstrapStarted interface{}) *gomock.Call { +func (mr *MockdatabaseBootstrapManagerMockRecorder) Bootstrap() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Bootstrap", reflect.TypeOf((*MockdatabaseBootstrapManager)(nil).Bootstrap)) +} + +// BootstrapEnqueue mocks base method +func (m *MockdatabaseBootstrapManager) BootstrapEnqueue() *BootstrapAsyncResult { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "BootstrapEnqueue") + ret0, _ := ret[0].(*BootstrapAsyncResult) + return ret0 +} + +// BootstrapEnqueue indicates an expected call of BootstrapEnqueue +func (mr *MockdatabaseBootstrapManagerMockRecorder) BootstrapEnqueue() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Bootstrap", reflect.TypeOf((*MockdatabaseBootstrapManager)(nil).Bootstrap), wgBootstrapStarted) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BootstrapEnqueue", reflect.TypeOf((*MockdatabaseBootstrapManager)(nil).BootstrapEnqueue)) } // Report mocks base method @@ -3570,18 +3584,32 @@ func (mr *MockdatabaseMediatorMockRecorder) LastBootstrapCompletionTime() *gomoc } // Bootstrap mocks base method -func (m *MockdatabaseMediator) Bootstrap(wgBootstrapStarted *sync.WaitGroup) (BootstrapResult, error) { +func (m *MockdatabaseMediator) Bootstrap() (BootstrapResult, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Bootstrap", wgBootstrapStarted) + ret := m.ctrl.Call(m, "Bootstrap") ret0, _ := ret[0].(BootstrapResult) ret1, _ := ret[1].(error) return ret0, ret1 } // Bootstrap indicates an expected call of Bootstrap -func (mr *MockdatabaseMediatorMockRecorder) Bootstrap(wgBootstrapStarted interface{}) *gomock.Call { +func (mr *MockdatabaseMediatorMockRecorder) Bootstrap() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Bootstrap", reflect.TypeOf((*MockdatabaseMediator)(nil).Bootstrap)) +} + +// BootstrapEnqueue mocks base method +func (m *MockdatabaseMediator) BootstrapEnqueue() *BootstrapAsyncResult { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "BootstrapEnqueue") + ret0, _ := ret[0].(*BootstrapAsyncResult) + return ret0 +} + +// BootstrapEnqueue indicates an expected call of BootstrapEnqueue +func (mr *MockdatabaseMediatorMockRecorder) BootstrapEnqueue() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Bootstrap", reflect.TypeOf((*MockdatabaseMediator)(nil).Bootstrap), wgBootstrapStarted) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BootstrapEnqueue", reflect.TypeOf((*MockdatabaseMediator)(nil).BootstrapEnqueue)) } // DisableFileOpsAndWait mocks base method diff --git a/src/dbnode/storage/types.go b/src/dbnode/storage/types.go index 95ebcd2c73..f48ee809fe 100644 --- a/src/dbnode/storage/types.go +++ b/src/dbnode/storage/types.go @@ -820,7 +820,10 @@ type databaseBootstrapManager interface { LastBootstrapCompletionTime() (xtime.UnixNano, bool) // Bootstrap performs bootstrapping for all namespaces and shards owned. - Bootstrap(wgBootstrapStarted *sync.WaitGroup) (BootstrapResult, error) + Bootstrap() (BootstrapResult, error) + + // BootstrapEnqueue performs bootstrapping asynchronously for all namespaces and shards owned. + BootstrapEnqueue() *BootstrapAsyncResult // Report reports runtime information. Report() @@ -832,6 +835,37 @@ type BootstrapResult struct { AlreadyBootstrapping bool } +// BootstrapAsyncResult is a bootstrap async result. +type BootstrapAsyncResult struct { + bootstrapStarted *sync.WaitGroup + bootstrapCompleted *sync.WaitGroup + bootstrapResultFn func() BootstrapResult +} + +func newBootstrapAsyncResult() *BootstrapAsyncResult { + var ( + wgStarted sync.WaitGroup + wgCompleted sync.WaitGroup + ) + wgStarted.Add(1) + wgCompleted.Add(1) + return &BootstrapAsyncResult{ + bootstrapStarted: &wgStarted, + bootstrapCompleted: &wgCompleted, + } +} + +// Result will wait for bootstrap to complete and return BootstrapResult. +func (b *BootstrapAsyncResult) Result() BootstrapResult { + b.bootstrapCompleted.Wait() + return b.bootstrapResultFn() +} + +// WaitForStart waits until bootstrap has been started. +func (b *BootstrapAsyncResult) WaitForStart() { + b.bootstrapStarted.Wait() +} + // databaseFlushManager manages flushing in-memory data to persistent storage. type databaseFlushManager interface { // Flush flushes in-memory data to persistent storage. @@ -979,7 +1013,10 @@ type databaseMediator interface { LastBootstrapCompletionTime() (xtime.UnixNano, bool) // Bootstrap bootstraps the database with file operations performed at the end. - Bootstrap(wgBootstrapStarted *sync.WaitGroup) (BootstrapResult, error) + Bootstrap() (BootstrapResult, error) + + // BootstrapEnqueue bootstraps the database asynchronously with file operations performed at the end. + BootstrapEnqueue() *BootstrapAsyncResult // DisableFileOpsAndWait disables file operations. DisableFileOpsAndWait() From a8d3765b9f2ecad6c123ffcd95516c6d07346cb9 Mon Sep 17 00:00:00 2001 From: Linas Naginionis Date: Fri, 16 Apr 2021 10:37:16 +0300 Subject: [PATCH 12/18] updated logging levels. --- src/dbnode/storage/coldflush.go | 4 ++-- src/dbnode/storage/fs.go | 4 ++-- src/dbnode/storage/namespace.go | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/dbnode/storage/coldflush.go b/src/dbnode/storage/coldflush.go index da5d8ecb2c..b254ce26fe 100644 --- a/src/dbnode/storage/coldflush.go +++ b/src/dbnode/storage/coldflush.go @@ -107,7 +107,7 @@ func (m *coldFlushManager) Run(t time.Time) bool { m.Unlock() }() - m.log.Info("starting cold flush") + m.log.Debug("starting cold flush") // NB(xichen): perform data cleanup and flushing sequentially to minimize the impact of disk seeks. // NB(r): Use invariant here since flush errors were introduced @@ -126,7 +126,7 @@ func (m *coldFlushManager) Run(t time.Time) bool { l.Error("error when cold flushing data", zap.Time("time", t), zap.Error(err)) }) } - m.log.Info("completed cold flush") + m.log.Debug("completed cold flush") return true } diff --git a/src/dbnode/storage/fs.go b/src/dbnode/storage/fs.go index 171e90376c..cd4d691901 100644 --- a/src/dbnode/storage/fs.go +++ b/src/dbnode/storage/fs.go @@ -147,7 +147,7 @@ func (m *fileSystemManager) Run(t time.Time) bool { m.Unlock() }() - m.log.Info("starting warm flush") + m.log.Debug("starting warm flush") // NB(xichen): perform data cleanup and flushing sequentially to minimize the impact of disk seeks. if err := m.WarmFlushCleanup(t); err != nil { @@ -166,7 +166,7 @@ func (m *fileSystemManager) Run(t time.Time) bool { l.Error("error when flushing data", zap.Time("time", t), zap.Error(err)) }) } - m.log.Info("completed warm flush") + m.log.Debug("completed warm flush") return true } diff --git a/src/dbnode/storage/namespace.go b/src/dbnode/storage/namespace.go index e82f39b78d..ac05a4acb0 100644 --- a/src/dbnode/storage/namespace.go +++ b/src/dbnode/storage/namespace.go @@ -1055,7 +1055,7 @@ func (n *dbNamespace) Bootstrap( } if !bootstrapped { // NB(r): Not bootstrapped in this bootstrap run. - n.log.Warn("skipping not bootstrapped shard", + n.log.Debug("skipping already bootstrapped shard", zap.Uint32("shard", shardID), zap.Stringer("namespace", n.id)) continue From 856939d9f8135ee0acd383ced63316abebda124d Mon Sep 17 00:00:00 2001 From: Linas Naginionis Date: Fri, 16 Apr 2021 12:51:29 +0300 Subject: [PATCH 13/18] more test coverage. --- src/dbnode/storage/bootstrap.go | 8 +-- .../storage/bootstrap_instrumentation.go | 6 +++ src/dbnode/storage/coldflush.go | 4 +- src/dbnode/storage/coldflush_test.go | 53 +++++++++++++++++++ src/dbnode/storage/database_test.go | 24 +++++++++ src/dbnode/storage/fs.go | 4 +- src/dbnode/storage/fs_test.go | 28 +++++++++- src/dbnode/storage/mediator_test.go | 50 +++++++++++++++++ 8 files changed, 164 insertions(+), 13 deletions(-) diff --git a/src/dbnode/storage/bootstrap.go b/src/dbnode/storage/bootstrap.go index 831621ed80..e31ff5e3c4 100644 --- a/src/dbnode/storage/bootstrap.go +++ b/src/dbnode/storage/bootstrap.go @@ -26,13 +26,10 @@ import ( "sync" "time" - "go.uber.org/zap" - "github.com/m3db/m3/src/dbnode/storage/bootstrap" "github.com/m3db/m3/src/x/clock" "github.com/m3db/m3/src/x/context" xerrors "github.com/m3db/m3/src/x/errors" - "github.com/m3db/m3/src/x/instrument" xtime "github.com/m3db/m3/src/x/time" ) @@ -123,10 +120,7 @@ func (m *bootstrapManager) BootstrapEnqueue() *BootstrapAsyncResult { bootstrapAsyncResult := newBootstrapAsyncResult() go func(r *BootstrapAsyncResult) { if result, err := m.startBootstrap(r); err != nil && !result.AlreadyBootstrapping { - instrument.EmitAndLogInvariantViolation(m.instrumentation.opts.InstrumentOptions(), - func(l *zap.Logger) { - l.Error("error bootstrapping", zap.Error(err)) - }) + m.instrumentation.emitAndLogInvariantViolation(err, "error bootstrapping") } }(bootstrapAsyncResult) return bootstrapAsyncResult diff --git a/src/dbnode/storage/bootstrap_instrumentation.go b/src/dbnode/storage/bootstrap_instrumentation.go index 4a5f2421a7..0fbd90a60b 100644 --- a/src/dbnode/storage/bootstrap_instrumentation.go +++ b/src/dbnode/storage/bootstrap_instrumentation.go @@ -191,3 +191,9 @@ func (i *bootstrapInstrumentation) setIsBootstrappedAndDurable(isBootstrappedAnd } i.durableStatus.Update(status) } + +func (i *bootstrapInstrumentation) emitAndLogInvariantViolation(err error, msg string) { + instrument.EmitAndLogInvariantViolation(i.opts.InstrumentOptions(), func(l *zap.Logger) { + l.Error(msg, zap.Error(err)) + }) +} diff --git a/src/dbnode/storage/coldflush.go b/src/dbnode/storage/coldflush.go index b254ce26fe..a505c28748 100644 --- a/src/dbnode/storage/coldflush.go +++ b/src/dbnode/storage/coldflush.go @@ -107,7 +107,7 @@ func (m *coldFlushManager) Run(t time.Time) bool { m.Unlock() }() - m.log.Debug("starting cold flush") + m.log.Debug("starting cold flush", zap.Time("time", t)) // NB(xichen): perform data cleanup and flushing sequentially to minimize the impact of disk seeks. // NB(r): Use invariant here since flush errors were introduced @@ -126,7 +126,7 @@ func (m *coldFlushManager) Run(t time.Time) bool { l.Error("error when cold flushing data", zap.Time("time", t), zap.Error(err)) }) } - m.log.Debug("completed cold flush") + m.log.Debug("completed cold flush", zap.Time("time", t)) return true } diff --git a/src/dbnode/storage/coldflush_test.go b/src/dbnode/storage/coldflush_test.go index 55eb3d355c..a22bd05c34 100644 --- a/src/dbnode/storage/coldflush_test.go +++ b/src/dbnode/storage/coldflush_test.go @@ -28,6 +28,7 @@ import ( "github.com/golang/mock/gomock" "github.com/m3db/m3/src/dbnode/persist" + "github.com/m3db/m3/src/x/instrument" "github.com/stretchr/testify/require" ) @@ -116,3 +117,55 @@ func TestColdFlushManagerFlushDoneFlushError(t *testing.T) { require.EqualError(t, fakeErr, cfm.coldFlush().Error()) } + +func TestColdFlushManagerSkipRun(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + database := newMockdatabase(ctrl) + cfm := newColdFlushManager(database, nil, DefaultTestOptions()) + + database.EXPECT().IsBootstrapped().Return(false) + require.False(t, cfm.Run(time.Now())) +} + +func TestColdFlushManagerRunCleanupPanic(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + database := newMockdatabase(ctrl) + database.EXPECT().IsBootstrapped().Return(true) + + cm := NewMockdatabaseCleanupManager(ctrl) + cfm := newColdFlushManager(database, nil, DefaultTestOptions()) + mgr := cfm.(*coldFlushManager) + mgr.databaseCleanupManager = cm + + ts := time.Now() + gomock.InOrder( + cm.EXPECT().ColdFlushCleanup(ts).Return(errors.New("cleanup error")), + ) + + defer instrument.SetShouldPanicEnvironmentVariable(true)() + require.Panics(t, func() { mgr.Run(ts) }) +} + +func TestColdFlushManagerRunFlushPanic(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + database := newMockdatabase(ctrl) + database.EXPECT().IsBootstrapped().Return(true) + database.EXPECT().OwnedNamespaces().Return(nil, errors.New("flush error")) + + cm := NewMockdatabaseCleanupManager(ctrl) + cfm := newColdFlushManager(database, nil, DefaultTestOptions()) + mgr := cfm.(*coldFlushManager) + mgr.databaseCleanupManager = cm + + ts := time.Now() + gomock.InOrder( + cm.EXPECT().ColdFlushCleanup(ts).Return(nil), + ) + + defer instrument.SetShouldPanicEnvironmentVariable(true)() + require.Panics(t, func() { mgr.Run(ts) }) +} diff --git a/src/dbnode/storage/database_test.go b/src/dbnode/storage/database_test.go index 858797846b..71ec7d218c 100644 --- a/src/dbnode/storage/database_test.go +++ b/src/dbnode/storage/database_test.go @@ -506,6 +506,30 @@ func TestDatabaseBootstrappedAssignShardSet(t *testing.T) { wg.Wait() } +func TestDatabaseAssignShardSetShouldPanic(t *testing.T) { + ctrl := xtest.NewController(t) + defer ctrl.Finish() + + d, mapCh, _ := defaultTestDatabase(t, ctrl, Bootstrapped) + defer func() { + close(mapCh) + }() + + mediator := NewMockdatabaseMediator(ctrl) + mediator.EXPECT().EnqueueMutuallyExclusiveFn(gomock.Any()).Return(errors.New("unknown error")) + d.mediator = mediator + + shards := append(sharding.NewShards([]uint32{0, 1}, shard.Available), + sharding.NewShards([]uint32{2}, shard.Initializing)...) + shardSet, err := sharding.NewShardSet(shards, nil) + require.NoError(t, err) + + defer instrument.SetShouldPanicEnvironmentVariable(true)() + require.Panics(t, func() { + d.AssignShardSet(shardSet) + }) +} + func TestDatabaseRemoveNamespace(t *testing.T) { ctrl := xtest.NewController(t) defer ctrl.Finish() diff --git a/src/dbnode/storage/fs.go b/src/dbnode/storage/fs.go index cd4d691901..03e3ded37a 100644 --- a/src/dbnode/storage/fs.go +++ b/src/dbnode/storage/fs.go @@ -147,7 +147,7 @@ func (m *fileSystemManager) Run(t time.Time) bool { m.Unlock() }() - m.log.Debug("starting warm flush") + m.log.Debug("starting warm flush", zap.Time("time", t)) // NB(xichen): perform data cleanup and flushing sequentially to minimize the impact of disk seeks. if err := m.WarmFlushCleanup(t); err != nil { @@ -166,7 +166,7 @@ func (m *fileSystemManager) Run(t time.Time) bool { l.Error("error when flushing data", zap.Time("time", t), zap.Error(err)) }) } - m.log.Debug("completed warm flush") + m.log.Debug("completed warm flush", zap.Time("time", t)) return true } diff --git a/src/dbnode/storage/fs_test.go b/src/dbnode/storage/fs_test.go index cfb45e427d..10186d1644 100644 --- a/src/dbnode/storage/fs_test.go +++ b/src/dbnode/storage/fs_test.go @@ -39,8 +39,9 @@ func TestFileSystemManagerShouldRunDuringBootstrap(t *testing.T) { fsm := newFileSystemManager(database, nil, DefaultTestOptions()) mgr := fsm.(*fileSystemManager) - database.EXPECT().IsBootstrapped().Return(false) + database.EXPECT().IsBootstrapped().Return(false).Times(2) require.False(t, mgr.shouldRunWithLock()) + require.False(t, mgr.Run(time.Now())) database.EXPECT().IsBootstrapped().Return(true) require.True(t, mgr.shouldRunWithLock()) @@ -72,7 +73,7 @@ func TestFileSystemManagerShouldRunEnableDisable(t *testing.T) { require.True(t, mgr.shouldRunWithLock()) } -func TestFileSystemManagerRun(t *testing.T) { +func TestFileSystemManagerRunCleanupPanic(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() database := newMockdatabase(ctrl) @@ -93,3 +94,26 @@ func TestFileSystemManagerRun(t *testing.T) { defer instrument.SetShouldPanicEnvironmentVariable(true)() require.Panics(t, func() { mgr.Run(ts) }) } + +func TestFileSystemManagerRunFlushPanic(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + database := newMockdatabase(ctrl) + database.EXPECT().IsBootstrapped().Return(true).AnyTimes() + + fm := NewMockdatabaseFlushManager(ctrl) + cm := NewMockdatabaseCleanupManager(ctrl) + fsm := newFileSystemManager(database, nil, DefaultTestOptions()) + mgr := fsm.(*fileSystemManager) + mgr.databaseFlushManager = fm + mgr.databaseCleanupManager = cm + + ts := time.Now() + gomock.InOrder( + cm.EXPECT().WarmFlushCleanup(ts).Return(nil), + fm.EXPECT().Flush(ts).Return(errors.New("flush error")), + ) + + defer instrument.SetShouldPanicEnvironmentVariable(true)() + require.Panics(t, func() { mgr.Run(ts) }) +} diff --git a/src/dbnode/storage/mediator_test.go b/src/dbnode/storage/mediator_test.go index 53807529fe..10b24fdab9 100644 --- a/src/dbnode/storage/mediator_test.go +++ b/src/dbnode/storage/mediator_test.go @@ -21,6 +21,7 @@ package storage import ( + "sync" "testing" "time" @@ -116,3 +117,52 @@ func TestDatabaseMediatorDisableFileOpsAndWait(t *testing.T) { m.DisableFileOpsAndWait() require.Equal(t, 3, len(slept)) } + +func TestDatabaseMediatorEnqueueMutuallyExclusiveFnAndExecute(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + opts := DefaultTestOptions() + opts = opts. + SetMediatorTickInterval(time.Millisecond * 100). + SetBootstrapProcessProvider(nil) + + db := NewMockdatabase(ctrl) + db.EXPECT().Options().Return(opts).AnyTimes() + db.EXPECT().IsBootstrapped().Return(true).AnyTimes() + db.EXPECT().IsBootstrappedAndDurable().Return(true).AnyTimes() + + med, err := newMediator(db, nil, opts) + require.NoError(t, err) + m := med.(*mediator) + + tm := NewMockdatabaseTickManager(ctrl) + tm.EXPECT().Tick(force, gomock.Any()).Return(nil).AnyTimes() + m.databaseTickManager = tm + fsm := NewMockdatabaseFileSystemManager(ctrl) + fsm.EXPECT().Run(gomock.Any()).Return(true).AnyTimes() + fsm.EXPECT().Report().AnyTimes() + m.databaseFileSystemManager = fsm + cfm := NewMockdatabaseColdFlushManager(ctrl) + cfm.EXPECT().Run(gomock.Any()).Return(true).AnyTimes() + cfm.EXPECT().Report().AnyTimes() + m.databaseColdFlushManager = cfm + + require.NoError(t, med.Open()) + defer func() { + require.NoError(t, med.Close()) + }() + + fsm.EXPECT().Status().Return(fileOpNotStarted).AnyTimes() + cfm.EXPECT().Status().Return(fileOpNotStarted).AnyTimes() + + var wg sync.WaitGroup + wg.Add(1) + require.NoError(t, med.EnqueueMutuallyExclusiveFn(func() { + defer wg.Done() + require.Equal(t, fileOpNotStarted, m.databaseFileSystemManager.Status()) + require.Equal(t, fileOpNotStarted, m.databaseColdFlushManager.Status()) + })) + + wg.Wait() +} From 80ea6b996afecc1e30e6ead79bbdc53d804974eb Mon Sep 17 00:00:00 2001 From: Linas Naginionis Date: Fri, 16 Apr 2021 17:02:27 +0300 Subject: [PATCH 14/18] removed invariant violation --- src/dbnode/storage/cluster/database.go | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/src/dbnode/storage/cluster/database.go b/src/dbnode/storage/cluster/database.go index 280d0e498c..5b149ebf6f 100644 --- a/src/dbnode/storage/cluster/database.go +++ b/src/dbnode/storage/cluster/database.go @@ -30,8 +30,6 @@ import ( "github.com/m3db/m3/src/dbnode/sharding" "github.com/m3db/m3/src/dbnode/storage" "github.com/m3db/m3/src/dbnode/topology" - "github.com/m3db/m3/src/x/instrument" - "github.com/uber-go/tally" "go.uber.org/zap" ) @@ -409,13 +407,10 @@ func (d *clusterDB) analyzeAndReportShardStates() { count := d.bootstrapCount[id] if count != len(namespaces) { // Should never happen if bootstrapped and durable. - instrument.EmitAndLogInvariantViolation(d.opts.InstrumentOptions(), func(l *zap.Logger) { - l.With( - zap.Uint32("shard", id), - zap.Int("count", count), - zap.Int("numNamespaces", len(namespaces)), - ).Error("database indicated that it was bootstrapped and durable, but number of bootstrapped shards did not match number of namespaces") - }) + d.log.Debug("database indicated that it was bootstrapped and durable, but number of bootstrapped shards did not match number of namespaces", + zap.Uint32("shard", id), + zap.Int("count", count), + zap.Int("numNamespaces", len(namespaces))) continue } From fbf17ee70060226921f20c6b6080f3cc30c16c45 Mon Sep 17 00:00:00 2001 From: Linas Naginionis Date: Fri, 16 Apr 2021 17:03:34 +0300 Subject: [PATCH 15/18] linter fix --- src/dbnode/storage/cluster/database.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/dbnode/storage/cluster/database.go b/src/dbnode/storage/cluster/database.go index 5b149ebf6f..f63fef83c3 100644 --- a/src/dbnode/storage/cluster/database.go +++ b/src/dbnode/storage/cluster/database.go @@ -407,7 +407,8 @@ func (d *clusterDB) analyzeAndReportShardStates() { count := d.bootstrapCount[id] if count != len(namespaces) { // Should never happen if bootstrapped and durable. - d.log.Debug("database indicated that it was bootstrapped and durable, but number of bootstrapped shards did not match number of namespaces", + d.log.Debug("database indicated that it was bootstrapped and durable, "+ + "but number of bootstrapped shards did not match number of namespaces", zap.Uint32("shard", id), zap.Int("count", count), zap.Int("numNamespaces", len(namespaces))) From 80c056887f53eb3ed230938bcd0fc75c4d14fa3a Mon Sep 17 00:00:00 2001 From: Linas Naginionis Date: Wed, 21 Apr 2021 10:14:33 +0300 Subject: [PATCH 16/18] set bootstrap result value directly. --- src/dbnode/storage/bootstrap.go | 4 +--- src/dbnode/storage/types.go | 4 ++-- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/src/dbnode/storage/bootstrap.go b/src/dbnode/storage/bootstrap.go index e31ff5e3c4..f013dde13b 100644 --- a/src/dbnode/storage/bootstrap.go +++ b/src/dbnode/storage/bootstrap.go @@ -204,9 +204,7 @@ func (m *bootstrapManager) startBootstrap(asyncResult *BootstrapAsyncResult) (Bo m.lastBootstrapCompletionTime = xtime.ToUnixNano(m.nowFn()) m.state = Bootstrapped m.Unlock() - asyncResult.bootstrapResultFn = func() BootstrapResult { - return result - } + asyncResult.bootstrapResult = result asyncResult.bootstrapCompleted.Done() return result, nil } diff --git a/src/dbnode/storage/types.go b/src/dbnode/storage/types.go index f48ee809fe..3eab205c53 100644 --- a/src/dbnode/storage/types.go +++ b/src/dbnode/storage/types.go @@ -839,7 +839,7 @@ type BootstrapResult struct { type BootstrapAsyncResult struct { bootstrapStarted *sync.WaitGroup bootstrapCompleted *sync.WaitGroup - bootstrapResultFn func() BootstrapResult + bootstrapResult BootstrapResult } func newBootstrapAsyncResult() *BootstrapAsyncResult { @@ -858,7 +858,7 @@ func newBootstrapAsyncResult() *BootstrapAsyncResult { // Result will wait for bootstrap to complete and return BootstrapResult. func (b *BootstrapAsyncResult) Result() BootstrapResult { b.bootstrapCompleted.Wait() - return b.bootstrapResultFn() + return b.bootstrapResult } // WaitForStart waits until bootstrap has been started. From 4ba1d9609c2583a177feda1aac35bb9089df4d0d Mon Sep 17 00:00:00 2001 From: Linas Naginionis Date: Wed, 21 Apr 2021 14:37:56 +0300 Subject: [PATCH 17/18] changes after review --- src/dbnode/storage/bootstrap.go | 12 ++++++++---- src/dbnode/storage/cluster/database.go | 3 ++- src/dbnode/storage/database.go | 24 ++++++++++++------------ src/dbnode/storage/mediator.go | 18 +++++++++++------- src/dbnode/storage/storage_mock.go | 14 ++++++++++++++ src/dbnode/storage/types.go | 3 +++ 6 files changed, 50 insertions(+), 24 deletions(-) diff --git a/src/dbnode/storage/bootstrap.go b/src/dbnode/storage/bootstrap.go index f013dde13b..9cf5cb3e79 100644 --- a/src/dbnode/storage/bootstrap.go +++ b/src/dbnode/storage/bootstrap.go @@ -143,9 +143,11 @@ func (m *bootstrapManager) startBootstrap(asyncResult *BootstrapAsyncResult) (Bo // reshard occurs and we need to bootstrap more shards. m.hasPending = true m.Unlock() + result := BootstrapResult{AlreadyBootstrapping: true} + asyncResult.bootstrapResult = result asyncResult.bootstrapStarted.Done() asyncResult.bootstrapCompleted.Done() - return BootstrapResult{AlreadyBootstrapping: true}, errBootstrapEnqueued + return result, errBootstrapEnqueued default: m.state = Bootstrapping } @@ -155,10 +157,14 @@ func (m *bootstrapManager) startBootstrap(asyncResult *BootstrapAsyncResult) (Bo m.mediator.DisableFileOpsAndWait() defer m.mediator.EnableFileOps() + var result BootstrapResult asyncResult.bootstrapStarted.Done() + defer func() { + asyncResult.bootstrapResult = result + asyncResult.bootstrapCompleted.Done() + }() // Keep performing bootstraps until none pending and no error returned. - var result BootstrapResult for i := 0; true; i++ { // NB(r): Decouple implementation of bootstrap so can override in tests. bootstrapErr := m.bootstrapFn() @@ -204,8 +210,6 @@ func (m *bootstrapManager) startBootstrap(asyncResult *BootstrapAsyncResult) (Bo m.lastBootstrapCompletionTime = xtime.ToUnixNano(m.nowFn()) m.state = Bootstrapped m.Unlock() - asyncResult.bootstrapResult = result - asyncResult.bootstrapCompleted.Done() return result, nil } diff --git a/src/dbnode/storage/cluster/database.go b/src/dbnode/storage/cluster/database.go index f63fef83c3..0e082be647 100644 --- a/src/dbnode/storage/cluster/database.go +++ b/src/dbnode/storage/cluster/database.go @@ -406,7 +406,8 @@ func (d *clusterDB) analyzeAndReportShardStates() { for id := range d.initializing { count := d.bootstrapCount[id] if count != len(namespaces) { - // Should never happen if bootstrapped and durable. + // This could temporarily occur due to the race condition, e.g. database was bootstrapped and durable + // at the time we checked but then new shards were assigned which are still not bootstrapped. d.log.Debug("database indicated that it was bootstrapped and durable, "+ "but number of bootstrapped shards did not match number of namespaces", zap.Uint32("shard", id), diff --git a/src/dbnode/storage/database.go b/src/dbnode/storage/database.go index 5eec7161bd..96070adf8a 100644 --- a/src/dbnode/storage/database.go +++ b/src/dbnode/storage/database.go @@ -459,21 +459,21 @@ func (d *db) AssignShardSet(shardSet sharding.ShardSet) { } d.Unlock() + if !d.mediator.IsOpen() { + d.assignShardSet(shardSet) + return + } + if err := d.mediator.EnqueueMutuallyExclusiveFn(func() { d.assignShardSet(shardSet) }); err != nil { - if errors.Is(err, errMediatorNotOpen) { - // initial assignment. - d.assignShardSet(shardSet) - } else { - // should not happen. - instrument.EmitAndLogInvariantViolation(d.opts.InstrumentOptions(), - func(l *zap.Logger) { - l.Error("failed to enqueue assignShardSet fn", - zap.Error(err), - zap.Uint32s("shards", shardSet.AllIDs())) - }) - } + // should not happen. + instrument.EmitAndLogInvariantViolation(d.opts.InstrumentOptions(), + func(l *zap.Logger) { + l.Error("failed to enqueue assignShardSet fn", + zap.Error(err), + zap.Uint32s("shards", shardSet.AllIDs())) + }) } } diff --git a/src/dbnode/storage/mediator.go b/src/dbnode/storage/mediator.go index bdb2570ba6..8b43a986f5 100644 --- a/src/dbnode/storage/mediator.go +++ b/src/dbnode/storage/mediator.go @@ -250,7 +250,7 @@ func (m *mediator) ongoingFileSystemProcesses() { m.sleepFn(m.tickInterval) // Check if the mediator is already closed. - if !m.isOpen() { + if !m.IsOpen() { return } @@ -271,7 +271,7 @@ func (m *mediator) ongoingColdFlushProcesses() { m.sleepFn(m.tickInterval) // Check if the mediator is already closed. - if !m.isOpen() { + if !m.IsOpen() { return } @@ -293,7 +293,7 @@ func (m *mediator) ongoingTick() { m.sleepFn(m.tickInterval) // Check if the mediator is already closed. - if !m.isOpen() { + if !m.IsOpen() { return } @@ -353,7 +353,7 @@ func (m *mediator) reportLoop() { } } -func (m *mediator) isOpen() bool { +func (m *mediator) IsOpen() bool { m.RLock() defer m.RUnlock() return m.state == mediatorOpen @@ -492,9 +492,13 @@ func (b *mediatorTimeBarrier) maybeRelease() (time.Time, error) { // If all waiters are waiting, we can safely call mutually exclusive external functions. if numWaiters == b.numMaxWaiters { // Drain the channel. - for len(b.externalFnCh) > 0 { - externalFn := <-b.externalFnCh - externalFn() + for { + select { + case fn := <-b.externalFnCh: + fn() + default: + break // Break from loop, no more to read + } } } diff --git a/src/dbnode/storage/storage_mock.go b/src/dbnode/storage/storage_mock.go index c28f8507d4..3337110eaa 100644 --- a/src/dbnode/storage/storage_mock.go +++ b/src/dbnode/storage/storage_mock.go @@ -3568,6 +3568,20 @@ func (mr *MockdatabaseMediatorMockRecorder) IsBootstrapped() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsBootstrapped", reflect.TypeOf((*MockdatabaseMediator)(nil).IsBootstrapped)) } +// IsOpen mocks base method +func (m *MockdatabaseMediator) IsOpen() bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "IsOpen") + ret0, _ := ret[0].(bool) + return ret0 +} + +// IsOpen indicates an expected call of IsOpen +func (mr *MockdatabaseMediatorMockRecorder) IsOpen() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsOpen", reflect.TypeOf((*MockdatabaseMediator)(nil).IsOpen)) +} + // LastBootstrapCompletionTime mocks base method func (m *MockdatabaseMediator) LastBootstrapCompletionTime() (time0.UnixNano, bool) { m.ctrl.T.Helper() diff --git a/src/dbnode/storage/types.go b/src/dbnode/storage/types.go index 3eab205c53..266d0d0723 100644 --- a/src/dbnode/storage/types.go +++ b/src/dbnode/storage/types.go @@ -1008,6 +1008,9 @@ type databaseMediator interface { // IsBootstrapped returns whether the database is bootstrapped. IsBootstrapped() bool + // IsOpen returns whether mediator is opened. + IsOpen() bool + // LastBootstrapCompletionTime returns the last bootstrap completion time, // if any. LastBootstrapCompletionTime() (xtime.UnixNano, bool) From 49b7f6291fb09f2b2fd265c5671312b23698fb65 Mon Sep 17 00:00:00 2001 From: Linas Naginionis Date: Wed, 21 Apr 2021 15:55:30 +0300 Subject: [PATCH 18/18] fixed failing tests. --- src/dbnode/storage/database_test.go | 3 +++ src/dbnode/storage/mediator.go | 3 ++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/src/dbnode/storage/database_test.go b/src/dbnode/storage/database_test.go index 71ec7d218c..efe5f17b13 100644 --- a/src/dbnode/storage/database_test.go +++ b/src/dbnode/storage/database_test.go @@ -436,6 +436,7 @@ func TestDatabaseAssignShardSetBehaviorNoNewShards(t *testing.T) { // Set a mock mediator to be certain that bootstrap is not called when // no new shards are assigned. mediator := NewMockdatabaseMediator(ctrl) + mediator.EXPECT().IsOpen().Return(true) mediator.EXPECT().EnqueueMutuallyExclusiveFn(gomock.Any()).DoAndReturn(func(fn func()) error { fn() return nil @@ -474,6 +475,7 @@ func TestDatabaseBootstrappedAssignShardSet(t *testing.T) { ns := dbAddNewMockNamespace(ctrl, d, "testns") mediator := NewMockdatabaseMediator(ctrl) + mediator.EXPECT().IsOpen().Return(true) mediator.EXPECT().EnqueueMutuallyExclusiveFn(gomock.Any()).DoAndReturn(func(fn func()) error { fn() return nil @@ -516,6 +518,7 @@ func TestDatabaseAssignShardSetShouldPanic(t *testing.T) { }() mediator := NewMockdatabaseMediator(ctrl) + mediator.EXPECT().IsOpen().Return(true) mediator.EXPECT().EnqueueMutuallyExclusiveFn(gomock.Any()).Return(errors.New("unknown error")) d.mediator = mediator diff --git a/src/dbnode/storage/mediator.go b/src/dbnode/storage/mediator.go index 8b43a986f5..974a1298c7 100644 --- a/src/dbnode/storage/mediator.go +++ b/src/dbnode/storage/mediator.go @@ -492,12 +492,13 @@ func (b *mediatorTimeBarrier) maybeRelease() (time.Time, error) { // If all waiters are waiting, we can safely call mutually exclusive external functions. if numWaiters == b.numMaxWaiters { // Drain the channel. + Loop: for { select { case fn := <-b.externalFnCh: fn() default: - break // Break from loop, no more to read + break Loop } } }