diff --git a/src/dbnode/digest/buffer.go b/src/dbnode/digest/buffer.go index 3e427a3f90..a5cbeb8c77 100644 --- a/src/dbnode/digest/buffer.go +++ b/src/dbnode/digest/buffer.go @@ -26,8 +26,8 @@ import ( ) const ( - // digest size is 4 bytes - digestLen = 4 + // DigestLenBytes is the length of generated digests in bytes. + DigestLenBytes = 4 ) var ( @@ -40,7 +40,7 @@ type Buffer []byte // NewBuffer creates a new digest buffer. func NewBuffer() Buffer { - return make([]byte, digestLen) + return make([]byte, DigestLenBytes) } // WriteDigest writes a digest to the buffer. @@ -71,5 +71,5 @@ func (b Buffer) ReadDigestFromFile(fd *os.File) (uint32, error) { // ToBuffer converts a byte slice to a digest buffer. func ToBuffer(buf []byte) Buffer { - return Buffer(buf[:digestLen]) + return Buffer(buf[:DigestLenBytes]) } diff --git a/src/dbnode/persist/fs/files.go b/src/dbnode/persist/fs/files.go index dfabce15e0..fa110708e9 100644 --- a/src/dbnode/persist/fs/files.go +++ b/src/dbnode/persist/fs/files.go @@ -40,6 +40,7 @@ import ( xclose "github.com/m3db/m3x/close" xerrors "github.com/m3db/m3x/errors" "github.com/m3db/m3x/ident" + "github.com/m3db/m3x/instrument" ) var timeZero time.Time @@ -468,7 +469,7 @@ func forEachInfoFile( digestsFilePath = filesetPathFromTimeAndIndex(dir, t, volume, digestFileSuffix) infoFilePath = filesetPathFromTimeAndIndex(dir, t, volume, infoFileSuffix) } - checkpointExists, err := FileExists(checkpointFilePath) + checkpointExists, err := CompleteCheckpointFileExists(checkpointFilePath) if err != nil { continue } @@ -1035,7 +1036,7 @@ func CommitLogsDirPath(prefix string) string { func DataFileSetExistsAt(filePathPrefix string, namespace ident.ID, shard uint32, blockStart time.Time) (bool, error) { shardDir := ShardDataDirPath(filePathPrefix, namespace, shard) checkpointPath := filesetPathFromTime(shardDir, blockStart, checkpointFileSuffix) - return FileExists(checkpointPath) + return CompleteCheckpointFileExists(checkpointPath) } // SnapshotFileSetExistsAt determines whether snapshot fileset files exist for the given namespace, shard, and block start time. @@ -1126,8 +1127,39 @@ func NextIndexSnapshotFileIndex(filePathPrefix string, namespace ident.ID, block return currentSnapshotIndex + 1, nil } +// CompleteCheckpointFileExists returns whether a checkpoint file exists, and if so, +// is it complete. +func CompleteCheckpointFileExists(filePath string) (bool, error) { + if !strings.Contains(filePath, checkpointFileSuffix) { + return false, fmt.Errorf( + "%s tried to use CompleteCheckpointFileExists to verify existence of non checkpoint file: %s", + instrument.InvariantViolatedMetricName, filePath) + } + + f, err := os.Stat(filePath) + if err != nil { + if os.IsNotExist(err) { + return false, nil + } + return false, err + } + + // Make sure the checkpoint file was completely written out and its + // not just an empty file. + return f.Size() == CheckpointFileSizeBytes, nil +} + // FileExists returns whether a file at the given path exists. func FileExists(filePath string) (bool, error) { + if strings.Contains(filePath, checkpointFileSuffix) { + // Existence of a checkpoint file needs to be verified using the function + // CompleteCheckpointFileExists instead to ensure that it has been + // completely written out. + return false, fmt.Errorf( + "%s tried to use FileExists to verify existence of checkpoint file: %s", + instrument.InvariantViolatedMetricName, filePath) + } + _, err := os.Stat(filePath) if err != nil { if os.IsNotExist(err) { diff --git a/src/dbnode/persist/fs/files_test.go b/src/dbnode/persist/fs/files_test.go index 15c9eaee90..b4ef411bfa 100644 --- a/src/dbnode/persist/fs/files_test.go +++ b/src/dbnode/persist/fs/files_test.go @@ -38,6 +38,7 @@ import ( "github.com/m3db/m3/src/dbnode/retention" "github.com/m3db/m3/src/dbnode/storage/namespace" "github.com/m3db/m3x/ident" + "github.com/m3db/m3x/instrument" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -282,33 +283,72 @@ func TestTimeAndVolumeIndexFromFileSetFilename(t *testing.T) { } func TestFileExists(t *testing.T) { - dir := createTempDir(t) - defer os.RemoveAll(dir) - shard := uint32(10) - start := time.Now() - shardDir := ShardDataDirPath(dir, testNs1ID, shard) - err := os.MkdirAll(shardDir, defaultNewDirectoryMode) + var ( + dir = createTempDir(t) + shard = uint32(10) + start = time.Now() + shardDir = ShardDataDirPath(dir, testNs1ID, shard) + checkpointFileBuf = make([]byte, CheckpointFileSizeBytes) + err = os.MkdirAll(shardDir, defaultNewDirectoryMode) + ) + defer os.RemoveAll(dir) require.NoError(t, err) infoFilePath := filesetPathFromTime(shardDir, start, infoFileSuffix) - createDataFile(t, shardDir, start, infoFileSuffix, nil) + createDataFile(t, shardDir, start, infoFileSuffix, checkpointFileBuf) require.True(t, mustFileExists(t, infoFilePath)) exists, err := DataFileSetExistsAt(dir, testNs1ID, uint32(shard), start) require.NoError(t, err) require.False(t, exists) checkpointFilePath := filesetPathFromTime(shardDir, start, checkpointFileSuffix) - createDataFile(t, shardDir, start, checkpointFileSuffix, nil) - require.True(t, mustFileExists(t, checkpointFilePath)) + createDataFile(t, shardDir, start, checkpointFileSuffix, checkpointFileBuf) exists, err = DataFileSetExistsAt(dir, testNs1ID, uint32(shard), start) require.NoError(t, err) require.True(t, exists) + exists, err = CompleteCheckpointFileExists(checkpointFilePath) + require.NoError(t, err) + require.True(t, exists) + + _, err = FileExists(checkpointFilePath) + require.Error(t, err) + os.Remove(infoFilePath) require.False(t, mustFileExists(t, infoFilePath)) } +func TestCompleteCheckpointFileExists(t *testing.T) { + var ( + dir = createTempDir(t) + shard = uint32(10) + start = time.Now() + shardDir = ShardDataDirPath(dir, testNs1ID, shard) + checkpointFilePath = filesetPathFromTime(shardDir, start, checkpointFileSuffix) + err = os.MkdirAll(shardDir, defaultNewDirectoryMode) + + validCheckpointFileBuf = make([]byte, CheckpointFileSizeBytes) + invalidCheckpointFileBuf = make([]byte, CheckpointFileSizeBytes+1) + ) + defer os.RemoveAll(dir) + require.NoError(t, err) + + createDataFile(t, shardDir, start, checkpointFileSuffix, invalidCheckpointFileBuf) + exists, err := CompleteCheckpointFileExists(checkpointFilePath) + require.NoError(t, err) + require.False(t, exists) + + createDataFile(t, shardDir, start, checkpointFileSuffix, validCheckpointFileBuf) + exists, err = CompleteCheckpointFileExists(checkpointFilePath) + require.NoError(t, err) + require.True(t, exists) + + exists, err = CompleteCheckpointFileExists("some-arbitrary-file") + require.Contains(t, err.Error(), instrument.InvariantViolatedMetricName) + require.False(t, exists) +} + func TestShardDirPath(t *testing.T) { require.Equal(t, "foo/bar/data/testNs/12", ShardDataDirPath("foo/bar", testNs1ID, 12)) require.Equal(t, "foo/bar/data/testNs/12", ShardDataDirPath("foo/bar/", testNs1ID, 12)) diff --git a/src/dbnode/persist/fs/index_read.go b/src/dbnode/persist/fs/index_read.go index f36f03f2ed..3ed9a23f18 100644 --- a/src/dbnode/persist/fs/index_read.go +++ b/src/dbnode/persist/fs/index_read.go @@ -138,7 +138,7 @@ func (r *indexReader) Open( } func (r *indexReader) readCheckpointFile(filePath string) error { - exists, err := FileExists(filePath) + exists, err := CompleteCheckpointFileExists(filePath) if err != nil { return err } diff --git a/src/dbnode/persist/fs/persist_manager_test.go b/src/dbnode/persist/fs/persist_manager_test.go index ae0b345203..a41e3a6eab 100644 --- a/src/dbnode/persist/fs/persist_manager_test.go +++ b/src/dbnode/persist/fs/persist_manager_test.go @@ -50,13 +50,14 @@ func TestPersistenceManagerPrepareDataFileExistsNoDelete(t *testing.T) { pm, _, _ := testDataPersistManager(t, ctrl) defer os.RemoveAll(pm.filePathPrefix) - shard := uint32(0) - blockStart := time.Unix(1000, 0) - shardDir := createDataShardDir(t, pm.filePathPrefix, testNs1ID, shard) - checkpointFilePath := filesetPathFromTime(shardDir, blockStart, checkpointFileSuffix) - f, err := os.Create(checkpointFilePath) - require.NoError(t, err) - f.Close() + var ( + shard = uint32(0) + blockStart = time.Unix(1000, 0) + shardDir = createDataShardDir(t, pm.filePathPrefix, testNs1ID, shard) + checkpointFilePath = filesetPathFromTime(shardDir, blockStart, checkpointFileSuffix) + checkpointFileBuf = make([]byte, CheckpointFileSizeBytes) + ) + createFile(t, checkpointFilePath, checkpointFileBuf) flush, err := pm.StartDataPersist() require.NoError(t, err) @@ -83,8 +84,10 @@ func TestPersistenceManagerPrepareDataFileExistsWithDelete(t *testing.T) { pm, writer, _ := testDataPersistManager(t, ctrl) defer os.RemoveAll(pm.filePathPrefix) - shard := uint32(0) - blockStart := time.Unix(1000, 0) + var ( + shard = uint32(0) + blockStart = time.Unix(1000, 0) + ) writerOpts := xtest.CmpMatcher(DataWriterOpenOptions{ Identifier: FileSetFileIdentifier{ @@ -96,11 +99,12 @@ func TestPersistenceManagerPrepareDataFileExistsWithDelete(t *testing.T) { }, m3test.IdentTransformer) writer.EXPECT().Open(writerOpts).Return(nil) - shardDir := createDataShardDir(t, pm.filePathPrefix, testNs1ID, shard) - checkpointFilePath := filesetPathFromTime(shardDir, blockStart, checkpointFileSuffix) - f, err := os.Create(checkpointFilePath) - require.NoError(t, err) - f.Close() + var ( + shardDir = createDataShardDir(t, pm.filePathPrefix, testNs1ID, shard) + checkpointFilePath = filesetPathFromTime(shardDir, blockStart, checkpointFileSuffix) + checkpointFileBuf = make([]byte, CheckpointFileSizeBytes) + ) + createFile(t, checkpointFilePath, checkpointFileBuf) flush, err := pm.StartDataPersist() require.NoError(t, err) diff --git a/src/dbnode/persist/fs/read.go b/src/dbnode/persist/fs/read.go index 915efd8519..ebadde9647 100644 --- a/src/dbnode/persist/fs/read.go +++ b/src/dbnode/persist/fs/read.go @@ -248,7 +248,7 @@ func (r *reader) Status() DataFileSetReaderStatus { } func (r *reader) readCheckpointFile(filePath string) error { - exists, err := FileExists(filePath) + exists, err := CompleteCheckpointFileExists(filePath) if err != nil { return err } diff --git a/src/dbnode/persist/fs/read_test.go b/src/dbnode/persist/fs/read_test.go index bf9637e5e9..1e9e38a7c1 100644 --- a/src/dbnode/persist/fs/read_test.go +++ b/src/dbnode/persist/fs/read_test.go @@ -269,9 +269,13 @@ func TestReadNoCheckpointFile(t *testing.T) { assert.NoError(t, err) assert.NoError(t, w.Close()) - shardDir := ShardDataDirPath(filePathPrefix, testNs1ID, shard) - checkpointFile := filesetPathFromTime(shardDir, testWriterStart, checkpointFileSuffix) - require.True(t, mustFileExists(t, checkpointFile)) + var ( + shardDir = ShardDataDirPath(filePathPrefix, testNs1ID, shard) + checkpointFile = filesetPathFromTime(shardDir, testWriterStart, checkpointFileSuffix) + ) + exists, err := CompleteCheckpointFileExists(checkpointFile) + require.NoError(t, err) + require.True(t, exists) os.Remove(checkpointFile) r := newTestReader(t, filePathPrefix) diff --git a/src/dbnode/persist/fs/read_write_test.go b/src/dbnode/persist/fs/read_write_test.go index 7435a6e909..1bed65effd 100644 --- a/src/dbnode/persist/fs/read_write_test.go +++ b/src/dbnode/persist/fs/read_write_test.go @@ -253,6 +253,12 @@ func TestSimpleReadWrite(t *testing.T) { readTestData(t, r, 0, testWriterStart, entries) } +func TestCheckpointFileSizeBytesSize(t *testing.T) { + // These values need to match so that the logic for determining whether + // a checkpoint file is complete or not remains correct. + require.Equal(t, digest.DigestLenBytes, CheckpointFileSizeBytes) +} + func TestDuplicateWrite(t *testing.T) { dir := createTempDir(t) filePathPrefix := filepath.Join(dir, "") diff --git a/src/dbnode/persist/fs/write.go b/src/dbnode/persist/fs/write.go index 90eb5e7916..8b4ba3a2ed 100644 --- a/src/dbnode/persist/fs/write.go +++ b/src/dbnode/persist/fs/write.go @@ -40,6 +40,11 @@ import ( xtime "github.com/m3db/m3x/time" ) +const ( + // CheckpointFileSizeBytes is the expected size of a valid checkpoint file. + CheckpointFileSizeBytes = 4 +) + var ( errWriterEncodeTagsDataNotAccessible = errors.New( "failed to encode tags: cannot get data")