streamingccl: store replicated time in details
The cutover process uses the progress field to record how many ranges
need to be reverted. This, however, wipes out the high watermark that
was previously stored in that progress field.

Here, we move the canonical copy of the high watermark to the progress
details.

For clarity, we rename "high watermark" to "replicated time".

Further, we delete a long-skipped test that isn't providing much
value.

Epic: none

Release note: None
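
For orientation before the diff: a minimal, hedged Go sketch of the pattern this commit adopts, reading the replicated time from the stream ingestion progress details rather than from the job-level high-water field (which cutover now reuses to track the ranges left to revert). The helper name, its placement, and the ok-check are illustrative and not part of the commit; only the field access mirrors the real call sites below.

package replicationtestutils // placement chosen for illustration only

import (
	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
	"github.com/cockroachdb/errors"
)

// replicatedTimeFromProgress is a hypothetical helper showing where the
// replicated time lives after this change: in the StreamIngest progress
// details. An empty timestamp means the job has not recorded progress yet.
func replicatedTimeFromProgress(progress *jobspb.Progress) (hlc.Timestamp, error) {
	details, ok := progress.GetDetails().(*jobspb.Progress_StreamIngest)
	if !ok {
		return hlc.Timestamp{}, errors.New("not a stream ingestion job")
	}
	return details.StreamIngest.ReplicatedTime, nil
}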
stevendanna committed May 24, 2023
1 parent 775b005 commit 0b2499a
Showing 23 changed files with 313 additions and 627 deletions.
66 changes: 33 additions & 33 deletions pkg/ccl/streamingccl/replicationtestutils/testutils.go
@@ -150,23 +150,18 @@ func FingerprintTenantAtTimestampNoHistory(
return db.QueryStr(t, fingerprintQuery, tenantID)[0][0]
}

// WaitUntilHighWatermark waits for the ingestion job high watermark to reach the given high watermark.
func (c *TenantStreamingClusters) WaitUntilHighWatermark(
highWatermark hlc.Timestamp, ingestionJobID jobspb.JobID,
// WaitUntilReplicatedTime waits for the ingestion job high watermark
// to reach the given target time.
func (c *TenantStreamingClusters) WaitUntilReplicatedTime(
targetTime hlc.Timestamp, ingestionJobID jobspb.JobID,
) {
testutils.SucceedsSoon(c.T, func() error {
progress := jobutils.GetJobProgress(c.T, c.DestSysSQL, ingestionJobID)
if progress.GetHighWater() == nil {
return errors.Newf("stream ingestion has not recorded any progress yet, waiting to advance pos %s",
highWatermark.String())
}
highwater := *progress.GetHighWater()
if highwater.Less(highWatermark) {
return errors.Newf("waiting for stream ingestion job progress %s to advance beyond %s",
highwater.String(), highWatermark.String())
}
return nil
})
WaitUntilReplicatedTime(c.T, targetTime, c.DestSysSQL, ingestionJobID)
}

// WaitUntilStartTimeReached waits for the ingestion replicated time
// to reach the recorded start time of the job.
func (c *TenantStreamingClusters) WaitUntilStartTimeReached(ingestionJobID jobspb.JobID) {
WaitUntilStartTimeReached(c.T, c.DestSysSQL, ingestionJobID)
}

// Cutover sets the cutover timestamp on the replication job causing the job to
@@ -333,12 +328,6 @@ func (c *TenantStreamingClusters) SrcExec(exec srcInitExecFunc) {
exec(c.T, c.SrcSysSQL, c.SrcTenantSQL)
}

// WaitUntilStartTimeReached waits for the ingestion job high watermark to reach
// the recorded start time of the job.
func (c *TenantStreamingClusters) WaitUntilStartTimeReached(ingestionJobID jobspb.JobID) {
WaitUntilStartTimeReached(c.T, c.DestSysSQL, ingestionJobID)
}

func WaitUntilStartTimeReached(t *testing.T, db *sqlutils.SQLRunner, ingestionJobID jobspb.JobID) {
timeout := 45 * time.Second
if skip.NightlyStress() {
@@ -358,20 +347,31 @@ func WaitUntilStartTimeReached(t *testing.T, db *sqlutils.SQLRunner, ingestionJo
return errors.New("ingestion start time not yet recorded")
}

progress := jobutils.GetJobProgress(t, db, ingestionJobID)
if progress.GetHighWater() == nil {
return errors.Newf("stream ingestion has not recorded any progress yet, waiting to advance pos %s",
startTime.String())
}
highwater := *progress.GetHighWater()
if highwater.Less(startTime) {
return errors.Newf("waiting for stream ingestion job progress %s to advance beyond %s",
highwater.String(), startTime.String())
}
return nil
return requireReplicatedTime(startTime, jobutils.GetJobProgress(t, db, ingestionJobID))
}, timeout)
}

func WaitUntilReplicatedTime(
t *testing.T, targetTime hlc.Timestamp, db *sqlutils.SQLRunner, ingestionJobID jobspb.JobID,
) {
testutils.SucceedsSoon(t, func() error {
return requireReplicatedTime(targetTime, jobutils.GetJobProgress(t, db, ingestionJobID))
})
}

func requireReplicatedTime(targetTime hlc.Timestamp, progress *jobspb.Progress) error {
replicatedTime := progress.GetDetails().(*jobspb.Progress_StreamIngest).StreamIngest.ReplicatedTime
if replicatedTime.IsEmpty() {
return errors.Newf("stream ingestion has not recorded any progress yet, waiting to advance pos %s",
targetTime)
}
if replicatedTime.Less(targetTime) {
return errors.Newf("waiting for stream ingestion job progress %s to advance beyond %s",
replicatedTime, targetTime)
}
return nil
}

func CreateScatteredTable(t *testing.T, c *TenantStreamingClusters, numNodes int) {
// Create a source table with multiple ranges spread across multiple nodes
numRanges := 50
8 changes: 5 additions & 3 deletions pkg/ccl/streamingccl/replicationutils/utils.go
@@ -136,9 +136,11 @@ func GetStreamIngestionStatsNoHeartbeat(
IngestionDetails: &streamIngestionDetails,
IngestionProgress: jobProgress.GetStreamIngest(),
}
if highwater := jobProgress.GetHighWater(); highwater != nil && !highwater.IsEmpty() {
replicatedTime := jobProgress.Details.(*jobspb.Progress_StreamIngest).StreamIngest.ReplicatedTime

if !replicatedTime.IsEmpty() {
lagInfo := &streampb.StreamIngestionStats_ReplicationLagInfo{
MinIngestedTimestamp: *highwater,
MinIngestedTimestamp: replicatedTime,
}
lagInfo.EarliestCheckpointedTimestamp = hlc.MaxTimestamp
lagInfo.LatestCheckpointedTimestamp = hlc.MinTimestamp
@@ -154,7 +156,7 @@ }
}
lagInfo.SlowestFastestIngestionLag = lagInfo.LatestCheckpointedTimestamp.GoTime().
Sub(lagInfo.EarliestCheckpointedTimestamp.GoTime())
lagInfo.ReplicationLag = timeutil.Since(highwater.GoTime())
lagInfo.ReplicationLag = timeutil.Since(replicatedTime.GoTime())
stats.ReplicationLagInfo = lagInfo
}
return stats, nil
2 changes: 1 addition & 1 deletion pkg/ccl/streamingccl/streamclient/client.go
@@ -77,7 +77,7 @@ type Client interface {
// open its subscription to its partition of a larger stream.
// TODO(dt): ts -> checkpointToken.
Subscribe(ctx context.Context, streamID streampb.StreamID, spec SubscriptionToken,
initialScanTime hlc.Timestamp, previousHighWater hlc.Timestamp) (Subscription, error)
initialScanTime hlc.Timestamp, previousReplicatedTime hlc.Timestamp) (Subscription, error)

// Close releases all the resources used by this client.
Close(ctx context.Context) error
pkg/ccl/streamingccl/streamclient/partitioned_stream_client.go
@@ -202,7 +202,7 @@ func (p *partitionedStreamClient) Subscribe(
streamID streampb.StreamID,
spec SubscriptionToken,
initialScanTime hlc.Timestamp,
previousHighWater hlc.Timestamp,
previousReplicatedTime hlc.Timestamp,
) (Subscription, error) {
_, sp := tracing.ChildSpan(ctx, "streamclient.Client.Subscribe")
defer sp.Finish()
Expand All @@ -212,7 +212,7 @@ func (p *partitionedStreamClient) Subscribe(
return nil, err
}
sps.InitialScanTimestamp = initialScanTime
sps.PreviousHighWaterTimestamp = previousHighWater
sps.PreviousReplicatedTimestamp = previousReplicatedTime

specBytes, err := protoutil.Marshal(&sps)
if err != nil {
1 change: 0 additions & 1 deletion pkg/ccl/streamingccl/streamingest/BUILD.bazel
@@ -89,7 +89,6 @@ go_test(
"rangekey_batcher_test.go",
"replication_random_client_test.go",
"replication_stream_e2e_test.go",
"stream_ingestion_frontier_processor_test.go",
"stream_ingestion_job_test.go",
"stream_ingestion_processor_test.go",
],
7 changes: 4 additions & 3 deletions pkg/ccl/streamingccl/streamingest/alter_replication_job.go
@@ -234,13 +234,14 @@ func alterTenantJobCutover(
return hlc.Timestamp{}, errors.Newf("job with id %d is not a stream ingestion job", job.ID())
}
progress := job.Progress()

if alterTenantStmt.Cutover.Latest {
ts := progress.GetHighWater()
if ts == nil || ts.IsEmpty() {
replicatedTime := progress.Details.(*jobspb.Progress_StreamIngest).StreamIngest.ReplicatedTime
if replicatedTime.IsEmpty() {
return hlc.Timestamp{},
errors.Newf("replicated tenant %q has not yet recorded a safe replication time", tenantName)
}
cutoverTime = *ts
cutoverTime = replicatedTime
}

// TODO(ssd): We could use the replication manager here, but
32 changes: 16 additions & 16 deletions pkg/ccl/streamingccl/streamingest/alter_replication_job_test.go
@@ -41,7 +41,7 @@ func TestAlterTenantCompleteToTime(t *testing.T) {
jobutils.WaitForJobToRun(t, c.SrcSysSQL, jobspb.JobID(producerJobID))
jobutils.WaitForJobToRun(t, c.DestSysSQL, jobspb.JobID(ingestionJobID))

c.WaitUntilHighWatermark(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID))
c.WaitUntilReplicatedTime(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID))

var cutoverTime time.Time
c.DestSysSQL.QueryRow(t, "SELECT clock_timestamp()").Scan(&cutoverTime)
@@ -68,14 +68,14 @@ func TestAlterTenantCompleteToLatest(t *testing.T) {
jobutils.WaitForJobToRun(t, c.SrcSysSQL, jobspb.JobID(producerJobID))
jobutils.WaitForJobToRun(t, c.DestSysSQL, jobspb.JobID(ingestionJobID))

highWater := c.SrcCluster.Server(0).Clock().Now()
c.WaitUntilHighWatermark(highWater, jobspb.JobID(ingestionJobID))
targetReplicatedTime := c.SrcCluster.Server(0).Clock().Now()
c.WaitUntilReplicatedTime(targetReplicatedTime, jobspb.JobID(ingestionJobID))

var cutoverStr string
c.DestSysSQL.QueryRow(c.T, `ALTER TENANT $1 COMPLETE REPLICATION TO LATEST`,
args.DestTenantName).Scan(&cutoverStr)
cutoverOutput := replicationtestutils.DecimalTimeToHLC(t, cutoverStr)
require.GreaterOrEqual(t, cutoverOutput.GoTime(), highWater.GoTime())
require.GreaterOrEqual(t, cutoverOutput.GoTime(), targetReplicatedTime.GoTime())
require.LessOrEqual(t, cutoverOutput.GoTime(), c.SrcCluster.Server(0).Clock().Now().GoTime())
jobutils.WaitForJobToSucceed(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID))
}
@@ -94,7 +94,7 @@ func TestAlterTenantPauseResume(t *testing.T) {
jobutils.WaitForJobToRun(t, c.SrcSysSQL, jobspb.JobID(producerJobID))
jobutils.WaitForJobToRun(t, c.DestSysSQL, jobspb.JobID(ingestionJobID))

c.WaitUntilHighWatermark(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID))
c.WaitUntilReplicatedTime(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID))

// Pause the replication job.
c.DestSysSQL.Exec(t, `ALTER TENANT $1 PAUSE REPLICATION`, args.DestTenantName)
@@ -104,7 +104,7 @@ c.DestSysSQL.Exec(t, `ALTER TENANT $1 PAUSE REPLICATION`, args.DestTenantName)
c.DestSysSQL.Exec(t, `ALTER TENANT $1 RESUME REPLICATION`, args.DestTenantName)
jobutils.WaitForJobToRun(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID))

c.WaitUntilHighWatermark(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID))
c.WaitUntilReplicatedTime(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID))
var cutoverTime time.Time
c.DestSysSQL.QueryRow(t, "SELECT clock_timestamp()").Scan(&cutoverTime)

@@ -176,12 +176,12 @@ func TestAlterTenantUpdateExistingCutoverTime(t *testing.T) {
jobutils.WaitForJobToRun(t, c.SrcSysSQL, jobspb.JobID(producerJobID))
jobutils.WaitForJobToRun(t, c.DestSysSQL, jobspb.JobID(ingestionJobID))

highWater := c.SrcCluster.Server(0).Clock().Now()
c.WaitUntilHighWatermark(highWater, jobspb.JobID(ingestionJobID))
replicatedTimeTarget := c.SrcCluster.Server(0).Clock().Now()
c.WaitUntilReplicatedTime(replicatedTimeTarget, jobspb.JobID(ingestionJobID))

// First cutover to a future time.
var cutoverStr string
cutoverTime := highWater.Add(time.Hour.Nanoseconds(), 0)
cutoverTime := replicatedTimeTarget.Add(time.Hour.Nanoseconds(), 0)
c.DestSysSQL.QueryRow(c.T, `ALTER TENANT $1 COMPLETE REPLICATION TO SYSTEM TIME $2::string`,
args.DestTenantName, cutoverTime.AsOfSystemTime()).Scan(&cutoverStr)
cutoverOutput := replicationtestutils.DecimalTimeToHLC(t, cutoverStr)
@@ -190,7 +190,7 @@ func TestAlterTenantUpdateExistingCutoverTime(t *testing.T) {
require.Equal(t, cutoverOutput, getCutoverTime())

// And cutover to an even further time.
cutoverTime = highWater.Add((time.Hour * 2).Nanoseconds(), 0)
cutoverTime = replicatedTimeTarget.Add((time.Hour * 2).Nanoseconds(), 0)
c.DestSysSQL.QueryRow(c.T, `ALTER TENANT $1 COMPLETE REPLICATION TO SYSTEM TIME $2::string`,
args.DestTenantName, cutoverTime.AsOfSystemTime()).Scan(&cutoverStr)
cutoverOutput = replicationtestutils.DecimalTimeToHLC(t, cutoverStr)
@@ -242,7 +242,7 @@ func TestAlterTenantFailUpdatingCutoverTime(t *testing.T) {

jobutils.WaitForJobToRun(t, c.SrcSysSQL, jobspb.JobID(producerJobID))
jobutils.WaitForJobToRun(t, c.DestSysSQL, jobspb.JobID(ingestionJobID))
c.WaitUntilHighWatermark(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID))
c.WaitUntilReplicatedTime(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID))

require.Equal(c.T, "replicating", getTenantStatus())

@@ -337,7 +337,7 @@ func TestTenantStatusWithFutureCutoverTime(t *testing.T) {

jobutils.WaitForJobToRun(t, c.SrcSysSQL, jobspb.JobID(producerJobID))
jobutils.WaitForJobToRun(t, c.DestSysSQL, jobspb.JobID(ingestionJobID))
c.WaitUntilHighWatermark(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID))
c.WaitUntilReplicatedTime(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID))

require.Equal(c.T, "replicating", getTenantStatus())

@@ -351,7 +351,7 @@ c.DestSysSQL.Exec(t, `ALTER TENANT $1 RESUME REPLICATION`, args.DestTenantName)
c.DestSysSQL.Exec(t, `ALTER TENANT $1 RESUME REPLICATION`, args.DestTenantName)
unblockResumerStart()
jobutils.WaitForJobToRun(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID))
c.WaitUntilHighWatermark(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID))
c.WaitUntilReplicatedTime(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID))

require.Equal(c.T, "replicating", getTenantStatus())

@@ -422,7 +422,7 @@ func TestTenantStatusWithLatestCutoverTime(t *testing.T) {

jobutils.WaitForJobToRun(t, c.SrcSysSQL, jobspb.JobID(producerJobID))
jobutils.WaitForJobToRun(t, c.DestSysSQL, jobspb.JobID(ingestionJobID))
c.WaitUntilHighWatermark(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID))
c.WaitUntilReplicatedTime(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID))

require.Equal(c.T, "replicating", getTenantStatus())

@@ -459,7 +459,7 @@ func TestTenantReplicationStatus(t *testing.T) {

jobutils.WaitForJobToRun(t, c.SrcSysSQL, jobspb.JobID(producerJobID))
jobutils.WaitForJobToRun(t, c.DestSysSQL, jobspb.JobID(ingestionJobID))
c.WaitUntilHighWatermark(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID))
c.WaitUntilReplicatedTime(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID))

registry := c.DestSysServer.JobRegistry().(*jobs.Registry)

@@ -496,7 +496,7 @@ func TestAlterTenantHandleFutureProtectedTimestamp(t *testing.T) {

jobutils.WaitForJobToRun(t, c.SrcSysSQL, jobspb.JobID(producerJobID))
jobutils.WaitForJobToRun(t, c.DestSysSQL, jobspb.JobID(ingestionJobID))
c.WaitUntilHighWatermark(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID))
c.WaitUntilReplicatedTime(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID))

c.DestSysSQL.Exec(c.T, `ALTER TENANT $1 COMPLETE REPLICATION TO LATEST`, args.DestTenantName)
}
20 changes: 9 additions & 11 deletions pkg/ccl/streamingccl/streamingest/datadriven_test.go
@@ -44,7 +44,7 @@ import (
// tenant. This operation will fail the test if it is run prior to the
// replication stream activating the tenant.
//
// - wait-until-high-watermark ts=<ts>
// - wait-until-replicated-time ts=<ts>
// Wait until the replication job has reached the specified timestamp.
//
// - cutover ts=<ts>
@@ -119,22 +119,20 @@ func TestDataDriven(t *testing.T) {
case "start-replication-stream":
ds.producerJobID, ds.replicationJobID = ds.replicationClusters.StartStreamReplication(ctx)

case "wait-until-high-watermark":
var highWaterMark string
d.ScanArgs(t, "ts", &highWaterMark)
varValue, ok := ds.vars[highWaterMark]
case "wait-until-replicated-time":
var replicatedTimeTarget string
d.ScanArgs(t, "ts", &replicatedTimeTarget)
varValue, ok := ds.vars[replicatedTimeTarget]
if ok {
highWaterMark = varValue
replicatedTimeTarget = varValue
}
timestamp, _, err := tree.ParseDTimestamp(nil, highWaterMark, time.Microsecond)
timestamp, _, err := tree.ParseDTimestamp(nil, replicatedTimeTarget, time.Microsecond)
require.NoError(t, err)
hw := hlc.Timestamp{WallTime: timestamp.UnixNano()}
ds.replicationClusters.WaitUntilHighWatermark(hw, jobspb.JobID(ds.replicationJobID))

ds.replicationClusters.WaitUntilReplicatedTime(hlc.Timestamp{WallTime: timestamp.UnixNano()},
jobspb.JobID(ds.replicationJobID))
case "start-replicated-tenant":
cleanupTenant := ds.replicationClusters.CreateDestTenantSQL(ctx)
ds.cleanupFns = append(ds.cleanupFns, cleanupTenant)

case "let":
if len(d.CmdArgs) == 0 {
t.Fatalf("Must specify at least one variable name.")
25 changes: 13 additions & 12 deletions pkg/ccl/streamingccl/streamingest/replication_random_client_test.go
@@ -48,18 +48,19 @@ import (
"github.com/stretchr/testify/require"
)

func getHighWaterMark(ingestionJobID int, sqlDB *gosql.DB) (*hlc.Timestamp, error) {
func getReplicatedTime(ingestionJobID int, sqlDB *gosql.DB) (hlc.Timestamp, error) {
var progressBytes []byte
if err := sqlDB.QueryRow(
`SELECT progress FROM crdb_internal.system_jobs WHERE id = $1`, ingestionJobID,
).Scan(&progressBytes); err != nil {
return nil, err
return hlc.Timestamp{}, err
}
var payload jobspb.Progress
if err := protoutil.Unmarshal(progressBytes, &payload); err != nil {
return nil, err
var progress jobspb.Progress
if err := protoutil.Unmarshal(progressBytes, &progress); err != nil {
return hlc.Timestamp{}, err
}
return payload.GetHighWater(), nil
rt := progress.GetDetails().(*jobspb.Progress_StreamIngest).StreamIngest.ReplicatedTime
return rt, nil
}

func getTestRandomClientURI(tenantID roachpb.TenantID, tenantName roachpb.TenantName) string {
@@ -274,22 +275,22 @@ func TestStreamIngestionJobWithRandomClient(t *testing.T) {
close(canBeCompletedCh)

// Ensure that the job has made some progress.
var highwater hlc.Timestamp
var replicatedTime hlc.Timestamp
testutils.SucceedsSoon(t, func() error {
hw, err := getHighWaterMark(ingestionJobID, conn)
var err error
replicatedTime, err = getReplicatedTime(ingestionJobID, conn)
require.NoError(t, err)
if hw == nil {
return errors.New("highwatermark is unset, no progress has been reported")
if replicatedTime.IsEmpty() {
return errors.New("ReplicatedTime is unset, no progress has been reported")
}
highwater = *hw
return nil
})

// Cutting over the job should shutdown the ingestion processors via a context
// cancellation, and subsequently rollback data above our frontier timestamp.
//
// Pick a cutover time just before the latest resolved timestamp.
cutoverTime := timeutil.Unix(0, highwater.WallTime).UTC().Add(-1 * time.Microsecond).Round(time.Microsecond)
cutoverTime := timeutil.Unix(0, replicatedTime.WallTime).UTC().Add(-1 * time.Microsecond).Round(time.Microsecond)
_, err = conn.Exec(`ALTER TENANT "30" COMPLETE REPLICATION TO SYSTEM TIME $1::string`, cutoverTime)
require.NoError(t, err)
