Skip to content

Commit

Permalink
jobsprofiler: store DistSQL diagram of jobs in job info
Browse files Browse the repository at this point in the history
This change teaches import, cdc, backup and restore
to store their DistSQL plans in the job_info table
under a timestamped info key. The generation and writing
of the plan diagram is done asynchronously so as to not
slow down the execution of the job. A new plan will be
stored everytime the job sets up its DistSQL flow.

Release note: None
Epic: CRDB-8964
  • Loading branch information
adityamaru committed Mar 27, 2023
1 parent 83430fc commit da25b21
Show file tree
Hide file tree
Showing 15 changed files with 279 additions and 4 deletions.
4 changes: 4 additions & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ ALL_TESTS = [
"//pkg/jobs/joberror:joberror_test",
"//pkg/jobs/jobsauth:jobsauth_test",
"//pkg/jobs/jobspb:jobspb_test",
"//pkg/jobs/jobsprofiler:jobsprofiler_test",
"//pkg/jobs/jobsprotectedts:jobsprotectedts_test",
"//pkg/jobs:jobs_test",
"//pkg/keys:keys_test",
Expand Down Expand Up @@ -1159,6 +1160,8 @@ GO_TARGETS = [
"//pkg/jobs/jobsauth:jobsauth_test",
"//pkg/jobs/jobspb:jobspb",
"//pkg/jobs/jobspb:jobspb_test",
"//pkg/jobs/jobsprofiler:jobsprofiler",
"//pkg/jobs/jobsprofiler:jobsprofiler_test",
"//pkg/jobs/jobsprotectedts:jobsprotectedts",
"//pkg/jobs/jobsprotectedts:jobsprotectedts_test",
"//pkg/jobs/jobstest:jobstest",
Expand Down Expand Up @@ -2632,6 +2635,7 @@ GET_X_DATA_TARGETS = [
"//pkg/jobs/joberror:get_x_data",
"//pkg/jobs/jobsauth:get_x_data",
"//pkg/jobs/jobspb:get_x_data",
"//pkg/jobs/jobsprofiler:get_x_data",
"//pkg/jobs/jobsprotectedts:get_x_data",
"//pkg/jobs/jobstest:get_x_data",
"//pkg/jobs/metricspoller:get_x_data",
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/backupccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ go_library(
"//pkg/jobs",
"//pkg/jobs/joberror",
"//pkg/jobs/jobspb",
"//pkg/jobs/jobsprofiler",
"//pkg/jobs/jobsprotectedts",
"//pkg/keys",
"//pkg/kv",
Expand Down
8 changes: 8 additions & 0 deletions pkg/ccl/backupccl/backup_processor_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuppb"
"github.com/cockroachdb/cockroach/pkg/cloud"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand Down Expand Up @@ -170,7 +171,11 @@ func distBackup(
// Setup a one-stage plan with one proc per input spec.
corePlacement := make([]physicalplan.ProcessorCorePlacement, len(backupSpecs))
i := 0
var jobID jobspb.JobID
for sqlInstanceID, spec := range backupSpecs {
if i == 0 {
jobID = jobspb.JobID(spec.JobID)
}
corePlacement[i].SQLInstanceID = sqlInstanceID
corePlacement[i].Core.BackupData = spec
i++
Expand Down Expand Up @@ -206,6 +211,9 @@ func distBackup(
defer recv.Release()

defer close(progCh)
execCfg := execCtx.ExecCfg()
jobsprofiler.StorePlanDiagram(ctx, execCfg.DistSQLSrv.Stopper, p, execCfg.InternalDB, jobID)

// Copy the evalCtx, as dsp.Run() might change it.
evalCtxCopy := *evalCtx
dsp.Run(ctx, planCtx, noTxn, p, recv, &evalCtxCopy, nil /* finishedSetupFn */)
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ func restore(
return distRestore(
ctx,
execCtx,
int64(job.ID()),
job.ID(),
dataToRestore,
endTime,
encryption,
Expand Down
10 changes: 7 additions & 3 deletions pkg/ccl/backupccl/restore_processor_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/cloud"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/settings"
Expand Down Expand Up @@ -60,7 +61,7 @@ var replanRestoreFrequency = settings.RegisterDurationSetting(
func distRestore(
ctx context.Context,
execCtx sql.JobExecContext,
jobID int64,
jobID jobspb.JobID,
dataToRestore restorationData,
restoreTime hlc.Timestamp,
encryption *jobspb.BackupEncryptionOptions,
Expand Down Expand Up @@ -111,7 +112,7 @@ func distRestore(
p := planCtx.NewPhysicalPlan()

restoreDataSpec := execinfrapb.RestoreDataSpec{
JobID: jobID,
JobID: int64(jobID),
RestoreTime: restoreTime,
Encryption: fileEncryption,
TableRekeys: dataToRestore.getRekeys(),
Expand Down Expand Up @@ -184,7 +185,7 @@ func distRestore(
NumNodes: int64(numNodes),
UseSimpleImportSpans: useSimpleImportSpans,
UseFrontierCheckpointing: spanFilter.useFrontierCheckpointing,
JobID: jobID,
JobID: int64(jobID),
}
if spanFilter.useFrontierCheckpointing {
spec.CheckpointedSpans = persistFrontier(spanFilter.checkpointFrontier, 0)
Expand Down Expand Up @@ -292,6 +293,9 @@ func distRestore(
)
defer recv.Release()

execCfg := execCtx.ExecCfg()
jobsprofiler.StorePlanDiagram(ctx, execCfg.DistSQLSrv.Stopper, p, execCfg.InternalDB, jobID)

// Copy the evalCtx, as dsp.Run() might change it.
evalCtxCopy := *evalCtx
dsp.Run(ctx, planCtx, noTxn, p, recv, &evalCtxCopy, nil /* finishedSetupFn */)
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ go_library(
"//pkg/jobs",
"//pkg/jobs/jobsauth",
"//pkg/jobs/jobspb",
"//pkg/jobs/jobsprofiler",
"//pkg/jobs/jobsprotectedts",
"//pkg/keys",
"//pkg/kv",
Expand Down
3 changes: 3 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_dist.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvfeed"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand Down Expand Up @@ -314,6 +315,8 @@ func startDistChangefeed(
finishedSetupFn = func(flowinfra.Flow) { resultsCh <- tree.Datums(nil) }
}

jobsprofiler.StorePlanDiagram(ctx, execCfg.DistSQLSrv.Stopper, p, execCfg.InternalDB, jobID)

// Copy the evalCtx, as dsp.Run() might change it.
evalCtxCopy := *evalCtx
// p is the physical plan, recv is the distsqlreceiver
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/streamingccl/streamingest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ go_library(
"//pkg/cloud/externalconn/connectionpb",
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/jobs/jobsprofiler",
"//pkg/jobs/jobsprotectedts",
"//pkg/keys",
"//pkg/kv",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand Down Expand Up @@ -110,7 +111,11 @@ func distStreamIngest(

// Setup a one-stage plan with one proc per input spec.
corePlacement := make([]physicalplan.ProcessorCorePlacement, len(streamIngestionSpecs))
var jobID jobspb.JobID
for i := range streamIngestionSpecs {
if i == 0 {
jobID = jobspb.JobID(streamIngestionSpecs[i].JobID)
}
corePlacement[i].SQLInstanceID = sqlInstanceIDs[i]
corePlacement[i].Core.StreamIngestionData = streamIngestionSpecs[i]
}
Expand Down Expand Up @@ -151,6 +156,8 @@ func distStreamIngest(
)
defer recv.Release()

jobsprofiler.StorePlanDiagram(ctx, execCfg.DistSQLSrv.Stopper, p, execCfg.InternalDB, jobID)

// Copy the evalCtx, as dsp.Run() might change it.
evalCtxCopy := *evalCtx
dsp.Run(ctx, planCtx, noTxn, p, recv, &evalCtxCopy, nil /* finishedSetupFn */)
Expand Down
49 changes: 49 additions & 0 deletions pkg/jobs/jobsprofiler/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "jobsprofiler",
srcs = ["profiler.go"],
importpath = "github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler",
visibility = ["//visibility:public"],
deps = [
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/sql",
"//pkg/sql/execinfrapb",
"//pkg/sql/isql",
"//pkg/util/log",
"//pkg/util/stop",
"//pkg/util/timeutil",
],
)

go_test(
name = "jobsprofiler_test",
srcs = [
"main_test.go",
"profiler_test.go",
],
args = ["-test.timeout=295s"],
deps = [
"//pkg/base",
"//pkg/ccl",
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/security/securityassets",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/sql",
"//pkg/sql/isql",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/testcluster",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/randutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_stretchr_testify//require",
],
)

get_x_data(name = "get_x_data")
35 changes: 35 additions & 0 deletions pkg/jobs/jobsprofiler/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright 2017 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package jobsprofiler_test

import (
"os"
"testing"

"github.com/cockroachdb/cockroach/pkg/ccl"
"github.com/cockroachdb/cockroach/pkg/security/securityassets"
"github.com/cockroachdb/cockroach/pkg/security/securitytest"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
)

func TestMain(m *testing.M) {
defer ccl.TestingEnableEnterprise()()
securityassets.SetLoader(securitytest.EmbeddedAssets)
randutil.SeedForTests()
serverutils.InitTestServerFactory(server.TestServerFactory)
serverutils.InitTestClusterFactory(testcluster.TestClusterFactory)
os.Exit(m.Run())
}

//go:generate ../../util/leaktest/add-leaktest.sh *_test.go
58 changes: 58 additions & 0 deletions pkg/jobs/jobsprofiler/profiler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package jobsprofiler

import (
"context"
"fmt"

"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

// StorePlanDiagram stores the DistSQL diagram generated from p in the job info
// table. The generation of the plan diagram and persistence to the info table
// are done asynchronously and this method does not block on their completion.
func StorePlanDiagram(
ctx context.Context, stopper *stop.Stopper, p *sql.PhysicalPlan, db isql.DB, jobID jobspb.JobID,
) {
if err := stopper.RunAsyncTask(ctx, "jobs-store-plan-diagram", func(ctx context.Context) {
var cancel func()
ctx, cancel = stopper.WithCancelOnQuiesce(ctx)
defer cancel()

err := db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
flowSpecs := p.GenerateFlowSpecs()
_, diagURL, err := execinfrapb.GeneratePlanDiagramURL(fmt.Sprintf("job:%d", jobID), flowSpecs, execinfrapb.DiagramFlags{})
if err != nil {
return err
}

const infoKey = "dsp-diag-url-%d"
infoStorage := jobs.InfoStorageForJob(txn, jobID)
return infoStorage.Write(ctx, []byte(fmt.Sprintf(infoKey, timeutil.Now().UnixNano())),
[]byte(diagURL.String()))
})
if err != nil {
log.Warningf(ctx, "failed to generate and write DistSQL diagram for job %d: %v",
jobID, err.Error())
}
}); err != nil {
log.Warningf(ctx, "failed to spawn task to generate DistSQL plan diagram for job %d: %v",
jobID, err.Error())
}
}
Loading

0 comments on commit da25b21

Please sign in to comment.