Merge #93218
93218: *,isql: introduce isql.Txn and isql.DB r=ajwerner a=ajwerner

This massive refactor binds the `*kv.Txn` to the internal executor and the other txn-associated sql-layer state. It follows on from an earlier project to more tightly couple internal executors to the rest of `extraTxnState`; that project resulted in sprawling changes to propagate the paired dependencies through the system. In practice, we're better off coupling them through a single object.

Some refactors are included here to curry and hide some of these dependencies from interfaces; those could potentially be extracted into separate changes.

Additionally, not all of the dependency sprawl has been eliminated; there are cases where we could pass an `isql.Txn` but instead keep passing the underlying `isql.Executor` and `*kv.Txn` separately. We can do more cleanup along the way.
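
To illustrate the shape of the change, here is a minimal sketch of the old and new patterns, modeled on call sites touched in this diff (the helper name `lookupScheduleID` and its query argument are hypothetical; the `isql` method signatures follow the usage in the hunks below):

```go
package example

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/sql/isql"
	"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
)

// Before: callers threaded a *kv.Txn and an internal executor through
// as a pair:
//
//	err := execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
//		_, err := execCfg.InternalExecutor.QueryRowEx(
//			ctx, "lookup", txn, sessiondata.NodeUserSessionDataOverride, query,
//		)
//		return err
//	})
//
// After: an isql.DB opens an isql.Txn that bundles the kv transaction with
// the executor's txn-associated state; txn.KV() exposes the underlying
// *kv.Txn where one is still required.
func lookupScheduleID(ctx context.Context, db isql.DB, query string) error {
	return db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
		// The internal-executor query methods now hang off the isql.Txn itself.
		_, err := txn.QueryRowEx(
			ctx, "lookup", txn.KV(),
			sessiondata.NodeUserSessionDataOverride, query,
		)
		return err
	})
}
```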

Lastly, I couldn't resist lifting some `sql.ExecCfg` arguments up and passing more specific dependencies in some places.
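
One concrete instance, with the before and after signatures copied from the `backup_job.go` hunk below:

```go
// Before: the helper took the entire executor config and dug the scheduler
// knobs and database out of it.
func (b *backupResumer) maybeNotifyScheduledJobCompletion(
	ctx context.Context, jobStatus jobs.Status, exec *sql.ExecutorConfig,
) error

// After: only the dependencies it actually uses are passed in.
func (b *backupResumer) maybeNotifyScheduledJobCompletion(
	ctx context.Context, jobStatus jobs.Status, knobs *jobs.TestingKnobs, db isql.DB,
) error
```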

Epic: none

Release note: None

Co-authored-by: Andrew Werner <[email protected]>
craig[bot] and ajwerner committed Jan 18, 2023
2 parents 075b319 + 2f81142 commit 4066912
Showing 484 changed files with 6,528 additions and 6,661 deletions.
4 changes: 2 additions & 2 deletions pkg/BUILD.bazel
@@ -1581,6 +1581,7 @@ GO_TARGETS = [
"//pkg/sql/importer:importer_test",
"//pkg/sql/inverted:inverted",
"//pkg/sql/inverted:inverted_test",
"//pkg/sql/isql:isql",
"//pkg/sql/lex:lex",
"//pkg/sql/lex:lex_test",
"//pkg/sql/lexbase/allkeywords:allkeywords",
@@ -1856,7 +1857,6 @@ GO_TARGETS = [
"//pkg/sql/sqltelemetry:sqltelemetry",
"//pkg/sql/sqltestutils:sqltestutils",
"//pkg/sql/sqltestutils:sqltestutils_test",
"//pkg/sql/sqlutil:sqlutil",
"//pkg/sql/stats/bounds:bounds",
"//pkg/sql/stats:stats",
"//pkg/sql/stats:stats_test",
@@ -2798,6 +2798,7 @@ GET_X_DATA_TARGETS = [
"//pkg/sql/idxusage:get_x_data",
"//pkg/sql/importer:get_x_data",
"//pkg/sql/inverted:get_x_data",
"//pkg/sql/isql:get_x_data",
"//pkg/sql/lex:get_x_data",
"//pkg/sql/lexbase:get_x_data",
"//pkg/sql/lexbase/allkeywords:get_x_data",
@@ -2978,7 +2979,6 @@ GET_X_DATA_TARGETS = [
"//pkg/sql/sqlstats/ssmemstorage:get_x_data",
"//pkg/sql/sqltelemetry:get_x_data",
"//pkg/sql/sqltestutils:get_x_data",
"//pkg/sql/sqlutil:get_x_data",
"//pkg/sql/stats:get_x_data",
"//pkg/sql/stats/bounds:get_x_data",
"//pkg/sql/stmtdiagnostics:get_x_data",
4 changes: 2 additions & 2 deletions pkg/ccl/backupccl/BUILD.bazel
@@ -97,6 +97,7 @@ go_library(
"//pkg/sql/execinfra",
"//pkg/sql/execinfrapb",
"//pkg/sql/exprutil",
"//pkg/sql/isql",
"//pkg/sql/parser",
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/pgwire/pgerror",
@@ -115,7 +116,6 @@
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
"//pkg/sql/sqlerrors",
"//pkg/sql/sqlutil",
"//pkg/sql/stats",
"//pkg/sql/syntheticprivilege",
"//pkg/sql/types",
@@ -251,6 +251,7 @@ go_test(
"//pkg/sql/execinfra",
"//pkg/sql/execinfrapb",
"//pkg/sql/importer",
"//pkg/sql/isql",
"//pkg/sql/parser",
"//pkg/sql/pgwire/pgerror",
"//pkg/sql/randgen",
@@ -259,7 +260,6 @@
"//pkg/sql/sem/eval",
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
"//pkg/sql/sqlutil",
"//pkg/sql/stats",
"//pkg/storage",
"//pkg/testutils",
5 changes: 3 additions & 2 deletions pkg/ccl/backupccl/alter_backup_planning.go
@@ -149,8 +149,9 @@ func doAlterBackupPlan(
}

ioConf := baseStore.ExternalIOConf()
kmsEnv := backupencryption.MakeBackupKMSEnv(baseStore.Settings(), &ioConf, p.ExecCfg().DB,
p.User(), p.ExecCfg().InternalExecutor)
kmsEnv := backupencryption.MakeBackupKMSEnv(
baseStore.Settings(), &ioConf, p.ExecCfg().InternalDB, p.User(),
)

// Check that at least one of the old keys has been used to encrypt the backup in the past.
// Use the first one that works to decrypt the ENCRYPTION-INFO file(s).
25 changes: 14 additions & 11 deletions pkg/ccl/backupccl/alter_backup_schedule.go
@@ -50,8 +50,9 @@ func loadSchedules(
}

execCfg := p.ExecCfg()
env := sql.JobSchedulerEnv(execCfg)
schedule, err := jobs.LoadScheduledJob(ctx, env, int64(scheduleID), execCfg.InternalExecutor, p.Txn())
env := sql.JobSchedulerEnv(execCfg.JobsKnobs())
schedules := jobs.ScheduledJobTxn(p.InternalSQLTxn())
schedule, err := schedules.Load(ctx, env, int64(scheduleID))
if err != nil {
return s, err
}
@@ -74,7 +75,7 @@
var dependentStmt *tree.Backup

if args.DependentScheduleID != 0 {
dependentSchedule, err = jobs.LoadScheduledJob(ctx, env, args.DependentScheduleID, execCfg.InternalExecutor, p.Txn())
dependentSchedule, err = schedules.Load(ctx, env, args.DependentScheduleID)
if err != nil {
return scheduleDetails{}, err
}
@@ -188,10 +189,11 @@ func doAlterBackupSchedules(
if err != nil {
return err
}
scheduledJobs := jobs.ScheduledJobTxn(p.InternalSQLTxn())
s.fullJob.SetExecutionDetails(
tree.ScheduledBackupExecutor.InternalName(),
jobspb.ExecutionArguments{Args: fullAny})
if err := s.fullJob.Update(ctx, p.ExecCfg().InternalExecutor, p.Txn()); err != nil {
if err := scheduledJobs.Update(ctx, s.fullJob); err != nil {
return err
}

@@ -204,7 +206,8 @@
s.incJob.SetExecutionDetails(
tree.ScheduledBackupExecutor.InternalName(),
jobspb.ExecutionArguments{Args: incAny})
if err := s.incJob.Update(ctx, p.ExecCfg().InternalExecutor, p.Txn()); err != nil {

if err := scheduledJobs.Update(ctx, s.incJob); err != nil {
return err
}

@@ -382,8 +385,8 @@ func processFullBackupRecurrence(
return s, nil
}

env := sql.JobSchedulerEnv(p.ExecCfg())
ex := p.ExecCfg().InternalExecutor
env := sql.JobSchedulerEnv(p.ExecCfg().JobsKnobs())
scheduledJobs := jobs.ScheduledJobTxn(p.InternalSQLTxn())
if fullBackupAlways {
if s.incJob == nil {
// Nothing to do.
@@ -396,7 +399,7 @@
}
s.fullArgs.DependentScheduleID = 0
s.fullArgs.UnpauseOnSuccess = 0
if err := s.incJob.Delete(ctx, ex, p.Txn()); err != nil {
if err := scheduledJobs.Delete(ctx, s.incJob); err != nil {
return scheduleDetails{}, err
}
s.incJob = nil
@@ -453,7 +456,7 @@ func processFullBackupRecurrence(
tree.ScheduledBackupExecutor.InternalName(),
jobspb.ExecutionArguments{Args: incAny})

if err := s.incJob.Create(ctx, ex, p.Txn()); err != nil {
if err := scheduledJobs.Create(ctx, s.incJob); err != nil {
return scheduleDetails{}, err
}
s.fullArgs.UnpauseOnSuccess = s.incJob.ScheduleID()
@@ -480,7 +483,7 @@ func validateFullIncrementalFrequencies(p sql.PlanHookState, s scheduleDetails)
if s.incJob == nil {
return nil
}
env := sql.JobSchedulerEnv(p.ExecCfg())
env := sql.JobSchedulerEnv(p.ExecCfg().JobsKnobs())
now := env.Now()

fullFreq, err := frequencyFromCron(now, s.fullJob.ScheduleExpr())
@@ -536,7 +539,7 @@ func processInto(p sql.PlanHookState, spec *alterBackupScheduleSpec, s scheduleD

// Kick off a full backup immediately so we can unpause incrementals.
// This mirrors the behavior of CREATE SCHEDULE FOR BACKUP.
env := sql.JobSchedulerEnv(p.ExecCfg())
env := sql.JobSchedulerEnv(p.ExecCfg().JobsKnobs())
s.fullJob.SetNextRun(env.Now())

return nil
89 changes: 54 additions & 35 deletions pkg/ccl/backupccl/backup_job.go
@@ -32,7 +32,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/joberror"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/scheduledjobs"
@@ -44,6 +43,7 @@
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/stats"
@@ -140,9 +140,12 @@ func backup(
var lastCheckpoint time.Time

var completedSpans, completedIntroducedSpans []roachpb.Span
kmsEnv := backupencryption.MakeBackupKMSEnv(execCtx.ExecCfg().Settings,
&execCtx.ExecCfg().ExternalIODirConfig, execCtx.ExecCfg().DB, execCtx.User(),
execCtx.ExecCfg().InternalExecutor)
kmsEnv := backupencryption.MakeBackupKMSEnv(
execCtx.ExecCfg().Settings,
&execCtx.ExecCfg().ExternalIODirConfig,
execCtx.ExecCfg().InternalDB,
execCtx.User(),
)
// TODO(benesch): verify these files, rather than accepting them as truth
// blindly.
// No concurrency yet, so these assignments are safe.
@@ -369,13 +372,13 @@
}

func releaseProtectedTimestamp(
ctx context.Context, txn *kv.Txn, pts protectedts.Storage, ptsID *uuid.UUID,
ctx context.Context, pts protectedts.Storage, ptsID *uuid.UUID,
) error {
// If the job doesn't have a protected timestamp then there's nothing to do.
if ptsID == nil {
return nil
}
err := pts.Release(ctx, txn, *ptsID)
err := pts.Release(ctx, *ptsID)
if errors.Is(err, protectedts.ErrNotExists) {
// No reason to return an error which might cause problems if it doesn't
// seem to exist.
@@ -447,8 +450,12 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error {
// The span is finished by the registry executing the job.
details := b.job.Details().(jobspb.BackupDetails)
p := execCtx.(sql.JobExecContext)
kmsEnv := backupencryption.MakeBackupKMSEnv(p.ExecCfg().Settings,
&p.ExecCfg().ExternalIODirConfig, p.ExecCfg().DB, p.User(), p.ExecCfg().InternalExecutor)
kmsEnv := backupencryption.MakeBackupKMSEnv(
p.ExecCfg().Settings,
&p.ExecCfg().ExternalIODirConfig,
p.ExecCfg().InternalDB,
p.User(),
)

// Resolve the backup destination. We can skip this step if we
// have already resolved and persisted the destination either
@@ -511,9 +518,10 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error {
// details and manifest in a prior resumption.
//
// TODO(adityamaru): Break this code block into helper methods.
insqlDB := p.ExecCfg().InternalDB
if details.URI == "" {
initialDetails := details
if err := p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
if err := insqlDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
backupDetails, m, err := getBackupDetailAndManifest(
ctx, p.ExecCfg(), txn, details, p.User(), backupDest,
)
@@ -538,9 +546,12 @@
details.ProtectedTimestampRecord = &protectedtsID

if details.ProtectedTimestampRecord != nil {
if err := p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
if err := insqlDB.Txn(ctx, func(
ctx context.Context, txn isql.Txn,
) error {
ptp := p.ExecCfg().ProtectedTimestampProvider.WithTxn(txn)
return protectTimestampForBackup(
ctx, p.ExecCfg(), txn, b.job.ID(), backupManifest, details,
ctx, b.job.ID(), ptp, backupManifest, details,
)
}); err != nil {
return err
@@ -562,8 +573,8 @@
return err
}

if err := p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
return planSchedulePTSChaining(ctx, p.ExecCfg(), txn, &details, b.job.CreatedBy())
if err := insqlDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
return planSchedulePTSChaining(ctx, p.ExecCfg().JobsKnobs(), txn, &details, b.job.CreatedBy())
}); err != nil {
return err
}
@@ -586,7 +597,7 @@
// Update the job payload (non-volatile job definition) once, with the now
// resolved destination, updated description, etc. If we resume again we'll
// skip this whole block so this isn't an excessive update of payload.
if err := b.job.Update(ctx, nil, func(txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
if err := b.job.NoTxn().Update(ctx, func(txn isql.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
if err := md.CheckRunningOrReverting(); err != nil {
return err
}
@@ -732,10 +743,12 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error {
}

if details.ProtectedTimestampRecord != nil && !b.testingKnobs.ignoreProtectedTimestamps {
if err := p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
if err := p.ExecCfg().InternalDB.Txn(ctx, func(
ctx context.Context, txn isql.Txn,
) error {
details := b.job.Details().(jobspb.BackupDetails)
return releaseProtectedTimestamp(ctx, txn, p.ExecCfg().ProtectedTimestampProvider,
details.ProtectedTimestampRecord)
pts := p.ExecCfg().ProtectedTimestampProvider.WithTxn(txn)
return releaseProtectedTimestamp(ctx, pts, details.ProtectedTimestampRecord)
}); err != nil {
log.Errorf(ctx, "failed to release protected timestamp: %v", err)
}
@@ -807,7 +820,9 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error {
logJobCompletion(ctx, b.getTelemetryEventType(), b.job.ID(), true, nil)
}

return b.maybeNotifyScheduledJobCompletion(ctx, jobs.StatusSucceeded, p.ExecCfg())
return b.maybeNotifyScheduledJobCompletion(
ctx, jobs.StatusSucceeded, p.ExecCfg().JobsKnobs(), p.ExecCfg().InternalDB,
)
}

// ReportResults implements JobResultsReporter interface.
@@ -830,15 +845,19 @@ func (b *backupResumer) ReportResults(ctx context.Context, resultsCh chan<- tree
func getBackupDetailAndManifest(
ctx context.Context,
execCfg *sql.ExecutorConfig,
txn *kv.Txn,
txn isql.Txn,
initialDetails jobspb.BackupDetails,
user username.SQLUsername,
backupDestination backupdest.ResolvedDestination,
) (jobspb.BackupDetails, backuppb.BackupManifest, error) {
makeCloudStorage := execCfg.DistSQLSrv.ExternalStorageFromURI

kmsEnv := backupencryption.MakeBackupKMSEnv(execCfg.Settings, &execCfg.ExternalIODirConfig,
execCfg.DB, user, execCfg.InternalExecutor)
kmsEnv := backupencryption.MakeBackupKMSEnv(
execCfg.Settings,
&execCfg.ExternalIODirConfig,
execCfg.InternalDB,
user,
)

mem := execCfg.RootMemoryMonitor.MakeBoundAccount()
defer mem.Close(ctx)
@@ -1004,22 +1023,20 @@ func (b *backupResumer) readManifestOnResume(
}

func (b *backupResumer) maybeNotifyScheduledJobCompletion(
ctx context.Context, jobStatus jobs.Status, exec *sql.ExecutorConfig,
ctx context.Context, jobStatus jobs.Status, knobs *jobs.TestingKnobs, db isql.DB,
) error {
env := scheduledjobs.ProdJobSchedulerEnv
if knobs, ok := exec.DistSQLSrv.TestingKnobs.JobsTestingKnobs.(*jobs.TestingKnobs); ok {
if knobs.JobSchedulerEnv != nil {
env = knobs.JobSchedulerEnv
}
if knobs != nil && knobs.JobSchedulerEnv != nil {
env = knobs.JobSchedulerEnv
}

err := exec.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
err := db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
// We cannot rely on b.job containing created_by_id because on job
// resumption the registry does not populate the resumer's CreatedByInfo.
datums, err := exec.InternalExecutor.QueryRowEx(
datums, err := txn.QueryRowEx(
ctx,
"lookup-schedule-info",
txn,
txn.KV(),
sessiondata.NodeUserSessionDataOverride,
fmt.Sprintf(
"SELECT created_by_id FROM %s WHERE id=$1 AND created_by_type=$2",
Expand All @@ -1035,7 +1052,8 @@ func (b *backupResumer) maybeNotifyScheduledJobCompletion(

scheduleID := int64(tree.MustBeDInt(datums[0]))
if err := jobs.NotifyJobTermination(
ctx, env, b.job.ID(), jobStatus, b.job.Details(), scheduleID, exec.InternalExecutor, txn); err != nil {
ctx, txn, env, b.job.ID(), jobStatus, b.job.Details(), scheduleID,
); err != nil {
return errors.Wrapf(err,
"failed to notify schedule %d of completion of job %d", scheduleID, b.job.ID())
}
@@ -1056,19 +1074,20 @@
p := execCtx.(sql.JobExecContext)
cfg := p.ExecCfg()
b.deleteCheckpoint(ctx, cfg, p.User())
if err := cfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
if err := cfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
details := b.job.Details().(jobspb.BackupDetails)
return releaseProtectedTimestamp(ctx, txn, cfg.ProtectedTimestampProvider,
details.ProtectedTimestampRecord)
pts := cfg.ProtectedTimestampProvider.WithTxn(txn)
return releaseProtectedTimestamp(ctx, pts, details.ProtectedTimestampRecord)
}); err != nil {
return err
}

// This should never return an error unless resolving the schedule that the
// job is being run under fails. This could happen if the schedule is dropped
// while the job is executing.
if err := b.maybeNotifyScheduledJobCompletion(ctx, jobs.StatusFailed,
execCtx.(sql.JobExecContext).ExecCfg()); err != nil {
if err := b.maybeNotifyScheduledJobCompletion(
ctx, jobs.StatusFailed, cfg.JobsKnobs(), cfg.InternalDB,
); err != nil {
log.Errorf(ctx, "failed to notify job %d on completion of OnFailOrCancel: %+v",
b.job.ID(), err)
}