[dbnode] Shards assignment improvements during cluster topology changes (#3425)

* use `IsBootstrappedAndDurable()` instead of `IsBootstrapped()`, otherwise warm and cold flushes might fail because some shards might still not be bootstrapped.

* do not run file ops (cold and warm flush) while new shards are being assigned; check for bootstrapped shards when doing cold flush cleanups.

* update unit test to validate handling of shards that are not yet bootstrapped.

* removed the `isBootstrapped` method arg (boolean args are a code smell); it is better to check before calling cleanup.

* reduce locking at the db level when new shards are assigned.

* use a read lock for `d.hasReceivedNewShardsWithLock()`.

* enqueue the assignShardSet fn when a topology update is received, so that shards get assigned while file ops are not running (see the sketch after this list).

* set `lastReceivedNewShards` immediately when new shards are received, so that `IsBootstrappedAndDurable()` won't return true when the db was previously bootstrapped and a new bootstrap is enqueued.

* cleaned up some code.

* ensure that bootstrap is started when new shards are assigned.

* added `BootstrapEnqueue()`.

* updated logging levels.

* more test coverage.

* removed invariant violation

* linter fix

* set bootstrap result value directly.

* changes after review

* fixed failing tests.
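
The assignShardSet enqueueing mentioned above is the core coordination change: topology updates no longer race with running file ops. Below is a toy, self-contained Go sketch of that pattern; the mediator type and method names are illustrative stand-ins, not the real dbnode API.

package main

import (
	"fmt"
	"sync"
)

// Toy mediator that defers enqueued work (such as an assignShardSet fn coming
// from a topology update) until no file op is running.
type mediator struct {
	mu          sync.Mutex
	fileOpsBusy bool
	pending     []func()
}

func (m *mediator) EnqueueShardAssignment(fn func()) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.fileOpsBusy {
		m.pending = append(m.pending, fn) // run later, once file ops finish
		return
	}
	fn()
}

func (m *mediator) SetFileOpsBusy(busy bool) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.fileOpsBusy = busy
	if !busy {
		for _, fn := range m.pending {
			fn()
		}
		m.pending = nil
	}
}

func main() {
	m := &mediator{}
	m.SetFileOpsBusy(true)
	m.EnqueueShardAssignment(func() { fmt.Println("assigning new shard set") })
	fmt.Println("file ops running, shard assignment deferred")
	m.SetFileOpsBusy(false) // now prints "assigning new shard set"
}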
soundvibe authored Apr 22, 2021
1 parent 4f035bd commit a4cee97
Showing 17 changed files with 573 additions and 179 deletions.
29 changes: 27 additions & 2 deletions src/dbnode/storage/bootstrap.go
@@ -116,7 +116,22 @@ func (m *bootstrapManager) LastBootstrapCompletionTime() (xtime.UnixNano, bool)
return bsTime, bsTime > 0
}

func (m *bootstrapManager) BootstrapEnqueue() *BootstrapAsyncResult {
bootstrapAsyncResult := newBootstrapAsyncResult()
go func(r *BootstrapAsyncResult) {
if result, err := m.startBootstrap(r); err != nil && !result.AlreadyBootstrapping {
m.instrumentation.emitAndLogInvariantViolation(err, "error bootstrapping")
}
}(bootstrapAsyncResult)
return bootstrapAsyncResult
}

func (m *bootstrapManager) Bootstrap() (BootstrapResult, error) {
bootstrapAsyncResult := newBootstrapAsyncResult()
return m.startBootstrap(bootstrapAsyncResult)
}

func (m *bootstrapManager) startBootstrap(asyncResult *BootstrapAsyncResult) (BootstrapResult, error) {
m.Lock()
switch m.state {
case Bootstrapping:
@@ -128,7 +143,11 @@ func (m *bootstrapManager) Bootstrap() (BootstrapResult, error) {
// reshard occurs and we need to bootstrap more shards.
m.hasPending = true
m.Unlock()
return BootstrapResult{AlreadyBootstrapping: true}, errBootstrapEnqueued
result := BootstrapResult{AlreadyBootstrapping: true}
asyncResult.bootstrapResult = result
asyncResult.bootstrapStarted.Done()
asyncResult.bootstrapCompleted.Done()
return result, errBootstrapEnqueued
default:
m.state = Bootstrapping
}
@@ -138,8 +157,14 @@ func (m *bootstrapManager) Bootstrap() (BootstrapResult, error) {
m.mediator.DisableFileOpsAndWait()
defer m.mediator.EnableFileOps()

// Keep performing bootstraps until none pending and no error returned.
var result BootstrapResult
asyncResult.bootstrapStarted.Done()
defer func() {
asyncResult.bootstrapResult = result
asyncResult.bootstrapCompleted.Done()
}()

// Keep performing bootstraps until none pending and no error returned.
for i := 0; true; i++ {
// NB(r): Decouple implementation of bootstrap so can override in tests.
bootstrapErr := m.bootstrapFn()
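
The `BootstrapAsyncResult` returned by `BootstrapEnqueue()` is defined in a file not shown in this view. The following is a minimal, self-contained sketch of a plausible shape, inferred from the `bootstrapStarted`/`bootstrapCompleted` calls above and the `WaitForStart()`/`Result()` calls in the updated test below; the real type (and `BootstrapResult`, trimmed here to a single field) may differ.

package main

import (
	"fmt"
	"sync"
)

// Trimmed stand-in for the real BootstrapResult.
type BootstrapResult struct {
	AlreadyBootstrapping bool
}

// Plausible shape of BootstrapAsyncResult: one one-shot wait group per phase.
type BootstrapAsyncResult struct {
	bootstrapStarted   sync.WaitGroup
	bootstrapCompleted sync.WaitGroup
	bootstrapResult    BootstrapResult
}

func newBootstrapAsyncResult() *BootstrapAsyncResult {
	r := &BootstrapAsyncResult{}
	// startBootstrap is expected to call Done() exactly once on each wait group,
	// including on the early "already bootstrapping" return path above.
	r.bootstrapStarted.Add(1)
	r.bootstrapCompleted.Add(1)
	return r
}

// WaitForStart blocks until the bootstrap has begun (or was reported as already running).
func (r *BootstrapAsyncResult) WaitForStart() { r.bootstrapStarted.Wait() }

// Result blocks until the bootstrap has completed and returns its outcome.
func (r *BootstrapAsyncResult) Result() BootstrapResult {
	r.bootstrapCompleted.Wait()
	return r.bootstrapResult
}

func main() {
	r := newBootstrapAsyncResult()
	go func() {
		r.bootstrapStarted.Done()
		r.bootstrapResult = BootstrapResult{AlreadyBootstrapping: false}
		r.bootstrapCompleted.Done()
	}()
	r.WaitForStart()
	fmt.Printf("bootstrap result: %+v\n", r.Result())
}

This mirrors how `BootstrapEnqueue()` can hand the result back to callers without blocking them on the whole bootstrap.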
6 changes: 6 additions & 0 deletions src/dbnode/storage/bootstrap_instrumentation.go
@@ -191,3 +191,9 @@ func (i *bootstrapInstrumentation) setIsBootstrappedAndDurable(isBootstrappedAnd
}
i.durableStatus.Update(status)
}

func (i *bootstrapInstrumentation) emitAndLogInvariantViolation(err error, msg string) {
instrument.EmitAndLogInvariantViolation(i.opts.InstrumentOptions(), func(l *zap.Logger) {
l.Error(msg, zap.Error(err))
})
}
19 changes: 17 additions & 2 deletions src/dbnode/storage/bootstrap_test.go
@@ -39,6 +39,14 @@ import (
)

func TestDatabaseBootstrapWithBootstrapError(t *testing.T) {
testDatabaseBootstrapWithBootstrapError(t, false)
}

func TestDatabaseBootstrapEnqueueWithBootstrapError(t *testing.T) {
testDatabaseBootstrapWithBootstrapError(t, true)
}

func testDatabaseBootstrapWithBootstrapError(t *testing.T, async bool) {
ctrl := gomock.NewController(xtest.Reporter{T: t})
defer ctrl.Finish()

@@ -88,9 +96,16 @@ func TestDatabaseBootstrapWithBootstrapError(t *testing.T) {

require.Equal(t, BootstrapNotStarted, bsm.state)

result, err := bsm.Bootstrap()
var result BootstrapResult
if async {
asyncResult := bsm.BootstrapEnqueue()
asyncResult.WaitForStart()
result = asyncResult.Result()
} else {
result, err = bsm.Bootstrap()
require.NoError(t, err)
}

require.NoError(t, err)
require.Equal(t, Bootstrapped, bsm.state)
require.Equal(t, 1, len(result.ErrorsBootstrap))
require.Equal(t, "an error", result.ErrorsBootstrap[0].Error())
26 changes: 12 additions & 14 deletions src/dbnode/storage/cleanup.go
@@ -127,13 +127,7 @@ func newCleanupManager(
}
}

func (m *cleanupManager) WarmFlushCleanup(t time.Time, isBootstrapped bool) error {
// Don't perform any cleanup if we are not boostrapped yet.
if !isBootstrapped {
m.logger.Debug("database is still bootstrapping, terminating cleanup")
return nil
}

func (m *cleanupManager) WarmFlushCleanup(t time.Time) error {
m.Lock()
m.warmFlushCleanupInProgress = true
m.Unlock()
@@ -178,13 +172,7 @@ func (m *cleanupManager) WarmFlushCleanup(t time.Time, isBootstrapped bool) erro
return multiErr.FinalError()
}

func (m *cleanupManager) ColdFlushCleanup(t time.Time, isBootstrapped bool) error {
// Don't perform any cleanup if we are not boostrapped yet.
if !isBootstrapped {
m.logger.Debug("database is still bootstrapping, terminating cleanup")
return nil
}

func (m *cleanupManager) ColdFlushCleanup(t time.Time) error {
m.Lock()
m.coldFlushCleanupInProgress = true
m.Unlock()
@@ -262,6 +250,7 @@ func (m *cleanupManager) deleteInactiveDataFileSetFiles(filesetFilesDirPathFn fu
for _, n := range namespaces {
var activeShards []string
namespaceDirPath := filesetFilesDirPathFn(filePathPrefix, n.ID())
// NB(linasn) This should list ALL shards because it will delete dirs for the shards NOT LISTED below.
for _, s := range n.OwnedShards() {
shard := fmt.Sprintf("%d", s.ID())
activeShards = append(activeShards, shard)
@@ -321,6 +310,9 @@ func (m *cleanupManager) cleanupDuplicateIndexFiles(namespaces []databaseNamespa
func (m *cleanupManager) cleanupExpiredNamespaceDataFiles(earliestToRetain time.Time, shards []databaseShard) error {
multiErr := xerrors.NewMultiError()
for _, shard := range shards {
if !shard.IsBootstrapped() {
continue
}
if err := shard.CleanupExpiredFileSets(earliestToRetain); err != nil {
multiErr = multiErr.Add(err)
}
@@ -332,6 +324,9 @@ func (m *cleanupManager) cleanupExpiredNamespaceDataFiles(earliestToRetain time.
func (m *cleanupManager) cleanupCompactedNamespaceDataFiles(shards []databaseShard) error {
multiErr := xerrors.NewMultiError()
for _, shard := range shards {
if !shard.IsBootstrapped() {
continue
}
if err := shard.CleanupCompactedFileSets(); err != nil {
multiErr = multiErr.Add(err)
}
@@ -425,6 +420,9 @@ func (m *cleanupManager) cleanupSnapshotsAndCommitlogs(namespaces []databaseName

for _, ns := range namespaces {
for _, s := range ns.OwnedShards() {
if !s.IsBootstrapped() {
continue
}
shardSnapshots, err := m.snapshotFilesFn(fsOpts.FilePathPrefix(), ns.ID(), s.ID())
if err != nil {
multiErr = multiErr.Add(fmt.Errorf("err reading snapshot files for ns: %s and shard: %d, err: %v", ns.ID(), s.ID(), err))
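
Two rules work together in the cleanup changes above: per-shard cleanups skip shards that have not finished bootstrapping, while the active-shards list used by deleteInactiveDataFileSetFiles must still contain every owned shard (per the NB(linasn) comment), otherwise a still-bootstrapping shard's directory would be deleted. A small self-contained illustration follows, with stand-in types rather than the real dbnode interfaces.

package main

import "fmt"

// Stand-in for a database shard; the real interface lives in the dbnode storage package.
type shard struct {
	id           uint32
	bootstrapped bool
}

// cleanupExpiredFileSets mirrors the new guard: skip shards that are still
// bootstrapping (e.g. freshly assigned after a topology change) so their
// filesets are not touched mid-bootstrap.
func cleanupExpiredFileSets(shards []shard) {
	for _, s := range shards {
		if !s.bootstrapped {
			continue
		}
		fmt.Printf("cleaning expired filesets for shard %d\n", s.id)
	}
}

// activeShardDirs mirrors the NB(linasn) note: the list must contain ALL owned
// shards, bootstrapped or not, because directories for shards not listed get deleted.
func activeShardDirs(shards []shard) []string {
	dirs := make([]string, 0, len(shards))
	for _, s := range shards {
		dirs = append(dirs, fmt.Sprintf("%d", s.id))
	}
	return dirs
}

func main() {
	shards := []shard{{id: 0, bootstrapped: true}, {id: 1, bootstrapped: false}}
	cleanupExpiredFileSets(shards)                      // only shard 0 is cleaned
	fmt.Println("keep dirs:", activeShardDirs(shards)) // [0 1]
}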
54 changes: 15 additions & 39 deletions src/dbnode/storage/cleanup_test.go
@@ -286,6 +286,7 @@ func TestCleanupManagerCleanupCommitlogsAndSnapshots(t *testing.T) {
for i := 0; i < 3; i++ {
shard := NewMockdatabaseShard(ctrl)
shard.EXPECT().ID().Return(uint32(i)).AnyTimes()
shard.EXPECT().IsBootstrapped().Return(true).AnyTimes()
shard.EXPECT().CleanupExpiredFileSets(gomock.Any()).Return(nil).AnyTimes()
shard.EXPECT().CleanupCompactedFileSets().Return(nil).AnyTimes()

@@ -318,7 +319,7 @@ func TestCleanupManagerCleanupCommitlogsAndSnapshots(t *testing.T) {
return nil
}

err := cleanup(mgr, ts, true)
err := cleanup(mgr, ts)
if tc.expectErr {
require.Error(t, err)
} else {
@@ -361,36 +362,7 @@ func TestCleanupManagerNamespaceCleanupBootstrapped(t *testing.T) {
mgr := newCleanupManager(db, newNoopFakeActiveLogs(), tally.NoopScope).(*cleanupManager)
idx.EXPECT().CleanupExpiredFileSets(ts).Return(nil)
idx.EXPECT().CleanupDuplicateFileSets().Return(nil)
require.NoError(t, cleanup(mgr, ts, true))
}

func TestCleanupManagerNamespaceCleanupNotBootstrapped(t *testing.T) {
ctrl := gomock.NewController(xtest.Reporter{T: t})
defer ctrl.Finish()

ts := timeFor(36000)
rOpts := retentionOptions.
SetRetentionPeriod(21600 * time.Second).
SetBlockSize(3600 * time.Second)
nsOpts := namespaceOptions.
SetRetentionOptions(rOpts).
SetCleanupEnabled(true).
SetIndexOptions(namespace.NewIndexOptions().
SetEnabled(true).
SetBlockSize(7200 * time.Second))

ns := NewMockdatabaseNamespace(ctrl)
ns.EXPECT().ID().Return(ident.StringID("ns")).AnyTimes()
ns.EXPECT().Options().Return(nsOpts).AnyTimes()
ns.EXPECT().NeedsFlush(gomock.Any(), gomock.Any()).Return(false, nil).AnyTimes()
ns.EXPECT().OwnedShards().Return(nil).AnyTimes()

nses := []databaseNamespace{ns}
db := newMockdatabase(ctrl, ns)
db.EXPECT().OwnedNamespaces().Return(nses, nil).AnyTimes()

mgr := newCleanupManager(db, newNoopFakeActiveLogs(), tally.NoopScope).(*cleanupManager)
require.NoError(t, cleanup(mgr, ts, false))
require.NoError(t, cleanup(mgr, ts))
}

// Test NS doesn't cleanup when flag is present
@@ -423,7 +395,7 @@ func TestCleanupManagerDoesntNeedCleanup(t *testing.T) {
return nil
}

require.NoError(t, cleanup(mgr, ts, true))
require.NoError(t, cleanup(mgr, ts))
}

func TestCleanupDataAndSnapshotFileSetFiles(t *testing.T) {
@@ -436,11 +408,15 @@ func TestCleanupDataAndSnapshotFileSetFiles(t *testing.T) {
ns.EXPECT().Options().Return(nsOpts).AnyTimes()

shard := NewMockdatabaseShard(ctrl)
shardNotBootstrapped := NewMockdatabaseShard(ctrl)
shardNotBootstrapped.EXPECT().IsBootstrapped().Return(false).AnyTimes()
shardNotBootstrapped.EXPECT().ID().Return(uint32(1)).AnyTimes()
expectedEarliestToRetain := retention.FlushTimeStart(ns.Options().RetentionOptions(), ts)
shard.EXPECT().IsBootstrapped().Return(true).AnyTimes()
shard.EXPECT().CleanupExpiredFileSets(expectedEarliestToRetain).Return(nil)
shard.EXPECT().CleanupCompactedFileSets().Return(nil)
shard.EXPECT().ID().Return(uint32(0)).AnyTimes()
ns.EXPECT().OwnedShards().Return([]databaseShard{shard}).AnyTimes()
ns.EXPECT().OwnedShards().Return([]databaseShard{shard, shardNotBootstrapped}).AnyTimes()
ns.EXPECT().ID().Return(ident.StringID("nsID")).AnyTimes()
ns.EXPECT().NeedsFlush(gomock.Any(), gomock.Any()).Return(false, nil).AnyTimes()
namespaces := []databaseNamespace{ns}
@@ -449,7 +425,7 @@ func TestCleanupDataAndSnapshotFileSetFiles(t *testing.T) {
db.EXPECT().OwnedNamespaces().Return(namespaces, nil).AnyTimes()
mgr := newCleanupManager(db, newNoopFakeActiveLogs(), tally.NoopScope).(*cleanupManager)

require.NoError(t, cleanup(mgr, ts, true))
require.NoError(t, cleanup(mgr, ts))
}

type deleteInactiveDirectoriesCall struct {
@@ -469,6 +445,7 @@ func TestDeleteInactiveDataAndSnapshotFileSetFiles(t *testing.T) {

shard := NewMockdatabaseShard(ctrl)
shard.EXPECT().ID().Return(uint32(0)).AnyTimes()
shard.EXPECT().IsBootstrapped().Return(true).AnyTimes()
ns.EXPECT().OwnedShards().Return([]databaseShard{shard}).AnyTimes()
ns.EXPECT().ID().Return(ident.StringID("nsID")).AnyTimes()
ns.EXPECT().NeedsFlush(gomock.Any(), gomock.Any()).Return(false, nil).AnyTimes()
@@ -488,7 +465,7 @@ }
}
mgr.deleteInactiveDirectoriesFn = deleteInactiveDirectoriesFn

require.NoError(t, cleanup(mgr, ts, true))
require.NoError(t, cleanup(mgr, ts))

expectedCalls := []deleteInactiveDirectoriesCall{
deleteInactiveDirectoriesCall{
@@ -533,7 +510,7 @@ func TestCleanupManagerPropagatesOwnedNamespacesError(t *testing.T) {
require.NoError(t, db.Open())
require.NoError(t, db.Terminate())

require.Error(t, cleanup(mgr, ts, true))
require.Error(t, cleanup(mgr, ts))
}

func timeFor(s int64) time.Time {
@@ -561,10 +538,9 @@ func newFakeActiveLogs(activeLogs persist.CommitLogFiles) fakeActiveLogs {
func cleanup(
mgr databaseCleanupManager,
t time.Time,
isBootstrapped bool,
) error {
multiErr := xerrors.NewMultiError()
multiErr = multiErr.Add(mgr.WarmFlushCleanup(t, isBootstrapped))
multiErr = multiErr.Add(mgr.ColdFlushCleanup(t, isBootstrapped))
multiErr = multiErr.Add(mgr.WarmFlushCleanup(t))
multiErr = multiErr.Add(mgr.ColdFlushCleanup(t))
return multiErr.FinalError()
}
17 changes: 7 additions & 10 deletions src/dbnode/storage/cluster/database.go
@@ -30,8 +30,6 @@ import (
"github.com/m3db/m3/src/dbnode/sharding"
"github.com/m3db/m3/src/dbnode/storage"
"github.com/m3db/m3/src/dbnode/topology"
"github.com/m3db/m3/src/x/instrument"

"github.com/uber-go/tally"
"go.uber.org/zap"
)
@@ -408,14 +406,13 @@ func (d *clusterDB) analyzeAndReportShardStates() {
for id := range d.initializing {
count := d.bootstrapCount[id]
if count != len(namespaces) {
// Should never happen if bootstrapped and durable.
instrument.EmitAndLogInvariantViolation(d.opts.InstrumentOptions(), func(l *zap.Logger) {
l.With(
zap.Uint32("shard", id),
zap.Int("count", count),
zap.Int("numNamespaces", len(namespaces)),
).Error("database indicated that it was bootstrapped and durable, but number of bootstrapped shards did not match number of namespaces")
})
// This could temporarily occur due to the race condition, e.g. database was bootstrapped and durable
// at the time we checked but then new shards were assigned which are still not bootstrapped.
d.log.Debug("database indicated that it was bootstrapped and durable, "+
"but number of bootstrapped shards did not match number of namespaces",
zap.Uint32("shard", id),
zap.Int("count", count),
zap.Int("numNamespaces", len(namespaces)))
continue
}

14 changes: 10 additions & 4 deletions src/dbnode/storage/coldflush.go
@@ -101,12 +101,20 @@ func (m *coldFlushManager) Run(t time.Time) bool {
m.status = fileOpInProgress
m.Unlock()

defer func() {
m.Lock()
m.status = fileOpNotStarted
m.Unlock()
}()

m.log.Debug("starting cold flush", zap.Time("time", t))

// NB(xichen): perform data cleanup and flushing sequentially to minimize the impact of disk seeks.
// NB(r): Use invariant here since flush errors were introduced
// and not caught in CI or integration tests.
// When an invariant occurs in CI tests it panics so as to fail
// the build.
if err := m.ColdFlushCleanup(t, m.database.IsBootstrapped()); err != nil {
if err := m.ColdFlushCleanup(t); err != nil {
instrument.EmitAndLogInvariantViolation(m.opts.InstrumentOptions(),
func(l *zap.Logger) {
l.Error("error when cleaning up cold flush data", zap.Time("time", t), zap.Error(err))
@@ -118,9 +126,7 @@ func (m *coldFlushManager) Run(t time.Time) bool {
l.Error("error when cold flushing data", zap.Time("time", t), zap.Error(err))
})
}
m.Lock()
m.status = fileOpNotStarted
m.Unlock()
m.log.Debug("completed cold flush", zap.Time("time", t))
return true
}

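
The coldflush.go change moves the status reset into a defer so that it runs on every exit path of Run, not only after a successful flush. A self-contained sketch of that pattern, with stand-in types rather than the real cold flush manager:

package main

import (
	"errors"
	"fmt"
	"sync"
)

type fileOpStatus int

const (
	fileOpNotStarted fileOpStatus = iota
	fileOpInProgress
)

// runner stands in for the cold flush manager.
type runner struct {
	mu     sync.Mutex
	status fileOpStatus
}

func (r *runner) Run() error {
	r.mu.Lock()
	if r.status != fileOpNotStarted {
		r.mu.Unlock()
		return errors.New("file op already in progress")
	}
	r.status = fileOpInProgress
	r.mu.Unlock()

	// The defer guarantees the status flips back even if cleanup or flush
	// returns early with an error.
	defer func() {
		r.mu.Lock()
		r.status = fileOpNotStarted
		r.mu.Unlock()
	}()

	// ... cleanup and cold flush would run here ...
	return nil
}

func main() {
	r := &runner{}
	fmt.Println(r.Run()) // <nil>
	fmt.Println(r.Run()) // <nil> again: status was reset by the defer
}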