From 49ab274ff9c3b22d6219a4913f444fd77e2f2cda Mon Sep 17 00:00:00 2001 From: Daniel Widdis Date: Sun, 22 Jan 2023 20:54:58 -0800 Subject: [PATCH] RestHighLevelClient Wrapper Signed-off-by: Daniel Widdis --- .../java/org/opensearch/sdk/SDKClient.java | 272 ++++++++++++++++-- .../org/opensearch/sdk/TestSDKClient.java | 83 ++++-- 2 files changed, 312 insertions(+), 43 deletions(-) diff --git a/src/main/java/org/opensearch/sdk/SDKClient.java b/src/main/java/org/opensearch/sdk/SDKClient.java index f5873edd..b1274ed8 100644 --- a/src/main/java/org/opensearch/sdk/SDKClient.java +++ b/src/main/java/org/opensearch/sdk/SDKClient.java @@ -27,9 +27,36 @@ import org.apache.hc.core5.http.nio.ssl.TlsStrategy; import org.apache.hc.core5.reactor.ssl.TlsDetails; import org.apache.hc.core5.ssl.SSLContextBuilder; +import org.opensearch.action.ActionListener; +import org.opensearch.action.admin.indices.alias.get.GetAliasesRequest; +import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; +import org.opensearch.action.delete.DeleteRequest; +import org.opensearch.action.delete.DeleteResponse; +import org.opensearch.action.get.GetRequest; +import org.opensearch.action.get.GetResponse; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.action.index.IndexResponse; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.action.support.master.AcknowledgedResponse; +import org.opensearch.client.Cancellable; +import org.opensearch.client.Client; +import org.opensearch.client.ClusterAdminClient; +import org.opensearch.client.ClusterClient; +import org.opensearch.client.GetAliasesResponse; +import org.opensearch.client.IndicesClient; +import org.opensearch.client.RequestOptions; +import org.opensearch.client.Requests; import org.opensearch.client.RestClient; import org.opensearch.client.RestClientBuilder; import org.opensearch.client.RestHighLevelClient; +import org.opensearch.client.indices.CreateIndexRequest; +import org.opensearch.client.indices.CreateIndexResponse; +import org.opensearch.client.indices.GetMappingsRequest; +import org.opensearch.client.indices.GetMappingsResponse; +import org.opensearch.client.indices.PutMappingRequest; +import org.opensearch.client.indices.rollover.RolloverRequest; +import org.opensearch.client.indices.rollover.RolloverResponse; import org.opensearch.client.json.jackson.JacksonJsonpMapper; import org.opensearch.client.opensearch.OpenSearchClient; import org.opensearch.client.transport.OpenSearchTransport; @@ -43,16 +70,24 @@ public class SDKClient implements Closeable { private OpenSearchClient javaClient; private RestClient restClient; - private RestHighLevelClient highLevelClient; + private RestHighLevelClient sdkRestClient; - private RestClientBuilder builder(String hostAddress, int port) { + /** + * Create and configure a RestClientBuilder + * + * @param hostAddress The address the client should connect to + * @param port The port the client should connect to + * @return An instance of the builder + */ + private static RestClientBuilder builder(String hostAddress, int port) { RestClientBuilder builder = RestClient.builder(new HttpHost(hostAddress, port)); builder.setStrictDeprecationMode(true); builder.setHttpClientConfigCallback(httpClientBuilder -> { try { final TlsStrategy tlsStrategy = ClientTlsStrategyBuilder.create() .setSslContext(SSLContextBuilder.create().loadTrustMaterial(null, (chains, authType) -> true).build()) - // disable the certificate since our cluster currently just uses the default security configuration + // disable the certificate since our cluster currently just uses the default security + // configuration .setHostnameVerifier(NoopHostnameVerifier.INSTANCE) // See please https://issues.apache.org/jira/browse/HTTPCLIENT-2219 .setTlsDetailsFactory(new Factory() { @@ -76,9 +111,11 @@ public TlsDetails create(final SSLEngine sslEngine) { /** * Creates OpenSearchClient for SDK. It also creates a restClient as a wrapper around Java OpenSearchClient + * * @param hostAddress The address of OpenSearch cluster, client can connect to - * @param port The port of OpenSearch cluster - * @return The SDKClient implementation of OpenSearchClient. The user is responsible for calling {@link #doCloseJavaClient()} when finished with the client + * @param port The port of OpenSearch cluster + * @return The SDKClient implementation of OpenSearchClient. The user is responsible for calling + * {@link #doCloseJavaClient()} when finished with the client */ public OpenSearchClient initializeJavaClient(String hostAddress, int port) { RestClientBuilder builder = builder(hostAddress, port); @@ -99,18 +136,23 @@ public OpenSearchClient initializeJavaClient(String hostAddress, int port) { } /** - * @deprecated Provided for compatibility with existing plugins to permit migration. New development should not use this client - * Creates High Level Rest Client for SDK. + * Initializes a SDK Rest Client wrapping the {@link RestHighLevelClient}. + *

+ * The purpose of this client is to provide a drop-in replacement for the syntax of the {@link Client} + * implementation in existing plugins with a minimum of code changes. + *

+ * Do not use this client for new development. + * * @param hostAddress The address of OpenSearch cluster, client can connect to - * @param port The port of OpenSearch cluster - * @return The SDKClient implementation of RestHighLevelClient. The user is responsible for calling {@link #doCloseHighLevelClient()} when finished with the client + * @param port The port of OpenSearch cluster + * @return The SDKClient implementation of RestHighLevelClient. The user is responsible for calling + * {@link #doCloseHighLevelClient()} when finished with the client + * @deprecated Provided for compatibility with existing plugins to permit migration. Use + * {@link #initializeJavaClient} for new development. */ @Deprecated - public RestHighLevelClient initializeRestClient(String hostAddress, int port) { - RestClientBuilder builder = builder(hostAddress, port); - - highLevelClient = new RestHighLevelClient(builder); - return highLevelClient; + public SDKRestClient initializeRestClient(String hostAddress, int port) { + return new SDKRestClient(new RestHighLevelClient(builder(hostAddress, port))); } /** @@ -130,8 +172,8 @@ public void doCloseJavaClient() throws IOException { * @throws IOException if closing the highLevelClient fails */ public void doCloseHighLevelClient() throws IOException { - if (highLevelClient != null) { - highLevelClient.close(); + if (sdkRestClient != null) { + sdkRestClient.close(); } } @@ -140,4 +182,202 @@ public void close() throws IOException { doCloseJavaClient(); doCloseHighLevelClient(); } + + /** + * Wraps an internal {@link RestHighLevelClient} using method signatures expected by {@link Client} and {@link org.opensearch.client.AdminClient} syntax, providing a drop-in replacement in existing plugins with a minimum of code changes. + *

+ * While some {@link Client} interface methods are implemented here, the interface is intentionally not fully implemented as it is intended to be deprecated. + *

+ * Do not use this client for new development. + * + * @deprecated Use {@link org.opensearch.client.opensearch.OpenSearchClient}. + * @see OpenSearch Issue 5424 + */ + @Deprecated + public static class SDKRestClient implements Closeable { + + private final RestHighLevelClient rhlc; + + /** + * Instantiate this class wrapping a {@link RestHighLevelClient}. + * + * @param restHighLevelClient The client to wrap. + */ + public SDKRestClient(RestHighLevelClient restHighLevelClient) { + this.rhlc = restHighLevelClient; + } + + /** + * A client allowing to perform actions/operations against the cluster. + */ + public SDKClusterAdminClient cluster() { + return new SDKClusterAdminClient(rhlc.cluster()); + } + + /** + * A client allowing to perform actions/operations against the indices. + */ + public SDKIndicesClient indices() { + return new SDKIndicesClient(rhlc.indices()); + } + + /** + * Index a document associated with a given index. + *

+ * The id is optional, if it is not provided, one will be generated automatically. + * + * @param request The index request + * @param listener A listener to be notified with a result + * @see Requests#indexRequest(String) + */ + public void index(IndexRequest request, ActionListener listener) { + rhlc.indexAsync(request, RequestOptions.DEFAULT, listener); + } + + /** + * Gets the document that was indexed from an index with an id. + * + * @param request The get request + * @param listener A listener to be notified with a result + * @see Requests#getRequest(String) + */ + public void get(GetRequest request, ActionListener listener) { + rhlc.getAsync(request, RequestOptions.DEFAULT, listener); + } + + /** + * Deletes a document from the index based on the index, and id. + * + * @param request The delete request + * @param listener A listener to be notified with a result + * @see Requests#deleteRequest(String) + */ + public void delete(DeleteRequest request, ActionListener listener) { + rhlc.deleteAsync(request, RequestOptions.DEFAULT, listener); + } + + /** + * Search across one or more indices with a query. + * + * @param request The search request + * @param listener A listener to be notified of the result + * @see Requests#searchRequest(String...) + */ + public void search(SearchRequest request, ActionListener listener) { + rhlc.searchAsync(request, RequestOptions.DEFAULT, listener); + } + + @Override + public void close() throws IOException { + rhlc.close(); + } + } + + /** + * Wraps an internal {@link ClusterAdminClient}, providing a drop-in replacement in existing plugins with a minimum of code changes. + *

+ * Do not use this client for new development. + */ + public static class SDKClusterAdminClient { + + private final ClusterClient clusterClient; + + /** + * Instantiate this class using a {@link ClusterClient}. + * + * @param clusterClient The client to wrap + */ + public SDKClusterAdminClient(ClusterClient clusterClient) { + this.clusterClient = clusterClient; + } + + // TODO: Implement state() + // https://github.com/opensearch-project/opensearch-sdk-java/issues/354 + + } + + /** + * Wraps an internal {@link IndicesClient}, providing a drop-in replacement in existing plugins with a minimum of code changes. + *

+ * Do not use this client for new development. + */ + public static class SDKIndicesClient { + + private final IndicesClient indicesClient; + + /** + * Instantiate this class wrapping an {@link IndicesClient}. + * + * @param indicesClient The client to wrap + */ + public SDKIndicesClient(IndicesClient indicesClient) { + this.indicesClient = indicesClient; + } + + /** + * Asynchronously creates an index using the Create Index API. + * + * @param createIndexRequest the request + * @param listener the listener to be notified upon request completion + * @return cancellable that may be used to cancel the request + */ + public Cancellable create(CreateIndexRequest createIndexRequest, ActionListener listener) { + return indicesClient.createAsync(createIndexRequest, RequestOptions.DEFAULT, listener); + } + + /** + * Asynchronously deletes an index using the Delete Index API. + * + * @param deleteIndexRequest the request + * @param listener the listener to be notified upon request completion + * @return cancellable that may be used to cancel the request + */ + public Cancellable delete(DeleteIndexRequest deleteIndexRequest, ActionListener listener) { + return indicesClient.deleteAsync(deleteIndexRequest, RequestOptions.DEFAULT, listener); + } + + /** + * Asynchronously updates the mappings on an index using the Put Mapping API. + * + * @param putMappingRequest the request + * @param listener the listener to be notified upon request completion + * @return cancellable that may be used to cancel the request + */ + public Cancellable putMapping(PutMappingRequest putMappingRequest, ActionListener listener) { + return this.indicesClient.putMappingAsync(putMappingRequest, RequestOptions.DEFAULT, listener); + } + + /** + * Asynchronously retrieves the mappings on an index on indices using the Get Mapping API. + * + * @param getMappingsRequest the request + * @param listener the listener to be notified upon request completion + * @return cancellable that may be used to cancel the request + */ + public Cancellable getMapping(GetMappingsRequest getMappingsRequest, ActionListener listener) { + return this.indicesClient.getMappingAsync(getMappingsRequest, RequestOptions.DEFAULT, listener); + } + + /** + * Asynchronously rolls over an index using the Rollover Index API. + * + * @param rolloverRequest the request + * @param listener the listener to be notified upon request completion + * @return cancellable that may be used to cancel the request + */ + public Cancellable rolloverIndex(RolloverRequest rolloverRequest, ActionListener listener) { + return this.indicesClient.rolloverAsync(rolloverRequest, RequestOptions.DEFAULT, listener); + } + + /** + * Asynchronously gets one or more aliases using the Get Index Aliases API. + * + * @param getAliasesRequest the request + * @param listener the listener to be notified upon request completion + * @return cancellable that may be used to cancel the request + */ + public Cancellable getAliases(GetAliasesRequest getAliasesRequest, ActionListener listener) { + return this.indicesClient.getAliasAsync(getAliasesRequest, RequestOptions.DEFAULT, listener); + } + } } diff --git a/src/test/java/org/opensearch/sdk/TestSDKClient.java b/src/test/java/org/opensearch/sdk/TestSDKClient.java index 19c9e7cf..dacacdfa 100644 --- a/src/test/java/org/opensearch/sdk/TestSDKClient.java +++ b/src/test/java/org/opensearch/sdk/TestSDKClient.java @@ -10,52 +10,81 @@ package org.opensearch.sdk; import org.junit.jupiter.api.Test; -import org.opensearch.client.RequestOptions; -import org.opensearch.client.RestHighLevelClient; +import org.opensearch.action.ActionListener; +import org.opensearch.action.admin.indices.alias.get.GetAliasesRequest; +import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; +import org.opensearch.action.delete.DeleteRequest; +import org.opensearch.action.get.GetRequest; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.client.Cancellable; +import org.opensearch.client.indices.CreateIndexRequest; +import org.opensearch.client.indices.GetMappingsRequest; +import org.opensearch.client.indices.PutMappingRequest; +import org.opensearch.client.indices.rollover.RolloverRequest; import org.opensearch.client.opensearch.OpenSearchClient; -import org.opensearch.client.opensearch.indices.Alias; -import org.opensearch.client.opensearch.indices.CreateIndexRequest; +import org.opensearch.client.opensearch.cluster.OpenSearchClusterClient; +import org.opensearch.client.opensearch.indices.OpenSearchIndicesClient; +import org.opensearch.sdk.SDKClient.SDKClusterAdminClient; +import org.opensearch.sdk.SDKClient.SDKIndicesClient; +import org.opensearch.sdk.SDKClient.SDKRestClient; import org.opensearch.test.OpenSearchTestCase; -import java.net.ConnectException; - +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertInstanceOf; +@SuppressWarnings("deprecation") public class TestSDKClient extends OpenSearchTestCase { SDKClient sdkClient = new SDKClient(); @Test public void testCreateJavaClient() throws Exception { - - OpenSearchClient testClient = sdkClient.initializeJavaClient("localhost", 9200); - assertInstanceOf(OpenSearchClient.class, testClient); - - assertThrows( - ConnectException.class, - () -> testClient.indices() - .create( - new CreateIndexRequest.Builder().index("my-index") - .aliases("foo", new Alias.Builder().isWriteIndex(true).build()) - .build() - ) - ); + OpenSearchClient javaClient = sdkClient.initializeJavaClient("localhost", 9200); + assertInstanceOf(OpenSearchIndicesClient.class, javaClient.indices()); + assertInstanceOf(OpenSearchClusterClient.class, javaClient.cluster()); sdkClient.doCloseJavaClient(); } @Test - public void testCreateHighLevelRestClient() throws Exception { - RestHighLevelClient testClient = sdkClient.initializeRestClient("localhost", 9200); + public void testCreateRestClient() throws Exception { + SDKRestClient restClient = sdkClient.initializeRestClient("localhost", 9200); + assertInstanceOf(SDKIndicesClient.class, restClient.indices()); + assertInstanceOf(SDKClusterAdminClient.class, restClient.cluster()); - // Using the package name here as Java uses package name if the filename from different packages are same - org.opensearch.client.indices.CreateIndexRequest createIndexRequest = new org.opensearch.client.indices.CreateIndexRequest( - "my-index" - ); + sdkClient.doCloseHighLevelClient(); + } + + @Test + public void testSDKRestClient() throws Exception { + SDKRestClient restClient = sdkClient.initializeRestClient("localhost", 9200); - assertThrows(ConnectException.class, () -> testClient.indices().create(createIndexRequest, RequestOptions.DEFAULT)); + // Would really prefer to mock/verify the method calls but they are final + assertDoesNotThrow(() -> restClient.index(new IndexRequest(), ActionListener.wrap(r -> {}, e -> {}))); + assertDoesNotThrow(() -> restClient.get(new GetRequest(), ActionListener.wrap(r -> {}, e -> {}))); + assertDoesNotThrow(() -> restClient.delete(new DeleteRequest(), ActionListener.wrap(r -> {}, e -> {}))); + assertDoesNotThrow(() -> restClient.search(new SearchRequest(), ActionListener.wrap(r -> {}, e -> {}))); - sdkClient.doCloseHighLevelClient(); + restClient.close(); + } + + @Test + public void testSDKIndicesClient() throws Exception { + SDKRestClient restClient = sdkClient.initializeRestClient("localhost", 9200); + SDKIndicesClient indicesClient = restClient.indices(); + + // Would really prefer to mock/verify the method calls but the IndicesClient class is final + assertInstanceOf(Cancellable.class, indicesClient.create(new CreateIndexRequest(""), ActionListener.wrap(r -> {}, e -> {}))); + assertInstanceOf(Cancellable.class, indicesClient.delete(new DeleteIndexRequest(), ActionListener.wrap(r -> {}, e -> {}))); + assertInstanceOf(Cancellable.class, indicesClient.getMapping(new GetMappingsRequest(), ActionListener.wrap(r -> {}, e -> {}))); + assertInstanceOf(Cancellable.class, indicesClient.putMapping(new PutMappingRequest(), ActionListener.wrap(r -> {}, e -> {}))); + assertInstanceOf( + Cancellable.class, + indicesClient.rolloverIndex(new RolloverRequest("", ""), ActionListener.wrap(r -> {}, e -> {})) + ); + assertInstanceOf(Cancellable.class, indicesClient.getAliases(new GetAliasesRequest(), ActionListener.wrap(r -> {}, e -> {}))); + restClient.close(); } }