Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

streampb: move out of CCL #91806

Merged
merged 1 commit into from
Nov 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/CODEOWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@
/pkg/settings/ @cockroachdb/unowned
/pkg/spanconfig/ @cockroachdb/kv-prs
/pkg/startupmigrations/ @cockroachdb/unowned @cockroachdb/sql-schema
/pkg/streaming/ @cockroachdb/disaster-recovery
/pkg/repstream/ @cockroachdb/disaster-recovery
/pkg/testutils/ @cockroachdb/test-eng-noreview
/pkg/testutils/reduce/ @cockroachdb/sql-queries
/pkg/testutils/sqlutils/ @cockroachdb/sql-queries
Expand Down
2 changes: 1 addition & 1 deletion docs/generated/redact_safe.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ pkg/base/node_id.go | `*SQLIDContainer`
pkg/base/node_id.go | `*StoreIDContainer`
pkg/ccl/backupccl/backuppb/backup.go | `sz`
pkg/ccl/backupccl/backuppb/backup.go | `timing`
pkg/ccl/streamingccl/streampb/streamid.go | `StreamID`
pkg/cli/exit/exit.go | `Code`
pkg/jobs/jobspb/wrap.go | `Type`
pkg/kv/bulk/bulk_metrics.go | `sz`
Expand All @@ -18,6 +17,7 @@ pkg/kv/kvserver/concurrency/lock/locking.go | `Durability`
pkg/kv/kvserver/concurrency/lock/locking.go | `Strength`
pkg/kv/kvserver/concurrency/lock/locking.go | `WaitPolicy`
pkg/kv/kvserver/kvserverpb/raft.go | `SnapshotRequest_Type`
pkg/repstream/streampb/streamid.go | `StreamID`
pkg/roachpb/data.go | `LeaseSequence`
pkg/roachpb/data.go | `ReplicaChangeType`
pkg/roachpb/data.go | `TransactionStatus`
Expand Down
8 changes: 4 additions & 4 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -794,7 +794,6 @@ GO_TARGETS = [
"//pkg/ccl/streamingccl/streamingest:streamingest",
"//pkg/ccl/streamingccl/streamingest:streamingest_test",
"//pkg/ccl/streamingccl/streamingtest:streamingtest",
"//pkg/ccl/streamingccl/streampb:streampb",
"//pkg/ccl/streamingccl/streamproducer:streamproducer",
"//pkg/ccl/streamingccl/streamproducer:streamproducer_test",
"//pkg/ccl/streamingccl:streamingccl",
Expand Down Expand Up @@ -1248,6 +1247,8 @@ GO_TARGETS = [
"//pkg/obsservice/obspb/opentelemetry-proto/resource/v1:resource",
"//pkg/obsservice/obspb:obspb",
"//pkg/release:release",
"//pkg/repstream/streampb:streampb",
"//pkg/repstream:repstream",
"//pkg/roachpb/gen:gen",
"//pkg/roachpb/gen:gen_lib",
"//pkg/roachpb/roachpbmock:roachpbmock",
Expand Down Expand Up @@ -1834,7 +1835,6 @@ GO_TARGETS = [
"//pkg/storage/metamorphic:metamorphic_test",
"//pkg/storage:storage",
"//pkg/storage:storage_test",
"//pkg/streaming:streaming",
"//pkg/testutils/buildutil:buildutil",
"//pkg/testutils/colcontainerutils:colcontainerutils",
"//pkg/testutils/diagutils:diagutils",
Expand Down Expand Up @@ -2279,7 +2279,6 @@ GET_X_DATA_TARGETS = [
"//pkg/ccl/streamingccl/streamclient:get_x_data",
"//pkg/ccl/streamingccl/streamingest:get_x_data",
"//pkg/ccl/streamingccl/streamingtest:get_x_data",
"//pkg/ccl/streamingccl/streampb:get_x_data",
"//pkg/ccl/streamingccl/streamproducer:get_x_data",
"//pkg/ccl/telemetryccl:get_x_data",
"//pkg/ccl/testccl/authccl:get_x_data",
Expand Down Expand Up @@ -2542,6 +2541,8 @@ GET_X_DATA_TARGETS = [
"//pkg/obsservice/obspb/opentelemetry-proto/logs/v1:get_x_data",
"//pkg/obsservice/obspb/opentelemetry-proto/resource/v1:get_x_data",
"//pkg/release:get_x_data",
"//pkg/repstream:get_x_data",
"//pkg/repstream/streampb:get_x_data",
"//pkg/roachpb:get_x_data",
"//pkg/roachpb/gen:get_x_data",
"//pkg/roachpb/roachpbmock:get_x_data",
Expand Down Expand Up @@ -2903,7 +2904,6 @@ GET_X_DATA_TARGETS = [
"//pkg/storage/enginepb:get_x_data",
"//pkg/storage/fs:get_x_data",
"//pkg/storage/metamorphic:get_x_data",
"//pkg/streaming:get_x_data",
"//pkg/testutils:get_x_data",
"//pkg/testutils/buildutil:get_x_data",
"//pkg/testutils/colcontainerutils:get_x_data",
Expand Down
3 changes: 1 addition & 2 deletions pkg/ccl/streamingccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,11 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl",
visibility = ["//visibility:public"],
deps = [
"//pkg/ccl/streamingccl/streampb",
"//pkg/jobs/jobspb",
"//pkg/repstream/streampb",
"//pkg/roachpb",
"//pkg/settings",
"//pkg/storage",
"//pkg/streaming",
"//pkg/util/hlc",
"@com_github_cockroachdb_errors//:errors",
],
Expand Down
7 changes: 3 additions & 4 deletions pkg/ccl/streamingccl/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,18 @@ package streamingccl
import (
"fmt"

"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streampb"
"github.com/cockroachdb/cockroach/pkg/streaming"
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
)

// StreamStatusErr is an error that encapsulate a replication stream's inactive status.
type StreamStatusErr struct {
StreamID streaming.StreamID
StreamID streampb.StreamID
StreamStatus streampb.StreamReplicationStatus_StreamStatus
}

// NewStreamStatusErr creates a new StreamStatusErr.
func NewStreamStatusErr(
streamID streaming.StreamID, streamStatus streampb.StreamReplicationStatus_StreamStatus,
streamID streampb.StreamID, streamStatus streampb.StreamReplicationStatus_StreamStatus,
) StreamStatusErr {
return StreamStatusErr{
StreamID: streamID,
Expand Down
6 changes: 2 additions & 4 deletions pkg/ccl/streamingccl/streamclient/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/ccl/streamingccl",
"//pkg/ccl/streamingccl/streampb",
"//pkg/jobs/jobspb",
"//pkg/keys",
"//pkg/repstream/streampb",
"//pkg/roachpb",
"//pkg/security/username",
"//pkg/sql",
Expand All @@ -26,7 +26,6 @@ go_library(
"//pkg/sql/rowenc",
"//pkg/sql/rowenc/valueside",
"//pkg/sql/sem/tree",
"//pkg/streaming",
"//pkg/util/hlc",
"//pkg/util/log",
"//pkg/util/protoutil",
Expand Down Expand Up @@ -55,18 +54,17 @@ go_test(
"//pkg/ccl/storageccl",
"//pkg/ccl/streamingccl",
"//pkg/ccl/streamingccl/streamingtest",
"//pkg/ccl/streamingccl/streampb",
"//pkg/ccl/streamingccl/streamproducer",
"//pkg/ccl/utilccl",
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/repstream/streampb",
"//pkg/roachpb",
"//pkg/security/securityassets",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/sql/catalog/desctestutils",
"//pkg/sql/pgwire/pgcode",
"//pkg/streaming",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/testcluster",
Expand Down
13 changes: 6 additions & 7 deletions pkg/ccl/streamingccl/streamclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,8 @@ import (
"context"

"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streampb"
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/streaming"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -48,7 +47,7 @@ type Client interface {
// Create initializes a stream with the source, potentially reserving any
// required resources, such as protected timestamps, and returns an ID which
// can be used to interact with this stream in the future.
Create(ctx context.Context, tenantID roachpb.TenantID) (streaming.StreamID, error)
Create(ctx context.Context, tenantID roachpb.TenantID) (streampb.StreamID, error)

// Dial checks if the source is able to be connected to for queries
Dial(ctx context.Context) error
Expand All @@ -63,21 +62,21 @@ type Client interface {
// TODO(dt): ts -> checkpointToken.
Heartbeat(
ctx context.Context,
streamID streaming.StreamID,
streamID streampb.StreamID,
consumed hlc.Timestamp,
) (streampb.StreamReplicationStatus, error)

// Plan returns a Topology for this stream.
// TODO(dt): separate target argument from address argument.
Plan(ctx context.Context, streamID streaming.StreamID) (Topology, error)
Plan(ctx context.Context, streamID streampb.StreamID) (Topology, error)

// Subscribe opens and returns a subscription for the specified partition from
// the specified remote address. This is used by each consumer processor to
// open its subscription to its partition of a larger stream.
// TODO(dt): ts -> checkpointToken.
Subscribe(
ctx context.Context,
streamID streaming.StreamID,
streamID streampb.StreamID,
spec SubscriptionToken,
checkpoint hlc.Timestamp,
) (Subscription, error)
Expand All @@ -86,7 +85,7 @@ type Client interface {
Close(ctx context.Context) error

// Complete completes a replication stream consumption.
Complete(ctx context.Context, streamID streaming.StreamID, successfulIngestion bool) error
Complete(ctx context.Context, streamID streampb.StreamID, successfulIngestion bool) error
}

// Topology is a configuration of stream partitions. These are particular to a
Expand Down
15 changes: 7 additions & 8 deletions pkg/ccl/streamingccl/streamclient/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streampb"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/streaming"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
Expand All @@ -40,12 +39,12 @@ func (sc testStreamClient) Dial(ctx context.Context) error {
// Create implements the Client interface.
func (sc testStreamClient) Create(
ctx context.Context, target roachpb.TenantID,
) (streaming.StreamID, error) {
return streaming.StreamID(1), nil
) (streampb.StreamID, error) {
return streampb.StreamID(1), nil
}

// Plan implements the Client interface.
func (sc testStreamClient) Plan(ctx context.Context, ID streaming.StreamID) (Topology, error) {
func (sc testStreamClient) Plan(ctx context.Context, ID streampb.StreamID) (Topology, error) {
return Topology{
{SrcAddr: "test://host1"},
{SrcAddr: "test://host2"},
Expand All @@ -54,7 +53,7 @@ func (sc testStreamClient) Plan(ctx context.Context, ID streaming.StreamID) (Top

// Heartbeat implements the Client interface.
func (sc testStreamClient) Heartbeat(
ctx context.Context, ID streaming.StreamID, _ hlc.Timestamp,
ctx context.Context, ID streampb.StreamID, _ hlc.Timestamp,
) (streampb.StreamReplicationStatus, error) {
return streampb.StreamReplicationStatus{}, nil
}
Expand All @@ -66,7 +65,7 @@ func (sc testStreamClient) Close(ctx context.Context) error {

// Subscribe implements the Client interface.
func (sc testStreamClient) Subscribe(
ctx context.Context, stream streaming.StreamID, spec SubscriptionToken, checkpoint hlc.Timestamp,
ctx context.Context, stream streampb.StreamID, spec SubscriptionToken, checkpoint hlc.Timestamp,
) (Subscription, error) {
sampleKV := roachpb.KeyValue{
Key: []byte("key_1"),
Expand All @@ -93,7 +92,7 @@ func (sc testStreamClient) Subscribe(

// Complete implements the streamclient.Client interface.
func (sc testStreamClient) Complete(
ctx context.Context, streamID streaming.StreamID, successfulIngestion bool,
ctx context.Context, streamID streampb.StreamID, successfulIngestion bool,
) error {
return nil
}
Expand Down
17 changes: 8 additions & 9 deletions pkg/ccl/streamingccl/streamclient/partitioned_stream_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,8 @@ import (
"net/url"

"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streampb"
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/streaming"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
Expand Down Expand Up @@ -65,13 +64,13 @@ var _ Client = &partitionedStreamClient{}
// Create implements Client interface.
func (p *partitionedStreamClient) Create(
ctx context.Context, tenantID roachpb.TenantID,
) (streaming.StreamID, error) {
) (streampb.StreamID, error) {
ctx, sp := tracing.ChildSpan(ctx, "streamclient.Client.Create")
defer sp.Finish()

p.mu.Lock()
defer p.mu.Unlock()
var streamID streaming.StreamID
var streamID streampb.StreamID
row := p.mu.srcConn.QueryRow(ctx, `SELECT crdb_internal.start_replication_stream($1)`, tenantID.ToUint64())
err := row.Scan(&streamID)
if err != nil {
Expand All @@ -92,7 +91,7 @@ func (p *partitionedStreamClient) Dial(ctx context.Context) error {

// Heartbeat implements Client interface.
func (p *partitionedStreamClient) Heartbeat(
ctx context.Context, streamID streaming.StreamID, consumed hlc.Timestamp,
ctx context.Context, streamID streampb.StreamID, consumed hlc.Timestamp,
) (streampb.StreamReplicationStatus, error) {
ctx, sp := tracing.ChildSpan(ctx, "streamclient.Client.Heartbeat")
defer sp.Finish()
Expand Down Expand Up @@ -126,7 +125,7 @@ func (p *partitionedStreamClient) postgresURL(servingAddr string) (url.URL, erro

// Plan implements Client interface.
func (p *partitionedStreamClient) Plan(
ctx context.Context, streamID streaming.StreamID,
ctx context.Context, streamID streampb.StreamID,
) (Topology, error) {
var spec streampb.ReplicationStreamSpec
{
Expand Down Expand Up @@ -184,7 +183,7 @@ func (p *partitionedStreamClient) Close(ctx context.Context) error {

// Subscribe implements Client interface.
func (p *partitionedStreamClient) Subscribe(
ctx context.Context, stream streaming.StreamID, spec SubscriptionToken, checkpoint hlc.Timestamp,
ctx context.Context, stream streampb.StreamID, spec SubscriptionToken, checkpoint hlc.Timestamp,
) (Subscription, error) {
_, sp := tracing.ChildSpan(ctx, "streamclient.Client.Subscribe")
defer sp.Finish()
Expand Down Expand Up @@ -215,7 +214,7 @@ func (p *partitionedStreamClient) Subscribe(

// Complete implements the streamclient.Client interface.
func (p *partitionedStreamClient) Complete(
ctx context.Context, streamID streaming.StreamID, successfulIngestion bool,
ctx context.Context, streamID streampb.StreamID, successfulIngestion bool,
) error {
ctx, sp := tracing.ChildSpan(ctx, "streamclient.Client.Complete")
defer sp.Finish()
Expand All @@ -239,7 +238,7 @@ type partitionedStreamSubscription struct {

streamEvent *streampb.StreamEvent
specBytes []byte
streamID streaming.StreamID
streamID streampb.StreamID
}

var _ Subscription = (*partitionedStreamSubscription)(nil)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,12 @@ import (
_ "github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvtenantccl" // Ensure we can start tenant.
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamingtest"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streampb"
_ "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamproducer" // Ensure we can start replication stream.
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/streaming"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/util/cancelchecker"
Expand Down Expand Up @@ -98,7 +97,7 @@ INSERT INTO d.t2 VALUES (2);
require.NoError(t, client.Close(ctx))
}()
require.NoError(t, err)
expectStreamState := func(streamID streaming.StreamID, status jobs.Status) {
expectStreamState := func(streamID streampb.StreamID, status jobs.Status) {
h.SysSQL.CheckQueryResultsRetry(t, fmt.Sprintf("SELECT status FROM system.jobs WHERE id = %d", streamID),
[][]string{{string(status)}})
}
Expand Down Expand Up @@ -212,7 +211,7 @@ INSERT INTO d.t2 VALUES (2);
})

// Testing client.Complete()
err = client.Complete(ctx, streaming.StreamID(999), true)
err = client.Complete(ctx, streampb.StreamID(999), true)
require.True(t, testutils.IsError(err, fmt.Sprintf("job %d: not found in system.jobs table", 999)), err)

// Makes producer job exit quickly.
Expand Down
Loading