Skip to content

Commit

Permalink
Merge #136243
Browse files Browse the repository at this point in the history
136243: crosscluster/logical: move job txn into helper r=msbutler a=msbutler

This pure refactor attempts to prevent any side effects from trickling into the
job creation txn.

Epic: none

Release note: none

Co-authored-by: Michael Butler <[email protected]>
  • Loading branch information
craig[bot] and msbutler committed Dec 4, 2024
2 parents dc85573 + 6dd0b69 commit 78e363f
Showing 1 changed file with 56 additions and 47 deletions.
103 changes: 56 additions & 47 deletions pkg/crosscluster/logical/create_logical_replication_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,63 +281,72 @@ func createLogicalReplicationStreamPlanHook(
defaultConflictResolution = *cr
}

var jobID jobspb.JobID
if err := p.ExecCfg().InternalDB.DescsTxn(ctx, func(ctx context.Context, txn descs.Txn) error {
dstTableDescs := make([]*tabledesc.Mutable, 0, len(srcTableDescs))
for _, pair := range repPairs {
dstTableDesc, err := txn.Descriptors().MutableByID(txn.KV()).Table(ctx, catid.DescID(pair.DstDescriptorID))
if err != nil {
return err
}
dstTableDescs = append(dstTableDescs, dstTableDesc)
}
jr := jobs.Record{
JobID: p.ExecCfg().JobRegistry.MakeJobID(),
Description: fmt.Sprintf("LOGICAL REPLICATION STREAM into %s from %s", targetsDescription, cleanedURI),
Username: p.User(),
Details: jobspb.LogicalReplicationDetails{
StreamID: uint64(spec.StreamID),
SourceClusterID: spec.SourceClusterID,
ReplicationStartTime: replicationStartTime,
SourceClusterConnStr: string(streamAddress),
ReplicationPairs: repPairs,
TableNames: srcTableNames,
DefaultConflictResolution: defaultConflictResolution,
Discard: discard,
Mode: mode,
MetricsLabel: options.metricsLabel,
},
Progress: progress,
}
if err := doLDRPlan(ctx, p.ExecCfg().InternalDB, p.ExecCfg().JobRegistry, jr, srcTableDescs, options.SkipSchemaCheck()); err != nil {
return err
}
resultsCh <- tree.Datums{tree.NewDInt(tree.DInt(jr.JobID))}
return nil
}

if buildutil.CrdbTestBuild {
if len(srcTableDescs) != len(dstTableDescs) {
panic("srcTableDescs and dstTableDescs should have the same length")
}
}
for i := range srcTableDescs {
err := tabledesc.CheckLogicalReplicationCompatibility(srcTableDescs[i], dstTableDescs[i].TableDesc(), options.SkipSchemaCheck())
if err != nil {
return err
}
}
return fn, streamCreationHeader, nil, false, nil
}

jr := jobs.Record{
JobID: p.ExecCfg().JobRegistry.MakeJobID(),
Description: fmt.Sprintf("LOGICAL REPLICATION STREAM into %s from %s", targetsDescription, cleanedURI),
Username: p.User(),
Details: jobspb.LogicalReplicationDetails{
StreamID: uint64(spec.StreamID),
SourceClusterID: spec.SourceClusterID,
ReplicationStartTime: replicationStartTime,
SourceClusterConnStr: string(streamAddress),
ReplicationPairs: repPairs,
TableNames: srcTableNames,
DefaultConflictResolution: defaultConflictResolution,
Discard: discard,
Mode: mode,
MetricsLabel: options.metricsLabel,
},
Progress: progress,
}
jobID = jr.JobID
if err := replicationutils.LockLDRTables(ctx, txn, dstTableDescs, jr.JobID); err != nil {
func doLDRPlan(
ctx context.Context,
internalDB *sql.InternalDB,
jobRegistry *jobs.Registry,
jr jobs.Record,
srcTableDescs []*descpb.TableDescriptor,
skipSchemaCheck bool,
) error {
details := jr.Details.(jobspb.LogicalReplicationDetails)
return internalDB.DescsTxn(ctx, func(ctx context.Context, txn descs.Txn) error {
dstTableDescs := make([]*tabledesc.Mutable, 0, len(details.ReplicationPairs))
for _, pair := range details.ReplicationPairs {
dstTableDesc, err := txn.Descriptors().MutableByID(txn.KV()).Table(ctx, catid.DescID(pair.DstDescriptorID))
if err != nil {
return err
}
if _, err := p.ExecCfg().JobRegistry.CreateAdoptableJobWithTxn(ctx, jr, jr.JobID, txn); err != nil {
dstTableDescs = append(dstTableDescs, dstTableDesc)
}

if buildutil.CrdbTestBuild {
if len(srcTableDescs) != len(dstTableDescs) {
return errors.AssertionFailedf("srcTableDescs and dstTableDescs should have the same length")
}
}
for i := range srcTableDescs {
err := tabledesc.CheckLogicalReplicationCompatibility(srcTableDescs[i], dstTableDescs[i].TableDesc(), skipSchemaCheck)
if err != nil {
return err
}
}
if err := replicationutils.LockLDRTables(ctx, txn, dstTableDescs, jr.JobID); err != nil {
return err
}); err != nil {
}
if _, err := jobRegistry.CreateAdoptableJobWithTxn(ctx, jr, jr.JobID, txn); err != nil {
return err
}
resultsCh <- tree.Datums{tree.NewDInt(tree.DInt(jobID))}
return nil
}

return fn, streamCreationHeader, nil, false, nil
})
}

func createLogicalReplicationStreamTypeCheck(
Expand Down

0 comments on commit 78e363f

Please sign in to comment.