Merge #82477
82477: sql: introduce new internal executor interfaces r=rafiss,ajwerner a=ZhouXing19

This PR aims to provide a set of safer interfaces for the internal executor, making it harder to misuse.

Currently, each conn executor underneath the internal executor (we call it a “child executor”) has its own set of state, such as its descriptor collection, job collection, and schema change jobs, even when it runs with a non-nil outer `kv.Txn` or when multiple SQL executions run under the same `kv.Txn`.
This is not intuitive, since it violates a rather deep principle that a `descs.Collection` and a SQL txn have a 1:1 relationship. The code doesn’t enforce that, but it ought to. The more places that make it possible to decouple the two, the more anxious we get.

Ideally, an internal executor with a non-nil txn is created from either a planner or a `collectionFactory`, so that the txn is always tightly coupled with the descriptor collection. We thus propose a set of new interfaces to ensure this coupling.

Currently, uses of an internal executor query function (e.g. `InternalExecutor.ExecEx()`) fall into the following 3 categories:
1. The query runs under a planner context with a non-nil `kv.Txn` from that planner.
2. The query runs without a `kv.Txn` (e.g. `InternalExecutor.ExecEx(..., nil /* txn */, stmt...)`).
3. The query runs with a non-nil `kv.Txn` but not under a planner context.

For case 1, the descriptor collection, txn state, job collection, and session data from the parent planner are expected to be passed to the internal executor's child conn executor.
For cases 2 and 3, if multiple SQL statements run under the same txn, their conn executors should share the `descs.Collection`, txn state machine, job collection, and session data.
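
The sharing requirement described above can be illustrated with a small toy model. This is only a sketch under stated assumptions: `Txn`, `txnState`, `Executor`, and `Registry` are hypothetical stand-ins invented for this example, not the real `kv.Txn`, `descs.Collection`, or conn executor types, and the "descriptor collection" is reduced to a map.

```go
package main

import "fmt"

// Txn is a toy stand-in for kv.Txn (hypothetical, not the real type).
type Txn struct{ ID int }

// txnState bundles the per-transaction state that the text says should be
// shared: uncommitted descriptors and queued jobs (both heavily simplified).
type txnState struct {
	descs map[string]bool
	jobs  []string
}

func newTxnState() *txnState { return &txnState{descs: map[string]bool{}} }

// Executor is a toy child executor that reads and writes one txnState.
type Executor struct{ state *txnState }

// CreateTable records an uncommitted descriptor and queues a job for it.
func (e *Executor) CreateTable(name string) {
	e.state.descs[name] = true
	e.state.jobs = append(e.state.jobs, "schema change: "+name)
}

// SeesTable reports whether this executor can see the uncommitted descriptor.
func (e *Executor) SeesTable(name string) bool { return e.state.descs[name] }

// Registry hands out executors that share one txnState per Txn.
type Registry struct{ byTxn map[*Txn]*txnState }

func NewRegistry() *Registry { return &Registry{byTxn: map[*Txn]*txnState{}} }

// ForTxn returns an executor bound to the state owned by txn, creating the
// state on first use so that later executors for the same txn reuse it.
func (r *Registry) ForTxn(txn *Txn) *Executor {
	st, ok := r.byTxn[txn]
	if !ok {
		st = newTxnState()
		r.byTxn[txn] = st
	}
	return &Executor{state: st}
}

func main() {
	// Isolated executors: the second one cannot see the first one's write.
	a, b := &Executor{state: newTxnState()}, &Executor{state: newTxnState()}
	a.CreateTable("t")
	fmt.Println("isolated executor sees t:", b.SeesTable("t"))

	// Shared state: both executors run under the same txn and agree.
	reg, txn := NewRegistry(), &Txn{ID: 1}
	reg.ForTxn(txn).CreateTable("t")
	fmt.Println("shared executor sees t:", reg.ForTxn(txn).SeesTable("t"))
}
```

In this sketch the isolated pair models the behavior the PR criticizes, while `Registry.ForTxn` models the intended 1:1 coupling between a txn and its state.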

To suit these 3 use cases, we propose 3 interfaces for each query function
(the following uses `InternalExecutor.ExecEx` as the example):
- For case 1, refactor to use `func (p *planner) ExecExUpdated()`, where the internal executor is always initialized with the `descs.Collection`, `TxnState`, etc. from the `sql.planner`.
- For case 2, refactor to use `ieFactory.WithoutTxn()`, where the query is always run with a nil `kv.Txn`.
- For case 3, refactor to use `CollectionFactory.TxnWithExecutor()`. This function creates the internal executor and passes it to the callback function that runs the query.
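
As a rough illustration of the three entry points, here is a self-contained toy model. It is a sketch, not the CockroachDB implementation: every type here (`Txn`, `Collection`, `Executor`, `Planner`, `Factory`) is a simplified, hypothetical stand-in, and the method names `WithInternalExecutor`, `RunWithoutTxn`, and `TxnWithExecutor` are borrowed from this PR only as labels; their real signatures differ.

```go
package main

import (
	"errors"
	"fmt"
)

// Txn and Collection are toy stand-ins for kv.Txn and descs.Collection.
type Txn struct{ ID int }
type Collection struct{ Owner *Txn }

// Executor is a toy internal executor bound to at most one txn/collection.
type Executor struct {
	txn   *Txn
	descs *Collection
}

// Exec pretends to run stmt and rejects a txn the executor was not built for.
func (e *Executor) Exec(txn *Txn, stmt string) error {
	if txn != e.txn {
		return errors.New("txn does not match the txn this executor was created for")
	}
	if e.descs != nil && e.descs.Owner != txn {
		return errors.New("descriptor collection belongs to a different txn")
	}
	return nil
}

// Planner models case 1: the executor inherits the planner's txn and collection.
type Planner struct {
	txn   *Txn
	descs *Collection
}

func (p *Planner) WithInternalExecutor(fn func(txn *Txn, ie *Executor) error) error {
	return fn(p.txn, &Executor{txn: p.txn, descs: p.descs})
}

// Factory models cases 2 and 3.
type Factory struct{ nextID int }

// RunWithoutTxn (case 2) hands out an executor that only accepts a nil txn.
func (f *Factory) RunWithoutTxn(fn func(ie *Executor) error) error {
	return fn(&Executor{})
}

// TxnWithExecutor (case 3) creates the txn, its collection, and the executor
// together, so the callback cannot pair them up incorrectly.
func (f *Factory) TxnWithExecutor(fn func(txn *Txn, ie *Executor) error) error {
	f.nextID++
	txn := &Txn{ID: f.nextID}
	return fn(txn, &Executor{txn: txn, descs: &Collection{Owner: txn}})
}

func main() {
	p := &Planner{txn: &Txn{ID: 100}}
	p.descs = &Collection{Owner: p.txn}
	err := p.WithInternalExecutor(func(txn *Txn, ie *Executor) error {
		return ie.Exec(txn, "SELECT 1")
	})
	fmt.Println("case 1 error:", err)

	f := &Factory{}
	err = f.RunWithoutTxn(func(ie *Executor) error { return ie.Exec(nil, "SELECT 1") })
	fmt.Println("case 2 error:", err)

	err = f.TxnWithExecutor(func(txn *Txn, ie *Executor) error {
		return ie.Exec(txn, "SELECT 1")
	})
	fmt.Println("case 3 error:", err)
}
```

The design point this toy tries to capture is that callers never construct an `Executor` themselves; each entry point decides which txn and collection the executor is bound to.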

We also refactored some of the existing call sites to serve as examples of the new interfaces.

(Note that the ultimate goal of this improvement is to deprecate all the "free-hanging" `InternalExecutor` objects (such as `sql.ExecutorConfig.InternalExecutor`) and replace them with an `InternalExecutorFactory` field. `InternalExecutorFactory` initializes a real internal executor, but it cannot be used directly to run SQL statements.
Instead, we wrap the initialization of an internal executor inside each query function, i.e. we initialize it only when a query actually needs to run. In other words, the internal executor is created closer to where the query runs.)
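
A minimal sketch of that idea, assuming a toy `ExecutorFactory` whose query function creates a short-lived executor on every call. All names below (`SessionData`, `executor`, `ExecutorFactory`, `newExecutor`, `Exec`) are hypothetical and invented for this example; they are not the `sqlutil.InternalExecutorFactory` API.

```go
package main

import "fmt"

// SessionData is a toy stand-in for sessiondata.SessionData.
type SessionData struct{ User string }

// executor is unexported so callers cannot hold on to a long-lived instance.
type executor struct {
	sd *SessionData
	id int
}

func (e *executor) exec(stmt string) string {
	return fmt.Sprintf("executor #%d ran %q as %s", e.id, stmt, e.sd.User)
}

// ExecutorFactory owns the shared dependencies and counts the executors made.
type ExecutorFactory struct{ created int }

// newExecutor builds a fresh executor bound to the given session data.
func (f *ExecutorFactory) newExecutor(sd *SessionData) *executor {
	f.created++
	return &executor{sd: sd, id: f.created}
}

// Exec is the query function: it creates the executor only when a statement
// actually needs to run, then discards it.
func (f *ExecutorFactory) Exec(sd *SessionData, stmt string) string {
	return f.newExecutor(sd).exec(stmt)
}

func main() {
	f := &ExecutorFactory{}
	root := &SessionData{User: "root"}
	fmt.Println(f.Exec(root, "SELECT 1"))
	fmt.Println(f.Exec(root, "SELECT 2"))
}
```

Each call to `Exec` in this sketch gets its own executor, so no executor outlives the query it was created for.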

fixes #69495
fixes #78998

Release Note: None



Co-authored-by: Jane Xing <[email protected]>
craig[bot] and ZhouXing19 committed Aug 12, 2022
2 parents 773f7d4 + afb5db4 commit 468ac01
Showing 41 changed files with 906 additions and 342 deletions.
4 changes: 2 additions & 2 deletions pkg/bench/rttanalysis/testdata/benchmark_expectations
@@ -67,11 +67,11 @@ exp,benchmark
2,ORMQueries/has_table_privilege_1
4,ORMQueries/has_table_privilege_3
6,ORMQueries/has_table_privilege_5
15,ORMQueries/information_schema._pg_index_position
3,ORMQueries/information_schema._pg_index_position
2,ORMQueries/pg_attribute
2,ORMQueries/pg_class
11,ORMQueries/pg_is_other_temp_schema
23,ORMQueries/pg_is_other_temp_schema_multiple_times
18,ORMQueries/pg_is_other_temp_schema_multiple_times
4,ORMQueries/pg_my_temp_schema
4,ORMQueries/pg_my_temp_schema_multiple_times
4,ORMQueries/pg_namespace
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/restore_job.go
@@ -1581,7 +1581,7 @@ func revalidateIndexes(
// since our table is offline.
var runner sqlutil.HistoricalInternalExecTxnRunner = func(ctx context.Context, fn sqlutil.InternalExecFn) error {
return execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
ie := job.MakeSessionBoundInternalExecutor(ctx, sql.NewFakeSessionData(execCfg.SV())).(*sql.InternalExecutor)
ie := job.MakeSessionBoundInternalExecutor(sql.NewFakeSessionData(execCfg.SV())).(*sql.InternalExecutor)
return fn(ctx, txn, ie)
})
}
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/cdcevent/rowfetcher_cache.go
@@ -94,7 +94,7 @@ func newRowFetcherCache(
return &rowFetcherCache{
codec: codec,
leaseMgr: leaseMgr,
collection: cf.NewCollection(ctx, nil /* TemporarySchemaProvider */),
collection: cf.NewCollection(ctx, nil /* TemporarySchemaProvider */, nil /* monitor */),
db: db,
fetchers: cache.NewUnorderedCache(defaultCacheConfig),
watchedFamilies: watchedFamilies,
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeed_stmt.go
@@ -1142,7 +1142,7 @@ func getQualifiedTableName(
func getQualifiedTableNameObj(
ctx context.Context, execCfg *sql.ExecutorConfig, txn *kv.Txn, desc catalog.TableDescriptor,
) (tree.TableName, error) {
col := execCfg.CollectionFactory.MakeCollection(ctx, nil /* TemporarySchemaProvider */, nil /* monitor */)
col := execCfg.CollectionFactory.NewCollection(ctx, nil /* TemporarySchemaProvider */, nil /* monitor */)
dbDesc, err := col.Direct().MustGetDatabaseDescByID(ctx, txn, desc.GetParentID())
if err != nil {
return tree.TableName{}, err
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/schemafeed/schema_feed.go
@@ -92,7 +92,7 @@ func New(
settings: cfg.Settings,
targets: targets,
leaseMgr: cfg.LeaseManager.(*lease.Manager),
ie: cfg.SessionBoundInternalExecutorFactory(ctx, &sessiondata.SessionData{}),
ie: cfg.InternalExecutorFactory.NewInternalExecutor(&sessiondata.SessionData{}),
collectionFactory: cfg.CollectionFactory,
metrics: metrics,
tolerances: tolerances,
9 changes: 7 additions & 2 deletions pkg/jobs/jobs.go
@@ -718,9 +718,14 @@ func (j *Job) FractionCompleted() float32 {
// sessionBoundInternalExecutorFactory for a more detailed explanation of why
// this exists.
func (j *Job) MakeSessionBoundInternalExecutor(
ctx context.Context, sd *sessiondata.SessionData,
sd *sessiondata.SessionData,
) sqlutil.InternalExecutor {
return j.registry.sessionBoundInternalExecutorFactory(ctx, sd)
return j.registry.internalExecutorFactory.NewInternalExecutor(sd)
}

// GetInternalExecutorFactory returns the internal executor factory.
func (j *Job) GetInternalExecutorFactory() sqlutil.InternalExecutorFactory {
return j.registry.internalExecutorFactory
}

// MarkIdle marks the job as Idle. Idleness should not be toggled frequently
12 changes: 5 additions & 7 deletions pkg/jobs/registry.go
@@ -127,7 +127,7 @@ type Registry struct {
// field. Modifying the TableCollection is basically a per-query operation
// and should be a per-query setting. #34304 is the issue for creating/
// improving this API.
sessionBoundInternalExecutorFactory sqlutil.SessionBoundInternalExecutorFactory
internalExecutorFactory sqlutil.InternalExecutorFactory

// if non-empty, indicates path to file that prevents any job adoptions.
preventAdoptionFile string
@@ -226,14 +226,12 @@ func MakeRegistry(
return r
}

// SetSessionBoundInternalExecutorFactory sets the
// SessionBoundInternalExecutorFactory that will be used by the job registry
// SetInternalExecutorFactory sets the
// InternalExecutorFactory that will be used by the job registry
// executor. We expose this separately from the constructor to avoid a circular
// dependency.
func (r *Registry) SetSessionBoundInternalExecutorFactory(
factory sqlutil.SessionBoundInternalExecutorFactory,
) {
r.sessionBoundInternalExecutorFactory = factory
func (r *Registry) SetInternalExecutorFactory(factory sqlutil.InternalExecutorFactory) {
r.internalExecutorFactory = factory
}

// NewSpanConstrainer returns an instance of sql.SpanConstrainer as an interface{},
11 changes: 11 additions & 0 deletions pkg/kv/kvserver/protectedts/ptstorage/storage_test.go
@@ -879,6 +879,17 @@ type wrappedInternalExecutor struct {
}
}

func (ie *wrappedInternalExecutor) QueryBufferedExWithCols(
ctx context.Context,
opName string,
txn *kv.Txn,
session sessiondata.InternalExecutorOverride,
stmt string,
qargs ...interface{},
) ([]tree.Datums, colinfo.ResultColumns, error) {
panic("unimplemented")
}

var _ sqlutil.InternalExecutor = &wrappedInternalExecutor{}

func (ie *wrappedInternalExecutor) Exec(
33 changes: 20 additions & 13 deletions pkg/server/admin.go
@@ -1779,21 +1779,28 @@ func (s *adminServer) SetUIData(
for key, val := range req.KeyValues {
// Do an upsert of the key. We update each key in a separate transaction to
// avoid long-running transactions and possible deadlocks.
query := `UPSERT INTO system.ui (key, value, "lastUpdated") VALUES ($1, $2, now())`
rowsAffected, err := s.server.sqlServer.internalExecutor.ExecEx(
ctx, "admin-set-ui-data", nil, /* txn */
sessiondata.InternalExecutorOverride{
User: username.RootUserName(),
},
query, makeUIKey(userName, key), val)
if err != nil {
return nil, serverError(ctx, err)
}
if rowsAffected != 1 {
return nil, serverErrorf(ctx, "rows affected %d != expected %d", rowsAffected, 1)

if err := s.server.sqlServer.internalExecutorFactory.RunWithoutTxn(ctx, func(
ctx context.Context, ie sqlutil.InternalExecutor,
) error {
query := `UPSERT INTO system.ui (key, value, "lastUpdated") VALUES ($1, $2, now())`
rowsAffected, err := ie.ExecEx(
ctx, "admin-set-ui-data", nil, /* txn */
sessiondata.InternalExecutorOverride{
User: username.RootUserName(),
},
query, makeUIKey(userName, key), val)
if err != nil {
return serverError(ctx, err)
}
if rowsAffected != 1 {
return serverErrorf(ctx, "rows affected %d != expected %d", rowsAffected, 1)
}
return nil
}); err != nil {
return nil, err
}
}

return &serverpb.SetUIDataResponse{}, nil
}

1 change: 1 addition & 0 deletions pkg/server/server.go
@@ -803,6 +803,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
closedSessionCache: closedSessionCache,
flowScheduler: flowScheduler,
circularInternalExecutor: internalExecutor,
internalExecutorFactory: nil, // will be initialized in server.newSQLServer.
circularJobRegistry: jobRegistry,
jobAdoptionStopFile: jobAdoptionStopFile,
protectedtsProvider: protectedtsProvider,
2 changes: 1 addition & 1 deletion pkg/server/server_internal_executor_factory_test.go
@@ -34,7 +34,7 @@ func TestInternalExecutorClearsMonitorMemory(t *testing.T) {
mon := s.(*TestServer).sqlServer.internalExecutorFactoryMemMonitor
ief := s.ExecutorConfig().(sql.ExecutorConfig).InternalExecutorFactory
sessionData := sql.NewFakeSessionData(&s.ClusterSettings().SV)
ie := ief(ctx, sessionData)
ie := ief.NewInternalExecutor(sessionData)
rows, err := ie.QueryIteratorEx(ctx, "test", nil, sessiondata.NodeUserSessionDataOverride, `SELECT 1`)
require.NoError(t, err)
require.Greater(t, mon.AllocBytes(), int64(0))
52 changes: 29 additions & 23 deletions pkg/server/server_sql.go
@@ -130,18 +130,19 @@ import (
// standalone SQLServer instances per tenant (the KV layer is shared across all
// tenants).
type SQLServer struct {
ambientCtx log.AmbientContext
stopper *stop.Stopper
sqlIDContainer *base.SQLIDContainer
pgServer *pgwire.Server
distSQLServer *distsql.ServerImpl
execCfg *sql.ExecutorConfig
cfg *BaseConfig
internalExecutor *sql.InternalExecutor
leaseMgr *lease.Manager
blobService *blobs.Service
tracingService *service.Service
tenantConnect kvtenant.Connector
ambientCtx log.AmbientContext
stopper *stop.Stopper
sqlIDContainer *base.SQLIDContainer
pgServer *pgwire.Server
distSQLServer *distsql.ServerImpl
execCfg *sql.ExecutorConfig
cfg *BaseConfig
internalExecutor *sql.InternalExecutor
internalExecutorFactory sqlutil.InternalExecutorFactory
leaseMgr *lease.Manager
blobService *blobs.Service
tracingService *service.Service
tenantConnect kvtenant.Connector
// sessionRegistry can be queried for info on running SQL sessions. It is
// shared between the sql.Server and the statusServer.
sessionRegistry *sql.SessionRegistry
@@ -303,6 +304,9 @@ type sqlServerArgs struct {
// TODO(tbg): make this less hacky.
circularInternalExecutor *sql.InternalExecutor // empty initially

// internalExecutorFactory is to initialize an internal executor.
internalExecutorFactory sqlutil.InternalExecutorFactory

// Stores and deletes expired liveness sessions.
sqlLivenessProvider sqlliveness.Provider

@@ -947,18 +951,18 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
ieFactoryMonitor.StartNoReserved(ctx, pgServer.SQLServer.GetBytesMonitor())
// Now that we have a pgwire.Server (which has a sql.Server), we can close a
// circular dependency between the rowexec.Server and sql.Server and set
// SessionBoundInternalExecutorFactory. The same applies for setting a
// InternalExecutorFactory. The same applies for setting a
// SessionBoundInternalExecutor on the job registry.
ieFactory := func(
ctx context.Context, sessionData *sessiondata.SessionData,
) sqlutil.InternalExecutor {
ie := sql.MakeInternalExecutor(pgServer.SQLServer, internalMemMetrics, ieFactoryMonitor)
ie.SetSessionData(sessionData)
return &ie
}
ieFactory := sql.NewInternalExecutorFactory(
pgServer.SQLServer,
internalMemMetrics,
ieFactoryMonitor,
)

collectionFactory.SetInternalExecutorWithTxn(ieFactory)

distSQLServer.ServerConfig.SessionBoundInternalExecutorFactory = ieFactory
jobRegistry.SetSessionBoundInternalExecutorFactory(ieFactory)
distSQLServer.ServerConfig.InternalExecutorFactory = ieFactory
jobRegistry.SetInternalExecutorFactory(ieFactory)
execCfg.IndexBackfiller = sql.NewIndexBackfiller(execCfg)
execCfg.IndexMerger = sql.NewIndexBackfillerMergePlanner(execCfg)
execCfg.IndexValidator = scdeps.NewIndexValidator(
@@ -978,6 +982,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
cfg.registry.AddMetricStruct(m)
}
*cfg.circularInternalExecutor = sql.MakeInternalExecutor(pgServer.SQLServer, internalMemMetrics, ieFactoryMonitor)
cfg.internalExecutorFactory = ieFactory
execCfg.InternalExecutor = cfg.circularInternalExecutor
stmtDiagnosticsRegistry := stmtdiagnostics.NewRegistry(
cfg.circularInternalExecutor,
@@ -1070,7 +1075,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
cfg.db,
codec,
cfg.registry,
distSQLServer.ServerConfig.SessionBoundInternalExecutorFactory,
distSQLServer.ServerConfig.InternalExecutorFactory,
cfg.sqlStatusServer,
cfg.isMeta1Leaseholder,
sqlExecutorTestingKnobs,
@@ -1118,6 +1123,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
distSQLServer: distSQLServer,
execCfg: execCfg,
internalExecutor: cfg.circularInternalExecutor,
internalExecutorFactory: cfg.internalExecutorFactory,
leaseMgr: leaseMgr,
blobService: blobService,
tracingService: tracingService,
37 changes: 18 additions & 19 deletions pkg/sql/alter_table.go
@@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
@@ -43,6 +44,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlerrors"
"github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/sql/stats"
"github.com/cockroachdb/cockroach/pkg/sql/storageparam"
"github.com/cockroachdb/cockroach/pkg/sql/storageparam/tablestorageparam"
@@ -558,10 +560,9 @@ func (n *alterTableNode) startExec(params runParams) error {
return pgerror.Newf(pgcode.ObjectNotInPrerequisiteState,
"constraint %q in the middle of being added, try again later", t.Constraint)
}
if err := validateCheckInTxn(
params.ctx, &params.p.semaCtx, params.ExecCfg().InternalExecutorFactory,
params.SessionData(), n.tableDesc, params.p.Txn(), ck.Expr,
); err != nil {
if err := params.p.WithInternalExecutor(params.ctx, func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) error {
return validateCheckInTxn(ctx, &params.p.semaCtx, params.p.SessionData(), n.tableDesc, txn, ie, ck.Expr)
}); err != nil {
return err
}
ck.Validity = descpb.ConstraintValidity_Validated
@@ -581,19 +582,12 @@ func (n *alterTableNode) startExec(params runParams) error {
return pgerror.Newf(pgcode.ObjectNotInPrerequisiteState,
"constraint %q in the middle of being added, try again later", t.Constraint)
}
if err := validateFkInTxn(
params.ctx,
params.ExecCfg().InternalExecutorFactory,
params.p.SessionData(),
n.tableDesc,
params.p.Txn(),
params.p.Descriptors(),
name,
); err != nil {
if err := params.p.WithInternalExecutor(params.ctx, func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) error {
return validateFkInTxn(ctx, n.tableDesc, txn, ie, params.p.descCollection, name)
}); err != nil {
return err
}
foundFk.Validity = descpb.ConstraintValidity_Validated

case descpb.ConstraintTypeUnique:
if constraint.Index == nil {
var foundUnique *descpb.UniqueWithoutIndexConstraint
@@ -610,11 +604,16 @@ func (n *alterTableNode) startExec(params runParams) error {
return pgerror.Newf(pgcode.ObjectNotInPrerequisiteState,
"constraint %q in the middle of being added, try again later", t.Constraint)
}
if err := validateUniqueWithoutIndexConstraintInTxn(
params.ctx, params.ExecCfg().InternalExecutorFactory(
params.ctx, params.SessionData(),
), n.tableDesc, params.p.Txn(), params.p.User(), name,
); err != nil {
if err := params.p.WithInternalExecutor(params.ctx, func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) error {
return validateUniqueWithoutIndexConstraintInTxn(
params.ctx,
n.tableDesc,
txn,
ie,
params.p.User(),
name,
)
}); err != nil {
return err
}
foundUnique.Validity = descpb.ConstraintValidity_Validated