Skip to content

Commit

Permalink
EQL: Introduce basic execution pipeline (elastic#51809)
Browse files Browse the repository at this point in the history
Add main classes that form the 'execution' pipeline are added - most of
them have no functionality; the purpose of this PR is to add flesh out
the contract between the various moving parts so that work can start on
them independently.
  • Loading branch information
costin authored Feb 3, 2020
1 parent a462700 commit 9a1bae5
Show file tree
Hide file tree
Showing 17 changed files with 702 additions and 11 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.eql.analysis;

import org.elasticsearch.xpack.ql.expression.function.FunctionRegistry;
import org.elasticsearch.xpack.ql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.ql.rule.RuleExecutor;

import java.util.Collection;

import static java.util.Arrays.asList;

public class Analyzer extends RuleExecutor<LogicalPlan> {

private final FunctionRegistry functionRegistry;
private final Verifier verifier;

public Analyzer(FunctionRegistry functionRegistry, Verifier verifier) {
this.functionRegistry = functionRegistry;
this.verifier = verifier;
}

@Override
protected Iterable<RuleExecutor<LogicalPlan>.Batch> batches() {
Batch resolution = new Batch("Resolution");

return asList(resolution);
}

public LogicalPlan analyze(LogicalPlan plan) {
return verify(execute(plan));
}

private LogicalPlan verify(LogicalPlan plan) {
Collection<Failure> failures = verifier.verify(plan);
if (!failures.isEmpty()) {
throw new VerificationException(failures);
}
return plan;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.eql.analysis;

import org.elasticsearch.xpack.ql.tree.Node;

import java.util.Objects;

import static org.elasticsearch.common.logging.LoggerMessageFormat.format;

class Failure {

private final Node<?> node;
private final String message;

Failure(Node<?> node, String message) {
this.node = node;
this.message = message;
}

Node<?> node() {
return node;
}

String message() {
return message;
}

@Override
public int hashCode() {
return Objects.hash(message, node);
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}

if (obj == null || getClass() != obj.getClass()) {
return false;
}

Failure other = (Failure) obj;
return Objects.equals(message, other.message) && Objects.equals(node, other.node);
}

@Override
public String toString() {
return message;
}

static Failure fail(Node<?> source, String message, Object... args) {
return new Failure(source, format(message, args));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.eql.analysis;

import org.elasticsearch.xpack.ql.index.IndexResolution;
import org.elasticsearch.xpack.ql.plan.logical.EsRelation;
import org.elasticsearch.xpack.ql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.ql.plan.logical.UnresolvedRelation;

public class PreAnalyzer {

public LogicalPlan preAnalyze(LogicalPlan plan, IndexResolution indices) {
if (plan.analyzed() == false) {
// FIXME: includeFrozen needs to be set already
plan = plan.transformUp(r -> new EsRelation(r.source(), indices.get(), false), UnresolvedRelation.class);
plan.forEachUp(LogicalPlan::setPreAnalyzed);
}
return plan;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.eql.analysis;

import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xpack.eql.EqlClientException;
import org.elasticsearch.xpack.ql.tree.Location;
import org.elasticsearch.xpack.ql.util.StringUtils;

import java.util.Collection;
import java.util.stream.Collectors;

public class VerificationException extends EqlClientException {

protected VerificationException(Collection<Failure> sources) {
super(asMessage(sources));
}

private static String asMessage(Collection<Failure> failures) {
return failures.stream().map(f -> {
Location l = f.node().source().source();
return "line " + l.getLineNumber() + ":" + l.getColumnNumber() + ": " + f.message();
}).collect(Collectors.joining(StringUtils.NEW_LINE, "Found " + failures.size() + " problem(s)\n", StringUtils.EMPTY));
}

@Override
public RestStatus status() {
return RestStatus.BAD_REQUEST;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.eql.analysis;

import org.elasticsearch.xpack.ql.capabilities.Unresolvable;
import org.elasticsearch.xpack.ql.expression.Attribute;
import org.elasticsearch.xpack.ql.expression.UnresolvedAttribute;
import org.elasticsearch.xpack.ql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.ql.tree.Node;
import org.elasticsearch.xpack.ql.type.DataTypes;
import org.elasticsearch.xpack.ql.util.StringUtils;

import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static java.util.stream.Collectors.toMap;
import static org.elasticsearch.xpack.eql.analysis.Failure.fail;

/**
* The verifier has the role of checking the analyzed tree for failures and build a list of failures following this check.
* It is created in the plan executor along with the metrics instance passed as constructor parameter.
*/
public class Verifier {

public Map<Node<?>, String> verifyFailures(LogicalPlan plan) {
Collection<Failure> failures = verify(plan);
return failures.stream().collect(toMap(Failure::node, Failure::message));
}

Collection<Failure> verify(LogicalPlan plan) {
Set<Failure> failures = new LinkedHashSet<>();

// start bottom-up
plan.forEachUp(p -> {
if (p.analyzed()) {
return;
}

// if the children are unresolved, so will this node; counting it will only add noise
if (p.childrenResolved() == false) {
return;
}

Set<Failure> localFailures = new LinkedHashSet<>();

if (p instanceof Unresolvable) {
localFailures.add(fail(p, ((Unresolvable) p).unresolvedMessage()));
} else {
p.forEachExpressions(e -> {
// everything is fine, skip expression
if (e.resolved()) {
return;
}

e.forEachUp(ae -> {
// we're only interested in the children
if (ae.childrenResolved() == false) {
return;
}
if (ae instanceof Unresolvable) {
// handle Attributes differently to provide more context
if (ae instanceof UnresolvedAttribute) {
UnresolvedAttribute ua = (UnresolvedAttribute) ae;
// only work out the synonyms for raw unresolved attributes
if (ua.customMessage() == false) {
boolean useQualifier = ua.qualifier() != null;
List<String> potentialMatches = new ArrayList<>();
for (Attribute a : p.inputSet()) {
String nameCandidate = useQualifier ? a.qualifiedName() : a.name();
// add only primitives (object types would only result in another error)
if (DataTypes.isUnsupported(a.dataType()) == false && DataTypes.isPrimitive(a.dataType())) {
potentialMatches.add(nameCandidate);
}
}

List<String> matches = StringUtils.findSimilar(ua.qualifiedName(), potentialMatches);
if (matches.isEmpty() == false) {
ae = ua.withUnresolvedMessage(UnresolvedAttribute.errorMessage(ua.qualifiedName(), matches));
}
}
}

localFailures.add(fail(ae, ((Unresolvable) ae).unresolvedMessage()));
return;
}
// type resolution
if (ae.typeResolved().unresolved()) {
localFailures.add(fail(ae, ae.typeResolved().message()));
}

});
});
}
});

return failures;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.eql.execution;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.xpack.eql.analysis.Analyzer;
import org.elasticsearch.xpack.eql.analysis.PreAnalyzer;
import org.elasticsearch.xpack.eql.analysis.Verifier;
import org.elasticsearch.xpack.eql.optimizer.Optimizer;
import org.elasticsearch.xpack.eql.planner.Planner;
import org.elasticsearch.xpack.eql.session.Configuration;
import org.elasticsearch.xpack.eql.session.EqlSession;
import org.elasticsearch.xpack.eql.session.Results;
import org.elasticsearch.xpack.ql.expression.function.FunctionRegistry;
import org.elasticsearch.xpack.ql.index.IndexResolver;

import java.util.List;

import static org.elasticsearch.action.ActionListener.wrap;

public class PlanExecutor {
private final Client client;
private final NamedWriteableRegistry writableRegistry;

private final IndexResolver indexResolver;
private final FunctionRegistry functionRegistry;

private final PreAnalyzer preAnalyzer;
private final Analyzer analyzer;
private final Optimizer optimizer;
private final Planner planner;

public PlanExecutor(Client client, IndexResolver indexResolver, NamedWriteableRegistry writeableRegistry) {
this.client = client;
this.writableRegistry = writeableRegistry;

this.indexResolver = indexResolver;
this.functionRegistry = null;

this.preAnalyzer = new PreAnalyzer();
this.analyzer = new Analyzer(functionRegistry, new Verifier());
this.optimizer = new Optimizer();
this.planner = new Planner();
}

private EqlSession newSession(Configuration cfg) {
return new EqlSession(client, cfg, indexResolver, preAnalyzer, analyzer, optimizer, planner, this);
}

public void eql(Configuration cfg, String eql, List<Object> params, ActionListener<Results> listener) {
newSession(cfg).eql(eql, params, wrap(listener::onResponse, listener::onFailure));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.eql.optimizer;

import org.elasticsearch.xpack.ql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.ql.rule.RuleExecutor;

import static java.util.Collections.emptyList;

public class Optimizer extends RuleExecutor<LogicalPlan> {

public LogicalPlan optimize(LogicalPlan verified) {
return verified.optimized() ? verified : execute(verified);
}

@Override
protected Iterable<RuleExecutor<LogicalPlan>.Batch> batches() {
return emptyList();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.eql.plan.physical;

import org.elasticsearch.xpack.eql.session.Executable;
import org.elasticsearch.xpack.ql.plan.QueryPlan;
import org.elasticsearch.xpack.ql.tree.Source;

import java.util.List;

/**
* A PhysicalPlan is "how" a LogicalPlan (the "what") actually gets translated into one or more queries.
*
* LogicalPlan = I want to get from DEN to SFO
* PhysicalPlan = take Delta, DEN to SJC, then SJC to SFO
*/
public abstract class PhysicalPlan extends QueryPlan<PhysicalPlan> implements Executable {

public PhysicalPlan(Source source, List<PhysicalPlan> children) {
super(source, children);
}

@Override
public abstract int hashCode();

@Override
public abstract boolean equals(Object obj);
}
Loading

0 comments on commit 9a1bae5

Please sign in to comment.