From ef2f7d95113e133b50144d54678d5502af6329db Mon Sep 17 00:00:00 2001 From: Yaliang Wu Date: Fri, 28 Feb 2020 15:51:30 -0800 Subject: [PATCH 1/4] support return AD job when get detector --- .../ad/model/AnomalyDetector.java | 4 - .../ad/model/AnomalyDetectorJob.java | 18 +- .../ad/rest/RestAnomalyDetectorJobAction.java | 4 +- .../rest/RestDeleteAnomalyDetectorAction.java | 37 ++- .../ad/rest/RestGetAnomalyDetectorAction.java | 72 ++--- .../handler/AnomalyDetectorActionHandler.java | 20 +- .../IndexAnomalyDetectorJobActionHandler.java | 253 ++++++++++-------- .../ad/util/RestHandlerUtils.java | 14 + .../ad/AnomalyDetectorRestTestCase.java | 58 ++-- .../ad/TestHelpers.java | 1 + .../ad/rest/AnomalyDetectorRestApiIT.java | 99 ++++++- .../ad/util/RestHandlerUtilsTests.java | 17 ++ 12 files changed, 413 insertions(+), 184 deletions(-) diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/model/AnomalyDetector.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/model/AnomalyDetector.java index 41cbf66e..7167a8d9 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/model/AnomalyDetector.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/model/AnomalyDetector.java @@ -69,7 +69,6 @@ public class AnomalyDetector implements ToXContentObject { private static final String WINDOW_DELAY_FIELD = "window_delay"; private static final String LAST_UPDATE_TIME_FIELD = "last_update_time"; public static final String UI_METADATA_FIELD = "ui_metadata"; - public static final String ENABLED_FIELD = "enabled"; private final String detectorId; private final Long version; @@ -161,9 +160,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws .field(DETECTION_INTERVAL_FIELD, detectionInterval) .field(WINDOW_DELAY_FIELD, windowDelay) .field(SCHEMA_VERSION_FIELD, schemaVersion); - if (params.param(ENABLED_FIELD) != null) { - xContentBuilder.field(ENABLED_FIELD, params.paramAsBoolean(ENABLED_FIELD, false)); - } if (uiMetadata != null && !uiMetadata.isEmpty()) { xContentBuilder.field(UI_METADATA_FIELD, uiMetadata); diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/model/AnomalyDetectorJob.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/model/AnomalyDetectorJob.java index fb5ebc4c..8861e14e 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/model/AnomalyDetectorJob.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/model/AnomalyDetectorJob.java @@ -43,11 +43,13 @@ public class AnomalyDetectorJob implements ToXContentObject, ScheduledJobParamet private static final String SCHEDULE_FIELD = "schedule"; private static final String IS_ENABLED_FIELD = "enabled"; private static final String ENABLED_TIME_FIELD = "enabled_time"; + private static final String DISABLED_TIME_FIELD = "disabled_time"; private final String name; private final Schedule schedule; private final Boolean isEnabled; private final Instant enabledTime; + private final Instant disabledTime; private final Instant lastUpdateTime; private final Long lockDurationSeconds; @@ -56,6 +58,7 @@ public AnomalyDetectorJob( Schedule schedule, Boolean isEnabled, Instant enabledTime, + Instant disabledTime, Instant lastUpdateTime, Long lockDurationSeconds ) { @@ -63,6 +66,7 @@ public AnomalyDetectorJob( this.schedule = schedule; this.isEnabled = isEnabled; this.enabledTime = enabledTime; + this.disabledTime = disabledTime; this.lastUpdateTime = lastUpdateTime; this.lockDurationSeconds = lockDurationSeconds; } @@ -77,6 +81,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws .field(ENABLED_TIME_FIELD, enabledTime.toEpochMilli()) .field(LAST_UPDATE_TIME_FIELD, lastUpdateTime.toEpochMilli()) .field(LOCK_DURATION_SECONDS, lockDurationSeconds); + if (disabledTime != null) { + xContentBuilder.field(DISABLED_TIME_FIELD, disabledTime.toEpochMilli()); + } return xContentBuilder.endObject(); } @@ -85,6 +92,7 @@ public static AnomalyDetectorJob parse(XContentParser parser) throws IOException Schedule schedule = null; Boolean isEnabled = null; Instant enabledTime = null; + Instant disabledTime = null; Instant lastUpdateTime = null; Long lockDurationSeconds = DEFAULT_AD_JOB_LOC_DURATION_SECONDS; @@ -106,6 +114,9 @@ public static AnomalyDetectorJob parse(XContentParser parser) throws IOException case ENABLED_TIME_FIELD: enabledTime = ParseUtils.toInstant(parser); break; + case DISABLED_TIME_FIELD: + disabledTime = ParseUtils.toInstant(parser); + break; case LAST_UPDATE_TIME_FIELD: lastUpdateTime = ParseUtils.toInstant(parser); break; @@ -117,7 +128,7 @@ public static AnomalyDetectorJob parse(XContentParser parser) throws IOException break; } } - return new AnomalyDetectorJob(name, schedule, isEnabled, enabledTime, lastUpdateTime, lockDurationSeconds); + return new AnomalyDetectorJob(name, schedule, isEnabled, enabledTime, disabledTime, lastUpdateTime, lockDurationSeconds); } @Override @@ -131,6 +142,7 @@ public boolean equals(Object o) { && Objects.equal(getSchedule(), that.getSchedule()) && Objects.equal(isEnabled(), that.isEnabled()) && Objects.equal(getEnabledTime(), that.getEnabledTime()) + && Objects.equal(getDisabledTime(), that.getDisabledTime()) && Objects.equal(getLastUpdateTime(), that.getLastUpdateTime()) && Objects.equal(getLockDurationSeconds(), that.getLockDurationSeconds()); } @@ -160,6 +172,10 @@ public Instant getEnabledTime() { return enabledTime; } + public Instant getDisabledTime() { + return disabledTime; + } + @Override public Instant getLastUpdateTime() { return lastUpdateTime; diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestAnomalyDetectorJobAction.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestAnomalyDetectorJobAction.java index 761d7200..cd702781 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestAnomalyDetectorJobAction.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestAnomalyDetectorJobAction.java @@ -110,9 +110,9 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli String rawPath = request.rawPath(); if (rawPath.endsWith(START_JOB)) { - handler.createAnomalyDetectorJob(); + handler.startAnomalyDetectorJob(); } else if (rawPath.endsWith(STOP_JOB)) { - handler.deleteAnomalyDetectorJob(detectorId); + handler.stopAnomalyDetectorJob(detectorId); } }; } diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestDeleteAnomalyDetectorAction.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestDeleteAnomalyDetectorAction.java index 70068a55..dc1a75ca 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestDeleteAnomalyDetectorAction.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestDeleteAnomalyDetectorAction.java @@ -16,15 +16,20 @@ package com.amazon.opendistroforelasticsearch.ad.rest; import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; +import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetectorJob; import com.amazon.opendistroforelasticsearch.ad.rest.handler.AnomalyDetectorActionHandler; import com.amazon.opendistroforelasticsearch.ad.AnomalyDetectorPlugin; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.BytesRestResponse; import org.elasticsearch.rest.RestChannel; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestRequest; @@ -69,10 +74,40 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli return channel -> { logger.info("Delete anomaly detector {}", detectorId); handler - .getDetectorJob(clusterService, client, detectorId, channel, () -> deleteAnomalyDetectorDoc(client, detectorId, channel)); + .getDetectorJob( + clusterService, + client, + detectorId, + channel, + () -> deleteAnomalyDetectorJobDoc(client, detectorId, channel) + ); }; } + private void deleteAnomalyDetectorJobDoc(NodeClient client, String detectorId, RestChannel channel) { + logger.info("Delete anomaly detector job {}", detectorId); + DeleteRequest deleteRequest = new DeleteRequest(AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX, detectorId) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + client.delete(deleteRequest, ActionListener.wrap(response -> { + if (response.getResult() == DocWriteResponse.Result.DELETED || response.getResult() == DocWriteResponse.Result.NOT_FOUND) { + deleteAnomalyDetectorDoc(client, detectorId, channel); + } else { + logger.error("Fail to delete anomaly detector job {}", detectorId); + } + }, exception -> { + if (exception instanceof IndexNotFoundException) { + deleteAnomalyDetectorDoc(client, detectorId, channel); + } else { + logger.error("Failed to delete anomaly detector job", exception); + try { + channel.sendResponse(new BytesRestResponse(channel, exception)); + } catch (IOException e) { + logger.error("Failed to send response of delete anomaly detector job exception", e); + } + } + })); + } + private void deleteAnomalyDetectorDoc(NodeClient client, String detectorId, RestChannel channel) { logger.info("Delete anomaly detector {}", detectorId); DeleteRequest deleteRequest = new DeleteRequest(AnomalyDetector.ANOMALY_DETECTORS_INDEX, detectorId) diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestGetAnomalyDetectorAction.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestGetAnomalyDetectorAction.java index 8ec079fc..82582ad7 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestGetAnomalyDetectorAction.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/RestGetAnomalyDetectorAction.java @@ -16,9 +16,9 @@ package com.amazon.opendistroforelasticsearch.ad.rest; import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; +import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetectorJob; import com.amazon.opendistroforelasticsearch.ad.util.RestHandlerUtils; import com.amazon.opendistroforelasticsearch.ad.AnomalyDetectorPlugin; -import com.google.common.collect.ImmutableMap; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -27,12 +27,8 @@ import org.elasticsearch.action.get.MultiGetRequest; import org.elasticsearch.action.get.MultiGetResponse; import org.elasticsearch.client.node.NodeClient; -import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; -import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.BytesRestResponse; import org.elasticsearch.rest.RestChannel; @@ -47,9 +43,9 @@ import java.util.Locale; import static com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector.ANOMALY_DETECTORS_INDEX; -import static com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector.ENABLED_FIELD; import static com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX; import static com.amazon.opendistroforelasticsearch.ad.util.RestHandlerUtils.DETECTOR_ID; +import static com.amazon.opendistroforelasticsearch.ad.util.RestHandlerUtils.createXContentParser; import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken; /** @@ -74,26 +70,31 @@ public String getName() { @Override protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { String detectorId = request.param(DETECTOR_ID); + boolean returnJob = request.paramAsBoolean("job", false); MultiGetRequest.Item adItem = new MultiGetRequest.Item(ANOMALY_DETECTORS_INDEX, detectorId) .version(RestActions.parseVersion(request)); - MultiGetRequest.Item adJobItem = new MultiGetRequest.Item(ANOMALY_DETECTOR_JOB_INDEX, detectorId) - .version(RestActions.parseVersion(request)); - MultiGetRequest multiGetRequest = new MultiGetRequest().add(adItem).add(adJobItem); - return channel -> client.multiGet(multiGetRequest, onMultiGetResponse(channel)); + MultiGetRequest multiGetRequest = new MultiGetRequest().add(adItem); + if (returnJob) { + MultiGetRequest.Item adJobItem = new MultiGetRequest.Item(ANOMALY_DETECTOR_JOB_INDEX, detectorId) + .version(RestActions.parseVersion(request)); + multiGetRequest.add(adJobItem); + } + + return channel -> client.multiGet(multiGetRequest, onMultiGetResponse(channel, returnJob, detectorId)); } - private ActionListener onMultiGetResponse(RestChannel channel) { + private ActionListener onMultiGetResponse(RestChannel channel, boolean returnJob, String detectorId) { return new RestResponseListener(channel) { @Override public RestResponse buildResponse(MultiGetResponse multiGetResponse) throws Exception { MultiGetItemResponse[] responses = multiGetResponse.getResponses(); - AnomalyDetector detector = null; XContentBuilder builder = null; - Boolean adJobEnabled = false; + AnomalyDetector detector = null; + AnomalyDetectorJob adJob = null; for (MultiGetItemResponse response : responses) { if (ANOMALY_DETECTORS_INDEX.equals(response.getIndex())) { - if (!response.getResponse().isExists()) { - return new BytesRestResponse(RestStatus.NOT_FOUND, channel.newBuilder()); + if (response.getResponse() == null || !response.getResponse().isExists()) { + return new BytesRestResponse(RestStatus.NOT_FOUND, "Can't find detector with id: " + detectorId); } builder = channel .newBuilder() @@ -103,33 +104,44 @@ public RestResponse buildResponse(MultiGetResponse multiGetResponse) throws Exce .field(RestHandlerUtils._PRIMARY_TERM, response.getResponse().getPrimaryTerm()) .field(RestHandlerUtils._SEQ_NO, response.getResponse().getSeqNo()); if (!response.getResponse().isSourceEmpty()) { - XContentParser parser = XContentHelper - .createParser( - channel.request().getXContentRegistry(), - LoggingDeprecationHandler.INSTANCE, - response.getResponse().getSourceAsBytesRef(), - XContentType.JSON - ); - try { + try ( + XContentParser parser = RestHandlerUtils + .createXContentParser(channel, response.getResponse().getSourceAsBytesRef()) + ) { ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser::getTokenLocation); detector = parser.namedObject(AnomalyDetector.class, AnomalyDetector.PARSE_FIELD_NAME, null); } catch (Throwable t) { logger.error("Fail to parse detector", t); - throw t; - } finally { - parser.close(); + return new BytesRestResponse( + RestStatus.INTERNAL_SERVER_ERROR, + "Failed to parse detector with id: " + detectorId + ); } } } if (ANOMALY_DETECTOR_JOB_INDEX.equals(response.getIndex())) { - if (!response.isFailed() && response.getResponse().isExists()) { - adJobEnabled = true; + if (response.getResponse() != null + && response.getResponse().isExists() + && !response.getResponse().isSourceEmpty()) { + try (XContentParser parser = createXContentParser(channel, response.getResponse().getSourceAsBytesRef())) { + ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser::getTokenLocation); + adJob = AnomalyDetectorJob.parse(parser); + } catch (Throwable t) { + logger.error("Fail to parse detector job ", t); + return new BytesRestResponse( + RestStatus.INTERNAL_SERVER_ERROR, + "Failed to parse detector job with id: " + detectorId + ); + } } } } - ToXContent.Params params = new ToXContent.MapParams(ImmutableMap.of(ENABLED_FIELD, adJobEnabled.toString())); - builder.field(RestHandlerUtils.ANOMALY_DETECTOR, detector, params); + + builder.field(RestHandlerUtils.ANOMALY_DETECTOR, detector); + if (returnJob) { + builder.field(RestHandlerUtils.ANOMALY_DETECTOR_JOB, adJob); + } builder.endObject(); return new BytesRestResponse(RestStatus.OK, builder); } diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/handler/AnomalyDetectorActionHandler.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/handler/AnomalyDetectorActionHandler.java index 5e44b510..5aa5e013 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/handler/AnomalyDetectorActionHandler.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/handler/AnomalyDetectorActionHandler.java @@ -15,6 +15,8 @@ package com.amazon.opendistroforelasticsearch.ad.rest.handler; +import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetectorJob; +import com.amazon.opendistroforelasticsearch.ad.util.RestHandlerUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; @@ -22,6 +24,7 @@ import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.rest.BytesRestResponse; import org.elasticsearch.rest.RestChannel; import org.elasticsearch.rest.RestStatus; @@ -29,6 +32,7 @@ import java.io.IOException; import static com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX; +import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken; /** * Common handler to process AD request. @@ -57,7 +61,7 @@ public void getDetectorJob( if (clusterService.state().getMetaData().indices().containsKey(ANOMALY_DETECTOR_JOB_INDEX)) { GetRequest request = new GetRequest(ANOMALY_DETECTOR_JOB_INDEX).id(detectorId); client.get(request, ActionListener.wrap(response -> onGetAdJobResponseForWrite(response, channel, function), exception -> { - logger.error("Fail to search AD job using detector id" + detectorId, exception); + logger.error("Fail to get anomaly detector job: " + detectorId, exception); try { channel.sendResponse(new BytesRestResponse(channel, exception)); } catch (IOException e) { @@ -74,8 +78,18 @@ private void onGetAdJobResponseForWrite(GetResponse response, RestChannel channe String adJobId = response.getId(); if (adJobId != null) { // check if AD job is running on the detector, if yes, we can't delete the detector - channel.sendResponse(new BytesRestResponse(RestStatus.BAD_REQUEST, "Detector job is running: " + adJobId)); - return; + try (XContentParser parser = RestHandlerUtils.createXContentParser(channel, response.getSourceAsBytesRef())) { + ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser::getTokenLocation); + AnomalyDetectorJob adJob = AnomalyDetectorJob.parse(parser); + if (adJob.isEnabled()) { + channel.sendResponse(new BytesRestResponse(RestStatus.BAD_REQUEST, "Detector job is running: " + adJobId)); + return; + } + } catch (IOException e) { + String message = "Failed to parse anomaly detector job " + adJobId; + logger.error(message, e); + channel.sendResponse(new BytesRestResponse(RestStatus.BAD_REQUEST, message)); + } } } function.execute(); diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/handler/IndexAnomalyDetectorJobActionHandler.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/handler/IndexAnomalyDetectorJobActionHandler.java index e97a1cdf..516286f2 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/handler/IndexAnomalyDetectorJobActionHandler.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/handler/IndexAnomalyDetectorJobActionHandler.java @@ -15,7 +15,6 @@ package com.amazon.opendistroforelasticsearch.ad.rest.handler; -import com.amazon.opendistroforelasticsearch.ad.AnomalyDetectorPlugin; import com.amazon.opendistroforelasticsearch.ad.indices.AnomalyDetectionIndices; import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetectorJob; @@ -29,9 +28,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; -import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexRequest; @@ -40,23 +37,19 @@ import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.rest.BytesRestResponse; import org.elasticsearch.rest.RestChannel; -import org.elasticsearch.rest.RestResponse; import org.elasticsearch.rest.RestStatus; -import org.elasticsearch.rest.action.RestResponseListener; import java.io.IOException; import java.time.Duration; import java.time.Instant; -import java.util.Locale; import static com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector.ANOMALY_DETECTORS_INDEX; import static com.amazon.opendistroforelasticsearch.ad.util.RestHandlerUtils.XCONTENT_WITH_TYPE; +import static com.amazon.opendistroforelasticsearch.ad.util.RestHandlerUtils.createXContentParser; import static org.elasticsearch.common.xcontent.ToXContent.EMPTY_PARAMS; import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken; @@ -110,11 +103,13 @@ public IndexAnomalyDetectorJobActionHandler( } /** - * Create anomaly detector job. + * Start anomaly detector job. + * 1.If job not exists, create new job. + * 2.If job exists: a). if job enabled, return error message; b). if job disabled, enable job. * * @throws IOException IOException from {@link AnomalyDetectionIndices#getAnomalyDetectorJobMappings} */ - public void createAnomalyDetectorJob() throws IOException { + public void startAnomalyDetectorJob() throws IOException { if (!anomalyDetectionIndices.doesAnomalyDetectorJobIndexExist()) { anomalyDetectionIndices .initAnomalyDetectorJobIndex( @@ -125,6 +120,19 @@ public void createAnomalyDetectorJob() throws IOException { } } + private void onCreateMappingsResponse(CreateIndexResponse response) throws IOException { + if (response.isAcknowledged()) { + logger.info("Created {} with mappings.", ANOMALY_DETECTORS_INDEX); + prepareAnomalyDetectorJobIndexing(); + } else { + logger.warn("Created {} with mappings call not acknowledged.", ANOMALY_DETECTORS_INDEX); + channel + .sendResponse( + new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, response.toXContent(channel.newErrorBuilder(), EMPTY_PARAMS)) + ); + } + } + private void prepareAnomalyDetectorJobIndexing() { GetRequest getRequest = new GetRequest(AnomalyDetector.ANOMALY_DETECTORS_INDEX).id(detectorId); client.get(getRequest, ActionListener.wrap(response -> onGetAnomalyDetectorResponse(response), exception -> onFailure(exception))); @@ -140,147 +148,154 @@ private void onGetAnomalyDetectorResponse(GetResponse response) throws IOExcepti channel.sendResponse(new BytesRestResponse(RestStatus.NOT_FOUND, response.toXContent(builder, EMPTY_PARAMS))); return; } - XContentParser parser = XContentType.JSON - .xContent() - .createParser( - channel.request().getXContentRegistry(), - LoggingDeprecationHandler.INSTANCE, - response.getSourceAsBytesRef().streamInput() - ); + try (XContentParser parser = RestHandlerUtils.createXContentParser(channel, response.getSourceAsBytesRef())) { + ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser::getTokenLocation); + AnomalyDetector detector = AnomalyDetector.parse(parser, response.getId(), response.getVersion()); - ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser::getTokenLocation); - AnomalyDetector detector = AnomalyDetector.parse(parser, response.getId(), response.getVersion()); + IntervalTimeConfiguration interval = (IntervalTimeConfiguration) detector.getDetectionInterval(); + Schedule schedule = new IntervalSchedule(Instant.now(), (int) interval.getInterval(), interval.getUnit()); + Duration duration = Duration.of(interval.getInterval(), interval.getUnit()); - IntervalTimeConfiguration interval = (IntervalTimeConfiguration) detector.getDetectionInterval(); - Schedule schedule = new IntervalSchedule(Instant.now(), (int) interval.getInterval(), interval.getUnit()); - Duration duration = Duration.of(interval.getInterval(), interval.getUnit()); - AnomalyDetectorJob job = new AnomalyDetectorJob( - detector.getDetectorId(), - schedule, - true, - Instant.now(), - Instant.now(), - duration.getSeconds() - ); + AnomalyDetectorJob job = new AnomalyDetectorJob( + detector.getDetectorId(), + schedule, + true, + Instant.now(), + null, + Instant.now(), + duration.getSeconds() + ); - getAnomalyDetectorJob(job); + getAnomalyDetectorJobForWrite(job); + } catch (IOException e) { + String message = "Failed to parse anomaly detector job " + detectorId; + logger.error(message, e); + channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, message)); + } } - private void getAnomalyDetectorJob(AnomalyDetectorJob job) { + private void getAnomalyDetectorJobForWrite(AnomalyDetectorJob job) { GetRequest getRequest = new GetRequest(AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX).id(detectorId); - client.get(getRequest, ActionListener.wrap(response -> onGetAnomalyDetectorJob(response, job), exception -> onFailure(exception))); + client + .get( + getRequest, + ActionListener.wrap(response -> onGetAnomalyDetectorJobForWrite(response, job), exception -> onFailure(exception)) + ); } - private void onGetAnomalyDetectorJob(GetResponse response, AnomalyDetectorJob job) throws IOException { + private void onGetAnomalyDetectorJobForWrite(GetResponse response, AnomalyDetectorJob job) throws IOException { if (response.isExists()) { - XContentBuilder builder = channel - .newErrorBuilder() - .startObject() - .field("Message", "AnomalyDetectorJob exists: " + detectorId) - .endObject(); - channel.sendResponse(new BytesRestResponse(RestStatus.NOT_FOUND, response.toXContent(builder, EMPTY_PARAMS))); - return; + try (XContentParser parser = createXContentParser(channel, response.getSourceAsBytesRef())) { + ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser::getTokenLocation); + AnomalyDetectorJob currentAdJob = AnomalyDetectorJob.parse(parser); + if (currentAdJob.isEnabled()) { + channel.sendResponse(new BytesRestResponse(RestStatus.OK, "Anomaly detector job is already running: " + detectorId)); + return; + } else { + AnomalyDetectorJob newJob = new AnomalyDetectorJob( + job.getName(), + job.getSchedule(), + job.isEnabled(), + Instant.now(), + currentAdJob.getDisabledTime(), + Instant.now(), + job.getLockDurationSeconds() + ); + indexAnomalyDetectorJob(newJob, null); + } + } catch (IOException e) { + String message = "Failed to parse anomaly detector job " + job.getName(); + logger.error(message, e); + channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, message)); + } + } else { + indexAnomalyDetectorJob(job, null); } - - indexAnomalyDetectorJob(job); } - private void indexAnomalyDetectorJob(AnomalyDetectorJob job) throws IOException { + private void indexAnomalyDetectorJob(AnomalyDetectorJob job, AnomalyDetectorFunction function) throws IOException { IndexRequest indexRequest = new IndexRequest(AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX) - .setRefreshPolicy(refreshPolicy) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) .source(job.toXContent(channel.newBuilder(), XCONTENT_WITH_TYPE)) .setIfSeqNo(seqNo) .setIfPrimaryTerm(primaryTerm) .timeout(requestTimeout) .id(detectorId); - client.index(indexRequest, indexAnomalyDetectorJobResponse()); - } - - private ActionListener indexAnomalyDetectorJobResponse() { - return new RestResponseListener(channel) { - @Override - public RestResponse buildResponse(IndexResponse response) throws Exception { - if (response.getShardInfo().getSuccessful() < 1) { - return new BytesRestResponse(response.status(), response.toXContent(channel.newErrorBuilder(), EMPTY_PARAMS)); - } - - XContentBuilder builder = channel - .newBuilder() - .startObject() - .field(RestHandlerUtils._ID, response.getId()) - .field(RestHandlerUtils._VERSION, response.getVersion()) - .field(RestHandlerUtils._SEQ_NO, response.getSeqNo()) - .field(RestHandlerUtils._PRIMARY_TERM, response.getPrimaryTerm()) - .endObject(); - - BytesRestResponse restResponse = new BytesRestResponse(response.status(), builder); - if (response.status() == RestStatus.CREATED) { - String location = String.format(Locale.ROOT, "%s/%s", AnomalyDetectorPlugin.AD_BASE_URI, response.getId()); - restResponse.addHeader("Location", location); - } - return restResponse; - } - }; + client + .index( + indexRequest, + ActionListener.wrap(response -> onIndexAnomalyDetectorJobResponse(response, function), exception -> onFailure(exception)) + ); } - private void onCreateMappingsResponse(CreateIndexResponse response) throws IOException { - if (response.isAcknowledged()) { - logger.info("Created {} with mappings.", ANOMALY_DETECTORS_INDEX); - prepareAnomalyDetectorJobIndexing(); + private void onIndexAnomalyDetectorJobResponse(IndexResponse response, AnomalyDetectorFunction function) throws IOException { + if (response.getShardInfo().getSuccessful() < 1) { + channel.sendResponse(new BytesRestResponse(response.status(), response.toXContent(channel.newErrorBuilder(), EMPTY_PARAMS))); + } + if (function != null) { + function.execute(); } else { - logger.warn("Created {} with mappings call not acknowledged.", ANOMALY_DETECTORS_INDEX); - channel - .sendResponse( - new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, response.toXContent(channel.newErrorBuilder(), EMPTY_PARAMS)) - ); + XContentBuilder builder = channel + .newBuilder() + .startObject() + .field(RestHandlerUtils._ID, response.getId()) + .field(RestHandlerUtils._VERSION, response.getVersion()) + .field(RestHandlerUtils._SEQ_NO, response.getSeqNo()) + .field(RestHandlerUtils._PRIMARY_TERM, response.getPrimaryTerm()) + .endObject(); + channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder)); } } /** - * Delete anomaly detector job + * Stop anomaly detector job. + * 1.If job not exists, return error message + * 2.If job exists: a).if job state is disabled, return error message; b).if job state is enabled, disable job. + * * @param detectorId detector identifier */ - public void deleteAnomalyDetectorJob(String detectorId) { + public void stopAnomalyDetectorJob(String detectorId) { GetRequest getRequest = new GetRequest(AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX).id(detectorId); - client - .get( - getRequest, - ActionListener - .wrap( - response -> deleteAnomalyDetectorJobDoc(client, detectorId, channel, refreshPolicy), - exception -> onFailure(exception) - ) - ); - } - - private void deleteAnomalyDetectorJobDoc( - NodeClient client, - String detectorId, - RestChannel channel, - WriteRequest.RefreshPolicy refreshPolicy - ) { - logger.info("Delete anomaly detector job {}", detectorId); - DeleteRequest deleteRequest = new DeleteRequest(AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX, detectorId) - .setRefreshPolicy(refreshPolicy); - client.delete(deleteRequest, ActionListener.wrap(response -> { - if (response.getResult() == DocWriteResponse.Result.DELETED) { - logger.info("Stop anomaly detector {}", detectorId); - StopDetectorRequest stopDetectorRequest = new StopDetectorRequest(detectorId); - client.execute(StopDetectorAction.INSTANCE, stopDetectorRequest, stopAdDetectorListener(channel, detectorId)); - } else if (response.getResult() == DocWriteResponse.Result.NOT_FOUND) { - logger.info("Anomaly detector job not found: {}", detectorId); - StopDetectorRequest stopDetectorRequest = new StopDetectorRequest(detectorId); - client.execute(StopDetectorAction.INSTANCE, stopDetectorRequest, stopAdDetectorListener(channel, detectorId)); + client.get(getRequest, ActionListener.wrap(response -> { + if (response.isExists()) { + try (XContentParser parser = createXContentParser(channel, response.getSourceAsBytesRef())) { + ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser::getTokenLocation); + AnomalyDetectorJob job = AnomalyDetectorJob.parse(parser); + if (!job.isEnabled()) { + channel + .sendResponse(new BytesRestResponse(RestStatus.OK, "Anomaly detector job is already stopped: " + detectorId)); + return; + } else { + AnomalyDetectorJob newJob = new AnomalyDetectorJob( + job.getName(), + job.getSchedule(), + false, + job.getEnabledTime(), + Instant.now(), + Instant.now(), + job.getLockDurationSeconds() + ); + indexAnomalyDetectorJob( + newJob, + () -> client + .execute( + StopDetectorAction.INSTANCE, + new StopDetectorRequest(detectorId), + stopAdDetectorListener(channel, detectorId) + ) + ); + } + } catch (IOException e) { + String message = "Failed to parse anomaly detector job " + detectorId; + logger.error(message, e); + channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, message)); + } } else { - channel.sendResponse(new BytesRestResponse(RestStatus.BAD_REQUEST, "Failed to stop AD job " + detectorId)); + channel.sendResponse(new BytesRestResponse(RestStatus.BAD_REQUEST, "Anomaly detector job not exist: " + detectorId)); } - }, exception -> { - logger.error("Failed to stop AD job " + detectorId, exception); - onFailure(exception); - })); - + }, exception -> onFailure(exception))); } private ActionListener stopAdDetectorListener(RestChannel channel, String detectorId) { diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/RestHandlerUtils.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/RestHandlerUtils.java index 8beffdbe..7416c0a3 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/RestHandlerUtils.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/RestHandlerUtils.java @@ -18,10 +18,18 @@ import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; import com.google.common.collect.ImmutableMap; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.rest.RestChannel; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.search.fetch.subphase.FetchSourceContext; +import java.io.IOException; + /** * Utility functions for REST handlers. */ @@ -36,6 +44,7 @@ public final class RestHandlerUtils { public static final String REFRESH = "refresh"; public static final String DETECTOR_ID = "detectorID"; public static final String ANOMALY_DETECTOR = "anomaly_detector"; + public static final String ANOMALY_DETECTOR_JOB = "anomaly_detector_job"; public static final String RUN = "_run"; public static final String PREVIEW = "_preview"; public static final String START_JOB = "_start"; @@ -62,4 +71,9 @@ public static FetchSourceContext getSourceContext(RestRequest request) { return null; } } + + public static XContentParser createXContentParser(RestChannel channel, BytesReference bytesReference) throws IOException { + return XContentHelper + .createParser(channel.request().getXContentRegistry(), LoggingDeprecationHandler.INSTANCE, bytesReference, XContentType.JSON); + } } diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorRestTestCase.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorRestTestCase.java index 5eaa85df..2d71c522 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorRestTestCase.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorRestTestCase.java @@ -16,6 +16,7 @@ package com.amazon.opendistroforelasticsearch.ad; import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; +import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetectorJob; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import org.apache.http.HttpEntity; @@ -106,13 +107,28 @@ protected AnomalyDetector createAnomalyDetector(AnomalyDetector detector, Boolea } public AnomalyDetector getAnomalyDetector(String detectorId) throws IOException { - BasicHeader header = new BasicHeader(HttpHeaders.CONTENT_TYPE, "application/json"); - return getAnomalyDetector(detectorId, header); + return (AnomalyDetector) getAnomalyDetector(detectorId, false)[0]; } public AnomalyDetector getAnomalyDetector(String detectorId, BasicHeader header) throws IOException { + return (AnomalyDetector) getAnomalyDetector(detectorId, header, false)[0]; + } + + public ToXContentObject[] getAnomalyDetector(String detectorId, boolean returnJob) throws IOException { + BasicHeader header = new BasicHeader(HttpHeaders.CONTENT_TYPE, "application/json"); + return getAnomalyDetector(detectorId, header, returnJob); + } + + public ToXContentObject[] getAnomalyDetector(String detectorId, BasicHeader header, boolean returnJob) throws IOException { Response response = TestHelpers - .makeRequest(client(), "GET", TestHelpers.AD_BASE_DETECTORS_URI + "/" + detectorId, null, "", ImmutableList.of(header)); + .makeRequest( + client(), + "GET", + TestHelpers.AD_BASE_DETECTORS_URI + "/" + detectorId + "?job=" + returnJob, + null, + "", + ImmutableList.of(header) + ); assertEquals("Unable to get anomaly detector " + detectorId, RestStatus.OK, restStatus(response)); XContentParser parser = createAdParser(XContentType.JSON.xContent(), response.getEntity().getContent()); XContentParser.Token token = parser.nextToken(); @@ -121,6 +137,7 @@ public AnomalyDetector getAnomalyDetector(String detectorId, BasicHeader header) String id = null; Long version = null; AnomalyDetector detector = null; + AnomalyDetectorJob detectorJob = null; while (parser.nextToken() != XContentParser.Token.END_OBJECT) { String fieldName = parser.currentName(); parser.nextToken(); @@ -134,24 +151,29 @@ public AnomalyDetector getAnomalyDetector(String detectorId, BasicHeader header) case "anomaly_detector": detector = AnomalyDetector.parse(parser); break; + case "anomaly_detector_job": + detectorJob = AnomalyDetectorJob.parse(parser); + break; } } - return new AnomalyDetector( - id, - version, - detector.getName(), - detector.getDescription(), - detector.getTimeField(), - detector.getIndices(), - detector.getFeatureAttributes(), - detector.getFilterQuery(), - detector.getDetectionInterval(), - detector.getWindowDelay(), - detector.getUiMetadata(), - detector.getSchemaVersion(), - detector.getLastUpdateTime() - ); + return new ToXContentObject[] { + new AnomalyDetector( + id, + version, + detector.getName(), + detector.getDescription(), + detector.getTimeField(), + detector.getIndices(), + detector.getFeatureAttributes(), + detector.getFilterQuery(), + detector.getDetectionInterval(), + detector.getWindowDelay(), + detector.getUiMetadata(), + detector.getSchemaVersion(), + detector.getLastUpdateTime() + ), + detectorJob }; } protected HttpEntity toHttpEntity(ToXContentObject object) throws IOException { diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/TestHelpers.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/TestHelpers.java index e02d5bdd..03263265 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/TestHelpers.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/TestHelpers.java @@ -256,6 +256,7 @@ public static AnomalyDetectorJob randomAnomalyDetectorJob() { true, Instant.now().truncatedTo(ChronoUnit.SECONDS), Instant.now().truncatedTo(ChronoUnit.SECONDS), + Instant.now().truncatedTo(ChronoUnit.SECONDS), 60L ); } diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/rest/AnomalyDetectorRestApiIT.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/rest/AnomalyDetectorRestApiIT.java index d7659d4f..6892428c 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/rest/AnomalyDetectorRestApiIT.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/rest/AnomalyDetectorRestApiIT.java @@ -20,6 +20,7 @@ import com.amazon.opendistroforelasticsearch.ad.TestHelpers; import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetectorExecutionInput; +import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetectorJob; import com.amazon.opendistroforelasticsearch.ad.model.AnomalyResult; import com.google.common.collect.ImmutableMap; import org.apache.http.entity.ContentType; @@ -27,6 +28,7 @@ import org.elasticsearch.client.Response; import org.elasticsearch.client.ResponseException; import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.builder.SearchSourceBuilder; @@ -339,7 +341,7 @@ public void testDeleteAnomalyDetectorWithRunningAdJob() throws Exception { null ); - assertEquals("Fail to start AD job", RestStatus.CREATED, restStatus(startAdJobResponse)); + assertEquals("Fail to start AD job", RestStatus.OK, restStatus(startAdJobResponse)); TestHelpers .assertFailWith( @@ -370,7 +372,7 @@ public void testUpdateAnomalyDetectorWithRunningAdJob() throws Exception { null ); - assertEquals("Fail to start AD job", RestStatus.CREATED, restStatus(startAdJobResponse)); + assertEquals("Fail to start AD job", RestStatus.OK, restStatus(startAdJobResponse)); String newDescription = randomAlphaOfLength(5); @@ -406,6 +408,31 @@ public void testUpdateAnomalyDetectorWithRunningAdJob() throws Exception { ); } + public void testGetDetectorWithAdJob() throws IOException { + AnomalyDetector detector = createRandomAnomalyDetector(true, false); + + Response startAdJobResponse = TestHelpers + .makeRequest( + client(), + "POST", + TestHelpers.AD_BASE_DETECTORS_URI + "/" + detector.getDetectorId() + "/_start", + ImmutableMap.of(), + "", + null + ); + + assertEquals("Fail to start AD job", RestStatus.OK, restStatus(startAdJobResponse)); + + ToXContentObject[] results = getAnomalyDetector(detector.getDetectorId(), true); + assertEquals("Incorrect Location header", detector, results[0]); + assertEquals("Incorrect detector job name", detector.getDetectorId(), ((AnomalyDetectorJob) results[1]).getName()); + assertTrue(((AnomalyDetectorJob) results[1]).isEnabled()); + + results = getAnomalyDetector(detector.getDetectorId(), false); + assertEquals("Incorrect Location header", detector, results[0]); + assertEquals("Should not return detector job", null, results[1]); + } + public void testStartAdJobWithExistingDetector() throws Exception { AnomalyDetector detector = createRandomAnomalyDetector(true, false); @@ -419,7 +446,19 @@ public void testStartAdJobWithExistingDetector() throws Exception { null ); - assertEquals("Fail to start AD job", RestStatus.CREATED, restStatus(startAdJobResponse)); + assertEquals("Fail to start AD job", RestStatus.OK, restStatus(startAdJobResponse)); + + startAdJobResponse = TestHelpers + .makeRequest( + client(), + "POST", + TestHelpers.AD_BASE_DETECTORS_URI + "/" + detector.getDetectorId() + "/_start", + ImmutableMap.of(), + "", + null + ); + + assertEquals("Fail to start AD job", RestStatus.OK, restStatus(startAdJobResponse)); } public void testStartAdJobWithNonexistingDetectorIndex() throws Exception { @@ -468,7 +507,7 @@ public void testStopAdJob() throws Exception { "", null ); - assertEquals("Fail to start AD job", RestStatus.CREATED, restStatus(startAdJobResponse)); + assertEquals("Fail to start AD job", RestStatus.OK, restStatus(startAdJobResponse)); Response stopAdJobResponse = TestHelpers .makeRequest( @@ -480,6 +519,17 @@ public void testStopAdJob() throws Exception { null ); assertEquals("Fail to stop AD job", RestStatus.OK, restStatus(stopAdJobResponse)); + + stopAdJobResponse = TestHelpers + .makeRequest( + client(), + "POST", + TestHelpers.AD_BASE_DETECTORS_URI + "/" + detector.getDetectorId() + "/_stop", + ImmutableMap.of(), + "", + null + ); + assertEquals("Fail to stop AD job", RestStatus.OK, restStatus(stopAdJobResponse)); } public void testStopNonExistingAdJobIndex() throws Exception { @@ -510,12 +560,12 @@ public void testStopNonExistingAdJob() throws Exception { "", null ); - assertEquals("Fail to start AD job", RestStatus.CREATED, restStatus(startAdJobResponse)); + assertEquals("Fail to start AD job", RestStatus.OK, restStatus(startAdJobResponse)); TestHelpers .assertFailWith( ResponseException.class, - "Failed to stop AD job", + "Anomaly detector job not exist", () -> TestHelpers .makeRequest( client(), @@ -528,4 +578,41 @@ public void testStopNonExistingAdJob() throws Exception { ); } + public void testStartDisabledAdjob() throws IOException { + AnomalyDetector detector = createRandomAnomalyDetector(true, false); + Response startAdJobResponse = TestHelpers + .makeRequest( + client(), + "POST", + TestHelpers.AD_BASE_DETECTORS_URI + "/" + detector.getDetectorId() + "/_start", + ImmutableMap.of(), + "", + null + ); + assertEquals("Fail to start AD job", RestStatus.OK, restStatus(startAdJobResponse)); + + Response stopAdJobResponse = TestHelpers + .makeRequest( + client(), + "POST", + TestHelpers.AD_BASE_DETECTORS_URI + "/" + detector.getDetectorId() + "/_stop", + ImmutableMap.of(), + "", + null + ); + assertEquals("Fail to stop AD job", RestStatus.OK, restStatus(stopAdJobResponse)); + + startAdJobResponse = TestHelpers + .makeRequest( + client(), + "POST", + TestHelpers.AD_BASE_DETECTORS_URI + "/" + detector.getDetectorId() + "/_start", + ImmutableMap.of(), + "", + null + ); + + assertEquals("Fail to start AD job", RestStatus.OK, restStatus(startAdJobResponse)); + } + } diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/util/RestHandlerUtilsTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/util/RestHandlerUtilsTests.java index 959302e6..a1304737 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/util/RestHandlerUtilsTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/util/RestHandlerUtilsTests.java @@ -17,12 +17,21 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.rest.RestChannel; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.search.fetch.subphase.FetchSourceContext; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.rest.FakeRestChannel; import org.elasticsearch.test.rest.FakeRestRequest; +import java.io.IOException; + +import static com.amazon.opendistroforelasticsearch.ad.TestHelpers.builder; + public class RestHandlerUtilsTests extends ESTestCase { public void testGetSourceContext() { @@ -38,4 +47,12 @@ public void testGetSourceContextForKibana() { assertNull(context); } + public void testCreateXContentParser() throws IOException { + RestRequest request = new FakeRestRequest(); + RestChannel channel = new FakeRestChannel(request, false, 1); + XContentBuilder builder = builder().startObject().field("test", "value").endObject(); + BytesReference bytesReference = BytesReference.bytes(builder); + XContentParser parser = RestHandlerUtils.createXContentParser(channel, bytesReference); + parser.close(); + } } From 7090b9e96ef11b27a19158ab2aa4c9d62e8a4a68 Mon Sep 17 00:00:00 2001 From: Yaliang Wu Date: Tue, 3 Mar 2020 18:11:06 -0800 Subject: [PATCH 2/4] fix comments --- build.gradle | 1 - .../ad/rest/handler/IndexAnomalyDetectorJobActionHandler.java | 1 + 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index 94117ebf..15f6b18b 100644 --- a/build.gradle +++ b/build.gradle @@ -15,7 +15,6 @@ buildscript { ext { - es_mv = '7.2' es_group = "org.elasticsearch" es_version = '7.4.2' es_distribution = 'oss-zip' diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/handler/IndexAnomalyDetectorJobActionHandler.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/handler/IndexAnomalyDetectorJobActionHandler.java index 516286f2..0eaf0ac7 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/handler/IndexAnomalyDetectorJobActionHandler.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/handler/IndexAnomalyDetectorJobActionHandler.java @@ -232,6 +232,7 @@ private void indexAnomalyDetectorJob(AnomalyDetectorJob job, AnomalyDetectorFunc private void onIndexAnomalyDetectorJobResponse(IndexResponse response, AnomalyDetectorFunction function) throws IOException { if (response.getShardInfo().getSuccessful() < 1) { channel.sendResponse(new BytesRestResponse(response.status(), response.toXContent(channel.newErrorBuilder(), EMPTY_PARAMS))); + return; } if (function != null) { function.execute(); From 8a50ffd8f43893f7880a2d18ee5c9975afdd92f2 Mon Sep 17 00:00:00 2001 From: Yaliang Wu Date: Wed, 4 Mar 2020 15:30:35 -0800 Subject: [PATCH 3/4] handle null index response --- .../ad/rest/handler/IndexAnomalyDetectorJobActionHandler.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/handler/IndexAnomalyDetectorJobActionHandler.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/handler/IndexAnomalyDetectorJobActionHandler.java index 0eaf0ac7..2e94d614 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/handler/IndexAnomalyDetectorJobActionHandler.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/handler/IndexAnomalyDetectorJobActionHandler.java @@ -50,6 +50,8 @@ import static com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector.ANOMALY_DETECTORS_INDEX; import static com.amazon.opendistroforelasticsearch.ad.util.RestHandlerUtils.XCONTENT_WITH_TYPE; import static com.amazon.opendistroforelasticsearch.ad.util.RestHandlerUtils.createXContentParser; +import static org.elasticsearch.action.DocWriteResponse.Result.CREATED; +import static org.elasticsearch.action.DocWriteResponse.Result.UPDATED; import static org.elasticsearch.common.xcontent.ToXContent.EMPTY_PARAMS; import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken; @@ -230,7 +232,7 @@ private void indexAnomalyDetectorJob(AnomalyDetectorJob job, AnomalyDetectorFunc } private void onIndexAnomalyDetectorJobResponse(IndexResponse response, AnomalyDetectorFunction function) throws IOException { - if (response.getShardInfo().getSuccessful() < 1) { + if (response == null || (response.getResult() != CREATED && response.getResult() != UPDATED)) { channel.sendResponse(new BytesRestResponse(response.status(), response.toXContent(channel.newErrorBuilder(), EMPTY_PARAMS))); return; } From b5e8396af18f44519f877abb35b9b71757dffe16 Mon Sep 17 00:00:00 2001 From: Yaliang Wu Date: Thu, 5 Mar 2020 20:27:00 -0800 Subject: [PATCH 4/4] add comment on ad function interface --- .../ad/rest/handler/AnomalyDetectorFunction.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/handler/AnomalyDetectorFunction.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/handler/AnomalyDetectorFunction.java index 1e9f494b..4f0d12fd 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/handler/AnomalyDetectorFunction.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/rest/handler/AnomalyDetectorFunction.java @@ -20,6 +20,8 @@ public interface AnomalyDetectorFunction { /** * Performs this operation. + * + * Notes: don't forget to send back response via channel if you process response with this method. */ void execute(); }