Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Forward porting profile API related PRs #128

Merged
merged 3 commits into from
May 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector;
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetectorJob;
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyResult;
import com.amazon.opendistroforelasticsearch.ad.model.ProfileName;
import com.amazon.opendistroforelasticsearch.ad.rest.RestAnomalyDetectorJobAction;
import com.amazon.opendistroforelasticsearch.ad.rest.RestDeleteAnomalyDetectorAction;
import com.amazon.opendistroforelasticsearch.ad.rest.RestExecuteAnomalyDetectorAction;
Expand Down Expand Up @@ -67,6 +66,8 @@
import com.amazon.opendistroforelasticsearch.ad.transport.DeleteDetectorTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.DeleteModelAction;
import com.amazon.opendistroforelasticsearch.ad.transport.DeleteModelTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ProfileAction;
import com.amazon.opendistroforelasticsearch.ad.transport.ProfileTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.RCFResultAction;
import com.amazon.opendistroforelasticsearch.ad.transport.RCFResultTransportAction;
import com.amazon.opendistroforelasticsearch.ad.transport.StopDetectorAction;
Expand Down Expand Up @@ -125,10 +126,12 @@
import java.security.PrivilegedAction;
import java.time.Clock;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -187,8 +190,15 @@ public List<RestHandler> getRestHandlers(
jobRunner.setAnomalyResultHandler(anomalyResultHandler);
jobRunner.setSettings(settings);

AnomalyDetectorProfileRunner profileRunner = new AnomalyDetectorProfileRunner(client, this.xContentRegistry);
RestGetAnomalyDetectorAction restGetAnomalyDetectorAction = new RestGetAnomalyDetectorAction(profileRunner, ProfileName.getNames());
AnomalyDetectorProfileRunner profileRunner = new AnomalyDetectorProfileRunner(
client,
this.xContentRegistry,
this.nodeFilter,
indexNameExpressionResolver,
clusterService,
Calendar.getInstance(TimeZone.getTimeZone("UTC"))
);
RestGetAnomalyDetectorAction restGetAnomalyDetectorAction = new RestGetAnomalyDetectorAction(profileRunner);
RestIndexAnomalyDetectorAction restIndexAnomalyDetectorAction = new RestIndexAnomalyDetectorAction(
settings,
clusterService,
Expand Down Expand Up @@ -433,7 +443,8 @@ public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
new ActionHandler<>(ThresholdResultAction.INSTANCE, ThresholdResultTransportAction.class),
new ActionHandler<>(AnomalyResultAction.INSTANCE, AnomalyResultTransportAction.class),
new ActionHandler<>(CronAction.INSTANCE, CronTransportAction.class),
new ActionHandler<>(ADStatsNodesAction.INSTANCE, ADStatsNodesTransportAction.class)
new ActionHandler<>(ADStatsNodesAction.INSTANCE, ADStatsNodesTransportAction.class),
new ActionHandler<>(ProfileAction.INSTANCE, ProfileTransportAction.class)
);
}

Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,14 @@ public class CommonName {

// box type
public static final String BOX_TYPE_KEY = "box_type";

// ======================================
// Profile name
// ======================================
public static final String STATE = "state";
public static final String ERROR = "error";
public static final String COORDINATING_NODE = "coordinating_node";
public static final String SHINGLE_SIZE = "shingle_size";
public static final String TOTAL_SIZE_IN_BYTES = "total_size_in_bytes";
public static final String MODELS = "models";
}
Original file line number Diff line number Diff line change
Expand Up @@ -536,4 +536,13 @@ private double[][] transpose(double[][] matrix) {
private long truncateToMinute(long epochMillis) {
return Instant.ofEpochMilli(epochMillis).truncatedTo(ChronoUnit.MINUTES).toEpochMilli();
}

public int getShingleSize(String detectorId) {
Deque<Entry<Long, double[]>> shingle = detectorIdsToTimeShingles.get(detectorId);
if (shingle != null) {
return shingle.size();
} else {
return -1;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Arrays;
import java.util.Iterator;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand Down Expand Up @@ -1014,4 +1015,24 @@ private double computeRcfConfidence(RandomCutForest forest) {
return Math.max(0, confidence); // Replaces -0 wth 0 for cosmetic purpose.
}
}

/**
* Get all RCF partition's size corresponding to a detector. Thresholding models' size is a constant since they are small in size (KB).
* @param detectorId detector id
* @return a map of model id to its memory size
*/
public Map<String, Long> getModelSize(String detectorId) {
Map<String, Long> res = new HashMap<>();
forests
.entrySet()
.stream()
.filter(entry -> getDetectorIdForModelId(entry.getKey()).equals(detectorId))
.forEach(entry -> { res.put(entry.getKey(), estimateModelSize(entry.getValue().getModel())); });
thresholds
.entrySet()
.stream()
.filter(entry -> getDetectorIdForModelId(entry.getKey()).equals(detectorId))
.forEach(entry -> { res.put(entry.getKey(), 0L); });
return res;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,27 +24,56 @@
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;

import com.amazon.opendistroforelasticsearch.ad.constant.CommonName;

public class DetectorProfile implements ToXContentObject, Mergeable {
private DetectorState state;
private String error;

private static final String STATE_FIELD = "state";
private static final String ERROR_FIELD = "error";
private ModelProfile[] modelProfile;
private int shingleSize;
private String coordinatingNode;
private long totalSizeInBytes;

public XContentBuilder toXContent(XContentBuilder builder) throws IOException {
return toXContent(builder, ToXContent.EMPTY_PARAMS);
}

public DetectorProfile() {
state = null;
error = null;
modelProfile = null;
shingleSize = -1;
coordinatingNode = null;
totalSizeInBytes = -1;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
XContentBuilder xContentBuilder = builder.startObject();

if (state != null) {
xContentBuilder.field(STATE_FIELD, state);
xContentBuilder.field(CommonName.STATE, state);
}
if (error != null) {
xContentBuilder.field(ERROR_FIELD, error);
xContentBuilder.field(CommonName.ERROR, error);
}
if (modelProfile != null && modelProfile.length > 0) {
xContentBuilder.startArray(CommonName.MODELS);
for (ModelProfile profile : modelProfile) {
profile.toXContent(xContentBuilder, params);
}
xContentBuilder.endArray();
}
if (shingleSize != -1) {
xContentBuilder.field(CommonName.SHINGLE_SIZE, shingleSize);
}
if (coordinatingNode != null) {
xContentBuilder.field(CommonName.COORDINATING_NODE, coordinatingNode);
}
if (totalSizeInBytes != -1) {
xContentBuilder.field(CommonName.TOTAL_SIZE_IN_BYTES, totalSizeInBytes);
}

return xContentBuilder.endObject();
}

Expand All @@ -64,6 +93,38 @@ public void setError(String error) {
this.error = error;
}

public ModelProfile[] getModelProfile() {
return modelProfile;
}

public void setModelProfile(ModelProfile[] modelProfile) {
this.modelProfile = modelProfile;
}

public int getShingleSize() {
return shingleSize;
}

public void setShingleSize(int shingleSize) {
this.shingleSize = shingleSize;
}

public String getCoordinatingNode() {
return coordinatingNode;
}

public void setCoordinatingNode(String coordinatingNode) {
this.coordinatingNode = coordinatingNode;
}

public long getTotalSizeInBytes() {
return totalSizeInBytes;
}

public void setTotalSizeInBytes(long totalSizeInBytes) {
this.totalSizeInBytes = totalSizeInBytes;
}

@Override
public void merge(Mergeable other) {
if (this == other || other == null || getClass() != other.getClass()) {
Expand All @@ -76,7 +137,18 @@ public void merge(Mergeable other) {
if (otherProfile.getError() != null) {
this.error = otherProfile.getError();
}

if (otherProfile.getCoordinatingNode() != null) {
this.coordinatingNode = otherProfile.getCoordinatingNode();
}
if (otherProfile.getShingleSize() != -1) {
this.shingleSize = otherProfile.getShingleSize();
}
if (otherProfile.getModelProfile() != null) {
this.modelProfile = otherProfile.getModelProfile();
}
if (otherProfile.getTotalSizeInBytes() != -1) {
this.totalSizeInBytes = otherProfile.getTotalSizeInBytes();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package com.amazon.opendistroforelasticsearch.ad.model;

/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

import java.io.IOException;

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;

public class ModelProfile implements Writeable, ToXContent {
// field name in toXContent
public static final String MODEL_ID = "model_id";
public static final String MODEL_SIZE_IN_BYTES = "model_size_in_bytes";
public static final String NODE_ID = "node_id";

private final String modelId;
private final long modelSizeInBytes;
private final String nodeId;

public ModelProfile(String modelId, long modelSize, String nodeId) {
super();
this.modelId = modelId;
this.modelSizeInBytes = modelSize;
this.nodeId = nodeId;
}

public ModelProfile(StreamInput in) throws IOException {
modelId = in.readString();
modelSizeInBytes = in.readVLong();
nodeId = in.readString();
}

public String getModelId() {
return modelId;
}

public long getModelSize() {
return modelSizeInBytes;
}

public String getNodeId() {
return nodeId;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(MODEL_ID, modelId);
if (modelSizeInBytes > 0) {
builder.field(MODEL_SIZE_IN_BYTES, modelSizeInBytes);
}
builder.field(NODE_ID, nodeId);
builder.endObject();
return builder;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(modelId);
out.writeVLong(modelSizeInBytes);
out.writeString(nodeId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,15 @@
import java.util.HashSet;
import java.util.Set;

import com.amazon.opendistroforelasticsearch.ad.constant.CommonName;

public enum ProfileName {
STATE("state"),
ERROR("error");
STATE(CommonName.STATE),
ERROR(CommonName.ERROR),
COORDINATING_NODE(CommonName.COORDINATING_NODE),
SHINGLE_SIZE(CommonName.SHINGLE_SIZE),
TOTAL_SIZE_IN_BYTES(CommonName.TOTAL_SIZE_IN_BYTES),
MODELS(CommonName.MODELS);

private String name;

Expand All @@ -38,26 +44,20 @@ public String getName() {
return name;
}

/**
* Get set of profile names
*
* @return set of profile names
*/
public static Set<String> getNames() {
Set<String> names = new HashSet<>();

for (ProfileName statName : ProfileName.values()) {
names.add(statName.getName());
}
return names;
}

public static ProfileName getName(String name) {
switch (name) {
case "state":
case CommonName.STATE:
return STATE;
case "error":
case CommonName.ERROR:
return ERROR;
case CommonName.COORDINATING_NODE:
return COORDINATING_NODE;
case CommonName.SHINGLE_SIZE:
return SHINGLE_SIZE;
case CommonName.TOTAL_SIZE_IN_BYTES:
return TOTAL_SIZE_IN_BYTES;
case CommonName.MODELS:
return MODELS;
default:
throw new IllegalArgumentException("Unsupported profile types");
}
Expand Down
Loading