Skip to content

Commit

Permalink
sql: introduce ieFactory.RunWithouTxn() and collectionFactory.TxnWith…
Browse files Browse the repository at this point in the history
…Executor()

This commit introduces two functions that allow users to run sql statements
with an internal executor. We intend to limit the usage of a real
internal executor only inside these functions, instead of free-floating
or hanging off certain structs. In other words, we restrict the init of an
internal executor.

The motivation is that if an internal executor is used to run multiple sql
statements in a txn manner, these executions are expected to use the same set of
info (such as descriptor collections) among their conn executor.
While this rule can be easily forgot by the user of internal executors.
Hence we provide an interface that wraps the initialization of internal executors
with the query executions, so that the users won't need to be worried.

Informs: once all existing usages of the internal executors are replaced with
the new interfaces proposed here, cockroachdb#70888 should be solved.

Release note: None
  • Loading branch information
ZhouXing19 committed Aug 10, 2022
1 parent b101651 commit dc0fed4
Show file tree
Hide file tree
Showing 5 changed files with 251 additions and 0 deletions.
36 changes: 36 additions & 0 deletions pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/server/status"
"github.com/cockroachdb/cockroach/pkg/server/systemconfigwatcher"
"github.com/cockroachdb/cockroach/pkg/server/tracedumper"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfiglimiter"
Expand All @@ -62,6 +63,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/cacheutil"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descidgen"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/hydrateddesc"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/lease"
Expand Down Expand Up @@ -91,6 +93,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slprovider"
"github.com/cockroachdb/cockroach/pkg/sql/sqlstats"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/sql/stats"
"github.com/cockroachdb/cockroach/pkg/sql/stmtdiagnostics"
"github.com/cockroachdb/cockroach/pkg/startupmigrations"
Expand Down Expand Up @@ -954,6 +957,39 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
ieFactoryMonitor,
)

ieFactoryWithTxn := func(
ctx context.Context,
sd *sessiondata.SessionData,
sv *settings.Values,
txn *kv.Txn,
descCol *descs.Collection,
) (sqlutil.InternalExecutor, sqlutil.InternalExecutorCommitTxnFunc) {
schemaChangeJobRecords := make(map[descpb.ID]*jobs.Record)
// By default, if not given session data, we initialize a sessionData that
// would be the same as what would be created if root logged in.
// The sessionData's user can be override when calling the query
// functions of internal executor.
// TODO(janexing): since we can be running queries with a higher privilege
// than the actual user, a security boundary should be added to the error
// handling of internal executor.
if sd == nil {
sd = sql.NewFakeSessionData(sv)
sd.UserProto = username.RootUserName().EncodeProto()
}
ie, commitTxnFunc := sql.NewInternalExecutorWithTxn(
pgServer.SQLServer,
sd,
txn,
internalMemMetrics,
ieFactoryMonitor,
descCol,
schemaChangeJobRecords,
)

return ie, commitTxnFunc
}
collectionFactory.SetInternalExecutorWithTxn(ieFactoryWithTxn)

distSQLServer.ServerConfig.InternalExecutorFactory = ieFactory
jobRegistry.SetInternalExecutorFactory(ieFactory)
execCfg.IndexBackfiller = sql.NewIndexBackfiller(execCfg)
Expand Down
23 changes: 23 additions & 0 deletions pkg/sql/catalog/descs/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,15 @@ import (
"context"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/hydrateddesc"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/lease"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/util/mon"
)

Expand All @@ -33,8 +37,19 @@ type CollectionFactory struct {
spanConfigSplitter spanconfig.Splitter
spanConfigLimiter spanconfig.Limiter
defaultMonitor *mon.BytesMonitor
ieFactoryWithTxn InternalExecutorFactoryWithTxn
}

// InternalExecutorFactoryWithTxn is used to create an internal executor
// with associated extra txn state information.
type InternalExecutorFactoryWithTxn func(
ctx context.Context,
sd *sessiondata.SessionData,
sv *settings.Values,
txn *kv.Txn,
descCol *Collection,
) (sqlutil.InternalExecutor, sqlutil.InternalExecutorCommitTxnFunc)

