Skip to content

Commit

Permalink
streamingccl: store replicated time in details
Browse files Browse the repository at this point in the history
The cutover process uses the progress field to record how many ranges
need to be reverted. This, however, wipes out the high watermark that
was previously stored in that progress field.

Here, we move the canonical copy of the high watermark to the progress
details.

For clarity, we rename "high watermark" as "replicated time".

Further, we delete a long skipped test that isn't providing much
value.

Epic: none

Release note: None
  • Loading branch information
stevendanna committed May 25, 2023
1 parent 1c7bedf commit 327947f
Show file tree
Hide file tree
Showing 25 changed files with 332 additions and 648 deletions.
66 changes: 33 additions & 33 deletions pkg/ccl/streamingccl/replicationtestutils/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,23 +150,18 @@ func FingerprintTenantAtTimestampNoHistory(
return db.QueryStr(t, fingerprintQuery, tenantID)[0][0]
}

// WaitUntilHighWatermark waits for the ingestion job high watermark to reach the given high watermark.
func (c *TenantStreamingClusters) WaitUntilHighWatermark(
highWatermark hlc.Timestamp, ingestionJobID jobspb.JobID,
// WaitUntilReplicatedTime waits for the ingestion job high watermark
// to reach the given target time.
func (c *TenantStreamingClusters) WaitUntilReplicatedTime(
targetTime hlc.Timestamp, ingestionJobID jobspb.JobID,
) {
testutils.SucceedsSoon(c.T, func() error {
progress := jobutils.GetJobProgress(c.T, c.DestSysSQL, ingestionJobID)
if progress.GetHighWater() == nil {
return errors.Newf("stream ingestion has not recorded any progress yet, waiting to advance pos %s",
highWatermark.String())
}
highwater := *progress.GetHighWater()
if highwater.Less(highWatermark) {
return errors.Newf("waiting for stream ingestion job progress %s to advance beyond %s",
highwater.String(), highWatermark.String())
}
return nil
})
WaitUntilReplicatedTime(c.T, targetTime, c.DestSysSQL, ingestionJobID)
}

// WaitUntilStartTimeReached waits for the ingestion replicated time
// to reach the recorded start time of the job.
func (c *TenantStreamingClusters) WaitUntilStartTimeReached(ingestionJobID jobspb.JobID) {
WaitUntilStartTimeReached(c.T, c.DestSysSQL, ingestionJobID)
}

// Cutover sets the cutover timestamp on the replication job causing the job to
Expand Down Expand Up @@ -333,12 +328,6 @@ func (c *TenantStreamingClusters) SrcExec(exec srcInitExecFunc) {
exec(c.T, c.SrcSysSQL, c.SrcTenantSQL)
}

// WaitUntilStartTimeReached waits for the ingestion job high watermark to reach
// the recorded start time of the job.
func (c *TenantStreamingClusters) WaitUntilStartTimeReached(ingestionJobID jobspb.JobID) {
WaitUntilStartTimeReached(c.T, c.DestSysSQL, ingestionJobID)
}

func WaitUntilStartTimeReached(t *testing.T, db *sqlutils.SQLRunner, ingestionJobID jobspb.JobID) {
timeout := 45 * time.Second
if skip.NightlyStress() {
Expand All @@ -358,20 +347,31 @@ func WaitUntilStartTimeReached(t *testing.T, db *sqlutils.SQLRunner, ingestionJo
return errors.New("ingestion start time not yet recorded")
}

progress := jobutils.GetJobProgress(t, db, ingestionJobID)
if progress.GetHighWater() == nil {
return errors.Newf("stream ingestion has not recorded any progress yet, waiting to advance pos %s",
startTime.String())
}
highwater := *progress.GetHighWater()
if highwater.Less(startTime) {
return errors.Newf("waiting for stream ingestion job progress %s to advance beyond %s",
highwater.String(), startTime.String())
}
return nil
return requireReplicatedTime(startTime, jobutils.GetJobProgress(t, db, ingestionJobID))
}, timeout)
}

