Skip to content

Commit

Permalink
Encryption Api for Bulk Operations (Azure#26672)
Browse files Browse the repository at this point in the history
* Encryption Bulk API

* Encryption Bulk Api

* Added custom options handling in bulk for encryption

* Bulk Encryption Changes

* Bulk Encryption Changes

* Bulk Encryption Changes

* Bulk Encryption Changes

* Bulk Encryption Changes

* Bulk Encryption Changes

* Bulk Encryption Changes

* Bulk Encryption Changes

Co-authored-by: Aayush Kataria <[email protected]>
Co-authored-by: Aayush Kataria <[email protected]>
Co-authored-by: Kushagra Thapar <[email protected]>
  • Loading branch information
4 people authored Feb 5, 2022
1 parent 1fe7ae6 commit 58c802a
Show file tree
Hide file tree
Showing 8 changed files with 467 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
import com.azure.cosmos.implementation.ItemDeserializer;
import com.azure.cosmos.implementation.batch.ItemBatchOperation;
import com.azure.cosmos.implementation.batch.ItemBulkOperation;
import com.azure.cosmos.models.CosmosItemOperation;
import com.azure.cosmos.models.CosmosBulkExecutionOptions;
import com.azure.cosmos.models.CosmosBulkOperationResponse;
import com.azure.cosmos.models.CosmosBulkItemResponse;
import com.azure.cosmos.implementation.guava25.base.Preconditions;
import com.azure.cosmos.implementation.query.Transformer;
import com.azure.cosmos.models.CosmosBatch;
Expand All @@ -35,10 +40,10 @@
import com.azure.cosmos.models.SqlParameter;
import com.azure.cosmos.models.SqlQuerySpec;
import com.azure.cosmos.models.CosmosPatchOperations;
import com.azure.cosmos.models.CosmosPatchItemRequestOptions;
import com.azure.cosmos.implementation.patch.PatchOperation;
import com.azure.cosmos.implementation.patch.PatchOperationCore;
import com.azure.cosmos.implementation.patch.PatchOperationType;
import com.azure.cosmos.models.CosmosPatchItemRequestOptions;
import com.azure.cosmos.util.CosmosPagedFlux;
import com.azure.cosmos.util.UtilBridgeInternal;
import com.fasterxml.jackson.databind.JsonNode;
Expand Down Expand Up @@ -78,6 +83,8 @@ public class CosmosEncryptionAsyncContainer {
ImplementationBridgeHelpers.CosmosBatchOperationResultHelper.CosmosBatchOperationResultAccessor cosmosBatchOperationResultAccessor;
ImplementationBridgeHelpers.CosmosBatchRequestOptionsHelper.CosmosBatchRequestOptionsAccessor cosmosBatchRequestOptionsAccessor;
ImplementationBridgeHelpers.CosmosPatchOperationsHelper.CosmosPatchOperationsAccessor cosmosPatchOperationsAccessor;
ImplementationBridgeHelpers.CosmosBulkExecutionOptionsHelper.CosmosBulkExecutionOptionsAccessor cosmosBulkExecutionOptionsAccessor;
ImplementationBridgeHelpers.CosmosBulkItemResponseHelper.CosmosBulkItemResponseAccessor cosmosBulkItemResponseAccessor;

CosmosEncryptionAsyncContainer(CosmosAsyncContainer container,
CosmosEncryptionAsyncClient cosmosEncryptionAsyncClient) {
Expand All @@ -100,6 +107,8 @@ public class CosmosEncryptionAsyncContainer {
this.cosmosBatchOperationResultAccessor = ImplementationBridgeHelpers.CosmosBatchOperationResultHelper.getCosmosBatchOperationResultAccessor();
this.cosmosBatchRequestOptionsAccessor = ImplementationBridgeHelpers.CosmosBatchRequestOptionsHelper.getCosmosBatchRequestOptionsAccessor();
this.cosmosPatchOperationsAccessor = ImplementationBridgeHelpers.CosmosPatchOperationsHelper.getCosmosPatchOperationsAccessor();
this.cosmosBulkExecutionOptionsAccessor = ImplementationBridgeHelpers.CosmosBulkExecutionOptionsHelper.getCosmosBulkExecutionOptionsAccessor();
this.cosmosBulkItemResponseAccessor = ImplementationBridgeHelpers.CosmosBulkItemResponseHelper.getCosmosBulkItemResponseAccessor();
}

EncryptionProcessor getEncryptionProcessor() {
Expand Down Expand Up @@ -1087,6 +1096,124 @@ private Mono<CosmosBatchResponse> executeCosmosBatchHelper(CosmosBatch encrypted
});
}

/**
* Executes flux of operations in Bulk.
*
* @param <TContext> The context for the bulk processing.
* @param operations Flux of operation which will be executed by this container.
*
* @return A Flux of {@link CosmosBulkOperationResponse} which contains operation and it's response or exception.
* <p>
* To create a operation which can be executed here, use {@link com.azure.cosmos.models.CosmosBulkOperations}. For eg.
* for a upsert operation use {@link com.azure.cosmos.models.CosmosBulkOperations#getUpsertItemOperation(Object, PartitionKey)}
* </p>
* <p>
* We can get the corresponding operation using {@link CosmosBulkOperationResponse#getOperation()} and
* it's response using {@link CosmosBulkOperationResponse#getResponse()}. If the operation was executed
* successfully, the value returned by {@link com.azure.cosmos.models.CosmosBulkItemResponse#isSuccessStatusCode()} will be true. To get
* actual status use {@link com.azure.cosmos.models.CosmosBulkItemResponse#getStatusCode()}.
* </p>
* To check if the operation had any exception, use {@link CosmosBulkOperationResponse#getException()} to
* get the exception.
*/
@Beta(value = Beta.SinceVersion.V1, warningText =
Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING)
public <TContext> Flux<CosmosBulkOperationResponse<TContext>> executeBulkOperations(
Flux<CosmosItemOperation> operations) {

return this.executeBulkOperations(operations, new CosmosBulkExecutionOptions());
}

/**
* Executes flux of operations in Bulk.
*
* @param <TContext> The context for the bulk processing.
*
* @param operations Flux of operation which will be executed by this container.
* @param bulkOptions Options that apply for this Bulk request which specifies options regarding execution like
* concurrency, batching size, interval and context.
*
* @return A Flux of {@link CosmosBulkOperationResponse} which contains operation and it's response or exception.
* <p>
* To create a operation which can be executed here, use {@link com.azure.cosmos.models.CosmosBulkOperations}. For eg.
* for a upsert operation use {@link com.azure.cosmos.models.CosmosBulkOperations#getUpsertItemOperation(Object, PartitionKey)}
* </p>
* <p>
* We can get the corresponding operation using {@link CosmosBulkOperationResponse#getOperation()} and
* it's response using {@link CosmosBulkOperationResponse#getResponse()}. If the operation was executed
* successfully, the value returned by {@link com.azure.cosmos.models.CosmosBulkItemResponse#isSuccessStatusCode()} will be true. To get
* actual status use {@link com.azure.cosmos.models.CosmosBulkItemResponse#getStatusCode()}.
* </p>
* To check if the operation had any exception, use {@link CosmosBulkOperationResponse#getException()} to
* get the exception.
*/
@Beta(value = Beta.SinceVersion.V1, warningText =
Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING)
public <TContext> Flux<CosmosBulkOperationResponse<TContext>> executeBulkOperations(
Flux<CosmosItemOperation> operations,
CosmosBulkExecutionOptions bulkOptions) {
if (bulkOptions == null) {
bulkOptions = new CosmosBulkExecutionOptions();
}

final CosmosBulkExecutionOptions cosmosBulkExecutionOptions = bulkOptions;
Flux<CosmosItemOperation> operationFlux = operations.flatMap(cosmosItemOperation -> {
Mono<CosmosItemOperation> cosmosItemOperationMono;
if (cosmosItemOperation.getItem() != null) {
ObjectNode objectNode =
EncryptionUtils.getSimpleObjectMapper().valueToTree(cosmosItemOperation.getItem());
assert cosmosItemOperation instanceof ItemBulkOperation;
cosmosItemOperationMono =
this.encryptionProcessor.encryptObjectNode(objectNode).map(encryptedItem -> new ItemBulkOperation<>(
cosmosItemOperation.getOperationType(),
cosmosItemOperation.getId(),
cosmosItemOperation.getPartitionKeyValue(),
((ItemBulkOperation<JsonNode, TContext>) cosmosItemOperation).getRequestOptions(),
encryptedItem,
cosmosItemOperation.getContext()
));
} else {
cosmosItemOperationMono = Mono.just(
new ItemBulkOperation<>(
cosmosItemOperation.getOperationType(),
cosmosItemOperation.getId(),
cosmosItemOperation.getPartitionKeyValue(),
((ItemBulkOperation<JsonNode, TContext>) cosmosItemOperation).getRequestOptions(),
null,
cosmosItemOperation.getContext()
)
);
}
return cosmosItemOperationMono;
});

Mono<List<CosmosItemOperation>> listMono = operationFlux.collectList();
setRequestHeaders(cosmosBulkExecutionOptions);
operationFlux = listMono.flatMapMany(Flux::fromIterable);
return executeBulkOperationsHelper(operationFlux, cosmosBulkExecutionOptions, false);
}

private <TContext> Flux<CosmosBulkOperationResponse<TContext>> executeBulkOperationsHelper(Flux<CosmosItemOperation> operations,
CosmosBulkExecutionOptions bulkOptions,
boolean isRetry) {
return this.container.executeBulkOperations(operations, bulkOptions).flatMap(cosmosBulkOperationResponse -> {

CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse();
ObjectNode objectNode = this.cosmosBulkItemResponseAccessor.getResourceObject(cosmosBulkItemResponse);

if(objectNode != null) {
Mono<JsonNode> jsonNodeMono = encryptionProcessor.decryptJsonNode(objectNode).flatMap(jsonNode -> {
this.cosmosBulkItemResponseAccessor.setResourceObject(cosmosBulkItemResponse,
(ObjectNode) jsonNode);
return Mono.just(jsonNode);
});
return jsonNodeMono.flux().flatMap(jsonNode -> Flux.just((CosmosBulkOperationResponse<TContext>) cosmosBulkOperationResponse));
}
return Mono.just((CosmosBulkOperationResponse<TContext>) cosmosBulkOperationResponse);
});
}


private void setRequestHeaders(CosmosItemRequestOptions requestOptions) {
this.cosmosItemRequestOptionsAccessor.setHeader(requestOptions, Constants.IS_CLIENT_ENCRYPTED_HEADER, "true");
this.cosmosItemRequestOptionsAccessor.setHeader(requestOptions, Constants.INTENDED_COLLECTION_RID_HEADER, this.encryptionProcessor.getContainerRid());
Expand All @@ -1107,6 +1234,11 @@ private void setRequestHeaders(CosmosBatchRequestOptions requestOptions) {
this.cosmosBatchRequestOptionsAccessor.setHeader(requestOptions, Constants.INTENDED_COLLECTION_RID_HEADER, this.encryptionProcessor.getContainerRid());
}

private void setRequestHeaders(CosmosBulkExecutionOptions requestOptions) {
this.cosmosBulkExecutionOptionsAccessor.setHeader(requestOptions, Constants.IS_CLIENT_ENCRYPTED_HEADER, "true");
this.cosmosBulkExecutionOptionsAccessor.setHeader(requestOptions, Constants.INTENDED_COLLECTION_RID_HEADER, this.encryptionProcessor.getContainerRid());
}

boolean isIncorrectContainerRid(CosmosException cosmosException) {
return cosmosException.getStatusCode() == HttpConstants.StatusCodes.BADREQUEST &&
cosmosException.getResponseHeaders().get(HttpConstants.HttpHeaders.SUB_STATUS)
Expand Down
Loading

0 comments on commit 58c802a

Please sign in to comment.