Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ESQL: Refactor Join inside the planner #115813

Merged
merged 10 commits into from
Oct 31, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.apache.lucene.document.InetAddressPoint;
import org.apache.lucene.sandbox.document.HalfFloatPoint;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.NoopCircuitBreaker;
import org.elasticsearch.common.bytes.BytesReference;
Expand Down Expand Up @@ -600,7 +601,10 @@ else if (Files.isDirectory(path)) {
Files.walkFileTree(path, EnumSet.allOf(FileVisitOption.class), 1, new SimpleFileVisitor<>() {
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
if (Regex.simpleMatch(filePattern, file.toString())) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix to make CsvTests work inside the IDE against individual files ("lookup.csv-spec" vs "*.csv-spec)

// remove the path folder from the URL
String name = Strings.replace(file.toUri().toString(), path.toUri().toString(), StringUtils.EMPTY);
Tuple<String, String> entrySplit = pathAndName(name);
if (root.equals(entrySplit.v1()) && Regex.simpleMatch(filePattern, entrySplit.v2())) {
matches.add(file.toUri().toURL());
}
return FileVisitResult.CONTINUE;
Expand Down
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The commented tests are failing - the plan is to revisit them once lookup join is properly added. Right now this is not a priority...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add a comment so that these tests can easily be found later. As they are now, it's just an ignored set of tests.

Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
maxOfInt
maxOfInt-Ignore
required_capability: inlinestats

// tag::max-languages[]
Expand Down Expand Up @@ -54,7 +54,7 @@ emp_no:integer | avg_worked_seconds:long | gender:keyword | max_avg_worked_secon
10030 | 394597613 | M | 394597613
;

maxOfLong
maxOfLong-Ignore
required_capability: inlinestats

FROM employees
Expand Down Expand Up @@ -185,7 +185,7 @@ emp_no:integer | avg_worked_seconds:long | languages:integer | height:double | m
;


two
two-Ignore
required_capability: inlinestats

FROM employees
Expand Down Expand Up @@ -281,7 +281,7 @@ abbrev:keyword | type:keyword | scalerank:integer | min_scalerank:integer
GWL | [mid, military] | 9 | [2, 4]
;

afterStats
afterStats-Ignore
required_capability: inlinestats

FROM airports
Expand Down Expand Up @@ -322,7 +322,7 @@ abbrev:keyword | country:keyword | count:long
BDQ | India | 50
;

afterLookup
afterLookup-Ignore
required_capability: inlinestats

FROM airports
Expand Down Expand Up @@ -364,7 +364,7 @@ abbrev:keyword | city:keyword | region:text | "COUNT(*)":long
FUK | Fukuoka | 中央区 | 2
;

beforeStats
beforeStats-Ignore
required_capability: inlinestats

FROM airports
Expand Down Expand Up @@ -424,7 +424,7 @@ abbrev:keyword | type:keyword | city:keyword | "COUNT(*)":long | region:te
ACA | major | Acapulco de Juárez | 385 | Acapulco de Juárez
;

beforeAndAfterEnrich
beforeAndAfterEnrich-Ignore
required_capability: inlinestats
required_capability: enrich_load

Expand All @@ -445,7 +445,7 @@ abbrev:keyword | type:keyword | city:keyword | "COUNT(*)":long | region:te
;


shadowing
shadowing-Ignore
required_capability: inlinestats

ROW left = "left", client_ip = "172.21.0.5", env = "env", right = "right"
Expand All @@ -456,7 +456,7 @@ left:keyword | client_ip:keyword | right:keyword | env:keyword
left | 172.21.0.5 | right | right
;

shadowingMulti
shadowingMulti-Ignore
required_capability: inlinestats

ROW left = "left", airport = "Zurich Airport ZRH", city = "Zürich", middle = "middle", region = "North-East Switzerland", right = "right"
Expand All @@ -467,7 +467,7 @@ left:keyword | city:keyword | middle:keyword | right:keyword | airport:keyword |
left | Zürich | middle | right | left | left | left
;

shadowingSelf
shadowingSelf-Ignore
required_capability: inlinestats

ROW city="Raleigh"
Expand Down Expand Up @@ -537,7 +537,7 @@ emp_no:integer | one:integer
10005 | 1
;

percentile
percentile-Ignore
required_capability: inlinestats

FROM employees
Expand Down Expand Up @@ -575,7 +575,7 @@ abbrev:keyword | scalerank:integer | location:geo_point
ZLO | 7 | POINT (-104.560095200097 19.1480860285854) | 20 | -100 | 2
;

byTwoCalculatedSecondOverwrites
byTwoCalculatedSecondOverwrites-Ignore
required_capability: inlinestats_v2

FROM airports
Expand All @@ -594,7 +594,7 @@ abbrev:keyword | scalerank:integer | location:geo_point
ZLO | 7 | POINT (-104.560095200097 19.1480860285854) | -100 | 2
;

byTwoCalculatedSecondOverwritesReferencingFirst
byTwoCalculatedSecondOverwritesReferencingFirst-Ignore
required_capability: inlinestats_v2

FROM airports
Expand All @@ -615,7 +615,7 @@ abbrev:keyword | scalerank:integer | location:geo_point
;


groupShadowsAgg
groupShadowsAgg-Ignore
required_capability: inlinestats_v2

FROM airports
Expand Down
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only two tests are failing and the issue is name collision/merging in join.
Because there's no qualifier, given a field x we cannot determine at runtime from what side to extract the field - e.g.

FROM l | LOOKUP r ON k | KEEP a

during physical planning we know k is available on both l and r but have no idea whether a comes from l or r and thus cannot determine on what branch to place the field extractors.
There are different solutions to this approach but are outside of this PR - the most comprehensive one imo is name qualifiers but it's also the most complicated.

Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,8 @@ aa:keyword | ab:keyword | na:integer | nb:integer
bar | bar | null | null
;

lookupBeforeStats
# needs qualifiers for proper field resolution and extraction
lookupBeforeStats-Ignore
required_capability: lookup_v4
FROM employees
| RENAME languages AS int
Expand Down Expand Up @@ -212,7 +213,8 @@ emp_no:integer | languages:long | name:keyword
10004 | 5 | five
;

lookupBeforeSort
# needs qualifiers for field resolution
lookupBeforeSort-Ignore
required_capability: lookup_v4
FROM employees
| WHERE emp_no < 10005
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1305,7 +1305,7 @@ foo:long | client_ip:ip
8268153 | 172.21.3.15
;

multiIndexIndirectUseOfUnionTypesInInlineStats
multiIndexIndirectUseOfUnionTypesInInlineStats-Ignore
// TODO: `union_types` is required only because this makes the test skip in the csv tests; better solution:
// make the csv tests work with multiple indices.
required_capability: union_types
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.In;
import org.elasticsearch.xpack.esql.index.EsIndex;
import org.elasticsearch.xpack.esql.plan.TableIdentifier;
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
import org.elasticsearch.xpack.esql.plan.logical.Drop;
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
Expand All @@ -72,7 +73,6 @@
import org.elasticsearch.xpack.esql.plan.logical.MvExpand;
import org.elasticsearch.xpack.esql.plan.logical.Project;
import org.elasticsearch.xpack.esql.plan.logical.Rename;
import org.elasticsearch.xpack.esql.plan.logical.Stats;
import org.elasticsearch.xpack.esql.plan.logical.UnresolvedRelation;
import org.elasticsearch.xpack.esql.plan.logical.local.EsqlProject;
import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
Expand Down Expand Up @@ -405,8 +405,8 @@ protected LogicalPlan doRule(LogicalPlan plan) {
childrenOutput.addAll(output);
}

if (plan instanceof Stats stats) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By exposing the Aggregate, there's no reason to have the Stats interface anymore.
The upside is the Aggregate resolution gets reused automatically (including nested filters, etc..)

return resolveStats(stats, childrenOutput);
if (plan instanceof Aggregate aggregate) {
return resolveAggregate(aggregate, childrenOutput);
}

if (plan instanceof Drop d) {
Expand Down Expand Up @@ -440,12 +440,12 @@ protected LogicalPlan doRule(LogicalPlan plan) {
return plan.transformExpressionsOnly(UnresolvedAttribute.class, ua -> maybeResolveAttribute(ua, childrenOutput));
}

private LogicalPlan resolveStats(Stats stats, List<Attribute> childrenOutput) {
private Aggregate resolveAggregate(Aggregate aggregate, List<Attribute> childrenOutput) {
// if the grouping is resolved but the aggs are not, use the former to resolve the latter
// e.g. STATS a ... GROUP BY a = x + 1
Holder<Boolean> changed = new Holder<>(false);
List<Expression> groupings = stats.groupings();
List<? extends NamedExpression> aggregates = stats.aggregates();
List<Expression> groupings = aggregate.groupings();
List<? extends NamedExpression> aggregates = aggregate.aggregates();
// first resolve groupings since the aggs might refer to them
// trying to globally resolve unresolved attributes will lead to some being marked as unresolvable
if (Resolvables.resolved(groupings) == false) {
Expand All @@ -459,7 +459,7 @@ private LogicalPlan resolveStats(Stats stats, List<Attribute> childrenOutput) {
}
groupings = newGroupings;
if (changed.get()) {
stats = stats.with(stats.child(), newGroupings, stats.aggregates());
aggregate = aggregate.with(aggregate.child(), newGroupings, aggregate.aggregates());
changed.set(false);
}
}
Expand All @@ -475,8 +475,8 @@ private LogicalPlan resolveStats(Stats stats, List<Attribute> childrenOutput) {
List<Attribute> resolvedList = NamedExpressions.mergeOutputAttributes(resolved, childrenOutput);

List<NamedExpression> newAggregates = new ArrayList<>();
for (NamedExpression aggregate : stats.aggregates()) {
var agg = (NamedExpression) aggregate.transformUp(UnresolvedAttribute.class, ua -> {
for (NamedExpression ag : aggregate.aggregates()) {
var agg = (NamedExpression) ag.transformUp(UnresolvedAttribute.class, ua -> {
Expression ne = ua;
Attribute maybeResolved = maybeResolveAttribute(ua, resolvedList);
if (maybeResolved != null) {
Expand All @@ -489,10 +489,10 @@ private LogicalPlan resolveStats(Stats stats, List<Attribute> childrenOutput) {
}

// TODO: remove this when Stats interface is removed
stats = changed.get() ? stats.with(stats.child(), groupings, newAggregates) : stats;
aggregate = changed.get() ? aggregate.with(aggregate.child(), groupings, newAggregates) : aggregate;
}

return (LogicalPlan) stats;
return aggregate;
}

private LogicalPlan resolveMvExpand(MvExpand p, List<Attribute> childrenOutput) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@
import org.elasticsearch.xpack.esql.expression.function.EsqlFunctionRegistry;
import org.elasticsearch.xpack.esql.optimizer.LogicalOptimizerContext;
import org.elasticsearch.xpack.esql.optimizer.LogicalPlanOptimizer;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.esql.planner.Mapper;
import org.elasticsearch.xpack.esql.planner.mapper.Mapper;
import org.elasticsearch.xpack.esql.session.Configuration;
import org.elasticsearch.xpack.esql.session.EsqlSession;
import org.elasticsearch.xpack.esql.session.IndexResolver;
Expand All @@ -29,8 +28,6 @@
import org.elasticsearch.xpack.esql.stats.PlanningMetricsManager;
import org.elasticsearch.xpack.esql.stats.QueryMetric;

import java.util.function.BiConsumer;

import static org.elasticsearch.action.ActionListener.wrap;

public class PlanExecutor {
Expand All @@ -47,7 +44,7 @@ public PlanExecutor(IndexResolver indexResolver, MeterRegistry meterRegistry) {
this.indexResolver = indexResolver;
this.preAnalyzer = new PreAnalyzer();
this.functionRegistry = new EsqlFunctionRegistry();
this.mapper = new Mapper(functionRegistry);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

functionRegistry is not used inside the Mapper.

this.mapper = new Mapper();
this.metrics = new Metrics(functionRegistry);
this.verifier = new Verifier(metrics);
this.planningMetricsManager = new PlanningMetricsManager(meterRegistry);
Expand All @@ -60,7 +57,7 @@ public void esql(
EnrichPolicyResolver enrichPolicyResolver,
EsqlExecutionInfo executionInfo,
IndicesExpressionGrouper indicesExpressionGrouper,
BiConsumer<PhysicalPlan, ActionListener<Result>> runPhase,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Introduced a interface inside the generic BiConsumer to preserve the generics signature.

EsqlSession.PlanRunner planRunner,
ActionListener<Result> listener
) {
final PlanningMetrics planningMetrics = new PlanningMetrics();
Expand All @@ -79,7 +76,7 @@ public void esql(
);
QueryMetric clientId = QueryMetric.fromString("rest");
metrics.total(clientId);
session.execute(request, executionInfo, runPhase, wrap(x -> {
session.execute(request, executionInfo, planRunner, wrap(x -> {
planningMetricsManager.publish(planningMetrics, true);
listener.onResponse(x);
}, ex -> {
Expand Down
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just renames

Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.xpack.esql.optimizer.rules.logical.PropagateEmptyRelation;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.PropagateEquals;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.PropagateEvalFoldables;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.PropagateInlineEvals;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.PropagateNullable;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.PruneColumns;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.PruneEmptyPlans;
Expand All @@ -39,13 +40,12 @@
import org.elasticsearch.xpack.esql.optimizer.rules.logical.PushDownEval;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.PushDownRegexExtract;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.RemoveStatsOverride;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.ReplaceAggregateAggExpressionWithEval;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.ReplaceAggregateNestedExpressionWithEval;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.ReplaceAliasingEvalWithProject;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.ReplaceLimitAndSortAsTopN;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.ReplaceLookupWithJoin;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.ReplaceOrderByExpressionWithEval;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.ReplaceRegexMatch;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.ReplaceStatsAggExpressionWithEval;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.ReplaceStatsNestedExpressionWithEval;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.ReplaceTrivialTypeConversions;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.SetAsOptimized;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.SimplifyComparisonsArithmetics;
Expand All @@ -54,6 +54,7 @@
import org.elasticsearch.xpack.esql.optimizer.rules.logical.SplitInWithFoldableValue;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.SubstituteFilteredExpression;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.SubstituteSpatialSurrogates;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.SubstituteSurrogatePlans;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.SubstituteSurrogates;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.TranslateMetricsAggregate;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
Expand Down Expand Up @@ -121,26 +122,27 @@ protected static Batch<LogicalPlan> substitutions() {
return new Batch<>(
"Substitutions",
Limiter.ONCE,
new ReplaceLookupWithJoin(),
new SubstituteSurrogatePlans(),
// translate filtered expressions into aggregate with filters - can't use surrogate expressions because it was
// retrofitted for constant folding - this needs to be fixed
new SubstituteFilteredExpression(),
new RemoveStatsOverride(),
// first extract nested expressions inside aggs
new ReplaceStatsNestedExpressionWithEval(),
new ReplaceAggregateNestedExpressionWithEval(),
// then extract nested aggs top-level
new ReplaceStatsAggExpressionWithEval(),
new ReplaceAggregateAggExpressionWithEval(),
// lastly replace surrogate functions
new SubstituteSurrogates(),
// translate metric aggregates after surrogate substitution and replace nested expressions with eval (again)
new TranslateMetricsAggregate(),
new ReplaceStatsNestedExpressionWithEval(),
new ReplaceAggregateNestedExpressionWithEval(),
new ReplaceRegexMatch(),
new ReplaceTrivialTypeConversions(),
new ReplaceAliasingEvalWithProject(),
new SkipQueryOnEmptyMappings(),
new SubstituteSpatialSurrogates(),
new ReplaceOrderByExpressionWithEval()
new ReplaceOrderByExpressionWithEval(),
new PropagateInlineEvals()
// new NormalizeAggregate(), - waits on https://github.com/elastic/elasticsearch/issues/100634
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ public CombineProjections() {
}

@Override
@SuppressWarnings("unchecked")
protected LogicalPlan rule(UnaryPlan plan) {
LogicalPlan child = plan.child();

Expand Down Expand Up @@ -67,7 +66,7 @@ protected LogicalPlan rule(UnaryPlan plan) {
if (grouping instanceof Attribute attribute) {
groupingAttrs.add(attribute);
} else {
// After applying ReplaceStatsNestedExpressionWithEval, groupings can only contain attributes.
// After applying ReplaceAggregateNestedExpressionWithEval, groupings can only contain attributes.
throw new EsqlIllegalArgumentException("Expected an Attribute, got {}", grouping);
}
}
Expand Down
Loading