Skip to content

Commit

Permalink
support only searching results in custom result index (#292)
Browse files Browse the repository at this point in the history
* support only searching results in custom result index

Signed-off-by: Yaliang Wu <[email protected]>

* add/fix comments

Signed-off-by: Yaliang Wu <[email protected]>
  • Loading branch information
ylwu-amzn authored Nov 9, 2021
1 parent d02fb0a commit 29c6e08
Show file tree
Hide file tree
Showing 6 changed files with 176 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,17 +60,26 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
if (!EnabledSetting.isADPluginEnabled()) {
throw new IllegalStateException(CommonErrorMessages.DISABLED_ERR_MSG);
}

// resultIndex could be concrete index or index pattern
String resultIndex = Strings.trimToNull(request.param(RESULT_INDEX));
boolean onlyQueryCustomResultIndex = request.paramAsBoolean("only_query_custom_result_index", false);
if (resultIndex == null && onlyQueryCustomResultIndex) {
throw new IllegalStateException("No custom result index set.");
}

SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.parseXContent(request.contentOrSourceParamParser());
searchSourceBuilder.fetchSource(getSourceContext(request));
searchSourceBuilder.seqNoAndPrimaryTerm(true).version(true);
SearchRequest searchRequest = new SearchRequest().source(searchSourceBuilder).indices(this.index);

// resultIndex could be concrete index or index pattern
String resultIndex = Strings.trimToNull(request.param(RESULT_INDEX));

if (resultIndex != null) {
searchRequest.indices(resultIndex);
if (onlyQueryCustomResultIndex) {
searchRequest.indices(resultIndex);
} else {
searchRequest.indices(this.index, resultIndex);
}
}
return channel -> client.execute(actionType, searchRequest, search(channel));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,26 +73,41 @@ public SearchAnomalyResultTransportAction(
@Override
protected void doExecute(Task task, SearchRequest request, ActionListener<SearchResponse> listener) {
String[] indices = request.indices();
if (indices == null || indices.length == 0) {
listener.onFailure(new IllegalArgumentException("No indices set in search request"));
return;
}
// Set query indices as default result indices, will check custom result indices permission and add
// custom indices which user has search permission later.
request.indices(ALL_AD_RESULTS_INDEX_PATTERN);

Set<String> customResultIndices = new HashSet<>();
if (indices != null && indices.length > 0) {
String[] concreteIndices = indexNameExpressionResolver
.concreteIndexNames(clusterService.state(), IndicesOptions.lenientExpandOpen(), indices);
if (concreteIndices == null || concreteIndices.length == 0) {
// No custom result indices found, just search default result index
searchHandler.search(request, listener);
return;
boolean onlyQueryCustomResultIndex = true;
for (String indexName : indices) {
// If only query custom result index, don't need to set ALL_AD_RESULTS_INDEX_PATTERN in search request
if (ALL_AD_RESULTS_INDEX_PATTERN.equals(indexName)) {
onlyQueryCustomResultIndex = false;
}
for (String index : concreteIndices) {
if (index.startsWith(CUSTOM_RESULT_INDEX_PREFIX)) {
customResultIndices.add(index);
}
}

String[] concreteIndices = indexNameExpressionResolver
.concreteIndexNames(clusterService.state(), IndicesOptions.lenientExpandOpen(), indices);
if (concreteIndices == null || concreteIndices.length == 0) {
// No result indices found, will throw exception
listener.onFailure(new IllegalArgumentException("No indices found"));
return;
}

Set<String> customResultIndices = new HashSet<>();
for (String index : concreteIndices) {
if (index.startsWith(CUSTOM_RESULT_INDEX_PREFIX)) {
customResultIndices.add(index);
}
}

if (onlyQueryCustomResultIndex && customResultIndices.size() == 0) {
listener.onFailure(new IllegalArgumentException("No custom result indices found"));
return;
}

if (customResultIndices.size() > 0) {
// Search both custom AD result index and default result index
String resultIndexAggName = "result_index";
Expand All @@ -105,6 +120,9 @@ protected void doExecute(Task task, SearchRequest request, ActionListener<Search
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
// Search result indices of all detectors. User may create index with same prefix of custom result index
// which not used for AD, so we should avoid searching extra indices which not used by anomaly detectors.
// Variable used in lambda expression should be final or effectively final, so copy to a final boolean and
// use the final boolean in lambda below.
boolean finalOnlyQueryCustomResultIndex = onlyQueryCustomResultIndex;
client.search(searchResultIndex, ActionListener.wrap(allResultIndicesResponse -> {
Aggregations aggregations = allResultIndicesResponse.getAggregations();
StringTerms resultIndicesAgg = aggregations.get(resultIndexAggName);
Expand Down Expand Up @@ -132,7 +150,9 @@ protected void doExecute(Task task, SearchRequest request, ActionListener<Search
.add(new SearchRequest(index).source(new SearchSourceBuilder().query(new MatchAllQueryBuilder()).size(0)));
}
List<String> readableIndices = new ArrayList<>();
readableIndices.add(ALL_AD_RESULTS_INDEX_PATTERN);
if (!finalOnlyQueryCustomResultIndex) {
readableIndices.add(ALL_AD_RESULTS_INDEX_PATTERN);
}

context.restore();
// Send multiple search to check which index a user has permission to read. If search all indices directly,
Expand All @@ -146,6 +166,10 @@ protected void doExecute(Task task, SearchRequest request, ActionListener<Search
readableIndices.add(indexName);
}
}
if (readableIndices.size() == 0) {
listener.onFailure(new IllegalArgumentException("No readable custom result indices found"));
return;
}
request.indices(readableIndices.toArray(new String[0]));
searchHandler.search(request, listener);
}, multiSearchException -> {
Expand All @@ -161,7 +185,9 @@ protected void doExecute(Task task, SearchRequest request, ActionListener<Search
listener.onFailure(e);
}
} else {
// onlyQueryCustomResultIndex is false in this branch
// Search only default result index
request.indices(ALL_AD_RESULTS_INDEX_PATTERN);
searchHandler.search(request, listener);
}
}
Expand Down
59 changes: 58 additions & 1 deletion src/test/java/org/opensearch/ad/AnomalyDetectorRestTestCase.java
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ public ToXContentObject[] getAnomalyDetector(
detector.getLastUpdateTime(),
null,
detector.getUser(),
null
detector.getResultIndex()
),
detectorJob,
historicalAdTask,
Expand Down Expand Up @@ -448,6 +448,40 @@ public Response createIndexRole(String role, String index) throws IOException {
);
}

public Response createSearchRole(String role, String index) throws IOException {
return TestHelpers
.makeRequest(
client(),
"PUT",
"/_opendistro/_security/api/roles/" + role,
null,
TestHelpers
.toHttpEntity(
"{\n"
+ "\"cluster_permissions\": [\n"
+ "],\n"
+ "\"index_permissions\": [\n"
+ "{\n"
+ "\"index_patterns\": [\n"
+ "\""
+ index
+ "\"\n"
+ "],\n"
+ "\"dls\": \"\",\n"
+ "\"fls\": [],\n"
+ "\"masked_fields\": [],\n"
+ "\"allowed_actions\": [\n"
+ "\"indices:data/read/search\"\n"
+ "]\n"
+ "}\n"
+ "],\n"
+ "\"tenant_permissions\": []\n"
+ "}"
),
ImmutableList.of(new BasicHeader(HttpHeaders.USER_AGENT, "Kibana"))
);
}

public Response deleteUser(String user) throws IOException {
return TestHelpers
.makeRequest(
Expand Down Expand Up @@ -510,4 +544,27 @@ public Response disableFilterBy() throws IOException {
);
}

protected AnomalyDetector cloneDetector(AnomalyDetector anomalyDetector, String resultIndex) {
AnomalyDetector detector = new AnomalyDetector(
null,
null,
randomAlphaOfLength(5),
randomAlphaOfLength(10),
anomalyDetector.getTimeField(),
anomalyDetector.getIndices(),
anomalyDetector.getFeatureAttributes(),
anomalyDetector.getFilterQuery(),
anomalyDetector.getDetectionInterval(),
anomalyDetector.getWindowDelay(),
anomalyDetector.getShingleSize(),
anomalyDetector.getUiMetadata(),
anomalyDetector.getSchemaVersion(),
Instant.now(),
anomalyDetector.getCategoryField(),
null,
resultIndex
);
return detector;
}

}
21 changes: 21 additions & 0 deletions src/test/java/org/opensearch/ad/model/AnomalyDetectorTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.opensearch.ad.AbstractADTest;
import org.opensearch.ad.TestHelpers;
import org.opensearch.ad.common.exception.ADValidationException;
import org.opensearch.ad.constant.CommonName;
import org.opensearch.ad.settings.AnomalyDetectorSettings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.ToXContent;
Expand All @@ -46,6 +47,26 @@ public void testParseAnomalyDetector() throws IOException {
assertEquals("Parsing anomaly detector doesn't work", detector, parsedDetector);
}

public void testParseAnomalyDetectorWithCustomIndex() throws IOException {
String resultIndex = CommonName.CUSTOM_RESULT_INDEX_PREFIX + "test";
AnomalyDetector detector = TestHelpers
.randomDetector(
ImmutableList.of(TestHelpers.randomFeature()),
randomAlphaOfLength(5),
randomIntBetween(1, 5),
randomAlphaOfLength(5),
ImmutableList.of(randomAlphaOfLength(5)),
resultIndex
);
String detectorString = TestHelpers.xContentBuilderToString(detector.toXContent(TestHelpers.builder(), ToXContent.EMPTY_PARAMS));
LOG.info(detectorString);
detectorString = detectorString
.replaceFirst("\\{", String.format(Locale.ROOT, "{\"%s\":\"%s\",", randomAlphaOfLength(5), randomAlphaOfLength(5)));
AnomalyDetector parsedDetector = AnomalyDetector.parse(TestHelpers.parser(detectorString));
assertEquals("Parsing result index doesn't work", resultIndex, parsedDetector.getResultIndex());
assertEquals("Parsing anomaly detector doesn't work", detector, parsedDetector);
}

public void testParseAnomalyDetectorWithoutParams() throws IOException {
AnomalyDetector detector = TestHelpers.randomAnomalyDetector(TestHelpers.randomUiMetadata(), Instant.now());
String detectorString = TestHelpers.xContentBuilderToString(detector.toXContent(TestHelpers.builder()));
Expand Down
37 changes: 34 additions & 3 deletions src/test/java/org/opensearch/ad/rest/SecureADRestIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.junit.Before;
import org.opensearch.ad.AnomalyDetectorRestTestCase;
import org.opensearch.ad.TestHelpers;
import org.opensearch.ad.constant.CommonName;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.AnomalyDetectorExecutionInput;
import org.opensearch.ad.model.DetectionDateRange;
Expand All @@ -47,12 +48,17 @@ public class SecureADRestIT extends AnomalyDetectorRestTestCase {
RestClient elkClient;
String fishUser = "fish";
RestClient fishClient;
String goatUser = "goat";
RestClient goatClient;
private String indexAllAccessRole = "index_all_access";
private String indexSearchAccessRole = "index_all_search";

@Before
public void setupSecureTests() throws IOException {
if (!isHttps())
throw new IllegalArgumentException("Secure Tests are running but HTTPS is not set");
createIndexRole("index_all_access", "*");
createIndexRole(indexAllAccessRole, "*");
createSearchRole(indexSearchAccessRole, "*");
createUser(aliceUser, aliceUser, new ArrayList<>(Arrays.asList("odfe")));
aliceClient = new SecureRestClientBuilder(getClusterHosts().toArray(new HttpHost[0]), isHttps(), aliceUser, aliceUser)
.setSocketTimeout(60000)
Expand Down Expand Up @@ -83,9 +89,15 @@ public void setupSecureTests() throws IOException {
.setSocketTimeout(60000)
.build();

createUser(goatUser, goatUser, new ArrayList<>(Arrays.asList("opensearch")));
goatClient = new SecureRestClientBuilder(getClusterHosts().toArray(new HttpHost[0]), isHttps(), goatUser, goatUser)
.setSocketTimeout(60000)
.build();

createRoleMapping("anomaly_read_access", new ArrayList<>(Arrays.asList(bobUser)));
createRoleMapping("anomaly_full_access", new ArrayList<>(Arrays.asList(aliceUser, catUser, dogUser, elkUser, fishUser)));
createRoleMapping("index_all_access", new ArrayList<>(Arrays.asList(aliceUser, bobUser, catUser, dogUser, fishUser)));
createRoleMapping("anomaly_full_access", new ArrayList<>(Arrays.asList(aliceUser, catUser, dogUser, elkUser, fishUser, goatUser)));
createRoleMapping(indexAllAccessRole, new ArrayList<>(Arrays.asList(aliceUser, bobUser, catUser, dogUser, fishUser)));
createRoleMapping(indexSearchAccessRole, new ArrayList<>(Arrays.asList(goatUser)));
}

@After
Expand All @@ -96,12 +108,14 @@ public void deleteUserSetup() throws IOException {
dogClient.close();
elkClient.close();
fishClient.close();
goatClient.close();
deleteUser(aliceUser);
deleteUser(bobUser);
deleteUser(catUser);
deleteUser(dogUser);
deleteUser(elkUser);
deleteUser(fishUser);
deleteUser(goatUser);
}

public void testCreateAnomalyDetectorWithWriteAccess() throws IOException {
Expand Down Expand Up @@ -278,6 +292,23 @@ public void testCreateAnomalyDetectorWithNoReadPermissionOfIndex() throws IOExce
Assert.assertTrue(exception.getMessage().contains("no permissions for [indices:data/read/search]"));
}

public void testCreateAnomalyDetectorWithCustomResultIndex() throws IOException {
// User alice has AD full access and index permission, so can create detector
AnomalyDetector anomalyDetector = createRandomAnomalyDetector(false, false, aliceClient);
// User elk has AD full access, but has no read permission of index

String resultIndex = CommonName.CUSTOM_RESULT_INDEX_PREFIX + "test";
AnomalyDetector detector = cloneDetector(anomalyDetector, resultIndex);
// User goat has no permission to create index
Exception exception = expectThrows(IOException.class, () -> { createAnomalyDetector(detector, true, goatClient); });
Assert.assertTrue(exception.getMessage().contains("no permissions for [indices:admin/create]"));

// User cat has permission to create index
resultIndex = CommonName.CUSTOM_RESULT_INDEX_PREFIX + "test2";
AnomalyDetector detectorOfCat = createAnomalyDetector(cloneDetector(anomalyDetector, resultIndex), true, catClient);
assertEquals(resultIndex, detectorOfCat.getResultIndex());
}

public void testPreviewAnomalyDetectorWithWriteAccess() throws IOException {
// User Alice has AD full access, should be able to create/preview a detector
AnomalyDetector aliceDetector = createRandomAnomalyDetector(false, false, aliceClient);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
package org.opensearch.ad.transport;

import static org.opensearch.ad.TestHelpers.matchAllRequest;
import static org.opensearch.ad.indices.AnomalyDetectionIndices.ALL_AD_RESULTS_INDEX_PATTERN;

import java.io.IOException;

Expand All @@ -28,7 +29,9 @@ public void testSearchResultAction() throws IOException {
createADResultIndex();
String adResultId = createADResult(TestHelpers.randomAnomalyDetectResult());

SearchResponse searchResponse = client().execute(SearchAnomalyResultAction.INSTANCE, matchAllRequest()).actionGet(10000);
SearchResponse searchResponse = client()
.execute(SearchAnomalyResultAction.INSTANCE, matchAllRequest().indices(ALL_AD_RESULTS_INDEX_PATTERN))
.actionGet(10000);
assertEquals(1, searchResponse.getInternalResponse().hits().getTotalHits().value);

assertEquals(adResultId, searchResponse.getInternalResponse().hits().getAt(0).getId());
Expand All @@ -37,8 +40,12 @@ public void testSearchResultAction() throws IOException {
@Test
public void testNoIndex() {
deleteIndexIfExists(CommonName.ANOMALY_RESULT_INDEX_ALIAS);
SearchResponse searchResponse = client().execute(SearchAnomalyResultAction.INSTANCE, matchAllRequest()).actionGet(10000);
assertEquals(0, searchResponse.getInternalResponse().hits().getTotalHits().value);
expectThrows(
IllegalArgumentException.class,
() -> client()
.execute(SearchAnomalyResultAction.INSTANCE, matchAllRequest().indices(ALL_AD_RESULTS_INDEX_PATTERN))
.actionGet(10000)
);
}

}

0 comments on commit 29c6e08

Please sign in to comment.