From 22dccf4af0c41f07625ffe6857fd17df80b0bf79 Mon Sep 17 00:00:00 2001 From: Daniel Widdis Date: Tue, 29 Oct 2024 22:23:39 -0700 Subject: [PATCH] Integrate with SdkClient and stub implementations Signed-off-by: Daniel Widdis --- .../opensearch/sdk/BulkDataObjectRequest.java | 10 ++++ .../org/opensearch/sdk/DataObjectRequest.java | 10 +++- .../java/org/opensearch/sdk/SdkClient.java | 40 ++++++++++++++- .../org/opensearch/sdk/SdkClientDelegate.java | 44 ++++++++++++++-- .../sdk/client/LocalClusterIndicesClient.java | 16 +++++- .../org/opensearch/sdk/SdkClientTests.java | 51 +++++++++++++++++++ .../ml/sdkclient/DDBOpenSearchClient.java | 12 +++++ .../sdkclient/RemoteClusterIndicesClient.java | 12 +++++ 8 files changed, 186 insertions(+), 9 deletions(-) diff --git a/common/src/main/java/org/opensearch/sdk/BulkDataObjectRequest.java b/common/src/main/java/org/opensearch/sdk/BulkDataObjectRequest.java index 2c2c6775e8..e50c3f88a5 100644 --- a/common/src/main/java/org/opensearch/sdk/BulkDataObjectRequest.java +++ b/common/src/main/java/org/opensearch/sdk/BulkDataObjectRequest.java @@ -22,6 +22,7 @@ public class BulkDataObjectRequest { private final List requests = new ArrayList<>(); private final Set indices = new HashSet<>(); private String globalIndex; + private String globalTenantId; public BulkDataObjectRequest() {} @@ -51,6 +52,14 @@ public Set getIndices() { return Collections.unmodifiableSet(indices); } + /** + * Return the global tenant id to be applied to all requests + * @return the globalTenantId + */ + public String globalTenantId() { + return this.globalTenantId; + } + /** * Add the given request to the {@link BulkDataObjectRequest} * @param request The request to add @@ -71,6 +80,7 @@ public BulkDataObjectRequest add(DataObjectRequest request) { } else { indices.add(request.index()); } + request.tenantId(globalTenantId); requests.add(request); return this; } diff --git a/common/src/main/java/org/opensearch/sdk/DataObjectRequest.java b/common/src/main/java/org/opensearch/sdk/DataObjectRequest.java index 192ba9c967..477203f4a8 100644 --- a/common/src/main/java/org/opensearch/sdk/DataObjectRequest.java +++ b/common/src/main/java/org/opensearch/sdk/DataObjectRequest.java @@ -12,7 +12,7 @@ public abstract class DataObjectRequest { private String index; private final String id; - private final String tenantId; + private String tenantId; /** * Instantiate this request with an index and id. @@ -60,6 +60,14 @@ public String tenantId() { return this.tenantId; } + /** + * Sets the tenant id + * @param tenantId The new tenant id to set + */ + public void tenantId(String tenantId) { + this.tenantId = tenantId; + } + /** * Returns whether the subclass can be used in a {@link BulkDataObjectRequest} * @return whether the subclass is a write request diff --git a/common/src/main/java/org/opensearch/sdk/SdkClient.java b/common/src/main/java/org/opensearch/sdk/SdkClient.java index 97bfd93dd3..8931b9b623 100644 --- a/common/src/main/java/org/opensearch/sdk/SdkClient.java +++ b/common/src/main/java/org/opensearch/sdk/SdkClient.java @@ -20,10 +20,10 @@ import static org.opensearch.sdk.SdkClientUtils.unwrapAndConvertToException; public class SdkClient { - + private final SdkClientDelegate delegate; private final Boolean isMultiTenancyEnabled; - + public SdkClient(SdkClientDelegate delegate, Boolean multiTenancy) { this.delegate = delegate; this.isMultiTenancyEnabled = multiTenancy; @@ -167,6 +167,42 @@ public DeleteDataObjectResponse deleteDataObject(DeleteDataObjectRequest request } } + /** + * Perform a bulk request for multiple data objects/documents in potentially multiple tables/indices. + * + * @param request A request identifying the data object to delete + * @param executor the executor to use for asynchronous execution + * @return A completion stage encapsulating the response or exception + */ + public CompletionStage bulkDataObjectAsync(BulkDataObjectRequest request, Executor executor) { + validateTenantId(request.globalTenantId()); + return delegate.bulkDataObjectAsync(request, executor, isMultiTenancyEnabled); + } + + /** + * Perform a bulk request for multiple data objects/documents in potentially multiple tables/indices. + * + * @param request A request identifying the data object to delete + * @return A completion stage encapsulating the response or exception + */ + public CompletionStage bulkDataObjectAsync(BulkDataObjectRequest request) { + return bulkDataObjectAsync(request, ForkJoinPool.commonPool()); + } + + /** + * Perform a bulk request for multiple data objects/documents in potentially multiple tables/indices. + * + * @param request A request identifying the data object to delete + * @return A response on success. Throws unchecked exceptions or {@link OpenSearchException} wrapping the cause on checked exception. + */ + public BulkDataObjectResponse bulkDataObject(BulkDataObjectRequest request) { + try { + return bulkDataObjectAsync(request).toCompletableFuture().join(); + } catch (CompletionException e) { + throw ExceptionsHelper.convertToRuntime(unwrapAndConvertToException(e)); + } + } + /** * Search for data objects/documents in a table/index. * diff --git a/common/src/main/java/org/opensearch/sdk/SdkClientDelegate.java b/common/src/main/java/org/opensearch/sdk/SdkClientDelegate.java index 7e0ab0a3ee..5b036f08ba 100644 --- a/common/src/main/java/org/opensearch/sdk/SdkClientDelegate.java +++ b/common/src/main/java/org/opensearch/sdk/SdkClientDelegate.java @@ -20,7 +20,11 @@ public interface SdkClientDelegate { * @param isMultiTenancyEnabled whether multitenancy is enabled * @return A completion stage encapsulating the response or exception */ - CompletionStage putDataObjectAsync(PutDataObjectRequest request, Executor executor, Boolean isMultiTenancyEnabled); + CompletionStage putDataObjectAsync( + PutDataObjectRequest request, + Executor executor, + Boolean isMultiTenancyEnabled + ); /** * Read/Get a data object/document from a table/index. @@ -30,7 +34,11 @@ public interface SdkClientDelegate { * @param isMultiTenancyEnabled whether multitenancy is enabled * @return A completion stage encapsulating the response or exception */ - CompletionStage getDataObjectAsync(GetDataObjectRequest request, Executor executor, Boolean isMultiTenancyEnabled); + CompletionStage getDataObjectAsync( + GetDataObjectRequest request, + Executor executor, + Boolean isMultiTenancyEnabled + ); /** * Update a data object/document in a table/index. @@ -40,7 +48,11 @@ public interface SdkClientDelegate { * @param isMultiTenancyEnabled whether multitenancy is enabled * @return A completion stage encapsulating the response or exception */ - CompletionStage updateDataObjectAsync(UpdateDataObjectRequest request, Executor executor, Boolean isMultiTenancyEnabled); + CompletionStage updateDataObjectAsync( + UpdateDataObjectRequest request, + Executor executor, + Boolean isMultiTenancyEnabled + ); /** * Delete a data object/document from a table/index. @@ -50,7 +62,25 @@ public interface SdkClientDelegate { * @param isMultiTenancyEnabled whether multitenancy is enabled * @return A completion stage encapsulating the response or exception */ - CompletionStage deleteDataObjectAsync(DeleteDataObjectRequest request, Executor executor, Boolean isMultiTenancyEnabled); + CompletionStage deleteDataObjectAsync( + DeleteDataObjectRequest request, + Executor executor, + Boolean isMultiTenancyEnabled + ); + + /** + * Perform a bulk request for multiple data objects/documents in potentially multiple tables/indices. + * + * @param request A request identifying the requests to process in bulk + * @param executor the executor to use for asynchronous execution + * @param isMultiTenancyEnabled whether multitenancy is enabled + * @return A completion stage encapsulating the response or exception + */ + CompletionStage bulkDataObjectAsync( + BulkDataObjectRequest request, + Executor executor, + Boolean isMultiTenancyEnabled + ); /** * Search for data objects/documents in a table/index. @@ -60,5 +90,9 @@ public interface SdkClientDelegate { * @param isMultiTenancyEnabled whether multitenancy is enabled * @return A completion stage encapsulating the response or exception */ - CompletionStage searchDataObjectAsync(SearchDataObjectRequest request, Executor executor, Boolean isMultiTenancyEnabled); + CompletionStage searchDataObjectAsync( + SearchDataObjectRequest request, + Executor executor, + Boolean isMultiTenancyEnabled + ); } diff --git a/common/src/main/java/org/opensearch/sdk/client/LocalClusterIndicesClient.java b/common/src/main/java/org/opensearch/sdk/client/LocalClusterIndicesClient.java index aae00a2dc2..a2c9fb69a6 100644 --- a/common/src/main/java/org/opensearch/sdk/client/LocalClusterIndicesClient.java +++ b/common/src/main/java/org/opensearch/sdk/client/LocalClusterIndicesClient.java @@ -53,6 +53,8 @@ import org.opensearch.index.query.QueryBuilders; import org.opensearch.index.query.TermQueryBuilder; import org.opensearch.ml.common.CommonValue; +import org.opensearch.sdk.BulkDataObjectRequest; +import org.opensearch.sdk.BulkDataObjectResponse; import org.opensearch.sdk.DeleteDataObjectRequest; import org.opensearch.sdk.DeleteDataObjectResponse; import org.opensearch.sdk.GetDataObjectRequest; @@ -201,7 +203,9 @@ public CompletionStage deleteDataObjectAsync( return CompletableFuture.supplyAsync(() -> AccessController.doPrivileged((PrivilegedAction) () -> { try { log.info("Deleting {} from {}", request.id(), request.index()); - DeleteResponse deleteResponse = client.delete(new DeleteRequest(request.index(), request.id()).setRefreshPolicy(IMMEDIATE)).actionGet(); + DeleteResponse deleteResponse = client + .delete(new DeleteRequest(request.index(), request.id()).setRefreshPolicy(IMMEDIATE)) + .actionGet(); log.info("Deletion status for id {}: {}", deleteResponse.getId(), deleteResponse.getResult()); return DeleteDataObjectResponse.builder().id(deleteResponse.getId()).parser(createParser(deleteResponse)).build(); } catch (IOException e) { @@ -214,6 +218,16 @@ public CompletionStage deleteDataObjectAsync( }), executor); } + @Override + public CompletionStage bulkDataObjectAsync( + BulkDataObjectRequest request, + Executor executor, + Boolean isMultiTenancyEnabled + ) { + // TODO Complete this + return null; + } + @Override public CompletionStage searchDataObjectAsync( SearchDataObjectRequest request, diff --git a/common/src/test/java/org/opensearch/sdk/SdkClientTests.java b/common/src/test/java/org/opensearch/sdk/SdkClientTests.java index 7af96cd370..ead2c88bcd 100644 --- a/common/src/test/java/org/opensearch/sdk/SdkClientTests.java +++ b/common/src/test/java/org/opensearch/sdk/SdkClientTests.java @@ -53,6 +53,10 @@ public class SdkClientTests { @Mock private DeleteDataObjectResponse deleteResponse; @Mock + private BulkDataObjectRequest bulkRequest; + @Mock + private BulkDataObjectResponse bulkResponse; + @Mock private SearchDataObjectRequest searchRequest; @Mock private SearchDataObjectResponse searchResponse; @@ -67,6 +71,7 @@ public void setUp() { when(getRequest.tenantId()).thenReturn(TENANT_ID); when(updateRequest.tenantId()).thenReturn(TENANT_ID); when(deleteRequest.tenantId()).thenReturn(TENANT_ID); + when(bulkRequest.globalTenantId()).thenReturn(TENANT_ID); when(searchRequest.tenantId()).thenReturn(TENANT_ID); sdkClientImpl = spy(new SdkClientDelegate() { @@ -106,6 +111,15 @@ public CompletionStage deleteDataObjectAsync( return CompletableFuture.completedFuture(deleteResponse); } + @Override + public CompletionStage bulkDataObjectAsync( + BulkDataObjectRequest request, + Executor executor, + Boolean isMultiTenancyEnabled + ) { + return CompletableFuture.completedFuture(bulkResponse); + } + @Override public CompletionStage searchDataObjectAsync( SearchDataObjectRequest request, @@ -266,6 +280,43 @@ public void testDeleteDataObjectInterrupted() { assertTrue(Thread.interrupted()); verify(sdkClientImpl).deleteDataObjectAsync(any(DeleteDataObjectRequest.class), any(Executor.class), anyBoolean()); } + + + @Test + public void testBulkDataObjectSuccess() { + assertEquals(bulkResponse, sdkClient.bulkDataObject(bulkRequest)); + verify(sdkClientImpl).bulkDataObjectAsync(any(BulkDataObjectRequest.class), any(Executor.class), anyBoolean()); + } + + @Test + public void testBulkDataObjectNullTenantId() { + when(bulkRequest.globalTenantId()).thenReturn(null); + assertThrows(IllegalArgumentException.class, () -> sdkClient.bulkDataObject(bulkRequest)); + } + + @Test + public void testBulkDataObjectException() { + when(sdkClientImpl.bulkDataObjectAsync(any(BulkDataObjectRequest.class), any(Executor.class), anyBoolean())) + .thenReturn(CompletableFuture.failedFuture(testException)); + OpenSearchStatusException exception = assertThrows(OpenSearchStatusException.class, () -> { + sdkClient.bulkDataObject(bulkRequest); + }); + assertEquals(testException, exception); + assertFalse(Thread.interrupted()); + verify(sdkClientImpl).bulkDataObjectAsync(any(BulkDataObjectRequest.class), any(Executor.class), anyBoolean()); + } + + @Test + public void testBulkDataObjectInterrupted() { + when(sdkClientImpl.bulkDataObjectAsync(any(BulkDataObjectRequest.class), any(Executor.class), anyBoolean())) + .thenReturn(CompletableFuture.failedFuture(interruptedException)); + OpenSearchException exception = assertThrows(OpenSearchException.class, () -> { + sdkClient.bulkDataObject(bulkRequest); + }); + assertEquals(interruptedException, exception.getCause()); + assertTrue(Thread.interrupted()); + verify(sdkClientImpl).bulkDataObjectAsync(any(BulkDataObjectRequest.class), any(Executor.class), anyBoolean()); + } @Test public void testSearchDataObjectSuccess() { diff --git a/plugin/src/main/java/org/opensearch/ml/sdkclient/DDBOpenSearchClient.java b/plugin/src/main/java/org/opensearch/ml/sdkclient/DDBOpenSearchClient.java index c2621606f9..2003998fd2 100644 --- a/plugin/src/main/java/org/opensearch/ml/sdkclient/DDBOpenSearchClient.java +++ b/plugin/src/main/java/org/opensearch/ml/sdkclient/DDBOpenSearchClient.java @@ -38,6 +38,8 @@ import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.core.xcontent.XContentParser; import org.opensearch.ml.sdkclient.util.JsonTransformer; +import org.opensearch.sdk.BulkDataObjectRequest; +import org.opensearch.sdk.BulkDataObjectResponse; import org.opensearch.sdk.DeleteDataObjectRequest; import org.opensearch.sdk.DeleteDataObjectResponse; import org.opensearch.sdk.GetDataObjectRequest; @@ -352,6 +354,16 @@ public CompletionStage deleteDataObjectAsync( }), executor); } + @Override + public CompletionStage bulkDataObjectAsync( + BulkDataObjectRequest request, + Executor executor, + Boolean isMultiTenancyEnabled + ) { + // TODO Complete this + return null; + } + /** * DDB data needs to be synced with opensearch cluster. {@link RemoteClusterIndicesClient} will then be used to * search data in opensearch cluster. diff --git a/plugin/src/main/java/org/opensearch/ml/sdkclient/RemoteClusterIndicesClient.java b/plugin/src/main/java/org/opensearch/ml/sdkclient/RemoteClusterIndicesClient.java index 73ce589249..4927657023 100644 --- a/plugin/src/main/java/org/opensearch/ml/sdkclient/RemoteClusterIndicesClient.java +++ b/plugin/src/main/java/org/opensearch/ml/sdkclient/RemoteClusterIndicesClient.java @@ -56,6 +56,8 @@ import org.opensearch.index.query.MatchPhraseQueryBuilder; import org.opensearch.ml.common.CommonValue; import org.opensearch.ml.sdkclient.util.JsonTransformer; +import org.opensearch.sdk.BulkDataObjectRequest; +import org.opensearch.sdk.BulkDataObjectResponse; import org.opensearch.sdk.DeleteDataObjectRequest; import org.opensearch.sdk.DeleteDataObjectResponse; import org.opensearch.sdk.GetDataObjectRequest; @@ -226,6 +228,16 @@ public CompletionStage deleteDataObjectAsync( }), executor); } + @Override + public CompletionStage bulkDataObjectAsync( + BulkDataObjectRequest request, + Executor executor, + Boolean isMultiTenancyEnabled + ) { + // TODO Complete this + return null; + } + @Override public CompletionStage searchDataObjectAsync( SearchDataObjectRequest request,