[dbnode] Background cold flush process (#2460)
notbdu authored Jul 30, 2020
1 parent 6435114 commit 3ac68a2
Showing 28 changed files with 957 additions and 328 deletions.
13 changes: 10 additions & 3 deletions src/dbnode/integration/disk_flush_helpers.go
@@ -40,6 +40,7 @@ import (
     "github.com/m3db/m3/src/x/ident/testutil"
     xtime "github.com/m3db/m3/src/x/time"
 
+    "github.com/pborman/uuid"
     "github.com/stretchr/testify/require"
 )

@@ -84,7 +85,8 @@ func waitUntilSnapshotFilesFlushed(
     namespace ident.ID,
     expectedSnapshots []snapshotID,
     timeout time.Duration,
-) error {
+) (uuid.UUID, error) {
+    var snapshotID uuid.UUID
     dataFlushed := func() bool {
         for _, shard := range shardSet.AllIDs() {
             for _, e := range expectedSnapshots {
@@ -102,14 +104,19 @@ func waitUntilSnapshotFilesFlushed(
                 if !(latest.ID.VolumeIndex >= e.minVolume) {
                     return false
                 }
+
+                _, snapshotID, err = latest.SnapshotTimeAndID()
+                if err != nil {
+                    panic(err)
+                }
             }
         }
         return true
     }
     if waitUntil(dataFlushed, timeout) {
-        return nil
+        return snapshotID, nil
     }
-    return errDiskFlushTimedOut
+    return snapshotID, errDiskFlushTimedOut
 }
 
 func waitUntilDataFilesFlushed(
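Note: callers of waitUntilSnapshotFilesFlushed now receive the UUID of the latest snapshot volume rather than just an error. A minimal sketch of the new call pattern, assuming the integration-test names from this diff (filePathPrefix, shardSet, ns, expectedSnapshots, maxWaitTime) are in scope:

    snapshotID, err := waitUntilSnapshotFilesFlushed(
        filePathPrefix, shardSet, ns.ID(), expectedSnapshots, maxWaitTime)
    require.NoError(t, err)
    // The returned UUID pins later existence checks to this exact snapshot
    // volume, not just a block start:
    exists, err := fs.SnapshotFileSetExistsAt(
        filePathPrefix, ns.ID(), snapshotID, shard.ID(), blockStart)
    require.NoError(t, err)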
12 changes: 7 additions & 5 deletions src/dbnode/integration/disk_snapshot_test.go
@@ -152,8 +152,8 @@ func TestDiskSnapshotSimple(t *testing.T) {
     maxWaitTime := time.Minute
     for i, ns := range testSetup.Namespaces() {
         log.Info("waiting for snapshot files to flush")
-        require.NoError(t, waitUntilSnapshotFilesFlushed(
-            filePathPrefix, shardSet, ns.ID(), snapshotsToWaitForByNS[i], maxWaitTime))
+        _, err := waitUntilSnapshotFilesFlushed(filePathPrefix, shardSet, ns.ID(), snapshotsToWaitForByNS[i], maxWaitTime)
+        require.NoError(t, err)
         log.Info("verifying snapshot files")
         verifySnapshottedDataFiles(t, shardSet, testSetup.StorageOpts(), ns.ID(), seriesMaps)
     }
@@ -167,15 +167,17 @@ func TestDiskSnapshotSimple(t *testing.T) {
     for _, ns := range testSetup.Namespaces() {
         log.Info("waiting for new snapshot files to be written out")
         snapshotsToWaitFor := []snapshotID{{blockStart: newTime.Truncate(blockSize)}}
-        require.NoError(t, waitUntilSnapshotFilesFlushed(
-            filePathPrefix, shardSet, ns.ID(), snapshotsToWaitFor, maxWaitTime))
+        // NB(bodu): We need to check that this specific snapshot ID was deleted, since the snapshotting
+        // logic has changed to always snapshot every block start within retention.
+        snapshotID, err := waitUntilSnapshotFilesFlushed(filePathPrefix, shardSet, ns.ID(), snapshotsToWaitFor, maxWaitTime)
+        require.NoError(t, err)
         log.Info("waiting for old snapshot files to be deleted")
         for _, shard := range shardSet.All() {
             waitUntil(func() bool {
                 // Increase the time on each check to ensure that the filesystem processes are able to progress (some
                 // of them throttle themselves based on time elapsed since the previous run.)
                 testSetup.SetNowFn(testSetup.NowFn()().Add(10 * time.Second))
-                exists, err := fs.SnapshotFileSetExistsAt(filePathPrefix, ns.ID(), shard.ID(), oldTime.Truncate(blockSize))
+                exists, err := fs.SnapshotFileSetExistsAt(filePathPrefix, ns.ID(), snapshotID, shard.ID(), oldTime.Truncate(blockSize))
                 require.NoError(t, err)
                 return !exists
             }, maxWaitTime)
@@ -29,8 +29,8 @@ import (
     "testing"
     "time"
 
-    "github.com/m3db/m3/src/dbnode/retention"
     "github.com/m3db/m3/src/dbnode/namespace"
+    "github.com/m3db/m3/src/dbnode/retention"
     "github.com/m3db/m3/src/x/context"
     xtime "github.com/m3db/m3/src/x/time"
     "go.uber.org/zap"
@@ -228,7 +228,7 @@ func TestFsCommitLogMixedModeReadWriteProp(t *testing.T) {
             } else {
                 snapshotBlock = now.Truncate(ns1BlockSize).Add(-ns1BlockSize)
             }
-            err := waitUntilSnapshotFilesFlushed(
+            _, err := waitUntilSnapshotFilesFlushed(
                 filePathPrefix,
                 setup.ShardSet(),
                 nsID,
19 changes: 17 additions & 2 deletions src/dbnode/persist/fs/files.go
@@ -1441,17 +1441,32 @@ func DataFileSetExists(
 }
 
 // SnapshotFileSetExistsAt determines whether snapshot fileset files exist for the given namespace, shard, and block start time.
-func SnapshotFileSetExistsAt(prefix string, namespace ident.ID, shard uint32, blockStart time.Time) (bool, error) {
+func SnapshotFileSetExistsAt(
+    prefix string,
+    namespace ident.ID,
+    snapshotID uuid.UUID,
+    shard uint32,
+    blockStart time.Time,
+) (bool, error) {
     snapshotFiles, err := SnapshotFiles(prefix, namespace, shard)
     if err != nil {
         return false, err
     }
 
-    _, ok := snapshotFiles.LatestVolumeForBlock(blockStart)
+    latest, ok := snapshotFiles.LatestVolumeForBlock(blockStart)
     if !ok {
         return false, nil
     }
 
+    _, latestSnapshotID, err := latest.SnapshotTimeAndID()
+    if err != nil {
+        return false, err
+    }
+
+    if !uuid.Equal(latestSnapshotID, snapshotID) {
+        return false, nil
+    }
+
     // LatestVolumeForBlock checks for a complete checkpoint file, so we don't
     // need to recheck it here.
     return true, nil
2 changes: 1 addition & 1 deletion src/dbnode/persist/fs/files_test.go
@@ -889,7 +889,7 @@ func TestSnapshotFileSetExistsAt(t *testing.T) {
 
     writeOutTestSnapshot(t, dir, shard, ts, 0)
 
-    exists, err := SnapshotFileSetExistsAt(dir, testNs1ID, shard, ts)
+    exists, err := SnapshotFileSetExistsAt(dir, testNs1ID, testSnapshotID, shard, ts)
     require.NoError(t, err)
     require.True(t, exists)
 }
2 changes: 1 addition & 1 deletion src/dbnode/storage/bootstrap.go
@@ -141,7 +141,7 @@ func (m *bootstrapManager) Bootstrap() (BootstrapResult, error) {
 
     // NB(xichen): disable filesystem manager before we bootstrap to minimize
     // the impact of file operations on bootstrapping performance
-    m.mediator.DisableFileOps()
+    m.mediator.DisableFileOpsAndWait()
     defer m.mediator.EnableFileOps()
 
     // Keep performing bootstraps until none pending and no error returned.
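The rename from DisableFileOps to DisableFileOpsAndWait signals that bootstrap now blocks until any in-flight file operation (including a background cold flush) has drained before proceeding. The mediator implementation is not part of this excerpt; below is a minimal, self-contained sketch of that semantic, with all names (fileOpsStatus, disable, status, sleep) hypothetical:

    type fileOpsStatus int

    const (
        fileOpsIdle fileOpsStatus = iota
        fileOpsInProgress
    )

    // disableFileOpsAndWait flips file operations off, then polls until any
    // run already in flight finishes, so bootstrap never overlaps a flush.
    func disableFileOpsAndWait(
        disable func() fileOpsStatus,
        status func() fileOpsStatus,
        sleep func(time.Duration),
    ) {
        st := disable()
        for st == fileOpsInProgress {
            sleep(100 * time.Millisecond)
            st = status()
        }
    }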
33 changes: 31 additions & 2 deletions src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go
@@ -677,6 +677,30 @@ func (s *commitLogSource) bootstrapShardSnapshots(
     blockSize time.Duration,
     mostRecentCompleteSnapshotByBlockShard map[xtime.UnixNano]map[uint32]fs.FileSetFile,
 ) error {
+    // NB(bodu): We use info files on disk to check whether a snapshot should be loaded in as cold or warm.
+    // We do this instead of cross-referencing block starts against the current time, to handle the case of
+    // bootstrapping a once-warm block start after a node has been shut down for a long time. We consider
+    // any block start we haven't flushed data for yet to be a warm block start.
+    fsOpts := s.opts.CommitLogOptions().FilesystemOptions()
+    readInfoFilesResults := fs.ReadInfoFiles(fsOpts.FilePathPrefix(), ns.ID(), shard,
+        fsOpts.InfoReaderBufferSize(), fsOpts.DecodingOptions())
+    shardBlockStartsOnDisk := make(map[xtime.UnixNano]struct{})
+    for _, result := range readInfoFilesResults {
+        if err := result.Err.Error(); err != nil {
+            // If we couldn't read the info files then keep going, to be consistent
+            // with the way the db shard updates its flush states in UpdateFlushStates().
+            s.log.Error("unable to read info files in commit log bootstrap",
+                zap.Uint32("shard", shard),
+                zap.Stringer("namespace", ns.ID()),
+                zap.String("filepath", result.Err.Filepath()),
+                zap.Error(err))
+            continue
+        }
+        info := result.Info
+        at := xtime.FromNanoseconds(info.BlockStart)
+        shardBlockStartsOnDisk[xtime.ToUnixNano(at)] = struct{}{}
+    }
+
     rangeIter := shardTimeRanges.Iter()
     for rangeIter.Next() {
         var (
@@ -709,9 +733,13 @@ func (s *commitLogSource) bootstrapShardSnapshots(
                 continue
             }
 
+            writeType := series.WarmWrite
+            if _, ok := shardBlockStartsOnDisk[xtime.ToUnixNano(blockStart)]; ok {
+                writeType = series.ColdWrite
+            }
             if err := s.bootstrapShardBlockSnapshot(
                 ns, accumulator, shard, blockStart, blockSize,
-                mostRecentCompleteSnapshotForShardBlock); err != nil {
+                mostRecentCompleteSnapshotForShardBlock, writeType); err != nil {
                 return err
             }
         }
@@ -727,6 +755,7 @@ func (s *commitLogSource) bootstrapShardBlockSnapshot(
     blockStart time.Time,
     blockSize time.Duration,
     mostRecentCompleteSnapshot fs.FileSetFile,
+    writeType series.WriteType,
 ) error {
     var (
         bOpts = s.opts.ResultOptions()
@@ -806,7 +835,7 @@ func (s *commitLogSource) bootstrapShardBlockSnapshot(
     }
 
     // Load into series.
-    if err := ref.Series.LoadBlock(dbBlock, series.WarmWrite); err != nil {
+    if err := ref.Series.LoadBlock(dbBlock, writeType); err != nil {
         return err
     }
 
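The cold/warm decision above reduces to one rule: a block start that already has a flushed info file on disk is loaded as a cold write, anything else as a warm write. A sketch of that rule factored into a helper (writeTypeFor is illustrative, not part of this diff):

    // writeTypeFor classifies a bootstrapped snapshot block. The set is the
    // shardBlockStartsOnDisk map built from fs.ReadInfoFiles above.
    func writeTypeFor(
        shardBlockStartsOnDisk map[xtime.UnixNano]struct{},
        blockStart time.Time,
    ) series.WriteType {
        if _, ok := shardBlockStartsOnDisk[xtime.ToUnixNano(blockStart)]; ok {
            // Data for this block start was already flushed; snapshot data
            // on top of it must be applied as a cold write.
            return series.ColdWrite
        }
        // Nothing flushed yet: the block start is still warm.
        return series.WarmWrite
    }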
6 changes: 3 additions & 3 deletions src/dbnode/storage/bootstrap_test.go
@@ -58,7 +58,7 @@ func TestDatabaseBootstrapWithBootstrapError(t *testing.T) {
     db.EXPECT().OwnedNamespaces().Return(namespaces, nil)
 
     m := NewMockdatabaseMediator(ctrl)
-    m.EXPECT().DisableFileOps()
+    m.EXPECT().DisableFileOpsAndWait()
     m.EXPECT().EnableFileOps().AnyTimes()
 
     bsm := newBootstrapManager(db, m, opts).(*bootstrapManager)
@@ -101,7 +101,7 @@ func TestDatabaseBootstrapSubsequentCallsQueued(t *testing.T) {
     }))
 
     m := NewMockdatabaseMediator(ctrl)
-    m.EXPECT().DisableFileOps()
+    m.EXPECT().DisableFileOpsAndWait()
     m.EXPECT().EnableFileOps().AnyTimes()
 
     db := NewMockdatabase(ctrl)
@@ -159,7 +159,7 @@ func TestDatabaseBootstrapBootstrapHooks(t *testing.T) {
     }))
 
     m := NewMockdatabaseMediator(ctrl)
-    m.EXPECT().DisableFileOps()
+    m.EXPECT().DisableFileOpsAndWait()
     m.EXPECT().EnableFileOps().AnyTimes()
 
     db := NewMockdatabase(ctrl)
75 changes: 55 additions & 20 deletions src/dbnode/storage/cleanup.go
@@ -70,13 +70,15 @@ type cleanupManager struct {
 
     deleteFilesFn               deleteFilesFn
     deleteInactiveDirectoriesFn deleteInactiveDirectoriesFn
-    cleanupInProgress           bool
+    warmFlushCleanupInProgress  bool
+    coldFlushCleanupInProgress  bool
     metrics                     cleanupManagerMetrics
     logger                      *zap.Logger
 }
 
 type cleanupManagerMetrics struct {
-    status                      tally.Gauge
+    warmFlushCleanupStatus      tally.Gauge
+    coldFlushCleanupStatus      tally.Gauge
     corruptCommitlogFile        tally.Counter
     corruptSnapshotFile         tally.Counter
     corruptSnapshotMetadataFile tally.Counter
@@ -90,7 +92,8 @@ func newCleanupManagerMetrics(scope tally.Scope) cleanupManagerMetrics {
     sScope := scope.SubScope("snapshot")
     smScope := scope.SubScope("snapshot-metadata")
     return cleanupManagerMetrics{
-        status:                      scope.Gauge("cleanup"),
+        warmFlushCleanupStatus:      scope.Gauge("warm-flush-cleanup"),
+        coldFlushCleanupStatus:      scope.Gauge("cold-flush-cleanup"),
         corruptCommitlogFile:        clScope.Counter("corrupt"),
         corruptSnapshotFile:         sScope.Counter("corrupt"),
         corruptSnapshotMetadataFile: smScope.Counter("corrupt"),
@@ -124,20 +127,20 @@ func newCleanupManager(
     }
 }
 
-func (m *cleanupManager) Cleanup(t time.Time, isBootstrapped bool) error {
+func (m *cleanupManager) WarmFlushCleanup(t time.Time, isBootstrapped bool) error {
     // Don't perform any cleanup if we are not bootstrapped yet.
     if !isBootstrapped {
         m.logger.Debug("database is still bootstrapping, terminating cleanup")
         return nil
     }
 
     m.Lock()
-    m.cleanupInProgress = true
+    m.warmFlushCleanupInProgress = true
     m.Unlock()
 
     defer func() {
         m.Lock()
-        m.cleanupInProgress = false
+        m.warmFlushCleanupInProgress = false
         m.Unlock()
     }()
 
@@ -147,11 +150,6 @@ func (m *cleanupManager) Cleanup(t time.Time, isBootstrapped bool) error {
     }
 
     multiErr := xerrors.NewMultiError()
-    if err := m.cleanupDataFiles(t, namespaces); err != nil {
-        multiErr = multiErr.Add(fmt.Errorf(
-            "encountered errors when cleaning up data files for %v: %v", t, err))
-    }
-
     if err := m.cleanupExpiredIndexFiles(t, namespaces); err != nil {
         multiErr = multiErr.Add(fmt.Errorf(
             "encountered errors when cleaning up index files for %v: %v", t, err))
@@ -162,11 +160,6 @@ func (m *cleanupManager) Cleanup(t time.Time, isBootstrapped bool) error {
             "encountered errors when cleaning up index files for %v: %v", t, err))
     }
 
-    if err := m.deleteInactiveDataFiles(namespaces); err != nil {
-        multiErr = multiErr.Add(fmt.Errorf(
-            "encountered errors when deleting inactive data files for %v: %v", t, err))
-    }
-
     if err := m.deleteInactiveDataSnapshotFiles(namespaces); err != nil {
         multiErr = multiErr.Add(fmt.Errorf(
             "encountered errors when deleting inactive snapshot files for %v: %v", t, err))
@@ -185,15 +178,57 @@ func (m *cleanupManager) Cleanup(t time.Time, isBootstrapped bool) error {
     return multiErr.FinalError()
 }
 
+func (m *cleanupManager) ColdFlushCleanup(t time.Time, isBootstrapped bool) error {
+    // Don't perform any cleanup if we are not bootstrapped yet.
+    if !isBootstrapped {
+        m.logger.Debug("database is still bootstrapping, terminating cleanup")
+        return nil
+    }
+
+    m.Lock()
+    m.coldFlushCleanupInProgress = true
+    m.Unlock()
+
+    defer func() {
+        m.Lock()
+        m.coldFlushCleanupInProgress = false
+        m.Unlock()
+    }()
+
+    namespaces, err := m.database.OwnedNamespaces()
+    if err != nil {
+        return err
+    }
+
+    multiErr := xerrors.NewMultiError()
+    if err := m.cleanupDataFiles(t, namespaces); err != nil {
+        multiErr = multiErr.Add(fmt.Errorf(
+            "encountered errors when cleaning up data files for %v: %v", t, err))
+    }
+
+    if err := m.deleteInactiveDataFiles(namespaces); err != nil {
+        multiErr = multiErr.Add(fmt.Errorf(
+            "encountered errors when deleting inactive data files for %v: %v", t, err))
+    }
+
+    return multiErr.FinalError()
+}
+
 func (m *cleanupManager) Report() {
     m.RLock()
-    cleanupInProgress := m.cleanupInProgress
+    coldFlushCleanupInProgress := m.coldFlushCleanupInProgress
+    warmFlushCleanupInProgress := m.warmFlushCleanupInProgress
     m.RUnlock()
 
-    if cleanupInProgress {
-        m.metrics.status.Update(1)
+    if coldFlushCleanupInProgress {
+        m.metrics.coldFlushCleanupStatus.Update(1)
+    } else {
+        m.metrics.coldFlushCleanupStatus.Update(0)
+    }
+
+    if warmFlushCleanupInProgress {
+        m.metrics.warmFlushCleanupStatus.Update(1)
     } else {
-        m.metrics.status.Update(0)
+        m.metrics.warmFlushCleanupStatus.Update(0)
     }
 }

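The net effect of this file's changes: index, snapshot, and commitlog housekeeping stays on the warm flush path, while data fileset cleanup moves to the cold flush path, which now owns data fileset volumes. A sketch of how a caller might drive the two halves (runCleanups and the databaseCleanupManager interface name are assumptions; only WarmFlushCleanup and ColdFlushCleanup come from this diff):

    // runCleanups invokes each cleanup half in turn. In the database itself,
    // the warm flush manager and the background cold flush process call these
    // independently, each guarded by its own in-progress flag and gauge.
    func runCleanups(mgr databaseCleanupManager, t time.Time, bootstrapped bool) error {
        if err := mgr.WarmFlushCleanup(t, bootstrapped); err != nil {
            return err
        }
        return mgr.ColdFlushCleanup(t, bootstrapped)
    }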