[dbnode] Fix index flush recovery when previous flush attempts have failed #1574

Merged
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,11 @@
# Changelog

# UNRELEASED

## Bug Fixes

- **M3DB**: Fix index flush recovery when previous flush attempts have failed (#1574)

# 0.8.3 (2019-04-12)

## Performance
58 changes: 45 additions & 13 deletions src/dbnode/persist/fs/files.go
@@ -73,14 +73,27 @@ var (

type fileOpener func(filePath string) (*os.File, error)

// LazyEvalBool is a boolean is lazily evaluated.

Collaborator: boolean that is

type LazyEvalBool uint8

const (
// EvalNone indicated the boolean has not been evaluated.

Collaborator: indicates

EvalNone LazyEvalBool = iota
// EvalTrue indicates the boolean has been evaluated to true.
EvalTrue
// EvalFalse indicates the boolean has been evaluated to false.
EvalFalse
)

// FileSetFile represents a set of FileSet files for a given block start
type FileSetFile struct {
ID FileSetFileIdentifier
AbsoluteFilepaths []string

CachedSnapshotTime time.Time
CachedSnapshotID uuid.UUID
filePathPrefix string
CachedSnapshotTime time.Time
CachedSnapshotID uuid.UUID
CachedHasCompleteCheckpointFile LazyEvalBool
filePathPrefix string
}

// SnapshotTimeAndID returns the snapshot time and id for the given FileSetFile.
@@ -89,9 +102,11 @@ func (f *FileSetFile) SnapshotTimeAndID() (time.Time, uuid.UUID, error) {
if f.IsZero() {
return time.Time{}, nil, errSnapshotTimeAndIDZero
}
if len(f.AbsoluteFilepaths) > 0 && !strings.Contains(f.AbsoluteFilepaths[0], snapshotDirName) {
if len(f.AbsoluteFilepaths) > 0 &&
!strings.Contains(f.AbsoluteFilepaths[0], snapshotDirName) {
return time.Time{}, nil, fmt.Errorf(
"tried to determine snapshot time and id of non-snapshot: %s", f.AbsoluteFilepaths[0])
"tried to determine snapshot time and id of non-snapshot: %s",
f.AbsoluteFilepaths[0])
}

if !f.CachedSnapshotTime.IsZero() || f.CachedSnapshotID != nil {
@@ -115,16 +130,33 @@ func (f FileSetFile) IsZero() bool {
return len(f.AbsoluteFilepaths) == 0
}

// HasCheckpointFile returns a bool indicating whether the given set of
// HasCompleteCheckpointFile returns a bool indicating whether the given set of
// fileset files has a checkpoint file.
func (f FileSetFile) HasCheckpointFile() bool {
func (f *FileSetFile) HasCompleteCheckpointFile() bool {
switch f.CachedHasCompleteCheckpointFile {
case EvalNone:
f.CachedHasCompleteCheckpointFile = f.evalHasCompleteCheckpointFile()
return f.HasCompleteCheckpointFile()
case EvalTrue:
return true
}
return false
}
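
The pointer receiver and the zero value are what make the caching work: `LazyEvalBool`'s zero value is `EvalNone`, so a freshly constructed `FileSetFile` evaluates against the filesystem exactly once, stores the result on the struct, and every later call short-circuits. A small illustration, as a sketch inside package `fs` (the file path is hypothetical):

```go
// Sketch only: demonstrates the memoization behavior of
// HasCompleteCheckpointFile. The file path is made up.
func exampleLazyCheckpointEval() {
	f := &FileSetFile{
		AbsoluteFilepaths: []string{"/var/lib/m3db/data/ns/0/fileset-1-checkpoint.db"},
	}
	// CachedHasCompleteCheckpointFile starts at EvalNone (the zero value),
	// so this call validates the checkpoint file on disk and caches the result.
	first := f.HasCompleteCheckpointFile()
	// This call returns the cached result without touching disk.
	second := f.HasCompleteCheckpointFile()
	_, _ = first, second
}
```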

func (f *FileSetFile) evalHasCompleteCheckpointFile() LazyEvalBool {
for _, fileName := range f.AbsoluteFilepaths {
if strings.Contains(fileName, checkpointFileSuffix) {

Collaborator: Probably don't need this line anymore since CompleteCheckpointFileExists does this already.

Collaborator (author): True but that will cause an "Invariant error" to be allocated which I'd like to avoid in the case that we're scanning a lot of files.

return true
exists, err := CompleteCheckpointFileExists(fileName)
if err != nil {
continue
}
if exists {
return EvalTrue
}
}
}

return false
return EvalFalse
}
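
`CompleteCheckpointFileExists` itself is not shown in this diff. Per the thread above, it reports an invariant error for paths that are not checkpoint files, which is why the cheap `strings.Contains` pre-filter stays. A minimal sketch of the assumed semantics, inferring from the tests below that a complete checkpoint file holds exactly one encoded digest while a partial write is shorter:

```go
// Sketch only -- the real CompleteCheckpointFileExists lives elsewhere in
// this package. The 4-byte digest size is an assumption inferred from the
// tests, which treat a digest-sized file as complete and a 1-byte file as
// incomplete.
func completeCheckpointFileExistsSketch(filePath string) (bool, error) {
	if !strings.Contains(filePath, checkpointFileSuffix) {
		// The real helper allocates an invariant error here, which is
		// what the pre-filter in evalHasCompleteCheckpointFile avoids.
		return false, fmt.Errorf("not a checkpoint file: %s", filePath)
	}
	stat, err := os.Stat(filePath)
	if os.IsNotExist(err) {
		return false, nil // checkpoint never written
	}
	if err != nil {
		return false, err
	}
	const encodedDigestLen = 4 // assumption: one encoded uint32 digest
	return stat.Size() == encodedDigestLen, nil
}
```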

// FileSetFilesSlice is a slice of FileSetFile
@@ -161,7 +193,7 @@ func (f FileSetFilesSlice) LatestVolumeForBlock(blockStart time.Time) (FileSetFi
break
}

if curr.HasCheckpointFile() && curr.ID.VolumeIndex >= bestSoFar.ID.VolumeIndex {
if curr.HasCompleteCheckpointFile() && curr.ID.VolumeIndex >= bestSoFar.ID.VolumeIndex {
bestSoFar = curr
bestSoFarExists = true
}
@@ -812,7 +844,7 @@ func FileSetAt(filePathPrefix string, namespace ident.ID, shard uint32, blockSta
)
}

if !fileset.HasCheckpointFile() {
if !fileset.HasCompleteCheckpointFile() {
continue
}

@@ -841,7 +873,7 @@ func IndexFileSetsAt(filePathPrefix string, namespace ident.ID, blockStart time.
matches.sortByTimeAscending()
for _, fileset := range matches {
if fileset.ID.BlockStart.Equal(blockStart) {
if !fileset.HasCheckpointFile() {
if !fileset.HasCompleteCheckpointFile() {
continue
}
filesets = append(filesets, fileset)
@@ -1198,7 +1230,7 @@ func SnapshotFileSetExistsAt(prefix string, namespace ident.ID, shard uint32, bl
return false, nil
}

return latest.HasCheckpointFile(), nil
return latest.HasCompleteCheckpointFile(), nil
}

// NextSnapshotMetadataFileIndex returns the next snapshot metadata file index.
76 changes: 63 additions & 13 deletions src/dbnode/persist/fs/files_test.go
@@ -737,7 +737,11 @@ func TestNextIndexFileSetVolumeIndex(t *testing.T) {
curr = index

p := filesetPathFromTimeAndIndex(dataDir, blockStart, index, checkpointFileSuffix)
err = ioutil.WriteFile(p, []byte("bar"), defaultNewFileMode)

digestBuf := digest.NewBuffer()
digestBuf.WriteDigest(digest.Checksum([]byte("bar")))

err = ioutil.WriteFile(p, digestBuf, defaultNewFileMode)
require.NoError(t, err)
}
}
@@ -780,14 +784,38 @@ func TestMultipleForBlockStart(t *testing.T) {
require.Equal(t, numSnapshotsPerBlock-1, latestSnapshot.ID.VolumeIndex)
}

func TestSnapshotFileHasCheckPointFile(t *testing.T) {
require.Equal(t, true, FileSetFile{
AbsoluteFilepaths: []string{"123-checkpoint-0.db"},
}.HasCheckpointFile())
func TestSnapshotFileHasCompleteCheckpointFile(t *testing.T) {
dir := createTempDir(t)
defer os.RemoveAll(dir)

checkpointFilePath := path.Join(dir, "123-checkpoint-0.db")

// Test a valid complete checkpoint file
digestBuffer := digest.NewBuffer()
digestBuffer.WriteDigest(digest.Checksum([]byte{1, 2, 3}))
err := ioutil.WriteFile(checkpointFilePath, digestBuffer, defaultNewFileMode)
require.NoError(t, err)

// Check validates a valid checkpoint file
f := FileSetFile{
AbsoluteFilepaths: []string{checkpointFilePath},
}
require.Equal(t, true, f.HasCompleteCheckpointFile())

require.Equal(t, false, FileSetFile{
AbsoluteFilepaths: []string{"123-index-0.db"},
}.HasCheckpointFile())
// Check fails when checkpoint exists but not valid
err = ioutil.WriteFile(checkpointFilePath, []byte{42}, defaultNewFileMode)
require.NoError(t, err)
f = FileSetFile{
AbsoluteFilepaths: []string{checkpointFilePath},
}
require.Equal(t, false, f.HasCompleteCheckpointFile())

// Check ignores index file path
indexFilePath := path.Join(dir, "123-index-0.db")
f = FileSetFile{
AbsoluteFilepaths: []string{indexFilePath},
}
require.Equal(t, false, f.HasCompleteCheckpointFile())
}
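
Because completeness is now judged by contents rather than file name alone, every fixture below that fabricates a checkpoint file must write a real digest. The diff repeats the same three lines in several places (`createDataFiles`, `fileSetFileIdentifiers.create`, and the persist manager test); a helper along these lines (hypothetical, not part of the PR) captures the pattern:

```go
// writeValidCheckpointFile writes a checkpoint file whose contents are a
// proper digest of contents, so completeness validation accepts it.
// Hypothetical test helper; the digest calls are the ones used in the diff.
func writeValidCheckpointFile(t *testing.T, path string, contents []byte) {
	digestBuf := digest.NewBuffer()
	digestBuf.WriteDigest(digest.Checksum(contents))
	require.NoError(t, ioutil.WriteFile(path, digestBuf, defaultNewFileMode))
}
```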

func TestSnapshotDirPath(t *testing.T) {
@@ -1107,7 +1135,16 @@ func createDataFiles(t *testing.T,
} else {
infoFilePath = filesetPathFromTime(shardDir, ts, fileSuffix)
}
createFile(t, infoFilePath, nil)
var contents []byte
if fileSuffix == checkpointFileSuffix {
// If writing a checkpoint file then write out a checksum of contents
// so that when code that validates the checkpoint file runs it returns
// successfully
digestBuf := digest.NewBuffer()
digestBuf.WriteDigest(digest.Checksum(contents))
contents = []byte(digestBuf)
}
createFile(t, infoFilePath, contents)
}
return dir
}
@@ -1121,13 +1158,26 @@ type indexFileSetFileIdentifiers []indexFileSetFileIdentifier

func (indexFilesets indexFileSetFileIdentifiers) create(t *testing.T, prefixDir string) {
for _, fileset := range indexFilesets {
fileSetFileIdentifiers{fileset.FileSetFileIdentifier}.create(t, prefixDir, persist.FileSetFlushType, fileset.Suffix)
idents := fileSetFileIdentifiers{fileset.FileSetFileIdentifier}
idents.create(t, prefixDir, persist.FileSetFlushType, fileset.Suffix)
}
}

type fileSetFileIdentifiers []FileSetFileIdentifier

func (filesets fileSetFileIdentifiers) create(t *testing.T, prefixDir string, fileSetType persist.FileSetType, fileSuffixes ...string) {
writeFile := func(t *testing.T, path string, contents []byte) {
if strings.Contains(path, checkpointFileSuffix) {
// If writing a checkpoint file then write out a checksum of contents
// so that when code that validates the checkpoint file runs it returns
// successfully
digestBuf := digest.NewBuffer()
digestBuf.WriteDigest(digest.Checksum(contents))
contents = []byte(digestBuf)
}
createFile(t, path, contents)
}

for _, suffix := range fileSuffixes {
for _, fileset := range filesets {
switch fileset.FileSetContentType {
Expand All @@ -1141,10 +1191,10 @@ func (filesets fileSetFileIdentifiers) create(t *testing.T, prefixDir string, fi
switch fileSetType {
case persist.FileSetFlushType:
path = filesetPathFromTime(shardDir, blockStart, suffix)
createFile(t, path, nil)
writeFile(t, path, nil)
case persist.FileSetSnapshotType:
path = filesetPathFromTimeAndIndex(shardDir, blockStart, 0, fileSuffix)
createFile(t, path, nil)
writeFile(t, path, nil)
default:
panic("unknown FileSetType")
}
Expand All @@ -1158,7 +1208,7 @@ func (filesets fileSetFileIdentifiers) create(t *testing.T, prefixDir string, fi
switch fileSetType {
case persist.FileSetFlushType:
path = filesetPathFromTimeAndIndex(indexDir, blockStart, volumeIndex, suffix)
createFile(t, path, nil)
writeFile(t, path, nil)
case persist.FileSetSnapshotType:
fallthrough
default:
19 changes: 10 additions & 9 deletions src/dbnode/persist/fs/index_write.go
@@ -125,9 +125,18 @@ func (w *indexWriter) Open(opts IndexWriterOpenOptions) error {
if err := os.MkdirAll(w.namespaceDir, w.newDirectoryMode); err != nil {
return err
}
w.checkpointFilePath = filesetPathFromTimeAndIndex(w.namespaceDir, blockStart, w.volumeIndex, checkpointFileSuffix)
w.infoFilePath = filesetPathFromTimeAndIndex(w.namespaceDir, blockStart, w.volumeIndex, infoFileSuffix)
w.digestFilePath = filesetPathFromTimeAndIndex(w.namespaceDir, blockStart, w.volumeIndex, digestFileSuffix)
w.checkpointFilePath = filesetPathFromTimeAndIndex(w.namespaceDir, blockStart, w.volumeIndex, checkpointFileSuffix)

exists, err := CompleteCheckpointFileExists(w.checkpointFilePath)
if err != nil {
return err
}
if exists {
return fmt.Errorf("checkpoint already exists for volume: %s",
w.checkpointFilePath)
}

return nil
}
@@ -175,14 +184,6 @@ func (w *indexWriter) WriteSegmentFileSet(
err := fmt.Errorf("unknown fileset type: %s", w.fileSetType)
return w.markSegmentWriteError(segType, segFileType, err)
}
exists, err := FileExists(filePath)
if err != nil {
return err
}
if exists {
err := fmt.Errorf("segment file type already exists at %s", filePath)
return w.markSegmentWriteError(segType, segFileType, err)
}

fd, err := OpenWritable(filePath, w.newFileMode)
if err != nil {
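
Taken together, the two `index_write.go` hunks move the "already written" check from per-segment-file to per-volume: `Open` now fails fast only when a *complete* checkpoint exists (meaning a prior flush succeeded), while partial segment files left behind by a failed flush are simply overwritten on retry. A sketch of the caller-visible contract, using a hypothetical interface name for the fs index writer:

```go
// indexVolumeWriter is hypothetical shorthand for the fs index writer;
// only Open's new behavior matters for this sketch.
type indexVolumeWriter interface {
	Open(opts IndexWriterOpenOptions) error
	Close() error
}

// retryIndexFlushSketch: after this PR, a previously *failed* flush (partial
// files, no complete checkpoint) no longer blocks the retry, while a
// previously *successful* flush surfaces as an error from Open.
func retryIndexFlushSketch(w indexVolumeWriter, opts IndexWriterOpenOptions) error {
	if err := w.Open(opts); err != nil {
		return err // e.g. "checkpoint already exists for volume: ..."
	}
	// ... WriteSegmentFileSet calls; leftovers from a failed attempt are
	// overwritten now that the per-file existence check is gone. Close
	// presumably writes the checkpoint last, marking the volume complete.
	return w.Close()
}
```
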
10 changes: 7 additions & 3 deletions src/dbnode/persist/fs/persist_manager_test.go
@@ -23,6 +23,7 @@ package fs
import (
"errors"
"io"
"io/ioutil"
"os"
"testing"
"time"
@@ -33,9 +34,9 @@ import (
"github.com/m3db/m3/src/m3ninx/index/segment"
m3ninxfs "github.com/m3db/m3/src/m3ninx/index/segment/fst"
m3ninxpersist "github.com/m3db/m3/src/m3ninx/persist"
m3test "github.com/m3db/m3/src/x/test"
"github.com/m3db/m3/src/x/checked"
"github.com/m3db/m3/src/x/ident"
m3test "github.com/m3db/m3/src/x/test"
xtest "github.com/m3db/m3/src/x/test"

"github.com/golang/mock/gomock"
@@ -327,9 +328,12 @@ func TestPersistenceManagerPrepareIndexFileExists(t *testing.T) {
blockStart := time.Unix(1000, 0)
indexDir := createIndexDataDir(t, pm.filePathPrefix, testNs1ID)
checkpointFilePath := filesetPathFromTimeAndIndex(indexDir, blockStart, 0, checkpointFileSuffix)
f, err := os.Create(checkpointFilePath)

digestBuf := digest.NewBuffer()
digestBuf.WriteDigest(digest.Checksum([]byte("foo")))

err := ioutil.WriteFile(checkpointFilePath, digestBuf, defaultNewFileMode)
require.NoError(t, err)
f.Close()

flush, err := pm.StartIndexPersist()
require.NoError(t, err)
8 changes: 6 additions & 2 deletions src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go
@@ -47,8 +47,8 @@ import (
xsync "github.com/m3db/m3/src/x/sync"
xtime "github.com/m3db/m3/src/x/time"

"go.uber.org/zap"
"github.com/uber-go/tally"
"go.uber.org/zap"
)

var (
@@ -402,9 +402,13 @@ func (s *commitLogSource) mostRecentCompleteSnapshotByBlockShard(
// CachedSnapshotTime field so that we can rely upon it from here on out.
_, _, err := mostRecentSnapshotVolume.SnapshotTimeAndID()
if err != nil {
namespace := mostRecentSnapshot.ID.Namespace
if namespace == nil {
namespace = ident.StringID("<nil>")
}
s.log.
With(
zap.Stringer("namespace", mostRecentSnapshot.ID.Namespace),
zap.Stringer("namespace", namespace),
zap.Time("blockStart", mostRecentSnapshot.ID.BlockStart),
zap.Uint32("shard", mostRecentSnapshot.ID.Shard),
zap.Int("index", mostRecentSnapshot.ID.VolumeIndex),
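
The nil guard in the hunk above avoids a logging panic: `zap.Stringer` defers the `String()` call until the entry is encoded, so a nil `ident.ID` would blow up inside the logger rather than at the call site. A minimal sketch of the pattern (hypothetical function name and log message):

```go
// logSnapshotNamespaceSketch mirrors the guard above: substitute a
// placeholder before handing a possibly-nil ident.ID to zap.Stringer.
func logSnapshotNamespaceSketch(log *zap.Logger, ns ident.ID) {
	if ns == nil {
		ns = ident.StringID("<nil>")
	}
	log.Error("unable to determine snapshot time and id",
		zap.Stringer("namespace", ns))
}
```
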
@@ -324,8 +324,9 @@ func TestItMergesSnapshotsAndCommitLogs(t *testing.T) {
VolumeIndex: 0,
},
// Make sure path passes the "is snapshot" check in SnapshotTimeAndID method.
AbsoluteFilepaths: []string{"snapshots/checkpoint"},
CachedSnapshotTime: start.Add(time.Minute),
AbsoluteFilepaths: []string{"snapshots/checkpoint"},
CachedHasCompleteCheckpointFile: fs.EvalTrue,
CachedSnapshotTime: start.Add(time.Minute),
},
}, nil
}