From ad38db02bed3a5dc164b0dcfa20dcf7ef1dca31c Mon Sep 17 00:00:00 2001
From: Bo Du
Date: Mon, 27 Jul 2020 21:08:59 -0400
Subject: [PATCH] Address PR comments.

---
 .../bootstrapper/commitlog/source.go |  8 ++--
 src/dbnode/storage/cleanup.go        | 32 +++++++++------
 src/dbnode/storage/coldflush.go      | 39 ++++++++++---------
 src/dbnode/storage/fs.go             |  1 +
 src/dbnode/storage/series/series.go  |  5 +--
 src/dbnode/storage/shard.go          | 39 +++++++++++--------
 6 files changed, 70 insertions(+), 54 deletions(-)

diff --git a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go
index fe51845bbc..3ed656c2b7 100644
--- a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go
+++ b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go
@@ -678,13 +678,13 @@ func (s *commitLogSource) bootstrapShardSnapshots(
 	mostRecentCompleteSnapshotByBlockShard map[xtime.UnixNano]map[uint32]fs.FileSetFile,
 ) error {
 	// NB(bodu): We use info files on disk to check if a snapshot should be loaded in as cold or warm.
-	// We do this instead of cross refing blockstarts and current time to handle the case of bootsrapping a
+	// We do this instead of cross refing blockstarts and current time to handle the case of bootstrapping a
 	// once warm block start after a node has been shut down for a long time. We consider all block starts we
 	// haven't flushed data for yet a warm block start.
 	fsOpts := s.opts.CommitLogOptions().FilesystemOptions()
 	readInfoFilesResults := fs.ReadInfoFiles(fsOpts.FilePathPrefix(), ns.ID(), shard,
 		fsOpts.InfoReaderBufferSize(), fsOpts.DecodingOptions())
-	shardBlockStartsOnDisk := make(map[time.Time]struct{})
+	shardBlockStartsOnDisk := make(map[xtime.UnixNano]struct{})
 	for _, result := range readInfoFilesResults {
 		if err := result.Err.Error(); err != nil {
 			// If we couldn't read the info files then keep going to be consistent
@@ -698,7 +698,7 @@ func (s *commitLogSource) bootstrapShardSnapshots(
 		}
 		info := result.Info
 		at := xtime.FromNanoseconds(info.BlockStart)
-		shardBlockStartsOnDisk[at] = struct{}{}
+		shardBlockStartsOnDisk[xtime.ToUnixNano(at)] = struct{}{}
 	}
 
 	rangeIter := shardTimeRanges.Iter()
@@ -734,7 +734,7 @@ func (s *commitLogSource) bootstrapShardSnapshots(
 		}
 
 		writeType := series.WarmWrite
-		if _, ok := shardBlockStartsOnDisk[blockStart]; ok {
+		if _, ok := shardBlockStartsOnDisk[xtime.ToUnixNano(blockStart)]; ok {
 			writeType = series.ColdWrite
 		}
 		if err := s.bootstrapShardBlockSnapshot(
diff --git a/src/dbnode/storage/cleanup.go b/src/dbnode/storage/cleanup.go
index 1fcf901833..d91db51329 100644
--- a/src/dbnode/storage/cleanup.go
+++ b/src/dbnode/storage/cleanup.go
@@ -70,13 +70,15 @@ type cleanupManager struct {
 
 	deleteFilesFn               deleteFilesFn
 	deleteInactiveDirectoriesFn deleteInactiveDirectoriesFn
-	cleanupInProgress           bool
+	warmFlushCleanupInProgress  bool
+	coldFlushCleanupInProgress  bool
 	metrics                     cleanupManagerMetrics
 	logger                      *zap.Logger
 }
 
 type cleanupManagerMetrics struct {
-	status                      tally.Gauge
+	warmFlushCleanupStatus      tally.Gauge
+	coldFlushCleanupStatus      tally.Gauge
 	corruptCommitlogFile        tally.Counter
 	corruptSnapshotFile         tally.Counter
 	corruptSnapshotMetadataFile tally.Counter
@@ -90,7 +92,8 @@ func newCleanupManagerMetrics(scope tally.Scope) cleanupManagerMetrics {
 	sScope := scope.SubScope("snapshot")
 	smScope := scope.SubScope("snapshot-metadata")
 	return cleanupManagerMetrics{
-		status:                      scope.Gauge("cleanup"),
+		warmFlushCleanupStatus:      scope.Gauge("warm-flush-cleanup"),
+		coldFlushCleanupStatus:      scope.Gauge("cold-flush-cleanup"),
 		corruptCommitlogFile:        clScope.Counter("corrupt"),
 		corruptSnapshotFile:         sScope.Counter("corrupt"),
 		corruptSnapshotMetadataFile: smScope.Counter("corrupt"),
@@ -132,12 +135,12 @@ func (m *cleanupManager) WarmFlushCleanup(t time.Time, isBootstrapped bool) erro
 	}
 
 	m.Lock()
-	m.cleanupInProgress = true
+	m.warmFlushCleanupInProgress = true
 	m.Unlock()
 
 	defer func() {
 		m.Lock()
-		m.cleanupInProgress = false
+		m.warmFlushCleanupInProgress = false
 		m.Unlock()
 	}()
 
@@ -183,12 +186,12 @@ func (m *cleanupManager) ColdFlushCleanup(t time.Time, isBootstrapped bool) erro
 	}
 
 	m.Lock()
-	m.cleanupInProgress = true
+	m.coldFlushCleanupInProgress = true
 	m.Unlock()
 
 	defer func() {
 		m.Lock()
-		m.cleanupInProgress = false
+		m.coldFlushCleanupInProgress = false
 		m.Unlock()
 	}()
 
@@ -212,13 +215,20 @@ func (m *cleanupManager) ColdFlushCleanup(t time.Time, isBootstrapped bool) erro
 }
 
 func (m *cleanupManager) Report() {
 	m.RLock()
-	cleanupInProgress := m.cleanupInProgress
+	coldFlushCleanupInProgress := m.coldFlushCleanupInProgress
+	warmFlushCleanupInProgress := m.warmFlushCleanupInProgress
 	m.RUnlock()
 
-	if cleanupInProgress {
-		m.metrics.status.Update(1)
+	if coldFlushCleanupInProgress {
+		m.metrics.coldFlushCleanupStatus.Update(1)
 	} else {
-		m.metrics.status.Update(0)
+		m.metrics.coldFlushCleanupStatus.Update(0)
+	}
+
+	if warmFlushCleanupInProgress {
+		m.metrics.warmFlushCleanupStatus.Update(1)
+	} else {
+		m.metrics.warmFlushCleanupStatus.Update(0)
 	}
 }
diff --git a/src/dbnode/storage/coldflush.go b/src/dbnode/storage/coldflush.go
index 9d48b987e9..17913c4530 100644
--- a/src/dbnode/storage/coldflush.go
+++ b/src/dbnode/storage/coldflush.go
@@ -109,13 +109,13 @@ func (m *coldFlushManager) Run(t time.Time) bool {
 
 	if err := m.ColdFlushCleanup(t, m.database.IsBootstrapped()); err != nil {
 		instrument.EmitAndLogInvariantViolation(m.opts.InstrumentOptions(), func(l *zap.Logger) {
-			l.Error("error when cleaning up data", zap.Time("time", t), zap.Error(err))
+			l.Error("error when cleaning up cold flush data", zap.Time("time", t), zap.Error(err))
 		})
 	}
 
-	if err := m.coldFlush(); err != nil {
+	if err := m.trackedColdFlush(); err != nil {
 		instrument.EmitAndLogInvariantViolation(m.opts.InstrumentOptions(), func(l *zap.Logger) {
-			l.Error("error when flushing data", zap.Time("time", t), zap.Error(err))
+			l.Error("error when cold flushing data", zap.Time("time", t), zap.Error(err))
 		})
 	}
 	m.Lock()
@@ -124,12 +124,7 @@ func (m *coldFlushManager) Run(t time.Time) bool {
 	return true
 }
 
-func (m *coldFlushManager) coldFlush() error {
-	namespaces, err := m.database.OwnedNamespaces()
-	if err != nil {
-		return err
-	}
-
+func (m *coldFlushManager) trackedColdFlush() error {
 	// The cold flush process will persist any data that has been "loaded" into memory via
 	// the Load() API but has not yet been persisted durably. As a result, if the cold flush
 	// process completes without error, then we want to "decrement" the number of tracked bytes
@@ -153,16 +148,22 @@ func (m *coldFlushManager) coldFlush() error {
 	memTracker := m.opts.MemoryTracker()
 	memTracker.MarkLoadedAsPending()
 
-	defer func() {
-		if err == nil {
-			// Only decrement if the cold flush was a success. In this case, the decrement will reduce the
-			// value by however many bytes had been tracked when the cold flush began.
-			memTracker.DecPendingLoadedBytes()
-		} else {
-			m.log.Error("data cold flush failed",
-				zap.Error(err))
-		}
-	}()
+	if err := m.coldFlush(); err != nil {
+		return err
+	}
+
+	// Only decrement if the cold flush was a success. In this case, the decrement will reduce the
+	// value by however many bytes had been tracked when the cold flush began.
+	memTracker.DecPendingLoadedBytes()
+	return nil
+}
+
+func (m *coldFlushManager) coldFlush() error {
+	namespaces, err := m.database.OwnedNamespaces()
+	if err != nil {
+		return err
+	}
+
 	flushPersist, err := m.pm.StartFlushPersist()
 	if err != nil {
 		return err
diff --git a/src/dbnode/storage/fs.go b/src/dbnode/storage/fs.go
index 1aa60af219..0fd7509982 100644
--- a/src/dbnode/storage/fs.go
+++ b/src/dbnode/storage/fs.go
@@ -184,6 +184,7 @@ func (m *fileSystemManager) Run(
 }
 
 func (m *fileSystemManager) Report() {
+	m.databaseCleanupManager.Report()
 	m.databaseFlushManager.Report()
 }
 
diff --git a/src/dbnode/storage/series/series.go b/src/dbnode/storage/series/series.go
index 74fca57477..23b8ecaa0a 100644
--- a/src/dbnode/storage/series/series.go
+++ b/src/dbnode/storage/series/series.go
@@ -301,10 +301,7 @@ func (s *dbSeries) IsBufferEmptyAtBlockStart(blockStart time.Time) bool {
 	s.RLock()
 	bufferEmpty := s.buffer.IsEmptyAtBlockStart(blockStart)
 	s.RUnlock()
-	if bufferEmpty {
-		return true
-	}
-	return false
+	return bufferEmpty
 }
 
 func (s *dbSeries) NumActiveBlocks() int {
diff --git a/src/dbnode/storage/shard.go b/src/dbnode/storage/shard.go
index f55f04e0b0..2073428228 100644
--- a/src/dbnode/storage/shard.go
+++ b/src/dbnode/storage/shard.go
@@ -188,9 +188,8 @@ type dbShard struct {
 	shard                     uint32
 	coldWritesEnabled         bool
 	// NB(bodu): Cache state on whether we snapshotted last or not to avoid
-	// going to disk to see if filesets are empty. We don't need to lock on this
-	// because snapshots happen sequentially.
-	emptySnapshotOnDiskByTime map[time.Time]bool
+	// going to disk to see if filesets are empty.
+	emptySnapshotOnDiskByTime map[xtime.UnixNano]bool
 }
 
 // NB(r): dbShardRuntimeOptions does not contain its own
@@ -302,7 +301,7 @@ func newDatabaseShard(
 		flushState:                newShardFlushState(),
 		tickWg:                    &sync.WaitGroup{},
 		coldWritesEnabled:         namespaceMetadata.Options().ColdWritesEnabled(),
-		emptySnapshotOnDiskByTime: make(map[time.Time]bool),
+		emptySnapshotOnDiskByTime: make(map[xtime.UnixNano]bool),
 		logger:                    opts.InstrumentOptions().Logger(),
 		metrics:                   newDatabaseShardMetrics(shard, scope),
 	}
@@ -2375,13 +2374,13 @@ func (s *dbShard) Snapshot(
 		s.RUnlock()
 		return errShardNotBootstrappedToSnapshot
 	}
+
+	// NB(bodu): This always defaults to false if the record does not exist.
+	emptySnapshotOnDisk := s.emptySnapshotOnDiskByTime[xtime.ToUnixNano(blockStart)]
+
 	s.RUnlock()
 
-	var (
-		needsSnapshot bool
-		// NB(bodu): This always defaults to false if the record does not exist.
-		emptySnapshotOnDisk = s.emptySnapshotOnDiskByTime[blockStart]
-	)
+	var needsSnapshot bool
 	s.forEachShardEntry(func(entry *lookup.Entry) bool {
 		if !entry.Series.IsBufferEmptyAtBlockStart(blockStart) {
 			needsSnapshot = true
@@ -2395,13 +2394,6 @@ func (s *dbShard) Snapshot(
 	if !needsSnapshot && emptySnapshotOnDisk {
 		return nil
 	}
-	// We're proceeding w/ the snaphot at this point but we know it will be empty or contain data that is recoverable
-	// from commit logs since we would have rotated commit logs before checking for flushable data.
-	if needsSnapshot {
-		s.emptySnapshotOnDiskByTime[blockStart] = false
-	} else {
-		s.emptySnapshotOnDiskByTime[blockStart] = true
-	}
 
 	var multiErr xerrors.MultiError
 
@@ -2449,6 +2441,21 @@ func (s *dbShard) Snapshot(
 		multiErr = multiErr.Add(err)
 	}
 
+	if multiErr.FinalError() == nil {
+		// Only update cached snapshot state if we successfully flushed data to disk.
+		s.Lock()
+		if needsSnapshot {
+			s.emptySnapshotOnDiskByTime[xtime.ToUnixNano(blockStart)] = false
+		} else {
+			// NB(bodu): If we flushed an empty snapshot to disk, it means that the previous
+			// snapshot on disk was not empty (or we just bootstrapped and cached state was lost).
+			// The snapshot we just flushed may or may not have data, although whatever data we flushed
+			// would be recoverable from the rotated commit log as well.
+			s.emptySnapshotOnDiskByTime[xtime.ToUnixNano(blockStart)] = true
+		}
+		s.Unlock()
+	}
+
 	return multiErr.FinalError()
 }
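
Note on the map key changes above (time.Time -> xtime.UnixNano): the standard library
documents that == on time.Time compares not just the instant but also the monotonic
clock reading and the Location pointer, so time.Time values are risky as map keys;
an integer nanosecond key such as xtime.UnixNano (an int64-based type in m3) makes
lookups depend only on the instant. A minimal, standalone sketch of the pitfall,
using only the standard library (names are illustrative, not from the m3 codebase):

package main

import (
	"fmt"
	"time"
)

func main() {
	// time.Now() carries a monotonic clock reading; Round(0) strips it but
	// leaves the wall-clock instant unchanged.
	now := time.Now()
	stripped := now.Round(0)

	fmt.Println(now.Equal(stripped)) // true: same instant

	// Map lookups use ==, which also compares the monotonic reading and the
	// Location pointer, so this lookup misses even though the instants match.
	byTime := map[time.Time]bool{now: true}
	fmt.Println(byTime[stripped]) // false

	// Keying by the integer nanosecond timestamp makes equality depend only
	// on the instant.
	byNano := map[int64]bool{now.UnixNano(): true}
	fmt.Println(byNano[stripped.UnixNano()]) // true
}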