Skip to content

Commit

Permalink
support return AD job when get detector (opendistro-for-elasticsearch#50
Browse files Browse the repository at this point in the history
)

* support return AD job when get detector
* handle null index response
  • Loading branch information
ylwu-amzn authored and kaituo committed Mar 6, 2020
1 parent 8d2da6a commit 57daff6
Show file tree
Hide file tree
Showing 14 changed files with 418 additions and 185 deletions.
1 change: 0 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

buildscript {
ext {
es_mv = '7.2'
es_group = "org.elasticsearch"
es_version = '7.4.2'
es_distribution = 'oss-zip'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ public class AnomalyDetector implements ToXContentObject {
private static final String WINDOW_DELAY_FIELD = "window_delay";
private static final String LAST_UPDATE_TIME_FIELD = "last_update_time";
public static final String UI_METADATA_FIELD = "ui_metadata";
public static final String ENABLED_FIELD = "enabled";

private final String detectorId;
private final Long version;
Expand Down Expand Up @@ -161,9 +160,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
.field(DETECTION_INTERVAL_FIELD, detectionInterval)
.field(WINDOW_DELAY_FIELD, windowDelay)
.field(SCHEMA_VERSION_FIELD, schemaVersion);
if (params.param(ENABLED_FIELD) != null) {
xContentBuilder.field(ENABLED_FIELD, params.paramAsBoolean(ENABLED_FIELD, false));
}

if (uiMetadata != null && !uiMetadata.isEmpty()) {
xContentBuilder.field(UI_METADATA_FIELD, uiMetadata);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,13 @@ public class AnomalyDetectorJob implements ToXContentObject, ScheduledJobParamet
private static final String SCHEDULE_FIELD = "schedule";
private static final String IS_ENABLED_FIELD = "enabled";
private static final String ENABLED_TIME_FIELD = "enabled_time";
private static final String DISABLED_TIME_FIELD = "disabled_time";

private final String name;
private final Schedule schedule;
private final Boolean isEnabled;
private final Instant enabledTime;
private final Instant disabledTime;
private final Instant lastUpdateTime;
private final Long lockDurationSeconds;

Expand All @@ -56,13 +58,15 @@ public AnomalyDetectorJob(
Schedule schedule,
Boolean isEnabled,
Instant enabledTime,
Instant disabledTime,
Instant lastUpdateTime,
Long lockDurationSeconds
) {
this.name = name;
this.schedule = schedule;
this.isEnabled = isEnabled;
this.enabledTime = enabledTime;
this.disabledTime = disabledTime;
this.lastUpdateTime = lastUpdateTime;
this.lockDurationSeconds = lockDurationSeconds;
}
Expand All @@ -77,6 +81,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
.field(ENABLED_TIME_FIELD, enabledTime.toEpochMilli())
.field(LAST_UPDATE_TIME_FIELD, lastUpdateTime.toEpochMilli())
.field(LOCK_DURATION_SECONDS, lockDurationSeconds);
if (disabledTime != null) {
xContentBuilder.field(DISABLED_TIME_FIELD, disabledTime.toEpochMilli());
}
return xContentBuilder.endObject();
}

Expand All @@ -85,6 +92,7 @@ public static AnomalyDetectorJob parse(XContentParser parser) throws IOException
Schedule schedule = null;
Boolean isEnabled = null;
Instant enabledTime = null;
Instant disabledTime = null;
Instant lastUpdateTime = null;
Long lockDurationSeconds = DEFAULT_AD_JOB_LOC_DURATION_SECONDS;

Expand All @@ -106,6 +114,9 @@ public static AnomalyDetectorJob parse(XContentParser parser) throws IOException
case ENABLED_TIME_FIELD:
enabledTime = ParseUtils.toInstant(parser);
break;
case DISABLED_TIME_FIELD:
disabledTime = ParseUtils.toInstant(parser);
break;
case LAST_UPDATE_TIME_FIELD:
lastUpdateTime = ParseUtils.toInstant(parser);
break;
Expand All @@ -117,7 +128,7 @@ public static AnomalyDetectorJob parse(XContentParser parser) throws IOException
break;
}
}
return new AnomalyDetectorJob(name, schedule, isEnabled, enabledTime, lastUpdateTime, lockDurationSeconds);
return new AnomalyDetectorJob(name, schedule, isEnabled, enabledTime, disabledTime, lastUpdateTime, lockDurationSeconds);
}

