From 54dbbe5beba96061cbdade0d5e64a06d63b657cc Mon Sep 17 00:00:00 2001
From: David Taylor
Date: Tue, 13 Oct 2020 14:38:27 +0000
Subject: [PATCH] jobs, *: use separate interface for job exec

Previously we passed PlanHookState to job execution methods like Resume
or OnFailOrCancel. PlanHookState is a window into the planner designed
to surface its API to planning code that exists outside the SQL package
and is injected via hooks. Planning code expects to use certain methods
that assume they are running during statement execution, and in
particular to have access to the transaction in which that statement is
being executed, for example to resolve roles or privileges.

Job execution, on the other hand, is not done during statement
execution and critically does not have a txn set. Previously the
PlanHookState argument to job execution was backed by a planner as its
concrete type, with a nil txn. However, sharing the API with planning
code made it easy to accidentally call, during job execution, methods
that assumed they were only ever called during statement planning, and
thus violate their assumption that a txn is set, leading to bugs.

This changes the job execution methods to expect to be passed a new
type, JobExecContext, which has only the subset of methods that we
expect to use during execution and, more importantly, does not expose
the methods that should only be called during statement evaluation but
would be callable on PlanHookState.

For now the implementation of JobExecContext is still backed by a
planner, as fields like ExtendedEvalContext are fairly closely tied to
sql.planner. Later work might try to restrict this API as well.

Release note: none.
---
 pkg/ccl/backupccl/backup_job.go              | 14 ++--
 .../backupccl/backup_processor_planning.go   | 12 ++--
 pkg/ccl/backupccl/restore_job.go             | 18 ++---
 .../backupccl/restore_processor_planning.go  | 12 ++--
 pkg/ccl/changefeedccl/changefeed_dist.go     | 12 ++--
 pkg/ccl/changefeedccl/changefeed_stmt.go     | 20 +++---
 pkg/ccl/importccl/import_processor_test.go   |  6 +-
 pkg/ccl/importccl/import_stmt.go             | 26 ++++----
 pkg/ccl/importccl/read_import_mysql.go       | 15 ++---
 pkg/ccl/importccl/read_import_mysql_test.go  |  3 +-
 pkg/ccl/importccl/read_import_pgdump.go      | 10 +--
 pkg/jobs/adopt.go                            |  6 +-
 pkg/jobs/deprecated.go                       |  4 +-
 pkg/jobs/helpers_test.go                     |  4 +-
 pkg/jobs/jobs.go                             |  4 +-
 pkg/jobs/jobs_test.go                        | 10 +--
 pkg/jobs/registry.go                         | 50 +++++++-------
 pkg/server/server_sql.go                     |  2 +-
 pkg/sql/BUILD.bazel                          |  1 +
 pkg/sql/create_stats.go                      | 12 ++--
 pkg/sql/distsql_plan_csv.go                  | 12 ++--
 pkg/sql/distsql_plan_stats.go                |  4 +-
 pkg/sql/gcjob/gc_job.go                      |  4 +-
 pkg/sql/job_exec_context.go                  | 65 +++++++++++++++++++
 pkg/sql/schema_changer.go                    |  8 +--
 pkg/sql/type_change.go                       | 14 ++--
 26 files changed, 206 insertions(+), 142 deletions(-)
 create mode 100644 pkg/sql/job_exec_context.go

