ESQL: Load more than one field at once (elastic#102192)
This modifies ESQL to load a list of fields at one time, which is especially
effective when loading from stored fields or _source because it allows
visiting the stored fields only once.

Part of elastic#101322
nik9000 authored Nov 20, 2023
1 parent 4567d39 commit fd300cf
Showing 57 changed files with 2,727 additions and 1,161 deletions.
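The access pattern described in the commit message is easy to model outside Elasticsearch. The toy sketch below is not ESQL code (loadStoredFields and the visit counter are invented for illustration), but it shows why batching matters: decoding a document's stored fields is the expensive step, so visiting them once per document instead of once per field per document divides that cost by the number of fields requested.

import java.util.List;
import java.util.Map;

// Toy model of the optimization, not Elasticsearch code: "loading" a
// document's stored fields is the expensive step, so count how often it runs.
public class StoredFieldBatchingSketch {
    static int loads = 0; // counts stored-field visits

    static Map<String, String> loadStoredFields(int doc) {
        loads++; // in Lucene this would decompress and parse the stored-fields block
        return Map.of("keyword_1", "a" + doc, "keyword_2", "b" + doc, "keyword_3", "c" + doc);
    }

    public static void main(String[] args) {
        List<String> fields = List.of("keyword_1", "keyword_2", "keyword_3");
        int docs = 1000;

        // Before: one pass over the documents per field.
        loads = 0;
        for (String field : fields) {
            for (int doc = 0; doc < docs; doc++) {
                loadStoredFields(doc).get(field);
            }
        }
        System.out.println("per-field loading: " + loads + " stored-field visits"); // 3000

        // After: one pass over the documents, extracting every requested field per visit.
        loads = 0;
        for (int doc = 0; doc < docs; doc++) {
            Map<String, String> stored = loadStoredFields(doc);
            for (String field : fields) {
                stored.get(field);
            }
        }
        System.out.println("batched loading:   " + loads + " stored-field visits"); // 1000
    }
}

With the three stored keyword fields of the 3_stored_keywords benchmark case below, the batched path makes a third of the stored-field visits.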
@@ -8,8 +8,11 @@

package org.elasticsearch.benchmark.compute.operator;

import org.apache.lucene.document.FieldType;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.document.StoredField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.DocValuesType;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
@@ -19,6 +22,7 @@
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.NumericUtils;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.BytesRefBlock;
import org.elasticsearch.compute.data.BytesRefVector;
@@ -30,14 +34,16 @@
import org.elasticsearch.compute.data.LongBlock;
import org.elasticsearch.compute.data.LongVector;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.lucene.BlockReaderFactories;
import org.elasticsearch.compute.lucene.LuceneSourceOperator;
import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator;
import org.elasticsearch.compute.operator.topn.TopNOperator;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.mapper.BlockLoader;
import org.elasticsearch.index.mapper.KeywordFieldMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.NumberFieldMapper;
import org.elasticsearch.search.lookup.SearchLookup;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
@@ -56,7 +62,9 @@
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PrimitiveIterator;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

@@ -93,18 +101,113 @@ public class ValuesSourceReaderBenchmark {
}
}

- private static BlockLoader blockLoader(String name) {
private static List<ValuesSourceReaderOperator.FieldInfo> fields(String name) {
return switch (name) {
case "long" -> numericBlockLoader(name, NumberFieldMapper.NumberType.LONG);
case "int" -> numericBlockLoader(name, NumberFieldMapper.NumberType.INTEGER);
case "double" -> numericBlockLoader(name, NumberFieldMapper.NumberType.DOUBLE);
case "keyword" -> new KeywordFieldMapper.KeywordFieldType(name).blockLoader(null);
default -> throw new IllegalArgumentException("can't read [" + name + "]");
case "3_stored_keywords" -> List.of(
new ValuesSourceReaderOperator.FieldInfo("keyword_1", List.of(blockLoader("stored_keyword_1"))),
new ValuesSourceReaderOperator.FieldInfo("keyword_2", List.of(blockLoader("stored_keyword_2"))),
new ValuesSourceReaderOperator.FieldInfo("keyword_3", List.of(blockLoader("stored_keyword_3")))
);
default -> List.of(new ValuesSourceReaderOperator.FieldInfo(name, List.of(blockLoader(name))));
};
}

- private static BlockLoader numericBlockLoader(String name, NumberFieldMapper.NumberType numberType) {
- return new NumberFieldMapper.NumberFieldType(name, numberType).blockLoader(null);
enum Where {
DOC_VALUES,
SOURCE,
STORED;
}

private static BlockLoader blockLoader(String name) {
Where where = Where.DOC_VALUES;
if (name.startsWith("stored_")) {
name = name.substring("stored_".length());
where = Where.STORED;
} else if (name.startsWith("source_")) {
name = name.substring("source_".length());
where = Where.SOURCE;
}
switch (name) {
case "long":
return numericBlockLoader(name, where, NumberFieldMapper.NumberType.LONG);
case "int":
return numericBlockLoader(name, where, NumberFieldMapper.NumberType.INTEGER);
case "double":
return numericBlockLoader(name, where, NumberFieldMapper.NumberType.DOUBLE);
case "keyword":
name = "keyword_1";
}
if (name.startsWith("keyword")) {
boolean syntheticSource = false;
FieldType ft = new FieldType(KeywordFieldMapper.Defaults.FIELD_TYPE);
switch (where) {
case DOC_VALUES:
break;
case SOURCE:
ft.setDocValuesType(DocValuesType.NONE);
break;
case STORED:
ft.setStored(true);
ft.setDocValuesType(DocValuesType.NONE);
syntheticSource = true;
break;
}
ft.freeze();
return new KeywordFieldMapper.KeywordFieldType(
name,
ft,
Lucene.KEYWORD_ANALYZER,
Lucene.KEYWORD_ANALYZER,
Lucene.KEYWORD_ANALYZER,
new KeywordFieldMapper.Builder(name, IndexVersion.current()).docValues(ft.docValuesType() != DocValuesType.NONE),
syntheticSource
).blockLoader(new MappedFieldType.BlockLoaderContext() {
@Override
public String indexName() {
return "benchmark";
}

@Override
public SearchLookup lookup() {
throw new UnsupportedOperationException();
}

@Override
public Set<String> sourcePaths(String name) {
return Set.of(name);
}
});
}
throw new IllegalArgumentException("can't read [" + name + "]");
}

private static BlockLoader numericBlockLoader(String name, Where where, NumberFieldMapper.NumberType numberType) {
boolean stored = false;
boolean docValues = true;
switch (where) {
case DOC_VALUES:
break;
case SOURCE:
stored = true;
docValues = false;
break;
case STORED:
throw new UnsupportedOperationException();
}
return new NumberFieldMapper.NumberFieldType(
name,
numberType,
true,
stored,
docValues,
true,
null,
Map.of(),
null,
false,
null,
null
).blockLoader(null);
}

/**
@@ -122,7 +225,7 @@ private static BlockLoader numericBlockLoader(String name, NumberFieldMapper.Num
@Param({ "in_order", "shuffled", "shuffled_singles" })
public String layout;

@Param({ "long", "int", "double", "keyword" })
@Param({ "long", "int", "double", "keyword", "stored_keyword", "3_stored_keywords" })
public String name;

private Directory directory;
@@ -134,9 +237,9 @@ private static BlockLoader numericBlockLoader(String name, NumberFieldMapper.Num
public void benchmark() {
ValuesSourceReaderOperator op = new ValuesSourceReaderOperator(
BlockFactory.getNonBreakingInstance(),
- List.of(BlockReaderFactories.loaderToFactory(reader, blockLoader(name))),
- 0,
- name
fields(name),
List.of(reader),
0
);
long sum = 0;
for (Page page : pages) {
@@ -160,7 +263,7 @@ public void benchmark() {
sum += (long) values.getDouble(p);
}
}
case "keyword" -> {
case "keyword", "stored_keyword" -> {
BytesRef scratch = new BytesRef();
BytesRefVector values = op.getOutput().<BytesRefBlock>getBlock(1).asVector();
for (int p = 0; p < values.getPositionCount(); p++) {
@@ -170,21 +273,59 @@
sum += Integer.parseInt(r.utf8ToString());
}
}
case "3_stored_keywords" -> {
BytesRef scratch = new BytesRef();
Page out = op.getOutput();
for (BytesRefVector values : new BytesRefVector[] {
out.<BytesRefBlock>getBlock(1).asVector(),
out.<BytesRefBlock>getBlock(2).asVector(),
out.<BytesRefBlock>getBlock(3).asVector() }) {

for (int p = 0; p < values.getPositionCount(); p++) {
BytesRef r = values.getBytesRef(p, scratch);
r.offset++;
r.length--;
sum += Integer.parseInt(r.utf8ToString());
}
}
}
}
}
- long expected;
- if (name.equals("keyword")) {
- expected = 0;
- for (int i = 0; i < INDEX_SIZE; i++) {
- expected += i % 1000;
- }
- } else {
- expected = INDEX_SIZE;
- expected = expected * (expected - 1) / 2;
long expected = 0;
switch (name) {
case "keyword", "stored_keyword":
for (int i = 0; i < INDEX_SIZE; i++) {
expected += i % 1000;
}
break;
case "3_stored_keywords":
for (int i = 0; i < INDEX_SIZE; i++) {
expected += 3 * (i % 1000);
}
break;
default:
expected = INDEX_SIZE;
expected = expected * (expected - 1) / 2;
}
if (expected != sum) {
throw new AssertionError("[" + layout + "][" + name + "] expected [" + expected + "] but was [" + sum + "]");
}
boolean foundStoredFieldLoader = false;
ValuesSourceReaderOperator.Status status = (ValuesSourceReaderOperator.Status) op.status();
for (Map.Entry<String, Integer> e : status.readersBuilt().entrySet()) {
if (e.getKey().indexOf("stored_fields") >= 0) {
foundStoredFieldLoader = true;
}
}
if (name.indexOf("stored") >= 0) {
if (foundStoredFieldLoader == false) {
throw new AssertionError("expected to use a stored field loader but only had: " + status.readersBuilt());
}
} else {
if (foundStoredFieldLoader) {
throw new AssertionError("expected not to use a stored field loader but only had: " + status.readersBuilt());
}
}
}

@Setup
@@ -195,15 +336,23 @@ public void setup() throws IOException {

private void setupIndex() throws IOException {
directory = new ByteBuffersDirectory();
FieldType keywordFieldType = new FieldType(KeywordFieldMapper.Defaults.FIELD_TYPE);
keywordFieldType.setStored(true);
keywordFieldType.freeze();
try (IndexWriter iw = new IndexWriter(directory, new IndexWriterConfig().setMergePolicy(NoMergePolicy.INSTANCE))) {
for (int i = 0; i < INDEX_SIZE; i++) {
String c = Character.toString('a' - ((i % 1000) % 26) + 26);
iw.addDocument(
List.of(
new NumericDocValuesField("long", i),
new StoredField("long", i),
new NumericDocValuesField("int", i),
new StoredField("int", i),
new NumericDocValuesField("double", NumericUtils.doubleToSortableLong(i)),
new KeywordFieldMapper.KeywordField("keyword", new BytesRef(c + i % 1000), KeywordFieldMapper.Defaults.FIELD_TYPE)
new StoredField("double", (double) i),
new KeywordFieldMapper.KeywordField("keyword_1", new BytesRef(c + i % 1000), keywordFieldType),
new KeywordFieldMapper.KeywordField("keyword_2", new BytesRef(c + i % 1000), keywordFieldType),
new KeywordFieldMapper.KeywordField("keyword_3", new BytesRef(c + i % 1000), keywordFieldType)
)
);
if (i % COMMIT_INTERVAL == 0) {
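Condensing the benchmark hunks above, the shape of the API change is roughly the following. Here reader, name, and blockLoader(...) are the benchmark's own locals and helpers; the first call is the constructor removed above and the second is its replacement, so read this as a side-by-side sketch rather than standalone code.

// Before: one field per operator; a reader-bound loader factory, the doc
// channel, and the field's output name.
new ValuesSourceReaderOperator(
    BlockFactory.getNonBreakingInstance(),
    List.of(BlockReaderFactories.loaderToFactory(reader, blockLoader(name))),
    0,    // doc channel, as in the benchmark above
    name
);

// After: one operator loads many fields. Each FieldInfo pairs a field name
// with the BlockLoaders to try, and the shard readers are passed separately.
new ValuesSourceReaderOperator(
    BlockFactory.getNonBreakingInstance(),
    List.of(
        new ValuesSourceReaderOperator.FieldInfo("keyword_1", List.of(blockLoader("stored_keyword_1"))),
        new ValuesSourceReaderOperator.FieldInfo("keyword_2", List.of(blockLoader("stored_keyword_2"))),
        new ValuesSourceReaderOperator.FieldInfo("keyword_3", List.of(blockLoader("stored_keyword_3")))
    ),
    List.of(reader),
    0     // doc channel, as in the benchmark above
);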
5 changes: 5 additions & 0 deletions docs/changelog/102192.yaml
@@ -0,0 +1,5 @@
pr: 102192
summary: "ESQL: Load more than one field at once"
area: ES|QL
type: enhancement
issues: []
@@ -324,9 +324,9 @@ public Query phrasePrefixQuery(TokenStream stream, int slop, int maxExpansions,
@Override
public BlockLoader blockLoader(BlockLoaderContext blContext) {
if (textFieldType.isSyntheticSource()) {
- return BlockStoredFieldsReader.bytesRefsFromStrings(storedFieldNameForSyntheticSource());
return new BlockStoredFieldsReader.BytesFromStringsBlockLoader(storedFieldNameForSyntheticSource());
}
- return BlockSourceReader.bytesRefs(SourceValueFetcher.toString(blContext.sourcePaths(name())));
return new BlockSourceReader.BytesRefsBlockLoader(SourceValueFetcher.toString(blContext.sourcePaths(name())));
}

@Override
@@ -310,13 +310,13 @@ public Query rangeQuery(
public BlockLoader blockLoader(BlockLoaderContext blContext) {
if (indexMode == IndexMode.TIME_SERIES && metricType == TimeSeriesParams.MetricType.COUNTER) {
// Counters are not supported by ESQL so we load them in null
- return BlockDocValuesReader.nulls();
return BlockLoader.CONSTANT_NULLS;
}
if (hasDocValues()) {
double scalingFactorInverse = 1d / scalingFactor;
- return BlockDocValuesReader.doubles(name(), l -> l * scalingFactorInverse);
return new BlockDocValuesReader.DoublesBlockLoader(name(), l -> l * scalingFactorInverse);
}
- return BlockSourceReader.doubles(sourceValueFetcher(blContext.sourcePaths(name())));
return new BlockSourceReader.DoublesBlockLoader(sourceValueFetcher(blContext.sourcePaths(name())));
}

@Override
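The scalingFactorInverse line above is the whole decoding story for scaled_float doc values: index time multiplies the value by the scaling factor and rounds to a long, and read time multiplies by the precomputed inverse. A small standalone round trip; the encode step reflects how scaled_float indexing is documented to work in general, not code from this commit.

public class ScaledFloatRoundTrip {
    public static void main(String[] args) {
        double scalingFactor = 100.0;
        double value = 12.34;

        // Index time: scaled_float stores the value as a scaled long doc value.
        long stored = Math.round(value * scalingFactor); // 1234

        // Read time: the block loader applies the precomputed inverse,
        // mirroring `l -> l * scalingFactorInverse` in the hunk above.
        double scalingFactorInverse = 1d / scalingFactor;
        double decoded = stored * scalingFactorInverse;  // 12.34

        System.out.println(stored + " -> " + decoded);
    }
}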