[dbnode] Add additional data snapshotting metrics (#2575)
notbdu authored Sep 8, 2020
1 parent fbb59d9 commit 9f34f25
Showing 13 changed files with 213 additions and 102 deletions.
4 changes: 4 additions & 0 deletions src/dbnode/storage/flush.go
@@ -65,6 +65,7 @@ type flushManagerMetrics struct {
dataWarmFlushDuration tally.Timer
dataSnapshotDuration tally.Timer
indexFlushDuration tally.Timer
commitLogRotationDuration tally.Timer
}

func newFlushManagerMetrics(scope tally.Scope) flushManagerMetrics {
@@ -76,6 +77,7 @@ func newFlushManagerMetrics(scope tally.Scope) flushManagerMetrics {
dataWarmFlushDuration: scope.Timer("data-warm-flush-duration"),
dataSnapshotDuration: scope.Timer("data-snapshot-duration"),
indexFlushDuration: scope.Timer("index-flush-duration"),
commitLogRotationDuration: scope.Timer("commit-log-rotation-duration"),
}
}

@@ -147,7 +149,9 @@ func (m *flushManager) Flush(startTime time.Time) error {
multiErr = multiErr.Add(err)
}

start := m.nowFn()
rotatedCommitlogID, err := m.commitlog.RotateLogs()
m.metrics.commitLogRotationDuration.Record(m.nowFn().Sub(start))
if err == nil {
if err = m.dataSnapshot(namespaces, startTime, rotatedCommitlogID); err != nil {
multiErr = multiErr.Add(err)
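For reference, the new timer follows the usual record-elapsed pattern: capture a start time from the manager's clock, run the operation, and record the duration before checking the error, so rotation time is reported even when rotation fails. A minimal, self-contained sketch of that pattern (the wiring below is illustrative, not flushManager's actual fields):

package main

import (
	"time"

	"github.com/uber-go/tally"
)

// timeOp records how long op takes on the given timer. Recording
// happens before the error is inspected, matching the diff above.
func timeOp(timer tally.Timer, nowFn func() time.Time, op func() error) error {
	start := nowFn()
	err := op()
	timer.Record(nowFn().Sub(start))
	return err
}

func main() {
	scope := tally.NoopScope
	rotation := scope.Timer("commit-log-rotation-duration")
	_ = timeOp(rotation, time.Now, func() error {
		return nil // stand-in for m.commitlog.RotateLogs()
	})
}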
84 changes: 47 additions & 37 deletions src/dbnode/storage/namespace.go
@@ -151,24 +151,25 @@ type databaseNamespaceIndexStatsLastTick struct {
}

type databaseNamespaceMetrics struct {
bootstrap instrument.MethodMetrics
flushWarmData instrument.MethodMetrics
flushColdData instrument.MethodMetrics
flushIndex instrument.MethodMetrics
snapshot instrument.MethodMetrics
write instrument.MethodMetrics
writeTagged instrument.MethodMetrics
read instrument.MethodMetrics
fetchBlocks instrument.MethodMetrics
fetchBlocksMetadata instrument.MethodMetrics
queryIDs instrument.MethodMetrics
aggregateQuery instrument.MethodMetrics
unfulfilled tally.Counter
bootstrapStart tally.Counter
bootstrapEnd tally.Counter
shards databaseNamespaceShardMetrics
tick databaseNamespaceTickMetrics
status databaseNamespaceStatusMetrics
bootstrap instrument.MethodMetrics
flushWarmData instrument.MethodMetrics
flushColdData instrument.MethodMetrics
flushIndex instrument.MethodMetrics
snapshot instrument.MethodMetrics
write instrument.MethodMetrics
writeTagged instrument.MethodMetrics
read instrument.MethodMetrics
fetchBlocks instrument.MethodMetrics
fetchBlocksMetadata instrument.MethodMetrics
queryIDs instrument.MethodMetrics
aggregateQuery instrument.MethodMetrics
unfulfilled tally.Counter
bootstrapStart tally.Counter
bootstrapEnd tally.Counter
snapshotSeriesPersist tally.Counter
shards databaseNamespaceShardMetrics
tick databaseNamespaceTickMetrics
status databaseNamespaceStatusMetrics
}

type databaseNamespaceShardMetrics struct {
@@ -232,22 +233,25 @@ func newDatabaseNamespaceMetrics(
indexTickScope := tickScope.SubScope("index")
statusScope := scope.SubScope("status")
indexStatusScope := statusScope.SubScope("index")
bootstrapScope := scope.SubScope("bootstrap")
snapshotScope := scope.SubScope("snapshot")
return databaseNamespaceMetrics{
bootstrap: instrument.NewMethodMetrics(scope, "bootstrap", opts),
flushWarmData: instrument.NewMethodMetrics(scope, "flushWarmData", opts),
flushColdData: instrument.NewMethodMetrics(scope, "flushColdData", opts),
flushIndex: instrument.NewMethodMetrics(scope, "flushIndex", opts),
snapshot: instrument.NewMethodMetrics(scope, "snapshot", opts),
write: instrument.NewMethodMetrics(scope, "write", opts),
writeTagged: instrument.NewMethodMetrics(scope, "write-tagged", opts),
read: instrument.NewMethodMetrics(scope, "read", opts),
fetchBlocks: instrument.NewMethodMetrics(scope, "fetchBlocks", opts),
fetchBlocksMetadata: instrument.NewMethodMetrics(scope, "fetchBlocksMetadata", opts),
queryIDs: instrument.NewMethodMetrics(scope, "queryIDs", opts),
aggregateQuery: instrument.NewMethodMetrics(scope, "aggregateQuery", opts),
unfulfilled: scope.Counter("bootstrap.unfulfilled"),
bootstrapStart: scope.Counter("bootstrap.start"),
bootstrapEnd: scope.Counter("bootstrap.end"),
bootstrap: instrument.NewMethodMetrics(scope, "bootstrap", opts),
flushWarmData: instrument.NewMethodMetrics(scope, "flushWarmData", opts),
flushColdData: instrument.NewMethodMetrics(scope, "flushColdData", opts),
flushIndex: instrument.NewMethodMetrics(scope, "flushIndex", opts),
snapshot: instrument.NewMethodMetrics(scope, "snapshot", opts),
write: instrument.NewMethodMetrics(scope, "write", opts),
writeTagged: instrument.NewMethodMetrics(scope, "write-tagged", opts),
read: instrument.NewMethodMetrics(scope, "read", opts),
fetchBlocks: instrument.NewMethodMetrics(scope, "fetchBlocks", opts),
fetchBlocksMetadata: instrument.NewMethodMetrics(scope, "fetchBlocksMetadata", opts),
queryIDs: instrument.NewMethodMetrics(scope, "queryIDs", opts),
aggregateQuery: instrument.NewMethodMetrics(scope, "aggregateQuery", opts),
unfulfilled: bootstrapScope.Counter("unfulfilled"),
bootstrapStart: bootstrapScope.Counter("start"),
bootstrapEnd: bootstrapScope.Counter("end"),
snapshotSeriesPersist: snapshotScope.Counter("series-persist"),
shards: databaseNamespaceShardMetrics{
add: shardsScope.Counter("add"),
close: shardsScope.Counter("close"),
@@ -1297,17 +1301,23 @@ func (n *dbNamespace) Snapshot(
return nil
}

multiErr := xerrors.NewMultiError()
shards := n.OwnedShards()
for _, shard := range shards {
err := shard.Snapshot(blockStart, snapshotTime, snapshotPersist, nsCtx)
var (
seriesPersist int
multiErr xerrors.MultiError
)
for _, shard := range n.OwnedShards() {
result, err := shard.Snapshot(blockStart, snapshotTime, snapshotPersist, nsCtx)
if err != nil {
detailedErr := fmt.Errorf("shard %d failed to snapshot: %v", shard.ID(), err)
multiErr = multiErr.Add(detailedErr)
// Continue with remaining shards
}

seriesPersist += result.SeriesPersist
}

n.metrics.snapshotSeriesPersist.Inc(int64(seriesPersist))

res := multiErr.FinalError()
n.metrics.snapshot.ReportSuccessOrError(res, n.nowFn().Sub(callStart))
return res
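A note on the metrics refactor above: the bootstrap counters move from dotted names on the root scope (scope.Counter("bootstrap.unfulfilled")) to a dedicated subscope (bootstrapScope.Counter("unfulfilled")). Assuming tally's default "." separator, both spellings should resolve to the same fully qualified metric name, so existing dashboards keep working while the new snapshot subscope gains the series-persist counter. A hedged sketch (the "database" prefix is illustrative, not the namespace scope's actual prefix):

package main

import (
	"github.com/uber-go/tally"
)

func main() {
	root, closer := tally.NewRootScope(tally.ScopeOptions{Prefix: "database"}, 0)
	defer closer.Close()

	// Old style: dotted name on the root scope.
	oldStyle := root.Counter("bootstrap.unfulfilled")
	// New style: subscope plus bare name; with the default "."
	// separator this should emit the same "database.bootstrap.unfulfilled".
	newStyle := root.SubScope("bootstrap").Counter("unfulfilled")

	oldStyle.Inc(1)
	newStyle.Inc(1)
}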
9 changes: 7 additions & 2 deletions src/dbnode/storage/namespace_test.go
@@ -561,7 +561,10 @@ func TestNamespaceSnapshotShardError(t *testing.T) {
require.Error(t, testSnapshotWithShardSnapshotErrs(t, shardMethodResults))
}

func testSnapshotWithShardSnapshotErrs(t *testing.T, shardMethodResults []snapshotTestCase) error {
func testSnapshotWithShardSnapshotErrs(
t *testing.T,
shardMethodResults []snapshotTestCase,
) error {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

@@ -588,7 +591,9 @@ func testSnapshotWithShardSnapshotErrs(t *testing.T, shardMethodResults []snapsh
shardID := uint32(i)
shard.EXPECT().ID().Return(uint32(i)).AnyTimes()
if tc.expectSnapshot {
shard.EXPECT().Snapshot(blockStart, now, gomock.Any(), gomock.Any()).Return(tc.shardSnapshotErr)
shard.EXPECT().
Snapshot(blockStart, now, gomock.Any(), gomock.Any()).
Return(ShardSnapshotResult{}, tc.shardSnapshotErr)
}
ns.shards[testShardIDs[i].ID()] = shard
shardBootstrapStates[shardID] = tc.shardBootstrapStateBeforeTick
61 changes: 39 additions & 22 deletions src/dbnode/storage/series/buffer.go
@@ -96,7 +96,7 @@ type databaseBuffer interface {
metadata persist.Metadata,
persistFn persist.DataFn,
nsCtx namespace.Context,
) error
) (SnapshotResult, error)

WarmFlush(
ctx context.Context,
@@ -541,24 +541,38 @@ func (b *dbBuffer) Snapshot(
metadata persist.Metadata,
persistFn persist.DataFn,
nsCtx namespace.Context,
) error {
) (SnapshotResult, error) {
var (
start = b.nowFn()
result SnapshotResult
)

buckets, exists := b.bucketVersionsAt(blockStart)
if !exists {
return nil
return result, nil
}

// Snapshot must take both cold and warm writes because cold flushes don't
// happen for the current block (since cold flushes can't happen before a
// warm flush has happened).
streams, err := buckets.mergeToStreams(ctx, streamsOptions{filterWriteType: false, nsCtx: nsCtx})
if err != nil {
return err
return result, err
}
numStreams := len(streams)

var mergedStream xio.SegmentReader
if numStreams == 1 {
mergedStream = streams[0]
afterMergeByBucket := b.nowFn()
result.Stats.TimeMergeByBucket = afterMergeByBucket.Sub(start)

var (
numStreams = len(streams)
mergeAcrossBuckets = numStreams != 1
segment ts.Segment
)
if !mergeAcrossBuckets {
segment, err = streams[0].Segment()
if err != nil {
return result, err
}
} else {
// We may need to merge again here because the regular merge method does
// not merge warm and cold buckets or buckets that have different versions.
@@ -580,34 +594,37 @@ func (b *dbBuffer) Snapshot(
for iter.Next() {
dp, unit, annotation := iter.Current()
if err := encoder.Encode(dp, unit, annotation); err != nil {
return err
return result, err
}
}
if err := iter.Err(); err != nil {
return err
return result, err
}

var ok bool
mergedStream, ok = encoder.Stream(ctx)
if !ok {
// Don't write out series with no data.
return nil
}
segment = encoder.Discard()

[Review comment on encoder.Discard()]
notbdu (author), Sep 8, 2020: I looked at the docs for this fn; they say it transfers ownership to the caller. What's the reasoning for using this versus .Stream(ctx)?

notbdu (author), Sep 8, 2020: I read a little more, and I guess we don't need a segment reader bound to the request ctx, since we're just persisting the segment straight to disk. So it's less work to just take ownership of the underlying segment? (See the ownership sketch after this file's diff.)

}

segment, err := mergedStream.Segment()
if err != nil {
return err
}
afterMergeAcrossBuckets := b.nowFn()
result.Stats.TimeMergeAcrossBuckets = afterMergeAcrossBuckets.Sub(afterMergeByBucket)

if segment.Len() == 0 {
// Don't write out series with no data.
return nil
return result, nil
}

checksum := segment.CalculateChecksum()

return persistFn(metadata, segment, checksum)
afterChecksum := b.nowFn()
result.Stats.TimeChecksum = afterChecksum.Sub(afterMergeAcrossBuckets)

if err := persistFn(metadata, segment, checksum); err != nil {
return result, err
}

result.Stats.TimePersist = b.nowFn().Sub(afterChecksum)

result.Persist = true
return result, nil
}

func (b *dbBuffer) WarmFlush(
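On the Stream-versus-Discard question in the review thread above: both calls appear in the diff itself (mergedStream, ok = encoder.Stream(ctx) before, segment = encoder.Discard() after), and the difference is ownership. Stream hands back a context-bound reader while the encoder still owns the bytes; Discard finalizes the encoder and transfers the segment to the caller, which suits a persist-straight-to-disk path like snapshotting. A schematic sketch with toy types (not m3's real interfaces):

package main

import "fmt"

type segment struct{ data []byte }

type encoder struct{ seg *segment }

// stream returns a view of the encoded bytes; the encoder (and the
// request context it is tied to) still owns the underlying segment.
func (e *encoder) stream() (*segment, bool) {
	if e.seg == nil {
		return nil, false
	}
	return e.seg, true
}

// discard finalizes the encoder and transfers ownership of the
// segment to the caller; the encoder must not be reused afterwards.
func (e *encoder) discard() *segment {
	s := e.seg
	e.seg = nil
	return s
}

func main() {
	e := &encoder{seg: &segment{data: []byte("encoded block")}}
	// Snapshotting persists straight to disk and never hands the bytes
	// back to the request, so taking ownership avoids keeping a
	// context-bound reader alive just to drop it.
	seg := e.discard()
	fmt.Printf("persisting %d bytes\n", len(seg.data))
}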
7 changes: 4 additions & 3 deletions src/dbnode/storage/series/buffer_mock.go

Some generated files are not rendered by default.

6 changes: 3 additions & 3 deletions src/dbnode/storage/series/buffer_test.go
@@ -1164,7 +1164,7 @@ func testBufferWithEmptyEncoder(t *testing.T, testSnapshot bool) {
ctx = context.NewContext()
defer ctx.Close()

err = buffer.Snapshot(ctx, start, metadata, assertPersistDataFn, namespace.Context{})
_, err = buffer.Snapshot(ctx, start, metadata, assertPersistDataFn, namespace.Context{})
assert.NoError(t, err)
} else {
ctx = context.NewContext()
@@ -1266,7 +1266,7 @@ func testBufferSnapshot(t *testing.T, opts Options, setAnn setAnnotation) {
ID: []byte("some-id"),
})

err := buffer.Snapshot(ctx, start, metadata, assertPersistDataFn, nsCtx)
_, err := buffer.Snapshot(ctx, start, metadata, assertPersistDataFn, nsCtx)
assert.NoError(t, err)

// Check internal state to make sure the merge happened and was persisted.
@@ -1413,7 +1413,7 @@ func TestBufferSnapshotWithColdWrites(t *testing.T) {
ID: []byte("some-id"),
})

err := buffer.Snapshot(ctx, start, metadata, assertPersistDataFn, nsCtx)
_, err := buffer.Snapshot(ctx, start, metadata, assertPersistDataFn, nsCtx)
require.NoError(t, err)

// Check internal state of warm bucket to make sure the merge happened and
6 changes: 3 additions & 3 deletions src/dbnode/storage/series/series.go
@@ -619,14 +619,14 @@ func (s *dbSeries) Snapshot(
blockStart time.Time,
persistFn persist.DataFn,
nsCtx namespace.Context,
) error {
) (SnapshotResult, error) {
// Need a write lock because the buffer Snapshot method mutates
// state (by performing a pro-active merge).
s.Lock()
err := s.buffer.Snapshot(ctx, blockStart,
result, err := s.buffer.Snapshot(ctx, blockStart,
persist.NewMetadata(s.metadata), persistFn, nsCtx)
s.Unlock()
return err
return result, err
}

func (s *dbSeries) ColdFlushBlockStarts(blockStates BootstrappedBlockStateSnapshot) OptimizedTimes {
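The write lock in dbSeries.Snapshot (rather than a read lock) follows from the code comment above: buffer.Snapshot mutates buffer state by pro-actively merging buckets, so concurrent readers must be excluded. A minimal sketch of that locking shape (toy types, not the real series implementation):

package main

import (
	"fmt"
	"sync"
)

type series struct {
	mu  sync.RWMutex
	buf []byte
}

// snapshot takes the exclusive lock because the underlying buffer
// merge mutates state, even though snapshotting "reads" the series.
func (s *series) snapshot() (int, error) {
	s.mu.Lock()
	n := len(s.buf) // stand-in for buffer.Snapshot's merge + persist
	s.mu.Unlock()
	return n, nil
}

func main() {
	s := &series{buf: []byte("abc")}
	n, err := s.snapshot()
	fmt.Println(n, err)
}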
7 changes: 4 additions & 3 deletions src/dbnode/storage/series/series_mock.go

Some generated files are not rendered by default.

26 changes: 25 additions & 1 deletion src/dbnode/storage/series/types.go
@@ -141,7 +141,7 @@ type DatabaseSeries interface {
blockStart time.Time,
persistFn persist.DataFn,
nsCtx namespace.Context,
) error
) (SnapshotResult, error)

// ColdFlushBlockStarts returns the block starts that need cold flushes.
ColdFlushBlockStarts(blockStates BootstrappedBlockStateSnapshot) OptimizedTimes
@@ -157,6 +157,28 @@ type DatabaseSeries interface {
Reset(opts DatabaseSeriesOptions)
}

// SnapshotResult contains metadata regarding the snapshot.
type SnapshotResult struct {
Persist bool
Stats SnapshotResultStats
}

// SnapshotResultStats contains stats regarding the snapshot.
type SnapshotResultStats struct {
TimeMergeByBucket time.Duration
TimeMergeAcrossBuckets time.Duration
TimeChecksum time.Duration
TimePersist time.Duration
}

// Add adds the result of a snapshot result to this result.
func (r *SnapshotResultStats) Add(other SnapshotResultStats) {
r.TimeMergeByBucket += other.TimeMergeByBucket
r.TimeMergeAcrossBuckets += other.TimeMergeAcrossBuckets
r.TimeChecksum += other.TimeChecksum
r.TimePersist += other.TimePersist
}

// FetchBlocksMetadataOptions encapsulates block fetch metadata options
// and specifies a few series specific options too.
type FetchBlocksMetadataOptions struct {
@@ -376,6 +398,7 @@ type Stats struct {
coldWrites tally.Counter
encodersPerBlock tally.Histogram
encoderLimitWriteRejected tally.Counter
snapshotMergesEachBucket tally.Counter
}

// NewStats returns a new Stats for the provided scope.
@@ -389,6 +412,7 @@ func NewStats(scope tally.Scope) Stats {
coldWrites: subScope.Counter("cold-writes"),
encodersPerBlock: subScope.Histogram("encoders-per-block", buckets),
encoderLimitWriteRejected: subScope.Counter("encoder-limit-write-rejected"),
snapshotMergesEachBucket: subScope.Counter("snapshot-merges-each-bucket"),
}
}

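SnapshotResultStats.Add exists so per-series timings can be rolled up as a shard walks its series during a snapshot; the shard can then record one aggregate per stage. A small aggregation sketch (the struct mirrors the commit; the loop is illustrative):

package main

import (
	"fmt"
	"time"
)

// SnapshotResultStats mirrors the type added in this commit.
type SnapshotResultStats struct {
	TimeMergeByBucket      time.Duration
	TimeMergeAcrossBuckets time.Duration
	TimeChecksum           time.Duration
	TimePersist            time.Duration
}

// Add accumulates another result's stats into this one.
func (r *SnapshotResultStats) Add(other SnapshotResultStats) {
	r.TimeMergeByBucket += other.TimeMergeByBucket
	r.TimeMergeAcrossBuckets += other.TimeMergeAcrossBuckets
	r.TimeChecksum += other.TimeChecksum
	r.TimePersist += other.TimePersist
}

func main() {
	perSeries := []SnapshotResultStats{
		{TimeMergeByBucket: 2 * time.Millisecond, TimePersist: 5 * time.Millisecond},
		{TimeMergeByBucket: time.Millisecond, TimeChecksum: time.Millisecond},
	}

	// Roll per-series stats up into one total for the shard snapshot.
	var total SnapshotResultStats
	for _, stats := range perSeries {
		total.Add(stats)
	}
	fmt.Printf("merge-by-bucket=%v persist=%v\n",
		total.TimeMergeByBucket, total.TimePersist)
}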