From 8770dce8616dba69ace0b00d773ec85d4b1403fc Mon Sep 17 00:00:00 2001 From: adityamaru Date: Tue, 11 Jul 2023 23:34:48 -0400 Subject: [PATCH] sql,server: support collecting labelled goroutines This change collects cluster-wide goroutines that have a pprof label tying them to the execution of the particular job whose execution details have been requested. This relies on the support added to the pprofui server to collect cluster-wide, labelled goroutines in https://github.com/cockroachdb/cockroach/pull/105916. Informs: #105076 Release note: None --- pkg/server/status.go | 2 +- pkg/sql/jobs_profiler_bundle.go | 30 ++++++++++++-- pkg/sql/jobs_profiler_bundle_test.go | 61 +++++++++++++++++++++------- 3 files changed, 74 insertions(+), 19 deletions(-) diff --git a/pkg/server/status.go b/pkg/server/status.go index 170d3d4812c0..6217e2052a6a 100644 --- a/pkg/server/status.go +++ b/pkg/server/status.go @@ -4139,7 +4139,7 @@ func (s *statusServer) GetJobProfilerExecutionDetails( jobID := jobspb.JobID(req.JobId) execCfg := s.sqlServer.execCfg - eb := sql.MakeJobProfilerExecutionDetailsBuilder(execCfg.InternalDB, jobID) + eb := sql.MakeJobProfilerExecutionDetailsBuilder(execCfg.SQLStatusServer, execCfg.InternalDB, jobID) data, err := eb.ReadExecutionDetail(ctx, req.Filename) if err != nil { return nil, err diff --git a/pkg/sql/jobs_profiler_bundle.go b/pkg/sql/jobs_profiler_bundle.go index ed6a87780b25..eacc76db80ce 100644 --- a/pkg/sql/jobs_profiler_bundle.go +++ b/pkg/sql/jobs_profiler_bundle.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler/profilerconstants" + "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/sql/isql" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" @@ -40,10 +41,11 @@ func (p *planner) RequestExecutionDetails(ctx 
context.Context, jobID jobspb.JobI clusterversion.V23_1.String()) } - e := MakeJobProfilerExecutionDetailsBuilder(execCfg.InternalDB, jobID) + e := MakeJobProfilerExecutionDetailsBuilder(execCfg.SQLStatusServer, execCfg.InternalDB, jobID) // TODO(adityamaru): When we start collecting more information we can consider // parallelize the collection of the various pieces. e.addDistSQLDiagram(ctx) + e.addLabelledGoroutines(ctx) return nil } @@ -51,6 +53,7 @@ func (p *planner) RequestExecutionDetails(ctx context.Context, jobID jobspb.JobI // ExecutionDetailsBuilder can be used to read and write execution details corresponding // to a job. type ExecutionDetailsBuilder struct { + srv serverpb.SQLStatusServer db isql.DB jobID jobspb.JobID } @@ -156,14 +159,35 @@ func (e *ExecutionDetailsBuilder) ReadExecutionDetail( // MakeJobProfilerExecutionDetailsBuilder returns an instance of an ExecutionDetailsBuilder. func MakeJobProfilerExecutionDetailsBuilder( - db isql.DB, jobID jobspb.JobID, + srv serverpb.SQLStatusServer, db isql.DB, jobID jobspb.JobID, ) ExecutionDetailsBuilder { e := ExecutionDetailsBuilder{ - db: db, jobID: jobID, + srv: srv, db: db, jobID: jobID, } return e } +// addLabelledGoroutines collects and persists goroutines from all nodes in the +// cluster that have a pprof label tying it to the job whose execution details +// are being collected. 
+func (e *ExecutionDetailsBuilder) addLabelledGoroutines(ctx context.Context) { + profileRequest := serverpb.ProfileRequest{ + NodeId: "all", + Type: serverpb.ProfileRequest_GOROUTINE, + Labels: true, + LabelFilter: fmt.Sprintf("%d", e.jobID), + } + resp, err := e.srv.Profile(ctx, &profileRequest) + if err != nil { + log.Errorf(ctx, "failed to collect goroutines for job %d: %+v", e.jobID, err.Error()) + return + } + filename := fmt.Sprintf("goroutines.%s.txt", timeutil.Now().Format("20060102_150405.00")) + if err := e.WriteExecutionDetail(ctx, filename, resp.Data); err != nil { + log.Errorf(ctx, "failed to write goroutine for job %d: %+v", e.jobID, err.Error()) + } +} + // addDistSQLDiagram generates and persists a `distsql..html` file. func (e *ExecutionDetailsBuilder) addDistSQLDiagram(ctx context.Context) { query := `SELECT plan_diagram FROM [SHOW JOB $1 WITH EXECUTION DETAILS]` diff --git a/pkg/sql/jobs_profiler_bundle_test.go b/pkg/sql/jobs_profiler_bundle_test.go index dca3fb483d93..4968548f5b7d 100644 --- a/pkg/sql/jobs_profiler_bundle_test.go +++ b/pkg/sql/jobs_profiler_bundle_test.go @@ -16,6 +16,8 @@ import ( "fmt" "io" "net/http" + "runtime/pprof" + "strings" "testing" "time" @@ -59,35 +61,63 @@ func TestReadWriteProfilerExecutionDetails(t *testing.T) { runner := sqlutils.MakeSQLRunner(sqlDB) - jobs.RegisterConstructor(jobspb.TypeImport, func(j *jobs.Job, _ *cluster.Settings) jobs.Resumer { - return fakeExecResumer{ - OnResume: func(ctx context.Context) error { - p := sql.PhysicalPlan{} - infra := physicalplan.NewPhysicalInfrastructure(uuid.FastMakeV4(), base.SQLInstanceID(1)) - p.PhysicalInfrastructure = infra - jobsprofiler.StorePlanDiagram(ctx, s.Stopper(), &p, s.InternalDB().(isql.DB), j.ID()) - checkForPlanDiagram(ctx, t, s.InternalDB().(isql.DB), j.ID()) - return nil - }, - } - }, jobs.UsesTenantCostControl) - runner.Exec(t, `CREATE TABLE t (id INT)`) runner.Exec(t, `INSERT INTO t SELECT generate_series(1, 100)`) t.Run("read/write DistSQL diagram", 
func(t *testing.T) { + jobs.RegisterConstructor(jobspb.TypeImport, func(j *jobs.Job, _ *cluster.Settings) jobs.Resumer { + return fakeExecResumer{ + OnResume: func(ctx context.Context) error { + p := sql.PhysicalPlan{} + infra := physicalplan.NewPhysicalInfrastructure(uuid.FastMakeV4(), base.SQLInstanceID(1)) + p.PhysicalInfrastructure = infra + jobsprofiler.StorePlanDiagram(ctx, s.Stopper(), &p, s.InternalDB().(isql.DB), j.ID()) + checkForPlanDiagram(ctx, t, s.InternalDB().(isql.DB), j.ID()) + return nil + }, + } + }, jobs.UsesTenantCostControl) + var importJobID int runner.QueryRow(t, `IMPORT INTO t CSV DATA ('nodelocal://1/foo') WITH DETACHED`).Scan(&importJobID) jobutils.WaitForJobToSucceed(t, runner, jobspb.JobID(importJobID)) runner.Exec(t, `SELECT crdb_internal.request_job_execution_details($1)`, importJobID) - checkExecutionDetails(t, s, jobspb.JobID(importJobID), "distsql") + distSQLDiagram := checkExecutionDetails(t, s, jobspb.JobID(importJobID), "distsql") + require.Regexp(t, "", string(distSQLDiagram)) + }) + + t.Run("read/write goroutines", func(t *testing.T) { + blockCh := make(chan struct{}) + continueCh := make(chan struct{}) + defer close(blockCh) + defer close(continueCh) + jobs.RegisterConstructor(jobspb.TypeImport, func(j *jobs.Job, _ *cluster.Settings) jobs.Resumer { + return fakeExecResumer{ + OnResume: func(ctx context.Context) error { + pprof.Do(ctx, pprof.Labels("foo", "bar"), func(ctx2 context.Context) { + blockCh <- struct{}{} + <-continueCh + }) + return nil + }, + } + }, jobs.UsesTenantCostControl) + var importJobID int + runner.QueryRow(t, `IMPORT INTO t CSV DATA ('nodelocal://1/foo') WITH DETACHED`).Scan(&importJobID) + <-blockCh + runner.Exec(t, `SELECT crdb_internal.request_job_execution_details($1)`, importJobID) + goroutines := checkExecutionDetails(t, s, jobspb.JobID(importJobID), "goroutines") + continueCh <- struct{}{} + jobutils.WaitForJobToSucceed(t, runner, jobspb.JobID(importJobID)) + require.True(t, 
strings.Contains(string(goroutines), fmt.Sprintf("labels: {\"foo\":\"bar\", \"job\":\"IMPORT id=%d\", \"n\":\"1\"}", importJobID))) + require.True(t, strings.Contains(string(goroutines), "github.com/cockroachdb/cockroach/pkg/sql_test.fakeExecResumer.Resume")) }) } func checkExecutionDetails( t *testing.T, s serverutils.TestServerInterface, jobID jobspb.JobID, filename string, -) { +) []byte { t.Helper() client, err := s.GetAdminHTTPClient() @@ -112,4 +142,5 @@ func checkExecutionDetails( data, err := io.ReadAll(r) require.NoError(t, err) require.NotEmpty(t, data) + return data }