68218: importccl: persist bulk op summary during execution r=pbardea a=pbardea

Fixes cockroachdb#67987.

Release note (bug fix): Fix a bug where the summary displayed after an
IMPORT command would sometimes be inaccurate due to retries.

Release justification: Bug fix. Fix a bug where the summary displayed after an
IMPORT command would sometimes be inaccurate due to retries.
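
A minimal, self-contained sketch of the mechanism this commit adds (visible in
the pkg/ccl/importccl/import_processor.go diff below): each adder's flush
callback accumulates the flushed summary under a mutex, and the progress loop
drains the accumulator (add, then reset) into every checkpointed update, so the
running totals survive a retry of the flow. The opSummary type here is a
stand-in for roachpb.BulkOpSummary, not the real message.

```go
package main

import (
	"fmt"
	"sync"
)

// opSummary is a stand-in for roachpb.BulkOpSummary.
type opSummary struct{ Rows, DataSize int64 }

func (s *opSummary) Add(o opSummary) { s.Rows += o.Rows; s.DataSize += o.DataSize }
func (s *opSummary) Reset()          { *s = opSummary{} }

func main() {
	// Everything flushed since the last progress update accumulates here.
	var mu struct {
		sync.Mutex
		summary opSummary
	}

	// The adder's OnFlush callback now receives the summary of each flush.
	onFlush := func(flushed opSummary) {
		mu.Lock()
		mu.summary.Add(flushed)
		mu.Unlock()
	}

	onFlush(opSummary{Rows: 10, DataSize: 1 << 20})
	onFlush(opSummary{Rows: 5, DataSize: 1 << 19})

	// The progress loop drains the accumulator into the update it is about
	// to persist; the reset makes each persisted update carry a delta.
	mu.Lock()
	delta := mu.summary
	mu.summary.Reset()
	mu.Unlock()

	fmt.Printf("progress delta: %+v\n", delta)
}
```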

69491: cli: add --{from,to} to `tsdump` command r=knz a=tbg

This allows restricting which range of datapoints is pulled by
`./cockroach debug tsdump` via the `--from` and `--to` flags.

This commit also touches up the `tsdump` command a little bit.

Release justification: low-risk observability change
Release note (cli change): The `debug tsdump` command now accepts
`--from` and `--to` flags that limit the date range for which timeseries
datapoints are exported.
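
For example, a hypothetical invocation (the timestamp syntax shown is an
assumption, not taken from this commit; use whatever format the flag parser
accepts):

```sh
# Export one day's worth of datapoints as CSV.
./cockroach debug tsdump --format=csv \
  --from='2021-08-29 00:00:00' --to='2021-08-30 00:00:00' > tsdump.csv
```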


Co-authored-by: Paul Bardea <[email protected]>
Co-authored-by: Tobias Grieger <[email protected]>
3 people committed Aug 30, 2021
3 parents 7057d2f + ce65d43 + ddfa4db commit b78f3fc
Showing 15 changed files with 757 additions and 581 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/importccl/BUILD.bazel
@@ -78,6 +78,7 @@ go_library(
"//pkg/util/log/eventpb",
"//pkg/util/protoutil",
"//pkg/util/retry",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"//pkg/util/timeutil/pgdate",
"//pkg/util/tracing",
21 changes: 19 additions & 2 deletions pkg/ccl/importccl/import_processor.go
@@ -32,6 +32,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
@@ -369,22 +370,33 @@ func ingestKvs(
pkFlushedRow := make([]int64, len(spec.Uri))
idxFlushedRow := make([]int64, len(spec.Uri))

+bulkSummaryMu := &struct {
+syncutil.Mutex
+summary roachpb.BulkOpSummary
+}{}
+
// When the PK adder flushes, everything written has been flushed, so we set
// pkFlushedRow to writtenRow. Additionally if the indexAdder is empty then we
// can treat it as flushed as well (in case we're not adding anything to it).
-pkIndexAdder.SetOnFlush(func() {
+pkIndexAdder.SetOnFlush(func(summary roachpb.BulkOpSummary) {
for i, emitted := range writtenRow {
atomic.StoreInt64(&pkFlushedRow[i], emitted)
+bulkSummaryMu.Lock()
+bulkSummaryMu.summary.Add(summary)
+bulkSummaryMu.Unlock()
}
if indexAdder.IsEmpty() {
for i, emitted := range writtenRow {
atomic.StoreInt64(&idxFlushedRow[i], emitted)
}
}
})
-indexAdder.SetOnFlush(func() {
+indexAdder.SetOnFlush(func(summary roachpb.BulkOpSummary) {
for i, emitted := range writtenRow {
atomic.StoreInt64(&idxFlushedRow[i], emitted)
+bulkSummaryMu.Lock()
+bulkSummaryMu.summary.Add(summary)
+bulkSummaryMu.Unlock()
}
})

@@ -411,6 +423,11 @@
prog.ResumePos[file] = idx
}
prog.CompletedFraction[file] = math.Float32frombits(atomic.LoadUint32(&writtenFraction[offset]))
+// Write down the summary of how much we've ingested since the last update.
+bulkSummaryMu.Lock()
+prog.BulkSummary = bulkSummaryMu.summary
+bulkSummaryMu.summary.Reset()
+bulkSummaryMu.Unlock()
}
progCh <- prog
}
14 changes: 7 additions & 7 deletions pkg/ccl/importccl/import_processor_test.go
@@ -192,7 +192,7 @@ func (r *errorReportingRowReceiver) ProducerDone() {}
// A do nothing bulk adder implementation.
type doNothingKeyAdder struct {
onKeyAdd func(key roachpb.Key)
-onFlush func()
+onFlush func(summary roachpb.BulkOpSummary)
}

var _ kvserverbase.BulkAdder = &doNothingKeyAdder{}
@@ -205,16 +205,16 @@ func (a *doNothingKeyAdder) Add(_ context.Context, k roachpb.Key, _ []byte) erro
}
func (a *doNothingKeyAdder) Flush(_ context.Context) error {
if a.onFlush != nil {
-a.onFlush()
+a.onFlush(roachpb.BulkOpSummary{})
}
return nil
}

-func (*doNothingKeyAdder) IsEmpty() bool { return true }
-func (*doNothingKeyAdder) CurrentBufferFill() float32 { return 0 }
-func (*doNothingKeyAdder) GetSummary() roachpb.BulkOpSummary { return roachpb.BulkOpSummary{} }
-func (*doNothingKeyAdder) Close(_ context.Context) {}
-func (a *doNothingKeyAdder) SetOnFlush(f func()) { a.onFlush = f }
+func (*doNothingKeyAdder) IsEmpty() bool { return true }
+func (*doNothingKeyAdder) CurrentBufferFill() float32 { return 0 }
+func (*doNothingKeyAdder) GetSummary() roachpb.BulkOpSummary { return roachpb.BulkOpSummary{} }
+func (*doNothingKeyAdder) Close(_ context.Context) {}
+func (a *doNothingKeyAdder) SetOnFlush(f func(_ roachpb.BulkOpSummary)) { a.onFlush = f }

var eofOffset int64 = math.MaxInt64

14 changes: 13 additions & 1 deletion pkg/ccl/importccl/import_stmt.go
@@ -2152,9 +2152,21 @@ func ingestWithRetry(
}

if utilccl.IsPermanentBulkJobError(err) {
-return roachpb.BulkOpSummary{}, err
+return res, err
}

+// Re-load the job in order to update our progress object, which may have
+// been updated by the changeFrontier processor since the flow started.
+reloadedJob, reloadErr := execCtx.ExecCfg().JobRegistry.LoadJob(ctx, job.ID())
+if reloadErr != nil {
+if ctx.Err() != nil {
+return res, ctx.Err()
+}
+log.Warningf(ctx, `IMPORT job %d could not reload job progress when retrying: %+v`,
+int64(job.ID()), reloadErr)
+} else {
+job = reloadedJob
+}
log.Warningf(ctx, `encountered retryable error: %+v`, err)
}

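The ingestWithRetry change above has two parts: a transient failure no longer
discards the partial result (`return res, err` instead of a zeroed summary),
and the job record is re-loaded before each retry so the next attempt resumes
from progress the failed attempt already persisted. A hedged, generic sketch of
that reload-before-retry shape (stand-in types and functions, not the actual
jobs API):

```go
package main

import (
	"context"
	"errors"
	"log"
)

var errTransient = errors.New("transient ingestion failure")

// jobRecord stands in for the persisted job row, including its progress.
type jobRecord struct{ resumePos int64 }

// loadJob stands in for the registry's LoadJob: it re-reads persisted state.
func loadJob(ctx context.Context) (jobRecord, error) {
	return jobRecord{resumePos: 128}, nil
}

// ingest stands in for one attempt at running the distributed flow.
func ingest(ctx context.Context, job jobRecord) error { return errTransient }

func ingestWithRetry(ctx context.Context, attempts int) error {
	job, err := loadJob(ctx)
	if err != nil {
		return err
	}
	for i := 0; i < attempts; i++ {
		if err = ingest(ctx, job); err == nil {
			return nil
		}
		if !errors.Is(err, errTransient) {
			return err // permanent error: give up immediately
		}
		// Re-load the job so the next attempt sees progress persisted by
		// the previous one (mirrors the reloadedJob logic above).
		if reloaded, reloadErr := loadJob(ctx); reloadErr != nil {
			if ctx.Err() != nil {
				return ctx.Err()
			}
			log.Printf("could not reload job progress when retrying: %v", reloadErr)
		} else {
			job = reloaded
		}
		log.Printf("encountered retryable error: %v", err)
	}
	return err
}

func main() {
	_ = ingestWithRetry(context.Background(), 3)
}
```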
2 changes: 2 additions & 0 deletions pkg/cli/debug.go
@@ -1565,6 +1565,8 @@ func init() {

f = debugTimeSeriesDumpCmd.Flags()
f.Var(&debugTimeSeriesDumpOpts.format, "format", "output format (text, csv, tsv, raw)")
+f.Var(&debugTimeSeriesDumpOpts.from, "from", "oldest timestamp to include (inclusive)")
+f.Var(&debugTimeSeriesDumpOpts.to, "to", "newest timestamp to include (inclusive)")
}

func initPebbleCmds(cmd *cobra.Command) {
27 changes: 20 additions & 7 deletions pkg/cli/tsdump.go
@@ -11,6 +11,7 @@
package cli

import (
"bufio"
"context"
"encoding/csv"
"fmt"
@@ -29,21 +30,31 @@ import (
// TODO(knz): this struct belongs elsewhere.
// See: https://github.com/cockroachdb/cockroach/issues/49509
var debugTimeSeriesDumpOpts = struct {
-format tsDumpFormat
+format   tsDumpFormat
+from, to timestampValue
}{
format: tsDumpText,
+from: timestampValue{},
+to: timestampValue(timeutil.Now().Add(24 * time.Hour)),
}

var debugTimeSeriesDumpCmd = &cobra.Command{
Use: "tsdump",
Short: "dump all the raw timeseries values in a cluster",
Long: `
-Dumps all of the raw timeseries values in a cluster.
+Dumps all of the raw timeseries values in a cluster. Only the default resolution
+is retrieved, i.e. typically datapoints older than the value of the
+'timeseries.storage.resolution_10s.ttl' cluster setting will be absent from the
+output.
`,
RunE: clierrorplus.MaybeDecorateError(func(cmd *cobra.Command, args []string) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

+req := &tspb.DumpRequest{
+StartNanos: time.Time(debugTimeSeriesDumpOpts.from).UnixNano(),
+EndNanos: time.Time(debugTimeSeriesDumpOpts.to).UnixNano(),
+}
var w tsWriter
switch debugTimeSeriesDumpOpts.format {
case tsDumpRaw:
@@ -55,16 +66,18 @@ Dumps all of the raw timeseries values in a cluster.
defer finish()

tsClient := tspb.NewTimeSeriesClient(conn)
-stream, err := tsClient.DumpRaw(context.Background(), &tspb.DumpRequest{})
+stream, err := tsClient.DumpRaw(context.Background(), req)
if err != nil {
return err
}

-if err := ts.DumpRawTo(stream, os.Stdout); err != nil {
+// Buffer the writes to os.Stdout since we're going to
+// be writing potentially a lot of data to it.
+w := bufio.NewWriter(os.Stdout)
+if err := ts.DumpRawTo(stream, w); err != nil {
return err
}
-return os.Stdout.Sync()
-
+return w.Flush()
case tsDumpCSV:
w = csvTSWriter{w: csv.NewWriter(os.Stdout)}
case tsDumpTSV:
@@ -84,7 +97,7 @@ Dumps all of the raw timeseries values in a cluster.
defer finish()

tsClient := tspb.NewTimeSeriesClient(conn)
-stream, err := tsClient.Dump(context.Background(), &tspb.DumpRequest{})
+stream, err := tsClient.Dump(context.Background(), req)
if err != nil {
return err
}
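A side note on the raw path above: it now writes through a bufio.Writer wrapped
around os.Stdout and flushes once at the end, rather than issuing many small
unbuffered writes and calling os.Stdout.Sync(). A generic illustration of that
buffering pattern (not the tsdump code itself):

```go
package main

import (
	"bufio"
	"fmt"
	"os"
)

func main() {
	// Buffer stdout when emitting many small writes: bytes are batched in
	// memory and handed to the OS a buffer at a time instead of per line.
	w := bufio.NewWriter(os.Stdout)
	for i := 0; i < 3; i++ {
		fmt.Fprintf(w, "datapoint %d\n", i)
	}
	// Flush pushes any remaining buffered bytes to stdout.
	if err := w.Flush(); err != nil {
		panic(err)
	}
}
```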