// NewCollectionFactory constructs a new CollectionFactory which holds onto
// the node-level dependencies needed to construct a Collection.
func NewCollectionFactory(
Expand Down Expand Up @@ -84,3 +99,11 @@ func (cf *CollectionFactory) NewCollection(
return newCollection(ctx, cf.leaseMgr, cf.settings, cf.codec, cf.hydrated, cf.systemDatabase,
cf.virtualSchemas, temporarySchemaProvider, monitor)
}

// SetInternalExecutorWithTxn is to set the internal executor factory hanging
// off the collection factory.
func (cf *CollectionFactory) SetInternalExecutorWithTxn(
ieFactoryWithTxn InternalExecutorFactoryWithTxn,
) {
cf.ieFactoryWithTxn = ieFactoryWithTxn
}
94 changes: 94 additions & 0 deletions pkg/sql/catalog/descs/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/lease"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/retry"
Expand All @@ -40,6 +41,7 @@ var errTwoVersionInvariantViolated = errors.Errorf("two version invariant violat
//
// The passed transaction is pre-emptively anchored to the system config key on
// the system tenant.
// Deprecated: Use cf.TxnWithExecutor().
func (cf *CollectionFactory) Txn(
ctx context.Context,
ie sqlutil.InternalExecutor,
Expand Down Expand Up @@ -113,6 +115,98 @@ func (cf *CollectionFactory) Txn(
}
}

// TxnWithExecutor enables callers to run transactions with a *Collection such that all
// retrieved immutable descriptors are properly leased and all mutable
// descriptors are handled. The function deals with verifying the two version
// invariant and retrying when it is violated. Callers need not worry that they
// write mutable descriptors multiple times. The call will explicitly wait for
// the leases to drain on old versions of descriptors modified or deleted in the
// transaction; callers do not need to call lease.WaitForOneVersion.
// It also enables using internal executor to run sql queries in a txn manner.
//
// The passed transaction is pre-emptively anchored to the system config key on
// the system tenant.
func (cf *CollectionFactory) TxnWithExecutor(
ctx context.Context,
db *kv.DB,
sd *sessiondata.SessionData,
f func(ctx context.Context, txn *kv.Txn, descriptors *Collection, ie sqlutil.InternalExecutor) error,
) error {
// Waits for descriptors that were modified, skipping
// over ones that had their descriptor wiped.
waitForDescriptors := func(modifiedDescriptors []lease.IDVersion, deletedDescs catalog.DescriptorIDSet) error {
// Wait for a single version on leased descriptors.
for _, ld := range modifiedDescriptors {
waitForNoVersion := deletedDescs.Contains(ld.ID)
retryOpts := retry.Options{
InitialBackoff: time.Millisecond,
Multiplier: 1.5,
MaxBackoff: time.Second,
}
// Detect unpublished ones.
if waitForNoVersion {
err := cf.leaseMgr.WaitForNoVersion(ctx, ld.ID, retryOpts)
if err != nil {
return err
}
} else {
_, err := cf.leaseMgr.WaitForOneVersion(ctx, ld.ID, retryOpts)
if err != nil {
return err
}
}
}
return nil
}
for {
var modifiedDescriptors []lease.IDVersion
var deletedDescs catalog.DescriptorIDSet
var descsCol *Collection
if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
modifiedDescriptors = nil
deletedDescs = catalog.DescriptorIDSet{}
descsCol = cf.NewCollection(ctx, nil /* temporarySchemaProvider */, nil /* monitor */)
defer func() {
descsCol.ReleaseAll(ctx)
}()

ie, commitTxnFn := cf.ieFactoryWithTxn(ctx, sd, &cf.settings.SV, txn, descsCol)
if err := f(ctx, txn, descsCol, ie); err != nil {
return err
}

if err := commitTxnFn(ctx); err != nil {
return err
}

if err := descsCol.ValidateUncommittedDescriptors(ctx, txn); err != nil {
return err
}
modifiedDescriptors = descsCol.GetDescriptorsWithNewVersion()

if err := CheckSpanCountLimit(
ctx, descsCol, cf.spanConfigSplitter, cf.spanConfigLimiter, txn,
); err != nil {
return err
}
retryErr, err := CheckTwoVersionInvariant(
ctx, db.Clock(), ie, descsCol, txn, nil /* onRetryBackoff */)
if retryErr {
return errTwoVersionInvariantViolated
}
deletedDescs = descsCol.deletedDescs
return err
}); errors.Is(err, errTwoVersionInvariantViolated) {
continue
} else {
if err == nil {
err = waitForDescriptors(modifiedDescriptors, deletedDescs)
}
return err
}
}
}

// CheckTwoVersionInvariant checks whether any new schema being modified written
// at a version V has only valid leases at version = V - 1. A transaction retry
// error as well as a boolean is returned whenever the invariant is violated.
Expand Down
91 changes: 91 additions & 0 deletions pkg/sql/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,52 @@ func MakeInternalExecutor(
}
}

