-
Notifications
You must be signed in to change notification settings - Fork 24.9k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
EQL: Introduce basic execution pipeline (#51809)
Add main classes that form the 'execution' pipeline are added - most of them have no functionality; the purpose of this PR is to add flesh out the contract between the various moving parts so that work can start on them independently.
- Loading branch information
Showing
17 changed files
with
702 additions
and
11 deletions.
There are no files selected for viewing
45 changes: 45 additions & 0 deletions
45
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/analysis/Analyzer.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License; | ||
* you may not use this file except in compliance with the Elastic License. | ||
*/ | ||
|
||
package org.elasticsearch.xpack.eql.analysis; | ||
|
||
import org.elasticsearch.xpack.ql.expression.function.FunctionRegistry; | ||
import org.elasticsearch.xpack.ql.plan.logical.LogicalPlan; | ||
import org.elasticsearch.xpack.ql.rule.RuleExecutor; | ||
|
||
import java.util.Collection; | ||
|
||
import static java.util.Arrays.asList; | ||
|
||
public class Analyzer extends RuleExecutor<LogicalPlan> { | ||
|
||
private final FunctionRegistry functionRegistry; | ||
private final Verifier verifier; | ||
|
||
public Analyzer(FunctionRegistry functionRegistry, Verifier verifier) { | ||
this.functionRegistry = functionRegistry; | ||
this.verifier = verifier; | ||
} | ||
|
||
@Override | ||
protected Iterable<RuleExecutor<LogicalPlan>.Batch> batches() { | ||
Batch resolution = new Batch("Resolution"); | ||
|
||
return asList(resolution); | ||
} | ||
|
||
public LogicalPlan analyze(LogicalPlan plan) { | ||
return verify(execute(plan)); | ||
} | ||
|
||
private LogicalPlan verify(LogicalPlan plan) { | ||
Collection<Failure> failures = verifier.verify(plan); | ||
if (!failures.isEmpty()) { | ||
throw new VerificationException(failures); | ||
} | ||
return plan; | ||
} | ||
} |
60 changes: 60 additions & 0 deletions
60
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/analysis/Failure.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License; | ||
* you may not use this file except in compliance with the Elastic License. | ||
*/ | ||
|
||
package org.elasticsearch.xpack.eql.analysis; | ||
|
||
import org.elasticsearch.xpack.ql.tree.Node; | ||
|
||
import java.util.Objects; | ||
|
||
import static org.elasticsearch.common.logging.LoggerMessageFormat.format; | ||
|
||
class Failure { | ||
|
||
private final Node<?> node; | ||
private final String message; | ||
|
||
Failure(Node<?> node, String message) { | ||
this.node = node; | ||
this.message = message; | ||
} | ||
|
||
Node<?> node() { | ||
return node; | ||
} | ||
|
||
String message() { | ||
return message; | ||
} | ||
|
||
@Override | ||
public int hashCode() { | ||
return Objects.hash(message, node); | ||
} | ||
|
||
@Override | ||
public boolean equals(Object obj) { | ||
if (this == obj) { | ||
return true; | ||
} | ||
|
||
if (obj == null || getClass() != obj.getClass()) { | ||
return false; | ||
} | ||
|
||
Failure other = (Failure) obj; | ||
return Objects.equals(message, other.message) && Objects.equals(node, other.node); | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return message; | ||
} | ||
|
||
static Failure fail(Node<?> source, String message, Object... args) { | ||
return new Failure(source, format(message, args)); | ||
} | ||
} |
24 changes: 24 additions & 0 deletions
24
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/analysis/PreAnalyzer.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License; | ||
* you may not use this file except in compliance with the Elastic License. | ||
*/ | ||
|
||
package org.elasticsearch.xpack.eql.analysis; | ||
|
||
import org.elasticsearch.xpack.ql.index.IndexResolution; | ||
import org.elasticsearch.xpack.ql.plan.logical.EsRelation; | ||
import org.elasticsearch.xpack.ql.plan.logical.LogicalPlan; | ||
import org.elasticsearch.xpack.ql.plan.logical.UnresolvedRelation; | ||
|
||
public class PreAnalyzer { | ||
|
||
public LogicalPlan preAnalyze(LogicalPlan plan, IndexResolution indices) { | ||
if (plan.analyzed() == false) { | ||
// FIXME: includeFrozen needs to be set already | ||
plan = plan.transformUp(r -> new EsRelation(r.source(), indices.get(), false), UnresolvedRelation.class); | ||
plan.forEachUp(LogicalPlan::setPreAnalyzed); | ||
} | ||
return plan; | ||
} | ||
} |
33 changes: 33 additions & 0 deletions
33
.../plugin/eql/src/main/java/org/elasticsearch/xpack/eql/analysis/VerificationException.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License; | ||
* you may not use this file except in compliance with the Elastic License. | ||
*/ | ||
package org.elasticsearch.xpack.eql.analysis; | ||
|
||
import org.elasticsearch.rest.RestStatus; | ||
import org.elasticsearch.xpack.eql.EqlClientException; | ||
import org.elasticsearch.xpack.ql.tree.Location; | ||
import org.elasticsearch.xpack.ql.util.StringUtils; | ||
|
||
import java.util.Collection; | ||
import java.util.stream.Collectors; | ||
|
||
public class VerificationException extends EqlClientException { | ||
|
||
protected VerificationException(Collection<Failure> sources) { | ||
super(asMessage(sources)); | ||
} | ||
|
||
private static String asMessage(Collection<Failure> failures) { | ||
return failures.stream().map(f -> { | ||
Location l = f.node().source().source(); | ||
return "line " + l.getLineNumber() + ":" + l.getColumnNumber() + ": " + f.message(); | ||
}).collect(Collectors.joining(StringUtils.NEW_LINE, "Found " + failures.size() + " problem(s)\n", StringUtils.EMPTY)); | ||
} | ||
|
||
@Override | ||
public RestStatus status() { | ||
return RestStatus.BAD_REQUEST; | ||
} | ||
} |
106 changes: 106 additions & 0 deletions
106
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/analysis/Verifier.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,106 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License; | ||
* you may not use this file except in compliance with the Elastic License. | ||
*/ | ||
|
||
package org.elasticsearch.xpack.eql.analysis; | ||
|
||
import org.elasticsearch.xpack.ql.capabilities.Unresolvable; | ||
import org.elasticsearch.xpack.ql.expression.Attribute; | ||
import org.elasticsearch.xpack.ql.expression.UnresolvedAttribute; | ||
import org.elasticsearch.xpack.ql.plan.logical.LogicalPlan; | ||
import org.elasticsearch.xpack.ql.tree.Node; | ||
import org.elasticsearch.xpack.ql.type.DataTypes; | ||
import org.elasticsearch.xpack.ql.util.StringUtils; | ||
|
||
import java.util.ArrayList; | ||
import java.util.Collection; | ||
import java.util.LinkedHashSet; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Set; | ||
|
||
import static java.util.stream.Collectors.toMap; | ||
import static org.elasticsearch.xpack.eql.analysis.Failure.fail; | ||
|
||
/** | ||
* The verifier has the role of checking the analyzed tree for failures and build a list of failures following this check. | ||
* It is created in the plan executor along with the metrics instance passed as constructor parameter. | ||
*/ | ||
public class Verifier { | ||
|
||
public Map<Node<?>, String> verifyFailures(LogicalPlan plan) { | ||
Collection<Failure> failures = verify(plan); | ||
return failures.stream().collect(toMap(Failure::node, Failure::message)); | ||
} | ||
|
||
Collection<Failure> verify(LogicalPlan plan) { | ||
Set<Failure> failures = new LinkedHashSet<>(); | ||
|
||
// start bottom-up | ||
plan.forEachUp(p -> { | ||
if (p.analyzed()) { | ||
return; | ||
} | ||
|
||
// if the children are unresolved, so will this node; counting it will only add noise | ||
if (p.childrenResolved() == false) { | ||
return; | ||
} | ||
|
||
Set<Failure> localFailures = new LinkedHashSet<>(); | ||
|
||
if (p instanceof Unresolvable) { | ||
localFailures.add(fail(p, ((Unresolvable) p).unresolvedMessage())); | ||
} else { | ||
p.forEachExpressions(e -> { | ||
// everything is fine, skip expression | ||
if (e.resolved()) { | ||
return; | ||
} | ||
|
||
e.forEachUp(ae -> { | ||
// we're only interested in the children | ||
if (ae.childrenResolved() == false) { | ||
return; | ||
} | ||
if (ae instanceof Unresolvable) { | ||
// handle Attributes differently to provide more context | ||
if (ae instanceof UnresolvedAttribute) { | ||
UnresolvedAttribute ua = (UnresolvedAttribute) ae; | ||
// only work out the synonyms for raw unresolved attributes | ||
if (ua.customMessage() == false) { | ||
boolean useQualifier = ua.qualifier() != null; | ||
List<String> potentialMatches = new ArrayList<>(); | ||
for (Attribute a : p.inputSet()) { | ||
String nameCandidate = useQualifier ? a.qualifiedName() : a.name(); | ||
// add only primitives (object types would only result in another error) | ||
if (DataTypes.isUnsupported(a.dataType()) == false && DataTypes.isPrimitive(a.dataType())) { | ||
potentialMatches.add(nameCandidate); | ||
} | ||
} | ||
|
||
List<String> matches = StringUtils.findSimilar(ua.qualifiedName(), potentialMatches); | ||
if (matches.isEmpty() == false) { | ||
ae = ua.withUnresolvedMessage(UnresolvedAttribute.errorMessage(ua.qualifiedName(), matches)); | ||
} | ||
} | ||
} | ||
|
||
localFailures.add(fail(ae, ((Unresolvable) ae).unresolvedMessage())); | ||
return; | ||
} | ||
// type resolution | ||
if (ae.typeResolved().unresolved()) { | ||
localFailures.add(fail(ae, ae.typeResolved().message())); | ||
} | ||
|
||
}); | ||
}); | ||
} | ||
}); | ||
|
||
return failures; | ||
} | ||
} |
59 changes: 59 additions & 0 deletions
59
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/PlanExecutor.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,59 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License; | ||
* you may not use this file except in compliance with the Elastic License. | ||
*/ | ||
|
||
package org.elasticsearch.xpack.eql.execution; | ||
|
||
import org.elasticsearch.action.ActionListener; | ||
import org.elasticsearch.client.Client; | ||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry; | ||
import org.elasticsearch.xpack.eql.analysis.Analyzer; | ||
import org.elasticsearch.xpack.eql.analysis.PreAnalyzer; | ||
import org.elasticsearch.xpack.eql.analysis.Verifier; | ||
import org.elasticsearch.xpack.eql.optimizer.Optimizer; | ||
import org.elasticsearch.xpack.eql.planner.Planner; | ||
import org.elasticsearch.xpack.eql.session.Configuration; | ||
import org.elasticsearch.xpack.eql.session.EqlSession; | ||
import org.elasticsearch.xpack.eql.session.Results; | ||
import org.elasticsearch.xpack.ql.expression.function.FunctionRegistry; | ||
import org.elasticsearch.xpack.ql.index.IndexResolver; | ||
|
||
import java.util.List; | ||
|
||
import static org.elasticsearch.action.ActionListener.wrap; | ||
|
||
public class PlanExecutor { | ||
private final Client client; | ||
private final NamedWriteableRegistry writableRegistry; | ||
|
||
private final IndexResolver indexResolver; | ||
private final FunctionRegistry functionRegistry; | ||
|
||
private final PreAnalyzer preAnalyzer; | ||
private final Analyzer analyzer; | ||
private final Optimizer optimizer; | ||
private final Planner planner; | ||
|
||
public PlanExecutor(Client client, IndexResolver indexResolver, NamedWriteableRegistry writeableRegistry) { | ||
this.client = client; | ||
this.writableRegistry = writeableRegistry; | ||
|
||
this.indexResolver = indexResolver; | ||
this.functionRegistry = null; | ||
|
||
this.preAnalyzer = new PreAnalyzer(); | ||
this.analyzer = new Analyzer(functionRegistry, new Verifier()); | ||
this.optimizer = new Optimizer(); | ||
this.planner = new Planner(); | ||
} | ||
|
||
private EqlSession newSession(Configuration cfg) { | ||
return new EqlSession(client, cfg, indexResolver, preAnalyzer, analyzer, optimizer, planner, this); | ||
} | ||
|
||
public void eql(Configuration cfg, String eql, List<Object> params, ActionListener<Results> listener) { | ||
newSession(cfg).eql(eql, params, wrap(listener::onResponse, listener::onFailure)); | ||
} | ||
} |
24 changes: 24 additions & 0 deletions
24
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/optimizer/Optimizer.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License; | ||
* you may not use this file except in compliance with the Elastic License. | ||
*/ | ||
|
||
package org.elasticsearch.xpack.eql.optimizer; | ||
|
||
import org.elasticsearch.xpack.ql.plan.logical.LogicalPlan; | ||
import org.elasticsearch.xpack.ql.rule.RuleExecutor; | ||
|
||
import static java.util.Collections.emptyList; | ||
|
||
public class Optimizer extends RuleExecutor<LogicalPlan> { | ||
|
||
public LogicalPlan optimize(LogicalPlan verified) { | ||
return verified.optimized() ? verified : execute(verified); | ||
} | ||
|
||
@Override | ||
protected Iterable<RuleExecutor<LogicalPlan>.Batch> batches() { | ||
return emptyList(); | ||
} | ||
} |
31 changes: 31 additions & 0 deletions
31
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plan/physical/PhysicalPlan.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License; | ||
* you may not use this file except in compliance with the Elastic License. | ||
*/ | ||
package org.elasticsearch.xpack.eql.plan.physical; | ||
|
||
import org.elasticsearch.xpack.eql.session.Executable; | ||
import org.elasticsearch.xpack.ql.plan.QueryPlan; | ||
import org.elasticsearch.xpack.ql.tree.Source; | ||
|
||
import java.util.List; | ||
|
||
/** | ||
* A PhysicalPlan is "how" a LogicalPlan (the "what") actually gets translated into one or more queries. | ||
* | ||
* LogicalPlan = I want to get from DEN to SFO | ||
* PhysicalPlan = take Delta, DEN to SJC, then SJC to SFO | ||
*/ | ||
public abstract class PhysicalPlan extends QueryPlan<PhysicalPlan> implements Executable { | ||
|
||
public PhysicalPlan(Source source, List<PhysicalPlan> children) { | ||
super(source, children); | ||
} | ||
|
||
@Override | ||
public abstract int hashCode(); | ||
|
||
@Override | ||
public abstract boolean equals(Object obj); | ||
} |
Oops, something went wrong.