Skip to content

Commit

Permalink
Merge #96374 #96656
Browse files Browse the repository at this point in the history
96374: streamingest: fix returned cutover time on `complete to latest` r=lidorcarmel a=lidorcarmel

Currently when cutting over to the LATEST high watermark, the sql command prints that that cutover was done to timestamp 0.

This commit fixes this bug.

Fixes: #95901

Epic: CRDB-18752

Release note: None

96656: clusterversion: bump minBinary to 22.2 r=RaduBerinde a=celiala

This PR bumps binaryMinSupportedVersion to 22.2 before cutting the 23.1
release branch, as part of the stability period process.

In previous stability periods, the work of deleting pre-22.2 gates/tests
and bumping binaryMinSupportedVersion was done at the same time. For 23.1,
we will break this up into smaller tasks:

- This PR will just bump binaryMinSupportedVersion to 22.2, skipping 22.2
  mixed-version tests as needed to bump this value.
- The clean-up of pre-22.2 gates and 22.2 mixed-version tests will be done
  via follow-up work/issues.

Issue: #95530
Epic: CRDB-23572

Release note: None

Co-authored-by: Lidor Carmel <[email protected]>
Co-authored-by: Celia La <[email protected]>
  • Loading branch information
3 people committed Feb 7, 2023
3 parents 0a65a4e + 6d3f60c + 76fc52c commit 618ea05
Show file tree
Hide file tree
Showing 13 changed files with 104 additions and 17 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/storageccl/engineccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ go_test(
"//pkg/storage/enginepb",
"//pkg/testutils",
"//pkg/testutils/datapathutils",
"//pkg/testutils/skip",
"//pkg/testutils/storageutils",
"//pkg/util/encoding",
"//pkg/util/hlc",
Expand Down
3 changes: 3 additions & 0 deletions pkg/ccl/storageccl/engineccl/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -166,6 +167,8 @@ func runIterate(
}

func BenchmarkTimeBoundIterate(b *testing.B) {
skip.WithIssue(b, 95530, "bump minBinary to 22.2. Skip 22.2 mixed-version tests for future cleanup")

for _, loadFactor := range []float32{1.0, 0.5, 0.1, 0.05, 0.0} {
b.Run(fmt.Sprintf("LoadFactor=%.2f", loadFactor), func(b *testing.B) {
b.Run("NormalIterator", func(b *testing.B) {
Expand Down
36 changes: 20 additions & 16 deletions pkg/ccl/streamingccl/streamingest/alter_replication_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,13 +175,12 @@ func alterReplicationJobHook(
jobRegistry := p.ExecCfg().JobRegistry
if alterTenantStmt.Cutover != nil {
pts := p.ExecCfg().ProtectedTimestampProvider.WithTxn(p.InternalSQLTxn())
if err := alterTenantJobCutover(
ctx, p.InternalSQLTxn(), jobRegistry, pts,
alterTenantStmt, tenInfo, cutoverTime,
); err != nil {
actualCutoverTime, err := alterTenantJobCutover(
ctx, p.InternalSQLTxn(), jobRegistry, pts, alterTenantStmt, tenInfo, cutoverTime)
if err != nil {
return err
}
resultsCh <- tree.Datums{eval.TimestampToDecimalDatum(cutoverTime)}
resultsCh <- tree.Datums{eval.TimestampToDecimalDatum(actualCutoverTime)}
} else if !alterTenantStmt.Options.IsDefault() {
if err := alterTenantOptions(ctx, p.InternalSQLTxn(), jobRegistry, options, tenInfo); err != nil {
return err
Expand Down Expand Up @@ -209,6 +208,9 @@ func alterReplicationJobHook(
return fn, nil, nil, false, nil
}

// alterTenantJobCutover returns the cutover timestamp that was used to initiate
// the cutover process - if the command is 'ALTER TENANT .. COMPLETE REPLICATION
// TO LATEST' then the frontier high water timestamp is used.
func alterTenantJobCutover(
ctx context.Context,
txn isql.Txn,
Expand All @@ -217,25 +219,26 @@ func alterTenantJobCutover(
alterTenantStmt *tree.AlterTenantReplication,
tenInfo *mtinfopb.TenantInfo,
cutoverTime hlc.Timestamp,
) error {
) (hlc.Timestamp, error) {
if alterTenantStmt == nil || alterTenantStmt.Cutover == nil {
return errors.AssertionFailedf("unexpected nil ALTER TENANT cutover expression")
return hlc.Timestamp{}, errors.AssertionFailedf("unexpected nil ALTER TENANT cutover expression")
}

tenantName := tenInfo.Name
job, err := jobRegistry.LoadJobWithTxn(ctx, tenInfo.TenantReplicationJobID, txn)
if err != nil {
return err
return hlc.Timestamp{}, err
}
details, ok := job.Details().(jobspb.StreamIngestionDetails)
if !ok {
return errors.Newf("job with id %d is not a stream ingestion job", job.ID())
return hlc.Timestamp{}, errors.Newf("job with id %d is not a stream ingestion job", job.ID())
}
progress := job.Progress()
if alterTenantStmt.Cutover.Latest {
ts := progress.GetHighWater()
if ts == nil || ts.IsEmpty() {
return errors.Newf("replicated tenant %q has not yet recorded a safe replication time", tenantName)
return hlc.Timestamp{},
errors.Newf("replicated tenant %q has not yet recorded a safe replication time", tenantName)
}
cutoverTime = *ts
}
Expand All @@ -246,25 +249,26 @@ func alterTenantJobCutover(
// Check that the timestamp is above our retained timestamp.
stats, err := replicationutils.GetStreamIngestionStatsNoHeartbeat(ctx, details, progress)
if err != nil {
return err
return hlc.Timestamp{}, err
}
if stats.IngestionDetails.ProtectedTimestampRecordID == nil {
return errors.Newf("replicated tenant %q (%d) has not yet recorded a retained timestamp",
return hlc.Timestamp{}, errors.Newf("replicated tenant %q (%d) has not yet recorded a retained timestamp",
tenantName, tenInfo.ID)
} else {
record, err := ptp.GetRecord(ctx, *stats.IngestionDetails.ProtectedTimestampRecordID)
if err != nil {
return err
return hlc.Timestamp{}, err
}
if cutoverTime.Less(record.Timestamp) {
return errors.Newf("cutover time %s is before earliest safe cutover time %s", cutoverTime, record.Timestamp)
return hlc.Timestamp{}, errors.Newf("cutover time %s is before earliest safe cutover time %s",
cutoverTime, record.Timestamp)
}
}
if err := completeStreamIngestion(ctx, jobRegistry, txn, tenInfo.TenantReplicationJobID, cutoverTime); err != nil {
return err
return hlc.Timestamp{}, err
}

return nil
return cutoverTime, nil
}

func alterTenantOptions(
Expand Down
53 changes: 53 additions & 0 deletions pkg/ccl/streamingccl/streamingest/alter_replication_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,59 @@ import (
"github.com/stretchr/testify/require"
)

func TestAlterTenantCompleteToTime(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
args := replicationtestutils.DefaultTenantStreamingClustersArgs

c, cleanup := replicationtestutils.CreateTenantStreamingClusters(ctx, t, args)
defer cleanup()
producerJobID, ingestionJobID := c.StartStreamReplication(ctx)

jobutils.WaitForJobToRun(t, c.SrcSysSQL, jobspb.JobID(producerJobID))
jobutils.WaitForJobToRun(t, c.DestSysSQL, jobspb.JobID(ingestionJobID))

c.WaitUntilHighWatermark(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID))

var cutoverTime time.Time
c.DestSysSQL.QueryRow(t, "SELECT clock_timestamp()").Scan(&cutoverTime)

var cutoverStr string
c.DestSysSQL.QueryRow(c.T, `ALTER TENANT $1 COMPLETE REPLICATION TO SYSTEM TIME $2::string`,
args.DestTenantName, cutoverTime).Scan(&cutoverStr)
cutoverOutput := replicationtestutils.DecimalTimeToHLC(t, cutoverStr)
require.Equal(t, cutoverTime, cutoverOutput.GoTime())
jobutils.WaitForJobToSucceed(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID))
}

func TestAlterTenantCompleteToLatest(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
args := replicationtestutils.DefaultTenantStreamingClustersArgs

c, cleanup := replicationtestutils.CreateTenantStreamingClusters(ctx, t, args)
defer cleanup()
producerJobID, ingestionJobID := c.StartStreamReplication(ctx)

jobutils.WaitForJobToRun(t, c.SrcSysSQL, jobspb.JobID(producerJobID))
jobutils.WaitForJobToRun(t, c.DestSysSQL, jobspb.JobID(ingestionJobID))

highWater := c.SrcCluster.Server(0).Clock().Now()
c.WaitUntilHighWatermark(highWater, jobspb.JobID(ingestionJobID))

var cutoverStr string
c.DestSysSQL.QueryRow(c.T, `ALTER TENANT $1 COMPLETE REPLICATION TO LATEST`,
args.DestTenantName).Scan(&cutoverStr)
cutoverOutput := replicationtestutils.DecimalTimeToHLC(t, cutoverStr)
require.GreaterOrEqual(t, cutoverOutput.GoTime(), highWater.GoTime())
require.LessOrEqual(t, cutoverOutput.GoTime(), c.SrcCluster.Server(0).Clock().Now().GoTime())
jobutils.WaitForJobToSucceed(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID))
}

func TestAlterTenantPauseResume(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down
2 changes: 1 addition & 1 deletion pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -781,7 +781,7 @@ var versionsSingleton = func() keyedVersions {
var V23_1 = versionsSingleton[len(versionsSingleton)-1].Key

const (
BinaryMinSupportedVersionKey = V22_1
BinaryMinSupportedVersionKey = V22_2
)

// TODO(irfansharif): clusterversion.binary{,MinimumSupported}Version
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/create_function_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/tests"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -155,6 +156,8 @@ SELECT nextval(105:::REGCLASS);`,
func TestGatingCreateFunction(t *testing.T) {
defer leaktest.AfterTest(t)()

skip.WithIssue(t, 95530, "bump minBinary to 22.2. Skip 22.2 mixed-version tests for future cleanup")

t.Run("new_schema_changer_version_enabled", func(t *testing.T) {
params, _ := tests.CreateTestServerParams()
// Override binary version to be older.
Expand Down
3 changes: 3 additions & 0 deletions pkg/storage/sst_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -78,6 +79,8 @@ func makePebbleSST(t testing.TB, kvs []MVCCKeyValue, ingestion bool) []byte {
func TestMakeIngestionWriterOptions(t *testing.T) {
defer leaktest.AfterTest(t)()

skip.WithIssue(t, 95530, "bump minBinary to 22.2. Skip 22.2 mixed-version tests for future cleanup")

testCases := []struct {
name string
st *cluster.Settings
Expand Down
3 changes: 3 additions & 0 deletions pkg/upgrade/upgrades/alter_sql_instances_locality_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/privilege"
"github.com/cockroachdb/cockroach/pkg/sql/sem/catconstants"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/upgrade/upgrades"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
Expand All @@ -36,6 +37,8 @@ func TestAlterSystemSqlInstancesTable(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

skip.WithIssue(t, 95530, "bump minBinary to 22.2. Skip 22.2 mixed-version tests for future cleanup")

clusterArgs := base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/privilege"
"github.com/cockroachdb/cockroach/pkg/sql/sem/catconstants"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/upgrade/upgrades"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
Expand All @@ -37,6 +38,8 @@ func TestAlterSystemStatementStatisticsTable(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

skip.WithIssue(t, 95530, "bump minBinary to 22.2. Skip 22.2 mixed-version tests for future cleanup")

clusterArgs := base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
Expand Down
3 changes: 3 additions & 0 deletions pkg/upgrade/upgrades/builtins_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
Expand All @@ -27,6 +28,8 @@ func TestIsAtLeastVersionBuiltin(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

skip.WithIssue(t, 95530, "bump minBinary to 22.2. Skip 22.2 mixed-version tests for future cleanup")

clusterArgs := base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/builtins/builtinconstants"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
Expand All @@ -40,6 +41,8 @@ import (
func TestSchemaTelemetrySchedule(t *testing.T) {
defer leaktest.AfterTest(t)()

skip.WithIssue(t, 95530, "bump minBinary to 22.2. Skip 22.2 mixed-version tests for future cleanup")

// We want to ensure that the migration will succeed when run again.
// To ensure that it will, we inject a failure when trying to mark
// the upgrade as complete when forceRetry is true.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/privilege"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/upgrade/upgrades"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
Expand All @@ -36,6 +37,8 @@ func TestSampledStmtDiagReqsMigration(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

skip.WithIssue(t, 95530, "bump minBinary to 22.2. Skip 22.2 mixed-version tests for future cleanup")

clusterArgs := base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
Expand Down
5 changes: 5 additions & 0 deletions pkg/upgrade/upgrades/wait_for_schema_changes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/tests"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
Expand All @@ -39,6 +40,8 @@ func TestWaitForSchemaChangeMigration(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

skip.WithIssue(t, 95530, "bump minBinary to 22.2. Skip 22.2 mixed-version tests for future cleanup")

ctx := context.Background()
testCases := []struct {
name string
Expand Down Expand Up @@ -196,6 +199,8 @@ func TestWaitForSchemaChangeMigrationSynthetic(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

skip.WithIssue(t, 95530, "bump minBinary to 22.2. Skip 22.2 mixed-version tests for future cleanup")

ctx := context.Background()

upsertJob := func(sqlDB *gosql.DB, typ string, status string) error {
Expand Down

0 comments on commit 618ea05

Please sign in to comment.