Skip to content

Commit

Permalink
Migrates Search Detector Info action to extension
Browse files Browse the repository at this point in the history
Signed-off-by: Owais Kazi <[email protected]>
  • Loading branch information
owaiskazi19 committed Apr 24, 2023
1 parent 86b1084 commit 5f9b723
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 80 deletions.
3 changes: 2 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -772,7 +772,8 @@ List<String> jacocoExclusions = [
'org.opensearch.ad.AnomalyDetectorExtension.1.1',
'org.opensearch.ad.EntityProfileRunner',
'org.opensearch.ad.caching.CacheProvider',
'org.opensearch.ad.util.RestHandlerUtils'
'org.opensearch.ad.util.RestHandlerUtils',
'org.opensearch.ad.transport.SearchAnomalyDetectorInfoTransportAction.*'
]


Expand Down
9 changes: 7 additions & 2 deletions src/main/java/org/opensearch/ad/AnomalyDetectorExtension.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.opensearch.ad.rest.RestIndexAnomalyDetectorAction;
import org.opensearch.ad.rest.RestSearchADTasksAction;
import org.opensearch.ad.rest.RestSearchAnomalyDetectorAction;
import org.opensearch.ad.rest.RestSearchAnomalyDetectorInfoAction;
import org.opensearch.ad.rest.RestSearchAnomalyResultAction;
import org.opensearch.ad.rest.RestStatsAnomalyDetectorAction;
import org.opensearch.ad.rest.RestValidateAnomalyDetectorAction;
Expand Down Expand Up @@ -89,6 +90,8 @@
import org.opensearch.ad.transport.SearchADTasksAction;
import org.opensearch.ad.transport.SearchADTasksTransportAction;
import org.opensearch.ad.transport.SearchAnomalyDetectorAction;
import org.opensearch.ad.transport.SearchAnomalyDetectorInfoAction;
import org.opensearch.ad.transport.SearchAnomalyDetectorInfoTransportAction;
import org.opensearch.ad.transport.SearchAnomalyDetectorTransportAction;
import org.opensearch.ad.transport.SearchAnomalyResultAction;
import org.opensearch.ad.transport.SearchAnomalyResultTransportAction;
Expand Down Expand Up @@ -178,7 +181,8 @@ public List<ExtensionRestHandler> getExtensionRestHandlers() {
new RestStatsAnomalyDetectorAction(extensionsRunner(), restClient(), adStats, nodeFilter),
new RestSearchAnomalyDetectorAction(extensionsRunner(), restClient()),
new RestSearchAnomalyResultAction(extensionsRunner(), restClient()),
new RestSearchADTasksAction(extensionsRunner(), restClient())
new RestSearchADTasksAction(extensionsRunner(), restClient()),
new RestSearchAnomalyDetectorInfoAction(extensionsRunner(), restClient())
);
}

Expand Down Expand Up @@ -618,7 +622,8 @@ public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
// https://github.com/opensearch-project/opensearch-sdk-java/issues/626
new ActionHandler<>(SearchAnomalyDetectorAction.INSTANCE, SearchAnomalyDetectorTransportAction.class),
new ActionHandler<>(SearchAnomalyResultAction.INSTANCE, SearchAnomalyResultTransportAction.class),
new ActionHandler<>(SearchADTasksAction.INSTANCE, SearchADTasksTransportAction.class)
new ActionHandler<>(SearchADTasksAction.INSTANCE, SearchADTasksTransportAction.class),
new ActionHandler<>(SearchAnomalyDetectorInfoAction.INSTANCE, SearchAnomalyDetectorInfoTransportAction.class)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,36 +17,59 @@
import java.io.IOException;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ad.AnomalyDetectorPlugin;
import org.opensearch.action.ActionListener;
import org.opensearch.ad.AnomalyDetectorExtension;
import org.opensearch.ad.constant.CommonErrorMessages;
import org.opensearch.ad.settings.AnomalyDetectorSettings;
import org.opensearch.ad.settings.EnabledSetting;
import org.opensearch.ad.transport.SearchAnomalyDetectorInfoAction;
import org.opensearch.ad.transport.SearchAnomalyDetectorInfoRequest;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.ad.transport.SearchAnomalyDetectorInfoResponse;
import org.opensearch.common.xcontent.json.JsonXContent;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.extensions.rest.ExtensionRestResponse;
import org.opensearch.rest.RestHandler;
import org.opensearch.rest.RestRequest;
import org.opensearch.rest.action.RestToXContentListener;
import org.opensearch.rest.RestStatus;
import org.opensearch.sdk.ExtensionsRunner;
import org.opensearch.sdk.SDKClient.SDKRestClient;
import org.opensearch.sdk.rest.BaseExtensionRestHandler;
import org.opensearch.sdk.rest.ReplacedRouteHandler;

