diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go
index c3b2c1f4188a..f802ba96824d 100644
--- a/pkg/server/server_sql.go
+++ b/pkg/server/server_sql.go
@@ -50,6 +50,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/server/status"
 	"github.com/cockroachdb/cockroach/pkg/server/systemconfigwatcher"
 	"github.com/cockroachdb/cockroach/pkg/server/tracedumper"
+	"github.com/cockroachdb/cockroach/pkg/settings"
 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
 	"github.com/cockroachdb/cockroach/pkg/spanconfig"
 	"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfiglimiter"
@@ -62,6 +63,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/cacheutil"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descidgen"
+	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/hydrateddesc"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/lease"
@@ -91,6 +93,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
 	"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slprovider"
 	"github.com/cockroachdb/cockroach/pkg/sql/sqlstats"
+	"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
 	"github.com/cockroachdb/cockroach/pkg/sql/stats"
 	"github.com/cockroachdb/cockroach/pkg/sql/stmtdiagnostics"
 	"github.com/cockroachdb/cockroach/pkg/startupmigrations"
@@ -954,6 +957,39 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
 		ieFactoryMonitor,
 	)
 
+	// ieFactoryWithTxn builds an internal executor bound to the given txn and
+	// descriptor collection, together with the function that commits that txn.
+	ieFactoryWithTxn := func(
+		ctx context.Context,
+		sd *sessiondata.SessionData,
+		sv *settings.Values,
+		txn *kv.Txn,
+		descCol *descs.Collection,
+	) (sqlutil.InternalExecutor, sqlutil.InternalExecutorCommitTxnFunc) {
+		// By default, if not given session data, we initialize a sessionData that
+		// would be the same as what would be created if root logged in.
+		// The sessionData's user can be overridden when calling the query
+		// functions of internal executor.
+		// TODO(janexing): since we can be running queries with a higher privilege
+		// than the actual user, a security boundary should be added to the error
+		// handling of internal executor.
+		if sd == nil {
+			sd = sql.NewFakeSessionData(sv)
+			sd.UserProto = username.RootUserName().EncodeProto()
+		}
+		jobRecords := make(map[descpb.ID]*jobs.Record)
+		return sql.NewInternalExecutorWithTxn(
+			pgServer.SQLServer,
+			sd,
+			txn,
+			internalMemMetrics,
+			ieFactoryMonitor,
+			descCol,
+			jobRecords,
+		)
+	}
+	collectionFactory.SetInternalExecutorWithTxn(ieFactoryWithTxn)
+
 	distSQLServer.ServerConfig.InternalExecutorFactory = ieFactory
 	jobRegistry.SetInternalExecutorFactory(ieFactory)
 	execCfg.IndexBackfiller = sql.NewIndexBackfiller(execCfg)
diff --git a/pkg/sql/catalog/descs/factory.go b/pkg/sql/catalog/descs/factory.go
index 655b38d1b5ca..ff551ddceb7a 100644
--- a/pkg/sql/catalog/descs/factory.go
+++ b/pkg/sql/catalog/descs/factory.go
@@ -14,11 +14,15 @@ import (
 	"context"
 
 	"github.com/cockroachdb/cockroach/pkg/keys"
+	"github.com/cockroachdb/cockroach/pkg/kv"
+	"github.com/cockroachdb/cockroach/pkg/settings"
 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
 	"github.com/cockroachdb/cockroach/pkg/spanconfig"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/hydrateddesc"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/lease"
+	"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
+	"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
 	"github.com/cockroachdb/cockroach/pkg/util/mon"
 )
 
@@ -33,8 +37,19 @@ type CollectionFactory struct {
 	spanConfigSplitter spanconfig.Splitter
 	spanConfigLimiter  spanconfig.Limiter
 	defaultMonitor     *mon.BytesMonitor
+	ieFactoryWithTxn   InternalExecutorFactoryWithTxn
 }
 
