Skip to content

Commit

Permalink
streamingccl: tighten replication timestamp semantics
Browse files Browse the repository at this point in the history
Previously, each partition would reach out to the
source cluster and pick its own timestamp from which it
would start ingesting MVCC versions. This timestamp was
used by the rangefeed setup by the partition, to run its
initial scan. Eventually, all the partitions would replicate
up until a certain timestamp and cause the frontier to be
bumped but it was possible for different partitions to begin
ingesting at different timestamps.

This change makes it such that during replication planning when
we create the producer job on the source cluster, we return a timestamp
along with the StreamID. This becomes the timestamp at which each
ingestion partition sets up the initial scan of the rangefeed,
and consequently becomes the initial timestamp at which all data
is ingested. We stash this timestamp in the replication job
details and never update its value. On future resumptions of the
replication job, if there is a progress high water, we will not
run an initial rangefeed scan but instead start the rangefeed from
the previous progress high water.

The motivation for this change was to know the lower bound on
both the source and destination cluster for MVCC versions that
have been streamed. This is necessary to bound the fingerprinting
on both clusters to ensure a match.

Release note: None

Fixes: #92742
  • Loading branch information
adityamaru committed Dec 1, 2022
1 parent ae37120 commit 6cf6ab4
Show file tree
Hide file tree
Showing 23 changed files with 305 additions and 182 deletions.
2 changes: 1 addition & 1 deletion docs/generated/sql/functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -2650,7 +2650,7 @@ The swap_ordinate_string parameter is a 2-character string naming the ordinates
</span></td><td>Volatile</td></tr>
<tr><td><a name="crdb_internal.replication_stream_spec"></a><code>crdb_internal.replication_stream_spec(stream_id: <a href="int.html">int</a>) &rarr; <a href="bytes.html">bytes</a></code></td><td><span class="funcdesc"><p>This function can be used on the consumer side to get a replication stream specification for the specified stream. The consumer will later call ‘stream_partition’ to a partition with the spec to start streaming.</p>
</span></td><td>Volatile</td></tr>
<tr><td><a name="crdb_internal.start_replication_stream"></a><code>crdb_internal.start_replication_stream(tenant_name: <a href="string.html">string</a>) &rarr; <a href="int.html">int</a></code></td><td><span class="funcdesc"><p>This function can be used on the producer side to start a replication stream for the specified tenant. The returned stream ID uniquely identifies created stream. The caller must periodically invoke crdb_internal.heartbeat_stream() function to notify that the replication is still ongoing.</p>
<tr><td><a name="crdb_internal.start_replication_stream"></a><code>crdb_internal.start_replication_stream(tenant_name: <a href="string.html">string</a>) &rarr; <a href="bytes.html">bytes</a></code></td><td><span class="funcdesc"><p>This function can be used on the producer side to start a replication stream for the specified tenant. The returned stream ID uniquely identifies created stream. The caller must periodically invoke crdb_internal.heartbeat_stream() function to notify that the replication is still ongoing.</p>
</span></td><td>Volatile</td></tr>
<tr><td><a name="crdb_internal.stream_ingestion_stats_json"></a><code>crdb_internal.stream_ingestion_stats_json(job_id: <a href="int.html">int</a>) &rarr; jsonb</code></td><td><span class="funcdesc"><p>This function can be used on the ingestion side to get a statistics summary of a stream ingestion job in json format.</p>
</span></td><td>Volatile</td></tr>
Expand Down
10 changes: 3 additions & 7 deletions pkg/ccl/streamingccl/streamclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type Client interface {
// Create initializes a stream with the source, potentially reserving any
// required resources, such as protected timestamps, and returns an ID which
// can be used to interact with this stream in the future.
Create(ctx context.Context, tenant roachpb.TenantName) (streampb.StreamID, error)
Create(ctx context.Context, tenant roachpb.TenantName) (streampb.ReplicationProducerSpec, error)

// Dial checks if the source is able to be connected to for queries
Dial(ctx context.Context) error
Expand All @@ -74,12 +74,8 @@ type Client interface {
// the specified remote address. This is used by each consumer processor to
// open its subscription to its partition of a larger stream.
// TODO(dt): ts -> checkpointToken.
Subscribe(
ctx context.Context,
streamID streampb.StreamID,
spec SubscriptionToken,
checkpoint hlc.Timestamp,
) (Subscription, error)
Subscribe(ctx context.Context, streamID streampb.StreamID, spec SubscriptionToken,
initialScanTime hlc.Timestamp, previousHighWater hlc.Timestamp) (Subscription, error)

// Close releases all the resources used by this client.
Close(ctx context.Context) error
Expand Down
29 changes: 16 additions & 13 deletions pkg/ccl/streamingccl/streamclient/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)
Expand All @@ -32,19 +33,22 @@ type testStreamClient struct{}
var _ Client = testStreamClient{}

// Dial implements Client interface.
func (sc testStreamClient) Dial(ctx context.Context) error {
func (sc testStreamClient) Dial(_ context.Context) error {
return nil
}

// Create implements the Client interface.
func (sc testStreamClient) Create(
_ context.Context, _ roachpb.TenantName,
) (streampb.StreamID, error) {
return streampb.StreamID(1), nil
) (streampb.ReplicationProducerSpec, error) {
return streampb.ReplicationProducerSpec{
StreamID: streampb.StreamID(1),
ReplicationStartTime: hlc.Timestamp{WallTime: timeutil.Now().UnixNano()},
}, nil
}

// Plan implements the Client interface.
func (sc testStreamClient) Plan(ctx context.Context, ID streampb.StreamID) (Topology, error) {
func (sc testStreamClient) Plan(_ context.Context, _ streampb.StreamID) (Topology, error) {
return Topology{
Partitions: []PartitionInfo{
{
Expand All @@ -59,19 +63,19 @@ func (sc testStreamClient) Plan(ctx context.Context, ID streampb.StreamID) (Topo

// Heartbeat implements the Client interface.
func (sc testStreamClient) Heartbeat(
ctx context.Context, ID streampb.StreamID, _ hlc.Timestamp,
_ context.Context, _ streampb.StreamID, _ hlc.Timestamp,
) (streampb.StreamReplicationStatus, error) {
return streampb.StreamReplicationStatus{}, nil
}

// Close implements the Client interface.
func (sc testStreamClient) Close(ctx context.Context) error {
func (sc testStreamClient) Close(_ context.Context) error {
return nil
}

// Subscribe implements the Client interface.
func (sc testStreamClient) Subscribe(
ctx context.Context, stream streampb.StreamID, spec SubscriptionToken, checkpoint hlc.Timestamp,
_ context.Context, _ streampb.StreamID, _ SubscriptionToken, _ hlc.Timestamp, _ hlc.Timestamp,
) (Subscription, error) {
sampleKV := roachpb.KeyValue{
Key: []byte("key_1"),
Expand All @@ -97,9 +101,7 @@ func (sc testStreamClient) Subscribe(
}

// Complete implements the streamclient.Client interface.
func (sc testStreamClient) Complete(
ctx context.Context, streamID streampb.StreamID, successfulIngestion bool,
) error {
func (sc testStreamClient) Complete(_ context.Context, _ streampb.StreamID, _ bool) error {
return nil
}

Expand All @@ -108,7 +110,7 @@ type testStreamSubscription struct {
}

// Subscribe implements the Subscription interface.
func (t testStreamSubscription) Subscribe(ctx context.Context) error {
func (t testStreamSubscription) Subscribe(_ context.Context) error {
return nil
}

Expand Down Expand Up @@ -180,10 +182,11 @@ func ExampleClient() {
_ = client.Close(ctx)
}()

id, err := client.Create(ctx, "system")
prs, err := client.Create(ctx, "system")
if err != nil {
panic(err)
}
id := prs.StreamID

var ingested struct {
ts hlc.Timestamp
Expand Down Expand Up @@ -224,7 +227,7 @@ func ExampleClient() {

for _, partition := range topology.Partitions {
// TODO(dt): use Subscribe helper and partition.SrcAddr
sub, err := client.Subscribe(ctx, id, partition.SubscriptionToken, ts)
sub, err := client.Subscribe(ctx, id, partition.SubscriptionToken, hlc.Timestamp{}, ts)
if err != nil {
panic(err)
}
Expand Down
26 changes: 17 additions & 9 deletions pkg/ccl/streamingccl/streamclient/partitioned_stream_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,21 +74,24 @@ var _ Client = &partitionedStreamClient{}
// Create implements Client interface.
func (p *partitionedStreamClient) Create(
ctx context.Context, tenantName roachpb.TenantName,
) (streampb.StreamID, error) {
) (streampb.ReplicationProducerSpec, error) {
ctx, sp := tracing.ChildSpan(ctx, "streamclient.Client.Create")
defer sp.Finish()

p.mu.Lock()
defer p.mu.Unlock()
var streamID streampb.StreamID
var rawReplicationProducerSpec []byte
row := p.mu.srcConn.QueryRow(ctx, `SELECT crdb_internal.start_replication_stream($1)`, tenantName)
err := row.Scan(&streamID)
err := row.Scan(&rawReplicationProducerSpec)
if err != nil {
return streampb.InvalidStreamID,
errors.Wrapf(err, "error creating replication stream for tenant %s", tenantName)
return streampb.ReplicationProducerSpec{}, errors.Wrapf(err, "error creating replication stream for tenant %s", tenantName)
}
var replicationProducerSpec streampb.ReplicationProducerSpec
if err := protoutil.Unmarshal(rawReplicationProducerSpec, &replicationProducerSpec); err != nil {
return streampb.ReplicationProducerSpec{}, err
}

return streamID, err
return replicationProducerSpec, err
}

// Dial implements Client interface.
Expand Down Expand Up @@ -195,7 +198,11 @@ func (p *partitionedStreamClient) Close(ctx context.Context) error {

// Subscribe implements Client interface.
func (p *partitionedStreamClient) Subscribe(
ctx context.Context, stream streampb.StreamID, spec SubscriptionToken, checkpoint hlc.Timestamp,
ctx context.Context,
streamID streampb.StreamID,
spec SubscriptionToken,
initialScanTime hlc.Timestamp,
previousHighWater hlc.Timestamp,
) (Subscription, error) {
_, sp := tracing.ChildSpan(ctx, "streamclient.Client.Subscribe")
defer sp.Finish()
Expand All @@ -204,7 +211,8 @@ func (p *partitionedStreamClient) Subscribe(
if err := protoutil.Unmarshal(spec, &sps); err != nil {
return nil, err
}
sps.StartFrom = checkpoint
sps.InitialScanTimestamp = initialScanTime
sps.PreviousHighWaterTimestamp = previousHighWater

specBytes, err := protoutil.Marshal(&sps)
if err != nil {
Expand All @@ -215,7 +223,7 @@ func (p *partitionedStreamClient) Subscribe(
eventsChan: make(chan streamingccl.Event),
srcConnConfig: p.pgxConfig,
specBytes: specBytes,
streamID: stream,
streamID: streamID,
closeChan: make(chan struct{}),
}
p.mu.Lock()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,9 @@ INSERT INTO d.t2 VALUES (2);
[][]string{{string(status)}})
}

streamID, err := client.Create(ctx, testTenantName)
rps, err := client.Create(ctx, testTenantName)
require.NoError(t, err)
streamID := rps.StreamID
// We can create multiple replication streams for the same tenant.
_, err = client.Create(ctx, testTenantName)
require.NoError(t, err)
Expand Down Expand Up @@ -166,6 +167,8 @@ INSERT INTO d.t2 VALUES (2);
require.NoError(t, err)
require.Equal(t, streampb.StreamReplicationStatus_STREAM_INACTIVE, status.StreamStatus)

initialScanTimestamp := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()}

// Testing client.Subscribe()
makePartitionSpec := func(tables ...string) *streampb.StreamPartitionSpec {
var spans []roachpb.Span
Expand All @@ -176,7 +179,8 @@ INSERT INTO d.t2 VALUES (2);
}

return &streampb.StreamPartitionSpec{
Spans: spans,
InitialScanTimestamp: initialScanTimestamp,
Spans: spans,
Config: streampb.StreamPartitionSpec_ExecutionConfig{
MinCheckpointFrequency: 10 * time.Millisecond,
},
Expand All @@ -199,7 +203,8 @@ INSERT INTO d.t2 VALUES (2);
require.NoError(t, subClient.Close(ctx))
}()
require.NoError(t, err)
sub, err := subClient.Subscribe(ctx, streamID, encodeSpec("t1"), hlc.Timestamp{})
sub, err := subClient.Subscribe(ctx, streamID, encodeSpec("t1"),
initialScanTimestamp, hlc.Timestamp{})
require.NoError(t, err)

rf := streamingtest.MakeReplicationFeed(t, &subscriptionFeedSource{sub: sub})
Expand Down Expand Up @@ -244,8 +249,9 @@ INSERT INTO d.t2 VALUES (2);
h.SysSQL.Exec(t, `
SET CLUSTER SETTING stream_replication.stream_liveness_track_frequency = '200ms';
`)
streamID, err = client.Create(ctx, testTenantName)
rps, err = client.Create(ctx, testTenantName)
require.NoError(t, err)
streamID = rps.StreamID
require.NoError(t, client.Complete(ctx, streamID, true))
h.SysSQL.CheckQueryResultsRetry(t,
fmt.Sprintf("SELECT status FROM [SHOW JOBS] WHERE job_id = %d", streamID), [][]string{{"succeeded"}})
Expand Down
15 changes: 11 additions & 4 deletions pkg/ccl/streamingccl/streamclient/random_stream_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,9 +383,12 @@ func (m *RandomStreamClient) Plan(ctx context.Context, _ streampb.StreamID) (Top
// Create implements the Client interface.
func (m *RandomStreamClient) Create(
ctx context.Context, tenantName roachpb.TenantName,
) (streampb.StreamID, error) {
) (streampb.ReplicationProducerSpec, error) {
log.Infof(ctx, "creating random stream for tenant %s", tenantName)
return streampb.StreamID(1), nil
return streampb.ReplicationProducerSpec{
StreamID: streampb.StreamID(1),
ReplicationStartTime: hlc.Timestamp{WallTime: timeutil.Now().UnixNano()},
}, nil
}

// Heartbeat implements the Client interface.
Expand Down Expand Up @@ -457,7 +460,11 @@ func (m *RandomStreamClient) Close(_ context.Context) error {

// Subscribe implements the Client interface.
func (m *RandomStreamClient) Subscribe(
ctx context.Context, stream streampb.StreamID, spec SubscriptionToken, checkpoint hlc.Timestamp,
_ context.Context,
_ streampb.StreamID,
spec SubscriptionToken,
initialScanTime hlc.Timestamp,
_ hlc.Timestamp,
) (Subscription, error) {
partitionURL, err := url.Parse(string(spec))
if err != nil {
Expand All @@ -471,7 +478,7 @@ func (m *RandomStreamClient) Subscribe(

eventCh := make(chan streamingccl.Event)
now := timeutil.Now()
startWalltime := timeutil.Unix(0 /* sec */, checkpoint.WallTime)
startWalltime := timeutil.Unix(0 /* sec */, initialScanTime.WallTime)
if startWalltime.After(now) {
panic("cannot start random stream client event stream in the future")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/limit"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -245,7 +246,10 @@ func TestStreamIngestionFrontierProcessor(t *testing.T) {
OldID: roachpb.MustMakeTenantID(tenantID),
NewID: roachpb.MustMakeTenantID(tenantID + 10),
}
spec.StartTime = tc.frontierStartTime
spec.PreviousHighWaterTimestamp = tc.frontierStartTime
if tc.frontierStartTime.IsEmpty() {
spec.InitialScanTimestamp = hlc.Timestamp{WallTime: timeutil.Now().UnixNano()}
}
spec.Checkpoint.ResolvedSpans = tc.jobCheckpoint
proc, err := newStreamIngestionDataProcessor(ctx, &flowCtx, 0 /* processorID */, spec, &post, out)
require.NoError(t, err)
Expand Down Expand Up @@ -331,8 +335,8 @@ func TestStreamIngestionFrontierProcessor(t *testing.T) {
require.NoError(t, err)
progress := job.Progress().Progress
if progress == nil {
if !heartbeatTs.Equal(spec.StartTime) {
t.Fatalf("heartbeat %v should equal start time of %v", heartbeatTs, spec.StartTime)
if !heartbeatTs.Equal(spec.InitialScanTimestamp) {
t.Fatalf("heartbeat %v should equal start time of %v", heartbeatTs, spec.InitialScanTimestamp)
}
} else {
persistedHighwater := *progress.(*jobspb.Progress_HighWater).HighWater
Expand Down
25 changes: 16 additions & 9 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,10 +214,14 @@ func ingest(ctx context.Context, execCtx sql.JobExecContext, ingestionJob *jobs.
progress := ingestionJob.Progress()
streamAddress := streamingccl.StreamAddress(details.StreamAddress)

startTime := progress.GetStreamIngest().StartTime
var previousHighWater, heartbeatTimestamp hlc.Timestamp
initialScanTimestamp := details.ReplicationStartTime
// Start from the last checkpoint if it exists.
if h := progress.GetHighWater(); h != nil && !h.IsEmpty() {
startTime = *h
previousHighWater = *h
heartbeatTimestamp = previousHighWater
} else {
heartbeatTimestamp = initialScanTimestamp
}

// Initialize a stream client and resolve topology.
Expand All @@ -229,7 +233,7 @@ func ingest(ctx context.Context, execCtx sql.JobExecContext, ingestionJob *jobs.
streamID := streampb.StreamID(details.StreamID)
updateRunningStatus(ctx, ingestionJob, fmt.Sprintf("connecting to the producer job %d "+
"and creating a stream replication plan", streamID))
if err := waitUntilProducerActive(ctx, client, streamID, startTime, ingestionJob.ID()); err != nil {
if err := waitUntilProducerActive(ctx, client, streamID, heartbeatTimestamp, ingestionJob.ID()); err != nil {
return err
}

Expand All @@ -241,9 +245,6 @@ func ingest(ctx context.Context, execCtx sql.JobExecContext, ingestionJob *jobs.

// TODO(casper): update running status
err = ingestionJob.Update(ctx, nil, func(txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
if md.Progress.GetStreamIngest().StartTime.Less(startTime) {
md.Progress.GetStreamIngest().StartTime = startTime
}
md.Progress.GetStreamIngest().StreamAddresses = topology.StreamAddresses()
ju.UpdateProgress(md.Progress)
return nil
Expand All @@ -252,8 +253,14 @@ func ingest(ctx context.Context, execCtx sql.JobExecContext, ingestionJob *jobs.
return errors.Wrap(err, "failed to update job progress")
}

log.Infof(ctx, "ingestion job %d resumes stream ingestion from start time %s",
ingestionJob.ID(), progress.GetStreamIngest().StartTime)
if previousHighWater.IsEmpty() {
log.Infof(ctx, "ingestion job %d resumes stream ingestion from start time %s",
ingestionJob.ID(), initialScanTimestamp)
} else {
log.Infof(ctx, "ingestion job %d resumes stream ingestion from start time %s",
ingestionJob.ID(), previousHighWater)
}

ingestProgress := progress.Details.(*jobspb.Progress_StreamIngest).StreamIngest
checkpoint := ingestProgress.Checkpoint

Expand All @@ -268,7 +275,7 @@ func ingest(ctx context.Context, execCtx sql.JobExecContext, ingestionJob *jobs.

// Construct stream ingestion processor specs.
streamIngestionSpecs, streamIngestionFrontierSpec, err := distStreamIngestionPlanSpecs(
streamAddress, topology, sqlInstanceIDs, progress.GetStreamIngest().StartTime, checkpoint,
streamAddress, topology, sqlInstanceIDs, initialScanTimestamp, previousHighWater, checkpoint,
ingestionJob.ID(), streamID, topology.SourceTenantID, details.DestinationTenantID)
if err != nil {
return err
Expand Down
Loading

0 comments on commit 6cf6ab4

Please sign in to comment.