Skip to content

Commit

Permalink
sql: refator internal executor factory to accept init function
Browse files Browse the repository at this point in the history
This change refactors the InternalExecutorFactory to take in
a closure that allows injecting external state into a new internal
executor.

Release note: None
  • Loading branch information
adityamaru committed Nov 26, 2021
1 parent a213807 commit 8137ebb
Show file tree
Hide file tree
Showing 13 changed files with 64 additions and 31 deletions.
4 changes: 3 additions & 1 deletion pkg/ccl/backupccl/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -1803,7 +1803,9 @@ func revalidateIndexes(
// since our table is offline.
var runner sqlutil.HistoricalInternalExecTxnRunner = func(ctx context.Context, fn sqlutil.InternalExecFn) error {
return execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
ie := job.MakeSessionBoundInternalExecutor(ctx, sql.NewFakeSessionData(execCfg.SV())).(*sql.InternalExecutor)
ie := job.MakeSessionBoundInternalExecutor(ctx, func(ie sqlutil.InternalExecutor) {
ie.SetSessionData(sql.NewFakeSessionData(execCfg.SV()))
})
return fn(ctx, txn, ie)
})
}
Expand Down
16 changes: 9 additions & 7 deletions pkg/ccl/changefeedccl/schemafeed/schema_feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,15 @@ func New(
metrics *Metrics,
) SchemaFeed {
m := &schemaFeed{
filter: schemaChangeEventFilters[events],
db: cfg.DB,
clock: cfg.DB.Clock(),
settings: cfg.Settings,
targets: targets,
leaseMgr: cfg.LeaseManager.(*lease.Manager),
ie: cfg.SessionBoundInternalExecutorFactory(ctx, &sessiondata.SessionData{}),
filter: schemaChangeEventFilters[events],
db: cfg.DB,
clock: cfg.DB.Clock(),
settings: cfg.Settings,
targets: targets,
leaseMgr: cfg.LeaseManager.(*lease.Manager),
ie: cfg.SessionBoundInternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {
ie.SetSessionData(&sessiondata.SessionData{})
}),
collectionFactory: cfg.CollectionFactory,
metrics: metrics,
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/jobs/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -723,9 +723,9 @@ func (j *Job) FractionCompleted() float32 {
// sessionBoundInternalExecutorFactory for a more detailed explanation of why
// this exists.
func (j *Job) MakeSessionBoundInternalExecutor(
ctx context.Context, sd *sessiondata.SessionData,
ctx context.Context, initInternalExecutor func(sqlutil.InternalExecutor),
) sqlutil.InternalExecutor {
return j.registry.sessionBoundInternalExecutorFactory(ctx, sd)
return j.registry.sessionBoundInternalExecutorFactory(ctx, initInternalExecutor)
}

func (j *Job) runInTxn(
Expand Down
4 changes: 2 additions & 2 deletions pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -774,15 +774,15 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
// SessionBoundInternalExecutorFactory. The same applies for setting a
// SessionBoundInternalExecutor on the job registry.
ieFactory := func(
ctx context.Context, sessionData *sessiondata.SessionData,
ctx context.Context, initInternalExecutor func(ie sqlutil.InternalExecutor),
) sqlutil.InternalExecutor {
ie := sql.MakeInternalExecutor(
ctx,
pgServer.SQLServer,
internalMemMetrics,
cfg.Settings,
)
ie.SetSessionData(sessionData)
initInternalExecutor(&ie)
return &ie
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,8 @@ func MakeInternalExecutor(
}

// SetSessionData binds the session variables that will be used by queries
// performed through this executor from now on. This creates a new session stack.
// It is recommended to use SetSessionDataStack.
// performed through this executor from now on. This creates a new session
// stack.
//
// SetSessionData cannot be called concurrently with query execution.
func (ie *InternalExecutor) SetSessionData(sessionData *sessiondata.SessionData) {
Expand Down
6 changes: 4 additions & 2 deletions pkg/sql/opt_exec_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/builtins"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/span"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
Expand Down Expand Up @@ -1159,8 +1160,9 @@ func (ef *execFactory) showEnv(plan string, envOpts exec.ExplainEnvData) (exec.N

ie := ef.planner.extendedEvalCtx.ExecCfg.InternalExecutorFactory(
ef.planner.EvalContext().Context,
ef.planner.SessionData(),
)
func(ie sqlutil.InternalExecutor) {
ie.SetSessionData(ef.planner.SessionData())
})
c := makeStmtEnvCollector(ef.planner.EvalContext().Context, ie.(*InternalExecutor))

// Show the version of Cockroach running.
Expand Down
9 changes: 7 additions & 2 deletions pkg/sql/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
"github.com/cockroachdb/cockroach/pkg/sql/sqlstats/persistedsqlstats"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/cancelchecker"
"github.com/cockroachdb/cockroach/pkg/util/envutil"
Expand Down Expand Up @@ -825,7 +826,9 @@ func (p *planner) QueryRowEx(
stmt string,
qargs ...interface{},
) (tree.Datums, error) {
ie := p.ExecCfg().InternalExecutorFactory(ctx, p.SessionData())
ie := p.ExecCfg().InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {
ie.SetSessionData(p.SessionData())
})
return ie.QueryRowEx(ctx, opName, txn, override, stmt, qargs...)
}

Expand All @@ -843,7 +846,9 @@ func (p *planner) QueryIteratorEx(
stmt string,
qargs ...interface{},
) (tree.InternalRows, error) {
ie := p.ExecCfg().InternalExecutorFactory(ctx, p.SessionData())
ie := p.ExecCfg().InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {
ie.SetSessionData(p.SessionData())
})
rows, err := ie.QueryIteratorEx(ctx, opName, txn, override, stmt, qargs...)
return rows.(tree.InternalRows), err
}
8 changes: 6 additions & 2 deletions pkg/sql/resolve_oid.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ import (
func (p *planner) ResolveOIDFromString(
ctx context.Context, resultType *types.T, toResolve *tree.DString,
) (*tree.DOid, error) {
ie := p.ExecCfg().InternalExecutorFactory(ctx, p.SessionData())
ie := p.ExecCfg().InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {
ie.SetSessionData(p.SessionData())
})
return resolveOID(
ctx, p.Txn(),
ie,
Expand All @@ -41,7 +43,9 @@ func (p *planner) ResolveOIDFromString(
func (p *planner) ResolveOIDFromOID(
ctx context.Context, resultType *types.T, toResolve *tree.DOid,
) (*tree.DOid, error) {
ie := p.ExecCfg().InternalExecutorFactory(ctx, p.SessionData())
ie := p.ExecCfg().InternalExecutorFactory(ctx, func(ie sqlutil.InternalExecutor) {
ie.SetSessionData(p.SessionData())
})
return resolveOID(
ctx, p.Txn(),
ie,
Expand Down
14 changes: 8 additions & 6 deletions pkg/sql/schema_changer.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func NewSchemaChangerForTesting(
// Note that this doesn't end up actually being session-bound but that's
// good enough for testing.
ieFactory: func(
ctx context.Context, sd *sessiondata.SessionData,
ctx context.Context, initInternalExecutor func(sqlutil.InternalExecutor),
) sqlutil.InternalExecutor {
return execCfg.InternalExecutor
},
Expand Down Expand Up @@ -1949,7 +1949,9 @@ func createSchemaChangeEvalCtx(
) extendedEvalContext {

sd := NewFakeSessionData(execCfg.SV())
ie := ieFactory(ctx, sd)
ie := ieFactory(ctx, func(ex sqlutil.InternalExecutor) {
ex.SetSessionData(sd)
})

evalCtx := extendedEvalContext{
// Make a session tracing object on-the-fly. This is OK
Expand Down Expand Up @@ -2057,8 +2059,8 @@ func (r schemaChangeResumer) Resume(ctx context.Context, execCtx interface{}) er
clock: p.ExecCfg().Clock,
settings: p.ExecCfg().Settings,
execCfg: p.ExecCfg(),
ieFactory: func(ctx context.Context, sd *sessiondata.SessionData) sqlutil.InternalExecutor {
return r.job.MakeSessionBoundInternalExecutor(ctx, sd)
ieFactory: func(ctx context.Context, initInternalExecutor func(ie sqlutil.InternalExecutor)) sqlutil.InternalExecutor {
return r.job.MakeSessionBoundInternalExecutor(ctx, initInternalExecutor)
},
metrics: p.ExecCfg().SchemaChangerMetrics,
}
Expand Down Expand Up @@ -2243,8 +2245,8 @@ func (r schemaChangeResumer) OnFailOrCancel(ctx context.Context, execCtx interfa
clock: p.ExecCfg().Clock,
settings: p.ExecCfg().Settings,
execCfg: p.ExecCfg(),
ieFactory: func(ctx context.Context, sd *sessiondata.SessionData) sqlutil.InternalExecutor {
return r.job.MakeSessionBoundInternalExecutor(ctx, sd)
ieFactory: func(ctx context.Context, initInternalExecutor func(executor sqlutil.InternalExecutor)) sqlutil.InternalExecutor {
return r.job.MakeSessionBoundInternalExecutor(ctx, initInternalExecutor)
},
}

Expand Down
8 changes: 6 additions & 2 deletions pkg/sql/schemachanger/scsqldeps/index_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@ func (iv indexValidator) ValidateForwardIndexes(
if err != nil {
return err
}
return fn(ctx, validationTxn, iv.ieFactory(ctx, iv.newFakeSessionData(&iv.settings.SV)))
return fn(ctx, validationTxn, iv.ieFactory(ctx, func(ie sqlutil.InternalExecutor) {
ie.SetSessionData(iv.newFakeSessionData(&iv.settings.SV))
}))
}
return iv.validateForwardIndexes(ctx, tableDesc, indexes, txnRunner, withFirstMutationPublic, gatherAllInvalid, override)
}
Expand All @@ -95,7 +97,9 @@ func (iv indexValidator) ValidateInvertedIndexes(
if err != nil {
return err
}
return fn(ctx, validationTxn, iv.ieFactory(ctx, iv.newFakeSessionData(&iv.settings.SV)))
return fn(ctx, validationTxn, iv.ieFactory(ctx, func(ie sqlutil.InternalExecutor) {
ie.SetSessionData(iv.newFakeSessionData(&iv.settings.SV))
}))
}
return iv.validateInvertedIndexes(ctx, iv.codec, tableDesc, indexes, txnRunner, gatherAllInvalid, override)
}
Expand Down
9 changes: 8 additions & 1 deletion pkg/sql/sqlutil/internal_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,13 @@ type InternalExecutor interface {
WithSyntheticDescriptors(
descs []catalog.Descriptor, run func() error,
) error

// SetSessionData binds the session variables that will be used by queries
// performed through this executor from now on. This creates a new session
// stack.
//
// SetSessionData cannot be called concurrently with query execution.
SetSessionData(*sessiondata.SessionData)
}

// InternalRows is an iterator interface that's exposed by the internal
Expand Down Expand Up @@ -194,7 +201,7 @@ type InternalRows interface {
// SessionBoundInternalExecutorFactory is a function that produces a "session
// bound" internal executor.
type SessionBoundInternalExecutorFactory func(
context.Context, *sessiondata.SessionData,
context.Context, func(InternalExecutor),
) InternalExecutor

// InternalExecFn is the type of functions that operates using an internalExecutor.
Expand Down
4 changes: 3 additions & 1 deletion pkg/sql/temporary_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -582,7 +582,9 @@ func (c *TemporaryObjectCleaner) doTemporaryObjectCleanup(
}

// Clean up temporary data for inactive sessions.
ie := c.makeSessionBoundInternalExecutor(ctx, &sessiondata.SessionData{})
ie := c.makeSessionBoundInternalExecutor(ctx, func(ie sqlutil.InternalExecutor) {
ie.SetSessionData(&sessiondata.SessionData{})
})
for sessionID := range sessionIDs {
if _, ok := activeSessions[sessionID.Uint128]; !ok {
log.Eventf(ctx, "cleaning up temporary object for session %q", sessionID)
Expand Down
5 changes: 4 additions & 1 deletion pkg/sql/unsplit.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkv"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/errors"
)

Expand Down Expand Up @@ -106,7 +107,9 @@ func (n *unsplitAllNode) startExec(params runParams) error {
if n.index.GetID() != n.tableDesc.GetPrimaryIndexID() {
indexName = n.index.GetName()
}
ie := params.p.ExecCfg().InternalExecutorFactory(params.ctx, params.SessionData())
ie := params.p.ExecCfg().InternalExecutorFactory(params.ctx, func(ie sqlutil.InternalExecutor) {
ie.SetSessionData(params.SessionData())
})
it, err := ie.QueryIteratorEx(
params.ctx, "split points query", params.p.txn, sessiondata.NoSessionDataOverride,
statement,
Expand Down

0 comments on commit 8137ebb

Please sign in to comment.