Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ESQL: Reduce the number of Evals ReplaceMissingFieldWithNull creates #104586

Merged
merged 7 commits into from
Jan 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/104586.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 104586
summary: Reduce the number of Evals `ReplaceMissingFieldWithNull` creates
area: ES|QL
type: bug
issues:
- 104583
Original file line number Diff line number Diff line change
Expand Up @@ -148,13 +148,21 @@ public static EnrichResolution emptyPolicyResolution() {
return new EnrichResolution();
}

/**
 * Creates a {@link SearchStats} test stub for the given field names by delegating to
 * {@code fieldMatchingExistOrMissing} with {@code exists = true}, i.e. the listed
 * names are the ones meant to be treated as existing.
 *
 * @param names field names handed to the stub
 * @return the stub produced by {@code fieldMatchingExistOrMissing}
 */
public static SearchStats statsForExistingField(String... names) {
    final boolean listedFieldsExist = true;
    return fieldMatchingExistOrMissing(listedFieldsExist, names);
}

/**
 * Creates a {@link SearchStats} test stub for the given field names by delegating to
 * {@code fieldMatchingExistOrMissing} with {@code exists = false}, i.e. the listed
 * names are the ones meant to be treated as missing.
 *
 * @param names field names handed to the stub
 * @return the stub produced by {@code fieldMatchingExistOrMissing}
 */
public static SearchStats statsForMissingField(String... names) {
    final boolean listedFieldsExist = false;
    return fieldMatchingExistOrMissing(listedFieldsExist, names);
}

/**
 * Builds a {@link SearchStats} test stub whose {@code exists(field)} answer depends on
 * whether the field is one of {@code names}.
 * <p>
 * A field listed in {@code names} answers {@code exists}; any other field answers the
 * opposite. With {@code exists = true} only the listed fields exist, and with
 * {@code exists = false} only the listed fields are missing.
 *
 * @param exists the answer returned for fields contained in {@code names}
 * @param names  the field names to match; {@link Set#of(Object[])} rejects {@code null}
 *               and duplicate elements
 * @return a {@code TestSearchStats} with {@code exists(String)} overridden as described
 */
private static SearchStats fieldMatchingExistOrMissing(boolean exists, String... names) {
    return new TestSearchStats() {
        private final Set<String> fields = Set.of(names);

        @Override
        public boolean exists(String field) {
            // membership in the set yields `exists`; non-membership yields its negation
            return fields.contains(field) == exists;
        }
    };
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -1314,7 +1315,7 @@ public void testStatsNestFields() {
}
}

