Skip to content

Commit

Permalink
Add IEFactory
Browse files Browse the repository at this point in the history
Release note (<category, see below>): <what> <show> <why>
  • Loading branch information
RichardJCai committed Oct 12, 2021
1 parent 4832a18 commit 26f0cc2
Show file tree
Hide file tree
Showing 14 changed files with 113 additions and 145 deletions.
10 changes: 2 additions & 8 deletions pkg/ccl/streamingccl/streamingutils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/streaming"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
Expand All @@ -33,9 +31,7 @@ func doCompleteIngestion(
const jobsQuery = `SELECT progress FROM system.jobs WHERE id=$1 FOR UPDATE`
row, err := evalCtx.Planner.QueryRowEx(evalCtx.Context,
"get-stream-ingestion-job-metadata",
txn, sessiondata.InternalExecutorOverride{
User: security.RootUserName(),
}, jobsQuery, jobID)
txn, evalCtx.SessionData(), jobsQuery, jobID)
if err != nil {
return err
}
Expand Down Expand Up @@ -89,8 +85,6 @@ func doCompleteIngestion(
updateJobQuery := `UPDATE system.jobs SET progress=$1 WHERE id=$2`
_, err = evalCtx.Planner.QueryRowEx(evalCtx.Context,
"set-stream-ingestion-job-metadata", txn,
sessiondata.InternalExecutorOverride{
User: security.RootUserName(),
}, updateJobQuery, progressBytes, jobID)
evalCtx.SessionData(), updateJobQuery, progressBytes, jobID)
return err
}
17 changes: 16 additions & 1 deletion pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -763,7 +763,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {

// Now that we have a pgwire.Server (which has a sql.Server), we can close a
// circular dependency between the rowexec.Server and sql.Server and set
// SessionBoundInternalExecutorFactory. The same applies for setting a
// InternalExecutorFactory. The same applies for setting a
// SessionBoundInternalExecutor on the job registry.
ieFactory := func(
ctx context.Context, sessionData *sessiondata.SessionData,
Expand All @@ -777,9 +777,24 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
ie.SetSessionData(sessionData)
return &ie
}

ieFactoryForBuiltins := func(
ctx context.Context, sessionData *sessiondata.SessionData, extraTxnState sql.ExtraTxnState,
) sql.InternalExecutor {
ie := sql.MakeInternalExecutor(
ctx,
pgServer.SQLServer,
internalMemMetrics,
cfg.Settings,
)
ie.SetExtraTxnState(extraTxnState)
ie.SetSessionData(sessionData)
return ie
}
distSQLServer.ServerConfig.SessionBoundInternalExecutorFactory = ieFactory
jobRegistry.SetSessionBoundInternalExecutorFactory(ieFactory)
execCfg.IndexBackfiller = sql.NewIndexBackfiller(execCfg, ieFactory)
execCfg.InternalExecutorFactory = ieFactoryForBuiltins

distSQLServer.ServerConfig.ProtectedTimestampProvider = execCfg.ProtectedTimestampProvider

Expand Down
21 changes: 14 additions & 7 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,7 @@ func (s *Server) SetupConn(

ex := s.newConnExecutor(
ctx, sdMutIterator, stmtBuf, clientComm, memMetrics, &s.Metrics,
s.sqlStats.GetApplicationStats(sd.ApplicationName),
s.sqlStats.GetApplicationStats(sd.ApplicationName), ExtraTxnState{},
)
return ConnectionHandler{ex}, nil
}
Expand Down Expand Up @@ -724,6 +724,7 @@ func (s *Server) newConnExecutor(
memMetrics MemoryMetrics,
srvMetrics *Metrics,
applicationStats sqlstats.ApplicationStats,
extraTxnState ExtraTxnState,
) *connExecutor {
// Create the various monitors.
// The session monitors are started in activate().
Expand Down Expand Up @@ -809,6 +810,12 @@ func (s *Server) newConnExecutor(
}

ex.phaseTimes.SetSessionPhaseTime(sessionphase.SessionInit, timeutil.Now())
ex.mu.ActiveQueries = make(map[ClusterWideID]*queryMeta)
ex.machine = fsm.MakeMachine(TxnStateTransitions, stateNoTxn{}, &ex.state)

ex.sessionTracing.ex = ex
ex.transitionCtx.sessionTracing = &ex.sessionTracing

ex.extraTxnState.prepStmtsNamespace = prepStmtNamespace{
prepStmts: make(map[string]*PreparedStatement),
portals: make(map[string]PreparedPortal),
Expand All @@ -823,14 +830,12 @@ func (s *Server) newConnExecutor(
)
ex.extraTxnState.txnRewindPos = -1
ex.extraTxnState.schemaChangeJobRecords = make(map[descpb.ID]*jobs.Record)
ex.mu.ActiveQueries = make(map[ClusterWideID]*queryMeta)
ex.machine = fsm.MakeMachine(TxnStateTransitions, stateNoTxn{}, &ex.state)

ex.sessionTracing.ex = ex
ex.transitionCtx.sessionTracing = &ex.sessionTracing

ex.extraTxnState.hasAdminRoleCache = HasAdminRoleCache{}

if extraTxnState.descs != nil {
ex.extraTxnState.descCollection = *extraTxnState.descs
}

ex.initPlanner(ctx, &ex.planner)

return ex
Expand All @@ -857,6 +862,7 @@ func (s *Server) newConnExecutorWithTxn(
txn *kv.Txn,
syntheticDescs []catalog.Descriptor,
applicationStats sqlstats.ApplicationStats,
extraTxnState ExtraTxnState,
) *connExecutor {
ex := s.newConnExecutor(
ctx,
Expand All @@ -866,6 +872,7 @@ func (s *Server) newConnExecutorWithTxn(
memMetrics,
srvMetrics,
applicationStats,
extraTxnState,
)
if txn.Type() == kv.LeafTxn {
// If the txn is a leaf txn it is not allowed to perform mutations. For
Expand Down
15 changes: 15 additions & 0 deletions pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -1165,6 +1165,21 @@ type ExecutorConfig struct {
// SpanConfigReconciliationJobDeps are used to drive the span config
// reconciliation job.
SpanConfigReconciliationJobDeps spanconfig.ReconciliationDependencies

// InternalExecutorFactory is used to create an InternalExecutor binded with
// SessionData and other ExtraTxnState.
// This is currently only for builtin functions where we need to execute sql.
InternalExecutorFactory InternalExecutorFactory
}

// InternalExecutorFactory is a function that produces a "session
// bound" internal executor.
type InternalExecutorFactory func(
context.Context, *sessiondata.SessionData, ExtraTxnState,
) InternalExecutor

type ExtraTxnState struct {
descs *descs.Collection
}

// UpdateVersionSystemSettingHook provides a callback that allows us
Expand Down
12 changes: 5 additions & 7 deletions pkg/sql/explain_bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ The UI can then be accessed at http://localhost:16686/search`, stmt)
}

func (b *stmtBundleBuilder) addEnv(ctx context.Context) {
c := makeStmtEnvCollector(ctx, b.ie, nil)
c := makeStmtEnvCollector(ctx, b.ie)

var buf bytes.Buffer
if err := c.PrintVersion(&buf); err != nil {
Expand Down Expand Up @@ -416,10 +416,8 @@ type stmtEnvCollector struct {
sessionDataOverride *sessiondata.SessionData
}

func makeStmtEnvCollector(
ctx context.Context, ie *InternalExecutor, sessionDataOverride *sessiondata.SessionData,
) stmtEnvCollector {
return stmtEnvCollector{ctx: ctx, ie: ie, sessionDataOverride: sessionDataOverride}
func makeStmtEnvCollector(ctx context.Context, ie *InternalExecutor) stmtEnvCollector {
return stmtEnvCollector{ctx: ctx, ie: ie}
}

// environmentQuery is a helper to run a query that returns a single string
Expand All @@ -429,7 +427,7 @@ func (c *stmtEnvCollector) query(query string) (string, error) {
c.ctx,
"stmtEnvCollector",
nil, /* txn */
sessiondata.InternalExecutorOverride{SessionData: c.sessionDataOverride},
sessiondata.InternalExecutorOverride{},
query,
)
if err != nil {
Expand Down Expand Up @@ -550,7 +548,7 @@ func (c *stmtEnvCollector) PrintClusterSettings(w io.Writer) error {
c.ctx,
"stmtEnvCollector",
nil, /* txn */
sessiondata.InternalExecutorOverride{SessionData: c.sessionDataOverride},
sessiondata.InternalExecutorOverride{},
"SELECT variable, value, description FROM [ SHOW ALL CLUSTER SETTINGS ]",
)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/faketreeeval/evalctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ func (ep *DummyEvalPlanner) QueryRowEx(
ctx context.Context,
opName string,
txn *kv.Txn,
session sessiondata.InternalExecutorOverride,
session *sessiondata.SessionData,
stmt string,
qargs ...interface{},
) (tree.Datums, error) {
Expand All @@ -346,7 +346,7 @@ func (ep *DummyEvalPlanner) QueryIteratorEx(
ctx context.Context,
opName string,
txn *kv.Txn,
session sessiondata.InternalExecutorOverride,
session *sessiondata.SessionData,
stmt string,
qargs ...interface{},
) (tree.InternalRows, error) {
Expand Down
15 changes: 10 additions & 5 deletions pkg/sql/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ type InternalExecutor struct {
//
// Warning: Not safe for concurrent use from multiple goroutines.
syntheticDescriptors []catalog.Descriptor

extraTxnState ExtraTxnState
}

// WithSyntheticDescriptors sets the synthetic descriptors before running the
Expand Down Expand Up @@ -125,6 +127,10 @@ func (ie *InternalExecutor) SetSessionData(sessionData *sessiondata.SessionData)
ie.sessionDataStack = sessiondata.NewStack(sessionData)
}

func (ie *InternalExecutor) SetExtraTxnState(extraTxnState ExtraTxnState) {
ie.extraTxnState = extraTxnState
}

// initConnEx creates a connExecutor and runs it on a separate goroutine. It
// takes in a StmtBuf into which commands can be pushed and a WaitGroup that
// will be signaled when connEx.run() returns.
Expand All @@ -143,6 +149,7 @@ func (ie *InternalExecutor) initConnEx(
sd *sessiondata.SessionData,
stmtBuf *StmtBuf,
wg *sync.WaitGroup,
extraTxnState ExtraTxnState,
syncCallback func([]resWithPos),
errCallback func(error),
) {
Expand Down Expand Up @@ -182,6 +189,7 @@ func (ie *InternalExecutor) initConnEx(
ie.memMetrics,
&ie.s.InternalMetrics,
applicationStats,
extraTxnState,
)
} else {
ex = ie.s.newConnExecutorWithTxn(
Expand All @@ -195,6 +203,7 @@ func (ie *InternalExecutor) initConnEx(
txn,
ie.syntheticDescriptors,
applicationStats,
extraTxnState,
)
}

Expand Down Expand Up @@ -570,9 +579,6 @@ func (ie *InternalExecutor) QueryIteratorEx(

// applyOverrides overrides the respective fields from sd for all the fields set on o.
func applyOverrides(o sessiondata.InternalExecutorOverride, sd *sessiondata.SessionData) {
if o.SessionData != nil {
*sd = *o.SessionData
}
if !o.User.Undefined() {
sd.UserProto = o.User.EncodeProto()
}
Expand All @@ -588,7 +594,6 @@ func applyOverrides(o sessiondata.InternalExecutorOverride, sd *sessiondata.Sess
if o.DatabaseIDToTempSchemaID != nil {
sd.DatabaseIDToTempSchemaID = o.DatabaseIDToTempSchemaID
}
sd.StubCatalogTablesEnabled = o.StubCatalogTables
}

func (ie *InternalExecutor) maybeRootSessionDataOverride(
Expand Down Expand Up @@ -732,7 +737,7 @@ func (ie *InternalExecutor) execInternal(
stmtBuf.Close()
_ = rw.addResult(ctx, ieIteratorResult{err: err})
}
ie.initConnEx(ctx, txn, rw, sd, stmtBuf, &wg, syncCallback, errCallback)
ie.initConnEx(ctx, txn, rw, sd, stmtBuf, &wg, ie.extraTxnState, syncCallback, errCallback)

typeHints := make(tree.PlaceholderTypes, len(datums))
for i, d := range datums {
Expand Down
5 changes: 3 additions & 2 deletions pkg/sql/opt_exec_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -1153,11 +1153,12 @@ func (e *urlOutputter) finish() (url.URL, error) {
func (ef *execFactory) showEnv(plan string, envOpts exec.ExplainEnvData) (exec.Node, error) {
var out urlOutputter

c := makeStmtEnvCollector(
ie := ef.planner.extendedEvalCtx.ExecCfg.InternalExecutorFactory(
ef.planner.EvalContext().Context,
ef.planner.extendedEvalCtx.ExecCfg.InternalExecutor,
ef.planner.SessionData(),
ExtraTxnState{ef.planner.Descriptors()},
)
c := makeStmtEnvCollector(ef.planner.EvalContext().Context, &ie)

// Show the version of Cockroach running.
if err := c.PrintVersion(&out.buf); err != nil {
Expand Down
10 changes: 6 additions & 4 deletions pkg/sql/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -825,21 +825,23 @@ func (p *planner) QueryRowEx(
ctx context.Context,
opName string,
txn *kv.Txn,
session sessiondata.InternalExecutorOverride,
session *sessiondata.SessionData,
stmt string,
qargs ...interface{},
) (tree.Datums, error) {
return p.ExecCfg().InternalExecutor.QueryRowEx(ctx, opName, txn, session, stmt, qargs...)
ie := p.ExecCfg().InternalExecutorFactory(ctx, session, ExtraTxnState{descs: p.Descriptors()})
return ie.QueryRowEx(ctx, opName, txn, sessiondata.InternalExecutorOverride{}, stmt, qargs...)
}

func (p *planner) QueryIteratorEx(
ctx context.Context,
opName string,
txn *kv.Txn,
session sessiondata.InternalExecutorOverride,
session *sessiondata.SessionData,
stmt string,
qargs ...interface{},
) (tree.InternalRows, error) {
rows, err := p.ExecCfg().InternalExecutor.QueryIteratorEx(ctx, opName, txn, session, stmt, qargs...)
ie := p.ExecCfg().InternalExecutorFactory(ctx, session, ExtraTxnState{descs: p.Descriptors()})
rows, err := ie.QueryIteratorEx(ctx, opName, txn, sessiondata.InternalExecutorOverride{}, stmt, qargs...)
return rows.(tree.InternalRows), err
}
16 changes: 7 additions & 9 deletions pkg/sql/sem/builtins/generator_builtins.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/sql/lexbase"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
Expand Down Expand Up @@ -1837,9 +1836,8 @@ func makePayloadsForTraceGenerator(
ctx.Ctx(),
"crdb_internal.payloads_for_trace",
ctx.Txn,
sessiondata.InternalExecutorOverride{
User: security.RootUserName(),
}, query,
ctx.SessionData(),
query,
traceID,
)
if err != nil {
Expand Down Expand Up @@ -1898,7 +1896,7 @@ type showCreateAllTablesGenerator struct {
ids []int64
dbName string
acc mon.BoundAccount
user security.SQLUsername
sessionData *sessiondata.SessionData

// The following variables are updated during
// calls to Next() and change throughout the lifecycle of
Expand Down Expand Up @@ -1930,7 +1928,7 @@ func (s *showCreateAllTablesGenerator) Start(ctx context.Context, txn *kv.Txn) e
// We also account for the memory in the BoundAccount memory monitor in
// showCreateAllTablesGenerator.
ids, err := getTopologicallySortedTableIDs(
ctx, s.evalPlanner, txn, s.dbName, s.user, &s.acc,
ctx, s.evalPlanner, txn, s.dbName, s.sessionData, &s.acc,
)
if err != nil {
return err
Expand All @@ -1956,7 +1954,7 @@ func (s *showCreateAllTablesGenerator) Next(ctx context.Context) (bool, error) {
}

createStmt, err := getCreateStatement(
ctx, s.evalPlanner, s.txn, s.ids[s.idx], s.dbName, s.user,
ctx, s.evalPlanner, s.txn, s.ids[s.idx], s.dbName, s.sessionData,
)
if err != nil {
return false, err
Expand Down Expand Up @@ -2002,7 +2000,7 @@ func (s *showCreateAllTablesGenerator) Next(ctx context.Context) (bool, error) {
statementReturnType = alterValidateFKStatements
}
alterStmt, err := getAlterStatements(
ctx, s.evalPlanner, s.txn, s.ids[s.idx], s.dbName, s.user, statementReturnType,
ctx, s.evalPlanner, s.txn, s.ids[s.idx], s.dbName, s.sessionData, statementReturnType,
)
if err != nil {
return false, err
Expand Down Expand Up @@ -2043,6 +2041,6 @@ func makeShowCreateAllTablesGenerator(
evalPlanner: ctx.Planner,
dbName: dbName,
acc: ctx.Mon.MakeBoundAccount(),
user: ctx.SessionData().User(),
sessionData: ctx.SessionData(),
}, nil
}
Loading

0 comments on commit 26f0cc2

Please sign in to comment.