Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dbnode] Filter out corrupt commit log files that were not present before the node started in the commit log bootstrapper #1581

Merged
merged 4 commits into from
Apr 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog

# 0.8.5 (TBD)

## Bug Fixes

- **M3DB**: Prevent active commit log from appearing like a corrupt file during bootstrap.

# 0.8.4 (2019-04-20)

## Performance
Expand Down
76 changes: 69 additions & 7 deletions src/dbnode/persist/fs/commitlog/commit_log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ func TestCommitLogReaderIsNotReusable(t *testing.T) {
require.Equal(t, errCommitLogReaderIsNotReusable, err)
}

func TestCommitLogIteratorUsesPredicateFilter(t *testing.T) {
func TestCommitLogIteratorUsesPredicateFilterForNonCorruptFiles(t *testing.T) {
clock := mclock.NewMock()
opts, scope := newTestOptions(t, overrides{
clock: clock,
Expand Down Expand Up @@ -515,22 +515,23 @@ func TestCommitLogIteratorUsesPredicateFilter(t *testing.T) {
flushUntilDone(commitLog, wg)
}

// Close the commit log and consequently flush
// Close the commit log and consequently flush.
require.NoError(t, commitLog.Close())

// Make sure multiple commitlog files were generated
// Make sure multiple commitlog files were generated.
fsopts := opts.FilesystemOptions()
files, err := fs.SortedCommitLogFiles(fs.CommitLogsDirPath(fsopts.FilePathPrefix()))
require.NoError(t, err)
require.Equal(t, 4, len(files))

// This predicate should eliminate the first commitlog file
commitLogPredicate := func(f persist.CommitLogFile) bool {
return f.Index > 0
// This predicate should eliminate the first commitlog file.
commitLogPredicate := func(f FileFilterInfo) bool {
require.False(t, f.IsCorrupt)
return f.File.Index > 0
}

// Assert that the commitlog iterator honors the predicate and only uses
// 2 of the 3 files
// 3 of the 4 files.
iterOpts := IteratorOpts{
CommitLogOptions: opts,
FileFilterPredicate: commitLogPredicate,
Expand All @@ -544,6 +545,67 @@ func TestCommitLogIteratorUsesPredicateFilter(t *testing.T) {
require.Equal(t, 3, len(iterStruct.files))
}

// TestCommitLogIteratorUsesPredicateFilterForCorruptFiles verifies that
// NewIterator applies the FileFilterPredicate to corrupt commit log files:
// with ReadAllPredicate the corrupt file is reported back to the caller, and
// with a predicate that rejects corrupt files it is omitted from the result.
func TestCommitLogIteratorUsesPredicateFilterForCorruptFiles(t *testing.T) {
clock := mclock.NewMock()
opts, _ := newTestOptions(t, overrides{
clock: clock,
strategy: StrategyWriteWait,
})
defer cleanup(t, opts)

commitLog := newTestCommitLog(t, opts)
// Close the commit log and consequently flush.
require.NoError(t, commitLog.Close())

// Make sure a valid commitlog was created.
fsopts := opts.FilesystemOptions()
files, err := fs.SortedCommitLogFiles(fs.CommitLogsDirPath(fsopts.FilePathPrefix()))
require.NoError(t, err)
require.Equal(t, 1, len(files))

// Write out a corrupt commitlog file.
// The payload is arbitrary text rather than a real commit log encoding.
nextCommitlogFilePath, _, err := NextFile(opts)
require.NoError(t, err)
err = ioutil.WriteFile(
nextCommitlogFilePath, []byte("not-a-valid-commitlog-file"), os.FileMode(0666))
require.NoError(t, err)

// Make sure the corrupt file is visible.
files, err = fs.SortedCommitLogFiles(fs.CommitLogsDirPath(fsopts.FilePathPrefix()))
require.NoError(t, err)
require.Equal(t, 2, len(files))

// Assert that the corrupt file is returned from the iterator.
// ReadAllPredicate accepts every file, so nothing is filtered out here.
iterOpts := IteratorOpts{
CommitLogOptions: opts,
FileFilterPredicate: ReadAllPredicate(),
SeriesFilterPredicate: ReadAllSeriesPredicate(),
}
iter, corruptFiles, err := NewIterator(iterOpts)
require.NoError(t, err)
require.Equal(t, 1, len(corruptFiles))

// Only one file is expected in the iterator's list of files to read.
iterStruct := iter.(*iterator)
require.Equal(t, 1, len(iterStruct.files))

// Assert that the iterator ignores the corrupt file given an appropriate predicate.
ignoreCorruptPredicate := func(f FileFilterInfo) bool {
return !f.IsCorrupt
}

iterOpts = IteratorOpts{
CommitLogOptions: opts,
FileFilterPredicate: ignoreCorruptPredicate,
SeriesFilterPredicate: ReadAllSeriesPredicate(),
}
iter, corruptFiles, err = NewIterator(iterOpts)
require.NoError(t, err)
require.Equal(t, 0, len(corruptFiles))

// The iterator's file list is expected to be the same size as before.
iterStruct = iter.(*iterator)
require.Equal(t, 1, len(iterStruct.files))
}

func TestCommitLogWriteBehind(t *testing.T) {
opts, scope := newTestOptions(t, overrides{
strategy: StrategyWriteBehind,
Expand Down
26 changes: 21 additions & 5 deletions src/dbnode/persist/fs/commitlog/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ type iteratorRead struct {
// ReadAllPredicate can be passed as the FileFilterPredicate for callers
// that want a convenient way to read all the commitlogs.
func ReadAllPredicate() FileFilterPredicate {
return func(_ persist.CommitLogFile) bool { return true }
return func(f FileFilterInfo) bool { return true }
}

// NewIterator creates a new commit log iterator
Expand All @@ -77,7 +77,8 @@ func NewIterator(iterOpts IteratorOpts) (iter Iterator, corruptFiles []ErrorWith
if err != nil {
return nil, nil, err
}
filteredFiles := filterFiles(opts, files, iterOpts.FileFilterPredicate)
filteredFiles := filterFiles(files, iterOpts.FileFilterPredicate)
filteredCorruptFiles := filterCorruptFiles(corruptFiles, iterOpts.FileFilterPredicate)

scope := iops.MetricsScope()
return &iterator{
Expand All @@ -89,7 +90,7 @@ func NewIterator(iterOpts IteratorOpts) (iter Iterator, corruptFiles []ErrorWith
log: iops.Logger(),
files: filteredFiles,
seriesPred: iterOpts.SeriesFilterPredicate,
}, corruptFiles, nil
}, filteredCorruptFiles, nil
}

func (i *iterator) Next() bool {
Expand Down Expand Up @@ -180,16 +181,31 @@ func (i *iterator) nextReader() bool {
return true
}

func filterFiles(opts Options, files []persist.CommitLogFile, predicate FileFilterPredicate) []persist.CommitLogFile {
func filterFiles(files []persist.CommitLogFile, predicate FileFilterPredicate) []persist.CommitLogFile {
filtered := make([]persist.CommitLogFile, 0, len(files))
for _, f := range files {
if predicate(f) {
info := FileFilterInfo{File: f}
if predicate(info) {
filtered = append(filtered, f)
}
}
return filtered
}

// filterCorruptFiles returns the entries of corruptFiles that predicate
// accepts, preserving their order. Each entry is handed to the predicate
// wrapped in a FileFilterInfo with Err set and IsCorrupt set to true; the
// File field is left at its zero value.
func filterCorruptFiles(corruptFiles []ErrorWithPath, predicate FileFilterPredicate) []ErrorWithPath {
	kept := make([]ErrorWithPath, 0, len(corruptFiles))
	for i := range corruptFiles {
		candidate := corruptFiles[i]
		// Skip anything the caller's predicate rejects.
		if !predicate(FileFilterInfo{Err: candidate, IsCorrupt: true}) {
			continue
		}
		kept = append(kept, candidate)
	}
	return kept
}

func (i *iterator) closeAndResetReader() error {
if i.reader == nil {
return nil
Expand Down
12 changes: 11 additions & 1 deletion src/dbnode/persist/fs/commitlog/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,19 @@ type Options interface {
IdentifierPool() ident.Pool
}

// FileFilterInfo contains information about a commitlog file that can be used to
// determine whether the iterator should filter it out or not.
type FileFilterInfo struct {
// If IsCorrupt is false then File will contain a valid CommitLogFile, otherwise
// Err will contain an error and the path of the corrupt file.
File persist.CommitLogFile
Err ErrorWithPath
IsCorrupt bool
}

// FileFilterPredicate is a predicate that allows the caller to determine
// which commitlogs the iterator should read from.
type FileFilterPredicate func(f persist.CommitLogFile) bool
type FileFilterPredicate func(f FileFilterInfo) bool

// SeriesFilterPredicate is a predicate that determines whether datapoints for a given series
// should be returned from the Commit log reader. The predicate is pushed down to the
Expand Down
23 changes: 19 additions & 4 deletions src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -663,13 +663,28 @@ func (s *commitLogSource) newReadCommitlogPredAndMostRecentSnapshotByBlockShard(
}
}

return func(f persist.CommitLogFile) bool {
// TODO(rartoul): Refactor this to take the SnapshotMetadata files into account to reduce
// the number of commitlog files that need to be read.
return func(f commitlog.FileFilterInfo) bool {
// Read all the commitlog files that were available on disk before the node started
// accepting writes.
// TODO(rartoul): Refactor this to take the SnapshotMetadata files into account to reduce
// the number of commitlog files that need to be read.
commitlogFilesPresentBeforeStart := s.inspection.CommitLogFilesSet()
_, ok := commitlogFilesPresentBeforeStart[f.FilePath]
if f.IsCorrupt {
// Corrupt files that existed on disk before the node started should be included so
// that the commitlog bootstrapper can detect them and determine if it will return
// unfulfilled or ignore them.
//
// Corrupt files that did not exist on disk before the node started should always be
// ignored since they have no impact on the bootstrapping process and likely only
// appear corrupt because they were just created recently by the current node as
// it is already accepting writes at this point.
_, ok := commitlogFilesPresentBeforeStart[f.Err.Path()]
return ok
}
// Only attempt to read commitlog files that were present on disk before the node started.
// If a commitlog file was not present when the node started then it was created once the
// node began accepting writes and the data is already in memory.
_, ok := commitlogFilesPresentBeforeStart[f.File.FilePath]
return ok
}, mostRecentCompleteSnapshotByBlockShard, nil
}
Expand Down