Skip to content

Commit

Permalink
bulk, backupccl: introduce a Structured event Aggregator
Browse files Browse the repository at this point in the history
This change introduces an Aggregator object that is capable
of listening for Structured events emitted in a recording,
aggregating them and rendering them as LazyTags.
We also introduce an AggregatorEvent interface that can be
implemented by a Structured event thereby making it eligible
for aggregation in the Aggregator.

The first user of the Aggregator will be every backup data
processor that is set up on the nodes in the cluster during
a backup. The Aggregator lives as long as the processor, and
listens for Aggregator events emitted by any span that is a child
of the processor's span. This includes both local children as
well as remote children whose recordings have been imported into
a local span. The Aggregator stores running aggregates of each
AggregatorEvent it is notified about, bucketed by the event's
tag name. This aggregate will be rendered on the tracing span as a LazyTag.

This change teaches every ExportRequest to emit an AggregatorEvent.
Going forward we expect many more operations in bulk jobs to
define and emit such events providing visibility into otherwise
opaque operations.

We clean up some of the StructuredEvents that were previously
added but have not proved useful, and also change some of the
tracing span operation names to be more intuitive.

Fixes: #80388

Release note: None
  • Loading branch information
adityamaru committed Jul 7, 2022
1 parent b9a165c commit 34e7ce7
Show file tree
Hide file tree
Showing 17 changed files with 338 additions and 52 deletions.
1 change: 1 addition & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,7 @@ ALL_TESTS = [
"//pkg/util/binfetcher:binfetcher_test",
"//pkg/util/bitarray:bitarray_test",
"//pkg/util/buildutil:buildutil_test",
"//pkg/util/bulk:bulk_test",
"//pkg/util/cache:cache_test",
"//pkg/util/caller:caller_test",
"//pkg/util/cgroups:cgroups_test",
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/backupccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ go_library(
"//pkg/storage",
"//pkg/util",
"//pkg/util/admission/admissionpb",
"//pkg/util/bulk",
"//pkg/util/contextutil",
"//pkg/util/ctxgroup",
"//pkg/util/hlc",
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/backup_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -1428,5 +1428,5 @@ func updateBackupDetails(
}

func init() {
sql.AddPlanHook("backup", backupPlanHook)
sql.AddPlanHook("backupccl.backupPlanHook", backupPlanHook)
}
4 changes: 2 additions & 2 deletions pkg/ccl/backupccl/backup_planning_tenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func retrieveSingleTenantMetadata(
ctx context.Context, ie *sql.InternalExecutor, txn *kv.Txn, tenantID roachpb.TenantID,
) (descpb.TenantInfoWithUsage, error) {
row, err := ie.QueryRow(
ctx, "backup-lookup-tenant", txn,
ctx, "backupccl.retrieveSingleTenantMetadata", txn,
tenantMetadataQuery+` WHERE id = $1`, tenantID.ToUint64(),
)
if err != nil {
Expand All @@ -99,7 +99,7 @@ func retrieveAllTenantsMetadata(
ctx context.Context, ie *sql.InternalExecutor, txn *kv.Txn,
) ([]descpb.TenantInfoWithUsage, error) {
rows, err := ie.QueryBuffered(
ctx, "backup-lookup-tenants", txn,
ctx, "backupccl.retrieveAllTenantsMetadata", txn,
// XXX Should we add a `WHERE active`? We require the tenant to be active
// when it is specified..
tenantMetadataQuery,
Expand Down
60 changes: 35 additions & 25 deletions pkg/ccl/backupccl/backup_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/rowexec"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/bulk"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
Expand Down Expand Up @@ -106,6 +107,10 @@ type backupDataProcessor struct {

// BoundAccount that reserves the memory usage of the backup processor.
memAcc *mon.BoundAccount

// Aggregator that aggregates StructuredEvents emitted in the
// backupDataProcessors' trace recording.
agg *bulk.Aggregator
}

var (
Expand Down Expand Up @@ -153,14 +158,20 @@ func (bp *backupDataProcessor) Start(ctx context.Context) {
ctx = logtags.AddTag(ctx, "job", bp.spec.JobID)
ctx = bp.StartInternal(ctx, backupProcessorName)
ctx, cancel := context.WithCancel(ctx)

// Construct an Aggregator to aggregate and render AggregatorEvents emitted in
// bps' trace recording.
ctx, bp.agg = bulk.MakeAggregatorWithSpan(ctx,
fmt.Sprintf("%s-aggregator", backupProcessorName))

bp.cancelAndWaitForWorker = func() {
cancel()
for range bp.progCh {
}
}
log.Infof(ctx, "starting backup data")
if err := bp.flowCtx.Stopper().RunAsyncTaskEx(ctx, stop.TaskOpts{
TaskName: "backup-worker",
TaskName: "backupDataProcessor.runBackupProcessor",
SpanOpt: stop.ChildSpan,
}, func(ctx context.Context) {
bp.backupErr = runBackupProcessor(ctx, bp.flowCtx, &bp.spec, bp.progCh, bp.memAcc)
Expand Down Expand Up @@ -198,6 +209,7 @@ func (bp *backupDataProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.Producer

func (bp *backupDataProcessor) close() {
bp.cancelAndWaitForWorker()
bp.agg.Close()
if bp.InternalClose() {
bp.memAcc.Close(bp.Ctx)
}
Expand Down Expand Up @@ -387,26 +399,16 @@ func runBackupProcessor(
Source: roachpb.AdmissionHeader_FROM_SQL,
NoMemoryReservedAtSource: true,
}
log.Infof(ctx, "sending ExportRequest for span %s (attempt %d, priority %s)",
log.VEventf(ctx, 1, "sending ExportRequest for span %s (attempt %d, priority %s)",
span.span, span.attempts+1, header.UserPriority.String())
var rawResp roachpb.Response
var pErr *roachpb.Error
var reqSentTime time.Time
var respReceivedTime time.Time
requestSentAt := timeutil.Now()
exportRequestErr := contextutil.RunWithTimeout(ctx,
fmt.Sprintf("ExportRequest for span %s", span.span),
timeoutPerAttempt.Get(&clusterSettings.SV), func(ctx context.Context) error {
reqSentTime = timeutil.Now()
backupProcessorSpan.RecordStructured(&backuppb.BackupExportTraceRequestEvent{
Span: span.span.String(),
Attempt: int32(span.attempts + 1),
Priority: header.UserPriority.String(),
ReqSentTime: reqSentTime.String(),
})

rawResp, pErr = kv.SendWrappedWithAdmission(
ctx, flowCtx.Cfg.DB.NonTransactionalSender(), header, admissionHeader, req)
respReceivedTime = timeutil.Now()
if pErr != nil {
return pErr.GoError()
}
Expand All @@ -419,9 +421,7 @@ func runBackupProcessor(
todo <- span
// TODO(dt): send a progress update to update job progress to note
// the intents being hit.
backupProcessorSpan.RecordStructured(&backuppb.BackupExportTraceResponseEvent{
RetryableError: tracing.RedactAndTruncateError(intentErr),
})
log.VEventf(ctx, 1, "retrying ExportRequest for span %s; encountered WriteIntentError: %s", span.span, intentErr.Error())
continue
}
// TimeoutError improves the opaque `context deadline exceeded` error
Expand Down Expand Up @@ -480,19 +480,12 @@ func runBackupProcessor(
completedSpans = 1
}

duration := respReceivedTime.Sub(reqSentTime)
exportResponseTraceEvent := &backuppb.BackupExportTraceResponseEvent{
Duration: duration.String(),
FileSummaries: make([]roachpb.RowCount, 0),
}

if len(resp.Files) > 1 {
log.Warning(ctx, "unexpected multi-file response using header.TargetBytes = 1")
}

for i, file := range resp.Files {
entryCounts := countRows(file.Exported, spec.PKIDs)
exportResponseTraceEvent.FileSummaries = append(exportResponseTraceEvent.FileSummaries, entryCounts)

ret := exportedSpan{
// BackupManifest_File just happens to contain the exact fields
Expand Down Expand Up @@ -521,8 +514,9 @@ func runBackupProcessor(
return ctx.Err()
}
}
exportResponseTraceEvent.NumFiles = int32(len(resp.Files))
backupProcessorSpan.RecordStructured(exportResponseTraceEvent)

// Emit the stats for the processed ExportRequest.
recordExportStats(backupProcessorSpan, resp, timeutil.Since(requestSentAt))

default:
// No work left to do, so we can exit. Note that another worker could
Expand Down Expand Up @@ -576,6 +570,22 @@ func runBackupProcessor(
return grp.Wait()
}

// recordExportStats summarizes a completed ExportRequest and records the
// summary on sp as a backuppb.ExportStats structured event.
//
// The summary carries the number of files in resp, the combined length in
// bytes of their SST payloads, and exportDuration. The call is a no-op when sp
// is nil.
func recordExportStats(
	sp *tracing.Span, resp *roachpb.ExportResponse, exportDuration time.Duration,
) {
	if sp == nil {
		return
	}
	// Total up the SST bytes across every file returned for this request.
	var sstBytes int64
	for i := range resp.Files {
		sstBytes += int64(len(resp.Files[i].SST))
	}
	sp.RecordStructured(&backuppb.ExportStats{
		NumFiles: int64(len(resp.Files)),
		DataSize: sstBytes,
		Duration: exportDuration,
	})
}

// init installs newBackupDataProcessor as the rowexec.NewBackupDataProcessor
// hook when this package is loaded.
// NOTE(review): presumably this indirection lets rowexec construct backup
// processors without importing backupccl directly — confirm.
func init() {
rowexec.NewBackupDataProcessor = newBackupDataProcessor
}
4 changes: 2 additions & 2 deletions pkg/ccl/backupccl/backup_processor_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func distBackupPlanSpecs(
startTime, endTime hlc.Timestamp,
) (map[base.SQLInstanceID]*execinfrapb.BackupDataSpec, error) {
var span *tracing.Span
ctx, span = tracing.ChildSpan(ctx, "backup-plan-specs")
ctx, span = tracing.ChildSpan(ctx, "backupccl.distBackupPlanSpecs")
_ = ctx // ctx is currently unused, but this new ctx should be used below in the future.
defer span.Finish()
user := execCtx.User()
Expand Down Expand Up @@ -155,7 +155,7 @@ func distBackup(
progCh chan *execinfrapb.RemoteProducerMetadata_BulkProcessorProgress,
backupSpecs map[base.SQLInstanceID]*execinfrapb.BackupDataSpec,
) error {
ctx, span := tracing.ChildSpan(ctx, "backup-distsql")
ctx, span := tracing.ChildSpan(ctx, "backupccl.distBackup")
defer span.Finish()
evalCtx := execCtx.ExtendedEvalContext()
var noTxn *kv.Txn
Expand Down
2 changes: 2 additions & 0 deletions pkg/ccl/backupccl/backuppb/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,10 @@ go_library(
"//pkg/sql/parser",
"//pkg/sql/protoreflect",
"//pkg/sql/sem/tree",
"//pkg/util/bulk",
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
"@com_github_gogo_protobuf//jsonpb",
"@io_opentelemetry_go_otel//attribute",
],
)
56 changes: 56 additions & 0 deletions pkg/ccl/backupccl/backuppb/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,18 @@ package backuppb

import (
"encoding/json"
"fmt"

"github.com/cockroachdb/cockroach/pkg/cloud"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/protoreflect"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util/bulk"
_ "github.com/cockroachdb/cockroach/pkg/util/uuid" // required for backup.proto
"github.com/cockroachdb/errors"
"github.com/gogo/protobuf/jsonpb"
"go.opentelemetry.io/otel/attribute"
)

// IsIncremental returns if the BackupManifest corresponds to an incremental
Expand Down Expand Up @@ -123,6 +126,59 @@ func (m ScheduledBackupExecutionArgs) MarshalJSONPB(marshaller *jsonpb.Marshaler
return json.Marshal(m)
}

var _ bulk.AggregatorEvent = &ExportStats{}

const (
tagNumFiles = "num_files"
tagDataSize = "data_size"
tagThroughput = "throughput"
)

// Render implements the LazyTag interface.
//
// It produces up to three tags, in this order:
//   - num_files: the file count, only when it is positive;
//   - data_size: the data size in MB with two decimals, only when it is
//     positive;
//   - throughput: MB per second of Duration, only when both the data size and
//     the duration are positive.
//
// When nothing qualifies an empty, non-nil slice is returned.
func (e *ExportStats) Render() []attribute.KeyValue {
	const bytesPerMB = 1 << 20
	rendered := make([]attribute.KeyValue, 0)

	if e.NumFiles > 0 {
		rendered = append(rendered, attribute.KeyValue{
			Key:   tagNumFiles,
			Value: attribute.Int64Value(e.NumFiles),
		})
	}

	// Both the size and the throughput tags require a positive data size.
	if e.DataSize <= 0 {
		return rendered
	}
	sizeInMB := float64(e.DataSize) / bytesPerMB
	rendered = append(rendered, attribute.KeyValue{
		Key:   tagDataSize,
		Value: attribute.StringValue(fmt.Sprintf("%.2f MB", sizeInMB)),
	})

	// Throughput is undefined without a positive duration.
	if e.Duration <= 0 {
		return rendered
	}
	mbPerSecond := sizeInMB / e.Duration.Seconds()
	rendered = append(rendered, attribute.KeyValue{
		Key:   tagThroughput,
		Value: attribute.StringValue(fmt.Sprintf("%.2f MB/s", mbPerSecond)),
	})
	return rendered
}

// Combine implements the AggregatorEvent interface.
//
// It folds other into the receiver by adding other's file count, data size and
// duration to the receiver's running totals. It panics if other is not an
// *ExportStats.
func (e *ExportStats) Combine(other bulk.AggregatorEvent) {
	switch incoming := other.(type) {
	case *ExportStats:
		e.Duration += incoming.Duration
		e.DataSize += incoming.DataSize
		e.NumFiles += incoming.NumFiles
	default:
		panic("`other` is not of type ExportStats")
	}
}

// Tag implements the AggregatorEvent interface. Every ExportStats event
// reports the same fixed tag name.
func (e *ExportStats) Tag() string {
	const exportStatsTag = "ExportStats"
	return exportStatsTag
}

// init registers the shorthand names "backup" and "backup_manifest" for the
// BackupManifest message with the protoreflect package when this package is
// loaded.
func init() {
protoreflect.RegisterShorthands((*BackupManifest)(nil), "backup", "backup_manifest")
}
27 changes: 10 additions & 17 deletions pkg/ccl/backupccl/backuppb/backup.proto
Original file line number Diff line number Diff line change
Expand Up @@ -193,22 +193,15 @@ message BackupProgressTraceEvent {
util.hlc.Timestamp revision_start_time = 3 [(gogoproto.nullable) = false];
}

// BackupExportTraceRequestEvent is the trace event recorded when an
// ExportRequest has been sent.
message BackupExportTraceRequestEvent {
string span = 1;
int32 attempt = 2;
string priority = 3;
string req_sent_time = 4;
}

// BackupExportTraceResponseEvent is the trace event recorded when we receive a
// response from the ExportRequest.
message BackupExportTraceResponseEvent {
string duration = 1;
int32 num_files = 2;
repeated roachpb.RowCount file_summaries = 3 [(gogoproto.nullable) = false];
reserved 4 ;
string retryable_error = 5;
// ExportStats is a message containing information about each
// Export{Request,Response}. Its Go type implements bulk.AggregatorEvent (see
// backup.go), where Combine sums the fields of multiple events; an aggregated
// ExportStats therefore holds totals rather than per-request values.
message ExportStats {
// NumFiles is the number of SST files produced by the ExportRequest.
int64 num_files = 1;
// DataSize is the byte size of all the SST files produced by the
// ExportRequest.
int64 data_size = 2;
// Duration is the time taken for the ExportRequest to be served. It is
// encoded as an int64 and exposed as a time.Duration in the generated Go
// code via the casttype option.
int64 duration = 3 [(gogoproto.casttype) = "time.Duration"];
}

4 changes: 3 additions & 1 deletion pkg/jobs/adopt.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package jobs

import (
"context"
"fmt"
"strconv"
"sync"

Expand Down Expand Up @@ -397,7 +398,8 @@ func (r *Registry) runJob(
// TODO(ajwerner): Move this writing up the trace ID down into
// stepThroughStateMachine where we're already often (and soon with
// exponential backoff, always) updating the job in that call.
ctx, span := r.ac.Tracer.StartSpanCtx(ctx, typ.String(), spanOptions...)
ctx, span := r.ac.Tracer.StartSpanCtx(ctx,
fmt.Sprintf("%s-%d", typ.String(), job.ID()), spanOptions...)
span.SetTag("job-id", attribute.Int64Value(int64(job.ID())))
defer span.Finish()
if span.TraceID() != 0 {
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/batcheval/cmd_export.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func evalExport(
h := cArgs.Header
reply := resp.(*roachpb.ExportResponse)

ctx, evalExportSpan := tracing.ChildSpan(ctx, fmt.Sprintf("Export [%s,%s)", args.Key, args.EndKey))
ctx, evalExportSpan := tracing.ChildSpan(ctx, fmt.Sprintf("evalExport"))
defer evalExportSpan.Finish()

var evalExportTrace types.StringValue
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -4705,7 +4705,7 @@ func MVCCExportToSST(
ctx context.Context, cs *cluster.Settings, reader Reader, opts MVCCExportOptions, dest io.Writer,
) (roachpb.BulkOpSummary, MVCCKey, error) {
var span *tracing.Span
ctx, span = tracing.ChildSpan(ctx, "MVCCExportToSST")
ctx, span = tracing.ChildSpan(ctx, "storage.MVCCExportToSST")
defer span.Finish()
sstWriter := MakeBackupSSTWriter(ctx, cs, dest)
defer sstWriter.Close()
Expand Down
25 changes: 25 additions & 0 deletions pkg/util/bulk/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "bulk",
srcs = ["aggregator.go"],
importpath = "github.com/cockroachdb/cockroach/pkg/util/bulk",
visibility = ["//visibility:public"],
deps = [
"//pkg/util/syncutil",
"//pkg/util/tracing",
],
)

go_test(
name = "bulk_test",
srcs = ["aggregator_test.go"],
embed = [":bulk"],
deps = [
"//pkg/ccl/backupccl/backuppb",
"//pkg/util/log",
"//pkg/util/tracing",
"//pkg/util/tracing/tracingpb",
"@com_github_stretchr_testify//require",
],
)
Loading

0 comments on commit 34e7ce7

Please sign in to comment.