diff --git a/pkg/ccl/changefeedccl/sink_kafka.go b/pkg/ccl/changefeedccl/sink_kafka.go index 5c9c28f355e0..7834a9b8b11f 100644 --- a/pkg/ccl/changefeedccl/sink_kafka.go +++ b/pkg/ccl/changefeedccl/sink_kafka.go @@ -15,6 +15,7 @@ import ( "crypto/x509" "encoding/json" "fmt" + "hash/fnv" "math" "net/url" "strings" @@ -711,9 +712,7 @@ var _ sarama.Partitioner = &changefeedPartitioner{} var _ sarama.PartitionerConstructor = newChangefeedPartitioner func newChangefeedPartitioner(topic string) sarama.Partitioner { - return &changefeedPartitioner{ - hash: sarama.NewHashPartitioner(topic), - } + return sarama.NewCustomHashPartitioner(fnv.New32a)(topic) } func (p *changefeedPartitioner) RequiresConsistency() bool { return true } diff --git a/pkg/ccl/changefeedccl/sink_test.go b/pkg/ccl/changefeedccl/sink_test.go index f26801522bb2..6f9d46d14a63 100644 --- a/pkg/ccl/changefeedccl/sink_test.go +++ b/pkg/ccl/changefeedccl/sink_test.go @@ -871,3 +871,35 @@ func TestSinkConfigParsing(t *testing.T) { require.ErrorContains(t, err, "invalid character 's' looking for beginning of value") }) } + +func TestChangefeedConsistentPartitioning(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + // We test that these arbitrary strings get mapped to these + // arbitrary partitions to ensure that if an upgrade occurs + // while a changefeed is running, partitioning remains the same + // and therefore ordering guarantees are preserved. Changing + // these values is a breaking change. + referencePartitions := map[string]int32{ + "0": 1003, + "01": 351, + "10": 940, + "a": 292, + "\x00": 732, + "\xff \xff": 164, + } + longString1 := strings.Repeat("a", 2048) + referencePartitions[longString1] = 755 + longString2 := strings.Repeat("a", 2047) + "A" + referencePartitions[longString2] = 592 + + partitioner := newChangefeedPartitioner("topic1") + + for key, expected := range referencePartitions { + actual, err := partitioner.Partition(&sarama.ProducerMessage{Key: sarama.ByteEncoder(key)}, 1031) + require.NoError(t, err) + require.Equal(t, expected, actual) + } + +} diff --git a/pkg/ccl/streamingccl/replicationtestutils/testutils.go b/pkg/ccl/streamingccl/replicationtestutils/testutils.go index 3b95c4090e84..64a6fa1018d0 100644 --- a/pkg/ccl/streamingccl/replicationtestutils/testutils.go +++ b/pkg/ccl/streamingccl/replicationtestutils/testutils.go @@ -150,23 +150,18 @@ func FingerprintTenantAtTimestampNoHistory( return db.QueryStr(t, fingerprintQuery, tenantID)[0][0] } -// WaitUntilHighWatermark waits for the ingestion job high watermark to reach the given high watermark. -func (c *TenantStreamingClusters) WaitUntilHighWatermark( - highWatermark hlc.Timestamp, ingestionJobID jobspb.JobID, +// WaitUntilReplicatedTime waits for the ingestion job high watermark +// to reach the given target time. 
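As context for the sink_kafka.go and sink_test.go hunks above: a minimal, self-contained sketch of how a key is expected to map to a partition under an FNV-1a hash partitioner. The `fnvPartition` helper is illustrative only, and the modulo-plus-absolute-value step is an assumption about what sarama's default hash partitioner (as returned by `NewCustomHashPartitioner(fnv.New32a)`) computes; the pinned values in `TestChangefeedConsistentPartitioning` remain the source of truth.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// fnvPartition sketches the key -> partition mapping an FNV-1a based hash
// partitioner is expected to produce: hash the key, reduce modulo the
// partition count, and reflect negative results back into range.
func fnvPartition(key []byte, numPartitions int32) (int32, error) {
	h := fnv.New32a()
	if _, err := h.Write(key); err != nil {
		return 0, err
	}
	p := int32(h.Sum32()) % numPartitions
	if p < 0 {
		p = -p
	}
	return p, nil
}

func main() {
	// 1031 matches the partition count used in the reference test above.
	// The point of the pinned reference values is that this mapping must
	// stay stable across releases so ordering guarantees survive upgrades.
	p, err := fnvPartition([]byte("0"), 1031)
	if err != nil {
		panic(err)
	}
	fmt.Println(p)
}
```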
+func (c *TenantStreamingClusters) WaitUntilReplicatedTime( + targetTime hlc.Timestamp, ingestionJobID jobspb.JobID, ) { - testutils.SucceedsSoon(c.T, func() error { - progress := jobutils.GetJobProgress(c.T, c.DestSysSQL, ingestionJobID) - if progress.GetHighWater() == nil { - return errors.Newf("stream ingestion has not recorded any progress yet, waiting to advance pos %s", - highWatermark.String()) - } - highwater := *progress.GetHighWater() - if highwater.Less(highWatermark) { - return errors.Newf("waiting for stream ingestion job progress %s to advance beyond %s", - highwater.String(), highWatermark.String()) - } - return nil - }) + WaitUntilReplicatedTime(c.T, targetTime, c.DestSysSQL, ingestionJobID) +} + +// WaitUntilStartTimeReached waits for the ingestion replicated time +// to reach the recorded start time of the job. +func (c *TenantStreamingClusters) WaitUntilStartTimeReached(ingestionJobID jobspb.JobID) { + WaitUntilStartTimeReached(c.T, c.DestSysSQL, ingestionJobID) } // Cutover sets the cutover timestamp on the replication job causing the job to @@ -333,12 +328,6 @@ func (c *TenantStreamingClusters) SrcExec(exec srcInitExecFunc) { exec(c.T, c.SrcSysSQL, c.SrcTenantSQL) } -// WaitUntilStartTimeReached waits for the ingestion job high watermark to reach -// the recorded start time of the job. -func (c *TenantStreamingClusters) WaitUntilStartTimeReached(ingestionJobID jobspb.JobID) { - WaitUntilStartTimeReached(c.T, c.DestSysSQL, ingestionJobID) -} - func WaitUntilStartTimeReached(t *testing.T, db *sqlutils.SQLRunner, ingestionJobID jobspb.JobID) { timeout := 45 * time.Second if skip.NightlyStress() { @@ -358,20 +347,31 @@ func WaitUntilStartTimeReached(t *testing.T, db *sqlutils.SQLRunner, ingestionJo return errors.New("ingestion start time not yet recorded") } - progress := jobutils.GetJobProgress(t, db, ingestionJobID) - if progress.GetHighWater() == nil { - return errors.Newf("stream ingestion has not recorded any progress yet, waiting to advance pos %s", - startTime.String()) - } - highwater := *progress.GetHighWater() - if highwater.Less(startTime) { - return errors.Newf("waiting for stream ingestion job progress %s to advance beyond %s", - highwater.String(), startTime.String()) - } - return nil + return requireReplicatedTime(startTime, jobutils.GetJobProgress(t, db, ingestionJobID)) }, timeout) } +func WaitUntilReplicatedTime( + t *testing.T, targetTime hlc.Timestamp, db *sqlutils.SQLRunner, ingestionJobID jobspb.JobID, +) { + testutils.SucceedsSoon(t, func() error { + return requireReplicatedTime(targetTime, jobutils.GetJobProgress(t, db, ingestionJobID)) + }) +} + +func requireReplicatedTime(targetTime hlc.Timestamp, progress *jobspb.Progress) error { + replicatedTime := replicationutils.ReplicatedTimeFromProgress(progress) + if replicatedTime.IsEmpty() { + return errors.Newf("stream ingestion has not recorded any progress yet, waiting to advance pos %s", + targetTime) + } + if replicatedTime.Less(targetTime) { + return errors.Newf("waiting for stream ingestion job progress %s to advance beyond %s", + replicatedTime, targetTime) + } + return nil +} + func CreateScatteredTable(t *testing.T, c *TenantStreamingClusters, numNodes int) { // Create a source table with multiple ranges spread across multiple nodes numRanges := 50 diff --git a/pkg/ccl/streamingccl/replicationutils/utils.go b/pkg/ccl/streamingccl/replicationutils/utils.go index d0e65f75b447..5f8070d25794 100644 --- a/pkg/ccl/streamingccl/replicationutils/utils.go +++ 
b/pkg/ccl/streamingccl/replicationutils/utils.go @@ -136,9 +136,11 @@ func GetStreamIngestionStatsNoHeartbeat( IngestionDetails: &streamIngestionDetails, IngestionProgress: jobProgress.GetStreamIngest(), } - if highwater := jobProgress.GetHighWater(); highwater != nil && !highwater.IsEmpty() { + + replicatedTime := ReplicatedTimeFromProgress(&jobProgress) + if !replicatedTime.IsEmpty() { lagInfo := &streampb.StreamIngestionStats_ReplicationLagInfo{ - MinIngestedTimestamp: *highwater, + MinIngestedTimestamp: replicatedTime, } lagInfo.EarliestCheckpointedTimestamp = hlc.MaxTimestamp lagInfo.LatestCheckpointedTimestamp = hlc.MinTimestamp @@ -154,12 +156,16 @@ func GetStreamIngestionStatsNoHeartbeat( } lagInfo.SlowestFastestIngestionLag = lagInfo.LatestCheckpointedTimestamp.GoTime(). Sub(lagInfo.EarliestCheckpointedTimestamp.GoTime()) - lagInfo.ReplicationLag = timeutil.Since(highwater.GoTime()) + lagInfo.ReplicationLag = timeutil.Since(replicatedTime.GoTime()) stats.ReplicationLagInfo = lagInfo } return stats, nil } +func ReplicatedTimeFromProgress(p *jobspb.Progress) hlc.Timestamp { + return p.Details.(*jobspb.Progress_StreamIngest).StreamIngest.ReplicatedTime +} + func GetStreamIngestionStats( ctx context.Context, streamIngestionDetails jobspb.StreamIngestionDetails, diff --git a/pkg/ccl/streamingccl/streamclient/client.go b/pkg/ccl/streamingccl/streamclient/client.go index 0721cafc7707..08ac8abe88bc 100644 --- a/pkg/ccl/streamingccl/streamclient/client.go +++ b/pkg/ccl/streamingccl/streamclient/client.go @@ -77,7 +77,7 @@ type Client interface { // open its subscription to its partition of a larger stream. // TODO(dt): ts -> checkpointToken. Subscribe(ctx context.Context, streamID streampb.StreamID, spec SubscriptionToken, - initialScanTime hlc.Timestamp, previousHighWater hlc.Timestamp) (Subscription, error) + initialScanTime hlc.Timestamp, previousReplicatedTime hlc.Timestamp) (Subscription, error) // Close releases all the resources used by this client. 
Close(ctx context.Context) error diff --git a/pkg/ccl/streamingccl/streamclient/partitioned_stream_client.go b/pkg/ccl/streamingccl/streamclient/partitioned_stream_client.go index c2f829c8114c..a99f3aabfacf 100644 --- a/pkg/ccl/streamingccl/streamclient/partitioned_stream_client.go +++ b/pkg/ccl/streamingccl/streamclient/partitioned_stream_client.go @@ -202,7 +202,7 @@ func (p *partitionedStreamClient) Subscribe( streamID streampb.StreamID, spec SubscriptionToken, initialScanTime hlc.Timestamp, - previousHighWater hlc.Timestamp, + previousReplicatedTime hlc.Timestamp, ) (Subscription, error) { _, sp := tracing.ChildSpan(ctx, "streamclient.Client.Subscribe") defer sp.Finish() @@ -212,7 +212,7 @@ func (p *partitionedStreamClient) Subscribe( return nil, err } sps.InitialScanTimestamp = initialScanTime - sps.PreviousHighWaterTimestamp = previousHighWater + sps.PreviousReplicatedTimestamp = previousReplicatedTime specBytes, err := protoutil.Marshal(&sps) if err != nil { diff --git a/pkg/ccl/streamingccl/streamingest/BUILD.bazel b/pkg/ccl/streamingccl/streamingest/BUILD.bazel index dff505ee525e..d58bc675b5fb 100644 --- a/pkg/ccl/streamingccl/streamingest/BUILD.bazel +++ b/pkg/ccl/streamingccl/streamingest/BUILD.bazel @@ -88,7 +88,6 @@ go_test( "rangekey_batcher_test.go", "replication_random_client_test.go", "replication_stream_e2e_test.go", - "stream_ingestion_frontier_processor_test.go", "stream_ingestion_job_test.go", "stream_ingestion_processor_test.go", ], @@ -143,7 +142,6 @@ go_test( "//pkg/testutils/sqlutils", "//pkg/testutils/storageutils", "//pkg/testutils/testcluster", - "//pkg/upgrade/upgradebase", "//pkg/util/ctxgroup", "//pkg/util/hlc", "//pkg/util/leaktest", diff --git a/pkg/ccl/streamingccl/streamingest/alter_replication_job.go b/pkg/ccl/streamingccl/streamingest/alter_replication_job.go index 7879ce03f36d..86ba6e777464 100644 --- a/pkg/ccl/streamingccl/streamingest/alter_replication_job.go +++ b/pkg/ccl/streamingccl/streamingest/alter_replication_job.go @@ -234,13 +234,14 @@ func alterTenantJobCutover( return hlc.Timestamp{}, errors.Newf("job with id %d is not a stream ingestion job", job.ID()) } progress := job.Progress() + if alterTenantStmt.Cutover.Latest { - ts := progress.GetHighWater() - if ts == nil || ts.IsEmpty() { + replicatedTime := replicationutils.ReplicatedTimeFromProgress(&progress) + if replicatedTime.IsEmpty() { return hlc.Timestamp{}, errors.Newf("replicated tenant %q has not yet recorded a safe replication time", tenantName) } - cutoverTime = *ts + cutoverTime = replicatedTime } // TODO(ssd): We could use the replication manager here, but diff --git a/pkg/ccl/streamingccl/streamingest/alter_replication_job_test.go b/pkg/ccl/streamingccl/streamingest/alter_replication_job_test.go index 48d1d72afdb2..d5cabb448ad6 100644 --- a/pkg/ccl/streamingccl/streamingest/alter_replication_job_test.go +++ b/pkg/ccl/streamingccl/streamingest/alter_replication_job_test.go @@ -41,7 +41,7 @@ func TestAlterTenantCompleteToTime(t *testing.T) { jobutils.WaitForJobToRun(t, c.SrcSysSQL, jobspb.JobID(producerJobID)) jobutils.WaitForJobToRun(t, c.DestSysSQL, jobspb.JobID(ingestionJobID)) - c.WaitUntilHighWatermark(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID)) + c.WaitUntilReplicatedTime(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID)) var cutoverTime time.Time c.DestSysSQL.QueryRow(t, "SELECT clock_timestamp()").Scan(&cutoverTime) @@ -68,14 +68,14 @@ func TestAlterTenantCompleteToLatest(t *testing.T) { jobutils.WaitForJobToRun(t, c.SrcSysSQL, 
jobspb.JobID(producerJobID)) jobutils.WaitForJobToRun(t, c.DestSysSQL, jobspb.JobID(ingestionJobID)) - highWater := c.SrcCluster.Server(0).Clock().Now() - c.WaitUntilHighWatermark(highWater, jobspb.JobID(ingestionJobID)) + targetReplicatedTime := c.SrcCluster.Server(0).Clock().Now() + c.WaitUntilReplicatedTime(targetReplicatedTime, jobspb.JobID(ingestionJobID)) var cutoverStr string c.DestSysSQL.QueryRow(c.T, `ALTER TENANT $1 COMPLETE REPLICATION TO LATEST`, args.DestTenantName).Scan(&cutoverStr) cutoverOutput := replicationtestutils.DecimalTimeToHLC(t, cutoverStr) - require.GreaterOrEqual(t, cutoverOutput.GoTime(), highWater.GoTime()) + require.GreaterOrEqual(t, cutoverOutput.GoTime(), targetReplicatedTime.GoTime()) require.LessOrEqual(t, cutoverOutput.GoTime(), c.SrcCluster.Server(0).Clock().Now().GoTime()) jobutils.WaitForJobToSucceed(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID)) } @@ -94,7 +94,7 @@ func TestAlterTenantPauseResume(t *testing.T) { jobutils.WaitForJobToRun(t, c.SrcSysSQL, jobspb.JobID(producerJobID)) jobutils.WaitForJobToRun(t, c.DestSysSQL, jobspb.JobID(ingestionJobID)) - c.WaitUntilHighWatermark(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID)) + c.WaitUntilReplicatedTime(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID)) // Pause the replication job. c.DestSysSQL.Exec(t, `ALTER TENANT $1 PAUSE REPLICATION`, args.DestTenantName) @@ -104,7 +104,7 @@ func TestAlterTenantPauseResume(t *testing.T) { c.DestSysSQL.Exec(t, `ALTER TENANT $1 RESUME REPLICATION`, args.DestTenantName) jobutils.WaitForJobToRun(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID)) - c.WaitUntilHighWatermark(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID)) + c.WaitUntilReplicatedTime(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID)) var cutoverTime time.Time c.DestSysSQL.QueryRow(t, "SELECT clock_timestamp()").Scan(&cutoverTime) @@ -176,12 +176,12 @@ func TestAlterTenantUpdateExistingCutoverTime(t *testing.T) { jobutils.WaitForJobToRun(t, c.SrcSysSQL, jobspb.JobID(producerJobID)) jobutils.WaitForJobToRun(t, c.DestSysSQL, jobspb.JobID(ingestionJobID)) - highWater := c.SrcCluster.Server(0).Clock().Now() - c.WaitUntilHighWatermark(highWater, jobspb.JobID(ingestionJobID)) + replicatedTimeTarget := c.SrcCluster.Server(0).Clock().Now() + c.WaitUntilReplicatedTime(replicatedTimeTarget, jobspb.JobID(ingestionJobID)) // First cutover to a future time. var cutoverStr string - cutoverTime := highWater.Add(time.Hour.Nanoseconds(), 0) + cutoverTime := replicatedTimeTarget.Add(time.Hour.Nanoseconds(), 0) c.DestSysSQL.QueryRow(c.T, `ALTER TENANT $1 COMPLETE REPLICATION TO SYSTEM TIME $2::string`, args.DestTenantName, cutoverTime.AsOfSystemTime()).Scan(&cutoverStr) cutoverOutput := replicationtestutils.DecimalTimeToHLC(t, cutoverStr) @@ -190,7 +190,7 @@ func TestAlterTenantUpdateExistingCutoverTime(t *testing.T) { require.Equal(t, cutoverOutput, getCutoverTime()) // And cutover to an even further time. 
- cutoverTime = highWater.Add((time.Hour * 2).Nanoseconds(), 0) + cutoverTime = replicatedTimeTarget.Add((time.Hour * 2).Nanoseconds(), 0) c.DestSysSQL.QueryRow(c.T, `ALTER TENANT $1 COMPLETE REPLICATION TO SYSTEM TIME $2::string`, args.DestTenantName, cutoverTime.AsOfSystemTime()).Scan(&cutoverStr) cutoverOutput = replicationtestutils.DecimalTimeToHLC(t, cutoverStr) @@ -242,7 +242,7 @@ func TestAlterTenantFailUpdatingCutoverTime(t *testing.T) { jobutils.WaitForJobToRun(t, c.SrcSysSQL, jobspb.JobID(producerJobID)) jobutils.WaitForJobToRun(t, c.DestSysSQL, jobspb.JobID(ingestionJobID)) - c.WaitUntilHighWatermark(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID)) + c.WaitUntilReplicatedTime(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID)) require.Equal(c.T, "replicating", getTenantStatus()) @@ -337,7 +337,7 @@ func TestTenantStatusWithFutureCutoverTime(t *testing.T) { jobutils.WaitForJobToRun(t, c.SrcSysSQL, jobspb.JobID(producerJobID)) jobutils.WaitForJobToRun(t, c.DestSysSQL, jobspb.JobID(ingestionJobID)) - c.WaitUntilHighWatermark(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID)) + c.WaitUntilReplicatedTime(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID)) require.Equal(c.T, "replicating", getTenantStatus()) @@ -351,7 +351,7 @@ func TestTenantStatusWithFutureCutoverTime(t *testing.T) { c.DestSysSQL.Exec(t, `ALTER TENANT $1 RESUME REPLICATION`, args.DestTenantName) unblockResumerStart() jobutils.WaitForJobToRun(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID)) - c.WaitUntilHighWatermark(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID)) + c.WaitUntilReplicatedTime(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID)) require.Equal(c.T, "replicating", getTenantStatus()) @@ -422,7 +422,7 @@ func TestTenantStatusWithLatestCutoverTime(t *testing.T) { jobutils.WaitForJobToRun(t, c.SrcSysSQL, jobspb.JobID(producerJobID)) jobutils.WaitForJobToRun(t, c.DestSysSQL, jobspb.JobID(ingestionJobID)) - c.WaitUntilHighWatermark(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID)) + c.WaitUntilReplicatedTime(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID)) require.Equal(c.T, "replicating", getTenantStatus()) @@ -459,7 +459,7 @@ func TestTenantReplicationStatus(t *testing.T) { jobutils.WaitForJobToRun(t, c.SrcSysSQL, jobspb.JobID(producerJobID)) jobutils.WaitForJobToRun(t, c.DestSysSQL, jobspb.JobID(ingestionJobID)) - c.WaitUntilHighWatermark(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID)) + c.WaitUntilReplicatedTime(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID)) registry := c.DestSysServer.JobRegistry().(*jobs.Registry) @@ -496,7 +496,7 @@ func TestAlterTenantHandleFutureProtectedTimestamp(t *testing.T) { jobutils.WaitForJobToRun(t, c.SrcSysSQL, jobspb.JobID(producerJobID)) jobutils.WaitForJobToRun(t, c.DestSysSQL, jobspb.JobID(ingestionJobID)) - c.WaitUntilHighWatermark(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID)) + c.WaitUntilReplicatedTime(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID)) c.DestSysSQL.Exec(c.T, `ALTER TENANT $1 COMPLETE REPLICATION TO LATEST`, args.DestTenantName) } diff --git a/pkg/ccl/streamingccl/streamingest/datadriven_test.go b/pkg/ccl/streamingccl/streamingest/datadriven_test.go index 8766d5fe8a2f..995c7057b78b 100644 --- a/pkg/ccl/streamingccl/streamingest/datadriven_test.go +++ b/pkg/ccl/streamingccl/streamingest/datadriven_test.go @@ -44,7 +44,7 @@ import ( // 
tenant. This operation will fail the test if it is run prior to the // replication stream activating the tenant. // -// - wait-until-high-watermark ts= +// - wait-until-replicated-time ts= // Wait until the replication job has reached the specified timestamp. // // - cutover ts= @@ -119,22 +119,20 @@ func TestDataDriven(t *testing.T) { case "start-replication-stream": ds.producerJobID, ds.replicationJobID = ds.replicationClusters.StartStreamReplication(ctx) - case "wait-until-high-watermark": - var highWaterMark string - d.ScanArgs(t, "ts", &highWaterMark) - varValue, ok := ds.vars[highWaterMark] + case "wait-until-replicated-time": + var replicatedTimeTarget string + d.ScanArgs(t, "ts", &replicatedTimeTarget) + varValue, ok := ds.vars[replicatedTimeTarget] if ok { - highWaterMark = varValue + replicatedTimeTarget = varValue } - timestamp, _, err := tree.ParseDTimestamp(nil, highWaterMark, time.Microsecond) + timestamp, _, err := tree.ParseDTimestamp(nil, replicatedTimeTarget, time.Microsecond) require.NoError(t, err) - hw := hlc.Timestamp{WallTime: timestamp.UnixNano()} - ds.replicationClusters.WaitUntilHighWatermark(hw, jobspb.JobID(ds.replicationJobID)) - + ds.replicationClusters.WaitUntilReplicatedTime(hlc.Timestamp{WallTime: timestamp.UnixNano()}, + jobspb.JobID(ds.replicationJobID)) case "start-replicated-tenant": cleanupTenant := ds.replicationClusters.CreateDestTenantSQL(ctx) ds.cleanupFns = append(ds.cleanupFns, cleanupTenant) - case "let": if len(d.CmdArgs) == 0 { t.Fatalf("Must specify at least one variable name.") diff --git a/pkg/ccl/streamingccl/streamingest/replication_random_client_test.go b/pkg/ccl/streamingccl/streamingest/replication_random_client_test.go index f644e193ac7d..8ceaa15b32bf 100644 --- a/pkg/ccl/streamingccl/streamingest/replication_random_client_test.go +++ b/pkg/ccl/streamingccl/streamingest/replication_random_client_test.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest" _ "github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvtenantccl" // To start tenants. "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationtestutils" + "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationutils" "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" @@ -48,18 +49,18 @@ import ( "github.com/stretchr/testify/require" ) -func getHighWaterMark(ingestionJobID int, sqlDB *gosql.DB) (*hlc.Timestamp, error) { +func getReplicatedTime(ingestionJobID int, sqlDB *gosql.DB) (hlc.Timestamp, error) { var progressBytes []byte if err := sqlDB.QueryRow( `SELECT progress FROM crdb_internal.system_jobs WHERE id = $1`, ingestionJobID, ).Scan(&progressBytes); err != nil { - return nil, err + return hlc.Timestamp{}, err } - var payload jobspb.Progress - if err := protoutil.Unmarshal(progressBytes, &payload); err != nil { - return nil, err + var progress jobspb.Progress + if err := protoutil.Unmarshal(progressBytes, &progress); err != nil { + return hlc.Timestamp{}, err } - return payload.GetHighWater(), nil + return replicationutils.ReplicatedTimeFromProgress(&progress), nil } func getTestRandomClientURI(tenantID roachpb.TenantID, tenantName roachpb.TenantName) string { @@ -274,14 +275,14 @@ func TestStreamIngestionJobWithRandomClient(t *testing.T) { close(canBeCompletedCh) // Ensure that the job has made some progress. 
- var highwater hlc.Timestamp + var replicatedTime hlc.Timestamp testutils.SucceedsSoon(t, func() error { - hw, err := getHighWaterMark(ingestionJobID, conn) + var err error + replicatedTime, err = getReplicatedTime(ingestionJobID, conn) require.NoError(t, err) - if hw == nil { - return errors.New("highwatermark is unset, no progress has been reported") + if replicatedTime.IsEmpty() { + return errors.New("ReplicatedTime is unset, no progress has been reported") } - highwater = *hw return nil }) @@ -289,7 +290,7 @@ func TestStreamIngestionJobWithRandomClient(t *testing.T) { // cancellation, and subsequently rollback data above our frontier timestamp. // // Pick a cutover time just before the latest resolved timestamp. - cutoverTime := timeutil.Unix(0, highwater.WallTime).UTC().Add(-1 * time.Microsecond).Round(time.Microsecond) + cutoverTime := timeutil.Unix(0, replicatedTime.WallTime).UTC().Add(-1 * time.Microsecond).Round(time.Microsecond) _, err = conn.Exec(`ALTER TENANT "30" COMPLETE REPLICATION TO SYSTEM TIME $1::string`, cutoverTime) require.NoError(t, err) diff --git a/pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go b/pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go index 2aba38f43bfa..b3a934fdba72 100644 --- a/pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go +++ b/pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go @@ -61,7 +61,7 @@ func TestTenantStreamingProducerJobTimedOut(t *testing.T) { jobutils.WaitForJobToRun(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID)) srcTime := c.SrcCluster.Server(0).Clock().Now() - c.WaitUntilHighWatermark(srcTime, jobspb.JobID(ingestionJobID)) + c.WaitUntilReplicatedTime(srcTime, jobspb.JobID(ingestionJobID)) c.RequireFingerprintMatchAtTimestamp(srcTime.AsOfSystemTime()) stats := replicationutils.TestingGetStreamIngestionStatsFromReplicationJob(t, ctx, c.DestSysSQL, ingestionJobID) @@ -118,7 +118,7 @@ func TestTenantStreamingPauseResumeIngestion(t *testing.T) { jobutils.WaitForJobToRun(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID)) srcTime := c.SrcCluster.Server(0).Clock().Now() - c.WaitUntilHighWatermark(srcTime, jobspb.JobID(ingestionJobID)) + c.WaitUntilReplicatedTime(srcTime, jobspb.JobID(ingestionJobID)) c.RequireFingerprintMatchAtTimestamp(srcTime.AsOfSystemTime()) // Pause ingestion. @@ -144,7 +144,7 @@ func TestTenantStreamingPauseResumeIngestion(t *testing.T) { // Confirm that dest tenant has received the new change after resumption. 
srcTime = c.SrcCluster.Server(0).Clock().Now() - c.WaitUntilHighWatermark(srcTime, jobspb.JobID(ingestionJobID)) + c.WaitUntilReplicatedTime(srcTime, jobspb.JobID(ingestionJobID)) c.RequireFingerprintMatchAtTimestamp(srcTime.AsOfSystemTime()) } @@ -284,7 +284,7 @@ func TestTenantStreamingCheckpoint(t *testing.T) { jobutils.WaitForJobToRun(t, c.DestSysSQL, jobspb.JobID(ingestionJobID)) cutoverTime := c.DestSysServer.Clock().Now() - c.WaitUntilHighWatermark(cutoverTime, jobspb.JobID(ingestionJobID)) + c.WaitUntilReplicatedTime(cutoverTime, jobspb.JobID(ingestionJobID)) c.Cutover(producerJobID, ingestionJobID, cutoverTime.GoTime(), false) cutoverFingerprint := c.RequireFingerprintMatchAtTimestamp(cutoverTime.AsOfSystemTime()) @@ -318,7 +318,7 @@ func TestTenantStreamingCancelIngestion(t *testing.T) { jobutils.WaitForJobToRun(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID)) srcTime := c.SrcCluster.Server(0).Clock().Now() - c.WaitUntilHighWatermark(srcTime, jobspb.JobID(ingestionJobID)) + c.WaitUntilReplicatedTime(srcTime, jobspb.JobID(ingestionJobID)) c.RequireFingerprintMatchAtTimestamp(srcTime.AsOfSystemTime()) if cancelAfterPaused { @@ -388,7 +388,7 @@ func TestTenantStreamingDropTenantCancelsStream(t *testing.T) { jobutils.WaitForJobToRun(c.T, c.SrcSysSQL, jobspb.JobID(producerJobID)) jobutils.WaitForJobToRun(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID)) - c.WaitUntilHighWatermark(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID)) + c.WaitUntilReplicatedTime(c.SrcCluster.Server(0).Clock().Now(), jobspb.JobID(ingestionJobID)) if cancelAfterPaused { c.DestSysSQL.Exec(t, fmt.Sprintf("PAUSE JOB %d", ingestionJobID)) jobutils.WaitForJobToPause(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID)) @@ -452,7 +452,7 @@ func TestTenantStreamingUnavailableStreamAddress(t *testing.T) { jobutils.WaitForJobToRun(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID)) srcTime := c.SrcCluster.Server(0).Clock().Now() - c.WaitUntilHighWatermark(srcTime, jobspb.JobID(ingestionJobID)) + c.WaitUntilReplicatedTime(srcTime, jobspb.JobID(ingestionJobID)) c.DestSysSQL.Exec(t, `PAUSE JOB $1`, ingestionJobID) jobutils.WaitForJobToPause(t, c.DestSysSQL, jobspb.JobID(ingestionJobID)) @@ -541,7 +541,7 @@ func TestTenantStreamingCutoverOnSourceFailure(t *testing.T) { c.WaitUntilStartTimeReached(jobspb.JobID(ingestionJobID)) cutoverTime := c.SrcCluster.Server(0).Clock().Now() - c.WaitUntilHighWatermark(cutoverTime, jobspb.JobID(ingestionJobID)) + c.WaitUntilReplicatedTime(cutoverTime, jobspb.JobID(ingestionJobID)) // Pause ingestion. c.DestSysSQL.Exec(t, fmt.Sprintf("PAUSE JOB %d", ingestionJobID)) @@ -577,7 +577,7 @@ func TestTenantStreamingDeleteRange(t *testing.T) { jobutils.WaitForJobToRun(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID)) srcTime := c.SrcSysServer.Clock().Now() - c.WaitUntilHighWatermark(srcTime, jobspb.JobID(ingestionJobID)) + c.WaitUntilReplicatedTime(srcTime, jobspb.JobID(ingestionJobID)) c.RequireFingerprintMatchAtTimestamp(srcTime.AsOfSystemTime()) // Introduce a DeleteRange on t1 and t2. 
@@ -609,7 +609,7 @@ func TestTenantStreamingDeleteRange(t *testing.T) { tableSpan.Key, tableSpan.Key.Next().Next())) } srcTimeAfterDelRange := c.SrcSysServer.Clock().Now() - c.WaitUntilHighWatermark(srcTimeAfterDelRange, jobspb.JobID(ingestionJobID)) + c.WaitUntilReplicatedTime(srcTimeAfterDelRange, jobspb.JobID(ingestionJobID)) c.RequireFingerprintMatchAtTimestamp(srcTimeAfterDelRange.AsOfSystemTime()) c.RequireFingerprintMatchAtTimestamp(srcTimeBeforeDelRange.AsOfSystemTime()) @@ -747,9 +747,13 @@ func TestTenantReplicationProtectedTimestampManagement(t *testing.T) { if err != nil { return err } - require.True(t, frontier.LessEq(*progress.GetHighWater())) - frontier := progress.GetHighWater().GoTime().Round(time.Millisecond) - window := frontier.Sub(rec.Timestamp.GoTime().Round(time.Millisecond)) + + replicatedTime := replicationutils.ReplicatedTimeFromProgress(&progress) + require.True(t, frontier.LessEq(replicatedTime)) + + roundedReplicatedTime := replicatedTime.GoTime().Round(time.Millisecond) + roundedProtectedTime := rec.Timestamp.GoTime().Round(time.Millisecond) + window := roundedReplicatedTime.Sub(roundedProtectedTime) require.Equal(t, time.Second, window) return nil })) @@ -772,7 +776,7 @@ func TestTenantReplicationProtectedTimestampManagement(t *testing.T) { // important because if `frontier@t2 - ReplicationTTLSeconds < t1` then we // will not update the PTS record. now := c.SrcCluster.Server(0).Clock().Now().Add(int64(time.Second)*2, 0) - c.WaitUntilHighWatermark(now, jobspb.JobID(replicationJobID)) + c.WaitUntilReplicatedTime(now, jobspb.JobID(replicationJobID)) // Check that the producer and replication job have written a protected // timestamp. @@ -780,7 +784,7 @@ func TestTenantReplicationProtectedTimestampManagement(t *testing.T) { checkDestinationProtection(c, now, replicationJobID) now2 := now.Add(time.Second.Nanoseconds(), 0) - c.WaitUntilHighWatermark(now2, jobspb.JobID(replicationJobID)) + c.WaitUntilReplicatedTime(now2, jobspb.JobID(replicationJobID)) // Let the replication progress for a second before checking that the // protected timestamp record has also been updated on the destination // cluster. 
This update happens in the same txn in which we update the @@ -862,8 +866,8 @@ func TestTenantStreamingShowTenant(t *testing.T) { jobutils.WaitForJobToRun(c.T, c.SrcSysSQL, jobspb.JobID(producerJobID)) jobutils.WaitForJobToRun(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID)) - highWatermark := c.SrcCluster.Server(0).Clock().Now() - c.WaitUntilHighWatermark(highWatermark, jobspb.JobID(ingestionJobID)) + targetReplicatedTime := c.SrcCluster.Server(0).Clock().Now() + c.WaitUntilReplicatedTime(targetReplicatedTime, jobspb.JobID(ingestionJobID)) destRegistry := c.DestCluster.Server(0).JobRegistry().(*jobs.Registry) details, err := destRegistry.LoadJob(ctx, jobspb.JobID(ingestionJobID)) require.NoError(t, err) @@ -894,7 +898,7 @@ func TestTenantStreamingShowTenant(t *testing.T) { require.Equal(t, ingestionJobID, jobId) require.Less(t, maxReplTime, timeutil.Now()) require.Less(t, protectedTime, timeutil.Now()) - require.GreaterOrEqual(t, maxReplTime, highWatermark.GoTime()) + require.GreaterOrEqual(t, maxReplTime, targetReplicatedTime.GoTime()) require.GreaterOrEqual(t, protectedTime, replicationDetails.ReplicationStartTime.GoTime()) require.Nil(t, cutoverTime) diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_dist.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_dist.go index eab265f9723b..0570ea5e3d22 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_dist.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_dist.go @@ -37,24 +37,27 @@ func startDistIngestion( ) error { details := ingestionJob.Details().(jobspb.StreamIngestionDetails) - progress := ingestionJob.Progress() + streamProgress := ingestionJob.Progress().Details.(*jobspb.Progress_StreamIngest).StreamIngest - var previousHighWater, heartbeatTimestamp hlc.Timestamp + streamID := streampb.StreamID(details.StreamID) initialScanTimestamp := details.ReplicationStartTime + replicatedTime := streamProgress.ReplicatedTime + + if replicatedTime.IsEmpty() && initialScanTimestamp.IsEmpty() { + return jobs.MarkAsPermanentJobError(errors.AssertionFailedf("initial timestamp and replicated timestamp are both empty")) + } + // Start from the last checkpoint if it exists. 
- if h := progress.GetHighWater(); h != nil && !h.IsEmpty() { - previousHighWater = *h - heartbeatTimestamp = previousHighWater + var heartbeatTimestamp hlc.Timestamp + if !replicatedTime.IsEmpty() { + heartbeatTimestamp = replicatedTime } else { heartbeatTimestamp = initialScanTimestamp } - log.Infof(ctx, "ingestion job %d resumes stream ingestion from start time %s", - ingestionJob.ID(), heartbeatTimestamp) - - streamID := streampb.StreamID(details.StreamID) - updateRunningStatus(ctx, execCtx, ingestionJob, jobspb.InitializingReplication, - fmt.Sprintf("connecting to the producer job %d and resuming a stream replication plan", streamID)) + msg := fmt.Sprintf("resuming stream (producer job %d) from %s", + streamID, heartbeatTimestamp) + updateRunningStatus(ctx, ingestionJob, jobspb.InitializingReplication, msg) client, err := connectToActiveClient(ctx, ingestionJob, execCtx.ExecCfg().InternalDB) if err != nil { @@ -69,7 +72,7 @@ func startDistIngestion( return err } - log.Infof(ctx, "producer job %d is active, creating a stream replication plan", streamID) + log.Infof(ctx, "producer job %d is active, planning DistSQL flow", streamID) dsp := execCtx.DistSQLPlanner() p, planCtx, err := makePlan( @@ -77,14 +80,12 @@ func startDistIngestion( ingestionJob, details, client, - previousHighWater, - progress.Details.(*jobspb.Progress_StreamIngest).StreamIngest.Checkpoint, + replicatedTime, + streamProgress.Checkpoint, initialScanTimestamp)(ctx, dsp) if err != nil { return err } - log.Infof(ctx, "starting to run DistSQL flow for stream ingestion job %d", - ingestionJob.ID()) execPlan := func(ctx context.Context) error { ctx = logtags.AddTag(ctx, "stream-ingest-distsql", nil) @@ -112,9 +113,7 @@ func startDistIngestion( return rw.Err() } - updateRunningStatus(ctx, execCtx, ingestionJob, jobspb.Replicating, - "running the SQL flow for the stream ingestion job") - + updateRunningStatus(ctx, ingestionJob, jobspb.Replicating, "running replicating stream") // TODO(msbutler): Implement automatic replanning in the spirit of changefeed replanning. return execPlan(ctx) } @@ -126,7 +125,7 @@ func makePlan( ingestionJob *jobs.Job, details jobspb.StreamIngestionDetails, client streamclient.Client, - previousHighWater hlc.Timestamp, + previousReplicatedTime hlc.Timestamp, checkpoint jobspb.StreamIngestionCheckpoint, initialScanTimestamp hlc.Timestamp, ) func(context.Context, *sql.DistSQLPlanner) (*sql.PhysicalPlan, *sql.PlanningCtx, error) { @@ -158,7 +157,7 @@ func makePlan( topology, sqlInstanceIDs, initialScanTimestamp, - previousHighWater, + previousReplicatedTime, checkpoint, jobID, streamID, @@ -208,7 +207,7 @@ func constructStreamIngestionPlanSpecs( topology streamclient.Topology, sqlInstanceIDs []base.SQLInstanceID, initialScanTimestamp hlc.Timestamp, - previousHighWater hlc.Timestamp, + previousReplicatedTimestamp hlc.Timestamp, checkpoint jobspb.StreamIngestionCheckpoint, jobID jobspb.JobID, streamID streampb.StreamID, @@ -226,13 +225,13 @@ func constructStreamIngestionPlanSpecs( // the partition addresses. 
if i < len(sqlInstanceIDs) { spec := &execinfrapb.StreamIngestionDataSpec{ - StreamID: uint64(streamID), - JobID: int64(jobID), - PreviousHighWaterTimestamp: previousHighWater, - InitialScanTimestamp: initialScanTimestamp, - Checkpoint: checkpoint, // TODO: Only forward relevant checkpoint info - StreamAddress: string(streamAddress), - PartitionSpecs: make(map[string]execinfrapb.StreamIngestionPartitionSpec), + StreamID: uint64(streamID), + JobID: int64(jobID), + PreviousReplicatedTimestamp: previousReplicatedTimestamp, + InitialScanTimestamp: initialScanTimestamp, + Checkpoint: checkpoint, // TODO: Only forward relevant checkpoint info + StreamAddress: string(streamAddress), + PartitionSpecs: make(map[string]execinfrapb.StreamIngestionPartitionSpec), TenantRekey: execinfrapb.TenantRekey{ OldID: sourceTenantID, NewID: destinationTenantID, @@ -256,7 +255,7 @@ func constructStreamIngestionPlanSpecs( // Create a spec for the StreamIngestionFrontier processor on the coordinator // node. streamIngestionFrontierSpec := &execinfrapb.StreamIngestionFrontierSpec{ - HighWaterAtStart: previousHighWater, + ReplicatedTimeAtStart: previousReplicatedTimestamp, TrackedSpans: trackedSpans, JobID: int64(jobID), StreamID: uint64(streamID), diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go index c7dd0ef97297..6c55fd73e4fc 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go @@ -57,9 +57,6 @@ type streamIngestionFrontier struct { // input returns rows from one or more streamIngestion processors. input execinfra.RowSource - // highWaterAtStart is the job high-water. It's used in an assertion that we - // never regress the job high-water. - highWaterAtStart hlc.Timestamp // frontier contains the current resolved timestamp high-water for the tracked // span set. @@ -72,9 +69,13 @@ type streamIngestionFrontier struct { // stream alive. heartbeatSender *heartbeatSender - // persistedHighWater stores the highwater mark of progress that is persisted - // in the job record. - persistedHighWater hlc.Timestamp + // replicatedTimeAtStart is the job's replicated time when + // this processor started. It's used in an assertion that we + // never regress the job's replicated time. + replicatedTimeAtStart hlc.Timestamp + // persistedReplicatedTime stores the highwater mark of + // progress that is persisted in the job record. + persistedReplicatedTime hlc.Timestamp lastPartitionUpdate time.Time partitionProgress map[string]jobspb.StreamIngestionProgress_PartitionProgress @@ -95,7 +96,7 @@ func newStreamIngestionFrontierProcessor( input execinfra.RowSource, post *execinfrapb.PostProcessSpec, ) (execinfra.Processor, error) { - frontier, err := span.MakeFrontierAt(spec.HighWaterAtStart, spec.TrackedSpans...) + frontier, err := span.MakeFrontierAt(spec.ReplicatedTimeAtStart, spec.TrackedSpans...) 
if err != nil { return nil, err } @@ -116,15 +117,15 @@ func newStreamIngestionFrontierProcessor( } } sf := &streamIngestionFrontier{ - flowCtx: flowCtx, - spec: spec, - input: input, - highWaterAtStart: spec.HighWaterAtStart, - frontier: frontier, - partitionProgress: partitionProgress, - metrics: flowCtx.Cfg.JobRegistry.MetricsStruct().StreamIngest.(*Metrics), - heartbeatSender: heartbeatSender, - persistedHighWater: spec.HighWaterAtStart, + flowCtx: flowCtx, + spec: spec, + input: input, + replicatedTimeAtStart: spec.ReplicatedTimeAtStart, + frontier: frontier, + partitionProgress: partitionProgress, + metrics: flowCtx.Cfg.JobRegistry.MetricsStruct().StreamIngest.(*Metrics), + heartbeatSender: heartbeatSender, + persistedReplicatedTime: spec.ReplicatedTimeAtStart, } if err := sf.Init( ctx, @@ -304,10 +305,10 @@ func (sf *streamIngestionFrontier) Next() ( case <-sf.Ctx().Done(): sf.MoveToDraining(sf.Ctx().Err()) return nil, sf.DrainHelper() - // Send the latest persisted highwater in the heartbeat to the source cluster + // Send the latest persisted replicated time in the heartbeat to the source cluster // as even with retries we will never request an earlier row than it, and // the source cluster is free to clean up earlier data. - case sf.heartbeatSender.frontierUpdates <- sf.persistedHighWater: + case sf.heartbeatSender.frontierUpdates <- sf.persistedReplicatedTime: // If heartbeatSender has error, it means remote has error, we want to // stop the processor. case <-sf.heartbeatSender.stoppedChan: @@ -371,10 +372,10 @@ func (sf *streamIngestionFrontier) noteResolvedTimestamps( // Inserting a timestamp less than the one the ingestion flow started at could // potentially regress the job progress. This is not expected and thus we // assert to catch such unexpected behavior. - if !resolved.Timestamp.IsEmpty() && resolved.Timestamp.Less(sf.highWaterAtStart) { + if !resolved.Timestamp.IsEmpty() && resolved.Timestamp.Less(sf.replicatedTimeAtStart) { return frontierChanged, errors.AssertionFailedf( `got a resolved timestamp %s that is less than the frontier processor start time %s`, - redact.Safe(resolved.Timestamp), redact.Safe(sf.highWaterAtStart)) + redact.Safe(resolved.Timestamp), redact.Safe(sf.replicatedTimeAtStart)) } changed, err := sf.frontier.Forward(resolved.Span, resolved.Timestamp) @@ -405,7 +406,7 @@ func (sf *streamIngestionFrontier) maybeUpdatePartitionProgress() error { return span.ContinueMatch }) - highWatermark := f.Frontier() + replicatedTime := f.Frontier() partitionProgress := sf.partitionProgress sf.lastPartitionUpdate = timeutil.Now() @@ -418,17 +419,20 @@ func (sf *streamIngestionFrontier) maybeUpdatePartitionProgress() error { } progress := md.Progress - // Keep the recorded highwater empty until some advancement has been made - if sf.highWaterAtStart.Less(highWatermark) { - progress.Progress = &jobspb.Progress_HighWater{ - HighWater: &highWatermark, - } - } - streamProgress := progress.Details.(*jobspb.Progress_StreamIngest).StreamIngest streamProgress.PartitionProgress = partitionProgress streamProgress.Checkpoint.ResolvedSpans = frontierResolvedSpans + // Keep the recorded replicatedTime empty until some advancement has been made + if sf.replicatedTimeAtStart.Less(replicatedTime) { + streamProgress.ReplicatedTime = replicatedTime + // The HighWater is for informational purposes + // only. 
+ progress.Progress = &jobspb.Progress_HighWater{ + HighWater: &replicatedTime, + } + } + ju.UpdateProgress(progress) // Reset RunStats.NumRuns to 1 since the stream ingestion has returned to @@ -439,9 +443,9 @@ func (sf *streamIngestionFrontier) maybeUpdatePartitionProgress() error { } // Update the protected timestamp record protecting the destination tenant's - // keyspan if the highWatermark has moved forward since the last time we + // keyspan if the replicatedTime has moved forward since the last time we // recorded progress. This makes older revisions of replicated values with a - // timestamp less than highWatermark - ReplicationTTLSeconds, eligible for + // timestamp less than replicatedTime - ReplicationTTLSeconds eligible for // garbage collection. replicationDetails := md.Payload.GetStreamIngestion() if replicationDetails.ProtectedTimestampRecordID == nil { @@ -453,22 +457,30 @@ func (sf *streamIngestionFrontier) maybeUpdatePartitionProgress() error { if err != nil { return err } - newProtectAbove := highWatermark.Add( + newProtectAbove := replicatedTime.Add( -int64(replicationDetails.ReplicationTTLSeconds)*time.Second.Nanoseconds(), 0) + + // If we have a CutoverTime set, keep the protected + // timestamp at or below the cutover time. + if !streamProgress.CutoverTime.IsEmpty() && streamProgress.CutoverTime.Less(newProtectAbove) { + newProtectAbove = streamProgress.CutoverTime + } + if record.Timestamp.Less(newProtectAbove) { return ptp.UpdateTimestamp(ctx, *replicationDetails.ProtectedTimestampRecordID, newProtectAbove) } + return nil }); err != nil { return err } sf.metrics.JobProgressUpdates.Inc(1) - sf.persistedHighWater = f.Frontier() + sf.persistedReplicatedTime = f.Frontier() sf.metrics.FrontierCheckpointSpanCount.Update(int64(len(frontierResolvedSpans))) - if !sf.persistedHighWater.IsEmpty() { - // Only update the frontier lag if the high water mark has been updated, + if !sf.persistedReplicatedTime.IsEmpty() { + // Only update the frontier lag if the replicated time has been updated, // implying the initial scan has completed. - sf.metrics.FrontierLagNanos.Update(timeutil.Since(sf.persistedHighWater.GoTime()).Nanoseconds()) + sf.metrics.FrontierLagNanos.Update(timeutil.Since(sf.persistedReplicatedTime.GoTime()).Nanoseconds()) } return nil } diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor_test.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor_test.go deleted file mode 100644 index 9b068fcb9299..000000000000 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor_test.go +++ /dev/null @@ -1,388 +0,0 @@ -// Copyright 2020 The Cockroach Authors. -// -// Licensed as a CockroachDB Enterprise file under the Cockroach Community -// License (the "License"); you may not use this file except in compliance with -// the License. 
You may obtain a copy of the License at -// -// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt - -package streamingest - -import ( - "context" - "fmt" - "math" - "testing" - "time" - - "github.com/cockroachdb/cockroach/pkg/base" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient" - "github.com/cockroachdb/cockroach/pkg/jobs" - "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" - "github.com/cockroachdb/cockroach/pkg/keys" - "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/security/username" - "github.com/cockroachdb/cockroach/pkg/settings/cluster" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" - "github.com/cockroachdb/cockroach/pkg/sql/execinfra" - "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" - "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" - "github.com/cockroachdb/cockroach/pkg/testutils" - "github.com/cockroachdb/cockroach/pkg/testutils/distsqlutils" - "github.com/cockroachdb/cockroach/pkg/testutils/skip" - "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" - "github.com/cockroachdb/cockroach/pkg/upgrade/upgradebase" - "github.com/cockroachdb/cockroach/pkg/util/hlc" - "github.com/cockroachdb/cockroach/pkg/util/leaktest" - "github.com/cockroachdb/cockroach/pkg/util/limit" - "github.com/cockroachdb/cockroach/pkg/util/timeutil" - "github.com/stretchr/testify/require" -) - -type partitionToEvent map[string][]streamingccl.Event - -func TestStreamIngestionFrontierProcessor(t *testing.T) { - defer leaktest.AfterTest(t)() - skip.WithIssue(t, 87145, "flaky test") - - ctx := context.Background() - - tc := testcluster.StartTestCluster(t, 3 /* nodes */, base.TestClusterArgs{ - ServerArgs: base.TestServerArgs{ - Knobs: base.TestingKnobs{ - JobsTestingKnobs: &jobs.TestingKnobs{ - // We create a job record to track persistence but we don't want it to - // be adopted as the processors are being manually executed in the test. - DisableAdoptions: true, - }, - // DisableAdoptions needs this. - UpgradeManager: &upgradebase.TestingKnobs{ - DontUseJobs: true, - }, - }, - }, - }) - defer tc.Stopper().Stop(context.Background()) - - st := cluster.MakeTestingClusterSettings() - JobCheckpointFrequency.Override(ctx, &st.SV, 200*time.Millisecond) - streamingccl.StreamReplicationConsumerHeartbeatFrequency.Override(ctx, &st.SV, 100*time.Millisecond) - - evalCtx := eval.MakeTestingEvalContext(st) - - testDiskMonitor := execinfra.NewTestDiskMonitor(ctx, st) - defer testDiskMonitor.Stop(ctx) - - registry := tc.Server(0).JobRegistry().(*jobs.Registry) - flowCtx := execinfra.FlowCtx{ - Cfg: &execinfra.ServerConfig{ - Settings: st, - DB: tc.Server(0).InternalDB().(descs.DB), - JobRegistry: registry, - BulkSenderLimiter: limit.MakeConcurrentRequestLimiter("test", math.MaxInt), - }, - EvalCtx: &evalCtx, - Mon: evalCtx.TestingMon, - DiskMonitor: testDiskMonitor, - } - - post := execinfrapb.PostProcessSpec{} - - var spec execinfrapb.StreamIngestionDataSpec - // The stream address needs to be set with a scheme we support, but this test - // will mock out the actual client. 
- spec.StreamAddress = "randomgen://test/" - pa1 := "randomgen://test1/" - pa2 := "randomgen://test2/" - - pa1StartKey := roachpb.Key("key_1") - pa1Span := roachpb.Span{Key: pa1StartKey, EndKey: pa1StartKey.Next()} - pa2StartKey := roachpb.Key("key_2") - pa2Span := roachpb.Span{Key: pa2StartKey, EndKey: pa2StartKey.Next()} - - v := roachpb.MakeValueFromString("value_1") - v.Timestamp = hlc.Timestamp{WallTime: 1} - - const tenantID = 20 - sampleKV := func() roachpb.KeyValue { - key, err := keys.RewriteKeyToTenantPrefix(roachpb.Key("key_1"), - keys.MakeTenantPrefix(roachpb.MustMakeTenantID(tenantID))) - require.NoError(t, err) - return roachpb.KeyValue{Key: key, Value: v} - } - sampleCheckpoint := func(span roachpb.Span, ts int64) []jobspb.ResolvedSpan { - return []jobspb.ResolvedSpan{{Span: span, Timestamp: hlc.Timestamp{WallTime: ts}}} - } - sampleCheckpointWithLogicTS := func(span roachpb.Span, ts int64, logicalTs int32) []jobspb.ResolvedSpan { - return []jobspb.ResolvedSpan{{Span: span, Timestamp: hlc.Timestamp{WallTime: ts, Logical: logicalTs}}} - } - - for _, tc := range []struct { - name string - events partitionToEvent - expectedFrontierTimestamp hlc.Timestamp - frontierStartTime hlc.Timestamp - jobCheckpoint []jobspb.ResolvedSpan - }{ - { - name: "same-resolved-ts-across-partitions", - events: partitionToEvent{pa1: []streamingccl.Event{ - streamingccl.MakeCheckpointEvent(sampleCheckpoint(pa1Span, 1)), - streamingccl.MakeCheckpointEvent(sampleCheckpoint(pa1Span, 4)), - }, pa2: []streamingccl.Event{ - streamingccl.MakeCheckpointEvent(sampleCheckpoint(pa2Span, 1)), - streamingccl.MakeCheckpointEvent(sampleCheckpoint(pa2Span, 4)), - }}, - expectedFrontierTimestamp: hlc.Timestamp{WallTime: 4}, - }, - { - // No progress should be reported to the job since partition 2 has not - // emitted a resolved ts. - name: "no-partition-checkpoints", - events: partitionToEvent{pa1: []streamingccl.Event{ - streamingccl.MakeKVEvent(sampleKV()), - }, pa2: []streamingccl.Event{ - streamingccl.MakeKVEvent(sampleKV()), - }}, - }, - { - // No progress should be reported to the job since partition 2 has not - // emitted a resolved ts. 
- name: "no-checkpoint-from-one-partition", - events: partitionToEvent{pa1: []streamingccl.Event{ - streamingccl.MakeCheckpointEvent(sampleCheckpoint(pa1Span, 1)), - streamingccl.MakeCheckpointEvent(sampleCheckpoint(pa1Span, 4)), - }, pa2: []streamingccl.Event{}}, - }, - { - name: "one-partition-ahead-of-the-other", - events: partitionToEvent{pa1: []streamingccl.Event{ - streamingccl.MakeCheckpointEvent(sampleCheckpoint(pa1Span, 1)), - streamingccl.MakeCheckpointEvent(sampleCheckpoint(pa1Span, 4)), - }, pa2: []streamingccl.Event{ - streamingccl.MakeCheckpointEvent(sampleCheckpoint(pa2Span, 1)), - }}, - expectedFrontierTimestamp: hlc.Timestamp{WallTime: 1}, - }, - { - name: "some-interleaved-timestamps", - events: partitionToEvent{pa1: []streamingccl.Event{ - streamingccl.MakeCheckpointEvent(sampleCheckpoint(pa1Span, 2)), - streamingccl.MakeCheckpointEvent(sampleCheckpoint(pa1Span, 4)), - }, pa2: []streamingccl.Event{ - streamingccl.MakeCheckpointEvent(sampleCheckpoint(pa2Span, 3)), - streamingccl.MakeCheckpointEvent(sampleCheckpoint(pa2Span, 5)), - }}, - expectedFrontierTimestamp: hlc.Timestamp{WallTime: 4}, - }, - { - name: "some-interleaved-logical-timestamps", - events: partitionToEvent{pa1: []streamingccl.Event{ - streamingccl.MakeCheckpointEvent(sampleCheckpointWithLogicTS(pa1Span, 1, 2)), - streamingccl.MakeCheckpointEvent(sampleCheckpointWithLogicTS(pa1Span, 1, 4)), - }, pa2: []streamingccl.Event{ - streamingccl.MakeCheckpointEvent(sampleCheckpointWithLogicTS(pa2Span, 1, 1)), - streamingccl.MakeCheckpointEvent(sampleCheckpoint(pa2Span, 2)), - }}, - expectedFrontierTimestamp: hlc.Timestamp{WallTime: 1, Logical: 4}, - }, - { - // The frontier should error out as it receives a checkpoint with a ts - // lower than its start time. - name: "partition-checkpoint-lower-than-start-ts", - events: partitionToEvent{pa1: []streamingccl.Event{ - streamingccl.MakeCheckpointEvent(sampleCheckpointWithLogicTS(pa1Span, 1, 4)), - }, pa2: []streamingccl.Event{ - streamingccl.MakeCheckpointEvent(sampleCheckpointWithLogicTS(pa2Span, 1, 2)), - }}, - frontierStartTime: hlc.Timestamp{WallTime: 1, Logical: 3}, - expectedFrontierTimestamp: hlc.Timestamp{WallTime: 1, Logical: 3}, - }, - { - name: "existing-job-checkpoint", - events: partitionToEvent{pa1: []streamingccl.Event{ - streamingccl.MakeCheckpointEvent(sampleCheckpoint(pa1Span, 5)), - }, pa2: []streamingccl.Event{ - streamingccl.MakeCheckpointEvent(sampleCheckpoint(pa2Span, 2)), - }}, - frontierStartTime: hlc.Timestamp{WallTime: 1}, - jobCheckpoint: []jobspb.ResolvedSpan{ - {Span: pa1Span, Timestamp: hlc.Timestamp{WallTime: 4}}, - {Span: pa2Span, Timestamp: hlc.Timestamp{WallTime: 3}}, - }, - expectedFrontierTimestamp: hlc.Timestamp{WallTime: 3}, - }, - } { - t.Run(tc.name, func(t *testing.T) { - topology := streamclient.Topology{ - Partitions: []streamclient.PartitionInfo{ - { - ID: pa1, - SubscriptionToken: []byte(pa1), - SrcAddr: streamingccl.PartitionAddress(pa1), - Spans: []roachpb.Span{pa1Span}, - }, - { - ID: pa2, - SubscriptionToken: []byte(pa2), - SrcAddr: streamingccl.PartitionAddress(pa2), - Spans: []roachpb.Span{pa2Span}, - }, - }, - } - - spec.PartitionSpecs = map[string]execinfrapb.StreamIngestionPartitionSpec{} - for _, partition := range topology.Partitions { - spec.PartitionSpecs[partition.ID] = execinfrapb.StreamIngestionPartitionSpec{ - PartitionID: partition.ID, - SubscriptionToken: string(partition.SubscriptionToken), - Address: string(partition.SrcAddr), - Spans: partition.Spans, - } - } - spec.TenantRekey = 
execinfrapb.TenantRekey{ - OldID: roachpb.MustMakeTenantID(tenantID), - NewID: roachpb.MustMakeTenantID(tenantID + 10), - } - spec.PreviousHighWaterTimestamp = tc.frontierStartTime - if tc.frontierStartTime.IsEmpty() { - spec.InitialScanTimestamp = hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} - } - spec.Checkpoint.ResolvedSpans = tc.jobCheckpoint - proc, err := newStreamIngestionDataProcessor(ctx, &flowCtx, 0 /* processorID */, spec, &post) - require.NoError(t, err) - sip, ok := proc.(*streamIngestionProcessor) - if !ok { - t.Fatal("expected the processor that's created to be a stream ingestion processor") - } - - // Inject a mock client with the events being tested against. - doneCh := make(chan struct{}) - defer close(doneCh) - sip.forceClientForTests = &mockStreamClient{ - partitionEvents: tc.events, - doneCh: doneCh, - } - defer func() { - require.NoError(t, sip.forceClientForTests.Close(ctx)) - }() - - jobID := registry.MakeJobID() - - t.Logf("Using JobID: %v", jobID) - - // Create a frontier processor. - var frontierSpec execinfrapb.StreamIngestionFrontierSpec - frontierSpec.StreamAddresses = topology.StreamAddresses() - frontierSpec.TrackedSpans = []roachpb.Span{pa1Span, pa2Span} - frontierSpec.Checkpoint.ResolvedSpans = tc.jobCheckpoint - frontierSpec.JobID = int64(jobID) - - if !tc.frontierStartTime.IsEmpty() { - frontierSpec.HighWaterAtStart = tc.frontierStartTime - } - - // Create a mock ingestion job. - record := jobs.Record{ - JobID: jobID, - Description: "fake ingestion job", - Username: username.TestUserName(), - Details: jobspb.StreamIngestionDetails{StreamAddress: spec.StreamAddress}, - // We don't use this so it does not matter what we set it too, as long - // as it is non-nil. - Progress: jobspb.StreamIngestionProgress{}, - } - record.CreatedBy = &jobs.CreatedByInfo{ - Name: "ingestion", - } - _, err = registry.CreateJobWithTxn(ctx, record, record.JobID, nil) - require.NoError(t, err) - - frontierPost := execinfrapb.PostProcessSpec{} - frontierOut := distsqlutils.RowBuffer{} - frontierProc, err := newStreamIngestionFrontierProcessor( - ctx, &flowCtx, 0 /* processorID*/, frontierSpec, sip, &frontierPost, - ) - require.NoError(t, err) - fp, ok := frontierProc.(*streamIngestionFrontier) - if !ok { - t.Fatal("expected the processor that's created to be a stream ingestion frontier") - } - ctxWithCancel, cancel := context.WithCancel(ctx) - defer cancel() - - client := streamclient.GetRandomStreamClientSingletonForTesting() - defer func() { - require.NoError(t, client.Close(context.Background())) - }() - - client.ClearInterceptors() - - // Record heartbeats in a list and terminate the client once the expected - // frontier timestamp has been reached - heartbeats := make([]hlc.Timestamp, 0) - client.RegisterHeartbeatInterception(func(heartbeatTs hlc.Timestamp) { - heartbeats = append(heartbeats, heartbeatTs) - if tc.expectedFrontierTimestamp.LessEq(heartbeatTs) { - doneCh <- struct{}{} - } - - // Ensure we never heartbeat a value later than what is persisted - job, err := registry.LoadJob(ctx, jobID) - require.NoError(t, err) - progress := job.Progress().Progress - if progress == nil { - if !heartbeatTs.Equal(spec.InitialScanTimestamp) { - t.Fatalf("heartbeat %v should equal start time of %v", heartbeatTs, spec.InitialScanTimestamp) - } - } else { - persistedHighwater := *progress.(*jobspb.Progress_HighWater).HighWater - require.True(t, heartbeatTs.LessEq(persistedHighwater)) - } - }) - - fp.Run(ctxWithCancel, &frontierOut) - - if !frontierOut.ProducerClosed() { - 
t.Fatal("producer for StreamFrontierProcessor not closed") - } - - minCheckpointTs := hlc.Timestamp{} - for _, resolvedSpan := range tc.jobCheckpoint { - if minCheckpointTs.IsEmpty() || resolvedSpan.Timestamp.Less(minCheckpointTs) { - minCheckpointTs = resolvedSpan.Timestamp - } - - // Ensure that the frontier is at least at the checkpoint for this span - require.True(t, resolvedSpan.Timestamp.LessEq(frontierForSpans(fp.frontier, resolvedSpan.Span))) - } - - // Wait until the frontier terminates - _, meta := frontierOut.Next() - if meta != nil { - if !tc.frontierStartTime.IsEmpty() { - require.True(t, testutils.IsError(meta.Err, fmt.Sprintf("got a resolved timestamp ."+ - "* that is less than the frontier processor start time %s", - tc.frontierStartTime.String()))) - return - } - t.Fatalf("unexpected meta record returned by frontier processor: %+v\n", *meta) - } - - // Ensure that the rows emitted by the frontier never regress the ts or - // appear prior to a checkpoint. - var prevTimestamp hlc.Timestamp - for _, heartbeatTs := range heartbeats { - if !prevTimestamp.IsEmpty() { - require.True(t, prevTimestamp.LessEq(heartbeatTs)) - require.True(t, minCheckpointTs.LessEq(heartbeatTs)) - } - prevTimestamp = heartbeatTs - } - - // Check the final ts recorded by the frontier. - require.Equal(t, tc.expectedFrontierTimestamp, prevTimestamp) - }) - } -} diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go index c1aa8acfa993..e818e87343ac 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go @@ -165,22 +165,18 @@ func waitUntilProducerActive( func updateRunningStatus( ctx context.Context, - execCtx sql.JobExecContext, ingestionJob *jobs.Job, status jobspb.ReplicationStatus, runningStatus string, ) { - execCfg := execCtx.ExecCfg() - err := execCfg.InternalDB.Txn(ctx, func( - ctx context.Context, txn isql.Txn, - ) error { - return ingestionJob.WithTxn(txn).Update(ctx, func(txn isql.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error { - updateRunningStatusInternal(md, ju, status, runningStatus) - return nil - }) + err := ingestionJob.NoTxn().Update(ctx, func(txn isql.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error { + updateRunningStatusInternal(md, ju, status, runningStatus) + return nil }) if err != nil { log.Warningf(ctx, "error when updating job running status: %s", err) + } else { + log.Infof(ctx, "%s", runningStatus) } } @@ -203,7 +199,7 @@ func completeIngestion( streamID := details.StreamID log.Infof(ctx, "completing the producer job %d", streamID) - updateRunningStatus(ctx, execCtx, ingestionJob, jobspb.ReplicationCuttingOver, + updateRunningStatus(ctx, ingestionJob, jobspb.ReplicationCuttingOver, "completing the producer job in the source cluster") completeProducerJob(ctx, ingestionJob, execCtx.ExecCfg().InternalDB, true) @@ -306,14 +302,14 @@ func ingestWithRetries( } const msgFmt = "stream ingestion waits for retrying after error: %s" log.Warningf(ctx, msgFmt, err) - updateRunningStatus(ctx, execCtx, ingestionJob, jobspb.ReplicationError, + updateRunningStatus(ctx, ingestionJob, jobspb.ReplicationError, fmt.Sprintf(msgFmt, err)) retryCount++ } if err != nil { return err } - updateRunningStatus(ctx, execCtx, ingestionJob, jobspb.ReplicationCuttingOver, + updateRunningStatus(ctx, ingestionJob, jobspb.ReplicationCuttingOver, "stream ingestion finished successfully") return nil } @@ -324,7 +320,7 @@ func (s 
*streamIngestionResumer) handleResumeError( ) error { const errorFmt = "ingestion job failed (%s) but is being paused" log.Warningf(ctx, errorFmt, err) - updateRunningStatus(ctx, execCtx, s.job, jobspb.ReplicationError, fmt.Sprintf(errorFmt, err)) + updateRunningStatus(ctx, s.job, jobspb.ReplicationError, fmt.Sprintf(errorFmt, err)) // The ingestion job is paused but the producer job will keep // running until it times out. Users can still resume ingestion before // the producer job times out. @@ -436,9 +432,12 @@ func cutoverTimeIsEligibleForCutover( log.Infof(ctx, "empty cutover time, no revert required") return false } - if progress.GetHighWater() == nil || progress.GetHighWater().Less(cutoverTime) { - log.Infof(ctx, "job with highwater %s not yet ready to revert to cutover at %s", - progress.GetHighWater(), cutoverTime.String()) + + replicatedTime := replicationutils.ReplicatedTimeFromProgress(progress) + if replicatedTime.Less(cutoverTime) { + log.Infof(ctx, "job with replicated time %s not yet ready to revert to cutover at %s", + replicatedTime, + cutoverTime.String()) return false } return true diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go index ad5e8ae7fc95..eb58a478b474 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go @@ -261,11 +261,23 @@ func TestCutoverBuiltin(t *testing.T) { require.True(t, ok) require.True(t, sp.StreamIngest.CutoverTime.IsEmpty()) - var highWater time.Time + var replicatedTime time.Time err = job.NoTxn().Update(ctx, func(_ isql.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error { - highWater = timeutil.Now().Round(time.Microsecond) - hlcHighWater := hlc.Timestamp{WallTime: highWater.UnixNano()} - return jobs.UpdateHighwaterProgressed(hlcHighWater, md, ju) + if err := md.CheckRunningOrReverting(); err != nil { + return err + } + replicatedTime = timeutil.Now().Round(time.Microsecond) + hlcReplicatedTime := hlc.Timestamp{WallTime: replicatedTime.UnixNano()} + + progress := md.Progress + streamProgress := progress.Details.(*jobspb.Progress_StreamIngest).StreamIngest + streamProgress.ReplicatedTime = hlcReplicatedTime + progress.Progress = &jobspb.Progress_HighWater{ + HighWater: &hlcReplicatedTime, + } + + ju.UpdateProgress(progress) + return nil }) require.NoError(t, err) @@ -273,7 +285,7 @@ func TestCutoverBuiltin(t *testing.T) { var explain string err = db.QueryRowContext(ctx, `EXPLAIN SELECT crdb_internal.complete_stream_ingestion_job($1, $2)`, job.ID(), - highWater).Scan(&explain) + replicatedTime).Scan(&explain) require.NoError(t, err) require.Equal(t, "distribution: local", explain) @@ -281,7 +293,7 @@ func TestCutoverBuiltin(t *testing.T) { err = db.QueryRowContext( ctx, `SELECT crdb_internal.complete_stream_ingestion_job($1, $2)`, - job.ID(), highWater).Scan(&jobID) + job.ID(), replicatedTime).Scan(&jobID) require.NoError(t, err) require.Equal(t, job.ID(), jobspb.JobID(jobID)) @@ -291,7 +303,7 @@ func TestCutoverBuiltin(t *testing.T) { progress = sj.Progress() sp, ok = progress.GetDetails().(*jobspb.Progress_StreamIngest) require.True(t, ok) - require.Equal(t, hlc.Timestamp{WallTime: highWater.UnixNano()}, sp.StreamIngest.CutoverTime) + require.Equal(t, hlc.Timestamp{WallTime: replicatedTime.UnixNano()}, sp.StreamIngest.CutoverTime) } // TestReplicationJobResumptionStartTime tests that a replication job picks the @@ -346,14 +358,14 @@ func 
TestReplicationJobResumptionStartTime(t *testing.T) { for _, r := range replicationSpecs { require.Equal(t, startTime, r.InitialScanTimestamp) - require.Empty(t, r.PreviousHighWaterTimestamp) + require.Empty(t, r.PreviousReplicatedTimestamp) } - require.Empty(t, frontier.HighWaterAtStart) + require.Empty(t, frontier.ReplicatedTimeAtStart) // Allow the job to make some progress. canContinue <- struct{}{} srcTime := c.SrcCluster.Server(0).Clock().Now() - c.WaitUntilHighWatermark(srcTime, jobspb.JobID(replicationJobID)) + c.WaitUntilReplicatedTime(srcTime, jobspb.JobID(replicationJobID)) // Pause the job. c.DestSysSQL.Exec(t, `PAUSE JOB $1`, replicationJobID) @@ -373,20 +385,20 @@ func TestReplicationJobResumptionStartTime(t *testing.T) { // Assert that the previous highwater mark is greater than the replication // start time. - var previousHighWaterTimestamp hlc.Timestamp + var previousReplicatedTimestamp hlc.Timestamp for _, r := range replicationSpecs { require.Equal(t, startTime, r.InitialScanTimestamp) - require.True(t, r.InitialScanTimestamp.Less(r.PreviousHighWaterTimestamp)) - if previousHighWaterTimestamp.IsEmpty() { - previousHighWaterTimestamp = r.PreviousHighWaterTimestamp + require.True(t, r.InitialScanTimestamp.Less(r.PreviousReplicatedTimestamp)) + if previousReplicatedTimestamp.IsEmpty() { + previousReplicatedTimestamp = r.PreviousReplicatedTimestamp } else { - require.Equal(t, r.PreviousHighWaterTimestamp, previousHighWaterTimestamp) + require.Equal(t, r.PreviousReplicatedTimestamp, previousReplicatedTimestamp) } } - require.Equal(t, frontier.HighWaterAtStart, previousHighWaterTimestamp) + require.Equal(t, frontier.ReplicatedTimeAtStart, previousReplicatedTimestamp) canContinue <- struct{}{} srcTime = c.SrcCluster.Server(0).Clock().Now() - c.WaitUntilHighWatermark(srcTime, jobspb.JobID(replicationJobID)) + c.WaitUntilReplicatedTime(srcTime, jobspb.JobID(replicationJobID)) c.Cutover(producerJobID, replicationJobID, srcTime.GoTime(), false) jobutils.WaitForJobToSucceed(t, c.DestSysSQL, jobspb.JobID(replicationJobID)) } @@ -461,7 +473,17 @@ func TestCutoverFractionProgressed(t *testing.T) { replicationJob, err := registry.CreateJobWithTxn(ctx, mockReplicationJobRecord, jobID, nil) require.NoError(t, err) require.NoError(t, replicationJob.NoTxn().Update(ctx, func(txn isql.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error { - return jobs.UpdateHighwaterProgressed(cutover, md, ju) + if err := md.CheckRunningOrReverting(); err != nil { + return err + } + progress := md.Progress + streamProgress := progress.Details.(*jobspb.Progress_StreamIngest).StreamIngest + streamProgress.ReplicatedTime = cutover + progress.Progress = &jobspb.Progress_HighWater{ + HighWater: &cutover, + } + ju.UpdateProgress(progress) + return nil })) metrics := registry.MetricsStruct().StreamIngest.(*Metrics) diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go index fce64cc43bac..7906ee64fa50 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go @@ -31,6 +31,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" + "github.com/cockroachdb/cockroach/pkg/sql/isql" "github.com/cockroachdb/cockroach/pkg/sql/rowenc" "github.com/cockroachdb/cockroach/pkg/sql/rowexec" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -264,7 
+265,7 @@ func newStreamIngestionDataProcessor( trackedSpans = append(trackedSpans, partitionSpec.Spans...) } - frontier, err := span.MakeFrontierAt(spec.PreviousHighWaterTimestamp, trackedSpans...) + frontier, err := span.MakeFrontierAt(spec.PreviousReplicatedTimestamp, trackedSpans...) if err != nil { return nil, err } @@ -281,8 +282,8 @@ func newStreamIngestionDataProcessor( frontier: frontier, maxFlushRateTimer: timeutil.NewTimer(), cutoverProvider: &cutoverFromJobProgress{ - jobID: jobspb.JobID(spec.JobID), - registry: flowCtx.Cfg.JobRegistry, + jobID: jobspb.JobID(spec.JobID), + db: flowCtx.Cfg.DB, }, cutoverCh: make(chan struct{}), closePoller: make(chan struct{}), @@ -371,16 +372,16 @@ func (sip *streamIngestionProcessor) Start(ctx context.Context) { sip.streamPartitionClients = append(sip.streamPartitionClients, streamClient) } - previousHighWater := frontierForSpans(sip.frontier, partitionSpec.Spans...) + previousReplicatedTimestamp := frontierForSpans(sip.frontier, partitionSpec.Spans...) if streamingKnobs, ok := sip.FlowCtx.TestingKnobs().StreamingTestingKnobs.(*sql.StreamingTestingKnobs); ok { if streamingKnobs != nil && streamingKnobs.BeforeClientSubscribe != nil { - streamingKnobs.BeforeClientSubscribe(addr, string(token), previousHighWater) + streamingKnobs.BeforeClientSubscribe(addr, string(token), previousReplicatedTimestamp) } } sub, err := streamClient.Subscribe(ctx, streampb.StreamID(sip.spec.StreamID), token, - sip.spec.InitialScanTimestamp, previousHighWater) + sip.spec.InitialScanTimestamp, previousReplicatedTimestamp) if err != nil { sip.MoveToDraining(errors.Wrapf(err, "consuming partition %v", addr)) @@ -1156,25 +1157,54 @@ type cutoverProvider interface { // cutoverFromJobProgress is a cutoverProvider that decides whether the cutover // time has been reached based on the progress stored on the job record. type cutoverFromJobProgress struct { - registry *jobs.Registry - jobID jobspb.JobID + db isql.DB + jobID jobspb.JobID +} + +func (c *cutoverFromJobProgress) loadIngestionProgress( + ctx context.Context, +) (*jobspb.StreamIngestionProgress, error) { + var ( + progressBytes []byte + exists bool + ) + if err := c.db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { + infoStorage := jobs.InfoStorageForJob(txn, c.jobID) + var err error + progressBytes, exists, err = infoStorage.GetLegacyProgress(ctx) + return err + }); err != nil { + return nil, err + } + if !exists { + return nil, nil + } + progress := &jobspb.Progress{} + if err := protoutil.Unmarshal(progressBytes, progress); err != nil { + return nil, err + } + + sp, ok := progress.GetDetails().(*jobspb.Progress_StreamIngest) + if !ok { + return nil, errors.Newf("unknown progress details type %T in stream ingestion job %d", + progress.GetDetails(), c.jobID) + } + return sp.StreamIngest, nil } func (c *cutoverFromJobProgress) cutoverReached(ctx context.Context) (bool, error) { - j, err := c.registry.LoadJob(ctx, c.jobID) + ingestionProgress, err := c.loadIngestionProgress(ctx) if err != nil { return false, err } - progress := j.Progress() - var sp *jobspb.Progress_StreamIngest - var ok bool - if sp, ok = progress.GetDetails().(*jobspb.Progress_StreamIngest); !ok { - return false, errors.Newf("unknown progress type %T in stream ingestion job %d", - j.Progress().Progress, c.jobID) + if ingestionProgress == nil { + log.Warningf(ctx, "no legacy job progress recorded yet") + return false, nil } - // Job has been signaled to complete.
- if resolvedTimestamp := progress.GetHighWater(); !sp.StreamIngest.CutoverTime.IsEmpty() && - resolvedTimestamp != nil && sp.StreamIngest.CutoverTime.Less(*resolvedTimestamp) { + + cutoverTime := ingestionProgress.CutoverTime + replicatedTime := ingestionProgress.ReplicatedTime + if !cutoverTime.IsEmpty() && cutoverTime.LessEq(replicatedTime) { return true, nil } diff --git a/pkg/ccl/streamingccl/streamingest/testdata/cutover_after_pause b/pkg/ccl/streamingccl/streamingest/testdata/cutover_after_pause index b88a01b6eabe..589f2a759b98 100644 --- a/pkg/ccl/streamingccl/streamingest/testdata/cutover_after_pause +++ b/pkg/ccl/streamingccl/streamingest/testdata/cutover_after_pause @@ -27,7 +27,7 @@ let $afterImport as=source-system SELECT clock_timestamp()::timestamp::string ---- -wait-until-high-watermark ts=$afterImport +wait-until-replicated-time ts=$afterImport ---- job as=destination-system pause diff --git a/pkg/ccl/streamingccl/streamingest/testdata/simple b/pkg/ccl/streamingccl/streamingest/testdata/simple index d2c8b7a2c115..ff9620b01a9f 100644 --- a/pkg/ccl/streamingccl/streamingest/testdata/simple +++ b/pkg/ccl/streamingccl/streamingest/testdata/simple @@ -10,7 +10,7 @@ let $start as=source-system SELECT clock_timestamp()::timestamp::string ---- -wait-until-high-watermark ts=$start +wait-until-replicated-time ts=$start ---- # The job description should be redacted. diff --git a/pkg/ccl/streamingccl/streamproducer/event_stream.go b/pkg/ccl/streamingccl/streamproducer/event_stream.go index 0a349145d375..3e9beff52863 100644 --- a/pkg/ccl/streamingccl/streamproducer/event_stream.go +++ b/pkg/ccl/streamingccl/streamproducer/event_stream.go @@ -115,7 +115,7 @@ func (s *eventStream) Start(ctx context.Context, txn *kv.Txn) error { } initialTimestamp := s.spec.InitialScanTimestamp - if s.spec.PreviousHighWaterTimestamp.IsEmpty() { + if s.spec.PreviousReplicatedTimestamp.IsEmpty() { opts = append(opts, rangefeed.WithInitialScan(func(ctx context.Context) {}), rangefeed.WithScanRetryBehavior(rangefeed.ScanRetryRemaining), @@ -132,10 +132,10 @@ func (s *eventStream) Start(ctx context.Context, txn *kv.Txn) error { rangefeed.WithOnScanCompleted(s.onInitialScanSpanCompleted), ) } else { - initialTimestamp = s.spec.PreviousHighWaterTimestamp + initialTimestamp = s.spec.PreviousReplicatedTimestamp // When resuming from cursor, advance frontier to the cursor position. 
for _, sp := range s.spec.Spans { - if _, err := frontier.Forward(sp, s.spec.PreviousHighWaterTimestamp); err != nil { + if _, err := frontier.Forward(sp, s.spec.PreviousReplicatedTimestamp); err != nil { return err } } diff --git a/pkg/ccl/streamingccl/streamproducer/replication_stream_test.go b/pkg/ccl/streamingccl/streamproducer/replication_stream_test.go index 2c42fec9f81d..fbb10d63add4 100644 --- a/pkg/ccl/streamingccl/streamproducer/replication_stream_test.go +++ b/pkg/ccl/streamingccl/streamproducer/replication_stream_test.go @@ -302,7 +302,7 @@ func encodeSpec( h *replicationtestutils.ReplicationHelper, srcTenant replicationtestutils.TenantState, initialScanTime hlc.Timestamp, - previousHighWater hlc.Timestamp, + previousReplicatedTime hlc.Timestamp, tables ...string, ) []byte { var spans []roachpb.Span @@ -313,9 +313,9 @@ func encodeSpec( } spec := &streampb.StreamPartitionSpec{ - InitialScanTimestamp: initialScanTime, - PreviousHighWaterTimestamp: previousHighWater, - Spans: spans, + InitialScanTimestamp: initialScanTime, + PreviousReplicatedTimestamp: previousReplicatedTime, + Spans: spans, Config: streampb.StreamPartitionSpec_ExecutionConfig{ MinCheckpointFrequency: 10 * time.Millisecond, }, @@ -518,17 +518,17 @@ USE d; srcTenant.SQL.Exec(t, fmt.Sprintf("CREATE TABLE %s (id INT PRIMARY KEY, n INT)", table)) srcTenant.SQL.QueryRow(t, "SELECT clock_timestamp()").Scan(&clockTime) - var previousHighWater hlc.Timestamp + var previousReplicatedTime hlc.Timestamp initialScanTimestamp := hlc.Timestamp{WallTime: clockTime.UnixNano()} if !initialScan { - previousHighWater = hlc.Timestamp{WallTime: clockTime.UnixNano()} + previousReplicatedTime = hlc.Timestamp{WallTime: clockTime.UnixNano()} } if addSSTableBeforeRangefeed { srcTenant.SQL.Exec(t, fmt.Sprintf("IMPORT INTO %s CSV DATA ($1)", table), dataSrv.URL) } source, feed := startReplication(ctx, t, h, makePartitionStreamDecoder, streamPartitionQuery, streamID, encodeSpec(t, h, srcTenant, initialScanTimestamp, - previousHighWater, table)) + previousReplicatedTime, table)) defer feed.Close(ctx) if !addSSTableBeforeRangefeed { srcTenant.SQL.Exec(t, fmt.Sprintf("IMPORT INTO %s CSV DATA ($1)", table), dataSrv.URL) diff --git a/pkg/cmd/roachtest/tests/BUILD.bazel b/pkg/cmd/roachtest/tests/BUILD.bazel index faa6ea9ae870..22ca7615ce51 100644 --- a/pkg/cmd/roachtest/tests/BUILD.bazel +++ b/pkg/cmd/roachtest/tests/BUILD.bazel @@ -180,6 +180,7 @@ go_library( "//pkg/ccl/changefeedccl", "//pkg/ccl/changefeedccl/cdctest", "//pkg/ccl/changefeedccl/changefeedbase", + "//pkg/ccl/streamingccl/replicationutils", "//pkg/cli", "//pkg/cli/clisqlclient", "//pkg/cloud", diff --git a/pkg/cmd/roachtest/tests/cluster_to_cluster.go b/pkg/cmd/roachtest/tests/cluster_to_cluster.go index 6c6506418632..cc95c1b80c94 100644 --- a/pkg/cmd/roachtest/tests/cluster_to_cluster.go +++ b/pkg/cmd/roachtest/tests/cluster_to_cluster.go @@ -22,6 +22,7 @@ import ( "strings" "time" + "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationutils" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/clusterstats" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option" @@ -508,14 +509,14 @@ func (rd *replicationDriver) runWorkload(ctx context.Context) error { return rd.rs.workload.runDriver(ctx, rd.c, rd.t, rd.setup) } -func (rd *replicationDriver) waitForHighWatermark(ingestionJobID int, wait time.Duration) { +func (rd *replicationDriver) waitForReplicatedTime(ingestionJobID int, wait time.Duration) { 
testutils.SucceedsWithin(rd.t, func() error { info, err := getStreamIngestionJobInfo(rd.setup.dst.db, ingestionJobID) if err != nil { return err } if info.GetHighWater().IsZero() { - return errors.New("no high watermark") + return errors.New("no replicated time") } return nil }, wait) @@ -655,7 +656,7 @@ func (rd *replicationDriver) main(ctx context.Context) { }) rd.t.L().Printf("waiting for replication stream to finish ingesting initial scan") - rd.waitForHighWatermark(ingestionJobID, rd.rs.timeout/2) + rd.waitForReplicatedTime(ingestionJobID, rd.rs.timeout/2) rd.metrics.initialScanEnd = newMetricSnapshot(metricSnapper, timeutil.Now()) rd.t.Status(fmt.Sprintf(`initial scan complete. run workload and repl. stream for another %s minutes`, rd.rs.additionalDuration)) @@ -973,11 +974,8 @@ func (rrd *replResilienceDriver) getPhase() c2cPhase { rrd.dstJobID).Scan(&jobStatus) require.Equal(rrd.t, jobs.StatusRunning, jobs.Status(jobStatus)) - progress := getJobProgress(rrd.t, rrd.setup.dst.sysSQL, rrd.dstJobID) - streamIngestProgress := progress.GetStreamIngest() - highWater := progress.GetHighWater() - - if highWater == nil || highWater.IsEmpty() { + streamIngestProgress := getJobProgress(rrd.t, rrd.setup.dst.sysSQL, rrd.dstJobID).GetStreamIngest() + if streamIngestProgress.ReplicatedTime.IsEmpty() { return phaseInitialScan } if streamIngestProgress.CutoverTime.IsEmpty() { @@ -1135,13 +1133,16 @@ func getIngestionJobID(t test.Test, dstSQL *sqlutils.SQLRunner, dstTenantName st } type streamIngesitonJobInfo struct { - status string - errMsg string - highwaterTime time.Time - finishedTime time.Time + status string + errMsg string + replicatedTime time.Time + finishedTime time.Time } -func (c *streamIngesitonJobInfo) GetHighWater() time.Time { return c.highwaterTime } +// GetHighWater returns the replicated time. The GetHighWater name is +// retained here as this is implementing the jobInfo interface used by +// the latency verifier. +func (c *streamIngesitonJobInfo) GetHighWater() time.Time { return c.replicatedTime } func (c *streamIngesitonJobInfo) GetFinishedTime() time.Time { return c.finishedTime } func (c *streamIngesitonJobInfo) GetStatus() string { return c.status } func (c *streamIngesitonJobInfo) GetError() string { return c.status } @@ -1165,16 +1166,11 @@ func getStreamIngestionJobInfo(db *gosql.DB, jobID int) (jobInfo, error) { if err := protoutil.Unmarshal(progressBytes, &progress); err != nil { return nil, err } - var highwaterTime time.Time - highwater := progress.GetHighWater() - if highwater != nil { - highwaterTime = highwater.GoTime() - } return &streamIngesitonJobInfo{ - status: status, - errMsg: payload.Error, - highwaterTime: highwaterTime, - finishedTime: time.UnixMicro(payload.FinishedMicros), + status: status, + errMsg: payload.Error, + replicatedTime: replicationutils.ReplicatedTimeFromProgress(&progress).GoTime(), + finishedTime: time.UnixMicro(payload.FinishedMicros), }, nil } diff --git a/pkg/jobs/jobspb/jobs.proto b/pkg/jobs/jobspb/jobs.proto index cae15f376987..b274b7860d9b 100644 --- a/pkg/jobs/jobspb/jobs.proto +++ b/pkg/jobs/jobspb/jobs.proto @@ -151,6 +151,11 @@ message StreamIngestionProgress { // consistent state as of the CutoverTime. util.hlc.Timestamp cutover_time = 1 [(gogoproto.nullable) = false]; + // ReplicatedTime is the ingestion frontier. This is the canonical + // value of the frontier. The HighWater in the job progress is for + // informational purposes only. 
+ util.hlc.Timestamp replicated_time = 7 [(gogoproto.nullable) = false]; + // ReplicationStatus is the status of the tenant that has a replication // (ingestion) job. uint32 replication_status = 6 [ diff --git a/pkg/kv/txn.go b/pkg/kv/txn.go index 141bfa28b7ab..a18f2d971c0c 100644 --- a/pkg/kv/txn.go +++ b/pkg/kv/txn.go @@ -705,11 +705,6 @@ func (txn *Txn) commit(ctx context.Context) error { et := endTxnReq(true, txn.deadline()) ba := &kvpb.BatchRequest{Requests: et.unionArr[:]} _, pErr := txn.Send(ctx, ba) - if pErr == nil { - for _, t := range txn.commitTriggers { - t(ctx) - } - } return pErr.GoError() } @@ -1089,7 +1084,15 @@ func (txn *Txn) Send( sender := txn.mu.sender txn.mu.Unlock() br, pErr := txn.db.sendUsingSender(ctx, ba, sender) + if pErr == nil { + // Invoking the commit triggers here ensures they run even in the case when a + // commit request is issued manually (not via Commit). + if et, ok := ba.GetArg(kvpb.EndTxn); ok && et.(*kvpb.EndTxnRequest).Commit { + for _, t := range txn.commitTriggers { + t(ctx) + } + } return br, nil } diff --git a/pkg/kv/txn_test.go b/pkg/kv/txn_test.go index c599eab66f69..152e875d67cb 100644 --- a/pkg/kv/txn_test.go +++ b/pkg/kv/txn_test.go @@ -780,3 +780,58 @@ func TestTxnNegotiateAndSendWithResumeSpan(t *testing.T) { } }) } + +// TestTxnCommitTriggers tests the behavior of invoking commit triggers, as part +// of a Commit or a manual EndTxnRequest that includes a commit. +func TestTxnCommitTriggers(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + + for _, test := range []struct { + name string + // A function that specifies how a transaction ends. + endTxnFn func(txn *Txn) error + // Assuming a trigger bool value starts off as false, expTrigger is the + // expected value of the trigger after the transaction ends. + expTrigger bool + }{ + { + name: "explicit commit", + endTxnFn: func(txn *Txn) error { return txn.Commit(ctx) }, + expTrigger: true, + }, + { + name: "manual commit", + endTxnFn: func(txn *Txn) error { + b := txn.NewBatch() + b.AddRawRequest(&kvpb.EndTxnRequest{Commit: true}) + return txn.Run(ctx, b) + }, + expTrigger: true, + }, + { + name: "manual abort", + endTxnFn: func(txn *Txn) error { + b := txn.NewBatch() + b.AddRawRequest(&kvpb.EndTxnRequest{Commit: false}) + return txn.Run(ctx, b) + }, + expTrigger: false, + }, + } { + t.Run(test.name, func(t *testing.T) { + clock := hlc.NewClockForTesting(nil) + db := NewDB(log.MakeTestingAmbientCtxWithNewTracer(), newTestTxnFactory(nil), clock, stopper) + txn := NewTxn(ctx, db, 0 /* gatewayNodeID */) + triggerVal := false + triggerFn := func(ctx context.Context) { triggerVal = true } + txn.AddCommitTrigger(triggerFn) + err := test.endTxnFn(txn) + require.NoError(t, err) + require.Equal(t, test.expTrigger, triggerVal) + }) + } +} diff --git a/pkg/repstream/streampb/stream.proto b/pkg/repstream/streampb/stream.proto index 86fe2ba749c6..335bd5f56aea 100644 --- a/pkg/repstream/streampb/stream.proto +++ b/pkg/repstream/streampb/stream.proto @@ -38,18 +38,18 @@ message ReplicationProducerSpec { // StreamPartitionSpec is the stream partition specification. message StreamPartitionSpec { - // PreviousHighWaterTimestamp specifies the timestamp from which spans will + // PreviousReplicatedTimestamp specifies the timestamp from which spans will // start ingesting data in the replication job. 
This timestamp is empty unless // the replication job resumes after a progress checkpoint has been recorded. // While it is empty we use the InitialScanTimestamp described below. - util.hlc.Timestamp previous_high_water_timestamp = 1 [(gogoproto.nullable) = false]; + util.hlc.Timestamp previous_replicated_timestamp = 1 [(gogoproto.nullable) = false]; // InitialScanTimestamp is the timestamp at which the partition will run the // initial rangefeed scan before replicating further changes to the target // spans. This timestamp is always non-empty, but a partition will only run an // initial scan if no progress has been recorded prior to the current // resumption of the replication job. Otherwise, all spans will start - // ingesting data from the PreviousHighWaterTimestamp described above. + // ingesting data from the PreviousReplicatedTimestamp described above. util.hlc.Timestamp initial_scan_timestamp = 4 [(gogoproto.nullable) = false]; // List of spans to stream. diff --git a/pkg/sql/execinfrapb/processors_bulk_io.proto b/pkg/sql/execinfrapb/processors_bulk_io.proto index e1994ffc0811..ca76765ca68c 100644 --- a/pkg/sql/execinfrapb/processors_bulk_io.proto +++ b/pkg/sql/execinfrapb/processors_bulk_io.proto @@ -204,18 +204,18 @@ message StreamIngestionDataSpec { // PartitionSpecs maps partition IDs to their specifications. map partition_specs = 6 [(gogoproto.nullable) = false]; - // PreviousHighWaterTimestamp specifies the timestamp from which spans will + // PreviousReplicatedTimestamp specifies the timestamp from which spans will // start ingesting data in the replication job. This timestamp is empty unless // the replication job resumes after a progress checkpoint has been recorded. // While it is empty we use the InitialScanTimestamp described below. - optional util.hlc.Timestamp previous_high_water_timestamp = 2 [(gogoproto.nullable) = false]; + optional util.hlc.Timestamp previous_replicated_timestamp = 2 [(gogoproto.nullable) = false]; // InitialScanTimestamp is the timestamp at which the partition will run the // initial rangefeed scan before replicating further changes to the target // spans. This timestamp is always non-empty, but a partition will only run an // initial scan if no progress has been recorded prior to the current // resumption of the replication job. Otherwise, all spans will start - // ingesting data from the PreviousHighWaterTimestamp described above. + // ingesting data from the PreviousReplicatedTimestamp described above. optional util.hlc.Timestamp initial_scan_timestamp = 11 [(gogoproto.nullable) = false]; // StreamAddress locate the stream so that a stream client can be initialized. @@ -231,11 +231,11 @@ message StreamIngestionDataSpec { } message StreamIngestionFrontierSpec { - // HighWaterAtStart is set by the ingestion job when initializing the frontier + // ReplicatedTimeAtStart is set by the ingestion job when initializing the frontier // processor. It is used as sanity check by the frontier processor to ensure // that it does not receive updates at a timestamp lower than this field. This // consequently prevents the job progress from regressing during ingestion. - optional util.hlc.Timestamp high_water_at_start = 1 [(gogoproto.nullable) = false]; + optional util.hlc.Timestamp replicated_time_at_start = 1 [(gogoproto.nullable) = false]; // TrackedSpans is the entire span set being watched. The spans do not really // represent KV spans but uniquely identify the partitions in the ingestion // stream. 
Once all the partitions in the ingestion stream have been resolved