From 881576ef5c644901a419c66d9f0f9089790fe85c Mon Sep 17 00:00:00 2001 From: David Taylor Date: Mon, 6 Nov 2023 14:06:14 +0000 Subject: [PATCH 1/5] jobs: plumb cluster version to info table accessor This is a pure refactor to plumb a clusterversion.Handle to the info table accessor via all the call sites and wrapping structs/call trees; no behavior change, or usage of the plumbed cv, is added in this commit. Release note: none. Epic: none. --- pkg/ccl/backupccl/backup_job.go | 4 +-- .../backupccl/backup_processor_planning.go | 2 +- pkg/ccl/backupccl/restore_job.go | 2 +- .../backupccl/restore_processor_planning.go | 2 +- pkg/ccl/changefeedccl/changefeed_dist.go | 2 +- .../streamingccl/replicationutils/BUILD.bazel | 1 + .../streamingccl/replicationutils/utils.go | 9 ++++--- pkg/ccl/streamingccl/streamingest/BUILD.bazel | 1 + .../replication_execution_details.go | 16 +++++++---- .../replication_execution_details_test.go | 6 ++--- .../replication_stream_e2e_test.go | 6 +++-- .../streamingest/stream_ingestion_dist.go | 4 +-- .../stream_ingestion_frontier_processor.go | 2 +- .../streamingest/stream_ingestion_job.go | 13 +++++---- .../stream_ingestion_processor.go | 5 +++- .../streamproducer/producer_job.go | 2 +- pkg/jobs/adopt.go | 2 +- pkg/jobs/execution_detail_utils.go | 27 ++++++++++++++----- pkg/jobs/execution_detail_utils_test.go | 8 +++--- pkg/jobs/job_info_storage.go | 8 +++--- pkg/jobs/job_info_utils.go | 14 +++++++--- pkg/jobs/job_info_utils_test.go | 14 +++++----- pkg/jobs/jobs.go | 10 ++++--- pkg/jobs/jobs_test.go | 6 ++--- pkg/jobs/jobsprofiler/BUILD.bazel | 1 + pkg/jobs/jobsprofiler/profiler.go | 13 ++++++--- pkg/jobs/jobsprofiler/profiler_test.go | 14 +++++----- pkg/server/autoconfig/BUILD.bazel | 1 + .../autoconfig/auto_config_env_runner.go | 8 +++--- pkg/server/autoconfig/auto_config_task.go | 4 +-- pkg/server/autoconfig/task_markers.go | 17 ++++++------ pkg/server/job_profiler.go | 15 ++++++----- pkg/server/status.go | 4 +-- pkg/sql/importer/import_processor_planning.go | 2 +- pkg/sql/jobs_profiler_execution_details.go | 12 ++++----- .../jobs_profiler_execution_details_test.go | 24 +++++++++++------ .../upgrademanager/manager_external_test.go | 2 +- pkg/util/bulk/BUILD.bazel | 1 + pkg/util/bulk/aggregator_stats.go | 6 +++-- 39 files changed, 176 insertions(+), 114 deletions(-) diff --git a/pkg/ccl/backupccl/backup_job.go b/pkg/ccl/backupccl/backup_job.go index 2830c0cd898b..704b4c008f97 100644 --- a/pkg/ccl/backupccl/backup_job.go +++ b/pkg/ccl/backupccl/backup_job.go @@ -262,7 +262,7 @@ func backup( return nil } jobsprofiler.StorePerNodeProcessorProgressFraction( - ctx, execCtx.ExecCfg().InternalDB, job.ID(), prog) + ctx, execCtx.ExecCfg().InternalDB, job.ID(), prog, execCtx.ExecCfg().Settings.Version) case <-ctx.Done(): return ctx.Err() } @@ -2095,7 +2095,7 @@ func (b *backupResumer) CollectProfile(ctx context.Context, execCtx interface{}) aggStatsCopy = b.mu.perNodeAggregatorStats.DeepCopy() }() return bulkutil.FlushTracingAggregatorStats(ctx, b.job.ID(), - p.ExecCfg().InternalDB, aggStatsCopy) + p.ExecCfg().InternalDB, aggStatsCopy, p.ExecCfg().Settings.Version) } func (b *backupResumer) deleteCheckpoint( diff --git a/pkg/ccl/backupccl/backup_processor_planning.go b/pkg/ccl/backupccl/backup_processor_planning.go index 71ea019d579d..7ecbfedf4488 100644 --- a/pkg/ccl/backupccl/backup_processor_planning.go +++ b/pkg/ccl/backupccl/backup_processor_planning.go @@ -219,7 +219,7 @@ func distBackup( defer close(progCh) defer close(tracingAggCh) execCfg := execCtx.ExecCfg() - 
jobsprofiler.StorePlanDiagram(ctx, execCfg.DistSQLSrv.Stopper, p, execCfg.InternalDB, jobID) + jobsprofiler.StorePlanDiagram(ctx, execCfg.DistSQLSrv.Stopper, p, execCfg.InternalDB, jobID, execCfg.Settings.Version) // Copy the evalCtx, as dsp.Run() might change it. evalCtxCopy := *evalCtx diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go index b71583ee5c3d..a53ca04d511c 100644 --- a/pkg/ccl/backupccl/restore_job.go +++ b/pkg/ccl/backupccl/restore_job.go @@ -2544,7 +2544,7 @@ func (r *restoreResumer) CollectProfile(ctx context.Context, execCtx interface{} aggStatsCopy = r.mu.perNodeAggregatorStats.DeepCopy() }() return bulkutil.FlushTracingAggregatorStats(ctx, r.job.ID(), - p.ExecCfg().InternalDB, aggStatsCopy) + p.ExecCfg().InternalDB, aggStatsCopy, p.ExecCfg().Settings.Version) } // dropDescriptors implements the OnFailOrCancel logic. diff --git a/pkg/ccl/backupccl/restore_processor_planning.go b/pkg/ccl/backupccl/restore_processor_planning.go index 58a24e1c39db..0f8d8dac1483 100644 --- a/pkg/ccl/backupccl/restore_processor_planning.go +++ b/pkg/ccl/backupccl/restore_processor_planning.go @@ -317,7 +317,7 @@ func distRestore( defer recv.Release() execCfg := execCtx.ExecCfg() - jobsprofiler.StorePlanDiagram(ctx, execCfg.DistSQLSrv.Stopper, p, execCfg.InternalDB, md.jobID) + jobsprofiler.StorePlanDiagram(ctx, execCfg.DistSQLSrv.Stopper, p, execCfg.InternalDB, md.jobID, execCfg.Settings.Version) // Copy the evalCtx, as dsp.Run() might change it. evalCtxCopy := *evalCtx diff --git a/pkg/ccl/changefeedccl/changefeed_dist.go b/pkg/ccl/changefeedccl/changefeed_dist.go index 3f918fabc3d2..471383efe1ca 100644 --- a/pkg/ccl/changefeedccl/changefeed_dist.go +++ b/pkg/ccl/changefeedccl/changefeed_dist.go @@ -303,7 +303,7 @@ func startDistChangefeed( finishedSetupFn = func(flowinfra.Flow) { resultsCh <- tree.Datums(nil) } } - jobsprofiler.StorePlanDiagram(ctx, execCfg.DistSQLSrv.Stopper, p, execCfg.InternalDB, jobID) + jobsprofiler.StorePlanDiagram(ctx, execCfg.DistSQLSrv.Stopper, p, execCfg.InternalDB, jobID, execCfg.Settings.Version) // Copy the evalCtx, as dsp.Run() might change it. evalCtxCopy := *evalCtx diff --git a/pkg/ccl/streamingccl/replicationutils/BUILD.bazel b/pkg/ccl/streamingccl/replicationutils/BUILD.bazel index b76b53dca6c8..1c448dcb0786 100644 --- a/pkg/ccl/streamingccl/replicationutils/BUILD.bazel +++ b/pkg/ccl/streamingccl/replicationutils/BUILD.bazel @@ -6,6 +6,7 @@ go_library( importpath = "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationutils", visibility = ["//visibility:public"], deps = [ + "//pkg/clusterversion", "//pkg/jobs", "//pkg/jobs/jobspb", "//pkg/kv/kvpb", diff --git a/pkg/ccl/streamingccl/replicationutils/utils.go b/pkg/ccl/streamingccl/replicationutils/utils.go index 49d0a3393db6..f743b9a27115 100644 --- a/pkg/ccl/streamingccl/replicationutils/utils.go +++ b/pkg/ccl/streamingccl/replicationutils/utils.go @@ -14,6 +14,7 @@ import ( "fmt" "testing" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" @@ -174,9 +175,9 @@ func ReplicatedTimeFromProgress(p *jobspb.Progress) hlc.Timestamp { // LoadIngestionProgress loads the latest persisted stream ingestion progress. // The method returns nil if the progress does not exist yet. 
func LoadIngestionProgress( - ctx context.Context, db isql.DB, jobID jobspb.JobID, + ctx context.Context, db isql.DB, jobID jobspb.JobID, cv clusterversion.Handle, ) (*jobspb.StreamIngestionProgress, error) { - progress, err := jobs.LoadJobProgress(ctx, db, jobID) + progress, err := jobs.LoadJobProgress(ctx, db, jobID, cv) if err != nil || progress == nil { return nil, err } @@ -192,9 +193,9 @@ func LoadIngestionProgress( // LoadReplicationProgress loads the latest persisted stream replication progress. // The method returns nil if the progress does not exist yet. func LoadReplicationProgress( - ctx context.Context, db isql.DB, jobID jobspb.JobID, + ctx context.Context, db isql.DB, jobID jobspb.JobID, cv clusterversion.Handle, ) (*jobspb.StreamReplicationProgress, error) { - progress, err := jobs.LoadJobProgress(ctx, db, jobID) + progress, err := jobs.LoadJobProgress(ctx, db, jobID, cv) if err != nil || progress == nil { return nil, err } diff --git a/pkg/ccl/streamingccl/streamingest/BUILD.bazel b/pkg/ccl/streamingccl/streamingest/BUILD.bazel index 6dcc2ec085f6..8573320823ba 100644 --- a/pkg/ccl/streamingccl/streamingest/BUILD.bazel +++ b/pkg/ccl/streamingccl/streamingest/BUILD.bazel @@ -28,6 +28,7 @@ go_library( "//pkg/cloud", "//pkg/cloud/externalconn", "//pkg/cloud/externalconn/connectionpb", + "//pkg/clusterversion", "//pkg/jobs", "//pkg/jobs/jobspb", "//pkg/jobs/jobsprofiler", diff --git a/pkg/ccl/streamingccl/streamingest/replication_execution_details.go b/pkg/ccl/streamingccl/streamingest/replication_execution_details.go index 3a6dda6e13c6..a8d6b6806b4b 100644 --- a/pkg/ccl/streamingccl/streamingest/replication_execution_details.go +++ b/pkg/ccl/streamingccl/streamingest/replication_execution_details.go @@ -16,6 +16,7 @@ import ( "text/tabwriter" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -104,7 +105,11 @@ func constructSpanFrontierExecutionDetails( // - The snapshot of the frontier tracking how far each span has been replicated // up to. func generateSpanFrontierExecutionDetailFile( - ctx context.Context, execCfg *sql.ExecutorConfig, ingestionJobID jobspb.JobID, skipBehindBy bool, + ctx context.Context, + execCfg *sql.ExecutorConfig, + ingestionJobID jobspb.JobID, + skipBehindBy bool, + cv clusterversion.Handle, ) error { return execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { var sb bytes.Buffer @@ -112,7 +117,7 @@ func generateSpanFrontierExecutionDetailFile( // Read the StreamIngestionPartitionSpecs to get a mapping from spans to // their source and destination SQL instance IDs. - specs, err := jobs.ReadChunkedFileToJobInfo(ctx, replicationPartitionInfoFilename, txn, ingestionJobID) + specs, err := jobs.ReadChunkedFileToJobInfo(ctx, replicationPartitionInfoFilename, txn, ingestionJobID, cv) if err != nil { return err } @@ -124,7 +129,7 @@ func generateSpanFrontierExecutionDetailFile( // Now, read the latest snapshot of the frontier that tells us what // timestamp each span has been replicated up to. 
- frontierEntries, err := jobs.ReadChunkedFileToJobInfo(ctx, frontierEntriesFilename, txn, ingestionJobID) + frontierEntries, err := jobs.ReadChunkedFileToJobInfo(ctx, frontierEntriesFilename, txn, ingestionJobID, cv) if err != nil { return err } @@ -157,7 +162,7 @@ func generateSpanFrontierExecutionDetailFile( if err := w.Flush(); err != nil { return err } - return jobs.WriteExecutionDetailFile(ctx, filename, sb.Bytes(), txn, ingestionJobID) + return jobs.WriteExecutionDetailFile(ctx, filename, sb.Bytes(), txn, ingestionJobID, cv) }) } @@ -170,6 +175,7 @@ func persistStreamIngestionPartitionSpecs( execCfg *sql.ExecutorConfig, ingestionJobID jobspb.JobID, streamIngestionSpecs map[base.SQLInstanceID]*execinfrapb.StreamIngestionDataSpec, + cv clusterversion.Handle, ) error { err := execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { specs := make([]*execinfrapb.StreamIngestionPartitionSpec, 0) @@ -183,7 +189,7 @@ func persistStreamIngestionPartitionSpecs( if err != nil { return err } - return jobs.WriteChunkedFileToJobInfo(ctx, replicationPartitionInfoFilename, specBytes, txn, ingestionJobID) + return jobs.WriteChunkedFileToJobInfo(ctx, replicationPartitionInfoFilename, specBytes, txn, ingestionJobID, cv) }) if knobs := execCfg.StreamingTestingKnobs; knobs != nil && knobs.AfterPersistingPartitionSpecs != nil { knobs.AfterPersistingPartitionSpecs() diff --git a/pkg/ccl/streamingccl/streamingest/replication_execution_details_test.go b/pkg/ccl/streamingccl/streamingest/replication_execution_details_test.go index e1dc2a7f3dcf..6cb628b77892 100644 --- a/pkg/ccl/streamingccl/streamingest/replication_execution_details_test.go +++ b/pkg/ccl/streamingccl/streamingest/replication_execution_details_test.go @@ -344,7 +344,7 @@ func TestEndToEndFrontierExecutionDetailFile(t *testing.T) { ingestionJobID := jobspb.JobID(123) require.NoError(t, persistStreamIngestionPartitionSpecs(ctx, &execCfg, - ingestionJobID, streamIngestionsSpecs)) + ingestionJobID, streamIngestionsSpecs, execCfg.Settings.Version)) // Now, let's persist some frontier entries. 
frontierEntries := execinfrapb.FrontierEntries{ResolvedSpans: []jobspb.ResolvedSpan{ @@ -369,9 +369,9 @@ func TestEndToEndFrontierExecutionDetailFile(t *testing.T) { frontierBytes, err := protoutil.Marshal(&frontierEntries) require.NoError(t, err) require.NoError(t, execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { - return jobs.WriteChunkedFileToJobInfo(ctx, frontierEntriesFilename, frontierBytes, txn, ingestionJobID) + return jobs.WriteChunkedFileToJobInfo(ctx, frontierEntriesFilename, frontierBytes, txn, ingestionJobID, execCfg.Settings.Version) })) - require.NoError(t, generateSpanFrontierExecutionDetailFile(ctx, &execCfg, ingestionJobID, true /* skipBehindBy */)) + require.NoError(t, generateSpanFrontierExecutionDetailFile(ctx, &execCfg, ingestionJobID, true /* skipBehindBy */, execCfg.Settings.Version)) files := listExecutionDetails(t, srv, ingestionJobID) require.Len(t, files, 1) data, err := checkExecutionDetails(t, srv, ingestionJobID, files[0]) diff --git a/pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go b/pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go index 92207d0eb95c..062f64968428 100644 --- a/pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go +++ b/pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go @@ -1114,12 +1114,14 @@ func TestLoadProducerAndIngestionProgress(t *testing.T) { c.WaitUntilReplicatedTime(srcTime, jobspb.JobID(replicationJobID)) srcDB := c.SrcSysServer.ExecutorConfig().(sql.ExecutorConfig).InternalDB - producerProgress, err := replicationutils.LoadReplicationProgress(ctx, srcDB, jobspb.JobID(producerJobID)) + producerProgress, err := replicationutils.LoadReplicationProgress(ctx, srcDB, jobspb.JobID(producerJobID), + c.SrcSysServer.ExecutorConfig().(sql.ExecutorConfig).Settings.Version) require.NoError(t, err) require.Equal(t, jobspb.StreamReplicationProgress_NOT_FINISHED, producerProgress.StreamIngestionStatus) destDB := c.DestSysServer.ExecutorConfig().(sql.ExecutorConfig).InternalDB - ingestionProgress, err := replicationutils.LoadIngestionProgress(ctx, destDB, jobspb.JobID(replicationJobID)) + ingestionProgress, err := replicationutils.LoadIngestionProgress(ctx, destDB, jobspb.JobID(replicationJobID), + c.DestSysServer.ExecutorConfig().(sql.ExecutorConfig).Settings.Version) require.NoError(t, err) require.Equal(t, jobspb.Replicating, ingestionProgress.ReplicationStatus) } diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_dist.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_dist.go index f20d9ffb6db4..f3f598a4fd90 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_dist.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_dist.go @@ -113,7 +113,7 @@ func startDistIngestion( return errors.Wrap(err, "failed to update job progress") } jobsprofiler.StorePlanDiagram(ctx, execCtx.ExecCfg().DistSQLSrv.Stopper, planner.initialPlan, execCtx.ExecCfg().InternalDB, - ingestionJob.ID()) + ingestionJob.ID(), execCtx.ExecCfg().Settings.Version) replanOracle := sql.ReplanOnCustomFunc( measurePlanChange, @@ -463,7 +463,7 @@ func (p *replicationFlowPlanner) constructPlanGenerator( if !p.createdInitialPlan() { // Only persist the initial plan as it's the only plan that actually gets // executed. 
- if err := persistStreamIngestionPartitionSpecs(ctx, execCtx.ExecCfg(), ingestionJobID, streamIngestionSpecs); err != nil { + if err := persistStreamIngestionPartitionSpecs(ctx, execCtx.ExecCfg(), ingestionJobID, streamIngestionSpecs, execCtx.ExecCfg().Settings.Version); err != nil { return nil, nil, err } } diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go index 55c4ee245b47..a0260874fdc7 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go @@ -518,7 +518,7 @@ func (sf *streamIngestionFrontier) maybePersistFrontierEntries() error { } if err = sf.FlowCtx.Cfg.DB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { - return jobs.WriteChunkedFileToJobInfo(ctx, frontierEntriesFilename, frontierBytes, txn, jobID) + return jobs.WriteChunkedFileToJobInfo(ctx, frontierEntriesFilename, frontierBytes, txn, jobID, sf.EvalCtx.Settings.Version) }); err != nil { return err } diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go index 6e23b719969c..d8b952e1001b 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go @@ -15,6 +15,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationutils" "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/jobs/jobsprotectedts" @@ -224,7 +225,7 @@ func ingestWithRetries( } status := redact.Sprintf("waiting before retrying error: %s", err) updateRunningStatus(ctx, ingestionJob, jobspb.ReplicationError, status) - newReplicatedTime := loadReplicatedTime(ctx, execCtx.ExecCfg().InternalDB, ingestionJob) + newReplicatedTime := loadReplicatedTime(ctx, execCtx.ExecCfg().InternalDB, ingestionJob, execCtx.ExecCfg().Settings.Version) if lastReplicatedTime.Less(newReplicatedTime) { r.Reset() lastReplicatedTime = newReplicatedTime @@ -241,8 +242,10 @@ func ingestWithRetries( return nil } -func loadReplicatedTime(ctx context.Context, db isql.DB, ingestionJob *jobs.Job) hlc.Timestamp { - latestProgress, err := replicationutils.LoadIngestionProgress(ctx, db, ingestionJob.ID()) +func loadReplicatedTime( + ctx context.Context, db isql.DB, ingestionJob *jobs.Job, cv clusterversion.Handle, +) hlc.Timestamp { + latestProgress, err := replicationutils.LoadIngestionProgress(ctx, db, ingestionJob.ID(), cv) if err != nil { log.Warningf(ctx, "error loading job progress: %s", err) return hlc.Timestamp{} @@ -556,11 +559,11 @@ func (s *streamIngestionResumer) CollectProfile(ctx context.Context, execCtx int var combinedErr error if err := bulkutil.FlushTracingAggregatorStats(ctx, s.job.ID(), - p.ExecCfg().InternalDB, aggStatsCopy); err != nil { + p.ExecCfg().InternalDB, aggStatsCopy, p.ExecCfg().Settings.Version); err != nil { combinedErr = errors.CombineErrors(combinedErr, errors.Wrap(err, "failed to flush aggregator stats")) } if err := generateSpanFrontierExecutionDetailFile(ctx, p.ExecCfg(), - s.job.ID(), false /* skipBehindBy */); err != nil { + s.job.ID(), false /* skipBehindBy */, p.ExecCfg().Settings.Version); err != nil { combinedErr = errors.CombineErrors(combinedErr, errors.Wrap(err, 
"failed to generate span frontier execution details")) } diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go index 889c320e2cc0..89f82054cb9d 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl" "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationutils" "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" @@ -318,6 +319,7 @@ func newStreamIngestionDataProcessor( cutoverProvider: &cutoverFromJobProgress{ jobID: jobspb.JobID(spec.JobID), db: flowCtx.Cfg.DB, + cv: flowCtx.Cfg.Settings.Version, }, buffer: &streamIngestionBuffer{}, cutoverCh: make(chan struct{}), @@ -1245,10 +1247,11 @@ type cutoverProvider interface { type cutoverFromJobProgress struct { db isql.DB jobID jobspb.JobID + cv clusterversion.Handle } func (c *cutoverFromJobProgress) cutoverReached(ctx context.Context) (bool, error) { - ingestionProgress, err := replicationutils.LoadIngestionProgress(ctx, c.db, c.jobID) + ingestionProgress, err := replicationutils.LoadIngestionProgress(ctx, c.db, c.jobID, c.cv) if err != nil { return false, err } diff --git a/pkg/ccl/streamingccl/streamproducer/producer_job.go b/pkg/ccl/streamingccl/streamproducer/producer_job.go index 7c95053cb438..26d7170e2193 100644 --- a/pkg/ccl/streamingccl/streamproducer/producer_job.go +++ b/pkg/ccl/streamingccl/streamproducer/producer_job.go @@ -99,7 +99,7 @@ func (p *producerJobResumer) Resume(ctx context.Context, execCtx interface{}) er case <-p.timer.Ch(): p.timer.MarkRead() p.timer.Reset(streamingccl.StreamReplicationStreamLivenessTrackFrequency.Get(execCfg.SV())) - progress, err := replicationutils.LoadReplicationProgress(ctx, execCfg.InternalDB, p.job.ID()) + progress, err := replicationutils.LoadReplicationProgress(ctx, execCfg.InternalDB, p.job.ID(), execCfg.Settings.Version) if knobs := execCfg.StreamingTestingKnobs; knobs != nil && knobs.AfterResumerJobLoad != nil { err = knobs.AfterResumerJobLoad(err) } diff --git a/pkg/jobs/adopt.go b/pkg/jobs/adopt.go index 7af146b25ade..031d00bf12cc 100644 --- a/pkg/jobs/adopt.go +++ b/pkg/jobs/adopt.go @@ -87,7 +87,7 @@ func (r *Registry) maybeDumpTrace(resumerCtx context.Context, resumer Resumer, j r.ID().String(), timeutil.Now().Format("20060102_150405.00")) td := jobspb.TraceData{CollectedSpans: sp.GetConfiguredRecording()} if err := r.db.Txn(dumpCtx, func(ctx context.Context, txn isql.Txn) error { - return WriteProtobinExecutionDetailFile(dumpCtx, resumerTraceFilename, &td, txn, jobID) + return WriteProtobinExecutionDetailFile(dumpCtx, resumerTraceFilename, &td, txn, jobID, r.settings.Version) }); err != nil { log.Warning(dumpCtx, "failed to write trace on resumer trace file") } diff --git a/pkg/jobs/execution_detail_utils.go b/pkg/jobs/execution_detail_utils.go index b1a13bb5ff81..49098bbebc5a 100644 --- a/pkg/jobs/execution_detail_utils.go +++ b/pkg/jobs/execution_detail_utils.go @@ -16,6 +16,7 @@ import ( "fmt" "strings" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler/profilerconstants" 
"github.com/cockroachdb/cockroach/pkg/sql/isql" @@ -46,14 +47,19 @@ func compressChunk(chunkBuf []byte) ([]byte, error) { // forms in the list of files displayed on the jobs' Advanced Debugging // DBConsole page. func WriteProtobinExecutionDetailFile( - ctx context.Context, filename string, msg protoutil.Message, txn isql.Txn, jobID jobspb.JobID, + ctx context.Context, + filename string, + msg protoutil.Message, + txn isql.Txn, + jobID jobspb.JobID, + cv clusterversion.Handle, ) error { name := fmt.Sprintf("%s~%s.binpb", filename, proto.MessageName(msg)) b, err := protoutil.Marshal(msg) if err != nil { return err } - return WriteExecutionDetailFile(ctx, name, b, txn, jobID) + return WriteExecutionDetailFile(ctx, name, b, txn, jobID, cv) } // WriteExecutionDetailFile will chunk and write to the job_info table under the @@ -64,10 +70,15 @@ func WriteProtobinExecutionDetailFile( // This method clears any existing file with the same filename before writing a // new one. func WriteExecutionDetailFile( - ctx context.Context, filename string, data []byte, txn isql.Txn, jobID jobspb.JobID, + ctx context.Context, + filename string, + data []byte, + txn isql.Txn, + jobID jobspb.JobID, + cv clusterversion.Handle, ) error { return WriteChunkedFileToJobInfo(ctx, - profilerconstants.MakeProfilerExecutionDetailsChunkKeyPrefix(filename), data, txn, jobID) + profilerconstants.MakeProfilerExecutionDetailsChunkKeyPrefix(filename), data, txn, jobID, cv) } // ProtobinExecutionDetailFile interface encapsulates the methods that must be @@ -98,7 +109,7 @@ func stringifyProtobinFile(filename string, fileContents []byte) ([]byte, error) // ReadExecutionDetailFile will stitch together all the chunks corresponding to the // filename and return the uncompressed data of the file. func ReadExecutionDetailFile( - ctx context.Context, filename string, txn isql.Txn, jobID jobspb.JobID, + ctx context.Context, filename string, txn isql.Txn, jobID jobspb.JobID, cv clusterversion.Handle, ) ([]byte, error) { // TODO(adityamaru): If filename=all add logic to zip up all the files corresponding // to the job's execution details and return the zipped bundle instead. @@ -113,6 +124,7 @@ func ReadExecutionDetailFile( profilerconstants.MakeProfilerExecutionDetailsChunkKeyPrefix(trimmedFilename), txn, jobID, + cv, ) if err != nil { return nil, err @@ -125,17 +137,18 @@ func ReadExecutionDetailFile( profilerconstants.MakeProfilerExecutionDetailsChunkKeyPrefix(filename), txn, jobID, + cv, ) } // ListExecutionDetailFiles lists all the files that have been generated as part // of a job's execution details. func ListExecutionDetailFiles( - ctx context.Context, db isql.DB, jobID jobspb.JobID, + ctx context.Context, db isql.DB, jobID jobspb.JobID, cv clusterversion.Handle, ) ([]string, error) { var res []string if err := db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { - jobInfo := InfoStorageForJob(txn, jobID) + jobInfo := InfoStorageForJob(txn, jobID, cv) // Iterate over all the files that have been stored as part of the job's // execution details. diff --git a/pkg/jobs/execution_detail_utils_test.go b/pkg/jobs/execution_detail_utils_test.go index 9c9590945490..fdde967fa206 100644 --- a/pkg/jobs/execution_detail_utils_test.go +++ b/pkg/jobs/execution_detail_utils_test.go @@ -51,7 +51,7 @@ func TestReadWriteListExecutionDetailFiles(t *testing.T) { // Generate random data of size between 15 bytes and 5 MiB. 
data := make([]byte, 15+rand.Intn(5*1024*1024-15)) randutil.ReadTestdataBytes(rng, data) - err := WriteExecutionDetailFile(ctx, filename, data, txn, jobspb.JobID(123)) + err := WriteExecutionDetailFile(ctx, filename, data, txn, jobspb.JobID(123), ts.ClusterSettings().Version) if err != nil { return err } @@ -59,11 +59,11 @@ func TestReadWriteListExecutionDetailFiles(t *testing.T) { } // Write a binpb format file. - return WriteProtobinExecutionDetailFile(ctx, "testproto", msg, txn, jobspb.JobID(123)) + return WriteProtobinExecutionDetailFile(ctx, "testproto", msg, txn, jobspb.JobID(123), ts.ClusterSettings().Version) })) // List the files. - listedFiles, err := ListExecutionDetailFiles(ctx, ts.InternalDB().(isql.DB), jobspb.JobID(123)) + listedFiles, err := ListExecutionDetailFiles(ctx, ts.InternalDB().(isql.DB), jobspb.JobID(123), ts.ClusterSettings().Version) require.NoError(t, err) require.ElementsMatch(t, append(filenames, "testproto~cockroach.sql.jobs.jobspb.BackupDetails.binpb.txt", @@ -76,7 +76,7 @@ func TestReadWriteListExecutionDetailFiles(t *testing.T) { // Skip the text version of the binpb file. continue } - readData, err := ReadExecutionDetailFile(ctx, filename, txn, jobspb.JobID(123)) + readData, err := ReadExecutionDetailFile(ctx, filename, txn, jobspb.JobID(123), s.ClusterSettings().Version) require.NoError(t, err) if strings.HasSuffix(filename, "binpb") { // For the binpb file, unmarshal the data and compare it to the original message. diff --git a/pkg/jobs/job_info_storage.go b/pkg/jobs/job_info_storage.go index 5a0d9c85f738..c249d9d95754 100644 --- a/pkg/jobs/job_info_storage.go +++ b/pkg/jobs/job_info_storage.go @@ -14,6 +14,7 @@ import ( "bytes" "context" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/isql" @@ -30,18 +31,19 @@ import ( type InfoStorage struct { j *Job txn isql.Txn + cv clusterversion.Handle } // InfoStorage returns a new InfoStorage with the passed in job and txn. func (j *Job) InfoStorage(txn isql.Txn) InfoStorage { - return InfoStorage{j: j, txn: txn} + return InfoStorage{j: j, txn: txn, cv: j.registry.settings.Version} } // InfoStorageForJob returns a new InfoStorage with the passed in // job ID and txn. It avoids loading the job record. The resulting // job_info writes will not check the job session ID. -func InfoStorageForJob(txn isql.Txn, jobID jobspb.JobID) InfoStorage { - return InfoStorage{j: &Job{id: jobID}, txn: txn} +func InfoStorageForJob(txn isql.Txn, jobID jobspb.JobID, cv clusterversion.Handle) InfoStorage { + return InfoStorage{j: &Job{id: jobID}, txn: txn, cv: cv} } func (i InfoStorage) checkClaimSession(ctx context.Context) error { diff --git a/pkg/jobs/job_info_utils.go b/pkg/jobs/job_info_utils.go index 9420ed324601..40acc086bcbd 100644 --- a/pkg/jobs/job_info_utils.go +++ b/pkg/jobs/job_info_utils.go @@ -17,6 +17,7 @@ import ( "io" "strings" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/sql/isql" "github.com/cockroachdb/errors" @@ -32,10 +33,15 @@ const finalChunkSuffix = "#_final" // chunks and so if the caller wishes to preserve history they must use a // unique filename. 
func WriteChunkedFileToJobInfo( - ctx context.Context, filename string, data []byte, txn isql.Txn, jobID jobspb.JobID, + ctx context.Context, + filename string, + data []byte, + txn isql.Txn, + jobID jobspb.JobID, + cv clusterversion.Handle, ) error { finalChunkName := filename + finalChunkSuffix - jobInfo := InfoStorageForJob(txn, jobID) + jobInfo := InfoStorageForJob(txn, jobID, cv) // Clear any existing chunks with the same filename before writing new chunks. // We clear all rows that with info keys in [filename, filename#_final~). @@ -81,12 +87,12 @@ func WriteChunkedFileToJobInfo( // ReadChunkedFileToJobInfo will stitch together all the chunks corresponding to // the filename and return the uncompressed data of the file. func ReadChunkedFileToJobInfo( - ctx context.Context, filename string, txn isql.Txn, jobID jobspb.JobID, + ctx context.Context, filename string, txn isql.Txn, jobID jobspb.JobID, cv clusterversion.Handle, ) ([]byte, error) { buf := bytes.NewBuffer([]byte{}) // Iterate over all the chunks of the requested file and return the unzipped // chunks of data. - jobInfo := InfoStorageForJob(txn, jobID) + jobInfo := InfoStorageForJob(txn, jobID, cv) var lastInfoKey string if err := jobInfo.Iterate(ctx, filename, func(infoKey string, value []byte) error { diff --git a/pkg/jobs/job_info_utils_test.go b/pkg/jobs/job_info_utils_test.go index f16c6a40ba92..92313807ccad 100644 --- a/pkg/jobs/job_info_utils_test.go +++ b/pkg/jobs/job_info_utils_test.go @@ -68,11 +68,11 @@ func TestReadWriteChunkedFileToJobInfo(t *testing.T) { randutil.ReadTestdataBytes(rng, tt.data) } require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { - err := WriteChunkedFileToJobInfo(ctx, tt.name, tt.data, txn, jobspb.JobID(123)) + err := WriteChunkedFileToJobInfo(ctx, tt.name, tt.data, txn, jobspb.JobID(123), s.ClusterSettings().Version) if err != nil { return err } - got, err := ReadChunkedFileToJobInfo(ctx, tt.name, txn, jobspb.JobID(123)) + got, err := ReadChunkedFileToJobInfo(ctx, tt.name, txn, jobspb.JobID(123), s.ClusterSettings().Version) if err != nil { return err } @@ -133,7 +133,7 @@ func TestOverwriteChunkingWithVariableLengths(t *testing.T) { tt.data = generateData(tt.numChunks) // Write the first file, this will generate a certain number of chunks. 
require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { - return WriteChunkedFileToJobInfo(ctx, tt.name, tt.data, txn, jobspb.JobID(123)) + return WriteChunkedFileToJobInfo(ctx, tt.name, tt.data, txn, jobspb.JobID(123), s.ClusterSettings().Version) })) // Overwrite the file with fewer chunks, this should delete the extra @@ -144,11 +144,11 @@ func TestOverwriteChunkingWithVariableLengths(t *testing.T) { tt.data = generateData(lessChunks) var got []byte require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { - err := WriteChunkedFileToJobInfo(ctx, tt.name, tt.data, txn, jobspb.JobID(123)) + err := WriteChunkedFileToJobInfo(ctx, tt.name, tt.data, txn, jobspb.JobID(123), s.ClusterSettings().Version) if err != nil { return err } - got, err = ReadChunkedFileToJobInfo(ctx, tt.name, txn, jobspb.JobID(123)) + got, err = ReadChunkedFileToJobInfo(ctx, tt.name, txn, jobspb.JobID(123), s.ClusterSettings().Version) return err })) require.Equal(t, tt.data, got) @@ -162,11 +162,11 @@ func TestOverwriteChunkingWithVariableLengths(t *testing.T) { tt.data = generateData(moreChunks) var got []byte require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { - err := WriteChunkedFileToJobInfo(ctx, tt.name, tt.data, txn, jobspb.JobID(123)) + err := WriteChunkedFileToJobInfo(ctx, tt.name, tt.data, txn, jobspb.JobID(123), s.ClusterSettings().Version) if err != nil { return err } - got, err = ReadChunkedFileToJobInfo(ctx, tt.name, txn, jobspb.JobID(123)) + got, err = ReadChunkedFileToJobInfo(ctx, tt.name, txn, jobspb.JobID(123), s.ClusterSettings().Version) return err })) require.Equal(t, tt.data, got) diff --git a/pkg/jobs/jobs.go b/pkg/jobs/jobs.go index 1d934970f73b..e4b90d3f9b67 100644 --- a/pkg/jobs/jobs.go +++ b/pkg/jobs/jobs.go @@ -1124,10 +1124,12 @@ func FormatRetriableExecutionErrorLogToStringArray( } // GetJobTraceID returns the current trace ID of the job from the job progress. -func GetJobTraceID(ctx context.Context, db isql.DB, jobID jobspb.JobID) (tracingpb.TraceID, error) { +func GetJobTraceID( + ctx context.Context, db isql.DB, jobID jobspb.JobID, cv clusterversion.Handle, +) (tracingpb.TraceID, error) { var traceID tracingpb.TraceID if err := db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { - jobInfo := InfoStorageForJob(txn, jobID) + jobInfo := InfoStorageForJob(txn, jobID, cv) progressBytes, exists, err := jobInfo.GetLegacyProgress(ctx) if err != nil { return err @@ -1151,14 +1153,14 @@ func GetJobTraceID(ctx context.Context, db isql.DB, jobID jobspb.JobID) (tracing // LoadJobProgress returns the job progress from the info table. Note that the // progress can be nil if none is recorded. 
func LoadJobProgress( - ctx context.Context, db isql.DB, jobID jobspb.JobID, + ctx context.Context, db isql.DB, jobID jobspb.JobID, cv clusterversion.Handle, ) (*jobspb.Progress, error) { var ( progressBytes []byte exists bool ) if err := db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { - infoStorage := InfoStorageForJob(txn, jobID) + infoStorage := InfoStorageForJob(txn, jobID, cv) var err error progressBytes, exists, err = infoStorage.GetLegacyProgress(ctx) return err diff --git a/pkg/jobs/jobs_test.go b/pkg/jobs/jobs_test.go index 924b914ec1e0..13092f00a429 100644 --- a/pkg/jobs/jobs_test.go +++ b/pkg/jobs/jobs_test.go @@ -1193,13 +1193,13 @@ func checkTraceFiles( recordings := make([][]byte, 0) execCfg := s.ApplicationLayer().ExecutorConfig().(sql.ExecutorConfig) - edFiles, err := jobs.ListExecutionDetailFiles(ctx, execCfg.InternalDB, jobID) + edFiles, err := jobs.ListExecutionDetailFiles(ctx, execCfg.InternalDB, jobID, execCfg.Settings.Version) require.NoError(t, err) require.Len(t, edFiles, expectedNumFiles) require.NoError(t, execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { for _, f := range edFiles { - data, err := jobs.ReadExecutionDetailFile(ctx, f, txn, jobID) + data, err := jobs.ReadExecutionDetailFile(ctx, f, txn, jobID, execCfg.Settings.Version) if err != nil { return err } @@ -3691,7 +3691,7 @@ func TestLoadJobProgress(t *testing.T) { _, err := r.CreateJobWithTxn(ctx, rec, 7, nil) require.NoError(t, err) - p, err := jobs.LoadJobProgress(ctx, s.InternalDB().(isql.DB), 7) + p, err := jobs.LoadJobProgress(ctx, s.InternalDB().(isql.DB), 7, s.ClusterSettings().Version) require.NoError(t, err) require.Equal(t, []float32{7.1}, p.GetDetails().(*jobspb.Progress_Import).Import.ReadProgress) } diff --git a/pkg/jobs/jobsprofiler/BUILD.bazel b/pkg/jobs/jobsprofiler/BUILD.bazel index 7a41084f0a78..72f67f0bf3b2 100644 --- a/pkg/jobs/jobsprofiler/BUILD.bazel +++ b/pkg/jobs/jobsprofiler/BUILD.bazel @@ -6,6 +6,7 @@ go_library( importpath = "github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler", visibility = ["//visibility:public"], deps = [ + "//pkg/clusterversion", "//pkg/jobs", "//pkg/jobs/jobspb", "//pkg/jobs/jobsprofiler/profilerconstants", diff --git a/pkg/jobs/jobsprofiler/profiler.go b/pkg/jobs/jobsprofiler/profiler.go index fd365d17964c..4bd4ec21b1e3 100644 --- a/pkg/jobs/jobsprofiler/profiler.go +++ b/pkg/jobs/jobsprofiler/profiler.go @@ -14,6 +14,7 @@ import ( "context" "fmt" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler/profilerconstants" @@ -29,7 +30,12 @@ import ( // table. The generation of the plan diagram and persistence to the info table // are done asynchronously and this method does not block on their completion. 
func StorePlanDiagram( - ctx context.Context, stopper *stop.Stopper, p *sql.PhysicalPlan, db isql.DB, jobID jobspb.JobID, + ctx context.Context, + stopper *stop.Stopper, + p *sql.PhysicalPlan, + db isql.DB, + jobID jobspb.JobID, + cv clusterversion.Handle, ) { if err := stopper.RunAsyncTask(ctx, "jobs-store-plan-diagram", func(ctx context.Context) { var cancel func() @@ -45,7 +51,7 @@ func StorePlanDiagram( } dspKey := profilerconstants.MakeDSPDiagramInfoKey(timeutil.Now().UnixNano()) - infoStorage := jobs.InfoStorageForJob(txn, jobID) + infoStorage := jobs.InfoStorageForJob(txn, jobID, cv) return infoStorage.Write(ctx, dspKey, []byte(diagURL.String())) }) // Don't log the error if the context has been canceled. This will likely be @@ -70,9 +76,10 @@ func StorePerNodeProcessorProgressFraction( db isql.DB, jobID jobspb.JobID, perComponentProgress map[execinfrapb.ComponentID]float32, + cv clusterversion.Handle, ) { if err := db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { - infoStorage := jobs.InfoStorageForJob(txn, jobID) + infoStorage := jobs.InfoStorageForJob(txn, jobID, cv) for componentID, fraction := range perComponentProgress { key := profilerconstants.MakeNodeProcessorProgressInfoKey(componentID.FlowID.String(), componentID.SQLInstanceID.String(), componentID.ID) diff --git a/pkg/jobs/jobsprofiler/profiler_test.go b/pkg/jobs/jobsprofiler/profiler_test.go index 161b9f30021f..70e7f14f731c 100644 --- a/pkg/jobs/jobsprofiler/profiler_test.go +++ b/pkg/jobs/jobsprofiler/profiler_test.go @@ -96,7 +96,7 @@ func TestProfilerStorePlanDiagram(t *testing.T) { testutils.SucceedsSoon(t, func() error { var count int err = execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { - infoStorage := jobs.InfoStorageForJob(txn, jobID) + infoStorage := jobs.InfoStorageForJob(txn, jobID, execCfg.Settings.Version) return infoStorage.Iterate(ctx, profilerconstants.DSPDiagramInfoKeyPrefix, func(infoKey string, value []byte) error { count++ @@ -143,19 +143,19 @@ func TestStorePerNodeProcessorProgressFraction(t *testing.T) { n2proc1.ID = 1 jobsprofiler.StorePerNodeProcessorProgressFraction(ctx, s.InternalDB().(isql.DB), - jobID, map[execinfrapb.ComponentID]float32{n1proc1: 0.95, n2proc1: 0.50}) + jobID, map[execinfrapb.ComponentID]float32{n1proc1: 0.95, n2proc1: 0.50}, s.ClusterSettings().Version) // Update n2proc1. jobsprofiler.StorePerNodeProcessorProgressFraction(ctx, s.InternalDB().(isql.DB), - jobID, map[execinfrapb.ComponentID]float32{n2proc1: 0.70}) + jobID, map[execinfrapb.ComponentID]float32{n2proc1: 0.70}, s.ClusterSettings().Version) // Update n1proc1. 
jobsprofiler.StorePerNodeProcessorProgressFraction(ctx, s.InternalDB().(isql.DB), - jobID, map[execinfrapb.ComponentID]float32{n1proc1: 1.00}) + jobID, map[execinfrapb.ComponentID]float32{n1proc1: 1.00}, s.ClusterSettings().Version) var persistedProgress map[string]string err := s.ExecutorConfig().(sql.ExecutorConfig).InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { persistedProgress = make(map[string]string) - infoStorage := jobs.InfoStorageForJob(txn, jobID) + infoStorage := jobs.InfoStorageForJob(txn, jobID, s.ClusterSettings().Version) return infoStorage.Iterate(ctx, profilerconstants.NodeProcessorProgressInfoKeyPrefix, func(infoKey string, value []byte) error { f, err := strconv.ParseFloat(string(value), 32) @@ -219,7 +219,7 @@ func TestTraceRecordingOnResumerCompletion(t *testing.T) { testutils.SucceedsSoon(t, func() error { recordings := make([][]byte, 0) execCfg := s.ApplicationLayer().ExecutorConfig().(sql.ExecutorConfig) - edFiles, err := jobs.ListExecutionDetailFiles(ctx, execCfg.InternalDB, jobspb.JobID(jobID)) + edFiles, err := jobs.ListExecutionDetailFiles(ctx, execCfg.InternalDB, jobspb.JobID(jobID), s.ClusterSettings().Version) if err != nil { return err } @@ -232,7 +232,7 @@ func TestTraceRecordingOnResumerCompletion(t *testing.T) { return execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { for _, f := range traceFiles { - data, err := jobs.ReadExecutionDetailFile(ctx, f, txn, jobspb.JobID(jobID)) + data, err := jobs.ReadExecutionDetailFile(ctx, f, txn, jobspb.JobID(jobID), s.ClusterSettings().Version) if err != nil { return err } diff --git a/pkg/server/autoconfig/BUILD.bazel b/pkg/server/autoconfig/BUILD.bazel index 8f67ffe3dd30..5013a15c24cb 100644 --- a/pkg/server/autoconfig/BUILD.bazel +++ b/pkg/server/autoconfig/BUILD.bazel @@ -13,6 +13,7 @@ go_library( importpath = "github.com/cockroachdb/cockroach/pkg/server/autoconfig", visibility = ["//visibility:public"], deps = [ + "//pkg/clusterversion", "//pkg/jobs", "//pkg/jobs/jobspb", "//pkg/security/username", diff --git a/pkg/server/autoconfig/auto_config_env_runner.go b/pkg/server/autoconfig/auto_config_env_runner.go index 159b27d645c3..50fa4ed8067a 100644 --- a/pkg/server/autoconfig/auto_config_env_runner.go +++ b/pkg/server/autoconfig/auto_config_env_runner.go @@ -146,7 +146,7 @@ func (r *envRunner) maybeRunNextTask( err = execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) (resErr error) { // Re-check if there any other started task already. - otherTaskID, _, err := getCurrentlyStartedTaskID(ctx, txn, r.envID) + otherTaskID, _, err := getCurrentlyStartedTaskID(ctx, txn, r.envID, execCfg.Settings.Version) if err != nil { return err } @@ -161,7 +161,7 @@ func (r *envRunner) maybeRunNextTask( } // Find the latest completed task. - lastTaskID, err := getLastCompletedTaskID(ctx, txn, r.envID) + lastTaskID, err := getLastCompletedTaskID(ctx, txn, r.envID, execCfg.Settings.Version) if err != nil { return err } @@ -190,7 +190,7 @@ func (r *envRunner) maybeRunNextTask( // maybeWaitForCurrentTaskJob(), which is an optimization. Storing // the job ID is not strictly required for sequencing the tasks. 
if err := writeStartMarker(ctx, txn, - InfoKeyTaskRef{Environment: r.envID, Task: nextTaskID}, jobID); err != nil { + InfoKeyTaskRef{Environment: r.envID, Task: nextTaskID}, jobID, execCfg.Settings.Version); err != nil { return errors.Wrapf(err, "unable to write start marker for task %d", nextTaskID) } @@ -219,7 +219,7 @@ func (r *envRunner) maybeWaitForCurrentTaskJob( if err := execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { var err error - prevTaskID, prevJobID, err = getCurrentlyStartedTaskID(ctx, txn, r.envID) + prevTaskID, prevJobID, err = getCurrentlyStartedTaskID(ctx, txn, r.envID, execCfg.Settings.Version) return err }); err != nil { return errors.Wrap(err, "checking latest task job") diff --git a/pkg/server/autoconfig/auto_config_task.go b/pkg/server/autoconfig/auto_config_task.go index 24011ae8755a..76c81146d69d 100644 --- a/pkg/server/autoconfig/auto_config_task.go +++ b/pkg/server/autoconfig/auto_config_task.go @@ -78,7 +78,7 @@ func (r *taskRunner) OnFailOrCancel(ctx context.Context, execCtx interface{}, jo if err := execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { return markTaskComplete(ctx, txn, InfoKeyTaskRef{Environment: r.envID, Task: r.task.TaskID}, - []byte("task error")) + []byte("task error"), execCfg.Settings.Version) }); err != nil { return err } @@ -180,7 +180,7 @@ func execSimpleSQL( log.Infof(ctx, "finished executing txn statements") return markTaskComplete(ctx, txn, InfoKeyTaskRef{Environment: envID, Task: taskID}, - []byte("task success")) + []byte("task success"), execCfg.Settings.Version) }) } diff --git a/pkg/server/autoconfig/task_markers.go b/pkg/server/autoconfig/task_markers.go index 2b861e8db18c..17071b42b255 100644 --- a/pkg/server/autoconfig/task_markers.go +++ b/pkg/server/autoconfig/task_markers.go @@ -16,6 +16,7 @@ import ( "strconv" "strings" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/sql/isql" @@ -141,9 +142,9 @@ func (tr *InfoKeyTaskRef) decodeInternal(prefix, infoKey string) error { // writeStartMarker writes a start marker for the given task ID and // also writes its job ID into the value part. func writeStartMarker( - ctx context.Context, txn isql.Txn, taskRef InfoKeyTaskRef, jobID jobspb.JobID, + ctx context.Context, txn isql.Txn, taskRef InfoKeyTaskRef, jobID jobspb.JobID, cv clusterversion.Handle, ) error { - infoStorage := jobs.InfoStorageForJob(txn, jobs.AutoConfigRunnerJobID) + infoStorage := jobs.InfoStorageForJob(txn, jobs.AutoConfigRunnerJobID, cv) return infoStorage.Write(ctx, taskRef.EncodeStartMarkerKey(), []byte(strconv.FormatUint(uint64(jobID), 10))) @@ -152,9 +153,9 @@ func writeStartMarker( // getCurrentlyStartedTaskID retrieves the ID of the last task which // has a start marker in job_info. func getCurrentlyStartedTaskID( - ctx context.Context, txn isql.Txn, env EnvironmentID, + ctx context.Context, txn isql.Txn, env EnvironmentID, cv clusterversion.Handle, ) (prevTaskID TaskID, prevJobID jobspb.JobID, err error) { - infoStorage := jobs.InfoStorageForJob(txn, jobs.AutoConfigRunnerJobID) + infoStorage := jobs.InfoStorageForJob(txn, jobs.AutoConfigRunnerJobID, cv) if err := infoStorage.GetLast(ctx, InfoKeyStartPrefix(env), @@ -184,9 +185,9 @@ func getCurrentlyStartedTaskID( // getLastCompletedTaskID retrieves the task ID of the last task which // has a completion marker in job_info. 
func getLastCompletedTaskID( - ctx context.Context, txn isql.Txn, env EnvironmentID, + ctx context.Context, txn isql.Txn, env EnvironmentID, cv clusterversion.Handle, ) (lastTaskID TaskID, err error) { - infoStorage := jobs.InfoStorageForJob(txn, jobs.AutoConfigRunnerJobID) + infoStorage := jobs.InfoStorageForJob(txn, jobs.AutoConfigRunnerJobID, cv) if err := infoStorage.GetLast(ctx, InfoKeyCompletionPrefix(env), @@ -208,9 +209,9 @@ func getLastCompletedTaskID( // markTaskCompletes transactionally removes the task's start marker // and creates a completion marker. func markTaskComplete( - ctx context.Context, txn isql.Txn, taskRef InfoKeyTaskRef, completionValue []byte, + ctx context.Context, txn isql.Txn, taskRef InfoKeyTaskRef, completionValue []byte, cv clusterversion.Handle, ) error { - infoStorage := jobs.InfoStorageForJob(txn, jobs.AutoConfigRunnerJobID) + infoStorage := jobs.InfoStorageForJob(txn, jobs.AutoConfigRunnerJobID, cv) // Remove the start marker. if err := infoStorage.Delete(ctx, taskRef.EncodeStartMarkerKey()); err != nil { diff --git a/pkg/server/job_profiler.go b/pkg/server/job_profiler.go index fb9eefef67eb..6c4daeb0f43a 100644 --- a/pkg/server/job_profiler.go +++ b/pkg/server/job_profiler.go @@ -71,7 +71,7 @@ func (s *statusServer) RequestJobProfilerExecutionDetails( return nil, errors.Newf("execution details can only be requested on a cluster with version >= %s", clusterversion.V23_2.String()) } - e := makeJobProfilerExecutionDetailsBuilder(execCfg.SQLStatusServer, execCfg.InternalDB, jobID, execCfg.JobRegistry) + e := makeJobProfilerExecutionDetailsBuilder(execCfg.SQLStatusServer, execCfg.InternalDB, jobID, execCfg.JobRegistry, execCfg.Settings.Version) // TODO(adityamaru): When we start collecting more information we can consider // parallelize the collection of the various pieces. @@ -111,14 +111,15 @@ type executionDetailsBuilder struct { db isql.DB jobID jobspb.JobID registry *jobs.Registry + cv clusterversion.Handle } // makeJobProfilerExecutionDetailsBuilder returns an instance of an executionDetailsBuilder. 
func makeJobProfilerExecutionDetailsBuilder( - srv serverpb.SQLStatusServer, db isql.DB, jobID jobspb.JobID, registry *jobs.Registry, + srv serverpb.SQLStatusServer, db isql.DB, jobID jobspb.JobID, registry *jobs.Registry, cv clusterversion.Handle, ) executionDetailsBuilder { e := executionDetailsBuilder{ - srv: srv, db: db, jobID: jobID, registry: registry, + srv: srv, db: db, jobID: jobID, registry: registry, cv: cv, } return e } @@ -140,7 +141,7 @@ func (e *executionDetailsBuilder) addLabelledGoroutines(ctx context.Context) { } filename := fmt.Sprintf("goroutines.%s.txt", timeutil.Now().Format("20060102_150405.00")) if err := e.db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { - return jobs.WriteExecutionDetailFile(ctx, filename, resp.Data, txn, e.jobID) + return jobs.WriteExecutionDetailFile(ctx, filename, resp.Data, txn, e.jobID, e.cv) }); err != nil { log.Errorf(ctx, "failed to write goroutine for job %d: %v", e.jobID, err.Error()) } @@ -161,7 +162,7 @@ func (e *executionDetailsBuilder) addDistSQLDiagram(ctx context.Context) { if err := e.db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { return jobs.WriteExecutionDetailFile(ctx, filename, []byte(fmt.Sprintf(``, dspDiagramURL)), - txn, e.jobID) + txn, e.jobID, e.cv) }); err != nil { log.Errorf(ctx, "failed to write DistSQL diagram for job %d: %v", e.jobID, err.Error()) } @@ -172,7 +173,7 @@ func (e *executionDetailsBuilder) addDistSQLDiagram(ctx context.Context) { // that captures the active tracing spans of a job on all nodes in the cluster. func (e *executionDetailsBuilder) addClusterWideTraces(ctx context.Context) { z := zipper.MakeInternalExecutorInflightTraceZipper(e.db.Executor()) - traceID, err := jobs.GetJobTraceID(ctx, e.db, e.jobID) + traceID, err := jobs.GetJobTraceID(ctx, e.db, e.jobID, e.cv) if err != nil { log.Warningf(ctx, "failed to fetch job trace ID: %+v", err.Error()) return @@ -185,7 +186,7 @@ func (e *executionDetailsBuilder) addClusterWideTraces(ctx context.Context) { filename := fmt.Sprintf("trace.%s.zip", timeutil.Now().Format("20060102_150405.00")) if err := e.db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { - return jobs.WriteExecutionDetailFile(ctx, filename, zippedTrace, txn, e.jobID) + return jobs.WriteExecutionDetailFile(ctx, filename, zippedTrace, txn, e.jobID, e.cv) }); err != nil { log.Errorf(ctx, "failed to write traces for job %d: %v", e.jobID, err.Error()) } diff --git a/pkg/server/status.go b/pkg/server/status.go index 3c0e09d67144..3d33444a8e0f 100644 --- a/pkg/server/status.go +++ b/pkg/server/status.go @@ -3943,7 +3943,7 @@ func (s *statusServer) GetJobProfilerExecutionDetails( execCfg := s.sqlServer.execCfg var data []byte if err := execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { - data, err = jobs.ReadExecutionDetailFile(ctx, req.Filename, txn, jobID) + data, err = jobs.ReadExecutionDetailFile(ctx, req.Filename, txn, jobID, execCfg.Settings.Version) return err }); err != nil { return nil, err @@ -3964,7 +3964,7 @@ func (s *statusServer) ListJobProfilerExecutionDetails( jobID := jobspb.JobID(req.JobId) execCfg := s.sqlServer.execCfg - files, err := jobs.ListExecutionDetailFiles(ctx, execCfg.InternalDB, jobID) + files, err := jobs.ListExecutionDetailFiles(ctx, execCfg.InternalDB, jobID, execCfg.Settings.Version) if err != nil { return nil, err } diff --git a/pkg/sql/importer/import_processor_planning.go b/pkg/sql/importer/import_processor_planning.go index b978a01450cf..979ee5ddf76c 100644 --- 
a/pkg/sql/importer/import_processor_planning.go +++ b/pkg/sql/importer/import_processor_planning.go @@ -270,7 +270,7 @@ func distImport( } execCfg := execCtx.ExecCfg() - jobsprofiler.StorePlanDiagram(ctx, execCfg.DistSQLSrv.Stopper, p, execCfg.InternalDB, job.ID()) + jobsprofiler.StorePlanDiagram(ctx, execCfg.DistSQLSrv.Stopper, p, execCfg.InternalDB, job.ID(), execCtx.ExecCfg().Settings.Version) // Copy the evalCtx, as dsp.Run() might change it. evalCtxCopy := *evalCtx diff --git a/pkg/sql/jobs_profiler_execution_details.go b/pkg/sql/jobs_profiler_execution_details.go index d4e43b966f46..62a73977bef9 100644 --- a/pkg/sql/jobs_profiler_execution_details.go +++ b/pkg/sql/jobs_profiler_execution_details.go @@ -43,9 +43,9 @@ func (p *planner) GenerateExecutionDetailsJSON( payload := j.Payload() switch payload.Type() { case jobspb.TypeBackup: - executionDetailsJSON, err = constructBackupExecutionDetails(ctx, jobID, execCfg.InternalDB) + executionDetailsJSON, err = constructBackupExecutionDetails(ctx, jobID, execCfg.InternalDB, execCfg.Settings.Version) default: - executionDetailsJSON, err = constructDefaultExecutionDetails(ctx, jobID, execCfg.InternalDB) + executionDetailsJSON, err = constructDefaultExecutionDetails(ctx, jobID, execCfg.InternalDB, execCfg.Settings.Version) } return executionDetailsJSON, err @@ -60,12 +60,12 @@ type defaultExecutionDetails struct { } func constructDefaultExecutionDetails( - ctx context.Context, jobID jobspb.JobID, db isql.DB, + ctx context.Context, jobID jobspb.JobID, db isql.DB, cv clusterversion.Handle, ) ([]byte, error) { executionDetails := &defaultExecutionDetails{} err := db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { // Populate the latest DSP diagram URL. - infoStorage := jobs.InfoStorageForJob(txn, jobID) + infoStorage := jobs.InfoStorageForJob(txn, jobID, cv) err := infoStorage.GetLast(ctx, profilerconstants.DSPDiagramInfoKeyPrefix, func(infoKey string, value []byte) error { executionDetails.PlanDiagram = string(value) return nil @@ -94,13 +94,13 @@ type backupExecutionDetails struct { } func constructBackupExecutionDetails( - ctx context.Context, jobID jobspb.JobID, db isql.DB, + ctx context.Context, jobID jobspb.JobID, db isql.DB, cv clusterversion.Handle, ) ([]byte, error) { var annotatedURL url.URL marshallablePerComponentProgress := make(map[string]float32) if err := db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { // Read the latest DistSQL diagram. 
- infoStorage := jobs.InfoStorageForJob(txn, jobID) + infoStorage := jobs.InfoStorageForJob(txn, jobID, cv) var distSQLURL string if err := infoStorage.GetLast(ctx, profilerconstants.DSPDiagramInfoKeyPrefix, func(infoKey string, value []byte) error { distSQLURL = string(value) diff --git a/pkg/sql/jobs_profiler_execution_details_test.go b/pkg/sql/jobs_profiler_execution_details_test.go index 540f0754b5a6..3f7392fb3da4 100644 --- a/pkg/sql/jobs_profiler_execution_details_test.go +++ b/pkg/sql/jobs_profiler_execution_details_test.go @@ -24,6 +24,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" _ "github.com/cockroachdb/cockroach/pkg/cloud/impl" // register cloud storage providers + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler" @@ -89,11 +90,16 @@ func (d fakeExecResumer) CollectProfile(_ context.Context, _ interface{}) error // checkForPlanDiagram is a method used in tests to wait for the existence of a // DSP diagram for the provided jobID. func checkForPlanDiagrams( - ctx context.Context, t *testing.T, db isql.DB, jobID jobspb.JobID, expectedNumDiagrams int, + ctx context.Context, + t *testing.T, + db isql.DB, + jobID jobspb.JobID, + expectedNumDiagrams int, + cv clusterversion.Handle, ) { testutils.SucceedsSoon(t, func() error { return db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { - infoStorage := jobs.InfoStorageForJob(txn, jobID) + infoStorage := jobs.InfoStorageForJob(txn, jobID, cv) var found int err := infoStorage.Iterate(ctx, profilerconstants.DSPDiagramInfoKeyPrefix, func(infoKey string, value []byte) error { @@ -134,8 +140,9 @@ func TestShowJobsWithExecutionDetails(t *testing.T) { p := sql.PhysicalPlan{} infra := physicalplan.NewPhysicalInfrastructure(uuid.FastMakeV4(), base.SQLInstanceID(1)) p.PhysicalInfrastructure = infra - jobsprofiler.StorePlanDiagram(ctx, s.Stopper(), &p, s.InternalDB().(isql.DB), j.ID()) - checkForPlanDiagrams(ctx, t, s.InternalDB().(isql.DB), j.ID(), 1) + jobsprofiler.StorePlanDiagram(ctx, s.Stopper(), &p, s.InternalDB().(isql.DB), j.ID(), + s.ClusterSettings().Version) + checkForPlanDiagrams(ctx, t, s.InternalDB().(isql.DB), j.ID(), 1, s.ClusterSettings().Version) return nil }, } @@ -185,8 +192,8 @@ func TestReadWriteProfilerExecutionDetails(t *testing.T) { p := sql.PhysicalPlan{} infra := physicalplan.NewPhysicalInfrastructure(uuid.FastMakeV4(), base.SQLInstanceID(1)) p.PhysicalInfrastructure = infra - jobsprofiler.StorePlanDiagram(ctx, s.Stopper(), &p, s.InternalDB().(isql.DB), j.ID()) - checkForPlanDiagrams(ctx, t, s.InternalDB().(isql.DB), j.ID(), 1) + jobsprofiler.StorePlanDiagram(ctx, s.Stopper(), &p, s.InternalDB().(isql.DB), j.ID(), s.ClusterSettings().Version) + checkForPlanDiagrams(ctx, t, s.InternalDB().(isql.DB), j.ID(), 1, s.ClusterSettings().Version) isRunning <- struct{}{} <-continueRunning return nil @@ -348,8 +355,9 @@ func TestListProfilerExecutionDetails(t *testing.T) { p := sql.PhysicalPlan{} infra := physicalplan.NewPhysicalInfrastructure(uuid.FastMakeV4(), base.SQLInstanceID(1)) p.PhysicalInfrastructure = infra - jobsprofiler.StorePlanDiagram(ctx, s.Stopper(), &p, s.InternalDB().(isql.DB), j.ID()) - checkForPlanDiagrams(ctx, t, s.InternalDB().(isql.DB), j.ID(), expectedDiagrams) + jobsprofiler.StorePlanDiagram(ctx, s.Stopper(), &p, s.InternalDB().(isql.DB), j.ID(), + execCfg.Settings.Version) + checkForPlanDiagrams(ctx, t, s.InternalDB().(isql.DB), 
j.ID(), expectedDiagrams, s.ClusterSettings().Version) writtenDiagram <- struct{}{} <-continueCh if err := execCfg.JobRegistry.CheckPausepoint("fakeresumer.pause"); err != nil { diff --git a/pkg/upgrade/upgrademanager/manager_external_test.go b/pkg/upgrade/upgrademanager/manager_external_test.go index cdf0888572e5..afceddace324 100644 --- a/pkg/upgrade/upgrademanager/manager_external_test.go +++ b/pkg/upgrade/upgrademanager/manager_external_test.go @@ -165,7 +165,7 @@ func TestAlreadyRunningJobsAreHandledProperly(t *testing.T) { RETURNING id;`, firstID).Scan(&secondID)) // Insert the job payload and progress into the `system.job_info` table. err := tc.Server(0).InternalDB().(isql.DB).Txn(ctx, func(ctx context.Context, txn isql.Txn) error { - infoStorage := jobs.InfoStorageForJob(txn, secondID) + infoStorage := jobs.InfoStorageForJob(txn, secondID, tc.Server(0).ClusterSettings().Version) if err := infoStorage.WriteLegacyPayload(ctx, firstPayload); err != nil { return err } diff --git a/pkg/util/bulk/BUILD.bazel b/pkg/util/bulk/BUILD.bazel index d568d5f73895..de0cd7287b6c 100644 --- a/pkg/util/bulk/BUILD.bazel +++ b/pkg/util/bulk/BUILD.bazel @@ -11,6 +11,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/base", + "//pkg/clusterversion", "//pkg/jobs", "//pkg/jobs/jobspb", "//pkg/sql/execinfrapb", diff --git a/pkg/util/bulk/aggregator_stats.go b/pkg/util/bulk/aggregator_stats.go index 3d16b2690245..54079c10d269 100644 --- a/pkg/util/bulk/aggregator_stats.go +++ b/pkg/util/bulk/aggregator_stats.go @@ -16,6 +16,7 @@ import ( "fmt" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" @@ -85,6 +86,7 @@ func FlushTracingAggregatorStats( jobID jobspb.JobID, db isql.DB, perNodeAggregatorStats ComponentAggregatorStats, + cv clusterversion.Handle, ) error { return db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { clusterWideAggregatorStats := make(map[string]TracingAggregatorEvent) @@ -108,7 +110,7 @@ func FlushTracingAggregatorStats( continue } - if err := jobs.WriteProtobinExecutionDetailFile(ctx, filename, msg, txn, jobID); err != nil { + if err := jobs.WriteProtobinExecutionDetailFile(ctx, filename, msg, txn, jobID, cv); err != nil { return err } @@ -135,6 +137,6 @@ func FlushTracingAggregatorStats( } filename := fmt.Sprintf("aggregatorstats.%s.txt", asOf) - return jobs.WriteExecutionDetailFile(ctx, filename, clusterWideSummary.Bytes(), txn, jobID) + return jobs.WriteExecutionDetailFile(ctx, filename, clusterWideSummary.Bytes(), txn, jobID, cv) }) } From fe6c40e23ae8670b90a62fb235fbef81e1cd0298 Mon Sep 17 00:00:00 2001 From: David Taylor Date: Mon, 6 Nov 2023 14:18:36 +0000 Subject: [PATCH 2/5] jobs: only force jobs.MaybeGenerateForcedRetryableError in 23.1 Release note (bug fix): fixed a bug that could cause 23.1 nodes in clusters which had not finalized the 23.1 version upgrade to use excessive CPU retrying expected errors related to the incomplete upgrade state. 
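For readers skimming the series, here is a condensed, illustrative sketch of the gate this commit introduces; it reuses identifiers from the diff below, but the function name and the error-predicate parameter are simplified stand-ins for the real MaybeGenerateForcedRetryableError and its isJob*DoesNotExistError helpers:

    package sketch

    import (
        "context"

        "github.com/cockroachdb/cockroach/pkg/clusterversion"
        "github.com/cockroachdb/cockroach/pkg/kv"
    )

    // maybeForceRetry only converts err into a forced txn retry once the 23.1
    // gate is active; before finalization the retry could never succeed, so the
    // original error is surfaced instead of retrying (and burning CPU) forever.
    func maybeForceRetry(
        ctx context.Context,
        txn *kv.Txn,
        err error,
        cv clusterversion.Handle,
        isMissingSchemaErr func(error) bool,
    ) error {
        if err == nil || !cv.IsActive(ctx, clusterversion.V23_1) {
            return err
        }
        if isMissingSchemaErr(err) {
            return txn.GenerateForcedRetryableErr(ctx,
                "synthetic error to push the txn past the incomplete upgrade")
        }
        return err
    }
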
--- pkg/jobs/job_info_storage.go | 2 +- pkg/jobs/utils.go | 12 +++++++++--- pkg/sql/crdb_internal.go | 4 ++-- 3 files changed, 12 insertions(+), 6 deletions(-) diff --git a/pkg/jobs/job_info_storage.go b/pkg/jobs/job_info_storage.go index c249d9d95754..c91199484596 100644 --- a/pkg/jobs/job_info_storage.go +++ b/pkg/jobs/job_info_storage.go @@ -238,7 +238,7 @@ func (i InfoStorage) Write(ctx context.Context, infoKey string, value []byte) er return errors.AssertionFailedf("missing value (infoKey %q)", infoKey) } if err := i.write(ctx, infoKey, value); err != nil { - return MaybeGenerateForcedRetryableError(ctx, i.txn.KV(), err) + return MaybeGenerateForcedRetryableError(ctx, i.txn.KV(), err, i.cv) } return nil } diff --git a/pkg/jobs/utils.go b/pkg/jobs/utils.go index e9479baad86e..788a9d22e696 100644 --- a/pkg/jobs/utils.go +++ b/pkg/jobs/utils.go @@ -210,12 +210,18 @@ func isJobInfoTableDoesNotExistError(err error) bool { // txn is pushed to a higher timestamp at which the upgrade will have completed // and the table/column will be visible. The longer term fix is being tracked in // https://github.com/cockroachdb/cockroach/issues/106764. -func MaybeGenerateForcedRetryableError(ctx context.Context, txn *kv.Txn, err error) error { - if err != nil && isJobTypeColumnDoesNotExistError(err) { +func MaybeGenerateForcedRetryableError( + ctx context.Context, txn *kv.Txn, err error, cv clusterversion.Handle, +) error { + if err == nil || !cv.IsActive(ctx, clusterversion.V23_1) { + return err + } + + if isJobTypeColumnDoesNotExistError(err) { return txn.GenerateForcedRetryableErr(ctx, "synthetic error "+ "to push timestamp to after the `job_type` upgrade has run") } - if err != nil && isJobInfoTableDoesNotExistError(err) { + if isJobInfoTableDoesNotExistError(err) { return txn.GenerateForcedRetryableErr(ctx, "synthetic error "+ "to push timestamp to after the `job_info` upgrade has run") } diff --git a/pkg/sql/crdb_internal.go b/pkg/sql/crdb_internal.go index 7718ee51b718..a7e1ca9da5b3 100644 --- a/pkg/sql/crdb_internal.go +++ b/pkg/sql/crdb_internal.go @@ -1081,7 +1081,7 @@ func populateSystemJobsTableRows( params..., ) if err != nil { - return matched, jobs.MaybeGenerateForcedRetryableError(ctx, p.Txn(), err) + return matched, jobs.MaybeGenerateForcedRetryableError(ctx, p.Txn(), err, p.execCfg.Settings.Version) } cleanup := func(ctx context.Context) { @@ -1094,7 +1094,7 @@ func populateSystemJobsTableRows( for { hasNext, err := it.Next(ctx) if !hasNext || err != nil { - return matched, jobs.MaybeGenerateForcedRetryableError(ctx, p.Txn(), err) + return matched, jobs.MaybeGenerateForcedRetryableError(ctx, p.Txn(), err, p.execCfg.Settings.Version) } currentRow := it.Cur() From 308b2472abbcbd2d9f500292feb3e9e0a3086261 Mon Sep 17 00:00:00 2001 From: David Taylor Date: Mon, 6 Nov 2023 14:20:46 +0000 Subject: [PATCH 3/5] jobs: only store 23.1 debugging info after 23.1 upgrade Release note: none. Epic: none. 
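Roughly, the guard added to the profiler writers has the shape sketched here; the wrapper function is a hypothetical illustration, while the diff below places the check directly at the top of StorePlanDiagram and StorePerNodeProcessorProgressFraction:

    package sketch

    import (
        "context"

        "github.com/cockroachdb/cockroach/pkg/clusterversion"
    )

    // writeIfSupported skips best-effort debugging writes (plan diagrams,
    // per-node progress fractions) on clusters that have not finalized 23.1,
    // since the job_info storage they rely on is only guaranteed to exist on
    // every node after the upgrade completes.
    func writeIfSupported(
        ctx context.Context, cv clusterversion.Handle, write func(context.Context) error,
    ) error {
        if !cv.IsActive(ctx, clusterversion.V23_1) {
            return nil
        }
        return write(ctx)
    }
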
--- pkg/jobs/jobsprofiler/profiler.go | 7 +++++++ pkg/server/autoconfig/task_markers.go | 12 ++++++++++-- pkg/server/job_profiler.go | 6 +++++- 3 files changed, 22 insertions(+), 3 deletions(-) diff --git a/pkg/jobs/jobsprofiler/profiler.go b/pkg/jobs/jobsprofiler/profiler.go index 4bd4ec21b1e3..5353c01b9584 100644 --- a/pkg/jobs/jobsprofiler/profiler.go +++ b/pkg/jobs/jobsprofiler/profiler.go @@ -37,6 +37,10 @@ func StorePlanDiagram( jobID jobspb.JobID, cv clusterversion.Handle, ) { + if !cv.IsActive(ctx, clusterversion.V23_1) { + return + } + if err := stopper.RunAsyncTask(ctx, "jobs-store-plan-diagram", func(ctx context.Context) { var cancel func() ctx, cancel = stopper.WithCancelOnQuiesce(ctx) @@ -78,6 +82,9 @@ func StorePerNodeProcessorProgressFraction( perComponentProgress map[execinfrapb.ComponentID]float32, cv clusterversion.Handle, ) { + if !cv.IsActive(ctx, clusterversion.V23_1) { + return + } if err := db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { infoStorage := jobs.InfoStorageForJob(txn, jobID, cv) for componentID, fraction := range perComponentProgress { diff --git a/pkg/server/autoconfig/task_markers.go b/pkg/server/autoconfig/task_markers.go index 17071b42b255..b7f3d62178de 100644 --- a/pkg/server/autoconfig/task_markers.go +++ b/pkg/server/autoconfig/task_markers.go @@ -142,7 +142,11 @@ func (tr *InfoKeyTaskRef) decodeInternal(prefix, infoKey string) error { // writeStartMarker writes a start marker for the given task ID and // also writes its job ID into the value part. func writeStartMarker( - ctx context.Context, txn isql.Txn, taskRef InfoKeyTaskRef, jobID jobspb.JobID, cv clusterversion.Handle, + ctx context.Context, + txn isql.Txn, + taskRef InfoKeyTaskRef, + jobID jobspb.JobID, + cv clusterversion.Handle, ) error { infoStorage := jobs.InfoStorageForJob(txn, jobs.AutoConfigRunnerJobID, cv) return infoStorage.Write(ctx, @@ -209,7 +213,11 @@ func getLastCompletedTaskID( // markTaskCompletes transactionally removes the task's start marker // and creates a completion marker. func markTaskComplete( - ctx context.Context, txn isql.Txn, taskRef InfoKeyTaskRef, completionValue []byte, cv clusterversion.Handle, + ctx context.Context, + txn isql.Txn, + taskRef InfoKeyTaskRef, + completionValue []byte, + cv clusterversion.Handle, ) error { infoStorage := jobs.InfoStorageForJob(txn, jobs.AutoConfigRunnerJobID, cv) diff --git a/pkg/server/job_profiler.go b/pkg/server/job_profiler.go index 6c4daeb0f43a..296d5ebb1b63 100644 --- a/pkg/server/job_profiler.go +++ b/pkg/server/job_profiler.go @@ -116,7 +116,11 @@ type executionDetailsBuilder struct { // makeJobProfilerExecutionDetailsBuilder returns an instance of an executionDetailsBuilder. func makeJobProfilerExecutionDetailsBuilder( - srv serverpb.SQLStatusServer, db isql.DB, jobID jobspb.JobID, registry *jobs.Registry, cv clusterversion.Handle, + srv serverpb.SQLStatusServer, + db isql.DB, + jobID jobspb.JobID, + registry *jobs.Registry, + cv clusterversion.Handle, ) executionDetailsBuilder { e := executionDetailsBuilder{ srv: srv, db: db, jobID: jobID, registry: registry, cv: cv, From a65ede5ebdedaefbe18f88537aa9ca656af7f973 Mon Sep 17 00:00:00 2001 From: Radu Berinde Date: Mon, 6 Nov 2023 11:02:06 -0800 Subject: [PATCH 4/5] build: add BranchReleaseSeries This change adds `build.BranchReleaseSeries()` which returns the major and minor in `version.txt`. This will be used to know the current release series when the latest `clusterversion` is not finalized. 
We also clean up the code a bit: we separate the variables that are overridden by Bazel, and we use a different variable for the testing override (to make things more clear). Epic: none Release note: None --- pkg/build/info.go | 72 ++++++++++++++++++++++++++++-------------- pkg/build/info_test.go | 5 +-- 2 files changed, 52 insertions(+), 25 deletions(-) diff --git a/pkg/build/info.go b/pkg/build/info.go index 8d0a7cffc9b7..7fb99d680e41 100644 --- a/pkg/build/info.go +++ b/pkg/build/info.go @@ -28,24 +28,34 @@ import ( // with the string passed to the linker in the root Makefile. const TimeFormat = "2006/01/02 15:04:05" +// These variables are initialized by Bazel via the linker -X flag +// when compiling release binaries. var ( - // These variables are initialized by Bazel via the linker -X flag - // when compiling release binaries. utcTime string // Build time in UTC (year/month/day hour:min:sec) rev string // SHA-1 of this build (git rev-parse) buildTagOverride string - cgoCompiler = cgoVersion() cgoTargetTriple string - platform = fmt.Sprintf("%s %s", runtime.GOOS, runtime.GOARCH) - // Distribution is changed by the CCL init-time hook in non-APL builds. - Distribution = "OSS" - typ string // Type of this build: , "development", or "release" - channel string + typ string // Type of this build: , "development", or "release" + channel string +) + +// Distribution is changed by the CCL init-time hook in non-APL builds. +var Distribution = "OSS" + +var ( + cgoCompiler = cgoVersion() + platform = fmt.Sprintf("%s %s", runtime.GOOS, runtime.GOARCH) envChannel = envutil.EnvOrDefaultString("COCKROACH_CHANNEL", "unknown") enabledAssertions = buildutil.CrdbTestBuild +) + +var ( //go:embed version.txt - cockroachVersion string - binaryVersion = computeBinaryVersion(cockroachVersion, rev) + versionTxt string + parsedVersionTxt *version.Version = parseCockroachVersion(versionTxt) + binaryVersion string = computeBinaryVersion(buildTagOverride, parsedVersionTxt, rev) + // binaryVersionTestingOverride is modified by TestingOverrideVersion. + binaryVersionTestingOverride string ) const ( @@ -64,38 +74,54 @@ func SeemsOfficial() bool { return channel == DefaultTelemetryChannel || channel == FIPSTelemetryChannel } -func computeBinaryVersion(versionTxt, revision string) string { - if buildTagOverride != "" { - return buildTagOverride - } +func parseCockroachVersion(versionTxt string) *version.Version { txt := strings.TrimSuffix(versionTxt, "\n") v, err := version.Parse(txt) if err != nil { panic(fmt.Errorf("could not parse version.txt: %w", err)) } + return v +} + +func computeBinaryVersion( + buildTagOverride string, parsedVersionTxt *version.Version, revision string, +) string { + if buildTagOverride != "" { + return buildTagOverride + } if IsRelease() { - return v.String() + return parsedVersionTxt.String() } if revision != "" { - return fmt.Sprintf("%s-dev-%s", v.String(), revision) + return fmt.Sprintf("%s-dev-%s", parsedVersionTxt.String(), revision) } - return fmt.Sprintf("%s-dev", v.String()) + return fmt.Sprintf("%s-dev", parsedVersionTxt.String()) } -// BinaryVersion returns the version prefix, patch number and metadata of the current build. +// BinaryVersion returns the version prefix, patch number and metadata of the +// current build. func BinaryVersion() string { + if binaryVersionTestingOverride != "" { + return binaryVersionTestingOverride + } return binaryVersion } // BinaryVersionPrefix returns the version prefix of the current build. 
 func BinaryVersionPrefix() string {
-	v, err := version.Parse(binaryVersion)
+	v, err := version.Parse(BinaryVersion())
 	if err != nil {
 		return "dev"
 	}
 	return fmt.Sprintf("v%d.%d", v.Major(), v.Minor())
 }
 
+// BranchReleaseSeries returns the major and minor in version.txt, without
+// allowing for any overrides.
+func BranchReleaseSeries() (major, minor int) {
+	return parsedVersionTxt.Major(), parsedVersionTxt.Minor()
+}
+
 func init() {
 	// Allow tests to override the version.txt contents.
 	if versionOverride := envutil.EnvOrDefaultString(
@@ -161,7 +187,7 @@ func GetInfo() Info {
 	}
 	return Info{
 		GoVersion:       runtime.Version(),
-		Tag:             binaryVersion,
+		Tag:             BinaryVersion(),
 		Time:            utcTime,
 		Revision:        rev,
 		CgoCompiler:     cgoCompiler,
@@ -178,9 +204,9 @@ func GetInfo() Info {
 // TestingOverrideVersion allows tests to override the binary version
 // reported by cockroach.
 func TestingOverrideVersion(v string) func() {
-	prevBinaryVersion := binaryVersion
-	binaryVersion = v
-	return func() { binaryVersion = prevBinaryVersion }
+	prevOverride := binaryVersionTestingOverride
+	binaryVersionTestingOverride = v
+	return func() { binaryVersionTestingOverride = prevOverride }
 }
 
 // MakeIssueURL produces a URL to a CockroachDB issue.
diff --git a/pkg/build/info_test.go b/pkg/build/info_test.go
index d13bdec25193..c25fd57250f0 100644
--- a/pkg/build/info_test.go
+++ b/pkg/build/info_test.go
@@ -74,9 +74,10 @@ func TestComputeBinaryVersion(t *testing.T) {
 			defer func() { typ = oldBuildType }()
 
 			if tc.panicExpected {
-				require.Panics(t, func() { computeBinaryVersion(tc.versionTxt, tc.revision) })
+				require.Panics(t, func() { parseCockroachVersion(tc.versionTxt) })
 			} else {
-				actualVersion := computeBinaryVersion(tc.versionTxt, tc.revision)
+				v := parseCockroachVersion(tc.versionTxt)
+				actualVersion := computeBinaryVersion("" /* buildTagOverride */, v, tc.revision)
 				require.Equal(t, tc.expectedVersion, actualVersion)
 			}
 		})

From 6c597f698f419416fc3059faa7fadec3ab1df6f5 Mon Sep 17 00:00:00 2001
From: Andrii Vorobiov
Date: Tue, 24 Oct 2023 13:51:15 +0300
Subject: [PATCH 5/5] ui: enable Forward button to set timewindow for latest
 NOW value

Before, the "Forward" button on the time selector component only allowed
selecting the next time window when there was room for a full increment
(i.e. with a 10 min time window, it wasn't possible to move forward once the
current end time was within 10 minutes of Now()). This caused a misalignment
where the Forward button became disabled even though there was more data to
display. This change handles that case by updating the current time window to
the current time (executing the same logic as the Now button).

Resolves: #112847

Release note (ui change): The Forward button on the time selector now allows
selecting the latest available time window (the same as the "Now" button).

Release justification: low risk, high benefit changes to existing
functionality
---
 .../src/timeScaleDropdown/timeFrameControls.tsx      | 9 +++++----
 .../src/timeScaleDropdown/timeScaleDropdown.spec.tsx | 2 --
 2 files changed, 5 insertions(+), 6 deletions(-)

diff --git a/pkg/ui/workspaces/cluster-ui/src/timeScaleDropdown/timeFrameControls.tsx b/pkg/ui/workspaces/cluster-ui/src/timeScaleDropdown/timeFrameControls.tsx
index 390926710de9..a6b335bd69a0 100644
--- a/pkg/ui/workspaces/cluster-ui/src/timeScaleDropdown/timeFrameControls.tsx
+++ b/pkg/ui/workspaces/cluster-ui/src/timeScaleDropdown/timeFrameControls.tsx
@@ -38,7 +38,7 @@ export const TimeFrameControls = ({
     onArrowClick(direction);
 
   const left = disabledArrows.includes(ArrowDirection.LEFT);
-  const right = disabledArrows.includes(ArrowDirection.RIGHT);
+  const canForward = !disabledArrows.includes(ArrowDirection.RIGHT);
   const delay = 0.3;
 
   return (
@@ -66,9 +66,10 @@ export const TimeFrameControls = ({
         mouseLeaveDelay={delay}
       >