From 6236309a53cb27cb7fdbca56689fc0c7e15d5fd7 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Tue, 6 Apr 2021 19:29:24 -0700 Subject: [PATCH] rowexec: optimize the inverted filterer Previously, we were ignoring the first return parameter of `prepareAddIndexRow` which indicates whether we should be adding a row for inverted expr evaluation which is suboptimal (essentially we were ignoring the result of prefiltering). This is now fixed. Additionally, this commit adds some comments and a sanity check around the usage of `batchedInvertedExprEvaluator`. Release note: None --- pkg/sql/rowexec/inverted_expr_evaluator.go | 16 ++++++++- pkg/sql/rowexec/inverted_filterer.go | 40 ++++++++++------------ pkg/sql/rowexec/inverted_filterer_test.go | 1 + 3 files changed, 35 insertions(+), 22 deletions(-) diff --git a/pkg/sql/rowexec/inverted_expr_evaluator.go b/pkg/sql/rowexec/inverted_expr_evaluator.go index 9a3ed45b81b3..3c68c7e2c6c8 100644 --- a/pkg/sql/rowexec/inverted_expr_evaluator.go +++ b/pkg/sql/rowexec/inverted_expr_evaluator.go @@ -288,7 +288,7 @@ type batchedInvertedExprEvaluator struct { nonInvertedPrefixes []roachpb.Key // Spans here are in sorted order and non-overlapping. fragmentedSpans []invertedSpanRoutingInfo - // The routing index computed by prepareAddIndexRow + // The routing index computed by prepareAddIndexRow. routingIndex int // Temporary state used during initialization. @@ -514,10 +514,24 @@ func (b *batchedInvertedExprEvaluator) prepareAddIndexRow( if encFull != nil { routingEnc = encFull } + // Find the first span that comes after the encoded routing value. i := sort.Search(len(b.fragmentedSpans), func(i int) bool { return bytes.Compare(b.fragmentedSpans[i].span.Start, routingEnc) > 0 }) + // Decrement by 1 so that now i tracks the index of the span that might + // contain the encoded routing value. i-- + if i < 0 { + // Negative index indicates that some assumptions are violated, return + // an assertion error in this case. + return false, errors.AssertionFailedf("unexpectedly negative routing index %d", i) + } + if bytes.Compare(b.fragmentedSpans[i].span.End, routingEnc) <= 0 { + return false, errors.AssertionFailedf( + "unexpectedly the end of the routing span %d is not greater "+ + "than encoded routing value", i, + ) + } b.routingIndex = i return b.prefilter(enc) } diff --git a/pkg/sql/rowexec/inverted_filterer.go b/pkg/sql/rowexec/inverted_filterer.go index f78fcbf12e8f..9331f8bde65a 100644 --- a/pkg/sql/rowexec/inverted_filterer.go +++ b/pkg/sql/rowexec/inverted_filterer.go @@ -39,9 +39,6 @@ const ( ifrEmittingRows ) -// TODO(sumeer): support pre-filtering, akin to the invertedJoiner, by passing -// relationship info and parameters in the spec and using it to construct a -// preFilterer. type invertedFilterer struct { execinfra.ProcessorBase runningState invertedFiltererState @@ -85,9 +82,8 @@ func newInvertedFilterer( }, } - // The RowContainer columns are the PK columns, that are the columns - // other than the inverted column. The output has the same types as - // the input. + // The RowContainer columns are all columns other than the inverted column. + // The output has the same types as the input. outputColTypes := input.OutputTypes() rcColTypes := make([]*types.T, len(outputColTypes)-1) copy(rcColTypes, outputColTypes[:ifr.invertedColIdx]) @@ -210,18 +206,7 @@ func (ifr *invertedFilterer) readInput() (invertedFiltererState, *execinfrapb.Pr row[i].Datum = tree.DNull } } - // Transform to keyRow. - copy(ifr.keyRow, row[:ifr.invertedColIdx]) - copy(ifr.keyRow[ifr.invertedColIdx:], row[ifr.invertedColIdx+1:]) - // Add the primary key in the row to the row container. The first column in - // the inverted index is the value that was indexed, and the remaining are - // the primary key columns. - keyIndex, err := ifr.rc.AddRow(ifr.Ctx, ifr.keyRow) - if err != nil { - ifr.MoveToDraining(err) - return ifrStateUnknown, ifr.DrainHelper() - } // Add to the evaluator. // // NB: Inverted columns are custom encoded in a manner that does not @@ -247,13 +232,26 @@ func (ifr *invertedFilterer) readInput() (invertedFiltererState, *execinfrapb.Pr } enc = []byte(*row[ifr.invertedColIdx].Datum.(*tree.DBytes)) } - if _, err = ifr.invertedEval.prepareAddIndexRow(enc, nil /* encFull */); err != nil { + shouldAdd, err := ifr.invertedEval.prepareAddIndexRow(enc, nil /* encFull */) + if err != nil { ifr.MoveToDraining(err) return ifrStateUnknown, ifr.DrainHelper() } - if err = ifr.invertedEval.addIndexRow(keyIndex); err != nil { - ifr.MoveToDraining(err) - return ifrStateUnknown, ifr.DrainHelper() + if shouldAdd { + // Transform to keyRow which is everything other than the inverted + // column and then add it to the row container and the inverted expr + // evaluator. + copy(ifr.keyRow, row[:ifr.invertedColIdx]) + copy(ifr.keyRow[ifr.invertedColIdx:], row[ifr.invertedColIdx+1:]) + keyIndex, err := ifr.rc.AddRow(ifr.Ctx, ifr.keyRow) + if err != nil { + ifr.MoveToDraining(err) + return ifrStateUnknown, ifr.DrainHelper() + } + if err = ifr.invertedEval.addIndexRow(keyIndex); err != nil { + ifr.MoveToDraining(err) + return ifrStateUnknown, ifr.DrainHelper() + } } return ifrReadingInput, nil } diff --git a/pkg/sql/rowexec/inverted_filterer_test.go b/pkg/sql/rowexec/inverted_filterer_test.go index 58c7424b246b..9b35a4ea7c21 100644 --- a/pkg/sql/rowexec/inverted_filterer_test.go +++ b/pkg/sql/rowexec/inverted_filterer_test.go @@ -44,6 +44,7 @@ func TestInvertedFilterer(t *testing.T) { // done in helpers called by invertedFilterer that have their own // comprehensive tests. The intersection intersects the spans for the // inverted column values 1 and 3. + // TODO(yuzefovich): add some unit tests that prefiltering works. testCases := []ProcessorTestCase{ { Name: "simple-intersection-and-onexpr",