Skip to content

Commit

Permalink
Wait on a more deterministic condition (checking snapshot filesets on…
Browse files Browse the repository at this point in the history
… disk).
  • Loading branch information
notbdu committed Aug 6, 2020
1 parent e96b5ca commit b26631c
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 22 deletions.
1 change: 1 addition & 0 deletions src/dbnode/integration/bootstrap_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper"
"github.com/m3db/m3/src/dbnode/storage/bootstrap/result"
"github.com/m3db/m3/src/x/context"

"github.com/stretchr/testify/require"
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ import (
"github.com/m3db/m3/src/dbnode/persist/fs"
"github.com/m3db/m3/src/dbnode/persist/schema"
"github.com/m3db/m3/src/dbnode/retention"
xclock "github.com/m3db/m3/src/x/clock"
"github.com/m3db/m3/src/x/ident"
xtime "github.com/m3db/m3/src/x/time"

"github.com/stretchr/testify/require"
Expand All @@ -53,7 +55,6 @@ func TestCommitLogBootstrapWithSnapshotsAfterRestart(t *testing.T) {
require.NoError(t, err)
opts := NewTestOptions(t).
SetNamespaces([]namespace.Metadata{ns}).
// Make tick interval short enough to sleep on.
SetTickMinimumInterval(100 * time.Millisecond)

setup, err := NewTestSetup(t, opts, nil)
Expand Down Expand Up @@ -95,9 +96,19 @@ func TestCommitLogBootstrapWithSnapshotsAfterRestart(t *testing.T) {
require.NoError(t, setup.WriteBatch(testNamespaces[0], testData))
}

// Sleep to allow snapshotting to occur.
setup.SleepFor10xTickMinimumInterval()
setup.SleepFor10xTickMinimumInterval()
// Wait until snapshots are on disk.
fsOpts := commitLogOpts.FilesystemOptions()
expectedNumSeries := 0
for _, data := range inputData {
expectedNumSeries += len(data.IDs)
}
xclock.WaitUntil(func() bool {
var totalNumEntries int
for _, numEntries := range getNumEntriesPerBlockStart(ns.ID(), opts.NumShards(), fsOpts) {
totalNumEntries += numEntries
}
return totalNumEntries == expectedNumSeries
}, time.Minute)

// Stop and restart server to allow bootstrapping from commit logs.
require.NoError(t, setup.StopServer())
Expand All @@ -111,45 +122,57 @@ func TestCommitLogBootstrapWithSnapshotsAfterRestart(t *testing.T) {
require.NoError(t, setup.StartServer())
log.Debug("server restarted")

// Verify that data is what we expect
// Verify that data is what we expect.
metadatasByShard := testSetupMetadatas(t, setup, testNamespaces[0], now.Add(-5*blockSize), now.Add(-blockSize))
observedSeriesMaps := testSetupToSeriesMaps(t, setup, ns, metadatasByShard)
verifySeriesMapsEqual(t, seriesMaps, observedSeriesMaps)

// Sleep to allow snapshotting to occur again. This time these should be empty filesets since
// we haven't written any data since the last snapshot.
setup.SleepFor10xTickMinimumInterval()
// Wait until empty snapshots are on disk.
xclock.WaitUntil(func() bool {
var totalNumEntries int
for _, numEntries := range getNumEntriesPerBlockStart(ns.ID(), opts.NumShards(), fsOpts) {
totalNumEntries += numEntries
}
return totalNumEntries == 0
}, time.Minute)

fsOpts := commitLogOpts.FilesystemOptions()
for shard := 0; shard < opts.NumShards(); shard++ {
// Verify that data is still what we expect.
metadatasByShard = testSetupMetadatas(t, setup, testNamespaces[0], now.Add(-5*blockSize), now.Add(-blockSize))
observedSeriesMaps = testSetupToSeriesMaps(t, setup, ns, metadatasByShard)
verifySeriesMapsEqual(t, seriesMaps, observedSeriesMaps)
}

func getNumEntriesPerBlockStart(
nsID ident.ID,
numShards int,
fsOpts fs.Options,
) map[xtime.UnixNano]int {
numEntriesPerBlockStart := make(map[xtime.UnixNano]int)
for shard := 0; shard < numShards; shard++ {
infoFiles := fs.ReadInfoFiles(
fsOpts.FilePathPrefix(),
ns.ID(),
nsID,
uint32(shard),
fsOpts.InfoReaderBufferSize(),
fsOpts.DecodingOptions(),
persist.FileSetSnapshotType,
)
// Grab the latest snapshot file for each blockstart.
latestSnapshotInfoPerBlockStart := make(map[int64]schema.IndexInfo)
latestSnapshotInfoPerBlockStart := make(map[xtime.UnixNano]schema.IndexInfo)
for _, f := range infoFiles {
info, ok := latestSnapshotInfoPerBlockStart[f.Info.BlockStart]
info, ok := latestSnapshotInfoPerBlockStart[xtime.UnixNano(f.Info.BlockStart)]
if !ok {
latestSnapshotInfoPerBlockStart[f.Info.BlockStart] = f.Info
latestSnapshotInfoPerBlockStart[xtime.UnixNano(f.Info.BlockStart)] = f.Info
continue
}

if f.Info.VolumeIndex > info.VolumeIndex {
latestSnapshotInfoPerBlockStart[f.Info.BlockStart] = f.Info
latestSnapshotInfoPerBlockStart[xtime.UnixNano(f.Info.BlockStart)] = f.Info
}
}
for _, info := range latestSnapshotInfoPerBlockStart {
require.Equal(t, 0, int(info.Entries))
for blockStart, info := range latestSnapshotInfoPerBlockStart {
numEntriesPerBlockStart[blockStart] += int(info.Entries)
}
}

// Verify that data is still what we expect
metadatasByShard = testSetupMetadatas(t, setup, testNamespaces[0], now.Add(-5*blockSize), now.Add(-blockSize))
observedSeriesMaps = testSetupToSeriesMaps(t, setup, ns, metadatasByShard)
verifySeriesMapsEqual(t, seriesMaps, observedSeriesMaps)
return numEntriesPerBlockStart
}

0 comments on commit b26631c

Please sign in to comment.