Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
107671: c2c: add SetupSpanConfigsStream(tenantName) to stream client r=stevendanna a=msbutler

This patch adds a new call to the stream client, which uses the new
crdb_internal.setup_span_configs_stream() builtin, to create a replication
stream specification to stream span config updates relevant to the replicating
tenant. This new client call essentially combines the client.Create() and
client.Plan() into one call, as it returns a streamID and topology to the user.
Note this new call differs from client.Create() as it neither creates a job nor
lays a protected timestamp on the source cluster. In other words, the user
should treat this span configuration stream as ephemeral.

A future PR will teach client.Subscribe to stream these updates and ingest them
into the destination.

Informs #106823

Release note: None

107915: sql: Fix mutex leak within TestCheckConstraintDropAndColumn r=rimadeodhar a=rimadeodhar

The test does not unlock the jobControlMu mutex in the case of an error. This PR fixes that.

Epic: none
Fixes: #107433
Release note: none

Co-authored-by: Michael Butler <[email protected]>
Co-authored-by: rimadeodhar <[email protected]>
  • Loading branch information
3 people committed Aug 2, 2023
3 parents 00abaf4 + ffea5af + e578000 commit 32f846f
Show file tree
Hide file tree
Showing 21 changed files with 309 additions and 51 deletions.
2 changes: 0 additions & 2 deletions pkg/ccl/logictestccl/testdata/logic_test/crdb_internal
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,6 @@ subtest replication-builtins

user root

query error pq: crdb_internal\.replication_stream_spec\(\): job.*is not a replication stream job
SELECT crdb_internal.replication_stream_spec(crdb_internal.create_sql_schema_telemetry_job())

