Skip to content

Commit

Permalink
Added warnings collection in EnrichQuerySourceOperator
Browse files Browse the repository at this point in the history
  • Loading branch information
craigtaverner committed Nov 8, 2024
1 parent d1aa099 commit 498adc3
Show file tree
Hide file tree
Showing 12 changed files with 154 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ static TransportVersion def(int id) {
public static final TransportVersion LOGSDB_TELEMETRY = def(8_784_00_0);
public static final TransportVersion LOGSDB_TELEMETRY_STATS = def(8_785_00_0);
public static final TransportVersion KQL_QUERY_ADDED = def(8_786_00_0);
public static final TransportVersion ESQL_ENRICH_RUNTIME_WARNINGS = def(8_787_00_0);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.compute.data.IntVector;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.SourceOperator;
import org.elasticsearch.compute.operator.Warnings;
import org.elasticsearch.core.Releasables;

import java.io.IOException;
Expand All @@ -38,17 +39,25 @@ public final class EnrichQuerySourceOperator extends SourceOperator {
private int queryPosition = -1;
private final IndexReader indexReader;
private final IndexSearcher searcher;
private final Warnings warnings;
private final int maxPageSize;

// using smaller pages enables quick cancellation and reduces sorting costs
public static final int DEFAULT_MAX_PAGE_SIZE = 256;

public EnrichQuerySourceOperator(BlockFactory blockFactory, int maxPageSize, QueryList queryList, IndexReader indexReader) {
public EnrichQuerySourceOperator(
BlockFactory blockFactory,
int maxPageSize,
QueryList queryList,
IndexReader indexReader,
Warnings warnings
) {
this.blockFactory = blockFactory;
this.maxPageSize = maxPageSize;
this.queryList = queryList;
this.indexReader = indexReader;
this.searcher = new IndexSearcher(indexReader);
this.warnings = warnings;
}

@Override
Expand All @@ -73,12 +82,18 @@ public Page getOutput() {
}
int totalMatches = 0;
do {
Query query = nextQuery();
if (query == null) {
assert isFinished();
break;
Query query;
try {
query = nextQuery();
if (query == null) {
assert isFinished();
break;
}
query = searcher.rewrite(new ConstantScoreQuery(query));
} catch (Exception e) {
warnings.registerException(e);
continue;
}
query = searcher.rewrite(new ConstantScoreQuery(query));
final var weight = searcher.createWeight(query, ScoreMode.COMPLETE_NO_SCORES, 1.0f);
if (weight == null) {
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import org.elasticsearch.compute.data.IntBlock;
import org.elasticsearch.compute.data.IntVector;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.DriverContext;
import org.elasticsearch.compute.operator.Warnings;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.index.mapper.KeywordFieldMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
Expand Down Expand Up @@ -120,7 +122,8 @@ public void testQueries() throws Exception {
// 3 -> [] -> []
// 4 -> [a1] -> [3]
// 5 -> [] -> []
EnrichQuerySourceOperator queryOperator = new EnrichQuerySourceOperator(blockFactory, 128, queryList, reader);
var warnings = Warnings.createWarnings(DriverContext.WarningsMode.IGNORE, 0, 0, "test enrich");
EnrichQuerySourceOperator queryOperator = new EnrichQuerySourceOperator(blockFactory, 128, queryList, reader, warnings);
Page p0 = queryOperator.getOutput();
assertNotNull(p0);
assertThat(p0.getPositionCount(), equalTo(6));
Expand Down Expand Up @@ -187,7 +190,8 @@ public void testRandomMatchQueries() throws Exception {
MappedFieldType uidField = new KeywordFieldMapper.KeywordFieldType("uid");
var queryList = QueryList.rawTermQueryList(uidField, mock(SearchExecutionContext.class), inputTerms);
int maxPageSize = between(1, 256);
EnrichQuerySourceOperator queryOperator = new EnrichQuerySourceOperator(blockFactory, maxPageSize, queryList, reader);
var warnings = Warnings.createWarnings(DriverContext.WarningsMode.IGNORE, 0, 0, "test enrich");
EnrichQuerySourceOperator queryOperator = new EnrichQuerySourceOperator(blockFactory, maxPageSize, queryList, reader, warnings);
Map<Integer, Set<Integer>> actualPositions = new HashMap<>();
while (queryOperator.isFinished() == false) {
Page page = queryOperator.getOutput();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,8 @@ public void testLookupIndex() throws IOException {
DataType.KEYWORD,
"lookup",
"data",
List.of(new Alias(Source.EMPTY, "l", new ReferenceAttribute(Source.EMPTY, "l", DataType.LONG)))
List.of(new Alias(Source.EMPTY, "l", new ReferenceAttribute(Source.EMPTY, "l", DataType.LONG))),
Source.EMPTY
);
DriverContext driverContext = driverContext();
try (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ public enum Cap {
RANGEQUERY_FOR_DATETIME,

/**
* Enforce strict type checking on ENRICH range types.
* Enforce strict type checking on ENRICH range types, and warnings for KEYWORD parsing at runtime. Done in #115091.
*/
ENRICH_STRICT_RANGE_TYPES,

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.elasticsearch.compute.operator.DriverContext;
import org.elasticsearch.compute.operator.Operator;
import org.elasticsearch.compute.operator.OutputOperator;
import org.elasticsearch.compute.operator.Warnings;
import org.elasticsearch.compute.operator.lookup.EnrichQuerySourceOperator;
import org.elasticsearch.compute.operator.lookup.MergePositionsOperator;
import org.elasticsearch.compute.operator.lookup.QueryList;
Expand Down Expand Up @@ -77,6 +78,7 @@
import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
import org.elasticsearch.xpack.esql.core.expression.Alias;
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
import org.elasticsearch.xpack.esql.core.tree.Source;
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.planner.EsPhysicalOperationProviders;
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
Expand Down Expand Up @@ -326,11 +328,18 @@ private void doLookup(T request, CancellableTask task, ActionListener<Page> list
releasables.add(mergePositionsOperator);
SearchExecutionContext searchExecutionContext = searchContext.getSearchExecutionContext();
QueryList queryList = queryList(request, searchExecutionContext, inputBlock, request.inputDataType);
var warnings = Warnings.createWarnings(
DriverContext.WarningsMode.COLLECT,
request.source.source().getLineNumber(),
request.source.source().getColumnNumber(),
request.source.text()
);
var queryOperator = new EnrichQuerySourceOperator(
driverContext.blockFactory(),
EnrichQuerySourceOperator.DEFAULT_MAX_PAGE_SIZE,
queryList,
searchExecutionContext.getIndexReader()
searchExecutionContext.getIndexReader(),
warnings
);
releasables.add(queryOperator);
var extractFieldsOperator = extractFieldsOperator(searchContext, driverContext, request.extractFields);
Expand Down Expand Up @@ -446,13 +455,22 @@ abstract static class Request {
final DataType inputDataType;
final Page inputPage;
final List<NamedExpression> extractFields;
final Source source;

Request(String sessionId, String index, DataType inputDataType, Page inputPage, List<NamedExpression> extractFields) {
Request(
String sessionId,
String index,
DataType inputDataType,
Page inputPage,
List<NamedExpression> extractFields,
Source source
) {
this.sessionId = sessionId;
this.index = index;
this.inputDataType = inputDataType;
this.inputPage = inputPage;
this.extractFields = extractFields;
this.source = source;
}
}

Expand All @@ -462,6 +480,7 @@ abstract static class TransportRequest extends org.elasticsearch.transport.Trans
final DataType inputDataType;
final Page inputPage;
final List<NamedExpression> extractFields;
final Source source;
// TODO: Remove this workaround once we have Block RefCount
final Page toRelease;
final RefCounted refs = AbstractRefCounted.of(this::releasePage);
Expand All @@ -472,14 +491,16 @@ abstract static class TransportRequest extends org.elasticsearch.transport.Trans
DataType inputDataType,
Page inputPage,
Page toRelease,
List<NamedExpression> extractFields
List<NamedExpression> extractFields,
Source source
) {
this.sessionId = sessionId;
this.shardId = shardId;
this.inputDataType = inputDataType;
this.inputPage = inputPage;
this.toRelease = toRelease;
this.extractFields = extractFields;
this.source = source;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
import org.elasticsearch.xpack.esql.core.tree.Source;
import org.elasticsearch.xpack.esql.core.type.DataType;

import java.io.IOException;
Expand All @@ -35,6 +36,7 @@ public final class EnrichLookupOperator extends AsyncOperator {
private final String matchType;
private final String matchField;
private final List<NamedExpression> enrichFields;
private final Source source;
private long totalTerms = 0L;

public record Factory(
Expand All @@ -47,7 +49,8 @@ public record Factory(
String enrichIndex,
String matchType,
String matchField,
List<NamedExpression> enrichFields
List<NamedExpression> enrichFields,
Source source
) implements OperatorFactory {
@Override
public String describe() {
Expand Down Expand Up @@ -75,7 +78,8 @@ public Operator get(DriverContext driverContext) {
enrichIndex,
matchType,
matchField,
enrichFields
enrichFields,
source
);
}
}
Expand All @@ -91,7 +95,8 @@ public EnrichLookupOperator(
String enrichIndex,
String matchType,
String matchField,
List<NamedExpression> enrichFields
List<NamedExpression> enrichFields,
Source source
) {
super(driverContext, maxOutstandingRequests);
this.sessionId = sessionId;
Expand All @@ -103,6 +108,7 @@ public EnrichLookupOperator(
this.matchType = matchType;
this.matchField = matchField;
this.enrichFields = enrichFields;
this.source = source;
}

@Override
Expand All @@ -116,7 +122,8 @@ protected void performAsync(Page inputPage, ActionListener<Page> listener) {
matchType,
matchField,
new Page(inputBlock),
enrichFields
enrichFields,
source
);
enrichLookupService.lookupAsync(request, parentTask, listener.map(inputPage::appendPage));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
import org.elasticsearch.xpack.esql.action.EsqlQueryAction;
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
import org.elasticsearch.xpack.esql.core.tree.Source;
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
import org.elasticsearch.xpack.esql.io.stream.PlanStreamOutput;
Expand Down Expand Up @@ -73,12 +74,14 @@ protected TransportRequest transportRequest(EnrichLookupService.Request request,
request.matchField,
request.inputPage,
null,
request.extractFields
request.extractFields,
request.source
);
}

@Override
protected QueryList queryList(TransportRequest request, SearchExecutionContext context, Block inputBlock, DataType inputDataType) {
validateTypes(inputDataType, context.getFieldType(request.matchField));
MappedFieldType fieldType = context.getFieldType(request.matchField);
return switch (request.matchType) {
case "match", "range" -> termQueryList(fieldType, context, inputBlock, inputDataType);
Expand All @@ -104,6 +107,10 @@ private static void validateTypes(DataType inputDataType, MappedFieldType fieldT
}

private static boolean rangeTypesCompatible(RangeType rangeType, DataType inputDataType) {
if (inputDataType.noText() == DataType.KEYWORD) {
// We allow runtime parsing of string types to numeric types
return true;
}
return switch (rangeType) {
case INTEGER, LONG -> inputDataType.isWholeNumber();
case IP -> inputDataType == DataType.IP;
Expand All @@ -123,9 +130,10 @@ public static class Request extends AbstractLookupService.Request {
String matchType,
String matchField,
Page inputPage,
List<NamedExpression> extractFields
List<NamedExpression> extractFields,
Source source
) {
super(sessionId, index, inputDataType, inputPage, extractFields);
super(sessionId, index, inputDataType, inputPage, extractFields, source);
this.matchType = matchType;
this.matchField = matchField;
}
Expand All @@ -143,9 +151,10 @@ protected static class TransportRequest extends AbstractLookupService.TransportR
String matchField,
Page inputPage,
Page toRelease,
List<NamedExpression> extractFields
List<NamedExpression> extractFields,
Source source
) {
super(sessionId, shardId, inputDataType, inputPage, toRelease, extractFields);
super(sessionId, shardId, inputDataType, inputPage, toRelease, extractFields, source);
this.matchType = matchType;
this.matchField = matchField;
}
Expand All @@ -165,6 +174,10 @@ static TransportRequest readFrom(StreamInput in, BlockFactory blockFactory) thro
}
PlanStreamInput planIn = new PlanStreamInput(in, in.namedWriteableRegistry(), null);
List<NamedExpression> extractFields = planIn.readNamedWriteableCollectionAsList(NamedExpression.class);
var source = Source.EMPTY;
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_ENRICH_RUNTIME_WARNINGS)) {
source = Source.readFrom(planIn);
}
TransportRequest result = new TransportRequest(
sessionId,
shardId,
Expand All @@ -173,7 +186,8 @@ static TransportRequest readFrom(StreamInput in, BlockFactory blockFactory) thro
matchField,
inputPage,
inputPage,
extractFields
extractFields,
source
);
result.setParentTask(parentTaskId);
return result;
Expand All @@ -192,6 +206,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeWriteable(inputPage);
PlanStreamOutput planOut = new PlanStreamOutput(out, null);
planOut.writeNamedWriteableCollection(extractFields);
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_ENRICH_RUNTIME_WARNINGS)) {
source.writeTo(planOut);
}
}

@Override
Expand Down
Loading

0 comments on commit 498adc3

Please sign in to comment.