From 5823dec366591f17b1559658c0faa6b20d9c6390 Mon Sep 17 00:00:00 2001 From: Kaituo Li Date: Tue, 6 Jul 2021 14:51:31 -0700 Subject: [PATCH] Fix bug on EndRunException We throw an EndRunException (endNow = true) whenever there is a SearchPhaseExecutionException. EndRunException (endNow = true) stops a detector immediately. But these query exceptions can also happen transiently during the starting phase of the OpenSearch process. To avoid stopping a detector on such a false positive, this PR sets the endNow parameter of these EndRunExceptions to false, so the detector is stopped only if the failure persists. Testing done: * created unit tests for related changes. --- build.gradle | 5 +- .../AnomalyResultTransportAction.java | 16 ++- .../EntityResultTransportAction.java | 4 - .../ad/transport/AnomalyResultTests.java | 9 -- .../EntityResultTransportActionTests.java | 1 - .../ad/transport/MultiEntityResultTests.java | 100 +++++++++++++++++- 6 files changed, 109 insertions(+), 26 deletions(-) diff --git a/build.gradle b/build.gradle index b1be11e14..6a6761fd4 100644 --- a/build.gradle +++ b/build.gradle @@ -329,7 +329,7 @@ List jacocoExclusions = [ 'org.opensearch.ad.util.ThrowingSupplierWrapper', 'org.opensearch.ad.transport.EntityResultTransportAction', 'org.opensearch.ad.transport.EntityResultTransportAction.*', - 'org.opensearch.ad.transport.AnomalyResultTransportAction.*', + //'org.opensearch.ad.transport.AnomalyResultTransportAction.*', 'org.opensearch.ad.transport.ProfileNodeResponse', 'org.opensearch.ad.transport.ADResultBulkResponse', 'org.opensearch.ad.transport.AggregationType', @@ -338,10 +338,9 @@ List jacocoExclusions = [ 'org.opensearch.ad.util.BulkUtil', 'org.opensearch.ad.util.ExceptionUtil', 'org.opensearch.ad.feature.SearchFeatureDao', - 'org.opensearch.ad.feature.CompositeRetriever.*', + //'org.opensearch.ad.feature.CompositeRetriever.*', 'org.opensearch.ad.feature.ScriptMaker', 'org.opensearch.ad.ml.EntityModel', - 'org.opensearch.ad.caching.PriorityCache', ] jacocoTestCoverageVerification { diff --git 
a/src/main/java/org/opensearch/ad/transport/AnomalyResultTransportAction.java b/src/main/java/org/opensearch/ad/transport/AnomalyResultTransportAction.java index 059047ead..521c05e0d 100644 --- a/src/main/java/org/opensearch/ad/transport/AnomalyResultTransportAction.java +++ b/src/main/java/org/opensearch/ad/transport/AnomalyResultTransportAction.java @@ -231,6 +231,7 @@ public AnomalyResultTransportAction( * + cold start cannot succeed * + unknown prediction error * + memory circuit breaker tripped + * + invalid search query * * Known causes of EndRunException with endNow returning true: * + a model partition's memory size reached limit @@ -238,7 +239,7 @@ public AnomalyResultTransportAction( * + Having trouble querying feature data due to * * index does not exist * * all features have been disabled - * * invalid search query + * * + anomaly detector is not available * + AD plugin is disabled * + training data is invalid due to serious internal bug(s) @@ -456,7 +457,7 @@ private ActionListener> onGetDetector( } catch (Exception e) { listener .onFailure( - new EndRunException(anomalyDetector.getDetectorId(), CommonErrorMessages.INVALID_SEARCH_QUERY_MSG, e, true) + new EndRunException(anomalyDetector.getDetectorId(), CommonErrorMessages.INVALID_SEARCH_QUERY_MSG, e, false) ); return; } @@ -622,21 +623,26 @@ private void handleQueryFailure(Exception exception, ActionListener> statsMap = new HashMap>() { { diff --git a/src/test/java/org/opensearch/ad/transport/EntityResultTransportActionTests.java b/src/test/java/org/opensearch/ad/transport/EntityResultTransportActionTests.java index a49480a3b..c74ab847c 100644 --- a/src/test/java/org/opensearch/ad/transport/EntityResultTransportActionTests.java +++ b/src/test/java/org/opensearch/ad/transport/EntityResultTransportActionTests.java @@ -256,7 +256,6 @@ public void setUp() throws Exception { indexUtil, resultWriteQueue, checkpointReadQueue, - coldStarter, coldEntityQueue, threadPool ); diff --git 
a/src/test/java/org/opensearch/ad/transport/MultiEntityResultTests.java b/src/test/java/org/opensearch/ad/transport/MultiEntityResultTests.java index 5ff455941..c35ab699e 100644 --- a/src/test/java/org/opensearch/ad/transport/MultiEntityResultTests.java +++ b/src/test/java/org/opensearch/ad/transport/MultiEntityResultTests.java @@ -26,6 +26,7 @@ package org.opensearch.ad.transport; +import static org.hamcrest.Matchers.containsString; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doAnswer; @@ -37,6 +38,7 @@ import java.io.IOException; import java.time.Clock; import java.time.Instant; +import java.time.temporal.ChronoUnit; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -44,6 +46,8 @@ import java.util.NoSuchElementException; import java.util.Optional; import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.junit.After; import org.junit.AfterClass; @@ -51,6 +55,11 @@ import org.junit.BeforeClass; import org.opensearch.Version; import org.opensearch.action.ActionListener; +import org.opensearch.action.get.GetRequest; +import org.opensearch.action.get.GetResponse; +import org.opensearch.action.search.SearchPhaseExecutionException; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.action.search.ShardSearchFailure; import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.PlainActionFuture; import org.opensearch.action.support.master.AcknowledgedResponse; @@ -71,6 +80,7 @@ import org.opensearch.ad.ml.ModelPartitioner; import org.opensearch.ad.ml.ThresholdingResult; import org.opensearch.ad.model.AnomalyDetector; +import org.opensearch.ad.model.IntervalTimeConfiguration; import org.opensearch.ad.ratelimit.CheckpointReadWorker; import org.opensearch.ad.ratelimit.ColdEntityWorker; import org.opensearch.ad.ratelimit.ResultWriteWorker; 
@@ -92,7 +102,6 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.ThreadContext; -import org.opensearch.common.xcontent.NamedXContentRegistry; import org.opensearch.test.ClusterServiceUtils; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.ThreadPool; @@ -169,7 +178,8 @@ public void setUp() throws Exception { settings = Settings.builder().put(AnomalyDetectorSettings.COOLDOWN_MINUTES.getKey(), TimeValue.timeValueMinutes(5)).build(); - request = new AnomalyResultRequest(detectorId, 100, 200); + // make sure end time is larger enough than Clock.systemUTC().millis() to get PageIterator.hasNext() to pass + request = new AnomalyResultRequest(detectorId, 100, Clock.systemUTC().millis() + 100_000); transportService = mock(TransportService.class); @@ -234,7 +244,7 @@ public void setUp() throws Exception { adCircuitBreakerService, adStats, mockThreadPool, - NamedXContentRegistry.EMPTY + xContentRegistry() ); provider = mock(CacheProvider.class); @@ -339,11 +349,93 @@ private void setUpEntityResult() { indexUtil, resultWriteQueue, checkpointReadQueue, - coldStarer, coldEntityQueue, threadPool ); when(normalModelManager.score(any(), anyString(), any())).thenReturn(new ThresholdingResult(0, 1, 1)); } + + /** + * Test query error causes EndRunException but not end now + * @throws InterruptedException when the await are interrupted + * @throws IOException when failing to create anomaly detector + */ + @SuppressWarnings("unchecked") + public void testQueryErrorEndRunNotNow() throws InterruptedException, IOException { + ClientUtil clientUtil = mock(ClientUtil.class); + + AnomalyDetector detector = TestHelpers + .randomAnomalyDetectorWithInterval(new IntervalTimeConfiguration(1, ChronoUnit.MINUTES), true, true); + doAnswer(invocation -> { + ActionListener listener = invocation.getArgument(2); + listener.onResponse(TestHelpers.createGetResponse(detector, 
detectorId, AnomalyDetector.ANOMALY_DETECTORS_INDEX)); + return null; + }).when(clientUtil).asyncRequest(any(GetRequest.class), any(), any(ActionListener.class)); + + ModelPartitioner modelPartitioner = mock(ModelPartitioner.class); + stateManager = new NodeStateManager( + client, + xContentRegistry(), + settings, + clientUtil, + clock, + AnomalyDetectorSettings.HOURLY_MAINTENANCE, + modelPartitioner + ); + + action = new AnomalyResultTransportAction( + new ActionFilters(Collections.emptySet()), + transportService, + settings, + client, + stateManager, + featureQuery, + normalModelManager, + normalModelPartitioner, + hashRing, + clusterService, + indexNameResolver, + adCircuitBreakerService, + adStats, + mockThreadPool, + xContentRegistry() + ); + + final CountDownLatch inProgressLatch = new CountDownLatch(1); + + String allShardsFailedMsg = "all shards failed"; + // make PageIterator.next return failure + doAnswer(invocation -> { + ActionListener listener = invocation.getArgument(1); + listener + .onFailure( + new SearchPhaseExecutionException( + "search", + allShardsFailedMsg, + new ShardSearchFailure[] { new ShardSearchFailure(new IllegalArgumentException("blah")) } + ) + ); + inProgressLatch.countDown(); + return null; + }).when(client).search(any(), any()); + + PlainActionFuture listener = new PlainActionFuture<>(); + + action.doExecute(null, request, listener); + + AnomalyResultResponse response = listener.actionGet(10000L); + assertEquals(Double.NaN, response.getAnomalyGrade(), 0.001); + + assertTrue(inProgressLatch.await(10000L, TimeUnit.MILLISECONDS)); + + PlainActionFuture listener2 = new PlainActionFuture<>(); + action.doExecute(null, request, listener2); + Exception e = expectThrows(EndRunException.class, () -> listener2.actionGet(10000L)); + // wrapped INVALID_SEARCH_QUERY_MSG around SearchPhaseExecutionException by convertedQueryFailureException + assertThat("actual message: " + e.getMessage(), e.getMessage(), 
containsString(CommonErrorMessages.INVALID_SEARCH_QUERY_MSG)); + assertThat("actual message: " + e.getMessage(), e.getMessage(), containsString(allShardsFailedMsg)); + // not end now + assertTrue(!((EndRunException) e).isEndNow()); + } }