+// InternalExecutorFactoryWithTxn creates an internal executor bound to a txn
+// and a descriptor collection, plus the function used to commit that txn.
+type InternalExecutorFactoryWithTxn func(
+	ctx context.Context,
+	sd *sessiondata.SessionData,
+	sv *settings.Values,
+	txn *kv.Txn,
+	descCol *Collection,
+) (sqlutil.InternalExecutor, sqlutil.InternalExecutorCommitTxnFunc)
+
 // NewCollectionFactory constructs a new CollectionFactory which holds onto
 // the node-level dependencies needed to construct a Collection.
 func NewCollectionFactory(
@@ -84,3 +99,11 @@ func (cf *CollectionFactory) NewCollection(
 	return newCollection(ctx, cf.leaseMgr, cf.settings, cf.codec, cf.hydrated, cf.systemDatabase,
 		cf.virtualSchemas, temporarySchemaProvider, monitor)
 }
+
+// SetInternalExecutorWithTxn sets the internal executor factory hanging
+// off the collection factory.
+func (cf *CollectionFactory) SetInternalExecutorWithTxn(
+	ieFactoryWithTxn InternalExecutorFactoryWithTxn,
+) {
+	cf.ieFactoryWithTxn = ieFactoryWithTxn
+}
diff --git a/pkg/sql/catalog/descs/txn.go b/pkg/sql/catalog/descs/txn.go
index 94659d833ae9..a4e27ef0407e 100644
--- a/pkg/sql/catalog/descs/txn.go
+++ b/pkg/sql/catalog/descs/txn.go
@@ -22,6 +22,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/lease"
 	"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
 	"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
+	"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
 	"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/retry"
@@ -40,6 +41,8 @@ var errTwoVersionInvariantViolated = errors.Errorf("two version invariant violat
 //
 // The passed transaction is pre-emptively anchored to the system config key on
 // the system tenant.
+//
+// Deprecated: Use cf.TxnWithExecutor().
 func (cf *CollectionFactory) Txn(
 	ctx context.Context,
 	ie sqlutil.InternalExecutor,
@@ -113,6 +116,103 @@ func (cf *CollectionFactory) Txn(
 	}
 }
 
+// TxnWithExecutor enables callers to run transactions with a *Collection such that all
+// retrieved immutable descriptors are properly leased and all mutable
+// descriptors are handled. The function deals with verifying the two version
+// invariant and retrying when it is violated. Callers need not worry that they
+// write mutable descriptors multiple times. The call will explicitly wait for
+// the leases to drain on old versions of descriptors modified or deleted in the
+// transaction; callers do not need to call lease.WaitForOneVersion.
+// It also hands f an internal executor created for the same txn, so that sql
+// queries can be run as part of that txn.
+//
+// An internal executor factory must have been installed with
+// SetInternalExecutorWithTxn; otherwise an assertion error is returned.
+func (cf *CollectionFactory) TxnWithExecutor(
+	ctx context.Context,
+	db *kv.DB,
+	sd *sessiondata.SessionData,
+	f func(ctx context.Context, txn *kv.Txn, descriptors *Collection, ie sqlutil.InternalExecutor) error,
+) error {
+	// The factory is installed separately through SetInternalExecutorWithTxn;
+	// calling a nil function value would panic, so fail with an error instead.
+	if cf.ieFactoryWithTxn == nil {
+		return errors.AssertionFailedf("internal executor factory with txn is not set")
+	}
+
+	// The retry options are identical for every descriptor, so build them once.
+	retryOpts := retry.Options{
+		InitialBackoff: time.Millisecond,
+		Multiplier:     1.5,
+		MaxBackoff:     time.Second,
+	}
+	// Waits for descriptors that were modified, skipping
+	// over ones that had their descriptor wiped.
+	waitForDescriptors := func(modifiedDescriptors []lease.IDVersion, deletedDescs catalog.DescriptorIDSet) error {
+		for _, ld := range modifiedDescriptors {
+			// Deleted descriptors go through WaitForNoVersion instead.
+			if deletedDescs.Contains(ld.ID) {
+				if err := cf.leaseMgr.WaitForNoVersion(ctx, ld.ID, retryOpts); err != nil {
+					return err
+				}
+				continue
+			}
+			// Wait for a single version on leased descriptors.
+			if _, err := cf.leaseMgr.WaitForOneVersion(ctx, ld.ID, retryOpts); err != nil {
+				return err
+			}
+		}
+		return nil
+	}
+	for {
+		var modifiedDescriptors []lease.IDVersion
+		var deletedDescs catalog.DescriptorIDSet
+		err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
+			// Reset the outputs at the start of every attempt (presumably db.Txn
+			// may invoke this closure more than once).
+			modifiedDescriptors = nil
+			deletedDescs = catalog.DescriptorIDSet{}
+			descsCol := cf.NewCollection(ctx, nil /* temporarySchemaProvider */, nil /* monitor */)
+			defer descsCol.ReleaseAll(ctx)
+
+			ie, commitTxnFn := cf.ieFactoryWithTxn(ctx, sd, &cf.settings.SV, txn, descsCol)
+			if err := f(ctx, txn, descsCol, ie); err != nil {
+				return err
+			}
+
+			if err := commitTxnFn(ctx); err != nil {
+				return err
+			}
+
+			if err := descsCol.ValidateUncommittedDescriptors(ctx, txn); err != nil {
+				return err
+			}
+			modifiedDescriptors = descsCol.GetDescriptorsWithNewVersion()
+
+			if err := CheckSpanCountLimit(
+				ctx, descsCol, cf.spanConfigSplitter, cf.spanConfigLimiter, txn,
+			); err != nil {
+				return err
+			}
+			retryErr, err := CheckTwoVersionInvariant(
+				ctx, db.Clock(), ie, descsCol, txn, nil /* onRetryBackoff */)
+			if retryErr {
+				return errTwoVersionInvariantViolated
+			}
+			deletedDescs = descsCol.deletedDescs
+			return err
+		})
+		if errors.Is(err, errTwoVersionInvariantViolated) {
+			// The invariant was violated: run the whole transaction again.
+			continue
+		}
+		if err != nil {
+			return err
+		}
+		return waitForDescriptors(modifiedDescriptors, deletedDescs)
+	}
+}
+
 // CheckTwoVersionInvariant checks whether any new schema being modified written
 // at a version V has only valid leases at version = V - 1. A transaction retry
 // error as well as a boolean is returned whenever the invariant is violated.
