Skip to content

Commit

Permalink
*,isql: introduce isql.Txn and isql.DB
Browse files Browse the repository at this point in the history
This massive refactor works to bind the `*kv.Txn` with the internal
executor and other sql-layer, txn-associated state. This work follows on from
an earlier project to more tightly couple internal executors to the rest of
`extraTxnState`. That project resulted in sprawling changes to propagate the
paired dependencies through the system. In practice, we're better off coupling
them through an object.

There are some refactors added in here to curry and hide some of these
dependencies from interfaces. It may be possible to extract those into separate changes.

Additionally, not all of the dependency sprawl has been eliminated; there are
cases where we could pass an `isql.Txn` but instead keep passing the underlying
`isql.Executor` and `*kv.Txn`. We can do more cleanup along the way.

Lastly, I couldn't help lifting some `sql.ExecCfg` arguments up and
being more specific in some places.

Epic: none

Release note: None
  • Loading branch information
ajwerner committed Jan 18, 2023
1 parent 4ec5a5f commit 9936310
Show file tree
Hide file tree
Showing 484 changed files with 6,523 additions and 6,657 deletions.
4 changes: 2 additions & 2 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -1581,6 +1581,7 @@ GO_TARGETS = [
"//pkg/sql/importer:importer_test",
"//pkg/sql/inverted:inverted",
"//pkg/sql/inverted:inverted_test",
"//pkg/sql/isql:isql",
"//pkg/sql/lex:lex",
"//pkg/sql/lex:lex_test",
"//pkg/sql/lexbase/allkeywords:allkeywords",
Expand Down Expand Up @@ -1856,7 +1857,6 @@ GO_TARGETS = [
"//pkg/sql/sqltelemetry:sqltelemetry",
"//pkg/sql/sqltestutils:sqltestutils",
"//pkg/sql/sqltestutils:sqltestutils_test",
"//pkg/sql/sqlutil:sqlutil",
"//pkg/sql/stats/bounds:bounds",
"//pkg/sql/stats:stats",
"//pkg/sql/stats:stats_test",
Expand Down Expand Up @@ -2798,6 +2798,7 @@ GET_X_DATA_TARGETS = [
"//pkg/sql/idxusage:get_x_data",
"//pkg/sql/importer:get_x_data",
"//pkg/sql/inverted:get_x_data",
"//pkg/sql/isql:get_x_data",
"//pkg/sql/lex:get_x_data",
"//pkg/sql/lexbase:get_x_data",
"//pkg/sql/lexbase/allkeywords:get_x_data",
Expand Down Expand Up @@ -2978,7 +2979,6 @@ GET_X_DATA_TARGETS = [
"//pkg/sql/sqlstats/ssmemstorage:get_x_data",
"//pkg/sql/sqltelemetry:get_x_data",
"//pkg/sql/sqltestutils:get_x_data",
"//pkg/sql/sqlutil:get_x_data",
"//pkg/sql/stats:get_x_data",
"//pkg/sql/stats/bounds:get_x_data",
"//pkg/sql/stmtdiagnostics:get_x_data",
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/backupccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ go_library(
"//pkg/sql/execinfra",
"//pkg/sql/execinfrapb",
"//pkg/sql/exprutil",
"//pkg/sql/isql",
"//pkg/sql/parser",
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/pgwire/pgerror",
Expand All @@ -115,7 +116,6 @@ go_library(
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
"//pkg/sql/sqlerrors",
"//pkg/sql/sqlutil",
"//pkg/sql/stats",
"//pkg/sql/syntheticprivilege",
"//pkg/sql/types",
Expand Down Expand Up @@ -251,6 +251,7 @@ go_test(
"//pkg/sql/execinfra",
"//pkg/sql/execinfrapb",
"//pkg/sql/importer",
"//pkg/sql/isql",
"//pkg/sql/parser",
"//pkg/sql/pgwire/pgerror",
"//pkg/sql/randgen",
Expand All @@ -259,7 +260,6 @@ go_test(
"//pkg/sql/sem/eval",
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
"//pkg/sql/sqlutil",
"//pkg/sql/stats",
"//pkg/storage",
"//pkg/testutils",
Expand Down
5 changes: 3 additions & 2 deletions pkg/ccl/backupccl/alter_backup_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,9 @@ func doAlterBackupPlan(
}

ioConf := baseStore.ExternalIOConf()
kmsEnv := backupencryption.MakeBackupKMSEnv(baseStore.Settings(), &ioConf, p.ExecCfg().DB,
p.User(), p.ExecCfg().InternalExecutor)
kmsEnv := backupencryption.MakeBackupKMSEnv(
baseStore.Settings(), &ioConf, p.ExecCfg().InternalDB, p.User(),
)

// Check that at least one of the old keys has been used to encrypt the backup in the past.
// Use the first one that works to decrypt the ENCRYPTION-INFO file(s).
Expand Down
25 changes: 14 additions & 11 deletions pkg/ccl/backupccl/alter_backup_schedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,9 @@ func loadSchedules(
}

execCfg := p.ExecCfg()
env := sql.JobSchedulerEnv(execCfg)
schedule, err := jobs.LoadScheduledJob(ctx, env, int64(scheduleID), execCfg.InternalExecutor, p.Txn())
env := sql.JobSchedulerEnv(execCfg.JobsKnobs())
schedules := jobs.ScheduledJobTxn(p.InternalSQLTxn())
schedule, err := schedules.Load(ctx, env, int64(scheduleID))
if err != nil {
return s, err
}
Expand All @@ -74,7 +75,7 @@ func loadSchedules(
var dependentStmt *tree.Backup

if args.DependentScheduleID != 0 {
dependentSchedule, err = jobs.LoadScheduledJob(ctx, env, args.DependentScheduleID, execCfg.InternalExecutor, p.Txn())
dependentSchedule, err = schedules.Load(ctx, env, args.DependentScheduleID)
if err != nil {
return scheduleDetails{}, err
}
Expand Down Expand Up @@ -188,10 +189,11 @@ func doAlterBackupSchedules(
if err != nil {
return err
}
scheduledJobs := jobs.ScheduledJobTxn(p.InternalSQLTxn())
s.fullJob.SetExecutionDetails(
tree.ScheduledBackupExecutor.InternalName(),
jobspb.ExecutionArguments{Args: fullAny})
if err := s.fullJob.Update(ctx, p.ExecCfg().InternalExecutor, p.Txn()); err != nil {
if err := scheduledJobs.Update(ctx, s.fullJob); err != nil {
return err
}

Expand All @@ -204,7 +206,8 @@ func doAlterBackupSchedules(
s.incJob.SetExecutionDetails(
tree.ScheduledBackupExecutor.InternalName(),
jobspb.ExecutionArguments{Args: incAny})
if err := s.incJob.Update(ctx, p.ExecCfg().InternalExecutor, p.Txn()); err != nil {

if err := scheduledJobs.Update(ctx, s.incJob); err != nil {
return err
}

Expand Down Expand Up @@ -382,8 +385,8 @@ func processFullBackupRecurrence(
return s, nil
}

env := sql.JobSchedulerEnv(p.ExecCfg())
ex := p.ExecCfg().InternalExecutor
env := sql.JobSchedulerEnv(p.ExecCfg().JobsKnobs())
scheduledJobs := jobs.ScheduledJobTxn(p.InternalSQLTxn())
if fullBackupAlways {
if s.incJob == nil {
// Nothing to do.
Expand All @@ -396,7 +399,7 @@ func processFullBackupRecurrence(
}
s.fullArgs.DependentScheduleID = 0
s.fullArgs.UnpauseOnSuccess = 0
if err := s.incJob.Delete(ctx, ex, p.Txn()); err != nil {
if err := scheduledJobs.Delete(ctx, s.incJob); err != nil {
return scheduleDetails{}, err
}
s.incJob = nil
Expand Down Expand Up @@ -453,7 +456,7 @@ func processFullBackupRecurrence(
tree.ScheduledBackupExecutor.InternalName(),
jobspb.ExecutionArguments{Args: incAny})

if err := s.incJob.Create(ctx, ex, p.Txn()); err != nil {
if err := scheduledJobs.Create(ctx, s.incJob); err != nil {
return scheduleDetails{}, err
}
s.fullArgs.UnpauseOnSuccess = s.incJob.ScheduleID()
Expand All @@ -480,7 +483,7 @@ func validateFullIncrementalFrequencies(p sql.PlanHookState, s scheduleDetails)
if s.incJob == nil {
return nil
}
env := sql.JobSchedulerEnv(p.ExecCfg())
env := sql.JobSchedulerEnv(p.ExecCfg().JobsKnobs())
now := env.Now()

fullFreq, err := frequencyFromCron(now, s.fullJob.ScheduleExpr())
Expand Down Expand Up @@ -536,7 +539,7 @@ func processInto(p sql.PlanHookState, spec *alterBackupScheduleSpec, s scheduleD

// Kick off a full backup immediately so we can unpause incrementals.
// This mirrors the behavior of CREATE SCHEDULE FOR BACKUP.
env := sql.JobSchedulerEnv(p.ExecCfg())
env := sql.JobSchedulerEnv(p.ExecCfg().JobsKnobs())
s.fullJob.SetNextRun(env.Now())

return nil
Expand Down
89 changes: 54 additions & 35 deletions pkg/ccl/backupccl/backup_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/joberror"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/scheduledjobs"
Expand All @@ -44,6 +43,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/stats"
Expand Down Expand Up @@ -140,9 +140,12 @@ func backup(
var lastCheckpoint time.Time

var completedSpans, completedIntroducedSpans []roachpb.Span
kmsEnv := backupencryption.MakeBackupKMSEnv(execCtx.ExecCfg().Settings,
&execCtx.ExecCfg().ExternalIODirConfig, execCtx.ExecCfg().DB, execCtx.User(),
execCtx.ExecCfg().InternalExecutor)
kmsEnv := backupencryption.MakeBackupKMSEnv(
execCtx.ExecCfg().Settings,
&execCtx.ExecCfg().ExternalIODirConfig,
execCtx.ExecCfg().InternalDB,
execCtx.User(),
)
// TODO(benesch): verify these files, rather than accepting them as truth
// blindly.
// No concurrency yet, so these assignments are safe.
Expand Down Expand Up @@ -369,13 +372,13 @@ func backup(
}

func releaseProtectedTimestamp(
ctx context.Context, txn *kv.Txn, pts protectedts.Storage, ptsID *uuid.UUID,
ctx context.Context, pts protectedts.Storage, ptsID *uuid.UUID,
) error {
// If the job doesn't have a protected timestamp then there's nothing to do.
if ptsID == nil {
return nil
}
err := pts.Release(ctx, txn, *ptsID)
err := pts.Release(ctx, *ptsID)
if errors.Is(err, protectedts.ErrNotExists) {
// No reason to return an error which might cause problems if it doesn't
// seem to exist.
Expand Down Expand Up @@ -447,8 +450,12 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error {
// The span is finished by the registry executing the job.
details := b.job.Details().(jobspb.BackupDetails)
p := execCtx.(sql.JobExecContext)
kmsEnv := backupencryption.MakeBackupKMSEnv(p.ExecCfg().Settings,
&p.ExecCfg().ExternalIODirConfig, p.ExecCfg().DB, p.User(), p.ExecCfg().InternalExecutor)
kmsEnv := backupencryption.MakeBackupKMSEnv(
p.ExecCfg().Settings,
&p.ExecCfg().ExternalIODirConfig,
p.ExecCfg().InternalDB,
p.User(),
)

// Resolve the backup destination. We can skip this step if we
// have already resolved and persisted the destination either
Expand Down Expand Up @@ -511,9 +518,10 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error {
// details and manifest in a prior resumption.
//
// TODO(adityamaru): Break this code block into helper methods.
insqlDB := p.ExecCfg().InternalDB
if details.URI == "" {
initialDetails := details
if err := p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
if err := insqlDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
backupDetails, m, err := getBackupDetailAndManifest(
ctx, p.ExecCfg(), txn, details, p.User(), backupDest,
)
Expand All @@ -538,9 +546,12 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error {
details.ProtectedTimestampRecord = &protectedtsID

if details.ProtectedTimestampRecord != nil {
if err := p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
if err := insqlDB.Txn(ctx, func(
ctx context.Context, txn isql.Txn,
) error {
ptp := p.ExecCfg().ProtectedTimestampProvider.WithTxn(txn)
return protectTimestampForBackup(
ctx, p.ExecCfg(), txn, b.job.ID(), backupManifest, details,
ctx, b.job.ID(), ptp, backupManifest, details,
)
}); err != nil {
return err
Expand All @@ -562,8 +573,8 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error {
return err
}

if err := p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
return planSchedulePTSChaining(ctx, p.ExecCfg(), txn, &details, b.job.CreatedBy())
if err := insqlDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
return planSchedulePTSChaining(ctx, p.ExecCfg().JobsKnobs(), txn, &details, b.job.CreatedBy())
}); err != nil {
return err
}
Expand All @@ -586,7 +597,7 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error {
// Update the job payload (non-volatile job definition) once, with the now
// resolved destination, updated description, etc. If we resume again we'll
// skip this whole block so this isn't an excessive update of payload.
if err := b.job.Update(ctx, nil, func(txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
if err := b.job.NoTxn().Update(ctx, func(txn isql.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
if err := md.CheckRunningOrReverting(); err != nil {
return err
}
Expand Down Expand Up @@ -732,10 +743,12 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error {
}

if details.ProtectedTimestampRecord != nil && !b.testingKnobs.ignoreProtectedTimestamps {
if err := p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
if err := p.ExecCfg().InternalDB.Txn(ctx, func(
ctx context.Context, txn isql.Txn,
) error {
details := b.job.Details().(jobspb.BackupDetails)
return releaseProtectedTimestamp(ctx, txn, p.ExecCfg().ProtectedTimestampProvider,
details.ProtectedTimestampRecord)
pts := p.ExecCfg().ProtectedTimestampProvider.WithTxn(txn)
return releaseProtectedTimestamp(ctx, pts, details.ProtectedTimestampRecord)
}); err != nil {
log.Errorf(ctx, "failed to release protected timestamp: %v", err)
}
Expand Down Expand Up @@ -807,7 +820,9 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error {
logJobCompletion(ctx, b.getTelemetryEventType(), b.job.ID(), true, nil)
}

return b.maybeNotifyScheduledJobCompletion(ctx, jobs.StatusSucceeded, p.ExecCfg())
return b.maybeNotifyScheduledJobCompletion(
ctx, jobs.StatusSucceeded, p.ExecCfg().JobsKnobs(), p.ExecCfg().InternalDB,
)
}

// ReportResults implements JobResultsReporter interface.
Expand All @@ -830,15 +845,19 @@ func (b *backupResumer) ReportResults(ctx context.Context, resultsCh chan<- tree
func getBackupDetailAndManifest(
ctx context.Context,
execCfg *sql.ExecutorConfig,
txn *kv.Txn,
txn isql.Txn,
initialDetails jobspb.BackupDetails,
user username.SQLUsername,
backupDestination backupdest.ResolvedDestination,
) (jobspb.BackupDetails, backuppb.BackupManifest, error) {
makeCloudStorage := execCfg.DistSQLSrv.ExternalStorageFromURI

kmsEnv := backupencryption.MakeBackupKMSEnv(execCfg.Settings, &execCfg.ExternalIODirConfig,
execCfg.DB, user, execCfg.InternalExecutor)
kmsEnv := backupencryption.MakeBackupKMSEnv(
execCfg.Settings,
&execCfg.ExternalIODirConfig,
execCfg.InternalDB,
user,
)

mem := execCfg.RootMemoryMonitor.MakeBoundAccount()
defer mem.Close(ctx)
Expand Down Expand Up @@ -1004,22 +1023,20 @@ func (b *backupResumer) readManifestOnResume(
}

func (b *backupResumer) maybeNotifyScheduledJobCompletion(
ctx context.Context, jobStatus jobs.Status, exec *sql.ExecutorConfig,
ctx context.Context, jobStatus jobs.Status, knobs *jobs.TestingKnobs, db isql.DB,
) error {
env := scheduledjobs.ProdJobSchedulerEnv
if knobs, ok := exec.DistSQLSrv.TestingKnobs.JobsTestingKnobs.(*jobs.TestingKnobs); ok {
if knobs.JobSchedulerEnv != nil {
env = knobs.JobSchedulerEnv
}
if knobs != nil && knobs.JobSchedulerEnv != nil {
env = knobs.JobSchedulerEnv
}

err := exec.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
err := db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
// We cannot rely on b.job containing created_by_id because on job
// resumption the registry does not populate the resumer's CreatedByInfo.
datums, err := exec.InternalExecutor.QueryRowEx(
datums, err := txn.QueryRowEx(
ctx,
"lookup-schedule-info",
txn,
txn.KV(),
sessiondata.NodeUserSessionDataOverride,
fmt.Sprintf(
"SELECT created_by_id FROM %s WHERE id=$1 AND created_by_type=$2",
Expand All @@ -1035,7 +1052,8 @@ func (b *backupResumer) maybeNotifyScheduledJobCompletion(

scheduleID := int64(tree.MustBeDInt(datums[0]))
if err := jobs.NotifyJobTermination(
ctx, env, b.job.ID(), jobStatus, b.job.Details(), scheduleID, exec.InternalExecutor, txn); err != nil {
ctx, txn, env, b.job.ID(), jobStatus, b.job.Details(), scheduleID,
); err != nil {
return errors.Wrapf(err,
"failed to notify schedule %d of completion of job %d", scheduleID, b.job.ID())
}
Expand All @@ -1056,19 +1074,20 @@ func (b *backupResumer) OnFailOrCancel(
p := execCtx.(sql.JobExecContext)
cfg := p.ExecCfg()
b.deleteCheckpoint(ctx, cfg, p.User())
if err := cfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
if err := cfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
details := b.job.Details().(jobspb.BackupDetails)
return releaseProtectedTimestamp(ctx, txn, cfg.ProtectedTimestampProvider,
details.ProtectedTimestampRecord)
pts := cfg.ProtectedTimestampProvider.WithTxn(txn)
return releaseProtectedTimestamp(ctx, pts, details.ProtectedTimestampRecord)
}); err != nil {
return err
}

// This should never return an error unless resolving the schedule that the
// job is being run under fails. This could happen if the schedule is dropped
// while the job is executing.
if err := b.maybeNotifyScheduledJobCompletion(ctx, jobs.StatusFailed,
execCtx.(sql.JobExecContext).ExecCfg()); err != nil {
if err := b.maybeNotifyScheduledJobCompletion(
ctx, jobs.StatusFailed, cfg.JobsKnobs(), cfg.InternalDB,
); err != nil {
log.Errorf(ctx, "failed to notify job %d on completion of OnFailOrCancel: %+v",
b.job.ID(), err)
}
Expand Down
Loading

0 comments on commit 9936310

Please sign in to comment.