Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ESQL: Reduce the number of Evals ReplaceMissingFieldWithNull creates #104669

Merged
merged 1 commit into from
Jan 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/104586.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 104586
summary: Reduce the number of Evals `ReplaceMissingFieldWithNull` creates
area: ES|QL
type: bug
issues:
- 104583
Original file line number Diff line number Diff line change
Expand Up @@ -143,13 +143,21 @@ public static EnrichResolution emptyPolicyResolution() {
return new EnrichResolution(Set.of(), Set.of());
}

/**
 * Creates test {@link SearchStats} by delegating to {@code fieldMatchingExistOrMissing(true, names)}.
 *
 * @param names field names handed to the helper as the set that is matched against
 * @return the stats instance built by the helper
 */
public static SearchStats statsForExistingField(String... names) {
return fieldMatchingExistOrMissing(true, names);
}

/**
 * Creates test {@link SearchStats} by delegating to {@code fieldMatchingExistOrMissing(false, names)}.
 *
 * @param names field names handed to the helper as the set that is matched against
 * @return the stats instance built by the helper
 */
public static SearchStats statsForMissingField(String... names) {
return fieldMatchingExistOrMissing(false, names);
}

/**
 * Builds a {@link SearchStats} test double whose {@code exists} answer depends solely on
 * whether the queried field is one of {@code names}.
 * <p>
 * With {@code exists == true} only the listed names are reported as existing; with
 * {@code exists == false} only the listed names are reported as missing.
 *
 * @param exists the value {@code exists(field)} returns for a field contained in {@code names};
 *               fields not contained in {@code names} get the opposite answer
 * @param names  the field names to match against; they are passed to {@code Set.of}, which
 *               rejects {@code null} and duplicate elements
 * @return an anonymous {@code TestSearchStats} overriding {@code exists}
 */
private static SearchStats fieldMatchingExistOrMissing(boolean exists, String... names) {
    return new TestSearchStats() {
        private final Set<String> fields = Set.of(names);

        @Override
        public boolean exists(String field) {
            // membership equals the requested answer: listed => exists, unlisted => !exists
            return fields.contains(field) == exists;
        }
    };
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -1287,7 +1288,7 @@ public void testStatsNestFields() {
}
}

