From 4406030f02e1a449ce7aa0deac9c101d8452062d Mon Sep 17 00:00:00 2001 From: Michael Butler Date: Tue, 26 Nov 2024 15:26:58 -0500 Subject: [PATCH 1/2] crosscluster/logical: create job record outside txn Epic: none Release note: none --- .../create_logical_replication_stmt.go | 41 +++++++++---------- 1 file changed, 20 insertions(+), 21 deletions(-) diff --git a/pkg/crosscluster/logical/create_logical_replication_stmt.go b/pkg/crosscluster/logical/create_logical_replication_stmt.go index ef917218a666..3a3dfe8f8dbe 100644 --- a/pkg/crosscluster/logical/create_logical_replication_stmt.go +++ b/pkg/crosscluster/logical/create_logical_replication_stmt.go @@ -281,7 +281,25 @@ func createLogicalReplicationStreamPlanHook( defaultConflictResolution = *cr } - var jobID jobspb.JobID + jr := jobs.Record{ + JobID: p.ExecCfg().JobRegistry.MakeJobID(), + Description: fmt.Sprintf("LOGICAL REPLICATION STREAM into %s from %s", targetsDescription, cleanedURI), + Username: p.User(), + Details: jobspb.LogicalReplicationDetails{ + StreamID: uint64(spec.StreamID), + SourceClusterID: spec.SourceClusterID, + ReplicationStartTime: replicationStartTime, + SourceClusterConnStr: string(streamAddress), + ReplicationPairs: repPairs, + TableNames: srcTableNames, + DefaultConflictResolution: defaultConflictResolution, + Discard: discard, + Mode: mode, + MetricsLabel: options.metricsLabel, + }, + Progress: progress, + } + if err := p.ExecCfg().InternalDB.DescsTxn(ctx, func(ctx context.Context, txn descs.Txn) error { dstTableDescs := make([]*tabledesc.Mutable, 0, len(srcTableDescs)) for _, pair := range repPairs { @@ -304,25 +322,6 @@ func createLogicalReplicationStreamPlanHook( } } - jr := jobs.Record{ - JobID: p.ExecCfg().JobRegistry.MakeJobID(), - Description: fmt.Sprintf("LOGICAL REPLICATION STREAM into %s from %s", targetsDescription, cleanedURI), - Username: p.User(), - Details: jobspb.LogicalReplicationDetails{ - StreamID: uint64(spec.StreamID), - SourceClusterID: spec.SourceClusterID, - ReplicationStartTime: replicationStartTime, - SourceClusterConnStr: string(streamAddress), - ReplicationPairs: repPairs, - TableNames: srcTableNames, - DefaultConflictResolution: defaultConflictResolution, - Discard: discard, - Mode: mode, - MetricsLabel: options.metricsLabel, - }, - Progress: progress, - } - jobID = jr.JobID if err := replicationutils.LockLDRTables(ctx, txn, dstTableDescs, jr.JobID); err != nil { return err } @@ -333,7 +332,7 @@ func createLogicalReplicationStreamPlanHook( }); err != nil { return err } - resultsCh <- tree.Datums{tree.NewDInt(tree.DInt(jobID))} + resultsCh <- tree.Datums{tree.NewDInt(tree.DInt(jr.JobID))} return nil } From 6dd0b69636e57b61fde6edb86fce5382e98eca9d Mon Sep 17 00:00:00 2001 From: Michael Butler Date: Tue, 26 Nov 2024 15:30:11 -0500 Subject: [PATCH 2/2] crosscluster/logical: move job txn into helper This pure refactor attempts to prevent any side effects from trickling into the job creation txn. Epic: none Release note: none --- .../create_logical_replication_stmt.go | 64 +++++++++++-------- 1 file changed, 37 insertions(+), 27 deletions(-) diff --git a/pkg/crosscluster/logical/create_logical_replication_stmt.go b/pkg/crosscluster/logical/create_logical_replication_stmt.go index 3a3dfe8f8dbe..26f0c2e4af5a 100644 --- a/pkg/crosscluster/logical/create_logical_replication_stmt.go +++ b/pkg/crosscluster/logical/create_logical_replication_stmt.go @@ -299,44 +299,54 @@ func createLogicalReplicationStreamPlanHook( }, Progress: progress, } + if err := doLDRPlan(ctx, p.ExecCfg().InternalDB, p.ExecCfg().JobRegistry, jr, srcTableDescs, options.SkipSchemaCheck()); err != nil { + return err + } + resultsCh <- tree.Datums{tree.NewDInt(tree.DInt(jr.JobID))} + return nil + } - if err := p.ExecCfg().InternalDB.DescsTxn(ctx, func(ctx context.Context, txn descs.Txn) error { - dstTableDescs := make([]*tabledesc.Mutable, 0, len(srcTableDescs)) - for _, pair := range repPairs { - dstTableDesc, err := txn.Descriptors().MutableByID(txn.KV()).Table(ctx, catid.DescID(pair.DstDescriptorID)) - if err != nil { - return err - } - dstTableDescs = append(dstTableDescs, dstTableDesc) - } + return fn, streamCreationHeader, nil, false, nil +} - if buildutil.CrdbTestBuild { - if len(srcTableDescs) != len(dstTableDescs) { - panic("srcTableDescs and dstTableDescs should have the same length") - } - } - for i := range srcTableDescs { - err := tabledesc.CheckLogicalReplicationCompatibility(srcTableDescs[i], dstTableDescs[i].TableDesc(), options.SkipSchemaCheck()) - if err != nil { - return err - } +func doLDRPlan( + ctx context.Context, + internalDB *sql.InternalDB, + jobRegistry *jobs.Registry, + jr jobs.Record, + srcTableDescs []*descpb.TableDescriptor, + skipSchemaCheck bool, +) error { + details := jr.Details.(jobspb.LogicalReplicationDetails) + return internalDB.DescsTxn(ctx, func(ctx context.Context, txn descs.Txn) error { + dstTableDescs := make([]*tabledesc.Mutable, 0, len(details.ReplicationPairs)) + for _, pair := range details.ReplicationPairs { + dstTableDesc, err := txn.Descriptors().MutableByID(txn.KV()).Table(ctx, catid.DescID(pair.DstDescriptorID)) + if err != nil { + return err } + dstTableDescs = append(dstTableDescs, dstTableDesc) + } - if err := replicationutils.LockLDRTables(ctx, txn, dstTableDescs, jr.JobID); err != nil { - return err + if buildutil.CrdbTestBuild { + if len(srcTableDescs) != len(dstTableDescs) { + return errors.AssertionFailedf("srcTableDescs and dstTableDescs should have the same length") } - if _, err := p.ExecCfg().JobRegistry.CreateAdoptableJobWithTxn(ctx, jr, jr.JobID, txn); err != nil { + } + for i := range srcTableDescs { + err := tabledesc.CheckLogicalReplicationCompatibility(srcTableDescs[i], dstTableDescs[i].TableDesc(), skipSchemaCheck) + if err != nil { return err } + } + if err := replicationutils.LockLDRTables(ctx, txn, dstTableDescs, jr.JobID); err != nil { return err - }); err != nil { + } + if _, err := jobRegistry.CreateAdoptableJobWithTxn(ctx, jr, jr.JobID, txn); err != nil { return err } - resultsCh <- tree.Datums{tree.NewDInt(tree.DInt(jr.JobID))} return nil - } - - return fn, streamCreationHeader, nil, false, nil + }) } func createLogicalReplicationStreamTypeCheck(