Skip to content

Commit

Permalink
builtins: add start_replication_stream overload
Browse files Browse the repository at this point in the history
This change adds an overload to
`crdb_internal.start_replication_stream` that takes in the tenant
name instead of the tenant ID. The underlying logic renames the same
except for an additional step to get the ID of the tenant given
the name.

We do not change `Create` in the `partitionStreamClient` to use this
new overload, this will be done in a follow-up where we can then delete
the overload that takes in a tenant ID. We do however switch all
tests to use the new overload.

Informs: cockroachdb#91240

Release note: None
  • Loading branch information
adityamaru committed Nov 15, 2022
1 parent 03f6062 commit a4053d7
Show file tree
Hide file tree
Showing 17 changed files with 152 additions and 37 deletions.
2 changes: 2 additions & 0 deletions docs/generated/sql/functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -2652,6 +2652,8 @@ The swap_ordinate_string parameter is a 2-character string naming the ordinates
</span></td><td>Volatile</td></tr>
<tr><td><a name="crdb_internal.start_replication_stream"></a><code>crdb_internal.start_replication_stream(tenant_id: <a href="int.html">int</a>) &rarr; <a href="int.html">int</a></code></td><td><span class="funcdesc"><p>This function can be used on the producer side to start a replication stream for the specified tenant. The returned stream ID uniquely identifies created stream. The caller must periodically invoke crdb_internal.heartbeat_stream() function to notify that the replication is still ongoing.</p>
</span></td><td>Volatile</td></tr>
<tr><td><a name="crdb_internal.start_replication_stream"></a><code>crdb_internal.start_replication_stream(tenant_name: <a href="string.html">string</a>) &rarr; <a href="int.html">int</a></code></td><td><span class="funcdesc"><p>This function can be used on the producer side to start a replication stream for the specified tenant. The returned stream ID uniquely identifies created stream. The caller must periodically invoke crdb_internal.heartbeat_stream() function to notify that the replication is still ongoing.</p>
</span></td><td>Volatile</td></tr>
<tr><td><a name="crdb_internal.stream_ingestion_stats_json"></a><code>crdb_internal.stream_ingestion_stats_json(job_id: <a href="int.html">int</a>) &rarr; jsonb</code></td><td><span class="funcdesc"><p>This function can be used on the ingestion side to get a statistics summary of a stream ingestion job in json format.</p>
</span></td><td>Volatile</td></tr>
<tr><td><a name="crdb_internal.stream_ingestion_stats_pb"></a><code>crdb_internal.stream_ingestion_stats_pb(job_id: <a href="int.html">int</a>) &rarr; <a href="bytes.html">bytes</a></code></td><td><span class="funcdesc"><p>This function can be used on the ingestion side to get a statistics summary of a stream ingestion job in protobuf format.</p>
Expand Down
2 changes: 2 additions & 0 deletions pkg/base/test_server_args.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,8 @@ func DefaultTestTempStorageConfigWithSize(
// TestTenantArgs are the arguments used when creating a tenant from a
// TestServer.
type TestTenantArgs struct {
TenantName roachpb.TenantName

TenantID roachpb.TenantID

// Existing, if true, indicates an existing tenant, rather than a new tenant
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ func (p *partitionedStreamClient) Create(
p.mu.Lock()
defer p.mu.Unlock()
var streamID streampb.StreamID
row := p.mu.srcConn.QueryRow(ctx, `SELECT crdb_internal.start_replication_stream($1)`, tenantID.ToUint64())
row := p.mu.srcConn.QueryRow(ctx, `SELECT crdb_internal.start_replication_stream($1::INT)`,
tenantID.ToUint64())
err := row.Scan(&streamID)
if err != nil {
return streampb.InvalidStreamID,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ func TestPartitionedStreamReplicationClient(t *testing.T) {

defer cleanup()

tenant, cleanupTenant := h.CreateTenant(t, serverutils.TestTenantID())
testTenantName := roachpb.TenantName("test-tenant")
tenant, cleanupTenant := h.CreateTenant(t, serverutils.TestTenantID(), testTenantName)
defer cleanupTenant()

ctx := context.Background()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func ingestionPlanHook(
}

// Create a new tenant for the replication stream
if _, err := sql.GetTenantRecord(ctx, p.ExecCfg(), p.Txn(), newTenantID.ToUint64()); err == nil {
if _, err := sql.GetTenantRecordByID(ctx, p.ExecCfg(), p.Txn(), newTenantID); err == nil {
return errors.Newf("tenant with id %s already exists", newTenantID)
}
tenantInfo := &descpb.TenantInfoWithUsage{
Expand Down
10 changes: 8 additions & 2 deletions pkg/ccl/streamingccl/streamingtest/replication_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,8 @@ func (rf *ReplicationFeed) consumeUntil(

// TenantState maintains test state related to tenant.
type TenantState struct {
// Name is the name of the tenant.
Name roachpb.TenantName
// ID is the ID of the tenant.
ID roachpb.TenantID
// Codec is the Codec of the tenant.
Expand Down Expand Up @@ -240,10 +242,14 @@ SET CLUSTER SETTING sql.defaults.experimental_stream_replication.enabled = 'on';

// CreateTenant creates a tenant under the replication helper's server
func (rh *ReplicationHelper) CreateTenant(
t *testing.T, tenantID roachpb.TenantID,
t *testing.T, tenantID roachpb.TenantID, tenantName roachpb.TenantName,
) (TenantState, func()) {
_, tenantConn := serverutils.StartTenant(t, rh.SysServer, base.TestTenantArgs{TenantID: tenantID})
_, tenantConn := serverutils.StartTenant(t, rh.SysServer, base.TestTenantArgs{
TenantID: tenantID,
TenantName: tenantName,
})
return TenantState{
Name: tenantName,
ID: tenantID,
Codec: keys.MakeSQLCodec(tenantID),
SQL: sqlutils.MakeSQLRunner(tenantConn),
Expand Down
8 changes: 8 additions & 0 deletions pkg/ccl/streamingccl/streamproducer/replication_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/repstream"
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
Expand All @@ -34,6 +35,13 @@ func (r *replicationStreamManagerImpl) StartReplicationStream(
return startReplicationStreamJob(ctx, r.evalCtx, r.txn, tenantID)
}

// StartReplicationStreamByName implements streaming.ReplicationStreamManager interface.
func (r *replicationStreamManagerImpl) StartReplicationStreamByName(
ctx context.Context, tenantName roachpb.TenantName,
) (streampb.StreamID, error) {
return startReplicationStreamJobByName(ctx, r.evalCtx, r.txn, tenantName)
}

// HeartbeatReplicationStream implements streaming.ReplicationStreamManager interface.
func (r *replicationStreamManagerImpl) HeartbeatReplicationStream(
ctx context.Context, streamID streampb.StreamID, frontier hlc.Timestamp,
Expand Down
29 changes: 17 additions & 12 deletions pkg/ccl/streamingccl/streamproducer/replication_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,14 +241,15 @@ func TestReplicationStreamInitialization(t *testing.T) {

h, cleanup := streamingtest.NewReplicationHelper(t, serverArgs)
defer cleanup()
srcTenant, cleanupTenant := h.CreateTenant(t, serverutils.TestTenantID())
testTenantName := roachpb.TenantName("test-tenant")
srcTenant, cleanupTenant := h.CreateTenant(t, serverutils.TestTenantID(), testTenantName)
defer cleanupTenant()

// Makes the stream time out really soon
h.SysSQL.Exec(t, "SET CLUSTER SETTING stream_replication.job_liveness_timeout = '10ms'")
h.SysSQL.Exec(t, "SET CLUSTER SETTING stream_replication.stream_liveness_track_frequency = '1ms'")
t.Run("failed-after-timeout", func(t *testing.T) {
rows := h.SysSQL.QueryStr(t, "SELECT crdb_internal.start_replication_stream($1)", srcTenant.ID.ToUint64())
rows := h.SysSQL.QueryStr(t, "SELECT crdb_internal.start_replication_stream($1::STRING)", testTenantName)
streamID := rows[0][0]

h.SysSQL.CheckQueryResultsRetry(t, fmt.Sprintf("SELECT status FROM system.jobs WHERE id = %s", streamID),
Expand All @@ -259,7 +260,7 @@ func TestReplicationStreamInitialization(t *testing.T) {
// Make sure the stream does not time out within the test timeout
h.SysSQL.Exec(t, "SET CLUSTER SETTING stream_replication.job_liveness_timeout = '500s'")
t.Run("continuously-running-within-timeout", func(t *testing.T) {
rows := h.SysSQL.QueryStr(t, "SELECT crdb_internal.start_replication_stream($1)", srcTenant.ID.ToUint64())
rows := h.SysSQL.QueryStr(t, "SELECT crdb_internal.start_replication_stream($1::STRING)", testTenantName)
streamID := rows[0][0]

h.SysSQL.CheckQueryResultsRetry(t, fmt.Sprintf("SELECT status FROM system.jobs WHERE id = %s", streamID),
Expand Down Expand Up @@ -329,7 +330,8 @@ func TestStreamPartition(t *testing.T) {
DisableDefaultTestTenant: true,
})
defer cleanup()
srcTenant, cleanupTenant := h.CreateTenant(t, serverutils.TestTenantID())
testTenantName := roachpb.TenantName("test-tenant")
srcTenant, cleanupTenant := h.CreateTenant(t, serverutils.TestTenantID(), testTenantName)
defer cleanupTenant()

srcTenant.SQL.Exec(t, `
Expand All @@ -342,7 +344,7 @@ USE d;
`)

ctx := context.Background()
rows := h.SysSQL.QueryStr(t, "SELECT crdb_internal.start_replication_stream($1)", srcTenant.ID.ToUint64())
rows := h.SysSQL.QueryStr(t, "SELECT crdb_internal.start_replication_stream($1::STRING)", testTenantName)
streamID := rows[0][0]

const streamPartitionQuery = `SELECT * FROM crdb_internal.stream_partition($1, $2)`
Expand Down Expand Up @@ -474,7 +476,8 @@ func TestStreamAddSSTable(t *testing.T) {
DisableDefaultTestTenant: true,
})
defer cleanup()
srcTenant, cleanupTenant := h.CreateTenant(t, serverutils.TestTenantID())
testTenantName := roachpb.TenantName("test-tenant")
srcTenant, cleanupTenant := h.CreateTenant(t, serverutils.TestTenantID(), testTenantName)
defer cleanupTenant()

srcTenant.SQL.Exec(t, `
Expand All @@ -485,7 +488,7 @@ USE d;
`)

ctx := context.Background()
rows := h.SysSQL.QueryStr(t, "SELECT crdb_internal.start_replication_stream($1)", srcTenant.ID.ToUint64())
rows := h.SysSQL.QueryStr(t, "SELECT crdb_internal.start_replication_stream($1::STRING)", testTenantName)
streamID := rows[0][0]

const streamPartitionQuery = `SELECT * FROM crdb_internal.stream_partition($1, $2)`
Expand Down Expand Up @@ -561,7 +564,8 @@ func TestCompleteStreamReplication(t *testing.T) {
})
defer cleanup()
srcTenantID := serverutils.TestTenantID()
_, cleanupTenant := h.CreateTenant(t, srcTenantID)
testTenantName := roachpb.TenantName("test-tenant")
_, cleanupTenant := h.CreateTenant(t, srcTenantID, testTenantName)
defer cleanupTenant()

// Make the producer job times out fast and fastly tracks ingestion cutover signal.
Expand All @@ -571,7 +575,7 @@ func TestCompleteStreamReplication(t *testing.T) {

var timedOutStreamID int
row := h.SysSQL.QueryRow(t,
"SELECT crdb_internal.start_replication_stream($1)", srcTenantID.ToUint64())
"SELECT crdb_internal.start_replication_stream($1::STRING)", testTenantName)
row.Scan(&timedOutStreamID)
jobutils.WaitForJobToFail(t, h.SysSQL, jobspb.JobID(timedOutStreamID))

Expand All @@ -585,7 +589,7 @@ func TestCompleteStreamReplication(t *testing.T) {
// Create a new replication stream and complete it.
var streamID int
row := h.SysSQL.QueryRow(t,
"SELECT crdb_internal.start_replication_stream($1)", srcTenantID.ToUint64())
"SELECT crdb_internal.start_replication_stream($1::STRING)", testTenantName)
row.Scan(&streamID)
jobutils.WaitForJobToRun(t, h.SysSQL, jobspb.JobID(streamID))
h.SysSQL.Exec(t, "SELECT crdb_internal.complete_replication_stream($1, $2)",
Expand Down Expand Up @@ -644,7 +648,8 @@ func TestStreamDeleteRange(t *testing.T) {
DisableDefaultTestTenant: true,
})
defer cleanup()
srcTenant, cleanupTenant := h.CreateTenant(t, serverutils.TestTenantID())
testTenantName := roachpb.TenantName("test-tenant")
srcTenant, cleanupTenant := h.CreateTenant(t, serverutils.TestTenantID(), testTenantName)
defer cleanupTenant()

srcTenant.SQL.Exec(t, `
Expand All @@ -659,7 +664,7 @@ USE d;
`)

ctx := context.Background()
rows := h.SysSQL.QueryStr(t, "SELECT crdb_internal.start_replication_stream($1)", srcTenant.ID.ToUint64())
rows := h.SysSQL.QueryStr(t, "SELECT crdb_internal.start_replication_stream($1::STRING)", testTenantName)
streamID := rows[0][0]

const streamPartitionQuery = `SELECT * FROM crdb_internal.stream_partition($1, $2)`
Expand Down
11 changes: 11 additions & 0 deletions pkg/ccl/streamingccl/streamproducer/stream_lifetime.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,17 @@ import (
"github.com/cockroachdb/errors"
)

func startReplicationStreamJobByName(
ctx context.Context, evalCtx *eval.Context, txn *kv.Txn, tenantName roachpb.TenantName,
) (streampb.StreamID, error) {
tenant, err := sql.GetTenantRecordByName(ctx, evalCtx.Planner.ExecutorConfig().(*sql.ExecutorConfig), txn, tenantName)
if err != nil {
return 0, err
}

return startReplicationStreamJob(ctx, evalCtx, txn, tenant.ID)
}

// startReplicationStreamJob initializes a replication stream producer job on the source cluster that
// 1. Tracks the liveness of the replication stream consumption
// 2. TODO(casper): Updates the protected timestamp for spans being replicated
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/testccl/sqlccl/tenant_gc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -512,7 +512,7 @@ func TestGCTenantJobWaitsForProtectedTimestamps(t *testing.T) {
job, err := jobRegistry.LoadJob(ctx, sj.ID())
require.NoError(t, err)
require.Equal(t, jobs.StatusSucceeded, job.Status())
_, err = sql.GetTenantRecord(ctx, &execCfg, nil /* txn */, tenID.ToUint64())
_, err = sql.GetTenantRecordByID(ctx, &execCfg, nil /* txn */, tenID)
require.EqualError(t, err, fmt.Sprintf(`tenant "%d" does not exist`, tenID.ToUint64()))
progress := job.Progress()
require.Equal(t, jobspb.SchemaChangeGCProgress_CLEARED, progress.GetSchemaChangeGC().Tenant.Status)
Expand Down Expand Up @@ -609,7 +609,7 @@ func TestGCTenantJobWaitsForProtectedTimestamps(t *testing.T) {
"SELECT status FROM [SHOW JOBS] WHERE description = 'GC for tenant 10'",
[][]string{{"succeeded"}},
)
_, err := sql.GetTenantRecord(ctx, &execCfg, nil /* txn */, tenID.ToUint64())
_, err := sql.GetTenantRecordByID(ctx, &execCfg, nil /* txn */, tenID)
require.EqualError(t, err, `tenant "10" does not exist`)

// PTS record protecting system tenant cluster should block tenant GC.
Expand Down
29 changes: 24 additions & 5 deletions pkg/server/testserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -778,23 +778,42 @@ func (ts *TestServer) StartTenant(
if rowCount == 0 {
// Tenant doesn't exist. Create it.
if _, err := ts.InternalExecutor().(*sql.InternalExecutor).Exec(
ctx, "testserver-create-tenant", nil /* txn */, "SELECT crdb_internal.create_tenant($1)", params.TenantID.ToUint64(),
ctx, "testserver-create-tenant", nil /* txn */, "SELECT crdb_internal.create_tenant($1, $2)",
params.TenantID.ToUint64(), params.TenantName,
); err != nil {
return nil, err
}
} else if params.TenantName != "" {
_, err := ts.InternalExecutor().(*sql.InternalExecutor).Exec(ctx, "rename-test-tenant", nil,
`SELECT crdb_internal.rename_tenant($1, $2)`, params.TenantID, params.TenantName)
if err != nil {
return nil, err
}
}
} else if !params.SkipTenantCheck {
rowCount, err := ts.InternalExecutor().(*sql.InternalExecutor).Exec(
row, err := ts.InternalExecutor().(*sql.InternalExecutor).QueryRow(
ctx, "testserver-check-tenant-active", nil,
"SELECT 1 FROM system.tenants WHERE id=$1 AND active=true",
params.TenantID.ToUint64(),
"SELECT name FROM system.tenants WHERE id=$1 AND active=true",
params.TenantID.ToUint64(), string(params.TenantName),
)
if err != nil {
return nil, err
}
if rowCount == 0 {
if row == nil {
return nil, errors.New("not found")
}
// Check that the name passed in via params matches the name persisted in
// the system.tenants table.
if row[0] != tree.DNull {
actualName := (*string)(row[0].(*tree.DString))
if *actualName != string(params.TenantName) {
return nil, errors.Newf("name mismatch; tenant %d has name %q, but params specifies name %q",
params.TenantID.ToUint64(), *actualName, string(params.TenantName))
}
} else if params.TenantName != "" {
return nil, errors.Newf("name mismatch; tenant %d has no name, but params specifies name %q",
params.TenantID.ToUint64(), string(params.TenantName))
}
}

st := params.Settings
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/gcjob/tenant_garbage_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"context"

"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
Expand Down Expand Up @@ -41,7 +42,7 @@ func gcTenant(
)
}

info, err := sql.GetTenantRecord(ctx, execCfg, nil /* txn */, tenID)
info, err := sql.GetTenantRecordByID(ctx, execCfg, nil /* txn */, roachpb.MakeTenantID(tenID))
if err != nil {
if pgerror.GetPGCode(err) == pgcode.UndefinedObject {
// The tenant row is deleted only after its data is cleared so there is
Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/gcjob_test/gc_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ func TestGCResumer(t *testing.T) {
job, err := jobRegistry.LoadJob(ctx, sj.ID())
require.NoError(t, err)
require.Equal(t, jobs.StatusSucceeded, job.Status())
_, err = sql.GetTenantRecord(ctx, &execCfg, nil /* txn */, tenID)
_, err = sql.GetTenantRecordByID(ctx, &execCfg, nil /* txn */, roachpb.MakeTenantID(tenID))
require.EqualError(t, err, `tenant "10" does not exist`)
progress := job.Progress()
require.Equal(t, jobspb.SchemaChangeGCProgress_CLEARED, progress.GetSchemaChangeGC().Tenant.Status)
Expand Down Expand Up @@ -372,7 +372,7 @@ func TestGCResumer(t *testing.T) {
job, err := jobRegistry.LoadJob(ctx, sj.ID())
require.NoError(t, err)
require.Equal(t, jobs.StatusSucceeded, job.Status())
_, err = sql.GetTenantRecord(ctx, &execCfg, nil /* txn */, tenID)
_, err = sql.GetTenantRecordByID(ctx, &execCfg, nil /* txn */, roachpb.MakeTenantID(tenID))
require.EqualError(t, err, `tenant "10" does not exist`)
progress := job.Progress()
require.Equal(t, jobspb.SchemaChangeGCProgress_CLEARED, progress.GetSchemaChangeGC().Tenant.Status)
Expand Down Expand Up @@ -500,7 +500,7 @@ func TestGCTenant(t *testing.T) {

require.NoError(t, gcClosure(dropTenID, progress))
require.Equal(t, jobspb.SchemaChangeGCProgress_CLEARED, progress.Tenant.Status)
_, err = sql.GetTenantRecord(ctx, &execCfg, nil /* txn */, dropTenID)
_, err = sql.GetTenantRecordByID(ctx, &execCfg, nil /* txn */, roachpb.MakeTenantID(dropTenID))
require.EqualError(t, err, `tenant "11" does not exist`)
require.NoError(t, gcClosure(dropTenID, progress))

Expand Down
1 change: 1 addition & 0 deletions pkg/sql/sem/builtins/fixed_oids.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,7 @@ var builtinOidsBySignature = map[string]oid.Oid{
`crdb_internal.show_create_all_types(database_name: string) -> string`: 353,
`crdb_internal.sql_liveness_is_alive(session_id: bytes) -> bool`: 1353,
`crdb_internal.start_replication_stream(tenant_id: int) -> int`: 1548,
`crdb_internal.start_replication_stream(tenant_name: string) -> int`: 2044,
`crdb_internal.stream_ingestion_stats_json(job_id: int) -> jsonb`: 1546,
`crdb_internal.stream_ingestion_stats_pb(job_id: int) -> bytes`: 1547,
`crdb_internal.stream_partition(stream_id: int, partition_spec: bytes) -> bytes`: 1550,
Expand Down
24 changes: 24 additions & 0 deletions pkg/sql/sem/builtins/replication_builtins.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/builtins/builtinconstants"
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
Expand Down Expand Up @@ -184,6 +185,29 @@ var replicationBuiltins = map[string]builtinDefinition{
"notify that the replication is still ongoing.",
Volatility: volatility.Volatile,
},
tree.Overload{
Types: tree.ArgTypes{
{"tenant_name", types.String},
},
ReturnType: tree.FixedReturnType(types.Int),
Fn: func(ctx context.Context, evalCtx *eval.Context, args tree.Datums) (tree.Datum, error) {
mgr, err := evalCtx.StreamManagerFactory.GetReplicationStreamManager(ctx)
if err != nil {
return nil, err
}
tenantName := tree.MustBeDString(args[0])
jobID, err := mgr.StartReplicationStreamByName(ctx, roachpb.TenantName(tenantName))
if err != nil {
return nil, err
}
return tree.NewDInt(tree.DInt(jobID)), err
},
Info: "This function can be used on the producer side to start a replication stream for " +
"the specified tenant. The returned stream ID uniquely identifies created stream. " +
"The caller must periodically invoke crdb_internal.heartbeat_stream() function to " +
"notify that the replication is still ongoing.",
Volatility: volatility.Volatile,
},
),

"crdb_internal.replication_stream_progress": makeBuiltin(
Expand Down
Loading

0 comments on commit a4053d7

Please sign in to comment.