From 3956d8712018e4afe096a027a0a626061599dd52 Mon Sep 17 00:00:00 2001
From: adityamaru
Date: Tue, 1 Aug 2023 17:49:53 -0400
Subject: [PATCH] bulk,backupccl: process and persist aggregator stats

This commit teaches the coordinator of the backup job to listen for
`TracingAggregatorEvents` metas from nodes that are executing in the DistSQL
flow. Each `TracingAggregatorEvent` can be identified by its tag. The received
metas are categorized by node and further categorized by tag so that we have
an up-to-date in-memory representation of the latest `TracingAggregatorEvent`
of each tag on each node.

Periodically, this in-memory state is flushed to the `system.job_info` table
in both machine-readable and human-readable file formats:

- A file per node, for each aggregated TracingAggregatorEvent. These files
  contain the machine-readable proto bytes of the TracingAggregatorEvent.

- A text file that contains a cluster-wide and per-node summary of each
  TracingAggregatorEvent in its human-readable format.

Example:

```
-- SQL Instance ID: 1; Flow ID: 831caaf5-75cd-4e00-9e11-9a7469727eb5

- ExportStats
num_files: 443
data_size: 1639.29 MB
throughput: 54.63 MB/s

-- Cluster-wide

-- ExportStats
num_files: 443
data_size: 1639.29 MB
throughput: 54.63 MB/s
```

These files can be viewed and downloaded in the Advanced Debugging tab of the
job details page. They will help in understanding the execution state of the
job at different points in time.

Some future work items that will build on this infrastructure are:

- Annotating the job's DistSQL diagram with per-processor stats.
- Displaying relevant stats in the job details page.
- Teaching restore, import, and C2C jobs to also persist aggregator stats for
  improved observability.

The status/admin server is not equipped to handle special characters in the
path of a URL. To sidestep this problem for filenames that contain special
characters, we move the filename from the path component of the URL to a query
parameter.
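For illustration, the coordinator-side loop described above is roughly the
following. This is a minimal sketch, not the exact implementation:
`componentID` and `aggregatorEvents` are stand-ins for the
`execinfrapb.ComponentID` and `execinfrapb.TracingAggregatorEvents` types used
by the change, and the real flush writes files to `system.job_info` via the
jobs package rather than invoking a callback.

```go
package main

import (
	"fmt"
	"time"
)

// componentID identifies the node (SQL instance) and DistSQL flow an event
// came from; stand-in for execinfrapb.ComponentID.
type componentID struct {
	sqlInstanceID int32
	flowID        string
}

// aggregatorEvents is the latest set of aggregated events received from one
// node, keyed by the event's tag; stand-in for
// execinfrapb.TracingAggregatorEvents.
type aggregatorEvents struct {
	component componentID
	events    map[string][]byte
}

// aggregate keeps only the most recent event set per node and flushes the
// in-memory state on a timer, plus once more when the channel closes.
func aggregate(
	in <-chan aggregatorEvents,
	flushEvery time.Duration,
	flush func(map[componentID]map[string][]byte),
) {
	perNode := make(map[componentID]map[string][]byte)
	ticker := time.NewTicker(flushEvery)
	defer ticker.Stop()

	for agg := range in {
		// Each event is already a running aggregate, so overwriting with the
		// latest copy from that node is sufficient.
		perNode[agg.component] = agg.events

		select {
		case <-ticker.C:
			flush(perNode)
		default:
		}
	}
	// Final flush once the DistSQL flow finishes and the sender closes the
	// channel.
	flush(perNode)
}

func main() {
	ch := make(chan aggregatorEvents, 1)
	ch <- aggregatorEvents{
		component: componentID{sqlInstanceID: 1, flowID: "831caaf5"},
		// In the real change the key is the event's fully qualified proto
		// message name; "ExportStats" here is illustrative.
		events: map[string][]byte{"ExportStats": nil},
	}
	close(ch)
	aggregate(ch, time.Minute, func(m map[componentID]map[string][]byte) {
		fmt.Printf("flushing stats for %d node(s)\n", len(m))
	})
}
```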
Informs: #100126 Release note: None --- docs/generated/http/full.md | 2 +- pkg/ccl/backupccl/backup_job.go | 25 ++- pkg/ccl/backupccl/backup_processor.go | 24 +-- .../backupccl/backup_processor_planning.go | 7 + pkg/ccl/backupccl/backuppb/BUILD.bazel | 2 + pkg/ccl/backupccl/backuppb/backup.go | 37 +++- pkg/ccl/backupccl/backuppb/backup.proto | 2 + pkg/jobs/BUILD.bazel | 1 + pkg/jobs/adopt.go | 8 +- pkg/jobs/execution_detail_utils.go | 51 ++++-- pkg/jobs/jobspb/BUILD.bazel | 6 +- pkg/jobs/jobspb/jobs.go | 19 +++ pkg/jobs/jobspb/jobs.proto | 2 + .../bulkpb/ingestion_performance_stats.go | 5 +- pkg/server/serverpb/status.proto | 2 +- pkg/sql/execinfrapb/data.go | 2 + .../jobs_profiler_execution_details_test.go | 26 +-- .../cluster-ui/src/api/jobProfilerApi.ts | 10 +- pkg/util/bulk/BUILD.bazel | 12 ++ pkg/util/bulk/aggregator_stats.go | 160 ++++++++++++++++++ pkg/util/bulk/tracing_aggregator_test.go | 6 +- 21 files changed, 346 insertions(+), 63 deletions(-) create mode 100644 pkg/jobs/jobspb/jobs.go create mode 100644 pkg/util/bulk/aggregator_stats.go diff --git a/docs/generated/http/full.md b/docs/generated/http/full.md index e1ff55fa69e7..44ab440670bc 100644 --- a/docs/generated/http/full.md +++ b/docs/generated/http/full.md @@ -5200,7 +5200,7 @@ Support status: [reserved](#support-status) ## GetJobProfilerExecutionDetails -`GET /_status/job_profiler_execution_details/{job_id}/{filename}` +`GET /_status/job_profiler_execution_details/{job_id}` diff --git a/pkg/ccl/backupccl/backup_job.go b/pkg/ccl/backupccl/backup_job.go index ba069cbea748..0588690f2c0c 100644 --- a/pkg/ccl/backupccl/backup_job.go +++ b/pkg/ccl/backupccl/backup_job.go @@ -51,6 +51,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/stats" "github.com/cockroachdb/cockroach/pkg/util" + "github.com/cockroachdb/cockroach/pkg/util/bulk" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/log/eventpb" @@ -324,8 +325,20 @@ func backup( } return nil } + tracingAggCh := make(chan *execinfrapb.TracingAggregatorEvents) + tracingAggLoop := func(ctx context.Context) error { + if err := bulk.AggregateTracingStats(ctx, job.ID(), + execCtx.ExecCfg().Settings, execCtx.ExecCfg().InternalDB, tracingAggCh); err != nil { + log.Warningf(ctx, "failed to aggregate tracing stats: %v", err) + // Even if we fail to aggregate tracing stats, we must continue draining + // the channel so that the sender in the DistSQLReceiver does not block + // and allows the backup to continue uninterrupted. 
+ for range tracingAggCh { + } + } + return nil + } - resumerSpan.RecordStructured(&types.StringValue{Value: "starting DistSQL backup execution"}) runBackup := func(ctx context.Context) error { return distBackup( ctx, @@ -333,11 +346,19 @@ func backup( planCtx, dsp, progCh, + tracingAggCh, backupSpecs, ) } - if err := ctxgroup.GoAndWait(ctx, jobProgressLoop, checkpointLoop, storePerNodeProgressLoop, runBackup); err != nil { + if err := ctxgroup.GoAndWait( + ctx, + jobProgressLoop, + checkpointLoop, + storePerNodeProgressLoop, + tracingAggLoop, + runBackup, + ); err != nil { return roachpb.RowCount{}, 0, errors.Wrapf(err, "exporting %d ranges", errors.Safe(numTotalSpans)) } diff --git a/pkg/ccl/backupccl/backup_processor.go b/pkg/ccl/backupccl/backup_processor.go index 3cc093817b15..c58fd2a70814 100644 --- a/pkg/ccl/backupccl/backup_processor.go +++ b/pkg/ccl/backupccl/backup_processor.go @@ -94,13 +94,6 @@ var ( util.ConstantWithMetamorphicTestBool("export_request_verbose_tracing", false), ) - flushTracingAggregatorFrequency = settings.RegisterDurationSetting( - settings.TenantWritable, - "bulkio.backup.tracing_aggregator_flush_after", - "frequency at which backup tracing aggregator stats are flushed", - time.Second*1, - ) - testingDiscardBackupData = envutil.EnvOrDefaultBool("COCKROACH_BACKUP_TESTING_DISCARD_DATA", false) ) @@ -197,7 +190,7 @@ func (bp *backupDataProcessor) Start(ctx context.Context) { ctx, bp.agg = bulk.MakeTracingAggregatorWithSpan(ctx, fmt.Sprintf("%s-aggregator", backupProcessorName), bp.EvalCtx.Tracer) bp.aggTimer = timeutil.NewTimer() - bp.aggTimer.Reset(flushTracingAggregatorFrequency.Get(&bp.EvalCtx.Settings.SV)) + bp.aggTimer.Reset(15 * time.Second) bp.cancelAndWaitForWorker = func() { cancel() @@ -250,14 +243,12 @@ func (bp *backupDataProcessor) constructProgressProducerMeta( func (bp *backupDataProcessor) constructTracingAggregatorProducerMeta( ctx context.Context, ) *execinfrapb.ProducerMetadata { - // Take a copy so that we can send the progress address to the output - // processor. aggEvents := &execinfrapb.TracingAggregatorEvents{ SQLInstanceID: bp.flowCtx.NodeID.SQLInstanceID(), FlowID: bp.flowCtx.ID, Events: make(map[string][]byte), } - bp.agg.ForEachAggregatedEvent(func(tag string, event bulk.TracingAggregatorEvent) { + bp.agg.ForEachAggregatedEvent(func(name string, event bulk.TracingAggregatorEvent) { msg, ok := event.(protoutil.Message) if !ok { // This should never happen but if it does skip the aggregated event. 
@@ -270,7 +261,7 @@ func (bp *backupDataProcessor) constructTracingAggregatorProducerMeta( log.Warningf(ctx, "failed to unmarshal aggregated event: %v", err.Error()) return } - aggEvents.Events[tag] = data + aggEvents.Events[name] = data }) return &execinfrapb.ProducerMetadata{AggregatorEvents: aggEvents} @@ -285,18 +276,13 @@ func (bp *backupDataProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.Producer select { case prog, ok := <-bp.progCh: if !ok { - if bp.backupErr != nil { - bp.MoveToDraining(bp.backupErr) - return nil, bp.DrainHelper() - } - - bp.MoveToDraining(nil /* error */) + bp.MoveToDraining(bp.backupErr) return nil, bp.DrainHelper() } return nil, bp.constructProgressProducerMeta(prog) case <-bp.aggTimer.C: bp.aggTimer.Read = true - bp.aggTimer.Reset(flushTracingAggregatorFrequency.Get(&bp.EvalCtx.Settings.SV)) + bp.aggTimer.Reset(15 * time.Second) return nil, bp.constructTracingAggregatorProducerMeta(bp.Ctx()) } } diff --git a/pkg/ccl/backupccl/backup_processor_planning.go b/pkg/ccl/backupccl/backup_processor_planning.go index c919412ad316..71ea019d579d 100644 --- a/pkg/ccl/backupccl/backup_processor_planning.go +++ b/pkg/ccl/backupccl/backup_processor_planning.go @@ -156,6 +156,7 @@ func distBackup( planCtx *sql.PlanningCtx, dsp *sql.DistSQLPlanner, progCh chan *execinfrapb.RemoteProducerMetadata_BulkProcessorProgress, + tracingAggCh chan *execinfrapb.TracingAggregatorEvents, backupSpecs map[base.SQLInstanceID]*execinfrapb.BackupDataSpec, ) error { ctx, span := tracing.ChildSpan(ctx, "backupccl.distBackup") @@ -165,6 +166,7 @@ func distBackup( if len(backupSpecs) == 0 { close(progCh) + close(tracingAggCh) return nil } @@ -194,6 +196,10 @@ func distBackup( // Send the progress up a level to be written to the manifest. progCh <- meta.BulkProcessorProgress } + + if meta.AggregatorEvents != nil { + tracingAggCh <- meta.AggregatorEvents + } return nil } @@ -211,6 +217,7 @@ func distBackup( defer recv.Release() defer close(progCh) + defer close(tracingAggCh) execCfg := execCtx.ExecCfg() jobsprofiler.StorePlanDiagram(ctx, execCfg.DistSQLSrv.Stopper, p, execCfg.InternalDB, jobID) diff --git a/pkg/ccl/backupccl/backuppb/BUILD.bazel b/pkg/ccl/backupccl/backuppb/BUILD.bazel index ae5419e55cf9..7aec49c3c9e7 100644 --- a/pkg/ccl/backupccl/backuppb/BUILD.bazel +++ b/pkg/ccl/backupccl/backuppb/BUILD.bazel @@ -45,6 +45,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/cloud", + "//pkg/jobs", "//pkg/multitenant/mtinfopb", "//pkg/sql/parser", "//pkg/sql/protoreflect", @@ -54,6 +55,7 @@ go_library( "//pkg/util/uuid", "@com_github_cockroachdb_errors//:errors", "@com_github_gogo_protobuf//jsonpb", + "@com_github_gogo_protobuf//proto", "@io_opentelemetry_go_otel//attribute", ], ) diff --git a/pkg/ccl/backupccl/backuppb/backup.go b/pkg/ccl/backupccl/backuppb/backup.go index d0cc55bc915f..06d91e2317b5 100644 --- a/pkg/ccl/backupccl/backuppb/backup.go +++ b/pkg/ccl/backupccl/backuppb/backup.go @@ -12,8 +12,10 @@ import ( "encoding/json" "fmt" "math" + "strings" "github.com/cockroachdb/cockroach/pkg/cloud" + "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/multitenant/mtinfopb" "github.com/cockroachdb/cockroach/pkg/sql/parser" "github.com/cockroachdb/cockroach/pkg/sql/protoreflect" @@ -23,6 +25,7 @@ import ( _ "github.com/cockroachdb/cockroach/pkg/util/uuid" // required for backup.proto "github.com/cockroachdb/errors" "github.com/gogo/protobuf/jsonpb" + "github.com/gogo/protobuf/proto" "go.opentelemetry.io/otel/attribute" ) @@ -226,9 +229,37 @@ 
func (e *ExportStats) Combine(other bulk.TracingAggregatorEvent) { } } -// Tag implements the TracingAggregatorEvent interface. -func (e *ExportStats) Tag() string { - return "ExportStats" +// String implements the AggregatorEvent and stringer interfaces. +func (e *ExportStats) String() string { + const mb = 1 << 20 + var b strings.Builder + if e.NumFiles > 0 { + b.WriteString(fmt.Sprintf("num_files: %d\n", e.NumFiles)) + } + if e.DataSize > 0 { + dataSizeMB := float64(e.DataSize) / mb + b.WriteString(fmt.Sprintf("data_size: %.2f MB\n", dataSizeMB)) + + if !e.StartTime.IsEmpty() && !e.EndTime.IsEmpty() { + duration := e.EndTime.GoTime().Sub(e.StartTime.GoTime()) + throughput := dataSizeMB / duration.Seconds() + b.WriteString(fmt.Sprintf("throughput: %.2f MB/s\n", throughput)) + } + } + + return b.String() +} + +// ProtoName implements the TracingAggregatorEvent interface. +func (e *ExportStats) ProtoName() string { + return proto.MessageName(e) +} + +var _ jobs.ProtobinExecutionDetailFile = &ExportStats{} + +// ToText implements the ProtobinExecutionDetailFile interface. +func (e *ExportStats) ToText() []byte { + return []byte(e.String()) } func init() { diff --git a/pkg/ccl/backupccl/backuppb/backup.proto b/pkg/ccl/backupccl/backuppb/backup.proto index 6e04cf548b02..5bfc322385ce 100644 --- a/pkg/ccl/backupccl/backuppb/backup.proto +++ b/pkg/ccl/backupccl/backuppb/backup.proto @@ -221,6 +221,8 @@ message BackupProgressTraceEvent { // ExportStats is a message containing information about each // Export{Request,Response}. message ExportStats { + option (gogoproto.goproto_stringer) = false; + // NumFiles is the number of SST files produced by the ExportRequest. int64 num_files = 1; // DataSize is the byte size of all the SST files produced by the diff --git a/pkg/jobs/BUILD.bazel b/pkg/jobs/BUILD.bazel index a8f826a5a2a2..4d28cedbfdc2 100644 --- a/pkg/jobs/BUILD.bazel +++ b/pkg/jobs/BUILD.bazel @@ -77,6 +77,7 @@ go_library( "@com_github_cockroachdb_logtags//:logtags", "@com_github_cockroachdb_redact//:redact", "@com_github_gogo_protobuf//jsonpb", + "@com_github_gogo_protobuf//proto", "@com_github_gogo_protobuf//types", "@com_github_klauspost_compress//gzip", "@com_github_prometheus_client_model//go", diff --git a/pkg/jobs/adopt.go b/pkg/jobs/adopt.go index 04a579e8c62e..c9221790902d 100644 --- a/pkg/jobs/adopt.go +++ b/pkg/jobs/adopt.go @@ -82,14 +82,10 @@ func (r *Registry) maybeDumpTrace(resumerCtx context.Context, resumer Resumer, j return } - resumerTraceFilename := fmt.Sprintf("resumer-trace-n%s.%s.binpb", + resumerTraceFilename := fmt.Sprintf("%s/resumer-trace/%s", r.ID().String(), timeutil.Now().Format("20060102_150405.00")) td := jobspb.TraceData{CollectedSpans: sp.GetConfiguredRecording()} - b, err := protoutil.Marshal(&td) - if err != nil { - return - } - if err := WriteExecutionDetailFile(dumpCtx, resumerTraceFilename, b, r.db, jobID); err != nil { + if err := WriteProtobinExecutionDetailFile(dumpCtx, resumerTraceFilename, &td, r.db, jobID); err != nil { log.Warning(dumpCtx, "failed to write trace on resumer trace file") return } diff --git a/pkg/jobs/execution_detail_utils.go b/pkg/jobs/execution_detail_utils.go index 75e303aefff3..f2309fdda8c6 100644 --- a/pkg/jobs/execution_detail_utils.go +++ b/pkg/jobs/execution_detail_utils.go @@ -20,9 +20,10 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler/profilerconstants" "github.com/cockroachdb/cockroach/pkg/sql/isql" + 
"github.com/cockroachdb/cockroach/pkg/sql/protoreflect" "github.com/cockroachdb/cockroach/pkg/util/protoutil" - "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb" "github.com/cockroachdb/errors" + "github.com/gogo/protobuf/proto" "github.com/klauspost/compress/gzip" ) @@ -41,6 +42,20 @@ func compressChunk(chunkBuf []byte) ([]byte, error) { return gzipBuf.Bytes(), nil } +// WriteProtobinExecutionDetailFile writes a `binpb` file of the form +// `filename~.binpb` to the system.job_info table, with the +// contents of the passed in protobuf message. +func WriteProtobinExecutionDetailFile( + ctx context.Context, filename string, msg protoutil.Message, db isql.DB, jobID jobspb.JobID, +) error { + name := fmt.Sprintf("%s~%s.binpb", filename, proto.MessageName(msg)) + b, err := protoutil.Marshal(msg) + if err != nil { + return err + } + return WriteExecutionDetailFile(ctx, name, b, db, jobID) +} + // WriteExecutionDetailFile will break up data into chunks of a fixed size, and // gzip compress them before writing them to the job_info table. func WriteExecutionDetailFile( @@ -84,17 +99,29 @@ func WriteExecutionDetailFile( }) } -func stringifyProtobinFile(filename string, fileContents *bytes.Buffer) ([]byte, error) { - if strings.HasPrefix(filename, "resumer-trace") { - td := &jobspb.TraceData{} - if err := protoutil.Unmarshal(fileContents.Bytes(), td); err != nil { - return nil, err - } - rec := tracingpb.Recording(td.CollectedSpans) - return []byte(rec.String()), nil - } else { - return nil, errors.AssertionFailedf("unknown file %s", filename) +// ProtobinExecutionDetailFile interface encapsulates the methods that must be +// implemented by protobuf messages that are collected as part of a job's +// execution details. +type ProtobinExecutionDetailFile interface { + // ToText returns the human-readable text representation of the protobuf + // execution detail file. + ToText() []byte +} + +func stringifyProtobinFile(filename string, fileContents []byte) ([]byte, error) { + // A `binpb` execution detail file is expected to have its fully qualified + // proto name after the last `~` in the filename. See + // `WriteProtobinExecutionDetailFile` for details. 
+ msg, err := protoreflect.DecodeMessage(strings.TrimSuffix( + filename[strings.LastIndex(filename, "~")+1:], ".binpb"), fileContents) + if err != nil { + return nil, err + } + f, ok := msg.(ProtobinExecutionDetailFile) + if !ok { + return nil, errors.Newf("protobuf in file %s is not a ProtobinExecutionDetailFile", filename) } + return f.ToText(), err } // ReadExecutionDetailFile will stitch together all the chunks corresponding to the @@ -148,7 +175,7 @@ func ReadExecutionDetailFile( if err := fetchFileContent(trimmedFilename); err != nil { return nil, err } - return stringifyProtobinFile(filename, buf) + return stringifyProtobinFile(trimmedFilename, buf.Bytes()) } if err := fetchFileContent(filename); err != nil { diff --git a/pkg/jobs/jobspb/BUILD.bazel b/pkg/jobs/jobspb/BUILD.bazel index 53607669ae8d..a47e9b703981 100644 --- a/pkg/jobs/jobspb/BUILD.bazel +++ b/pkg/jobs/jobspb/BUILD.bazel @@ -4,7 +4,10 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "jobspb", - srcs = ["wrap.go"], + srcs = [ + "jobs.go", + "wrap.go", + ], embed = [":jobspb_go_proto"], importpath = "github.com/cockroachdb/cockroach/pkg/jobs/jobspb", visibility = ["//visibility:public"], @@ -13,6 +16,7 @@ go_library( "//pkg/sql/catalog/catpb", "//pkg/sql/catalog/descpb", "//pkg/sql/protoreflect", + "//pkg/util/tracing/tracingpb", "@com_github_cockroachdb_errors//:errors", "@com_github_gogo_protobuf//jsonpb", ], diff --git a/pkg/jobs/jobspb/jobs.go b/pkg/jobs/jobspb/jobs.go new file mode 100644 index 000000000000..c0f8e62041c2 --- /dev/null +++ b/pkg/jobs/jobspb/jobs.go @@ -0,0 +1,19 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package jobspb + +import "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb" + +// ToText implements the ProtobinExecutionDetailFile interface. +func (t *TraceData) ToText() []byte { + rec := tracingpb.Recording(t.CollectedSpans) + return []byte(rec.String()) +} diff --git a/pkg/jobs/jobspb/jobs.proto b/pkg/jobs/jobspb/jobs.proto index 6a621ab9245d..2639e39b64a1 100644 --- a/pkg/jobs/jobspb/jobs.proto +++ b/pkg/jobs/jobspb/jobs.proto @@ -1429,6 +1429,8 @@ message RetriableExecutionFailure { string truncated_error = 6; } +// TraceData is used to capture the traces of a job when the resumer completes +// its execution. message TraceData { repeated util.tracing.tracingpb.RecordedSpan collected_spans = 1 [(gogoproto.nullable) = false]; } diff --git a/pkg/kv/bulk/bulkpb/ingestion_performance_stats.go b/pkg/kv/bulk/bulkpb/ingestion_performance_stats.go index 3e5d5c8308ba..98c597607ff2 100644 --- a/pkg/kv/bulk/bulkpb/ingestion_performance_stats.go +++ b/pkg/kv/bulk/bulkpb/ingestion_performance_stats.go @@ -14,6 +14,7 @@ import ( "context" "fmt" "math" + "reflect" "sort" "strings" "time" @@ -87,8 +88,8 @@ func (s *IngestionPerformanceStats) Combine(other bulk.TracingAggregatorEvent) { } // Tag implements the TracingAggregatorEvent interface. -func (s *IngestionPerformanceStats) Tag() string { - return "IngestionPerformanceStats" +func (s *IngestionPerformanceStats) ProtoName() string { + return reflect.TypeOf(s).Elem().String() } // Render implements the TracingAggregatorEvent interface. 
diff --git a/pkg/server/serverpb/status.proto b/pkg/server/serverpb/status.proto index cbab6e4b0ad1..baaf92876a6c 100644 --- a/pkg/server/serverpb/status.proto +++ b/pkg/server/serverpb/status.proto @@ -2556,7 +2556,7 @@ service Status { rpc GetJobProfilerExecutionDetails(GetJobProfilerExecutionDetailRequest) returns (GetJobProfilerExecutionDetailResponse) { option (google.api.http) = { - get: "/_status/job_profiler_execution_details/{job_id}/{filename}" + get: "/_status/job_profiler_execution_details/{job_id}" }; } diff --git a/pkg/sql/execinfrapb/data.go b/pkg/sql/execinfrapb/data.go index b83ad2db9d6f..1269b2a5f5e0 100644 --- a/pkg/sql/execinfrapb/data.go +++ b/pkg/sql/execinfrapb/data.go @@ -277,6 +277,8 @@ func RemoteProducerMetaToLocalMeta( meta.Metrics = v.Metrics case *RemoteProducerMetadata_Changefeed: meta.Changefeed = v.Changefeed + case *RemoteProducerMetadata_TracingAggregatorEvents: + meta.AggregatorEvents = v.TracingAggregatorEvents default: return *meta, false } diff --git a/pkg/sql/jobs_profiler_execution_details_test.go b/pkg/sql/jobs_profiler_execution_details_test.go index 60cc0c229837..1241dc322fad 100644 --- a/pkg/sql/jobs_profiler_execution_details_test.go +++ b/pkg/sql/jobs_profiler_execution_details_test.go @@ -349,10 +349,10 @@ func TestListProfilerExecutionDetails(t *testing.T) { runner.Exec(t, `SELECT crdb_internal.request_job_execution_details($1)`, importJobID) files := listExecutionDetails(t, s, jobspb.JobID(importJobID)) require.Len(t, files, 5) - require.Regexp(t, "distsql\\..*\\.html", files[0]) - require.Regexp(t, "goroutines\\..*\\.txt", files[1]) - require.Regexp(t, "resumer-trace-n[0-9].*\\.binpb", files[2]) - require.Regexp(t, "resumer-trace-n[0-9].*\\.binpb\\.txt", files[3]) + require.Regexp(t, "[0-9]/resumer-trace/.*~cockroach\\.sql\\.jobs\\.jobspb\\.TraceData\\.binpb", files[0]) + require.Regexp(t, "[0-9]/resumer-trace/.*~cockroach\\.sql\\.jobs\\.jobspb\\.TraceData\\.binpb", files[1]) + require.Regexp(t, "distsql\\..*\\.html", files[2]) + require.Regexp(t, "goroutines\\..*\\.txt", files[3]) require.Regexp(t, "trace\\..*\\.zip", files[4]) // Resume the job, so it can write another DistSQL diagram and goroutine @@ -364,14 +364,14 @@ func TestListProfilerExecutionDetails(t *testing.T) { runner.Exec(t, `SELECT crdb_internal.request_job_execution_details($1)`, importJobID) files = listExecutionDetails(t, s, jobspb.JobID(importJobID)) require.Len(t, files, 10) - require.Regexp(t, "distsql\\..*\\.html", files[0]) - require.Regexp(t, "distsql\\..*\\.html", files[1]) - require.Regexp(t, "goroutines\\..*\\.txt", files[2]) - require.Regexp(t, "goroutines\\..*\\.txt", files[3]) - require.Regexp(t, "resumer-trace-n[0-9].*\\.binpb", files[4]) - require.Regexp(t, "resumer-trace-n[0-9].*\\.binpb\\.txt", files[5]) - require.Regexp(t, "resumer-trace-n[0-9].*\\.binpb", files[6]) - require.Regexp(t, "resumer-trace-n[0-9].*\\.binpb\\.txt", files[7]) + require.Regexp(t, "[0-9]/resumer-trace/.*~cockroach\\.sql\\.jobs\\.jobspb\\.TraceData\\.binpb", files[0]) + require.Regexp(t, "[0-9]/resumer-trace/.*~cockroach\\.sql\\.jobs\\.jobspb\\.TraceData\\.binpb.txt", files[1]) + require.Regexp(t, "[0-9]/resumer-trace/.*~cockroach\\.sql\\.jobs\\.jobspb\\.TraceData\\.binpb", files[2]) + require.Regexp(t, "[0-9]/resumer-trace/.*~cockroach\\.sql\\.jobs\\.jobspb\\.TraceData\\.binpb.txt", files[3]) + require.Regexp(t, "distsql\\..*\\.html", files[4]) + require.Regexp(t, "distsql\\..*\\.html", files[5]) + require.Regexp(t, "goroutines\\..*\\.txt", files[6]) + require.Regexp(t, 
"goroutines\\..*\\.txt", files[7]) require.Regexp(t, "trace\\..*\\.zip", files[8]) require.Regexp(t, "trace\\..*\\.zip", files[9]) }) @@ -413,7 +413,7 @@ func checkExecutionDetails( client, err := s.GetAdminHTTPClient() require.NoError(t, err) - url := s.AdminURL().String() + fmt.Sprintf("/_status/job_profiler_execution_details/%d/%s", jobID, filename) + url := s.AdminURL().String() + fmt.Sprintf("/_status/job_profiler_execution_details/%d?%s", jobID, filename) req, err := http.NewRequest("GET", url, nil) require.NoError(t, err) diff --git a/pkg/ui/workspaces/cluster-ui/src/api/jobProfilerApi.ts b/pkg/ui/workspaces/cluster-ui/src/api/jobProfilerApi.ts index c4e3f19c9a86..5289a3d9f655 100644 --- a/pkg/ui/workspaces/cluster-ui/src/api/jobProfilerApi.ts +++ b/pkg/ui/workspaces/cluster-ui/src/api/jobProfilerApi.ts @@ -11,6 +11,9 @@ import { cockroach } from "@cockroachlabs/crdb-protobuf-client"; import { fetchData } from "./fetchData"; import { SqlExecutionRequest, executeInternalSql } from "./sqlApi"; +import { propsToQueryString } from "../util"; + +const JOB_PROFILER_PATH = "/_status/job_profiler_execution_details"; export type ListJobProfilerExecutionDetailsRequest = cockroach.server.serverpb.ListJobProfilerExecutionDetailsRequest; @@ -37,9 +40,14 @@ export const listExecutionDetailFiles = ( export const getExecutionDetailFile = ( req: GetJobProfilerExecutionDetailRequest, ): Promise => { + let jobProfilerPath = `${JOB_PROFILER_PATH}/${req.job_id}`; + const queryStr = propsToQueryString({ + filename: req.filename, + }); + jobProfilerPath = jobProfilerPath.concat(`?${queryStr}`); return fetchData( cockroach.server.serverpb.GetJobProfilerExecutionDetailResponse, - `/_status/job_profiler_execution_details/${req.job_id}/${req.filename}`, + jobProfilerPath, null, null, "30M", diff --git a/pkg/util/bulk/BUILD.bazel b/pkg/util/bulk/BUILD.bazel index 12bd2397518b..91c9ab242c70 100644 --- a/pkg/util/bulk/BUILD.bazel +++ b/pkg/util/bulk/BUILD.bazel @@ -3,14 +3,25 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "bulk", srcs = [ + "aggregator_stats.go", "iterator.go", "tracing_aggregator.go", ], importpath = "github.com/cockroachdb/cockroach/pkg/util/bulk", visibility = ["//visibility:public"], deps = [ + "//pkg/clusterversion", + "//pkg/jobs", + "//pkg/jobs/jobspb", + "//pkg/settings", + "//pkg/settings/cluster", + "//pkg/sql/execinfrapb", + "//pkg/sql/isql", + "//pkg/sql/protoreflect", "//pkg/util/syncutil", + "//pkg/util/timeutil", "//pkg/util/tracing", + "@com_github_cockroachdb_errors//:errors", ], ) @@ -25,6 +36,7 @@ go_test( "//pkg/roachpb", "//pkg/util/tracing", "//pkg/util/tracing/tracingpb", + "@com_github_gogo_protobuf//proto", "@com_github_stretchr_testify//require", ], ) diff --git a/pkg/util/bulk/aggregator_stats.go b/pkg/util/bulk/aggregator_stats.go new file mode 100644 index 000000000000..c04395a50080 --- /dev/null +++ b/pkg/util/bulk/aggregator_stats.go @@ -0,0 +1,160 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package bulk + +import ( + "bytes" + "context" + "fmt" + "time" + + "github.com/cockroachdb/cockroach/pkg/clusterversion" + "github.com/cockroachdb/cockroach/pkg/jobs" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/settings" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" + "github.com/cockroachdb/cockroach/pkg/sql/isql" + "github.com/cockroachdb/cockroach/pkg/sql/protoreflect" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/errors" +) + +var flushTracingAggregatorFrequency = settings.RegisterDurationSetting( + settings.TenantWritable, + "bulkio.backup.aggregator_stats_flush_frequency", + "frequency at which the coordinator node processes and persists tracing aggregator stats to storage", + 10*time.Minute, +) + +// flushTracingStats persists the following files to the `system.job_info` table +// for consumption by job observability tools: +// +// - A file per node, for each aggregated TracingAggregatorEvent. These files +// contain the machine-readable proto bytes of the TracingAggregatorEvent. +// +// - A text file that contains a cluster-wide and per-node summary of each +// TracingAggregatorEvent in its human-readable format. +func flushTracingStats( + ctx context.Context, + jobID jobspb.JobID, + db isql.DB, + perNodeStats map[execinfrapb.ComponentID]map[string][]byte, +) error { + clusterWideAggregatorStats := make(map[string]TracingAggregatorEvent) + asOf := timeutil.Now().Format("20060102_150405.00") + + var clusterWideSummary bytes.Buffer + for component, nameToEvent := range perNodeStats { + clusterWideSummary.WriteString(fmt.Sprintf("## SQL Instance ID: %s; Flow ID: %s\n\n", + component.SQLInstanceID.String(), component.FlowID.String())) + for name, event := range nameToEvent { + // Write a proto file per tag. This machine-readable file can be consumed + // by other places we want to display this information egs: annotated + // DistSQL diagrams, DBConsole etc. + filename := fmt.Sprintf("%s/%s", + component.SQLInstanceID.String(), asOf) + msg, err := protoreflect.DecodeMessage(name, event) + if err != nil { + clusterWideSummary.WriteString(fmt.Sprintf("invalid protocol message: %v", err)) + // If we failed to decode the event write the error to the file and + // carry on. + continue + } + + if err := jobs.WriteProtobinExecutionDetailFile(ctx, filename, msg, db, jobID); err != nil { + return err + } + + // Construct a single text file that contains information on a per-node + // basis as well as a cluster-wide aggregate. 
+ clusterWideSummary.WriteString(fmt.Sprintf("# %s\n", name)) + + aggEvent := msg.(TracingAggregatorEvent) + clusterWideSummary.WriteString(aggEvent.String()) + clusterWideSummary.WriteString("\n") + + if _, ok := clusterWideAggregatorStats[name]; ok { + clusterWideAggregatorStats[name].Combine(aggEvent) + } else { + clusterWideAggregatorStats[name] = aggEvent + } + } + } + + for tag, event := range clusterWideAggregatorStats { + clusterWideSummary.WriteString("## Cluster-wide\n\n") + clusterWideSummary.WriteString(fmt.Sprintf("# %s\n", tag)) + clusterWideSummary.WriteString(event.String()) + } + + filename := fmt.Sprintf("aggregatorstats.%s.txt", asOf) + return jobs.WriteExecutionDetailFile(ctx, filename, clusterWideSummary.Bytes(), db, jobID) +} + +// AggregateTracingStats listens for AggregatorEvents on a channel and +// periodically processes them to human and machine-readable file formats that +// are persisted in the system.job_info table. These files can then be consumed +// for improved observability into the job's execution. +// +// This method does not return until the passed in channel is closed or an error +// is encountered. +func AggregateTracingStats( + ctx context.Context, + jobID jobspb.JobID, + st *cluster.Settings, + db isql.DB, + tracingAgg chan *execinfrapb.TracingAggregatorEvents, +) error { + if !st.Version.IsActive(ctx, clusterversion.V23_2Start) { + return errors.Newf("aggregator stats are supported when the cluster version >= %s", + clusterversion.V23_2Start.String()) + } + perNodeAggregatorStats := make(map[execinfrapb.ComponentID]map[string][]byte) + + // AggregatorEvents are periodically received from each node in the DistSQL + // flow. + flushTimer := timeutil.NewTimer() + defer flushTimer.Stop() + flushTimer.Reset(flushTracingAggregatorFrequency.Get(&st.SV)) + + var flushOnClose bool + for agg := range tracingAgg { + flushOnClose = true + componentID := execinfrapb.ComponentID{ + FlowID: agg.FlowID, + SQLInstanceID: agg.SQLInstanceID, + } + + // Update the running aggregate of the component with the latest received + // aggregate. + perNodeAggregatorStats[componentID] = agg.Events + + select { + case <-flushTimer.C: + flushTimer.Read = true + flushTimer.Reset(flushTracingAggregatorFrequency.Get(&st.SV)) + // Flush the per-node and cluster wide aggregator stats to the job_info + // table. + if err := flushTracingStats(ctx, jobID, db, perNodeAggregatorStats); err != nil { + return err + } + flushOnClose = false + default: + } + } + if flushOnClose { + if err := flushTracingStats(ctx, jobID, db, perNodeAggregatorStats); err != nil { + return err + } + } + return nil +} diff --git a/pkg/util/bulk/tracing_aggregator_test.go b/pkg/util/bulk/tracing_aggregator_test.go index 84d52fa4cf01..11853c968688 100644 --- a/pkg/util/bulk/tracing_aggregator_test.go +++ b/pkg/util/bulk/tracing_aggregator_test.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/bulk" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb" + "github.com/gogo/protobuf/proto" "github.com/stretchr/testify/require" ) @@ -60,7 +61,8 @@ func TestAggregator(t *testing.T) { // We only expect to see the aggregated stats from the local children since we // have not imported the remote children's Recording. 
- exportStatsTag, found := aggSp.GetLazyTag("ExportStats") + exportStats := &backuppb.ExportStats{} + exportStatsTag, found := aggSp.GetLazyTag(proto.MessageName(exportStats)) require.True(t, found) var es *backuppb.ExportStats var ok bool @@ -79,7 +81,7 @@ func TestAggregator(t *testing.T) { // Now, we expect the ExportStats from the remote child to show up in the // aggregator. - exportStatsTag, found = aggSp.GetLazyTag("ExportStats") + exportStatsTag, found = aggSp.GetLazyTag(proto.MessageName(exportStats)) require.True(t, found) if es, ok = exportStatsTag.(*backuppb.ExportStats); !ok { t.Fatal("failed to cast LazyTag to expected type")