Add shingle size, total model size, and model's hash ring to profile API (opendistro-for-elasticsearch#113)

The hash ring helps identify that node X runs the AD job for detector Y, with its models on nodes 1, 2, and 3; this helps on-calls locate logs. Total model size gives transparency into current memory usage. In addition, shingle size helps answer the question "why does my detector not report anything?"

This PR adds the above information to the profile API via a broadcast call that consults ModelManager and FeatureManager about the current state pertaining to a detector. These states are then consolidated into information humans can parse.

This PR also queries all AD result indices instead of only the current result index, so that we can fetch a stopped detector's error even after the result index containing the error has been rotated.

Testing done:
1. Added unit tests for the newly added code.
2. Ran end-to-end testing to verify the new profiles make sense both while a detector is running and after it stops.
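
For context, here is a minimal sketch (not part of this commit) of how the consolidated profile gets assembled once the broadcast response comes back; the DetectorProfile class and its setters appear in the diff below, while the concrete values are invented for illustration:

import com.amazon.opendistroforelasticsearch.ad.model.DetectorProfile;

public class ProfileAssemblySketch {
    public static void main(String[] args) {
        // Hypothetical values; in the plugin they come from ProfileResponse after the broadcast call.
        DetectorProfile profile = new DetectorProfile();
        profile.setCoordinatingNode("node-1");      // node running the AD job for this detector
        profile.setShingleSize(8);                  // shingle size reported by FeatureManager
        profile.setTotalSizeInBytes(4_194_304L);    // combined RCF model memory in bytes
        // Serialized via toXContent, these surface as the coordinating_node, shingle_size,
        // and total_size_in_bytes fields of the profile API response.
    }
}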
kaituo committed May 19, 2020
1 parent 339c1c4 commit 829a9b6
Showing 24 changed files with 1,488 additions and 66 deletions.
@@ -38,7 +38,6 @@
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector;
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetectorJob;
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyResult;
import com.amazon.opendistroforelasticsearch.ad.model.ProfileName;
import com.amazon.opendistroforelasticsearch.ad.rest.RestAnomalyDetectorJobAction;
import com.amazon.opendistroforelasticsearch.ad.rest.RestDeleteAnomalyDetectorAction;
import com.amazon.opendistroforelasticsearch.ad.rest.RestExecuteAnomalyDetectorAction;
@@ -67,6 +66,8 @@
import com.amazon.opendistroforelasticsearch.ad.transport.DeleteDetectorTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.DeleteModelAction;
import com.amazon.opendistroforelasticsearch.ad.transport.DeleteModelTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ProfileAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ProfileTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.RCFResultAction;
import com.amazon.opendistroforelasticsearch.ad.transport.RCFResultTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.StopDetectorAction;
@@ -187,8 +188,8 @@ public List<RestHandler> getRestHandlers(
jobRunner.setAnomalyResultHandler(anomalyResultHandler);
jobRunner.setSettings(settings);

AnomalyDetectorProfileRunner profileRunner = new AnomalyDetectorProfileRunner(client, this.xContentRegistry);
RestGetAnomalyDetectorAction restGetAnomalyDetectorAction = new RestGetAnomalyDetectorAction(profileRunner, ProfileName.getNames());
AnomalyDetectorProfileRunner profileRunner = new AnomalyDetectorProfileRunner(client, this.xContentRegistry, this.nodeFilter);
RestGetAnomalyDetectorAction restGetAnomalyDetectorAction = new RestGetAnomalyDetectorAction(profileRunner);
RestIndexAnomalyDetectorAction restIndexAnomalyDetectorAction = new RestIndexAnomalyDetectorAction(
settings,
clusterService,
@@ -433,7 +434,8 @@ public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
new ActionHandler<>(ThresholdResultAction.INSTANCE, ThresholdResultTransportAction.class),
new ActionHandler<>(AnomalyResultAction.INSTANCE, AnomalyResultTransportAction.class),
new ActionHandler<>(CronAction.INSTANCE, CronTransportAction.class),
new ActionHandler<>(ADStatsNodesAction.INSTANCE, ADStatsNodesTransportAction.class)
new ActionHandler<>(ADStatsNodesAction.INSTANCE, ADStatsNodesTransportAction.class),
new ActionHandler<>(ProfileAction.INSTANCE, ProfileTransportAction.class)
);
}

@@ -30,6 +30,7 @@
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParseException;
@@ -44,23 +45,30 @@
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortOrder;

import com.amazon.opendistroforelasticsearch.ad.indices.AnomalyDetectionIndices;
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetectorJob;
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyResult;
import com.amazon.opendistroforelasticsearch.ad.model.DetectorProfile;
import com.amazon.opendistroforelasticsearch.ad.model.DetectorState;
import com.amazon.opendistroforelasticsearch.ad.model.ProfileName;
import com.amazon.opendistroforelasticsearch.ad.transport.ProfileAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ProfileRequest;
import com.amazon.opendistroforelasticsearch.ad.transport.ProfileResponse;
import com.amazon.opendistroforelasticsearch.ad.util.DiscoveryNodeFilterer;
import com.amazon.opendistroforelasticsearch.ad.util.MultiResponsesDelegateActionListener;

public class AnomalyDetectorProfileRunner {
private final Logger logger = LogManager.getLogger(AnomalyDetectorProfileRunner.class);
private Client client;
private NamedXContentRegistry xContentRegistry;
private DiscoveryNodeFilterer nodeFilter;
static String FAIL_TO_FIND_DETECTOR_MSG = "Fail to find detector with id: ";
static String FAIL_TO_GET_PROFILE_MSG = "Fail to get profile for detector ";

public AnomalyDetectorProfileRunner(Client client, NamedXContentRegistry xContentRegistry) {
public AnomalyDetectorProfileRunner(Client client, NamedXContentRegistry xContentRegistry, DiscoveryNodeFilterer nodeFilter) {
this.client = client;
this.xContentRegistry = xContentRegistry;
this.nodeFilter = nodeFilter;
}

public void profile(String detectorId, ActionListener<DetectorProfile> listener, Set<ProfileName> profiles) {
@@ -70,9 +78,28 @@ public void profile(String detectorId, ActionListener<DetectorProfile> listener,
return;
}

// Total number of listeners we expect. MultiResponsesDelegateActionListener uses this count
// to decide when to consolidate the partial results and return them to the user.
int totalListener = 0;

if (profiles.contains(ProfileName.STATE)) {
totalListener++;
}

if (profiles.contains(ProfileName.ERROR)) {
totalListener++;
}

if (profiles.contains(ProfileName.COORDINATING_NODE)
|| profiles.contains(ProfileName.SHINGLE_SIZE)
|| profiles.contains(ProfileName.TOTAL_SIZE_IN_BYTES)
|| profiles.contains(ProfileName.MODELS)) {
totalListener++;
}

MultiResponsesDelegateActionListener<DetectorProfile> delegateListener = new MultiResponsesDelegateActionListener<DetectorProfile>(
listener,
profiles.size(),
totalListener,
"Fail to fetch profile for " + detectorId
);

@@ -102,6 +129,13 @@ private void prepareProfile(
if (profiles.contains(ProfileName.ERROR)) {
profileError(detectorId, enabledTimeMs, listener);
}

if (profiles.contains(ProfileName.COORDINATING_NODE)
|| profiles.contains(ProfileName.SHINGLE_SIZE)
|| profiles.contains(ProfileName.TOTAL_SIZE_IN_BYTES)
|| profiles.contains(ProfileName.MODELS)) {
profileModels(detectorId, profiles, listener);
}
} catch (IOException | XContentParseException | NullPointerException e) {
logger.error(e);
listener.failImmediately(FAIL_TO_GET_PROFILE_MSG, e);
@@ -280,8 +314,42 @@ private SearchRequest createLatestAnomalyResultRequest(String detectorId, long e

SearchSourceBuilder source = new SearchSourceBuilder().query(filterQuery).size(1).sort(sortQuery);

SearchRequest request = new SearchRequest(AnomalyResult.ANOMALY_RESULT_INDEX);
SearchRequest request = new SearchRequest(AnomalyDetectionIndices.ALL_AD_RESULTS_INDEX_PATTERN);
request.source(source);
return request;
}

private void profileModels(
String detectorId,
Set<ProfileName> profiles,
MultiResponsesDelegateActionListener<DetectorProfile> listener
) {
DiscoveryNode[] dataNodes = nodeFilter.getEligibleDataNodes();
ProfileRequest profileRequest = new ProfileRequest(detectorId, profiles, dataNodes);
client.execute(ProfileAction.INSTANCE, profileRequest, onModelResponse(detectorId, profiles, listener));
}

private ActionListener<ProfileResponse> onModelResponse(
String detectorId,
Set<ProfileName> profiles,
MultiResponsesDelegateActionListener<DetectorProfile> listener
) {
return ActionListener.wrap(profileResponse -> {
DetectorProfile profile = new DetectorProfile();
if (profiles.contains(ProfileName.COORDINATING_NODE)) {
profile.setCoordinatingNode(profileResponse.getCoordinatingNode());
}
if (profiles.contains(ProfileName.SHINGLE_SIZE)) {
profile.setShingleSize(profileResponse.getShingleSize());
}
if (profiles.contains(ProfileName.TOTAL_SIZE_IN_BYTES)) {
profile.setTotalSizeInBytes(profileResponse.getTotalSizeInBytes());
}
if (profiles.contains(ProfileName.MODELS)) {
profile.setModelProfile(profileResponse.getModelProfile());
}

listener.onResponse(profile);
}, listener::onFailure);
}
}
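
As an aside, here is a simplified sketch of the consolidation pattern the runner relies on; it is an assumption about the general shape of MultiResponsesDelegateActionListener, not the plugin's actual implementation. Partial DetectorProfile responses are merged through the Mergeable interface, and the combined profile is forwarded once the expected number of responses has arrived.

import java.util.concurrent.atomic.AtomicInteger;

import org.elasticsearch.action.ActionListener;

import com.amazon.opendistroforelasticsearch.ad.model.DetectorProfile;

// Simplified stand-in: each partial profile is merged into an accumulator, and the wrapped
// listener fires only after `expected` responses have been received.
public class MergingProfileListener implements ActionListener<DetectorProfile> {
    private final ActionListener<DetectorProfile> delegate;
    private final AtomicInteger remaining;
    private final DetectorProfile accumulated = new DetectorProfile();

    public MergingProfileListener(ActionListener<DetectorProfile> delegate, int expected) {
        this.delegate = delegate;
        this.remaining = new AtomicInteger(expected);
    }

    @Override
    public synchronized void onResponse(DetectorProfile partial) {
        accumulated.merge(partial); // DetectorProfile implements Mergeable
        if (remaining.decrementAndGet() == 0) {
            delegate.onResponse(accumulated);
        }
    }

    @Override
    public void onFailure(Exception e) {
        delegate.onFailure(e);
    }
}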
@@ -44,4 +44,14 @@ public class CommonName {

// box type
public static final String BOX_TYPE_KEY = "box_type";

// ======================================
// Profile name
// ======================================
public static final String STATE = "state";
public static final String ERROR = "error";
public static final String COORDINATING_NODE = "coordinating_node";
public static final String SHINGLE_SIZE = "shingle_size";
public static final String TOTAL_SIZE_IN_BYTES = "total_size_in_bytes";
public static final String MODELS = "models";
}
@@ -536,4 +536,13 @@ private double[][] transpose(double[][] matrix) {
private long truncateToMinute(long epochMillis) {
return Instant.ofEpochMilli(epochMillis).truncatedTo(ChronoUnit.MINUTES).toEpochMilli();
}

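/**
 * Get the number of data points currently held in a detector's shingle on this node.
 *
 * @param detectorId detector id
 * @return the current shingle size, or -1 if this node has no shingle for the detector
 */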
public int getShingleSize(String detectorId) {
Deque<Entry<Long, double[]>> shingle = detectorIdsToTimeShingles.get(detectorId);
if (shingle != null) {
return shingle.size();
} else {
return -1;
}
}
}
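
Following on from getShingleSize above, a hedged illustration of how the reported value answers "why does my detector not report anything?"; this is an interpretation, not code from this commit, and the configured size of 8 is only an assumed default:

public class ShingleDiagnosisSketch {
    // A shingle smaller than the configured length means too few consecutive data points
    // have been collected for the detector to produce results.
    static String diagnose(int reportedShingleSize, int configuredShingleSize) {
        if (reportedShingleSize < 0) {
            return "no shingle on this node yet";
        }
        if (reportedShingleSize < configuredShingleSize) {
            return "shingle still filling up: " + reportedShingleSize + "/" + configuredShingleSize;
        }
        return "shingle complete";
    }

    public static void main(String[] args) {
        System.out.println(diagnose(-1, 8)); // detector never ran on this node
        System.out.println(diagnose(3, 8));  // detector running but not enough data yet
        System.out.println(diagnose(8, 8));  // enough data to report results
    }
}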
@@ -23,6 +23,7 @@
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Arrays;
import java.util.Iterator;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@@ -1014,4 +1015,24 @@ private double computeRcfConfidence(RandomCutForest forest) {
return Math.max(0, confidence); // Replaces -0 with 0 for cosmetic purposes.
}
}

/**
* Get the size of each RCF partition belonging to a detector. Thresholding models are reported with a constant size since they are small (on the order of KBs).
* @param detectorId detector id
* @return a map of model id to its memory size
*/
public Map<String, Long> getModelSize(String detectorId) {
Map<String, Long> res = new HashMap<>();
forests
.entrySet()
.stream()
.filter(entry -> getDetectorIdForModelId(entry.getKey()).equals(detectorId))
.forEach(entry -> { res.put(entry.getKey(), estimateModelSize(entry.getValue().getModel())); });
thresholds
.entrySet()
.stream()
.filter(entry -> getDetectorIdForModelId(entry.getKey()).equals(detectorId))
.forEach(entry -> { res.put(entry.getKey(), 0L); });
return res;
}
}
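
A small hedged sketch of how the per-model sizes returned by getModelSize could roll up into the total_size_in_bytes value surfaced by the profile; the aggregation site is an assumption, and the model ids and sizes are invented:

import java.util.HashMap;
import java.util.Map;

public class ModelSizeRollupSketch {
    // Sum per-model byte sizes into a single detector-level total.
    static long totalSizeInBytes(Map<String, Long> modelSizes) {
        long total = 0L;
        for (long size : modelSizes.values()) {
            total += size; // RCF partition sizes in bytes; thresholding models contribute 0
        }
        return total;
    }

    public static void main(String[] args) {
        Map<String, Long> sizes = new HashMap<>();
        sizes.put("detectorY_model_rcf_0", 2_097_152L);   // hypothetical model ids and sizes
        sizes.put("detectorY_model_rcf_1", 2_097_152L);
        sizes.put("detectorY_model_threshold", 0L);
        System.out.println(totalSizeInBytes(sizes));      // prints 4194304
    }
}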
@@ -24,27 +24,56 @@
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;

import com.amazon.opendistroforelasticsearch.ad.constant.CommonName;

public class DetectorProfile implements ToXContentObject, Mergeable {
private DetectorState state;
private String error;

private static final String STATE_FIELD = "state";
private static final String ERROR_FIELD = "error";
private ModelProfile[] modelProfile;
private int shingleSize;
private String coordinatingNode;
private long totalSizeInBytes;

public XContentBuilder toXContent(XContentBuilder builder) throws IOException {
return toXContent(builder, ToXContent.EMPTY_PARAMS);
}

public DetectorProfile() {
state = null;
error = null;
modelProfile = null;
shingleSize = -1;
coordinatingNode = null;
totalSizeInBytes = -1;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
XContentBuilder xContentBuilder = builder.startObject();

if (state != null) {
xContentBuilder.field(STATE_FIELD, state);
xContentBuilder.field(CommonName.STATE, state);
}
if (error != null) {
xContentBuilder.field(ERROR_FIELD, error);
xContentBuilder.field(CommonName.ERROR, error);
}
if (modelProfile != null && modelProfile.length > 0) {
xContentBuilder.startArray(CommonName.MODELS);
for (ModelProfile profile : modelProfile) {
profile.toXContent(xContentBuilder, params);
}
xContentBuilder.endArray();
}
if (shingleSize != -1) {
xContentBuilder.field(CommonName.SHINGLE_SIZE, shingleSize);
}
if (coordinatingNode != null) {
xContentBuilder.field(CommonName.COORDINATING_NODE, coordinatingNode);
}
if (totalSizeInBytes != -1) {
xContentBuilder.field(CommonName.TOTAL_SIZE_IN_BYTES, totalSizeInBytes);
}

return xContentBuilder.endObject();
}

@@ -64,6 +93,38 @@ public void setError(String error) {
this.error = error;
}

public ModelProfile[] getModelProfile() {
return modelProfile;
}

public void setModelProfile(ModelProfile[] modelProfile) {
this.modelProfile = modelProfile;
}

public int getShingleSize() {
return shingleSize;
}

public void setShingleSize(int shingleSize) {
this.shingleSize = shingleSize;
}

public String getCoordinatingNode() {
return coordinatingNode;
}

public void setCoordinatingNode(String coordinatingNode) {
this.coordinatingNode = coordinatingNode;
}

public long getTotalSizeInBytes() {
return totalSizeInBytes;
}

public void setTotalSizeInBytes(long totalSizeInBytes) {
this.totalSizeInBytes = totalSizeInBytes;
}

@Override
public void merge(Mergeable other) {
if (this == other || other == null || getClass() != other.getClass()) {