// NewInternalExecutorWithTxn creates an Internal Executor with txn related
// information, and also a function that can be called to commit the txn.
// This function should only be used in the implementation of
// descs.CollectionFactory's InternalExecutorFactoryWithTxn.
// TODO (janexing): This function will be soon refactored after we change
// the internal executor infrastructure with a single conn executor for all
// sql statement executions within a txn.
func NewInternalExecutorWithTxn(
s *Server,
sd *sessiondata.SessionData,
txn *kv.Txn,
memMetrics MemoryMetrics,
monitor *mon.BytesMonitor,
descCol *descs.Collection,
schemaChangeJobRecords map[descpb.ID]*jobs.Record,
) (*InternalExecutor, sqlutil.InternalExecutorCommitTxnFunc) {
schemaChangerState := &SchemaChangerState{
mode: sd.NewSchemaChangerMode,
}
ie := InternalExecutor{
s: s,
mon: monitor,
memMetrics: memMetrics,
extraTxnState: &extraTxnState{
txn: txn,
descCollection: descCol,
schemaChangeJobRecords: schemaChangeJobRecords,
schemaChangerState: schemaChangerState,
},
}
ie.s.populateMinimalSessionData(sd)
ie.sessionDataStack = sessiondata.NewStack(sd)

commitTxnFunc := func(ctx context.Context) error {
defer func() {
ie.releaseSchemaChangeJobRecords()
}()
if err := ie.commitTxn(ctx); err != nil {
return err
}
return nil
}

return &ie, commitTxnFunc
}

// MakeInternalExecutorMemMonitor creates and starts memory monitor for an
// InternalExecutor.
func MakeInternalExecutorMemMonitor(
Expand Down Expand Up @@ -966,6 +1012,42 @@ func (ie *InternalExecutor) execInternal(
return r, nil
}

// ReleaseSchemaChangeJobRecords is to release the schema change job records.
func (ie *InternalExecutor) releaseSchemaChangeJobRecords() {
for k := range ie.extraTxnState.schemaChangeJobRecords {
delete(ie.extraTxnState.schemaChangeJobRecords, k)
}
}

// commitTxn is to commit the txn bound to the internal executor.
// It should only be used in CollectionFactory.TxnWithExecutor().
func (ie *InternalExecutor) commitTxn(ctx context.Context) error {
if ie.extraTxnState == nil || ie.extraTxnState.txn == nil {
return errors.New("no txn to commit")
}

var sd *sessiondata.SessionData
if ie.sessionDataStack != nil {
sd = ie.sessionDataStack.Top().Clone()
} else {
sd = ie.s.newSessionData(SessionArgs{})
}

rw := newAsyncIEResultChannel()
stmtBuf := NewStmtBuf()

ex, err := ie.initConnEx(ctx, ie.extraTxnState.txn, rw, sd, stmtBuf, nil /* syncCallback */)
if err != nil {
return errors.Wrap(err, "cannot create conn executor to commit txn")
}
defer ex.close(ctx, externalTxnClose)

if err := ex.commitSQLTransactionInternal(ctx); err != nil {
return err
}
return nil
}

// internalClientComm is an implementation of ClientComm used by the
// InternalExecutor. Result rows are buffered in memory.
type internalClientComm struct {
Expand Down Expand Up @@ -1158,3 +1240,12 @@ func (ief *InternalExecutorFactory) NewInternalExecutor(
ie.SetSessionData(sd)
return &ie
}

// RunWithoutTxn is to create an internal executor without binding to a txn,
// and run the passed function with this internal executor.
func (ief *InternalExecutorFactory) RunWithoutTxn(
ctx context.Context, run func(ctx context.Context, ie sqlutil.InternalExecutor) error,
) error {
ie := ief.NewInternalExecutor(nil /* sessionData */)
return run(ctx, ie)
}
7 changes: 7 additions & 0 deletions pkg/sql/sqlutil/internal_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,9 @@ type InternalExecutorFactory interface {
// NewInternalExecutor constructs a new internal executor.
// TODO (janexing): this should be deprecated soon.
NewInternalExecutor(sd *sessiondata.SessionData) InternalExecutor
// RunWithoutTxn is to create an internal executor without binding to a txn,
// and run the passed function with this internal executor.
RunWithoutTxn(ctx context.Context, run func(ctx context.Context, ie InternalExecutor) error) error
}

// InternalExecFn is the type of functions that operates using an internalExecutor.
Expand All @@ -223,3 +226,7 @@ type InternalExecFn func(ctx context.Context, txn *kv.Txn, ie InternalExecutor)
// passes the fn the exported InternalExecutor instead of the whole unexported
// extendedEvalContenxt, so it can be implemented outside pkg/sql.
type HistoricalInternalExecTxnRunner func(ctx context.Context, fn InternalExecFn) error

// InternalExecutorCommitTxnFunc is to commit the txn associated with an
// internal executor.
type InternalExecutorCommitTxnFunc func(ctx context.Context) error

0 comments on commit dc0fed4

Please sign in to comment.