diff --git a/server/src/main/java/org/opensearch/extensions/ExtensionsManager.java b/server/src/main/java/org/opensearch/extensions/ExtensionsManager.java index 8488743051b6b..258d91bc27219 100644 --- a/server/src/main/java/org/opensearch/extensions/ExtensionsManager.java +++ b/server/src/main/java/org/opensearch/extensions/ExtensionsManager.java @@ -32,7 +32,7 @@ import org.opensearch.OpenSearchException; import org.opensearch.Version; import org.opensearch.action.ActionModule; -import org.opensearch.action.admin.cluster.state.ClusterStateResponse; +import org.opensearch.action.admin.cluster.state.ClusterStateRequest; import org.opensearch.client.node.NodeClient; import org.opensearch.cluster.ClusterSettingsResponse; import org.opensearch.cluster.node.DiscoveryNode; @@ -246,8 +246,8 @@ private void registerRequestHandler() { ThreadPool.Names.GENERIC, false, false, - ExtensionRequest::new, - ((request, channel, task) -> channel.sendResponse(handleExtensionRequest(request))) + ClusterStateRequest::new, + ((request, channel, task) -> channel.sendResponse(extensionTransportActionsHandler.handleClusterStateRequest(request))) ); transportService.registerRequestHandler( REQUEST_EXTENSION_CLUSTER_SETTINGS, @@ -436,8 +436,6 @@ public String executor() { */ TransportResponse handleExtensionRequest(ExtensionRequest extensionRequest) throws Exception { switch (extensionRequest.getRequestType()) { - case REQUEST_EXTENSION_CLUSTER_STATE: - return new ClusterStateResponse(clusterService.getClusterName(), clusterService.state(), false); case REQUEST_EXTENSION_CLUSTER_SETTINGS: return new ClusterSettingsResponse(clusterService); case REQUEST_EXTENSION_ENVIRONMENT_SETTINGS: diff --git a/server/src/main/java/org/opensearch/extensions/action/ExtensionTransportActionsHandler.java b/server/src/main/java/org/opensearch/extensions/action/ExtensionTransportActionsHandler.java index 3fba76b7d3c59..fc322a61c8a09 100644 --- a/server/src/main/java/org/opensearch/extensions/action/ExtensionTransportActionsHandler.java +++ b/server/src/main/java/org/opensearch/extensions/action/ExtensionTransportActionsHandler.java @@ -13,6 +13,9 @@ import org.opensearch.action.ActionListener; import org.opensearch.action.ActionModule; import org.opensearch.action.ActionModule.DynamicActionRegistry; +import org.opensearch.action.admin.cluster.state.ClusterStateAction; +import org.opensearch.action.admin.cluster.state.ClusterStateRequest; +import org.opensearch.action.admin.cluster.state.ClusterStateResponse; import org.opensearch.action.support.ActionFilters; import org.opensearch.client.node.NodeClient; import org.opensearch.common.io.stream.StreamInput; @@ -31,6 +34,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -310,4 +314,42 @@ public String executor() { } return extensionActionResponse; } + + /** + * Handles a {@link ClusterStateRequest}. + * + * @param request The request to handle. + * @return A {@link ClusterStateResponse}. + */ + public ClusterStateResponse handleClusterStateRequest(ClusterStateRequest request) { + final CompletableFuture inProgressFuture = new CompletableFuture<>(); + client.execute(ClusterStateAction.INSTANCE, request, new ActionListener() { + @Override + public void onResponse(ClusterStateResponse response) { + inProgressFuture.complete(response); + } + + @Override + public void onFailure(Exception exp) { + logger.debug("Cluster State Request failed", exp); + inProgressFuture.completeExceptionally(exp); + } + }); + try { + return inProgressFuture.orTimeout(ExtensionsManager.EXTENSION_REQUEST_WAIT_TIMEOUT, TimeUnit.SECONDS).get(); + } catch (ExecutionException e) { + if (e.getCause() instanceof TimeoutException) { + logger.debug("No response from extension to request."); + } + if (e.getCause() instanceof RuntimeException) { + throw (RuntimeException) e.getCause(); + } else if (e.getCause() instanceof Error) { + throw (Error) e.getCause(); + } else { + throw new RuntimeException(e.getCause()); + } + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } } diff --git a/server/src/test/java/org/opensearch/extensions/ExtensionsManagerTests.java b/server/src/test/java/org/opensearch/extensions/ExtensionsManagerTests.java index 040c799efce2a..2f7be30129262 100644 --- a/server/src/test/java/org/opensearch/extensions/ExtensionsManagerTests.java +++ b/server/src/test/java/org/opensearch/extensions/ExtensionsManagerTests.java @@ -42,7 +42,6 @@ import org.junit.Before; import org.opensearch.Version; import org.opensearch.action.ActionModule; -import org.opensearch.action.admin.cluster.state.ClusterStateResponse; import org.opensearch.client.node.NodeClient; import org.opensearch.cluster.ClusterSettingsResponse; import org.opensearch.env.EnvironmentSettingsResponse; @@ -524,9 +523,6 @@ public void testHandleExtensionRequest() throws Exception { ExtensionsManager extensionsManager = new ExtensionsManager(settings, extensionDir); initialize(extensionsManager); - ExtensionRequest clusterStateRequest = new ExtensionRequest(ExtensionsManager.RequestType.REQUEST_EXTENSION_CLUSTER_STATE); - assertEquals(ClusterStateResponse.class, extensionsManager.handleExtensionRequest(clusterStateRequest).getClass()); - ExtensionRequest clusterSettingRequest = new ExtensionRequest(ExtensionsManager.RequestType.REQUEST_EXTENSION_CLUSTER_SETTINGS); assertEquals(ClusterSettingsResponse.class, extensionsManager.handleExtensionRequest(clusterSettingRequest).getClass()); diff --git a/server/src/test/java/org/opensearch/extensions/action/ExtensionTransportActionsHandlerTests.java b/server/src/test/java/org/opensearch/extensions/action/ExtensionTransportActionsHandlerTests.java index 3fea207cbb700..d2eadc2c9c286 100644 --- a/server/src/test/java/org/opensearch/extensions/action/ExtensionTransportActionsHandlerTests.java +++ b/server/src/test/java/org/opensearch/extensions/action/ExtensionTransportActionsHandlerTests.java @@ -13,6 +13,8 @@ import org.opensearch.Version; import org.opensearch.action.ActionModule; import org.opensearch.action.ActionModule.DynamicActionRegistry; +import org.opensearch.action.admin.cluster.state.ClusterStateAction; +import org.opensearch.action.admin.cluster.state.ClusterStateRequest; import org.opensearch.action.support.ActionFilters; import org.opensearch.client.node.NodeClient; import org.opensearch.cluster.node.DiscoveryNode; @@ -43,7 +45,12 @@ import java.util.Set; import java.util.concurrent.TimeUnit; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static java.util.Collections.emptyMap; @@ -95,7 +102,7 @@ public void setup() throws Exception { Version.fromString("3.0.0"), Collections.emptyList() ); - client = new NoOpNodeClient(this.getTestName()); + client = spy(new NoOpNodeClient(this.getTestName())); ActionModule mockActionModule = mock(ActionModule.class); DynamicActionRegistry dynamicActionRegistry = new DynamicActionRegistry(); dynamicActionRegistry.registerUnmodifiableActionMap(Collections.emptyMap()); @@ -176,4 +183,11 @@ public void testSendTransportRequestToExtension() throws InterruptedException { expectThrows(NodeNotConnectedException.class, () -> extensionTransportActionsHandler.sendTransportRequestToExtension(request)); } + + public void testHandleClusterStateRequest() throws Exception { + ClusterStateRequest clusterStateRequest = new ClusterStateRequest().all(); + // The client is no-op so doesn't actually execute this request, but we can verify it's called + extensionTransportActionsHandler.handleClusterStateRequest(clusterStateRequest); + verify(client, times(1)).execute(eq(ClusterStateAction.INSTANCE), eq(clusterStateRequest), any()); + } }