Skip to content

Commit

Permalink
Merge #106654
Browse files Browse the repository at this point in the history
106654: sql,server: support collecting labelled goroutines in the job profiler r=dt a=adityamaru

This change collects cluster-wide goroutines that carry a pprof label tying them to the execution of the job whose execution details have been requested. This relies on the support added to the pprofui server in #105916 for collecting cluster-wide, labelled goroutines.

Informs: #105076
Release note: None

Co-authored-by: adityamaru <[email protected]>
  • Loading branch information
craig[bot] and adityamaru committed Jul 12, 2023
2 parents 269b9e3 + 8770dce commit c403d35
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 19 deletions.
2 changes: 1 addition & 1 deletion pkg/server/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -4139,7 +4139,7 @@ func (s *statusServer) GetJobProfilerExecutionDetails(

jobID := jobspb.JobID(req.JobId)
execCfg := s.sqlServer.execCfg
eb := sql.MakeJobProfilerExecutionDetailsBuilder(execCfg.InternalDB, jobID)
eb := sql.MakeJobProfilerExecutionDetailsBuilder(execCfg.SQLStatusServer, execCfg.InternalDB, jobID)
data, err := eb.ReadExecutionDetail(ctx, req.Filename)
if err != nil {
return nil, err
Expand Down
30 changes: 27 additions & 3 deletions pkg/sql/jobs_profiler_bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler/profilerconstants"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
Expand All @@ -40,17 +41,19 @@ func (p *planner) RequestExecutionDetails(ctx context.Context, jobID jobspb.JobI
clusterversion.V23_1.String())
}

e := MakeJobProfilerExecutionDetailsBuilder(execCfg.InternalDB, jobID)
e := MakeJobProfilerExecutionDetailsBuilder(execCfg.SQLStatusServer, execCfg.InternalDB, jobID)
// TODO(adityamaru): When we start collecting more information we can consider
// parallelizing the collection of the various pieces.
e.addDistSQLDiagram(ctx)
e.addLabelledGoroutines(ctx)

return nil
}

