Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EQL: Introduce basic execution pipeline #51809

Merged
merged 3 commits into from
Feb 3, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.eql.analysis;

import org.elasticsearch.xpack.ql.expression.function.FunctionRegistry;
import org.elasticsearch.xpack.ql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.ql.rule.RuleExecutor;

import java.util.Collection;

import static java.util.Arrays.asList;

/**
 * Analyzer for EQL logical plans. It runs the rule batches returned by {@link #batches()} over a plan
 * and then passes the outcome to the {@link Verifier}.
 */
public class Analyzer extends RuleExecutor<LogicalPlan> {

    private final FunctionRegistry functionRegistry;
    private final Verifier verifier;

    /**
     * @param functionRegistry registry kept on the analyzer; no code in this class reads it
     * @param verifier         verifier consulted by {@link #analyze(LogicalPlan)} after the rules have run
     */
    public Analyzer(FunctionRegistry functionRegistry, Verifier verifier) {
        this.functionRegistry = functionRegistry;
        this.verifier = verifier;
    }

    /**
     * Supplies the batches to execute: a single batch named "Resolution" that is created without rules.
     */
    @Override
    protected Iterable<RuleExecutor<LogicalPlan>.Batch> batches() {
        return asList(new Batch("Resolution"));
    }

    /**
     * Executes the rule batches on the given plan and verifies the result.
     *
     * @param plan the plan to analyze
     * @return the executed plan, when the verifier reports no failures
     * @throws VerificationException when the verifier reports at least one failure
     */
    public LogicalPlan analyze(LogicalPlan plan) {
        LogicalPlan executed = execute(plan);
        return verify(executed);
    }

