Skip to content

Commit

Permalink
sql: refactor InternalExecutor on EvalCtx
Browse files Browse the repository at this point in the history
Minor refactor to remove duplicate InternalExecutor definition
on EvalCtx and in the tree package.

We already have 2 interface declarations for InternalExecutor
in sql and sqlutil, this refactor hopefully makes the dependencies
a little more clear.

For builtins, we now use an InternalExecutorFactory where
we must pass in the txn, session data and desc.Collections
to use the IE.

Release note: None
  • Loading branch information
RichardJCai committed Oct 21, 2021
1 parent 7d2431d commit f891b79
Show file tree
Hide file tree
Showing 22 changed files with 191 additions and 157 deletions.
45 changes: 45 additions & 0 deletions pkg/bench/rttanalysis/internal_executor_bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package rttanalysis

import "testing"

func BenchmarkInternalExecutor(b *testing.B) {
tests := []RoundTripBenchTestCase{
{
Name: "has_schema_privilege_1_col",
Setup: `CREATE SCHEMA s;`,
Stmt: `SELECT has_schema_privilege('s', 'CREATE')`,
},
{
Name: "has_schema_privilege_2_col",
Setup: `CREATE SCHEMA s;`,
Stmt: `SELECT has_schema_privilege('s', 'CREATE'), has_schema_privilege('s', 'CREATE')`,
},
{
Name: "has_schema_privilege_3_col",
Setup: `CREATE SCHEMA s;`,
Stmt: `SELECT has_schema_privilege('s', 'CREATE'), has_schema_privilege('s', 'CREATE'), has_schema_privilege('s', 'CREATE')`,
},
{
Name: "has_schema_privilege_3_schemas_3_col",
Setup: `CREATE SCHEMA s; CREATE SCHEMA s2; CREATE SCHEMA s3;`,
Stmt: `SELECT has_schema_privilege('s', 'CREATE'), has_schema_privilege('s', 'CREATE'), has_schema_privilege('s', 'CREATE')`,
},
{
Name: "has_schema_privilege_5_schemas_3_col",
Setup: `CREATE SCHEMA s; CREATE SCHEMA s2; CREATE SCHEMA s3; CREATE SCHEMA s4; CREATE SCHEMA s5;`,
Stmt: `SELECT has_schema_privilege('s', 'CREATE'), has_schema_privilege('s', 'CREATE'), has_schema_privilege('s', 'CREATE')`,
},
}

RunRoundTripBenchmark(b, tests)
}
10 changes: 10 additions & 0 deletions pkg/bench/rttanalysis/testdata/benchmark_expectations
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ exp,benchmark
20,Grant/grant_all_on_3_tables
17,GrantRole/grant_1_role
20,GrantRole/grant_2_roles
10,InternalExecutor/has_schema_privilege_1_col
14,InternalExecutor/has_schema_privilege_2_col
18,InternalExecutor/has_schema_privilege_3_col
48,InternalExecutor/has_schema_privilege_3_schemas_3_col
72,InternalExecutor/has_schema_privilege_5_schemas_3_col
2,ORMQueries/activerecord_type_introspection_query
2,ORMQueries/django_table_introspection_1_table
2,ORMQueries/django_table_introspection_4_tables
Expand All @@ -60,8 +65,13 @@ exp,benchmark
2,ORMQueries/has_column_privilege_using_column_name
2,ORMQueries/has_table_privilege_real_table
2,ORMQueries/has_table_privilege_virtual_table
3,ORMQueries/information_schema._pg_index_position
2,ORMQueries/pg_attribute
2,ORMQueries/pg_class
4,ORMQueries/pg_is_other_temp_schema
4,ORMQueries/pg_is_other_temp_schema_multiple_times
2,ORMQueries/pg_my_temp_schema
2,ORMQueries/pg_my_temp_schema_multiple_times
2,ORMQueries/pg_namespace
2,ORMQueries/pg_type
16,Revoke/revoke_all_on_1_table
Expand Down
1 change: 0 additions & 1 deletion pkg/bench/rttanalysis/validate_benchmark_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,6 @@ func rewriteBenchmarkExpectations(t *testing.T, benchmarks []string) {
t.Fatalf("duplicate expecatations for Name %s", expectations[i].name)
}
}

