diff --git a/CHANGELOG.md b/CHANGELOG.md index d0cde64516..8c9d2b10b0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,22 @@ # Changelog +# 0.11.0 (pending) + +## Migration Disclaimer + +Version 0.11.0 of M3 includes further work on supporting writing data at arbitrary times (within the retention period). While most of these changes are transparent to the user in terms of functionality and performance, we had to make a change to the naming format of the files that get persisted to disk (#1720). This change was required to handle multiple fileset volumes per block, which is necessary after introducing "cold flushes" (#1624). + +The redesign is **backwards compatible** but not **forwards compatible**. This means that you should be able to upgrade your < 0.11.0 clusters to 0.11.0 with no issues, but you will not be able to downgrade without taking some additional steps. + +### Troubleshooting and Rolling Back + +If you run into any issues with the upgrade or need to downgrade to a previous version for any reason, follow these steps: + +1. Stop the node that is having trouble with the upgrade or that you're trying to downgrade. +2. Remove any filesets that include a volume index in their filenames. For example, `fileset-1257890400000000000-0-data.db` is a filename in the new volume-indexed format, and `fileset-1257890400000000000-data.db` is the corresponding filename in the original format. +3. Modify the `bootstrappers` config in the M3DB YAML file from `filesystem, commitlog, peers, uninitialized_topology` to `filesystem, peers, commitlog, uninitialized_topology`. This will force the node to bootstrap from its peers instead of the local snapshot and commitlog files it has on disk. This is important because otherwise, when the node restarts, it will think that it has already been bootstrapped. +4. Turn the node back on. 
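For step 2 above, a quick way to tell the two formats apart is to count the `-`-separated components of the base filename. The following standalone Go sketch (illustrative only, not part of M3 itself; the helper `isVolumeIndexed` is hypothetical) classifies a fileset filename under that assumption:

```go
package main

import (
	"fmt"
	"path/filepath"
	"strings"
)

// isVolumeIndexed reports whether a fileset filename uses the new
// volume-indexed format (fileset-<blockStart>-<volume>-<suffix>.db)
// rather than the original format (fileset-<blockStart>-<suffix>.db).
// It assumes every fileset file follows one of these two patterns.
func isVolumeIndexed(fname string) bool {
	base := strings.TrimSuffix(filepath.Base(fname), ".db")
	// Original names split into 3 components; volume-indexed names into 4.
	return len(strings.Split(base, "-")) == 4
}

func main() {
	fmt.Println(isVolumeIndexed("fileset-1257890400000000000-0-data.db")) // true
	fmt.Println(isVolumeIndexed("fileset-1257890400000000000-data.db"))   // false
}
```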
+ # 0.10.2 ## Performance diff --git a/src/cmd/tools/dtest/util/seed/generator_test.go b/src/cmd/tools/dtest/util/seed/generator_test.go index 364fb0da06..31a73196dc 100644 --- a/src/cmd/tools/dtest/util/seed/generator_test.go +++ b/src/cmd/tools/dtest/util/seed/generator_test.go @@ -128,18 +128,12 @@ func (t *fileInfoExtractor) visit(fPath string, f os.FileInfo, err error) error t.shards[uint32(shardNum)] = struct{}{} name := f.Name() - first := strings.Index(name, "-") - if first == -1 { - return fmt.Errorf("unable to find '-' in %v", name) + nameSplit := strings.Split(name, "-") + if len(nameSplit) < 2 { + return fmt.Errorf("unable to parse time from %v", name) } - last := strings.LastIndex(name, "-") - if last == -1 { - return fmt.Errorf("unable to find '-' in %v", name) - } - if first == last { - return fmt.Errorf("found only single '-' in %v", name) - } - num, parseErr := strconv.ParseInt(name[first+1:last], 10, 64) + + num, parseErr := strconv.ParseInt(nameSplit[1], 10, 64) if parseErr != nil { return err } diff --git a/src/dbnode/integration/disk_cleanup_helpers.go b/src/dbnode/integration/disk_cleanup_helpers.go index 4de994e6e6..235ee220a1 100644 --- a/src/dbnode/integration/disk_cleanup_helpers.go +++ b/src/dbnode/integration/disk_cleanup_helpers.go @@ -27,11 +27,11 @@ import ( "testing" "time" + "github.com/m3db/m3/src/dbnode/namespace" "github.com/m3db/m3/src/dbnode/persist/fs" "github.com/m3db/m3/src/dbnode/persist/fs/commitlog" "github.com/m3db/m3/src/dbnode/sharding" "github.com/m3db/m3/src/dbnode/storage" - "github.com/m3db/m3/src/dbnode/namespace" "github.com/m3db/m3/src/x/ident" "github.com/stretchr/testify/require" @@ -114,7 +114,7 @@ type cleanupTimesFileSet struct { func (fset *cleanupTimesFileSet) anyExist() bool { for _, t := range fset.times { - exists, err := fs.DataFileSetExistsAt(fset.filePathPrefix, fset.namespace, fset.shard, t) + exists, err := fs.DataFileSetExists(fset.filePathPrefix, fset.namespace, fset.shard, t, 0) if err != nil { panic(err) } diff --git a/src/dbnode/integration/disk_cleanup_index_test.go b/src/dbnode/integration/disk_cleanup_index_test.go index 36b53ff855..d622a633b2 100644 --- a/src/dbnode/integration/disk_cleanup_index_test.go +++ b/src/dbnode/integration/disk_cleanup_index_test.go @@ -26,9 +26,9 @@ import ( "testing" "time" + "github.com/m3db/m3/src/dbnode/namespace" "github.com/m3db/m3/src/dbnode/persist/fs" "github.com/m3db/m3/src/dbnode/retention" - "github.com/m3db/m3/src/dbnode/namespace" xclock "github.com/m3db/m3/src/x/clock" "github.com/stretchr/testify/require" diff --git a/src/dbnode/integration/disk_flush_helpers.go b/src/dbnode/integration/disk_flush_helpers.go index 6ef38aca2e..f0104c93a6 100644 --- a/src/dbnode/integration/disk_flush_helpers.go +++ b/src/dbnode/integration/disk_flush_helpers.go @@ -30,11 +30,11 @@ import ( "github.com/m3db/m3/src/dbnode/encoding" "github.com/m3db/m3/src/dbnode/integration/generate" + ns "github.com/m3db/m3/src/dbnode/namespace" "github.com/m3db/m3/src/dbnode/persist" "github.com/m3db/m3/src/dbnode/persist/fs" "github.com/m3db/m3/src/dbnode/sharding" "github.com/m3db/m3/src/dbnode/storage" - ns "github.com/m3db/m3/src/dbnode/namespace" "github.com/m3db/m3/src/x/ident" "github.com/m3db/m3/src/x/ident/testutil" xtime "github.com/m3db/m3/src/x/time" @@ -122,8 +122,8 @@ func waitUntilDataFilesFlushed( for timestamp, seriesList := range testData { for _, series := range seriesList { shard := shardSet.Lookup(series.ID) - exists, err := fs.DataFileSetExistsAt( - filePathPrefix, namespace, shard, 
timestamp.ToTime()) + exists, err := fs.DataFileSetExists( + filePathPrefix, namespace, shard, timestamp.ToTime(), 0) if err != nil { panic(err) } diff --git a/src/dbnode/persist/fs/files.go b/src/dbnode/persist/fs/files.go index ca7abf4935..f5dc1d2d27 100644 --- a/src/dbnode/persist/fs/files.go +++ b/src/dbnode/persist/fs/files.go @@ -58,13 +58,28 @@ const ( snapshotDirName = "snapshots" commitLogsDirName = "commitlogs" + // The maximum number of delimiters ('-' or '.') that is expected in a + // (base) filename. + maxDelimNum = 4 + + // The volume index assigned to (legacy) filesets that don't have a volume + // number in their filename. + // NOTE: Since this index is the same as the index for the first + // (non-legacy) fileset, receiving an index of 0 means that we need to + // check for both indexed and non-indexed filenames. + unindexedFilesetIndex = 0 + + timeComponentPosition = 1 commitLogComponentPosition = 2 indexFileSetComponentPosition = 2 + dataFileSetComponentPosition = 2 numComponentsSnapshotMetadataFile = 4 numComponentsSnapshotMetadataCheckpointFile = 5 snapshotMetadataUUIDComponentPosition = 1 snapshotMetadataIndexComponentPosition = 2 + + errUnexpectedFilenamePattern = "unexpected filename: %s" ) var ( @@ -174,9 +189,9 @@ func (f FileSetFilesSlice) Filepaths() []string { } // LatestVolumeForBlock returns the latest (highest index) FileSetFile in the -// slice for a given block start, only applicable for index and snapshot file set files. +// slice for a given block start that has a complete checkpoint file. func (f FileSetFilesSlice) LatestVolumeForBlock(blockStart time.Time) (FileSetFile, bool) { - // Make sure we're already sorted + // Make sure we're already sorted. f.sortByTimeAndVolumeIndexAscending() for i, curr := range f { @@ -207,6 +222,18 @@ func (f FileSetFilesSlice) LatestVolumeForBlock(blockStart time.Time) (FileSetFi return FileSetFile{}, false } +// VolumeExistsForBlock returns whether there is a valid FileSetFile for the +// given block start and volume index. +func (f FileSetFilesSlice) VolumeExistsForBlock(blockStart time.Time, volume int) bool { + for _, curr := range f { + if curr.ID.BlockStart.Equal(blockStart) && curr.ID.VolumeIndex == volume { + return curr.HasCompleteCheckpointFile() + } + } + + return false +} + // ignores the index in the FileSetFileIdentifier because fileset files should // always have index 0. func (f FileSetFilesSlice) sortByTimeAscending() { @@ -357,9 +384,25 @@ func (a commitlogsByTimeAndIndexAscending) Less(i, j int) bool { return ti.Equal(tj) && ii < ij } -// fileSetFilesByTimeAndIndexAscending sorts file sets files by their block start times and volume -// index in ascending order. If the files do not have block start times or indexes in their names, -// the result is undefined. +// dataFileSetFilesByTimeAndVolumeIndexAscending sorts fileset files by their +// block start times and volume index in ascending order. If the files do not +// have block start times or indexes in their names, the result is undefined. 
+type dataFileSetFilesByTimeAndVolumeIndexAscending []string + +func (a dataFileSetFilesByTimeAndVolumeIndexAscending) Len() int { return len(a) } +func (a dataFileSetFilesByTimeAndVolumeIndexAscending) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a dataFileSetFilesByTimeAndVolumeIndexAscending) Less(i, j int) bool { + ti, ii, _ := TimeAndVolumeIndexFromDataFileSetFilename(a[i]) + tj, ij, _ := TimeAndVolumeIndexFromDataFileSetFilename(a[j]) + if ti.Before(tj) { + return true + } + return ti.Equal(tj) && ii < ij +} + +// fileSetFilesByTimeAndVolumeIndexAscending sorts fileset files by their +// block start times and volume index in ascending order. If the files do not +// have block start times or indexes in their names, the result is undefined. type fileSetFilesByTimeAndVolumeIndexAscending []string func (a fileSetFilesByTimeAndVolumeIndexAscending) Len() int { return len(a) } @@ -373,51 +416,140 @@ func (a fileSetFilesByTimeAndVolumeIndexAscending) Less(i, j int) bool { return ti.Equal(tj) && ii < ij } -func componentsAndTimeFromFileName(fname string) ([]string, time.Time, error) { - components := strings.Split(filepath.Base(fname), separator) - if len(components) < 3 { - return nil, timeZero, fmt.Errorf("unexpected file name %s", fname) +// Returns the positions of filename delimiters ('-' and '.') and the number of +// delimiters found, to be used in conjunction with the intComponentAtIndex +// function to extract filename components. This function is deliberately +// optimized for speed and lack of allocations, since allocation-heavy filename +// parsing can quickly become a large source of allocations in the entire +// system, especially when namespaces with long retentions are configured. +func delimiterPositions(baseFilename string) ([maxDelimNum]int, int) { + var ( + delimPos [maxDelimNum]int + delimsFound int + ) + + for i := range baseFilename { + if r := baseFilename[i]; r == separatorRune || r == fileSuffixDelimeterRune { + delimPos[delimsFound] = i + delimsFound++ + + if delimsFound == len(delimPos) { + // Found the maximum expected number of separators. + break + } + } } - str := strings.Replace(components[1], fileSuffix, "", 1) - nanoseconds, err := strconv.ParseInt(str, 10, 64) + + return delimPos, delimsFound +} + +// Returns the specified component of a filename, given the positions of its +// delimiters. Our only use cases for this involve extracting numeric +// components, so this function assumes as much and returns the component as an +// int64. +func intComponentAtIndex( + baseFilename string, + componentPos int, + delimPos [maxDelimNum]int, +) (int64, error) { + start := 0 + if componentPos > 0 { + start = delimPos[componentPos-1] + 1 + } + end := delimPos[componentPos] + if start > end || end > len(baseFilename)-1 || start < 0 { + return 0, fmt.Errorf(errUnexpectedFilenamePattern, baseFilename) + } + + num, err := strconv.ParseInt(baseFilename[start:end], 10, 64) if err != nil { - return nil, timeZero, err + return 0, fmt.Errorf(errUnexpectedFilenamePattern, baseFilename) } - return components, time.Unix(0, nanoseconds), nil + return num, nil } // TimeFromFileName extracts the block start time from file name. func TimeFromFileName(fname string) (time.Time, error) { - _, t, err := componentsAndTimeFromFileName(fname) - return t, err + base := filepath.Base(fname) + + delims, delimsFound := delimiterPositions(base) + // There technically only need to be two delimiters here since the time + component is in index 1. 
However, all DB files have a minimum of three + delimiters, so check for that instead. + if delimsFound < 3 { + return timeZero, fmt.Errorf(errUnexpectedFilenamePattern, fname) + } + nanos, err := intComponentAtIndex(base, timeComponentPosition, delims) + if err != nil { + return timeZero, fmt.Errorf(errUnexpectedFilenamePattern, fname) + } + + return time.Unix(0, nanos), nil } -// TimeAndIndexFromCommitlogFilename extracts the block start and index from file name for a commitlog. +// TimeAndIndexFromCommitlogFilename extracts the block start and index from +// file name for a commitlog. func TimeAndIndexFromCommitlogFilename(fname string) (time.Time, int, error) { return timeAndIndexFromFileName(fname, commitLogComponentPosition) } -// TimeAndVolumeIndexFromFileSetFilename extracts the block start and volume index from file name. +// TimeAndVolumeIndexFromDataFileSetFilename extracts the block start and volume +// index from a data fileset file name that may or may not have an index. If the +// file name does not include an index, unindexedFilesetIndex is returned as the +// volume index. +func TimeAndVolumeIndexFromDataFileSetFilename(fname string) (time.Time, int, error) { + base := filepath.Base(fname) + + delims, delimsFound := delimiterPositions(base) + if delimsFound < 3 { + return timeZero, 0, fmt.Errorf(errUnexpectedFilenamePattern, fname) + } + + nanos, err := intComponentAtIndex(base, timeComponentPosition, delims) + if err != nil { + return timeZero, 0, fmt.Errorf(errUnexpectedFilenamePattern, fname) + } + unixNanos := time.Unix(0, nanos) + + // Legacy filename with no volume index. + if delimsFound == 3 { + return unixNanos, unindexedFilesetIndex, nil + } + + volume, err := intComponentAtIndex(base, dataFileSetComponentPosition, delims) + if err != nil { + return timeZero, 0, fmt.Errorf(errUnexpectedFilenamePattern, fname) + } + + return unixNanos, int(volume), nil +} + +// TimeAndVolumeIndexFromFileSetFilename extracts the block start and +// volume index from an index file name. func TimeAndVolumeIndexFromFileSetFilename(fname string) (time.Time, int, error) { return timeAndIndexFromFileName(fname, indexFileSetComponentPosition) } func timeAndIndexFromFileName(fname string, componentPosition int) (time.Time, int, error) { - components, t, err := componentsAndTimeFromFileName(fname) - if err != nil { - return timeZero, 0, err + base := filepath.Base(fname) + + delims, delimsFound := delimiterPositions(base) + if componentPosition > delimsFound { + return timeZero, 0, fmt.Errorf(errUnexpectedFilenamePattern, fname) } - if componentPosition > len(components)-1 { - return timeZero, 0, fmt.Errorf("malformed filename: %s", fname) + nanos, err := intComponentAtIndex(base, timeComponentPosition, delims) + if err != nil { + return timeZero, 0, fmt.Errorf(errUnexpectedFilenamePattern, fname) } + unixNanos := time.Unix(0, nanos) - str := strings.Replace(components[componentPosition], fileSuffix, "", 1) - index, err := strconv.ParseInt(str, 10, 64) + index, err := intComponentAtIndex(base, componentPosition, delims) if err != nil { - return timeZero, 0, err + return timeZero, 0, fmt.Errorf(errUnexpectedFilenamePattern, fname) } - return t, int(index), nil + + return unixNanos, int(index), nil } // SnapshotTimeAndID returns the metadata for the snapshot. 
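The parsing above deliberately avoids `strings.Split` to keep allocations off the hot path. As a rough illustration of the same delimiter-position technique, here is a self-contained Go sketch (the helper `componentAt` is hypothetical and inlines simplified versions of `delimiterPositions` and `intComponentAtIndex`, with their error handling and shared constants folded in):

```go
package main

import (
	"fmt"
	"strconv"
)

// componentAt returns the numeric component at position pos of a filename
// whose components are separated by '-' or '.', without allocating any
// intermediate slices (unlike strings.Split-based parsing).
func componentAt(name string, pos int) (int64, error) {
	const maxDelims = 4
	var delims [maxDelims]int
	found := 0
	for i := 0; i < len(name) && found < maxDelims; i++ {
		if c := name[i]; c == '-' || c == '.' {
			delims[found] = i
			found++
		}
	}
	if pos >= found {
		return 0, fmt.Errorf("unexpected filename: %s", name)
	}
	start := 0
	if pos > 0 {
		start = delims[pos-1] + 1
	}
	return strconv.ParseInt(name[start:delims[pos]], 10, 64)
}

func main() {
	// In "fileset-1257890400000000000-0-data.db", the block start is
	// component 1 and the volume index is component 2.
	nanos, _ := componentAt("fileset-1257890400000000000-0-data.db", 1)
	volume, _ := componentAt("fileset-1257890400000000000-0-data.db", 2)
	fmt.Println(nanos, volume) // 1257890400000000000 0
}
```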
@@ -490,6 +622,27 @@ func readSnapshotInfoFile(filePathPrefix string, id FileSetFileIdentifier, reade infoFilePath, readerBufferSize, expectedInfoDigest) } +func readCheckpointFile(filePath string, digestBuf digest.Buffer) (uint32, error) { + exists, err := CompleteCheckpointFileExists(filePath) + if err != nil { + return 0, err + } + if !exists { + return 0, ErrCheckpointFileNotFound + } + fd, err := os.Open(filePath) + if err != nil { + return 0, err + } + defer fd.Close() + digest, err := digestBuf.ReadDigestFromFile(fd) + if err != nil { + return 0, err + } + + return digest, nil +} + type forEachInfoFileSelector struct { fileSetType persist.FileSetType contentType persist.FileSetContentType @@ -556,9 +709,16 @@ func forEachInfoFile( case persist.FileSetFlushType: switch args.contentType { case persist.FileSetDataContentType: - checkpointFilePath = filesetPathFromTime(dir, t, checkpointFileSuffix) - digestsFilePath = filesetPathFromTime(dir, t, digestFileSuffix) - infoFilePath = filesetPathFromTime(dir, t, infoFileSuffix) + isLegacy := false + if volume == 0 { + isLegacy, err = isFirstVolumeLegacy(dir, t, checkpointFileSuffix) + if err != nil { + continue + } + } + checkpointFilePath = dataFilesetPathFromTimeAndIndex(dir, t, volume, checkpointFileSuffix, isLegacy) + digestsFilePath = dataFilesetPathFromTimeAndIndex(dir, t, volume, digestFileSuffix, isLegacy) + infoFilePath = dataFilesetPathFromTimeAndIndex(dir, t, volume, infoFileSuffix, isLegacy) case persist.FileSetIndexContentType: checkpointFilePath = filesetPathFromTimeAndIndex(dir, t, volume, checkpointFileSuffix) digestsFilePath = filesetPathFromTimeAndIndex(dir, t, volume, digestFileSuffix) @@ -569,21 +729,8 @@ func forEachInfoFile( digestsFilePath = filesetPathFromTimeAndIndex(dir, t, volume, digestFileSuffix) infoFilePath = filesetPathFromTimeAndIndex(dir, t, volume, infoFileSuffix) } - checkpointExists, err := CompleteCheckpointFileExists(checkpointFilePath) - if err != nil { - continue - } - if !checkpointExists { - continue - } - checkpointFd, err := os.Open(checkpointFilePath) - if err != nil { - continue - } - // Read digest of digests from the checkpoint file - expectedDigestOfDigest, err := digestBuf.ReadDigestFromFile(checkpointFd) - checkpointFd.Close() + expectedDigestOfDigest, err := readCheckpointFile(checkpointFilePath, digestBuf) if err != nil { continue } @@ -793,7 +940,20 @@ func SortedSnapshotMetadataFiles(opts Options) ( return metadatas, errorsWithPaths, nil } -// SnapshotFiles returns a slice of all the names for all the fileset files +// DataFiles returns a slice of all the names for all the fileset files +// for a given namespace and shard combination. +func DataFiles(filePathPrefix string, namespace ident.ID, shard uint32) (FileSetFilesSlice, error) { + return filesetFiles(filesetFilesSelector{ + fileSetType: persist.FileSetFlushType, + contentType: persist.FileSetDataContentType, + filePathPrefix: filePathPrefix, + namespace: namespace, + shard: shard, + pattern: filesetFilePattern, + }) +} + +// SnapshotFiles returns a slice of all the names for all the snapshot files // for a given namespace and shard combination. func SnapshotFiles(filePathPrefix string, namespace ident.ID, shard uint32) (FileSetFilesSlice, error) { return filesetFiles(filesetFilesSelector{ @@ -818,8 +978,8 @@ func IndexSnapshotFiles(filePathPrefix string, namespace ident.ID) (FileSetFiles }) } -// FileSetAt returns a FileSetFile for the given namespace/shard/blockStart combination if it exists. 
-func FileSetAt(filePathPrefix string, namespace ident.ID, shard uint32, blockStart time.Time) (FileSetFile, bool, error) { +// FileSetAt returns a FileSetFile for the given namespace/shard/blockStart/volume combination if it exists. +func FileSetAt(filePathPrefix string, namespace ident.ID, shard uint32, blockStart time.Time, volume int) (FileSetFile, bool, error) { matched, err := filesetFiles(filesetFilesSelector{ fileSetType: persist.FileSetFlushType, contentType: persist.FileSetDataContentType, @@ -832,12 +992,12 @@ func FileSetAt(filePathPrefix string, namespace ident.ID, shard uint32, blockSta return FileSetFile{}, false, err } - matched.sortByTimeAscending() + matched.sortByTimeAndVolumeIndexAscending() for i, fileset := range matched { - if fileset.ID.BlockStart.Equal(blockStart) { + if fileset.ID.BlockStart.Equal(blockStart) && fileset.ID.VolumeIndex == volume { nextIdx := i + 1 if nextIdx < len(matched) && matched[nextIdx].ID.BlockStart.Equal(blockStart) { - // Should never happen + // Should never happen. return FileSetFile{}, false, fmt.Errorf( "found multiple fileset files for blockStart: %d", blockStart.Unix(), @@ -883,20 +1043,20 @@ func IndexFileSetsAt(filePathPrefix string, namespace ident.ID, blockStart time. return filesets, nil } -// DeleteFileSetAt deletes a FileSetFile for a given namespace/shard/blockStart combination if it exists. -func DeleteFileSetAt(filePathPrefix string, namespace ident.ID, shard uint32, t time.Time) error { - fileset, ok, err := FileSetAt(filePathPrefix, namespace, shard, t) +// DeleteFileSetAt deletes a FileSetFile for a given namespace/shard/blockStart/volume combination if it exists. +func DeleteFileSetAt(filePathPrefix string, namespace ident.ID, shard uint32, blockStart time.Time, volume int) error { + fileset, ok, err := FileSetAt(filePathPrefix, namespace, shard, blockStart, volume) if err != nil { return err } if !ok { - return fmt.Errorf("fileset for blockStart: %d does not exist", t.Unix()) + return fmt.Errorf("fileset for blockStart: %d does not exist", blockStart.Unix()) } return DeleteFiles(fileset.AbsoluteFilepaths) } -// DataFileSetsBefore returns all the flush data fileset files whose timestamps are earlier than a given time. +// DataFileSetsBefore returns all the flush data fileset paths whose timestamps are earlier than a given time. func DataFileSetsBefore(filePathPrefix string, namespace ident.ID, shard uint32, t time.Time) ([]string, error) { matched, err := filesetFiles(filesetFilesSelector{ fileSetType: persist.FileSetFlushType, @@ -912,7 +1072,7 @@ func DataFileSetsBefore(filePathPrefix string, namespace ident.ID, shard uint32, return FilesBefore(matched.Filepaths(), t) } -// IndexFileSetsBefore returns all the flush index fileset files whose timestamps are earlier than a given time. +// IndexFileSetsBefore returns all the flush index fileset paths whose timestamps are earlier than a given time. 
func IndexFileSetsBefore(filePathPrefix string, namespace ident.ID, t time.Time) ([]string, error) { matched, err := filesetFiles(filesetFilesSelector{ fileSetType: persist.FileSetFlushType, @@ -1011,7 +1171,7 @@ func filesetFiles(args filesetFilesSelector) (FileSetFilesSlice, error) { case persist.FileSetDataContentType: dir := ShardDataDirPath(args.filePathPrefix, args.namespace, args.shard) byTimeAsc, err = findFiles(dir, args.pattern, func(files []string) sort.Interface { - return byTimeAscending(files) + return dataFileSetFilesByTimeAndVolumeIndexAscending(files) }) case persist.FileSetIndexContentType: dir := NamespaceIndexDataDirPath(args.filePathPrefix, args.namespace) @@ -1061,7 +1221,7 @@ func filesetFiles(args filesetFilesSelector) (FileSetFilesSlice, error) { case persist.FileSetFlushType: switch args.contentType { case persist.FileSetDataContentType: - currentFileBlockStart, err = TimeFromFileName(file) + currentFileBlockStart, volumeIndex, err = TimeAndVolumeIndexFromDataFileSetFilename(file) case persist.FileSetIndexContentType: currentFileBlockStart, volumeIndex, err = TimeAndVolumeIndexFromFileSetFilename(file) default: @@ -1211,10 +1371,31 @@ func CommitLogsDirPath(prefix string) string { return path.Join(prefix, commitLogsDirName) } -// DataFileSetExistsAt determines whether data fileset files exist for the given namespace, shard, and block start. -func DataFileSetExistsAt(filePathPrefix string, namespace ident.ID, shard uint32, blockStart time.Time) (bool, error) { +// DataFileSetExists determines whether data fileset files exist for the given +// namespace, shard, block start, and volume. +func DataFileSetExists( + filePathPrefix string, + namespace ident.ID, + shard uint32, + blockStart time.Time, + volume int, +) (bool, error) { + // This function can easily become a performance bottleneck if the + // implementation is slow or requires scanning directories with a large + // number of files in them (as is common if namespaces with long retentions + // are configured). As a result, instead of using existing helper functions, + // it implements an optimized code path that only involves checking if a few + // specific files exist and contain the correct contents. shardDir := ShardDataDirPath(filePathPrefix, namespace, shard) - checkpointPath := filesetPathFromTime(shardDir, blockStart, checkpointFileSuffix) + + // Check fileset with volume first to optimize for non-legacy use case. + checkpointPath := filesetPathFromTimeAndIndex(shardDir, blockStart, volume, checkpointFileSuffix) + exists, err := CompleteCheckpointFileExists(checkpointPath) + if err == nil && exists { + return true, nil + } + + checkpointPath = filesetPathFromTimeLegacy(shardDir, blockStart, checkpointFileSuffix) return CompleteCheckpointFileExists(checkpointPath) } @@ -1225,12 +1406,14 @@ func SnapshotFileSetExistsAt(prefix string, namespace ident.ID, shard uint32, bl return false, err } - latest, ok := snapshotFiles.LatestVolumeForBlock(blockStart) + _, ok := snapshotFiles.LatestVolumeForBlock(blockStart) if !ok { return false, nil } - return latest.HasCompleteCheckpointFile(), nil + // LatestVolumeForBlock checks for a complete checkpoint file, so we don't + // need to recheck it here. + return true, nil } // NextSnapshotMetadataFileIndex returns the next snapshot metadata file index. 
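`DataFileSetExists` above probes the volume-indexed checkpoint filename first and only then falls back to the legacy name. Below is a minimal standalone sketch of that fallback pattern, assuming the filename layout described in the changelog; the helper `checkpointExists` is hypothetical, and the real code builds paths via `ShardDataDirPath` and validates checkpoint contents with `CompleteCheckpointFileExists` rather than just stat-ing the file:

```go
package main

import (
	"fmt"
	"os"
	"path"
)

// checkpointExists sketches the two-step existence check: the volume-indexed
// filename is tried first (the common case for newer files), then the legacy,
// non-indexed filename. This sketch only falls back for volume 0, since a
// legacy filename implicitly denotes the first volume.
func checkpointExists(shardDir string, blockStartNanos int64, volume int) bool {
	indexed := path.Join(shardDir,
		fmt.Sprintf("fileset-%d-%d-checkpoint.db", blockStartNanos, volume))
	if _, err := os.Stat(indexed); err == nil {
		return true
	}
	if volume != 0 {
		return false
	}
	legacy := path.Join(shardDir,
		fmt.Sprintf("fileset-%d-checkpoint.db", blockStartNanos))
	_, err := os.Stat(legacy)
	return err == nil
}

func main() {
	// Hypothetical shard directory; real paths come from ShardDataDirPath.
	fmt.Println(checkpointExists("/var/lib/m3db/data/metrics/0", 1257890400000000000, 0))
}
```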
@@ -1375,12 +1558,53 @@ func filesetFileForTime(t time.Time, suffix string) string { return fmt.Sprintf("%s%s%d%s%s%s", filesetFilePrefix, separator, t.UnixNano(), separator, suffix, fileSuffix) } -func filesetPathFromTime(prefix string, t time.Time, suffix string) string { +func filesetPathFromTimeLegacy(prefix string, t time.Time, suffix string) string { return path.Join(prefix, filesetFileForTime(t, suffix)) } func filesetPathFromTimeAndIndex(prefix string, t time.Time, index int, suffix string) string { - return path.Join(prefix, filesetFileForTime(t, fmt.Sprintf("%d%s%s", index, separator, suffix))) + newSuffix := fmt.Sprintf("%d%s%s", index, separator, suffix) + return path.Join(prefix, filesetFileForTime(t, newSuffix)) +} + +// isFirstVolumeLegacy returns whether the first volume of the provided type is +// legacy, i.e. does not have a volume index in its filename. Callers expect +// either a legacy or a non-legacy file to exist, so this function returns an +// error if neither exists. Note that this function does not check for the +// volume's complete checkpoint file. +func isFirstVolumeLegacy(prefix string, t time.Time, suffix string) (bool, error) { + // Check non-legacy path first to optimize for newer files. + path := filesetPathFromTimeAndIndex(prefix, t, 0, suffix) + _, err := os.Stat(path) + if err == nil { + return false, nil + } + + legacyPath := filesetPathFromTimeLegacy(prefix, t, suffix) + _, err = os.Stat(legacyPath) + if err == nil { + return true, nil + } + + return false, ErrCheckpointFileNotFound +} + +// Once we decide that we no longer want to support legacy (non-volume-indexed) +// filesets, we can remove this function and just use +// `filesetPathFromTimeAndIndex`. Getting code to compile and tests to pass +// after that should be a comprehensive way to remove dead code. +func dataFilesetPathFromTimeAndIndex( + prefix string, + t time.Time, + index int, + suffix string, + isLegacy bool, +) string { + if isLegacy { + return filesetPathFromTimeLegacy(prefix, t, suffix) + } + + return filesetPathFromTimeAndIndex(prefix, t, index, suffix) } func filesetIndexSegmentFileSuffixFromTime( diff --git a/src/dbnode/persist/fs/files_test.go b/src/dbnode/persist/fs/files_test.go index e16eeef31a..f68fc968bc 100644 --- a/src/dbnode/persist/fs/files_test.go +++ b/src/dbnode/persist/fs/files_test.go @@ -34,9 +34,9 @@ import ( "time" "github.com/m3db/m3/src/dbnode/digest" + "github.com/m3db/m3/src/dbnode/namespace" "github.com/m3db/m3/src/dbnode/persist" "github.com/m3db/m3/src/dbnode/retention" - "github.com/m3db/m3/src/dbnode/namespace" "github.com/m3db/m3/src/x/ident" "github.com/m3db/m3/src/x/instrument" @@ -208,14 +208,14 @@ func TestForEachInfoFile(t *testing.T) { res = append(res, data...) 
}) - require.Equal(t, []string{filesetPathFromTime(shardDir, blockStart, infoFileSuffix)}, fnames) + require.Equal(t, []string{filesetPathFromTimeLegacy(shardDir, blockStart, infoFileSuffix)}, fnames) require.Equal(t, infoData, res) } -func TestTimeFromName(t *testing.T) { +func TestTimeFromFileName(t *testing.T) { _, err := TimeFromFileName("foo/bar") require.Error(t, err) - require.Equal(t, "unexpected file name foo/bar", err.Error()) + require.Equal(t, "unexpected filename: foo/bar", err.Error()) _, err = TimeFromFileName("foo/bar-baz") require.Error(t, err) @@ -225,6 +225,11 @@ func TestTimeFromName(t *testing.T) { require.Equal(t, expected, v) require.NoError(t, err) + v, err = TimeFromFileName("foo-12345-6-bar.db") + expected = time.Unix(0, 12345) + require.Equal(t, expected, v) + require.NoError(t, err) + v, err = TimeFromFileName("foo/bar/foo-21234567890-bar.db") expected = time.Unix(0, 21234567890) require.Equal(t, expected, v) @@ -234,7 +239,7 @@ func TestTimeFromName(t *testing.T) { func TestTimeAndIndexFromCommitlogFileName(t *testing.T) { _, _, err := TimeAndIndexFromCommitlogFilename("foo/bar") require.Error(t, err) - require.Equal(t, "unexpected file name foo/bar", err.Error()) + require.Equal(t, "unexpected filename: foo/bar", err.Error()) _, _, err = TimeAndIndexFromCommitlogFilename("foo/bar-baz") require.Error(t, err) @@ -259,7 +264,7 @@ func TestTimeAndIndexFromCommitlogFileName(t *testing.T) { func TestTimeAndVolumeIndexFromFileSetFilename(t *testing.T) { _, _, err := TimeAndVolumeIndexFromFileSetFilename("foo/bar") require.Error(t, err) - require.Equal(t, "unexpected file name foo/bar", err.Error()) + require.Equal(t, "unexpected filename: foo/bar", err.Error()) _, _, err = TimeAndVolumeIndexFromFileSetFilename("foo/bar-baz") require.Error(t, err) @@ -283,6 +288,41 @@ func TestTimeAndVolumeIndexFromFileSetFilename(t *testing.T) { require.Equal(t, filesetPathFromTimeAndIndex("foo/bar", exp.t, exp.i, "data"), validName) } +func TestTimeAndVolumeIndexFromDataFileSetFilename(t *testing.T) { + _, _, err := TimeAndVolumeIndexFromDataFileSetFilename("foo/bar") + require.Error(t, err) + require.Equal(t, "unexpected filename: foo/bar", err.Error()) + + _, _, err = TimeAndVolumeIndexFromDataFileSetFilename("foo/bar-baz") + require.Error(t, err) + + type expected struct { + t time.Time + i int + } + ts, i, err := TimeAndVolumeIndexFromDataFileSetFilename("foo-1-0-data.db") + exp := expected{time.Unix(0, 1), 0} + require.Equal(t, exp.t, ts) + require.Equal(t, exp.i, i) + require.NoError(t, err) + + validName := "foo/bar/fileset-21234567890-1-data.db" + ts, i, err = TimeAndVolumeIndexFromDataFileSetFilename(validName) + exp = expected{time.Unix(0, 21234567890), 1} + require.Equal(t, exp.t, ts) + require.Equal(t, exp.i, i) + require.NoError(t, err) + require.Equal(t, dataFilesetPathFromTimeAndIndex("foo/bar", exp.t, exp.i, "data", false), validName) + + unindexedName := "foo/bar/fileset-21234567890-data.db" + ts, i, err = TimeAndVolumeIndexFromDataFileSetFilename(unindexedName) + exp = expected{time.Unix(0, 21234567890), 0} + require.Equal(t, exp.t, ts) + require.Equal(t, exp.i, i) + require.NoError(t, err) + require.Equal(t, dataFilesetPathFromTimeAndIndex("foo/bar", exp.t, exp.i, "data", true), unindexedName) +} + func TestSnapshotMetadataFilePathFromIdentifierRoundTrip(t *testing.T) { idUUID := uuid.Parse("bf58eb3e-0582-42ee-83b2-d098c206260e") require.NotNil(t, idUUID) @@ -351,16 +391,16 @@ func TestFileExists(t *testing.T) { defer os.RemoveAll(dir) require.NoError(t, err) - 
infoFilePath := filesetPathFromTime(shardDir, start, infoFileSuffix) + infoFilePath := filesetPathFromTimeLegacy(shardDir, start, infoFileSuffix) createDataFile(t, shardDir, start, infoFileSuffix, checkpointFileBuf) require.True(t, mustFileExists(t, infoFilePath)) - exists, err := DataFileSetExistsAt(dir, testNs1ID, uint32(shard), start) + exists, err := DataFileSetExists(dir, testNs1ID, uint32(shard), start, 0) require.NoError(t, err) require.False(t, exists) - checkpointFilePath := filesetPathFromTime(shardDir, start, checkpointFileSuffix) + checkpointFilePath := filesetPathFromTimeLegacy(shardDir, start, checkpointFileSuffix) createDataFile(t, shardDir, start, checkpointFileSuffix, checkpointFileBuf) - exists, err = DataFileSetExistsAt(dir, testNs1ID, uint32(shard), start) + exists, err = DataFileSetExists(dir, testNs1ID, uint32(shard), start, 0) require.NoError(t, err) require.True(t, exists) @@ -381,7 +421,7 @@ func TestCompleteCheckpointFileExists(t *testing.T) { shard = uint32(10) start = time.Now() shardDir = ShardDataDirPath(dir, testNs1ID, shard) - checkpointFilePath = filesetPathFromTime(shardDir, start, checkpointFileSuffix) + checkpointFilePath = filesetPathFromTimeLegacy(shardDir, start, checkpointFileSuffix) err = os.MkdirAll(shardDir, defaultNewDirectoryMode) validCheckpointFileBuf = make([]byte, CheckpointFileSizeBytes) @@ -424,7 +464,7 @@ func TestFilePathFromTime(t *testing.T) { {"foo/bar/", infoFileSuffix, "foo/bar/fileset-1465501321123456789-info.db"}, } for _, input := range inputs { - require.Equal(t, input.expected, filesetPathFromTime(input.prefix, start, input.suffix)) + require.Equal(t, input.expected, filesetPathFromTimeLegacy(input.prefix, start, input.suffix)) } } @@ -442,7 +482,7 @@ func TestFileSetFilesBefore(t *testing.T) { shardDir := path.Join(dir, dataDirName, testNs1ID.String(), strconv.Itoa(int(shard))) for i := 0; i < len(res); i++ { ts := time.Unix(0, int64(i)) - require.Equal(t, filesetPathFromTime(shardDir, ts, infoFileSuffix), res[i]) + require.Equal(t, filesetPathFromTimeLegacy(shardDir, ts, infoFileSuffix), res[i]) } } @@ -454,7 +494,7 @@ func TestFileSetAt(t *testing.T) { for i := 0; i < numIters; i++ { timestamp := time.Unix(0, int64(i)) - res, ok, err := FileSetAt(dir, testNs1ID, shard, timestamp) + res, ok, err := FileSetAt(dir, testNs1ID, shard, timestamp, 0) require.NoError(t, err) require.True(t, ok) require.Equal(t, timestamp, res.ID.BlockStart) @@ -469,7 +509,7 @@ func TestFileSetAtIgnoresWithoutCheckpoint(t *testing.T) { for i := 0; i < numIters; i++ { timestamp := time.Unix(0, int64(i)) - _, ok, err := FileSetAt(dir, testNs1ID, shard, timestamp) + _, ok, err := FileSetAt(dir, testNs1ID, shard, timestamp, 0) require.NoError(t, err) require.False(t, ok) } @@ -483,15 +523,15 @@ func TestDeleteFileSetAt(t *testing.T) { for i := 0; i < numIters; i++ { timestamp := time.Unix(0, int64(i)) - res, ok, err := FileSetAt(dir, testNs1ID, shard, timestamp) + res, ok, err := FileSetAt(dir, testNs1ID, shard, timestamp, 0) require.NoError(t, err) require.True(t, ok) require.Equal(t, timestamp, res.ID.BlockStart) - err = DeleteFileSetAt(dir, testNs1ID, shard, timestamp) + err = DeleteFileSetAt(dir, testNs1ID, shard, timestamp, 0) require.NoError(t, err) - res, ok, err = FileSetAt(dir, testNs1ID, shard, timestamp) + res, ok, err = FileSetAt(dir, testNs1ID, shard, timestamp, 0) require.NoError(t, err) require.False(t, ok) } @@ -503,7 +543,7 @@ func TestFileSetAtNotExist(t *testing.T) { defer os.RemoveAll(dir) timestamp := time.Unix(0, 0) - _, ok, err := 
FileSetAt(dir, testNs1ID, shard, timestamp) + _, ok, err := FileSetAt(dir, testNs1ID, shard, timestamp, 0) require.NoError(t, err) require.False(t, ok) } @@ -1133,7 +1173,7 @@ func createDataFiles(t *testing.T, if isSnapshot { infoFilePath = filesetPathFromTimeAndIndex(shardDir, ts, 0, fileSuffix) } else { - infoFilePath = filesetPathFromTime(shardDir, ts, fileSuffix) + infoFilePath = filesetPathFromTimeLegacy(shardDir, ts, fileSuffix) } var contents []byte if fileSuffix == checkpointFileSuffix { @@ -1190,7 +1230,7 @@ func (filesets fileSetFileIdentifiers) create(t *testing.T, prefixDir string, fi var path string switch fileSetType { case persist.FileSetFlushType: - path = filesetPathFromTime(shardDir, blockStart, suffix) + path = filesetPathFromTimeLegacy(shardDir, blockStart, suffix) writeFile(t, path, nil) case persist.FileSetSnapshotType: path = filesetPathFromTimeAndIndex(shardDir, blockStart, 0, fileSuffix) @@ -1222,7 +1262,7 @@ func (filesets fileSetFileIdentifiers) create(t *testing.T, prefixDir string, fi } func createDataFile(t *testing.T, shardDir string, blockStart time.Time, suffix string, b []byte) { - filePath := filesetPathFromTime(shardDir, blockStart, suffix) + filePath := filesetPathFromTimeLegacy(shardDir, blockStart, suffix) createFile(t, filePath, b) } diff --git a/src/dbnode/persist/fs/fs.go b/src/dbnode/persist/fs/fs.go index a29a118393..ae3e727f6f 100644 --- a/src/dbnode/persist/fs/fs.go +++ b/src/dbnode/persist/fs/fs.go @@ -34,13 +34,16 @@ const ( filesetFilePrefix = "fileset" commitLogFilePrefix = "commitlog" segmentFileSetFilePrefix = "segment" - fileSuffix = ".db" + + fileSuffix = ".db" + fileSuffixDelimeterRune = '.' anyLowerCaseCharsPattern = "[a-z]*" anyNumbersPattern = "[0-9]*" anyLowerCaseCharsNumbersPattern = "[a-z0-9]*" - separator = "-" + separator = "-" + separatorRune = '-' infoFilePattern = filesetFilePrefix + separator + anyNumbersPattern + separator + infoFileSuffix + fileSuffix filesetFilePattern = filesetFilePrefix + separator + anyNumbersPattern + separator + anyLowerCaseCharsPattern + fileSuffix diff --git a/src/dbnode/persist/fs/fs_mock.go b/src/dbnode/persist/fs/fs_mock.go index e69dd50653..84fd159ba5 100644 --- a/src/dbnode/persist/fs/fs_mock.go +++ b/src/dbnode/persist/fs/fs_mock.go @@ -400,17 +400,17 @@ func (mr *MockDataFileSetSeekerMockRecorder) ConcurrentIDBloomFilter() *gomock.C } // Open mocks base method -func (m *MockDataFileSetSeeker) Open(arg0 ident.ID, arg1 uint32, arg2 time.Time, arg3 ReusableSeekerResources) error { +func (m *MockDataFileSetSeeker) Open(arg0 ident.ID, arg1 uint32, arg2 time.Time, arg3 int, arg4 ReusableSeekerResources) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Open", arg0, arg1, arg2, arg3) + ret := m.ctrl.Call(m, "Open", arg0, arg1, arg2, arg3, arg4) ret0, _ := ret[0].(error) return ret0 } // Open indicates an expected call of Open -func (mr *MockDataFileSetSeekerMockRecorder) Open(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { +func (mr *MockDataFileSetSeekerMockRecorder) Open(arg0, arg1, arg2, arg3, arg4 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Open", reflect.TypeOf((*MockDataFileSetSeeker)(nil).Open), arg0, arg1, arg2, arg3) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Open", reflect.TypeOf((*MockDataFileSetSeeker)(nil).Open), arg0, arg1, arg2, arg3, arg4) } // Range mocks base method diff --git a/src/dbnode/persist/fs/index_lookup_prop_test.go b/src/dbnode/persist/fs/index_lookup_prop_test.go index 
2c0e9f2ca5..e634381382 100644 --- a/src/dbnode/persist/fs/index_lookup_prop_test.go +++ b/src/dbnode/persist/fs/index_lookup_prop_test.go @@ -107,8 +107,8 @@ func TestIndexLookupWriteRead(t *testing.T) { } // Read the summaries file into memory - summariesFilePath := filesetPathFromTime( - shardDirPath, testWriterStart, summariesFileSuffix) + summariesFilePath := dataFilesetPathFromTimeAndIndex( + shardDirPath, testWriterStart, 0, summariesFileSuffix, false) summariesFile, err := os.Open(summariesFilePath) if err != nil { return false, fmt.Errorf("err opening summaries file: %v, ", err) @@ -254,7 +254,7 @@ func genTagIdent() gopter.Gen { } func readIndexFileOffsets(shardDirPath string, numEntries int, start time.Time) (map[string]int64, error) { - indexFilePath := filesetPathFromTime(shardDirPath, start, indexFileSuffix) + indexFilePath := dataFilesetPathFromTimeAndIndex(shardDirPath, start, 0, indexFileSuffix, false) buf, err := ioutil.ReadFile(indexFilePath) if err != nil { return nil, fmt.Errorf("err reading index file: %v, ", err) diff --git a/src/dbnode/persist/fs/merger.go b/src/dbnode/persist/fs/merger.go index e6b473ae26..a7f9f34901 100644 --- a/src/dbnode/persist/fs/merger.go +++ b/src/dbnode/persist/fs/merger.go @@ -82,6 +82,7 @@ func NewMerger( func (m *merger) Merge( fileID FileSetFileIdentifier, mergeWith MergeWith, + nextVolumeIndex int, flushPreparer persist.FlushPreparer, nsCtx namespace.Context, ) (err error) { @@ -97,13 +98,15 @@ func (m *merger) Merge( nsID = fileID.Namespace shard = fileID.Shard startTime = fileID.BlockStart + volume = fileID.VolumeIndex blockSize = nsOpts.RetentionOptions().BlockSize() blockStart = xtime.ToUnixNano(startTime) openOpts = DataReaderOpenOptions{ Identifier: FileSetFileIdentifier{ - Namespace: nsID, - Shard: shard, - BlockStart: startTime, + Namespace: nsID, + Shard: shard, + BlockStart: startTime, + VolumeIndex: volume, }, FileSetType: persist.FileSetFlushType, } @@ -128,6 +131,8 @@ func (m *merger) Merge( NamespaceMetadata: nsMd, Shard: shard, BlockStart: startTime, + VolumeIndex: nextVolumeIndex, + FileSetType: persist.FileSetFlushType, DeleteIfExists: false, } prepared, err := flushPreparer.PrepareData(prepareOpts) @@ -188,10 +193,10 @@ func (m *merger) Merge( // First stage: loop through series on disk. for id, tagsIter, data, _, err := reader.Read(); err != io.EOF; id, tagsIter, data, _, err = reader.Read() { - idsToFinalize = append(idsToFinalize, id) if err != nil { return err } + idsToFinalize = append(idsToFinalize, id) // Reset BlockReaders. brs = brs[:0] @@ -216,10 +221,10 @@ func (m *merger) Merge( // are valid, and the IDs are valid for the duration of the file writing. 
tags, err := convert.TagsFromTagsIter(id, tagsIter, identPool) tagsIter.Close() - tagsToFinalize = append(tagsToFinalize, tags) if err != nil { return err } + tagsToFinalize = append(tagsToFinalize, tags) if err := persistIter(prepared.Persist, multiIter, startTime, id, tags, blockAllocSize, nsCtx.Schema, encoderPool); err != nil { return err diff --git a/src/dbnode/persist/fs/merger_test.go b/src/dbnode/persist/fs/merger_test.go index 48f6636c30..b1ec374638 100644 --- a/src/dbnode/persist/fs/merger_test.go +++ b/src/dbnode/persist/fs/merger_test.go @@ -453,7 +453,7 @@ func testMergeWith( BlockStart: startTime, } mergeWith := mockMergeWithFromData(t, ctrl, diskData, mergeTargetData) - err := merger.Merge(fsID, mergeWith, preparer, nsCtx) + err := merger.Merge(fsID, mergeWith, 1, preparer, nsCtx) require.NoError(t, err) assertPersistedAsExpected(t, persisted, expectedData) diff --git a/src/dbnode/persist/fs/persist_manager.go b/src/dbnode/persist/fs/persist_manager.go index 41b9458ec2..ab343cec3c 100644 --- a/src/dbnode/persist/fs/persist_manager.go +++ b/src/dbnode/persist/fs/persist_manager.go @@ -416,14 +416,19 @@ func (pm *persistManager) PrepareData(opts persist.DataPrepareOptions) (persist. return prepared, errPersistManagerCannotPrepareDataNotPersisting } - exists, err := pm.dataFilesetExistsAt(opts) + exists, err := pm.dataFilesetExists(opts) if err != nil { return prepared, err } var volumeIndex int - if opts.FileSetType == persist.FileSetSnapshotType { - // Need to work out the volume index for the next snapshot + switch opts.FileSetType { + case persist.FileSetFlushType: + // Use the volume index passed in. This ensures that the volume index is + // the same as the cold flush version. + volumeIndex = opts.VolumeIndex + case persist.FileSetSnapshotType: + // Need to work out the volume index for the next snapshot. volumeIndex, err = NextSnapshotFileSetVolumeIndex(pm.opts.FilePathPrefix(), nsMetadata.ID(), shard, blockStart) if err != nil { @@ -453,7 +458,7 @@ func (pm *persistManager) PrepareData(opts persist.DataPrepareOptions) (persist. } if exists && opts.DeleteIfExists { - err := DeleteFileSetAt(pm.opts.FilePathPrefix(), nsID, shard, blockStart) + err := DeleteFileSetAt(pm.opts.FilePathPrefix(), nsID, shard, blockStart, volumeIndex) if err != nil { return prepared, err } @@ -598,20 +603,25 @@ func (pm *persistManager) doneShared() error { return nil } -func (pm *persistManager) dataFilesetExistsAt(prepareOpts persist.DataPrepareOptions) (bool, error) { +func (pm *persistManager) dataFilesetExists(prepareOpts persist.DataPrepareOptions) (bool, error) { var ( - blockStart = prepareOpts.BlockStart - shard = prepareOpts.Shard nsID = prepareOpts.NamespaceMetadata.ID() + shard = prepareOpts.Shard + blockStart = prepareOpts.BlockStart + volume = prepareOpts.VolumeIndex ) switch prepareOpts.FileSetType { case persist.FileSetSnapshotType: - // Snapshot files are indexed (multiple per block-start), so checking if the file - // already exist doesn't make much sense + // Checking if a snapshot file exists for a block start doesn't make + // sense in this context because the logic for creating new snapshot + // files does not use the volume index provided in the prepareOpts. + // Instead, the new volume index is determined by looking at what files + // exist on disk. This means that there can never be a conflict when + // trying to write new snapshot files. 
return false, nil case persist.FileSetFlushType: - return DataFileSetExistsAt(pm.filePathPrefix, nsID, shard, blockStart) + return DataFileSetExists(pm.filePathPrefix, nsID, shard, blockStart, volume) default: return false, fmt.Errorf( "unable to determine if fileset exists in persist manager for fileset type: %s", diff --git a/src/dbnode/persist/fs/persist_manager_test.go b/src/dbnode/persist/fs/persist_manager_test.go index 015952c59b..dcdea6ce54 100644 --- a/src/dbnode/persist/fs/persist_manager_test.go +++ b/src/dbnode/persist/fs/persist_manager_test.go @@ -55,7 +55,7 @@ func TestPersistenceManagerPrepareDataFileExistsNoDelete(t *testing.T) { shard = uint32(0) blockStart = time.Unix(1000, 0) shardDir = createDataShardDir(t, pm.filePathPrefix, testNs1ID, shard) - checkpointFilePath = filesetPathFromTime(shardDir, blockStart, checkpointFileSuffix) + checkpointFilePath = filesetPathFromTimeLegacy(shardDir, blockStart, checkpointFileSuffix) checkpointFileBuf = make([]byte, CheckpointFileSizeBytes) ) createFile(t, checkpointFilePath, checkpointFileBuf) @@ -102,7 +102,7 @@ func TestPersistenceManagerPrepareDataFileExistsWithDelete(t *testing.T) { var ( shardDir = createDataShardDir(t, pm.filePathPrefix, testNs1ID, shard) - checkpointFilePath = filesetPathFromTime(shardDir, blockStart, checkpointFileSuffix) + checkpointFilePath = filesetPathFromTimeLegacy(shardDir, blockStart, checkpointFileSuffix) checkpointFileBuf = make([]byte, CheckpointFileSizeBytes) ) createFile(t, checkpointFilePath, checkpointFileBuf) diff --git a/src/dbnode/persist/fs/read.go b/src/dbnode/persist/fs/read.go index d594b7e5ca..ed986e6520 100644 --- a/src/dbnode/persist/fs/read.go +++ b/src/dbnode/persist/fs/read.go @@ -92,6 +92,7 @@ type reader struct { expectedDigestOfDigest uint32 expectedBloomFilterDigest uint32 shard uint32 + volume int open bool } @@ -128,11 +129,11 @@ func NewReader( func (r *reader) Open(opts DataReaderOpenOptions) error { var ( - namespace = opts.Identifier.Namespace - shard = opts.Identifier.Shard - blockStart = opts.Identifier.BlockStart - snapshotIndex = opts.Identifier.VolumeIndex - err error + namespace = opts.Identifier.Namespace + shard = opts.Identifier.Shard + blockStart = opts.Identifier.BlockStart + volumeIndex = opts.Identifier.VolumeIndex + err error ) var ( @@ -144,23 +145,33 @@ func (r *reader) Open(opts DataReaderOpenOptions) error { indexFilepath string dataFilepath string ) + switch opts.FileSetType { case persist.FileSetSnapshotType: shardDir = ShardSnapshotsDirPath(r.filePathPrefix, namespace, shard) - checkpointFilepath = filesetPathFromTimeAndIndex(shardDir, blockStart, snapshotIndex, checkpointFileSuffix) - infoFilepath = filesetPathFromTimeAndIndex(shardDir, blockStart, snapshotIndex, infoFileSuffix) - digestFilepath = filesetPathFromTimeAndIndex(shardDir, blockStart, snapshotIndex, digestFileSuffix) - bloomFilterFilepath = filesetPathFromTimeAndIndex(shardDir, blockStart, snapshotIndex, bloomFilterFileSuffix) - indexFilepath = filesetPathFromTimeAndIndex(shardDir, blockStart, snapshotIndex, indexFileSuffix) - dataFilepath = filesetPathFromTimeAndIndex(shardDir, blockStart, snapshotIndex, dataFileSuffix) + checkpointFilepath = filesetPathFromTimeAndIndex(shardDir, blockStart, volumeIndex, checkpointFileSuffix) + infoFilepath = filesetPathFromTimeAndIndex(shardDir, blockStart, volumeIndex, infoFileSuffix) + digestFilepath = filesetPathFromTimeAndIndex(shardDir, blockStart, volumeIndex, digestFileSuffix) + bloomFilterFilepath = filesetPathFromTimeAndIndex(shardDir, 
blockStart, volumeIndex, bloomFilterFileSuffix) + indexFilepath = filesetPathFromTimeAndIndex(shardDir, blockStart, volumeIndex, indexFileSuffix) + dataFilepath = filesetPathFromTimeAndIndex(shardDir, blockStart, volumeIndex, dataFileSuffix) case persist.FileSetFlushType: shardDir = ShardDataDirPath(r.filePathPrefix, namespace, shard) - checkpointFilepath = filesetPathFromTime(shardDir, blockStart, checkpointFileSuffix) - infoFilepath = filesetPathFromTime(shardDir, blockStart, infoFileSuffix) - digestFilepath = filesetPathFromTime(shardDir, blockStart, digestFileSuffix) - bloomFilterFilepath = filesetPathFromTime(shardDir, blockStart, bloomFilterFileSuffix) - indexFilepath = filesetPathFromTime(shardDir, blockStart, indexFileSuffix) - dataFilepath = filesetPathFromTime(shardDir, blockStart, dataFileSuffix) + + isLegacy := false + if volumeIndex == 0 { + isLegacy, err = isFirstVolumeLegacy(shardDir, blockStart, checkpointFileSuffix) + if err != nil { + return err + } + } + + checkpointFilepath = dataFilesetPathFromTimeAndIndex(shardDir, blockStart, volumeIndex, checkpointFileSuffix, isLegacy) + infoFilepath = dataFilesetPathFromTimeAndIndex(shardDir, blockStart, volumeIndex, infoFileSuffix, isLegacy) + digestFilepath = dataFilesetPathFromTimeAndIndex(shardDir, blockStart, volumeIndex, digestFileSuffix, isLegacy) + bloomFilterFilepath = dataFilesetPathFromTimeAndIndex(shardDir, blockStart, volumeIndex, bloomFilterFileSuffix, isLegacy) + indexFilepath = dataFilesetPathFromTimeAndIndex(shardDir, blockStart, volumeIndex, indexFileSuffix, isLegacy) + dataFilepath = dataFilesetPathFromTimeAndIndex(shardDir, blockStart, volumeIndex, dataFileSuffix, isLegacy) default: return fmt.Errorf("unable to open reader with fileset type: %s", opts.FileSetType) } @@ -237,6 +248,7 @@ func (r *reader) Open(opts DataReaderOpenOptions) error { r.open = true r.namespace = namespace r.shard = shard + r.volume = volumeIndex return nil } @@ -246,6 +258,7 @@ func (r *reader) Status() DataFileSetReaderStatus { Open: r.open, Namespace: r.namespace, Shard: r.shard, + Volume: r.volume, BlockStart: r.start, } } @@ -497,27 +510,6 @@ func (r *reader) Close() error { return multiErr.FinalError() } -func readCheckpointFile(filePath string, digestBuf digest.Buffer) (uint32, error) { - exists, err := CompleteCheckpointFileExists(filePath) - if err != nil { - return 0, err - } - if !exists { - return 0, ErrCheckpointFileNotFound - } - fd, err := os.Open(filePath) - if err != nil { - return 0, err - } - defer fd.Close() - digest, err := digestBuf.ReadDigestFromFile(fd) - if err != nil { - return 0, err - } - - return digest, nil -} - // indexEntriesByOffsetAsc implements sort.Sort type indexEntriesByOffsetAsc []schema.IndexEntry diff --git a/src/dbnode/persist/fs/read_test.go b/src/dbnode/persist/fs/read_test.go index 59790ec8fe..573064d4d9 100644 --- a/src/dbnode/persist/fs/read_test.go +++ b/src/dbnode/persist/fs/read_test.go @@ -271,7 +271,7 @@ func TestReadNoCheckpointFile(t *testing.T) { var ( shardDir = ShardDataDirPath(filePathPrefix, testNs1ID, shard) - checkpointFile = filesetPathFromTime(shardDir, testWriterStart, checkpointFileSuffix) + checkpointFile = dataFilesetPathFromTimeAndIndex(shardDir, testWriterStart, 0, checkpointFileSuffix, false) ) exists, err := CompleteCheckpointFileExists(checkpointFile) require.NoError(t, err) @@ -316,7 +316,7 @@ func testReadOpen(t *testing.T, fileData map[string][]byte) { assert.NoError(t, w.Close()) for suffix, data := range fileData { - digestFile := filesetPathFromTime(shardDir, 
start, suffix) + digestFile := dataFilesetPathFromTimeAndIndex(shardDir, start, 0, suffix, false) fd, err := os.OpenFile(digestFile, os.O_WRONLY|os.O_TRUNC, os.FileMode(0666)) require.NoError(t, err) _, err = fd.Write(data) diff --git a/src/dbnode/persist/fs/seek.go b/src/dbnode/persist/fs/seek.go index 7dc0561dfc..065a730295 100644 --- a/src/dbnode/persist/fs/seek.go +++ b/src/dbnode/persist/fs/seek.go @@ -145,6 +145,7 @@ func (s *seeker) Open( namespace ident.ID, shard uint32, blockStart time.Time, + volumeIndex int, resources ReusableSeekerResources, ) error { if s.isClone { @@ -152,16 +153,27 @@ func (s *seeker) Open( } shardDir := ShardDataDirPath(s.opts.filePathPrefix, namespace, shard) - var infoFd, digestFd, bloomFilterFd, summariesFd *os.File + var ( + infoFd, digestFd, bloomFilterFd, summariesFd *os.File + err error + isLegacy bool + ) + + if volumeIndex == 0 { + isLegacy, err = isFirstVolumeLegacy(shardDir, blockStart, checkpointFileSuffix) + if err != nil { + return err + } + } // Open necessary files if err := openFiles(os.Open, map[string]**os.File{ - filesetPathFromTime(shardDir, blockStart, infoFileSuffix): &infoFd, - filesetPathFromTime(shardDir, blockStart, indexFileSuffix): &s.indexFd, - filesetPathFromTime(shardDir, blockStart, dataFileSuffix): &s.dataFd, - filesetPathFromTime(shardDir, blockStart, digestFileSuffix): &digestFd, - filesetPathFromTime(shardDir, blockStart, bloomFilterFileSuffix): &bloomFilterFd, - filesetPathFromTime(shardDir, blockStart, summariesFileSuffix): &summariesFd, + dataFilesetPathFromTimeAndIndex(shardDir, blockStart, volumeIndex, infoFileSuffix, isLegacy): &infoFd, + dataFilesetPathFromTimeAndIndex(shardDir, blockStart, volumeIndex, indexFileSuffix, isLegacy): &s.indexFd, + dataFilesetPathFromTimeAndIndex(shardDir, blockStart, volumeIndex, dataFileSuffix, isLegacy): &s.dataFd, + dataFilesetPathFromTimeAndIndex(shardDir, blockStart, volumeIndex, digestFileSuffix, isLegacy): &digestFd, + dataFilesetPathFromTimeAndIndex(shardDir, blockStart, volumeIndex, bloomFilterFileSuffix, isLegacy): &bloomFilterFd, + dataFilesetPathFromTimeAndIndex(shardDir, blockStart, volumeIndex, summariesFileSuffix, isLegacy): &summariesFd, }); err != nil { return err } @@ -219,7 +231,7 @@ func (s *seeker) Open( s.Close() return fmt.Errorf( "index file digest for file: %s does not match the expected digest: %c", - filesetPathFromTime(shardDir, blockStart, indexFileSuffix), err, + filesetPathFromTimeLegacy(shardDir, blockStart, indexFileSuffix), err, ) } diff --git a/src/dbnode/persist/fs/seek_manager.go b/src/dbnode/persist/fs/seek_manager.go index 074908f563..b231f0719a 100644 --- a/src/dbnode/persist/fs/seek_manager.go +++ b/src/dbnode/persist/fs/seek_manager.go @@ -397,7 +397,17 @@ func (m *seekerManager) newOpenSeeker( shard uint32, blockStart time.Time, ) (DataFileSetSeeker, error) { - exists, err := DataFileSetExistsAt(m.filePathPrefix, m.namespace, shard, blockStart) + blm := m.blockRetrieverOpts.BlockLeaseManager() + state, err := blm.OpenLatestLease(m, block.LeaseDescriptor{ + Namespace: m.namespace, + Shard: shard, + BlockStart: blockStart, + }) + if err != nil { + return nil, fmt.Errorf("err opening latest lease: %v", err) + } + + exists, err := DataFileSetExists(m.filePathPrefix, m.namespace, shard, blockStart, state.Volume) if err != nil { return nil, err } @@ -424,21 +434,8 @@ func (m *seekerManager) newOpenSeeker( // Set the unread buffer to reuse it amongst all seekers. 
seeker.setUnreadBuffer(m.unreadBuf.value) - var ( - resources = m.getSeekerResources() - blm = m.blockRetrieverOpts.BlockLeaseManager() - ) - _, err = blm.OpenLatestLease(m, block.LeaseDescriptor{ - Namespace: m.namespace, - Shard: shard, - BlockStart: blockStart, - }) - if err != nil { - return nil, fmt.Errorf("err opening latest lease: %v", err) - } - - // TODO_OUT_OF_ORDER_WRITES(rartoul): Pass volume obtained from OpenLatestLease here. - err = seeker.Open(m.namespace, shard, blockStart, resources) + resources := m.getSeekerResources() + err = seeker.Open(m.namespace, shard, blockStart, state.Volume, resources) m.putSeekerResources(resources) if err != nil { return nil, err diff --git a/src/dbnode/persist/fs/seek_manager_test.go b/src/dbnode/persist/fs/seek_manager_test.go index eb2bbb3f83..ef04c05bfd 100644 --- a/src/dbnode/persist/fs/seek_manager_test.go +++ b/src/dbnode/persist/fs/seek_manager_test.go @@ -87,7 +87,7 @@ func TestSeekerManagerBorrowOpenSeekersLazy(t *testing.T) { blockStart time.Time, ) (DataFileSetSeeker, error) { mock := NewMockDataFileSetSeeker(ctrl) - mock.EXPECT().Open(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil) + mock.EXPECT().Open(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil) mock.EXPECT().ConcurrentClone().Return(mock, nil) for i := 0; i < defaultFetchConcurrency; i++ { mock.EXPECT().Close().Return(nil) diff --git a/src/dbnode/persist/fs/seek_test.go b/src/dbnode/persist/fs/seek_test.go index 2d8b82b265..e57962e00f 100644 --- a/src/dbnode/persist/fs/seek_test.go +++ b/src/dbnode/persist/fs/seek_test.go @@ -64,7 +64,7 @@ func TestSeekEmptyIndex(t *testing.T) { resources := newTestReusableSeekerResources() s := newTestSeeker(filePathPrefix) - err = s.Open(testNs1ID, 0, testWriterStart, resources) + err = s.Open(testNs1ID, 0, testWriterStart, 0, resources) assert.NoError(t, err) _, err = s.SeekByID(ident.StringID("foo"), resources) assert.Error(t, err) @@ -104,7 +104,7 @@ func TestSeekDataUnexpectedSize(t *testing.T) { resources := newTestReusableSeekerResources() s := newTestSeeker(filePathPrefix) - err = s.Open(testNs1ID, 0, testWriterStart, resources) + err = s.Open(testNs1ID, 0, testWriterStart, 0, resources) assert.NoError(t, err) _, err = s.SeekByID(ident.StringID("foo"), resources) @@ -143,7 +143,7 @@ func TestSeekBadChecksum(t *testing.T) { resources := newTestReusableSeekerResources() s := newTestSeeker(filePathPrefix) - err = s.Open(testNs1ID, 0, testWriterStart, resources) + err = s.Open(testNs1ID, 0, testWriterStart, 0, resources) assert.NoError(t, err) _, err = s.SeekByID(ident.StringID("foo"), resources) @@ -193,7 +193,7 @@ func TestSeek(t *testing.T) { resources := newTestReusableSeekerResources() s := newTestSeeker(filePathPrefix) - err = s.Open(testNs1ID, 0, testWriterStart, resources) + err = s.Open(testNs1ID, 0, testWriterStart, 0, resources) assert.NoError(t, err) data, err := s.SeekByID(ident.StringID("foo3"), resources) @@ -261,7 +261,7 @@ func TestSeekIDNotExists(t *testing.T) { resources := newTestReusableSeekerResources() s := newTestSeeker(filePathPrefix) - err = s.Open(testNs1ID, 0, testWriterStart, resources) + err = s.Open(testNs1ID, 0, testWriterStart, 0, resources) assert.NoError(t, err) // Test errSeekIDNotFound when we scan far enough into the index file that @@ -323,7 +323,7 @@ func TestReuseSeeker(t *testing.T) { resources := newTestReusableSeekerResources() s := newTestSeeker(filePathPrefix) - err = s.Open(testNs1ID, 0, testWriterStart.Add(-time.Hour), resources) + err 
= s.Open(testNs1ID, 0, testWriterStart.Add(-time.Hour), 0, resources) assert.NoError(t, err) data, err := s.SeekByID(ident.StringID("foo"), resources) @@ -333,7 +333,7 @@ func TestReuseSeeker(t *testing.T) { defer data.DecRef() assert.Equal(t, []byte{1, 2, 1}, data.Bytes()) - err = s.Open(testNs1ID, 0, testWriterStart, resources) + err = s.Open(testNs1ID, 0, testWriterStart, 0, resources) assert.NoError(t, err) data, err = s.SeekByID(ident.StringID("foo"), resources) @@ -388,7 +388,7 @@ func TestCloneSeeker(t *testing.T) { resources := newTestReusableSeekerResources() s := newTestSeeker(filePathPrefix) - err = s.Open(testNs1ID, 0, testWriterStart.Add(-time.Hour), resources) + err = s.Open(testNs1ID, 0, testWriterStart.Add(-time.Hour), 0, resources) assert.NoError(t, err) clone, err := s.ConcurrentClone() diff --git a/src/dbnode/persist/fs/types.go b/src/dbnode/persist/fs/types.go index b585e4cb99..f0050cdea0 100644 --- a/src/dbnode/persist/fs/types.go +++ b/src/dbnode/persist/fs/types.go @@ -51,7 +51,7 @@ type FileSetFileIdentifier struct { BlockStart time.Time // Only required for data content files Shard uint32 - // Required for snapshot files (index yes, data yes) and flush files (index yes, data no) + // Required for snapshot files (index yes, data yes) and flush files (index yes, data yes) VolumeIndex int } @@ -105,9 +105,9 @@ type SnapshotMetadataFileReader interface { type DataFileSetReaderStatus struct { Namespace ident.ID BlockStart time.Time - - Shard uint32 - Open bool + Shard uint32 + Volume int + Open bool } // DataReaderOpenOptions is the options struct for the reader open method. @@ -173,6 +173,7 @@ type DataFileSetSeeker interface { namespace ident.ID, shard uint32, start time.Time, + volume int, resources ReusableSeekerResources, ) error @@ -237,14 +238,14 @@ type DataFileSetSeekerManager interface { // to improve times when seeking to a block. CacheShardIndices(shards []uint32) error - // Borrow returns an open seeker for a given shard and block start time. + // Borrow returns an open seeker for a given shard, block start time, and volume. Borrow(shard uint32, start time.Time) (ConcurrentDataFileSetSeeker, error) - // Return returns an open seeker for a given shard and block start time. + // Return relinquishes a borrowed seeker for a given shard, block start time, and volume. Return(shard uint32, start time.Time, seeker ConcurrentDataFileSetSeeker) error // ConcurrentIDBloomFilter returns a concurrent ID bloom filter for a given - // shard and block start time + // shard, block start time, and volume. ConcurrentIDBloomFilter(shard uint32, start time.Time) (*ManagedConcurrentBloomFilter, error) } @@ -532,6 +533,7 @@ type Merger interface { Merge( fileID FileSetFileIdentifier, mergeWith MergeWith, + nextVolumeIndex int, flushPreparer persist.FlushPreparer, nsCtx namespace.Context, ) error diff --git a/src/dbnode/persist/fs/write.go b/src/dbnode/persist/fs/write.go index ebfe3e589f..b2b3bd944b 100644 --- a/src/dbnode/persist/fs/write.go +++ b/src/dbnode/persist/fs/write.go @@ -145,11 +145,11 @@ func NewWriter(opts Options) (DataFileSetWriter, error) { // opening / truncating files associated with that shard for writing.
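Before the writer changes below, it may help to see what the new naming scheme produces. With this change, flush fileset paths embed a volume index between the block start and the suffix, matching the `fileset-%d-0-info.db` style expectations updated in source_data_test.go further down. A minimal sketch, assuming the `fileset-<blockStartNanos>-<volumeIndex>-<suffix>.db` layout; volumedFilesetPath is a hypothetical stand-in for dataFilesetPathFromTimeAndIndex, not the real helper:

package main

import (
	"fmt"
	"path/filepath"
	"time"
)

// volumedFilesetPath sketches the on-disk name of a flush fileset file after
// this change: block start in nanoseconds, then the volume index. The legacy
// format omitted the volume (fileset-<blockStartNanos>-<suffix>.db).
func volumedFilesetPath(shardDir string, blockStart time.Time, volume int, suffix string) string {
	name := fmt.Sprintf("fileset-%d-%d-%s.db", blockStart.UnixNano(), volume, suffix)
	return filepath.Join(shardDir, name)
}

func main() {
	blockStart := time.Date(2019, 1, 1, 0, 0, 0, 0, time.UTC)
	// Volume 0 is the warm flush; later cold flushes write volumes 1, 2, ...
	fmt.Println(volumedFilesetPath("/var/lib/m3db/data/ns/0", blockStart, 0, "data"))
}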
func (w *writer) Open(opts DataWriterOpenOptions) error { var ( - nextSnapshotIndex int - err error - namespace = opts.Identifier.Namespace - shard = opts.Identifier.Shard - blockStart = opts.Identifier.BlockStart + err error + namespace = opts.Identifier.Namespace + shard = opts.Identifier.Shard + blockStart = opts.Identifier.BlockStart + nextVolumeIndex = opts.Identifier.VolumeIndex ) w.blockSize = opts.BlockSize @@ -178,27 +178,26 @@ func (w *writer) Open(opts DataWriterOpenOptions) error { return err } - nextSnapshotIndex = opts.Identifier.VolumeIndex - w.checkpointFilePath = filesetPathFromTimeAndIndex(shardDir, blockStart, nextSnapshotIndex, checkpointFileSuffix) - infoFilepath = filesetPathFromTimeAndIndex(shardDir, blockStart, nextSnapshotIndex, infoFileSuffix) - indexFilepath = filesetPathFromTimeAndIndex(shardDir, blockStart, nextSnapshotIndex, indexFileSuffix) - summariesFilepath = filesetPathFromTimeAndIndex(shardDir, blockStart, nextSnapshotIndex, summariesFileSuffix) - bloomFilterFilepath = filesetPathFromTimeAndIndex(shardDir, blockStart, nextSnapshotIndex, bloomFilterFileSuffix) - dataFilepath = filesetPathFromTimeAndIndex(shardDir, blockStart, nextSnapshotIndex, dataFileSuffix) - digestFilepath = filesetPathFromTimeAndIndex(shardDir, blockStart, nextSnapshotIndex, digestFileSuffix) + w.checkpointFilePath = filesetPathFromTimeAndIndex(shardDir, blockStart, nextVolumeIndex, checkpointFileSuffix) + infoFilepath = filesetPathFromTimeAndIndex(shardDir, blockStart, nextVolumeIndex, infoFileSuffix) + indexFilepath = filesetPathFromTimeAndIndex(shardDir, blockStart, nextVolumeIndex, indexFileSuffix) + summariesFilepath = filesetPathFromTimeAndIndex(shardDir, blockStart, nextVolumeIndex, summariesFileSuffix) + bloomFilterFilepath = filesetPathFromTimeAndIndex(shardDir, blockStart, nextVolumeIndex, bloomFilterFileSuffix) + dataFilepath = filesetPathFromTimeAndIndex(shardDir, blockStart, nextVolumeIndex, dataFileSuffix) + digestFilepath = filesetPathFromTimeAndIndex(shardDir, blockStart, nextVolumeIndex, digestFileSuffix) case persist.FileSetFlushType: shardDir = ShardDataDirPath(w.filePathPrefix, namespace, shard) if err := os.MkdirAll(shardDir, w.newDirectoryMode); err != nil { return err } - w.checkpointFilePath = filesetPathFromTime(shardDir, blockStart, checkpointFileSuffix) - infoFilepath = filesetPathFromTime(shardDir, blockStart, infoFileSuffix) - indexFilepath = filesetPathFromTime(shardDir, blockStart, indexFileSuffix) - summariesFilepath = filesetPathFromTime(shardDir, blockStart, summariesFileSuffix) - bloomFilterFilepath = filesetPathFromTime(shardDir, blockStart, bloomFilterFileSuffix) - dataFilepath = filesetPathFromTime(shardDir, blockStart, dataFileSuffix) - digestFilepath = filesetPathFromTime(shardDir, blockStart, digestFileSuffix) + w.checkpointFilePath = dataFilesetPathFromTimeAndIndex(shardDir, blockStart, nextVolumeIndex, checkpointFileSuffix, false) + infoFilepath = dataFilesetPathFromTimeAndIndex(shardDir, blockStart, nextVolumeIndex, infoFileSuffix, false) + indexFilepath = dataFilesetPathFromTimeAndIndex(shardDir, blockStart, nextVolumeIndex, indexFileSuffix, false) + summariesFilepath = dataFilesetPathFromTimeAndIndex(shardDir, blockStart, nextVolumeIndex, summariesFileSuffix, false) + bloomFilterFilepath = dataFilesetPathFromTimeAndIndex(shardDir, blockStart, nextVolumeIndex, bloomFilterFileSuffix, false) + dataFilepath = dataFilesetPathFromTimeAndIndex(shardDir, blockStart, nextVolumeIndex, dataFileSuffix, false) + digestFilepath = 
dataFilesetPathFromTimeAndIndex(shardDir, blockStart, nextVolumeIndex, digestFileSuffix, false) default: return fmt.Errorf("unable to open writer with fileset type: %s", opts.FileSetType) } diff --git a/src/dbnode/persist/types.go b/src/dbnode/persist/types.go index 4cd178f69f..4912883225 100644 --- a/src/dbnode/persist/types.go +++ b/src/dbnode/persist/types.go @@ -136,19 +136,16 @@ type DataPrepareOptions struct { NamespaceMetadata namespace.Metadata BlockStart time.Time Shard uint32 - FileSetType FileSetType - DeleteIfExists bool + // This volume index is only used when preparing for a flush fileset type. + // When opening a snapshot, the new volume index is determined by looking + // at what files exist on disk. + VolumeIndex int + FileSetType FileSetType + DeleteIfExists bool // Snapshot options are applicable to snapshots (index yes, data yes) Snapshot DataPrepareSnapshotOptions } -// DataPrepareVolumeOptions is the options struct for the prepare method that contains -// information specific to read/writing filesets that have multiple volumes (such as -// snapshots and index file sets). -type DataPrepareVolumeOptions struct { - VolumeIndex int -} - // IndexPrepareOptions is the options struct for the IndexFlush's Prepare method. // nolint: maligned type IndexPrepareOptions struct { diff --git a/src/dbnode/storage/bootstrap/bootstrapper/fs/source_data_test.go b/src/dbnode/storage/bootstrap/bootstrapper/fs/source_data_test.go index cc3258c97c..95ca8f394e 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/fs/source_data_test.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/fs/source_data_test.go @@ -107,19 +107,19 @@ func createTempDir(t *testing.T) string { func writeInfoFile(t *testing.T, prefix string, namespace ident.ID, shard uint32, start time.Time, data []byte) { shardDir := fs.ShardDataDirPath(prefix, namespace, shard) - filePath := path.Join(shardDir, fmt.Sprintf("fileset-%d-info.db", xtime.ToNanoseconds(start))) + filePath := path.Join(shardDir, fmt.Sprintf("fileset-%d-0-info.db", xtime.ToNanoseconds(start))) writeFile(t, filePath, data) } func writeDataFile(t *testing.T, prefix string, namespace ident.ID, shard uint32, start time.Time, data []byte) { shardDir := fs.ShardDataDirPath(prefix, namespace, shard) - filePath := path.Join(shardDir, fmt.Sprintf("fileset-%d-data.db", xtime.ToNanoseconds(start))) + filePath := path.Join(shardDir, fmt.Sprintf("fileset-%d-0-data.db", xtime.ToNanoseconds(start))) writeFile(t, filePath, data) } func writeDigestFile(t *testing.T, prefix string, namespace ident.ID, shard uint32, start time.Time, data []byte) { shardDir := fs.ShardDataDirPath(prefix, namespace, shard) - filePath := path.Join(shardDir, fmt.Sprintf("fileset-%d-digest.db", xtime.ToNanoseconds(start))) + filePath := path.Join(shardDir, fmt.Sprintf("fileset-%d-0-digest.db", xtime.ToNanoseconds(start))) writeFile(t, filePath, data) } diff --git a/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go b/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go index 79ff385f92..5903c0d38a 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go @@ -386,6 +386,14 @@ func (s *peersSource) flush( FileSetType: persistConfig.FileSetType, Shard: shard, BlockStart: start, + // When bootstrapping, the volume index will always be 0. However, + // if we want to be able to snapshot and flush while bootstrapping, + // this may not be the case, e.g.
if a flush occurs before a + // bootstrap, then the bootstrap volume index will be >0. In order + // to support this, bootstrapping code will need to incorporate + // merging logic, and the flush version/volume index will need to be + // synchronized between processes. + VolumeIndex: 0, // If we've peer bootstrapped this shard/block combination AND the fileset // already exists on disk, then that means either: // 1) The Filesystem bootstrapper was unable to bootstrap the fileset diff --git a/src/dbnode/storage/cleanup.go b/src/dbnode/storage/cleanup.go index e5f0daf325..00cf1a418f 100644 --- a/src/dbnode/storage/cleanup.go +++ b/src/dbnode/storage/cleanup.go @@ -134,7 +134,7 @@ func (m *cleanupManager) Cleanup(t time.Time) error { }() multiErr := xerrors.NewMultiError() - if err := m.cleanupExpiredDataFiles(t); err != nil { + if err := m.cleanupDataFiles(t); err != nil { multiErr = multiErr.Add(fmt.Errorf( "encountered errors when cleaning up data files for %v: %v", t, err)) } @@ -227,7 +227,7 @@ func (m *cleanupManager) deleteInactiveDataFileSetFiles(filesetFilesDirPathFn fu return multiErr.FinalError() } -func (m *cleanupManager) cleanupExpiredDataFiles(t time.Time) error { +func (m *cleanupManager) cleanupDataFiles(t time.Time) error { multiErr := xerrors.NewMultiError() namespaces, err := m.database.GetOwnedNamespaces() if err != nil { @@ -240,6 +240,7 @@ earliestToRetain := retention.FlushTimeStart(n.Options().RetentionOptions(), t) shards := n.GetOwnedShards() multiErr = multiErr.Add(m.cleanupExpiredNamespaceDataFiles(earliestToRetain, shards)) + multiErr = multiErr.Add(m.cleanupCompactedNamespaceDataFiles(shards)) } return multiErr.FinalError() } @@ -275,6 +276,17 @@ func (m *cleanupManager) cleanupExpiredNamespaceDataFiles(earliestToRetain time. return multiErr.FinalError() } +func (m *cleanupManager) cleanupCompactedNamespaceDataFiles(shards []databaseShard) error { + multiErr := xerrors.NewMultiError() + for _, shard := range shards { + if err := shard.CleanupCompactedFileSets(); err != nil { + multiErr = multiErr.Add(err) + } + } + + return multiErr.FinalError() +} + // The goal of the cleanupSnapshotsAndCommitlogs function is to delete all snapshot files, snapshot metadata
// According to the snapshotting / commitlog rotation logic, the files that are required for a complete diff --git a/src/dbnode/storage/cleanup_test.go b/src/dbnode/storage/cleanup_test.go index 7b5729b92b..cdecdf9008 100644 --- a/src/dbnode/storage/cleanup_test.go +++ b/src/dbnode/storage/cleanup_test.go @@ -286,6 +286,8 @@ func TestCleanupManagerCleanupCommitlogsAndSnapshots(t *testing.T) { shard := NewMockdatabaseShard(ctrl) shard.EXPECT().ID().Return(uint32(i)).AnyTimes() shard.EXPECT().CleanupExpiredFileSets(gomock.Any()).Return(nil).AnyTimes() + shard.EXPECT().CleanupCompactedFileSets().Return(nil).AnyTimes() + shards = append(shards, shard) } @@ -405,6 +407,7 @@ func TestCleanupDataAndSnapshotFileSetFiles(t *testing.T) { shard := NewMockdatabaseShard(ctrl) expectedEarliestToRetain := retention.FlushTimeStart(ns.Options().RetentionOptions(), ts) shard.EXPECT().CleanupExpiredFileSets(expectedEarliestToRetain).Return(nil) + shard.EXPECT().CleanupCompactedFileSets().Return(nil) shard.EXPECT().ID().Return(uint32(0)).AnyTimes() ns.EXPECT().GetOwnedShards().Return([]databaseShard{shard}).AnyTimes() ns.EXPECT().ID().Return(ident.StringID("nsID")).AnyTimes() diff --git a/src/dbnode/storage/namespace_readers.go b/src/dbnode/storage/namespace_readers.go index 727f6a610e..b299726765 100644 --- a/src/dbnode/storage/namespace_readers.go +++ b/src/dbnode/storage/namespace_readers.go @@ -24,8 +24,8 @@ import ( "sync" "time" - "github.com/m3db/m3/src/dbnode/persist/fs" "github.com/m3db/m3/src/dbnode/namespace" + "github.com/m3db/m3/src/dbnode/persist/fs" "github.com/m3db/m3/src/x/ident" "github.com/m3db/m3/src/x/pool" xtime "github.com/m3db/m3/src/x/time" @@ -76,11 +76,12 @@ type databaseNamespaceReaderManager interface { close() } -type fsFileSetExistsAtFn func( +type fsFileSetExistsFn func( prefix string, namespace ident.ID, shard uint32, blockStart time.Time, + volume int, ) (bool, error) type fsNewReaderFn func( @@ -91,8 +92,8 @@ type fsNewReaderFn func( type namespaceReaderManager struct { sync.Mutex - filesetExistsAtFn fsFileSetExistsAtFn - newReaderFn fsNewReaderFn + filesetExistsFn fsFileSetExistsFn + newReaderFn fsNewReaderFn namespace namespace.Metadata fsOpts fs.Options @@ -109,6 +110,7 @@ type namespaceReaderManager struct { type cachedOpenReaderKey struct { shard uint32 blockStart xtime.UnixNano + volume int position readerPosition } @@ -149,14 +151,14 @@ func newNamespaceReaderManager( opts Options, ) databaseNamespaceReaderManager { return &namespaceReaderManager{ - filesetExistsAtFn: fs.DataFileSetExistsAt, - newReaderFn: fs.NewReader, - namespace: namespace, - fsOpts: opts.CommitLogOptions().FilesystemOptions(), - bytesPool: opts.BytesPool(), - logger: opts.InstrumentOptions().Logger(), - openReaders: make(map[cachedOpenReaderKey]cachedReader), - metrics: newNamespaceReaderManagerMetrics(namespaceScope), + filesetExistsFn: fs.DataFileSetExists, + newReaderFn: fs.NewReader, + namespace: namespace, + fsOpts: opts.CommitLogOptions().FilesystemOptions(), + bytesPool: opts.BytesPool(), + logger: opts.InstrumentOptions().Logger(), + openReaders: make(map[cachedOpenReaderKey]cachedReader), + metrics: newNamespaceReaderManagerMetrics(namespaceScope), } } @@ -164,8 +166,10 @@ func (m *namespaceReaderManager) filesetExistsAt( shard uint32, blockStart time.Time, ) (bool, error) { - return m.filesetExistsAtFn(m.fsOpts.FilePathPrefix(), - m.namespace.ID(), shard, blockStart) + // TODO(juchan): get the actual volume here. 
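The TODO above leaves the reader manager pinned to volume 0 for now. As a hedged sketch only (latestVolumeFor is hypothetical and not part of this change), resolving the actual volume could reuse fs.DataFiles and the FileSetFile identifiers that shard.go relies on further down:

package example

import (
	"time"

	"github.com/m3db/m3/src/dbnode/persist/fs"
	"github.com/m3db/m3/src/x/ident"
)

// latestVolumeFor returns the highest volume index found on disk for the
// given block start, defaulting to 0 when only a warm flush exists.
func latestVolumeFor(prefix string, ns ident.ID, shard uint32, blockStart time.Time) (int, error) {
	filesets, err := fs.DataFiles(prefix, ns, shard)
	if err != nil {
		return 0, err
	}
	vol := 0
	for _, fileset := range filesets {
		id := fileset.ID
		if id.BlockStart.Equal(blockStart) && id.VolumeIndex > vol {
			vol = id.VolumeIndex
		}
	}
	return vol, nil
}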
+ vol := 0 + return m.filesetExistsFn(m.fsOpts.FilePathPrefix(), + m.namespace.ID(), shard, blockStart, vol) } type cachedReaderForKeyResult struct { @@ -251,11 +255,14 @@ func (m *namespaceReaderManager) get( // We have a closed reader from the cache (either a cached closed // reader or newly allocated, either way need to prepare it) reader := lookup.closedReader + // TODO(juchan): get the actual volume here. + vol := 0 openOpts := fs.DataReaderOpenOptions{ Identifier: fs.FileSetFileIdentifier{ - Namespace: m.namespace.ID(), - Shard: shard, - BlockStart: blockStart, + Namespace: m.namespace.ID(), + Shard: shard, + BlockStart: blockStart, + VolumeIndex: vol, }, } if err := reader.Open(openOpts); err != nil { @@ -303,6 +310,7 @@ func (m *namespaceReaderManager) put(reader fs.DataFileSetReader) { key := cachedOpenReaderKey{ shard: status.Shard, blockStart: xtime.ToUnixNano(status.BlockStart), + volume: status.Volume, position: readerPosition{ dataIdx: reader.EntriesRead(), metadataIdx: reader.MetadataRead(), diff --git a/src/dbnode/storage/shard.go b/src/dbnode/storage/shard.go index 26ea4373fb..345c72e6bf 100644 --- a/src/dbnode/storage/shard.go +++ b/src/dbnode/storage/shard.go @@ -73,7 +73,13 @@ var ( errNewShardEntryTagsIterNotAtIndexZero = errors.New("new shard entry options error: tags iter not at index zero") ) -type filesetBeforeFn func( +type filesetsFn func( + filePathPrefix string, + namespace ident.ID, + shardID uint32, +) (fs.FileSetFilesSlice, error) + +type filesetPathsBeforeFn func( filePathPrefix string, namespace ident.ID, shardID uint32, @@ -150,7 +156,8 @@ type dbShard struct { bootstrapState BootstrapState newMergerFn fs.NewMergerFn newFSMergeWithMemFn newFSMergeWithMemFn - filesetBeforeFn filesetBeforeFn + filesetsFn filesetsFn + filesetPathsBeforeFn filesetPathsBeforeFn deleteFilesFn deleteFilesFn snapshotFilesFn snapshotFilesFn sleepFn func(time.Duration) @@ -247,30 +254,31 @@ func newDatabaseShard( SubScope("dbshard") s := &dbShard{ - opts: opts, - seriesOpts: seriesOpts, - nowFn: opts.ClockOptions().NowFn(), - state: dbShardStateOpen, - namespace: namespaceMetadata, - shard: shard, - namespaceReaderMgr: namespaceReaderMgr, - increasingIndex: increasingIndex, - seriesPool: opts.DatabaseSeriesPool(), - reverseIndex: reverseIndex, - lookup: newShardMap(shardMapOptions{}), - list: list.New(), - newMergerFn: fs.NewMerger, - newFSMergeWithMemFn: newFSMergeWithMem, - filesetBeforeFn: fs.DataFileSetsBefore, - deleteFilesFn: fs.DeleteFiles, - snapshotFilesFn: fs.SnapshotFiles, - sleepFn: time.Sleep, - identifierPool: opts.IdentifierPool(), - contextPool: opts.ContextPool(), - flushState: newShardFlushState(), - tickWg: &sync.WaitGroup{}, - logger: opts.InstrumentOptions().Logger(), - metrics: newDatabaseShardMetrics(shard, scope), + opts: opts, + seriesOpts: seriesOpts, + nowFn: opts.ClockOptions().NowFn(), + state: dbShardStateOpen, + namespace: namespaceMetadata, + shard: shard, + namespaceReaderMgr: namespaceReaderMgr, + increasingIndex: increasingIndex, + seriesPool: opts.DatabaseSeriesPool(), + reverseIndex: reverseIndex, + lookup: newShardMap(shardMapOptions{}), + list: list.New(), + newMergerFn: fs.NewMerger, + newFSMergeWithMemFn: newFSMergeWithMem, + filesetsFn: fs.DataFiles, + filesetPathsBeforeFn: fs.DataFileSetsBefore, + deleteFilesFn: fs.DeleteFiles, + snapshotFilesFn: fs.SnapshotFiles, + sleepFn: time.Sleep, + identifierPool: opts.IdentifierPool(), + contextPool: opts.ContextPool(), + flushState: newShardFlushState(), + tickWg: &sync.WaitGroup{}, + logger: 
opts.InstrumentOptions().Logger(), + metrics: newDatabaseShardMetrics(shard, scope), } s.insertQueue = newDatabaseShardInsertQueue(s.insertSeriesBatch, s.nowFn, scope) @@ -1890,10 +1898,13 @@ func (s *dbShard) WarmFlush( NamespaceMetadata: s.namespace, Shard: s.ID(), BlockStart: blockStart, + // Volume index is always 0 for warm flushes because a warm flush must + // happen before any cold flushes can occur. + VolumeIndex: 0, // We explicitly set delete if exists to false here as we track which - // filesets exists at bootstrap time so we should never encounter a time - // when we attempt to flush and a fileset already exists unless there is - // racing competing processes. + // filesets exist at bootstrap time so we should never encounter a time + // where a fileset already exists when we attempt to flush unless there + // is a bug in the code. DeleteIfExists: false, } prepared, err := flushPreparer.PrepareData(prepareOpts) @@ -1992,24 +2003,25 @@ func (s *dbShard) ColdFlush( // a block, we continue to try persisting other blocks. for blockStart := range dirtySeriesToWrite { startTime := blockStart.ToTime() + coldVersion := s.RetrievableBlockColdVersion(startTime) fsID := fs.FileSetFileIdentifier{ - Namespace: s.namespace.ID(), - Shard: s.ID(), - BlockStart: startTime, + Namespace: s.namespace.ID(), + Shard: s.ID(), + BlockStart: startTime, + VolumeIndex: coldVersion, } - err := merger.Merge(fsID, mergeWithMem, flushPreparer, nsCtx) + nextVersion := coldVersion + 1 + err := merger.Merge(fsID, mergeWithMem, nextVersion, flushPreparer, nsCtx) if err != nil { multiErr = multiErr.Add(err) continue } // After writing the full block successfully, update the cold version - // in the flush state. - nextVersion := s.RetrievableBlockColdVersion(startTime) + 1 - - // Once this function completes block leasers will no longer be able to acquire - // leases on previous volumes for the given namespace/shard/blockstart. + // in the flush state. Once this function completes, block leasers will + // no longer be able to acquire leases on previous volumes for the given + // namespace/shard/blockstart.
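A worked trace of the volume arithmetic above, under the assumption that RetrievableBlockColdVersion reports 0 for a block once only the warm flush (volume 0) exists; this is an illustrative sketch, not code from the diff:

package main

import "fmt"

func main() {
	coldVersion := 0 // assumed starting point: only the warm flush (volume 0) has run
	for flush := 1; flush <= 2; flush++ {
		nextVersion := coldVersion + 1
		// Merge reads the fileset at coldVersion, writes a new fileset at
		// nextVersion, and on success the flush state advances to nextVersion.
		fmt.Printf("cold flush %d: read volume %d, write volume %d\n",
			flush, coldVersion, nextVersion)
		coldVersion = nextVersion
	}
}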
s.setFlushStateColdVersion(startTime, nextVersion) // Notify all block leasers that a new volume for the namespace/shard/blockstart @@ -2156,18 +2168,36 @@ func (s *dbShard) removeAnyFlushStatesTooEarly(tickStart time.Time) { func (s *dbShard) CleanupExpiredFileSets(earliestToRetain time.Time) error { filePathPrefix := s.opts.CommitLogOptions().FilesystemOptions().FilePathPrefix() - multiErr := xerrors.NewMultiError() - expired, err := s.filesetBeforeFn(filePathPrefix, s.namespace.ID(), s.ID(), earliestToRetain) + expired, err := s.filesetPathsBeforeFn(filePathPrefix, s.namespace.ID(), s.ID(), earliestToRetain) if err != nil { - detailedErr := - fmt.Errorf("encountered errors when getting fileset files for prefix %s namespace %s shard %d: %v", - filePathPrefix, s.namespace.ID(), s.ID(), err) - multiErr = multiErr.Add(detailedErr) + return fmt.Errorf("encountered errors when getting fileset files for prefix %s namespace %s shard %d: %v", + filePathPrefix, s.namespace.ID(), s.ID(), err) } - if err := s.deleteFilesFn(expired); err != nil { - multiErr = multiErr.Add(err) + + return s.deleteFilesFn(expired) +} + +func (s *dbShard) CleanupCompactedFileSets() error { + filePathPrefix := s.opts.CommitLogOptions().FilesystemOptions().FilePathPrefix() + filesets, err := s.filesetsFn(filePathPrefix, s.namespace.ID(), s.ID()) + if err != nil { + return fmt.Errorf("encountered errors when getting fileset files for prefix %s namespace %s shard %d: %v", + filePathPrefix, s.namespace.ID(), s.ID(), err) } - return multiErr.FinalError() + // Get a snapshot of all states here to prevent constantly getting/releasing + // locks in a tight loop below. This snapshot won't become stale halfway + // through this because flushing and cleanup never happen in parallel. + blockStates := s.BlockStatesSnapshot() + toDelete := fs.FileSetFilesSlice(make([]fs.FileSetFile, 0, len(filesets))) + for _, datafile := range filesets { + fileID := datafile.ID + blockState := blockStates[xtime.ToUnixNano(fileID.BlockStart)] + if fileID.VolumeIndex < blockState.ColdVersion { + toDelete = append(toDelete, datafile) + } + } + + return s.deleteFilesFn(toDelete.Filepaths()) } func (s *dbShard) Repair( diff --git a/src/dbnode/storage/shard_test.go b/src/dbnode/storage/shard_test.go index 00e037def4..9be40f9a03 100644 --- a/src/dbnode/storage/shard_test.go +++ b/src/dbnode/storage/shard_test.go @@ -425,6 +425,7 @@ type noopMerger struct{} func (m *noopMerger) Merge( fileID fs.FileSetFileIdentifier, mergeWith fs.MergeWith, + nextVersion int, flushPreparer persist.FlushPreparer, nsCtx namespace.Context, ) error { @@ -1136,7 +1137,7 @@ func TestShardCleanupExpiredFileSets(t *testing.T) { opts := DefaultTestOptions() shard := testDatabaseShard(t, opts) defer shard.Close() - shard.filesetBeforeFn = func(_ string, namespace ident.ID, shardID uint32, t time.Time) ([]string, error) { + shard.filesetPathsBeforeFn = func(_ string, namespace ident.ID, shardID uint32, t time.Time) ([]string, error) { return []string{namespace.String(), strconv.Itoa(int(shardID))}, nil } var deletedFiles []string diff --git a/src/dbnode/storage/storage_mock.go b/src/dbnode/storage/storage_mock.go index 4c048812fb..4431f7bdb0 100644 --- a/src/dbnode/storage/storage_mock.go +++ b/src/dbnode/storage/storage_mock.go @@ -1789,6 +1789,20 @@ func (mr *MockdatabaseShardMockRecorder) CleanupExpiredFileSets(earliestToRetain return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CleanupExpiredFileSets", reflect.TypeOf((*MockdatabaseShard)(nil).CleanupExpiredFileSets), 
earliestToRetain) } +// CleanupCompactedFileSets mocks base method +func (m *MockdatabaseShard) CleanupCompactedFileSets() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CleanupCompactedFileSets") + ret0, _ := ret[0].(error) + return ret0 +} + +// CleanupCompactedFileSets indicates an expected call of CleanupCompactedFileSets +func (mr *MockdatabaseShardMockRecorder) CleanupCompactedFileSets() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CleanupCompactedFileSets", reflect.TypeOf((*MockdatabaseShard)(nil).CleanupCompactedFileSets)) +} + // Repair mocks base method func (m *MockdatabaseShard) Repair(ctx context.Context, nsCtx namespace.Context, tr time0.Range, repairer databaseShardRepairer) (repair.MetadataComparisonResult, error) { m.ctrl.T.Helper() diff --git a/src/dbnode/storage/types.go b/src/dbnode/storage/types.go index 78e02a3aa4..088c90fbdd 100644 --- a/src/dbnode/storage/types.go +++ b/src/dbnode/storage/types.go @@ -494,6 +494,11 @@ type databaseShard interface { // CleanupExpiredFileSets removes expired fileset files. CleanupExpiredFileSets(earliestToRetain time.Time) error + // CleanupCompactedFileSets removes fileset files that have been compacted, + // meaning that there exists a more recent, superset, fully persisted + // fileset for that block. + CleanupCompactedFileSets() error + // Repair repairs the shard data for a given time. Repair( ctx context.Context,
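Finally, to summarize the rule that CleanupCompactedFileSets implements in shard.go above, here is a self-contained sketch using simplified stand-in types rather than the real fs types: a data fileset volume is safe to delete once the block's cold version has moved past it, because a more recent volume then supersedes it.

package main

import (
	"fmt"
	"time"
)

type filesetID struct {
	blockStart time.Time
	volume     int
}

// compactedFileSets mirrors the deletion predicate above: a fileset is
// compacted (and deletable) when its volume index is lower than the cold
// version recorded for its block; blocks with no recorded version keep
// everything, since the map's zero value is 0.
func compactedFileSets(filesets []filesetID, coldVersions map[time.Time]int) []filesetID {
	var toDelete []filesetID
	for _, f := range filesets {
		if f.volume < coldVersions[f.blockStart] {
			toDelete = append(toDelete, f)
		}
	}
	return toDelete
}

func main() {
	block := time.Date(2019, 1, 1, 0, 0, 0, 0, time.UTC)
	filesets := []filesetID{{block, 0}, {block, 1}}
	// Cold version 1 means volume 1 is the latest complete fileset, so the
	// compacted volume 0 can be removed while volume 1 is retained.
	fmt.Println(compactedFileSets(filesets, map[time.Time]int{block: 1}))
}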