Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Verify checkpoint files are completely written out when checking if they exist #1086

Merged
merged 11 commits into from
Oct 16, 2018
8 changes: 4 additions & 4 deletions src/dbnode/digest/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ import (
)

const (
// digest size is 4 bytes
digestLen = 4
// DigestLenBytes is the length of generated digests in bytes.
DigestLenBytes = 4
)

var (
Expand All @@ -40,7 +40,7 @@ type Buffer []byte

// NewBuffer creates a new digest buffer.
func NewBuffer() Buffer {
return make([]byte, digestLen)
return make([]byte, DigestLenBytes)
}

// WriteDigest writes a digest to the buffer.
Expand Down Expand Up @@ -71,5 +71,5 @@ func (b Buffer) ReadDigestFromFile(fd *os.File) (uint32, error) {

// ToBuffer converts a byte slice to a digest buffer.
func ToBuffer(buf []byte) Buffer {
return Buffer(buf[:digestLen])
return Buffer(buf[:DigestLenBytes])
}
30 changes: 28 additions & 2 deletions src/dbnode/persist/fs/files.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
xclose "github.com/m3db/m3x/close"
xerrors "github.com/m3db/m3x/errors"
"github.com/m3db/m3x/ident"
"github.com/m3db/m3x/instrument"
)

var timeZero time.Time
Expand Down Expand Up @@ -468,7 +469,7 @@ func forEachInfoFile(
digestsFilePath = filesetPathFromTimeAndIndex(dir, t, volume, digestFileSuffix)
infoFilePath = filesetPathFromTimeAndIndex(dir, t, volume, infoFileSuffix)
}
checkpointExists, err := FileExists(checkpointFilePath)
checkpointExists, err := CompleteCheckpointFileExists(checkpointFilePath)
if err != nil {
continue
}
Expand Down Expand Up @@ -1035,7 +1036,7 @@ func CommitLogsDirPath(prefix string) string {
func DataFileSetExistsAt(filePathPrefix string, namespace ident.ID, shard uint32, blockStart time.Time) (bool, error) {
shardDir := ShardDataDirPath(filePathPrefix, namespace, shard)
checkpointPath := filesetPathFromTime(shardDir, blockStart, checkpointFileSuffix)
return FileExists(checkpointPath)
return CompleteCheckpointFileExists(checkpointPath)
}

// SnapshotFileSetExistsAt determines whether snapshot fileset files exist for the given namespace, shard, and block start time.
Expand Down Expand Up @@ -1126,8 +1127,33 @@ func NextIndexSnapshotFileIndex(filePathPrefix string, namespace ident.ID, block
return currentSnapshotIndex + 1, nil
}

// CompleteCheckpointFileExists returns whether a checkpoint file exists and,
// if so, whether it has been completely written out.
func CompleteCheckpointFileExists(filePath string) (bool, error) {
f, err := os.Stat(filePath)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this have the same strings.Contains check?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what would the check be? This one is specifically written to be used for checkpoint files, and if it's not a checkpoint file you'll probably get an error anyways because it's unlikely the file you're checking is exactly 4 bytes

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think prateek just means, should you check that the filepath they're checking for definitely is a checkpoint file (i.e. make sure that it passes the checkpoint suffix)? I think that makes sense.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah ok, I guess I just wasn't worried about it because this function is harder to misuse but I'll add it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

if err != nil {
if os.IsNotExist(err) {
return false, nil
}
return false, err
}

// Make sure the checkpoint file was completely written out and it's
// not just an empty file.
return f.Size() == CheckpointFileSizeBytes, nil
}

// FileExists returns whether a file at the given path exists.
func FileExists(filePath string) (bool, error) {
if strings.Contains(filePath, checkpointFileSuffix) {
// Existence of a checkpoint file needs to be verified using the function
// CompleteCheckpointFileExists instead to ensure that it has been
// completely written out.
return false, fmt.Errorf(
"%s tried to use FileExists to verify existence of checkpoint file: %s",
instrument.InvariantViolatedMetricName, filePath)
}

_, err := os.Stat(filePath)
if err != nil {
if os.IsNotExist(err) {
Expand Down
53 changes: 44 additions & 9 deletions src/dbnode/persist/fs/files_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,33 +282,68 @@ func TestTimeAndVolumeIndexFromFileSetFilename(t *testing.T) {
}

func TestFileExists(t *testing.T) {
dir := createTempDir(t)
defer os.RemoveAll(dir)

shard := uint32(10)
start := time.Now()
shardDir := ShardDataDirPath(dir, testNs1ID, shard)
err := os.MkdirAll(shardDir, defaultNewDirectoryMode)
var (
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mind making a new test which only tests the checkpointFileSizeBytes condition we're worried about and adding a little blurb about why it exists.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a test and put a comment in the function itself

dir = createTempDir(t)
shard = uint32(10)
start = time.Now()
shardDir = ShardDataDirPath(dir, testNs1ID, shard)
checkpointFileBuf = make([]byte, CheckpointFileSizeBytes)
err = os.MkdirAll(shardDir, defaultNewDirectoryMode)
)
defer os.RemoveAll(dir)
require.NoError(t, err)

infoFilePath := filesetPathFromTime(shardDir, start, infoFileSuffix)
createDataFile(t, shardDir, start, infoFileSuffix, nil)
createDataFile(t, shardDir, start, infoFileSuffix, checkpointFileBuf)
require.True(t, mustFileExists(t, infoFilePath))
exists, err := DataFileSetExistsAt(dir, testNs1ID, uint32(shard), start)
require.NoError(t, err)
require.False(t, exists)

checkpointFilePath := filesetPathFromTime(shardDir, start, checkpointFileSuffix)
createDataFile(t, shardDir, start, checkpointFileSuffix, nil)
require.True(t, mustFileExists(t, checkpointFilePath))
createDataFile(t, shardDir, start, checkpointFileSuffix, checkpointFileBuf)
exists, err = DataFileSetExistsAt(dir, testNs1ID, uint32(shard), start)
require.NoError(t, err)
require.True(t, exists)

exists, err = CompleteCheckpointFileExists(checkpointFilePath)
require.NoError(t, err)
require.True(t, exists)

_, err = FileExists(checkpointFilePath)
require.Error(t, err)

os.Remove(infoFilePath)
require.False(t, mustFileExists(t, infoFilePath))
}

// TestCompleteCheckpointFileExists verifies that CompleteCheckpointFileExists
// only reports true for a checkpoint file whose size is exactly
// CheckpointFileSizeBytes. A checkpoint file that is missing, empty, partially
// written or oversized must be reported as not existing (without an error) so
// that callers never treat an incompletely written fileset as complete.
func TestCompleteCheckpointFileExists(t *testing.T) {
	var (
		dir                = createTempDir(t)
		shard              = uint32(10)
		start              = time.Now()
		shardDir           = ShardDataDirPath(dir, testNs1ID, shard)
		checkpointFilePath = filesetPathFromTime(shardDir, start, checkpointFileSuffix)
		err                = os.MkdirAll(shardDir, defaultNewDirectoryMode)
	)
	defer os.RemoveAll(dir)
	require.NoError(t, err)

	// Nothing has been written yet: a missing file is "does not exist", not
	// an error.
	exists, err := CompleteCheckpointFileExists(checkpointFilePath)
	require.NoError(t, err)
	require.False(t, exists)

	// Each case rewrites the same checkpoint file. The complete case comes
	// last, mirroring how a file goes from partially to fully written.
	tests := []struct {
		name     string
		contents []byte
		expected bool
	}{
		{name: "empty file", contents: nil, expected: false},
		{name: "partially written file", contents: make([]byte, CheckpointFileSizeBytes-1), expected: false},
		{name: "oversized file", contents: make([]byte, CheckpointFileSizeBytes+1), expected: false},
		{name: "complete file", contents: make([]byte, CheckpointFileSizeBytes), expected: true},
	}
	for _, test := range tests {
		createDataFile(t, shardDir, start, checkpointFileSuffix, test.contents)
		exists, err = CompleteCheckpointFileExists(checkpointFilePath)
		require.NoError(t, err, test.name)
		require.Equal(t, test.expected, exists, test.name)
	}
}

func TestShardDirPath(t *testing.T) {
require.Equal(t, "foo/bar/data/testNs/12", ShardDataDirPath("foo/bar", testNs1ID, 12))
require.Equal(t, "foo/bar/data/testNs/12", ShardDataDirPath("foo/bar/", testNs1ID, 12))
Expand Down
2 changes: 1 addition & 1 deletion src/dbnode/persist/fs/index_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func (r *indexReader) Open(
}

func (r *indexReader) readCheckpointFile(filePath string) error {
exists, err := FileExists(filePath)
exists, err := CompleteCheckpointFileExists(filePath)
if err != nil {
return err
}
Expand Down
32 changes: 18 additions & 14 deletions src/dbnode/persist/fs/persist_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,14 @@ func TestPersistenceManagerPrepareDataFileExistsNoDelete(t *testing.T) {
pm, _, _ := testDataPersistManager(t, ctrl)
defer os.RemoveAll(pm.filePathPrefix)

shard := uint32(0)
blockStart := time.Unix(1000, 0)
shardDir := createDataShardDir(t, pm.filePathPrefix, testNs1ID, shard)
checkpointFilePath := filesetPathFromTime(shardDir, blockStart, checkpointFileSuffix)
f, err := os.Create(checkpointFilePath)
require.NoError(t, err)
f.Close()
var (
shard = uint32(0)
blockStart = time.Unix(1000, 0)
shardDir = createDataShardDir(t, pm.filePathPrefix, testNs1ID, shard)
checkpointFilePath = filesetPathFromTime(shardDir, blockStart, checkpointFileSuffix)
checkpointFileBuf = make([]byte, CheckpointFileSizeBytes)
)
createFile(t, checkpointFilePath, checkpointFileBuf)

flush, err := pm.StartDataPersist()
require.NoError(t, err)
Expand All @@ -83,8 +84,10 @@ func TestPersistenceManagerPrepareDataFileExistsWithDelete(t *testing.T) {
pm, writer, _ := testDataPersistManager(t, ctrl)
defer os.RemoveAll(pm.filePathPrefix)

shard := uint32(0)
blockStart := time.Unix(1000, 0)
var (
shard = uint32(0)
blockStart = time.Unix(1000, 0)
)

writerOpts := xtest.CmpMatcher(DataWriterOpenOptions{
Identifier: FileSetFileIdentifier{
Expand All @@ -96,11 +99,12 @@ func TestPersistenceManagerPrepareDataFileExistsWithDelete(t *testing.T) {
}, m3test.IdentTransformer)
writer.EXPECT().Open(writerOpts).Return(nil)

shardDir := createDataShardDir(t, pm.filePathPrefix, testNs1ID, shard)
checkpointFilePath := filesetPathFromTime(shardDir, blockStart, checkpointFileSuffix)
f, err := os.Create(checkpointFilePath)
require.NoError(t, err)
f.Close()
var (
shardDir = createDataShardDir(t, pm.filePathPrefix, testNs1ID, shard)
checkpointFilePath = filesetPathFromTime(shardDir, blockStart, checkpointFileSuffix)
checkpointFileBuf = make([]byte, CheckpointFileSizeBytes)
)
createFile(t, checkpointFilePath, checkpointFileBuf)

flush, err := pm.StartDataPersist()
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion src/dbnode/persist/fs/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ func (r *reader) Status() DataFileSetReaderStatus {
}

func (r *reader) readCheckpointFile(filePath string) error {
exists, err := FileExists(filePath)
exists, err := CompleteCheckpointFileExists(filePath)
if err != nil {
return err
}
Expand Down
10 changes: 7 additions & 3 deletions src/dbnode/persist/fs/read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,9 +269,13 @@ func TestReadNoCheckpointFile(t *testing.T) {
assert.NoError(t, err)
assert.NoError(t, w.Close())

shardDir := ShardDataDirPath(filePathPrefix, testNs1ID, shard)
checkpointFile := filesetPathFromTime(shardDir, testWriterStart, checkpointFileSuffix)
require.True(t, mustFileExists(t, checkpointFile))
var (
shardDir = ShardDataDirPath(filePathPrefix, testNs1ID, shard)
checkpointFile = filesetPathFromTime(shardDir, testWriterStart, checkpointFileSuffix)
)
exists, err := CompleteCheckpointFileExists(checkpointFile)
require.NoError(t, err)
require.True(t, exists)
os.Remove(checkpointFile)

r := newTestReader(t, filePathPrefix)
Expand Down
6 changes: 6 additions & 0 deletions src/dbnode/persist/fs/read_write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,12 @@ func TestSimpleReadWrite(t *testing.T) {
readTestData(t, r, 0, testWriterStart, entries)
}

func TestCheckpointFileSizeBytesSize(t *testing.T) {
// These values need to match so that the logic for determining whether
// a checkpoint file is complete or not remains correct.
require.Equal(t, digest.DigestLenBytes, CheckpointFileSizeBytes)
Copy link
Collaborator

@robskillington robskillington Oct 15, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of this, why don't you just make CheckpointFileSizeBytes = digest.DigestLenBytes? I actually don't mind either way though, whatever works.

Copy link
Contributor Author

@richardartoul richardartoul Oct 16, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I discussed this with Prateek and we like the idea that the tests would break if you changed one and not the other just to make the person think about the change more

}

func TestDuplicateWrite(t *testing.T) {
dir := createTempDir(t)
filePathPrefix := filepath.Join(dir, "")
Expand Down
5 changes: 5 additions & 0 deletions src/dbnode/persist/fs/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ import (
xtime "github.com/m3db/m3x/time"
)

const (
// CheckpointFileSizeBytes is the expected size of a valid checkpoint file.
// CompleteCheckpointFileExists compares a checkpoint file's on-disk size
// against this value to decide whether the file was completely written out.
// It is intentionally declared separately from digest.DigestLenBytes:
// TestCheckpointFileSizeBytesSize asserts the two are equal, so changing one
// without the other breaks the tests.
CheckpointFileSizeBytes = 4
)

var (
errWriterEncodeTagsDataNotAccessible = errors.New(
"failed to encode tags: cannot get data")
Expand Down