Skip to content

Commit

Permalink
Refactor transportservice request (opensearch-project#167)
Browse files Browse the repository at this point in the history
* issue opensearch-project#28

Signed-off-by: mloufra <[email protected]>

* Update the lastest coomit

Signed-off-by: mloufra <[email protected]>

* Rename the method and fix the conflict

Signed-off-by: mloufra <[email protected]>

* fix merge conflict

Signed-off-by: mloufra <[email protected]>

* Add code coverage report

Signed-off-by: mloufra <[email protected]>

* Rebase the lastest commit

Signed-off-by: mloufra <[email protected]>

* update the lastest commit

Signed-off-by: mloufra <[email protected]>

* Refactor transportService request

Signed-off-by: mloufra <[email protected]>

* delete transportService object change

Signed-off-by: mloufra <[email protected]>

Signed-off-by: mloufra <[email protected]>
  • Loading branch information
mloufra authored and kokibas committed Mar 17, 2023
1 parent 0f74e30 commit 54d8e95
Showing 1 changed file with 35 additions and 36 deletions.
71 changes: 35 additions & 36 deletions src/main/java/org/opensearch/sdk/ExtensionsRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.opensearch.extensions.EnvironmentSettingsRequest;
import org.opensearch.extensions.AddSettingsUpdateConsumerRequest;
import org.opensearch.extensions.UpdateSettingsRequest;
import org.opensearch.extensions.ExtensionsOrchestrator.RequestType;
import org.opensearch.extensions.ExtensionRequest;
import org.opensearch.extensions.ExtensionsOrchestrator;
import org.opensearch.index.IndicesModuleRequest;
Expand All @@ -44,6 +45,8 @@
import org.opensearch.sdk.handlers.ExtensionStringResponseHandler;
import org.opensearch.sdk.handlers.OpensearchRequestHandler;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportResponse;
import org.opensearch.transport.TransportResponseHandler;
import org.opensearch.transport.TransportService;
import org.opensearch.transport.TransportSettings;

Expand Down Expand Up @@ -314,24 +317,32 @@ public void sendRegisterCustomSettingsRequest(TransportService transportService)
}
}

private void sendGenericRequestWithExceptionHandling(
TransportService transportService,
RequestType requestType,
String orchestratorNameString,
TransportResponseHandler<? extends TransportResponse> responseHandler
) {
logger.info("Sending " + requestType + " request to OpenSearch");
try {
transportService.sendRequest(opensearchNode, orchestratorNameString, new ExtensionRequest(requestType), responseHandler);
} catch (Exception e) {
logger.info("Failed to send " + requestType + " request to OpenSearch", e);
}
}

/**
* Requests the cluster state from OpenSearch. The result will be handled by a {@link ClusterStateResponseHandler}.
*
* @param transportService The TransportService defining the connection to OpenSearch.
*/
public void sendClusterStateRequest(TransportService transportService) {
logger.info("Sending Cluster State request to OpenSearch");
ClusterStateResponseHandler clusterStateResponseHandler = new ClusterStateResponseHandler();
try {
transportService.sendRequest(
opensearchNode,
ExtensionsOrchestrator.REQUEST_EXTENSION_CLUSTER_STATE,
new ExtensionRequest(ExtensionsOrchestrator.RequestType.REQUEST_EXTENSION_CLUSTER_STATE),
clusterStateResponseHandler
);
} catch (Exception e) {
logger.info("Failed to send Cluster State request to OpenSearch", e);
}
sendGenericRequestWithExceptionHandling(
transportService,
ExtensionsOrchestrator.RequestType.REQUEST_EXTENSION_CLUSTER_STATE,
ExtensionsOrchestrator.REQUEST_EXTENSION_CLUSTER_STATE,
new ClusterStateResponseHandler()
);
}

/**
Expand All @@ -340,18 +351,12 @@ public void sendClusterStateRequest(TransportService transportService) {
* @param transportService The TransportService defining the connection to OpenSearch.
*/
public void sendClusterSettingsRequest(TransportService transportService) {
logger.info("Sending Cluster Settings request to OpenSearch");
ClusterSettingsResponseHandler clusterSettingsResponseHandler = new ClusterSettingsResponseHandler();
try {
transportService.sendRequest(
opensearchNode,
ExtensionsOrchestrator.REQUEST_EXTENSION_CLUSTER_SETTINGS,
new ExtensionRequest(ExtensionsOrchestrator.RequestType.REQUEST_EXTENSION_CLUSTER_SETTINGS),
clusterSettingsResponseHandler
);
} catch (Exception e) {
logger.info("Failed to send Cluster Settings request to OpenSearch", e);
}
sendGenericRequestWithExceptionHandling(
transportService,
ExtensionsOrchestrator.RequestType.REQUEST_EXTENSION_LOCAL_NODE,
ExtensionsOrchestrator.REQUEST_EXTENSION_LOCAL_NODE,
new ClusterSettingsResponseHandler()
);
}

/**
Expand All @@ -360,18 +365,12 @@ public void sendClusterSettingsRequest(TransportService transportService) {
* @param transportService The TransportService defining the connection to OpenSearch.
*/
public void sendLocalNodeRequest(TransportService transportService) {
logger.info("Sending Local Node request to OpenSearch");
LocalNodeResponseHandler localNodeResponseHandler = new LocalNodeResponseHandler();
try {
transportService.sendRequest(
opensearchNode,
ExtensionsOrchestrator.REQUEST_EXTENSION_LOCAL_NODE,
new ExtensionRequest(ExtensionsOrchestrator.RequestType.REQUEST_EXTENSION_LOCAL_NODE),
localNodeResponseHandler
);
} catch (Exception e) {
logger.info("Failed to send Cluster Settings request to OpenSearch", e);
}
sendGenericRequestWithExceptionHandling(
transportService,
ExtensionsOrchestrator.RequestType.REQUEST_EXTENSION_LOCAL_NODE,
ExtensionsOrchestrator.REQUEST_EXTENSION_LOCAL_NODE,
new LocalNodeResponseHandler()
);
}

/**
Expand Down

0 comments on commit 54d8e95

Please sign in to comment.