Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql: introduce new internal executor interfaces #82477

Merged
merged 7 commits into from
Aug 12, 2022
3 changes: 3 additions & 0 deletions pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slprovider"
"github.com/cockroachdb/cockroach/pkg/sql/sqlstats"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/sql/stats"
"github.com/cockroachdb/cockroach/pkg/sql/stmtdiagnostics"
"github.com/cockroachdb/cockroach/pkg/startupmigrations"
Expand Down Expand Up @@ -954,6 +955,8 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
ieFactoryMonitor,
)

collectionFactory.SetInternalExecutorWithTxn(ieFactory)

distSQLServer.ServerConfig.InternalExecutorFactory = ieFactory
jobRegistry.SetInternalExecutorFactory(ieFactory)
execCfg.IndexBackfiller = sql.NewIndexBackfiller(execCfg)
Expand Down
25 changes: 25 additions & 0 deletions pkg/sql/catalog/descs/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,15 @@ import (
"context"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/hydrateddesc"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/lease"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/util/mon"
)

Expand All @@ -33,6 +37,19 @@ type CollectionFactory struct {
spanConfigSplitter spanconfig.Splitter
spanConfigLimiter spanconfig.Limiter
defaultMonitor *mon.BytesMonitor
ieFactoryWithTxn InternalExecutorFactoryWithTxn
}

// InternalExecutorFactoryWithTxn is used to create an internal executor
// with associated extra txn state information.
// It should only be used as a field hanging off CollectionFactory.
type InternalExecutorFactoryWithTxn interface {
NewInternalExecutorWithTxn(
sd *sessiondata.SessionData,
sv *settings.Values,
txn *kv.Txn,
descCol *Collection,
) (sqlutil.InternalExecutor, sqlutil.InternalExecutorCommitTxnFunc)
}

// NewCollectionFactory constructs a new CollectionFactory which holds onto
Expand Down Expand Up @@ -84,3 +101,11 @@ func (cf *CollectionFactory) NewCollection(
return newCollection(ctx, cf.leaseMgr, cf.settings, cf.codec, cf.hydrated, cf.systemDatabase,
cf.virtualSchemas, temporarySchemaProvider, monitor)
}

// SetInternalExecutorWithTxn is to set the internal executor factory hanging
// off the collection factory.
func (cf *CollectionFactory) SetInternalExecutorWithTxn(
ieFactoryWithTxn InternalExecutorFactoryWithTxn,
) {
cf.ieFactoryWithTxn = ieFactoryWithTxn
}
94 changes: 94 additions & 0 deletions pkg/sql/catalog/descs/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/lease"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/retry"
Expand All @@ -40,6 +41,7 @@ var errTwoVersionInvariantViolated = errors.Errorf("two version invariant violat
//
// The passed transaction is pre-emptively anchored to the system config key on
// the system tenant.
// Deprecated: Use cf.TxnWithExecutor().
func (cf *CollectionFactory) Txn(
ctx context.Context,
ie sqlutil.InternalExecutor,
Expand Down Expand Up @@ -113,6 +115,98 @@ func (cf *CollectionFactory) Txn(
}
}

// TxnWithExecutor enables callers to run transactions with a *Collection such that all
// retrieved immutable descriptors are properly leased and all mutable
// descriptors are handled. The function deals with verifying the two version
// invariant and retrying when it is violated. Callers need not worry that they
// write mutable descriptors multiple times. The call will explicitly wait for
// the leases to drain on old versions of descriptors modified or deleted in the
// transaction; callers do not need to call lease.WaitForOneVersion.
// It also enables using internal executor to run sql queries in a txn manner.
//
// The passed transaction is pre-emptively anchored to the system config key on
// the system tenant.
func (cf *CollectionFactory) TxnWithExecutor(
ctx context.Context,
db *kv.DB,
sd *sessiondata.SessionData,
f func(ctx context.Context, txn *kv.Txn, descriptors *Collection, ie sqlutil.InternalExecutor) error,
) error {
// Waits for descriptors that were modified, skipping
// over ones that had their descriptor wiped.
waitForDescriptors := func(modifiedDescriptors []lease.IDVersion, deletedDescs catalog.DescriptorIDSet) error {
// Wait for a single version on leased descriptors.
for _, ld := range modifiedDescriptors {
waitForNoVersion := deletedDescs.Contains(ld.ID)
retryOpts := retry.Options{
InitialBackoff: time.Millisecond,
Multiplier: 1.5,
MaxBackoff: time.Second,
}
// Detect unpublished ones.
if waitForNoVersion {
err := cf.leaseMgr.WaitForNoVersion(ctx, ld.ID, retryOpts)
if err != nil {
return err
}
} else {
_, err := cf.leaseMgr.WaitForOneVersion(ctx, ld.ID, retryOpts)
if err != nil {
return err
}
}
}
return nil
}
for {
var modifiedDescriptors []lease.IDVersion
var deletedDescs catalog.DescriptorIDSet
var descsCol *Collection
if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
modifiedDescriptors = nil
deletedDescs = catalog.DescriptorIDSet{}
descsCol = cf.NewCollection(ctx, nil /* temporarySchemaProvider */, nil /* monitor */)
defer func() {
descsCol.ReleaseAll(ctx)
}()

ie, commitTxnFn := cf.ieFactoryWithTxn.NewInternalExecutorWithTxn(sd, &cf.settings.SV, txn, descsCol)
if err := f(ctx, txn, descsCol, ie); err != nil {
return err
}

if err := commitTxnFn(ctx); err != nil {
return err
}

if err := descsCol.ValidateUncommittedDescriptors(ctx, txn); err != nil {
return err
}
modifiedDescriptors = descsCol.GetDescriptorsWithNewVersion()

if err := CheckSpanCountLimit(
ctx, descsCol, cf.spanConfigSplitter, cf.spanConfigLimiter, txn,
); err != nil {
return err
}
retryErr, err := CheckTwoVersionInvariant(
ctx, db.Clock(), ie, descsCol, txn, nil /* onRetryBackoff */)
if retryErr {
return errTwoVersionInvariantViolated
}
deletedDescs = descsCol.deletedDescs
return err
}); errors.Is(err, errTwoVersionInvariantViolated) {
continue
} else {
if err == nil {
err = waitForDescriptors(modifiedDescriptors, deletedDescs)
}
return err
}
}
}