func WaitUntilReplicatedTime(
t *testing.T, targetTime hlc.Timestamp, db *sqlutils.SQLRunner, ingestionJobID jobspb.JobID,
) {
testutils.SucceedsSoon(t, func() error {
return requireReplicatedTime(targetTime, jobutils.GetJobProgress(t, db, ingestionJobID))
})
}

func requireReplicatedTime(targetTime hlc.Timestamp, progress *jobspb.Progress) error {
replicatedTime := replicationutils.ReplicatedTimeFromProgress(progress)
if replicatedTime.IsEmpty() {
return errors.Newf("stream ingestion has not recorded any progress yet, waiting to advance pos %s",
targetTime)
}
if replicatedTime.Less(targetTime) {
return errors.Newf("waiting for stream ingestion job progress %s to advance beyond %s",
replicatedTime, targetTime)
}
return nil
}

func CreateScatteredTable(t *testing.T, c *TenantStreamingClusters, numNodes int) {
// Create a source table with multiple ranges spread across multiple nodes
numRanges := 50
Expand Down
12 changes: 9 additions & 3 deletions pkg/ccl/streamingccl/replicationutils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,11 @@ func GetStreamIngestionStatsNoHeartbeat(
IngestionDetails: &streamIngestionDetails,
IngestionProgress: jobProgress.GetStreamIngest(),
}
if highwater := jobProgress.GetHighWater(); highwater != nil && !highwater.IsEmpty() {

replicatedTime := ReplicatedTimeFromProgress(&jobProgress)
if !replicatedTime.IsEmpty() {
lagInfo := &streampb.StreamIngestionStats_ReplicationLagInfo{
MinIngestedTimestamp: *highwater,
MinIngestedTimestamp: replicatedTime,
}
lagInfo.EarliestCheckpointedTimestamp = hlc.MaxTimestamp
lagInfo.LatestCheckpointedTimestamp = hlc.MinTimestamp
Expand All @@ -154,12 +156,16 @@ func GetStreamIngestionStatsNoHeartbeat(
}
lagInfo.SlowestFastestIngestionLag = lagInfo.LatestCheckpointedTimestamp.GoTime().
Sub(lagInfo.EarliestCheckpointedTimestamp.GoTime())
lagInfo.ReplicationLag = timeutil.Since(highwater.GoTime())
lagInfo.ReplicationLag = timeutil.Since(replicatedTime.GoTime())
stats.ReplicationLagInfo = lagInfo
}
return stats, nil
}

func ReplicatedTimeFromProgress(p *jobspb.Progress) hlc.Timestamp {
return p.Details.(*jobspb.Progress_StreamIngest).StreamIngest.ReplicatedTime
}

func GetStreamIngestionStats(
ctx context.Context,
streamIngestionDetails jobspb.StreamIngestionDetails,
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/streamingccl/streamclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ type Client interface {
// open its subscription to its partition of a larger stream.
// TODO(dt): ts -> checkpointToken.
Subscribe(ctx context.Context, streamID streampb.StreamID, spec SubscriptionToken,
initialScanTime hlc.Timestamp, previousHighWater hlc.Timestamp) (Subscription, error)
initialScanTime hlc.Timestamp, previousReplicatedTime hlc.Timestamp) (Subscription, error)

// Close releases all the resources used by this client.
Close(ctx context.Context) error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func (p *partitionedStreamClient) Subscribe(
streamID streampb.StreamID,
spec SubscriptionToken,
initialScanTime hlc.Timestamp,
previousHighWater hlc.Timestamp,
previousReplicatedTime hlc.Timestamp,
) (Subscription, error) {
_, sp := tracing.ChildSpan(ctx, "streamclient.Client.Subscribe")
defer sp.Finish()
Expand All @@ -212,7 +212,7 @@ func (p *partitionedStreamClient) Subscribe(
return nil, err
}
sps.InitialScanTimestamp = initialScanTime
sps.PreviousHighWaterTimestamp = previousHighWater
sps.PreviousReplicatedTimestamp = previousReplicatedTime

specBytes, err := protoutil.Marshal(&sps)
if err != nil {
Expand Down
2 changes: 0 additions & 2 deletions pkg/ccl/streamingccl/streamingest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ go_test(
"rangekey_batcher_test.go",
"replication_random_client_test.go",
"replication_stream_e2e_test.go",
"stream_ingestion_frontier_processor_test.go",
"stream_ingestion_job_test.go",
"stream_ingestion_processor_test.go",
],
Expand Down Expand Up @@ -143,7 +142,6 @@ go_test(
"//pkg/testutils/sqlutils",
"//pkg/testutils/storageutils",
"//pkg/testutils/testcluster",
"//pkg/upgrade/upgradebase",
"//pkg/util/ctxgroup",
"//pkg/util/hlc",
"//pkg/util/leaktest",
Expand Down
7 changes: 4 additions & 3 deletions pkg/ccl/streamingccl/streamingest/alter_replication_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,13 +234,14 @@ func alterTenantJobCutover(
return hlc.Timestamp{}, errors.Newf("job with id %d is not a stream ingestion job", job.ID())
}
progress := job.Progress()

if alterTenantStmt.Cutover.Latest {
ts := progress.GetHighWater()
if ts == nil || ts.IsEmpty() {
replicatedTime := replicationutils.ReplicatedTimeFromProgress(&progress)
if replicatedTime.IsEmpty() {
return hlc.Timestamp{},
errors.Newf("replicated tenant %q has not yet recorded a safe replication time", tenantName)
}
cutoverTime = *ts
cutoverTime = replicatedTime
}

// TODO(ssd): We could use the replication manager here, but
Expand Down
32 changes: 16 additions & 16 deletions pkg/ccl/streamingccl/streamingest/alter_replication_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestAlterTenantCompleteToTime(t *testing.T) {
jobutils.WaitForJobToRun(t, c.SrcSysSQL, jobspb.JobID(producerJobID))
jobutils.WaitForJobToRun(t, c.DestSysSQL, jobspb.JobID(ingestionJobID))

c.WaitUntilHighWatermark(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID))
c.WaitUntilReplicatedTime(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID))

var cutoverTime time.Time
c.DestSysSQL.QueryRow(t, "SELECT clock_timestamp()").Scan(&cutoverTime)
Expand All @@ -68,14 +68,14 @@ func TestAlterTenantCompleteToLatest(t *testing.T) {
jobutils.WaitForJobToRun(t, c.SrcSysSQL, jobspb.JobID(producerJobID))
jobutils.WaitForJobToRun(t, c.DestSysSQL, jobspb.JobID(ingestionJobID))

highWater := c.SrcCluster.Server(0).Clock().Now()
c.WaitUntilHighWatermark(highWater, jobspb.JobID(ingestionJobID))
targetReplicatedTime := c.SrcCluster.Server(0).Clock().Now()
c.WaitUntilReplicatedTime(targetReplicatedTime, jobspb.JobID(ingestionJobID))

var cutoverStr string
c.DestSysSQL.QueryRow(c.T, `ALTER TENANT $1 COMPLETE REPLICATION TO LATEST`,
args.DestTenantName).Scan(&cutoverStr)
cutoverOutput := replicationtestutils.DecimalTimeToHLC(t, cutoverStr)
require.GreaterOrEqual(t, cutoverOutput.GoTime(), highWater.GoTime())
require.GreaterOrEqual(t, cutoverOutput.GoTime(), targetReplicatedTime.GoTime())
require.LessOrEqual(t, cutoverOutput.GoTime(), c.SrcCluster.Server(0).Clock().Now().GoTime())
jobutils.WaitForJobToSucceed(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID))
}
Expand All @@ -94,7 +94,7 @@ func TestAlterTenantPauseResume(t *testing.T) {
jobutils.WaitForJobToRun(t, c.SrcSysSQL, jobspb.JobID(producerJobID))
jobutils.WaitForJobToRun(t, c.DestSysSQL, jobspb.JobID(ingestionJobID))

c.WaitUntilHighWatermark(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID))
c.WaitUntilReplicatedTime(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID))

// Pause the replication job.
c.DestSysSQL.Exec(t, `ALTER TENANT $1 PAUSE REPLICATION`, args.DestTenantName)
Expand All @@ -104,7 +104,7 @@ func TestAlterTenantPauseResume(t *testing.T) {
c.DestSysSQL.Exec(t, `ALTER TENANT $1 RESUME REPLICATION`, args.DestTenantName)
jobutils.WaitForJobToRun(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID))

c.WaitUntilHighWatermark(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID))
c.WaitUntilReplicatedTime(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID))
var cutoverTime time.Time
c.DestSysSQL.QueryRow(t, "SELECT clock_timestamp()").Scan(&cutoverTime)

Expand Down Expand Up @@ -176,12 +176,12 @@ func TestAlterTenantUpdateExistingCutoverTime(t *testing.T) {
jobutils.WaitForJobToRun(t, c.SrcSysSQL, jobspb.JobID(producerJobID))
jobutils.WaitForJobToRun(t, c.DestSysSQL, jobspb.JobID(ingestionJobID))

highWater := c.SrcCluster.Server(0).Clock().Now()
c.WaitUntilHighWatermark(highWater, jobspb.JobID(ingestionJobID))
replicatedTimeTarget := c.SrcCluster.Server(0).Clock().Now()
c.WaitUntilReplicatedTime(replicatedTimeTarget, jobspb.JobID(ingestionJobID))

// First cutover to a future time.
var cutoverStr string
cutoverTime := highWater.Add(time.Hour.Nanoseconds(), 0)
cutoverTime := replicatedTimeTarget.Add(time.Hour.Nanoseconds(), 0)
c.DestSysSQL.QueryRow(c.T, `ALTER TENANT $1 COMPLETE REPLICATION TO SYSTEM TIME $2::string`,
args.DestTenantName, cutoverTime.AsOfSystemTime()).Scan(&cutoverStr)
cutoverOutput := replicationtestutils.DecimalTimeToHLC(t, cutoverStr)
Expand All @@ -190,7 +190,7 @@ func TestAlterTenantUpdateExistingCutoverTime(t *testing.T) {
require.Equal(t, cutoverOutput, getCutoverTime())

// And cutover to an even further time.
cutoverTime = highWater.Add((time.Hour * 2).Nanoseconds(), 0)
cutoverTime = replicatedTimeTarget.Add((time.Hour * 2).Nanoseconds(), 0)
c.DestSysSQL.QueryRow(c.T, `ALTER TENANT $1 COMPLETE REPLICATION TO SYSTEM TIME $2::string`,
args.DestTenantName, cutoverTime.AsOfSystemTime()).Scan(&cutoverStr)
cutoverOutput = replicationtestutils.DecimalTimeToHLC(t, cutoverStr)
Expand Down Expand Up @@ -242,7 +242,7 @@ func TestAlterTenantFailUpdatingCutoverTime(t *testing.T) {

jobutils.WaitForJobToRun(t, c.SrcSysSQL, jobspb.JobID(producerJobID))
jobutils.WaitForJobToRun(t, c.DestSysSQL, jobspb.JobID(ingestionJobID))
c.WaitUntilHighWatermark(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID))
c.WaitUntilReplicatedTime(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID))

require.Equal(c.T, "replicating", getTenantStatus())

Expand Down Expand Up @@ -337,7 +337,7 @@ func TestTenantStatusWithFutureCutoverTime(t *testing.T) {

jobutils.WaitForJobToRun(t, c.SrcSysSQL, jobspb.JobID(producerJobID))
jobutils.WaitForJobToRun(t, c.DestSysSQL, jobspb.JobID(ingestionJobID))
c.WaitUntilHighWatermark(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID))
c.WaitUntilReplicatedTime(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID))

require.Equal(c.T, "replicating", getTenantStatus())

Expand All @@ -351,7 +351,7 @@ func TestTenantStatusWithFutureCutoverTime(t *testing.T) {
c.DestSysSQL.Exec(t, `ALTER TENANT $1 RESUME REPLICATION`, args.DestTenantName)
unblockResumerStart()
jobutils.WaitForJobToRun(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID))
c.WaitUntilHighWatermark(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID))
c.WaitUntilReplicatedTime(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID))

