diff --git a/docs/generated/sql/functions.md b/docs/generated/sql/functions.md index b64bcf534a31..be6e29cc0a42 100644 --- a/docs/generated/sql/functions.md +++ b/docs/generated/sql/functions.md @@ -2652,6 +2652,8 @@ The swap_ordinate_string parameter is a 2-character string naming the ordinates
crdb_internal.start_replication_stream(tenant_id: int) → int
This function can be used on the producer side to start a replication stream for the specified tenant. The returned stream ID uniquely identifies created stream. The caller must periodically invoke crdb_internal.heartbeat_stream() function to notify that the replication is still ongoing.
crdb_internal.start_replication_stream(tenant_name: string) → int
This function can be used on the producer side to start a replication stream for the specified tenant. The returned stream ID uniquely identifies created stream. The caller must periodically invoke crdb_internal.heartbeat_stream() function to notify that the replication is still ongoing.
+crdb_internal.stream_ingestion_stats_json(job_id: int) → jsonb
This function can be used on the ingestion side to get a statistics summary of a stream ingestion job in json format.
crdb_internal.stream_ingestion_stats_pb(job_id: int) → bytes
This function can be used on the ingestion side to get a statistics summary of a stream ingestion job in protobuf format.
diff --git a/pkg/base/test_server_args.go b/pkg/base/test_server_args.go index d5e95809490c..c2675e34bd7f 100644 --- a/pkg/base/test_server_args.go +++ b/pkg/base/test_server_args.go @@ -228,6 +228,8 @@ func DefaultTestTempStorageConfigWithSize( // TestTenantArgs are the arguments used when creating a tenant from a // TestServer. type TestTenantArgs struct { + TenantName roachpb.TenantName + TenantID roachpb.TenantID // Existing, if true, indicates an existing tenant, rather than a new tenant diff --git a/pkg/ccl/streamingccl/streamclient/partitioned_stream_client.go b/pkg/ccl/streamingccl/streamclient/partitioned_stream_client.go index 814313f860e8..6b061ae159ea 100644 --- a/pkg/ccl/streamingccl/streamclient/partitioned_stream_client.go +++ b/pkg/ccl/streamingccl/streamclient/partitioned_stream_client.go @@ -71,7 +71,8 @@ func (p *partitionedStreamClient) Create( p.mu.Lock() defer p.mu.Unlock() var streamID streampb.StreamID - row := p.mu.srcConn.QueryRow(ctx, `SELECT crdb_internal.start_replication_stream($1)`, tenantID.ToUint64()) + row := p.mu.srcConn.QueryRow(ctx, `SELECT crdb_internal.start_replication_stream($1::INT)`, + tenantID.ToUint64()) err := row.Scan(&streamID) if err != nil { return streampb.InvalidStreamID, diff --git a/pkg/ccl/streamingccl/streamclient/partitioned_stream_client_test.go b/pkg/ccl/streamingccl/streamclient/partitioned_stream_client_test.go index 2e1191f32cdd..f9e1c774f414 100644 --- a/pkg/ccl/streamingccl/streamclient/partitioned_stream_client_test.go +++ b/pkg/ccl/streamingccl/streamclient/partitioned_stream_client_test.go @@ -76,7 +76,8 @@ func TestPartitionedStreamReplicationClient(t *testing.T) { defer cleanup() - tenant, cleanupTenant := h.CreateTenant(t, serverutils.TestTenantID()) + testTenantName := roachpb.TenantName("test-tenant") + tenant, cleanupTenant := h.CreateTenant(t, serverutils.TestTenantID(), testTenantName) defer cleanupTenant() ctx := context.Background() diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go index 7b32c2778978..32768c8b15c9 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go @@ -139,7 +139,7 @@ func ingestionPlanHook( } // Create a new tenant for the replication stream - if _, err := sql.GetTenantRecord(ctx, p.ExecCfg(), p.Txn(), newTenantID.ToUint64()); err == nil { + if _, err := sql.GetTenantRecordByID(ctx, p.ExecCfg(), p.Txn(), newTenantID); err == nil { return errors.Newf("tenant with id %s already exists", newTenantID) } tenantInfo := &descpb.TenantInfoWithUsage{ diff --git a/pkg/ccl/streamingccl/streamingtest/replication_helpers.go b/pkg/ccl/streamingccl/streamingtest/replication_helpers.go index 6321c3cd133c..e99019b02b8b 100644 --- a/pkg/ccl/streamingccl/streamingtest/replication_helpers.go +++ b/pkg/ccl/streamingccl/streamingtest/replication_helpers.go @@ -181,6 +181,8 @@ func (rf *ReplicationFeed) consumeUntil( // TenantState maintains test state related to tenant. type TenantState struct { + // Name is the name of the tenant. + Name roachpb.TenantName // ID is the ID of the tenant. ID roachpb.TenantID // Codec is the Codec of the tenant. @@ -240,10 +242,14 @@ SET CLUSTER SETTING sql.defaults.experimental_stream_replication.enabled = 'on'; // CreateTenant creates a tenant under the replication helper's server func (rh *ReplicationHelper) CreateTenant( - t *testing.T, tenantID roachpb.TenantID, + t *testing.T, tenantID roachpb.TenantID, tenantName roachpb.TenantName, ) (TenantState, func()) { - _, tenantConn := serverutils.StartTenant(t, rh.SysServer, base.TestTenantArgs{TenantID: tenantID}) + _, tenantConn := serverutils.StartTenant(t, rh.SysServer, base.TestTenantArgs{ + TenantID: tenantID, + TenantName: tenantName, + }) return TenantState{ + Name: tenantName, ID: tenantID, Codec: keys.MakeSQLCodec(tenantID), SQL: sqlutils.MakeSQLRunner(tenantConn), diff --git a/pkg/ccl/streamingccl/streamproducer/replication_manager.go b/pkg/ccl/streamingccl/streamproducer/replication_manager.go index 0848ee817ff0..c51b50a3e77a 100644 --- a/pkg/ccl/streamingccl/streamproducer/replication_manager.go +++ b/pkg/ccl/streamingccl/streamproducer/replication_manager.go @@ -15,6 +15,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/repstream" "github.com/cockroachdb/cockroach/pkg/repstream/streampb" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" @@ -34,6 +35,13 @@ func (r *replicationStreamManagerImpl) StartReplicationStream( return startReplicationStreamJob(ctx, r.evalCtx, r.txn, tenantID) } +// StartReplicationStreamByName implements streaming.ReplicationStreamManager interface. +func (r *replicationStreamManagerImpl) StartReplicationStreamByName( + ctx context.Context, tenantName roachpb.TenantName, +) (streampb.StreamID, error) { + return startReplicationStreamJobByName(ctx, r.evalCtx, r.txn, tenantName) +} + // HeartbeatReplicationStream implements streaming.ReplicationStreamManager interface. func (r *replicationStreamManagerImpl) HeartbeatReplicationStream( ctx context.Context, streamID streampb.StreamID, frontier hlc.Timestamp, diff --git a/pkg/ccl/streamingccl/streamproducer/replication_stream_test.go b/pkg/ccl/streamingccl/streamproducer/replication_stream_test.go index 18d6617c066a..2e8ef86d8d63 100644 --- a/pkg/ccl/streamingccl/streamproducer/replication_stream_test.go +++ b/pkg/ccl/streamingccl/streamproducer/replication_stream_test.go @@ -241,14 +241,15 @@ func TestReplicationStreamInitialization(t *testing.T) { h, cleanup := streamingtest.NewReplicationHelper(t, serverArgs) defer cleanup() - srcTenant, cleanupTenant := h.CreateTenant(t, serverutils.TestTenantID()) + testTenantName := roachpb.TenantName("test-tenant") + srcTenant, cleanupTenant := h.CreateTenant(t, serverutils.TestTenantID(), testTenantName) defer cleanupTenant() // Makes the stream time out really soon h.SysSQL.Exec(t, "SET CLUSTER SETTING stream_replication.job_liveness_timeout = '10ms'") h.SysSQL.Exec(t, "SET CLUSTER SETTING stream_replication.stream_liveness_track_frequency = '1ms'") t.Run("failed-after-timeout", func(t *testing.T) { - rows := h.SysSQL.QueryStr(t, "SELECT crdb_internal.start_replication_stream($1)", srcTenant.ID.ToUint64()) + rows := h.SysSQL.QueryStr(t, "SELECT crdb_internal.start_replication_stream($1::STRING)", testTenantName) streamID := rows[0][0] h.SysSQL.CheckQueryResultsRetry(t, fmt.Sprintf("SELECT status FROM system.jobs WHERE id = %s", streamID), @@ -259,7 +260,7 @@ func TestReplicationStreamInitialization(t *testing.T) { // Make sure the stream does not time out within the test timeout h.SysSQL.Exec(t, "SET CLUSTER SETTING stream_replication.job_liveness_timeout = '500s'") t.Run("continuously-running-within-timeout", func(t *testing.T) { - rows := h.SysSQL.QueryStr(t, "SELECT crdb_internal.start_replication_stream($1)", srcTenant.ID.ToUint64()) + rows := h.SysSQL.QueryStr(t, "SELECT crdb_internal.start_replication_stream($1::STRING)", testTenantName) streamID := rows[0][0] h.SysSQL.CheckQueryResultsRetry(t, fmt.Sprintf("SELECT status FROM system.jobs WHERE id = %s", streamID), @@ -329,7 +330,8 @@ func TestStreamPartition(t *testing.T) { DisableDefaultTestTenant: true, }) defer cleanup() - srcTenant, cleanupTenant := h.CreateTenant(t, serverutils.TestTenantID()) + testTenantName := roachpb.TenantName("test-tenant") + srcTenant, cleanupTenant := h.CreateTenant(t, serverutils.TestTenantID(), testTenantName) defer cleanupTenant() srcTenant.SQL.Exec(t, ` @@ -342,7 +344,7 @@ USE d; `) ctx := context.Background() - rows := h.SysSQL.QueryStr(t, "SELECT crdb_internal.start_replication_stream($1)", srcTenant.ID.ToUint64()) + rows := h.SysSQL.QueryStr(t, "SELECT crdb_internal.start_replication_stream($1::STRING)", testTenantName) streamID := rows[0][0] const streamPartitionQuery = `SELECT * FROM crdb_internal.stream_partition($1, $2)` @@ -474,7 +476,8 @@ func TestStreamAddSSTable(t *testing.T) { DisableDefaultTestTenant: true, }) defer cleanup() - srcTenant, cleanupTenant := h.CreateTenant(t, serverutils.TestTenantID()) + testTenantName := roachpb.TenantName("test-tenant") + srcTenant, cleanupTenant := h.CreateTenant(t, serverutils.TestTenantID(), testTenantName) defer cleanupTenant() srcTenant.SQL.Exec(t, ` @@ -485,7 +488,7 @@ USE d; `) ctx := context.Background() - rows := h.SysSQL.QueryStr(t, "SELECT crdb_internal.start_replication_stream($1)", srcTenant.ID.ToUint64()) + rows := h.SysSQL.QueryStr(t, "SELECT crdb_internal.start_replication_stream($1::STRING)", testTenantName) streamID := rows[0][0] const streamPartitionQuery = `SELECT * FROM crdb_internal.stream_partition($1, $2)` @@ -561,7 +564,8 @@ func TestCompleteStreamReplication(t *testing.T) { }) defer cleanup() srcTenantID := serverutils.TestTenantID() - _, cleanupTenant := h.CreateTenant(t, srcTenantID) + testTenantName := roachpb.TenantName("test-tenant") + _, cleanupTenant := h.CreateTenant(t, srcTenantID, testTenantName) defer cleanupTenant() // Make the producer job times out fast and fastly tracks ingestion cutover signal. @@ -571,7 +575,7 @@ func TestCompleteStreamReplication(t *testing.T) { var timedOutStreamID int row := h.SysSQL.QueryRow(t, - "SELECT crdb_internal.start_replication_stream($1)", srcTenantID.ToUint64()) + "SELECT crdb_internal.start_replication_stream($1::STRING)", testTenantName) row.Scan(&timedOutStreamID) jobutils.WaitForJobToFail(t, h.SysSQL, jobspb.JobID(timedOutStreamID)) @@ -585,7 +589,7 @@ func TestCompleteStreamReplication(t *testing.T) { // Create a new replication stream and complete it. var streamID int row := h.SysSQL.QueryRow(t, - "SELECT crdb_internal.start_replication_stream($1)", srcTenantID.ToUint64()) + "SELECT crdb_internal.start_replication_stream($1::STRING)", testTenantName) row.Scan(&streamID) jobutils.WaitForJobToRun(t, h.SysSQL, jobspb.JobID(streamID)) h.SysSQL.Exec(t, "SELECT crdb_internal.complete_replication_stream($1, $2)", @@ -644,7 +648,8 @@ func TestStreamDeleteRange(t *testing.T) { DisableDefaultTestTenant: true, }) defer cleanup() - srcTenant, cleanupTenant := h.CreateTenant(t, serverutils.TestTenantID()) + testTenantName := roachpb.TenantName("test-tenant") + srcTenant, cleanupTenant := h.CreateTenant(t, serverutils.TestTenantID(), testTenantName) defer cleanupTenant() srcTenant.SQL.Exec(t, ` @@ -659,7 +664,7 @@ USE d; `) ctx := context.Background() - rows := h.SysSQL.QueryStr(t, "SELECT crdb_internal.start_replication_stream($1)", srcTenant.ID.ToUint64()) + rows := h.SysSQL.QueryStr(t, "SELECT crdb_internal.start_replication_stream($1::STRING)", testTenantName) streamID := rows[0][0] const streamPartitionQuery = `SELECT * FROM crdb_internal.stream_partition($1, $2)` diff --git a/pkg/ccl/streamingccl/streamproducer/stream_lifetime.go b/pkg/ccl/streamingccl/streamproducer/stream_lifetime.go index 4564e46bcbca..a66b66d5446b 100644 --- a/pkg/ccl/streamingccl/streamproducer/stream_lifetime.go +++ b/pkg/ccl/streamingccl/streamproducer/stream_lifetime.go @@ -30,6 +30,17 @@ import ( "github.com/cockroachdb/errors" ) +func startReplicationStreamJobByName( + ctx context.Context, evalCtx *eval.Context, txn *kv.Txn, tenantName roachpb.TenantName, +) (streampb.StreamID, error) { + tenant, err := sql.GetTenantRecordByName(ctx, evalCtx.Planner.ExecutorConfig().(*sql.ExecutorConfig), txn, tenantName) + if err != nil { + return 0, err + } + + return startReplicationStreamJob(ctx, evalCtx, txn, tenant.ID) +} + // startReplicationStreamJob initializes a replication stream producer job on the source cluster that // 1. Tracks the liveness of the replication stream consumption // 2. TODO(casper): Updates the protected timestamp for spans being replicated diff --git a/pkg/ccl/testccl/sqlccl/tenant_gc_test.go b/pkg/ccl/testccl/sqlccl/tenant_gc_test.go index 86922eff04dd..8a860a5163b6 100644 --- a/pkg/ccl/testccl/sqlccl/tenant_gc_test.go +++ b/pkg/ccl/testccl/sqlccl/tenant_gc_test.go @@ -512,7 +512,7 @@ func TestGCTenantJobWaitsForProtectedTimestamps(t *testing.T) { job, err := jobRegistry.LoadJob(ctx, sj.ID()) require.NoError(t, err) require.Equal(t, jobs.StatusSucceeded, job.Status()) - _, err = sql.GetTenantRecord(ctx, &execCfg, nil /* txn */, tenID.ToUint64()) + _, err = sql.GetTenantRecordByID(ctx, &execCfg, nil /* txn */, tenID) require.EqualError(t, err, fmt.Sprintf(`tenant "%d" does not exist`, tenID.ToUint64())) progress := job.Progress() require.Equal(t, jobspb.SchemaChangeGCProgress_CLEARED, progress.GetSchemaChangeGC().Tenant.Status) @@ -609,7 +609,7 @@ func TestGCTenantJobWaitsForProtectedTimestamps(t *testing.T) { "SELECT status FROM [SHOW JOBS] WHERE description = 'GC for tenant 10'", [][]string{{"succeeded"}}, ) - _, err := sql.GetTenantRecord(ctx, &execCfg, nil /* txn */, tenID.ToUint64()) + _, err := sql.GetTenantRecordByID(ctx, &execCfg, nil /* txn */, tenID) require.EqualError(t, err, `tenant "10" does not exist`) // PTS record protecting system tenant cluster should block tenant GC. diff --git a/pkg/server/testserver.go b/pkg/server/testserver.go index de94437c4693..0b1799f6c74e 100644 --- a/pkg/server/testserver.go +++ b/pkg/server/testserver.go @@ -778,23 +778,42 @@ func (ts *TestServer) StartTenant( if rowCount == 0 { // Tenant doesn't exist. Create it. if _, err := ts.InternalExecutor().(*sql.InternalExecutor).Exec( - ctx, "testserver-create-tenant", nil /* txn */, "SELECT crdb_internal.create_tenant($1)", params.TenantID.ToUint64(), + ctx, "testserver-create-tenant", nil /* txn */, "SELECT crdb_internal.create_tenant($1, $2)", + params.TenantID.ToUint64(), params.TenantName, ); err != nil { return nil, err } + } else if params.TenantName != "" { + _, err := ts.InternalExecutor().(*sql.InternalExecutor).Exec(ctx, "rename-test-tenant", nil, + `SELECT crdb_internal.rename_tenant($1, $2)`, params.TenantID, params.TenantName) + if err != nil { + return nil, err + } } } else if !params.SkipTenantCheck { - rowCount, err := ts.InternalExecutor().(*sql.InternalExecutor).Exec( + row, err := ts.InternalExecutor().(*sql.InternalExecutor).QueryRow( ctx, "testserver-check-tenant-active", nil, - "SELECT 1 FROM system.tenants WHERE id=$1 AND active=true", - params.TenantID.ToUint64(), + "SELECT name FROM system.tenants WHERE id=$1 AND active=true", + params.TenantID.ToUint64(), string(params.TenantName), ) if err != nil { return nil, err } - if rowCount == 0 { + if row == nil { return nil, errors.New("not found") } + // Check that the name passed in via params matches the name persisted in + // the system.tenants table. + if row[0] != tree.DNull { + actualName := (*string)(row[0].(*tree.DString)) + if *actualName != string(params.TenantName) { + return nil, errors.Newf("name mismatch; tenant %d has name %q, but params specifies name %q", + params.TenantID.ToUint64(), *actualName, string(params.TenantName)) + } + } else if params.TenantName != "" { + return nil, errors.Newf("name mismatch; tenant %d has no name, but params specifies name %q", + params.TenantID.ToUint64(), string(params.TenantName)) + } } st := params.Settings diff --git a/pkg/sql/gcjob/tenant_garbage_collection.go b/pkg/sql/gcjob/tenant_garbage_collection.go index f2054110565c..e78203731a73 100644 --- a/pkg/sql/gcjob/tenant_garbage_collection.go +++ b/pkg/sql/gcjob/tenant_garbage_collection.go @@ -14,6 +14,7 @@ import ( "context" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" @@ -41,7 +42,7 @@ func gcTenant( ) } - info, err := sql.GetTenantRecord(ctx, execCfg, nil /* txn */, tenID) + info, err := sql.GetTenantRecordByID(ctx, execCfg, nil /* txn */, roachpb.MakeTenantID(tenID)) if err != nil { if pgerror.GetPGCode(err) == pgcode.UndefinedObject { // The tenant row is deleted only after its data is cleared so there is diff --git a/pkg/sql/gcjob_test/gc_job_test.go b/pkg/sql/gcjob_test/gc_job_test.go index e63d2f788f49..cdd767bf1f94 100644 --- a/pkg/sql/gcjob_test/gc_job_test.go +++ b/pkg/sql/gcjob_test/gc_job_test.go @@ -344,7 +344,7 @@ func TestGCResumer(t *testing.T) { job, err := jobRegistry.LoadJob(ctx, sj.ID()) require.NoError(t, err) require.Equal(t, jobs.StatusSucceeded, job.Status()) - _, err = sql.GetTenantRecord(ctx, &execCfg, nil /* txn */, tenID) + _, err = sql.GetTenantRecordByID(ctx, &execCfg, nil /* txn */, roachpb.MakeTenantID(tenID)) require.EqualError(t, err, `tenant "10" does not exist`) progress := job.Progress() require.Equal(t, jobspb.SchemaChangeGCProgress_CLEARED, progress.GetSchemaChangeGC().Tenant.Status) @@ -372,7 +372,7 @@ func TestGCResumer(t *testing.T) { job, err := jobRegistry.LoadJob(ctx, sj.ID()) require.NoError(t, err) require.Equal(t, jobs.StatusSucceeded, job.Status()) - _, err = sql.GetTenantRecord(ctx, &execCfg, nil /* txn */, tenID) + _, err = sql.GetTenantRecordByID(ctx, &execCfg, nil /* txn */, roachpb.MakeTenantID(tenID)) require.EqualError(t, err, `tenant "10" does not exist`) progress := job.Progress() require.Equal(t, jobspb.SchemaChangeGCProgress_CLEARED, progress.GetSchemaChangeGC().Tenant.Status) @@ -500,7 +500,7 @@ func TestGCTenant(t *testing.T) { require.NoError(t, gcClosure(dropTenID, progress)) require.Equal(t, jobspb.SchemaChangeGCProgress_CLEARED, progress.Tenant.Status) - _, err = sql.GetTenantRecord(ctx, &execCfg, nil /* txn */, dropTenID) + _, err = sql.GetTenantRecordByID(ctx, &execCfg, nil /* txn */, roachpb.MakeTenantID(dropTenID)) require.EqualError(t, err, `tenant "11" does not exist`) require.NoError(t, gcClosure(dropTenID, progress)) diff --git a/pkg/sql/sem/builtins/fixed_oids.go b/pkg/sql/sem/builtins/fixed_oids.go index 8f8555b940c7..6b339ab1312d 100644 --- a/pkg/sql/sem/builtins/fixed_oids.go +++ b/pkg/sql/sem/builtins/fixed_oids.go @@ -480,6 +480,7 @@ var builtinOidsBySignature = map[string]oid.Oid{ `crdb_internal.show_create_all_types(database_name: string) -> string`: 353, `crdb_internal.sql_liveness_is_alive(session_id: bytes) -> bool`: 1353, `crdb_internal.start_replication_stream(tenant_id: int) -> int`: 1548, + `crdb_internal.start_replication_stream(tenant_name: string) -> int`: 2044, `crdb_internal.stream_ingestion_stats_json(job_id: int) -> jsonb`: 1546, `crdb_internal.stream_ingestion_stats_pb(job_id: int) -> bytes`: 1547, `crdb_internal.stream_partition(stream_id: int, partition_spec: bytes) -> bytes`: 1550, diff --git a/pkg/sql/sem/builtins/replication_builtins.go b/pkg/sql/sem/builtins/replication_builtins.go index 0ba86b627b7f..54b288479c77 100644 --- a/pkg/sql/sem/builtins/replication_builtins.go +++ b/pkg/sql/sem/builtins/replication_builtins.go @@ -16,6 +16,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/repstream/streampb" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/sem/builtins/builtinconstants" "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -184,6 +185,29 @@ var replicationBuiltins = map[string]builtinDefinition{ "notify that the replication is still ongoing.", Volatility: volatility.Volatile, }, + tree.Overload{ + Types: tree.ArgTypes{ + {"tenant_name", types.String}, + }, + ReturnType: tree.FixedReturnType(types.Int), + Fn: func(ctx context.Context, evalCtx *eval.Context, args tree.Datums) (tree.Datum, error) { + mgr, err := evalCtx.StreamManagerFactory.GetReplicationStreamManager(ctx) + if err != nil { + return nil, err + } + tenantName := tree.MustBeDString(args[0]) + jobID, err := mgr.StartReplicationStreamByName(ctx, roachpb.TenantName(tenantName)) + if err != nil { + return nil, err + } + return tree.NewDInt(tree.DInt(jobID)), err + }, + Info: "This function can be used on the producer side to start a replication stream for " + + "the specified tenant. The returned stream ID uniquely identifies created stream. " + + "The caller must periodically invoke crdb_internal.heartbeat_stream() function to " + + "notify that the replication is still ongoing.", + Volatility: volatility.Volatile, + }, ), "crdb_internal.replication_stream_progress": makeBuiltin( diff --git a/pkg/sql/sem/eval/context.go b/pkg/sql/sem/eval/context.go index e6928f84ee2e..e02608a58b4a 100644 --- a/pkg/sql/sem/eval/context.go +++ b/pkg/sql/sem/eval/context.go @@ -701,12 +701,19 @@ type StreamManagerFactory interface { // ReplicationStreamManager represents a collection of APIs that streaming replication supports // on the production side. type ReplicationStreamManager interface { - // StartReplicationStream starts a stream replication job for the specified tenant on the producer side. + // StartReplicationStream starts a stream replication job for the specified + // tenant on the producer side. + // + // TODO(adityamaru): Remove this method in a follow up. StartReplicationStream( ctx context.Context, tenantID uint64, ) (streampb.StreamID, error) + // StartReplicationStreamByName starts a stream replication job for the + // specified tenant on the producer side. + StartReplicationStreamByName(ctx context.Context, tenantName roachpb.TenantName) (streampb.StreamID, error) + // HeartbeatReplicationStream sends a heartbeat to the replication stream producer, indicating // consumer has consumed until the given 'frontier' timestamp. This updates the producer job // progress and extends its life, and the new producer progress will be returned. diff --git a/pkg/sql/tenant.go b/pkg/sql/tenant.go index 5efb3ef010b4..67ac5e2028ca 100644 --- a/pkg/sql/tenant.go +++ b/pkg/sql/tenant.go @@ -185,18 +185,45 @@ func CreateTenantRecord( ) } -// GetTenantRecord retrieves a tenant in system.tenants. -func GetTenantRecord( - ctx context.Context, execCfg *ExecutorConfig, txn *kv.Txn, tenID uint64, +// GetTenantRecordByName retrieves a tenant with the provided name from +// system.tenants. +func GetTenantRecordByName( + ctx context.Context, execCfg *ExecutorConfig, txn *kv.Txn, tenantName roachpb.TenantName, +) (*descpb.TenantInfo, error) { + if !execCfg.Settings.Version.IsActive(ctx, clusterversion.V23_1TenantNames) { + return nil, errors.Newf("tenant names not supported until upgrade to %s or higher is completed", + clusterversion.V23_1TenantNames.String()) + } + row, err := execCfg.InternalExecutor.QueryRowEx( + ctx, "activate-tenant", txn, sessiondata.NodeUserSessionDataOverride, + `SELECT info FROM system.tenants WHERE name = $1`, tenantName, + ) + if err != nil { + return nil, err + } else if row == nil { + return nil, pgerror.Newf(pgcode.UndefinedObject, "tenant %q does not exist", tenantName) + } + + info := &descpb.TenantInfo{} + infoBytes := []byte(tree.MustBeDBytes(row[0])) + if err := protoutil.Unmarshal(infoBytes, info); err != nil { + return nil, err + } + return info, nil +} + +// GetTenantRecordByID retrieves a tenant in system.tenants. +func GetTenantRecordByID( + ctx context.Context, execCfg *ExecutorConfig, txn *kv.Txn, tenID roachpb.TenantID, ) (*descpb.TenantInfo, error) { row, err := execCfg.InternalExecutor.QueryRowEx( ctx, "activate-tenant", txn, sessiondata.NodeUserSessionDataOverride, - `SELECT info FROM system.tenants WHERE id = $1`, tenID, + `SELECT info FROM system.tenants WHERE id = $1`, tenID.ToUint64(), ) if err != nil { return nil, err } else if row == nil { - return nil, pgerror.Newf(pgcode.UndefinedObject, "tenant \"%d\" does not exist", tenID) + return nil, pgerror.Newf(pgcode.UndefinedObject, "tenant \"%d\" does not exist", tenID.ToUint64()) } info := &descpb.TenantInfo{} @@ -405,7 +432,7 @@ func ActivateTenant(ctx context.Context, execCfg *ExecutorConfig, txn *kv.Txn, t } // Retrieve the tenant's info. - info, err := GetTenantRecord(ctx, execCfg, txn, tenID) + info, err := GetTenantRecordByID(ctx, execCfg, txn, roachpb.MakeTenantID(tenID)) if err != nil { return errors.Wrap(err, "activating tenant") } @@ -456,7 +483,7 @@ func (p *planner) DestroyTenant(ctx context.Context, tenID uint64, synchronous b } // Retrieve the tenant's info. - info, err := GetTenantRecord(ctx, p.execCfg, p.txn, tenID) + info, err := GetTenantRecordByID(ctx, p.execCfg, p.txn, roachpb.MakeTenantID(tenID)) if err != nil { return errors.Wrap(err, "destroying tenant") } @@ -608,7 +635,7 @@ func (p *planner) GCTenant(ctx context.Context, tenID uint64) error { var info *descpb.TenantInfo if txnErr := p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { var err error - info, err = GetTenantRecord(ctx, p.execCfg, p.txn, tenID) + info, err = GetTenantRecordByID(ctx, p.execCfg, p.txn, roachpb.MakeTenantID(tenID)) return err }); txnErr != nil { return errors.Wrapf(txnErr, "retrieving tenant %d", tenID)