diff --git a/src/dbnode/integration/bootstrap_helpers.go b/src/dbnode/integration/bootstrap_helpers.go
index 26b6da9813..3f11b2d8d0 100644
--- a/src/dbnode/integration/bootstrap_helpers.go
+++ b/src/dbnode/integration/bootstrap_helpers.go
@@ -31,6 +31,7 @@ import (
 	"github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper"
 	"github.com/m3db/m3/src/dbnode/storage/bootstrap/result"
 	"github.com/m3db/m3/src/x/context"
+	"github.com/stretchr/testify/require"
 )
diff --git a/src/dbnode/integration/commitlog_bootstrap_with_snapshots_after_restart_test.go b/src/dbnode/integration/commitlog_bootstrap_with_snapshots_after_restart_test.go
index a7cde08d48..b0756493b8 100644
--- a/src/dbnode/integration/commitlog_bootstrap_with_snapshots_after_restart_test.go
+++ b/src/dbnode/integration/commitlog_bootstrap_with_snapshots_after_restart_test.go
@@ -32,6 +32,8 @@ import (
 	"github.com/m3db/m3/src/dbnode/persist/fs"
 	"github.com/m3db/m3/src/dbnode/persist/schema"
 	"github.com/m3db/m3/src/dbnode/retention"
+	xclock "github.com/m3db/m3/src/x/clock"
+	"github.com/m3db/m3/src/x/ident"
 	xtime "github.com/m3db/m3/src/x/time"
 
 	"github.com/stretchr/testify/require"
@@ -53,7 +55,6 @@ func TestCommitLogBootstrapWithSnapshotsAfterRestart(t *testing.T) {
 	require.NoError(t, err)
 	opts := NewTestOptions(t).
 		SetNamespaces([]namespace.Metadata{ns}).
-		// Make tick interval short enough to sleep on.
 		SetTickMinimumInterval(100 * time.Millisecond)
 
 	setup, err := NewTestSetup(t, opts, nil)
@@ -95,9 +96,19 @@ func TestCommitLogBootstrapWithSnapshotsAfterRestart(t *testing.T) {
 		require.NoError(t, setup.WriteBatch(testNamespaces[0], testData))
 	}
 
-	// Sleep to allow snapshotting to occur.
-	setup.SleepFor10xTickMinimumInterval()
-	setup.SleepFor10xTickMinimumInterval()
+	// Wait until snapshots are on disk.
+	fsOpts := commitLogOpts.FilesystemOptions()
+	expectedNumSeries := 0
+	for _, data := range inputData {
+		expectedNumSeries += len(data.IDs)
+	}
+	xclock.WaitUntil(func() bool {
+		var totalNumEntries int
+		for _, numEntries := range getNumEntriesPerBlockStart(ns.ID(), opts.NumShards(), fsOpts) {
+			totalNumEntries += numEntries
+		}
+		return totalNumEntries == expectedNumSeries
+	}, time.Minute)
 
 	// Stop and restart server to allow bootstrapping from commit logs.
 	require.NoError(t, setup.StopServer())
@@ -111,45 +122,57 @@ func TestCommitLogBootstrapWithSnapshotsAfterRestart(t *testing.T) {
 	require.NoError(t, setup.StartServer())
 	log.Debug("server restarted")
 
-	// Verify that data is what we expect
+	// Verify that data is what we expect.
 	metadatasByShard := testSetupMetadatas(t, setup, testNamespaces[0], now.Add(-5*blockSize), now.Add(-blockSize))
 	observedSeriesMaps := testSetupToSeriesMaps(t, setup, ns, metadatasByShard)
 	verifySeriesMapsEqual(t, seriesMaps, observedSeriesMaps)
 
-	// Sleep to allow snapshotting to occur again. This time these should be empty filesets since
-	// we haven't written any data since the last snapshot.
-	setup.SleepFor10xTickMinimumInterval()
+	// Wait until empty snapshots are on disk.
+	xclock.WaitUntil(func() bool {
+		var totalNumEntries int
+		for _, numEntries := range getNumEntriesPerBlockStart(ns.ID(), opts.NumShards(), fsOpts) {
+			totalNumEntries += numEntries
+		}
+		return totalNumEntries == 0
+	}, time.Minute)
 
-	fsOpts := commitLogOpts.FilesystemOptions()
-	for shard := 0; shard < opts.NumShards(); shard++ {
+	// Verify that data is still what we expect.
+	metadatasByShard = testSetupMetadatas(t, setup, testNamespaces[0], now.Add(-5*blockSize), now.Add(-blockSize))
+	observedSeriesMaps = testSetupToSeriesMaps(t, setup, ns, metadatasByShard)
+	verifySeriesMapsEqual(t, seriesMaps, observedSeriesMaps)
+}
+
+func getNumEntriesPerBlockStart(
+	nsID ident.ID,
+	numShards int,
+	fsOpts fs.Options,
+) map[xtime.UnixNano]int {
+	numEntriesPerBlockStart := make(map[xtime.UnixNano]int)
+	for shard := 0; shard < numShards; shard++ {
 		infoFiles := fs.ReadInfoFiles(
 			fsOpts.FilePathPrefix(),
-			ns.ID(),
+			nsID,
 			uint32(shard),
 			fsOpts.InfoReaderBufferSize(),
 			fsOpts.DecodingOptions(),
 			persist.FileSetSnapshotType,
 		)
 
 		// Grab the latest snapshot file for each blockstart.
-		latestSnapshotInfoPerBlockStart := make(map[int64]schema.IndexInfo)
+		latestSnapshotInfoPerBlockStart := make(map[xtime.UnixNano]schema.IndexInfo)
 		for _, f := range infoFiles {
-			info, ok := latestSnapshotInfoPerBlockStart[f.Info.BlockStart]
+			info, ok := latestSnapshotInfoPerBlockStart[xtime.UnixNano(f.Info.BlockStart)]
 			if !ok {
-				latestSnapshotInfoPerBlockStart[f.Info.BlockStart] = f.Info
+				latestSnapshotInfoPerBlockStart[xtime.UnixNano(f.Info.BlockStart)] = f.Info
 				continue
 			}
 
 			if f.Info.VolumeIndex > info.VolumeIndex {
-				latestSnapshotInfoPerBlockStart[f.Info.BlockStart] = f.Info
+				latestSnapshotInfoPerBlockStart[xtime.UnixNano(f.Info.BlockStart)] = f.Info
 			}
 		}
 
-		for _, info := range latestSnapshotInfoPerBlockStart {
-			require.Equal(t, 0, int(info.Entries))
+		for blockStart, info := range latestSnapshotInfoPerBlockStart {
+			numEntriesPerBlockStart[blockStart] += int(info.Entries)
 		}
 	}
-
-	// Verify that data is still what we expect
-	metadatasByShard = testSetupMetadatas(t, setup, testNamespaces[0], now.Add(-5*blockSize), now.Add(-blockSize))
-	observedSeriesMaps = testSetupToSeriesMaps(t, setup, ns, metadatasByShard)
-	verifySeriesMapsEqual(t, seriesMaps, observedSeriesMaps)
+	return numEntriesPerBlockStart
 }
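For reviewers unfamiliar with the helper, the sketch below illustrates the pattern this change moves to: instead of sleeping for a fixed multiple of the tick interval, the test polls a condition under a bounded timeout (as xclock.WaitUntil is used in the diff above). The waitUntil function and the condition here are simplified stand-ins written for illustration only, not the m3 x/clock implementation.

package main

import (
	"fmt"
	"time"
)

// waitUntil is a simplified stand-in for the poll-with-timeout pattern used
// in the test above: it repeatedly evaluates fn until it returns true or the
// timeout elapses, and reports whether the condition was ever met.
func waitUntil(fn func() bool, timeout time.Duration) bool {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		if fn() {
			return true
		}
		time.Sleep(10 * time.Millisecond)
	}
	return false
}

func main() {
	// Hypothetical condition standing in for "all expected snapshot entries
	// are on disk": here it simply becomes true after 200ms.
	ready := time.Now().Add(200 * time.Millisecond)

	ok := waitUntil(func() bool { return time.Now().After(ready) }, time.Minute)
	fmt.Println("condition met before timeout:", ok)
}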