require.Equal(c.T, "replicating", getTenantStatus())

Expand Down Expand Up @@ -422,7 +422,7 @@ func TestTenantStatusWithLatestCutoverTime(t *testing.T) {

jobutils.WaitForJobToRun(t, c.SrcSysSQL, jobspb.JobID(producerJobID))
jobutils.WaitForJobToRun(t, c.DestSysSQL, jobspb.JobID(ingestionJobID))
c.WaitUntilHighWatermark(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID))
c.WaitUntilReplicatedTime(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID))

require.Equal(c.T, "replicating", getTenantStatus())

Expand Down Expand Up @@ -459,7 +459,7 @@ func TestTenantReplicationStatus(t *testing.T) {

jobutils.WaitForJobToRun(t, c.SrcSysSQL, jobspb.JobID(producerJobID))
jobutils.WaitForJobToRun(t, c.DestSysSQL, jobspb.JobID(ingestionJobID))
c.WaitUntilHighWatermark(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID))
c.WaitUntilReplicatedTime(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID))

registry := c.DestSysServer.JobRegistry().(*jobs.Registry)

Expand Down Expand Up @@ -496,7 +496,7 @@ func TestAlterTenantHandleFutureProtectedTimestamp(t *testing.T) {

jobutils.WaitForJobToRun(t, c.SrcSysSQL, jobspb.JobID(producerJobID))
jobutils.WaitForJobToRun(t, c.DestSysSQL, jobspb.JobID(ingestionJobID))
c.WaitUntilHighWatermark(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID))
c.WaitUntilReplicatedTime(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID))