@Override
Expand All @@ -131,6 +142,7 @@ public boolean equals(Object o) {
&& Objects.equal(getSchedule(), that.getSchedule())
&& Objects.equal(isEnabled(), that.isEnabled())
&& Objects.equal(getEnabledTime(), that.getEnabledTime())
&& Objects.equal(getDisabledTime(), that.getDisabledTime())
&& Objects.equal(getLastUpdateTime(), that.getLastUpdateTime())
&& Objects.equal(getLockDurationSeconds(), that.getLockDurationSeconds());
}
Expand Down Expand Up @@ -160,6 +172,10 @@ public Instant getEnabledTime() {
return enabledTime;
}

public Instant getDisabledTime() {
return disabledTime;
}

@Override
public Instant getLastUpdateTime() {
return lastUpdateTime;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,9 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
String rawPath = request.rawPath();

if (rawPath.endsWith(START_JOB)) {
handler.createAnomalyDetectorJob();
handler.startAnomalyDetectorJob();
} else if (rawPath.endsWith(STOP_JOB)) {
handler.deleteAnomalyDetectorJob(detectorId);
handler.stopAnomalyDetectorJob(detectorId);
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,20 @@
package com.amazon.opendistroforelasticsearch.ad.rest;

import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector;
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetectorJob;
import com.amazon.opendistroforelasticsearch.ad.rest.handler.AnomalyDetectorActionHandler;
import com.amazon.opendistroforelasticsearch.ad.AnomalyDetectorPlugin;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
Expand Down Expand Up @@ -69,10 +74,40 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
return channel -> {
logger.info("Delete anomaly detector {}", detectorId);
handler
.getDetectorJob(clusterService, client, detectorId, channel, () -> deleteAnomalyDetectorDoc(client, detectorId, channel));
.getDetectorJob(
clusterService,
client,
detectorId,
channel,
() -> deleteAnomalyDetectorJobDoc(client, detectorId, channel)
);
};
}

private void deleteAnomalyDetectorJobDoc(NodeClient client, String detectorId, RestChannel channel) {
logger.info("Delete anomaly detector job {}", detectorId);
DeleteRequest deleteRequest = new DeleteRequest(AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX, detectorId)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
client.delete(deleteRequest, ActionListener.wrap(response -> {
if (response.getResult() == DocWriteResponse.Result.DELETED || response.getResult() == DocWriteResponse.Result.NOT_FOUND) {
deleteAnomalyDetectorDoc(client, detectorId, channel);
} else {
logger.error("Fail to delete anomaly detector job {}", detectorId);
}
}, exception -> {
if (exception instanceof IndexNotFoundException) {
deleteAnomalyDetectorDoc(client, detectorId, channel);
} else {
logger.error("Failed to delete anomaly detector job", exception);
try {
channel.sendResponse(new BytesRestResponse(channel, exception));
} catch (IOException e) {
logger.error("Failed to send response of delete anomaly detector job exception", e);
}
}
}));
}

private void deleteAnomalyDetectorDoc(NodeClient client, String detectorId, RestChannel channel) {
logger.info("Delete anomaly detector {}", detectorId);
DeleteRequest deleteRequest = new DeleteRequest(AnomalyDetector.ANOMALY_DETECTORS_INDEX, detectorId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
package com.amazon.opendistroforelasticsearch.ad.rest;

import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector;
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetectorJob;
import com.amazon.opendistroforelasticsearch.ad.util.RestHandlerUtils;
import com.amazon.opendistroforelasticsearch.ad.AnomalyDetectorPlugin;
import com.google.common.collect.ImmutableMap;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand All @@ -27,12 +27,8 @@
import org.elasticsearch.action.get.MultiGetRequest;
import org.elasticsearch.action.get.MultiGetResponse;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestChannel;
Expand All @@ -47,9 +43,9 @@
import java.util.Locale;

import static com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector.ANOMALY_DETECTORS_INDEX;
import static com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector.ENABLED_FIELD;
import static com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX;
import static com.amazon.opendistroforelasticsearch.ad.util.RestHandlerUtils.DETECTOR_ID;
import static com.amazon.opendistroforelasticsearch.ad.util.RestHandlerUtils.createXContentParser;
import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;

/**
Expand All @@ -74,26 +70,31 @@ public String getName() {
@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
String detectorId = request.param(DETECTOR_ID);
boolean returnJob = request.paramAsBoolean("job", false);
MultiGetRequest.Item adItem = new MultiGetRequest.Item(ANOMALY_DETECTORS_INDEX, detectorId)
.version(RestActions.parseVersion(request));
MultiGetRequest.Item adJobItem = new MultiGetRequest.Item(ANOMALY_DETECTOR_JOB_INDEX, detectorId)
.version(RestActions.parseVersion(request));
MultiGetRequest multiGetRequest = new MultiGetRequest().add(adItem).add(adJobItem);
return channel -> client.multiGet(multiGetRequest, onMultiGetResponse(channel));
MultiGetRequest multiGetRequest = new MultiGetRequest().add(adItem);
if (returnJob) {
MultiGetRequest.Item adJobItem = new MultiGetRequest.Item(ANOMALY_DETECTOR_JOB_INDEX, detectorId)
.version(RestActions.parseVersion(request));
multiGetRequest.add(adJobItem);
}

return channel -> client.multiGet(multiGetRequest, onMultiGetResponse(channel, returnJob, detectorId));
}

private ActionListener<MultiGetResponse> onMultiGetResponse(RestChannel channel) {
private ActionListener<MultiGetResponse> onMultiGetResponse(RestChannel channel, boolean returnJob, String detectorId) {
return new RestResponseListener<MultiGetResponse>(channel) {
@Override
public RestResponse buildResponse(MultiGetResponse multiGetResponse) throws Exception {
MultiGetItemResponse[] responses = multiGetResponse.getResponses();
AnomalyDetector detector = null;
XContentBuilder builder = null;
Boolean adJobEnabled = false;
AnomalyDetector detector = null;
AnomalyDetectorJob adJob = null;
for (MultiGetItemResponse response : responses) {
if (ANOMALY_DETECTORS_INDEX.equals(response.getIndex())) {
if (!response.getResponse().isExists()) {
return new BytesRestResponse(RestStatus.NOT_FOUND, channel.newBuilder());
if (response.getResponse() == null || !response.getResponse().isExists()) {
return new BytesRestResponse(RestStatus.NOT_FOUND, "Can't find detector with id: " + detectorId);
}
builder = channel
.newBuilder()
Expand All @@ -103,33 +104,44 @@ public RestResponse buildResponse(MultiGetResponse multiGetResponse) throws Exce
.field(RestHandlerUtils._PRIMARY_TERM, response.getResponse().getPrimaryTerm())
.field(RestHandlerUtils._SEQ_NO, response.getResponse().getSeqNo());
if (!response.getResponse().isSourceEmpty()) {
XContentParser parser = XContentHelper
.createParser(
channel.request().getXContentRegistry(),
LoggingDeprecationHandler.INSTANCE,
response.getResponse().getSourceAsBytesRef(),
XContentType.JSON
);
try {
try (
XContentParser parser = RestHandlerUtils
.createXContentParser(channel, response.getResponse().getSourceAsBytesRef())
) {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser::getTokenLocation);
detector = parser.namedObject(AnomalyDetector.class, AnomalyDetector.PARSE_FIELD_NAME, null);
} catch (Throwable t) {
logger.error("Fail to parse detector", t);
throw t;
} finally {
parser.close();
return new BytesRestResponse(
RestStatus.INTERNAL_SERVER_ERROR,
"Failed to parse detector with id: " + detectorId
);
}
}
}

if (ANOMALY_DETECTOR_JOB_INDEX.equals(response.getIndex())) {
if (!response.isFailed() && response.getResponse().isExists()) {
adJobEnabled = true;
if (response.getResponse() != null
&& response.getResponse().isExists()
&& !response.getResponse().isSourceEmpty()) {
try (XContentParser parser = createXContentParser(channel, response.getResponse().getSourceAsBytesRef())) {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser::getTokenLocation);
adJob = AnomalyDetectorJob.parse(parser);
} catch (Throwable t) {
logger.error("Fail to parse detector job ", t);
return new BytesRestResponse(
RestStatus.INTERNAL_SERVER_ERROR,
"Failed to parse detector job with id: " + detectorId
);
}
}
}
}
ToXContent.Params params = new ToXContent.MapParams(ImmutableMap.of(ENABLED_FIELD, adJobEnabled.toString()));
builder.field(RestHandlerUtils.ANOMALY_DETECTOR, detector, params);

builder.field(RestHandlerUtils.ANOMALY_DETECTOR, detector);
if (returnJob) {
builder.field(RestHandlerUtils.ANOMALY_DETECTOR_JOB, adJob);
}
builder.endObject();
return new BytesRestResponse(RestStatus.OK, builder);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,24 @@

package com.amazon.opendistroforelasticsearch.ad.rest.handler;

import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetectorJob;
import com.amazon.opendistroforelasticsearch.ad.util.RestHandlerUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestStatus;

import java.io.IOException;

import static com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX;
import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;

/**
* Common handler to process AD request.
Expand Down Expand Up @@ -57,7 +61,7 @@ public void getDetectorJob(
if (clusterService.state().getMetaData().indices().containsKey(ANOMALY_DETECTOR_JOB_INDEX)) {
GetRequest request = new GetRequest(ANOMALY_DETECTOR_JOB_INDEX).id(detectorId);
client.get(request, ActionListener.wrap(response -> onGetAdJobResponseForWrite(response, channel, function), exception -> {
logger.error("Fail to search AD job using detector id" + detectorId, exception);
logger.error("Fail to get anomaly detector job: " + detectorId, exception);
try {
channel.sendResponse(new BytesRestResponse(channel, exception));
} catch (IOException e) {
Expand All @@ -74,8 +78,18 @@ private void onGetAdJobResponseForWrite(GetResponse response, RestChannel channe
String adJobId = response.getId();
if (adJobId != null) {
// check if AD job is running on the detector, if yes, we can't delete the detector
channel.sendResponse(new BytesRestResponse(RestStatus.BAD_REQUEST, "Detector job is running: " + adJobId));
return;
try (XContentParser parser = RestHandlerUtils.createXContentParser(channel, response.getSourceAsBytesRef())) {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser::getTokenLocation);
AnomalyDetectorJob adJob = AnomalyDetectorJob.parse(parser);
if (adJob.isEnabled()) {
channel.sendResponse(new BytesRestResponse(RestStatus.BAD_REQUEST, "Detector job is running: " + adJobId));
return;
}
} catch (IOException e) {
String message = "Failed to parse anomaly detector job " + adJobId;
logger.error(message, e);
channel.sendResponse(new BytesRestResponse(RestStatus.BAD_REQUEST, message));
}
}
}
function.execute();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ public interface AnomalyDetectorFunction {

/**
* Performs this operation.
*
* Notes: don't forget to send back response via channel if you process response with this method.
*/
void execute();
}
Loading

0 comments on commit 57daff6

Please sign in to comment.