// CheckTwoVersionInvariant checks whether any new schema being modified written
// at a version V has only valid leases at version = V - 1. A transaction retry
// error as well as a boolean is returned whenever the invariant is violated.
Expand Down
128 changes: 128 additions & 0 deletions pkg/sql/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
Expand Down Expand Up @@ -120,6 +121,52 @@ func MakeInternalExecutor(
}
}

// newInternalExecutorWithTxn creates an Internal Executor with txn related
// information, and also a function that can be called to commit the txn.
// This function should only be used in the implementation of
// descs.CollectionFactory's InternalExecutorFactoryWithTxn.
// TODO (janexing): This function will be soon refactored after we change
// the internal executor infrastructure with a single conn executor for all
// sql statement executions within a txn.
func newInternalExecutorWithTxn(
s *Server,
sd *sessiondata.SessionData,
txn *kv.Txn,
memMetrics MemoryMetrics,
monitor *mon.BytesMonitor,
descCol *descs.Collection,
schemaChangeJobRecords map[descpb.ID]*jobs.Record,
) (*InternalExecutor, sqlutil.InternalExecutorCommitTxnFunc) {
schemaChangerState := &SchemaChangerState{
mode: sd.NewSchemaChangerMode,
}
ie := InternalExecutor{
s: s,
mon: monitor,
memMetrics: memMetrics,
extraTxnState: &extraTxnState{
txn: txn,
descCollection: descCol,
schemaChangeJobRecords: schemaChangeJobRecords,
schemaChangerState: schemaChangerState,
},
}
ie.s.populateMinimalSessionData(sd)
ie.sessionDataStack = sessiondata.NewStack(sd)

commitTxnFunc := func(ctx context.Context) error {
defer func() {
ie.releaseSchemaChangeJobRecords()
}()
if err := ie.commitTxn(ctx); err != nil {
return err
}
return nil
}

return &ie, commitTxnFunc
}

