Skip to content

Commit

Permalink
ES|QL: better management of exact subfields for TEXT fields (elastic#…
Browse files Browse the repository at this point in the history
  • Loading branch information
luigidellaquila authored Jan 9, 2024
1 parent 15258c8 commit 089435c
Show file tree
Hide file tree
Showing 26 changed files with 599 additions and 47 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/103510.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 103510
summary: "ES|QL: better management of exact subfields for TEXT fields"
area: ES|QL
type: bug
issues:
- 99899
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ protected IngestScriptSupport ingestScriptSupport() {
}

@Override
protected Function<Object, Object> loadBlockExpected() {
protected Function<Object, Object> loadBlockExpected(MapperService mapper, String loaderFieldName) {
return v -> ((BytesRef) v).utf8ToString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ public List<SyntheticSourceInvalidExample> invalidExample() throws IOException {
}

@Override
protected Function<Object, Object> loadBlockExpected() {
protected Function<Object, Object> loadBlockExpected(MapperService mapper, String loaderFieldName) {
return v -> (Number) v;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -937,9 +937,15 @@ public boolean isAggregatable() {
return fielddata;
}

/**
 * Tells whether the synthetic source delegate may be used in place of this field.
 * <p>
 * All of the following must hold: a delegate is configured, its {@code ignoreAbove()}
 * is {@link Integer#MAX_VALUE}, and it is either indexed or stored.
 * NOTE(review): presumably the {@code ignoreAbove} check guards against a delegate that
 * would skip long values and so not mirror this field exactly; confirm.
 *
 * @return {@code true} if the delegate satisfies every condition above
 */
public boolean canUseSyntheticSourceDelegateForQuerying() {
    if (syntheticSourceDelegate == null) {
        return false;
    }
    if (syntheticSourceDelegate.ignoreAbove() != Integer.MAX_VALUE) {
        return false;
    }
    return syntheticSourceDelegate.isIndexed() || syntheticSourceDelegate.isStored();
}

@Override
public BlockLoader blockLoader(BlockLoaderContext blContext) {
if (syntheticSourceDelegate != null) {
if (canUseSyntheticSourceDelegateForQuerying()) {
return new BlockLoader.Delegating(syntheticSourceDelegate.blockLoader(blContext)) {
@Override
protected String delegatingTo() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ public List<SyntheticSourceInvalidExample> invalidExample() throws IOException {
}

@Override
protected Function<Object, Object> loadBlockExpected() {
protected Function<Object, Object> loadBlockExpected(MapperService mapper, String loaderFieldName) {
// Just assert that we expect a boolean. Otherwise no munging.
return v -> (Boolean) v;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -706,7 +706,7 @@ public void execute() {
}

@Override
protected Function<Object, Object> loadBlockExpected() {
protected Function<Object, Object> loadBlockExpected(MapperService mapper, String loaderFieldName) {
return v -> ((Number) v).longValue();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ protected SyntheticSourceSupport syntheticSourceSupport(boolean ignoreMalformed)
}

@Override
protected Function<Object, Object> loadBlockExpected() {
protected Function<Object, Object> loadBlockExpected(MapperService mapper, String loaderFieldName) {
return v -> {
// The test converts the float into a string so we do too
Number n = (Number) v;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -708,7 +708,7 @@ protected IngestScriptSupport ingestScriptSupport() {
private boolean useDocValues = false;

@Override
protected Function<Object, Object> loadBlockExpected() {
protected Function<Object, Object> loadBlockExpected(MapperService mapper, String loaderFieldName) {
if (useDocValues) {
return v -> asJacksonNumberOutput(((Number) v).longValue());
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ protected SyntheticSourceSupport syntheticSourceSupport(boolean ignoreMalformed)
}

@Override
protected Function<Object, Object> loadBlockExpected() {
protected Function<Object, Object> loadBlockExpected(MapperService mapper, String loaderFieldName) {
return v -> {
// The test converts the float into a string so we do too
Number n = (Number) v;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ public void execute() {
}

@Override
protected Function<Object, Object> loadBlockExpected() {
protected Function<Object, Object> loadBlockExpected(MapperService mapper, String loaderFieldName) {
return v -> InetAddresses.toAddrString(InetAddressPoint.decode(BytesRef.deepCopyOf((BytesRef) v).bytes));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -650,7 +650,7 @@ protected boolean supportsIgnoreMalformed() {
}

@Override
protected Function<Object, Object> loadBlockExpected() {
protected Function<Object, Object> loadBlockExpected(MapperService mapper, String loaderFieldName) {
return v -> ((BytesRef) v).utf8ToString();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public void testFetchCoerced() throws IOException {
}

@Override
protected Function<Object, Object> loadBlockExpected() {
protected Function<Object, Object> loadBlockExpected(MapperService mapper, String loaderFieldName) {
return n -> {
Number number = ((Number) n);
if (Integer.MIN_VALUE <= number.longValue() && number.longValue() <= Integer.MAX_VALUE) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ public void testAllowMultipleValuesField() throws IOException {
}

@Override
protected Function<Object, Object> loadBlockExpected() {
protected Function<Object, Object> loadBlockExpected(MapperService mapper, String loaderFieldName) {
return n -> ((Number) n); // Just assert it's a number
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1186,10 +1186,25 @@ public List<SyntheticSourceInvalidExample> invalidExample() throws IOException {
}

@Override
protected Function<Object, Object> loadBlockExpected() {
protected Function<Object, Object> loadBlockExpected(MapperService mapper, String fieldName) {
if (nullLoaderExpected(mapper, fieldName)) {
return null;
}
return v -> ((BytesRef) v).utf8ToString();
}

/**
 * Decides whether the test should expect no block loader for {@code fieldName}.
 * <p>
 * Only {@code TextFieldType} fields can yield {@code true}. For such a field a loader
 * is still expected when it does not use synthetic source, when it can use its synthetic
 * source delegate for querying, or when it is stored. Otherwise the answer depends on the
 * parent field: no parent means no loader, and a parent is checked recursively.
 *
 * @param mapper    mapper service used to resolve field types and parent fields
 * @param fieldName name of the field being checked
 * @return {@code true} if a {@code null} loader is expected for the field
 */
private boolean nullLoaderExpected(MapperService mapper, String fieldName) {
    MappedFieldType fieldType = mapper.fieldType(fieldName);
    if (fieldType instanceof TextFieldType == false) {
        return false;
    }
    TextFieldType text = (TextFieldType) fieldType;
    if (text.isSyntheticSource() == false) {
        return false;
    }
    if (text.canUseSyntheticSourceDelegateForQuerying()) {
        return false;
    }
    if (text.isStored()) {
        return false;
    }
    String parent = mapper.mappingLookup().parentField(fieldName);
    if (parent == null) {
        return true;
    }
    return nullLoaderExpected(mapper, parent);
}

@Override
protected IngestScriptSupport ingestScriptSupport() {
throw new AssumptionViolatedException("not supported");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1297,7 +1297,7 @@ public FieldNamesFieldMapper.FieldNamesFieldType fieldNames() {
return (FieldNamesFieldMapper.FieldNamesFieldType) mapper.fieldType(FieldNamesFieldMapper.NAME);
}
});
Function<Object, Object> valuesConvert = loadBlockExpected();
Function<Object, Object> valuesConvert = loadBlockExpected(mapper, loaderFieldName);
if (valuesConvert == null) {
assertNull(loader);
return;
Expand Down Expand Up @@ -1371,7 +1371,7 @@ protected Matcher<?> blockItemMatcher(Object expected) {
* How {@link MappedFieldType#blockLoader} should load values or {@code null}
* if that method isn't supported by field being tested.
*/
protected Function<Object, Object> loadBlockExpected() {
protected Function<Object, Object> loadBlockExpected(MapperService mapper, String loaderFieldName) {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.elasticsearch.xpack.esql.planner.EsqlTranslatorHandler;
import org.elasticsearch.xpack.esql.planner.PhysicalVerificationException;
import org.elasticsearch.xpack.esql.planner.PhysicalVerifier;
import org.elasticsearch.xpack.esql.stats.SearchStats;
import org.elasticsearch.xpack.ql.common.Failure;
import org.elasticsearch.xpack.ql.expression.Alias;
import org.elasticsearch.xpack.ql.expression.Attribute;
Expand All @@ -54,6 +55,7 @@
import org.elasticsearch.xpack.ql.querydsl.query.Query;
import org.elasticsearch.xpack.ql.rule.ParameterizedRuleExecutor;
import org.elasticsearch.xpack.ql.rule.Rule;
import org.elasticsearch.xpack.ql.type.DataTypes;
import org.elasticsearch.xpack.ql.util.Queries;
import org.elasticsearch.xpack.ql.util.Queries.Clause;
import org.elasticsearch.xpack.ql.util.StringUtils;
Expand All @@ -64,6 +66,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import static java.util.Arrays.asList;
Expand Down Expand Up @@ -193,15 +196,18 @@ private static Set<Attribute> missingAttributes(PhysicalPlan p) {
}
}

public static class PushFiltersToSource extends OptimizerRule<FilterExec> {
public static class PushFiltersToSource extends PhysicalOptimizerRules.ParameterizedOptimizerRule<
FilterExec,
LocalPhysicalOptimizerContext> {

@Override
protected PhysicalPlan rule(FilterExec filterExec) {
protected PhysicalPlan rule(FilterExec filterExec, LocalPhysicalOptimizerContext ctx) {
PhysicalPlan plan = filterExec;
if (filterExec.child() instanceof EsQueryExec queryExec) {
List<Expression> pushable = new ArrayList<>();
List<Expression> nonPushable = new ArrayList<>();
for (Expression exp : splitAnd(filterExec.condition())) {
(canPushToSource(exp) ? pushable : nonPushable).add(exp);
(canPushToSource(exp, x -> hasIdenticalDelegate(x, ctx.searchStats())) ? pushable : nonPushable).add(exp);
}
if (pushable.size() > 0) { // update the executable with pushable conditions
Query queryDSL = TRANSLATOR_HANDLER.asQuery(Predicates.combineAnd(pushable));
Expand All @@ -227,26 +233,30 @@ protected PhysicalPlan rule(FilterExec filterExec) {
return plan;
}

public static boolean canPushToSource(Expression exp) {
public static boolean canPushToSource(Expression exp, Predicate<FieldAttribute> hasIdenticalDelegate) {
if (exp instanceof BinaryComparison bc) {
return isAttributePushable(bc.left(), bc) && bc.right().foldable();
return isAttributePushable(bc.left(), bc, hasIdenticalDelegate) && bc.right().foldable();
} else if (exp instanceof BinaryLogic bl) {
return canPushToSource(bl.left()) && canPushToSource(bl.right());
return canPushToSource(bl.left(), hasIdenticalDelegate) && canPushToSource(bl.right(), hasIdenticalDelegate);
} else if (exp instanceof In in) {
return isAttributePushable(in.value(), null) && Expressions.foldable(in.list());
return isAttributePushable(in.value(), null, hasIdenticalDelegate) && Expressions.foldable(in.list());
} else if (exp instanceof Not not) {
return canPushToSource(not.field());
return canPushToSource(not.field(), hasIdenticalDelegate);
} else if (exp instanceof UnaryScalarFunction usf) {
if (usf instanceof RegexMatch<?> || usf instanceof IsNull || usf instanceof IsNotNull) {
return isAttributePushable(usf.field(), usf);
return isAttributePushable(usf.field(), usf, hasIdenticalDelegate);
}
}
return false;
}

private static boolean isAttributePushable(Expression expression, Expression operation) {
if (expression instanceof FieldAttribute f && f.getExactInfo().hasExact()) {
return isAggregatable(f);
private static boolean isAttributePushable(
Expression expression,
Expression operation,
Predicate<FieldAttribute> hasIdenticalDelegate
) {
if (isPushableFieldAttribute(expression, hasIdenticalDelegate)) {
return true;
}
if (expression instanceof MetadataAttribute ma && ma.searchable()) {
return operation == null
Expand Down Expand Up @@ -282,15 +292,17 @@ protected PhysicalPlan rule(LimitExec limitExec) {
}
}

private static class PushTopNToSource extends OptimizerRule<TopNExec> {
private static class PushTopNToSource extends PhysicalOptimizerRules.ParameterizedOptimizerRule<
TopNExec,
LocalPhysicalOptimizerContext> {
@Override
protected PhysicalPlan rule(TopNExec topNExec) {
protected PhysicalPlan rule(TopNExec topNExec, LocalPhysicalOptimizerContext ctx) {
PhysicalPlan plan = topNExec;
PhysicalPlan child = topNExec.child();

boolean canPushDownTopN = child instanceof EsQueryExec
|| (child instanceof ExchangeExec exchangeExec && exchangeExec.child() instanceof EsQueryExec);
if (canPushDownTopN && canPushDownOrders(topNExec.order())) {
if (canPushDownTopN && canPushDownOrders(topNExec.order(), x -> hasIdenticalDelegate(x, ctx.searchStats()))) {
var sorts = buildFieldSorts(topNExec.order());
var limit = topNExec.limit();

Expand All @@ -303,10 +315,9 @@ protected PhysicalPlan rule(TopNExec topNExec) {
return plan;
}

private boolean canPushDownOrders(List<Order> orders) {
private boolean canPushDownOrders(List<Order> orders, Predicate<FieldAttribute> hasIdenticalDelegate) {
// allow only exact FieldAttributes (no expressions) for sorting
return orders.stream()
.allMatch(o -> o.child() instanceof FieldAttribute fa && fa.getExactInfo().hasExact() && isAggregatable(fa));
return orders.stream().allMatch(o -> isPushableFieldAttribute(o.child(), hasIdenticalDelegate));
}

private List<EsQueryExec.FieldSort> buildFieldSorts(List<Order> orders) {
Expand Down Expand Up @@ -405,4 +416,15 @@ private Tuple<List<Attribute>, List<Stat>> pushableStats(AggregateExec aggregate
}
}

/**
 * Asks the search statistics whether the field behind {@code attr} has an identical delegate.
 * The lookup is done by the attribute's name.
 *
 * @param attr  field attribute whose name is looked up
 * @param stats search statistics that answer the question
 * @return whatever {@code stats.hasIdenticalDelegate} reports for the attribute name
 */
public static boolean hasIdenticalDelegate(FieldAttribute attr, SearchStats stats) {
    String fieldName = attr.name();
    return stats.hasIdenticalDelegate(fieldName);
}

/**
 * Checks whether an expression is a field attribute that can be pushed down to the source.
 * <p>
 * The expression must be a {@code FieldAttribute} that has an exact variant and is
 * aggregatable. In addition, a field of type {@code TEXT} qualifies only when
 * {@code hasIdenticalDelegate} accepts it; any other data type qualifies directly.
 *
 * @param exp                  expression to inspect
 * @param hasIdenticalDelegate predicate consulted only for {@code TEXT} fields
 * @return {@code true} if the expression is a pushable field attribute
 */
public static boolean isPushableFieldAttribute(Expression exp, Predicate<FieldAttribute> hasIdenticalDelegate) {
    if (exp instanceof FieldAttribute == false) {
        return false;
    }
    FieldAttribute field = (FieldAttribute) exp;
    if (field.getExactInfo().hasExact() == false) {
        return false;
    }
    if (isAggregatable(field) == false) {
        return false;
    }
    if (field.dataType() != DataTypes.TEXT) {
        return true;
    }
    // TEXT fields need the extra delegate check before they count as pushable.
    return hasIdenticalDelegate.test(field);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner.PhysicalOperation;
import org.elasticsearch.xpack.esql.type.EsqlDataTypes;
import org.elasticsearch.xpack.ql.expression.Attribute;
import org.elasticsearch.xpack.ql.expression.FieldAttribute;
import org.elasticsearch.xpack.ql.type.DataType;

import java.util.ArrayList;
Expand Down Expand Up @@ -73,9 +72,6 @@ public final PhysicalOperation fieldExtractPhysicalOperation(FieldExtractExec fi
List<ValuesSourceReaderOperator.FieldInfo> fields = new ArrayList<>();
int docChannel = source.layout.get(sourceAttr.id()).channel();
for (Attribute attr : fieldExtractExec.attributesToExtract()) {
if (attr instanceof FieldAttribute fa && fa.getExactInfo().hasExact()) {
attr = fa.exactAttribute();
}
layout.append(attr);
DataType dataType = attr.dataType();
ElementType elementType = PlannerUtils.toElementType(dataType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.elasticsearch.xpack.esql.type.EsqlDataTypes;
import org.elasticsearch.xpack.ql.expression.AttributeSet;
import org.elasticsearch.xpack.ql.expression.Expression;
import org.elasticsearch.xpack.ql.expression.FieldAttribute;
import org.elasticsearch.xpack.ql.expression.predicate.Predicates;
import org.elasticsearch.xpack.ql.plan.logical.EsRelation;
import org.elasticsearch.xpack.ql.plan.logical.Filter;
Expand All @@ -48,6 +49,7 @@
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

import static java.util.Arrays.asList;
import static org.elasticsearch.xpack.esql.optimizer.LocalPhysicalPlanOptimizer.PushFiltersToSource.canPushToSource;
Expand Down Expand Up @@ -149,12 +151,16 @@ public static PhysicalPlan localPlan(

/**
* Extracts the ES query provided by the filter parameter
* @param plan
* @param hasIdenticalDelegate a lambda that, given a field attribute, says whether it has
* a synthetic source delegate with the exact same value
* @return
*/
public static QueryBuilder requestFilter(PhysicalPlan plan) {
return detectFilter(plan, "@timestamp");
public static QueryBuilder requestFilter(PhysicalPlan plan, Predicate<FieldAttribute> hasIdenticalDelegate) {
return detectFilter(plan, "@timestamp", hasIdenticalDelegate);
}

static QueryBuilder detectFilter(PhysicalPlan plan, String fieldName) {
static QueryBuilder detectFilter(PhysicalPlan plan, String fieldName, Predicate<FieldAttribute> hasIdenticalDelegate) {
// first position is the REST filter, the second the query filter
var requestFilter = new QueryBuilder[] { null, null };

Expand All @@ -175,7 +181,7 @@ static QueryBuilder detectFilter(PhysicalPlan plan, String fieldName) {
boolean matchesField = refs.removeIf(e -> fieldName.equals(e.name()));
// the expression only contains the target reference
// and the expression is pushable (functions can be fully translated)
if (matchesField && refs.isEmpty() && canPushToSource(exp)) {
if (matchesField && refs.isEmpty() && canPushToSource(exp, hasIdenticalDelegate)) {
matches.add(exp);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,11 @@ private void startComputeOnDataNodes(
ActionListener<Void> parentListener,
Supplier<ActionListener<ComputeResponse>> dataNodeListenerSupplier
) {
QueryBuilder requestFilter = PlannerUtils.requestFilter(dataNodePlan);
// The lambda is to say if a TEXT field has an identical exact subfield
// We cannot use SearchContext because we don't have it yet.
// Since it's used only for @timestamp, it is relatively safe to assume it's not needed
// but it would be better to have a proper impl.
QueryBuilder requestFilter = PlannerUtils.requestFilter(dataNodePlan, x -> true);
lookupDataNodes(parentTask, clusterAlias, requestFilter, concreteIndices, originalIndices, ActionListener.wrap(dataNodes -> {
try (RefCountingRunnable refs = new RefCountingRunnable(() -> parentListener.onResponse(null))) {
// For each target node, first open a remote exchange on the remote node, then link the exchange source to
Expand Down
Loading

0 comments on commit 089435c

Please sign in to comment.