query error pq: crdb_internal\.stream_ingestion_stats_json\(\): unimplemented
SELECT crdb_internal.stream_ingestion_stats_json(1);
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/streamingccl/replicationtestutils/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ go_library(
"//pkg/util/ctxgroup",
"//pkg/util/hlc",
"//pkg/util/protoutil",
"//pkg/util/randutil",
"//pkg/util/retry",
"//pkg/util/timeutil",
"@com_github_cockroachdb_apd_v3//:apd",
Expand Down
41 changes: 41 additions & 0 deletions pkg/ccl/streamingccl/replicationtestutils/replication_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ package replicationtestutils
import (
"bytes"
"context"
"math/rand"
"net/url"
"os"
"strings"
"testing"
"time"
Expand All @@ -29,6 +31,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -185,6 +188,8 @@ type ReplicationHelper struct {
SysSQL *sqlutils.SQLRunner
// PGUrl is the pgurl of this server.
PGUrl url.URL

rng *rand.Rand
}

// NewReplicationHelper starts test server with the required cluster settings for streming
Expand All @@ -211,10 +216,14 @@ SET CLUSTER SETTING cross_cluster_replication.enabled = true;
// Sink to read data from.
sink, cleanupSink := sqlutils.PGUrl(t, s.AdvSQLAddr(), t.Name(), url.User(username.RootUser))

rng, seed := randutil.NewPseudoRand()
t.Logf("Replication helper seed %d", seed)

h := &ReplicationHelper{
SysServer: s,
SysSQL: sqlutils.MakeSQLRunner(db),
PGUrl: sink,
rng: rng,
}

return h, func() {
Expand Down Expand Up @@ -262,3 +271,35 @@ func (rh *ReplicationHelper) StartReplicationStream(
require.NoError(t, err)
return replicationProducerSpec
}

func (rh *ReplicationHelper) SetupSpanConfigsReplicationStream(
t *testing.T, sourceTenantName roachpb.TenantName,
) streampb.ReplicationStreamSpec {
var rawSpec []byte
row := rh.SysSQL.QueryRow(t, `SELECT crdb_internal.setup_span_configs_stream($1)`, sourceTenantName)
row.Scan(&rawSpec)
var spec streampb.ReplicationStreamSpec
err := protoutil.Unmarshal(rawSpec, &spec)
require.NoError(t, err)
return spec
}

func (rh *ReplicationHelper) MaybeGenerateInlineURL(t *testing.T) *url.URL {
if rh.rng.Float64() > 0.5 {
return &rh.PGUrl
}

t.Log("using inline certificates")
ret := rh.PGUrl
v := ret.Query()
for _, opt := range []string{"sslcert", "sslkey", "sslrootcert"} {
path := v.Get(opt)
content, err := os.ReadFile(path)
require.NoError(t, err)
v.Set(opt, string(content))

}
v.Set("sslinline", "true")
ret.RawQuery = v.Encode()
return &ret
}
6 changes: 6 additions & 0 deletions pkg/ccl/streamingccl/streamclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ type Client interface {
// can be used to interact with this stream in the future.
Create(ctx context.Context, tenant roachpb.TenantName) (streampb.ReplicationProducerSpec, error)

// SetupSpanConfigsStream creates a stream for the span configs
// that apply to the passed in tenant, and returns the subscriptions the
// client can subscribe to. No protected timestamp or job is persisted to the
// source cluster.
SetupSpanConfigsStream(ctx context.Context, tenant roachpb.TenantName) (streampb.StreamID, Topology, error)

// Dial checks if the source is able to be connected to for queries
Dial(ctx context.Context) error

Expand Down
7 changes: 7 additions & 0 deletions pkg/ccl/streamingccl/streamclient/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,13 @@ func (sc testStreamClient) Create(
}, nil
}

// SetupSpanConfigsStream implements the Client interface.
func (sc testStreamClient) SetupSpanConfigsStream(
ctx context.Context, tenant roachpb.TenantName,
) (streampb.StreamID, Topology, error) {
panic("not implemented")
}

// Plan implements the Client interface.
func (sc testStreamClient) Plan(_ context.Context, _ streampb.StreamID) (Topology, error) {
return Topology{
Expand Down
30 changes: 29 additions & 1 deletion pkg/ccl/streamingccl/streamclient/partitioned_stream_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ func (p *partitionedStreamClient) Create(
) (streampb.ReplicationProducerSpec, error) {
ctx, sp := tracing.ChildSpan(ctx, "streamclient.Client.Create")
defer sp.Finish()

p.mu.Lock()
defer p.mu.Unlock()
var rawReplicationProducerSpec []byte
Expand All @@ -101,6 +100,30 @@ func (p *partitionedStreamClient) Create(
return replicationProducerSpec, err
}

func (p *partitionedStreamClient) SetupSpanConfigsStream(
ctx context.Context, tenantName roachpb.TenantName,
) (streampb.StreamID, Topology, error) {
ctx, sp := tracing.ChildSpan(ctx, "streamclient.Client.SetupSpanConfigsStream")
defer sp.Finish()
var spec streampb.ReplicationStreamSpec

{
p.mu.Lock()
defer p.mu.Unlock()

row := p.mu.srcConn.QueryRow(ctx, `SELECT crdb_internal.setup_span_configs_stream($1)`, tenantName)
var rawSpec []byte
if err := row.Scan(&rawSpec); err != nil {
return 0, Topology{}, errors.Wrapf(err, "cannot setup span config replication stream for tenant %s", tenantName)
}
if err := protoutil.Unmarshal(rawSpec, &spec); err != nil {
return 0, Topology{}, err
}
}
topology, err := p.createTopology(spec)
return spec.SpanConfigStreamID, topology, err
}

// Dial implements Client interface.
func (p *partitionedStreamClient) Dial(ctx context.Context) error {
p.mu.Lock()
Expand Down Expand Up @@ -161,7 +184,12 @@ func (p *partitionedStreamClient) Plan(
return Topology{}, err
}
}
return p.createTopology(spec)
}

func (p *partitionedStreamClient) createTopology(
spec streampb.ReplicationStreamSpec,
) (Topology, error) {
topology := Topology{
SourceTenantID: spec.SourceTenantID,
}
Expand Down
73 changes: 48 additions & 25 deletions pkg/ccl/streamingccl/streamclient/partitioned_stream_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ package streamclient_test
import (
"context"
"fmt"
"net/url"
"os"
"strings"
"testing"
"time"
Expand All @@ -35,7 +33,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/lib/pq"
Expand All @@ -62,6 +59,48 @@ func (f *subscriptionFeedSource) Error() error {
// Close implements the streamingtest.FeedSource interface.
func (f *subscriptionFeedSource) Close(ctx context.Context) {}

func TestPartitionedSpanConfigReplicationClient(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

h, cleanup := replicationtestutils.NewReplicationHelper(t,
base.TestServerArgs{
DefaultTestTenant: base.TestControlsTenantsExplicitly,
Knobs: base.TestingKnobs{
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
},
},
)

defer cleanup()

testTenantName := roachpb.TenantName("test-tenant")
tenant, cleanupTenant := h.CreateTenant(t, serverutils.TestTenantID(), testTenantName)
defer cleanupTenant()

ctx := context.Background()

maybeInlineURL := h.MaybeGenerateInlineURL(t)
client, err := streamclient.NewPartitionedStreamClient(ctx, maybeInlineURL)
require.NoError(t, err)
defer func() {
require.NoError(t, client.Close(ctx))
}()

streamID, topology, err := client.SetupSpanConfigsStream(ctx, testTenantName)
require.NoError(t, err)

require.NotEqual(t, 0, streamID)
require.Equal(t, 1, len(topology.Partitions))
require.Equal(t, tenant.ID, topology.SourceTenantID)

// Since a span config replication stream does not create a job, the HeartBeat
// call will deem the stream inactive.
status, err := client.Heartbeat(ctx, streamID, hlc.Timestamp{WallTime: timeutil.Now().UnixNano()})
require.NoError(t, err)
require.Equal(t, streampb.StreamReplicationStatus_STREAM_INACTIVE, status.StreamStatus)
}

func TestPartitionedStreamReplicationClient(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down Expand Up @@ -96,28 +135,7 @@ INSERT INTO d.t1 (i) VALUES (42);
INSERT INTO d.t2 VALUES (2);
`)

rng, _ := randutil.NewPseudoRand()
maybeGenerateInlineURL := func(orig *url.URL) *url.URL {
if rng.Float64() > 0.5 {
return orig
}

t.Log("using inline certificates")
ret := *orig
v := ret.Query()
for _, opt := range []string{"sslcert", "sslkey", "sslrootcert"} {
path := v.Get(opt)
content, err := os.ReadFile(path)
require.NoError(t, err)
v.Set(opt, string(content))

}
v.Set("sslinline", "true")
ret.RawQuery = v.Encode()
return &ret
}

maybeInlineURL := maybeGenerateInlineURL(&h.PGUrl)
maybeInlineURL := h.MaybeGenerateInlineURL(t)
client, err := streamclient.NewPartitionedStreamClient(ctx, maybeInlineURL)
defer func() {
require.NoError(t, client.Close(ctx))
Expand All @@ -142,6 +160,11 @@ INSERT INTO d.t2 VALUES (2);
_, err = client.Plan(ctx, 999)
require.True(t, testutils.IsError(err, fmt.Sprintf("job with ID %d does not exist", 999)), err)

var telemetryJobID int64
h.SysSQL.QueryRow(t, "SELECT crdb_internal.create_sql_schema_telemetry_job()").Scan(&telemetryJobID)
_, err = client.Plan(ctx, streampb.StreamID(telemetryJobID))
require.True(t, testutils.IsError(err, fmt.Sprintf("job with id %d is not a replication stream job", telemetryJobID)), err)

expectStreamState(streamID, jobs.StatusRunning)
status, err := client.Heartbeat(ctx, streamID, hlc.Timestamp{WallTime: timeutil.Now().UnixNano()})
require.NoError(t, err)
Expand Down
7 changes: 7 additions & 0 deletions pkg/ccl/streamingccl/streamclient/random_stream_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,13 @@ func (m *RandomStreamClient) Create(
}, nil
}

// SetupSpanConfigsStream implements the Client interface.
func (m *RandomStreamClient) SetupSpanConfigsStream(
ctx context.Context, tenant roachpb.TenantName,
) (streampb.StreamID, Topology, error) {
panic("SetupSpanConfigsStream not implemented")
}

// Heartbeat implements the Client interface.
func (m *RandomStreamClient) Heartbeat(
ctx context.Context, _ streampb.StreamID, ts hlc.Timestamp,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,13 @@ func (m *mockStreamClient) Create(
panic("unimplemented")
}

// SetupSpanConfigsStream implements the Client interface.
func (m *mockStreamClient) SetupSpanConfigsStream(
ctx context.Context, tenant roachpb.TenantName,
) (streampb.StreamID, streamclient.Topology, error) {
panic("unimplemented")
}

// Dial implements the Client interface.
func (m *mockStreamClient) Dial(_ context.Context) error {
panic("unimplemented")
Expand Down
5 changes: 5 additions & 0 deletions pkg/ccl/streamingccl/streamproducer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,14 @@ go_library(
"//pkg/security/username",
"//pkg/settings/cluster",
"//pkg/sql",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/descs",
"//pkg/sql/catalog/systemschema",
"//pkg/sql/isql",
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/pgwire/pgerror",
"//pkg/sql/privilege",
"//pkg/sql/sem/builtins",
"//pkg/sql/sem/eval",
"//pkg/sql/sem/tree",
"//pkg/sql/syntheticprivilege",
Expand Down Expand Up @@ -96,6 +100,7 @@ go_test(
"//pkg/sql",
"//pkg/sql/catalog/descs",
"//pkg/sql/catalog/desctestutils",
"//pkg/sql/catalog/systemschema",
"//pkg/sql/distsql",
"//pkg/sql/isql",
"//pkg/sql/sem/eval",
Expand Down
6 changes: 3 additions & 3 deletions pkg/ccl/streamingccl/streamproducer/producer_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ import (
"github.com/cockroachdb/errors"
)

func makeTenantSpan(tenantID uint64) *roachpb.Span {
func makeTenantSpan(tenantID uint64) roachpb.Span {
prefix := keys.MakeTenantPrefix(roachpb.MustMakeTenantID(tenantID))
return &roachpb.Span{Key: prefix, EndKey: prefix.PrefixEnd()}
return roachpb.Span{Key: prefix, EndKey: prefix.PrefixEnd()}
}

func makeProducerJobRecord(
Expand All @@ -47,7 +47,7 @@ func makeProducerJobRecord(
Username: user,
Details: jobspb.StreamReplicationDetails{
ProtectedTimestampRecordID: ptsID,
Spans: []*roachpb.Span{makeTenantSpan(tenantID)},
Spans: []roachpb.Span{makeTenantSpan(tenantID)},
TenantID: roachpb.MustMakeTenantID(tenantID),
},
Progress: jobspb.StreamReplicationProgress{
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/streamingccl/streamproducer/producer_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func TestStreamReplicationProducerJob(t *testing.T) {
return insqlDB.Txn(ctx, func(
ctx context.Context, txn isql.Txn,
) error {
deprecatedTenantSpan := roachpb.Spans{*makeTenantSpan(30)}
deprecatedTenantSpan := roachpb.Spans{makeTenantSpan(30)}
tenantTarget := ptpb.MakeTenantsTarget([]roachpb.TenantID{roachpb.MustMakeTenantID(30)})
record := jobsprotectedts.MakeRecord(
ptsID, int64(jr.JobID), ts, deprecatedTenantSpan,
Expand Down
8 changes: 7 additions & 1 deletion pkg/ccl/streamingccl/streamproducer/replication_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (r *replicationStreamManagerImpl) StreamPartition(
func (r *replicationStreamManagerImpl) GetReplicationStreamSpec(
ctx context.Context, streamID streampb.StreamID,
) (*streampb.ReplicationStreamSpec, error) {
return getReplicationStreamSpec(ctx, r.evalCtx, streamID)
return getReplicationStreamSpec(ctx, r.evalCtx, r.txn, streamID)
}

// CompleteReplicationStream implements ReplicationStreamManager interface.
Expand All @@ -65,6 +65,12 @@ func (r *replicationStreamManagerImpl) CompleteReplicationStream(
return completeReplicationStream(ctx, r.evalCtx, r.txn, streamID, successfulIngestion)
}

func (r *replicationStreamManagerImpl) SetupSpanConfigsStream(
ctx context.Context, tenantName roachpb.TenantName,
) (*streampb.ReplicationStreamSpec, error) {
return setupSpanConfigsStream(ctx, r.evalCtx, r.txn, tenantName)
}

func newReplicationStreamManagerWithPrivilegesCheck(
ctx context.Context, evalCtx *eval.Context, txn isql.Txn,
) (eval.ReplicationStreamManager, error) {
Expand Down
Loading

0 comments on commit 32f846f

Please sign in to comment.