diff --git a/pkg/ccl/backupccl/backup_job.go b/pkg/ccl/backupccl/backup_job.go index 726f0b72eb8f..0b8d77eff2e2 100644 --- a/pkg/ccl/backupccl/backup_job.go +++ b/pkg/ccl/backupccl/backup_job.go @@ -172,7 +172,7 @@ func clusterNodeCount(gw gossip.OptionalGossip) (int, error) { // file. func backup( ctx context.Context, - phs sql.PlanHookState, + execCtx sql.JobExecContext, defaultURI string, urisByLocalityKV map[string]string, db *kv.DB, @@ -286,7 +286,7 @@ func backup( if err := distBackup( ctx, - phs, + execCtx, spans, introducedSpans, pkIDs, @@ -403,10 +403,10 @@ type backupResumer struct { // Resume is part of the jobs.Resumer interface. func (b *backupResumer) Resume( - ctx context.Context, phs interface{}, resultsCh chan<- tree.Datums, + ctx context.Context, execCtx interface{}, resultsCh chan<- tree.Datums, ) error { details := b.job.Details().(jobspb.BackupDetails) - p := phs.(sql.PlanHookState) + p := execCtx.(sql.JobExecContext) // For all backups, partitioned or not, the main BACKUP manifest is stored at // details.URI. @@ -655,18 +655,18 @@ func (b *backupResumer) clearStats(ctx context.Context, DB *kv.DB) error { } // OnFailOrCancel is part of the jobs.Resumer interface. -func (b *backupResumer) OnFailOrCancel(ctx context.Context, phs interface{}) error { +func (b *backupResumer) OnFailOrCancel(ctx context.Context, execCtx interface{}) error { defer b.maybeNotifyScheduledJobCompletion( ctx, jobs.StatusFailed, - phs.(sql.PlanHookState).ExecCfg(), + execCtx.(sql.JobExecContext).ExecCfg(), ) telemetry.Count("backup.total.failed") telemetry.CountBucketed("backup.duration-sec.failed", int64(timeutil.Since(timeutil.FromUnixMicros(b.job.Payload().StartedMicros)).Seconds())) - p := phs.(sql.PlanHookState) + p := execCtx.(sql.JobExecContext) cfg := p.ExecCfg() if err := b.clearStats(ctx, p.ExecCfg().DB); err != nil { log.Warningf(ctx, "unable to clear stats from job payload: %+v", err) diff --git a/pkg/ccl/backupccl/backup_processor_planning.go b/pkg/ccl/backupccl/backup_processor_planning.go index 26f6d3fcc8de..df5d9cce3933 100644 --- a/pkg/ccl/backupccl/backup_processor_planning.go +++ b/pkg/ccl/backupccl/backup_processor_planning.go @@ -31,7 +31,7 @@ import ( // build up the BulkOpSummary. func distBackup( ctx context.Context, - phs sql.PlanHookState, + execCtx sql.JobExecContext, spans roachpb.Spans, introducedSpans roachpb.Spans, pkIDs map[uint64]bool, @@ -45,10 +45,10 @@ func distBackup( ctx = logtags.AddTag(ctx, "backup-distsql", nil) var noTxn *kv.Txn - dsp := phs.DistSQLPlanner() - evalCtx := phs.ExtendedEvalContext() + dsp := execCtx.DistSQLPlanner() + evalCtx := execCtx.ExtendedEvalContext() - planCtx, _, err := dsp.SetupAllNodesPlanning(ctx, evalCtx, phs.ExecCfg()) + planCtx, _, err := dsp.SetupAllNodesPlanning(ctx, evalCtx, execCtx.ExecCfg()) if err != nil { return err } @@ -64,8 +64,8 @@ func distBackup( mvccFilter, encryption, startTime, endTime, - phs.User(), - phs.ExecCfg(), + execCtx.User(), + execCtx.ExecCfg(), ) if err != nil { return err diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go index 0b9fc94e7695..add75f9fa820 100644 --- a/pkg/ccl/backupccl/restore_job.go +++ b/pkg/ccl/backupccl/restore_job.go @@ -480,7 +480,7 @@ func rewriteBackupSpanKey(kr *storageccl.KeyRewriter, key roachpb.Key) (roachpb. // files. func restore( restoreCtx context.Context, - phs sql.PlanHookState, + execCtx sql.JobExecContext, numClusterNodes int, backupManifests []BackupManifest, backupLocalityInfo []jobspb.RestoreDetails_BackupLocalityInfo, @@ -491,7 +491,7 @@ func restore( job *jobs.Job, encryption *jobspb.BackupEncryptionOptions, ) (RowCount, error) { - user := phs.User() + user := execCtx.User() // A note about contexts and spans in this method: the top-level context // `restoreCtx` is used for orchestration logging. All operations that carry // out work get their individual contexts. @@ -620,7 +620,7 @@ func restore( // TODO(pbardea): Improve logging in processors. if err := distRestore( restoreCtx, - phs, + execCtx, importSpanChunks, pkIDs, encryption, @@ -652,7 +652,7 @@ func restore( // be broken down into two methods. func loadBackupSQLDescs( ctx context.Context, - p sql.PlanHookState, + p sql.JobExecContext, details jobspb.RestoreDetails, encryption *jobspb.BackupEncryptionOptions, ) ([]BackupManifest, BackupManifest, []catalog.Descriptor, error) { @@ -832,7 +832,7 @@ func getTempSystemDBID(details jobspb.RestoreDetails) descpb.ID { // createImportingDescriptors create the tables that we will restore into. It also // fetches the information from the old tables that we need for the restore. func createImportingDescriptors( - ctx context.Context, p sql.PlanHookState, sqlDescs []catalog.Descriptor, r *restoreResumer, + ctx context.Context, p sql.JobExecContext, sqlDescs []catalog.Descriptor, r *restoreResumer, ) (tables []catalog.TableDescriptor, oldTableIDs []descpb.ID, spans []roachpb.Span, err error) { details := r.job.Details().(jobspb.RestoreDetails) @@ -1094,10 +1094,10 @@ func createImportingDescriptors( // Resume is part of the jobs.Resumer interface. func (r *restoreResumer) Resume( - ctx context.Context, phs interface{}, resultsCh chan<- tree.Datums, + ctx context.Context, execCtx interface{}, resultsCh chan<- tree.Datums, ) error { details := r.job.Details().(jobspb.RestoreDetails) - p := phs.(sql.PlanHookState) + p := execCtx.(sql.JobExecContext) r.versionAtLeast20_2 = p.ExecCfg().Settings.Version.IsActive( ctx, clusterversion.VersionLeasedDatabaseDescriptors) @@ -1475,14 +1475,14 @@ func (r *restoreResumer) publishDescriptors( // has been committed from a restore that has failed or been canceled. It does // this by adding the table descriptors in DROP state, which causes the schema // change stuff to delete the keys in the background. -func (r *restoreResumer) OnFailOrCancel(ctx context.Context, phs interface{}) error { +func (r *restoreResumer) OnFailOrCancel(ctx context.Context, execCtx interface{}) error { telemetry.Count("restore.total.failed") telemetry.CountBucketed("restore.duration-sec.failed", int64(timeutil.Since(timeutil.FromUnixMicros(r.job.Payload().StartedMicros)).Seconds())) details := r.job.Details().(jobspb.RestoreDetails) - execCfg := phs.(sql.PlanHookState).ExecCfg() + execCfg := execCtx.(sql.JobExecContext).ExecCfg() return descs.Txn(ctx, execCfg.Settings, execCfg.LeaseManager, execCfg.InternalExecutor, execCfg.DB, func(ctx context.Context, txn *kv.Txn, descsCol *descs.Collection) error { for _, tenant := range details.Tenants { diff --git a/pkg/ccl/backupccl/restore_processor_planning.go b/pkg/ccl/backupccl/restore_processor_planning.go index e653c06e0b69..4030c4a81ad3 100644 --- a/pkg/ccl/backupccl/restore_processor_planning.go +++ b/pkg/ccl/backupccl/restore_processor_planning.go @@ -38,7 +38,7 @@ import ( // This method also closes the given progCh. func distRestore( ctx context.Context, - phs sql.PlanHookState, + execCtx sql.JobExecContext, chunks [][]execinfrapb.RestoreSpanEntry, pkIDs map[uint64]bool, encryption *jobspb.BackupEncryptionOptions, @@ -50,13 +50,13 @@ func distRestore( defer close(progCh) var noTxn *kv.Txn - dsp := phs.DistSQLPlanner() - evalCtx := phs.ExtendedEvalContext() + dsp := execCtx.DistSQLPlanner() + evalCtx := execCtx.ExtendedEvalContext() if encryption != nil && encryption.Mode == jobspb.EncryptionMode_KMS { kms, err := cloud.KMSFromURI(encryption.KMSInfo.Uri, &backupKMSEnv{ - settings: phs.ExecCfg().Settings, - conf: &phs.ExecCfg().ExternalIODirConfig, + settings: execCtx.ExecCfg().Settings, + conf: &execCtx.ExecCfg().ExternalIODirConfig, }) if err != nil { return err @@ -75,7 +75,7 @@ func distRestore( fileEncryption = &roachpb.FileEncryptionOptions{Key: encryption.Key} } - planCtx, _, err := dsp.SetupAllNodesPlanning(ctx, evalCtx, phs.ExecCfg()) + planCtx, _, err := dsp.SetupAllNodesPlanning(ctx, evalCtx, execCtx.ExecCfg()) if err != nil { return err } diff --git a/pkg/ccl/changefeedccl/changefeed_dist.go b/pkg/ccl/changefeedccl/changefeed_dist.go index 96e7e1c5e749..08fae8ee623e 100644 --- a/pkg/ccl/changefeedccl/changefeed_dist.go +++ b/pkg/ccl/changefeedccl/changefeed_dist.go @@ -62,7 +62,7 @@ var changefeedResultTypes = []*types.T{ // progress of the changefeed's corresponding system job. func distChangefeedFlow( ctx context.Context, - phs sql.PlanHookState, + execCtx sql.JobExecContext, jobID int64, details jobspb.ChangefeedDetails, progress jobspb.Progress, @@ -99,7 +99,7 @@ func distChangefeedFlow( spansTS = initialHighWater } - execCfg := phs.ExecCfg() + execCfg := execCtx.ExecCfg() trackedSpans, err := fetchSpansForTargets(ctx, execCfg.DB, execCfg.Codec, details.Targets, spansTS) if err != nil { return err @@ -111,8 +111,8 @@ func distChangefeedFlow( if err != nil { return err } - dsp := phs.DistSQLPlanner() - evalCtx := phs.ExtendedEvalContext() + dsp := execCtx.DistSQLPlanner() + evalCtx := execCtx.ExtendedEvalContext() planCtx := dsp.NewPlanningCtx(ctx, evalCtx, nil /* planner */, noTxn, true /* distribute */) var spanPartitions []sql.SpanPartition @@ -143,7 +143,7 @@ func distChangefeedFlow( corePlacement[i].Core.ChangeAggregator = &execinfrapb.ChangeAggregatorSpec{ Watches: watches, Feed: details, - UserProto: phs.User().EncodeProto(), + UserProto: execCtx.User().EncodeProto(), } } // NB: This SpanFrontier processor depends on the set of tracked spans being @@ -154,7 +154,7 @@ func distChangefeedFlow( TrackedSpans: trackedSpans, Feed: details, JobID: jobID, - UserProto: phs.User().EncodeProto(), + UserProto: execCtx.User().EncodeProto(), } p := sql.MakePhysicalPlan(gatewayNodeID) diff --git a/pkg/ccl/changefeedccl/changefeed_stmt.go b/pkg/ccl/changefeedccl/changefeed_stmt.go index a6b2f878293a..28a2d5f60d96 100644 --- a/pkg/ccl/changefeedccl/changefeed_stmt.go +++ b/pkg/ccl/changefeedccl/changefeed_stmt.go @@ -576,10 +576,10 @@ func generateChangefeedSessionID() string { // Resume is part of the jobs.Resumer interface. func (b *changefeedResumer) Resume( - ctx context.Context, planHookState interface{}, startedCh chan<- tree.Datums, + ctx context.Context, exec interface{}, startedCh chan<- tree.Datums, ) error { - phs := planHookState.(sql.PlanHookState) - execCfg := phs.ExecCfg() + jobExec := exec.(sql.JobExecContext) + execCfg := jobExec.ExecCfg() jobID := *b.job.ID() details := b.job.Details().(jobspb.ChangefeedDetails) progress := b.job.Progress() @@ -595,7 +595,7 @@ func (b *changefeedResumer) Resume( } var err error for r := retry.StartWithCtx(ctx, opts); r.Next(); { - if err = distChangefeedFlow(ctx, phs, jobID, details, progress, startedCh); err == nil { + if err = distChangefeedFlow(ctx, jobExec, jobID, details, progress, startedCh); err == nil { return nil } if !IsRetryableError(err) { @@ -646,9 +646,9 @@ func (b *changefeedResumer) Resume( } // OnFailOrCancel is part of the jobs.Resumer interface. -func (b *changefeedResumer) OnFailOrCancel(ctx context.Context, planHookState interface{}) error { - phs := planHookState.(sql.PlanHookState) - execCfg := phs.ExecCfg() +func (b *changefeedResumer) OnFailOrCancel(ctx context.Context, jobExec interface{}) error { + exec := jobExec.(sql.JobExecContext) + execCfg := exec.ExecCfg() progress := b.job.Progress() b.maybeCleanUpProtectedTimestamp(ctx, execCfg.DB, execCfg.ProtectedTimestampProvider, progress.GetChangefeed().ProtectedTimestampRecord) @@ -660,7 +660,7 @@ func (b *changefeedResumer) OnFailOrCancel(ctx context.Context, planHookState in telemetry.Count(`changefeed.enterprise.cancel`) } else { telemetry.Count(`changefeed.enterprise.fail`) - phs.ExecCfg().JobRegistry.MetricsStruct().Changefeed.(*Metrics).Failures.Inc(1) + exec.ExecCfg().JobRegistry.MetricsStruct().Changefeed.(*Metrics).Failures.Inc(1) } return nil } @@ -688,7 +688,7 @@ var _ jobs.PauseRequester = (*changefeedResumer)(nil) // paused, we want to install a protected timestamp at the most recent high // watermark if there isn't already one. func (b *changefeedResumer) OnPauseRequest( - ctx context.Context, planHookState interface{}, txn *kv.Txn, progress *jobspb.Progress, + ctx context.Context, jobExec interface{}, txn *kv.Txn, progress *jobspb.Progress, ) error { details := b.job.Details().(jobspb.ChangefeedDetails) if _, shouldPause := details.Opts[changefeedbase.OptProtectDataFromGCOnPause]; !shouldPause { @@ -713,7 +713,7 @@ func (b *changefeedResumer) OnPauseRequest( return nil } - pts := planHookState.(sql.PlanHookState).ExecCfg().ProtectedTimestampProvider + pts := jobExec.(sql.JobExecContext).ExecCfg().ProtectedTimestampProvider return createProtectedTimestampRecord(ctx, pts, txn, *b.job.ID(), details.Targets, *resolved, cp) } diff --git a/pkg/ccl/importccl/import_processor_test.go b/pkg/ccl/importccl/import_processor_test.go index 6de1e8a5a3ce..a94494069a81 100644 --- a/pkg/ccl/importccl/import_processor_test.go +++ b/pkg/ccl/importccl/import_processor_test.go @@ -548,11 +548,11 @@ type cancellableImportResumer struct { } func (r *cancellableImportResumer) Resume( - _ context.Context, phs interface{}, resultsCh chan<- tree.Datums, + _ context.Context, execCtx interface{}, resultsCh chan<- tree.Datums, ) error { r.jobID = *r.wrapped.job.ID() r.jobIDCh <- r.jobID - if err := r.wrapped.Resume(r.ctx, phs, resultsCh); err != nil { + if err := r.wrapped.Resume(r.ctx, execCtx, resultsCh); err != nil { return err } if r.onSuccessBarrier != nil { @@ -561,7 +561,7 @@ func (r *cancellableImportResumer) Resume( return errors.New("job succeed, but we're forcing it to be paused") } -func (r *cancellableImportResumer) OnFailOrCancel(ctx context.Context, phs interface{}) error { +func (r *cancellableImportResumer) OnFailOrCancel(ctx context.Context, execCtx interface{}) error { // This callback is invoked when an error or cancellation occurs // during the import. Since our Resume handler returned an // error (after pausing the job), we need to short-circuits diff --git a/pkg/ccl/importccl/import_stmt.go b/pkg/ccl/importccl/import_stmt.go index 41175d01b47d..9284f353b7dd 100644 --- a/pkg/ccl/importccl/import_stmt.go +++ b/pkg/ccl/importccl/import_stmt.go @@ -28,6 +28,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/server/telemetry" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql" @@ -954,7 +955,7 @@ func prepareNewTableDescsForIngestion( ctx context.Context, txn *kv.Txn, descsCol *descs.Collection, - p sql.PlanHookState, + p sql.JobExecContext, importTables []jobspb.ImportDetails_Table, parentID descpb.ID, ) ([]*descpb.TableDescriptor, error) { @@ -1084,7 +1085,7 @@ func prepareExistingTableDescForIngestion( // step of import. The descriptors are in an IMPORTING state (offline) on // successful completion of this method. func (r *importResumer) prepareTableDescsForIngestion( - ctx context.Context, p sql.PlanHookState, details jobspb.ImportDetails, + ctx context.Context, p sql.JobExecContext, details jobspb.ImportDetails, ) error { err := descs.Txn(ctx, p.ExecCfg().Settings, p.ExecCfg().LeaseManager, p.ExecCfg().InternalExecutor, p.ExecCfg().DB, func( @@ -1169,7 +1170,7 @@ func (r *importResumer) prepareTableDescsForIngestion( // descriptors for bundle formats. func parseAndCreateBundleTableDescs( ctx context.Context, - p sql.PlanHookState, + p sql.JobExecContext, details jobspb.ImportDetails, seqVals map[descpb.ID]int64, skipFKs bool, @@ -1177,6 +1178,7 @@ func parseAndCreateBundleTableDescs( files []string, format roachpb.IOFileFormat, walltime int64, + owner security.SQLUsername, ) ([]*tabledesc.Mutable, error) { var tableDescs []*tabledesc.Mutable @@ -1210,10 +1212,10 @@ func parseAndCreateBundleTableDescs( switch format.Format { case roachpb.IOFileFormat_Mysqldump: evalCtx := &p.ExtendedEvalContext().EvalContext - tableDescs, err = readMysqlCreateTable(ctx, reader, evalCtx, p, defaultCSVTableID, parentID, tableName, fks, seqVals) + tableDescs, err = readMysqlCreateTable(ctx, reader, evalCtx, p, defaultCSVTableID, parentID, tableName, fks, seqVals, owner) case roachpb.IOFileFormat_PgDump: evalCtx := &p.ExtendedEvalContext().EvalContext - tableDescs, err = readPostgresCreateTable(ctx, reader, evalCtx, p, tableName, parentID, walltime, fks, int(format.PgDump.MaxRowSize)) + tableDescs, err = readPostgresCreateTable(ctx, reader, evalCtx, p, tableName, parentID, walltime, fks, int(format.PgDump.MaxRowSize), owner) default: return tableDescs, errors.Errorf("non-bundle format %q does not support reading schemas", format.Format.String()) } @@ -1230,7 +1232,7 @@ func parseAndCreateBundleTableDescs( } func (r *importResumer) parseBundleSchemaIfNeeded(ctx context.Context, phs interface{}) error { - p := phs.(sql.PlanHookState) + p := phs.(sql.JobExecContext) seqVals := make(map[descpb.ID]int64) details := r.job.Details().(jobspb.ImportDetails) skipFKs := details.SkipFKs @@ -1238,6 +1240,8 @@ func (r *importResumer) parseBundleSchemaIfNeeded(ctx context.Context, phs inter files := details.URIs format := details.Format + owner := r.job.Payload().UsernameProto.Decode() + if details.ParseBundleSchema { if err := r.job.RunningStatus(ctx, func(_ context.Context, _ jobspb.Details) (jobs.RunningStatus, error) { return runningStatusImportBundleParseSchema, nil @@ -1250,7 +1254,7 @@ func (r *importResumer) parseBundleSchemaIfNeeded(ctx context.Context, phs inter walltime := p.ExecCfg().Clock.Now().WallTime if tableDescs, err = parseAndCreateBundleTableDescs( - ctx, p, details, seqVals, skipFKs, parentID, files, format, walltime); err != nil { + ctx, p, details, seqVals, skipFKs, parentID, files, format, walltime, owner); err != nil { return err } @@ -1285,9 +1289,9 @@ func (r *importResumer) parseBundleSchemaIfNeeded(ctx context.Context, phs inter // Resume is part of the jobs.Resumer interface. func (r *importResumer) Resume( - ctx context.Context, phs interface{}, resultsCh chan<- tree.Datums, + ctx context.Context, execCtx interface{}, resultsCh chan<- tree.Datums, ) error { - p := phs.(sql.PlanHookState) + p := execCtx.(sql.JobExecContext) if err := r.parseBundleSchemaIfNeeded(ctx, p); err != nil { return err } @@ -1514,10 +1518,10 @@ func (r *importResumer) publishTables(ctx context.Context, execCfg *sql.Executor // been committed from a import that has failed or been canceled. It does this // by adding the table descriptors in DROP state, which causes the schema change // stuff to delete the keys in the background. -func (r *importResumer) OnFailOrCancel(ctx context.Context, phs interface{}) error { +func (r *importResumer) OnFailOrCancel(ctx context.Context, execCtx interface{}) error { details := r.job.Details().(jobspb.ImportDetails) addToFileFormatTelemetry(details.Format.Format.String(), "failed") - cfg := phs.(sql.PlanHookState).ExecCfg() + cfg := execCtx.(sql.JobExecContext).ExecCfg() lm, ie, db := cfg.LeaseManager, cfg.InternalExecutor, cfg.DB return descs.Txn(ctx, cfg.Settings, lm, ie, db, func( ctx context.Context, txn *kv.Txn, descsCol *descs.Collection, diff --git a/pkg/ccl/importccl/read_import_mysql.go b/pkg/ccl/importccl/read_import_mysql.go index 77c7ec018172..261162302df3 100644 --- a/pkg/ccl/importccl/read_import_mysql.go +++ b/pkg/ccl/importccl/read_import_mysql.go @@ -285,11 +285,12 @@ func readMysqlCreateTable( ctx context.Context, input io.Reader, evalCtx *tree.EvalContext, - p sql.PlanHookState, + p sql.JobExecContext, startingID, parentID descpb.ID, match string, fks fkHandler, seqVals map[descpb.ID]int64, + owner security.SQLUsername, ) ([]*tabledesc.Mutable, error) { match = lexbase.NormalizeName(match) r := bufio.NewReaderSize(input, 1024*64) @@ -321,7 +322,7 @@ func readMysqlCreateTable( continue } id := descpb.ID(int(startingID) + len(ret)) - tbl, moreFKs, err := mysqlTableToCockroach(ctx, evalCtx, p, parentID, id, name, i.TableSpec, fks, seqVals) + tbl, moreFKs, err := mysqlTableToCockroach(ctx, evalCtx, p, parentID, id, name, i.TableSpec, fks, seqVals, owner) if err != nil { return nil, err } @@ -361,12 +362,13 @@ func safeName(in mysqlIdent) tree.Name { func mysqlTableToCockroach( ctx context.Context, evalCtx *tree.EvalContext, - p sql.PlanHookState, + p sql.JobExecContext, parentID, id descpb.ID, name string, in *mysql.TableSpec, fks fkHandler, seqVals map[descpb.ID]int64, + owner security.SQLUsername, ) ([]*tabledesc.Mutable, []delayedFK, error) { if in == nil { return nil, nil, errors.Errorf("could not read definition for table %q (possible unsupported type?)", name) @@ -400,7 +402,6 @@ func mysqlTableToCockroach( var seqDesc *tabledesc.Mutable // If we have an auto-increment seq, create it and increment the id. - owner := security.AdminRoleName() if seqName != "" { var opts tree.SequenceOptions if startingValue != 0 { @@ -409,10 +410,6 @@ func mysqlTableToCockroach( } var err error if p != nil { - params := p.RunParams(ctx) - if params.SessionData() != nil { - owner = params.SessionData().User() - } priv := descpb.NewDefaultPrivilegeDescriptor(owner) seqDesc, err = sql.NewSequenceTableDesc( ctx, @@ -424,7 +421,7 @@ func mysqlTableToCockroach( time, priv, tree.PersistencePermanent, - ¶ms, + nil, /* params */ ) } else { priv := descpb.NewDefaultPrivilegeDescriptor(owner) diff --git a/pkg/ccl/importccl/read_import_mysql_test.go b/pkg/ccl/importccl/read_import_mysql_test.go index 302db52a4d5d..9aa29fb74d73 100644 --- a/pkg/ccl/importccl/read_import_mysql_test.go +++ b/pkg/ccl/importccl/read_import_mysql_test.go @@ -21,6 +21,7 @@ import ( "testing" "time" + "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" @@ -117,7 +118,7 @@ func readMysqlCreateFrom( } defer f.Close() - tbl, err := readMysqlCreateTable(context.Background(), f, testEvalCtx, nil, id, expectedParent, name, fks, map[descpb.ID]int64{}) + tbl, err := readMysqlCreateTable(context.Background(), f, testEvalCtx, nil, id, expectedParent, name, fks, map[descpb.ID]int64{}, security.RootUserName()) if err != nil { t.Fatal(err) } diff --git a/pkg/ccl/importccl/read_import_pgdump.go b/pkg/ccl/importccl/read_import_pgdump.go index 701bf2dd2d85..881ea9473fd9 100644 --- a/pkg/ccl/importccl/read_import_pgdump.go +++ b/pkg/ccl/importccl/read_import_pgdump.go @@ -219,12 +219,13 @@ func readPostgresCreateTable( ctx context.Context, input io.Reader, evalCtx *tree.EvalContext, - p sql.PlanHookState, + p sql.JobExecContext, match string, parentID descpb.ID, walltime int64, fks fkHandler, max int, + owner security.SQLUsername, ) ([]*tabledesc.Mutable, error) { // Modify the CreateTable stmt with the various index additions. We do this // instead of creating a full table descriptor first and adding indexes @@ -236,15 +237,10 @@ func readPostgresCreateTable( createSeq := make(map[string]*tree.CreateSequence) tableFKs := make(map[string][]*tree.ForeignKeyConstraintTableDef) ps := newPostgreStream(input, max) - params := p.RunParams(ctx) for { stmt, err := ps.Next() if err == io.EOF { ret := make([]*tabledesc.Mutable, 0, len(createTbl)) - owner := security.AdminRoleName() - if params.SessionData() != nil { - owner = params.SessionData().User() - } for name, seq := range createSeq { id := descpb.ID(int(defaultCSVTableID) + len(ret)) desc, err := sql.NewSequenceTableDesc( @@ -257,7 +253,7 @@ func readPostgresCreateTable( hlc.Timestamp{WallTime: walltime}, descpb.NewDefaultPrivilegeDescriptor(owner), tree.PersistencePermanent, - ¶ms, + nil, /* params */ ) if err != nil { return nil, err diff --git a/pkg/jobs/adopt.go b/pkg/jobs/adopt.go index d454db036b83..7783618403c0 100644 --- a/pkg/jobs/adopt.go +++ b/pkg/jobs/adopt.go @@ -234,7 +234,7 @@ func (r *Registry) runJob( job.mu.Unlock() // Bookkeeping. - phs, cleanup := r.planFn("resume-"+taskName, username) + execCtx, cleanup := r.execCtx("resume-"+taskName, username) defer cleanup() spanName := fmt.Sprintf(`%s-%d`, typ, *job.ID()) var span opentracing.Span @@ -242,7 +242,7 @@ func (r *Registry) runJob( defer span.Finish() // Run the actual job. - err := r.stepThroughStateMachine(ctx, phs, resumer, resultsCh, job, status, finalResumeError) + err := r.stepThroughStateMachine(ctx, execCtx, resumer, resultsCh, job, status, finalResumeError) if err != nil { // TODO (lucy): This needs to distinguish between assertion errors in // the job registry and assertion errors in job execution returned from @@ -269,7 +269,7 @@ SET status = WHEN status = $1 THEN $2 WHEN status = $3 THEN $4 ELSE status - END + END WHERE (status IN ($1, $3)) AND (claim_session_id = $5 AND claim_instance_id = $6) RETURNING id, status`, StatusPauseRequested, StatusPaused, diff --git a/pkg/jobs/deprecated.go b/pkg/jobs/deprecated.go index c1a24d2d22d4..d606256dd149 100644 --- a/pkg/jobs/deprecated.go +++ b/pkg/jobs/deprecated.go @@ -352,7 +352,7 @@ func (r *Registry) deprecatedResume( } // Bookkeeping. payload := job.Payload() - phs, cleanup := r.planFn("resume-"+job.taskName(), payload.UsernameProto.Decode()) + execCtx, cleanup := r.execCtx("resume-"+job.taskName(), payload.UsernameProto.Decode()) defer cleanup() spanName := fmt.Sprintf(`%s-%d`, payload.Type(), *job.ID()) var span opentracing.Span @@ -366,7 +366,7 @@ func (r *Registry) deprecatedResume( if job.Payload().FinalResumeError != nil { finalResumeError = errors.DecodeError(ctx, *job.Payload().FinalResumeError) } - err = r.stepThroughStateMachine(ctx, phs, resumer, resultsCh, job, status, finalResumeError) + err = r.stepThroughStateMachine(ctx, execCtx, resumer, resultsCh, job, status, finalResumeError) if err != nil { // TODO (lucy): This needs to distinguish between assertion errors in // the job registry and assertion errors in job execution returned from diff --git a/pkg/jobs/helpers_test.go b/pkg/jobs/helpers_test.go index 0a803bd680ab..96e857a68c4e 100644 --- a/pkg/jobs/helpers_test.go +++ b/pkg/jobs/helpers_test.go @@ -56,12 +56,12 @@ type OnPauseRequestFunc = onPauseRequestFunc var _ PauseRequester = FakeResumer{} func (d FakeResumer) OnPauseRequest( - ctx context.Context, phs interface{}, txn *kv.Txn, details *jobspb.Progress, + ctx context.Context, execCtx interface{}, txn *kv.Txn, details *jobspb.Progress, ) error { if d.PauseRequest == nil { return nil } - return d.PauseRequest(ctx, phs, txn, details) + return d.PauseRequest(ctx, execCtx, txn, details) } // Started is a wrapper around the internal function that moves a job to the diff --git a/pkg/jobs/jobs.go b/pkg/jobs/jobs.go index 9360e946531b..967f2e87dd38 100644 --- a/pkg/jobs/jobs.go +++ b/pkg/jobs/jobs.go @@ -499,9 +499,9 @@ func (j *Job) pauseRequested(ctx context.Context, fn onPauseRequestFunc) error { return fmt.Errorf("job with status %s cannot be requested to be paused", md.Status) } if fn != nil { - phs, cleanup := j.registry.planFn("pause request", j.Payload().UsernameProto.Decode()) + execCtx, cleanup := j.registry.execCtx("pause request", j.Payload().UsernameProto.Decode()) defer cleanup() - if err := fn(ctx, phs, txn, md.Progress); err != nil { + if err := fn(ctx, execCtx, txn, md.Progress); err != nil { return err } ju.UpdateProgress(md.Progress) diff --git a/pkg/jobs/jobs_test.go b/pkg/jobs/jobs_test.go index c4358ad74571..805bf41480ef 100644 --- a/pkg/jobs/jobs_test.go +++ b/pkg/jobs/jobs_test.go @@ -1680,7 +1680,7 @@ func TestShowJobsWithError(t *testing.T) { } if _, err := sqlDB.Exec(` -- Create a corrupted progress field. - INSERT INTO system.jobs(id, status, payload, progress) SELECT id+2, status, payload, '\xaaaa'::BYTES FROM system.jobs WHERE id = $1; + INSERT INTO system.jobs(id, status, payload, progress) SELECT id+2, status, payload, '\xaaaa'::BYTES FROM system.jobs WHERE id = $1; `, jobID); err != nil { t.Fatal(err) } @@ -1956,7 +1956,7 @@ func TestJobInTxn(t *testing.T) { defer sql.ClearPlanHooks() // Piggy back on BACKUP to be able to create a succeeding test job. sql.AddPlanHook( - func(_ context.Context, stmt tree.Statement, phs sql.PlanHookState, + func(_ context.Context, stmt tree.Statement, execCtx sql.PlanHookState, ) (sql.PlanHookRowFn, colinfo.ResultColumns, []sql.PlanNode, bool, error) { st, ok := stmt.(*tree.Backup) if !ok { @@ -1964,7 +1964,7 @@ func TestJobInTxn(t *testing.T) { } fn := func(_ context.Context, _ []sql.PlanNode, _ chan<- tree.Datums) error { var err error - job, err = phs.ExtendedEvalContext().QueueJob( + job, err = execCtx.ExtendedEvalContext().QueueJob( jobs.Record{ Description: st.String(), Details: jobspb.BackupDetails{}, @@ -1991,7 +1991,7 @@ func TestJobInTxn(t *testing.T) { }) // Piggy back on RESTORE to be able to create a failing test job. sql.AddPlanHook( - func(_ context.Context, stmt tree.Statement, phs sql.PlanHookState, + func(_ context.Context, stmt tree.Statement, execCtx sql.PlanHookState, ) (sql.PlanHookRowFn, colinfo.ResultColumns, []sql.PlanNode, bool, error) { _, ok := stmt.(*tree.Restore) if !ok { @@ -1999,7 +1999,7 @@ func TestJobInTxn(t *testing.T) { } fn := func(_ context.Context, _ []sql.PlanNode, _ chan<- tree.Datums) error { var err error - job, err = phs.ExtendedEvalContext().QueueJob( + job, err = execCtx.ExtendedEvalContext().QueueJob( jobs.Record{ Description: "RESTORE", Details: jobspb.RestoreDetails{}, diff --git a/pkg/jobs/registry.go b/pkg/jobs/registry.go index 1f3843979a9b..bcc92afb3076 100644 --- a/pkg/jobs/registry.go +++ b/pkg/jobs/registry.go @@ -108,7 +108,7 @@ type Registry struct { clock *hlc.Clock nodeID *base.SQLIDContainer settings *cluster.Settings - planFn planHookMaker + execCtx jobExecCtxMaker metrics Metrics // adoptionChan is used to nudge the registry to resume claimed jobs and @@ -118,7 +118,7 @@ type Registry struct { // sessionBoundInternalExecutorFactory provides a way for jobs to create // internal executors. This is rarely needed, and usually job resumers should - // use the internal executor from the PlanHookState. The intended user of this + // use the internal executor from the JobExecCtx. The intended user of this // interface is the schema change job resumer, which needs to set the // tableCollectionModifier on the internal executor to different values in // multiple concurrent queries. This situation is an exception to the internal @@ -167,26 +167,26 @@ type Registry struct { TestingResumerCreationKnobs map[jobspb.Type]func(Resumer) Resumer } -// planHookMaker is a wrapper around sql.NewInternalPlanner. It returns an +// jobExecCtxMaker is a wrapper around sql.NewInternalPlanner. It returns an // *sql.planner as an interface{} due to package dependency cycles. It should // be cast to that type in the sql package when it is used. Returns a cleanup // function that must be called once the caller is done with the planner. // // TODO(mjibson): Can we do something to avoid passing an interface{} here // that must be type casted in a Resumer? It cannot be done here because -// PlanHookState lives in the sql package, which would create a dependency -// cycle if listed here. Furthermore, moving PlanHookState into a common +// JobExecContext lives in the sql package, which would create a dependency +// cycle if listed here. Furthermore, moving JobExecContext into a common // subpackage like sqlbase is difficult because of the amount of sql-only -// stuff that PlanHookState exports. One other choice is to merge this package +// stuff that JobExecContext exports. One other choice is to merge this package // back into the sql package. There's maybe a better way that I'm unaware of. -type planHookMaker func(opName string, user security.SQLUsername) (interface{}, func()) +type jobExecCtxMaker func(opName string, user security.SQLUsername) (interface{}, func()) // PreventAdoptionFile is the name of the file which, if present in the first // on-disk store, will prevent the adoption of background jobs by that node. const PreventAdoptionFile = "DISABLE_STARTING_BACKGROUND_JOBS" // MakeRegistry creates a new Registry. planFn is a wrapper around -// sql.newInternalPlanner. It returns a sql.PlanHookState, but must be +// sql.newInternalPlanner. It returns a sql.JobExecCtx, but must be // coerced into that in the Resumer functions. func MakeRegistry( ac log.AmbientContext, @@ -199,7 +199,7 @@ func MakeRegistry( sqlInstance sqlliveness.Instance, settings *cluster.Settings, histogramWindowInterval time.Duration, - planFn planHookMaker, + execCtxFn jobExecCtxMaker, preventAdoptionFile string, ) *Registry { r := &Registry{ @@ -212,7 +212,7 @@ func MakeRegistry( nodeID: nodeID, sqlInstance: sqlInstance, settings: settings, - planFn: planFn, + execCtx: execCtxFn, preventAdoptionFile: preventAdoptionFile, adoptionCh: make(chan adoptionNotice), } @@ -984,9 +984,9 @@ func (r *Registry) Unpause(ctx context.Context, txn *kv.Txn, id int64) error { // type Resumer interface { // Resume is called when a job is started or resumed. Sending results on the - // chan will return them to a user, if a user's session is connected. phs - // is a sql.PlanHookState. - Resume(ctx context.Context, phs interface{}, resultsCh chan<- tree.Datums) error + // chan will return them to a user, if a user's session is connected. execCtx + // is a sql.JobExecCtx. + Resume(ctx context.Context, execCtx interface{}, resultsCh chan<- tree.Datums) error // OnFailOrCancel is called when a job fails or is cancel-requested. // @@ -994,7 +994,7 @@ type Resumer interface { // which is not guaranteed to run on the node where the job is running. So it // cannot assume that any other methods have been called on this Resumer // object. - OnFailOrCancel(ctx context.Context, phs interface{}) error + OnFailOrCancel(ctx context.Context, execCtx interface{}) error } // PauseRequester is an extension of Resumer which allows job implementers to inject @@ -1003,9 +1003,9 @@ type PauseRequester interface { Resumer // OnPauseRequest is called in the transaction that moves a job to PauseRequested. - // If an error is returned, the pause request will fail. phs is a - // sql.PlanHookState. - OnPauseRequest(ctx context.Context, phs interface{}, txn *kv.Txn, details *jobspb.Progress) error + // If an error is returned, the pause request will fail. execCtx is a + // sql.JobExecCtx. + OnPauseRequest(ctx context.Context, execCtx interface{}, txn *kv.Txn, details *jobspb.Progress) error } // Constructor creates a resumable job of a certain type. The Resumer is @@ -1059,7 +1059,7 @@ func (r retryJobError) Error() string { // the job was not completed with success. status is the current job status. func (r *Registry) stepThroughStateMachine( ctx context.Context, - phs interface{}, + execCtx interface{}, resumer Resumer, resultsCh chan<- tree.Datums, job *Job, @@ -1086,11 +1086,11 @@ func (r *Registry) stepThroughStateMachine( func() { jm.CurrentlyRunning.Inc(1) defer jm.CurrentlyRunning.Dec(1) - err = resumer.Resume(resumeCtx, phs, resultsCh) + err = resumer.Resume(resumeCtx, execCtx, resultsCh) }() if err == nil { jm.ResumeCompleted.Inc(1) - return r.stepThroughStateMachine(ctx, phs, resumer, resultsCh, job, StatusSucceeded, nil) + return r.stepThroughStateMachine(ctx, execCtx, resumer, resultsCh, job, StatusSucceeded, nil) } if resumeCtx.Err() != nil { // The context was canceled. Tell the user, but don't attempt to @@ -1116,7 +1116,7 @@ func (r *Registry) stepThroughStateMachine( } return sErr } - return r.stepThroughStateMachine(ctx, phs, resumer, resultsCh, job, StatusReverting, err) + return r.stepThroughStateMachine(ctx, execCtx, resumer, resultsCh, job, StatusReverting, err) case StatusPauseRequested: return errors.Errorf("job %s", status) case StatusCancelRequested: @@ -1142,7 +1142,7 @@ func (r *Registry) stepThroughStateMachine( // TODO(spaskob): this is silly, we should remove the OnSuccess hooks and // execute them in resume so that the client can handle these errors // better. - return r.stepThroughStateMachine(ctx, phs, resumer, resultsCh, job, StatusReverting, errors.Wrapf(err, "could not mark job %d as succeeded", *job.ID())) + return r.stepThroughStateMachine(ctx, execCtx, resumer, resultsCh, job, StatusReverting, errors.Wrapf(err, "could not mark job %d as succeeded", *job.ID())) } return nil case StatusReverting: @@ -1156,7 +1156,7 @@ func (r *Registry) stepThroughStateMachine( func() { jm.CurrentlyRunning.Inc(1) defer jm.CurrentlyRunning.Dec(1) - err = resumer.OnFailOrCancel(onFailOrCancelCtx, phs) + err = resumer.OnFailOrCancel(onFailOrCancelCtx, execCtx) }() if successOnFailOrCancel := err == nil; successOnFailOrCancel { jm.FailOrCancelCompleted.Inc(1) @@ -1166,7 +1166,7 @@ func (r *Registry) stepThroughStateMachine( if HasErrJobCanceled(jobErr) { nextStatus = StatusCanceled } - return r.stepThroughStateMachine(ctx, phs, resumer, resultsCh, job, nextStatus, jobErr) + return r.stepThroughStateMachine(ctx, execCtx, resumer, resultsCh, job, nextStatus, jobErr) } if onFailOrCancelCtx.Err() != nil { jm.FailOrCancelRetryError.Inc(1) @@ -1186,7 +1186,7 @@ func (r *Registry) stepThroughStateMachine( } return sErr } - return r.stepThroughStateMachine(ctx, phs, resumer, resultsCh, job, StatusFailed, + return r.stepThroughStateMachine(ctx, execCtx, resumer, resultsCh, job, StatusFailed, errors.Wrapf(err, "job %d: cannot be reverted, manual cleanup may be required", *job.ID())) case StatusFailed: if jobErr == nil { diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index 6a9d39378cab..fcf3413e2703 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -248,7 +248,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*sqlServer, error) { func(opName string, user security.SQLUsername) (interface{}, func()) { // This is a hack to get around a Go package dependency cycle. See comment // in sql/jobs/registry.go on planHookMaker. - return sql.NewInternalPlanner(opName, nil, user, &sql.MemoryMetrics{}, execCfg) + return sql.MakeJobExecContext(opName, user, &sql.MemoryMetrics{}, execCfg) }, cfg.jobAdoptionStopFile, ) diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel index 47ebc0fd206b..c5f288235f50 100644 --- a/pkg/sql/BUILD.bazel +++ b/pkg/sql/BUILD.bazel @@ -106,6 +106,7 @@ go_library( "inverted_filter.go", "inverted_join.go", "inverted_span_encoding.go", + "job_exec_context.go", "join.go", "join_predicate.go", "limit.go", diff --git a/pkg/sql/create_stats.go b/pkg/sql/create_stats.go index eba98263e9bd..18707b6acd08 100644 --- a/pkg/sql/create_stats.go +++ b/pkg/sql/create_stats.go @@ -479,9 +479,9 @@ var _ jobs.Resumer = &createStatsResumer{} // Resume is part of the jobs.Resumer interface. func (r *createStatsResumer) Resume( - ctx context.Context, phs interface{}, resultsCh chan<- tree.Datums, + ctx context.Context, execCtx interface{}, resultsCh chan<- tree.Datums, ) error { - p := phs.(*planner) + p := execCtx.(JobExecContext) details := r.job.Details().(jobspb.CreateStatsDetails) if details.Name == stats.AutoStatsName { // We want to make sure there is only one automatic CREATE STATISTICS job @@ -509,12 +509,12 @@ func (r *createStatsResumer) Resume( evalCtx.Txn = txn if details.AsOf != nil { - p.semaCtx.AsOfTimestamp = details.AsOf - p.extendedEvalCtx.SetTxnTimestamp(details.AsOf.GoTime()) + p.SemaCtx().AsOfTimestamp = details.AsOf + p.ExtendedEvalContext().SetTxnTimestamp(details.AsOf.GoTime()) txn.SetFixedTimestamp(ctx, *details.AsOf) } - planCtx := dsp.NewPlanningCtx(ctx, evalCtx, p, txn, true /* distribute */) + planCtx := dsp.NewPlanningCtx(ctx, evalCtx, nil /* planner */, txn, true /* distribute */) if err := dsp.planAndRunCreateStats( ctx, evalCtx, planCtx, txn, r.job, NewRowResultWriter(rows), ); err != nil { @@ -582,7 +582,7 @@ func (r *createStatsResumer) Resume( // pending, running, or paused status that started earlier than this one. If // there are, checkRunningJobs returns an error. If job is nil, checkRunningJobs // just checks if there are any pending, running, or paused CreateStats jobs. -func checkRunningJobs(ctx context.Context, job *jobs.Job, p *planner) error { +func checkRunningJobs(ctx context.Context, job *jobs.Job, p JobExecContext) error { var jobID int64 if job != nil { jobID = *job.ID() diff --git a/pkg/sql/distsql_plan_csv.go b/pkg/sql/distsql_plan_csv.go index ed2a7e43a92d..cee4526cb037 100644 --- a/pkg/sql/distsql_plan_csv.go +++ b/pkg/sql/distsql_plan_csv.go @@ -183,7 +183,7 @@ func presplitTableBoundaries( // returned. func DistIngest( ctx context.Context, - phs PlanHookState, + execCtx JobExecContext, job *jobs.Job, tables map[string]*execinfrapb.ReadImportDataSpec_ImportTable, from []string, @@ -193,15 +193,15 @@ func DistIngest( ) (roachpb.BulkOpSummary, error) { ctx = logtags.AddTag(ctx, "import-distsql-ingest", nil) - dsp := phs.DistSQLPlanner() - evalCtx := phs.ExtendedEvalContext() + dsp := execCtx.DistSQLPlanner() + evalCtx := execCtx.ExtendedEvalContext() - planCtx, nodes, err := dsp.SetupAllNodesPlanning(ctx, evalCtx, phs.ExecCfg()) + planCtx, nodes, err := dsp.SetupAllNodesPlanning(ctx, evalCtx, execCtx.ExecCfg()) if err != nil { return roachpb.BulkOpSummary{}, err } - inputSpecs := makeImportReaderSpecs(job, tables, from, format, nodes, walltime, phs.User()) + inputSpecs := makeImportReaderSpecs(job, tables, from, format, nodes, walltime, execCtx.User()) gatewayNodeID, err := evalCtx.ExecCfg.NodeID.OptionalNodeIDErr(47970) if err != nil { @@ -285,7 +285,7 @@ func DistIngest( return nil }) - if err := presplitTableBoundaries(ctx, phs.ExecCfg(), tables); err != nil { + if err := presplitTableBoundaries(ctx, execCtx.ExecCfg(), tables); err != nil { return roachpb.BulkOpSummary{}, err } diff --git a/pkg/sql/distsql_plan_stats.go b/pkg/sql/distsql_plan_stats.go index 870da8e9b7d6..33eb02f9ebcf 100644 --- a/pkg/sql/distsql_plan_stats.go +++ b/pkg/sql/distsql_plan_stats.go @@ -80,7 +80,7 @@ func (dsp *DistSQLPlanner) createStatsPlan( if err != nil { return nil, err } - sb := span.MakeBuilder(planCtx.planner.ExecCfg().Codec, desc, scan.index) + sb := span.MakeBuilder(planCtx.ExtendedEvalCtx.Codec, desc, scan.index) scan.spans, err = sb.UnconstrainedSpans() if err != nil { return nil, err @@ -183,7 +183,7 @@ func (dsp *DistSQLPlanner) createStatsPlan( ) // Estimate the expected number of rows based on existing stats in the cache. - tableStats, err := planCtx.planner.execCfg.TableStatsCache.GetTableStats(planCtx.ctx, desc.ID) + tableStats, err := planCtx.ExtendedEvalCtx.ExecCfg.TableStatsCache.GetTableStats(planCtx.ctx, desc.ID) if err != nil { return nil, err } diff --git a/pkg/sql/gcjob/gc_job.go b/pkg/sql/gcjob/gc_job.go index 912d4208c199..1b2058092163 100644 --- a/pkg/sql/gcjob/gc_job.go +++ b/pkg/sql/gcjob/gc_job.go @@ -75,9 +75,9 @@ func performGC( // Resume is part of the jobs.Resumer interface. func (r schemaChangeGCResumer) Resume( - ctx context.Context, phs interface{}, _ chan<- tree.Datums, + ctx context.Context, execCtx interface{}, _ chan<- tree.Datums, ) error { - p := phs.(sql.PlanHookState) + p := execCtx.(sql.JobExecContext) // TODO(pbardea): Wait for no versions. execCfg := p.ExecCfg() if fn := execCfg.GCJobTestingKnobs.RunBeforeResume; fn != nil { diff --git a/pkg/sql/job_exec_context.go b/pkg/sql/job_exec_context.go new file mode 100644 index 000000000000..c9781429a6cc --- /dev/null +++ b/pkg/sql/job_exec_context.go @@ -0,0 +1,65 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package sql + +import ( + "github.com/cockroachdb/cockroach/pkg/security" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/lease" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" +) + +// plannerJobExecContext is a wrapper to implement JobExecContext with a planner +// without allowing casting directly to a planner. Eventually it would be nice +// if we could implement the API entirely without a planner however the only +// implementation of extendedEvalContext is very tied to a planner. +type plannerJobExecContext struct { + p *planner +} + +// MakeJobExecContext makes a JobExecContext. +func MakeJobExecContext( + opName string, user security.SQLUsername, memMetrics *MemoryMetrics, execCfg *ExecutorConfig, +) (JobExecContext, func()) { + p, close := newInternalPlanner(opName, nil /*txn*/, user, memMetrics, execCfg) + return &plannerJobExecContext{p: p}, close +} + +func (e *plannerJobExecContext) SemaCtx() *tree.SemaContext { return e.p.SemaCtx() } +func (e *plannerJobExecContext) ExtendedEvalContext() *extendedEvalContext { + return e.p.ExtendedEvalContext() +} +func (e *plannerJobExecContext) SessionData() *sessiondata.SessionData { + return e.p.SessionData() +} +func (e *plannerJobExecContext) ExecCfg() *ExecutorConfig { return e.p.ExecCfg() } +func (e *plannerJobExecContext) DistSQLPlanner() *DistSQLPlanner { return e.p.DistSQLPlanner() } +func (e *plannerJobExecContext) LeaseMgr() *lease.Manager { return e.p.LeaseMgr() } +func (e *plannerJobExecContext) User() security.SQLUsername { return e.p.User() } + +// JobExecContext provides the execution environment for a job. It is what is +// passed to the Resume/OnFailOrCancel/OnPauseRequested methods of a jobs's +// Resumer to give that resumer access to things like ExecutorCfg, LeaseMgr, +// etc -- the kinds of things that would usually be on planner or similar during +// a non-job SQL statement's execution. Unlike a planner however, or planner-ish +// interfaces like PlanHookState, JobExecContext does not include a txn or the +// methods that defined in terms of "the" txn, such as privilege/name accessors. +// (though note that ExtendedEvalContext may transitively include methods that +// close over/expect a txn so use it with caution). +type JobExecContext interface { + SemaCtx() *tree.SemaContext + ExtendedEvalContext() *extendedEvalContext + SessionData() *sessiondata.SessionData + ExecCfg() *ExecutorConfig + DistSQLPlanner() *DistSQLPlanner + LeaseMgr() *lease.Manager + User() security.SQLUsername +} diff --git a/pkg/sql/schema_changer.go b/pkg/sql/schema_changer.go index c5f93b26bfb1..8c10af77436d 100644 --- a/pkg/sql/schema_changer.go +++ b/pkg/sql/schema_changer.go @@ -2049,9 +2049,9 @@ type schemaChangeResumer struct { } func (r schemaChangeResumer) Resume( - ctx context.Context, phs interface{}, resultsCh chan<- tree.Datums, + ctx context.Context, execCtx interface{}, resultsCh chan<- tree.Datums, ) error { - p := phs.(PlanHookState) + p := execCtx.(JobExecContext) details := r.job.Details().(jobspb.SchemaChangeDetails) if p.ExecCfg().SchemaChangerTestingKnobs.SchemaChangeJobNoOp != nil && p.ExecCfg().SchemaChangerTestingKnobs.SchemaChangeJobNoOp() { @@ -2221,8 +2221,8 @@ func (r schemaChangeResumer) Resume( } // OnFailOrCancel is part of the jobs.Resumer interface. -func (r schemaChangeResumer) OnFailOrCancel(ctx context.Context, phs interface{}) error { - p := phs.(PlanHookState) +func (r schemaChangeResumer) OnFailOrCancel(ctx context.Context, execCtx interface{}) error { + p := execCtx.(JobExecContext) details := r.job.Details().(jobspb.SchemaChangeDetails) if details.DroppedDatabaseID != descpb.InvalidID { diff --git a/pkg/sql/type_change.go b/pkg/sql/type_change.go index 8c6a66ec1047..d1996f1dc336 100644 --- a/pkg/sql/type_change.go +++ b/pkg/sql/type_change.go @@ -265,27 +265,27 @@ type typeChangeResumer struct { // Resume implements the jobs.Resumer interface. func (t *typeChangeResumer) Resume( - ctx context.Context, phs interface{}, _ chan<- tree.Datums, + ctx context.Context, execCtx interface{}, _ chan<- tree.Datums, ) error { - p := phs.(*planner) - if p.execCfg.TypeSchemaChangerTestingKnobs.TypeSchemaChangeJobNoOp != nil { - if p.execCfg.TypeSchemaChangerTestingKnobs.TypeSchemaChangeJobNoOp() { + p := execCtx.(JobExecContext) + if p.ExecCfg().TypeSchemaChangerTestingKnobs.TypeSchemaChangeJobNoOp != nil { + if p.ExecCfg().TypeSchemaChangerTestingKnobs.TypeSchemaChangeJobNoOp() { return nil } } tc := &typeSchemaChanger{ typeID: t.job.Details().(jobspb.TypeSchemaChangeDetails).TypeID, - execCfg: p.execCfg, + execCfg: p.ExecCfg(), } return tc.execWithRetry(ctx) } // OnFailOrCancel implements the jobs.Resumer interface. -func (t *typeChangeResumer) OnFailOrCancel(ctx context.Context, phs interface{}) error { +func (t *typeChangeResumer) OnFailOrCancel(ctx context.Context, execCtx interface{}) error { // If the job failed, just try again to clean up any draining names. tc := &typeSchemaChanger{ typeID: t.job.Details().(jobspb.TypeSchemaChangeDetails).TypeID, - execCfg: phs.(*planner).ExecCfg(), + execCfg: execCtx.(JobExecContext).ExecCfg(), } return drainNamesForDescriptor(