Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
91567: sql: attempt to reduce duplication in legacy schema changer txn management r=ajwerner a=ZhouXing19

This is part of the effort to deprecate `ieFactory.NewInternalExecutor()`. The internal executor should be bound to the outer txn, if there's one.

Epic: none

Release note: None

Co-authored-by: Andrew Werner <ajwerner@cockroachlabs.com>
  • Loading branch information
craig[bot] and ajwerner committed Nov 9, 2022
2 parents f6147b6 + e03be7b commit 15369db
Showing 2 changed files with 34 additions and 23 deletions.
30 changes: 17 additions & 13 deletions pkg/sql/backfill.go
Original file line number Diff line number Diff line change
@@ -172,13 +172,17 @@ func (sc *SchemaChanger) makeFixedTimestampRunner(readAsOf hlc.Timestamp) histor
func (sc *SchemaChanger) makeFixedTimestampInternalExecRunner(
readAsOf hlc.Timestamp,
) descs.HistoricalInternalExecTxnRunner {
return descs.NewHistoricalInternalExecTxnRunner(readAsOf, func(ctx context.Context, retryable descs.InternalExecFn) error {
return sc.fixedTimestampTxn(ctx, readAsOf, func(
ctx context.Context, txn *kv.Txn, descriptors *descs.Collection,
return descs.NewHistoricalInternalExecTxnRunner(readAsOf, func(
ctx context.Context, retryable descs.InternalExecFn,
) error {
return sc.fixedTimestampTxnWithExecutor(ctx, readAsOf, func(
ctx context.Context,
txn *kv.Txn,
_ *sessiondata.SessionData,
descriptors *descs.Collection,
ie sqlutil.InternalExecutor,
) error {
// We need to re-create the evalCtx since the txn may retry.
ie := sc.ieFactory.NewInternalExecutor(NewFakeSessionData(sc.execCfg.SV()))
return retryable(ctx, txn, ie, nil /* descriptors */)
return retryable(ctx, txn, ie, descriptors)
})
})
}
@@ -188,10 +192,10 @@ func (sc *SchemaChanger) fixedTimestampTxn(
readAsOf hlc.Timestamp,
retryable func(ctx context.Context, txn *kv.Txn, descriptors *descs.Collection) error,
) error {
return sc.txn(ctx, func(ctx context.Context, txn *kv.Txn, descriptors *descs.Collection) error {
if err := txn.SetFixedTimestamp(ctx, readAsOf); err != nil {
return err
}
return sc.fixedTimestampTxnWithExecutor(ctx, readAsOf, func(
ctx context.Context, txn *kv.Txn, _ *sessiondata.SessionData,
descriptors *descs.Collection, _ sqlutil.InternalExecutor,
) error {
return retryable(ctx, txn, descriptors)
})
}
@@ -207,9 +211,9 @@ func (sc *SchemaChanger) fixedTimestampTxnWithExecutor(
ie sqlutil.InternalExecutor,
) error,
) error {
sd := NewFakeSessionData(sc.execCfg.SV())
return sc.txnWithExecutor(ctx, sd, func(
ctx context.Context, txn *kv.Txn, descriptors *descs.Collection, ie sqlutil.InternalExecutor,
return sc.txnWithExecutor(ctx, func(
ctx context.Context, txn *kv.Txn, sd *sessiondata.SessionData,
descriptors *descs.Collection, ie sqlutil.InternalExecutor,
) error {
if err := txn.SetFixedTimestamp(ctx, readAsOf); err != nil {
return err
27 changes: 17 additions & 10 deletions pkg/sql/schema_changer.go
Original file line number Diff line number Diff line change
@@ -2456,27 +2456,34 @@ func (*SchemaChangerTestingKnobs) ModuleTestingKnobs() {}
func (sc *SchemaChanger) txn(
ctx context.Context, f func(context.Context, *kv.Txn, *descs.Collection) error,
) error {
if fn := sc.testingKnobs.RunBeforeDescTxn; fn != nil {
if err := fn(sc.job.ID()); err != nil {
return err
}
}
return sc.execCfg.InternalExecutorFactory.DescsTxn(ctx, sc.db, f)
return sc.txnWithExecutor(ctx, func(
ctx context.Context, txn *kv.Txn, _ *sessiondata.SessionData,
collection *descs.Collection, _ sqlutil.InternalExecutor,
) error {
return f(ctx, txn, collection)
})
}

// txnWithExecutor is to run internal executor within a txn.
func (sc *SchemaChanger) txnWithExecutor(
ctx context.Context,
sd *sessiondata.SessionData,
f func(context.Context, *kv.Txn, *descs.Collection, sqlutil.InternalExecutor) error,
f func(
context.Context, *kv.Txn, *sessiondata.SessionData,
*descs.Collection, sqlutil.InternalExecutor,
) error,
) error {
if fn := sc.testingKnobs.RunBeforeDescTxn; fn != nil {
if err := fn(sc.job.ID()); err != nil {
return err
}
}
return sc.execCfg.InternalExecutorFactory.
DescsTxnWithExecutor(ctx, sc.db, sd, f)
sd := NewFakeSessionData(sc.execCfg.SV())
return sc.execCfg.InternalExecutorFactory.DescsTxnWithExecutor(ctx, sc.db, sd, func(
ctx context.Context, txn *kv.Txn, descriptors *descs.Collection,
ie sqlutil.InternalExecutor,
) error {
return f(ctx, txn, sd, descriptors, ie)
})
}

// createSchemaChangeEvalCtx creates an extendedEvalContext() to be used for backfills.

0 comments on commit 15369db

Please sign in to comment.