Skip to content

Commit

Permalink
[Storage] Changes to decryption codepath to allow v2.1 (#37455)
Browse files Browse the repository at this point in the history
  • Loading branch information
vincenttran-msft authored Sep 25, 2024
1 parent 200f57e commit b989e00
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 25 deletions.
23 changes: 13 additions & 10 deletions sdk/storage/azure-storage-blob/azure/storage/blob/_encryption.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@

_ENCRYPTION_PROTOCOL_V1 = '1.0'
_ENCRYPTION_PROTOCOL_V2 = '2.0'
_ENCRYPTION_PROTOCOL_V2_1 = '2.1'
_VALID_ENCRYPTION_PROTOCOLS = [_ENCRYPTION_PROTOCOL_V1, _ENCRYPTION_PROTOCOL_V2, _ENCRYPTION_PROTOCOL_V2_1]
_ENCRYPTION_V2_PROTOCOLS = [_ENCRYPTION_PROTOCOL_V2, _ENCRYPTION_PROTOCOL_V2_1]
_GCM_REGION_DATA_LENGTH = 4 * 1024 * 1024
_GCM_NONCE_LENGTH = 12
_GCM_TAG_LENGTH = 16
Expand Down Expand Up @@ -293,14 +296,14 @@ def encrypt_data_v2(data: bytes, nonce: int, key: bytes) -> bytes:

def is_encryption_v2(encryption_data: Optional[_EncryptionData]) -> bool:
"""
Determine whether the given encryption data signifies version 2.0.
Determine whether the given encryption data signifies version 2.0 or 2.1.
:param Optional[_EncryptionData] encryption_data: The encryption data. Will return False if this is None.
:return: True, if the encryption data indicates encryption V2, false otherwise.
:rtype: bool
"""
# If encryption_data is None, assume no encryption
return bool(encryption_data and (encryption_data.encryption_agent.protocol == _ENCRYPTION_PROTOCOL_V2))
return bool(encryption_data and (encryption_data.encryption_agent.protocol in _ENCRYPTION_V2_PROTOCOLS))


def modify_user_agent_for_encryption(
Expand Down Expand Up @@ -405,7 +408,7 @@ def get_adjusted_download_range_and_offset(
end_offset = 15 - (end % 16)
end += end_offset

elif encryption_data.encryption_agent.protocol == _ENCRYPTION_PROTOCOL_V2:
elif encryption_data.encryption_agent.protocol in _ENCRYPTION_V2_PROTOCOLS:
start_offset, end_offset = 0, end

if encryption_data.encrypted_region_info is None:
Expand Down Expand Up @@ -550,7 +553,7 @@ def _dict_to_encryption_data(encryption_data_dict: Dict[str, Any]) -> _Encryptio
"""
try:
protocol = encryption_data_dict['EncryptionAgent']['Protocol']
if protocol not in [_ENCRYPTION_PROTOCOL_V1, _ENCRYPTION_PROTOCOL_V2]:
if protocol not in _VALID_ENCRYPTION_PROTOCOLS:
raise ValueError("Unsupported encryption version.")
except KeyError as exc:
raise ValueError("Unsupported encryption version.") from exc
Expand Down Expand Up @@ -636,7 +639,7 @@ def _validate_and_unwrap_cek(
# Validate we have the right info for the specified version
if encryption_data.encryption_agent.protocol == _ENCRYPTION_PROTOCOL_V1:
_validate_not_none('content_encryption_IV', encryption_data.content_encryption_IV)
elif encryption_data.encryption_agent.protocol == _ENCRYPTION_PROTOCOL_V2:
elif encryption_data.encryption_agent.protocol in _ENCRYPTION_V2_PROTOCOLS:
_validate_not_none('encrypted_region_info', encryption_data.encrypted_region_info)
else:
raise ValueError('Specified encryption version is not supported.')
Expand All @@ -662,8 +665,8 @@ def _validate_and_unwrap_cek(

# For V2, the version is included with the cek. We need to validate it
# and remove it from the actual cek.
if encryption_data.encryption_agent.protocol == _ENCRYPTION_PROTOCOL_V2:
version_2_bytes = _ENCRYPTION_PROTOCOL_V2.encode().ljust(8, b'\0')
if encryption_data.encryption_agent.protocol in _ENCRYPTION_V2_PROTOCOLS:
version_2_bytes = encryption_data.encryption_agent.protocol.encode().ljust(8, b'\0')
cek_version_bytes = content_encryption_key[:len(version_2_bytes)]
if cek_version_bytes != version_2_bytes:
raise ValueError('The encryption metadata is not valid and may have been modified.')
Expand Down Expand Up @@ -722,7 +725,7 @@ def _decrypt_message(
unpadder = PKCS7(128).unpadder()
decrypted_data = (unpadder.update(decrypted_data) + unpadder.finalize())

elif encryption_data.encryption_agent.protocol == _ENCRYPTION_PROTOCOL_V2:
elif encryption_data.encryption_agent.protocol in _ENCRYPTION_V2_PROTOCOLS:
block_info = encryption_data.encrypted_region_info
if not block_info or not block_info.nonce_length:
raise ValueError("Missing required metadata for decryption.")
Expand Down Expand Up @@ -894,7 +897,7 @@ def decrypt_blob( # pylint: disable=too-many-locals,too-many-statements
raise ValueError('Specified encryption algorithm is not supported.')

version = encryption_data.encryption_agent.protocol
if version not in (_ENCRYPTION_PROTOCOL_V1, _ENCRYPTION_PROTOCOL_V2):
if version not in _VALID_ENCRYPTION_PROTOCOLS:
raise ValueError('Specified encryption version is not supported.')

content_encryption_key = _validate_and_unwrap_cek(encryption_data, key_encryption_key, key_resolver)
Expand Down Expand Up @@ -945,7 +948,7 @@ def decrypt_blob( # pylint: disable=too-many-locals,too-many-statements

return content[start_offset: len(content) - end_offset]

if version == _ENCRYPTION_PROTOCOL_V2:
if version in _ENCRYPTION_V2_PROTOCOLS:
# We assume the content contains only full encryption regions
total_size = len(content)
offset = 0
Expand Down
40 changes: 25 additions & 15 deletions sdk/storage/azure-storage-queue/azure/storage/queue/_encryption.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,20 @@

_ENCRYPTION_PROTOCOL_V1 = '1.0'
_ENCRYPTION_PROTOCOL_V2 = '2.0'
_ENCRYPTION_PROTOCOL_V2_1 = '2.1'
_VALID_ENCRYPTION_PROTOCOLS = [_ENCRYPTION_PROTOCOL_V1, _ENCRYPTION_PROTOCOL_V2, _ENCRYPTION_PROTOCOL_V2_1]
_ENCRYPTION_V2_PROTOCOLS = [_ENCRYPTION_PROTOCOL_V2, _ENCRYPTION_PROTOCOL_V2_1]
_GCM_REGION_DATA_LENGTH = 4 * 1024 * 1024
_GCM_NONCE_LENGTH = 12
_GCM_TAG_LENGTH = 16

_ERROR_OBJECT_INVALID = \
'{0} does not define a complete interface. Value of {1} is either missing or invalid.'

_ERROR_UNSUPPORTED_METHOD_FOR_ENCRYPTION = (
'The require_encryption flag is set, but encryption is not supported'
' for this method.')


class KeyEncryptionKey(Protocol):

Expand Down Expand Up @@ -289,14 +296,14 @@ def encrypt_data_v2(data: bytes, nonce: int, key: bytes) -> bytes:

def is_encryption_v2(encryption_data: Optional[_EncryptionData]) -> bool:
"""
Determine whether the given encryption data signifies version 2.0.
Determine whether the given encryption data signifies version 2.0 or 2.1.
:param Optional[_EncryptionData] encryption_data: The encryption data. Will return False if this is None.
:return: True, if the encryption data indicates encryption V2, false otherwise.
:rtype: bool
"""
# If encryption_data is None, assume no encryption
return bool(encryption_data and (encryption_data.encryption_agent.protocol == _ENCRYPTION_PROTOCOL_V2))
return bool(encryption_data and (encryption_data.encryption_agent.protocol in _ENCRYPTION_V2_PROTOCOLS))


def modify_user_agent_for_encryption(
Expand Down Expand Up @@ -357,7 +364,7 @@ def get_adjusted_upload_size(length: int, encryption_version: str) -> int:
def get_adjusted_download_range_and_offset(
start: int,
end: int,
length: int,
length: Optional[int],
encryption_data: Optional[_EncryptionData]) -> Tuple[Tuple[int, int], Tuple[int, int]]:
"""
Gets the new download range and offsets into the decrypted data for
Expand All @@ -374,7 +381,7 @@ def get_adjusted_download_range_and_offset(
:param int start: The user-requested start index.
:param int end: The user-requested end index.
:param int length: The user-requested length. Only used for V1.
:param Optional[int] length: The user-requested length. Only used for V1.
:param Optional[_EncryptionData] encryption_data: The encryption data to determine version and sizes.
:return: (new start, new end), (start offset, end offset)
:rtype: Tuple[Tuple[int, int], Tuple[int, int]]
Expand All @@ -401,7 +408,7 @@ def get_adjusted_download_range_and_offset(
end_offset = 15 - (end % 16)
end += end_offset

elif encryption_data.encryption_agent.protocol == _ENCRYPTION_PROTOCOL_V2:
elif encryption_data.encryption_agent.protocol in _ENCRYPTION_V2_PROTOCOLS:
start_offset, end_offset = 0, end

if encryption_data.encrypted_region_info is None:
Expand Down Expand Up @@ -451,17 +458,20 @@ def parse_encryption_data(metadata: Dict[str, Any]) -> Optional[_EncryptionData]
return None


def adjust_blob_size_for_encryption(size: int, encryption_data: _EncryptionData) -> int:
def adjust_blob_size_for_encryption(size: int, encryption_data: Optional[_EncryptionData]) -> int:
"""
Adjusts the given blob size for encryption by subtracting the size of
the encryption data (nonce + tag). This only has an affect for encryption V2.
:param int size: The original blob size.
:param _EncryptionData encryption_data: The encryption data to determine version and sizes.
:param Optional[_EncryptionData] encryption_data: The encryption data to determine version and sizes.
:return: The new blob size.
:rtype: int
"""
if is_encryption_v2(encryption_data) and encryption_data.encrypted_region_info is not None:
if (encryption_data is not None and
encryption_data.encrypted_region_info is not None and
is_encryption_v2(encryption_data)):

nonce_length = encryption_data.encrypted_region_info.nonce_length
data_length = encryption_data.encrypted_region_info.data_length
tag_length = encryption_data.encrypted_region_info.tag_length
Expand Down Expand Up @@ -543,7 +553,7 @@ def _dict_to_encryption_data(encryption_data_dict: Dict[str, Any]) -> _Encryptio
"""
try:
protocol = encryption_data_dict['EncryptionAgent']['Protocol']
if protocol not in [_ENCRYPTION_PROTOCOL_V1, _ENCRYPTION_PROTOCOL_V2]:
if protocol not in _VALID_ENCRYPTION_PROTOCOLS:
raise ValueError("Unsupported encryption version.")
except KeyError as exc:
raise ValueError("Unsupported encryption version.") from exc
Expand Down Expand Up @@ -629,7 +639,7 @@ def _validate_and_unwrap_cek(
# Validate we have the right info for the specified version
if encryption_data.encryption_agent.protocol == _ENCRYPTION_PROTOCOL_V1:
_validate_not_none('content_encryption_IV', encryption_data.content_encryption_IV)
elif encryption_data.encryption_agent.protocol == _ENCRYPTION_PROTOCOL_V2:
elif encryption_data.encryption_agent.protocol in _ENCRYPTION_V2_PROTOCOLS:
_validate_not_none('encrypted_region_info', encryption_data.encrypted_region_info)
else:
raise ValueError('Specified encryption version is not supported.')
Expand All @@ -655,8 +665,8 @@ def _validate_and_unwrap_cek(

# For V2, the version is included with the cek. We need to validate it
# and remove it from the actual cek.
if encryption_data.encryption_agent.protocol == _ENCRYPTION_PROTOCOL_V2:
version_2_bytes = _ENCRYPTION_PROTOCOL_V2.encode().ljust(8, b'\0')
if encryption_data.encryption_agent.protocol in _ENCRYPTION_V2_PROTOCOLS:
version_2_bytes = encryption_data.encryption_agent.protocol.encode().ljust(8, b'\0')
cek_version_bytes = content_encryption_key[:len(version_2_bytes)]
if cek_version_bytes != version_2_bytes:
raise ValueError('The encryption metadata is not valid and may have been modified.')
Expand Down Expand Up @@ -715,7 +725,7 @@ def _decrypt_message(
unpadder = PKCS7(128).unpadder()
decrypted_data = (unpadder.update(decrypted_data) + unpadder.finalize())

elif encryption_data.encryption_agent.protocol == _ENCRYPTION_PROTOCOL_V2:
elif encryption_data.encryption_agent.protocol in _ENCRYPTION_V2_PROTOCOLS:
block_info = encryption_data.encrypted_region_info
if not block_info or not block_info.nonce_length:
raise ValueError("Missing required metadata for decryption.")
Expand Down Expand Up @@ -887,7 +897,7 @@ def decrypt_blob( # pylint: disable=too-many-locals,too-many-statements
raise ValueError('Specified encryption algorithm is not supported.')

version = encryption_data.encryption_agent.protocol
if version not in (_ENCRYPTION_PROTOCOL_V1, _ENCRYPTION_PROTOCOL_V2):
if version not in _VALID_ENCRYPTION_PROTOCOLS:
raise ValueError('Specified encryption version is not supported.')

content_encryption_key = _validate_and_unwrap_cek(encryption_data, key_encryption_key, key_resolver)
Expand Down Expand Up @@ -938,7 +948,7 @@ def decrypt_blob( # pylint: disable=too-many-locals,too-many-statements

return content[start_offset: len(content) - end_offset]

if version == _ENCRYPTION_PROTOCOL_V2:
if version in _ENCRYPTION_V2_PROTOCOLS:
# We assume the content contains only full encryption regions
total_size = len(content)
offset = 0
Expand Down

0 comments on commit b989e00

Please sign in to comment.