diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go index 8cfee1c91436..d82385f540f3 100644 --- a/pkg/ccl/backupccl/restore_job.go +++ b/pkg/ccl/backupccl/restore_job.go @@ -1803,7 +1803,9 @@ func revalidateIndexes( // since our table is offline. var runner sqlutil.HistoricalInternalExecTxnRunner = func(ctx context.Context, fn sqlutil.InternalExecFn) error { return execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - ie := job.MakeSessionBoundInternalExecutor(ctx, sql.NewFakeSessionData(execCfg.SV())).(*sql.InternalExecutor) + ie := job.MakeSessionBoundInternalExecutor(ctx, func(ie sqlutil.InternalExecutor) { + ie.SetSessionData(sql.NewFakeSessionData(execCfg.SV())) + }) return fn(ctx, txn, ie) }) } diff --git a/pkg/ccl/changefeedccl/schemafeed/schema_feed.go b/pkg/ccl/changefeedccl/schemafeed/schema_feed.go index 041f94475016..e810ca38190e 100644 --- a/pkg/ccl/changefeedccl/schemafeed/schema_feed.go +++ b/pkg/ccl/changefeedccl/schemafeed/schema_feed.go @@ -83,13 +83,15 @@ func New( metrics *Metrics, ) SchemaFeed { m := &schemaFeed{ - filter: schemaChangeEventFilters[events], - db: cfg.DB, - clock: cfg.DB.Clock(), - settings: cfg.Settings, - targets: targets, - leaseMgr: cfg.LeaseManager.(*lease.Manager), - ie: cfg.SessionBoundInternalExecutorFactory(ctx, &sessiondata.SessionData{}), + filter: schemaChangeEventFilters[events], + db: cfg.DB, + clock: cfg.DB.Clock(), + settings: cfg.Settings, + targets: targets, + leaseMgr: cfg.LeaseManager.(*lease.Manager), + ie: cfg.SessionBoundInternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) { + ie.SetSessionData(&sessiondata.SessionData{}) + }), collectionFactory: cfg.CollectionFactory, metrics: metrics, } diff --git a/pkg/jobs/jobs.go b/pkg/jobs/jobs.go index dac6e12927ea..8e025a52aa22 100644 --- a/pkg/jobs/jobs.go +++ b/pkg/jobs/jobs.go @@ -723,9 +723,9 @@ func (j *Job) FractionCompleted() float32 { // sessionBoundInternalExecutorFactory for a more detailed explanation of why // this exists. func (j *Job) MakeSessionBoundInternalExecutor( - ctx context.Context, sd *sessiondata.SessionData, + ctx context.Context, initInternalExecutor func(sqlutil.InternalExecutor), ) sqlutil.InternalExecutor { - return j.registry.sessionBoundInternalExecutorFactory(ctx, sd) + return j.registry.sessionBoundInternalExecutorFactory(ctx, initInternalExecutor) } func (j *Job) runInTxn( diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index 80d489217d3f..4fc1d68e1880 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -774,7 +774,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { // SessionBoundInternalExecutorFactory. The same applies for setting a // SessionBoundInternalExecutor on the job registry. ieFactory := func( - ctx context.Context, sessionData *sessiondata.SessionData, + ctx context.Context, initInternalExecutor func(ie sqlutil.InternalExecutor), ) sqlutil.InternalExecutor { ie := sql.MakeInternalExecutor( ctx, @@ -782,7 +782,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { internalMemMetrics, cfg.Settings, ) - ie.SetSessionData(sessionData) + initInternalExecutor(&ie) return &ie } diff --git a/pkg/sql/internal.go b/pkg/sql/internal.go index f6c1a0235428..318a4a0d9cb7 100644 --- a/pkg/sql/internal.go +++ b/pkg/sql/internal.go @@ -116,8 +116,8 @@ func MakeInternalExecutor( } // SetSessionData binds the session variables that will be used by queries -// performed through this executor from now on. This creates a new session stack. -// It is recommended to use SetSessionDataStack. +// performed through this executor from now on. This creates a new session +// stack. // // SetSessionData cannot be called concurrently with query execution. func (ie *InternalExecutor) SetSessionData(sessionData *sessiondata.SessionData) { diff --git a/pkg/sql/opt_exec_factory.go b/pkg/sql/opt_exec_factory.go index 1832a11f6ca1..6de79d9d6960 100644 --- a/pkg/sql/opt_exec_factory.go +++ b/pkg/sql/opt_exec_factory.go @@ -35,6 +35,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/builtins" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/span" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/encoding" @@ -1159,8 +1160,9 @@ func (ef *execFactory) showEnv(plan string, envOpts exec.ExplainEnvData) (exec.N ie := ef.planner.extendedEvalCtx.ExecCfg.InternalExecutorFactory( ef.planner.EvalContext().Context, - ef.planner.SessionData(), - ) + func(ie sqlutil.InternalExecutor) { + ie.SetSessionData(ef.planner.SessionData()) + }) c := makeStmtEnvCollector(ef.planner.EvalContext().Context, ie.(*InternalExecutor)) // Show the version of Cockroach running. diff --git a/pkg/sql/planner.go b/pkg/sql/planner.go index b9ab21449576..93bb1b25acad 100644 --- a/pkg/sql/planner.go +++ b/pkg/sql/planner.go @@ -38,6 +38,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb" "github.com/cockroachdb/cockroach/pkg/sql/sqlstats/persistedsqlstats" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/cancelchecker" "github.com/cockroachdb/cockroach/pkg/util/envutil" @@ -825,7 +826,9 @@ func (p *planner) QueryRowEx( stmt string, qargs ...interface{}, ) (tree.Datums, error) { - ie := p.ExecCfg().InternalExecutorFactory(ctx, p.SessionData()) + ie := p.ExecCfg().InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) { + ie.SetSessionData(p.SessionData()) + }) return ie.QueryRowEx(ctx, opName, txn, override, stmt, qargs...) } @@ -843,7 +846,9 @@ func (p *planner) QueryIteratorEx( stmt string, qargs ...interface{}, ) (tree.InternalRows, error) { - ie := p.ExecCfg().InternalExecutorFactory(ctx, p.SessionData()) + ie := p.ExecCfg().InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) { + ie.SetSessionData(p.SessionData()) + }) rows, err := ie.QueryIteratorEx(ctx, opName, txn, override, stmt, qargs...) return rows.(tree.InternalRows), err } diff --git a/pkg/sql/resolve_oid.go b/pkg/sql/resolve_oid.go index 177cf5e370e4..c47b8e3ac1fc 100644 --- a/pkg/sql/resolve_oid.go +++ b/pkg/sql/resolve_oid.go @@ -29,7 +29,9 @@ import ( func (p *planner) ResolveOIDFromString( ctx context.Context, resultType *types.T, toResolve *tree.DString, ) (*tree.DOid, error) { - ie := p.ExecCfg().InternalExecutorFactory(ctx, p.SessionData()) + ie := p.ExecCfg().InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) { + ie.SetSessionData(p.SessionData()) + }) return resolveOID( ctx, p.Txn(), ie, @@ -41,7 +43,9 @@ func (p *planner) ResolveOIDFromString( func (p *planner) ResolveOIDFromOID( ctx context.Context, resultType *types.T, toResolve *tree.DOid, ) (*tree.DOid, error) { - ie := p.ExecCfg().InternalExecutorFactory(ctx, p.SessionData()) + ie := p.ExecCfg().InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) { + ie.SetSessionData(p.SessionData()) + }) return resolveOID( ctx, p.Txn(), ie, diff --git a/pkg/sql/schema_changer.go b/pkg/sql/schema_changer.go index 85ce65e56fb3..5a7c52a5cb04 100644 --- a/pkg/sql/schema_changer.go +++ b/pkg/sql/schema_changer.go @@ -128,7 +128,7 @@ func NewSchemaChangerForTesting( // Note that this doesn't end up actually being session-bound but that's // good enough for testing. ieFactory: func( - ctx context.Context, sd *sessiondata.SessionData, + ctx context.Context, initInternalExecutor func(sqlutil.InternalExecutor), ) sqlutil.InternalExecutor { return execCfg.InternalExecutor }, @@ -1949,7 +1949,9 @@ func createSchemaChangeEvalCtx( ) extendedEvalContext { sd := NewFakeSessionData(execCfg.SV()) - ie := ieFactory(ctx, sd) + ie := ieFactory(ctx, func(ex sqlutil.InternalExecutor) { + ex.SetSessionData(sd) + }) evalCtx := extendedEvalContext{ // Make a session tracing object on-the-fly. This is OK @@ -2057,8 +2059,8 @@ func (r schemaChangeResumer) Resume(ctx context.Context, execCtx interface{}) er clock: p.ExecCfg().Clock, settings: p.ExecCfg().Settings, execCfg: p.ExecCfg(), - ieFactory: func(ctx context.Context, sd *sessiondata.SessionData) sqlutil.InternalExecutor { - return r.job.MakeSessionBoundInternalExecutor(ctx, sd) + ieFactory: func(ctx context.Context, initInternalExecutor func(ie sqlutil.InternalExecutor)) sqlutil.InternalExecutor { + return r.job.MakeSessionBoundInternalExecutor(ctx, initInternalExecutor) }, metrics: p.ExecCfg().SchemaChangerMetrics, } @@ -2243,8 +2245,8 @@ func (r schemaChangeResumer) OnFailOrCancel(ctx context.Context, execCtx interfa clock: p.ExecCfg().Clock, settings: p.ExecCfg().Settings, execCfg: p.ExecCfg(), - ieFactory: func(ctx context.Context, sd *sessiondata.SessionData) sqlutil.InternalExecutor { - return r.job.MakeSessionBoundInternalExecutor(ctx, sd) + ieFactory: func(ctx context.Context, initInternalExecutor func(executor sqlutil.InternalExecutor)) sqlutil.InternalExecutor { + return r.job.MakeSessionBoundInternalExecutor(ctx, initInternalExecutor) }, } diff --git a/pkg/sql/schemachanger/scsqldeps/index_validator.go b/pkg/sql/schemachanger/scsqldeps/index_validator.go index 2a1a414c946f..155c059a7840 100644 --- a/pkg/sql/schemachanger/scsqldeps/index_validator.go +++ b/pkg/sql/schemachanger/scsqldeps/index_validator.go @@ -75,7 +75,9 @@ func (iv indexValidator) ValidateForwardIndexes( if err != nil { return err } - return fn(ctx, validationTxn, iv.ieFactory(ctx, iv.newFakeSessionData(&iv.settings.SV))) + return fn(ctx, validationTxn, iv.ieFactory(ctx, func(ie sqlutil.InternalExecutor) { + ie.SetSessionData(iv.newFakeSessionData(&iv.settings.SV)) + })) } return iv.validateForwardIndexes(ctx, tableDesc, indexes, txnRunner, withFirstMutationPublic, gatherAllInvalid, override) } @@ -95,7 +97,9 @@ func (iv indexValidator) ValidateInvertedIndexes( if err != nil { return err } - return fn(ctx, validationTxn, iv.ieFactory(ctx, iv.newFakeSessionData(&iv.settings.SV))) + return fn(ctx, validationTxn, iv.ieFactory(ctx, func(ie sqlutil.InternalExecutor) { + ie.SetSessionData(iv.newFakeSessionData(&iv.settings.SV)) + })) } return iv.validateInvertedIndexes(ctx, iv.codec, tableDesc, indexes, txnRunner, gatherAllInvalid, override) } diff --git a/pkg/sql/sqlutil/internal_executor.go b/pkg/sql/sqlutil/internal_executor.go index fedcc0d28440..9f734d47e005 100644 --- a/pkg/sql/sqlutil/internal_executor.go +++ b/pkg/sql/sqlutil/internal_executor.go @@ -158,6 +158,13 @@ type InternalExecutor interface { WithSyntheticDescriptors( descs []catalog.Descriptor, run func() error, ) error + + // SetSessionData binds the session variables that will be used by queries + // performed through this executor from now on. This creates a new session + // stack. + // + // SetSessionData cannot be called concurrently with query execution. + SetSessionData(*sessiondata.SessionData) } // InternalRows is an iterator interface that's exposed by the internal @@ -194,7 +201,7 @@ type InternalRows interface { // SessionBoundInternalExecutorFactory is a function that produces a "session // bound" internal executor. type SessionBoundInternalExecutorFactory func( - context.Context, *sessiondata.SessionData, + context.Context, func(InternalExecutor), ) InternalExecutor // InternalExecFn is the type of functions that operates using an internalExecutor. diff --git a/pkg/sql/temporary_schema.go b/pkg/sql/temporary_schema.go index 007359720d00..9975d64940b9 100644 --- a/pkg/sql/temporary_schema.go +++ b/pkg/sql/temporary_schema.go @@ -582,7 +582,9 @@ func (c *TemporaryObjectCleaner) doTemporaryObjectCleanup( } // Clean up temporary data for inactive sessions. - ie := c.makeSessionBoundInternalExecutor(ctx, &sessiondata.SessionData{}) + ie := c.makeSessionBoundInternalExecutor(ctx, func(ie sqlutil.InternalExecutor) { + ie.SetSessionData(&sessiondata.SessionData{}) + }) for sessionID := range sessionIDs { if _, ok := activeSessions[sessionID.Uint128]; !ok { log.Eventf(ctx, "cleaning up temporary object for session %q", sessionID) diff --git a/pkg/sql/unsplit.go b/pkg/sql/unsplit.go index 4ece5f0a3356..6079203ca57e 100644 --- a/pkg/sql/unsplit.go +++ b/pkg/sql/unsplit.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkv" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/errors" ) @@ -106,7 +107,9 @@ func (n *unsplitAllNode) startExec(params runParams) error { if n.index.GetID() != n.tableDesc.GetPrimaryIndexID() { indexName = n.index.GetName() } - ie := params.p.ExecCfg().InternalExecutorFactory(params.ctx, params.SessionData()) + ie := params.p.ExecCfg().InternalExecutorFactory(params.ctx, func(ie sqlutil.InternalExecutor) { + ie.SetSessionData(params.SessionData()) + }) it, err := ie.QueryIteratorEx( params.ctx, "split points query", params.p.txn, sessiondata.NoSessionDataOverride, statement,