diff --git a/pkg/build/info.go b/pkg/build/info.go
index 8d0a7cffc9b7..7fb99d680e41 100644
--- a/pkg/build/info.go
+++ b/pkg/build/info.go
@@ -28,24 +28,34 @@ import (
 // with the string passed to the linker in the root Makefile.
 const TimeFormat = "2006/01/02 15:04:05"
 
+// These variables are initialized by Bazel via the linker -X flag
+// when compiling release binaries.
 var (
-	// These variables are initialized by Bazel via the linker -X flag
-	// when compiling release binaries.
 	utcTime          string // Build time in UTC (year/month/day hour:min:sec)
 	rev              string // SHA-1 of this build (git rev-parse)
 	buildTagOverride string
-	cgoCompiler      = cgoVersion()
 	cgoTargetTriple  string
-	platform         = fmt.Sprintf("%s %s", runtime.GOOS, runtime.GOARCH)
-	// Distribution is changed by the CCL init-time hook in non-APL builds.
-	Distribution = "OSS"
-	typ          string // Type of this build: <empty>, "development", or "release"
-	channel      string
+	typ              string // Type of this build: <empty>, "development", or "release"
+	channel          string
+)
+
+// Distribution is changed by the CCL init-time hook in non-APL builds.
+var Distribution = "OSS"
+
+var (
+	cgoCompiler       = cgoVersion()
+	platform          = fmt.Sprintf("%s %s", runtime.GOOS, runtime.GOARCH)
 	envChannel        = envutil.EnvOrDefaultString("COCKROACH_CHANNEL", "unknown")
 	enabledAssertions = buildutil.CrdbTestBuild
+)
+
+var (
 	//go:embed version.txt
-	cockroachVersion string
-	binaryVersion    = computeBinaryVersion(cockroachVersion, rev)
+	versionTxt       string
+	parsedVersionTxt *version.Version = parseCockroachVersion(versionTxt)
+	binaryVersion    string           = computeBinaryVersion(buildTagOverride, parsedVersionTxt, rev)
+	// binaryVersionTestingOverride is modified by TestingOverrideVersion.
+	binaryVersionTestingOverride string
 )
 
 const (
@@ -64,38 +74,54 @@ func SeemsOfficial() bool {
 	return channel == DefaultTelemetryChannel || channel == FIPSTelemetryChannel
 }
 
-func computeBinaryVersion(versionTxt, revision string) string {
-	if buildTagOverride != "" {
-		return buildTagOverride
-	}
+func parseCockroachVersion(versionTxt string) *version.Version {
 	txt := strings.TrimSuffix(versionTxt, "\n")
 	v, err := version.Parse(txt)
 	if err != nil {
 		panic(fmt.Errorf("could not parse version.txt: %w", err))
 	}
+	return v
+}
+
+func computeBinaryVersion(
+	buildTagOverride string, parsedVersionTxt *version.Version, revision string,
+) string {
+	if buildTagOverride != "" {
+		return buildTagOverride
+	}
 	if IsRelease() {
-		return v.String()
+		return parsedVersionTxt.String()
 	}
 	if revision != "" {
-		return fmt.Sprintf("%s-dev-%s", v.String(), revision)
+		return fmt.Sprintf("%s-dev-%s", parsedVersionTxt.String(), revision)
 	}
-	return fmt.Sprintf("%s-dev", v.String())
+	return fmt.Sprintf("%s-dev", parsedVersionTxt.String())
 }
 
-// BinaryVersion returns the version prefix, patch number and metadata of the current build.
+// BinaryVersion returns the version prefix, patch number and metadata of the
+// current build.
 func BinaryVersion() string {
+	if binaryVersionTestingOverride != "" {
+		return binaryVersionTestingOverride
+	}
 	return binaryVersion
 }
 
 // BinaryVersionPrefix returns the version prefix of the current build.
 func BinaryVersionPrefix() string {
-	v, err := version.Parse(binaryVersion)
+	v, err := version.Parse(BinaryVersion())
 	if err != nil {
 		return "dev"
 	}
 	return fmt.Sprintf("v%d.%d", v.Major(), v.Minor())
 }
 
+// BranchReleaseSeries returns the major and minor in version.txt, without
+// allowing for any overrides.
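+// For example, with a version.txt of "v23.2.0-alpha.1" it returns (23, 2),
+// even while a testing override of the binary version is in effect.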
+func BranchReleaseSeries() (major, minor int) {
+	return parsedVersionTxt.Major(), parsedVersionTxt.Minor()
+}
+
 func init() {
 	// Allow tests to override the version.txt contents.
 	if versionOverride := envutil.EnvOrDefaultString(
@@ -161,7 +187,7 @@ func GetInfo() Info {
 	}
 	return Info{
 		GoVersion:       runtime.Version(),
-		Tag:             binaryVersion,
+		Tag:             BinaryVersion(),
 		Time:            utcTime,
 		Revision:        rev,
 		CgoCompiler:     cgoCompiler,
@@ -178,9 +204,9 @@ func GetInfo() Info {
 // TestingOverrideVersion allows tests to override the binary version
 // reported by cockroach.
 func TestingOverrideVersion(v string) func() {
-	prevBinaryVersion := binaryVersion
-	binaryVersion = v
-	return func() { binaryVersion = prevBinaryVersion }
+	prevOverride := binaryVersionTestingOverride
+	binaryVersionTestingOverride = v
+	return func() { binaryVersionTestingOverride = prevOverride }
 }
 
 // MakeIssueURL produces a URL to a CockroachDB issue.
diff --git a/pkg/build/info_test.go b/pkg/build/info_test.go
index d13bdec25193..c25fd57250f0 100644
--- a/pkg/build/info_test.go
+++ b/pkg/build/info_test.go
@@ -74,9 +74,10 @@ func TestComputeBinaryVersion(t *testing.T) {
 			defer func() { typ = oldBuildType }()
 
 			if tc.panicExpected {
-				require.Panics(t, func() { computeBinaryVersion(tc.versionTxt, tc.revision) })
+				require.Panics(t, func() { parseCockroachVersion(tc.versionTxt) })
 			} else {
-				actualVersion := computeBinaryVersion(tc.versionTxt, tc.revision)
+				v := parseCockroachVersion(tc.versionTxt)
+				actualVersion := computeBinaryVersion("" /* buildTagOverride */, v, tc.revision)
 				require.Equal(t, tc.expectedVersion, actualVersion)
 			}
 		})
diff --git a/pkg/ccl/backupccl/backup_job.go b/pkg/ccl/backupccl/backup_job.go
index 2830c0cd898b..704b4c008f97 100644
--- a/pkg/ccl/backupccl/backup_job.go
+++ b/pkg/ccl/backupccl/backup_job.go
@@ -262,7 +262,7 @@ func backup(
 				return nil
 			}
 			jobsprofiler.StorePerNodeProcessorProgressFraction(
-				ctx, execCtx.ExecCfg().InternalDB, job.ID(), prog)
+				ctx, execCtx.ExecCfg().InternalDB, job.ID(), prog, execCtx.ExecCfg().Settings.Version)
 		case <-ctx.Done():
 			return ctx.Err()
 		}
@@ -2095,7 +2095,7 @@ func (b *backupResumer) CollectProfile(ctx context.Context, execCtx interface{})
 		aggStatsCopy = b.mu.perNodeAggregatorStats.DeepCopy()
 	}()
 	return bulkutil.FlushTracingAggregatorStats(ctx, b.job.ID(),
-		p.ExecCfg().InternalDB, aggStatsCopy)
+		p.ExecCfg().InternalDB, aggStatsCopy, p.ExecCfg().Settings.Version)
 }
 
 func (b *backupResumer) deleteCheckpoint(
diff --git a/pkg/ccl/backupccl/backup_processor_planning.go b/pkg/ccl/backupccl/backup_processor_planning.go
index 71ea019d579d..7ecbfedf4488 100644
--- a/pkg/ccl/backupccl/backup_processor_planning.go
+++ b/pkg/ccl/backupccl/backup_processor_planning.go
@@ -219,7 +219,7 @@ func distBackup(
 	defer close(progCh)
 	defer close(tracingAggCh)
 	execCfg := execCtx.ExecCfg()
-	jobsprofiler.StorePlanDiagram(ctx, execCfg.DistSQLSrv.Stopper, p, execCfg.InternalDB, jobID)
+	jobsprofiler.StorePlanDiagram(ctx, execCfg.DistSQLSrv.Stopper, p, execCfg.InternalDB, jobID, execCfg.Settings.Version)
 
 	// Copy the evalCtx, as dsp.Run() might change it.
 	evalCtxCopy := *evalCtx
diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go
index bdf7d985858d..1a07c4252260 100644
--- a/pkg/ccl/backupccl/restore_job.go
+++ b/pkg/ccl/backupccl/restore_job.go
@@ -2544,7 +2544,7 @@ func (r *restoreResumer) CollectProfile(ctx context.Context, execCtx interface{}
 		aggStatsCopy = r.mu.perNodeAggregatorStats.DeepCopy()
 	}()
 	return bulkutil.FlushTracingAggregatorStats(ctx, r.job.ID(),
-		p.ExecCfg().InternalDB, aggStatsCopy)
+		p.ExecCfg().InternalDB, aggStatsCopy, p.ExecCfg().Settings.Version)
 }
 
 // dropDescriptors implements the OnFailOrCancel logic.
diff --git a/pkg/ccl/backupccl/restore_processor_planning.go b/pkg/ccl/backupccl/restore_processor_planning.go
index 58a24e1c39db..0f8d8dac1483 100644
--- a/pkg/ccl/backupccl/restore_processor_planning.go
+++ b/pkg/ccl/backupccl/restore_processor_planning.go
@@ -317,7 +317,7 @@ func distRestore(
 		defer recv.Release()
 
 		execCfg := execCtx.ExecCfg()
-		jobsprofiler.StorePlanDiagram(ctx, execCfg.DistSQLSrv.Stopper, p, execCfg.InternalDB, md.jobID)
+		jobsprofiler.StorePlanDiagram(ctx, execCfg.DistSQLSrv.Stopper, p, execCfg.InternalDB, md.jobID, execCfg.Settings.Version)
 
 		// Copy the evalCtx, as dsp.Run() might change it.
 		evalCtxCopy := *evalCtx
diff --git a/pkg/ccl/changefeedccl/changefeed_dist.go b/pkg/ccl/changefeedccl/changefeed_dist.go
index 3f918fabc3d2..471383efe1ca 100644
--- a/pkg/ccl/changefeedccl/changefeed_dist.go
+++ b/pkg/ccl/changefeedccl/changefeed_dist.go
@@ -303,7 +303,7 @@ func startDistChangefeed(
 			finishedSetupFn = func(flowinfra.Flow) { resultsCh <- tree.Datums(nil) }
 		}
 
-		jobsprofiler.StorePlanDiagram(ctx, execCfg.DistSQLSrv.Stopper, p, execCfg.InternalDB, jobID)
+		jobsprofiler.StorePlanDiagram(ctx, execCfg.DistSQLSrv.Stopper, p, execCfg.InternalDB, jobID, execCfg.Settings.Version)
 
 		// Copy the evalCtx, as dsp.Run() might change it.
 		evalCtxCopy := *evalCtx
diff --git a/pkg/ccl/streamingccl/replicationutils/BUILD.bazel b/pkg/ccl/streamingccl/replicationutils/BUILD.bazel
index b76b53dca6c8..1c448dcb0786 100644
--- a/pkg/ccl/streamingccl/replicationutils/BUILD.bazel
+++ b/pkg/ccl/streamingccl/replicationutils/BUILD.bazel
@@ -6,6 +6,7 @@ go_library(
     importpath = "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationutils",
     visibility = ["//visibility:public"],
     deps = [
+        "//pkg/clusterversion",
        "//pkg/jobs",
         "//pkg/jobs/jobspb",
         "//pkg/kv/kvpb",
diff --git a/pkg/ccl/streamingccl/replicationutils/utils.go b/pkg/ccl/streamingccl/replicationutils/utils.go
index 49d0a3393db6..f743b9a27115 100644
--- a/pkg/ccl/streamingccl/replicationutils/utils.go
+++ b/pkg/ccl/streamingccl/replicationutils/utils.go
@@ -14,6 +14,7 @@ import (
 	"fmt"
 	"testing"
 
+	"github.com/cockroachdb/cockroach/pkg/clusterversion"
 	"github.com/cockroachdb/cockroach/pkg/jobs"
 	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
@@ -174,9 +175,9 @@ func ReplicatedTimeFromProgress(p *jobspb.Progress) hlc.Timestamp {
 
 // LoadIngestionProgress loads the latest persisted stream ingestion progress.
 // The method returns nil if the progress does not exist yet.
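+// The clusterversion.Handle is threaded down to the job_info storage layer,
+// which uses it to cope with clusters that have not yet run the relevant
+// upgrade migrations.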
 func LoadIngestionProgress(
-	ctx context.Context, db isql.DB, jobID jobspb.JobID,
+	ctx context.Context, db isql.DB, jobID jobspb.JobID, cv clusterversion.Handle,
 ) (*jobspb.StreamIngestionProgress, error) {
-	progress, err := jobs.LoadJobProgress(ctx, db, jobID)
+	progress, err := jobs.LoadJobProgress(ctx, db, jobID, cv)
 	if err != nil || progress == nil {
 		return nil, err
 	}
@@ -192,9 +193,9 @@ func LoadIngestionProgress(
 // LoadReplicationProgress loads the latest persisted stream replication progress.
 // The method returns nil if the progress does not exist yet.
 func LoadReplicationProgress(
-	ctx context.Context, db isql.DB, jobID jobspb.JobID,
+	ctx context.Context, db isql.DB, jobID jobspb.JobID, cv clusterversion.Handle,
 ) (*jobspb.StreamReplicationProgress, error) {
-	progress, err := jobs.LoadJobProgress(ctx, db, jobID)
+	progress, err := jobs.LoadJobProgress(ctx, db, jobID, cv)
 	if err != nil || progress == nil {
 		return nil, err
 	}
diff --git a/pkg/ccl/streamingccl/streamingest/BUILD.bazel b/pkg/ccl/streamingccl/streamingest/BUILD.bazel
index 6dcc2ec085f6..8573320823ba 100644
--- a/pkg/ccl/streamingccl/streamingest/BUILD.bazel
+++ b/pkg/ccl/streamingccl/streamingest/BUILD.bazel
@@ -28,6 +28,7 @@ go_library(
         "//pkg/cloud",
         "//pkg/cloud/externalconn",
         "//pkg/cloud/externalconn/connectionpb",
+        "//pkg/clusterversion",
         "//pkg/jobs",
         "//pkg/jobs/jobspb",
         "//pkg/jobs/jobsprofiler",
diff --git a/pkg/ccl/streamingccl/streamingest/replication_execution_details.go b/pkg/ccl/streamingccl/streamingest/replication_execution_details.go
index 3a6dda6e13c6..a8d6b6806b4b 100644
--- a/pkg/ccl/streamingccl/streamingest/replication_execution_details.go
+++ b/pkg/ccl/streamingccl/streamingest/replication_execution_details.go
@@ -16,6 +16,7 @@ import (
 	"text/tabwriter"
 
 	"github.com/cockroachdb/cockroach/pkg/base"
+	"github.com/cockroachdb/cockroach/pkg/clusterversion"
 	"github.com/cockroachdb/cockroach/pkg/jobs"
 	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -104,7 +105,11 @@ func constructSpanFrontierExecutionDetails(
 // - The snapshot of the frontier tracking how far each span has been replicated
 // up to.
 func generateSpanFrontierExecutionDetailFile(
-	ctx context.Context, execCfg *sql.ExecutorConfig, ingestionJobID jobspb.JobID, skipBehindBy bool,
+	ctx context.Context,
+	execCfg *sql.ExecutorConfig,
+	ingestionJobID jobspb.JobID,
+	skipBehindBy bool,
+	cv clusterversion.Handle,
 ) error {
 	return execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
 		var sb bytes.Buffer
@@ -112,7 +117,7 @@ func generateSpanFrontierExecutionDetailFile(
 		// Read the StreamIngestionPartitionSpecs to get a mapping from spans to
 		// their source and destination SQL instance IDs.
-		specs, err := jobs.ReadChunkedFileToJobInfo(ctx, replicationPartitionInfoFilename, txn, ingestionJobID)
+		specs, err := jobs.ReadChunkedFileToJobInfo(ctx, replicationPartitionInfoFilename, txn, ingestionJobID, cv)
 		if err != nil {
 			return err
 		}
@@ -124,7 +129,7 @@ func generateSpanFrontierExecutionDetailFile(
 		// Now, read the latest snapshot of the frontier that tells us what
 		// timestamp each span has been replicated up to.
-		frontierEntries, err := jobs.ReadChunkedFileToJobInfo(ctx, frontierEntriesFilename, txn, ingestionJobID)
+		frontierEntries, err := jobs.ReadChunkedFileToJobInfo(ctx, frontierEntriesFilename, txn, ingestionJobID, cv)
 		if err != nil {
 			return err
 		}
@@ -157,7 +162,7 @@
 		if err := w.Flush(); err != nil {
 			return err
 		}
-		return jobs.WriteExecutionDetailFile(ctx, filename, sb.Bytes(), txn, ingestionJobID)
+		return jobs.WriteExecutionDetailFile(ctx, filename, sb.Bytes(), txn, ingestionJobID, cv)
 	})
 }
 
@@ -170,6 +175,7 @@ func persistStreamIngestionPartitionSpecs(
 	execCfg *sql.ExecutorConfig,
 	ingestionJobID jobspb.JobID,
 	streamIngestionSpecs map[base.SQLInstanceID]*execinfrapb.StreamIngestionDataSpec,
+	cv clusterversion.Handle,
 ) error {
 	err := execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
 		specs := make([]*execinfrapb.StreamIngestionPartitionSpec, 0)
@@ -183,7 +189,7 @@ func persistStreamIngestionPartitionSpecs(
 		if err != nil {
 			return err
 		}
-		return jobs.WriteChunkedFileToJobInfo(ctx, replicationPartitionInfoFilename, specBytes, txn, ingestionJobID)
+		return jobs.WriteChunkedFileToJobInfo(ctx, replicationPartitionInfoFilename, specBytes, txn, ingestionJobID, cv)
 	})
 	if knobs := execCfg.StreamingTestingKnobs; knobs != nil && knobs.AfterPersistingPartitionSpecs != nil {
 		knobs.AfterPersistingPartitionSpecs()
diff --git a/pkg/ccl/streamingccl/streamingest/replication_execution_details_test.go b/pkg/ccl/streamingccl/streamingest/replication_execution_details_test.go
index e1dc2a7f3dcf..6cb628b77892 100644
--- a/pkg/ccl/streamingccl/streamingest/replication_execution_details_test.go
+++ b/pkg/ccl/streamingccl/streamingest/replication_execution_details_test.go
@@ -344,7 +344,7 @@ func TestEndToEndFrontierExecutionDetailFile(t *testing.T) {
 
 	ingestionJobID := jobspb.JobID(123)
 	require.NoError(t, persistStreamIngestionPartitionSpecs(ctx, &execCfg,
-		ingestionJobID, streamIngestionsSpecs))
+		ingestionJobID, streamIngestionsSpecs, execCfg.Settings.Version))
 
 	// Now, let's persist some frontier entries.
 	frontierEntries := execinfrapb.FrontierEntries{ResolvedSpans: []jobspb.ResolvedSpan{
@@ -369,9 +369,9 @@
 	frontierBytes, err := protoutil.Marshal(&frontierEntries)
 	require.NoError(t, err)
 	require.NoError(t, execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
-		return jobs.WriteChunkedFileToJobInfo(ctx, frontierEntriesFilename, frontierBytes, txn, ingestionJobID)
+		return jobs.WriteChunkedFileToJobInfo(ctx, frontierEntriesFilename, frontierBytes, txn, ingestionJobID, execCfg.Settings.Version)
 	}))
-	require.NoError(t, generateSpanFrontierExecutionDetailFile(ctx, &execCfg, ingestionJobID, true /* skipBehindBy */))
+	require.NoError(t, generateSpanFrontierExecutionDetailFile(ctx, &execCfg, ingestionJobID, true /* skipBehindBy */, execCfg.Settings.Version))
 	files := listExecutionDetails(t, srv, ingestionJobID)
 	require.Len(t, files, 1)
 	data, err := checkExecutionDetails(t, srv, ingestionJobID, files[0])
diff --git a/pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go b/pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go
index 92207d0eb95c..062f64968428 100644
--- a/pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go
+++ b/pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go
@@ -1114,12 +1114,14 @@ func TestLoadProducerAndIngestionProgress(t *testing.T) {
 	c.WaitUntilReplicatedTime(srcTime, jobspb.JobID(replicationJobID))
 
 	srcDB := c.SrcSysServer.ExecutorConfig().(sql.ExecutorConfig).InternalDB
-	producerProgress, err := replicationutils.LoadReplicationProgress(ctx, srcDB, jobspb.JobID(producerJobID))
+	producerProgress, err := replicationutils.LoadReplicationProgress(ctx, srcDB, jobspb.JobID(producerJobID),
+		c.SrcSysServer.ExecutorConfig().(sql.ExecutorConfig).Settings.Version)
 	require.NoError(t, err)
 	require.Equal(t, jobspb.StreamReplicationProgress_NOT_FINISHED, producerProgress.StreamIngestionStatus)
 
 	destDB := c.DestSysServer.ExecutorConfig().(sql.ExecutorConfig).InternalDB
-	ingestionProgress, err := replicationutils.LoadIngestionProgress(ctx, destDB, jobspb.JobID(replicationJobID))
+	ingestionProgress, err := replicationutils.LoadIngestionProgress(ctx, destDB, jobspb.JobID(replicationJobID),
+		c.DestSysServer.ExecutorConfig().(sql.ExecutorConfig).Settings.Version)
 	require.NoError(t, err)
 	require.Equal(t, jobspb.Replicating, ingestionProgress.ReplicationStatus)
 }
diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_dist.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_dist.go
index f20d9ffb6db4..f3f598a4fd90 100644
--- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_dist.go
+++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_dist.go
@@ -113,7 +113,7 @@ func startDistIngestion(
 		return errors.Wrap(err, "failed to update job progress")
 	}
 	jobsprofiler.StorePlanDiagram(ctx, execCtx.ExecCfg().DistSQLSrv.Stopper, planner.initialPlan, execCtx.ExecCfg().InternalDB,
-		ingestionJob.ID())
+		ingestionJob.ID(), execCtx.ExecCfg().Settings.Version)
 
 	replanOracle := sql.ReplanOnCustomFunc(
 		measurePlanChange,
@@ -463,7 +463,7 @@ func (p *replicationFlowPlanner) constructPlanGenerator(
 		if !p.createdInitialPlan() {
 			// Only persist the initial plan as it's the only plan that actually gets
 			// executed.
-			if err := persistStreamIngestionPartitionSpecs(ctx, execCtx.ExecCfg(), ingestionJobID, streamIngestionSpecs); err != nil {
+			if err := persistStreamIngestionPartitionSpecs(ctx, execCtx.ExecCfg(), ingestionJobID, streamIngestionSpecs, execCtx.ExecCfg().Settings.Version); err != nil {
 				return nil, nil, err
 			}
 		}
diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go
index 55c4ee245b47..a0260874fdc7 100644
--- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go
+++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go
@@ -518,7 +518,7 @@ func (sf *streamIngestionFrontier) maybePersistFrontierEntries() error {
 	}
 
 	if err = sf.FlowCtx.Cfg.DB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
-		return jobs.WriteChunkedFileToJobInfo(ctx, frontierEntriesFilename, frontierBytes, txn, jobID)
+		return jobs.WriteChunkedFileToJobInfo(ctx, frontierEntriesFilename, frontierBytes, txn, jobID, sf.EvalCtx.Settings.Version)
 	}); err != nil {
 		return err
 	}
diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
index 6e23b719969c..d8b952e1001b 100644
--- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
+++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
@@ -15,6 +15,7 @@ import (
 
 	"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationutils"
 	"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient"
+	"github.com/cockroachdb/cockroach/pkg/clusterversion"
 	"github.com/cockroachdb/cockroach/pkg/jobs"
 	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
 	"github.com/cockroachdb/cockroach/pkg/jobs/jobsprotectedts"
@@ -224,7 +225,7 @@ func ingestWithRetries(
 		}
 		status := redact.Sprintf("waiting before retrying error: %s", err)
 		updateRunningStatus(ctx, ingestionJob, jobspb.ReplicationError, status)
-		newReplicatedTime := loadReplicatedTime(ctx, execCtx.ExecCfg().InternalDB, ingestionJob)
+		newReplicatedTime := loadReplicatedTime(ctx, execCtx.ExecCfg().InternalDB, ingestionJob, execCtx.ExecCfg().Settings.Version)
 		if lastReplicatedTime.Less(newReplicatedTime) {
 			r.Reset()
 			lastReplicatedTime = newReplicatedTime
@@ -241,8 +242,10 @@
 	return nil
 }
 
-func loadReplicatedTime(ctx context.Context, db isql.DB, ingestionJob *jobs.Job) hlc.Timestamp {
-	latestProgress, err := replicationutils.LoadIngestionProgress(ctx, db, ingestionJob.ID())
+func loadReplicatedTime(
+	ctx context.Context, db isql.DB, ingestionJob *jobs.Job, cv clusterversion.Handle,
+) hlc.Timestamp {
+	latestProgress, err := replicationutils.LoadIngestionProgress(ctx, db, ingestionJob.ID(), cv)
 	if err != nil {
 		log.Warningf(ctx, "error loading job progress: %s", err)
 		return hlc.Timestamp{}
@@ -556,11 +559,11 @@ func (s *streamIngestionResumer) CollectProfile(ctx context.Context, execCtx int
 
 	var combinedErr error
 	if err := bulkutil.FlushTracingAggregatorStats(ctx, s.job.ID(),
-		p.ExecCfg().InternalDB, aggStatsCopy); err != nil {
+		p.ExecCfg().InternalDB, aggStatsCopy, p.ExecCfg().Settings.Version); err != nil {
 		combinedErr = errors.CombineErrors(combinedErr, errors.Wrap(err, "failed to flush aggregator stats"))
 	}
 	if err := generateSpanFrontierExecutionDetailFile(ctx, p.ExecCfg(),
-		s.job.ID(), false /* skipBehindBy */); err != nil {
+		s.job.ID(), false /* skipBehindBy */, p.ExecCfg().Settings.Version); err != nil {
 		combinedErr = errors.CombineErrors(combinedErr, errors.Wrap(err,
"failed to generate span frontier execution details")) } diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go index 889c320e2cc0..89f82054cb9d 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl" "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationutils" "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" @@ -318,6 +319,7 @@ func newStreamIngestionDataProcessor( cutoverProvider: &cutoverFromJobProgress{ jobID: jobspb.JobID(spec.JobID), db: flowCtx.Cfg.DB, + cv: flowCtx.Cfg.Settings.Version, }, buffer: &streamIngestionBuffer{}, cutoverCh: make(chan struct{}), @@ -1245,10 +1247,11 @@ type cutoverProvider interface { type cutoverFromJobProgress struct { db isql.DB jobID jobspb.JobID + cv clusterversion.Handle } func (c *cutoverFromJobProgress) cutoverReached(ctx context.Context) (bool, error) { - ingestionProgress, err := replicationutils.LoadIngestionProgress(ctx, c.db, c.jobID) + ingestionProgress, err := replicationutils.LoadIngestionProgress(ctx, c.db, c.jobID, c.cv) if err != nil { return false, err } diff --git a/pkg/ccl/streamingccl/streamproducer/producer_job.go b/pkg/ccl/streamingccl/streamproducer/producer_job.go index 7c95053cb438..26d7170e2193 100644 --- a/pkg/ccl/streamingccl/streamproducer/producer_job.go +++ b/pkg/ccl/streamingccl/streamproducer/producer_job.go @@ -99,7 +99,7 @@ func (p *producerJobResumer) Resume(ctx context.Context, execCtx interface{}) er case <-p.timer.Ch(): p.timer.MarkRead() p.timer.Reset(streamingccl.StreamReplicationStreamLivenessTrackFrequency.Get(execCfg.SV())) - progress, err := replicationutils.LoadReplicationProgress(ctx, execCfg.InternalDB, p.job.ID()) + progress, err := replicationutils.LoadReplicationProgress(ctx, execCfg.InternalDB, p.job.ID(), execCfg.Settings.Version) if knobs := execCfg.StreamingTestingKnobs; knobs != nil && knobs.AfterResumerJobLoad != nil { err = knobs.AfterResumerJobLoad(err) } diff --git a/pkg/jobs/adopt.go b/pkg/jobs/adopt.go index 7af146b25ade..031d00bf12cc 100644 --- a/pkg/jobs/adopt.go +++ b/pkg/jobs/adopt.go @@ -87,7 +87,7 @@ func (r *Registry) maybeDumpTrace(resumerCtx context.Context, resumer Resumer, j r.ID().String(), timeutil.Now().Format("20060102_150405.00")) td := jobspb.TraceData{CollectedSpans: sp.GetConfiguredRecording()} if err := r.db.Txn(dumpCtx, func(ctx context.Context, txn isql.Txn) error { - return WriteProtobinExecutionDetailFile(dumpCtx, resumerTraceFilename, &td, txn, jobID) + return WriteProtobinExecutionDetailFile(dumpCtx, resumerTraceFilename, &td, txn, jobID, r.settings.Version) }); err != nil { log.Warning(dumpCtx, "failed to write trace on resumer trace file") } diff --git a/pkg/jobs/execution_detail_utils.go b/pkg/jobs/execution_detail_utils.go index b1a13bb5ff81..49098bbebc5a 100644 --- a/pkg/jobs/execution_detail_utils.go +++ b/pkg/jobs/execution_detail_utils.go @@ -16,6 +16,7 @@ import ( "fmt" "strings" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler/profilerconstants" 
"github.com/cockroachdb/cockroach/pkg/sql/isql" @@ -46,14 +47,19 @@ func compressChunk(chunkBuf []byte) ([]byte, error) { // forms in the list of files displayed on the jobs' Advanced Debugging // DBConsole page. func WriteProtobinExecutionDetailFile( - ctx context.Context, filename string, msg protoutil.Message, txn isql.Txn, jobID jobspb.JobID, + ctx context.Context, + filename string, + msg protoutil.Message, + txn isql.Txn, + jobID jobspb.JobID, + cv clusterversion.Handle, ) error { name := fmt.Sprintf("%s~%s.binpb", filename, proto.MessageName(msg)) b, err := protoutil.Marshal(msg) if err != nil { return err } - return WriteExecutionDetailFile(ctx, name, b, txn, jobID) + return WriteExecutionDetailFile(ctx, name, b, txn, jobID, cv) } // WriteExecutionDetailFile will chunk and write to the job_info table under the @@ -64,10 +70,15 @@ func WriteProtobinExecutionDetailFile( // This method clears any existing file with the same filename before writing a // new one. func WriteExecutionDetailFile( - ctx context.Context, filename string, data []byte, txn isql.Txn, jobID jobspb.JobID, + ctx context.Context, + filename string, + data []byte, + txn isql.Txn, + jobID jobspb.JobID, + cv clusterversion.Handle, ) error { return WriteChunkedFileToJobInfo(ctx, - profilerconstants.MakeProfilerExecutionDetailsChunkKeyPrefix(filename), data, txn, jobID) + profilerconstants.MakeProfilerExecutionDetailsChunkKeyPrefix(filename), data, txn, jobID, cv) } // ProtobinExecutionDetailFile interface encapsulates the methods that must be @@ -98,7 +109,7 @@ func stringifyProtobinFile(filename string, fileContents []byte) ([]byte, error) // ReadExecutionDetailFile will stitch together all the chunks corresponding to the // filename and return the uncompressed data of the file. func ReadExecutionDetailFile( - ctx context.Context, filename string, txn isql.Txn, jobID jobspb.JobID, + ctx context.Context, filename string, txn isql.Txn, jobID jobspb.JobID, cv clusterversion.Handle, ) ([]byte, error) { // TODO(adityamaru): If filename=all add logic to zip up all the files corresponding // to the job's execution details and return the zipped bundle instead. @@ -113,6 +124,7 @@ func ReadExecutionDetailFile( profilerconstants.MakeProfilerExecutionDetailsChunkKeyPrefix(trimmedFilename), txn, jobID, + cv, ) if err != nil { return nil, err @@ -125,17 +137,18 @@ func ReadExecutionDetailFile( profilerconstants.MakeProfilerExecutionDetailsChunkKeyPrefix(filename), txn, jobID, + cv, ) } // ListExecutionDetailFiles lists all the files that have been generated as part // of a job's execution details. func ListExecutionDetailFiles( - ctx context.Context, db isql.DB, jobID jobspb.JobID, + ctx context.Context, db isql.DB, jobID jobspb.JobID, cv clusterversion.Handle, ) ([]string, error) { var res []string if err := db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { - jobInfo := InfoStorageForJob(txn, jobID) + jobInfo := InfoStorageForJob(txn, jobID, cv) // Iterate over all the files that have been stored as part of the job's // execution details. diff --git a/pkg/jobs/execution_detail_utils_test.go b/pkg/jobs/execution_detail_utils_test.go index 9c9590945490..fdde967fa206 100644 --- a/pkg/jobs/execution_detail_utils_test.go +++ b/pkg/jobs/execution_detail_utils_test.go @@ -51,7 +51,7 @@ func TestReadWriteListExecutionDetailFiles(t *testing.T) { // Generate random data of size between 15 bytes and 5 MiB. 
 		data := make([]byte, 15+rand.Intn(5*1024*1024-15))
 		randutil.ReadTestdataBytes(rng, data)
-		err := WriteExecutionDetailFile(ctx, filename, data, txn, jobspb.JobID(123))
+		err := WriteExecutionDetailFile(ctx, filename, data, txn, jobspb.JobID(123), ts.ClusterSettings().Version)
 		if err != nil {
 			return err
 		}
 		}
 
 		// Write a binpb format file.
-		return WriteProtobinExecutionDetailFile(ctx, "testproto", msg, txn, jobspb.JobID(123))
+		return WriteProtobinExecutionDetailFile(ctx, "testproto", msg, txn, jobspb.JobID(123), ts.ClusterSettings().Version)
 	}))
 
 	// List the files.
-	listedFiles, err := ListExecutionDetailFiles(ctx, ts.InternalDB().(isql.DB), jobspb.JobID(123))
+	listedFiles, err := ListExecutionDetailFiles(ctx, ts.InternalDB().(isql.DB), jobspb.JobID(123), ts.ClusterSettings().Version)
 	require.NoError(t, err)
 	require.ElementsMatch(t, append(filenames,
 		"testproto~cockroach.sql.jobs.jobspb.BackupDetails.binpb.txt",
@@ -76,7 +76,7 @@
 			// Skip the text version of the binpb file.
 			continue
 		}
-		readData, err := ReadExecutionDetailFile(ctx, filename, txn, jobspb.JobID(123))
+		readData, err := ReadExecutionDetailFile(ctx, filename, txn, jobspb.JobID(123), s.ClusterSettings().Version)
 		require.NoError(t, err)
 		if strings.HasSuffix(filename, "binpb") {
 			// For the binpb file, unmarshal the data and compare it to the original message.
diff --git a/pkg/jobs/job_info_storage.go b/pkg/jobs/job_info_storage.go
index 5a0d9c85f738..c91199484596 100644
--- a/pkg/jobs/job_info_storage.go
+++ b/pkg/jobs/job_info_storage.go
@@ -14,6 +14,7 @@ import (
 	"bytes"
 	"context"
 
+	"github.com/cockroachdb/cockroach/pkg/clusterversion"
 	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/sql/isql"
@@ -30,18 +31,19 @@
 type InfoStorage struct {
 	j   *Job
 	txn isql.Txn
+	cv  clusterversion.Handle
 }
 
 // InfoStorage returns a new InfoStorage with the passed in job and txn.
 func (j *Job) InfoStorage(txn isql.Txn) InfoStorage {
-	return InfoStorage{j: j, txn: txn}
+	return InfoStorage{j: j, txn: txn, cv: j.registry.settings.Version}
 }
 
 // InfoStorageForJob returns a new InfoStorage with the passed in
 // job ID and txn. It avoids loading the job record. The resulting
 // job_info writes will not check the job session ID.
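+// The cluster-version handle must be supplied by the caller because, unlike
+// Job.InfoStorage, there is no job registry here to take it from.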
-func InfoStorageForJob(txn isql.Txn, jobID jobspb.JobID) InfoStorage {
-	return InfoStorage{j: &Job{id: jobID}, txn: txn}
+func InfoStorageForJob(txn isql.Txn, jobID jobspb.JobID, cv clusterversion.Handle) InfoStorage {
+	return InfoStorage{j: &Job{id: jobID}, txn: txn, cv: cv}
 }
 
 func (i InfoStorage) checkClaimSession(ctx context.Context) error {
@@ -236,7 +238,7 @@ func (i InfoStorage) Write(ctx context.Context, infoKey string, value []byte) er
 		return errors.AssertionFailedf("missing value (infoKey %q)", infoKey)
 	}
 	if err := i.write(ctx, infoKey, value); err != nil {
-		return MaybeGenerateForcedRetryableError(ctx, i.txn.KV(), err)
+		return MaybeGenerateForcedRetryableError(ctx, i.txn.KV(), err, i.cv)
 	}
 	return nil
 }
diff --git a/pkg/jobs/job_info_utils.go b/pkg/jobs/job_info_utils.go
index 9420ed324601..40acc086bcbd 100644
--- a/pkg/jobs/job_info_utils.go
+++ b/pkg/jobs/job_info_utils.go
@@ -17,6 +17,7 @@ import (
 	"io"
 	"strings"
 
+	"github.com/cockroachdb/cockroach/pkg/clusterversion"
 	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
 	"github.com/cockroachdb/cockroach/pkg/sql/isql"
 	"github.com/cockroachdb/errors"
@@ -32,10 +33,15 @@ const finalChunkSuffix = "#_final"
 // chunks and so if the caller wishes to preserve history they must use a
 // unique filename.
 func WriteChunkedFileToJobInfo(
-	ctx context.Context, filename string, data []byte, txn isql.Txn, jobID jobspb.JobID,
+	ctx context.Context,
+	filename string,
+	data []byte,
+	txn isql.Txn,
+	jobID jobspb.JobID,
+	cv clusterversion.Handle,
 ) error {
 	finalChunkName := filename + finalChunkSuffix
-	jobInfo := InfoStorageForJob(txn, jobID)
+	jobInfo := InfoStorageForJob(txn, jobID, cv)
 
 	// Clear any existing chunks with the same filename before writing new chunks.
 	// We clear all rows that with info keys in [filename, filename#_final~).
@@ -81,12 +87,12 @@ func WriteChunkedFileToJobInfo(
 // ReadChunkedFileToJobInfo will stitch together all the chunks corresponding to
 // the filename and return the uncompressed data of the file.
 func ReadChunkedFileToJobInfo(
-	ctx context.Context, filename string, txn isql.Txn, jobID jobspb.JobID,
+	ctx context.Context, filename string, txn isql.Txn, jobID jobspb.JobID, cv clusterversion.Handle,
 ) ([]byte, error) {
 	buf := bytes.NewBuffer([]byte{})
 	// Iterate over all the chunks of the requested file and return the unzipped
 	// chunks of data.
-	jobInfo := InfoStorageForJob(txn, jobID)
+	jobInfo := InfoStorageForJob(txn, jobID, cv)
 
 	var lastInfoKey string
 	if err := jobInfo.Iterate(ctx, filename,
 		func(infoKey string, value []byte) error {
diff --git a/pkg/jobs/job_info_utils_test.go b/pkg/jobs/job_info_utils_test.go
index f16c6a40ba92..92313807ccad 100644
--- a/pkg/jobs/job_info_utils_test.go
+++ b/pkg/jobs/job_info_utils_test.go
@@ -68,11 +68,11 @@ func TestReadWriteChunkedFileToJobInfo(t *testing.T) {
 				randutil.ReadTestdataBytes(rng, tt.data)
 			}
 			require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
-				err := WriteChunkedFileToJobInfo(ctx, tt.name, tt.data, txn, jobspb.JobID(123))
+				err := WriteChunkedFileToJobInfo(ctx, tt.name, tt.data, txn, jobspb.JobID(123), s.ClusterSettings().Version)
 				if err != nil {
 					return err
 				}
-				got, err := ReadChunkedFileToJobInfo(ctx, tt.name, txn, jobspb.JobID(123))
+				got, err := ReadChunkedFileToJobInfo(ctx, tt.name, txn, jobspb.JobID(123), s.ClusterSettings().Version)
 				if err != nil {
 					return err
 				}
@@ -133,7 +133,7 @@ func TestOverwriteChunkingWithVariableLengths(t *testing.T) {
 			tt.data = generateData(tt.numChunks)
 			// Write the first file, this will generate a certain number of chunks.
 			require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
-				return WriteChunkedFileToJobInfo(ctx, tt.name, tt.data, txn, jobspb.JobID(123))
+				return WriteChunkedFileToJobInfo(ctx, tt.name, tt.data, txn, jobspb.JobID(123), s.ClusterSettings().Version)
 			}))
 
 			// Overwrite the file with fewer chunks, this should delete the extra
@@ -144,11 +144,11 @@
 			tt.data = generateData(lessChunks)
 			var got []byte
 			require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
-				err := WriteChunkedFileToJobInfo(ctx, tt.name, tt.data, txn, jobspb.JobID(123))
+				err := WriteChunkedFileToJobInfo(ctx, tt.name, tt.data, txn, jobspb.JobID(123), s.ClusterSettings().Version)
 				if err != nil {
 					return err
 				}
-				got, err = ReadChunkedFileToJobInfo(ctx, tt.name, txn, jobspb.JobID(123))
+				got, err = ReadChunkedFileToJobInfo(ctx, tt.name, txn, jobspb.JobID(123), s.ClusterSettings().Version)
 				return err
 			}))
 			require.Equal(t, tt.data, got)
@@ -162,11 +162,11 @@
 			tt.data = generateData(moreChunks)
 			var got []byte
 			require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
-				err := WriteChunkedFileToJobInfo(ctx, tt.name, tt.data, txn, jobspb.JobID(123))
+				err := WriteChunkedFileToJobInfo(ctx, tt.name, tt.data, txn, jobspb.JobID(123), s.ClusterSettings().Version)
 				if err != nil {
 					return err
 				}
-				got, err = ReadChunkedFileToJobInfo(ctx, tt.name, txn, jobspb.JobID(123))
+				got, err = ReadChunkedFileToJobInfo(ctx, tt.name, txn, jobspb.JobID(123), s.ClusterSettings().Version)
 				return err
 			}))
 			require.Equal(t, tt.data, got)
diff --git a/pkg/jobs/jobs.go b/pkg/jobs/jobs.go
index 1d934970f73b..e4b90d3f9b67 100644
--- a/pkg/jobs/jobs.go
+++ b/pkg/jobs/jobs.go
@@ -1124,10 +1124,12 @@ func FormatRetriableExecutionErrorLogToStringArray(
 }
 
 // GetJobTraceID returns the current trace ID of the job from the job progress.
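+// The progress is read through the job_info storage layer, which is why a
+// cluster-version handle is now required.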
-func GetJobTraceID(ctx context.Context, db isql.DB, jobID jobspb.JobID) (tracingpb.TraceID, error) {
+func GetJobTraceID(
+	ctx context.Context, db isql.DB, jobID jobspb.JobID, cv clusterversion.Handle,
+) (tracingpb.TraceID, error) {
 	var traceID tracingpb.TraceID
 	if err := db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
-		jobInfo := InfoStorageForJob(txn, jobID)
+		jobInfo := InfoStorageForJob(txn, jobID, cv)
 		progressBytes, exists, err := jobInfo.GetLegacyProgress(ctx)
 		if err != nil {
 			return err
 		}
@@ -1151,14 +1153,14 @@
 // LoadJobProgress returns the job progress from the info table. Note that the
 // progress can be nil if none is recorded.
 func LoadJobProgress(
-	ctx context.Context, db isql.DB, jobID jobspb.JobID,
+	ctx context.Context, db isql.DB, jobID jobspb.JobID, cv clusterversion.Handle,
 ) (*jobspb.Progress, error) {
 	var (
 		progressBytes []byte
 		exists        bool
 	)
 	if err := db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
-		infoStorage := InfoStorageForJob(txn, jobID)
+		infoStorage := InfoStorageForJob(txn, jobID, cv)
 		var err error
 		progressBytes, exists, err = infoStorage.GetLegacyProgress(ctx)
 		return err
diff --git a/pkg/jobs/jobs_test.go b/pkg/jobs/jobs_test.go
index 924b914ec1e0..13092f00a429 100644
--- a/pkg/jobs/jobs_test.go
+++ b/pkg/jobs/jobs_test.go
@@ -1193,13 +1193,13 @@ func checkTraceFiles(
 	recordings := make([][]byte, 0)
 	execCfg := s.ApplicationLayer().ExecutorConfig().(sql.ExecutorConfig)
-	edFiles, err := jobs.ListExecutionDetailFiles(ctx, execCfg.InternalDB, jobID)
+	edFiles, err := jobs.ListExecutionDetailFiles(ctx, execCfg.InternalDB, jobID, execCfg.Settings.Version)
 	require.NoError(t, err)
 	require.Len(t, edFiles, expectedNumFiles)
 
 	require.NoError(t, execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
 		for _, f := range edFiles {
-			data, err := jobs.ReadExecutionDetailFile(ctx, f, txn, jobID)
+			data, err := jobs.ReadExecutionDetailFile(ctx, f, txn, jobID, execCfg.Settings.Version)
 			if err != nil {
 				return err
 			}
@@ -3691,7 +3691,7 @@ func TestLoadJobProgress(t *testing.T) {
 	_, err := r.CreateJobWithTxn(ctx, rec, 7, nil)
 	require.NoError(t, err)
 
-	p, err := jobs.LoadJobProgress(ctx, s.InternalDB().(isql.DB), 7)
+	p, err := jobs.LoadJobProgress(ctx, s.InternalDB().(isql.DB), 7, s.ClusterSettings().Version)
 	require.NoError(t, err)
 	require.Equal(t, []float32{7.1}, p.GetDetails().(*jobspb.Progress_Import).Import.ReadProgress)
 }
diff --git a/pkg/jobs/jobsprofiler/BUILD.bazel b/pkg/jobs/jobsprofiler/BUILD.bazel
index 7a41084f0a78..72f67f0bf3b2 100644
--- a/pkg/jobs/jobsprofiler/BUILD.bazel
+++ b/pkg/jobs/jobsprofiler/BUILD.bazel
@@ -6,6 +6,7 @@ go_library(
     importpath = "github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler",
    visibility = ["//visibility:public"],
     deps = [
+        "//pkg/clusterversion",
         "//pkg/jobs",
         "//pkg/jobs/jobspb",
         "//pkg/jobs/jobsprofiler/profilerconstants",
diff --git a/pkg/jobs/jobsprofiler/profiler.go b/pkg/jobs/jobsprofiler/profiler.go
index fd365d17964c..5353c01b9584 100644
--- a/pkg/jobs/jobsprofiler/profiler.go
+++ b/pkg/jobs/jobsprofiler/profiler.go
@@ -14,6 +14,7 @@ import (
 	"context"
 	"fmt"
 
+	"github.com/cockroachdb/cockroach/pkg/clusterversion"
 	"github.com/cockroachdb/cockroach/pkg/jobs"
 	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
 	"github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler/profilerconstants"
@@ -29,8 +30,17 @@ import (
 // table. The generation of the plan diagram and persistence to the info table
 // are done asynchronously and this method does not block on their completion.
 func StorePlanDiagram(
-	ctx context.Context, stopper *stop.Stopper, p *sql.PhysicalPlan, db isql.DB, jobID jobspb.JobID,
+	ctx context.Context,
+	stopper *stop.Stopper,
+	p *sql.PhysicalPlan,
+	db isql.DB,
+	jobID jobspb.JobID,
+	cv clusterversion.Handle,
 ) {
+	if !cv.IsActive(ctx, clusterversion.V23_1) {
+		return
+	}
+
 	if err := stopper.RunAsyncTask(ctx, "jobs-store-plan-diagram", func(ctx context.Context) {
 		var cancel func()
 		ctx, cancel = stopper.WithCancelOnQuiesce(ctx)
@@ -45,7 +55,7 @@ func StorePlanDiagram(
 			}
 
 			dspKey := profilerconstants.MakeDSPDiagramInfoKey(timeutil.Now().UnixNano())
-			infoStorage := jobs.InfoStorageForJob(txn, jobID)
+			infoStorage := jobs.InfoStorageForJob(txn, jobID, cv)
 			return infoStorage.Write(ctx, dspKey, []byte(diagURL.String()))
 		})
 		// Don't log the error if the context has been canceled. This will likely be
@@ -70,9 +80,13 @@ func StorePerNodeProcessorProgressFraction(
 	db isql.DB,
 	jobID jobspb.JobID,
 	perComponentProgress map[execinfrapb.ComponentID]float32,
+	cv clusterversion.Handle,
 ) {
+	if !cv.IsActive(ctx, clusterversion.V23_1) {
+		return
+	}
 	if err := db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
-		infoStorage := jobs.InfoStorageForJob(txn, jobID)
+		infoStorage := jobs.InfoStorageForJob(txn, jobID, cv)
 		for componentID, fraction := range perComponentProgress {
 			key := profilerconstants.MakeNodeProcessorProgressInfoKey(componentID.FlowID.String(),
 				componentID.SQLInstanceID.String(), componentID.ID)
diff --git a/pkg/jobs/jobsprofiler/profiler_test.go b/pkg/jobs/jobsprofiler/profiler_test.go
index 161b9f30021f..70e7f14f731c 100644
--- a/pkg/jobs/jobsprofiler/profiler_test.go
+++ b/pkg/jobs/jobsprofiler/profiler_test.go
@@ -96,7 +96,7 @@ func TestProfilerStorePlanDiagram(t *testing.T) {
 	testutils.SucceedsSoon(t, func() error {
 		var count int
 		err = execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
-			infoStorage := jobs.InfoStorageForJob(txn, jobID)
+			infoStorage := jobs.InfoStorageForJob(txn, jobID, execCfg.Settings.Version)
 			return infoStorage.Iterate(ctx, profilerconstants.DSPDiagramInfoKeyPrefix,
 				func(infoKey string, value []byte) error {
 					count++
@@ -143,19 +143,19 @@ func TestStorePerNodeProcessorProgressFraction(t *testing.T) {
 	n2proc1.ID = 1
 
 	jobsprofiler.StorePerNodeProcessorProgressFraction(ctx, s.InternalDB().(isql.DB),
-		jobID, map[execinfrapb.ComponentID]float32{n1proc1: 0.95, n2proc1: 0.50})
+		jobID, map[execinfrapb.ComponentID]float32{n1proc1: 0.95, n2proc1: 0.50}, s.ClusterSettings().Version)
 	// Update n2proc1.
 	jobsprofiler.StorePerNodeProcessorProgressFraction(ctx, s.InternalDB().(isql.DB),
-		jobID, map[execinfrapb.ComponentID]float32{n2proc1: 0.70})
+		jobID, map[execinfrapb.ComponentID]float32{n2proc1: 0.70}, s.ClusterSettings().Version)
 	// Update n1proc1.
 	jobsprofiler.StorePerNodeProcessorProgressFraction(ctx, s.InternalDB().(isql.DB),
-		jobID, map[execinfrapb.ComponentID]float32{n1proc1: 1.00})
+		jobID, map[execinfrapb.ComponentID]float32{n1proc1: 1.00}, s.ClusterSettings().Version)
 
 	var persistedProgress map[string]string
 	err := s.ExecutorConfig().(sql.ExecutorConfig).InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
 		persistedProgress = make(map[string]string)
-		infoStorage := jobs.InfoStorageForJob(txn, jobID)
+		infoStorage := jobs.InfoStorageForJob(txn, jobID, s.ClusterSettings().Version)
 		return infoStorage.Iterate(ctx, profilerconstants.NodeProcessorProgressInfoKeyPrefix,
 			func(infoKey string, value []byte) error {
 				f, err := strconv.ParseFloat(string(value), 32)
@@ -219,7 +219,7 @@ func TestTraceRecordingOnResumerCompletion(t *testing.T) {
 	testutils.SucceedsSoon(t, func() error {
 		recordings := make([][]byte, 0)
 		execCfg := s.ApplicationLayer().ExecutorConfig().(sql.ExecutorConfig)
-		edFiles, err := jobs.ListExecutionDetailFiles(ctx, execCfg.InternalDB, jobspb.JobID(jobID))
+		edFiles, err := jobs.ListExecutionDetailFiles(ctx, execCfg.InternalDB, jobspb.JobID(jobID), s.ClusterSettings().Version)
 		if err != nil {
 			return err
 		}
@@ -232,7 +232,7 @@
 		return execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
 			for _, f := range traceFiles {
-				data, err := jobs.ReadExecutionDetailFile(ctx, f, txn, jobspb.JobID(jobID))
+				data, err := jobs.ReadExecutionDetailFile(ctx, f, txn, jobspb.JobID(jobID), s.ClusterSettings().Version)
 				if err != nil {
 					return err
 				}
diff --git a/pkg/jobs/utils.go b/pkg/jobs/utils.go
index e9479baad86e..788a9d22e696 100644
--- a/pkg/jobs/utils.go
+++ b/pkg/jobs/utils.go
@@ -210,12 +210,18 @@ func isJobInfoTableDoesNotExistError(err error) bool {
 // txn is pushed to a higher timestamp at which the upgrade will have completed
 // and the table/column will be visible. The longer term fix is being tracked in
 // https://github.com/cockroachdb/cockroach/issues/106764.
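+// On clusters that have not yet activated V23_1 the error is returned
+// unchanged: the upgrade cannot have completed yet, so a forced retry would
+// just hit the same error again.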
-func MaybeGenerateForcedRetryableError(ctx context.Context, txn *kv.Txn, err error) error {
-	if err != nil && isJobTypeColumnDoesNotExistError(err) {
+func MaybeGenerateForcedRetryableError(
+	ctx context.Context, txn *kv.Txn, err error, cv clusterversion.Handle,
+) error {
+	if err == nil || !cv.IsActive(ctx, clusterversion.V23_1) {
+		return err
+	}
+
+	if isJobTypeColumnDoesNotExistError(err) {
 		return txn.GenerateForcedRetryableErr(ctx, "synthetic error "+
 			"to push timestamp to after the `job_type` upgrade has run")
 	}
-	if err != nil && isJobInfoTableDoesNotExistError(err) {
+	if isJobInfoTableDoesNotExistError(err) {
 		return txn.GenerateForcedRetryableErr(ctx, "synthetic error "+
 			"to push timestamp to after the `job_info` upgrade has run")
 	}
diff --git a/pkg/server/autoconfig/BUILD.bazel b/pkg/server/autoconfig/BUILD.bazel
index 8f67ffe3dd30..5013a15c24cb 100644
--- a/pkg/server/autoconfig/BUILD.bazel
+++ b/pkg/server/autoconfig/BUILD.bazel
@@ -13,6 +13,7 @@ go_library(
     importpath = "github.com/cockroachdb/cockroach/pkg/server/autoconfig",
     visibility = ["//visibility:public"],
     deps = [
+        "//pkg/clusterversion",
         "//pkg/jobs",
         "//pkg/jobs/jobspb",
         "//pkg/security/username",
diff --git a/pkg/server/autoconfig/auto_config_env_runner.go b/pkg/server/autoconfig/auto_config_env_runner.go
index 159b27d645c3..50fa4ed8067a 100644
--- a/pkg/server/autoconfig/auto_config_env_runner.go
+++ b/pkg/server/autoconfig/auto_config_env_runner.go
@@ -146,7 +146,7 @@ func (r *envRunner) maybeRunNextTask(
 	err = execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) (resErr error) {
 		// Re-check if there any other started task already.
-		otherTaskID, _, err := getCurrentlyStartedTaskID(ctx, txn, r.envID)
+		otherTaskID, _, err := getCurrentlyStartedTaskID(ctx, txn, r.envID, execCfg.Settings.Version)
 		if err != nil {
 			return err
 		}
@@ -161,7 +161,7 @@
 		}
 
 		// Find the latest completed task.
-		lastTaskID, err := getLastCompletedTaskID(ctx, txn, r.envID)
+		lastTaskID, err := getLastCompletedTaskID(ctx, txn, r.envID, execCfg.Settings.Version)
 		if err != nil {
 			return err
 		}
@@ -190,7 +190,7 @@
 		// maybeWaitForCurrentTaskJob(), which is an optimization. Storing
 		// the job ID is not strictly required for sequencing the tasks.
 		if err := writeStartMarker(ctx, txn,
-			InfoKeyTaskRef{Environment: r.envID, Task: nextTaskID}, jobID); err != nil {
+			InfoKeyTaskRef{Environment: r.envID, Task: nextTaskID}, jobID, execCfg.Settings.Version); err != nil {
 			return errors.Wrapf(err, "unable to write start marker for task %d", nextTaskID)
 		}
@@ -219,7 +219,7 @@ func (r *envRunner) maybeWaitForCurrentTaskJob(
 	if err := execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
 		var err error
-		prevTaskID, prevJobID, err = getCurrentlyStartedTaskID(ctx, txn, r.envID)
+		prevTaskID, prevJobID, err = getCurrentlyStartedTaskID(ctx, txn, r.envID, execCfg.Settings.Version)
 		return err
 	}); err != nil {
 		return errors.Wrap(err, "checking latest task job")
diff --git a/pkg/server/autoconfig/auto_config_task.go b/pkg/server/autoconfig/auto_config_task.go
index 24011ae8755a..76c81146d69d 100644
--- a/pkg/server/autoconfig/auto_config_task.go
+++ b/pkg/server/autoconfig/auto_config_task.go
@@ -78,7 +78,7 @@ func (r *taskRunner) OnFailOrCancel(ctx context.Context, execCtx interface{}, jo
 	if err := execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
 		return markTaskComplete(ctx, txn,
 			InfoKeyTaskRef{Environment: r.envID, Task: r.task.TaskID},
-			[]byte("task error"))
+			[]byte("task error"), execCfg.Settings.Version)
 	}); err != nil {
 		return err
 	}
@@ -180,7 +180,7 @@ func execSimpleSQL(
 		log.Infof(ctx, "finished executing txn statements")
 		return markTaskComplete(ctx, txn,
 			InfoKeyTaskRef{Environment: envID, Task: taskID},
-			[]byte("task success"))
+			[]byte("task success"), execCfg.Settings.Version)
 	})
 }
diff --git a/pkg/server/autoconfig/task_markers.go b/pkg/server/autoconfig/task_markers.go
index 2b861e8db18c..b7f3d62178de 100644
--- a/pkg/server/autoconfig/task_markers.go
+++ b/pkg/server/autoconfig/task_markers.go
@@ -16,6 +16,7 @@ import (
 	"strconv"
 	"strings"
 
+	"github.com/cockroachdb/cockroach/pkg/clusterversion"
 	"github.com/cockroachdb/cockroach/pkg/jobs"
 	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
 	"github.com/cockroachdb/cockroach/pkg/sql/isql"
@@ -141,9 +142,13 @@ func (tr *InfoKeyTaskRef) decodeInternal(prefix, infoKey string) error {
 // writeStartMarker writes a start marker for the given task ID and
 // also writes its job ID into the value part.
 func writeStartMarker(
-	ctx context.Context, txn isql.Txn, taskRef InfoKeyTaskRef, jobID jobspb.JobID,
+	ctx context.Context,
+	txn isql.Txn,
+	taskRef InfoKeyTaskRef,
+	jobID jobspb.JobID,
+	cv clusterversion.Handle,
 ) error {
-	infoStorage := jobs.InfoStorageForJob(txn, jobs.AutoConfigRunnerJobID)
+	infoStorage := jobs.InfoStorageForJob(txn, jobs.AutoConfigRunnerJobID, cv)
 	return infoStorage.Write(ctx,
 		taskRef.EncodeStartMarkerKey(),
 		[]byte(strconv.FormatUint(uint64(jobID), 10)))
@@ -152,9 +157,9 @@ func writeStartMarker(
 // getCurrentlyStartedTaskID retrieves the ID of the last task which
 // has a start marker in job_info.
 func getCurrentlyStartedTaskID(
-	ctx context.Context, txn isql.Txn, env EnvironmentID,
+	ctx context.Context, txn isql.Txn, env EnvironmentID, cv clusterversion.Handle,
 ) (prevTaskID TaskID, prevJobID jobspb.JobID, err error) {
-	infoStorage := jobs.InfoStorageForJob(txn, jobs.AutoConfigRunnerJobID)
+	infoStorage := jobs.InfoStorageForJob(txn, jobs.AutoConfigRunnerJobID, cv)
 
 	if err := infoStorage.GetLast(ctx,
 		InfoKeyStartPrefix(env),
@@ -184,9 +189,9 @@ func getCurrentlyStartedTaskID(
 // getLastCompletedTaskID retrieves the task ID of the last task which
 // has a completion marker in job_info.
 func getLastCompletedTaskID(
-	ctx context.Context, txn isql.Txn, env EnvironmentID,
+	ctx context.Context, txn isql.Txn, env EnvironmentID, cv clusterversion.Handle,
 ) (lastTaskID TaskID, err error) {
-	infoStorage := jobs.InfoStorageForJob(txn, jobs.AutoConfigRunnerJobID)
+	infoStorage := jobs.InfoStorageForJob(txn, jobs.AutoConfigRunnerJobID, cv)
 
 	if err := infoStorage.GetLast(ctx,
 		InfoKeyCompletionPrefix(env),
@@ -208,9 +213,13 @@
 // markTaskCompletes transactionally removes the task's start marker
 // and creates a completion marker.
 func markTaskComplete(
-	ctx context.Context, txn isql.Txn, taskRef InfoKeyTaskRef, completionValue []byte,
+	ctx context.Context,
+	txn isql.Txn,
+	taskRef InfoKeyTaskRef,
+	completionValue []byte,
+	cv clusterversion.Handle,
 ) error {
-	infoStorage := jobs.InfoStorageForJob(txn, jobs.AutoConfigRunnerJobID)
+	infoStorage := jobs.InfoStorageForJob(txn, jobs.AutoConfigRunnerJobID, cv)
 
 	// Remove the start marker.
 	if err := infoStorage.Delete(ctx, taskRef.EncodeStartMarkerKey()); err != nil {
diff --git a/pkg/server/job_profiler.go b/pkg/server/job_profiler.go
index fb9eefef67eb..296d5ebb1b63 100644
--- a/pkg/server/job_profiler.go
+++ b/pkg/server/job_profiler.go
@@ -71,7 +71,7 @@ func (s *statusServer) RequestJobProfilerExecutionDetails(
 		return nil, errors.Newf("execution details can only be requested on a cluster with version >= %s",
 			clusterversion.V23_2.String())
 	}
-	e := makeJobProfilerExecutionDetailsBuilder(execCfg.SQLStatusServer, execCfg.InternalDB, jobID, execCfg.JobRegistry)
+	e := makeJobProfilerExecutionDetailsBuilder(execCfg.SQLStatusServer, execCfg.InternalDB, jobID, execCfg.JobRegistry, execCfg.Settings.Version)
 
 	// TODO(adityamaru): When we start collecting more information we can consider
 	// parallelize the collection of the various pieces.
@@ -111,14 +111,19 @@ type executionDetailsBuilder struct {
 	db       isql.DB
 	jobID    jobspb.JobID
 	registry *jobs.Registry
+	cv       clusterversion.Handle
 }
 
 // makeJobProfilerExecutionDetailsBuilder returns an instance of an executionDetailsBuilder.
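+// The cluster-version handle is captured so that the job_info reads and
+// writes performed while building the execution details can be version-gated.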
 func makeJobProfilerExecutionDetailsBuilder(
-	srv serverpb.SQLStatusServer, db isql.DB, jobID jobspb.JobID, registry *jobs.Registry,
+	srv serverpb.SQLStatusServer,
+	db isql.DB,
+	jobID jobspb.JobID,
+	registry *jobs.Registry,
+	cv clusterversion.Handle,
 ) executionDetailsBuilder {
 	e := executionDetailsBuilder{
-		srv: srv, db: db, jobID: jobID, registry: registry,
+		srv: srv, db: db, jobID: jobID, registry: registry, cv: cv,
 	}
 	return e
 }
@@ -140,7 +145,7 @@ func (e *executionDetailsBuilder) addLabelledGoroutines(ctx context.Context) {
 	}
 	filename := fmt.Sprintf("goroutines.%s.txt", timeutil.Now().Format("20060102_150405.00"))
 	if err := e.db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
-		return jobs.WriteExecutionDetailFile(ctx, filename, resp.Data, txn, e.jobID)
+		return jobs.WriteExecutionDetailFile(ctx, filename, resp.Data, txn, e.jobID, e.cv)
 	}); err != nil {
 		log.Errorf(ctx, "failed to write goroutine for job %d: %v", e.jobID, err.Error())
 	}
@@ -161,7 +166,7 @@ func (e *executionDetailsBuilder) addDistSQLDiagram(ctx context.Context) {
 	if err := e.db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
 		return jobs.WriteExecutionDetailFile(ctx, filename,
 			[]byte(fmt.Sprintf(`<meta http-equiv="Refresh" content="0; url=%s">`, dspDiagramURL)),
-			txn, e.jobID)
+			txn, e.jobID, e.cv)
 	}); err != nil {
 		log.Errorf(ctx, "failed to write DistSQL diagram for job %d: %v", e.jobID, err.Error())
 	}
@@ -172,7 +177,7 @@
 // that captures the active tracing spans of a job on all nodes in the cluster.
 func (e *executionDetailsBuilder) addClusterWideTraces(ctx context.Context) {
 	z := zipper.MakeInternalExecutorInflightTraceZipper(e.db.Executor())
-	traceID, err := jobs.GetJobTraceID(ctx, e.db, e.jobID)
+	traceID, err := jobs.GetJobTraceID(ctx, e.db, e.jobID, e.cv)
 	if err != nil {
 		log.Warningf(ctx, "failed to fetch job trace ID: %+v", err.Error())
 		return
 	}
@@ -185,7 +190,7 @@
 	filename := fmt.Sprintf("trace.%s.zip", timeutil.Now().Format("20060102_150405.00"))
 	if err := e.db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
-		return jobs.WriteExecutionDetailFile(ctx, filename, zippedTrace, txn, e.jobID)
+		return jobs.WriteExecutionDetailFile(ctx, filename, zippedTrace, txn, e.jobID, e.cv)
 	}); err != nil {
 		log.Errorf(ctx, "failed to write traces for job %d: %v", e.jobID, err.Error())
 	}
diff --git a/pkg/server/status.go b/pkg/server/status.go
index 1f8a8831348c..98319940bd6f 100644
--- a/pkg/server/status.go
+++ b/pkg/server/status.go
@@ -3943,7 +3943,7 @@ func (s *statusServer) GetJobProfilerExecutionDetails(
 	execCfg := s.sqlServer.execCfg
 	var data []byte
 	if err := execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
-		data, err = jobs.ReadExecutionDetailFile(ctx, req.Filename, txn, jobID)
+		data, err = jobs.ReadExecutionDetailFile(ctx, req.Filename, txn, jobID, execCfg.Settings.Version)
 		return err
 	}); err != nil {
 		return nil, err
 	}
@@ -3964,7 +3964,7 @@ func (s *statusServer) ListJobProfilerExecutionDetails(
 	jobID := jobspb.JobID(req.JobId)
 	execCfg := s.sqlServer.execCfg
-	files, err := jobs.ListExecutionDetailFiles(ctx, execCfg.InternalDB, jobID)
+	files, err := jobs.ListExecutionDetailFiles(ctx, execCfg.InternalDB, jobID, execCfg.Settings.Version)
 	if err != nil {
 		return nil, err
 	}
diff --git a/pkg/sql/crdb_internal.go b/pkg/sql/crdb_internal.go
index 7718ee51b718..a7e1ca9da5b3 100644
--- a/pkg/sql/crdb_internal.go
+++ b/pkg/sql/crdb_internal.go
@@ -1081,7 +1081,7 @@ func populateSystemJobsTableRows(
 		params...,
 	)
 	if err != nil {
-		return matched, jobs.MaybeGenerateForcedRetryableError(ctx, p.Txn(), err)
+		return matched, jobs.MaybeGenerateForcedRetryableError(ctx, p.Txn(), err, p.execCfg.Settings.Version)
 	}
 
 	cleanup := func(ctx context.Context) {
@@ -1094,7 +1094,7 @@ func populateSystemJobsTableRows(
 	for {
 		hasNext, err := it.Next(ctx)
 		if !hasNext || err != nil {
-			return matched, jobs.MaybeGenerateForcedRetryableError(ctx, p.Txn(), err)
+			return matched, jobs.MaybeGenerateForcedRetryableError(ctx, p.Txn(), err, p.execCfg.Settings.Version)
 		}
 		currentRow := it.Cur()
diff --git a/pkg/sql/importer/import_processor_planning.go b/pkg/sql/importer/import_processor_planning.go
index b978a01450cf..979ee5ddf76c 100644
--- a/pkg/sql/importer/import_processor_planning.go
+++ b/pkg/sql/importer/import_processor_planning.go
@@ -270,7 +270,7 @@ func distImport(
 	}
 
 	execCfg := execCtx.ExecCfg()
-	jobsprofiler.StorePlanDiagram(ctx, execCfg.DistSQLSrv.Stopper, p, execCfg.InternalDB, job.ID())
+	jobsprofiler.StorePlanDiagram(ctx, execCfg.DistSQLSrv.Stopper, p, execCfg.InternalDB, job.ID(), execCtx.ExecCfg().Settings.Version)
 
 	// Copy the evalCtx, as dsp.Run() might change it.
 	evalCtxCopy := *evalCtx
diff --git a/pkg/sql/jobs_profiler_execution_details.go b/pkg/sql/jobs_profiler_execution_details.go
index d4e43b966f46..62a73977bef9 100644
--- a/pkg/sql/jobs_profiler_execution_details.go
+++ b/pkg/sql/jobs_profiler_execution_details.go
@@ -43,9 +43,9 @@ func (p *planner) GenerateExecutionDetailsJSON(
 	payload := j.Payload()
 	switch payload.Type() {
 	case jobspb.TypeBackup:
-		executionDetailsJSON, err = constructBackupExecutionDetails(ctx, jobID, execCfg.InternalDB)
+		executionDetailsJSON, err = constructBackupExecutionDetails(ctx, jobID, execCfg.InternalDB, execCfg.Settings.Version)
 	default:
-		executionDetailsJSON, err = constructDefaultExecutionDetails(ctx, jobID, execCfg.InternalDB)
+		executionDetailsJSON, err = constructDefaultExecutionDetails(ctx, jobID, execCfg.InternalDB, execCfg.Settings.Version)
 	}
 
 	return executionDetailsJSON, err
@@ -60,12 +60,12 @@ type defaultExecutionDetails struct {
 }
 
 func constructDefaultExecutionDetails(
-	ctx context.Context, jobID jobspb.JobID, db isql.DB,
+	ctx context.Context, jobID jobspb.JobID, db isql.DB, cv clusterversion.Handle,
 ) ([]byte, error) {
 	executionDetails := &defaultExecutionDetails{}
 	err := db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
 		// Populate the latest DSP diagram URL.
-		infoStorage := jobs.InfoStorageForJob(txn, jobID)
+		infoStorage := jobs.InfoStorageForJob(txn, jobID, cv)
 		err := infoStorage.GetLast(ctx, profilerconstants.DSPDiagramInfoKeyPrefix, func(infoKey string, value []byte) error {
 			executionDetails.PlanDiagram = string(value)
 			return nil
@@ -94,13 +94,13 @@ type backupExecutionDetails struct {
 }
 
 func constructBackupExecutionDetails(
-	ctx context.Context, jobID jobspb.JobID, db isql.DB,
+	ctx context.Context, jobID jobspb.JobID, db isql.DB, cv clusterversion.Handle,
 ) ([]byte, error) {
 	var annotatedURL url.URL
 	marshallablePerComponentProgress := make(map[string]float32)
 	if err := db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
 		// Read the latest DistSQL diagram.
diff --git a/pkg/sql/jobs_profiler_execution_details.go b/pkg/sql/jobs_profiler_execution_details.go
index d4e43b966f46..62a73977bef9 100644
--- a/pkg/sql/jobs_profiler_execution_details.go
+++ b/pkg/sql/jobs_profiler_execution_details.go
@@ -43,9 +43,9 @@ func (p *planner) GenerateExecutionDetailsJSON(
 	payload := j.Payload()
 	switch payload.Type() {
 	case jobspb.TypeBackup:
-		executionDetailsJSON, err = constructBackupExecutionDetails(ctx, jobID, execCfg.InternalDB)
+		executionDetailsJSON, err = constructBackupExecutionDetails(ctx, jobID, execCfg.InternalDB, execCfg.Settings.Version)
 	default:
-		executionDetailsJSON, err = constructDefaultExecutionDetails(ctx, jobID, execCfg.InternalDB)
+		executionDetailsJSON, err = constructDefaultExecutionDetails(ctx, jobID, execCfg.InternalDB, execCfg.Settings.Version)
 	}
 
 	return executionDetailsJSON, err
@@ -60,12 +60,12 @@ type defaultExecutionDetails struct {
 }
 
 func constructDefaultExecutionDetails(
-	ctx context.Context, jobID jobspb.JobID, db isql.DB,
+	ctx context.Context, jobID jobspb.JobID, db isql.DB, cv clusterversion.Handle,
 ) ([]byte, error) {
 	executionDetails := &defaultExecutionDetails{}
 	err := db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
 		// Populate the latest DSP diagram URL.
-		infoStorage := jobs.InfoStorageForJob(txn, jobID)
+		infoStorage := jobs.InfoStorageForJob(txn, jobID, cv)
 		err := infoStorage.GetLast(ctx, profilerconstants.DSPDiagramInfoKeyPrefix, func(infoKey string, value []byte) error {
 			executionDetails.PlanDiagram = string(value)
 			return nil
@@ -94,13 +94,13 @@ type backupExecutionDetails struct {
 }
 
 func constructBackupExecutionDetails(
-	ctx context.Context, jobID jobspb.JobID, db isql.DB,
+	ctx context.Context, jobID jobspb.JobID, db isql.DB, cv clusterversion.Handle,
 ) ([]byte, error) {
 	var annotatedURL url.URL
 	marshallablePerComponentProgress := make(map[string]float32)
 	if err := db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
 		// Read the latest DistSQL diagram.
-		infoStorage := jobs.InfoStorageForJob(txn, jobID)
+		infoStorage := jobs.InfoStorageForJob(txn, jobID, cv)
 		var distSQLURL string
 		if err := infoStorage.GetLast(ctx, profilerconstants.DSPDiagramInfoKeyPrefix, func(infoKey string, value []byte) error {
 			distSQLURL = string(value)
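Note: the read side mirrors the write side. The sketch below isolates the `InfoStorageForJob` plus `GetLast` pattern from `constructDefaultExecutionDetails` above into a hypothetical standalone helper, assuming the error-return shape shown in the hunk:

package example // hypothetical; for illustration only

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/clusterversion"
	"github.com/cockroachdb/cockroach/pkg/jobs"
	"github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler/profilerconstants"
	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
	"github.com/cockroachdb/cockroach/pkg/sql/isql"
)

// latestDSPDiagram opens the job's info storage with the version handle and
// fetches the most recent DistSQL plan diagram URL, the same GetLast pattern
// used above.
func latestDSPDiagram(
	ctx context.Context, db isql.DB, jobID jobspb.JobID, cv clusterversion.Handle,
) (string, error) {
	var diagramURL string
	err := db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
		infoStorage := jobs.InfoStorageForJob(txn, jobID, cv)
		return infoStorage.GetLast(ctx, profilerconstants.DSPDiagramInfoKeyPrefix,
			func(infoKey string, value []byte) error {
				diagramURL = string(value)
				return nil
			})
	})
	return diagramURL, err
}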
diff --git a/pkg/sql/jobs_profiler_execution_details_test.go b/pkg/sql/jobs_profiler_execution_details_test.go
index 540f0754b5a6..3f7392fb3da4 100644
--- a/pkg/sql/jobs_profiler_execution_details_test.go
+++ b/pkg/sql/jobs_profiler_execution_details_test.go
@@ -24,6 +24,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/base"
 	_ "github.com/cockroachdb/cockroach/pkg/cloud/impl" // register cloud storage providers
+	"github.com/cockroachdb/cockroach/pkg/clusterversion"
 	"github.com/cockroachdb/cockroach/pkg/jobs"
 	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
 	"github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler"
@@ -89,11 +90,16 @@ func (d fakeExecResumer) CollectProfile(_ context.Context, _ interface{}) error
 
 // checkForPlanDiagram is a method used in tests to wait for the existence of a
 // DSP diagram for the provided jobID.
 func checkForPlanDiagrams(
-	ctx context.Context, t *testing.T, db isql.DB, jobID jobspb.JobID, expectedNumDiagrams int,
+	ctx context.Context,
+	t *testing.T,
+	db isql.DB,
+	jobID jobspb.JobID,
+	expectedNumDiagrams int,
+	cv clusterversion.Handle,
 ) {
 	testutils.SucceedsSoon(t, func() error {
 		return db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
-			infoStorage := jobs.InfoStorageForJob(txn, jobID)
+			infoStorage := jobs.InfoStorageForJob(txn, jobID, cv)
 			var found int
 			err := infoStorage.Iterate(ctx, profilerconstants.DSPDiagramInfoKeyPrefix, func(infoKey string, value []byte) error {
@@ -134,8 +140,9 @@ func TestShowJobsWithExecutionDetails(t *testing.T) {
 			p := sql.PhysicalPlan{}
 			infra := physicalplan.NewPhysicalInfrastructure(uuid.FastMakeV4(), base.SQLInstanceID(1))
 			p.PhysicalInfrastructure = infra
-			jobsprofiler.StorePlanDiagram(ctx, s.Stopper(), &p, s.InternalDB().(isql.DB), j.ID())
-			checkForPlanDiagrams(ctx, t, s.InternalDB().(isql.DB), j.ID(), 1)
+			jobsprofiler.StorePlanDiagram(ctx, s.Stopper(), &p, s.InternalDB().(isql.DB), j.ID(),
+				s.ClusterSettings().Version)
+			checkForPlanDiagrams(ctx, t, s.InternalDB().(isql.DB), j.ID(), 1, s.ClusterSettings().Version)
 			return nil
 		},
 	}
@@ -185,8 +192,8 @@ func TestReadWriteProfilerExecutionDetails(t *testing.T) {
 			p := sql.PhysicalPlan{}
 			infra := physicalplan.NewPhysicalInfrastructure(uuid.FastMakeV4(), base.SQLInstanceID(1))
 			p.PhysicalInfrastructure = infra
-			jobsprofiler.StorePlanDiagram(ctx, s.Stopper(), &p, s.InternalDB().(isql.DB), j.ID())
-			checkForPlanDiagrams(ctx, t, s.InternalDB().(isql.DB), j.ID(), 1)
+			jobsprofiler.StorePlanDiagram(ctx, s.Stopper(), &p, s.InternalDB().(isql.DB), j.ID(), s.ClusterSettings().Version)
+			checkForPlanDiagrams(ctx, t, s.InternalDB().(isql.DB), j.ID(), 1, s.ClusterSettings().Version)
 			isRunning <- struct{}{}
 			<-continueRunning
 			return nil
@@ -348,8 +355,9 @@ func TestListProfilerExecutionDetails(t *testing.T) {
 			p := sql.PhysicalPlan{}
 			infra := physicalplan.NewPhysicalInfrastructure(uuid.FastMakeV4(), base.SQLInstanceID(1))
 			p.PhysicalInfrastructure = infra
-			jobsprofiler.StorePlanDiagram(ctx, s.Stopper(), &p, s.InternalDB().(isql.DB), j.ID())
-			checkForPlanDiagrams(ctx, t, s.InternalDB().(isql.DB), j.ID(), expectedDiagrams)
+			jobsprofiler.StorePlanDiagram(ctx, s.Stopper(), &p, s.InternalDB().(isql.DB), j.ID(),
+				execCfg.Settings.Version)
+			checkForPlanDiagrams(ctx, t, s.InternalDB().(isql.DB), j.ID(), expectedDiagrams, s.ClusterSettings().Version)
 			writtenDiagram <- struct{}{}
 			<-continueCh
 			if err := execCfg.JobRegistry.CheckPausepoint("fakeresumer.pause"); err != nil {
diff --git a/pkg/ui/workspaces/cluster-ui/src/timeScaleDropdown/timeFrameControls.tsx b/pkg/ui/workspaces/cluster-ui/src/timeScaleDropdown/timeFrameControls.tsx
index 390926710de9..a6b335bd69a0 100644
--- a/pkg/ui/workspaces/cluster-ui/src/timeScaleDropdown/timeFrameControls.tsx
+++ b/pkg/ui/workspaces/cluster-ui/src/timeScaleDropdown/timeFrameControls.tsx
@@ -38,7 +38,7 @@ export const TimeFrameControls = ({
     onArrowClick(direction);
 
   const left = disabledArrows.includes(ArrowDirection.LEFT);
-  const right = disabledArrows.includes(ArrowDirection.RIGHT);
+  const canForward = !disabledArrows.includes(ArrowDirection.RIGHT);
   const delay = 0.3;
 
   return (
@@ -66,9 +66,10 @@
         mouseLeaveDelay={delay}
       >
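Note: the .tsx hunk above is an unrelated readability cleanup, renaming the double-negative `right` flag to `canForward`. On the Go side, the three test bodies updated above share one recipe; the hypothetical helper below condenses it, reusing `checkForPlanDiagrams` from the test file in this patch:

package example // hypothetical; for illustration only

import (
	"context"
	"testing"

	"github.com/cockroachdb/cockroach/pkg/base"
	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
	"github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler"
	"github.com/cockroachdb/cockroach/pkg/sql"
	"github.com/cockroachdb/cockroach/pkg/sql/isql"
	"github.com/cockroachdb/cockroach/pkg/sql/physicalplan"
	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
	"github.com/cockroachdb/cockroach/pkg/util/uuid"
)

// storeAndAwaitDiagram builds a trivial physical plan, stores its diagram
// with the version handle taken from the test server's settings, then polls
// until the diagram is visible in the job's info storage.
func storeAndAwaitDiagram(
	ctx context.Context, t *testing.T, s serverutils.TestServerInterface, jobID jobspb.JobID,
) {
	db := s.InternalDB().(isql.DB)
	cv := s.ClusterSettings().Version

	p := sql.PhysicalPlan{}
	infra := physicalplan.NewPhysicalInfrastructure(uuid.FastMakeV4(), base.SQLInstanceID(1))
	p.PhysicalInfrastructure = infra

	jobsprofiler.StorePlanDiagram(ctx, s.Stopper(), &p, db, jobID, cv)
	checkForPlanDiagrams(ctx, t, db, jobID, 1 /* expectedNumDiagrams */, cv)
}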