Skip to content

Commit

Permalink
bulk,backupccl: process and persist aggregator stats
Browse files Browse the repository at this point in the history
This commit teaches the coordinator of the backup job
to listen for `TracingAggregatorEvents` metas
from nodes that are executing in the DistSQL flow.
Each `TracingAggregatorEvent` can be identified by its
tag. The received metas are categorized by node and further
categorized by tag so that we have an up-to-date in-memory
representation of the latest `TracingAggregatorEvent` of each
tag on each node.

Periodically, this in-memory state is flushed to the `system.job_info`
table in both machine-readable and human-readable file formats:

- A file per node, for each aggregated TracingAggregatorEvent. These files
  contain the machine-readable proto bytes of the TracingAggregatorEvent.

- A text file that contains a cluster-wide and per-node summary of each
  TracingAggregatorEvent in its human-readable format.

Example:
```
-- SQL Instance ID: 1; Flow ID: 831caaf5-75cd-4e00-9e11-9a7469727eb5

- ExportStats
num_files: 443
data_size: 1639.29 MB
throughput: 54.63 MB/s

-- Cluster-wide

-- ExportStats
num_files: 443
data_size: 1639.29 MB
throughput: 54.63 MB/s

```

These files can be viewed and downloaded in the Advanced Debugging
tab of the job details page. The files will help understand the execution
state of the job at different points in time.

Some future work items that will build off this infrastructure are:
- Annotating the job's DistSQL diagram with per-processor stats.
- Displaying relevant stats in the job details page.
- Teaching restore, import and C2C jobs to also start persisting aggregator
  stats for improved observability.

We are not equipped to handle special characters
in the path of a status/admin server URL. To bypass
this problem in the face of filenames with special
characters we move the filename from the path component
of the URL to a query parameter.

Informs: cockroachdb#100126
Release note: None
  • Loading branch information
adityamaru committed Aug 8, 2023
1 parent 61d26de commit f286f1e
Show file tree
Hide file tree
Showing 21 changed files with 337 additions and 63 deletions.
2 changes: 1 addition & 1 deletion docs/generated/http/full.md
Original file line number Diff line number Diff line change
Expand Up @@ -5198,7 +5198,7 @@ Support status: [reserved](#support-status)

## GetJobProfilerExecutionDetails

`GET /_status/job_profiler_execution_details/{job_id}/{filename}`
`GET /_status/job_profiler_execution_details/{job_id}`



Expand Down
24 changes: 22 additions & 2 deletions pkg/ccl/backupccl/backup_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/stats"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/bulk"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
Expand Down Expand Up @@ -307,20 +308,39 @@ func backup(
}
return nil
}
tracingAggCh := make(chan *execinfrapb.TracingAggregatorEvents)
tracingAggLoop := func(ctx context.Context) error {
if err := bulk.AggregateTracingStats(ctx, job.ID(),
execCtx.ExecCfg().Settings, execCtx.ExecCfg().InternalDB, tracingAggCh); err != nil {
log.Warningf(ctx, "failed to aggregate tracing stats: %v", err)
// Drain the channel if the loop to aggregate tracing stats has returned
// an error.
for range tracingAggCh {
}
}
return nil
}

resumerSpan.RecordStructured(&types.StringValue{Value: "starting DistSQL backup execution"})
runBackup := func(ctx context.Context) error {
return distBackup(
ctx,
execCtx,
planCtx,
dsp,
progCh,
tracingAggCh,
backupSpecs,
)
}

if err := ctxgroup.GoAndWait(ctx, jobProgressLoop, checkpointLoop, storePerNodeProgressLoop, runBackup); err != nil {
if err := ctxgroup.GoAndWait(
ctx,
jobProgressLoop,
checkpointLoop,
storePerNodeProgressLoop,
tracingAggLoop,
runBackup,
); err != nil {
return roachpb.RowCount{}, 0, errors.Wrapf(err, "exporting %d ranges", errors.Safe(numTotalSpans))
}

Expand Down
24 changes: 5 additions & 19 deletions pkg/ccl/backupccl/backup_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,6 @@ var (
"send each export request with a verbose tracing span",
util.ConstantWithMetamorphicTestBool("export_request_verbose_tracing", false),
)

flushTracingAggregatorFrequency = settings.RegisterDurationSetting(
settings.TenantWritable,
"bulkio.backup.tracing_aggregator_flush_after",
"frequency at which backup tracing aggregator stats are flushed",
time.Second*1,
)
)

