Skip to content

Commit

Permalink
sql: make descs.Collection usage more consistent
Browse files Browse the repository at this point in the history
This commit removes undesirable uses of catalogkv.

Release note: None
  • Loading branch information
ajwerner committed Apr 22, 2021
1 parent eb44d97 commit 1a0c8de
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 42 deletions.
64 changes: 40 additions & 24 deletions pkg/sql/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,11 @@ type historicalTxnRunner func(ctx context.Context, fn scTxnFn) error
// makeFixedTimestampRunner creates a historicalTxnRunner suitable for use by the helpers.
func (sc *SchemaChanger) makeFixedTimestampRunner(readAsOf hlc.Timestamp) historicalTxnRunner {
runner := func(ctx context.Context, retryable scTxnFn) error {
return sc.fixedTimestampTxn(ctx, readAsOf, func(ctx context.Context, txn *kv.Txn) error {
return sc.fixedTimestampTxn(ctx, readAsOf, func(
ctx context.Context, txn *kv.Txn, descriptors *descs.Collection,
) error {
// We need to re-create the evalCtx since the txn may retry.
evalCtx := createSchemaChangeEvalCtx(ctx, sc.execCfg, readAsOf, sc.ieFactory)
evalCtx := createSchemaChangeEvalCtx(ctx, sc.execCfg, readAsOf, sc.ieFactory, descriptors)
return retryable(ctx, txn, &evalCtx)
})
}
Expand All @@ -156,9 +158,13 @@ func (sc *SchemaChanger) makeFixedTimestampInternalExecRunner(
readAsOf hlc.Timestamp,
) HistoricalInternalExecTxnRunner {
runner := func(ctx context.Context, retryable InternalExecFn) error {
return sc.fixedTimestampTxn(ctx, readAsOf, func(ctx context.Context, txn *kv.Txn) error {
return sc.fixedTimestampTxn(ctx, readAsOf, func(
ctx context.Context, txn *kv.Txn, descriptors *descs.Collection,
) error {
// We need to re-create the evalCtx since the txn may retry.
ie := createSchemaChangeEvalCtx(ctx, sc.execCfg, readAsOf, sc.ieFactory).InternalExecutor.(*InternalExecutor)
ie := createSchemaChangeEvalCtx(
ctx, sc.execCfg, readAsOf, sc.ieFactory, descriptors,
).InternalExecutor.(*InternalExecutor)
return retryable(ctx, txn, ie)
})
}
Expand All @@ -168,11 +174,11 @@ func (sc *SchemaChanger) makeFixedTimestampInternalExecRunner(
func (sc *SchemaChanger) fixedTimestampTxn(
ctx context.Context,
readAsOf hlc.Timestamp,
retryable func(ctx context.Context, txn *kv.Txn) error,
retryable func(ctx context.Context, txn *kv.Txn, descriptors *descs.Collection) error,
) error {
return sc.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
return sc.txn(ctx, func(ctx context.Context, txn *kv.Txn, descriptors *descs.Collection) error {
txn.SetFixedTimestamp(ctx, readAsOf)
return retryable(ctx, txn)
return retryable(ctx, txn, descriptors)
})
}

Expand Down Expand Up @@ -672,8 +678,12 @@ func (sc *SchemaChanger) validateConstraints(
readAsOf := sc.clock.Now()
var tableDesc catalog.TableDescriptor

if err := sc.fixedTimestampTxn(ctx, readAsOf, func(ctx context.Context, txn *kv.Txn) error {
tableDesc, err = catalogkv.MustGetTableDescByID(ctx, txn, sc.execCfg.Codec, sc.descID)
if err := sc.fixedTimestampTxn(ctx, readAsOf, func(
ctx context.Context, txn *kv.Txn, descriptors *descs.Collection,
) error {
flags := tree.ObjectLookupFlagsWithRequired()
flags.AvoidCached = true
tableDesc, err = descriptors.GetImmutableTableByID(ctx, txn, sc.descID, flags)
return err
}); err != nil {
return err
Expand Down Expand Up @@ -707,7 +717,7 @@ func (sc *SchemaChanger) validateConstraints(
evalCtx.Txn = txn
// Use the DistSQLTypeResolver because we need to resolve types by ID.
semaCtx := tree.MakeSemaContext()
collection := descs.NewCollection(sc.settings, sc.leaseMgr, nil /* hydratedTables */)
collection := evalCtx.Descs
semaCtx.TypeResolver = descs.NewDistSQLTypeResolver(collection, txn)
// TODO (rohany): When to release this? As of now this is only going to get released
// after the check is validated.
Expand Down Expand Up @@ -993,7 +1003,9 @@ func (sc *SchemaChanger) distIndexBackfill(
// cheap scan.
const pageSize = 10000
noop := func(_ []kv.KeyValue) error { return nil }
if err := sc.fixedTimestampTxn(ctx, readAsOf, func(ctx context.Context, txn *kv.Txn) error {
if err := sc.fixedTimestampTxn(ctx, readAsOf, func(
ctx context.Context, txn *kv.Txn, _ *descs.Collection,
) error {
for _, span := range targetSpans {
// TODO(dt): a Count() request would be nice here if the target isn't
// empty, since we don't need to drag all the results back just to
Expand Down Expand Up @@ -1030,8 +1042,10 @@ func (sc *SchemaChanger) distIndexBackfill(
var planCtx *PlanningCtx
// The txn is used to fetch a tableDesc, partition the spans and set the
// evalCtx ts all of which is during planning of the DistSQL flow.
if err := sc.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
tc := descs.NewCollection(sc.settings, sc.leaseMgr, nil /* hydratedTables */)
if err := sc.txn(ctx, func(
ctx context.Context, txn *kv.Txn, descriptors *descs.Collection,
) error {

// It is okay to release the lease on the descriptor before running the
// index backfill flow because any schema change that would invalidate the
// index being backfilled, would be queued behind the backfill in the
Expand All @@ -1043,12 +1057,11 @@ func (sc *SchemaChanger) distIndexBackfill(
// clear what this buys us in terms of checking the descriptors validity.
// Thus, in favor of simpler code and no correctness concerns we release
// the lease once the flow is planned.
defer tc.ReleaseAll(ctx)
tableDesc, err := sc.getTableVersion(ctx, txn, tc, version)
tableDesc, err := sc.getTableVersion(ctx, txn, descriptors, version)
if err != nil {
return err
}
evalCtx = createSchemaChangeEvalCtx(ctx, sc.execCfg, txn.ReadTimestamp(), sc.ieFactory)
evalCtx = createSchemaChangeEvalCtx(ctx, sc.execCfg, txn.ReadTimestamp(), sc.ieFactory, descriptors)
planCtx = sc.distSQLPlanner.NewPlanningCtx(ctx, &evalCtx, nil /* planner */, txn,
true /* distribute */)
indexBatchSize := indexBackfillBatchSize.Get(&sc.execCfg.Settings.SV)
Expand Down Expand Up @@ -1277,7 +1290,9 @@ func (sc *SchemaChanger) distBackfill(
// may not commit. Instead write the updated value for todoSpans to this
// variable and assign to todoSpans after committing.
var updatedTodoSpans []roachpb.Span
if err := sc.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
if err := sc.txn(ctx, func(
ctx context.Context, txn *kv.Txn, descriptors *descs.Collection,
) error {
updatedTodoSpans = todoSpans
// Report schema change progress. We define progress at this point
// as the fraction of fully-backfilled ranges of the primary index of
Expand All @@ -1302,10 +1317,7 @@ func (sc *SchemaChanger) distBackfill(
}
}

tc := descs.NewCollection(sc.settings, sc.leaseMgr, nil /* hydratedTables */)
// Use a leased table descriptor for the backfill.
defer tc.ReleaseAll(ctx)
tableDesc, err := sc.getTableVersion(ctx, txn, tc, version)
tableDesc, err := sc.getTableVersion(ctx, txn, descriptors, version)
if err != nil {
return err
}
Expand All @@ -1317,7 +1329,7 @@ func (sc *SchemaChanger) distBackfill(
return nil
}
cbw := MetadataCallbackWriter{rowResultWriter: &errOnlyResultWriter{}, fn: metaFn}
evalCtx := createSchemaChangeEvalCtx(ctx, sc.execCfg, txn.ReadTimestamp(), sc.ieFactory)
evalCtx := createSchemaChangeEvalCtx(ctx, sc.execCfg, txn.ReadTimestamp(), sc.ieFactory, descriptors)
recv := MakeDistSQLReceiver(
ctx,
&cbw,
Expand Down Expand Up @@ -1439,8 +1451,12 @@ func (sc *SchemaChanger) validateIndexes(ctx context.Context) error {

readAsOf := sc.clock.Now()
var tableDesc catalog.TableDescriptor
if err := sc.fixedTimestampTxn(ctx, readAsOf, func(ctx context.Context, txn *kv.Txn) (err error) {
tableDesc, err = catalogkv.MustGetTableDescByID(ctx, txn, sc.execCfg.Codec, sc.descID)
if err := sc.fixedTimestampTxn(ctx, readAsOf, func(
ctx context.Context, txn *kv.Txn, descriptors *descs.Collection,
) (err error) {
flags := tree.ObjectLookupFlagsWithRequired()
flags.AvoidCached = true
tableDesc, err = descriptors.GetImmutableTableByID(ctx, txn, sc.descID, flags)
return err
}); err != nil {
return err
Expand Down
34 changes: 21 additions & 13 deletions pkg/sql/index_backfiller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scexec"
Expand Down Expand Up @@ -124,20 +125,27 @@ func (ib *IndexBackfillPlanner) plan(
var evalCtx extendedEvalContext
var planCtx *PlanningCtx
td := tabledesc.NewBuilder(tableDesc.TableDesc()).BuildExistingMutableTable()
if err := ib.execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
evalCtx = createSchemaChangeEvalCtx(ctx, ib.execCfg, nowTimestamp, ib.ieFactory)
planCtx = ib.execCfg.DistSQLPlanner.NewPlanningCtx(ctx, &evalCtx, nil /* planner */, txn,
true /* distribute */)
// TODO(ajwerner): Adopt util.ConstantWithMetamorphicTestRange for the
// batch size. Also plumb in a testing knob.
chunkSize := indexBackfillBatchSize.Get(&ib.execCfg.Settings.SV)
spec, err := initIndexBackfillerSpec(*td.TableDesc(), readAsOf, chunkSize, indexesToBackfill)
if err != nil {
if err := descs.Txn(ctx,
ib.execCfg.Settings,
ib.execCfg.LeaseManager,
ib.execCfg.InternalExecutor,
ib.execCfg.DB,
func(
ctx context.Context, txn *kv.Txn, descriptors *descs.Collection,
) error {
evalCtx = createSchemaChangeEvalCtx(ctx, ib.execCfg, nowTimestamp, ib.ieFactory, descriptors)
planCtx = ib.execCfg.DistSQLPlanner.NewPlanningCtx(ctx, &evalCtx, nil /* planner */, txn,
true /* distribute */)
// TODO(ajwerner): Adopt util.ConstantWithMetamorphicTestRange for the
// batch size. Also plumb in a testing knob.
chunkSize := indexBackfillBatchSize.Get(&ib.execCfg.Settings.SV)
spec, err := initIndexBackfillerSpec(*td.TableDesc(), readAsOf, chunkSize, indexesToBackfill)
if err != nil {
return err
}
p, err = ib.execCfg.DistSQLPlanner.createBackfillerPhysicalPlan(planCtx, spec, sourceSpans)
return err
}
p, err = ib.execCfg.DistSQLPlanner.createBackfillerPhysicalPlan(planCtx, spec, sourceSpans)
return err
}); err != nil {
}); err != nil {
return nil, err
}

Expand Down
20 changes: 16 additions & 4 deletions pkg/sql/schema_changer.go
Original file line number Diff line number Diff line change
Expand Up @@ -532,8 +532,16 @@ func (sc *SchemaChanger) notFirstInLine(ctx context.Context, desc catalog.Descri
func (sc *SchemaChanger) getTargetDescriptor(ctx context.Context) (catalog.Descriptor, error) {
// Retrieve the descriptor that is being changed.
var desc catalog.Descriptor
if err := sc.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) {
desc, err = catalogkv.MustGetDescriptorByID(ctx, txn, sc.execCfg.Codec, sc.descID)
if err := sc.txn(ctx, func(
ctx context.Context, txn *kv.Txn, descriptors *descs.Collection,
) (err error) {
flags := tree.CommonLookupFlags{
AvoidCached: true,
Required: true,
IncludeOffline: true,
IncludeDropped: true,
}
desc, err = descriptors.GetImmutableDescriptorByID(ctx, txn, sc.descID, flags)
return err
}); err != nil {
return nil, err
Expand Down Expand Up @@ -776,8 +784,10 @@ func (sc *SchemaChanger) handlePermanentSchemaChangeError(

// initialize the job running status.
func (sc *SchemaChanger) initJobRunningStatus(ctx context.Context) error {
return sc.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
desc, err := catalogkv.MustGetTableDescByID(ctx, txn, sc.execCfg.Codec, sc.descID)
return sc.txn(ctx, func(ctx context.Context, txn *kv.Txn, descriptors *descs.Collection) error {
flags := tree.ObjectLookupFlagsWithRequired()
flags.AvoidCached = true
desc, err := descriptors.GetImmutableTableByID(ctx, txn, sc.descID, flags)
if err != nil {
return err
}
Expand Down Expand Up @@ -2022,6 +2032,7 @@ func createSchemaChangeEvalCtx(
execCfg *ExecutorConfig,
ts hlc.Timestamp,
ieFactory sqlutil.SessionBoundInternalExecutorFactory,
descriptors *descs.Collection,
) extendedEvalContext {

sd := NewFakeSessionData()
Expand All @@ -2032,6 +2043,7 @@ func createSchemaChangeEvalCtx(
// other fields are used.
Tracing: &SessionTracing{},
ExecCfg: execCfg,
Descs: descriptors,
EvalContext: tree.EvalContext{
SessionData: sd,
InternalExecutor: ieFactory(ctx, sd),
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/schema_changer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6339,7 +6339,7 @@ func TestDropTableWhileSchemaChangeReverting(t *testing.T) {
params, _ := tests.CreateTestServerParams()
params.Knobs = base.TestingKnobs{
SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{
RunBeforeOnFailOrCancel: func(_ jobspb.JobID) error {
RunBeforeOnFailOrCancel: func(id jobspb.JobID) error {
close(beforeOnFailOrCancelNotification)
<-continueNotification
// Return a retry error, so that we can be sure to test the path where
Expand Down

0 comments on commit 1a0c8de

Please sign in to comment.