From 07af69f07a52075f5feddc8d8f49b17299fe1135 Mon Sep 17 00:00:00 2001 From: adityamaru Date: Mon, 27 Mar 2023 16:42:56 -0400 Subject: [PATCH] jobsprofiler: store DistSQL diagram of jobs in job info This change teaches import, cdc, backup and restore to store their DistSQL plans in the job_info table under a timestamped info key. The generation and writing of the plan diagram is done asynchronously so as to not slow down the execution of the job. A new plan will be stored everytime the job sets up its DistSQL flow. Release note: None Epic: CRDB-8964 --- pkg/BUILD.bazel | 4 + pkg/ccl/backupccl/BUILD.bazel | 1 + .../backupccl/backup_processor_planning.go | 11 +++ pkg/ccl/backupccl/restore_job.go | 2 +- .../backupccl/restore_processor_planning.go | 13 ++- pkg/ccl/changefeedccl/BUILD.bazel | 1 + pkg/ccl/changefeedccl/changefeed_dist.go | 7 ++ pkg/ccl/streamingccl/streamingest/BUILD.bazel | 1 + .../stream_ingestion_processor_planning.go | 11 +++ pkg/jobs/jobsprofiler/BUILD.bazel | 49 +++++++++ pkg/jobs/jobsprofiler/main_test.go | 35 +++++++ pkg/jobs/jobsprofiler/profiler.go | 54 ++++++++++ pkg/jobs/jobsprofiler/profiler_test.go | 99 +++++++++++++++++++ pkg/sql/importer/BUILD.bazel | 1 + pkg/sql/importer/import_processor_planning.go | 8 ++ 15 files changed, 293 insertions(+), 4 deletions(-) create mode 100644 pkg/jobs/jobsprofiler/BUILD.bazel create mode 100644 pkg/jobs/jobsprofiler/main_test.go create mode 100644 pkg/jobs/jobsprofiler/profiler.go create mode 100644 pkg/jobs/jobsprofiler/profiler_test.go diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index 10a4f6eb4b20..6995920281ec 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -176,6 +176,7 @@ ALL_TESTS = [ "//pkg/jobs/joberror:joberror_test", "//pkg/jobs/jobsauth:jobsauth_test", "//pkg/jobs/jobspb:jobspb_test", + "//pkg/jobs/jobsprofiler:jobsprofiler_test", "//pkg/jobs/jobsprotectedts:jobsprotectedts_test", "//pkg/jobs:jobs_test", "//pkg/keys:keys_test", @@ -1159,6 +1160,8 @@ GO_TARGETS = [ "//pkg/jobs/jobsauth:jobsauth_test", "//pkg/jobs/jobspb:jobspb", "//pkg/jobs/jobspb:jobspb_test", + "//pkg/jobs/jobsprofiler:jobsprofiler", + "//pkg/jobs/jobsprofiler:jobsprofiler_test", "//pkg/jobs/jobsprotectedts:jobsprotectedts", "//pkg/jobs/jobsprotectedts:jobsprotectedts_test", "//pkg/jobs/jobstest:jobstest", @@ -2632,6 +2635,7 @@ GET_X_DATA_TARGETS = [ "//pkg/jobs/joberror:get_x_data", "//pkg/jobs/jobsauth:get_x_data", "//pkg/jobs/jobspb:get_x_data", + "//pkg/jobs/jobsprofiler:get_x_data", "//pkg/jobs/jobsprotectedts:get_x_data", "//pkg/jobs/jobstest:get_x_data", "//pkg/jobs/metricspoller:get_x_data", diff --git a/pkg/ccl/backupccl/BUILD.bazel b/pkg/ccl/backupccl/BUILD.bazel index a0ea91dc8c97..a5150417d831 100644 --- a/pkg/ccl/backupccl/BUILD.bazel +++ b/pkg/ccl/backupccl/BUILD.bazel @@ -59,6 +59,7 @@ go_library( "//pkg/jobs", "//pkg/jobs/joberror", "//pkg/jobs/jobspb", + "//pkg/jobs/jobsprofiler", "//pkg/jobs/jobsprotectedts", "//pkg/keys", "//pkg/kv", diff --git a/pkg/ccl/backupccl/backup_processor_planning.go b/pkg/ccl/backupccl/backup_processor_planning.go index 5bc54356a569..6b44c8a551c5 100644 --- a/pkg/ccl/backupccl/backup_processor_planning.go +++ b/pkg/ccl/backupccl/backup_processor_planning.go @@ -15,6 +15,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/backupccl/backuppb" "github.com/cockroachdb/cockroach/pkg/cloud" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -170,7 +171,11 @@ func distBackup( // Setup a one-stage plan with one proc per input spec. corePlacement := make([]physicalplan.ProcessorCorePlacement, len(backupSpecs)) i := 0 + var jobID jobspb.JobID for sqlInstanceID, spec := range backupSpecs { + if i == 0 { + jobID = jobspb.JobID(spec.JobID) + } corePlacement[i].SQLInstanceID = sqlInstanceID corePlacement[i].Core.BackupData = spec i++ @@ -206,6 +211,12 @@ func distBackup( defer recv.Release() defer close(progCh) + execCfg := execCtx.ExecCfg() + if err := jobsprofiler.StorePlanDiagram(ctx, execCfg.DistSQLSrv.Stopper, + p, execCfg.InternalDB, jobID); err != nil { + log.Warningf(ctx, "failed to store DistSQL plan diagram for job %d: %+v", jobID, err.Error()) + } + // Copy the evalCtx, as dsp.Run() might change it. evalCtxCopy := *evalCtx dsp.Run(ctx, planCtx, noTxn, p, recv, &evalCtxCopy, nil /* finishedSetupFn */) diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go index c9ae9d1af75c..3db1f805c519 100644 --- a/pkg/ccl/backupccl/restore_job.go +++ b/pkg/ccl/backupccl/restore_job.go @@ -393,7 +393,7 @@ func restore( return distRestore( ctx, execCtx, - int64(job.ID()), + job.ID(), dataToRestore, endTime, encryption, diff --git a/pkg/ccl/backupccl/restore_processor_planning.go b/pkg/ccl/backupccl/restore_processor_planning.go index 22fd790b8d23..7cbf458d6ad7 100644 --- a/pkg/ccl/backupccl/restore_processor_planning.go +++ b/pkg/ccl/backupccl/restore_processor_planning.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/cloud" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/settings" @@ -60,7 +61,7 @@ var replanRestoreFrequency = settings.RegisterDurationSetting( func distRestore( ctx context.Context, execCtx sql.JobExecContext, - jobID int64, + jobID jobspb.JobID, dataToRestore restorationData, restoreTime hlc.Timestamp, encryption *jobspb.BackupEncryptionOptions, @@ -111,7 +112,7 @@ func distRestore( p := planCtx.NewPhysicalPlan() restoreDataSpec := execinfrapb.RestoreDataSpec{ - JobID: jobID, + JobID: int64(jobID), RestoreTime: restoreTime, Encryption: fileEncryption, TableRekeys: dataToRestore.getRekeys(), @@ -184,7 +185,7 @@ func distRestore( NumNodes: int64(numNodes), UseSimpleImportSpans: useSimpleImportSpans, UseFrontierCheckpointing: spanFilter.useFrontierCheckpointing, - JobID: jobID, + JobID: int64(jobID), } if spanFilter.useFrontierCheckpointing { spec.CheckpointedSpans = persistFrontier(spanFilter.checkpointFrontier, 0) @@ -292,6 +293,12 @@ func distRestore( ) defer recv.Release() + execCfg := execCtx.ExecCfg() + if err := jobsprofiler.StorePlanDiagram(ctx, execCfg.DistSQLSrv.Stopper, + p, execCfg.InternalDB, jobID); err != nil { + log.Warningf(ctx, "failed to store DistSQL plan diagram for job %d: %+v", jobID, err.Error()) + } + // Copy the evalCtx, as dsp.Run() might change it. evalCtxCopy := *evalCtx dsp.Run(ctx, planCtx, noTxn, p, recv, &evalCtxCopy, nil /* finishedSetupFn */) diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel index efc731faddbc..6f31ab661da8 100644 --- a/pkg/ccl/changefeedccl/BUILD.bazel +++ b/pkg/ccl/changefeedccl/BUILD.bazel @@ -63,6 +63,7 @@ go_library( "//pkg/jobs", "//pkg/jobs/jobsauth", "//pkg/jobs/jobspb", + "//pkg/jobs/jobsprofiler", "//pkg/jobs/jobsprotectedts", "//pkg/keys", "//pkg/kv", diff --git a/pkg/ccl/changefeedccl/changefeed_dist.go b/pkg/ccl/changefeedccl/changefeed_dist.go index 093aa1fc98bc..f0ad5532cb27 100644 --- a/pkg/ccl/changefeedccl/changefeed_dist.go +++ b/pkg/ccl/changefeedccl/changefeed_dist.go @@ -17,6 +17,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvfeed" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -38,6 +39,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/errors" ) @@ -314,6 +316,11 @@ func startDistChangefeed( finishedSetupFn = func(flowinfra.Flow) { resultsCh <- tree.Datums(nil) } } + if err := jobsprofiler.StorePlanDiagram(ctx, execCfg.DistSQLSrv.Stopper, + p, execCfg.InternalDB, jobID); err != nil { + log.Warningf(ctx, "failed to store DistSQL plan diagram for job %d: %+v", jobID, err.Error()) + } + // Copy the evalCtx, as dsp.Run() might change it. evalCtxCopy := *evalCtx // p is the physical plan, recv is the distsqlreceiver diff --git a/pkg/ccl/streamingccl/streamingest/BUILD.bazel b/pkg/ccl/streamingccl/streamingest/BUILD.bazel index 33aa65986bcb..6675b57cfd3f 100644 --- a/pkg/ccl/streamingccl/streamingest/BUILD.bazel +++ b/pkg/ccl/streamingccl/streamingest/BUILD.bazel @@ -28,6 +28,7 @@ go_library( "//pkg/cloud/externalconn/connectionpb", "//pkg/jobs", "//pkg/jobs/jobspb", + "//pkg/jobs/jobsprofiler", "//pkg/jobs/jobsprotectedts", "//pkg/keys", "//pkg/kv", diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_planning.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_planning.go index e0b3dabae956..13e5ca7e14e9 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_planning.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_planning.go @@ -15,6 +15,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl" "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/repstream/streampb" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -23,6 +24,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/physicalplan" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/logtags" ) @@ -110,7 +112,11 @@ func distStreamIngest( // Setup a one-stage plan with one proc per input spec. corePlacement := make([]physicalplan.ProcessorCorePlacement, len(streamIngestionSpecs)) + var jobID jobspb.JobID for i := range streamIngestionSpecs { + if i == 0 { + jobID = jobspb.JobID(streamIngestionSpecs[i].JobID) + } corePlacement[i].SQLInstanceID = sqlInstanceIDs[i] corePlacement[i].Core.StreamIngestionData = streamIngestionSpecs[i] } @@ -151,6 +157,11 @@ func distStreamIngest( ) defer recv.Release() + if err := jobsprofiler.StorePlanDiagram(ctx, execCfg.DistSQLSrv.Stopper, + p, execCfg.InternalDB, jobID); err != nil { + log.Warningf(ctx, "failed to store DistSQL plan diagram for job %d: %+v", jobID, err.Error()) + } + // Copy the evalCtx, as dsp.Run() might change it. evalCtxCopy := *evalCtx dsp.Run(ctx, planCtx, noTxn, p, recv, &evalCtxCopy, nil /* finishedSetupFn */) diff --git a/pkg/jobs/jobsprofiler/BUILD.bazel b/pkg/jobs/jobsprofiler/BUILD.bazel new file mode 100644 index 000000000000..0c0625cf24bd --- /dev/null +++ b/pkg/jobs/jobsprofiler/BUILD.bazel @@ -0,0 +1,49 @@ +load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data") +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "jobsprofiler", + srcs = ["profiler.go"], + importpath = "github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler", + visibility = ["//visibility:public"], + deps = [ + "//pkg/jobs", + "//pkg/jobs/jobspb", + "//pkg/sql", + "//pkg/sql/execinfrapb", + "//pkg/sql/isql", + "//pkg/util/log", + "//pkg/util/stop", + "//pkg/util/timeutil", + ], +) + +go_test( + name = "jobsprofiler_test", + srcs = [ + "main_test.go", + "profiler_test.go", + ], + args = ["-test.timeout=295s"], + deps = [ + "//pkg/base", + "//pkg/ccl", + "//pkg/jobs", + "//pkg/jobs/jobspb", + "//pkg/security/securityassets", + "//pkg/security/securitytest", + "//pkg/server", + "//pkg/sql", + "//pkg/sql/isql", + "//pkg/testutils", + "//pkg/testutils/serverutils", + "//pkg/testutils/testcluster", + "//pkg/util/leaktest", + "//pkg/util/log", + "//pkg/util/randutil", + "@com_github_cockroachdb_errors//:errors", + "@com_github_stretchr_testify//require", + ], +) + +get_x_data(name = "get_x_data") diff --git a/pkg/jobs/jobsprofiler/main_test.go b/pkg/jobs/jobsprofiler/main_test.go new file mode 100644 index 000000000000..1bc1dd1f6526 --- /dev/null +++ b/pkg/jobs/jobsprofiler/main_test.go @@ -0,0 +1,35 @@ +// Copyright 2017 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package jobsprofiler_test + +import ( + "os" + "testing" + + "github.com/cockroachdb/cockroach/pkg/ccl" + "github.com/cockroachdb/cockroach/pkg/security/securityassets" + "github.com/cockroachdb/cockroach/pkg/security/securitytest" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/randutil" +) + +func TestMain(m *testing.M) { + defer ccl.TestingEnableEnterprise()() + securityassets.SetLoader(securitytest.EmbeddedAssets) + randutil.SeedForTests() + serverutils.InitTestServerFactory(server.TestServerFactory) + serverutils.InitTestClusterFactory(testcluster.TestClusterFactory) + os.Exit(m.Run()) +} + +//go:generate ../../util/leaktest/add-leaktest.sh *_test.go diff --git a/pkg/jobs/jobsprofiler/profiler.go b/pkg/jobs/jobsprofiler/profiler.go new file mode 100644 index 000000000000..76913f080f9f --- /dev/null +++ b/pkg/jobs/jobsprofiler/profiler.go @@ -0,0 +1,54 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package jobsprofiler + +import ( + "context" + "fmt" + + "github.com/cockroachdb/cockroach/pkg/jobs" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" + "github.com/cockroachdb/cockroach/pkg/sql/isql" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" +) + +// StorePlanDiagram stores the DistSQL diagram generated from p in the job info +// table. The generation of the plan diagram and persistence to the info table +// are done asynchronously and this method does not block on their completion. +func StorePlanDiagram( + ctx context.Context, stopper *stop.Stopper, p *sql.PhysicalPlan, db isql.DB, jobID jobspb.JobID, +) error { + return stopper.RunAsyncTask(ctx, "jobs-store-plan-diagram", func(ctx context.Context) { + var cancel func() + ctx, cancel = stopper.WithCancelOnQuiesce(ctx) + defer cancel() + + err := db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { + flowSpecs := p.GenerateFlowSpecs() + _, diagURL, err := execinfrapb.GeneratePlanDiagramURL(fmt.Sprintf("job:%d", jobID), flowSpecs, execinfrapb.DiagramFlags{}) + if err != nil { + return err + } + + const infoKey = "dsp-diag-url-%d" + infoStorage := jobs.InfoStorageForJob(txn, jobID) + return infoStorage.Write(ctx, []byte(fmt.Sprintf(infoKey, timeutil.Now().UnixNano())), + []byte(diagURL.String())) + }) + if err != nil { + log.Infof(ctx, "failed to generate and write DistSQL diagram for job %d: %v", jobID, err.Error()) + } + }) +} diff --git a/pkg/jobs/jobsprofiler/profiler_test.go b/pkg/jobs/jobsprofiler/profiler_test.go new file mode 100644 index 000000000000..6fbccee7250f --- /dev/null +++ b/pkg/jobs/jobsprofiler/profiler_test.go @@ -0,0 +1,99 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package jobsprofiler_test + +import ( + "context" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/jobs" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/isql" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/require" +) + +func TestProfilerStorePlanDiagram(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{ + Knobs: base.TestingKnobs{ + JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), + }, + }) + ctx := context.Background() + defer s.Stopper().Stop(ctx) + + _, err := sqlDB.Exec(`CREATE DATABASE test`) + require.NoError(t, err) + _, err = sqlDB.Exec(`CREATE TABLE foo (id INT PRIMARY KEY)`) + require.NoError(t, err) + _, err = sqlDB.Exec(`INSERT INTO foo VALUES (1), (2)`) + require.NoError(t, err) + _, err = sqlDB.Exec(`SET CLUSTER SETTING kv.rangefeed.enabled = true`) + require.NoError(t, err) + + for _, tc := range []struct { + name string + sql string + typ jobspb.Type + }{ + { + name: "backup", + sql: "BACKUP TABLE foo INTO 'userfile:///foo'", + typ: jobspb.TypeBackup, + }, + { + name: "restore", + sql: "RESTORE TABLE foo FROM LATEST IN 'userfile:///foo' WITH into_db='test'", + typ: jobspb.TypeRestore, + }, + { + name: "changefeed", + sql: "CREATE CHANGEFEED FOR foo INTO 'null://sink'", + typ: jobspb.TypeChangefeed, + }, + } { + t.Run(tc.name, func(t *testing.T) { + _, err := sqlDB.Exec(tc.sql) + require.NoError(t, err) + + var jobID jobspb.JobID + err = sqlDB.QueryRow( + `SELECT id FROM crdb_internal.system_jobs WHERE job_type = $1`, tc.typ.String()).Scan(&jobID) + require.NoError(t, err) + + execCfg := s.TenantOrServer().ExecutorConfig().(sql.ExecutorConfig) + testutils.SucceedsSoon(t, func() error { + var count int + err = execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { + infoStorage := jobs.InfoStorageForJob(txn, jobID) + return infoStorage.Iterate(ctx, []byte("dsp-diag-url"), func(infoKey, value []byte) error { + count++ + return nil + }) + }) + require.NoError(t, err) + if count != 1 { + return errors.Newf("expected a row for the DistSQL diagram to be written but found %d", count) + } + return nil + }) + }) + } +} diff --git a/pkg/sql/importer/BUILD.bazel b/pkg/sql/importer/BUILD.bazel index 81d8724c6ac2..6d80bb46ed5c 100644 --- a/pkg/sql/importer/BUILD.bazel +++ b/pkg/sql/importer/BUILD.bazel @@ -42,6 +42,7 @@ go_library( "//pkg/jobs", "//pkg/jobs/joberror", "//pkg/jobs/jobspb", + "//pkg/jobs/jobsprofiler", "//pkg/keys", "//pkg/kv", "//pkg/kv/kvpb", diff --git a/pkg/sql/importer/import_processor_planning.go b/pkg/sql/importer/import_processor_planning.go index 645e842bcbf0..08cadddaf23e 100644 --- a/pkg/sql/importer/import_processor_planning.go +++ b/pkg/sql/importer/import_processor_planning.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security/username" @@ -32,6 +33,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" @@ -268,6 +270,12 @@ func distImport( } } + execCfg := execCtx.ExecCfg() + if err := jobsprofiler.StorePlanDiagram(ctx, execCfg.DistSQLSrv.Stopper, + p, execCfg.InternalDB, job.ID()); err != nil { + log.Warningf(ctx, "failed to store DistSQL plan diagram for job %d: %+v", job.ID(), err.Error()) + } + // Copy the evalCtx, as dsp.Run() might change it. evalCtxCopy := *evalCtx dsp.Run(ctx, planCtx, nil, p, recv, &evalCtxCopy, testingKnobs.onSetupFinish)