Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Encryption Api for Bulk Operations #26672

Merged
merged 18 commits into from
Feb 5, 2022
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
import com.azure.cosmos.implementation.ItemDeserializer;
import com.azure.cosmos.implementation.batch.ItemBatchOperation;
import com.azure.cosmos.implementation.batch.ItemBulkOperation;
import com.azure.cosmos.models.CosmosItemOperation;
import com.azure.cosmos.models.CosmosBulkExecutionOptions;
import com.azure.cosmos.models.CosmosBulkOperationResponse;
import com.azure.cosmos.models.CosmosBulkItemResponse;
import com.azure.cosmos.implementation.guava25.base.Preconditions;
import com.azure.cosmos.implementation.query.Transformer;
import com.azure.cosmos.models.CosmosBatch;
Expand All @@ -35,10 +40,10 @@
import com.azure.cosmos.models.SqlParameter;
import com.azure.cosmos.models.SqlQuerySpec;
import com.azure.cosmos.models.CosmosPatchOperations;
import com.azure.cosmos.models.CosmosPatchItemRequestOptions;
import com.azure.cosmos.implementation.patch.PatchOperation;
import com.azure.cosmos.implementation.patch.PatchOperationCore;
import com.azure.cosmos.implementation.patch.PatchOperationType;
import com.azure.cosmos.models.CosmosPatchItemRequestOptions;
import com.azure.cosmos.util.CosmosPagedFlux;
import com.azure.cosmos.util.UtilBridgeInternal;
import com.fasterxml.jackson.databind.JsonNode;
Expand Down Expand Up @@ -78,6 +83,8 @@ public class CosmosEncryptionAsyncContainer {
ImplementationBridgeHelpers.CosmosBatchOperationResultHelper.CosmosBatchOperationResultAccessor cosmosBatchOperationResultAccessor;
ImplementationBridgeHelpers.CosmosBatchRequestOptionsHelper.CosmosBatchRequestOptionsAccessor cosmosBatchRequestOptionsAccessor;
ImplementationBridgeHelpers.CosmosPatchOperationsHelper.CosmosPatchOperationsAccessor cosmosPatchOperationsAccessor;
ImplementationBridgeHelpers.CosmosBulkExecutionOptionsHelper.CosmosBulkExecutionOptionsAccessor cosmosBulkExecutionOptionsAccessor;
ImplementationBridgeHelpers.CosmosBulkItemResponseHelper.CosmosBulkItemResponseAccessor cosmosBulkItemResponseAccessor;

CosmosEncryptionAsyncContainer(CosmosAsyncContainer container,
CosmosEncryptionAsyncClient cosmosEncryptionAsyncClient) {
Expand All @@ -100,6 +107,8 @@ public class CosmosEncryptionAsyncContainer {
this.cosmosBatchOperationResultAccessor = ImplementationBridgeHelpers.CosmosBatchOperationResultHelper.getCosmosBatchOperationResultAccessor();
this.cosmosBatchRequestOptionsAccessor = ImplementationBridgeHelpers.CosmosBatchRequestOptionsHelper.getCosmosBatchRequestOptionsAccessor();
this.cosmosPatchOperationsAccessor = ImplementationBridgeHelpers.CosmosPatchOperationsHelper.getCosmosPatchOperationsAccessor();
this.cosmosBulkExecutionOptionsAccessor = ImplementationBridgeHelpers.CosmosBulkExecutionOptionsHelper.getCosmosBulkExecutionOptionsAccessor();
this.cosmosBulkItemResponseAccessor = ImplementationBridgeHelpers.CosmosBulkItemResponseHelper.getCosmosBulkItemResponseAccessor();
}

EncryptionProcessor getEncryptionProcessor() {
Expand Down Expand Up @@ -1087,6 +1096,131 @@ private Mono<CosmosBatchResponse> executeCosmosBatchHelper(CosmosBatch encrypted
});
}

/**
* Executes flux of operations in Bulk.
*
* @param <TContext> The context for the bulk processing.
* @param operations Flux of operation which will be executed by this container.
*
* @return A Flux of {@link CosmosBulkOperationResponse} which contains operation and it's response or exception.
* <p>
* To create a operation which can be executed here, use {@link com.azure.cosmos.models.CosmosBulkOperations}. For eg.
* for a upsert operation use {@link com.azure.cosmos.models.CosmosBulkOperations#getUpsertItemOperation(Object, PartitionKey)}
* </p>
* <p>
* We can get the corresponding operation using {@link CosmosBulkOperationResponse#getOperation()} and
* it's response using {@link CosmosBulkOperationResponse#getResponse()}. If the operation was executed
* successfully, the value returned by {@link com.azure.cosmos.models.CosmosBulkItemResponse#isSuccessStatusCode()} will be true. To get
* actual status use {@link com.azure.cosmos.models.CosmosBulkItemResponse#getStatusCode()}.
* </p>
* To check if the operation had any exception, use {@link CosmosBulkOperationResponse#getException()} to
* get the exception.
*/
@Beta(value = Beta.SinceVersion.V1, warningText =
Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING)
public <TContext> Flux<CosmosBulkOperationResponse<TContext>> executeBulkOperations(
Flux<CosmosItemOperation> operations) {

return this.executeBulkOperations(operations, new CosmosBulkExecutionOptions());
}

/**
* Executes flux of operations in Bulk.
*
* @param <TContext> The context for the bulk processing.
*
* @param operations Flux of operation which will be executed by this container.
* @param bulkOptions Options that apply for this Bulk request which specifies options regarding execution like
* concurrency, batching size, interval and context.
*
* @return A Flux of {@link CosmosBulkOperationResponse} which contains operation and it's response or exception.
* <p>
* To create a operation which can be executed here, use {@link com.azure.cosmos.models.CosmosBulkOperations}. For eg.
* for a upsert operation use {@link com.azure.cosmos.models.CosmosBulkOperations#getUpsertItemOperation(Object, PartitionKey)}
* </p>
* <p>
* We can get the corresponding operation using {@link CosmosBulkOperationResponse#getOperation()} and
* it's response using {@link CosmosBulkOperationResponse#getResponse()}. If the operation was executed
* successfully, the value returned by {@link com.azure.cosmos.models.CosmosBulkItemResponse#isSuccessStatusCode()} will be true. To get
* actual status use {@link com.azure.cosmos.models.CosmosBulkItemResponse#getStatusCode()}.
* </p>
* To check if the operation had any exception, use {@link CosmosBulkOperationResponse#getException()} to
* get the exception.
*/
@Beta(value = Beta.SinceVersion.V1, warningText =
Beta.PREVIEW_SUBJECT_TO_CHANGE_WARNING)
public <TContext> Flux<CosmosBulkOperationResponse<TContext>> executeBulkOperations(
Flux<CosmosItemOperation> operations,
CosmosBulkExecutionOptions bulkOptions) {
if (bulkOptions == null) {
bulkOptions = new CosmosBulkExecutionOptions();
}

Flux<CosmosItemOperation> operationFlux = operations.flatMap(cosmosItemOperation -> {
Mono<CosmosItemOperation> cosmosItemOperationMono;
if (cosmosItemOperation.getItem() != null) {
ObjectNode objectNode =
EncryptionUtils.getSimpleObjectMapper().valueToTree(cosmosItemOperation.getItem());
assert cosmosItemOperation instanceof ItemBulkOperation;
cosmosItemOperationMono =
this.encryptionProcessor.encryptObjectNode(objectNode).map(encryptedItem -> new ItemBulkOperation<>(
cosmosItemOperation.getOperationType(),
cosmosItemOperation.getId(),
cosmosItemOperation.getPartitionKeyValue(),
((ItemBulkOperation<JsonNode, TContext>) cosmosItemOperation).getRequestOptions(),
encryptedItem,
cosmosItemOperation.getContext()
));
} else {
cosmosItemOperationMono = Mono.just(
new ItemBulkOperation<>(
cosmosItemOperation.getOperationType(),
cosmosItemOperation.getId(),
cosmosItemOperation.getPartitionKeyValue(),
((ItemBulkOperation<JsonNode, TContext>) cosmosItemOperation).getRequestOptions(),
null,
cosmosItemOperation.getContext()
)
);
}
return cosmosItemOperationMono;
});

final CosmosBulkExecutionOptions cosmosBulkExecutionOptions = bulkOptions;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will not work, because encryptionProcessor is not initialized if the encryption does not happen. Encryption happens only when the above flux gets executed. And this header code will get executed before encryption is happening.
This header needs to be set in the flux chain. Please revert this code to its original state.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reverted the code.

Mono<Flux<CosmosItemOperation>> then = operationFlux.then(Mono.defer(() -> {
aayush3011 marked this conversation as resolved.
Show resolved Hide resolved
setRequestHeaders(cosmosBulkExecutionOptions);
return Mono.just(operationFlux);
}));

return then.flatMapMany(cosmosItemOperationFlux -> {
return executeBulkOperationsHelper(operationFlux, cosmosBulkExecutionOptions, false);
});
}

private <TContext> Flux<CosmosBulkOperationResponse<TContext>> executeBulkOperationsHelper(Flux<CosmosItemOperation> operations,
CosmosBulkExecutionOptions bulkOptions,
boolean isRetry) {
return this.container.executeBulkOperations(operations, bulkOptions).flatMap(cosmosBulkOperationResponse -> {

CosmosBulkItemResponse cosmosBulkItemResponse = cosmosBulkOperationResponse.getResponse();
ObjectNode objectNode = this.cosmosBulkItemResponseAccessor.getResourceObject(cosmosBulkItemResponse);

if(objectNode != null) {
Mono<JsonNode> jsonNodeMono = encryptionProcessor.decryptJsonNode(objectNode).flatMap(jsonNode -> {
this.cosmosBulkItemResponseAccessor.setResourceObject(cosmosBulkItemResponse,
(ObjectNode) jsonNode);
int x =1;
aayush3011 marked this conversation as resolved.
Show resolved Hide resolved
// this.cosmosBulkOperationResponseAccessor.setResponse(cosmosBulkOperationResponse,
aayush3011 marked this conversation as resolved.
Show resolved Hide resolved
// cosmosBulkItemResponse);
return Mono.just(jsonNode);
});
return jsonNodeMono.flux().flatMap(jsonNode -> Flux.just((CosmosBulkOperationResponse<TContext>) cosmosBulkOperationResponse));
}
return Flux.just((CosmosBulkOperationResponse<TContext>) cosmosBulkOperationResponse);
aayush3011 marked this conversation as resolved.
Show resolved Hide resolved
});
}


private void setRequestHeaders(CosmosItemRequestOptions requestOptions) {
this.cosmosItemRequestOptionsAccessor.setHeader(requestOptions, Constants.IS_CLIENT_ENCRYPTED_HEADER, "true");
this.cosmosItemRequestOptionsAccessor.setHeader(requestOptions, Constants.INTENDED_COLLECTION_RID_HEADER, this.encryptionProcessor.getContainerRid());
Expand All @@ -1107,6 +1241,11 @@ private void setRequestHeaders(CosmosBatchRequestOptions requestOptions) {
this.cosmosBatchRequestOptionsAccessor.setHeader(requestOptions, Constants.INTENDED_COLLECTION_RID_HEADER, this.encryptionProcessor.getContainerRid());
}

private void setRequestHeaders(CosmosBulkExecutionOptions requestOptions) {
this.cosmosBulkExecutionOptionsAccessor.setHeader(requestOptions, Constants.IS_CLIENT_ENCRYPTED_HEADER, "true");
this.cosmosBulkExecutionOptionsAccessor.setHeader(requestOptions, Constants.INTENDED_COLLECTION_RID_HEADER, this.encryptionProcessor.getContainerRid());
}

boolean isIncorrectContainerRid(CosmosException cosmosException) {
return cosmosException.getStatusCode() == HttpConstants.StatusCodes.BADREQUEST &&
cosmosException.getResponseHeaders().get(HttpConstants.HttpHeaders.SUB_STATUS)
Expand Down
Loading