Skip to content

Commit

Permalink
Patch Api for Encryption (#25195)
Browse files Browse the repository at this point in the history
* Patch Api for Encryption

* Patch Api for Encryption

* Patch Api for Encryption

* Patch Api for Encryption

* Encryption Patch

* Encryption Patch

* Encryption Patch

* Encryption Patch

* Encryption Patch

Co-authored-by: Aayush Kataria <[email protected]>
  • Loading branch information
aayush3011 and Aayush Kataria authored Nov 12, 2021
1 parent 09206e3 commit b03f3da
Show file tree
Hide file tree
Showing 6 changed files with 309 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@
import com.azure.cosmos.models.PartitionKey;
import com.azure.cosmos.models.SqlParameter;
import com.azure.cosmos.models.SqlQuerySpec;
import com.azure.cosmos.models.CosmosPatchOperations;
import com.azure.cosmos.implementation.patch.PatchOperation;
import com.azure.cosmos.implementation.patch.PatchOperationCore;
import com.azure.cosmos.implementation.patch.PatchOperationType;
import com.azure.cosmos.models.CosmosPatchItemRequestOptions;
import com.azure.cosmos.util.CosmosPagedFlux;
import com.azure.cosmos.util.UtilBridgeInternal;
import com.fasterxml.jackson.databind.JsonNode;
Expand Down Expand Up @@ -72,6 +77,7 @@ public class CosmosEncryptionAsyncContainer {
ImplementationBridgeHelpers.CosmosBatchResponseHelper.CosmosBatchResponseAccessor cosmosBatchResponseAccessor;
ImplementationBridgeHelpers.CosmosBatchOperationResultHelper.CosmosBatchOperationResultAccessor cosmosBatchOperationResultAccessor;
ImplementationBridgeHelpers.CosmosBatchRequestOptionsHelper.CosmosBatchRequestOptionsAccessor cosmosBatchRequestOptionsAccessor;
ImplementationBridgeHelpers.CosmosPatchOperationsHelper.CosmosPatchOperationsAccessor cosmosPatchOperationsAccessor;

CosmosEncryptionAsyncContainer(CosmosAsyncContainer container,
CosmosEncryptionAsyncClient cosmosEncryptionAsyncClient) {
Expand All @@ -93,6 +99,7 @@ public class CosmosEncryptionAsyncContainer {
this.cosmosBatchResponseAccessor = ImplementationBridgeHelpers.CosmosBatchResponseHelper.getCosmosBatchResponseAccessor();
this.cosmosBatchOperationResultAccessor = ImplementationBridgeHelpers.CosmosBatchOperationResultHelper.getCosmosBatchOperationResultAccessor();
this.cosmosBatchRequestOptionsAccessor = ImplementationBridgeHelpers.CosmosBatchRequestOptionsHelper.getCosmosBatchRequestOptionsAccessor();
this.cosmosPatchOperationsAccessor = ImplementationBridgeHelpers.CosmosPatchOperationsHelper.getCosmosPatchOperationsAccessor();
}

EncryptionProcessor getEncryptionProcessor() {
Expand Down Expand Up @@ -530,6 +537,105 @@ public <T> CosmosPagedFlux<T> queryChangeFeed(CosmosChangeFeedRequestOptions opt
return queryChangeFeedHelper(options, classType,false);
}

/**
* Run patch operations on an Item.
* <p>
* After subscription the operation will be performed.
* The {@link Mono} upon successful completion will contain a single Cosmos item response with the patched item.
*
* @param <T> the type parameter.
* @param itemId the item id.
* @param partitionKey the partition key.
* @param cosmosPatchOperations Represents a container having list of operations to be sequentially applied to the referred Cosmos item.
* @param options the request options.
* @param itemType the item type.
*
* @return an {@link Mono} containing the Cosmos item resource response with the patched item or an error.
*/
public <T> Mono<CosmosItemResponse<T>> patchItem(
String itemId,
PartitionKey partitionKey,
CosmosPatchOperations cosmosPatchOperations,
CosmosPatchItemRequestOptions options,
Class<T> itemType) {

checkNotNull(itemId, "expected non-null itemId");
checkNotNull(partitionKey, "expected non-null partitionKey for patchItem");
checkNotNull(cosmosPatchOperations, "expected non-null cosmosPatchOperations");

if (options == null) {
options = new CosmosPatchItemRequestOptions();
}

return patchItemHelper(itemId, partitionKey, cosmosPatchOperations, options, itemType);
}

private <T> Mono<CosmosItemResponse<T>> patchItemHelper(String itemId,
PartitionKey partitionKey,
CosmosPatchOperations cosmosPatchOperations,
CosmosPatchItemRequestOptions options,
Class<T> itemType) {
this.setRequestHeaders(options);
List<Mono<PatchOperation>> monoList = new ArrayList<>();
for (PatchOperation patchOperation : this.cosmosPatchOperationsAccessor.getPatchOperations(cosmosPatchOperations)) {
Mono<PatchOperation> itemPatchOperationMono = null;
if (patchOperation.getOperationType() == PatchOperationType.REMOVE) {
itemPatchOperationMono = Mono.just(patchOperation);
}
else if (patchOperation.getOperationType() == PatchOperationType.INCREMENT) {
throw new IllegalArgumentException("Increment patch operation is not allowed for encrypted path");
}
else if (patchOperation instanceof PatchOperationCore) {
JsonNode objectNode = EncryptionUtils.getSimpleObjectMapper().valueToTree(((PatchOperationCore)patchOperation).getResource());
itemPatchOperationMono =
encryptionProcessor.encryptPatchNode(objectNode, ((PatchOperationCore)patchOperation).getPath()).map(encryptedObjectNode -> {
return new PatchOperationCore<>(
patchOperation.getOperationType(),
((PatchOperationCore)patchOperation).getPath(),
encryptedObjectNode
);
});
}
monoList.add(itemPatchOperationMono);
}
Mono<List<PatchOperation>> encryptedPatchOperationsListMono =
Flux.mergeSequential(monoList).collectList();
CosmosPatchItemRequestOptions finalRequestOptions = options;

CosmosPatchOperations encryptedCosmosPatchOperations = CosmosPatchOperations.create();

return encryptedPatchOperationsListMono.flatMap(patchOperations -> {
this.cosmosPatchOperationsAccessor.getPatchOperations(encryptedCosmosPatchOperations).addAll(patchOperations);
return patchItemInternalHelper(itemId, partitionKey, encryptedCosmosPatchOperations, finalRequestOptions,itemType, false);
});
}

@SuppressWarnings("unchecked") // Casting cosmosItemResponse to CosmosItemResponse<byte[]> from CosmosItemResponse<T>
private <T> Mono<CosmosItemResponse<T>> patchItemInternalHelper(String itemId,
PartitionKey partitionKey,
CosmosPatchOperations encryptedCosmosPatchOperations,
CosmosPatchItemRequestOptions requestOptions,
Class<T> itemType,
boolean isRetry) {

setRequestHeaders(requestOptions);
return this.container.patchItem(itemId, partitionKey, encryptedCosmosPatchOperations, requestOptions, itemType).publishOn(encryptionScheduler).
flatMap(cosmosItemResponse -> setByteArrayContent((CosmosItemResponse<byte[]>) cosmosItemResponse,
this.encryptionProcessor.decrypt(this.cosmosItemResponseBuilderAccessor.getByteArrayContent((CosmosItemResponse<byte[]>) cosmosItemResponse)))
.map(bytes -> this.responseFactory.createItemResponse((CosmosItemResponse<byte[]>) cosmosItemResponse,
itemType))).onErrorResume(exception -> {
if (!isRetry && exception instanceof CosmosException) {
final CosmosException cosmosException = (CosmosException) exception;
if (isIncorrectContainerRid(cosmosException)) {
this.encryptionProcessor.getIsEncryptionSettingsInitDone().set(false);
return this.encryptionProcessor.initializeEncryptionSettingsAsync(true).then
(Mono.defer(() -> patchItemInternalHelper(itemId, partitionKey, encryptedCosmosPatchOperations, requestOptions, itemType, true)));
}
}
return Mono.error(exception);
});
}

/**
* Get the CosmosEncryptionAsyncClient
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,11 +242,51 @@ public Mono<byte[]> encrypt(byte[] payload) {
return encrypt(itemJObj);
}

public Mono<byte[]> encrypt(ObjectNode itemJObj) {
public Mono<byte[]> encrypt(JsonNode itemJObj) {
return encryptObjectNode(itemJObj).map(encryptedObjectNode -> EncryptionUtils.serializeJsonToByteArray(EncryptionUtils.getSimpleObjectMapper(), encryptedObjectNode));
}

public Mono<ObjectNode> encryptObjectNode(ObjectNode itemJObj) {
public Mono<JsonNode> encryptPatchNode(JsonNode itemObj, String patchPropertyPath) {
assert (itemObj != null);
return initEncryptionSettingsIfNotInitializedAsync().then(Mono.defer(() -> {
for (ClientEncryptionIncludedPath includedPath : this.clientEncryptionPolicy.getIncludedPaths()) {
if (StringUtils.isEmpty(includedPath.getPath()) || includedPath.getPath().charAt(0) != '/' || includedPath.getPath().lastIndexOf('/') != 0) {
return Mono.error(new IllegalArgumentException("Invalid encryption path: " + includedPath.getPath()));
}
}

for (ClientEncryptionIncludedPath includedPath : this.clientEncryptionPolicy.getIncludedPaths()) {
String propertyName = includedPath.getPath().substring(1);
if (patchPropertyPath.substring(1).equals(propertyName)) {
if (itemObj.isValueNode()) {
return this.encryptionSettings.getEncryptionSettingForPropertyAsync(propertyName,
this).flatMap(settings -> {
try {
return Mono.just(EncryptionUtils.getSimpleObjectMapper().readTree(EncryptionUtils.getSimpleObjectMapper()
.writeValueAsString(encryptAndSerializeValue(settings,
null, itemObj, propertyName))));
} catch (MicrosoftDataEncryptionException | JsonProcessingException ex) {
return Mono.error(ex);
}
});
} else {
return this.encryptionSettings.getEncryptionSettingForPropertyAsync(propertyName,
this).flatMap(settings -> {
try {
return Mono.just(encryptAndSerializePatchProperty(settings,
itemObj, propertyName));
} catch (MicrosoftDataEncryptionException | JsonProcessingException ex) {
return Mono.error(ex);
}
});
}
}
}
return Mono.empty();
}));
}

public Mono<JsonNode> encryptObjectNode(JsonNode itemJObj) {
assert (itemJObj != null);
return initEncryptionSettingsIfNotInitializedAsync().then(Mono.defer(() -> {
for (ClientEncryptionIncludedPath includedPath : this.clientEncryptionPolicy.getIncludedPaths()) {
Expand All @@ -265,7 +305,7 @@ public Mono<ObjectNode> encryptObjectNode(ObjectNode itemJObj) {
this).flatMap(settings -> {
try {
encryptAndSerializeProperty(settings, itemJObj, propertyValueHolder, propertyName);
} catch (MicrosoftDataEncryptionException ex) {
} catch (MicrosoftDataEncryptionException | JsonProcessingException ex) {
return Mono.error(ex);
}
return Mono.empty();
Expand All @@ -278,8 +318,73 @@ public Mono<ObjectNode> encryptObjectNode(ObjectNode itemJObj) {
}));
}

public void encryptAndSerializeProperty(EncryptionSettings encryptionSettings, ObjectNode objectNode,
JsonNode propertyValueHolder, String propertyName) throws MicrosoftDataEncryptionException {
@SuppressWarnings("unchecked")
public JsonNode encryptAndSerializePatchProperty(EncryptionSettings encryptionSettings,
JsonNode propertyValueHolder, String propertyName) throws MicrosoftDataEncryptionException, JsonProcessingException {
if (propertyValueHolder.isObject()) {
for (Iterator<Map.Entry<String, JsonNode>> it = propertyValueHolder.fields(); it.hasNext(); ) {
Map.Entry<String, JsonNode> child = it.next();
if (child.getValue().isObject() || child.getValue().isArray()) {
JsonNode encryptedValue = encryptAndSerializePatchProperty(encryptionSettings, child.getValue(), child.getKey());
assert propertyValueHolder instanceof ObjectNode;
((ObjectNode) propertyValueHolder).put(child.getKey(), encryptedValue);
} else if (!child.getValue().isNull()){
assert propertyValueHolder instanceof ObjectNode;
encryptAndSerializeValue(encryptionSettings, (ObjectNode) propertyValueHolder, child.getValue(),
child.getKey());
}
}
}

else if (propertyValueHolder.isArray()) {
assert propertyValueHolder instanceof ArrayNode;
ArrayNode arrayNode = (ArrayNode) propertyValueHolder;
if (arrayNode.elements().next().isObject() || arrayNode.elements().next().isArray()) {
List<JsonNode> encryptedArray = new ArrayList<>();
for (Iterator<JsonNode> arrayIterator = arrayNode.elements(); arrayIterator.hasNext(); ) {
JsonNode nodeInArray = arrayIterator.next();
if (nodeInArray.isArray()) {
encryptedArray.add(encryptAndSerializePatchProperty(encryptionSettings, nodeInArray, propertyName));
} else {
for (Iterator<Map.Entry<String, JsonNode>> it = nodeInArray.fields(); it.hasNext(); ) {
Map.Entry<String, JsonNode> child = it.next();
if (child.getValue().isObject() || child.getValue().isArray()) {
JsonNode encryptedValue = encryptAndSerializePatchProperty(encryptionSettings,
child.getValue(), child.getKey());
((ObjectNode) nodeInArray).put(child.getKey(), encryptedValue);

} else if (!child.getValue().isNull()) {
encryptAndSerializeValue(encryptionSettings, (ObjectNode) nodeInArray, child.getValue(),
child.getKey());
}
}
encryptedArray.add(nodeInArray);
}
}
arrayNode.removeAll();
for (JsonNode encryptedValue : encryptedArray) {
arrayNode.add(encryptedValue);
}
} else {
List<byte[]> encryptedArray = new ArrayList<>();
for (Iterator<JsonNode> it = arrayNode.elements(); it.hasNext(); ) {
encryptedArray.add(encryptAndSerializeValue(encryptionSettings, null, it.next(),
StringUtils.EMPTY));
}
arrayNode.removeAll();
for (byte[] encryptedValue : encryptedArray) {
arrayNode.add(encryptedValue);
}
}
return arrayNode;
} else {
encryptAndSerializeValue(encryptionSettings, null, propertyValueHolder, propertyName);
}
return propertyValueHolder;
}

public void encryptAndSerializeProperty(EncryptionSettings encryptionSettings, JsonNode objectNode,
JsonNode propertyValueHolder, String propertyName) throws MicrosoftDataEncryptionException, JsonProcessingException {

if (propertyValueHolder.isObject()) {
for (Iterator<Map.Entry<String, JsonNode>> it = propertyValueHolder.fields(); it.hasNext(); ) {
Expand Down Expand Up @@ -325,7 +430,7 @@ public void encryptAndSerializeProperty(EncryptionSettings encryptionSettings, O
}
}
} else {
encryptAndSerializeValue(encryptionSettings, objectNode, propertyValueHolder, propertyName);
encryptAndSerializeValue(encryptionSettings, (ObjectNode) objectNode, propertyValueHolder, propertyName);
}
}

Expand Down Expand Up @@ -458,7 +563,6 @@ public JsonNode decryptAndSerializeValue(EncryptionSettings encryptionSettings,
JsonNode propertyValueHolder, String propertyName) throws MicrosoftDataEncryptionException, IOException {
byte[] cipherText;
byte[] cipherTextWithTypeMarker;

cipherTextWithTypeMarker = propertyValueHolder.binaryValue();
cipherText = new byte[cipherTextWithTypeMarker.length - 1];
System.arraycopy(cipherTextWithTypeMarker, 1, cipherText, 0,
Expand Down
Loading

0 comments on commit b03f3da

Please sign in to comment.