Skip to content

Commit

Permalink
rowexec: optimize the inverted filterer
Browse files Browse the repository at this point in the history
Previously, we ignored the first return value of `prepareAddIndexRow`,
which indicates whether a row should be added for inverted expression
evaluation. This was suboptimal: in effect, we were discarding the result
of prefiltering. This is now fixed.

Additionally, this commit adds some comments and a sanity check around
the usage of `batchedInvertedExprEvaluator`.

Release note: None
  • Loading branch information
yuzefovich committed Apr 15, 2021
1 parent 35b3464 commit 6236309
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 22 deletions.
16 changes: 15 additions & 1 deletion pkg/sql/rowexec/inverted_expr_evaluator.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ type batchedInvertedExprEvaluator struct {
nonInvertedPrefixes []roachpb.Key
// Spans here are in sorted order and non-overlapping.
fragmentedSpans []invertedSpanRoutingInfo
// The routing index computed by prepareAddIndexRow
// The routing index computed by prepareAddIndexRow.
routingIndex int

// Temporary state used during initialization.
Expand Down Expand Up @@ -514,10 +514,24 @@ func (b *batchedInvertedExprEvaluator) prepareAddIndexRow(
if encFull != nil {
routingEnc = encFull
}
// Find the first span that comes after the encoded routing value.
i := sort.Search(len(b.fragmentedSpans), func(i int) bool {
return bytes.Compare(b.fragmentedSpans[i].span.Start, routingEnc) > 0
})
// Decrement by 1 so that now i tracks the index of the span that might
// contain the encoded routing value.
i--
if i < 0 {
// A negative index indicates that some assumptions have been violated,
// so return an assertion error in this case.
return false, errors.AssertionFailedf("unexpectedly negative routing index %d", i)
}
if bytes.Compare(b.fragmentedSpans[i].span.End, routingEnc) <= 0 {
return false, errors.AssertionFailedf(
"unexpectedly the end of the routing span %d is not greater "+
"than encoded routing value", i,
)
}
b.routingIndex = i
return b.prefilter(enc)
}
Expand Down
40 changes: 19 additions & 21 deletions pkg/sql/rowexec/inverted_filterer.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,6 @@ const (
ifrEmittingRows
)

// TODO(sumeer): support pre-filtering, akin to the invertedJoiner, by passing
// relationship info and parameters in the spec and using it to construct a
// preFilterer.
type invertedFilterer struct {
execinfra.ProcessorBase
runningState invertedFiltererState
Expand Down Expand Up @@ -85,9 +82,8 @@ func newInvertedFilterer(
},
}

// The RowContainer columns are the PK columns, that are the columns
// other than the inverted column. The output has the same types as
// the input.
// The RowContainer columns are all columns other than the inverted column.
// The output has the same types as the input.
outputColTypes := input.OutputTypes()
rcColTypes := make([]*types.T, len(outputColTypes)-1)
copy(rcColTypes, outputColTypes[:ifr.invertedColIdx])
Expand Down Expand Up @@ -210,18 +206,7 @@ func (ifr *invertedFilterer) readInput() (invertedFiltererState, *execinfrapb.Pr
row[i].Datum = tree.DNull
}
}
// Transform to keyRow.
copy(ifr.keyRow, row[:ifr.invertedColIdx])
copy(ifr.keyRow[ifr.invertedColIdx:], row[ifr.invertedColIdx+1:])

// Add the primary key in the row to the row container. The first column in
// the inverted index is the value that was indexed, and the remaining are
// the primary key columns.
keyIndex, err := ifr.rc.AddRow(ifr.Ctx, ifr.keyRow)
if err != nil {
ifr.MoveToDraining(err)
return ifrStateUnknown, ifr.DrainHelper()
}
// Add to the evaluator.
//
// NB: Inverted columns are custom encoded in a manner that does not
Expand All @@ -247,13 +232,26 @@ func (ifr *invertedFilterer) readInput() (invertedFiltererState, *execinfrapb.Pr
}
enc = []byte(*row[ifr.invertedColIdx].Datum.(*tree.DBytes))
}
if _, err = ifr.invertedEval.prepareAddIndexRow(enc, nil /* encFull */); err != nil {
shouldAdd, err := ifr.invertedEval.prepareAddIndexRow(enc, nil /* encFull */)
if err != nil {
ifr.MoveToDraining(err)
return ifrStateUnknown, ifr.DrainHelper()
}
if err = ifr.invertedEval.addIndexRow(keyIndex); err != nil {
ifr.MoveToDraining(err)
return ifrStateUnknown, ifr.DrainHelper()
if shouldAdd {
// Transform to keyRow which is everything other than the inverted
// column and then add it to the row container and the inverted expr
// evaluator.
copy(ifr.keyRow, row[:ifr.invertedColIdx])
copy(ifr.keyRow[ifr.invertedColIdx:], row[ifr.invertedColIdx+1:])
keyIndex, err := ifr.rc.AddRow(ifr.Ctx, ifr.keyRow)
if err != nil {
ifr.MoveToDraining(err)
return ifrStateUnknown, ifr.DrainHelper()
}
if err = ifr.invertedEval.addIndexRow(keyIndex); err != nil {
ifr.MoveToDraining(err)
return ifrStateUnknown, ifr.DrainHelper()
}
}
return ifrReadingInput, nil
}
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/rowexec/inverted_filterer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func TestInvertedFilterer(t *testing.T) {
// done in helpers called by invertedFilterer that have their own
// comprehensive tests. The intersection intersects the spans for the
// inverted column values 1 and 3.
// TODO(yuzefovich): add some unit tests that prefiltering works.
testCases := []ProcessorTestCase{
{
Name: "simple-intersection-and-onexpr",
Expand Down

0 comments on commit 6236309

Please sign in to comment.