diff --git a/pkg/ccl/logictestccl/testdata/logic_test/crdb_internal b/pkg/ccl/logictestccl/testdata/logic_test/crdb_internal index deaca15f5489..10294182923a 100644 --- a/pkg/ccl/logictestccl/testdata/logic_test/crdb_internal +++ b/pkg/ccl/logictestccl/testdata/logic_test/crdb_internal @@ -168,8 +168,6 @@ subtest replication-builtins user root -query error pq: crdb_internal\.replication_stream_spec\(\): job.*is not a replication stream job -SELECT crdb_internal.replication_stream_spec(crdb_internal.create_sql_schema_telemetry_job()) query error pq: crdb_internal\.stream_ingestion_stats_json\(\): unimplemented SELECT crdb_internal.stream_ingestion_stats_json(1); diff --git a/pkg/ccl/streamingccl/replicationtestutils/BUILD.bazel b/pkg/ccl/streamingccl/replicationtestutils/BUILD.bazel index b43e9988be7d..38757682ac83 100644 --- a/pkg/ccl/streamingccl/replicationtestutils/BUILD.bazel +++ b/pkg/ccl/streamingccl/replicationtestutils/BUILD.bazel @@ -43,6 +43,7 @@ go_library( "//pkg/util/ctxgroup", "//pkg/util/hlc", "//pkg/util/protoutil", + "//pkg/util/randutil", "//pkg/util/retry", "//pkg/util/timeutil", "@com_github_cockroachdb_apd_v3//:apd", diff --git a/pkg/ccl/streamingccl/replicationtestutils/replication_helpers.go b/pkg/ccl/streamingccl/replicationtestutils/replication_helpers.go index e1d8115cada7..cfe59d688581 100644 --- a/pkg/ccl/streamingccl/replicationtestutils/replication_helpers.go +++ b/pkg/ccl/streamingccl/replicationtestutils/replication_helpers.go @@ -11,7 +11,9 @@ package replicationtestutils import ( "bytes" "context" + "math/rand" "net/url" + "os" "strings" "testing" "time" @@ -29,6 +31,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/cockroachdb/cockroach/pkg/util/randutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" @@ -185,6 +188,8 @@ type ReplicationHelper struct { SysSQL *sqlutils.SQLRunner // PGUrl is the pgurl of this server. PGUrl url.URL + + rng *rand.Rand } // NewReplicationHelper starts test server with the required cluster settings for streming @@ -211,10 +216,14 @@ SET CLUSTER SETTING cross_cluster_replication.enabled = true; // Sink to read data from. sink, cleanupSink := sqlutils.PGUrl(t, s.AdvSQLAddr(), t.Name(), url.User(username.RootUser)) + rng, seed := randutil.NewPseudoRand() + t.Logf("Replication helper seed %d", seed) + h := &ReplicationHelper{ SysServer: s, SysSQL: sqlutils.MakeSQLRunner(db), PGUrl: sink, + rng: rng, } return h, func() { @@ -262,3 +271,35 @@ func (rh *ReplicationHelper) StartReplicationStream( require.NoError(t, err) return replicationProducerSpec } + +func (rh *ReplicationHelper) SetupSpanConfigsReplicationStream( + t *testing.T, sourceTenantName roachpb.TenantName, +) streampb.ReplicationStreamSpec { + var rawSpec []byte + row := rh.SysSQL.QueryRow(t, `SELECT crdb_internal.setup_span_configs_stream($1)`, sourceTenantName) + row.Scan(&rawSpec) + var spec streampb.ReplicationStreamSpec + err := protoutil.Unmarshal(rawSpec, &spec) + require.NoError(t, err) + return spec +} + +func (rh *ReplicationHelper) MaybeGenerateInlineURL(t *testing.T) *url.URL { + if rh.rng.Float64() > 0.5 { + return &rh.PGUrl + } + + t.Log("using inline certificates") + ret := rh.PGUrl + v := ret.Query() + for _, opt := range []string{"sslcert", "sslkey", "sslrootcert"} { + path := v.Get(opt) + content, err := os.ReadFile(path) + require.NoError(t, err) + v.Set(opt, string(content)) + + } + v.Set("sslinline", "true") + ret.RawQuery = v.Encode() + return &ret +} diff --git a/pkg/ccl/streamingccl/streamclient/client.go b/pkg/ccl/streamingccl/streamclient/client.go index 08ac8abe88bc..ae92c4ce3bb7 100644 --- a/pkg/ccl/streamingccl/streamclient/client.go +++ b/pkg/ccl/streamingccl/streamclient/client.go @@ -51,6 +51,12 @@ type Client interface { // can be used to interact with this stream in the future. Create(ctx context.Context, tenant roachpb.TenantName) (streampb.ReplicationProducerSpec, error) + // SetupSpanConfigsStream creates a stream for the span configs + // that apply to the passed in tenant, and returns the subscriptions the + // client can subscribe to. No protected timestamp or job is persisted to the + // source cluster. + SetupSpanConfigsStream(ctx context.Context, tenant roachpb.TenantName) (streampb.StreamID, Topology, error) + // Dial checks if the source is able to be connected to for queries Dial(ctx context.Context) error diff --git a/pkg/ccl/streamingccl/streamclient/client_test.go b/pkg/ccl/streamingccl/streamclient/client_test.go index a10987b25648..41e016d28c65 100644 --- a/pkg/ccl/streamingccl/streamclient/client_test.go +++ b/pkg/ccl/streamingccl/streamclient/client_test.go @@ -47,6 +47,13 @@ func (sc testStreamClient) Create( }, nil } +// SetupSpanConfigsStream implements the Client interface. +func (sc testStreamClient) SetupSpanConfigsStream( + ctx context.Context, tenant roachpb.TenantName, +) (streampb.StreamID, Topology, error) { + panic("not implemented") +} + // Plan implements the Client interface. func (sc testStreamClient) Plan(_ context.Context, _ streampb.StreamID) (Topology, error) { return Topology{ diff --git a/pkg/ccl/streamingccl/streamclient/partitioned_stream_client.go b/pkg/ccl/streamingccl/streamclient/partitioned_stream_client.go index c57eaf5b3f70..3dc3572d18ff 100644 --- a/pkg/ccl/streamingccl/streamclient/partitioned_stream_client.go +++ b/pkg/ccl/streamingccl/streamclient/partitioned_stream_client.go @@ -84,7 +84,6 @@ func (p *partitionedStreamClient) Create( ) (streampb.ReplicationProducerSpec, error) { ctx, sp := tracing.ChildSpan(ctx, "streamclient.Client.Create") defer sp.Finish() - p.mu.Lock() defer p.mu.Unlock() var rawReplicationProducerSpec []byte @@ -101,6 +100,30 @@ func (p *partitionedStreamClient) Create( return replicationProducerSpec, err } +func (p *partitionedStreamClient) SetupSpanConfigsStream( + ctx context.Context, tenantName roachpb.TenantName, +) (streampb.StreamID, Topology, error) { + ctx, sp := tracing.ChildSpan(ctx, "streamclient.Client.SetupSpanConfigsStream") + defer sp.Finish() + var spec streampb.ReplicationStreamSpec + + { + p.mu.Lock() + defer p.mu.Unlock() + + row := p.mu.srcConn.QueryRow(ctx, `SELECT crdb_internal.setup_span_configs_stream($1)`, tenantName) + var rawSpec []byte + if err := row.Scan(&rawSpec); err != nil { + return 0, Topology{}, errors.Wrapf(err, "cannot setup span config replication stream for tenant %s", tenantName) + } + if err := protoutil.Unmarshal(rawSpec, &spec); err != nil { + return 0, Topology{}, err + } + } + topology, err := p.createTopology(spec) + return spec.SpanConfigStreamID, topology, err +} + // Dial implements Client interface. func (p *partitionedStreamClient) Dial(ctx context.Context) error { p.mu.Lock() @@ -161,7 +184,12 @@ func (p *partitionedStreamClient) Plan( return Topology{}, err } } + return p.createTopology(spec) +} +func (p *partitionedStreamClient) createTopology( + spec streampb.ReplicationStreamSpec, +) (Topology, error) { topology := Topology{ SourceTenantID: spec.SourceTenantID, } diff --git a/pkg/ccl/streamingccl/streamclient/partitioned_stream_client_test.go b/pkg/ccl/streamingccl/streamclient/partitioned_stream_client_test.go index 14d448d2f5a3..f0990e189cb6 100644 --- a/pkg/ccl/streamingccl/streamclient/partitioned_stream_client_test.go +++ b/pkg/ccl/streamingccl/streamclient/partitioned_stream_client_test.go @@ -11,8 +11,6 @@ package streamclient_test import ( "context" "fmt" - "net/url" - "os" "strings" "testing" "time" @@ -35,7 +33,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" - "github.com/cockroachdb/cockroach/pkg/util/randutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" "github.com/lib/pq" @@ -62,6 +59,48 @@ func (f *subscriptionFeedSource) Error() error { // Close implements the streamingtest.FeedSource interface. func (f *subscriptionFeedSource) Close(ctx context.Context) {} +func TestPartitionedSpanConfigReplicationClient(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + h, cleanup := replicationtestutils.NewReplicationHelper(t, + base.TestServerArgs{ + DefaultTestTenant: base.TestControlsTenantsExplicitly, + Knobs: base.TestingKnobs{ + JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), + }, + }, + ) + + defer cleanup() + + testTenantName := roachpb.TenantName("test-tenant") + tenant, cleanupTenant := h.CreateTenant(t, serverutils.TestTenantID(), testTenantName) + defer cleanupTenant() + + ctx := context.Background() + + maybeInlineURL := h.MaybeGenerateInlineURL(t) + client, err := streamclient.NewPartitionedStreamClient(ctx, maybeInlineURL) + require.NoError(t, err) + defer func() { + require.NoError(t, client.Close(ctx)) + }() + + streamID, topology, err := client.SetupSpanConfigsStream(ctx, testTenantName) + require.NoError(t, err) + + require.NotEqual(t, 0, streamID) + require.Equal(t, 1, len(topology.Partitions)) + require.Equal(t, tenant.ID, topology.SourceTenantID) + + // Since a span config replication stream does not create a job, the HeartBeat + // call will deem the stream inactive. + status, err := client.Heartbeat(ctx, streamID, hlc.Timestamp{WallTime: timeutil.Now().UnixNano()}) + require.NoError(t, err) + require.Equal(t, streampb.StreamReplicationStatus_STREAM_INACTIVE, status.StreamStatus) +} + func TestPartitionedStreamReplicationClient(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -96,28 +135,7 @@ INSERT INTO d.t1 (i) VALUES (42); INSERT INTO d.t2 VALUES (2); `) - rng, _ := randutil.NewPseudoRand() - maybeGenerateInlineURL := func(orig *url.URL) *url.URL { - if rng.Float64() > 0.5 { - return orig - } - - t.Log("using inline certificates") - ret := *orig - v := ret.Query() - for _, opt := range []string{"sslcert", "sslkey", "sslrootcert"} { - path := v.Get(opt) - content, err := os.ReadFile(path) - require.NoError(t, err) - v.Set(opt, string(content)) - - } - v.Set("sslinline", "true") - ret.RawQuery = v.Encode() - return &ret - } - - maybeInlineURL := maybeGenerateInlineURL(&h.PGUrl) + maybeInlineURL := h.MaybeGenerateInlineURL(t) client, err := streamclient.NewPartitionedStreamClient(ctx, maybeInlineURL) defer func() { require.NoError(t, client.Close(ctx)) @@ -142,6 +160,11 @@ INSERT INTO d.t2 VALUES (2); _, err = client.Plan(ctx, 999) require.True(t, testutils.IsError(err, fmt.Sprintf("job with ID %d does not exist", 999)), err) + var telemetryJobID int64 + h.SysSQL.QueryRow(t, "SELECT crdb_internal.create_sql_schema_telemetry_job()").Scan(&telemetryJobID) + _, err = client.Plan(ctx, streampb.StreamID(telemetryJobID)) + require.True(t, testutils.IsError(err, fmt.Sprintf("job with id %d is not a replication stream job", telemetryJobID)), err) + expectStreamState(streamID, jobs.StatusRunning) status, err := client.Heartbeat(ctx, streamID, hlc.Timestamp{WallTime: timeutil.Now().UnixNano()}) require.NoError(t, err) diff --git a/pkg/ccl/streamingccl/streamclient/random_stream_client.go b/pkg/ccl/streamingccl/streamclient/random_stream_client.go index 388c234e0e90..18dc12c329df 100644 --- a/pkg/ccl/streamingccl/streamclient/random_stream_client.go +++ b/pkg/ccl/streamingccl/streamclient/random_stream_client.go @@ -393,6 +393,13 @@ func (m *RandomStreamClient) Create( }, nil } +// SetupSpanConfigsStream implements the Client interface. +func (m *RandomStreamClient) SetupSpanConfigsStream( + ctx context.Context, tenant roachpb.TenantName, +) (streampb.StreamID, Topology, error) { + panic("SetupSpanConfigsStream not implemented") +} + // Heartbeat implements the Client interface. func (m *RandomStreamClient) Heartbeat( ctx context.Context, _ streampb.StreamID, ts hlc.Timestamp, diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go index 2bfab41fbfef..b91a23037e34 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_test.go @@ -71,6 +71,13 @@ func (m *mockStreamClient) Create( panic("unimplemented") } +// SetupSpanConfigsStream implements the Client interface. +func (m *mockStreamClient) SetupSpanConfigsStream( + ctx context.Context, tenant roachpb.TenantName, +) (streampb.StreamID, streamclient.Topology, error) { + panic("unimplemented") +} + // Dial implements the Client interface. func (m *mockStreamClient) Dial(_ context.Context) error { panic("unimplemented") diff --git a/pkg/ccl/streamingccl/streamproducer/BUILD.bazel b/pkg/ccl/streamingccl/streamproducer/BUILD.bazel index 8394f2a1343e..00b9d07a1218 100644 --- a/pkg/ccl/streamingccl/streamproducer/BUILD.bazel +++ b/pkg/ccl/streamingccl/streamproducer/BUILD.bazel @@ -32,10 +32,14 @@ go_library( "//pkg/security/username", "//pkg/settings/cluster", "//pkg/sql", + "//pkg/sql/catalog/descpb", + "//pkg/sql/catalog/descs", + "//pkg/sql/catalog/systemschema", "//pkg/sql/isql", "//pkg/sql/pgwire/pgcode", "//pkg/sql/pgwire/pgerror", "//pkg/sql/privilege", + "//pkg/sql/sem/builtins", "//pkg/sql/sem/eval", "//pkg/sql/sem/tree", "//pkg/sql/syntheticprivilege", @@ -96,6 +100,7 @@ go_test( "//pkg/sql", "//pkg/sql/catalog/descs", "//pkg/sql/catalog/desctestutils", + "//pkg/sql/catalog/systemschema", "//pkg/sql/distsql", "//pkg/sql/isql", "//pkg/sql/sem/eval", diff --git a/pkg/ccl/streamingccl/streamproducer/producer_job.go b/pkg/ccl/streamingccl/streamproducer/producer_job.go index 6b17d69b804a..05cebd70fec6 100644 --- a/pkg/ccl/streamingccl/streamproducer/producer_job.go +++ b/pkg/ccl/streamingccl/streamproducer/producer_job.go @@ -29,9 +29,9 @@ import ( "github.com/cockroachdb/errors" ) -func makeTenantSpan(tenantID uint64) *roachpb.Span { +func makeTenantSpan(tenantID uint64) roachpb.Span { prefix := keys.MakeTenantPrefix(roachpb.MustMakeTenantID(tenantID)) - return &roachpb.Span{Key: prefix, EndKey: prefix.PrefixEnd()} + return roachpb.Span{Key: prefix, EndKey: prefix.PrefixEnd()} } func makeProducerJobRecord( @@ -47,7 +47,7 @@ func makeProducerJobRecord( Username: user, Details: jobspb.StreamReplicationDetails{ ProtectedTimestampRecordID: ptsID, - Spans: []*roachpb.Span{makeTenantSpan(tenantID)}, + Spans: []roachpb.Span{makeTenantSpan(tenantID)}, TenantID: roachpb.MustMakeTenantID(tenantID), }, Progress: jobspb.StreamReplicationProgress{ diff --git a/pkg/ccl/streamingccl/streamproducer/producer_job_test.go b/pkg/ccl/streamingccl/streamproducer/producer_job_test.go index b3d6b651e0da..6dd97580c316 100644 --- a/pkg/ccl/streamingccl/streamproducer/producer_job_test.go +++ b/pkg/ccl/streamingccl/streamproducer/producer_job_test.go @@ -160,7 +160,7 @@ func TestStreamReplicationProducerJob(t *testing.T) { return insqlDB.Txn(ctx, func( ctx context.Context, txn isql.Txn, ) error { - deprecatedTenantSpan := roachpb.Spans{*makeTenantSpan(30)} + deprecatedTenantSpan := roachpb.Spans{makeTenantSpan(30)} tenantTarget := ptpb.MakeTenantsTarget([]roachpb.TenantID{roachpb.MustMakeTenantID(30)}) record := jobsprotectedts.MakeRecord( ptsID, int64(jr.JobID), ts, deprecatedTenantSpan, diff --git a/pkg/ccl/streamingccl/streamproducer/replication_manager.go b/pkg/ccl/streamingccl/streamproducer/replication_manager.go index ed263951ba21..485ebaf32a59 100644 --- a/pkg/ccl/streamingccl/streamproducer/replication_manager.go +++ b/pkg/ccl/streamingccl/streamproducer/replication_manager.go @@ -55,7 +55,7 @@ func (r *replicationStreamManagerImpl) StreamPartition( func (r *replicationStreamManagerImpl) GetReplicationStreamSpec( ctx context.Context, streamID streampb.StreamID, ) (*streampb.ReplicationStreamSpec, error) { - return getReplicationStreamSpec(ctx, r.evalCtx, streamID) + return getReplicationStreamSpec(ctx, r.evalCtx, r.txn, streamID) } // CompleteReplicationStream implements ReplicationStreamManager interface. @@ -65,6 +65,12 @@ func (r *replicationStreamManagerImpl) CompleteReplicationStream( return completeReplicationStream(ctx, r.evalCtx, r.txn, streamID, successfulIngestion) } +func (r *replicationStreamManagerImpl) SetupSpanConfigsStream( + ctx context.Context, tenantName roachpb.TenantName, +) (*streampb.ReplicationStreamSpec, error) { + return setupSpanConfigsStream(ctx, r.evalCtx, r.txn, tenantName) +} + func newReplicationStreamManagerWithPrivilegesCheck( ctx context.Context, evalCtx *eval.Context, txn isql.Txn, ) (eval.ReplicationStreamManager, error) { diff --git a/pkg/ccl/streamingccl/streamproducer/replication_stream_test.go b/pkg/ccl/streamingccl/streamproducer/replication_stream_test.go index 1b2907aa41b6..a4af091fa5df 100644 --- a/pkg/ccl/streamingccl/streamproducer/replication_stream_test.go +++ b/pkg/ccl/streamingccl/streamproducer/replication_stream_test.go @@ -32,6 +32,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/repstream/streampb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema" "github.com/cockroachdb/cockroach/pkg/sql/distsql" "github.com/cockroachdb/cockroach/pkg/sql/isql" "github.com/cockroachdb/cockroach/pkg/testutils/jobutils" @@ -229,6 +230,38 @@ func testStreamReplicationStatus( checkStreamStatus(t, hlc.MaxTimestamp, expectedStreamStatus) } +func TestSpanConfigReplicationStreamSetup(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + serverArgs := base.TestServerArgs{ + DefaultTestTenant: base.TestControlsTenantsExplicitly, + Knobs: base.TestingKnobs{ + JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), + }, + } + + h, cleanup := replicationtestutils.NewReplicationHelper(t, serverArgs) + defer cleanup() + testTenantName := roachpb.TenantName("test-tenant") + h.SysSQL.Exec(t, "CREATE TENANT $1", testTenantName) + specs := h.SetupSpanConfigsReplicationStream(t, testTenantName) + + var spanConfigTableID uint32 + h.SysSQL.QueryRow(t, `SELECT id FROM system.namespace WHERE name = $1`, + systemschema.SpanConfigurationsTableName.Table()).Scan(&spanConfigTableID) + codec := keys.MakeSQLCodec(roachpb.SystemTenantID) + spanConfigKey := codec.TablePrefix(spanConfigTableID) + expectedSpan := roachpb.Span{Key: spanConfigKey, EndKey: spanConfigKey.PrefixEnd()} + + require.Equal(t, 1, len(specs.Partitions)) + require.Equal(t, 1, len(specs.Partitions[0].PartitionSpec.Spans)) + + require.NotEqual(t, 0, specs.SpanConfigStreamID) + + require.Equal(t, expectedSpan, specs.Partitions[0].PartitionSpec.Spans[0]) + +} func TestReplicationStreamInitialization(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) diff --git a/pkg/ccl/streamingccl/streamproducer/stream_lifetime.go b/pkg/ccl/streamingccl/streamproducer/stream_lifetime.go index 37e1ae206eae..7ef4b035b8d5 100644 --- a/pkg/ccl/streamingccl/streamproducer/stream_lifetime.go +++ b/pkg/ccl/streamingccl/streamproducer/stream_lifetime.go @@ -22,7 +22,11 @@ import ( "github.com/cockroachdb/cockroach/pkg/repstream/streampb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema" "github.com/cockroachdb/cockroach/pkg/sql/isql" + "github.com/cockroachdb/cockroach/pkg/sql/sem/builtins" "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -64,7 +68,7 @@ func startReplicationProducerJob( statementTime := hlc.Timestamp{ WallTime: evalCtx.GetStmtTimestamp().UnixNano(), } - deprecatedSpansToProtect := roachpb.Spans{*makeTenantSpan(tenantID)} + deprecatedSpansToProtect := roachpb.Spans{makeTenantSpan(tenantID)} targetToProtect := ptpb.MakeTenantsTarget([]roachpb.TenantID{roachpb.MustMakeTenantID(tenantID)}) pts := jobsprotectedts.MakeRecord(ptsID, int64(jr.JobID), statementTime, deprecatedSpansToProtect, jobsprotectedts.Jobs, targetToProtect) @@ -184,7 +188,7 @@ func heartbeatReplicationStream( // job progress. if frontier == hlc.MaxTimestamp { var status streampb.StreamReplicationStatus - pj, err := execConfig.JobRegistry.LoadJob(ctx, jobspb.JobID(streamID)) + pj, err := execConfig.JobRegistry.LoadJobWithTxn(ctx, jobspb.JobID(streamID), txn) if jobs.HasJobNotFoundError(err) || testutils.IsError(err, "not found in system.jobs table") { status.StreamStatus = streampb.StreamReplicationStatus_STREAM_INACTIVE return status, nil @@ -215,41 +219,55 @@ func heartbeatReplicationStream( // getReplicationStreamSpec gets a replication stream specification for the specified stream. func getReplicationStreamSpec( - ctx context.Context, evalCtx *eval.Context, streamID streampb.StreamID, + ctx context.Context, evalCtx *eval.Context, txn isql.Txn, streamID streampb.StreamID, ) (*streampb.ReplicationStreamSpec, error) { jobExecCtx := evalCtx.JobExecContext.(sql.JobExecContext) // Returns error if the replication stream is not active - j, err := jobExecCtx.ExecCfg().JobRegistry.LoadJob(ctx, jobspb.JobID(streamID)) + j, err := jobExecCtx.ExecCfg().JobRegistry.LoadJobWithTxn(ctx, jobspb.JobID(streamID), txn) if err != nil { - return nil, errors.Wrapf(err, "replication stream %d has error", streamID) + return nil, errors.Wrapf(err, "could not load job for replication stream %d", streamID) } if j.Status() != jobs.StatusRunning { return nil, errors.Errorf("replication stream %d is not running", streamID) } + details, ok := j.Details().(jobspb.StreamReplicationDetails) + if !ok { + return nil, errors.Errorf("job with id %d is not a replication stream job", streamID) + } + return buildReplicationStreamSpec(ctx, evalCtx, details.TenantID, false, details.Spans) + +} + +func buildReplicationStreamSpec( + ctx context.Context, + evalCtx *eval.Context, + tenantID roachpb.TenantID, + forSpanConfigs bool, + targetSpans roachpb.Spans, +) (*streampb.ReplicationStreamSpec, error) { + jobExecCtx := evalCtx.JobExecContext.(sql.JobExecContext) // Partition the spans with SQLPlanner dsp := jobExecCtx.DistSQLPlanner() planCtx := dsp.NewPlanningCtx(ctx, jobExecCtx.ExtendedEvalContext(), nil /* planner */, nil /* txn */, sql.DistributionTypeSystemTenantOnly) - details, ok := j.Details().(jobspb.StreamReplicationDetails) - if !ok { - return nil, errors.Errorf("job with id %d is not a replication stream job", streamID) - } - replicatedSpans := details.Spans - spans := make([]roachpb.Span, 0, len(replicatedSpans)) - for _, span := range replicatedSpans { - spans = append(spans, *span) - } - spanPartitions, err := dsp.PartitionSpans(ctx, planCtx, spans) + spanPartitions, err := dsp.PartitionSpans(ctx, planCtx, targetSpans) if err != nil { return nil, err } + var spanConfigsStreamID streampb.StreamID + if forSpanConfigs { + spanConfigsStreamID = streampb.StreamID(builtins.GenerateUniqueInt(builtins.ProcessUniqueID(evalCtx.NodeID.SQLInstanceID()))) + } + res := &streampb.ReplicationStreamSpec{ - Partitions: make([]streampb.ReplicationStreamSpec_Partition, 0, len(spanPartitions)), - SourceTenantID: details.TenantID, + Partitions: make([]streampb.ReplicationStreamSpec_Partition, 0, len(spanPartitions)), + SourceTenantID: tenantID, + SpanConfigStreamID: spanConfigsStreamID, } + for _, sp := range spanPartitions { nodeInfo, err := dsp.GetSQLInstanceInfo(sp.SQLInstanceID) if err != nil { @@ -307,3 +325,35 @@ func completeReplicationStream( return nil }) } + +func setupSpanConfigsStream( + ctx context.Context, evalCtx *eval.Context, txn isql.Txn, tenantName roachpb.TenantName, +) (*streampb.ReplicationStreamSpec, error) { + + tenantRecord, err := sql.GetTenantRecordByName(ctx, evalCtx.Settings, txn, tenantName) + if err != nil { + return nil, err + } + tenantID := roachpb.MustMakeTenantID(tenantRecord.ID) + var spanConfigID descpb.ID + execConfig := evalCtx.Planner.ExecutorConfig().(*sql.ExecutorConfig) + + if err := sql.DescsTxn(ctx, execConfig, func(ctx context.Context, txn isql.Txn, col *descs.Collection) error { + g := col.ByName(txn.KV()).Get() + _, imm, err := descs.PrefixAndTable(ctx, g, systemschema.SpanConfigurationsTableName) + if err != nil { + return err + } + spanConfigID = imm.GetID() + return nil + }); err != nil { + return nil, err + } + spanConfigKey := evalCtx.Codec.TablePrefix(uint32(spanConfigID)) + + // TODO(msbutler): crop this span to the keyspan within the span config + // table relevant to this specific tenant once I teach the client.Subscribe() + // to stream span configs, which will make testing easier. + span := roachpb.Span{Key: spanConfigKey, EndKey: spanConfigKey.PrefixEnd()} + return buildReplicationStreamSpec(ctx, evalCtx, tenantID, true, roachpb.Spans{span}) +} diff --git a/pkg/jobs/jobspb/jobs.proto b/pkg/jobs/jobspb/jobs.proto index 527ac77cbb0b..483e3a9fa25f 100644 --- a/pkg/jobs/jobspb/jobs.proto +++ b/pkg/jobs/jobspb/jobs.proto @@ -184,7 +184,7 @@ message StreamIngestionProgress { message StreamReplicationDetails { // Key spans we are replicating - repeated roachpb.Span spans = 1; + repeated roachpb.Span spans = 1 [(gogoproto.nullable) = false]; // ID of the protected timestamp record that protects the above spans bytes protected_timestamp_record_id = 2 [ diff --git a/pkg/repstream/streampb/stream.proto b/pkg/repstream/streampb/stream.proto index 335bd5f56aea..5ded42130907 100644 --- a/pkg/repstream/streampb/stream.proto +++ b/pkg/repstream/streampb/stream.proto @@ -92,6 +92,10 @@ message ReplicationStreamSpec { repeated Partition partitions = 1 [(gogoproto.nullable) = false]; roachpb.TenantID source_tenant_id = 2 [(gogoproto.nullable) = false, (gogoproto.customname) = "SourceTenantID"]; + + int64 span_config_stream_id = 3 [(gogoproto.customname) = "SpanConfigStreamID", (gogoproto + .casttype) = "StreamID"]; + } // StreamEvent describes a replication stream event diff --git a/pkg/sql/schema_changer_test.go b/pkg/sql/schema_changer_test.go index e458af695f38..16874d3b1484 100644 --- a/pkg/sql/schema_changer_test.go +++ b/pkg/sql/schema_changer_test.go @@ -6991,6 +6991,8 @@ func TestCheckConstraintDropAndColumn(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) ctx := context.Background() + + // jobControlMu guards changes to the shared delayJobChannels array. var jobControlMu syncutil.Mutex var delayJobList []string var delayJobChannels []chan struct{} @@ -7002,10 +7004,14 @@ func TestCheckConstraintDropAndColumn(t *testing.T) { params.Knobs = base.TestingKnobs{ SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{ RunBeforeResume: func(jobID jobspb.JobID) error { + // We cannot use defer jobControlMu.Unlock within this routine + // as we need to unlock the jobControlMu conditionally prior to waiting on + // `channel` below. lockHeld := true jobControlMu.Lock() scJob, err := s.JobRegistry().(*jobs.Registry).LoadJob(ctx, jobID) if err != nil { + jobControlMu.Unlock() return err } pl := scJob.Payload() diff --git a/pkg/sql/sem/builtins/fixed_oids.go b/pkg/sql/sem/builtins/fixed_oids.go index fd3dd7e2b6db..c7f2580f7ac0 100644 --- a/pkg/sql/sem/builtins/fixed_oids.go +++ b/pkg/sql/sem/builtins/fixed_oids.go @@ -2436,6 +2436,7 @@ var builtinOidsArray = []string{ 2463: `workload_index_recs(timestamptz: timestamptz) -> string`, 2464: `workload_index_recs(budget: string) -> string`, 2465: `workload_index_recs(timestamptz: timestamptz, budget: string) -> string`, + 2466: `crdb_internal.setup_span_configs_stream(tenant_name: string) -> bytes`, } var builtinOidsBySignature map[string]oid.Oid diff --git a/pkg/sql/sem/builtins/replication_builtins.go b/pkg/sql/sem/builtins/replication_builtins.go index c83d2a5afe82..f2859913a6e3 100644 --- a/pkg/sql/sem/builtins/replication_builtins.go +++ b/pkg/sql/sem/builtins/replication_builtins.go @@ -294,4 +294,36 @@ var replicationBuiltins = map[string]builtinDefinition{ Volatility: volatility.Volatile, }, ), + "crdb_internal.setup_span_configs_stream": makeBuiltin( + tree.FunctionProperties{ + Category: builtinconstants.CategoryStreamIngestion, + Undocumented: true, + DistsqlBlocklist: true, + }, + tree.Overload{ + Types: tree.ParamTypes{ + {Name: "tenant_name", Typ: types.String}, + }, + ReturnType: tree.FixedReturnType(types.Bytes), + Fn: func(ctx context.Context, evalCtx *eval.Context, args tree.Datums) (tree.Datum, error) { + mgr, err := evalCtx.StreamManagerFactory.GetReplicationStreamManager(ctx) + if err != nil { + return nil, err + } + tenantName := string(tree.MustBeDString(args[0])) + spec, err := mgr.SetupSpanConfigsStream(ctx, roachpb.TenantName(tenantName)) + if err != nil { + return nil, err + } + rawSpec, err := protoutil.Marshal(spec) + if err != nil { + return nil, err + } + return tree.NewDBytes(tree.DBytes(rawSpec)), err + }, + Info: "This function can be used on the consumer side to setup a replication stream for " + + "the span configs of the tenant. The client can then run 'stream_partition' on a partition with the returned spec", + Volatility: volatility.Volatile, + }, + ), } diff --git a/pkg/sql/sem/eval/context.go b/pkg/sql/sem/eval/context.go index 6b03eb222cd2..5468469cb2f2 100644 --- a/pkg/sql/sem/eval/context.go +++ b/pkg/sql/sem/eval/context.go @@ -807,6 +807,9 @@ type ReplicationStreamManager interface { // tenant on the producer side. StartReplicationStream(ctx context.Context, tenantName roachpb.TenantName) (streampb.ReplicationProducerSpec, error) + // SetupSpanConfigsStream creates and plans a replication stream to stream the span config updates for a specific tenant. + SetupSpanConfigsStream(ctx context.Context, tenantName roachpb.TenantName) (*streampb.ReplicationStreamSpec, error) + // HeartbeatReplicationStream sends a heartbeat to the replication stream producer, indicating // consumer has consumed until the given 'frontier' timestamp. This updates the producer job // progress and extends its life, and the new producer progress will be returned.