c.DestSysSQL.Exec(c.T, `ALTER TENANT $1 COMPLETE REPLICATION TO LATEST`, args.DestTenantName)
}
20 changes: 9 additions & 11 deletions pkg/ccl/streamingccl/streamingest/datadriven_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ import (
// tenant. This operation will fail the test if it is run prior to the
// replication stream activating the tenant.
//
// - wait-until-high-watermark ts=<ts>
// - wait-until-replicated-time ts=<ts>
// Wait until the replication job has reached the specified timestamp.
//
// - cutover ts=<ts>
Expand Down Expand Up @@ -119,22 +119,20 @@ func TestDataDriven(t *testing.T) {
case "start-replication-stream":
ds.producerJobID, ds.replicationJobID = ds.replicationClusters.StartStreamReplication(ctx)

case "wait-until-high-watermark":
var highWaterMark string
d.ScanArgs(t, "ts", &highWaterMark)
varValue, ok := ds.vars[highWaterMark]
case "wait-until-replicated-time":
var replicatedTimeTarget string
d.ScanArgs(t, "ts", &replicatedTimeTarget)
varValue, ok := ds.vars[replicatedTimeTarget]
if ok {
highWaterMark = varValue
replicatedTimeTarget = varValue
}
timestamp, _, err := tree.ParseDTimestamp(nil, highWaterMark, time.Microsecond)
timestamp, _, err := tree.ParseDTimestamp(nil, replicatedTimeTarget, time.Microsecond)
require.NoError(t, err)
hw := hlc.Timestamp{WallTime: timestamp.UnixNano()}
ds.replicationClusters.WaitUntilHighWatermark(hw, jobspb.JobID(ds.replicationJobID))

ds.replicationClusters.WaitUntilReplicatedTime(hlc.Timestamp{WallTime: timestamp.UnixNano()},
jobspb.JobID(ds.replicationJobID))
case "start-replicated-tenant":
cleanupTenant := ds.replicationClusters.CreateDestTenantSQL(ctx)
ds.cleanupFns = append(ds.cleanupFns, cleanupTenant)

case "let":
if len(d.CmdArgs) == 0 {
t.Fatalf("Must specify at least one variable name.")
Expand Down
Loading

0 comments on commit 327947f

Please sign in to comment.