ES|QL: better management of exact subfields for TEXT fields #103510

Merged
merged 17 commits into from
Jan 9, 2024
6 changes: 6 additions & 0 deletions docs/changelog/103510.yaml
@@ -0,0 +1,6 @@
pr: 103510
summary: "ES|QL: better management of exact subfields for TEXT fields"
area: ES|QL
type: bug
issues:
- 99899
@@ -265,7 +265,7 @@ protected IngestScriptSupport ingestScriptSupport() {
}

@Override
protected Function<Object, Object> loadBlockExpected() {
protected Function<Object, Object> loadBlockExpected(MapperService mapper, String loaderFieldName) {
return v -> ((BytesRef) v).utf8ToString();
}
}
@@ -438,7 +438,7 @@ public List<SyntheticSourceInvalidExample> invalidExample() throws IOException {
}

@Override
protected Function<Object, Object> loadBlockExpected() {
protected Function<Object, Object> loadBlockExpected(MapperService mapper, String loaderFieldName) {
return v -> (Number) v;
}

@@ -937,9 +937,15 @@ public boolean isAggregatable() {
return fielddata;
}

public boolean canUseSyntheticSourceDelegateForQuerying() {
return syntheticSourceDelegate != null
&& syntheticSourceDelegate.ignoreAbove() == Integer.MAX_VALUE
&& (syntheticSourceDelegate.isIndexed() || syntheticSourceDelegate.isStored());
}
Member

A synthetic _source delegate will never have a normalizer. It might have ignore_above though. If it does I think it's still safe to use it for fetching but it won't help at the moment.

Member

This method should probably have a name like canUseSyntheticSourceDelegateForQuerying.

Member

Oh! There's no use in using the delegate for querying if the field isn't indexed. Maybe check that?

And! I'm not 100% sure block loading works for synthetic source keyword fields with doc values disabled. We should fall back to the originalName() stored field. If we don't, that's a bug.

Contributor Author
@luigidellaquila Jan 8, 2024

And! I'm not 100% sure block loading works for synthetic source keyword fields with doc values disabled. We should fall back to the originalName() stored field. If we don't, that's a bug.

I'm not sure I understand the exact use case here. I tried a few cases with KEYWORD fields alone and with TEXT fields with KEYWORD subfields, but every time I try to disable doc_values, I get an error at index creation time like: "field .. doesn't support synthetic source unless it is stored or has a sub-field of type [keyword] with doc values or stored and without a normalizer".

Do you have an example?

Member

Sorry, it looks like you need all of doc_values: false, stored: true, and ignore_above: 12, or something like that. I think it's probably just an issue to file and run down later.
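
The conditions discussed in this thread can be summarized in a small standalone sketch. The `DelegateEligibility` class and the `KeywordDelegate` type below are hypothetical stand-ins, not Elasticsearch classes; only the three checks mirror `canUseSyntheticSourceDelegateForQuerying` from the diff. The sketch assumes, as the diff does, that `Integer.MAX_VALUE` means no `ignore_above` limit is set.

```java
public class DelegateEligibility {

    /** Hypothetical stand-in for the keyword sub-field type; not an Elasticsearch class. */
    static final class KeywordDelegate {
        final int ignoreAbove;
        final boolean indexed;
        final boolean stored;

        KeywordDelegate(int ignoreAbove, boolean indexed, boolean stored) {
            this.ignoreAbove = ignoreAbove;
            this.indexed = indexed;
            this.stored = stored;
        }
    }

    // Mirrors the three checks in the diff: the delegate must exist, must not
    // skip long values through ignore_above, and must be indexed or stored.
    static boolean canUseDelegateForQuerying(KeywordDelegate delegate) {
        return delegate != null
            && delegate.ignoreAbove == Integer.MAX_VALUE
            && (delegate.indexed || delegate.stored);
    }

    public static void main(String[] args) {
        KeywordDelegate plain = new KeywordDelegate(Integer.MAX_VALUE, true, false);
        KeywordDelegate truncating = new KeywordDelegate(12, true, true);
        KeywordDelegate unreachable = new KeywordDelegate(Integer.MAX_VALUE, false, false);

        System.out.println(canUseDelegateForQuerying(plain));       // true
        System.out.println(canUseDelegateForQuerying(truncating));  // false
        System.out.println(canUseDelegateForQuerying(unreachable)); // false
        System.out.println(canUseDelegateForQuerying(null));        // false
    }
}
```

The `truncating` case corresponds to the `ignore_above: 12` mapping mentioned above: the sub-field may not hold every value of the text field, so the sketch refuses to treat it as an identical copy.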


@Override
public BlockLoader blockLoader(BlockLoaderContext blContext) {
if (syntheticSourceDelegate != null) {
if (canUseSyntheticSourceDelegateForQuerying()) {
return new BlockLoader.Delegating(syntheticSourceDelegate.blockLoader(blContext)) {
@Override
protected String delegatingTo() {
@@ -264,7 +264,7 @@ public List<SyntheticSourceInvalidExample> invalidExample() throws IOException {
}

@Override
protected Function<Object, Object> loadBlockExpected() {
protected Function<Object, Object> loadBlockExpected(MapperService mapper, String loaderFieldName) {
// Just assert that we expect a boolean. Otherwise no munging.
return v -> (Boolean) v;
}
@@ -706,7 +706,7 @@ public void execute() {
}

@Override
protected Function<Object, Object> loadBlockExpected() {
protected Function<Object, Object> loadBlockExpected(MapperService mapper, String loaderFieldName) {
return v -> ((Number) v).longValue();
}

@@ -56,7 +56,7 @@ protected SyntheticSourceSupport syntheticSourceSupport(boolean ignoreMalformed)
}

@Override
protected Function<Object, Object> loadBlockExpected() {
protected Function<Object, Object> loadBlockExpected(MapperService mapper, String loaderFieldName) {
return v -> {
// The test converts the float into a string so we do too
Number n = (Number) v;
@@ -708,7 +708,7 @@ protected IngestScriptSupport ingestScriptSupport() {
private boolean useDocValues = false;

@Override
protected Function<Object, Object> loadBlockExpected() {
protected Function<Object, Object> loadBlockExpected(MapperService mapper, String loaderFieldName) {
if (useDocValues) {
return v -> asJacksonNumberOutput(((Number) v).longValue());
} else {
@@ -56,7 +56,7 @@ protected SyntheticSourceSupport syntheticSourceSupport(boolean ignoreMalformed)
}

@Override
protected Function<Object, Object> loadBlockExpected() {
protected Function<Object, Object> loadBlockExpected(MapperService mapper, String loaderFieldName) {
return v -> {
// The test converts the float into a string so we do too
Number n = (Number) v;
@@ -423,7 +423,7 @@ public void execute() {
}

@Override
protected Function<Object, Object> loadBlockExpected() {
protected Function<Object, Object> loadBlockExpected(MapperService mapper, String loaderFieldName) {
return v -> InetAddresses.toAddrString(InetAddressPoint.decode(BytesRef.deepCopyOf((BytesRef) v).bytes));
}
}
@@ -650,7 +650,7 @@ protected boolean supportsIgnoreMalformed() {
}

@Override
protected Function<Object, Object> loadBlockExpected() {
protected Function<Object, Object> loadBlockExpected(MapperService mapper, String loaderFieldName) {
return v -> ((BytesRef) v).utf8ToString();
}

@@ -122,7 +122,7 @@ public void testFetchCoerced() throws IOException {
}

@Override
protected Function<Object, Object> loadBlockExpected() {
protected Function<Object, Object> loadBlockExpected(MapperService mapper, String loaderFieldName) {
return n -> {
Number number = ((Number) n);
if (Integer.MIN_VALUE <= number.longValue() && number.longValue() <= Integer.MAX_VALUE) {
@@ -382,7 +382,7 @@ public void testAllowMultipleValuesField() throws IOException {
}

@Override
protected Function<Object, Object> loadBlockExpected() {
protected Function<Object, Object> loadBlockExpected(MapperService mapper, String loaderFieldName) {
return n -> ((Number) n); // Just assert it's a number
}

@@ -1186,10 +1186,25 @@ public List<SyntheticSourceInvalidExample> invalidExample() throws IOException {
}

@Override
protected Function<Object, Object> loadBlockExpected() {
protected Function<Object, Object> loadBlockExpected(MapperService mapper, String fieldName) {
if (nullLoaderExpected(mapper, fieldName)) {
return null;
}
return v -> ((BytesRef) v).utf8ToString();
}

private boolean nullLoaderExpected(MapperService mapper, String fieldName) {
MappedFieldType type = mapper.fieldType(fieldName);
if (type instanceof TextFieldType t) {
if (t.isSyntheticSource() == false || t.canUseSyntheticSourceDelegateForQuerying() || t.isStored()) {
return false;
}
String parentField = mapper.mappingLookup().parentField(fieldName);
return parentField == null || nullLoaderExpected(mapper, parentField);
}
return false;
}

@Override
protected IngestScriptSupport ingestScriptSupport() {
throw new AssumptionViolatedException("not supported");
@@ -1297,7 +1297,7 @@ public FieldNamesFieldMapper.FieldNamesFieldType fieldNames() {
return (FieldNamesFieldMapper.FieldNamesFieldType) mapper.fieldType(FieldNamesFieldMapper.NAME);
}
});
Function<Object, Object> valuesConvert = loadBlockExpected();
Function<Object, Object> valuesConvert = loadBlockExpected(mapper, loaderFieldName);
if (valuesConvert == null) {
assertNull(loader);
return;
@@ -1371,7 +1371,7 @@ protected Matcher<?> blockItemMatcher(Object expected) {
* How {@link MappedFieldType#blockLoader} should load values or {@code null}
* if that method isn't supported by field being tested.
*/
protected Function<Object, Object> loadBlockExpected() {
protected Function<Object, Object> loadBlockExpected(MapperService mapper, String loaderFieldName) {
return null;
}

@@ -31,6 +31,7 @@
import org.elasticsearch.xpack.esql.planner.EsqlTranslatorHandler;
import org.elasticsearch.xpack.esql.planner.PhysicalVerificationException;
import org.elasticsearch.xpack.esql.planner.PhysicalVerifier;
import org.elasticsearch.xpack.esql.stats.SearchStats;
import org.elasticsearch.xpack.ql.common.Failure;
import org.elasticsearch.xpack.ql.expression.Alias;
import org.elasticsearch.xpack.ql.expression.Attribute;
@@ -54,6 +55,7 @@
import org.elasticsearch.xpack.ql.querydsl.query.Query;
import org.elasticsearch.xpack.ql.rule.ParameterizedRuleExecutor;
import org.elasticsearch.xpack.ql.rule.Rule;
import org.elasticsearch.xpack.ql.type.DataTypes;
import org.elasticsearch.xpack.ql.util.Queries;
import org.elasticsearch.xpack.ql.util.Queries.Clause;
import org.elasticsearch.xpack.ql.util.StringUtils;
@@ -64,6 +66,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import static java.util.Arrays.asList;
@@ -193,15 +196,18 @@ private static Set<Attribute> missingAttributes(PhysicalPlan p) {
}
}

public static class PushFiltersToSource extends OptimizerRule<FilterExec> {
public static class PushFiltersToSource extends PhysicalOptimizerRules.ParameterizedOptimizerRule<
FilterExec,
LocalPhysicalOptimizerContext> {

@Override
protected PhysicalPlan rule(FilterExec filterExec) {
protected PhysicalPlan rule(FilterExec filterExec, LocalPhysicalOptimizerContext ctx) {
PhysicalPlan plan = filterExec;
if (filterExec.child() instanceof EsQueryExec queryExec) {
List<Expression> pushable = new ArrayList<>();
List<Expression> nonPushable = new ArrayList<>();
for (Expression exp : splitAnd(filterExec.condition())) {
(canPushToSource(exp) ? pushable : nonPushable).add(exp);
(canPushToSource(exp, x -> hasIdenticalDelegate(x, ctx.searchStats())) ? pushable : nonPushable).add(exp);
}
if (pushable.size() > 0) { // update the executable with pushable conditions
Query queryDSL = TRANSLATOR_HANDLER.asQuery(Predicates.combineAnd(pushable));
@@ -227,26 +233,30 @@ protected PhysicalPlan rule(FilterExec filterExec) {
return plan;
}

public static boolean canPushToSource(Expression exp) {
public static boolean canPushToSource(Expression exp, Predicate<FieldAttribute> hasIdenticalDelegate) {
if (exp instanceof BinaryComparison bc) {
return isAttributePushable(bc.left(), bc) && bc.right().foldable();
return isAttributePushable(bc.left(), bc, hasIdenticalDelegate) && bc.right().foldable();
} else if (exp instanceof BinaryLogic bl) {
return canPushToSource(bl.left()) && canPushToSource(bl.right());
return canPushToSource(bl.left(), hasIdenticalDelegate) && canPushToSource(bl.right(), hasIdenticalDelegate);
} else if (exp instanceof In in) {
return isAttributePushable(in.value(), null) && Expressions.foldable(in.list());
return isAttributePushable(in.value(), null, hasIdenticalDelegate) && Expressions.foldable(in.list());
} else if (exp instanceof Not not) {
return canPushToSource(not.field());
return canPushToSource(not.field(), hasIdenticalDelegate);
} else if (exp instanceof UnaryScalarFunction usf) {
if (usf instanceof RegexMatch<?> || usf instanceof IsNull || usf instanceof IsNotNull) {
return isAttributePushable(usf.field(), usf);
return isAttributePushable(usf.field(), usf, hasIdenticalDelegate);
}
}
return false;
}

private static boolean isAttributePushable(Expression expression, Expression operation) {
if (expression instanceof FieldAttribute f && f.getExactInfo().hasExact()) {
return isAggregatable(f);
private static boolean isAttributePushable(
Expression expression,
Expression operation,
Predicate<FieldAttribute> hasIdenticalDelegate
) {
if (isPushableFieldAttribute(expression, hasIdenticalDelegate)) {
return true;
}
if (expression instanceof MetadataAttribute ma && ma.searchable()) {
return operation == null
@@ -282,15 +292,17 @@ protected PhysicalPlan rule(LimitExec limitExec) {
}
}

private static class PushTopNToSource extends OptimizerRule<TopNExec> {
private static class PushTopNToSource extends PhysicalOptimizerRules.ParameterizedOptimizerRule<
TopNExec,
LocalPhysicalOptimizerContext> {
@Override
protected PhysicalPlan rule(TopNExec topNExec) {
protected PhysicalPlan rule(TopNExec topNExec, LocalPhysicalOptimizerContext ctx) {
PhysicalPlan plan = topNExec;
PhysicalPlan child = topNExec.child();

boolean canPushDownTopN = child instanceof EsQueryExec
|| (child instanceof ExchangeExec exchangeExec && exchangeExec.child() instanceof EsQueryExec);
if (canPushDownTopN && canPushDownOrders(topNExec.order())) {
if (canPushDownTopN && canPushDownOrders(topNExec.order(), x -> hasIdenticalDelegate(x, ctx.searchStats()))) {
var sorts = buildFieldSorts(topNExec.order());
var limit = topNExec.limit();

@@ -303,10 +315,9 @@ protected PhysicalPlan rule(TopNExec topNExec) {
return plan;
}

private boolean canPushDownOrders(List<Order> orders) {
private boolean canPushDownOrders(List<Order> orders, Predicate<FieldAttribute> hasIdenticalDelegate) {
// allow only exact FieldAttributes (no expressions) for sorting
return orders.stream()
.allMatch(o -> o.child() instanceof FieldAttribute fa && fa.getExactInfo().hasExact() && isAggregatable(fa));
return orders.stream().allMatch(o -> isPushableFieldAttribute(o.child(), hasIdenticalDelegate));
}

private List<EsQueryExec.FieldSort> buildFieldSorts(List<Order> orders) {
@@ -405,4 +416,15 @@ private Tuple<List<Attribute>, List<Stat>> pushableStats(AggregateExec aggregate
}
}

public static boolean hasIdenticalDelegate(FieldAttribute attr, SearchStats stats) {
Member

I feel like this should return the MappedFieldType for the identical delegate. Or something like that. Some identifier.

Contributor Author

It's possible, but it would require a bit of refactoring in the pushdown rules to really take advantage of the returned MappedFieldType, and it's not a trivial change. I think we can consider it for a follow-up.

Member

Cool. We just have to make sure the sub-field it finds is the right one. In case there is more than one candidate. Evil, I know.

Contributor Author

We are safe in this sense: if there are two subfields, neither is used, even if one of them is good (it's QL logic; not optimal, but in this case it makes our life easier).

return stats.hasIdenticalDelegate(attr.name());
}

public static boolean isPushableFieldAttribute(Expression exp, Predicate<FieldAttribute> hasIdenticalDelegate) {
if (exp instanceof FieldAttribute fa && fa.getExactInfo().hasExact() && isAggregatable(fa)) {
return fa.dataType() != DataTypes.TEXT || hasIdenticalDelegate.test(fa);
}
return false;
}

}
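
As a rough illustration of how the injected predicate changes the pushdown decision, here is a minimal standalone sketch. The `PushdownSketch` class, the `Field` type, and the field names are hypothetical and heavily simplified; the real rule operates on `FieldAttribute` and obtains the answer from `SearchStats`. Only the shape of the check follows `isPushableFieldAttribute` above.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Predicate;

public class PushdownSketch {

    /** Hypothetical, simplified field descriptor; the real rule works on FieldAttribute. */
    static final class Field {
        final String name;
        final boolean isText;
        final boolean hasExact;
        final boolean aggregatable;

        Field(String name, boolean isText, boolean hasExact, boolean aggregatable) {
            this.name = name;
            this.isText = isText;
            this.hasExact = hasExact;
            this.aggregatable = aggregatable;
        }
    }

    // Same shape as isPushableFieldAttribute in the diff: non-text fields are pushable
    // once the exact/aggregatable checks pass, text fields also need an identical delegate.
    static boolean isPushable(Field f, Predicate<Field> hasIdenticalDelegate) {
        if (f.hasExact && f.aggregatable) {
            return f.isText == false || hasIdenticalDelegate.test(f);
        }
        return false;
    }

    public static void main(String[] args) {
        // Assumed set of text fields whose keyword sub-field holds identical values.
        Set<String> identical = new HashSet<>(Arrays.asList("title"));
        Predicate<Field> fromStats = f -> identical.contains(f.name);

        Field status = new Field("status", false, true, true);
        Field title = new Field("title", true, true, true);
        Field body = new Field("body", true, true, true);

        System.out.println(isPushable(status, fromStats)); // true
        System.out.println(isPushable(title, fromStats));  // true
        System.out.println(isPushable(body, fromStats));   // false
        // A permissive predicate such as x -> true makes every such text field pushable.
        System.out.println(isPushable(body, f -> true));   // true
    }
}
```

Passing the predicate as a parameter keeps `canPushToSource` static and lets each caller decide where the answer comes from.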
@@ -37,7 +37,6 @@
import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner.PhysicalOperation;
import org.elasticsearch.xpack.esql.type.EsqlDataTypes;
import org.elasticsearch.xpack.ql.expression.Attribute;
import org.elasticsearch.xpack.ql.expression.FieldAttribute;
import org.elasticsearch.xpack.ql.type.DataType;

import java.util.ArrayList;
@@ -73,9 +72,6 @@ public final PhysicalOperation fieldExtractPhysicalOperation(FieldExtractExec fi
List<ValuesSourceReaderOperator.FieldInfo> fields = new ArrayList<>();
int docChannel = source.layout.get(sourceAttr.id()).channel();
for (Attribute attr : fieldExtractExec.attributesToExtract()) {
if (attr instanceof FieldAttribute fa && fa.getExactInfo().hasExact()) {
attr = fa.exactAttribute();
Contributor Author

This is not needed; TextFieldMapper will take care of fetching values from the exact subfield.

}
layout.append(attr);
DataType dataType = attr.dataType();
ElementType elementType = PlannerUtils.toElementType(dataType);
@@ -35,6 +35,7 @@
import org.elasticsearch.xpack.esql.type.EsqlDataTypes;
import org.elasticsearch.xpack.ql.expression.AttributeSet;
import org.elasticsearch.xpack.ql.expression.Expression;
import org.elasticsearch.xpack.ql.expression.FieldAttribute;
import org.elasticsearch.xpack.ql.expression.predicate.Predicates;
import org.elasticsearch.xpack.ql.plan.logical.EsRelation;
import org.elasticsearch.xpack.ql.plan.logical.Filter;
@@ -48,6 +49,7 @@
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

import static java.util.Arrays.asList;
import static org.elasticsearch.xpack.esql.optimizer.LocalPhysicalPlanOptimizer.PushFiltersToSource.canPushToSource;
@@ -149,12 +151,16 @@ public static PhysicalPlan localPlan(

/**
* Extracts the ES query provided by the filter parameter
* @param plan
* @param hasIdenticalDelegate a lambda that, given a field attribute, says whether it has
* a synthetic source delegate with the exact same value
* @return
*/
public static QueryBuilder requestFilter(PhysicalPlan plan) {
return detectFilter(plan, "@timestamp");
public static QueryBuilder requestFilter(PhysicalPlan plan, Predicate<FieldAttribute> hasIdenticalDelegate) {
return detectFilter(plan, "@timestamp", hasIdenticalDelegate);
}

static QueryBuilder detectFilter(PhysicalPlan plan, String fieldName) {
static QueryBuilder detectFilter(PhysicalPlan plan, String fieldName, Predicate<FieldAttribute> hasIdenticalDelegate) {
// first position is the REST filter, the second the query filter
var requestFilter = new QueryBuilder[] { null, null };

@@ -175,7 +181,7 @@ static QueryBuilder detectFilter(PhysicalPlan plan, String fieldName) {
boolean matchesField = refs.removeIf(e -> fieldName.equals(e.name()));
// the expression only contains the target reference
// and the expression is pushable (functions can be fully translated)
if (matchesField && refs.isEmpty() && canPushToSource(exp)) {
if (matchesField && refs.isEmpty() && canPushToSource(exp, hasIdenticalDelegate)) {
matches.add(exp);
}
}
@@ -271,7 +271,11 @@ private void startComputeOnDataNodes(
ActionListener<Void> parentListener,
Supplier<ActionListener<ComputeResponse>> dataNodeListenerSupplier
) {
QueryBuilder requestFilter = PlannerUtils.requestFilter(dataNodePlan);
// The lambda is to say if a TEXT field has an identical exact subfield
// We cannot use SearchContext because we don't have it yet.
// Since it's used only for @timestamp, it is relatively safe to assume it's not needed
// but it would be better to have a proper impl.
QueryBuilder requestFilter = PlannerUtils.requestFilter(dataNodePlan, x -> true);
lookupDataNodes(parentTask, clusterAlias, requestFilter, concreteIndices, originalIndices, ActionListener.wrap(dataNodes -> {
try (RefCountingRunnable refs = new RefCountingRunnable(() -> parentListener.onResponse(null))) {
// For each target node, first open a remote exchange on the remote node, then link the exchange source to