diff --git a/src/dbnode/integration/disk_flush_helpers.go b/src/dbnode/integration/disk_flush_helpers.go index 7d3c87d400..ce954a036c 100644 --- a/src/dbnode/integration/disk_flush_helpers.go +++ b/src/dbnode/integration/disk_flush_helpers.go @@ -40,6 +40,7 @@ import ( "github.com/m3db/m3/src/x/ident/testutil" xtime "github.com/m3db/m3/src/x/time" + "github.com/pborman/uuid" "github.com/stretchr/testify/require" ) @@ -84,7 +85,8 @@ func waitUntilSnapshotFilesFlushed( namespace ident.ID, expectedSnapshots []snapshotID, timeout time.Duration, -) error { +) (uuid.UUID, error) { + var snapshotID uuid.UUID dataFlushed := func() bool { for _, shard := range shardSet.AllIDs() { for _, e := range expectedSnapshots { @@ -102,14 +104,19 @@ func waitUntilSnapshotFilesFlushed( if !(latest.ID.VolumeIndex >= e.minVolume) { return false } + + _, snapshotID, err = latest.SnapshotTimeAndID() + if err != nil { + panic(err) + } } } return true } if waitUntil(dataFlushed, timeout) { - return nil + return snapshotID, nil } - return errDiskFlushTimedOut + return snapshotID, errDiskFlushTimedOut } func waitUntilDataFilesFlushed( diff --git a/src/dbnode/integration/disk_snapshot_test.go b/src/dbnode/integration/disk_snapshot_test.go index f6aed5eeb4..5b7d886339 100644 --- a/src/dbnode/integration/disk_snapshot_test.go +++ b/src/dbnode/integration/disk_snapshot_test.go @@ -152,8 +152,8 @@ func TestDiskSnapshotSimple(t *testing.T) { maxWaitTime := time.Minute for i, ns := range testSetup.Namespaces() { log.Info("waiting for snapshot files to flush") - require.NoError(t, waitUntilSnapshotFilesFlushed( - filePathPrefix, shardSet, ns.ID(), snapshotsToWaitForByNS[i], maxWaitTime)) + _, err := waitUntilSnapshotFilesFlushed(filePathPrefix, shardSet, ns.ID(), snapshotsToWaitForByNS[i], maxWaitTime) + require.NoError(t, err) log.Info("verifying snapshot files") verifySnapshottedDataFiles(t, shardSet, testSetup.StorageOpts(), ns.ID(), seriesMaps) } @@ -167,15 +167,17 @@ func TestDiskSnapshotSimple(t 
*testing.T) { for _, ns := range testSetup.Namespaces() { log.Info("waiting for new snapshot files to be written out") snapshotsToWaitFor := []snapshotID{{blockStart: newTime.Truncate(blockSize)}} - require.NoError(t, waitUntilSnapshotFilesFlushed( - filePathPrefix, shardSet, ns.ID(), snapshotsToWaitFor, maxWaitTime)) + // NB(bodu): We need to check if a specific snapshot ID was deleted since snapshotting logic now changed + // to always snapshot every block start w/in retention. + snapshotID, err := waitUntilSnapshotFilesFlushed(filePathPrefix, shardSet, ns.ID(), snapshotsToWaitFor, maxWaitTime) + require.NoError(t, err) log.Info("waiting for old snapshot files to be deleted") for _, shard := range shardSet.All() { waitUntil(func() bool { // Increase the time each check to ensure that the filesystem processes are able to progress (some // of them throttle themselves based on time elapsed since the previous time.) testSetup.SetNowFn(testSetup.NowFn()().Add(10 * time.Second)) - exists, err := fs.SnapshotFileSetExistsAt(filePathPrefix, ns.ID(), shard.ID(), oldTime.Truncate(blockSize)) + exists, err := fs.SnapshotFileSetExistsAt(filePathPrefix, ns.ID(), snapshotID, shard.ID(), oldTime.Truncate(blockSize)) require.NoError(t, err) return !exists }, maxWaitTime) diff --git a/src/dbnode/integration/fs_commitlog_snapshot_mixed_mode_read_write_prop_test.go b/src/dbnode/integration/fs_commitlog_snapshot_mixed_mode_read_write_prop_test.go index 78341889e9..7e209dee63 100644 --- a/src/dbnode/integration/fs_commitlog_snapshot_mixed_mode_read_write_prop_test.go +++ b/src/dbnode/integration/fs_commitlog_snapshot_mixed_mode_read_write_prop_test.go @@ -29,8 +29,8 @@ import ( "testing" "time" - "github.com/m3db/m3/src/dbnode/retention" "github.com/m3db/m3/src/dbnode/namespace" + "github.com/m3db/m3/src/dbnode/retention" "github.com/m3db/m3/src/x/context" xtime "github.com/m3db/m3/src/x/time" "go.uber.org/zap" @@ -228,7 +228,7 @@ func TestFsCommitLogMixedModeReadWriteProp(t 
*testing.T) { } else { snapshotBlock = now.Truncate(ns1BlockSize).Add(-ns1BlockSize) } - err := waitUntilSnapshotFilesFlushed( + _, err := waitUntilSnapshotFilesFlushed( filePathPrefix, setup.ShardSet(), nsID, diff --git a/src/dbnode/persist/fs/files.go b/src/dbnode/persist/fs/files.go index d09714a562..8919d6158a 100644 --- a/src/dbnode/persist/fs/files.go +++ b/src/dbnode/persist/fs/files.go @@ -1441,17 +1441,32 @@ func DataFileSetExists( } // SnapshotFileSetExistsAt determines whether snapshot fileset files exist for the given namespace, shard, and block start time. -func SnapshotFileSetExistsAt(prefix string, namespace ident.ID, shard uint32, blockStart time.Time) (bool, error) { +func SnapshotFileSetExistsAt( + prefix string, + namespace ident.ID, + snapshotID uuid.UUID, + shard uint32, + blockStart time.Time, +) (bool, error) { snapshotFiles, err := SnapshotFiles(prefix, namespace, shard) if err != nil { return false, err } - _, ok := snapshotFiles.LatestVolumeForBlock(blockStart) + latest, ok := snapshotFiles.LatestVolumeForBlock(blockStart) if !ok { return false, nil } + _, latestSnapshotID, err := latest.SnapshotTimeAndID() + if err != nil { + return false, err + } + + if !uuid.Equal(latestSnapshotID, snapshotID) { + return false, nil + } + // LatestVolumeForBlock checks for a complete checkpoint file, so we don't // need to recheck it here. 
return true, nil diff --git a/src/dbnode/persist/fs/files_test.go b/src/dbnode/persist/fs/files_test.go index c34f162558..bb2cb23dab 100644 --- a/src/dbnode/persist/fs/files_test.go +++ b/src/dbnode/persist/fs/files_test.go @@ -889,7 +889,7 @@ func TestSnapshotFileSetExistsAt(t *testing.T) { writeOutTestSnapshot(t, dir, shard, ts, 0) - exists, err := SnapshotFileSetExistsAt(dir, testNs1ID, shard, ts) + exists, err := SnapshotFileSetExistsAt(dir, testNs1ID, testSnapshotID, shard, ts) require.NoError(t, err) require.True(t, exists) } diff --git a/src/dbnode/storage/bootstrap.go b/src/dbnode/storage/bootstrap.go index a730d829c4..846791067f 100644 --- a/src/dbnode/storage/bootstrap.go +++ b/src/dbnode/storage/bootstrap.go @@ -141,7 +141,7 @@ func (m *bootstrapManager) Bootstrap() (BootstrapResult, error) { // NB(xichen): disable filesystem manager before we bootstrap to minimize // the impact of file operations on bootstrapping performance - m.mediator.DisableFileOps() + m.mediator.DisableFileOpsAndWait() defer m.mediator.EnableFileOps() // Keep performing bootstraps until none pending and no error returned. diff --git a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go index be0a2e2a6a..3ed656c2b7 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go @@ -677,6 +677,30 @@ func (s *commitLogSource) bootstrapShardSnapshots( blockSize time.Duration, mostRecentCompleteSnapshotByBlockShard map[xtime.UnixNano]map[uint32]fs.FileSetFile, ) error { + // NB(bodu): We use info files on disk to check if a snapshot should be loaded in as cold or warm. + // We do this instead of cross refing blockstarts and current time to handle the case of bootstrapping a + // once warm block start after a node has been shut down for a long time. We consider all block starts we + // haven't flushed data for yet a warm block start. 
+ fsOpts := s.opts.CommitLogOptions().FilesystemOptions() + readInfoFilesResults := fs.ReadInfoFiles(fsOpts.FilePathPrefix(), ns.ID(), shard, + fsOpts.InfoReaderBufferSize(), fsOpts.DecodingOptions()) + shardBlockStartsOnDisk := make(map[xtime.UnixNano]struct{}) + for _, result := range readInfoFilesResults { + if err := result.Err.Error(); err != nil { + // If we couldn't read the info files then keep going to be consistent + // with the way the db shard updates its flush states in UpdateFlushStates(). + s.log.Error("unable to read info files in commit log bootstrap", + zap.Uint32("shard", shard), + zap.Stringer("namespace", ns.ID()), + zap.String("filepath", result.Err.Filepath()), + zap.Error(err)) + continue + } + info := result.Info + at := xtime.FromNanoseconds(info.BlockStart) + shardBlockStartsOnDisk[xtime.ToUnixNano(at)] = struct{}{} + } + rangeIter := shardTimeRanges.Iter() for rangeIter.Next() { var ( @@ -709,9 +733,13 @@ func (s *commitLogSource) bootstrapShardSnapshots( continue } + writeType := series.WarmWrite + if _, ok := shardBlockStartsOnDisk[xtime.ToUnixNano(blockStart)]; ok { + writeType = series.ColdWrite + } if err := s.bootstrapShardBlockSnapshot( ns, accumulator, shard, blockStart, blockSize, - mostRecentCompleteSnapshotForShardBlock); err != nil { + mostRecentCompleteSnapshotForShardBlock, writeType); err != nil { return err } } @@ -727,6 +755,7 @@ func (s *commitLogSource) bootstrapShardBlockSnapshot( blockStart time.Time, blockSize time.Duration, mostRecentCompleteSnapshot fs.FileSetFile, + writeType series.WriteType, ) error { var ( bOpts = s.opts.ResultOptions() @@ -806,7 +835,7 @@ func (s *commitLogSource) bootstrapShardBlockSnapshot( } // Load into series. 
- if err := ref.Series.LoadBlock(dbBlock, series.WarmWrite); err != nil { + if err := ref.Series.LoadBlock(dbBlock, writeType); err != nil { return err } diff --git a/src/dbnode/storage/bootstrap_test.go b/src/dbnode/storage/bootstrap_test.go index 901ecf29be..95337e996f 100644 --- a/src/dbnode/storage/bootstrap_test.go +++ b/src/dbnode/storage/bootstrap_test.go @@ -58,7 +58,7 @@ func TestDatabaseBootstrapWithBootstrapError(t *testing.T) { db.EXPECT().OwnedNamespaces().Return(namespaces, nil) m := NewMockdatabaseMediator(ctrl) - m.EXPECT().DisableFileOps() + m.EXPECT().DisableFileOpsAndWait() m.EXPECT().EnableFileOps().AnyTimes() bsm := newBootstrapManager(db, m, opts).(*bootstrapManager) @@ -101,7 +101,7 @@ func TestDatabaseBootstrapSubsequentCallsQueued(t *testing.T) { })) m := NewMockdatabaseMediator(ctrl) - m.EXPECT().DisableFileOps() + m.EXPECT().DisableFileOpsAndWait() m.EXPECT().EnableFileOps().AnyTimes() db := NewMockdatabase(ctrl) @@ -159,7 +159,7 @@ func TestDatabaseBootstrapBootstrapHooks(t *testing.T) { })) m := NewMockdatabaseMediator(ctrl) - m.EXPECT().DisableFileOps() + m.EXPECT().DisableFileOpsAndWait() m.EXPECT().EnableFileOps().AnyTimes() db := NewMockdatabase(ctrl) diff --git a/src/dbnode/storage/cleanup.go b/src/dbnode/storage/cleanup.go index 7bdc74fe26..d91db51329 100644 --- a/src/dbnode/storage/cleanup.go +++ b/src/dbnode/storage/cleanup.go @@ -70,13 +70,15 @@ type cleanupManager struct { deleteFilesFn deleteFilesFn deleteInactiveDirectoriesFn deleteInactiveDirectoriesFn - cleanupInProgress bool + warmFlushCleanupInProgress bool + coldFlushCleanupInProgress bool metrics cleanupManagerMetrics logger *zap.Logger } type cleanupManagerMetrics struct { - status tally.Gauge + warmFlushCleanupStatus tally.Gauge + coldFlushCleanupStatus tally.Gauge corruptCommitlogFile tally.Counter corruptSnapshotFile tally.Counter corruptSnapshotMetadataFile tally.Counter @@ -90,7 +92,8 @@ func newCleanupManagerMetrics(scope tally.Scope) cleanupManagerMetrics { 
sScope := scope.SubScope("snapshot") smScope := scope.SubScope("snapshot-metadata") return cleanupManagerMetrics{ - status: scope.Gauge("cleanup"), + warmFlushCleanupStatus: scope.Gauge("warm-flush-cleanup"), + coldFlushCleanupStatus: scope.Gauge("cold-flush-cleanup"), corruptCommitlogFile: clScope.Counter("corrupt"), corruptSnapshotFile: sScope.Counter("corrupt"), corruptSnapshotMetadataFile: smScope.Counter("corrupt"), @@ -124,7 +127,7 @@ func newCleanupManager( } } -func (m *cleanupManager) Cleanup(t time.Time, isBootstrapped bool) error { +func (m *cleanupManager) WarmFlushCleanup(t time.Time, isBootstrapped bool) error { // Don't perform any cleanup if we are not boostrapped yet. if !isBootstrapped { m.logger.Debug("database is still bootstrapping, terminating cleanup") @@ -132,12 +135,12 @@ func (m *cleanupManager) Cleanup(t time.Time, isBootstrapped bool) error { } m.Lock() - m.cleanupInProgress = true + m.warmFlushCleanupInProgress = true m.Unlock() defer func() { m.Lock() - m.cleanupInProgress = false + m.warmFlushCleanupInProgress = false m.Unlock() }() @@ -147,11 +150,6 @@ func (m *cleanupManager) Cleanup(t time.Time, isBootstrapped bool) error { } multiErr := xerrors.NewMultiError() - if err := m.cleanupDataFiles(t, namespaces); err != nil { - multiErr = multiErr.Add(fmt.Errorf( - "encountered errors when cleaning up data files for %v: %v", t, err)) - } - if err := m.cleanupExpiredIndexFiles(t, namespaces); err != nil { multiErr = multiErr.Add(fmt.Errorf( "encountered errors when cleaning up index files for %v: %v", t, err)) @@ -162,11 +160,6 @@ func (m *cleanupManager) Cleanup(t time.Time, isBootstrapped bool) error { "encountered errors when cleaning up index files for %v: %v", t, err)) } - if err := m.deleteInactiveDataFiles(namespaces); err != nil { - multiErr = multiErr.Add(fmt.Errorf( - "encountered errors when deleting inactive data files for %v: %v", t, err)) - } - if err := m.deleteInactiveDataSnapshotFiles(namespaces); err != nil { multiErr = 
multiErr.Add(fmt.Errorf( "encountered errors when deleting inactive snapshot files for %v: %v", t, err)) @@ -185,15 +178,57 @@ func (m *cleanupManager) Cleanup(t time.Time, isBootstrapped bool) error { return multiErr.FinalError() } +func (m *cleanupManager) ColdFlushCleanup(t time.Time, isBootstrapped bool) error { + // Don't perform any cleanup if we are not bootstrapped yet. + if !isBootstrapped { + m.logger.Debug("database is still bootstrapping, terminating cleanup") + return nil + } + + m.Lock() + m.coldFlushCleanupInProgress = true + m.Unlock() + + defer func() { + m.Lock() + m.coldFlushCleanupInProgress = false + m.Unlock() + }() + + namespaces, err := m.database.OwnedNamespaces() + if err != nil { + return err + } + + multiErr := xerrors.NewMultiError() + if err := m.cleanupDataFiles(t, namespaces); err != nil { + multiErr = multiErr.Add(fmt.Errorf( + "encountered errors when cleaning up data files for %v: %v", t, err)) + } + + if err := m.deleteInactiveDataFiles(namespaces); err != nil { + multiErr = multiErr.Add(fmt.Errorf( + "encountered errors when deleting inactive data files for %v: %v", t, err)) + } + + return multiErr.FinalError() +} func (m *cleanupManager) Report() { m.RLock() - cleanupInProgress := m.cleanupInProgress + coldFlushCleanupInProgress := m.coldFlushCleanupInProgress + warmFlushCleanupInProgress := m.warmFlushCleanupInProgress m.RUnlock() - if cleanupInProgress { - m.metrics.status.Update(1) + if coldFlushCleanupInProgress { + m.metrics.coldFlushCleanupStatus.Update(1) + } else { + m.metrics.coldFlushCleanupStatus.Update(0) + } + + if warmFlushCleanupInProgress { + m.metrics.warmFlushCleanupStatus.Update(1) } else { - m.metrics.status.Update(0) + m.metrics.warmFlushCleanupStatus.Update(0) } } diff --git a/src/dbnode/storage/cleanup_test.go b/src/dbnode/storage/cleanup_test.go index ca8fe33e72..acc15dc251 100644 --- a/src/dbnode/storage/cleanup_test.go +++ b/src/dbnode/storage/cleanup_test.go @@ -32,6 +32,7 @@ import ( 
"github.com/m3db/m3/src/dbnode/persist/fs" "github.com/m3db/m3/src/dbnode/persist/fs/commitlog" "github.com/m3db/m3/src/dbnode/retention" + xerrors "github.com/m3db/m3/src/x/errors" "github.com/m3db/m3/src/x/ident" xtest "github.com/m3db/m3/src/x/test" @@ -317,7 +318,7 @@ func TestCleanupManagerCleanupCommitlogsAndSnapshots(t *testing.T) { return nil } - err := mgr.Cleanup(ts, true) + err := cleanup(mgr, ts, true) if tc.expectErr { require.Error(t, err) } else { @@ -360,7 +361,7 @@ func TestCleanupManagerNamespaceCleanupBootstrapped(t *testing.T) { mgr := newCleanupManager(db, newNoopFakeActiveLogs(), tally.NoopScope).(*cleanupManager) idx.EXPECT().CleanupExpiredFileSets(ts).Return(nil) idx.EXPECT().CleanupDuplicateFileSets().Return(nil) - require.NoError(t, mgr.Cleanup(ts, true)) + require.NoError(t, cleanup(mgr, ts, true)) } func TestCleanupManagerNamespaceCleanupNotBootstrapped(t *testing.T) { @@ -389,7 +390,7 @@ func TestCleanupManagerNamespaceCleanupNotBootstrapped(t *testing.T) { db.EXPECT().OwnedNamespaces().Return(nses, nil).AnyTimes() mgr := newCleanupManager(db, newNoopFakeActiveLogs(), tally.NoopScope).(*cleanupManager) - require.NoError(t, mgr.Cleanup(ts, false)) + require.NoError(t, cleanup(mgr, ts, false)) } // Test NS doesn't cleanup when flag is present @@ -422,7 +423,7 @@ func TestCleanupManagerDoesntNeedCleanup(t *testing.T) { return nil } - require.NoError(t, mgr.Cleanup(ts, true)) + require.NoError(t, cleanup(mgr, ts, true)) } func TestCleanupDataAndSnapshotFileSetFiles(t *testing.T) { @@ -448,7 +449,7 @@ func TestCleanupDataAndSnapshotFileSetFiles(t *testing.T) { db.EXPECT().OwnedNamespaces().Return(namespaces, nil).AnyTimes() mgr := newCleanupManager(db, newNoopFakeActiveLogs(), tally.NoopScope).(*cleanupManager) - require.NoError(t, mgr.Cleanup(ts, true)) + require.NoError(t, cleanup(mgr, ts, true)) } type deleteInactiveDirectoriesCall struct { @@ -487,7 +488,7 @@ func TestDeleteInactiveDataAndSnapshotFileSetFiles(t *testing.T) { } 
mgr.deleteInactiveDirectoriesFn = deleteInactiveDirectoriesFn - require.NoError(t, mgr.Cleanup(ts, true)) + require.NoError(t, cleanup(mgr, ts, true)) expectedCalls := []deleteInactiveDirectoriesCall{ deleteInactiveDirectoriesCall{ @@ -532,7 +533,7 @@ func TestCleanupManagerPropagatesOwnedNamespacesError(t *testing.T) { require.NoError(t, db.Open()) require.NoError(t, db.Terminate()) - require.Error(t, mgr.Cleanup(ts, true)) + require.Error(t, cleanup(mgr, ts, true)) } func timeFor(s int64) time.Time { @@ -556,3 +557,14 @@ func newFakeActiveLogs(activeLogs persist.CommitLogFiles) fakeActiveLogs { activeLogs: activeLogs, } } + +func cleanup( + mgr databaseCleanupManager, + t time.Time, + isBootstrapped bool, +) error { + multiErr := xerrors.NewMultiError() + multiErr = multiErr.Add(mgr.WarmFlushCleanup(t, isBootstrapped)) + multiErr = multiErr.Add(mgr.ColdFlushCleanup(t, isBootstrapped)) + return multiErr.FinalError() +} diff --git a/src/dbnode/storage/coldflush.go b/src/dbnode/storage/coldflush.go new file mode 100644 index 0000000000..17913c4530 --- /dev/null +++ b/src/dbnode/storage/coldflush.go @@ -0,0 +1,199 @@ +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package storage + +import ( + "sync" + "time" + + "github.com/m3db/m3/src/dbnode/persist" + xerrors "github.com/m3db/m3/src/x/errors" + "github.com/m3db/m3/src/x/instrument" + + "github.com/uber-go/tally" + "go.uber.org/zap" +) + +type coldFlushManager struct { + databaseCleanupManager + sync.RWMutex + + log *zap.Logger + database database + pm persist.Manager + opts Options + // Retain using fileOpStatus here to be consistent w/ the + // filesystem manager since both are filesystem processes. + status fileOpStatus + isColdFlushing tally.Gauge + enabled bool +} + +func newColdFlushManager( + database database, + pm persist.Manager, + opts Options, +) databaseColdFlushManager { + instrumentOpts := opts.InstrumentOptions() + scope := instrumentOpts.MetricsScope().SubScope("fs") + // NB(bodu): cold flush cleanup doesn't require commit logs. 
+ cm := newCleanupManager(database, nil, scope) + + return &coldFlushManager{ + databaseCleanupManager: cm, + log: instrumentOpts.Logger(), + database: database, + pm: pm, + opts: opts, + status: fileOpNotStarted, + isColdFlushing: scope.Gauge("cold-flush"), + enabled: true, + } +} + +func (m *coldFlushManager) Disable() fileOpStatus { + m.Lock() + status := m.status + m.enabled = false + m.Unlock() + return status +} + +func (m *coldFlushManager) Enable() fileOpStatus { + m.Lock() + status := m.status + m.enabled = true + m.Unlock() + return status +} + +func (m *coldFlushManager) Status() fileOpStatus { + m.RLock() + status := m.status + m.RUnlock() + return status +} + +func (m *coldFlushManager) Run(t time.Time) bool { + m.Lock() + if !m.shouldRunWithLock() { + m.Unlock() + return false + } + m.status = fileOpInProgress + m.Unlock() + + // NB(xichen): perform data cleanup and flushing sequentially to minimize the impact of disk seeks. + // NB(r): Use invariant here since flush errors were introduced + // and not caught in CI or integration tests. + // When an invariant occurs in CI tests it panics so as to fail + // the build. + if err := m.ColdFlushCleanup(t, m.database.IsBootstrapped()); err != nil { + instrument.EmitAndLogInvariantViolation(m.opts.InstrumentOptions(), + func(l *zap.Logger) { + l.Error("error when cleaning up cold flush data", zap.Time("time", t), zap.Error(err)) + }) + } + if err := m.trackedColdFlush(); err != nil { + instrument.EmitAndLogInvariantViolation(m.opts.InstrumentOptions(), + func(l *zap.Logger) { + l.Error("error when cold flushing data", zap.Time("time", t), zap.Error(err)) + }) + } + m.Lock() + m.status = fileOpNotStarted + m.Unlock() + return true +} + +func (m *coldFlushManager) trackedColdFlush() error { + // The cold flush process will persist any data that has been "loaded" into memory via + // the Load() API but has not yet been persisted durably. 
As a result, if the cold flush + // process completes without error, then we want to "decrement" the number of tracked bytes + // by however many were outstanding right before the cold flush began. + // + // For example: + // t0: Load 100 bytes --> (numLoadedBytes == 100, numPendingLoadedBytes == 0) + // t1: memTracker.MarkLoadedAsPending() --> (numLoadedBytes == 100, numPendingLoadedBytes == 100) + // t2: Load 200 bytes --> (numLoadedBytes == 300, numPendingLoadedBytes == 100) + // t3: ColdFlushStart() + // t4: Load 300 bytes --> (numLoadedBytes == 600, numPendingLoadedBytes == 100) + // t5: ColdFlushEnd() + // t6: memTracker.DecPendingLoadedBytes() --> (numLoadedBytes == 500, numPendingLoadedBytes == 0) + // t7: memTracker.MarkLoadedAsPending() --> (numLoadedBytes == 500, numPendingLoadedBytes == 500) + // t8: ColdFlushStart() + // t9: ColdFlushError() + // t10: memTracker.MarkLoadedAsPending() --> (numLoadedBytes == 500, numPendingLoadedBytes == 500) + // t11: ColdFlushStart() + // t12: ColdFlushEnd() + // t13: memTracker.DecPendingLoadedBytes() --> (numLoadedBytes == 0, numPendingLoadedBytes == 0) + memTracker := m.opts.MemoryTracker() + memTracker.MarkLoadedAsPending() + + if err := m.coldFlush(); err != nil { + return err + } + + // Only decrement if the cold flush was a success. In this case, the decrement will reduce the + // value by however many bytes had been tracked when the cold flush began. 
+ memTracker.DecPendingLoadedBytes() + return nil +} + +func (m *coldFlushManager) coldFlush() error { + namespaces, err := m.database.OwnedNamespaces() + if err != nil { + return err + } + + flushPersist, err := m.pm.StartFlushPersist() + if err != nil { + return err + } + + multiErr := xerrors.NewMultiError() + for _, ns := range namespaces { + if err = ns.ColdFlush(flushPersist); err != nil { + multiErr = multiErr.Add(err) + } + } + + multiErr = multiErr.Add(flushPersist.DoneFlush()) + err = multiErr.FinalError() + return err +} + +func (m *coldFlushManager) Report() { + m.databaseCleanupManager.Report() + + m.RLock() + status := m.status + m.RUnlock() + if status == fileOpInProgress { + m.isColdFlushing.Update(1) + } else { + m.isColdFlushing.Update(0) + } +} + +func (m *coldFlushManager) shouldRunWithLock() bool { + return m.enabled && m.status != fileOpInProgress && m.database.IsBootstrapped() +} diff --git a/src/dbnode/storage/coldflush_test.go b/src/dbnode/storage/coldflush_test.go new file mode 100644 index 0000000000..55eb3d355c --- /dev/null +++ b/src/dbnode/storage/coldflush_test.go @@ -0,0 +1,118 @@ +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package storage + +import ( + "errors" + "sync" + "testing" + "time" + + "github.com/golang/mock/gomock" + "github.com/m3db/m3/src/dbnode/persist" + "github.com/stretchr/testify/require" +) + +func TestColdFlushManagerFlushAlreadyInProgress(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + var ( + mockPersistManager = persist.NewMockManager(ctrl) + mockFlushPersist = persist.NewMockFlushPreparer(ctrl) + + // Channels used to coordinate cold flushing + startCh = make(chan struct{}, 1) + doneCh = make(chan struct{}, 1) + ) + defer func() { + close(startCh) + close(doneCh) + }() + + mockFlushPersist.EXPECT().DoneFlush().Return(nil) + mockPersistManager.EXPECT().StartFlushPersist().Do(func() { + startCh <- struct{}{} + <-doneCh + }).Return(mockFlushPersist, nil) + + testOpts := DefaultTestOptions().SetPersistManager(mockPersistManager) + db := newMockdatabase(ctrl) + db.EXPECT().Options().Return(testOpts).AnyTimes() + db.EXPECT().IsBootstrapped().Return(true).AnyTimes() + db.EXPECT().OwnedNamespaces().Return(nil, nil).AnyTimes() + + cfm := newColdFlushManager(db, mockPersistManager, testOpts).(*coldFlushManager) + cfm.pm = mockPersistManager + + var ( + wg sync.WaitGroup + now = time.Unix(0, 0) + ) + wg.Add(2) + + // Goroutine 1 should successfully flush. + go func() { + defer wg.Done() + require.True(t, cfm.Run(now)) + }() + + // Goroutine 2 should indicate already flushing. + go func() { + defer wg.Done() + + // Wait until we start the cold flushing process. + <-startCh + + // Ensure it doesn't allow a parallel flush. + require.False(t, cfm.Run(now)) + + // Allow the cold flush to finish. 
+ doneCh <- struct{}{} + }() + + wg.Wait() + +} + +func TestColdFlushManagerFlushDoneFlushError(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + var ( + fakeErr = errors.New("fake error while marking flush done") + mockPersistManager = persist.NewMockManager(ctrl) + mockFlushPersist = persist.NewMockFlushPreparer(ctrl) + ) + + mockFlushPersist.EXPECT().DoneFlush().Return(fakeErr) + mockPersistManager.EXPECT().StartFlushPersist().Return(mockFlushPersist, nil) + + testOpts := DefaultTestOptions().SetPersistManager(mockPersistManager) + db := newMockdatabase(ctrl) + db.EXPECT().Options().Return(testOpts).AnyTimes() + db.EXPECT().OwnedNamespaces().Return(nil, nil) + + cfm := newColdFlushManager(db, mockPersistManager, testOpts).(*coldFlushManager) + cfm.pm = mockPersistManager + + require.EqualError(t, fakeErr, cfm.coldFlush().Error()) +} diff --git a/src/dbnode/storage/database.go b/src/dbnode/storage/database.go index 653f98d4c7..fbef19f04f 100644 --- a/src/dbnode/storage/database.go +++ b/src/dbnode/storage/database.go @@ -561,8 +561,9 @@ func (d *db) terminateWithLock() error { } func (d *db) Terminate() error { - // NB(bodu): Wait for fs processes to finish. - d.mediator.WaitForFileSystemProcesses() + // NB(bodu): Disable file ops waits for current fs processes to + // finish before disabling. + d.mediator.DisableFileOpsAndWait() d.Lock() defer d.Unlock() @@ -571,8 +572,9 @@ func (d *db) Terminate() error { } func (d *db) Close() error { - // NB(bodu): Wait for fs processes to finish. - d.mediator.WaitForFileSystemProcesses() + // NB(bodu): Disable file ops waits for current fs processes to + // finish before disabling. 
+ d.mediator.DisableFileOpsAndWait() d.Lock() defer d.Unlock() diff --git a/src/dbnode/storage/flush.go b/src/dbnode/storage/flush.go index f323d7adc9..73ee135117 100644 --- a/src/dbnode/storage/flush.go +++ b/src/dbnode/storage/flush.go @@ -26,6 +26,7 @@ import ( "sync" "time" + "github.com/m3db/m3/src/dbnode/clock" "github.com/m3db/m3/src/dbnode/persist" "github.com/m3db/m3/src/dbnode/persist/fs/commitlog" "github.com/m3db/m3/src/dbnode/retention" @@ -33,6 +34,7 @@ import ( "github.com/pborman/uuid" "github.com/uber-go/tally" + "go.uber.org/zap" ) var ( @@ -47,11 +49,34 @@ const ( // when we haven't begun either a flush or snapshot. flushManagerNotIdle flushManagerFlushInProgress - flushManagerColdFlushInProgress flushManagerSnapshotInProgress flushManagerIndexFlushInProgress ) +type flushManagerMetrics struct { + isFlushing tally.Gauge + isSnapshotting tally.Gauge + isIndexFlushing tally.Gauge + // This is a "debug" metric for making sure that the snapshotting process + // is not overly aggressive. + maxBlocksSnapshottedByNamespace tally.Gauge + dataWarmFlushDuration tally.Timer + dataSnapshotDuration tally.Timer + indexFlushDuration tally.Timer +} + +func newFlushManagerMetrics(scope tally.Scope) flushManagerMetrics { + return flushManagerMetrics{ + isFlushing: scope.Gauge("flush"), + isSnapshotting: scope.Gauge("snapshot"), + isIndexFlushing: scope.Gauge("index-flush"), + maxBlocksSnapshottedByNamespace: scope.Gauge("max-blocks-snapshotted-by-namespace"), + dataWarmFlushDuration: scope.Timer("data-warm-flush-duration"), + dataSnapshotDuration: scope.Timer("data-snapshot-duration"), + indexFlushDuration: scope.Timer("index-flush-duration"), + } +} + type flushManager struct { sync.RWMutex @@ -62,16 +87,12 @@ type flushManager struct { // state is used to protect the flush manager against concurrent use, // while flushInProgress and snapshotInProgress are more granular and // are used for emitting granular gauges. 
- state flushManagerState - isFlushing tally.Gauge - isColdFlushing tally.Gauge - isSnapshotting tally.Gauge - isIndexFlushing tally.Gauge - // This is a "debug" metric for making sure that the snapshotting process - // is not overly aggressive. - maxBlocksSnapshottedByNamespace tally.Gauge + state flushManagerState + metrics flushManagerMetrics lastSuccessfulSnapshotStartTime time.Time + logger *zap.Logger + nowFn clock.NowFn } func newFlushManager( @@ -81,15 +102,13 @@ func newFlushManager( ) databaseFlushManager { opts := database.Options() return &flushManager{ - database: database, - commitlog: commitlog, - opts: opts, - pm: opts.PersistManager(), - isFlushing: scope.Gauge("flush"), - isColdFlushing: scope.Gauge("cold-flush"), - isSnapshotting: scope.Gauge("snapshot"), - isIndexFlushing: scope.Gauge("index-flush"), - maxBlocksSnapshottedByNamespace: scope.Gauge("max-blocks-snapshotted-by-namespace"), + database: database, + commitlog: commitlog, + opts: opts, + pm: opts.PersistManager(), + metrics: newFlushManagerMetrics(scope), + logger: opts.InstrumentOptions().Logger(), + nowFn: opts.ClockOptions().NowFn(), } } @@ -110,16 +129,16 @@ func (m *flushManager) Flush(startTime time.Time) error { return err } - // Perform three separate loops through all the namespaces so that we can + // Perform two separate loops through all the namespaces so that we can // emit better gauges, i.e. all the flushing for all the namespaces happens - // at once, then all the cold flushes, then all the snapshotting. This is + // at once then all the snapshotting. This is // also slightly better semantically because flushing should take priority - // over cold flushes and snapshotting. + // over snapshotting. 
// // In addition, we need to make sure that for any given shard/blockStart - // combination, we attempt a flush and then a cold flush before a snapshot - // as the snapshotting process will attempt to snapshot any unflushed blocks - // which would be wasteful if the block is already flushable. + // combination, we attempt a flush before a snapshot as the snapshotting process + // will attempt to snapshot blocks w/ unflushed data which would be wasteful if + // the block is already flushable. multiErr := xerrors.NewMultiError() if err = m.dataWarmFlush(namespaces, startTime); err != nil { multiErr = multiErr.Add(err) @@ -127,44 +146,6 @@ func (m *flushManager) Flush(startTime time.Time) error { rotatedCommitlogID, err := m.commitlog.RotateLogs() if err == nil { - // The cold flush process will persist any data that has been "loaded" into memory via - // the Load() API but has not yet been persisted durably. As a result, if the cold flush - // process completes without error, then we want to "decrement" the number of tracked bytes - // by however many were outstanding right before the cold flush began. 
- // - // For example: - // t0: Load 100 bytes --> (numLoadedBytes == 100, numPendingLoadedBytes == 0) - // t1: memTracker.MarkLoadedAsPending() --> (numLoadedBytes == 100, numPendingLoadedBytes == 100) - // t2: Load 200 bytes --> (numLoadedBytes == 300, numPendingLoadedBytes == 100) - // t3: ColdFlushStart() - // t4: Load 300 bytes --> (numLoadedBytes == 600, numPendingLoadedBytes == 100) - // t5: ColdFlushEnd() - // t6: memTracker.DecPendingLoadedBytes() --> (numLoadedBytes == 500, numPendingLoadedBytes == 0) - // t7: memTracker.MarkLoadedAsPending() --> (numLoadedBytes == 500, numPendingLoadedBytes == 500) - // t8: ColdFlushStart() - // t9: ColdFlushError() - // t10: memTracker.MarkLoadedAsPending() --> (numLoadedBytes == 500, numPendingLoadedBytes == 500) - // t11: ColdFlushStart() - // t12: ColdFlushEnd() - // t13: memTracker.DecPendingLoadedBytes() --> (numLoadedBytes == 0, numPendingLoadedBytes == 0) - memTracker := m.opts.MemoryTracker() - memTracker.MarkLoadedAsPending() - if err = m.dataColdFlush(namespaces); err != nil { - multiErr = multiErr.Add(err) - // If cold flush fails, we can't proceed to snapshotting because - // commit log cleanup logic uses the presence of a successful - // snapshot checkpoint file to determine which commit log files are - // safe to delete. Therefore if a cold flush fails and a snapshot - // succeeds, the writes from the failed cold flush might be lost - // when commit logs get cleaned up, leaving the node in an undurable - // state such that if it restarted, it would not be able to recover - // the cold writes from its commit log. - return multiErr.FinalError() - } - // Only decrement if the cold flush was a success. In this case, the decrement will reduce the - // value by however many bytes had been tracked when the cold flush began. 
- memTracker.DecPendingLoadedBytes() - if err = m.dataSnapshot(namespaces, startTime, rotatedCommitlogID); err != nil { multiErr = multiErr.Add(err) } @@ -189,7 +170,10 @@ func (m *flushManager) dataWarmFlush( } m.setState(flushManagerFlushInProgress) - multiErr := xerrors.NewMultiError() + var ( + start = m.nowFn() + multiErr = xerrors.NewMultiError() + ) for _, ns := range namespaces { // Flush first because we will only snapshot if there are no outstanding flushes. flushTimes, err := m.namespaceFlushTimes(ns, startTime) @@ -208,30 +192,7 @@ func (m *flushManager) dataWarmFlush( multiErr = multiErr.Add(err) } - return multiErr.FinalError() -} - -func (m *flushManager) dataColdFlush( - namespaces []databaseNamespace, -) error { - flushPersist, err := m.pm.StartFlushPersist() - if err != nil { - return err - } - - m.setState(flushManagerColdFlushInProgress) - multiErr := xerrors.NewMultiError() - for _, ns := range namespaces { - if err = ns.ColdFlush(flushPersist); err != nil { - multiErr = multiErr.Add(err) - } - } - - err = flushPersist.DoneFlush() - if err != nil { - multiErr = multiErr.Add(err) - } - + m.metrics.dataWarmFlushDuration.Record(m.nowFn().Sub(start)) return multiErr.FinalError() } @@ -249,6 +210,7 @@ func (m *flushManager) dataSnapshot( m.setState(flushManagerSnapshotInProgress) var ( + start = m.nowFn() maxBlocksSnapshottedByNamespace = 0 multiErr = xerrors.NewMultiError() ) @@ -278,7 +240,7 @@ func (m *flushManager) dataSnapshot( } } } - m.maxBlocksSnapshottedByNamespace.Update(float64(maxBlocksSnapshottedByNamespace)) + m.metrics.maxBlocksSnapshottedByNamespace.Update(float64(maxBlocksSnapshottedByNamespace)) err = snapshotPersist.DoneSnapshot(snapshotID, rotatedCommitlogID) multiErr = multiErr.Add(err) @@ -287,6 +249,7 @@ func (m *flushManager) dataSnapshot( if finalErr == nil { m.lastSuccessfulSnapshotStartTime = startTime } + m.metrics.dataSnapshotDuration.Record(m.nowFn().Sub(start)) return finalErr } @@ -299,7 +262,10 @@ func (m 
*flushManager) indexFlush( } m.setState(flushManagerIndexFlushInProgress) - multiErr := xerrors.NewMultiError() + var ( + start = m.nowFn() + multiErr = xerrors.NewMultiError() + ) for _, ns := range namespaces { var ( indexOpts = ns.Options().IndexOptions() @@ -312,6 +278,7 @@ func (m *flushManager) indexFlush( } multiErr = multiErr.Add(indexFlush.DoneIndex()) + m.metrics.indexFlushDuration.Record(m.nowFn().Sub(start)) return multiErr.FinalError() } @@ -321,27 +288,21 @@ func (m *flushManager) Report() { m.RUnlock() if state == flushManagerFlushInProgress { - m.isFlushing.Update(1) - } else { - m.isFlushing.Update(0) - } - - if state == flushManagerColdFlushInProgress { - m.isColdFlushing.Update(1) + m.metrics.isFlushing.Update(1) } else { - m.isColdFlushing.Update(0) + m.metrics.isFlushing.Update(0) } if state == flushManagerSnapshotInProgress { - m.isSnapshotting.Update(1) + m.metrics.isSnapshotting.Update(1) } else { - m.isSnapshotting.Update(0) + m.metrics.isSnapshotting.Update(0) } if state == flushManagerIndexFlushInProgress { - m.isIndexFlushing.Update(1) + m.metrics.isIndexFlushing.Update(1) } else { - m.isIndexFlushing.Update(0) + m.metrics.isIndexFlushing.Update(0) } } @@ -392,13 +353,8 @@ func (m *flushManager) namespaceSnapshotTimes(ns databaseNamespace, curr time.Ti candidateTimes := timesInRange(earliest, latest, blockSize) var loopErr error return filterTimes(candidateTimes, func(t time.Time) bool { - // Snapshot anything that is unflushed. - needsFlush, err := ns.NeedsFlush(t, t) - if err != nil { - loopErr = err - return false - } - return needsFlush + // NB(bodu): Snapshot everything since to account for cold writes/blocks. 
+ return true }), loopErr } diff --git a/src/dbnode/storage/flush_test.go b/src/dbnode/storage/flush_test.go index c909229839..1e271a7b3f 100644 --- a/src/dbnode/storage/flush_test.go +++ b/src/dbnode/storage/flush_test.go @@ -141,17 +141,12 @@ func TestFlushManagerFlushAlreadyInProgress(t *testing.T) { // Allow the flush to finish. doneCh <- struct{}{} - // Wait until we start the compaction process. + // Allow the snapshot to begin and finish. <-startCh // Ensure it doesn't allow a parallel flush. require.Equal(t, errFlushOperationsInProgress, fm.Flush(now)) - // Allow the compaction to finish. - doneCh <- struct{}{} - - // Allow the snapshot to begin and finish. - <-startCh doneCh <- struct{}{} }() @@ -171,11 +166,8 @@ func TestFlushManagerFlushDoneFlushError(t *testing.T) { mockSnapshotPersist = persist.NewMockSnapshotPreparer(ctrl) ) - gomock.InOrder( - mockFlushPersist.EXPECT().DoneFlush().Return(fakeErr), - mockFlushPersist.EXPECT().DoneFlush().Return(nil), - ) - mockPersistManager.EXPECT().StartFlushPersist().Return(mockFlushPersist, nil).Times(2) + mockFlushPersist.EXPECT().DoneFlush().Return(fakeErr) + mockPersistManager.EXPECT().StartFlushPersist().Return(mockFlushPersist, nil) mockSnapshotPersist.EXPECT().DoneSnapshot(gomock.Any(), testCommitlogFile).Return(nil) mockPersistManager.EXPECT().StartSnapshotPersist(gomock.Any()).Return(mockSnapshotPersist, nil) @@ -213,8 +205,8 @@ func TestFlushManagerNamespaceFlushTimesErr(t *testing.T) { ) // Make sure DoneFlush is called despite encountering an error, once for snapshot and once for warm flush. 
- mockFlushPersist.EXPECT().DoneFlush().Return(nil).Times(2) - mockPersistManager.EXPECT().StartFlushPersist().Return(mockFlushPersist, nil).Times(2) + mockFlushPersist.EXPECT().DoneFlush().Return(nil) + mockPersistManager.EXPECT().StartFlushPersist().Return(mockFlushPersist, nil) mockSnapshotPersist.EXPECT().DoneSnapshot(gomock.Any(), testCommitlogFile).Return(nil) mockPersistManager.EXPECT().StartSnapshotPersist(gomock.Any()).Return(mockSnapshotPersist, nil) @@ -232,7 +224,6 @@ func TestFlushManagerNamespaceFlushTimesErr(t *testing.T) { ns.EXPECT().Options().Return(nsOpts).AnyTimes() ns.EXPECT().ID().Return(defaultTestNs1ID).AnyTimes() ns.EXPECT().NeedsFlush(gomock.Any(), gomock.Any()).Return(false, fakeErr).AnyTimes() - ns.EXPECT().ColdFlush(gomock.Any()).Return(nil).AnyTimes() ns.EXPECT().Snapshot(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() db.EXPECT().OwnedNamespaces().Return([]databaseNamespace{ns}, nil) @@ -259,8 +250,8 @@ func TestFlushManagerFlushDoneSnapshotError(t *testing.T) { mockSnapshotPersist = persist.NewMockSnapshotPreparer(ctrl) ) - mockFlushPersist.EXPECT().DoneFlush().Return(nil).Times(2) - mockPersistManager.EXPECT().StartFlushPersist().Return(mockFlushPersist, nil).Times(2) + mockFlushPersist.EXPECT().DoneFlush().Return(nil) + mockPersistManager.EXPECT().StartFlushPersist().Return(mockFlushPersist, nil) mockSnapshotPersist.EXPECT().DoneSnapshot(gomock.Any(), testCommitlogFile).Return(fakeErr) mockPersistManager.EXPECT().StartSnapshotPersist(gomock.Any()).Return(mockSnapshotPersist, nil) @@ -294,8 +285,8 @@ func TestFlushManagerFlushDoneIndexError(t *testing.T) { mockPersistManager = persist.NewMockManager(ctrl) ) - mockFlushPersist.EXPECT().DoneFlush().Return(nil).Times(2) - mockPersistManager.EXPECT().StartFlushPersist().Return(mockFlushPersist, nil).Times(2) + mockFlushPersist.EXPECT().DoneFlush().Return(nil) + mockPersistManager.EXPECT().StartFlushPersist().Return(mockFlushPersist, nil) 
mockSnapshotPersist.EXPECT().DoneSnapshot(gomock.Any(), testCommitlogFile).Return(nil) mockPersistManager.EXPECT().StartSnapshotPersist(gomock.Any()).Return(mockSnapshotPersist, nil) @@ -330,7 +321,6 @@ func TestFlushManagerSkipNamespaceIndexingDisabled(t *testing.T) { ns.EXPECT().ID().Return(defaultTestNs1ID).AnyTimes() ns.EXPECT().NeedsFlush(gomock.Any(), gomock.Any()).Return(true, nil).AnyTimes() ns.EXPECT().WarmFlush(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() - ns.EXPECT().ColdFlush(gomock.Any()).Return(nil).AnyTimes() ns.EXPECT().Snapshot(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() var ( @@ -339,8 +329,8 @@ func TestFlushManagerSkipNamespaceIndexingDisabled(t *testing.T) { mockPersistManager = persist.NewMockManager(ctrl) ) - mockFlushPersist.EXPECT().DoneFlush().Return(nil).Times(2) - mockPersistManager.EXPECT().StartFlushPersist().Return(mockFlushPersist, nil).Times(2) + mockFlushPersist.EXPECT().DoneFlush().Return(nil) + mockPersistManager.EXPECT().StartFlushPersist().Return(mockFlushPersist, nil) mockSnapshotPersist.EXPECT().DoneSnapshot(gomock.Any(), testCommitlogFile).Return(nil) mockPersistManager.EXPECT().StartSnapshotPersist(gomock.Any()).Return(mockSnapshotPersist, nil) @@ -374,7 +364,6 @@ func TestFlushManagerNamespaceIndexingEnabled(t *testing.T) { ns.EXPECT().ID().Return(defaultTestNs1ID).AnyTimes() ns.EXPECT().NeedsFlush(gomock.Any(), gomock.Any()).Return(true, nil).AnyTimes() ns.EXPECT().WarmFlush(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() - ns.EXPECT().ColdFlush(gomock.Any()).Return(nil).AnyTimes() ns.EXPECT().Snapshot(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() ns.EXPECT().FlushIndex(gomock.Any()).Return(nil) @@ -384,8 +373,8 @@ func TestFlushManagerNamespaceIndexingEnabled(t *testing.T) { mockPersistManager = persist.NewMockManager(ctrl) ) - mockFlushPersist.EXPECT().DoneFlush().Return(nil).Times(2) - mockPersistManager.EXPECT().StartFlushPersist().Return(mockFlushPersist, 
nil).Times(2) + mockFlushPersist.EXPECT().DoneFlush().Return(nil) + mockPersistManager.EXPECT().StartFlushPersist().Return(mockFlushPersist, nil) mockSnapshotPersist.EXPECT().DoneSnapshot(gomock.Any(), testCommitlogFile).Return(nil) mockPersistManager.EXPECT().StartSnapshotPersist(gomock.Any()).Return(mockSnapshotPersist, nil) @@ -553,13 +542,10 @@ func TestFlushManagerFlushSnapshot(t *testing.T) { ns.EXPECT().NeedsFlush(st, st).Return(false, nil) } - ns.EXPECT().ColdFlush(gomock.Any()) - snapshotEnd := now.Add(bufferFuture).Truncate(blockSize) num = numIntervals(start, snapshotEnd, blockSize) for i := 0; i < num; i++ { st := start.Add(time.Duration(i) * blockSize) - ns.EXPECT().NeedsFlush(st, st).Return(true, nil) ns.EXPECT().Snapshot(st, now, gomock.Any()) } } diff --git a/src/dbnode/storage/fs.go b/src/dbnode/storage/fs.go index 44cd043c4a..0fd7509982 100644 --- a/src/dbnode/storage/fs.go +++ b/src/dbnode/storage/fs.go @@ -158,7 +158,7 @@ func (m *fileSystemManager) Run( // and not caught in CI or integration tests. // When an invariant occurs in CI tests it panics so as to fail // the build. 
- if err := m.Cleanup(t, m.database.IsBootstrapped()); err != nil { + if err := m.WarmFlushCleanup(t, m.database.IsBootstrapped()); err != nil { instrument.EmitAndLogInvariantViolation(m.opts.InstrumentOptions(), func(l *zap.Logger) { l.Error("error when cleaning up data", zap.Time("time", t), zap.Error(err)) diff --git a/src/dbnode/storage/fs_test.go b/src/dbnode/storage/fs_test.go index 4832808737..17bff01cc6 100644 --- a/src/dbnode/storage/fs_test.go +++ b/src/dbnode/storage/fs_test.go @@ -85,7 +85,7 @@ func TestFileSystemManagerRun(t *testing.T) { ts := time.Now() gomock.InOrder( - cm.EXPECT().Cleanup(ts, true).Return(errors.New("foo")), + cm.EXPECT().WarmFlushCleanup(ts, true).Return(errors.New("foo")), fm.EXPECT().Flush(ts).Return(errors.New("bar")), ) diff --git a/src/dbnode/storage/mediator.go b/src/dbnode/storage/mediator.go index 26b27f151c..74bd0934dc 100644 --- a/src/dbnode/storage/mediator.go +++ b/src/dbnode/storage/mediator.go @@ -26,6 +26,7 @@ import ( "time" "github.com/m3db/m3/src/dbnode/clock" + "github.com/m3db/m3/src/dbnode/persist/fs" "github.com/m3db/m3/src/dbnode/persist/fs/commitlog" "github.com/m3db/m3/src/x/instrument" @@ -34,8 +35,7 @@ import ( ) type ( - mediatorState int - fileSystemProcessesState int + mediatorState int ) const ( @@ -46,9 +46,6 @@ const ( mediatorNotOpen mediatorState = iota mediatorOpen mediatorClosed - - fileSystemProcessesIdle fileSystemProcessesState = iota - fileSystemProcessesBusy ) var ( @@ -80,17 +77,17 @@ type mediator struct { database database databaseBootstrapManager databaseFileSystemManager + databaseColdFlushManager databaseTickManager databaseRepairer - opts Options - nowFn clock.NowFn - sleepFn sleepFn - metrics mediatorMetrics - state mediatorState - fileSystemProcessesState fileSystemProcessesState - mediatorTimeBarrier mediatorTimeBarrier - closedCh chan struct{} + opts Options + nowFn clock.NowFn + sleepFn sleepFn + metrics mediatorMetrics + state mediatorState + mediatorTimeBarrier 
mediatorTimeBarrier + closedCh chan struct{} } // TODO(r): Consider renaming "databaseMediator" to "databaseCoordinator" @@ -102,23 +99,31 @@ func newMediator(database database, commitlog commitlog.CommitLog, opts Options) nowFn = opts.ClockOptions().NowFn() ) d := &mediator{ - database: database, - opts: opts, - nowFn: opts.ClockOptions().NowFn(), - sleepFn: time.Sleep, - metrics: newMediatorMetrics(scope), - state: mediatorNotOpen, - fileSystemProcessesState: fileSystemProcessesIdle, - mediatorTimeBarrier: newMediatorTimeBarrier(nowFn, iOpts), - closedCh: make(chan struct{}), + database: database, + opts: opts, + nowFn: opts.ClockOptions().NowFn(), + sleepFn: time.Sleep, + metrics: newMediatorMetrics(scope), + state: mediatorNotOpen, + mediatorTimeBarrier: newMediatorTimeBarrier(nowFn, iOpts), + closedCh: make(chan struct{}), } fsm := newFileSystemManager(database, commitlog, opts) d.databaseFileSystemManager = fsm + // NB(bodu): Cold flush needs its own persist manager now + // that it's running in its own thread. + fsOpts := opts.CommitLogOptions().FilesystemOptions() + pm, err := fs.NewPersistManager(fsOpts) + if err != nil { + return nil, err + } + cfm := newColdFlushManager(database, pm, opts) + d.databaseColdFlushManager = cfm + d.databaseRepairer = newNoopDatabaseRepairer() if opts.RepairEnabled() { - var err error d.databaseRepairer, err = newDatabaseRepairer(database, opts) if err != nil { return nil, err @@ -139,39 +144,39 @@ func (m *mediator) Open() error { m.state = mediatorOpen go m.reportLoop() go m.ongoingFileSystemProcesses() + go m.ongoingColdFlushProcesses() go m.ongoingTick() m.databaseRepairer.Start() return nil } -func (m *mediator) DisableFileOps() { +func (m *mediator) DisableFileOpsAndWait() { status := m.databaseFileSystemManager.Disable() for status == fileOpInProgress { m.sleepFn(fileOpCheckInterval) status = m.databaseFileSystemManager.Status() } + // Even though the cold flush runs separately, it's still + // considered a fs process.
+ status = m.databaseColdFlushManager.Disable() + for status == fileOpInProgress { + m.sleepFn(fileOpCheckInterval) + status = m.databaseColdFlushManager.Status() + } } func (m *mediator) EnableFileOps() { m.databaseFileSystemManager.Enable() + // Even though the cold flush runs separately, it's still + // considered a fs process. + m.databaseColdFlushManager.Enable() } func (m *mediator) Report() { m.databaseBootstrapManager.Report() m.databaseRepairer.Report() m.databaseFileSystemManager.Report() -} - -func (m *mediator) WaitForFileSystemProcesses() { - m.RLock() - fileSystemProcessesState := m.fileSystemProcessesState - m.RUnlock() - for fileSystemProcessesState == fileSystemProcessesBusy { - m.sleepFn(fileSystemProcessesCheckInterval) - m.RLock() - fileSystemProcessesState = m.fileSystemProcessesState - m.RUnlock() - } + m.databaseColdFlushManager.Report() } func (m *mediator) Close() error { @@ -189,7 +194,7 @@ func (m *mediator) Close() error { return nil } -// The mediator mediates the relationship between ticks and flushes(warm and cold)/snapshots/cleanups. +// The mediator mediates the relationship between ticks and warm flushes/snapshots. // // For example, the requirements to perform a flush are: // 1) currentTime > blockStart.Add(blockSize).Add(bufferPast) @@ -221,6 +226,27 @@ func (m *mediator) ongoingFileSystemProcesses() { } } +// The mediator mediates the relationship between ticks and cold flushes/cleanup the same way it does for warm flushes/snapshots. +// We want to begin each cold/warm flush with the same view of time as a tick. +// NB(bodu): Cold flushes and cleanup have been separated out into their own thread to avoid blocking snapshots. +func (m *mediator) ongoingColdFlushProcesses() { + for { + select { + case <-m.closedCh: + return + default: + m.sleepFn(tickCheckInterval) + + // Check if the mediator is already closed.
+ if !m.isOpen() { + return + } + + m.runColdFlushProcesses() + } + } +} + func (m *mediator) ongoingTick() { var ( log = m.opts.InstrumentOptions().Logger() @@ -256,15 +282,6 @@ func (m *mediator) ongoingTick() { } func (m *mediator) runFileSystemProcesses() { - m.Lock() - m.fileSystemProcessesState = fileSystemProcessesBusy - m.Unlock() - defer func() { - m.Lock() - m.fileSystemProcessesState = fileSystemProcessesIdle - m.Unlock() - }() - // See comment over mediatorTimeBarrier for an explanation of this logic. log := m.opts.InstrumentOptions().Logger() mediatorTime, err := m.mediatorTimeBarrier.fsProcessesWait() @@ -276,6 +293,18 @@ func (m *mediator) runFileSystemProcesses() { m.databaseFileSystemManager.Run(mediatorTime, syncRun, noForce) } +func (m *mediator) runColdFlushProcesses() { + // See comment over mediatorTimeBarrier for an explanation of this logic. + log := m.opts.InstrumentOptions().Logger() + mediatorTime, err := m.mediatorTimeBarrier.fsProcessesWait() + if err != nil { + log.Error("error within ongoingColdFlushProcesses waiting for next mediatorTime", zap.Error(err)) + return + } + + m.databaseColdFlushManager.Run(mediatorTime) +} + func (m *mediator) reportLoop() { interval := m.opts.InstrumentOptions().ReportInterval() t := time.NewTicker(interval) @@ -314,42 +343,65 @@ func (m *mediator) isOpen() bool { // This means that once a run of filesystem processes completes it will always have to wait until the currently // executing tick completes before performing the next run, but in practice this should not be much of an issue. 
// -// ____________ ___________ -// | Flush (t0) | | Tick (t0) | -// | | | | -// | | |___________| -// | | ___________ -// | | | Tick (t0) | -// | | | | -// | | |___________| -// | | ___________ -// |____________| | Tick (t0) | -// barrier.wait() | | -// |___________| -// mediatorTime = t1 -// barrier.release() -// ------------------------------------- -// ____________ ___________ -// | Flush (t1) | | Tick (t1) | -// | | | | -// | | |___________| -// | | ___________ -// | | | Tick (t1) | -// | | | | +// Additionally, an independent cold flush process complicates this a bit more in that we have more than one filesystem +// process waiting on the mediator barrier. The invariant here is that both warm and cold flushes always start on a tick +// with a consistent view of time as the tick it is on. They don't necessarily need to start on the same tick. See the +// diagram below for an example case. +// +// ____________ ___________ _________________ +// | Flush (t0) | | Tick (t0) | | Cold Flush (t0) | +// | | | | | | +// | | |___________| | | +// | | ___________ | | +// | | | Tick (t0) | | | +// | | | | | | +// | | |___________| | | +// | | ___________ | | +// |____________| | Tick (t0) | | | +// barrier.wait() | | | | +// |___________| | | +// mediatorTime = t1 | | +// barrier.release() | | +// ____________ ___________ | | +// | Flush (t1) | | Tick (t1) | |_________________| +// | | | | barrier.wait() // | | |___________| -// | | ___________ -// |____________| | Tick (t1) | -// barrier.wait() | | -// |___________| -// barrier.release() -// ------------------------------------ +// | | mediatorTime = t2 +// | | barrier.release() +// | | ___________ _________________ +// | | | Tick (t2) | | Cold Flush (t2) | +// |____________| | | | | +// barrier.wait() |___________| | | +// mediatorTime = t3 | | +// barrier.release() | | +// ____________ ___________ | | +// | Flush (t3) | | Tick (t3) | | | +// | | | | | | +// | | |___________| | | +// | | ___________ | | +// | | | Tick 
(t3) | | | +// | | | | | | +// | | |___________| | | +// | | ___________ | | +// |____________| | Tick (t3) | |_________________| +// barrier.wait() | | barrier.wait() +// |___________| +// mediatorTime = t4 +// barrier.release() +// ____________ ___________ _________________ +// | Flush (t4) | | Tick (t4) | | Cold Flush (t4) | +// | | | | | | +// ------------------------------------------------------------ type mediatorTimeBarrier struct { sync.Mutex - mediatorTime time.Time - nowFn func() time.Time - iOpts instrument.Options - fsProcessesWaiting bool - releaseCh chan time.Time + // Both mediatorTime and numFsProcessesWaiting are protected + // by the mutex. + mediatorTime time.Time + numFsProcessesWaiting int + + nowFn func() time.Time + iOpts instrument.Options + releaseCh chan time.Time } // initialMediatorTime should only be used to obtain the initial time for @@ -363,28 +415,24 @@ func (b *mediatorTimeBarrier) initialMediatorTime() time.Time { func (b *mediatorTimeBarrier) fsProcessesWait() (time.Time, error) { b.Lock() - if b.fsProcessesWaiting { - b.Unlock() - return time.Time{}, errMediatorTimeBarrierAlreadyWaiting - } - b.fsProcessesWaiting = true + b.numFsProcessesWaiting++ b.Unlock() t := <-b.releaseCh b.Lock() - b.fsProcessesWaiting = false + b.numFsProcessesWaiting-- b.Unlock() return t, nil } func (b *mediatorTimeBarrier) maybeRelease() (time.Time, error) { b.Lock() - hasWaiter := b.fsProcessesWaiting + numWaiters := b.numFsProcessesWaiting mediatorTime := b.mediatorTime b.Unlock() - if !hasWaiter { + if numWaiters == 0 { // If there isn't a waiter yet then the filesystem processes may still // be ongoing in which case we don't want to release the barrier / update // the current time yet. 
Allow the tick to run again with the same time @@ -405,7 +453,9 @@ func (b *mediatorTimeBarrier) maybeRelease() (time.Time, error) { } b.mediatorTime = newMediatorTime - b.releaseCh <- b.mediatorTime + for i := 0; i < numWaiters; i++ { + b.releaseCh <- b.mediatorTime + } return b.mediatorTime, nil } diff --git a/src/dbnode/storage/mediator_test.go b/src/dbnode/storage/mediator_test.go index 76a7e4f78d..14056d4fca 100644 --- a/src/dbnode/storage/mediator_test.go +++ b/src/dbnode/storage/mediator_test.go @@ -56,7 +56,7 @@ func TestDatabaseMediatorOpenClose(t *testing.T) { require.Equal(t, errMediatorAlreadyClosed, m.Close()) } -func TestDatabaseMediatorDisableFileOps(t *testing.T) { +func TestDatabaseMediatorDisableFileOpsAndWait(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -86,6 +86,6 @@ func TestDatabaseMediatorDisableFileOps(t *testing.T) { fsm.EXPECT().Status().Return(fileOpNotStarted), ) - m.DisableFileOps() + m.DisableFileOpsAndWait() require.Equal(t, 3, len(slept)) } diff --git a/src/dbnode/storage/series/buffer.go b/src/dbnode/storage/series/buffer.go index af8c742fcf..04f261d399 100644 --- a/src/dbnode/storage/series/buffer.go +++ b/src/dbnode/storage/series/buffer.go @@ -133,6 +133,8 @@ type databaseBuffer interface { IsEmpty() bool + IsEmptyAtBlockStart(time.Time) bool + ColdFlushBlockStarts(blockStates map[xtime.UnixNano]BlockState) OptimizedTimes Stats() bufferStats @@ -417,6 +419,14 @@ func (b *dbBuffer) IsEmpty() bool { return len(b.bucketsMap) == 0 } +func (b *dbBuffer) IsEmptyAtBlockStart(start time.Time) bool { + bv, exists := b.bucketVersionsAt(start) + if !exists { + return true + } + return bv.streamsLen() == 0 +} + func (b *dbBuffer) ColdFlushBlockStarts(blockStates map[xtime.UnixNano]BlockState) OptimizedTimes { var times OptimizedTimes diff --git a/src/dbnode/storage/series/buffer_mock.go b/src/dbnode/storage/series/buffer_mock.go index a215f611f4..f242c84f93 100644 --- a/src/dbnode/storage/series/buffer_mock.go +++ 
b/src/dbnode/storage/series/buffer_mock.go @@ -194,6 +194,20 @@ func (mr *MockdatabaseBufferMockRecorder) IsEmpty() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsEmpty", reflect.TypeOf((*MockdatabaseBuffer)(nil).IsEmpty)) } +// IsEmptyAtBlockStart mocks base method +func (m *MockdatabaseBuffer) IsEmptyAtBlockStart(arg0 time.Time) bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "IsEmptyAtBlockStart", arg0) + ret0, _ := ret[0].(bool) + return ret0 +} + +// IsEmptyAtBlockStart indicates an expected call of IsEmptyAtBlockStart +func (mr *MockdatabaseBufferMockRecorder) IsEmptyAtBlockStart(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsEmptyAtBlockStart", reflect.TypeOf((*MockdatabaseBuffer)(nil).IsEmptyAtBlockStart), arg0) +} + // ColdFlushBlockStarts mocks base method func (m *MockdatabaseBuffer) ColdFlushBlockStarts(blockStates map[time0.UnixNano]BlockState) OptimizedTimes { m.ctrl.T.Helper() diff --git a/src/dbnode/storage/series/series.go b/src/dbnode/storage/series/series.go index 4e5931ca67..23b8ecaa0a 100644 --- a/src/dbnode/storage/series/series.go +++ b/src/dbnode/storage/series/series.go @@ -297,6 +297,13 @@ func (s *dbSeries) IsEmpty() bool { return false } +func (s *dbSeries) IsBufferEmptyAtBlockStart(blockStart time.Time) bool { + s.RLock() + bufferEmpty := s.buffer.IsEmptyAtBlockStart(blockStart) + s.RUnlock() + return bufferEmpty +} + func (s *dbSeries) NumActiveBlocks() int { s.RLock() value := s.cachedBlocks.Len() + s.buffer.Stats().wiredBlocks diff --git a/src/dbnode/storage/series/series_mock.go b/src/dbnode/storage/series/series_mock.go index c6fc05e145..9ffca2a59d 100644 --- a/src/dbnode/storage/series/series_mock.go +++ b/src/dbnode/storage/series/series_mock.go @@ -163,6 +163,20 @@ func (mr *MockDatabaseSeriesMockRecorder) ID() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ID", reflect.TypeOf((*MockDatabaseSeries)(nil).ID)) 
} +// IsBufferEmptyAtBlockStart mocks base method +func (m *MockDatabaseSeries) IsBufferEmptyAtBlockStart(arg0 time.Time) bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "IsBufferEmptyAtBlockStart", arg0) + ret0, _ := ret[0].(bool) + return ret0 +} + +// IsBufferEmptyAtBlockStart indicates an expected call of IsBufferEmptyAtBlockStart +func (mr *MockDatabaseSeriesMockRecorder) IsBufferEmptyAtBlockStart(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsBufferEmptyAtBlockStart", reflect.TypeOf((*MockDatabaseSeries)(nil).IsBufferEmptyAtBlockStart), arg0) +} + // IsEmpty mocks base method func (m *MockDatabaseSeries) IsEmpty() bool { m.ctrl.T.Helper() diff --git a/src/dbnode/storage/series/types.go b/src/dbnode/storage/series/types.go index 0d5f627183..d279979f8e 100644 --- a/src/dbnode/storage/series/types.go +++ b/src/dbnode/storage/series/types.go @@ -109,9 +109,13 @@ type DatabaseSeries interface { opts FetchBlocksMetadataOptions, ) (block.FetchBlocksMetadataResult, error) - // IsEmpty returns whether series is empty. + // IsEmpty returns whether series is empty (includes both cached blocks and in-mem buffer data). IsEmpty() bool + // IsBufferEmptyAtBlockStart returns whether the series buffer is empty at block start + // (only checks for in-mem buffer data). + IsBufferEmptyAtBlockStart(time.Time) bool + // NumActiveBlocks returns the number of active blocks the series currently holds. NumActiveBlocks() int diff --git a/src/dbnode/storage/shard.go b/src/dbnode/storage/shard.go index ab8dd5dfb2..e7f6a70982 100644 --- a/src/dbnode/storage/shard.go +++ b/src/dbnode/storage/shard.go @@ -251,11 +251,16 @@ type shardFlushState struct { sync.RWMutex statesByTime map[xtime.UnixNano]fileOpState initialized bool + + // NB(bodu): Cache state on whether we snapshotted last or not to avoid + // going to disk to see if filesets are empty. 
+ emptySnapshotOnDiskByTime map[xtime.UnixNano]bool } func newShardFlushState() shardFlushState { return shardFlushState{ - statesByTime: make(map[xtime.UnixNano]fileOpState), + statesByTime: make(map[xtime.UnixNano]fileOpState), + emptySnapshotOnDiskByTime: make(map[xtime.UnixNano]bool), } } @@ -2370,8 +2375,29 @@ func (s *dbShard) Snapshot( s.RUnlock() return errShardNotBootstrappedToSnapshot } + s.RUnlock() + var needsSnapshot bool + s.forEachShardEntry(func(entry *lookup.Entry) bool { + if !entry.Series.IsBufferEmptyAtBlockStart(blockStart) { + needsSnapshot = true + return false + } + return true + }) + // Only terminate early when we would be over-writing an empty snapshot fileset on disk. + // TODO(bodu): We could bootstrap empty snapshot state in the bs path to avoid doing extra + // snapshotting work after a bootstrap since this cached state gets cleared. + s.flushState.RLock() + // NB(bodu): This always defaults to false if the record does not exist. + emptySnapshotOnDisk := s.flushState.emptySnapshotOnDiskByTime[xtime.ToUnixNano(blockStart)] + s.flushState.RUnlock() + + if !needsSnapshot && emptySnapshotOnDisk { + return nil + } + var multiErr xerrors.MultiError prepareOpts := persist.DataPrepareOptions{ @@ -2418,7 +2444,24 @@ func (s *dbShard) Snapshot( multiErr = multiErr.Add(err) } - return multiErr.FinalError() + if err := multiErr.FinalError(); err != nil { + return err + } + + // Only update cached snapshot state if we successfully flushed data to disk. + s.flushState.Lock() + if needsSnapshot { + s.flushState.emptySnapshotOnDiskByTime[xtime.ToUnixNano(blockStart)] = false + } else { + // NB(bodu): If we flushed an empty snapshot to disk, it means that the previous + // snapshot on disk was not empty (or we just bootstrapped and cached state was lost). + // The snapshot we just flushed may or may not have data, although whatever data we flushed + // would be recoverable from the rotated commit log as well.
+ s.flushState.emptySnapshotOnDiskByTime[xtime.ToUnixNano(blockStart)] = true + } + s.flushState.Unlock() + + return nil } func (s *dbShard) FlushState(blockStart time.Time) (fileOpState, error) { diff --git a/src/dbnode/storage/shard_test.go b/src/dbnode/storage/shard_test.go index 55e0bb46c6..0ea3546cd8 100644 --- a/src/dbnode/storage/shard_test.go +++ b/src/dbnode/storage/shard_test.go @@ -829,6 +829,7 @@ func TestShardSnapshotSeriesSnapshotSuccess(t *testing.T) { series := series.NewMockDatabaseSeries(ctrl) series.EXPECT().ID().Return(ident.StringID("foo" + strconv.Itoa(i))).AnyTimes() series.EXPECT().IsEmpty().Return(false).AnyTimes() + series.EXPECT().IsBufferEmptyAtBlockStart(blockStart).Return(false).AnyTimes() series.EXPECT(). Snapshot(gomock.Any(), blockStart, gomock.Any(), gomock.Any()). Do(func(context.Context, time.Time, persist.DataFn, namespace.Context) { diff --git a/src/dbnode/storage/storage_mock.go b/src/dbnode/storage/storage_mock.go index 864655e539..85a40a0cd5 100644 --- a/src/dbnode/storage/storage_mock.go +++ b/src/dbnode/storage/storage_mock.go @@ -2542,18 +2542,32 @@ func (m *MockdatabaseCleanupManager) EXPECT() *MockdatabaseCleanupManagerMockRec return m.recorder } -// Cleanup mocks base method -func (m *MockdatabaseCleanupManager) Cleanup(t time.Time, isBootstrapped bool) error { +// WarmFlushCleanup mocks base method +func (m *MockdatabaseCleanupManager) WarmFlushCleanup(t time.Time, isBootstrapped bool) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Cleanup", t, isBootstrapped) + ret := m.ctrl.Call(m, "WarmFlushCleanup", t, isBootstrapped) ret0, _ := ret[0].(error) return ret0 } -// Cleanup indicates an expected call of Cleanup -func (mr *MockdatabaseCleanupManagerMockRecorder) Cleanup(t, isBootstrapped interface{}) *gomock.Call { +// WarmFlushCleanup indicates an expected call of WarmFlushCleanup +func (mr *MockdatabaseCleanupManagerMockRecorder) WarmFlushCleanup(t, isBootstrapped interface{}) *gomock.Call { 
mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Cleanup", reflect.TypeOf((*MockdatabaseCleanupManager)(nil).Cleanup), t, isBootstrapped) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WarmFlushCleanup", reflect.TypeOf((*MockdatabaseCleanupManager)(nil).WarmFlushCleanup), t, isBootstrapped) +} + +// ColdFlushCleanup mocks base method +func (m *MockdatabaseCleanupManager) ColdFlushCleanup(t time.Time, isBootstrapped bool) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ColdFlushCleanup", t, isBootstrapped) + ret0, _ := ret[0].(error) + return ret0 +} + +// ColdFlushCleanup indicates an expected call of ColdFlushCleanup +func (mr *MockdatabaseCleanupManagerMockRecorder) ColdFlushCleanup(t, isBootstrapped interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ColdFlushCleanup", reflect.TypeOf((*MockdatabaseCleanupManager)(nil).ColdFlushCleanup), t, isBootstrapped) } // Report mocks base method @@ -2591,20 +2605,6 @@ func (m *MockdatabaseFileSystemManager) EXPECT() *MockdatabaseFileSystemManagerM return m.recorder } -// Cleanup mocks base method -func (m *MockdatabaseFileSystemManager) Cleanup(t time.Time, isBootstrapped bool) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Cleanup", t, isBootstrapped) - ret0, _ := ret[0].(error) - return ret0 -} - -// Cleanup indicates an expected call of Cleanup -func (mr *MockdatabaseFileSystemManagerMockRecorder) Cleanup(t, isBootstrapped interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Cleanup", reflect.TypeOf((*MockdatabaseFileSystemManager)(nil).Cleanup), t, isBootstrapped) -} - // Flush mocks base method func (m *MockdatabaseFileSystemManager) Flush(t time.Time) error { m.ctrl.T.Helper() @@ -2702,6 +2702,125 @@ func (mr *MockdatabaseFileSystemManagerMockRecorder) LastSuccessfulSnapshotStart return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, 
"LastSuccessfulSnapshotStartTime", reflect.TypeOf((*MockdatabaseFileSystemManager)(nil).LastSuccessfulSnapshotStartTime)) } +// MockdatabaseColdFlushManager is a mock of databaseColdFlushManager interface +type MockdatabaseColdFlushManager struct { + ctrl *gomock.Controller + recorder *MockdatabaseColdFlushManagerMockRecorder +} + +// MockdatabaseColdFlushManagerMockRecorder is the mock recorder for MockdatabaseColdFlushManager +type MockdatabaseColdFlushManagerMockRecorder struct { + mock *MockdatabaseColdFlushManager +} + +// NewMockdatabaseColdFlushManager creates a new mock instance +func NewMockdatabaseColdFlushManager(ctrl *gomock.Controller) *MockdatabaseColdFlushManager { + mock := &MockdatabaseColdFlushManager{ctrl: ctrl} + mock.recorder = &MockdatabaseColdFlushManagerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockdatabaseColdFlushManager) EXPECT() *MockdatabaseColdFlushManagerMockRecorder { + return m.recorder +} + +// WarmFlushCleanup mocks base method +func (m *MockdatabaseColdFlushManager) WarmFlushCleanup(t time.Time, isBootstrapped bool) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "WarmFlushCleanup", t, isBootstrapped) + ret0, _ := ret[0].(error) + return ret0 +} + +// WarmFlushCleanup indicates an expected call of WarmFlushCleanup +func (mr *MockdatabaseColdFlushManagerMockRecorder) WarmFlushCleanup(t, isBootstrapped interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WarmFlushCleanup", reflect.TypeOf((*MockdatabaseColdFlushManager)(nil).WarmFlushCleanup), t, isBootstrapped) +} + +// ColdFlushCleanup mocks base method +func (m *MockdatabaseColdFlushManager) ColdFlushCleanup(t time.Time, isBootstrapped bool) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ColdFlushCleanup", t, isBootstrapped) + ret0, _ := ret[0].(error) + return ret0 +} + +// ColdFlushCleanup indicates an expected call of 
ColdFlushCleanup +func (mr *MockdatabaseColdFlushManagerMockRecorder) ColdFlushCleanup(t, isBootstrapped interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ColdFlushCleanup", reflect.TypeOf((*MockdatabaseColdFlushManager)(nil).ColdFlushCleanup), t, isBootstrapped) +} + +// Report mocks base method +func (m *MockdatabaseColdFlushManager) Report() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Report") +} + +// Report indicates an expected call of Report +func (mr *MockdatabaseColdFlushManagerMockRecorder) Report() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Report", reflect.TypeOf((*MockdatabaseColdFlushManager)(nil).Report)) +} + +// Disable mocks base method +func (m *MockdatabaseColdFlushManager) Disable() fileOpStatus { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Disable") + ret0, _ := ret[0].(fileOpStatus) + return ret0 +} + +// Disable indicates an expected call of Disable +func (mr *MockdatabaseColdFlushManagerMockRecorder) Disable() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Disable", reflect.TypeOf((*MockdatabaseColdFlushManager)(nil).Disable)) +} + +// Enable mocks base method +func (m *MockdatabaseColdFlushManager) Enable() fileOpStatus { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Enable") + ret0, _ := ret[0].(fileOpStatus) + return ret0 +} + +// Enable indicates an expected call of Enable +func (mr *MockdatabaseColdFlushManagerMockRecorder) Enable() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Enable", reflect.TypeOf((*MockdatabaseColdFlushManager)(nil).Enable)) +} + +// Status mocks base method +func (m *MockdatabaseColdFlushManager) Status() fileOpStatus { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Status") + ret0, _ := ret[0].(fileOpStatus) + return ret0 +} + +// Status indicates an expected call of Status +func (mr 
*MockdatabaseColdFlushManagerMockRecorder) Status() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Status", reflect.TypeOf((*MockdatabaseColdFlushManager)(nil).Status)) +} + +// Run mocks base method +func (m *MockdatabaseColdFlushManager) Run(t time.Time) bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Run", t) + ret0, _ := ret[0].(bool) + return ret0 +} + +// Run indicates an expected call of Run +func (mr *MockdatabaseColdFlushManagerMockRecorder) Run(t interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Run", reflect.TypeOf((*MockdatabaseColdFlushManager)(nil).Run), t) +} + // MockdatabaseShardRepairer is a mock of databaseShardRepairer interface type MockdatabaseShardRepairer struct { ctrl *gomock.Controller @@ -2945,16 +3064,16 @@ func (mr *MockdatabaseMediatorMockRecorder) Bootstrap() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Bootstrap", reflect.TypeOf((*MockdatabaseMediator)(nil).Bootstrap)) } -// DisableFileOps mocks base method -func (m *MockdatabaseMediator) DisableFileOps() { +// DisableFileOpsAndWait mocks base method +func (m *MockdatabaseMediator) DisableFileOpsAndWait() { m.ctrl.T.Helper() - m.ctrl.Call(m, "DisableFileOps") + m.ctrl.Call(m, "DisableFileOpsAndWait") } -// DisableFileOps indicates an expected call of DisableFileOps -func (mr *MockdatabaseMediatorMockRecorder) DisableFileOps() *gomock.Call { +// DisableFileOpsAndWait indicates an expected call of DisableFileOpsAndWait +func (mr *MockdatabaseMediatorMockRecorder) DisableFileOpsAndWait() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DisableFileOps", reflect.TypeOf((*MockdatabaseMediator)(nil).DisableFileOps)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DisableFileOpsAndWait", reflect.TypeOf((*MockdatabaseMediator)(nil).DisableFileOpsAndWait)) } // EnableFileOps mocks base method @@ 
-2983,18 +3102,6 @@ func (mr *MockdatabaseMediatorMockRecorder) Tick(forceType, startTime interface{ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Tick", reflect.TypeOf((*MockdatabaseMediator)(nil).Tick), forceType, startTime) } -// WaitForFileSystemProcesses mocks base method -func (m *MockdatabaseMediator) WaitForFileSystemProcesses() { - m.ctrl.T.Helper() - m.ctrl.Call(m, "WaitForFileSystemProcesses") -} - -// WaitForFileSystemProcesses indicates an expected call of WaitForFileSystemProcesses -func (mr *MockdatabaseMediatorMockRecorder) WaitForFileSystemProcesses() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WaitForFileSystemProcesses", reflect.TypeOf((*MockdatabaseMediator)(nil).WaitForFileSystemProcesses)) -} - // Repair mocks base method func (m *MockdatabaseMediator) Repair() error { m.ctrl.T.Helper() diff --git a/src/dbnode/storage/types.go b/src/dbnode/storage/types.go index 68a77e2038..0afa5fd356 100644 --- a/src/dbnode/storage/types.go +++ b/src/dbnode/storage/types.go @@ -753,9 +753,14 @@ type databaseFlushManager interface { } // databaseCleanupManager manages cleaning up persistent storage space. +// NB(bodu): We have to separate cleanup methods since we separated out flushing into warm/cold flush +// and cleaning up certain types of data concurrently with either can be problematic. type databaseCleanupManager interface { - // Cleanup cleans up data not needed in the persistent storage. - Cleanup(t time.Time, isBootstrapped bool) error + // WarmFlushCleanup cleans up data not needed in the persistent storage before a warm flush. + WarmFlushCleanup(t time.Time, isBootstrapped bool) error + + // ColdFlushCleanup cleans up data not needed in the persistent storage before a cold flush. + ColdFlushCleanup(t time.Time, isBootstrapped bool) error // Report reports runtime information. 
Report() @@ -763,9 +768,6 @@ type databaseCleanupManager interface { // databaseFileSystemManager manages the database related filesystem activities. type databaseFileSystemManager interface { - // Cleanup cleans up data not needed in the persistent storage. - Cleanup(t time.Time, isBootstrapped bool) error - // Flush flushes in-memory data to persistent storage. Flush(t time.Time) error @@ -795,6 +797,25 @@ type databaseFileSystemManager interface { LastSuccessfulSnapshotStartTime() (time.Time, bool) } +// databaseColdFlushManager manages the database related cold flush activities. +type databaseColdFlushManager interface { + databaseCleanupManager + + // Disable disables the cold flush manager and prevents it from + // performing file operations, returns the current file operation status. + Disable() fileOpStatus + + // Enable enables the cold flush manager to perform file operations. + Enable() fileOpStatus + + // Status returns the file operation status. + Status() fileOpStatus + + // Run attempts to perform all cold flush related operations, + // returning true if those operations are performed, and false otherwise. + Run(t time.Time) bool +} + // databaseShardRepairer repairs in-memory data for a shard. type databaseShardRepairer interface { // Options returns the repair options. @@ -848,8 +869,8 @@ type databaseMediator interface { // Bootstrap bootstraps the database with file operations performed at the end. Bootstrap() (BootstrapResult, error) - // DisableFileOps disables file operations. - DisableFileOps() + // DisableFileOpsAndWait disables file operations and waits for any ongoing fs processes to finish. + DisableFileOpsAndWait() // EnableFileOps enables file operations. EnableFileOps() @@ -857,9 +878,6 @@ type databaseMediator interface { // Tick performs a tick. Tick(forceType forceType, startTime time.Time) error - // WaitForFileSystemProcesses waits for any ongoing fs processes to finish. - WaitForFileSystemProcesses() - // Repair repairs the database. Repair() error