Skip to content

Commit

Permalink
ESQL: Reduce the number of Evals ReplaceMissingFieldWithNull creates (e…
Browse files Browse the repository at this point in the history
…lastic#104586)

Improve ReplaceMissingFieldWithNull to create just one eval (per
 datatype) for the missing value and have the rest point to it. This
 reduces the amount of EvalOperators created in the pipeline.
Preserve type information (one null eval per dataType)
Fix subtle error in LocalPhysicalPlanOptimizer test

Fix elastic#104583
  • Loading branch information
costin authored Jan 24, 2024
1 parent b2977d5 commit d6f900c
Show file tree
Hide file tree
Showing 11 changed files with 219 additions and 24 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/104586.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 104586
summary: Reduce the number of Evals `ReplaceMissingFieldWithNull` creates
area: ES|QL
type: bug
issues:
- 104583
Original file line number Diff line number Diff line change
Expand Up @@ -148,13 +148,21 @@ public static EnrichResolution emptyPolicyResolution() {
return new EnrichResolution();
}

public static SearchStats statsForExistingField(String... names) {
return fieldMatchingExistOrMissing(true, names);
}

public static SearchStats statsForMissingField(String... names) {
return fieldMatchingExistOrMissing(false, names);
}

private static SearchStats fieldMatchingExistOrMissing(boolean exists, String... names) {
return new TestSearchStats() {
private final Set<String> missingFields = Set.of(names);
private final Set<String> fields = Set.of(names);

@Override
public boolean exists(String field) {
return missingFields.contains(field) == false;
return fields.contains(field) == exists;
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -1314,7 +1315,7 @@ public void testStatsNestFields() {
}
}

public void testStatsMissingFields() {
public void testStatsMissingFieldWithStats() {
final String node1, node2;
if (randomBoolean()) {
internalCluster().ensureAtLeastNumDataNodes(2);
Expand Down Expand Up @@ -1353,6 +1354,39 @@ public void testStatsMissingFields() {
}
}

public void testStatsMissingFieldKeepApp() {
final String node1, node2;
if (randomBoolean()) {
internalCluster().ensureAtLeastNumDataNodes(2);
node1 = randomDataNode().getName();
node2 = randomValueOtherThan(node1, () -> randomDataNode().getName());
} else {
node1 = randomDataNode().getName();
node2 = randomDataNode().getName();
}
assertAcked(
client().admin()
.indices()
.prepareCreate("foo-index")
.setSettings(Settings.builder().put("index.routing.allocation.require._name", node1))
.setMapping("foo_int", "type=integer", "foo_long", "type=long", "foo_float", "type=float", "foo_double", "type=double")
);
assertAcked(
client().admin()
.indices()
.prepareCreate("bar-index")
.setSettings(Settings.builder().put("index.routing.allocation.require._name", node2))
.setMapping("bar_int", "type=integer", "bar_long", "type=long", "bar_float", "type=float", "bar_double", "type=double")
);
String command = String.format(Locale.ROOT, "from foo-index,bar-index");
try (var resp = run(command)) {
var valuesList = getValuesList(resp);
assertEquals(8, resp.columns().size());
assertEquals(0, valuesList.size());
assertEquals(Collections.emptyList(), valuesList);
}
}

public void testCountTextField() {
assertAcked(client().admin().indices().prepareCreate("test_count").setMapping("name", "type=text"));
int numDocs = between(10, 1000);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ public Object fold() {

@Override
public ExpressionEvaluator.Factory toEvaluator(Function<Expression, ExpressionEvaluator.Factory> toEvaluator) {
// force datatype initialization
var dataType = dataType();
ExpressionEvaluator.Factory[] factories = children().stream()
.map(e -> toEvaluator.apply(new MvMax(e.source(), e)))
.toArray(ExpressionEvaluator.Factory[]::new);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ public Object fold() {

@Override
public ExpressionEvaluator.Factory toEvaluator(Function<Expression, ExpressionEvaluator.Factory> toEvaluator) {
// force datatype initialization
var dataType = dataType();

ExpressionEvaluator.Factory[] factories = children().stream()
.map(e -> toEvaluator.apply(new MvMin(e.source(), e)))
.toArray(ExpressionEvaluator.Factory[]::new);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

package org.elasticsearch.xpack.esql.optimizer;

import org.elasticsearch.common.util.Maps;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
Expand All @@ -19,6 +20,7 @@
import org.elasticsearch.xpack.esql.planner.AbstractPhysicalOperationProviders;
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
import org.elasticsearch.xpack.esql.stats.SearchStats;
import org.elasticsearch.xpack.esql.type.EsqlDataTypes;
import org.elasticsearch.xpack.ql.expression.Alias;
import org.elasticsearch.xpack.ql.expression.Attribute;
import org.elasticsearch.xpack.ql.expression.Expression;
Expand All @@ -43,6 +45,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static java.util.Arrays.asList;
Expand Down Expand Up @@ -129,24 +132,35 @@ private LogicalPlan missingToNull(LogicalPlan plan, SearchStats stats) {
else if (plan instanceof Project project) {
var projections = project.projections();
List<NamedExpression> newProjections = new ArrayList<>(projections.size());
List<Alias> literals = new ArrayList<>();
Map<DataType, Alias> nullLiteral = Maps.newLinkedHashMapWithExpectedSize(EsqlDataTypes.types().size());

for (NamedExpression projection : projections) {
if (projection instanceof FieldAttribute f && stats.exists(f.qualifiedName()) == false) {
var alias = new Alias(f.source(), f.name(), null, Literal.of(f, null), f.id());
literals.add(alias);
newProjections.add(alias.toAttribute());
} else {
newProjections.add(projection);
DataType dt = f.dataType();
Alias nullAlias = nullLiteral.get(f.dataType());
// save the first field as null (per datatype)
if (nullAlias == null) {
Alias alias = new Alias(f.source(), f.name(), null, Literal.of(f, null), f.id());
nullLiteral.put(dt, alias);
projection = alias.toAttribute();
}
// otherwise point to it
else {
// since avoids creating field copies
projection = new Alias(f.source(), f.name(), f.qualifier(), nullAlias.toAttribute(), f.id());
}
}

newProjections.add(projection);
}
if (literals.size() > 0) {
plan = new Eval(project.source(), project.child(), literals);
// add the first found field as null
if (nullLiteral.size() > 0) {
plan = new Eval(project.source(), project.child(), new ArrayList<>(nullLiteral.values()));
plan = new Project(project.source(), plan, newProjections);
} else {
plan = project;
}
} else {
}
// otherwise transform fields in place
else {
plan = plan.transformExpressionsOnlyUp(
FieldAttribute.class,
f -> stats.exists(f.qualifiedName()) ? f : Literal.of(f, null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -406,17 +406,25 @@ static class PropagateEvalFoldables extends Rule<LogicalPlan, LogicalPlan> {
@Override
public LogicalPlan apply(LogicalPlan plan) {
var collectRefs = new AttributeMap<Expression>();
// collect aliases

java.util.function.Function<ReferenceAttribute, Expression> replaceReference = r -> collectRefs.resolve(r, r);

// collect aliases bottom-up
plan.forEachExpressionUp(Alias.class, a -> {
var c = a.child();
if (c.foldable()) {
collectRefs.put(a.toAttribute(), c);
boolean shouldCollect = c.foldable();
// try to resolve the expression based on an existing foldables
if (shouldCollect == false) {
c = c.transformUp(ReferenceAttribute.class, replaceReference);
shouldCollect = c.foldable();
}
if (shouldCollect) {
collectRefs.put(a.toAttribute(), Literal.of(c));
}
});
if (collectRefs.isEmpty()) {
return plan;
}
java.util.function.Function<ReferenceAttribute, Expression> replaceReference = r -> collectRefs.resolve(r, r);

plan = plan.transformUp(p -> {
// Apply the replacement inside Filter and Eval (which shouldn't make a difference)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,11 @@ public String describe() {
Stream.of(sinkOperatorFactory)
).map(Describable::describe).collect(joining("\n\\_", "\\_", ""));
}

@Override
public String toString() {
return describe();
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

package org.elasticsearch.xpack.esql.optimizer;

import org.elasticsearch.common.util.Maps;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.esql.EsqlTestUtils;
import org.elasticsearch.xpack.esql.analysis.Analyzer;
Expand All @@ -32,19 +33,24 @@
import org.elasticsearch.xpack.ql.plan.logical.Project;
import org.elasticsearch.xpack.ql.type.DataTypes;
import org.elasticsearch.xpack.ql.type.EsField;
import org.hamcrest.Matchers;
import org.junit.BeforeClass;

import java.util.List;
import java.util.Locale;
import java.util.Map;

import static java.util.Collections.emptyMap;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.L;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_SEARCH_STATS;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_VERIFIER;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.as;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.loadMapping;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.statsForExistingField;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.statsForMissingField;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.withDefaultLimitWarning;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;

Expand Down Expand Up @@ -299,6 +305,48 @@ public void testIsNotNullOnExpression() {
var source = as(filter.child(), EsRelation.class);
}

public void testSparseDocument() throws Exception {
var query = """
from large
| keep field00*
| limit 10
""";

int size = 256;
Map<String, EsField> large = Maps.newLinkedHashMapWithExpectedSize(size);
for (int i = 0; i < size; i++) {
var name = String.format(Locale.ROOT, "field%03d", i);
large.put(name, new EsField(name, DataTypes.INTEGER, emptyMap(), true, false));
}

SearchStats searchStats = statsForExistingField("field000", "field001", "field002", "field003", "field004");

EsIndex index = new EsIndex("large", large);
IndexResolution getIndexResult = IndexResolution.valid(index);
var logicalOptimizer = new LogicalPlanOptimizer(new LogicalOptimizerContext(EsqlTestUtils.TEST_CFG));

var analyzer = new Analyzer(
new AnalyzerContext(EsqlTestUtils.TEST_CFG, new EsqlFunctionRegistry(), getIndexResult, EsqlTestUtils.emptyPolicyResolution()),
TEST_VERIFIER
);

var analyzed = analyzer.analyze(parser.createStatement(query));
var optimized = logicalOptimizer.optimize(analyzed);
var localContext = new LocalLogicalOptimizerContext(EsqlTestUtils.TEST_CFG, searchStats);
var plan = new LocalLogicalPlanOptimizer(localContext).localOptimize(optimized);

var project = as(plan, Project.class);
assertThat(project.projections(), hasSize(10));
assertThat(
Expressions.names(project.projections()),
contains("field000", "field001", "field002", "field003", "field004", "field005", "field006", "field007", "field008", "field009")
);
var eval = as(project.child(), Eval.class);
var field = eval.fields().get(0);
assertThat(Expressions.name(field), is("field005"));
assertThat(Alias.unwrap(field).fold(), Matchers.nullValue());
}

private LocalRelation asEmptyRelation(Object o) {
var empty = as(o, LocalRelation.class);
assertThat(empty.supplier(), is(LocalSupplier.EMPTY));
Expand Down
Loading

0 comments on commit d6f900c

Please sign in to comment.