diff --git a/pkg/ccl/backupccl/backup_job.go b/pkg/ccl/backupccl/backup_job.go
index 726f0b72eb8f..0b8d77eff2e2 100644
--- a/pkg/ccl/backupccl/backup_job.go
+++ b/pkg/ccl/backupccl/backup_job.go
@@ -172,7 +172,7 @@ func clusterNodeCount(gw gossip.OptionalGossip) (int, error) {
 // file.
 func backup(
 	ctx context.Context,
-	phs sql.PlanHookState,
+	execCtx sql.JobExecContext,
 	defaultURI string,
 	urisByLocalityKV map[string]string,
 	db *kv.DB,
@@ -286,7 +286,7 @@ func backup(
 
 	if err := distBackup(
 		ctx,
-		phs,
+		execCtx,
 		spans,
 		introducedSpans,
 		pkIDs,
@@ -403,10 +403,10 @@ type backupResumer struct {
 
 // Resume is part of the jobs.Resumer interface.
func (b *backupResumer) Resume( - ctx context.Context, phs interface{}, resultsCh chan<- tree.Datums, + ctx context.Context, execCtx interface{}, resultsCh chan<- tree.Datums, ) error { details := b.job.Details().(jobspb.BackupDetails) - p := phs.(sql.PlanHookState) + p := execCtx.(sql.JobExecContext) // For all backups, partitioned or not, the main BACKUP manifest is stored at // details.URI. @@ -655,18 +655,18 @@ func (b *backupResumer) clearStats(ctx context.Context, DB *kv.DB) error { } // OnFailOrCancel is part of the jobs.Resumer interface. -func (b *backupResumer) OnFailOrCancel(ctx context.Context, phs interface{}) error { +func (b *backupResumer) OnFailOrCancel(ctx context.Context, execCtx interface{}) error { defer b.maybeNotifyScheduledJobCompletion( ctx, jobs.StatusFailed, - phs.(sql.PlanHookState).ExecCfg(), + execCtx.(sql.JobExecContext).ExecCfg(), ) telemetry.Count("backup.total.failed") telemetry.CountBucketed("backup.duration-sec.failed", int64(timeutil.Since(timeutil.FromUnixMicros(b.job.Payload().StartedMicros)).Seconds())) - p := phs.(sql.PlanHookState) + p := execCtx.(sql.JobExecContext) cfg := p.ExecCfg() if err := b.clearStats(ctx, p.ExecCfg().DB); err != nil { log.Warningf(ctx, "unable to clear stats from job payload: %+v", err) diff --git a/pkg/ccl/backupccl/backup_processor_planning.go b/pkg/ccl/backupccl/backup_processor_planning.go index 26f6d3fcc8de..df5d9cce3933 100644 --- a/pkg/ccl/backupccl/backup_processor_planning.go +++ b/pkg/ccl/backupccl/backup_processor_planning.go @@ -31,7 +31,7 @@ import ( // build up the BulkOpSummary. func distBackup( ctx context.Context, - phs sql.PlanHookState, + execCtx sql.JobExecContext, spans roachpb.Spans, introducedSpans roachpb.Spans, pkIDs map[uint64]bool, @@ -45,10 +45,10 @@ func distBackup( ctx = logtags.AddTag(ctx, "backup-distsql", nil) var noTxn *kv.Txn - dsp := phs.DistSQLPlanner() - evalCtx := phs.ExtendedEvalContext() + dsp := execCtx.DistSQLPlanner() + evalCtx := execCtx.ExtendedEvalContext() - planCtx, _, err := dsp.SetupAllNodesPlanning(ctx, evalCtx, phs.ExecCfg()) + planCtx, _, err := dsp.SetupAllNodesPlanning(ctx, evalCtx, execCtx.ExecCfg()) if err != nil { return err } @@ -64,8 +64,8 @@ func distBackup( mvccFilter, encryption, startTime, endTime, - phs.User(), - phs.ExecCfg(), + execCtx.User(), + execCtx.ExecCfg(), ) if err != nil { return err diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go index 0b9fc94e7695..add75f9fa820 100644 --- a/pkg/ccl/backupccl/restore_job.go +++ b/pkg/ccl/backupccl/restore_job.go @@ -480,7 +480,7 @@ func rewriteBackupSpanKey(kr *storageccl.KeyRewriter, key roachpb.Key) (roachpb. // files. func restore( restoreCtx context.Context, - phs sql.PlanHookState, + execCtx sql.JobExecContext, numClusterNodes int, backupManifests []BackupManifest, backupLocalityInfo []jobspb.RestoreDetails_BackupLocalityInfo, @@ -491,7 +491,7 @@ func restore( job *jobs.Job, encryption *jobspb.BackupEncryptionOptions, ) (RowCount, error) { - user := phs.User() + user := execCtx.User() // A note about contexts and spans in this method: the top-level context // `restoreCtx` is used for orchestration logging. All operations that carry // out work get their individual contexts. @@ -620,7 +620,7 @@ func restore( // TODO(pbardea): Improve logging in processors. if err := distRestore( restoreCtx, - phs, + execCtx, importSpanChunks, pkIDs, encryption, @@ -652,7 +652,7 @@ func restore( // be broken down into two methods. 
func loadBackupSQLDescs( ctx context.Context, - p sql.PlanHookState, + p sql.JobExecContext, details jobspb.RestoreDetails, encryption *jobspb.BackupEncryptionOptions, ) ([]BackupManifest, BackupManifest, []catalog.Descriptor, error) { @@ -832,7 +832,7 @@ func getTempSystemDBID(details jobspb.RestoreDetails) descpb.ID { // createImportingDescriptors create the tables that we will restore into. It also // fetches the information from the old tables that we need for the restore. func createImportingDescriptors( - ctx context.Context, p sql.PlanHookState, sqlDescs []catalog.Descriptor, r *restoreResumer, + ctx context.Context, p sql.JobExecContext, sqlDescs []catalog.Descriptor, r *restoreResumer, ) (tables []catalog.TableDescriptor, oldTableIDs []descpb.ID, spans []roachpb.Span, err error) { details := r.job.Details().(jobspb.RestoreDetails) @@ -1094,10 +1094,10 @@ func createImportingDescriptors( // Resume is part of the jobs.Resumer interface. func (r *restoreResumer) Resume( - ctx context.Context, phs interface{}, resultsCh chan<- tree.Datums, + ctx context.Context, execCtx interface{}, resultsCh chan<- tree.Datums, ) error { details := r.job.Details().(jobspb.RestoreDetails) - p := phs.(sql.PlanHookState) + p := execCtx.(sql.JobExecContext) r.versionAtLeast20_2 = p.ExecCfg().Settings.Version.IsActive( ctx, clusterversion.VersionLeasedDatabaseDescriptors) @@ -1475,14 +1475,14 @@ func (r *restoreResumer) publishDescriptors( // has been committed from a restore that has failed or been canceled. It does // this by adding the table descriptors in DROP state, which causes the schema // change stuff to delete the keys in the background. -func (r *restoreResumer) OnFailOrCancel(ctx context.Context, phs interface{}) error { +func (r *restoreResumer) OnFailOrCancel(ctx context.Context, execCtx interface{}) error { telemetry.Count("restore.total.failed") telemetry.CountBucketed("restore.duration-sec.failed", int64(timeutil.Since(timeutil.FromUnixMicros(r.job.Payload().StartedMicros)).Seconds())) details := r.job.Details().(jobspb.RestoreDetails) - execCfg := phs.(sql.PlanHookState).ExecCfg() + execCfg := execCtx.(sql.JobExecContext).ExecCfg() return descs.Txn(ctx, execCfg.Settings, execCfg.LeaseManager, execCfg.InternalExecutor, execCfg.DB, func(ctx context.Context, txn *kv.Txn, descsCol *descs.Collection) error { for _, tenant := range details.Tenants { diff --git a/pkg/ccl/backupccl/restore_processor_planning.go b/pkg/ccl/backupccl/restore_processor_planning.go index e653c06e0b69..4030c4a81ad3 100644 --- a/pkg/ccl/backupccl/restore_processor_planning.go +++ b/pkg/ccl/backupccl/restore_processor_planning.go @@ -38,7 +38,7 @@ import ( // This method also closes the given progCh. 
func distRestore( ctx context.Context, - phs sql.PlanHookState, + execCtx sql.JobExecContext, chunks [][]execinfrapb.RestoreSpanEntry, pkIDs map[uint64]bool, encryption *jobspb.BackupEncryptionOptions, @@ -50,13 +50,13 @@ func distRestore( defer close(progCh) var noTxn *kv.Txn - dsp := phs.DistSQLPlanner() - evalCtx := phs.ExtendedEvalContext() + dsp := execCtx.DistSQLPlanner() + evalCtx := execCtx.ExtendedEvalContext() if encryption != nil && encryption.Mode == jobspb.EncryptionMode_KMS { kms, err := cloud.KMSFromURI(encryption.KMSInfo.Uri, &backupKMSEnv{ - settings: phs.ExecCfg().Settings, - conf: &phs.ExecCfg().ExternalIODirConfig, + settings: execCtx.ExecCfg().Settings, + conf: &execCtx.ExecCfg().ExternalIODirConfig, }) if err != nil { return err @@ -75,7 +75,7 @@ func distRestore( fileEncryption = &roachpb.FileEncryptionOptions{Key: encryption.Key} } - planCtx, _, err := dsp.SetupAllNodesPlanning(ctx, evalCtx, phs.ExecCfg()) + planCtx, _, err := dsp.SetupAllNodesPlanning(ctx, evalCtx, execCtx.ExecCfg()) if err != nil { return err } diff --git a/pkg/ccl/changefeedccl/changefeed_dist.go b/pkg/ccl/changefeedccl/changefeed_dist.go index 96e7e1c5e749..08fae8ee623e 100644 --- a/pkg/ccl/changefeedccl/changefeed_dist.go +++ b/pkg/ccl/changefeedccl/changefeed_dist.go @@ -62,7 +62,7 @@ var changefeedResultTypes = []*types.T{ // progress of the changefeed's corresponding system job. func distChangefeedFlow( ctx context.Context, - phs sql.PlanHookState, + execCtx sql.JobExecContext, jobID int64, details jobspb.ChangefeedDetails, progress jobspb.Progress, @@ -99,7 +99,7 @@ func distChangefeedFlow( spansTS = initialHighWater } - execCfg := phs.ExecCfg() + execCfg := execCtx.ExecCfg() trackedSpans, err := fetchSpansForTargets(ctx, execCfg.DB, execCfg.Codec, details.Targets, spansTS) if err != nil { return err @@ -111,8 +111,8 @@ func distChangefeedFlow( if err != nil { return err } - dsp := phs.DistSQLPlanner() - evalCtx := phs.ExtendedEvalContext() + dsp := execCtx.DistSQLPlanner() + evalCtx := execCtx.ExtendedEvalContext() planCtx := dsp.NewPlanningCtx(ctx, evalCtx, nil /* planner */, noTxn, true /* distribute */) var spanPartitions []sql.SpanPartition @@ -143,7 +143,7 @@ func distChangefeedFlow( corePlacement[i].Core.ChangeAggregator = &execinfrapb.ChangeAggregatorSpec{ Watches: watches, Feed: details, - UserProto: phs.User().EncodeProto(), + UserProto: execCtx.User().EncodeProto(), } } // NB: This SpanFrontier processor depends on the set of tracked spans being @@ -154,7 +154,7 @@ func distChangefeedFlow( TrackedSpans: trackedSpans, Feed: details, JobID: jobID, - UserProto: phs.User().EncodeProto(), + UserProto: execCtx.User().EncodeProto(), } p := sql.MakePhysicalPlan(gatewayNodeID) diff --git a/pkg/ccl/changefeedccl/changefeed_stmt.go b/pkg/ccl/changefeedccl/changefeed_stmt.go index a6b2f878293a..28a2d5f60d96 100644 --- a/pkg/ccl/changefeedccl/changefeed_stmt.go +++ b/pkg/ccl/changefeedccl/changefeed_stmt.go @@ -576,10 +576,10 @@ func generateChangefeedSessionID() string { // Resume is part of the jobs.Resumer interface. 
func (b *changefeedResumer) Resume( - ctx context.Context, planHookState interface{}, startedCh chan<- tree.Datums, + ctx context.Context, exec interface{}, startedCh chan<- tree.Datums, ) error { - phs := planHookState.(sql.PlanHookState) - execCfg := phs.ExecCfg() + jobExec := exec.(sql.JobExecContext) + execCfg := jobExec.ExecCfg() jobID := *b.job.ID() details := b.job.Details().(jobspb.ChangefeedDetails) progress := b.job.Progress() @@ -595,7 +595,7 @@ func (b *changefeedResumer) Resume( } var err error for r := retry.StartWithCtx(ctx, opts); r.Next(); { - if err = distChangefeedFlow(ctx, phs, jobID, details, progress, startedCh); err == nil { + if err = distChangefeedFlow(ctx, jobExec, jobID, details, progress, startedCh); err == nil { return nil } if !IsRetryableError(err) { @@ -646,9 +646,9 @@ func (b *changefeedResumer) Resume( } // OnFailOrCancel is part of the jobs.Resumer interface. -func (b *changefeedResumer) OnFailOrCancel(ctx context.Context, planHookState interface{}) error { - phs := planHookState.(sql.PlanHookState) - execCfg := phs.ExecCfg() +func (b *changefeedResumer) OnFailOrCancel(ctx context.Context, jobExec interface{}) error { + exec := jobExec.(sql.JobExecContext) + execCfg := exec.ExecCfg() progress := b.job.Progress() b.maybeCleanUpProtectedTimestamp(ctx, execCfg.DB, execCfg.ProtectedTimestampProvider, progress.GetChangefeed().ProtectedTimestampRecord) @@ -660,7 +660,7 @@ func (b *changefeedResumer) OnFailOrCancel(ctx context.Context, planHookState in telemetry.Count(`changefeed.enterprise.cancel`) } else { telemetry.Count(`changefeed.enterprise.fail`) - phs.ExecCfg().JobRegistry.MetricsStruct().Changefeed.(*Metrics).Failures.Inc(1) + exec.ExecCfg().JobRegistry.MetricsStruct().Changefeed.(*Metrics).Failures.Inc(1) } return nil } @@ -688,7 +688,7 @@ var _ jobs.PauseRequester = (*changefeedResumer)(nil) // paused, we want to install a protected timestamp at the most recent high // watermark if there isn't already one. 
func (b *changefeedResumer) OnPauseRequest( - ctx context.Context, planHookState interface{}, txn *kv.Txn, progress *jobspb.Progress, + ctx context.Context, jobExec interface{}, txn *kv.Txn, progress *jobspb.Progress, ) error { details := b.job.Details().(jobspb.ChangefeedDetails) if _, shouldPause := details.Opts[changefeedbase.OptProtectDataFromGCOnPause]; !shouldPause { @@ -713,7 +713,7 @@ func (b *changefeedResumer) OnPauseRequest( return nil } - pts := planHookState.(sql.PlanHookState).ExecCfg().ProtectedTimestampProvider + pts := jobExec.(sql.JobExecContext).ExecCfg().ProtectedTimestampProvider return createProtectedTimestampRecord(ctx, pts, txn, *b.job.ID(), details.Targets, *resolved, cp) } diff --git a/pkg/ccl/importccl/import_processor_test.go b/pkg/ccl/importccl/import_processor_test.go index 6de1e8a5a3ce..a94494069a81 100644 --- a/pkg/ccl/importccl/import_processor_test.go +++ b/pkg/ccl/importccl/import_processor_test.go @@ -548,11 +548,11 @@ type cancellableImportResumer struct { } func (r *cancellableImportResumer) Resume( - _ context.Context, phs interface{}, resultsCh chan<- tree.Datums, + _ context.Context, execCtx interface{}, resultsCh chan<- tree.Datums, ) error { r.jobID = *r.wrapped.job.ID() r.jobIDCh <- r.jobID - if err := r.wrapped.Resume(r.ctx, phs, resultsCh); err != nil { + if err := r.wrapped.Resume(r.ctx, execCtx, resultsCh); err != nil { return err } if r.onSuccessBarrier != nil { @@ -561,7 +561,7 @@ func (r *cancellableImportResumer) Resume( return errors.New("job succeed, but we're forcing it to be paused") } -func (r *cancellableImportResumer) OnFailOrCancel(ctx context.Context, phs interface{}) error { +func (r *cancellableImportResumer) OnFailOrCancel(ctx context.Context, execCtx interface{}) error { // This callback is invoked when an error or cancellation occurs // during the import. Since our Resume handler returned an // error (after pausing the job), we need to short-circuits diff --git a/pkg/ccl/importccl/import_stmt.go b/pkg/ccl/importccl/import_stmt.go index 41175d01b47d..9284f353b7dd 100644 --- a/pkg/ccl/importccl/import_stmt.go +++ b/pkg/ccl/importccl/import_stmt.go @@ -28,6 +28,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/server/telemetry" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql" @@ -954,7 +955,7 @@ func prepareNewTableDescsForIngestion( ctx context.Context, txn *kv.Txn, descsCol *descs.Collection, - p sql.PlanHookState, + p sql.JobExecContext, importTables []jobspb.ImportDetails_Table, parentID descpb.ID, ) ([]*descpb.TableDescriptor, error) { @@ -1084,7 +1085,7 @@ func prepareExistingTableDescForIngestion( // step of import. The descriptors are in an IMPORTING state (offline) on // successful completion of this method. func (r *importResumer) prepareTableDescsForIngestion( - ctx context.Context, p sql.PlanHookState, details jobspb.ImportDetails, + ctx context.Context, p sql.JobExecContext, details jobspb.ImportDetails, ) error { err := descs.Txn(ctx, p.ExecCfg().Settings, p.ExecCfg().LeaseManager, p.ExecCfg().InternalExecutor, p.ExecCfg().DB, func( @@ -1169,7 +1170,7 @@ func (r *importResumer) prepareTableDescsForIngestion( // descriptors for bundle formats. 
func parseAndCreateBundleTableDescs( ctx context.Context, - p sql.PlanHookState, + p sql.JobExecContext, details jobspb.ImportDetails, seqVals map[descpb.ID]int64, skipFKs bool, @@ -1177,6 +1178,7 @@ func parseAndCreateBundleTableDescs( files []string, format roachpb.IOFileFormat, walltime int64, + owner security.SQLUsername, ) ([]*tabledesc.Mutable, error) { var tableDescs []*tabledesc.Mutable @@ -1210,10 +1212,10 @@ func parseAndCreateBundleTableDescs( switch format.Format { case roachpb.IOFileFormat_Mysqldump: evalCtx := &p.ExtendedEvalContext().EvalContext - tableDescs, err = readMysqlCreateTable(ctx, reader, evalCtx, p, defaultCSVTableID, parentID, tableName, fks, seqVals) + tableDescs, err = readMysqlCreateTable(ctx, reader, evalCtx, p, defaultCSVTableID, parentID, tableName, fks, seqVals, owner) case roachpb.IOFileFormat_PgDump: evalCtx := &p.ExtendedEvalContext().EvalContext - tableDescs, err = readPostgresCreateTable(ctx, reader, evalCtx, p, tableName, parentID, walltime, fks, int(format.PgDump.MaxRowSize)) + tableDescs, err = readPostgresCreateTable(ctx, reader, evalCtx, p, tableName, parentID, walltime, fks, int(format.PgDump.MaxRowSize), owner) default: return tableDescs, errors.Errorf("non-bundle format %q does not support reading schemas", format.Format.String()) } @@ -1230,7 +1232,7 @@ func parseAndCreateBundleTableDescs( } func (r *importResumer) parseBundleSchemaIfNeeded(ctx context.Context, phs interface{}) error { - p := phs.(sql.PlanHookState) + p := phs.(sql.JobExecContext) seqVals := make(map[descpb.ID]int64) details := r.job.Details().(jobspb.ImportDetails) skipFKs := details.SkipFKs @@ -1238,6 +1240,8 @@ func (r *importResumer) parseBundleSchemaIfNeeded(ctx context.Context, phs inter files := details.URIs format := details.Format + owner := r.job.Payload().UsernameProto.Decode() + if details.ParseBundleSchema { if err := r.job.RunningStatus(ctx, func(_ context.Context, _ jobspb.Details) (jobs.RunningStatus, error) { return runningStatusImportBundleParseSchema, nil @@ -1250,7 +1254,7 @@ func (r *importResumer) parseBundleSchemaIfNeeded(ctx context.Context, phs inter walltime := p.ExecCfg().Clock.Now().WallTime if tableDescs, err = parseAndCreateBundleTableDescs( - ctx, p, details, seqVals, skipFKs, parentID, files, format, walltime); err != nil { + ctx, p, details, seqVals, skipFKs, parentID, files, format, walltime, owner); err != nil { return err } @@ -1285,9 +1289,9 @@ func (r *importResumer) parseBundleSchemaIfNeeded(ctx context.Context, phs inter // Resume is part of the jobs.Resumer interface. func (r *importResumer) Resume( - ctx context.Context, phs interface{}, resultsCh chan<- tree.Datums, + ctx context.Context, execCtx interface{}, resultsCh chan<- tree.Datums, ) error { - p := phs.(sql.PlanHookState) + p := execCtx.(sql.JobExecContext) if err := r.parseBundleSchemaIfNeeded(ctx, p); err != nil { return err } @@ -1514,10 +1518,10 @@ func (r *importResumer) publishTables(ctx context.Context, execCfg *sql.Executor // been committed from a import that has failed or been canceled. It does this // by adding the table descriptors in DROP state, which causes the schema change // stuff to delete the keys in the background. 
-func (r *importResumer) OnFailOrCancel(ctx context.Context, phs interface{}) error { +func (r *importResumer) OnFailOrCancel(ctx context.Context, execCtx interface{}) error { details := r.job.Details().(jobspb.ImportDetails) addToFileFormatTelemetry(details.Format.Format.String(), "failed") - cfg := phs.(sql.PlanHookState).ExecCfg() + cfg := execCtx.(sql.JobExecContext).ExecCfg() lm, ie, db := cfg.LeaseManager, cfg.InternalExecutor, cfg.DB return descs.Txn(ctx, cfg.Settings, lm, ie, db, func( ctx context.Context, txn *kv.Txn, descsCol *descs.Collection, diff --git a/pkg/ccl/importccl/read_import_mysql.go b/pkg/ccl/importccl/read_import_mysql.go index 77c7ec018172..261162302df3 100644 --- a/pkg/ccl/importccl/read_import_mysql.go +++ b/pkg/ccl/importccl/read_import_mysql.go @@ -285,11 +285,12 @@ func readMysqlCreateTable( ctx context.Context, input io.Reader, evalCtx *tree.EvalContext, - p sql.PlanHookState, + p sql.JobExecContext, startingID, parentID descpb.ID, match string, fks fkHandler, seqVals map[descpb.ID]int64, + owner security.SQLUsername, ) ([]*tabledesc.Mutable, error) { match = lexbase.NormalizeName(match) r := bufio.NewReaderSize(input, 1024*64) @@ -321,7 +322,7 @@ func readMysqlCreateTable( continue } id := descpb.ID(int(startingID) + len(ret)) - tbl, moreFKs, err := mysqlTableToCockroach(ctx, evalCtx, p, parentID, id, name, i.TableSpec, fks, seqVals) + tbl, moreFKs, err := mysqlTableToCockroach(ctx, evalCtx, p, parentID, id, name, i.TableSpec, fks, seqVals, owner) if err != nil { return nil, err } @@ -361,12 +362,13 @@ func safeName(in mysqlIdent) tree.Name { func mysqlTableToCockroach( ctx context.Context, evalCtx *tree.EvalContext, - p sql.PlanHookState, + p sql.JobExecContext, parentID, id descpb.ID, name string, in *mysql.TableSpec, fks fkHandler, seqVals map[descpb.ID]int64, + owner security.SQLUsername, ) ([]*tabledesc.Mutable, []delayedFK, error) { if in == nil { return nil, nil, errors.Errorf("could not read definition for table %q (possible unsupported type?)", name) @@ -400,7 +402,6 @@ func mysqlTableToCockroach( var seqDesc *tabledesc.Mutable // If we have an auto-increment seq, create it and increment the id. 
- owner := security.AdminRoleName() if seqName != "" { var opts tree.SequenceOptions if startingValue != 0 { @@ -409,10 +410,6 @@ func mysqlTableToCockroach( } var err error if p != nil { - params := p.RunParams(ctx) - if params.SessionData() != nil { - owner = params.SessionData().User() - } priv := descpb.NewDefaultPrivilegeDescriptor(owner) seqDesc, err = sql.NewSequenceTableDesc( ctx, @@ -424,7 +421,7 @@ func mysqlTableToCockroach( time, priv, tree.PersistencePermanent, - ¶ms, + nil, /* params */ ) } else { priv := descpb.NewDefaultPrivilegeDescriptor(owner) diff --git a/pkg/ccl/importccl/read_import_mysql_test.go b/pkg/ccl/importccl/read_import_mysql_test.go index 302db52a4d5d..9aa29fb74d73 100644 --- a/pkg/ccl/importccl/read_import_mysql_test.go +++ b/pkg/ccl/importccl/read_import_mysql_test.go @@ -21,6 +21,7 @@ import ( "testing" "time" + "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" @@ -117,7 +118,7 @@ func readMysqlCreateFrom( } defer f.Close() - tbl, err := readMysqlCreateTable(context.Background(), f, testEvalCtx, nil, id, expectedParent, name, fks, map[descpb.ID]int64{}) + tbl, err := readMysqlCreateTable(context.Background(), f, testEvalCtx, nil, id, expectedParent, name, fks, map[descpb.ID]int64{}, security.RootUserName()) if err != nil { t.Fatal(err) } diff --git a/pkg/ccl/importccl/read_import_pgdump.go b/pkg/ccl/importccl/read_import_pgdump.go index 701bf2dd2d85..881ea9473fd9 100644 --- a/pkg/ccl/importccl/read_import_pgdump.go +++ b/pkg/ccl/importccl/read_import_pgdump.go @@ -219,12 +219,13 @@ func readPostgresCreateTable( ctx context.Context, input io.Reader, evalCtx *tree.EvalContext, - p sql.PlanHookState, + p sql.JobExecContext, match string, parentID descpb.ID, walltime int64, fks fkHandler, max int, + owner security.SQLUsername, ) ([]*tabledesc.Mutable, error) { // Modify the CreateTable stmt with the various index additions. We do this // instead of creating a full table descriptor first and adding indexes @@ -236,15 +237,10 @@ func readPostgresCreateTable( createSeq := make(map[string]*tree.CreateSequence) tableFKs := make(map[string][]*tree.ForeignKeyConstraintTableDef) ps := newPostgreStream(input, max) - params := p.RunParams(ctx) for { stmt, err := ps.Next() if err == io.EOF { ret := make([]*tabledesc.Mutable, 0, len(createTbl)) - owner := security.AdminRoleName() - if params.SessionData() != nil { - owner = params.SessionData().User() - } for name, seq := range createSeq { id := descpb.ID(int(defaultCSVTableID) + len(ret)) desc, err := sql.NewSequenceTableDesc( @@ -257,7 +253,7 @@ func readPostgresCreateTable( hlc.Timestamp{WallTime: walltime}, descpb.NewDefaultPrivilegeDescriptor(owner), tree.PersistencePermanent, - ¶ms, + nil, /* params */ ) if err != nil { return nil, err diff --git a/pkg/jobs/adopt.go b/pkg/jobs/adopt.go index d454db036b83..7783618403c0 100644 --- a/pkg/jobs/adopt.go +++ b/pkg/jobs/adopt.go @@ -234,7 +234,7 @@ func (r *Registry) runJob( job.mu.Unlock() // Bookkeeping. - phs, cleanup := r.planFn("resume-"+taskName, username) + execCtx, cleanup := r.execCtx("resume-"+taskName, username) defer cleanup() spanName := fmt.Sprintf(`%s-%d`, typ, *job.ID()) var span opentracing.Span @@ -242,7 +242,7 @@ func (r *Registry) runJob( defer span.Finish() // Run the actual job. 
- err := r.stepThroughStateMachine(ctx, phs, resumer, resultsCh, job, status, finalResumeError) + err := r.stepThroughStateMachine(ctx, execCtx, resumer, resultsCh, job, status, finalResumeError) if err != nil { // TODO (lucy): This needs to distinguish between assertion errors in // the job registry and assertion errors in job execution returned from @@ -269,7 +269,7 @@ SET status = WHEN status = $1 THEN $2 WHEN status = $3 THEN $4 ELSE status - END + END WHERE (status IN ($1, $3)) AND (claim_session_id = $5 AND claim_instance_id = $6) RETURNING id, status`, StatusPauseRequested, StatusPaused, diff --git a/pkg/jobs/deprecated.go b/pkg/jobs/deprecated.go index c1a24d2d22d4..d606256dd149 100644 --- a/pkg/jobs/deprecated.go +++ b/pkg/jobs/deprecated.go @@ -352,7 +352,7 @@ func (r *Registry) deprecatedResume( } // Bookkeeping. payload := job.Payload() - phs, cleanup := r.planFn("resume-"+job.taskName(), payload.UsernameProto.Decode()) + execCtx, cleanup := r.execCtx("resume-"+job.taskName(), payload.UsernameProto.Decode()) defer cleanup() spanName := fmt.Sprintf(`%s-%d`, payload.Type(), *job.ID()) var span opentracing.Span @@ -366,7 +366,7 @@ func (r *Registry) deprecatedResume( if job.Payload().FinalResumeError != nil { finalResumeError = errors.DecodeError(ctx, *job.Payload().FinalResumeError) } - err = r.stepThroughStateMachine(ctx, phs, resumer, resultsCh, job, status, finalResumeError) + err = r.stepThroughStateMachine(ctx, execCtx, resumer, resultsCh, job, status, finalResumeError) if err != nil { // TODO (lucy): This needs to distinguish between assertion errors in // the job registry and assertion errors in job execution returned from diff --git a/pkg/jobs/helpers_test.go b/pkg/jobs/helpers_test.go index 0a803bd680ab..96e857a68c4e 100644 --- a/pkg/jobs/helpers_test.go +++ b/pkg/jobs/helpers_test.go @@ -56,12 +56,12 @@ type OnPauseRequestFunc = onPauseRequestFunc var _ PauseRequester = FakeResumer{} func (d FakeResumer) OnPauseRequest( - ctx context.Context, phs interface{}, txn *kv.Txn, details *jobspb.Progress, + ctx context.Context, execCtx interface{}, txn *kv.Txn, details *jobspb.Progress, ) error { if d.PauseRequest == nil { return nil } - return d.PauseRequest(ctx, phs, txn, details) + return d.PauseRequest(ctx, execCtx, txn, details) } // Started is a wrapper around the internal function that moves a job to the diff --git a/pkg/jobs/jobs.go b/pkg/jobs/jobs.go index 9360e946531b..967f2e87dd38 100644 --- a/pkg/jobs/jobs.go +++ b/pkg/jobs/jobs.go @@ -499,9 +499,9 @@ func (j *Job) pauseRequested(ctx context.Context, fn onPauseRequestFunc) error { return fmt.Errorf("job with status %s cannot be requested to be paused", md.Status) } if fn != nil { - phs, cleanup := j.registry.planFn("pause request", j.Payload().UsernameProto.Decode()) + execCtx, cleanup := j.registry.execCtx("pause request", j.Payload().UsernameProto.Decode()) defer cleanup() - if err := fn(ctx, phs, txn, md.Progress); err != nil { + if err := fn(ctx, execCtx, txn, md.Progress); err != nil { return err } ju.UpdateProgress(md.Progress) diff --git a/pkg/jobs/jobs_test.go b/pkg/jobs/jobs_test.go index c4358ad74571..805bf41480ef 100644 --- a/pkg/jobs/jobs_test.go +++ b/pkg/jobs/jobs_test.go @@ -1680,7 +1680,7 @@ func TestShowJobsWithError(t *testing.T) { } if _, err := sqlDB.Exec(` -- Create a corrupted progress field. 
- INSERT INTO system.jobs(id, status, payload, progress) SELECT id+2, status, payload, '\xaaaa'::BYTES FROM system.jobs WHERE id = $1; + INSERT INTO system.jobs(id, status, payload, progress) SELECT id+2, status, payload, '\xaaaa'::BYTES FROM system.jobs WHERE id = $1; `, jobID); err != nil { t.Fatal(err) } @@ -1956,7 +1956,7 @@ func TestJobInTxn(t *testing.T) { defer sql.ClearPlanHooks() // Piggy back on BACKUP to be able to create a succeeding test job. sql.AddPlanHook( - func(_ context.Context, stmt tree.Statement, phs sql.PlanHookState, + func(_ context.Context, stmt tree.Statement, execCtx sql.PlanHookState, ) (sql.PlanHookRowFn, colinfo.ResultColumns, []sql.PlanNode, bool, error) { st, ok := stmt.(*tree.Backup) if !ok { @@ -1964,7 +1964,7 @@ func TestJobInTxn(t *testing.T) { } fn := func(_ context.Context, _ []sql.PlanNode, _ chan<- tree.Datums) error { var err error - job, err = phs.ExtendedEvalContext().QueueJob( + job, err = execCtx.ExtendedEvalContext().QueueJob( jobs.Record{ Description: st.String(), Details: jobspb.BackupDetails{}, @@ -1991,7 +1991,7 @@ func TestJobInTxn(t *testing.T) { }) // Piggy back on RESTORE to be able to create a failing test job. sql.AddPlanHook( - func(_ context.Context, stmt tree.Statement, phs sql.PlanHookState, + func(_ context.Context, stmt tree.Statement, execCtx sql.PlanHookState, ) (sql.PlanHookRowFn, colinfo.ResultColumns, []sql.PlanNode, bool, error) { _, ok := stmt.(*tree.Restore) if !ok { @@ -1999,7 +1999,7 @@ func TestJobInTxn(t *testing.T) { } fn := func(_ context.Context, _ []sql.PlanNode, _ chan<- tree.Datums) error { var err error - job, err = phs.ExtendedEvalContext().QueueJob( + job, err = execCtx.ExtendedEvalContext().QueueJob( jobs.Record{ Description: "RESTORE", Details: jobspb.RestoreDetails{}, diff --git a/pkg/jobs/registry.go b/pkg/jobs/registry.go index 1f3843979a9b..bcc92afb3076 100644 --- a/pkg/jobs/registry.go +++ b/pkg/jobs/registry.go @@ -108,7 +108,7 @@ type Registry struct { clock *hlc.Clock nodeID *base.SQLIDContainer settings *cluster.Settings - planFn planHookMaker + execCtx jobExecCtxMaker metrics Metrics // adoptionChan is used to nudge the registry to resume claimed jobs and @@ -118,7 +118,7 @@ type Registry struct { // sessionBoundInternalExecutorFactory provides a way for jobs to create // internal executors. This is rarely needed, and usually job resumers should - // use the internal executor from the PlanHookState. The intended user of this + // use the internal executor from the JobExecCtx. The intended user of this // interface is the schema change job resumer, which needs to set the // tableCollectionModifier on the internal executor to different values in // multiple concurrent queries. This situation is an exception to the internal @@ -167,26 +167,26 @@ type Registry struct { TestingResumerCreationKnobs map[jobspb.Type]func(Resumer) Resumer } -// planHookMaker is a wrapper around sql.NewInternalPlanner. It returns an +// jobExecCtxMaker is a wrapper around sql.NewInternalPlanner. It returns an // *sql.planner as an interface{} due to package dependency cycles. It should // be cast to that type in the sql package when it is used. Returns a cleanup // function that must be called once the caller is done with the planner. // // TODO(mjibson): Can we do something to avoid passing an interface{} here // that must be type casted in a Resumer? It cannot be done here because -// PlanHookState lives in the sql package, which would create a dependency -// cycle if listed here. 
Furthermore, moving PlanHookState into a common +// JobExecContext lives in the sql package, which would create a dependency +// cycle if listed here. Furthermore, moving JobExecContext into a common // subpackage like sqlbase is difficult because of the amount of sql-only -// stuff that PlanHookState exports. One other choice is to merge this package +// stuff that JobExecContext exports. One other choice is to merge this package // back into the sql package. There's maybe a better way that I'm unaware of. -type planHookMaker func(opName string, user security.SQLUsername) (interface{}, func()) +type jobExecCtxMaker func(opName string, user security.SQLUsername) (interface{}, func()) // PreventAdoptionFile is the name of the file which, if present in the first // on-disk store, will prevent the adoption of background jobs by that node. const PreventAdoptionFile = "DISABLE_STARTING_BACKGROUND_JOBS" // MakeRegistry creates a new Registry. planFn is a wrapper around -// sql.newInternalPlanner. It returns a sql.PlanHookState, but must be +// sql.newInternalPlanner. It returns a sql.JobExecCtx, but must be // coerced into that in the Resumer functions. func MakeRegistry( ac log.AmbientContext, @@ -199,7 +199,7 @@ func MakeRegistry( sqlInstance sqlliveness.Instance, settings *cluster.Settings, histogramWindowInterval time.Duration, - planFn planHookMaker, + execCtxFn jobExecCtxMaker, preventAdoptionFile string, ) *Registry { r := &Registry{ @@ -212,7 +212,7 @@ func MakeRegistry( nodeID: nodeID, sqlInstance: sqlInstance, settings: settings, - planFn: planFn, + execCtx: execCtxFn, preventAdoptionFile: preventAdoptionFile, adoptionCh: make(chan adoptionNotice), } @@ -984,9 +984,9 @@ func (r *Registry) Unpause(ctx context.Context, txn *kv.Txn, id int64) error { // type Resumer interface { // Resume is called when a job is started or resumed. Sending results on the - // chan will return them to a user, if a user's session is connected. phs - // is a sql.PlanHookState. - Resume(ctx context.Context, phs interface{}, resultsCh chan<- tree.Datums) error + // chan will return them to a user, if a user's session is connected. execCtx + // is a sql.JobExecCtx. + Resume(ctx context.Context, execCtx interface{}, resultsCh chan<- tree.Datums) error // OnFailOrCancel is called when a job fails or is cancel-requested. // @@ -994,7 +994,7 @@ type Resumer interface { // which is not guaranteed to run on the node where the job is running. So it // cannot assume that any other methods have been called on this Resumer // object. - OnFailOrCancel(ctx context.Context, phs interface{}) error + OnFailOrCancel(ctx context.Context, execCtx interface{}) error } // PauseRequester is an extension of Resumer which allows job implementers to inject @@ -1003,9 +1003,9 @@ type PauseRequester interface { Resumer // OnPauseRequest is called in the transaction that moves a job to PauseRequested. - // If an error is returned, the pause request will fail. phs is a - // sql.PlanHookState. - OnPauseRequest(ctx context.Context, phs interface{}, txn *kv.Txn, details *jobspb.Progress) error + // If an error is returned, the pause request will fail. execCtx is a + // sql.JobExecCtx. + OnPauseRequest(ctx context.Context, execCtx interface{}, txn *kv.Txn, details *jobspb.Progress) error } // Constructor creates a resumable job of a certain type. The Resumer is @@ -1059,7 +1059,7 @@ func (r retryJobError) Error() string { // the job was not completed with success. status is the current job status. 
func (r *Registry) stepThroughStateMachine( ctx context.Context, - phs interface{}, + execCtx interface{}, resumer Resumer, resultsCh chan<- tree.Datums, job *Job, @@ -1086,11 +1086,11 @@ func (r *Registry) stepThroughStateMachine( func() { jm.CurrentlyRunning.Inc(1) defer jm.CurrentlyRunning.Dec(1) - err = resumer.Resume(resumeCtx, phs, resultsCh) + err = resumer.Resume(resumeCtx, execCtx, resultsCh) }() if err == nil { jm.ResumeCompleted.Inc(1) - return r.stepThroughStateMachine(ctx, phs, resumer, resultsCh, job, StatusSucceeded, nil) + return r.stepThroughStateMachine(ctx, execCtx, resumer, resultsCh, job, StatusSucceeded, nil) } if resumeCtx.Err() != nil { // The context was canceled. Tell the user, but don't attempt to @@ -1116,7 +1116,7 @@ func (r *Registry) stepThroughStateMachine( } return sErr } - return r.stepThroughStateMachine(ctx, phs, resumer, resultsCh, job, StatusReverting, err) + return r.stepThroughStateMachine(ctx, execCtx, resumer, resultsCh, job, StatusReverting, err) case StatusPauseRequested: return errors.Errorf("job %s", status) case StatusCancelRequested: @@ -1142,7 +1142,7 @@ func (r *Registry) stepThroughStateMachine( // TODO(spaskob): this is silly, we should remove the OnSuccess hooks and // execute them in resume so that the client can handle these errors // better. - return r.stepThroughStateMachine(ctx, phs, resumer, resultsCh, job, StatusReverting, errors.Wrapf(err, "could not mark job %d as succeeded", *job.ID())) + return r.stepThroughStateMachine(ctx, execCtx, resumer, resultsCh, job, StatusReverting, errors.Wrapf(err, "could not mark job %d as succeeded", *job.ID())) } return nil case StatusReverting: @@ -1156,7 +1156,7 @@ func (r *Registry) stepThroughStateMachine( func() { jm.CurrentlyRunning.Inc(1) defer jm.CurrentlyRunning.Dec(1) - err = resumer.OnFailOrCancel(onFailOrCancelCtx, phs) + err = resumer.OnFailOrCancel(onFailOrCancelCtx, execCtx) }() if successOnFailOrCancel := err == nil; successOnFailOrCancel { jm.FailOrCancelCompleted.Inc(1) @@ -1166,7 +1166,7 @@ func (r *Registry) stepThroughStateMachine( if HasErrJobCanceled(jobErr) { nextStatus = StatusCanceled } - return r.stepThroughStateMachine(ctx, phs, resumer, resultsCh, job, nextStatus, jobErr) + return r.stepThroughStateMachine(ctx, execCtx, resumer, resultsCh, job, nextStatus, jobErr) } if onFailOrCancelCtx.Err() != nil { jm.FailOrCancelRetryError.Inc(1) @@ -1186,7 +1186,7 @@ func (r *Registry) stepThroughStateMachine( } return sErr } - return r.stepThroughStateMachine(ctx, phs, resumer, resultsCh, job, StatusFailed, + return r.stepThroughStateMachine(ctx, execCtx, resumer, resultsCh, job, StatusFailed, errors.Wrapf(err, "job %d: cannot be reverted, manual cleanup may be required", *job.ID())) case StatusFailed: if jobErr == nil { diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index 6a9d39378cab..fcf3413e2703 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -248,7 +248,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*sqlServer, error) { func(opName string, user security.SQLUsername) (interface{}, func()) { // This is a hack to get around a Go package dependency cycle. See comment // in sql/jobs/registry.go on planHookMaker. 
- return sql.NewInternalPlanner(opName, nil, user, &sql.MemoryMetrics{}, execCfg) + return sql.MakeJobExecContext(opName, user, &sql.MemoryMetrics{}, execCfg) }, cfg.jobAdoptionStopFile, ) diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel index 47ebc0fd206b..c5f288235f50 100644 --- a/pkg/sql/BUILD.bazel +++ b/pkg/sql/BUILD.bazel @@ -106,6 +106,7 @@ go_library( "inverted_filter.go", "inverted_join.go", "inverted_span_encoding.go", + "job_exec_context.go", "join.go", "join_predicate.go", "limit.go", diff --git a/pkg/sql/create_stats.go b/pkg/sql/create_stats.go index eba98263e9bd..18707b6acd08 100644 --- a/pkg/sql/create_stats.go +++ b/pkg/sql/create_stats.go @@ -479,9 +479,9 @@ var _ jobs.Resumer = &createStatsResumer{} // Resume is part of the jobs.Resumer interface. func (r *createStatsResumer) Resume( - ctx context.Context, phs interface{}, resultsCh chan<- tree.Datums, + ctx context.Context, execCtx interface{}, resultsCh chan<- tree.Datums, ) error { - p := phs.(*planner) + p := execCtx.(JobExecContext) details := r.job.Details().(jobspb.CreateStatsDetails) if details.Name == stats.AutoStatsName { // We want to make sure there is only one automatic CREATE STATISTICS job @@ -509,12 +509,12 @@ func (r *createStatsResumer) Resume( evalCtx.Txn = txn if details.AsOf != nil { - p.semaCtx.AsOfTimestamp = details.AsOf - p.extendedEvalCtx.SetTxnTimestamp(details.AsOf.GoTime()) + p.SemaCtx().AsOfTimestamp = details.AsOf + p.ExtendedEvalContext().SetTxnTimestamp(details.AsOf.GoTime()) txn.SetFixedTimestamp(ctx, *details.AsOf) } - planCtx := dsp.NewPlanningCtx(ctx, evalCtx, p, txn, true /* distribute */) + planCtx := dsp.NewPlanningCtx(ctx, evalCtx, nil /* planner */, txn, true /* distribute */) if err := dsp.planAndRunCreateStats( ctx, evalCtx, planCtx, txn, r.job, NewRowResultWriter(rows), ); err != nil { @@ -582,7 +582,7 @@ func (r *createStatsResumer) Resume( // pending, running, or paused status that started earlier than this one. If // there are, checkRunningJobs returns an error. If job is nil, checkRunningJobs // just checks if there are any pending, running, or paused CreateStats jobs. -func checkRunningJobs(ctx context.Context, job *jobs.Job, p *planner) error { +func checkRunningJobs(ctx context.Context, job *jobs.Job, p JobExecContext) error { var jobID int64 if job != nil { jobID = *job.ID() diff --git a/pkg/sql/distsql_plan_csv.go b/pkg/sql/distsql_plan_csv.go index ed2a7e43a92d..cee4526cb037 100644 --- a/pkg/sql/distsql_plan_csv.go +++ b/pkg/sql/distsql_plan_csv.go @@ -183,7 +183,7 @@ func presplitTableBoundaries( // returned. 
func DistIngest( ctx context.Context, - phs PlanHookState, + execCtx JobExecContext, job *jobs.Job, tables map[string]*execinfrapb.ReadImportDataSpec_ImportTable, from []string, @@ -193,15 +193,15 @@ func DistIngest( ) (roachpb.BulkOpSummary, error) { ctx = logtags.AddTag(ctx, "import-distsql-ingest", nil) - dsp := phs.DistSQLPlanner() - evalCtx := phs.ExtendedEvalContext() + dsp := execCtx.DistSQLPlanner() + evalCtx := execCtx.ExtendedEvalContext() - planCtx, nodes, err := dsp.SetupAllNodesPlanning(ctx, evalCtx, phs.ExecCfg()) + planCtx, nodes, err := dsp.SetupAllNodesPlanning(ctx, evalCtx, execCtx.ExecCfg()) if err != nil { return roachpb.BulkOpSummary{}, err } - inputSpecs := makeImportReaderSpecs(job, tables, from, format, nodes, walltime, phs.User()) + inputSpecs := makeImportReaderSpecs(job, tables, from, format, nodes, walltime, execCtx.User()) gatewayNodeID, err := evalCtx.ExecCfg.NodeID.OptionalNodeIDErr(47970) if err != nil { @@ -285,7 +285,7 @@ func DistIngest( return nil }) - if err := presplitTableBoundaries(ctx, phs.ExecCfg(), tables); err != nil { + if err := presplitTableBoundaries(ctx, execCtx.ExecCfg(), tables); err != nil { return roachpb.BulkOpSummary{}, err } diff --git a/pkg/sql/distsql_plan_stats.go b/pkg/sql/distsql_plan_stats.go index 870da8e9b7d6..33eb02f9ebcf 100644 --- a/pkg/sql/distsql_plan_stats.go +++ b/pkg/sql/distsql_plan_stats.go @@ -80,7 +80,7 @@ func (dsp *DistSQLPlanner) createStatsPlan( if err != nil { return nil, err } - sb := span.MakeBuilder(planCtx.planner.ExecCfg().Codec, desc, scan.index) + sb := span.MakeBuilder(planCtx.ExtendedEvalCtx.Codec, desc, scan.index) scan.spans, err = sb.UnconstrainedSpans() if err != nil { return nil, err @@ -183,7 +183,7 @@ func (dsp *DistSQLPlanner) createStatsPlan( ) // Estimate the expected number of rows based on existing stats in the cache. - tableStats, err := planCtx.planner.execCfg.TableStatsCache.GetTableStats(planCtx.ctx, desc.ID) + tableStats, err := planCtx.ExtendedEvalCtx.ExecCfg.TableStatsCache.GetTableStats(planCtx.ctx, desc.ID) if err != nil { return nil, err } diff --git a/pkg/sql/gcjob/gc_job.go b/pkg/sql/gcjob/gc_job.go index 912d4208c199..1b2058092163 100644 --- a/pkg/sql/gcjob/gc_job.go +++ b/pkg/sql/gcjob/gc_job.go @@ -75,9 +75,9 @@ func performGC( // Resume is part of the jobs.Resumer interface. func (r schemaChangeGCResumer) Resume( - ctx context.Context, phs interface{}, _ chan<- tree.Datums, + ctx context.Context, execCtx interface{}, _ chan<- tree.Datums, ) error { - p := phs.(sql.PlanHookState) + p := execCtx.(sql.JobExecContext) // TODO(pbardea): Wait for no versions. execCfg := p.ExecCfg() if fn := execCfg.GCJobTestingKnobs.RunBeforeResume; fn != nil { diff --git a/pkg/sql/job_exec_context.go b/pkg/sql/job_exec_context.go new file mode 100644 index 000000000000..c9781429a6cc --- /dev/null +++ b/pkg/sql/job_exec_context.go @@ -0,0 +1,65 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+
+package sql
+
+import (
+	"github.com/cockroachdb/cockroach/pkg/security"
+	"github.com/cockroachdb/cockroach/pkg/sql/catalog/lease"
+	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
+	"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
+)
+
+// plannerJobExecContext is a wrapper to implement JobExecContext with a planner
+// without allowing casting directly to a planner. Eventually it would be nice
+// if we could implement the API entirely without a planner; however, the only
+// implementation of extendedEvalContext is very tied to a planner.
+type plannerJobExecContext struct {
+	p *planner
+}
+
+// MakeJobExecContext makes a JobExecContext.
+func MakeJobExecContext(
+	opName string, user security.SQLUsername, memMetrics *MemoryMetrics, execCfg *ExecutorConfig,
+) (JobExecContext, func()) {
+	p, close := newInternalPlanner(opName, nil /*txn*/, user, memMetrics, execCfg)
+	return &plannerJobExecContext{p: p}, close
+}
+
+func (e *plannerJobExecContext) SemaCtx() *tree.SemaContext { return e.p.SemaCtx() }
+func (e *plannerJobExecContext) ExtendedEvalContext() *extendedEvalContext {
+	return e.p.ExtendedEvalContext()
+}
+func (e *plannerJobExecContext) SessionData() *sessiondata.SessionData {
+	return e.p.SessionData()
+}
+func (e *plannerJobExecContext) ExecCfg() *ExecutorConfig { return e.p.ExecCfg() }
+func (e *plannerJobExecContext) DistSQLPlanner() *DistSQLPlanner { return e.p.DistSQLPlanner() }
+func (e *plannerJobExecContext) LeaseMgr() *lease.Manager { return e.p.LeaseMgr() }
+func (e *plannerJobExecContext) User() security.SQLUsername { return e.p.User() }
+
+// JobExecContext provides the execution environment for a job. It is what is
+// passed to the Resume/OnFailOrCancel/OnPauseRequest methods of a job's
+// Resumer to give that resumer access to things like ExecCfg, LeaseMgr,
+// etc -- the kinds of things that would usually be on planner or similar during
+// a non-job SQL statement's execution. Unlike a planner, however, or planner-ish
+// interfaces like PlanHookState, JobExecContext does not include a txn or the
+// methods that are defined in terms of "the" txn, such as privilege/name
+// accessors (though note that ExtendedEvalContext may transitively include
+// methods that close over/expect a txn, so use it with caution).
+type JobExecContext interface {
+	SemaCtx() *tree.SemaContext
+	ExtendedEvalContext() *extendedEvalContext
+	SessionData() *sessiondata.SessionData
+	ExecCfg() *ExecutorConfig
+	DistSQLPlanner() *DistSQLPlanner
+	LeaseMgr() *lease.Manager
+	User() security.SQLUsername
+}
diff --git a/pkg/sql/schema_changer.go b/pkg/sql/schema_changer.go
index c5f93b26bfb1..8c10af77436d 100644
--- a/pkg/sql/schema_changer.go
+++ b/pkg/sql/schema_changer.go
@@ -2049,9 +2049,9 @@ type schemaChangeResumer struct {
 }
 
 func (r schemaChangeResumer) Resume(
-	ctx context.Context, phs interface{}, resultsCh chan<- tree.Datums,
+	ctx context.Context, execCtx interface{}, resultsCh chan<- tree.Datums,
 ) error {
-	p := phs.(PlanHookState)
+	p := execCtx.(JobExecContext)
 	details := r.job.Details().(jobspb.SchemaChangeDetails)
 	if p.ExecCfg().SchemaChangerTestingKnobs.SchemaChangeJobNoOp != nil &&
 		p.ExecCfg().SchemaChangerTestingKnobs.SchemaChangeJobNoOp() {
@@ -2221,8 +2221,8 @@ func (r schemaChangeResumer) Resume(
 }
 
 // OnFailOrCancel is part of the jobs.Resumer interface.
-func (r schemaChangeResumer) OnFailOrCancel(ctx context.Context, phs interface{}) error { - p := phs.(PlanHookState) +func (r schemaChangeResumer) OnFailOrCancel(ctx context.Context, execCtx interface{}) error { + p := execCtx.(JobExecContext) details := r.job.Details().(jobspb.SchemaChangeDetails) if details.DroppedDatabaseID != descpb.InvalidID { diff --git a/pkg/sql/type_change.go b/pkg/sql/type_change.go index 8c6a66ec1047..d1996f1dc336 100644 --- a/pkg/sql/type_change.go +++ b/pkg/sql/type_change.go @@ -265,27 +265,27 @@ type typeChangeResumer struct { // Resume implements the jobs.Resumer interface. func (t *typeChangeResumer) Resume( - ctx context.Context, phs interface{}, _ chan<- tree.Datums, + ctx context.Context, execCtx interface{}, _ chan<- tree.Datums, ) error { - p := phs.(*planner) - if p.execCfg.TypeSchemaChangerTestingKnobs.TypeSchemaChangeJobNoOp != nil { - if p.execCfg.TypeSchemaChangerTestingKnobs.TypeSchemaChangeJobNoOp() { + p := execCtx.(JobExecContext) + if p.ExecCfg().TypeSchemaChangerTestingKnobs.TypeSchemaChangeJobNoOp != nil { + if p.ExecCfg().TypeSchemaChangerTestingKnobs.TypeSchemaChangeJobNoOp() { return nil } } tc := &typeSchemaChanger{ typeID: t.job.Details().(jobspb.TypeSchemaChangeDetails).TypeID, - execCfg: p.execCfg, + execCfg: p.ExecCfg(), } return tc.execWithRetry(ctx) } // OnFailOrCancel implements the jobs.Resumer interface. -func (t *typeChangeResumer) OnFailOrCancel(ctx context.Context, phs interface{}) error { +func (t *typeChangeResumer) OnFailOrCancel(ctx context.Context, execCtx interface{}) error { // If the job failed, just try again to clean up any draining names. tc := &typeSchemaChanger{ typeID: t.job.Details().(jobspb.TypeSchemaChangeDetails).TypeID, - execCfg: phs.(*planner).ExecCfg(), + execCfg: execCtx.(JobExecContext).ExecCfg(), } return drainNamesForDescriptor(