import com.google.common.collect.ImmutableList;

public class RestSearchAnomalyDetectorInfoAction extends BaseRestHandler {
public class RestSearchAnomalyDetectorInfoAction extends BaseExtensionRestHandler {

public static final String SEARCH_ANOMALY_DETECTOR_INFO_ACTION = "search_anomaly_detector_info";

private static final Logger logger = LogManager.getLogger(RestSearchAnomalyDetectorInfoAction.class);
private SDKRestClient client;
private ExtensionsRunner extensionsRunner;

public RestSearchAnomalyDetectorInfoAction() {}
public RestSearchAnomalyDetectorInfoAction(ExtensionsRunner extensionsRunner, SDKRestClient client) {
this.extensionsRunner = extensionsRunner;
this.client = client;
}

@Override
public String getName() {
return SEARCH_ANOMALY_DETECTOR_INFO_ACTION;
}

@Override
protected RestChannelConsumer prepareRequest(RestRequest request, org.opensearch.client.node.NodeClient client) throws IOException {
private Function<RestRequest, ExtensionRestResponse> handleRequest = (request) -> {
try {
return prepareRequest(request);
} catch (Exception e) {
return exceptionalRequest(request, e);
}
};

protected ExtensionRestResponse prepareRequest(RestRequest request) throws IOException {
if (!EnabledSetting.isADPluginEnabled()) {
throw new IllegalStateException(CommonErrorMessages.DISABLED_ERR_MSG);
}
Expand All @@ -55,8 +78,30 @@ protected RestChannelConsumer prepareRequest(RestRequest request, org.opensearch
String rawPath = request.rawPath();

SearchAnomalyDetectorInfoRequest searchAnomalyDetectorInfoRequest = new SearchAnomalyDetectorInfoRequest(detectorName, rawPath);
return channel -> client
.execute(SearchAnomalyDetectorInfoAction.INSTANCE, searchAnomalyDetectorInfoRequest, new RestToXContentListener<>(channel));
CompletableFuture<SearchAnomalyDetectorInfoResponse> futureResponse = new CompletableFuture<>();
client
.execute(
SearchAnomalyDetectorInfoAction.INSTANCE,
searchAnomalyDetectorInfoRequest,
ActionListener
.wrap(
adSearchInfoResponse -> futureResponse.complete(adSearchInfoResponse),
ex -> futureResponse.completeExceptionally(ex)
)
);

SearchAnomalyDetectorInfoResponse searchAnomalyDetectorInfoResponse = futureResponse
.orTimeout(
AnomalyDetectorSettings.REQUEST_TIMEOUT.get(extensionsRunner.getEnvironmentSettings()).getMillis(),
TimeUnit.MILLISECONDS
)
.join();
ExtensionRestResponse extensionRestResponse = new ExtensionRestResponse(
request,
RestStatus.OK,
searchAnomalyDetectorInfoResponse.toXContent(JsonXContent.contentBuilder(), ToXContent.EMPTY_PARAMS)
);
return extensionRestResponse;
}

@Override
Expand All @@ -65,22 +110,24 @@ public List<RestHandler.Route> routes() {
}

@Override
public List<RestHandler.ReplacedRoute> replacedRoutes() {
public List<ReplacedRouteHandler> replacedRouteHandlers() {
return ImmutableList
.of(
// get the count of number of detectors
new ReplacedRoute(
new ReplacedRouteHandler(
RestRequest.Method.GET,
String.format(Locale.ROOT, "%s/%s", AnomalyDetectorPlugin.AD_BASE_DETECTORS_URI, COUNT),
String.format(Locale.ROOT, "%s/%s", AnomalyDetectorExtension.AD_BASE_DETECTORS_URI, COUNT),
RestRequest.Method.GET,
String.format(Locale.ROOT, "%s/%s", AnomalyDetectorPlugin.LEGACY_OPENDISTRO_AD_BASE_URI, COUNT)
String.format(Locale.ROOT, "%s/%s", AnomalyDetectorExtension.LEGACY_OPENDISTRO_AD_BASE_URI, COUNT),
handleRequest
),
// get if a detector name exists with name
new ReplacedRoute(
new ReplacedRouteHandler(
RestRequest.Method.GET,
String.format(Locale.ROOT, "%s/%s", AnomalyDetectorPlugin.AD_BASE_DETECTORS_URI, MATCH),
String.format(Locale.ROOT, "%s/%s", AnomalyDetectorExtension.AD_BASE_DETECTORS_URI, MATCH),
RestRequest.Method.GET,
String.format(Locale.ROOT, "%s/%s", AnomalyDetectorPlugin.LEGACY_OPENDISTRO_AD_BASE_URI, MATCH)
String.format(Locale.ROOT, "%s/%s", AnomalyDetectorExtension.LEGACY_OPENDISTRO_AD_BASE_URI, MATCH),
handleRequest
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,32 +21,33 @@
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.action.support.TransportAction;
import org.opensearch.ad.util.RestHandlerUtils;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.index.query.TermsQueryBuilder;
import org.opensearch.sdk.SDKClient.SDKRestClient;
import org.opensearch.sdk.SDKClusterService;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportService;
import org.opensearch.tasks.TaskManager;

import com.google.inject.Inject;

public class SearchAnomalyDetectorInfoTransportAction extends
HandledTransportAction<SearchAnomalyDetectorInfoRequest, SearchAnomalyDetectorInfoResponse> {
TransportAction<SearchAnomalyDetectorInfoRequest, SearchAnomalyDetectorInfoResponse> {
private static final Logger LOG = LogManager.getLogger(SearchAnomalyDetectorInfoTransportAction.class);
private final Client client;
private final ClusterService clusterService;
private final SDKRestClient client;
private final SDKClusterService clusterService;

@Inject
public SearchAnomalyDetectorInfoTransportAction(
TransportService transportService,
TaskManager taskManager,
ActionFilters actionFilters,
Client client,
ClusterService clusterService
SDKRestClient client,
SDKClusterService clusterService
) {
super(SearchAnomalyDetectorInfoAction.NAME, transportService, actionFilters, SearchAnomalyDetectorInfoRequest::new);
super(SearchAnomalyDetectorInfoAction.NAME, actionFilters, taskManager);
this.client = client;
this.clusterService = clusterService;
}
Expand Down
1 change: 1 addition & 0 deletions src/main/java/org/opensearch/ad/util/IndexUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ public String getIndexHealthStatus(String indexOrAliasName) throws IllegalArgume
* @return The number of documents in an index. 0 is returned if the index does not exist. -1 is returned if the
* request fails.
*/
@Deprecated
public Long getNumberOfDocumentsInIndex(String indexName) {
if (!clusterService.state().getRoutingTable().hasIndex(indexName)) {
return 0L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@
import static org.opensearch.ad.TestHelpers.createEmptySearchResponse;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;

import org.junit.Assert;
import org.junit.Before;
Expand All @@ -32,38 +30,38 @@
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.ad.TestHelpers;
import org.opensearch.ad.settings.AnomalyDetectorSettings;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.sdk.Extension;
import org.opensearch.sdk.ExtensionsRunner;
import org.opensearch.sdk.SDKClient.SDKRestClient;
import org.opensearch.sdk.SDKClusterService;
import org.opensearch.sdk.SDKClusterService.SDKClusterSettings;
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskManager;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

public class SearchAnomalyDetectorInfoActionTests extends OpenSearchIntegTestCase {
private SearchAnomalyDetectorInfoRequest request;
private ActionListener<SearchAnomalyDetectorInfoResponse> response;
private SearchAnomalyDetectorInfoTransportAction action;
private Task task;
private ClusterService clusterService;
private Client client;
private SDKClusterService clusterService;
private SDKRestClient client;
private ThreadPool threadPool;
private ExtensionsRunner mockRunner;
private SDKClusterSettings clusterSettings;
private PlainActionFuture<SearchAnomalyDetectorInfoResponse> future;

@Override
@Before
public void setUp() throws Exception {
super.setUp();
action = new SearchAnomalyDetectorInfoTransportAction(
mock(TransportService.class),
mock(ActionFilters.class),
client(),
clusterService()
);
action = new SearchAnomalyDetectorInfoTransportAction(mock(TaskManager.class), mock(ActionFilters.class), null, null);
task = mock(Task.class);
response = new ActionListener<SearchAnomalyDetectorInfoResponse>() {
@Override
Expand All @@ -79,17 +77,21 @@ public void onFailure(Exception e) {
};

future = mock(PlainActionFuture.class);
client = mock(Client.class);
when(client.threadPool()).thenReturn(threadPool);
threadPool = mock(ThreadPool.class);
when(client.threadPool()).thenReturn(threadPool);
client = mock(SDKRestClient.class);
// when(client.threadPool()).thenReturn(threadPool);
// threadPool = mock(ThreadPool.class);
// when(client.threadPool()).thenReturn(threadPool);
Settings settings = Settings.builder().build();

clusterService = mock(ClusterService.class);
ClusterSettings clusterSettings = new ClusterSettings(
Settings.EMPTY,
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(AnomalyDetectorSettings.FILTER_BY_BACKEND_ROLES)))
);
clusterService = mock(SDKClusterService.class);
mockRunner = mock(ExtensionsRunner.class);
List<Setting<?>> settingsList = List.of(AnomalyDetectorSettings.FILTER_BY_BACKEND_ROLES);
clusterService = mock(SDKClusterService.class);
Extension mockExtension = mock(Extension.class);
when(mockRunner.getEnvironmentSettings()).thenReturn(settings);
when(mockRunner.getExtension()).thenReturn(mockExtension);
when(mockExtension.getSettings()).thenReturn(settingsList);
SDKClusterSettings clusterSettings = new SDKClusterService(mockRunner).getClusterSettings();
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
}

Expand Down Expand Up @@ -146,12 +148,7 @@ public void testSearchInfoResponse_CountSuccessWithEmptyResponse() throws IOExce
return null;
}).when(client).search(any(), any());

action = new SearchAnomalyDetectorInfoTransportAction(
mock(TransportService.class),
mock(ActionFilters.class),
client,
clusterService
);
action = new SearchAnomalyDetectorInfoTransportAction(mock(TaskManager.class), mock(ActionFilters.class), client, clusterService);
SearchAnomalyDetectorInfoRequest request = new SearchAnomalyDetectorInfoRequest("testDetector", "count");
action.doExecute(task, request, future);
verify(future).onResponse(any(SearchAnomalyDetectorInfoResponse.class));
Expand All @@ -166,12 +163,7 @@ public void testSearchInfoResponse_MatchSuccessWithEmptyResponse() throws IOExce
return null;
}).when(client).search(any(), any());

action = new SearchAnomalyDetectorInfoTransportAction(
mock(TransportService.class),
mock(ActionFilters.class),
client,
clusterService
);
action = new SearchAnomalyDetectorInfoTransportAction(mock(TaskManager.class), mock(ActionFilters.class), client, clusterService);
SearchAnomalyDetectorInfoRequest request = new SearchAnomalyDetectorInfoRequest("testDetector", "match");
action.doExecute(task, request, future);
verify(future).onResponse(any(SearchAnomalyDetectorInfoResponse.class));
Expand All @@ -184,12 +176,7 @@ public void testSearchInfoResponse_CountRuntimeException() throws IOException {
listener.onFailure(new RuntimeException("searchResponse failed!"));
return null;
}).when(client).search(any(), any());
action = new SearchAnomalyDetectorInfoTransportAction(
mock(TransportService.class),
mock(ActionFilters.class),
client,
clusterService
);
action = new SearchAnomalyDetectorInfoTransportAction(mock(TaskManager.class), mock(ActionFilters.class), client, clusterService);
SearchAnomalyDetectorInfoRequest request = new SearchAnomalyDetectorInfoRequest("testDetector", "count");
action.doExecute(task, request, future);
verify(future).onFailure(any(RuntimeException.class));
Expand All @@ -202,12 +189,7 @@ public void testSearchInfoResponse_MatchRuntimeException() throws IOException {
listener.onFailure(new RuntimeException("searchResponse failed!"));
return null;
}).when(client).search(any(), any());
action = new SearchAnomalyDetectorInfoTransportAction(
mock(TransportService.class),
mock(ActionFilters.class),
client,
clusterService
);
action = new SearchAnomalyDetectorInfoTransportAction(mock(TaskManager.class), mock(ActionFilters.class), client, clusterService);
SearchAnomalyDetectorInfoRequest request = new SearchAnomalyDetectorInfoRequest("testDetector", "match");
action.doExecute(task, request, future);
verify(future).onFailure(any(RuntimeException.class));
Expand Down

0 comments on commit 5f9b723

Please sign in to comment.