Skip to content

Commit

Permalink
[dbnode] Background cold flush process (#2508)
Browse files Browse the repository at this point in the history
  • Loading branch information
notbdu authored Aug 6, 2020
1 parent 975b99c commit a801229
Show file tree
Hide file tree
Showing 53 changed files with 1,407 additions and 713 deletions.
37 changes: 8 additions & 29 deletions src/dbnode/integration/bootstrap_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,9 @@ import (
"testing"

"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/dbnode/persist/fs"
"github.com/m3db/m3/src/dbnode/persist/fs/commitlog"
"github.com/m3db/m3/src/dbnode/runtime"
"github.com/m3db/m3/src/dbnode/storage/bootstrap"
"github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper"
bcl "github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper/commitlog"
"github.com/m3db/m3/src/dbnode/storage/bootstrap/result"
"github.com/m3db/m3/src/x/context"

Expand Down Expand Up @@ -140,30 +137,12 @@ func (t testBootstrapperSource) String() string {
}

func setupCommitLogBootstrapperWithFSInspection(
t *testing.T, setup TestSetup, commitLogOpts commitlog.Options) {
noOpAll := bootstrapper.NewNoOpAllBootstrapperProvider()
bsOpts := newDefaulTestResultOptions(setup.StorageOpts())
bclOpts := bcl.NewOptions().
SetResultOptions(bsOpts).
SetCommitLogOptions(commitLogOpts).
SetRuntimeOptionsManager(runtime.NewOptionsManager())
fsOpts := setup.StorageOpts().CommitLogOptions().FilesystemOptions()
bs, err := bcl.NewCommitLogBootstrapperProvider(
bclOpts, mustInspectFilesystem(fsOpts), noOpAll)
require.NoError(t, err)
processOpts := bootstrap.NewProcessOptions().
SetTopologyMapProvider(setup).
SetOrigin(setup.Origin())
process, err := bootstrap.NewProcessProvider(bs, processOpts, bsOpts)
require.NoError(t, err)
setup.SetStorageOpts(setup.StorageOpts().SetBootstrapProcessProvider(process))
}

func mustInspectFilesystem(fsOpts fs.Options) fs.Inspection {
inspection, err := fs.InspectFilesystem(fsOpts)
if err != nil {
panic(err)
}

return inspection
t *testing.T,
setup TestSetup,
commitLogOpts commitlog.Options,
) {
require.NoError(t, setup.InitializeBootstrappers(InitializeBootstrappersOptions{
CommitLogOptions: commitLogOpts,
WithCommitLog: true,
}))
}
43 changes: 5 additions & 38 deletions src/dbnode/integration/commitlog_bootstrap_merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,7 @@ import (

"github.com/m3db/m3/src/dbnode/integration/generate"
"github.com/m3db/m3/src/dbnode/namespace"
persistfs "github.com/m3db/m3/src/dbnode/persist/fs"
"github.com/m3db/m3/src/dbnode/retention"
"github.com/m3db/m3/src/dbnode/runtime"
"github.com/m3db/m3/src/dbnode/storage/bootstrap"
"github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper"
bcl "github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper/commitlog"
"github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper/fs"
xtime "github.com/m3db/m3/src/x/time"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -118,38 +112,11 @@ func TestCommitLogAndFSMergeBootstrap(t *testing.T) {
}
writeCommitLogData(t, setup, commitLogOpts, commitlogSeriesMaps, ns1, false)

// commit log bootstrapper (must be after writing out commitlog files so inspection finds files)
noOpAll := bootstrapper.NewNoOpAllBootstrapperProvider()
bsOpts := newDefaulTestResultOptions(setup.StorageOpts())
bclOpts := bcl.NewOptions().
SetResultOptions(bsOpts).
SetCommitLogOptions(commitLogOpts).
SetRuntimeOptionsManager(runtime.NewOptionsManager())
fsOpts := setup.StorageOpts().CommitLogOptions().FilesystemOptions()

commitLogBootstrapper, err := bcl.NewCommitLogBootstrapperProvider(
bclOpts, mustInspectFilesystem(fsOpts), noOpAll)
require.NoError(t, err)
// fs bootstrapper
persistMgr, err := persistfs.NewPersistManager(fsOpts)
require.NoError(t, err)
storageIdxOpts := setup.StorageOpts().IndexOptions()
bfsOpts := fs.NewOptions().
SetResultOptions(bsOpts).
SetFilesystemOptions(fsOpts).
SetIndexOptions(storageIdxOpts).
SetPersistManager(persistMgr).
SetCompactor(newCompactor(t, storageIdxOpts))
fsBootstrapper, err := fs.NewFileSystemBootstrapperProvider(bfsOpts, commitLogBootstrapper)
require.NoError(t, err)
// bootstrapper storage opts
processOpts := bootstrap.NewProcessOptions().
SetTopologyMapProvider(setup).
SetOrigin(setup.Origin())
process, err := bootstrap.NewProcessProvider(
fsBootstrapper, processOpts, bsOpts)
require.NoError(t, err)
setup.SetStorageOpts(setup.StorageOpts().SetBootstrapProcessProvider(process))
require.NoError(t, setup.InitializeBootstrappers(InitializeBootstrappersOptions{
CommitLogOptions: commitLogOpts,
WithCommitLog: true,
WithFileSystem: true,
}))

log.Info("moving time forward and starting server")
setup.SetNowFn(t3)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
// +build integration

// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package integration

import (
"testing"
"time"

"github.com/m3db/m3/src/dbnode/integration/generate"
"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/dbnode/persist"
"github.com/m3db/m3/src/dbnode/persist/fs"
"github.com/m3db/m3/src/dbnode/persist/schema"
"github.com/m3db/m3/src/dbnode/retention"
xclock "github.com/m3db/m3/src/x/clock"
"github.com/m3db/m3/src/x/ident"
xtime "github.com/m3db/m3/src/x/time"

"github.com/stretchr/testify/require"
)

func TestCommitLogBootstrapWithSnapshotsAfterRestart(t *testing.T) {
if testing.Short() {
t.SkipNow() // Just skip if we're doing a short run
}

// Test setup
var (
ropts = retention.NewOptions().SetRetentionPeriod(12 * time.Hour)
blockSize = ropts.BlockSize()
)
ns, err := namespace.NewMetadata(testNamespaces[0], namespace.NewOptions().
SetRetentionOptions(ropts).
SetColdWritesEnabled(true))
require.NoError(t, err)
opts := NewTestOptions(t).
SetNamespaces([]namespace.Metadata{ns}).
SetTickMinimumInterval(100 * time.Millisecond)

setup, err := NewTestSetup(t, opts, nil)
require.NoError(t, err)
defer setup.Close()

commitLogOpts := setup.StorageOpts().CommitLogOptions().
SetFlushInterval(defaultIntegrationTestFlushInterval)
setup.SetStorageOpts(setup.StorageOpts().
SetCommitLogOptions(commitLogOpts).
SetMediatorTickInterval(50 * time.Millisecond))

log := setup.StorageOpts().InstrumentOptions().Logger()
log.Info("commit log bootstrap with snapshots after restart test")

// Start the server with filesystem bootstrapper
require.NoError(t, setup.StartServer())
log.Debug("server is now up")

// Stop the server
defer func() {
require.NoError(t, setup.StopServer())
log.Debug("server is now down")
}()

// Write test data
log.Info("writing test data")
now := setup.NowFn()().Truncate(blockSize)
seriesMaps := make(map[xtime.UnixNano]generate.SeriesBlock)
inputData := []generate.BlockConfig{
{IDs: []string{"foo", "bar"}, NumPoints: 50, Start: now.Add(-5 * blockSize)},
{IDs: []string{"foo", "qux"}, NumPoints: 50, Start: now.Add(-4 * blockSize)},
{IDs: []string{"qux", "quux"}, NumPoints: 50, Start: now.Add(-3 * blockSize)},
{IDs: []string{"corge", "porgie"}, NumPoints: 50, Start: now.Add(-2 * blockSize)},
}
for _, input := range inputData {
testData := generate.Block(input)
seriesMaps[xtime.ToUnixNano(input.Start)] = testData
require.NoError(t, setup.WriteBatch(testNamespaces[0], testData))
}

// Wait until snapshots are on disk.
fsOpts := commitLogOpts.FilesystemOptions()
expectedNumSeries := 0
for _, data := range inputData {
expectedNumSeries += len(data.IDs)
}
xclock.WaitUntil(func() bool {
var totalNumEntries int
for _, numEntries := range getNumEntriesPerBlockStart(ns.ID(), opts.NumShards(), fsOpts) {
totalNumEntries += numEntries
}
return totalNumEntries == expectedNumSeries
}, time.Minute)

// Stop and restart server to allow bootstrapping from commit logs.
require.NoError(t, setup.StopServer())
// Setup commitlog bootstrapper after writing data so filesystem inspection can find it.
require.NoError(t, setup.InitializeBootstrappers(InitializeBootstrappersOptions{
CommitLogOptions: commitLogOpts,
WithCommitLog: true,
// Also setup fs bootstrapper to be ensure correct behaviour on restart w/ fs bootstrapper enabled.
WithFileSystem: true,
}))
require.NoError(t, setup.StartServer())
log.Debug("server restarted")

// Verify that data is what we expect.
metadatasByShard := testSetupMetadatas(t, setup, testNamespaces[0], now.Add(-5*blockSize), now.Add(-blockSize))
observedSeriesMaps := testSetupToSeriesMaps(t, setup, ns, metadatasByShard)
verifySeriesMapsEqual(t, seriesMaps, observedSeriesMaps)

// Wait until empty snapshots are on disk.
xclock.WaitUntil(func() bool {
var totalNumEntries int
for _, numEntries := range getNumEntriesPerBlockStart(ns.ID(), opts.NumShards(), fsOpts) {
totalNumEntries += numEntries
}
return totalNumEntries == 0
}, time.Minute)

// Verify that data is still what we expect.
metadatasByShard = testSetupMetadatas(t, setup, testNamespaces[0], now.Add(-5*blockSize), now.Add(-blockSize))
observedSeriesMaps = testSetupToSeriesMaps(t, setup, ns, metadatasByShard)
verifySeriesMapsEqual(t, seriesMaps, observedSeriesMaps)
}

func getNumEntriesPerBlockStart(
nsID ident.ID,
numShards int,
fsOpts fs.Options,
) map[xtime.UnixNano]int {
numEntriesPerBlockStart := make(map[xtime.UnixNano]int)
for shard := 0; shard < numShards; shard++ {
infoFiles := fs.ReadInfoFiles(
fsOpts.FilePathPrefix(),
nsID,
uint32(shard),
fsOpts.InfoReaderBufferSize(),
fsOpts.DecodingOptions(),
persist.FileSetSnapshotType,
)
// Grab the latest snapshot file for each blockstart.
latestSnapshotInfoPerBlockStart := make(map[xtime.UnixNano]schema.IndexInfo)
for _, f := range infoFiles {
info, ok := latestSnapshotInfoPerBlockStart[xtime.UnixNano(f.Info.BlockStart)]
if !ok {
latestSnapshotInfoPerBlockStart[xtime.UnixNano(f.Info.BlockStart)] = f.Info
continue
}

if f.Info.VolumeIndex > info.VolumeIndex {
latestSnapshotInfoPerBlockStart[xtime.UnixNano(f.Info.BlockStart)] = f.Info
}
}
for blockStart, info := range latestSnapshotInfoPerBlockStart {
numEntriesPerBlockStart[blockStart] += int(info.Entries)
}
}
return numEntriesPerBlockStart
}
21 changes: 16 additions & 5 deletions src/dbnode/integration/commitlog_bootstrap_with_snapshots_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,13 @@ func testCommitLogBootstrapWithSnapshots(t *testing.T, setTestOpts setTestOption
ropts = retention.NewOptions().SetRetentionPeriod(12 * time.Hour)
blockSize = ropts.BlockSize()
)
ns1, err := namespace.NewMetadata(testNamespaces[0], namespace.NewOptions().SetRetentionOptions(ropts))
ns1, err := namespace.NewMetadata(testNamespaces[0], namespace.NewOptions().
SetRetentionOptions(ropts).
SetColdWritesEnabled(true))
require.NoError(t, err)
ns2, err := namespace.NewMetadata(testNamespaces[1], namespace.NewOptions().SetRetentionOptions(ropts))
ns2, err := namespace.NewMetadata(testNamespaces[1], namespace.NewOptions().
SetRetentionOptions(ropts).
SetColdWritesEnabled(true))
require.NoError(t, err)
opts := NewTestOptions(t).
SetNamespaces([]namespace.Metadata{ns1, ns2})
Expand All @@ -79,7 +83,14 @@ func testCommitLogBootstrapWithSnapshots(t *testing.T, setTestOpts setTestOption
log.Info("generating data")
var (
now = setup.NowFn()().Truncate(blockSize)
seriesMaps = generateSeriesMaps(30, updateInputConfig, now.Add(-2*blockSize), now.Add(-blockSize))
seriesMaps = generateSeriesMaps(
100,
updateInputConfig,
now.Add(-4*blockSize),
now.Add(-3*blockSize),
now.Add(-2*blockSize),
now.Add(-blockSize),
)
)
log.Info("writing data")

Expand Down Expand Up @@ -133,14 +144,14 @@ func testCommitLogBootstrapWithSnapshots(t *testing.T, setTestOpts setTestOption

// Verify in-memory data match what we expect - all writes from seriesMaps
// should be present
metadatasByShard := testSetupMetadatas(t, setup, testNamespaces[0], now.Add(-2*blockSize), now)
metadatasByShard := testSetupMetadatas(t, setup, testNamespaces[0], now.Add(-4*blockSize), now)
observedSeriesMaps := testSetupToSeriesMaps(t, setup, ns1, metadatasByShard)
verifySeriesMapsEqual(t, seriesMaps, observedSeriesMaps)

// Verify in-memory data match what we expect - no writes should be present
// because we didn't issue any writes for this namespaces
emptySeriesMaps := make(generate.SeriesBlocksByStart)
metadatasByShard2 := testSetupMetadatas(t, setup, testNamespaces[1], now.Add(-2*blockSize), now)
metadatasByShard2 := testSetupMetadatas(t, setup, testNamespaces[1], now.Add(-4*blockSize), now)
observedSeriesMaps2 := testSetupToSeriesMaps(t, setup, ns2, metadatasByShard2)
verifySeriesMapsEqual(t, emptySeriesMaps, observedSeriesMaps2)

Expand Down
13 changes: 10 additions & 3 deletions src/dbnode/integration/disk_flush_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/m3db/m3/src/x/ident/testutil"
xtime "github.com/m3db/m3/src/x/time"

"github.com/pborman/uuid"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -84,7 +85,8 @@ func waitUntilSnapshotFilesFlushed(
namespace ident.ID,
expectedSnapshots []snapshotID,
timeout time.Duration,
) error {
) (uuid.UUID, error) {
var snapshotID uuid.UUID
dataFlushed := func() bool {
for _, shard := range shardSet.AllIDs() {
for _, e := range expectedSnapshots {
Expand All @@ -102,14 +104,19 @@ func waitUntilSnapshotFilesFlushed(
if !(latest.ID.VolumeIndex >= e.minVolume) {
return false
}

_, snapshotID, err = latest.SnapshotTimeAndID()
if err != nil {
panic(err)
}
}
}
return true
}
if waitUntil(dataFlushed, timeout) {
return nil
return snapshotID, nil
}
return errDiskFlushTimedOut
return snapshotID, errDiskFlushTimedOut
}

func waitUntilDataFilesFlushed(
Expand Down
Loading

0 comments on commit a801229

Please sign in to comment.