Syntax support and operator for count all
Wip
costin committed Sep 14, 2023
1 parent c84f20b commit 959254d
Showing 14 changed files with 1,224 additions and 706 deletions.
@@ -0,0 +1,148 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0; you may not use this file except in compliance with the Elastic License
 * 2.0.
 */

package org.elasticsearch.compute.lucene;

import org.apache.lucene.search.LeafCollector;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.Scorable;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Weight;
import org.elasticsearch.compute.data.IntBlock;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.DriverContext;
import org.elasticsearch.compute.operator.SourceOperator;
import org.elasticsearch.search.internal.SearchContext;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.function.Function;

/**
 * Source operator that incrementally counts the results in Lucene searches.
 */
public class LuceneCountOperator extends LuceneOperator {

    private int totalHits = 0;
    private int remainingDocs;

    private final LeafCollector leafCollector;

    public static class Factory implements LuceneOperator.Factory {
        private final DataPartitioning dataPartitioning;
        private final int taskConcurrency;
        private final int limit;
        private final LuceneSliceQueue sliceQueue;

        public Factory(
            List<SearchContext> searchContexts,
            Function<SearchContext, Query> queryFunction,
            DataPartitioning dataPartitioning,
            int taskConcurrency,
            int limit
        ) {
            this.limit = limit;
            this.dataPartitioning = dataPartitioning;
            var weightFunction = weightFunction(queryFunction, ScoreMode.COMPLETE_NO_SCORES);
            this.sliceQueue = LuceneSliceQueue.create(searchContexts, weightFunction, dataPartitioning, taskConcurrency);
            this.taskConcurrency = Math.min(sliceQueue.totalSlices(), taskConcurrency);
        }

        @Override
        public SourceOperator get(DriverContext driverContext) {
            return new LuceneCountOperator(sliceQueue, limit);
        }

        @Override
        public int taskConcurrency() {
            return taskConcurrency;
        }

        public int limit() {
            return limit;
        }

        @Override
        public String describe() {
            return "LuceneCountOperator[dataPartitioning = " + dataPartitioning + ", limit = " + limit + "]";
        }
    }

    public LuceneCountOperator(LuceneSliceQueue sliceQueue, int limit) {
        super(Integer.MAX_VALUE, sliceQueue);
        this.remainingDocs = limit;
        this.leafCollector = new LeafCollector() {
            @Override
            public void setScorer(Scorable scorer) {}

            @Override
            public void collect(int doc) {
                // used on the slow path: count one doc at a time, stopping at the limit
                if (remainingDocs > 0) {
                    --remainingDocs;
                    totalHits++;
                }
            }
        };
    }

    @Override
    public boolean isFinished() {
        return doneCollecting;
    }

    @Override
    public void finish() {
        doneCollecting = true;
    }

    @Override
    public Page getOutput() {
        if (isFinished()) {
            assert remainingDocs == 0 : remainingDocs;
            return null;
        }
        try {
            final LuceneScorer scorer = getCurrentOrLoadNextScorer();
            if (scorer == null) {
                return null;
            }
            Weight weight = scorer.weight;
            var leafReaderContext = scorer.leafReaderContext();
            // fast path: ask the Weight for an exact per-segment count, as
            // org.apache.lucene.search.TotalHitCountCollector does; -1 means the
            // count cannot be computed without visiting the docs
            int leafCount = weight == null ? -1 : weight.count(leafReaderContext);
            if (leafCount != -1) {
                // cap the per-segment count so we don't go over the limit
                var count = Math.min(leafCount, remainingDocs);
                totalHits += count;
                remainingDocs -= count;
            } else {
                // slow path: score the segment and count matches one by one
                scorer.scoreNextRange(leafCollector, leafReaderContext.reader().getLiveDocs(), remainingDocs);
            }

            Page page = null;
            if (remainingDocs <= 0 || scorer.isDone()) {
                pagesEmitted++;
                page = new Page(1, IntBlock.newConstantBlockWith(totalHits, 1));
            }
            return page;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
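The fast path in getOutput leans on Lucene's Weight#count(LeafReaderContext): it returns an exact per-segment hit count when one can be computed cheaply (for example, a match-all query over a segment with no deletions) and -1 when the segment has to be visited doc by doc. Below is a standalone sketch of that contract; it is not part of this commit, and the class name and index setup are illustrative only:

import org.apache.lucene.document.Document;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Weight;
import org.apache.lucene.store.ByteBuffersDirectory;

public class WeightCountDemo {
    public static void main(String[] args) throws Exception {
        try (var dir = new ByteBuffersDirectory(); var writer = new IndexWriter(dir, new IndexWriterConfig())) {
            for (int i = 0; i < 100; i++) {
                writer.addDocument(new Document());
            }
            writer.commit();
            try (DirectoryReader reader = DirectoryReader.open(writer)) {
                IndexSearcher searcher = new IndexSearcher(reader);
                Weight weight = searcher.createWeight(
                    searcher.rewrite(new MatchAllDocsQuery()),
                    ScoreMode.COMPLETE_NO_SCORES,
                    1f
                );
                int total = 0;
                for (LeafReaderContext leaf : reader.leaves()) {
                    int count = weight.count(leaf); // exact count for this segment, or -1
                    if (count != -1) {
                        total += count; // cheap path, no scoring needed
                    }
                    // a real caller (like LuceneCountOperator) falls back to a collector on -1
                }
                System.out.println("counted " + total + " docs without scoring");
            }
        }
    }
}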
@@ -57,7 +57,6 @@ public interface Factory extends SourceOperator.SourceOperatorFactory {

    @Override
    public void close() {

    }

    LuceneScorer getCurrentOrLoadNextScorer() {

@@ -97,7 +96,7 @@ LuceneScorer getCurrentOrLoadNextScorer() {
    static final class LuceneScorer {
        private final int shardIndex;
        private final SearchContext searchContext;
-       private final Weight weight;
+       final Weight weight;
        private final LeafReaderContext leafReaderContext;

        private BulkScorer bulkScorer;
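Narrowing weight from private to package-private is what lets the new LuceneCountOperator read scorer.weight directly in getOutput when it probes the Weight#count shortcut.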
@@ -0,0 +1,160 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0; you may not use this file except in compliance with the Elastic License
 * 2.0.
 */

package org.elasticsearch.compute.lucene;

import org.apache.lucene.document.SortedNumericDocValuesField;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.store.Directory;
import org.apache.lucene.tests.index.RandomIndexWriter;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.compute.data.IntBlock;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.AnyOperatorTestCase;
import org.elasticsearch.compute.operator.Driver;
import org.elasticsearch.compute.operator.DriverContext;
import org.elasticsearch.compute.operator.OperatorTestCase;
import org.elasticsearch.compute.operator.PageConsumerOperator;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.index.cache.query.TrivialQueryCachingPolicy;
import org.elasticsearch.index.query.SearchExecutionContext;
import org.elasticsearch.search.internal.ContextIndexSearcher;
import org.elasticsearch.search.internal.SearchContext;
import org.junit.After;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class LuceneCountOperatorTests extends AnyOperatorTestCase {
    private Directory directory = newDirectory();
    private IndexReader reader;

    @After
    public void closeIndex() throws IOException {
        IOUtils.close(reader, directory);
    }

    @Override
    protected LuceneCountOperator.Factory simple(BigArrays bigArrays) {
        return simple(bigArrays, randomFrom(DataPartitioning.values()), between(1, 10_000), 100);
    }

    private LuceneCountOperator.Factory simple(BigArrays bigArrays, DataPartitioning dataPartitioning, int numDocs, int limit) {
        int commitEvery = Math.max(1, numDocs / 10);
        try (
            RandomIndexWriter writer = new RandomIndexWriter(
                random(),
                directory,
                newIndexWriterConfig().setMergePolicy(NoMergePolicy.INSTANCE)
            )
        ) {
            for (int d = 0; d < numDocs; d++) {
                List<IndexableField> doc = new ArrayList<>();
                doc.add(new SortedNumericDocValuesField("s", d));
                writer.addDocument(doc);
                if (d % commitEvery == 0) {
                    writer.commit();
                }
            }
            reader = writer.getReader();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }

        SearchContext ctx = mockSearchContext(reader);
        SearchExecutionContext ectx = mock(SearchExecutionContext.class);
        when(ctx.getSearchExecutionContext()).thenReturn(ectx);
        when(ectx.getIndexReader()).thenReturn(reader);
        Function<SearchContext, Query> queryFunction = c -> new MatchAllDocsQuery();
        return new LuceneCountOperator.Factory(List.of(ctx), queryFunction, dataPartitioning, 1, limit);
    }

    @Override
    protected String expectedToStringOfSimple() {
        assumeFalse("can't support variable maxPageSize", true); // TODO allow testing this
        return "LuceneCountOperator[shardId=0, maxPageSize=**random**]";
    }

    @Override
    protected String expectedDescriptionOfSimple() {
        assumeFalse("can't support variable maxPageSize", true); // TODO allow testing this
        return """
            LuceneCountOperator[dataPartitioning = SHARD, maxPageSize = **random**, limit = 100, sorts = [{"s":{"order":"asc"}}]]""";
    }

    // TODO tests for the other data partitioning configurations

    public void testShardDataPartitioning() {
        int size = between(1_000, 20_000);
        int limit = between(10, size);
        testSimple(size, limit);
    }

    public void testEmpty() {
        testSimple(0, between(10, 10_000));
    }

    private void testSimple(int size, int limit) {
        DriverContext ctx = new DriverContext();
        LuceneCountOperator.Factory factory = simple(nonBreakingBigArrays(), DataPartitioning.SHARD, size, limit);

        List<Page> results = new ArrayList<>();
        OperatorTestCase.runDriver(
            new Driver(ctx, factory.get(ctx), List.of(), new PageConsumerOperator(page -> results.add(page)), () -> {})
        );
        OperatorTestCase.assertDriverContext(ctx);

        // the operator emits a single page whose only block is the constant count
        assertThat(results, hasSize(1));
        for (Page page : results) {
            assertThat(page.getPositionCount(), is(1));
            IntBlock countBlock = page.getBlock(0);
            assertThat(countBlock.getInt(countBlock.getFirstValueIndex(0)), is(Math.min(size, limit)));
        }
    }

    /**
     * Creates a mock search context with the given index reader.
     * The returned mock search context can be used to test with {@link LuceneOperator}.
     */
    public static SearchContext mockSearchContext(IndexReader reader) {
        try {
            ContextIndexSearcher searcher = new ContextIndexSearcher(
                reader,
                IndexSearcher.getDefaultSimilarity(),
                IndexSearcher.getDefaultQueryCache(),
                TrivialQueryCachingPolicy.NEVER,
                true
            );
            SearchContext searchContext = mock(SearchContext.class);
            when(searchContext.searcher()).thenReturn(searcher);
            return searchContext;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
6 changes: 5 additions & 1 deletion x-pack/plugin/esql/src/main/antlr/EsqlBaseParser.g4
@@ -76,8 +76,12 @@ operatorExpression
primaryExpression
    : constant #constantDefault
    | qualifiedName #dereference
+   | functionExpression #function
    | LP booleanExpression RP #parenthesizedExpression
-   | identifier LP (booleanExpression (COMMA booleanExpression)*)? RP #functionExpression
    ;
+
+functionExpression
+    : identifier LP (ASTERISK | (booleanExpression (COMMA booleanExpression)*))? RP
+    ;

rowCommand
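The ASTERISK alternative in the extracted functionExpression rule is what allows a call like count(*) to parse. A quick way to inspect the resulting tree is sketched below; it is not part of this commit, and it assumes the ANTLR-generated EsqlBaseLexer and EsqlBaseParser classes (package guessed from the repo layout) plus the grammar's singleStatement start rule:

import org.antlr.v4.runtime.CharStreams;
import org.antlr.v4.runtime.CommonTokenStream;
import org.elasticsearch.xpack.esql.parser.EsqlBaseLexer;
import org.elasticsearch.xpack.esql.parser.EsqlBaseParser;

public class ParseCountStarDemo {
    public static void main(String[] args) {
        // "count(*)" should now match the ASTERISK branch of functionExpression
        var lexer = new EsqlBaseLexer(CharStreams.fromString("row a = 1 | stats count(*)"));
        var parser = new EsqlBaseParser(new CommonTokenStream(lexer));
        System.out.println(parser.singleStatement().toStringTree(parser));
    }
}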
@@ -187,6 +187,10 @@ private FunctionDefinition[][] functions() {

     @Override
     protected String normalize(String name) {
-        return name.toLowerCase(Locale.ROOT);
+        return normalizeName(name);
     }
+
+    public static String normalizeName(String name) {
+        return name.toLowerCase(Locale.ROOT);
+    }
 }
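Extracting normalizeName makes the registry's case folding reusable from outside, presumably so other components can resolve function names the same way. For instance (illustrative; assumes this hunk lives in EsqlFunctionRegistry):

String canonical = EsqlFunctionRegistry.normalizeName("COUNT"); // -> "count"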

Large diffs are not rendered by default.

