Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Verify checkpoint files are completely written out when checking if they exist #1086

Merged
merged 11 commits into from
Oct 16, 2018
28 changes: 26 additions & 2 deletions src/dbnode/persist/fs/files.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
xclose "github.com/m3db/m3x/close"
xerrors "github.com/m3db/m3x/errors"
"github.com/m3db/m3x/ident"
"github.com/m3db/m3x/instrument"
)

var timeZero time.Time
Expand Down Expand Up @@ -468,7 +469,7 @@ func forEachInfoFile(
digestsFilePath = filesetPathFromTimeAndIndex(dir, t, volume, digestFileSuffix)
infoFilePath = filesetPathFromTimeAndIndex(dir, t, volume, infoFileSuffix)
}
checkpointExists, err := FileExists(checkpointFilePath)
checkpointExists, err := CompleteCheckpointFileExists(checkpointFilePath)
if err != nil {
continue
}
Expand Down Expand Up @@ -1035,7 +1036,7 @@ func CommitLogsDirPath(prefix string) string {
func DataFileSetExistsAt(filePathPrefix string, namespace ident.ID, shard uint32, blockStart time.Time) (bool, error) {
shardDir := ShardDataDirPath(filePathPrefix, namespace, shard)
checkpointPath := filesetPathFromTime(shardDir, blockStart, checkpointFileSuffix)
return FileExists(checkpointPath)
return CompleteCheckpointFileExists(checkpointPath)
}

// SnapshotFileSetExistsAt determines whether snapshot fileset files exist for the given namespace, shard, and block start time.
Expand Down Expand Up @@ -1126,8 +1127,31 @@ func NextIndexSnapshotFileIndex(filePathPrefix string, namespace ident.ID, block
return currentSnapshotIndex + 1, nil
}

// CompleteCheckpointFileExists returns whether a checkpoint file exists, and if so,
// is it complete.
func CompleteCheckpointFileExists(filePath string) (bool, error) {
f, err := os.Stat(filePath)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this have the same strings.Contains check?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what would the check be? This one is specifically written to be used for checkpoint files, and if its not a checkpoint file you'll probably get an error anyways because its unlikely the file you're checking is exactly 4 bytes

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think prateek just means, should you check that the filepath they're checking for definitely is a checkpoint file (i.e. make sure that it passes the checkpoint suffix)? I think that makes sense.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah ok, I guess I just wasn't worried about it because this function is harder to misuse but I'll add it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

if err != nil {
if os.IsNotExist(err) {
return false, nil
}
return false, err
}

return f.Size() == checkpointFileSizeBytes, nil
}

// FileExists returns whether a file at the given path exists.
func FileExists(filePath string) (bool, error) {
if strings.Contains(filePath, checkpointFileSuffix) {
// Existence of a checkpoint file needs to be verified using the function
// CompleteCheckpointFileExists instead to ensure that it has been
// completely written out.
return false, fmt.Errorf(
"%s tried to use FileExists to verify existence of checkpoint file: %s",
instrument.InvariantViolatedMetricName, filePath)
}

_, err := os.Stat(filePath)
if err != nil {
if os.IsNotExist(err) {
Expand Down
27 changes: 18 additions & 9 deletions src/dbnode/persist/fs/files_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,29 +282,38 @@ func TestTimeAndVolumeIndexFromFileSetFilename(t *testing.T) {
}

func TestFileExists(t *testing.T) {
dir := createTempDir(t)
defer os.RemoveAll(dir)

shard := uint32(10)
start := time.Now()
shardDir := ShardDataDirPath(dir, testNs1ID, shard)
err := os.MkdirAll(shardDir, defaultNewDirectoryMode)
var (
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mind making a new test which only tests the checkpointFileSizeBytes condition we're worried about and adding a little blurb about why it exists.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a test and put a comment in the function itself

dir = createTempDir(t)
shard = uint32(10)
start = time.Now()
shardDir = ShardDataDirPath(dir, testNs1ID, shard)
checkpointFileBuf = make([]byte, checkpointFileSizeBytes)
err = os.MkdirAll(shardDir, defaultNewDirectoryMode)
)
defer os.RemoveAll(dir)
require.NoError(t, err)

infoFilePath := filesetPathFromTime(shardDir, start, infoFileSuffix)
createDataFile(t, shardDir, start, infoFileSuffix, nil)
createDataFile(t, shardDir, start, infoFileSuffix, checkpointFileBuf)
require.True(t, mustFileExists(t, infoFilePath))
exists, err := DataFileSetExistsAt(dir, testNs1ID, uint32(shard), start)
require.NoError(t, err)
require.False(t, exists)

checkpointFilePath := filesetPathFromTime(shardDir, start, checkpointFileSuffix)
createDataFile(t, shardDir, start, checkpointFileSuffix, nil)
require.True(t, mustFileExists(t, checkpointFilePath))
createDataFile(t, shardDir, start, checkpointFileSuffix, checkpointFileBuf)
exists, err = DataFileSetExistsAt(dir, testNs1ID, uint32(shard), start)
require.NoError(t, err)
require.True(t, exists)

exists, err = CompleteCheckpointFileExists(checkpointFilePath)
require.NoError(t, err)
require.True(t, exists)

_, err = FileExists(checkpointFilePath)
require.Error(t, err)

os.Remove(infoFilePath)
require.False(t, mustFileExists(t, infoFilePath))
}
Expand Down
2 changes: 1 addition & 1 deletion src/dbnode/persist/fs/index_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func (r *indexReader) Open(
}

func (r *indexReader) readCheckpointFile(filePath string) error {
exists, err := FileExists(filePath)
exists, err := CompleteCheckpointFileExists(filePath)
if err != nil {
return err
}
Expand Down
32 changes: 18 additions & 14 deletions src/dbnode/persist/fs/persist_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,14 @@ func TestPersistenceManagerPrepareDataFileExistsNoDelete(t *testing.T) {
pm, _, _ := testDataPersistManager(t, ctrl)
defer os.RemoveAll(pm.filePathPrefix)

shard := uint32(0)
blockStart := time.Unix(1000, 0)
shardDir := createDataShardDir(t, pm.filePathPrefix, testNs1ID, shard)
checkpointFilePath := filesetPathFromTime(shardDir, blockStart, checkpointFileSuffix)
f, err := os.Create(checkpointFilePath)
require.NoError(t, err)
f.Close()
var (
shard = uint32(0)
blockStart = time.Unix(1000, 0)
shardDir = createDataShardDir(t, pm.filePathPrefix, testNs1ID, shard)
checkpointFilePath = filesetPathFromTime(shardDir, blockStart, checkpointFileSuffix)
checkpointFileBuf = make([]byte, checkpointFileSizeBytes)
)
createFile(t, checkpointFilePath, checkpointFileBuf)

flush, err := pm.StartDataPersist()
require.NoError(t, err)
Expand All @@ -83,8 +84,10 @@ func TestPersistenceManagerPrepareDataFileExistsWithDelete(t *testing.T) {
pm, writer, _ := testDataPersistManager(t, ctrl)
defer os.RemoveAll(pm.filePathPrefix)

shard := uint32(0)
blockStart := time.Unix(1000, 0)
var (
shard = uint32(0)
blockStart = time.Unix(1000, 0)
)

writerOpts := xtest.CmpMatcher(DataWriterOpenOptions{
Identifier: FileSetFileIdentifier{
Expand All @@ -96,11 +99,12 @@ func TestPersistenceManagerPrepareDataFileExistsWithDelete(t *testing.T) {
}, m3test.IdentTransformer)
writer.EXPECT().Open(writerOpts).Return(nil)

shardDir := createDataShardDir(t, pm.filePathPrefix, testNs1ID, shard)
checkpointFilePath := filesetPathFromTime(shardDir, blockStart, checkpointFileSuffix)
f, err := os.Create(checkpointFilePath)
require.NoError(t, err)
f.Close()
var (
shardDir = createDataShardDir(t, pm.filePathPrefix, testNs1ID, shard)
checkpointFilePath = filesetPathFromTime(shardDir, blockStart, checkpointFileSuffix)
checkpointFileBuf = make([]byte, checkpointFileSizeBytes)
)
createFile(t, checkpointFilePath, checkpointFileBuf)

flush, err := pm.StartDataPersist()
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion src/dbnode/persist/fs/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ func (r *reader) Status() DataFileSetReaderStatus {
}

func (r *reader) readCheckpointFile(filePath string) error {
exists, err := FileExists(filePath)
exists, err := CompleteCheckpointFileExists(filePath)
if err != nil {
return err
}
Expand Down
10 changes: 7 additions & 3 deletions src/dbnode/persist/fs/read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,9 +269,13 @@ func TestReadNoCheckpointFile(t *testing.T) {
assert.NoError(t, err)
assert.NoError(t, w.Close())

shardDir := ShardDataDirPath(filePathPrefix, testNs1ID, shard)
checkpointFile := filesetPathFromTime(shardDir, testWriterStart, checkpointFileSuffix)
require.True(t, mustFileExists(t, checkpointFile))
var (
shardDir = ShardDataDirPath(filePathPrefix, testNs1ID, shard)
checkpointFile = filesetPathFromTime(shardDir, testWriterStart, checkpointFileSuffix)
)
exists, err := CompleteCheckpointFileExists(checkpointFile)
require.NoError(t, err)
require.True(t, exists)
os.Remove(checkpointFile)

r := newTestReader(t, filePathPrefix)
Expand Down
4 changes: 4 additions & 0 deletions src/dbnode/persist/fs/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ import (
xtime "github.com/m3db/m3x/time"
)

const (
checkpointFileSizeBytes = 4
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe have this and digest/digestLen be exported types and ensure they're the same in a test.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

)

var (
errWriterEncodeTagsDataNotAccessible = errors.New(
"failed to encode tags: cannot get data")
Expand Down