sql: make descs.Collection usage more consistent #64054

Merged
64 changes: 40 additions & 24 deletions pkg/sql/backfill.go
@@ -134,9 +134,11 @@ type historicalTxnRunner func(ctx context.Context, fn scTxnFn) error
 // makeFixedTimestampRunner creates a historicalTxnRunner suitable for use by the helpers.
 func (sc *SchemaChanger) makeFixedTimestampRunner(readAsOf hlc.Timestamp) historicalTxnRunner {
 	runner := func(ctx context.Context, retryable scTxnFn) error {
-		return sc.fixedTimestampTxn(ctx, readAsOf, func(ctx context.Context, txn *kv.Txn) error {
+		return sc.fixedTimestampTxn(ctx, readAsOf, func(
+			ctx context.Context, txn *kv.Txn, descriptors *descs.Collection,
+		) error {
 			// We need to re-create the evalCtx since the txn may retry.
-			evalCtx := createSchemaChangeEvalCtx(ctx, sc.execCfg, readAsOf, sc.ieFactory)
+			evalCtx := createSchemaChangeEvalCtx(ctx, sc.execCfg, readAsOf, sc.ieFactory, descriptors)
 			return retryable(ctx, txn, &evalCtx)
 		})
 	}
@@ -156,9 +158,13 @@ func (sc *SchemaChanger) makeFixedTimestampInternalExecRunner(
 	readAsOf hlc.Timestamp,
 ) HistoricalInternalExecTxnRunner {
 	runner := func(ctx context.Context, retryable InternalExecFn) error {
-		return sc.fixedTimestampTxn(ctx, readAsOf, func(ctx context.Context, txn *kv.Txn) error {
+		return sc.fixedTimestampTxn(ctx, readAsOf, func(
+			ctx context.Context, txn *kv.Txn, descriptors *descs.Collection,
+		) error {
 			// We need to re-create the evalCtx since the txn may retry.
-			ie := createSchemaChangeEvalCtx(ctx, sc.execCfg, readAsOf, sc.ieFactory).InternalExecutor.(*InternalExecutor)
+			ie := createSchemaChangeEvalCtx(
+				ctx, sc.execCfg, readAsOf, sc.ieFactory, descriptors,
+			).InternalExecutor.(*InternalExecutor)
 			return retryable(ctx, txn, ie)
 		})
 	}
@@ -168,11 +174,11 @@ func (sc *SchemaChanger) makeFixedTimestampInternalExecRunner(
 func (sc *SchemaChanger) fixedTimestampTxn(
 	ctx context.Context,
 	readAsOf hlc.Timestamp,
-	retryable func(ctx context.Context, txn *kv.Txn) error,
+	retryable func(ctx context.Context, txn *kv.Txn, descriptors *descs.Collection) error,
 ) error {
-	return sc.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
+	return sc.txn(ctx, func(ctx context.Context, txn *kv.Txn, descriptors *descs.Collection) error {
 		txn.SetFixedTimestamp(ctx, readAsOf)
-		return retryable(ctx, txn)
+		return retryable(ctx, txn, descriptors)
 	})
 }

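Note: this diff assumes a (*SchemaChanger).txn helper that supplies a per-attempt *descs.Collection. A minimal sketch of what such a helper plausibly looks like, assuming it delegates to the descs.Txn helper that pkg/sql/index_backfiller.go below calls directly:

// Hypothetical sketch, not part of this diff: run fn in a transaction and
// hand it a Collection whose leases descs.Txn releases (and rebuilds on
// retry) on the caller's behalf.
func (sc *SchemaChanger) txn(
	ctx context.Context,
	fn func(context.Context, *kv.Txn, *descs.Collection) error,
) error {
	return descs.Txn(ctx, sc.settings, sc.leaseMgr, sc.execCfg.InternalExecutor, sc.db, fn)
}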
@@ -672,8 +678,12 @@ func (sc *SchemaChanger) validateConstraints(
 	readAsOf := sc.clock.Now()
 	var tableDesc catalog.TableDescriptor
 
-	if err := sc.fixedTimestampTxn(ctx, readAsOf, func(ctx context.Context, txn *kv.Txn) error {
-		tableDesc, err = catalogkv.MustGetTableDescByID(ctx, txn, sc.execCfg.Codec, sc.descID)
+	if err := sc.fixedTimestampTxn(ctx, readAsOf, func(
+		ctx context.Context, txn *kv.Txn, descriptors *descs.Collection,
+	) error {
+		flags := tree.ObjectLookupFlagsWithRequired()
+		flags.AvoidCached = true
+		tableDesc, err = descriptors.GetImmutableTableByID(ctx, txn, sc.descID, flags)
 		return err
 	}); err != nil {
 		return err
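The replaced catalogkv.MustGetTableDescByID was an uncached KV read; the Collection-based lookup reproduces those semantics explicitly. Roughly, assuming GetImmutableTableByID honors both flags:

// Required: a missing descriptor is an error rather than a nil result.
// AvoidCached: read the descriptor via KV at the txn's fixed timestamp
// instead of trusting a leased, possibly different cached version.
flags := tree.ObjectLookupFlagsWithRequired()
flags.AvoidCached = true
tableDesc, err := descriptors.GetImmutableTableByID(ctx, txn, sc.descID, flags)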
@@ -707,7 +717,7 @@ func (sc *SchemaChanger) validateConstraints(
 			evalCtx.Txn = txn
 			// Use the DistSQLTypeResolver because we need to resolve types by ID.
 			semaCtx := tree.MakeSemaContext()
-			collection := descs.NewCollection(sc.settings, sc.leaseMgr, nil /* hydratedTables */)
+			collection := evalCtx.Descs
 			semaCtx.TypeResolver = descs.NewDistSQLTypeResolver(collection, txn)
 			// TODO (rohany): When to release this? As of now this is only going to get released
 			// after the check is validated.
@@ -1019,7 +1029,9 @@ func (sc *SchemaChanger) distIndexBackfill(
 	// cheap scan.
 	const pageSize = 10000
 	noop := func(_ []kv.KeyValue) error { return nil }
-	if err := sc.fixedTimestampTxn(ctx, writeAsOf, func(ctx context.Context, txn *kv.Txn) error {
+	if err := sc.fixedTimestampTxn(ctx, writeAsOf, func(
+		ctx context.Context, txn *kv.Txn, _ *descs.Collection,
+	) error {
 		for _, span := range targetSpans {
 			// TODO(dt): a Count() request would be nice here if the target isn't
 			// empty, since we don't need to drag all the results back just to
@@ -1056,8 +1068,10 @@ func (sc *SchemaChanger) distIndexBackfill(
 	var planCtx *PlanningCtx
 	// The txn is used to fetch a tableDesc, partition the spans and set the
 	// evalCtx ts all of which is during planning of the DistSQL flow.
-	if err := sc.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
-		tc := descs.NewCollection(sc.settings, sc.leaseMgr, nil /* hydratedTables */)
+	if err := sc.txn(ctx, func(
+		ctx context.Context, txn *kv.Txn, descriptors *descs.Collection,
+	) error {
+
 		// It is okay to release the lease on the descriptor before running the
 		// index backfill flow because any schema change that would invalidate the
 		// index being backfilled, would be queued behind the backfill in the
@@ -1069,12 +1083,11 @@
 		// clear what this buys us in terms of checking the descriptors validity.
 		// Thus, in favor of simpler code and no correctness concerns we release
 		// the lease once the flow is planned.
-		defer tc.ReleaseAll(ctx)
-		tableDesc, err := sc.getTableVersion(ctx, txn, tc, version)
+		tableDesc, err := sc.getTableVersion(ctx, txn, descriptors, version)
 		if err != nil {
 			return err
 		}
-		evalCtx = createSchemaChangeEvalCtx(ctx, sc.execCfg, txn.ReadTimestamp(), sc.ieFactory)
+		evalCtx = createSchemaChangeEvalCtx(ctx, sc.execCfg, txn.ReadTimestamp(), sc.ieFactory, descriptors)
 		planCtx = sc.distSQLPlanner.NewPlanningCtx(ctx, &evalCtx, nil /* planner */, txn,
 			true /* distribute */)
 		indexBatchSize := indexBackfillBatchSize.Get(&sc.execCfg.Settings.SV)
@@ -1303,7 +1316,9 @@ func (sc *SchemaChanger) distBackfill(
 	// may not commit. Instead write the updated value for todoSpans to this
 	// variable and assign to todoSpans after committing.
 	var updatedTodoSpans []roachpb.Span
-	if err := sc.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
+	if err := sc.txn(ctx, func(
+		ctx context.Context, txn *kv.Txn, descriptors *descs.Collection,
+	) error {
 		updatedTodoSpans = todoSpans
 		// Report schema change progress. We define progress at this point
 		// as the fraction of fully-backfilled ranges of the primary index of
@@ -1328,10 +1343,7 @@
 			}
 		}
 
-		tc := descs.NewCollection(sc.settings, sc.leaseMgr, nil /* hydratedTables */)
-		// Use a leased table descriptor for the backfill.
-		defer tc.ReleaseAll(ctx)
-		tableDesc, err := sc.getTableVersion(ctx, txn, tc, version)
+		tableDesc, err := sc.getTableVersion(ctx, txn, descriptors, version)
 		if err != nil {
 			return err
 		}
@@ -1343,7 +1355,7 @@
 			return nil
 		}
 		cbw := MetadataCallbackWriter{rowResultWriter: &errOnlyResultWriter{}, fn: metaFn}
-		evalCtx := createSchemaChangeEvalCtx(ctx, sc.execCfg, txn.ReadTimestamp(), sc.ieFactory)
+		evalCtx := createSchemaChangeEvalCtx(ctx, sc.execCfg, txn.ReadTimestamp(), sc.ieFactory, descriptors)
 		recv := MakeDistSQLReceiver(
 			ctx,
 			&cbw,
@@ -1465,8 +1477,12 @@ func (sc *SchemaChanger) validateIndexes(ctx context.Context) error {
 
 	readAsOf := sc.clock.Now()
 	var tableDesc catalog.TableDescriptor
-	if err := sc.fixedTimestampTxn(ctx, readAsOf, func(ctx context.Context, txn *kv.Txn) (err error) {
-		tableDesc, err = catalogkv.MustGetTableDescByID(ctx, txn, sc.execCfg.Codec, sc.descID)
+	if err := sc.fixedTimestampTxn(ctx, readAsOf, func(
+		ctx context.Context, txn *kv.Txn, descriptors *descs.Collection,
+	) (err error) {
+		flags := tree.ObjectLookupFlagsWithRequired()
+		flags.AvoidCached = true
+		tableDesc, err = descriptors.GetImmutableTableByID(ctx, txn, sc.descID, flags)
 		return err
 	}); err != nil {
 		return err
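Taken together, the backfill.go changes mean every historicalTxnRunner closure now sees a transaction-scoped Collection through the rebuilt evalCtx. A hypothetical caller sketch (validate and the scTxnFn signature are inferred from the surrounding code, not part of this diff):

runner := sc.makeFixedTimestampRunner(readAsOf)
if err := runner(ctx, func(
	ctx context.Context, txn *kv.Txn, evalCtx *extendedEvalContext,
) error {
	// evalCtx.Descs is assumed to be the Collection bound to this attempt,
	// so this read stays consistent with the fixed timestamp set above.
	flags := tree.ObjectLookupFlagsWithRequired()
	flags.AvoidCached = true
	desc, err := evalCtx.Descs.GetImmutableTableByID(ctx, txn, sc.descID, flags)
	if err != nil {
		return err
	}
	return validate(ctx, desc) // placeholder for the actual check
}); err != nil {
	return err
}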
34 changes: 21 additions & 13 deletions pkg/sql/index_backfiller.go
@@ -17,6 +17,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
+	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
 	"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
 	"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scexec"
@@ -127,20 +128,27 @@ func (ib *IndexBackfillPlanner) plan(
 	var evalCtx extendedEvalContext
 	var planCtx *PlanningCtx
 	td := tabledesc.NewBuilder(tableDesc.TableDesc()).BuildExistingMutableTable()
-	if err := ib.execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
-		evalCtx = createSchemaChangeEvalCtx(ctx, ib.execCfg, nowTimestamp, ib.ieFactory)
-		planCtx = ib.execCfg.DistSQLPlanner.NewPlanningCtx(ctx, &evalCtx, nil /* planner */, txn,
-			true /* distribute */)
-		// TODO(ajwerner): Adopt util.ConstantWithMetamorphicTestRange for the
-		// batch size. Also plumb in a testing knob.
-		chunkSize := indexBackfillBatchSize.Get(&ib.execCfg.Settings.SV)
-		spec, err := initIndexBackfillerSpec(*td.TableDesc(), writeAsOf, readAsOf, chunkSize, indexesToBackfill)
-		if err != nil {
-			return err
-		}
-		p, err = ib.execCfg.DistSQLPlanner.createBackfillerPhysicalPlan(planCtx, spec, sourceSpans)
-		return err
-	}); err != nil {
+	if err := descs.Txn(ctx,
+		ib.execCfg.Settings,
+		ib.execCfg.LeaseManager,
+		ib.execCfg.InternalExecutor,
+		ib.execCfg.DB,
+		func(
+			ctx context.Context, txn *kv.Txn, descriptors *descs.Collection,
+		) error {
+			evalCtx = createSchemaChangeEvalCtx(ctx, ib.execCfg, nowTimestamp, ib.ieFactory, descriptors)
+			planCtx = ib.execCfg.DistSQLPlanner.NewPlanningCtx(ctx, &evalCtx, nil /* planner */, txn,
+				true /* distribute */)
+			// TODO(ajwerner): Adopt util.ConstantWithMetamorphicTestRange for the
+			// batch size. Also plumb in a testing knob.
+			chunkSize := indexBackfillBatchSize.Get(&ib.execCfg.Settings.SV)
+			spec, err := initIndexBackfillerSpec(*td.TableDesc(), writeAsOf, readAsOf, chunkSize, indexesToBackfill)
+			if err != nil {
+				return err
+			}
+			p, err = ib.execCfg.DistSQLPlanner.createBackfillerPhysicalPlan(planCtx, spec, sourceSpans)
+			return err
+		}); err != nil {
 		return nil, err
 	}

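For code without a SchemaChanger at hand, descs.Txn (adopted above) is the general entry point. A minimal standalone sketch, where settings, leaseMgr, ie, db, and tableID stand in for the caller's dependencies:

// Each attempt is assumed to get a fresh *descs.Collection whose leases
// the helper releases when the closure returns, retrying the closure on
// retryable errors; no state should leak across attempts.
err := descs.Txn(ctx, settings, leaseMgr, ie, db, func(
	ctx context.Context, txn *kv.Txn, descriptors *descs.Collection,
) error {
	desc, err := descriptors.GetImmutableTableByID(
		ctx, txn, tableID, tree.ObjectLookupFlagsWithRequired())
	if err != nil {
		return err
	}
	_ = desc // use the leased descriptor only while the txn is open
	return nil
})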
20 changes: 16 additions & 4 deletions pkg/sql/schema_changer.go
@@ -532,8 +532,16 @@ func (sc *SchemaChanger) notFirstInLine(ctx context.Context, desc catalog.Descriptor)
 func (sc *SchemaChanger) getTargetDescriptor(ctx context.Context) (catalog.Descriptor, error) {
 	// Retrieve the descriptor that is being changed.
 	var desc catalog.Descriptor
-	if err := sc.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) {
-		desc, err = catalogkv.MustGetDescriptorByID(ctx, txn, sc.execCfg.Codec, sc.descID)
+	if err := sc.txn(ctx, func(
+		ctx context.Context, txn *kv.Txn, descriptors *descs.Collection,
+	) (err error) {
+		flags := tree.CommonLookupFlags{
+			AvoidCached:    true,
+			Required:       true,
+			IncludeOffline: true,
+			IncludeDropped: true,
+		}
+		desc, err = descriptors.GetImmutableDescriptorByID(ctx, txn, sc.descID, flags)
 		return err
 	}); err != nil {
 		return nil, err
@@ -776,8 +784,10 @@ func (sc *SchemaChanger) handlePermanentSchemaChangeError(
 
 // initialize the job running status.
 func (sc *SchemaChanger) initJobRunningStatus(ctx context.Context) error {
-	return sc.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
-		desc, err := catalogkv.MustGetTableDescByID(ctx, txn, sc.execCfg.Codec, sc.descID)
+	return sc.txn(ctx, func(ctx context.Context, txn *kv.Txn, descriptors *descs.Collection) error {
+		flags := tree.ObjectLookupFlagsWithRequired()
+		flags.AvoidCached = true
+		desc, err := descriptors.GetImmutableTableByID(ctx, txn, sc.descID, flags)
 		if err != nil {
 			return err
 		}
@@ -2022,6 +2032,7 @@ func createSchemaChangeEvalCtx(
 	execCfg *ExecutorConfig,
 	ts hlc.Timestamp,
 	ieFactory sqlutil.SessionBoundInternalExecutorFactory,
+	descriptors *descs.Collection,
 ) extendedEvalContext {
 
 	sd := NewFakeSessionData()
@@ -2032,6 +2043,7 @@
 		// other fields are used.
 		Tracing: &SessionTracing{},
 		ExecCfg: execCfg,
+		Descs:   descriptors,
 		EvalContext: tree.EvalContext{
 			SessionData: sd,
 			InternalExecutor: ieFactory(ctx, sd),
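With the new descriptors parameter stored on extendedEvalContext.Descs, downstream code can share the transaction's Collection instead of building a throwaway one, as the validateConstraints hunk above now does:

// Before (sketch): a second Collection, separately leased and released.
//   collection := descs.NewCollection(sc.settings, sc.leaseMgr, nil /* hydratedTables */)
// After: reuse the Collection carried by the eval context, so DistSQL type
// resolution sees exactly the descriptors the surrounding txn sees.
collection := evalCtx.Descs
semaCtx := tree.MakeSemaContext()
semaCtx.TypeResolver = descs.NewDistSQLTypeResolver(collection, txn)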