const (
Expand Down Expand Up @@ -194,7 +187,7 @@ func (bp *backupDataProcessor) Start(ctx context.Context) {
ctx, bp.agg = bulk.MakeTracingAggregatorWithSpan(ctx,
fmt.Sprintf("%s-aggregator", backupProcessorName), bp.EvalCtx.Tracer)
bp.aggTimer = timeutil.NewTimer()
bp.aggTimer.Reset(flushTracingAggregatorFrequency.Get(&bp.EvalCtx.Settings.SV))
bp.aggTimer.Reset(15 * time.Second)

bp.cancelAndWaitForWorker = func() {
cancel()
Expand Down Expand Up @@ -247,14 +240,12 @@ func (bp *backupDataProcessor) constructProgressProducerMeta(
func (bp *backupDataProcessor) constructTracingAggregatorProducerMeta(
ctx context.Context,
) *execinfrapb.ProducerMetadata {
// Take a copy so that we can send the progress address to the output
// processor.
aggEvents := &execinfrapb.TracingAggregatorEvents{
SQLInstanceID: bp.flowCtx.NodeID.SQLInstanceID(),
FlowID: bp.flowCtx.ID,
Events: make(map[string][]byte),
}
bp.agg.ForEachAggregatedEvent(func(tag string, event bulk.TracingAggregatorEvent) {
bp.agg.ForEachAggregatedEvent(func(name string, event bulk.TracingAggregatorEvent) {
msg, ok := event.(protoutil.Message)
if !ok {
// This should never happen but if it does skip the aggregated event.
Expand All @@ -267,7 +258,7 @@ func (bp *backupDataProcessor) constructTracingAggregatorProducerMeta(
log.Warningf(ctx, "failed to unmarshal aggregated event: %v", err.Error())
return
}
aggEvents.Events[tag] = data
aggEvents.Events[name] = data
})

return &execinfrapb.ProducerMetadata{AggregatorEvents: aggEvents}
Expand All @@ -282,18 +273,13 @@ func (bp *backupDataProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.Producer
select {
case prog, ok := <-bp.progCh:
if !ok {
if bp.backupErr != nil {
bp.MoveToDraining(bp.backupErr)
return nil, bp.DrainHelper()
}

bp.MoveToDraining(nil /* error */)
bp.MoveToDraining(bp.backupErr)
return nil, bp.DrainHelper()
}
return nil, bp.constructProgressProducerMeta(prog)
case <-bp.aggTimer.C:
bp.aggTimer.Read = true
bp.aggTimer.Reset(flushTracingAggregatorFrequency.Get(&bp.EvalCtx.Settings.SV))
bp.aggTimer.Reset(15 * time.Second)
return nil, bp.constructTracingAggregatorProducerMeta(bp.Ctx())
}
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/ccl/backupccl/backup_processor_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ func distBackup(
planCtx *sql.PlanningCtx,
dsp *sql.DistSQLPlanner,
progCh chan *execinfrapb.RemoteProducerMetadata_BulkProcessorProgress,
tracingAggCh chan *execinfrapb.TracingAggregatorEvents,
backupSpecs map[base.SQLInstanceID]*execinfrapb.BackupDataSpec,
) error {
ctx, span := tracing.ChildSpan(ctx, "backupccl.distBackup")
Expand All @@ -165,6 +166,7 @@ func distBackup(

if len(backupSpecs) == 0 {
close(progCh)
close(tracingAggCh)
return nil
}

Expand Down Expand Up @@ -194,6 +196,10 @@ func distBackup(
// Send the progress up a level to be written to the manifest.
progCh <- meta.BulkProcessorProgress
}

if meta.AggregatorEvents != nil {
tracingAggCh <- meta.AggregatorEvents
}
return nil
}

Expand All @@ -211,6 +217,7 @@ func distBackup(
defer recv.Release()

defer close(progCh)
defer close(tracingAggCh)
execCfg := execCtx.ExecCfg()
jobsprofiler.StorePlanDiagram(ctx, execCfg.DistSQLSrv.Stopper, p, execCfg.InternalDB, jobID)

Expand Down
2 changes: 2 additions & 0 deletions pkg/ccl/backupccl/backuppb/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/cloud",
"//pkg/jobs",
"//pkg/multitenant/mtinfopb",
"//pkg/sql/parser",
"//pkg/sql/protoreflect",
Expand All @@ -54,6 +55,7 @@ go_library(
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
"@com_github_gogo_protobuf//jsonpb",
"@com_github_gogo_protobuf//proto",
"@io_opentelemetry_go_otel//attribute",
],
)
Expand Down
37 changes: 34 additions & 3 deletions pkg/ccl/backupccl/backuppb/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ import (
"encoding/json"
"fmt"
"math"
"strings"

"github.com/cockroachdb/cockroach/pkg/cloud"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/multitenant/mtinfopb"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/protoreflect"
Expand All @@ -23,6 +25,7 @@ import (
_ "github.com/cockroachdb/cockroach/pkg/util/uuid" // required for backup.proto
"github.com/cockroachdb/errors"
"github.com/gogo/protobuf/jsonpb"
"github.com/gogo/protobuf/proto"
"go.opentelemetry.io/otel/attribute"
)

Expand Down Expand Up @@ -226,9 +229,37 @@ func (e *ExportStats) Combine(other bulk.TracingAggregatorEvent) {
}
}

// Tag implements the TracingAggregatorEvent interface.
func (e *ExportStats) Tag() string {
return "ExportStats"
// String implements the AggregatorEvent and stringer interfaces.
//
// The output is a sequence of `key: value` lines, each terminated by a
// newline. Fields that carry no information are omitted:
//   - `num_files` is printed only when NumFiles is positive.
//   - `data_size` (in MB, two decimals) is printed only when DataSize is
//     positive.
//   - `throughput` (in MB/s) is printed only when DataSize is positive, both
//     StartTime and EndTime are set, and EndTime is strictly after StartTime.
//
// An ExportStats with nothing to report renders as the empty string.
func (e *ExportStats) String() string {
	const mb = 1 << 20
	var b strings.Builder
	if e.NumFiles > 0 {
		fmt.Fprintf(&b, "num_files: %d\n", e.NumFiles)
	}
	if e.DataSize > 0 {
		dataSizeMB := float64(e.DataSize) / mb
		fmt.Fprintf(&b, "data_size: %.2f MB\n", dataSizeMB)

		if !e.StartTime.IsEmpty() && !e.EndTime.IsEmpty() {
			// A zero or inverted interval cannot yield a meaningful rate:
			// dividing by it would print "+Inf" or a negative throughput, so
			// the line is skipped in that case.
			elapsed := e.EndTime.GoTime().Sub(e.StartTime.GoTime()).Seconds()
			if elapsed > 0 {
				fmt.Fprintf(&b, "throughput: %.2f MB/s\n", dataSizeMB/elapsed)
			}
		}
	}

	return b.String()
}

// ProtoName implements the TracingAggregatorEvent interface. It reports the
// name that proto.MessageName returns for this message.
func (e *ExportStats) ProtoName() string {
	name := proto.MessageName(e)
	return name
}

var _ jobs.ProtobinExecutionDetailFile = &ExportStats{}

// ToText implements the ProtobinExecutionDetailFile interface. The text form
// is the byte representation of the message's String() output.
func (e *ExportStats) ToText() []byte {
	text := e.String()
	return []byte(text)
}

func init() {
Expand Down
2 changes: 2 additions & 0 deletions pkg/ccl/backupccl/backuppb/backup.proto
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,8 @@ message BackupProgressTraceEvent {
// ExportStats is a message containing information about each
// Export{Request,Response}.
message ExportStats {
option (gogoproto.goproto_stringer) = false;

// NumFiles is the number of SST files produced by the ExportRequest.
int64 num_files = 1;
// DataSize is the byte size of all the SST files produced by the
Expand Down
1 change: 1 addition & 0 deletions pkg/jobs/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ go_library(
"@com_github_cockroachdb_logtags//:logtags",
"@com_github_cockroachdb_redact//:redact",
"@com_github_gogo_protobuf//jsonpb",
"@com_github_gogo_protobuf//proto",
"@com_github_gogo_protobuf//types",
"@com_github_klauspost_compress//gzip",
"@com_github_prometheus_client_model//go",
Expand Down
8 changes: 2 additions & 6 deletions pkg/jobs/adopt.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,10 @@ func (r *Registry) maybeDumpTrace(resumerCtx context.Context, resumer Resumer, j
return
}

resumerTraceFilename := fmt.Sprintf("resumer-trace-n%s.%s.binpb",
resumerTraceFilename := fmt.Sprintf("%s/resumer-trace/%s",
r.ID().String(), timeutil.Now().Format("20060102_150405.00"))
td := jobspb.TraceData{CollectedSpans: sp.GetConfiguredRecording()}
b, err := protoutil.Marshal(&td)
if err != nil {
return
}
if err := WriteExecutionDetailFile(dumpCtx, resumerTraceFilename, b, r.db, jobID); err != nil {
if err := WriteProtobinExecutionDetailFile(dumpCtx, resumerTraceFilename, &td, r.db, jobID); err != nil {
log.Warning(dumpCtx, "failed to write trace on resumer trace file")
return
}
Expand Down
51 changes: 39 additions & 12 deletions pkg/jobs/execution_detail_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler/profilerconstants"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/sql/protoreflect"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
"github.com/cockroachdb/errors"
"github.com/gogo/protobuf/proto"
"github.com/klauspost/compress/gzip"
)

Expand All @@ -41,6 +42,20 @@ func compressChunk(chunkBuf []byte) ([]byte, error) {
return gzipBuf.Bytes(), nil
}

// WriteProtobinExecutionDetailFile marshals msg and stores the resulting bytes
// in the system.job_info table via WriteExecutionDetailFile. The stored file
// is named `filename~<proto.MessageName>.binpb`, so the message type can be
// recovered from the name alone when the file is read back.
//
// A marshalling failure is returned as-is and nothing is written.
func WriteProtobinExecutionDetailFile(
	ctx context.Context, filename string, msg protoutil.Message, db isql.DB, jobID jobspb.JobID,
) error {
	// The name is derived before marshalling, matching the order in which the
	// message is inspected.
	binpbName := filename + "~" + proto.MessageName(msg) + ".binpb"
	payload, marshalErr := protoutil.Marshal(msg)
	if marshalErr != nil {
		return marshalErr
	}
	return WriteExecutionDetailFile(ctx, binpbName, payload, db, jobID)
}

// WriteExecutionDetailFile will break up data into chunks of a fixed size, and
// gzip compress them before writing them to the job_info table.
func WriteExecutionDetailFile(
Expand Down Expand Up @@ -84,17 +99,29 @@ func WriteExecutionDetailFile(
})
}

func stringifyProtobinFile(filename string, fileContents *bytes.Buffer) ([]byte, error) {
if strings.HasPrefix(filename, "resumer-trace") {
td := &jobspb.TraceData{}
if err := protoutil.Unmarshal(fileContents.Bytes(), td); err != nil {
return nil, err
}
rec := tracingpb.Recording(td.CollectedSpans)
return []byte(rec.String()), nil
} else {
return nil, errors.AssertionFailedf("unknown file %s", filename)
// ProtobinExecutionDetailFile interface encapsulates the methods that must be
// implemented by protobuf messages that are collected as part of a job's
// execution details.
//
// stringifyProtobinFile type-asserts every decoded `binpb` message to this
// interface; a message that does not implement it cannot be rendered as text
// and results in an error there.
type ProtobinExecutionDetailFile interface {
// ToText returns the human-readable text representation of the protobuf
// execution detail file.
ToText() []byte
}

// stringifyProtobinFile decodes the contents of a `binpb` execution detail
// file and returns its human-readable text form.
//
// The message type is taken from the filename: everything after the last `~`,
// minus the `.binpb` suffix, is treated as the proto name to decode with (see
// WriteProtobinExecutionDetailFile for how such names are produced). An error
// is returned if decoding fails or if the decoded message does not implement
// ProtobinExecutionDetailFile.
func stringifyProtobinFile(filename string, fileContents []byte) ([]byte, error) {
	// When the filename has no `~`, LastIndex yields -1 and the whole
	// filename (minus the suffix) is used as the proto name.
	protoName := filename[strings.LastIndex(filename, "~")+1:]
	protoName = strings.TrimSuffix(protoName, ".binpb")

	decoded, err := protoreflect.DecodeMessage(protoName, fileContents)
	if err != nil {
		return nil, err
	}
	if detail, ok := decoded.(ProtobinExecutionDetailFile); ok {
		return detail.ToText(), nil
	}
	return nil, errors.Newf("protobuf in file %s is not a ProtobinExecutionDetailFile", filename)
}

// ReadExecutionDetailFile will stitch together all the chunks corresponding to the
Expand Down Expand Up @@ -148,7 +175,7 @@ func ReadExecutionDetailFile(
if err := fetchFileContent(trimmedFilename); err != nil {
return nil, err
}
return stringifyProtobinFile(filename, buf)
return stringifyProtobinFile(trimmedFilename, buf.Bytes())
}

if err := fetchFileContent(filename); err != nil {
Expand Down
6 changes: 5 additions & 1 deletion pkg/jobs/jobspb/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "jobspb",
srcs = ["wrap.go"],
srcs = [
"jobs.go",
"wrap.go",
],
embed = [":jobspb_go_proto"],
importpath = "github.com/cockroachdb/cockroach/pkg/jobs/jobspb",
visibility = ["//visibility:public"],
Expand All @@ -13,6 +16,7 @@ go_library(
"//pkg/sql/catalog/catpb",
"//pkg/sql/catalog/descpb",
"//pkg/sql/protoreflect",
"//pkg/util/tracing/tracingpb",
"@com_github_cockroachdb_errors//:errors",
"@com_github_gogo_protobuf//jsonpb",
],
Expand Down
19 changes: 19 additions & 0 deletions pkg/jobs/jobspb/jobs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package jobspb

import "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"

// ToText implements the ProtobinExecutionDetailFile interface. The collected
// spans are viewed as a tracingpb.Recording and rendered through its String
// method.
func (t *TraceData) ToText() []byte {
	spans := tracingpb.Recording(t.CollectedSpans)
	text := spans.String()
	return []byte(text)
}
Loading

0 comments on commit f286f1e

Please sign in to comment.