    /**
     * Hands the plan to the verifier and turns a non-empty failure collection into an exception.
     */
    private LogicalPlan verify(LogicalPlan plan) {
        Collection<Failure> problems = verifier.verify(plan);
        if (problems.isEmpty()) {
            return plan;
        }
        throw new VerificationException(problems);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.eql.analysis;

import org.elasticsearch.xpack.ql.tree.Node;

import java.util.Objects;

import static org.elasticsearch.common.logging.LoggerMessageFormat.format;

class Failure {

// Tree node this failure was reported against.
private final Node<?> node;
// Text describing what is wrong with the node.
private final String message;

/**
 * Creates a failure that ties a message to a tree node.
 *
 * @param node    the node the failure refers to
 * @param message the description of the failure
 */
Failure(Node<?> node, String message) {
    this.message = message;
    this.node = node;
}

/**
 * @return the node this failure was created with
 */
Node<?> node() {
    return this.node;
}

/**
 * @return the message this failure was created with
 */
String message() {
    return this.message;
}

/**
 * Hashes the node together with the message, matching the fields compared by {@link #equals(Object)}.
 */
@Override
public int hashCode() {
    return Objects.hash(node, message);
}

/**
 * Two failures are equal when they have the same class, an equal node and an equal message.
 * The message takes part in the comparison so that different problems reported against equal nodes
 * stay distinct when failures are stored in a set.
 */
@Override
public boolean equals(Object obj) {
    if (this == obj) {
        return true;
    }

    if (obj == null || getClass() != obj.getClass()) {
        return false;
    }

    Failure other = (Failure) obj;
    return Objects.equals(node, other.node) && Objects.equals(message, other.message);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

include message to hashCode and equals impl?


/**
 * @return the failure message, without any information about the node
 */
@Override
public String toString() {
    return this.message;
}

/**
 * Factory that formats the message template with the given arguments before building the failure.
 *
 * @param source  the node the failure refers to
 * @param message the message template handed to {@code format}
 * @param args    the arguments handed to {@code format} along with the template
 * @return a new failure carrying the formatted message
 */
static Failure fail(Node<?> source, String message, Object... args) {
    String formatted = format(message, args);
    return new Failure(source, formatted);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.eql.analysis;

import org.elasticsearch.xpack.ql.index.IndexResolution;
import org.elasticsearch.xpack.ql.plan.logical.EsRelation;
import org.elasticsearch.xpack.ql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.ql.plan.logical.UnresolvedRelation;

public class PreAnalyzer {

public LogicalPlan preAnalyze(LogicalPlan plan, IndexResolution indices) {
if (plan.analyzed() == false) {
// FIXME: includeFrozen needs to be set already
plan = plan.transformUp(r -> new EsRelation(r.source(), indices.get(), false), UnresolvedRelation.class);
plan.forEachUp(LogicalPlan::setPreAnalyzed);
}
return plan;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.eql.analysis;

import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xpack.eql.EqlClientException;
import org.elasticsearch.xpack.ql.tree.Location;
import org.elasticsearch.xpack.ql.util.StringUtils;

import java.util.Collection;
import java.util.stream.Collectors;

/**
 * Exception raised when verification finds problems; its message lists every failure
 * and it is reported with {@link RestStatus#BAD_REQUEST}.
 */
public class VerificationException extends EqlClientException {

    /**
     * @param sources the failures to describe in the exception message
     */
    protected VerificationException(Collection<Failure> sources) {
        super(asMessage(sources));
    }

    /**
     * Builds the message: a header with the number of failures followed by one entry per failure,
     * with entries separated by {@link StringUtils#NEW_LINE}.
     */
    private static String asMessage(Collection<Failure> failures) {
        String header = "Found " + failures.size() + " problem(s)\n";
        return failures.stream()
            .map(VerificationException::describe)
            .collect(Collectors.joining(StringUtils.NEW_LINE, header, StringUtils.EMPTY));
    }

    /**
     * Renders one failure as its line and column, taken from the node's source location, followed by its message.
     */
    private static String describe(Failure failure) {
        Location location = failure.node().source().source();
        return "line " + location.getLineNumber() + ":" + location.getColumnNumber() + ": " + failure.message();
    }

    @Override
    public RestStatus status() {
        return RestStatus.BAD_REQUEST;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.eql.analysis;

import org.elasticsearch.xpack.ql.capabilities.Unresolvable;
import org.elasticsearch.xpack.ql.expression.Attribute;
import org.elasticsearch.xpack.ql.expression.UnresolvedAttribute;
import org.elasticsearch.xpack.ql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.ql.tree.Node;
import org.elasticsearch.xpack.ql.type.DataTypes;
import org.elasticsearch.xpack.ql.util.StringUtils;

import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static java.util.stream.Collectors.toMap;
import static org.elasticsearch.xpack.eql.analysis.Failure.fail;

/**
* The verifier checks the analyzed tree for failures and builds a list of the failures found by this check.
* It is created in the plan executor; the metrics instance is expected to be passed to it as a constructor parameter.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are assuming that metrics will be passed in the constructor in the same way as SQL, right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct as after the analyzer the type of query becomes clear.

*/
public class Verifier {

/**
 * Verifies the plan and returns the failure messages keyed by the node they were reported on.
 *
 * @param plan the plan to verify
 * @return a map from node to failure message; when several failures share an equal node, the message of the
 *         first one encountered is kept
 */
public Map<Node<?>, String> verifyFailures(LogicalPlan plan) {
    Collection<Failure> failures = verify(plan);
    // the merge function keeps the first message per node; without it toMap throws IllegalStateException
    // as soon as two failures map to an equal node
    return failures.stream().collect(toMap(Failure::node, Failure::message, (first, second) -> first));
}

/**
 * Walks the plan bottom-up and collects the failures found on its nodes and their expressions.
 * <p>
 * Nodes that are already analyzed, or whose children are not resolved, are skipped. For every other node:
 * <ul>
 * <li>an {@link Unresolvable} plan node yields one failure carrying its unresolved message;</li>
 * <li>otherwise each unresolved expression of the node is walked bottom-up, and every expression node whose
 * children are resolved yields a failure when it is {@link Unresolvable} or when its type resolution is
 * unresolved.</li>
 * </ul>
 * For an {@link UnresolvedAttribute} without a custom message, names similar to the attribute are looked up among
 * the node's input attributes (supported primitive types only) and, when any are found, used to build the message.
 *
 * @param plan the plan to check
 * @return the failures in the order they were found; empty when nothing is wrong
 */
Collection<Failure> verify(LogicalPlan plan) {
    Set<Failure> failures = new LinkedHashSet<>();

    // start bottom-up
    plan.forEachUp(p -> {
        if (p.analyzed()) {
            return;
        }

        // if the children are unresolved, so will this node; counting it will only add noise
        if (p.childrenResolved() == false) {
            return;
        }

        Set<Failure> localFailures = new LinkedHashSet<>();

        if (p instanceof Unresolvable) {
            localFailures.add(fail(p, ((Unresolvable) p).unresolvedMessage()));
        } else {
            p.forEachExpressions(e -> {
                // everything is fine, skip expression
                if (e.resolved()) {
                    return;
                }

                e.forEachUp(ae -> {
                    // we're only interested in the children
                    if (ae.childrenResolved() == false) {
                        return;
                    }
                    if (ae instanceof Unresolvable) {
                        // handle Attributes differently to provide more context
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

different -> differently

                        if (ae instanceof UnresolvedAttribute) {
                            UnresolvedAttribute ua = (UnresolvedAttribute) ae;
                            // only work out the synonyms for raw unresolved attributes
                            if (ua.customMessage() == false) {
                                boolean useQualifier = ua.qualifier() != null;
                                List<String> potentialMatches = new ArrayList<>();
                                for (Attribute a : p.inputSet()) {
                                    String nameCandidate = useQualifier ? a.qualifiedName() : a.name();
                                    // add only primitives (object types would only result in another error)
                                    if (DataTypes.isUnsupported(a.dataType()) == false && DataTypes.isPrimitive(a.dataType())) {
                                        potentialMatches.add(nameCandidate);
                                    }
                                }

                                List<String> matches = StringUtils.findSimilar(ua.qualifiedName(), potentialMatches);
                                if (matches.isEmpty() == false) {
                                    // swap in a copy of the attribute whose message lists the similar names
                                    ae = ua.withUnresolvedMessage(UnresolvedAttribute.errorMessage(ua.qualifiedName(), matches));
                                }
                            }
                        }

                        localFailures.add(fail(ae, ((Unresolvable) ae).unresolvedMessage()));
                        return;
                    }
                    // type resolution
                    if (ae.typeResolved().unresolved()) {
                        localFailures.add(fail(ae, ae.typeResolved().message()));
                    }

                });
            });
        }

        // publish what was found on this node; without this step the method always returned an empty set
        failures.addAll(localFailures);
    });
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: a bit deeply nested imho, not sure what are the current guidelines

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are no limits on nesting - it's really a matter of styles. Due to the use of lambdas, some bits tend to be nested and improving that by extracting the code into methods doesn't always work especially when using a lot of variables...


    return failures;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.eql.execution;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.xpack.eql.analysis.Analyzer;
import org.elasticsearch.xpack.eql.analysis.PreAnalyzer;
import org.elasticsearch.xpack.eql.analysis.Verifier;
import org.elasticsearch.xpack.eql.optimizer.Optimizer;
import org.elasticsearch.xpack.eql.planner.Planner;
import org.elasticsearch.xpack.eql.session.Configuration;
import org.elasticsearch.xpack.eql.session.EqlSession;
import org.elasticsearch.xpack.eql.session.Results;
import org.elasticsearch.xpack.ql.expression.function.FunctionRegistry;
import org.elasticsearch.xpack.ql.index.IndexResolver;

import java.util.List;

import static org.elasticsearch.action.ActionListener.wrap;

public class PlanExecutor {
// Client handed to every EqlSession created by this executor.
private final Client client;
// Registry received by the constructor; it is stored here but not read anywhere else in this class.
private final NamedWriteableRegistry writableRegistry;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

writableRegistry -> writeableRegistry

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm on the fence here - the correct word is writable not writeable; not sure which way to go...


// Index resolver handed to every EqlSession created by this executor.
private final IndexResolver indexResolver;
// Function registry given to the Analyzer; the constructor assigns null to it.
private final FunctionRegistry functionRegistry;

// Pipeline stages: each is created once in the constructor and passed to every session.
private final PreAnalyzer preAnalyzer;
private final Analyzer analyzer;
private final Optimizer optimizer;
private final Planner planner;

public PlanExecutor(Client client, IndexResolver indexResolver, NamedWriteableRegistry writeableRegistry) {
this.client = client;
this.writableRegistry = writeableRegistry;

this.indexResolver = indexResolver;
this.functionRegistry = null;

this.preAnalyzer = new PreAnalyzer();
this.analyzer = new Analyzer(functionRegistry, new Verifier());
this.optimizer = new Optimizer();
this.planner = new Planner();
}

/**
 * Creates a session for the given configuration, wired with this executor's client, index resolver
 * and pipeline stages, and with the executor itself.
 */
private EqlSession newSession(Configuration cfg) {
    EqlSession session = new EqlSession(client, cfg, indexResolver, preAnalyzer, analyzer, optimizer, planner, this);
    return session;
}

/**
 * Runs an EQL query in a fresh session and forwards the outcome to the given listener.
 *
 * @param cfg      configuration used to create the session
 * @param eql      the query text passed to the session
 * @param params   the query parameters passed to the session
 * @param listener receives the response or the failure through a wrapping listener
 */
public void eql(Configuration cfg, String eql, List<Object> params, ActionListener<Results> listener) {
    EqlSession session = newSession(cfg);
    session.eql(eql, params, wrap(listener::onResponse, listener::onFailure));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.eql.optimizer;

import org.elasticsearch.xpack.ql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.ql.rule.RuleExecutor;

import static java.util.Collections.emptyList;

/**
 * Optimizer for EQL logical plans. It currently defines no rule batches.
 */
public class Optimizer extends RuleExecutor<LogicalPlan> {

    /**
     * @return an empty list, i.e. there are no batches to execute
     */
    @Override
    protected Iterable<RuleExecutor<LogicalPlan>.Batch> batches() {
        return emptyList();
    }

    /**
     * Runs the rule batches on the plan unless the plan reports itself as already optimized.
     *
     * @param verified the plan to optimize
     * @return the given plan when it is already optimized, otherwise the result of executing the batches on it
     */
    public LogicalPlan optimize(LogicalPlan verified) {
        if (verified.optimized()) {
            return verified;
        }
        return execute(verified);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.eql.plan.physical;

import org.elasticsearch.xpack.eql.session.Executable;
import org.elasticsearch.xpack.ql.plan.QueryPlan;
import org.elasticsearch.xpack.ql.tree.Source;

import java.util.List;

/**
 * Base class for physical plans. Where a LogicalPlan states "what" is wanted, a PhysicalPlan states "how"
 * it is carried out, that is how the logical plan gets translated into one or more queries.
 *
 * As a travel analogy:
 * LogicalPlan = I want to get from DEN to SFO
 * PhysicalPlan = take Delta, DEN to SJC, then SJC to SFO
 */
public abstract class PhysicalPlan extends QueryPlan<PhysicalPlan> implements Executable {

    /**
     * @param source   the source passed on to the parent constructor
     * @param children the child plans passed on to the parent constructor
     */
    public PhysicalPlan(Source source, List<PhysicalPlan> children) {
        super(source, children);
    }

    // equals and hashCode are redeclared abstract so that each concrete plan has to provide its own

    @Override
    public abstract boolean equals(Object obj);

    @Override
    public abstract int hashCode();
}
Loading