diff --git a/pkg/crosscluster/logical/BUILD.bazel b/pkg/crosscluster/logical/BUILD.bazel
index 28bd1e1ed12a..732832a9946b 100644
--- a/pkg/crosscluster/logical/BUILD.bazel
+++ b/pkg/crosscluster/logical/BUILD.bazel
@@ -35,6 +35,7 @@ go_library(
         "//pkg/kv/kvpb",
         "//pkg/repstream/streampb",
         "//pkg/roachpb",
+        "//pkg/security/username",
         "//pkg/server/telemetry",
         "//pkg/settings",
         "//pkg/settings/cluster",
@@ -43,6 +44,8 @@ go_library(
         "//pkg/sql/catalog/colinfo",
         "//pkg/sql/catalog/descpb",
         "//pkg/sql/catalog/descs",
+        "//pkg/sql/catalog/externalcatalog",
+        "//pkg/sql/catalog/externalcatalog/externalpb",
         "//pkg/sql/catalog/lease",
         "//pkg/sql/catalog/resolver",
         "//pkg/sql/catalog/tabledesc",
diff --git a/pkg/crosscluster/logical/create_logical_replication_stmt.go b/pkg/crosscluster/logical/create_logical_replication_stmt.go
index d7dc63631943..1236ae335843 100644
--- a/pkg/crosscluster/logical/create_logical_replication_stmt.go
+++ b/pkg/crosscluster/logical/create_logical_replication_stmt.go
@@ -18,11 +18,15 @@ import (
     "github.com/cockroachdb/cockroach/pkg/jobs"
     "github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
     "github.com/cockroachdb/cockroach/pkg/repstream/streampb"
+    "github.com/cockroachdb/cockroach/pkg/security/username"
     "github.com/cockroachdb/cockroach/pkg/server/telemetry"
     "github.com/cockroachdb/cockroach/pkg/sql"
+    "github.com/cockroachdb/cockroach/pkg/sql/catalog"
     "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
     "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
     "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
+    "github.com/cockroachdb/cockroach/pkg/sql/catalog/externalcatalog"
+    "github.com/cockroachdb/cockroach/pkg/sql/catalog/externalcatalog/externalpb"
     "github.com/cockroachdb/cockroach/pkg/sql/catalog/resolver"
     "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
     "github.com/cockroachdb/cockroach/pkg/sql/catalog/typedesc"
@@ -131,46 +135,11 @@ func createLogicalReplicationStreamPlanHook(
             return pgerror.Newf(pgcode.InvalidParameterValue, "unknown discard option %q", m)
         }
     }
-
-    var (
-        targetsDescription string
-        srcTableNames      = make([]string, len(stmt.From.Tables))
-        repPairs           = make([]jobspb.LogicalReplicationDetails_ReplicationPair, len(stmt.Into.Tables))
-        srcTableDescs      = make([]*descpb.TableDescriptor, len(stmt.Into.Tables))
-    )
-    for i := range stmt.From.Tables {
-
-        dstObjName, err := stmt.Into.Tables[i].ToUnresolvedObjectName(tree.NoAnnotation)
-        if err != nil {
-            return err
-        }
-        dstTableName := dstObjName.ToTableName()
-        prefix, td, err := resolver.ResolveMutableExistingTableObject(ctx, p, &dstTableName, true, tree.ResolveRequireTableDesc)
-        if err != nil {
-            return err
-        }
-        repPairs[i].DstDescriptorID = int32(td.GetID())
-
-        tbNameWithSchema := tree.MakeTableNameWithSchema(
-            tree.Name(prefix.Database.GetName()),
-            tree.Name(prefix.Schema.GetName()),
-            tree.Name(td.GetName()),
-        )
-
-        srcTableNames[i] = stmt.From.Tables[i].String()
-
-        if i == 0 {
-            targetsDescription = tbNameWithSchema.FQString()
-        } else {
-            targetsDescription += ", " + tbNameWithSchema.FQString()
-        }
-
-        if mode != jobspb.LogicalReplicationDetails_Validated {
-            if len(td.OutboundForeignKeys()) > 0 || len(td.InboundForeignKeys()) > 0 {
-                return pgerror.Newf(pgcode.InvalidParameterValue, "foreign keys are only supported with MODE = 'validated'")
-            }
-        }
+    resolvedDestObjects, err := resolveDestinationObjects(ctx, p, stmt.Into, stmt.CreateTable)
+    if err != nil {
+        return err
     }
+
     if !p.ExtendedEvalContext().TxnIsSingleStmt {
         return errors.New("cannot CREATE LOGICAL REPLICATION STREAM in a multi-statement transaction")
     }
@@ -210,11 +179,14 @@ func createLogicalReplicationStreamPlanHook(
         defer func() {
             _ = client.Close(ctx)
         }()
-
         if err := client.Dial(ctx); err != nil {
             return err
         }
+        srcTableNames := make([]string, len(stmt.From.Tables))
+        for i, tb := range stmt.From.Tables {
+            srcTableNames[i] = tb.String()
+        }
         spec, err := client.CreateForTables(ctx, &streampb.ReplicationProducerRequest{
             TableNames: srcTableNames,
         })
@@ -233,39 +205,46 @@ func createLogicalReplicationStreamPlanHook(
         }
         crossClusterResolver := crosscluster.MakeCrossClusterTypeResolver(sourceTypes)
 
+        replicationStartTime := spec.ReplicationStartTime
+        progress := jobspb.LogicalReplicationProgress{}
+        if cursor, ok := options.GetCursor(); ok {
+            replicationStartTime = cursor
+            progress.ReplicatedTime = cursor
+        }
+
         // If the user asked to ignore "ttl-deletes", make sure that at least one of
         // the source tables actually has a TTL job which sets the omit bit that
         // is used for filtering; if not, they probably forgot that step.
         throwNoTTLWithCDCIgnoreError := discard == jobspb.LogicalReplicationDetails_DiscardCDCIgnoredTTLDeletes
 
+        // TODO: consider moving repPair construction into doLDRPlan after dest tables are created.
+        repPairs := make([]jobspb.LogicalReplicationDetails_ReplicationPair, len(spec.ExternalCatalog.Tables))
         for i, td := range spec.ExternalCatalog.Tables {
             cpy := tabledesc.NewBuilder(&td).BuildCreatedMutableTable()
             if err := typedesc.HydrateTypesInDescriptor(ctx, cpy, crossClusterResolver); err != nil {
                 return err
             }
-            srcTableDescs[i] = cpy.TableDesc()
+            // TODO: overwriting the external catalog with the hydrated descriptor is
+            // awkward; it could be avoided if repPairs were populated in doLDRPlan.
+            spec.ExternalCatalog.Tables[i] = *cpy.TableDesc()
+
+            if !stmt.CreateTable {
+                repPairs[i].DstDescriptorID = int32(resolvedDestObjects.TableIDs[i])
+            }
             repPairs[i].SrcDescriptorID = int32(td.ID)
             if td.RowLevelTTL != nil && td.RowLevelTTL.DisableChangefeedReplication {
                 throwNoTTLWithCDCIgnoreError = false
             }
         }
-
-        if throwNoTTLWithCDCIgnoreError {
-            return pgerror.Newf(pgcode.InvalidParameterValue, "DISCARD = 'ttl-deletes' specified but no tables have changefeed-excluded TTLs")
-        }
-
-        replicationStartTime := spec.ReplicationStartTime
-        progress := jobspb.LogicalReplicationProgress{}
-        if cursor, ok := options.GetCursor(); ok {
-            replicationStartTime = cursor
-            progress.ReplicatedTime = cursor
-        }
-
         if uf, ok := options.GetUserFunctions(); ok {
             for i, name := range srcTableNames {
                 repPairs[i].DstFunctionID = uf[name]
             }
         }
+        if throwNoTTLWithCDCIgnoreError {
+            return pgerror.Newf(pgcode.InvalidParameterValue, "DISCARD = 'ttl-deletes' specified but no tables have changefeed-excluded TTLs")
+        }
+
         // Default conflict resolution if not set will be LWW
         defaultConflictResolution := jobspb.LogicalReplicationDetails_DefaultConflictResolution{
             ConflictResolutionType: jobspb.LogicalReplicationDetails_DefaultConflictResolution_LWW,
@@ -276,23 +255,24 @@ func createLogicalReplicationStreamPlanHook(
 
     jr := jobs.Record{
         JobID:       p.ExecCfg().JobRegistry.MakeJobID(),
-        Description: fmt.Sprintf("LOGICAL REPLICATION STREAM into %s from %s", targetsDescription, cleanedURI),
+        Description: fmt.Sprintf("LOGICAL REPLICATION STREAM into %s from %s", resolvedDestObjects.TargetDescription(), cleanedURI),
         Username:    p.User(),
         Details: jobspb.LogicalReplicationDetails{
             StreamID:                  uint64(spec.StreamID),
             SourceClusterID:           spec.SourceClusterID,
             ReplicationStartTime:      replicationStartTime,
-            SourceClusterConnStr:      string(streamAddress),
             ReplicationPairs:          repPairs,
+            SourceClusterConnStr:      string(streamAddress),
             TableNames:                srcTableNames,
             DefaultConflictResolution: defaultConflictResolution,
             Discard:                   discard,
             Mode:                      mode,
             MetricsLabel:              options.metricsLabel,
+            CreateTable:               stmt.CreateTable,
         },
         Progress: progress,
     }
-    if err := doLDRPlan(ctx, p.ExecCfg().InternalDB, p.ExecCfg().JobRegistry, jr, srcTableDescs, options.SkipSchemaCheck()); err != nil {
+    if err := doLDRPlan(ctx, p.User(), p.ExecCfg(), jr, spec.ExternalCatalog, resolvedDestObjects, options.SkipSchemaCheck()); err != nil {
         return err
     }
     resultsCh <- tree.Datums{tree.NewDInt(tree.DInt(jr.JobID))}
@@ -302,19 +282,116 @@ func createLogicalReplicationStreamPlanHook(
     return fn, streamCreationHeader, nil, false, nil
 }
 
+type ResolvedDestObjects struct {
+    TableIDs   []catid.DescID
+    TableNames []tree.TableName
+
+    // Only set in the CreateTable case.
+    ParentSchemaID   catid.DescID
+    ParentDatabaseID catid.DescID
+}
+
+func (r *ResolvedDestObjects) TargetDescription() string {
+    var targetDescription string
+    for i := range r.TableNames {
+        if i == 0 {
+            targetDescription = r.TableNames[i].FQString()
+        } else {
+            targetDescription += ", " + r.TableNames[i].FQString()
+        }
+    }
+    return targetDescription
+}
+
+func resolveDestinationObjects(
+    ctx context.Context,
+    p sql.PlanHookState,
+    destResources tree.LogicalReplicationResources,
+    createTable bool,
+) (ResolvedDestObjects, error) {
+    var resolved ResolvedDestObjects
+    for i := range destResources.Tables {
+        dstObjName, err := destResources.Tables[i].ToUnresolvedObjectName(tree.NoAnnotation)
+        if err != nil {
+            return resolved, err
+        }
+
+        if createTable {
+            _, _, resPrefix, err := resolver.ResolveTarget(ctx,
+                &dstObjName, p, p.SessionData().Database, p.SessionData().SearchPath)
+            if err != nil {
+                return resolved, errors.Wrapf(err, "resolving destination table name")
+            }
+
+            if resolved.ParentDatabaseID == 0 {
+                resolved.ParentDatabaseID = resPrefix.Database.GetID()
+                resolved.ParentSchemaID = resPrefix.Schema.GetID()
+            } else if resolved.ParentDatabaseID != resPrefix.Database.GetID() {
+                return resolved, errors.Newf("destination tables must all be in the same database")
+            } else if resolved.ParentSchemaID != resPrefix.Schema.GetID() {
+                return resolved, errors.Newf("destination tables must all be in the same schema")
+            }
+
+            tbNameWithSchema := tree.MakeTableNameWithSchema(
+                tree.Name(resPrefix.Database.GetName()),
+                tree.Name(resPrefix.Schema.GetName()),
+                tree.Name(dstObjName.Object()),
+            )
+            resolved.TableNames = append(resolved.TableNames, tbNameWithSchema)
+        } else {
+            dstTableName := dstObjName.ToTableName()
+            prefix, td, err := resolver.ResolveMutableExistingTableObject(ctx, p, &dstTableName, true, tree.ResolveRequireTableDesc)
+            if err != nil {
+                return resolved, err
+            }
+
+            tbNameWithSchema := tree.MakeTableNameWithSchema(
+                tree.Name(prefix.Database.GetName()),
+                tree.Name(prefix.Schema.GetName()),
+                tree.Name(td.GetName()),
+            )
+            resolved.TableNames = append(resolved.TableNames, tbNameWithSchema)
+            resolved.TableIDs = append(resolved.TableIDs, td.GetID())
+        }
+    }
+    return resolved, nil
+}
+
 func doLDRPlan(
     ctx context.Context,
-    internalDB *sql.InternalDB,
-    jobRegistry *jobs.Registry,
+    user username.SQLUsername,
+    execCfg *sql.ExecutorConfig,
     jr jobs.Record,
-    srcTableDescs []*descpb.TableDescriptor,
+    srcExternalCatalog externalpb.ExternalCatalog,
+    resolvedDestObjects ResolvedDestObjects,
     skipSchemaCheck bool,
 ) error {
     details := jr.Details.(jobspb.LogicalReplicationDetails)
-    return internalDB.DescsTxn(ctx, func(ctx context.Context, txn descs.Txn) error {
+    return execCfg.InternalDB.DescsTxn(ctx, func(ctx context.Context, txn descs.Txn) error {
+        var (
+            err          error
+            writtenDescs []catalog.Descriptor
+        )
+        if details.CreateTable {
+            writtenDescs, err = externalcatalog.IngestExternalCatalog(ctx, execCfg, user, srcExternalCatalog, txn, txn.Descriptors(), resolvedDestObjects.ParentDatabaseID, resolvedDestObjects.ParentSchemaID, false)
+            if err != nil {
+                return err
+            }
+        }
+
         dstTableDescs := make([]*tabledesc.Mutable, 0, len(details.ReplicationPairs))
-        for _, pair := range details.ReplicationPairs {
-            dstTableDesc, err := txn.Descriptors().MutableByID(txn.KV()).Table(ctx, catid.DescID(pair.DstDescriptorID))
+        for i := range details.ReplicationPairs {
+            if details.CreateTable {
+                // TODO: it's quite messy to set RepPairs for the CreateTable case here,
+                // but we need to because we only get the table IDs after ingesting the
+                // external catalog. It's also good to ingest the external catalog in the
+                // same txn as validation so it's automatically rolled back if there's an
+                // error during validation.
+                //
+                // Instead, we could populate repPairs in this txn.
+                details.ReplicationPairs[i].DstDescriptorID = int32(writtenDescs[i].GetID())
+            }
+            dstTableDesc, err := txn.Descriptors().MutableByID(txn.KV()).Table(ctx, catid.DescID(details.ReplicationPairs[i].DstDescriptorID))
             if err != nil {
                 return err
             }
@@ -322,20 +400,28 @@ func doLDRPlan(
         }
 
         if buildutil.CrdbTestBuild {
-            if len(srcTableDescs) != len(dstTableDescs) {
-                return errors.AssertionFailedf("srcTableDescs and dstTableDescs should have the same length")
+            if len(srcExternalCatalog.Tables) != len(dstTableDescs) {
+                return errors.AssertionFailedf("source and destination table lists should have the same length")
             }
         }
-        for i := range srcTableDescs {
-            err := tabledesc.CheckLogicalReplicationCompatibility(srcTableDescs[i], dstTableDescs[i].TableDesc(), skipSchemaCheck)
+        for i := range srcExternalCatalog.Tables {
+            destTableDesc := dstTableDescs[i]
+            if details.Mode != jobspb.LogicalReplicationDetails_Validated {
+                if len(destTableDesc.OutboundForeignKeys()) > 0 || len(destTableDesc.InboundForeignKeys()) > 0 {
+                    return pgerror.Newf(pgcode.InvalidParameterValue, "foreign keys are only supported with MODE = 'validated'")
+                }
+            }
+
+            err := tabledesc.CheckLogicalReplicationCompatibility(&srcExternalCatalog.Tables[i], destTableDesc.TableDesc(), skipSchemaCheck || details.CreateTable)
             if err != nil {
                 return err
             }
        }
+
         if err := replicationutils.LockLDRTables(ctx, txn, dstTableDescs, jr.JobID); err != nil {
             return err
         }
-        if _, err := jobRegistry.CreateAdoptableJobWithTxn(ctx, jr, jr.JobID, txn); err != nil {
+        if _, err := execCfg.JobRegistry.CreateAdoptableJobWithTxn(ctx, jr, jr.JobID, txn); err != nil {
             return err
         }
         return nil
diff --git a/pkg/crosscluster/logical/logical_replication_job_test.go b/pkg/crosscluster/logical/logical_replication_job_test.go
index c41c8830587d..1431cb966c41 100644
--- a/pkg/crosscluster/logical/logical_replication_job_test.go
+++ b/pkg/crosscluster/logical/logical_replication_job_test.go
@@ -376,6 +376,31 @@ func TestLogicalStreamIngestionJobWithCursor(t *testing.T) {
     dbB.CheckQueryResults(t, "SELECT * from b.tab", expectedRowsB)
 }
 
+func TestCreateTables(t *testing.T) {
+    defer leaktest.AfterTest(t)()
+    skip.UnderDeadlock(t)
+    defer log.Scope(t).Close(t)
+
+    ctx := context.Background()
+
+    tc, srv, sqlDBs, _ := setupServerWithNumDBs(t, ctx, testClusterBaseClusterArgs, 1, 1)
+    defer tc.Stopper().Stop(ctx)
+
+    sqlA := sqlDBs[0]
+    sqlA.Exec(t, "INSERT INTO tab VALUES (1, 'hello')")
+    aURL, cleanup := srv.PGUrl(t, serverutils.DBName("a"))
+    defer cleanup()
+
+    sqlA.Exec(t, "CREATE DATABASE b")
+    sqlB := sqlutils.MakeSQLRunner(srv.SQLConn(t, serverutils.DBName("b")))
+
+    var jobID jobspb.JobID
+    sqlB.QueryRow(t, "CREATE LOGICALLY REPLICATED TABLE b.tab FROM TABLE tab ON $1", aURL.String()).Scan(&jobID)
+
+    WaitUntilReplicatedTime(t, srv.Clock().Now(), sqlB, jobID)
+    sqlB.CheckQueryResults(t, "SELECT * FROM tab", [][]string{{"1", "hello"}})
+}
+
 // TestLogicalStreamIngestionAdvancePTS tests that the producer side pts advances
 // as the destination side frontier advances.
 func TestLogicalStreamIngestionAdvancePTS(t *testing.T) {
diff --git a/pkg/jobs/jobspb/jobs.proto b/pkg/jobs/jobspb/jobs.proto
index be215c646d7a..22bb79b89275 100644
--- a/pkg/jobs/jobspb/jobs.proto
+++ b/pkg/jobs/jobspb/jobs.proto
@@ -273,6 +273,8 @@ message LogicalReplicationDetails {
 
   Discard discard = 11;
 
+  bool create_table = 12;
+
-  // Next ID: 12.
+  // Next ID: 13.
 }
diff --git a/pkg/sql/catalog/externalcatalog/BUILD.bazel b/pkg/sql/catalog/externalcatalog/BUILD.bazel
index 438e261a409b..acda90db61ab 100644
--- a/pkg/sql/catalog/externalcatalog/BUILD.bazel
+++ b/pkg/sql/catalog/externalcatalog/BUILD.bazel
@@ -36,6 +36,7 @@ go_test(
         "//pkg/security/username",
         "//pkg/server",
         "//pkg/sql",
+        "//pkg/sql/catalog",
         "//pkg/sql/catalog/descpb",
         "//pkg/sql/catalog/descs",
         "//pkg/sql/catalog/externalcatalog/externalpb",
diff --git a/pkg/sql/catalog/externalcatalog/external_catalog.go b/pkg/sql/catalog/externalcatalog/external_catalog.go
index b4b4f7823d82..877a73c4ce54 100644
--- a/pkg/sql/catalog/externalcatalog/external_catalog.go
+++ b/pkg/sql/catalog/externalcatalog/external_catalog.go
@@ -102,10 +102,12 @@ func IngestExternalCatalog(
     databaseID descpb.ID,
     schemaID descpb.ID,
     setOffline bool,
-) error {
+) ([]catalog.Descriptor, error) {
+
+    written := make([]catalog.Descriptor, 0, len(externalCatalog.Tables))
     dbDesc, err := descsCol.ByIDWithLeased(txn.KV()).WithoutNonPublic().Get().Database(ctx, databaseID)
     if err != nil {
-        return err
+        return written, err
     }
     tablesToWrite := make([]catalog.TableDescriptor, 0, len(externalCatalog.Tables))
     var originalParentID descpb.ID
@@ -113,11 +115,11 @@
         if originalParentID == 0 {
             originalParentID = table.ParentID
         } else if originalParentID != table.ParentID {
-            return errors.New("all tables must belong to the same parent")
+            return written, errors.New("all tables must belong to the same parent")
         }
         newID, err := execCfg.DescIDGenerator.GenerateUniqueDescID(ctx)
         if err != nil {
-            return err
+            return written, err
         }
         // TODO: rewrite the tables to fresh ids.
         mutTable := tabledesc.NewBuilder(&table).BuildCreatedMutableTable()
@@ -131,8 +133,9 @@ func IngestExternalCatalog(
         mutTable.Version = 1
         mutTable.ID = newID
         tablesToWrite = append(tablesToWrite, mutTable)
+        written = append(written, mutTable)
     }
-    return ingesting.WriteDescriptors(
+    return written, ingesting.WriteDescriptors(
         ctx, txn.KV(), user, descsCol, nil, nil, tablesToWrite, nil, nil,
         tree.RequestedDescriptors, nil /* extra */, "", true,
     )
diff --git a/pkg/sql/catalog/externalcatalog/external_catalog_test.go b/pkg/sql/catalog/externalcatalog/external_catalog_test.go
index 196c5ef49f19..6242120ea493 100644
--- a/pkg/sql/catalog/externalcatalog/external_catalog_test.go
+++ b/pkg/sql/catalog/externalcatalog/external_catalog_test.go
@@ -12,6 +12,7 @@ import (
     "github.com/cockroachdb/cockroach/pkg/base"
     "github.com/cockroachdb/cockroach/pkg/security/username"
     "github.com/cockroachdb/cockroach/pkg/sql"
+    "github.com/cockroachdb/cockroach/pkg/sql/catalog"
     "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
     "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
     "github.com/cockroachdb/cockroach/pkg/sql/catalog/externalcatalog/externalpb"
@@ -98,11 +99,16 @@ func TestExtractIngestExternalCatalog(t *testing.T) {
     }))
 
     require.ErrorContains(t, sql.TestingDescsTxn(ctx, srv, func(ctx context.Context, txn isql.Txn, col *descs.Collection) error {
-        return IngestExternalCatalog(ctx, &execCfg, sqlUser, sadCatalog, txn, col, parentID, schemaID, false)
+        _, err := IngestExternalCatalog(ctx, &execCfg, sqlUser, sadCatalog, txn, col, parentID, schemaID, false)
+        return err
     }), "invalid inbound foreign key")
 
+    var written []catalog.Descriptor
     require.NoError(t, sql.TestingDescsTxn(ctx, srv, func(ctx context.Context, txn isql.Txn, col *descs.Collection) error {
-        return IngestExternalCatalog(ctx, &execCfg, sqlUser, ingestableCatalog, txn, col, parentID, schemaID, false)
+        var err error
+        written, err = IngestExternalCatalog(ctx, &execCfg, sqlUser, ingestableCatalog, txn, col, parentID, schemaID, false)
+        return err
     }))
+    require.Equal(t, 2, len(written))
 
     sqlDB.CheckQueryResults(t, "SELECT schema_name,table_name FROM [SHOW TABLES]", [][]string{{"public", "tab1"}, {"public", "tab2"}})
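For context, a usage sketch of the two statement forms this patch distinguishes: the first is the pre-existing LDR syntax handled by the modified plan hook, the second is the create-table form exercised by TestCreateTables above. The connection URI and table names below are placeholders, not taken from the patch:

    -- Destination table already exists (stmt.CreateTable = false): the plan
    -- hook resolves its descriptor ID up front via resolveDestinationObjects,
    -- and doLDRPlan checks the source and destination schemas for compatibility.
    CREATE LOGICAL REPLICATION STREAM
      FROM TABLE tab ON 'postgresql://source-host:26257/a'
      INTO TABLE b.tab;

    -- Destination table is created from the source's external catalog inside
    -- doLDRPlan's transaction (stmt.CreateTable = true), so the schema
    -- compatibility check is skipped and the ingest rolls back on any
    -- validation error.
    CREATE LOGICALLY REPLICATED TABLE b.tab
      FROM TABLE tab ON 'postgresql://source-host:26257/a';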