Skip to content

Commit

Permalink
[Extensions] Migrates Search Detector Info action to extension (#875)
Browse files Browse the repository at this point in the history
* Migrates Search Detector Info action to extension

Signed-off-by: Owais Kazi <[email protected]>

* Added wildcards for exclusion

Signed-off-by: Owais Kazi <[email protected]>

* Refactored ImmutableMap coming from OpenSearch

Signed-off-by: Owais Kazi <[email protected]>

---------

Signed-off-by: Owais Kazi <[email protected]>
  • Loading branch information
owaiskazi19 authored Apr 27, 2023
1 parent 9ad715a commit cf70e72
Show file tree
Hide file tree
Showing 9 changed files with 132 additions and 98 deletions.
16 changes: 7 additions & 9 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -711,8 +711,8 @@ List<String> jacocoExclusions = [
'org.opensearch.ad.model.EntityProfile.Builder',
'org.opensearch.ad.transport.AnomalyResultTransportAction.PageListener',
'org.opensearch.ad.feature.CompositeRetriever.Page',
'org.opensearch.ad.feature.CompositeRetriever.PageIterator.1',
'org.opensearch.ad.feature.CompositeRetriever.PageIterator',
'org.opensearch.ad.feature.CompositeRetriever.PageIterator.*',
'org.opensearch.ad.cluster.ADDataMigrator',
'org.opensearch.ad.AnomalyDetectorExtension',
'org.opensearch.ad.transport.ADJobRunnerTransportAction*',
Expand All @@ -734,6 +734,7 @@ List<String> jacocoExclusions = [
'org.opensearch.ad.transport.AnomalyResultResponse',
'org.opensearch.ad.MemoryTracker',
'org.opensearch.ad.caching.PriorityCache',
'org.opensearch.ad.caching.PriorityCache.*',
'org.opensearch.ad.common.exception.EndRunException',
'org.opensearch.ad.common.exception.LimitExceededException',
'org.opensearch.ad.common.exception.ClientException',
Expand All @@ -744,12 +745,10 @@ List<String> jacocoExclusions = [
'org.opensearch.ad.task.ADHCBatchTaskRunState:',
'org.opensearch.ad.task.ADBatchTaskCache',
'org.opensearch.ad.caching.DoorKeeper',
'org.opensearch.ad.caching.PriorityCache.1',
'org.opensearch.ad.caching.CacheProvider',
'org.opensearch.ad.util.IndexUtils',
'org.opensearch.ad.cluster.ClusterManagerEventListener',
'org.opensearch.ad.cluster.ClusterManagerEventListener.1',
'org.opensearch.ad.cluster.ClusterManagerEventListener.2',
'org.opensearch.ad.cluster.ClusterManagerEventListener.*',
'org.opensearch.ad.transport.ADStatsNodesAction',
'org.opensearch.ad.transport.BackPressureRouting',
'org.opensearch.ad.transport.ADStatsNodesTransportAction',
Expand All @@ -768,17 +767,16 @@ List<String> jacocoExclusions = [
'org.opensearch.ad.ratelimit.RateLimitedRequestWorker.RequestQueue',
'org.opensearch.ad.stats.StatNames',
'org.opensearch.ad.MaintenanceState',
'org.opensearch.ad.AnomalyDetectorExtension.1',
'org.opensearch.ad.AnomalyDetectorExtension.1.1',
'org.opensearch.ad.AnomalyDetectorExtension.*',
'org.opensearch.ad.EntityProfileRunner',
'org.opensearch.ad.caching.CacheProvider',
'org.opensearch.ad.transport.ADResultBulkTransportAction',
'org.opensearch.ad.transport.ADResultBulkRequest',
'org.opensearch.ad.transport.ADResultBulkAction',
'org.opensearch.ad.ratelimit.ResultWriteRequest',
'org.opensearch.ad.AnomalyDetectorJobRunner.1',
'org.opensearch.ad.AnomalyDetectorJobRunner.2',
'org.opensearch.ad.util.RestHandlerUtils'
'org.opensearch.ad.AnomalyDetectorJobRunner.*',
'org.opensearch.ad.util.RestHandlerUtils',
'org.opensearch.ad.transport.SearchAnomalyDetectorInfoTransportAction.*'
]


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.opensearch.ad.rest.RestPreviewAnomalyDetectorAction;
import org.opensearch.ad.rest.RestSearchADTasksAction;
import org.opensearch.ad.rest.RestSearchAnomalyDetectorAction;
import org.opensearch.ad.rest.RestSearchAnomalyDetectorInfoAction;
import org.opensearch.ad.rest.RestSearchAnomalyResultAction;
import org.opensearch.ad.rest.RestStatsAnomalyDetectorAction;
import org.opensearch.ad.rest.RestValidateAnomalyDetectorAction;
Expand Down Expand Up @@ -109,6 +110,8 @@
import org.opensearch.ad.transport.SearchADTasksAction;
import org.opensearch.ad.transport.SearchADTasksTransportAction;
import org.opensearch.ad.transport.SearchAnomalyDetectorAction;
import org.opensearch.ad.transport.SearchAnomalyDetectorInfoAction;
import org.opensearch.ad.transport.SearchAnomalyDetectorInfoTransportAction;
import org.opensearch.ad.transport.SearchAnomalyDetectorTransportAction;
import org.opensearch.ad.transport.SearchAnomalyResultAction;
import org.opensearch.ad.transport.SearchAnomalyResultTransportAction;
Expand Down Expand Up @@ -204,7 +207,8 @@ public List<ExtensionRestHandler> getExtensionRestHandlers() {
new RestStatsAnomalyDetectorAction(extensionsRunner(), restClient(), adStats, nodeFilter),
new RestSearchAnomalyDetectorAction(extensionsRunner(), restClient()),
new RestSearchAnomalyResultAction(extensionsRunner(), restClient()),
new RestSearchADTasksAction(extensionsRunner(), restClient())
new RestSearchADTasksAction(extensionsRunner(), restClient()),
new RestSearchAnomalyDetectorInfoAction(extensionsRunner(), restClient())
);
}

Expand Down Expand Up @@ -782,8 +786,10 @@ public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
new ActionHandler<>(SearchAnomalyDetectorAction.INSTANCE, SearchAnomalyDetectorTransportAction.class),
new ActionHandler<>(SearchAnomalyResultAction.INSTANCE, SearchAnomalyResultTransportAction.class),
new ActionHandler<>(SearchADTasksAction.INSTANCE, SearchADTasksTransportAction.class),
new ActionHandler<>(SearchAnomalyDetectorInfoAction.INSTANCE, SearchAnomalyDetectorInfoTransportAction.class),
new ActionHandler<>(StopDetectorAction.INSTANCE, StopDetectorTransportAction.class),
new ActionHandler<>(DeleteModelAction.INSTANCE, DeleteModelTransportAction.class)

);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1153,7 +1153,7 @@ private void updateJobIndexSettingIfNecessary(IndexState jobIndexState, ActionLi

private static Integer getIntegerSetting(GetSettingsResponse settingsResponse, String settingKey) {
Integer value = null;
Iterator<Settings> iter = settingsResponse.getIndexToSettings().valuesIt();
Iterator<Settings> iter = settingsResponse.getIndexToSettings().values().iterator();
while (iter.hasNext()) {
Settings settings = iter.next();
value = settings.getAsInt(settingKey, null);
Expand All @@ -1166,7 +1166,7 @@ private static Integer getIntegerSetting(GetSettingsResponse settingsResponse, S

private static String getStringSetting(GetSettingsResponse settingsResponse, String settingKey) {
String value = null;
Iterator<Settings> iter = settingsResponse.getIndexToSettings().valuesIt();
Iterator<Settings> iter = settingsResponse.getIndexToSettings().values().iterator();
while (iter.hasNext()) {
Settings settings = iter.next();
value = settings.get(settingKey, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,36 +17,59 @@
import java.io.IOException;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ad.AnomalyDetectorPlugin;
import org.opensearch.action.ActionListener;
import org.opensearch.ad.AnomalyDetectorExtension;
import org.opensearch.ad.constant.CommonErrorMessages;
import org.opensearch.ad.settings.AnomalyDetectorSettings;
import org.opensearch.ad.settings.EnabledSetting;
import org.opensearch.ad.transport.SearchAnomalyDetectorInfoAction;
import org.opensearch.ad.transport.SearchAnomalyDetectorInfoRequest;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.ad.transport.SearchAnomalyDetectorInfoResponse;
import org.opensearch.common.xcontent.json.JsonXContent;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.extensions.rest.ExtensionRestResponse;
import org.opensearch.rest.RestHandler;
import org.opensearch.rest.RestRequest;
import org.opensearch.rest.action.RestToXContentListener;
import org.opensearch.rest.RestStatus;
import org.opensearch.sdk.ExtensionsRunner;
import org.opensearch.sdk.SDKClient.SDKRestClient;
import org.opensearch.sdk.rest.BaseExtensionRestHandler;
import org.opensearch.sdk.rest.ReplacedRouteHandler;

import com.google.common.collect.ImmutableList;

public class RestSearchAnomalyDetectorInfoAction extends BaseRestHandler {
public class RestSearchAnomalyDetectorInfoAction extends BaseExtensionRestHandler {

public static final String SEARCH_ANOMALY_DETECTOR_INFO_ACTION = "search_anomaly_detector_info";

private static final Logger logger = LogManager.getLogger(RestSearchAnomalyDetectorInfoAction.class);
private SDKRestClient client;
private ExtensionsRunner extensionsRunner;

public RestSearchAnomalyDetectorInfoAction() {}
public RestSearchAnomalyDetectorInfoAction(ExtensionsRunner extensionsRunner, SDKRestClient client) {
this.extensionsRunner = extensionsRunner;
this.client = client;
}

@Override
public String getName() {
return SEARCH_ANOMALY_DETECTOR_INFO_ACTION;
}

@Override
protected RestChannelConsumer prepareRequest(RestRequest request, org.opensearch.client.node.NodeClient client) throws IOException {
private Function<RestRequest, ExtensionRestResponse> handleRequest = (request) -> {
try {
return prepareRequest(request);
} catch (Exception e) {
return exceptionalRequest(request, e);
}
};

protected ExtensionRestResponse prepareRequest(RestRequest request) throws IOException {
if (!EnabledSetting.isADPluginEnabled()) {
throw new IllegalStateException(CommonErrorMessages.DISABLED_ERR_MSG);
}
Expand All @@ -55,8 +78,30 @@ protected RestChannelConsumer prepareRequest(RestRequest request, org.opensearch
String rawPath = request.rawPath();

SearchAnomalyDetectorInfoRequest searchAnomalyDetectorInfoRequest = new SearchAnomalyDetectorInfoRequest(detectorName, rawPath);
return channel -> client
.execute(SearchAnomalyDetectorInfoAction.INSTANCE, searchAnomalyDetectorInfoRequest, new RestToXContentListener<>(channel));
CompletableFuture<SearchAnomalyDetectorInfoResponse> futureResponse = new CompletableFuture<>();
client
.execute(
SearchAnomalyDetectorInfoAction.INSTANCE,
searchAnomalyDetectorInfoRequest,
ActionListener
.wrap(
adSearchInfoResponse -> futureResponse.complete(adSearchInfoResponse),
ex -> futureResponse.completeExceptionally(ex)
)
);

SearchAnomalyDetectorInfoResponse searchAnomalyDetectorInfoResponse = futureResponse
.orTimeout(
AnomalyDetectorSettings.REQUEST_TIMEOUT.get(extensionsRunner.getEnvironmentSettings()).getMillis(),
TimeUnit.MILLISECONDS
)
.join();
ExtensionRestResponse extensionRestResponse = new ExtensionRestResponse(
request,
RestStatus.OK,
searchAnomalyDetectorInfoResponse.toXContent(JsonXContent.contentBuilder(), ToXContent.EMPTY_PARAMS)
);
return extensionRestResponse;
}

@Override
Expand All @@ -65,22 +110,24 @@ public List<RestHandler.Route> routes() {
}

@Override
public List<RestHandler.ReplacedRoute> replacedRoutes() {
public List<ReplacedRouteHandler> replacedRouteHandlers() {
return ImmutableList
.of(
// get the count of number of detectors
new ReplacedRoute(
new ReplacedRouteHandler(
RestRequest.Method.GET,
String.format(Locale.ROOT, "%s/%s", AnomalyDetectorPlugin.AD_BASE_DETECTORS_URI, COUNT),
String.format(Locale.ROOT, "%s/%s", AnomalyDetectorExtension.AD_BASE_DETECTORS_URI, COUNT),
RestRequest.Method.GET,
String.format(Locale.ROOT, "%s/%s", AnomalyDetectorPlugin.LEGACY_OPENDISTRO_AD_BASE_URI, COUNT)
String.format(Locale.ROOT, "%s/%s", AnomalyDetectorExtension.LEGACY_OPENDISTRO_AD_BASE_URI, COUNT),
handleRequest
),
// get if a detector name exists with name
new ReplacedRoute(
new ReplacedRouteHandler(
RestRequest.Method.GET,
String.format(Locale.ROOT, "%s/%s", AnomalyDetectorPlugin.AD_BASE_DETECTORS_URI, MATCH),
String.format(Locale.ROOT, "%s/%s", AnomalyDetectorExtension.AD_BASE_DETECTORS_URI, MATCH),
RestRequest.Method.GET,
String.format(Locale.ROOT, "%s/%s", AnomalyDetectorPlugin.LEGACY_OPENDISTRO_AD_BASE_URI, MATCH)
String.format(Locale.ROOT, "%s/%s", AnomalyDetectorExtension.LEGACY_OPENDISTRO_AD_BASE_URI, MATCH),
handleRequest
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,32 +21,33 @@
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.action.support.TransportAction;
import org.opensearch.ad.util.RestHandlerUtils;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.index.query.TermsQueryBuilder;
import org.opensearch.sdk.SDKClient.SDKRestClient;
import org.opensearch.sdk.SDKClusterService;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportService;
import org.opensearch.tasks.TaskManager;

import com.google.inject.Inject;

public class SearchAnomalyDetectorInfoTransportAction extends
HandledTransportAction<SearchAnomalyDetectorInfoRequest, SearchAnomalyDetectorInfoResponse> {
TransportAction<SearchAnomalyDetectorInfoRequest, SearchAnomalyDetectorInfoResponse> {
private static final Logger LOG = LogManager.getLogger(SearchAnomalyDetectorInfoTransportAction.class);
private final Client client;
private final ClusterService clusterService;
private final SDKRestClient client;
private final SDKClusterService clusterService;

@Inject
public SearchAnomalyDetectorInfoTransportAction(
TransportService transportService,
TaskManager taskManager,
ActionFilters actionFilters,
Client client,
ClusterService clusterService
SDKRestClient client,
SDKClusterService clusterService
) {
super(SearchAnomalyDetectorInfoAction.NAME, transportService, actionFilters, SearchAnomalyDetectorInfoRequest::new);
super(SearchAnomalyDetectorInfoAction.NAME, actionFilters, taskManager);
this.client = client;
this.clusterService = clusterService;
}
Expand Down
1 change: 1 addition & 0 deletions src/main/java/org/opensearch/ad/util/IndexUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ public String getIndexHealthStatus(String indexOrAliasName) throws IllegalArgume
* @return The number of documents in an index. 0 is returned if the index does not exist. -1 is returned if the
* request fails.
*/
@Deprecated
public Long getNumberOfDocumentsInIndex(String indexName) {
if (!clusterService.state().getRoutingTable().hasIndex(indexName)) {
return 0L;
Expand Down
8 changes: 3 additions & 5 deletions src/test/java/org/opensearch/ad/ADIntegTestCase.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import org.opensearch.client.node.NodeClient;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.common.collect.ImmutableOpenMap;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.index.query.MatchAllQueryBuilder;
Expand All @@ -65,7 +64,6 @@
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;

import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import com.google.common.collect.ImmutableMap;

public abstract class ADIntegTestCase extends OpenSearchIntegTestCase {
Expand Down Expand Up @@ -252,7 +250,7 @@ public ClusterUpdateSettingsResponse updateTransientSettings(Map<String, ?> sett
return clusterAdmin().updateSettings(updateSettingsRequest).actionGet(timeout);
}

public ImmutableOpenMap<String, DiscoveryNode> getDataNodes() {
public Map<String, DiscoveryNode> getDataNodes() {
DiscoveryNodes nodes = clusterService().state().getNodes();
return nodes.getDataNodes();
}
Expand All @@ -268,10 +266,10 @@ public Client getDataNodeClient() {

public DiscoveryNode[] getDataNodesArray() {
DiscoveryNodes nodes = clusterService().state().getNodes();
Iterator<ObjectObjectCursor<String, DiscoveryNode>> iterator = nodes.getDataNodes().iterator();
Iterator<DiscoveryNode> iterator = nodes.getDataNodes().values().iterator();
List<DiscoveryNode> dataNodes = new ArrayList<>();
while (iterator.hasNext()) {
dataNodes.add(iterator.next().value);
dataNodes.add(iterator.next());
}
return dataNodes.toArray(new DiscoveryNode[0]);
}
Expand Down
Loading

0 comments on commit cf70e72

Please sign in to comment.