Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dbnode] Fix index flush recovery when previous flush attempts have failed #1574

Merged
20 changes: 13 additions & 7 deletions src/dbnode/persist/fs/files.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,18 @@ func (f FileSetFile) IsZero() bool {
return len(f.AbsoluteFilepaths) == 0
}

// HasCheckpointFile returns a bool indicating whether the given set of
// HasCompleteCheckpointFile returns a bool indicating whether the given set of
// fileset files has a checkpoint file.
func (f FileSetFile) HasCheckpointFile() bool {
func (f FileSetFile) HasCompleteCheckpointFile() bool {
for _, fileName := range f.AbsoluteFilepaths {
if strings.Contains(fileName, checkpointFileSuffix) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably don't need this line anymore since CompleteCheckpointFileExists does this already.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True but that will cause an "Invariant error" to be allocated which I'd like to avoid in the case that we're scanning a lot of files.

return true
exists, err := CompleteCheckpointFileExists(fileName)
if err != nil {
continue
}
if exists {
return true
}
}
}

Expand Down Expand Up @@ -161,7 +167,7 @@ func (f FileSetFilesSlice) LatestVolumeForBlock(blockStart time.Time) (FileSetFi
break
}

if curr.HasCheckpointFile() && curr.ID.VolumeIndex >= bestSoFar.ID.VolumeIndex {
if curr.HasCompleteCheckpointFile() && curr.ID.VolumeIndex >= bestSoFar.ID.VolumeIndex {
bestSoFar = curr
bestSoFarExists = true
}
Expand Down Expand Up @@ -812,7 +818,7 @@ func FileSetAt(filePathPrefix string, namespace ident.ID, shard uint32, blockSta
)
}

if !fileset.HasCheckpointFile() {
if !fileset.HasCompleteCheckpointFile() {
continue
}

Expand Down Expand Up @@ -841,7 +847,7 @@ func IndexFileSetsAt(filePathPrefix string, namespace ident.ID, blockStart time.
matches.sortByTimeAscending()
for _, fileset := range matches {
if fileset.ID.BlockStart.Equal(blockStart) {
if !fileset.HasCheckpointFile() {
if !fileset.HasCompleteCheckpointFile() {
continue
}
filesets = append(filesets, fileset)
Expand Down Expand Up @@ -1198,7 +1204,7 @@ func SnapshotFileSetExistsAt(prefix string, namespace ident.ID, shard uint32, bl
return false, nil
}

return latest.HasCheckpointFile(), nil
return latest.HasCompleteCheckpointFile(), nil
}

// NextSnapshotMetadataFileIndex returns the next snapshot metadata file index.
Expand Down
69 changes: 58 additions & 11 deletions src/dbnode/persist/fs/files_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -737,7 +737,11 @@ func TestNextIndexFileSetVolumeIndex(t *testing.T) {
curr = index

p := filesetPathFromTimeAndIndex(dataDir, blockStart, index, checkpointFileSuffix)
err = ioutil.WriteFile(p, []byte("bar"), defaultNewFileMode)

digestBuf := digest.NewBuffer()
digestBuf.WriteDigest(digest.Checksum([]byte("bar")))

err = ioutil.WriteFile(p, digestBuf, defaultNewFileMode)
require.NoError(t, err)
}
}
Expand Down Expand Up @@ -780,14 +784,35 @@ func TestMultipleForBlockStart(t *testing.T) {
require.Equal(t, numSnapshotsPerBlock-1, latestSnapshot.ID.VolumeIndex)
}

func TestSnapshotFileHasCheckPointFile(t *testing.T) {
func TestSnapshotFileHasCompleteCheckpointFile(t *testing.T) {
dir := createTempDir(t)
defer os.RemoveAll(dir)

checkpointFilePath := path.Join(dir, "123-checkpoint-0.db")

// Test a valid complete checkpoint file
digestBuffer := digest.NewBuffer()
digestBuffer.WriteDigest(digest.Checksum([]byte{1, 2, 3}))
err := ioutil.WriteFile(checkpointFilePath, digestBuffer, defaultNewFileMode)
require.NoError(t, err)

// Check validates a valid checkpoint file
require.Equal(t, true, FileSetFile{
AbsoluteFilepaths: []string{"123-checkpoint-0.db"},
}.HasCheckpointFile())
AbsoluteFilepaths: []string{checkpointFilePath},
}.HasCompleteCheckpointFile())

// Check fails when checkpoint exists but not valid
err = ioutil.WriteFile(checkpointFilePath, []byte{42}, defaultNewFileMode)
require.NoError(t, err)
require.Equal(t, false, FileSetFile{
AbsoluteFilepaths: []string{checkpointFilePath},
}.HasCompleteCheckpointFile())

// Check ignores index file path
indexFilePath := path.Join(dir, "123-index-0.db")
require.Equal(t, false, FileSetFile{
AbsoluteFilepaths: []string{"123-index-0.db"},
}.HasCheckpointFile())
AbsoluteFilepaths: []string{indexFilePath},
}.HasCompleteCheckpointFile())
}

func TestSnapshotDirPath(t *testing.T) {
Expand Down Expand Up @@ -1107,7 +1132,16 @@ func createDataFiles(t *testing.T,
} else {
infoFilePath = filesetPathFromTime(shardDir, ts, fileSuffix)
}
createFile(t, infoFilePath, nil)
var contents []byte
if fileSuffix == checkpointFileSuffix {
// If writing a checkpoint file then write out a checksum of contents
// so that when code that validates the checkpoint file runs it returns
// successfully
digestBuf := digest.NewBuffer()
digestBuf.WriteDigest(digest.Checksum(contents))
contents = []byte(digestBuf)
}
createFile(t, infoFilePath, contents)
}
return dir
}
Expand All @@ -1121,13 +1155,26 @@ type indexFileSetFileIdentifiers []indexFileSetFileIdentifier

