From 01d22ea7447b2e009c2786b789b28ae7094bca46 Mon Sep 17 00:00:00 2001 From: Lidor Carmel Date: Mon, 19 Dec 2022 14:41:25 -0800 Subject: [PATCH] streamingest: fix ALTER TENANT REPLICATION output ALTER TENANT PAUSE/RESUME: currently prints the job id, and this PR removes the output. ALTER TENANT COMPLETE: currently prints the job id. This PR changes the output to be the cutover timestamp. The rational is that the user may not know the cutover time because LATEST or NOW() etc. was used. Epic: CRDB-18752 Release note: None --- .../replicationtestutils/BUILD.bazel | 1 + .../replicationtestutils/testutils.go | 17 +++++++++++++++-- .../streamingest/alter_replication_job.go | 14 +++++++++----- .../streamingest/alter_replication_job_test.go | 6 +++++- .../streamingest/replication_stream_e2e_test.go | 14 ++++++++++---- .../streamingest/stream_ingestion_job_test.go | 6 +++++- 6 files changed, 45 insertions(+), 13 deletions(-) diff --git a/pkg/ccl/streamingccl/replicationtestutils/BUILD.bazel b/pkg/ccl/streamingccl/replicationtestutils/BUILD.bazel index 5034ec8c4831..15222ce818af 100644 --- a/pkg/ccl/streamingccl/replicationtestutils/BUILD.bazel +++ b/pkg/ccl/streamingccl/replicationtestutils/BUILD.bazel @@ -69,6 +69,7 @@ go_library( "//pkg/util/hlc", "//pkg/util/protoutil", "//pkg/util/syncutil", + "@com_github_cockroachdb_apd_v3//:apd", "@com_github_cockroachdb_errors//:errors", "@com_github_stretchr_testify//require", ], diff --git a/pkg/ccl/streamingccl/replicationtestutils/testutils.go b/pkg/ccl/streamingccl/replicationtestutils/testutils.go index b0a0ef2529a8..36b71ea19146 100644 --- a/pkg/ccl/streamingccl/replicationtestutils/testutils.go +++ b/pkg/ccl/streamingccl/replicationtestutils/testutils.go @@ -16,6 +16,7 @@ import ( "testing" "time" + "github.com/cockroachdb/apd/v3" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationutils" "github.com/cockroachdb/cockroach/pkg/jobs" @@ -147,8 +148,11 @@ func (c *TenantStreamingClusters) Cutover( producerJobID, ingestionJobID int, cutoverTime time.Time, ) { // Cut over the ingestion job and the job will stop eventually. - c.DestSysSQL.Exec(c.T, `ALTER TENANT $1 COMPLETE REPLICATION TO SYSTEM TIME $2::string`, - c.Args.DestTenantName, cutoverTime) + var cutoverStr string + c.DestSysSQL.QueryRow(c.T, `ALTER TENANT $1 COMPLETE REPLICATION TO SYSTEM TIME $2::string`, + c.Args.DestTenantName, cutoverTime).Scan(&cutoverStr) + cutoverOutput := DecimalTimeToHLC(c.T, cutoverStr) + require.Equal(c.T, cutoverTime, cutoverOutput.GoTime()) jobutils.WaitForJobToSucceed(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID)) jobutils.WaitForJobToSucceed(c.T, c.SrcSysSQL, jobspb.JobID(producerJobID)) } @@ -356,6 +360,15 @@ func RunningStatus(t *testing.T, sqlRunner *sqlutils.SQLRunner, ingestionJobID i return p.RunningStatus } +func DecimalTimeToHLC(t *testing.T, s string) hlc.Timestamp { + t.Helper() + d, _, err := apd.NewFromString(s) + require.NoError(t, err) + ts, err := hlc.DecimalToHLC(d) + require.NoError(t, err) + return ts +} + // GetStreamJobIds returns the jod ids of the producer and ingestion jobs. func GetStreamJobIds( t *testing.T, diff --git a/pkg/ccl/streamingccl/streamingest/alter_replication_job.go b/pkg/ccl/streamingccl/streamingest/alter_replication_job.go index 6b853f781861..d386e1bcb3e4 100644 --- a/pkg/ccl/streamingccl/streamingest/alter_replication_job.go +++ b/pkg/ccl/streamingccl/streamingest/alter_replication_job.go @@ -36,8 +36,8 @@ import ( const alterReplicationJobOp = "ALTER TENANT REPLICATION" -var alterReplicationJobHeader = colinfo.ResultColumns{ - {Name: "replication_job_id", Typ: types.Int}, +var alterReplicationCutoverHeader = colinfo.ResultColumns{ + {Name: "cutover_time", Typ: types.Decimal}, } // ResolvedTenantReplicationOptions represents options from an @@ -96,9 +96,10 @@ func alterReplicationJobTypeCheck( return false, nil, err } } + return true, alterReplicationCutoverHeader, nil } - return true, alterReplicationJobHeader, nil + return true, nil, nil } func alterReplicationJobHook( @@ -168,6 +169,7 @@ func alterReplicationJobHook( p.ExecCfg().ProtectedTimestampProvider, alterTenantStmt, tenInfo, cutoverTime); err != nil { return err } + resultsCh <- tree.Datums{eval.TimestampToDecimalDatum(cutoverTime)} } else if !alterTenantStmt.Options.IsDefault() { if err := alterTenantOptions(ctx, p.Txn(), jobRegistry, options, tenInfo); err != nil { return err @@ -187,10 +189,12 @@ func alterReplicationJobHook( return errors.New("unsupported job command in ALTER TENANT REPLICATION") } } - resultsCh <- tree.Datums{tree.NewDInt(tree.DInt(tenInfo.TenantReplicationJobID))} return nil } - return fn, alterReplicationJobHeader, nil, false, nil + if alterTenantStmt.Cutover != nil { + return fn, alterReplicationCutoverHeader, nil, false, nil + } + return fn, nil, nil, false, nil } func alterTenantJobCutover( diff --git a/pkg/ccl/streamingccl/streamingest/alter_replication_job_test.go b/pkg/ccl/streamingccl/streamingest/alter_replication_job_test.go index 227914c9c930..c992cee762ec 100644 --- a/pkg/ccl/streamingccl/streamingest/alter_replication_job_test.go +++ b/pkg/ccl/streamingccl/streamingest/alter_replication_job_test.go @@ -49,7 +49,11 @@ func TestAlterTenantPauseResume(t *testing.T) { var cutoverTime time.Time c.DestSysSQL.QueryRow(t, "SELECT clock_timestamp()").Scan(&cutoverTime) - c.DestSysSQL.Exec(c.T, `ALTER TENANT $1 COMPLETE REPLICATION TO SYSTEM TIME $2::string`, args.DestTenantName, cutoverTime) + var cutoverStr string + c.DestSysSQL.QueryRow(c.T, `ALTER TENANT $1 COMPLETE REPLICATION TO SYSTEM TIME $2::string`, + args.DestTenantName, cutoverTime).Scan(&cutoverStr) + cutoverOutput := replicationtestutils.DecimalTimeToHLC(t, cutoverStr) + require.Equal(t, cutoverTime, cutoverOutput.GoTime()) jobutils.WaitForJobToSucceed(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID)) cleanupTenant := c.CreateDestTenantSQL(ctx) defer func() { diff --git a/pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go b/pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go index 55651ce03f7c..62da996cf841 100644 --- a/pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go +++ b/pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go @@ -532,8 +532,11 @@ func TestTenantStreamingUnavailableStreamAddress(t *testing.T) { var cutoverTime time.Time alternateSrcSysSQL.QueryRow(t, "SELECT clock_timestamp()").Scan(&cutoverTime) - c.DestSysSQL.Exec(c.T, `ALTER TENANT $1 COMPLETE REPLICATION TO SYSTEM TIME $2::string`, - c.Args.DestTenantName, cutoverTime) + var cutoverStr string + c.DestSysSQL.QueryRow(c.T, `ALTER TENANT $1 COMPLETE REPLICATION TO SYSTEM TIME $2::string`, + c.Args.DestTenantName, cutoverTime).Scan(&cutoverStr) + cutoverOutput := replicationtestutils.DecimalTimeToHLC(t, cutoverStr) + require.Equal(c.T, cutoverTime, cutoverOutput.GoTime()) jobutils.WaitForJobToSucceed(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID)) // The destroyed address should have been removed from the topology @@ -580,8 +583,11 @@ func TestTenantStreamingCutoverOnSourceFailure(t *testing.T) { // Destroy the source cluster c.SrcCleanup() - c.DestSysSQL.Exec(c.T, `ALTER TENANT $1 COMPLETE REPLICATION TO SYSTEM TIME $2::string`, - c.Args.DestTenantName, cutoverTime.AsOfSystemTime()) + var cutoverStr string + c.DestSysSQL.QueryRow(c.T, `ALTER TENANT $1 COMPLETE REPLICATION TO SYSTEM TIME $2::string`, + c.Args.DestTenantName, cutoverTime.AsOfSystemTime()).Scan(&cutoverStr) + cutoverOutput := replicationtestutils.DecimalTimeToHLC(t, cutoverStr) + require.Equal(c.T, cutoverTime, cutoverOutput) // Resume ingestion. c.DestSysSQL.Exec(t, fmt.Sprintf("RESUME JOB %d", ingestionJobID)) diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go index 4d3d618b2551..18acce6f7532 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go @@ -136,8 +136,12 @@ INSERT INTO d.t2 VALUES (2); `) replicationtestutils.WaitUntilStartTimeReached(t, destSQL, jobspb.JobID(ingestionJobID)) + var cutoverStr string cutoverTime := timeutil.Now().Round(time.Microsecond) - destSQL.Exec(t, `ALTER TENANT "destination-tenant" COMPLETE REPLICATION TO SYSTEM TIME $1::string`, hlc.Timestamp{WallTime: cutoverTime.UnixNano()}.AsOfSystemTime()) + destSQL.QueryRow(t, `ALTER TENANT "destination-tenant" COMPLETE REPLICATION TO SYSTEM TIME $1::string`, + hlc.Timestamp{WallTime: cutoverTime.UnixNano()}.AsOfSystemTime()).Scan(&cutoverStr) + cutoverOutput := replicationtestutils.DecimalTimeToHLC(t, cutoverStr) + require.Equal(t, cutoverTime, cutoverOutput.GoTime()) jobutils.WaitForJobToSucceed(t, destSQL, jobspb.JobID(ingestionJobID)) jobutils.WaitForJobToSucceed(t, sourceDBRunner, jobspb.JobID(streamProducerJobID))