From 523b79d32bc0d8e4ef4d377bc0decc01a1a9171d Mon Sep 17 00:00:00 2001
From: adityamaru
Date: Wed, 30 Nov 2022 22:47:33 -0500
Subject: [PATCH] streamingccl: tighten replication timestamp semantics

Previously, each partition would reach out to the source cluster and pick its own timestamp from which it would start ingesting MVCC versions. This timestamp was used by the rangefeed set up by the partition to run its initial scan. Eventually, all the partitions would replicate up until a certain timestamp and cause the frontier to be bumped, but it was possible for different partitions to begin ingesting at different timestamps.

This change makes it such that during replication planning, when we create the producer job on the source cluster, we return a timestamp along with the StreamID. This becomes the timestamp at which each ingestion partition sets up the initial scan of the rangefeed, and consequently becomes the initial timestamp at which all data is ingested. We stash this timestamp in the replication job details and never update its value. On future resumptions of the replication job, if there is a progress high water, we will not run an initial rangefeed scan but will instead start the rangefeed from the previous progress high water.

The motivation for this change was to know the lower bound, on both the source and destination clusters, for the MVCC versions that have been streamed. This is necessary to bound the fingerprinting on both clusters to ensure a match.

Release note: None

Fixes: #92742
---
 docs/generated/sql/functions.md | 2 +-
 pkg/ccl/streamingccl/streamclient/client.go | 10 +-
 .../streamingccl/streamclient/client_test.go | 29 +++---
 .../streamclient/partitioned_stream_client.go | 26 +++--
 .../partitioned_stream_client_test.go | 14 ++-
 .../streamclient/random_stream_client.go | 15 ++-
 ...tream_ingestion_frontier_processor_test.go | 10 +-
 .../streamingest/stream_ingestion_job.go | 29 ++++--
 .../streamingest/stream_ingestion_job_test.go | 99 +++++++++++++++++++
 .../streamingest/stream_ingestion_planning.go | 13 ++-
 .../stream_ingestion_processor.go | 9 +-
 .../stream_ingestion_processor_planning.go | 18 ++--
 .../stream_ingestion_processor_test.go | 66 ++++++-------
 .../stream_replication_e2e_test.go | 7 --
 .../streamingccl/streamingtest/BUILD.bazel | 2 +
 .../streamingtest/replication_helpers.go | 16 +++
 .../streamproducer/event_stream.go | 17 ++--
 .../streamproducer/replication_manager.go | 4 +-
 .../streamproducer/replication_stream_test.go | 81 ++++++++-------
 .../streamproducer/stream_lifetime.go | 23 ++---
 pkg/jobs/jobspb/jobs.proto | 14 ++-
 pkg/repstream/streampb/stream.proto | 30 +++++-
 pkg/sql/exec_util.go | 5 +
 pkg/sql/execinfrapb/processors_bulk_io.proto | 16 ++-
 pkg/sql/sem/builtins/fixed_oids.go | 2 +-
 pkg/sql/sem/builtins/replication_builtins.go | 10 +-
 pkg/sql/sem/eval/context.go | 2 +-
 27 files changed, 388 insertions(+), 181 deletions(-)

diff --git a/docs/generated/sql/functions.md b/docs/generated/sql/functions.md
index e3eea1cd12b4..9b0f90519e1e 100644
--- a/docs/generated/sql/functions.md
+++ b/docs/generated/sql/functions.md
@@ -2650,7 +2650,7 @@ The swap_ordinate_string parameter is a 2-character string naming the ordinates
Volatile
crdb_internal.replication_stream_spec(stream_id: int) → bytes

This function can be used on the consumer side to get a replication stream specification for the specified stream. The consumer will later call ‘stream_partition’ to a partition with the spec to start streaming.

Volatile -crdb_internal.start_replication_stream(tenant_name: string) → int

This function can be used on the producer side to start a replication stream for the specified tenant. The returned stream ID uniquely identifies created stream. The caller must periodically invoke crdb_internal.heartbeat_stream() function to notify that the replication is still ongoing.

+crdb_internal.start_replication_stream(tenant_name: string) → bytes

This function can be used on the producer side to start a replication stream for the specified tenant. The returned stream ID uniquely identifies created stream. The caller must periodically invoke crdb_internal.heartbeat_stream() function to notify that the replication is still ongoing.

Volatile crdb_internal.stream_ingestion_stats_json(job_id: int) → jsonb

This function can be used on the ingestion side to get a statistics summary of a stream ingestion job in json format.

Volatile diff --git a/pkg/ccl/streamingccl/streamclient/client.go b/pkg/ccl/streamingccl/streamclient/client.go index 6462ed731f07..e7e1632b80e4 100644 --- a/pkg/ccl/streamingccl/streamclient/client.go +++ b/pkg/ccl/streamingccl/streamclient/client.go @@ -47,7 +47,7 @@ type Client interface { // Create initializes a stream with the source, potentially reserving any // required resources, such as protected timestamps, and returns an ID which // can be used to interact with this stream in the future. - Create(ctx context.Context, tenant roachpb.TenantName) (streampb.StreamID, error) + Create(ctx context.Context, tenant roachpb.TenantName) (streampb.ReplicationProducerSpec, error) // Dial checks if the source is able to be connected to for queries Dial(ctx context.Context) error @@ -74,12 +74,8 @@ type Client interface { // the specified remote address. This is used by each consumer processor to // open its subscription to its partition of a larger stream. // TODO(dt): ts -> checkpointToken. - Subscribe( - ctx context.Context, - streamID streampb.StreamID, - spec SubscriptionToken, - checkpoint hlc.Timestamp, - ) (Subscription, error) + Subscribe(ctx context.Context, streamID streampb.StreamID, spec SubscriptionToken, + initialScanTime hlc.Timestamp, previousHighWater hlc.Timestamp) (Subscription, error) // Close releases all the resources used by this client. Close(ctx context.Context) error diff --git a/pkg/ccl/streamingccl/streamclient/client_test.go b/pkg/ccl/streamingccl/streamclient/client_test.go index 7f8a1c5db78c..a10987b25648 100644 --- a/pkg/ccl/streamingccl/streamclient/client_test.go +++ b/pkg/ccl/streamingccl/streamclient/client_test.go @@ -23,6 +23,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" ) @@ -32,19 +33,22 @@ type testStreamClient struct{} var _ Client = testStreamClient{} // Dial implements Client interface. -func (sc testStreamClient) Dial(ctx context.Context) error { +func (sc testStreamClient) Dial(_ context.Context) error { return nil } // Create implements the Client interface. func (sc testStreamClient) Create( _ context.Context, _ roachpb.TenantName, -) (streampb.StreamID, error) { - return streampb.StreamID(1), nil +) (streampb.ReplicationProducerSpec, error) { + return streampb.ReplicationProducerSpec{ + StreamID: streampb.StreamID(1), + ReplicationStartTime: hlc.Timestamp{WallTime: timeutil.Now().UnixNano()}, + }, nil } // Plan implements the Client interface. -func (sc testStreamClient) Plan(ctx context.Context, ID streampb.StreamID) (Topology, error) { +func (sc testStreamClient) Plan(_ context.Context, _ streampb.StreamID) (Topology, error) { return Topology{ Partitions: []PartitionInfo{ { @@ -59,19 +63,19 @@ func (sc testStreamClient) Plan(ctx context.Context, ID streampb.StreamID) (Topo // Heartbeat implements the Client interface. func (sc testStreamClient) Heartbeat( - ctx context.Context, ID streampb.StreamID, _ hlc.Timestamp, + _ context.Context, _ streampb.StreamID, _ hlc.Timestamp, ) (streampb.StreamReplicationStatus, error) { return streampb.StreamReplicationStatus{}, nil } // Close implements the Client interface. -func (sc testStreamClient) Close(ctx context.Context) error { +func (sc testStreamClient) Close(_ context.Context) error { return nil } // Subscribe implements the Client interface. 
func (sc testStreamClient) Subscribe( - ctx context.Context, stream streampb.StreamID, spec SubscriptionToken, checkpoint hlc.Timestamp, + _ context.Context, _ streampb.StreamID, _ SubscriptionToken, _ hlc.Timestamp, _ hlc.Timestamp, ) (Subscription, error) { sampleKV := roachpb.KeyValue{ Key: []byte("key_1"), @@ -97,9 +101,7 @@ func (sc testStreamClient) Subscribe( } // Complete implements the streamclient.Client interface. -func (sc testStreamClient) Complete( - ctx context.Context, streamID streampb.StreamID, successfulIngestion bool, -) error { +func (sc testStreamClient) Complete(_ context.Context, _ streampb.StreamID, _ bool) error { return nil } @@ -108,7 +110,7 @@ type testStreamSubscription struct { } // Subscribe implements the Subscription interface. -func (t testStreamSubscription) Subscribe(ctx context.Context) error { +func (t testStreamSubscription) Subscribe(_ context.Context) error { return nil } @@ -189,10 +191,11 @@ func ExampleClient() { _ = client.Close(ctx) }() - id, err := client.Create(ctx, "system") + prs, err := client.Create(ctx, "system") if err != nil { panic(err) } + id := prs.StreamID var ingested struct { ts hlc.Timestamp @@ -233,7 +236,7 @@ func ExampleClient() { for _, partition := range topology.Partitions { // TODO(dt): use Subscribe helper and partition.SrcAddr - sub, err := client.Subscribe(ctx, id, partition.SubscriptionToken, ts) + sub, err := client.Subscribe(ctx, id, partition.SubscriptionToken, hlc.Timestamp{}, ts) if err != nil { panic(err) } diff --git a/pkg/ccl/streamingccl/streamclient/partitioned_stream_client.go b/pkg/ccl/streamingccl/streamclient/partitioned_stream_client.go index 649fd133d7c7..dce354632b4d 100644 --- a/pkg/ccl/streamingccl/streamclient/partitioned_stream_client.go +++ b/pkg/ccl/streamingccl/streamclient/partitioned_stream_client.go @@ -74,21 +74,24 @@ var _ Client = &partitionedStreamClient{} // Create implements Client interface. func (p *partitionedStreamClient) Create( ctx context.Context, tenantName roachpb.TenantName, -) (streampb.StreamID, error) { +) (streampb.ReplicationProducerSpec, error) { ctx, sp := tracing.ChildSpan(ctx, "streamclient.Client.Create") defer sp.Finish() p.mu.Lock() defer p.mu.Unlock() - var streamID streampb.StreamID + var rawReplicationProducerSpec []byte row := p.mu.srcConn.QueryRow(ctx, `SELECT crdb_internal.start_replication_stream($1)`, tenantName) - err := row.Scan(&streamID) + err := row.Scan(&rawReplicationProducerSpec) if err != nil { - return streampb.InvalidStreamID, - errors.Wrapf(err, "error creating replication stream for tenant %s", tenantName) + return streampb.ReplicationProducerSpec{}, errors.Wrapf(err, "error creating replication stream for tenant %s", tenantName) + } + var replicationProducerSpec streampb.ReplicationProducerSpec + if err := protoutil.Unmarshal(rawReplicationProducerSpec, &replicationProducerSpec); err != nil { + return streampb.ReplicationProducerSpec{}, err } - return streamID, err + return replicationProducerSpec, err } // Dial implements Client interface. @@ -195,7 +198,11 @@ func (p *partitionedStreamClient) Close(ctx context.Context) error { // Subscribe implements Client interface. 
func (p *partitionedStreamClient) Subscribe( - ctx context.Context, stream streampb.StreamID, spec SubscriptionToken, checkpoint hlc.Timestamp, + ctx context.Context, + streamID streampb.StreamID, + spec SubscriptionToken, + initialScanTime hlc.Timestamp, + previousHighWater hlc.Timestamp, ) (Subscription, error) { _, sp := tracing.ChildSpan(ctx, "streamclient.Client.Subscribe") defer sp.Finish() @@ -204,7 +211,8 @@ func (p *partitionedStreamClient) Subscribe( if err := protoutil.Unmarshal(spec, &sps); err != nil { return nil, err } - sps.StartFrom = checkpoint + sps.InitialScanTimestamp = initialScanTime + sps.PreviousHighWaterTimestamp = previousHighWater specBytes, err := protoutil.Marshal(&sps) if err != nil { @@ -215,7 +223,7 @@ func (p *partitionedStreamClient) Subscribe( eventsChan: make(chan streamingccl.Event), srcConnConfig: p.pgxConfig, specBytes: specBytes, - streamID: stream, + streamID: streamID, closeChan: make(chan struct{}), } p.mu.Lock() diff --git a/pkg/ccl/streamingccl/streamclient/partitioned_stream_client_test.go b/pkg/ccl/streamingccl/streamclient/partitioned_stream_client_test.go index e074b1e8e21f..d96128b89dbb 100644 --- a/pkg/ccl/streamingccl/streamclient/partitioned_stream_client_test.go +++ b/pkg/ccl/streamingccl/streamclient/partitioned_stream_client_test.go @@ -128,8 +128,9 @@ INSERT INTO d.t2 VALUES (2); [][]string{{string(status)}}) } - streamID, err := client.Create(ctx, testTenantName) + rps, err := client.Create(ctx, testTenantName) require.NoError(t, err) + streamID := rps.StreamID // We can create multiple replication streams for the same tenant. _, err = client.Create(ctx, testTenantName) require.NoError(t, err) @@ -166,6 +167,8 @@ INSERT INTO d.t2 VALUES (2); require.NoError(t, err) require.Equal(t, streampb.StreamReplicationStatus_STREAM_INACTIVE, status.StreamStatus) + initialScanTimestamp := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} + // Testing client.Subscribe() makePartitionSpec := func(tables ...string) *streampb.StreamPartitionSpec { var spans []roachpb.Span @@ -176,7 +179,8 @@ INSERT INTO d.t2 VALUES (2); } return &streampb.StreamPartitionSpec{ - Spans: spans, + InitialScanTimestamp: initialScanTimestamp, + Spans: spans, Config: streampb.StreamPartitionSpec_ExecutionConfig{ MinCheckpointFrequency: 10 * time.Millisecond, }, @@ -199,7 +203,8 @@ INSERT INTO d.t2 VALUES (2); require.NoError(t, subClient.Close(ctx)) }() require.NoError(t, err) - sub, err := subClient.Subscribe(ctx, streamID, encodeSpec("t1"), hlc.Timestamp{}) + sub, err := subClient.Subscribe(ctx, streamID, encodeSpec("t1"), + initialScanTimestamp, hlc.Timestamp{}) require.NoError(t, err) rf := streamingtest.MakeReplicationFeed(t, &subscriptionFeedSource{sub: sub}) @@ -244,8 +249,9 @@ INSERT INTO d.t2 VALUES (2); h.SysSQL.Exec(t, ` SET CLUSTER SETTING stream_replication.stream_liveness_track_frequency = '200ms'; `) - streamID, err = client.Create(ctx, testTenantName) + rps, err = client.Create(ctx, testTenantName) require.NoError(t, err) + streamID = rps.StreamID require.NoError(t, client.Complete(ctx, streamID, true)) h.SysSQL.CheckQueryResultsRetry(t, fmt.Sprintf("SELECT status FROM [SHOW JOBS] WHERE job_id = %d", streamID), [][]string{{"succeeded"}}) diff --git a/pkg/ccl/streamingccl/streamclient/random_stream_client.go b/pkg/ccl/streamingccl/streamclient/random_stream_client.go index 3b3892a19315..61e7e3462678 100644 --- a/pkg/ccl/streamingccl/streamclient/random_stream_client.go +++ b/pkg/ccl/streamingccl/streamclient/random_stream_client.go @@ -383,9 +383,12 @@ 
func (m *RandomStreamClient) Plan(ctx context.Context, _ streampb.StreamID) (Top // Create implements the Client interface. func (m *RandomStreamClient) Create( ctx context.Context, tenantName roachpb.TenantName, -) (streampb.StreamID, error) { +) (streampb.ReplicationProducerSpec, error) { log.Infof(ctx, "creating random stream for tenant %s", tenantName) - return streampb.StreamID(1), nil + return streampb.ReplicationProducerSpec{ + StreamID: streampb.StreamID(1), + ReplicationStartTime: hlc.Timestamp{WallTime: timeutil.Now().UnixNano()}, + }, nil } // Heartbeat implements the Client interface. @@ -457,7 +460,11 @@ func (m *RandomStreamClient) Close(_ context.Context) error { // Subscribe implements the Client interface. func (m *RandomStreamClient) Subscribe( - ctx context.Context, stream streampb.StreamID, spec SubscriptionToken, checkpoint hlc.Timestamp, + _ context.Context, + _ streampb.StreamID, + spec SubscriptionToken, + initialScanTime hlc.Timestamp, + _ hlc.Timestamp, ) (Subscription, error) { partitionURL, err := url.Parse(string(spec)) if err != nil { @@ -471,7 +478,7 @@ func (m *RandomStreamClient) Subscribe( eventCh := make(chan streamingccl.Event) now := timeutil.Now() - startWalltime := timeutil.Unix(0 /* sec */, checkpoint.WallTime) + startWalltime := timeutil.Unix(0 /* sec */, initialScanTime.WallTime) if startWalltime.After(now) { panic("cannot start random stream client event stream in the future") } diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor_test.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor_test.go index 0c24d4524ca6..94004575dd11 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor_test.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor_test.go @@ -35,6 +35,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/limit" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/stretchr/testify/require" ) @@ -245,7 +246,10 @@ func TestStreamIngestionFrontierProcessor(t *testing.T) { OldID: roachpb.MustMakeTenantID(tenantID), NewID: roachpb.MustMakeTenantID(tenantID + 10), } - spec.StartTime = tc.frontierStartTime + spec.PreviousHighWaterTimestamp = tc.frontierStartTime + if tc.frontierStartTime.IsEmpty() { + spec.InitialScanTimestamp = hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} + } spec.Checkpoint.ResolvedSpans = tc.jobCheckpoint proc, err := newStreamIngestionDataProcessor(ctx, &flowCtx, 0 /* processorID */, spec, &post, out) require.NoError(t, err) @@ -331,8 +335,8 @@ func TestStreamIngestionFrontierProcessor(t *testing.T) { require.NoError(t, err) progress := job.Progress().Progress if progress == nil { - if !heartbeatTs.Equal(spec.StartTime) { - t.Fatalf("heartbeat %v should equal start time of %v", heartbeatTs, spec.StartTime) + if !heartbeatTs.Equal(spec.InitialScanTimestamp) { + t.Fatalf("heartbeat %v should equal start time of %v", heartbeatTs, spec.InitialScanTimestamp) } } else { persistedHighwater := *progress.(*jobspb.Progress_HighWater).HighWater diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go index b3e1478b159a..123162fab64b 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go @@ -214,10 +214,14 @@ func ingest(ctx context.Context, execCtx 
sql.JobExecContext, ingestionJob *jobs. progress := ingestionJob.Progress() streamAddress := streamingccl.StreamAddress(details.StreamAddress) - startTime := progress.GetStreamIngest().StartTime + var previousHighWater, heartbeatTimestamp hlc.Timestamp + initialScanTimestamp := details.ReplicationStartTime // Start from the last checkpoint if it exists. if h := progress.GetHighWater(); h != nil && !h.IsEmpty() { - startTime = *h + previousHighWater = *h + heartbeatTimestamp = previousHighWater + } else { + heartbeatTimestamp = initialScanTimestamp } // Initialize a stream client and resolve topology. @@ -229,7 +233,7 @@ func ingest(ctx context.Context, execCtx sql.JobExecContext, ingestionJob *jobs. streamID := streampb.StreamID(details.StreamID) updateRunningStatus(ctx, ingestionJob, fmt.Sprintf("connecting to the producer job %d "+ "and creating a stream replication plan", streamID)) - if err := waitUntilProducerActive(ctx, client, streamID, startTime, ingestionJob.ID()); err != nil { + if err := waitUntilProducerActive(ctx, client, streamID, heartbeatTimestamp, ingestionJob.ID()); err != nil { return err } @@ -241,9 +245,6 @@ func ingest(ctx context.Context, execCtx sql.JobExecContext, ingestionJob *jobs. // TODO(casper): update running status err = ingestionJob.Update(ctx, nil, func(txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error { - if md.Progress.GetStreamIngest().StartTime.Less(startTime) { - md.Progress.GetStreamIngest().StartTime = startTime - } md.Progress.GetStreamIngest().StreamAddresses = topology.StreamAddresses() ju.UpdateProgress(md.Progress) return nil @@ -252,8 +253,14 @@ func ingest(ctx context.Context, execCtx sql.JobExecContext, ingestionJob *jobs. return errors.Wrap(err, "failed to update job progress") } - log.Infof(ctx, "ingestion job %d resumes stream ingestion from start time %s", - ingestionJob.ID(), progress.GetStreamIngest().StartTime) + if previousHighWater.IsEmpty() { + log.Infof(ctx, "ingestion job %d resumes stream ingestion from start time %s", + ingestionJob.ID(), initialScanTimestamp) + } else { + log.Infof(ctx, "ingestion job %d resumes stream ingestion from start time %s", + ingestionJob.ID(), previousHighWater) + } + ingestProgress := progress.Details.(*jobspb.Progress_StreamIngest).StreamIngest checkpoint := ingestProgress.Checkpoint @@ -268,12 +275,16 @@ func ingest(ctx context.Context, execCtx sql.JobExecContext, ingestionJob *jobs. // Construct stream ingestion processor specs. streamIngestionSpecs, streamIngestionFrontierSpec, err := distStreamIngestionPlanSpecs( - streamAddress, topology, sqlInstanceIDs, progress.GetStreamIngest().StartTime, checkpoint, + streamAddress, topology, sqlInstanceIDs, initialScanTimestamp, previousHighWater, checkpoint, ingestionJob.ID(), streamID, topology.SourceTenantID, details.DestinationTenantID) if err != nil { return err } + if knobs := execCtx.ExecCfg().StreamingTestingKnobs; knobs != nil && knobs.AfterReplicationFlowPlan != nil { + knobs.AfterReplicationFlowPlan(streamIngestionSpecs, streamIngestionFrontierSpec) + } + // Plan and run the DistSQL flow. 
log.Infof(ctx, "starting to run DistSQL flow for stream ingestion job %d", ingestionJob.ID()) diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go index 9ab47e13ca54..49655b4ea767 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go @@ -28,6 +28,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/repstream/streampb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security/username" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/jobutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" @@ -324,3 +326,100 @@ func TestCutoverBuiltin(t *testing.T) { require.True(t, ok) require.Equal(t, hlc.Timestamp{WallTime: highWater.UnixNano()}, sp.StreamIngest.CutoverTime) } + +// TestReplicationJobResumptionStartTime tests that a replication job picks the +// correct timestamps to resume from across multiple resumptions. +func TestReplicationJobResumptionStartTime(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + planned := make(chan struct{}) + canContinue := make(chan struct{}) + args := defaultTenantStreamingClustersArgs + + replicationSpecs := make([]*execinfrapb.StreamIngestionDataSpec, 0) + frontier := &execinfrapb.StreamIngestionFrontierSpec{} + args.testingKnobs = &sql.StreamingTestingKnobs{ + AfterReplicationFlowPlan: func(ingestionSpecs []*execinfrapb.StreamIngestionDataSpec, + frontierSpec *execinfrapb.StreamIngestionFrontierSpec) { + replicationSpecs = ingestionSpecs + frontier = frontierSpec + planned <- struct{}{} + <-canContinue + }, + } + c, cleanup := createTenantStreamingClusters(ctx, t, args) + defer cleanup() + defer close(planned) + defer close(canContinue) + + producerJobID, replicationJobID := c.startStreamReplication() + jobutils.WaitForJobToRun(c.t, c.srcSysSQL, jobspb.JobID(producerJobID)) + jobutils.WaitForJobToRun(c.t, c.destSysSQL, jobspb.JobID(replicationJobID)) + + // Wait for the distsql plan to be created. + <-planned + registry := c.destSysServer.ExecutorConfig().(sql.ExecutorConfig).JobRegistry + var replicationJobDetails jobspb.StreamIngestionDetails + require.NoError(t, c.destSysServer.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + j, err := registry.LoadJobWithTxn(ctx, jobspb.JobID(replicationJobID), txn) + require.NoError(t, err) + var ok bool + replicationJobDetails, ok = j.Details().(jobspb.StreamIngestionDetails) + if !ok { + t.Fatalf("job with id %d is not a stream ingestion job", replicationJobID) + } + return nil + })) + + // Let's verify the timestamps on the first resumption of the replication job. + startTime := replicationJobDetails.ReplicationStartTime + require.NotEmpty(t, startTime) + + for _, r := range replicationSpecs { + require.Equal(t, startTime, r.InitialScanTimestamp) + require.Empty(t, r.PreviousHighWaterTimestamp) + } + require.Empty(t, frontier.HighWaterAtStart) + + // Allow the job to make some progress. + canContinue <- struct{}{} + srcTime := c.srcCluster.Server(0).Clock().Now() + c.waitUntilHighWatermark(srcTime, jobspb.JobID(replicationJobID)) + + // Pause the job. 
+ c.destSysSQL.Exec(t, `PAUSE JOB $1`, replicationJobID) + jobutils.WaitForJobToPause(c.t, c.destSysSQL, jobspb.JobID(replicationJobID)) + + // Unpause the job and ensure the resumption takes place at a later timestamp + // than the initial scan timestamp. + c.destSysSQL.Exec(t, `RESUME JOB $1`, replicationJobID) + jobutils.WaitForJobToRun(c.t, c.srcSysSQL, jobspb.JobID(producerJobID)) + jobutils.WaitForJobToRun(c.t, c.destSysSQL, jobspb.JobID(replicationJobID)) + + <-planned + stats := streamIngestionStats(t, c.destSysSQL, replicationJobID) + + // Assert that the start time hasn't changed. + require.Equal(t, startTime, stats.IngestionDetails.ReplicationStartTime) + + // Assert that the previous highwater mark is greater than the replication + // start time. + var previousHighWaterTimestamp hlc.Timestamp + for _, r := range replicationSpecs { + require.Equal(t, startTime, r.InitialScanTimestamp) + require.True(t, r.InitialScanTimestamp.Less(r.PreviousHighWaterTimestamp)) + if previousHighWaterTimestamp.IsEmpty() { + previousHighWaterTimestamp = r.PreviousHighWaterTimestamp + } else { + require.Equal(t, r.PreviousHighWaterTimestamp, previousHighWaterTimestamp) + } + } + require.Equal(t, frontier.HighWaterAtStart, previousHighWaterTimestamp) + canContinue <- struct{}{} + srcTime = c.srcCluster.Server(0).Clock().Now() + c.waitUntilHighWatermark(srcTime, jobspb.JobID(replicationJobID)) + c.cutover(producerJobID, replicationJobID, srcTime.GoTime()) + jobutils.WaitForJobToSucceed(t, c.destSysSQL, jobspb.JobID(replicationJobID)) +} diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go index 763afdeabb0d..c40e7ffa3226 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go @@ -158,9 +158,10 @@ func ingestionPlanHook( if err != nil { return err } - // Create the producer job first for the purpose of observability, - // user is able to know the producer job id immediately after executing the RESTORE. - streamID, err := client.Create(ctx, roachpb.TenantName(sourceTenant)) + // Create the producer job first for the purpose of observability, user is + // able to know the producer job id immediately after executing + // CREATE TENANT ... FROM REPLICATION. 
+ replicationProducerSpec, err := client.Create(ctx, roachpb.TenantName(sourceTenant)) if err != nil { return err } @@ -176,12 +177,13 @@ func ingestionPlanHook( } streamIngestionDetails := jobspb.StreamIngestionDetails{ StreamAddress: string(streamAddress), - StreamID: uint64(streamID), + StreamID: uint64(replicationProducerSpec.StreamID), Span: roachpb.Span{Key: prefix, EndKey: prefix.PrefixEnd()}, DestinationTenantID: destinationTenantID, SourceTenantName: roachpb.TenantName(sourceTenant), DestinationTenantName: roachpb.TenantName(destinationTenant), ReplicationTTLSeconds: int32(replicationTTLSeconds), + ReplicationStartTime: replicationProducerSpec.ReplicationStartTime, } jobDescription, err := streamIngestionJobDescription(p, ingestionStmt) @@ -202,7 +204,8 @@ func ingestionPlanHook( return err } - resultsCh <- tree.Datums{tree.NewDInt(tree.DInt(sj.ID())), tree.NewDInt(tree.DInt(streamID))} + resultsCh <- tree.Datums{tree.NewDInt(tree.DInt(sj.ID())), + tree.NewDInt(tree.DInt(replicationProducerSpec.StreamID))} return nil } diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go index 22aba9cbca3c..df8ee9fe0772 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go @@ -228,7 +228,7 @@ func newStreamIngestionDataProcessor( trackedSpans = append(trackedSpans, partitionSpec.Spans...) } - frontier, err := span.MakeFrontierAt(spec.StartTime, trackedSpans...) + frontier, err := span.MakeFrontierAt(spec.PreviousHighWaterTimestamp, trackedSpans...) if err != nil { return nil, err } @@ -325,15 +325,16 @@ func (sip *streamIngestionProcessor) Start(ctx context.Context) { sip.streamPartitionClients = append(sip.streamPartitionClients, streamClient) } - startTime := frontierForSpans(sip.frontier, partitionSpec.Spans...) + previousHighWater := frontierForSpans(sip.frontier, partitionSpec.Spans...) if streamingKnobs, ok := sip.FlowCtx.TestingKnobs().StreamingTestingKnobs.(*sql.StreamingTestingKnobs); ok { if streamingKnobs != nil && streamingKnobs.BeforeClientSubscribe != nil { - streamingKnobs.BeforeClientSubscribe(addr, string(token), startTime) + streamingKnobs.BeforeClientSubscribe(addr, string(token), previousHighWater) } } - sub, err := streamClient.Subscribe(ctx, streampb.StreamID(sip.spec.StreamID), token, startTime) + sub, err := streamClient.Subscribe(ctx, streampb.StreamID(sip.spec.StreamID), token, + sip.spec.InitialScanTimestamp, previousHighWater) if err != nil { sip.MoveToDraining(errors.Wrapf(err, "consuming partition %v", addr)) diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_planning.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_planning.go index de00caebbbbb..e973ce67c4bb 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_planning.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_planning.go @@ -30,7 +30,8 @@ func distStreamIngestionPlanSpecs( streamAddress streamingccl.StreamAddress, topology streamclient.Topology, sqlInstanceIDs []base.SQLInstanceID, - initialHighWater hlc.Timestamp, + initialScanTimestamp hlc.Timestamp, + previousHighWater hlc.Timestamp, checkpoint jobspb.StreamIngestionCheckpoint, jobID jobspb.JobID, streamID streampb.StreamID, @@ -48,12 +49,13 @@ func distStreamIngestionPlanSpecs( // the partition addresses. 
if i < len(sqlInstanceIDs) { spec := &execinfrapb.StreamIngestionDataSpec{ - StreamID: uint64(streamID), - JobID: int64(jobID), - StartTime: initialHighWater, - Checkpoint: checkpoint, // TODO: Only forward relevant checkpoint info - StreamAddress: string(streamAddress), - PartitionSpecs: make(map[string]execinfrapb.StreamIngestionPartitionSpec), + StreamID: uint64(streamID), + JobID: int64(jobID), + PreviousHighWaterTimestamp: previousHighWater, + InitialScanTimestamp: initialScanTimestamp, + Checkpoint: checkpoint, // TODO: Only forward relevant checkpoint info + StreamAddress: string(streamAddress), + PartitionSpecs: make(map[string]execinfrapb.StreamIngestionPartitionSpec), TenantRekey: execinfrapb.TenantRekey{ OldID: sourceTenantID, NewID: destinationTenantID, @@ -77,7 +79,7 @@ func distStreamIngestionPlanSpecs( // Create a spec for the StreamIngestionFrontier processor on the coordinator // node. streamIngestionFrontierSpec := &execinfrapb.StreamIngestionFrontierSpec{ - HighWaterAtStart: initialHighWater, + HighWaterAtStart: previousHighWater, TrackedSpans: trackedSpans, JobID: int64(jobID), StreamID: uint64(streamID), diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go index a997cc11ca9a..6d8f06db6c05 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go @@ -62,25 +62,25 @@ var _ streamclient.Client = &mockStreamClient{} // Create implements the Client interface. func (m *mockStreamClient) Create( _ context.Context, _ roachpb.TenantName, -) (streampb.StreamID, error) { +) (streampb.ReplicationProducerSpec, error) { panic("unimplemented") } // Dial implements the Client interface. -func (m *mockStreamClient) Dial(ctx context.Context) error { +func (m *mockStreamClient) Dial(_ context.Context) error { panic("unimplemented") } // Heartbeat implements the Client interface. func (m *mockStreamClient) Heartbeat( - ctx context.Context, ID streampb.StreamID, _ hlc.Timestamp, + _ context.Context, _ streampb.StreamID, _ hlc.Timestamp, ) (streampb.StreamReplicationStatus, error) { panic("unimplemented") } // Plan implements the Client interface. func (m *mockStreamClient) Plan( - ctx context.Context, _ streampb.StreamID, + _ context.Context, _ streampb.StreamID, ) (streamclient.Topology, error) { panic("unimplemented mock method") } @@ -90,7 +90,7 @@ type mockSubscription struct { } // Subscribe implements the Subscription interface. -func (m *mockSubscription) Subscribe(ctx context.Context) error { +func (m *mockSubscription) Subscribe(_ context.Context) error { return nil } @@ -107,16 +107,17 @@ func (m *mockSubscription) Err() error { // Subscribe implements the Client interface. 
func (m *mockStreamClient) Subscribe( ctx context.Context, - stream streampb.StreamID, + _ streampb.StreamID, token streamclient.SubscriptionToken, - startTime hlc.Timestamp, + initialScanTime hlc.Timestamp, + _ hlc.Timestamp, ) (streamclient.Subscription, error) { var events []streamingccl.Event var ok bool if events, ok = m.partitionEvents[string(token)]; !ok { return nil, errors.Newf("no events found for partition %s", string(token)) } - log.Infof(ctx, "%q beginning subscription from time %v ", string(token), startTime) + log.Infof(ctx, "%q beginning subscription from time %v ", string(token), initialScanTime) log.Infof(ctx, "%q emitting %d events", string(token), len(events)) eventCh := make(chan streamingccl.Event, len(events)) @@ -137,14 +138,12 @@ func (m *mockStreamClient) Subscribe( } // Close implements the Client interface. -func (m *mockStreamClient) Close(ctx context.Context) error { +func (m *mockStreamClient) Close(_ context.Context) error { return nil } // Complete implements the streamclient.Client interface. -func (m *mockStreamClient) Complete( - ctx context.Context, streamID streampb.StreamID, successfulIngestion bool, -) error { +func (m *mockStreamClient) Complete(_ context.Context, _ streampb.StreamID, _ bool) error { return nil } @@ -155,18 +154,17 @@ var _ streamclient.Client = &errorStreamClient{} // ConsumePartition implements the streamclient.Client interface. func (m *errorStreamClient) Subscribe( - ctx context.Context, - stream streampb.StreamID, - spec streamclient.SubscriptionToken, - checkpoint hlc.Timestamp, + _ context.Context, + _ streampb.StreamID, + _ streamclient.SubscriptionToken, + _ hlc.Timestamp, + _ hlc.Timestamp, ) (streamclient.Subscription, error) { return nil, errors.New("this client always returns an error") } // Complete implements the streamclient.Client interface. 
-func (m *errorStreamClient) Complete( - ctx context.Context, streamID streampb.StreamID, successfulIngestion bool, -) error { +func (m *errorStreamClient) Complete(_ context.Context, _ streampb.StreamID, _ bool) error { return nil } @@ -241,7 +239,7 @@ func TestStreamIngestionProcessor(t *testing.T) { partitionEvents: map[string][]streamingccl.Event{string(p1): events(), string(p2): events()}, } - startTime := hlc.Timestamp{WallTime: 1} + initialScanTimestamp := hlc.Timestamp{WallTime: 1} partitions := []streamclient.PartitionInfo{ {ID: "1", SubscriptionToken: p1, Spans: []roachpb.Span{p1Span}}, {ID: "2", SubscriptionToken: p2, Spans: []roachpb.Span{p2Span}}, @@ -250,7 +248,7 @@ func TestStreamIngestionProcessor(t *testing.T) { Partitions: partitions, } out, err := runStreamIngestionProcessor(ctx, t, registry, kvDB, - topology, startTime, []jobspb.ResolvedSpan{}, tenantRekey, + topology, initialScanTimestamp, []jobspb.ResolvedSpan{}, tenantRekey, mockClient, nil /* cutoverProvider */, nil /* streamingTestingKnobs */) require.NoError(t, err) @@ -279,7 +277,7 @@ func TestStreamIngestionProcessor(t *testing.T) { partitionEvents: map[string][]streamingccl.Event{string(p1): events(), string(p2): events()}, } - startTime := hlc.Timestamp{WallTime: 1} + initialScanTimestamp := hlc.Timestamp{WallTime: 1} partitions := []streamclient.PartitionInfo{ {ID: "1", SubscriptionToken: p1, Spans: []roachpb.Span{p1Span}}, {ID: "2", SubscriptionToken: p2, Spans: []roachpb.Span{p2Span}}, @@ -297,7 +295,7 @@ func TestStreamIngestionProcessor(t *testing.T) { lastClientStart[token] = clientStartTime }} out, err := runStreamIngestionProcessor(ctx, t, registry, kvDB, - topology, startTime, checkpoint, tenantRekey, mockClient, + topology, initialScanTimestamp, checkpoint, tenantRekey, mockClient, nil /* cutoverProvider */, streamingTestingKnobs) require.NoError(t, err) @@ -315,7 +313,7 @@ func TestStreamIngestionProcessor(t *testing.T) { }) t.Run("error stream client", func(t *testing.T) { - startTime := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} + initialScanTimestamp := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} partitions := []streamclient.PartitionInfo{ {SubscriptionToken: streamclient.SubscriptionToken("1")}, {SubscriptionToken: streamclient.SubscriptionToken("2")}, @@ -324,7 +322,7 @@ func TestStreamIngestionProcessor(t *testing.T) { Partitions: partitions, } out, err := runStreamIngestionProcessor(ctx, t, registry, kvDB, - topology, startTime, []jobspb.ResolvedSpan{}, tenantRekey, &errorStreamClient{}, + topology, initialScanTimestamp, []jobspb.ResolvedSpan{}, tenantRekey, &errorStreamClient{}, nil /* cutoverProvider */, nil /* streamingTestingKnobs */) require.NoError(t, err) @@ -463,15 +461,15 @@ func TestRandomClientGeneration(t *testing.T) { randomStreamClient, ok := streamClient.(*streamclient.RandomStreamClient) require.True(t, ok) - id, err := randomStreamClient.Create(ctx, tenantName) + rps, err := randomStreamClient.Create(ctx, tenantName) require.NoError(t, err) - topo, err := randomStreamClient.Plan(ctx, id) + topo, err := randomStreamClient.Plan(ctx, rps.StreamID) require.NoError(t, err) // One system and two table data partitions. require.Equal(t, 2 /* numPartitions */, len(topo.Partitions)) - startTime := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} + initialScanTimestamp := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} ctx, cancel := context.WithCancel(ctx) // Cancel the flow after emitting 1000 checkpoint events from the client. 
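For reference, the job-level choice that these processor tests exercise reduces to the selection made in ingest() in stream_ingestion_job.go: the initial scan timestamp always comes from the ReplicationStartTime stashed in the job details, the previous high water is only set once the job has recorded progress, and the producer heartbeat uses whichever of the two the partitions will actually resume from. A minimal sketch, using simplified stand-in types rather than the real jobspb/hlc ones:

package main

import "fmt"

// timestamp stands in for hlc.Timestamp in this sketch; zero means "unset".
type timestamp int64

func (t timestamp) isEmpty() bool { return t == 0 }

// jobState is a simplified stand-in for the replication job's details and
// progress as consulted by ingest().
type jobState struct {
	replicationStartTime timestamp // stashed once at job creation, never updated
	progressHighWater    timestamp // empty until the frontier has advanced
}

// resumeTimestamps mirrors the selection in ingest(): the initial scan always
// uses the stored replication start time; the previous high water is taken
// from job progress when present; the heartbeat timestamp tracks whichever of
// the two the partitions will resume from.
func resumeTimestamps(j jobState) (initialScan, previousHighWater, heartbeat timestamp) {
	initialScan = j.replicationStartTime
	if !j.progressHighWater.isEmpty() {
		previousHighWater = j.progressHighWater
		heartbeat = previousHighWater
	} else {
		heartbeat = initialScan
	}
	return initialScan, previousHighWater, heartbeat
}

func main() {
	fresh := jobState{replicationStartTime: 100}
	fmt.Println(resumeTimestamps(fresh)) // 100 0 100: first run performs the initial scan

	resumed := jobState{replicationStartTime: 100, progressHighWater: 250}
	fmt.Println(resumeTimestamps(resumed)) // 100 250 250: resume from the high water
}

These are the values that end up as InitialScanTimestamp and PreviousHighWaterTimestamp on the ingestion data specs and, per partition, as the two timestamps passed to Client.Subscribe.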
@@ -494,7 +492,7 @@ func TestRandomClientGeneration(t *testing.T) { randomStreamClient.RegisterInterception(validateFnWithValidator(t, streamValidator)) out, err := runStreamIngestionProcessor(ctx, t, registry, kvDB, - topo, startTime, []jobspb.ResolvedSpan{}, tenantRekey, + topo, initialScanTimestamp, []jobspb.ResolvedSpan{}, tenantRekey, randomStreamClient, noCutover{}, nil /* streamingTestingKnobs*/) require.NoError(t, err) @@ -536,7 +534,7 @@ func TestRandomClientGeneration(t *testing.T) { } // We must have at least some progress across the frontier - require.Greater(t, latestResolvedTimestamp.WallTime, startTime.WallTime) + require.Greater(t, latestResolvedTimestamp.WallTime, initialScanTimestamp.WallTime) } // Ensure that no errors were reported to the validator. @@ -559,7 +557,7 @@ func runStreamIngestionProcessor( registry *jobs.Registry, kvDB *kv.DB, partitions streamclient.Topology, - startTime hlc.Timestamp, + initialScanTimestamp hlc.Timestamp, checkpoint []jobspb.ResolvedSpan, tenantRekey execinfrapb.TenantRekey, mockClient streamclient.Client, @@ -567,7 +565,7 @@ func runStreamIngestionProcessor( streamingTestingKnobs *sql.StreamingTestingKnobs, ) (*distsqlutils.RowBuffer, error) { sip, out, err := getStreamIngestionProcessor(ctx, t, registry, kvDB, - partitions, startTime, checkpoint, tenantRekey, mockClient, cutoverProvider, streamingTestingKnobs) + partitions, initialScanTimestamp, checkpoint, tenantRekey, mockClient, cutoverProvider, streamingTestingKnobs) require.NoError(t, err) sip.Run(ctx) @@ -588,7 +586,7 @@ func getStreamIngestionProcessor( registry *jobs.Registry, kvDB *kv.DB, partitions streamclient.Topology, - startTime hlc.Timestamp, + initialScanTimestamp hlc.Timestamp, checkpoint []jobspb.ResolvedSpan, tenantRekey execinfrapb.TenantRekey, mockClient streamclient.Client, @@ -632,7 +630,7 @@ func getStreamIngestionProcessor( Spans: pa.Spans, } } - spec.StartTime = startTime + spec.InitialScanTimestamp = initialScanTimestamp spec.Checkpoint.ResolvedSpans = checkpoint processorID := int32(0) proc, err := newStreamIngestionDataProcessor(ctx, &flowCtx, processorID, spec, &post, out) diff --git a/pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go b/pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go index 9026f7927363..b33868f6fe9a 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go +++ b/pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go @@ -505,9 +505,6 @@ func TestTenantStreamingPauseResumeIngestion(t *testing.T) { srcTime = c.srcCluster.Server(0).Clock().Now() c.waitUntilHighWatermark(srcTime, jobspb.JobID(ingestionJobID)) c.compareResult("SELECT * FROM d.t2") - // Confirm this new run resumed from the previous checkpoint. - require.Equal(t, pausedCheckpoint, - streamIngestionStats(t, c.destSysSQL, ingestionJobID).IngestionProgress.StartTime) } func TestTenantStreamingPauseOnPermanentJobError(t *testing.T) { @@ -569,10 +566,6 @@ func TestTenantStreamingPauseOnPermanentJobError(t *testing.T) { // Ingestion happened one more time after resuming the ingestion job. require.Equal(t, 3, ingestionStarts) - - // Confirm this new run resumed from the empty checkpoint. 
- require.True(t, - streamIngestionStats(t, c.destSysSQL, ingestionJobID).IngestionProgress.StartTime.IsEmpty()) } func TestTenantStreamingCheckpoint(t *testing.T) { diff --git a/pkg/ccl/streamingccl/streamingtest/BUILD.bazel b/pkg/ccl/streamingccl/streamingtest/BUILD.bazel index 13ed90594d9f..4ab742c1b8f1 100644 --- a/pkg/ccl/streamingccl/streamingtest/BUILD.bazel +++ b/pkg/ccl/streamingccl/streamingtest/BUILD.bazel @@ -15,6 +15,7 @@ go_library( "//pkg/ccl/streamingccl", "//pkg/jobs/jobspb", "//pkg/keys", + "//pkg/repstream/streampb", "//pkg/roachpb", "//pkg/security/username", "//pkg/sql/catalog", @@ -25,6 +26,7 @@ go_library( "//pkg/testutils/serverutils", "//pkg/testutils/sqlutils", "//pkg/util/hlc", + "//pkg/util/protoutil", "//pkg/util/syncutil", "@com_github_cockroachdb_errors//:errors", "@com_github_stretchr_testify//require", diff --git a/pkg/ccl/streamingccl/streamingtest/replication_helpers.go b/pkg/ccl/streamingccl/streamingtest/replication_helpers.go index e99019b02b8b..e45056579fd6 100644 --- a/pkg/ccl/streamingccl/streamingtest/replication_helpers.go +++ b/pkg/ccl/streamingccl/streamingtest/replication_helpers.go @@ -21,12 +21,14 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/repstream/streampb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" @@ -264,3 +266,17 @@ func (rh *ReplicationHelper) TableSpan(codec keys.SQLCodec, table string) roachp rh.SysServer.DB(), codec, "d", table) return desc.PrimaryIndexSpan(codec) } + +// StartReplicationStream reaches out to the system tenant to start the +// replication stream from the source tenant. +func (rh *ReplicationHelper) StartReplicationStream( + t *testing.T, sourceTenantName roachpb.TenantName, +) streampb.ReplicationProducerSpec { + var rawReplicationProducerSpec []byte + row := rh.SysSQL.QueryRow(t, `SELECT crdb_internal.start_replication_stream($1)`, sourceTenantName) + row.Scan(&rawReplicationProducerSpec) + var replicationProducerSpec streampb.ReplicationProducerSpec + err := protoutil.Unmarshal(rawReplicationProducerSpec, &replicationProducerSpec) + require.NoError(t, err) + return replicationProducerSpec +} diff --git a/pkg/ccl/streamingccl/streamproducer/event_stream.go b/pkg/ccl/streamingccl/streamproducer/event_stream.go index 6c4690b76733..1dc0d7f78b27 100644 --- a/pkg/ccl/streamingccl/streamproducer/event_stream.go +++ b/pkg/ccl/streamingccl/streamproducer/event_stream.go @@ -111,10 +111,8 @@ func (s *eventStream) Start(ctx context.Context, txn *kv.Txn) error { return err } - if s.spec.StartFrom.IsEmpty() { - // Arrange to perform initial scan. 
- s.spec.StartFrom = s.execCfg.Clock.Now() - + initialTimestamp := s.spec.InitialScanTimestamp + if s.spec.PreviousHighWaterTimestamp.IsEmpty() { opts = append(opts, rangefeed.WithInitialScan(func(ctx context.Context) {}), rangefeed.WithScanRetryBehavior(rangefeed.ScanRetryRemaining), @@ -128,12 +126,13 @@ func (s *eventStream) Start(ctx context.Context, txn *kv.Txn) error { return int(s.spec.Config.InitialScanParallelism) }), - rangefeed.WithOnScanCompleted(s.onSpanCompleted), + rangefeed.WithOnScanCompleted(s.onInitialScanSpanCompleted), ) } else { + initialTimestamp = s.spec.PreviousHighWaterTimestamp // When resuming from cursor, advance frontier to the cursor position. for _, sp := range s.spec.Spans { - if _, err := frontier.Forward(sp, s.spec.StartFrom); err != nil { + if _, err := frontier.Forward(sp, s.spec.PreviousHighWaterTimestamp); err != nil { return err } } @@ -141,7 +140,7 @@ func (s *eventStream) Start(ctx context.Context, txn *kv.Txn) error { // Start rangefeed, which spins up a separate go routine to perform it's job. s.rf = s.execCfg.RangeFeedFactory.New( - fmt.Sprintf("streamID=%d", s.streamID), s.spec.StartFrom, s.onValue, opts...) + fmt.Sprintf("streamID=%d", s.streamID), initialTimestamp, s.onValue, opts...) if err := s.rf.Start(ctx, s.spec.Spans); err != nil { return err } @@ -243,10 +242,10 @@ func (s *eventStream) onCheckpoint(ctx context.Context, checkpoint *roachpb.Rang } } -func (s *eventStream) onSpanCompleted(ctx context.Context, sp roachpb.Span) error { +func (s *eventStream) onInitialScanSpanCompleted(ctx context.Context, sp roachpb.Span) error { checkpoint := roachpb.RangeFeedCheckpoint{ Span: sp, - ResolvedTS: s.spec.StartFrom, + ResolvedTS: s.spec.InitialScanTimestamp, } select { case <-ctx.Done(): diff --git a/pkg/ccl/streamingccl/streamproducer/replication_manager.go b/pkg/ccl/streamingccl/streamproducer/replication_manager.go index 3d19e4ff6bd0..ebc4f0ccc402 100644 --- a/pkg/ccl/streamingccl/streamproducer/replication_manager.go +++ b/pkg/ccl/streamingccl/streamproducer/replication_manager.go @@ -31,8 +31,8 @@ type replicationStreamManagerImpl struct { // StartReplicationStream implements streaming.ReplicationStreamManager interface. func (r *replicationStreamManagerImpl) StartReplicationStream( ctx context.Context, tenantName roachpb.TenantName, -) (streampb.StreamID, error) { - return startReplicationStreamJob(ctx, r.evalCtx, r.txn, tenantName) +) (streampb.ReplicationProducerSpec, error) { + return startReplicationProducerJob(ctx, r.evalCtx, r.txn, tenantName) } // HeartbeatReplicationStream implements streaming.ReplicationStreamManager interface. 
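The producer-side half of the same decision, implemented in (*eventStream).Start above, can be summarized in a short sketch. The types here are simplified stand-ins for hlc.Timestamp and streampb.StreamPartitionSpec, and the boolean stands for whether the rangefeed is configured with an initial scan:

package main

import "fmt"

// ts stands in for hlc.Timestamp; zero means "unset".
type ts int64

// partitionSpec carries the two timestamps this patch adds to
// streampb.StreamPartitionSpec (a simplified stand-in, not the real proto).
type partitionSpec struct {
	initialScanTimestamp       ts
	previousHighWaterTimestamp ts
}

// rangefeedStart mirrors the decision in (*eventStream).Start: if no high
// water has been recorded yet, configure the rangefeed with an initial scan
// and start it at the fixed initial scan timestamp; otherwise skip the scan
// and resume the rangefeed (and the frontier) from the previous high water.
func rangefeedStart(spec partitionSpec) (startAt ts, withInitialScan bool) {
	if spec.previousHighWaterTimestamp == 0 {
		return spec.initialScanTimestamp, true
	}
	return spec.previousHighWaterTimestamp, false
}

func main() {
	firstRun := partitionSpec{initialScanTimestamp: 100}
	fmt.Println(rangefeedStart(firstRun)) // 100 true

	resumption := partitionSpec{initialScanTimestamp: 100, previousHighWaterTimestamp: 250}
	fmt.Println(rangefeedStart(resumption)) // 250 false
}

Because the initial scan timestamp never changes after producer-job creation, every partition that does run an initial scan does so as of the same MVCC timestamp.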
diff --git a/pkg/ccl/streamingccl/streamproducer/replication_stream_test.go b/pkg/ccl/streamingccl/streamproducer/replication_stream_test.go index ebc27596a990..c99f1a7c7af7 100644 --- a/pkg/ccl/streamingccl/streamproducer/replication_stream_test.go +++ b/pkg/ccl/streamingccl/streamproducer/replication_stream_test.go @@ -197,7 +197,7 @@ func startReplication( func testStreamReplicationStatus( t *testing.T, runner *sqlutils.SQLRunner, - streamID string, + streamID streampb.StreamID, streamStatus streampb.StreamReplicationStatus_StreamStatus, ) { checkStreamStatus := func(t *testing.T, frontier hlc.Timestamp, @@ -249,10 +249,10 @@ func TestReplicationStreamInitialization(t *testing.T) { h.SysSQL.Exec(t, "SET CLUSTER SETTING stream_replication.job_liveness_timeout = '10ms'") h.SysSQL.Exec(t, "SET CLUSTER SETTING stream_replication.stream_liveness_track_frequency = '1ms'") t.Run("failed-after-timeout", func(t *testing.T) { - rows := h.SysSQL.QueryStr(t, "SELECT crdb_internal.start_replication_stream($1)", testTenantName) - streamID := rows[0][0] + replicationProducerSpec := h.StartReplicationStream(t, testTenantName) + streamID := replicationProducerSpec.StreamID - h.SysSQL.CheckQueryResultsRetry(t, fmt.Sprintf("SELECT status FROM system.jobs WHERE id = %s", streamID), + h.SysSQL.CheckQueryResultsRetry(t, fmt.Sprintf("SELECT status FROM system.jobs WHERE id = %d", streamID), [][]string{{"failed"}}) testStreamReplicationStatus(t, h.SysSQL, streamID, streampb.StreamReplicationStatus_STREAM_INACTIVE) }) @@ -260,16 +260,16 @@ func TestReplicationStreamInitialization(t *testing.T) { // Make sure the stream does not time out within the test timeout h.SysSQL.Exec(t, "SET CLUSTER SETTING stream_replication.job_liveness_timeout = '500s'") t.Run("continuously-running-within-timeout", func(t *testing.T) { - rows := h.SysSQL.QueryStr(t, "SELECT crdb_internal.start_replication_stream($1)", testTenantName) - streamID := rows[0][0] + replicationProducerSpec := h.StartReplicationStream(t, testTenantName) + streamID := replicationProducerSpec.StreamID - h.SysSQL.CheckQueryResultsRetry(t, fmt.Sprintf("SELECT status FROM system.jobs WHERE id = %s", streamID), + h.SysSQL.CheckQueryResultsRetry(t, fmt.Sprintf("SELECT status FROM system.jobs WHERE id = %d", streamID), [][]string{{"running"}}) // Ensures the job is continuously running for 3 seconds. 
testDuration, now := 3*time.Second, timeutil.Now() for start, end := now, now.Add(testDuration); start.Before(end); start = start.Add(300 * time.Millisecond) { - h.SysSQL.CheckQueryResults(t, fmt.Sprintf("SELECT status FROM system.jobs WHERE id = %s", streamID), + h.SysSQL.CheckQueryResults(t, fmt.Sprintf("SELECT status FROM system.jobs WHERE id = %d", streamID), [][]string{{"running"}}) testStreamReplicationStatus(t, h.SysSQL, streamID, streampb.StreamReplicationStatus_STREAM_ACTIVE) } @@ -289,7 +289,7 @@ func TestReplicationStreamInitialization(t *testing.T) { }) t.Run("nonexistent-replication-stream-has-inactive-status", func(t *testing.T) { - testStreamReplicationStatus(t, h.SysSQL, "123", streampb.StreamReplicationStatus_STREAM_INACTIVE) + testStreamReplicationStatus(t, h.SysSQL, streampb.StreamID(123), streampb.StreamReplicationStatus_STREAM_INACTIVE) }) } @@ -297,7 +297,8 @@ func encodeSpec( t *testing.T, h *streamingtest.ReplicationHelper, srcTenant streamingtest.TenantState, - startFrom hlc.Timestamp, + initialScanTime hlc.Timestamp, + previousHighWater hlc.Timestamp, tables ...string, ) []byte { var spans []roachpb.Span @@ -308,8 +309,9 @@ func encodeSpec( } spec := &streampb.StreamPartitionSpec{ - StartFrom: startFrom, - Spans: spans, + InitialScanTimestamp: initialScanTime, + PreviousHighWaterTimestamp: previousHighWater, + Spans: spans, Config: streampb.StreamPartitionSpec_ExecutionConfig{ MinCheckpointFrequency: 10 * time.Millisecond, }, @@ -344,8 +346,9 @@ USE d; `) ctx := context.Background() - rows := h.SysSQL.QueryStr(t, "SELECT crdb_internal.start_replication_stream($1)", testTenantName) - streamID := rows[0][0] + replicationProducerSpec := h.StartReplicationStream(t, testTenantName) + streamID := replicationProducerSpec.StreamID + initialScanTimestamp := replicationProducerSpec.ReplicationStartTime const streamPartitionQuery = `SELECT * FROM crdb_internal.stream_partition($1, $2)` t1Descr := desctestutils.TestingGetPublicTableDescriptor(h.SysServer.DB(), srcTenant.Codec, "d", "t1") @@ -353,7 +356,7 @@ USE d; t.Run("stream-table-cursor-error", func(t *testing.T) { _, feed := startReplication(t, h, makePartitionStreamDecoder, - streamPartitionQuery, streamID, encodeSpec(t, h, srcTenant, hlc.Timestamp{}, "t2")) + streamPartitionQuery, streamID, encodeSpec(t, h, srcTenant, initialScanTimestamp, hlc.Timestamp{}, "t2")) defer feed.Close(ctx) subscribedSpan := h.TableSpan(srcTenant.Codec, "t2") @@ -380,7 +383,8 @@ USE d; t.Run("stream-table", func(t *testing.T) { _, feed := startReplication(t, h, makePartitionStreamDecoder, - streamPartitionQuery, streamID, encodeSpec(t, h, srcTenant, hlc.Timestamp{}, "t1")) + streamPartitionQuery, streamID, encodeSpec(t, h, srcTenant, initialScanTimestamp, + hlc.Timestamp{}, "t1")) defer feed.Close(ctx) expected := streamingtest.EncodeKV(t, srcTenant.Codec, t1Descr, 42) @@ -409,7 +413,8 @@ USE d; srcTenant.SQL.Exec(t, `UPDATE d.t1 SET b = 'мир' WHERE i = 42`) _, feed := startReplication(t, h, makePartitionStreamDecoder, - streamPartitionQuery, streamID, encodeSpec(t, h, srcTenant, beforeUpdateTS, "t1")) + streamPartitionQuery, streamID, encodeSpec(t, h, srcTenant, initialScanTimestamp, + beforeUpdateTS, "t1")) defer feed.Close(ctx) // We should observe 2 versions of this key: one with ("привет", "world"), and a later @@ -445,7 +450,8 @@ CREATE TABLE t3( addRows(0, 10) source, feed := startReplication(t, h, makePartitionStreamDecoder, - streamPartitionQuery, streamID, encodeSpec(t, h, srcTenant, hlc.Timestamp{}, "t1")) + streamPartitionQuery, 
streamID, encodeSpec(t, h, srcTenant, initialScanTimestamp, + hlc.Timestamp{}, "t1")) defer feed.Close(ctx) // Few more rows after feed started. @@ -488,8 +494,8 @@ USE d; `) ctx := context.Background() - rows := h.SysSQL.QueryStr(t, "SELECT crdb_internal.start_replication_stream($1)", testTenantName) - streamID := rows[0][0] + replicationProducerSpec := h.StartReplicationStream(t, testTenantName) + streamID := replicationProducerSpec.StreamID const streamPartitionQuery = `SELECT * FROM crdb_internal.stream_partition($1, $2)` @@ -506,18 +512,21 @@ USE d; // Make any import operation to be a AddSSTable operation instead of kv writes. h.SysSQL.Exec(t, "SET CLUSTER SETTING kv.bulk_io_write.small_write_size = '1';") - var startTime time.Time + var clockTime time.Time srcTenant.SQL.Exec(t, fmt.Sprintf("CREATE TABLE %s (id INT PRIMARY KEY, n INT)", table)) - srcTenant.SQL.QueryRow(t, "SELECT clock_timestamp()").Scan(&startTime) - startHlcTime := hlc.Timestamp{WallTime: startTime.UnixNano()} - if initialScan { - startHlcTime = hlc.Timestamp{} + srcTenant.SQL.QueryRow(t, "SELECT clock_timestamp()").Scan(&clockTime) + + var previousHighWater hlc.Timestamp + initialScanTimestamp := hlc.Timestamp{WallTime: clockTime.UnixNano()} + if !initialScan { + previousHighWater = hlc.Timestamp{WallTime: clockTime.UnixNano()} } if addSSTableBeforeRangefeed { srcTenant.SQL.Exec(t, fmt.Sprintf("IMPORT INTO %s CSV DATA ($1)", table), dataSrv.URL) } source, feed := startReplication(t, h, makePartitionStreamDecoder, - streamPartitionQuery, streamID, encodeSpec(t, h, srcTenant, startHlcTime, table)) + streamPartitionQuery, streamID, encodeSpec(t, h, srcTenant, initialScanTimestamp, + previousHighWater, table)) defer feed.Close(ctx) if !addSSTableBeforeRangefeed { srcTenant.SQL.Exec(t, fmt.Sprintf("IMPORT INTO %s CSV DATA ($1)", table), dataSrv.URL) @@ -573,10 +582,8 @@ func TestCompleteStreamReplication(t *testing.T) { "SET CLUSTER SETTING stream_replication.job_liveness_timeout = '2s';", "SET CLUSTER SETTING stream_replication.stream_liveness_track_frequency = '2s';") - var timedOutStreamID int - row := h.SysSQL.QueryRow(t, - "SELECT crdb_internal.start_replication_stream($1)", testTenantName) - row.Scan(&timedOutStreamID) + replicationProducerSpec := h.StartReplicationStream(t, testTenantName) + timedOutStreamID := replicationProducerSpec.StreamID jobutils.WaitForJobToFail(t, h.SysSQL, jobspb.JobID(timedOutStreamID)) // Makes the producer job not easily time out. @@ -587,10 +594,8 @@ func TestCompleteStreamReplication(t *testing.T) { timedOutStreamID, successfulIngestion) // Create a new replication stream and complete it. 
- var streamID int - row := h.SysSQL.QueryRow(t, - "SELECT crdb_internal.start_replication_stream($1)", testTenantName) - row.Scan(&streamID) + replicationProducerSpec := h.StartReplicationStream(t, testTenantName) + streamID := replicationProducerSpec.StreamID jobutils.WaitForJobToRun(t, h.SysSQL, jobspb.JobID(streamID)) h.SysSQL.Exec(t, "SELECT crdb_internal.complete_replication_stream($1, $2)", streamID, successfulIngestion) @@ -664,13 +669,15 @@ USE d; `) ctx := context.Background() - rows := h.SysSQL.QueryStr(t, "SELECT crdb_internal.start_replication_stream($1)", testTenantName) - streamID := rows[0][0] + replicationProducerSpec := h.StartReplicationStream(t, testTenantName) + streamID := replicationProducerSpec.StreamID + initialScanTimestamp := replicationProducerSpec.ReplicationStartTime const streamPartitionQuery = `SELECT * FROM crdb_internal.stream_partition($1, $2)` // Only subscribe to table t1 and t2, not t3. source, feed := startReplication(t, h, makePartitionStreamDecoder, - streamPartitionQuery, streamID, encodeSpec(t, h, srcTenant, h.SysServer.Clock().Now(), "t1", "t2")) + streamPartitionQuery, streamID, encodeSpec(t, h, srcTenant, initialScanTimestamp, + hlc.Timestamp{}, "t1", "t2")) defer feed.Close(ctx) // TODO(casper): Replace with DROP TABLE once drop table uses the MVCC-compatible DelRange diff --git a/pkg/ccl/streamingccl/streamproducer/stream_lifetime.go b/pkg/ccl/streamingccl/streamproducer/stream_lifetime.go index ce679ac8b9af..e6d4b0bb7f88 100644 --- a/pkg/ccl/streamingccl/streamproducer/stream_lifetime.go +++ b/pkg/ccl/streamingccl/streamproducer/stream_lifetime.go @@ -30,17 +30,17 @@ import ( "github.com/cockroachdb/errors" ) -// startReplicationStreamJob initializes a replication stream producer job on +// startReplicationProducerJob initializes a replication stream producer job on // the source cluster that: // // 1. Tracks the liveness of the replication stream consumption. // 2. Updates the protected timestamp for spans being replicated. 
-func startReplicationStreamJob(
+func startReplicationProducerJob(
 	ctx context.Context, evalCtx *eval.Context, txn *kv.Txn, tenantName roachpb.TenantName,
-) (streampb.StreamID, error) {
+) (streampb.ReplicationProducerSpec, error) {
 	tenantRecord, err := sql.GetTenantRecordByName(ctx, evalCtx.Planner.ExecutorConfig().(*sql.ExecutorConfig), txn, tenantName)
 	if err != nil {
-		return 0, err
+		return streampb.ReplicationProducerSpec{}, err
 	}
 	tenantID := tenantRecord.ID
@@ -48,11 +48,11 @@ func startReplicationStreamJob(
 
 	hasAdminRole, err := evalCtx.SessionAccessor.HasAdminRole(ctx)
 	if err != nil {
-		return streampb.InvalidStreamID, err
+		return streampb.ReplicationProducerSpec{}, err
 	}
 
 	if !hasAdminRole {
-		return streampb.InvalidStreamID, errors.New("admin role required to start stream replication jobs")
+		return streampb.ReplicationProducerSpec{}, errors.New("admin role required to start stream replication jobs")
 	}
 
 	registry := execConfig.JobRegistry
@@ -61,24 +61,25 @@ func startReplicationStreamJob(
 	jr := makeProducerJobRecord(registry, tenantID, timeout, evalCtx.SessionData().User(), ptsID)
 	if _, err := registry.CreateAdoptableJobWithTxn(ctx, jr, jr.JobID, txn); err != nil {
-		return streampb.InvalidStreamID, err
+		return streampb.ReplicationProducerSpec{}, err
 	}
 
 	ptp := execConfig.ProtectedTimestampProvider
 	statementTime := hlc.Timestamp{
 		WallTime: evalCtx.GetStmtTimestamp().UnixNano(),
 	}
-
 	deprecatedSpansToProtect := roachpb.Spans{*makeTenantSpan(tenantID)}
 	targetToProtect := ptpb.MakeTenantsTarget([]roachpb.TenantID{roachpb.MustMakeTenantID(tenantID)})
-
 	pts := jobsprotectedts.MakeRecord(ptsID, int64(jr.JobID), statementTime, deprecatedSpansToProtect, jobsprotectedts.Jobs, targetToProtect)
 
 	if err := ptp.Protect(ctx, txn, pts); err != nil {
-		return streampb.InvalidStreamID, err
+		return streampb.ReplicationProducerSpec{}, err
 	}
-	return streampb.StreamID(jr.JobID), nil
+	return streampb.ReplicationProducerSpec{
+		StreamID:             streampb.StreamID(jr.JobID),
+		ReplicationStartTime: statementTime,
+	}, nil
 }
 
 // Convert the producer job's status into corresponding replication
diff --git a/pkg/jobs/jobspb/jobs.proto b/pkg/jobs/jobspb/jobs.proto
index f7fc9ffeb8d5..70de443bbcc5 100644
--- a/pkg/jobs/jobspb/jobs.proto
+++ b/pkg/jobs/jobspb/jobs.proto
@@ -114,6 +114,14 @@ message StreamIngestionDetails {
   // is the earliest timestamp that the replication job can be cut-over to.
   int32 replication_ttl_seconds = 11 [(gogoproto.customname) = "ReplicationTTLSeconds"];
 
+  // ReplicationStartTime is the initial timestamp from which the replication
+  // producer job will begin streaming MVCC revisions. This timestamp is picked
+  // once when the replication producer job is created, and is never updated
+  // through the lifetime of a replication stream. This will be the timestamp as
+  // of which each partition will perform its initial rangefeed scan on the
+  // source cluster.
+  util.hlc.Timestamp replication_start_time = 12 [(gogoproto.nullable) = false];
+
   reserved 5, 6;
 }
@@ -135,10 +143,6 @@ message StreamIngestionProgress {
       (gogoproto.nullable) = false];
   }
 
-  // The job will ingest events from StartTime onwards during the current run.
-  // This may change when the the ingestion job gets resumed from the previous checkpoint.
-  util.hlc.Timestamp start_time = 3 [(gogoproto.nullable) = false];
-
   // CutoverTime is set to signal to the stream ingestion job to complete its
   // ingestion. This involves stopping any subsequent ingestion, and rolling
   // back any additional ingested data, to bring the ingested cluster to a
@@ -153,6 +157,8 @@ message StreamIngestionProgress {
 
   // StreamAddresses are the source cluster addresses read from the latest topology.
   repeated string stream_addresses = 5;
+
+  reserved 3;
 }
 
 message StreamReplicationDetails {
diff --git a/pkg/repstream/streampb/stream.proto b/pkg/repstream/streampb/stream.proto
index 69e23524efbd..1ee9376cf74a 100644
--- a/pkg/repstream/streampb/stream.proto
+++ b/pkg/repstream/streampb/stream.proto
@@ -23,11 +23,35 @@ import "util/unresolved_addr.proto";
 import "gogoproto/gogo.proto";
 import "google/protobuf/duration.proto";
 
+// ReplicationProducerSpec is the specification returned by the replication
+// producer job when it is created.
+message ReplicationProducerSpec {
+  int64 stream_id = 1 [(gogoproto.customname) = "StreamID", (gogoproto.casttype) = "StreamID"];
+
+  // ReplicationStartTime is the initial timestamp from which the replication
+  // producer job will begin streaming MVCC revisions. This timestamp is picked
+  // once when the replication producer job is created, and is never updated
+  // through the lifetime of a replication stream. This will be the timestamp as
+  // of which each partition will perform its initial rangefeed scan.
+  util.hlc.Timestamp replication_start_time = 2 [(gogoproto.nullable) = false];
+}
+
 // StreamPartitionSpec is the stream partition specification.
 message StreamPartitionSpec {
-  // start_from specifies the starting point for all spans. If its empty,
-  // an initial scan is performed.
-  util.hlc.Timestamp start_from = 1 [(gogoproto.nullable) = false];
+  // PreviousHighWaterTimestamp specifies the timestamp from which spans will
+  // start ingesting data in the replication job. This timestamp is empty unless
+  // the replication job resumes after a progress checkpoint has been recorded.
+  // While it is empty we use the InitialScanTimestamp described below.
+  util.hlc.Timestamp previous_high_water_timestamp = 1 [(gogoproto.nullable) = false];
+
+  // InitialScanTimestamp is the timestamp at which the partition will run the
+  // initial rangefeed scan before replicating further changes to the target
+  // spans. This timestamp is always non-empty, but a partition will only run an
+  // initial scan if no progress has been recorded prior to the current
+  // resumption of the replication job. Otherwise, all spans will start
+  // ingesting data from the PreviousHighWaterTimestamp described above.
+  util.hlc.Timestamp initial_scan_timestamp = 4 [(gogoproto.nullable) = false];
+
   // List of spans to stream.
   repeated roachpb.Span spans = 2 [(gogoproto.nullable) = false];
diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go
index 78d6f90b2bc3..4eb169828020 100644
--- a/pkg/sql/exec_util.go
+++ b/pkg/sql/exec_util.go
@@ -1634,6 +1634,11 @@ type StreamingTestingKnobs struct {
 	// before a stream ingestion happens.
 	BeforeIngestionStart func(ctx context.Context) error
 
+	// AfterReplicationFlowPlan allows the caller to inspect the ingestion and
+	// frontier specs generated for the replication job.
+	AfterReplicationFlowPlan func([]*execinfrapb.StreamIngestionDataSpec,
+		*execinfrapb.StreamIngestionFrontierSpec)
+
 	// OverrideReplicationTTLSeconds will override the default value of the
 	// `ReplicationTTLSeconds` field on the StreamIngestion job details.
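The AfterReplicationFlowPlan knob above exposes the planned processor specs to tests. A rough sketch of how a test might use it to assert that every planned ingestion processor carries the same initial scan timestamp; the helper name and knob wiring are assumptions, not code from this patch.

    package replicationsketch

    import (
    	"testing"

    	"github.com/cockroachdb/cockroach/pkg/sql"
    	"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
    	"github.com/cockroachdb/cockroach/pkg/util/hlc"
    	"github.com/stretchr/testify/require"
    )

    // uniformInitialScanKnobs returns streaming testing knobs that fail the test
    // if the planned ingestion processors do not all share one initial scan
    // timestamp.
    func uniformInitialScanKnobs(t *testing.T) *sql.StreamingTestingKnobs {
    	return &sql.StreamingTestingKnobs{
    		AfterReplicationFlowPlan: func(
    			ingestionSpecs []*execinfrapb.StreamIngestionDataSpec,
    			_ *execinfrapb.StreamIngestionFrontierSpec,
    		) {
    			var ts hlc.Timestamp
    			for _, spec := range ingestionSpecs {
    				if ts.IsEmpty() {
    					ts = spec.InitialScanTimestamp
    				}
    				require.Equal(t, ts, spec.InitialScanTimestamp)
    			}
    		},
    	}
    }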
 	OverrideReplicationTTLSeconds int
diff --git a/pkg/sql/execinfrapb/processors_bulk_io.proto b/pkg/sql/execinfrapb/processors_bulk_io.proto
index f4267d5a0bcd..b42d523c6dc4 100644
--- a/pkg/sql/execinfrapb/processors_bulk_io.proto
+++ b/pkg/sql/execinfrapb/processors_bulk_io.proto
@@ -199,8 +199,20 @@ message StreamIngestionDataSpec {
   // PartitionSpecs maps partition IDs to their specifications.
   map<string, StreamIngestionPartitionSpec> partition_specs = 6 [(gogoproto.nullable) = false];
 
-  // The processor will ingest events from StartTime onwards.
-  optional util.hlc.Timestamp start_time = 2 [(gogoproto.nullable) = false];
+  // PreviousHighWaterTimestamp specifies the timestamp from which spans will
+  // start ingesting data in the replication job. This timestamp is empty unless
+  // the replication job resumes after a progress checkpoint has been recorded.
+  // While it is empty we use the InitialScanTimestamp described below.
+  optional util.hlc.Timestamp previous_high_water_timestamp = 2 [(gogoproto.nullable) = false];
+
+  // InitialScanTimestamp is the timestamp at which the partition will run the
+  // initial rangefeed scan before replicating further changes to the target
+  // spans. This timestamp is always non-empty, but a partition will only run an
+  // initial scan if no progress has been recorded prior to the current
+  // resumption of the replication job. Otherwise, all spans will start
+  // ingesting data from the PreviousHighWaterTimestamp described above.
+  optional util.hlc.Timestamp initial_scan_timestamp = 11 [(gogoproto.nullable) = false];
+
   // StreamAddress locate the stream so that a stream client can be initialized.
   optional string stream_address = 3 [(gogoproto.nullable) = false];
   // JobID is the job ID of the stream ingestion job.
diff --git a/pkg/sql/sem/builtins/fixed_oids.go b/pkg/sql/sem/builtins/fixed_oids.go
index a70d92f6c2f7..5ec9f059b332 100644
--- a/pkg/sql/sem/builtins/fixed_oids.go
+++ b/pkg/sql/sem/builtins/fixed_oids.go
@@ -481,7 +481,7 @@ var builtinOidsBySignature = map[string]oid.Oid{
 	`crdb_internal.show_create_all_tables(database_name: string) -> string`: 352,
 	`crdb_internal.show_create_all_types(database_name: string) -> string`: 353,
 	`crdb_internal.sql_liveness_is_alive(session_id: bytes) -> bool`: 1353,
-	`crdb_internal.start_replication_stream(tenant_name: string) -> int`: 2044,
+	`crdb_internal.start_replication_stream(tenant_name: string) -> bytes`: 2044,
 	`crdb_internal.stream_ingestion_stats_json(job_id: int) -> jsonb`: 1546,
 	`crdb_internal.stream_ingestion_stats_pb(job_id: int) -> bytes`: 1547,
 	`crdb_internal.stream_partition(stream_id: int, partition_spec: bytes) -> bytes`: 1550,
diff --git a/pkg/sql/sem/builtins/replication_builtins.go b/pkg/sql/sem/builtins/replication_builtins.go
index 7fd78f41815b..db42e6d30dfe 100644
--- a/pkg/sql/sem/builtins/replication_builtins.go
+++ b/pkg/sql/sem/builtins/replication_builtins.go
@@ -163,18 +163,22 @@ var replicationBuiltins = map[string]builtinDefinition{
 			Types: tree.ParamTypes{
 				{Name: "tenant_name", Typ: types.String},
 			},
-			ReturnType: tree.FixedReturnType(types.Int),
+			ReturnType: tree.FixedReturnType(types.Bytes),
 			Fn: func(ctx context.Context, evalCtx *eval.Context, args tree.Datums) (tree.Datum, error) {
 				mgr, err := evalCtx.StreamManagerFactory.GetReplicationStreamManager(ctx)
 				if err != nil {
 					return nil, err
 				}
 				tenantName := string(tree.MustBeDString(args[0]))
-				jobID, err := mgr.StartReplicationStream(ctx, roachpb.TenantName(tenantName))
+				replicationProducerSpec, err := mgr.StartReplicationStream(ctx, roachpb.TenantName(tenantName))
+				if err != nil {
+					return nil, err
+				}
+				rawReplicationProducerSpec, err := protoutil.Marshal(&replicationProducerSpec)
 				if err != nil {
 					return nil, err
 				}
-				return tree.NewDInt(tree.DInt(jobID)), err
+				return tree.NewDBytes(tree.DBytes(rawReplicationProducerSpec)), err
 			},
 			Info: "This function can be used on the producer side to start a replication stream for " +
 				"the specified tenant. The returned stream ID uniquely identifies created stream. " +
diff --git a/pkg/sql/sem/eval/context.go b/pkg/sql/sem/eval/context.go
index 883e549ba1ea..e4532c37f067 100644
--- a/pkg/sql/sem/eval/context.go
+++ b/pkg/sql/sem/eval/context.go
@@ -721,7 +721,7 @@ type StreamManagerFactory interface {
 type ReplicationStreamManager interface {
 	// StartReplicationStream starts a stream replication job for the specified
 	// tenant on the producer side.
-	StartReplicationStream(ctx context.Context, tenantName roachpb.TenantName) (streampb.StreamID, error)
+	StartReplicationStream(ctx context.Context, tenantName roachpb.TenantName) (streampb.ReplicationProducerSpec, error)
 
 	// HeartbeatReplicationStream sends a heartbeat to the replication stream producer, indicating
 	// consumer has consumed until the given 'frontier' timestamp. This updates the producer job
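With crdb_internal.start_replication_stream now returning bytes, a consumer must unmarshal the ReplicationProducerSpec to recover both the stream ID and the replication start time. A minimal sketch of a helper in the spirit of h.StartReplicationStream used in the tests above; the helper name and SQL-runner plumbing are assumptions rather than the helper added by this patch.

    package replicationsketch

    import (
    	"testing"

    	"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
    	"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
    	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
    	"github.com/stretchr/testify/require"
    )

    // startReplicationStream starts a producer job for the named tenant and
    // decodes the ReplicationProducerSpec returned by the builtin, giving the
    // caller the stream ID and the replication start time in one round trip.
    func startReplicationStream(
    	t *testing.T, sysSQL *sqlutils.SQLRunner, tenantName string,
    ) streampb.ReplicationProducerSpec {
    	var rawSpec []byte
    	sysSQL.QueryRow(t, "SELECT crdb_internal.start_replication_stream($1)", tenantName).Scan(&rawSpec)
    	var spec streampb.ReplicationProducerSpec
    	require.NoError(t, protoutil.Unmarshal(rawSpec, &spec))
    	return spec
    }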