Skip to content

Commit

Permalink
Merge #91630 #92068
Browse files Browse the repository at this point in the history
91630: distsql,jobs: propagate 'job' log tag and pprof label to remote nodes r=lidorcarmel a=lidorcarmel

Previously, the job log tags and the recently added job pprof labels were only used locally and not sent to remote nodes. Which means that, for example, pprof did not show the job ids of jobs that consume cpu but were started on another node.

This PR propagates the job tag to the remote nodes, and there it adds that tag to be used when logging. It also adds the same tag as a pprof label on the remote node. With this change we no longer need to set the tags that were added in #77397, therefore those are reverted here.

Note that this PR changes the job log tag from `job=<job id>` to `job=<job type> id=<job id>`, to be the same as the pprof label.

See this gist for a before/after example https://gist.github.com/lidorcarmel/2c147b76d814e29682cccbdcadb5d7bb.

And here is pprof graph example with those tags:

<img width="2531" alt="pprof_with_job_tags" src="https://user-images.githubusercontent.com/51982110/201506953-acf2675d-28a2-471e-9ebf-5441b4728978.png">

There is another commit in this PR which is fixing a small bug in the recently added pprofutil package.

Epic: None

Release note: None

92068: sql/opt: Support regtype in foldOIDFamilyCast  r=rafiss a=e-mbrown

Informs: #91022

Type cast to regtype can now use the index on pgtype(oid).

Release note: None

Co-authored-by: Lidor Carmel <[email protected]>
Co-authored-by: e-mbrown <[email protected]>
  • Loading branch information
3 people committed Nov 22, 2022
3 parents b12438a + 1a7ec75 + 8ea163a commit 59de8e8
Show file tree
Hide file tree
Showing 16 changed files with 271 additions and 19 deletions.
2 changes: 2 additions & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,7 @@ ALL_TESTS = [
"//pkg/util/netutil/addr:addr_test",
"//pkg/util/netutil:netutil_test",
"//pkg/util/optional:optional_test",
"//pkg/util/pprofutil:pprofutil_test",
"//pkg/util/pretty:pretty_test",
"//pkg/util/protoutil:protoutil_test",
"//pkg/util/quantile:quantile_test",
Expand Down Expand Up @@ -2037,6 +2038,7 @@ GO_TARGETS = [
"//pkg/util/optional:optional",
"//pkg/util/optional:optional_test",
"//pkg/util/pprofutil:pprofutil",
"//pkg/util/pprofutil:pprofutil_test",
"//pkg/util/pretty:pretty",
"//pkg/util/pretty:pretty_test",
"//pkg/util/protoutil:protoutil",
Expand Down
58 changes: 58 additions & 0 deletions pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"path/filepath"
"reflect"
"regexp"
"runtime/pprof"
"strconv"
"strings"
"sync/atomic"
Expand Down Expand Up @@ -77,6 +78,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/randgen"
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
Expand Down Expand Up @@ -105,6 +107,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/errors/oserror"
"github.com/cockroachdb/logtags"
"github.com/cockroachdb/redact"
"github.com/gogo/protobuf/proto"
pgx "github.com/jackc/pgx/v4"
Expand Down Expand Up @@ -207,6 +210,61 @@ func TestBackupRestoreMultiNodeRemote(t *testing.T) {
backupAndRestore(ctx, t, tc, []string{remoteFoo}, []string{localFoo}, numAccounts)
}

// TestBackupRestoreJobTagAndLabel runs a backup and restore and verifies that
// the flows are executed remotely with a job log tag and a job pprof label.
func TestBackupRestoreJobTagAndLabel(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

const numAccounts = 1000
ctx := context.Background()
getJobTag := func(ctx context.Context) string {
tags := logtags.FromContext(ctx)
if tags != nil {
for _, tag := range tags.Get() {
if tag.Key() == "job" {
return tag.ValueStr()
}
}
}
return ""
}
found := false
var mu syncutil.Mutex
tc, _, _, cleanupFn := backupRestoreTestSetupWithParams(t, multiNode, numAccounts, InitManualReplication,
base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
DisableDefaultTestTenant: true,
Knobs: base.TestingKnobs{
DistSQL: &execinfra.TestingKnobs{
SetupFlowCb: func(ctx context.Context, _ base.SQLInstanceID, _ *execinfrapb.SetupFlowRequest) error {
mu.Lock()
defer mu.Unlock()
tag := getJobTag(ctx)
label, ok := pprof.Label(ctx, "job")
if tag == "" && !ok {
// Skip, it's not a job.
return nil
}
if tag == "" || !ok || tag != label {
log.Fatalf(ctx, "the job tag should exist and match the pprof label: tag=%s label=%s ctx=%s",
tag, label, ctx)
}
found = true
return nil
},
},
},
},
},
)
defer cleanupFn()

backupAndRestore(ctx, t, tc, []string{localFoo}, []string{localFoo}, numAccounts)

require.True(t, found)
}

