Address PR comments.
notbdu committed Jul 28, 2020
1 parent 04ecccb commit ad38db0
Showing 6 changed files with 70 additions and 54 deletions.
8 changes: 4 additions & 4 deletions src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go
@@ -678,13 +678,13 @@ func (s *commitLogSource) bootstrapShardSnapshots(
mostRecentCompleteSnapshotByBlockShard map[xtime.UnixNano]map[uint32]fs.FileSetFile,
) error {
// NB(bodu): We use info files on disk to check if a snapshot should be loaded in as cold or warm.
- // We do this instead of cross refing blockstarts and current time to handle the case of bootsrapping a
+ // We do this instead of cross refing blockstarts and current time to handle the case of bootstrapping a
// once warm block start after a node has been shut down for a long time. We consider all block starts we
// haven't flushed data for yet a warm block start.
fsOpts := s.opts.CommitLogOptions().FilesystemOptions()
readInfoFilesResults := fs.ReadInfoFiles(fsOpts.FilePathPrefix(), ns.ID(), shard,
fsOpts.InfoReaderBufferSize(), fsOpts.DecodingOptions())
- shardBlockStartsOnDisk := make(map[time.Time]struct{})
+ shardBlockStartsOnDisk := make(map[xtime.UnixNano]struct{})
for _, result := range readInfoFilesResults {
if err := result.Err.Error(); err != nil {
// If we couldn't read the info files then keep going to be consistent
@@ -698,7 +698,7 @@ func (s *commitLogSource) bootstrapShardSnapshots(
}
info := result.Info
at := xtime.FromNanoseconds(info.BlockStart)
- shardBlockStartsOnDisk[at] = struct{}{}
+ shardBlockStartsOnDisk[xtime.ToUnixNano(at)] = struct{}{}
}

rangeIter := shardTimeRanges.Iter()
@@ -734,7 +734,7 @@ func (s *commitLogSource) bootstrapShardSnapshots(
}

writeType := series.WarmWrite
- if _, ok := shardBlockStartsOnDisk[blockStart]; ok {
+ if _, ok := shardBlockStartsOnDisk[xtime.ToUnixNano(blockStart)]; ok {
writeType = series.ColdWrite
}
if err := s.bootstrapShardBlockSnapshot(
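Why the map key changed: Go maps compare time.Time keys with ==, which compares the internal wall-clock representation and location pointer rather than the instant, so two time.Time values for the same nanosecond (one UTC, one local) can fail to collide as keys. An int64-based key has no such ambiguity. A minimal, self-contained sketch of the pitfall — UnixNano and ToUnixNano below are local stand-ins mirroring the shape of m3's xtime package, not imports of it:

    package main

    import (
        "fmt"
        "time"
    )

    // UnixNano is a local stand-in for m3's xtime.UnixNano: an int64 instant
    // whose equality (and hence map-key behavior) depends only on the value.
    type UnixNano int64

    // ToUnixNano mirrors the shape of xtime.ToUnixNano.
    func ToUnixNano(t time.Time) UnixNano { return UnixNano(t.UnixNano()) }

    func main() {
        utc := time.Unix(0, 1595894400000000000).UTC()
        local := utc.Local() // same instant, different location pointer

        byTime := map[time.Time]struct{}{utc: {}}
        _, ok := byTime[local]
        fmt.Println(ok) // false: == on time.Time compares representation, not instant

        byNano := map[UnixNano]struct{}{ToUnixNano(utc): {}}
        _, ok = byNano[ToUnixNano(local)]
        fmt.Println(ok) // true: the int64 key is representation-independent
    }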
32 changes: 21 additions & 11 deletions src/dbnode/storage/cleanup.go
@@ -70,13 +70,15 @@ type cleanupManager struct {

deleteFilesFn deleteFilesFn
deleteInactiveDirectoriesFn deleteInactiveDirectoriesFn
- cleanupInProgress bool
+ warmFlushCleanupInProgress bool
+ coldFlushCleanupInProgress bool
metrics cleanupManagerMetrics
logger *zap.Logger
}

type cleanupManagerMetrics struct {
- status tally.Gauge
+ warmFlushCleanupStatus tally.Gauge
+ coldFlushCleanupStatus tally.Gauge
corruptCommitlogFile tally.Counter
corruptSnapshotFile tally.Counter
corruptSnapshotMetadataFile tally.Counter
@@ -90,7 +92,8 @@ func newCleanupManagerMetrics(scope tally.Scope) cleanupManagerMetrics {
sScope := scope.SubScope("snapshot")
smScope := scope.SubScope("snapshot-metadata")
return cleanupManagerMetrics{
- status: scope.Gauge("cleanup"),
+ warmFlushCleanupStatus: scope.Gauge("warm-flush-cleanup"),
+ coldFlushCleanupStatus: scope.Gauge("cold-flush-cleanup"),
corruptCommitlogFile: clScope.Counter("corrupt"),
corruptSnapshotFile: sScope.Counter("corrupt"),
corruptSnapshotMetadataFile: smScope.Counter("corrupt"),
@@ -132,12 +135,12 @@ func (m *cleanupManager) WarmFlushCleanup(t time.Time, isBootstrapped bool) error {
}

m.Lock()
- m.cleanupInProgress = true
+ m.warmFlushCleanupInProgress = true
m.Unlock()

defer func() {
m.Lock()
- m.cleanupInProgress = false
+ m.warmFlushCleanupInProgress = false
m.Unlock()
}()

@@ -183,12 +186,12 @@ func (m *cleanupManager) ColdFlushCleanup(t time.Time, isBootstrapped bool) error {
}

m.Lock()
- m.cleanupInProgress = true
+ m.coldFlushCleanupInProgress = true
m.Unlock()

defer func() {
m.Lock()
- m.cleanupInProgress = false
+ m.coldFlushCleanupInProgress = false
m.Unlock()
}()

@@ -212,13 +215,20 @@ func (m *cleanupManager) ColdFlushCleanup(t time.Time, isBootstrapped bool) error {
}
func (m *cleanupManager) Report() {
m.RLock()
- cleanupInProgress := m.cleanupInProgress
+ coldFlushCleanupInProgress := m.coldFlushCleanupInProgress
+ warmFlushCleanupInProgress := m.warmFlushCleanupInProgress
m.RUnlock()

- if cleanupInProgress {
- m.metrics.status.Update(1)
+ if coldFlushCleanupInProgress {
+ m.metrics.coldFlushCleanupStatus.Update(1)
} else {
- m.metrics.status.Update(0)
+ m.metrics.coldFlushCleanupStatus.Update(0)
}

+ if warmFlushCleanupInProgress {
+ m.metrics.warmFlushCleanupStatus.Update(1)
+ } else {
+ m.metrics.warmFlushCleanupStatus.Update(0)
+ }
}

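The cleanup.go changes split the single cleanupInProgress flag and "cleanup" gauge into a warm/cold pair, since the two cleanup types run on independent schedules and a shared flag lets one cycle mask the other in the emitted metrics. A minimal sketch of the resulting pattern — the tally calls (Scope.Gauge, Gauge.Update) are the real uber-go/tally API, while the type and field names are illustrative:

    package cleanupsketch

    import (
        "sync"

        "github.com/uber-go/tally"
    )

    // cleanupTracker is an illustrative stand-in for m3's cleanupManager.
    type cleanupTracker struct {
        sync.RWMutex
        warmInProgress bool
        coldInProgress bool

        warmStatus tally.Gauge
        coldStatus tally.Gauge
    }

    func newCleanupTracker(scope tally.Scope) *cleanupTracker {
        return &cleanupTracker{
            warmStatus: scope.Gauge("warm-flush-cleanup"),
            coldStatus: scope.Gauge("cold-flush-cleanup"),
        }
    }

    // report emits each status independently, so a long cold flush cleanup
    // no longer hides (or is hidden by) a concurrent warm flush cleanup.
    func (c *cleanupTracker) report() {
        c.RLock()
        warm, cold := c.warmInProgress, c.coldInProgress
        c.RUnlock()

        c.warmStatus.Update(boolToFloat(warm))
        c.coldStatus.Update(boolToFloat(cold))
    }

    func boolToFloat(b bool) float64 {
        if b {
            return 1
        }
        return 0
    }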
39 changes: 20 additions & 19 deletions src/dbnode/storage/coldflush.go
@@ -109,13 +109,13 @@ func (m *coldFlushManager) Run(t time.Time) bool {
if err := m.ColdFlushCleanup(t, m.database.IsBootstrapped()); err != nil {
instrument.EmitAndLogInvariantViolation(m.opts.InstrumentOptions(),
func(l *zap.Logger) {
l.Error("error when cleaning up data", zap.Time("time", t), zap.Error(err))
l.Error("error when cleaning up cold flush data", zap.Time("time", t), zap.Error(err))
})
}
- if err := m.coldFlush(); err != nil {
+ if err := m.trackedColdFlush(); err != nil {
instrument.EmitAndLogInvariantViolation(m.opts.InstrumentOptions(),
func(l *zap.Logger) {
l.Error("error when flushing data", zap.Time("time", t), zap.Error(err))
l.Error("error when cold flushing data", zap.Time("time", t), zap.Error(err))
})
}
m.Lock()
Expand All @@ -124,12 +124,7 @@ func (m *coldFlushManager) Run(t time.Time) bool {
return true
}

- func (m *coldFlushManager) coldFlush() error {
- namespaces, err := m.database.OwnedNamespaces()
- if err != nil {
- return err
- }
-
+ func (m *coldFlushManager) trackedColdFlush() error {
// The cold flush process will persist any data that has been "loaded" into memory via
// the Load() API but has not yet been persisted durably. As a result, if the cold flush
// process completes without error, then we want to "decrement" the number of tracked bytes
@@ -153,16 +148,22 @@ func (m *coldFlushManager) coldFlush() error {
memTracker := m.opts.MemoryTracker()
memTracker.MarkLoadedAsPending()

- defer func() {
- if err == nil {
- // Only decrement if the cold flush was a success. In this case, the decrement will reduce the
- // value by however many bytes had been tracked when the cold flush began.
- memTracker.DecPendingLoadedBytes()
- } else {
- m.log.Error("data cold flush failed",
- zap.Error(err))
- }
- }()
+ if err := m.coldFlush(); err != nil {
+ return err
+ }
+
+ // Only decrement if the cold flush was a success. In this case, the decrement will reduce the
+ // value by however many bytes had been tracked when the cold flush began.
+ memTracker.DecPendingLoadedBytes()
+ return nil
+ }
+
+ func (m *coldFlushManager) coldFlush() error {
+ namespaces, err := m.database.OwnedNamespaces()
+ if err != nil {
+ return err
+ }

flushPersist, err := m.pm.StartFlushPersist()
if err != nil {
return err
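The coldflush.go refactor is worth pausing on: the old code decided success inside a defer by inspecting a shared err variable, a pattern that breaks silently if a later edit shadows err with :=, and it buried the memory-tracker bookkeeping inside coldFlush itself. Wrapping the flush in trackedColdFlush makes the success path explicit. A runnable sketch of the same shape — memoryTracker and manager are stand-ins, not m3's types:

    package main

    import (
        "errors"
        "fmt"
    )

    // memoryTracker is an illustrative stand-in for m3's MemoryTracker.
    type memoryTracker struct{ pendingBytes int64 }

    func (t *memoryTracker) MarkLoadedAsPending()   { t.pendingBytes = 42 }
    func (t *memoryTracker) DecPendingLoadedBytes() { t.pendingBytes = 0 }

    type manager struct {
        tracker memoryTracker
        flush   func() error
    }

    // trackedFlush mirrors the shape of trackedColdFlush: mark loaded bytes
    // as pending, run the flush, and decrement the pending count only if the
    // flush succeeded. No defer and no captured err variable to get wrong.
    func (m *manager) trackedFlush() error {
        m.tracker.MarkLoadedAsPending()
        if err := m.flush(); err != nil {
            return err
        }
        m.tracker.DecPendingLoadedBytes()
        return nil
    }

    func main() {
        good := &manager{flush: func() error { return nil }}
        fmt.Println(good.trackedFlush(), good.tracker.pendingBytes) // <nil> 0

        bad := &manager{flush: func() error { return errors.New("disk full") }}
        fmt.Println(bad.trackedFlush(), bad.tracker.pendingBytes) // disk full 42
    }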
1 change: 1 addition & 0 deletions src/dbnode/storage/fs.go
@@ -184,6 +184,7 @@ func (m *fileSystemManager) Run(
}

func (m *fileSystemManager) Report() {
+ m.databaseCleanupManager.Report()
m.databaseFlushManager.Report()
}

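The one-line fs.go change matters because Report() is the only way the new in-progress gauges get refreshed: the file system manager composes both managers, and a composite reporter that forgets to fan out leaves a sub-manager's gauges frozen at their last value. The shape, with illustrative names:

    package fssketch

    // reporter matches the Report() method shared by m3's managers.
    type reporter interface {
        Report()
    }

    // compositeManager fans a periodic Report() out to every sub-manager
    // that owns gauges; omitting one silently freezes its metrics, which is
    // what the fix above repairs for the cleanup manager.
    type compositeManager struct {
        subs []reporter // e.g. cleanup manager, flush manager
    }

    func (m *compositeManager) Report() {
        for _, s := range m.subs {
            s.Report()
        }
    }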
5 changes: 1 addition & 4 deletions src/dbnode/storage/series/series.go
@@ -301,10 +301,7 @@ func (s *dbSeries) IsBufferEmptyAtBlockStart(blockStart time.Time) bool {
s.RLock()
bufferEmpty := s.buffer.IsEmptyAtBlockStart(blockStart)
s.RUnlock()
- if bufferEmpty {
- return true
- }
- return false
+ return bufferEmpty
}

func (s *dbSeries) NumActiveBlocks() int {
39 changes: 23 additions & 16 deletions src/dbnode/storage/shard.go
@@ -188,9 +188,8 @@ type dbShard struct {
shard uint32
coldWritesEnabled bool
// NB(bodu): Cache state on whether we snapshotted last or not to avoid
- // going to disk to see if filesets are empty. We don't need to lock on this
- // because snapshots happen sequentially.
- emptySnapshotOnDiskByTime map[time.Time]bool
+ // going to disk to see if filesets are empty.
+ emptySnapshotOnDiskByTime map[xtime.UnixNano]bool
}

// NB(r): dbShardRuntimeOptions does not contain its own
@@ -302,7 +301,7 @@ func newDatabaseShard(
flushState: newShardFlushState(),
tickWg: &sync.WaitGroup{},
coldWritesEnabled: namespaceMetadata.Options().ColdWritesEnabled(),
- emptySnapshotOnDiskByTime: make(map[time.Time]bool),
+ emptySnapshotOnDiskByTime: make(map[xtime.UnixNano]bool),
logger: opts.InstrumentOptions().Logger(),
metrics: newDatabaseShardMetrics(shard, scope),
}
@@ -2375,13 +2374,13 @@ func (s *dbShard) Snapshot(
s.RUnlock()
return errShardNotBootstrappedToSnapshot
}

+ // NB(bodu): This always defaults to false if the record does not exist.
+ emptySnapshotOnDisk := s.emptySnapshotOnDiskByTime[xtime.ToUnixNano(blockStart)]

s.RUnlock()

- var (
- needsSnapshot bool
- // NB(bodu): This always defaults to false if the record does not exist.
- emptySnapshotOnDisk = s.emptySnapshotOnDiskByTime[blockStart]
- )
+ var needsSnapshot bool
s.forEachShardEntry(func(entry *lookup.Entry) bool {
if !entry.Series.IsBufferEmptyAtBlockStart(blockStart) {
needsSnapshot = true
@@ -2395,13 +2394,6 @@ func (s *dbShard) Snapshot(
if !needsSnapshot && emptySnapshotOnDisk {
return nil
}
- // We're proceeding w/ the snaphot at this point but we know it will be empty or contain data that is recoverable
- // from commit logs since we would have rotated commit logs before checking for flushable data.
- if needsSnapshot {
- s.emptySnapshotOnDiskByTime[blockStart] = false
- } else {
- s.emptySnapshotOnDiskByTime[blockStart] = true
- }

var multiErr xerrors.MultiError

@@ -2449,6 +2441,21 @@ func (s *dbShard) Snapshot(
multiErr = multiErr.Add(err)
}

+ if multiErr.FinalError() == nil {
+ // Only update cached snapshot state if we successfully flushed data to disk.
+ s.Lock()
+ if needsSnapshot {
+ s.emptySnapshotOnDiskByTime[xtime.ToUnixNano(blockStart)] = false
+ } else {
+ // NB(bodu): If we flushed an empty snapshot to disk, it means that the previous
+ // snapshot on disk was not empty (or we just bootstrapped and cached state was lost).
+ // The snapshot we just flushed may or may not have data, although whatever data we flushed
+ // would be recoverable from the rotated commit log as well.
+ s.emptySnapshotOnDiskByTime[xtime.ToUnixNano(blockStart)] = true
+ }
+ s.Unlock()
+ }

return multiErr.FinalError()
}

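The shard.go hunks move the emptySnapshotOnDiskByTime update from before the snapshot to after it, under the shard lock (which is also why the old "we don't need to lock" comment is dropped): the cache should only ever describe files that actually reached disk. A condensed sketch of that rule — shard and writeSnapshotFiles are stand-ins, not dbShard's real shape:

    package shardsketch

    import (
        "sync"
        "time"
    )

    // UnixNano stands in for m3's xtime.UnixNano map key.
    type UnixNano int64

    // shard is a cut-down, illustrative stand-in for dbShard.
    type shard struct {
        sync.Mutex
        emptySnapshotOnDiskByTime map[UnixNano]bool
        writeSnapshotFiles        func(time.Time) error
    }

    // snapshot updates the cached empty-on-disk state only after the files
    // were written successfully; on error the cache is left untouched, so
    // the next pass re-evaluates from disk instead of trusting stale state.
    func (s *shard) snapshot(blockStart time.Time, needsSnapshot bool) error {
        if err := s.writeSnapshotFiles(blockStart); err != nil {
            return err
        }

        s.Lock()
        // needsSnapshot == false means the snapshot we just wrote was empty.
        s.emptySnapshotOnDiskByTime[UnixNano(blockStart.UnixNano())] = !needsSnapshot
        s.Unlock()
        return nil
    }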
