Get feature data #250

Closed
SearchFeatureDao.java
@@ -15,13 +15,16 @@

package com.amazon.opendistroforelasticsearch.ad.feature;

import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.MAX_ENTITIES_PER_QUERY;
import static org.apache.commons.math3.linear.MatrixUtils.createRealMatrix;

import java.io.IOException;
import java.util.AbstractMap.SimpleEntry;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -35,20 +38,38 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
import org.elasticsearch.search.aggregations.bucket.range.InternalDateRange;
import org.elasticsearch.search.aggregations.bucket.range.InternalDateRange.Bucket;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.InternalTDigestPercentiles;
import org.elasticsearch.search.aggregations.metrics.Max;
import org.elasticsearch.search.aggregations.metrics.Min;
import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation.SingleValue;
import org.elasticsearch.search.aggregations.metrics.Percentile;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.threadpool.ThreadPool;

import com.amazon.opendistroforelasticsearch.ad.AnomalyDetectorPlugin;
import com.amazon.opendistroforelasticsearch.ad.common.exception.EndRunException;
import com.amazon.opendistroforelasticsearch.ad.constant.CommonErrorMessages;
import com.amazon.opendistroforelasticsearch.ad.dataprocessor.Interpolator;
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector;
import com.amazon.opendistroforelasticsearch.ad.model.Feature;
import com.amazon.opendistroforelasticsearch.ad.model.IntervalTimeConfiguration;
import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil;
import com.amazon.opendistroforelasticsearch.ad.util.ParseUtils;
@@ -59,6 +80,8 @@
public class SearchFeatureDao {

protected static final String AGG_NAME_MAX = "max_timefield";
protected static final String AGG_NAME_MIN = "min_timefield";
protected static final String AGG_NAME_TERM = "term_agg";

private static final Logger logger = LogManager.getLogger(SearchFeatureDao.class);

@@ -67,6 +90,8 @@ public class SearchFeatureDao {
private final NamedXContentRegistry xContent;
private final Interpolator interpolator;
private final ClientUtil clientUtil;
private ThreadPool threadPool;
private int maxEntitiesPerQuery;

/**
* Constructor injection.
@@ -75,12 +100,26 @@ public class SearchFeatureDao {
* @param xContent ES XContentRegistry
* @param interpolator interpolator for missing values
* @param clientUtil utility for ES client
* @param threadPool accessor to different thread pools
* @param settings ES settings
* @param clusterService ES ClusterService
*/
public SearchFeatureDao(Client client, NamedXContentRegistry xContent, Interpolator interpolator, ClientUtil clientUtil) {
public SearchFeatureDao(
Client client,
NamedXContentRegistry xContent,
Interpolator interpolator,
ClientUtil clientUtil,
ThreadPool threadPool,
Settings settings,
ClusterService clusterService
) {
this.client = client;
this.xContent = xContent;
this.interpolator = interpolator;
this.clientUtil = clientUtil;
this.threadPool = threadPool;
this.maxEntitiesPerQuery = MAX_ENTITIES_PER_QUERY.get(settings);
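// Keep the entity limit in sync with dynamic updates to the cluster setting.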
clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_ENTITIES_PER_QUERY, it -> maxEntitiesPerQuery = it);
}

/**
@@ -129,6 +168,47 @@ private Optional<Long> getLatestDataTime(SearchResponse searchResponse) {
.map(agg -> (long) agg.getValue());
}

/**
* Gets the entity's earliest and latest data timestamps.
* @param detector detector config
* @param entityName entity's name
* @param listener listener to return the earliest and latest timestamps, in that order
*/
public void getEntityMinMaxDataTime(
AnomalyDetector detector,
String entityName,
ActionListener<Entry<Optional<Long>, Optional<Long>>> listener
) {
TermQueryBuilder term = new TermQueryBuilder(detector.getCategoryField().get(0), entityName);
BoolQueryBuilder internalFilterQuery = QueryBuilders.boolQuery().filter(term);

SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
.query(internalFilterQuery)
.aggregation(AggregationBuilders.max(AGG_NAME_MAX).field(detector.getTimeField()))
.aggregation(AggregationBuilders.min(AGG_NAME_MIN).field(detector.getTimeField()))
.trackTotalHits(false)
.size(0);
SearchRequest searchRequest = new SearchRequest().indices(detector.getIndices().toArray(new String[0])).source(searchSourceBuilder);
client
.search(
searchRequest,
ActionListener.wrap(response -> { listener.onResponse(parseMinMaxDataTime(response)); }, listener::onFailure)
);
}

private Entry<Optional<Long>, Optional<Long>> parseMinMaxDataTime(SearchResponse searchResponse) {
Optional<Map<String, Aggregation>> mapOptional = Optional
.ofNullable(searchResponse)
.map(SearchResponse::getAggregations)
.map(aggs -> aggs.asMap());

Optional<Long> latest = mapOptional.map(map -> (Max) map.get(AGG_NAME_MAX)).map(agg -> (long) agg.getValue());

Optional<Long> earliest = mapOptional.map(map -> (Min) map.get(AGG_NAME_MIN)).map(agg -> (long) agg.getValue());

return new SimpleImmutableEntry<>(earliest, latest);
}

/**
* Gets features for the given time period.
* This function also adds the given detector to the negative cache before sending the ES request.
Expand Down Expand Up @@ -569,4 +649,137 @@ private Optional<double[]> parseAggregations(Optional<Aggregations> aggregations
)
.filter(result -> Arrays.stream(result).noneMatch(d -> Double.isNaN(d) || Double.isInfinite(d)));
}

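/**
 * Gets a feature value sample for each of the given time ranges for one entity,
 * ordered by range start time. A sample is empty when its range has no parsable data.
 * @param detector detector config
 * @param ranges time ranges to sample, as (start, end) epoch milliseconds
 * @param entityName entity's name
 * @param listener listener to return the samples, one per range
 * @throws IOException if generating the feature query fails
 */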
public void getColdStartSamplesForPeriods(
AnomalyDetector detector,
List<Entry<Long, Long>> ranges,
String entityName,
ActionListener<List<Optional<double[]>>> listener
) throws IOException {
SearchRequest request = createColdStartFeatureSearchRequest(detector, ranges, entityName);

client.search(request, ActionListener.wrap(response -> {
Aggregations aggs = response.getAggregations();
if (aggs == null) {
listener.onResponse(Collections.emptyList());
return;
}

// Extract buckets and sort them by from_as_string. The default order is currently ascending, but it is safer not to rely on that.
// Example responses from date range bucket aggregation:
// "aggregations":{"date_range":{"buckets":[{"key":"1598865166000-1598865226000","from":1.598865166E12,"
// from_as_string":"1598865166000","to":1.598865226E12,"to_as_string":"1598865226000","doc_count":3,
// "deny_max":{"value":154.0}},{"key":"1598869006000-1598869066000","from":1.598869006E12,
// "from_as_string":"1598869006000","to":1.598869066E12,"to_as_string":"1598869066000","doc_count":3,
// "deny_max":{"value":141.0}},
listener
.onResponse(
aggs
.asList()
.stream()
.filter(InternalDateRange.class::isInstance)
.flatMap(agg -> ((InternalDateRange) agg).getBuckets().stream())
.filter(bucket -> bucket.getFrom() != null)
.sorted(Comparator.comparing((Bucket bucket) -> Long.valueOf(bucket.getFromAsString())))
.map(bucket -> parseBucket(bucket, detector.getEnabledFeatureIds()))
.collect(Collectors.toList())
);
}, listener::onFailure));
}
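// A usage sketch (hypothetical caller; "detector", "train", and "handleFailure" are
// illustrative names, not part of this class):
//
// List<Entry<Long, Long>> ranges = Arrays.asList(
//     new SimpleEntry<>(1598865166000L, 1598865226000L),
//     new SimpleEntry<>(1598869006000L, 1598869066000L)
// );
// searchFeatureDao.getColdStartSamplesForPeriods(detector, ranges, "182.3.4.5",
//     ActionListener.wrap(samples -> samples.forEach(sample -> sample.ifPresent(this::train)), this::handleFailure));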

/**
* Gets features by entity. An entity is one combination of values of the
* categorical fields. A categorical field in this setting refers to an
* Elasticsearch field of type keyword or ip; for example, an entity can be
* the IP address 182.3.4.5.
* @param detector accessor to the detector object
* @param startMilli start of the time range to query, in epoch milliseconds
* @param endMilli end of the time range to query, in epoch milliseconds
* @param listener listener to return entities and their data points
*/
public void getFeaturesByEntities(
AnomalyDetector detector,
long startMilli,
long endMilli,
ActionListener<Map<String, double[]>> listener
) {
try {
RangeQueryBuilder rangeQuery = new RangeQueryBuilder(detector.getTimeField())
.gte(startMilli)
.lt(endMilli)
.format("epoch_millis");

BoolQueryBuilder internalFilterQuery = new BoolQueryBuilder().filter(detector.getFilterQuery()).filter(rangeQuery);

// Terms aggregation on the category field; only one category field is supported for now.
TermsAggregationBuilder termsAgg = AggregationBuilders
.terms(AGG_NAME_TERM)
.field(detector.getCategoryField().get(0))
.size(maxEntitiesPerQuery);
for (Feature feature : detector.getFeatureAttributes()) {
AggregatorFactories.Builder internalAgg = ParseUtils
.parseAggregators(feature.getAggregation().toString(), xContent, feature.getId());
termsAgg.subAggregation(internalAgg.getAggregatorFactories().iterator().next());
}

SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
.query(internalFilterQuery)
.size(0)
.aggregation(termsAgg)
.trackTotalHits(false);
SearchRequest searchRequest = new SearchRequest(detector.getIndices().toArray(new String[0]), searchSourceBuilder);

ActionListener<SearchResponse> termsListener = ActionListener.wrap(response -> {
Aggregations aggs = response.getAggregations();
if (aggs == null) {
listener.onResponse(Collections.emptyMap());
return;
}

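// Note: parseBucket(...).get() below assumes every entity bucket yields parsable feature values.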
Map<String, double[]> results = aggs
.asList()
.stream()
.filter(agg -> AGG_NAME_TERM.equals(agg.getName()))
.flatMap(agg -> ((Terms) agg).getBuckets().stream())
.collect(
Collectors.toMap(Terms.Bucket::getKeyAsString, bucket -> parseBucket(bucket, detector.getEnabledFeatureIds()).get())
);

listener.onResponse(results);
}, listener::onFailure);

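// Handle the response on the AD thread pool so result parsing does not block the transport thread.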
client
.search(
searchRequest,
new ThreadedActionListener<>(logger, threadPool, AnomalyDetectorPlugin.AD_THREAD_POOL_NAME, termsListener, false)
);

} catch (IOException e) {
throw new EndRunException(detector.getDetectorId(), CommonErrorMessages.INVALID_SEARCH_QUERY_MSG, e, true);
}
}
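// A usage sketch (hypothetical caller; "detector" and "processEntity" are illustrative):
//
// searchFeatureDao.getFeaturesByEntities(detector, startMilli, endMilli,
//     ActionListener.wrap(entityFeatures -> entityFeatures.forEach(this::processEntity), this::handleFailure));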

private SearchRequest createColdStartFeatureSearchRequest(AnomalyDetector detector, List<Entry<Long, Long>> ranges, String entityName) {
try {
SearchSourceBuilder searchSourceBuilder = ParseUtils.generateEntityColdStartQuery(detector, ranges, entityName, xContent);
return new SearchRequest(detector.getIndices().toArray(new String[0]), searchSourceBuilder);
} catch (IOException e) {
logger
.warn(
"Failed to create cold start feature search request for "
+ detector.getDetectorId()
+ " from "
+ ranges.get(0).getKey()
+ " to "
+ ranges.get(ranges.size() - 1).getKey(),
e
);
throw new IllegalStateException(e);
}
}

private Optional<double[]> parseBucket(MultiBucketsAggregation.Bucket bucket, List<String> featureIds) {
return parseAggregations(Optional.ofNullable(bucket).map(b -> b.getAggregations()), featureIds);
}
}
ParseUtils.java
@@ -22,6 +22,7 @@

import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -35,6 +36,7 @@
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.BaseAggregationBuilder;
@@ -44,6 +46,7 @@

import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector;
import com.amazon.opendistroforelasticsearch.ad.model.Feature;
import com.amazon.opendistroforelasticsearch.ad.model.FeatureData;

/**
* Parsing utility functions.
@@ -341,4 +344,50 @@ public static String generateInternalFeatureQueryTemplate(AnomalyDetector detect

return internalSearchSourceBuilder.toString();
}

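/**
 * Generates the query for an entity's cold start samples: the detector's filter query
 * restricted to the given entity, with one date range bucket per requested range and the
 * detector's feature aggregations nested in each bucket.
 * @param detector detector config
 * @param ranges time ranges to bucket by, as (start, end) epoch milliseconds
 * @param entityName entity's name
 * @param xContentRegistry ES XContentRegistry
 * @return the search source for the cold start feature query
 * @throws IOException if the feature aggregations cannot be parsed
 */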
public static SearchSourceBuilder generateEntityColdStartQuery(
AnomalyDetector detector,
List<Entry<Long, Long>> ranges,
String entityName,
NamedXContentRegistry xContentRegistry
) throws IOException {

TermQueryBuilder term = new TermQueryBuilder(detector.getCategoryField().get(0), entityName);
BoolQueryBuilder internalFilterQuery = QueryBuilders.boolQuery().filter(detector.getFilterQuery()).filter(term);

DateRangeAggregationBuilder dateRangeBuilder = dateRange("date_range").field(detector.getTimeField()).format("epoch_millis");
for (Entry<Long, Long> range : ranges) {
dateRangeBuilder.addRange(range.getKey(), range.getValue());
}

if (detector.getFeatureAttributes() != null) {
for (Feature feature : detector.getFeatureAttributes()) {
AggregatorFactories.Builder internalAgg = parseAggregators(
feature.getAggregation().toString(),
xContentRegistry,
feature.getId()
);
dateRangeBuilder.subAggregation(internalAgg.getAggregatorFactories().iterator().next());
}
}

return new SearchSourceBuilder().query(internalFilterQuery).size(0).aggregation(dateRangeBuilder);
}

/**
* Maps feature values to their feature IDs and names
* @param currentFeature feature values, in the order of the enabled features
* @param detector detector config object
* @return a list of feature data, each with its ID and name
*/
public static List<FeatureData> getFeatureData(double[] currentFeature, AnomalyDetector detector) {
List<String> featureIds = detector.getEnabledFeatureIds();
List<String> featureNames = detector.getEnabledFeatureNames();
int featureLen = featureIds.size();
List<FeatureData> featureData = new ArrayList<>();
for (int i = 0; i < featureLen; i++) {
featureData.add(new FeatureData(featureIds.get(i), featureNames.get(i), currentFeature[i]));
}
return featureData;
}
}