public void testStatsMissingFields() {
public void testStatsMissingFieldWithStats() {
final String node1, node2;
if (randomBoolean()) {
internalCluster().ensureAtLeastNumDataNodes(2);
Expand Down Expand Up @@ -1353,6 +1354,39 @@ public void testStatsMissingFields() {
}
}

/**
 * Creates two empty indices with disjoint mappings (four {@code foo_*} and four
 * {@code bar_*} fields), each pinned to a data node through
 * {@code index.routing.allocation.require._name}, then runs {@code from foo-index,bar-index}.
 * The response is expected to expose all eight mapped columns and contain no rows.
 */
public void testStatsMissingFieldKeepApp() {
    final String node1, node2;
    if (randomBoolean()) {
        // make sure two distinct data nodes are available and place each index on its own node
        internalCluster().ensureAtLeastNumDataNodes(2);
        node1 = randomDataNode().getName();
        node2 = randomValueOtherThan(node1, () -> randomDataNode().getName());
    } else {
        // no distinctness requirement here: both indices may end up on the same node
        node1 = randomDataNode().getName();
        node2 = randomDataNode().getName();
    }
    assertAcked(
        client().admin()
            .indices()
            .prepareCreate("foo-index")
            .setSettings(Settings.builder().put("index.routing.allocation.require._name", node1))
            .setMapping("foo_int", "type=integer", "foo_long", "type=long", "foo_float", "type=float", "foo_double", "type=double")
    );
    assertAcked(
        client().admin()
            .indices()
            .prepareCreate("bar-index")
            .setSettings(Settings.builder().put("index.routing.allocation.require._name", node2))
            .setMapping("bar_int", "type=integer", "bar_long", "type=long", "bar_float", "type=float", "bar_double", "type=double")
    );
    // the query has no format arguments, so a plain literal is used instead of String.format
    String command = "from foo-index,bar-index";
    try (var resp = run(command)) {
        var valuesList = getValuesList(resp);
        // four mapped fields per index, eight columns overall
        assertEquals(8, resp.columns().size());
        // nothing was indexed, so no rows come back
        assertEquals(0, valuesList.size());
        assertEquals(Collections.emptyList(), valuesList);
    }
}

public void testCountTextField() {
assertAcked(client().admin().indices().prepareCreate("test_count").setMapping("name", "type=text"));
int numDocs = between(10, 1000);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ public Object fold() {

@Override
public ExpressionEvaluator.Factory toEvaluator(Function<Expression, ExpressionEvaluator.Factory> toEvaluator) {
// force datatype initialization
var dataType = dataType();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wondering why this wasn't needed so far.
Nit: maybe just calling the function ignoring the return and not shadowing the instance variable might be cleaner.

Copy link
Member Author

@costin costin Jan 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was a bug - the node was mutated but the new instance did not have its dataType() called.
For example, the node was serialized and deserialized, then the execution kicked in - since nobody had called dataType(), the value was not initialized, throwing an NPE - see the CI failures.

ExpressionEvaluator.Factory[] factories = children().stream()
.map(e -> toEvaluator.apply(new MvMax(e.source(), e)))
.toArray(ExpressionEvaluator.Factory[]::new);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ public Object fold() {

@Override
public ExpressionEvaluator.Factory toEvaluator(Function<Expression, ExpressionEvaluator.Factory> toEvaluator) {
// force datatype initialization
var dataType = dataType();

ExpressionEvaluator.Factory[] factories = children().stream()
.map(e -> toEvaluator.apply(new MvMin(e.source(), e)))
.toArray(ExpressionEvaluator.Factory[]::new);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

package org.elasticsearch.xpack.esql.optimizer;

import org.elasticsearch.common.util.Maps;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
Expand All @@ -19,6 +20,7 @@
import org.elasticsearch.xpack.esql.planner.AbstractPhysicalOperationProviders;
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
import org.elasticsearch.xpack.esql.stats.SearchStats;
import org.elasticsearch.xpack.esql.type.EsqlDataTypes;
import org.elasticsearch.xpack.ql.expression.Alias;
import org.elasticsearch.xpack.ql.expression.Attribute;
import org.elasticsearch.xpack.ql.expression.Expression;
Expand All @@ -43,6 +45,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static java.util.Arrays.asList;
Expand Down Expand Up @@ -129,24 +132,35 @@ private LogicalPlan missingToNull(LogicalPlan plan, SearchStats stats) {
else if (plan instanceof Project project) {
var projections = project.projections();
List<NamedExpression> newProjections = new ArrayList<>(projections.size());
List<Alias> literals = new ArrayList<>();
Map<DataType, Alias> nullLiteral = Maps.newLinkedHashMapWithExpectedSize(EsqlDataTypes.types().size());

for (NamedExpression projection : projections) {
if (projection instanceof FieldAttribute f && stats.exists(f.qualifiedName()) == false) {
var alias = new Alias(f.source(), f.name(), null, Literal.of(f, null), f.id());
literals.add(alias);
newProjections.add(alias.toAttribute());
} else {
newProjections.add(projection);
DataType dt = f.dataType();
Alias nullAlias = nullLiteral.get(f.dataType());
// save the first field as null (per datatype)
if (nullAlias == null) {
Alias alias = new Alias(f.source(), f.name(), null, Literal.of(f, null), f.id());
nullLiteral.put(dt, alias);
projection = alias.toAttribute();
}
// otherwise point to it
else {
// alias the saved null attribute, since this avoids creating a separate null literal copy per field
projection = new Alias(f.source(), f.name(), f.qualifier(), nullAlias.toAttribute(), f.id());
}
}

newProjections.add(projection);
}
if (literals.size() > 0) {
plan = new Eval(project.source(), project.child(), literals);
// add the first found field as null
if (nullLiteral.size() > 0) {
plan = new Eval(project.source(), project.child(), new ArrayList<>(nullLiteral.values()));
plan = new Project(project.source(), plan, newProjections);
} else {
plan = project;
}
} else {
}
// otherwise transform fields in place
else {
plan = plan.transformExpressionsOnlyUp(
FieldAttribute.class,
f -> stats.exists(f.qualifiedName()) ? f : Literal.of(f, null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -406,17 +406,25 @@ static class PropagateEvalFoldables extends Rule<LogicalPlan, LogicalPlan> {
@Override
public LogicalPlan apply(LogicalPlan plan) {
var collectRefs = new AttributeMap<Expression>();
// collect aliases

java.util.function.Function<ReferenceAttribute, Expression> replaceReference = r -> collectRefs.resolve(r, r);

// collect aliases bottom-up
plan.forEachExpressionUp(Alias.class, a -> {
var c = a.child();
if (c.foldable()) {
collectRefs.put(a.toAttribute(), c);
boolean shouldCollect = c.foldable();
// try to resolve the expression based on the foldables collected so far
if (shouldCollect == false) {
c = c.transformUp(ReferenceAttribute.class, replaceReference);
shouldCollect = c.foldable();
}
if (shouldCollect) {
collectRefs.put(a.toAttribute(), Literal.of(c));
Comment on lines +409 to +422
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did we need to make changes to PropagateEvalFoldables?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same ^^. Is it to accelerate the resolution? Wouldn't the first c.foldable() eventually return true?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Forgot to mention this - while debugging I noticed this rule was doing only one transform per run, causing the optimizer to run multiple times. So I've modified it so that it can do the replacement in one go instead of needing multiple invocations.
An example is this:

eval x = 1
eval y = x
eval z = x + 3
eval w = z + y

Previously the rule had to run 4 times; now it does everything in the first run.

}
});
if (collectRefs.isEmpty()) {
return plan;
}
java.util.function.Function<ReferenceAttribute, Expression> replaceReference = r -> collectRefs.resolve(r, r);

plan = plan.transformUp(p -> {
// Apply the replacement inside Filter and Eval (which shouldn't make a difference)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,11 @@ public String describe() {
Stream.of(sinkOperatorFactory)
).map(Describable::describe).collect(joining("\n\\_", "\\_", ""));
}

/**
 * Returns the same text as {@link #describe()}.
 */
@Override
public String toString() {
    final String description = describe();
    return description;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

package org.elasticsearch.xpack.esql.optimizer;

import org.elasticsearch.common.util.Maps;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.esql.EsqlTestUtils;
import org.elasticsearch.xpack.esql.analysis.Analyzer;
Expand All @@ -32,19 +33,24 @@
import org.elasticsearch.xpack.ql.plan.logical.Project;
import org.elasticsearch.xpack.ql.type.DataTypes;
import org.elasticsearch.xpack.ql.type.EsField;
import org.hamcrest.Matchers;
import org.junit.BeforeClass;

import java.util.List;
import java.util.Locale;
import java.util.Map;

import static java.util.Collections.emptyMap;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.L;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_SEARCH_STATS;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_VERIFIER;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.as;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.loadMapping;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.statsForExistingField;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.statsForMissingField;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.withDefaultLimitWarning;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;

Expand Down Expand Up @@ -299,6 +305,48 @@ public void testIsNotNullOnExpression() {
var source = as(filter.child(), EsRelation.class);
}

public void testSparseDocument() throws Exception {
var query = """
from large
| keep field00*
| limit 10
""";

int size = 256;
Map<String, EsField> large = Maps.newLinkedHashMapWithExpectedSize(size);
for (int i = 0; i < size; i++) {
var name = String.format(Locale.ROOT, "field%03d", i);
large.put(name, new EsField(name, DataTypes.INTEGER, emptyMap(), true, false));
}

SearchStats searchStats = statsForExistingField("field000", "field001", "field002", "field003", "field004");

EsIndex index = new EsIndex("large", large);
IndexResolution getIndexResult = IndexResolution.valid(index);
var logicalOptimizer = new LogicalPlanOptimizer(new LogicalOptimizerContext(EsqlTestUtils.TEST_CFG));

var analyzer = new Analyzer(
new AnalyzerContext(EsqlTestUtils.TEST_CFG, new EsqlFunctionRegistry(), getIndexResult, EsqlTestUtils.emptyPolicyResolution()),
TEST_VERIFIER
);

var analyzed = analyzer.analyze(parser.createStatement(query));
var optimized = logicalOptimizer.optimize(analyzed);
var localContext = new LocalLogicalOptimizerContext(EsqlTestUtils.TEST_CFG, searchStats);
var plan = new LocalLogicalPlanOptimizer(localContext).localOptimize(optimized);

var project = as(plan, Project.class);
assertThat(project.projections(), hasSize(10));
assertThat(
Expressions.names(project.projections()),
contains("field000", "field001", "field002", "field003", "field004", "field005", "field006", "field007", "field008", "field009")
);
var eval = as(project.child(), Eval.class);
var field = eval.fields().get(0);
assertThat(Expressions.name(field), is("field005"));
alex-spies marked this conversation as resolved.
Show resolved Hide resolved
assertThat(Alias.unwrap(field).fold(), Matchers.nullValue());
}

private LocalRelation asEmptyRelation(Object o) {
var empty = as(o, LocalRelation.class);
assertThat(empty.supplier(), is(LocalSupplier.EMPTY));
Expand Down
Loading