diff --git a/pkg/sql/internal.go b/pkg/sql/internal.go
index e3afb6a90d85..adf8c9c6a680 100644
--- a/pkg/sql/internal.go
+++ b/pkg/sql/internal.go
@@ -120,6 +120,52 @@ func MakeInternalExecutor(
 	}
 }
 
+// NewInternalExecutorWithTxn creates an internal executor that carries
+// txn-related state (the txn, the descriptor collection, the schema change job
+// records and a schema changer state), and also returns a function that can be
+// called to commit the txn.
+// sd must not be nil: it is dereferenced to pick the schema changer mode and
+// is used to build the executor's session data stack.
+// This function should only be used in the implementation of
+// descs.CollectionFactory's InternalExecutorFactoryWithTxn.
+// TODO (janexing): This function will be soon refactored after we change
+// the internal executor infrastructure with a single conn executor for all
+// sql statement executions within a txn.
+func NewInternalExecutorWithTxn(
+	s *Server,
+	sd *sessiondata.SessionData,
+	txn *kv.Txn,
+	memMetrics MemoryMetrics,
+	monitor *mon.BytesMonitor,
+	descCol *descs.Collection,
+	schemaChangeJobRecords map[descpb.ID]*jobs.Record,
+) (*InternalExecutor, sqlutil.InternalExecutorCommitTxnFunc) {
+	ie := &InternalExecutor{
+		s:          s,
+		mon:        monitor,
+		memMetrics: memMetrics,
+		extraTxnState: &extraTxnState{
+			txn:                    txn,
+			descCollection:         descCol,
+			schemaChangeJobRecords: schemaChangeJobRecords,
+			schemaChangerState: &SchemaChangerState{
+				mode: sd.NewSchemaChangerMode,
+			},
+		},
+	}
+	s.populateMinimalSessionData(sd)
+	ie.sessionDataStack = sessiondata.NewStack(sd)
+
+	// The commit function always clears the job records on its way out,
+	// whether or not the commit succeeded.
+	commitTxnFunc := func(ctx context.Context) error {
+		defer ie.releaseSchemaChangeJobRecords()
+		return ie.commitTxn(ctx)
+	}
+
+	return ie, commitTxnFunc
+}
+
 // MakeInternalExecutorMemMonitor creates and starts memory monitor for an
 // InternalExecutor.
 func MakeInternalExecutorMemMonitor(
@@ -966,6 +1012,42 @@ func (ie *InternalExecutor) execInternal(
 	return r, nil
 }
 
+// releaseSchemaChangeJobRecords empties the schema change job records map.
+func (ie *InternalExecutor) releaseSchemaChangeJobRecords() {
+	records := ie.extraTxnState.schemaChangeJobRecords
+	for id := range records {
+		delete(records, id)
+	}
+}
+
+// commitTxn commits the txn bound to the internal executor. It sets up a
+// conn executor for that txn, commits through it, and closes it on return.
+// It should only be used in CollectionFactory.TxnWithExecutor().
+func (ie *InternalExecutor) commitTxn(ctx context.Context) error {
+	if ie.extraTxnState == nil || ie.extraTxnState.txn == nil {
+		return errors.New("no txn to commit")
+	}
+
+	// Work on a clone of the top of the executor's session data stack when it
+	// has one; otherwise build session data from empty session args.
+	var sd *sessiondata.SessionData
+	if ie.sessionDataStack == nil {
+		sd = ie.s.newSessionData(SessionArgs{})
+	} else {
+		sd = ie.sessionDataStack.Top().Clone()
+	}
+
+	resultCh := newAsyncIEResultChannel()
+	buf := NewStmtBuf()
+	ex, err := ie.initConnEx(ctx, ie.extraTxnState.txn, resultCh, sd, buf, nil /* syncCallback */)
+	if err != nil {
+		return errors.Wrap(err, "cannot create conn executor to commit txn")
+	}
+	defer ex.close(ctx, externalTxnClose)
+
+	return ex.commitSQLTransactionInternal(ctx)
+}
+
 // internalClientComm is an implementation of ClientComm used by the
 // InternalExecutor. Result rows are buffered in memory.
 type internalClientComm struct {
@@ -1158,3 +1240,12 @@ func (ief *InternalExecutorFactory) NewInternalExecutor(
 	ie.SetSessionData(sd)
 	return &ie
 }
+
+// RunWithoutTxn creates an internal executor that is not bound to a txn,
+// and runs the passed function with this internal executor.
+func (ief *InternalExecutorFactory) RunWithoutTxn(
+	ctx context.Context, run func(ctx context.Context, ie sqlutil.InternalExecutor) error,
+) error {
+	// No session data is supplied to the newly created executor.
+	return run(ctx, ief.NewInternalExecutor(nil /* sessionData */))
+}
diff --git a/pkg/sql/sqlutil/internal_executor.go b/pkg/sql/sqlutil/internal_executor.go
index 1825c8051957..c94f183b2b6c 100644
--- a/pkg/sql/sqlutil/internal_executor.go
+++ b/pkg/sql/sqlutil/internal_executor.go
@@ -214,6 +214,9 @@ type InternalExecutorFactory interface {
 	// NewInternalExecutor constructs a new internal executor.
 	// TODO (janexing): this should be deprecated soon.
 	NewInternalExecutor(sd *sessiondata.SessionData) InternalExecutor
+	// RunWithoutTxn creates an internal executor that is not bound to a txn,
+	// and runs the passed function with this internal executor.
+	RunWithoutTxn(ctx context.Context, run func(ctx context.Context, ie InternalExecutor) error) error
 }
 
 // InternalExecFn is the type of functions that operates using an internalExecutor.
@@ -223,3 +226,7 @@ type InternalExecFn func(ctx context.Context, txn *kv.Txn, ie InternalExecutor)
 // passes the fn the exported InternalExecutor instead of the whole unexported
 // extendedEvalContenxt, so it can be implemented outside pkg/sql.
 type HistoricalInternalExecTxnRunner func(ctx context.Context, fn InternalExecFn) error
+
+// InternalExecutorCommitTxnFunc commits the txn associated with an
+// internal executor.
+type InternalExecutorCommitTxnFunc func(ctx context.Context) error