func TestBackupRestorePartitioned(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down
1 change: 1 addition & 0 deletions pkg/jobs/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ go_library(
"//pkg/util/json",
"//pkg/util/log",
"//pkg/util/metric",
"//pkg/util/pprofutil",
"//pkg/util/protoutil",
"//pkg/util/retry",
"//pkg/util/stop",
Expand Down
14 changes: 8 additions & 6 deletions pkg/jobs/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"context"
"fmt"
"os"
"runtime/pprof"
"strconv"
"strings"
"time"
Expand All @@ -37,6 +36,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/pprofutil"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
Expand Down Expand Up @@ -1275,8 +1275,12 @@ func (r *Registry) stepThroughStateMachine(
return errors.NewAssertionErrorWithWrappedErrf(jobErr,
"job %d: resuming with non-nil error", job.ID())
}
resumeCtx := logtags.AddTag(ctx, "job", job.ID())
labels := pprof.Labels("job", fmt.Sprintf("%s id=%d", jobType, job.ID()))
resumeCtx := logtags.AddTag(ctx, "job",
fmt.Sprintf("%s id=%d", jobType, job.ID()))
// Adding all tags as pprof labels (including the one we just added for job
// type and id).
resumeCtx, undo := pprofutil.SetProfilerLabelsFromCtxTags(resumeCtx)
defer undo()

if err := job.started(ctx, nil /* txn */); err != nil {
return err
Expand All @@ -1290,9 +1294,7 @@ func (r *Registry) stepThroughStateMachine(
jm.CurrentlyRunning.Dec(1)
r.metrics.RunningNonIdleJobs.Dec(1)
}()
pprof.Do(resumeCtx, labels, func(ctx context.Context) {
err = resumer.Resume(ctx, execCtx)
})
err = resumer.Resume(resumeCtx, execCtx)
}()

r.MarkIdle(job, false)
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/distsql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ go_library(
"//pkg/util/envutil",
"//pkg/util/log",
"//pkg/util/mon",
"//pkg/util/pprofutil",
"//pkg/util/timeutil",
"//pkg/util/tracing",
"//pkg/util/tracing/grpcinterceptor",
Expand Down
23 changes: 17 additions & 6 deletions pkg/sql/distsql/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/pprofutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/tracing/grpcinterceptor"
Expand Down Expand Up @@ -404,6 +405,9 @@ func (ds *ServerImpl) setupFlow(

if !f.IsLocal() {
flowCtx.AmbientContext.AddLogTag("f", flowCtx.ID.Short())
if req.JobTag != "" {
flowCtx.AmbientContext.AddLogTag("job", req.JobTag)
}
ctx = flowCtx.AmbientContext.AnnotateCtx(ctx)
telemetry.Inc(sqltelemetry.DistSQLExecCounter)
}
Expand Down Expand Up @@ -615,11 +619,6 @@ func (ds *ServerImpl) SetupFlow(
ctx context.Context, req *execinfrapb.SetupFlowRequest,
) (*execinfrapb.SimpleResponse, error) {
log.VEventf(ctx, 1, "received SetupFlow request from n%v for flow %v", req.Flow.Gateway, req.Flow.FlowID)
if cb := ds.TestingKnobs.SetupFlowCb; cb != nil {
if err := cb(ds.ServerConfig.NodeID.SQLInstanceID(), req); err != nil {
return &execinfrapb.SimpleResponse{Error: execinfrapb.NewError(ctx, err)}, nil
}
}
_, rpcSpan := ds.setupSpanForIncomingRPC(ctx, req)
defer rpcSpan.Finish()

Expand Down Expand Up @@ -654,6 +653,15 @@ func (ds *ServerImpl) SetupFlow(
}
return err
}
var undo func()
ctx, undo = pprofutil.SetProfilerLabelsFromCtxTags(ctx)
defer undo()
if cb := ds.TestingKnobs.SetupFlowCb; cb != nil {
if err = cb(ctx, ds.ServerConfig.NodeID.SQLInstanceID(), req); err != nil {
f.Cleanup(ctx)
return err
}
}
return ds.remoteFlowRunner.RunFlow(ctx, f)
}(); err != nil {
// We return flow deployment errors in the response so that they are
Expand Down Expand Up @@ -700,7 +708,10 @@ func (ds *ServerImpl) flowStreamInt(
}
defer cleanup()
log.VEventf(ctx, 1, "connected inbound stream %s/%d", flowID.Short(), streamID)
return streamStrategy.Run(f.AmbientContext.AnnotateCtx(ctx), stream, msg, f)
ctx = f.AmbientContext.AnnotateCtx(ctx)
ctx, undo := pprofutil.SetProfilerLabelsFromCtxTags(ctx)
defer undo()
return streamStrategy.Run(ctx, stream, msg, f)
}

// FlowStream is part of the execinfrapb.DistSQLServer interface.
Expand Down
15 changes: 15 additions & 0 deletions pkg/sql/distsql_running.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
pbtypes "github.com/gogo/protobuf/types"
)

Expand Down Expand Up @@ -395,13 +396,27 @@ func (dsp *DistSQLPlanner) setupFlows(
if len(statementSQL) > setupFlowRequestStmtMaxLength {
statementSQL = statementSQL[:setupFlowRequestStmtMaxLength]
}
getJobTag := func(ctx context.Context) string {
tags := logtags.FromContext(ctx)
if tags != nil {
for _, tag := range tags.Get() {
if tag.Key() == "job" {
return tag.ValueStr()
}
}
}
return ""
}
setupReq := execinfrapb.SetupFlowRequest{
// TODO(yuzefovich): avoid populating some fields of the SetupFlowRequest
// for local plans.
LeafTxnInputState: leafInputState,
Version: execinfra.Version,
EvalContext: execinfrapb.MakeEvalContext(&evalCtx.Context),
TraceKV: evalCtx.Tracing.KVTracingEnabled(),
CollectStats: planCtx.collectExecStats,
StatementSQL: statementSQL,
JobTag: getJobTag(ctx),
}

var isVectorized bool
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/distsql_running_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -617,7 +617,7 @@ func TestSetupFlowRPCError(t *testing.T) {
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
DistSQL: &execinfra.TestingKnobs{
SetupFlowCb: func(nodeID base.SQLInstanceID, req *execinfrapb.SetupFlowRequest) error {
SetupFlowCb: func(_ context.Context, nodeID base.SQLInstanceID, req *execinfrapb.SetupFlowRequest) error {
nodeIDForError, ok := stmtToNodeIDForError[req.StatementSQL]
if !ok || nodeIDForError != nodeID {
return nil
Expand Down
6 changes: 4 additions & 2 deletions pkg/sql/execinfra/server_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package execinfra

import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
Expand Down Expand Up @@ -298,8 +299,9 @@ type TestingKnobs struct {
ProcessorNoTracingSpan bool

// SetupFlowCb, when non-nil, is called by the execinfrapb.DistSQLServer
// when responding to SetupFlow RPCs.
SetupFlowCb func(base.SQLInstanceID, *execinfrapb.SetupFlowRequest) error
// when responding to SetupFlow RPCs, after the flow is set up but before it
// is started.
SetupFlowCb func(context.Context, base.SQLInstanceID, *execinfrapb.SetupFlowRequest) error
}

// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/execinfrapb/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ message SetupFlowRequest {
reserved 1, 2;

optional util.tracing.tracingpb.TraceInfo trace_info = 11;
optional string job_tag = 13 [(gogoproto.nullable) = false];

// LeafTxnInputState is the input parameter for the *client.Txn needed for
// executing the flow.
Expand Down
16 changes: 15 additions & 1 deletion pkg/sql/execinfrapb/processors_bulk_io.proto
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,10 @@ import "cloud/cloudpb/external_storage.proto";
// descriptor in the database, and doesn't emit any rows nor support
// any post-processing.
message BackfillerSpec {
// TODO(lidor): job_id is not needed when interoperability with 22.2 is
// dropped, the new way to send the job tag is using 'job_tag' in the
// SetupFlowRequest message.
optional int64 job_id = 13 [(gogoproto.nullable) = false, (gogoproto.customname) = "JobID"];

enum Type {
Invalid = 0;
Column = 1;
Expand Down Expand Up @@ -105,6 +107,9 @@ message JobProgress {
}

message ReadImportDataSpec {
// TODO(lidor): job_id is not needed when interoperability with 22.2 is
// dropped, the new way to send the job tag is using 'job_tag' in the
// SetupFlowRequest message.
optional int64 job_id = 19 [(gogoproto.nullable) = false, (gogoproto.customname) = "JobID"];
reserved 1;
optional roachpb.IOFileFormat format = 8 [(gogoproto.nullable) = false];
Expand Down Expand Up @@ -241,6 +246,9 @@ message StreamIngestionFrontierSpec {
}

message BackupDataSpec {
// TODO(lidor): job_id is not needed when interoperability with 22.2 is
// dropped, the new way to send the job tag is using 'job_tag' in the
// SetupFlowRequest message.
optional int64 job_id = 11 [(gogoproto.nullable) = false, (gogoproto.customname) = "JobID"];
repeated roachpb.Span spans = 1 [(gogoproto.nullable) = false];
repeated roachpb.Span introduced_spans = 2 [(gogoproto.nullable) = false];
Expand Down Expand Up @@ -294,6 +302,9 @@ message RestoreSpanEntry {
}

message RestoreDataSpec {
// TODO(lidor): job_id is not needed when interoperability with 22.2 is
// dropped, the new way to send the job tag is using 'job_tag' in the
// SetupFlowRequest message.
optional int64 job_id = 6 [(gogoproto.nullable) = false, (gogoproto.customname) = "JobID"];
optional util.hlc.Timestamp restore_time = 1 [(gogoproto.nullable) = false];
optional roachpb.FileEncryptionOptions encryption = 2;
Expand All @@ -309,6 +320,9 @@ message RestoreDataSpec {
}

message SplitAndScatterSpec {
// TODO(lidor): job_id is not needed when interoperability with 22.2 is
// dropped, the new way to send the job tag is using 'job_tag' in the
// SetupFlowRequest message.
optional int64 job_id = 4 [(gogoproto.nullable) = false, (gogoproto.customname) = "JobID"];
message RestoreEntryChunk {
repeated RestoreSpanEntry entries = 1 [(gogoproto.nullable) = false];
Expand Down
40 changes: 40 additions & 0 deletions pkg/sql/opt/exec/execbuilder/testdata/scalar
Original file line number Diff line number Diff line change
Expand Up @@ -1055,6 +1055,46 @@ vectorized: true
table: pg_class@pg_class_oid_idx
spans: [/101 - /101]

statement ok
CREATE TYPE status AS ENUM ('open', 'closed', 'inactive');

query T
EXPLAIN (VERBOSE) SELECT * FROM pg_type WHERE oid = 'status'::regtype
----
distribution: local
vectorized: true
·
• virtual table
columns: (oid, typname, typnamespace, typowner, typlen, typbyval, typtype, typcategory, typispreferred, typisdefined, typdelim, typrelid, typelem, typarray, typinput, typoutput, typreceive, typsend, typmodin, typmodout, typanalyze, typalign, typstorage, typnotnull, typbasetype, typtypmod, typndims, typcollation, typdefaultbin, typdefault, typacl)
estimated row count: 10 (missing stats)
table: pg_type@pg_type_oid_idx
spans: [/status - /status]

query T
EXPLAIN (VERBOSE) SELECT * FROM pg_type WHERE oid = 'bool'::regtype
----
distribution: local
vectorized: true
·
• virtual table
columns: (oid, typname, typnamespace, typowner, typlen, typbyval, typtype, typcategory, typispreferred, typisdefined, typdelim, typrelid, typelem, typarray, typinput, typoutput, typreceive, typsend, typmodin, typmodout, typanalyze, typalign, typstorage, typnotnull, typbasetype, typtypmod, typndims, typcollation, typdefaultbin, typdefault, typacl)
estimated row count: 10 (missing stats)
table: pg_type@pg_type_oid_idx
spans: [/boolean - /boolean]

query T
EXPLAIN (VERBOSE) SELECT * FROM pg_type WHERE oid = 'bool[]'::regtype
----
distribution: local
vectorized: true
·
• virtual table
columns: (oid, typname, typnamespace, typowner, typlen, typbyval, typtype, typcategory, typispreferred, typisdefined, typdelim, typrelid, typelem, typarray, typinput, typoutput, typreceive, typsend, typmodin, typmodout, typanalyze, typalign, typstorage, typnotnull, typbasetype, typtypmod, typndims, typcollation, typdefaultbin, typdefault, typacl)
estimated row count: 10 (missing stats)
table: pg_type@pg_type_oid_idx
spans: [/boolean[] - /boolean[]]


query T
EXPLAIN (VERBOSE) SELECT CASE WHEN current_database() = 'test' THEN 42 ELSE 1/3 END
----
Expand Down
Loading

0 comments on commit 59de8e8

Please sign in to comment.