EQL: Hook engine to Elasticsearch (#52828)
Add query execution and return the actual results from Elasticsearch
inside the tests

(cherry picked from commit 3e03928)
costin committed Feb 27, 2020
1 parent 69b78f7 commit 40bc06f
Showing 14 changed files with 396 additions and 61 deletions.
@@ -19,6 +19,8 @@

package org.elasticsearch.client;

import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.elasticsearch.client.eql.EqlSearchRequest;
import org.elasticsearch.client.eql.EqlSearchResponse;
import org.junit.Before;
@@ -33,9 +35,23 @@ public void setupRemoteClusterConfig() throws Exception {
}

public void testBasicSearch() throws Exception {

Request doc1 = new Request(HttpPut.METHOD_NAME, "/index/_doc/1");
doc1.setJsonEntity("{\"event_subtype_full\": \"already_running\", " +
"\"event_type\": \"process\", " +
"\"event_type_full\": \"process_event\", " +
"\"opcode\": 3," +
"\"pid\": 0," +
"\"process_name\": \"System Idle Process\"," +
"\"serial_event_id\": 1," +
"\"subtype\": \"create\"," +
"\"timestamp\": 116444736000000000," +
"\"unique_pid\": 1}");
client().performRequest(doc1);
client().performRequest(new Request(HttpPost.METHOD_NAME, "/_refresh"));

EqlClient eql = highLevelClient().eql();
// TODO: Add real checks when end-to-end basic functionality is implemented
EqlSearchRequest request = new EqlSearchRequest("test", "test");
EqlSearchRequest request = new EqlSearchRequest("index", "process where true");
EqlSearchResponse response = execute(request, eql::search, eql::searchAsync);
assertNotNull(response);
assertFalse(response.isTimeout());
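For completeness, the same search can be issued outside the test harness with the high-level REST client. This is a minimal sketch, assuming a cluster on localhost:9200 with the document above already indexed; the eql() accessor, the EqlSearchRequest constructor and the search(request, options) call are the ones exercised by the test, while the client setup around them is illustrative.

import org.apache.http.HttpHost;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.eql.EqlSearchRequest;
import org.elasticsearch.client.eql.EqlSearchResponse;

public class EqlHighLevelClientExample {
    public static void main(String[] args) throws Exception {
        // Assumed local cluster; adjust host and port as needed.
        try (RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(new HttpHost("localhost", 9200, "http")))) {

            // Same index and EQL rule as the updated test above.
            EqlSearchRequest request = new EqlSearchRequest("index", "process where true");
            EqlSearchResponse response = client.eql().search(request, RequestOptions.DEFAULT);

            System.out.println("timed out: " + response.isTimeout());
        }
    }
}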
1 change: 1 addition & 0 deletions docs/reference/eql/search.asciidoc
@@ -34,6 +34,7 @@ specified in the `rule` parameter. The EQL query matches events with an
----
GET sec_logs/_eql/search
{
"event_type_field": "event.category",
"rule": """
process where process.name == "cmd.exe"
"""
@@ -7,13 +7,11 @@ setup:
- index:
_index: eql_test
_id: 1
- str: test1
int: 1
- event_type: process
user: SYSTEM

---
# Testing round-trip and the basic shape of the response
# Currently not implemented or wired and always returns empty result.
# TODO: define more test once everything is wired up
"Execute some EQL.":
- do:
eql.search:
@@ -22,6 +20,7 @@
rule: "process where user = 'SYSTEM'"

- match: {timed_out: false}
- match: {took: 0}
- match: {hits.total.value: 1}
- match: {hits.total.relation: "eq"}
- match: {hits.events.0._source.user: "SYSTEM"}
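
The REST call the YAML test makes can also be issued directly with the low-level client used earlier in this commit. A rough sketch, with the index name and rule copied from the test and the endpoint taken from the docs change above; the surrounding setup is illustrative:

import org.apache.http.HttpHost;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;

public class EqlLowLevelClientExample {
    public static void main(String[] args) throws Exception {
        try (RestClient client = RestClient.builder(new HttpHost("localhost", 9200, "http")).build()) {
            // GET <index>/_eql/search with the rule in the request body.
            Request search = new Request("GET", "/eql_test/_eql/search");
            search.setJsonEntity("{\"rule\": \"process where user = 'SYSTEM'\"}");
            Response response = client.performRequest(search);
            System.out.println(response.getStatusLine());
        }
    }
}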

@@ -0,0 +1,96 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.eql.execution.search;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.eql.querydsl.container.QueryContainer;
import org.elasticsearch.xpack.eql.session.Configuration;
import org.elasticsearch.xpack.eql.session.EqlSession;
import org.elasticsearch.xpack.eql.session.Results;
import org.elasticsearch.xpack.ql.expression.Attribute;
import org.elasticsearch.xpack.ql.index.IndexResolver;
import org.elasticsearch.xpack.ql.util.StringUtils;

import java.util.Collections;
import java.util.List;

public class Querier {

private static final Logger log = LogManager.getLogger(Querier.class);

private final Configuration cfg;
private final Client client;
private final TimeValue keepAlive;
private final QueryBuilder filter;


public Querier(EqlSession eqlSession) {
this.cfg = eqlSession.configuration();
this.client = eqlSession.client();
this.keepAlive = cfg.requestTimeout();
this.filter = cfg.filter();
}


public void query(List<Attribute> output, QueryContainer container, String index, ActionListener<Results> listener) {
// prepare the request
SearchSourceBuilder sourceBuilder = SourceGenerator.sourceBuilder(container, filter, cfg.size());

// set query timeout
sourceBuilder.timeout(cfg.requestTimeout());

if (log.isTraceEnabled()) {
log.trace("About to execute query {} on {}", StringUtils.toString(sourceBuilder), index);
}

SearchRequest search = prepareRequest(client, sourceBuilder, cfg.requestTimeout(), false,
Strings.commaDelimitedListToStringArray(index));

ActionListener<SearchResponse> l = new SearchAfterListener(listener, client, cfg, output, container, search);

client.search(search, l);
}

public static SearchRequest prepareRequest(Client client, SearchSourceBuilder source, TimeValue timeout, boolean includeFrozen,
String... indices) {
return client.prepareSearch(indices)
// always track total hits accurately
.setTrackTotalHits(true)
.setAllowPartialSearchResults(false)
.setSource(source)
.setTimeout(timeout)
.setIndicesOptions(
includeFrozen ? IndexResolver.FIELD_CAPS_FROZEN_INDICES_OPTIONS : IndexResolver.FIELD_CAPS_INDICES_OPTIONS)
.request();
}

protected static void logSearchResponse(SearchResponse response, Logger logger) {
List<Aggregation> aggs = Collections.emptyList();
if (response.getAggregations() != null) {
aggs = response.getAggregations().asList();
}
StringBuilder aggsNames = new StringBuilder();
for (int i = 0; i < aggs.size(); i++) {
aggsNames.append(aggs.get(i).getName() + (i + 1 == aggs.size() ? "" : ", "));
}

logger.trace("Got search response [hits {} {}, {} aggregations: [{}], {} failed shards, {} skipped shards, "
+ "{} successful shards, {} total shards, took {}, timed out [{}]]", response.getHits().getTotalHits().relation.toString(),
response.getHits().getTotalHits().value, aggs.size(), aggsNames, response.getFailedShards(), response.getSkippedShards(),
response.getSuccessfulShards(), response.getTotalShards(), response.getTook(), response.isTimedOut());
}
}
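To make the request building above concrete, here is a small sketch of what Querier.prepareRequest is fed and what it produces. Only the prepareRequest signature and the builder calls come from this commit; the query, timeout and index name are made-up stand-ins, and client is assumed to be an existing Client instance.

// Stand-in for the SearchSourceBuilder that SourceGenerator derives from the query container.
SearchSourceBuilder source = new SearchSourceBuilder()
    .query(QueryBuilders.termQuery("event_type", "process"))
    .timeout(TimeValue.timeValueSeconds(30));

// The resulting request tracks total hits accurately, disallows partial results and,
// with includeFrozen == false, uses the non-frozen indices options.
SearchRequest request = Querier.prepareRequest(client, source, TimeValue.timeValueSeconds(30), false, "eql_test");

client.search(request, ActionListener.wrap(
    response -> { /* in the real flow this is SearchAfterListener.onResponse */ },
    failure -> { /* ... and SearchAfterListener.onFailure */ }));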
@@ -0,0 +1,128 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.eql.execution.search;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.xpack.eql.EqlIllegalArgumentException;
import org.elasticsearch.xpack.eql.execution.search.extractor.FieldHitExtractor;
import org.elasticsearch.xpack.eql.querydsl.container.ComputedRef;
import org.elasticsearch.xpack.eql.querydsl.container.QueryContainer;
import org.elasticsearch.xpack.eql.querydsl.container.SearchHitFieldRef;
import org.elasticsearch.xpack.eql.session.Configuration;
import org.elasticsearch.xpack.eql.session.Results;
import org.elasticsearch.xpack.ql.execution.search.FieldExtraction;
import org.elasticsearch.xpack.ql.execution.search.extractor.ComputingExtractor;
import org.elasticsearch.xpack.ql.execution.search.extractor.HitExtractor;
import org.elasticsearch.xpack.ql.expression.Attribute;
import org.elasticsearch.xpack.ql.expression.gen.pipeline.HitExtractorInput;
import org.elasticsearch.xpack.ql.expression.gen.pipeline.Pipe;
import org.elasticsearch.xpack.ql.expression.gen.pipeline.ReferenceInput;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

class SearchAfterListener implements ActionListener<SearchResponse> {

private static final Logger log = LogManager.getLogger(SearchAfterListener.class);

private final ActionListener<Results> listener;

private final Client client;
private final Configuration cfg;
private final List<Attribute> output;
private final QueryContainer container;
private final SearchRequest request;

SearchAfterListener(ActionListener<Results> listener, Client client, Configuration cfg, List<Attribute> output,
QueryContainer container, SearchRequest request) {

this.listener = listener;

this.client = client;
this.cfg = cfg;
this.output = output;
this.container = container;
this.request = request;
}

@Override
public void onResponse(SearchResponse response) {
try {
ShardSearchFailure[] failures = response.getShardFailures();
if (CollectionUtils.isEmpty(failures) == false) {
listener.onFailure(new EqlIllegalArgumentException(failures[0].reason(), failures[0].getCause()));
} else {
handleResponse(response, listener);
}
} catch (Exception ex) {
listener.onFailure(ex);
}
}

private void handleResponse(SearchResponse response, ActionListener<Results> listener) {
// create response extractors for the first time
List<Tuple<FieldExtraction, String>> refs = container.fields();

List<HitExtractor> exts = new ArrayList<>(refs.size());
for (Tuple<FieldExtraction, String> ref : refs) {
exts.add(createExtractor(ref.v1()));
}

if (log.isTraceEnabled()) {
Querier.logSearchResponse(response, log);
}

List<?> results = Arrays.asList(response.getHits().getHits());
listener.onResponse(new Results(response.getHits().getTotalHits(), response.getTook(), response.isTimedOut(), results));
}

private HitExtractor createExtractor(FieldExtraction ref) {
if (ref instanceof SearchHitFieldRef) {
SearchHitFieldRef f = (SearchHitFieldRef) ref;
return new FieldHitExtractor(f.name(), f.fullFieldName(), f.getDataType(), cfg.zoneId(), f.useDocValue(), f.hitName(), false);
}

if (ref instanceof ComputedRef) {
Pipe proc = ((ComputedRef) ref).processor();
// collect hitNames
Set<String> hitNames = new LinkedHashSet<>();
proc = proc.transformDown(l -> {
HitExtractor he = createExtractor(l.context());
hitNames.add(he.hitName());

if (hitNames.size() > 1) {
throw new EqlIllegalArgumentException("Multi-level nested fields [{}] not supported yet", hitNames);
}

return new HitExtractorInput(l.source(), l.expression(), he);
}, ReferenceInput.class);
String hitName = null;
if (hitNames.size() == 1) {
hitName = hitNames.iterator().next();
}
return new ComputingExtractor(proc.asProcessor(), hitName);
}

throw new EqlIllegalArgumentException("Unexpected value reference {}", ref.getClass());
}

@Override
public void onFailure(Exception ex) {
listener.onFailure(ex);
}
}
@@ -50,6 +50,15 @@ public static SearchSourceBuilder sourceBuilder(QueryContainer container, QueryB
sortBuilder.build(source);
optimize(sortBuilder, source);

// set fetch size
if (size != null) {
int sz = size;

if (source.size() == -1) {
source.size(sz);
}
}

return source;
}

@@ -0,0 +1,62 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.eql.execution.search.extractor;

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.xpack.ql.execution.search.extractor.AbstractFieldHitExtractor;
import org.elasticsearch.xpack.ql.type.DataType;
import org.elasticsearch.xpack.ql.util.DateUtils;

import java.io.IOException;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.List;

import static org.elasticsearch.xpack.ql.type.DataTypes.DATETIME;

public class FieldHitExtractor extends AbstractFieldHitExtractor {

static final String NAME = "f";

public FieldHitExtractor(StreamInput in) throws IOException {
super(in);
}

public FieldHitExtractor(String name, String fullFieldName, DataType dataType, ZoneId zoneId, boolean useDocValue, String hitName,
boolean arrayLeniency) {
super(name, fullFieldName, dataType, zoneId, useDocValue, hitName, arrayLeniency);
}

@Override
public String getWriteableName() {
return NAME;
}

@Override
protected ZoneId readZoneId(StreamInput in) throws IOException {
return DateUtils.UTC;
}

@Override
protected Object unwrapCustomValue(Object values) {
DataType dataType = dataType();

if (dataType == DATETIME) {
if (values instanceof String) {
return ZonedDateTime.ofInstant(Instant.ofEpochMilli(Long.parseLong(values.toString())), zoneId());
}
}

return null;
}

@Override
protected boolean isPrimitive(List<?> list) {
return false;
}
}
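The unwrapCustomValue override above turns an epoch-millisecond string returned for a date field into a ZonedDateTime in the extractor's zone. In isolation the conversion is just this (the sample value is arbitrary and the zone is hard-wired to UTC for the sketch):

// 1582761600000 ms since the epoch is 2020-02-27T00:00:00Z (sample value only).
String values = "1582761600000";
ZonedDateTime dt = ZonedDateTime.ofInstant(
    Instant.ofEpochMilli(Long.parseLong(values)), ZoneOffset.UTC);
System.out.println(dt); // 2020-02-27T00:00Z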
@@ -6,6 +6,7 @@
package org.elasticsearch.xpack.eql.plan.physical;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.xpack.eql.execution.search.Querier;
import org.elasticsearch.xpack.eql.querydsl.container.QueryContainer;
import org.elasticsearch.xpack.eql.session.EqlSession;
import org.elasticsearch.xpack.eql.session.Results;
@@ -49,7 +50,7 @@ public List<Attribute> output() {

@Override
public void execute(EqlSession session, ActionListener<Results> listener) {
throw new UnsupportedOperationException();
new Querier(session).query(output, queryContainer, index, listener);
}

@Override