Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*,isql: introduce isql.Txn and isql.DB #93218

Merged
merged 1 commit into from
Jan 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
4 changes: 2 additions & 2 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -1581,6 +1581,7 @@ GO_TARGETS = [
"//pkg/sql/importer:importer_test",
"//pkg/sql/inverted:inverted",
"//pkg/sql/inverted:inverted_test",
"//pkg/sql/isql:isql",
"//pkg/sql/lex:lex",
"//pkg/sql/lex:lex_test",
"//pkg/sql/lexbase/allkeywords:allkeywords",
Expand Down Expand Up @@ -1856,7 +1857,6 @@ GO_TARGETS = [
"//pkg/sql/sqltelemetry:sqltelemetry",
"//pkg/sql/sqltestutils:sqltestutils",
"//pkg/sql/sqltestutils:sqltestutils_test",
"//pkg/sql/sqlutil:sqlutil",
"//pkg/sql/stats/bounds:bounds",
"//pkg/sql/stats:stats",
"//pkg/sql/stats:stats_test",
Expand Down Expand Up @@ -2798,6 +2798,7 @@ GET_X_DATA_TARGETS = [
"//pkg/sql/idxusage:get_x_data",
"//pkg/sql/importer:get_x_data",
"//pkg/sql/inverted:get_x_data",
"//pkg/sql/isql:get_x_data",
"//pkg/sql/lex:get_x_data",
"//pkg/sql/lexbase:get_x_data",
"//pkg/sql/lexbase/allkeywords:get_x_data",
Expand Down Expand Up @@ -2978,7 +2979,6 @@ GET_X_DATA_TARGETS = [
"//pkg/sql/sqlstats/ssmemstorage:get_x_data",
"//pkg/sql/sqltelemetry:get_x_data",
"//pkg/sql/sqltestutils:get_x_data",
"//pkg/sql/sqlutil:get_x_data",
"//pkg/sql/stats:get_x_data",
"//pkg/sql/stats/bounds:get_x_data",
"//pkg/sql/stmtdiagnostics:get_x_data",
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/backupccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ go_library(
"//pkg/sql/execinfra",
"//pkg/sql/execinfrapb",
"//pkg/sql/exprutil",
"//pkg/sql/isql",
"//pkg/sql/parser",
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/pgwire/pgerror",
Expand All @@ -115,7 +116,6 @@ go_library(
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
"//pkg/sql/sqlerrors",
"//pkg/sql/sqlutil",
"//pkg/sql/stats",
"//pkg/sql/syntheticprivilege",
"//pkg/sql/types",
Expand Down Expand Up @@ -251,6 +251,7 @@ go_test(
"//pkg/sql/execinfra",
"//pkg/sql/execinfrapb",
"//pkg/sql/importer",
"//pkg/sql/isql",
"//pkg/sql/parser",
"//pkg/sql/pgwire/pgerror",
"//pkg/sql/randgen",
Expand All @@ -259,7 +260,6 @@ go_test(
"//pkg/sql/sem/eval",
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
"//pkg/sql/sqlutil",
"//pkg/sql/stats",
"//pkg/storage",
"//pkg/testutils",
Expand Down
5 changes: 3 additions & 2 deletions pkg/ccl/backupccl/alter_backup_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,9 @@ func doAlterBackupPlan(
}

ioConf := baseStore.ExternalIOConf()
kmsEnv := backupencryption.MakeBackupKMSEnv(baseStore.Settings(), &ioConf, p.ExecCfg().DB,
p.User(), p.ExecCfg().InternalExecutor)
kmsEnv := backupencryption.MakeBackupKMSEnv(
baseStore.Settings(), &ioConf, p.ExecCfg().InternalDB, p.User(),
)

// Check that at least one of the old keys has been used to encrypt the backup in the past.
// Use the first one that works to decrypt the ENCRYPTION-INFO file(s).
Expand Down
25 changes: 14 additions & 11 deletions pkg/ccl/backupccl/alter_backup_schedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,9 @@ func loadSchedules(
}

execCfg := p.ExecCfg()
env := sql.JobSchedulerEnv(execCfg)
schedule, err := jobs.LoadScheduledJob(ctx, env, int64(scheduleID), execCfg.InternalExecutor, p.Txn())
env := sql.JobSchedulerEnv(execCfg.JobsKnobs())
schedules := jobs.ScheduledJobTxn(p.InternalSQLTxn())
schedule, err := schedules.Load(ctx, env, int64(scheduleID))
if err != nil {
return s, err
}
Expand All @@ -74,7 +75,7 @@ func loadSchedules(
var dependentStmt *tree.Backup

if args.DependentScheduleID != 0 {
dependentSchedule, err = jobs.LoadScheduledJob(ctx, env, args.DependentScheduleID, execCfg.InternalExecutor, p.Txn())
dependentSchedule, err = schedules.Load(ctx, env, args.DependentScheduleID)
if err != nil {
return scheduleDetails{}, err
}
Expand Down Expand Up @@ -188,10 +189,11 @@ func doAlterBackupSchedules(
if err != nil {
return err
}
scheduledJobs := jobs.ScheduledJobTxn(p.InternalSQLTxn())
s.fullJob.SetExecutionDetails(
tree.ScheduledBackupExecutor.InternalName(),
jobspb.ExecutionArguments{Args: fullAny})
if err := s.fullJob.Update(ctx, p.ExecCfg().InternalExecutor, p.Txn()); err != nil {
if err := scheduledJobs.Update(ctx, s.fullJob); err != nil {
return err
}

Expand All @@ -204,7 +206,8 @@ func doAlterBackupSchedules(
s.incJob.SetExecutionDetails(
tree.ScheduledBackupExecutor.InternalName(),
jobspb.ExecutionArguments{Args: incAny})
if err := s.incJob.Update(ctx, p.ExecCfg().InternalExecutor, p.Txn()); err != nil {

if err := scheduledJobs.Update(ctx, s.incJob); err != nil {
return err
}

Expand Down Expand Up @@ -382,8 +385,8 @@ func processFullBackupRecurrence(
return s, nil
}

env := sql.JobSchedulerEnv(p.ExecCfg())
ex := p.ExecCfg().InternalExecutor
env := sql.JobSchedulerEnv(p.ExecCfg().JobsKnobs())
scheduledJobs := jobs.ScheduledJobTxn(p.InternalSQLTxn())
if fullBackupAlways {
if s.incJob == nil {
// Nothing to do.
Expand All @@ -396,7 +399,7 @@ func processFullBackupRecurrence(
}
s.fullArgs.DependentScheduleID = 0
s.fullArgs.UnpauseOnSuccess = 0
if err := s.incJob.Delete(ctx, ex, p.Txn()); err != nil {
if err := scheduledJobs.Delete(ctx, s.incJob); err != nil {
return scheduleDetails{}, err
}
s.incJob = nil
Expand Down Expand Up @@ -453,7 +456,7 @@ func processFullBackupRecurrence(
tree.ScheduledBackupExecutor.InternalName(),
jobspb.ExecutionArguments{Args: incAny})

if err := s.incJob.Create(ctx, ex, p.Txn()); err != nil {
if err := scheduledJobs.Create(ctx, s.incJob); err != nil {
return scheduleDetails{}, err
}
s.fullArgs.UnpauseOnSuccess = s.incJob.ScheduleID()
Expand All @@ -480,7 +483,7 @@ func validateFullIncrementalFrequencies(p sql.PlanHookState, s scheduleDetails)
if s.incJob == nil {
return nil
}
env := sql.JobSchedulerEnv(p.ExecCfg())
env := sql.JobSchedulerEnv(p.ExecCfg().JobsKnobs())
now := env.Now()

fullFreq, err := frequencyFromCron(now, s.fullJob.ScheduleExpr())
Expand Down Expand Up @@ -536,7 +539,7 @@ func processInto(p sql.PlanHookState, spec *alterBackupScheduleSpec, s scheduleD

// Kick off a full backup immediately so we can unpause incrementals.
// This mirrors the behavior of CREATE SCHEDULE FOR BACKUP.
env := sql.JobSchedulerEnv(p.ExecCfg())
env := sql.JobSchedulerEnv(p.ExecCfg().JobsKnobs())
s.fullJob.SetNextRun(env.Now())

return nil
Expand Down
89 changes: 54 additions & 35 deletions pkg/ccl/backupccl/backup_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/joberror"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/scheduledjobs"
Expand All @@ -44,6 +43,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/stats"
Expand Down Expand Up @@ -140,9 +140,12 @@ func backup(
var lastCheckpoint time.Time

var completedSpans, completedIntroducedSpans []roachpb.Span
kmsEnv := backupencryption.MakeBackupKMSEnv(execCtx.ExecCfg().Settings,
&execCtx.ExecCfg().ExternalIODirConfig, execCtx.ExecCfg().DB, execCtx.User(),
execCtx.ExecCfg().InternalExecutor)
kmsEnv := backupencryption.MakeBackupKMSEnv(
execCtx.ExecCfg().Settings,
&execCtx.ExecCfg().ExternalIODirConfig,
execCtx.ExecCfg().InternalDB,
execCtx.User(),
)
// TODO(benesch): verify these files, rather than accepting them as truth
// blindly.
// No concurrency yet, so these assignments are safe.
Expand Down Expand Up @@ -369,13 +372,13 @@ func backup(
}

func releaseProtectedTimestamp(
ctx context.Context, txn *kv.Txn, pts protectedts.Storage, ptsID *uuid.UUID,
ctx context.Context, pts protectedts.Storage, ptsID *uuid.UUID,
) error {
// If the job doesn't have a protected timestamp then there's nothing to do.
if ptsID == nil {
return nil
}
err := pts.Release(ctx, txn, *ptsID)
err := pts.Release(ctx, *ptsID)
if errors.Is(err, protectedts.ErrNotExists) {
// No reason to return an error which might cause problems if it doesn't
// seem to exist.
Expand Down Expand Up @@ -447,8 +450,12 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error {
// The span is finished by the registry executing the job.
details := b.job.Details().(jobspb.BackupDetails)
p := execCtx.(sql.JobExecContext)
kmsEnv := backupencryption.MakeBackupKMSEnv(p.ExecCfg().Settings,
&p.ExecCfg().ExternalIODirConfig, p.ExecCfg().DB, p.User(), p.ExecCfg().InternalExecutor)
kmsEnv := backupencryption.MakeBackupKMSEnv(
p.ExecCfg().Settings,
&p.ExecCfg().ExternalIODirConfig,
p.ExecCfg().InternalDB,
p.User(),
)

// Resolve the backup destination. We can skip this step if we
// have already resolved and persisted the destination either
Expand Down Expand Up @@ -511,9 +518,10 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error {
// details and manifest in a prior resumption.
//
// TODO(adityamaru: Break this code block into helper methods.
insqlDB := p.ExecCfg().InternalDB
if details.URI == "" {
initialDetails := details
if err := p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
if err := insqlDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
backupDetails, m, err := getBackupDetailAndManifest(
ctx, p.ExecCfg(), txn, details, p.User(), backupDest,
)
Expand All @@ -538,9 +546,12 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error {
details.ProtectedTimestampRecord = &protectedtsID

if details.ProtectedTimestampRecord != nil {
if err := p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
if err := insqlDB.Txn(ctx, func(
ctx context.Context, txn isql.Txn,
) error {
ptp := p.ExecCfg().ProtectedTimestampProvider.WithTxn(txn)
return protectTimestampForBackup(
ctx, p.ExecCfg(), txn, b.job.ID(), backupManifest, details,
ctx, b.job.ID(), ptp, backupManifest, details,
)
}); err != nil {
return err
Expand All @@ -562,8 +573,8 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error {
return err
}

if err := p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
return planSchedulePTSChaining(ctx, p.ExecCfg(), txn, &details, b.job.CreatedBy())
if err := insqlDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
return planSchedulePTSChaining(ctx, p.ExecCfg().JobsKnobs(), txn, &details, b.job.CreatedBy())
}); err != nil {
return err
}
Expand All @@ -586,7 +597,7 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error {
// Update the job payload (non-volatile job definition) once, with the now
// resolved destination, updated description, etc. If we resume again we'll
// skip this whole block so this isn't an excessive update of payload.
if err := b.job.Update(ctx, nil, func(txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
if err := b.job.NoTxn().Update(ctx, func(txn isql.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
if err := md.CheckRunningOrReverting(); err != nil {
return err
}
Expand Down Expand Up @@ -732,10 +743,12 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error {
}

if details.ProtectedTimestampRecord != nil && !b.testingKnobs.ignoreProtectedTimestamps {
if err := p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
if err := p.ExecCfg().InternalDB.Txn(ctx, func(
ctx context.Context, txn isql.Txn,
) error {
details := b.job.Details().(jobspb.BackupDetails)
return releaseProtectedTimestamp(ctx, txn, p.ExecCfg().ProtectedTimestampProvider,
details.ProtectedTimestampRecord)
pts := p.ExecCfg().ProtectedTimestampProvider.WithTxn(txn)
return releaseProtectedTimestamp(ctx, pts, details.ProtectedTimestampRecord)
}); err != nil {
log.Errorf(ctx, "failed to release protected timestamp: %v", err)
}
Expand Down Expand Up @@ -807,7 +820,9 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error {
logJobCompletion(ctx, b.getTelemetryEventType(), b.job.ID(), true, nil)
}

return b.maybeNotifyScheduledJobCompletion(ctx, jobs.StatusSucceeded, p.ExecCfg())
return b.maybeNotifyScheduledJobCompletion(
ctx, jobs.StatusSucceeded, p.ExecCfg().JobsKnobs(), p.ExecCfg().InternalDB,
)
}

// ReportResults implements JobResultsReporter interface.
Expand All @@ -830,15 +845,19 @@ func (b *backupResumer) ReportResults(ctx context.Context, resultsCh chan<- tree
func getBackupDetailAndManifest(
ctx context.Context,
execCfg *sql.ExecutorConfig,
txn *kv.Txn,
txn isql.Txn,
initialDetails jobspb.BackupDetails,
user username.SQLUsername,
backupDestination backupdest.ResolvedDestination,
) (jobspb.BackupDetails, backuppb.BackupManifest, error) {
makeCloudStorage := execCfg.DistSQLSrv.ExternalStorageFromURI

kmsEnv := backupencryption.MakeBackupKMSEnv(execCfg.Settings, &execCfg.ExternalIODirConfig,
execCfg.DB, user, execCfg.InternalExecutor)
kmsEnv := backupencryption.MakeBackupKMSEnv(
execCfg.Settings,
&execCfg.ExternalIODirConfig,
execCfg.InternalDB,
user,
)

mem := execCfg.RootMemoryMonitor.MakeBoundAccount()
defer mem.Close(ctx)
Expand Down Expand Up @@ -1004,22 +1023,20 @@ func (b *backupResumer) readManifestOnResume(
}

func (b *backupResumer) maybeNotifyScheduledJobCompletion(
ctx context.Context, jobStatus jobs.Status, exec *sql.ExecutorConfig,
ctx context.Context, jobStatus jobs.Status, knobs *jobs.TestingKnobs, db isql.DB,
) error {
env := scheduledjobs.ProdJobSchedulerEnv
if knobs, ok := exec.DistSQLSrv.TestingKnobs.JobsTestingKnobs.(*jobs.TestingKnobs); ok {
if knobs.JobSchedulerEnv != nil {
env = knobs.JobSchedulerEnv
}
if knobs != nil && knobs.JobSchedulerEnv != nil {
env = knobs.JobSchedulerEnv
}

err := exec.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
err := db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
// We cannot rely on b.job containing created_by_id because on job
// resumption the registry does not populate the resumer's CreatedByInfo.
datums, err := exec.InternalExecutor.QueryRowEx(
datums, err := txn.QueryRowEx(
ctx,
"lookup-schedule-info",
txn,
txn.KV(),
sessiondata.NodeUserSessionDataOverride,
fmt.Sprintf(
"SELECT created_by_id FROM %s WHERE id=$1 AND created_by_type=$2",
Expand All @@ -1035,7 +1052,8 @@ func (b *backupResumer) maybeNotifyScheduledJobCompletion(

scheduleID := int64(tree.MustBeDInt(datums[0]))
if err := jobs.NotifyJobTermination(
ctx, env, b.job.ID(), jobStatus, b.job.Details(), scheduleID, exec.InternalExecutor, txn); err != nil {
ctx, txn, env, b.job.ID(), jobStatus, b.job.Details(), scheduleID,
); err != nil {
return errors.Wrapf(err,
"failed to notify schedule %d of completion of job %d", scheduleID, b.job.ID())
}
Expand All @@ -1056,19 +1074,20 @@ func (b *backupResumer) OnFailOrCancel(
p := execCtx.(sql.JobExecContext)
cfg := p.ExecCfg()
b.deleteCheckpoint(ctx, cfg, p.User())
if err := cfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
if err := cfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
details := b.job.Details().(jobspb.BackupDetails)
return releaseProtectedTimestamp(ctx, txn, cfg.ProtectedTimestampProvider,
details.ProtectedTimestampRecord)
pts := cfg.ProtectedTimestampProvider.WithTxn(txn)
return releaseProtectedTimestamp(ctx, pts, details.ProtectedTimestampRecord)
}); err != nil {
return err
}

// This should never return an error unless resolving the schedule that the
// job is being run under fails. This could happen if the schedule is dropped
// while the job is executing.
if err := b.maybeNotifyScheduledJobCompletion(ctx, jobs.StatusFailed,
execCtx.(sql.JobExecContext).ExecCfg()); err != nil {
if err := b.maybeNotifyScheduledJobCompletion(
ctx, jobs.StatusFailed, cfg.JobsKnobs(), cfg.InternalDB,
); err != nil {
log.Errorf(ctx, "failed to notify job %d on completion of OnFailOrCancel: %+v",
b.job.ID(), err)
}
Expand Down
Loading