Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
112954: ui: enable Forward button to set timewindow for latest NOW value r=koorosh a=koorosh

Before, "Forward" button on Time selector component allowed to select next time window if there's enough space for full increment (ie with 10 min time window, it wasn't possible to move forward if current end time is less that Now() - 10min). It caused misalignment where Forward button became disabled even if there's some more data to display.

This change handles this case and updates current time window to current time (executes the same logic as Now button).

Resolves: #112847

Release note (ui change): Forward button on time selector allows to select latest available timewindow (the same as with "Now" button)

Release justification: low risk, high benefit changes to existing functionality


https://github.com/cockroachdb/cockroach/assets/3106437/00f50793-c327-4902-903b-868ea2000047



113827: build: add BranchReleaseSeries r=RaduBerinde a=RaduBerinde

This change adds `build.BranchReleaseSeries()` which returns the major and minor in `version.txt`. This will be used to know the current release series when the latest `clusterversion` is not finalized.

We also clean up the code a bit: we separate the variables that are overridden by Bazel, and we use a different variable for the testing override (to make things more clear).

Epic: none
Release note: None

113864:   jobs: only force jobs.MaybeGenerateForcedRetryableError in 23.1 r=dt a=dt

Broken into a couple commits for ease of review:
1) jobs: plumb cluster version to info table accessor

This is a pure refactor to plumb a clusterversion.Handle to the info table accessor
via all the call sites and wrapping structs/call trees; no behavior change, or usage
of the plumbed cv, is added in this commit.

2) jobs: only force jobs.MaybeGenerateForcedRetryableError in 23.1


3) jobs: only store 23.1 debugging info after 23.1 upgrade

Release note (bug fix): fixed a bug that could cause 23.1 nodes in clusters which had not finalized the 23.1
version upgrade to use excessive CPU retrying expected errors related to the incomplete upgrade state.

Informs #113795.


Co-authored-by: Andrii Vorobiov <[email protected]>
Co-authored-by: Radu Berinde <[email protected]>
Co-authored-by: David Taylor <[email protected]>
  • Loading branch information
