Skip to content

Commit

Permalink
ESQL: Reduce the number of Evals ReplaceMissingFieldWithNull creates (#…
Browse files Browse the repository at this point in the history
…104586) (#104669)

Improve ReplaceMissingFieldWithNull to create just one eval (per
 datatype) for the missing value and have the rest point to it. This
 reduces the amount of EvalOperators created in the pipeline.
Preserve type information (one null eval per dataType)
Fix subtle error in LocalPhysicalPlanOptimizer test

Fix #104583

(cherry picked from commit d6f900c)
  • Loading branch information
costin authored Jan 24, 2024
1 parent 1024fc9 commit 744db9e
Show file tree
Hide file tree
Showing 10 changed files with 226 additions and 23 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/104586.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 104586
summary: Reduce the number of Evals `ReplaceMissingFieldWithNull` creates
area: ES|QL
type: bug
issues:
- 104583
Original file line number Diff line number Diff line change
Expand Up @@ -143,13 +143,21 @@ public static EnrichResolution emptyPolicyResolution() {
return new EnrichResolution(Set.of(), Set.of());
}

public static SearchStats statsForExistingField(String... names) {
return fieldMatchingExistOrMissing(true, names);
}

public static SearchStats statsForMissingField(String... names) {
return fieldMatchingExistOrMissing(false, names);
}

private static SearchStats fieldMatchingExistOrMissing(boolean exists, String... names) {
return new TestSearchStats() {
private final Set<String> missingFields = Set.of(names);
private final Set<String> fields = Set.of(names);

@Override
public boolean exists(String field) {
return missingFields.contains(field) == false;
return fields.contains(field) == exists;
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -1287,7 +1288,7 @@ public void testStatsNestFields() {
}
}

public void testStatsMissingFields() {
public void testStatsMissingFieldWithStats() {
final String node1, node2;
if (randomBoolean()) {
internalCluster().ensureAtLeastNumDataNodes(2);
Expand Down Expand Up @@ -1326,6 +1327,39 @@ public void testStatsMissingFields() {
}
}

public void testStatsMissingFieldKeepApp() {
final String node1, node2;
if (randomBoolean()) {
internalCluster().ensureAtLeastNumDataNodes(2);
node1 = randomDataNode().getName();
node2 = randomValueOtherThan(node1, () -> randomDataNode().getName());
} else {
node1 = randomDataNode().getName();
node2 = randomDataNode().getName();
}
assertAcked(
client().admin()
.indices()
.prepareCreate("foo-index")
.setSettings(Settings.builder().put("index.routing.allocation.require._name", node1))
.setMapping("foo_int", "type=integer", "foo_long", "type=long", "foo_float", "type=float", "foo_double", "type=double")
);
assertAcked(
client().admin()
.indices()
.prepareCreate("bar-index")
.setSettings(Settings.builder().put("index.routing.allocation.require._name", node2))
.setMapping("bar_int", "type=integer", "bar_long", "type=long", "bar_float", "type=float", "bar_double", "type=double")
);
String command = String.format(Locale.ROOT, "from foo-index,bar-index");
try (var resp = run(command)) {
var valuesList = getValuesList(resp);
assertEquals(8, resp.columns().size());
assertEquals(0, valuesList.size());
assertEquals(Collections.emptyList(), valuesList);
}
}

public void testCountTextField() {
assertAcked(client().admin().indices().prepareCreate("test_count").setMapping("name", "type=text"));
int numDocs = between(10, 1000);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ public Object fold() {

@Override
public ExpressionEvaluator.Factory toEvaluator(Function<Expression, ExpressionEvaluator.Factory> toEvaluator) {
// force datatype initialization
var dataType = dataType();
ExpressionEvaluator.Factory[] factories = children().stream()
.map(e -> toEvaluator.apply(new MvMax(e.source(), e)))
.toArray(ExpressionEvaluator.Factory[]::new);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ public Object fold() {

@Override
public ExpressionEvaluator.Factory toEvaluator(Function<Expression, ExpressionEvaluator.Factory> toEvaluator) {
// force datatype initialization
var dataType = dataType();

ExpressionEvaluator.Factory[] factories = children().stream()
.map(e -> toEvaluator.apply(new MvMin(e.source(), e)))
.toArray(ExpressionEvaluator.Factory[]::new);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

package org.elasticsearch.xpack.esql.optimizer;

import org.elasticsearch.common.util.Maps;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.BlockUtils;
Expand All @@ -17,6 +18,7 @@
import org.elasticsearch.xpack.esql.planner.AbstractPhysicalOperationProviders;
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
import org.elasticsearch.xpack.esql.stats.SearchStats;
import org.elasticsearch.xpack.esql.type.EsqlDataTypes;
import org.elasticsearch.xpack.ql.expression.Alias;
import org.elasticsearch.xpack.ql.expression.Attribute;
import org.elasticsearch.xpack.ql.expression.FieldAttribute;
Expand All @@ -37,6 +39,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import static java.util.Arrays.asList;
import static org.elasticsearch.xpack.esql.optimizer.LogicalPlanOptimizer.cleanup;
Expand Down Expand Up @@ -115,24 +118,35 @@ private LogicalPlan missingToNull(LogicalPlan plan, SearchStats stats) {
else if (plan instanceof Project project) {
var projections = project.projections();
List<NamedExpression> newProjections = new ArrayList<>(projections.size());
List<Alias> literals = new ArrayList<>();
Map<DataType, Alias> nullLiteral = Maps.newLinkedHashMapWithExpectedSize(EsqlDataTypes.types().size());

for (NamedExpression projection : projections) {
if (projection instanceof FieldAttribute f && stats.exists(f.qualifiedName()) == false) {
var alias = new Alias(f.source(), f.name(), null, Literal.of(f, null), f.id());
literals.add(alias);
newProjections.add(alias.toAttribute());
} else {
newProjections.add(projection);
DataType dt = f.dataType();
Alias nullAlias = nullLiteral.get(f.dataType());
// save the first field as null (per datatype)
if (nullAlias == null) {
Alias alias = new Alias(f.source(), f.name(), null, Literal.of(f, null), f.id());
nullLiteral.put(dt, alias);
projection = alias.toAttribute();
}
// otherwise point to it
else {
// since avoids creating field copies
projection = new Alias(f.source(), f.name(), f.qualifier(), nullAlias.toAttribute(), f.id());
}
}

newProjections.add(projection);
}
if (literals.size() > 0) {
plan = new Eval(project.source(), project.child(), literals);
// add the first found field as null
if (nullLiteral.size() > 0) {
plan = new Eval(project.source(), project.child(), new ArrayList<>(nullLiteral.values()));
plan = new Project(project.source(), plan, newProjections);
} else {
plan = project;
}
} else {
}
// otherwise transform fields in place
else {
plan = plan.transformExpressionsOnlyUp(
FieldAttribute.class,
f -> stats.exists(f.qualifiedName()) ? f : Literal.of(f, null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,17 +405,25 @@ static class PropagateEvalFoldables extends Rule<LogicalPlan, LogicalPlan> {
@Override
public LogicalPlan apply(LogicalPlan plan) {
var collectRefs = new AttributeMap<Expression>();
// collect aliases

java.util.function.Function<ReferenceAttribute, Expression> replaceReference = r -> collectRefs.resolve(r, r);

// collect aliases bottom-up
plan.forEachExpressionUp(Alias.class, a -> {
var c = a.child();
if (c.foldable()) {
collectRefs.put(a.toAttribute(), c);
boolean shouldCollect = c.foldable();
// try to resolve the expression based on an existing foldables
if (shouldCollect == false) {
c = c.transformUp(ReferenceAttribute.class, replaceReference);
shouldCollect = c.foldable();
}
if (shouldCollect) {
collectRefs.put(a.toAttribute(), Literal.of(c));
}
});
if (collectRefs.isEmpty()) {
return plan;
}
java.util.function.Function<ReferenceAttribute, Expression> replaceReference = r -> collectRefs.resolve(r, r);

plan = plan.transformUp(p -> {
// Apply the replacement inside Filter and Eval (which shouldn't make a difference)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -642,6 +642,11 @@ public String describe() {
Stream.of(sinkOperatorFactory)
).map(Describable::describe).collect(joining("\n\\_", "\\_", ""));
}

@Override
public String toString() {
return describe();
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

package org.elasticsearch.xpack.esql.optimizer;

import org.elasticsearch.common.util.Maps;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.esql.EsqlTestUtils;
import org.elasticsearch.xpack.esql.analysis.Analyzer;
Expand All @@ -29,18 +30,24 @@
import org.elasticsearch.xpack.ql.plan.logical.Project;
import org.elasticsearch.xpack.ql.type.DataTypes;
import org.elasticsearch.xpack.ql.type.EsField;
import org.hamcrest.Matchers;
import org.junit.BeforeClass;

import java.util.List;
import java.util.Locale;
import java.util.Map;

import static java.util.Collections.emptyMap;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.L;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_SEARCH_STATS;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_VERIFIER;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.as;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.loadMapping;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.statsForExistingField;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.statsForMissingField;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.withDefaultLimitWarning;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;

Expand Down Expand Up @@ -263,6 +270,48 @@ public void testMissingFieldInFilterNoProjection() {
);
}

public void testSparseDocument() throws Exception {
var query = """
from large
| keep field00*
| limit 10
""";

int size = 256;
Map<String, EsField> large = Maps.newLinkedHashMapWithExpectedSize(size);
for (int i = 0; i < size; i++) {
var name = String.format(Locale.ROOT, "field%03d", i);
large.put(name, new EsField(name, DataTypes.INTEGER, emptyMap(), true, false));
}

SearchStats searchStats = statsForExistingField("field000", "field001", "field002", "field003", "field004");

EsIndex index = new EsIndex("large", large);
IndexResolution getIndexResult = IndexResolution.valid(index);
var logicalOptimizer = new LogicalPlanOptimizer(new LogicalOptimizerContext(EsqlTestUtils.TEST_CFG));

var analyzer = new Analyzer(
new AnalyzerContext(EsqlTestUtils.TEST_CFG, new EsqlFunctionRegistry(), getIndexResult, EsqlTestUtils.emptyPolicyResolution()),
TEST_VERIFIER
);

var analyzed = analyzer.analyze(parser.createStatement(query));
var optimized = logicalOptimizer.optimize(analyzed);
var localContext = new LocalLogicalOptimizerContext(EsqlTestUtils.TEST_CFG, searchStats);
var plan = new LocalLogicalPlanOptimizer(localContext).localOptimize(optimized);

var project = as(plan, Project.class);
assertThat(project.projections(), hasSize(10));
assertThat(
Expressions.names(project.projections()),
contains("field000", "field001", "field002", "field003", "field004", "field005", "field006", "field007", "field008", "field009")
);
var eval = as(project.child(), Eval.class);
var field = eval.fields().get(0);
assertThat(Expressions.name(field), is("field005"));
assertThat(field.child().fold(), Matchers.nullValue());
}

private LocalRelation asEmptyRelation(Object o) {
var empty = as(o, LocalRelation.class);
assertThat(empty.supplier(), is(LocalSupplier.EMPTY));
Expand All @@ -285,6 +334,10 @@ private LogicalPlan localPlan(LogicalPlan plan, SearchStats searchStats) {
return localPlan;
}

private LogicalPlan localPlan(String query) {
return localPlan(plan(query), TEST_SEARCH_STATS);
}

@Override
protected List<String> filteredWarnings() {
return withDefaultLimitWarning(super.filteredWarnings());
Expand Down
Loading

0 comments on commit 744db9e

Please sign in to comment.