Skip to content

Commit

Permalink
Fix bug on EndRunException
Browse files Browse the repository at this point in the history
We used to throw an EndRunException (endNow = true) whenever there was a SearchPhaseExecutionException. An EndRunException with endNow = true stops a detector immediately. But these query exceptions can happen during the starting phase of the OpenSearch process, so such an EndRunException can be a false positive. This PR fixes that by setting the endNow parameter of these EndRunExceptions to false.

Testing done:
* created unit tests for related changes.
  • Loading branch information
kaituo committed Jul 6, 2021
1 parent 43d46dc commit 5823dec
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 26 deletions.
5 changes: 2 additions & 3 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ List<String> jacocoExclusions = [
'org.opensearch.ad.util.ThrowingSupplierWrapper',
'org.opensearch.ad.transport.EntityResultTransportAction',
'org.opensearch.ad.transport.EntityResultTransportAction.*',
'org.opensearch.ad.transport.AnomalyResultTransportAction.*',
//'org.opensearch.ad.transport.AnomalyResultTransportAction.*',
'org.opensearch.ad.transport.ProfileNodeResponse',
'org.opensearch.ad.transport.ADResultBulkResponse',
'org.opensearch.ad.transport.AggregationType',
Expand All @@ -338,10 +338,9 @@ List<String> jacocoExclusions = [
'org.opensearch.ad.util.BulkUtil',
'org.opensearch.ad.util.ExceptionUtil',
'org.opensearch.ad.feature.SearchFeatureDao',
'org.opensearch.ad.feature.CompositeRetriever.*',
//'org.opensearch.ad.feature.CompositeRetriever.*',
'org.opensearch.ad.feature.ScriptMaker',
'org.opensearch.ad.ml.EntityModel',
'org.opensearch.ad.caching.PriorityCache',
]

jacocoTestCoverageVerification {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,14 +231,15 @@ public AnomalyResultTransportAction(
* + cold start cannot succeed
* + unknown prediction error
* + memory circuit breaker tripped
* + invalid search query
*
* Known causes of EndRunException with endNow returning true:
* + a model partition's memory size reached limit
* + models' total memory size reached limit
* + Having trouble querying feature data due to
* * index does not exist
* * all features have been disabled
* * invalid search query
*
* + anomaly detector is not available
* + AD plugin is disabled
* + training data is invalid due to serious internal bug(s)
Expand Down Expand Up @@ -456,7 +457,7 @@ private ActionListener<Optional<AnomalyDetector>> onGetDetector(
} catch (Exception e) {
listener
.onFailure(
new EndRunException(anomalyDetector.getDetectorId(), CommonErrorMessages.INVALID_SEARCH_QUERY_MSG, e, true)
new EndRunException(anomalyDetector.getDetectorId(), CommonErrorMessages.INVALID_SEARCH_QUERY_MSG, e, false)
);
return;
}
Expand Down Expand Up @@ -622,21 +623,26 @@ private void handleQueryFailure(Exception exception, ActionListener<AnomalyResul

/**
* Convert a query related exception to EndRunException
*
 * These query exceptions can happen during the starting phase of the OpenSearch
 * process. Thus, we set the endNow parameter of these EndRunExceptions to false
 * so that the EndRunException is not treated as a confirmed (immediate-stop) failure.
*
* @param exception Exception
* @param adID detector Id
* @return the converted exception if the exception is query related
*/
private Exception convertedQueryFailureException(Exception exception, String adID) {
if (ExceptionUtil.isIndexNotAvailable(exception)) {
return new EndRunException(adID, TROUBLE_QUERYING_ERR_MSG + exception.getMessage(), true).countedInStats(false);
return new EndRunException(adID, TROUBLE_QUERYING_ERR_MSG + exception.getMessage(), false).countedInStats(false);
} else if (exception instanceof SearchPhaseExecutionException && invalidQuery((SearchPhaseExecutionException) exception)) {
// This is to catch invalid aggregation on wrong field type. For example,
// sum aggregation on text field. We should end detector run for such case.
return new EndRunException(
adID,
INVALID_SEARCH_QUERY_MSG + ((SearchPhaseExecutionException) exception).getDetailedMessage(),
INVALID_SEARCH_QUERY_MSG + " " + ((SearchPhaseExecutionException) exception).getDetailedMessage(),
exception,
true
false
).countedInStats(false);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import org.opensearch.ad.constant.CommonErrorMessages;
import org.opensearch.ad.indices.ADIndex;
import org.opensearch.ad.indices.AnomalyDetectionIndices;
import org.opensearch.ad.ml.EntityColdStarter;
import org.opensearch.ad.ml.EntityModel;
import org.opensearch.ad.ml.ModelManager;
import org.opensearch.ad.ml.ModelState;
Expand Down Expand Up @@ -102,7 +101,6 @@ public class EntityResultTransportAction extends HandledTransportAction<EntityRe
private AnomalyDetectionIndices indexUtil;
private ResultWriteWorker resultWriteQueue;
private CheckpointReadWorker checkpointReadQueue;
private EntityColdStarter coldStarter;
private ColdEntityWorker coldEntityQueue;
private ThreadPool threadPool;

Expand All @@ -117,7 +115,6 @@ public EntityResultTransportAction(
AnomalyDetectionIndices indexUtil,
ResultWriteWorker resultWriteQueue,
CheckpointReadWorker checkpointReadQueue,
EntityColdStarter coldStarer,
ColdEntityWorker coldEntityQueue,
ThreadPool threadPool
) {
Expand All @@ -129,7 +126,6 @@ public EntityResultTransportAction(
this.indexUtil = indexUtil;
this.resultWriteQueue = resultWriteQueue;
this.checkpointReadQueue = checkpointReadQueue;
this.coldStarter = coldStarer;
this.coldEntityQueue = coldEntityQueue;
this.threadPool = threadPool;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder;

import java.io.IOException;
import java.time.Clock;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -106,9 +105,6 @@
import org.opensearch.ad.stats.ADStats;
import org.opensearch.ad.stats.StatNames;
import org.opensearch.ad.stats.suppliers.CounterSupplier;
import org.opensearch.ad.util.ClientUtil;
import org.opensearch.ad.util.IndexUtils;
import org.opensearch.ad.util.Throttler;
import org.opensearch.client.Client;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.ClusterState;
Expand Down Expand Up @@ -274,11 +270,6 @@ public void setUp() throws Exception {
}).when(client).index(any(), any());

indexNameResolver = new IndexNameExpressionResolver(new ThreadContext(Settings.EMPTY));
Clock clock = mock(Clock.class);
Throttler throttler = new Throttler(clock);
ThreadPool threadpool = mock(ThreadPool.class);
ClientUtil clientUtil = new ClientUtil(Settings.EMPTY, client, throttler, threadpool);
IndexUtils indexUtils = new IndexUtils(client, clientUtil, clusterService, indexNameResolver);

Map<String, ADStat<?>> statsMap = new HashMap<String, ADStat<?>>() {
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,6 @@ public void setUp() throws Exception {
indexUtil,
resultWriteQueue,
checkpointReadQueue,
coldStarter,
coldEntityQueue,
threadPool
);
Expand Down
100 changes: 96 additions & 4 deletions src/test/java/org/opensearch/ad/transport/MultiEntityResultTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

package org.opensearch.ad.transport;

import static org.hamcrest.Matchers.containsString;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doAnswer;
Expand All @@ -37,20 +38,28 @@
import java.io.IOException;
import java.time.Clock;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.opensearch.Version;
import org.opensearch.action.ActionListener;
import org.opensearch.action.get.GetRequest;
import org.opensearch.action.get.GetResponse;
import org.opensearch.action.search.SearchPhaseExecutionException;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.search.ShardSearchFailure;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.action.support.master.AcknowledgedResponse;
Expand All @@ -71,6 +80,7 @@
import org.opensearch.ad.ml.ModelPartitioner;
import org.opensearch.ad.ml.ThresholdingResult;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.IntervalTimeConfiguration;
import org.opensearch.ad.ratelimit.CheckpointReadWorker;
import org.opensearch.ad.ratelimit.ColdEntityWorker;
import org.opensearch.ad.ratelimit.ResultWriteWorker;
Expand All @@ -92,7 +102,6 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.common.xcontent.NamedXContentRegistry;
import org.opensearch.test.ClusterServiceUtils;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -169,7 +178,8 @@ public void setUp() throws Exception {

settings = Settings.builder().put(AnomalyDetectorSettings.COOLDOWN_MINUTES.getKey(), TimeValue.timeValueMinutes(5)).build();

request = new AnomalyResultRequest(detectorId, 100, 200);
// make sure the end time is sufficiently larger than Clock.systemUTC().millis() so that PageIterator.hasNext() passes
request = new AnomalyResultRequest(detectorId, 100, Clock.systemUTC().millis() + 100_000);

transportService = mock(TransportService.class);

Expand Down Expand Up @@ -234,7 +244,7 @@ public void setUp() throws Exception {
adCircuitBreakerService,
adStats,
mockThreadPool,
NamedXContentRegistry.EMPTY
xContentRegistry()
);

provider = mock(CacheProvider.class);
Expand Down Expand Up @@ -339,11 +349,93 @@ private void setUpEntityResult() {
indexUtil,
resultWriteQueue,
checkpointReadQueue,
coldStarer,
coldEntityQueue,
threadPool
);

when(normalModelManager.score(any(), anyString(), any())).thenReturn(new ThresholdingResult(0, 1, 1));
}

/**
 * Test that a query error (a SearchPhaseExecutionException from an invalid
 * aggregation) is converted to an EndRunException whose endNow flag is false,
 * so the detector is not stopped immediately.
 * @throws InterruptedException when the await is interrupted
 * @throws IOException when failing to create anomaly detector
 */
@SuppressWarnings("unchecked")
public void testQueryErrorEndRunNotNow() throws InterruptedException, IOException {
ClientUtil clientUtil = mock(ClientUtil.class);

AnomalyDetector detector = TestHelpers
.randomAnomalyDetectorWithInterval(new IntervalTimeConfiguration(1, ChronoUnit.MINUTES), true, true);
// Stub the detector GET request so NodeStateManager can load the detector config.
doAnswer(invocation -> {
ActionListener<GetResponse> listener = invocation.getArgument(2);
listener.onResponse(TestHelpers.createGetResponse(detector, detectorId, AnomalyDetector.ANOMALY_DETECTORS_INDEX));
return null;
}).when(clientUtil).asyncRequest(any(GetRequest.class), any(), any(ActionListener.class));

// Use a real NodeStateManager (not a mock) so the exception recorded on the
// first run is surfaced as the failure of the second run.
ModelPartitioner modelPartitioner = mock(ModelPartitioner.class);
stateManager = new NodeStateManager(
client,
xContentRegistry(),
settings,
clientUtil,
clock,
AnomalyDetectorSettings.HOURLY_MAINTENANCE,
modelPartitioner
);

action = new AnomalyResultTransportAction(
new ActionFilters(Collections.emptySet()),
transportService,
settings,
client,
stateManager,
featureQuery,
normalModelManager,
normalModelPartitioner,
hashRing,
clusterService,
indexNameResolver,
adCircuitBreakerService,
adStats,
mockThreadPool,
xContentRegistry()
);

final CountDownLatch inProgressLatch = new CountDownLatch(1);

String allShardsFailedMsg = "all shards failed";
// make PageIterator.next return failure
doAnswer(invocation -> {
ActionListener<SearchResponse> listener = invocation.getArgument(1);
listener
.onFailure(
new SearchPhaseExecutionException(
"search",
allShardsFailedMsg,
new ShardSearchFailure[] { new ShardSearchFailure(new IllegalArgumentException("blah")) }
)
);
inProgressLatch.countDown();
return null;
}).when(client).search(any(), any());

PlainActionFuture<AnomalyResultResponse> listener = new PlainActionFuture<>();

action.doExecute(null, request, listener);

// The first run does not fail outright: the query failure is recorded in the
// node state and the response carries NaN (no anomaly grade computed).
AnomalyResultResponse response = listener.actionGet(10000L);
assertEquals(Double.NaN, response.getAnomalyGrade(), 0.001);

assertTrue(inProgressLatch.await(10000L, TimeUnit.MILLISECONDS));

// The second run surfaces the recorded exception as an EndRunException.
PlainActionFuture<AnomalyResultResponse> listener2 = new PlainActionFuture<>();
action.doExecute(null, request, listener2);
Exception e = expectThrows(EndRunException.class, () -> listener2.actionGet(10000L));
// convertedQueryFailureException wraps INVALID_SEARCH_QUERY_MSG around the SearchPhaseExecutionException
assertThat("actual message: " + e.getMessage(), e.getMessage(), containsString(CommonErrorMessages.INVALID_SEARCH_QUERY_MSG));
assertThat("actual message: " + e.getMessage(), e.getMessage(), containsString(allShardsFailedMsg));
// endNow must be false: the query error may be a startup-phase false positive
assertTrue(!((EndRunException) e).isEndNow());
}
}

0 comments on commit 5823dec

Please sign in to comment.