Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ptstorage, *: deprecate storage.ex with txn-bound internal executor #91404

Closed
wants to merge 11 commits into from
Closed
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 12 additions & 7 deletions pkg/ccl/backupccl/backup_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/sql/stats"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
Expand Down Expand Up @@ -370,13 +371,17 @@ func backup(
}

func releaseProtectedTimestamp(
ctx context.Context, txn *kv.Txn, pts protectedts.Storage, ptsID *uuid.UUID,
ctx context.Context,
txn *kv.Txn,
ie sqlutil.InternalExecutor,
pts protectedts.Storage,
ptsID *uuid.UUID,
) error {
// If the job doesn't have a protected timestamp then there's nothing to do.
if ptsID == nil {
return nil
}
err := pts.Release(ctx, txn, *ptsID)
err := pts.Release(ctx, txn, ie, *ptsID)
if errors.Is(err, protectedts.ErrNotExists) {
// No reason to return an error which might cause problems if it doesn't
// seem to exist.
Expand Down Expand Up @@ -543,7 +548,7 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error {
// Update the job payload (non-volatile job definition) once, with the now
// resolved destination, updated description, etc. If we resume again we'll
// skip this whole block so this isn't an excessive update of payload.
if err := b.job.Update(ctx, nil, func(txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
if err := b.job.Update(ctx, nil /* txn */, nil /* ie */, func(_ *kv.Txn, _ sqlutil.InternalExecutor, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
if err := md.CheckRunningOrReverting(); err != nil {
return err
}
Expand Down Expand Up @@ -689,9 +694,9 @@ func (b *backupResumer) Resume(ctx context.Context, execCtx interface{}) error {
}

if details.ProtectedTimestampRecord != nil && !b.testingKnobs.ignoreProtectedTimestamps {
if err := p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
if err := p.ExecCfg().InternalExecutorFactory.TxnWithExecutor(ctx, p.ExecCfg().DB, nil /* sessionData */, func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) error {
details := b.job.Details().(jobspb.BackupDetails)
return releaseProtectedTimestamp(ctx, txn, p.ExecCfg().ProtectedTimestampProvider,
return releaseProtectedTimestamp(ctx, txn, ie, p.ExecCfg().ProtectedTimestampProvider,
details.ProtectedTimestampRecord)
}); err != nil {
log.Errorf(ctx, "failed to release protected timestamp: %v", err)
Expand Down Expand Up @@ -1003,9 +1008,9 @@ func (b *backupResumer) OnFailOrCancel(
p := execCtx.(sql.JobExecContext)
cfg := p.ExecCfg()
b.deleteCheckpoint(ctx, cfg, p.User())
if err := cfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
if err := cfg.InternalExecutorFactory.TxnWithExecutor(ctx, cfg.DB, nil /* sessionData */, func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) error {
details := b.job.Details().(jobspb.BackupDetails)
return releaseProtectedTimestamp(ctx, txn, cfg.ProtectedTimestampProvider,
return releaseProtectedTimestamp(ctx, txn, ie, cfg.ProtectedTimestampProvider,
details.ProtectedTimestampRecord)
}); err != nil {
return err
Expand Down
12 changes: 8 additions & 4 deletions pkg/ccl/backupccl/backup_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/privilege"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/sql/syntheticprivilege"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/interval"
Expand Down Expand Up @@ -785,11 +786,14 @@ func backupPlanHook(
if detached {
// When running inside an explicit transaction, we simply create the job
// record. We do not wait for the job to finish.
_, err := p.ExecCfg().JobRegistry.CreateAdoptableJobWithTxn(
ctx, jr, jobID, plannerTxn)
if err != nil {
if err := p.WithInternalExecutor(ctx, func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) error {
_, err = p.ExecCfg().JobRegistry.CreateAdoptableJobWithTxn(
ctx, jr, jobID, plannerTxn, ie)
return err
}); err != nil {
return err
}

resultsCh <- tree.Datums{tree.NewDInt(tree.DInt(jobID))}
return nil
}
Expand Down Expand Up @@ -920,7 +924,7 @@ func getScheduledBackupExecutionArgsFromSchedule(
ctx context.Context,
env scheduledjobs.JobSchedulerEnv,
txn *kv.Txn,
ie *sql.InternalExecutor,
ie sqlutil.InternalExecutor,
scheduleID int64,
) (*jobs.ScheduledJob, *backuppb.ScheduledBackupExecutionArgs, error) {
// Load the schedule that has spawned this job.
Expand Down
11 changes: 10 additions & 1 deletion pkg/ccl/backupccl/create_scheduled_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -640,7 +641,15 @@ func dryRunInvokeBackup(
if err != nil {
return eventpb.RecoveryEvent{}, err
}
return invokeBackup(ctx, backupFn, p.ExecCfg().JobRegistry, p.Txn())
var recoverEnv eventpb.RecoveryEvent
if err := p.WithInternalExecutor(ctx, func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) error {
recoverEnv, err = invokeBackup(ctx, backupFn, p.ExecCfg().JobRegistry, p.Txn(), ie)
return err
}); err != nil {
return eventpb.RecoveryEvent{}, err
}

return recoverEnv, nil
}

func fullyQualifyScheduledBackupTargetTables(
Expand Down
49 changes: 28 additions & 21 deletions pkg/ccl/backupccl/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -963,8 +963,8 @@ func createImportingDescriptors(
}

if !details.PrepareCompleted {
err := sql.DescsTxn(ctx, p.ExecCfg(), func(
ctx context.Context, txn *kv.Txn, descsCol *descs.Collection,
err := p.ExecCfg().InternalExecutorFactory.DescsTxnWithExecutor(ctx, p.ExecCfg().DB, nil /* session data */, func(
ctx context.Context, txn *kv.Txn, descsCol *descs.Collection, ie sqlutil.InternalExecutor,
) error {
// A couple of pieces of cleanup are required for multi-region databases.
// First, we need to find all of the MULTIREGION_ENUMs types and remap the
Expand Down Expand Up @@ -1228,7 +1228,7 @@ func createImportingDescriptors(
}

// Update the job once all descs have been prepared for ingestion.
err := r.job.SetDetails(ctx, txn, details)
err := r.job.SetDetails(ctx, txn, ie, details)

// Emit to the event log now that the job has finished preparing descs.
emitRestoreJobEvent(ctx, p, jobs.StatusRunning, r.job)
Expand Down Expand Up @@ -1512,10 +1512,11 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) erro
// public.
// TODO (lucy): Ideally we'd just create the database in the public state in
// the first place, as a special case.
publishDescriptors := func(ctx context.Context, txn *kv.Txn, descsCol *descs.Collection, ie sqlutil.InternalExecutor) (err error) {
if err := p.ExecCfg().InternalExecutorFactory.DescsTxnWithExecutor(ctx, p.ExecCfg().DB, nil /* session data */, func(
ctx context.Context, txn *kv.Txn, descsCol *descs.Collection, ie sqlutil.InternalExecutor,
) error {
return r.publishDescriptors(ctx, txn, ie, p.ExecCfg(), p.User(), descsCol, details, nil)
}
if err := r.execCfg.InternalExecutorFactory.DescsTxnWithExecutor(ctx, r.execCfg.DB, nil /* sd */, publishDescriptors); err != nil {
}); err != nil {
return err
}
p.ExecCfg().JobRegistry.NotifyToAdoptJobs()
Expand Down Expand Up @@ -1571,7 +1572,7 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) erro
resTotal.Add(res)

if details.DescriptorCoverage == tree.AllDescriptors {
if err := r.restoreSystemTables(ctx, p.ExecCfg().DB, preData.systemTables); err != nil {
if err := r.restoreSystemTables(ctx, p.ExecCfg().DB, p.ExecCfg().InternalExecutorFactory, preData.systemTables); err != nil {
return err
}
// Reload the details as we may have updated the job.
Expand Down Expand Up @@ -1632,7 +1633,7 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) erro

var devalidateIndexes map[descpb.ID][]descpb.IndexID
if toValidate := len(details.RevalidateIndexes); toValidate > 0 {
if err := r.job.RunningStatus(ctx, nil /* txn */, func(_ context.Context, _ jobspb.Details) (jobs.RunningStatus, error) {
if err := r.job.RunningStatus(ctx, nil /* txn */, nil /* ie */, func(_ context.Context, _ jobspb.Details) (jobs.RunningStatus, error) {
return jobs.RunningStatus(fmt.Sprintf("re-validating %d indexes", toValidate)), nil
}); err != nil {
return errors.Wrapf(err, "failed to update running status of job %d", errors.Safe(r.job.ID()))
Expand All @@ -1644,10 +1645,11 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) erro
devalidateIndexes = bad
}

publishDescriptors := func(ctx context.Context, txn *kv.Txn, descsCol *descs.Collection, ie sqlutil.InternalExecutor) (err error) {
if err := p.ExecCfg().InternalExecutorFactory.DescsTxnWithExecutor(ctx, p.ExecCfg().DB, nil /* session data */, func(
ctx context.Context, txn *kv.Txn, descsCol *descs.Collection, ie sqlutil.InternalExecutor,
) error {
return r.publishDescriptors(ctx, txn, ie, p.ExecCfg(), p.User(), descsCol, details, devalidateIndexes)
}
if err := r.execCfg.InternalExecutorFactory.DescsTxnWithExecutor(ctx, r.execCfg.DB, nil /* sd */, publishDescriptors); err != nil {
}); err != nil {
return err
}

Expand All @@ -1670,7 +1672,7 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) erro
// includes the jobs that are being restored. As soon as we restore these
// jobs, they become accessible to the user, and may start executing. We
// need this to happen after the descriptors have been marked public.
if err := r.restoreSystemTables(ctx, p.ExecCfg().DB, mainData.systemTables); err != nil {
if err := r.restoreSystemTables(ctx, p.ExecCfg().DB, p.ExecCfg().InternalExecutorFactory, mainData.systemTables); err != nil {
return err
}
// Reload the details as we may have updated the job.
Expand Down Expand Up @@ -1803,6 +1805,7 @@ func revalidateIndexes(
if err := sql.ValidateForwardIndexes(
ctx,
job,
execCfg.InternalExecutorFactory,
tableDesc.MakePublic(),
forward,
runner,
Expand All @@ -1823,6 +1826,7 @@ func revalidateIndexes(
ctx,
execCfg.Codec,
job,
execCfg.InternalExecutorFactory,
tableDesc.MakePublic(),
inverted,
runner,
Expand Down Expand Up @@ -1922,7 +1926,9 @@ func insertStats(
restoreStatsInsertBatchSize = len(latestStats)
}

if err := execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
if err := execCfg.InternalExecutorFactory.TxnWithExecutor(ctx, execCfg.DB, nil /* session data */, func(
ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor,
) error {
if err := stats.InsertNewStats(ctx, execCfg.Settings, execCfg.InternalExecutor, txn,
latestStats[:restoreStatsInsertBatchSize]); err != nil {
return errors.Wrapf(err, "inserting stats from backup")
Expand All @@ -1931,7 +1937,7 @@ func insertStats(
// If this is the last batch, mark the stats insertion complete.
if restoreStatsInsertBatchSize == len(latestStats) {
details.StatsInserted = true
if err := job.SetDetails(ctx, txn, details); err != nil {
if err := job.SetDetails(ctx, txn, ie, details); err != nil {
return errors.Wrapf(err, "updating job marking stats insertion complete")
}
}
Expand Down Expand Up @@ -1995,9 +2001,7 @@ func (r *restoreResumer) publishDescriptors(

// Go through the descriptors and find any declarative schema change jobs
// affecting them.
if err := scbackup.CreateDeclarativeSchemaChangeJobs(
ctx, r.execCfg.JobRegistry, txn, ie, all,
); err != nil {
if err := scbackup.CreateDeclarativeSchemaChangeJobs(ctx, r.execCfg.JobRegistry, txn, ie, all); err != nil {
return err
}

Expand Down Expand Up @@ -2120,7 +2124,7 @@ func (r *restoreResumer) publishDescriptors(
details.SchemaDescs = newSchemas
details.DatabaseDescs = newDBs
details.FunctionDescs = newFunctions
if err := r.job.SetDetails(ctx, txn, details); err != nil {
if err := r.job.SetDetails(ctx, txn, ie, details); err != nil {
return errors.Wrap(err,
"updating job details after publishing tables")
}
Expand Down Expand Up @@ -2795,7 +2799,10 @@ func (r *restoreResumer) restoreSystemUsers(
// restoreSystemTables atomically replaces the contents of the system tables
// with the data from the restored system tables.
func (r *restoreResumer) restoreSystemTables(
ctx context.Context, db *kv.DB, tables []catalog.TableDescriptor,
ctx context.Context,
db *kv.DB,
ief sqlutil.InternalExecutorFactory,
tables []catalog.TableDescriptor,
) error {
details := r.job.Details().(jobspb.RestoreDetails)
if details.SystemTablesMigrated == nil {
Expand Down Expand Up @@ -2834,7 +2841,7 @@ func (r *restoreResumer) restoreSystemTables(
continue
}

if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
if err := ief.TxnWithExecutor(ctx, db, nil /* sessionData */, func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) error {
if err := systemTable.config.migrationFunc(ctx, r.execCfg, txn,
systemTable.stagingTableName, details.DescriptorRewrites); err != nil {
return err
Expand All @@ -2844,7 +2851,7 @@ func (r *restoreResumer) restoreSystemTables(
// restarts don't try to import data over our migrated data. This would
// fail since the restored data would shadow the migrated keys.
details.SystemTablesMigrated[systemTable.systemTableName] = true
return r.job.SetDetails(ctx, txn, details)
return r.job.SetDetails(ctx, txn, ie, details)
}); err != nil {
return err
}
Expand Down
9 changes: 6 additions & 3 deletions pkg/ccl/backupccl/restore_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -1840,11 +1840,14 @@ func doRestorePlan(
// When running in detached mode, we simply create the job record.
// We do not wait for the job to finish.
jobID := p.ExecCfg().JobRegistry.MakeJobID()
_, err := p.ExecCfg().JobRegistry.CreateAdoptableJobWithTxn(
ctx, jr, jobID, p.Txn())
if err != nil {
if err := p.WithInternalExecutor(ctx, func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) error {
_, err = p.ExecCfg().JobRegistry.CreateAdoptableJobWithTxn(
ctx, jr, jobID, p.Txn(), ie)
return err
}); err != nil {
return err
}

resultsCh <- tree.Datums{tree.NewDInt(tree.DInt(jobID))}
collectRestoreTelemetry(ctx, jobID, restoreDetails, intoDB, newDBName, subdir, restoreStmt,
descsByTablePattern, restoreDBs, asOfInterval, debugPauseOn, p.SessionData().ApplicationName)
Expand Down
27 changes: 19 additions & 8 deletions pkg/ccl/backupccl/schedule_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func (e *scheduledBackupExecutor) ExecuteJob(
env scheduledjobs.JobSchedulerEnv,
sj *jobs.ScheduledJob,
txn *kv.Txn,
ie sqlutil.InternalExecutor,
) error {
if err := e.executeBackup(ctx, cfg, sj, txn); err != nil {
e.metrics.NumFailed.Inc(1)
Expand Down Expand Up @@ -108,12 +109,16 @@ func (e *scheduledBackupExecutor) executeBackup(
if err != nil {
return err
}
_, err = invokeBackup(ctx, backupFn, nil, nil)
_, err = invokeBackup(ctx, backupFn, nil /* registry */, nil /* txn */, nil /* ie */)
return err
}

func invokeBackup(
ctx context.Context, backupFn sql.PlanHookRowFn, registry *jobs.Registry, txn *kv.Txn,
ctx context.Context,
backupFn sql.PlanHookRowFn,
registry *jobs.Registry,
txn *kv.Txn,
ie sqlutil.InternalExecutor,
) (eventpb.RecoveryEvent, error) {
resultCh := make(chan tree.Datums) // No need to close
g := ctxgroup.WithContext(ctx)
Expand All @@ -122,7 +127,7 @@ func invokeBackup(
g.GoCtx(func(ctx context.Context) error {
select {
case res := <-resultCh:
backupEvent = getBackupFnTelemetry(ctx, registry, txn, res)
backupEvent = getBackupFnTelemetry(ctx, registry, txn, ie, res)
return nil
case <-ctx.Done():
return ctx.Err()
Expand Down Expand Up @@ -427,6 +432,7 @@ func unlinkOrDropDependentSchedule(
scheduleControllerEnv scheduledjobs.ScheduleControllerEnv,
env scheduledjobs.JobSchedulerEnv,
txn *kv.Txn,
ie sqlutil.InternalExecutor,
args *backuppb.ScheduledBackupExecutionArgs,
) (int, error) {
if args.DependentScheduleID == 0 {
Expand Down Expand Up @@ -455,7 +461,7 @@ func unlinkOrDropDependentSchedule(
return 0, err
}

return 1, releaseProtectedTimestamp(ctx, txn, scheduleControllerEnv.PTSProvider(),
return 1, releaseProtectedTimestamp(ctx, txn, ie, scheduleControllerEnv.PTSProvider(),
dependentArgs.ProtectedTimestampRecord)
}

Expand Down Expand Up @@ -483,26 +489,31 @@ func (e *scheduledBackupExecutor) OnDrop(
sj *jobs.ScheduledJob,
txn *kv.Txn,
descsCol *descs.Collection,
ie sqlutil.InternalExecutor,
) (int, error) {
args := &backuppb.ScheduledBackupExecutionArgs{}

if err := pbtypes.UnmarshalAny(sj.ExecutionArgs().Args, args); err != nil {
return 0, errors.Wrap(err, "un-marshaling args")
}

dependentRowsDropped, err := unlinkOrDropDependentSchedule(ctx, scheduleControllerEnv, env, txn, args)
dependentRowsDropped, err := unlinkOrDropDependentSchedule(ctx, scheduleControllerEnv, env, txn, ie, args)
if err != nil {
return dependentRowsDropped, errors.Wrap(err, "failed to unlink dependent schedule")
}

return dependentRowsDropped, releaseProtectedTimestamp(ctx, txn, scheduleControllerEnv.PTSProvider(),
return dependentRowsDropped, releaseProtectedTimestamp(ctx, txn, ie, scheduleControllerEnv.PTSProvider(),
args.ProtectedTimestampRecord)
}

// getBackupFnTelemetry collects the telemetry from the dry-run backup
// corresponding to backupFnResult.
func getBackupFnTelemetry(
ctx context.Context, registry *jobs.Registry, txn *kv.Txn, backupFnResult tree.Datums,
ctx context.Context,
registry *jobs.Registry,
txn *kv.Txn,
ie sqlutil.InternalExecutor,
backupFnResult tree.Datums,
) eventpb.RecoveryEvent {
if registry == nil {
return eventpb.RecoveryEvent{}
Expand All @@ -523,7 +534,7 @@ func getBackupFnTelemetry(
return jobspb.BackupDetails{}, errors.New("expected job ID as first column of result")
}

job, err := registry.LoadJobWithTxn(ctx, jobspb.JobID(jobID), txn)
job, err := registry.LoadJobWithTxn(ctx, jobspb.JobID(jobID), txn, ie)
if err != nil {
return jobspb.BackupDetails{}, errors.Wrap(err, "failed to load dry-run backup job")
}
Expand Down
Loading