kv/bulk: pull ingest stats into own struct
Release note: none.
dt committed Apr 22, 2022
1 parent a96ce2e commit 7ae6b88
Showing 5 changed files with 148 additions and 148 deletions.
5 changes: 2 additions & 3 deletions docs/generated/redact_safe.md
@@ -7,9 +7,8 @@ pkg/base/node_id.go | `*SQLIDContainer`
pkg/base/node_id.go | `*StoreIDContainer`
pkg/cli/exit/exit.go | `Code`
pkg/jobs/jobspb/wrap.go | `Type`
pkg/kv/bulk/sst_batcher.go | `sorted`
pkg/kv/bulk/sst_batcher.go | `sz`
pkg/kv/bulk/sst_batcher.go | `timing`
pkg/kv/bulk/stats.go | `sz`
pkg/kv/bulk/stats.go | `timing`
pkg/kv/kvserver/closedts/ctpb/service.go | `LAI`
pkg/kv/kvserver/closedts/ctpb/service.go | `SeqNum`
pkg/kv/kvserver/concurrency/lock/locking.go | `Durability`
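Note: redact_safe.md records types whose String output may appear unredacted in logs; a type opts in by implementing the redact.SafeValue marker interface from the newly added `@com_github_cockroachdb_redact` dependency. A minimal sketch of how the relocated `sz` and `timing` types plausibly look in the new stats.go — the formatting details here are assumptions, not the verbatim commit contents:

```go
package bulk

import (
	"time"

	"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
)

// sz is a byte count that renders human-readably in logs.
type sz int64

func (b sz) String() string { return string(humanizeutil.IBytes(int64(b))) }

// SafeValue implements the redact.SafeValue marker interface, telling the
// log redaction machinery that sz values need not be redacted.
func (b sz) SafeValue() {}

// timing is a duration rounded for log readability.
type timing time.Duration

func (t timing) String() string { return time.Duration(t).Round(time.Second).String() }

// SafeValue implements the redact.SafeValue marker interface.
func (t timing) SafeValue() {}
```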
2 changes: 2 additions & 0 deletions pkg/kv/bulk/BUILD.bazel
@@ -8,6 +8,7 @@ go_library(
"kv_buf.go",
"setting.go",
"sst_batcher.go",
"stats.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/kv/bulk",
visibility = ["//visibility:public"],
@@ -30,6 +31,7 @@ go_library(
"//pkg/util/mon",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_redact//:redact",
],
)

122 changes: 29 additions & 93 deletions pkg/kv/bulk/buffering_adder.go
@@ -51,19 +51,6 @@ type BufferingAdder struct {

lastFlush time.Time

// flushCounts accumulates performance and debug info for logging.
flushCounts struct {
total int // number of flushes.
bufferSize int // number of flushes due to buffer size.
totalSort time.Duration
totalFlush time.Duration
totalFilling time.Duration
// span tracks the total span into which this batcher has flushed. It is
// only maintained if log.V(1), so if vmodule is upped mid-ingest it may be
// incomplete.
span roachpb.Span
}

// name of the BufferingAdder for the purpose of logging only.
name string

@@ -142,37 +129,13 @@ func (b *BufferingAdder) SetOnFlush(fn func(summary roachpb.BulkOpSummary)) {

// Close closes the underlying SST builder.
func (b *BufferingAdder) Close(ctx context.Context) {
if b.flushCounts.total > 0 {
log.VEventf(ctx, 1,
"%s adder closing; ingested %s (%s): %s filling; %v sorting; %v / %v flushing; %v sending; %v splitting; %d; %v scattering, %d, %v; %v commit-wait",
b.name,
sz(b.sink.totalRows.DataSize),
sorted(b.sorted),
timing(b.flushCounts.totalFilling),
timing(b.flushCounts.totalSort),
timing(b.flushCounts.totalFlush),
timing(b.sink.flushCounts.flushWait),
timing(b.sink.flushCounts.sendWait),
timing(b.sink.flushCounts.splitWait),
b.sink.flushCounts.splits,
timing(b.sink.flushCounts.scatterWait),
b.sink.flushCounts.scatters,
b.sink.flushCounts.scatterMoved,
timing(b.sink.flushCounts.commitWait),
)
log.VEventf(ctx, 2, "%s adder closing; flushed into %s %d times, %d due to buffer size (%s); flushing chunked into %d files (%d for ranges, %d for sst size, +%d after split-retries)",
b.name,
b.flushCounts.span,
b.flushCounts.total,
b.flushCounts.bufferSize,
sz(b.memAcc.Used()),
b.sink.flushCounts.total,
b.sink.flushCounts.dueToRange,
b.sink.flushCounts.dueToSize,
b.sink.flushCounts.files-b.sink.flushCounts.total,
)
} else {
log.VEventf(ctx, 3, "%s adder closing; ingested nothing", b.name)
if log.V(1) {
if b.sink.stats.bufferFlushes > 0 {
b.sink.stats.LogTimings(ctx, b.name, "closing")
b.sink.stats.LogFlushes(ctx, b.name, "closing", sz(b.memAcc.Used()))
} else {
log.Infof(ctx, "%s adder closing; ingested nothing", b.name)
}
}
b.sink.Close()

@@ -196,7 +159,7 @@ func (b *BufferingAdder) Add(ctx context.Context, key roachpb.Key, value []byte)
return b.curBuf.append(key, value)
}

b.flushCounts.bufferSize++
b.sink.stats.flushesDueToSize++
log.VEventf(ctx, 3, "%s adder triggering flush of %s of KVs in %s buffer",
b.name, b.curBuf.KVSize(), b.bufferedMemSize())

@@ -246,7 +209,7 @@ func (b *BufferingAdder) Flush(ctx context.Context) error {
}

func (b *BufferingAdder) doFlush(ctx context.Context, forSize bool) error {
b.flushCounts.totalFilling += timeutil.Since(b.lastFlush)
b.sink.stats.fillWait += timeutil.Since(b.lastFlush)

if b.bufferedKeys() == 0 {
if b.onFlush != nil {
@@ -258,9 +221,9 @@ func (b *BufferingAdder) doFlush(ctx context.Context, forSize bool) error {
if err := b.sink.Reset(ctx); err != nil {
return err
}
b.flushCounts.total++
b.sink.stats.bufferFlushes++

before := b.sink.flushCounts
before := b.sink.stats
beforeSize := b.sink.totalRows.DataSize

beforeSort := timeutil.Now()
@@ -271,7 +234,7 @@ func (b *BufferingAdder) doFlush(ctx context.Context, forSize bool) error {
mvccKey := storage.MVCCKey{Timestamp: b.timestamp}

beforeFlush := timeutil.Now()
b.flushCounts.totalSort += beforeFlush.Sub(beforeSort)
b.sink.stats.sortWait += beforeFlush.Sub(beforeSort)

// If this is the first flush, it is due to size, and the data was unsorted,
// then create initial splits (if requested) before flushing.
@@ -286,8 +249,8 @@ func (b *BufferingAdder) doFlush(ctx context.Context, forSize bool) error {
}

if log.V(1) {
if len(b.flushCounts.span.Key) == 0 || b.curBuf.Key(0).Compare(b.flushCounts.span.Key) < 0 {
b.flushCounts.span.Key = b.curBuf.Key(0).Clone()
if len(b.sink.stats.span.Key) == 0 || b.curBuf.Key(0).Compare(b.sink.stats.span.Key) < 0 {
b.sink.stats.span.Key = b.curBuf.Key(0).Clone()
}
}

@@ -302,18 +265,18 @@ func (b *BufferingAdder) doFlush(ctx context.Context, forSize bool) error {
}

if log.V(1) {
if b.flushCounts.span.EndKey.Compare(mvccKey.Key) < 0 {
b.flushCounts.span.EndKey = mvccKey.Key.Clone()
if b.sink.stats.span.EndKey.Compare(mvccKey.Key) < 0 {
b.sink.stats.span.EndKey = mvccKey.Key.Clone()
}
}

b.flushCounts.totalFlush += timeutil.Since(beforeFlush)
b.sink.stats.flushWait += timeutil.Since(beforeFlush)

if log.V(3) {
written := b.sink.totalRows.DataSize - beforeSize
files := b.sink.flushCounts.total - before.total
dueToSplits := b.sink.flushCounts.dueToRange - before.dueToRange
dueToSize := b.sink.flushCounts.dueToSize - before.dueToSize
files := b.sink.stats.batches - before.batches
dueToSplits := b.sink.stats.batchesDueToRange - before.batchesDueToRange
dueToSize := b.sink.stats.batchesDueToSize - before.batchesDueToSize

log.Infof(ctx,
"%s adder flushing %s (%s buffered/%0.2gx) wrote %d SSTs (avg: %s) with %d for splits, %d for size, took %v",
@@ -329,39 +292,12 @@ func (b *BufferingAdder) doFlush(ctx context.Context, forSize bool) error {
)
}

if log.V(4) {
log.Infof(ctx,
"%s adder has ingested %s (%s): %s filling; %v sorting; %v / %v flushing; %v sending; %v splitting; %d; %v scattering, %d, %v; %v commit-wait",
b.name,
sz(b.sink.totalRows.DataSize),
sorted(b.sorted),
timing(b.flushCounts.totalFilling),
timing(b.flushCounts.totalSort),
timing(b.flushCounts.totalFlush),
timing(b.sink.flushCounts.flushWait),
timing(b.sink.flushCounts.sendWait),
timing(b.sink.flushCounts.splitWait),
b.sink.flushCounts.splits,
timing(b.sink.flushCounts.scatterWait),
b.sink.flushCounts.scatters,
b.sink.flushCounts.scatterMoved,
timing(b.sink.flushCounts.commitWait),
)
if log.V(2) {
b.sink.stats.LogTimings(ctx, b.name, "flushed")
}

if log.V(5) {
log.Infof(ctx,
"%s adder has flushed into %s %d times, %d due to buffer size (%s), chunked as %d files (%d for ranges, %d for sst size, +%d for split-retries)",
b.name,
b.flushCounts.span,
b.flushCounts.total,
b.flushCounts.bufferSize,
sz(b.memAcc.Used()),
b.sink.flushCounts.total,
b.sink.flushCounts.dueToRange,
b.sink.flushCounts.dueToSize,
b.sink.flushCounts.files-b.sink.flushCounts.total,
)
if log.V(3) {
b.sink.stats.LogFlushes(ctx, b.name, "flushed", sz(b.memAcc.Used()))
}

if b.onFlush != nil {
@@ -436,27 +372,27 @@ func (b *BufferingAdder) createInitialSplits(ctx context.Context) error {
splitsWait := beforeScatters.Sub(beforeSplits)
log.Infof(ctx, "%s adder created %d initial splits in %v from %d keys in %s buffer",
b.name, len(toScatter), timing(splitsWait), b.curBuf.Len(), b.curBuf.MemSize())
b.sink.flushCounts.splits += len(toScatter)
b.sink.flushCounts.splitWait += splitsWait
b.sink.stats.splits += len(toScatter)
b.sink.stats.splitWait += splitsWait

for _, splitKey := range toScatter {
resp, err := b.sink.db.AdminScatter(ctx, splitKey, 0 /* maxSize */)
if err != nil {
log.Warningf(ctx, "failed to scatter: %v", err)
continue
}
b.sink.flushCounts.scatters++
b.sink.stats.scatters++
if resp.MVCCStats != nil {
moved := sz(resp.MVCCStats.Total())
b.sink.flushCounts.scatterMoved += moved
b.sink.stats.scatterMoved += moved
if moved > 0 {
log.VEventf(ctx, 1, "pre-split scattered %s in non-empty range %s",
moved, resp.RangeInfos[0].Desc.KeySpan().AsRawSpanWithNoLocals())
}
}
}
scattersWait := timeutil.Since(beforeScatters)
b.sink.flushCounts.scatterWait += scattersWait
b.sink.stats.scatterWait += scattersWait
log.Infof(ctx, "%s adder scattered %d initial split spans in %v",
b.name, len(toScatter), timing(scattersWait))

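Note: the delta computation in doFlush above works because the stats live in a plain struct: `before := b.sink.stats` copies every counter at once, and per-flush deltas are simple subtractions. (The struct also carries a roachpb.Span whose byte slices are shared by such a copy, which is harmless here since only numeric fields are diffed.) A self-contained sketch of the pattern, with field names assumed to match the diff:

```go
package main

import "fmt"

// batchStats mirrors the numeric-counter shape of the new stats struct
// (field names assumed to match the diff above).
type batchStats struct {
	batches           int
	batchesDueToRange int
	batchesDueToSize  int
}

func main() {
	var s batchStats
	before := s // plain struct copy: snapshots every counter at once.

	// ... sorting, flushing, and sending would bump the counters here ...
	s.batches += 3
	s.batchesDueToRange += 2
	s.batchesDueToSize++

	// Per-flush deltas fall out of simple subtraction against the snapshot.
	fmt.Printf("files=%d dueToSplits=%d dueToSize=%d\n",
		s.batches-before.batches,
		s.batchesDueToRange-before.batchesDueToRange,
		s.batchesDueToSize-before.batchesDueToSize)
}
```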
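Note: the diffs for the remaining two changed files (pkg/kv/bulk/sst_batcher.go and the new pkg/kv/bulk/stats.go) are not expanded above. From the fields and call sites referenced throughout buffering_adder.go, the new struct plausibly looks like the following sketch; the name, field set, and log formats are inferred, not the verbatim commit contents:

```go
package bulk

import (
	"context"
	"time"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/util/log"
)

// ingestionPerformanceStats (name assumed) accumulates the performance and
// debug counters previously split across the adder's and the sink's
// flushCounts structs.
type ingestionPerformanceStats struct {
	bufferFlushes     int // buffer flushes.
	flushesDueToSize  int // buffer flushes triggered by buffer size.
	batches           int // batches (SSTs) started by the sink.
	batchesDueToRange int // batches ended due to range bounds.
	batchesDueToSize  int // batches ended due to SST size.
	files             int // SST files sent; retries can make this exceed batches.
	splits, scatters  int // splits and scatters sent.
	scatterMoved      sz  // total size moved by scatters.

	fillWait    time.Duration // time spent filling the buffer.
	sortWait    time.Duration // time spent sorting the buffer.
	flushWait   time.Duration // time spent flushing the buffer.
	sendWait    time.Duration // time spent sending batches.
	splitWait   time.Duration // time spent splitting.
	scatterWait time.Duration // time spent scattering.
	commitWait  time.Duration // time spent in commit-wait.

	// span tracks the total span into which this batcher has flushed. It is
	// only maintained if log.V(1), so if vmodule is upped mid-ingest it may
	// be incomplete.
	span roachpb.Span
}

// LogTimings logs a summary of where time was spent during ingestion.
func (s ingestionPerformanceStats) LogTimings(ctx context.Context, name, action string) {
	log.Infof(ctx,
		"%s adder %s; %s filling; %s sorting; %s flushing; %s sending; %s splitting; %d; %s scattering, %d, %s; %s commit-wait",
		name, action,
		timing(s.fillWait), timing(s.sortWait), timing(s.flushWait),
		timing(s.sendWait), timing(s.splitWait), s.splits,
		timing(s.scatterWait), s.scatters, s.scatterMoved,
		timing(s.commitWait),
	)
}

// LogFlushes logs a summary of the flush and batch counts.
func (s ingestionPerformanceStats) LogFlushes(ctx context.Context, name, action string, bufSize sz) {
	log.Infof(ctx,
		"%s adder %s; flushed into %s %d times, %d due to buffer size (%s); chunked as %d files (%d for ranges, %d for sst size, +%d after split-retries)",
		name, action, s.span,
		s.bufferFlushes, s.flushesDueToSize, bufSize,
		s.batches, s.batchesDueToRange, s.batchesDueToSize, s.files-s.batches,
	)
}
```

Centralizing the counters in the sink's single stats struct is what lets Close() and doFlush() above delegate their logging to LogTimings/LogFlushes instead of repeating the long format strings inline.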
