From a8012291e42ad9f675dc251f12aa939b422bc0ba Mon Sep 17 00:00:00 2001 From: Bo Du Date: Thu, 6 Aug 2020 17:00:41 -0600 Subject: [PATCH] [dbnode] Background cold flush process (#2508) --- src/dbnode/integration/bootstrap_helpers.go | 37 +-- .../commitlog_bootstrap_merge_test.go | 43 +--- ...strap_with_snapshots_after_restart_test.go | 178 +++++++++++++ ...commitlog_bootstrap_with_snapshots_test.go | 21 +- src/dbnode/integration/disk_flush_helpers.go | 13 +- src/dbnode/integration/disk_snapshot_test.go | 12 +- .../integration/fs_bootstrap_index_test.go | 49 +--- .../fs_bootstrap_index_volume_type_test.go | 49 +--- .../integration/fs_bootstrap_multi_ns_test.go | 35 +-- .../integration/fs_bootstrap_tags_test.go | 33 +-- src/dbnode/integration/fs_bootstrap_test.go | 33 +-- ...fs_commitlog_mixed_mode_read_write_test.go | 45 +--- ...napshot_mixed_mode_read_write_prop_test.go | 4 +- .../fs_data_expiry_bootstrap_test.go | 32 +-- src/dbnode/integration/integration.go | 11 +- src/dbnode/integration/setup.go | 87 +++++++ src/dbnode/persist/fs/files.go | 22 +- src/dbnode/persist/fs/files_test.go | 2 +- src/dbnode/persist/fs/read_write_test.go | 4 +- src/dbnode/storage/bootstrap.go | 2 +- .../storage/bootstrap/bootstrapper/README.md | 1 + .../storage/bootstrap/bootstrapper/base.go | 11 +- .../bootstrap/bootstrapper/base_test.go | 8 +- .../bootstrapper/commitlog/source.go | 56 +++-- .../bootstrap/bootstrapper/fs/source.go | 3 +- .../fs/source_index_bench_test.go | 2 +- .../bootstrap/bootstrapper/peers/source.go | 17 ++ .../storage/bootstrap/bootstrapper/readers.go | 3 +- src/dbnode/storage/bootstrap/process.go | 31 ++- src/dbnode/storage/bootstrap/types.go | 7 +- src/dbnode/storage/bootstrap/util.go | 10 +- src/dbnode/storage/bootstrap_test.go | 6 +- src/dbnode/storage/cleanup.go | 75 ++++-- src/dbnode/storage/cleanup_test.go | 26 +- src/dbnode/storage/coldflush.go | 199 +++++++++++++++ src/dbnode/storage/coldflush_test.go | 118 +++++++++ src/dbnode/storage/database.go | 10 +- src/dbnode/storage/flush.go | 172 +++++-------- src/dbnode/storage/flush_test.go | 40 +-- src/dbnode/storage/fs.go | 2 +- src/dbnode/storage/fs_test.go | 2 +- src/dbnode/storage/mediator.go | 233 +++++++++++------- src/dbnode/storage/mediator_test.go | 4 +- src/dbnode/storage/options.go | 14 ++ src/dbnode/storage/series/buffer.go | 12 +- src/dbnode/storage/series/buffer_mock.go | 14 ++ src/dbnode/storage/series/series.go | 7 + src/dbnode/storage/series/series_mock.go | 14 ++ src/dbnode/storage/series/types.go | 6 +- src/dbnode/storage/shard.go | 49 +++- src/dbnode/storage/shard_test.go | 1 + src/dbnode/storage/storage_mock.go | 211 +++++++++++++--- src/dbnode/storage/types.go | 44 +++- 53 files changed, 1407 insertions(+), 713 deletions(-) create mode 100644 src/dbnode/integration/commitlog_bootstrap_with_snapshots_after_restart_test.go create mode 100644 src/dbnode/storage/coldflush.go create mode 100644 src/dbnode/storage/coldflush_test.go diff --git a/src/dbnode/integration/bootstrap_helpers.go b/src/dbnode/integration/bootstrap_helpers.go index c5ef8ab74d..3f11b2d8d0 100644 --- a/src/dbnode/integration/bootstrap_helpers.go +++ b/src/dbnode/integration/bootstrap_helpers.go @@ -26,12 +26,9 @@ import ( "testing" "github.com/m3db/m3/src/dbnode/namespace" - "github.com/m3db/m3/src/dbnode/persist/fs" "github.com/m3db/m3/src/dbnode/persist/fs/commitlog" - "github.com/m3db/m3/src/dbnode/runtime" "github.com/m3db/m3/src/dbnode/storage/bootstrap" "github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper" - bcl 
"github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper/commitlog" "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" "github.com/m3db/m3/src/x/context" @@ -140,30 +137,12 @@ func (t testBootstrapperSource) String() string { } func setupCommitLogBootstrapperWithFSInspection( - t *testing.T, setup TestSetup, commitLogOpts commitlog.Options) { - noOpAll := bootstrapper.NewNoOpAllBootstrapperProvider() - bsOpts := newDefaulTestResultOptions(setup.StorageOpts()) - bclOpts := bcl.NewOptions(). - SetResultOptions(bsOpts). - SetCommitLogOptions(commitLogOpts). - SetRuntimeOptionsManager(runtime.NewOptionsManager()) - fsOpts := setup.StorageOpts().CommitLogOptions().FilesystemOptions() - bs, err := bcl.NewCommitLogBootstrapperProvider( - bclOpts, mustInspectFilesystem(fsOpts), noOpAll) - require.NoError(t, err) - processOpts := bootstrap.NewProcessOptions(). - SetTopologyMapProvider(setup). - SetOrigin(setup.Origin()) - process, err := bootstrap.NewProcessProvider(bs, processOpts, bsOpts) - require.NoError(t, err) - setup.SetStorageOpts(setup.StorageOpts().SetBootstrapProcessProvider(process)) -} - -func mustInspectFilesystem(fsOpts fs.Options) fs.Inspection { - inspection, err := fs.InspectFilesystem(fsOpts) - if err != nil { - panic(err) - } - - return inspection + t *testing.T, + setup TestSetup, + commitLogOpts commitlog.Options, +) { + require.NoError(t, setup.InitializeBootstrappers(InitializeBootstrappersOptions{ + CommitLogOptions: commitLogOpts, + WithCommitLog: true, + })) } diff --git a/src/dbnode/integration/commitlog_bootstrap_merge_test.go b/src/dbnode/integration/commitlog_bootstrap_merge_test.go index 52cd7f1a2e..95004c842c 100644 --- a/src/dbnode/integration/commitlog_bootstrap_merge_test.go +++ b/src/dbnode/integration/commitlog_bootstrap_merge_test.go @@ -28,13 +28,7 @@ import ( "github.com/m3db/m3/src/dbnode/integration/generate" "github.com/m3db/m3/src/dbnode/namespace" - persistfs "github.com/m3db/m3/src/dbnode/persist/fs" "github.com/m3db/m3/src/dbnode/retention" - "github.com/m3db/m3/src/dbnode/runtime" - "github.com/m3db/m3/src/dbnode/storage/bootstrap" - "github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper" - bcl "github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper/commitlog" - "github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper/fs" xtime "github.com/m3db/m3/src/x/time" "github.com/stretchr/testify/require" @@ -118,38 +112,11 @@ func TestCommitLogAndFSMergeBootstrap(t *testing.T) { } writeCommitLogData(t, setup, commitLogOpts, commitlogSeriesMaps, ns1, false) - // commit log bootstrapper (must be after writing out commitlog files so inspection finds files) - noOpAll := bootstrapper.NewNoOpAllBootstrapperProvider() - bsOpts := newDefaulTestResultOptions(setup.StorageOpts()) - bclOpts := bcl.NewOptions(). - SetResultOptions(bsOpts). - SetCommitLogOptions(commitLogOpts). - SetRuntimeOptionsManager(runtime.NewOptionsManager()) - fsOpts := setup.StorageOpts().CommitLogOptions().FilesystemOptions() - - commitLogBootstrapper, err := bcl.NewCommitLogBootstrapperProvider( - bclOpts, mustInspectFilesystem(fsOpts), noOpAll) - require.NoError(t, err) - // fs bootstrapper - persistMgr, err := persistfs.NewPersistManager(fsOpts) - require.NoError(t, err) - storageIdxOpts := setup.StorageOpts().IndexOptions() - bfsOpts := fs.NewOptions(). - SetResultOptions(bsOpts). - SetFilesystemOptions(fsOpts). - SetIndexOptions(storageIdxOpts). - SetPersistManager(persistMgr). 
- SetCompactor(newCompactor(t, storageIdxOpts)) - fsBootstrapper, err := fs.NewFileSystemBootstrapperProvider(bfsOpts, commitLogBootstrapper) - require.NoError(t, err) - // bootstrapper storage opts - processOpts := bootstrap.NewProcessOptions(). - SetTopologyMapProvider(setup). - SetOrigin(setup.Origin()) - process, err := bootstrap.NewProcessProvider( - fsBootstrapper, processOpts, bsOpts) - require.NoError(t, err) - setup.SetStorageOpts(setup.StorageOpts().SetBootstrapProcessProvider(process)) + require.NoError(t, setup.InitializeBootstrappers(InitializeBootstrappersOptions{ + CommitLogOptions: commitLogOpts, + WithCommitLog: true, + WithFileSystem: true, + })) log.Info("moving time forward and starting server") setup.SetNowFn(t3) diff --git a/src/dbnode/integration/commitlog_bootstrap_with_snapshots_after_restart_test.go b/src/dbnode/integration/commitlog_bootstrap_with_snapshots_after_restart_test.go new file mode 100644 index 0000000000..b0756493b8 --- /dev/null +++ b/src/dbnode/integration/commitlog_bootstrap_with_snapshots_after_restart_test.go @@ -0,0 +1,178 @@ +// +build integration + +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package integration + +import ( + "testing" + "time" + + "github.com/m3db/m3/src/dbnode/integration/generate" + "github.com/m3db/m3/src/dbnode/namespace" + "github.com/m3db/m3/src/dbnode/persist" + "github.com/m3db/m3/src/dbnode/persist/fs" + "github.com/m3db/m3/src/dbnode/persist/schema" + "github.com/m3db/m3/src/dbnode/retention" + xclock "github.com/m3db/m3/src/x/clock" + "github.com/m3db/m3/src/x/ident" + xtime "github.com/m3db/m3/src/x/time" + + "github.com/stretchr/testify/require" +) + +func TestCommitLogBootstrapWithSnapshotsAfterRestart(t *testing.T) { + if testing.Short() { + t.SkipNow() // Just skip if we're doing a short run + } + + // Test setup + var ( + ropts = retention.NewOptions().SetRetentionPeriod(12 * time.Hour) + blockSize = ropts.BlockSize() + ) + ns, err := namespace.NewMetadata(testNamespaces[0], namespace.NewOptions(). + SetRetentionOptions(ropts). + SetColdWritesEnabled(true)) + require.NoError(t, err) + opts := NewTestOptions(t). + SetNamespaces([]namespace.Metadata{ns}). + SetTickMinimumInterval(100 * time.Millisecond) + + setup, err := NewTestSetup(t, opts, nil) + require.NoError(t, err) + defer setup.Close() + + commitLogOpts := setup.StorageOpts().CommitLogOptions(). 
+ SetFlushInterval(defaultIntegrationTestFlushInterval) + setup.SetStorageOpts(setup.StorageOpts(). + SetCommitLogOptions(commitLogOpts). + SetMediatorTickInterval(50 * time.Millisecond)) + + log := setup.StorageOpts().InstrumentOptions().Logger() + log.Info("commit log bootstrap with snapshots after restart test") + + // Start the server with filesystem bootstrapper + require.NoError(t, setup.StartServer()) + log.Debug("server is now up") + + // Stop the server + defer func() { + require.NoError(t, setup.StopServer()) + log.Debug("server is now down") + }() + + // Write test data + log.Info("writing test data") + now := setup.NowFn()().Truncate(blockSize) + seriesMaps := make(map[xtime.UnixNano]generate.SeriesBlock) + inputData := []generate.BlockConfig{ + {IDs: []string{"foo", "bar"}, NumPoints: 50, Start: now.Add(-5 * blockSize)}, + {IDs: []string{"foo", "qux"}, NumPoints: 50, Start: now.Add(-4 * blockSize)}, + {IDs: []string{"qux", "quux"}, NumPoints: 50, Start: now.Add(-3 * blockSize)}, + {IDs: []string{"corge", "porgie"}, NumPoints: 50, Start: now.Add(-2 * blockSize)}, + } + for _, input := range inputData { + testData := generate.Block(input) + seriesMaps[xtime.ToUnixNano(input.Start)] = testData + require.NoError(t, setup.WriteBatch(testNamespaces[0], testData)) + } + + // Wait until snapshots are on disk. + fsOpts := commitLogOpts.FilesystemOptions() + expectedNumSeries := 0 + for _, data := range inputData { + expectedNumSeries += len(data.IDs) + } + xclock.WaitUntil(func() bool { + var totalNumEntries int + for _, numEntries := range getNumEntriesPerBlockStart(ns.ID(), opts.NumShards(), fsOpts) { + totalNumEntries += numEntries + } + return totalNumEntries == expectedNumSeries + }, time.Minute) + + // Stop and restart server to allow bootstrapping from commit logs. + require.NoError(t, setup.StopServer()) + // Set up the commitlog bootstrapper after writing data so that filesystem inspection can find it. + require.NoError(t, setup.InitializeBootstrappers(InitializeBootstrappersOptions{ + CommitLogOptions: commitLogOpts, + WithCommitLog: true, + // Also set up the fs bootstrapper to ensure correct behaviour on restart w/ the fs bootstrapper enabled. + WithFileSystem: true, + })) + require.NoError(t, setup.StartServer()) + log.Debug("server restarted") + + // Verify that data is what we expect. + metadatasByShard := testSetupMetadatas(t, setup, testNamespaces[0], now.Add(-5*blockSize), now.Add(-blockSize)) + observedSeriesMaps := testSetupToSeriesMaps(t, setup, ns, metadatasByShard) + verifySeriesMapsEqual(t, seriesMaps, observedSeriesMaps) + + // Wait until empty snapshots are on disk. + xclock.WaitUntil(func() bool { + var totalNumEntries int + for _, numEntries := range getNumEntriesPerBlockStart(ns.ID(), opts.NumShards(), fsOpts) { + totalNumEntries += numEntries + } + return totalNumEntries == 0 + }, time.Minute) + + // Verify that data is still what we expect.
+ metadatasByShard = testSetupMetadatas(t, setup, testNamespaces[0], now.Add(-5*blockSize), now.Add(-blockSize)) + observedSeriesMaps = testSetupToSeriesMaps(t, setup, ns, metadatasByShard) + verifySeriesMapsEqual(t, seriesMaps, observedSeriesMaps) +} + +func getNumEntriesPerBlockStart( + nsID ident.ID, + numShards int, + fsOpts fs.Options, +) map[xtime.UnixNano]int { + numEntriesPerBlockStart := make(map[xtime.UnixNano]int) + for shard := 0; shard < numShards; shard++ { + infoFiles := fs.ReadInfoFiles( + fsOpts.FilePathPrefix(), + nsID, + uint32(shard), + fsOpts.InfoReaderBufferSize(), + fsOpts.DecodingOptions(), + persist.FileSetSnapshotType, + ) + // Grab the latest snapshot file for each blockstart. + latestSnapshotInfoPerBlockStart := make(map[xtime.UnixNano]schema.IndexInfo) + for _, f := range infoFiles { + info, ok := latestSnapshotInfoPerBlockStart[xtime.UnixNano(f.Info.BlockStart)] + if !ok { + latestSnapshotInfoPerBlockStart[xtime.UnixNano(f.Info.BlockStart)] = f.Info + continue + } + + if f.Info.VolumeIndex > info.VolumeIndex { + latestSnapshotInfoPerBlockStart[xtime.UnixNano(f.Info.BlockStart)] = f.Info + } + } + for blockStart, info := range latestSnapshotInfoPerBlockStart { + numEntriesPerBlockStart[blockStart] += int(info.Entries) + } + } + return numEntriesPerBlockStart +} diff --git a/src/dbnode/integration/commitlog_bootstrap_with_snapshots_test.go b/src/dbnode/integration/commitlog_bootstrap_with_snapshots_test.go index 4f22d977ed..60c50d3d10 100644 --- a/src/dbnode/integration/commitlog_bootstrap_with_snapshots_test.go +++ b/src/dbnode/integration/commitlog_bootstrap_with_snapshots_test.go @@ -51,9 +51,13 @@ func testCommitLogBootstrapWithSnapshots(t *testing.T, setTestOpts setTestOption ropts = retention.NewOptions().SetRetentionPeriod(12 * time.Hour) blockSize = ropts.BlockSize() ) - ns1, err := namespace.NewMetadata(testNamespaces[0], namespace.NewOptions().SetRetentionOptions(ropts)) + ns1, err := namespace.NewMetadata(testNamespaces[0], namespace.NewOptions(). + SetRetentionOptions(ropts). + SetColdWritesEnabled(true)) require.NoError(t, err) - ns2, err := namespace.NewMetadata(testNamespaces[1], namespace.NewOptions().SetRetentionOptions(ropts)) + ns2, err := namespace.NewMetadata(testNamespaces[1], namespace.NewOptions(). + SetRetentionOptions(ropts). + SetColdWritesEnabled(true)) require.NoError(t, err) opts := NewTestOptions(t). 
SetNamespaces([]namespace.Metadata{ns1, ns2}) @@ -79,7 +83,14 @@ func testCommitLogBootstrapWithSnapshots(t *testing.T, setTestOpts setTestOption log.Info("generating data") var ( now = setup.NowFn()().Truncate(blockSize) - seriesMaps = generateSeriesMaps(30, updateInputConfig, now.Add(-2*blockSize), now.Add(-blockSize)) + seriesMaps = generateSeriesMaps( + 100, + updateInputConfig, + now.Add(-4*blockSize), + now.Add(-3*blockSize), + now.Add(-2*blockSize), + now.Add(-blockSize), + ) ) log.Info("writing data") @@ -133,14 +144,14 @@ func testCommitLogBootstrapWithSnapshots(t *testing.T, setTestOpts setTestOption // Verify in-memory data match what we expect - all writes from seriesMaps // should be present - metadatasByShard := testSetupMetadatas(t, setup, testNamespaces[0], now.Add(-2*blockSize), now) + metadatasByShard := testSetupMetadatas(t, setup, testNamespaces[0], now.Add(-4*blockSize), now) observedSeriesMaps := testSetupToSeriesMaps(t, setup, ns1, metadatasByShard) verifySeriesMapsEqual(t, seriesMaps, observedSeriesMaps) // Verify in-memory data match what we expect - no writes should be present // because we didn't issue any writes for this namespaces emptySeriesMaps := make(generate.SeriesBlocksByStart) - metadatasByShard2 := testSetupMetadatas(t, setup, testNamespaces[1], now.Add(-2*blockSize), now) + metadatasByShard2 := testSetupMetadatas(t, setup, testNamespaces[1], now.Add(-4*blockSize), now) observedSeriesMaps2 := testSetupToSeriesMaps(t, setup, ns2, metadatasByShard2) verifySeriesMapsEqual(t, emptySeriesMaps, observedSeriesMaps2) diff --git a/src/dbnode/integration/disk_flush_helpers.go b/src/dbnode/integration/disk_flush_helpers.go index 7d3c87d400..ce954a036c 100644 --- a/src/dbnode/integration/disk_flush_helpers.go +++ b/src/dbnode/integration/disk_flush_helpers.go @@ -40,6 +40,7 @@ import ( "github.com/m3db/m3/src/x/ident/testutil" xtime "github.com/m3db/m3/src/x/time" + "github.com/pborman/uuid" "github.com/stretchr/testify/require" ) @@ -84,7 +85,8 @@ func waitUntilSnapshotFilesFlushed( namespace ident.ID, expectedSnapshots []snapshotID, timeout time.Duration, -) error { +) (uuid.UUID, error) { + var snapshotID uuid.UUID dataFlushed := func() bool { for _, shard := range shardSet.AllIDs() { for _, e := range expectedSnapshots { @@ -102,14 +104,19 @@ func waitUntilSnapshotFilesFlushed( if !(latest.ID.VolumeIndex >= e.minVolume) { return false } + + _, snapshotID, err = latest.SnapshotTimeAndID() + if err != nil { + panic(err) + } } } return true } if waitUntil(dataFlushed, timeout) { - return nil + return snapshotID, nil } - return errDiskFlushTimedOut + return snapshotID, errDiskFlushTimedOut } func waitUntilDataFilesFlushed( diff --git a/src/dbnode/integration/disk_snapshot_test.go b/src/dbnode/integration/disk_snapshot_test.go index f6aed5eeb4..5b7d886339 100644 --- a/src/dbnode/integration/disk_snapshot_test.go +++ b/src/dbnode/integration/disk_snapshot_test.go @@ -152,8 +152,8 @@ func TestDiskSnapshotSimple(t *testing.T) { maxWaitTime := time.Minute for i, ns := range testSetup.Namespaces() { log.Info("waiting for snapshot files to flush") - require.NoError(t, waitUntilSnapshotFilesFlushed( - filePathPrefix, shardSet, ns.ID(), snapshotsToWaitForByNS[i], maxWaitTime)) + _, err := waitUntilSnapshotFilesFlushed(filePathPrefix, shardSet, ns.ID(), snapshotsToWaitForByNS[i], maxWaitTime) + require.NoError(t, err) log.Info("verifying snapshot files") verifySnapshottedDataFiles(t, shardSet, testSetup.StorageOpts(), ns.ID(), seriesMaps) } @@ -167,15 +167,17 @@ func 
TestDiskSnapshotSimple(t *testing.T) { for _, ns := range testSetup.Namespaces() { log.Info("waiting for new snapshot files to be written out") snapshotsToWaitFor := []snapshotID{{blockStart: newTime.Truncate(blockSize)}} - require.NoError(t, waitUntilSnapshotFilesFlushed( - filePathPrefix, shardSet, ns.ID(), snapshotsToWaitFor, maxWaitTime)) + // NB(bodu): We need to check whether a specific snapshot ID was deleted, since the snapshotting logic has changed + // to always snapshot every block start w/in retention. + snapshotID, err := waitUntilSnapshotFilesFlushed(filePathPrefix, shardSet, ns.ID(), snapshotsToWaitFor, maxWaitTime) + require.NoError(t, err) log.Info("waiting for old snapshot files to be deleted") for _, shard := range shardSet.All() { waitUntil(func() bool { // Increase the time each check to ensure that the filesystem processes are able to progress (some // of them throttle themselves based on time elapsed since the previous time.) testSetup.SetNowFn(testSetup.NowFn()().Add(10 * time.Second)) - exists, err := fs.SnapshotFileSetExistsAt(filePathPrefix, ns.ID(), shard.ID(), oldTime.Truncate(blockSize)) + exists, err := fs.SnapshotFileSetExistsAt(filePathPrefix, ns.ID(), snapshotID, shard.ID(), oldTime.Truncate(blockSize)) require.NoError(t, err) return !exists }, maxWaitTime) diff --git a/src/dbnode/integration/fs_bootstrap_index_test.go b/src/dbnode/integration/fs_bootstrap_index_test.go index e8c625951d..2469420f73 100644 --- a/src/dbnode/integration/fs_bootstrap_index_test.go +++ b/src/dbnode/integration/fs_bootstrap_index_test.go @@ -28,16 +28,9 @@ import ( "github.com/m3db/m3/src/dbnode/integration/generate" "github.com/m3db/m3/src/dbnode/namespace" - persistfs "github.com/m3db/m3/src/dbnode/persist/fs" "github.com/m3db/m3/src/dbnode/retention" - "github.com/m3db/m3/src/dbnode/storage/bootstrap" - "github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper" - "github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper/fs" - "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" "github.com/m3db/m3/src/dbnode/storage/index" - "github.com/m3db/m3/src/dbnode/storage/index/compaction" "github.com/m3db/m3/src/m3ninx/idx" - "github.com/m3db/m3/src/m3ninx/index/segment/fst" "github.com/m3db/m3/src/x/ident" "github.com/stretchr/testify/require" @@ -67,45 +60,9 @@ func TestFilesystemBootstrapIndexWithIndexingEnabled(t *testing.T) { require.NoError(t, err) defer setup.Close() - fsOpts := setup.StorageOpts().CommitLogOptions().FilesystemOptions() - - persistMgr, err := persistfs.NewPersistManager(fsOpts) - require.NoError(t, err) - - storageIdxOpts := setup.StorageOpts().IndexOptions() - compactor, err := compaction.NewCompactor(storageIdxOpts.DocumentArrayPool(), - index.DocumentArrayPoolCapacity, - storageIdxOpts.SegmentBuilderOptions(), - storageIdxOpts.FSTSegmentOptions(), - compaction.CompactorOptions{ - FSTWriterOptions: &fst.WriterOptions{ - // DisableRegistry is set to true to trade a larger FST size - // for a faster FST compaction since we want to reduce the end - // to end latency for time to first index a metric. - DisableRegistry: true, - }, - }) - require.NoError(t, err) - - noOpAll := bootstrapper.NewNoOpAllBootstrapperProvider() - bsOpts := result.NewOptions(). - SetSeriesCachePolicy(setup.StorageOpts().SeriesCachePolicy()) - bfsOpts := fs.NewOptions(). - SetResultOptions(bsOpts). - SetFilesystemOptions(fsOpts). - SetIndexOptions(storageIdxOpts). - SetPersistManager(persistMgr).
- SetCompactor(compactor) - bs, err := fs.NewFileSystemBootstrapperProvider(bfsOpts, noOpAll) - require.NoError(t, err) - processOpts := bootstrap.NewProcessOptions(). - SetTopologyMapProvider(setup). - SetOrigin(setup.Origin()) - processProvider, err := bootstrap.NewProcessProvider(bs, processOpts, bsOpts) - require.NoError(t, err) - - setup.SetStorageOpts(setup.StorageOpts(). - SetBootstrapProcessProvider(processProvider)) + require.NoError(t, setup.InitializeBootstrappers(InitializeBootstrappersOptions{ + WithFileSystem: true, + })) // Write test data now := setup.NowFn()() diff --git a/src/dbnode/integration/fs_bootstrap_index_volume_type_test.go b/src/dbnode/integration/fs_bootstrap_index_volume_type_test.go index 9be217c713..58ac71aca6 100644 --- a/src/dbnode/integration/fs_bootstrap_index_volume_type_test.go +++ b/src/dbnode/integration/fs_bootstrap_index_volume_type_test.go @@ -28,17 +28,10 @@ import ( "github.com/m3db/m3/src/dbnode/integration/generate" "github.com/m3db/m3/src/dbnode/namespace" - persistfs "github.com/m3db/m3/src/dbnode/persist/fs" "github.com/m3db/m3/src/dbnode/retention" - "github.com/m3db/m3/src/dbnode/storage/bootstrap" - "github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper" - "github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper/fs" - "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" "github.com/m3db/m3/src/dbnode/storage/index" - "github.com/m3db/m3/src/dbnode/storage/index/compaction" "github.com/m3db/m3/src/m3ninx/doc" "github.com/m3db/m3/src/m3ninx/idx" - "github.com/m3db/m3/src/m3ninx/index/segment/fst" idxpersist "github.com/m3db/m3/src/m3ninx/persist" "github.com/m3db/m3/src/x/ident" @@ -69,45 +62,9 @@ func TestFilesystemBootstrapIndexVolumeTypes(t *testing.T) { require.NoError(t, err) defer setup.Close() - fsOpts := setup.StorageOpts().CommitLogOptions().FilesystemOptions() - - persistMgr, err := persistfs.NewPersistManager(fsOpts) - require.NoError(t, err) - - storageIdxOpts := setup.StorageOpts().IndexOptions() - compactor, err := compaction.NewCompactor(storageIdxOpts.DocumentArrayPool(), - index.DocumentArrayPoolCapacity, - storageIdxOpts.SegmentBuilderOptions(), - storageIdxOpts.FSTSegmentOptions(), - compaction.CompactorOptions{ - FSTWriterOptions: &fst.WriterOptions{ - // DisableRegistry is set to true to trade a larger FST size - // for a faster FST compaction since we want to reduce the end - // to end latency for time to first index a metric. - DisableRegistry: true, - }, - }) - require.NoError(t, err) - - noOpAll := bootstrapper.NewNoOpAllBootstrapperProvider() - bsOpts := result.NewOptions(). - SetSeriesCachePolicy(setup.StorageOpts().SeriesCachePolicy()) - bfsOpts := fs.NewOptions(). - SetResultOptions(bsOpts). - SetFilesystemOptions(fsOpts). - SetIndexOptions(storageIdxOpts). - SetPersistManager(persistMgr). - SetCompactor(compactor) - bs, err := fs.NewFileSystemBootstrapperProvider(bfsOpts, noOpAll) - require.NoError(t, err) - processOpts := bootstrap.NewProcessOptions(). - SetTopologyMapProvider(setup). - SetOrigin(setup.Origin()) - processProvider, err := bootstrap.NewProcessProvider(bs, processOpts, bsOpts) - require.NoError(t, err) - - setup.SetStorageOpts(setup.StorageOpts(). 
- SetBootstrapProcessProvider(processProvider)) + require.NoError(t, setup.InitializeBootstrappers(InitializeBootstrappersOptions{ + WithFileSystem: true, + })) // Write test data now := setup.NowFn()() diff --git a/src/dbnode/integration/fs_bootstrap_multi_ns_test.go b/src/dbnode/integration/fs_bootstrap_multi_ns_test.go index 4c2a83c196..9d63a1ccc5 100644 --- a/src/dbnode/integration/fs_bootstrap_multi_ns_test.go +++ b/src/dbnode/integration/fs_bootstrap_multi_ns_test.go @@ -28,12 +28,7 @@ import ( "github.com/m3db/m3/src/dbnode/integration/generate" "github.com/m3db/m3/src/dbnode/namespace" - persistfs "github.com/m3db/m3/src/dbnode/persist/fs" "github.com/m3db/m3/src/dbnode/retention" - "github.com/m3db/m3/src/dbnode/storage/bootstrap" - "github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper" - "github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper/fs" - "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" "github.com/stretchr/testify/require" ) @@ -63,33 +58,9 @@ func TestFilesystemBootstrapMultipleNamespaces(t *testing.T) { require.NoError(t, err) defer setup.Close() - fsOpts := setup.StorageOpts().CommitLogOptions().FilesystemOptions() - - persistMgr, err := persistfs.NewPersistManager(fsOpts) - require.NoError(t, err) - - noOpAll := bootstrapper.NewNoOpAllBootstrapperProvider() - bsOpts := result.NewOptions(). - SetSeriesCachePolicy(setup.StorageOpts().SeriesCachePolicy()) - storageIdxOpts := setup.StorageOpts().IndexOptions() - bfsOpts := fs.NewOptions(). - SetResultOptions(bsOpts). - SetFilesystemOptions(fsOpts). - SetIndexOptions(storageIdxOpts). - SetPersistManager(persistMgr). - SetCompactor(newCompactor(t, storageIdxOpts)) - - bs, err := fs.NewFileSystemBootstrapperProvider(bfsOpts, noOpAll) - require.NoError(t, err) - - processOpts := bootstrap.NewProcessOptions(). - SetTopologyMapProvider(setup). - SetOrigin(setup.Origin()) - processProvider, err := bootstrap.NewProcessProvider(bs, processOpts, bsOpts) - require.NoError(t, err) - - setup.SetStorageOpts(setup.StorageOpts(). - SetBootstrapProcessProvider(processProvider)) + require.NoError(t, setup.InitializeBootstrappers(InitializeBootstrappersOptions{ + WithFileSystem: true, + })) log := setup.StorageOpts().InstrumentOptions().Logger() diff --git a/src/dbnode/integration/fs_bootstrap_tags_test.go b/src/dbnode/integration/fs_bootstrap_tags_test.go index 2c752aab32..2a4021f607 100644 --- a/src/dbnode/integration/fs_bootstrap_tags_test.go +++ b/src/dbnode/integration/fs_bootstrap_tags_test.go @@ -28,12 +28,7 @@ import ( "github.com/m3db/m3/src/dbnode/integration/generate" "github.com/m3db/m3/src/dbnode/namespace" - persistfs "github.com/m3db/m3/src/dbnode/persist/fs" "github.com/m3db/m3/src/dbnode/retention" - "github.com/m3db/m3/src/dbnode/storage/bootstrap" - "github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper" - "github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper/fs" - "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" "github.com/m3db/m3/src/x/ident" "github.com/stretchr/testify/require" @@ -63,31 +58,9 @@ func TestFilesystemBootstrapTagsWithIndexingDisabled(t *testing.T) { require.NoError(t, err) defer setup.Close() - fsOpts := setup.StorageOpts().CommitLogOptions().FilesystemOptions() - - persistMgr, err := persistfs.NewPersistManager(fsOpts) - require.NoError(t, err) - - noOpAll := bootstrapper.NewNoOpAllBootstrapperProvider() - bsOpts := result.NewOptions(). 
- SetSeriesCachePolicy(setup.StorageOpts().SeriesCachePolicy()) - storageIdxOpts := setup.StorageOpts().IndexOptions() - bfsOpts := fs.NewOptions(). - SetResultOptions(bsOpts). - SetFilesystemOptions(fsOpts). - SetIndexOptions(storageIdxOpts). - SetPersistManager(persistMgr). - SetCompactor(newCompactor(t, storageIdxOpts)) - bs, err := fs.NewFileSystemBootstrapperProvider(bfsOpts, noOpAll) - require.NoError(t, err) - processOpts := bootstrap.NewProcessOptions(). - SetTopologyMapProvider(setup). - SetOrigin(setup.Origin()) - processProvider, err := bootstrap.NewProcessProvider(bs, processOpts, bsOpts) - require.NoError(t, err) - - setup.SetStorageOpts(setup.StorageOpts(). - SetBootstrapProcessProvider(processProvider)) + require.NoError(t, setup.InitializeBootstrappers(InitializeBootstrappersOptions{ + WithFileSystem: true, + })) // Write test data now := setup.NowFn()() diff --git a/src/dbnode/integration/fs_bootstrap_test.go b/src/dbnode/integration/fs_bootstrap_test.go index c89e787347..a6451dd7bf 100644 --- a/src/dbnode/integration/fs_bootstrap_test.go +++ b/src/dbnode/integration/fs_bootstrap_test.go @@ -28,12 +28,7 @@ import ( "github.com/m3db/m3/src/dbnode/integration/generate" "github.com/m3db/m3/src/dbnode/namespace" - persistfs "github.com/m3db/m3/src/dbnode/persist/fs" "github.com/m3db/m3/src/dbnode/retention" - "github.com/m3db/m3/src/dbnode/storage/bootstrap" - "github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper" - "github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper/fs" - "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" "github.com/stretchr/testify/require" ) @@ -73,31 +68,9 @@ func testFilesystemBootstrap(t *testing.T, setTestOpts setTestOptions, updateInp require.NoError(t, err) defer setup.Close() - fsOpts := setup.StorageOpts().CommitLogOptions().FilesystemOptions() - - persistMgr, err := persistfs.NewPersistManager(fsOpts) - require.NoError(t, err) - - noOpAll := bootstrapper.NewNoOpAllBootstrapperProvider() - bsOpts := result.NewOptions(). - SetSeriesCachePolicy(setup.StorageOpts().SeriesCachePolicy()) - storageIdxOpts := setup.StorageOpts().IndexOptions() - bfsOpts := fs.NewOptions(). - SetResultOptions(bsOpts). - SetFilesystemOptions(fsOpts). - SetIndexOptions(storageIdxOpts). - SetPersistManager(persistMgr). - SetCompactor(newCompactor(t, storageIdxOpts)) - bs, err := fs.NewFileSystemBootstrapperProvider(bfsOpts, noOpAll) - require.NoError(t, err) - processOpts := bootstrap.NewProcessOptions(). - SetTopologyMapProvider(setup). - SetOrigin(setup.Origin()) - processProvider, err := bootstrap.NewProcessProvider(bs, processOpts, bsOpts) - require.NoError(t, err) - - setup.SetStorageOpts(setup.StorageOpts(). 
- SetBootstrapProcessProvider(processProvider)) + require.NoError(t, setup.InitializeBootstrappers(InitializeBootstrappersOptions{ + WithFileSystem: true, + })) // Write test data now := setup.NowFn()() diff --git a/src/dbnode/integration/fs_commitlog_mixed_mode_read_write_test.go b/src/dbnode/integration/fs_commitlog_mixed_mode_read_write_test.go index b38847f450..dc98a1c4b7 100644 --- a/src/dbnode/integration/fs_commitlog_mixed_mode_read_write_test.go +++ b/src/dbnode/integration/fs_commitlog_mixed_mode_read_write_test.go @@ -29,13 +29,7 @@ import ( "github.com/m3db/m3/src/dbnode/integration/generate" "github.com/m3db/m3/src/dbnode/namespace" - persistfs "github.com/m3db/m3/src/dbnode/persist/fs" "github.com/m3db/m3/src/dbnode/retention" - "github.com/m3db/m3/src/dbnode/runtime" - "github.com/m3db/m3/src/dbnode/storage/bootstrap" - "github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper" - bcl "github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper/commitlog" - "github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper/fs" "github.com/m3db/m3/src/dbnode/ts" "github.com/m3db/m3/src/x/context" "github.com/m3db/m3/src/x/ident" @@ -230,49 +224,20 @@ func newTestSetupWithCommitLogAndFilesystemBootstrapper(t *testing.T, opts TestO func setCommitLogAndFilesystemBootstrapper(t *testing.T, opts TestOptions, setup TestSetup) TestSetup { commitLogOpts := setup.StorageOpts().CommitLogOptions() - fsOpts := commitLogOpts.FilesystemOptions() commitLogOpts = commitLogOpts. SetFlushInterval(defaultIntegrationTestFlushInterval) setup.SetStorageOpts(setup.StorageOpts().SetCommitLogOptions(commitLogOpts)) - // commit log bootstrapper - noOpAll := bootstrapper.NewNoOpAllBootstrapperProvider() - bsOpts := newDefaulTestResultOptions(setup.StorageOpts()) - bclOpts := bcl.NewOptions(). - SetResultOptions(bsOpts). - SetCommitLogOptions(commitLogOpts). - SetRuntimeOptionsManager(runtime.NewOptionsManager()) - - commitLogBootstrapper, err := bcl.NewCommitLogBootstrapperProvider( - bclOpts, mustInspectFilesystem(fsOpts), noOpAll) - require.NoError(t, err) - - // fs bootstrapper - persistMgr, err := persistfs.NewPersistManager(fsOpts) - require.NoError(t, err) - - storageIdxOpts := setup.StorageOpts().IndexOptions() - bfsOpts := fs.NewOptions(). - SetResultOptions(bsOpts). - SetFilesystemOptions(fsOpts). - SetIndexOptions(storageIdxOpts). - SetPersistManager(persistMgr). - SetCompactor(newCompactor(t, storageIdxOpts)) - - fsBootstrapper, err := fs.NewFileSystemBootstrapperProvider(bfsOpts, commitLogBootstrapper) - require.NoError(t, err) + require.NoError(t, setup.InitializeBootstrappers(InitializeBootstrappersOptions{ + CommitLogOptions: commitLogOpts, + WithCommitLog: true, + WithFileSystem: true, + })) // Need to make sure we have an active m3dbAdminClient because the previous one // may have been shutdown by StopServer(). setup.MaybeResetClients() - // bootstrapper storage opts - processOpts := bootstrap.NewProcessOptions(). - SetTopologyMapProvider(setup). 
- SetOrigin(setup.Origin()) - processProvider, err := bootstrap.NewProcessProvider(fsBootstrapper, processOpts, bsOpts) - require.NoError(t, err) - setup.SetStorageOpts(setup.StorageOpts().SetBootstrapProcessProvider(processProvider)) return setup } diff --git a/src/dbnode/integration/fs_commitlog_snapshot_mixed_mode_read_write_prop_test.go b/src/dbnode/integration/fs_commitlog_snapshot_mixed_mode_read_write_prop_test.go index 78341889e9..7e209dee63 100644 --- a/src/dbnode/integration/fs_commitlog_snapshot_mixed_mode_read_write_prop_test.go +++ b/src/dbnode/integration/fs_commitlog_snapshot_mixed_mode_read_write_prop_test.go @@ -29,8 +29,8 @@ import ( "testing" "time" - "github.com/m3db/m3/src/dbnode/retention" "github.com/m3db/m3/src/dbnode/namespace" + "github.com/m3db/m3/src/dbnode/retention" "github.com/m3db/m3/src/x/context" xtime "github.com/m3db/m3/src/x/time" "go.uber.org/zap" @@ -228,7 +228,7 @@ func TestFsCommitLogMixedModeReadWriteProp(t *testing.T) { } else { snapshotBlock = now.Truncate(ns1BlockSize).Add(-ns1BlockSize) } - err := waitUntilSnapshotFilesFlushed( + _, err := waitUntilSnapshotFilesFlushed( filePathPrefix, setup.ShardSet(), nsID, diff --git a/src/dbnode/integration/fs_data_expiry_bootstrap_test.go b/src/dbnode/integration/fs_data_expiry_bootstrap_test.go index 2215e62979..4d020d183e 100644 --- a/src/dbnode/integration/fs_data_expiry_bootstrap_test.go +++ b/src/dbnode/integration/fs_data_expiry_bootstrap_test.go @@ -28,12 +28,7 @@ import ( "github.com/m3db/m3/src/dbnode/integration/generate" "github.com/m3db/m3/src/dbnode/namespace" - "github.com/m3db/m3/src/dbnode/persist/fs" "github.com/m3db/m3/src/dbnode/retention" - "github.com/m3db/m3/src/dbnode/storage/bootstrap" - "github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper" - bfs "github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper/fs" - "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" "github.com/stretchr/testify/require" ) @@ -65,31 +60,10 @@ func TestFilesystemDataExpiryBootstrap(t *testing.T) { defer setup.Close() log := setup.StorageOpts().InstrumentOptions().Logger() - fsOpts := setup.StorageOpts().CommitLogOptions().FilesystemOptions() - persistMgr, err := fs.NewPersistManager(fsOpts) - require.NoError(t, err) - - noOpAll := bootstrapper.NewNoOpAllBootstrapperProvider() - bsOpts := result.NewOptions(). - SetSeriesCachePolicy(setup.StorageOpts().SeriesCachePolicy()) - storageIdxOpts := setup.StorageOpts().IndexOptions() - bfsOpts := bfs.NewOptions(). - SetResultOptions(bsOpts). - SetIndexOptions(storageIdxOpts). - SetFilesystemOptions(fsOpts). - SetPersistManager(persistMgr). - SetCompactor(newCompactor(t, storageIdxOpts)) - bs, err := bfs.NewFileSystemBootstrapperProvider(bfsOpts, noOpAll) - require.NoError(t, err) - processOpts := bootstrap.NewProcessOptions(). - SetTopologyMapProvider(setup). - SetOrigin(setup.Origin()) - processProvider, err := bootstrap.NewProcessProvider(bs, processOpts, bsOpts) - require.NoError(t, err) - - setup.SetStorageOpts(setup.StorageOpts(). 
- SetBootstrapProcessProvider(processProvider)) + require.NoError(t, setup.InitializeBootstrappers(InitializeBootstrappersOptions{ + WithFileSystem: true, + })) // Write test data now := setup.NowFn()() diff --git a/src/dbnode/integration/integration.go b/src/dbnode/integration/integration.go index 17681602c8..b0fdba6c39 100644 --- a/src/dbnode/integration/integration.go +++ b/src/dbnode/integration/integration.go @@ -417,7 +417,13 @@ func newCompactor( t *testing.T, opts index.Options, ) *compaction.Compactor { - compactor, err := compaction.NewCompactor(opts.DocumentArrayPool(), + compactor, err := newCompactorWithErr(opts) + require.NoError(t, err) + return compactor +} + +func newCompactorWithErr(opts index.Options) (*compaction.Compactor, error) { + return compaction.NewCompactor(opts.DocumentArrayPool(), index.DocumentArrayPoolCapacity, opts.SegmentBuilderOptions(), opts.FSTSegmentOptions(), @@ -429,9 +435,6 @@ func newCompactor( DisableRegistry: true, }, }) - require.NoError(t, err) - return compactor - } func writeTestIndexDataToDisk( diff --git a/src/dbnode/integration/setup.go b/src/dbnode/integration/setup.go index 6f169931be..9186052bc7 100644 --- a/src/dbnode/integration/setup.go +++ b/src/dbnode/integration/setup.go @@ -41,11 +41,16 @@ import ( "github.com/m3db/m3/src/dbnode/integration/generate" "github.com/m3db/m3/src/dbnode/namespace" "github.com/m3db/m3/src/dbnode/persist/fs" + "github.com/m3db/m3/src/dbnode/persist/fs/commitlog" "github.com/m3db/m3/src/dbnode/retention" + "github.com/m3db/m3/src/dbnode/runtime" "github.com/m3db/m3/src/dbnode/sharding" "github.com/m3db/m3/src/dbnode/storage" "github.com/m3db/m3/src/dbnode/storage/block" + "github.com/m3db/m3/src/dbnode/storage/bootstrap" "github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper" + bcl "github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper/commitlog" + bfs "github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper/fs" "github.com/m3db/m3/src/dbnode/storage/cluster" "github.com/m3db/m3/src/dbnode/storage/index" "github.com/m3db/m3/src/dbnode/storage/series" @@ -164,6 +169,8 @@ type TestSetup interface { Close() WriteBatch(ident.ID, generate.SeriesBlock) error ShouldBeEqual() bool + // *NOTE*: This method is deprecated and should not be used in future tests. + // Also, we should migrate existing tests when we touch them away from using this. SleepFor10xTickMinimumInterval() BlockLeaseManager() block.LeaseManager ShardSet() sharding.ShardSet @@ -177,6 +184,7 @@ type TestSetup interface { WaitUntilServerIsUp() error WaitUntilServerIsDown() error Truncate(*rpc.TruncateRequest) (int64, error) + InitializeBootstrappers(opts InitializeBootstrappersOptions) error } type storageOption func(storage.Options) storage.Options @@ -883,6 +891,76 @@ func (ts *testSetup) SchemaRegistry() namespace.SchemaRegistry { return ts.schemaReg } +// InitializeBootstrappersOptions supplies options for bootstrapper initialization. 
+type InitializeBootstrappersOptions struct { + CommitLogOptions commitlog.Options + WithCommitLog bool + WithFileSystem bool +} + +func (o InitializeBootstrappersOptions) validate() error { + if o.WithCommitLog && o.CommitLogOptions == nil { + return errors.New("commit log options required when initializing a commit log bootstrapper") + } + return nil +} + +func (ts *testSetup) InitializeBootstrappers(opts InitializeBootstrappersOptions) error { + var err error + if err := opts.validate(); err != nil { + return err + } + + bs := bootstrapper.NewNoOpAllBootstrapperProvider() + storageOpts := ts.StorageOpts() + bsOpts := newDefaulTestResultOptions(storageOpts) + fsOpts := storageOpts.CommitLogOptions().FilesystemOptions() + if opts.WithCommitLog { + bclOpts := bcl.NewOptions(). + SetResultOptions(bsOpts). + SetCommitLogOptions(opts.CommitLogOptions). + SetRuntimeOptionsManager(runtime.NewOptionsManager()) + bs, err = bcl.NewCommitLogBootstrapperProvider( + bclOpts, mustInspectFilesystem(fsOpts), bs) + if err != nil { + return err + } + } + + if opts.WithFileSystem { + persistMgr, err := fs.NewPersistManager(fsOpts) + if err != nil { + return err + } + storageIdxOpts := storageOpts.IndexOptions() + compactor, err := newCompactorWithErr(storageIdxOpts) + if err != nil { + return err + } + bfsOpts := bfs.NewOptions(). + SetResultOptions(bsOpts). + SetFilesystemOptions(fsOpts). + SetIndexOptions(storageIdxOpts). + SetPersistManager(persistMgr). + SetCompactor(compactor) + bs, err = bfs.NewFileSystemBootstrapperProvider(bfsOpts, bs) + if err != nil { + return err + } + } + + processOpts := bootstrap.NewProcessOptions(). + SetTopologyMapProvider(ts). + SetOrigin(ts.Origin()) + process, err := bootstrap.NewProcessProvider(bs, processOpts, bsOpts) + if err != nil { + return err + } + ts.SetStorageOpts(storageOpts.SetBootstrapProcessProvider(process)) + + return nil +} + // Implements topology.MapProvider, and makes sure that the topology // map provided always comes from the most recent database in the testSetup // since they get\ recreated everytime StartServer/StopServer is called and @@ -1021,3 +1099,12 @@ func newNodes( return nodes, topoInit, nodeClose } + +func mustInspectFilesystem(fsOpts fs.Options) fs.Inspection { + inspection, err := fs.InspectFilesystem(fsOpts) + if err != nil { + panic(err) + } + + return inspection +} diff --git a/src/dbnode/persist/fs/files.go b/src/dbnode/persist/fs/files.go index d09714a562..20dcade55d 100644 --- a/src/dbnode/persist/fs/files.go +++ b/src/dbnode/persist/fs/files.go @@ -828,12 +828,13 @@ func ReadInfoFiles( shard uint32, readerBufferSize int, decodingOpts msgpack.DecodingOptions, + fileSetType persist.FileSetType, ) []ReadInfoFileResult { var infoFileResults []ReadInfoFileResult decoder := msgpack.NewDecoder(decodingOpts) forEachInfoFile( forEachInfoFileSelector{ - fileSetType: persist.FileSetFlushType, + fileSetType: fileSetType, contentType: persist.FileSetDataContentType, filePathPrefix: filePathPrefix, namespace: namespace, @@ -1441,17 +1442,32 @@ func DataFileSetExists( } // SnapshotFileSetExistsAt determines whether snapshot fileset files exist for the given namespace, shard, and block start time. 
-func SnapshotFileSetExistsAt(prefix string, namespace ident.ID, shard uint32, blockStart time.Time) (bool, error) { +func SnapshotFileSetExistsAt( + prefix string, + namespace ident.ID, + snapshotID uuid.UUID, + shard uint32, + blockStart time.Time, +) (bool, error) { snapshotFiles, err := SnapshotFiles(prefix, namespace, shard) if err != nil { return false, err } - _, ok := snapshotFiles.LatestVolumeForBlock(blockStart) + latest, ok := snapshotFiles.LatestVolumeForBlock(blockStart) if !ok { return false, nil } + _, latestSnapshotID, err := latest.SnapshotTimeAndID() + if err != nil { + return false, err + } + + if !uuid.Equal(latestSnapshotID, snapshotID) { + return false, nil + } + // LatestVolumeForBlock checks for a complete checkpoint file, so we don't // need to recheck it here. return true, nil diff --git a/src/dbnode/persist/fs/files_test.go b/src/dbnode/persist/fs/files_test.go index c34f162558..bb2cb23dab 100644 --- a/src/dbnode/persist/fs/files_test.go +++ b/src/dbnode/persist/fs/files_test.go @@ -889,7 +889,7 @@ func TestSnapshotFileSetExistsAt(t *testing.T) { writeOutTestSnapshot(t, dir, shard, ts, 0) - exists, err := SnapshotFileSetExistsAt(dir, testNs1ID, shard, ts) + exists, err := SnapshotFileSetExistsAt(dir, testNs1ID, testSnapshotID, shard, ts) require.NoError(t, err) require.True(t, exists) } diff --git a/src/dbnode/persist/fs/read_write_test.go b/src/dbnode/persist/fs/read_write_test.go index 521f9df19f..787c1febf6 100644 --- a/src/dbnode/persist/fs/read_write_test.go +++ b/src/dbnode/persist/fs/read_write_test.go @@ -355,7 +355,7 @@ func TestInfoReadWrite(t *testing.T) { w := newTestWriter(t, filePathPrefix) writeTestData(t, w, 0, testWriterStart, entries, persist.FileSetFlushType) - readInfoFileResults := ReadInfoFiles(filePathPrefix, testNs1ID, 0, 16, nil) + readInfoFileResults := ReadInfoFiles(filePathPrefix, testNs1ID, 0, 16, nil, persist.FileSetFlushType) require.Equal(t, 1, len(readInfoFileResults)) for _, result := range readInfoFileResults { require.NoError(t, result.Err.Error()) @@ -380,7 +380,7 @@ func TestInfoReadWriteVolumeIndex(t *testing.T) { writeTestDataWithVolume(t, w, 0, testWriterStart, volume, entries, persist.FileSetFlushType) - readInfoFileResults := ReadInfoFiles(filePathPrefix, testNs1ID, 0, 16, nil) + readInfoFileResults := ReadInfoFiles(filePathPrefix, testNs1ID, 0, 16, nil, persist.FileSetFlushType) require.Equal(t, 1, len(readInfoFileResults)) for _, result := range readInfoFileResults { require.NoError(t, result.Err.Error()) diff --git a/src/dbnode/storage/bootstrap.go b/src/dbnode/storage/bootstrap.go index a730d829c4..846791067f 100644 --- a/src/dbnode/storage/bootstrap.go +++ b/src/dbnode/storage/bootstrap.go @@ -141,7 +141,7 @@ func (m *bootstrapManager) Bootstrap() (BootstrapResult, error) { // NB(xichen): disable filesystem manager before we bootstrap to minimize // the impact of file operations on bootstrapping performance - m.mediator.DisableFileOps() + m.mediator.DisableFileOpsAndWait() defer m.mediator.EnableFileOps() // Keep performing bootstraps until none pending and no error returned. 
diff --git a/src/dbnode/storage/bootstrap/bootstrapper/README.md b/src/dbnode/storage/bootstrap/bootstrapper/README.md index 9927bd41e9..b062aaac3d 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/README.md +++ b/src/dbnode/storage/bootstrap/bootstrapper/README.md @@ -7,6 +7,7 @@ The collection of bootstrappers comprise the task executed when bootstrapping a - `fs`: The filesystem bootstrapper, used to bootstrap as much data as possible from the local filesystem. - `peers`: The peers bootstrapper, used to bootstrap any remaining data from peers. This is used for a full node join too. - `commitlog`: The commit log bootstrapper, currently only used in the case that peers bootstrapping fails. Once the current block is being snapshotted frequently to disk it might be faster and make more sense to not actively use the peers bootstrapper and just use a combination of the filesystem bootstrapper and the minimal time range required from the commit log bootstrapper. + - *NOTE*: the commitlog bootstrapper is special cased in that it runs for the *entire* bootstrappable range per shard whereas other bootstrappers fill in the unfulfilled gaps as bootstrapping progresses. ## Cache policies diff --git a/src/dbnode/storage/bootstrap/bootstrapper/base.go b/src/dbnode/storage/bootstrap/bootstrapper/base.go index 5d968a9237..c6e78d36cb 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/base.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/base.go @@ -268,12 +268,11 @@ func (b baseBootstrapper) logSuccessAndDetermineCurrResultsUnfulfilledAndNextBoo // Set the modified result. currResults.Results.Set(id, currResult) - // Set the next bootstrapper namespace run options if we need to bootstrap - // further time ranges. - if !nextNamespace.DataRunOptions.ShardTimeRanges.IsEmpty() || - !nextNamespace.IndexRunOptions.ShardTimeRanges.IsEmpty() { - next.Namespaces.Set(id, nextNamespace) - } + // Always set the next bootstrapper namespace run options regardless of + // whether there are unfulfilled index/data shard time ranges. + // NB(bodu): We perform short circuiting directly in the peers bootstrapper and the + // commitlog bootstrapper should always run for all time ranges. + next.Namespaces.Set(id, nextNamespace) // Log the result. _, _, dataRangeRequested := dataCurrRequested.MinMaxRange() diff --git a/src/dbnode/storage/bootstrap/bootstrapper/base_test.go b/src/dbnode/storage/bootstrap/bootstrapper/base_test.go index 62f99afb7a..8aa123d0f0 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/base_test.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/base_test.go @@ -129,12 +129,13 @@ func TestBaseBootstrapperEmptyRangeWithIndex(t *testing.T) { func testBaseBootstrapperEmptyRange(t *testing.T, withIndex bool) { ctrl := gomock.NewController(t) defer ctrl.Finish() - src, _, base := testBaseBootstrapper(t, ctrl) + src, next, base := testBaseBootstrapper(t, ctrl) testNs := testNsMetadata(t, withIndex) rngs := result.NewShardTimeRanges() unfulfilled := xtime.NewRanges() nsResults := testResult(testNs, withIndex, testShard, unfulfilled) + nextResult := testResult(testNs, withIndex, testShard, xtime.NewRanges()) shardRangeMatcher := bootstrap.ShardTimeRangesMatcher{Ranges: rngs} src.EXPECT().AvailableData(testNs, shardRangeMatcher, testDefaultRunOpts). 
Return(rngs, nil) @@ -155,6 +156,7 @@ func testBaseBootstrapperEmptyRange(t *testing.T, withIndex bool) { ) (bootstrap.NamespaceResults, error) { return nsResults, nil }) + next.EXPECT().Bootstrap(gomock.Any(), matcher).Return(nextResult, nil) // Test non-nil empty range tester.TestBootstrapWith(base) @@ -176,11 +178,12 @@ func TestBaseBootstrapperCurrentNoUnfulfilledWithIndex(t *testing.T) { func testBaseBootstrapperCurrentNoUnfulfilled(t *testing.T, withIndex bool) { ctrl := gomock.NewController(t) defer ctrl.Finish() - src, _, base := testBaseBootstrapper(t, ctrl) + src, next, base := testBaseBootstrapper(t, ctrl) testNs := testNsMetadata(t, withIndex) unfulfilled := xtime.NewRanges() nsResults := testResult(testNs, withIndex, testShard, unfulfilled) + nextResult := testResult(testNs, withIndex, testShard, xtime.NewRanges()) targetRanges := testShardTimeRanges() src.EXPECT().AvailableData(testNs, targetRanges, testDefaultRunOpts). @@ -203,6 +206,7 @@ func testBaseBootstrapperCurrentNoUnfulfilled(t *testing.T, withIndex bool) { ) (bootstrap.NamespaceResults, error) { return nsResults, nil }) + next.EXPECT().Bootstrap(gomock.Any(), matcher).Return(nextResult, nil) tester.TestBootstrapWith(base) assert.Equal(t, nsResults, tester.Results) diff --git a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go index be0a2e2a6a..91af1ba4a3 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go @@ -175,23 +175,6 @@ func (s *commitLogSource) Read( ctx, span, _ := ctx.StartSampledTraceSpan(tracepoint.BootstrapperCommitLogSourceRead) defer span.Finish() - timeRangesEmpty := true - for _, elem := range namespaces.Namespaces.Iter() { - namespace := elem.Value() - dataRangesNotEmpty := !namespace.DataRunOptions.ShardTimeRanges.IsEmpty() - - indexEnabled := namespace.Metadata.Options().IndexOptions().Enabled() - indexRangesNotEmpty := indexEnabled && !namespace.IndexRunOptions.ShardTimeRanges.IsEmpty() - if dataRangesNotEmpty || indexRangesNotEmpty { - timeRangesEmpty = false - break - } - } - if timeRangesEmpty { - // Return empty result with no unfulfilled ranges. - return bootstrap.NewNamespaceResults(namespaces), nil - } - var ( // Emit bootstrapping gauge for duration of ReadData. doneReadingData = s.metrics.emitBootstrapping() @@ -216,9 +199,11 @@ func (s *commitLogSource) Read( // NB(r): Combine all shard time ranges across data and index // so we can do in one go. shardTimeRanges := result.NewShardTimeRanges() - shardTimeRanges.AddRanges(ns.DataRunOptions.ShardTimeRanges) + // NB(bodu): Use TargetShardTimeRanges which covers the entire original target shard range + // since the commitlog bootstrapper should run for the entire bootstrappable range per shard. + shardTimeRanges.AddRanges(ns.DataRunOptions.TargetShardTimeRanges) if ns.Metadata.Options().IndexOptions().Enabled() { - shardTimeRanges.AddRanges(ns.IndexRunOptions.ShardTimeRanges) + shardTimeRanges.AddRanges(ns.IndexRunOptions.TargetShardTimeRanges) } namespaceResults[ns.Metadata.ID().String()] = &readNamespaceResult{ @@ -677,6 +662,30 @@ func (s *commitLogSource) bootstrapShardSnapshots( blockSize time.Duration, mostRecentCompleteSnapshotByBlockShard map[xtime.UnixNano]map[uint32]fs.FileSetFile, ) error { + // NB(bodu): We use info files on disk to check if a snapshot should be loaded in as cold or warm. 
+ // We do this instead of cross-referencing block starts with the current time to handle the case of bootstrapping a + // once-warm block start after a node has been shut down for a long time. We consider any block start that we + // haven't yet flushed data for to be a warm block start. + fsOpts := s.opts.CommitLogOptions().FilesystemOptions() + readInfoFilesResults := fs.ReadInfoFiles(fsOpts.FilePathPrefix(), ns.ID(), shard, + fsOpts.InfoReaderBufferSize(), fsOpts.DecodingOptions(), persist.FileSetFlushType) + shardBlockStartsOnDisk := make(map[xtime.UnixNano]struct{}) + for _, result := range readInfoFilesResults { + if err := result.Err.Error(); err != nil { + // If we couldn't read the info files then keep going to be consistent + // with the way the db shard updates its flush states in UpdateFlushStates(). + s.log.Error("unable to read info files in commit log bootstrap", + zap.Uint32("shard", shard), + zap.Stringer("namespace", ns.ID()), + zap.String("filepath", result.Err.Filepath()), + zap.Error(err)) + continue + } + info := result.Info + at := xtime.FromNanoseconds(info.BlockStart) + shardBlockStartsOnDisk[xtime.ToUnixNano(at)] = struct{}{} + } + rangeIter := shardTimeRanges.Iter() for rangeIter.Next() { var ( @@ -709,9 +718,13 @@ func (s *commitLogSource) bootstrapShardSnapshots( continue } + writeType := series.WarmWrite + if _, ok := shardBlockStartsOnDisk[xtime.ToUnixNano(blockStart)]; ok { + writeType = series.ColdWrite + } if err := s.bootstrapShardBlockSnapshot( ns, accumulator, shard, blockStart, blockSize, - mostRecentCompleteSnapshotForShardBlock); err != nil { + mostRecentCompleteSnapshotForShardBlock, writeType); err != nil { return err } } @@ -727,6 +740,7 @@ func (s *commitLogSource) bootstrapShardBlockSnapshot( blockStart time.Time, blockSize time.Duration, mostRecentCompleteSnapshot fs.FileSetFile, + writeType series.WriteType, ) error { var ( bOpts = s.opts.ResultOptions() @@ -806,7 +820,7 @@ func (s *commitLogSource) bootstrapShardBlockSnapshot( } // Load into series.
- if err := ref.Series.LoadBlock(dbBlock, series.WarmWrite); err != nil { + if err := ref.Series.LoadBlock(dbBlock, writeType); err != nil { return err } diff --git a/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go b/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go index 9e6dfab1bc..aee996bb5d 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go @@ -237,7 +237,8 @@ func (s *fileSystemSource) shardAvailability( } readInfoFilesResults := fs.ReadInfoFiles(s.fsopts.FilePathPrefix(), - namespace, shard, s.fsopts.InfoReaderBufferSize(), s.fsopts.DecodingOptions()) + namespace, shard, s.fsopts.InfoReaderBufferSize(), s.fsopts.DecodingOptions(), + persist.FileSetFlushType) tr := xtime.NewRanges() for i := 0; i < len(readInfoFilesResults); i++ { diff --git a/src/dbnode/storage/bootstrap/bootstrapper/fs/source_index_bench_test.go b/src/dbnode/storage/bootstrap/bootstrapper/fs/source_index_bench_test.go index 39ff66ec0d..0f2886912a 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/fs/source_index_bench_test.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/fs/source_index_bench_test.go @@ -114,7 +114,7 @@ func BenchmarkBootstrapIndex(b *testing.B) { max = time.Unix(0, 0) ranges = xtime.NewRanges() entries = fs.ReadInfoFiles(dir, testNamespace, shard, - 0, msgpack.NewDecodingOptions()) + 0, msgpack.NewDecodingOptions(), persist.FileSetFlushType) ) for _, entry := range entries { if entry.Err != nil { diff --git a/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go b/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go index 67ebb99eeb..410b7a6220 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go @@ -121,6 +121,23 @@ func (s *peersSource) Read( ctx, span, _ := ctx.StartSampledTraceSpan(tracepoint.BootstrapperPeersSourceRead) defer span.Finish() + timeRangesEmpty := true + for _, elem := range namespaces.Namespaces.Iter() { + namespace := elem.Value() + dataRangesNotEmpty := !namespace.DataRunOptions.ShardTimeRanges.IsEmpty() + + indexEnabled := namespace.Metadata.Options().IndexOptions().Enabled() + indexRangesNotEmpty := indexEnabled && !namespace.IndexRunOptions.ShardTimeRanges.IsEmpty() + if dataRangesNotEmpty || indexRangesNotEmpty { + timeRangesEmpty = false + break + } + } + if timeRangesEmpty { + // Return empty result with no unfulfilled ranges. 
+ return bootstrap.NewNamespaceResults(namespaces), nil + } + results := bootstrap.NamespaceResults{ Results: bootstrap.NewNamespaceResultsMap(bootstrap.NamespaceResultsMapOptions{}), } diff --git a/src/dbnode/storage/bootstrap/bootstrapper/readers.go b/src/dbnode/storage/bootstrap/bootstrapper/readers.go index 75ecc52b38..16e93c61a2 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/readers.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/readers.go @@ -25,6 +25,7 @@ import ( "time" "github.com/m3db/m3/src/dbnode/namespace" + "github.com/m3db/m3/src/dbnode/persist" "github.com/m3db/m3/src/dbnode/persist/fs" "github.com/m3db/m3/src/dbnode/runtime" "github.com/m3db/m3/src/dbnode/storage/bootstrap" @@ -112,7 +113,7 @@ func newShardReaders( logger *zap.Logger, ) ShardReaders { readInfoFilesResults := fs.ReadInfoFiles(fsOpts.FilePathPrefix(), - ns.ID(), shard, fsOpts.InfoReaderBufferSize(), fsOpts.DecodingOptions()) + ns.ID(), shard, fsOpts.InfoReaderBufferSize(), fsOpts.DecodingOptions(), persist.FileSetFlushType) if len(readInfoFilesResults) == 0 { // No readers. return ShardReaders{} diff --git a/src/dbnode/storage/bootstrap/process.go b/src/dbnode/storage/bootstrap/process.go index e25cc94ed6..f875918fae 100644 --- a/src/dbnode/storage/bootstrap/process.go +++ b/src/dbnode/storage/bootstrap/process.go @@ -169,7 +169,10 @@ func (b bootstrapProcess) Run( idxopts := namespace.Metadata.Options().IndexOptions() dataRanges := b.targetRangesForData(at, ropts) indexRanges := b.targetRangesForIndex(at, ropts, idxopts) - + firstRanges := b.newShardTimeRanges( + dataRanges.firstRangeWithPersistTrue.Range, + namespace.Shards, + ) namespacesRunFirst.Namespaces.Set(namespace.Metadata.ID(), Namespace{ Metadata: namespace.Metadata, Shards: namespace.Shards, @@ -178,16 +181,18 @@ func (b bootstrapProcess) Run( DataTargetRange: dataRanges.firstRangeWithPersistTrue, IndexTargetRange: indexRanges.firstRangeWithPersistTrue, DataRunOptions: NamespaceRunOptions{ - ShardTimeRanges: b.newShardTimeRanges( - dataRanges.firstRangeWithPersistTrue.Range, namespace.Shards), - RunOptions: dataRanges.firstRangeWithPersistTrue.RunOptions, + ShardTimeRanges: firstRanges.Copy(), + TargetShardTimeRanges: firstRanges.Copy(), + RunOptions: dataRanges.firstRangeWithPersistTrue.RunOptions, }, IndexRunOptions: NamespaceRunOptions{ - ShardTimeRanges: b.newShardTimeRanges( - indexRanges.firstRangeWithPersistTrue.Range, namespace.Shards), - RunOptions: indexRanges.firstRangeWithPersistTrue.RunOptions, + ShardTimeRanges: firstRanges.Copy(), + TargetShardTimeRanges: firstRanges.Copy(), + RunOptions: indexRanges.firstRangeWithPersistTrue.RunOptions, }, }) + secondRanges := b.newShardTimeRanges( + dataRanges.secondRangeWithPersistFalse.Range, namespace.Shards) namespacesRunSecond.Namespaces.Set(namespace.Metadata.ID(), Namespace{ Metadata: namespace.Metadata, Shards: namespace.Shards, @@ -196,14 +201,14 @@ func (b bootstrapProcess) Run( DataTargetRange: dataRanges.secondRangeWithPersistFalse, IndexTargetRange: indexRanges.secondRangeWithPersistFalse, DataRunOptions: NamespaceRunOptions{ - ShardTimeRanges: b.newShardTimeRanges( - dataRanges.secondRangeWithPersistFalse.Range, namespace.Shards), - RunOptions: dataRanges.secondRangeWithPersistFalse.RunOptions, + ShardTimeRanges: secondRanges.Copy(), + TargetShardTimeRanges: secondRanges.Copy(), + RunOptions: dataRanges.secondRangeWithPersistFalse.RunOptions, }, IndexRunOptions: NamespaceRunOptions{ - ShardTimeRanges: b.newShardTimeRanges( - 
indexRanges.secondRangeWithPersistFalse.Range, namespace.Shards), - RunOptions: indexRanges.secondRangeWithPersistFalse.RunOptions, + ShardTimeRanges: secondRanges.Copy(), + TargetShardTimeRanges: secondRanges.Copy(), + RunOptions: indexRanges.secondRangeWithPersistFalse.RunOptions, }, }) } diff --git a/src/dbnode/storage/bootstrap/types.go b/src/dbnode/storage/bootstrap/types.go index 7c5aef631c..f6bb4b932b 100644 --- a/src/dbnode/storage/bootstrap/types.go +++ b/src/dbnode/storage/bootstrap/types.go @@ -224,8 +224,13 @@ type Namespace struct { // NamespaceRunOptions are the run options for a bootstrap process run. type NamespaceRunOptions struct { // ShardTimeRanges are the time ranges for the shards that should be fulfilled - // by the bootstrapper. + // by the bootstrapper. This changes each bootstrapper pass as time ranges are fulfilled. ShardTimeRanges result.ShardTimeRanges + // TargetShardTimeRanges are the original target time ranges for shards and does not change + // each bootstrapper pass. + // NB(bodu): This is used by the commit log bootstrapper as it needs to run for the entire original + // target shard time ranges. + TargetShardTimeRanges result.ShardTimeRanges // RunOptions are the run options for the bootstrap run. RunOptions RunOptions } diff --git a/src/dbnode/storage/bootstrap/util.go b/src/dbnode/storage/bootstrap/util.go index 6f4c03e807..560b9fc48f 100644 --- a/src/dbnode/storage/bootstrap/util.go +++ b/src/dbnode/storage/bootstrap/util.go @@ -378,12 +378,14 @@ func BuildNamespacesTesterWithReaderIteratorPool( Shards: shards, DataAccumulator: acc, DataRunOptions: NamespaceRunOptions{ - ShardTimeRanges: ranges.Copy(), - RunOptions: runOpts, + ShardTimeRanges: ranges.Copy(), + TargetShardTimeRanges: ranges.Copy(), + RunOptions: runOpts, }, IndexRunOptions: NamespaceRunOptions{ - ShardTimeRanges: ranges.Copy(), - RunOptions: runOpts, + ShardTimeRanges: ranges.Copy(), + TargetShardTimeRanges: ranges.Copy(), + RunOptions: runOpts, }, }) } diff --git a/src/dbnode/storage/bootstrap_test.go b/src/dbnode/storage/bootstrap_test.go index 901ecf29be..95337e996f 100644 --- a/src/dbnode/storage/bootstrap_test.go +++ b/src/dbnode/storage/bootstrap_test.go @@ -58,7 +58,7 @@ func TestDatabaseBootstrapWithBootstrapError(t *testing.T) { db.EXPECT().OwnedNamespaces().Return(namespaces, nil) m := NewMockdatabaseMediator(ctrl) - m.EXPECT().DisableFileOps() + m.EXPECT().DisableFileOpsAndWait() m.EXPECT().EnableFileOps().AnyTimes() bsm := newBootstrapManager(db, m, opts).(*bootstrapManager) @@ -101,7 +101,7 @@ func TestDatabaseBootstrapSubsequentCallsQueued(t *testing.T) { })) m := NewMockdatabaseMediator(ctrl) - m.EXPECT().DisableFileOps() + m.EXPECT().DisableFileOpsAndWait() m.EXPECT().EnableFileOps().AnyTimes() db := NewMockdatabase(ctrl) @@ -159,7 +159,7 @@ func TestDatabaseBootstrapBootstrapHooks(t *testing.T) { })) m := NewMockdatabaseMediator(ctrl) - m.EXPECT().DisableFileOps() + m.EXPECT().DisableFileOpsAndWait() m.EXPECT().EnableFileOps().AnyTimes() db := NewMockdatabase(ctrl) diff --git a/src/dbnode/storage/cleanup.go b/src/dbnode/storage/cleanup.go index 7bdc74fe26..d91db51329 100644 --- a/src/dbnode/storage/cleanup.go +++ b/src/dbnode/storage/cleanup.go @@ -70,13 +70,15 @@ type cleanupManager struct { deleteFilesFn deleteFilesFn deleteInactiveDirectoriesFn deleteInactiveDirectoriesFn - cleanupInProgress bool + warmFlushCleanupInProgress bool + coldFlushCleanupInProgress bool metrics cleanupManagerMetrics logger *zap.Logger } type cleanupManagerMetrics struct { - status 
tally.Gauge + warmFlushCleanupStatus tally.Gauge + coldFlushCleanupStatus tally.Gauge corruptCommitlogFile tally.Counter corruptSnapshotFile tally.Counter corruptSnapshotMetadataFile tally.Counter @@ -90,7 +92,8 @@ func newCleanupManagerMetrics(scope tally.Scope) cleanupManagerMetrics { sScope := scope.SubScope("snapshot") smScope := scope.SubScope("snapshot-metadata") return cleanupManagerMetrics{ - status: scope.Gauge("cleanup"), + warmFlushCleanupStatus: scope.Gauge("warm-flush-cleanup"), + coldFlushCleanupStatus: scope.Gauge("cold-flush-cleanup"), corruptCommitlogFile: clScope.Counter("corrupt"), corruptSnapshotFile: sScope.Counter("corrupt"), corruptSnapshotMetadataFile: smScope.Counter("corrupt"), @@ -124,7 +127,7 @@ func newCleanupManager( } } -func (m *cleanupManager) Cleanup(t time.Time, isBootstrapped bool) error { +func (m *cleanupManager) WarmFlushCleanup(t time.Time, isBootstrapped bool) error { // Don't perform any cleanup if we are not boostrapped yet. if !isBootstrapped { m.logger.Debug("database is still bootstrapping, terminating cleanup") @@ -132,12 +135,12 @@ func (m *cleanupManager) Cleanup(t time.Time, isBootstrapped bool) error { } m.Lock() - m.cleanupInProgress = true + m.warmFlushCleanupInProgress = true m.Unlock() defer func() { m.Lock() - m.cleanupInProgress = false + m.warmFlushCleanupInProgress = false m.Unlock() }() @@ -147,11 +150,6 @@ func (m *cleanupManager) Cleanup(t time.Time, isBootstrapped bool) error { } multiErr := xerrors.NewMultiError() - if err := m.cleanupDataFiles(t, namespaces); err != nil { - multiErr = multiErr.Add(fmt.Errorf( - "encountered errors when cleaning up data files for %v: %v", t, err)) - } - if err := m.cleanupExpiredIndexFiles(t, namespaces); err != nil { multiErr = multiErr.Add(fmt.Errorf( "encountered errors when cleaning up index files for %v: %v", t, err)) @@ -162,11 +160,6 @@ func (m *cleanupManager) Cleanup(t time.Time, isBootstrapped bool) error { "encountered errors when cleaning up index files for %v: %v", t, err)) } - if err := m.deleteInactiveDataFiles(namespaces); err != nil { - multiErr = multiErr.Add(fmt.Errorf( - "encountered errors when deleting inactive data files for %v: %v", t, err)) - } - if err := m.deleteInactiveDataSnapshotFiles(namespaces); err != nil { multiErr = multiErr.Add(fmt.Errorf( "encountered errors when deleting inactive snapshot files for %v: %v", t, err)) @@ -185,15 +178,57 @@ func (m *cleanupManager) Cleanup(t time.Time, isBootstrapped bool) error { return multiErr.FinalError() } +func (m *cleanupManager) ColdFlushCleanup(t time.Time, isBootstrapped bool) error { + // Don't perform any cleanup if we are not boostrapped yet. 
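// Same bootstrap guard as WarmFlushCleanup above: no cleanup work runs until the database has
// finished bootstrapping.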
+ if !isBootstrapped { + m.logger.Debug("database is still bootstrapping, terminating cleanup") + return nil + } + + m.Lock() + m.coldFlushCleanupInProgress = true + m.Unlock() + + defer func() { + m.Lock() + m.coldFlushCleanupInProgress = false + m.Unlock() + }() + + namespaces, err := m.database.OwnedNamespaces() + if err != nil { + return err + } + + multiErr := xerrors.NewMultiError() + if err := m.cleanupDataFiles(t, namespaces); err != nil { + multiErr = multiErr.Add(fmt.Errorf( + "encountered errors when cleaning up data files for %v: %v", t, err)) + } + + if err := m.deleteInactiveDataFiles(namespaces); err != nil { + multiErr = multiErr.Add(fmt.Errorf( + "encountered errors when deleting inactive data files for %v: %v", t, err)) + } + + return multiErr.FinalError() +} func (m *cleanupManager) Report() { m.RLock() - cleanupInProgress := m.cleanupInProgress + coldFlushCleanupInProgress := m.coldFlushCleanupInProgress + warmFlushCleanupInProgress := m.warmFlushCleanupInProgress m.RUnlock() - if cleanupInProgress { - m.metrics.status.Update(1) + if coldFlushCleanupInProgress { + m.metrics.coldFlushCleanupStatus.Update(1) + } else { + m.metrics.coldFlushCleanupStatus.Update(0) + } + + if warmFlushCleanupInProgress { + m.metrics.warmFlushCleanupStatus.Update(1) } else { - m.metrics.status.Update(0) + m.metrics.warmFlushCleanupStatus.Update(0) } } diff --git a/src/dbnode/storage/cleanup_test.go b/src/dbnode/storage/cleanup_test.go index ca8fe33e72..acc15dc251 100644 --- a/src/dbnode/storage/cleanup_test.go +++ b/src/dbnode/storage/cleanup_test.go @@ -32,6 +32,7 @@ import ( "github.com/m3db/m3/src/dbnode/persist/fs" "github.com/m3db/m3/src/dbnode/persist/fs/commitlog" "github.com/m3db/m3/src/dbnode/retention" + xerrors "github.com/m3db/m3/src/x/errors" "github.com/m3db/m3/src/x/ident" xtest "github.com/m3db/m3/src/x/test" @@ -317,7 +318,7 @@ func TestCleanupManagerCleanupCommitlogsAndSnapshots(t *testing.T) { return nil } - err := mgr.Cleanup(ts, true) + err := cleanup(mgr, ts, true) if tc.expectErr { require.Error(t, err) } else { @@ -360,7 +361,7 @@ func TestCleanupManagerNamespaceCleanupBootstrapped(t *testing.T) { mgr := newCleanupManager(db, newNoopFakeActiveLogs(), tally.NoopScope).(*cleanupManager) idx.EXPECT().CleanupExpiredFileSets(ts).Return(nil) idx.EXPECT().CleanupDuplicateFileSets().Return(nil) - require.NoError(t, mgr.Cleanup(ts, true)) + require.NoError(t, cleanup(mgr, ts, true)) } func TestCleanupManagerNamespaceCleanupNotBootstrapped(t *testing.T) { @@ -389,7 +390,7 @@ func TestCleanupManagerNamespaceCleanupNotBootstrapped(t *testing.T) { db.EXPECT().OwnedNamespaces().Return(nses, nil).AnyTimes() mgr := newCleanupManager(db, newNoopFakeActiveLogs(), tally.NoopScope).(*cleanupManager) - require.NoError(t, mgr.Cleanup(ts, false)) + require.NoError(t, cleanup(mgr, ts, false)) } // Test NS doesn't cleanup when flag is present @@ -422,7 +423,7 @@ func TestCleanupManagerDoesntNeedCleanup(t *testing.T) { return nil } - require.NoError(t, mgr.Cleanup(ts, true)) + require.NoError(t, cleanup(mgr, ts, true)) } func TestCleanupDataAndSnapshotFileSetFiles(t *testing.T) { @@ -448,7 +449,7 @@ func TestCleanupDataAndSnapshotFileSetFiles(t *testing.T) { db.EXPECT().OwnedNamespaces().Return(namespaces, nil).AnyTimes() mgr := newCleanupManager(db, newNoopFakeActiveLogs(), tally.NoopScope).(*cleanupManager) - require.NoError(t, mgr.Cleanup(ts, true)) + require.NoError(t, cleanup(mgr, ts, true)) } type deleteInactiveDirectoriesCall struct { @@ -487,7 +488,7 @@ func 
TestDeleteInactiveDataAndSnapshotFileSetFiles(t *testing.T) { } mgr.deleteInactiveDirectoriesFn = deleteInactiveDirectoriesFn - require.NoError(t, mgr.Cleanup(ts, true)) + require.NoError(t, cleanup(mgr, ts, true)) expectedCalls := []deleteInactiveDirectoriesCall{ deleteInactiveDirectoriesCall{ @@ -532,7 +533,7 @@ func TestCleanupManagerPropagatesOwnedNamespacesError(t *testing.T) { require.NoError(t, db.Open()) require.NoError(t, db.Terminate()) - require.Error(t, mgr.Cleanup(ts, true)) + require.Error(t, cleanup(mgr, ts, true)) } func timeFor(s int64) time.Time { @@ -556,3 +557,14 @@ func newFakeActiveLogs(activeLogs persist.CommitLogFiles) fakeActiveLogs { activeLogs: activeLogs, } } + +func cleanup( + mgr databaseCleanupManager, + t time.Time, + isBootstrapped bool, +) error { + multiErr := xerrors.NewMultiError() + multiErr = multiErr.Add(mgr.WarmFlushCleanup(t, isBootstrapped)) + multiErr = multiErr.Add(mgr.ColdFlushCleanup(t, isBootstrapped)) + return multiErr.FinalError() +} diff --git a/src/dbnode/storage/coldflush.go b/src/dbnode/storage/coldflush.go new file mode 100644 index 0000000000..17913c4530 --- /dev/null +++ b/src/dbnode/storage/coldflush.go @@ -0,0 +1,199 @@ +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package storage + +import ( + "sync" + "time" + + "github.com/m3db/m3/src/dbnode/persist" + xerrors "github.com/m3db/m3/src/x/errors" + "github.com/m3db/m3/src/x/instrument" + + "github.com/uber-go/tally" + "go.uber.org/zap" +) + +type coldFlushManager struct { + databaseCleanupManager + sync.RWMutex + + log *zap.Logger + database database + pm persist.Manager + opts Options + // Retain using fileOpStatus here to be consistent w/ the + // filesystem manager since both are filesystem processes. + status fileOpStatus + isColdFlushing tally.Gauge + enabled bool +} + +func newColdFlushManager( + database database, + pm persist.Manager, + opts Options, +) databaseColdFlushManager { + instrumentOpts := opts.InstrumentOptions() + scope := instrumentOpts.MetricsScope().SubScope("fs") + // NB(bodu): cold flush cleanup doesn't require commit logs. 
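// A nil commit log source is passed here since this manager only runs the data fileset cleanup
// paths (cleanupDataFiles and deleteInactiveDataFiles); the warm filesystem manager keeps the
// index, snapshot and commit log cleanup.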
+ cm := newCleanupManager(database, nil, scope) + + return &coldFlushManager{ + databaseCleanupManager: cm, + log: instrumentOpts.Logger(), + database: database, + pm: pm, + opts: opts, + status: fileOpNotStarted, + isColdFlushing: scope.Gauge("cold-flush"), + enabled: true, + } +} + +func (m *coldFlushManager) Disable() fileOpStatus { + m.Lock() + status := m.status + m.enabled = false + m.Unlock() + return status +} + +func (m *coldFlushManager) Enable() fileOpStatus { + m.Lock() + status := m.status + m.enabled = true + m.Unlock() + return status +} + +func (m *coldFlushManager) Status() fileOpStatus { + m.RLock() + status := m.status + m.RUnlock() + return status +} + +func (m *coldFlushManager) Run(t time.Time) bool { + m.Lock() + if !m.shouldRunWithLock() { + m.Unlock() + return false + } + m.status = fileOpInProgress + m.Unlock() + + // NB(xichen): perform data cleanup and flushing sequentially to minimize the impact of disk seeks. + // NB(r): Use invariant here since flush errors were introduced + // and not caught in CI or integration tests. + // When an invariant occurs in CI tests it panics so as to fail + // the build. + if err := m.ColdFlushCleanup(t, m.database.IsBootstrapped()); err != nil { + instrument.EmitAndLogInvariantViolation(m.opts.InstrumentOptions(), + func(l *zap.Logger) { + l.Error("error when cleaning up cold flush data", zap.Time("time", t), zap.Error(err)) + }) + } + if err := m.trackedColdFlush(); err != nil { + instrument.EmitAndLogInvariantViolation(m.opts.InstrumentOptions(), + func(l *zap.Logger) { + l.Error("error when cold flushing data", zap.Time("time", t), zap.Error(err)) + }) + } + m.Lock() + m.status = fileOpNotStarted + m.Unlock() + return true +} + +func (m *coldFlushManager) trackedColdFlush() error { + // The cold flush process will persist any data that has been "loaded" into memory via + // the Load() API but has not yet been persisted durably. As a result, if the cold flush + // process completes without error, then we want to "decrement" the number of tracked bytes + // by however many were outstanding right before the cold flush began. + // + // For example: + // t0: Load 100 bytes --> (numLoadedBytes == 100, numPendingLoadedBytes == 0) + // t1: memTracker.MarkLoadedAsPending() --> (numLoadedBytes == 100, numPendingLoadedBytes == 100) + // t2: Load 200 bytes --> (numLoadedBytes == 300, numPendingLoadedBytes == 100) + // t3: ColdFlushStart() + // t4: Load 300 bytes --> (numLoadedBytes == 600, numPendingLoadedBytes == 100) + // t5: ColdFlushEnd() + // t6: memTracker.DecPendingLoadedBytes() --> (numLoadedBytes == 500, numPendingLoadedBytes == 0) + // t7: memTracker.MarkLoadedAsPending() --> (numLoadedBytes == 500, numPendingLoadedBytes == 500) + // t8: ColdFlushStart() + // t9: ColdFlushError() + // t10: memTracker.MarkLoadedAsPending() --> (numLoadedBytes == 500, numPendingLoadedBytes == 500) + // t11: ColdFlushStart() + // t12: ColdFlushEnd() + // t13: memTracker.DecPendingLoadedBytes() --> (numLoadedBytes == 0, numPendingLoadedBytes == 0) + memTracker := m.opts.MemoryTracker() + memTracker.MarkLoadedAsPending() + + if err := m.coldFlush(); err != nil { + return err + } + + // Only decrement if the cold flush was a success. In this case, the decrement will reduce the + // value by however many bytes had been tracked when the cold flush began. 
+ memTracker.DecPendingLoadedBytes() + return nil +} + +func (m *coldFlushManager) coldFlush() error { + namespaces, err := m.database.OwnedNamespaces() + if err != nil { + return err + } + + flushPersist, err := m.pm.StartFlushPersist() + if err != nil { + return err + } + + multiErr := xerrors.NewMultiError() + for _, ns := range namespaces { + if err = ns.ColdFlush(flushPersist); err != nil { + multiErr = multiErr.Add(err) + } + } + + multiErr = multiErr.Add(flushPersist.DoneFlush()) + err = multiErr.FinalError() + return err +} + +func (m *coldFlushManager) Report() { + m.databaseCleanupManager.Report() + + m.RLock() + status := m.status + m.RUnlock() + if status == fileOpInProgress { + m.isColdFlushing.Update(1) + } else { + m.isColdFlushing.Update(0) + } +} + +func (m *coldFlushManager) shouldRunWithLock() bool { + return m.enabled && m.status != fileOpInProgress && m.database.IsBootstrapped() +} diff --git a/src/dbnode/storage/coldflush_test.go b/src/dbnode/storage/coldflush_test.go new file mode 100644 index 0000000000..55eb3d355c --- /dev/null +++ b/src/dbnode/storage/coldflush_test.go @@ -0,0 +1,118 @@ +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
+ +package storage + +import ( + "errors" + "sync" + "testing" + "time" + + "github.com/golang/mock/gomock" + "github.com/m3db/m3/src/dbnode/persist" + "github.com/stretchr/testify/require" +) + +func TestColdFlushManagerFlushAlreadyInProgress(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + var ( + mockPersistManager = persist.NewMockManager(ctrl) + mockFlushPersist = persist.NewMockFlushPreparer(ctrl) + + // Channels used to coordinate cold flushing + startCh = make(chan struct{}, 1) + doneCh = make(chan struct{}, 1) + ) + defer func() { + close(startCh) + close(doneCh) + }() + + mockFlushPersist.EXPECT().DoneFlush().Return(nil) + mockPersistManager.EXPECT().StartFlushPersist().Do(func() { + startCh <- struct{}{} + <-doneCh + }).Return(mockFlushPersist, nil) + + testOpts := DefaultTestOptions().SetPersistManager(mockPersistManager) + db := newMockdatabase(ctrl) + db.EXPECT().Options().Return(testOpts).AnyTimes() + db.EXPECT().IsBootstrapped().Return(true).AnyTimes() + db.EXPECT().OwnedNamespaces().Return(nil, nil).AnyTimes() + + cfm := newColdFlushManager(db, mockPersistManager, testOpts).(*coldFlushManager) + cfm.pm = mockPersistManager + + var ( + wg sync.WaitGroup + now = time.Unix(0, 0) + ) + wg.Add(2) + + // Goroutine 1 should successfully flush. + go func() { + defer wg.Done() + require.True(t, cfm.Run(now)) + }() + + // Goroutine 2 should indicate already flushing. + go func() { + defer wg.Done() + + // Wait until we start the cold flushing process. + <-startCh + + // Ensure it doesn't allow a parallel flush. + require.False(t, cfm.Run(now)) + + // Allow the cold flush to finish. + doneCh <- struct{}{} + }() + + wg.Wait() + +} + +func TestColdFlushManagerFlushDoneFlushError(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + var ( + fakeErr = errors.New("fake error while marking flush done") + mockPersistManager = persist.NewMockManager(ctrl) + mockFlushPersist = persist.NewMockFlushPreparer(ctrl) + ) + + mockFlushPersist.EXPECT().DoneFlush().Return(fakeErr) + mockPersistManager.EXPECT().StartFlushPersist().Return(mockFlushPersist, nil) + + testOpts := DefaultTestOptions().SetPersistManager(mockPersistManager) + db := newMockdatabase(ctrl) + db.EXPECT().Options().Return(testOpts).AnyTimes() + db.EXPECT().OwnedNamespaces().Return(nil, nil) + + cfm := newColdFlushManager(db, mockPersistManager, testOpts).(*coldFlushManager) + cfm.pm = mockPersistManager + + require.EqualError(t, fakeErr, cfm.coldFlush().Error()) +} diff --git a/src/dbnode/storage/database.go b/src/dbnode/storage/database.go index 653f98d4c7..fbef19f04f 100644 --- a/src/dbnode/storage/database.go +++ b/src/dbnode/storage/database.go @@ -561,8 +561,9 @@ func (d *db) terminateWithLock() error { } func (d *db) Terminate() error { - // NB(bodu): Wait for fs processes to finish. - d.mediator.WaitForFileSystemProcesses() + // NB(bodu): Disable file ops waits for current fs processes to + // finish before disabling. + d.mediator.DisableFileOpsAndWait() d.Lock() defer d.Unlock() @@ -571,8 +572,9 @@ func (d *db) Terminate() error { } func (d *db) Close() error { - // NB(bodu): Wait for fs processes to finish. - d.mediator.WaitForFileSystemProcesses() + // NB(bodu): Disable file ops waits for current fs processes to + // finish before disabling. 
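// DisableFileOpsAndWait blocks until both the warm filesystem manager and the background cold
// flush manager report that no file operation is still in progress (see the mediator changes
// later in this patch).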
+ d.mediator.DisableFileOpsAndWait() d.Lock() defer d.Unlock() diff --git a/src/dbnode/storage/flush.go b/src/dbnode/storage/flush.go index f323d7adc9..73ee135117 100644 --- a/src/dbnode/storage/flush.go +++ b/src/dbnode/storage/flush.go @@ -26,6 +26,7 @@ import ( "sync" "time" + "github.com/m3db/m3/src/dbnode/clock" "github.com/m3db/m3/src/dbnode/persist" "github.com/m3db/m3/src/dbnode/persist/fs/commitlog" "github.com/m3db/m3/src/dbnode/retention" @@ -33,6 +34,7 @@ import ( "github.com/pborman/uuid" "github.com/uber-go/tally" + "go.uber.org/zap" ) var ( @@ -47,11 +49,34 @@ const ( // when we haven't begun either a flush or snapshot. flushManagerNotIdle flushManagerFlushInProgress - flushManagerColdFlushInProgress flushManagerSnapshotInProgress flushManagerIndexFlushInProgress ) +type flushManagerMetrics struct { + isFlushing tally.Gauge + isSnapshotting tally.Gauge + isIndexFlushing tally.Gauge + // This is a "debug" metric for making sure that the snapshotting process + // is not overly aggressive. + maxBlocksSnapshottedByNamespace tally.Gauge + dataWarmFlushDuration tally.Timer + dataSnapshotDuration tally.Timer + indexFlushDuration tally.Timer +} + +func newFlushManagerMetrics(scope tally.Scope) flushManagerMetrics { + return flushManagerMetrics{ + isFlushing: scope.Gauge("flush"), + isSnapshotting: scope.Gauge("snapshot"), + isIndexFlushing: scope.Gauge("index-flush"), + maxBlocksSnapshottedByNamespace: scope.Gauge("max-blocks-snapshotted-by-namespace"), + dataWarmFlushDuration: scope.Timer("data-warm-flush-duration"), + dataSnapshotDuration: scope.Timer("data-snapshot-duration"), + indexFlushDuration: scope.Timer("index-flush-duration"), + } +} + type flushManager struct { sync.RWMutex @@ -62,16 +87,12 @@ type flushManager struct { // state is used to protect the flush manager against concurrent use, // while flushInProgress and snapshotInProgress are more granular and // are used for emitting granular gauges. - state flushManagerState - isFlushing tally.Gauge - isColdFlushing tally.Gauge - isSnapshotting tally.Gauge - isIndexFlushing tally.Gauge - // This is a "debug" metric for making sure that the snapshotting process - // is not overly aggressive. - maxBlocksSnapshottedByNamespace tally.Gauge + state flushManagerState + metrics flushManagerMetrics lastSuccessfulSnapshotStartTime time.Time + logger *zap.Logger + nowFn clock.NowFn } func newFlushManager( @@ -81,15 +102,13 @@ func newFlushManager( ) databaseFlushManager { opts := database.Options() return &flushManager{ - database: database, - commitlog: commitlog, - opts: opts, - pm: opts.PersistManager(), - isFlushing: scope.Gauge("flush"), - isColdFlushing: scope.Gauge("cold-flush"), - isSnapshotting: scope.Gauge("snapshot"), - isIndexFlushing: scope.Gauge("index-flush"), - maxBlocksSnapshottedByNamespace: scope.Gauge("max-blocks-snapshotted-by-namespace"), + database: database, + commitlog: commitlog, + opts: opts, + pm: opts.PersistManager(), + metrics: newFlushManagerMetrics(scope), + logger: opts.InstrumentOptions().Logger(), + nowFn: opts.ClockOptions().NowFn(), } } @@ -110,16 +129,16 @@ func (m *flushManager) Flush(startTime time.Time) error { return err } - // Perform three separate loops through all the namespaces so that we can + // Perform two separate loops through all the namespaces so that we can // emit better gauges, i.e. all the flushing for all the namespaces happens - // at once, then all the cold flushes, then all the snapshotting. This is + // at once then all the snapshotting. 
This is // also slightly better semantically because flushing should take priority - // over cold flushes and snapshotting. + // over snapshotting. // // In addition, we need to make sure that for any given shard/blockStart - // combination, we attempt a flush and then a cold flush before a snapshot - // as the snapshotting process will attempt to snapshot any unflushed blocks - // which would be wasteful if the block is already flushable. + // combination, we attempt a flush before a snapshot as the snapshotting process + // will attempt to snapshot blocks w/ unflushed data which would be wasteful if + // the block is already flushable. multiErr := xerrors.NewMultiError() if err = m.dataWarmFlush(namespaces, startTime); err != nil { multiErr = multiErr.Add(err) @@ -127,44 +146,6 @@ func (m *flushManager) Flush(startTime time.Time) error { rotatedCommitlogID, err := m.commitlog.RotateLogs() if err == nil { - // The cold flush process will persist any data that has been "loaded" into memory via - // the Load() API but has not yet been persisted durably. As a result, if the cold flush - // process completes without error, then we want to "decrement" the number of tracked bytes - // by however many were outstanding right before the cold flush began. - // - // For example: - // t0: Load 100 bytes --> (numLoadedBytes == 100, numPendingLoadedBytes == 0) - // t1: memTracker.MarkLoadedAsPending() --> (numLoadedBytes == 100, numPendingLoadedBytes == 100) - // t2: Load 200 bytes --> (numLoadedBytes == 300, numPendingLoadedBytes == 100) - // t3: ColdFlushStart() - // t4: Load 300 bytes --> (numLoadedBytes == 600, numPendingLoadedBytes == 100) - // t5: ColdFlushEnd() - // t6: memTracker.DecPendingLoadedBytes() --> (numLoadedBytes == 500, numPendingLoadedBytes == 0) - // t7: memTracker.MarkLoadedAsPending() --> (numLoadedBytes == 500, numPendingLoadedBytes == 500) - // t8: ColdFlushStart() - // t9: ColdFlushError() - // t10: memTracker.MarkLoadedAsPending() --> (numLoadedBytes == 500, numPendingLoadedBytes == 500) - // t11: ColdFlushStart() - // t12: ColdFlushEnd() - // t13: memTracker.DecPendingLoadedBytes() --> (numLoadedBytes == 0, numPendingLoadedBytes == 0) - memTracker := m.opts.MemoryTracker() - memTracker.MarkLoadedAsPending() - if err = m.dataColdFlush(namespaces); err != nil { - multiErr = multiErr.Add(err) - // If cold flush fails, we can't proceed to snapshotting because - // commit log cleanup logic uses the presence of a successful - // snapshot checkpoint file to determine which commit log files are - // safe to delete. Therefore if a cold flush fails and a snapshot - // succeeds, the writes from the failed cold flush might be lost - // when commit logs get cleaned up, leaving the node in an undurable - // state such that if it restarted, it would not be able to recover - // the cold writes from its commit log. - return multiErr.FinalError() - } - // Only decrement if the cold flush was a success. In this case, the decrement will reduce the - // value by however many bytes had been tracked when the cold flush began. - memTracker.DecPendingLoadedBytes() - if err = m.dataSnapshot(namespaces, startTime, rotatedCommitlogID); err != nil { multiErr = multiErr.Add(err) } @@ -189,7 +170,10 @@ func (m *flushManager) dataWarmFlush( } m.setState(flushManagerFlushInProgress) - multiErr := xerrors.NewMultiError() + var ( + start = m.nowFn() + multiErr = xerrors.NewMultiError() + ) for _, ns := range namespaces { // Flush first because we will only snapshot if there are no outstanding flushes. 
flushTimes, err := m.namespaceFlushTimes(ns, startTime) @@ -208,30 +192,7 @@ func (m *flushManager) dataWarmFlush( multiErr = multiErr.Add(err) } - return multiErr.FinalError() -} - -func (m *flushManager) dataColdFlush( - namespaces []databaseNamespace, -) error { - flushPersist, err := m.pm.StartFlushPersist() - if err != nil { - return err - } - - m.setState(flushManagerColdFlushInProgress) - multiErr := xerrors.NewMultiError() - for _, ns := range namespaces { - if err = ns.ColdFlush(flushPersist); err != nil { - multiErr = multiErr.Add(err) - } - } - - err = flushPersist.DoneFlush() - if err != nil { - multiErr = multiErr.Add(err) - } - + m.metrics.dataWarmFlushDuration.Record(m.nowFn().Sub(start)) return multiErr.FinalError() } @@ -249,6 +210,7 @@ func (m *flushManager) dataSnapshot( m.setState(flushManagerSnapshotInProgress) var ( + start = m.nowFn() maxBlocksSnapshottedByNamespace = 0 multiErr = xerrors.NewMultiError() ) @@ -278,7 +240,7 @@ func (m *flushManager) dataSnapshot( } } } - m.maxBlocksSnapshottedByNamespace.Update(float64(maxBlocksSnapshottedByNamespace)) + m.metrics.maxBlocksSnapshottedByNamespace.Update(float64(maxBlocksSnapshottedByNamespace)) err = snapshotPersist.DoneSnapshot(snapshotID, rotatedCommitlogID) multiErr = multiErr.Add(err) @@ -287,6 +249,7 @@ func (m *flushManager) dataSnapshot( if finalErr == nil { m.lastSuccessfulSnapshotStartTime = startTime } + m.metrics.dataSnapshotDuration.Record(m.nowFn().Sub(start)) return finalErr } @@ -299,7 +262,10 @@ func (m *flushManager) indexFlush( } m.setState(flushManagerIndexFlushInProgress) - multiErr := xerrors.NewMultiError() + var ( + start = m.nowFn() + multiErr = xerrors.NewMultiError() + ) for _, ns := range namespaces { var ( indexOpts = ns.Options().IndexOptions() @@ -312,6 +278,7 @@ func (m *flushManager) indexFlush( } multiErr = multiErr.Add(indexFlush.DoneIndex()) + m.metrics.indexFlushDuration.Record(m.nowFn().Sub(start)) return multiErr.FinalError() } @@ -321,27 +288,21 @@ func (m *flushManager) Report() { m.RUnlock() if state == flushManagerFlushInProgress { - m.isFlushing.Update(1) - } else { - m.isFlushing.Update(0) - } - - if state == flushManagerColdFlushInProgress { - m.isColdFlushing.Update(1) + m.metrics.isFlushing.Update(1) } else { - m.isColdFlushing.Update(0) + m.metrics.isFlushing.Update(0) } if state == flushManagerSnapshotInProgress { - m.isSnapshotting.Update(1) + m.metrics.isSnapshotting.Update(1) } else { - m.isSnapshotting.Update(0) + m.metrics.isSnapshotting.Update(0) } if state == flushManagerIndexFlushInProgress { - m.isIndexFlushing.Update(1) + m.metrics.isIndexFlushing.Update(1) } else { - m.isIndexFlushing.Update(0) + m.metrics.isIndexFlushing.Update(0) } } @@ -392,13 +353,8 @@ func (m *flushManager) namespaceSnapshotTimes(ns databaseNamespace, curr time.Ti candidateTimes := timesInRange(earliest, latest, blockSize) var loopErr error return filterTimes(candidateTimes, func(t time.Time) bool { - // Snapshot anything that is unflushed. - needsFlush, err := ns.NeedsFlush(t, t) - if err != nil { - loopErr = err - return false - } - return needsFlush + // NB(bodu): Snapshot everything since to account for cold writes/blocks. + return true }), loopErr } diff --git a/src/dbnode/storage/flush_test.go b/src/dbnode/storage/flush_test.go index c909229839..1e271a7b3f 100644 --- a/src/dbnode/storage/flush_test.go +++ b/src/dbnode/storage/flush_test.go @@ -141,17 +141,12 @@ func TestFlushManagerFlushAlreadyInProgress(t *testing.T) { // Allow the flush to finish. 
doneCh <- struct{}{} - // Wait until we start the compaction process. + // Allow the snapshot to begin and finish. <-startCh // Ensure it doesn't allow a parallel flush. require.Equal(t, errFlushOperationsInProgress, fm.Flush(now)) - // Allow the compaction to finish. - doneCh <- struct{}{} - - // Allow the snapshot to begin and finish. - <-startCh doneCh <- struct{}{} }() @@ -171,11 +166,8 @@ func TestFlushManagerFlushDoneFlushError(t *testing.T) { mockSnapshotPersist = persist.NewMockSnapshotPreparer(ctrl) ) - gomock.InOrder( - mockFlushPersist.EXPECT().DoneFlush().Return(fakeErr), - mockFlushPersist.EXPECT().DoneFlush().Return(nil), - ) - mockPersistManager.EXPECT().StartFlushPersist().Return(mockFlushPersist, nil).Times(2) + mockFlushPersist.EXPECT().DoneFlush().Return(fakeErr) + mockPersistManager.EXPECT().StartFlushPersist().Return(mockFlushPersist, nil) mockSnapshotPersist.EXPECT().DoneSnapshot(gomock.Any(), testCommitlogFile).Return(nil) mockPersistManager.EXPECT().StartSnapshotPersist(gomock.Any()).Return(mockSnapshotPersist, nil) @@ -213,8 +205,8 @@ func TestFlushManagerNamespaceFlushTimesErr(t *testing.T) { ) // Make sure DoneFlush is called despite encountering an error, once for snapshot and once for warm flush. - mockFlushPersist.EXPECT().DoneFlush().Return(nil).Times(2) - mockPersistManager.EXPECT().StartFlushPersist().Return(mockFlushPersist, nil).Times(2) + mockFlushPersist.EXPECT().DoneFlush().Return(nil) + mockPersistManager.EXPECT().StartFlushPersist().Return(mockFlushPersist, nil) mockSnapshotPersist.EXPECT().DoneSnapshot(gomock.Any(), testCommitlogFile).Return(nil) mockPersistManager.EXPECT().StartSnapshotPersist(gomock.Any()).Return(mockSnapshotPersist, nil) @@ -232,7 +224,6 @@ func TestFlushManagerNamespaceFlushTimesErr(t *testing.T) { ns.EXPECT().Options().Return(nsOpts).AnyTimes() ns.EXPECT().ID().Return(defaultTestNs1ID).AnyTimes() ns.EXPECT().NeedsFlush(gomock.Any(), gomock.Any()).Return(false, fakeErr).AnyTimes() - ns.EXPECT().ColdFlush(gomock.Any()).Return(nil).AnyTimes() ns.EXPECT().Snapshot(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() db.EXPECT().OwnedNamespaces().Return([]databaseNamespace{ns}, nil) @@ -259,8 +250,8 @@ func TestFlushManagerFlushDoneSnapshotError(t *testing.T) { mockSnapshotPersist = persist.NewMockSnapshotPreparer(ctrl) ) - mockFlushPersist.EXPECT().DoneFlush().Return(nil).Times(2) - mockPersistManager.EXPECT().StartFlushPersist().Return(mockFlushPersist, nil).Times(2) + mockFlushPersist.EXPECT().DoneFlush().Return(nil) + mockPersistManager.EXPECT().StartFlushPersist().Return(mockFlushPersist, nil) mockSnapshotPersist.EXPECT().DoneSnapshot(gomock.Any(), testCommitlogFile).Return(fakeErr) mockPersistManager.EXPECT().StartSnapshotPersist(gomock.Any()).Return(mockSnapshotPersist, nil) @@ -294,8 +285,8 @@ func TestFlushManagerFlushDoneIndexError(t *testing.T) { mockPersistManager = persist.NewMockManager(ctrl) ) - mockFlushPersist.EXPECT().DoneFlush().Return(nil).Times(2) - mockPersistManager.EXPECT().StartFlushPersist().Return(mockFlushPersist, nil).Times(2) + mockFlushPersist.EXPECT().DoneFlush().Return(nil) + mockPersistManager.EXPECT().StartFlushPersist().Return(mockFlushPersist, nil) mockSnapshotPersist.EXPECT().DoneSnapshot(gomock.Any(), testCommitlogFile).Return(nil) mockPersistManager.EXPECT().StartSnapshotPersist(gomock.Any()).Return(mockSnapshotPersist, nil) @@ -330,7 +321,6 @@ func TestFlushManagerSkipNamespaceIndexingDisabled(t *testing.T) { ns.EXPECT().ID().Return(defaultTestNs1ID).AnyTimes() 
ns.EXPECT().NeedsFlush(gomock.Any(), gomock.Any()).Return(true, nil).AnyTimes() ns.EXPECT().WarmFlush(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() - ns.EXPECT().ColdFlush(gomock.Any()).Return(nil).AnyTimes() ns.EXPECT().Snapshot(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() var ( @@ -339,8 +329,8 @@ func TestFlushManagerSkipNamespaceIndexingDisabled(t *testing.T) { mockPersistManager = persist.NewMockManager(ctrl) ) - mockFlushPersist.EXPECT().DoneFlush().Return(nil).Times(2) - mockPersistManager.EXPECT().StartFlushPersist().Return(mockFlushPersist, nil).Times(2) + mockFlushPersist.EXPECT().DoneFlush().Return(nil) + mockPersistManager.EXPECT().StartFlushPersist().Return(mockFlushPersist, nil) mockSnapshotPersist.EXPECT().DoneSnapshot(gomock.Any(), testCommitlogFile).Return(nil) mockPersistManager.EXPECT().StartSnapshotPersist(gomock.Any()).Return(mockSnapshotPersist, nil) @@ -374,7 +364,6 @@ func TestFlushManagerNamespaceIndexingEnabled(t *testing.T) { ns.EXPECT().ID().Return(defaultTestNs1ID).AnyTimes() ns.EXPECT().NeedsFlush(gomock.Any(), gomock.Any()).Return(true, nil).AnyTimes() ns.EXPECT().WarmFlush(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() - ns.EXPECT().ColdFlush(gomock.Any()).Return(nil).AnyTimes() ns.EXPECT().Snapshot(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() ns.EXPECT().FlushIndex(gomock.Any()).Return(nil) @@ -384,8 +373,8 @@ func TestFlushManagerNamespaceIndexingEnabled(t *testing.T) { mockPersistManager = persist.NewMockManager(ctrl) ) - mockFlushPersist.EXPECT().DoneFlush().Return(nil).Times(2) - mockPersistManager.EXPECT().StartFlushPersist().Return(mockFlushPersist, nil).Times(2) + mockFlushPersist.EXPECT().DoneFlush().Return(nil) + mockPersistManager.EXPECT().StartFlushPersist().Return(mockFlushPersist, nil) mockSnapshotPersist.EXPECT().DoneSnapshot(gomock.Any(), testCommitlogFile).Return(nil) mockPersistManager.EXPECT().StartSnapshotPersist(gomock.Any()).Return(mockSnapshotPersist, nil) @@ -553,13 +542,10 @@ func TestFlushManagerFlushSnapshot(t *testing.T) { ns.EXPECT().NeedsFlush(st, st).Return(false, nil) } - ns.EXPECT().ColdFlush(gomock.Any()) - snapshotEnd := now.Add(bufferFuture).Truncate(blockSize) num = numIntervals(start, snapshotEnd, blockSize) for i := 0; i < num; i++ { st := start.Add(time.Duration(i) * blockSize) - ns.EXPECT().NeedsFlush(st, st).Return(true, nil) ns.EXPECT().Snapshot(st, now, gomock.Any()) } } diff --git a/src/dbnode/storage/fs.go b/src/dbnode/storage/fs.go index 44cd043c4a..0fd7509982 100644 --- a/src/dbnode/storage/fs.go +++ b/src/dbnode/storage/fs.go @@ -158,7 +158,7 @@ func (m *fileSystemManager) Run( // and not caught in CI or integration tests. // When an invariant occurs in CI tests it panics so as to fail // the build. 
- if err := m.Cleanup(t, m.database.IsBootstrapped()); err != nil { + if err := m.WarmFlushCleanup(t, m.database.IsBootstrapped()); err != nil { instrument.EmitAndLogInvariantViolation(m.opts.InstrumentOptions(), func(l *zap.Logger) { l.Error("error when cleaning up data", zap.Time("time", t), zap.Error(err)) diff --git a/src/dbnode/storage/fs_test.go b/src/dbnode/storage/fs_test.go index 4832808737..17bff01cc6 100644 --- a/src/dbnode/storage/fs_test.go +++ b/src/dbnode/storage/fs_test.go @@ -85,7 +85,7 @@ func TestFileSystemManagerRun(t *testing.T) { ts := time.Now() gomock.InOrder( - cm.EXPECT().Cleanup(ts, true).Return(errors.New("foo")), + cm.EXPECT().WarmFlushCleanup(ts, true).Return(errors.New("foo")), fm.EXPECT().Flush(ts).Return(errors.New("bar")), ) diff --git a/src/dbnode/storage/mediator.go b/src/dbnode/storage/mediator.go index 26b27f151c..f1462a82d2 100644 --- a/src/dbnode/storage/mediator.go +++ b/src/dbnode/storage/mediator.go @@ -26,6 +26,7 @@ import ( "time" "github.com/m3db/m3/src/dbnode/clock" + "github.com/m3db/m3/src/dbnode/persist/fs" "github.com/m3db/m3/src/dbnode/persist/fs/commitlog" "github.com/m3db/m3/src/x/instrument" @@ -34,21 +35,16 @@ import ( ) type ( - mediatorState int - fileSystemProcessesState int + mediatorState int ) const ( fileOpCheckInterval = time.Second - tickCheckInterval = 5 * time.Second fileSystemProcessesCheckInterval = 100 * time.Millisecond mediatorNotOpen mediatorState = iota mediatorOpen mediatorClosed - - fileSystemProcessesIdle fileSystemProcessesState = iota - fileSystemProcessesBusy ) var ( @@ -80,17 +76,18 @@ type mediator struct { database database databaseBootstrapManager databaseFileSystemManager + databaseColdFlushManager databaseTickManager databaseRepairer - opts Options - nowFn clock.NowFn - sleepFn sleepFn - metrics mediatorMetrics - state mediatorState - fileSystemProcessesState fileSystemProcessesState - mediatorTimeBarrier mediatorTimeBarrier - closedCh chan struct{} + opts Options + nowFn clock.NowFn + sleepFn sleepFn + metrics mediatorMetrics + state mediatorState + mediatorTimeBarrier mediatorTimeBarrier + closedCh chan struct{} + tickInterval time.Duration } // TODO(r): Consider renaming "databaseMediator" to "databaseCoordinator" @@ -102,23 +99,32 @@ func newMediator(database database, commitlog commitlog.CommitLog, opts Options) nowFn = opts.ClockOptions().NowFn() ) d := &mediator{ - database: database, - opts: opts, - nowFn: opts.ClockOptions().NowFn(), - sleepFn: time.Sleep, - metrics: newMediatorMetrics(scope), - state: mediatorNotOpen, - fileSystemProcessesState: fileSystemProcessesIdle, - mediatorTimeBarrier: newMediatorTimeBarrier(nowFn, iOpts), - closedCh: make(chan struct{}), + database: database, + opts: opts, + nowFn: opts.ClockOptions().NowFn(), + sleepFn: time.Sleep, + metrics: newMediatorMetrics(scope), + state: mediatorNotOpen, + mediatorTimeBarrier: newMediatorTimeBarrier(nowFn, iOpts), + closedCh: make(chan struct{}), + tickInterval: opts.MediatorTickInterval(), } fsm := newFileSystemManager(database, commitlog, opts) d.databaseFileSystemManager = fsm + // NB(bodu): Cold flush needs its own persist manager now + // that its running in its own thread. 
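// Presumably the persist manager tracks a single flush at a time, so the cold flush path, which
// now runs concurrently with the warm flush/snapshot path, needs its own instance rather than
// sharing the filesystem manager's.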
+ fsOpts := opts.CommitLogOptions().FilesystemOptions() + pm, err := fs.NewPersistManager(fsOpts) + if err != nil { + return nil, err + } + cfm := newColdFlushManager(database, pm, opts) + d.databaseColdFlushManager = cfm + d.databaseRepairer = newNoopDatabaseRepairer() if opts.RepairEnabled() { - var err error d.databaseRepairer, err = newDatabaseRepairer(database, opts) if err != nil { return nil, err @@ -139,39 +145,39 @@ func (m *mediator) Open() error { m.state = mediatorOpen go m.reportLoop() go m.ongoingFileSystemProcesses() + go m.ongoingColdFlushProcesses() go m.ongoingTick() m.databaseRepairer.Start() return nil } -func (m *mediator) DisableFileOps() { +func (m *mediator) DisableFileOpsAndWait() { status := m.databaseFileSystemManager.Disable() for status == fileOpInProgress { m.sleepFn(fileOpCheckInterval) status = m.databaseFileSystemManager.Status() } + // Even though the cold flush runs separately, its still + // considered a fs process. + status = m.databaseColdFlushManager.Disable() + for status == fileOpInProgress { + m.sleepFn(fileOpCheckInterval) + status = m.databaseColdFlushManager.Status() + } } func (m *mediator) EnableFileOps() { m.databaseFileSystemManager.Enable() + // Even though the cold flush runs separately, its still + // considered a fs process. + m.databaseColdFlushManager.Enable() } func (m *mediator) Report() { m.databaseBootstrapManager.Report() m.databaseRepairer.Report() m.databaseFileSystemManager.Report() -} - -func (m *mediator) WaitForFileSystemProcesses() { - m.RLock() - fileSystemProcessesState := m.fileSystemProcessesState - m.RUnlock() - for fileSystemProcessesState == fileSystemProcessesBusy { - m.sleepFn(fileSystemProcessesCheckInterval) - m.RLock() - fileSystemProcessesState = m.fileSystemProcessesState - m.RUnlock() - } + m.databaseColdFlushManager.Report() } func (m *mediator) Close() error { @@ -189,7 +195,7 @@ func (m *mediator) Close() error { return nil } -// The mediator mediates the relationship between ticks and flushes(warm and cold)/snapshots/cleanups. +// The mediator mediates the relationship between ticks and warm flushes/snapshots. // // For example, the requirements to perform a flush are: // 1) currentTime > blockStart.Add(blockSize).Add(bufferPast) @@ -209,7 +215,7 @@ func (m *mediator) ongoingFileSystemProcesses() { case <-m.closedCh: return default: - m.sleepFn(tickCheckInterval) + m.sleepFn(m.tickInterval) // Check if the mediator is already closed. if !m.isOpen() { @@ -221,6 +227,27 @@ func (m *mediator) ongoingFileSystemProcesses() { } } +// The mediator mediates the relationship between ticks and cold flushes/cleanup the same way it does for warm flushes/snapshots. +// We want to begin each cold/warm flush with an in sync view of time as a tick. +// NB(bodu): Cold flushes and cleanup have been separated out into it's own thread to avoid blocking snapshots. +func (m *mediator) ongoingColdFlushProcesses() { + for { + select { + case <-m.closedCh: + return + default: + m.sleepFn(m.tickInterval) + + // Check if the mediator is already closed. + if !m.isOpen() { + return + } + + m.runColdFlushProcesses() + } + } +} + func (m *mediator) ongoingTick() { var ( log = m.opts.InstrumentOptions().Logger() @@ -231,7 +258,7 @@ func (m *mediator) ongoingTick() { case <-m.closedCh: return default: - m.sleepFn(tickCheckInterval) + m.sleepFn(m.tickInterval) // Check if the mediator is already closed. 
if !m.isOpen() { @@ -256,15 +283,6 @@ func (m *mediator) ongoingTick() { } func (m *mediator) runFileSystemProcesses() { - m.Lock() - m.fileSystemProcessesState = fileSystemProcessesBusy - m.Unlock() - defer func() { - m.Lock() - m.fileSystemProcessesState = fileSystemProcessesIdle - m.Unlock() - }() - // See comment over mediatorTimeBarrier for an explanation of this logic. log := m.opts.InstrumentOptions().Logger() mediatorTime, err := m.mediatorTimeBarrier.fsProcessesWait() @@ -276,6 +294,18 @@ func (m *mediator) runFileSystemProcesses() { m.databaseFileSystemManager.Run(mediatorTime, syncRun, noForce) } +func (m *mediator) runColdFlushProcesses() { + // See comment over mediatorTimeBarrier for an explanation of this logic. + log := m.opts.InstrumentOptions().Logger() + mediatorTime, err := m.mediatorTimeBarrier.fsProcessesWait() + if err != nil { + log.Error("error within ongoingColdFlushProcesses waiting for next mediatorTime", zap.Error(err)) + return + } + + m.databaseColdFlushManager.Run(mediatorTime) +} + func (m *mediator) reportLoop() { interval := m.opts.InstrumentOptions().ReportInterval() t := time.NewTicker(interval) @@ -314,42 +344,65 @@ func (m *mediator) isOpen() bool { // This means that once a run of filesystem processes completes it will always have to wait until the currently // executing tick completes before performing the next run, but in practice this should not be much of an issue. // -// ____________ ___________ -// | Flush (t0) | | Tick (t0) | -// | | | | -// | | |___________| -// | | ___________ -// | | | Tick (t0) | -// | | | | -// | | |___________| -// | | ___________ -// |____________| | Tick (t0) | -// barrier.wait() | | -// |___________| -// mediatorTime = t1 -// barrier.release() -// ------------------------------------- -// ____________ ___________ -// | Flush (t1) | | Tick (t1) | -// | | | | -// | | |___________| -// | | ___________ -// | | | Tick (t1) | -// | | | | +// Additionally, an independent cold flush process complicates this a bit more in that we have more than one filesystem +// process waiting on the mediator barrier. The invariant here is that both warm and cold flushes always start on a tick +// with a consistent view of time as the tick it is on. They don't necessarily need to start on the same tick. See the +// diagram below for an example case. 
+// +// ____________ ___________ _________________ +// | Flush (t0) | | Tick (t0) | | Cold Flush (t0) | +// | | | | | | +// | | |___________| | | +// | | ___________ | | +// | | | Tick (t0) | | | +// | | | | | | +// | | |___________| | | +// | | ___________ | | +// |____________| | Tick (t0) | | | +// barrier.wait() | | | | +// |___________| | | +// mediatorTime = t1 | | +// barrier.release() | | +// ____________ ___________ | | +// | Flush (t1) | | Tick (t1) | |_________________| +// | | | | barrier.wait() // | | |___________| -// | | ___________ -// |____________| | Tick (t1) | -// barrier.wait() | | -// |___________| -// barrier.release() -// ------------------------------------ +// | | mediatorTime = t2 +// | | barrier.release() +// | | ___________ _________________ +// | | | Tick (t2) | | Cold Flush (t2) | +// |____________| | | | | +// barrier.wait() |___________| | | +// mediatorTime = t3 | | +// barrier.release() | | +// ____________ ___________ | | +// | Flush (t3) | | Tick (t3) | | | +// | | | | | | +// | | |___________| | | +// | | ___________ | | +// | | | Tick (t3) | | | +// | | | | | | +// | | |___________| | | +// | | ___________ | | +// |____________| | Tick (t3) | |_________________| +// barrier.wait() | | barrier.wait() +// |___________| +// mediatorTime = t4 +// barrier.release() +// ____________ ___________ _________________ +// | Flush (t4) | | Tick (t4) | | Cold Flush (t4) | +// | | | | | | +// ------------------------------------------------------------ type mediatorTimeBarrier struct { sync.Mutex - mediatorTime time.Time - nowFn func() time.Time - iOpts instrument.Options - fsProcessesWaiting bool - releaseCh chan time.Time + // Both mediatorTime and numFsProcessesWaiting are protected + // by the mutex. + mediatorTime time.Time + numFsProcessesWaiting int + + nowFn func() time.Time + iOpts instrument.Options + releaseCh chan time.Time } // initialMediatorTime should only be used to obtain the initial time for @@ -363,28 +416,24 @@ func (b *mediatorTimeBarrier) initialMediatorTime() time.Time { func (b *mediatorTimeBarrier) fsProcessesWait() (time.Time, error) { b.Lock() - if b.fsProcessesWaiting { - b.Unlock() - return time.Time{}, errMediatorTimeBarrierAlreadyWaiting - } - b.fsProcessesWaiting = true + b.numFsProcessesWaiting++ b.Unlock() t := <-b.releaseCh b.Lock() - b.fsProcessesWaiting = false + b.numFsProcessesWaiting-- b.Unlock() return t, nil } func (b *mediatorTimeBarrier) maybeRelease() (time.Time, error) { b.Lock() - hasWaiter := b.fsProcessesWaiting + numWaiters := b.numFsProcessesWaiting mediatorTime := b.mediatorTime b.Unlock() - if !hasWaiter { + if numWaiters == 0 { // If there isn't a waiter yet then the filesystem processes may still // be ongoing in which case we don't want to release the barrier / update // the current time yet. 
Allow the tick to run again with the same time @@ -405,7 +454,9 @@ func (b *mediatorTimeBarrier) maybeRelease() (time.Time, error) { } b.mediatorTime = newMediatorTime - b.releaseCh <- b.mediatorTime + for i := 0; i < numWaiters; i++ { + b.releaseCh <- b.mediatorTime + } return b.mediatorTime, nil } diff --git a/src/dbnode/storage/mediator_test.go b/src/dbnode/storage/mediator_test.go index 76a7e4f78d..14056d4fca 100644 --- a/src/dbnode/storage/mediator_test.go +++ b/src/dbnode/storage/mediator_test.go @@ -56,7 +56,7 @@ func TestDatabaseMediatorOpenClose(t *testing.T) { require.Equal(t, errMediatorAlreadyClosed, m.Close()) } -func TestDatabaseMediatorDisableFileOps(t *testing.T) { +func TestDatabaseMediatorDisableFileOpsAndWait(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -86,6 +86,6 @@ func TestDatabaseMediatorDisableFileOps(t *testing.T) { fsm.EXPECT().Status().Return(fileOpNotStarted), ) - m.DisableFileOps() + m.DisableFileOpsAndWait() require.Equal(t, 3, len(slept)) } diff --git a/src/dbnode/storage/options.go b/src/dbnode/storage/options.go index bfbc758f9c..3bff2271fe 100644 --- a/src/dbnode/storage/options.go +++ b/src/dbnode/storage/options.go @@ -75,6 +75,8 @@ const ( // defaultNumLoadedBytesLimit is the default limit (2GiB) for the number of outstanding loaded bytes that // the memory tracker will allow. defaultNumLoadedBytesLimit = 2 << 30 + + defaultMediatorTickInterval = 5 * time.Second ) var ( @@ -162,6 +164,7 @@ type options struct { mmapReporter mmap.Reporter doNotIndexWithFieldsMap map[string]string namespaceRuntimeOptsMgrRegistry namespace.RuntimeOptionsManagerRegistry + mediatorTickInterval time.Duration } // NewOptions creates a new set of storage options with defaults @@ -234,6 +237,7 @@ func newOptions(poolOpts pool.ObjectPoolOptions) Options { onColdFlush: &noOpColdFlush{}, memoryTracker: NewMemoryTracker(NewMemoryTrackerOptions(defaultNumLoadedBytesLimit)), namespaceRuntimeOptsMgrRegistry: namespace.NewRuntimeOptionsManagerRegistry(), + mediatorTickInterval: defaultMediatorTickInterval, } return o.SetEncodingM3TSZPooled() } @@ -798,6 +802,16 @@ func (o *options) NamespaceRuntimeOptionsManagerRegistry() namespace.RuntimeOpti return o.namespaceRuntimeOptsMgrRegistry } +func (o *options) SetMediatorTickInterval(value time.Duration) Options { + opts := *o + opts.mediatorTickInterval = value + return &opts +} + +func (o *options) MediatorTickInterval() time.Duration { + return o.mediatorTickInterval +} + type noOpColdFlush struct{} func (n *noOpColdFlush) ColdFlushNamespace(ns Namespace) (OnColdFlushNamespace, error) { diff --git a/src/dbnode/storage/series/buffer.go b/src/dbnode/storage/series/buffer.go index af8c742fcf..7784ed0d52 100644 --- a/src/dbnode/storage/series/buffer.go +++ b/src/dbnode/storage/series/buffer.go @@ -54,7 +54,6 @@ const ( var ( timeZero time.Time errIncompleteMerge = errors.New("bucket merge did not result in only one encoder") - logger, _ = zap.NewProduction() ) const ( @@ -133,6 +132,8 @@ type databaseBuffer interface { IsEmpty() bool + IsEmptyAtBlockStart(time.Time) bool + ColdFlushBlockStarts(blockStates map[xtime.UnixNano]BlockState) OptimizedTimes Stats() bufferStats @@ -417,6 +418,14 @@ func (b *dbBuffer) IsEmpty() bool { return len(b.bucketsMap) == 0 } +func (b *dbBuffer) IsEmptyAtBlockStart(start time.Time) bool { + bv, exists := b.bucketVersionsAt(start) + if !exists { + return true + } + return bv.streamsLen() == 0 +} + func (b *dbBuffer) ColdFlushBlockStarts(blockStates map[xtime.UnixNano]BlockState) 
OptimizedTimes { var times OptimizedTimes @@ -594,6 +603,7 @@ func (b *dbBuffer) Snapshot( } checksum := segment.CalculateChecksum() + return persistFn(metadata, segment, checksum) } diff --git a/src/dbnode/storage/series/buffer_mock.go b/src/dbnode/storage/series/buffer_mock.go index a215f611f4..f242c84f93 100644 --- a/src/dbnode/storage/series/buffer_mock.go +++ b/src/dbnode/storage/series/buffer_mock.go @@ -194,6 +194,20 @@ func (mr *MockdatabaseBufferMockRecorder) IsEmpty() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsEmpty", reflect.TypeOf((*MockdatabaseBuffer)(nil).IsEmpty)) } +// IsEmptyAtBlockStart mocks base method +func (m *MockdatabaseBuffer) IsEmptyAtBlockStart(arg0 time.Time) bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "IsEmptyAtBlockStart", arg0) + ret0, _ := ret[0].(bool) + return ret0 +} + +// IsEmptyAtBlockStart indicates an expected call of IsEmptyAtBlockStart +func (mr *MockdatabaseBufferMockRecorder) IsEmptyAtBlockStart(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsEmptyAtBlockStart", reflect.TypeOf((*MockdatabaseBuffer)(nil).IsEmptyAtBlockStart), arg0) +} + // ColdFlushBlockStarts mocks base method func (m *MockdatabaseBuffer) ColdFlushBlockStarts(blockStates map[time0.UnixNano]BlockState) OptimizedTimes { m.ctrl.T.Helper() diff --git a/src/dbnode/storage/series/series.go b/src/dbnode/storage/series/series.go index 4e5931ca67..23b8ecaa0a 100644 --- a/src/dbnode/storage/series/series.go +++ b/src/dbnode/storage/series/series.go @@ -297,6 +297,13 @@ func (s *dbSeries) IsEmpty() bool { return false } +func (s *dbSeries) IsBufferEmptyAtBlockStart(blockStart time.Time) bool { + s.RLock() + bufferEmpty := s.buffer.IsEmptyAtBlockStart(blockStart) + s.RUnlock() + return bufferEmpty +} + func (s *dbSeries) NumActiveBlocks() int { s.RLock() value := s.cachedBlocks.Len() + s.buffer.Stats().wiredBlocks diff --git a/src/dbnode/storage/series/series_mock.go b/src/dbnode/storage/series/series_mock.go index c6fc05e145..9ffca2a59d 100644 --- a/src/dbnode/storage/series/series_mock.go +++ b/src/dbnode/storage/series/series_mock.go @@ -163,6 +163,20 @@ func (mr *MockDatabaseSeriesMockRecorder) ID() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ID", reflect.TypeOf((*MockDatabaseSeries)(nil).ID)) } +// IsBufferEmptyAtBlockStart mocks base method +func (m *MockDatabaseSeries) IsBufferEmptyAtBlockStart(arg0 time.Time) bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "IsBufferEmptyAtBlockStart", arg0) + ret0, _ := ret[0].(bool) + return ret0 +} + +// IsBufferEmptyAtBlockStart indicates an expected call of IsBufferEmptyAtBlockStart +func (mr *MockDatabaseSeriesMockRecorder) IsBufferEmptyAtBlockStart(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsBufferEmptyAtBlockStart", reflect.TypeOf((*MockDatabaseSeries)(nil).IsBufferEmptyAtBlockStart), arg0) +} + // IsEmpty mocks base method func (m *MockDatabaseSeries) IsEmpty() bool { m.ctrl.T.Helper() diff --git a/src/dbnode/storage/series/types.go b/src/dbnode/storage/series/types.go index 0d5f627183..d279979f8e 100644 --- a/src/dbnode/storage/series/types.go +++ b/src/dbnode/storage/series/types.go @@ -109,9 +109,13 @@ type DatabaseSeries interface { opts FetchBlocksMetadataOptions, ) (block.FetchBlocksMetadataResult, error) - // IsEmpty returns whether series is empty. 
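// --- Editor's sketch (not part of the patch) --------------------------------------
// The mediatorTimeBarrier change in mediator.go earlier in this patch swaps the single
// fsProcessesWaiting flag for a numFsProcessesWaiting counter so that the warm flush
// process and the new background cold flush process can both block on the same barrier;
// maybeRelease must then send the new mediator time once per waiter. A minimal standalone
// version of that pattern, under hypothetical names (timeBarrier, wait, release), using
// only the standard library, could look like the following.

package sketch

import (
	"sync"
	"time"
)

type timeBarrier struct {
	mu         sync.Mutex
	numWaiting int
	releaseCh  chan time.Time
}

func newTimeBarrier() *timeBarrier {
	return &timeBarrier{releaseCh: make(chan time.Time)}
}

// wait blocks a filesystem process until the tick loop publishes the next time.
func (b *timeBarrier) wait() time.Time {
	b.mu.Lock()
	b.numWaiting++
	b.mu.Unlock()
	t := <-b.releaseCh
	b.mu.Lock()
	b.numWaiting--
	b.mu.Unlock()
	return t
}

// release publishes next to every currently registered waiter; sending once per waiter
// (rather than once in total) is what keeps a second filesystem process from deadlocking.
func (b *timeBarrier) release(next time.Time) {
	b.mu.Lock()
	n := b.numWaiting
	b.mu.Unlock()
	for i := 0; i < n; i++ {
		b.releaseCh <- next
	}
}
// -----------------------------------------------------------------------------------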
+ // IsEmpty returns whether series is empty (includes both cached blocks and in-mem buffer data). IsEmpty() bool + // IsBufferEmptyAtBlockStart returns whether the series buffer is empty at block start + // (only checks for in-mem buffer data). + IsBufferEmptyAtBlockStart(time.Time) bool + // NumActiveBlocks returns the number of active blocks the series currently holds. NumActiveBlocks() int diff --git a/src/dbnode/storage/shard.go b/src/dbnode/storage/shard.go index ab8dd5dfb2..5ec330434c 100644 --- a/src/dbnode/storage/shard.go +++ b/src/dbnode/storage/shard.go @@ -251,11 +251,16 @@ type shardFlushState struct { sync.RWMutex statesByTime map[xtime.UnixNano]fileOpState initialized bool + + // NB(bodu): Cache state on whether we snapshotted last or not to avoid + // going to disk to see if filesets are empty. + emptySnapshotOnDiskByTime map[xtime.UnixNano]bool } func newShardFlushState() shardFlushState { return shardFlushState{ - statesByTime: make(map[xtime.UnixNano]fileOpState), + statesByTime: make(map[xtime.UnixNano]fileOpState), + emptySnapshotOnDiskByTime: make(map[xtime.UnixNano]bool), } } @@ -1937,7 +1942,7 @@ func (s *dbShard) initializeFlushStates() { func (s *dbShard) UpdateFlushStates() { fsOpts := s.opts.CommitLogOptions().FilesystemOptions() readInfoFilesResults := fs.ReadInfoFiles(fsOpts.FilePathPrefix(), s.namespace.ID(), s.shard, - fsOpts.InfoReaderBufferSize(), fsOpts.DecodingOptions()) + fsOpts.InfoReaderBufferSize(), fsOpts.DecodingOptions(), persist.FileSetFlushType) for _, result := range readInfoFilesResults { if err := result.Err.Error(); err != nil { @@ -2370,8 +2375,29 @@ func (s *dbShard) Snapshot( s.RUnlock() return errShardNotBootstrappedToSnapshot } + s.RUnlock() + var needsSnapshot bool + s.forEachShardEntry(func(entry *lookup.Entry) bool { + if !entry.Series.IsBufferEmptyAtBlockStart(blockStart) { + needsSnapshot = true + return false + } + return true + }) + // Only terminate early when we would be over-writing an empty snapshot fileset on disk. + // TODO(bodu): We could bootstrap empty snapshot state in the bs path to avoid doing extra + // snapshotting work after a bootstrap since this cached state gets cleared. + s.flushState.RLock() + // NB(bodu): This always defaults to false if the record does not exist. + emptySnapshotOnDisk := s.flushState.emptySnapshotOnDiskByTime[xtime.ToUnixNano(blockStart)] + s.flushState.RUnlock() + + if !needsSnapshot && emptySnapshotOnDisk { + return nil + } + var multiErr xerrors.MultiError prepareOpts := persist.DataPrepareOptions{ @@ -2418,7 +2444,24 @@ func (s *dbShard) Snapshot( multiErr = multiErr.Add(err) } - return multiErr.FinalError() + if err := multiErr.FinalError(); err != nil { + return err + } + + // Only update cached snapshot state if we successfully flushed data to disk. + s.flushState.Lock() + if needsSnapshot { + s.flushState.emptySnapshotOnDiskByTime[xtime.ToUnixNano(blockStart)] = false + } else { + // NB(bodu): If we flushed an empty snapshot to disk, it means that the previous + // snapshot on disk was not empty (or we just bootstrapped and cached state was lost). + // The snapshot we just flushed may or may not have data, although whatever data we flushed + // would be recoverable from the rotate commit log as well. 
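// --- Editor's sketch (not part of the patch) --------------------------------------
// Condensed form of the snapshot decision implemented in dbShard.Snapshot above: the
// shard skips writing a snapshot fileset only when no series has buffered data for the
// block start and the fileset already on disk for that block start is known, via the new
// emptySnapshotOnDiskByTime cache, to be empty; after a successful write the cache entry
// is refreshed so the next pass can make the same call without touching disk. The
// function and parameter names below are hypothetical, not from the patch.

package sketch

func shouldSkipSnapshot(anyBufferedData, emptySnapshotOnDisk bool) bool {
	// Skip only when we would be rewriting an empty snapshot over an empty snapshot;
	// in every other case write the snapshot so commit logs can be rotated safely.
	return !anyBufferedData && emptySnapshotOnDisk
}

func recordSnapshotResult(cache map[int64]bool, blockStartNanos int64, wroteData bool) {
	// The cache entry is true only when the snapshot just written was empty.
	cache[blockStartNanos] = !wroteData
}
// -----------------------------------------------------------------------------------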
+ s.flushState.emptySnapshotOnDiskByTime[xtime.ToUnixNano(blockStart)] = true + } + s.flushState.Unlock() + + return nil } func (s *dbShard) FlushState(blockStart time.Time) (fileOpState, error) { diff --git a/src/dbnode/storage/shard_test.go b/src/dbnode/storage/shard_test.go index 55e0bb46c6..0ea3546cd8 100644 --- a/src/dbnode/storage/shard_test.go +++ b/src/dbnode/storage/shard_test.go @@ -829,6 +829,7 @@ func TestShardSnapshotSeriesSnapshotSuccess(t *testing.T) { series := series.NewMockDatabaseSeries(ctrl) series.EXPECT().ID().Return(ident.StringID("foo" + strconv.Itoa(i))).AnyTimes() series.EXPECT().IsEmpty().Return(false).AnyTimes() + series.EXPECT().IsBufferEmptyAtBlockStart(blockStart).Return(false).AnyTimes() series.EXPECT(). Snapshot(gomock.Any(), blockStart, gomock.Any(), gomock.Any()). Do(func(context.Context, time.Time, persist.DataFn, namespace.Context) { diff --git a/src/dbnode/storage/storage_mock.go b/src/dbnode/storage/storage_mock.go index 864655e539..7e3098a116 100644 --- a/src/dbnode/storage/storage_mock.go +++ b/src/dbnode/storage/storage_mock.go @@ -2542,18 +2542,32 @@ func (m *MockdatabaseCleanupManager) EXPECT() *MockdatabaseCleanupManagerMockRec return m.recorder } -// Cleanup mocks base method -func (m *MockdatabaseCleanupManager) Cleanup(t time.Time, isBootstrapped bool) error { +// WarmFlushCleanup mocks base method +func (m *MockdatabaseCleanupManager) WarmFlushCleanup(t time.Time, isBootstrapped bool) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Cleanup", t, isBootstrapped) + ret := m.ctrl.Call(m, "WarmFlushCleanup", t, isBootstrapped) ret0, _ := ret[0].(error) return ret0 } -// Cleanup indicates an expected call of Cleanup -func (mr *MockdatabaseCleanupManagerMockRecorder) Cleanup(t, isBootstrapped interface{}) *gomock.Call { +// WarmFlushCleanup indicates an expected call of WarmFlushCleanup +func (mr *MockdatabaseCleanupManagerMockRecorder) WarmFlushCleanup(t, isBootstrapped interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Cleanup", reflect.TypeOf((*MockdatabaseCleanupManager)(nil).Cleanup), t, isBootstrapped) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WarmFlushCleanup", reflect.TypeOf((*MockdatabaseCleanupManager)(nil).WarmFlushCleanup), t, isBootstrapped) +} + +// ColdFlushCleanup mocks base method +func (m *MockdatabaseCleanupManager) ColdFlushCleanup(t time.Time, isBootstrapped bool) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ColdFlushCleanup", t, isBootstrapped) + ret0, _ := ret[0].(error) + return ret0 +} + +// ColdFlushCleanup indicates an expected call of ColdFlushCleanup +func (mr *MockdatabaseCleanupManagerMockRecorder) ColdFlushCleanup(t, isBootstrapped interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ColdFlushCleanup", reflect.TypeOf((*MockdatabaseCleanupManager)(nil).ColdFlushCleanup), t, isBootstrapped) } // Report mocks base method @@ -2591,20 +2605,6 @@ func (m *MockdatabaseFileSystemManager) EXPECT() *MockdatabaseFileSystemManagerM return m.recorder } -// Cleanup mocks base method -func (m *MockdatabaseFileSystemManager) Cleanup(t time.Time, isBootstrapped bool) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Cleanup", t, isBootstrapped) - ret0, _ := ret[0].(error) - return ret0 -} - -// Cleanup indicates an expected call of Cleanup -func (mr *MockdatabaseFileSystemManagerMockRecorder) Cleanup(t, isBootstrapped interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return 
mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Cleanup", reflect.TypeOf((*MockdatabaseFileSystemManager)(nil).Cleanup), t, isBootstrapped) -} - // Flush mocks base method func (m *MockdatabaseFileSystemManager) Flush(t time.Time) error { m.ctrl.T.Helper() @@ -2702,6 +2702,125 @@ func (mr *MockdatabaseFileSystemManagerMockRecorder) LastSuccessfulSnapshotStart return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LastSuccessfulSnapshotStartTime", reflect.TypeOf((*MockdatabaseFileSystemManager)(nil).LastSuccessfulSnapshotStartTime)) } +// MockdatabaseColdFlushManager is a mock of databaseColdFlushManager interface +type MockdatabaseColdFlushManager struct { + ctrl *gomock.Controller + recorder *MockdatabaseColdFlushManagerMockRecorder +} + +// MockdatabaseColdFlushManagerMockRecorder is the mock recorder for MockdatabaseColdFlushManager +type MockdatabaseColdFlushManagerMockRecorder struct { + mock *MockdatabaseColdFlushManager +} + +// NewMockdatabaseColdFlushManager creates a new mock instance +func NewMockdatabaseColdFlushManager(ctrl *gomock.Controller) *MockdatabaseColdFlushManager { + mock := &MockdatabaseColdFlushManager{ctrl: ctrl} + mock.recorder = &MockdatabaseColdFlushManagerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockdatabaseColdFlushManager) EXPECT() *MockdatabaseColdFlushManagerMockRecorder { + return m.recorder +} + +// WarmFlushCleanup mocks base method +func (m *MockdatabaseColdFlushManager) WarmFlushCleanup(t time.Time, isBootstrapped bool) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "WarmFlushCleanup", t, isBootstrapped) + ret0, _ := ret[0].(error) + return ret0 +} + +// WarmFlushCleanup indicates an expected call of WarmFlushCleanup +func (mr *MockdatabaseColdFlushManagerMockRecorder) WarmFlushCleanup(t, isBootstrapped interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WarmFlushCleanup", reflect.TypeOf((*MockdatabaseColdFlushManager)(nil).WarmFlushCleanup), t, isBootstrapped) +} + +// ColdFlushCleanup mocks base method +func (m *MockdatabaseColdFlushManager) ColdFlushCleanup(t time.Time, isBootstrapped bool) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ColdFlushCleanup", t, isBootstrapped) + ret0, _ := ret[0].(error) + return ret0 +} + +// ColdFlushCleanup indicates an expected call of ColdFlushCleanup +func (mr *MockdatabaseColdFlushManagerMockRecorder) ColdFlushCleanup(t, isBootstrapped interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ColdFlushCleanup", reflect.TypeOf((*MockdatabaseColdFlushManager)(nil).ColdFlushCleanup), t, isBootstrapped) +} + +// Report mocks base method +func (m *MockdatabaseColdFlushManager) Report() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Report") +} + +// Report indicates an expected call of Report +func (mr *MockdatabaseColdFlushManagerMockRecorder) Report() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Report", reflect.TypeOf((*MockdatabaseColdFlushManager)(nil).Report)) +} + +// Disable mocks base method +func (m *MockdatabaseColdFlushManager) Disable() fileOpStatus { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Disable") + ret0, _ := ret[0].(fileOpStatus) + return ret0 +} + +// Disable indicates an expected call of Disable +func (mr *MockdatabaseColdFlushManagerMockRecorder) Disable() *gomock.Call { + mr.mock.ctrl.T.Helper() + return 
mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Disable", reflect.TypeOf((*MockdatabaseColdFlushManager)(nil).Disable)) +} + +// Enable mocks base method +func (m *MockdatabaseColdFlushManager) Enable() fileOpStatus { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Enable") + ret0, _ := ret[0].(fileOpStatus) + return ret0 +} + +// Enable indicates an expected call of Enable +func (mr *MockdatabaseColdFlushManagerMockRecorder) Enable() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Enable", reflect.TypeOf((*MockdatabaseColdFlushManager)(nil).Enable)) +} + +// Status mocks base method +func (m *MockdatabaseColdFlushManager) Status() fileOpStatus { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Status") + ret0, _ := ret[0].(fileOpStatus) + return ret0 +} + +// Status indicates an expected call of Status +func (mr *MockdatabaseColdFlushManagerMockRecorder) Status() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Status", reflect.TypeOf((*MockdatabaseColdFlushManager)(nil).Status)) +} + +// Run mocks base method +func (m *MockdatabaseColdFlushManager) Run(t time.Time) bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Run", t) + ret0, _ := ret[0].(bool) + return ret0 +} + +// Run indicates an expected call of Run +func (mr *MockdatabaseColdFlushManagerMockRecorder) Run(t interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Run", reflect.TypeOf((*MockdatabaseColdFlushManager)(nil).Run), t) +} + // MockdatabaseShardRepairer is a mock of databaseShardRepairer interface type MockdatabaseShardRepairer struct { ctrl *gomock.Controller @@ -2945,16 +3064,16 @@ func (mr *MockdatabaseMediatorMockRecorder) Bootstrap() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Bootstrap", reflect.TypeOf((*MockdatabaseMediator)(nil).Bootstrap)) } -// DisableFileOps mocks base method -func (m *MockdatabaseMediator) DisableFileOps() { +// DisableFileOpsAndWait mocks base method +func (m *MockdatabaseMediator) DisableFileOpsAndWait() { m.ctrl.T.Helper() - m.ctrl.Call(m, "DisableFileOps") + m.ctrl.Call(m, "DisableFileOpsAndWait") } -// DisableFileOps indicates an expected call of DisableFileOps -func (mr *MockdatabaseMediatorMockRecorder) DisableFileOps() *gomock.Call { +// DisableFileOpsAndWait indicates an expected call of DisableFileOpsAndWait +func (mr *MockdatabaseMediatorMockRecorder) DisableFileOpsAndWait() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DisableFileOps", reflect.TypeOf((*MockdatabaseMediator)(nil).DisableFileOps)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DisableFileOpsAndWait", reflect.TypeOf((*MockdatabaseMediator)(nil).DisableFileOpsAndWait)) } // EnableFileOps mocks base method @@ -2983,18 +3102,6 @@ func (mr *MockdatabaseMediatorMockRecorder) Tick(forceType, startTime interface{ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Tick", reflect.TypeOf((*MockdatabaseMediator)(nil).Tick), forceType, startTime) } -// WaitForFileSystemProcesses mocks base method -func (m *MockdatabaseMediator) WaitForFileSystemProcesses() { - m.ctrl.T.Helper() - m.ctrl.Call(m, "WaitForFileSystemProcesses") -} - -// WaitForFileSystemProcesses indicates an expected call of WaitForFileSystemProcesses -func (mr *MockdatabaseMediatorMockRecorder) WaitForFileSystemProcesses() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, 
"WaitForFileSystemProcesses", reflect.TypeOf((*MockdatabaseMediator)(nil).WaitForFileSystemProcesses)) -} - // Repair mocks base method func (m *MockdatabaseMediator) Repair() error { m.ctrl.T.Helper() @@ -4338,6 +4445,34 @@ func (mr *MockOptionsMockRecorder) NamespaceRuntimeOptionsManagerRegistry() *gom return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NamespaceRuntimeOptionsManagerRegistry", reflect.TypeOf((*MockOptions)(nil).NamespaceRuntimeOptionsManagerRegistry)) } +// SetMediatorTickInterval mocks base method +func (m *MockOptions) SetMediatorTickInterval(value time.Duration) Options { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SetMediatorTickInterval", value) + ret0, _ := ret[0].(Options) + return ret0 +} + +// SetMediatorTickInterval indicates an expected call of SetMediatorTickInterval +func (mr *MockOptionsMockRecorder) SetMediatorTickInterval(value interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetMediatorTickInterval", reflect.TypeOf((*MockOptions)(nil).SetMediatorTickInterval), value) +} + +// MediatorTickInterval mocks base method +func (m *MockOptions) MediatorTickInterval() time.Duration { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "MediatorTickInterval") + ret0, _ := ret[0].(time.Duration) + return ret0 +} + +// MediatorTickInterval indicates an expected call of MediatorTickInterval +func (mr *MockOptionsMockRecorder) MediatorTickInterval() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MediatorTickInterval", reflect.TypeOf((*MockOptions)(nil).MediatorTickInterval)) +} + // MockMemoryTracker is a mock of MemoryTracker interface type MockMemoryTracker struct { ctrl *gomock.Controller diff --git a/src/dbnode/storage/types.go b/src/dbnode/storage/types.go index 68a77e2038..328a24ec8c 100644 --- a/src/dbnode/storage/types.go +++ b/src/dbnode/storage/types.go @@ -753,9 +753,14 @@ type databaseFlushManager interface { } // databaseCleanupManager manages cleaning up persistent storage space. +// NB(bodu): We have to separate flush methods since we separated out flushing into warm/cold flush +// and cleaning up certain types of data concurrently w/ either can be problematic. type databaseCleanupManager interface { - // Cleanup cleans up data not needed in the persistent storage. - Cleanup(t time.Time, isBootstrapped bool) error + // WarmFlushCleanup cleans up data not needed in the persistent storage before a warm flush. + WarmFlushCleanup(t time.Time, isBootstrapped bool) error + + // ColdFlushCleanup cleans up data not needed in the persistent storage before a cold flush. + ColdFlushCleanup(t time.Time, isBootstrapped bool) error // Report reports runtime information. Report() @@ -763,9 +768,6 @@ type databaseCleanupManager interface { // databaseFileSystemManager manages the database related filesystem activities. type databaseFileSystemManager interface { - // Cleanup cleans up data not needed in the persistent storage. - Cleanup(t time.Time, isBootstrapped bool) error - // Flush flushes in-memory data to persistent storage. Flush(t time.Time) error @@ -795,6 +797,25 @@ type databaseFileSystemManager interface { LastSuccessfulSnapshotStartTime() (time.Time, bool) } +// databaseColdFlushManager manages the database related cold flush activities. +type databaseColdFlushManager interface { + databaseCleanupManager + + // Disable disables the cold flush manager and prevents it from + // performing file operations, returns the current file operation status. 
+ Disable() fileOpStatus + + // Enable enables the cold flush manager to perform file operations. + Enable() fileOpStatus + + // Status returns the file operation status. + Status() fileOpStatus + + // Run attempts to perform all cold flush related operations, + // returning true if those operations are performed, and false otherwise. + Run(t time.Time) bool +} + // databaseShardRepairer repairs in-memory data for a shard. type databaseShardRepairer interface { // Options returns the repair options. @@ -848,8 +869,8 @@ type databaseMediator interface { // Bootstrap bootstraps the database with file operations performed at the end. Bootstrap() (BootstrapResult, error) - // DisableFileOps disables file operations. - DisableFileOps() + // DisableFileOpsAndWait disables file operations. + DisableFileOpsAndWait() // EnableFileOps enables file operations. EnableFileOps() @@ -857,9 +878,6 @@ type databaseMediator interface { // Tick performs a tick. Tick(forceType forceType, startTime time.Time) error - // WaitForFileSystemProcesses waits for any ongoing fs processes to finish. - WaitForFileSystemProcesses() - // Repair repairs the database. Repair() error @@ -1153,6 +1171,12 @@ type Options interface { // NamespaceRuntimeOptionsManagerRegistry returns the namespace runtime options manager. NamespaceRuntimeOptionsManagerRegistry() namespace.RuntimeOptionsManagerRegistry + + // SetMediatorTickInterval sets the ticking interval for the medidator. + SetMediatorTickInterval(value time.Duration) Options + + // MediatorTickInterval returns the ticking interval for the mediator. + MediatorTickInterval() time.Duration } // MemoryTracker tracks memory.
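// --- Editor's sketch (not part of the patch) --------------------------------------
// The databaseColdFlushManager interface above (Run/Status/Enable/Disable plus the split
// WarmFlushCleanup/ColdFlushCleanup methods) together with the new MediatorTickInterval
// option suggests a background loop of roughly the following shape. This is illustrative
// only: runColdFlushProcess, coldFlushRunner, stopCh, and nowFn are hypothetical names,
// and error handling and metrics are omitted; only Run(t time.Time) bool and
// MediatorTickInterval() come from this patch.

package sketch

import "time"

type coldFlushRunner interface {
	// Run attempts a cold flush pass for the given time, returning false when the
	// manager is disabled or a pass is already in flight.
	Run(t time.Time) bool
}

func runColdFlushProcess(
	runner coldFlushRunner,
	tickInterval time.Duration, // e.g. the value from Options.MediatorTickInterval()
	nowFn func() time.Time,
	stopCh <-chan struct{},
) {
	for {
		select {
		case <-stopCh:
			return
		default:
		}
		runner.Run(nowFn())
		// Sleep for the mediator tick interval between passes regardless of whether the
		// pass actually ran, mirroring how the mediator paces its other file ops loops.
		time.Sleep(tickInterval)
	}
}
// -----------------------------------------------------------------------------------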