Skip to content

Commit

Permalink
streamingest: fix ALTER TENANT REPLICATION output
Browse files Browse the repository at this point in the history
ALTER TENANT PAUSE/RESUME: currently prints the job id, and this PR removes
the output.

ALTER TENANT COMPLETE: currently prints the job id. This PR changes the output
to be the cutover timestamp. The rational is that the user may not know the
cutover time because LATEST or NOW() etc. was used.

Epic: CRDB-18752

Release note: None
  • Loading branch information
lidorcarmel committed Jan 6, 2023
1 parent e973b55 commit 01d22ea
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 13 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/streamingccl/replicationtestutils/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ go_library(
"//pkg/util/hlc",
"//pkg/util/protoutil",
"//pkg/util/syncutil",
"@com_github_cockroachdb_apd_v3//:apd",
"@com_github_cockroachdb_errors//:errors",
"@com_github_stretchr_testify//require",
],
Expand Down
17 changes: 15 additions & 2 deletions pkg/ccl/streamingccl/replicationtestutils/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"testing"
"time"

"github.com/cockroachdb/apd/v3"
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationutils"
"github.com/cockroachdb/cockroach/pkg/jobs"
Expand Down Expand Up @@ -147,8 +148,11 @@ func (c *TenantStreamingClusters) Cutover(
producerJobID, ingestionJobID int, cutoverTime time.Time,
) {
// Cut over the ingestion job and the job will stop eventually.
c.DestSysSQL.Exec(c.T, `ALTER TENANT $1 COMPLETE REPLICATION TO SYSTEM TIME $2::string`,
c.Args.DestTenantName, cutoverTime)
var cutoverStr string
c.DestSysSQL.QueryRow(c.T, `ALTER TENANT $1 COMPLETE REPLICATION TO SYSTEM TIME $2::string`,
c.Args.DestTenantName, cutoverTime).Scan(&cutoverStr)
cutoverOutput := DecimalTimeToHLC(c.T, cutoverStr)
require.Equal(c.T, cutoverTime, cutoverOutput.GoTime())
jobutils.WaitForJobToSucceed(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID))
jobutils.WaitForJobToSucceed(c.T, c.SrcSysSQL, jobspb.JobID(producerJobID))
}
Expand Down Expand Up @@ -356,6 +360,15 @@ func RunningStatus(t *testing.T, sqlRunner *sqlutils.SQLRunner, ingestionJobID i
return p.RunningStatus
}

func DecimalTimeToHLC(t *testing.T, s string) hlc.Timestamp {
t.Helper()
d, _, err := apd.NewFromString(s)
require.NoError(t, err)
ts, err := hlc.DecimalToHLC(d)
require.NoError(t, err)
return ts
}

// GetStreamJobIds returns the jod ids of the producer and ingestion jobs.
func GetStreamJobIds(
t *testing.T,
Expand Down
14 changes: 9 additions & 5 deletions pkg/ccl/streamingccl/streamingest/alter_replication_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ import (

const alterReplicationJobOp = "ALTER TENANT REPLICATION"

var alterReplicationJobHeader = colinfo.ResultColumns{
{Name: "replication_job_id", Typ: types.Int},
var alterReplicationCutoverHeader = colinfo.ResultColumns{
{Name: "cutover_time", Typ: types.Decimal},
}

// ResolvedTenantReplicationOptions represents options from an
Expand Down Expand Up @@ -96,9 +96,10 @@ func alterReplicationJobTypeCheck(
return false, nil, err
}
}
return true, alterReplicationCutoverHeader, nil
}

return true, alterReplicationJobHeader, nil
return true, nil, nil
}

func alterReplicationJobHook(
Expand Down Expand Up @@ -168,6 +169,7 @@ func alterReplicationJobHook(
p.ExecCfg().ProtectedTimestampProvider, alterTenantStmt, tenInfo, cutoverTime); err != nil {
return err
}
resultsCh <- tree.Datums{eval.TimestampToDecimalDatum(cutoverTime)}
} else if !alterTenantStmt.Options.IsDefault() {
if err := alterTenantOptions(ctx, p.Txn(), jobRegistry, options, tenInfo); err != nil {
return err
Expand All @@ -187,10 +189,12 @@ func alterReplicationJobHook(
return errors.New("unsupported job command in ALTER TENANT REPLICATION")
}
}
resultsCh <- tree.Datums{tree.NewDInt(tree.DInt(tenInfo.TenantReplicationJobID))}
return nil
}
return fn, alterReplicationJobHeader, nil, false, nil
if alterTenantStmt.Cutover != nil {
return fn, alterReplicationCutoverHeader, nil, false, nil
}
return fn, nil, nil, false, nil
}

