diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index a3f043071ac1..3dac56757e72 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -462,6 +462,7 @@ ALL_TESTS = [ "//pkg/util/binfetcher:binfetcher_test", "//pkg/util/bitarray:bitarray_test", "//pkg/util/buildutil:buildutil_test", + "//pkg/util/bulk:bulk_test", "//pkg/util/cache:cache_test", "//pkg/util/caller:caller_test", "//pkg/util/cgroups:cgroups_test", diff --git a/pkg/ccl/backupccl/BUILD.bazel b/pkg/ccl/backupccl/BUILD.bazel index d99b56820c85..f89c1fd64899 100644 --- a/pkg/ccl/backupccl/BUILD.bazel +++ b/pkg/ccl/backupccl/BUILD.bazel @@ -109,6 +109,7 @@ go_library( "//pkg/storage", "//pkg/util", "//pkg/util/admission/admissionpb", + "//pkg/util/bulk", "//pkg/util/contextutil", "//pkg/util/ctxgroup", "//pkg/util/hlc", diff --git a/pkg/ccl/backupccl/backup_planning.go b/pkg/ccl/backupccl/backup_planning.go index 2c8e668f6329..321beb0de6ec 100644 --- a/pkg/ccl/backupccl/backup_planning.go +++ b/pkg/ccl/backupccl/backup_planning.go @@ -1428,5 +1428,5 @@ func updateBackupDetails( } func init() { - sql.AddPlanHook("backup", backupPlanHook) + sql.AddPlanHook("backupccl.backupPlanHook", backupPlanHook) } diff --git a/pkg/ccl/backupccl/backup_planning_tenant.go b/pkg/ccl/backupccl/backup_planning_tenant.go index 1d88c577d0bb..0c6258f9879d 100644 --- a/pkg/ccl/backupccl/backup_planning_tenant.go +++ b/pkg/ccl/backupccl/backup_planning_tenant.go @@ -79,7 +79,7 @@ func retrieveSingleTenantMetadata( ctx context.Context, ie *sql.InternalExecutor, txn *kv.Txn, tenantID roachpb.TenantID, ) (descpb.TenantInfoWithUsage, error) { row, err := ie.QueryRow( - ctx, "backup-lookup-tenant", txn, + ctx, "backupccl.retrieveSingleTenantMetadata", txn, tenantMetadataQuery+` WHERE id = $1`, tenantID.ToUint64(), ) if err != nil { @@ -99,7 +99,7 @@ func retrieveAllTenantsMetadata( ctx context.Context, ie *sql.InternalExecutor, txn *kv.Txn, ) ([]descpb.TenantInfoWithUsage, error) { rows, err := ie.QueryBuffered( - ctx, "backup-lookup-tenants", txn, + ctx, "backupccl.retrieveAllTenantsMetadata", txn, // XXX Should we add a `WHERE active`? We require the tenant to be active // when it is specified.. tenantMetadataQuery, diff --git a/pkg/ccl/backupccl/backup_processor.go b/pkg/ccl/backupccl/backup_processor.go index d926450a3a40..5594957a3499 100644 --- a/pkg/ccl/backupccl/backup_processor.go +++ b/pkg/ccl/backupccl/backup_processor.go @@ -28,6 +28,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/rowexec" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" + "github.com/cockroachdb/cockroach/pkg/util/bulk" "github.com/cockroachdb/cockroach/pkg/util/contextutil" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -106,6 +107,10 @@ type backupDataProcessor struct { // BoundAccount that reserves the memory usage of the backup processor. memAcc *mon.BoundAccount + + // Aggregator that aggregates StructuredEvents emitted in the + // backupDataProcessors' trace recording. + agg *bulk.Aggregator } var ( @@ -153,6 +158,12 @@ func (bp *backupDataProcessor) Start(ctx context.Context) { ctx = logtags.AddTag(ctx, "job", bp.spec.JobID) ctx = bp.StartInternal(ctx, backupProcessorName) ctx, cancel := context.WithCancel(ctx) + + // Construct an Aggregator to aggregate and render AggregatorEvents emitted in + // bps' trace recording. + ctx, bp.agg = bulk.MakeAggregatorWithSpan(ctx, + fmt.Sprintf("%s-aggregator", backupProcessorName)) + bp.cancelAndWaitForWorker = func() { cancel() for range bp.progCh { @@ -160,7 +171,7 @@ func (bp *backupDataProcessor) Start(ctx context.Context) { } log.Infof(ctx, "starting backup data") if err := bp.flowCtx.Stopper().RunAsyncTaskEx(ctx, stop.TaskOpts{ - TaskName: "backup-worker", + TaskName: "backupDataProcessor.runBackupProcessor", SpanOpt: stop.ChildSpan, }, func(ctx context.Context) { bp.backupErr = runBackupProcessor(ctx, bp.flowCtx, &bp.spec, bp.progCh, bp.memAcc) @@ -198,6 +209,7 @@ func (bp *backupDataProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.Producer func (bp *backupDataProcessor) close() { bp.cancelAndWaitForWorker() + bp.agg.Close() if bp.InternalClose() { bp.memAcc.Close(bp.Ctx) } @@ -387,26 +399,16 @@ func runBackupProcessor( Source: roachpb.AdmissionHeader_FROM_SQL, NoMemoryReservedAtSource: true, } - log.Infof(ctx, "sending ExportRequest for span %s (attempt %d, priority %s)", + log.VEventf(ctx, 1, "sending ExportRequest for span %s (attempt %d, priority %s)", span.span, span.attempts+1, header.UserPriority.String()) var rawResp roachpb.Response var pErr *roachpb.Error - var reqSentTime time.Time - var respReceivedTime time.Time + requestSentAt := timeutil.Now() exportRequestErr := contextutil.RunWithTimeout(ctx, fmt.Sprintf("ExportRequest for span %s", span.span), timeoutPerAttempt.Get(&clusterSettings.SV), func(ctx context.Context) error { - reqSentTime = timeutil.Now() - backupProcessorSpan.RecordStructured(&backuppb.BackupExportTraceRequestEvent{ - Span: span.span.String(), - Attempt: int32(span.attempts + 1), - Priority: header.UserPriority.String(), - ReqSentTime: reqSentTime.String(), - }) - rawResp, pErr = kv.SendWrappedWithAdmission( ctx, flowCtx.Cfg.DB.NonTransactionalSender(), header, admissionHeader, req) - respReceivedTime = timeutil.Now() if pErr != nil { return pErr.GoError() } @@ -419,9 +421,7 @@ func runBackupProcessor( todo <- span // TODO(dt): send a progress update to update job progress to note // the intents being hit. - backupProcessorSpan.RecordStructured(&backuppb.BackupExportTraceResponseEvent{ - RetryableError: tracing.RedactAndTruncateError(intentErr), - }) + log.VEventf(ctx, 1, "retrying ExportRequest for span %s; encountered WriteIntentError: %s", span.span, intentErr.Error()) continue } // TimeoutError improves the opaque `context deadline exceeded` error @@ -480,19 +480,12 @@ func runBackupProcessor( completedSpans = 1 } - duration := respReceivedTime.Sub(reqSentTime) - exportResponseTraceEvent := &backuppb.BackupExportTraceResponseEvent{ - Duration: duration.String(), - FileSummaries: make([]roachpb.RowCount, 0), - } - if len(resp.Files) > 1 { log.Warning(ctx, "unexpected multi-file response using header.TargetBytes = 1") } for i, file := range resp.Files { entryCounts := countRows(file.Exported, spec.PKIDs) - exportResponseTraceEvent.FileSummaries = append(exportResponseTraceEvent.FileSummaries, entryCounts) ret := exportedSpan{ // BackupManifest_File just happens to contain the exact fields @@ -521,8 +514,9 @@ func runBackupProcessor( return ctx.Err() } } - exportResponseTraceEvent.NumFiles = int32(len(resp.Files)) - backupProcessorSpan.RecordStructured(exportResponseTraceEvent) + + // Emit the stats for the processed ExportRequest. + recordExportStats(backupProcessorSpan, resp, timeutil.Since(requestSentAt)) default: // No work left to do, so we can exit. Note that another worker could @@ -576,6 +570,22 @@ func runBackupProcessor( return grp.Wait() } +// recordExportStats emits a StructuredEvent containing the stats about the +// evaluated ExportRequest. +func recordExportStats( + sp *tracing.Span, resp *roachpb.ExportResponse, exportDuration time.Duration, +) { + if sp == nil { + return + } + exportStats := backuppb.ExportStats{Duration: exportDuration} + for _, f := range resp.Files { + exportStats.NumFiles++ + exportStats.DataSize += int64(len(f.SST)) + } + sp.RecordStructured(&exportStats) +} + func init() { rowexec.NewBackupDataProcessor = newBackupDataProcessor } diff --git a/pkg/ccl/backupccl/backup_processor_planning.go b/pkg/ccl/backupccl/backup_processor_planning.go index 0dff006c889c..1c48909ec4ad 100644 --- a/pkg/ccl/backupccl/backup_processor_planning.go +++ b/pkg/ccl/backupccl/backup_processor_planning.go @@ -44,7 +44,7 @@ func distBackupPlanSpecs( startTime, endTime hlc.Timestamp, ) (map[base.SQLInstanceID]*execinfrapb.BackupDataSpec, error) { var span *tracing.Span - ctx, span = tracing.ChildSpan(ctx, "backup-plan-specs") + ctx, span = tracing.ChildSpan(ctx, "backupccl.distBackupPlanSpecs") _ = ctx // ctx is currently unused, but this new ctx should be used below in the future. defer span.Finish() user := execCtx.User() @@ -155,7 +155,7 @@ func distBackup( progCh chan *execinfrapb.RemoteProducerMetadata_BulkProcessorProgress, backupSpecs map[base.SQLInstanceID]*execinfrapb.BackupDataSpec, ) error { - ctx, span := tracing.ChildSpan(ctx, "backup-distsql") + ctx, span := tracing.ChildSpan(ctx, "backupccl.distBackup") defer span.Finish() evalCtx := execCtx.ExtendedEvalContext() var noTxn *kv.Txn diff --git a/pkg/ccl/backupccl/backuppb/BUILD.bazel b/pkg/ccl/backupccl/backuppb/BUILD.bazel index 9aebc3cc0b1b..4a67127b926a 100644 --- a/pkg/ccl/backupccl/backuppb/BUILD.bazel +++ b/pkg/ccl/backupccl/backuppb/BUILD.bazel @@ -45,8 +45,10 @@ go_library( "//pkg/sql/parser", "//pkg/sql/protoreflect", "//pkg/sql/sem/tree", + "//pkg/util/bulk", "//pkg/util/uuid", "@com_github_cockroachdb_errors//:errors", "@com_github_gogo_protobuf//jsonpb", + "@io_opentelemetry_go_otel//attribute", ], ) diff --git a/pkg/ccl/backupccl/backuppb/backup.go b/pkg/ccl/backupccl/backuppb/backup.go index 0448199ebe06..238875a2ceee 100644 --- a/pkg/ccl/backupccl/backuppb/backup.go +++ b/pkg/ccl/backupccl/backuppb/backup.go @@ -10,15 +10,18 @@ package backuppb import ( "encoding/json" + "fmt" "github.com/cockroachdb/cockroach/pkg/cloud" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/parser" "github.com/cockroachdb/cockroach/pkg/sql/protoreflect" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/util/bulk" _ "github.com/cockroachdb/cockroach/pkg/util/uuid" // required for backup.proto "github.com/cockroachdb/errors" "github.com/gogo/protobuf/jsonpb" + "go.opentelemetry.io/otel/attribute" ) // IsIncremental returns if the BackupManifest corresponds to an incremental @@ -123,6 +126,59 @@ func (m ScheduledBackupExecutionArgs) MarshalJSONPB(marshaller *jsonpb.Marshaler return json.Marshal(m) } +var _ bulk.AggregatorEvent = &ExportStats{} + +const ( + tagNumFiles = "num_files" + tagDataSize = "data_size" + tagThroughput = "throughput" +) + +// Render implements the LazyTag interface. +func (e *ExportStats) Render() []attribute.KeyValue { + const mb = 1 << 20 + tags := make([]attribute.KeyValue, 0) + if e.NumFiles > 0 { + tags = append(tags, attribute.KeyValue{ + Key: tagNumFiles, + Value: attribute.Int64Value(e.NumFiles), + }) + } + if e.DataSize > 0 { + dataSizeMB := float64(e.DataSize) / mb + tags = append(tags, attribute.KeyValue{ + Key: tagDataSize, + Value: attribute.StringValue(fmt.Sprintf("%.2f MB", dataSizeMB)), + }) + + if e.Duration > 0 { + throughput := dataSizeMB / e.Duration.Seconds() + tags = append(tags, attribute.KeyValue{ + Key: tagThroughput, + Value: attribute.StringValue(fmt.Sprintf("%.2f MB/s", throughput)), + }) + } + } + + return tags +} + +// Combine implements the AggregatorEvent interface. +func (e *ExportStats) Combine(other bulk.AggregatorEvent) { + otherExportStats, ok := other.(*ExportStats) + if !ok { + panic("`other` is not of type ExportStats") + } + e.NumFiles += otherExportStats.NumFiles + e.DataSize += otherExportStats.DataSize + e.Duration += otherExportStats.Duration +} + +// Tag implements the AggregatorEvent interface. +func (e *ExportStats) Tag() string { + return "ExportStats" +} + func init() { protoreflect.RegisterShorthands((*BackupManifest)(nil), "backup", "backup_manifest") } diff --git a/pkg/ccl/backupccl/backuppb/backup.proto b/pkg/ccl/backupccl/backuppb/backup.proto index 0a52bff4fc5b..a807c5482b00 100644 --- a/pkg/ccl/backupccl/backuppb/backup.proto +++ b/pkg/ccl/backupccl/backuppb/backup.proto @@ -193,22 +193,15 @@ message BackupProgressTraceEvent { util.hlc.Timestamp revision_start_time = 3 [(gogoproto.nullable) = false]; } -// BackupExportTraceRequestEvent is the trace event recorded when an -// ExportRequest has been sent. -message BackupExportTraceRequestEvent { - string span = 1; - int32 attempt = 2; - string priority = 3; - string req_sent_time = 4; -} - -// BackupExportTraceResponseEvent is the trace event recorded when we receive a -// response from the ExportRequest. -message BackupExportTraceResponseEvent { - string duration = 1; - int32 num_files = 2; - repeated roachpb.RowCount file_summaries = 3 [(gogoproto.nullable) = false]; - reserved 4 ; - string retryable_error = 5; +// ExportStats is a message containing information about each +// Export{Request,Response}. +message ExportStats { + // NumFiles is the number of SST files produced by the ExportRequest. + int64 num_files = 1; + // DataSize is the byte size of all the SST files produced by the + // ExportRequest. + int64 data_size = 2; + // Duration is the time taken for the ExportRequest to be served. + int64 duration = 3 [(gogoproto.casttype) = "time.Duration"]; } diff --git a/pkg/jobs/adopt.go b/pkg/jobs/adopt.go index ef4674f47979..72a1c3da4661 100644 --- a/pkg/jobs/adopt.go +++ b/pkg/jobs/adopt.go @@ -12,6 +12,7 @@ package jobs import ( "context" + "fmt" "strconv" "sync" @@ -397,7 +398,8 @@ func (r *Registry) runJob( // TODO(ajwerner): Move this writing up the trace ID down into // stepThroughStateMachine where we're already often (and soon with // exponential backoff, always) updating the job in that call. - ctx, span := r.ac.Tracer.StartSpanCtx(ctx, typ.String(), spanOptions...) + ctx, span := r.ac.Tracer.StartSpanCtx(ctx, + fmt.Sprintf("%s-%d", typ.String(), job.ID()), spanOptions...) span.SetTag("job-id", attribute.Int64Value(int64(job.ID()))) defer span.Finish() if span.TraceID() != 0 { diff --git a/pkg/kv/kvserver/batcheval/cmd_export.go b/pkg/kv/kvserver/batcheval/cmd_export.go index 7b1e399df078..b52fe8746692 100644 --- a/pkg/kv/kvserver/batcheval/cmd_export.go +++ b/pkg/kv/kvserver/batcheval/cmd_export.go @@ -102,7 +102,7 @@ func evalExport( h := cArgs.Header reply := resp.(*roachpb.ExportResponse) - ctx, evalExportSpan := tracing.ChildSpan(ctx, fmt.Sprintf("Export [%s,%s)", args.Key, args.EndKey)) + ctx, evalExportSpan := tracing.ChildSpan(ctx, fmt.Sprintf("evalExport")) defer evalExportSpan.Finish() var evalExportTrace types.StringValue diff --git a/pkg/storage/mvcc.go b/pkg/storage/mvcc.go index e93f7624ef0a..74c36a2b9f17 100644 --- a/pkg/storage/mvcc.go +++ b/pkg/storage/mvcc.go @@ -4705,7 +4705,7 @@ func MVCCExportToSST( ctx context.Context, cs *cluster.Settings, reader Reader, opts MVCCExportOptions, dest io.Writer, ) (roachpb.BulkOpSummary, MVCCKey, error) { var span *tracing.Span - ctx, span = tracing.ChildSpan(ctx, "MVCCExportToSST") + ctx, span = tracing.ChildSpan(ctx, "storage.MVCCExportToSST") defer span.Finish() sstWriter := MakeBackupSSTWriter(ctx, cs, dest) defer sstWriter.Close() diff --git a/pkg/util/bulk/BUILD.bazel b/pkg/util/bulk/BUILD.bazel new file mode 100644 index 000000000000..7028d25fdd76 --- /dev/null +++ b/pkg/util/bulk/BUILD.bazel @@ -0,0 +1,25 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "bulk", + srcs = ["aggregator.go"], + importpath = "github.com/cockroachdb/cockroach/pkg/util/bulk", + visibility = ["//visibility:public"], + deps = [ + "//pkg/util/syncutil", + "//pkg/util/tracing", + ], +) + +go_test( + name = "bulk_test", + srcs = ["aggregator_test.go"], + embed = [":bulk"], + deps = [ + "//pkg/ccl/backupccl/backuppb", + "//pkg/util/log", + "//pkg/util/tracing", + "//pkg/util/tracing/tracingpb", + "@com_github_stretchr_testify//require", + ], +) diff --git a/pkg/util/bulk/aggregator.go b/pkg/util/bulk/aggregator.go new file mode 100644 index 000000000000..af6c469feb49 --- /dev/null +++ b/pkg/util/bulk/aggregator.go @@ -0,0 +1,98 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package bulk + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/cockroach/pkg/util/tracing" +) + +// AggregatorEvent describes an event that can be aggregated and stored by the +// Aggregator. An AggregatorEvent also implements the tracing.LazyTag interface +// to render its information on the associated tracing span. +type AggregatorEvent interface { + tracing.LazyTag + + // Combine combines two AggregatorEvents together. + Combine(other AggregatorEvent) + // Tag returns a string used to identify the AggregatorEvent. + Tag() string +} + +// An Aggregator can be used to aggregate and render AggregatorEvents that are +// emitted as part of its tracing spans' recording. +type Aggregator struct { + mu struct { + syncutil.Mutex + // aggregatedEvents is a mapping from the tag identifying the + // AggregatorEvent to the running aggregate of the AggregatorEvent. + aggregatedEvents map[string]AggregatorEvent + // sp is the tracing span managed by the Aggregator. + sp *tracing.Span + } +} + +// Notify implements the tracing.EventListener interface. +func (b *Aggregator) Notify(event tracing.Structured) { + bulkEvent, ok := event.(AggregatorEvent) + if !ok { + return + } + + b.mu.Lock() + defer b.mu.Unlock() + + // If this is the first AggregatorEvent with this tag, set it as a LazyTag on + // the associated tracing span. This way the AggregatorEvent will be + // dynamically Render()ed everytime we pull the tracing for the associated + // span. + eventTag := bulkEvent.Tag() + if _, ok := b.mu.aggregatedEvents[bulkEvent.Tag()]; !ok { + b.mu.aggregatedEvents[eventTag] = bulkEvent + b.mu.sp.SetLazyTag(eventTag, b.mu.aggregatedEvents[eventTag]) + } else { + b.mu.aggregatedEvents[eventTag].Combine(bulkEvent) + } +} + +// Close is responsible for finishing the Aggregators' tracing span. +func (b *Aggregator) Close() { + b.mu.Lock() + defer b.mu.Unlock() + b.mu.sp.Finish() +} + +var _ tracing.EventListener = &Aggregator{} + +// MakeAggregatorWithSpan returns an instance of an Aggregator along with a +// newly created child context. The Aggregator is registered as a +// tracing.EventListener on the span associated with newly created context. +// +// The Aggregator instance is responsible for finishing the returned span, and +// so the user must call Close(). +func MakeAggregatorWithSpan( + ctx context.Context, aggregatorName string, +) (context.Context, *Aggregator) { + agg := &Aggregator{} + sp := tracing.SpanFromContext(ctx) + + aggCtx, aggSpan := sp.Tracer().StartSpanCtx(ctx, aggregatorName, + tracing.WithEventListeners([]tracing.EventListener{agg}), tracing.WithParent(sp)) + + agg.mu.Lock() + defer agg.mu.Unlock() + agg.mu.aggregatedEvents = make(map[string]AggregatorEvent) + agg.mu.sp = aggSpan + + return aggCtx, agg +} diff --git a/pkg/util/bulk/aggregator_test.go b/pkg/util/bulk/aggregator_test.go new file mode 100644 index 000000000000..5ef55963866f --- /dev/null +++ b/pkg/util/bulk/aggregator_test.go @@ -0,0 +1,90 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package bulk_test + +import ( + "context" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuppb" + "github.com/cockroachdb/cockroach/pkg/util/bulk" + "github.com/cockroachdb/cockroach/pkg/util/tracing" + "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb" + "github.com/stretchr/testify/require" +) + +func TestAggregator(t *testing.T) { + tr := tracing.NewTracer() + ctx := context.Background() + ctx, root := tr.StartSpanCtx(ctx, "root", tracing.WithRecording(tracingpb.RecordingVerbose)) + defer root.Finish() + + ctx, agg := bulk.MakeAggregatorWithSpan(ctx, "mockAggregator") + aggSp := tracing.SpanFromContext(ctx) + defer agg.Close() + + child := tr.StartSpan("child", tracing.WithParent(root), + tracing.WithEventListeners([]tracing.EventListener{agg})) + defer child.Finish() + child.RecordStructured(&backuppb.ExportStats{ + NumFiles: 10, + DataSize: 10, + Duration: time.Minute, + }) + + ctx, childChild := tracing.ChildSpan(ctx, "childChild") + defer childChild.Finish() + childChild.RecordStructured(&backuppb.ExportStats{ + NumFiles: 20, + DataSize: 20, + Duration: time.Minute, + }) + + remoteChild := tr.StartSpan("remoteChild", tracing.WithRemoteParentFromSpanMeta(childChild.Meta())) + remoteChild.RecordStructured(&backuppb.ExportStats{ + NumFiles: 30, + DataSize: 30, + Duration: time.Minute, + }) + + // We only expect to see the aggregated stats from the local children since we + // have not imported the remote childs' Recording. + exportStatsTag, found := aggSp.GetLazyTag("ExportStats") + require.True(t, found) + var es *backuppb.ExportStats + var ok bool + if es, ok = exportStatsTag.(*backuppb.ExportStats); !ok { + t.Fatal("failed to cast LazyTag to expected type") + } + require.Equal(t, backuppb.ExportStats{ + NumFiles: 30, + DataSize: 30, + Duration: 2 * time.Minute, + }, *es) + + // Import the remote recording into its parent. + rec := remoteChild.FinishAndGetConfiguredRecording() + childChild.ImportRemoteRecording(rec) + + // Now, we expect the ExportStats from the remote child to show up in the + // aggregator. + exportStatsTag, found = aggSp.GetLazyTag("ExportStats") + require.True(t, found) + if es, ok = exportStatsTag.(*backuppb.ExportStats); !ok { + t.Fatal("failed to cast LazyTag to expected type") + } + require.Equal(t, backuppb.ExportStats{ + NumFiles: 60, + DataSize: 60, + Duration: 3 * time.Minute, + }, *es) +} diff --git a/pkg/util/stop/stopper.go b/pkg/util/stop/stopper.go index 0b34aa6133fc..f9dd295cb019 100644 --- a/pkg/util/stop/stopper.go +++ b/pkg/util/stop/stopper.go @@ -461,7 +461,7 @@ func (s *Stopper) RunAsyncTaskEx(ctx context.Context, opt TaskOpts, f func(conte // // Note that we have to create the child in this parent goroutine; we can't // defer the creation to the spawned async goroutine since the parent span - // might get Finish()ed by then. However, we'll update the child'd goroutine + // might get Finish()ed by then. However, we'll update the child's goroutine // ID. var sp *tracing.Span switch opt.SpanOpt { diff --git a/pkg/util/tracing/crdbspan.go b/pkg/util/tracing/crdbspan.go index affa37cec68c..1016f76c0df6 100644 --- a/pkg/util/tracing/crdbspan.go +++ b/pkg/util/tracing/crdbspan.go @@ -17,6 +17,7 @@ import ( "sync/atomic" "time" + "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/ring" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" @@ -509,7 +510,11 @@ func (s *crdbSpan) recordFinishedChildren(childRecording tracingpb.Recording) { // children being added to s. for _, span := range childRecording { for _, record := range span.StructuredRecords { - s.notifyEventListeners(record.Payload) + var d types.DynamicAny + if err := types.UnmarshalAny(record.Payload, &d); err != nil { + continue + } + s.notifyEventListeners(d.Message.(protoutil.Message)) } } @@ -937,6 +942,9 @@ func (s *crdbSpan) getRecordingNoChildrenLocked( childKey := string(tag.Key) childValue := tag.Value.Emit() + if rs.Tags == nil { + rs.Tags = make(map[string]string) + } rs.Tags[childKey] = childValue tagGroup.Tags = append(tagGroup.Tags,