writeExpectationsFile(t, expectations)
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/ccl/streamingccl/streamingutils/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ go_library(
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/kv",
"//pkg/security",
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
"//pkg/streaming",
"//pkg/util/hlc",
"//pkg/util/protoutil",
Expand Down
10 changes: 2 additions & 8 deletions pkg/ccl/streamingccl/streamingutils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/streaming"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
Expand All @@ -33,9 +31,7 @@ func doCompleteIngestion(
const jobsQuery = `SELECT progress FROM system.jobs WHERE id=$1 FOR UPDATE`
row, err := evalCtx.Planner.QueryRowEx(evalCtx.Context,
"get-stream-ingestion-job-metadata",
txn, sessiondata.InternalExecutorOverride{
User: security.RootUserName(),
}, jobsQuery, jobID)
txn, evalCtx.SessionData(), jobsQuery, jobID)
if err != nil {
return err
}
Expand Down Expand Up @@ -89,8 +85,6 @@ func doCompleteIngestion(
updateJobQuery := `UPDATE system.jobs SET progress=$1 WHERE id=$2`
_, err = evalCtx.Planner.QueryRowEx(evalCtx.Context,
"set-stream-ingestion-job-metadata", txn,
sessiondata.InternalExecutorOverride{
User: security.RootUserName(),
}, updateJobQuery, progressBytes, jobID)
evalCtx.SessionData(), updateJobQuery, progressBytes, jobID)
return err
}
17 changes: 16 additions & 1 deletion pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -763,7 +763,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {

// Now that we have a pgwire.Server (which has a sql.Server), we can close a
// circular dependency between the rowexec.Server and sql.Server and set
// SessionBoundInternalExecutorFactory. The same applies for setting a
// InternalExecutorFactory. The same applies for setting a
// SessionBoundInternalExecutor on the job registry.
ieFactory := func(
ctx context.Context, sessionData *sessiondata.SessionData,
Expand All @@ -777,9 +777,24 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
ie.SetSessionData(sessionData)
return &ie
}

ieFactoryForBuiltins := func(
ctx context.Context, sessionData *sessiondata.SessionData, extraTxnState sql.ExtraTxnState,
) sql.InternalExecutor {
ie := sql.MakeInternalExecutor(
ctx,
pgServer.SQLServer,
internalMemMetrics,
cfg.Settings,
)
ie.SetExtraTxnState(extraTxnState)
ie.SetSessionData(sessionData)
return ie
}
distSQLServer.ServerConfig.SessionBoundInternalExecutorFactory = ieFactory
jobRegistry.SetSessionBoundInternalExecutorFactory(ieFactory)
execCfg.IndexBackfiller = sql.NewIndexBackfiller(execCfg, ieFactory)
execCfg.InternalExecutorFactory = ieFactoryForBuiltins

distSQLServer.ServerConfig.ProtectedTimestampProvider = execCfg.ProtectedTimestampProvider

Expand Down
47 changes: 32 additions & 15 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,7 @@ func (s *Server) SetupConn(

ex := s.newConnExecutor(
ctx, sdMutIterator, stmtBuf, clientComm, memMetrics, &s.Metrics,
s.sqlStats.GetApplicationStats(sd.ApplicationName),
s.sqlStats.GetApplicationStats(sd.ApplicationName), ExtraTxnState{},
)
return ConnectionHandler{ex}, nil
}
Expand Down Expand Up @@ -724,6 +724,7 @@ func (s *Server) newConnExecutor(
memMetrics MemoryMetrics,
srvMetrics *Metrics,
applicationStats sqlstats.ApplicationStats,
extraTxnState ExtraTxnState,
) *connExecutor {
// Create the various monitors.
// The session monitors are started in activate().
Expand Down Expand Up @@ -809,6 +810,12 @@ func (s *Server) newConnExecutor(
}

ex.phaseTimes.SetSessionPhaseTime(sessionphase.SessionInit, timeutil.Now())
ex.mu.ActiveQueries = make(map[ClusterWideID]*queryMeta)
ex.machine = fsm.MakeMachine(TxnStateTransitions, stateNoTxn{}, &ex.state)

ex.sessionTracing.ex = ex
ex.transitionCtx.sessionTracing = &ex.sessionTracing

ex.extraTxnState.prepStmtsNamespace = prepStmtNamespace{
prepStmts: make(map[string]*PreparedStatement),
portals: make(map[string]PreparedPortal),
Expand All @@ -818,19 +825,20 @@ func (s *Server) newConnExecutor(
portals: make(map[string]PreparedPortal),
}
ex.extraTxnState.prepStmtsNamespaceMemAcc = ex.sessionMon.MakeBoundAccount()
ex.extraTxnState.descCollection = s.cfg.CollectionFactory.MakeCollection(
descs.NewTemporarySchemaProvider(sdMutIterator.sds),
)
ex.extraTxnState.txnRewindPos = -1
ex.extraTxnState.schemaChangeJobRecords = make(map[descpb.ID]*jobs.Record)
ex.mu.ActiveQueries = make(map[ClusterWideID]*queryMeta)
ex.machine = fsm.MakeMachine(TxnStateTransitions, stateNoTxn{}, &ex.state)

ex.sessionTracing.ex = ex
ex.transitionCtx.sessionTracing = &ex.sessionTracing

ex.extraTxnState.hasAdminRoleCache = HasAdminRoleCache{}

if extraTxnState.descs != nil {
ex.extraTxnState.descCollection = extraTxnState.descs
ex.extraTxnState.keepDescCollectionOnClose = true
} else {
descCol := s.cfg.CollectionFactory.MakeCollection(
descs.NewTemporarySchemaProvider(sdMutIterator.sds),
)
ex.extraTxnState.descCollection = &descCol
}

ex.initPlanner(ctx, &ex.planner)

return ex
Expand All @@ -857,6 +865,7 @@ func (s *Server) newConnExecutorWithTxn(
txn *kv.Txn,
syntheticDescs []catalog.Descriptor,
applicationStats sqlstats.ApplicationStats,
extraTxnState ExtraTxnState,
) *connExecutor {
ex := s.newConnExecutor(
ctx,
Expand All @@ -866,6 +875,7 @@ func (s *Server) newConnExecutorWithTxn(
memMetrics,
srvMetrics,
applicationStats,
extraTxnState,
)
if txn.Type() == kv.LeafTxn {
// If the txn is a leaf txn it is not allowed to perform mutations. For
Expand Down Expand Up @@ -1114,7 +1124,12 @@ type connExecutor struct {
// transaction finishes or gets retried.
extraTxnState struct {
// descCollection collects descriptors used by the current transaction.
descCollection descs.Collection
descCollection *descs.Collection

// keepDescsCollectionOnClose determines if descCollection should be
// flushed on connExec's close. If the descCollection is passed in
// from the parent, we do not want to flush it.
keepDescCollectionOnClose bool

// jobs accumulates jobs staged for execution inside the transaction.
// Staging happens when executing statements that are implemented with a
Expand Down Expand Up @@ -1482,7 +1497,9 @@ func (ex *connExecutor) resetExtraTxnState(ctx context.Context, ev txnEvent) err
delete(ex.extraTxnState.schemaChangeJobRecords, k)
}

ex.extraTxnState.descCollection.ReleaseAll(ctx)
if !ex.extraTxnState.keepDescCollectionOnClose {
ex.extraTxnState.descCollection.ReleaseAll(ctx)
}

// Close all portals.
for name, p := range ex.extraTxnState.prepStmtsNamespace.portals {
Expand Down Expand Up @@ -2445,7 +2462,7 @@ func (ex *connExecutor) initEvalCtx(ctx context.Context, evalCtx *extendedEvalCo
RegionsServer: ex.server.cfg.RegionsServer,
SQLStatusServer: ex.server.cfg.SQLStatusServer,
MemMetrics: &ex.memMetrics,
Descs: &ex.extraTxnState.descCollection,
Descs: ex.extraTxnState.descCollection,
ExecCfg: ex.server.cfg,
DistSQLPlanner: ex.server.cfg.DistSQLPlanner,
TxnModesSetter: ex,
Expand Down Expand Up @@ -2897,7 +2914,7 @@ func (ex *connExecutor) runPreCommitStages(ctx context.Context) error {
return nil
}
executor := scexec.NewExecutor(
ex.planner.txn, &ex.extraTxnState.descCollection, ex.server.cfg.Codec,
ex.planner.txn, ex.extraTxnState.descCollection, ex.server.cfg.Codec,
nil /* backfiller */, nil /* jobTracker */, ex.server.cfg.NewSchemaChangerTestingKnobs,
ex.server.cfg.JobRegistry, ex.planner.execCfg.InternalExecutor,
)
Expand Down Expand Up @@ -2948,7 +2965,7 @@ func (ex *connExecutor) runPreCommitStages(ctx context.Context) error {
}
// Write the job ID to the affected descriptors.
if err := scexec.UpdateDescriptorJobIDs(
ctx, ex.planner.Txn(), &ex.extraTxnState.descCollection, descIDs, jobspb.InvalidJobID, job.ID(),
ctx, ex.planner.Txn(), ex.extraTxnState.descCollection, descIDs, jobspb.InvalidJobID, job.ID(),
); err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -794,7 +794,7 @@ func (ex *connExecutor) checkDescriptorTwoVersionInvariant(ctx context.Context)
ctx,
ex.server.cfg.Clock,
ex.server.cfg.InternalExecutor,
&ex.extraTxnState.descCollection,
ex.extraTxnState.descCollection,
ex.state.mu.txn,
inRetryBackoff,
)
Expand Down
16 changes: 16 additions & 0 deletions pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -1165,6 +1165,22 @@ type ExecutorConfig struct {
// SpanConfigReconciliationJobDeps are used to drive the span config
// reconciliation job.
SpanConfigReconciliationJobDeps spanconfig.ReconciliationDependencies

// InternalExecutorFactory is used to create an InternalExecutor binded with
// SessionData and other ExtraTxnState.
// This is currently only for builtin functions where we need to execute sql.
InternalExecutorFactory InternalExecutorFactory
}

// InternalExecutorFactory is a function that produces a "session
// bound" internal executor.
type InternalExecutorFactory func(
context.Context, *sessiondata.SessionData, ExtraTxnState,
) InternalExecutor

// ExtraTxnState holds state to initialize an InternalExecutor with.
type ExtraTxnState struct {
descs *descs.Collection
}

// UpdateVersionSystemSettingHook provides a callback that allows us
Expand Down
12 changes: 5 additions & 7 deletions pkg/sql/explain_bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ The UI can then be accessed at http://localhost:16686/search`, stmt)
}

func (b *stmtBundleBuilder) addEnv(ctx context.Context) {
c := makeStmtEnvCollector(ctx, b.ie, nil)
c := makeStmtEnvCollector(ctx, b.ie)

var buf bytes.Buffer
if err := c.PrintVersion(&buf); err != nil {
Expand Down Expand Up @@ -416,10 +416,8 @@ type stmtEnvCollector struct {
sessionDataOverride *sessiondata.SessionData
}

func makeStmtEnvCollector(
ctx context.Context, ie *InternalExecutor, sessionDataOverride *sessiondata.SessionData,
) stmtEnvCollector {
return stmtEnvCollector{ctx: ctx, ie: ie, sessionDataOverride: sessionDataOverride}
func makeStmtEnvCollector(ctx context.Context, ie *InternalExecutor) stmtEnvCollector {
return stmtEnvCollector{ctx: ctx, ie: ie}
}

// environmentQuery is a helper to run a query that returns a single string
Expand All @@ -429,7 +427,7 @@ func (c *stmtEnvCollector) query(query string) (string, error) {
c.ctx,
"stmtEnvCollector",
nil, /* txn */
sessiondata.InternalExecutorOverride{SessionData: c.sessionDataOverride},
sessiondata.InternalExecutorOverride{},
query,
)
if err != nil {
Expand Down Expand Up @@ -550,7 +548,7 @@ func (c *stmtEnvCollector) PrintClusterSettings(w io.Writer) error {
c.ctx,
"stmtEnvCollector",
nil, /* txn */
sessiondata.InternalExecutorOverride{SessionData: c.sessionDataOverride},
sessiondata.InternalExecutorOverride{},
"SELECT variable, value, description FROM [ SHOW ALL CLUSTER SETTINGS ]",
)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/faketreeeval/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/sql/faketreeeval",
visibility = ["//visibility:public"],
deps = [
"//pkg/kv",
"//pkg/security",
"//pkg/sql/parser",
"//pkg/sql/pgwire/pgcode",
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/faketreeeval/evalctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ func (ep *DummyEvalPlanner) QueryRowEx(
ctx context.Context,
opName string,
txn *kv.Txn,
session sessiondata.InternalExecutorOverride,
session *sessiondata.SessionData,
stmt string,
qargs ...interface{},
) (tree.Datums, error) {
Expand All @@ -346,7 +346,7 @@ func (ep *DummyEvalPlanner) QueryIteratorEx(
ctx context.Context,
opName string,
txn *kv.Txn,
session sessiondata.InternalExecutorOverride,
session *sessiondata.SessionData,
stmt string,
qargs ...interface{},
) (tree.InternalRows, error) {
Expand Down
Loading

0 comments on commit f891b79

Please sign in to comment.