func (indexFilesets indexFileSetFileIdentifiers) create(t *testing.T, prefixDir string) {
for _, fileset := range indexFilesets {
fileSetFileIdentifiers{fileset.FileSetFileIdentifier}.create(t, prefixDir, persist.FileSetFlushType, fileset.Suffix)
idents := fileSetFileIdentifiers{fileset.FileSetFileIdentifier}
idents.create(t, prefixDir, persist.FileSetFlushType, fileset.Suffix)
}
}

type fileSetFileIdentifiers []FileSetFileIdentifier

func (filesets fileSetFileIdentifiers) create(t *testing.T, prefixDir string, fileSetType persist.FileSetType, fileSuffixes ...string) {
writeFile := func(t *testing.T, path string, contents []byte) {
if strings.Contains(path, checkpointFileSuffix) {
// If writing a checkpoint file then write out a checksum of contents
// so that when code that validates the checkpoint file runs it returns
// successfully
digestBuf := digest.NewBuffer()
digestBuf.WriteDigest(digest.Checksum(contents))
contents = []byte(digestBuf)
}
createFile(t, path, contents)
}

for _, suffix := range fileSuffixes {
for _, fileset := range filesets {
switch fileset.FileSetContentType {
Expand All @@ -1141,10 +1188,10 @@ func (filesets fileSetFileIdentifiers) create(t *testing.T, prefixDir string, fi
switch fileSetType {
case persist.FileSetFlushType:
path = filesetPathFromTime(shardDir, blockStart, suffix)
createFile(t, path, nil)
writeFile(t, path, nil)
case persist.FileSetSnapshotType:
path = filesetPathFromTimeAndIndex(shardDir, blockStart, 0, fileSuffix)
createFile(t, path, nil)
writeFile(t, path, nil)
default:
panic("unknown FileSetType")
}
Expand All @@ -1158,7 +1205,7 @@ func (filesets fileSetFileIdentifiers) create(t *testing.T, prefixDir string, fi
switch fileSetType {
case persist.FileSetFlushType:
path = filesetPathFromTimeAndIndex(indexDir, blockStart, volumeIndex, suffix)
createFile(t, path, nil)
writeFile(t, path, nil)
case persist.FileSetSnapshotType:
fallthrough
default:
Expand Down
19 changes: 10 additions & 9 deletions src/dbnode/persist/fs/index_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,18 @@ func (w *indexWriter) Open(opts IndexWriterOpenOptions) error {
if err := os.MkdirAll(w.namespaceDir, w.newDirectoryMode); err != nil {
return err
}
w.checkpointFilePath = filesetPathFromTimeAndIndex(w.namespaceDir, blockStart, w.volumeIndex, checkpointFileSuffix)
w.infoFilePath = filesetPathFromTimeAndIndex(w.namespaceDir, blockStart, w.volumeIndex, infoFileSuffix)
w.digestFilePath = filesetPathFromTimeAndIndex(w.namespaceDir, blockStart, w.volumeIndex, digestFileSuffix)
w.checkpointFilePath = filesetPathFromTimeAndIndex(w.namespaceDir, blockStart, w.volumeIndex, checkpointFileSuffix)

exists, err := CompleteCheckpointFileExists(w.checkpointFilePath)
if err != nil {
return err
}
if exists {
return fmt.Errorf("checkpoint already exists for volume: %s",
w.checkpointFilePath)
}

return nil
}
Expand Down Expand Up @@ -175,14 +184,6 @@ func (w *indexWriter) WriteSegmentFileSet(
err := fmt.Errorf("unknown fileset type: %s", w.fileSetType)
return w.markSegmentWriteError(segType, segFileType, err)
}
exists, err := FileExists(filePath)
if err != nil {
return err
}
if exists {
err := fmt.Errorf("segment file type already exists at %s", filePath)
return w.markSegmentWriteError(segType, segFileType, err)
}

fd, err := OpenWritable(filePath, w.newFileMode)
if err != nil {
Expand Down
10 changes: 7 additions & 3 deletions src/dbnode/persist/fs/persist_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package fs
import (
"errors"
"io"
"io/ioutil"
"os"
"testing"
"time"
Expand All @@ -33,9 +34,9 @@ import (
"github.com/m3db/m3/src/m3ninx/index/segment"
m3ninxfs "github.com/m3db/m3/src/m3ninx/index/segment/fst"
m3ninxpersist "github.com/m3db/m3/src/m3ninx/persist"
m3test "github.com/m3db/m3/src/x/test"
"github.com/m3db/m3/src/x/checked"
"github.com/m3db/m3/src/x/ident"
m3test "github.com/m3db/m3/src/x/test"
xtest "github.com/m3db/m3/src/x/test"

"github.com/golang/mock/gomock"
Expand Down Expand Up @@ -327,9 +328,12 @@ func TestPersistenceManagerPrepareIndexFileExists(t *testing.T) {
blockStart := time.Unix(1000, 0)
indexDir := createIndexDataDir(t, pm.filePathPrefix, testNs1ID)
checkpointFilePath := filesetPathFromTimeAndIndex(indexDir, blockStart, 0, checkpointFileSuffix)
f, err := os.Create(checkpointFilePath)

digestBuf := digest.NewBuffer()
digestBuf.WriteDigest(digest.Checksum([]byte("foo")))

err := ioutil.WriteFile(checkpointFilePath, digestBuf, defaultNewFileMode)
require.NoError(t, err)
f.Close()

flush, err := pm.StartIndexPersist()
require.NoError(t, err)
Expand Down