diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/EntityResultAction.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/EntityResultAction.java new file mode 100644 index 00000000..c85c5384 --- /dev/null +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/EntityResultAction.java @@ -0,0 +1,29 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.ad.transport; + +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.support.master.AcknowledgedResponse; + +public class EntityResultAction extends ActionType { + public static final EntityResultAction INSTANCE = new EntityResultAction(); + public static final String NAME = "cluster:admin/opendistro/ad/entity/result"; + + private EntityResultAction() { + super(NAME, AcknowledgedResponse::new); + } + +} diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/EntityResultRequest.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/EntityResultRequest.java new file mode 100644 index 00000000..1057a190 --- /dev/null +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/EntityResultRequest.java @@ -0,0 +1,110 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.ad.transport; + +import static org.elasticsearch.action.ValidateActions.addValidationError; + +import java.io.IOException; +import java.util.Locale; +import java.util.Map; + +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; + +import com.amazon.opendistroforelasticsearch.ad.constant.CommonErrorMessages; +import com.amazon.opendistroforelasticsearch.ad.constant.CommonMessageAttributes; + +public class EntityResultRequest extends ActionRequest implements ToXContentObject { + + private String detectorId; + private Map entities; + private long start; + private long end; + + public EntityResultRequest(StreamInput in) throws IOException { + super(in); + this.detectorId = in.readString(); + this.entities = in.readMap(StreamInput::readString, StreamInput::readDoubleArray); + this.start = in.readLong(); + this.end = in.readLong(); + } + + public EntityResultRequest(String detectorId, Map entities, long start, long end) { + super(); + this.detectorId = detectorId; + this.entities = entities; + this.start = start; + this.end = end; + } + + public String getDetectorId() { + return this.detectorId; + } + + public Map getEntities() { + return this.entities; + } + + public long getStart() { + return this.start; + } + + public long getEnd() { + return this.end; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(this.detectorId); + out.writeMap(this.entities, StreamOutput::writeString, StreamOutput::writeDoubleArray); + out.writeLong(this.start); + out.writeLong(this.end); + } + + @Override + public ActionRequestValidationException validate() { + ActionRequestValidationException validationException = null; + if (Strings.isEmpty(detectorId)) { + validationException = addValidationError(CommonErrorMessages.AD_ID_MISSING_MSG, validationException); + } + if (start <= 0 || end <= 0 || start > end) { + validationException = addValidationError( + String.format(Locale.ROOT, "%s: start %d, end %d", CommonErrorMessages.INVALID_TIMESTAMP_ERR_MSG, start, end), + validationException + ); + } + return validationException; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(CommonMessageAttributes.ID_JSON_KEY, detectorId); + builder.field(CommonMessageAttributes.START_JSON_KEY, start); + builder.field(CommonMessageAttributes.END_JSON_KEY, end); + for (String entity : entities.keySet()) { + builder.field(entity, entities.get(entity)); + } + builder.endObject(); + return builder; + } +} diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/EntityResultTransportAction.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/EntityResultTransportAction.java new file mode 100644 index 00000000..4db5c0d0 --- /dev/null +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/transport/EntityResultTransportAction.java @@ -0,0 +1,192 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.ad.transport; + +import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.COOLDOWN_MINUTES; + +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.util.Arrays; +import java.util.Map.Entry; +import java.util.Optional; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.transport.TransportService; + +import com.amazon.opendistroforelasticsearch.ad.NodeStateManager; +import com.amazon.opendistroforelasticsearch.ad.breaker.ADCircuitBreakerService; +import com.amazon.opendistroforelasticsearch.ad.caching.CacheProvider; +import com.amazon.opendistroforelasticsearch.ad.caching.EntityCache; +import com.amazon.opendistroforelasticsearch.ad.common.exception.EndRunException; +import com.amazon.opendistroforelasticsearch.ad.common.exception.LimitExceededException; +import com.amazon.opendistroforelasticsearch.ad.constant.CommonErrorMessages; +import com.amazon.opendistroforelasticsearch.ad.ml.CheckpointDao; +import com.amazon.opendistroforelasticsearch.ad.ml.EntityModel; +import com.amazon.opendistroforelasticsearch.ad.ml.ModelManager; +import com.amazon.opendistroforelasticsearch.ad.ml.ModelState; +import com.amazon.opendistroforelasticsearch.ad.ml.ThresholdingResult; +import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; +import com.amazon.opendistroforelasticsearch.ad.model.AnomalyResult; +import com.amazon.opendistroforelasticsearch.ad.model.Entity; +import com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings; +import com.amazon.opendistroforelasticsearch.ad.transport.handler.MultitiEntityResultHandler; +import com.amazon.opendistroforelasticsearch.ad.util.ParseUtils; + +public class EntityResultTransportAction extends HandledTransportAction { + + private static final Logger LOG = LogManager.getLogger(EntityResultTransportAction.class); + private ModelManager manager; + private ADCircuitBreakerService adCircuitBreakerService; + private MultitiEntityResultHandler anomalyResultHandler; + private CheckpointDao checkpointDao; + private EntityCache cache; + private final NodeStateManager stateManager; + private final int coolDownMinutes; + private final Clock clock; + + @Inject + public EntityResultTransportAction( + ActionFilters actionFilters, + TransportService transportService, + ModelManager manager, + ADCircuitBreakerService adCircuitBreakerService, + MultitiEntityResultHandler anomalyResultHandler, + CheckpointDao checkpointDao, + CacheProvider entityCache, + NodeStateManager stateManager, + Settings settings, + Clock clock + ) { + super(EntityResultAction.NAME, transportService, actionFilters, EntityResultRequest::new); + this.manager = manager; + this.adCircuitBreakerService = adCircuitBreakerService; + this.anomalyResultHandler = anomalyResultHandler; + this.checkpointDao = checkpointDao; + this.cache = entityCache; + this.stateManager = stateManager; + this.coolDownMinutes = (int) (COOLDOWN_MINUTES.get(settings).getMinutes()); + this.clock = clock; + } + + @Override + protected void doExecute(Task task, EntityResultRequest request, ActionListener listener) { + if (adCircuitBreakerService.isOpen()) { + listener.onFailure(new LimitExceededException(request.getDetectorId(), CommonErrorMessages.MEMORY_CIRCUIT_BROKEN_ERR_MSG)); + return; + } + + try { + String detectorId = request.getDetectorId(); + stateManager.getAnomalyDetector(detectorId, onGetDetector(listener, detectorId, request)); + } catch (Exception exception) { + LOG.error("fail to get entity's anomaly grade", exception); + listener.onFailure(exception); + } + + } + + private ActionListener> onGetDetector( + ActionListener listener, + String detectorId, + EntityResultRequest request + ) { + return ActionListener.wrap(detectorOptional -> { + if (!detectorOptional.isPresent()) { + listener.onFailure(new EndRunException(detectorId, "AnomalyDetector is not available.", true)); + return; + } + + AnomalyDetector detector = detectorOptional.get(); + // we only support 1 categorical field now + String categoricalField = detector.getCategoryField().get(0); + + ADResultBulkRequest currentBulkRequest = new ADResultBulkRequest(); + // index pressure is high. Only save anomalies + boolean onlySaveAnomalies = stateManager + .getLastIndexThrottledTime() + .plus(Duration.ofMinutes(coolDownMinutes)) + .isAfter(clock.instant()); + + Instant executionStartTime = Instant.now(); + for (Entry entity : request.getEntities().entrySet()) { + String entityName = entity.getKey(); + // For ES, the limit of the document ID is 512 bytes. + // skip an entity if the entity's name is more than 256 characters + // since we are using it as part of document id. + if (entityName.length() > AnomalyDetectorSettings.MAX_ENTITY_LENGTH) { + continue; + } + + double[] datapoint = entity.getValue(); + String modelId = manager.getEntityModelId(detectorId, entityName); + ModelState entityModel = cache.get(modelId, detector, datapoint, entityName); + if (entityModel == null) { + // cache miss + continue; + } + ThresholdingResult result = manager.getAnomalyResultForEntity(detectorId, datapoint, entityName, entityModel, modelId); + // result.getRcfScore() = 0 means the model is not initialized + // result.getGrade() = 0 means it is not an anomaly + // So many EsRejectedExecutionException if we write no matter what + if (result.getRcfScore() > 0 && (!onlySaveAnomalies || result.getGrade() > 0)) { + this.anomalyResultHandler + .write( + new AnomalyResult( + detectorId, + result.getRcfScore(), + result.getGrade(), + result.getConfidence(), + ParseUtils.getFeatureData(datapoint, detector), + Instant.ofEpochMilli(request.getStart()), + Instant.ofEpochMilli(request.getEnd()), + executionStartTime, + Instant.now(), + null, + Arrays.asList(new Entity(categoricalField, entityName)) + ), + currentBulkRequest + ); + } + } + this.anomalyResultHandler.flush(currentBulkRequest, detectorId); + // bulk all accumulated checkpoint requests + this.checkpointDao.flush(); + + listener.onResponse(new AcknowledgedResponse(true)); + }, exception -> { + LOG.error( + new ParameterizedMessage( + "fail to get entity's anomaly grade for detector [{}]: start: [{}], end: [{}]", + detectorId, + request.getStart(), + request.getEnd() + ), + exception + ); + listener.onFailure(exception); + }); + } +} diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/AbstractADTest.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/AbstractADTest.java index ded7bffb..9d15ccd0 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/AbstractADTest.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/AbstractADTest.java @@ -16,6 +16,10 @@ package com.amazon.opendistroforelasticsearch.ad; import static org.hamcrest.Matchers.containsString; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import java.util.ArrayList; import java.util.Arrays; @@ -23,6 +27,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -34,8 +39,11 @@ import org.apache.logging.log4j.core.appender.AbstractAppender; import org.apache.logging.log4j.core.layout.PatternLayout; import org.apache.logging.log4j.util.StackLocatorUtil; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.cluster.metadata.AliasMetadata; +import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.NamedXContentRegistry; @@ -318,4 +326,40 @@ public HttpRequest releaseAndCopy() { }, null); } + + protected boolean areEqualWithArrayValue(Map first, Map second) { + if (first.size() != second.size()) { + return false; + } + + return first.entrySet().stream().allMatch(e -> Arrays.equals(e.getValue(), second.get(e.getKey()))); + } + + protected IndexMetadata indexMeta(String name, long creationDate, String... aliases) { + IndexMetadata.Builder builder = IndexMetadata + .builder(name) + .settings( + Settings + .builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 1) + .put("index.version.created", Version.CURRENT.id) + ); + builder.creationDate(creationDate); + for (String alias : aliases) { + builder.putAlias(AliasMetadata.builder(alias).build()); + } + return builder.build(); + } + + protected void setUpADThreadPool(ThreadPool mockThreadPool) { + ExecutorService executorService = mock(ExecutorService.class); + + when(mockThreadPool.executor(AnomalyDetectorPlugin.AD_THREAD_POOL_NAME)).thenReturn(executorService); + doAnswer(invocation -> { + Runnable runnable = invocation.getArgument(0); + runnable.run(); + return null; + }).when(executorService).execute(any(Runnable.class)); + } } diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/EntityResultTransportActionTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/EntityResultTransportActionTests.java new file mode 100644 index 00000000..1fd5acd9 --- /dev/null +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/EntityResultTransportActionTests.java @@ -0,0 +1,289 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.ad.transport; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.startsWith; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.time.Clock; +import java.time.Instant; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.transport.TransportService; +import org.junit.Before; + +import test.com.amazon.opendistroforelasticsearch.ad.util.JsonDeserializer; + +import com.amazon.opendistroforelasticsearch.ad.AbstractADTest; +import com.amazon.opendistroforelasticsearch.ad.NodeStateManager; +import com.amazon.opendistroforelasticsearch.ad.TestHelpers; +import com.amazon.opendistroforelasticsearch.ad.breaker.ADCircuitBreakerService; +import com.amazon.opendistroforelasticsearch.ad.caching.CacheProvider; +import com.amazon.opendistroforelasticsearch.ad.common.exception.EndRunException; +import com.amazon.opendistroforelasticsearch.ad.common.exception.JsonPathNotFoundException; +import com.amazon.opendistroforelasticsearch.ad.common.exception.LimitExceededException; +import com.amazon.opendistroforelasticsearch.ad.constant.CommonErrorMessages; +import com.amazon.opendistroforelasticsearch.ad.constant.CommonMessageAttributes; +import com.amazon.opendistroforelasticsearch.ad.ml.CheckpointDao; +import com.amazon.opendistroforelasticsearch.ad.ml.EntityModel; +import com.amazon.opendistroforelasticsearch.ad.ml.ModelManager; +import com.amazon.opendistroforelasticsearch.ad.ml.ModelState; +import com.amazon.opendistroforelasticsearch.ad.ml.ThresholdingResult; +import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; +import com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings; +import com.amazon.opendistroforelasticsearch.ad.transport.handler.MultitiEntityResultHandler; + +public class EntityResultTransportActionTests extends AbstractADTest { + EntityResultTransportAction entityResult; + ActionFilters actionFilters; + TransportService transportService; + ModelManager manager; + ADCircuitBreakerService adCircuitBreakerService; + MultitiEntityResultHandler anomalyResultHandler; + CheckpointDao checkpointDao; + CacheProvider entityCache; + NodeStateManager stateManager; + Settings settings; + Clock clock; + EntityResultRequest request; + String detectorId; + long timeoutMs; + AnomalyDetector detector; + String cacheMissEntity; + String cacheHitEntity; + long start; + long end; + Map entities; + double[] cacheMissData; + double[] cacheHitData; + String tooLongEntity; + double[] tooLongData; + + @SuppressWarnings("unchecked") + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + actionFilters = mock(ActionFilters.class); + transportService = mock(TransportService.class); + + adCircuitBreakerService = mock(ADCircuitBreakerService.class); + when(adCircuitBreakerService.isOpen()).thenReturn(false); + + anomalyResultHandler = mock(MultitiEntityResultHandler.class); + checkpointDao = mock(CheckpointDao.class); + + detectorId = "123"; + entities = new HashMap<>(); + + cacheMissEntity = "0.0.0.1"; + cacheMissData = new double[] { 0.1 }; + cacheHitEntity = "0.0.0.2"; + cacheHitData = new double[] { 0.2 }; + entities.put(cacheMissEntity, cacheMissData); + entities.put(cacheHitEntity, cacheHitData); + tooLongEntity = randomAlphaOfLength(AnomalyDetectorSettings.MAX_ENTITY_LENGTH + 1); + tooLongData = new double[] { 0.3 }; + entities.put(tooLongEntity, tooLongData); + start = 10L; + end = 20L; + request = new EntityResultRequest(detectorId, entities, start, end); + + manager = mock(ModelManager.class); + doAnswer(invocation -> { + Object[] args = invocation.getArguments(); + // return entity name + return args[1]; + }).when(manager).getEntityModelId(anyString(), anyString()); + when(manager.getAnomalyResultForEntity(anyString(), any(), anyString(), any(), anyString())) + .thenReturn(new ThresholdingResult(1, 1, 1)); + + entityCache = mock(CacheProvider.class); + when(entityCache.get(eq(cacheMissEntity), any(), any(), anyString())).thenReturn(null); + + ModelState state = mock(ModelState.class); + when(entityCache.get(eq(cacheHitEntity), any(), any(), anyString())).thenReturn(state); + + String field = "a"; + detector = TestHelpers.randomAnomalyDetectorUsingCategoryFields(detectorId, Arrays.asList(field)); + stateManager = mock(NodeStateManager.class); + doAnswer(invocation -> { + ActionListener> listener = invocation.getArgument(1); + listener.onResponse(Optional.of(detector)); + return null; + }).when(stateManager).getAnomalyDetector(any(String.class), any(ActionListener.class)); + when(stateManager.getLastIndexThrottledTime()).thenReturn(Instant.MIN); + + settings = Settings.builder().put(AnomalyDetectorSettings.COOLDOWN_MINUTES.getKey(), TimeValue.timeValueMinutes(5)).build(); + clock = mock(Clock.class); + when(clock.instant()).thenReturn(Instant.now()); + + entityResult = new EntityResultTransportAction( + actionFilters, + transportService, + manager, + adCircuitBreakerService, + anomalyResultHandler, + checkpointDao, + entityCache, + stateManager, + settings, + clock + ); + + // timeout in 60 seconds + timeoutMs = 60000L; + } + + public void testCircuitBreakerOpen() { + when(adCircuitBreakerService.isOpen()).thenReturn(true); + PlainActionFuture future = PlainActionFuture.newFuture(); + + entityResult.doExecute(null, request, future); + + expectThrows(LimitExceededException.class, () -> future.actionGet(timeoutMs)); + } + + public void testNormal() { + PlainActionFuture future = PlainActionFuture.newFuture(); + + entityResult.doExecute(null, request, future); + + future.actionGet(timeoutMs); + + verify(anomalyResultHandler, times(1)).write(any(), any()); + } + + // test get detector failure + @SuppressWarnings("unchecked") + public void testFailtoGetDetector() { + doAnswer(invocation -> { + ActionListener> listener = invocation.getArgument(1); + listener.onResponse(Optional.empty()); + return null; + }).when(stateManager).getAnomalyDetector(any(String.class), any(ActionListener.class)); + + PlainActionFuture future = PlainActionFuture.newFuture(); + + entityResult.doExecute(null, request, future); + + expectThrows(EndRunException.class, () -> future.actionGet(timeoutMs)); + } + + // test index pressure high, anomaly grade is 0 + public void testIndexPressureHigh() { + when(manager.getAnomalyResultForEntity(anyString(), any(), anyString(), any(), anyString())) + .thenReturn(new ThresholdingResult(0, 1, 1)); + when(stateManager.getLastIndexThrottledTime()).thenReturn(Instant.now()); + + PlainActionFuture future = PlainActionFuture.newFuture(); + + entityResult.doExecute(null, request, future); + + future.actionGet(timeoutMs); + + verify(anomalyResultHandler, never()).write(any(), any()); + } + + // test rcf score is 0 + public void testNotInitialized() { + when(manager.getAnomalyResultForEntity(anyString(), any(), anyString(), any(), anyString())) + .thenReturn(new ThresholdingResult(0, 0, 0)); + + PlainActionFuture future = PlainActionFuture.newFuture(); + + entityResult.doExecute(null, request, future); + + future.actionGet(timeoutMs); + + verify(anomalyResultHandler, never()).write(any(), any()); + } + + public void testSerialzationRequest() throws IOException { + BytesStreamOutput output = new BytesStreamOutput(); + request.writeTo(output); + + StreamInput streamInput = output.bytes().streamInput(); + EntityResultRequest readRequest = new EntityResultRequest(streamInput); + assertThat(detectorId, equalTo(readRequest.getDetectorId())); + assertThat(start, equalTo(readRequest.getStart())); + assertThat(end, equalTo(readRequest.getEnd())); + assertTrue(areEqualWithArrayValue(entities, readRequest.getEntities())); + } + + public void testValidRequest() { + ActionRequestValidationException e = request.validate(); + assertThat(e, equalTo(null)); + } + + public void testEmptyId() { + request = new EntityResultRequest("", entities, start, end); + ActionRequestValidationException e = request.validate(); + assertThat(e.validationErrors(), hasItem(CommonErrorMessages.AD_ID_MISSING_MSG)); + } + + public void testReverseTime() { + request = new EntityResultRequest(detectorId, entities, end, start); + ActionRequestValidationException e = request.validate(); + assertThat(e.validationErrors(), hasItem(startsWith(CommonErrorMessages.INVALID_TIMESTAMP_ERR_MSG))); + } + + public void testNegativeTime() { + request = new EntityResultRequest(detectorId, entities, start, -end); + ActionRequestValidationException e = request.validate(); + assertThat(e.validationErrors(), hasItem(startsWith(CommonErrorMessages.INVALID_TIMESTAMP_ERR_MSG))); + } + + public void testJsonResponse() throws IOException, JsonPathNotFoundException { + XContentBuilder builder = jsonBuilder(); + request.toXContent(builder, ToXContent.EMPTY_PARAMS); + + String json = Strings.toString(builder); + assertEquals(JsonDeserializer.getTextValue(json, CommonMessageAttributes.ID_JSON_KEY), detectorId); + assertEquals(JsonDeserializer.getLongValue(json, CommonMessageAttributes.START_JSON_KEY), start); + assertEquals(JsonDeserializer.getLongValue(json, CommonMessageAttributes.END_JSON_KEY), end); + assertEquals(0, Double.compare(JsonDeserializer.getArrayValue(json, cacheMissEntity).get(0).getAsDouble(), cacheMissData[0])); + assertEquals(0, Double.compare(JsonDeserializer.getArrayValue(json, cacheHitEntity).get(0).getAsDouble(), cacheHitData[0])); + assertEquals(0, Double.compare(JsonDeserializer.getArrayValue(json, tooLongEntity).get(0).getAsDouble(), tooLongData[0])); + } +}