From dc0fed4882c01a0f645e10be315a0e57495bb49c Mon Sep 17 00:00:00 2001 From: Jane Xing Date: Mon, 27 Jun 2022 17:30:42 -0400 Subject: [PATCH] sql: introduce ieFactory.RunWithoutTxn() and collectionFactory.TxnWithExecutor() This commit introduces two functions that allow users to run SQL statements with an internal executor. We intend to limit the usage of a real internal executor to these functions, instead of leaving it free-floating or hanging off certain structs. In other words, we restrict the initialization of an internal executor. The motivation is that if an internal executor is used to run multiple SQL statements within a single txn, these executions are expected to share the same set of info (such as descriptor collections) across their conn executors. However, this rule can easily be forgotten by users of internal executors. Hence we provide an interface that wraps the initialization of internal executors together with the query executions, so that users do not need to worry about it. Informs: once all existing usages of the internal executors are replaced with the new interfaces proposed here, #70888 should be resolved. 
Release note: None --- pkg/server/server_sql.go | 36 +++++++++++ pkg/sql/catalog/descs/factory.go | 23 +++++++ pkg/sql/catalog/descs/txn.go | 94 ++++++++++++++++++++++++++++ pkg/sql/internal.go | 91 +++++++++++++++++++++++++++ pkg/sql/sqlutil/internal_executor.go | 7 +++ 5 files changed, 251 insertions(+) diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index c3b2c1f4188a..f802ba96824d 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -50,6 +50,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/server/status" "github.com/cockroachdb/cockroach/pkg/server/systemconfigwatcher" "github.com/cockroachdb/cockroach/pkg/server/tracedumper" + "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/spanconfig" "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfiglimiter" @@ -62,6 +63,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/cacheutil" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descidgen" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" "github.com/cockroachdb/cockroach/pkg/sql/catalog/hydrateddesc" "github.com/cockroachdb/cockroach/pkg/sql/catalog/lease" @@ -91,6 +93,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slprovider" "github.com/cockroachdb/cockroach/pkg/sql/sqlstats" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/sql/stats" "github.com/cockroachdb/cockroach/pkg/sql/stmtdiagnostics" "github.com/cockroachdb/cockroach/pkg/startupmigrations" @@ -954,6 +957,39 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { ieFactoryMonitor, ) + ieFactoryWithTxn := func( + ctx context.Context, + sd *sessiondata.SessionData, + sv *settings.Values, + txn 
*kv.Txn, + descCol *descs.Collection, + ) (sqlutil.InternalExecutor, sqlutil.InternalExecutorCommitTxnFunc) { + schemaChangeJobRecords := make(map[descpb.ID]*jobs.Record) + // By default, if not given session data, we initialize a sessionData that + // would be the same as what would be created if root logged in. + // The sessionData's user can be overridden when calling the query + // functions of internal executor. + // TODO(janexing): since we can be running queries with a higher privilege + // than the actual user, a security boundary should be added to the error + // handling of internal executor. + if sd == nil { + sd = sql.NewFakeSessionData(sv) + sd.UserProto = username.RootUserName().EncodeProto() + } + ie, commitTxnFunc := sql.NewInternalExecutorWithTxn( + pgServer.SQLServer, + sd, + txn, + internalMemMetrics, + ieFactoryMonitor, + descCol, + schemaChangeJobRecords, + ) + + return ie, commitTxnFunc + } + collectionFactory.SetInternalExecutorWithTxn(ieFactoryWithTxn) + distSQLServer.ServerConfig.InternalExecutorFactory = ieFactory jobRegistry.SetInternalExecutorFactory(ieFactory) execCfg.IndexBackfiller = sql.NewIndexBackfiller(execCfg) diff --git a/pkg/sql/catalog/descs/factory.go b/pkg/sql/catalog/descs/factory.go index 655b38d1b5ca..ff551ddceb7a 100644 --- a/pkg/sql/catalog/descs/factory.go +++ b/pkg/sql/catalog/descs/factory.go @@ -14,11 +14,15 @@ import ( "context" "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/spanconfig" "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/hydrateddesc" "github.com/cockroachdb/cockroach/pkg/sql/catalog/lease" + "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/util/mon" ) @@ -33,8 +37,19 @@ type 
CollectionFactory struct { spanConfigSplitter spanconfig.Splitter spanConfigLimiter spanconfig.Limiter defaultMonitor *mon.BytesMonitor + ieFactoryWithTxn InternalExecutorFactoryWithTxn } +// InternalExecutorFactoryWithTxn is used to create an internal executor +// with associated extra txn state information. +type InternalExecutorFactoryWithTxn func( + ctx context.Context, + sd *sessiondata.SessionData, + sv *settings.Values, + txn *kv.Txn, + descCol *Collection, +) (sqlutil.InternalExecutor, sqlutil.InternalExecutorCommitTxnFunc) + // NewCollectionFactory constructs a new CollectionFactory which holds onto // the node-level dependencies needed to construct a Collection. func NewCollectionFactory( @@ -84,3 +99,11 @@ func (cf *CollectionFactory) NewCollection( return newCollection(ctx, cf.leaseMgr, cf.settings, cf.codec, cf.hydrated, cf.systemDatabase, cf.virtualSchemas, temporarySchemaProvider, monitor) } + +// SetInternalExecutorWithTxn is to set the internal executor factory hanging +// off the collection factory. +func (cf *CollectionFactory) SetInternalExecutorWithTxn( + ieFactoryWithTxn InternalExecutorFactoryWithTxn, +) { + cf.ieFactoryWithTxn = ieFactoryWithTxn +} diff --git a/pkg/sql/catalog/descs/txn.go b/pkg/sql/catalog/descs/txn.go index 94659d833ae9..a4e27ef0407e 100644 --- a/pkg/sql/catalog/descs/txn.go +++ b/pkg/sql/catalog/descs/txn.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/lease" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" + "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/retry" @@ -40,6 +41,7 @@ var errTwoVersionInvariantViolated = errors.Errorf("two version invariant violat // // The passed transaction is pre-emptively anchored to the system config key on // the system tenant. 
+// Deprecated: Use cf.TxnWithExecutor(). func (cf *CollectionFactory) Txn( ctx context.Context, ie sqlutil.InternalExecutor, @@ -113,6 +115,98 @@ func (cf *CollectionFactory) Txn( } } +// TxnWithExecutor enables callers to run transactions with a *Collection such that all +// retrieved immutable descriptors are properly leased and all mutable +// descriptors are handled. The function deals with verifying the two version +// invariant and retrying when it is violated. Callers need not worry that they +// write mutable descriptors multiple times. The call will explicitly wait for +// the leases to drain on old versions of descriptors modified or deleted in the +// transaction; callers do not need to call lease.WaitForOneVersion. +// It also enables using internal executor to run sql queries in a txn manner. +// +// The passed transaction is pre-emptively anchored to the system config key on +// the system tenant. +func (cf *CollectionFactory) TxnWithExecutor( + ctx context.Context, + db *kv.DB, + sd *sessiondata.SessionData, + f func(ctx context.Context, txn *kv.Txn, descriptors *Collection, ie sqlutil.InternalExecutor) error, +) error { + // Waits for descriptors that were modified, skipping + // over ones that had their descriptor wiped. + waitForDescriptors := func(modifiedDescriptors []lease.IDVersion, deletedDescs catalog.DescriptorIDSet) error { + // Wait for a single version on leased descriptors. + for _, ld := range modifiedDescriptors { + waitForNoVersion := deletedDescs.Contains(ld.ID) + retryOpts := retry.Options{ + InitialBackoff: time.Millisecond, + Multiplier: 1.5, + MaxBackoff: time.Second, + } + // Detect unpublished ones. 
+ if waitForNoVersion { + err := cf.leaseMgr.WaitForNoVersion(ctx, ld.ID, retryOpts) + if err != nil { + return err + } + } else { + _, err := cf.leaseMgr.WaitForOneVersion(ctx, ld.ID, retryOpts) + if err != nil { + return err + } + } + } + return nil + } + for { + var modifiedDescriptors []lease.IDVersion + var deletedDescs catalog.DescriptorIDSet + var descsCol *Collection + if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + modifiedDescriptors = nil + deletedDescs = catalog.DescriptorIDSet{} + descsCol = cf.NewCollection(ctx, nil /* temporarySchemaProvider */, nil /* monitor */) + defer func() { + descsCol.ReleaseAll(ctx) + }() + + ie, commitTxnFn := cf.ieFactoryWithTxn(ctx, sd, &cf.settings.SV, txn, descsCol) + if err := f(ctx, txn, descsCol, ie); err != nil { + return err + } + + if err := commitTxnFn(ctx); err != nil { + return err + } + + if err := descsCol.ValidateUncommittedDescriptors(ctx, txn); err != nil { + return err + } + modifiedDescriptors = descsCol.GetDescriptorsWithNewVersion() + + if err := CheckSpanCountLimit( + ctx, descsCol, cf.spanConfigSplitter, cf.spanConfigLimiter, txn, + ); err != nil { + return err + } + retryErr, err := CheckTwoVersionInvariant( + ctx, db.Clock(), ie, descsCol, txn, nil /* onRetryBackoff */) + if retryErr { + return errTwoVersionInvariantViolated + } + deletedDescs = descsCol.deletedDescs + return err + }); errors.Is(err, errTwoVersionInvariantViolated) { + continue + } else { + if err == nil { + err = waitForDescriptors(modifiedDescriptors, deletedDescs) + } + return err + } + } +} + // CheckTwoVersionInvariant checks whether any new schema being modified written // at a version V has only valid leases at version = V - 1. A transaction retry // error as well as a boolean is returned whenever the invariant is violated. 
diff --git a/pkg/sql/internal.go b/pkg/sql/internal.go index e3afb6a90d85..adf8c9c6a680 100644 --- a/pkg/sql/internal.go +++ b/pkg/sql/internal.go @@ -120,6 +120,52 @@ func MakeInternalExecutor( } } +// NewInternalExecutorWithTxn creates an Internal Executor with txn related +// information, and also a function that can be called to commit the txn. +// This function should only be used in the implementation of +// descs.CollectionFactory's InternalExecutorFactoryWithTxn. +// TODO (janexing): This function will be soon refactored after we change +// the internal executor infrastructure with a single conn executor for all +// sql statement executions within a txn. +func NewInternalExecutorWithTxn( + s *Server, + sd *sessiondata.SessionData, + txn *kv.Txn, + memMetrics MemoryMetrics, + monitor *mon.BytesMonitor, + descCol *descs.Collection, + schemaChangeJobRecords map[descpb.ID]*jobs.Record, +) (*InternalExecutor, sqlutil.InternalExecutorCommitTxnFunc) { + schemaChangerState := &SchemaChangerState{ + mode: sd.NewSchemaChangerMode, + } + ie := InternalExecutor{ + s: s, + mon: monitor, + memMetrics: memMetrics, + extraTxnState: &extraTxnState{ + txn: txn, + descCollection: descCol, + schemaChangeJobRecords: schemaChangeJobRecords, + schemaChangerState: schemaChangerState, + }, + } + ie.s.populateMinimalSessionData(sd) + ie.sessionDataStack = sessiondata.NewStack(sd) + + commitTxnFunc := func(ctx context.Context) error { + defer func() { + ie.releaseSchemaChangeJobRecords() + }() + if err := ie.commitTxn(ctx); err != nil { + return err + } + return nil + } + + return &ie, commitTxnFunc +} + // MakeInternalExecutorMemMonitor creates and starts memory monitor for an // InternalExecutor. func MakeInternalExecutorMemMonitor( @@ -966,6 +1012,42 @@ func (ie *InternalExecutor) execInternal( return r, nil } +// releaseSchemaChangeJobRecords releases the schema change job records. 
+func (ie *InternalExecutor) releaseSchemaChangeJobRecords() { + for k := range ie.extraTxnState.schemaChangeJobRecords { + delete(ie.extraTxnState.schemaChangeJobRecords, k) + } +} + +// commitTxn is to commit the txn bound to the internal executor. +// It should only be used in CollectionFactory.TxnWithExecutor(). +func (ie *InternalExecutor) commitTxn(ctx context.Context) error { + if ie.extraTxnState == nil || ie.extraTxnState.txn == nil { + return errors.New("no txn to commit") + } + + var sd *sessiondata.SessionData + if ie.sessionDataStack != nil { + sd = ie.sessionDataStack.Top().Clone() + } else { + sd = ie.s.newSessionData(SessionArgs{}) + } + + rw := newAsyncIEResultChannel() + stmtBuf := NewStmtBuf() + + ex, err := ie.initConnEx(ctx, ie.extraTxnState.txn, rw, sd, stmtBuf, nil /* syncCallback */) + if err != nil { + return errors.Wrap(err, "cannot create conn executor to commit txn") + } + defer ex.close(ctx, externalTxnClose) + + if err := ex.commitSQLTransactionInternal(ctx); err != nil { + return err + } + return nil +} + // internalClientComm is an implementation of ClientComm used by the // InternalExecutor. Result rows are buffered in memory. type internalClientComm struct { @@ -1158,3 +1240,12 @@ func (ief *InternalExecutorFactory) NewInternalExecutor( ie.SetSessionData(sd) return &ie } + +// RunWithoutTxn is to create an internal executor without binding to a txn, +// and run the passed function with this internal executor. 
+func (ief *InternalExecutorFactory) RunWithoutTxn( + ctx context.Context, run func(ctx context.Context, ie sqlutil.InternalExecutor) error, +) error { + ie := ief.NewInternalExecutor(nil /* sessionData */) + return run(ctx, ie) +} diff --git a/pkg/sql/sqlutil/internal_executor.go b/pkg/sql/sqlutil/internal_executor.go index 1825c8051957..c94f183b2b6c 100644 --- a/pkg/sql/sqlutil/internal_executor.go +++ b/pkg/sql/sqlutil/internal_executor.go @@ -214,6 +214,9 @@ type InternalExecutorFactory interface { // NewInternalExecutor constructs a new internal executor. // TODO (janexing): this should be deprecated soon. NewInternalExecutor(sd *sessiondata.SessionData) InternalExecutor + // RunWithoutTxn is to create an internal executor without binding to a txn, + // and run the passed function with this internal executor. + RunWithoutTxn(ctx context.Context, run func(ctx context.Context, ie InternalExecutor) error) error } // InternalExecFn is the type of functions that operates using an internalExecutor. @@ -223,3 +226,7 @@ type InternalExecFn func(ctx context.Context, txn *kv.Txn, ie InternalExecutor) // passes the fn the exported InternalExecutor instead of the whole unexported // extendedEvalContenxt, so it can be implemented outside pkg/sql. type HistoricalInternalExecTxnRunner func(ctx context.Context, fn InternalExecFn) error + +// InternalExecutorCommitTxnFunc is to commit the txn associated with an +// internal executor. +type InternalExecutorCommitTxnFunc func(ctx context.Context) error