Skip to content

Commit

Permalink
sql,*ccl: remove InternalExecutor from ExecCfg
Browse files Browse the repository at this point in the history
In a bid to make the InternalExecutor more tightly bound
with certain conn executor state such as transaction, session
data, extraTxnState (including descs collection), this patch
removes the `InternalExecutor` field from the executor config.
In this way consumers of the IE cannot just run queries off of
this free floating instance of the IE, but instead have to
initialize a new IE in the context of the surrounding txn.

The `InternalExecFactory` currently takes a closure that allows
for injecting necessary state into a newly created IE, but one
could imagine a future where we use typed parameters to force
all users to bind the IE with the surrounding state it is being
used in.

Release note: None
  • Loading branch information
adityamaru committed Nov 28, 2021
1 parent 8137ebb commit 323561b
Show file tree
Hide file tree
Showing 79 changed files with 383 additions and 210 deletions.
6 changes: 4 additions & 2 deletions pkg/ccl/backupccl/backup_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/sql/stats"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -686,7 +687,8 @@ func (b *backupResumer) maybeNotifyScheduledJobCompletion(
err := exec.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
// We cannot rely on b.job containing created_by_id because on job
// resumption the registry does not populate the resumer's CreatedByInfo.
datums, err := exec.InternalExecutor.QueryRowEx(
ie := exec.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {})
datums, err := ie.QueryRowEx(
ctx,
"lookup-schedule-info",
txn,
Expand All @@ -706,7 +708,7 @@ func (b *backupResumer) maybeNotifyScheduledJobCompletion(

scheduleID := int64(tree.MustBeDInt(datums[0]))
if err := jobs.NotifyJobTermination(
ctx, env, b.job.ID(), jobStatus, b.job.Details(), scheduleID, exec.InternalExecutor, txn); err != nil {
ctx, env, b.job.ID(), jobStatus, b.job.Details(), scheduleID, ie, txn); err != nil {
return errors.Wrapf(err,
"failed to notify schedule %d of completion of job %d", scheduleID, b.job.ID())
}
Expand Down
13 changes: 8 additions & 5 deletions pkg/ccl/backupccl/backup_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/privilege"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/interval"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -820,13 +821,14 @@ func backupPlanHook(

var spans []roachpb.Span
var tenants []descpb.TenantInfoWithUsage
ie := p.ExecCfg().InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {})
if backupStmt.Targets != nil && backupStmt.Targets.Tenant != (roachpb.TenantID{}) {
if !p.ExecCfg().Codec.ForSystemTenant() {
return pgerror.Newf(pgcode.InsufficientPrivilege, "only the system tenant can backup other tenants")
}

tenantInfo, err := retrieveSingleTenantMetadata(
ctx, p.ExecCfg().InternalExecutor, p.ExtendedEvalContext().Txn, backupStmt.Targets.Tenant,
ctx, ie, p.ExtendedEvalContext().Txn, backupStmt.Targets.Tenant,
)
if err != nil {
return err
Expand All @@ -842,7 +844,7 @@ func backupPlanHook(
if p.ExecCfg().Codec.ForSystemTenant() && backupStmt.Coverage() == tree.AllDescriptors {
// Include all tenants.
tenants, err = retrieveAllTenantsMetadata(
ctx, p.ExecCfg().InternalExecutor, p.ExtendedEvalContext().Txn,
ctx, ie, p.ExtendedEvalContext().Txn,
)
if err != nil {
return err
Expand Down Expand Up @@ -1185,7 +1187,7 @@ func getScheduledBackupExecutionArgsFromSchedule(
ctx context.Context,
env scheduledjobs.JobSchedulerEnv,
txn *kv.Txn,
ie *sql.InternalExecutor,
ie sqlutil.InternalExecutor,
scheduleID int64,
) (*jobs.ScheduledJob, *ScheduledBackupExecutionArgs, error) {
// Load the schedule that has spawned this job.
Expand Down Expand Up @@ -1224,8 +1226,9 @@ func planSchedulePTSChaining(
return nil
}

ie := p.ExecCfg().InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {})
_, args, err := getScheduledBackupExecutionArgsFromSchedule(ctx, env,
p.ExtendedEvalContext().Txn, p.ExecCfg().InternalExecutor, backupStmt.CreatedByInfo.ID)
p.ExtendedEvalContext().Txn, ie.(*sql.InternalExecutor), backupStmt.CreatedByInfo.ID)
if err != nil {
return err
}
Expand All @@ -1244,7 +1247,7 @@ func planSchedulePTSChaining(
}

_, incArgs, err := getScheduledBackupExecutionArgsFromSchedule(ctx, env,
p.ExtendedEvalContext().Txn, p.ExecCfg().InternalExecutor, args.DependentScheduleID)
p.ExtendedEvalContext().Txn, ie.(*sql.InternalExecutor), args.DependentScheduleID)
if err != nil {
// If we are unable to resolve the dependent incremental schedule (it
// could have been dropped) we do not need to perform any chaining.
Expand Down
6 changes: 3 additions & 3 deletions pkg/ccl/backupccl/backup_planning_tenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ import (

"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/errors"
)
Expand Down Expand Up @@ -76,7 +76,7 @@ func tenantMetadataFromRow(row tree.Datums) (descpb.TenantInfoWithUsage, error)
}

func retrieveSingleTenantMetadata(
ctx context.Context, ie *sql.InternalExecutor, txn *kv.Txn, tenantID roachpb.TenantID,
ctx context.Context, ie sqlutil.InternalExecutor, txn *kv.Txn, tenantID roachpb.TenantID,
) (descpb.TenantInfoWithUsage, error) {
row, err := ie.QueryRow(
ctx, "backup-lookup-tenant", txn,
Expand All @@ -96,7 +96,7 @@ func retrieveSingleTenantMetadata(
}

func retrieveAllTenantsMetadata(
ctx context.Context, ie *sql.InternalExecutor, txn *kv.Txn,
ctx context.Context, ie sqlutil.InternalExecutor, txn *kv.Txn,
) ([]descpb.TenantInfoWithUsage, error) {
rows, err := ie.QueryBuffered(
ctx, "backup-lookup-tenants", txn,
Expand Down
9 changes: 5 additions & 4 deletions pkg/ccl/backupccl/create_scheduled_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/protoreflect"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/errors"
"github.com/gogo/protobuf/jsonpb"
Expand Down Expand Up @@ -430,7 +431,7 @@ func doCreateBackupSchedules(
return err
}

ex := p.ExecCfg().InternalExecutor
ex := p.ExecCfg().InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {})

unpauseOnSuccessID := jobs.InvalidScheduleID

Expand Down Expand Up @@ -509,7 +510,7 @@ func doCreateBackupSchedules(

func setDependentSchedule(
ctx context.Context,
ex *sql.InternalExecutor,
ex sqlutil.InternalExecutor,
scheduleExecutionArgs *ScheduledBackupExecutionArgs,
schedule *jobs.ScheduledJob,
dependentID int64,
Expand Down Expand Up @@ -659,8 +660,8 @@ func emitSchedule(
func checkScheduleAlreadyExists(
ctx context.Context, p sql.PlanHookState, scheduleLabel string,
) (bool, error) {

row, err := p.ExecCfg().InternalExecutor.QueryRowEx(ctx, "check-sched",
ie := p.ExecCfg().InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {})
row, err := ie.QueryRowEx(ctx, "check-sched",
p.ExtendedEvalContext().Txn, sessiondata.InternalExecutorOverride{User: security.RootUserName()},
fmt.Sprintf("SELECT count(schedule_name) FROM %s WHERE schedule_name = '%s'",
scheduledjobs.ProdJobSchedulerEnv.ScheduledJobsTableName(), scheduleLabel))
Expand Down
7 changes: 4 additions & 3 deletions pkg/ccl/backupccl/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -1897,7 +1897,8 @@ func insertStats(
}

err := execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
if err := stats.InsertNewStats(ctx, execCfg.InternalExecutor, txn, latestStats); err != nil {
if err := stats.InsertNewStats(ctx, execCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {}),
txn, latestStats); err != nil {
return errors.Wrapf(err, "inserting stats from backup")
}
details.StatsInserted = true
Expand Down Expand Up @@ -2121,7 +2122,7 @@ func (r *restoreResumer) OnFailOrCancel(ctx context.Context, execCtx interface{}
if details.DescriptorCoverage == tree.AllDescriptors {
// We've dropped defaultdb and postgres in the planning phase, we must
// recreate them now if the full cluster restore failed.
ie := p.ExecCfg().InternalExecutor
ie := p.ExecCfg().InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {})
_, err := ie.Exec(ctx, "recreate-defaultdb", txn, "CREATE DATABASE IF NOT EXISTS defaultdb")
if err != nil {
return err
Expand Down Expand Up @@ -2667,7 +2668,7 @@ func (r *restoreResumer) restoreSystemTables(
}

func (r *restoreResumer) cleanupTempSystemTables(ctx context.Context, txn *kv.Txn) error {
executor := r.execCfg.InternalExecutor
executor := r.execCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {})
// Check if the temp system database has already been dropped. This can happen
// if the restore job fails after the system database has cleaned up.
checkIfDatabaseExists := "SELECT database_name FROM [SHOW DATABASES] WHERE database_name=$1"
Expand Down
6 changes: 4 additions & 2 deletions pkg/ccl/backupccl/restore_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/roleoption"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlerrors"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -891,7 +892,7 @@ func allocateDescriptorRewrites(
// restore.
func dropDefaultUserDBs(ctx context.Context, execCfg *sql.ExecutorConfig) error {
return sql.DescsTxn(ctx, execCfg, func(ctx context.Context, txn *kv.Txn, col *descs.Collection) error {
ie := execCfg.InternalExecutor
ie := execCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {})
_, err := ie.Exec(ctx, "drop-defaultdb", nil, "DROP DATABASE IF EXISTS defaultdb")
if err != nil {
return err
Expand Down Expand Up @@ -1894,8 +1895,9 @@ func doRestorePlan(
if !p.ExecCfg().Codec.ForSystemTenant() {
return pgerror.Newf(pgcode.InsufficientPrivilege, "only the system tenant can restore other tenants")
}
ie := p.ExecCfg().InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {})
for _, i := range tenants {
res, err := p.ExecCfg().InternalExecutor.QueryRow(
res, err := ie.QueryRow(
ctx, "restore-lookup-tenant", p.ExtendedEvalContext().Txn,
`SELECT active FROM system.tenants WHERE id = $1`, i.ID,
)
Expand Down
12 changes: 7 additions & 5 deletions pkg/ccl/backupccl/schedule_pts_chaining.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
Expand Down Expand Up @@ -56,9 +57,10 @@ func maybeUpdateSchedulePTSRecord(
}

return exec.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
ie := exec.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {})
// We cannot rely on b.job containing created_by_id because on job
// resumption the registry does not populate the resumers' CreatedByInfo.
datums, err := exec.InternalExecutor.QueryRowEx(
datums, err := ie.QueryRowEx(
ctx,
"lookup-schedule-info",
txn,
Expand All @@ -77,8 +79,7 @@ func maybeUpdateSchedulePTSRecord(
}

scheduleID := int64(tree.MustBeDInt(datums[0]))
_, args, err := getScheduledBackupExecutionArgsFromSchedule(ctx, env, txn,
exec.InternalExecutor, scheduleID)
_, args, err := getScheduledBackupExecutionArgsFromSchedule(ctx, env, txn, ie, scheduleID)
if err != nil {
return errors.Wrap(err, "load scheduled job")
}
Expand Down Expand Up @@ -132,10 +133,11 @@ func manageFullBackupPTSChaining(
exec *sql.ExecutorConfig,
args *ScheduledBackupExecutionArgs,
) error {
ie := exec.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {})
// Let's resolve the dependent incremental schedule as the first step. If the
// schedule has been dropped then we can avoid doing unnecessary work.
incSj, incArgs, err := getScheduledBackupExecutionArgsFromSchedule(ctx, env, txn,
exec.InternalExecutor, args.DependentScheduleID)
ie, args.DependentScheduleID)
if err != nil {
if jobs.HasScheduledJobNotFoundError(err) {
log.Warningf(ctx, "could not find dependent schedule with id %d",
Expand Down Expand Up @@ -180,7 +182,7 @@ func manageFullBackupPTSChaining(
return err
}
incSj.SetExecutionDetails(incSj.ExecutorType(), jobspb.ExecutionArguments{Args: any})
return incSj.Update(ctx, exec.InternalExecutor, txn)
return incSj.Update(ctx, ie, txn)
}

// manageIncrementalBackupPTSChaining is invoked on successful completion of an
Expand Down
9 changes: 5 additions & 4 deletions pkg/ccl/backupccl/system_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
)
Expand Down Expand Up @@ -89,7 +90,7 @@ func defaultSystemTableRestoreFunc(
txn *kv.Txn,
systemTableName, tempTableName string,
) error {
executor := execCfg.InternalExecutor
executor := execCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {})

deleteQuery := fmt.Sprintf("DELETE FROM system.%s WHERE true", systemTableName)
opName := systemTableName + "-data-deletion"
Expand Down Expand Up @@ -117,7 +118,7 @@ func defaultSystemTableRestoreFunc(
func jobsMigrationFunc(
ctx context.Context, execCfg *sql.ExecutorConfig, txn *kv.Txn, tempTableName string,
) (err error) {
executor := execCfg.InternalExecutor
executor := execCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {})

const statesToRevert = `('` + string(jobs.StatusRunning) + `', ` +
`'` + string(jobs.StatusPauseRequested) + `', ` +
Expand Down Expand Up @@ -191,7 +192,7 @@ func jobsRestoreFunc(
txn *kv.Txn,
systemTableName, tempTableName string,
) error {
executor := execCfg.InternalExecutor
executor := execCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {})

// When restoring jobs, don't clear the existing table.

Expand All @@ -212,7 +213,7 @@ func settingsRestoreFunc(
txn *kv.Txn,
systemTableName, tempTableName string,
) error {
executor := execCfg.InternalExecutor
executor := execCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {})

deleteQuery := fmt.Sprintf("DELETE FROM system.%s WHERE name <> 'version'", systemTableName)
opName := systemTableName + "-data-deletion"
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/importccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ go_library(
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
"//pkg/sql/sqltelemetry",
"//pkg/sql/sqlutil",
"//pkg/sql/stats",
"//pkg/sql/types",
"//pkg/util",
Expand Down
11 changes: 7 additions & 4 deletions pkg/ccl/importccl/import_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/opt/memo"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/sql/stats"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -165,8 +166,8 @@ func (r *importResumer) Resume(ctx context.Context, execCtx interface{}) error {
// is in keeping with the semantics we use when creating a schema during
// sql execution. Namely, queue job in the txn which creates the schema
// desc and run once the txn has committed.
if err := p.ExecCfg().JobRegistry.Run(ctx, p.ExecCfg().InternalExecutor,
schemaMetadata.queuedSchemaJobs); err != nil {
ie := p.ExecCfg().InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {})
if err := p.ExecCfg().JobRegistry.Run(ctx, ie, schemaMetadata.queuedSchemaJobs); err != nil {
return err
}

Expand Down Expand Up @@ -954,6 +955,7 @@ func (r *importResumer) publishTables(
func (r *importResumer) writeStubStatisticsForImportedTables(
ctx context.Context, execCfg *sql.ExecutorConfig, res roachpb.BulkOpSummary,
) {
ie := execCfg.InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {})
details := r.job.Details().(jobspb.ImportDetails)
for _, tbl := range details.Tables {
if tbl.IsNew {
Expand All @@ -977,7 +979,7 @@ func (r *importResumer) writeStubStatisticsForImportedTables(
statistic.AvgSize = avgRowSize
}
// TODO(michae2): parallelize insertion of statistics.
err = stats.InsertNewStats(ctx, execCfg.InternalExecutor, nil /* txn */, statistics)
err = stats.InsertNewStats(ctx, ie, nil /* txn */, statistics)
}
if err != nil {
// Failure to create statistics should not fail the entire import.
Expand Down Expand Up @@ -1317,7 +1319,8 @@ func (r *importResumer) OnFailOrCancel(ctx context.Context, execCtx interface{})
// This would be a job to drop all the schemas, and a job to update the parent
// database descriptor.
if len(jobsToRunAfterTxnCommit) != 0 {
if err := p.ExecCfg().JobRegistry.Run(ctx, p.ExecCfg().InternalExecutor,
ie := p.ExecCfg().InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {})
if err := p.ExecCfg().JobRegistry.Run(ctx, ie,
jobsToRunAfterTxnCommit); err != nil {
return errors.Wrap(err, "failed to run jobs that drop the imported schemas")
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/migration/migrationjob/migration_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (r resumer) Resume(ctx context.Context, execCtxI interface{}) error {
execCtx := execCtxI.(sql.JobExecContext)
pl := r.j.Payload()
cv := *pl.GetMigration().ClusterVersion
ie := execCtx.ExecCfg().InternalExecutor
ie := execCtx.ExecCfg().InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {})

alreadyCompleted, err := CheckIfMigrationCompleted(ctx, nil /* txn */, ie, cv)
if alreadyCompleted || err != nil {
Expand All @@ -88,7 +88,7 @@ func (r resumer) Resume(ctx context.Context, execCtxI interface{}) error {
Settings: execCtx.ExecCfg().Settings,
CollectionFactory: execCtx.ExecCfg().CollectionFactory,
LeaseManager: execCtx.ExecCfg().LeaseManager,
InternalExecutor: execCtx.ExecCfg().InternalExecutor,
InternalExecutor: ie,
TestingKnobs: execCtx.ExecCfg().MigrationTestingKnobs,
}, r.j)
default:
Expand Down
Loading

0 comments on commit 323561b

Please sign in to comment.