importccl: persist bulk op summary during execution
Release note (bug fix): Fix a bug where the summary displayed after an
IMPORT command would sometimes be inaccurate due to retries.

Release justification: Bug fix. Fix a bug where the summary displayed after an
IMPORT command would sometimes be inaccurate due to retries.
pbardea committed Aug 30, 2021
1 parent 30ec5a0 commit ce65d43
Showing 12 changed files with 722 additions and 573 deletions.
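
The release note above describes summaries going wrong across retries; the diff below addresses that by accumulating per-flush BulkOpSummary deltas, carrying them in processor progress updates, and persisting the running total in the job's ImportProgress so a resumed attempt starts from the last checkpointed counts rather than from zero. The following self-contained sketch illustrates that persist-and-reseed pattern with simplified stand-in types (opSummary, jobRecord, persistDelta are illustrative names, not CockroachDB APIs):

package main

import (
	"fmt"
	"sync"
)

// opSummary is a simplified stand-in for roachpb.BulkOpSummary.
type opSummary struct {
	Rows     int64
	DataSize int64
}

func (s *opSummary) Add(o opSummary) {
	s.Rows += o.Rows
	s.DataSize += o.DataSize
}

// jobRecord stands in for the persisted job progress record.
type jobRecord struct {
	mu      sync.Mutex
	summary opSummary // survives a retry, unlike per-attempt memory
}

// persistDelta folds a flushed delta into the stored summary, standing in
// for writing ImportProgress.Summary during a progress update.
func (j *jobRecord) persistDelta(d opSummary) {
	j.mu.Lock()
	defer j.mu.Unlock()
	j.summary.Add(d)
}

// snapshot returns the last persisted summary, standing in for
// getLastImportSummary in the diff.
func (j *jobRecord) snapshot() opSummary {
	j.mu.Lock()
	defer j.mu.Unlock()
	return j.summary
}

func main() {
	job := &jobRecord{}

	// First attempt: two flushes are checkpointed before the attempt is
	// interrupted and retried.
	job.persistDelta(opSummary{Rows: 500, DataSize: 1 << 20})
	job.persistDelta(opSummary{Rows: 500, DataSize: 1 << 20})

	// Retry: the resumed attempt seeds its accumulator from the persisted
	// summary instead of starting from zero, then keeps adding new deltas.
	acc := job.snapshot()
	acc.Add(opSummary{Rows: 250, DataSize: 512 << 10})

	fmt.Printf("rows=%d bytes=%d\n", acc.Rows, acc.DataSize) // prints: rows=1250 bytes=2621440
}
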
1 change: 1 addition & 0 deletions pkg/ccl/importccl/BUILD.bazel
@@ -78,6 +78,7 @@ go_library(
"//pkg/util/log/eventpb",
"//pkg/util/protoutil",
"//pkg/util/retry",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"//pkg/util/timeutil/pgdate",
"//pkg/util/tracing",
21 changes: 19 additions & 2 deletions pkg/ccl/importccl/import_processor.go
@@ -32,6 +32,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
@@ -369,22 +370,33 @@ func ingestKvs(
pkFlushedRow := make([]int64, len(spec.Uri))
idxFlushedRow := make([]int64, len(spec.Uri))

bulkSummaryMu := &struct {
syncutil.Mutex
summary roachpb.BulkOpSummary
}{}

// When the PK adder flushes, everything written has been flushed, so we set
// pkFlushedRow to writtenRow. Additionally if the indexAdder is empty then we
// can treat it as flushed as well (in case we're not adding anything to it).
pkIndexAdder.SetOnFlush(func() {
pkIndexAdder.SetOnFlush(func(summary roachpb.BulkOpSummary) {
for i, emitted := range writtenRow {
atomic.StoreInt64(&pkFlushedRow[i], emitted)
bulkSummaryMu.Lock()
bulkSummaryMu.summary.Add(summary)
bulkSummaryMu.Unlock()
}
if indexAdder.IsEmpty() {
for i, emitted := range writtenRow {
atomic.StoreInt64(&idxFlushedRow[i], emitted)
}
}
})
indexAdder.SetOnFlush(func() {
indexAdder.SetOnFlush(func(summary roachpb.BulkOpSummary) {
for i, emitted := range writtenRow {
atomic.StoreInt64(&idxFlushedRow[i], emitted)
bulkSummaryMu.Lock()
bulkSummaryMu.summary.Add(summary)
bulkSummaryMu.Unlock()
}
})

@@ -411,6 +423,11 @@ func ingestKvs(
prog.ResumePos[file] = idx
}
prog.CompletedFraction[file] = math.Float32frombits(atomic.LoadUint32(&writtenFraction[offset]))
// Write down the summary of how much we've ingested since the last update.
bulkSummaryMu.Lock()
prog.BulkSummary = bulkSummaryMu.summary
bulkSummaryMu.summary.Reset()
bulkSummaryMu.Unlock()
}
progCh <- prog
}
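
In the ingestKvs hunk above, each adder's flush callback folds its BulkOpSummary into the mutex-guarded bulkSummaryMu, and every progress push moves the accumulated value into prog.BulkSummary and resets it, so each progress message carries only the delta since the previous push. A minimal, self-contained sketch of that drain-and-reset pattern (stand-in types rather than the importccl code):

package main

import (
	"fmt"
	"sync"
)

type opSummary struct{ Rows int64 }

func (s *opSummary) Add(o opSummary) { s.Rows += o.Rows }
func (s *opSummary) Reset()          { *s = opSummary{} }

func main() {
	// Mutex-guarded accumulator, mirroring bulkSummaryMu in the hunk above.
	var mu struct {
		sync.Mutex
		summary opSummary
	}

	// onFlush is the callback a bulk adder would invoke with the summary of
	// the batch it just flushed.
	onFlush := func(delta opSummary) {
		mu.Lock()
		mu.summary.Add(delta)
		mu.Unlock()
	}

	// Simulate a few flushes.
	onFlush(opSummary{Rows: 100})
	onFlush(opSummary{Rows: 250})

	// A progress push drains the accumulator: the message carries only the
	// delta since the previous push, and the accumulator starts over.
	pushProgress := func() opSummary {
		mu.Lock()
		defer mu.Unlock()
		delta := mu.summary
		mu.summary.Reset()
		return delta
	}

	fmt.Println(pushProgress().Rows) // 350
	fmt.Println(pushProgress().Rows) // 0 until the next flush
}

Resetting on every push is what gives prog.BulkSummary its delta semantics; the running total is kept on the coordinator side in the job progress.
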
14 changes: 7 additions & 7 deletions pkg/ccl/importccl/import_processor_test.go
@@ -192,7 +192,7 @@ func (r *errorReportingRowReceiver) ProducerDone() {}
// A do nothing bulk adder implementation.
type doNothingKeyAdder struct {
onKeyAdd func(key roachpb.Key)
onFlush func()
onFlush func(summary roachpb.BulkOpSummary)
}

var _ kvserverbase.BulkAdder = &doNothingKeyAdder{}
@@ -205,16 +205,16 @@ func (a *doNothingKeyAdder) Add(_ context.Context, k roachpb.Key, _ []byte) erro
}
func (a *doNothingKeyAdder) Flush(_ context.Context) error {
if a.onFlush != nil {
a.onFlush()
a.onFlush(roachpb.BulkOpSummary{})
}
return nil
}

func (*doNothingKeyAdder) IsEmpty() bool { return true }
func (*doNothingKeyAdder) CurrentBufferFill() float32 { return 0 }
func (*doNothingKeyAdder) GetSummary() roachpb.BulkOpSummary { return roachpb.BulkOpSummary{} }
func (*doNothingKeyAdder) Close(_ context.Context) {}
func (a *doNothingKeyAdder) SetOnFlush(f func()) { a.onFlush = f }
func (*doNothingKeyAdder) IsEmpty() bool { return true }
func (*doNothingKeyAdder) CurrentBufferFill() float32 { return 0 }
func (*doNothingKeyAdder) GetSummary() roachpb.BulkOpSummary { return roachpb.BulkOpSummary{} }
func (*doNothingKeyAdder) Close(_ context.Context) {}
func (a *doNothingKeyAdder) SetOnFlush(f func(_ roachpb.BulkOpSummary)) { a.onFlush = f }

var eofOffset int64 = math.MaxInt64

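
The test double above only changes shape: Flush now hands an empty BulkOpSummary to the registered callback. A sketch of how such a fake can be used to exercise callback wiring in isolation (illustrative types, not the repository's test helpers):

package main

import "fmt"

type opSummary struct{ Rows int64 }

// fakeAdder mirrors the shape of the test double above: it records the
// OnFlush callback and invokes it with a canned summary on Flush.
type fakeAdder struct {
	onFlush  func(opSummary)
	perFlush opSummary
}

func (a *fakeAdder) SetOnFlush(f func(opSummary)) { a.onFlush = f }

func (a *fakeAdder) Flush() {
	if a.onFlush != nil {
		a.onFlush(a.perFlush)
	}
}

func main() {
	var total opSummary
	a := &fakeAdder{perFlush: opSummary{Rows: 10}}
	a.SetOnFlush(func(s opSummary) { total.Rows += s.Rows })
	a.Flush()
	a.Flush()
	fmt.Println(total.Rows) // 20
}
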
890 changes: 469 additions & 421 deletions pkg/jobs/jobspb/jobs.pb.go

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions pkg/jobs/jobspb/jobs.proto
@@ -14,6 +14,7 @@ option go_package = "jobspb";

import "errorspb/errors.proto";
import "gogoproto/gogo.proto";
import "roachpb/api.proto";
import "roachpb/data.proto";
import "roachpb/io-formats.proto";
import "sql/catalog/descpb/structured.proto";
@@ -360,6 +361,8 @@ message ImportProgress {
// Holds metadata related to sequences for every file processed during an
// IMPORT.
repeated SequenceDetails sequence_details = 6;

roachpb.BulkOpSummary summary = 7 [(gogoproto.nullable) = false];
}

// TypeSchemaChangeDetails is the job detail information for a type schema change job.
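
Marking the new field (gogoproto.nullable) = false makes the generated Go field a value rather than a pointer, so ImportProgress always carries a (possibly zero) summary and the resume path needs no nil check. A stand-in illustration of that difference (not the generated jobspb code):

package main

import "fmt"

type bulkOpSummary struct{ Rows int64 }

// With nullable=false the generated field is a value: always present.
type importProgressValue struct {
	Summary bulkOpSummary
}

// With the default (nullable) behavior it would be a pointer and could be nil.
type importProgressPointer struct {
	Summary *bulkOpSummary
}

func main() {
	var p importProgressValue
	p.Summary.Rows += 100 // safe on a zero value; no nil check needed
	fmt.Println(p.Summary.Rows)

	var q importProgressPointer
	if q.Summary == nil { // the pointer form forces a presence check
		q.Summary = &bulkOpSummary{}
	}
	q.Summary.Rows += 100
	fmt.Println(q.Summary.Rows)
}
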
8 changes: 4 additions & 4 deletions pkg/kv/bulk/buffering_adder.go
@@ -60,7 +60,7 @@ type BufferingAdder struct {
bulkMon *mon.BytesMonitor
memAcc mon.BoundAccount

onFlush func()
onFlush func(summary roachpb.BulkOpSummary)
}

var _ kvserverbase.BulkAdder = &BufferingAdder{}
@@ -140,7 +140,7 @@ func MakeBulkAdder(
}

// SetOnFlush sets a callback to run after the buffering adder flushes.
func (b *BufferingAdder) SetOnFlush(fn func()) {
func (b *BufferingAdder) SetOnFlush(fn func(summary roachpb.BulkOpSummary)) {
b.onFlush = fn
}

@@ -208,7 +208,7 @@ func (b *BufferingAdder) IsEmpty() bool {
func (b *BufferingAdder) Flush(ctx context.Context) error {
if b.curBuf.Len() == 0 {
if b.onFlush != nil {
b.onFlush()
b.onFlush(b.sink.GetBatchSummary())
}
return nil
}
@@ -266,7 +266,7 @@ func (b *BufferingAdder) Flush(ctx context.Context) error {
)
}
if b.onFlush != nil {
b.onFlush()
b.onFlush(b.sink.GetBatchSummary())
}
b.curBuf.Reset()
return nil
7 changes: 6 additions & 1 deletion pkg/kv/bulk/sst_batcher.go
@@ -353,8 +353,8 @@ func (b *SSTBatcher) doFlush(ctx context.Context, reason int, nextKey roachpb.Ke
}
}

b.rowCounter.DataSize += b.sstWriter.DataSize
b.totalRows.Add(b.rowCounter.BulkOpSummary)
b.totalRows.DataSize += b.sstWriter.DataSize
return nil
}

@@ -363,6 +363,11 @@ func (b *SSTBatcher) Close() {
b.sstWriter.Close()
}

// GetBatchSummary returns the rows/bytes/etc. added since the batcher was last reset.
func (b *SSTBatcher) GetBatchSummary() roachpb.BulkOpSummary {
return b.rowCounter.BulkOpSummary
}

// GetSummary returns this batcher's total added rows/bytes/etc.
func (b *SSTBatcher) GetSummary() roachpb.BulkOpSummary {
return b.totalRows
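
With this hunk the batcher exposes two views: GetSummary remains the running total for the whole operation, while GetBatchSummary reports only what the current batch has counted, which is what the flush callback in buffering_adder.go forwards. A simplified model of that split (not the SSTBatcher implementation):

package main

import "fmt"

type opSummary struct{ Rows, DataSize int64 }

func (s *opSummary) Add(o opSummary) { s.Rows += o.Rows; s.DataSize += o.DataSize }
func (s *opSummary) Reset()          { *s = opSummary{} }

// batcher keeps two counters: one for the batch being built (handed to the
// flush callback) and one running total for the whole operation.
type batcher struct {
	batch, total opSummary
	onFlush      func(opSummary)
}

func (b *batcher) Add(rows, bytes int64) {
	b.batch.Add(opSummary{Rows: rows, DataSize: bytes})
}

func (b *batcher) Flush() {
	b.total.Add(b.batch)
	if b.onFlush != nil {
		b.onFlush(b.batch) // per-batch summary, mirroring GetBatchSummary
	}
	b.batch.Reset()
}

func main() {
	var flushed []int64
	b := &batcher{onFlush: func(s opSummary) { flushed = append(flushed, s.Rows) }}
	b.Add(10, 1<<10)
	b.Flush()
	b.Add(5, 512)
	b.Flush()
	fmt.Println(flushed, b.total.Rows) // [10 5] 15
}
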
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/kvserverbase/bulk_adder.go
@@ -89,7 +89,7 @@ type BulkAdder interface {
// Close closes the underlying buffers/writers.
Close(ctx context.Context)
// SetOnFlush sets a callback function called after flushing the buffer.
SetOnFlush(func())
SetOnFlush(func(summary roachpb.BulkOpSummary))
}

// DuplicateKeyError represents a failed attempt to ingest the same key twice
27 changes: 27 additions & 0 deletions pkg/sql/distsql_plan_csv.go
@@ -29,6 +29,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/logtags"
)
@@ -103,6 +104,12 @@ func (c *callbackResultWriter) Err() error {
return c.err
}

func getLastImportSummary(job *jobs.Job) roachpb.BulkOpSummary {
progress := job.Progress()
importProgress := progress.GetImport()
return importProgress.Summary
}

func makeImportReaderSpecs(
job *jobs.Job,
tables map[string]*execinfrapb.ReadImportDataSpec_ImportTable,
@@ -203,6 +210,17 @@ func DistIngest(
return roachpb.BulkOpSummary{}, err
}

// accumulatedBulkSummary accumulates the BulkOpSummary returned from each
// processor in their progress updates. It stores stats about the amount of
// data written since the last time we update the job progress.
accumulatedBulkSummary := struct {
syncutil.Mutex
roachpb.BulkOpSummary
}{}
accumulatedBulkSummary.Lock()
accumulatedBulkSummary.BulkOpSummary = getLastImportSummary(job)
accumulatedBulkSummary.Unlock()

inputSpecs := makeImportReaderSpecs(job, tables, from, format, nodes, walltime, execCtx.User())

p := planCtx.NewPhysicalPlan()
@@ -263,6 +281,11 @@ func DistIngest(
prog.ReadProgress[i] = fileProgress
overall += fileProgress
}

accumulatedBulkSummary.Lock()
prog.Summary.Add(accumulatedBulkSummary.BulkOpSummary)
accumulatedBulkSummary.Reset()
accumulatedBulkSummary.Unlock()
return overall / float32(len(from))
},
)
@@ -277,6 +300,10 @@ func DistIngest(
atomic.StoreUint32(&fractionProgress[i], math.Float32bits(v))
}

accumulatedBulkSummary.Lock()
accumulatedBulkSummary.Add(meta.BulkProcessorProgress.BulkSummary)
accumulatedBulkSummary.Unlock()

if alwaysFlushProgress {
return updateJobProgress()
}
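
On the coordinator side, DistIngest seeds accumulatedBulkSummary from the summary already persisted on the job, folds in each processor's BulkProcessorProgress.BulkSummary as metadata arrives, and adds the accumulated delta into prog.Summary (resetting the accumulator) whenever job progress is written. The concurrent accumulate-and-fold part of that pattern, as a self-contained sketch with stand-in types:

package main

import (
	"fmt"
	"sync"
)

type opSummary struct{ Rows int64 }

func (s *opSummary) Add(o opSummary) { s.Rows += o.Rows }
func (s *opSummary) Reset()          { *s = opSummary{} }

func main() {
	// Deltas reported by several processors, folded into one accumulator,
	// mirroring accumulatedBulkSummary in the hunk above.
	var acc struct {
		sync.Mutex
		opSummary
	}

	var persisted opSummary // stands in for the job's stored ImportProgress.Summary

	var wg sync.WaitGroup
	for p := 0; p < 3; p++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < 4; i++ {
				acc.Lock()
				acc.Add(opSummary{Rows: 25}) // one BulkProcessorProgress.BulkSummary delta
				acc.Unlock()
			}
		}()
	}
	wg.Wait()

	// A job progress write folds the accumulated delta into the persisted
	// summary and clears the accumulator for the next interval.
	acc.Lock()
	persisted.Add(acc.opSummary)
	acc.Reset()
	acc.Unlock()

	fmt.Println(persisted.Rows) // 300
}
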