[dbnode] Shards assignment improvements during cluster topology changes #3425

Merged
merged 24 commits into master from linasn/assign-new-shards-fix on Apr 22, 2021
Commits (24)
4fe5430
use `IsBootstrappedAndDurable()` instead of `IsBootstrapped()` otherw…
soundvibe Mar 30, 2021
1a33b7f
Merge branch 'master' into linasn/cold-flush-cleanup-panic-fix
soundvibe Mar 30, 2021
b4ac31a
do not run file ops (cold and warm flush) when new shards are being a…
soundvibe Apr 1, 2021
7c4a9bc
update unit test to validate handling of not bootstrapped shards.
soundvibe Apr 6, 2021
3e4c350
Merge branch 'master' into linasn/cold-flush-cleanup-panic-fix
soundvibe Apr 6, 2021
02e86ec
removed `IsBootstrapped` method arg (boolean args are a code smell), …
soundvibe Apr 7, 2021
4a21659
reduce locking on a db level when new shards are assigned.
soundvibe Apr 7, 2021
83d2c7c
can use read lock for `d.hasReceivedNewShardsWithLock()`
soundvibe Apr 7, 2021
df3f9fd
Merge branch 'master' into linasn/cold-flush-cleanup-panic-fix
soundvibe Apr 7, 2021
56177c3
Enqueue assignShardSet fn when received update from topology so that …
soundvibe Apr 14, 2021
006065e
need to set `lastReceivedNewShards` when received new shards immediat…
soundvibe Apr 15, 2021
3dfeb14
cleaned up some code.
soundvibe Apr 15, 2021
4bc496a
ensure that bootstrap is started when new shards are assigned.
soundvibe Apr 15, 2021
6befcdf
added BootstrapEnqueue().
soundvibe Apr 15, 2021
a8d3765
updated logging levels.
soundvibe Apr 16, 2021
2825847
Merge branch 'master' into linasn/assign-new-shards-fix
soundvibe Apr 16, 2021
856939d
more test coverage.
soundvibe Apr 16, 2021
80ea6b9
removed invariant violation
soundvibe Apr 16, 2021
fbf17ee
linter fix
soundvibe Apr 16, 2021
80c0568
set bootstrap result value directly.
soundvibe Apr 21, 2021
df5b491
Merge branch 'master' into linasn/assign-new-shards-fix
soundvibe Apr 21, 2021
4ba1d96
changes after review
soundvibe Apr 21, 2021
49b7f62
fixed failing tests.
soundvibe Apr 21, 2021
003ff06
Merge branch 'master' into linasn/assign-new-shards-fix
soundvibe Apr 22, 2021
21 changes: 21 additions & 0 deletions src/dbnode/storage/bootstrap.go
@@ -116,7 +116,22 @@ func (m *bootstrapManager) LastBootstrapCompletionTime() (xtime.UnixNano, bool)
 	return bsTime, bsTime > 0
 }

+func (m *bootstrapManager) BootstrapEnqueue() *BootstrapAsyncResult {
+	bootstrapAsyncResult := newBootstrapAsyncResult()
+	go func(r *BootstrapAsyncResult) {
+		if result, err := m.startBootstrap(r); err != nil && !result.AlreadyBootstrapping {
+			m.instrumentation.emitAndLogInvariantViolation(err, "error bootstrapping")
+		}
+	}(bootstrapAsyncResult)
+	return bootstrapAsyncResult
+}
+
 func (m *bootstrapManager) Bootstrap() (BootstrapResult, error) {
+	bootstrapAsyncResult := newBootstrapAsyncResult()
+	return m.startBootstrap(bootstrapAsyncResult)
+}
+
+func (m *bootstrapManager) startBootstrap(asyncResult *BootstrapAsyncResult) (BootstrapResult, error) {
 	m.Lock()
 	switch m.state {
 	case Bootstrapping:
@@ -128,6 +143,8 @@ func (m *bootstrapManager) Bootstrap() (BootstrapResult, error) {
 		// reshard occurs and we need to bootstrap more shards.
 		m.hasPending = true
 		m.Unlock()
+		asyncResult.bootstrapStarted.Done()
+		asyncResult.bootstrapCompleted.Done()
 		return BootstrapResult{AlreadyBootstrapping: true}, errBootstrapEnqueued
 	default:
 		m.state = Bootstrapping
@@ -138,6 +155,8 @@ func (m *bootstrapManager) Bootstrap() (BootstrapResult, error) {
 	m.mediator.DisableFileOpsAndWait()
 	defer m.mediator.EnableFileOps()

+	asyncResult.bootstrapStarted.Done()
+
 	// Keep performing bootstraps until none pending and no error returned.
 	var result BootstrapResult
A collaborator commented:
Perhaps we can use a defer here? Just in case another code path returns early.

Suggested change:
-	asyncResult.bootstrapStarted.Done()
-	// Keep performing bootstraps until none pending and no error returned.
-	var result BootstrapResult
+	// Keep performing bootstraps until none pending and no error returned.
+	var result BootstrapResult
+	asyncResult.bootstrapStarted.Done()
+	defer func() {
+		asyncResult.bootstrapResult = result
+		asyncResult.bootstrapCompleted.Done()
+	}()

The collaborator added:
(Would need to remove the bootstrapResult = result assignment and bootstrapCompleted.Done() calls at bottom of method obviously)
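If both suggestions were applied, the tail of the method (shown in the last hunk below) would reduce to something like this sketch — illustrative only, since this revision keeps the explicit assignment:

	// Sketch: method tail if the suggested defer handled completion.
	m.lastBootstrapCompletionTime = xtime.ToUnixNano(m.nowFn())
	m.state = Bootstrapped
	m.Unlock()
	return result, nil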

 	for i := 0; true; i++ {
@@ -185,6 +204,8 @@ func (m *bootstrapManager) Bootstrap() (BootstrapResult, error) {
 	m.lastBootstrapCompletionTime = xtime.ToUnixNano(m.nowFn())
 	m.state = Bootstrapped
 	m.Unlock()
+	asyncResult.bootstrapResult = result
+	asyncResult.bootstrapCompleted.Done()
 	return result, nil
 }

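Reviewer note: the BootstrapAsyncResult type itself is not part of this page's hunks. From its usage here (`newBootstrapAsyncResult()`, the `bootstrapStarted`/`bootstrapCompleted` `Done()` calls, and `WaitForStart()`/`Result()` in the test below), a plausible shape is the following sketch; treat everything beyond those names as assumption, not the PR's actual definition.

	// Sketch only: inferred from usage in this PR, not the actual implementation.
	// Uses "sync" from the standard library.
	type BootstrapAsyncResult struct {
		bootstrapStarted   *sync.WaitGroup // released once the bootstrap run has started
		bootstrapCompleted *sync.WaitGroup // released once the run has finished
		bootstrapResult    BootstrapResult
	}

	func newBootstrapAsyncResult() *BootstrapAsyncResult {
		var started, completed sync.WaitGroup
		started.Add(1)
		completed.Add(1)
		return &BootstrapAsyncResult{
			bootstrapStarted:   &started,
			bootstrapCompleted: &completed,
		}
	}

	// WaitForStart blocks until the enqueued bootstrap has actually begun.
	func (r *BootstrapAsyncResult) WaitForStart() { r.bootstrapStarted.Wait() }

	// Result blocks until the bootstrap completes and returns its result.
	func (r *BootstrapAsyncResult) Result() BootstrapResult {
		r.bootstrapCompleted.Wait()
		return r.bootstrapResult
	}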
6 changes: 6 additions & 0 deletions src/dbnode/storage/bootstrap_instrumentation.go
@@ -191,3 +191,9 @@ func (i *bootstrapInstrumentation) setIsBootstrappedAndDurable(isBootstrappedAnd
 	}
 	i.durableStatus.Update(status)
 }
+
+func (i *bootstrapInstrumentation) emitAndLogInvariantViolation(err error, msg string) {
+	instrument.EmitAndLogInvariantViolation(i.opts.InstrumentOptions(), func(l *zap.Logger) {
+		l.Error(msg, zap.Error(err))
+	})
+}
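Context for this new helper, hedged: `instrument.EmitAndLogInvariantViolation` emits an invariant-violated metric and logs the error, and — as the NB(r) comment in coldflush.go below notes — CI enables a panic mode so a violation fails the build. From memory, that mode is toggled by an environment variable in m3's x/instrument package; the name here is an assumption to verify against the repo:

	// Assumption (quoted from memory of src/x/instrument); verify before relying on it.
	os.Setenv("PANIC_ON_INVARIANT_VIOLATED", "true")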
19 changes: 17 additions & 2 deletions src/dbnode/storage/bootstrap_test.go
@@ -39,6 +39,14 @@ import (
 )

 func TestDatabaseBootstrapWithBootstrapError(t *testing.T) {
+	testDatabaseBootstrapWithBootstrapError(t, false)
+}
+
+func TestDatabaseBootstrapEnqueueWithBootstrapError(t *testing.T) {
+	testDatabaseBootstrapWithBootstrapError(t, true)
+}
+
+func testDatabaseBootstrapWithBootstrapError(t *testing.T, async bool) {
 	ctrl := gomock.NewController(xtest.Reporter{T: t})
 	defer ctrl.Finish()

@@ -88,9 +96,16 @@ func TestDatabaseBootstrapWithBootstrapError(t *testing.T) {

 	require.Equal(t, BootstrapNotStarted, bsm.state)

-	result, err := bsm.Bootstrap()
+	var result BootstrapResult
+	if async {
+		asyncResult := bsm.BootstrapEnqueue()
+		asyncResult.WaitForStart()
+		result = asyncResult.Result()
+	} else {
+		result, err = bsm.Bootstrap()
+		require.NoError(t, err)
+	}

-	require.NoError(t, err)
 	require.Equal(t, Bootstrapped, bsm.state)
 	require.Equal(t, 1, len(result.ErrorsBootstrap))
 	require.Equal(t, "an error", result.ErrorsBootstrap[0].Error())
26 changes: 12 additions & 14 deletions src/dbnode/storage/cleanup.go
@@ -127,13 +127,7 @@ func newCleanupManager(
 	}
 }

-func (m *cleanupManager) WarmFlushCleanup(t time.Time, isBootstrapped bool) error {
-	// Don't perform any cleanup if we are not boostrapped yet.
-	if !isBootstrapped {
-		m.logger.Debug("database is still bootstrapping, terminating cleanup")
-		return nil
-	}
-
+func (m *cleanupManager) WarmFlushCleanup(t time.Time) error {
 	m.Lock()
 	m.warmFlushCleanupInProgress = true
 	m.Unlock()
@@ -178,13 +172,7 @@ func (m *cleanupManager) WarmFlushCleanup(t time.Time, isBootstrapped bool) erro
 	return multiErr.FinalError()
 }

-func (m *cleanupManager) ColdFlushCleanup(t time.Time, isBootstrapped bool) error {
-	// Don't perform any cleanup if we are not boostrapped yet.
-	if !isBootstrapped {
-		m.logger.Debug("database is still bootstrapping, terminating cleanup")
-		return nil
-	}
-
+func (m *cleanupManager) ColdFlushCleanup(t time.Time) error {
 	m.Lock()
 	m.coldFlushCleanupInProgress = true
 	m.Unlock()
@@ -262,6 +250,7 @@ func (m *cleanupManager) deleteInactiveDataFileSetFiles(filesetFilesDirPathFn fu
 	for _, n := range namespaces {
 		var activeShards []string
 		namespaceDirPath := filesetFilesDirPathFn(filePathPrefix, n.ID())
+		// NB(linasn) This should list ALL shards because it will delete dirs for the shards NOT LISTED below.
 		for _, s := range n.OwnedShards() {
 			shard := fmt.Sprintf("%d", s.ID())
 			activeShards = append(activeShards, shard)
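The NB comment is the crux of this hunk: the delete helper treats `activeShards` as an allowlist, so it must include even shards that have not finished bootstrapping. A hypothetical sketch of that contract (not the PR's actual helper; uses "os" and "path/filepath"):

	// Hypothetical sketch: every subdirectory of dirPath whose name is not in
	// activeDirNames is removed, so omitting a still-bootstrapping shard here
	// would delete its on-disk data.
	func deleteInactiveDirectories(dirPath string, activeDirNames []string) error {
		entries, err := os.ReadDir(dirPath)
		if err != nil {
			return err
		}
		active := make(map[string]struct{}, len(activeDirNames))
		for _, name := range activeDirNames {
			active[name] = struct{}{}
		}
		for _, e := range entries {
			if _, ok := active[e.Name()]; e.IsDir() && !ok {
				if err := os.RemoveAll(filepath.Join(dirPath, e.Name())); err != nil {
					return err
				}
			}
		}
		return nil
	}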
@@ -321,6 +310,9 @@ func (m *cleanupManager) cleanupDuplicateIndexFiles(namespaces []databaseNamespa
 func (m *cleanupManager) cleanupExpiredNamespaceDataFiles(earliestToRetain time.Time, shards []databaseShard) error {
 	multiErr := xerrors.NewMultiError()
 	for _, shard := range shards {
+		if !shard.IsBootstrapped() {
+			continue
+		}
 		if err := shard.CleanupExpiredFileSets(earliestToRetain); err != nil {
 			multiErr = multiErr.Add(err)
 		}
@@ -332,6 +324,9 @@ func (m *cleanupManager) cleanupExpiredNamespaceDataFiles(earliestToRetain time.
 func (m *cleanupManager) cleanupCompactedNamespaceDataFiles(shards []databaseShard) error {
 	multiErr := xerrors.NewMultiError()
 	for _, shard := range shards {
+		if !shard.IsBootstrapped() {
+			continue
+		}
 		if err := shard.CleanupCompactedFileSets(); err != nil {
 			multiErr = multiErr.Add(err)
 		}
@@ -425,6 +420,9 @@ func (m *cleanupManager) cleanupSnapshotsAndCommitlogs(namespaces []databaseName

 	for _, ns := range namespaces {
 		for _, s := range ns.OwnedShards() {
+			if !s.IsBootstrapped() {
+				continue
+			}
 			shardSnapshots, err := m.snapshotFilesFn(fsOpts.FilePathPrefix(), ns.ID(), s.ID())
 			if err != nil {
 				multiErr = multiErr.Add(fmt.Errorf("err reading snapshot files for ns: %s and shard: %d, err: %v", ns.ID(), s.ID(), err))
54 changes: 15 additions & 39 deletions src/dbnode/storage/cleanup_test.go
@@ -286,6 +286,7 @@ func TestCleanupManagerCleanupCommitlogsAndSnapshots(t *testing.T) {
 	for i := 0; i < 3; i++ {
 		shard := NewMockdatabaseShard(ctrl)
 		shard.EXPECT().ID().Return(uint32(i)).AnyTimes()
+		shard.EXPECT().IsBootstrapped().Return(true).AnyTimes()
 		shard.EXPECT().CleanupExpiredFileSets(gomock.Any()).Return(nil).AnyTimes()
 		shard.EXPECT().CleanupCompactedFileSets().Return(nil).AnyTimes()

@@ -318,7 +319,7 @@ func TestCleanupManagerCleanupCommitlogsAndSnapshots(t *testing.T) {
 			return nil
 		}

-		err := cleanup(mgr, ts, true)
+		err := cleanup(mgr, ts)
 		if tc.expectErr {
 			require.Error(t, err)
 		} else {
@@ -361,36 +362,7 @@ func TestCleanupManagerNamespaceCleanupBootstrapped(t *testing.T) {
 	mgr := newCleanupManager(db, newNoopFakeActiveLogs(), tally.NoopScope).(*cleanupManager)
 	idx.EXPECT().CleanupExpiredFileSets(ts).Return(nil)
 	idx.EXPECT().CleanupDuplicateFileSets().Return(nil)
-	require.NoError(t, cleanup(mgr, ts, true))
-}
-
-func TestCleanupManagerNamespaceCleanupNotBootstrapped(t *testing.T) {
-	ctrl := gomock.NewController(xtest.Reporter{T: t})
-	defer ctrl.Finish()
-
-	ts := timeFor(36000)
-	rOpts := retentionOptions.
-		SetRetentionPeriod(21600 * time.Second).
-		SetBlockSize(3600 * time.Second)
-	nsOpts := namespaceOptions.
-		SetRetentionOptions(rOpts).
-		SetCleanupEnabled(true).
-		SetIndexOptions(namespace.NewIndexOptions().
-			SetEnabled(true).
-			SetBlockSize(7200 * time.Second))
-
-	ns := NewMockdatabaseNamespace(ctrl)
-	ns.EXPECT().ID().Return(ident.StringID("ns")).AnyTimes()
-	ns.EXPECT().Options().Return(nsOpts).AnyTimes()
-	ns.EXPECT().NeedsFlush(gomock.Any(), gomock.Any()).Return(false, nil).AnyTimes()
-	ns.EXPECT().OwnedShards().Return(nil).AnyTimes()
-
-	nses := []databaseNamespace{ns}
-	db := newMockdatabase(ctrl, ns)
-	db.EXPECT().OwnedNamespaces().Return(nses, nil).AnyTimes()
-
-	mgr := newCleanupManager(db, newNoopFakeActiveLogs(), tally.NoopScope).(*cleanupManager)
-	require.NoError(t, cleanup(mgr, ts, false))
+	require.NoError(t, cleanup(mgr, ts))
 }

// Test NS doesn't cleanup when flag is present
@@ -423,7 +395,7 @@ func TestCleanupManagerDoesntNeedCleanup(t *testing.T) {
 		return nil
 	}

-	require.NoError(t, cleanup(mgr, ts, true))
+	require.NoError(t, cleanup(mgr, ts))
 }

func TestCleanupDataAndSnapshotFileSetFiles(t *testing.T) {
@@ -436,11 +408,15 @@ func TestCleanupDataAndSnapshotFileSetFiles(t *testing.T) {
 	ns.EXPECT().Options().Return(nsOpts).AnyTimes()

 	shard := NewMockdatabaseShard(ctrl)
+	shardNotBootstrapped := NewMockdatabaseShard(ctrl)
+	shardNotBootstrapped.EXPECT().IsBootstrapped().Return(false).AnyTimes()
+	shardNotBootstrapped.EXPECT().ID().Return(uint32(1)).AnyTimes()
 	expectedEarliestToRetain := retention.FlushTimeStart(ns.Options().RetentionOptions(), ts)
+	shard.EXPECT().IsBootstrapped().Return(true).AnyTimes()
 	shard.EXPECT().CleanupExpiredFileSets(expectedEarliestToRetain).Return(nil)
 	shard.EXPECT().CleanupCompactedFileSets().Return(nil)
 	shard.EXPECT().ID().Return(uint32(0)).AnyTimes()
-	ns.EXPECT().OwnedShards().Return([]databaseShard{shard}).AnyTimes()
+	ns.EXPECT().OwnedShards().Return([]databaseShard{shard, shardNotBootstrapped}).AnyTimes()
 	ns.EXPECT().ID().Return(ident.StringID("nsID")).AnyTimes()
 	ns.EXPECT().NeedsFlush(gomock.Any(), gomock.Any()).Return(false, nil).AnyTimes()
 	namespaces := []databaseNamespace{ns}
@@ -449,7 +425,7 @@ func TestCleanupDataAndSnapshotFileSetFiles(t *testing.T) {
 	db.EXPECT().OwnedNamespaces().Return(namespaces, nil).AnyTimes()
 	mgr := newCleanupManager(db, newNoopFakeActiveLogs(), tally.NoopScope).(*cleanupManager)

-	require.NoError(t, cleanup(mgr, ts, true))
+	require.NoError(t, cleanup(mgr, ts))
 }

 type deleteInactiveDirectoriesCall struct {
@@ -469,6 +445,7 @@ func TestDeleteInactiveDataAndSnapshotFileSetFiles(t *testing.T) {

 	shard := NewMockdatabaseShard(ctrl)
 	shard.EXPECT().ID().Return(uint32(0)).AnyTimes()
+	shard.EXPECT().IsBootstrapped().Return(true).AnyTimes()
 	ns.EXPECT().OwnedShards().Return([]databaseShard{shard}).AnyTimes()
 	ns.EXPECT().ID().Return(ident.StringID("nsID")).AnyTimes()
 	ns.EXPECT().NeedsFlush(gomock.Any(), gomock.Any()).Return(false, nil).AnyTimes()
@@ -488,7 +465,7 @@ func TestDeleteInactiveDataAndSnapshotFileSetFiles(t *testing.T) {
 	}
 	mgr.deleteInactiveDirectoriesFn = deleteInactiveDirectoriesFn

-	require.NoError(t, cleanup(mgr, ts, true))
+	require.NoError(t, cleanup(mgr, ts))

 	expectedCalls := []deleteInactiveDirectoriesCall{
 		deleteInactiveDirectoriesCall{
@@ -533,7 +510,7 @@ func TestCleanupManagerPropagatesOwnedNamespacesError(t *testing.T) {
 	require.NoError(t, db.Open())
 	require.NoError(t, db.Terminate())

-	require.Error(t, cleanup(mgr, ts, true))
+	require.Error(t, cleanup(mgr, ts))
 }

func timeFor(s int64) time.Time {
@@ -561,10 +538,9 @@ func newFakeActiveLogs(activeLogs persist.CommitLogFiles) fakeActiveLogs {
 func cleanup(
 	mgr databaseCleanupManager,
 	t time.Time,
-	isBootstrapped bool,
 ) error {
 	multiErr := xerrors.NewMultiError()
-	multiErr = multiErr.Add(mgr.WarmFlushCleanup(t, isBootstrapped))
-	multiErr = multiErr.Add(mgr.ColdFlushCleanup(t, isBootstrapped))
+	multiErr = multiErr.Add(mgr.WarmFlushCleanup(t))
+	multiErr = multiErr.Add(mgr.ColdFlushCleanup(t))
 	return multiErr.FinalError()
 }
14 changes: 5 additions & 9 deletions src/dbnode/storage/cluster/database.go
@@ -30,8 +30,6 @@ import (
"github.com/m3db/m3/src/dbnode/sharding"
"github.com/m3db/m3/src/dbnode/storage"
"github.com/m3db/m3/src/dbnode/topology"
"github.com/m3db/m3/src/x/instrument"

"github.com/uber-go/tally"
"go.uber.org/zap"
)
@@ -409,13 +407,11 @@ func (d *clusterDB) analyzeAndReportShardStates() {
 		count := d.bootstrapCount[id]
 		if count != len(namespaces) {
 			// Should never happen if bootstrapped and durable.
A collaborator commented:
nit: Update this comment to say that this can temporarily occur due to race condition?
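One possible wording for that update (illustrative only; this revision keeps the comment as is):

	// NB: This can temporarily occur during topology changes, when a shard is
	// bootstrapped for some but not yet all namespaces, so it is logged at
	// debug level rather than treated as an invariant violation.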

-			instrument.EmitAndLogInvariantViolation(d.opts.InstrumentOptions(), func(l *zap.Logger) {
-				l.With(
-					zap.Uint32("shard", id),
-					zap.Int("count", count),
-					zap.Int("numNamespaces", len(namespaces)),
-				).Error("database indicated that it was bootstrapped and durable, but number of bootstrapped shards did not match number of namespaces")
-			})
+			d.log.Debug("database indicated that it was bootstrapped and durable, "+
+				"but number of bootstrapped shards did not match number of namespaces",
+				zap.Uint32("shard", id),
+				zap.Int("count", count),
+				zap.Int("numNamespaces", len(namespaces)))
 			continue
 		}

14 changes: 10 additions & 4 deletions src/dbnode/storage/coldflush.go
@@ -101,12 +101,20 @@ func (m *coldFlushManager) Run(t time.Time) bool {
 	m.status = fileOpInProgress
 	m.Unlock()

+	defer func() {
+		m.Lock()
+		m.status = fileOpNotStarted
+		m.Unlock()
+	}()
+
+	m.log.Debug("starting cold flush", zap.Time("time", t))
+
 	// NB(xichen): perform data cleanup and flushing sequentially to minimize the impact of disk seeks.
 	// NB(r): Use invariant here since flush errors were introduced
 	// and not caught in CI or integration tests.
 	// When an invariant occurs in CI tests it panics so as to fail
 	// the build.
-	if err := m.ColdFlushCleanup(t, m.database.IsBootstrapped()); err != nil {
+	if err := m.ColdFlushCleanup(t); err != nil {
 		instrument.EmitAndLogInvariantViolation(m.opts.InstrumentOptions(),
 			func(l *zap.Logger) {
 				l.Error("error when cleaning up cold flush data", zap.Time("time", t), zap.Error(err))
@@ -118,9 +126,7 @@ func (m *coldFlushManager) Run(t time.Time) bool {
l.Error("error when cold flushing data", zap.Time("time", t), zap.Error(err))
})
}
m.Lock()
m.status = fileOpNotStarted
m.Unlock()
m.log.Debug("completed cold flush", zap.Time("time", t))
return true
}

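Design note on the new defer in Run: the method marks `m.status = fileOpInProgress` on entry, and — assuming Run guards re-entry by checking that status, which this page does not show — an early return that skipped the old bottom-of-function reset would leave the manager stuck in progress and block every later run. The defer makes the reset unconditional. A sketch of the assumed guard:

	// Assumed shape of the re-entry guard at the top of Run (not shown in this
	// diff): if a previous run never reset the status, this would always bail.
	m.Lock()
	if m.status != fileOpNotStarted {
		m.Unlock()
		return false
	}
	m.status = fileOpInProgress
	m.Unlock()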