Skip to content

Commit

Permalink
colfetcher: produce batches only of needed columns
Browse files Browse the repository at this point in the history
This commit updates the cFetcher to operate only on the needed columns.
Previously, it would get all columns present in the whole table and
create `coldata.Batch`es with a vector for each of those columns, even
when only a subset of the columns is needed (or even available, if the
index is not covering).

For example, imagine we have a table like
```
t (a INT PRIMARY KEY, b INT, c INT, d INT, INDEX b_idx (b))
```
and we have a query like `SELECT b FROM t@b_idx`. Previously, we would
create a batch with 4 `int64` columns with only 1 (for column `b`) being
actually populated and all others marked as "not needed" and set to all
NULL values.

This is suboptimal, and this commit refactors things so that a batch
with only a single vector is created. This is achieved in two steps:
- first, we populate the slice of column descriptors that are accessible
from the index. These are all columns of the table for covering indexes.
For non-covering secondary indexes, we only keep the columns present in
the index.
- next, we examine the set of columns that need to be fetched and prune
away all columns that are not needed.

Note that we are always careful to update the post-processing spec
accordingly so that the spec always refers to the correct new ordinals.

It's worth pointing out that since we're modifying the spec directly, we
had to introduce some special logic to keep the original state of the
`PostProcessSpec` in order for the flow diagrams to refer to the
original columns.

Release note: None
  • Loading branch information
yuzefovich committed Oct 27, 2021
1 parent 61f736a commit 8e40f1c
Show file tree
Hide file tree
Showing 23 changed files with 781 additions and 399 deletions.
2 changes: 1 addition & 1 deletion pkg/sql/colexec/colbuilder/execplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -760,7 +760,7 @@ func NewColOperator(
estimatedRowCount := spec.EstimatedRowCount
scanOp, err := colfetcher.NewColBatchScan(
ctx, colmem.NewAllocator(ctx, cFetcherMemAcc, factory), kvFetcherMemAcc,
flowCtx, evalCtx, core.TableReader, post, estimatedRowCount,
flowCtx, evalCtx, args.ExprHelper, core.TableReader, post, estimatedRowCount,
)
if err != nil {
return r, err
Expand Down
7 changes: 5 additions & 2 deletions pkg/sql/colexec/colexecspan/span_assembler.eg.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 5 additions & 2 deletions pkg/sql/colexec/colexecspan/span_assembler_tmpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,19 @@ import (

// NewColSpanAssembler returns a ColSpanAssembler operator that is able to
// generate lookup spans from input batches.
// - neededColOrdsInWholeTable is a set containing the ordinals of all columns
// that need to be fetched. These ordinals are based on the schema of the whole
// table rather than only among the needed columns.
func NewColSpanAssembler(
codec keys.SQLCodec,
allocator *colmem.Allocator,
table catalog.TableDescriptor,
index catalog.Index,
inputTypes []*types.T,
neededCols util.FastIntSet,
neededColOrdsInWholeTable util.FastIntSet,
) ColSpanAssembler {
base := spanAssemblerPool.Get().(*spanAssemblerBase)
base.colFamStartKeys, base.colFamEndKeys = getColFamilyEncodings(neededCols, table, index)
base.colFamStartKeys, base.colFamEndKeys = getColFamilyEncodings(neededColOrdsInWholeTable, table, index)
keyPrefix := rowenc.MakeIndexKeyPrefix(codec, table, index.GetID())
base.scratchKey = append(base.scratchKey[:0], keyPrefix...)
base.prefixLength = len(keyPrefix)
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colexec/hash_aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func NewHashAggregator(
aggFnsAlloc: aggFnsAlloc,
hashAlloc: aggBucketAlloc{allocator: args.Allocator},
}
hashAgg.accountingHelper.Init(outputUnlimitedAllocator, args.OutputTypes, nil /* notNeededVecIdxs */)
hashAgg.accountingHelper.Init(outputUnlimitedAllocator, args.OutputTypes)
hashAgg.bufferingState.tuples = colexecutils.NewAppendOnlyBufferedBatch(args.Allocator, args.InputTypes, nil /* colsToStore */)
hashAgg.datumAlloc.AllocSize = hashAggregatorAllocSize
hashAgg.aggHelper = newAggregatorHelper(args, &hashAgg.datumAlloc, true /* isHashAgg */, hashAggregatorMaxBuffered)
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colexec/ordered_synchronizer.eg.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/sql/colexec/ordered_synchronizer_tmpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func NewOrderedSynchronizer(
typs: typs,
canonicalTypeFamilies: typeconv.ToCanonicalTypeFamilies(typs),
}
os.accountingHelper.Init(allocator, typs, nil /* notNeededVecIdxs */)
os.accountingHelper.Init(allocator, typs)
return os
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/colfetcher/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go_library(
name = "colfetcher",
srcs = [
"cfetcher.go",
"cfetcher_setup.go",
"colbatch_scan.go",
"index_join.go",
":gen-fetcherstate-stringer", # keep
Expand All @@ -31,6 +32,7 @@ go_library(
"//pkg/sql/execinfra",
"//pkg/sql/execinfrapb",
"//pkg/sql/memsize",
"//pkg/sql/physicalplan",
"//pkg/sql/row",
"//pkg/sql/rowenc",
"//pkg/sql/rowinfra",
Expand Down
Loading

0 comments on commit 8e40f1c

Please sign in to comment.