public void testStatsMissingFields() {
public void testStatsMissingFieldWithStats() {
final String node1, node2;
if (randomBoolean()) {
internalCluster().ensureAtLeastNumDataNodes(2);
Expand Down Expand Up @@ -1326,6 +1327,39 @@ public void testStatsMissingFields() {
}
}

/**
 * Creates two empty indices with disjoint numeric mappings ({@code foo_*} and {@code bar_*}),
 * each created with {@code index.routing.allocation.require._name} set to a chosen data node,
 * then runs a plain {@code from} over both and checks that all eight columns are returned
 * and that there are no rows.
 */
public void testStatsMissingFieldKeepApp() {
    final String fooNode;
    final String barNode;
    if (randomBoolean() == false) {
        // any two picks will do, possibly the same node
        fooNode = randomDataNode().getName();
        barNode = randomDataNode().getName();
    } else {
        // make sure the two indices are created against different nodes
        internalCluster().ensureAtLeastNumDataNodes(2);
        fooNode = randomDataNode().getName();
        barNode = randomValueOtherThan(fooNode, () -> randomDataNode().getName());
    }

    var createFoo = client().admin()
        .indices()
        .prepareCreate("foo-index")
        .setSettings(Settings.builder().put("index.routing.allocation.require._name", fooNode))
        .setMapping("foo_int", "type=integer", "foo_long", "type=long", "foo_float", "type=float", "foo_double", "type=double");
    assertAcked(createFoo);

    var createBar = client().admin()
        .indices()
        .prepareCreate("bar-index")
        .setSettings(Settings.builder().put("index.routing.allocation.require._name", barNode))
        .setMapping("bar_int", "type=integer", "bar_long", "type=long", "bar_float", "type=float", "bar_double", "type=double");
    assertAcked(createBar);

    final String query = "from foo-index,bar-index";
    try (var response = run(query)) {
        var rows = getValuesList(response);
        // four mapped fields per index
        assertEquals(8, response.columns().size());
        assertEquals(0, rows.size());
        assertEquals(Collections.emptyList(), rows);
    }
}

public void testCountTextField() {
assertAcked(client().admin().indices().prepareCreate("test_count").setMapping("name", "type=text"));
int numDocs = between(10, 1000);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ public Object fold() {

@Override
public ExpressionEvaluator.Factory toEvaluator(Function<Expression, ExpressionEvaluator.Factory> toEvaluator) {
// force datatype initialization
var dataType = dataType();
ExpressionEvaluator.Factory[] factories = children().stream()
.map(e -> toEvaluator.apply(new MvMax(e.source(), e)))
.toArray(ExpressionEvaluator.Factory[]::new);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ public Object fold() {

@Override
public ExpressionEvaluator.Factory toEvaluator(Function<Expression, ExpressionEvaluator.Factory> toEvaluator) {
// force datatype initialization
var dataType = dataType();

ExpressionEvaluator.Factory[] factories = children().stream()
.map(e -> toEvaluator.apply(new MvMin(e.source(), e)))
.toArray(ExpressionEvaluator.Factory[]::new);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

package org.elasticsearch.xpack.esql.optimizer;

import org.elasticsearch.common.util.Maps;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.BlockUtils;
Expand All @@ -17,6 +18,7 @@
import org.elasticsearch.xpack.esql.planner.AbstractPhysicalOperationProviders;
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
import org.elasticsearch.xpack.esql.stats.SearchStats;
import org.elasticsearch.xpack.esql.type.EsqlDataTypes;
import org.elasticsearch.xpack.ql.expression.Alias;
import org.elasticsearch.xpack.ql.expression.Attribute;
import org.elasticsearch.xpack.ql.expression.FieldAttribute;
Expand All @@ -37,6 +39,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import static java.util.Arrays.asList;
import static org.elasticsearch.xpack.esql.optimizer.LogicalPlanOptimizer.cleanup;
Expand Down Expand Up @@ -115,24 +118,35 @@ private LogicalPlan missingToNull(LogicalPlan plan, SearchStats stats) {
else if (plan instanceof Project project) {
var projections = project.projections();
List<NamedExpression> newProjections = new ArrayList<>(projections.size());
List<Alias> literals = new ArrayList<>();
Map<DataType, Alias> nullLiteral = Maps.newLinkedHashMapWithExpectedSize(EsqlDataTypes.types().size());

for (NamedExpression projection : projections) {
if (projection instanceof FieldAttribute f && stats.exists(f.qualifiedName()) == false) {
var alias = new Alias(f.source(), f.name(), null, Literal.of(f, null), f.id());
literals.add(alias);
newProjections.add(alias.toAttribute());
} else {
newProjections.add(projection);
DataType dt = f.dataType();
Alias nullAlias = nullLiteral.get(f.dataType());
// save the first field as null (per datatype)
if (nullAlias == null) {
Alias alias = new Alias(f.source(), f.name(), null, Literal.of(f, null), f.id());
nullLiteral.put(dt, alias);
projection = alias.toAttribute();
}
// otherwise point to it
else {
// since this avoids creating field copies
projection = new Alias(f.source(), f.name(), f.qualifier(), nullAlias.toAttribute(), f.id());
}
}

newProjections.add(projection);
}
if (literals.size() > 0) {
plan = new Eval(project.source(), project.child(), literals);
// add the first found field as null
if (nullLiteral.size() > 0) {
plan = new Eval(project.source(), project.child(), new ArrayList<>(nullLiteral.values()));
plan = new Project(project.source(), plan, newProjections);
} else {
plan = project;
}
} else {
}
// otherwise transform fields in place
else {
plan = plan.transformExpressionsOnlyUp(
FieldAttribute.class,
f -> stats.exists(f.qualifiedName()) ? f : Literal.of(f, null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,17 +405,25 @@ static class PropagateEvalFoldables extends Rule<LogicalPlan, LogicalPlan> {
@Override
public LogicalPlan apply(LogicalPlan plan) {
var collectRefs = new AttributeMap<Expression>();
// collect aliases

java.util.function.Function<ReferenceAttribute, Expression> replaceReference = r -> collectRefs.resolve(r, r);

// collect aliases bottom-up
plan.forEachExpressionUp(Alias.class, a -> {
var c = a.child();
if (c.foldable()) {
collectRefs.put(a.toAttribute(), c);
boolean shouldCollect = c.foldable();
// try to resolve the expression based on existing foldables
if (shouldCollect == false) {
c = c.transformUp(ReferenceAttribute.class, replaceReference);
shouldCollect = c.foldable();
}
if (shouldCollect) {
collectRefs.put(a.toAttribute(), Literal.of(c));
}
});
if (collectRefs.isEmpty()) {
return plan;
}
java.util.function.Function<ReferenceAttribute, Expression> replaceReference = r -> collectRefs.resolve(r, r);

plan = plan.transformUp(p -> {
// Apply the replacement inside Filter and Eval (which shouldn't make a difference)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -642,6 +642,11 @@ public String describe() {
Stream.of(sinkOperatorFactory)
).map(Describable::describe).collect(joining("\n\\_", "\\_", ""));
}

/**
 * Returns the same text as {@link #describe()}.
 */
@Override
public String toString() {
return describe();
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

package org.elasticsearch.xpack.esql.optimizer;

import org.elasticsearch.common.util.Maps;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.esql.EsqlTestUtils;
import org.elasticsearch.xpack.esql.analysis.Analyzer;
Expand All @@ -29,18 +30,24 @@
import org.elasticsearch.xpack.ql.plan.logical.Project;
import org.elasticsearch.xpack.ql.type.DataTypes;
import org.elasticsearch.xpack.ql.type.EsField;
import org.hamcrest.Matchers;
import org.junit.BeforeClass;

import java.util.List;
import java.util.Locale;
import java.util.Map;

import static java.util.Collections.emptyMap;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.L;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_SEARCH_STATS;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.TEST_VERIFIER;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.as;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.loadMapping;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.statsForExistingField;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.statsForMissingField;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.withDefaultLimitWarning;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;

Expand Down Expand Up @@ -263,6 +270,48 @@ public void testMissingFieldInFilterNoProjection() {
);
}

/**
 * Plans {@code keep field00*} against an index mapping of 256 integer fields
 * ({@code field000} .. {@code field255}) while the search stats report only
 * {@code field000} .. {@code field004} as existing.
 * <p>
 * Expects the locally optimized plan to be a {@code Project} of the ten matching fields on
 * top of an {@code Eval} whose first field is {@code field005} and folds to {@code null}.
 */
public void testSparseDocument() throws Exception {
    // mapping: field000 .. field255, all integers, insertion-ordered
    final int fieldCount = 256;
    Map<String, EsField> mapping = Maps.newLinkedHashMapWithExpectedSize(fieldCount);
    for (int ordinal = 0; ordinal < fieldCount; ordinal++) {
        String fieldName = String.format(Locale.ROOT, "field%03d", ordinal);
        mapping.put(fieldName, new EsField(fieldName, DataTypes.INTEGER, emptyMap(), true, false));
    }
    IndexResolution resolution = IndexResolution.valid(new EsIndex("large", mapping));

    var analyzerContext = new AnalyzerContext(
        EsqlTestUtils.TEST_CFG,
        new EsqlFunctionRegistry(),
        resolution,
        EsqlTestUtils.emptyPolicyResolution()
    );
    var analyzer = new Analyzer(analyzerContext, TEST_VERIFIER);
    var coordinatorOptimizer = new LogicalPlanOptimizer(new LogicalOptimizerContext(EsqlTestUtils.TEST_CFG));

    var query = """
        from large
        | keep field00*
        | limit 10
        """;
    var optimized = coordinatorOptimizer.optimize(analyzer.analyze(parser.createStatement(query)));

    // only the first five of the ten kept fields are reported as existing
    SearchStats searchStats = statsForExistingField("field000", "field001", "field002", "field003", "field004");
    var localOptimizer = new LocalLogicalPlanOptimizer(new LocalLogicalOptimizerContext(EsqlTestUtils.TEST_CFG, searchStats));
    var plan = localOptimizer.localOptimize(optimized);

    var project = as(plan, Project.class);
    assertThat(project.projections(), hasSize(10));
    assertThat(
        Expressions.names(project.projections()),
        contains("field000", "field001", "field002", "field003", "field004", "field005", "field006", "field007", "field008", "field009")
    );

    var eval = as(project.child(), Eval.class);
    var firstEvalField = eval.fields().get(0);
    assertThat(Expressions.name(firstEvalField), is("field005"));
    assertThat(firstEvalField.child().fold(), Matchers.nullValue());
}

private LocalRelation asEmptyRelation(Object o) {
var empty = as(o, LocalRelation.class);
assertThat(empty.supplier(), is(LocalSupplier.EMPTY));
Expand All @@ -285,6 +334,10 @@ private LogicalPlan localPlan(LogicalPlan plan, SearchStats searchStats) {
return localPlan;
}

/**
 * Convenience overload: passes {@code plan(query)} and {@code TEST_SEARCH_STATS} to
 * {@code localPlan(LogicalPlan, SearchStats)}.
 *
 * @param query the query text handed to {@code plan}
 * @return the plan returned by the two-argument overload
 */
private LogicalPlan localPlan(String query) {
return localPlan(plan(query), TEST_SEARCH_STATS);
}

@Override
protected List<String> filteredWarnings() {
return withDefaultLimitWarning(super.filteredWarnings());
Expand Down
Loading