diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/analysis/Analyzer.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/analysis/Analyzer.java new file mode 100644 index 0000000000000..eb741da145e30 --- /dev/null +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/analysis/Analyzer.java @@ -0,0 +1,45 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.eql.analysis; + +import org.elasticsearch.xpack.ql.expression.function.FunctionRegistry; +import org.elasticsearch.xpack.ql.plan.logical.LogicalPlan; +import org.elasticsearch.xpack.ql.rule.RuleExecutor; + +import java.util.Collection; + +import static java.util.Arrays.asList; + +public class Analyzer extends RuleExecutor<LogicalPlan> { + + private final FunctionRegistry functionRegistry; + private final Verifier verifier; + + public Analyzer(FunctionRegistry functionRegistry, Verifier verifier) { + this.functionRegistry = functionRegistry; + this.verifier = verifier; + } + + @Override + protected Iterable<RuleExecutor<LogicalPlan>.Batch> batches() { + Batch resolution = new Batch("Resolution"); + + return asList(resolution); + } + + public LogicalPlan analyze(LogicalPlan plan) { + return verify(execute(plan)); + } + + private LogicalPlan verify(LogicalPlan plan) { + Collection<Failure> failures = verifier.verify(plan); + if (!failures.isEmpty()) { + throw new VerificationException(failures); + } + return plan; + } +} diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/analysis/Failure.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/analysis/Failure.java new file mode 100644 index 0000000000000..d4dadb5be64cd --- /dev/null +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/analysis/Failure.java @@ -0,0 +1,60 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.eql.analysis; + +import org.elasticsearch.xpack.ql.tree.Node; + +import java.util.Objects; + +import static org.elasticsearch.common.logging.LoggerMessageFormat.format; + +class Failure { + + private final Node<?> node; + private final String message; + + Failure(Node<?> node, String message) { + this.node = node; + this.message = message; + } + + Node<?> node() { + return node; + } + + String message() { + return message; + } + + @Override + public int hashCode() { + return Objects.hash(message, node); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + + if (obj == null || getClass() != obj.getClass()) { + return false; + } + + Failure other = (Failure) obj; + return Objects.equals(message, other.message) && Objects.equals(node, other.node); + } + + @Override + public String toString() { + return message; + } + + static Failure fail(Node<?> source, String message, Object... args) { + return new Failure(source, format(message, args)); + } +} diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/analysis/PreAnalyzer.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/analysis/PreAnalyzer.java new file mode 100644 index 0000000000000..adb05a1be0c74 --- /dev/null +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/analysis/PreAnalyzer.java @@ -0,0 +1,24 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.eql.analysis; + +import org.elasticsearch.xpack.ql.index.IndexResolution; +import org.elasticsearch.xpack.ql.plan.logical.EsRelation; +import org.elasticsearch.xpack.ql.plan.logical.LogicalPlan; +import org.elasticsearch.xpack.ql.plan.logical.UnresolvedRelation; + +public class PreAnalyzer { + + public LogicalPlan preAnalyze(LogicalPlan plan, IndexResolution indices) { + if (plan.analyzed() == false) { + // FIXME: includeFrozen needs to be set already + plan = plan.transformUp(r -> new EsRelation(r.source(), indices.get(), false), UnresolvedRelation.class); + plan.forEachUp(LogicalPlan::setPreAnalyzed); + } + return plan; + } +} diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/analysis/VerificationException.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/analysis/VerificationException.java new file mode 100644 index 0000000000000..ac7800db056be --- /dev/null +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/analysis/VerificationException.java @@ -0,0 +1,33 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.eql.analysis; + +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.xpack.eql.EqlClientException; +import org.elasticsearch.xpack.ql.tree.Location; +import org.elasticsearch.xpack.ql.util.StringUtils; + +import java.util.Collection; +import java.util.stream.Collectors; + +public class VerificationException extends EqlClientException { + + protected VerificationException(Collection<Failure> sources) { + super(asMessage(sources)); + } + + private static String asMessage(Collection<Failure> failures) { + return failures.stream().map(f -> { + Location l = f.node().source().source(); + return "line " + l.getLineNumber() + ":" + l.getColumnNumber() + ": " + f.message(); + }).collect(Collectors.joining(StringUtils.NEW_LINE, "Found " + failures.size() + " problem(s)\n", StringUtils.EMPTY)); + } + + @Override + public RestStatus status() { + return RestStatus.BAD_REQUEST; + } +} diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/analysis/Verifier.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/analysis/Verifier.java new file mode 100644 index 0000000000000..51dfb6a3d971c --- /dev/null +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/analysis/Verifier.java @@ -0,0 +1,106 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.eql.analysis; + +import org.elasticsearch.xpack.ql.capabilities.Unresolvable; +import org.elasticsearch.xpack.ql.expression.Attribute; +import org.elasticsearch.xpack.ql.expression.UnresolvedAttribute; +import org.elasticsearch.xpack.ql.plan.logical.LogicalPlan; +import org.elasticsearch.xpack.ql.tree.Node; +import org.elasticsearch.xpack.ql.type.DataTypes; +import org.elasticsearch.xpack.ql.util.StringUtils; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static java.util.stream.Collectors.toMap; +import static org.elasticsearch.xpack.eql.analysis.Failure.fail; + +/** + * The verifier has the role of checking the analyzed tree for failures and build a list of failures following this check. + * It is created in the plan executor along with the metrics instance passed as constructor parameter. + */ +public class Verifier { + + public Map<Node<?>, String> verifyFailures(LogicalPlan plan) { + Collection<Failure> failures = verify(plan); + return failures.stream().collect(toMap(Failure::node, Failure::message)); + } + + Collection<Failure> verify(LogicalPlan plan) { + Set<Failure> failures = new LinkedHashSet<>(); + + // start bottom-up + plan.forEachUp(p -> { + if (p.analyzed()) { + return; + } + + // if the children are unresolved, so will this node; counting it will only add noise + if (p.childrenResolved() == false) { + return; + } + + Set<Failure> localFailures = new LinkedHashSet<>(); + + if (p instanceof Unresolvable) { + localFailures.add(fail(p, ((Unresolvable) p).unresolvedMessage())); + } else { + p.forEachExpressions(e -> { + // everything is fine, skip expression + if (e.resolved()) { + return; + } + + e.forEachUp(ae -> { + // we're only interested in the children + if (ae.childrenResolved() == false) { + return; + } + if (ae instanceof Unresolvable) { + // handle Attributes differently to provide more context + if (ae instanceof UnresolvedAttribute) { + UnresolvedAttribute ua = (UnresolvedAttribute) ae; + // only work out the synonyms for raw unresolved attributes + if (ua.customMessage() == false) { + boolean useQualifier = ua.qualifier() != null; + List<String> potentialMatches = new ArrayList<>(); + for (Attribute a : p.inputSet()) { + String nameCandidate = useQualifier ? a.qualifiedName() : a.name(); + // add only primitives (object types would only result in another error) + if (DataTypes.isUnsupported(a.dataType()) == false && DataTypes.isPrimitive(a.dataType())) { + potentialMatches.add(nameCandidate); + } + } + + List<String> matches = StringUtils.findSimilar(ua.qualifiedName(), potentialMatches); + if (matches.isEmpty() == false) { + ae = ua.withUnresolvedMessage(UnresolvedAttribute.errorMessage(ua.qualifiedName(), matches)); + } + } + } + + localFailures.add(fail(ae, ((Unresolvable) ae).unresolvedMessage())); + return; + } + // type resolution + if (ae.typeResolved().unresolved()) { + localFailures.add(fail(ae, ae.typeResolved().message())); + } + + }); + }); + } + }); + + return failures; + } +} diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/PlanExecutor.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/PlanExecutor.java new file mode 100644 index 0000000000000..39658b3acf226 --- /dev/null +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/execution/PlanExecutor.java @@ -0,0 +1,59 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.eql.execution; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.xpack.eql.analysis.Analyzer; +import org.elasticsearch.xpack.eql.analysis.PreAnalyzer; +import org.elasticsearch.xpack.eql.analysis.Verifier; +import org.elasticsearch.xpack.eql.optimizer.Optimizer; +import org.elasticsearch.xpack.eql.planner.Planner; +import org.elasticsearch.xpack.eql.session.Configuration; +import org.elasticsearch.xpack.eql.session.EqlSession; +import org.elasticsearch.xpack.eql.session.Results; +import org.elasticsearch.xpack.ql.expression.function.FunctionRegistry; +import org.elasticsearch.xpack.ql.index.IndexResolver; + +import java.util.List; + +import static org.elasticsearch.action.ActionListener.wrap; + +public class PlanExecutor { + private final Client client; + private final NamedWriteableRegistry writableRegistry; + + private final IndexResolver indexResolver; + private final FunctionRegistry functionRegistry; + + private final PreAnalyzer preAnalyzer; + private final Analyzer analyzer; + private final Optimizer optimizer; + private final Planner planner; + + public PlanExecutor(Client client, IndexResolver indexResolver, NamedWriteableRegistry writeableRegistry) { + this.client = client; + this.writableRegistry = writeableRegistry; + + this.indexResolver = indexResolver; + this.functionRegistry = null; + + this.preAnalyzer = new PreAnalyzer(); + this.analyzer = new Analyzer(functionRegistry, new Verifier()); + this.optimizer = new Optimizer(); + this.planner = new Planner(); + } + + private EqlSession newSession(Configuration cfg) { + return new EqlSession(client, cfg, indexResolver, preAnalyzer, analyzer, optimizer, planner, this); + } + + public void eql(Configuration cfg, String eql, List<Object> params, ActionListener<Results> listener) { + newSession(cfg).eql(eql, params, wrap(listener::onResponse, listener::onFailure)); + } +} diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/optimizer/Optimizer.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/optimizer/Optimizer.java new file mode 100644 index 0000000000000..c9d5986b5ec81 --- /dev/null +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/optimizer/Optimizer.java @@ -0,0 +1,24 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.eql.optimizer; + +import org.elasticsearch.xpack.ql.plan.logical.LogicalPlan; +import org.elasticsearch.xpack.ql.rule.RuleExecutor; + +import static java.util.Collections.emptyList; + +public class Optimizer extends RuleExecutor<LogicalPlan> { + + public LogicalPlan optimize(LogicalPlan verified) { + return verified.optimized() ? verified : execute(verified); + } + + @Override + protected Iterable<RuleExecutor<LogicalPlan>.Batch> batches() { + return emptyList(); + } +} diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plan/physical/PhysicalPlan.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plan/physical/PhysicalPlan.java new file mode 100644 index 0000000000000..be4f1ef97ca3b --- /dev/null +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plan/physical/PhysicalPlan.java @@ -0,0 +1,31 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.eql.plan.physical; + +import org.elasticsearch.xpack.eql.session.Executable; +import org.elasticsearch.xpack.ql.plan.QueryPlan; +import org.elasticsearch.xpack.ql.tree.Source; + +import java.util.List; + +/** + * A PhysicalPlan is "how" a LogicalPlan (the "what") actually gets translated into one or more queries. + * + * LogicalPlan = I want to get from DEN to SFO + * PhysicalPlan = take Delta, DEN to SJC, then SJC to SFO + */ +public abstract class PhysicalPlan extends QueryPlan<PhysicalPlan> implements Executable { + + public PhysicalPlan(Source source, List<PhysicalPlan> children) { + super(source, children); + } + + @Override + public abstract int hashCode(); + + @Override + public abstract boolean equals(Object obj); +} diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/planner/Planner.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/planner/Planner.java new file mode 100644 index 0000000000000..0eb373d68096a --- /dev/null +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/planner/Planner.java @@ -0,0 +1,17 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.eql.planner; + +import org.elasticsearch.xpack.eql.plan.physical.PhysicalPlan; +import org.elasticsearch.xpack.ql.plan.logical.LogicalPlan; + +public class Planner { + + public PhysicalPlan plan(LogicalPlan plan) { + throw new UnsupportedOperationException(); + } +} diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/EqlPlugin.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/EqlPlugin.java index ef79d7ac21229..abed7c607acfd 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/EqlPlugin.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/EqlPlugin.java @@ -8,20 +8,33 @@ import org.elasticsearch.Build; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.IndexScopedSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsFilter; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.env.Environment; +import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.plugins.ActionPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestHandler; +import org.elasticsearch.script.ScriptService; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.watcher.ResourceWatcherService; import org.elasticsearch.xpack.eql.action.EqlSearchAction; +import org.elasticsearch.xpack.eql.execution.PlanExecutor; +import org.elasticsearch.xpack.ql.index.IndexResolver; +import org.elasticsearch.xpack.ql.type.DefaultDataTypeRegistry; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.function.Supplier; @@ -34,6 +47,20 @@ public class EqlPlugin extends Plugin implements ActionPlugin { Setting.Property.NodeScope ); + @Override + public Collection<Object> createComponents(Client client, ClusterService clusterService, ThreadPool threadPool, + ResourceWatcherService resourceWatcherService, ScriptService scriptService, NamedXContentRegistry xContentRegistry, + Environment environment, NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry) { + + return createComponents(client, clusterService.getClusterName().value(), namedWriteableRegistry); + } + + private Collection<Object> createComponents(Client client, String clusterName, NamedWriteableRegistry namedWriteableRegistry) { + IndexResolver indexResolver = new IndexResolver(client, clusterName, DefaultDataTypeRegistry.INSTANCE); + PlanExecutor planExecutor = new PlanExecutor(client, indexResolver, namedWriteableRegistry); + return Arrays.asList(planExecutor); + } + @Override public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() { @@ -60,6 +87,11 @@ boolean isSnapshot() { return Build.CURRENT.isSnapshot(); } + // TODO: this needs to be used by all plugin methods - including getActions and createComponents + private boolean isEnabled(Settings settings) { + return EQL_ENABLED_SETTING.get(settings); + } + @Override public List<RestHandler> getRestHandlers(Settings settings, RestController restController, @@ -69,10 +101,9 @@ public List<RestHandler> getRestHandlers(Settings settings, IndexNameExpressionResolver indexNameExpressionResolver, Supplier<DiscoveryNodes> nodesInCluster) { - boolean enabled = EQL_ENABLED_SETTING.get(settings); - if (!enabled) { + if (isEnabled(settings) == false) { return Collections.emptyList(); } return Arrays.asList(new RestEqlSearchAction(restController)); } -} +} \ No newline at end of file diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlSearchAction.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlSearchAction.java index b73183f500e9e..960c3c93c5a1d 100644 --- a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlSearchAction.java +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/TransportEqlSearchAction.java @@ -12,6 +12,9 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.time.DateUtils; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.search.SearchHit; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; @@ -21,7 +24,11 @@ import org.elasticsearch.xpack.eql.action.EqlSearchAction; import org.elasticsearch.xpack.eql.action.EqlSearchRequest; import org.elasticsearch.xpack.eql.action.EqlSearchResponse; +import org.elasticsearch.xpack.eql.execution.PlanExecutor; +import org.elasticsearch.xpack.eql.session.Configuration; +import org.elasticsearch.xpack.eql.session.Results; +import java.time.ZoneId; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -29,28 +36,39 @@ public class TransportEqlSearchAction extends HandledTransportAction<EqlSearchRequest, EqlSearchResponse> { private final SecurityContext securityContext; private final ClusterService clusterService; + private final PlanExecutor planExecutor; @Inject public TransportEqlSearchAction(Settings settings, ClusterService clusterService, TransportService transportService, - ThreadPool threadPool, ActionFilters actionFilters) { + ThreadPool threadPool, ActionFilters actionFilters, PlanExecutor planExecutor) { super(EqlSearchAction.NAME, transportService, actionFilters, EqlSearchRequest::new); this.securityContext = XPackSettings.SECURITY_ENABLED.get(settings) ? new SecurityContext(settings, threadPool.getThreadContext()) : null; this.clusterService = clusterService; + this.planExecutor = planExecutor; } @Override protected void doExecute(Task task, EqlSearchRequest request, ActionListener<EqlSearchResponse> listener) { - operation(request, listener); + operation(planExecutor, request, username(securityContext), clusterName(clusterService), listener); } - public static void operation(EqlSearchRequest request, ActionListener<EqlSearchResponse> listener) { - // TODO: implement parsing and querying - listener.onResponse(createResponse(request)); + public static void operation(PlanExecutor planExecutor, EqlSearchRequest request, String username, + String clusterName, ActionListener<EqlSearchResponse> listener) { + // TODO: these should be sent by the client + ZoneId zoneId = DateUtils.of("Z"); + QueryBuilder filter = request.query(); + TimeValue timeout = TimeValue.timeValueSeconds(30); + boolean includeFrozen = request.indicesOptions().ignoreThrottled() == false; + String clientId = null; + + Configuration cfg = new Configuration(request.indices(), zoneId, username, clusterName, filter, timeout, includeFrozen, clientId); + //planExecutor.eql(cfg, request.rule(), emptyList(), wrap(r -> listener.onResponse(createResponse(r)), listener::onFailure)); + listener.onResponse(createResponse(null)); } - static EqlSearchResponse createResponse(EqlSearchRequest request) { + static EqlSearchResponse createResponse(Results results) { // Stubbed search response // TODO: implement actual search response processing once the parser/executor is in place List<SearchHit> events = Arrays.asList( @@ -63,4 +81,12 @@ static EqlSearchResponse createResponse(EqlSearchRequest request) { ), null, new TotalHits(0, TotalHits.Relation.EQUAL_TO)); return new EqlSearchResponse(hits, 0, false); } -} + + static String username(SecurityContext securityContext) { + return securityContext != null && securityContext.getUser() != null ? securityContext.getUser().principal() : null; + } + + static String clusterName(ClusterService clusterService) { + return clusterService.getClusterName().value(); + } +} \ No newline at end of file diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/Configuration.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/Configuration.java new file mode 100644 index 0000000000000..c0bfbf389e0f7 --- /dev/null +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/Configuration.java @@ -0,0 +1,57 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.eql.session; + +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.query.QueryBuilder; + +import java.time.ZoneId; + +public class Configuration extends org.elasticsearch.xpack.ql.session.Configuration { + + private final String[] indices; + private final TimeValue requestTimeout; + private final String clientId; + private final boolean includeFrozenIndices; + + @Nullable + private QueryBuilder filter; + + public Configuration(String[] indices, ZoneId zi, String username, String clusterName, QueryBuilder filter, + TimeValue requestTimeout, + boolean includeFrozen, String clientId) { + + super(zi, username, clusterName); + + this.indices = indices; + this.filter = filter; + this.requestTimeout = requestTimeout; + this.clientId = clientId; + this.includeFrozenIndices = includeFrozen; + } + + public String[] indices() { + return indices; + } + + public TimeValue requestTimeout() { + return requestTimeout; + } + + public QueryBuilder filter() { + return filter; + } + + public String clientId() { + return clientId; + } + + public boolean includeFrozen() { + return includeFrozenIndices; + } +} \ No newline at end of file diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/EqlSession.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/EqlSession.java new file mode 100644 index 0000000000000..20cbd3cf98fe8 --- /dev/null +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/EqlSession.java @@ -0,0 +1,103 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.eql.session; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.Strings; +import org.elasticsearch.xpack.eql.analysis.Analyzer; +import org.elasticsearch.xpack.eql.analysis.PreAnalyzer; +import org.elasticsearch.xpack.eql.execution.PlanExecutor; +import org.elasticsearch.xpack.eql.optimizer.Optimizer; +import org.elasticsearch.xpack.eql.plan.physical.PhysicalPlan; +import org.elasticsearch.xpack.eql.planner.Planner; +import org.elasticsearch.xpack.ql.index.IndexResolver; +import org.elasticsearch.xpack.ql.plan.logical.LogicalPlan; +import org.elasticsearch.xpack.ql.util.Check; + +import java.util.List; + +import static org.elasticsearch.action.ActionListener.wrap; + +public class EqlSession { + + private final Client client; + private final Configuration configuration; + private final IndexResolver indexResolver; + + private final PreAnalyzer preAnalyzer; + private final Analyzer analyzer; + private final Optimizer optimizer; + private final Planner planner; + + public EqlSession(Client client, Configuration cfg, IndexResolver indexResolver, PreAnalyzer preAnalyzer, Analyzer analyzer, + Optimizer optimizer, Planner planner, PlanExecutor planExecutor) { + + this.client = client; + this.configuration = cfg; + this.indexResolver = indexResolver; + this.preAnalyzer = preAnalyzer; + this.analyzer = analyzer; + this.optimizer = optimizer; + this.planner = planner; + } + + public Client client() { + return client; + } + + public Optimizer optimizer() { + return optimizer; + } + + public Configuration configuration() { + return configuration; + } + + public void eql(String eql, List<Object> params, ActionListener<Results> listener) { + eqlExecutable(eql, params, wrap(e -> e.execute(this, listener), listener::onFailure)); + } + + public void eqlExecutable(String eql, List<Object> params, ActionListener<PhysicalPlan> listener) { + try { + physicalPlan(doParse(eql, params), listener); + } catch (Exception ex) { + listener.onFailure(ex); + } + } + + public void physicalPlan(LogicalPlan optimized, ActionListener<PhysicalPlan> listener) { + optimizedPlan(optimized, wrap(o -> listener.onResponse(planner.plan(o)), listener::onFailure)); + } + + public void optimizedPlan(LogicalPlan verified, ActionListener<LogicalPlan> listener) { + analyzedPlan(verified, wrap(v -> listener.onResponse(optimizer.optimize(v)), listener::onFailure)); + } + + public void analyzedPlan(LogicalPlan parsed, ActionListener<LogicalPlan> listener) { + if (parsed.analyzed()) { + listener.onResponse(parsed); + return; + } + + preAnalyze(parsed, wrap(p -> listener.onResponse(analyzer.analyze(p)), listener::onFailure)); + } + + private <T> void preAnalyze(LogicalPlan parsed, ActionListener<LogicalPlan> listener) { + String indexWildcard = Strings.arrayToCommaDelimitedString(configuration.indices()); + + indexResolver.resolveAsMergedMapping(indexWildcard, null, configuration.includeFrozen(), wrap(r -> { + listener.onResponse(preAnalyzer.preAnalyze(parsed, r)); + }, listener::onFailure)); + } + + private LogicalPlan doParse(String eql, List<Object> params) { + Check.isTrue(params.isEmpty(), "Parameters were given despite being ignored - server bug"); + //LogicalPlan plan = new EqlParser().createStatement(eql); + throw new UnsupportedOperationException(); + } +} \ No newline at end of file diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/Executable.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/Executable.java new file mode 100644 index 0000000000000..71dc188e492c4 --- /dev/null +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/Executable.java @@ -0,0 +1,19 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.eql.session; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.xpack.ql.expression.Attribute; + +import java.util.List; + +public interface Executable { + + List<Attribute> output(); + + void execute(EqlSession session, ActionListener<Results> listener); +} diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/Results.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/Results.java new file mode 100644 index 0000000000000..b0277e3b7931b --- /dev/null +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/Results.java @@ -0,0 +1,31 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.eql.session; + +import org.apache.lucene.search.TotalHits; + +import java.util.List; + +public class Results { + + private final TotalHits totalHits; + + private final List<Object> results; + + public Results(TotalHits totalHits, List<Object> results) { + this.totalHits = totalHits; + this.results = results; + } + + public TotalHits totalHits() { + return totalHits; + } + + public List<Object> results() { + return results; + } +} diff --git a/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/Sequence.java b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/Sequence.java new file mode 100644 index 0000000000000..1f6584decac9e --- /dev/null +++ b/x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/session/Sequence.java @@ -0,0 +1,25 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.eql.session; + +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.search.SearchHit; + +import java.util.List; + +public class Sequence { + + private final List<Tuple<Object, List<SearchHit>>> events; + + public Sequence(List<Tuple<Object, List<SearchHit>>> events) { + this.events = events; + } + + public List<Tuple<Object, List<SearchHit>>> events() { + return events; + } +} diff --git a/x-pack/plugin/ql/src/main/java/org/elasticsearch/xpack/ql/plan/logical/LeafPlan.java b/x-pack/plugin/ql/src/main/java/org/elasticsearch/xpack/ql/plan/logical/LeafPlan.java index 8b1ef29c8755c..a0d89c9c83526 100644 --- a/x-pack/plugin/ql/src/main/java/org/elasticsearch/xpack/ql/plan/logical/LeafPlan.java +++ b/x-pack/plugin/ql/src/main/java/org/elasticsearch/xpack/ql/plan/logical/LeafPlan.java @@ -10,7 +10,7 @@ import java.util.Collections; import java.util.List; -abstract class LeafPlan extends LogicalPlan { +public abstract class LeafPlan extends LogicalPlan { protected LeafPlan(Source source) { super(source, Collections.emptyList());