From 522536404b0096a65961dc301e1a94550b159f01 Mon Sep 17 00:00:00 2001 From: Aaron Zinger Date: Tue, 23 May 2023 14:56:57 -0400 Subject: [PATCH 1/4] changefeedccl: assert that partitioning is consistent across versions The following commit will freeze the partitioning logic in its current state. This commit ensures that that change, as well as future library updates, don't accidentally change the hashing logic. For example, Sarama intends to change how it handles negative values in a future update. Release note: None --- pkg/ccl/changefeedccl/sink_test.go | 32 ++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/pkg/ccl/changefeedccl/sink_test.go b/pkg/ccl/changefeedccl/sink_test.go index f26801522bb2..6f9d46d14a63 100644 --- a/pkg/ccl/changefeedccl/sink_test.go +++ b/pkg/ccl/changefeedccl/sink_test.go @@ -871,3 +871,35 @@ func TestSinkConfigParsing(t *testing.T) { require.ErrorContains(t, err, "invalid character 's' looking for beginning of value") }) } + +func TestChangefeedConsistentPartitioning(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + // We test that these arbitrary strings get mapped to these + // arbitrary partitions to ensure that if an upgrade occurs + // while a changefeed is running, partitioning remains the same + // and therefore ordering guarantees are preserved. Changing + // these values is a breaking change. + referencePartitions := map[string]int32{ + "0": 1003, + "01": 351, + "10": 940, + "a": 292, + "\x00": 732, + "\xff \xff": 164, + } + longString1 := strings.Repeat("a", 2048) + referencePartitions[longString1] = 755 + longString2 := strings.Repeat("a", 2047) + "A" + referencePartitions[longString2] = 592 + + partitioner := newChangefeedPartitioner("topic1") + + for key, expected := range referencePartitions { + actual, err := partitioner.Partition(&sarama.ProducerMessage{Key: sarama.ByteEncoder(key)}, 1031) + require.NoError(t, err) + require.Equal(t, expected, actual) + } + +} From 361c142f6a1dacdf91a8efeac181bd6934593efc Mon Sep 17 00:00:00 2001 From: Aaron Zinger Date: Tue, 23 May 2023 16:24:51 -0400 Subject: [PATCH 2/4] changefeedccl: explicitly set hashing algorithm in Sarama This is a no-op change meant to defend against future changes in the default Sarama hashing algorithm. Fixes #101779. Release note: None --- pkg/ccl/changefeedccl/sink_kafka.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/pkg/ccl/changefeedccl/sink_kafka.go b/pkg/ccl/changefeedccl/sink_kafka.go index 5c9c28f355e0..7834a9b8b11f 100644 --- a/pkg/ccl/changefeedccl/sink_kafka.go +++ b/pkg/ccl/changefeedccl/sink_kafka.go @@ -15,6 +15,7 @@ import ( "crypto/x509" "encoding/json" "fmt" + "hash/fnv" "math" "net/url" "strings" @@ -711,9 +712,7 @@ var _ sarama.Partitioner = &changefeedPartitioner{} var _ sarama.PartitionerConstructor = newChangefeedPartitioner func newChangefeedPartitioner(topic string) sarama.Partitioner { - return &changefeedPartitioner{ - hash: sarama.NewHashPartitioner(topic), - } + return sarama.NewCustomHashPartitioner(fnv.New32a)(topic) } func (p *changefeedPartitioner) RequiresConsistency() bool { return true } From 327947fb87028f068093726a606b04057d33146f Mon Sep 17 00:00:00 2001 From: Steven Danna Date: Wed, 24 May 2023 12:34:58 +0100 Subject: [PATCH 3/4] streamingccl: store replicated time in details The cutover process uses the progress field to record how many ranges need to be reverted. This, however, wipes out the high watermark that was previously stored in that progress field. 
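Concretely, Progress.Progress is a oneof that holds either a completion fraction or a high-water timestamp, so writing one discards the other. The sketch below is only illustrative and is not part of this patch; the helper and parameter names (illustrateProgressOneof, fractionReverted) are made up, while the field accesses mirror the generated jobspb types used elsewhere in this series:

    func illustrateProgressOneof(
        progress *jobspb.Progress, fractionReverted float32, replicatedTime hlc.Timestamp,
    ) {
        // The cutover/revert path reports how much has been reverted as a
        // fraction, which occupies the Progress oneof...
        progress.Progress = &jobspb.Progress_FractionCompleted{FractionCompleted: fractionReverted}
        // ...and thereby drops any *jobspb.Progress_HighWater value that was
        // stored there before.
        //
        // With this change the value also lives in the job-specific details,
        // which the revert path never touches:
        progress.Details.(*jobspb.Progress_StreamIngest).StreamIngest.ReplicatedTime = replicatedTime
    }
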
Here, we move the canonical copy of the high watermark to the progress details. For clarity, we rename "high watermark" as "replicated time". Further, we delete a long skipped test that isn't providing much value. Epic: none Release note: None --- .../replicationtestutils/testutils.go | 66 +-- .../streamingccl/replicationutils/utils.go | 12 +- pkg/ccl/streamingccl/streamclient/client.go | 2 +- .../streamclient/partitioned_stream_client.go | 4 +- pkg/ccl/streamingccl/streamingest/BUILD.bazel | 2 - .../streamingest/alter_replication_job.go | 7 +- .../alter_replication_job_test.go | 32 +- .../streamingest/datadriven_test.go | 20 +- .../replication_random_client_test.go | 25 +- .../replication_stream_e2e_test.go | 40 +- .../streamingest/stream_ingestion_dist.go | 59 ++- .../stream_ingestion_frontier_processor.go | 82 ++-- ...tream_ingestion_frontier_processor_test.go | 388 ------------------ .../streamingest/stream_ingestion_job.go | 31 +- .../streamingest/stream_ingestion_job_test.go | 58 ++- .../stream_ingestion_processor.go | 66 ++- .../streamingest/testdata/cutover_after_pause | 2 +- .../streamingccl/streamingest/testdata/simple | 2 +- .../streamproducer/event_stream.go | 6 +- .../streamproducer/replication_stream_test.go | 14 +- pkg/cmd/roachtest/tests/BUILD.bazel | 1 + pkg/cmd/roachtest/tests/cluster_to_cluster.go | 40 +- pkg/jobs/jobspb/jobs.proto | 5 + pkg/repstream/streampb/stream.proto | 6 +- pkg/sql/execinfrapb/processors_bulk_io.proto | 10 +- 25 files changed, 332 insertions(+), 648 deletions(-) delete mode 100644 pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor_test.go diff --git a/pkg/ccl/streamingccl/replicationtestutils/testutils.go b/pkg/ccl/streamingccl/replicationtestutils/testutils.go index 3b95c4090e84..64a6fa1018d0 100644 --- a/pkg/ccl/streamingccl/replicationtestutils/testutils.go +++ b/pkg/ccl/streamingccl/replicationtestutils/testutils.go @@ -150,23 +150,18 @@ func FingerprintTenantAtTimestampNoHistory( return db.QueryStr(t, fingerprintQuery, tenantID)[0][0] } -// WaitUntilHighWatermark waits for the ingestion job high watermark to reach the given high watermark. -func (c *TenantStreamingClusters) WaitUntilHighWatermark( - highWatermark hlc.Timestamp, ingestionJobID jobspb.JobID, +// WaitUntilReplicatedTime waits for the ingestion job high watermark +// to reach the given target time. +func (c *TenantStreamingClusters) WaitUntilReplicatedTime( + targetTime hlc.Timestamp, ingestionJobID jobspb.JobID, ) { - testutils.SucceedsSoon(c.T, func() error { - progress := jobutils.GetJobProgress(c.T, c.DestSysSQL, ingestionJobID) - if progress.GetHighWater() == nil { - return errors.Newf("stream ingestion has not recorded any progress yet, waiting to advance pos %s", - highWatermark.String()) - } - highwater := *progress.GetHighWater() - if highwater.Less(highWatermark) { - return errors.Newf("waiting for stream ingestion job progress %s to advance beyond %s", - highwater.String(), highWatermark.String()) - } - return nil - }) + WaitUntilReplicatedTime(c.T, targetTime, c.DestSysSQL, ingestionJobID) +} + +// WaitUntilStartTimeReached waits for the ingestion replicated time +// to reach the recorded start time of the job. 
+func (c *TenantStreamingClusters) WaitUntilStartTimeReached(ingestionJobID jobspb.JobID) { + WaitUntilStartTimeReached(c.T, c.DestSysSQL, ingestionJobID) } // Cutover sets the cutover timestamp on the replication job causing the job to @@ -333,12 +328,6 @@ func (c *TenantStreamingClusters) SrcExec(exec srcInitExecFunc) { exec(c.T, c.SrcSysSQL, c.SrcTenantSQL) } -// WaitUntilStartTimeReached waits for the ingestion job high watermark to reach -// the recorded start time of the job. -func (c *TenantStreamingClusters) WaitUntilStartTimeReached(ingestionJobID jobspb.JobID) { - WaitUntilStartTimeReached(c.T, c.DestSysSQL, ingestionJobID) -} - func WaitUntilStartTimeReached(t *testing.T, db *sqlutils.SQLRunner, ingestionJobID jobspb.JobID) { timeout := 45 * time.Second if skip.NightlyStress() { @@ -358,20 +347,31 @@ func WaitUntilStartTimeReached(t *testing.T, db *sqlutils.SQLRunner, ingestionJo return errors.New("ingestion start time not yet recorded") } - progress := jobutils.GetJobProgress(t, db, ingestionJobID) - if progress.GetHighWater() == nil { - return errors.Newf("stream ingestion has not recorded any progress yet, waiting to advance pos %s", - startTime.String()) - } - highwater := *progress.GetHighWater() - if highwater.Less(startTime) { - return errors.Newf("waiting for stream ingestion job progress %s to advance beyond %s", - highwater.String(), startTime.String()) - } - return nil + return requireReplicatedTime(startTime, jobutils.GetJobProgress(t, db, ingestionJobID)) }, timeout) } +func WaitUntilReplicatedTime( + t *testing.T, targetTime hlc.Timestamp, db *sqlutils.SQLRunner, ingestionJobID jobspb.JobID, +) { + testutils.SucceedsSoon(t, func() error { + return requireReplicatedTime(targetTime, jobutils.GetJobProgress(t, db, ingestionJobID)) + }) +} + +func requireReplicatedTime(targetTime hlc.Timestamp, progress *jobspb.Progress) error { + replicatedTime := replicationutils.ReplicatedTimeFromProgress(progress) + if replicatedTime.IsEmpty() { + return errors.Newf("stream ingestion has not recorded any progress yet, waiting to advance pos %s", + targetTime) + } + if replicatedTime.Less(targetTime) { + return errors.Newf("waiting for stream ingestion job progress %s to advance beyond %s", + replicatedTime, targetTime) + } + return nil +} + func CreateScatteredTable(t *testing.T, c *TenantStreamingClusters, numNodes int) { // Create a source table with multiple ranges spread across multiple nodes numRanges := 50 diff --git a/pkg/ccl/streamingccl/replicationutils/utils.go b/pkg/ccl/streamingccl/replicationutils/utils.go index d0e65f75b447..5f8070d25794 100644 --- a/pkg/ccl/streamingccl/replicationutils/utils.go +++ b/pkg/ccl/streamingccl/replicationutils/utils.go @@ -136,9 +136,11 @@ func GetStreamIngestionStatsNoHeartbeat( IngestionDetails: &streamIngestionDetails, IngestionProgress: jobProgress.GetStreamIngest(), } - if highwater := jobProgress.GetHighWater(); highwater != nil && !highwater.IsEmpty() { + + replicatedTime := ReplicatedTimeFromProgress(&jobProgress) + if !replicatedTime.IsEmpty() { lagInfo := &streampb.StreamIngestionStats_ReplicationLagInfo{ - MinIngestedTimestamp: *highwater, + MinIngestedTimestamp: replicatedTime, } lagInfo.EarliestCheckpointedTimestamp = hlc.MaxTimestamp lagInfo.LatestCheckpointedTimestamp = hlc.MinTimestamp @@ -154,12 +156,16 @@ func GetStreamIngestionStatsNoHeartbeat( } lagInfo.SlowestFastestIngestionLag = lagInfo.LatestCheckpointedTimestamp.GoTime(). 
Sub(lagInfo.EarliestCheckpointedTimestamp.GoTime()) - lagInfo.ReplicationLag = timeutil.Since(highwater.GoTime()) + lagInfo.ReplicationLag = timeutil.Since(replicatedTime.GoTime()) stats.ReplicationLagInfo = lagInfo } return stats, nil } +func ReplicatedTimeFromProgress(p *jobspb.Progress) hlc.Timestamp { + return p.Details.(*jobspb.Progress_StreamIngest).StreamIngest.ReplicatedTime +} + func GetStreamIngestionStats( ctx context.Context, streamIngestionDetails jobspb.StreamIngestionDetails, diff --git a/pkg/ccl/streamingccl/streamclient/client.go b/pkg/ccl/streamingccl/streamclient/client.go index 0721cafc7707..08ac8abe88bc 100644 --- a/pkg/ccl/streamingccl/streamclient/client.go +++ b/pkg/ccl/streamingccl/streamclient/client.go @@ -77,7 +77,7 @@ type Client interface { // open its subscription to its partition of a larger stream. // TODO(dt): ts -> checkpointToken. Subscribe(ctx context.Context, streamID streampb.StreamID, spec SubscriptionToken, - initialScanTime hlc.Timestamp, previousHighWater hlc.Timestamp) (Subscription, error) + initialScanTime hlc.Timestamp, previousReplicatedTime hlc.Timestamp) (Subscription, error) // Close releases all the resources used by this client. Close(ctx context.Context) error diff --git a/pkg/ccl/streamingccl/streamclient/partitioned_stream_client.go b/pkg/ccl/streamingccl/streamclient/partitioned_stream_client.go index c2f829c8114c..a99f3aabfacf 100644 --- a/pkg/ccl/streamingccl/streamclient/partitioned_stream_client.go +++ b/pkg/ccl/streamingccl/streamclient/partitioned_stream_client.go @@ -202,7 +202,7 @@ func (p *partitionedStreamClient) Subscribe( streamID streampb.StreamID, spec SubscriptionToken, initialScanTime hlc.Timestamp, - previousHighWater hlc.Timestamp, + previousReplicatedTime hlc.Timestamp, ) (Subscription, error) { _, sp := tracing.ChildSpan(ctx, "streamclient.Client.Subscribe") defer sp.Finish() @@ -212,7 +212,7 @@ func (p *partitionedStreamClient) Subscribe( return nil, err } sps.InitialScanTimestamp = initialScanTime - sps.PreviousHighWaterTimestamp = previousHighWater + sps.PreviousReplicatedTimestamp = previousReplicatedTime specBytes, err := protoutil.Marshal(&sps) if err != nil { diff --git a/pkg/ccl/streamingccl/streamingest/BUILD.bazel b/pkg/ccl/streamingccl/streamingest/BUILD.bazel index dff505ee525e..d58bc675b5fb 100644 --- a/pkg/ccl/streamingccl/streamingest/BUILD.bazel +++ b/pkg/ccl/streamingccl/streamingest/BUILD.bazel @@ -88,7 +88,6 @@ go_test( "rangekey_batcher_test.go", "replication_random_client_test.go", "replication_stream_e2e_test.go", - "stream_ingestion_frontier_processor_test.go", "stream_ingestion_job_test.go", "stream_ingestion_processor_test.go", ], @@ -143,7 +142,6 @@ go_test( "//pkg/testutils/sqlutils", "//pkg/testutils/storageutils", "//pkg/testutils/testcluster", - "//pkg/upgrade/upgradebase", "//pkg/util/ctxgroup", "//pkg/util/hlc", "//pkg/util/leaktest", diff --git a/pkg/ccl/streamingccl/streamingest/alter_replication_job.go b/pkg/ccl/streamingccl/streamingest/alter_replication_job.go index 7879ce03f36d..86ba6e777464 100644 --- a/pkg/ccl/streamingccl/streamingest/alter_replication_job.go +++ b/pkg/ccl/streamingccl/streamingest/alter_replication_job.go @@ -234,13 +234,14 @@ func alterTenantJobCutover( return hlc.Timestamp{}, errors.Newf("job with id %d is not a stream ingestion job", job.ID()) } progress := job.Progress() + if alterTenantStmt.Cutover.Latest { - ts := progress.GetHighWater() - if ts == nil || ts.IsEmpty() { + replicatedTime := replicationutils.ReplicatedTimeFromProgress(&progress) + if 
replicatedTime.IsEmpty() { return hlc.Timestamp{}, errors.Newf("replicated tenant %q has not yet recorded a safe replication time", tenantName) } - cutoverTime = *ts + cutoverTime = replicatedTime } // TODO(ssd): We could use the replication manager here, but diff --git a/pkg/ccl/streamingccl/streamingest/alter_replication_job_test.go b/pkg/ccl/streamingccl/streamingest/alter_replication_job_test.go index 48d1d72afdb2..d5cabb448ad6 100644 --- a/pkg/ccl/streamingccl/streamingest/alter_replication_job_test.go +++ b/pkg/ccl/streamingccl/streamingest/alter_replication_job_test.go @@ -41,7 +41,7 @@ func TestAlterTenantCompleteToTime(t *testing.T) { jobutils.WaitForJobToRun(t, c.SrcSysSQL, jobspb.JobID(producerJobID)) jobutils.WaitForJobToRun(t, c.DestSysSQL, jobspb.JobID(ingestionJobID)) - c.WaitUntilHighWatermark(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID)) + c.WaitUntilReplicatedTime(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID)) var cutoverTime time.Time c.DestSysSQL.QueryRow(t, "SELECT clock_timestamp()").Scan(&cutoverTime) @@ -68,14 +68,14 @@ func TestAlterTenantCompleteToLatest(t *testing.T) { jobutils.WaitForJobToRun(t, c.SrcSysSQL, jobspb.JobID(producerJobID)) jobutils.WaitForJobToRun(t, c.DestSysSQL, jobspb.JobID(ingestionJobID)) - highWater := c.SrcCluster.Server(0).Clock().Now() - c.WaitUntilHighWatermark(highWater, jobspb.JobID(ingestionJobID)) + targetReplicatedTime := c.SrcCluster.Server(0).Clock().Now() + c.WaitUntilReplicatedTime(targetReplicatedTime, jobspb.JobID(ingestionJobID)) var cutoverStr string c.DestSysSQL.QueryRow(c.T, `ALTER TENANT $1 COMPLETE REPLICATION TO LATEST`, args.DestTenantName).Scan(&cutoverStr) cutoverOutput := replicationtestutils.DecimalTimeToHLC(t, cutoverStr) - require.GreaterOrEqual(t, cutoverOutput.GoTime(), highWater.GoTime()) + require.GreaterOrEqual(t, cutoverOutput.GoTime(), targetReplicatedTime.GoTime()) require.LessOrEqual(t, cutoverOutput.GoTime(), c.SrcCluster.Server(0).Clock().Now().GoTime()) jobutils.WaitForJobToSucceed(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID)) } @@ -94,7 +94,7 @@ func TestAlterTenantPauseResume(t *testing.T) { jobutils.WaitForJobToRun(t, c.SrcSysSQL, jobspb.JobID(producerJobID)) jobutils.WaitForJobToRun(t, c.DestSysSQL, jobspb.JobID(ingestionJobID)) - c.WaitUntilHighWatermark(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID)) + c.WaitUntilReplicatedTime(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID)) // Pause the replication job. 
c.DestSysSQL.Exec(t, `ALTER TENANT $1 PAUSE REPLICATION`, args.DestTenantName) @@ -104,7 +104,7 @@ func TestAlterTenantPauseResume(t *testing.T) { c.DestSysSQL.Exec(t, `ALTER TENANT $1 RESUME REPLICATION`, args.DestTenantName) jobutils.WaitForJobToRun(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID)) - c.WaitUntilHighWatermark(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID)) + c.WaitUntilReplicatedTime(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID)) var cutoverTime time.Time c.DestSysSQL.QueryRow(t, "SELECT clock_timestamp()").Scan(&cutoverTime) @@ -176,12 +176,12 @@ func TestAlterTenantUpdateExistingCutoverTime(t *testing.T) { jobutils.WaitForJobToRun(t, c.SrcSysSQL, jobspb.JobID(producerJobID)) jobutils.WaitForJobToRun(t, c.DestSysSQL, jobspb.JobID(ingestionJobID)) - highWater := c.SrcCluster.Server(0).Clock().Now() - c.WaitUntilHighWatermark(highWater, jobspb.JobID(ingestionJobID)) + replicatedTimeTarget := c.SrcCluster.Server(0).Clock().Now() + c.WaitUntilReplicatedTime(replicatedTimeTarget, jobspb.JobID(ingestionJobID)) // First cutover to a future time. var cutoverStr string - cutoverTime := highWater.Add(time.Hour.Nanoseconds(), 0) + cutoverTime := replicatedTimeTarget.Add(time.Hour.Nanoseconds(), 0) c.DestSysSQL.QueryRow(c.T, `ALTER TENANT $1 COMPLETE REPLICATION TO SYSTEM TIME $2::string`, args.DestTenantName, cutoverTime.AsOfSystemTime()).Scan(&cutoverStr) cutoverOutput := replicationtestutils.DecimalTimeToHLC(t, cutoverStr) @@ -190,7 +190,7 @@ func TestAlterTenantUpdateExistingCutoverTime(t *testing.T) { require.Equal(t, cutoverOutput, getCutoverTime()) // And cutover to an even further time. - cutoverTime = highWater.Add((time.Hour * 2).Nanoseconds(), 0) + cutoverTime = replicatedTimeTarget.Add((time.Hour * 2).Nanoseconds(), 0) c.DestSysSQL.QueryRow(c.T, `ALTER TENANT $1 COMPLETE REPLICATION TO SYSTEM TIME $2::string`, args.DestTenantName, cutoverTime.AsOfSystemTime()).Scan(&cutoverStr) cutoverOutput = replicationtestutils.DecimalTimeToHLC(t, cutoverStr) @@ -242,7 +242,7 @@ func TestAlterTenantFailUpdatingCutoverTime(t *testing.T) { jobutils.WaitForJobToRun(t, c.SrcSysSQL, jobspb.JobID(producerJobID)) jobutils.WaitForJobToRun(t, c.DestSysSQL, jobspb.JobID(ingestionJobID)) - c.WaitUntilHighWatermark(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID)) + c.WaitUntilReplicatedTime(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID)) require.Equal(c.T, "replicating", getTenantStatus()) @@ -337,7 +337,7 @@ func TestTenantStatusWithFutureCutoverTime(t *testing.T) { jobutils.WaitForJobToRun(t, c.SrcSysSQL, jobspb.JobID(producerJobID)) jobutils.WaitForJobToRun(t, c.DestSysSQL, jobspb.JobID(ingestionJobID)) - c.WaitUntilHighWatermark(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID)) + c.WaitUntilReplicatedTime(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID)) require.Equal(c.T, "replicating", getTenantStatus()) @@ -351,7 +351,7 @@ func TestTenantStatusWithFutureCutoverTime(t *testing.T) { c.DestSysSQL.Exec(t, `ALTER TENANT $1 RESUME REPLICATION`, args.DestTenantName) unblockResumerStart() jobutils.WaitForJobToRun(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID)) - c.WaitUntilHighWatermark(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID)) + c.WaitUntilReplicatedTime(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID)) require.Equal(c.T, "replicating", getTenantStatus()) @@ -422,7 +422,7 @@ func TestTenantStatusWithLatestCutoverTime(t *testing.T) { 
jobutils.WaitForJobToRun(t, c.SrcSysSQL, jobspb.JobID(producerJobID)) jobutils.WaitForJobToRun(t, c.DestSysSQL, jobspb.JobID(ingestionJobID)) - c.WaitUntilHighWatermark(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID)) + c.WaitUntilReplicatedTime(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID)) require.Equal(c.T, "replicating", getTenantStatus()) @@ -459,7 +459,7 @@ func TestTenantReplicationStatus(t *testing.T) { jobutils.WaitForJobToRun(t, c.SrcSysSQL, jobspb.JobID(producerJobID)) jobutils.WaitForJobToRun(t, c.DestSysSQL, jobspb.JobID(ingestionJobID)) - c.WaitUntilHighWatermark(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID)) + c.WaitUntilReplicatedTime(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID)) registry := c.DestSysServer.JobRegistry().(*jobs.Registry) @@ -496,7 +496,7 @@ func TestAlterTenantHandleFutureProtectedTimestamp(t *testing.T) { jobutils.WaitForJobToRun(t, c.SrcSysSQL, jobspb.JobID(producerJobID)) jobutils.WaitForJobToRun(t, c.DestSysSQL, jobspb.JobID(ingestionJobID)) - c.WaitUntilHighWatermark(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID)) + c.WaitUntilReplicatedTime(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID)) c.DestSysSQL.Exec(c.T, `ALTER TENANT $1 COMPLETE REPLICATION TO LATEST`, args.DestTenantName) } diff --git a/pkg/ccl/streamingccl/streamingest/datadriven_test.go b/pkg/ccl/streamingccl/streamingest/datadriven_test.go index 8766d5fe8a2f..995c7057b78b 100644 --- a/pkg/ccl/streamingccl/streamingest/datadriven_test.go +++ b/pkg/ccl/streamingccl/streamingest/datadriven_test.go @@ -44,7 +44,7 @@ import ( // tenant. This operation will fail the test if it is run prior to the // replication stream activating the tenant. // -// - wait-until-high-watermark ts= +// - wait-until-replicated-time ts= // Wait until the replication job has reached the specified timestamp. 
// // - cutover ts= @@ -119,22 +119,20 @@ func TestDataDriven(t *testing.T) { case "start-replication-stream": ds.producerJobID, ds.replicationJobID = ds.replicationClusters.StartStreamReplication(ctx) - case "wait-until-high-watermark": - var highWaterMark string - d.ScanArgs(t, "ts", &highWaterMark) - varValue, ok := ds.vars[highWaterMark] + case "wait-until-replicated-time": + var replicatedTimeTarget string + d.ScanArgs(t, "ts", &replicatedTimeTarget) + varValue, ok := ds.vars[replicatedTimeTarget] if ok { - highWaterMark = varValue + replicatedTimeTarget = varValue } - timestamp, _, err := tree.ParseDTimestamp(nil, highWaterMark, time.Microsecond) + timestamp, _, err := tree.ParseDTimestamp(nil, replicatedTimeTarget, time.Microsecond) require.NoError(t, err) - hw := hlc.Timestamp{WallTime: timestamp.UnixNano()} - ds.replicationClusters.WaitUntilHighWatermark(hw, jobspb.JobID(ds.replicationJobID)) - + ds.replicationClusters.WaitUntilReplicatedTime(hlc.Timestamp{WallTime: timestamp.UnixNano()}, + jobspb.JobID(ds.replicationJobID)) case "start-replicated-tenant": cleanupTenant := ds.replicationClusters.CreateDestTenantSQL(ctx) ds.cleanupFns = append(ds.cleanupFns, cleanupTenant) - case "let": if len(d.CmdArgs) == 0 { t.Fatalf("Must specify at least one variable name.") diff --git a/pkg/ccl/streamingccl/streamingest/replication_random_client_test.go b/pkg/ccl/streamingccl/streamingest/replication_random_client_test.go index f644e193ac7d..8ceaa15b32bf 100644 --- a/pkg/ccl/streamingccl/streamingest/replication_random_client_test.go +++ b/pkg/ccl/streamingccl/streamingest/replication_random_client_test.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest" _ "github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvtenantccl" // To start tenants. "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationtestutils" + "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationutils" "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" @@ -48,18 +49,18 @@ import ( "github.com/stretchr/testify/require" ) -func getHighWaterMark(ingestionJobID int, sqlDB *gosql.DB) (*hlc.Timestamp, error) { +func getReplicatedTime(ingestionJobID int, sqlDB *gosql.DB) (hlc.Timestamp, error) { var progressBytes []byte if err := sqlDB.QueryRow( `SELECT progress FROM crdb_internal.system_jobs WHERE id = $1`, ingestionJobID, ).Scan(&progressBytes); err != nil { - return nil, err + return hlc.Timestamp{}, err } - var payload jobspb.Progress - if err := protoutil.Unmarshal(progressBytes, &payload); err != nil { - return nil, err + var progress jobspb.Progress + if err := protoutil.Unmarshal(progressBytes, &progress); err != nil { + return hlc.Timestamp{}, err } - return payload.GetHighWater(), nil + return replicationutils.ReplicatedTimeFromProgress(&progress), nil } func getTestRandomClientURI(tenantID roachpb.TenantID, tenantName roachpb.TenantName) string { @@ -274,14 +275,14 @@ func TestStreamIngestionJobWithRandomClient(t *testing.T) { close(canBeCompletedCh) // Ensure that the job has made some progress. 
- var highwater hlc.Timestamp + var replicatedTime hlc.Timestamp testutils.SucceedsSoon(t, func() error { - hw, err := getHighWaterMark(ingestionJobID, conn) + var err error + replicatedTime, err = getReplicatedTime(ingestionJobID, conn) require.NoError(t, err) - if hw == nil { - return errors.New("highwatermark is unset, no progress has been reported") + if replicatedTime.IsEmpty() { + return errors.New("ReplicatedTime is unset, no progress has been reported") } - highwater = *hw return nil }) @@ -289,7 +290,7 @@ func TestStreamIngestionJobWithRandomClient(t *testing.T) { // cancellation, and subsequently rollback data above our frontier timestamp. // // Pick a cutover time just before the latest resolved timestamp. - cutoverTime := timeutil.Unix(0, highwater.WallTime).UTC().Add(-1 * time.Microsecond).Round(time.Microsecond) + cutoverTime := timeutil.Unix(0, replicatedTime.WallTime).UTC().Add(-1 * time.Microsecond).Round(time.Microsecond) _, err = conn.Exec(`ALTER TENANT "30" COMPLETE REPLICATION TO SYSTEM TIME $1::string`, cutoverTime) require.NoError(t, err) diff --git a/pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go b/pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go index 2aba38f43bfa..b3a934fdba72 100644 --- a/pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go +++ b/pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go @@ -61,7 +61,7 @@ func TestTenantStreamingProducerJobTimedOut(t *testing.T) { jobutils.WaitForJobToRun(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID)) srcTime := c.SrcCluster.Server(0).Clock().Now() - c.WaitUntilHighWatermark(srcTime, jobspb.JobID(ingestionJobID)) + c.WaitUntilReplicatedTime(srcTime, jobspb.JobID(ingestionJobID)) c.RequireFingerprintMatchAtTimestamp(srcTime.AsOfSystemTime()) stats := replicationutils.TestingGetStreamIngestionStatsFromReplicationJob(t, ctx, c.DestSysSQL, ingestionJobID) @@ -118,7 +118,7 @@ func TestTenantStreamingPauseResumeIngestion(t *testing.T) { jobutils.WaitForJobToRun(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID)) srcTime := c.SrcCluster.Server(0).Clock().Now() - c.WaitUntilHighWatermark(srcTime, jobspb.JobID(ingestionJobID)) + c.WaitUntilReplicatedTime(srcTime, jobspb.JobID(ingestionJobID)) c.RequireFingerprintMatchAtTimestamp(srcTime.AsOfSystemTime()) // Pause ingestion. @@ -144,7 +144,7 @@ func TestTenantStreamingPauseResumeIngestion(t *testing.T) { // Confirm that dest tenant has received the new change after resumption. 
srcTime = c.SrcCluster.Server(0).Clock().Now() - c.WaitUntilHighWatermark(srcTime, jobspb.JobID(ingestionJobID)) + c.WaitUntilReplicatedTime(srcTime, jobspb.JobID(ingestionJobID)) c.RequireFingerprintMatchAtTimestamp(srcTime.AsOfSystemTime()) } @@ -284,7 +284,7 @@ func TestTenantStreamingCheckpoint(t *testing.T) { jobutils.WaitForJobToRun(t, c.DestSysSQL, jobspb.JobID(ingestionJobID)) cutoverTime := c.DestSysServer.Clock().Now() - c.WaitUntilHighWatermark(cutoverTime, jobspb.JobID(ingestionJobID)) + c.WaitUntilReplicatedTime(cutoverTime, jobspb.JobID(ingestionJobID)) c.Cutover(producerJobID, ingestionJobID, cutoverTime.GoTime(), false) cutoverFingerprint := c.RequireFingerprintMatchAtTimestamp(cutoverTime.AsOfSystemTime()) @@ -318,7 +318,7 @@ func TestTenantStreamingCancelIngestion(t *testing.T) { jobutils.WaitForJobToRun(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID)) srcTime := c.SrcCluster.Server(0).Clock().Now() - c.WaitUntilHighWatermark(srcTime, jobspb.JobID(ingestionJobID)) + c.WaitUntilReplicatedTime(srcTime, jobspb.JobID(ingestionJobID)) c.RequireFingerprintMatchAtTimestamp(srcTime.AsOfSystemTime()) if cancelAfterPaused { @@ -388,7 +388,7 @@ func TestTenantStreamingDropTenantCancelsStream(t *testing.T) { jobutils.WaitForJobToRun(c.T, c.SrcSysSQL, jobspb.JobID(producerJobID)) jobutils.WaitForJobToRun(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID)) - c.WaitUntilHighWatermark(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID)) + c.WaitUntilReplicatedTime(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID)) if cancelAfterPaused { c.DestSysSQL.Exec(t, fmt.Sprintf("PAUSE JOB %d", ingestionJobID)) jobutils.WaitForJobToPause(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID)) @@ -452,7 +452,7 @@ func TestTenantStreamingUnavailableStreamAddress(t *testing.T) { jobutils.WaitForJobToRun(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID)) srcTime := c.SrcCluster.Server(0).Clock().Now() - c.WaitUntilHighWatermark(srcTime, jobspb.JobID(ingestionJobID)) + c.WaitUntilReplicatedTime(srcTime, jobspb.JobID(ingestionJobID)) c.DestSysSQL.Exec(t, `PAUSE JOB $1`, ingestionJobID) jobutils.WaitForJobToPause(t, c.DestSysSQL, jobspb.JobID(ingestionJobID)) @@ -541,7 +541,7 @@ func TestTenantStreamingCutoverOnSourceFailure(t *testing.T) { c.WaitUntilStartTimeReached(jobspb.JobID(ingestionJobID)) cutoverTime := c.SrcCluster.Server(0).Clock().Now() - c.WaitUntilHighWatermark(cutoverTime, jobspb.JobID(ingestionJobID)) + c.WaitUntilReplicatedTime(cutoverTime, jobspb.JobID(ingestionJobID)) // Pause ingestion. c.DestSysSQL.Exec(t, fmt.Sprintf("PAUSE JOB %d", ingestionJobID)) @@ -577,7 +577,7 @@ func TestTenantStreamingDeleteRange(t *testing.T) { jobutils.WaitForJobToRun(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID)) srcTime := c.SrcSysServer.Clock().Now() - c.WaitUntilHighWatermark(srcTime, jobspb.JobID(ingestionJobID)) + c.WaitUntilReplicatedTime(srcTime, jobspb.JobID(ingestionJobID)) c.RequireFingerprintMatchAtTimestamp(srcTime.AsOfSystemTime()) // Introduce a DeleteRange on t1 and t2. 
@@ -609,7 +609,7 @@ func TestTenantStreamingDeleteRange(t *testing.T) { tableSpan.Key, tableSpan.Key.Next().Next())) } srcTimeAfterDelRange := c.SrcSysServer.Clock().Now() - c.WaitUntilHighWatermark(srcTimeAfterDelRange, jobspb.JobID(ingestionJobID)) + c.WaitUntilReplicatedTime(srcTimeAfterDelRange, jobspb.JobID(ingestionJobID)) c.RequireFingerprintMatchAtTimestamp(srcTimeAfterDelRange.AsOfSystemTime()) c.RequireFingerprintMatchAtTimestamp(srcTimeBeforeDelRange.AsOfSystemTime()) @@ -747,9 +747,13 @@ func TestTenantReplicationProtectedTimestampManagement(t *testing.T) { if err != nil { return err } - require.True(t, frontier.LessEq(*progress.GetHighWater())) - frontier := progress.GetHighWater().GoTime().Round(time.Millisecond) - window := frontier.Sub(rec.Timestamp.GoTime().Round(time.Millisecond)) + + replicatedTime := replicationutils.ReplicatedTimeFromProgress(&progress) + require.True(t, frontier.LessEq(replicatedTime)) + + roundedReplicatedTime := replicatedTime.GoTime().Round(time.Millisecond) + roundedProtectedTime := rec.Timestamp.GoTime().Round(time.Millisecond) + window := roundedReplicatedTime.Sub(roundedProtectedTime) require.Equal(t, time.Second, window) return nil })) @@ -772,7 +776,7 @@ func TestTenantReplicationProtectedTimestampManagement(t *testing.T) { // important because if `frontier@t2 - ReplicationTTLSeconds < t1` then we // will not update the PTS record. now := c.SrcCluster.Server(0).Clock().Now().Add(int64(time.Second)*2, 0) - c.WaitUntilHighWatermark(now, jobspb.JobID(replicationJobID)) + c.WaitUntilReplicatedTime(now, jobspb.JobID(replicationJobID)) // Check that the producer and replication job have written a protected // timestamp. @@ -780,7 +784,7 @@ func TestTenantReplicationProtectedTimestampManagement(t *testing.T) { checkDestinationProtection(c, now, replicationJobID) now2 := now.Add(time.Second.Nanoseconds(), 0) - c.WaitUntilHighWatermark(now2, jobspb.JobID(replicationJobID)) + c.WaitUntilReplicatedTime(now2, jobspb.JobID(replicationJobID)) // Let the replication progress for a second before checking that the // protected timestamp record has also been updated on the destination // cluster. 
This update happens in the same txn in which we update the @@ -862,8 +866,8 @@ func TestTenantStreamingShowTenant(t *testing.T) { jobutils.WaitForJobToRun(c.T, c.SrcSysSQL, jobspb.JobID(producerJobID)) jobutils.WaitForJobToRun(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID)) - highWatermark := c.SrcCluster.Server(0).Clock().Now() - c.WaitUntilHighWatermark(highWatermark, jobspb.JobID(ingestionJobID)) + targetReplicatedTime := c.SrcCluster.Server(0).Clock().Now() + c.WaitUntilReplicatedTime(targetReplicatedTime, jobspb.JobID(ingestionJobID)) destRegistry := c.DestCluster.Server(0).JobRegistry().(*jobs.Registry) details, err := destRegistry.LoadJob(ctx, jobspb.JobID(ingestionJobID)) require.NoError(t, err) @@ -894,7 +898,7 @@ func TestTenantStreamingShowTenant(t *testing.T) { require.Equal(t, ingestionJobID, jobId) require.Less(t, maxReplTime, timeutil.Now()) require.Less(t, protectedTime, timeutil.Now()) - require.GreaterOrEqual(t, maxReplTime, highWatermark.GoTime()) + require.GreaterOrEqual(t, maxReplTime, targetReplicatedTime.GoTime()) require.GreaterOrEqual(t, protectedTime, replicationDetails.ReplicationStartTime.GoTime()) require.Nil(t, cutoverTime) diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_dist.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_dist.go index eab265f9723b..0570ea5e3d22 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_dist.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_dist.go @@ -37,24 +37,27 @@ func startDistIngestion( ) error { details := ingestionJob.Details().(jobspb.StreamIngestionDetails) - progress := ingestionJob.Progress() + streamProgress := ingestionJob.Progress().Details.(*jobspb.Progress_StreamIngest).StreamIngest - var previousHighWater, heartbeatTimestamp hlc.Timestamp + streamID := streampb.StreamID(details.StreamID) initialScanTimestamp := details.ReplicationStartTime + replicatedTime := streamProgress.ReplicatedTime + + if replicatedTime.IsEmpty() && initialScanTimestamp.IsEmpty() { + return jobs.MarkAsPermanentJobError(errors.AssertionFailedf("initial timestamp and replicated timestamp are both empty")) + } + // Start from the last checkpoint if it exists. 
- if h := progress.GetHighWater(); h != nil && !h.IsEmpty() { - previousHighWater = *h - heartbeatTimestamp = previousHighWater + var heartbeatTimestamp hlc.Timestamp + if !replicatedTime.IsEmpty() { + heartbeatTimestamp = replicatedTime } else { heartbeatTimestamp = initialScanTimestamp } - log.Infof(ctx, "ingestion job %d resumes stream ingestion from start time %s", - ingestionJob.ID(), heartbeatTimestamp) - - streamID := streampb.StreamID(details.StreamID) - updateRunningStatus(ctx, execCtx, ingestionJob, jobspb.InitializingReplication, - fmt.Sprintf("connecting to the producer job %d and resuming a stream replication plan", streamID)) + msg := fmt.Sprintf("resuming stream (producer job %d) from %s", + streamID, heartbeatTimestamp) + updateRunningStatus(ctx, ingestionJob, jobspb.InitializingReplication, msg) client, err := connectToActiveClient(ctx, ingestionJob, execCtx.ExecCfg().InternalDB) if err != nil { @@ -69,7 +72,7 @@ func startDistIngestion( return err } - log.Infof(ctx, "producer job %d is active, creating a stream replication plan", streamID) + log.Infof(ctx, "producer job %d is active, planning DistSQL flow", streamID) dsp := execCtx.DistSQLPlanner() p, planCtx, err := makePlan( @@ -77,14 +80,12 @@ func startDistIngestion( ingestionJob, details, client, - previousHighWater, - progress.Details.(*jobspb.Progress_StreamIngest).StreamIngest.Checkpoint, + replicatedTime, + streamProgress.Checkpoint, initialScanTimestamp)(ctx, dsp) if err != nil { return err } - log.Infof(ctx, "starting to run DistSQL flow for stream ingestion job %d", - ingestionJob.ID()) execPlan := func(ctx context.Context) error { ctx = logtags.AddTag(ctx, "stream-ingest-distsql", nil) @@ -112,9 +113,7 @@ func startDistIngestion( return rw.Err() } - updateRunningStatus(ctx, execCtx, ingestionJob, jobspb.Replicating, - "running the SQL flow for the stream ingestion job") - + updateRunningStatus(ctx, ingestionJob, jobspb.Replicating, "running replicating stream") // TODO(msbutler): Implement automatic replanning in the spirit of changefeed replanning. return execPlan(ctx) } @@ -126,7 +125,7 @@ func makePlan( ingestionJob *jobs.Job, details jobspb.StreamIngestionDetails, client streamclient.Client, - previousHighWater hlc.Timestamp, + previousReplicatedTime hlc.Timestamp, checkpoint jobspb.StreamIngestionCheckpoint, initialScanTimestamp hlc.Timestamp, ) func(context.Context, *sql.DistSQLPlanner) (*sql.PhysicalPlan, *sql.PlanningCtx, error) { @@ -158,7 +157,7 @@ func makePlan( topology, sqlInstanceIDs, initialScanTimestamp, - previousHighWater, + previousReplicatedTime, checkpoint, jobID, streamID, @@ -208,7 +207,7 @@ func constructStreamIngestionPlanSpecs( topology streamclient.Topology, sqlInstanceIDs []base.SQLInstanceID, initialScanTimestamp hlc.Timestamp, - previousHighWater hlc.Timestamp, + previousReplicatedTimestamp hlc.Timestamp, checkpoint jobspb.StreamIngestionCheckpoint, jobID jobspb.JobID, streamID streampb.StreamID, @@ -226,13 +225,13 @@ func constructStreamIngestionPlanSpecs( // the partition addresses. 
if i < len(sqlInstanceIDs) { spec := &execinfrapb.StreamIngestionDataSpec{ - StreamID: uint64(streamID), - JobID: int64(jobID), - PreviousHighWaterTimestamp: previousHighWater, - InitialScanTimestamp: initialScanTimestamp, - Checkpoint: checkpoint, // TODO: Only forward relevant checkpoint info - StreamAddress: string(streamAddress), - PartitionSpecs: make(map[string]execinfrapb.StreamIngestionPartitionSpec), + StreamID: uint64(streamID), + JobID: int64(jobID), + PreviousReplicatedTimestamp: previousReplicatedTimestamp, + InitialScanTimestamp: initialScanTimestamp, + Checkpoint: checkpoint, // TODO: Only forward relevant checkpoint info + StreamAddress: string(streamAddress), + PartitionSpecs: make(map[string]execinfrapb.StreamIngestionPartitionSpec), TenantRekey: execinfrapb.TenantRekey{ OldID: sourceTenantID, NewID: destinationTenantID, @@ -256,7 +255,7 @@ func constructStreamIngestionPlanSpecs( // Create a spec for the StreamIngestionFrontier processor on the coordinator // node. streamIngestionFrontierSpec := &execinfrapb.StreamIngestionFrontierSpec{ - HighWaterAtStart: previousHighWater, + ReplicatedTimeAtStart: previousReplicatedTimestamp, TrackedSpans: trackedSpans, JobID: int64(jobID), StreamID: uint64(streamID), diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go index c7dd0ef97297..6c55fd73e4fc 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go @@ -57,9 +57,6 @@ type streamIngestionFrontier struct { // input returns rows from one or more streamIngestion processors. input execinfra.RowSource - // highWaterAtStart is the job high-water. It's used in an assertion that we - // never regress the job high-water. - highWaterAtStart hlc.Timestamp // frontier contains the current resolved timestamp high-water for the tracked // span set. @@ -72,9 +69,13 @@ type streamIngestionFrontier struct { // stream alive. heartbeatSender *heartbeatSender - // persistedHighWater stores the highwater mark of progress that is persisted - // in the job record. - persistedHighWater hlc.Timestamp + // replicatedTimeAtStart is the job's replicated time when + // this processor started. It's used in an assertion that we + // never regress the job's replicated time. + replicatedTimeAtStart hlc.Timestamp + // persistedReplicatedTime stores the highwater mark of + // progress that is persisted in the job record. + persistedReplicatedTime hlc.Timestamp lastPartitionUpdate time.Time partitionProgress map[string]jobspb.StreamIngestionProgress_PartitionProgress @@ -95,7 +96,7 @@ func newStreamIngestionFrontierProcessor( input execinfra.RowSource, post *execinfrapb.PostProcessSpec, ) (execinfra.Processor, error) { - frontier, err := span.MakeFrontierAt(spec.HighWaterAtStart, spec.TrackedSpans...) + frontier, err := span.MakeFrontierAt(spec.ReplicatedTimeAtStart, spec.TrackedSpans...) 
if err != nil { return nil, err } @@ -116,15 +117,15 @@ func newStreamIngestionFrontierProcessor( } } sf := &streamIngestionFrontier{ - flowCtx: flowCtx, - spec: spec, - input: input, - highWaterAtStart: spec.HighWaterAtStart, - frontier: frontier, - partitionProgress: partitionProgress, - metrics: flowCtx.Cfg.JobRegistry.MetricsStruct().StreamIngest.(*Metrics), - heartbeatSender: heartbeatSender, - persistedHighWater: spec.HighWaterAtStart, + flowCtx: flowCtx, + spec: spec, + input: input, + replicatedTimeAtStart: spec.ReplicatedTimeAtStart, + frontier: frontier, + partitionProgress: partitionProgress, + metrics: flowCtx.Cfg.JobRegistry.MetricsStruct().StreamIngest.(*Metrics), + heartbeatSender: heartbeatSender, + persistedReplicatedTime: spec.ReplicatedTimeAtStart, } if err := sf.Init( ctx, @@ -304,10 +305,10 @@ func (sf *streamIngestionFrontier) Next() ( case <-sf.Ctx().Done(): sf.MoveToDraining(sf.Ctx().Err()) return nil, sf.DrainHelper() - // Send the latest persisted highwater in the heartbeat to the source cluster + // Send the latest persisted replicated time in the heartbeat to the source cluster // as even with retries we will never request an earlier row than it, and // the source cluster is free to clean up earlier data. - case sf.heartbeatSender.frontierUpdates <- sf.persistedHighWater: + case sf.heartbeatSender.frontierUpdates <- sf.persistedReplicatedTime: // If heartbeatSender has error, it means remote has error, we want to // stop the processor. case <-sf.heartbeatSender.stoppedChan: @@ -371,10 +372,10 @@ func (sf *streamIngestionFrontier) noteResolvedTimestamps( // Inserting a timestamp less than the one the ingestion flow started at could // potentially regress the job progress. This is not expected and thus we // assert to catch such unexpected behavior. - if !resolved.Timestamp.IsEmpty() && resolved.Timestamp.Less(sf.highWaterAtStart) { + if !resolved.Timestamp.IsEmpty() && resolved.Timestamp.Less(sf.replicatedTimeAtStart) { return frontierChanged, errors.AssertionFailedf( `got a resolved timestamp %s that is less than the frontier processor start time %s`, - redact.Safe(resolved.Timestamp), redact.Safe(sf.highWaterAtStart)) + redact.Safe(resolved.Timestamp), redact.Safe(sf.replicatedTimeAtStart)) } changed, err := sf.frontier.Forward(resolved.Span, resolved.Timestamp) @@ -405,7 +406,7 @@ func (sf *streamIngestionFrontier) maybeUpdatePartitionProgress() error { return span.ContinueMatch }) - highWatermark := f.Frontier() + replicatedTime := f.Frontier() partitionProgress := sf.partitionProgress sf.lastPartitionUpdate = timeutil.Now() @@ -418,17 +419,20 @@ func (sf *streamIngestionFrontier) maybeUpdatePartitionProgress() error { } progress := md.Progress - // Keep the recorded highwater empty until some advancement has been made - if sf.highWaterAtStart.Less(highWatermark) { - progress.Progress = &jobspb.Progress_HighWater{ - HighWater: &highWatermark, - } - } - streamProgress := progress.Details.(*jobspb.Progress_StreamIngest).StreamIngest streamProgress.PartitionProgress = partitionProgress streamProgress.Checkpoint.ResolvedSpans = frontierResolvedSpans + // Keep the recorded replicatedTime empty until some advancement has been made + if sf.replicatedTimeAtStart.Less(replicatedTime) { + streamProgress.ReplicatedTime = replicatedTime + // The HighWater is for informational purposes + // only. 
+ progress.Progress = &jobspb.Progress_HighWater{ + HighWater: &replicatedTime, + } + } + ju.UpdateProgress(progress) // Reset RunStats.NumRuns to 1 since the stream ingestion has returned to @@ -439,9 +443,9 @@ func (sf *streamIngestionFrontier) maybeUpdatePartitionProgress() error { } // Update the protected timestamp record protecting the destination tenant's - // keyspan if the highWatermark has moved forward since the last time we + // keyspan if the replicatedTime has moved forward since the last time we // recorded progress. This makes older revisions of replicated values with a - // timestamp less than highWatermark - ReplicationTTLSeconds, eligible for + // timestamp less than replicatedTime - ReplicationTTLSeconds eligible for // garbage collection. replicationDetails := md.Payload.GetStreamIngestion() if replicationDetails.ProtectedTimestampRecordID == nil { @@ -453,22 +457,30 @@ func (sf *streamIngestionFrontier) maybeUpdatePartitionProgress() error { if err != nil { return err } - newProtectAbove := highWatermark.Add( + newProtectAbove := replicatedTime.Add( -int64(replicationDetails.ReplicationTTLSeconds)*time.Second.Nanoseconds(), 0) + + // If we have a CutoverTime set, keep the protected + // timestamp at or below the cutover time. + if !streamProgress.CutoverTime.IsEmpty() && streamProgress.CutoverTime.Less(newProtectAbove) { + newProtectAbove = streamProgress.CutoverTime + } + if record.Timestamp.Less(newProtectAbove) { return ptp.UpdateTimestamp(ctx, *replicationDetails.ProtectedTimestampRecordID, newProtectAbove) } + return nil }); err != nil { return err } sf.metrics.JobProgressUpdates.Inc(1) - sf.persistedHighWater = f.Frontier() + sf.persistedReplicatedTime = f.Frontier() sf.metrics.FrontierCheckpointSpanCount.Update(int64(len(frontierResolvedSpans))) - if !sf.persistedHighWater.IsEmpty() { - // Only update the frontier lag if the high water mark has been updated, + if !sf.persistedReplicatedTime.IsEmpty() { + // Only update the frontier lag if the replicated time has been updated, // implying the initial scan has completed. - sf.metrics.FrontierLagNanos.Update(timeutil.Since(sf.persistedHighWater.GoTime()).Nanoseconds()) + sf.metrics.FrontierLagNanos.Update(timeutil.Since(sf.persistedReplicatedTime.GoTime()).Nanoseconds()) } return nil } diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor_test.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor_test.go deleted file mode 100644 index 9b068fcb9299..000000000000 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor_test.go +++ /dev/null @@ -1,388 +0,0 @@ -// Copyright 2020 The Cockroach Authors. -// -// Licensed as a CockroachDB Enterprise file under the Cockroach Community -// License (the "License"); you may not use this file except in compliance with -// the License. 
You may obtain a copy of the License at -// -// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt - -package streamingest - -import ( - "context" - "fmt" - "math" - "testing" - "time" - - "github.com/cockroachdb/cockroach/pkg/base" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient" - "github.com/cockroachdb/cockroach/pkg/jobs" - "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" - "github.com/cockroachdb/cockroach/pkg/keys" - "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/security/username" - "github.com/cockroachdb/cockroach/pkg/settings/cluster" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" - "github.com/cockroachdb/cockroach/pkg/sql/execinfra" - "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" - "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" - "github.com/cockroachdb/cockroach/pkg/testutils" - "github.com/cockroachdb/cockroach/pkg/testutils/distsqlutils" - "github.com/cockroachdb/cockroach/pkg/testutils/skip" - "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" - "github.com/cockroachdb/cockroach/pkg/upgrade/upgradebase" - "github.com/cockroachdb/cockroach/pkg/util/hlc" - "github.com/cockroachdb/cockroach/pkg/util/leaktest" - "github.com/cockroachdb/cockroach/pkg/util/limit" - "github.com/cockroachdb/cockroach/pkg/util/timeutil" - "github.com/stretchr/testify/require" -) - -type partitionToEvent map[string][]streamingccl.Event - -func TestStreamIngestionFrontierProcessor(t *testing.T) { - defer leaktest.AfterTest(t)() - skip.WithIssue(t, 87145, "flaky test") - - ctx := context.Background() - - tc := testcluster.StartTestCluster(t, 3 /* nodes */, base.TestClusterArgs{ - ServerArgs: base.TestServerArgs{ - Knobs: base.TestingKnobs{ - JobsTestingKnobs: &jobs.TestingKnobs{ - // We create a job record to track persistence but we don't want it to - // be adopted as the processors are being manually executed in the test. - DisableAdoptions: true, - }, - // DisableAdoptions needs this. - UpgradeManager: &upgradebase.TestingKnobs{ - DontUseJobs: true, - }, - }, - }, - }) - defer tc.Stopper().Stop(context.Background()) - - st := cluster.MakeTestingClusterSettings() - JobCheckpointFrequency.Override(ctx, &st.SV, 200*time.Millisecond) - streamingccl.StreamReplicationConsumerHeartbeatFrequency.Override(ctx, &st.SV, 100*time.Millisecond) - - evalCtx := eval.MakeTestingEvalContext(st) - - testDiskMonitor := execinfra.NewTestDiskMonitor(ctx, st) - defer testDiskMonitor.Stop(ctx) - - registry := tc.Server(0).JobRegistry().(*jobs.Registry) - flowCtx := execinfra.FlowCtx{ - Cfg: &execinfra.ServerConfig{ - Settings: st, - DB: tc.Server(0).InternalDB().(descs.DB), - JobRegistry: registry, - BulkSenderLimiter: limit.MakeConcurrentRequestLimiter("test", math.MaxInt), - }, - EvalCtx: &evalCtx, - Mon: evalCtx.TestingMon, - DiskMonitor: testDiskMonitor, - } - - post := execinfrapb.PostProcessSpec{} - - var spec execinfrapb.StreamIngestionDataSpec - // The stream address needs to be set with a scheme we support, but this test - // will mock out the actual client. 
- spec.StreamAddress = "randomgen://test/" - pa1 := "randomgen://test1/" - pa2 := "randomgen://test2/" - - pa1StartKey := roachpb.Key("key_1") - pa1Span := roachpb.Span{Key: pa1StartKey, EndKey: pa1StartKey.Next()} - pa2StartKey := roachpb.Key("key_2") - pa2Span := roachpb.Span{Key: pa2StartKey, EndKey: pa2StartKey.Next()} - - v := roachpb.MakeValueFromString("value_1") - v.Timestamp = hlc.Timestamp{WallTime: 1} - - const tenantID = 20 - sampleKV := func() roachpb.KeyValue { - key, err := keys.RewriteKeyToTenantPrefix(roachpb.Key("key_1"), - keys.MakeTenantPrefix(roachpb.MustMakeTenantID(tenantID))) - require.NoError(t, err) - return roachpb.KeyValue{Key: key, Value: v} - } - sampleCheckpoint := func(span roachpb.Span, ts int64) []jobspb.ResolvedSpan { - return []jobspb.ResolvedSpan{{Span: span, Timestamp: hlc.Timestamp{WallTime: ts}}} - } - sampleCheckpointWithLogicTS := func(span roachpb.Span, ts int64, logicalTs int32) []jobspb.ResolvedSpan { - return []jobspb.ResolvedSpan{{Span: span, Timestamp: hlc.Timestamp{WallTime: ts, Logical: logicalTs}}} - } - - for _, tc := range []struct { - name string - events partitionToEvent - expectedFrontierTimestamp hlc.Timestamp - frontierStartTime hlc.Timestamp - jobCheckpoint []jobspb.ResolvedSpan - }{ - { - name: "same-resolved-ts-across-partitions", - events: partitionToEvent{pa1: []streamingccl.Event{ - streamingccl.MakeCheckpointEvent(sampleCheckpoint(pa1Span, 1)), - streamingccl.MakeCheckpointEvent(sampleCheckpoint(pa1Span, 4)), - }, pa2: []streamingccl.Event{ - streamingccl.MakeCheckpointEvent(sampleCheckpoint(pa2Span, 1)), - streamingccl.MakeCheckpointEvent(sampleCheckpoint(pa2Span, 4)), - }}, - expectedFrontierTimestamp: hlc.Timestamp{WallTime: 4}, - }, - { - // No progress should be reported to the job since partition 2 has not - // emitted a resolved ts. - name: "no-partition-checkpoints", - events: partitionToEvent{pa1: []streamingccl.Event{ - streamingccl.MakeKVEvent(sampleKV()), - }, pa2: []streamingccl.Event{ - streamingccl.MakeKVEvent(sampleKV()), - }}, - }, - { - // No progress should be reported to the job since partition 2 has not - // emitted a resolved ts. 
- name: "no-checkpoint-from-one-partition", - events: partitionToEvent{pa1: []streamingccl.Event{ - streamingccl.MakeCheckpointEvent(sampleCheckpoint(pa1Span, 1)), - streamingccl.MakeCheckpointEvent(sampleCheckpoint(pa1Span, 4)), - }, pa2: []streamingccl.Event{}}, - }, - { - name: "one-partition-ahead-of-the-other", - events: partitionToEvent{pa1: []streamingccl.Event{ - streamingccl.MakeCheckpointEvent(sampleCheckpoint(pa1Span, 1)), - streamingccl.MakeCheckpointEvent(sampleCheckpoint(pa1Span, 4)), - }, pa2: []streamingccl.Event{ - streamingccl.MakeCheckpointEvent(sampleCheckpoint(pa2Span, 1)), - }}, - expectedFrontierTimestamp: hlc.Timestamp{WallTime: 1}, - }, - { - name: "some-interleaved-timestamps", - events: partitionToEvent{pa1: []streamingccl.Event{ - streamingccl.MakeCheckpointEvent(sampleCheckpoint(pa1Span, 2)), - streamingccl.MakeCheckpointEvent(sampleCheckpoint(pa1Span, 4)), - }, pa2: []streamingccl.Event{ - streamingccl.MakeCheckpointEvent(sampleCheckpoint(pa2Span, 3)), - streamingccl.MakeCheckpointEvent(sampleCheckpoint(pa2Span, 5)), - }}, - expectedFrontierTimestamp: hlc.Timestamp{WallTime: 4}, - }, - { - name: "some-interleaved-logical-timestamps", - events: partitionToEvent{pa1: []streamingccl.Event{ - streamingccl.MakeCheckpointEvent(sampleCheckpointWithLogicTS(pa1Span, 1, 2)), - streamingccl.MakeCheckpointEvent(sampleCheckpointWithLogicTS(pa1Span, 1, 4)), - }, pa2: []streamingccl.Event{ - streamingccl.MakeCheckpointEvent(sampleCheckpointWithLogicTS(pa2Span, 1, 1)), - streamingccl.MakeCheckpointEvent(sampleCheckpoint(pa2Span, 2)), - }}, - expectedFrontierTimestamp: hlc.Timestamp{WallTime: 1, Logical: 4}, - }, - { - // The frontier should error out as it receives a checkpoint with a ts - // lower than its start time. - name: "partition-checkpoint-lower-than-start-ts", - events: partitionToEvent{pa1: []streamingccl.Event{ - streamingccl.MakeCheckpointEvent(sampleCheckpointWithLogicTS(pa1Span, 1, 4)), - }, pa2: []streamingccl.Event{ - streamingccl.MakeCheckpointEvent(sampleCheckpointWithLogicTS(pa2Span, 1, 2)), - }}, - frontierStartTime: hlc.Timestamp{WallTime: 1, Logical: 3}, - expectedFrontierTimestamp: hlc.Timestamp{WallTime: 1, Logical: 3}, - }, - { - name: "existing-job-checkpoint", - events: partitionToEvent{pa1: []streamingccl.Event{ - streamingccl.MakeCheckpointEvent(sampleCheckpoint(pa1Span, 5)), - }, pa2: []streamingccl.Event{ - streamingccl.MakeCheckpointEvent(sampleCheckpoint(pa2Span, 2)), - }}, - frontierStartTime: hlc.Timestamp{WallTime: 1}, - jobCheckpoint: []jobspb.ResolvedSpan{ - {Span: pa1Span, Timestamp: hlc.Timestamp{WallTime: 4}}, - {Span: pa2Span, Timestamp: hlc.Timestamp{WallTime: 3}}, - }, - expectedFrontierTimestamp: hlc.Timestamp{WallTime: 3}, - }, - } { - t.Run(tc.name, func(t *testing.T) { - topology := streamclient.Topology{ - Partitions: []streamclient.PartitionInfo{ - { - ID: pa1, - SubscriptionToken: []byte(pa1), - SrcAddr: streamingccl.PartitionAddress(pa1), - Spans: []roachpb.Span{pa1Span}, - }, - { - ID: pa2, - SubscriptionToken: []byte(pa2), - SrcAddr: streamingccl.PartitionAddress(pa2), - Spans: []roachpb.Span{pa2Span}, - }, - }, - } - - spec.PartitionSpecs = map[string]execinfrapb.StreamIngestionPartitionSpec{} - for _, partition := range topology.Partitions { - spec.PartitionSpecs[partition.ID] = execinfrapb.StreamIngestionPartitionSpec{ - PartitionID: partition.ID, - SubscriptionToken: string(partition.SubscriptionToken), - Address: string(partition.SrcAddr), - Spans: partition.Spans, - } - } - spec.TenantRekey = 
execinfrapb.TenantRekey{ - OldID: roachpb.MustMakeTenantID(tenantID), - NewID: roachpb.MustMakeTenantID(tenantID + 10), - } - spec.PreviousHighWaterTimestamp = tc.frontierStartTime - if tc.frontierStartTime.IsEmpty() { - spec.InitialScanTimestamp = hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} - } - spec.Checkpoint.ResolvedSpans = tc.jobCheckpoint - proc, err := newStreamIngestionDataProcessor(ctx, &flowCtx, 0 /* processorID */, spec, &post) - require.NoError(t, err) - sip, ok := proc.(*streamIngestionProcessor) - if !ok { - t.Fatal("expected the processor that's created to be a stream ingestion processor") - } - - // Inject a mock client with the events being tested against. - doneCh := make(chan struct{}) - defer close(doneCh) - sip.forceClientForTests = &mockStreamClient{ - partitionEvents: tc.events, - doneCh: doneCh, - } - defer func() { - require.NoError(t, sip.forceClientForTests.Close(ctx)) - }() - - jobID := registry.MakeJobID() - - t.Logf("Using JobID: %v", jobID) - - // Create a frontier processor. - var frontierSpec execinfrapb.StreamIngestionFrontierSpec - frontierSpec.StreamAddresses = topology.StreamAddresses() - frontierSpec.TrackedSpans = []roachpb.Span{pa1Span, pa2Span} - frontierSpec.Checkpoint.ResolvedSpans = tc.jobCheckpoint - frontierSpec.JobID = int64(jobID) - - if !tc.frontierStartTime.IsEmpty() { - frontierSpec.HighWaterAtStart = tc.frontierStartTime - } - - // Create a mock ingestion job. - record := jobs.Record{ - JobID: jobID, - Description: "fake ingestion job", - Username: username.TestUserName(), - Details: jobspb.StreamIngestionDetails{StreamAddress: spec.StreamAddress}, - // We don't use this so it does not matter what we set it too, as long - // as it is non-nil. - Progress: jobspb.StreamIngestionProgress{}, - } - record.CreatedBy = &jobs.CreatedByInfo{ - Name: "ingestion", - } - _, err = registry.CreateJobWithTxn(ctx, record, record.JobID, nil) - require.NoError(t, err) - - frontierPost := execinfrapb.PostProcessSpec{} - frontierOut := distsqlutils.RowBuffer{} - frontierProc, err := newStreamIngestionFrontierProcessor( - ctx, &flowCtx, 0 /* processorID*/, frontierSpec, sip, &frontierPost, - ) - require.NoError(t, err) - fp, ok := frontierProc.(*streamIngestionFrontier) - if !ok { - t.Fatal("expected the processor that's created to be a stream ingestion frontier") - } - ctxWithCancel, cancel := context.WithCancel(ctx) - defer cancel() - - client := streamclient.GetRandomStreamClientSingletonForTesting() - defer func() { - require.NoError(t, client.Close(context.Background())) - }() - - client.ClearInterceptors() - - // Record heartbeats in a list and terminate the client once the expected - // frontier timestamp has been reached - heartbeats := make([]hlc.Timestamp, 0) - client.RegisterHeartbeatInterception(func(heartbeatTs hlc.Timestamp) { - heartbeats = append(heartbeats, heartbeatTs) - if tc.expectedFrontierTimestamp.LessEq(heartbeatTs) { - doneCh <- struct{}{} - } - - // Ensure we never heartbeat a value later than what is persisted - job, err := registry.LoadJob(ctx, jobID) - require.NoError(t, err) - progress := job.Progress().Progress - if progress == nil { - if !heartbeatTs.Equal(spec.InitialScanTimestamp) { - t.Fatalf("heartbeat %v should equal start time of %v", heartbeatTs, spec.InitialScanTimestamp) - } - } else { - persistedHighwater := *progress.(*jobspb.Progress_HighWater).HighWater - require.True(t, heartbeatTs.LessEq(persistedHighwater)) - } - }) - - fp.Run(ctxWithCancel, &frontierOut) - - if !frontierOut.ProducerClosed() { - 
t.Fatal("producer for StreamFrontierProcessor not closed") - } - - minCheckpointTs := hlc.Timestamp{} - for _, resolvedSpan := range tc.jobCheckpoint { - if minCheckpointTs.IsEmpty() || resolvedSpan.Timestamp.Less(minCheckpointTs) { - minCheckpointTs = resolvedSpan.Timestamp - } - - // Ensure that the frontier is at least at the checkpoint for this span - require.True(t, resolvedSpan.Timestamp.LessEq(frontierForSpans(fp.frontier, resolvedSpan.Span))) - } - - // Wait until the frontier terminates - _, meta := frontierOut.Next() - if meta != nil { - if !tc.frontierStartTime.IsEmpty() { - require.True(t, testutils.IsError(meta.Err, fmt.Sprintf("got a resolved timestamp ."+ - "* that is less than the frontier processor start time %s", - tc.frontierStartTime.String()))) - return - } - t.Fatalf("unexpected meta record returned by frontier processor: %+v\n", *meta) - } - - // Ensure that the rows emitted by the frontier never regress the ts or - // appear prior to a checkpoint. - var prevTimestamp hlc.Timestamp - for _, heartbeatTs := range heartbeats { - if !prevTimestamp.IsEmpty() { - require.True(t, prevTimestamp.LessEq(heartbeatTs)) - require.True(t, minCheckpointTs.LessEq(heartbeatTs)) - } - prevTimestamp = heartbeatTs - } - - // Check the final ts recorded by the frontier. - require.Equal(t, tc.expectedFrontierTimestamp, prevTimestamp) - }) - } -} diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go index c1aa8acfa993..e818e87343ac 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go @@ -165,22 +165,18 @@ func waitUntilProducerActive( func updateRunningStatus( ctx context.Context, - execCtx sql.JobExecContext, ingestionJob *jobs.Job, status jobspb.ReplicationStatus, runningStatus string, ) { - execCfg := execCtx.ExecCfg() - err := execCfg.InternalDB.Txn(ctx, func( - ctx context.Context, txn isql.Txn, - ) error { - return ingestionJob.WithTxn(txn).Update(ctx, func(txn isql.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error { - updateRunningStatusInternal(md, ju, status, runningStatus) - return nil - }) + err := ingestionJob.NoTxn().Update(ctx, func(txn isql.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error { + updateRunningStatusInternal(md, ju, status, runningStatus) + return nil }) if err != nil { log.Warningf(ctx, "error when updating job running status: %s", err) + } else { + log.Infof(ctx, "%s", runningStatus) } } @@ -203,7 +199,7 @@ func completeIngestion( streamID := details.StreamID log.Infof(ctx, "completing the producer job %d", streamID) - updateRunningStatus(ctx, execCtx, ingestionJob, jobspb.ReplicationCuttingOver, + updateRunningStatus(ctx, ingestionJob, jobspb.ReplicationCuttingOver, "completing the producer job in the source cluster") completeProducerJob(ctx, ingestionJob, execCtx.ExecCfg().InternalDB, true) @@ -306,14 +302,14 @@ func ingestWithRetries( } const msgFmt = "stream ingestion waits for retrying after error: %s" log.Warningf(ctx, msgFmt, err) - updateRunningStatus(ctx, execCtx, ingestionJob, jobspb.ReplicationError, + updateRunningStatus(ctx, ingestionJob, jobspb.ReplicationError, fmt.Sprintf(msgFmt, err)) retryCount++ } if err != nil { return err } - updateRunningStatus(ctx, execCtx, ingestionJob, jobspb.ReplicationCuttingOver, + updateRunningStatus(ctx, ingestionJob, jobspb.ReplicationCuttingOver, "stream ingestion finished successfully") return nil } @@ -324,7 +320,7 @@ func (s 
*streamIngestionResumer) handleResumeError( ) error { const errorFmt = "ingestion job failed (%s) but is being paused" log.Warningf(ctx, errorFmt, err) - updateRunningStatus(ctx, execCtx, s.job, jobspb.ReplicationError, fmt.Sprintf(errorFmt, err)) + updateRunningStatus(ctx, s.job, jobspb.ReplicationError, fmt.Sprintf(errorFmt, err)) // The ingestion job is paused but the producer job will keep // running until it times out. Users can still resume ingestion before // the producer job times out. @@ -436,9 +432,12 @@ func cutoverTimeIsEligibleForCutover( log.Infof(ctx, "empty cutover time, no revert required") return false } - if progress.GetHighWater() == nil || progress.GetHighWater().Less(cutoverTime) { - log.Infof(ctx, "job with highwater %s not yet ready to revert to cutover at %s", - progress.GetHighWater(), cutoverTime.String()) + + replicatedTime := replicationutils.ReplicatedTimeFromProgress(progress) + if replicatedTime.Less(cutoverTime) { + log.Infof(ctx, "job with replicated time %s not yet ready to revert to cutover at %s", + replicatedTime, + cutoverTime.String()) return false } return true diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go index ad5e8ae7fc95..eb58a478b474 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go @@ -261,11 +261,23 @@ func TestCutoverBuiltin(t *testing.T) { require.True(t, ok) require.True(t, sp.StreamIngest.CutoverTime.IsEmpty()) - var highWater time.Time + var replicatedTime time.Time err = job.NoTxn().Update(ctx, func(_ isql.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error { - highWater = timeutil.Now().Round(time.Microsecond) - hlcHighWater := hlc.Timestamp{WallTime: highWater.UnixNano()} - return jobs.UpdateHighwaterProgressed(hlcHighWater, md, ju) + if err := md.CheckRunningOrReverting(); err != nil { + return err + } + replicatedTime = timeutil.Now().Round(time.Microsecond) + hlcReplicatedTime := hlc.Timestamp{WallTime: replicatedTime.UnixNano()} + + progress := md.Progress + streamProgress := progress.Details.(*jobspb.Progress_StreamIngest).StreamIngest + streamProgress.ReplicatedTime = hlcReplicatedTime + progress.Progress = &jobspb.Progress_HighWater{ + HighWater: &hlcReplicatedTime, + } + + ju.UpdateProgress(progress) + return nil }) require.NoError(t, err) @@ -273,7 +285,7 @@ func TestCutoverBuiltin(t *testing.T) { var explain string err = db.QueryRowContext(ctx, `EXPLAIN SELECT crdb_internal.complete_stream_ingestion_job($1, $2)`, job.ID(), - highWater).Scan(&explain) + replicatedTime).Scan(&explain) require.NoError(t, err) require.Equal(t, "distribution: local", explain) @@ -281,7 +293,7 @@ func TestCutoverBuiltin(t *testing.T) { err = db.QueryRowContext( ctx, `SELECT crdb_internal.complete_stream_ingestion_job($1, $2)`, - job.ID(), highWater).Scan(&jobID) + job.ID(), replicatedTime).Scan(&jobID) require.NoError(t, err) require.Equal(t, job.ID(), jobspb.JobID(jobID)) @@ -291,7 +303,7 @@ func TestCutoverBuiltin(t *testing.T) { progress = sj.Progress() sp, ok = progress.GetDetails().(*jobspb.Progress_StreamIngest) require.True(t, ok) - require.Equal(t, hlc.Timestamp{WallTime: highWater.UnixNano()}, sp.StreamIngest.CutoverTime) + require.Equal(t, hlc.Timestamp{WallTime: replicatedTime.UnixNano()}, sp.StreamIngest.CutoverTime) } // TestReplicationJobResumptionStartTime tests that a replication job picks the @@ -346,14 +358,14 @@ func 
TestReplicationJobResumptionStartTime(t *testing.T) { for _, r := range replicationSpecs { require.Equal(t, startTime, r.InitialScanTimestamp) - require.Empty(t, r.PreviousHighWaterTimestamp) + require.Empty(t, r.PreviousReplicatedTimestamp) } - require.Empty(t, frontier.HighWaterAtStart) + require.Empty(t, frontier.ReplicatedTimeAtStart) // Allow the job to make some progress. canContinue <- struct{}{} srcTime := c.SrcCluster.Server(0).Clock().Now() - c.WaitUntilHighWatermark(srcTime, jobspb.JobID(replicationJobID)) + c.WaitUntilReplicatedTime(srcTime, jobspb.JobID(replicationJobID)) // Pause the job. c.DestSysSQL.Exec(t, `PAUSE JOB $1`, replicationJobID) @@ -373,20 +385,20 @@ func TestReplicationJobResumptionStartTime(t *testing.T) { // Assert that the previous highwater mark is greater than the replication // start time. - var previousHighWaterTimestamp hlc.Timestamp + var previousReplicatedTimestamp hlc.Timestamp for _, r := range replicationSpecs { require.Equal(t, startTime, r.InitialScanTimestamp) - require.True(t, r.InitialScanTimestamp.Less(r.PreviousHighWaterTimestamp)) - if previousHighWaterTimestamp.IsEmpty() { - previousHighWaterTimestamp = r.PreviousHighWaterTimestamp + require.True(t, r.InitialScanTimestamp.Less(r.PreviousReplicatedTimestamp)) + if previousReplicatedTimestamp.IsEmpty() { + previousReplicatedTimestamp = r.PreviousReplicatedTimestamp } else { - require.Equal(t, r.PreviousHighWaterTimestamp, previousHighWaterTimestamp) + require.Equal(t, r.PreviousReplicatedTimestamp, previousReplicatedTimestamp) } } - require.Equal(t, frontier.HighWaterAtStart, previousHighWaterTimestamp) + require.Equal(t, frontier.ReplicatedTimeAtStart, previousReplicatedTimestamp) canContinue <- struct{}{} srcTime = c.SrcCluster.Server(0).Clock().Now() - c.WaitUntilHighWatermark(srcTime, jobspb.JobID(replicationJobID)) + c.WaitUntilReplicatedTime(srcTime, jobspb.JobID(replicationJobID)) c.Cutover(producerJobID, replicationJobID, srcTime.GoTime(), false) jobutils.WaitForJobToSucceed(t, c.DestSysSQL, jobspb.JobID(replicationJobID)) } @@ -461,7 +473,17 @@ func TestCutoverFractionProgressed(t *testing.T) { replicationJob, err := registry.CreateJobWithTxn(ctx, mockReplicationJobRecord, jobID, nil) require.NoError(t, err) require.NoError(t, replicationJob.NoTxn().Update(ctx, func(txn isql.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error { - return jobs.UpdateHighwaterProgressed(cutover, md, ju) + if err := md.CheckRunningOrReverting(); err != nil { + return err + } + progress := md.Progress + streamProgress := progress.Details.(*jobspb.Progress_StreamIngest).StreamIngest + streamProgress.ReplicatedTime = cutover + progress.Progress = &jobspb.Progress_HighWater{ + HighWater: &cutover, + } + ju.UpdateProgress(progress) + return nil })) metrics := registry.MetricsStruct().StreamIngest.(*Metrics) diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go index fce64cc43bac..7906ee64fa50 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go @@ -31,6 +31,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" + "github.com/cockroachdb/cockroach/pkg/sql/isql" "github.com/cockroachdb/cockroach/pkg/sql/rowenc" "github.com/cockroachdb/cockroach/pkg/sql/rowexec" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -264,7 
+265,7 @@ func newStreamIngestionDataProcessor( trackedSpans = append(trackedSpans, partitionSpec.Spans...) } - frontier, err := span.MakeFrontierAt(spec.PreviousHighWaterTimestamp, trackedSpans...) + frontier, err := span.MakeFrontierAt(spec.PreviousReplicatedTimestamp, trackedSpans...) if err != nil { return nil, err } @@ -281,8 +282,8 @@ func newStreamIngestionDataProcessor( frontier: frontier, maxFlushRateTimer: timeutil.NewTimer(), cutoverProvider: &cutoverFromJobProgress{ - jobID: jobspb.JobID(spec.JobID), - registry: flowCtx.Cfg.JobRegistry, + jobID: jobspb.JobID(spec.JobID), + db: flowCtx.Cfg.DB, }, cutoverCh: make(chan struct{}), closePoller: make(chan struct{}), @@ -371,16 +372,16 @@ func (sip *streamIngestionProcessor) Start(ctx context.Context) { sip.streamPartitionClients = append(sip.streamPartitionClients, streamClient) } - previousHighWater := frontierForSpans(sip.frontier, partitionSpec.Spans...) + previousReplicatedTimestamp := frontierForSpans(sip.frontier, partitionSpec.Spans...) if streamingKnobs, ok := sip.FlowCtx.TestingKnobs().StreamingTestingKnobs.(*sql.StreamingTestingKnobs); ok { if streamingKnobs != nil && streamingKnobs.BeforeClientSubscribe != nil { - streamingKnobs.BeforeClientSubscribe(addr, string(token), previousHighWater) + streamingKnobs.BeforeClientSubscribe(addr, string(token), previousReplicatedTimestamp) } } sub, err := streamClient.Subscribe(ctx, streampb.StreamID(sip.spec.StreamID), token, - sip.spec.InitialScanTimestamp, previousHighWater) + sip.spec.InitialScanTimestamp, previousReplicatedTimestamp) if err != nil { sip.MoveToDraining(errors.Wrapf(err, "consuming partition %v", addr)) @@ -1156,25 +1157,54 @@ type cutoverProvider interface { // custoverFromJobProgress is a cutoverProvider that decides whether the cutover // time has been reached based on the progress stored on the job record. type cutoverFromJobProgress struct { - registry *jobs.Registry - jobID jobspb.JobID + db isql.DB + jobID jobspb.JobID +} + +func (c *cutoverFromJobProgress) loadIngestionProgress( + ctx context.Context, +) (*jobspb.StreamIngestionProgress, error) { + var ( + progressBytes []byte + exists bool + ) + if err := c.db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { + infoStorage := jobs.InfoStorageForJob(txn, c.jobID) + var err error + progressBytes, exists, err = infoStorage.GetLegacyProgress(ctx) + return err + }); err != nil { + return nil, err + } + if !exists { + return nil, nil + } + progress := &jobspb.Progress{} + if err := protoutil.Unmarshal(progressBytes, progress); err != nil { + return nil, err + } + + sp, ok := progress.GetDetails().(*jobspb.Progress_StreamIngest) + if !ok { + return nil, errors.Newf("unknown progress details type %T in stream ingestion job %d", + progress.GetDetails(), c.jobID) + } + return sp.StreamIngest, nil } func (c *cutoverFromJobProgress) cutoverReached(ctx context.Context) (bool, error) { - j, err := c.registry.LoadJob(ctx, c.jobID) + ingestionProgress, err := c.loadIngestionProgress(ctx) if err != nil { return false, err } - progress := j.Progress() - var sp *jobspb.Progress_StreamIngest - var ok bool - if sp, ok = progress.GetDetails().(*jobspb.Progress_StreamIngest); !ok { - return false, errors.Newf("unknown progress type %T in stream ingestion job %d", - j.Progress().Progress, c.jobID) + if ingestionProgress == nil { + log.Warningf(ctx, "no legacy job progress recorded yet") + return false, nil } - // Job has been signaled to complete.
- if resolvedTimestamp := progress.GetHighWater(); !sp.StreamIngest.CutoverTime.IsEmpty() && - resolvedTimestamp != nil && sp.StreamIngest.CutoverTime.Less(*resolvedTimestamp) { + + cutoverTime := ingestionProgress.CutoverTime + replicatedTime := ingestionProgress.ReplicatedTime + if !cutoverTime.IsEmpty() && cutoverTime.LessEq(replicatedTime) { return true, nil } diff --git a/pkg/ccl/streamingccl/streamingest/testdata/cutover_after_pause b/pkg/ccl/streamingccl/streamingest/testdata/cutover_after_pause index b88a01b6eabe..589f2a759b98 100644 --- a/pkg/ccl/streamingccl/streamingest/testdata/cutover_after_pause +++ b/pkg/ccl/streamingccl/streamingest/testdata/cutover_after_pause @@ -27,7 +27,7 @@ let $afterImport as=source-system SELECT clock_timestamp()::timestamp::string ---- -wait-until-high-watermark ts=$afterImport +wait-until-replicated-time ts=$afterImport ---- job as=destination-system pause diff --git a/pkg/ccl/streamingccl/streamingest/testdata/simple b/pkg/ccl/streamingccl/streamingest/testdata/simple index d2c8b7a2c115..ff9620b01a9f 100644 --- a/pkg/ccl/streamingccl/streamingest/testdata/simple +++ b/pkg/ccl/streamingccl/streamingest/testdata/simple @@ -10,7 +10,7 @@ let $start as=source-system SELECT clock_timestamp()::timestamp::string ---- -wait-until-high-watermark ts=$start +wait-until-replicated-time ts=$start ---- # The job description should be redacted. diff --git a/pkg/ccl/streamingccl/streamproducer/event_stream.go b/pkg/ccl/streamingccl/streamproducer/event_stream.go index 0a349145d375..3e9beff52863 100644 --- a/pkg/ccl/streamingccl/streamproducer/event_stream.go +++ b/pkg/ccl/streamingccl/streamproducer/event_stream.go @@ -115,7 +115,7 @@ func (s *eventStream) Start(ctx context.Context, txn *kv.Txn) error { } initialTimestamp := s.spec.InitialScanTimestamp - if s.spec.PreviousHighWaterTimestamp.IsEmpty() { + if s.spec.PreviousReplicatedTimestamp.IsEmpty() { opts = append(opts, rangefeed.WithInitialScan(func(ctx context.Context) {}), rangefeed.WithScanRetryBehavior(rangefeed.ScanRetryRemaining), @@ -132,10 +132,10 @@ func (s *eventStream) Start(ctx context.Context, txn *kv.Txn) error { rangefeed.WithOnScanCompleted(s.onInitialScanSpanCompleted), ) } else { - initialTimestamp = s.spec.PreviousHighWaterTimestamp + initialTimestamp = s.spec.PreviousReplicatedTimestamp // When resuming from cursor, advance frontier to the cursor position. 
for _, sp := range s.spec.Spans { - if _, err := frontier.Forward(sp, s.spec.PreviousHighWaterTimestamp); err != nil { + if _, err := frontier.Forward(sp, s.spec.PreviousReplicatedTimestamp); err != nil { return err } } diff --git a/pkg/ccl/streamingccl/streamproducer/replication_stream_test.go b/pkg/ccl/streamingccl/streamproducer/replication_stream_test.go index 2c42fec9f81d..fbb10d63add4 100644 --- a/pkg/ccl/streamingccl/streamproducer/replication_stream_test.go +++ b/pkg/ccl/streamingccl/streamproducer/replication_stream_test.go @@ -302,7 +302,7 @@ func encodeSpec( h *replicationtestutils.ReplicationHelper, srcTenant replicationtestutils.TenantState, initialScanTime hlc.Timestamp, - previousHighWater hlc.Timestamp, + previousReplicatedTime hlc.Timestamp, tables ...string, ) []byte { var spans []roachpb.Span @@ -313,9 +313,9 @@ func encodeSpec( } spec := &streampb.StreamPartitionSpec{ - InitialScanTimestamp: initialScanTime, - PreviousHighWaterTimestamp: previousHighWater, - Spans: spans, + InitialScanTimestamp: initialScanTime, + PreviousReplicatedTimestamp: previousReplicatedTime, + Spans: spans, Config: streampb.StreamPartitionSpec_ExecutionConfig{ MinCheckpointFrequency: 10 * time.Millisecond, }, @@ -518,17 +518,17 @@ USE d; srcTenant.SQL.Exec(t, fmt.Sprintf("CREATE TABLE %s (id INT PRIMARY KEY, n INT)", table)) srcTenant.SQL.QueryRow(t, "SELECT clock_timestamp()").Scan(&clockTime) - var previousHighWater hlc.Timestamp + var previousReplicatedTime hlc.Timestamp initialScanTimestamp := hlc.Timestamp{WallTime: clockTime.UnixNano()} if !initialScan { - previousHighWater = hlc.Timestamp{WallTime: clockTime.UnixNano()} + previousReplicatedTime = hlc.Timestamp{WallTime: clockTime.UnixNano()} } if addSSTableBeforeRangefeed { srcTenant.SQL.Exec(t, fmt.Sprintf("IMPORT INTO %s CSV DATA ($1)", table), dataSrv.URL) } source, feed := startReplication(ctx, t, h, makePartitionStreamDecoder, streamPartitionQuery, streamID, encodeSpec(t, h, srcTenant, initialScanTimestamp, - previousHighWater, table)) + previousReplicatedTime, table)) defer feed.Close(ctx) if !addSSTableBeforeRangefeed { srcTenant.SQL.Exec(t, fmt.Sprintf("IMPORT INTO %s CSV DATA ($1)", table), dataSrv.URL) diff --git a/pkg/cmd/roachtest/tests/BUILD.bazel b/pkg/cmd/roachtest/tests/BUILD.bazel index faa6ea9ae870..22ca7615ce51 100644 --- a/pkg/cmd/roachtest/tests/BUILD.bazel +++ b/pkg/cmd/roachtest/tests/BUILD.bazel @@ -180,6 +180,7 @@ go_library( "//pkg/ccl/changefeedccl", "//pkg/ccl/changefeedccl/cdctest", "//pkg/ccl/changefeedccl/changefeedbase", + "//pkg/ccl/streamingccl/replicationutils", "//pkg/cli", "//pkg/cli/clisqlclient", "//pkg/cloud", diff --git a/pkg/cmd/roachtest/tests/cluster_to_cluster.go b/pkg/cmd/roachtest/tests/cluster_to_cluster.go index 6c6506418632..cc95c1b80c94 100644 --- a/pkg/cmd/roachtest/tests/cluster_to_cluster.go +++ b/pkg/cmd/roachtest/tests/cluster_to_cluster.go @@ -22,6 +22,7 @@ import ( "strings" "time" + "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationutils" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/clusterstats" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option" @@ -508,14 +509,14 @@ func (rd *replicationDriver) runWorkload(ctx context.Context) error { return rd.rs.workload.runDriver(ctx, rd.c, rd.t, rd.setup) } -func (rd *replicationDriver) waitForHighWatermark(ingestionJobID int, wait time.Duration) { +func (rd *replicationDriver) waitForReplicatedTime(ingestionJobID int, wait time.Duration) { 
testutils.SucceedsWithin(rd.t, func() error { info, err := getStreamIngestionJobInfo(rd.setup.dst.db, ingestionJobID) if err != nil { return err } if info.GetHighWater().IsZero() { - return errors.New("no high watermark") + return errors.New("no replicated time") } return nil }, wait) @@ -655,7 +656,7 @@ func (rd *replicationDriver) main(ctx context.Context) { }) rd.t.L().Printf("waiting for replication stream to finish ingesting initial scan") - rd.waitForHighWatermark(ingestionJobID, rd.rs.timeout/2) + rd.waitForReplicatedTime(ingestionJobID, rd.rs.timeout/2) rd.metrics.initialScanEnd = newMetricSnapshot(metricSnapper, timeutil.Now()) rd.t.Status(fmt.Sprintf(`initial scan complete. run workload and repl. stream for another %s minutes`, rd.rs.additionalDuration)) @@ -973,11 +974,8 @@ func (rrd *replResilienceDriver) getPhase() c2cPhase { rrd.dstJobID).Scan(&jobStatus) require.Equal(rrd.t, jobs.StatusRunning, jobs.Status(jobStatus)) - progress := getJobProgress(rrd.t, rrd.setup.dst.sysSQL, rrd.dstJobID) - streamIngestProgress := progress.GetStreamIngest() - highWater := progress.GetHighWater() - - if highWater == nil || highWater.IsEmpty() { + streamIngestProgress := getJobProgress(rrd.t, rrd.setup.dst.sysSQL, rrd.dstJobID).GetStreamIngest() + if streamIngestProgress.ReplicatedTime.IsEmpty() { return phaseInitialScan } if streamIngestProgress.CutoverTime.IsEmpty() { @@ -1135,13 +1133,16 @@ func getIngestionJobID(t test.Test, dstSQL *sqlutils.SQLRunner, dstTenantName st } type streamIngesitonJobInfo struct { - status string - errMsg string - highwaterTime time.Time - finishedTime time.Time + status string + errMsg string + replicatedTime time.Time + finishedTime time.Time } -func (c *streamIngesitonJobInfo) GetHighWater() time.Time { return c.highwaterTime } +// GetHighWater returns the replicated time. The GetHighWater name is +// retained here as this is implementing the jobInfo interface used by +// the latency verifier. +func (c *streamIngesitonJobInfo) GetHighWater() time.Time { return c.replicatedTime } func (c *streamIngesitonJobInfo) GetFinishedTime() time.Time { return c.finishedTime } func (c *streamIngesitonJobInfo) GetStatus() string { return c.status } func (c *streamIngesitonJobInfo) GetError() string { return c.status } @@ -1165,16 +1166,11 @@ func getStreamIngestionJobInfo(db *gosql.DB, jobID int) (jobInfo, error) { if err := protoutil.Unmarshal(progressBytes, &progress); err != nil { return nil, err } - var highwaterTime time.Time - highwater := progress.GetHighWater() - if highwater != nil { - highwaterTime = highwater.GoTime() - } return &streamIngesitonJobInfo{ - status: status, - errMsg: payload.Error, - highwaterTime: highwaterTime, - finishedTime: time.UnixMicro(payload.FinishedMicros), + status: status, + errMsg: payload.Error, + replicatedTime: replicationutils.ReplicatedTimeFromProgress(&progress).GoTime(), + finishedTime: time.UnixMicro(payload.FinishedMicros), }, nil } diff --git a/pkg/jobs/jobspb/jobs.proto b/pkg/jobs/jobspb/jobs.proto index cae15f376987..b274b7860d9b 100644 --- a/pkg/jobs/jobspb/jobs.proto +++ b/pkg/jobs/jobspb/jobs.proto @@ -151,6 +151,11 @@ message StreamIngestionProgress { // consistent state as of the CutoverTime. util.hlc.Timestamp cutover_time = 1 [(gogoproto.nullable) = false]; + // ReplicatedTime is the ingestion frontier. This is the canonical + // value of the frontier. The HighWater in the job progress is for + // informational purposes only. 
+ util.hlc.Timestamp replicated_time = 7 [(gogoproto.nullable) = false]; + // ReplicationStatus is the status of the tenant that has a replication // (ingestion) job. uint32 replication_status = 6 [ diff --git a/pkg/repstream/streampb/stream.proto b/pkg/repstream/streampb/stream.proto index 86fe2ba749c6..335bd5f56aea 100644 --- a/pkg/repstream/streampb/stream.proto +++ b/pkg/repstream/streampb/stream.proto @@ -38,18 +38,18 @@ message ReplicationProducerSpec { // StreamPartitionSpec is the stream partition specification. message StreamPartitionSpec { - // PreviousHighWaterTimestamp specifies the timestamp from which spans will + // PreviousReplicatedTimestamp specifies the timestamp from which spans will // start ingesting data in the replication job. This timestamp is empty unless // the replication job resumes after a progress checkpoint has been recorded. // While it is empty we use the InitialScanTimestamp described below. - util.hlc.Timestamp previous_high_water_timestamp = 1 [(gogoproto.nullable) = false]; + util.hlc.Timestamp previous_replicated_timestamp = 1 [(gogoproto.nullable) = false]; // InitialScanTimestamp is the timestamp at which the partition will run the // initial rangefeed scan before replicating further changes to the target // spans. This timestamp is always non-empty, but a partition will only run an // initial scan if no progress has been recorded prior to the current // resumption of the replication job. Otherwise, all spans will start - // ingesting data from the PreviousHighWaterTimestamp described above. + // ingesting data from the PreviousReplicatedTimestamp described above. util.hlc.Timestamp initial_scan_timestamp = 4 [(gogoproto.nullable) = false]; // List of spans to stream. diff --git a/pkg/sql/execinfrapb/processors_bulk_io.proto b/pkg/sql/execinfrapb/processors_bulk_io.proto index e1994ffc0811..ca76765ca68c 100644 --- a/pkg/sql/execinfrapb/processors_bulk_io.proto +++ b/pkg/sql/execinfrapb/processors_bulk_io.proto @@ -204,18 +204,18 @@ message StreamIngestionDataSpec { // PartitionSpecs maps partition IDs to their specifications. map partition_specs = 6 [(gogoproto.nullable) = false]; - // PreviousHighWaterTimestamp specifies the timestamp from which spans will + // PreviousReplicatedTimestamp specifies the timestamp from which spans will // start ingesting data in the replication job. This timestamp is empty unless // the replication job resumes after a progress checkpoint has been recorded. // While it is empty we use the InitialScanTimestamp described below. - optional util.hlc.Timestamp previous_high_water_timestamp = 2 [(gogoproto.nullable) = false]; + optional util.hlc.Timestamp previous_replicated_timestamp = 2 [(gogoproto.nullable) = false]; // InitialScanTimestamp is the timestamp at which the partition will run the // initial rangefeed scan before replicating further changes to the target // spans. This timestamp is always non-empty, but a partition will only run an // initial scan if no progress has been recorded prior to the current // resumption of the replication job. Otherwise, all spans will start - // ingesting data from the PreviousHighWaterTimestamp described above. + // ingesting data from the PreviousReplicatedTimestamp described above. optional util.hlc.Timestamp initial_scan_timestamp = 11 [(gogoproto.nullable) = false]; // StreamAddress locate the stream so that a stream client can be initialized. 
@@ -231,11 +231,11 @@ message StreamIngestionDataSpec { } message StreamIngestionFrontierSpec { - // HighWaterAtStart is set by the ingestion job when initializing the frontier + // ReplicatedTimeAtStart is set by the ingestion job when initializing the frontier // processor. It is used as sanity check by the frontier processor to ensure // that it does not receive updates at a timestamp lower than this field. This // consequently prevents the job progress from regressing during ingestion. - optional util.hlc.Timestamp high_water_at_start = 1 [(gogoproto.nullable) = false]; + optional util.hlc.Timestamp replicated_time_at_start = 1 [(gogoproto.nullable) = false]; // TrackedSpans is the entire span set being watched. The spans do not really // represent KV spans but uniquely identify the partitions in the ingestion // stream. Once all the partitions in the ingestion stream have been resolved From 267427905623a0822c42754bb142df293b34ed29 Mon Sep 17 00:00:00 2001 From: Mira Radeva Date: Thu, 25 May 2023 09:40:10 -0400 Subject: [PATCH 4/4] kv: allow commit triggers for manually-ended transactions Previously, commit triggers were only invoked upon an explicit transaction Commit(). However, sometimes transactions are committed with a separate EndTxnRequest instead, and in those cases the commit triggers were not invoked. This patch fixes the issue by invoking commit triggers as part of Send(). Part of: #82538 Release note: None --- pkg/kv/txn.go | 13 ++++++----- pkg/kv/txn_test.go | 55 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 63 insertions(+), 5 deletions(-) diff --git a/pkg/kv/txn.go b/pkg/kv/txn.go index 141bfa28b7ab..a18f2d971c0c 100644 --- a/pkg/kv/txn.go +++ b/pkg/kv/txn.go @@ -705,11 +705,6 @@ func (txn *Txn) commit(ctx context.Context) error { et := endTxnReq(true, txn.deadline()) ba := &kvpb.BatchRequest{Requests: et.unionArr[:]} _, pErr := txn.Send(ctx, ba) - if pErr == nil { - for _, t := range txn.commitTriggers { - t(ctx) - } - } return pErr.GoError() } @@ -1089,7 +1084,15 @@ func (txn *Txn) Send( sender := txn.mu.sender txn.mu.Unlock() br, pErr := txn.db.sendUsingSender(ctx, ba, sender) + if pErr == nil { + // Invoking the commit triggers here ensures they run even in the case when a + // commit request is issued manually (not via Commit). + if et, ok := ba.GetArg(kvpb.EndTxn); ok && et.(*kvpb.EndTxnRequest).Commit { + for _, t := range txn.commitTriggers { + t(ctx) + } + } return br, nil } diff --git a/pkg/kv/txn_test.go b/pkg/kv/txn_test.go index c599eab66f69..152e875d67cb 100644 --- a/pkg/kv/txn_test.go +++ b/pkg/kv/txn_test.go @@ -780,3 +780,58 @@ func TestTxnNegotiateAndSendWithResumeSpan(t *testing.T) { } }) } + +// TestTxnCommitTriggers tests the behavior of invoking commit triggers, as part +// of a Commit or a manual EndTxnRequest that includes a commit. +func TestTxnCommitTriggers(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + + for _, test := range []struct { + name string + // A function that specifies how a transaction ends. + endTxnFn func(txn *Txn) error + // Assuming a trigger bool value starts off as false, expTrigger is the + // expected value of the trigger after the transaction ends. 
+ expTrigger bool + }{ + { + name: "explicit commit", + endTxnFn: func(txn *Txn) error { return txn.Commit(ctx) }, + expTrigger: true, + }, + { + name: "manual commit", + endTxnFn: func(txn *Txn) error { + b := txn.NewBatch() + b.AddRawRequest(&kvpb.EndTxnRequest{Commit: true}) + return txn.Run(ctx, b) + }, + expTrigger: true, + }, + { + name: "manual abort", + endTxnFn: func(txn *Txn) error { + b := txn.NewBatch() + b.AddRawRequest(&kvpb.EndTxnRequest{Commit: false}) + return txn.Run(ctx, b) + }, + expTrigger: false, + }, + } { + t.Run(test.name, func(t *testing.T) { + clock := hlc.NewClockForTesting(nil) + db := NewDB(log.MakeTestingAmbientCtxWithNewTracer(), newTestTxnFactory(nil), clock, stopper) + txn := NewTxn(ctx, db, 0 /* gatewayNodeID */) + triggerVal := false + triggerFn := func(ctx context.Context) { triggerVal = true } + txn.AddCommitTrigger(triggerFn) + err := test.endTxnFn(txn) + require.NoError(t, err) + require.Equal(t, test.expTrigger, triggerVal) + }) + } +}
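
For illustration only, not part of the patches above: a minimal sketch of the behavior pinned down by TestTxnCommitTriggers, written as if inside package kv and reusing only the calls that appear in the test (NewTxn, AddCommitTrigger, NewBatch, AddRawRequest, Run). The function name exampleManualCommitTrigger is hypothetical, and a *DB is assumed to already be available (for example from a test server).

// exampleManualCommitTrigger shows that, with this patch, a commit trigger
// fires even when the transaction is committed via a manual EndTxnRequest
// rather than txn.Commit().
func exampleManualCommitTrigger(ctx context.Context, db *DB) (bool, error) {
	txn := NewTxn(ctx, db, 0 /* gatewayNodeID */)
	triggered := false
	txn.AddCommitTrigger(func(ctx context.Context) { triggered = true })

	// End the transaction with an explicit EndTxnRequest instead of Commit().
	b := txn.NewBatch()
	b.AddRawRequest(&kvpb.EndTxnRequest{Commit: true})
	if err := txn.Run(ctx, b); err != nil {
		return false, err
	}
	// Send() now invokes commit triggers for any committing EndTxn, so
	// triggered is expected to be true here.
	return triggered, nil
}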