// MakeInternalExecutorMemMonitor creates and starts memory monitor for an
// InternalExecutor.
func MakeInternalExecutorMemMonitor(
Expand Down Expand Up @@ -966,6 +1013,42 @@ func (ie *InternalExecutor) execInternal(
return r, nil
}

// ReleaseSchemaChangeJobRecords is to release the schema change job records.
func (ie *InternalExecutor) releaseSchemaChangeJobRecords() {
for k := range ie.extraTxnState.schemaChangeJobRecords {
delete(ie.extraTxnState.schemaChangeJobRecords, k)
}
}

// commitTxn is to commit the txn bound to the internal executor.
// It should only be used in CollectionFactory.TxnWithExecutor().
func (ie *InternalExecutor) commitTxn(ctx context.Context) error {
if ie.extraTxnState == nil || ie.extraTxnState.txn == nil {
return errors.New("no txn to commit")
}

var sd *sessiondata.SessionData
if ie.sessionDataStack != nil {
sd = ie.sessionDataStack.Top().Clone()
} else {
sd = ie.s.newSessionData(SessionArgs{})
}

rw := newAsyncIEResultChannel()
stmtBuf := NewStmtBuf()

ex, err := ie.initConnEx(ctx, ie.extraTxnState.txn, rw, sd, stmtBuf, nil /* syncCallback */)
if err != nil {
return errors.Wrap(err, "cannot create conn executor to commit txn")
}
defer ex.close(ctx, externalTxnClose)

if err := ex.commitSQLTransactionInternal(ctx); err != nil {
return err
}
return nil
}

// internalClientComm is an implementation of ClientComm used by the
// InternalExecutor. Result rows are buffered in memory.
type internalClientComm struct {
Expand Down Expand Up @@ -1149,6 +1232,9 @@ func NewInternalExecutorFactory(
}
}

var _ sqlutil.InternalExecutorFactory = &InternalExecutorFactory{}
var _ descs.InternalExecutorFactoryWithTxn = &InternalExecutorFactory{}

// NewInternalExecutor constructs a new internal executor.
// TODO (janexing): this should be deprecated soon.
func (ief *InternalExecutorFactory) NewInternalExecutor(
Expand All @@ -1158,3 +1244,45 @@ func (ief *InternalExecutorFactory) NewInternalExecutor(
ie.SetSessionData(sd)
return &ie
}

// NewInternalExecutorWithTxn creates an internal executor with txn-related info,
// such as descriptor collection and schema change job records, etc. It should
// be called only after InternalExecutorFactory.NewInternalExecutor is already
// called to construct the InternalExecutorFactory with required server info.
// This function should only be used under CollectionFactory.TxnWithExecutor().
func (ief *InternalExecutorFactory) NewInternalExecutorWithTxn(
sd *sessiondata.SessionData, sv *settings.Values, txn *kv.Txn, descCol *descs.Collection,
) (sqlutil.InternalExecutor, sqlutil.InternalExecutorCommitTxnFunc) {
schemaChangeJobRecords := make(map[descpb.ID]*jobs.Record)
// By default, if not given session data, we initialize a sessionData that
// would be the same as what would be created if root logged in.
// The sessionData's user can be override when calling the query
// functions of internal executor.
// TODO(janexing): since we can be running queries with a higher privilege
// than the actual user, a security boundary should be added to the error
// handling of internal executor.
if sd == nil {
sd = NewFakeSessionData(sv)
sd.UserProto = username.RootUserName().EncodeProto()
}
ie, commitTxnFunc := newInternalExecutorWithTxn(
ief.server,
sd,
txn,
ief.memMetrics,
ief.monitor,
descCol,
schemaChangeJobRecords,
)

return ie, commitTxnFunc
}

// RunWithoutTxn is to create an internal executor without binding to a txn,
// and run the passed function with this internal executor.
func (ief *InternalExecutorFactory) RunWithoutTxn(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why was this method necessary / how is it different from NewInternalExecutor() ? Can't the caller run exactly the two lines inside this function?
The comment talks about "binding to a txn" without other explanations. I don't think either fewer or more words would be needed since most callers don't care about that / don't know what "binding to a txn" means.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that this function is far from ideal... We made this function with the hope of avoiding use cases where an internal executor is created without binding to any txn-related metadata, but is used to run queries with a not nil-txn. In other words, we wanted to make the usages "with" and "without" an outer txn more distinct from each other, and let callers think twice about which one they should use.
I think we can add a comment saying that it's disallowed to use this function to run DDLs or multiple statement in a transactional manner.

Copy link
Contributor

@andreimatei andreimatei Aug 29, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can have a NewInternalExecutorWithoutTxn if you insist on the importance of having the "without part" in your face, but I don't see why the caller needs to structure its logic into a closure if they're not getting anything in return.

Copy link
Collaborator Author

@ZhouXing19 ZhouXing19 Aug 29, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think if we only have NewInternalExecutorWithoutTxn, it can still happen to the caller to use it to run statements with a not-nil txn, which is wrong. To wrap it in this function is to make it more explicit that you shouldn't do this (though it's true that we can't truly disallow it here)

I think the ideal case is to remove the txn field in internal executor's query functions (e.g. ie.QueryRowEx()). The txn should be bound to the internal executor, rather than each statement execution. With that, I think it's totally fine for us to remove this function and just do ie := NewInternalExecutorWithoutTxn ()

ctx context.Context, run func(ctx context.Context, ie sqlutil.InternalExecutor) error,
) error {
ie := ief.NewInternalExecutor(nil /* sessionData */)
return run(ctx, ie)
}
7 changes: 7 additions & 0 deletions pkg/sql/sqlutil/internal_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,9 @@ type InternalExecutorFactory interface {
// NewInternalExecutor constructs a new internal executor.
// TODO (janexing): this should be deprecated soon.
NewInternalExecutor(sd *sessiondata.SessionData) InternalExecutor
// RunWithoutTxn is to create an internal executor without binding to a txn,
// and run the passed function with this internal executor.
RunWithoutTxn(ctx context.Context, run func(ctx context.Context, ie InternalExecutor) error) error
}

// InternalExecFn is the type of functions that operates using an internalExecutor.
Expand All @@ -223,3 +226,7 @@ type InternalExecFn func(ctx context.Context, txn *kv.Txn, ie InternalExecutor)
// passes the fn the exported InternalExecutor instead of the whole unexported
// extendedEvalContenxt, so it can be implemented outside pkg/sql.
type HistoricalInternalExecTxnRunner func(ctx context.Context, fn InternalExecFn) error

// InternalExecutorCommitTxnFunc is to commit the txn associated with an
// internal executor.
type InternalExecutorCommitTxnFunc func(ctx context.Context) error