crosscluster/logical: create new tables if specified #137089

Merged · 2 commits · Dec 12, 2024
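This change teaches logical replication to create the destination tables from the source cluster's catalog when requested, instead of requiring them to exist up front (see the new `CreateTable` flag, `resolveDestinationObjects`, and the `IngestExternalCatalog` call in the diff below). A minimal client-side usage sketch based on the syntax exercised in `TestCreateTables`; the connection strings, database names, and driver choice here are placeholders, not part of this patch:

```go
package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/lib/pq" // any pgwire driver works; CockroachDB speaks the Postgres wire protocol
)

func main() {
	// Placeholder connection strings for a destination and a source cluster.
	dest, err := sql.Open("postgres", "postgresql://root@destination:26257/b?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer dest.Close()

	srcURI := "postgresql://root@source:26257/a?sslmode=disable"

	// With this patch the destination table b.tab is created from the
	// source's catalog; previously it had to exist before starting the stream.
	var jobID int64
	if err := dest.QueryRow(
		"CREATE LOGICALLY REPLICATED TABLE b.tab FROM TABLE tab ON $1",
		srcURI,
	).Scan(&jobID); err != nil {
		log.Fatal(err)
	}
	fmt.Println("replication job:", jobID)
}
```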
3 changes: 3 additions & 0 deletions pkg/crosscluster/logical/BUILD.bazel
@@ -35,6 +35,7 @@ go_library(
"//pkg/kv/kvpb",
"//pkg/repstream/streampb",
"//pkg/roachpb",
"//pkg/security/username",
"//pkg/server/telemetry",
"//pkg/settings",
"//pkg/settings/cluster",
@@ -43,6 +44,8 @@ go_library(
"//pkg/sql/catalog/colinfo",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/descs",
"//pkg/sql/catalog/externalcatalog",
"//pkg/sql/catalog/externalcatalog/externalpb",
"//pkg/sql/catalog/lease",
"//pkg/sql/catalog/resolver",
"//pkg/sql/catalog/tabledesc",
218 changes: 152 additions & 66 deletions pkg/crosscluster/logical/create_logical_replication_stmt.go
@@ -18,11 +18,15 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/externalcatalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/externalcatalog/externalpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/resolver"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/typedesc"
@@ -131,46 +135,11 @@ func createLogicalReplicationStreamPlanHook(
return pgerror.Newf(pgcode.InvalidParameterValue, "unknown discard option %q", m)
}
}

var (
targetsDescription string
srcTableNames = make([]string, len(stmt.From.Tables))
repPairs = make([]jobspb.LogicalReplicationDetails_ReplicationPair, len(stmt.Into.Tables))
srcTableDescs = make([]*descpb.TableDescriptor, len(stmt.Into.Tables))
)
for i := range stmt.From.Tables {

dstObjName, err := stmt.Into.Tables[i].ToUnresolvedObjectName(tree.NoAnnotation)
if err != nil {
return err
}
dstTableName := dstObjName.ToTableName()
prefix, td, err := resolver.ResolveMutableExistingTableObject(ctx, p, &dstTableName, true, tree.ResolveRequireTableDesc)
if err != nil {
return err
}
repPairs[i].DstDescriptorID = int32(td.GetID())

tbNameWithSchema := tree.MakeTableNameWithSchema(
tree.Name(prefix.Database.GetName()),
tree.Name(prefix.Schema.GetName()),
tree.Name(td.GetName()),
)

srcTableNames[i] = stmt.From.Tables[i].String()

if i == 0 {
targetsDescription = tbNameWithSchema.FQString()
} else {
targetsDescription += ", " + tbNameWithSchema.FQString()
}

if mode != jobspb.LogicalReplicationDetails_Validated {
if len(td.OutboundForeignKeys()) > 0 || len(td.InboundForeignKeys()) > 0 {
return pgerror.Newf(pgcode.InvalidParameterValue, "foreign keys are only supported with MODE = 'validated'")
}
}
resolvedDestObjects, err := resolveDestinationObjects(ctx, p, stmt.Into, stmt.CreateTable)
if err != nil {
return err
}

if !p.ExtendedEvalContext().TxnIsSingleStmt {
return errors.New("cannot CREATE LOGICAL REPLICATION STREAM in a multi-statement transaction")
}
@@ -210,11 +179,14 @@ func createLogicalReplicationStreamPlanHook(
defer func() {
_ = client.Close(ctx)
}()

if err := client.Dial(ctx); err != nil {
return err
}

srcTableNames := make([]string, len(stmt.From.Tables))
for i, tb := range stmt.From.Tables {
srcTableNames[i] = tb.String()
}
spec, err := client.CreateForTables(ctx, &streampb.ReplicationProducerRequest{
TableNames: srcTableNames,
})
@@ -233,39 +205,46 @@
}
crossClusterResolver := crosscluster.MakeCrossClusterTypeResolver(sourceTypes)

replicationStartTime := spec.ReplicationStartTime
progress := jobspb.LogicalReplicationProgress{}
if cursor, ok := options.GetCursor(); ok {
replicationStartTime = cursor
progress.ReplicatedTime = cursor
}

// If the user asked to ignore "ttl-deletes", make sure that at least one of
// the source tables actually has a TTL job which sets the omit bit that
// is used for filtering; if not, they probably forgot that step.
throwNoTTLWithCDCIgnoreError := discard == jobspb.LogicalReplicationDetails_DiscardCDCIgnoredTTLDeletes

// TODO: consider moving repPair construction into doLDRPlan after dest tables are created.
Review comment from the author: "@dt before i do this additional refactor, i'm curious what you think of the patch as it stands now."

repPairs := make([]jobspb.LogicalReplicationDetails_ReplicationPair, len(spec.ExternalCatalog.Tables))
for i, td := range spec.ExternalCatalog.Tables {
cpy := tabledesc.NewBuilder(&td).BuildCreatedMutableTable()
if err := typedesc.HydrateTypesInDescriptor(ctx, cpy, crossClusterResolver); err != nil {
return err
}
srcTableDescs[i] = cpy.TableDesc()
// TODO: i don't like this at all. this could be fixed if repPairs were
// populated in doLDRPlan.
spec.ExternalCatalog.Tables[i] = *cpy.TableDesc()

if !stmt.CreateTable {
repPairs[i].DstDescriptorID = int32(resolvedDestObjects.TableIDs[i])
}
repPairs[i].SrcDescriptorID = int32(td.ID)
if td.RowLevelTTL != nil && td.RowLevelTTL.DisableChangefeedReplication {
throwNoTTLWithCDCIgnoreError = false
}
}

if throwNoTTLWithCDCIgnoreError {
return pgerror.Newf(pgcode.InvalidParameterValue, "DISCARD = 'ttl-deletes' specified but no tables have changefeed-excluded TTLs")
}

replicationStartTime := spec.ReplicationStartTime
progress := jobspb.LogicalReplicationProgress{}
if cursor, ok := options.GetCursor(); ok {
replicationStartTime = cursor
progress.ReplicatedTime = cursor
}

if uf, ok := options.GetUserFunctions(); ok {
for i, name := range srcTableNames {
repPairs[i].DstFunctionID = uf[name]
}
}
if throwNoTTLWithCDCIgnoreError {
return pgerror.Newf(pgcode.InvalidParameterValue, "DISCARD = 'ttl-deletes' specified but no tables have changefeed-excluded TTLs")
}

// Default conflict resolution if not set will be LWW
defaultConflictResolution := jobspb.LogicalReplicationDetails_DefaultConflictResolution{
ConflictResolutionType: jobspb.LogicalReplicationDetails_DefaultConflictResolution_LWW,
Expand All @@ -276,23 +255,24 @@ func createLogicalReplicationStreamPlanHook(

jr := jobs.Record{
JobID: p.ExecCfg().JobRegistry.MakeJobID(),
Description: fmt.Sprintf("LOGICAL REPLICATION STREAM into %s from %s", targetsDescription, cleanedURI),
Description: fmt.Sprintf("LOGICAL REPLICATION STREAM into %s from %s", resolvedDestObjects.TargetDescription(), cleanedURI),
Username: p.User(),
Details: jobspb.LogicalReplicationDetails{
StreamID: uint64(spec.StreamID),
SourceClusterID: spec.SourceClusterID,
ReplicationStartTime: replicationStartTime,
SourceClusterConnStr: string(streamAddress),
ReplicationPairs: repPairs,
SourceClusterConnStr: string(streamAddress),
TableNames: srcTableNames,
DefaultConflictResolution: defaultConflictResolution,
Discard: discard,
Mode: mode,
MetricsLabel: options.metricsLabel,
CreateTable: stmt.CreateTable,
},
Progress: progress,
}
if err := doLDRPlan(ctx, p.ExecCfg().InternalDB, p.ExecCfg().JobRegistry, jr, srcTableDescs, options.SkipSchemaCheck()); err != nil {
if err := doLDRPlan(ctx, p.User(), p.ExecCfg(), jr, spec.ExternalCatalog, resolvedDestObjects, options.SkipSchemaCheck()); err != nil {
return err
}
resultsCh <- tree.Datums{tree.NewDInt(tree.DInt(jr.JobID))}
@@ -302,40 +282,146 @@
return fn, streamCreationHeader, nil, false, nil
}

type ResolvedDestObjects struct {
TableIDs []catid.DescID
TableNames []tree.TableName

// For CreateTable case
ParentSchemaID catid.DescID
ParentDatabaseID catid.DescID
}

func (r *ResolvedDestObjects) TargetDescription() string {
var targetDescription string
for i := range r.TableNames {
if i == 0 {
targetDescription = r.TableNames[i].FQString()
} else {
targetDescription += ", " + r.TableNames[i].FQString()
}
}
return targetDescription
}

func resolveDestinationObjects(
ctx context.Context,
p sql.PlanHookState,
destResources tree.LogicalReplicationResources,
createTable bool,
) (ResolvedDestObjects, error) {
var resolved ResolvedDestObjects
for i := range destResources.Tables {
dstObjName, err := destResources.Tables[i].ToUnresolvedObjectName(tree.NoAnnotation)
if err != nil {
return resolved, err
}
dstObjName.HasExplicitSchema()

if createTable {
_, _, resPrefix, err := resolver.ResolveTarget(ctx,
&dstObjName, p, p.SessionData().Database, p.SessionData().SearchPath)
if err != nil {
return resolved, errors.Newf("resolving target import name")
}

if resolved.ParentDatabaseID == 0 {
resolved.ParentDatabaseID = resPrefix.Database.GetID()
resolved.ParentSchemaID = resPrefix.Schema.GetID()
} else if resolved.ParentDatabaseID != resPrefix.Database.GetID() {
return resolved, errors.Newf("destination tables must all be in the same database")
} else if resolved.ParentSchemaID != resPrefix.Schema.GetID() {
return resolved, errors.Newf("destination tables must all be in the same schema")
}

tbNameWithSchema := tree.MakeTableNameWithSchema(
tree.Name(resPrefix.Database.GetName()),
tree.Name(resPrefix.Schema.GetName()),
tree.Name(dstObjName.Object()),
)
resolved.TableNames = append(resolved.TableNames, tbNameWithSchema)
} else {
dstTableName := dstObjName.ToTableName()
prefix, td, err := resolver.ResolveMutableExistingTableObject(ctx, p, &dstTableName, true, tree.ResolveRequireTableDesc)
if err != nil {
return resolved, err
}

tbNameWithSchema := tree.MakeTableNameWithSchema(
tree.Name(prefix.Database.GetName()),
tree.Name(prefix.Schema.GetName()),
tree.Name(td.GetName()),
)
resolved.TableNames = append(resolved.TableNames, tbNameWithSchema)
resolved.TableIDs = append(resolved.TableIDs, td.GetID())
}
}
return resolved, nil
}
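`resolveDestinationObjects` takes one of two paths per destination table: with `CreateTable` it resolves only the target prefix, requires every table to land in one database and schema, and defers table IDs until the external catalog is ingested; without it, each table must already exist and its descriptor ID is recorded immediately. A standalone sketch of that control flow, using illustrative stand-in types rather than the real catalog resolver APIs:

```go
package main

import (
	"errors"
	"fmt"
)

// Illustrative stand-ins for the resolved-prefix and descriptor types; the
// real implementation uses CockroachDB's catalog resolver.
type target struct {
	name  string
	dbID  int64
	schID int64
	tblID int64 // zero when the destination table does not exist yet
}

type resolvedDest struct {
	tableIDs   []int64
	tableNames []string
	parentDB   int64
	parentSch  int64
}

// resolveDests mirrors the two paths above: with createTable, only the
// shared parent database/schema is pinned down and table IDs are assigned
// later when the external catalog is ingested; otherwise every destination
// table must already exist and its ID is recorded now.
func resolveDests(targets []target, createTable bool) (resolvedDest, error) {
	var r resolvedDest
	for _, t := range targets {
		if createTable {
			switch {
			case r.parentDB == 0:
				r.parentDB, r.parentSch = t.dbID, t.schID
			case r.parentDB != t.dbID:
				return r, errors.New("destination tables must all be in the same database")
			case r.parentSch != t.schID:
				return r, errors.New("destination tables must all be in the same schema")
			}
			r.tableNames = append(r.tableNames, t.name)
		} else {
			if t.tblID == 0 {
				return r, fmt.Errorf("table %q does not exist", t.name)
			}
			r.tableNames = append(r.tableNames, t.name)
			r.tableIDs = append(r.tableIDs, t.tblID)
		}
	}
	return r, nil
}

func main() {
	r, err := resolveDests([]target{
		{name: "b.public.tab", dbID: 104, schID: 105},
		{name: "b.public.tab2", dbID: 104, schID: 106}, // different schema: rejected
	}, true)
	fmt.Println(r, err)
}
```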

func doLDRPlan(
ctx context.Context,
internalDB *sql.InternalDB,
jobRegistry *jobs.Registry,
user username.SQLUsername,
execCfg *sql.ExecutorConfig,
jr jobs.Record,
srcTableDescs []*descpb.TableDescriptor,
srcExternalCatalog externalpb.ExternalCatalog,
resolvedDestObjects ResolvedDestObjects,
skipSchemaCheck bool,
) error {
details := jr.Details.(jobspb.LogicalReplicationDetails)
return internalDB.DescsTxn(ctx, func(ctx context.Context, txn descs.Txn) error {
return execCfg.InternalDB.DescsTxn(ctx, func(ctx context.Context, txn descs.Txn) error {
var (
err error
writtenDescs []catalog.Descriptor
)
if details.CreateTable {
writtenDescs, err = externalcatalog.IngestExternalCatalog(ctx, execCfg, user, srcExternalCatalog, txn, txn.Descriptors(), resolvedDestObjects.ParentDatabaseID, resolvedDestObjects.ParentSchemaID, false)
if err != nil {
return err
}
}

dstTableDescs := make([]*tabledesc.Mutable, 0, len(details.ReplicationPairs))
for _, pair := range details.ReplicationPairs {
dstTableDesc, err := txn.Descriptors().MutableByID(txn.KV()).Table(ctx, catid.DescID(pair.DstDescriptorID))
for i := range details.ReplicationPairs {
if details.CreateTable {
// TODO: it's quite messy to set RepPairs for the CreateTable case here,
// but we need to because we only get the table IDs after ingesting the
// external catalog. It's also good to ingest the external catalog in
// same txn as validation so it's automatically rolled back if there's an
// error during validation.
//
// Instead, we could populate repPairs in this txn.
details.ReplicationPairs[i].DstDescriptorID = int32(writtenDescs[i].GetID())
}
dstTableDesc, err := txn.Descriptors().MutableByID(txn.KV()).Table(ctx, catid.DescID(details.ReplicationPairs[i].DstDescriptorID))
if err != nil {
return err
}
dstTableDescs = append(dstTableDescs, dstTableDesc)
}

if buildutil.CrdbTestBuild {
if len(srcTableDescs) != len(dstTableDescs) {
if len(srcExternalCatalog.Tables) != len(dstTableDescs) {
return errors.AssertionFailedf("srcExternalCatalog.Tables and dstTableDescs should have the same length")
}
}
for i := range srcTableDescs {
err := tabledesc.CheckLogicalReplicationCompatibility(srcTableDescs[i], dstTableDescs[i].TableDesc(), skipSchemaCheck)
for i := range srcExternalCatalog.Tables {
destTableDesc := dstTableDescs[i]
if details.Mode != jobspb.LogicalReplicationDetails_Validated {
if len(destTableDesc.OutboundForeignKeys()) > 0 || len(destTableDesc.InboundForeignKeys()) > 0 {
return pgerror.Newf(pgcode.InvalidParameterValue, "foreign keys are only supported with MODE = 'validated'")
}
}

err := tabledesc.CheckLogicalReplicationCompatibility(&srcExternalCatalog.Tables[i], destTableDesc.TableDesc(), skipSchemaCheck || details.CreateTable)
if err != nil {
return err
}
}

if err := replicationutils.LockLDRTables(ctx, txn, dstTableDescs, jr.JobID); err != nil {
return err
}
if _, err := jobRegistry.CreateAdoptableJobWithTxn(ctx, jr, jr.JobID, txn); err != nil {
if _, err := execCfg.JobRegistry.CreateAdoptableJobWithTxn(ctx, jr, jr.JobID, txn); err != nil {
return err
}
return nil
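Per the TODO above, `doLDRPlan` fills in destination descriptor IDs and validates schema compatibility inside the same `DescsTxn` that ingests the external catalog, so a validation failure rolls the newly created tables back along with the job record. A schematic sketch of that commit-or-rollback shape, using toy types rather than the real transaction API:

```go
package main

import (
	"errors"
	"fmt"
)

// Toy transaction that either keeps or discards everything it created,
// standing in for the DescsTxn used by doLDRPlan.
type txn struct{ created []string }

func (t *txn) createTable(name string) { t.created = append(t.created, name) }

// runTxn commits the body's work only if it returns nil; on error the
// created tables are discarded, mimicking transactional rollback.
func runTxn(body func(*txn) error) ([]string, error) {
	t := &txn{}
	if err := body(t); err != nil {
		return nil, err
	}
	return t.created, nil
}

// plan ingests the source tables and then validates them inside one
// transaction, the same shape as doLDRPlan above.
func plan(createTable bool, srcTables []string, validate func(string) error) ([]string, error) {
	return runTxn(func(t *txn) error {
		if createTable {
			for _, name := range srcTables {
				t.createTable(name) // ingest the external catalog
			}
		}
		for _, name := range srcTables {
			if err := validate(name); err != nil {
				return err // rolls back the ingested tables as well
			}
		}
		return nil
	})
}

func main() {
	_, err := plan(true, []string{"tab"}, func(string) error {
		return errors.New("schema mismatch")
	})
	fmt.Println("created tables rolled back, err:", err)
}
```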
25 changes: 25 additions & 0 deletions pkg/crosscluster/logical/logical_replication_job_test.go
@@ -376,6 +376,31 @@ func TestLogicalStreamIngestionJobWithCursor(t *testing.T) {
dbB.CheckQueryResults(t, "SELECT * from b.tab", expectedRowsB)
}

func TestCreateTables(t *testing.T) {
defer leaktest.AfterTest(t)()
skip.UnderDeadlock(t)
defer log.Scope(t).Close(t)

ctx := context.Background()

tc, srv, sqlDBs, _ := setupServerWithNumDBs(t, ctx, testClusterBaseClusterArgs, 1, 1)
defer tc.Stopper().Stop(ctx)

sqlA := sqlDBs[0]
sqlA.Exec(t, "INSERT INTO tab VALUES (1, 'hello')")
aURL, cleanup := srv.PGUrl(t, serverutils.DBName("a"))
defer cleanup()

sqlA.Exec(t, "CREATE DATABASE b")
sqlB := sqlutils.MakeSQLRunner(srv.SQLConn(t, serverutils.DBName("b")))

var jobID jobspb.JobID
sqlB.QueryRow(t, "CREATE LOGICALLY REPLICATED TABLE b.tab FROM TABLE tab ON $1", aURL.String()).Scan(&jobID)

WaitUntilReplicatedTime(t, srv.Clock().Now(), sqlB, jobID)
sqlB.CheckQueryResults(t, "SELECT * FROM tab", [][]string{{"1", "hello"}})
}

// TestLogicalStreamIngestionAdvancePTS tests that the producer side pts advances
// as the destination side frontier advances.
func TestLogicalStreamIngestionAdvancePTS(t *testing.T) {
2 changes: 2 additions & 0 deletions pkg/jobs/jobspb/jobs.proto
@@ -273,6 +273,8 @@ message LogicalReplicationDetails {

Discard discard = 11;

bool create_table = 12;

// Next ID: 13.
}

1 change: 1 addition & 0 deletions pkg/sql/catalog/externalcatalog/BUILD.bazel
@@ -36,6 +36,7 @@ go_test(
"//pkg/security/username",
"//pkg/server",
"//pkg/sql",
"//pkg/sql/catalog",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/descs",
"//pkg/sql/catalog/externalcatalog/externalpb",