// ExecutionDetailsBuilder can be used to read and write execution details corresponding
// to a job.
type ExecutionDetailsBuilder struct {
// srv is the SQL status server; addLabelledGoroutines calls its Profile
// method to collect goroutine profiles.
srv serverpb.SQLStatusServer
// db is the database handle supplied by the caller (execCfg.InternalDB at the
// visible call sites). NOTE(review): presumably used to persist and read the
// execution detail files; confirm against WriteExecutionDetail and
// ReadExecutionDetail.
db isql.DB
// jobID identifies the job whose execution details are read or written.
jobID jobspb.JobID
}
Expand Down Expand Up @@ -156,14 +159,35 @@ func (e *ExecutionDetailsBuilder) ReadExecutionDetail(

// MakeJobProfilerExecutionDetailsBuilder returns an instance of an ExecutionDetailsBuilder.
func MakeJobProfilerExecutionDetailsBuilder(
db isql.DB, jobID jobspb.JobID,
srv serverpb.SQLStatusServer, db isql.DB, jobID jobspb.JobID,
) ExecutionDetailsBuilder {
e := ExecutionDetailsBuilder{
db: db, jobID: jobID,
srv: srv, db: db, jobID: jobID,
}
return e
}

// addLabelledGoroutines collects and persists goroutines from all nodes in the
// cluster that have a pprof label tying them to the job whose execution details
// are being collected.
//
// The goroutine profile is requested with NodeId "all" and the job ID as the
// label filter, and the returned data is written as a
// `goroutines.<timestamp>.txt` execution detail. Collection is best-effort:
// failures are logged and not returned to the caller.
func (e *ExecutionDetailsBuilder) addLabelledGoroutines(ctx context.Context) {
	profileRequest := serverpb.ProfileRequest{
		NodeId:      "all",
		Type:        serverpb.ProfileRequest_GOROUTINE,
		Labels:      true,
		LabelFilter: fmt.Sprintf("%d", e.jobID),
	}
	resp, err := e.srv.Profile(ctx, &profileRequest)
	if err != nil {
		// Pass the error value rather than err.Error(): %+v only has extra
		// detail to print when it is given the error itself, not its
		// flattened message string.
		log.Errorf(ctx, "failed to collect goroutines for job %d: %+v", e.jobID, err)
		return
	}
	filename := fmt.Sprintf("goroutines.%s.txt", timeutil.Now().Format("20060102_150405.00"))
	if err := e.WriteExecutionDetail(ctx, filename, resp.Data); err != nil {
		log.Errorf(ctx, "failed to write goroutine for job %d: %+v", e.jobID, err)
	}
}

// addDistSQLDiagram generates and persists a `distsql.<timestamp>.html` file.
func (e *ExecutionDetailsBuilder) addDistSQLDiagram(ctx context.Context) {
query := `SELECT plan_diagram FROM [SHOW JOB $1 WITH EXECUTION DETAILS]`
Expand Down
61 changes: 46 additions & 15 deletions pkg/sql/jobs_profiler_bundle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
"fmt"
"io"
"net/http"
"runtime/pprof"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -59,35 +61,63 @@ func TestReadWriteProfilerExecutionDetails(t *testing.T) {

runner := sqlutils.MakeSQLRunner(sqlDB)

jobs.RegisterConstructor(jobspb.TypeImport, func(j *jobs.Job, _ *cluster.Settings) jobs.Resumer {
return fakeExecResumer{
OnResume: func(ctx context.Context) error {
p := sql.PhysicalPlan{}
infra := physicalplan.NewPhysicalInfrastructure(uuid.FastMakeV4(), base.SQLInstanceID(1))
p.PhysicalInfrastructure = infra
jobsprofiler.StorePlanDiagram(ctx, s.Stopper(), &p, s.InternalDB().(isql.DB), j.ID())
checkForPlanDiagram(ctx, t, s.InternalDB().(isql.DB), j.ID())
return nil
},
}
}, jobs.UsesTenantCostControl)

runner.Exec(t, `CREATE TABLE t (id INT)`)
runner.Exec(t, `INSERT INTO t SELECT generate_series(1, 100)`)

t.Run("read/write DistSQL diagram", func(t *testing.T) {
jobs.RegisterConstructor(jobspb.TypeImport, func(j *jobs.Job, _ *cluster.Settings) jobs.Resumer {
return fakeExecResumer{
OnResume: func(ctx context.Context) error {
p := sql.PhysicalPlan{}
infra := physicalplan.NewPhysicalInfrastructure(uuid.FastMakeV4(), base.SQLInstanceID(1))
p.PhysicalInfrastructure = infra
jobsprofiler.StorePlanDiagram(ctx, s.Stopper(), &p, s.InternalDB().(isql.DB), j.ID())
checkForPlanDiagram(ctx, t, s.InternalDB().(isql.DB), j.ID())
return nil
},
}
}, jobs.UsesTenantCostControl)

var importJobID int
runner.QueryRow(t, `IMPORT INTO t CSV DATA ('nodelocal://1/foo') WITH DETACHED`).Scan(&importJobID)
jobutils.WaitForJobToSucceed(t, runner, jobspb.JobID(importJobID))

runner.Exec(t, `SELECT crdb_internal.request_job_execution_details($1)`, importJobID)
checkExecutionDetails(t, s, jobspb.JobID(importJobID), "distsql")
distSQLDiagram := checkExecutionDetails(t, s, jobspb.JobID(importJobID), "distsql")
require.Regexp(t, "<meta http-equiv=\"Refresh\" content=\"0\\; url=https://cockroachdb\\.github\\.io/distsqlplan/decode.html.*>", string(distSQLDiagram))
})

t.Run("read/write goroutines", func(t *testing.T) {
blockCh := make(chan struct{})
continueCh := make(chan struct{})
defer close(blockCh)
defer close(continueCh)
jobs.RegisterConstructor(jobspb.TypeImport, func(j *jobs.Job, _ *cluster.Settings) jobs.Resumer {
return fakeExecResumer{
OnResume: func(ctx context.Context) error {
pprof.Do(ctx, pprof.Labels("foo", "bar"), func(ctx2 context.Context) {
blockCh <- struct{}{}
<-continueCh
})
return nil
},
}
}, jobs.UsesTenantCostControl)
var importJobID int
runner.QueryRow(t, `IMPORT INTO t CSV DATA ('nodelocal://1/foo') WITH DETACHED`).Scan(&importJobID)
<-blockCh
runner.Exec(t, `SELECT crdb_internal.request_job_execution_details($1)`, importJobID)
goroutines := checkExecutionDetails(t, s, jobspb.JobID(importJobID), "goroutines")
continueCh <- struct{}{}
jobutils.WaitForJobToSucceed(t, runner, jobspb.JobID(importJobID))
require.True(t, strings.Contains(string(goroutines), fmt.Sprintf("labels: {\"foo\":\"bar\", \"job\":\"IMPORT id=%d\", \"n\":\"1\"}", importJobID)))
require.True(t, strings.Contains(string(goroutines), "github.com/cockroachdb/cockroach/pkg/sql_test.fakeExecResumer.Resume"))
})
}

func checkExecutionDetails(
t *testing.T, s serverutils.TestServerInterface, jobID jobspb.JobID, filename string,
) {
) []byte {
t.Helper()

client, err := s.GetAdminHTTPClient()
Expand All @@ -112,4 +142,5 @@ func checkExecutionDetails(
data, err := io.ReadAll(r)
require.NoError(t, err)
require.NotEmpty(t, data)
return data
}

0 comments on commit c403d35

Please sign in to comment.