Merge branch '8.x' into lucene_9_12_1
ChrisHegarty authored Dec 12, 2024
2 parents 954d39a + c341d73 commit fcc563a
Showing 19 changed files with 223 additions and 80 deletions.
3 changes: 0 additions & 3 deletions branches.json
@@ -13,9 +13,6 @@
{
"branch": "8.x"
},
{
"branch": "8.15"
},
{
"branch": "7.17"
}
5 changes: 5 additions & 0 deletions docs/changelog/118114.yaml
@@ -0,0 +1,5 @@
pr: 118114
summary: Enable physical plan verification
area: ES|QL
type: enhancement
issues: []
27 changes: 27 additions & 0 deletions docs/reference/mapping/params/index-prefixes.asciidoc
@@ -54,3 +54,30 @@ PUT my-index-000001
}
}
--------------------------------

The `index_prefixes` parameter instructs {ES} to create a `._index_prefix` subfield, which
is used to run fast prefix queries. When highlighting, add the `._index_prefix` subfield
to the `matched_fields` parameter so that the main field is highlighted based on matches
against the prefix field, as in the request below:

[source,console]
--------------------------------
GET my-index-000001/_search
{
"query": {
"prefix": {
"full_name": {
"value": "ki"
}
}
},
"highlight": {
"fields": {
"full_name": {
"matched_fields": ["full_name._index_prefix"]
}
}
}
}
--------------------------------
// TEST[continued]
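
For context, the request above assumes a mapping (truncated above this hunk) in which
`full_name` has `index_prefixes` enabled. A minimal sketch of such a mapping; the
`min_chars`/`max_chars` values are illustrative, not taken from this diff:

[source,console]
--------------------------------
PUT my-index-000001
{
  "mappings": {
    "properties": {
      "full_name": {
        "type": "text",
        "index_prefixes": {
          "min_chars": 1,
          "max_chars": 10
        }
      }
    }
  }
}
--------------------------------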
15 changes: 15 additions & 0 deletions docs/reference/mapping/types/search-as-you-type.asciidoc
@@ -97,11 +97,21 @@ GET my-index-000001/_search
"my_field._3gram"
]
}
},
"highlight": {
"fields": {
"my_field": {
"matched_fields": ["my_field._index_prefix"] <1>
}
}
}
}
--------------------------------------------------
// TEST[continued]

<1> Adding "my_field._index_prefix" to `matched_fields` allows "my_field" to also be
highlighted based on matches from the "my_field._index_prefix" field.

[source,console-result]
--------------------------------------------------
{
@@ -126,6 +136,11 @@ GET my-index-000001/_search
"_score" : 0.8630463,
"_source" : {
"my_field" : "quick brown fox jump lazy dog"
},
"highlight": {
"my_field": [
"quick <em>brown fox jump lazy</em> dog"
]
}
}
]
@@ -171,6 +171,10 @@ public void reset() throws IOException {
if (markedSlice < 0 || markedSliceOffset < 0) {
throw new IOException("Mark has not been set");
}
if (initialized && nextSlice == markedSlice + 1 && currentSliceOffset == markedSliceOffset) {
// Reset at the marked offset should return immediately without re-opening the slice
return;
}

nextSlice = markedSlice;
initialized = true;
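
For reference, the mark/reset contract this fast path preserves is the standard
`InputStream` one: a `reset()` with no intervening reads must restore the marked position
without extra work. A minimal, self-contained sketch (plain JDK, not ES code):

[source,java]
----
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

public class MarkResetDemo {
    public static void main(String[] args) throws IOException {
        InputStream in = new ByteArrayInputStream(new byte[] { 1, 2, 3 });
        in.read();   // consume one byte, position is now 1
        in.mark(16); // remember position 1
        in.reset();  // nothing read since mark(): positionally a no-op
        System.out.println(in.read()); // prints 2 -- still at the marked position
    }
}
----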
@@ -155,9 +155,10 @@ protected InputStream openSlice(int slice) throws IOException {

// Mark
input.mark(randomNonNegativeInt());
int slicesOpenedAtMark = streamsOpened.size();

// Read or skip up to another random point
final int moreBytes = randomIntBetween(0, bytes.length - mark);
int moreBytes = randomIntBetween(0, bytes.length - mark);
if (moreBytes > 0) {
if (randomBoolean()) {
final var moreBytesRead = new byte[moreBytes];
@@ -171,11 +172,13 @@ protected InputStream openSlice(int slice) throws IOException {

// Randomly read to EOF
if (randomBoolean()) {
input.readAllBytes();
moreBytes += input.readAllBytes().length;
}

// Reset
input.reset();
int slicesOpenedAfterReset = streamsOpened.size();
assert moreBytes > 0 || mark == 0 || slicesOpenedAfterReset == slicesOpenedAtMark : "Reset at mark should not re-open slices";

// Read all remaining bytes, which should be the bytes from mark up to the end
final int remainingBytes = bytes.length - mark;
@@ -49,8 +49,9 @@ public Attribute(Source source, String name, Nullability nullability, @Nullable
this.nullability = nullability;
}

public static String rawTemporaryName(String inner, String outer, String suffix) {
return SYNTHETIC_ATTRIBUTE_NAME_PREFIX + inner + "$" + outer + "$" + suffix;
public static String rawTemporaryName(String... parts) {
var name = String.join("$", parts);
return name.isEmpty() || name.startsWith(SYNTHETIC_ATTRIBUTE_NAME_PREFIX) ? name : SYNTHETIC_ATTRIBUTE_NAME_PREFIX + name;
}

@Override
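
A standalone sketch of the new varargs behavior; the `"$$"` prefix value here is an
assumption for illustration, not read from this diff:

[source,java]
----
public class RawTemporaryNameSketch {
    // Assumed prefix value for illustration.
    static final String SYNTHETIC_ATTRIBUTE_NAME_PREFIX = "$$";

    static String rawTemporaryName(String... parts) {
        var name = String.join("$", parts);
        // Empty input stays empty, and already-prefixed names are not double-prefixed.
        return name.isEmpty() || name.startsWith(SYNTHETIC_ATTRIBUTE_NAME_PREFIX) ? name : SYNTHETIC_ATTRIBUTE_NAME_PREFIX + name;
    }

    public static void main(String[] args) {
        System.out.println(rawTemporaryName("inner", "outer", "suffix")); // $$inner$outer$suffix
        System.out.println(rawTemporaryName());                           // prints an empty line
    }
}
----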
@@ -57,7 +57,7 @@ protected List<Batch<PhysicalPlan>> batches() {
}

protected List<Batch<PhysicalPlan>> rules(boolean optimizeForEsSource) {
List<Rule<?, PhysicalPlan>> esSourceRules = new ArrayList<>(4);
List<Rule<?, PhysicalPlan>> esSourceRules = new ArrayList<>(6);
esSourceRules.add(new ReplaceSourceAttributes());

if (optimizeForEsSource) {
@@ -8,9 +8,13 @@
package org.elasticsearch.xpack.esql.optimizer;

import org.elasticsearch.xpack.esql.common.Failure;
import org.elasticsearch.xpack.esql.common.Failures;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.core.expression.Expressions;
import org.elasticsearch.xpack.esql.optimizer.rules.PlanConsistencyChecker;
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
import org.elasticsearch.xpack.esql.plan.physical.EnrichExec;
import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;

@@ -31,10 +35,20 @@ private PhysicalVerifier() {}
/** Verifies the physical plan. */
public Collection<Failure> verify(PhysicalPlan plan) {
Set<Failure> failures = new LinkedHashSet<>();
Failures depFailures = new Failures();

// AwaitsFix https://github.com/elastic/elasticsearch/issues/118531
var enriches = plan.collectFirstChildren(EnrichExec.class::isInstance);
if (enriches.isEmpty() == false && ((EnrichExec) enriches.get(0)).mode() == Enrich.Mode.REMOTE) {
return failures;
}

plan.forEachDown(p -> {
// FIXME: re-enable
// DEPENDENCY_CHECK.checkPlan(p, failures);
if (p instanceof AggregateExec agg) {
var exclude = Expressions.references(agg.ordinalAttributes());
DEPENDENCY_CHECK.checkPlan(p, exclude, depFailures);
return;
}
if (p instanceof FieldExtractExec fieldExtractExec) {
Attribute sourceAttribute = fieldExtractExec.sourceAttribute();
if (sourceAttribute == null) {
Expand All @@ -48,8 +62,13 @@ public Collection<Failure> verify(PhysicalPlan plan) {
);
}
}
DEPENDENCY_CHECK.checkPlan(p, depFailures);
});

if (depFailures.hasFailures()) {
throw new IllegalStateException(depFailures.toString());
}

return failures;
}
}
@@ -26,9 +26,13 @@ public class PlanConsistencyChecker<P extends QueryPlan<P>> {
* {@link org.elasticsearch.xpack.esql.common.Failure Failure}s to the {@link Failures} object.
*/
public void checkPlan(P p, Failures failures) {
checkPlan(p, AttributeSet.EMPTY, failures);
}

public void checkPlan(P p, AttributeSet exclude, Failures failures) {
AttributeSet refs = p.references();
AttributeSet input = p.inputSet();
AttributeSet missing = refs.subtract(input);
AttributeSet missing = refs.subtract(input).subtract(exclude);
// TODO: for Joins, we should probably check if the required fields from the left child are actually in the left child, not
// just any child (and analogously for the right child).
if (missing.isEmpty() == false) {
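
In effect, `checkPlan` now computes `missing = references − input − exclude`, so callers
like the physical verifier can exempt attributes that are legitimately absent, e.g.
groupings loaded from ordinals. A simplified, self-contained model with attributes as
plain strings (the real code uses `AttributeSet`):

[source,java]
----
import java.util.HashSet;
import java.util.Set;

public class ConsistencyCheckSketch {
    static Set<String> missing(Set<String> refs, Set<String> input, Set<String> exclude) {
        Set<String> missing = new HashSet<>(refs);
        missing.removeAll(input);   // referenced but not provided by the children
        missing.removeAll(exclude); // explicitly exempted by the caller
        return missing;
    }

    public static void main(String[] args) {
        var refs = Set.of("emp_no", "gender");
        var input = Set.of("emp_no");
        var exclude = Set.of("gender"); // e.g. a grouping read from ordinals
        System.out.println(missing(refs, input, exclude)); // [] -> no failure recorded
    }
}
----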
@@ -11,7 +11,6 @@
import org.elasticsearch.xpack.esql.core.expression.Expressions;
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
import org.elasticsearch.xpack.esql.core.expression.MetadataAttribute;
import org.elasticsearch.xpack.esql.expression.function.grouping.Categorize;
import org.elasticsearch.xpack.esql.optimizer.rules.physical.ProjectAwayColumns;
import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec;
@@ -22,7 +21,6 @@

import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;

@@ -54,18 +52,9 @@ public PhysicalPlan apply(PhysicalPlan plan) {
* it loads the field lazily. If we have more than one field we need to
* make sure the fields are loaded for the standard hash aggregator.
*/
if (p instanceof AggregateExec agg && agg.groupings().size() == 1) {
// CATEGORIZE requires the standard hash aggregator as well.
if (agg.groupings().get(0).anyMatch(e -> e instanceof Categorize) == false) {
var leaves = new LinkedList<>();
// TODO: this seems out of place
agg.aggregates()
.stream()
.filter(a -> agg.groupings().contains(a) == false)
.forEach(a -> leaves.addAll(a.collectLeaves()));
var remove = agg.groupings().stream().filter(g -> leaves.contains(g) == false).toList();
missing.removeAll(Expressions.references(remove));
}
if (p instanceof AggregateExec agg) {
var ordinalAttributes = agg.ordinalAttributes();
missing.removeAll(Expressions.references(ordinalAttributes));
}

// add extractor
@@ -18,10 +18,13 @@
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
import org.elasticsearch.xpack.esql.core.tree.Source;
import org.elasticsearch.xpack.esql.expression.function.grouping.Categorize;
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;

@@ -184,6 +187,24 @@ protected AttributeSet computeReferences() {
return mode.isInputPartial() ? new AttributeSet(intermediateAttributes) : Aggregate.computeReferences(aggregates, groupings);
}

/** Returns the attributes that can be loaded from ordinals -- no explicit extraction is needed */
public List<Attribute> ordinalAttributes() {
List<Attribute> ordinalAttributes = new ArrayList<>(groupings.size());
// Ordinals can be leveraged just for a single grouping. If there are multiple groupings, fields need to be loaded for the
// hash aggregator.
// CATEGORIZE requires the standard hash aggregator as well.
if (groupings().size() == 1 && groupings.get(0).anyMatch(e -> e instanceof Categorize) == false) {
var leaves = new HashSet<>();
aggregates.stream().filter(a -> groupings.contains(a) == false).forEach(a -> leaves.addAll(a.collectLeaves()));
groupings.forEach(g -> {
if (leaves.contains(g) == false) {
ordinalAttributes.add((Attribute) g);
}
});
}
return ordinalAttributes;
}

@Override
public int hashCode() {
return Objects.hash(groupings, aggregates, mode, intermediateAttributes, estimatedRowSize, child());
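
A simplified model of the eligibility rule that `ordinalAttributes()` encodes
(hypothetical field names, not from this commit):

[source,java]
----
import java.util.List;
import java.util.Set;

public class OrdinalEligibilitySketch {
    // A grouping key can be read straight from doc-value ordinals only when it is the sole
    // grouping, is not CATEGORIZE, and does not also feed any aggregate function.
    static boolean loadableFromOrdinals(List<String> groupings, boolean isCategorize, Set<String> aggregateInputs) {
        return groupings.size() == 1 && isCategorize == false && aggregateInputs.contains(groupings.get(0)) == false;
    }

    public static void main(String[] args) {
        // STATS count(*) BY gender -- gender never feeds an aggregate, ordinals apply
        System.out.println(loadableFromOrdinals(List.of("gender"), false, Set.of()));
        // STATS max(salary) BY gender, country -- multiple groupings force the hash aggregator
        System.out.println(loadableFromOrdinals(List.of("gender", "country"), false, Set.of("salary")));
    }
}
----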
@@ -11,6 +11,7 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
import org.elasticsearch.xpack.esql.core.tree.Source;
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
@@ -72,6 +73,12 @@ public boolean inBetweenAggs() {
return inBetweenAggs;
}

@Override
protected AttributeSet computeReferences() {
// ExchangeExec does no input referencing; it only outputs all synthetic attributes, "sourced" from remote exchanges.
return AttributeSet.EMPTY;
}

@Override
public UnaryExec replaceChild(PhysicalPlan newChild) {
return new ExchangeExec(source(), output, inBetweenAggs, newChild);
@@ -89,12 +89,7 @@ public static Attribute extractSourceAttributesFrom(PhysicalPlan plan) {

@Override
protected AttributeSet computeReferences() {
AttributeSet required = new AttributeSet(docValuesAttributes);

required.add(sourceAttribute);
required.addAll(attributesToExtract);

return required;
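// Only the source attribute is a true input from the child; the extracted attributes are produced by this node rather than referenced.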
return sourceAttribute != null ? new AttributeSet(sourceAttribute) : AttributeSet.EMPTY;
}

@Override
@@ -93,9 +93,9 @@ public List<Attribute> output() {
public List<Attribute> output() {
if (lazyOutput == null) {
lazyOutput = new ArrayList<>(left().output());
for (Attribute attr : addedFields) {
lazyOutput.add(attr);
}
var addedFieldsNames = addedFields.stream().map(Attribute::name).toList();
lazyOutput.removeIf(a -> addedFieldsNames.contains(a.name()));
lazyOutput.addAll(addedFields);
}
return lazyOutput;
}
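
The new `output()` logic de-duplicates by name before appending the join's added fields,
so the joined version of a column wins. A standalone sketch with plain strings
(hypothetical column names):

[source,java]
----
import java.util.ArrayList;
import java.util.List;

public class JoinOutputSketch {
    static List<String> output(List<String> leftOutput, List<String> addedFields) {
        var output = new ArrayList<>(leftOutput);
        // Drop shadowed names first so each name appears exactly once.
        output.removeIf(addedFields::contains);
        output.addAll(addedFields);
        return output;
    }

    public static void main(String[] args) {
        System.out.println(output(List.of("emp_no", "language"), List.of("language", "language_name")));
        // [emp_no, language, language_name] -- the left-hand "language" is replaced
    }
}
----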
@@ -297,9 +297,9 @@ private void aggregatesToFactory(
// coordinator/exchange phase
else if (mode == AggregatorMode.FINAL || mode == AggregatorMode.INTERMEDIATE) {
if (grouping) {
sourceAttr = aggregateMapper.mapGrouping(aggregateFunction);
sourceAttr = aggregateMapper.mapGrouping(ne);
} else {
sourceAttr = aggregateMapper.mapNonGrouping(aggregateFunction);
sourceAttr = aggregateMapper.mapNonGrouping(ne);
}
} else {
throw new EsqlIllegalArgumentException("illegal aggregation mode");