func alterTenantJobCutover(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,11 @@ func TestAlterTenantPauseResume(t *testing.T) {
var cutoverTime time.Time
c.DestSysSQL.QueryRow(t, "SELECT clock_timestamp()").Scan(&cutoverTime)

c.DestSysSQL.Exec(c.T, `ALTER TENANT $1 COMPLETE REPLICATION TO SYSTEM TIME $2::string`, args.DestTenantName, cutoverTime)
var cutoverStr string
c.DestSysSQL.QueryRow(c.T, `ALTER TENANT $1 COMPLETE REPLICATION TO SYSTEM TIME $2::string`,
args.DestTenantName, cutoverTime).Scan(&cutoverStr)
cutoverOutput := replicationtestutils.DecimalTimeToHLC(t, cutoverStr)
require.Equal(t, cutoverTime, cutoverOutput.GoTime())
jobutils.WaitForJobToSucceed(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID))
cleanupTenant := c.CreateDestTenantSQL(ctx)
defer func() {
Expand Down
14 changes: 10 additions & 4 deletions pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -532,8 +532,11 @@ func TestTenantStreamingUnavailableStreamAddress(t *testing.T) {
var cutoverTime time.Time
alternateSrcSysSQL.QueryRow(t, "SELECT clock_timestamp()").Scan(&cutoverTime)

c.DestSysSQL.Exec(c.T, `ALTER TENANT $1 COMPLETE REPLICATION TO SYSTEM TIME $2::string`,
c.Args.DestTenantName, cutoverTime)
var cutoverStr string
c.DestSysSQL.QueryRow(c.T, `ALTER TENANT $1 COMPLETE REPLICATION TO SYSTEM TIME $2::string`,
c.Args.DestTenantName, cutoverTime).Scan(&cutoverStr)
cutoverOutput := replicationtestutils.DecimalTimeToHLC(t, cutoverStr)
require.Equal(c.T, cutoverTime, cutoverOutput.GoTime())
jobutils.WaitForJobToSucceed(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID))

// The destroyed address should have been removed from the topology
Expand Down Expand Up @@ -580,8 +583,11 @@ func TestTenantStreamingCutoverOnSourceFailure(t *testing.T) {
// Destroy the source cluster
c.SrcCleanup()

c.DestSysSQL.Exec(c.T, `ALTER TENANT $1 COMPLETE REPLICATION TO SYSTEM TIME $2::string`,
c.Args.DestTenantName, cutoverTime.AsOfSystemTime())
var cutoverStr string
c.DestSysSQL.QueryRow(c.T, `ALTER TENANT $1 COMPLETE REPLICATION TO SYSTEM TIME $2::string`,
c.Args.DestTenantName, cutoverTime.AsOfSystemTime()).Scan(&cutoverStr)
cutoverOutput := replicationtestutils.DecimalTimeToHLC(t, cutoverStr)
require.Equal(c.T, cutoverTime, cutoverOutput)

// Resume ingestion.
c.DestSysSQL.Exec(t, fmt.Sprintf("RESUME JOB %d", ingestionJobID))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,12 @@ INSERT INTO d.t2 VALUES (2);
`)

replicationtestutils.WaitUntilStartTimeReached(t, destSQL, jobspb.JobID(ingestionJobID))
var cutoverStr string
cutoverTime := timeutil.Now().Round(time.Microsecond)
destSQL.Exec(t, `ALTER TENANT "destination-tenant" COMPLETE REPLICATION TO SYSTEM TIME $1::string`, hlc.Timestamp{WallTime: cutoverTime.UnixNano()}.AsOfSystemTime())
destSQL.QueryRow(t, `ALTER TENANT "destination-tenant" COMPLETE REPLICATION TO SYSTEM TIME $1::string`,
hlc.Timestamp{WallTime: cutoverTime.UnixNano()}.AsOfSystemTime()).Scan(&cutoverStr)
cutoverOutput := replicationtestutils.DecimalTimeToHLC(t, cutoverStr)
require.Equal(t, cutoverTime, cutoverOutput.GoTime())
jobutils.WaitForJobToSucceed(t, destSQL, jobspb.JobID(ingestionJobID))
jobutils.WaitForJobToSucceed(t, sourceDBRunner, jobspb.JobID(streamProducerJobID))

Expand Down

0 comments on commit 01d22ea

Please sign in to comment.