Merge #137089
137089: crosscluster/logical: create new tables if specified r=dt a=msbutler

This implements the new CREATE LOGICALLY REPLICATED TABLES syntax, which
creates the destination tables during LDR planning. In this commit, the
tables are created online and the LDR initial scan proceeds as it always
has. In a future commit, these tables will be created in an offline state,
enabling a fast initial scan via AddSSTable ingestion.
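
For illustration, a minimal sketch of the new statement, mirroring the
TestCreateTables test added in this commit (the connection URI is a
placeholder):

    CREATE LOGICALLY REPLICATED TABLE b.tab FROM TABLE tab ON 'postgresql://...';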

Epic: none

Release note: none

Co-authored-by: Michael Butler <[email protected]>
craig[bot] and msbutler committed Dec 12, 2024
2 parents 6cb15dc + f05cedf commit 4f464d5
Showing 7 changed files with 198 additions and 73 deletions.
3 changes: 3 additions & 0 deletions pkg/crosscluster/logical/BUILD.bazel
@@ -35,6 +35,7 @@ go_library(
"//pkg/kv/kvpb",
"//pkg/repstream/streampb",
"//pkg/roachpb",
"//pkg/security/username",
"//pkg/server/telemetry",
"//pkg/settings",
"//pkg/settings/cluster",
@@ -43,6 +44,8 @@
"//pkg/sql/catalog/colinfo",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/descs",
"//pkg/sql/catalog/externalcatalog",
"//pkg/sql/catalog/externalcatalog/externalpb",
"//pkg/sql/catalog/lease",
"//pkg/sql/catalog/resolver",
"//pkg/sql/catalog/tabledesc",
218 changes: 152 additions & 66 deletions pkg/crosscluster/logical/create_logical_replication_stmt.go
@@ -18,11 +18,15 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/externalcatalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/externalcatalog/externalpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/resolver"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/typedesc"
@@ -131,46 +135,11 @@ func createLogicalReplicationStreamPlanHook(
return pgerror.Newf(pgcode.InvalidParameterValue, "unknown discard option %q", m)
}
}

var (
targetsDescription string
srcTableNames = make([]string, len(stmt.From.Tables))
repPairs = make([]jobspb.LogicalReplicationDetails_ReplicationPair, len(stmt.Into.Tables))
srcTableDescs = make([]*descpb.TableDescriptor, len(stmt.Into.Tables))
)
for i := range stmt.From.Tables {

dstObjName, err := stmt.Into.Tables[i].ToUnresolvedObjectName(tree.NoAnnotation)
if err != nil {
return err
}
dstTableName := dstObjName.ToTableName()
prefix, td, err := resolver.ResolveMutableExistingTableObject(ctx, p, &dstTableName, true, tree.ResolveRequireTableDesc)
if err != nil {
return err
}
repPairs[i].DstDescriptorID = int32(td.GetID())

tbNameWithSchema := tree.MakeTableNameWithSchema(
tree.Name(prefix.Database.GetName()),
tree.Name(prefix.Schema.GetName()),
tree.Name(td.GetName()),
)

srcTableNames[i] = stmt.From.Tables[i].String()

if i == 0 {
targetsDescription = tbNameWithSchema.FQString()
} else {
targetsDescription += ", " + tbNameWithSchema.FQString()
}

if mode != jobspb.LogicalReplicationDetails_Validated {
if len(td.OutboundForeignKeys()) > 0 || len(td.InboundForeignKeys()) > 0 {
return pgerror.Newf(pgcode.InvalidParameterValue, "foreign keys are only supported with MODE = 'validated'")
}
}
resolvedDestObjects, err := resolveDestinationObjects(ctx, p, stmt.Into, stmt.CreateTable)
if err != nil {
return err
}

if !p.ExtendedEvalContext().TxnIsSingleStmt {
return errors.New("cannot CREATE LOGICAL REPLICATION STREAM in a multi-statement transaction")
}
@@ -210,11 +179,14 @@ func createLogicalReplicationStreamPlanHook(
defer func() {
_ = client.Close(ctx)
}()

if err := client.Dial(ctx); err != nil {
return err
}

srcTableNames := make([]string, len(stmt.From.Tables))
for i, tb := range stmt.From.Tables {
srcTableNames[i] = tb.String()
}
spec, err := client.CreateForTables(ctx, &streampb.ReplicationProducerRequest{
TableNames: srcTableNames,
})
@@ -233,39 +205,46 @@
}
crossClusterResolver := crosscluster.MakeCrossClusterTypeResolver(sourceTypes)

replicationStartTime := spec.ReplicationStartTime
progress := jobspb.LogicalReplicationProgress{}
if cursor, ok := options.GetCursor(); ok {
replicationStartTime = cursor
progress.ReplicatedTime = cursor
}

// If the user asked to ignore "ttl-deletes", make sure that at least one of
// the source tables actually has a TTL job which sets the omit bit that
// is used for filtering; if not, they probably forgot that step.
throwNoTTLWithCDCIgnoreError := discard == jobspb.LogicalReplicationDetails_DiscardCDCIgnoredTTLDeletes

// TODO: consider moving repPair construction into doLDRPlan after dest tables are created.
repPairs := make([]jobspb.LogicalReplicationDetails_ReplicationPair, len(spec.ExternalCatalog.Tables))
for i, td := range spec.ExternalCatalog.Tables {
cpy := tabledesc.NewBuilder(&td).BuildCreatedMutableTable()
if err := typedesc.HydrateTypesInDescriptor(ctx, cpy, crossClusterResolver); err != nil {
return err
}
srcTableDescs[i] = cpy.TableDesc()
// TODO: I don't like this at all. This could be fixed if repPairs were
// populated in doLDRPlan.
spec.ExternalCatalog.Tables[i] = *cpy.TableDesc()

if !stmt.CreateTable {
repPairs[i].DstDescriptorID = int32(resolvedDestObjects.TableIDs[i])
}
repPairs[i].SrcDescriptorID = int32(td.ID)
if td.RowLevelTTL != nil && td.RowLevelTTL.DisableChangefeedReplication {
throwNoTTLWithCDCIgnoreError = false
}
}

if throwNoTTLWithCDCIgnoreError {
return pgerror.Newf(pgcode.InvalidParameterValue, "DISCARD = 'ttl-deletes' specified but no tables have changefeed-excluded TTLs")
}

replicationStartTime := spec.ReplicationStartTime
progress := jobspb.LogicalReplicationProgress{}
if cursor, ok := options.GetCursor(); ok {
replicationStartTime = cursor
progress.ReplicatedTime = cursor
}

if uf, ok := options.GetUserFunctions(); ok {
for i, name := range srcTableNames {
repPairs[i].DstFunctionID = uf[name]
}
}
if throwNoTTLWithCDCIgnoreError {
return pgerror.Newf(pgcode.InvalidParameterValue, "DISCARD = 'ttl-deletes' specified but no tables have changefeed-excluded TTLs")
}

// The default conflict resolution, if not set, is LWW.
defaultConflictResolution := jobspb.LogicalReplicationDetails_DefaultConflictResolution{
ConflictResolutionType: jobspb.LogicalReplicationDetails_DefaultConflictResolution_LWW,
@@ -276,23 +255,24 @@

jr := jobs.Record{
JobID: p.ExecCfg().JobRegistry.MakeJobID(),
Description: fmt.Sprintf("LOGICAL REPLICATION STREAM into %s from %s", targetsDescription, cleanedURI),
Description: fmt.Sprintf("LOGICAL REPLICATION STREAM into %s from %s", resolvedDestObjects.TargetDescription(), cleanedURI),
Username: p.User(),
Details: jobspb.LogicalReplicationDetails{
StreamID: uint64(spec.StreamID),
SourceClusterID: spec.SourceClusterID,
ReplicationStartTime: replicationStartTime,
SourceClusterConnStr: string(streamAddress),
ReplicationPairs: repPairs,
SourceClusterConnStr: string(streamAddress),
TableNames: srcTableNames,
DefaultConflictResolution: defaultConflictResolution,
Discard: discard,
Mode: mode,
MetricsLabel: options.metricsLabel,
CreateTable: stmt.CreateTable,
},
Progress: progress,
}
if err := doLDRPlan(ctx, p.ExecCfg().InternalDB, p.ExecCfg().JobRegistry, jr, srcTableDescs, options.SkipSchemaCheck()); err != nil {
if err := doLDRPlan(ctx, p.User(), p.ExecCfg(), jr, spec.ExternalCatalog, resolvedDestObjects, options.SkipSchemaCheck()); err != nil {
return err
}
resultsCh <- tree.Datums{tree.NewDInt(tree.DInt(jr.JobID))}
@@ -302,40 +282,146 @@
return fn, streamCreationHeader, nil, false, nil
}

type ResolvedDestObjects struct {
TableIDs []catid.DescID
TableNames []tree.TableName

// For CreateTable case
ParentSchemaID catid.DescID
ParentDatabaseID catid.DescID
}

func (r *ResolvedDestObjects) TargetDescription() string {
var targetDescription string
for i := range r.TableNames {
if i == 0 {
targetDescription = r.TableNames[i].FQString()
} else {
targetDescription += ", " + r.TableNames[i].FQString()
}
}
return targetDescription
}

func resolveDestinationObjects(
ctx context.Context,
p sql.PlanHookState,
destResources tree.LogicalReplicationResources,
createTable bool,
) (ResolvedDestObjects, error) {
var resolved ResolvedDestObjects
for i := range destResources.Tables {
dstObjName, err := destResources.Tables[i].ToUnresolvedObjectName(tree.NoAnnotation)
if err != nil {
return resolved, err
}

if createTable {
_, _, resPrefix, err := resolver.ResolveTarget(ctx,
&dstObjName, p, p.SessionData().Database, p.SessionData().SearchPath)
if err != nil {
return resolved, errors.Wrapf(err, "resolving target table name")
}

if resolved.ParentDatabaseID == 0 {
resolved.ParentDatabaseID = resPrefix.Database.GetID()
resolved.ParentSchemaID = resPrefix.Schema.GetID()
} else if resolved.ParentDatabaseID != resPrefix.Database.GetID() {
return resolved, errors.Newf("destination tables must all be in the same database")
} else if resolved.ParentSchemaID != resPrefix.Schema.GetID() {
return resolved, errors.Newf("destination tables must all be in the same schema")
}

tbNameWithSchema := tree.MakeTableNameWithSchema(
tree.Name(resPrefix.Database.GetName()),
tree.Name(resPrefix.Schema.GetName()),
tree.Name(dstObjName.Object()),
)
resolved.TableNames = append(resolved.TableNames, tbNameWithSchema)
} else {
dstTableName := dstObjName.ToTableName()
prefix, td, err := resolver.ResolveMutableExistingTableObject(ctx, p, &dstTableName, true, tree.ResolveRequireTableDesc)
if err != nil {
return resolved, err
}

tbNameWithSchema := tree.MakeTableNameWithSchema(
tree.Name(prefix.Database.GetName()),
tree.Name(prefix.Schema.GetName()),
tree.Name(td.GetName()),
)
resolved.TableNames = append(resolved.TableNames, tbNameWithSchema)
resolved.TableIDs = append(resolved.TableIDs, td.GetID())
}
}
return resolved, nil
}

func doLDRPlan(
ctx context.Context,
internalDB *sql.InternalDB,
jobRegistry *jobs.Registry,
user username.SQLUsername,
execCfg *sql.ExecutorConfig,
jr jobs.Record,
srcTableDescs []*descpb.TableDescriptor,
srcExternalCatalog externalpb.ExternalCatalog,
resolvedDestObjects ResolvedDestObjects,
skipSchemaCheck bool,
) error {
details := jr.Details.(jobspb.LogicalReplicationDetails)
return internalDB.DescsTxn(ctx, func(ctx context.Context, txn descs.Txn) error {
return execCfg.InternalDB.DescsTxn(ctx, func(ctx context.Context, txn descs.Txn) error {
var (
err error
writtenDescs []catalog.Descriptor
)
if details.CreateTable {
writtenDescs, err = externalcatalog.IngestExternalCatalog(ctx, execCfg, user, srcExternalCatalog, txn, txn.Descriptors(), resolvedDestObjects.ParentDatabaseID, resolvedDestObjects.ParentSchemaID, false)
if err != nil {
return err
}
}

dstTableDescs := make([]*tabledesc.Mutable, 0, len(details.ReplicationPairs))
for _, pair := range details.ReplicationPairs {
dstTableDesc, err := txn.Descriptors().MutableByID(txn.KV()).Table(ctx, catid.DescID(pair.DstDescriptorID))
for i := range details.ReplicationPairs {
if details.CreateTable {
// TODO: it's quite messy to set RepPairs for the CreateTable case here,
// but we need to because we only get the table IDs after ingesting the
// external catalog. It's also good to ingest the external catalog in the
// same txn as validation so it's automatically rolled back if there's an
// error during validation.
//
// Instead, we could populate repPairs in this txn.
details.ReplicationPairs[i].DstDescriptorID = int32(writtenDescs[i].GetID())
}
dstTableDesc, err := txn.Descriptors().MutableByID(txn.KV()).Table(ctx, catid.DescID(details.ReplicationPairs[i].DstDescriptorID))
if err != nil {
return err
}
dstTableDescs = append(dstTableDescs, dstTableDesc)
}

if buildutil.CrdbTestBuild {
if len(srcTableDescs) != len(dstTableDescs) {
if len(srcExternalCatalog.Tables) != len(dstTableDescs) {
return errors.AssertionFailedf("srcExternalCatalog.Tables and dstTableDescs should have the same length")
}
}
for i := range srcTableDescs {
err := tabledesc.CheckLogicalReplicationCompatibility(srcTableDescs[i], dstTableDescs[i].TableDesc(), skipSchemaCheck)
for i := range srcExternalCatalog.Tables {
destTableDesc := dstTableDescs[i]
if details.Mode != jobspb.LogicalReplicationDetails_Validated {
if len(destTableDesc.OutboundForeignKeys()) > 0 || len(destTableDesc.InboundForeignKeys()) > 0 {
return pgerror.Newf(pgcode.InvalidParameterValue, "foreign keys are only supported with MODE = 'validated'")
}
}

err := tabledesc.CheckLogicalReplicationCompatibility(&srcExternalCatalog.Tables[i], destTableDesc.TableDesc(), skipSchemaCheck || details.CreateTable)
if err != nil {
return err
}
}

if err := replicationutils.LockLDRTables(ctx, txn, dstTableDescs, jr.JobID); err != nil {
return err
}
if _, err := jobRegistry.CreateAdoptableJobWithTxn(ctx, jr, jr.JobID, txn); err != nil {
if _, err := execCfg.JobRegistry.CreateAdoptableJobWithTxn(ctx, jr, jr.JobID, txn); err != nil {
return err
}
return nil
25 changes: 25 additions & 0 deletions pkg/crosscluster/logical/logical_replication_job_test.go
@@ -376,6 +376,31 @@ func TestLogicalStreamIngestionJobWithCursor(t *testing.T) {
dbB.CheckQueryResults(t, "SELECT * from b.tab", expectedRowsB)
}

func TestCreateTables(t *testing.T) {
defer leaktest.AfterTest(t)()
skip.UnderDeadlock(t)
defer log.Scope(t).Close(t)

ctx := context.Background()

tc, srv, sqlDBs, _ := setupServerWithNumDBs(t, ctx, testClusterBaseClusterArgs, 1, 1)
defer tc.Stopper().Stop(ctx)

sqlA := sqlDBs[0]
sqlA.Exec(t, "INSERT INTO tab VALUES (1, 'hello')")
aURL, cleanup := srv.PGUrl(t, serverutils.DBName("a"))
defer cleanup()

sqlA.Exec(t, "CREATE DATABASE b")
sqlB := sqlutils.MakeSQLRunner(srv.SQLConn(t, serverutils.DBName("b")))

var jobID jobspb.JobID
sqlB.QueryRow(t, "CREATE LOGICALLY REPLICATED TABLE b.tab FROM TABLE tab ON $1", aURL.String()).Scan(&jobID)

WaitUntilReplicatedTime(t, srv.Clock().Now(), sqlB, jobID)
sqlB.CheckQueryResults(t, "SELECT * FROM tab", [][]string{{"1", "hello"}})
}

// TestLogicalStreamIngestionAdvancePTS tests that the producer side pts advances
// as the destination side frontier advances.
func TestLogicalStreamIngestionAdvancePTS(t *testing.T) {
2 changes: 2 additions & 0 deletions pkg/jobs/jobspb/jobs.proto
@@ -273,6 +273,8 @@ message LogicalReplicationDetails {

Discard discard = 11;

bool create_table = 12;

// Next ID: 13.
}

1 change: 1 addition & 0 deletions pkg/sql/catalog/externalcatalog/BUILD.bazel
@@ -36,6 +36,7 @@ go_test(
"//pkg/security/username",
"//pkg/server",
"//pkg/sql",
"//pkg/sql/catalog",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/descs",
"//pkg/sql/catalog/externalcatalog/externalpb",