[dbnode] Use invariant error for flush errors to fail CI with flush errors in integration tests (#2217)

* [dbnode] Fail builds with flush errors in integration tests

* [dbnode] DB Close Wait for Fs Processes (#2229)

* [dbnode] Only read data from disk when flushing index segments.
robskillington authored Mar 28, 2020
1 parent 02d1877 commit 8584d9a
Showing 12 changed files with 158 additions and 77 deletions.
2 changes: 1 addition & 1 deletion docs/operational_guide/bootstrapping_crash_recovery.md
@@ -71,7 +71,7 @@ In this case, the `peers` bootstrapper running on node A will not be able to ful
└─────────────────────────┘ └───────────────────────┘ └──────────────────────┘
```

Note that a bootstrap consistency level of `majority` is the default value, but can be modified by changing the value of the key `m3db.client.bootstrap-consistency-level` in [etcd](https://coreos.com/etcd/) to one of: `none`, `one`, `unstrict_majority` (attempt to read from majority, but settle for less if any errors occur), `majority` (strict majority), and `all`. For example, if an entire cluster with a replication factor of 3 was restarted simultaneously, all the nodes would get stuck in an infinite loop trying to peer bootstrap from each other and not achieving majority until an operator modified this value. Note that this can happen even if all the shards were in the `Available` state because M3DB nodes will reject all read requests for a shard until they have bootstrapped that shard (which has to happen every time the node is restarted).
Note that a bootstrap consistency level of `majority` is the default value, but can be modified by changing the value of the key `m3db.client.bootstrap-consistency-level` in [etcd](https://etcd.io/) to one of: `none`, `one`, `unstrict_majority` (attempt to read from majority, but settle for less if any errors occur), `majority` (strict majority), and `all`. For example, if an entire cluster with a replication factor of 3 was restarted simultaneously, all the nodes would get stuck in an infinite loop trying to peer bootstrap from each other and not achieving majority until an operator modified this value. Note that this can happen even if all the shards were in the `Available` state because M3DB nodes will reject all read requests for a shard until they have bootstrapped that shard (which has to happen every time the node is restarted).

**Note**: Any bootstrappers configuration that does not include the `peers` bootstrapper will be unable to handle dynamic placement changes of any kind.

2 changes: 1 addition & 1 deletion docs/operational_guide/placement.md
@@ -6,7 +6,7 @@

A M3DB cluster has exactly one Placement. That placement maps the cluster's shard replicas to nodes. A cluster also has 0 or more namespaces (analogous to tables in other databases), and each node serves every namespace for the shards it owns. In other words, if the cluster topology states that node A owns shards 1, 2, and 3 then node A will own shards 1, 2, 3 for all configured namespaces in the cluster.

M3DB stores its placement (mapping of which NODES are responsible for which shards) in [etcd](https://coreos.com/etcd/). There are three possible states that each node/shard pair can be in:
M3DB stores its placement (mapping of which NODES are responsible for which shards) in [etcd](https://etcd.io/). There are three possible states that each node/shard pair can be in:

1. `Initializing`
2. `Available`
3 changes: 2 additions & 1 deletion src/dbnode/storage/block/types.go
@@ -1,4 +1,4 @@
// Copyright (c) 2016 Uber Technologies, Inc.
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
@@ -79,6 +79,7 @@ type FetchBlocksMetadataOptions struct {
IncludeSizes bool
IncludeChecksums bool
IncludeLastRead bool
OnlyDisk bool
}

// FetchBlockMetadataResult captures the block start time, the block size, and any errors encountered
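The new `OnlyDisk` field restricts a blocks-metadata fetch to data that has already been flushed to disk. Below is a small sketch of how a caller might build the options; the helper function is hypothetical, and the fields used are just those already present in the struct.

```go
package example

import "github.com/m3db/m3/src/dbnode/storage/block"

// diskOnlyFetchOptions is a hypothetical helper showing how a caller might
// opt into disk-only metadata using the fields in this struct.
func diskOnlyFetchOptions() block.FetchBlocksMetadataOptions {
	return block.FetchBlocksMetadataOptions{
		IncludeSizes:     true, // report per-block sizes
		IncludeChecksums: true, // report block checksums
		// Skip anything that has not yet been flushed to a fileset on disk.
		OnlyDisk: true,
	}
}
```

The field is consumed by the index flush path later in this commit.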
56 changes: 20 additions & 36 deletions src/dbnode/storage/cleanup.go
@@ -133,33 +133,38 @@ func (m *cleanupManager) Cleanup(t time.Time) error {
m.Unlock()
}()

namespaces, err := m.database.GetOwnedNamespaces()
if err != nil {
return err
}

multiErr := xerrors.NewMultiError()
if err := m.cleanupDataFiles(t); err != nil {
if err := m.cleanupDataFiles(t, namespaces); err != nil {
multiErr = multiErr.Add(fmt.Errorf(
"encountered errors when cleaning up data files for %v: %v", t, err))
}

if err := m.cleanupExpiredIndexFiles(t); err != nil {
if err := m.cleanupExpiredIndexFiles(t, namespaces); err != nil {
multiErr = multiErr.Add(fmt.Errorf(
"encountered errors when cleaning up index files for %v: %v", t, err))
}

if err := m.deleteInactiveDataFiles(); err != nil {
if err := m.deleteInactiveDataFiles(namespaces); err != nil {
multiErr = multiErr.Add(fmt.Errorf(
"encountered errors when deleting inactive data files for %v: %v", t, err))
}

if err := m.deleteInactiveDataSnapshotFiles(); err != nil {
if err := m.deleteInactiveDataSnapshotFiles(namespaces); err != nil {
multiErr = multiErr.Add(fmt.Errorf(
"encountered errors when deleting inactive snapshot files for %v: %v", t, err))
}

if err := m.deleteInactiveNamespaceFiles(); err != nil {
if err := m.deleteInactiveNamespaceFiles(namespaces); err != nil {
multiErr = multiErr.Add(fmt.Errorf(
"encountered errors when deleting inactive namespace files for %v: %v", t, err))
}

if err := m.cleanupSnapshotsAndCommitlogs(); err != nil {
if err := m.cleanupSnapshotsAndCommitlogs(namespaces); err != nil {
multiErr = multiErr.Add(fmt.Errorf(
"encountered errors when cleaning up snapshot and commitlog files: %v", err))
}
@@ -179,14 +184,10 @@ func (m *cleanupManager) Report() {
}
}

func (m *cleanupManager) deleteInactiveNamespaceFiles() error {
func (m *cleanupManager) deleteInactiveNamespaceFiles(namespaces []databaseNamespace) error {
var namespaceDirNames []string
filePathPrefix := m.database.Options().CommitLogOptions().FilesystemOptions().FilePathPrefix()
dataDirPath := fs.DataDirPath(filePathPrefix)
namespaces, err := m.database.GetOwnedNamespaces()
if err != nil {
return err
}

for _, n := range namespaces {
namespaceDirNames = append(namespaceDirNames, n.ID().String())
@@ -197,23 +198,19 @@ func (m *cleanupManager) deleteInactiveNamespaceFiles() error {

// deleteInactiveDataFiles will delete data files for shards that the node no longer owns
// which can occur in the case of topology changes
func (m *cleanupManager) deleteInactiveDataFiles() error {
return m.deleteInactiveDataFileSetFiles(fs.NamespaceDataDirPath)
func (m *cleanupManager) deleteInactiveDataFiles(namespaces []databaseNamespace) error {
return m.deleteInactiveDataFileSetFiles(fs.NamespaceDataDirPath, namespaces)
}

// deleteInactiveDataSnapshotFiles will delete snapshot files for shards that the node no longer owns
// which can occur in the case of topology changes
func (m *cleanupManager) deleteInactiveDataSnapshotFiles() error {
return m.deleteInactiveDataFileSetFiles(fs.NamespaceSnapshotsDirPath)
func (m *cleanupManager) deleteInactiveDataSnapshotFiles(namespaces []databaseNamespace) error {
return m.deleteInactiveDataFileSetFiles(fs.NamespaceSnapshotsDirPath, namespaces)
}

func (m *cleanupManager) deleteInactiveDataFileSetFiles(filesetFilesDirPathFn func(string, ident.ID) string) error {
func (m *cleanupManager) deleteInactiveDataFileSetFiles(filesetFilesDirPathFn func(string, ident.ID) string, namespaces []databaseNamespace) error {
multiErr := xerrors.NewMultiError()
filePathPrefix := m.database.Options().CommitLogOptions().FilesystemOptions().FilePathPrefix()
namespaces, err := m.database.GetOwnedNamespaces()
if err != nil {
return err
}
for _, n := range namespaces {
var activeShards []string
namespaceDirPath := filesetFilesDirPathFn(filePathPrefix, n.ID())
@@ -227,12 +224,8 @@ func (m *cleanupManager) deleteInactiveDataFileSetFiles(filesetFilesDirPathFn fu
return multiErr.FinalError()
}

func (m *cleanupManager) cleanupDataFiles(t time.Time) error {
func (m *cleanupManager) cleanupDataFiles(t time.Time, namespaces []databaseNamespace) error {
multiErr := xerrors.NewMultiError()
namespaces, err := m.database.GetOwnedNamespaces()
if err != nil {
return err
}
for _, n := range namespaces {
if !n.Options().CleanupEnabled() {
continue
@@ -245,11 +238,7 @@ func (m *cleanupManager) cleanupDataFiles(t time.Time) error {
return multiErr.FinalError()
}

func (m *cleanupManager) cleanupExpiredIndexFiles(t time.Time) error {
namespaces, err := m.database.GetOwnedNamespaces()
if err != nil {
return err
}
func (m *cleanupManager) cleanupExpiredIndexFiles(t time.Time, namespaces []databaseNamespace) error {
multiErr := xerrors.NewMultiError()
for _, n := range namespaces {
if !n.Options().CleanupEnabled() || !n.Options().IndexOptions().Enabled() {
@@ -317,17 +306,12 @@ func (m *cleanupManager) cleanupCompactedNamespaceDataFiles(shards []databaseSha
// 9. Delete all corrupt commitlog files (ignoring any commitlog files being actively written to.)
//
// This process is also modeled formally in TLA+ in the file `SnapshotsSpec.tla`.
func (m *cleanupManager) cleanupSnapshotsAndCommitlogs() (finalErr error) {
func (m *cleanupManager) cleanupSnapshotsAndCommitlogs(namespaces []databaseNamespace) (finalErr error) {
logger := m.opts.InstrumentOptions().Logger().With(
zap.String("comment",
"partial/corrupt files are expected as result of a restart (this is ok)"),
)

namespaces, err := m.database.GetOwnedNamespaces()
if err != nil {
return err
}

fsOpts := m.opts.CommitLogOptions().FilesystemOptions()
snapshotMetadatas, snapshotMetadataErrorsWithPaths, err := m.snapshotMetadataFilesFn(fsOpts)
if err != nil {
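The change repeated throughout this file is a single refactor: `Cleanup` now calls `GetOwnedNamespaces` once and passes the resulting slice to every helper, rather than each helper fetching and error-handling the namespace list on its own. A condensed sketch of that shape, with simplified stand-in types and error handling:

```go
package example

import (
	"fmt"
	"time"
)

// databaseNamespace and cleanupManager are simplified stand-ins for the real
// types; only what the sketch needs is declared.
type databaseNamespace interface {
	IDString() string
}

type cleanupManager struct {
	getOwnedNamespaces func() ([]databaseNamespace, error)
}

// Cleanup resolves the owned namespaces once, up front, and threads the slice
// through every helper, so a lookup failure surfaces exactly once and all
// helpers work from the same snapshot of ownership.
func (m *cleanupManager) Cleanup(t time.Time) error {
	namespaces, err := m.getOwnedNamespaces()
	if err != nil {
		return err
	}
	if err := m.cleanupDataFiles(t, namespaces); err != nil {
		return fmt.Errorf("cleaning up data files for %v: %w", t, err)
	}
	return m.deleteInactiveDataFiles(namespaces)
}

// The helpers now take the namespaces as a parameter instead of re-fetching.
func (m *cleanupManager) cleanupDataFiles(t time.Time, namespaces []databaseNamespace) error {
	for range namespaces {
		// per-namespace data file cleanup elided
	}
	return nil
}

func (m *cleanupManager) deleteInactiveDataFiles(namespaces []databaseNamespace) error {
	for range namespaces {
		// per-namespace inactive fileset deletion elided
	}
	return nil
}
```

The real helpers also aggregate per-namespace errors into a multi-error via `xerrors.NewMultiError` rather than returning on the first failure.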
6 changes: 6 additions & 0 deletions src/dbnode/storage/database.go
@@ -537,13 +537,19 @@ func (d *db) terminateWithLock() error {
}

func (d *db) Terminate() error {
// NB(bodu): Wait for fs processes to finish.
d.mediator.WaitForFileSystemProcesses()

d.Lock()
defer d.Unlock()

return d.terminateWithLock()
}

func (d *db) Close() error {
// NB(bodu): Wait for fs processes to finish.
d.mediator.WaitForFileSystemProcesses()

d.Lock()
defer d.Unlock()

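Both `Terminate` and `Close` now wait for the mediator's filesystem processes (flush, snapshot, cleanup) to finish before taking the database lock and tearing down. A minimal sketch of that ordering, with a pared-down mediator interface:

```go
package example

import "sync"

// mediator is a pared-down stand-in for the database mediator; only the
// method this change depends on is shown.
type mediator interface {
	// WaitForFileSystemProcesses blocks until in-flight flush, snapshot and
	// cleanup work has completed.
	WaitForFileSystemProcesses()
}

// db is a simplified stand-in for the real database type.
type db struct {
	sync.Mutex
	mediator mediator
}

func (d *db) Close() error {
	// Wait for filesystem processes to finish first, and before taking the
	// lock, so an in-flight flush is never torn down part way through.
	d.mediator.WaitForFileSystemProcesses()

	d.Lock()
	defer d.Unlock()
	// close namespaces and release resources (elided)
	return nil
}
```

`Terminate` follows the same pattern, waiting before it acquires the lock and calls `terminateWithLock`.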
15 changes: 13 additions & 2 deletions src/dbnode/storage/fs.go
@@ -25,6 +25,7 @@ import (
"time"

"github.com/m3db/m3/src/dbnode/persist/fs/commitlog"
"github.com/m3db/m3/src/x/instrument"

"go.uber.org/zap"
)
@@ -153,11 +154,21 @@ func (m *fileSystemManager) Run(

// NB(xichen): perform data cleanup and flushing sequentially to minimize the impact of disk seeks.
flushFn := func() {
// NB(r): Use invariant here since flush errors were introduced
// and not caught in CI or integration tests.
// When an invariant occurs in CI tests it panics so as to fail
// the build.
if err := m.Cleanup(t); err != nil {
m.log.Error("error when cleaning up data", zap.Time("time", t), zap.Error(err))
instrument.EmitAndLogInvariantViolation(m.opts.InstrumentOptions(),
func(l *zap.Logger) {
l.Error("error when cleaning up data", zap.Time("time", t), zap.Error(err))
})
}
if err := m.Flush(t); err != nil {
m.log.Error("error when flushing data", zap.Time("time", t), zap.Error(err))
instrument.EmitAndLogInvariantViolation(m.opts.InstrumentOptions(),
func(l *zap.Logger) {
l.Error("error when flushing data", zap.Time("time", t), zap.Error(err))
})
}
m.Lock()
m.status = fileOpNotStarted
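Previously a cleanup or flush error was only logged, which is easy to miss in integration tests. Routing it through `instrument.EmitAndLogInvariantViolation` keeps the log, adds an invariant-violation metric, and lets the instrument package panic on violations so CI runs fail hard. The sketch below illustrates that behavior; it is not the real implementation, and the environment variable name is an assumption for illustration.

```go
package example

import (
	"os"

	"go.uber.org/zap"
)

// emitAndLogInvariantViolation sketches the behavior this change relies on
// (the real helper lives in github.com/m3db/m3/src/x/instrument): run the
// caller's logging closure, emit an "invariant violated" metric, and panic
// when a CI-oriented environment variable is set so the test run fails.
func emitAndLogInvariantViolation(logger *zap.Logger, f func(l *zap.Logger)) {
	// Let the caller log the specific error with its own fields.
	f(logger)

	// A counter such as "invariant_violated" would be incremented here (elided).

	if os.Getenv("PANIC_ON_INVARIANT_VIOLATED") == "true" { // assumed variable name
		panic("invariant violated: failing the build")
	}
}
```

With such a variable exported in the integration-test environment, a flush or cleanup error that used to be a single log line now aborts the run and fails the build, which is the intent of this commit.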
11 changes: 9 additions & 2 deletions src/dbnode/storage/index.go
@@ -1,4 +1,4 @@
// Copyright (c) 2018 Uber Technologies, Inc.
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
@@ -903,7 +903,14 @@ func (i *nsIndex) flushBlockSegment(
first = false

var (
opts = block.FetchBlocksMetadataOptions{}
opts = block.FetchBlocksMetadataOptions{
// NB(bodu): There is a lag between when data gets flushed
// to disk and when it gets removed from memory during the next
// Tick. In this case, the same series can exist both on disk
// and in memory at the same time resulting in dupe series IDs.
// Only read data from disk when flushing index segments.
OnlyDisk: true,
}
limit = defaultFlushReadDataBlocksBatchSize
results block.FetchBlocksMetadataResults
err error
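A flushed series can remain resident in memory until the next tick evicts it, so reading both sources during an index segment flush would yield the same series ID twice; `OnlyDisk: true` limits the read to filesets on disk. Below is a sketch of paging through a shard's metadata with that option; the `FetchBlocksMetadataV2` parameter types here are simplified stand-ins for the real signature.

```go
package example

import (
	"context"

	"github.com/m3db/m3/src/dbnode/storage/block"
)

// shard is a pared-down stand-in for the databaseShard method used by the
// index flush path; the real signature uses m3-specific context, time, and
// page-token types.
type shard interface {
	FetchBlocksMetadataV2(
		ctx context.Context,
		start, end int64,
		limit int64,
		pageToken []byte,
		opts block.FetchBlocksMetadataOptions,
	) (block.FetchBlocksMetadataResults, []byte, error)
}

// forEachFlushedSeries pages through a shard's block metadata restricted to
// data already flushed to disk, so series that were flushed but are still
// held in memory are not visited a second time.
func forEachFlushedSeries(ctx context.Context, s shard, start, end, limit int64) error {
	opts := block.FetchBlocksMetadataOptions{
		// Only read data from disk when flushing index segments.
		OnlyDisk: true,
	}
	var pageToken []byte
	for {
		results, nextPageToken, err := s.FetchBlocksMetadataV2(ctx, start, end, limit, pageToken, opts)
		if err != nil {
			return err
		}
		for _, res := range results.Results() {
			_ = res // hand the series ID and tags to the segment builder (elided)
		}
		results.Close()
		if nextPageToken == nil {
			return nil
		}
		pageToken = nextPageToken
	}
}
```

The test updates later in this commit assert that the shards are now queried with the same `OnlyDisk: true` option.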
6 changes: 3 additions & 3 deletions src/dbnode/storage/index_test.go
@@ -151,7 +151,7 @@ func TestNamespaceIndexFlushSuccess(t *testing.T) {
results.EXPECT().Results().Return(nil)
results.EXPECT().Close()
mockShard.EXPECT().FetchBlocksMetadataV2(gomock.Any(), blockTime, blockTime.Add(test.indexBlockSize),
gomock.Any(), gomock.Any(), block.FetchBlocksMetadataOptions{}).Return(results, nil, nil)
gomock.Any(), gomock.Any(), block.FetchBlocksMetadataOptions{OnlyDisk: true}).Return(results, nil, nil)

mockBlock.EXPECT().AddResults(gomock.Any()).Return(nil)
mockBlock.EXPECT().EvictMutableSegments().Return(nil)
@@ -260,13 +260,13 @@ func TestNamespaceIndexFlushSuccessMultipleShards(t *testing.T) {
results1.EXPECT().Results().Return(nil)
results1.EXPECT().Close()
mockShard1.EXPECT().FetchBlocksMetadataV2(gomock.Any(), blockTime, blockTime.Add(test.indexBlockSize),
gomock.Any(), gomock.Any(), block.FetchBlocksMetadataOptions{}).Return(results1, nil, nil)
gomock.Any(), gomock.Any(), block.FetchBlocksMetadataOptions{OnlyDisk: true}).Return(results1, nil, nil)

results2 := block.NewMockFetchBlocksMetadataResults(ctrl)
results2.EXPECT().Results().Return(nil)
results2.EXPECT().Close()
mockShard2.EXPECT().FetchBlocksMetadataV2(gomock.Any(), blockTime, blockTime.Add(test.indexBlockSize),
gomock.Any(), gomock.Any(), block.FetchBlocksMetadataOptions{}).Return(results2, nil, nil)
gomock.Any(), gomock.Any(), block.FetchBlocksMetadataOptions{OnlyDisk: true}).Return(results2, nil, nil)

mockBlock.EXPECT().AddResults(gomock.Any()).Return(nil)
mockBlock.EXPECT().EvictMutableSegments().Return(nil)
