Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dbnode] Background cold flush process #2508

Merged
merged 29 commits into from
Aug 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
688d956
Background cold flush process.
notbdu Jul 14, 2020
85dbbd1
Add a TODO.
notbdu Jul 14, 2020
d01a98d
Separate out cold flush into its own process.
notbdu Jul 16, 2020
d7a5e0e
Determine if snapshots are warm/cold. Update mocks.
notbdu Jul 16, 2020
efa8ce6
Cold flush needs its own persist manager.
notbdu Jul 16, 2020
5aa6bd0
Formatting.
notbdu Jul 16, 2020
b7829f9
Remove cleanup mgr from fs mgr.
notbdu Jul 16, 2020
467c7ef
Separate out cleanup logic.
notbdu Jul 16, 2020
81545e0
Update disk snapshot integration test and mocks.
notbdu Jul 17, 2020
9dd1713
Fix unit test.
notbdu Jul 17, 2020
bba3beb
Terminate early and cache empty fileset on disk state.
notbdu Jul 22, 2020
8b5c75e
Check individual block starts for buffer data.
notbdu Jul 22, 2020
7038739
Fix unit test.
notbdu Jul 23, 2020
c5e0f6b
Add timers for granular view of phases of flush duration.
notbdu Jul 23, 2020
699874f
Fix tests.
notbdu Jul 23, 2020
cdfcfc1
Cleanup coldflush manager and add a test.
notbdu Jul 23, 2020
79a2f92
Update cold flush tests.
notbdu Jul 24, 2020
73b6733
Address PR comments.
notbdu Jul 28, 2020
c3eedc5
Address PR comments.
notbdu Jul 29, 2020
031de16
Add test for bootstrapping from snapshots for all block starts.
notbdu Aug 2, 2020
43e7132
Verify that the data is consistent after cold flushing.
notbdu Aug 2, 2020
aec4856
Update mocks
notbdu Aug 3, 2020
8e14316
Refactor bootstrapper initializing logic in integration tests.
notbdu Aug 4, 2020
6f69086
Make commit log bootstrapper bootstrap the entire original target ran…
notbdu Aug 4, 2020
66d8c76
Do not shortcircuit bootstrapping in base bootstrapper. Short circuit…
notbdu Aug 5, 2020
a68e4c2
Pass only target shard ranges to all required. Use copies of shard ra…
notbdu Aug 5, 2020
e96b5ca
Next bootstrapper gets called in all cases now even in the empty rang…
notbdu Aug 5, 2020
b26631c
Wait on a more deterministic condition (checking snapshot filesets on…
notbdu Aug 6, 2020
5f21c6c
Add comment
notbdu Aug 6, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 8 additions & 29 deletions src/dbnode/integration/bootstrap_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,9 @@ import (
"testing"

"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/dbnode/persist/fs"
"github.com/m3db/m3/src/dbnode/persist/fs/commitlog"
"github.com/m3db/m3/src/dbnode/runtime"
"github.com/m3db/m3/src/dbnode/storage/bootstrap"
"github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper"
bcl "github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper/commitlog"
"github.com/m3db/m3/src/dbnode/storage/bootstrap/result"
"github.com/m3db/m3/src/x/context"

Expand Down Expand Up @@ -140,30 +137,12 @@ func (t testBootstrapperSource) String() string {
}

func setupCommitLogBootstrapperWithFSInspection(
t *testing.T, setup TestSetup, commitLogOpts commitlog.Options) {
noOpAll := bootstrapper.NewNoOpAllBootstrapperProvider()
bsOpts := newDefaulTestResultOptions(setup.StorageOpts())
bclOpts := bcl.NewOptions().
SetResultOptions(bsOpts).
SetCommitLogOptions(commitLogOpts).
SetRuntimeOptionsManager(runtime.NewOptionsManager())
fsOpts := setup.StorageOpts().CommitLogOptions().FilesystemOptions()
bs, err := bcl.NewCommitLogBootstrapperProvider(
bclOpts, mustInspectFilesystem(fsOpts), noOpAll)
require.NoError(t, err)
processOpts := bootstrap.NewProcessOptions().
SetTopologyMapProvider(setup).
SetOrigin(setup.Origin())
process, err := bootstrap.NewProcessProvider(bs, processOpts, bsOpts)
require.NoError(t, err)
setup.SetStorageOpts(setup.StorageOpts().SetBootstrapProcessProvider(process))
}

func mustInspectFilesystem(fsOpts fs.Options) fs.Inspection {
inspection, err := fs.InspectFilesystem(fsOpts)
if err != nil {
panic(err)
}

return inspection
t *testing.T,
setup TestSetup,
commitLogOpts commitlog.Options,
) {
require.NoError(t, setup.InitializeBootstrappers(InitializeBootstrappersOptions{
CommitLogOptions: commitLogOpts,
WithCommitLog: true,
}))
}
43 changes: 5 additions & 38 deletions src/dbnode/integration/commitlog_bootstrap_merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,7 @@ import (

"github.com/m3db/m3/src/dbnode/integration/generate"
"github.com/m3db/m3/src/dbnode/namespace"
persistfs "github.com/m3db/m3/src/dbnode/persist/fs"
"github.com/m3db/m3/src/dbnode/retention"
"github.com/m3db/m3/src/dbnode/runtime"
"github.com/m3db/m3/src/dbnode/storage/bootstrap"
"github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper"
bcl "github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper/commitlog"
"github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper/fs"
xtime "github.com/m3db/m3/src/x/time"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -118,38 +112,11 @@ func TestCommitLogAndFSMergeBootstrap(t *testing.T) {
}
writeCommitLogData(t, setup, commitLogOpts, commitlogSeriesMaps, ns1, false)

// commit log bootstrapper (must be after writing out commitlog files so inspection finds files)
noOpAll := bootstrapper.NewNoOpAllBootstrapperProvider()
bsOpts := newDefaulTestResultOptions(setup.StorageOpts())
bclOpts := bcl.NewOptions().
SetResultOptions(bsOpts).
SetCommitLogOptions(commitLogOpts).
SetRuntimeOptionsManager(runtime.NewOptionsManager())
fsOpts := setup.StorageOpts().CommitLogOptions().FilesystemOptions()

commitLogBootstrapper, err := bcl.NewCommitLogBootstrapperProvider(
bclOpts, mustInspectFilesystem(fsOpts), noOpAll)
require.NoError(t, err)
// fs bootstrapper
persistMgr, err := persistfs.NewPersistManager(fsOpts)
require.NoError(t, err)
storageIdxOpts := setup.StorageOpts().IndexOptions()
bfsOpts := fs.NewOptions().
SetResultOptions(bsOpts).
SetFilesystemOptions(fsOpts).
SetIndexOptions(storageIdxOpts).
SetPersistManager(persistMgr).
SetCompactor(newCompactor(t, storageIdxOpts))
fsBootstrapper, err := fs.NewFileSystemBootstrapperProvider(bfsOpts, commitLogBootstrapper)
require.NoError(t, err)
// bootstrapper storage opts
processOpts := bootstrap.NewProcessOptions().
SetTopologyMapProvider(setup).
SetOrigin(setup.Origin())
process, err := bootstrap.NewProcessProvider(
fsBootstrapper, processOpts, bsOpts)
require.NoError(t, err)
setup.SetStorageOpts(setup.StorageOpts().SetBootstrapProcessProvider(process))
require.NoError(t, setup.InitializeBootstrappers(InitializeBootstrappersOptions{
CommitLogOptions: commitLogOpts,
WithCommitLog: true,
WithFileSystem: true,
}))

log.Info("moving time forward and starting server")
setup.SetNowFn(t3)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
// +build integration

// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package integration

import (
"testing"
"time"

"github.com/m3db/m3/src/dbnode/integration/generate"
"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/dbnode/persist"
"github.com/m3db/m3/src/dbnode/persist/fs"
"github.com/m3db/m3/src/dbnode/persist/schema"
"github.com/m3db/m3/src/dbnode/retention"
xclock "github.com/m3db/m3/src/x/clock"
"github.com/m3db/m3/src/x/ident"
xtime "github.com/m3db/m3/src/x/time"

"github.com/stretchr/testify/require"
)

// TestCommitLogBootstrapWithSnapshotsAfterRestart writes data spanning four
// block starts to a namespace with cold writes enabled, waits for the snapshot
// files on disk to account for every written series, restarts the server with
// the commit log and filesystem bootstrappers configured, and verifies that
// the data read back equals what was written -- once right after the restart
// and once more after waiting for the latest snapshots to report zero entries.
func TestCommitLogBootstrapWithSnapshotsAfterRestart(t *testing.T) {
	if testing.Short() {
		t.SkipNow() // Not run as part of a short test run.
	}

	// Namespace and test options.
	retentionOpts := retention.NewOptions().SetRetentionPeriod(12 * time.Hour)
	blockSize := retentionOpts.BlockSize()
	nsOpts := namespace.NewOptions().
		SetRetentionOptions(retentionOpts).
		SetColdWritesEnabled(true)
	ns, err := namespace.NewMetadata(testNamespaces[0], nsOpts)
	require.NoError(t, err)

	testOpts := NewTestOptions(t).
		SetNamespaces([]namespace.Metadata{ns}).
		SetTickMinimumInterval(100 * time.Millisecond)

	setup, err := NewTestSetup(t, testOpts, nil)
	require.NoError(t, err)
	defer setup.Close()

	// Storage options: integration-test commit log flush interval and a
	// 50ms mediator tick.
	commitLogOpts := setup.StorageOpts().CommitLogOptions().
		SetFlushInterval(defaultIntegrationTestFlushInterval)
	storageOpts := setup.StorageOpts().
		SetCommitLogOptions(commitLogOpts).
		SetMediatorTickInterval(50 * time.Millisecond)
	setup.SetStorageOpts(storageOpts)

	logger := setup.StorageOpts().InstrumentOptions().Logger()
	logger.Info("commit log bootstrap with snapshots after restart test")

	// Bring the server up for the initial round of writes.
	require.NoError(t, setup.StartServer())
	logger.Debug("server is now up")

	// Whatever server instance is running at the end of the test gets
	// stopped here.
	defer func() {
		require.NoError(t, setup.StopServer())
		logger.Debug("server is now down")
	}()

	// Generate and write one batch per block start, remembering what was
	// written and how many series it amounts to.
	logger.Info("writing test data")
	now := setup.NowFn()().Truncate(blockSize)
	seriesMaps := make(map[xtime.UnixNano]generate.SeriesBlock)
	inputData := []generate.BlockConfig{
		{IDs: []string{"foo", "bar"}, NumPoints: 50, Start: now.Add(-5 * blockSize)},
		{IDs: []string{"foo", "qux"}, NumPoints: 50, Start: now.Add(-4 * blockSize)},
		{IDs: []string{"qux", "quux"}, NumPoints: 50, Start: now.Add(-3 * blockSize)},
		{IDs: []string{"corge", "porgie"}, NumPoints: 50, Start: now.Add(-2 * blockSize)},
	}
	expectedNumSeries := 0
	for i := range inputData {
		cfg := inputData[i]
		block := generate.Block(cfg)
		seriesMaps[xtime.ToUnixNano(cfg.Start)] = block
		require.NoError(t, setup.WriteBatch(testNamespaces[0], block))
		expectedNumSeries += len(cfg.IDs)
	}

	// snapshotEntries sums the per-block-start entry counts of the latest
	// snapshot files currently on disk.
	fsOpts := commitLogOpts.FilesystemOptions()
	snapshotEntries := func() int {
		total := 0
		for _, n := range getNumEntriesPerBlockStart(ns.ID(), testOpts.NumShards(), fsOpts) {
			total += n
		}
		return total
	}

	// verifyData reads back the range covering every written block and
	// compares it against what was written.
	verifyData := func() {
		metadatasByShard := testSetupMetadatas(
			t, setup, testNamespaces[0], now.Add(-5*blockSize), now.Add(-blockSize))
		observed := testSetupToSeriesMaps(t, setup, ns, metadatasByShard)
		verifySeriesMapsEqual(t, seriesMaps, observed)
	}

	// Wait until snapshots are on disk.
	// NOTE(review): the outcome of this wait is not inspected, so the test
	// carries on regardless of whether the condition was ever met.
	xclock.WaitUntil(func() bool {
		return snapshotEntries() == expectedNumSeries
	}, time.Minute)

	// Stop the server, then configure the bootstrappers only now that data
	// has been written so that filesystem inspection can find it. The fs
	// bootstrapper is enabled as well to ensure correct behaviour on restart
	// with it in the chain.
	require.NoError(t, setup.StopServer())
	require.NoError(t, setup.InitializeBootstrappers(InitializeBootstrappersOptions{
		CommitLogOptions: commitLogOpts,
		WithCommitLog:    true,
		WithFileSystem:   true,
	}))
	require.NoError(t, setup.StartServer())
	logger.Debug("server restarted")

	// Verify that data is what we expect.
	verifyData()

	// Wait until empty snapshots are on disk.
	// NOTE(review): as above, the outcome of this wait is not inspected.
	xclock.WaitUntil(func() bool {
		return snapshotEntries() == 0
	}, time.Minute)

	// Verify that data is still what we expect.
	verifyData()
}

// getNumEntriesPerBlockStart reads the snapshot info files of every shard in
// [0, numShards) for namespace nsID and returns a map keyed by block start.
// For each shard only the info file with the highest volume index per block
// start is considered; the value is the sum of those files' entry counts
// across all shards.
func getNumEntriesPerBlockStart(
	nsID ident.ID,
	numShards int,
	fsOpts fs.Options,
) map[xtime.UnixNano]int {
	entriesByBlockStart := make(map[xtime.UnixNano]int)
	for s := 0; s < numShards; s++ {
		results := fs.ReadInfoFiles(
			fsOpts.FilePathPrefix(),
			nsID,
			uint32(s),
			fsOpts.InfoReaderBufferSize(),
			fsOpts.DecodingOptions(),
			persist.FileSetSnapshotType,
		)

		// Keep, per block start, only the info with the highest volume index
		// seen for this shard.
		newest := make(map[xtime.UnixNano]schema.IndexInfo)
		for _, res := range results {
			key := xtime.UnixNano(res.Info.BlockStart)
			if cur, seen := newest[key]; seen && res.Info.VolumeIndex <= cur.VolumeIndex {
				continue
			}
			newest[key] = res.Info
		}

		// Fold this shard's latest snapshots into the running totals.
		for key, info := range newest {
			entriesByBlockStart[key] += int(info.Entries)
		}
	}
	return entriesByBlockStart
}
21 changes: 16 additions & 5 deletions src/dbnode/integration/commitlog_bootstrap_with_snapshots_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,13 @@ func testCommitLogBootstrapWithSnapshots(t *testing.T, setTestOpts setTestOption
ropts = retention.NewOptions().SetRetentionPeriod(12 * time.Hour)
blockSize = ropts.BlockSize()
)
ns1, err := namespace.NewMetadata(testNamespaces[0], namespace.NewOptions().SetRetentionOptions(ropts))
ns1, err := namespace.NewMetadata(testNamespaces[0], namespace.NewOptions().
SetRetentionOptions(ropts).
SetColdWritesEnabled(true))
require.NoError(t, err)
ns2, err := namespace.NewMetadata(testNamespaces[1], namespace.NewOptions().SetRetentionOptions(ropts))
ns2, err := namespace.NewMetadata(testNamespaces[1], namespace.NewOptions().
SetRetentionOptions(ropts).
SetColdWritesEnabled(true))
require.NoError(t, err)
opts := NewTestOptions(t).
SetNamespaces([]namespace.Metadata{ns1, ns2})
Expand All @@ -79,7 +83,14 @@ func testCommitLogBootstrapWithSnapshots(t *testing.T, setTestOpts setTestOption
log.Info("generating data")
var (
now = setup.NowFn()().Truncate(blockSize)
seriesMaps = generateSeriesMaps(30, updateInputConfig, now.Add(-2*blockSize), now.Add(-blockSize))
seriesMaps = generateSeriesMaps(
100,
updateInputConfig,
now.Add(-4*blockSize),
now.Add(-3*blockSize),
now.Add(-2*blockSize),
now.Add(-blockSize),
)
)
log.Info("writing data")

Expand Down Expand Up @@ -133,14 +144,14 @@ func testCommitLogBootstrapWithSnapshots(t *testing.T, setTestOpts setTestOption

// Verify in-memory data match what we expect - all writes from seriesMaps
// should be present
metadatasByShard := testSetupMetadatas(t, setup, testNamespaces[0], now.Add(-2*blockSize), now)
metadatasByShard := testSetupMetadatas(t, setup, testNamespaces[0], now.Add(-4*blockSize), now)
observedSeriesMaps := testSetupToSeriesMaps(t, setup, ns1, metadatasByShard)
verifySeriesMapsEqual(t, seriesMaps, observedSeriesMaps)

// Verify in-memory data match what we expect - no writes should be present
// because we didn't issue any writes for this namespace
emptySeriesMaps := make(generate.SeriesBlocksByStart)
metadatasByShard2 := testSetupMetadatas(t, setup, testNamespaces[1], now.Add(-2*blockSize), now)
metadatasByShard2 := testSetupMetadatas(t, setup, testNamespaces[1], now.Add(-4*blockSize), now)
observedSeriesMaps2 := testSetupToSeriesMaps(t, setup, ns2, metadatasByShard2)
verifySeriesMapsEqual(t, emptySeriesMaps, observedSeriesMaps2)

Expand Down
13 changes: 10 additions & 3 deletions src/dbnode/integration/disk_flush_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/m3db/m3/src/x/ident/testutil"
xtime "github.com/m3db/m3/src/x/time"

"github.com/pborman/uuid"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -84,7 +85,8 @@ func waitUntilSnapshotFilesFlushed(
namespace ident.ID,
expectedSnapshots []snapshotID,
timeout time.Duration,
) error {
) (uuid.UUID, error) {
var snapshotID uuid.UUID
dataFlushed := func() bool {
for _, shard := range shardSet.AllIDs() {
for _, e := range expectedSnapshots {
Expand All @@ -102,14 +104,19 @@ func waitUntilSnapshotFilesFlushed(
if !(latest.ID.VolumeIndex >= e.minVolume) {
return false
}

_, snapshotID, err = latest.SnapshotTimeAndID()
if err != nil {
panic(err)
}
}
}
return true
}
if waitUntil(dataFlushed, timeout) {
return nil
return snapshotID, nil
}
return errDiskFlushTimedOut
return snapshotID, errDiskFlushTimedOut
}

func waitUntilDataFilesFlushed(
Expand Down
Loading