distsql,jobs: propagate 'job' log tag and pprof label to remote nodes
Previously, the recently added job log tag and pprof label were only
used locally and were not sent to remote nodes. This meant that, for
example, pprof did not show the job IDs of jobs that consumed CPU but
were started on another node.

This PR propagates the job tag to the remote nodes, where it is added
as a log tag and also set as a pprof label. With this change we no
longer need to set the tags that were added in cockroachdb#77397, but we
cannot remove those yet because the old job tags still need to be sent
and applied during upgrades. Once 23.1 is out, we can remove the old
implementation in 23.2.
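Roughly, the gateway reads the 'job' tag off its context and copies it
into the SetupFlowRequest, and the remote node re-attaches it as a log
tag and mirrors the context tags into pprof labels. Below is a minimal,
hedged sketch of that idea using only the public logtags package and
runtime/pprof; the actual change uses the internal
pprofutil.SetProfilerLabelsFromCtxTags helper for the label step, and
the package and function names in the sketch are made up.

    package jobtagsketch // hypothetical package, for illustration only

    import (
        "context"
        "runtime/pprof"

        "github.com/cockroachdb/logtags"
    )

    // jobTagFromCtx pulls the "job" log tag off the gateway's context so
    // it can be copied into the SetupFlowRequest (mirrors getJobTag below).
    func jobTagFromCtx(ctx context.Context) string {
        if tags := logtags.FromContext(ctx); tags != nil {
            for _, tag := range tags.Get() {
                if tag.Key() == "job" {
                    return tag.ValueStr()
                }
            }
        }
        return ""
    }

    // runRemoteFlow re-attaches the tag on the remote node and applies it
    // as a pprof label around the flow's execution.
    func runRemoteFlow(ctx context.Context, jobTag string, flow func(context.Context)) {
        if jobTag == "" {
            flow(ctx) // not a job-owned flow; nothing to propagate
            return
        }
        ctx = logtags.AddTag(ctx, "job", jobTag)
        pprof.Do(ctx, pprof.Labels("job", jobTag), flow)
    }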

Note that this PR changes the job log tag from `job=<job id>` to
`job=<job type> id=<job id>`, so that it matches the pprof label.
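As an illustration of the new format, a tag for a backup job with ID
123 (both values made up) would be built and read back roughly like
this; the test added below checks the same thing via pprof.Label in the
SetupFlowCb knob:

    // Assumes: import "context", "fmt", "runtime/pprof", and
    // "github.com/cockroachdb/logtags" (as in the sketch above).
    jobType, jobID := "BACKUP", 123                  // made-up example values
    value := fmt.Sprintf("%s id=%d", jobType, jobID) // "BACKUP id=123"
    ctx := logtags.AddTag(context.Background(), "job", value)
    pprof.Do(ctx, pprof.Labels("job", value), func(ctx context.Context) {
        if label, ok := pprof.Label(ctx, "job"); ok {
            fmt.Println(label) // prints "BACKUP id=123"
        }
    })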

Epic: None

Release note: None
lidorcarmel committed Nov 28, 2022
1 parent 574c529 commit 12f79e7
Showing 9 changed files with 122 additions and 8 deletions.
57 changes: 57 additions & 0 deletions pkg/ccl/backupccl/backup_test.go
@@ -25,6 +25,7 @@ import (
"path/filepath"
"reflect"
"regexp"
"runtime/pprof"
"strconv"
"strings"
"sync/atomic"
@@ -76,6 +77,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/randgen"
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
@@ -207,6 +209,61 @@ func TestBackupRestoreMultiNodeRemote(t *testing.T) {
backupAndRestore(ctx, t, tc, []string{remoteFoo}, []string{localFoo}, numAccounts)
}

// TestBackupRestoreJobTagAndLabel runs a backup and restore and verifies that
// the flows are executed remotely with a job log tag and a job pprof label.
func TestBackupRestoreJobTagAndLabel(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

const numAccounts = 1000
ctx := context.Background()
getJobTag := func(ctx context.Context) string {
tags := logtags.FromContext(ctx)
if tags != nil {
for _, tag := range tags.Get() {
if tag.Key() == "job" {
return tag.ValueStr()
}
}
}
return ""
}
found := false
var mu syncutil.Mutex
tc, _, _, cleanupFn := backupRestoreTestSetupWithParams(t, multiNode, numAccounts, InitManualReplication,
base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
DisableDefaultTestTenant: true,
Knobs: base.TestingKnobs{
DistSQL: &execinfra.TestingKnobs{
SetupFlowCb: func(ctx context.Context, _ base.SQLInstanceID, _ *execinfrapb.SetupFlowRequest) error {
mu.Lock()
defer mu.Unlock()
tag := getJobTag(ctx)
label, ok := pprof.Label(ctx, "job")
if tag == "" && !ok {
// Skip, it's not a job.
return nil
}
if tag == "" || !ok || tag != label {
log.Fatalf(ctx, "the job tag should exist and match the pprof label: tag=%s label=%s ctx=%s",
tag, label, ctx)
}
found = true
return nil
},
},
},
},
},
)
defer cleanupFn()

backupAndRestore(ctx, t, tc, []string{localFoo}, []string{localFoo}, numAccounts)

require.True(t, found)
}

func TestBackupRestorePartitioned(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
1 change: 1 addition & 0 deletions pkg/jobs/BUILD.bazel
@@ -57,6 +57,7 @@ go_library(
"//pkg/util/json",
"//pkg/util/log",
"//pkg/util/metric",
"//pkg/util/pprofutil",
"//pkg/util/protoutil",
"//pkg/util/retry",
"//pkg/util/stop",
14 changes: 8 additions & 6 deletions pkg/jobs/registry.go
@@ -14,7 +14,6 @@ import (
"context"
"fmt"
"os"
"runtime/pprof"
"strconv"
"strings"
"time"
@@ -37,6 +36,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/pprofutil"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
@@ -1231,8 +1231,12 @@ func (r *Registry) stepThroughStateMachine(
return errors.NewAssertionErrorWithWrappedErrf(jobErr,
"job %d: resuming with non-nil error", job.ID())
}
resumeCtx := logtags.AddTag(ctx, "job", job.ID())
labels := pprof.Labels("job", fmt.Sprintf("%s id=%d", jobType, job.ID()))
resumeCtx := logtags.AddTag(ctx, "job",
fmt.Sprintf("%s id=%d", jobType, job.ID()))
// Adding all tags as pprof labels (including the one we just added for job
// type and id).
resumeCtx, undo := pprofutil.SetProfilerLabelsFromCtxTags(resumeCtx)
defer undo()

if err := job.started(ctx, nil /* txn */); err != nil {
return err
@@ -1246,9 +1250,7 @@ func (
jm.CurrentlyRunning.Dec(1)
r.metrics.RunningNonIdleJobs.Dec(1)
}()
pprof.Do(resumeCtx, labels, func(ctx context.Context) {
err = resumer.Resume(ctx, execCtx)
})
err = resumer.Resume(resumeCtx, execCtx)
}()

r.MarkIdle(job, false)
1 change: 1 addition & 0 deletions pkg/sql/distsql/BUILD.bazel
@@ -29,6 +29,7 @@ go_library(
"//pkg/util/envutil",
"//pkg/util/log",
"//pkg/util/mon",
"//pkg/util/pprofutil",
"//pkg/util/timeutil",
"//pkg/util/tracing",
"//pkg/util/tracing/grpcinterceptor",
18 changes: 17 additions & 1 deletion pkg/sql/distsql/server.go
@@ -37,6 +37,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/pprofutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/tracing/grpcinterceptor"
@@ -420,6 +421,9 @@ func (ds *ServerImpl) setupFlow(

if !f.IsLocal() {
flowCtx.AmbientContext.AddLogTag("f", f.GetFlowCtx().ID.Short())
if req.JobTag != "" {
flowCtx.AmbientContext.AddLogTag("job", req.JobTag)
}
ctx = flowCtx.AmbientContext.AnnotateCtx(ctx)
telemetry.Inc(sqltelemetry.DistSQLExecCounter)
}
@@ -652,6 +656,15 @@ func (ds *ServerImpl) SetupFlow(
if err != nil {
return err
}
var undo func()
ctx, undo = pprofutil.SetProfilerLabelsFromCtxTags(ctx)
defer undo()
if cb := ds.TestingKnobs.SetupFlowCb; cb != nil {
if err = cb(ctx, ds.ServerConfig.NodeID.SQLInstanceID(), req); err != nil {
f.Cleanup(ctx)
return err
}
}
return ds.flowScheduler.ScheduleFlow(ctx, f)
}(); err != nil {
// We return flow deployment errors in the response so that they are
@@ -697,7 +710,10 @@ func (ds *ServerImpl) flowStreamInt(
}
defer cleanup()
log.VEventf(ctx, 1, "connected inbound stream %s/%d", flowID.Short(), streamID)
return streamStrategy.Run(f.AmbientContext.AnnotateCtx(ctx), stream, msg, f)
ctx = f.AmbientContext.AnnotateCtx(ctx)
ctx, undo := pprofutil.SetProfilerLabelsFromCtxTags(ctx)
defer undo()
return streamStrategy.Run(ctx, stream, msg, f)
}

// FlowStream is part of the execinfrapb.DistSQLServer interface.
15 changes: 15 additions & 0 deletions pkg/sql/distsql_running.go
@@ -56,6 +56,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
pbtypes "github.com/gogo/protobuf/types"
)

@@ -383,13 +384,27 @@ func (dsp *DistSQLPlanner) setupFlows(
if len(statementSQL) > setupFlowRequestStmtMaxLength {
statementSQL = statementSQL[:setupFlowRequestStmtMaxLength]
}
getJobTag := func(ctx context.Context) string {
tags := logtags.FromContext(ctx)
if tags != nil {
for _, tag := range tags.Get() {
if tag.Key() == "job" {
return tag.ValueStr()
}
}
}
return ""
}
setupReq := execinfrapb.SetupFlowRequest{
// TODO(yuzefovich): avoid populating some fields of the SetupFlowRequest
// for local plans.
LeafTxnInputState: leafInputState,
Version: execinfra.Version,
EvalContext: execinfrapb.MakeEvalContext(&evalCtx.Context),
TraceKV: evalCtx.Tracing.KVTracingEnabled(),
CollectStats: collectStats,
StatementSQL: statementSQL,
JobTag: getJobTag(ctx),
}

if vectorizeMode := evalCtx.SessionData().VectorizeMode; vectorizeMode != sessiondatapb.VectorizeOff {
7 changes: 7 additions & 0 deletions pkg/sql/execinfra/server_config.go
@@ -13,6 +13,7 @@
package execinfra

import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
@@ -32,6 +33,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/rowinfra"
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
@@ -288,6 +290,11 @@ type TestingKnobs struct {
// IndexBackfillMergerTestingKnobs are the index backfill merger specific
// testing knobs.
IndexBackfillMergerTestingKnobs base.ModuleTestingKnobs

// SetupFlowCb, when non-nil, is called by the execinfrapb.DistSQLServer
// when responding to SetupFlow RPCs, after the flow is set up but before it
// is started.
SetupFlowCb func(context.Context, base.SQLInstanceID, *execinfrapb.SetupFlowRequest) error
}

// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
1 change: 1 addition & 0 deletions pkg/sql/execinfrapb/api.proto
@@ -30,6 +30,7 @@ message SetupFlowRequest {
reserved 1, 2;

optional util.tracing.tracingpb.TraceInfo trace_info = 11;
optional string job_tag = 13 [(gogoproto.nullable) = false];

// LeafTxnInputState is the input parameter for the *client.Txn needed for
// executing the flow.
16 changes: 15 additions & 1 deletion pkg/sql/execinfrapb/processors_bulk_io.proto
@@ -37,8 +37,10 @@ import "cloud/cloudpb/external_storage.proto";
// descriptor in the database, and doesn't emit any rows nor support
// any post-processing.
message BackfillerSpec {
// TODO(lidor): job_id is not needed when interoperability with 22.2 is
// dropped, the new way to send the job tag is using 'job_tag' in the
// SetupFlowRequest message.
optional int64 job_id = 13 [(gogoproto.nullable) = false, (gogoproto.customname) = "JobID"];

enum Type {
Invalid = 0;
Column = 1;
@@ -105,6 +107,9 @@ message JobProgress {
}

message ReadImportDataSpec {
// TODO(lidor): job_id is not needed when interoperability with 22.2 is
// dropped, the new way to send the job tag is using 'job_tag' in the
// SetupFlowRequest message.
optional int64 job_id = 19 [(gogoproto.nullable) = false, (gogoproto.customname) = "JobID"];
reserved 1;
optional roachpb.IOFileFormat format = 8 [(gogoproto.nullable) = false];
@@ -241,6 +246,9 @@ message StreamIngestionFrontierSpec {
}

message BackupDataSpec {
// TODO(lidor): job_id is not needed when interoperability with 22.2 is
// dropped, the new way to send the job tag is using 'job_tag' in the
// SetupFlowRequest message.
optional int64 job_id = 11 [(gogoproto.nullable) = false, (gogoproto.customname) = "JobID"];
repeated roachpb.Span spans = 1 [(gogoproto.nullable) = false];
repeated roachpb.Span introduced_spans = 2 [(gogoproto.nullable) = false];
@@ -294,6 +302,9 @@ message RestoreSpanEntry {
}

message RestoreDataSpec {
// TODO(lidor): job_id is not needed when interoperability with 22.2 is
// dropped, the new way to send the job tag is using 'job_tag' in the
// SetupFlowRequest message.
optional int64 job_id = 6 [(gogoproto.nullable) = false, (gogoproto.customname) = "JobID"];
optional util.hlc.Timestamp restore_time = 1 [(gogoproto.nullable) = false];
optional roachpb.FileEncryptionOptions encryption = 2;
@@ -309,6 +320,9 @@ message RestoreDataSpec {
}

message SplitAndScatterSpec {
// TODO(lidor): job_id is not needed when interoperability with 22.2 is
// dropped, the new way to send the job tag is using 'job_tag' in the
// SetupFlowRequest message.
optional int64 job_id = 4 [(gogoproto.nullable) = false, (gogoproto.customname) = "JobID"];
message RestoreEntryChunk {
repeated RestoreSpanEntry entries = 1 [(gogoproto.nullable) = false];
