Merge #73886
73886: streamingccl: partitionedStreamClient that consumes partitioned replication streams r=gh-casper a=gh-casper

    Create a partitionedStreamClient that implements the streamclient.Client
    interface and talks to the source cluster with crdb_internal builtins for
    replication streams.

    Follow-up PRs will make the ingestion processors use this client to talk to
    the source cluster, support backfill checkpoints in the Subscribe API, and
    add more test coverage.

    Release note: none
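For orientation, here is a minimal sketch of how a consumer could drive this interface end to end. The stream address, the tenant ID, and the `consumeStream` wrapper are illustrative placeholders, not part of this change:

```go
package main

import (
	"context"
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
	"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// consumeStream is an illustrative wrapper, not part of this PR.
func consumeStream(ctx context.Context, addr streamingccl.StreamAddress) error {
	// The address scheme selects the client implementation.
	client, err := streamclient.NewStreamClient(addr)
	if err != nil {
		return err
	}
	defer func() { _ = client.Close() }()

	// Start a replication stream for a tenant, then plan its partitions.
	id, err := client.Create(ctx, roachpb.MakeTenantID(10))
	if err != nil {
		return err
	}
	topology, err := client.Plan(ctx, id)
	if err != nil {
		return err
	}

	// Subscribe to each partition and drain its events; a zero timestamp
	// means no prior checkpoint.
	for _, partition := range topology {
		sub, err := client.Subscribe(ctx, id, partition.SubscriptionToken, hlc.Timestamp{})
		if err != nil {
			return err
		}
		go func() { _ = sub.Subscribe(ctx) }()
		for event := range sub.Events() {
			fmt.Println(event.Type())
		}
		if err := sub.Err(); err != nil {
			return err
		}
	}
	return nil
}
```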

Co-authored-by: Casper <[email protected]>
craig[bot] and gh-casper committed Jan 14, 2022
2 parents 10b94f6 + 5acdb9a commit d6b99e9
Showing 30 changed files with 3,088 additions and 167 deletions.
4 changes: 3 additions & 1 deletion docs/generated/sql/functions.md
@@ -2534,7 +2534,9 @@ The swap_ordinate_string parameter is a 2-character string naming the ordinates
<tbody>
<tr><td><a name="crdb_internal.complete_stream_ingestion_job"></a><code>crdb_internal.complete_stream_ingestion_job(job_id: <a href="int.html">int</a>, cutover_ts: <a href="timestamp.html">timestamptz</a>) &rarr; <a href="int.html">int</a></code></td><td><span class="funcdesc"><p>This function can be used to signal a running stream ingestion job to complete. The job will eventually stop ingesting, revert to the specified timestamp and leave the cluster in a consistent state. The specified timestamp can only be specified up to the microsecond. This function does not wait for the job to reach a terminal state, but instead returns the job id as soon as it has signaled the job to complete. This builtin can be used in conjunction with SHOW JOBS WHEN COMPLETE to ensure that the job has left the cluster in a consistent state.</p>
</span></td></tr>
<tr><td><a name="crdb_internal.replication_stream_progress"></a><code>crdb_internal.replication_stream_progress(stream_id: <a href="int.html">int</a>, frontier_ts: <a href="string.html">string</a>) &rarr; <a href="string.html">string</a></code></td><td><span class="funcdesc"><p>This function can be used on the consumer side to heartbeat its replication progress to a replication stream in the source cluster. It returns a StreamReplicationStatus message that indicates stream status (RUNNING, PAUSED, or STOPPED).</p>
<tr><td><a name="crdb_internal.replication_stream_progress"></a><code>crdb_internal.replication_stream_progress(stream_id: <a href="int.html">int</a>, frontier_ts: <a href="string.html">string</a>) &rarr; <a href="bytes.html">bytes</a></code></td><td><span class="funcdesc"><p>This function can be used on the consumer side to heartbeat its replication progress to a replication stream in the source cluster. It returns a StreamReplicationStatus message that indicates stream status (RUNNING, PAUSED, or STOPPED).</p>
</span></td></tr>
<tr><td><a name="crdb_internal.replication_stream_spec"></a><code>crdb_internal.replication_stream_spec(stream_id: <a href="int.html">int</a>) &rarr; <a href="bytes.html">bytes</a></code></td><td><span class="funcdesc"><p>This function can be used on the consumer side to get a replication stream specification for the specified stream, starting from the ‘start_from’ timestamp. The consumer will later call ‘stream_partition’ with the spec to start streaming from a partition.</p>
</span></td></tr>
<tr><td><a name="crdb_internal.start_replication_stream"></a><code>crdb_internal.start_replication_stream(tenant_id: <a href="int.html">int</a>) &rarr; <a href="int.html">int</a></code></td><td><span class="funcdesc"><p>This function can be used on the producer side to start a replication stream for the specified tenant. The returned stream ID uniquely identifies the created stream. The caller must periodically invoke the crdb_internal.heartbeat_stream() function to notify that the replication is still ongoing.</p>
</span></td></tr>
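The partitioned stream client drives these builtins over an ordinary SQL connection to the source cluster. As a hedged illustration only — the connection string, tenant ID, and frontier-timestamp string format below are assumptions, and any postgres-wire-protocol driver would work — the calls look roughly like this:

```go
package main

import (
	"database/sql"
	"fmt"

	_ "github.com/lib/pq" // assumed driver; any postgres-wire driver works
)

func main() {
	// Placeholder connection string to the source cluster.
	db, err := sql.Open("postgres",
		"postgresql://root@source-cluster:26257/?sslmode=disable")
	if err != nil {
		panic(err)
	}
	defer db.Close()

	// Start a replication stream for tenant 10; the returned ID identifies
	// the stream in all later builtin calls.
	var streamID int64
	if err := db.QueryRow(
		`SELECT crdb_internal.start_replication_stream(10)`).Scan(&streamID); err != nil {
		panic(err)
	}

	// Fetch the stream's partition specification; per the signature above it
	// comes back as opaque bytes (an encoded protobuf).
	var spec []byte
	if err := db.QueryRow(
		`SELECT crdb_internal.replication_stream_spec($1)`, streamID).Scan(&spec); err != nil {
		panic(err)
	}

	// Heartbeat consumer progress; the frontier timestamp string format is an
	// assumption here. The result encodes a StreamReplicationStatus.
	var status []byte
	if err := db.QueryRow(
		`SELECT crdb_internal.replication_stream_progress($1, $2)`,
		streamID, "0.0000000000",
	).Scan(&status); err != nil {
		panic(err)
	}
	fmt.Printf("stream %d: spec %d bytes, status %d bytes\n",
		streamID, len(spec), len(status))
}
```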
10 changes: 10 additions & 0 deletions pkg/ccl/streamingccl/streamclient/BUILD.bazel
@@ -5,12 +5,14 @@ go_library(
srcs = [
"client.go",
"cockroach_sinkless_replication_client.go",
"partitioned_stream_client.go",
"random_stream_client.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient",
visibility = ["//visibility:public"],
deps = [
"//pkg/ccl/streamingccl",
"//pkg/ccl/streamingccl/streampb",
"//pkg/keys",
"//pkg/roachpb:with-mocks",
"//pkg/security",
@@ -22,6 +24,7 @@ go_library(
"//pkg/sql/rowenc",
"//pkg/sql/rowenc/valueside",
"//pkg/sql/sem/tree",
"//pkg/streaming",
"//pkg/util/hlc",
"//pkg/util/log",
"//pkg/util/protoutil",
@@ -39,6 +42,7 @@ go_test(
"client_test.go",
"cockroach_sinkless_replication_client_test.go",
"main_test.go",
"partitioned_stream_client_test.go",
],
embed = [":streamclient"],
deps = [
@@ -48,21 +52,27 @@
"//pkg/ccl/storageccl",
"//pkg/ccl/streamingccl",
"//pkg/ccl/streamingccl/streamingtest",
"//pkg/ccl/streamingccl/streampb",
"//pkg/ccl/streamingccl/streamproducer",
"//pkg/ccl/utilccl",
"//pkg/jobs",
"//pkg/roachpb:with-mocks",
"//pkg/security",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/sql/catalog/catalogkv",
"//pkg/streaming",
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
"//pkg/testutils/testcluster",
"//pkg/util/ctxgroup",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/protoutil",
"//pkg/util/randutil",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"@com_github_stretchr_testify//require",
],
)
49 changes: 38 additions & 11 deletions pkg/ccl/streamingccl/streamclient/client.go
@@ -13,18 +13,15 @@ import (

"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/streaming"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/errors"
)

// Note on data APIs and datatypes. As much as possible, the data that makes
// sense to the source cluster (e.g. checkpoint records, or subscription token, etc)
// is treated as an opaque object (e.g. []bytes) by this API. This opacity is done
// on purpuse as it abstracts the operations on the source cluster behind this API.

// StreamID identifies a stream across both its producer and consumer. It is
// used when the consumer wishes to interact with the stream's producer.
type StreamID uint64
// on purpose as it abstracts the operations on the source cluster behind this API.

// CheckpointToken is emitted by a stream producer to encode information about
// what that producer has emitted, including what spans or timestamps it might
@@ -36,6 +33,10 @@ type StreamID uint64
// to subscribe to a given partition.
type SubscriptionToken []byte

// CheckpointToken is an opaque identifier which can be used to represent checkpoint
// information to start a stream processor.
type CheckpointToken []byte

// Client provides a way for the stream ingestion job to consume a
// specified stream.
// TODO(57427): The stream client does not yet support the concept of
@@ -44,7 +45,7 @@ type Client interface {
// Create initializes a stream with the source, potentially reserving any
// required resources, such as protected timestamps, and returns an ID which
// can be used to interact with this stream in the future.
Create(ctx context.Context, tenantID roachpb.TenantID) (StreamID, error)
Create(ctx context.Context, tenantID roachpb.TenantID) (streaming.StreamID, error)

// Destroy informs the source of the stream that it may terminate production
// and release resources such as protected timestamps.
@@ -54,22 +55,25 @@ type Client interface {
// that source cluster protected timestamp _may_ be advanced up to the passed ts
// (which may be zero if no progress has been made e.g. during backfill).
// TODO(dt): ts -> checkpointToken.
Heartbeat(ctx context.Context, ID StreamID, consumed hlc.Timestamp) error
Heartbeat(ctx context.Context, streamID streaming.StreamID, consumed hlc.Timestamp) error

// Plan returns a Topology for this stream.
// TODO(dt): separate target argument from address argument.
Plan(ctx context.Context, ID StreamID) (Topology, error)
Plan(ctx context.Context, streamID streaming.StreamID) (Topology, error)

// Subscribe opens and returns a subscription for the specified partition from
// the specified remote address. This is used by each consumer processor to
// open its subscription to its partition of a larger stream.
// TODO(dt): ts -> checkpointToken, return -> Subscription.
// TODO(dt): ts -> checkpointToken.
Subscribe(
ctx context.Context,
stream StreamID,
streamID streaming.StreamID,
spec SubscriptionToken,
checkpoint hlc.Timestamp,
) (chan streamingccl.Event, chan error, error)
) (Subscription, error)

// Close releases all the resources used by this client.
Close() error
}

// Topology is a configuration of stream partitions. These are particular to a
@@ -86,6 +90,29 @@ type PartitionInfo struct {
SrcLocality roachpb.Locality
}

// Subscription represents a subscription to a replication stream partition.
// Typical usage on the call site looks like:
//
// ctxWithCancel, cancelFn := context.WithCancel(ctx)
// g := ctxgroup.WithContext(ctxWithCancel)
// sub := client.Subscribe()
// g.GoCtx(sub.Subscribe)
// g.GoCtx(processEventsAndErrors(sub.Events(), sub.Err()))
// g.Wait()
type Subscription interface {
// Subscribe starts receiving subscription events. It terminates when the
// context is cancelled, and releases all resources when it returns.
Subscribe(ctx context.Context) error

// Events returns a channel that receives streaming events.
// The channel is closed once no additional values will be sent.
Events() <-chan streamingccl.Event

// Err is set once the Events channel is closed -- it must not be called
// before the channel closes.
Err() error
}

// NewStreamClient creates a new stream client based on the stream
// address.
func NewStreamClient(streamAddress streamingccl.StreamAddress) (Client, error) {
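The Subscription doc comment in client.go above compresses the intended call pattern; a slightly fleshed-out sketch, where processEvent stands in for a real ingestion processor, might look like this:

```go
package main

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
	"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient"
	"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
)

// processEvent is a hypothetical stand-in for a real ingestion processor.
func processEvent(event streamingccl.Event) {
	_ = event
}

func consumePartition(ctx context.Context, sub streamclient.Subscription) error {
	ctxWithCancel, cancelFn := context.WithCancel(ctx)
	defer cancelFn()

	g := ctxgroup.WithContext(ctxWithCancel)
	// Run the subscription itself; it releases its resources on return.
	g.GoCtx(sub.Subscribe)
	// Drain events until the channel closes, then surface any terminal error.
	g.GoCtx(func(ctx context.Context) error {
		for {
			select {
			case event, ok := <-sub.Events():
				if !ok {
					// Events is closed, so Err is now safe to read.
					return sub.Err()
				}
				processEvent(event)
			case <-ctx.Done():
				return ctx.Err()
			}
		}
	})
	return g.Wait()
}
```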
52 changes: 43 additions & 9 deletions pkg/ccl/streamingccl/streamclient/client_test.go
@@ -15,6 +15,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/streaming"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
@@ -25,27 +26,36 @@ type testStreamClient struct{}
var _ Client = testStreamClient{}

// Create implements the Client interface.
func (sc testStreamClient) Create(ctx context.Context, target roachpb.TenantID) (StreamID, error) {
return StreamID(1), nil
func (sc testStreamClient) Create(
ctx context.Context, target roachpb.TenantID,
) (streaming.StreamID, error) {
return streaming.StreamID(1), nil
}

// Plan implements the Client interface.
func (sc testStreamClient) Plan(ctx context.Context, ID StreamID) (Topology, error) {
func (sc testStreamClient) Plan(ctx context.Context, ID streaming.StreamID) (Topology, error) {
return Topology([]PartitionInfo{
{SrcAddr: streamingccl.PartitionAddress("test://host1")},
{SrcAddr: streamingccl.PartitionAddress("test://host2")},
}), nil
}

// Heartbeat implements the Client interface.
func (sc testStreamClient) Heartbeat(ctx context.Context, ID StreamID, _ hlc.Timestamp) error {
func (sc testStreamClient) Heartbeat(
ctx context.Context, ID streaming.StreamID, _ hlc.Timestamp,
) error {
return nil
}

// Close implements the Client interface.
func (sc testStreamClient) Close() error {
return nil
}

// Subscribe implements the Client interface.
func (sc testStreamClient) Subscribe(
ctx context.Context, stream StreamID, spec SubscriptionToken, checkpoint hlc.Timestamp,
) (chan streamingccl.Event, chan error, error) {
ctx context.Context, stream streaming.StreamID, spec SubscriptionToken, checkpoint hlc.Timestamp,
) (Subscription, error) {
sampleKV := roachpb.KeyValue{
Key: []byte("key_1"),
Value: roachpb.Value{
@@ -59,13 +69,37 @@ func (sc testStreamClient) Subscribe(
events <- streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 100})
close(events)

return events, nil, nil
return &testStreamSubscription{
eventCh: events,
}, nil
}

type testStreamSubscription struct {
eventCh chan streamingccl.Event
}

// Subscribe implements the Subscription interface.
func (t testStreamSubscription) Subscribe(ctx context.Context) error {
return nil
}

// Events implements the Subscription interface.
func (t testStreamSubscription) Events() <-chan streamingccl.Event {
return t.eventCh
}

// Err implements the Subscription interface.
func (t testStreamSubscription) Err() error {
return nil
}

// ExampleClient serves as documentation to indicate how a stream
// client could be used.
func ExampleClient() {
client := testStreamClient{}
defer func() {
_ = client.Close()
}()

id, err := client.Create(context.Background(), roachpb.MakeTenantID(1))
if err != nil {
@@ -111,15 +145,15 @@ func ExampleClient() {

for _, partition := range topology {
// TODO(dt): use Subscribe helper and partition.SrcAddr
eventCh, _ /* errCh */, err := client.Subscribe(context.Background(), id, partition.SubscriptionToken, ts)
sub, err := client.Subscribe(context.Background(), id, partition.SubscriptionToken, ts)
if err != nil {
panic(err)
}

// This example looks for the closing of the channel to terminate the test,
// but an ingestion job should look for another event such as the user
// cutting over to the new cluster to move to the next stage.
for event := range eventCh {
for event := range sub.Events() {
switch event.Type() {
case streamingccl.KVEvent:
kv := event.GetKV()