Skip to content

Commit

Permalink
sql: refactor InternalExecutor on EvalCtx
Browse files Browse the repository at this point in the history
Minor refactor to remove duplicate InternalExecutor definition
on EvalCtx and in the tree package.

We already have 2 interface declarations for InternalExecutor
in sql and sqlutil, this refactor hopefully makes the dependencies
a little more clear.

For builtins, we now use an InternalExecutorFactory where
we must pass in the txn, session data and desc.Collections
to use the IE.

Release note: None
  • Loading branch information
RichardJCai committed Nov 24, 2021
1 parent e85866e commit c61e3fc
Show file tree
Hide file tree
Showing 28 changed files with 277 additions and 165 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/streamingccl/streamingutils/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ go_library(
"//pkg/jobs/jobspb",
"//pkg/kv",
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
"//pkg/util/hlc",
"//pkg/util/protoutil",
"//pkg/util/timeutil",
Expand Down
11 changes: 7 additions & 4 deletions pkg/ccl/streamingccl/streamingutils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
Expand All @@ -29,8 +30,9 @@ func doCompleteStreamIngestion(
) error {
// Get the job payload for job_id.
const jobsQuery = `SELECT progress FROM system.jobs WHERE id=$1 FOR UPDATE`
row, err := evalCtx.InternalExecutor.QueryRow(evalCtx.Context,
"get-stream-ingestion-job-metadata", txn, jobsQuery, jobID)
row, err := evalCtx.Planner.QueryRowEx(evalCtx.Context,
"get-stream-ingestion-job-metadata",
txn, sessiondata.NodeUserSessionDataOverride, jobsQuery, jobID)
if err != nil {
return err
}
Expand Down Expand Up @@ -82,7 +84,8 @@ func doCompleteStreamIngestion(
return err
}
updateJobQuery := `UPDATE system.jobs SET progress=$1 WHERE id=$2`
_, err = evalCtx.InternalExecutor.QueryRow(evalCtx.Context,
"set-stream-ingestion-job-metadata", txn, updateJobQuery, progressBytes, jobID)
_, err = evalCtx.Planner.QueryRowEx(evalCtx.Context,
"set-stream-ingestion-job-metadata", txn,
sessiondata.NodeUserSessionDataOverride, updateJobQuery, progressBytes, jobID)
return err
}
2 changes: 2 additions & 0 deletions pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -785,6 +785,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
ie.SetSessionData(sessionData)
return &ie
}

distSQLServer.ServerConfig.SessionBoundInternalExecutorFactory = ieFactory
jobRegistry.SetSessionBoundInternalExecutorFactory(ieFactory)
execCfg.IndexBackfiller = sql.NewIndexBackfiller(execCfg, ieFactory)
Expand All @@ -795,6 +796,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
sql.ValidateForwardIndexes,
sql.ValidateInvertedIndexes,
sql.NewFakeSessionData)
execCfg.InternalExecutorFactory = ieFactory

distSQLServer.ServerConfig.ProtectedTimestampProvider = execCfg.ProtectedTimestampProvider

