
Merge #77397
77397: *: add jobID to ctx tags in bulk distsql processors r=dt a=dt

The job ID is already in the tags of the job's initial context, which plans and runs these flows, but those tags are lost on the way to the *remote* processors, so only the one processor that happens to run on the gateway logs with its job ID. These changes instead put the job ID explicitly in the processor specs, so that each processor can add it to its local ctx tags during initialization.

Release note: none.

Release justification: logging only change.
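
To make the pattern concrete, here is a minimal, self-contained Go sketch. fakeSpec and startProcessor are hypothetical stand-ins, not names from this commit; the real change adds a JobID field to each bulk processor spec proto and tags the context at the top of each processor's Start or Run:

package main

import (
	"context"
	"fmt"

	"github.com/cockroachdb/logtags"
)

// fakeSpec stands in for a processor spec such as BackupDataSpec, which
// now carries the job ID across the wire to remote nodes.
type fakeSpec struct {
	JobID int64
}

// startProcessor mirrors the pattern added to each processor: tag the
// local context with the job ID before any other setup, so every log
// line the processor emits, on any node, carries the job ID.
func startProcessor(ctx context.Context, spec fakeSpec) context.Context {
	return logtags.AddTag(ctx, "job", spec.JobID)
}

func main() {
	ctx := startProcessor(context.Background(), fakeSpec{JobID: 754})
	// Prints "job=754": the tag a logtags-aware logger (such as
	// CockroachDB's log package) prepends to lines on this context.
	fmt.Println(logtags.FromContext(ctx).String())
}

The point of the change is that this tagging now happens inside each processor from its own spec, instead of relying on tags in the planning context, which do not survive the trip to remote nodes.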

Co-authored-by: David Taylor <[email protected]>
craig[bot] and dt committed Mar 6, 2022
2 parents 608369b + 4b334c0 commit 3b7f761
Showing 13 changed files with 40 additions and 9 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/backupccl/backup_job.go
@@ -165,6 +165,7 @@ func backup(
planCtx,
execCtx,
dsp,
int64(job.ID()),
spans,
introducedSpans,
pkIDs,
2 changes: 2 additions & 0 deletions pkg/ccl/backupccl/backup_processor.go
@@ -44,6 +44,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
gogotypes "github.com/gogo/protobuf/types"
"github.com/kr/pretty"
)
@@ -177,6 +178,7 @@ func newBackupDataProcessor(

// Start is part of the RowSource interface.
func (bp *backupDataProcessor) Start(ctx context.Context) {
ctx = logtags.AddTag(ctx, "job", bp.spec.JobID)
ctx = bp.StartInternal(ctx, backupProcessorName)
ctx, cancel := context.WithCancel(ctx)
bp.cancelAndWaitForWorker = func() {
5 changes: 3 additions & 2 deletions pkg/ccl/backupccl/backup_processor_planning.go
@@ -24,14 +24,14 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
)

func distBackupPlanSpecs(
ctx context.Context,
planCtx *sql.PlanningCtx,
execCtx sql.JobExecContext,
dsp *sql.DistSQLPlanner,
jobID int64,
spans roachpb.Spans,
introducedSpans roachpb.Spans,
pkIDs map[uint64]bool,
@@ -92,6 +92,7 @@ func distBackupPlanSpecs(
sqlInstanceIDToSpec := make(map[base.SQLInstanceID]*execinfrapb.BackupDataSpec)
for _, partition := range spanPartitions {
spec := &execinfrapb.BackupDataSpec{
JobID: jobID,
Spans: partition.Spans,
DefaultURI: defaultURI,
URIsByLocalityKV: urisByLocalityKV,
@@ -113,6 +114,7 @@
// which is not the leaseholder for any of the spans, but is for an
// introduced span.
spec := &execinfrapb.BackupDataSpec{
JobID: jobID,
IntroducedSpans: partition.Spans,
DefaultURI: defaultURI,
URIsByLocalityKV: urisByLocalityKV,
@@ -153,7 +155,6 @@ func distBackup(
) error {
ctx, span := tracing.ChildSpan(ctx, "backup-distsql")
defer span.Finish()
ctx = logtags.AddTag(ctx, "backup-distsql", nil)
evalCtx := execCtx.ExtendedEvalContext()
var noTxn *kv.Txn

2 changes: 2 additions & 0 deletions pkg/ccl/backupccl/restore_data_processor.go
@@ -35,6 +35,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
gogotypes "github.com/gogo/protobuf/types"
)

@@ -161,6 +162,7 @@ func newRestoreDataProcessor(

// Start is part of the RowSource interface.
func (rd *restoreDataProcessor) Start(ctx context.Context) {
ctx = logtags.AddTag(ctx, "job", rd.spec.JobID)
ctx = rd.StartInternal(ctx, restoreDataProcName)
rd.input.Start(ctx)

1 change: 1 addition & 0 deletions pkg/ccl/backupccl/restore_job.go
@@ -357,6 +357,7 @@ func restore(
return distRestore(
ctx,
execCtx,
int64(job.ID()),
importSpanChunks,
dataToRestore.getPKIDs(),
encryption,
4 changes: 2 additions & 2 deletions pkg/ccl/backupccl/restore_processor_planning.go
@@ -26,7 +26,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
)

// distRestore plans a 2 stage distSQL flow for a distributed restore. It
@@ -41,6 +40,7 @@ import (
func distRestore(
ctx context.Context,
execCtx sql.JobExecContext,
jobID int64,
chunks [][]execinfrapb.RestoreSpanEntry,
pkIDs map[uint64]bool,
encryption *jobspb.BackupEncryptionOptions,
@@ -49,7 +49,6 @@
restoreTime hlc.Timestamp,
progCh chan *execinfrapb.RemoteProducerMetadata_BulkProcessorProgress,
) error {
ctx = logtags.AddTag(ctx, "restore-distsql", nil)
defer close(progCh)
var noTxn *kv.Txn

@@ -89,6 +88,7 @@
}

restoreDataSpec := execinfrapb.RestoreDataSpec{
JobID: jobID,
RestoreTime: restoreTime,
Encryption: fileEncryption,
TableRekeys: tableRekeys,
2 changes: 2 additions & 0 deletions pkg/ccl/backupccl/split_and_scatter_processor.go
@@ -29,6 +29,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
)

type splitAndScatterer interface {
@@ -244,6 +245,7 @@ func newSplitAndScatterProcessor(

// Start is part of the RowSource interface.
func (ssp *splitAndScatterProcessor) Start(ctx context.Context) {
ctx = logtags.AddTag(ctx, "job", ssp.spec.JobID)
ctx = ssp.StartInternal(ctx, splitAndScatterProcessorName)
// Note that the loop over doneScatterCh in Next should prevent the goroutine
// below from leaking when there are no errors. However, if that loop needs to
7 changes: 7 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_processors.go
@@ -47,6 +47,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
"github.com/cockroachdb/redact"
)

@@ -203,6 +204,9 @@ func (ca *changeAggregator) MustBeStreaming() bool {

// Start is part of the RowSource interface.
func (ca *changeAggregator) Start(ctx context.Context) {
if ca.spec.JobID != 0 {
ctx = logtags.AddTag(ctx, "job", ca.spec.JobID)
}
ctx = ca.StartInternal(ctx, changeAggregatorProcName)

// Derive a separate context so that we can shutdown the poller. Note that
@@ -1138,6 +1142,9 @@ func (cf *changeFrontier) MustBeStreaming() bool {

// Start is part of the RowSource interface.
func (cf *changeFrontier) Start(ctx context.Context) {
if cf.spec.JobID != 0 {
ctx = logtags.AddTag(ctx, "job", cf.spec.JobID)
}
// StartInternal called at the beginning of the function because there are
// early returns if errors are detected.
ctx = cf.StartInternal(ctx, changeFrontierProcName)
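A detail worth noting in the changefeed hunks above: the tag is added only when spec.JobID is non-zero. A plausible reason, inferred from the guard rather than stated in the commit, is that core (sinkless) changefeeds run without a system job, leaving JobID at its zero value, so tagging unconditionally would stamp every log line with a meaningless job=0. A minimal sketch of that guard, reusing the hypothetical fakeSpec and imports from the earlier example:

// startChangefeedProcessor mirrors the guarded variant: a zero JobID is
// treated as "no job", so no tag is added.
func startChangefeedProcessor(ctx context.Context, spec fakeSpec) context.Context {
	if spec.JobID != 0 {
		ctx = logtags.AddTag(ctx, "job", spec.JobID)
	}
	return ctx
}
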
2 changes: 2 additions & 0 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go
@@ -36,6 +36,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
)

var minimumFlushInterval = settings.RegisterPublicDurationSettingWithExplicitUnit(
@@ -191,6 +192,7 @@ func newStreamIngestionDataProcessor(

// Start is part of the RowSource interface.
func (sip *streamIngestionProcessor) Start(ctx context.Context) {
ctx = logtags.AddTag(ctx, "job", sip.spec.JobID)
log.Infof(ctx, "starting ingest proc")
ctx = sip.StartInternal(ctx, streamIngestionProcessorName)

16 changes: 14 additions & 2 deletions pkg/sql/execinfrapb/processors_bulk_io.proto
@@ -38,6 +38,8 @@ import "roachpb/api.proto";
// descriptor in the database, and doesn't emit any rows nor support
// any post-processing.
message BackfillerSpec {
optional int64 job_id = 13 [(gogoproto.nullable) = false, (gogoproto.customname) = "JobID"];

enum Type {
Invalid = 0;
Column = 1;
@@ -83,6 +85,8 @@ message BackfillerSpec {
// Note that older nodes do not respect this flag so callers should
// check MVCCAddSSTable before setting this option.
optional bool write_at_batch_timestamp = 12 [(gogoproto.nullable) = false];

// NEXTID: 14.
}

// JobProgress identifies the job to report progress on. This reporting
@@ -98,6 +102,7 @@ message JobProgress {
}

message ReadImportDataSpec {
optional int64 job_id = 19 [(gogoproto.nullable) = false, (gogoproto.customname) = "JobID"];
reserved 1;
optional roachpb.IOFileFormat format = 8 [(gogoproto.nullable) = false];
// sample_size is the rate at which to output rows, based on an input row's size.
@@ -165,7 +170,7 @@ message ReadImportDataSpec {

optional int32 initial_splits = 18 [(gogoproto.nullable) = false];

// NEXTID: 19
// NEXTID: 20.
}

message StreamIngestionDataSpec {
@@ -213,6 +218,7 @@ message StreamIngestionFrontierSpec {
}

message BackupDataSpec {
optional int64 job_id = 11 [(gogoproto.nullable) = false, (gogoproto.customname) = "JobID"];
repeated roachpb.Span spans = 1 [(gogoproto.nullable) = false];
repeated roachpb.Span introduced_spans = 2 [(gogoproto.nullable) = false];
optional string default_uri = 3 [(gogoproto.nullable) = false, (gogoproto.customname) = "DefaultURI"];
@@ -229,6 +235,8 @@ message BackupDataSpec {
// User who initiated the backup. This is used to check access privileges
// when using FileTable ExternalStorage.
optional string user_proto = 10 [(gogoproto.nullable) = false, (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/security.SQLUsernameProto"];

// NEXTID: 12.
}

message RestoreFileSpec {
@@ -263,24 +271,28 @@ message RestoreSpanEntry {
}

message RestoreDataSpec {
optional int64 job_id = 6 [(gogoproto.nullable) = false, (gogoproto.customname) = "JobID"];
optional util.hlc.Timestamp restore_time = 1 [(gogoproto.nullable) = false];
optional roachpb.FileEncryptionOptions encryption = 2;
repeated TableRekey table_rekeys = 3 [(gogoproto.nullable) = false];
repeated TenantRekey tenant_rekeys = 5[(gogoproto.nullable) = false];
// PKIDs is used to convert result from an ExportRequest into row count
// information passed back to track progress in the backup job.
map<uint64, bool> pk_ids = 4 [(gogoproto.customname) = "PKIDs"];
// NEXT ID: 6.
// NEXT ID: 7.
}

message SplitAndScatterSpec {
optional int64 job_id = 4 [(gogoproto.nullable) = false, (gogoproto.customname) = "JobID"];
message RestoreEntryChunk {
repeated RestoreSpanEntry entries = 1 [(gogoproto.nullable) = false];
}

repeated RestoreEntryChunk chunks = 1 [(gogoproto.nullable) = false];
repeated TableRekey table_rekeys = 2 [(gogoproto.nullable) = false];
repeated TenantRekey tenant_rekeys = 3 [(gogoproto.nullable) = false];

// NEXTID: 5.
}

// ExporterSpec is the specification for a processor that consumes rows and
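As a reading aid for the proto hunks above: (gogoproto.nullable) = false makes the generated Go field a plain value rather than a pointer, and (gogoproto.customname) = "JobID" names the field JobID instead of the default JobId. A hand-written approximation of the generated struct, assuming standard gogoproto output (the struct tag details are illustrative, not copied from the repository):

// BackupDataSpec: approximate generated shape, showing the new field only.
type BackupDataSpec struct {
	// nullable=false yields int64 rather than *int64; customname yields
	// JobID rather than JobId.
	JobID int64 `protobuf:"varint,11,opt,name=job_id,json=jobId" json:"job_id"`
	// ... remaining fields elided ...
}
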
2 changes: 2 additions & 0 deletions pkg/sql/importer/import_processor.go
@@ -42,6 +42,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
)

var csvOutputTypes = []*types.T{
@@ -190,6 +191,7 @@ func newReadImportDataProcessor(

// Start is part of the RowSource interface.
func (idp *readImportDataProcessor) Start(ctx context.Context) {
ctx = logtags.AddTag(ctx, "job", idp.spec.JobID)
ctx = idp.StartInternal(ctx, readImportDataProcessorName)
log.Infof(ctx, "starting read import")
// We don't have to worry about this go routine leaking because next we loop over progCh
4 changes: 1 addition & 3 deletions pkg/sql/importer/import_processor_planning.go
@@ -34,7 +34,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/logtags"
)

// distImport is used by IMPORT to run a DistSQL flow to ingest data by starting
@@ -52,8 +51,6 @@ func distImport(
walltime int64,
alwaysFlushProgress bool,
) (roachpb.BulkOpSummary, error) {
ctx = logtags.AddTag(ctx, "import-distsql-ingest", nil)

dsp := execCtx.DistSQLPlanner()
evalCtx := execCtx.ExtendedEvalContext()

@@ -254,6 +251,7 @@ func makeImportReaderSpecs(
// creates the spec. Future files just add themselves to the Uris.
if i < len(sqlInstanceIDs) {
spec := &execinfrapb.ReadImportDataSpec{
JobID: int64(job.ID()),
Tables: tables,
Types: typeDescs,
Format: format,
1 change: 1 addition & 0 deletions pkg/sql/rowexec/indexbackfiller.go
@@ -347,6 +347,7 @@ func (ib *indexBackfiller) runBackfill(

func (ib *indexBackfiller) Run(ctx context.Context) {
opName := "indexBackfillerProcessor"
ctx = logtags.AddTag(ctx, "job", ib.spec.JobID)
ctx = logtags.AddTag(ctx, opName, int(ib.spec.Table.ID))
ctx, span := execinfra.ProcessorSpan(ctx, opName)
defer span.Finish()
