Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Not blocking detector creation on unknown feature validation error #1366

Merged
merged 4 commits into from
Nov 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ buildscript {
js_resource_folder = "src/test/resources/job-scheduler"
common_utils_version = System.getProperty("common_utils.version", opensearch_build)
job_scheduler_version = System.getProperty("job_scheduler.version", opensearch_build)
bwcVersionShort = "2.18.0"
bwcVersionShort = "2.19.0"
bwcVersion = bwcVersionShort + ".0"
bwcOpenSearchADDownload = 'https://ci.opensearch.org/ci/dbc/distribution-build-opensearch/' + bwcVersionShort + '/latest/linux/x64/tar/builds/' +
'opensearch/plugins/opensearch-anomaly-detection-' + bwcVersion + '.zip'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -890,6 +890,7 @@ protected void validateConfigFeatures(String id, boolean indexingDryRun, ActionL
feature.getId()
);
ssb.aggregation(internalAgg.getAggregatorFactories().iterator().next());
ssb.trackTotalHits(false);
SearchRequest searchRequest = new SearchRequest().indices(config.getIndices().toArray(new String[0])).source(ssb);
ActionListener<SearchResponse> searchResponseListener = ActionListener.wrap(response -> {
Optional<double[]> aggFeatureResult = searchFeatureDao.parseResponse(response, Arrays.asList(feature.getId()), false);
Expand All @@ -905,13 +906,19 @@ protected void validateConfigFeatures(String id, boolean indexingDryRun, ActionL
}
}, e -> {
String errorMessage;
if (isExceptionCausedByInvalidQuery(e)) {
if (isExceptionCausedByInvalidQuery(e) || e instanceof TimeSeriesException) {
errorMessage = CommonMessages.FEATURE_WITH_INVALID_QUERY_MSG + feature.getName();
logger.error(errorMessage, e);
multiFeatureQueriesResponseListener.onFailure(new OpenSearchStatusException(errorMessage, RestStatus.BAD_REQUEST, e));
} else {
errorMessage = CommonMessages.UNKNOWN_SEARCH_QUERY_EXCEPTION_MSG + feature.getName();
logger.error(errorMessage, e);
// If we see an unexpected error such as timeout or some task cancellation cause of search backpressure
// we don't want to block detector creation as this is unlikely an error due to wrong configs
// but we want to record what error was seen
owaiskazi19 marked this conversation as resolved.
Show resolved Hide resolved
multiFeatureQueriesResponseListener
.onResponse(new MergeableList<>(new ArrayList<>(Collections.singletonList(Optional.empty()))));
}
logger.error(errorMessage, e);
multiFeatureQueriesResponseListener.onFailure(new OpenSearchStatusException(errorMessage, RestStatus.BAD_REQUEST, e));
});
clientUtil.asyncRequestWithInjectedSecurity(searchRequest, client::search, user, client, context, searchResponseListener);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.action.ActionResponse;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.rest.RestRequest;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -207,7 +208,7 @@ public void testMoreThanTenThousandSingleEntityDetectors() throws IOException, I

// extend NodeClient since its execute method is final and mockito does not allow to mock final methods
// we can also use spy to overstep the final methods
NodeClient client = getCustomNodeClient(detectorResponse, userIndexResponse, detector, threadPool);
NodeClient client = getCustomNodeClient(detectorResponse, userIndexResponse, null, false, detector, threadPool);
NodeClient clientSpy = spy(client);
NodeStateManager nodeStateManager = mock(NodeStateManager.class);
clientUtil = new SecurityClientUtil(nodeStateManager, settings);
Expand Down Expand Up @@ -546,10 +547,14 @@ public void testUpdateTextField() throws IOException, InterruptedException {
public static NodeClient getCustomNodeClient(
SearchResponse detectorResponse,
SearchResponse userIndexResponse,
SearchResponse configInputIndicesResponse,
boolean useConfigInputIndicesResponse,
AnomalyDetector detector,
ThreadPool pool
) {
return new NodeClient(Settings.EMPTY, pool) {
private int searchCallCount = 0;

@Override
public <Request extends ActionRequest, Response extends ActionResponse> void doExecute(
ActionType<Response> action,
Expand All @@ -560,8 +565,19 @@ public <Request extends ActionRequest, Response extends ActionResponse> void doE
if (action.equals(SearchAction.INSTANCE)) {
assertTrue(request instanceof SearchRequest);
SearchRequest searchRequest = (SearchRequest) request;
searchCallCount++;
if (searchRequest.indices()[0].equals(CommonName.CONFIG_INDEX)) {
listener.onResponse((Response) detectorResponse);
} else if (useConfigInputIndicesResponse
&& Arrays.equals(searchRequest.indices(), detector.getIndices().toArray(new String[0]))
&& searchRequest.source().aggregations() == null) {
listener.onResponse((Response) configInputIndicesResponse);
// Call for feature validation occurs on the 3rd call and we want to make sure we supplied a response to the
// previous call.
} else if (searchCallCount == 3 && useConfigInputIndicesResponse) {
// This is the third search call, which should be for featureConfig and we want to replicate something like a
// timeout exception
listener.onFailure(new OpenSearchStatusException("timeout", RestStatus.BAD_REQUEST));
} else {
listener.onResponse((Response) userIndexResponse);
}
Expand Down Expand Up @@ -590,7 +606,7 @@ public void testMoreThanTenMultiEntityDetectors() throws IOException, Interrupte
when(userIndexResponse.getHits()).thenReturn(TestHelpers.createSearchHits(userIndexHits));
// extend NodeClient since its execute method is final and mockito does not allow to mock final methods
// we can also use spy to overstep the final methods
NodeClient client = getCustomNodeClient(detectorResponse, userIndexResponse, detector, threadPool);
NodeClient client = getCustomNodeClient(detectorResponse, userIndexResponse, null, false, detector, threadPool);
NodeClient clientSpy = spy(client);
NodeStateManager nodeStateManager = mock(NodeStateManager.class);
clientUtil = new SecurityClientUtil(nodeStateManager, settings);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.ad.indices.ADIndex;
import org.opensearch.ad.indices.ADIndexManagement;
Expand All @@ -54,6 +55,7 @@
import org.opensearch.timeseries.AbstractTimeSeriesTest;
import org.opensearch.timeseries.NodeStateManager;
import org.opensearch.timeseries.TestHelpers;
import org.opensearch.timeseries.common.exception.TimeSeriesException;
import org.opensearch.timeseries.common.exception.ValidationException;
import org.opensearch.timeseries.feature.SearchFeatureDao;
import org.opensearch.timeseries.model.ValidationAspect;
Expand Down Expand Up @@ -150,7 +152,7 @@ public void testValidateMoreThanThousandSingleEntityDetectorLimit() throws IOExc
// extend NodeClient since its execute method is final and mockito does not allow to mock final methods
// we can also use spy to overstep the final methods
NodeClient client = IndexAnomalyDetectorActionHandlerTests
.getCustomNodeClient(detectorResponse, userIndexResponse, singleEntityDetector, threadPool);
.getCustomNodeClient(detectorResponse, userIndexResponse, null, false, singleEntityDetector, threadPool);

NodeClient clientSpy = spy(client);
NodeStateManager nodeStateManager = mock(NodeStateManager.class);
Expand Down Expand Up @@ -208,7 +210,7 @@ public void testValidateMoreThanTenMultiEntityDetectorsLimit() throws IOExceptio
// extend NodeClient since its execute method is final and mockito does not allow to mock final methods
// we can also use spy to overstep the final methods
NodeClient client = IndexAnomalyDetectorActionHandlerTests
.getCustomNodeClient(detectorResponse, userIndexResponse, detector, threadPool);
.getCustomNodeClient(detectorResponse, userIndexResponse, null, false, detector, threadPool);
NodeClient clientSpy = spy(client);
NodeStateManager nodeStateManager = mock(NodeStateManager.class);
SecurityClientUtil clientUtil = new SecurityClientUtil(nodeStateManager, settings);
Expand Down Expand Up @@ -250,4 +252,60 @@ public void testValidateMoreThanTenMultiEntityDetectorsLimit() throws IOExceptio
assertTrue(inProgressLatch.await(100, TimeUnit.SECONDS));
verify(clientSpy, never()).execute(eq(GetMappingsAction.INSTANCE), any(), any());
}

// This test also validates that if we get a non timeseries exception or not an invalid query that we will not completely block
// detector creation, this is applicable like things when we get timeout not cause of AD configuration errors but because cluster
// is momentarily under utilized.
public void testValidateMoreThanTenMultiEntityDetectorsLimitDuplicateNameFailure() throws IOException, InterruptedException {
SearchResponse mockResponse = mock(SearchResponse.class);
int totalHits = 1;
owaiskazi19 marked this conversation as resolved.
Show resolved Hide resolved
when(mockResponse.getHits()).thenReturn(TestHelpers.createSearchHits(totalHits));
SearchResponse detectorResponse = mock(SearchResponse.class);
when(detectorResponse.getHits()).thenReturn(TestHelpers.createSearchHits(totalHits));
SearchResponse userIndexResponse = mock(SearchResponse.class);
when(userIndexResponse.getHits()).thenReturn(TestHelpers.createSearchHits(0));
AnomalyDetector singleEntityDetector = TestHelpers.randomAnomalyDetector(TestHelpers.randomUiMetadata(), null, true);

SearchResponse configInputIndicesResponse = mock(SearchResponse.class);
when(configInputIndicesResponse.getHits()).thenReturn(TestHelpers.createSearchHits(2));

// extend NodeClient since its execute method is final and mockito does not allow to mock final methods
// we can also use spy to overstep the final methods
NodeClient client = IndexAnomalyDetectorActionHandlerTests
.getCustomNodeClient(detectorResponse, userIndexResponse, configInputIndicesResponse, true, singleEntityDetector, threadPool);

NodeClient clientSpy = spy(client);
NodeStateManager nodeStateManager = mock(NodeStateManager.class);
SecurityClientUtil clientUtil = new SecurityClientUtil(nodeStateManager, settings);

handler = new ValidateAnomalyDetectorActionHandler(
clusterService,
clientSpy,
clientUtil,
anomalyDetectionIndices,
singleEntityDetector,
requestTimeout,
maxSingleEntityAnomalyDetectors,
maxMultiEntityAnomalyDetectors,
maxAnomalyFeatures,
maxCategoricalFields,
method,
xContentRegistry(),
null,
searchFeatureDao,
ValidationAspect.DETECTOR.getName(),
clock,
settings
);
PlainActionFuture<ValidateConfigResponse> future = PlainActionFuture.newFuture();
handler.start(future);
try {
future.actionGet(100, TimeUnit.SECONDS);
fail("should not reach here");
} catch (Exception e) {
assertTrue(e instanceof TimeSeriesException);
assertTrue(e.getMessage().contains("Cannot create anomaly detector with name"));
}
verify(clientSpy, never()).execute(eq(GetMappingsAction.INSTANCE), any(), any());
}
}
4 changes: 3 additions & 1 deletion src/test/java/org/opensearch/ad/rest/SecureADRestIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,9 @@ public void testCreateAnomalyDetectorWithCustomResultIndex() throws IOException
Assert
.assertTrue(
"got " + exception.getMessage(),
exception.getMessage().contains("no permissions for [indices:admin/aliases, indices:admin/create]")
exception.getMessage().contains("indices:admin/aliases")
&& exception.getMessage().contains("indices:admin/create")
&& exception.getMessage().contains("no permissions for")
);

// User cat has permission to create index
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ public void testValidateAnomalyDetectorWithInvalidFeatureField() throws IOExcept
}

@Test
public void testValidateAnomalyDetectorWithUnknownFeatureField() throws IOException {
public void testValidateAnomalyDetectorWithInvalidFeatureDueToTimeSeriesException() throws IOException {
AggregationBuilder aggregationBuilder = TestHelpers.parseAggregation("{\"test\":{\"terms\":{\"field\":\"type\"}}}");
AnomalyDetector anomalyDetector = TestHelpers
.randomAnomalyDetector(
Expand All @@ -245,7 +245,7 @@ public void testValidateAnomalyDetectorWithUnknownFeatureField() throws IOExcept
assertNotNull(response.getIssue());
assertEquals(ValidationIssueType.FEATURE_ATTRIBUTES, response.getIssue().getType());
assertEquals(ValidationAspect.DETECTOR, response.getIssue().getAspect());
assertTrue(response.getIssue().getMessage().contains(CommonMessages.UNKNOWN_SEARCH_QUERY_EXCEPTION_MSG));
assertTrue(response.getIssue().getMessage().contains(CommonMessages.FEATURE_WITH_INVALID_QUERY_MSG));
assertTrue(response.getIssue().getSubIssues().containsKey(nameField));
}

Expand Down
Loading