From a99de9f7764da1f6c5f530a5ee1d67f520d18b99 Mon Sep 17 00:00:00 2001
From: adityamaru
Date: Wed, 30 Aug 2023 18:15:08 -0400
Subject: [PATCH] server,sql: add status server endpoint to request profiler details

This change introduces a new status server endpoint to request job
profiler details. This endpoint will redirect the request to the
current coordinator node of the job in question. This will be useful
because in a follow-up we will load the resumer from the coordinator
node's job registry and trigger its specific job profiler detail
collection logic. This is the first step of a few to move to a "fetch
model" rather than having each resumer dump its execution details at
some arbitrary cadence.

The core logic involved in collecting profiler details has not
changed; it has been moved in its entirety from pkg/sql to pkg/server.
The `crdb_internal.request_job_execution_details` builtin now resolves
the job's coordinator ID and calls the new status server endpoint.

Informs: #109671

Release note: None
---
 docs/generated/http/full.md                   |  40 ++++
 pkg/jobs/utils.go                             |  18 ++
 pkg/server/BUILD.bazel                        |   6 +
 pkg/server/job_profiler.go                    | 175 ++++++++++++++++++
 pkg/server/job_profiler_test.go               | 133 +++++++++++++
 pkg/server/serverpb/status.go                 |   1 +
 pkg/server/serverpb/status.proto              |  12 ++
 pkg/server/status.go                          |  12 +-
 pkg/server/tenant.go                          |   5 +
 pkg/server/testing_knobs.go                   |   4 +
 pkg/sql/BUILD.bazel                           |   1 -
 pkg/sql/jobs_profiler_execution_details.go    | 109 +----------
 .../jobs_profiler_execution_details_test.go   |  29 +--
 pkg/sql/sem/builtins/builtins.go              |   6 +-
 pkg/sql/sem/eval/context.go                   |   5 +-
 15 files changed, 430 insertions(+), 126 deletions(-)
 create mode 100644 pkg/server/job_profiler.go
 create mode 100644 pkg/server/job_profiler_test.go

diff --git a/docs/generated/http/full.md b/docs/generated/http/full.md
index ab0fec95efb4..793001d37454 100644
--- a/docs/generated/http/full.md
+++ b/docs/generated/http/full.md
@@ -5204,6 +5204,46 @@ Support status: [reserved](#support-status)
 
 
 
+## RequestJobProfilerExecutionDetails
+
+`GET /_status/request_job_profiler_execution_details/{job_id}`
+
+
+
+Support status: [reserved](#support-status)
+
+#### Request Parameters
+
+
+
+
+
+
+| Field | Type | Label | Description | Support status |
+| ----- | ---- | ----- | ----------- | -------------- |
+| job_id | [int64](#cockroach.server.serverpb.RequestJobProfilerExecutionDetailsRequest-int64) |  |  | [reserved](#support-status) |
+
+
+
+
+
+
+
+#### Response Parameters
+
+
+
+
+
+
+
+
+
+
+
+
+
+
 ## GetJobProfilerExecutionDetails
 
 `GET /_status/job_profiler_execution_details/{job_id}`
diff --git a/pkg/jobs/utils.go b/pkg/jobs/utils.go
index 6f7d894ce2a2..db59226ccf90 100644
--- a/pkg/jobs/utils.go
+++ b/pkg/jobs/utils.go
@@ -156,6 +156,24 @@ func JobExists(
 	return row != nil, nil
 }
 
+// JobCoordinatorID returns the coordinator node ID of the job.
+func JobCoordinatorID(
+	ctx context.Context, jobID jobspb.JobID, txn *kv.Txn, ex isql.Executor,
+) (int32, error) {
+	row, err := ex.QueryRow(ctx, "fetch-job-coordinator", txn, `SELECT claim_instance_id FROM system.jobs WHERE id = $1`, jobID)
+	if err != nil {
+		return 0, err
+	}
+	if row == nil {
+		return 0, errors.Errorf("coordinator not found for job %d", jobID)
+	}
+	coordinatorID, ok := tree.AsDInt(row[0])
+	if !ok {
+		return 0, errors.AssertionFailedf("expected coordinator ID to be an int, got %T", row[0])
+	}
+	return int32(coordinatorID), nil
+}
+
 // isJobTypeColumnDoesNotExistError returns true if the error is of the form
 // `column "job_type" does not exist`.
 func isJobTypeColumnDoesNotExistError(err error) bool {
diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel
index 79c8378af069..7427ffdce323 100644
--- a/pkg/server/BUILD.bazel
+++ b/pkg/server/BUILD.bazel
@@ -33,6 +33,7 @@ go_library(
         "init.go",
         "init_handshake.go",
         "initial_sql.go",
+        "job_profiler.go",
         "key_visualizer_server.go",
         "listen_and_update_addrs.go",
         "load_endpoint.go",
@@ -336,6 +337,7 @@ go_library(
         "//pkg/util/tracing/tracingpb",
         "//pkg/util/tracing/tracingservicepb",
         "//pkg/util/tracing/tracingui",
+        "//pkg/util/tracing/zipper",
         "//pkg/util/uint128",
         "//pkg/util/uuid",
         "@com_github_cenkalti_backoff//:backoff",
@@ -434,6 +436,7 @@ go_test(
        "index_usage_stats_test.go",
        "init_handshake_test.go",
        "intent_test.go",
+       "job_profiler_test.go",
        "load_endpoint_test.go",
        "main_test.go",
        "migration_test.go",
@@ -478,10 +481,13 @@ go_test(
        "//pkg/base/serverident",
        "//pkg/build",
        "//pkg/cli/exit",
+       "//pkg/cloud/impl:cloudimpl",
        "//pkg/clusterversion",
        "//pkg/config",
        "//pkg/config/zonepb",
        "//pkg/gossip",
+       "//pkg/jobs",
+       "//pkg/jobs/jobspb",
        "//pkg/keys",
        "//pkg/kv",
        "//pkg/kv/kvclient/kvtenant",
diff --git a/pkg/server/job_profiler.go b/pkg/server/job_profiler.go
new file mode 100644
index 000000000000..f0f176f0957f
--- /dev/null
+++ b/pkg/server/job_profiler.go
@@ -0,0 +1,175 @@
+// Copyright 2023 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package server
+
+import (
+	"context"
+	"fmt"
+	"strconv"
+
+	"github.com/cockroachdb/cockroach/pkg/clusterversion"
+	"github.com/cockroachdb/cockroach/pkg/jobs"
+	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
+	"github.com/cockroachdb/cockroach/pkg/kv"
+	"github.com/cockroachdb/cockroach/pkg/server/serverpb"
+	"github.com/cockroachdb/cockroach/pkg/server/srverrors"
+	"github.com/cockroachdb/cockroach/pkg/sql/isql"
+	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
+	"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
+	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
+	"github.com/cockroachdb/cockroach/pkg/util/tracing/zipper"
+	"github.com/cockroachdb/errors"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+)
+
+// RequestJobProfilerExecutionDetails requests the profiler details for a job.
+// This method ensures that the details are requested on the current coordinator
+// node of the job to allow for the collection of Resumer-specific details.
+func (s *statusServer) RequestJobProfilerExecutionDetails(
+	ctx context.Context, req *serverpb.RequestJobProfilerExecutionDetailsRequest,
+) (*serverpb.RequestJobProfilerExecutionDetailsResponse, error) {
+	ctx = s.AnnotateCtx(ctx)
+	// TODO(adityamaru): Figure out the correct privileges required to request execution details.
+	_, err := s.privilegeChecker.RequireAdminUser(ctx)
+	if err != nil {
+		return nil, err
+	}
+
+	execCfg := s.sqlServer.execCfg
+	var coordinatorID int32
+	err = execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
+		// Find the job's current coordinator node ID.
+		coordinatorID, err = jobs.JobCoordinatorID(ctx, jobspb.JobID(req.JobId),
+			txn, execCfg.InternalDB.Executor())
+		return err
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	nodeID, local, err := s.parseNodeID(strconv.Itoa(int(coordinatorID)))
+	if err != nil {
+		return nil, status.Errorf(codes.InvalidArgument, err.Error())
+	}
+
+	// If this node is the current coordinator of the job then we can collect the
+	// profiler details.
+	if local {
+		jobID := jobspb.JobID(req.JobId)
+		if !execCfg.Settings.Version.IsActive(ctx, clusterversion.V23_2) {
+			return nil, errors.Newf("execution details can only be requested on a cluster with version >= %s",
+				clusterversion.V23_2.String())
+		}
+		e := makeJobProfilerExecutionDetailsBuilder(execCfg.SQLStatusServer, execCfg.InternalDB, jobID, execCfg.JobRegistry)
+
+		// TODO(adityamaru): When we start collecting more information we can consider
+		// parallelizing the collection of the various pieces.
+		e.addDistSQLDiagram(ctx)
+		e.addLabelledGoroutines(ctx)
+		e.addClusterWideTraces(ctx)
+
+		// TODO(dt,adityamaru): add logic to reach out to the registry and call
+		// resumer-specific execution details collection logic.
+
+		return &serverpb.RequestJobProfilerExecutionDetailsResponse{}, nil
+	}
+
+	// Forward the request to the coordinator node.
+	status, err := s.dialNode(ctx, nodeID)
+	if err != nil {
+		return nil, srverrors.ServerError(ctx, err)
+	}
+	return status.RequestJobProfilerExecutionDetails(ctx, req)
+}
+
+// executionDetailsBuilder can be used to read and write execution details corresponding
+// to a job.
+type executionDetailsBuilder struct {
+	srv      serverpb.SQLStatusServer
+	db       isql.DB
+	jobID    jobspb.JobID
+	registry *jobs.Registry
+}
+
+// makeJobProfilerExecutionDetailsBuilder returns an instance of an executionDetailsBuilder.
+func makeJobProfilerExecutionDetailsBuilder(
+	srv serverpb.SQLStatusServer, db isql.DB, jobID jobspb.JobID, registry *jobs.Registry,
+) executionDetailsBuilder {
+	e := executionDetailsBuilder{
+		srv: srv, db: db, jobID: jobID, registry: registry,
+	}
+	return e
+}
+
+// addLabelledGoroutines collects and persists goroutines from all nodes in the
+// cluster that have a pprof label tying them to the job whose execution details
+// are being collected.
+func (e *executionDetailsBuilder) addLabelledGoroutines(ctx context.Context) {
+	profileRequest := serverpb.ProfileRequest{
+		NodeId:      "all",
+		Type:        serverpb.ProfileRequest_GOROUTINE,
+		Labels:      true,
+		LabelFilter: fmt.Sprintf("%d", e.jobID),
+	}
+	resp, err := e.srv.Profile(ctx, &profileRequest)
+	if err != nil {
+		log.Errorf(ctx, "failed to collect goroutines for job %d: %v", e.jobID, err.Error())
+		return
+	}
+	filename := fmt.Sprintf("goroutines.%s.txt", timeutil.Now().Format("20060102_150405.00"))
+	if err := jobs.WriteExecutionDetailFile(ctx, filename, resp.Data, e.db, e.jobID); err != nil {
+		log.Errorf(ctx, "failed to write goroutine for job %d: %v", e.jobID, err.Error())
+	}
+}
+
+// addDistSQLDiagram generates and persists a `distsql.<timestamp>.html` file.
+func (e *executionDetailsBuilder) addDistSQLDiagram(ctx context.Context) {
+	query := `SELECT plan_diagram FROM [SHOW JOB $1 WITH EXECUTION DETAILS]`
+	row, err := e.db.Executor().QueryRowEx(ctx, "profiler-bundler-add-diagram", nil, /* txn */
+		sessiondata.NoSessionDataOverride, query, e.jobID)
+	if err != nil {
+		log.Errorf(ctx, "failed to write DistSQL diagram for job %d: %v", e.jobID, err.Error())
+		return
+	}
+	if row != nil && row[0] != tree.DNull {
+		dspDiagramURL := string(tree.MustBeDString(row[0]))
+		filename := fmt.Sprintf("distsql.%s.html", timeutil.Now().Format("20060102_150405.00"))
+		if err := jobs.WriteExecutionDetailFile(ctx, filename,
+			[]byte(fmt.Sprintf(`<meta http-equiv="Refresh" content="0; url=%s">`, dspDiagramURL)),
+			e.db, e.jobID); err != nil {
+			log.Errorf(ctx, "failed to write DistSQL diagram for job %d: %v", e.jobID, err.Error())
+		}
+	}
+}
+
+// addClusterWideTraces generates and persists a `trace.<timestamp>.zip` file
+// that captures the active tracing spans of a job on all nodes in the cluster.
+func (e *executionDetailsBuilder) addClusterWideTraces(ctx context.Context) {
+	z := zipper.MakeInternalExecutorInflightTraceZipper(e.db.Executor())
+
+	traceID, err := jobs.GetJobTraceID(ctx, e.db, e.jobID)
+	if err != nil {
+		log.Warningf(ctx, "failed to fetch job trace ID: %+v", err.Error())
+		return
+	}
+	zippedTrace, err := z.Zip(ctx, int64(traceID))
+	if err != nil {
+		log.Errorf(ctx, "failed to collect cluster wide traces for job %d: %v", e.jobID, err.Error())
+		return
+	}
+
+	filename := fmt.Sprintf("trace.%s.zip", timeutil.Now().Format("20060102_150405.00"))
+	if err := jobs.WriteExecutionDetailFile(ctx, filename, zippedTrace, e.db, e.jobID); err != nil {
+		log.Errorf(ctx, "failed to write traces for job %d: %v", e.jobID, err.Error())
+	}
+}
diff --git a/pkg/server/job_profiler_test.go b/pkg/server/job_profiler_test.go
new file mode 100644
index 000000000000..63fbcb852eea
--- /dev/null
+++ b/pkg/server/job_profiler_test.go
@@ -0,0 +1,133 @@
+// Copyright 2023 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package server
+
+import (
+	"context"
+	"fmt"
+	"testing"
+
+	"github.com/cockroachdb/cockroach/pkg/base"
+	_ "github.com/cockroachdb/cockroach/pkg/cloud/impl" // register ExternalStorage providers.
+	"github.com/cockroachdb/cockroach/pkg/jobs"
+	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/server/serverpb"
+	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
+	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
+	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
+	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/stretchr/testify/require"
+)
+
+// fakeExecResumer calls optional callbacks during the job lifecycle.
+type fakeExecResumer struct {
+	OnResume     func(context.Context) error
+	FailOrCancel func(context.Context) error
+}
+
+func (d fakeExecResumer) ForceRealSpan() bool {
+	return true
+}
+
+func (d fakeExecResumer) DumpTraceAfterRun() bool {
+	return true
+}
+
+var _ jobs.Resumer = fakeExecResumer{}
+var _ jobs.TraceableJob = fakeExecResumer{}
+
+func (d fakeExecResumer) Resume(ctx context.Context, execCtx interface{}) error {
+	if d.OnResume != nil {
+		if err := d.OnResume(ctx); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func (d fakeExecResumer) OnFailOrCancel(ctx context.Context, _ interface{}, _ error) error {
+	if d.FailOrCancel != nil {
+		return d.FailOrCancel(ctx)
+	}
+	return nil
+}
+
+// TestJobExecutionDetailsRouting tests that the request job execution details
+// endpoint redirects the request to the current coordinator node of the job.
+func TestJobExecutionDetailsRouting(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	hasStartedCh := make(chan struct{})
+	defer close(hasStartedCh)
+	jobs.RegisterConstructor(jobspb.TypeImport, func(j *jobs.Job, _ *cluster.Settings) jobs.Resumer {
+		return fakeExecResumer{
+			OnResume: func(ctx context.Context) error {
+				hasStartedCh <- struct{}{}
+				return nil
+			},
+		}
+	}, jobs.UsesTenantCostControl)
+	defer jobs.ResetConstructors()()
+
+	dialedNodeID := roachpb.NodeID(-1)
+	ctx := context.Background()
+	tc := serverutils.StartCluster(t, 2, base.TestClusterArgs{
+		ServerArgs: base.TestServerArgs{
+			Knobs: base.TestingKnobs{
+				JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
+				Server: &TestingKnobs{
+					DialNodeCallback: func(ctx context.Context, nodeID roachpb.NodeID) error {
+						// We only care about the first call to dialNode.
+						if dialedNodeID == -1 {
+							dialedNodeID = nodeID
+						}
+						return nil
+					},
+				},
+			},
+		},
+	})
+	defer tc.Stopper().Stop(ctx)
+
+	s := tc.Server(1)
+	sqlDB := s.SQLConn(t, "n1")
+	defer sqlDB.Close()
+
+	_, err := sqlDB.Exec(`CREATE TABLE defaultdb.t (id INT)`)
+	require.NoError(t, err)
+	_, err = sqlDB.Exec(`INSERT INTO defaultdb.t SELECT generate_series(1, 100)`)
+	require.NoError(t, err)
+	var importJobID int
+	err = sqlDB.QueryRow(`IMPORT INTO defaultdb.t CSV DATA ('nodelocal://1/foo') WITH DETACHED`).Scan(&importJobID)
+	require.NoError(t, err)
+	<-hasStartedCh
+
+	// Get the job's current coordinator ID.
+	var claimInstanceID int
+	err = sqlDB.QueryRow(`SELECT claim_instance_id FROM system.jobs WHERE id = $1`, importJobID).Scan(&claimInstanceID)
+	require.NoError(t, err)
+
+	nonCoordinatorIDIdx := 0
+	if claimInstanceID == 1 {
+		// We want to pick the non-coordinator node to send our request to. Idx is
+		// zero-indexed, so if the coordinator is node 1, we want to pick node 2.
+		nonCoordinatorIDIdx = 1
+	}
+	nonCoordServer := tc.Server(nonCoordinatorIDIdx)
+	var resp serverpb.RequestJobProfilerExecutionDetailsResponse
+	path := fmt.Sprintf("/_status/request_job_profiler_execution_details/%d", importJobID)
+	err = serverutils.GetJSONProto(nonCoordServer, path, &resp)
+	require.NoError(t, err)
+	require.Equal(t, serverpb.RequestJobProfilerExecutionDetailsResponse{}, resp)
+	require.Equal(t, claimInstanceID, int(dialedNodeID))
+}
diff --git a/pkg/server/serverpb/status.go b/pkg/server/serverpb/status.go
index e33f0f863ce4..029c9647b694 100644
--- a/pkg/server/serverpb/status.go
+++ b/pkg/server/serverpb/status.go
@@ -46,6 +46,7 @@ type SQLStatusServer interface {
 	LogFile(context.Context, *LogFileRequest) (*LogEntriesResponse, error)
 	Logs(context.Context, *LogsRequest) (*LogEntriesResponse, error)
 	NodesUI(context.Context, *NodesRequest) (*NodesResponseExternal, error)
+	RequestJobProfilerExecutionDetails(context.Context, *RequestJobProfilerExecutionDetailsRequest) (*RequestJobProfilerExecutionDetailsResponse, error)
 }
 
 // OptionalNodesStatusServer is a StatusServer that is only optionally present
diff --git a/pkg/server/serverpb/status.proto b/pkg/server/serverpb/status.proto
index d4682da316ef..4266c847bb28 100644
--- a/pkg/server/serverpb/status.proto
+++ b/pkg/server/serverpb/status.proto
@@ -2076,6 +2076,12 @@ message NetworkConnectivityResponse {
   ];
 }
 
+message RequestJobProfilerExecutionDetailsRequest {
+  int64 job_id = 1;
+}
+
+message RequestJobProfilerExecutionDetailsResponse {}
+
 message GetJobProfilerExecutionDetailRequest {
   int64 job_id = 1;
   string filename = 2;
@@ -2563,6 +2569,12 @@ service Status {
     };
   }
 
+  rpc RequestJobProfilerExecutionDetails(RequestJobProfilerExecutionDetailsRequest) returns (RequestJobProfilerExecutionDetailsResponse) {
+    option (google.api.http) = {
+      get: "/_status/request_job_profiler_execution_details/{job_id}"
+    };
+  }
+
   rpc GetJobProfilerExecutionDetails(GetJobProfilerExecutionDetailRequest)
       returns (GetJobProfilerExecutionDetailResponse) {
     option (google.api.http) = {
diff --git a/pkg/server/status.go b/pkg/server/status.go
index fc0f6975b80b..41e9e4e75969 100644
--- a/pkg/server/status.go
+++ b/pkg/server/status.go
@@ -479,6 +479,8 @@ type statusServer struct {
 	// 256 concurrent queries actively running on a node, then it would
 	// take 2^16 seconds (18 hours) to hit any one of them.
 	cancelSemaphore *quotapool.IntPool
+
+	knobs *TestingKnobs
 }
 
 // systemStatusServer is an extension of the standard
@@ -565,6 +567,7 @@ func newStatusServer(
 	internalExecutor *sql.InternalExecutor,
 	serverIterator ServerIterator,
 	clock *hlc.Clock,
+	knobs *TestingKnobs,
 ) *statusServer {
 	ambient.AddLogTag("status", nil)
 	if !rpcCtx.TenantID.IsSystem() {
@@ -591,6 +594,7 @@
 		// See the docstring on cancelSemaphore for details about this initialization.
 		cancelSemaphore: quotapool.NewIntPool("pgwire-cancel", 256),
+		knobs:           knobs,
 	}
 
 	return server
@@ -636,6 +640,7 @@ func newSystemStatusServer(
 		internalExecutor,
 		serverIterator,
 		clock,
+		knobs,
 	)
 
 	return &systemStatusServer{
@@ -686,6 +691,11 @@ func (s *statusServer) parseNodeID(nodeIDParam string) (roachpb.NodeID, bool, er
 func (s *statusServer) dialNode(
 	ctx context.Context, nodeID roachpb.NodeID,
 ) (serverpb.StatusClient, error) {
+	if s.knobs != nil && s.knobs.DialNodeCallback != nil {
+		if err := s.knobs.DialNodeCallback(ctx, nodeID); err != nil {
+			return nil, err
+		}
+	}
 	conn, err := s.serverIterator.dialNode(ctx, serverID(nodeID))
 	if err != nil {
 		return nil, err
@@ -1614,7 +1624,7 @@ func (s *statusServer) fetchProfileFromAllNodes(
 	nodeFn := func(ctx context.Context, client interface{}, nodeID roachpb.NodeID) (interface{}, error) {
 		statusClient := client.(serverpb.StatusClient)
 		var pd *profData
-		err = timeutil.RunWithTimeout(ctx, opName, 1*time.Minute, func(ctx context.Context) error {
+		err := timeutil.RunWithTimeout(ctx, opName, 1*time.Minute, func(ctx context.Context) error {
 			resp, err := statusClient.Profile(ctx, &serverpb.ProfileRequest{
 				NodeId: fmt.Sprintf("%d", nodeID),
 				Type:   req.Type,
diff --git a/pkg/server/tenant.go b/pkg/server/tenant.go
index 36d6cb7e282f..3e05bbc79675 100644
--- a/pkg/server/tenant.go
+++ b/pkg/server/tenant.go
@@ -316,6 +316,10 @@ func newTenantServer(
 	// construct the status server with a nil sqlServer, and then assign it once
 	// an SQL server gets created. We are going to assume that the status server
 	// won't require the SQL server object until later.
+	var serverKnobs TestingKnobs
+	if s, ok := baseCfg.TestingKnobs.Server.(*TestingKnobs); ok {
+		serverKnobs = *s
+	}
 	sStatus := newStatusServer(
 		baseCfg.AmbientCtx,
 		baseCfg.Settings,
@@ -331,6 +335,7 @@ func newTenantServer(
 		args.circularInternalExecutor,
 		serverIterator,
 		args.clock,
+		&serverKnobs,
 	)
 	args.sqlStatusServer = sStatus
diff --git a/pkg/server/testing_knobs.go b/pkg/server/testing_knobs.go
index 715e9ea51b67..9eaa7980c0b8 100644
--- a/pkg/server/testing_knobs.go
+++ b/pkg/server/testing_knobs.go
@@ -157,6 +157,10 @@ type TestingKnobs struct {
 	// on a remote node in a cluster fan-out. It is invoked by the nodeFn argument
 	// of server.iterateNodes.
 	IterateNodesNodeCallback func(ctx context.Context, nodeID roachpb.NodeID) error
+
+	// DialNodeCallback is used to mock dial errors when dialing a node. It is
+	// invoked by statusServer.dialNode before dialing via the serverIterator.
+	DialNodeCallback func(ctx context.Context, nodeID roachpb.NodeID) error
 }
 
 // ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
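A quick illustration of the new testing knob: the sketch below (a hypothetical helper, not part of this patch) shows how a test outside pkg/server could use `DialNodeCallback` to inject a dial failure and exercise the forwarding error path of the new endpoint.

```go
package server_test

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/base"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/server"
	"github.com/cockroachdb/errors"
)

// dialFailureArgs is a hypothetical helper showing how a test could use the
// new knob to fail every status server dial, e.g. to exercise the
// forwarding error path of RequestJobProfilerExecutionDetails.
func dialFailureArgs() base.TestServerArgs {
	return base.TestServerArgs{
		Knobs: base.TestingKnobs{
			Server: &server.TestingKnobs{
				DialNodeCallback: func(ctx context.Context, nodeID roachpb.NodeID) error {
					// Returning an error here makes statusServer.dialNode fail
					// before any real connection is attempted.
					return errors.Newf("injected dial error for n%d", nodeID)
				},
			},
		},
	}
}
```
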
diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel
index 2dffffd1a009..d1a596add750 100644
--- a/pkg/sql/BUILD.bazel
+++ b/pkg/sql/BUILD.bazel
@@ -574,7 +574,6 @@ go_library(
         "//pkg/util/tracing",
         "//pkg/util/tracing/collector",
         "//pkg/util/tracing/tracingpb",
-        "//pkg/util/tracing/zipper",
         "//pkg/util/tsearch",
         "//pkg/util/uint128",
         "//pkg/util/uuid",
diff --git a/pkg/sql/jobs_profiler_execution_details.go b/pkg/sql/jobs_profiler_execution_details.go
index a5f931f5e896..d4e43b966f46 100644
--- a/pkg/sql/jobs_profiler_execution_details.go
+++ b/pkg/sql/jobs_profiler_execution_details.go
@@ -13,7 +13,6 @@ package sql
 import (
 	"context"
 	gojson "encoding/json"
-	"fmt"
 	"net/url"
 	"strconv"
 
@@ -26,11 +25,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
 	"github.com/cockroachdb/cockroach/pkg/sql/isql"
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
-	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
-	"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
-	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
-	"github.com/cockroachdb/cockroach/pkg/util/tracing/zipper"
 	"github.com/cockroachdb/errors"
 )
 
@@ -182,103 +177,9 @@ func (p *planner) RequestExecutionDetailFiles(ctx context.Context, jobID jobspb.
 			clusterversion.V23_2.String())
 	}
 
-	e := makeJobProfilerExecutionDetailsBuilder(execCfg.SQLStatusServer, execCfg.InternalDB, jobID)
-
-	// Check if the job exists otherwise we can bail early.
-	exists, err := jobs.JobExists(ctx, jobID, p.Txn(), e.db.Executor())
-	if err != nil {
-		return err
-	}
-	if !exists {
-		return errors.Newf("job %d not found; cannot request execution details", jobID)
-	}
-
-	// TODO(adityamaru): When we start collecting more information we can consider
-	// parallelize the collection of the various pieces.
-	e.addDistSQLDiagram(ctx)
-	e.addLabelledGoroutines(ctx)
-	e.addClusterWideTraces(ctx)
-
-	return nil
-}
-
-// executionDetailsBuilder can be used to read and write execution details corresponding
-// to a job.
-type executionDetailsBuilder struct {
-	srv   serverpb.SQLStatusServer
-	db    isql.DB
-	jobID jobspb.JobID
-}
-
-// makeJobProfilerExecutionDetailsBuilder returns an instance of an executionDetailsBuilder.
-func makeJobProfilerExecutionDetailsBuilder(
-	srv serverpb.SQLStatusServer, db isql.DB, jobID jobspb.JobID,
-) executionDetailsBuilder {
-	e := executionDetailsBuilder{
-		srv: srv, db: db, jobID: jobID,
-	}
-	return e
-}
-
-// addLabelledGoroutines collects and persists goroutines from all nodes in the
-// cluster that have a pprof label tying it to the job whose execution details
-// are being collected.
-func (e *executionDetailsBuilder) addLabelledGoroutines(ctx context.Context) {
-	profileRequest := serverpb.ProfileRequest{
-		NodeId:      "all",
-		Type:        serverpb.ProfileRequest_GOROUTINE,
-		Labels:      true,
-		LabelFilter: fmt.Sprintf("%d", e.jobID),
-	}
-	resp, err := e.srv.Profile(ctx, &profileRequest)
-	if err != nil {
-		log.Errorf(ctx, "failed to collect goroutines for job %d: %v", e.jobID, err.Error())
-		return
-	}
-	filename := fmt.Sprintf("goroutines.%s.txt", timeutil.Now().Format("20060102_150405.00"))
-	if err := jobs.WriteExecutionDetailFile(ctx, filename, resp.Data, e.db, e.jobID); err != nil {
-		log.Errorf(ctx, "failed to write goroutine for job %d: %v", e.jobID, err.Error())
-	}
-}
-
-// addDistSQLDiagram generates and persists a `distsql.<timestamp>.html` file.
-func (e *executionDetailsBuilder) addDistSQLDiagram(ctx context.Context) {
-	query := `SELECT plan_diagram FROM [SHOW JOB $1 WITH EXECUTION DETAILS]`
-	row, err := e.db.Executor().QueryRowEx(ctx, "profiler-bundler-add-diagram", nil, /* txn */
-		sessiondata.NoSessionDataOverride, query, e.jobID)
-	if err != nil {
-		log.Errorf(ctx, "failed to write DistSQL diagram for job %d: %v", e.jobID, err.Error())
-		return
-	}
-	if row != nil && row[0] != tree.DNull {
-		dspDiagramURL := string(tree.MustBeDString(row[0]))
-		filename := fmt.Sprintf("distsql.%s.html", timeutil.Now().Format("20060102_150405.00"))
-		if err := jobs.WriteExecutionDetailFile(ctx, filename,
-			[]byte(fmt.Sprintf(`<meta http-equiv="Refresh" content="0; url=%s">`, dspDiagramURL)),
-			e.db, e.jobID); err != nil {
-			log.Errorf(ctx, "failed to write DistSQL diagram for job %d: %v", e.jobID, err.Error())
-		}
-	}
-}
-
-// addClusterWideTraces generates and persists a `trace.<timestamp>.zip` file
-// that captures the active tracing spans of a job on all nodes in the cluster.
-func (e *executionDetailsBuilder) addClusterWideTraces(ctx context.Context) {
-	z := zipper.MakeInternalExecutorInflightTraceZipper(e.db.Executor())
-
-	traceID, err := jobs.GetJobTraceID(ctx, e.db, e.jobID)
-	if err != nil {
-		log.Warningf(ctx, "failed to fetch job trace ID: %+v", err.Error())
-		return
-	}
-	zippedTrace, err := z.Zip(ctx, int64(traceID))
-	if err != nil {
-		log.Errorf(ctx, "failed to collect cluster wide traces for job %d: %v", e.jobID, err.Error())
-		return
-	}
-
-	filename := fmt.Sprintf("trace.%s.zip", timeutil.Now().Format("20060102_150405.00"))
-	if err := jobs.WriteExecutionDetailFile(ctx, filename, zippedTrace, e.db, e.jobID); err != nil {
-		log.Errorf(ctx, "failed to write traces for job %d: %v", e.jobID, err.Error())
-	}
+	_, err := execCfg.SQLStatusServer.RequestJobProfilerExecutionDetails(ctx,
+		&serverpb.RequestJobProfilerExecutionDetailsRequest{
+			JobId: int64(jobID),
+		})
+	return err
 }
diff --git a/pkg/sql/jobs_profiler_execution_details_test.go b/pkg/sql/jobs_profiler_execution_details_test.go
index 1241dc322fad..eb6c4f62936e 100644
--- a/pkg/sql/jobs_profiler_execution_details_test.go
+++ b/pkg/sql/jobs_profiler_execution_details_test.go
@@ -107,9 +107,7 @@ func checkForPlanDiagrams(
 	})
 }
 
-// TestJobsExecutionDetails tests that a job's execution details are retrieved
-// and rendered correctly.
-func TestJobsExecutionDetails(t *testing.T) {
+func TestShowJobsWithExecutionDetails(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
@@ -224,7 +222,7 @@ func TestReadWriteProfilerExecutionDetails(t *testing.T) {
 	})
 
 	t.Run("execution details for invalid job ID", func(t *testing.T) {
-		runner.ExpectErr(t, `job -123 not found; cannot request execution details`, `SELECT crdb_internal.request_job_execution_details(-123)`)
+		runner.ExpectErr(t, `coordinator not found for job -123`, `SELECT crdb_internal.request_job_execution_details(-123)`)
 	})
 
 	t.Run("read/write terminal trace", func(t *testing.T) {
@@ -321,6 +319,10 @@ func TestListProfilerExecutionDetails(t *testing.T) {
 	execCfg := s.ExecutorConfig().(sql.ExecutorConfig)
 
 	expectedDiagrams := 1
+	writtenDiagram := make(chan struct{})
+	defer close(writtenDiagram)
+	continueCh := make(chan struct{})
+	defer close(continueCh)
 	jobs.RegisterConstructor(jobspb.TypeImport, func(j *jobs.Job, _ *cluster.Settings) jobs.Resumer {
 		return fakeExecResumer{
 			OnResume: func(ctx context.Context) error {
@@ -329,6 +331,8 @@ func TestListProfilerExecutionDetails(t *testing.T) {
 				p.PhysicalInfrastructure = infra
 				jobsprofiler.StorePlanDiagram(ctx, s.Stopper(), &p, s.InternalDB().(isql.DB), j.ID())
 				checkForPlanDiagrams(ctx, t, s.InternalDB().(isql.DB), j.ID(), expectedDiagrams)
+				writtenDiagram <- struct{}{}
+				<-continueCh
 				if err := execCfg.JobRegistry.CheckPausepoint("fakeresumer.pause"); err != nil {
 					return err
 				}
@@ -344,22 +348,25 @@ func TestListProfilerExecutionDetails(t *testing.T) {
 	runner.Exec(t, `SET CLUSTER SETTING jobs.debug.pausepoints = 'fakeresumer.pause'`)
 	var importJobID int
 	runner.QueryRow(t, `IMPORT INTO t CSV DATA ('nodelocal://1/foo') WITH DETACHED`).Scan(&importJobID)
-	jobutils.WaitForJobToPause(t, runner, jobspb.JobID(importJobID))
+	<-writtenDiagram
 
 	runner.Exec(t, `SELECT crdb_internal.request_job_execution_details($1)`, importJobID)
 	files := listExecutionDetails(t, s, jobspb.JobID(importJobID))
-	require.Len(t, files, 5)
-	require.Regexp(t, "[0-9]/resumer-trace/.*~cockroach\\.sql\\.jobs\\.jobspb\\.TraceData\\.binpb", files[0])
-	require.Regexp(t, "[0-9]/resumer-trace/.*~cockroach\\.sql\\.jobs\\.jobspb\\.TraceData\\.binpb", files[1])
-	require.Regexp(t, "distsql\\..*\\.html", files[2])
-	require.Regexp(t, "goroutines\\..*\\.txt", files[3])
-	require.Regexp(t, "trace\\..*\\.zip", files[4])
+	require.Len(t, files, 3)
+	require.Regexp(t, "distsql\\..*\\.html", files[0])
+	require.Regexp(t, "goroutines\\..*\\.txt", files[1])
+	require.Regexp(t, "trace\\..*\\.zip", files[2])
+
+	continueCh <- struct{}{}
+	jobutils.WaitForJobToPause(t, runner, jobspb.JobID(importJobID))
 
 	// Resume the job, so it can write another DistSQL diagram and goroutine
 	// snapshot.
 	runner.Exec(t, `SET CLUSTER SETTING jobs.debug.pausepoints = ''`)
 	expectedDiagrams = 2
 	runner.Exec(t, `RESUME JOB $1`, importJobID)
+	<-writtenDiagram
+	continueCh <- struct{}{}
 	jobutils.WaitForJobToSucceed(t, runner, jobspb.JobID(importJobID))
 
 	runner.Exec(t, `SELECT crdb_internal.request_job_execution_details($1)`, importJobID)
 	files = listExecutionDetails(t, s, jobspb.JobID(importJobID))
diff --git a/pkg/sql/sem/builtins/builtins.go b/pkg/sql/sem/builtins/builtins.go
index c0b172988521..31b81c15ef12 100644
--- a/pkg/sql/sem/builtins/builtins.go
+++ b/pkg/sql/sem/builtins/builtins.go
@@ -7786,16 +7786,12 @@ specified store on the node it's run from.
 One of 'mvccGC', 'merge', 'split',
 			if err != nil {
 				return nil, err
 			}
-
 			if !isAdmin {
 				return nil, errors.New("must be admin to request a job profiler bundle")
 			}
 			jobID := int(tree.MustBeDInt(args[0]))
-			if err := evalCtx.JobsProfiler.RequestExecutionDetailFiles(
-				ctx,
-				jobspb.JobID(jobID),
-			); err != nil {
+			if err := evalCtx.JobsProfiler.RequestExecutionDetailFiles(ctx, jobspb.JobID(jobID)); err != nil {
 				return nil, err
 			}
diff --git a/pkg/sql/sem/eval/context.go b/pkg/sql/sem/eval/context.go
index a228beb2e9d1..bbd6dde2a0d8 100644
--- a/pkg/sql/sem/eval/context.go
+++ b/pkg/sql/sem/eval/context.go
@@ -296,10 +296,7 @@ type JobsProfiler interface {
 	GenerateExecutionDetailsJSON(ctx context.Context, evalCtx *Context, jobID jobspb.JobID) ([]byte, error)
 
 	// RequestExecutionDetailFiles triggers the collection of execution details
-	// for the specified jobID that are then persisted to `system.job_info`. This
-	// currently includes the following pieces of information:
-	//
-	// - Latest DistSQL diagram of the job
+	// for the specified jobID that are then persisted to `system.job_info`.
 	RequestExecutionDetailFiles(ctx context.Context, jobID jobspb.JobID) error
 }
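For reference, the new endpoint can be exercised the same way the routing test above does it; the sketch below (a hypothetical helper, not part of this patch) wraps that call.

```go
package server_test

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/server/serverpb"
	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
)

// requestExecutionDetails is a hypothetical helper mirroring the routing
// test: it asks any node to collect profiler execution details for jobID,
// and the receiving node forwards the request to the job's current
// coordinator.
func requestExecutionDetails(s serverutils.TestServerInterface, jobID int64) error {
	var resp serverpb.RequestJobProfilerExecutionDetailsResponse
	path := fmt.Sprintf("/_status/request_job_profiler_execution_details/%d", jobID)
	return serverutils.GetJSONProto(s, path, &resp)
}
```

Equivalently, `SELECT crdb_internal.request_job_execution_details($1)` reaches the same code path through the builtin, which now resolves the job's coordinator and calls this endpoint.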