Skip to content

Commit

Permalink
sql: use the descs.Collection to access types during distributed flows
Browse files Browse the repository at this point in the history
This commit enables distributed queries to access user defined type
metadata during flow setup via the lease manager, so that accesses to
this metadata is cached and doesn't have to go through k/v on every
access.

This is achieved by giving the `FlowContext` a `descs.Collection` is
used to access the descriptors through the lease manager.

Release note: None
  • Loading branch information
rohany committed Aug 3, 2020
1 parent 65c5770 commit bf9ffe9
Show file tree
Hide file tree
Showing 40 changed files with 489 additions and 319 deletions.
3 changes: 2 additions & 1 deletion pkg/ccl/importccl/exportcsv.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,8 @@ func newCSVWriterProcessor(
input: input,
output: output,
}
if err := c.out.Init(&execinfrapb.PostProcessSpec{}, c.OutputTypes(), flowCtx.NewEvalCtx(), output); err != nil {
semaCtx := tree.MakeSemaContext()
if err := c.out.Init(&execinfrapb.PostProcessSpec{}, c.OutputTypes(), &semaCtx, flowCtx.NewEvalCtx(), output); err != nil {
return nil, err
}
return c, nil
Expand Down
37 changes: 0 additions & 37 deletions pkg/ccl/importccl/import_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (

"github.com/cockroachdb/cockroach/pkg/ccl/storageccl"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
Expand Down Expand Up @@ -120,43 +119,7 @@ func makeInputConverter(
evalCtx *tree.EvalContext,
kvCh chan row.KVBatch,
) (inputConverter, error) {

// installTypeMetadata is a closure that performs the work of installing
// type metadata in all of the tables being imported.
installTypeMetadata := func(evalCtx *tree.EvalContext) error {
for _, table := range spec.Tables {
var colTypes []*types.T
for _, col := range table.Desc.Columns {
colTypes = append(colTypes, col.Type)
}
if err := execinfrapb.HydrateTypeSlice(evalCtx, colTypes); err != nil {
return err
}
}
return nil
}

injectTimeIntoEvalCtx(evalCtx, spec.WalltimeNanos)

if evalCtx.Txn != nil {
// If we have a transaction, then use it.
if err := installTypeMetadata(evalCtx); err != nil {
return nil, err
}
} else if evalCtx.DB != nil {
// Otherwise, open up a new transaction to hydrate type metadata.
// We only perform this logic if evalCtx.DB != nil because there are
// some tests that pass an evalCtx with a nil DB to this function.
// TODO (rohany): Once we lease type descriptors, this should instead
// look into the leased set using the DistSQLTypeResolver.
if err := evalCtx.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
evalCtx.Txn = txn
return installTypeMetadata(evalCtx)
}); err != nil {
return nil, err
}
}

var singleTable *sqlbase.TableDescriptor
var singleTableTargetCols tree.NameList
if len(spec.Tables) == 1 {
Expand Down
24 changes: 21 additions & 3 deletions pkg/ccl/importccl/read_import_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"sync/atomic"
"time"

"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
Expand All @@ -47,9 +48,26 @@ func runImport(
) (*roachpb.BulkOpSummary, error) {
// Used to send ingested import rows to the KV layer.
kvCh := make(chan row.KVBatch, 10)
evalCtx := flowCtx.NewEvalCtx()
evalCtx.DB = flowCtx.Cfg.DB
conv, err := makeInputConverter(ctx, spec, evalCtx, kvCh)

// Install type metadata in all of the import tables. The DB is nil in some
// tests, so check first here.
if flowCtx.Cfg.DB != nil {
if err := flowCtx.Cfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
resolver := flowCtx.TypeResolverFactory.NewTypeResolver(txn)
for _, table := range spec.Tables {
if err := sqlbase.HydrateTypesInTableDescriptor(ctx, table.Desc, resolver); err != nil {
return err
}
}
return nil
}); err != nil {
return nil, err
}
// Release leases on any accessed types now that type metadata is installed.
flowCtx.TypeResolverFactory.Descriptors.ReleaseAll(ctx)
}

conv, err := makeInputConverter(ctx, spec, flowCtx.NewEvalCtx(), kvCh)
if err != nil {
return nil, err
}
Expand Down
16 changes: 10 additions & 6 deletions pkg/sql/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -560,9 +560,13 @@ func (sc *SchemaChanger) validateConstraints(
// need a semaContext set up that can resolve types in order to pretty
// print the check expression back to the user.
evalCtx.Txn = txn
semaCtx := tree.MakeSemaContext()
// Use the DistSQLTypeResolver because we need to resolve types by ID.
semaCtx.TypeResolver = &execinfrapb.DistSQLTypeResolver{EvalContext: &evalCtx.EvalContext}
semaCtx := tree.MakeSemaContext()
collection := descs.NewCollection(sc.leaseMgr, sc.settings)
semaCtx.TypeResolver = descs.NewDistSQLTypeResolver(collection, txn)
// TODO (rohany): When to release this? As of now this is only going to get released
// after the check is validated.
defer func() { collection.ReleaseAll(ctx) }()
switch c.ConstraintType {
case sqlbase.ConstraintToUpdate_CHECK:
if err := validateCheckInTxn(ctx, sc.leaseMgr, &semaCtx, &evalCtx.EvalContext, desc, txn, c.Check.Name); err != nil {
Expand Down Expand Up @@ -1374,7 +1378,7 @@ func runSchemaChangesInTxn(
if doneColumnBackfill || !sqlbase.ColumnNeedsBackfill(m.GetColumn()) {
break
}
if err := columnBackfillInTxn(ctx, planner.Txn(), planner.Descriptors(), planner.EvalContext(), immutDesc, traceKV); err != nil {
if err := columnBackfillInTxn(ctx, planner.Txn(), planner.EvalContext(), planner.SemaCtx(), immutDesc, traceKV); err != nil {
return err
}
doneColumnBackfill = true
Expand Down Expand Up @@ -1435,7 +1439,7 @@ func runSchemaChangesInTxn(
break
}
if err := columnBackfillInTxn(
ctx, planner.Txn(), planner.Descriptors(), planner.EvalContext(), immutDesc, traceKV,
ctx, planner.Txn(), planner.EvalContext(), planner.SemaCtx(), immutDesc, traceKV,
); err != nil {
return err
}
Expand Down Expand Up @@ -1675,8 +1679,8 @@ func validateFkInTxn(
func columnBackfillInTxn(
ctx context.Context,
txn *kv.Txn,
tc *descs.Collection,
evalCtx *tree.EvalContext,
semaCtx *tree.SemaContext,
tableDesc *sqlbase.ImmutableTableDescriptor,
traceKV bool,
) error {
Expand All @@ -1685,7 +1689,7 @@ func columnBackfillInTxn(
return nil
}
var backfiller backfill.ColumnBackfiller
if err := backfiller.Init(ctx, evalCtx, tableDesc); err != nil {
if err := backfiller.InitForLocalUse(ctx, evalCtx, semaCtx, tableDesc); err != nil {
return err
}
sp := tableDesc.PrimaryIndexSpan(evalCtx.Codec)
Expand Down
Loading

0 comments on commit bf9ffe9

Please sign in to comment.