streamingccl: tighten replication timestamp semantics #92788

Merged 1 commit on Dec 13, 2022
2 changes: 1 addition & 1 deletion docs/generated/sql/functions.md
@@ -2650,7 +2650,7 @@ The swap_ordinate_string parameter is a 2-character string naming the ordinates
</span></td><td>Volatile</td></tr>
<tr><td><a name="crdb_internal.replication_stream_spec"></a><code>crdb_internal.replication_stream_spec(stream_id: <a href="int.html">int</a>) &rarr; <a href="bytes.html">bytes</a></code></td><td><span class="funcdesc"><p>This function can be used on the consumer side to get a replication stream specification for the specified stream. The consumer will later call ‘stream_partition’ to a partition with the spec to start streaming.</p>
</span></td><td>Volatile</td></tr>
<tr><td><a name="crdb_internal.start_replication_stream"></a><code>crdb_internal.start_replication_stream(tenant_name: <a href="string.html">string</a>) &rarr; <a href="int.html">int</a></code></td><td><span class="funcdesc"><p>This function can be used on the producer side to start a replication stream for the specified tenant. The returned stream ID uniquely identifies created stream. The caller must periodically invoke crdb_internal.heartbeat_stream() function to notify that the replication is still ongoing.</p>
<tr><td><a name="crdb_internal.start_replication_stream"></a><code>crdb_internal.start_replication_stream(tenant_name: <a href="string.html">string</a>) &rarr; <a href="bytes.html">bytes</a></code></td><td><span class="funcdesc"><p>This function can be used on the producer side to start a replication stream for the specified tenant. The returned stream ID uniquely identifies created stream. The caller must periodically invoke crdb_internal.heartbeat_stream() function to notify that the replication is still ongoing.</p>
</span></td><td>Volatile</td></tr>
<tr><td><a name="crdb_internal.stream_ingestion_stats_json"></a><code>crdb_internal.stream_ingestion_stats_json(job_id: <a href="int.html">int</a>) &rarr; jsonb</code></td><td><span class="funcdesc"><p>This function can be used on the ingestion side to get a statistics summary of a stream ingestion job in json format.</p>
</span></td><td>Volatile</td></tr>
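Note the return-type change above: crdb_internal.start_replication_stream now returns bytes, an encoded streampb.ReplicationProducerSpec, rather than an int stream ID. Below is a rough consumer-side sketch of decoding it, mirroring the Create change in partitioned_stream_client.go further down; the import paths, pgx connection type, and helper name are assumptions for illustration, not part of this patch.

package main // illustrative sketch only

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streampb"
	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
	"github.com/jackc/pgx/v4"
)

// startReplicationStream is a hypothetical helper: it calls the builtin and
// decodes the returned bytes into a ReplicationProducerSpec.
func startReplicationStream(
	ctx context.Context, conn *pgx.Conn, tenantName string,
) (streampb.ReplicationProducerSpec, error) {
	var raw []byte
	row := conn.QueryRow(ctx, `SELECT crdb_internal.start_replication_stream($1)`, tenantName)
	if err := row.Scan(&raw); err != nil {
		return streampb.ReplicationProducerSpec{}, err
	}
	var spec streampb.ReplicationProducerSpec
	if err := protoutil.Unmarshal(raw, &spec); err != nil {
		return streampb.ReplicationProducerSpec{}, err
	}
	// spec.StreamID identifies the stream; spec.ReplicationStartTime is the
	// timestamp the producer protects and from which the initial scan runs.
	return spec, nil
}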
10 changes: 3 additions & 7 deletions pkg/ccl/streamingccl/streamclient/client.go
@@ -47,7 +47,7 @@ type Client interface {
// Create initializes a stream with the source, potentially reserving any
// required resources, such as protected timestamps, and returns an ID which
// can be used to interact with this stream in the future.
Create(ctx context.Context, tenant roachpb.TenantName) (streampb.StreamID, error)
Create(ctx context.Context, tenant roachpb.TenantName) (streampb.ReplicationProducerSpec, error)

// Dial checks if the source is able to be connected to for queries
Dial(ctx context.Context) error
@@ -74,12 +74,8 @@ type Client interface {
// the specified remote address. This is used by each consumer processor to
// open its subscription to its partition of a larger stream.
// TODO(dt): ts -> checkpointToken.
Subscribe(
ctx context.Context,
streamID streampb.StreamID,
spec SubscriptionToken,
checkpoint hlc.Timestamp,
) (Subscription, error)
Subscribe(ctx context.Context, streamID streampb.StreamID, spec SubscriptionToken,
initialScanTime hlc.Timestamp, previousHighWater hlc.Timestamp) (Subscription, error)

// Close releases all the resources used by this client.
Close(ctx context.Context) error
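The reshaped Subscribe signature separates the two ways a subscription can start: initialScanTime anchors a fresh stream's initial scan, while previousHighWater is non-empty only when resuming past a persisted checkpoint. A hedged sketch of the calling convention, as if written inside the streamclient package (the helper is hypothetical; ingest() in stream_ingestion_job.go below is the real call site):

// subscribeAt illustrates the intended contract: exactly one of the two
// timestamps determines where the subscription starts.
func subscribeAt(
	ctx context.Context,
	client Client,
	id streampb.StreamID,
	token SubscriptionToken,
	initialScan hlc.Timestamp, // ReplicationProducerSpec.ReplicationStartTime
	highWater hlc.Timestamp, // empty until a checkpoint has persisted
) (Subscription, error) {
	// Fresh stream: highWater is empty, so the producer runs an initial scan
	// at initialScan. Resume: highWater is set, so the producer replays only
	// the rangefeed from that timestamp onward.
	return client.Subscribe(ctx, id, token, initialScan, highWater)
}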
29 changes: 16 additions & 13 deletions pkg/ccl/streamingccl/streamclient/client_test.go
@@ -23,6 +23,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)
Expand All @@ -32,19 +33,22 @@ type testStreamClient struct{}
var _ Client = testStreamClient{}

// Dial implements Client interface.
func (sc testStreamClient) Dial(ctx context.Context) error {
func (sc testStreamClient) Dial(_ context.Context) error {
return nil
}

// Create implements the Client interface.
func (sc testStreamClient) Create(
_ context.Context, _ roachpb.TenantName,
) (streampb.StreamID, error) {
return streampb.StreamID(1), nil
) (streampb.ReplicationProducerSpec, error) {
return streampb.ReplicationProducerSpec{
StreamID: streampb.StreamID(1),
ReplicationStartTime: hlc.Timestamp{WallTime: timeutil.Now().UnixNano()},
}, nil
}

// Plan implements the Client interface.
func (sc testStreamClient) Plan(ctx context.Context, ID streampb.StreamID) (Topology, error) {
func (sc testStreamClient) Plan(_ context.Context, _ streampb.StreamID) (Topology, error) {
return Topology{
Partitions: []PartitionInfo{
{
@@ -59,19 +63,19 @@ func (sc testStreamClient) Plan(ctx context.Context, ID streampb.StreamID) (Topo

// Heartbeat implements the Client interface.
func (sc testStreamClient) Heartbeat(
ctx context.Context, ID streampb.StreamID, _ hlc.Timestamp,
_ context.Context, _ streampb.StreamID, _ hlc.Timestamp,
) (streampb.StreamReplicationStatus, error) {
return streampb.StreamReplicationStatus{}, nil
}

// Close implements the Client interface.
func (sc testStreamClient) Close(ctx context.Context) error {
func (sc testStreamClient) Close(_ context.Context) error {
return nil
}

// Subscribe implements the Client interface.
func (sc testStreamClient) Subscribe(
ctx context.Context, stream streampb.StreamID, spec SubscriptionToken, checkpoint hlc.Timestamp,
_ context.Context, _ streampb.StreamID, _ SubscriptionToken, _ hlc.Timestamp, _ hlc.Timestamp,
) (Subscription, error) {
sampleKV := roachpb.KeyValue{
Key: []byte("key_1"),
@@ -97,9 +101,7 @@ func (sc testStreamClient) Subscribe(
}

// Complete implements the streamclient.Client interface.
func (sc testStreamClient) Complete(
ctx context.Context, streamID streampb.StreamID, successfulIngestion bool,
) error {
func (sc testStreamClient) Complete(_ context.Context, _ streampb.StreamID, _ bool) error {
return nil
}

@@ -108,7 +110,7 @@ type testStreamSubscription struct {
}

// Subscribe implements the Subscription interface.
func (t testStreamSubscription) Subscribe(ctx context.Context) error {
func (t testStreamSubscription) Subscribe(_ context.Context) error {
return nil
}

@@ -189,10 +191,11 @@ func ExampleClient() {
_ = client.Close(ctx)
}()

id, err := client.Create(ctx, "system")
prs, err := client.Create(ctx, "system")
if err != nil {
panic(err)
}
id := prs.StreamID

var ingested struct {
ts hlc.Timestamp
@@ -233,7 +236,7 @@ func ExampleClient() {

for _, partition := range topology.Partitions {
// TODO(dt): use Subscribe helper and partition.SrcAddr
sub, err := client.Subscribe(ctx, id, partition.SubscriptionToken, ts)
sub, err := client.Subscribe(ctx, id, partition.SubscriptionToken, hlc.Timestamp{}, ts)
if err != nil {
panic(err)
}
26 changes: 17 additions & 9 deletions pkg/ccl/streamingccl/streamclient/partitioned_stream_client.go
@@ -74,21 +74,24 @@ var _ Client = &partitionedStreamClient{}
// Create implements Client interface.
func (p *partitionedStreamClient) Create(
ctx context.Context, tenantName roachpb.TenantName,
) (streampb.StreamID, error) {
) (streampb.ReplicationProducerSpec, error) {
ctx, sp := tracing.ChildSpan(ctx, "streamclient.Client.Create")
defer sp.Finish()

p.mu.Lock()
defer p.mu.Unlock()
var streamID streampb.StreamID
var rawReplicationProducerSpec []byte
row := p.mu.srcConn.QueryRow(ctx, `SELECT crdb_internal.start_replication_stream($1)`, tenantName)
err := row.Scan(&streamID)
err := row.Scan(&rawReplicationProducerSpec)
if err != nil {
return streampb.InvalidStreamID,
errors.Wrapf(err, "error creating replication stream for tenant %s", tenantName)
return streampb.ReplicationProducerSpec{}, errors.Wrapf(err, "error creating replication stream for tenant %s", tenantName)
}
var replicationProducerSpec streampb.ReplicationProducerSpec
if err := protoutil.Unmarshal(rawReplicationProducerSpec, &replicationProducerSpec); err != nil {
return streampb.ReplicationProducerSpec{}, err
}

return streamID, err
return replicationProducerSpec, err
}

// Dial implements Client interface.
@@ -195,7 +198,11 @@ func (p *partitionedStreamClient) Close(ctx context.Context) error {

// Subscribe implements Client interface.
func (p *partitionedStreamClient) Subscribe(
ctx context.Context, stream streampb.StreamID, spec SubscriptionToken, checkpoint hlc.Timestamp,
ctx context.Context,
streamID streampb.StreamID,
spec SubscriptionToken,
initialScanTime hlc.Timestamp,
previousHighWater hlc.Timestamp,
) (Subscription, error) {
_, sp := tracing.ChildSpan(ctx, "streamclient.Client.Subscribe")
defer sp.Finish()
@@ -204,7 +211,8 @@ func (p *partitionedStreamClient) Subscribe(
if err := protoutil.Unmarshal(spec, &sps); err != nil {
return nil, err
}
sps.StartFrom = checkpoint
sps.InitialScanTimestamp = initialScanTime
sps.PreviousHighWaterTimestamp = previousHighWater

specBytes, err := protoutil.Marshal(&sps)
if err != nil {
@@ -215,7 +223,7 @@
eventsChan: make(chan streamingccl.Event),
srcConnConfig: p.pgxConfig,
specBytes: specBytes,
streamID: stream,
streamID: streamID,
closeChan: make(chan struct{}),
}
p.mu.Lock()
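A SubscriptionToken remains an opaque encoded streampb.StreamPartitionSpec; the Subscribe method above now stamps both timestamps into it where it previously set the single StartFrom field. The round-trip, pulled out as a hypothetical standalone helper for illustration:

// stampSpec decodes the token, records both timestamps, and re-encodes the
// spec for transmission to the producer.
func stampSpec(
	token SubscriptionToken, initialScan, prevHighWater hlc.Timestamp,
) ([]byte, error) {
	var sps streampb.StreamPartitionSpec
	if err := protoutil.Unmarshal(token, &sps); err != nil {
		return nil, err
	}
	sps.InitialScanTimestamp = initialScan
	sps.PreviousHighWaterTimestamp = prevHighWater
	return protoutil.Marshal(&sps)
}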
@@ -128,8 +128,9 @@ INSERT INTO d.t2 VALUES (2);
[][]string{{string(status)}})
}

streamID, err := client.Create(ctx, testTenantName)
rps, err := client.Create(ctx, testTenantName)
require.NoError(t, err)
streamID := rps.StreamID
// We can create multiple replication streams for the same tenant.
_, err = client.Create(ctx, testTenantName)
require.NoError(t, err)
@@ -166,6 +167,8 @@ INSERT INTO d.t2 VALUES (2);
require.NoError(t, err)
require.Equal(t, streampb.StreamReplicationStatus_STREAM_INACTIVE, status.StreamStatus)

initialScanTimestamp := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()}

// Testing client.Subscribe()
makePartitionSpec := func(tables ...string) *streampb.StreamPartitionSpec {
var spans []roachpb.Span
@@ -176,7 +179,8 @@ INSERT INTO d.t2 VALUES (2);
}

return &streampb.StreamPartitionSpec{
Spans: spans,
InitialScanTimestamp: initialScanTimestamp,
Spans: spans,
Config: streampb.StreamPartitionSpec_ExecutionConfig{
MinCheckpointFrequency: 10 * time.Millisecond,
},
@@ -199,7 +203,8 @@ INSERT INTO d.t2 VALUES (2);
require.NoError(t, subClient.Close(ctx))
}()
require.NoError(t, err)
sub, err := subClient.Subscribe(ctx, streamID, encodeSpec("t1"), hlc.Timestamp{})
sub, err := subClient.Subscribe(ctx, streamID, encodeSpec("t1"),
initialScanTimestamp, hlc.Timestamp{})
require.NoError(t, err)

rf := streamingtest.MakeReplicationFeed(t, &subscriptionFeedSource{sub: sub})
@@ -244,8 +249,9 @@ INSERT INTO d.t2 VALUES (2);
h.SysSQL.Exec(t, `
SET CLUSTER SETTING stream_replication.stream_liveness_track_frequency = '200ms';
`)
streamID, err = client.Create(ctx, testTenantName)
rps, err = client.Create(ctx, testTenantName)
require.NoError(t, err)
streamID = rps.StreamID
require.NoError(t, client.Complete(ctx, streamID, true))
h.SysSQL.CheckQueryResultsRetry(t,
fmt.Sprintf("SELECT status FROM [SHOW JOBS] WHERE job_id = %d", streamID), [][]string{{"succeeded"}})
15 changes: 11 additions & 4 deletions pkg/ccl/streamingccl/streamclient/random_stream_client.go
@@ -383,9 +383,12 @@ func (m *RandomStreamClient) Plan(ctx context.Context, _ streampb.StreamID) (Top
// Create implements the Client interface.
func (m *RandomStreamClient) Create(
ctx context.Context, tenantName roachpb.TenantName,
) (streampb.StreamID, error) {
) (streampb.ReplicationProducerSpec, error) {
log.Infof(ctx, "creating random stream for tenant %s", tenantName)
return streampb.StreamID(1), nil
return streampb.ReplicationProducerSpec{
StreamID: streampb.StreamID(1),
ReplicationStartTime: hlc.Timestamp{WallTime: timeutil.Now().UnixNano()},
}, nil
}

// Heartbeat implements the Client interface.
@@ -457,7 +460,11 @@ func (m *RandomStreamClient) Close(_ context.Context) error {

// Subscribe implements the Client interface.
func (m *RandomStreamClient) Subscribe(
ctx context.Context, stream streampb.StreamID, spec SubscriptionToken, checkpoint hlc.Timestamp,
_ context.Context,
_ streampb.StreamID,
spec SubscriptionToken,
initialScanTime hlc.Timestamp,
_ hlc.Timestamp,
) (Subscription, error) {
partitionURL, err := url.Parse(string(spec))
if err != nil {
@@ -471,7 +478,7 @@

eventCh := make(chan streamingccl.Event)
now := timeutil.Now()
startWalltime := timeutil.Unix(0 /* sec */, checkpoint.WallTime)
startWalltime := timeutil.Unix(0 /* sec */, initialScanTime.WallTime)
if startWalltime.After(now) {
panic("cannot start random stream client event stream in the future")
}
@@ -35,6 +35,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/limit"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/stretchr/testify/require"
)

@@ -245,7 +246,10 @@ func TestStreamIngestionFrontierProcessor(t *testing.T) {
OldID: roachpb.MustMakeTenantID(tenantID),
NewID: roachpb.MustMakeTenantID(tenantID + 10),
}
spec.StartTime = tc.frontierStartTime
spec.PreviousHighWaterTimestamp = tc.frontierStartTime
if tc.frontierStartTime.IsEmpty() {
spec.InitialScanTimestamp = hlc.Timestamp{WallTime: timeutil.Now().UnixNano()}
}
spec.Checkpoint.ResolvedSpans = tc.jobCheckpoint
proc, err := newStreamIngestionDataProcessor(ctx, &flowCtx, 0 /* processorID */, spec, &post, out)
require.NoError(t, err)
@@ -331,8 +335,8 @@ func TestStreamIngestionFrontierProcessor(t *testing.T) {
require.NoError(t, err)
progress := job.Progress().Progress
if progress == nil {
if !heartbeatTs.Equal(spec.StartTime) {
t.Fatalf("heartbeat %v should equal start time of %v", heartbeatTs, spec.StartTime)
if !heartbeatTs.Equal(spec.InitialScanTimestamp) {
t.Fatalf("heartbeat %v should equal start time of %v", heartbeatTs, spec.InitialScanTimestamp)
}
} else {
persistedHighwater := *progress.(*jobspb.Progress_HighWater).HighWater
29 changes: 20 additions & 9 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
@@ -214,10 +214,14 @@ func ingest(ctx context.Context, execCtx sql.JobExecContext, ingestionJob *jobs.
progress := ingestionJob.Progress()
streamAddress := streamingccl.StreamAddress(details.StreamAddress)

startTime := progress.GetStreamIngest().StartTime
var previousHighWater, heartbeatTimestamp hlc.Timestamp
initialScanTimestamp := details.ReplicationStartTime
// Start from the last checkpoint if it exists.
if h := progress.GetHighWater(); h != nil && !h.IsEmpty() {
startTime = *h
previousHighWater = *h
heartbeatTimestamp = previousHighWater
} else {
heartbeatTimestamp = initialScanTimestamp
}

// Initialize a stream client and resolve topology.
@@ -229,7 +233,7 @@ func ingest(ctx context.Context, execCtx sql.JobExecContext, ingestionJob *jobs.
streamID := streampb.StreamID(details.StreamID)
updateRunningStatus(ctx, ingestionJob, fmt.Sprintf("connecting to the producer job %d "+
"and creating a stream replication plan", streamID))
if err := waitUntilProducerActive(ctx, client, streamID, startTime, ingestionJob.ID()); err != nil {
if err := waitUntilProducerActive(ctx, client, streamID, heartbeatTimestamp, ingestionJob.ID()); err != nil {
return err
}

@@ -241,9 +245,6 @@ func ingest(ctx context.Context, execCtx sql.JobExecContext, ingestionJob *jobs.

// TODO(casper): update running status
err = ingestionJob.Update(ctx, nil, func(txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
if md.Progress.GetStreamIngest().StartTime.Less(startTime) {
md.Progress.GetStreamIngest().StartTime = startTime
}
md.Progress.GetStreamIngest().StreamAddresses = topology.StreamAddresses()
ju.UpdateProgress(md.Progress)
return nil
Expand All @@ -252,8 +253,14 @@ func ingest(ctx context.Context, execCtx sql.JobExecContext, ingestionJob *jobs.
return errors.Wrap(err, "failed to update job progress")
}

log.Infof(ctx, "ingestion job %d resumes stream ingestion from start time %s",
ingestionJob.ID(), progress.GetStreamIngest().StartTime)
if previousHighWater.IsEmpty() {
log.Infof(ctx, "ingestion job %d resumes stream ingestion from start time %s",
ingestionJob.ID(), initialScanTimestamp)
} else {
log.Infof(ctx, "ingestion job %d resumes stream ingestion from start time %s",
ingestionJob.ID(), previousHighWater)
}

ingestProgress := progress.Details.(*jobspb.Progress_StreamIngest).StreamIngest
checkpoint := ingestProgress.Checkpoint

@@ -268,12 +275,16 @@ func ingest(ctx context.Context, execCtx sql.JobExecContext, ingestionJob *jobs.

// Construct stream ingestion processor specs.
streamIngestionSpecs, streamIngestionFrontierSpec, err := distStreamIngestionPlanSpecs(
streamAddress, topology, sqlInstanceIDs, progress.GetStreamIngest().StartTime, checkpoint,
streamAddress, topology, sqlInstanceIDs, initialScanTimestamp, previousHighWater, checkpoint,
ingestionJob.ID(), streamID, topology.SourceTenantID, details.DestinationTenantID)
if err != nil {
return err
}

if knobs := execCtx.ExecCfg().StreamingTestingKnobs; knobs != nil && knobs.AfterReplicationFlowPlan != nil {
knobs.AfterReplicationFlowPlan(streamIngestionSpecs, streamIngestionFrontierSpec)
}

// Plan and run the DistSQL flow.
log.Infof(ctx, "starting to run DistSQL flow for stream ingestion job %d",
ingestionJob.ID())
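Taken together, ingest() now derives all of its timestamps from immutable job state instead of the mutable StartTime field it used to write back into progress. A condensed sketch of that bookkeeping (field names follow the diff above; treat this as an illustration rather than the exact patch):

// resumeTimestamps mirrors the logic in ingest(): the initial-scan time is
// fixed at stream creation, the previous high water comes from persisted
// progress, and the producer heartbeat uses whichever of the two applies.
func resumeTimestamps(
	details jobspb.StreamIngestionDetails, progress jobspb.Progress,
) (initialScan, prevHighWater, heartbeat hlc.Timestamp) {
	initialScan = details.ReplicationStartTime
	if h := progress.GetHighWater(); h != nil && !h.IsEmpty() {
		prevHighWater = *h
	}
	heartbeat = initialScan
	if !prevHighWater.IsEmpty() {
		heartbeat = prevHighWater
	}
	return initialScan, prevHighWater, heartbeat
}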