Expand Down
7 changes: 4 additions & 3 deletions pkg/sql/alter_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -809,7 +809,7 @@ func (n *alterTableNode) startExec(params runParams) error {
"constraint %q in the middle of being added, try again later", t.Constraint)
}
if err := validateCheckInTxn(
params.ctx, &params.p.semaCtx, params.EvalContext(), n.tableDesc, params.EvalContext().Txn, ck.Expr,
params.ctx, &params.p.semaCtx, params.ExecCfg().InternalExecutor, params.SessionData(), n.tableDesc, params.EvalContext().Txn, ck.Expr,
); err != nil {
return err
}
Expand All @@ -831,7 +831,8 @@ func (n *alterTableNode) startExec(params runParams) error {
"constraint %q in the middle of being added, try again later", t.Constraint)
}
if err := validateFkInTxn(
params.ctx, params.p.LeaseMgr(), params.EvalContext(), n.tableDesc, params.EvalContext().Txn, name,
params.ctx, params.p.LeaseMgr(), params.ExecCfg().InternalExecutor,
n.tableDesc, params.EvalContext().Txn, name, params.EvalContext().Codec,
); err != nil {
return err
}
Expand All @@ -854,7 +855,7 @@ func (n *alterTableNode) startExec(params runParams) error {
"constraint %q in the middle of being added, try again later", t.Constraint)
}
if err := validateUniqueWithoutIndexConstraintInTxn(
params.ctx, params.EvalContext(), n.tableDesc, params.EvalContext().Txn, name,
params.ctx, params.ExecCfg().InternalExecutor, n.tableDesc, params.EvalContext().Txn, name,
); err != nil {
return err
}
Expand Down
29 changes: 14 additions & 15 deletions pkg/sql/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func (sc *SchemaChanger) makeFixedTimestampInternalExecRunner(
// We need to re-create the evalCtx since the txn may retry.
ie := createSchemaChangeEvalCtx(
ctx, sc.execCfg, readAsOf, sc.ieFactory, descriptors,
).InternalExecutor.(sqlutil.InternalExecutor)
).SchemaChangeInternalExecutor
return retryable(ctx, txn, ie)
})
}
Expand Down Expand Up @@ -710,21 +710,21 @@ func (sc *SchemaChanger) validateConstraints(
defer func() { collection.ReleaseAll(ctx) }()
if c.IsCheck() {
if err := validateCheckInTxn(
ctx, &semaCtx, &evalCtx.EvalContext, desc, txn, c.Check().Expr,
ctx, &semaCtx, evalCtx.SchemaChangeInternalExecutor, evalCtx.SessionData(), desc, txn, c.Check().Expr,
); err != nil {
return err
}
} else if c.IsForeignKey() {
if err := validateFkInTxn(ctx, sc.leaseMgr, &evalCtx.EvalContext, desc, txn, c.GetName()); err != nil {
if err := validateFkInTxn(ctx, sc.leaseMgr, evalCtx.SchemaChangeInternalExecutor, desc, txn, c.GetName(), evalCtx.Codec); err != nil {
return err
}
} else if c.IsUniqueWithoutIndex() {
if err := validateUniqueWithoutIndexConstraintInTxn(ctx, &evalCtx.EvalContext, desc, txn, c.GetName()); err != nil {
if err := validateUniqueWithoutIndexConstraintInTxn(ctx, evalCtx.SchemaChangeInternalExecutor, desc, txn, c.GetName()); err != nil {
return err
}
} else if c.IsNotNull() {
if err := validateCheckInTxn(
ctx, &semaCtx, &evalCtx.EvalContext, desc, txn, c.Check().Expr,
ctx, &semaCtx, evalCtx.SchemaChangeInternalExecutor, evalCtx.SessionData(), desc, txn, c.Check().Expr,
); err != nil {
// TODO (lucy): This should distinguish between constraint
// validation errors and other types of unexpected errors, and
Expand Down Expand Up @@ -2055,7 +2055,7 @@ func runSchemaChangesInTxn(
check := &c.ConstraintToUpdateDesc().Check
if check.Validity == descpb.ConstraintValidity_Validating {
if err := validateCheckInTxn(
ctx, &planner.semaCtx, planner.EvalContext(), tableDesc, planner.txn, check.Expr,
ctx, &planner.semaCtx, planner.ExecCfg().InternalExecutor, planner.SessionData(), tableDesc, planner.txn, check.Expr,
); err != nil {
return err
}
Expand All @@ -2079,7 +2079,7 @@ func runSchemaChangesInTxn(
uwi := &c.ConstraintToUpdateDesc().UniqueWithoutIndexConstraint
if uwi.Validity == descpb.ConstraintValidity_Validating {
if err := validateUniqueWithoutIndexConstraintInTxn(
ctx, planner.EvalContext(), tableDesc, planner.txn, c.GetName(),
ctx, planner.ExecCfg().InternalExecutor, tableDesc, planner.txn, c.GetName(),
); err != nil {
return err
}
Expand Down Expand Up @@ -2152,18 +2152,18 @@ func runSchemaChangesInTxn(
func validateCheckInTxn(
ctx context.Context,
semaCtx *tree.SemaContext,
evalCtx *tree.EvalContext,
ie *InternalExecutor,
sessionData *sessiondata.SessionData,
tableDesc *tabledesc.Mutable,
txn *kv.Txn,
checkExpr string,
) error {
ie := evalCtx.InternalExecutor.(*InternalExecutor)
var syntheticDescs []catalog.Descriptor
if tableDesc.Version > tableDesc.ClusterVersion.Version {
syntheticDescs = append(syntheticDescs, tableDesc)
}
return ie.WithSyntheticDescriptors(syntheticDescs, func() error {
return validateCheckExpr(ctx, semaCtx, evalCtx.SessionData(), checkExpr, tableDesc, ie, txn)
return validateCheckExpr(ctx, semaCtx, sessionData, checkExpr, tableDesc, ie, txn)
})
}

Expand All @@ -2182,12 +2182,12 @@ func validateCheckInTxn(
func validateFkInTxn(
ctx context.Context,
leaseMgr *lease.Manager,
evalCtx *tree.EvalContext,
ie *InternalExecutor,
tableDesc *tabledesc.Mutable,
txn *kv.Txn,
fkName string,
codec keys.SQLCodec,
) error {
ie := evalCtx.InternalExecutor.(*InternalExecutor)
var syntheticDescs []catalog.Descriptor
if tableDesc.Version > tableDesc.ClusterVersion.Version {
syntheticDescs = append(syntheticDescs, tableDesc)
Expand All @@ -2206,7 +2206,7 @@ func validateFkInTxn(
}

return ie.WithSyntheticDescriptors(syntheticDescs, func() error {
return validateForeignKey(ctx, tableDesc, fk, ie, txn, evalCtx.Codec)
return validateForeignKey(ctx, tableDesc, fk, ie, txn, codec)
})
}

Expand All @@ -2224,12 +2224,11 @@ func validateFkInTxn(
// reuse an existing kv.Txn safely.
func validateUniqueWithoutIndexConstraintInTxn(
ctx context.Context,
evalCtx *tree.EvalContext,
ie *InternalExecutor,
tableDesc *tabledesc.Mutable,
txn *kv.Txn,
constraintName string,
) error {
ie := evalCtx.InternalExecutor.(*InternalExecutor)
var syntheticDescs []catalog.Descriptor
if tableDesc.Version > tableDesc.ClusterVersion.Version {
syntheticDescs = append(syntheticDescs, tableDesc)
Expand Down
9 changes: 6 additions & 3 deletions pkg/sql/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,8 @@ func validateForeignKey(
query,
)

values, err := ie.QueryRow(ctx, "validate foreign key constraint", txn, query)
values, err := ie.QueryRowEx(ctx, "validate foreign key constraint",
txn, sessiondata.NodeUserSessionDataOverride, query)
if err != nil {
return err
}
Expand All @@ -295,7 +296,8 @@ func validateForeignKey(
query,
)

values, err := ie.QueryRow(ctx, "validate fk constraint", txn, query)
values, err := ie.QueryRowEx(ctx, "validate fk constraint", txn,
sessiondata.NodeUserSessionDataOverride, query)
if err != nil {
return err
}
Expand Down Expand Up @@ -390,7 +392,8 @@ func validateUniqueConstraint(
query,
)

values, err := ie.QueryRow(ctx, "validate unique constraint", txn, query)
values, err := ie.QueryRowEx(ctx, "validate unique constraint", txn,
sessiondata.NodeUserSessionDataOverride, query)
if err != nil {
return err
}
Expand Down
10 changes: 1 addition & 9 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -844,6 +844,7 @@ func (s *Server) newConnExecutor(
}

ex.phaseTimes.SetSessionPhaseTime(sessionphase.SessionInit, timeutil.Now())

ex.extraTxnState.prepStmtsNamespace = prepStmtNamespace{
prepStmts: make(map[string]*PreparedStatement),
portals: make(map[string]PreparedPortal),
Expand Down Expand Up @@ -2458,14 +2459,6 @@ func (ex *connExecutor) asOfClauseWithSessionDefault(expr tree.AsOfClause) tree.
// same across multiple statements. resetEvalCtx must also be called before each
// statement, to reinitialize other fields.
func (ex *connExecutor) initEvalCtx(ctx context.Context, evalCtx *extendedEvalContext, p *planner) {
ie := MakeInternalExecutor(
ctx,
ex.server,
ex.memMetrics,
ex.server.cfg.Settings,
)
ie.SetSessionDataStack(ex.sessionDataStack)

*evalCtx = extendedEvalContext{
EvalContext: tree.EvalContext{
Planner: p,
Expand All @@ -2488,7 +2481,6 @@ func (ex *connExecutor) initEvalCtx(ctx context.Context, evalCtx *extendedEvalCo
Locality: ex.server.cfg.Locality,
Tracer: ex.server.cfg.AmbientCtx.Tracer,
ReCache: ex.server.reCache,
InternalExecutor: &ie,
DB: ex.server.cfg.DB,
SQLLivenessReader: ex.server.cfg.SQLLiveness,
SQLStatsController: ex.server.sqlStatsController,
Expand Down
6 changes: 0 additions & 6 deletions pkg/sql/distsql/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,11 +326,6 @@ func (ds *ServerImpl) setupFlow(
if err != nil {
return ctx, nil, nil, err
}
ie := &lazyInternalExecutor{
newInternalExecutor: func() sqlutil.InternalExecutor {
return ds.SessionBoundInternalExecutorFactory(ctx, sd)
},
}

// It's important to populate evalCtx.Txn early. We'll write it again in the
// f.SetTxn() call below, but by then it will already have been captured by
Expand Down Expand Up @@ -360,7 +355,6 @@ func (ds *ServerImpl) setupFlow(
Sequence: &faketreeeval.DummySequenceOperators{},
Tenant: &faketreeeval.DummyTenantOperator{},
Regions: &faketreeeval.DummyRegionOperator{},
InternalExecutor: ie,
Txn: leafTxn,
SQLLivenessReader: ds.ServerConfig.SQLLivenessReader,
SQLStatsController: ds.ServerConfig.SQLStatsController,
Expand Down
6 changes: 6 additions & 0 deletions pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sessioninit"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
"github.com/cockroachdb/cockroach/pkg/sql/sqlstats"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/sql/stats"
"github.com/cockroachdb/cockroach/pkg/sql/stmtdiagnostics"
"github.com/cockroachdb/cockroach/pkg/sql/types"
Expand Down Expand Up @@ -1155,6 +1156,11 @@ type ExecutorConfig struct {
// SpanConfigReconciliationJobDeps are used to drive the span config
// reconciliation job.
SpanConfigReconciliationJobDeps spanconfig.ReconciliationDependencies

// InternalExecutorFactory is used to create an InternalExecutor binded with
// SessionData and other ExtraTxnState.
// This is currently only for builtin functions where we need to execute sql.
InternalExecutorFactory sqlutil.SessionBoundInternalExecutorFactory
}

// UpdateVersionSystemSettingHook provides a callback that allows us
Expand Down
1 change: 0 additions & 1 deletion pkg/sql/explain_bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,6 @@ func makeStmtEnvCollector(ctx context.Context, ie *InternalExecutor) stmtEnvColl
// environmentQuery is a helper to run a query that returns a single string
// value.
func (c *stmtEnvCollector) query(query string) (string, error) {
var row tree.Datums
row, err := c.ie.QueryRowEx(
c.ctx,
"stmtEnvCollector",
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/faketreeeval/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/sql/faketreeeval",
visibility = ["//visibility:public"],
deps = [
"//pkg/kv",
"//pkg/security",
"//pkg/sql/parser",
"//pkg/sql/pgwire/pgcode",
Expand Down
25 changes: 25 additions & 0 deletions pkg/sql/faketreeeval/evalctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
Expand Down Expand Up @@ -333,6 +334,30 @@ func (ep *DummyEvalPlanner) ResolveType(
return nil, errors.WithStack(errEvalPlanner)
}

// QueryRowEx is part of the tree.EvalPlanner interface.
func (ep *DummyEvalPlanner) QueryRowEx(
ctx context.Context,
opName string,
txn *kv.Txn,
session sessiondata.InternalExecutorOverride,
stmt string,
qargs ...interface{},
) (tree.Datums, error) {
return nil, errors.WithStack(errEvalPlanner)
}

// QueryIteratorEx is part of the tree.EvalPlanner interface.
func (ep *DummyEvalPlanner) QueryIteratorEx(
ctx context.Context,
opName string,
txn *kv.Txn,
session sessiondata.InternalExecutorOverride,
stmt string,
qargs ...interface{},
) (tree.InternalRows, error) {
return nil, errors.WithStack(errEvalPlanner)
}

// DummyPrivilegedAccessor implements the tree.PrivilegedAccessor interface by returning errors.
type DummyPrivilegedAccessor struct{}

Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/instrumentation.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ func (ih *instrumentationHelper) Finish(

var bundle diagnosticsBundle
if ih.collectBundle {
ie := p.extendedEvalCtx.InternalExecutor.(*InternalExecutor)
ie := p.extendedEvalCtx.ExecCfg.InternalExecutor
phaseTimes := statsCollector.PhaseTimes()
if ih.stmtDiagnosticsRecorder.IsExecLatencyConditionMet(
ih.diagRequestID, ih.diagRequest, phaseTimes.GetServiceLatencyNoOverhead(),
Expand Down
6 changes: 1 addition & 5 deletions pkg/sql/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,11 +125,6 @@ func (ie *InternalExecutor) SetSessionData(sessionData *sessiondata.SessionData)
ie.sessionDataStack = sessiondata.NewStack(sessionData)
}

// SetSessionDataStack binds the session variable stack to the internal executor.
func (ie *InternalExecutor) SetSessionDataStack(sessionDataStack *sessiondata.Stack) {
ie.sessionDataStack = sessionDataStack
}

// initConnEx creates a connExecutor and runs it on a separate goroutine. It
// takes in a StmtBuf into which commands can be pushed and a WaitGroup that
// will be signaled when connEx.run() returns.
Expand Down Expand Up @@ -263,6 +258,7 @@ type rowsIterator struct {
}

var _ sqlutil.InternalRows = &rowsIterator{}
var _ tree.InternalRows = &rowsIterator{}

func (r *rowsIterator) Next(ctx context.Context) (_ bool, retErr error) {
// Due to recursive calls to Next() below, this deferred function might get
Expand Down
5 changes: 3 additions & 2 deletions pkg/sql/opt_exec_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -1157,10 +1157,11 @@ func (e *urlOutputter) finish() (url.URL, error) {
func (ef *execFactory) showEnv(plan string, envOpts exec.ExplainEnvData) (exec.Node, error) {
var out urlOutputter

c := makeStmtEnvCollector(
ie := ef.planner.extendedEvalCtx.ExecCfg.InternalExecutorFactory(
ef.planner.EvalContext().Context,
ef.planner.extendedEvalCtx.InternalExecutor.(*InternalExecutor),
ef.planner.SessionData(),
)
c := makeStmtEnvCollector(ef.planner.EvalContext().Context, ie.(*InternalExecutor))

// Show the version of Cockroach running.
if err := c.PrintVersion(&out.buf); err != nil {
Expand Down
Loading

0 comments on commit c61e3fc

Please sign in to comment.