Skip to content

Commit

Permalink
crosscluster/logical: move job txn into helper
Browse files Browse the repository at this point in the history
This pure refactor attempts to prevent any side effects from trickling into the
job creation txn.

Epic: none

Release note: none
  • Loading branch information
msbutler committed Dec 3, 2024
1 parent 4406030 commit 6dd0b69
Showing 1 changed file with 37 additions and 27 deletions.
64 changes: 37 additions & 27 deletions pkg/crosscluster/logical/create_logical_replication_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,44 +299,54 @@ func createLogicalReplicationStreamPlanHook(
},
Progress: progress,
}
if err := doLDRPlan(ctx, p.ExecCfg().InternalDB, p.ExecCfg().JobRegistry, jr, srcTableDescs, options.SkipSchemaCheck()); err != nil {
return err
}
resultsCh <- tree.Datums{tree.NewDInt(tree.DInt(jr.JobID))}
return nil
}

if err := p.ExecCfg().InternalDB.DescsTxn(ctx, func(ctx context.Context, txn descs.Txn) error {
dstTableDescs := make([]*tabledesc.Mutable, 0, len(srcTableDescs))
for _, pair := range repPairs {
dstTableDesc, err := txn.Descriptors().MutableByID(txn.KV()).Table(ctx, catid.DescID(pair.DstDescriptorID))
if err != nil {
return err
}
dstTableDescs = append(dstTableDescs, dstTableDesc)
}
return fn, streamCreationHeader, nil, false, nil
}

if buildutil.CrdbTestBuild {
if len(srcTableDescs) != len(dstTableDescs) {
panic("srcTableDescs and dstTableDescs should have the same length")
}
}
for i := range srcTableDescs {
err := tabledesc.CheckLogicalReplicationCompatibility(srcTableDescs[i], dstTableDescs[i].TableDesc(), options.SkipSchemaCheck())
if err != nil {
return err
}
func doLDRPlan(
ctx context.Context,
internalDB *sql.InternalDB,
jobRegistry *jobs.Registry,
jr jobs.Record,
srcTableDescs []*descpb.TableDescriptor,
skipSchemaCheck bool,
) error {
details := jr.Details.(jobspb.LogicalReplicationDetails)
return internalDB.DescsTxn(ctx, func(ctx context.Context, txn descs.Txn) error {
dstTableDescs := make([]*tabledesc.Mutable, 0, len(details.ReplicationPairs))
for _, pair := range details.ReplicationPairs {
dstTableDesc, err := txn.Descriptors().MutableByID(txn.KV()).Table(ctx, catid.DescID(pair.DstDescriptorID))
if err != nil {
return err
}
dstTableDescs = append(dstTableDescs, dstTableDesc)
}

if err := replicationutils.LockLDRTables(ctx, txn, dstTableDescs, jr.JobID); err != nil {
return err
if buildutil.CrdbTestBuild {
if len(srcTableDescs) != len(dstTableDescs) {
return errors.AssertionFailedf("srcTableDescs and dstTableDescs should have the same length")
}
if _, err := p.ExecCfg().JobRegistry.CreateAdoptableJobWithTxn(ctx, jr, jr.JobID, txn); err != nil {
}
for i := range srcTableDescs {
err := tabledesc.CheckLogicalReplicationCompatibility(srcTableDescs[i], dstTableDescs[i].TableDesc(), skipSchemaCheck)
if err != nil {
return err
}
}
if err := replicationutils.LockLDRTables(ctx, txn, dstTableDescs, jr.JobID); err != nil {
return err
}); err != nil {
}
if _, err := jobRegistry.CreateAdoptableJobWithTxn(ctx, jr, jr.JobID, txn); err != nil {
return err
}
resultsCh <- tree.Datums{tree.NewDInt(tree.DInt(jr.JobID))}
return nil
}

return fn, streamCreationHeader, nil, false, nil
})
}

func createLogicalReplicationStreamTypeCheck(
Expand Down

0 comments on commit 6dd0b69

Please sign in to comment.