Skip to content

Commit

Permalink
sql: remove Txn() from JobExecContext
Browse files Browse the repository at this point in the history
This removes Txn() from JobExecContext. This method would always
return a nil context, making it a bit error prone to actually use in
jobs.

Release note: None
  • Loading branch information
stevendanna committed Nov 22, 2022
1 parent e312f62 commit d640759
Show file tree
Hide file tree
Showing 9 changed files with 23 additions and 27 deletions.
23 changes: 13 additions & 10 deletions pkg/ccl/backupccl/backup_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,16 +473,19 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error {
// TODO(adityamaru: Break this code block into helper methods.
if details.URI == "" {
initialDetails := details
backupDetails, m, err := getBackupDetailAndManifest(
ctx, p.ExecCfg(), p.Txn(), details, p.User(), backupDest,
)
if err != nil {
if err := p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
backupDetails, m, err := getBackupDetailAndManifest(
ctx, p.ExecCfg(), txn, details, p.User(), backupDest,
)
if err != nil {
return err
}
details = backupDetails
backupManifest = &m
return nil
}); err != nil {
return err
}
details = backupDetails
// Reset backupDetails so nobody accidentally uses it.
backupDetails = jobspb.BackupDetails{} //lint:ignore SA4006 intentionally clearing so no one uses this.
backupManifest = &m

// Now that we have resolved the details, and manifest, write a protected
// timestamp record on the backup's target spans/schema object.
Expand All @@ -497,7 +500,7 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error {
if details.ProtectedTimestampRecord != nil {
if err := p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
return protectTimestampForBackup(
ctx, p.ExecCfg(), txn, b.job.ID(), m, details,
ctx, p.ExecCfg(), txn, b.job.ID(), backupManifest, details,
)
}); err != nil {
return err
Expand Down Expand Up @@ -563,7 +566,7 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error {
lic := utilccl.CheckEnterpriseEnabled(
p.ExecCfg().Settings, p.ExecCfg().NodeInfo.LogicalClusterID(), "",
) != nil
collectTelemetry(ctx, m, initialDetails, details, lic, b.job.ID())
collectTelemetry(ctx, backupManifest, initialDetails, details, lic, b.job.ID())
}

// For all backups, partitioned or not, the main BACKUP manifest is stored at
Expand Down
6 changes: 3 additions & 3 deletions pkg/ccl/backupccl/backup_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -861,7 +861,7 @@ func logAndSanitizeBackupDestinations(ctx context.Context, backupDestinations ..

func collectTelemetry(
ctx context.Context,
backupManifest backuppb.BackupManifest,
backupManifest *backuppb.BackupManifest,
initialDetails, backupDetails jobspb.BackupDetails,
licensed bool,
jobID jobspb.JobID,
Expand Down Expand Up @@ -1156,7 +1156,7 @@ func getReintroducedSpans(
return tableSpans, nil
}

func getProtectedTimestampTargetForBackup(backupManifest backuppb.BackupManifest) *ptpb.Target {
func getProtectedTimestampTargetForBackup(backupManifest *backuppb.BackupManifest) *ptpb.Target {
if backupManifest.DescriptorCoverage == tree.AllDescriptors {
return ptpb.MakeClusterTarget()
}
Expand Down Expand Up @@ -1195,7 +1195,7 @@ func protectTimestampForBackup(
execCfg *sql.ExecutorConfig,
txn *kv.Txn,
jobID jobspb.JobID,
backupManifest backuppb.BackupManifest,
backupManifest *backuppb.BackupManifest,
backupDetails jobspb.BackupDetails,
) error {
tsToProtect := backupManifest.EndTime
Expand Down
3 changes: 0 additions & 3 deletions pkg/ccl/changefeedccl/cdceval/constraint.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,5 @@ func constrainSpansBySelectClause(
}

func schemaTS(execCtx sql.JobExecContext) hlc.Timestamp {
if execCtx.Txn() != nil {
return execCtx.Txn().ReadTimestamp()
}
return execCtx.ExecCfg().Clock.Now()
}
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/cdceval/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import (
// buffer using cdceval.AsStringUnredacted.
func NormalizeAndValidateSelectForTarget(
ctx context.Context,
execCtx sql.JobExecContext,
execCtx sql.PlanHookState,
desc catalog.TableDescriptor,
target jobspb.ChangefeedTargetSpecification,
sc *tree.SelectClause,
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/changefeedccl/cdceval/validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func TestNormalizeAndValidate(t *testing.T) {
SearchPath: sessiondata.DefaultSearchPath.GetPathArray(),
})
defer cleanup()
execCtx := p.(sql.JobExecContext)
execCtx := p.(sql.PlanHookState)

for _, tc := range []struct {
name string
Expand Down Expand Up @@ -289,7 +289,7 @@ func TestSelectClauseRequiresPrev(t *testing.T) {
SearchPath: sessiondata.DefaultSearchPath.GetPathArray(),
})
defer cleanup()
execCtx := p.(sql.JobExecContext)
execCtx := p.(sql.PlanHookState)

for _, tc := range []struct {
name string
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -897,7 +897,7 @@ func validateDetailsAndOptions(
// This method modifies passed in select clause to reflect normalization step.
func validateAndNormalizeChangefeedExpression(
ctx context.Context,
execCtx sql.JobExecContext,
execCtx sql.PlanHookState,
sc *tree.SelectClause,
descriptors map[tree.TablePattern]catalog.Descriptor,
targets []jobspb.ChangefeedTargetSpecification,
Expand Down Expand Up @@ -977,7 +977,7 @@ func (b *changefeedResumer) handleChangefeedError(
const errorFmt = "job failed (%v) but is being paused because of %s=%s"
errorMessage := fmt.Sprintf(errorFmt, changefeedErr,
changefeedbase.OptOnError, changefeedbase.OptOnErrorPause)
return b.job.PauseRequested(ctx, jobExec.Txn(), func(ctx context.Context,
return b.job.PauseRequested(ctx, nil /* txn */, func(ctx context.Context,
planHookState interface{}, txn *kv.Txn, progress *jobspb.Progress) error {
err := b.OnPauseRequest(ctx, jobExec, txn, progress)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6861,7 +6861,7 @@ func normalizeCDCExpression(t *testing.T, execCfgI interface{}, exprStr string)
})
defer cleanup()

execCtx := p.(sql.JobExecContext)
execCtx := p.(sql.PlanHookState)
_, _, err = cdceval.NormalizeAndValidateSelectForTarget(
context.Background(), execCtx, desc, target, sc, false, false, false,
)
Expand Down
3 changes: 1 addition & 2 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,8 +355,7 @@ func (s *streamIngestionResumer) handleResumeError(
// The ingestion job is paused but the producer job will keep
// running until it times out. Users can still resume ingestion before
// the producer job times out.
jobExecCtx := execCtx.(sql.JobExecContext)
return s.job.PauseRequested(resumeCtx, jobExecCtx.Txn(), func(ctx context.Context,
return s.job.PauseRequested(resumeCtx, nil /* txn */, func(ctx context.Context,
planHookState interface{}, txn *kv.Txn, progress *jobspb.Progress) error {
progress.RunningStatus = errorMessage
return nil
Expand Down
3 changes: 0 additions & 3 deletions pkg/sql/job_exec_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ package sql
import (
"context"

"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
Expand Down Expand Up @@ -70,7 +69,6 @@ func (e *plannerJobExecContext) MigrationJobDeps() upgrade.JobDeps {
func (e *plannerJobExecContext) SpanConfigReconciler() spanconfig.Reconciler {
return e.p.SpanConfigReconciler()
}
func (e *plannerJobExecContext) Txn() *kv.Txn { return e.p.Txn() }

// ConstrainPrimaryIndexSpanByExpr implements SpanConstrainer
func (e *plannerJobExecContext) ConstrainPrimaryIndexSpanByExpr(
Expand Down Expand Up @@ -106,5 +104,4 @@ type JobExecContext interface {
User() username.SQLUsername
MigrationJobDeps() upgrade.JobDeps
SpanConfigReconciler() spanconfig.Reconciler
Txn() *kv.Txn
}

0 comments on commit d640759

Please sign in to comment.