4 people committed Nov 6, 2023
4 parents cb8192f + 6c597f6 + a65ede5 + 308b247 commit 8899d03
Show file tree
Hide file tree
Showing 45 changed files with 264 additions and 151 deletions.
72 changes: 49 additions & 23 deletions pkg/build/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,24 +28,34 @@ import (
// with the string passed to the linker in the root Makefile.
const TimeFormat = "2006/01/02 15:04:05"

// These variables are initialized by Bazel via the linker -X flag
// when compiling release binaries.
var (
// These variables are initialized by Bazel via the linker -X flag
// when compiling release binaries.
utcTime string // Build time in UTC (year/month/day hour:min:sec)
rev string // SHA-1 of this build (git rev-parse)
buildTagOverride string
cgoCompiler = cgoVersion()
cgoTargetTriple string
platform = fmt.Sprintf("%s %s", runtime.GOOS, runtime.GOARCH)
// Distribution is changed by the CCL init-time hook in non-APL builds.
Distribution = "OSS"
typ string // Type of this build: <empty>, "development", or "release"
channel string
typ string // Type of this build: <empty>, "development", or "release"
channel string
)

// Distribution is changed by the CCL init-time hook in non-APL builds.
var Distribution = "OSS"

var (
cgoCompiler = cgoVersion()
platform = fmt.Sprintf("%s %s", runtime.GOOS, runtime.GOARCH)
envChannel = envutil.EnvOrDefaultString("COCKROACH_CHANNEL", "unknown")
enabledAssertions = buildutil.CrdbTestBuild
)

var (
//go:embed version.txt
cockroachVersion string
binaryVersion = computeBinaryVersion(cockroachVersion, rev)
versionTxt string
parsedVersionTxt *version.Version = parseCockroachVersion(versionTxt)
binaryVersion string = computeBinaryVersion(buildTagOverride, parsedVersionTxt, rev)
// binaryVersionTestingOverride is modified by TestingOverrideVersion.
binaryVersionTestingOverride string
)

const (
Expand All @@ -64,38 +74,54 @@ func SeemsOfficial() bool {
return channel == DefaultTelemetryChannel || channel == FIPSTelemetryChannel
}

func computeBinaryVersion(versionTxt, revision string) string {
if buildTagOverride != "" {
return buildTagOverride
}
func parseCockroachVersion(versionTxt string) *version.Version {
txt := strings.TrimSuffix(versionTxt, "\n")
v, err := version.Parse(txt)
if err != nil {
panic(fmt.Errorf("could not parse version.txt: %w", err))
}
return v
}

func computeBinaryVersion(
buildTagOverride string, parsedVersionTxt *version.Version, revision string,
) string {
if buildTagOverride != "" {
return buildTagOverride
}
if IsRelease() {
return v.String()
return parsedVersionTxt.String()
}
if revision != "" {
return fmt.Sprintf("%s-dev-%s", v.String(), revision)
return fmt.Sprintf("%s-dev-%s", parsedVersionTxt.String(), revision)
}
return fmt.Sprintf("%s-dev", v.String())
return fmt.Sprintf("%s-dev", parsedVersionTxt.String())
}

// BinaryVersion returns the version prefix, patch number and metadata of the current build.
// BinaryVersion returns the version prefix, patch number and metadata of the
// current build.
func BinaryVersion() string {
if binaryVersionTestingOverride != "" {
return binaryVersionTestingOverride
}
return binaryVersion
}

// BinaryVersionPrefix returns the version prefix of the current build.
func BinaryVersionPrefix() string {
v, err := version.Parse(binaryVersion)
v, err := version.Parse(BinaryVersion())
if err != nil {
return "dev"
}
return fmt.Sprintf("v%d.%d", v.Major(), v.Minor())
}

// BranchReleaseSeries returns tha major and minor in version.txt, without
// allowing for any overrides.
func BranchReleaseSeries() (major, minor int) {
return parsedVersionTxt.Major(), parsedVersionTxt.Minor()
}

func init() {
// Allow tests to override the version.txt contents.
if versionOverride := envutil.EnvOrDefaultString(
Expand Down Expand Up @@ -161,7 +187,7 @@ func GetInfo() Info {
}
return Info{
GoVersion: runtime.Version(),
Tag: binaryVersion,
Tag: BinaryVersion(),
Time: utcTime,
Revision: rev,
CgoCompiler: cgoCompiler,
Expand All @@ -178,9 +204,9 @@ func GetInfo() Info {
// TestingOverrideVersion allows tests to override the binary version
// reported by cockroach.
func TestingOverrideVersion(v string) func() {
prevBinaryVersion := binaryVersion
binaryVersion = v
return func() { binaryVersion = prevBinaryVersion }
prevOverride := binaryVersionTestingOverride
binaryVersionTestingOverride = v
return func() { binaryVersionTestingOverride = prevOverride }
}

// MakeIssueURL produces a URL to a CockroachDB issue.
Expand Down
5 changes: 3 additions & 2 deletions pkg/build/info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,10 @@ func TestComputeBinaryVersion(t *testing.T) {
defer func() { typ = oldBuildType }()

if tc.panicExpected {
require.Panics(t, func() { computeBinaryVersion(tc.versionTxt, tc.revision) })
require.Panics(t, func() { parseCockroachVersion(tc.versionTxt) })
} else {
actualVersion := computeBinaryVersion(tc.versionTxt, tc.revision)
v := parseCockroachVersion(tc.versionTxt)
actualVersion := computeBinaryVersion("" /* buildTagOverride */, v, tc.revision)
require.Equal(t, tc.expectedVersion, actualVersion)
}
})
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/backupccl/backup_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ func backup(
return nil
}
jobsprofiler.StorePerNodeProcessorProgressFraction(
ctx, execCtx.ExecCfg().InternalDB, job.ID(), prog)
ctx, execCtx.ExecCfg().InternalDB, job.ID(), prog, execCtx.ExecCfg().Settings.Version)
case <-ctx.Done():
return ctx.Err()
}
Expand Down Expand Up @@ -2095,7 +2095,7 @@ func (b *backupResumer) CollectProfile(ctx context.Context, execCtx interface{})
aggStatsCopy = b.mu.perNodeAggregatorStats.DeepCopy()
}()
return bulkutil.FlushTracingAggregatorStats(ctx, b.job.ID(),
p.ExecCfg().InternalDB, aggStatsCopy)
p.ExecCfg().InternalDB, aggStatsCopy, p.ExecCfg().Settings.Version)
}

func (b *backupResumer) deleteCheckpoint(
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/backup_processor_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func distBackup(
defer close(progCh)
defer close(tracingAggCh)
execCfg := execCtx.ExecCfg()
jobsprofiler.StorePlanDiagram(ctx, execCfg.DistSQLSrv.Stopper, p, execCfg.InternalDB, jobID)
jobsprofiler.StorePlanDiagram(ctx, execCfg.DistSQLSrv.Stopper, p, execCfg.InternalDB, jobID, execCfg.Settings.Version)

// Copy the evalCtx, as dsp.Run() might change it.
evalCtxCopy := *evalCtx
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -2544,7 +2544,7 @@ func (r *restoreResumer) CollectProfile(ctx context.Context, execCtx interface{}
aggStatsCopy = r.mu.perNodeAggregatorStats.DeepCopy()
}()
return bulkutil.FlushTracingAggregatorStats(ctx, r.job.ID(),
p.ExecCfg().InternalDB, aggStatsCopy)
p.ExecCfg().InternalDB, aggStatsCopy, p.ExecCfg().Settings.Version)
}

// dropDescriptors implements the OnFailOrCancel logic.
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/restore_processor_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ func distRestore(
defer recv.Release()

execCfg := execCtx.ExecCfg()
jobsprofiler.StorePlanDiagram(ctx, execCfg.DistSQLSrv.Stopper, p, execCfg.InternalDB, md.jobID)
jobsprofiler.StorePlanDiagram(ctx, execCfg.DistSQLSrv.Stopper, p, execCfg.InternalDB, md.jobID, execCfg.Settings.Version)

// Copy the evalCtx, as dsp.Run() might change it.
evalCtxCopy := *evalCtx
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeed_dist.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ func startDistChangefeed(
finishedSetupFn = func(flowinfra.Flow) { resultsCh <- tree.Datums(nil) }
}

jobsprofiler.StorePlanDiagram(ctx, execCfg.DistSQLSrv.Stopper, p, execCfg.InternalDB, jobID)
jobsprofiler.StorePlanDiagram(ctx, execCfg.DistSQLSrv.Stopper, p, execCfg.InternalDB, jobID, execCfg.Settings.Version)

// Copy the evalCtx, as dsp.Run() might change it.
evalCtxCopy := *evalCtx
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/streamingccl/replicationutils/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationutils",
visibility = ["//visibility:public"],
deps = [
"//pkg/clusterversion",
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/kv/kvpb",
Expand Down
9 changes: 5 additions & 4 deletions pkg/ccl/streamingccl/replicationutils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"fmt"
"testing"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
Expand Down Expand Up @@ -174,9 +175,9 @@ func ReplicatedTimeFromProgress(p *jobspb.Progress) hlc.Timestamp {
// LoadIngestionProgress loads the latest persisted stream ingestion progress.
// The method returns nil if the progress does not exist yet.
func LoadIngestionProgress(
ctx context.Context, db isql.DB, jobID jobspb.JobID,
ctx context.Context, db isql.DB, jobID jobspb.JobID, cv clusterversion.Handle,
) (*jobspb.StreamIngestionProgress, error) {
progress, err := jobs.LoadJobProgress(ctx, db, jobID)
progress, err := jobs.LoadJobProgress(ctx, db, jobID, cv)
if err != nil || progress == nil {
return nil, err
}
Expand All @@ -192,9 +193,9 @@ func LoadIngestionProgress(
// LoadReplicationProgress loads the latest persisted stream replication progress.
// The method returns nil if the progress does not exist yet.
func LoadReplicationProgress(
ctx context.Context, db isql.DB, jobID jobspb.JobID,
ctx context.Context, db isql.DB, jobID jobspb.JobID, cv clusterversion.Handle,
) (*jobspb.StreamReplicationProgress, error) {
progress, err := jobs.LoadJobProgress(ctx, db, jobID)
progress, err := jobs.LoadJobProgress(ctx, db, jobID, cv)
if err != nil || progress == nil {
return nil, err
}
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/streamingccl/streamingest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ go_library(
"//pkg/cloud",
"//pkg/cloud/externalconn",
"//pkg/cloud/externalconn/connectionpb",
"//pkg/clusterversion",
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/jobs/jobsprofiler",
Expand Down
16 changes: 11 additions & 5 deletions pkg/ccl/streamingccl/streamingest/replication_execution_details.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"text/tabwriter"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand Down Expand Up @@ -104,15 +105,19 @@ func constructSpanFrontierExecutionDetails(
// - The snapshot of the frontier tracking how far each span has been replicated
// up to.
func generateSpanFrontierExecutionDetailFile(
ctx context.Context, execCfg *sql.ExecutorConfig, ingestionJobID jobspb.JobID, skipBehindBy bool,
ctx context.Context,
execCfg *sql.ExecutorConfig,
ingestionJobID jobspb.JobID,
skipBehindBy bool,
cv clusterversion.Handle,
) error {
return execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
var sb bytes.Buffer
w := tabwriter.NewWriter(&sb, 0, 0, 1, ' ', tabwriter.TabIndent)

// Read the StreamIngestionPartitionSpecs to get a mapping from spans to
// their source and destination SQL instance IDs.
specs, err := jobs.ReadChunkedFileToJobInfo(ctx, replicationPartitionInfoFilename, txn, ingestionJobID)
specs, err := jobs.ReadChunkedFileToJobInfo(ctx, replicationPartitionInfoFilename, txn, ingestionJobID, cv)
if err != nil {
return err
}
Expand All @@ -124,7 +129,7 @@ func generateSpanFrontierExecutionDetailFile(

// Now, read the latest snapshot of the frontier that tells us what
// timestamp each span has been replicated up to.
frontierEntries, err := jobs.ReadChunkedFileToJobInfo(ctx, frontierEntriesFilename, txn, ingestionJobID)
frontierEntries, err := jobs.ReadChunkedFileToJobInfo(ctx, frontierEntriesFilename, txn, ingestionJobID, cv)
if err != nil {
return err
}
Expand Down Expand Up @@ -157,7 +162,7 @@ func generateSpanFrontierExecutionDetailFile(
if err := w.Flush(); err != nil {
return err
}
return jobs.WriteExecutionDetailFile(ctx, filename, sb.Bytes(), txn, ingestionJobID)
return jobs.WriteExecutionDetailFile(ctx, filename, sb.Bytes(), txn, ingestionJobID, cv)
})
}

Expand All @@ -170,6 +175,7 @@ func persistStreamIngestionPartitionSpecs(
execCfg *sql.ExecutorConfig,
ingestionJobID jobspb.JobID,
streamIngestionSpecs map[base.SQLInstanceID]*execinfrapb.StreamIngestionDataSpec,
cv clusterversion.Handle,
) error {
err := execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
specs := make([]*execinfrapb.StreamIngestionPartitionSpec, 0)
Expand All @@ -183,7 +189,7 @@ func persistStreamIngestionPartitionSpecs(
if err != nil {
return err
}
return jobs.WriteChunkedFileToJobInfo(ctx, replicationPartitionInfoFilename, specBytes, txn, ingestionJobID)
return jobs.WriteChunkedFileToJobInfo(ctx, replicationPartitionInfoFilename, specBytes, txn, ingestionJobID, cv)
})
if knobs := execCfg.StreamingTestingKnobs; knobs != nil && knobs.AfterPersistingPartitionSpecs != nil {
knobs.AfterPersistingPartitionSpecs()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ func TestEndToEndFrontierExecutionDetailFile(t *testing.T) {

ingestionJobID := jobspb.JobID(123)
require.NoError(t, persistStreamIngestionPartitionSpecs(ctx, &execCfg,
ingestionJobID, streamIngestionsSpecs))
ingestionJobID, streamIngestionsSpecs, execCfg.Settings.Version))

// Now, let's persist some frontier entries.
frontierEntries := execinfrapb.FrontierEntries{ResolvedSpans: []jobspb.ResolvedSpan{
Expand All @@ -369,9 +369,9 @@ func TestEndToEndFrontierExecutionDetailFile(t *testing.T) {
frontierBytes, err := protoutil.Marshal(&frontierEntries)
require.NoError(t, err)
require.NoError(t, execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
return jobs.WriteChunkedFileToJobInfo(ctx, frontierEntriesFilename, frontierBytes, txn, ingestionJobID)
return jobs.WriteChunkedFileToJobInfo(ctx, frontierEntriesFilename, frontierBytes, txn, ingestionJobID, execCfg.Settings.Version)
}))
require.NoError(t, generateSpanFrontierExecutionDetailFile(ctx, &execCfg, ingestionJobID, true /* skipBehindBy */))
require.NoError(t, generateSpanFrontierExecutionDetailFile(ctx, &execCfg, ingestionJobID, true /* skipBehindBy */, execCfg.Settings.Version))
files := listExecutionDetails(t, srv, ingestionJobID)
require.Len(t, files, 1)
data, err := checkExecutionDetails(t, srv, ingestionJobID, files[0])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1114,12 +1114,14 @@ func TestLoadProducerAndIngestionProgress(t *testing.T) {
c.WaitUntilReplicatedTime(srcTime, jobspb.JobID(replicationJobID))

srcDB := c.SrcSysServer.ExecutorConfig().(sql.ExecutorConfig).InternalDB
producerProgress, err := replicationutils.LoadReplicationProgress(ctx, srcDB, jobspb.JobID(producerJobID))
producerProgress, err := replicationutils.LoadReplicationProgress(ctx, srcDB, jobspb.JobID(producerJobID),
c.SrcSysServer.ExecutorConfig().(sql.ExecutorConfig).Settings.Version)
require.NoError(t, err)
require.Equal(t, jobspb.StreamReplicationProgress_NOT_FINISHED, producerProgress.StreamIngestionStatus)

destDB := c.DestSysServer.ExecutorConfig().(sql.ExecutorConfig).InternalDB
ingestionProgress, err := replicationutils.LoadIngestionProgress(ctx, destDB, jobspb.JobID(replicationJobID))
ingestionProgress, err := replicationutils.LoadIngestionProgress(ctx, destDB, jobspb.JobID(replicationJobID),
c.DestSysServer.ExecutorConfig().(sql.ExecutorConfig).Settings.Version)
require.NoError(t, err)
require.Equal(t, jobspb.Replicating, ingestionProgress.ReplicationStatus)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_dist.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func startDistIngestion(
return errors.Wrap(err, "failed to update job progress")
}
jobsprofiler.StorePlanDiagram(ctx, execCtx.ExecCfg().DistSQLSrv.Stopper, planner.initialPlan, execCtx.ExecCfg().InternalDB,
ingestionJob.ID())
ingestionJob.ID(), execCtx.ExecCfg().Settings.Version)

replanOracle := sql.ReplanOnCustomFunc(
measurePlanChange,
Expand Down Expand Up @@ -463,7 +463,7 @@ func (p *replicationFlowPlanner) constructPlanGenerator(
if !p.createdInitialPlan() {
// Only persist the initial plan as it's the only plan that actually gets
// executed.
if err := persistStreamIngestionPartitionSpecs(ctx, execCtx.ExecCfg(), ingestionJobID, streamIngestionSpecs); err != nil {
if err := persistStreamIngestionPartitionSpecs(ctx, execCtx.ExecCfg(), ingestionJobID, streamIngestionSpecs, execCtx.ExecCfg().Settings.Version); err != nil {
return nil, nil, err
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ func (sf *streamIngestionFrontier) maybePersistFrontierEntries() error {
}

if err = sf.FlowCtx.Cfg.DB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
return jobs.WriteChunkedFileToJobInfo(ctx, frontierEntriesFilename, frontierBytes, txn, jobID)
return jobs.WriteChunkedFileToJobInfo(ctx, frontierEntriesFilename, frontierBytes, txn, jobID, sf.EvalCtx.Settings.Version)
}); err != nil {
return err
}
Expand Down
Loading

0 comments on commit 8899d03

Please sign in to comment.