diff --git a/docs/generated/http/full.md b/docs/generated/http/full.md index 6905863a0652..fc7b46663e06 100644 --- a/docs/generated/http/full.md +++ b/docs/generated/http/full.md @@ -5198,7 +5198,7 @@ Support status: [reserved](#support-status) ## GetJobProfilerExecutionDetails -`GET /_status/job_profiler_execution_details/{job_id}/{filename}` +`GET /_status/job_profiler_execution_details/{job_id}` diff --git a/pkg/ccl/backupccl/backup_job.go b/pkg/ccl/backupccl/backup_job.go index 200d3f3f5300..67681b035038 100644 --- a/pkg/ccl/backupccl/backup_job.go +++ b/pkg/ccl/backupccl/backup_job.go @@ -51,6 +51,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/stats" "github.com/cockroachdb/cockroach/pkg/util" + "github.com/cockroachdb/cockroach/pkg/util/bulk" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/log/eventpb" @@ -307,8 +308,19 @@ func backup( } return nil } + tracingAggCh := make(chan *execinfrapb.TracingAggregatorEvents) + tracingAggLoop := func(ctx context.Context) error { + if err := bulk.AggregateTracingStats(ctx, job.ID(), + execCtx.ExecCfg().Settings, execCtx.ExecCfg().InternalDB, tracingAggCh); err != nil { + log.Warningf(ctx, "failed to aggregate tracing stats: %v", err) + // Drain the channel if the loop to aggregate tracing stats has returned + // an error. + for range tracingAggCh { + } + } + return nil + } - resumerSpan.RecordStructured(&types.StringValue{Value: "starting DistSQL backup execution"}) runBackup := func(ctx context.Context) error { return distBackup( ctx, @@ -316,11 +328,19 @@ func backup( planCtx, dsp, progCh, + tracingAggCh, backupSpecs, ) } - if err := ctxgroup.GoAndWait(ctx, jobProgressLoop, checkpointLoop, storePerNodeProgressLoop, runBackup); err != nil { + if err := ctxgroup.GoAndWait( + ctx, + jobProgressLoop, + checkpointLoop, + storePerNodeProgressLoop, + tracingAggLoop, + runBackup, + ); err != nil { return roachpb.RowCount{}, 0, errors.Wrapf(err, "exporting %d ranges", errors.Safe(numTotalSpans)) } diff --git a/pkg/ccl/backupccl/backup_processor.go b/pkg/ccl/backupccl/backup_processor.go index d5a346cb341a..3284520df937 100644 --- a/pkg/ccl/backupccl/backup_processor.go +++ b/pkg/ccl/backupccl/backup_processor.go @@ -92,13 +92,6 @@ var ( "send each export request with a verbose tracing span", util.ConstantWithMetamorphicTestBool("export_request_verbose_tracing", false), ) - - flushTracingAggregatorFrequency = settings.RegisterDurationSetting( - settings.TenantWritable, - "bulkio.backup.tracing_aggregator_flush_after", - "frequency at which backup tracing aggregator stats are flushed", - time.Second*1, - ) ) const ( @@ -194,7 +187,7 @@ func (bp *backupDataProcessor) Start(ctx context.Context) { ctx, bp.agg = bulk.MakeTracingAggregatorWithSpan(ctx, fmt.Sprintf("%s-aggregator", backupProcessorName), bp.EvalCtx.Tracer) bp.aggTimer = timeutil.NewTimer() - bp.aggTimer.Reset(flushTracingAggregatorFrequency.Get(&bp.EvalCtx.Settings.SV)) + bp.aggTimer.Reset(15 * time.Second) bp.cancelAndWaitForWorker = func() { cancel() @@ -247,14 +240,12 @@ func (bp *backupDataProcessor) constructProgressProducerMeta( func (bp *backupDataProcessor) constructTracingAggregatorProducerMeta( ctx context.Context, ) *execinfrapb.ProducerMetadata { - // Take a copy so that we can send the progress address to the output - // processor. 
aggEvents := &execinfrapb.TracingAggregatorEvents{ SQLInstanceID: bp.flowCtx.NodeID.SQLInstanceID(), FlowID: bp.flowCtx.ID, Events: make(map[string][]byte), } - bp.agg.ForEachAggregatedEvent(func(tag string, event bulk.TracingAggregatorEvent) { + bp.agg.ForEachAggregatedEvent(func(name string, event bulk.TracingAggregatorEvent) { msg, ok := event.(protoutil.Message) if !ok { // This should never happen but if it does skip the aggregated event. @@ -267,7 +258,7 @@ func (bp *backupDataProcessor) constructTracingAggregatorProducerMeta( log.Warningf(ctx, "failed to unmarshal aggregated event: %v", err.Error()) return } - aggEvents.Events[tag] = data + aggEvents.Events[name] = data }) return &execinfrapb.ProducerMetadata{AggregatorEvents: aggEvents} @@ -282,18 +273,13 @@ func (bp *backupDataProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.Producer select { case prog, ok := <-bp.progCh: if !ok { - if bp.backupErr != nil { - bp.MoveToDraining(bp.backupErr) - return nil, bp.DrainHelper() - } - - bp.MoveToDraining(nil /* error */) + bp.MoveToDraining(bp.backupErr) return nil, bp.DrainHelper() } return nil, bp.constructProgressProducerMeta(prog) case <-bp.aggTimer.C: bp.aggTimer.Read = true - bp.aggTimer.Reset(flushTracingAggregatorFrequency.Get(&bp.EvalCtx.Settings.SV)) + bp.aggTimer.Reset(15 * time.Second) return nil, bp.constructTracingAggregatorProducerMeta(bp.Ctx()) } } diff --git a/pkg/ccl/backupccl/backup_processor_planning.go b/pkg/ccl/backupccl/backup_processor_planning.go index c919412ad316..71ea019d579d 100644 --- a/pkg/ccl/backupccl/backup_processor_planning.go +++ b/pkg/ccl/backupccl/backup_processor_planning.go @@ -156,6 +156,7 @@ func distBackup( planCtx *sql.PlanningCtx, dsp *sql.DistSQLPlanner, progCh chan *execinfrapb.RemoteProducerMetadata_BulkProcessorProgress, + tracingAggCh chan *execinfrapb.TracingAggregatorEvents, backupSpecs map[base.SQLInstanceID]*execinfrapb.BackupDataSpec, ) error { ctx, span := tracing.ChildSpan(ctx, "backupccl.distBackup") @@ -165,6 +166,7 @@ func distBackup( if len(backupSpecs) == 0 { close(progCh) + close(tracingAggCh) return nil } @@ -194,6 +196,10 @@ func distBackup( // Send the progress up a level to be written to the manifest. 
progCh <- meta.BulkProcessorProgress } + + if meta.AggregatorEvents != nil { + tracingAggCh <- meta.AggregatorEvents + } return nil } @@ -211,6 +217,7 @@ func distBackup( defer recv.Release() defer close(progCh) + defer close(tracingAggCh) execCfg := execCtx.ExecCfg() jobsprofiler.StorePlanDiagram(ctx, execCfg.DistSQLSrv.Stopper, p, execCfg.InternalDB, jobID) diff --git a/pkg/ccl/backupccl/backuppb/BUILD.bazel b/pkg/ccl/backupccl/backuppb/BUILD.bazel index ae5419e55cf9..7aec49c3c9e7 100644 --- a/pkg/ccl/backupccl/backuppb/BUILD.bazel +++ b/pkg/ccl/backupccl/backuppb/BUILD.bazel @@ -45,6 +45,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/cloud", + "//pkg/jobs", "//pkg/multitenant/mtinfopb", "//pkg/sql/parser", "//pkg/sql/protoreflect", @@ -54,6 +55,7 @@ go_library( "//pkg/util/uuid", "@com_github_cockroachdb_errors//:errors", "@com_github_gogo_protobuf//jsonpb", + "@com_github_gogo_protobuf//proto", "@io_opentelemetry_go_otel//attribute", ], ) diff --git a/pkg/ccl/backupccl/backuppb/backup.go b/pkg/ccl/backupccl/backuppb/backup.go index d0cc55bc915f..06d91e2317b5 100644 --- a/pkg/ccl/backupccl/backuppb/backup.go +++ b/pkg/ccl/backupccl/backuppb/backup.go @@ -12,8 +12,10 @@ import ( "encoding/json" "fmt" "math" + "strings" "github.com/cockroachdb/cockroach/pkg/cloud" + "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/multitenant/mtinfopb" "github.com/cockroachdb/cockroach/pkg/sql/parser" "github.com/cockroachdb/cockroach/pkg/sql/protoreflect" @@ -23,6 +25,7 @@ import ( _ "github.com/cockroachdb/cockroach/pkg/util/uuid" // required for backup.proto "github.com/cockroachdb/errors" "github.com/gogo/protobuf/jsonpb" + "github.com/gogo/protobuf/proto" "go.opentelemetry.io/otel/attribute" ) @@ -226,9 +229,37 @@ func (e *ExportStats) Combine(other bulk.TracingAggregatorEvent) { } } -// Tag implements the TracingAggregatorEvent interface. -func (e *ExportStats) Tag() string { - return "ExportStats" +// String implements the AggregatorEvent and stringer interfaces. +func (e *ExportStats) String() string { + const mb = 1 << 20 + var b strings.Builder + if e.NumFiles > 0 { + b.WriteString(fmt.Sprintf("num_files: %d\n", e.NumFiles)) + } + if e.DataSize > 0 { + dataSizeMB := float64(e.DataSize) / mb + b.WriteString(fmt.Sprintf("data_size: %.2f MB\n", dataSizeMB)) + + if !e.StartTime.IsEmpty() && !e.EndTime.IsEmpty() { + duration := e.EndTime.GoTime().Sub(e.StartTime.GoTime()) + throughput := dataSizeMB / duration.Seconds() + b.WriteString(fmt.Sprintf("throughput: %.2f MB/s\n", throughput)) + } + } + + return b.String() +} + +// ProtoName implements the TracingAggregatorEvent interface. +func (e *ExportStats) ProtoName() string { + return proto.MessageName(e) +} + +var _ jobs.ProtobinExecutionDetailFile = &ExportStats{} + +// ToText implements the ProtobinExecutionDetailFile interface. +func (e *ExportStats) ToText() []byte { + return []byte(e.String()) } func init() { diff --git a/pkg/ccl/backupccl/backuppb/backup.proto b/pkg/ccl/backupccl/backuppb/backup.proto index 6e04cf548b02..5bfc322385ce 100644 --- a/pkg/ccl/backupccl/backuppb/backup.proto +++ b/pkg/ccl/backupccl/backuppb/backup.proto @@ -221,6 +221,8 @@ message BackupProgressTraceEvent { // ExportStats is a message containing information about each // Export{Request,Response}. message ExportStats { + option (gogoproto.goproto_stringer) = false; + // NumFiles is the number of SST files produced by the ExportRequest. 
int64 num_files = 1; // DataSize is the byte size of all the SST files produced by the diff --git a/pkg/jobs/BUILD.bazel b/pkg/jobs/BUILD.bazel index a8f826a5a2a2..4d28cedbfdc2 100644 --- a/pkg/jobs/BUILD.bazel +++ b/pkg/jobs/BUILD.bazel @@ -77,6 +77,7 @@ go_library( "@com_github_cockroachdb_logtags//:logtags", "@com_github_cockroachdb_redact//:redact", "@com_github_gogo_protobuf//jsonpb", + "@com_github_gogo_protobuf//proto", "@com_github_gogo_protobuf//types", "@com_github_klauspost_compress//gzip", "@com_github_prometheus_client_model//go", diff --git a/pkg/jobs/adopt.go b/pkg/jobs/adopt.go index 50db531b23b3..b742a671e25d 100644 --- a/pkg/jobs/adopt.go +++ b/pkg/jobs/adopt.go @@ -82,14 +82,10 @@ func (r *Registry) maybeDumpTrace(resumerCtx context.Context, resumer Resumer, j return } - resumerTraceFilename := fmt.Sprintf("resumer-trace-n%s.%s.binpb", + resumerTraceFilename := fmt.Sprintf("%s/resumer-trace/%s", r.ID().String(), timeutil.Now().Format("20060102_150405.00")) td := jobspb.TraceData{CollectedSpans: sp.GetConfiguredRecording()} - b, err := protoutil.Marshal(&td) - if err != nil { - return - } - if err := WriteExecutionDetailFile(dumpCtx, resumerTraceFilename, b, r.db, jobID); err != nil { + if err := WriteProtobinExecutionDetailFile(dumpCtx, resumerTraceFilename, &td, r.db, jobID); err != nil { log.Warning(dumpCtx, "failed to write trace on resumer trace file") return } diff --git a/pkg/jobs/execution_detail_utils.go b/pkg/jobs/execution_detail_utils.go index 75e303aefff3..f2309fdda8c6 100644 --- a/pkg/jobs/execution_detail_utils.go +++ b/pkg/jobs/execution_detail_utils.go @@ -20,9 +20,10 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler/profilerconstants" "github.com/cockroachdb/cockroach/pkg/sql/isql" + "github.com/cockroachdb/cockroach/pkg/sql/protoreflect" "github.com/cockroachdb/cockroach/pkg/util/protoutil" - "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb" "github.com/cockroachdb/errors" + "github.com/gogo/protobuf/proto" "github.com/klauspost/compress/gzip" ) @@ -41,6 +42,20 @@ func compressChunk(chunkBuf []byte) ([]byte, error) { return gzipBuf.Bytes(), nil } +// WriteProtobinExecutionDetailFile writes a `binpb` file of the form +// `filename~<proto.MessageName>.binpb` to the system.job_info table, with the +// contents of the passed in protobuf message. +func WriteProtobinExecutionDetailFile( + ctx context.Context, filename string, msg protoutil.Message, db isql.DB, jobID jobspb.JobID, +) error { + name := fmt.Sprintf("%s~%s.binpb", filename, proto.MessageName(msg)) + b, err := protoutil.Marshal(msg) + if err != nil { + return err + } + return WriteExecutionDetailFile(ctx, name, b, db, jobID) +} + // WriteExecutionDetailFile will break up data into chunks of a fixed size, and // gzip compress them before writing them to the job_info table.
func WriteExecutionDetailFile( @@ -84,17 +99,29 @@ func WriteExecutionDetailFile( }) } -func stringifyProtobinFile(filename string, fileContents *bytes.Buffer) ([]byte, error) { - if strings.HasPrefix(filename, "resumer-trace") { - td := &jobspb.TraceData{} - if err := protoutil.Unmarshal(fileContents.Bytes(), td); err != nil { - return nil, err - } - rec := tracingpb.Recording(td.CollectedSpans) - return []byte(rec.String()), nil - } else { - return nil, errors.AssertionFailedf("unknown file %s", filename) +// ProtobinExecutionDetailFile interface encapsulates the methods that must be +// implemented by protobuf messages that are collected as part of a job's +// execution details. +type ProtobinExecutionDetailFile interface { + // ToText returns the human-readable text representation of the protobuf + // execution detail file. + ToText() []byte +} + +func stringifyProtobinFile(filename string, fileContents []byte) ([]byte, error) { + // A `binpb` execution detail file is expected to have its fully qualified + // proto name after the last `~` in the filename. See + // `WriteProtobinExecutionDetailFile` for details. + msg, err := protoreflect.DecodeMessage(strings.TrimSuffix( + filename[strings.LastIndex(filename, "~")+1:], ".binpb"), fileContents) + if err != nil { + return nil, err + } + f, ok := msg.(ProtobinExecutionDetailFile) + if !ok { + return nil, errors.Newf("protobuf in file %s is not a ProtobinExecutionDetailFile", filename) } + return f.ToText(), err } // ReadExecutionDetailFile will stitch together all the chunks corresponding to the @@ -148,7 +175,7 @@ func ReadExecutionDetailFile( if err := fetchFileContent(trimmedFilename); err != nil { return nil, err } - return stringifyProtobinFile(filename, buf) + return stringifyProtobinFile(trimmedFilename, buf.Bytes()) } if err := fetchFileContent(filename); err != nil { diff --git a/pkg/jobs/jobspb/BUILD.bazel b/pkg/jobs/jobspb/BUILD.bazel index 53607669ae8d..a47e9b703981 100644 --- a/pkg/jobs/jobspb/BUILD.bazel +++ b/pkg/jobs/jobspb/BUILD.bazel @@ -4,7 +4,10 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "jobspb", - srcs = ["wrap.go"], + srcs = [ + "jobs.go", + "wrap.go", + ], embed = [":jobspb_go_proto"], importpath = "github.com/cockroachdb/cockroach/pkg/jobs/jobspb", visibility = ["//visibility:public"], @@ -13,6 +16,7 @@ go_library( "//pkg/sql/catalog/catpb", "//pkg/sql/catalog/descpb", "//pkg/sql/protoreflect", + "//pkg/util/tracing/tracingpb", "@com_github_cockroachdb_errors//:errors", "@com_github_gogo_protobuf//jsonpb", ], diff --git a/pkg/jobs/jobspb/jobs.go b/pkg/jobs/jobspb/jobs.go new file mode 100644 index 000000000000..c0f8e62041c2 --- /dev/null +++ b/pkg/jobs/jobspb/jobs.go @@ -0,0 +1,19 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package jobspb + +import "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb" + +// ToText implements the ProtobinExecutionDetailFile interface. 
+func (t *TraceData) ToText() []byte { + rec := tracingpb.Recording(t.CollectedSpans) + return []byte(rec.String()) +} diff --git a/pkg/jobs/jobspb/jobs.proto b/pkg/jobs/jobspb/jobs.proto index 6a621ab9245d..2639e39b64a1 100644 --- a/pkg/jobs/jobspb/jobs.proto +++ b/pkg/jobs/jobspb/jobs.proto @@ -1429,6 +1429,8 @@ message RetriableExecutionFailure { string truncated_error = 6; } +// TraceData is used to capture the traces of a job when the resumer completes +// its execution. message TraceData { repeated util.tracing.tracingpb.RecordedSpan collected_spans = 1 [(gogoproto.nullable) = false]; } diff --git a/pkg/kv/bulk/bulkpb/ingestion_performance_stats.go b/pkg/kv/bulk/bulkpb/ingestion_performance_stats.go index 3e5d5c8308ba..98c597607ff2 100644 --- a/pkg/kv/bulk/bulkpb/ingestion_performance_stats.go +++ b/pkg/kv/bulk/bulkpb/ingestion_performance_stats.go @@ -14,6 +14,7 @@ import ( "context" "fmt" "math" + "reflect" "sort" "strings" "time" @@ -87,8 +88,8 @@ func (s *IngestionPerformanceStats) Combine(other bulk.TracingAggregatorEvent) { } // Tag implements the TracingAggregatorEvent interface. -func (s *IngestionPerformanceStats) Tag() string { - return "IngestionPerformanceStats" +func (s *IngestionPerformanceStats) ProtoName() string { + return reflect.TypeOf(s).Elem().String() } // Render implements the TracingAggregatorEvent interface. diff --git a/pkg/server/serverpb/status.proto b/pkg/server/serverpb/status.proto index d56a9c0088f8..8aa50e9ef023 100644 --- a/pkg/server/serverpb/status.proto +++ b/pkg/server/serverpb/status.proto @@ -2545,7 +2545,7 @@ service Status { rpc GetJobProfilerExecutionDetails(GetJobProfilerExecutionDetailRequest) returns (GetJobProfilerExecutionDetailResponse) { option (google.api.http) = { - get: "/_status/job_profiler_execution_details/{job_id}/{filename}" + get: "/_status/job_profiler_execution_details/{job_id}" }; } diff --git a/pkg/sql/execinfrapb/data.go b/pkg/sql/execinfrapb/data.go index b83ad2db9d6f..1269b2a5f5e0 100644 --- a/pkg/sql/execinfrapb/data.go +++ b/pkg/sql/execinfrapb/data.go @@ -277,6 +277,8 @@ func RemoteProducerMetaToLocalMeta( meta.Metrics = v.Metrics case *RemoteProducerMetadata_Changefeed: meta.Changefeed = v.Changefeed + case *RemoteProducerMetadata_TracingAggregatorEvents: + meta.AggregatorEvents = v.TracingAggregatorEvents default: return *meta, false } diff --git a/pkg/sql/jobs_profiler_execution_details_test.go b/pkg/sql/jobs_profiler_execution_details_test.go index 60cc0c229837..1241dc322fad 100644 --- a/pkg/sql/jobs_profiler_execution_details_test.go +++ b/pkg/sql/jobs_profiler_execution_details_test.go @@ -349,10 +349,10 @@ func TestListProfilerExecutionDetails(t *testing.T) { runner.Exec(t, `SELECT crdb_internal.request_job_execution_details($1)`, importJobID) files := listExecutionDetails(t, s, jobspb.JobID(importJobID)) require.Len(t, files, 5) - require.Regexp(t, "distsql\\..*\\.html", files[0]) - require.Regexp(t, "goroutines\\..*\\.txt", files[1]) - require.Regexp(t, "resumer-trace-n[0-9].*\\.binpb", files[2]) - require.Regexp(t, "resumer-trace-n[0-9].*\\.binpb\\.txt", files[3]) + require.Regexp(t, "[0-9]/resumer-trace/.*~cockroach\\.sql\\.jobs\\.jobspb\\.TraceData\\.binpb", files[0]) + require.Regexp(t, "[0-9]/resumer-trace/.*~cockroach\\.sql\\.jobs\\.jobspb\\.TraceData\\.binpb", files[1]) + require.Regexp(t, "distsql\\..*\\.html", files[2]) + require.Regexp(t, "goroutines\\..*\\.txt", files[3]) require.Regexp(t, "trace\\..*\\.zip", files[4]) // Resume the job, so it can write another DistSQL diagram and goroutine @@ 
-364,14 +364,14 @@ func TestListProfilerExecutionDetails(t *testing.T) { runner.Exec(t, `SELECT crdb_internal.request_job_execution_details($1)`, importJobID) files = listExecutionDetails(t, s, jobspb.JobID(importJobID)) require.Len(t, files, 10) - require.Regexp(t, "distsql\\..*\\.html", files[0]) - require.Regexp(t, "distsql\\..*\\.html", files[1]) - require.Regexp(t, "goroutines\\..*\\.txt", files[2]) - require.Regexp(t, "goroutines\\..*\\.txt", files[3]) - require.Regexp(t, "resumer-trace-n[0-9].*\\.binpb", files[4]) - require.Regexp(t, "resumer-trace-n[0-9].*\\.binpb\\.txt", files[5]) - require.Regexp(t, "resumer-trace-n[0-9].*\\.binpb", files[6]) - require.Regexp(t, "resumer-trace-n[0-9].*\\.binpb\\.txt", files[7]) + require.Regexp(t, "[0-9]/resumer-trace/.*~cockroach\\.sql\\.jobs\\.jobspb\\.TraceData\\.binpb", files[0]) + require.Regexp(t, "[0-9]/resumer-trace/.*~cockroach\\.sql\\.jobs\\.jobspb\\.TraceData\\.binpb.txt", files[1]) + require.Regexp(t, "[0-9]/resumer-trace/.*~cockroach\\.sql\\.jobs\\.jobspb\\.TraceData\\.binpb", files[2]) + require.Regexp(t, "[0-9]/resumer-trace/.*~cockroach\\.sql\\.jobs\\.jobspb\\.TraceData\\.binpb.txt", files[3]) + require.Regexp(t, "distsql\\..*\\.html", files[4]) + require.Regexp(t, "distsql\\..*\\.html", files[5]) + require.Regexp(t, "goroutines\\..*\\.txt", files[6]) + require.Regexp(t, "goroutines\\..*\\.txt", files[7]) require.Regexp(t, "trace\\..*\\.zip", files[8]) require.Regexp(t, "trace\\..*\\.zip", files[9]) }) @@ -413,7 +413,7 @@ func checkExecutionDetails( client, err := s.GetAdminHTTPClient() require.NoError(t, err) - url := s.AdminURL().String() + fmt.Sprintf("/_status/job_profiler_execution_details/%d/%s", jobID, filename) + url := s.AdminURL().String() + fmt.Sprintf("/_status/job_profiler_execution_details/%d?%s", jobID, filename) req, err := http.NewRequest("GET", url, nil) require.NoError(t, err) diff --git a/pkg/ui/workspaces/cluster-ui/src/api/jobProfilerApi.ts b/pkg/ui/workspaces/cluster-ui/src/api/jobProfilerApi.ts index c4e3f19c9a86..5289a3d9f655 100644 --- a/pkg/ui/workspaces/cluster-ui/src/api/jobProfilerApi.ts +++ b/pkg/ui/workspaces/cluster-ui/src/api/jobProfilerApi.ts @@ -11,6 +11,9 @@ import { cockroach } from "@cockroachlabs/crdb-protobuf-client"; import { fetchData } from "./fetchData"; import { SqlExecutionRequest, executeInternalSql } from "./sqlApi"; +import { propsToQueryString } from "../util"; + +const JOB_PROFILER_PATH = "/_status/job_profiler_execution_details"; export type ListJobProfilerExecutionDetailsRequest = cockroach.server.serverpb.ListJobProfilerExecutionDetailsRequest; @@ -37,9 +40,14 @@ export const listExecutionDetailFiles = ( export const getExecutionDetailFile = ( req: GetJobProfilerExecutionDetailRequest, ): Promise => { + let jobProfilerPath = `${JOB_PROFILER_PATH}/${req.job_id}`; + const queryStr = propsToQueryString({ + filename: req.filename, + }); + jobProfilerPath = jobProfilerPath.concat(`?${queryStr}`); return fetchData( cockroach.server.serverpb.GetJobProfilerExecutionDetailResponse, - `/_status/job_profiler_execution_details/${req.job_id}/${req.filename}`, + jobProfilerPath, null, null, "30M", diff --git a/pkg/util/bulk/BUILD.bazel b/pkg/util/bulk/BUILD.bazel index 12bd2397518b..6a949a94a2dd 100644 --- a/pkg/util/bulk/BUILD.bazel +++ b/pkg/util/bulk/BUILD.bazel @@ -3,13 +3,22 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "bulk", srcs = [ + "aggregator_stats.go", "iterator.go", "tracing_aggregator.go", ], importpath = 
"github.com/cockroachdb/cockroach/pkg/util/bulk", visibility = ["//visibility:public"], deps = [ + "//pkg/jobs", + "//pkg/jobs/jobspb", + "//pkg/settings", + "//pkg/settings/cluster", + "//pkg/sql/execinfrapb", + "//pkg/sql/isql", + "//pkg/sql/protoreflect", "//pkg/util/syncutil", + "//pkg/util/timeutil", "//pkg/util/tracing", ], ) @@ -25,6 +34,7 @@ go_test( "//pkg/roachpb", "//pkg/util/tracing", "//pkg/util/tracing/tracingpb", + "@com_github_gogo_protobuf//proto", "@com_github_stretchr_testify//require", ], ) diff --git a/pkg/util/bulk/aggregator_stats.go b/pkg/util/bulk/aggregator_stats.go new file mode 100644 index 000000000000..6cc73866499f --- /dev/null +++ b/pkg/util/bulk/aggregator_stats.go @@ -0,0 +1,154 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package bulk + +import ( + "bytes" + "context" + "fmt" + "time" + + "github.com/cockroachdb/cockroach/pkg/jobs" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/settings" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" + "github.com/cockroachdb/cockroach/pkg/sql/isql" + "github.com/cockroachdb/cockroach/pkg/sql/protoreflect" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" +) + +var flushTracingAggregatorFrequency = settings.RegisterDurationSetting( + settings.TenantWritable, + "bulkio.backup.aggregator_stats_flush_frequency", + "frequency at which the coordinator node processes and persists tracing aggregator stats to storage", + 10*time.Minute, +) + +// flushTracingStats persists the following files to the `system.job_info` table +// for consumption by job observability tools: +// +// - A file per node, for each aggregated TracingAggregatorEvent. These files +// contain the machine-readable proto bytes of the TracingAggregatorEvent. +// +// - A text file that contains a cluster-wide and per-node summary of each +// TracingAggregatorEvent in its human-readable format. +func flushTracingStats( + ctx context.Context, + jobID jobspb.JobID, + db isql.DB, + perNodeStats map[execinfrapb.ComponentID]map[string][]byte, +) error { + clusterWideAggregatorStats := make(map[string]TracingAggregatorEvent) + asOf := timeutil.Now().Format("20060102_150405.00") + + var clusterWideSummary bytes.Buffer + for component, nameToEvent := range perNodeStats { + clusterWideSummary.WriteString(fmt.Sprintf("## SQL Instance ID: %s; Flow ID: %s\n\n", + component.SQLInstanceID.String(), component.FlowID.String())) + for name, event := range nameToEvent { + // Write a proto file per tag. This machine-readable file can be consumed + // by other places we want to display this information egs: annotated + // DistSQL diagrams, DBConsole etc. + filename := fmt.Sprintf("%s/%s", + component.SQLInstanceID.String(), asOf) + msg, err := protoreflect.DecodeMessage(name, event) + if err != nil { + clusterWideSummary.WriteString(fmt.Sprintf("invalid protocol message: %v", err)) + // If we failed to decode the event write the error to the file and + // carry on. 
+ continue + } + + if err := jobs.WriteProtobinExecutionDetailFile(ctx, filename, msg, db, jobID); err != nil { + return err + } + + // Construct a single text file that contains information on a per-node + // basis as well as a cluster-wide aggregate. + clusterWideSummary.WriteString(fmt.Sprintf("# %s\n", name)) + + aggEvent := msg.(TracingAggregatorEvent) + clusterWideSummary.WriteString(aggEvent.String()) + clusterWideSummary.WriteString("\n") + + if _, ok := clusterWideAggregatorStats[name]; ok { + clusterWideAggregatorStats[name].Combine(aggEvent) + } else { + clusterWideAggregatorStats[name] = aggEvent + } + } + } + + for tag, event := range clusterWideAggregatorStats { + clusterWideSummary.WriteString("## Cluster-wide\n\n") + clusterWideSummary.WriteString(fmt.Sprintf("# %s\n", tag)) + clusterWideSummary.WriteString(event.String()) + } + + filename := fmt.Sprintf("aggregatorstats.%s.txt", asOf) + return jobs.WriteExecutionDetailFile(ctx, filename, clusterWideSummary.Bytes(), db, jobID) +} + +// AggregateTracingStats listens for AggregatorEvents on a channel and +// periodically processes them to human and machine-readable file formats that +// are persisted in the system.job_info table. These files can then be consumed +// for improved observability into the job's execution. +// +// This method does not return until the passed in channel is closed or an error +// is encountered. +func AggregateTracingStats( + ctx context.Context, + jobID jobspb.JobID, + st *cluster.Settings, + db isql.DB, + tracingAgg chan *execinfrapb.TracingAggregatorEvents, +) error { + perNodeAggregatorStats := make(map[execinfrapb.ComponentID]map[string][]byte) + + // AggregatorEvents are periodically received from each node in the DistSQL + // flow. + flushTimer := timeutil.NewTimer() + defer flushTimer.Stop() + flushTimer.Reset(flushTracingAggregatorFrequency.Get(&st.SV)) + + var flushOnClose bool + for agg := range tracingAgg { + flushOnClose = true + componentID := execinfrapb.ComponentID{ + FlowID: agg.FlowID, + SQLInstanceID: agg.SQLInstanceID, + } + + // Update the running aggregate of the component with the latest received + // aggregate. + perNodeAggregatorStats[componentID] = agg.Events + + select { + case <-flushTimer.C: + flushTimer.Read = true + flushTimer.Reset(flushTracingAggregatorFrequency.Get(&st.SV)) + // Flush the per-node and cluster wide aggregator stats to the job_info + // table. + if err := flushTracingStats(ctx, jobID, db, perNodeAggregatorStats); err != nil { + return err + } + flushOnClose = false + default: + } + } + if flushOnClose { + if err := flushTracingStats(ctx, jobID, db, perNodeAggregatorStats); err != nil { + return err + } + } + return nil +} diff --git a/pkg/util/bulk/tracing_aggregator_test.go b/pkg/util/bulk/tracing_aggregator_test.go index 84d52fa4cf01..11853c968688 100644 --- a/pkg/util/bulk/tracing_aggregator_test.go +++ b/pkg/util/bulk/tracing_aggregator_test.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/bulk" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb" + "github.com/gogo/protobuf/proto" "github.com/stretchr/testify/require" ) @@ -60,7 +61,8 @@ func TestAggregator(t *testing.T) { // We only expect to see the aggregated stats from the local children since we // have not imported the remote children's Recording. 
- exportStatsTag, found := aggSp.GetLazyTag("ExportStats") + exportStats := &backuppb.ExportStats{} + exportStatsTag, found := aggSp.GetLazyTag(proto.MessageName(exportStats)) require.True(t, found) var es *backuppb.ExportStats var ok bool @@ -79,7 +81,7 @@ func TestAggregator(t *testing.T) { // Now, we expect the ExportStats from the remote child to show up in the // aggregator. - exportStatsTag, found = aggSp.GetLazyTag("ExportStats") + exportStatsTag, found = aggSp.GetLazyTag(proto.MessageName(exportStats)) require.True(t, found) if es, ok = exportStatsTag.(*backuppb.ExportStats); !ok { t.Fatal("failed to cast LazyTag to expected type")
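The sketches below are not part of the patch; they restate its main patterns as self-contained Go programs with illustrative names. First, the tracingAggLoop added in backup_job.go: after a failure the consumer keeps draining tracingAggCh so that producers blocked on the unbuffered channel can finish, and the error is only logged because aggregator stats are best-effort. A minimal sketch of that drain-on-error shape, using golang.org/x/sync/errgroup in place of crdb's ctxgroup wrapper:

```go
package main

import (
	"context"
	"errors"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// process stands in for persisting a batch of aggregator events; it fails on
// one event to exercise the drain path.
func process(ev int) error {
	if ev == 3 {
		return errors.New("boom")
	}
	return nil
}

// consume mirrors tracingAggLoop: on a processing error it logs, drains the
// channel so producers blocked on an unbuffered send can finish, and returns
// nil because tracing stats are best-effort.
func consume(events <-chan int) error {
	for ev := range events {
		if err := process(ev); err != nil {
			fmt.Printf("failed to aggregate tracing stats: %v; draining\n", err)
			for range events {
			}
			return nil
		}
	}
	return nil
}

func main() {
	g, _ := errgroup.WithContext(context.Background())
	ch := make(chan int)
	g.Go(func() error { return consume(ch) })
	g.Go(func() error {
		defer close(ch) // the producer owns the channel, as distBackup does
		for i := 0; i < 6; i++ {
			ch <- i
		}
		return nil
	})
	if err := g.Wait(); err != nil {
		fmt.Println("unexpected error:", err)
	}
}
```

Without the drain, the sender side of distBackup could block forever on `tracingAggCh <- meta.AggregatorEvents` once the consumer gives up, which is why the loop swallows the error instead of returning it.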
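ExportStats.String computes throughput from DataSize and the request's start/end times. A tiny standalone sketch of the same arithmetic (the helper name exportSummary is made up; the real method reads the fields off the proto):

```go
package main

import (
	"fmt"
	"time"
)

// exportSummary mirrors the arithmetic in ExportStats.String: sizes are
// reported in MB and throughput is data size over elapsed wall time.
func exportSummary(numFiles, dataBytes int64, elapsed time.Duration) string {
	const mb = 1 << 20
	dataSizeMB := float64(dataBytes) / mb
	s := fmt.Sprintf("num_files: %d\ndata_size: %.2f MB\n", numFiles, dataSizeMB)
	if elapsed > 0 {
		s += fmt.Sprintf("throughput: %.2f MB/s\n", dataSizeMB/elapsed.Seconds())
	}
	return s
}

func main() {
	// 96 MB exported over 4s comes out to 24.00 MB/s.
	fmt.Print(exportSummary(3, 96<<20, 4*time.Second))
}
```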
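WriteProtobinExecutionDetailFile and stringifyProtobinFile agree on a filename convention: the fully qualified proto message name sits after the last `~` and before the `.binpb` suffix, so a reader can decode the bytes without out-of-band type information. A sketch of both directions of that convention (helper names are illustrative):

```go
package main

import (
	"fmt"
	"strings"
)

// buildName mirrors WriteProtobinExecutionDetailFile: append the fully
// qualified proto message name after the last `~`.
func buildName(filename, messageName string) string {
	return fmt.Sprintf("%s~%s.binpb", filename, messageName)
}

// parseName mirrors stringifyProtobinFile: recover the proto name from the
// segment between the last `~` and the `.binpb` suffix.
func parseName(name string) string {
	return strings.TrimSuffix(name[strings.LastIndex(name, "~")+1:], ".binpb")
}

func main() {
	n := buildName("1/resumer-trace/20230801_150405.00", "cockroach.sql.jobs.jobspb.TraceData")
	fmt.Println(n)            // 1/resumer-trace/20230801_150405.00~cockroach.sql.jobs.jobspb.TraceData.binpb
	fmt.Println(parseName(n)) // cockroach.sql.jobs.jobspb.TraceData
}
```

This is also why the test regexps above now assert on names like `[0-9]/resumer-trace/.*~cockroach.sql.jobs.jobspb.TraceData.binpb` rather than the old `resumer-trace-n<id>` scheme.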
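stringifyProtobinFile then asserts the decoded message to the new ProtobinExecutionDetailFile interface to get a human-readable rendering. A stripped-down sketch of that assertion, with traceData standing in for jobspb.TraceData and plain strings standing in for recorded spans:

```go
package main

import (
	"errors"
	"fmt"
)

// ProtobinExecutionDetailFile matches the interface added in
// execution_detail_utils.go: anything that can render itself as text.
type ProtobinExecutionDetailFile interface {
	ToText() []byte
}

// traceData is a toy stand-in for jobspb.TraceData.
type traceData struct{ spans []string }

// ToText mirrors TraceData.ToText, which stringifies a tracing recording.
func (t *traceData) ToText() []byte {
	return []byte(fmt.Sprintf("%d spans: %v", len(t.spans), t.spans))
}

// stringify mirrors stringifyProtobinFile: a decoded message must implement
// the interface, otherwise the file cannot be rendered as text.
func stringify(msg interface{}) ([]byte, error) {
	f, ok := msg.(ProtobinExecutionDetailFile)
	if !ok {
		return nil, errors.New("message is not a ProtobinExecutionDetailFile")
	}
	return f.ToText(), nil
}

func main() {
	out, err := stringify(&traceData{spans: []string{"backup.resumer"}})
	fmt.Println(string(out), err)
}
```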
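flushTracingStats folds per-node events into a cluster-wide aggregate by seeding each map entry with the first event seen and calling Combine on the rest. A sketch of that fold with a toy event type (the real code keys the map by proto message name and decodes each event via protoreflect first):

```go
package main

import "fmt"

// event mirrors the Combine contract of TracingAggregatorEvent: an event can
// fold another instance of the same type into itself.
type event interface {
	combine(other event)
	String() string
}

type exportStats struct{ files, bytes int64 }

// combine panics on a type mismatch, which the real code avoids by keying
// events by their proto message name.
func (e *exportStats) combine(other event) {
	o := other.(*exportStats)
	e.files += o.files
	e.bytes += o.bytes
}

func (e *exportStats) String() string {
	return fmt.Sprintf("files=%d bytes=%d", e.files, e.bytes)
}

// clusterWide mirrors the loop in flushTracingStats: seed the cluster-wide
// entry with the first per-node event, then Combine subsequent ones.
func clusterWide(perNode map[string]event) map[string]event {
	agg := make(map[string]event)
	for _, ev := range perNode {
		name := "exportStats" // in the patch this is the proto message name
		if existing, ok := agg[name]; ok {
			existing.combine(ev)
		} else {
			agg[name] = ev
		}
	}
	return agg
}

func main() {
	fmt.Println(clusterWide(map[string]event{
		"n1": &exportStats{files: 2, bytes: 10},
		"n2": &exportStats{files: 3, bytes: 20},
	})["exportStats"]) // files=5 bytes=30
}
```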
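Finally, AggregateTracingStats flushes on a timer but checks the timer non-blockingly (a select with a default case), and performs one last flush when the channel closes if updates arrived after the most recent periodic flush. A stdlib sketch of that loop, using time.Ticker instead of crdb's timeutil.Timer and a counter map instead of the per-component byte maps:

```go
package main

import (
	"fmt"
	"time"
)

// aggregate mirrors AggregateTracingStats: fold incoming per-node events into
// a running map, flush when the timer has fired, and flush once more on
// channel close if there are unflushed updates.
func aggregate(events <-chan string, flushEvery time.Duration) {
	perNode := make(map[string]int)
	ticker := time.NewTicker(flushEvery)
	defer ticker.Stop()

	dirty := false // tracks the flushOnClose flag from the patch
	for ev := range events {
		perNode[ev]++ // latest update per node, like perNodeAggregatorStats
		dirty = true
		select {
		case <-ticker.C:
			fmt.Println("periodic flush:", perNode)
			dirty = false
		default: // never block waiting for the timer
		}
	}
	if dirty {
		fmt.Println("final flush on close:", perNode)
	}
}

func main() {
	ch := make(chan string)
	go func() {
		defer close(ch)
		for i := 0; i < 5; i++ {
			ch <- "n1"
			time.Sleep(30 * time.Millisecond)
		}
	}()
	aggregate(ch, 50*time.Millisecond)
}
```

Because the timer is only consulted when an event arrives, a quiet flow flushes nothing until the channel closes; the final dirty-check flush is what guarantees the last aggregates still reach the job_info table.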