Skip to content

Commit

Permalink
[Feature/extensions] Adding ActionListener onFailure to ExtensionsOrc…
Browse files Browse the repository at this point in the history
…hestrator (#4191)

* Added ActionListener onFailure logic to ExtensionsOrchestrator

Signed-off-by: Ryan Bogan <[email protected]>

* Addressed PR Comments

Signed-off-by: Ryan Bogan <[email protected]>

* Addressed PR Comments

Signed-off-by: Ryan Bogan <[email protected]>

* Changed success and failure count to response count

Signed-off-by: Ryan Bogan <[email protected]>

* Modified CHANGELOG.md

Signed-off-by: Ryan Bogan <[email protected]>

* Addressed PR Comments

Signed-off-by: Ryan Bogan <[email protected]>

* Addressed PR Comments

Signed-off-by: Ryan Bogan <[email protected]>

* Addressed PR Comments

Signed-off-by: Ryan Bogan <[email protected]>

Signed-off-by: Ryan Bogan <[email protected]>
  • Loading branch information
ryanbogan authored Sep 17, 2022
1 parent 19776fc commit edde6a4
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 21 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Only send one extension info when initializing ([#4302](https://github.com/opensearch-project/OpenSearch/pull/4302))
- Adding support for registering Transport Actions for extensions ([#4326](https://github.com/opensearch-project/OpenSearch/pull/4326))
- Pass full RestResponse to user from Extension ([#4356](https://github.com/opensearch-project/OpenSearch/pull/4356))
- Handle named wildcards (REST path parameters) ([#4415](https://github.com/opensearch-project/OpenSearch/pull/4415))
- Handle named wildcards (REST path parameters) ([#4415](https://github.com/opensearch-project/OpenSearch/pull/4415))
- Adding ActionListener onFailure to ExtensionsOrchestrator ([#61](https://github.com/opensearch-project/opensearch-sdk/issues/61))

## [2.x]
### Added
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.extensions;

import java.util.ArrayList;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.ActionListener;
import org.opensearch.action.admin.indices.analyze.AnalyzeAction.Response;

/**
* ActionListener for ExtensionsOrchestrator
*
* @opensearch.internal
*/
public class ExtensionActionListener<ExtensionBooleanResponse> implements ActionListener<Response> {

private static final Logger logger = LogManager.getLogger(ExtensionActionListener.class);
private ArrayList<Exception> exceptionList;

public ExtensionActionListener() {
exceptionList = new ArrayList<Exception>();
}

@Override
public void onResponse(Response response) {
logger.info("response {}", response);
}

@Override
public void onFailure(Exception e) {
exceptionList.add(e);
logger.error(e.getMessage());
}

public static Logger getLogger() {
return logger;
}

public ArrayList<Exception> getExceptionList() {
return exceptionList;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,29 @@
public class ExtensionRequest extends TransportRequest {
private static final Logger logger = LogManager.getLogger(ExtensionRequest.class);
private ExtensionsOrchestrator.RequestType requestType;
private String failureException;

public ExtensionRequest(ExtensionsOrchestrator.RequestType requestType) {
this.requestType = requestType;
this.failureException = "";
}

public ExtensionRequest(StreamInput in) throws IOException {
super(in);
this.requestType = in.readEnum(ExtensionsOrchestrator.RequestType.class);
this.failureException = in.readOptionalString();
}

public ExtensionRequest(ExtensionsOrchestrator.RequestType requestType, String failureException) {
this.requestType = requestType;
this.failureException = failureException;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeEnum(requestType);
out.writeOptionalString(failureException);
}

public ExtensionsOrchestrator.RequestType getRequestType() {
Expand All @@ -63,4 +72,8 @@ public int hashCode() {
return Objects.hash(requestType);
}

public String getFailureException() {
return failureException;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public class ExtensionsOrchestrator implements ReportingService<PluginsAndModule
public static final String REQUEST_EXTENSION_REGISTER_TRANSPORT_ACTIONS = "internal:discovery/registertransportactions";
public static final String REQUEST_OPENSEARCH_NAMED_WRITEABLE_REGISTRY = "internal:discovery/namedwriteableregistry";
public static final String REQUEST_OPENSEARCH_PARSE_NAMED_WRITEABLE = "internal:discovery/parsenamedwriteable";
public static final String REQUEST_EXTENSION_ACTION_LISTENER_ON_FAILURE = "internal:extensions/actionlisteneronfailure";
public static final String REQUEST_REST_EXECUTE_ON_EXTENSION_ACTION = "internal:extensions/restexecuteonextensiontaction";

private static final Logger logger = LogManager.getLogger(ExtensionsOrchestrator.class);
Expand All @@ -87,6 +88,7 @@ public static enum RequestType {
REQUEST_EXTENSION_CLUSTER_STATE,
REQUEST_EXTENSION_LOCAL_NODE,
REQUEST_EXTENSION_CLUSTER_SETTINGS,
REQUEST_EXTENSION_ACTION_LISTENER_ON_FAILURE,
REQUEST_EXTENSION_REGISTER_REST_ACTIONS,
CREATE_COMPONENT,
ON_INDEX_MODULE,
Expand All @@ -111,6 +113,7 @@ public static enum OpenSearchRequestType {
TransportService transportService;
ClusterService clusterService;
ExtensionNamedWriteableRegistry namedWriteableRegistry;
ExtensionActionListener<ExtensionBooleanResponse> listener;

/**
* Instantiate a new ExtensionsOrchestrator object to handle requests and responses from extensions. This is called during Node bootstrap.
Expand All @@ -127,6 +130,7 @@ public ExtensionsOrchestrator(Settings settings, Path extensionsPath) throws IOE
this.extensionIdMap = new HashMap<String, DiscoveryExtension>();
this.clusterService = null;
this.namedWriteableRegistry = null;
this.listener = new ExtensionActionListener<ExtensionBooleanResponse>();

/*
* Now Discover extensions
Expand Down Expand Up @@ -187,6 +191,14 @@ private void registerRequestHandler() {
ExtensionRequest::new,
((request, channel, task) -> channel.sendResponse(handleExtensionRequest(request)))
);
transportService.registerRequestHandler(
REQUEST_EXTENSION_ACTION_LISTENER_ON_FAILURE,
ThreadPool.Names.GENERIC,
false,
false,
ExtensionRequest::new,
((request, channel, task) -> channel.sendResponse(handleExtensionRequest(request)))
);
transportService.registerRequestHandler(
REQUEST_EXTENSION_REGISTER_TRANSPORT_ACTIONS,
ThreadPool.Names.GENERIC,
Expand Down Expand Up @@ -317,22 +329,6 @@ public String executor() {
}
}

/**
* Handles a {@link RegisterTransportActionsRequest}.
*
* @param transportActionsRequest The request to handle.
* @return A {@link ExtensionBooleanResponse} indicating success.
* @throws Exception if the request is not handled properly.
*/
TransportResponse handleRegisterTransportActionsRequest(RegisterTransportActionsRequest transportActionsRequest) throws Exception {
/*
* TODO: https://github.com/opensearch-project/opensearch-sdk-java/issues/107
* Register these new Transport Actions with ActionModule
* and add support for NodeClient to recognise these actions when making transport calls.
*/
return new ExtensionBooleanResponse(true);
}

/**
* Handles an {@link ExtensionRequest}.
*
Expand All @@ -341,18 +337,49 @@ TransportResponse handleRegisterTransportActionsRequest(RegisterTransportActions
* @throws Exception if the request is not handled properly.
*/
TransportResponse handleExtensionRequest(ExtensionRequest extensionRequest) throws Exception {
// Read enum
switch (extensionRequest.getRequestType()) {
case REQUEST_EXTENSION_CLUSTER_STATE:
return new ClusterStateResponse(clusterService.getClusterName(), clusterService.state(), false);
case REQUEST_EXTENSION_LOCAL_NODE:
return new LocalNodeResponse(clusterService);
LocalNodeResponse localNodeResponse = new LocalNodeResponse(clusterService);
return localNodeResponse;
case REQUEST_EXTENSION_CLUSTER_SETTINGS:
return new ClusterSettingsResponse(clusterService);
ClusterSettingsResponse clusterSettingsResponse = new ClusterSettingsResponse(clusterService);
return clusterSettingsResponse;
case REQUEST_EXTENSION_ACTION_LISTENER_ON_FAILURE:
return handleExtensionActionListenerOnFailureRequest(extensionRequest.getFailureException());
default:
throw new Exception("Handler not present for the provided request");
}
}

private ExtensionBooleanResponse handleExtensionActionListenerOnFailureRequest(String failureException) throws Exception {
try {
listener.onFailure(new Exception(failureException));
return new ExtensionBooleanResponse(true);
} catch (Exception e) {
logger.error(e.getMessage());
throw e;
}
}

/**
* Handles a {@link RegisterTransportActionsRequest}.
*
* @param transportActionsRequest The request to handle.
* @return A {@link ExtensionBooleanResponse} indicating success.
* @throws Exception if the request is not handled properly.
*/
TransportResponse handleRegisterTransportActionsRequest(RegisterTransportActionsRequest transportActionsRequest) throws Exception {
/*
* TODO: https://github.com/opensearch-project/opensearch-sdk-java/issues/107
* Register these new Transport Actions with ActionModule
* and add support for NodeClient to recognise these actions when making transport calls.
*/
return new ExtensionBooleanResponse(true);
}

public void onIndexModule(IndexModule indexModule) throws UnknownHostException {
for (DiscoveryNode extensionNode : extensionIdMap.values()) {
onIndexModule(indexModule, extensionNode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,13 @@ public void testHandleExtensionRequest() throws Exception {
ExtensionRequest localNodeRequest = new ExtensionRequest(ExtensionsOrchestrator.RequestType.REQUEST_EXTENSION_LOCAL_NODE);
assertEquals(LocalNodeResponse.class, extensionsOrchestrator.handleExtensionRequest(localNodeRequest).getClass());

ExtensionRequest listenerFailureRequest = new ExtensionRequest(
ExtensionsOrchestrator.RequestType.REQUEST_EXTENSION_ACTION_LISTENER_ON_FAILURE,
"Test failure"
);
assertEquals(ExtensionBooleanResponse.class, extensionsOrchestrator.handleExtensionRequest(listenerFailureRequest).getClass());
assertEquals("Test failure", extensionsOrchestrator.listener.getExceptionList().get(0).getMessage());

ExtensionRequest exceptionRequest = new ExtensionRequest(ExtensionsOrchestrator.RequestType.GET_SETTINGS);
Exception exception = expectThrows(Exception.class, () -> extensionsOrchestrator.handleExtensionRequest(exceptionRequest));
assertEquals("Handler not present for the provided request", exception.getMessage());
Expand All @@ -440,9 +447,8 @@ public void testRegisterHandler() throws Exception {
Collections.emptySet()
)
);

extensionsOrchestrator.initializeServicesAndRestHandler(restController, mockTransportService, clusterService);
verify(mockTransportService, times(5)).registerRequestHandler(anyString(), anyString(), anyBoolean(), anyBoolean(), any(), any());
verify(mockTransportService, times(6)).registerRequestHandler(anyString(), anyString(), anyBoolean(), anyBoolean(), any(), any());
}

private static class Example implements NamedWriteable {
Expand Down

0 comments on commit edde6a4

Please sign in to comment.