-
Notifications
You must be signed in to change notification settings - Fork 24.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
EQL: Introduce basic execution pipeline #51809
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License; | ||
* you may not use this file except in compliance with the Elastic License. | ||
*/ | ||
|
||
package org.elasticsearch.xpack.eql.analysis; | ||
|
||
import org.elasticsearch.xpack.ql.expression.function.FunctionRegistry; | ||
import org.elasticsearch.xpack.ql.plan.logical.LogicalPlan; | ||
import org.elasticsearch.xpack.ql.rule.RuleExecutor; | ||
|
||
import java.util.Collection; | ||
|
||
import static java.util.Arrays.asList; | ||
|
||
public class Analyzer extends RuleExecutor<LogicalPlan> { | ||
|
||
private final FunctionRegistry functionRegistry; | ||
private final Verifier verifier; | ||
|
||
public Analyzer(FunctionRegistry functionRegistry, Verifier verifier) { | ||
this.functionRegistry = functionRegistry; | ||
this.verifier = verifier; | ||
} | ||
|
||
@Override | ||
protected Iterable<RuleExecutor<LogicalPlan>.Batch> batches() { | ||
Batch resolution = new Batch("Resolution"); | ||
|
||
return asList(resolution); | ||
} | ||
|
||
public LogicalPlan analyze(LogicalPlan plan) { | ||
return verify(execute(plan)); | ||
} | ||
|
||
private LogicalPlan verify(LogicalPlan plan) { | ||
Collection<Failure> failures = verifier.verify(plan); | ||
if (!failures.isEmpty()) { | ||
throw new VerificationException(failures); | ||
} | ||
return plan; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License; | ||
* you may not use this file except in compliance with the Elastic License. | ||
*/ | ||
|
||
package org.elasticsearch.xpack.eql.analysis; | ||
|
||
import org.elasticsearch.xpack.ql.tree.Node; | ||
|
||
import java.util.Objects; | ||
|
||
import static org.elasticsearch.common.logging.LoggerMessageFormat.format; | ||
|
||
class Failure { | ||
|
||
private final Node<?> node; | ||
private final String message; | ||
|
||
Failure(Node<?> node, String message) { | ||
this.node = node; | ||
this.message = message; | ||
} | ||
|
||
Node<?> node() { | ||
return node; | ||
} | ||
|
||
String message() { | ||
return message; | ||
} | ||
|
||
@Override | ||
public int hashCode() { | ||
return Objects.hash(message, node); | ||
} | ||
|
||
@Override | ||
public boolean equals(Object obj) { | ||
if (this == obj) { | ||
return true; | ||
} | ||
|
||
if (obj == null || getClass() != obj.getClass()) { | ||
return false; | ||
} | ||
|
||
Failure other = (Failure) obj; | ||
return Objects.equals(message, other.message) && Objects.equals(node, other.node); | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return message; | ||
} | ||
|
||
static Failure fail(Node<?> source, String message, Object... args) { | ||
return new Failure(source, format(message, args)); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License; | ||
* you may not use this file except in compliance with the Elastic License. | ||
*/ | ||
|
||
package org.elasticsearch.xpack.eql.analysis; | ||
|
||
import org.elasticsearch.xpack.ql.index.IndexResolution; | ||
import org.elasticsearch.xpack.ql.plan.logical.EsRelation; | ||
import org.elasticsearch.xpack.ql.plan.logical.LogicalPlan; | ||
import org.elasticsearch.xpack.ql.plan.logical.UnresolvedRelation; | ||
|
||
public class PreAnalyzer { | ||
|
||
public LogicalPlan preAnalyze(LogicalPlan plan, IndexResolution indices) { | ||
if (plan.analyzed() == false) { | ||
// FIXME: includeFrozen needs to be set already | ||
plan = plan.transformUp(r -> new EsRelation(r.source(), indices.get(), false), UnresolvedRelation.class); | ||
plan.forEachUp(LogicalPlan::setPreAnalyzed); | ||
} | ||
return plan; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License; | ||
* you may not use this file except in compliance with the Elastic License. | ||
*/ | ||
package org.elasticsearch.xpack.eql.analysis; | ||
|
||
import org.elasticsearch.rest.RestStatus; | ||
import org.elasticsearch.xpack.eql.EqlClientException; | ||
import org.elasticsearch.xpack.ql.tree.Location; | ||
import org.elasticsearch.xpack.ql.util.StringUtils; | ||
|
||
import java.util.Collection; | ||
import java.util.stream.Collectors; | ||
|
||
public class VerificationException extends EqlClientException { | ||
|
||
protected VerificationException(Collection<Failure> sources) { | ||
super(asMessage(sources)); | ||
} | ||
|
||
private static String asMessage(Collection<Failure> failures) { | ||
return failures.stream().map(f -> { | ||
Location l = f.node().source().source(); | ||
return "line " + l.getLineNumber() + ":" + l.getColumnNumber() + ": " + f.message(); | ||
}).collect(Collectors.joining(StringUtils.NEW_LINE, "Found " + failures.size() + " problem(s)\n", StringUtils.EMPTY)); | ||
} | ||
|
||
@Override | ||
public RestStatus status() { | ||
return RestStatus.BAD_REQUEST; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,106 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License; | ||
* you may not use this file except in compliance with the Elastic License. | ||
*/ | ||
|
||
package org.elasticsearch.xpack.eql.analysis; | ||
|
||
import org.elasticsearch.xpack.ql.capabilities.Unresolvable; | ||
import org.elasticsearch.xpack.ql.expression.Attribute; | ||
import org.elasticsearch.xpack.ql.expression.UnresolvedAttribute; | ||
import org.elasticsearch.xpack.ql.plan.logical.LogicalPlan; | ||
import org.elasticsearch.xpack.ql.tree.Node; | ||
import org.elasticsearch.xpack.ql.type.DataTypes; | ||
import org.elasticsearch.xpack.ql.util.StringUtils; | ||
|
||
import java.util.ArrayList; | ||
import java.util.Collection; | ||
import java.util.LinkedHashSet; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Set; | ||
|
||
import static java.util.stream.Collectors.toMap; | ||
import static org.elasticsearch.xpack.eql.analysis.Failure.fail; | ||
|
||
/** | ||
* The verifier has the role of checking the analyzed tree for failures and build a list of failures following this check. | ||
* It is created in the plan executor along with the metrics instance passed as constructor parameter. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We are assuming that metrics will be passed in the constructor in the same way as SQL, right? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Correct as after the analyzer the type of query becomes clear. |
||
*/ | ||
public class Verifier { | ||
|
||
public Map<Node<?>, String> verifyFailures(LogicalPlan plan) { | ||
Collection<Failure> failures = verify(plan); | ||
return failures.stream().collect(toMap(Failure::node, Failure::message)); | ||
} | ||
|
||
Collection<Failure> verify(LogicalPlan plan) { | ||
Set<Failure> failures = new LinkedHashSet<>(); | ||
|
||
// start bottom-up | ||
plan.forEachUp(p -> { | ||
if (p.analyzed()) { | ||
return; | ||
} | ||
|
||
// if the children are unresolved, so will this node; counting it will only add noise | ||
if (p.childrenResolved() == false) { | ||
return; | ||
} | ||
|
||
Set<Failure> localFailures = new LinkedHashSet<>(); | ||
|
||
if (p instanceof Unresolvable) { | ||
localFailures.add(fail(p, ((Unresolvable) p).unresolvedMessage())); | ||
} else { | ||
p.forEachExpressions(e -> { | ||
// everything is fine, skip expression | ||
if (e.resolved()) { | ||
return; | ||
} | ||
|
||
e.forEachUp(ae -> { | ||
// we're only interested in the children | ||
if (ae.childrenResolved() == false) { | ||
return; | ||
} | ||
if (ae instanceof Unresolvable) { | ||
// handle Attributes differently to provide more context | ||
if (ae instanceof UnresolvedAttribute) { | ||
UnresolvedAttribute ua = (UnresolvedAttribute) ae; | ||
// only work out the synonyms for raw unresolved attributes | ||
if (ua.customMessage() == false) { | ||
boolean useQualifier = ua.qualifier() != null; | ||
List<String> potentialMatches = new ArrayList<>(); | ||
for (Attribute a : p.inputSet()) { | ||
String nameCandidate = useQualifier ? a.qualifiedName() : a.name(); | ||
// add only primitives (object types would only result in another error) | ||
if (DataTypes.isUnsupported(a.dataType()) == false && DataTypes.isPrimitive(a.dataType())) { | ||
potentialMatches.add(nameCandidate); | ||
} | ||
} | ||
|
||
List<String> matches = StringUtils.findSimilar(ua.qualifiedName(), potentialMatches); | ||
if (matches.isEmpty() == false) { | ||
ae = ua.withUnresolvedMessage(UnresolvedAttribute.errorMessage(ua.qualifiedName(), matches)); | ||
} | ||
} | ||
} | ||
|
||
localFailures.add(fail(ae, ((Unresolvable) ae).unresolvedMessage())); | ||
return; | ||
} | ||
// type resolution | ||
if (ae.typeResolved().unresolved()) { | ||
localFailures.add(fail(ae, ae.typeResolved().message())); | ||
} | ||
|
||
}); | ||
}); | ||
} | ||
}); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: a bit deeply nested imho, not sure what are the current guidelines There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There are no limits on nesting - it's really a matter of styles. Due to the use of lambdas, some bits tend to be nested and improving that by extracting the code into methods doesn't always work especially when using a lot of variables... |
||
|
||
return failures; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,59 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License; | ||
* you may not use this file except in compliance with the Elastic License. | ||
*/ | ||
|
||
package org.elasticsearch.xpack.eql.execution; | ||
|
||
import org.elasticsearch.action.ActionListener; | ||
import org.elasticsearch.client.Client; | ||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry; | ||
import org.elasticsearch.xpack.eql.analysis.Analyzer; | ||
import org.elasticsearch.xpack.eql.analysis.PreAnalyzer; | ||
import org.elasticsearch.xpack.eql.analysis.Verifier; | ||
import org.elasticsearch.xpack.eql.optimizer.Optimizer; | ||
import org.elasticsearch.xpack.eql.planner.Planner; | ||
import org.elasticsearch.xpack.eql.session.Configuration; | ||
import org.elasticsearch.xpack.eql.session.EqlSession; | ||
import org.elasticsearch.xpack.eql.session.Results; | ||
import org.elasticsearch.xpack.ql.expression.function.FunctionRegistry; | ||
import org.elasticsearch.xpack.ql.index.IndexResolver; | ||
|
||
import java.util.List; | ||
|
||
import static org.elasticsearch.action.ActionListener.wrap; | ||
|
||
public class PlanExecutor { | ||
private final Client client; | ||
private final NamedWriteableRegistry writableRegistry; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. writableRegistry -> writeableRegistry There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm on the fence here - the correct word is |
||
|
||
private final IndexResolver indexResolver; | ||
private final FunctionRegistry functionRegistry; | ||
|
||
private final PreAnalyzer preAnalyzer; | ||
private final Analyzer analyzer; | ||
private final Optimizer optimizer; | ||
private final Planner planner; | ||
|
||
public PlanExecutor(Client client, IndexResolver indexResolver, NamedWriteableRegistry writeableRegistry) { | ||
this.client = client; | ||
this.writableRegistry = writeableRegistry; | ||
|
||
this.indexResolver = indexResolver; | ||
this.functionRegistry = null; | ||
|
||
this.preAnalyzer = new PreAnalyzer(); | ||
this.analyzer = new Analyzer(functionRegistry, new Verifier()); | ||
this.optimizer = new Optimizer(); | ||
this.planner = new Planner(); | ||
} | ||
|
||
private EqlSession newSession(Configuration cfg) { | ||
return new EqlSession(client, cfg, indexResolver, preAnalyzer, analyzer, optimizer, planner, this); | ||
} | ||
|
||
public void eql(Configuration cfg, String eql, List<Object> params, ActionListener<Results> listener) { | ||
newSession(cfg).eql(eql, params, wrap(listener::onResponse, listener::onFailure)); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License; | ||
* you may not use this file except in compliance with the Elastic License. | ||
*/ | ||
|
||
package org.elasticsearch.xpack.eql.optimizer; | ||
|
||
import org.elasticsearch.xpack.ql.plan.logical.LogicalPlan; | ||
import org.elasticsearch.xpack.ql.rule.RuleExecutor; | ||
|
||
import static java.util.Collections.emptyList; | ||
|
||
public class Optimizer extends RuleExecutor<LogicalPlan> { | ||
|
||
public LogicalPlan optimize(LogicalPlan verified) { | ||
return verified.optimized() ? verified : execute(verified); | ||
} | ||
|
||
@Override | ||
protected Iterable<RuleExecutor<LogicalPlan>.Batch> batches() { | ||
return emptyList(); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License; | ||
* you may not use this file except in compliance with the Elastic License. | ||
*/ | ||
package org.elasticsearch.xpack.eql.plan.physical; | ||
|
||
import org.elasticsearch.xpack.eql.session.Executable; | ||
import org.elasticsearch.xpack.ql.plan.QueryPlan; | ||
import org.elasticsearch.xpack.ql.tree.Source; | ||
|
||
import java.util.List; | ||
|
||
/** | ||
* A PhysicalPlan is "how" a LogicalPlan (the "what") actually gets translated into one or more queries. | ||
* | ||
* LogicalPlan = I want to get from DEN to SFO | ||
* PhysicalPlan = take Delta, DEN to SJC, then SJC to SFO | ||
*/ | ||
public abstract class PhysicalPlan extends QueryPlan<PhysicalPlan> implements Executable { | ||
|
||
public PhysicalPlan(Source source, List<PhysicalPlan> children) { | ||
super(source, children); | ||
} | ||
|
||
@Override | ||
public abstract int hashCode(); | ||
|
||
@Override | ||
public abstract boolean equals(Object obj); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
include message to hashCode and equals impl?