Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rowexec: optimize the inverted filterer #63196

Merged
merged 1 commit into from
Apr 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion pkg/sql/rowexec/inverted_expr_evaluator.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ type batchedInvertedExprEvaluator struct {
nonInvertedPrefixes []roachpb.Key
// Spans here are in sorted order and non-overlapping.
fragmentedSpans []invertedSpanRoutingInfo
// The routing index computed by prepareAddIndexRow
// The routing index computed by prepareAddIndexRow.
routingIndex int

// Temporary state used during initialization.
Expand Down Expand Up @@ -514,10 +514,24 @@ func (b *batchedInvertedExprEvaluator) prepareAddIndexRow(
if encFull != nil {
routingEnc = encFull
}
// Find the first span that comes after the encoded routing value.
i := sort.Search(len(b.fragmentedSpans), func(i int) bool {
return bytes.Compare(b.fragmentedSpans[i].span.Start, routingEnc) > 0
})
// Decrement by 1 so that now i tracks the index of the span that might
// contain the encoded routing value.
i--
if i < 0 {
// Negative index indicates that some assumptions are violated, return
// an assertion error in this case.
return false, errors.AssertionFailedf("unexpectedly negative routing index %d", i)
}
if bytes.Compare(b.fragmentedSpans[i].span.End, routingEnc) <= 0 {
return false, errors.AssertionFailedf(
"unexpectedly the end of the routing span %d is not greater "+
"than encoded routing value", i,
)
}
b.routingIndex = i
return b.prefilter(enc)
}
Expand Down
40 changes: 19 additions & 21 deletions pkg/sql/rowexec/inverted_filterer.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,6 @@ const (
ifrEmittingRows
)

// TODO(sumeer): support pre-filtering, akin to the invertedJoiner, by passing
// relationship info and parameters in the spec and using it to construct a
// preFilterer.
type invertedFilterer struct {
execinfra.ProcessorBase
runningState invertedFiltererState
Expand Down Expand Up @@ -85,9 +82,8 @@ func newInvertedFilterer(
},
}

// The RowContainer columns are the PK columns, that are the columns
// other than the inverted column. The output has the same types as
// the input.
// The RowContainer columns are all columns other than the inverted column.
// The output has the same types as the input.
outputColTypes := input.OutputTypes()
rcColTypes := make([]*types.T, len(outputColTypes)-1)
copy(rcColTypes, outputColTypes[:ifr.invertedColIdx])
Expand Down Expand Up @@ -210,18 +206,7 @@ func (ifr *invertedFilterer) readInput() (invertedFiltererState, *execinfrapb.Pr
row[i].Datum = tree.DNull
}
}
// Transform to keyRow.
copy(ifr.keyRow, row[:ifr.invertedColIdx])
copy(ifr.keyRow[ifr.invertedColIdx:], row[ifr.invertedColIdx+1:])

// Add the primary key in the row to the row container. The first column in
// the inverted index is the value that was indexed, and the remaining are
// the primary key columns.
keyIndex, err := ifr.rc.AddRow(ifr.Ctx, ifr.keyRow)
if err != nil {
ifr.MoveToDraining(err)
return ifrStateUnknown, ifr.DrainHelper()
}
// Add to the evaluator.
//
// NB: Inverted columns are custom encoded in a manner that does not
Expand All @@ -247,13 +232,26 @@ func (ifr *invertedFilterer) readInput() (invertedFiltererState, *execinfrapb.Pr
}
enc = []byte(*row[ifr.invertedColIdx].Datum.(*tree.DBytes))
}
if _, err = ifr.invertedEval.prepareAddIndexRow(enc, nil /* encFull */); err != nil {
shouldAdd, err := ifr.invertedEval.prepareAddIndexRow(enc, nil /* encFull */)
if err != nil {
ifr.MoveToDraining(err)
return ifrStateUnknown, ifr.DrainHelper()
}
if err = ifr.invertedEval.addIndexRow(keyIndex); err != nil {
ifr.MoveToDraining(err)
return ifrStateUnknown, ifr.DrainHelper()
if shouldAdd {
// Transform to keyRow which is everything other than the inverted
// column and then add it to the row container and the inverted expr
// evaluator.
copy(ifr.keyRow, row[:ifr.invertedColIdx])
copy(ifr.keyRow[ifr.invertedColIdx:], row[ifr.invertedColIdx+1:])
keyIndex, err := ifr.rc.AddRow(ifr.Ctx, ifr.keyRow)
if err != nil {
ifr.MoveToDraining(err)
return ifrStateUnknown, ifr.DrainHelper()
}
if err = ifr.invertedEval.addIndexRow(keyIndex); err != nil {
ifr.MoveToDraining(err)
return ifrStateUnknown, ifr.DrainHelper()
}
}
return ifrReadingInput, nil
}
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/rowexec/inverted_filterer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func TestInvertedFilterer(t *testing.T) {
// done in helpers called by invertedFilterer that have their own
// comprehensive tests. The intersection intersects the spans for the
// inverted column values 1 and 3.
// TODO(yuzefovich): add some unit tests that prefiltering works.
testCases := []ProcessorTestCase{
{
Name: "simple-intersection-and-onexpr",
Expand Down