From fc36b3cd6e63d50237bf8026be6de1586fbc96ae Mon Sep 17 00:00:00 2001 From: Aditya Maru Date: Thu, 7 Jul 2022 17:54:41 -0400 Subject: [PATCH] bulk, backupccl: introduce a Structured event Aggregator This change introduces an Aggregator object that is capable of listening for Structured events emitted in a recording, aggregating them and rendering them as LazyTags. We also introduce an AggregatorEvent interface that can be implemented by a Structured event thereby making it eligible for aggregation in the Aggregator. The first user of the Aggregator will be every backup data processor that is set up on the nodes in the cluster during a backup. The Aggregator lives as long as the processor, and listens for Aggregator events emitted by any span that is a child of the processor's span. This includes both local children as well as remote children whose recordings have been imported into a local span. The Aggregator stores running aggregates of each AggregatorEvent it is notified about, bucketed by the event's tag name. This aggregate will be rendered on the tracing span as a LazyTag. This change teaches every ExportRequest to emit an AggregatorEvent. Going forward we expect many more operations in bulk jobs to define and emit such events providing visibility into otherwise opaque operations. We clean up some of the StructuredEvents that were previously added but have not proved useful, and also change some of the tracing span operation names to be more intuitive. To view these aggregated events, one can navigate to the `/tracez` endpoint of the debug console to take a snapshot and search for either `BACKUP` or the job ID to filter for tracing spans on that node. The span associated with the backup processor will be decorated with tags that correspond to the fields in the introduced `ExportStats` proto message. Note that these stats are only aggregated on a per-node basis. 
Fixes: #80388 Release note: None --- pkg/BUILD.bazel | 4 + pkg/ccl/backupccl/BUILD.bazel | 1 + pkg/ccl/backupccl/backup_planning.go | 2 +- pkg/ccl/backupccl/backup_planning_tenant.go | 4 +- pkg/ccl/backupccl/backup_processor.go | 60 +++++----- .../backupccl/backup_processor_planning.go | 4 +- pkg/ccl/backupccl/backuppb/BUILD.bazel | 2 + pkg/ccl/backupccl/backuppb/backup.go | 61 ++++++++++ pkg/ccl/backupccl/backuppb/backup.proto | 28 ++--- pkg/jobs/adopt.go | 4 +- pkg/kv/kvserver/batcheval/cmd_export.go | 2 +- pkg/storage/mvcc.go | 2 +- pkg/util/bulk/BUILD.bazel | 27 +++++ pkg/util/bulk/aggregator.go | 105 ++++++++++++++++++ pkg/util/bulk/aggregator_test.go | 90 +++++++++++++++ pkg/util/stop/stopper.go | 2 +- pkg/util/tracing/crdbspan.go | 10 +- 17 files changed, 356 insertions(+), 52 deletions(-) create mode 100644 pkg/util/bulk/BUILD.bazel create mode 100644 pkg/util/bulk/aggregator.go create mode 100644 pkg/util/bulk/aggregator_test.go diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index 178177aeff77..93d5f9f19777 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -514,6 +514,7 @@ ALL_TESTS = [ "//pkg/util/binfetcher:binfetcher_test", "//pkg/util/bitarray:bitarray_test", "//pkg/util/buildutil:buildutil_test", + "//pkg/util/bulk:bulk_test", "//pkg/util/cache:cache_test", "//pkg/util/caller:caller_test", "//pkg/util/cgroups:cgroups_test", @@ -1826,6 +1827,8 @@ GO_TARGETS = [ "//pkg/util/bufalloc:bufalloc", "//pkg/util/buildutil:buildutil", "//pkg/util/buildutil:buildutil_test", + "//pkg/util/bulk:bulk", + "//pkg/util/bulk:bulk_test", "//pkg/util/cache:cache", "//pkg/util/cache:cache_test", "//pkg/util/caller:caller", @@ -2799,6 +2802,7 @@ GET_X_DATA_TARGETS = [ "//pkg/util/bitarray:get_x_data", "//pkg/util/bufalloc:get_x_data", "//pkg/util/buildutil:get_x_data", + "//pkg/util/bulk:get_x_data", "//pkg/util/cache:get_x_data", "//pkg/util/caller:get_x_data", "//pkg/util/cancelchecker:get_x_data", diff --git a/pkg/ccl/backupccl/BUILD.bazel 
b/pkg/ccl/backupccl/BUILD.bazel index 6b7836f15ec9..ed1bdc07a161 100644 --- a/pkg/ccl/backupccl/BUILD.bazel +++ b/pkg/ccl/backupccl/BUILD.bazel @@ -117,6 +117,7 @@ go_library( "//pkg/upgrade/upgrades", "//pkg/util", "//pkg/util/admission/admissionpb", + "//pkg/util/bulk", "//pkg/util/contextutil", "//pkg/util/ctxgroup", "//pkg/util/hlc", diff --git a/pkg/ccl/backupccl/backup_planning.go b/pkg/ccl/backupccl/backup_planning.go index 1077135314a8..ba928741b9d6 100644 --- a/pkg/ccl/backupccl/backup_planning.go +++ b/pkg/ccl/backupccl/backup_planning.go @@ -1483,5 +1483,5 @@ func updateBackupDetails( } func init() { - sql.AddPlanHook("backup", backupPlanHook) + sql.AddPlanHook("backupccl.backupPlanHook", backupPlanHook) } diff --git a/pkg/ccl/backupccl/backup_planning_tenant.go b/pkg/ccl/backupccl/backup_planning_tenant.go index 1d88c577d0bb..0c6258f9879d 100644 --- a/pkg/ccl/backupccl/backup_planning_tenant.go +++ b/pkg/ccl/backupccl/backup_planning_tenant.go @@ -79,7 +79,7 @@ func retrieveSingleTenantMetadata( ctx context.Context, ie *sql.InternalExecutor, txn *kv.Txn, tenantID roachpb.TenantID, ) (descpb.TenantInfoWithUsage, error) { row, err := ie.QueryRow( - ctx, "backup-lookup-tenant", txn, + ctx, "backupccl.retrieveSingleTenantMetadata", txn, tenantMetadataQuery+` WHERE id = $1`, tenantID.ToUint64(), ) if err != nil { @@ -99,7 +99,7 @@ func retrieveAllTenantsMetadata( ctx context.Context, ie *sql.InternalExecutor, txn *kv.Txn, ) ([]descpb.TenantInfoWithUsage, error) { rows, err := ie.QueryBuffered( - ctx, "backup-lookup-tenants", txn, + ctx, "backupccl.retrieveAllTenantsMetadata", txn, // XXX Should we add a `WHERE active`? We require the tenant to be active // when it is specified.. 
tenantMetadataQuery, diff --git a/pkg/ccl/backupccl/backup_processor.go b/pkg/ccl/backupccl/backup_processor.go index d926450a3a40..5594957a3499 100644 --- a/pkg/ccl/backupccl/backup_processor.go +++ b/pkg/ccl/backupccl/backup_processor.go @@ -28,6 +28,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/rowexec" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" + "github.com/cockroachdb/cockroach/pkg/util/bulk" "github.com/cockroachdb/cockroach/pkg/util/contextutil" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -106,6 +107,10 @@ type backupDataProcessor struct { // BoundAccount that reserves the memory usage of the backup processor. memAcc *mon.BoundAccount + + // Aggregator that aggregates StructuredEvents emitted in the + // backupDataProcessors' trace recording. + agg *bulk.Aggregator } var ( @@ -153,6 +158,12 @@ func (bp *backupDataProcessor) Start(ctx context.Context) { ctx = logtags.AddTag(ctx, "job", bp.spec.JobID) ctx = bp.StartInternal(ctx, backupProcessorName) ctx, cancel := context.WithCancel(ctx) + + // Construct an Aggregator to aggregate and render AggregatorEvents emitted in + // bps' trace recording. 
+ ctx, bp.agg = bulk.MakeAggregatorWithSpan(ctx, + fmt.Sprintf("%s-aggregator", backupProcessorName)) + bp.cancelAndWaitForWorker = func() { cancel() for range bp.progCh { @@ -160,7 +171,7 @@ func (bp *backupDataProcessor) Start(ctx context.Context) { } log.Infof(ctx, "starting backup data") if err := bp.flowCtx.Stopper().RunAsyncTaskEx(ctx, stop.TaskOpts{ - TaskName: "backup-worker", + TaskName: "backupDataProcessor.runBackupProcessor", SpanOpt: stop.ChildSpan, }, func(ctx context.Context) { bp.backupErr = runBackupProcessor(ctx, bp.flowCtx, &bp.spec, bp.progCh, bp.memAcc) @@ -198,6 +209,7 @@ func (bp *backupDataProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.Producer func (bp *backupDataProcessor) close() { bp.cancelAndWaitForWorker() + bp.agg.Close() if bp.InternalClose() { bp.memAcc.Close(bp.Ctx) } @@ -387,26 +399,16 @@ func runBackupProcessor( Source: roachpb.AdmissionHeader_FROM_SQL, NoMemoryReservedAtSource: true, } - log.Infof(ctx, "sending ExportRequest for span %s (attempt %d, priority %s)", + log.VEventf(ctx, 1, "sending ExportRequest for span %s (attempt %d, priority %s)", span.span, span.attempts+1, header.UserPriority.String()) var rawResp roachpb.Response var pErr *roachpb.Error - var reqSentTime time.Time - var respReceivedTime time.Time + requestSentAt := timeutil.Now() exportRequestErr := contextutil.RunWithTimeout(ctx, fmt.Sprintf("ExportRequest for span %s", span.span), timeoutPerAttempt.Get(&clusterSettings.SV), func(ctx context.Context) error { - reqSentTime = timeutil.Now() - backupProcessorSpan.RecordStructured(&backuppb.BackupExportTraceRequestEvent{ - Span: span.span.String(), - Attempt: int32(span.attempts + 1), - Priority: header.UserPriority.String(), - ReqSentTime: reqSentTime.String(), - }) - rawResp, pErr = kv.SendWrappedWithAdmission( ctx, flowCtx.Cfg.DB.NonTransactionalSender(), header, admissionHeader, req) - respReceivedTime = timeutil.Now() if pErr != nil { return pErr.GoError() } @@ -419,9 +421,7 @@ func runBackupProcessor( 
todo <- span // TODO(dt): send a progress update to update job progress to note // the intents being hit. - backupProcessorSpan.RecordStructured(&backuppb.BackupExportTraceResponseEvent{ - RetryableError: tracing.RedactAndTruncateError(intentErr), - }) + log.VEventf(ctx, 1, "retrying ExportRequest for span %s; encountered WriteIntentError: %s", span.span, intentErr.Error()) continue } // TimeoutError improves the opaque `context deadline exceeded` error @@ -480,19 +480,12 @@ func runBackupProcessor( completedSpans = 1 } - duration := respReceivedTime.Sub(reqSentTime) - exportResponseTraceEvent := &backuppb.BackupExportTraceResponseEvent{ - Duration: duration.String(), - FileSummaries: make([]roachpb.RowCount, 0), - } - if len(resp.Files) > 1 { log.Warning(ctx, "unexpected multi-file response using header.TargetBytes = 1") } for i, file := range resp.Files { entryCounts := countRows(file.Exported, spec.PKIDs) - exportResponseTraceEvent.FileSummaries = append(exportResponseTraceEvent.FileSummaries, entryCounts) ret := exportedSpan{ // BackupManifest_File just happens to contain the exact fields @@ -521,8 +514,9 @@ func runBackupProcessor( return ctx.Err() } } - exportResponseTraceEvent.NumFiles = int32(len(resp.Files)) - backupProcessorSpan.RecordStructured(exportResponseTraceEvent) + + // Emit the stats for the processed ExportRequest. + recordExportStats(backupProcessorSpan, resp, timeutil.Since(requestSentAt)) default: // No work left to do, so we can exit. Note that another worker could @@ -576,6 +570,22 @@ func runBackupProcessor( return grp.Wait() } +// recordExportStats emits a StructuredEvent containing the stats about the +// evaluated ExportRequest. 
+func recordExportStats( + sp *tracing.Span, resp *roachpb.ExportResponse, exportDuration time.Duration, +) { + if sp == nil { + return + } + exportStats := backuppb.ExportStats{Duration: exportDuration} + for _, f := range resp.Files { + exportStats.NumFiles++ + exportStats.DataSize += int64(len(f.SST)) + } + sp.RecordStructured(&exportStats) +} + func init() { rowexec.NewBackupDataProcessor = newBackupDataProcessor } diff --git a/pkg/ccl/backupccl/backup_processor_planning.go b/pkg/ccl/backupccl/backup_processor_planning.go index eb518b2ef8d8..42d5bfd07059 100644 --- a/pkg/ccl/backupccl/backup_processor_planning.go +++ b/pkg/ccl/backupccl/backup_processor_planning.go @@ -45,7 +45,7 @@ func distBackupPlanSpecs( startTime, endTime hlc.Timestamp, ) (map[base.SQLInstanceID]*execinfrapb.BackupDataSpec, error) { var span *tracing.Span - ctx, span = tracing.ChildSpan(ctx, "backup-plan-specs") + ctx, span = tracing.ChildSpan(ctx, "backupccl.distBackupPlanSpecs") _ = ctx // ctx is currently unused, but this new ctx should be used below in the future. 
defer span.Finish() user := execCtx.User() @@ -158,7 +158,7 @@ func distBackup( progCh chan *execinfrapb.RemoteProducerMetadata_BulkProcessorProgress, backupSpecs map[base.SQLInstanceID]*execinfrapb.BackupDataSpec, ) error { - ctx, span := tracing.ChildSpan(ctx, "backup-distsql") + ctx, span := tracing.ChildSpan(ctx, "backupccl.distBackup") defer span.Finish() evalCtx := execCtx.ExtendedEvalContext() var noTxn *kv.Txn diff --git a/pkg/ccl/backupccl/backuppb/BUILD.bazel b/pkg/ccl/backupccl/backuppb/BUILD.bazel index b8643961005c..c9d57dabd390 100644 --- a/pkg/ccl/backupccl/backuppb/BUILD.bazel +++ b/pkg/ccl/backupccl/backuppb/BUILD.bazel @@ -48,9 +48,11 @@ go_library( "//pkg/sql/parser", "//pkg/sql/protoreflect", "//pkg/sql/sem/tree", + "//pkg/util/bulk", "//pkg/util/uuid", "@com_github_cockroachdb_errors//:errors", "@com_github_gogo_protobuf//jsonpb", + "@io_opentelemetry_go_otel//attribute", ], ) diff --git a/pkg/ccl/backupccl/backuppb/backup.go b/pkg/ccl/backupccl/backuppb/backup.go index 0448199ebe06..4dc7f0b72135 100644 --- a/pkg/ccl/backupccl/backuppb/backup.go +++ b/pkg/ccl/backupccl/backuppb/backup.go @@ -10,15 +10,18 @@ package backuppb import ( "encoding/json" + "fmt" "github.com/cockroachdb/cockroach/pkg/cloud" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/parser" "github.com/cockroachdb/cockroach/pkg/sql/protoreflect" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/util/bulk" _ "github.com/cockroachdb/cockroach/pkg/util/uuid" // required for backup.proto "github.com/cockroachdb/errors" "github.com/gogo/protobuf/jsonpb" + "go.opentelemetry.io/otel/attribute" ) // IsIncremental returns if the BackupManifest corresponds to an incremental @@ -123,6 +126,64 @@ func (m ScheduledBackupExecutionArgs) MarshalJSONPB(marshaller *jsonpb.Marshaler return json.Marshal(m) } +var _ bulk.AggregatorEvent = &ExportStats{} + +const ( + tagNumFiles = "num_files" + tagDataSize = 
"data_size" + tagThroughput = "throughput" +) + +// Render implements the LazyTag interface. +func (e *ExportStats) Render() []attribute.KeyValue { + const mb = 1 << 20 + tags := make([]attribute.KeyValue, 0) + if e.NumFiles > 0 { + tags = append(tags, attribute.KeyValue{ + Key: tagNumFiles, + Value: attribute.Int64Value(e.NumFiles), + }) + } + if e.DataSize > 0 { + dataSizeMB := float64(e.DataSize) / mb + tags = append(tags, attribute.KeyValue{ + Key: tagDataSize, + Value: attribute.StringValue(fmt.Sprintf("%.2f MB", dataSizeMB)), + }) + + if e.Duration > 0 { + throughput := dataSizeMB / e.Duration.Seconds() + tags = append(tags, attribute.KeyValue{ + Key: tagThroughput, + Value: attribute.StringValue(fmt.Sprintf("%.2f MB/s", throughput)), + }) + } + } + + return tags +} + +// Identity implements the AggregatorEvent interface. +func (e *ExportStats) Identity() bulk.AggregatorEvent { + return &ExportStats{} +} + +// Combine implements the AggregatorEvent interface. +func (e *ExportStats) Combine(other bulk.AggregatorEvent) { + otherExportStats, ok := other.(*ExportStats) + if !ok { + panic(fmt.Sprintf("`other` is not of type ExportStats: %T", other)) + } + e.NumFiles += otherExportStats.NumFiles + e.DataSize += otherExportStats.DataSize + e.Duration += otherExportStats.Duration +} + +// Tag implements the AggregatorEvent interface. +func (e *ExportStats) Tag() string { + return "ExportStats" +} + func init() { protoreflect.RegisterShorthands((*BackupManifest)(nil), "backup", "backup_manifest") } diff --git a/pkg/ccl/backupccl/backuppb/backup.proto b/pkg/ccl/backupccl/backuppb/backup.proto index aa8dcc12e2ef..caa125169601 100644 --- a/pkg/ccl/backupccl/backuppb/backup.proto +++ b/pkg/ccl/backupccl/backuppb/backup.proto @@ -193,22 +193,16 @@ message BackupProgressTraceEvent { util.hlc.Timestamp revision_start_time = 3 [(gogoproto.nullable) = false]; } -// BackupExportTraceRequestEvent is the trace event recorded when an -// ExportRequest has been sent. 
-message BackupExportTraceRequestEvent { - string span = 1; - int32 attempt = 2; - string priority = 3; - string req_sent_time = 4; -} - -// BackupExportTraceResponseEvent is the trace event recorded when we receive a -// response from the ExportRequest. -message BackupExportTraceResponseEvent { - string duration = 1; - int32 num_files = 2; - repeated roachpb.RowCount file_summaries = 3 [(gogoproto.nullable) = false]; - reserved 4 ; - string retryable_error = 5; +// ExportStats is a message containing information about each +// Export{Request,Response}. +message ExportStats { + // NumFiles is the number of SST files produced by the ExportRequest. + int64 num_files = 1; + // DataSize is the byte size of all the SST files produced by the + // ExportRequest. + int64 data_size = 2; + // Duration is the total time taken to send an ExportRequest, receive an + // ExportResponse and push the response on a channel. + int64 duration = 3 [(gogoproto.casttype) = "time.Duration"]; } diff --git a/pkg/jobs/adopt.go b/pkg/jobs/adopt.go index 756a42b8399c..57057c9370f3 100644 --- a/pkg/jobs/adopt.go +++ b/pkg/jobs/adopt.go @@ -12,6 +12,7 @@ package jobs import ( "context" + "fmt" "strconv" "sync" @@ -389,7 +390,8 @@ func (r *Registry) runJob( // TODO(ajwerner): Move this writing up the trace ID down into // stepThroughStateMachine where we're already often (and soon with // exponential backoff, always) updating the job in that call. - ctx, span := r.ac.Tracer.StartSpanCtx(ctx, typ.String(), spanOptions...) + ctx, span := r.ac.Tracer.StartSpanCtx(ctx, + fmt.Sprintf("%s-%d", typ.String(), job.ID()), spanOptions...) 
span.SetTag("job-id", attribute.Int64Value(int64(job.ID()))) defer span.Finish() if span.TraceID() != 0 { diff --git a/pkg/kv/kvserver/batcheval/cmd_export.go b/pkg/kv/kvserver/batcheval/cmd_export.go index 7b1e399df078..bdc17bb136c4 100644 --- a/pkg/kv/kvserver/batcheval/cmd_export.go +++ b/pkg/kv/kvserver/batcheval/cmd_export.go @@ -102,7 +102,7 @@ func evalExport( h := cArgs.Header reply := resp.(*roachpb.ExportResponse) - ctx, evalExportSpan := tracing.ChildSpan(ctx, fmt.Sprintf("Export [%s,%s)", args.Key, args.EndKey)) + ctx, evalExportSpan := tracing.ChildSpan(ctx, "evalExport") defer evalExportSpan.Finish() var evalExportTrace types.StringValue diff --git a/pkg/storage/mvcc.go b/pkg/storage/mvcc.go index 68dc1e98a509..c8a25d28d24b 100644 --- a/pkg/storage/mvcc.go +++ b/pkg/storage/mvcc.go @@ -5562,7 +5562,7 @@ func MVCCExportToSST( ctx context.Context, cs *cluster.Settings, reader Reader, opts MVCCExportOptions, dest io.Writer, ) (roachpb.BulkOpSummary, MVCCKey, error) { var span *tracing.Span - ctx, span = tracing.ChildSpan(ctx, "MVCCExportToSST") + ctx, span = tracing.ChildSpan(ctx, "storage.MVCCExportToSST") defer span.Finish() sstWriter := MakeBackupSSTWriter(ctx, cs, dest) defer sstWriter.Close() diff --git a/pkg/util/bulk/BUILD.bazel b/pkg/util/bulk/BUILD.bazel new file mode 100644 index 000000000000..60172112c50d --- /dev/null +++ b/pkg/util/bulk/BUILD.bazel @@ -0,0 +1,27 @@ +load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data") +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "bulk", + srcs = ["aggregator.go"], + importpath = "github.com/cockroachdb/cockroach/pkg/util/bulk", + visibility = ["//visibility:public"], + deps = [ + "//pkg/util/syncutil", + "//pkg/util/tracing", + ], +) + +go_test( + name = "bulk_test", + srcs = ["aggregator_test.go"], + deps = [ + ":bulk", + "//pkg/ccl/backupccl/backuppb", + "//pkg/util/tracing", + "//pkg/util/tracing/tracingpb", + 
"@com_github_stretchr_testify//require", + ], +) + +get_x_data(name = "get_x_data") diff --git a/pkg/util/bulk/aggregator.go b/pkg/util/bulk/aggregator.go new file mode 100644 index 000000000000..eb71d20b19c0 --- /dev/null +++ b/pkg/util/bulk/aggregator.go @@ -0,0 +1,105 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package bulk + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/cockroach/pkg/util/tracing" +) + +// AggregatorEvent describes an event that can be aggregated and stored by the +// Aggregator. An AggregatorEvent also implements the tracing.LazyTag interface +// to render its information on the associated tracing span. +type AggregatorEvent interface { + tracing.LazyTag + + // Identity returns an AggregatorEvent that when combined with another + // event returns the other AggregatorEvent unchanged. + Identity() AggregatorEvent + // Combine combines two AggregatorEvents together. + Combine(other AggregatorEvent) + // Tag returns a string used to identify the AggregatorEvent. + Tag() string +} + +// An Aggregator can be used to aggregate and render AggregatorEvents that are +// emitted as part of its tracing spans' recording. +type Aggregator struct { + mu struct { + syncutil.Mutex + // aggregatedEvents is a mapping from the tag identifying the + // AggregatorEvent to the running aggregate of the AggregatorEvent. + aggregatedEvents map[string]AggregatorEvent + // sp is the tracing span managed by the Aggregator. + sp *tracing.Span + // closed is set to true if the Aggregator has already been closed. 
+ closed bool + } +} + +// Notify implements the tracing.EventListener interface. +func (b *Aggregator) Notify(event tracing.Structured) { + bulkEvent, ok := event.(AggregatorEvent) + if !ok { + return + } + + b.mu.Lock() + defer b.mu.Unlock() + + // If this is the first AggregatorEvent with this tag, set it as a LazyTag on + // the associated tracing span. This way the AggregatorEvent will be + // dynamically Render()ed everytime we pull the tracing for the associated + // span. + eventTag := bulkEvent.Tag() + if _, ok := b.mu.aggregatedEvents[bulkEvent.Tag()]; !ok { + b.mu.aggregatedEvents[eventTag] = bulkEvent.Identity() + b.mu.sp.SetLazyTag(eventTag, b.mu.aggregatedEvents[eventTag]) + } + b.mu.aggregatedEvents[eventTag].Combine(bulkEvent) +} + +// Close is responsible for finishing the Aggregators' tracing span. +func (b *Aggregator) Close() { + b.mu.Lock() + defer b.mu.Unlock() + if !b.mu.closed { + b.mu.sp.Finish() + b.mu.closed = true + } +} + +var _ tracing.EventListener = &Aggregator{} + +// MakeAggregatorWithSpan returns an instance of an Aggregator along with a +// newly created child context. The Aggregator is registered as a +// tracing.EventListener on the span associated with newly created context. +// +// The Aggregator instance is responsible for finishing the returned span, and +// so the user must call Close(). 
+func MakeAggregatorWithSpan( + ctx context.Context, aggregatorName string, +) (context.Context, *Aggregator) { + agg := &Aggregator{} + sp := tracing.SpanFromContext(ctx) + + aggCtx, aggSpan := sp.Tracer().StartSpanCtx(ctx, aggregatorName, + tracing.WithEventListeners([]tracing.EventListener{agg}), tracing.WithParent(sp)) + + agg.mu.Lock() + defer agg.mu.Unlock() + agg.mu.aggregatedEvents = make(map[string]AggregatorEvent) + agg.mu.sp = aggSpan + + return aggCtx, agg +} diff --git a/pkg/util/bulk/aggregator_test.go b/pkg/util/bulk/aggregator_test.go new file mode 100644 index 000000000000..5b8eef1c553c --- /dev/null +++ b/pkg/util/bulk/aggregator_test.go @@ -0,0 +1,90 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package bulk_test + +import ( + "context" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuppb" + "github.com/cockroachdb/cockroach/pkg/util/bulk" + "github.com/cockroachdb/cockroach/pkg/util/tracing" + "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb" + "github.com/stretchr/testify/require" +) + +func TestAggregator(t *testing.T) { + tr := tracing.NewTracer() + ctx := context.Background() + ctx, root := tr.StartSpanCtx(ctx, "root", tracing.WithRecording(tracingpb.RecordingVerbose)) + defer root.Finish() + + ctx, agg := bulk.MakeAggregatorWithSpan(ctx, "mockAggregator") + aggSp := tracing.SpanFromContext(ctx) + defer agg.Close() + + child := tr.StartSpan("child", tracing.WithParent(root), + tracing.WithEventListeners([]tracing.EventListener{agg})) + defer child.Finish() + child.RecordStructured(&backuppb.ExportStats{ + NumFiles: 10, + DataSize: 10, + Duration: time.Minute, + }) + + _, childChild := tracing.ChildSpan(ctx, "childChild") + defer childChild.Finish() + childChild.RecordStructured(&backuppb.ExportStats{ + NumFiles: 20, + DataSize: 20, + Duration: time.Minute, + }) + + remoteChild := tr.StartSpan("remoteChild", tracing.WithRemoteParentFromSpanMeta(childChild.Meta())) + remoteChild.RecordStructured(&backuppb.ExportStats{ + NumFiles: 30, + DataSize: 30, + Duration: time.Minute, + }) + + // We only expect to see the aggregated stats from the local children since we + // have not imported the remote children's Recording. + exportStatsTag, found := aggSp.GetLazyTag("ExportStats") + require.True(t, found) + var es *backuppb.ExportStats + var ok bool + if es, ok = exportStatsTag.(*backuppb.ExportStats); !ok { + t.Fatal("failed to cast LazyTag to expected type") + } + require.Equal(t, backuppb.ExportStats{ + NumFiles: 30, + DataSize: 30, + Duration: 2 * time.Minute, + }, *es) + + // Import the remote recording into its parent. 
+ rec := remoteChild.FinishAndGetConfiguredRecording() + childChild.ImportRemoteRecording(rec) + + // Now, we expect the ExportStats from the remote child to show up in the + // aggregator. + exportStatsTag, found = aggSp.GetLazyTag("ExportStats") + require.True(t, found) + if es, ok = exportStatsTag.(*backuppb.ExportStats); !ok { + t.Fatal("failed to cast LazyTag to expected type") + } + require.Equal(t, backuppb.ExportStats{ + NumFiles: 60, + DataSize: 60, + Duration: 3 * time.Minute, + }, *es) +} diff --git a/pkg/util/stop/stopper.go b/pkg/util/stop/stopper.go index 0b34aa6133fc..f9dd295cb019 100644 --- a/pkg/util/stop/stopper.go +++ b/pkg/util/stop/stopper.go @@ -461,7 +461,7 @@ func (s *Stopper) RunAsyncTaskEx(ctx context.Context, opt TaskOpts, f func(conte // // Note that we have to create the child in this parent goroutine; we can't // defer the creation to the spawned async goroutine since the parent span - // might get Finish()ed by then. However, we'll update the child'd goroutine + // might get Finish()ed by then. However, we'll update the child's goroutine // ID. var sp *tracing.Span switch opt.SpanOpt { diff --git a/pkg/util/tracing/crdbspan.go b/pkg/util/tracing/crdbspan.go index cbc062f32478..50a9cbf8540c 100644 --- a/pkg/util/tracing/crdbspan.go +++ b/pkg/util/tracing/crdbspan.go @@ -17,6 +17,7 @@ import ( "sync/atomic" "time" + "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/ring" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" @@ -509,7 +510,11 @@ func (s *crdbSpan) recordFinishedChildren(childRecording tracingpb.Recording) { // children being added to s. 
for _, span := range childRecording { for _, record := range span.StructuredRecords { - s.notifyEventListeners(record.Payload) + var d types.DynamicAny + if err := types.UnmarshalAny(record.Payload, &d); err != nil { + continue + } + s.notifyEventListeners(d.Message.(protoutil.Message)) } } @@ -930,6 +935,9 @@ func (s *crdbSpan) getRecordingNoChildrenLocked( childKey := string(tag.Key) childValue := tag.Value.Emit() + if rs.Tags == nil { + rs.Tags = make(map[string]string) + } rs.Tags[childKey] = childValue tagGroup.Tags = append(tagGroup.Tags,