diff --git a/src/dbnode/integration/disk_flush_helpers.go b/src/dbnode/integration/disk_flush_helpers.go
index d550fe8198..def6fce34e 100644
--- a/src/dbnode/integration/disk_flush_helpers.go
+++ b/src/dbnode/integration/disk_flush_helpers.go
@@ -46,23 +46,59 @@ var (
 	errDiskFlushTimedOut = errors.New("flushing data to disk took too long")
 )
 
+type snapshotID struct {
+	blockStart time.Time
+	minVolume  int
+}
+
+func getLatestSnapshotVolumeIndex(
+	filePathPrefix string,
+	shardSet sharding.ShardSet,
+	namespace ident.ID,
+	blockStart time.Time,
+) int {
+	latestVolumeIndex := -1
+
+	for _, shard := range shardSet.AllIDs() {
+		snapshotFiles, err := fs.SnapshotFiles(
+			filePathPrefix, namespace, shard)
+		if err != nil {
+			panic(err)
+		}
+		latestSnapshot, ok := snapshotFiles.LatestVolumeForBlock(blockStart)
+		if !ok {
+			continue
+		}
+		if latestSnapshot.ID.VolumeIndex > latestVolumeIndex {
+			latestVolumeIndex = latestSnapshot.ID.VolumeIndex
+		}
+	}
+
+	return latestVolumeIndex
+}
+
 func waitUntilSnapshotFilesFlushed(
 	filePathPrefix string,
 	shardSet sharding.ShardSet,
 	namespace ident.ID,
-	expectedSnapshotTimes []time.Time,
+	expectedSnapshots []snapshotID,
 	timeout time.Duration,
 ) error {
 	dataFlushed := func() bool {
 		for _, shard := range shardSet.AllIDs() {
-			for _, t := range expectedSnapshotTimes {
-				exists, err := fs.SnapshotFileSetExistsAt(
-					filePathPrefix, namespace, shard, t)
+			for _, e := range expectedSnapshots {
+				snapshotFiles, err := fs.SnapshotFiles(
+					filePathPrefix, namespace, shard)
 				if err != nil {
 					panic(err)
 				}
-				if !exists {
+
+				latest, ok := snapshotFiles.LatestVolumeForBlock(e.blockStart)
+				if !ok {
+					return false
+				}
+
+				if !(latest.ID.VolumeIndex >= e.minVolume) {
 					return false
 				}
 			}
@@ -202,18 +238,17 @@ func verifySnapshottedDataFiles(
 	shardSet sharding.ShardSet,
 	storageOpts storage.Options,
 	namespace ident.ID,
-	snapshotTime time.Time,
 	seriesMaps map[xtime.UnixNano]generate.SeriesBlock,
 ) {
 	fsOpts := storageOpts.CommitLogOptions().FilesystemOptions()
 	reader, err := fs.NewReader(storageOpts.BytesPool(), fsOpts)
 	require.NoError(t, err)
 	iteratorPool := storageOpts.ReaderIteratorPool()
-	for _, ns := range testNamespaces {
-		for _, seriesList := range seriesMaps {
-			verifyForTime(
-				t, storageOpts, reader, shardSet, iteratorPool, snapshotTime,
-				ns, persist.FileSetSnapshotType, seriesList)
-		}
+	for blockStart, seriesList := range seriesMaps {
+
+		verifyForTime(
+			t, storageOpts, reader, shardSet, iteratorPool, blockStart.ToTime(),
+			namespace, persist.FileSetSnapshotType, seriesList)
 	}
+
 }
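
For context on the helper change above: instead of asserting that a snapshot file merely exists for a block start, the tests now record the highest snapshot volume index per block and wait for it to advance past a recorded minimum. Below is a minimal, self-contained sketch of that polling pattern; the in-memory volumes map and latestVolume function are hypothetical stand-ins for the fs.SnapshotFiles / LatestVolumeForBlock lookup, and the loop mirrors what waitUntilSnapshotFilesFlushed runs per shard.

package main

import (
	"fmt"
	"sync"
	"time"
)

type snapshotID struct {
	blockStart time.Time
	minVolume  int
}

// volumes is a hypothetical in-memory stand-in for the snapshot files on
// disk: block start -> highest volume index written so far.
var (
	mu      sync.Mutex
	volumes = map[time.Time]int{}
)

// latestVolume plays the role of snapshotFiles.LatestVolumeForBlock: it
// returns the highest volume index for blockStart and whether one exists.
func latestVolume(blockStart time.Time) (int, bool) {
	mu.Lock()
	defer mu.Unlock()
	v, ok := volumes[blockStart]
	return v, ok
}

// waitUntilSnapshotsFlushed polls until every expected snapshot has reached
// its minimum volume index, or the timeout elapses.
func waitUntilSnapshotsFlushed(expected []snapshotID, timeout time.Duration) error {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		flushed := true
		for _, e := range expected {
			latest, ok := latestVolume(e.blockStart)
			if !ok || latest < e.minVolume {
				flushed = false
				break
			}
		}
		if flushed {
			return nil
		}
		time.Sleep(10 * time.Millisecond)
	}
	return fmt.Errorf("timed out waiting for snapshots")
}

func main() {
	block := time.Now().Truncate(time.Hour)

	// Record the current volume (none yet, so -1) and wait for the next one.
	minVolume := -1
	if v, ok := latestVolume(block); ok {
		minVolume = v
	}
	expected := []snapshotID{{blockStart: block, minVolume: minVolume + 1}}

	// Simulate a background snapshot process writing volume 0.
	go func() {
		time.Sleep(50 * time.Millisecond)
		mu.Lock()
		volumes[block] = 0
		mu.Unlock()
	}()

	fmt.Println(waitUntilSnapshotsFlushed(expected, time.Second)) // <nil>
}
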
diff --git a/src/dbnode/integration/disk_snapshot_test.go b/src/dbnode/integration/disk_snapshot_test.go
index 842554b5e4..8d5dc287da 100644
--- a/src/dbnode/integration/disk_snapshot_test.go
+++ b/src/dbnode/integration/disk_snapshot_test.go
@@ -39,7 +39,19 @@ func TestDiskSnapshotSimple(t *testing.T) {
 		t.SkipNow() // Just skip if we're doing a short run
 	}
 	// Test setup
-	nOpts := namespace.NewOptions().SetSnapshotEnabled(true)
+	var (
+		nOpts = namespace.NewOptions().
+			SetSnapshotEnabled(true)
+		bufferPast   = 50 * time.Minute
+		bufferFuture = 50 * time.Minute
+		blockSize    = time.Hour
+	)
+
+	nOpts = nOpts.
+		SetRetentionOptions(nOpts.RetentionOptions().
+			SetBufferFuture(bufferFuture).
+			SetBufferPast(bufferPast).
+			SetBlockSize(blockSize))
 	md1, err := namespace.NewMetadata(testNamespaces[0], nOpts)
 	require.NoError(t, err)
 	md2, err := namespace.NewMetadata(testNamespaces[1], nOpts)
@@ -52,8 +64,7 @@ func TestDiskSnapshotSimple(t *testing.T) {
 	require.NoError(t, err)
 	defer testSetup.close()
 
-	blockSize := md1.Options().RetentionOptions().BlockSize()
-	filePathPrefix := testSetup.storageOpts.CommitLogOptions().FilesystemOptions().FilePathPrefix()
+	shardSet := testSetup.shardSet
 
 	// Start the server
 	log := testSetup.storageOpts.InstrumentOptions().Logger()
@@ -68,39 +79,99 @@ func TestDiskSnapshotSimple(t *testing.T) {
 	}()
 
 	// Write test data
-	now := testSetup.getNowFn()
-	seriesMaps := make(map[xtime.UnixNano]generate.SeriesBlock)
-	inputData := []generate.BlockConfig{
-		{IDs: []string{"foo", "bar", "baz"}, NumPoints: 100, Start: now},
-	}
+	var (
+		currBlock                         = testSetup.getNowFn().Truncate(blockSize)
+		now                               = currBlock.Add(11 * time.Minute)
+		assertTimeAllowsWritesToAllBlocks = func(ti time.Time) {
+			// Make sure now is within bufferPast of the previous block
+			require.True(t, ti.Before(ti.Truncate(blockSize).Add(bufferPast)))
+			// Make sure now is within bufferFuture of the next block
+			require.True(t, ti.After(ti.Truncate(blockSize).Add(blockSize).Add(-bufferFuture)))
+		}
+	)
+
+	assertTimeAllowsWritesToAllBlocks(now)
+	testSetup.setNowFn(now)
+
+	var (
+		seriesMaps = make(map[xtime.UnixNano]generate.SeriesBlock)
+		inputData  = []generate.BlockConfig{
+			// Writes in the previous block which should be mutable due to bufferPast
+			{IDs: []string{"foo", "bar", "baz"}, NumPoints: 5, Start: currBlock.Add(-10 * time.Minute)},
+			// Writes in the current block
+			{IDs: []string{"a", "b", "c"}, NumPoints: 30, Start: currBlock},
+			// Writes in the next block which should be mutable due to bufferFuture
+			{IDs: []string{"1", "2", "3"}, NumPoints: 30, Start: currBlock.Add(blockSize)},
+		}
+	)
 	for _, input := range inputData {
-		testSetup.setNowFn(input.Start)
 		testData := generate.Block(input)
-		seriesMaps[xtime.ToUnixNano(input.Start)] = testData
+		seriesMaps[xtime.ToUnixNano(input.Start.Truncate(blockSize))] = testData
 		for _, ns := range testSetup.namespaces {
 			require.NoError(t, testSetup.writeBatch(ns.ID(), testData))
 		}
 	}
 
-	now = testSetup.getNowFn().Add(blockSize).Add(-10 * time.Minute)
+	// Now that we've completed the writes, we need to make sure that we wait for
+	// the next snapshot to guarantee that it should contain all the writes. We do
+	// this by measuring the current highest snapshot volume index and then updating
+	// the time (to allow another snapshot process to occur due to the configurable
+	// minimum time between snapshots), and then waiting for the snapshot files with
+	// the measured volume index + 1.
+	var (
+		snapshotsToWaitForByNS = make([][]snapshotID, 0, len(testSetup.namespaces))
+		filePathPrefix         = testSetup.storageOpts.
+			CommitLogOptions().
+			FilesystemOptions().
+			FilePathPrefix()
+	)
+	for _, ns := range testSetup.namespaces {
+		snapshotsToWaitForByNS = append(snapshotsToWaitForByNS, []snapshotID{
+			{
+				blockStart: currBlock.Add(-blockSize),
+				minVolume: getLatestSnapshotVolumeIndex(
+					filePathPrefix, shardSet, ns.ID(), currBlock.Add(-blockSize)) + 1,
+			},
+			{
+				blockStart: currBlock,
+				minVolume: getLatestSnapshotVolumeIndex(
+					filePathPrefix, shardSet, ns.ID(), currBlock) + 1,
+			},
+			{
+				blockStart: currBlock.Add(blockSize),
+				minVolume: getLatestSnapshotVolumeIndex(
+					filePathPrefix, shardSet, ns.ID(), currBlock.Add(blockSize)) + 1,
+			},
+		})
+	}
+
+	now = testSetup.getNowFn().Add(2 * time.Minute)
+	assertTimeAllowsWritesToAllBlocks(now)
 	testSetup.setNowFn(now)
+
 	maxWaitTime := time.Minute
-	for _, ns := range testSetup.namespaces {
-		require.NoError(t, waitUntilSnapshotFilesFlushed(filePathPrefix, testSetup.shardSet, ns.ID(), []time.Time{now.Truncate(blockSize)}, maxWaitTime))
-		verifySnapshottedDataFiles(t, testSetup.shardSet, testSetup.storageOpts, ns.ID(), now.Truncate(blockSize), seriesMaps)
+	for i, ns := range testSetup.namespaces {
+		log.Info("waiting for snapshot files to flush")
+		require.NoError(t, waitUntilSnapshotFilesFlushed(
+			filePathPrefix, shardSet, ns.ID(), snapshotsToWaitForByNS[i], maxWaitTime))
+		log.Info("verifying snapshot files")
+		verifySnapshottedDataFiles(t, shardSet, testSetup.storageOpts, ns.ID(), seriesMaps)
 	}
 
-	oldTime := testSetup.getNowFn()
-	newTime := oldTime.Add(blockSize * 2)
+	var (
+		oldTime = testSetup.getNowFn()
+		newTime = oldTime.Add(blockSize * 2)
+	)
 	testSetup.setNowFn(newTime)
-	testSetup.sleepFor10xTickMinimumInterval()
 
 	for _, ns := range testSetup.namespaces {
-		// Make sure new snapshotfiles are written out
-		require.NoError(t, waitUntilSnapshotFilesFlushed(filePathPrefix, testSetup.shardSet, ns.ID(), []time.Time{newTime.Truncate(blockSize)}, maxWaitTime))
-		for _, shard := range testSetup.shardSet.All() {
+		log.Info("waiting for new snapshot files to be written out")
+		snapshotsToWaitFor := []snapshotID{{blockStart: newTime.Truncate(blockSize)}}
+		require.NoError(t, waitUntilSnapshotFilesFlushed(
+			filePathPrefix, shardSet, ns.ID(), snapshotsToWaitFor, maxWaitTime))
+		log.Info("waiting for old snapshot files to be deleted")
+		for _, shard := range shardSet.All() {
			waitUntil(func() bool {
-				// Make sure old snapshot files are deleted
 				exists, err := fs.SnapshotFileSetExistsAt(filePathPrefix, ns.ID(), shard.ID(), oldTime.Truncate(blockSize))
 				require.NoError(t, err)
 				return !exists
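
For concreteness, here is the window arithmetic this test relies on under its settings (blockSize of one hour, bufferPast and bufferFuture both 50 minutes). With the wall clock at currBlock + 11 minutes, now - bufferPast still falls inside the previous block and now + bufferFuture already falls inside the next block, so all three blocks accept writes at once. A small sketch of the same two checks assertTimeAllowsWritesToAllBlocks makes; the concrete dates are illustrative only.

package main

import (
	"fmt"
	"time"
)

func main() {
	var (
		blockSize    = time.Hour
		bufferPast   = 50 * time.Minute
		bufferFuture = 50 * time.Minute
		currBlock    = time.Date(2018, 7, 1, 14, 0, 0, 0, time.UTC) // 2:00PM
		now          = currBlock.Add(11 * time.Minute)              // 2:11PM
	)

	// 2:11PM - 50m = 1:21PM, which is inside the 1PM block: writes with
	// timestamps in the previous block are still accepted.
	prevBlockMutable := now.Before(now.Truncate(blockSize).Add(bufferPast))

	// 2:11PM + 50m = 3:01PM, which is inside the 3PM block: writes with
	// timestamps in the next block are already accepted.
	nextBlockMutable := now.After(now.Truncate(blockSize).Add(blockSize).Add(-bufferFuture))

	fmt.Println(prevBlockMutable, nextBlockMutable) // true true
}
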
diff --git a/src/dbnode/integration/fs_commitlog_snapshot_mixed_mode_read_write_prop_test.go b/src/dbnode/integration/fs_commitlog_snapshot_mixed_mode_read_write_prop_test.go
index 4949bcc735..b6926c29a8 100644
--- a/src/dbnode/integration/fs_commitlog_snapshot_mixed_mode_read_write_prop_test.go
+++ b/src/dbnode/integration/fs_commitlog_snapshot_mixed_mode_read_write_prop_test.go
@@ -236,7 +236,9 @@ func TestFsCommitLogMixedModeReadWriteProp(t *testing.T) {
 					filePathPrefix,
 					setup.shardSet,
 					nsID,
-					[]time.Time{snapshotBlock}, maxFlushWaitTime)
+					[]snapshotID{{blockStart: snapshotBlock}},
+					maxFlushWaitTime,
+				)
 				if err != nil {
 					return false, fmt.Errorf("error waiting for snapshot files: %s", err.Error())
 				}
"debug" metric for making sure that the snapshotting process + // is not overly aggressive. + maxBlocksSnapshottedByNamespace tally.Gauge } func newFlushManager(database database, scope tally.Scope) databaseFlushManager { opts := database.Options() return &flushManager{ - database: database, - opts: opts, - pm: opts.PersistManager(), - isFlushing: scope.Gauge("flush"), - isSnapshotting: scope.Gauge("snapshot"), - isIndexFlushing: scope.Gauge("index-flush"), + database: database, + opts: opts, + pm: opts.PersistManager(), + isFlushing: scope.Gauge("flush"), + isSnapshotting: scope.Gauge("snapshot"), + isIndexFlushing: scope.Gauge("index-flush"), + maxBlocksSnapshottedByNamespace: scope.Gauge("max-blocks-snapshotted-by-namespace"), } } @@ -102,6 +107,15 @@ func (m *flushManager) Flush( return err } + // Perform two separate loops through all the namespaces so that we can emit better + // gauges I.E all the flushing for all the namespaces happens at once and then all + // the snapshotting for all the namespaces happens at once. This is also slightly + // better semantically because flushing should take priority over snapshotting. + // + // In addition, we need to make sure that for any given shard/blockStart combination, + // we attempt a flush before a snapshot as the snapshotting process will attempt to + // snapshot any unflushed blocks which would be wasteful if the block is already + // flushable. multiErr := xerrors.NewMultiError() m.setState(flushManagerFlushInProgress) for _, ns := range namespaces { @@ -117,28 +131,36 @@ func (m *flushManager) Flush( multiErr = multiErr.Add(m.flushNamespaceWithTimes(ns, shardBootstrapTimes, flushTimes, flush)) } - // Perform two separate loops through all the namespaces so that we can emit better - // gauges I.E all the flushing for all the namespaces happens at once and then all - // the snapshotting for all the namespaces happens at once. This is also slightly - // better semantically because flushing should take priority over snapshotting. m.setState(flushManagerSnapshotInProgress) + maxBlocksSnapshottedByNamespace := 0 for _, ns := range namespaces { var ( - blockSize = ns.Options().RetentionOptions().BlockSize() - snapshotBlockStart = m.snapshotBlockStart(ns, tickStart) - prevBlockStart = snapshotBlockStart.Add(-blockSize) + snapshotBlockStarts = m.namespaceSnapshotTimes(ns, tickStart) + shardBootstrapTimes, ok = dbBootstrapStateAtTickStart.NamespaceBootstrapStates[ns.ID().String()] ) - // Only perform snapshots if the previous block (I.E the block directly before - // the block that we would snapshot) has been flushed. - if !ns.NeedsFlush(prevBlockStart, prevBlockStart) { - if err := ns.Snapshot(snapshotBlockStart, tickStart, flush); err != nil { + if !ok { + // Could happen if namespaces are added / removed. 
diff --git a/src/dbnode/storage/flush_test.go b/src/dbnode/storage/flush_test.go
index 8dd1525e5e..1b3df42cdb 100644
--- a/src/dbnode/storage/flush_test.go
+++ b/src/dbnode/storage/flush_test.go
@@ -172,6 +172,7 @@ func TestFlushManagerSkipNamespaceIndexingDisabled(t *testing.T) {
 	ns.EXPECT().ID().Return(defaultTestNs1ID).AnyTimes()
 	ns.EXPECT().NeedsFlush(gomock.Any(), gomock.Any()).Return(true).AnyTimes()
 	ns.EXPECT().Flush(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
+	ns.EXPECT().Snapshot(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
 
 	mockFlusher := persist.NewMockDataFlush(ctrl)
 	mockFlusher.EXPECT().DoneData().Return(nil)
@@ -209,6 +210,7 @@ func TestFlushManagerNamespaceIndexingEnabled(t *testing.T) {
 	ns.EXPECT().ID().Return(defaultTestNs1ID).AnyTimes()
 	ns.EXPECT().NeedsFlush(gomock.Any(), gomock.Any()).Return(true).AnyTimes()
 	ns.EXPECT().Flush(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
+	ns.EXPECT().Snapshot(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
 	ns.EXPECT().FlushIndex(gomock.Any()).Return(nil)
 
 	mockFlusher := persist.NewMockDataFlush(ctrl)
@@ -352,56 +354,24 @@ func TestFlushManagerFlushSnapshot(t *testing.T) {
 	for _, ns := range []*MockdatabaseNamespace{ns1, ns2} {
 		rOpts := ns.Options().RetentionOptions()
 		blockSize := rOpts.BlockSize()
-		bufferPast := rOpts.BufferPast()
+		bufferFuture := rOpts.BufferFuture()
 
 		start := retention.FlushTimeStart(ns.Options().RetentionOptions(), now)
-		end := retention.FlushTimeEnd(ns.Options().RetentionOptions(), now)
-		num := numIntervals(start, end, blockSize)
+		flushEnd := retention.FlushTimeEnd(ns.Options().RetentionOptions(), now)
+		num := numIntervals(start, flushEnd, blockSize)
 
 		for i := 0; i < num; i++ {
 			st := start.Add(time.Duration(i) * blockSize)
 			ns.EXPECT().NeedsFlush(st, st).Return(false)
 		}
 
-		currBlockStart := now.Add(-bufferPast).Truncate(blockSize)
-		prevBlockStart := currBlockStart.Add(-blockSize)
-		ns.EXPECT().NeedsFlush(prevBlockStart, prevBlockStart).Return(false)
-		ns.EXPECT().Snapshot(currBlockStart, now, gomock.Any())
-	}
-
-	bootstrapStates := DatabaseBootstrapState{
-		NamespaceBootstrapStates: map[string]ShardBootstrapStates{
-			ns1.ID().String(): ShardBootstrapStates{},
-			ns2.ID().String(): ShardBootstrapStates{},
-		},
-	}
-	require.NoError(t, fm.Flush(now, bootstrapStates))
-}
-
-func TestFlushManagerFlushNoSnapshotWhileFlushPending(t *testing.T) {
-	ctrl := gomock.NewController(t)
-	defer ctrl.Finish()
-
-	fm, ns1, ns2 := newMultipleFlushManagerNeedsFlush(t, ctrl)
-	now := time.Now()
-
-	for _, ns := range []*MockdatabaseNamespace{ns1, ns2} {
-		rOpts := ns.Options().RetentionOptions()
-		blockSize := rOpts.BlockSize()
-		bufferPast := rOpts.BufferPast()
-
-		start := retention.FlushTimeStart(ns.Options().RetentionOptions(), now)
-		end := retention.FlushTimeEnd(ns.Options().RetentionOptions(), now)
-		num := numIntervals(start, end, blockSize)
-
+		snapshotEnd := now.Add(bufferFuture).Truncate(blockSize)
+		num = numIntervals(start, snapshotEnd, blockSize)
 		for i := 0; i < num; i++ {
 			st := start.Add(time.Duration(i) * blockSize)
-			ns.EXPECT().NeedsFlush(st, st).Return(false)
+			ns.EXPECT().NeedsFlush(st, st).Return(true)
+			ns.EXPECT().Snapshot(st, now, gomock.Any(), gomock.Any())
 		}
-
-		currBlockStart := now.Add(-bufferPast).Truncate(blockSize)
-		prevBlockStart := currBlockStart.Add(-blockSize)
-		ns.EXPECT().NeedsFlush(prevBlockStart, prevBlockStart).Return(true)
 	}
 
 	bootstrapStates := DatabaseBootstrapState{
@@ -413,50 +383,6 @@ func TestFlushManagerFlushNoSnapshotWhileFlushPending(t *testing.T) {
 	require.NoError(t, fm.Flush(now, bootstrapStates))
 }
 
-func TestFlushManagerSnapshotBlockStart(t *testing.T) {
-	ctrl := gomock.NewController(t)
-	defer ctrl.Finish()
-
-	fm, _, _ := newMultipleFlushManagerNeedsFlush(t, ctrl)
-	now := time.Now()
-
-	nsOpts := namespace.NewOptions()
-	rOpts := nsOpts.RetentionOptions().
-		SetBlockSize(2 * time.Hour).
-		SetBufferPast(10 * time.Minute)
-	blockSize := rOpts.BlockSize()
-	nsOpts = nsOpts.SetRetentionOptions(rOpts)
-	ns := NewMockdatabaseNamespace(ctrl)
-	ns.EXPECT().Options().Return(nsOpts).AnyTimes()
-
-	testCases := []struct {
-		currTime           time.Time
-		expectedBlockStart time.Time
-	}{
-		// Set comment in snapshotBlockStart for explanation of these test cases
-		{
-			currTime:           now.Truncate(blockSize).Add(30 * time.Minute),
-			expectedBlockStart: now.Truncate(blockSize),
-		},
-		{
-			currTime:           now.Truncate(blockSize).Add(119 * time.Minute),
-			expectedBlockStart: now.Truncate(blockSize),
-		},
-		{
-			currTime:           now.Truncate(blockSize).Add(129 * time.Minute),
-			expectedBlockStart: now.Truncate(blockSize),
-		},
-		{
-			currTime:           now.Truncate(blockSize).Add(130 * time.Minute),
-			expectedBlockStart: now.Truncate(blockSize).Add(blockSize),
-		},
-	}
-
-	for _, tc := range testCases {
-		require.Equal(t, tc.expectedBlockStart, fm.snapshotBlockStart(ns, tc.currTime))
-	}
-}
-
 type timesInOrder []time.Time
 
 func (a timesInOrder) Len() int      { return len(a) }
diff --git a/src/dbnode/storage/namespace.go b/src/dbnode/storage/namespace.go
index 2e165989dc..d6d62817b9 100644
--- a/src/dbnode/storage/namespace.go
+++ b/src/dbnode/storage/namespace.go
@@ -830,6 +830,11 @@ func (n *dbNamespace) Flush(
 			// before the previous tick which means that we have no guarantee that all
 			// bootstrapped blocks have been rotated out of the series buffer buckets,
 			// so we wait until the next opportunity.
+			n.log.
+				WithFields(xlog.NewField("shard", shard.ID())).
+				WithFields(xlog.NewField("bootstrapStateBeforeTick", shardBootstrapStateBeforeTick)).
+				WithFields(xlog.NewField("bootstrapStateExists", ok)).
+				Debug("skipping flush due to shard bootstrap state before tick")
 			continue
 		}
 
@@ -873,7 +878,11 @@ func (n *dbNamespace) FlushIndex(
 	return err
 }
 
-func (n *dbNamespace) Snapshot(blockStart, snapshotTime time.Time, flush persist.DataFlush) error {
+func (n *dbNamespace) Snapshot(
+	blockStart,
+	snapshotTime time.Time,
+	shardBootstrapStatesAtTickStart ShardBootstrapStates,
+	flush persist.DataFlush) error {
 	// NB(rartoul): This value can be used for emitting metrics, but should not be used
 	// for business logic.
 	callStart := n.nowFn()
@@ -909,6 +918,19 @@ func (n *dbNamespace) Snapshot(blockStart, snapshotTime time.Time, flush persist.DataFlush) error {
 			continue
 		}
 
+		// We don't need to perform this check for correctness, but we apply the same logic
+		// here as we do in the Flush() method so that we don't end up snapshotting a bunch
+		// of shards/blocks that would have been flushed after the next tick.
+		shardBootstrapStateBeforeTick, ok := shardBootstrapStatesAtTickStart[shard.ID()]
+		if !ok || shardBootstrapStateBeforeTick != Bootstrapped {
+			n.log.
+				WithFields(xlog.NewField("shard", shard.ID())).
+				WithFields(xlog.NewField("bootstrapStateBeforeTick", shardBootstrapStateBeforeTick)).
+				WithFields(xlog.NewField("bootstrapStateExists", ok)).
+				Debug("skipping snapshot due to shard bootstrap state before tick")
+			continue
+		}
+
 		err := shard.Snapshot(blockStart, snapshotTime, flush)
 		if err != nil {
 			detailedErr := fmt.Errorf("shard %d failed to snapshot: %v", shard.ID(), err)
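
Both Flush and, with this change, Snapshot now gate each shard on the bootstrap state captured at tick start: a shard is only processed if it was already Bootstrapped before the tick began, since only then have its bootstrapped blocks provably rotated out of the series buffers. A minimal sketch of that gate follows; BootstrapState and ShardBootstrapStates are redeclared locally here purely for illustration (the real definitions live in the storage package).

package main

import "fmt"

type BootstrapState int

const (
	Bootstrapping BootstrapState = iota
	Bootstrapped
)

// ShardBootstrapStates mirrors the storage package's map of shard ID to the
// bootstrap state observed at tick start.
type ShardBootstrapStates map[uint32]BootstrapState

// shouldProcessShard applies the same gate Flush and Snapshot use: skip the
// shard unless it was already bootstrapped before the tick started.
func shouldProcessShard(states ShardBootstrapStates, shardID uint32) bool {
	state, ok := states[shardID]
	return ok && state == Bootstrapped
}

func main() {
	states := ShardBootstrapStates{0: Bootstrapped, 1: Bootstrapping}
	fmt.Println(shouldProcessShard(states, 0)) // true
	fmt.Println(shouldProcessShard(states, 1)) // false: still bootstrapping
	fmt.Println(shouldProcessShard(states, 2)) // false: no state recorded
}
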
diff --git a/src/dbnode/storage/namespace_test.go b/src/dbnode/storage/namespace_test.go
index 4395768797..bb850308e2 100644
--- a/src/dbnode/storage/namespace_test.go
+++ b/src/dbnode/storage/namespace_test.go
@@ -405,20 +405,21 @@ func TestNamespaceFlushSkipShardNotBootstrappedBeforeTick(t *testing.T) {
 	blockStart := time.Now().Truncate(ns.Options().RetentionOptions().BlockSize())
 
 	shard := NewMockdatabaseShard(ctrl)
-	shard.EXPECT().ID().Return(testShardIDs[0].ID())
+	shard.EXPECT().ID().Return(testShardIDs[0].ID()).AnyTimes()
 	ns.shards[testShardIDs[0].ID()] = shard
 
-	ShardBootstrapStates := ShardBootstrapStates{}
-	ShardBootstrapStates[testShardIDs[0].ID()] = Bootstrapping
+	shardBootstrapStates := ShardBootstrapStates{}
+	shardBootstrapStates[testShardIDs[0].ID()] = Bootstrapping
 
-	require.NoError(t, ns.Flush(blockStart, ShardBootstrapStates, nil))
+	require.NoError(t, ns.Flush(blockStart, shardBootstrapStates, nil))
 }
 
 type snapshotTestCase struct {
-	isSnapshotting   bool
-	expectSnapshot   bool
-	lastSnapshotTime func(blockStart time.Time, blockSize time.Duration) time.Time
-	snapshotErr      error
+	isSnapshotting                bool
+	expectSnapshot                bool
+	shardBootstrapStateBeforeTick BootstrapState
+	lastSnapshotTime              func(blockStart time.Time, blockSize time.Duration) time.Time
+	shardSnapshotErr              error
 }
 
 func TestNamespaceSnapshotNotBootstrapped(t *testing.T) {
@@ -435,26 +436,28 @@ func TestNamespaceSnapshotNotBootstrapped(t *testing.T) {
 	blockSize := ns.Options().RetentionOptions().BlockSize()
 	blockStart := time.Now().Truncate(blockSize)
-	require.Equal(t, errNamespaceNotBootstrapped, ns.Snapshot(blockStart, blockStart, nil))
+	require.Equal(t, errNamespaceNotBootstrapped, ns.Snapshot(blockStart, blockStart, nil, nil))
 }
 
 func TestNamespaceSnapshotNotEnoughTimeSinceLastSnapshot(t *testing.T) {
 	shardMethodResults := []snapshotTestCase{
 		snapshotTestCase{
-			isSnapshotting: false,
-			expectSnapshot: false,
+			isSnapshotting:                false,
+			expectSnapshot:                false,
+			shardBootstrapStateBeforeTick: Bootstrapped,
 			lastSnapshotTime: func(curr time.Time, blockSize time.Duration) time.Time {
 				return curr
 			},
-			snapshotErr: nil,
+			shardSnapshotErr: nil,
 		},
 		snapshotTestCase{
-			isSnapshotting: false,
-			expectSnapshot: true,
+			isSnapshotting:                false,
+			expectSnapshot:                true,
+			shardBootstrapStateBeforeTick: Bootstrapped,
 			lastSnapshotTime: func(curr time.Time, blockSize time.Duration) time.Time {
 				return curr.Add(-2 * defaultMinSnapshotInterval)
 			},
-			snapshotErr: nil,
+			shardSnapshotErr: nil,
 		},
 	}
 	require.NoError(t, testSnapshotWithShardSnapshotErrs(t, shardMethodResults))
@@ -462,28 +465,70 @@ func TestNamespaceSnapshotShardIsSnapshotting(t *testing.T) {
 	shardMethodResults := []snapshotTestCase{
-		snapshotTestCase{isSnapshotting: false, snapshotErr: nil, expectSnapshot: true},
-		snapshotTestCase{isSnapshotting: true, snapshotErr: nil, expectSnapshot: false},
+		snapshotTestCase{
+			isSnapshotting:                false,
+			expectSnapshot:                true,
+			shardBootstrapStateBeforeTick: Bootstrapped,
+			shardSnapshotErr:              nil,
+		},
+		snapshotTestCase{
+			isSnapshotting:                true,
+			expectSnapshot:                false,
+			shardBootstrapStateBeforeTick: Bootstrapped,
+			shardSnapshotErr:              nil,
+		},
 	}
 	require.NoError(t, testSnapshotWithShardSnapshotErrs(t, shardMethodResults))
 }
 
 func TestNamespaceSnapshotAllShardsSuccess(t *testing.T) {
 	shardMethodResults := []snapshotTestCase{
-		snapshotTestCase{isSnapshotting: false, snapshotErr: nil, expectSnapshot: true},
-		snapshotTestCase{isSnapshotting: false, snapshotErr: nil, expectSnapshot: true},
+		snapshotTestCase{
+			isSnapshotting:                false,
+			expectSnapshot:                true,
+			shardBootstrapStateBeforeTick: Bootstrapped,
+			shardSnapshotErr:              nil,
+		},
+		snapshotTestCase{
+			isSnapshotting:                false,
+			expectSnapshot:                true,
+			shardBootstrapStateBeforeTick: Bootstrapped,
+			shardSnapshotErr:              nil,
+		},
 	}
 	require.NoError(t, testSnapshotWithShardSnapshotErrs(t, shardMethodResults))
 }
 
 func TestNamespaceSnapshotShardError(t *testing.T) {
 	shardMethodResults := []snapshotTestCase{
-		snapshotTestCase{isSnapshotting: false, snapshotErr: nil, expectSnapshot: true},
-		snapshotTestCase{isSnapshotting: false, snapshotErr: errors.New("err"), expectSnapshot: true},
+		snapshotTestCase{
+			isSnapshotting:                false,
+			expectSnapshot:                true,
+			shardBootstrapStateBeforeTick: Bootstrapped,
+			shardSnapshotErr:              nil,
+		},
+		snapshotTestCase{
+			isSnapshotting:                false,
+			expectSnapshot:                true,
+			shardBootstrapStateBeforeTick: Bootstrapped,
+			shardSnapshotErr:              errors.New("err"),
+		},
 	}
 	require.Error(t, testSnapshotWithShardSnapshotErrs(t, shardMethodResults))
 }
 
+func TestNamespaceSnapshotShardNotBootstrappedBeforeTick(t *testing.T) {
+	shardMethodResults := []snapshotTestCase{
+		snapshotTestCase{
+			isSnapshotting:                false,
+			expectSnapshot:                false,
+			shardBootstrapStateBeforeTick: Bootstrapping,
+			shardSnapshotErr:              nil,
+		},
+	}
+	require.NoError(t, testSnapshotWithShardSnapshotErrs(t, shardMethodResults))
+}
+
 func testSnapshotWithShardSnapshotErrs(t *testing.T, shardMethodResults []snapshotTestCase) error {
 	ctrl := gomock.NewController(t)
 	defer ctrl.Finish()
@@ -499,8 +544,12 @@ func testSnapshotWithShardSnapshotErrs(t *testing.T, shardMethodResults []snapshotTestCase) error {
 	ns.nowFn = func() time.Time {
 		return now
 	}
-	blockSize := ns.Options().RetentionOptions().BlockSize()
-	blockStart := now.Truncate(blockSize)
+
+	var (
+		shardBootstrapStates = ShardBootstrapStates{}
+		blockSize            = ns.Options().RetentionOptions().BlockSize()
+		blockStart           = now.Truncate(blockSize)
+	)
 
 	for i, tc := range shardMethodResults {
 		shard := NewMockdatabaseShard(ctrl)
@@ -511,14 +560,16 @@ func testSnapshotWithShardSnapshotErrs(t *testing.T, shardMethodResults []snapshotTestCase) error {
 			lastSnapshotTime = tc.lastSnapshotTime(now, blockSize)
 		}
 		shard.EXPECT().SnapshotState().Return(tc.isSnapshotting, lastSnapshotTime)
+		shardID := uint32(i)
 		shard.EXPECT().ID().Return(uint32(i)).AnyTimes()
 		if tc.expectSnapshot {
-			shard.EXPECT().Snapshot(blockStart, now, nil).Return(tc.snapshotErr)
+			shard.EXPECT().Snapshot(blockStart, now, nil).Return(tc.shardSnapshotErr)
 		}
 		ns.shards[testShardIDs[i].ID()] = shard
+		shardBootstrapStates[shardID] = tc.shardBootstrapStateBeforeTick
 	}
 
-	return ns.Snapshot(blockStart, now, nil)
+	return ns.Snapshot(blockStart, now, shardBootstrapStates, nil)
 }
 
 func TestNamespaceTruncate(t *testing.T) {
+ WithFields(xlog.NewField("bootstrapStateExists", ok)). + Debug("skipping snapshot due to shard bootstrap state before tick") + continue + } + err := shard.Snapshot(blockStart, snapshotTime, flush) if err != nil { detailedErr := fmt.Errorf("shard %d failed to snapshot: %v", shard.ID(), err) diff --git a/src/dbnode/storage/namespace_test.go b/src/dbnode/storage/namespace_test.go index 4395768797..bb850308e2 100644 --- a/src/dbnode/storage/namespace_test.go +++ b/src/dbnode/storage/namespace_test.go @@ -405,20 +405,21 @@ func TestNamespaceFlushSkipShardNotBootstrappedBeforeTick(t *testing.T) { blockStart := time.Now().Truncate(ns.Options().RetentionOptions().BlockSize()) shard := NewMockdatabaseShard(ctrl) - shard.EXPECT().ID().Return(testShardIDs[0].ID()) + shard.EXPECT().ID().Return(testShardIDs[0].ID()).AnyTimes() ns.shards[testShardIDs[0].ID()] = shard - ShardBootstrapStates := ShardBootstrapStates{} - ShardBootstrapStates[testShardIDs[0].ID()] = Bootstrapping + shardBootstrapStates := ShardBootstrapStates{} + shardBootstrapStates[testShardIDs[0].ID()] = Bootstrapping - require.NoError(t, ns.Flush(blockStart, ShardBootstrapStates, nil)) + require.NoError(t, ns.Flush(blockStart, shardBootstrapStates, nil)) } type snapshotTestCase struct { - isSnapshotting bool - expectSnapshot bool - lastSnapshotTime func(blockStart time.Time, blockSize time.Duration) time.Time - snapshotErr error + isSnapshotting bool + expectSnapshot bool + shardBootstrapStateBeforeTick BootstrapState + lastSnapshotTime func(blockStart time.Time, blockSize time.Duration) time.Time + shardSnapshotErr error } func TestNamespaceSnapshotNotBootstrapped(t *testing.T) { @@ -435,26 +436,28 @@ func TestNamespaceSnapshotNotBootstrapped(t *testing.T) { blockSize := ns.Options().RetentionOptions().BlockSize() blockStart := time.Now().Truncate(blockSize) - require.Equal(t, errNamespaceNotBootstrapped, ns.Snapshot(blockStart, blockStart, nil)) + require.Equal(t, errNamespaceNotBootstrapped, ns.Snapshot(blockStart, blockStart, nil, nil)) } func TestNamespaceSnapshotNotEnoughTimeSinceLastSnapshot(t *testing.T) { shardMethodResults := []snapshotTestCase{ snapshotTestCase{ - isSnapshotting: false, - expectSnapshot: false, + isSnapshotting: false, + expectSnapshot: false, + shardBootstrapStateBeforeTick: Bootstrapped, lastSnapshotTime: func(curr time.Time, blockSize time.Duration) time.Time { return curr }, - snapshotErr: nil, + shardSnapshotErr: nil, }, snapshotTestCase{ - isSnapshotting: false, - expectSnapshot: true, + isSnapshotting: false, + expectSnapshot: true, + shardBootstrapStateBeforeTick: Bootstrapped, lastSnapshotTime: func(curr time.Time, blockSize time.Duration) time.Time { return curr.Add(-2 * defaultMinSnapshotInterval) }, - snapshotErr: nil, + shardSnapshotErr: nil, }, } require.NoError(t, testSnapshotWithShardSnapshotErrs(t, shardMethodResults)) @@ -462,28 +465,70 @@ func TestNamespaceSnapshotNotEnoughTimeSinceLastSnapshot(t *testing.T) { func TestNamespaceSnapshotShardIsSnapshotting(t *testing.T) { shardMethodResults := []snapshotTestCase{ - snapshotTestCase{isSnapshotting: false, snapshotErr: nil, expectSnapshot: true}, - snapshotTestCase{isSnapshotting: true, snapshotErr: nil, expectSnapshot: false}, + snapshotTestCase{ + isSnapshotting: false, + expectSnapshot: true, + shardBootstrapStateBeforeTick: Bootstrapped, + shardSnapshotErr: nil, + }, + snapshotTestCase{ + isSnapshotting: true, + expectSnapshot: false, + shardBootstrapStateBeforeTick: Bootstrapped, + shardSnapshotErr: nil, + }, } require.NoError(t, 
diff --git a/src/dbnode/storage/types.go b/src/dbnode/storage/types.go
index df41cefcf0..9efceccf21 100644
--- a/src/dbnode/storage/types.go
+++ b/src/dbnode/storage/types.go
@@ -282,7 +282,12 @@ type databaseNamespace interface {
 	) error
 
 	// Snapshot snapshots unflushed in-memory data
-	Snapshot(blockStart, snapshotTime time.Time, flush persist.DataFlush) error
+	Snapshot(
+		blockStart,
+		snapshotTime time.Time,
+		shardBootstrapStatesAtTickStart ShardBootstrapStates,
+		flush persist.DataFlush,
+	) error
 
 	// NeedsFlush returns true if the namespace needs a flush for the
 	// period: [start, end] (both inclusive).