From 588a1b2f9a9f6dfc27ac042588120e5591802c74 Mon Sep 17 00:00:00 2001 From: Aayush Kataria Date: Mon, 13 Dec 2021 17:19:15 -0800 Subject: [PATCH 01/11] Encryption Bulk API --- .../CosmosEncryptionAsyncContainer.java | 117 +++++++++++++++++- .../EncryptionAsyncApiCrudTest.java | 55 +++++++- .../ImplementationBridgeHelpers.java | 5 + .../models/CosmosBulkExecutionOptions.java | 51 +++++++- .../models/BulkProcessingOptionsTest.java | 2 +- 5 files changed, 223 insertions(+), 7 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-encryption/src/main/java/com/azure/cosmos/encryption/CosmosEncryptionAsyncContainer.java b/sdk/cosmos/azure-cosmos-encryption/src/main/java/com/azure/cosmos/encryption/CosmosEncryptionAsyncContainer.java index 522757d7a13b1..b0a8e706b88fa 100644 --- a/sdk/cosmos/azure-cosmos-encryption/src/main/java/com/azure/cosmos/encryption/CosmosEncryptionAsyncContainer.java +++ b/sdk/cosmos/azure-cosmos-encryption/src/main/java/com/azure/cosmos/encryption/CosmosEncryptionAsyncContainer.java @@ -19,6 +19,10 @@ import com.azure.cosmos.implementation.ImplementationBridgeHelpers; import com.azure.cosmos.implementation.ItemDeserializer; import com.azure.cosmos.implementation.batch.ItemBatchOperation; +import com.azure.cosmos.implementation.batch.ItemBulkOperation; +import com.azure.cosmos.models.CosmosItemOperation; +import com.azure.cosmos.models.CosmosBulkExecutionOptions; +import com.azure.cosmos.models.CosmosBulkOperationResponse; import com.azure.cosmos.implementation.guava25.base.Preconditions; import com.azure.cosmos.implementation.query.Transformer; import com.azure.cosmos.models.CosmosBatch; @@ -35,10 +39,10 @@ import com.azure.cosmos.models.SqlParameter; import com.azure.cosmos.models.SqlQuerySpec; import com.azure.cosmos.models.CosmosPatchOperations; +import com.azure.cosmos.models.CosmosPatchItemRequestOptions; import com.azure.cosmos.implementation.patch.PatchOperation; import com.azure.cosmos.implementation.patch.PatchOperationCore; import com.azure.cosmos.implementation.patch.PatchOperationType; -import com.azure.cosmos.models.CosmosPatchItemRequestOptions; import com.azure.cosmos.util.CosmosPagedFlux; import com.azure.cosmos.util.UtilBridgeInternal; import com.fasterxml.jackson.databind.JsonNode; @@ -78,6 +82,7 @@ public class CosmosEncryptionAsyncContainer { ImplementationBridgeHelpers.CosmosBatchOperationResultHelper.CosmosBatchOperationResultAccessor cosmosBatchOperationResultAccessor; ImplementationBridgeHelpers.CosmosBatchRequestOptionsHelper.CosmosBatchRequestOptionsAccessor cosmosBatchRequestOptionsAccessor; ImplementationBridgeHelpers.CosmosPatchOperationsHelper.CosmosPatchOperationsAccessor cosmosPatchOperationsAccessor; + ImplementationBridgeHelpers.CosmosBulkExecutionOptionsHelper.CosmosBulkExecutionOptionsAccessor cosmosBulkExecutionOptionsAccessor; CosmosEncryptionAsyncContainer(CosmosAsyncContainer container, CosmosEncryptionAsyncClient cosmosEncryptionAsyncClient) { @@ -100,6 +105,7 @@ public class CosmosEncryptionAsyncContainer { this.cosmosBatchOperationResultAccessor = ImplementationBridgeHelpers.CosmosBatchOperationResultHelper.getCosmosBatchOperationResultAccessor(); this.cosmosBatchRequestOptionsAccessor = ImplementationBridgeHelpers.CosmosBatchRequestOptionsHelper.getCosmosBatchRequestOptionsAccessor(); this.cosmosPatchOperationsAccessor = ImplementationBridgeHelpers.CosmosPatchOperationsHelper.getCosmosPatchOperationsAccessor(); + this.cosmosBulkExecutionOptionsAccessor = ImplementationBridgeHelpers.CosmosBulkExecutionOptionsHelper.getCosmosBulkExecutionOptionsAccessor(); } EncryptionProcessor getEncryptionProcessor() { @@ -1087,6 +1093,110 @@ private Mono executeCosmosBatchHelper(CosmosBatch encrypted }); } + /** + * Executes flux of operations in Bulk. + * + * @param The context for the bulk processing. + * @param operations Flux of operation which will be executed by this container. + * + * @return A Flux of {@link CosmosBulkOperationResponse} which contains operation and it's response or exception. + *

+ * To create a operation which can be executed here, use {@link com.azure.cosmos.models.CosmosBulkOperations}. For eg. + * for a upsert operation use {@link com.azure.cosmos.models.CosmosBulkOperations#getUpsertItemOperation(Object, PartitionKey)} + *

+ *

+ * We can get the corresponding operation using {@link CosmosBulkOperationResponse#getOperation()} and + * it's response using {@link CosmosBulkOperationResponse#getResponse()}. If the operation was executed + * successfully, the value returned by {@link com.azure.cosmos.models.CosmosBulkItemResponse#isSuccessStatusCode()} will be true. To get + * actual status use {@link com.azure.cosmos.models.CosmosBulkItemResponse#getStatusCode()}. + *

+ * To check if the operation had any exception, use {@link CosmosBulkOperationResponse#getException()} to + * get the exception. + */ + @Beta(value = Beta.SinceVersion.V1, warningText = + Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING) + public Flux> executeBulkOperations( + Flux operations) { + + return this.executeBulkOperations(operations, new CosmosBulkExecutionOptions()); + } + + /** + * Executes flux of operations in Bulk. + * + * @param The context for the bulk processing. + * + * @param operations Flux of operation which will be executed by this container. + * @param bulkOptions Options that apply for this Bulk request which specifies options regarding execution like + * concurrency, batching size, interval and context. + * + * @return A Flux of {@link CosmosBulkOperationResponse} which contains operation and it's response or exception. + *

+ * To create a operation which can be executed here, use {@link com.azure.cosmos.models.CosmosBulkOperations}. For eg. + * for a upsert operation use {@link com.azure.cosmos.models.CosmosBulkOperations#getUpsertItemOperation(Object, PartitionKey)} + *

+ *

+ * We can get the corresponding operation using {@link CosmosBulkOperationResponse#getOperation()} and + * it's response using {@link CosmosBulkOperationResponse#getResponse()}. If the operation was executed + * successfully, the value returned by {@link com.azure.cosmos.models.CosmosBulkItemResponse#isSuccessStatusCode()} will be true. To get + * actual status use {@link com.azure.cosmos.models.CosmosBulkItemResponse#getStatusCode()}. + *

+ * To check if the operation had any exception, use {@link CosmosBulkOperationResponse#getException()} to + * get the exception. + */ + @Beta(value = Beta.SinceVersion.V1, warningText = + Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING) + public Flux> executeBulkOperations( + Flux operations, + CosmosBulkExecutionOptions bulkOptions) { + if (bulkOptions == null) { + bulkOptions = new CosmosBulkExecutionOptions(); + } + + List> monoList = new ArrayList<>(); + operations.collectList().block().forEach(cosmosItemOperation -> { + Mono cosmosItemOperationMono = null; + ObjectNode objectNode = EncryptionUtils.getSimpleObjectMapper().valueToTree(cosmosItemOperation.getItem()); + assert cosmosItemOperation instanceof ItemBulkOperation; + cosmosItemOperationMono = + this.encryptionProcessor.encryptObjectNode(objectNode).map(encryptedItem -> { + return new ItemBulkOperation( + cosmosItemOperation.getOperationType(), + cosmosItemOperation.getId(), + cosmosItemOperation.getPartitionKeyValue(), + ((ItemBulkOperation)cosmosItemOperation).getRequestOptions(), + encryptedItem, + cosmosItemOperation.getContext() + ); + }); + monoList.add(cosmosItemOperationMono); + }); + + Mono> encryptedOperationListMono = + Flux.mergeSequential(monoList).collectList(); + + final CosmosBulkExecutionOptions cosmosBulkExecutionOptions = bulkOptions; + setRequestHeaders(cosmosBulkExecutionOptions); + return this.container.executeBulkOperations(encryptedOperationListMono.flatMapMany(Flux::fromIterable), cosmosBulkExecutionOptions); + + +// return executeBulkOperationsHelper(encryptedOperationListMono.flatMapMany(Flux::fromIterable), cosmosBulkExecutionOptions, false); + } + +// private Flux> executeBulkOperationsHelper(Flux operations, +// CosmosBulkExecutionOptions bulkOptions, +// boolean isRetry) { +// List> monoResponseList = new ArrayList<>(); +// return this.container.executeBulkOperations(operations, bulkOptions) +// +// }); +// +//// return this.container.executeBulkOperations(operations, bulkOptions).flatMap(cosmosBukOperationsResponse -> +//// Mono.just(cosmosBukOperationsResponse).subscribeOn() +//// }) +// } + + private void setRequestHeaders(CosmosItemRequestOptions requestOptions) { this.cosmosItemRequestOptionsAccessor.setHeader(requestOptions, Constants.IS_CLIENT_ENCRYPTED_HEADER, "true"); this.cosmosItemRequestOptionsAccessor.setHeader(requestOptions, Constants.INTENDED_COLLECTION_RID_HEADER, this.encryptionProcessor.getContainerRid()); @@ -1107,6 +1217,11 @@ private void setRequestHeaders(CosmosBatchRequestOptions requestOptions) { this.cosmosBatchRequestOptionsAccessor.setHeader(requestOptions, Constants.INTENDED_COLLECTION_RID_HEADER, this.encryptionProcessor.getContainerRid()); } + private void setRequestHeaders(CosmosBulkExecutionOptions requestOptions) { + this.cosmosBulkExecutionOptionsAccessor.setHeader(requestOptions, Constants.IS_CLIENT_ENCRYPTED_HEADER, "true"); + this.cosmosBulkExecutionOptionsAccessor.setHeader(requestOptions, Constants.INTENDED_COLLECTION_RID_HEADER, this.encryptionProcessor.getContainerRid()); + } + boolean isIncorrectContainerRid(CosmosException cosmosException) { return cosmosException.getStatusCode() == HttpConstants.StatusCodes.BADREQUEST && cosmosException.getResponseHeaders().get(HttpConstants.HttpHeaders.SUB_STATUS) diff --git a/sdk/cosmos/azure-cosmos-encryption/src/test/java/com/azure/cosmos/encryption/EncryptionAsyncApiCrudTest.java b/sdk/cosmos/azure-cosmos-encryption/src/test/java/com/azure/cosmos/encryption/EncryptionAsyncApiCrudTest.java index d0c7304b22e07..204a83ff19d98 100644 --- a/sdk/cosmos/azure-cosmos-encryption/src/test/java/com/azure/cosmos/encryption/EncryptionAsyncApiCrudTest.java +++ b/sdk/cosmos/azure-cosmos-encryption/src/test/java/com/azure/cosmos/encryption/EncryptionAsyncApiCrudTest.java @@ -19,6 +19,11 @@ import com.azure.cosmos.models.CosmosItemResponse; import com.azure.cosmos.models.CosmosQueryRequestOptions; import com.azure.cosmos.models.EncryptionKeyWrapMetadata; +import com.azure.cosmos.models.CosmosItemOperation; +import com.azure.cosmos.models.CosmosBulkItemResponse; +import com.azure.cosmos.models.CosmosBulkOperations; +import com.azure.cosmos.models.CosmosBulkExecutionOptions; +import com.azure.cosmos.models.CosmosBulkOperationResponse; import com.azure.cosmos.models.FeedResponse; import com.azure.cosmos.models.PartitionKey; import com.azure.cosmos.models.SqlParameter; @@ -33,10 +38,12 @@ import org.testng.annotations.Factory; import org.testng.annotations.Ignore; import org.testng.annotations.Test; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.time.Instant; import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; import static org.assertj.core.api.Assertions.*; @@ -48,7 +55,7 @@ public class EncryptionAsyncApiCrudTest extends TestSuiteBase { CosmosEncryptionAsyncContainer cosmosEncryptionAsyncContainer; CosmosEncryptionAsyncDatabase cosmosEncryptionAsyncDatabase; - @Factory(dataProvider = "clientBuildersWithSessionConsistency") + @Factory(dataProvider = "clientBuilders") public EncryptionAsyncApiCrudTest(CosmosClientBuilder clientBuilder) { super(clientBuilder); } @@ -663,6 +670,52 @@ public void batchExecutionWithOptionsApi() { validateResponse(batchResponse.getResults().get(3).getItem(EncryptionPojo.class), createPojo); } + private int getTotalRequest() { + int countRequest = new Random().nextInt(100) + 120; + logger.info("Total count of request for this test case: " + countRequest); + + return countRequest; + } + + @Test(groups = {"encryption"}, timeOut = TIMEOUT) + public void bulkExecution() { + int totalRequest = getTotalRequest(); + Flux cosmosItemOperationsFlux = Flux.range(0, 1).map(i -> { + String itemId = UUID.randomUUID().toString(); + EncryptionPojo createPojo = getItem(itemId); + + return CosmosBulkOperations.getCreateItemOperation(createPojo, new PartitionKey(createPojo.getMypk())); + }); + + CosmosBulkExecutionOptions cosmosBulkExecutionOptions = new CosmosBulkExecutionOptions(); + Flux> responseFlux = this.cosmosEncryptionAsyncContainer. + executeBulkOperations(cosmosItemOperationsFlux, cosmosBulkExecutionOptions); + + AtomicInteger processedDoc = new AtomicInteger(0); + responseFlux + .flatMap((CosmosBulkOperationResponse cosmosBulkOperationResponse) -> { + + processedDoc.incrementAndGet(); + + CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse(); + if (cosmosBulkOperationResponse.getException() != null) { + logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException()); + fail(cosmosBulkOperationResponse.getException().toString()); + } + + assertThat(cosmosBulkItemResponse.getStatusCode()).isEqualTo(HttpResponseStatus.CREATED.code()); + assertThat(cosmosBulkItemResponse.getRequestCharge()).isGreaterThan(0); + assertThat(cosmosBulkItemResponse.getCosmosDiagnostics().toString()).isNotNull(); + assertThat(cosmosBulkItemResponse.getSessionToken()).isNotNull(); + assertThat(cosmosBulkItemResponse.getActivityId()).isNotNull(); + assertThat(cosmosBulkItemResponse.getRequestCharge()).isNotNull(); + + return Mono.just(cosmosBulkItemResponse); + }).blockLast(); + + assertThat(processedDoc.get()).isEqualTo(totalRequest); + } + @Test(groups = {"encryption"}, timeOut = TIMEOUT) public void crudOnDifferentOverload() { List actualProperties = new ArrayList<>(); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java index b552684badbf0..dc199ebe9b617 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java @@ -255,6 +255,11 @@ CosmosBulkExecutionOptions setTargetedMicroBatchRetryRate( int getMaxMicroBatchConcurrency(CosmosBulkExecutionOptions options); Duration getMaxMicroBatchInterval(CosmosBulkExecutionOptions options); + + CosmosBulkExecutionOptions setHeader(CosmosBulkExecutionOptions cosmosBulkExecutionOptions, + String name, String value); + + Map getHeader(CosmosBulkExecutionOptions cosmosBulkExecutionOptions); } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosBulkExecutionOptions.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosBulkExecutionOptions.java index 99b2c8b2e7725..4e3a056039314 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosBulkExecutionOptions.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosBulkExecutionOptions.java @@ -8,6 +8,8 @@ import com.azure.cosmos.implementation.spark.OperationContextAndListenerTuple; import java.time.Duration; +import java.util.HashMap; +import java.util.Map; /** * Encapsulates options that can be specified for operations used in Bulk execution. @@ -23,33 +25,39 @@ public final class CosmosBulkExecutionOptions { private final Object legacyBatchScopedContext; private final CosmosBulkExecutionThresholdsState thresholds; private OperationContextAndListenerTuple operationContextAndListenerTuple; + private Map customOptions; /** * Constructor * @param thresholdsState thresholds */ - CosmosBulkExecutionOptions(Object legacyBatchScopedContext, CosmosBulkExecutionThresholdsState thresholdsState) { + CosmosBulkExecutionOptions(Object legacyBatchScopedContext, CosmosBulkExecutionThresholdsState thresholdsState, Map customOptions) { this.legacyBatchScopedContext = legacyBatchScopedContext; if (thresholdsState == null) { this.thresholds = new CosmosBulkExecutionThresholdsState(); } else { this.thresholds = thresholdsState; } + if (customOptions == null) { + this.customOptions = new HashMap<>(); + } else { + this.customOptions = customOptions; + } } /** * Constructor * @param thresholdsState thresholds */ - public CosmosBulkExecutionOptions(CosmosBulkExecutionThresholdsState thresholdsState) { - this(null, thresholdsState); + public CosmosBulkExecutionOptions(CosmosBulkExecutionThresholdsState thresholdsState, Map customOptions) { + this(null, thresholdsState, customOptions); } /** * Constructor */ public CosmosBulkExecutionOptions() { - this(null); + this(null, null); } /** @@ -180,6 +188,30 @@ void setOperationContextAndListenerTuple(OperationContextAndListenerTuple operat this.operationContextAndListenerTuple = operationContextAndListenerTuple; } + /** + * Sets the custom bulk request option value by key + * + * @param name a string representing the custom option's name + * @param value a string representing the custom option's value + * @return the CosmosBulkExecutionOptions. + */ + CosmosBulkExecutionOptions setHeader(String name, String value) { + if (this.customOptions == null) { + this.customOptions = new HashMap<>(); + } + this.customOptions.put(name, value); + return this; + } + + /** + * Gets the custom batch request options + * + * @return Map of custom request options + */ + Map getHeaders() { + return this.customOptions; + } + /////////////////////////////////////////////////////////////////////////////////////////// // the following helper/accessor only helps to access this class outside of this package.// /////////////////////////////////////////////////////////////////////////////////////////// @@ -247,6 +279,17 @@ public CosmosBulkExecutionOptions setTargetedMicroBatchRetryRate( return options.setTargetedMicroBatchRetryRate(minRetryRate, maxRetryRate); } + @Override + public CosmosBulkExecutionOptions setHeader(CosmosBulkExecutionOptions cosmosBulkExecutionOptions, + String name, String value) { + return cosmosBulkExecutionOptions.setHeader(name, value); + } + + @Override + public Map getHeader(CosmosBulkExecutionOptions cosmosBulkExecutionOptions) { + return cosmosBulkExecutionOptions.getHeaders(); + } + }); } diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/models/BulkProcessingOptionsTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/models/BulkProcessingOptionsTest.java index 4a718ce2963da..cc3c27773c1d4 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/models/BulkProcessingOptionsTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/models/BulkProcessingOptionsTest.java @@ -53,7 +53,7 @@ public void thresholdsInstanceCanBePassedAcrossBulkExecutionOptionsInstances() { .getBulkExecutionThresholdsAccessor() .getPartitionScopeThresholds(thresholds); CosmosBulkExecutionOptions optionsWithThresholds = - new CosmosBulkExecutionOptions(null, thresholds); + new CosmosBulkExecutionOptions(null, thresholds, null); assertThat(thresholds).isSameAs(optionsWithThresholds.getThresholdsState()); assertThat(partitionScopeThresholdsMap) From 25497e6bd912d125408c097a127669a3b99272f2 Mon Sep 17 00:00:00 2001 From: Aayush Kataria Date: Thu, 20 Jan 2022 12:28:00 -0800 Subject: [PATCH 02/11] Encryption Bulk Api --- .../CosmosEncryptionAsyncContainer.java | 70 ++++--- .../EncryptionAsyncApiCrudTest.java | 195 +++++++++++++++++- .../ImplementationBridgeHelpers.java | 47 +++-- .../models/CosmosBulkExecutionOptions.java | 13 ++ .../cosmos/models/CosmosBulkItemResponse.java | 8 +- .../models/CosmosBulkOperationResponse.java | 18 +- 6 files changed, 308 insertions(+), 43 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-encryption/src/main/java/com/azure/cosmos/encryption/CosmosEncryptionAsyncContainer.java b/sdk/cosmos/azure-cosmos-encryption/src/main/java/com/azure/cosmos/encryption/CosmosEncryptionAsyncContainer.java index b0a8e706b88fa..0a0a907949dbe 100644 --- a/sdk/cosmos/azure-cosmos-encryption/src/main/java/com/azure/cosmos/encryption/CosmosEncryptionAsyncContainer.java +++ b/sdk/cosmos/azure-cosmos-encryption/src/main/java/com/azure/cosmos/encryption/CosmosEncryptionAsyncContainer.java @@ -23,6 +23,7 @@ import com.azure.cosmos.models.CosmosItemOperation; import com.azure.cosmos.models.CosmosBulkExecutionOptions; import com.azure.cosmos.models.CosmosBulkOperationResponse; +import com.azure.cosmos.models.CosmosBulkItemResponse; import com.azure.cosmos.implementation.guava25.base.Preconditions; import com.azure.cosmos.implementation.query.Transformer; import com.azure.cosmos.models.CosmosBatch; @@ -83,6 +84,8 @@ public class CosmosEncryptionAsyncContainer { ImplementationBridgeHelpers.CosmosBatchRequestOptionsHelper.CosmosBatchRequestOptionsAccessor cosmosBatchRequestOptionsAccessor; ImplementationBridgeHelpers.CosmosPatchOperationsHelper.CosmosPatchOperationsAccessor cosmosPatchOperationsAccessor; ImplementationBridgeHelpers.CosmosBulkExecutionOptionsHelper.CosmosBulkExecutionOptionsAccessor cosmosBulkExecutionOptionsAccessor; + ImplementationBridgeHelpers.CosmosBulkItemResponseHelper.CosmosBulkItemResponseAccessor cosmosBulkItemResponseAccessor; + ImplementationBridgeHelpers.CosmosBulkOperationResponseHelper.CosmosBulkOperationResponseAccessor cosmosBulkOperationResponseAccessor; CosmosEncryptionAsyncContainer(CosmosAsyncContainer container, CosmosEncryptionAsyncClient cosmosEncryptionAsyncClient) { @@ -106,6 +109,8 @@ public class CosmosEncryptionAsyncContainer { this.cosmosBatchRequestOptionsAccessor = ImplementationBridgeHelpers.CosmosBatchRequestOptionsHelper.getCosmosBatchRequestOptionsAccessor(); this.cosmosPatchOperationsAccessor = ImplementationBridgeHelpers.CosmosPatchOperationsHelper.getCosmosPatchOperationsAccessor(); this.cosmosBulkExecutionOptionsAccessor = ImplementationBridgeHelpers.CosmosBulkExecutionOptionsHelper.getCosmosBulkExecutionOptionsAccessor(); + this.cosmosBulkItemResponseAccessor = ImplementationBridgeHelpers.CosmosBulkItemResponseHelper.getCosmosBulkItemResponseAccessor(); + this.cosmosBulkOperationResponseAccessor = ImplementationBridgeHelpers.CosmosBulkOperationResponseHelper.getCosmosBulkOperationResponseAccessor(); } EncryptionProcessor getEncryptionProcessor() { @@ -1156,19 +1161,32 @@ public Flux> executeBulkOperati List> monoList = new ArrayList<>(); operations.collectList().block().forEach(cosmosItemOperation -> { Mono cosmosItemOperationMono = null; - ObjectNode objectNode = EncryptionUtils.getSimpleObjectMapper().valueToTree(cosmosItemOperation.getItem()); - assert cosmosItemOperation instanceof ItemBulkOperation; - cosmosItemOperationMono = - this.encryptionProcessor.encryptObjectNode(objectNode).map(encryptedItem -> { - return new ItemBulkOperation( + if (cosmosItemOperation.getItem() != null) { + ObjectNode objectNode = EncryptionUtils.getSimpleObjectMapper().valueToTree(cosmosItemOperation.getItem()); + assert cosmosItemOperation instanceof ItemBulkOperation; + cosmosItemOperationMono = + this.encryptionProcessor.encryptObjectNode(objectNode).map(encryptedItem -> { + return new ItemBulkOperation( + cosmosItemOperation.getOperationType(), + cosmosItemOperation.getId(), + cosmosItemOperation.getPartitionKeyValue(), + ((ItemBulkOperation)cosmosItemOperation).getRequestOptions(), + encryptedItem, + cosmosItemOperation.getContext() + ); + }); + } else { + cosmosItemOperationMono = Mono.just( + new ItemBulkOperation( cosmosItemOperation.getOperationType(), cosmosItemOperation.getId(), cosmosItemOperation.getPartitionKeyValue(), ((ItemBulkOperation)cosmosItemOperation).getRequestOptions(), - encryptedItem, + null, cosmosItemOperation.getContext() - ); - }); + ) + ); + } monoList.add(cosmosItemOperationMono); }); @@ -1177,24 +1195,30 @@ public Flux> executeBulkOperati final CosmosBulkExecutionOptions cosmosBulkExecutionOptions = bulkOptions; setRequestHeaders(cosmosBulkExecutionOptions); - return this.container.executeBulkOperations(encryptedOperationListMono.flatMapMany(Flux::fromIterable), cosmosBulkExecutionOptions); - -// return executeBulkOperationsHelper(encryptedOperationListMono.flatMapMany(Flux::fromIterable), cosmosBulkExecutionOptions, false); + return executeBulkOperationsHelper(encryptedOperationListMono.flatMapMany(Flux::fromIterable), cosmosBulkExecutionOptions, false); } -// private Flux> executeBulkOperationsHelper(Flux operations, -// CosmosBulkExecutionOptions bulkOptions, -// boolean isRetry) { -// List> monoResponseList = new ArrayList<>(); -// return this.container.executeBulkOperations(operations, bulkOptions) -// -// }); -// -//// return this.container.executeBulkOperations(operations, bulkOptions).flatMap(cosmosBukOperationsResponse -> -//// Mono.just(cosmosBukOperationsResponse).subscribeOn() -//// }) -// } + private Flux> executeBulkOperationsHelper(Flux operations, + CosmosBulkExecutionOptions bulkOptions, + boolean isRetry) { + return this.container.executeBulkOperations(operations, bulkOptions).flatMap(cosmosBulkOperationResponse -> { + + CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse(); + ObjectNode objectNode = this.cosmosBulkItemResponseAccessor.getResourceObject(cosmosBulkItemResponse); + + if(objectNode != null) { + encryptionProcessor.decryptJsonNode(objectNode).subscribe(jsonNode -> { + this.cosmosBulkItemResponseAccessor.setResourceObject(cosmosBulkItemResponse, (ObjectNode) jsonNode); + this.cosmosBulkOperationResponseAccessor.setResponse(cosmosBulkOperationResponse, cosmosBulkItemResponse); + }); + } + + CosmosBulkOperationResponse res = (CosmosBulkOperationResponse) cosmosBulkOperationResponse; + + return Flux.just(res); + }); + } private void setRequestHeaders(CosmosItemRequestOptions requestOptions) { diff --git a/sdk/cosmos/azure-cosmos-encryption/src/test/java/com/azure/cosmos/encryption/EncryptionAsyncApiCrudTest.java b/sdk/cosmos/azure-cosmos-encryption/src/test/java/com/azure/cosmos/encryption/EncryptionAsyncApiCrudTest.java index 204a83ff19d98..e5186e544436c 100644 --- a/sdk/cosmos/azure-cosmos-encryption/src/test/java/com/azure/cosmos/encryption/EncryptionAsyncApiCrudTest.java +++ b/sdk/cosmos/azure-cosmos-encryption/src/test/java/com/azure/cosmos/encryption/EncryptionAsyncApiCrudTest.java @@ -678,9 +678,9 @@ private int getTotalRequest() { } @Test(groups = {"encryption"}, timeOut = TIMEOUT) - public void bulkExecution() { + public void bulkExecution_createItem() { int totalRequest = getTotalRequest(); - Flux cosmosItemOperationsFlux = Flux.range(0, 1).map(i -> { + Flux cosmosItemOperationsFlux = Flux.range(0, totalRequest).map(i -> { String itemId = UUID.randomUUID().toString(); EncryptionPojo createPojo = getItem(itemId); @@ -716,6 +716,197 @@ public void bulkExecution() { assertThat(processedDoc.get()).isEqualTo(totalRequest); } + @Test(groups = {"encryption"}, timeOut = TIMEOUT) + public void bulkExecution_upsertItem() { + int totalRequest = getTotalRequest(); + + Flux cosmosItemOperationsFlux = Flux.range(0, totalRequest).map(i -> { + String itemId = UUID.randomUUID().toString(); + EncryptionPojo createPojo = getItem(itemId); + + return CosmosBulkOperations.getUpsertItemOperation(createPojo, new PartitionKey(createPojo.getMypk())); + }); + + CosmosBulkExecutionOptions cosmosBulkExecutionOptions = new CosmosBulkExecutionOptions(); + + Flux> responseFlux = this.cosmosEncryptionAsyncContainer + .executeBulkOperations(cosmosItemOperationsFlux, cosmosBulkExecutionOptions); + + AtomicInteger processedDoc = new AtomicInteger(0); + responseFlux + .flatMap(cosmosBulkOperationResponse -> { + + processedDoc.incrementAndGet(); + + CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse(); + if (cosmosBulkOperationResponse.getException() != null) { + logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException()); + fail(cosmosBulkOperationResponse.getException().toString()); + } + + assertThat(cosmosBulkItemResponse.getStatusCode()).isEqualTo(HttpResponseStatus.CREATED.code()); + assertThat(cosmosBulkItemResponse.getRequestCharge()).isGreaterThan(0); + assertThat(cosmosBulkItemResponse.getCosmosDiagnostics().toString()).isNotNull(); + assertThat(cosmosBulkItemResponse.getSessionToken()).isNotNull(); + assertThat(cosmosBulkItemResponse.getActivityId()).isNotNull(); + assertThat(cosmosBulkItemResponse.getRequestCharge()).isNotNull(); + + return Mono.just(cosmosBulkItemResponse); + }).blockLast(); + + assertThat(processedDoc.get()).isEqualTo(totalRequest); + } + + @Test(groups = {"encryption"}, timeOut = TIMEOUT) + public void bulkExecution_deleteItem() { + int totalRequest = Math.min(getTotalRequest(), 20); + + List cosmosItemOperations = new ArrayList<>(); + for (int i = 0; i < totalRequest; i++) { + String itemId = UUID.randomUUID().toString(); + + // use i as a identifier for re check. + EncryptionPojo createPojo = getItem(itemId); + + cosmosItemOperations.add(CosmosBulkOperations.getCreateItemOperation(createPojo, new PartitionKey(createPojo.getMypk()))); +// indexCosmosItemOperationMap.put(createPojo.getId(), cosmosItemOperation); + } + + createItemsAndVerify(cosmosItemOperations); + + Flux deleteCosmosItemOperationsFlux = Flux.fromIterable(cosmosItemOperations).map( cosmosItemOperation -> { + EncryptionPojo encryptionPojo = cosmosItemOperation.getItem(); + return CosmosBulkOperations.getDeleteItemOperation(encryptionPojo.getId(), cosmosItemOperation.getPartitionKeyValue()); + }); + + CosmosBulkExecutionOptions cosmosBulkExecutionOptions = new CosmosBulkExecutionOptions(); + + Flux> responseFlux = this.cosmosEncryptionAsyncContainer + .executeBulkOperations(deleteCosmosItemOperationsFlux, cosmosBulkExecutionOptions); + + AtomicInteger processedDoc = new AtomicInteger(0); + responseFlux + .flatMap(cosmosBulkOperationResponse -> { + + processedDoc.incrementAndGet(); + + CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse(); + if (cosmosBulkOperationResponse.getException() != null) { + logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException()); + fail(cosmosBulkOperationResponse.getException().toString()); + } + + assertThat(cosmosBulkItemResponse.getStatusCode()).isEqualTo(HttpResponseStatus.NO_CONTENT.code()); + assertThat(cosmosBulkItemResponse.getRequestCharge()).isGreaterThan(0); + assertThat(cosmosBulkItemResponse.getCosmosDiagnostics().toString()).isNotNull(); + assertThat(cosmosBulkItemResponse.getSessionToken()).isNotNull(); + assertThat(cosmosBulkItemResponse.getActivityId()).isNotNull(); + assertThat(cosmosBulkItemResponse.getRequestCharge()).isNotNull(); + + return Mono.just(cosmosBulkItemResponse); + }).blockLast(); + + assertThat(processedDoc.get()).isEqualTo(totalRequest); + } + + @Test(groups = {"encryption"}, timeOut = TIMEOUT) + public void bulkExecution_readItem() { + int totalRequest = getTotalRequest(); + + List cosmosItemOperations = new ArrayList<>(); +// Map indexCosmosItemOperationMap = new HashMap<>(); + + for (int i = 0; i < totalRequest; i++) { + String itemId = UUID.randomUUID().toString(); + + // use i as a identifier for re check. + EncryptionPojo createPojo = getItem(itemId); + + cosmosItemOperations.add(CosmosBulkOperations.getCreateItemOperation(createPojo, new PartitionKey(createPojo.getMypk()))); +// indexCosmosItemOperationMap.put(createPojo.getId(), cosmosItemOperation); + } + + createItemsAndVerify(cosmosItemOperations); + +// Flux cosmosItemOperationsFlux = Flux.range(0, totalRequest).map(i -> { +// String itemId = UUID.randomUUID().toString(); +// EncryptionPojo createPojo = getItem(itemId); +// +// return CosmosBulkOperations.getCreateItemOperation(createPojo, new PartitionKey(createPojo.getMypk())); +// }); + + Flux readCosmosItemOperationsFlux = Flux.fromIterable(cosmosItemOperations).map( cosmosItemOperation -> { + EncryptionPojo encryptionPojo = cosmosItemOperation.getItem(); + return CosmosBulkOperations.getReadItemOperation(encryptionPojo.getId(), cosmosItemOperation.getPartitionKeyValue()); + }); + + CosmosBulkExecutionOptions cosmosBulkExecutionOptions = new CosmosBulkExecutionOptions(); + + Flux> responseFlux = this.cosmosEncryptionAsyncContainer + .executeBulkOperations(readCosmosItemOperationsFlux, cosmosBulkExecutionOptions); + + AtomicInteger processedDoc = new AtomicInteger(0); + responseFlux + .flatMap(cosmosBulkOperationResponse -> { + + processedDoc.incrementAndGet(); + + CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse(); + if (cosmosBulkOperationResponse.getException() != null) { + logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException()); + fail(cosmosBulkOperationResponse.getException().toString()); + } + + assertThat(cosmosBulkItemResponse.getStatusCode()).isEqualTo(HttpResponseStatus.OK.code()); + assertThat(cosmosBulkItemResponse.getRequestCharge()).isGreaterThan(0); + assertThat(cosmosBulkItemResponse.getCosmosDiagnostics().toString()).isNotNull(); + assertThat(cosmosBulkItemResponse.getSessionToken()).isNotNull(); + assertThat(cosmosBulkItemResponse.getActivityId()).isNotNull(); + assertThat(cosmosBulkItemResponse.getRequestCharge()).isNotNull(); + + + + return Mono.just(cosmosBulkItemResponse); + }).blockLast(); + + assertThat(processedDoc.get()).isEqualTo(totalRequest); + } + + private void createItemsAndVerify(List cosmosItemOperations) { + CosmosBulkExecutionOptions cosmosBulkExecutionOptions = new CosmosBulkExecutionOptions(); + + Flux> createResponseFlux = this.cosmosEncryptionAsyncContainer. + executeBulkOperations(Flux.fromIterable(cosmosItemOperations), cosmosBulkExecutionOptions); + + HashSet distinctIndex = new HashSet<>(); + AtomicInteger processedDoc = new AtomicInteger(0); + + createResponseFlux.flatMap(cosmosBulkOperationResponse -> { + processedDoc.incrementAndGet(); + CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse(); + if (cosmosBulkOperationResponse.getException() != null) { + logger.error("Bulk operation failed", cosmosBulkOperationResponse.getException()); + fail(cosmosBulkOperationResponse.getException().toString()); + } + assertThat(cosmosBulkItemResponse.getStatusCode()).isEqualTo(HttpResponseStatus.CREATED.code()); + assertThat(cosmosBulkItemResponse.getRequestCharge()).isGreaterThan(0); + assertThat(cosmosBulkItemResponse.getCosmosDiagnostics().toString()).isNotNull(); + assertThat(cosmosBulkItemResponse.getSessionToken()).isNotNull(); + assertThat(cosmosBulkItemResponse.getActivityId()).isNotNull(); + assertThat(cosmosBulkItemResponse.getRequestCharge()).isNotNull(); + + // Using id as list index like we assigned + EncryptionPojo encryptionPojo = cosmosBulkItemResponse.getItem(EncryptionPojo.class); + distinctIndex.add(encryptionPojo.getId()); + + return Mono.just(cosmosBulkItemResponse); + }).blockLast(); + + // Verify if all are distinct and count is equal to request count. + assertThat(processedDoc.get()).isEqualTo(cosmosItemOperations.size()); + assertThat(distinctIndex.size()).isEqualTo(cosmosItemOperations.size()); + } + @Test(groups = {"encryption"}, timeOut = TIMEOUT) public void crudOnDifferentOverload() { List actualProperties = new ArrayList<>(); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java index dc199ebe9b617..1a12572c2cedb 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java @@ -15,22 +15,7 @@ import com.azure.cosmos.implementation.patch.PatchOperation; import com.azure.cosmos.implementation.routing.PartitionKeyInternal; import com.azure.cosmos.implementation.spark.OperationContextAndListenerTuple; -import com.azure.cosmos.models.CosmosBatch; -import com.azure.cosmos.models.CosmosBatchItemRequestOptions; -import com.azure.cosmos.models.CosmosBatchOperationResult; -import com.azure.cosmos.models.CosmosBatchRequestOptions; -import com.azure.cosmos.models.CosmosBatchResponse; -import com.azure.cosmos.models.CosmosBulkExecutionOptions; -import com.azure.cosmos.models.CosmosBulkExecutionThresholdsState; -import com.azure.cosmos.models.CosmosBulkItemResponse; -import com.azure.cosmos.models.CosmosChangeFeedRequestOptions; -import com.azure.cosmos.models.CosmosContainerProperties; -import com.azure.cosmos.models.CosmosItemRequestOptions; -import com.azure.cosmos.models.CosmosItemResponse; -import com.azure.cosmos.models.CosmosPatchOperations; -import com.azure.cosmos.models.CosmosQueryRequestOptions; -import com.azure.cosmos.models.FeedResponse; -import com.azure.cosmos.models.PartitionKey; +import com.azure.cosmos.models.*; import com.azure.cosmos.util.CosmosPagedFlux; import com.fasterxml.jackson.databind.node.ObjectNode; import org.slf4j.Logger; @@ -260,6 +245,8 @@ CosmosBulkExecutionOptions setHeader(CosmosBulkExecutionOptions cosmosBulkExecut String name, String value); Map getHeader(CosmosBulkExecutionOptions cosmosBulkExecutionOptions); + + Map getCustomOptions(CosmosBulkExecutionOptions cosmosBulkExecutionOptions); } } @@ -670,6 +657,31 @@ public interface CosmosBatchAccessor { } } + public static final class CosmosBulkOperationResponseHelper { + private static CosmosBulkOperationResponseAccessor accessor; + + private CosmosBulkOperationResponseHelper() { + + } + + static { + ensureClassLoaded(CosmosBulkOperationResponse.class); + } + + public static CosmosBulkOperationResponseAccessor getCosmosBulkOperationResponseAccessor() { return accessor; } + + public static void setCosmosBulkOperationResponseAccessor(CosmosBulkOperationResponseAccessor accessor) { + CosmosBulkOperationResponseHelper.accessor = accessor; + } + + public interface CosmosBulkOperationResponseAccessor { + + void setResponse(CosmosBulkOperationResponse cosmosBulkOperationResponse, + CosmosBulkItemResponse cosmosBulkItemResponse); + } + + } + public static final class CosmosBulkItemResponseHelper { private static CosmosBulkItemResponseAccessor accessor; @@ -690,6 +702,9 @@ public static void setCosmosBulkItemResponseAccessor(CosmosBulkItemResponseAcces public interface CosmosBulkItemResponseAccessor { ObjectNode getResourceObject(CosmosBulkItemResponse cosmosBulkItemResponse); + + void setResourceObject(CosmosBulkItemResponse cosmosBulkItemResponse, + ObjectNode objectNode); } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosBulkExecutionOptions.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosBulkExecutionOptions.java index 4e3a056039314..383b146d7f233 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosBulkExecutionOptions.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosBulkExecutionOptions.java @@ -48,11 +48,20 @@ public final class CosmosBulkExecutionOptions { /** * Constructor * @param thresholdsState thresholds + * @param customOptions customOptions */ public CosmosBulkExecutionOptions(CosmosBulkExecutionThresholdsState thresholdsState, Map customOptions) { this(null, thresholdsState, customOptions); } + /** + * Constructor + * @param thresholdsState thresholds + */ + public CosmosBulkExecutionOptions(CosmosBulkExecutionThresholdsState thresholdsState) { + this(null, thresholdsState, null); + } + /** * Constructor */ @@ -290,6 +299,10 @@ public Map getHeader(CosmosBulkExecutionOptions cosmosBulkExecut return cosmosBulkExecutionOptions.getHeaders(); } + @Override + public Map getCustomOptions(CosmosBulkExecutionOptions cosmosBulkExecutionOptions) { + return cosmosBulkExecutionOptions.customOptions; + } }); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosBulkItemResponse.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosBulkItemResponse.java index f162804daecf8..f7d4fd35630a6 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosBulkItemResponse.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosBulkItemResponse.java @@ -25,7 +25,7 @@ public final class CosmosBulkItemResponse { private final String eTag; private final double requestCharge; - private final ObjectNode resourceObject; + private ObjectNode resourceObject; private final int statusCode; private final Duration retryAfter; private final int subStatusCode; @@ -199,6 +199,12 @@ private ObjectNode getResourceObject() { public ObjectNode getResourceObject(CosmosBulkItemResponse cosmosBulkItemResponse) { return cosmosBulkItemResponse.getResourceObject(); } + + @Override + public void setResourceObject(CosmosBulkItemResponse cosmosBulkItemResponse, + ObjectNode objectNode) { + cosmosBulkItemResponse.resourceObject = objectNode; + } }); } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosBulkOperationResponse.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosBulkOperationResponse.java index 5876c4218eb9c..d6a4adb985d5d 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosBulkOperationResponse.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosBulkOperationResponse.java @@ -4,6 +4,7 @@ package com.azure.cosmos.models; import com.azure.cosmos.CosmosAsyncContainer; +import com.azure.cosmos.implementation.ImplementationBridgeHelpers; import reactor.core.publisher.Flux; /** @@ -13,7 +14,7 @@ public final class CosmosBulkOperationResponse { private final CosmosItemOperation operation; - private final CosmosBulkItemResponse response; + private CosmosBulkItemResponse response; private final Exception exception; private final TContext batchContext; @@ -83,4 +84,19 @@ public Exception getException() { public TContext getBatchContext() { return batchContext; } + + /////////////////////////////////////////////////////////////////////////////////////////// + // the following helper/accessor only helps to access this class outside of this package.// + /////////////////////////////////////////////////////////////////////////////////////////// + + static { + ImplementationBridgeHelpers.CosmosBulkOperationResponseHelper.setCosmosBulkOperationResponseAccessor( + new ImplementationBridgeHelpers.CosmosBulkOperationResponseHelper.CosmosBulkOperationResponseAccessor() { + + @Override + public void setResponse(CosmosBulkOperationResponse cosmosBulkOperationResponse, CosmosBulkItemResponse cosmosBulkItemResponse) { + cosmosBulkOperationResponse.response = cosmosBulkItemResponse; + } + }); + } } From ebffbe5b04fc91f5184c6460ea51394589cd2593 Mon Sep 17 00:00:00 2001 From: Kushagra Thapar Date: Mon, 24 Jan 2022 13:52:13 -0800 Subject: [PATCH 03/11] Added custom options handling in bulk for encryption --- .../CosmosEncryptionAsyncContainer.java | 59 ++++++++++--------- .../implementation/batch/BulkExecutor.java | 11 ++++ 2 files changed, 41 insertions(+), 29 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-encryption/src/main/java/com/azure/cosmos/encryption/CosmosEncryptionAsyncContainer.java b/sdk/cosmos/azure-cosmos-encryption/src/main/java/com/azure/cosmos/encryption/CosmosEncryptionAsyncContainer.java index 0a0a907949dbe..b09a48c91ef50 100644 --- a/sdk/cosmos/azure-cosmos-encryption/src/main/java/com/azure/cosmos/encryption/CosmosEncryptionAsyncContainer.java +++ b/sdk/cosmos/azure-cosmos-encryption/src/main/java/com/azure/cosmos/encryption/CosmosEncryptionAsyncContainer.java @@ -1158,45 +1158,45 @@ public Flux> executeBulkOperati bulkOptions = new CosmosBulkExecutionOptions(); } - List> monoList = new ArrayList<>(); - operations.collectList().block().forEach(cosmosItemOperation -> { - Mono cosmosItemOperationMono = null; + Flux operationFlux = operations.flatMap(cosmosItemOperation -> { + Mono cosmosItemOperationMono; if (cosmosItemOperation.getItem() != null) { - ObjectNode objectNode = EncryptionUtils.getSimpleObjectMapper().valueToTree(cosmosItemOperation.getItem()); + ObjectNode objectNode = + EncryptionUtils.getSimpleObjectMapper().valueToTree(cosmosItemOperation.getItem()); assert cosmosItemOperation instanceof ItemBulkOperation; cosmosItemOperationMono = - this.encryptionProcessor.encryptObjectNode(objectNode).map(encryptedItem -> { - return new ItemBulkOperation( - cosmosItemOperation.getOperationType(), - cosmosItemOperation.getId(), - cosmosItemOperation.getPartitionKeyValue(), - ((ItemBulkOperation)cosmosItemOperation).getRequestOptions(), - encryptedItem, - cosmosItemOperation.getContext() - ); - }); + this.encryptionProcessor.encryptObjectNode(objectNode).map(encryptedItem -> new ItemBulkOperation<>( + cosmosItemOperation.getOperationType(), + cosmosItemOperation.getId(), + cosmosItemOperation.getPartitionKeyValue(), + ((ItemBulkOperation) cosmosItemOperation).getRequestOptions(), + encryptedItem, + cosmosItemOperation.getContext() + )); } else { cosmosItemOperationMono = Mono.just( - new ItemBulkOperation( + new ItemBulkOperation<>( cosmosItemOperation.getOperationType(), cosmosItemOperation.getId(), cosmosItemOperation.getPartitionKeyValue(), - ((ItemBulkOperation)cosmosItemOperation).getRequestOptions(), + ((ItemBulkOperation) cosmosItemOperation).getRequestOptions(), null, cosmosItemOperation.getContext() ) ); } - monoList.add(cosmosItemOperationMono); + return cosmosItemOperationMono; }); - Mono> encryptedOperationListMono = - Flux.mergeSequential(monoList).collectList(); - final CosmosBulkExecutionOptions cosmosBulkExecutionOptions = bulkOptions; - setRequestHeaders(cosmosBulkExecutionOptions); + Mono> then = operationFlux.then(Mono.defer(() -> { + setRequestHeaders(cosmosBulkExecutionOptions); + return Mono.just(operationFlux); + })); - return executeBulkOperationsHelper(encryptedOperationListMono.flatMapMany(Flux::fromIterable), cosmosBulkExecutionOptions, false); + return then.flatMapMany(cosmosItemOperationFlux -> { + return executeBulkOperationsHelper(operationFlux, cosmosBulkExecutionOptions, false); + }); } private Flux> executeBulkOperationsHelper(Flux operations, @@ -1208,15 +1208,16 @@ private Flux> executeBulkOperat ObjectNode objectNode = this.cosmosBulkItemResponseAccessor.getResourceObject(cosmosBulkItemResponse); if(objectNode != null) { - encryptionProcessor.decryptJsonNode(objectNode).subscribe(jsonNode -> { - this.cosmosBulkItemResponseAccessor.setResourceObject(cosmosBulkItemResponse, (ObjectNode) jsonNode); - this.cosmosBulkOperationResponseAccessor.setResponse(cosmosBulkOperationResponse, cosmosBulkItemResponse); + Mono jsonNodeMono = encryptionProcessor.decryptJsonNode(objectNode).flatMap(jsonNode -> { + this.cosmosBulkItemResponseAccessor.setResourceObject(cosmosBulkItemResponse, + (ObjectNode) jsonNode); + this.cosmosBulkOperationResponseAccessor.setResponse(cosmosBulkOperationResponse, + cosmosBulkItemResponse); + return Mono.just(jsonNode); }); + return jsonNodeMono.flux().flatMap(jsonNode -> Flux.just((CosmosBulkOperationResponse) cosmosBulkOperationResponse)); } - - CosmosBulkOperationResponse res = (CosmosBulkOperationResponse) cosmosBulkOperationResponse; - - return Flux.just(res); + return Flux.just((CosmosBulkOperationResponse) cosmosBulkOperationResponse); }); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/BulkExecutor.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/BulkExecutor.java index 52df499218b1c..0bb70dab6db34 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/BulkExecutor.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/BulkExecutor.java @@ -39,6 +39,7 @@ import java.time.Duration; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executors; @@ -682,6 +683,16 @@ private Mono> retryOtherExceptions( private Mono executeBatchRequest(PartitionKeyRangeServerBatchRequest serverRequest) { RequestOptions options = new RequestOptions(); + + // This logic is to handle custom bulk options which can be passed through encryption or through some other project + Map customOptions = ImplementationBridgeHelpers.CosmosBulkExecutionOptionsHelper + .getCosmosBulkExecutionOptionsAccessor() + .getCustomOptions(cosmosBulkExecutionOptions); + if (!customOptions.isEmpty()) { + for(Map.Entry entry : customOptions.entrySet()) { + options.setHeader(entry.getKey(), entry.getValue()); + } + } options.setOperationContextAndListenerTuple(operationListener); // The request options here are used for the BulkRequest exchanged with the service From 7ac13c85effeb4d15e8fd7c869aeb4e31527103e Mon Sep 17 00:00:00 2001 From: Aayush Kataria Date: Mon, 24 Jan 2022 14:03:52 -0800 Subject: [PATCH 04/11] Bulk Encryption Changes --- .../encryption/EncryptionAsyncApiCrudTest.java | 2 +- .../ImplementationBridgeHelpers.java | 17 ++++++++++++++++- 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-encryption/src/test/java/com/azure/cosmos/encryption/EncryptionAsyncApiCrudTest.java b/sdk/cosmos/azure-cosmos-encryption/src/test/java/com/azure/cosmos/encryption/EncryptionAsyncApiCrudTest.java index 67780f3030138..ffc0e610cbcfe 100644 --- a/sdk/cosmos/azure-cosmos-encryption/src/test/java/com/azure/cosmos/encryption/EncryptionAsyncApiCrudTest.java +++ b/sdk/cosmos/azure-cosmos-encryption/src/test/java/com/azure/cosmos/encryption/EncryptionAsyncApiCrudTest.java @@ -55,7 +55,7 @@ public class EncryptionAsyncApiCrudTest extends TestSuiteBase { CosmosEncryptionAsyncContainer cosmosEncryptionAsyncContainer; CosmosEncryptionAsyncDatabase cosmosEncryptionAsyncDatabase; - @Factory(dataProvider = "clientBuilders") + @Factory(dataProvider = "clientBuildersWithSessionConsistency") public EncryptionAsyncApiCrudTest(CosmosClientBuilder clientBuilder) { super(clientBuilder); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java index 011e054c6258a..65adf8bcf9e18 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java @@ -16,7 +16,22 @@ import com.azure.cosmos.implementation.patch.PatchOperation; import com.azure.cosmos.implementation.routing.PartitionKeyInternal; import com.azure.cosmos.implementation.spark.OperationContextAndListenerTuple; -import com.azure.cosmos.models.*; +import com.azure.cosmos.models.CosmosBatch; +import com.azure.cosmos.models.CosmosBatchOperationResult; +import com.azure.cosmos.models.CosmosBatchRequestOptions; +import com.azure.cosmos.models.CosmosBatchResponse; +import com.azure.cosmos.models.CosmosBulkExecutionOptions; +import com.azure.cosmos.models.CosmosBulkExecutionThresholdsState; +import com.azure.cosmos.models.CosmosBulkItemResponse; +import com.azure.cosmos.models.CosmosChangeFeedRequestOptions; +import com.azure.cosmos.models.CosmosContainerProperties; +import com.azure.cosmos.models.CosmosItemRequestOptions; +import com.azure.cosmos.models.CosmosItemResponse; +import com.azure.cosmos.models.CosmosPatchOperations; +import com.azure.cosmos.models.CosmosQueryRequestOptions; +import com.azure.cosmos.models.FeedResponse; +import com.azure.cosmos.models.PartitionKey; +import com.azure.cosmos.models.CosmosBulkOperationResponse; import com.azure.cosmos.util.CosmosPagedFlux; import com.fasterxml.jackson.databind.node.ObjectNode; import org.slf4j.Logger; From 08e58e3b31837c8c4fa22cbe43a399323fd10bd8 Mon Sep 17 00:00:00 2001 From: Aayush Kataria Date: Mon, 24 Jan 2022 14:08:07 -0800 Subject: [PATCH 05/11] Bulk Encryption Changes --- .../encryption/EncryptionAsyncApiCrudTest.java | 18 +++--------------- 1 file changed, 3 insertions(+), 15 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-encryption/src/test/java/com/azure/cosmos/encryption/EncryptionAsyncApiCrudTest.java b/sdk/cosmos/azure-cosmos-encryption/src/test/java/com/azure/cosmos/encryption/EncryptionAsyncApiCrudTest.java index ffc0e610cbcfe..e23388b78bf19 100644 --- a/sdk/cosmos/azure-cosmos-encryption/src/test/java/com/azure/cosmos/encryption/EncryptionAsyncApiCrudTest.java +++ b/sdk/cosmos/azure-cosmos-encryption/src/test/java/com/azure/cosmos/encryption/EncryptionAsyncApiCrudTest.java @@ -777,7 +777,7 @@ public void bulkExecution_deleteItem() { createItemsAndVerify(cosmosItemOperations); - Flux deleteCosmosItemOperationsFlux = Flux.fromIterable(cosmosItemOperations).map( cosmosItemOperation -> { + Flux deleteCosmosItemOperationsFlux = Flux.fromIterable(cosmosItemOperations).map(cosmosItemOperation -> { EncryptionPojo encryptionPojo = cosmosItemOperation.getItem(); return CosmosBulkOperations.getDeleteItemOperation(encryptionPojo.getId(), cosmosItemOperation.getPartitionKeyValue()); }); @@ -817,28 +817,17 @@ public void bulkExecution_readItem() { int totalRequest = getTotalRequest(); List cosmosItemOperations = new ArrayList<>(); -// Map indexCosmosItemOperationMap = new HashMap<>(); for (int i = 0; i < totalRequest; i++) { String itemId = UUID.randomUUID().toString(); - - // use i as a identifier for re check. EncryptionPojo createPojo = getItem(itemId); cosmosItemOperations.add(CosmosBulkOperations.getCreateItemOperation(createPojo, new PartitionKey(createPojo.getMypk()))); -// indexCosmosItemOperationMap.put(createPojo.getId(), cosmosItemOperation); } createItemsAndVerify(cosmosItemOperations); -// Flux cosmosItemOperationsFlux = Flux.range(0, totalRequest).map(i -> { -// String itemId = UUID.randomUUID().toString(); -// EncryptionPojo createPojo = getItem(itemId); -// -// return CosmosBulkOperations.getCreateItemOperation(createPojo, new PartitionKey(createPojo.getMypk())); -// }); - - Flux readCosmosItemOperationsFlux = Flux.fromIterable(cosmosItemOperations).map( cosmosItemOperation -> { + Flux readCosmosItemOperationsFlux = Flux.fromIterable(cosmosItemOperations).map(cosmosItemOperation -> { EncryptionPojo encryptionPojo = cosmosItemOperation.getItem(); return CosmosBulkOperations.getReadItemOperation(encryptionPojo.getId(), cosmosItemOperation.getPartitionKeyValue()); }); @@ -868,7 +857,6 @@ public void bulkExecution_readItem() { assertThat(cosmosBulkItemResponse.getRequestCharge()).isNotNull(); - return Mono.just(cosmosBulkItemResponse); }).blockLast(); @@ -882,7 +870,7 @@ private void createItemsAndVerify(List cosmosItemOperations executeBulkOperations(Flux.fromIterable(cosmosItemOperations), cosmosBulkExecutionOptions); HashSet distinctIndex = new HashSet<>(); - AtomicInteger processedDoc = new AtomicInteger(0); + AtomicInteger processedDoc = new AtomicInteger(0); createResponseFlux.flatMap(cosmosBulkOperationResponse -> { processedDoc.incrementAndGet(); From e07328e12b7320a434813e44eaed215b0be53bf2 Mon Sep 17 00:00:00 2001 From: Aayush Kataria Date: Mon, 31 Jan 2022 13:13:10 -0800 Subject: [PATCH 06/11] Bulk Encryption Changes --- .../CosmosEncryptionAsyncContainer.java | 7 +++--- .../ImplementationBridgeHelpers.java | 25 ------------------- .../models/CosmosBulkOperationResponse.java | 15 ----------- 3 files changed, 3 insertions(+), 44 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-encryption/src/main/java/com/azure/cosmos/encryption/CosmosEncryptionAsyncContainer.java b/sdk/cosmos/azure-cosmos-encryption/src/main/java/com/azure/cosmos/encryption/CosmosEncryptionAsyncContainer.java index b09a48c91ef50..ff75115bb6316 100644 --- a/sdk/cosmos/azure-cosmos-encryption/src/main/java/com/azure/cosmos/encryption/CosmosEncryptionAsyncContainer.java +++ b/sdk/cosmos/azure-cosmos-encryption/src/main/java/com/azure/cosmos/encryption/CosmosEncryptionAsyncContainer.java @@ -85,7 +85,6 @@ public class CosmosEncryptionAsyncContainer { ImplementationBridgeHelpers.CosmosPatchOperationsHelper.CosmosPatchOperationsAccessor cosmosPatchOperationsAccessor; ImplementationBridgeHelpers.CosmosBulkExecutionOptionsHelper.CosmosBulkExecutionOptionsAccessor cosmosBulkExecutionOptionsAccessor; ImplementationBridgeHelpers.CosmosBulkItemResponseHelper.CosmosBulkItemResponseAccessor cosmosBulkItemResponseAccessor; - ImplementationBridgeHelpers.CosmosBulkOperationResponseHelper.CosmosBulkOperationResponseAccessor cosmosBulkOperationResponseAccessor; CosmosEncryptionAsyncContainer(CosmosAsyncContainer container, CosmosEncryptionAsyncClient cosmosEncryptionAsyncClient) { @@ -110,7 +109,6 @@ public class CosmosEncryptionAsyncContainer { this.cosmosPatchOperationsAccessor = ImplementationBridgeHelpers.CosmosPatchOperationsHelper.getCosmosPatchOperationsAccessor(); this.cosmosBulkExecutionOptionsAccessor = ImplementationBridgeHelpers.CosmosBulkExecutionOptionsHelper.getCosmosBulkExecutionOptionsAccessor(); this.cosmosBulkItemResponseAccessor = ImplementationBridgeHelpers.CosmosBulkItemResponseHelper.getCosmosBulkItemResponseAccessor(); - this.cosmosBulkOperationResponseAccessor = ImplementationBridgeHelpers.CosmosBulkOperationResponseHelper.getCosmosBulkOperationResponseAccessor(); } EncryptionProcessor getEncryptionProcessor() { @@ -1211,8 +1209,9 @@ private Flux> executeBulkOperat Mono jsonNodeMono = encryptionProcessor.decryptJsonNode(objectNode).flatMap(jsonNode -> { this.cosmosBulkItemResponseAccessor.setResourceObject(cosmosBulkItemResponse, (ObjectNode) jsonNode); - this.cosmosBulkOperationResponseAccessor.setResponse(cosmosBulkOperationResponse, - cosmosBulkItemResponse); + int x =1; +// this.cosmosBulkOperationResponseAccessor.setResponse(cosmosBulkOperationResponse, +// cosmosBulkItemResponse); return Mono.just(jsonNode); }); return jsonNodeMono.flux().flatMap(jsonNode -> Flux.just((CosmosBulkOperationResponse) cosmosBulkOperationResponse)); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java index 65adf8bcf9e18..655a09b46ab85 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java @@ -711,31 +711,6 @@ public interface CosmosBatchAccessor { } } - public static final class CosmosBulkOperationResponseHelper { - private static CosmosBulkOperationResponseAccessor accessor; - - private CosmosBulkOperationResponseHelper() { - - } - - static { - ensureClassLoaded(CosmosBulkOperationResponse.class); - } - - public static CosmosBulkOperationResponseAccessor getCosmosBulkOperationResponseAccessor() { return accessor; } - - public static void setCosmosBulkOperationResponseAccessor(CosmosBulkOperationResponseAccessor accessor) { - CosmosBulkOperationResponseHelper.accessor = accessor; - } - - public interface CosmosBulkOperationResponseAccessor { - - void setResponse(CosmosBulkOperationResponse cosmosBulkOperationResponse, - CosmosBulkItemResponse cosmosBulkItemResponse); - } - - } - public static final class CosmosBulkItemResponseHelper { private static CosmosBulkItemResponseAccessor accessor; diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosBulkOperationResponse.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosBulkOperationResponse.java index d6a4adb985d5d..5a84c16044c74 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosBulkOperationResponse.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/CosmosBulkOperationResponse.java @@ -84,19 +84,4 @@ public Exception getException() { public TContext getBatchContext() { return batchContext; } - - /////////////////////////////////////////////////////////////////////////////////////////// - // the following helper/accessor only helps to access this class outside of this package.// - /////////////////////////////////////////////////////////////////////////////////////////// - - static { - ImplementationBridgeHelpers.CosmosBulkOperationResponseHelper.setCosmosBulkOperationResponseAccessor( - new ImplementationBridgeHelpers.CosmosBulkOperationResponseHelper.CosmosBulkOperationResponseAccessor() { - - @Override - public void setResponse(CosmosBulkOperationResponse cosmosBulkOperationResponse, CosmosBulkItemResponse cosmosBulkItemResponse) { - cosmosBulkOperationResponse.response = cosmosBulkItemResponse; - } - }); - } } From 2317d70c11862502d9307ffefe3e3d7db45a8ba3 Mon Sep 17 00:00:00 2001 From: Aayush Kataria Date: Tue, 1 Feb 2022 11:06:47 -0800 Subject: [PATCH 07/11] Bulk Encryption Changes --- .../CosmosEncryptionAsyncContainer.java | 11 +++-------- .../encryption/EncryptionAsyncApiCrudTest.java | 17 ++++++++++++++--- .../implementation/batch/BulkExecutor.java | 2 +- 3 files changed, 18 insertions(+), 12 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-encryption/src/main/java/com/azure/cosmos/encryption/CosmosEncryptionAsyncContainer.java b/sdk/cosmos/azure-cosmos-encryption/src/main/java/com/azure/cosmos/encryption/CosmosEncryptionAsyncContainer.java index ff75115bb6316..f162246f89cc1 100644 --- a/sdk/cosmos/azure-cosmos-encryption/src/main/java/com/azure/cosmos/encryption/CosmosEncryptionAsyncContainer.java +++ b/sdk/cosmos/azure-cosmos-encryption/src/main/java/com/azure/cosmos/encryption/CosmosEncryptionAsyncContainer.java @@ -1187,14 +1187,9 @@ public Flux> executeBulkOperati }); final CosmosBulkExecutionOptions cosmosBulkExecutionOptions = bulkOptions; - Mono> then = operationFlux.then(Mono.defer(() -> { - setRequestHeaders(cosmosBulkExecutionOptions); - return Mono.just(operationFlux); - })); + setRequestHeaders(cosmosBulkExecutionOptions); - return then.flatMapMany(cosmosItemOperationFlux -> { - return executeBulkOperationsHelper(operationFlux, cosmosBulkExecutionOptions, false); - }); + return executeBulkOperationsHelper(operationFlux, cosmosBulkExecutionOptions, false); } private Flux> executeBulkOperationsHelper(Flux operations, @@ -1216,7 +1211,7 @@ private Flux> executeBulkOperat }); return jsonNodeMono.flux().flatMap(jsonNode -> Flux.just((CosmosBulkOperationResponse) cosmosBulkOperationResponse)); } - return Flux.just((CosmosBulkOperationResponse) cosmosBulkOperationResponse); + return Mono.just((CosmosBulkOperationResponse) cosmosBulkOperationResponse); }); } diff --git a/sdk/cosmos/azure-cosmos-encryption/src/test/java/com/azure/cosmos/encryption/EncryptionAsyncApiCrudTest.java b/sdk/cosmos/azure-cosmos-encryption/src/test/java/com/azure/cosmos/encryption/EncryptionAsyncApiCrudTest.java index e23388b78bf19..c689ef6e42a9e 100644 --- a/sdk/cosmos/azure-cosmos-encryption/src/test/java/com/azure/cosmos/encryption/EncryptionAsyncApiCrudTest.java +++ b/sdk/cosmos/azure-cosmos-encryption/src/test/java/com/azure/cosmos/encryption/EncryptionAsyncApiCrudTest.java @@ -683,10 +683,12 @@ private int getTotalRequest() { @Test(groups = {"encryption"}, timeOut = TIMEOUT) public void bulkExecution_createItem() { int totalRequest = getTotalRequest(); + Map idToItemMap = new HashMap<>(); Flux cosmosItemOperationsFlux = Flux.range(0, totalRequest).map(i -> { String itemId = UUID.randomUUID().toString(); EncryptionPojo createPojo = getItem(itemId); + idToItemMap.put(itemId, createPojo); return CosmosBulkOperations.getCreateItemOperation(createPojo, new PartitionKey(createPojo.getMypk())); }); @@ -713,6 +715,9 @@ public void bulkExecution_createItem() { assertThat(cosmosBulkItemResponse.getActivityId()).isNotNull(); assertThat(cosmosBulkItemResponse.getRequestCharge()).isNotNull(); + EncryptionPojo item = cosmosBulkItemResponse.getItem(EncryptionPojo.class); + validateResponse(item, idToItemMap.get(item.getId())); + return Mono.just(cosmosBulkItemResponse); }).blockLast(); @@ -723,10 +728,12 @@ public void bulkExecution_createItem() { public void bulkExecution_upsertItem() { int totalRequest = getTotalRequest(); + Map idToItemMap = new HashMap<>(); Flux cosmosItemOperationsFlux = Flux.range(0, totalRequest).map(i -> { String itemId = UUID.randomUUID().toString(); EncryptionPojo createPojo = getItem(itemId); + idToItemMap.put(itemId, createPojo); return CosmosBulkOperations.getUpsertItemOperation(createPojo, new PartitionKey(createPojo.getMypk())); }); @@ -754,6 +761,9 @@ public void bulkExecution_upsertItem() { assertThat(cosmosBulkItemResponse.getActivityId()).isNotNull(); assertThat(cosmosBulkItemResponse.getRequestCharge()).isNotNull(); + EncryptionPojo item = cosmosBulkItemResponse.getItem(EncryptionPojo.class); + validateResponse(item, idToItemMap.get(item.getId())); + return Mono.just(cosmosBulkItemResponse); }).blockLast(); @@ -767,12 +777,9 @@ public void bulkExecution_deleteItem() { List cosmosItemOperations = new ArrayList<>(); for (int i = 0; i < totalRequest; i++) { String itemId = UUID.randomUUID().toString(); - - // use i as a identifier for re check. EncryptionPojo createPojo = getItem(itemId); cosmosItemOperations.add(CosmosBulkOperations.getCreateItemOperation(createPojo, new PartitionKey(createPojo.getMypk()))); -// indexCosmosItemOperationMap.put(createPojo.getId(), cosmosItemOperation); } createItemsAndVerify(cosmosItemOperations); @@ -817,11 +824,13 @@ public void bulkExecution_readItem() { int totalRequest = getTotalRequest(); List cosmosItemOperations = new ArrayList<>(); + Map idToItemMap = new HashMap<>(); for (int i = 0; i < totalRequest; i++) { String itemId = UUID.randomUUID().toString(); EncryptionPojo createPojo = getItem(itemId); + idToItemMap.put(itemId, createPojo); cosmosItemOperations.add(CosmosBulkOperations.getCreateItemOperation(createPojo, new PartitionKey(createPojo.getMypk()))); } @@ -856,6 +865,8 @@ public void bulkExecution_readItem() { assertThat(cosmosBulkItemResponse.getActivityId()).isNotNull(); assertThat(cosmosBulkItemResponse.getRequestCharge()).isNotNull(); + EncryptionPojo item = cosmosBulkItemResponse.getItem(EncryptionPojo.class); + validateResponse(item, idToItemMap.get(item.getId())); return Mono.just(cosmosBulkItemResponse); }).blockLast(); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/BulkExecutor.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/BulkExecutor.java index 0bb70dab6db34..0ba121d563498 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/BulkExecutor.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/batch/BulkExecutor.java @@ -688,7 +688,7 @@ private Mono executeBatchRequest(PartitionKeyRangeServerBat Map customOptions = ImplementationBridgeHelpers.CosmosBulkExecutionOptionsHelper .getCosmosBulkExecutionOptionsAccessor() .getCustomOptions(cosmosBulkExecutionOptions); - if (!customOptions.isEmpty()) { + if (customOptions != null && !customOptions.isEmpty()) { for(Map.Entry entry : customOptions.entrySet()) { options.setHeader(entry.getKey(), entry.getValue()); } From b4c1fa8739a9dfc889dcfb463e8a1fea1178560d Mon Sep 17 00:00:00 2001 From: Aayush Kataria Date: Wed, 2 Feb 2022 13:16:45 -0800 Subject: [PATCH 08/11] Bulk Encryption Changes --- .../encryption/CosmosEncryptionAsyncContainer.java | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-encryption/src/main/java/com/azure/cosmos/encryption/CosmosEncryptionAsyncContainer.java b/sdk/cosmos/azure-cosmos-encryption/src/main/java/com/azure/cosmos/encryption/CosmosEncryptionAsyncContainer.java index f162246f89cc1..af16e5ccbb734 100644 --- a/sdk/cosmos/azure-cosmos-encryption/src/main/java/com/azure/cosmos/encryption/CosmosEncryptionAsyncContainer.java +++ b/sdk/cosmos/azure-cosmos-encryption/src/main/java/com/azure/cosmos/encryption/CosmosEncryptionAsyncContainer.java @@ -1156,6 +1156,7 @@ public Flux> executeBulkOperati bulkOptions = new CosmosBulkExecutionOptions(); } + final CosmosBulkExecutionOptions cosmosBulkExecutionOptions = bulkOptions; Flux operationFlux = operations.flatMap(cosmosItemOperation -> { Mono cosmosItemOperationMono; if (cosmosItemOperation.getItem() != null) { @@ -1184,10 +1185,10 @@ public Flux> executeBulkOperati ); } return cosmosItemOperationMono; - }); + }).doOnComplete(() -> setRequestHeaders(cosmosBulkExecutionOptions)); - final CosmosBulkExecutionOptions cosmosBulkExecutionOptions = bulkOptions; - setRequestHeaders(cosmosBulkExecutionOptions); +// final CosmosBulkExecutionOptions cosmosBulkExecutionOptions = bulkOptions; +// setRequestHeaders(cosmosBulkExecutionOptions); return executeBulkOperationsHelper(operationFlux, cosmosBulkExecutionOptions, false); } @@ -1204,9 +1205,6 @@ private Flux> executeBulkOperat Mono jsonNodeMono = encryptionProcessor.decryptJsonNode(objectNode).flatMap(jsonNode -> { this.cosmosBulkItemResponseAccessor.setResourceObject(cosmosBulkItemResponse, (ObjectNode) jsonNode); - int x =1; -// this.cosmosBulkOperationResponseAccessor.setResponse(cosmosBulkOperationResponse, -// cosmosBulkItemResponse); return Mono.just(jsonNode); }); return jsonNodeMono.flux().flatMap(jsonNode -> Flux.just((CosmosBulkOperationResponse) cosmosBulkOperationResponse)); From a005f08214241d52af7c9cca2f0dce445ed8b96c Mon Sep 17 00:00:00 2001 From: Aayush Kataria Date: Wed, 2 Feb 2022 13:18:12 -0800 Subject: [PATCH 09/11] Bulk Encryption Changes --- .../cosmos/encryption/CosmosEncryptionAsyncContainer.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-encryption/src/main/java/com/azure/cosmos/encryption/CosmosEncryptionAsyncContainer.java b/sdk/cosmos/azure-cosmos-encryption/src/main/java/com/azure/cosmos/encryption/CosmosEncryptionAsyncContainer.java index af16e5ccbb734..24f5783361daa 100644 --- a/sdk/cosmos/azure-cosmos-encryption/src/main/java/com/azure/cosmos/encryption/CosmosEncryptionAsyncContainer.java +++ b/sdk/cosmos/azure-cosmos-encryption/src/main/java/com/azure/cosmos/encryption/CosmosEncryptionAsyncContainer.java @@ -1187,9 +1187,6 @@ public Flux> executeBulkOperati return cosmosItemOperationMono; }).doOnComplete(() -> setRequestHeaders(cosmosBulkExecutionOptions)); -// final CosmosBulkExecutionOptions cosmosBulkExecutionOptions = bulkOptions; -// setRequestHeaders(cosmosBulkExecutionOptions); - return executeBulkOperationsHelper(operationFlux, cosmosBulkExecutionOptions, false); } From e86da5fb1675913bbca3de1153104d0985c62896 Mon Sep 17 00:00:00 2001 From: Aayush Kataria Date: Thu, 3 Feb 2022 13:25:58 -0800 Subject: [PATCH 10/11] Bulk Encryption Changes --- .../encryption/CosmosEncryptionAsyncContainer.java | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-encryption/src/main/java/com/azure/cosmos/encryption/CosmosEncryptionAsyncContainer.java b/sdk/cosmos/azure-cosmos-encryption/src/main/java/com/azure/cosmos/encryption/CosmosEncryptionAsyncContainer.java index 24f5783361daa..0e88ff7997fd1 100644 --- a/sdk/cosmos/azure-cosmos-encryption/src/main/java/com/azure/cosmos/encryption/CosmosEncryptionAsyncContainer.java +++ b/sdk/cosmos/azure-cosmos-encryption/src/main/java/com/azure/cosmos/encryption/CosmosEncryptionAsyncContainer.java @@ -1185,9 +1185,16 @@ public Flux> executeBulkOperati ); } return cosmosItemOperationMono; - }).doOnComplete(() -> setRequestHeaders(cosmosBulkExecutionOptions)); + }); + + Mono> then = operationFlux.then(Mono.defer(() -> { + setRequestHeaders(cosmosBulkExecutionOptions); + return Mono.just(operationFlux); + })); - return executeBulkOperationsHelper(operationFlux, cosmosBulkExecutionOptions, false); + return then.flatMapMany(cosmosItemOperationFlux -> { + return executeBulkOperationsHelper(operationFlux, cosmosBulkExecutionOptions, false); + }); } private Flux> executeBulkOperationsHelper(Flux operations, From b31eb94c694b887e0cd0ddeb9c5ac31f20a6e9de Mon Sep 17 00:00:00 2001 From: Aayush Kataria Date: Fri, 4 Feb 2022 15:03:18 -0800 Subject: [PATCH 11/11] Bulk Encryption Changes --- .../encryption/CosmosEncryptionAsyncContainer.java | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-encryption/src/main/java/com/azure/cosmos/encryption/CosmosEncryptionAsyncContainer.java b/sdk/cosmos/azure-cosmos-encryption/src/main/java/com/azure/cosmos/encryption/CosmosEncryptionAsyncContainer.java index 0e88ff7997fd1..65696d1b36ef8 100644 --- a/sdk/cosmos/azure-cosmos-encryption/src/main/java/com/azure/cosmos/encryption/CosmosEncryptionAsyncContainer.java +++ b/sdk/cosmos/azure-cosmos-encryption/src/main/java/com/azure/cosmos/encryption/CosmosEncryptionAsyncContainer.java @@ -1187,14 +1187,10 @@ public Flux> executeBulkOperati return cosmosItemOperationMono; }); - Mono> then = operationFlux.then(Mono.defer(() -> { - setRequestHeaders(cosmosBulkExecutionOptions); - return Mono.just(operationFlux); - })); - - return then.flatMapMany(cosmosItemOperationFlux -> { - return executeBulkOperationsHelper(operationFlux, cosmosBulkExecutionOptions, false); - }); + Mono> listMono = operationFlux.collectList(); + setRequestHeaders(cosmosBulkExecutionOptions); + operationFlux = listMono.flatMapMany(Flux::fromIterable); + return executeBulkOperationsHelper(operationFlux, cosmosBulkExecutionOptions, false); } private Flux> executeBulkOperationsHelper(Flux operations,