Skip to content

Commit

Permalink
fix: The aggs result of NestedAggregator with sub NestedAggregator ma…
Browse files Browse the repository at this point in the history
…y be not accurately

Signed-off-by: kkewwei <[email protected]>
  • Loading branch information
kkewwei committed Apr 26, 2024
1 parent b4692c8 commit 9c0784d
Show file tree
Hide file tree
Showing 3 changed files with 373 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,10 @@
import org.apache.lucene.search.Weight;
import org.apache.lucene.search.join.BitSetProducer;
import org.apache.lucene.util.BitSet;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.lucene.search.Queries;
import org.opensearch.core.ParseField;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.mapper.ObjectMapper;
import org.opensearch.search.aggregations.Aggregator;
import org.opensearch.search.aggregations.AggregatorFactories;
Expand Down Expand Up @@ -88,12 +90,24 @@ public class NestedAggregator extends BucketsAggregator implements SingleBucketA
) throws IOException {
super(name, factories, context, parent, cardinality, metadata);

Query parentFilter = parentObjectMapper != null ? parentObjectMapper.nestedTypeFilter() : Queries.newNonNestedFilter();
Query parentFilter = isParent(parentObjectMapper, childObjectMapper, context.mapperService())
? parentObjectMapper.nestedTypeFilter() : Queries.newNonNestedFilter();
this.parentFilter = context.bitsetFilterCache().getBitSetProducer(parentFilter);
this.childFilter = childObjectMapper.nestedTypeFilter();
this.collectsFromSingleBucket = cardinality.map(estimate -> estimate < 2);
}

private boolean isParent(ObjectMapper parentObjectMapper, ObjectMapper childObjectMapper, MapperService mapperService) {
if (parentObjectMapper == null) {
return false;
}
ObjectMapper parent;
do {
parent = childObjectMapper.getParentObjectMapper(mapperService);
} while (parent != null && parent != parentObjectMapper);
return parentObjectMapper == parent;
}

@Override
public LeafBucketCollector getLeafCollector(final LeafReaderContext ctx, final LeafBucketCollector sub) throws IOException {
IndexReaderContext topLevelContext = ReaderUtil.getTopLevelContext(ctx);
Expand All @@ -108,19 +122,16 @@ public LeafBucketCollector getLeafCollector(final LeafReaderContext ctx, final L
return new LeafBucketCollectorBase(sub, null) {
@Override
public void collect(int parentDoc, long bucket) throws IOException {
// if parentDoc is 0 then this means that this parent doesn't have child docs (b/c these appear always before the parent
// doc), so we can skip:
if (parentDoc == 0 || parentDocs == null || childDocs == null) {
// parentDoc can be 0 when searching:
if (parentDocs == null || childDocs == null) {
return;
}

final int prevParentDoc = parentDocs.prevSetBit(parentDoc - 1);
int childDocId = childDocs.docID();
if (childDocId <= prevParentDoc) {
childDocId = childDocs.advance(prevParentDoc + 1);
}
Tuple<Integer, Integer> res = getChildAndRootParent(parentDocs, childDocs, parentDoc);
int currentRootDoc = res.v1();
int childDocId = res.v2();

for (; childDocId < parentDoc; childDocId = childDocs.nextDoc()) {
for (; childDocId < currentRootDoc; childDocId = childDocs.nextDoc()) {
collectBucket(sub, childDocId, bucket);
}
}
Expand All @@ -130,6 +141,29 @@ public void collect(int parentDoc, long bucket) throws IOException {
}
}

static Tuple<Integer, Integer> getChildAndRootParent(BitSet parentDocs, DocIdSetIterator childDocs, int parentDoc) throws IOException {
int currentRootDoc;
int prevParentDoc = parentDocs.prevSetBit(parentDoc);
if (prevParentDoc == -1) {
currentRootDoc = parentDocs.nextSetBit(0);
} else if (prevParentDoc == parentDoc) {
currentRootDoc = parentDoc;
if (currentRootDoc == 0) {
prevParentDoc = -1;
} else {
prevParentDoc = parentDocs.prevSetBit(currentRootDoc - 1);
}
} else {
currentRootDoc = parentDocs.nextSetBit(prevParentDoc + 1);
}

int childDocId = childDocs.docID();
if (childDocId <= prevParentDoc) {
childDocId = childDocs.advance(prevParentDoc + 1);
}
return Tuple.tuple(currentRootDoc, childDocId);
}

@Override
protected void preGetSubLeafCollectors(LeafReaderContext ctx) throws IOException {
super.preGetSubLeafCollectors(ctx);
Expand Down Expand Up @@ -191,9 +225,8 @@ public void setScorer(Scorable scorer) throws IOException {

@Override
public void collect(int parentDoc, long bucket) throws IOException {
// if parentDoc is 0 then this means that this parent doesn't have child docs (b/c these appear always before the parent
// doc), so we can skip:
if (parentDoc == 0 || parentDocs == null || childDocs == null) {
// parentDoc can be 0 when searching:
if (parentDocs == null || childDocs == null) {
return;
}

Expand All @@ -214,13 +247,11 @@ void processBufferedChildBuckets() throws IOException {
return;
}

final int prevParentDoc = parentDocs.prevSetBit(currentParentDoc - 1);
int childDocId = childDocs.docID();
if (childDocId <= prevParentDoc) {
childDocId = childDocs.advance(prevParentDoc + 1);
}
Tuple<Integer, Integer> res = getChildAndRootParent(parentDocs, childDocs, currentParentDoc);
int currentRootDoc = res.v1();
int childDocId = res.v2();

for (; childDocId < currentParentDoc; childDocId = childDocs.nextDoc()) {
for (; childDocId < currentRootDoc; childDocId = childDocs.nextDoc()) {
cachedScorer.doc = childDocId;
for (var bucket : bucketBuffer) {
collectBucket(sub, childDocId, bucket);
Expand Down
Loading

0 comments on commit 9c0784d

Please sign in to comment.