From 72448e68e69caa997906c0f764a4467dc2b389e7 Mon Sep 17 00:00:00 2001 From: Ilya Taratukhin Date: Fri, 9 Aug 2024 12:15:07 +0200 Subject: [PATCH 1/3] refactor: move deserializer from ApiClient to static class --- fingerprint_pro_server_api_sdk/api_client.py | 133 ++++++++++--------- template/api_client.mustache | 133 ++++++++++--------- 2 files changed, 142 insertions(+), 124 deletions(-) diff --git a/fingerprint_pro_server_api_sdk/api_client.py b/fingerprint_pro_server_api_sdk/api_client.py index d5a85495..f97a3a12 100644 --- a/fingerprint_pro_server_api_sdk/api_client.py +++ b/fingerprint_pro_server_api_sdk/api_client.py @@ -25,6 +25,18 @@ from fingerprint_pro_server_api_sdk.rest import ApiException, RESTResponse from fingerprint_pro_server_api_sdk.base_model import BaseModel +PRIMITIVE_TYPES = (float, bool, bytes, str, int) +NATIVE_TYPES_MAPPING = { + 'int': int, + 'long': int, + 'float': float, + 'str': str, + 'bool': bool, + 'date': date, + 'datetime': datetime, + 'object': object, +} + class ApiClient: """Generic API client for Swagger client library builds. @@ -46,18 +58,6 @@ class ApiClient: to the API """ - PRIMITIVE_TYPES = (float, bool, bytes, str, int) - NATIVE_TYPES_MAPPING = { - 'int': int, - 'long': int, - 'float': float, - 'str': str, - 'bool': bool, - 'date': date, - 'datetime': datetime, - 'object': object, - } - def __init__(self, configuration: Optional[Configuration] = None, header_name: Optional[str] = None, header_value: Optional[str] = None, cookie: Optional[str] = None, pool: Optional[Pool] = None): if configuration is None: @@ -197,7 +197,7 @@ def sanitize_for_serialization(self, obj: Union[Dict[str, Any], List[Tuple[str, """ if obj is None: return None - elif isinstance(obj, self.PRIMITIVE_TYPES): + elif isinstance(obj, PRIMITIVE_TYPES): return obj elif isinstance(obj, list): return [self.sanitize_for_serialization(sub_obj) @@ -251,46 +251,7 @@ def deserialize(self, response: Union[RESTResponse, ApiException], response_type except ValueError: data = response.data - return self.__deserialize(data, response_type) - - def __deserialize(self, data: Union[Dict, List, str], klass: Any): - """Deserializes dict, list, str into an object. - - :param data: dict, list or str. - :param klass: class literal, or string of class name. - - :return: object. - """ - if data is None: - return None - - if type(klass) == str: - if klass.startswith('list['): - sub_kls = re.match(r'list\[(.*)\]', klass).group(1) - return [self.__deserialize(sub_data, sub_kls) - for sub_data in data] - - if klass.startswith('dict('): - sub_kls = re.match(r'dict\(([^,]*), (.*)\)', klass).group(2) - return {k: self.__deserialize(v, sub_kls) - for k, v in data.items()} - - # convert str to class - if klass in self.NATIVE_TYPES_MAPPING: - klass = self.NATIVE_TYPES_MAPPING[klass] - else: - klass = getattr(fingerprint_pro_server_api_sdk.models, klass) - - if klass in self.PRIMITIVE_TYPES: - return self.__deserialize_primitive(data, klass) - elif klass == object: - return data - elif klass == date: - return self.__deserialize_date(data) - elif klass == datetime: - return self.__deserialize_datatime(data) - else: - return self.__deserialize_model(data, klass) + return ApiClientDeserializer.deserialize(data, response_type) def call_api(self, resource_path: str, method: str, path_params: Optional[Dict[str, Any]] = None, query_params: Optional[List[Tuple[str, Any]]] = None, header_params: Optional[Dict[str, Any]] = None, @@ -515,7 +476,51 @@ def __deserialize_file(self, response): f.write(response_data) return path - def __deserialize_primitive(self, data, klass): + +class ApiClientDeserializer: + """Deserializes server response into appropriate type.""" + @staticmethod + def deserialize(data: Union[Dict, List, str], klass: Any): + """Deserializes dict, list, str into an object. + + :param data: dict, list or str. + :param klass: class literal, or string of class name. + + :return: object. + """ + if data is None: + return None + + if type(klass) == str: + if klass.startswith('list['): + sub_kls = re.match(r'list\[(.*)\]', klass).group(1) + return [ApiClientDeserializer.deserialize(sub_data, sub_kls) + for sub_data in data] + + if klass.startswith('dict('): + sub_kls = re.match(r'dict\(([^,]*), (.*)\)', klass).group(2) + return {k: ApiClientDeserializer.deserialize(v, sub_kls) + for k, v in data.items()} + + # convert str to class + if klass in NATIVE_TYPES_MAPPING: + klass = NATIVE_TYPES_MAPPING[klass] + else: + klass = getattr(fingerprint_pro_server_api_sdk.models, klass) + + if klass in PRIMITIVE_TYPES: + return ApiClientDeserializer.__deserialize_primitive(data, klass) + elif klass == object: + return data + elif klass == date: + return ApiClientDeserializer.__deserialize_date(data) + elif klass == datetime: + return ApiClientDeserializer.__deserialize_datatime(data) + else: + return ApiClientDeserializer.__deserialize_model(data, klass) + + @staticmethod + def __deserialize_primitive(data, klass): """Deserializes string to primitive type. :param data: str. @@ -530,7 +535,8 @@ def __deserialize_primitive(self, data, klass): except TypeError: return data - def __deserialize_date(self, string: str) -> date: + @staticmethod + def __deserialize_date(string: str) -> date: """Deserializes string to date. :param string: str. @@ -546,7 +552,8 @@ def __deserialize_date(self, string: str) -> date: reason="Failed to parse `{0}` as date object".format(string) ) - def __deserialize_datatime(self, string: str) -> datetime: + @staticmethod + def __deserialize_datatime(string: str) -> datetime: """Deserializes string to datetime. The string should be in iso8601 datetime format. @@ -567,10 +574,12 @@ def __deserialize_datatime(self, string: str) -> datetime: ) ) - def __hasattr(self, object, name): + @staticmethod + def __hasattr(object, name): return name in object.__class__.__dict__ - def __deserialize_model(self, data, klass): + @staticmethod + def __deserialize_model(data, klass): """Deserializes list or dict to model. :param data: dict, list. @@ -578,7 +587,7 @@ def __deserialize_model(self, data, klass): :return: model object. """ - if not klass.swagger_types and not self.__hasattr(klass, 'get_real_child_model'): + if not klass.swagger_types and not ApiClientDeserializer.__hasattr(klass, 'get_real_child_model'): return data kwargs = {} @@ -588,7 +597,7 @@ def __deserialize_model(self, data, klass): klass.attribute_map[attr] in data and isinstance(data, (list, dict))): value = data[klass.attribute_map[attr]] - kwargs[attr] = self.__deserialize(value, attr_type) + kwargs[attr] = ApiClientDeserializer.deserialize(value, attr_type) instance = klass(**kwargs) @@ -598,8 +607,8 @@ def __deserialize_model(self, data, klass): for key, value in data.items(): if key not in klass.swagger_types: instance[key] = value - if self.__hasattr(instance, 'get_real_child_model'): + if ApiClientDeserializer.__hasattr(instance, 'get_real_child_model'): klass_name = instance.get_real_child_model(data) if klass_name: - instance = self.__deserialize(data, klass_name) + instance = ApiClientDeserializer.deserialize(data, klass_name) return instance diff --git a/template/api_client.mustache b/template/api_client.mustache index 29557724..b15f0c65 100644 --- a/template/api_client.mustache +++ b/template/api_client.mustache @@ -17,6 +17,18 @@ from {{packageName}} import rest from {{packageName}}.rest import ApiException, RESTResponse from {{packageName}}.base_model import BaseModel +PRIMITIVE_TYPES = (float, bool, bytes, str, int) +NATIVE_TYPES_MAPPING = { + 'int': int, + 'long': int, + 'float': float, + 'str': str, + 'bool': bool, + 'date': date, + 'datetime': datetime, + 'object': object, +} + class ApiClient: """Generic API client for Swagger client library builds. @@ -38,18 +50,6 @@ class ApiClient: to the API """ - PRIMITIVE_TYPES = (float, bool, bytes, str, int) - NATIVE_TYPES_MAPPING = { - 'int': int, - 'long': int, - 'float': float, - 'str': str, - 'bool': bool, - 'date': date, - 'datetime': datetime, - 'object': object, - } - def __init__(self, configuration: Optional[Configuration] = None, header_name: Optional[str] = None, header_value: Optional[str] = None, cookie: Optional[str] = None, pool: Optional[Pool] = None): if configuration is None: @@ -189,7 +189,7 @@ class ApiClient: """ if obj is None: return None - elif isinstance(obj, self.PRIMITIVE_TYPES): + elif isinstance(obj, PRIMITIVE_TYPES): return obj elif isinstance(obj, list): return [self.sanitize_for_serialization(sub_obj) @@ -243,46 +243,7 @@ class ApiClient: except ValueError: data = response.data - return self.__deserialize(data, response_type) - - def __deserialize(self, data: Union[Dict, List, str], klass: Any): - """Deserializes dict, list, str into an object. - - :param data: dict, list or str. - :param klass: class literal, or string of class name. - - :return: object. - """ - if data is None: - return None - - if type(klass) == str: - if klass.startswith('list['): - sub_kls = re.match(r'list\[(.*)\]', klass).group(1) - return [self.__deserialize(sub_data, sub_kls) - for sub_data in data] - - if klass.startswith('dict('): - sub_kls = re.match(r'dict\(([^,]*), (.*)\)', klass).group(2) - return {k: self.__deserialize(v, sub_kls) - for k, v in data.items()} - - # convert str to class - if klass in self.NATIVE_TYPES_MAPPING: - klass = self.NATIVE_TYPES_MAPPING[klass] - else: - klass = getattr({{modelPackage}}, klass) - - if klass in self.PRIMITIVE_TYPES: - return self.__deserialize_primitive(data, klass) - elif klass == object: - return data - elif klass == date: - return self.__deserialize_date(data) - elif klass == datetime: - return self.__deserialize_datatime(data) - else: - return self.__deserialize_model(data, klass) + return ApiClientDeserializer.deserialize(data, response_type) def call_api(self, resource_path: str, method: str, path_params: Optional[Dict[str, Any]] = None, query_params: Optional[List[Tuple[str, Any]]] = None, header_params: Optional[Dict[str, Any]] = None, @@ -507,7 +468,51 @@ class ApiClient: f.write(response_data) return path - def __deserialize_primitive(self, data, klass): + +class ApiClientDeserializer: + """Deserializes server response into appropriate type.""" + @staticmethod + def deserialize(data: Union[Dict, List, str], klass: Any): + """Deserializes dict, list, str into an object. + + :param data: dict, list or str. + :param klass: class literal, or string of class name. + + :return: object. + """ + if data is None: + return None + + if type(klass) == str: + if klass.startswith('list['): + sub_kls = re.match(r'list\[(.*)\]', klass).group(1) + return [ApiClientDeserializer.deserialize(sub_data, sub_kls) + for sub_data in data] + + if klass.startswith('dict('): + sub_kls = re.match(r'dict\(([^,]*), (.*)\)', klass).group(2) + return {k: ApiClientDeserializer.deserialize(v, sub_kls) + for k, v in data.items()} + + # convert str to class + if klass in NATIVE_TYPES_MAPPING: + klass = NATIVE_TYPES_MAPPING[klass] + else: + klass = getattr({{modelPackage}}, klass) + + if klass in PRIMITIVE_TYPES: + return ApiClientDeserializer.__deserialize_primitive(data, klass) + elif klass == object: + return data + elif klass == date: + return ApiClientDeserializer.__deserialize_date(data) + elif klass == datetime: + return ApiClientDeserializer.__deserialize_datatime(data) + else: + return ApiClientDeserializer.__deserialize_model(data, klass) + + @staticmethod + def __deserialize_primitive(data, klass): """Deserializes string to primitive type. :param data: str. @@ -522,7 +527,8 @@ class ApiClient: except TypeError: return data - def __deserialize_date(self, string: str) -> date: + @staticmethod + def __deserialize_date(string: str) -> date: """Deserializes string to date. :param string: str. @@ -538,7 +544,8 @@ class ApiClient: reason="Failed to parse `{0}` as date object".format(string) ) - def __deserialize_datatime(self, string: str) -> datetime: + @staticmethod + def __deserialize_datatime(string: str) -> datetime: """Deserializes string to datetime. The string should be in iso8601 datetime format. @@ -559,10 +566,12 @@ class ApiClient: ) ) - def __hasattr(self, object, name): + @staticmethod + def __hasattr(object, name): return name in object.__class__.__dict__ - def __deserialize_model(self, data, klass): + @staticmethod + def __deserialize_model(data, klass): """Deserializes list or dict to model. :param data: dict, list. @@ -570,7 +579,7 @@ class ApiClient: :return: model object. """ - if not klass.swagger_types and not self.__hasattr(klass, 'get_real_child_model'): + if not klass.swagger_types and not ApiClientDeserializer.__hasattr(klass, 'get_real_child_model'): return data kwargs = {} @@ -580,7 +589,7 @@ class ApiClient: klass.attribute_map[attr] in data and isinstance(data, (list, dict))): value = data[klass.attribute_map[attr]] - kwargs[attr] = self.__deserialize(value, attr_type) + kwargs[attr] = ApiClientDeserializer.deserialize(value, attr_type) instance = klass(**kwargs) @@ -590,8 +599,8 @@ class ApiClient: for key, value in data.items(): if key not in klass.swagger_types: instance[key] = value - if self.__hasattr(instance, 'get_real_child_model'): + if ApiClientDeserializer.__hasattr(instance, 'get_real_child_model'): klass_name = instance.get_real_child_model(data) if klass_name: - instance = self.__deserialize(data, klass_name) + instance = ApiClientDeserializer.deserialize(data, klass_name) return instance From 1ed2fa3938bc589e76a834375b2779dc71d8c183 Mon Sep 17 00:00:00 2001 From: Ilya Taratukhin Date: Fri, 9 Aug 2024 12:16:01 +0200 Subject: [PATCH 2/3] chore: fix forgotten template for `setup.sh` --- template/setup.mustache | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/template/setup.mustache b/template/setup.mustache index ad748fee..74130502 100644 --- a/template/setup.mustache +++ b/template/setup.mustache @@ -46,7 +46,11 @@ setup( license_files=["LICENSE"], author="Fingerprint", author_email="{{infoEmail}}", - url="{{packageUrl}}", + project_urls={ + "Changelog": "https://github.com/fingerprintjs/fingerprint-pro-server-api-python-sdk/blob/main/CHANGELOG.md", + "Code": "https://github.com/fingerprintjs/fingerprint-pro-server-api-python-sdk", + "Issue Tracker": "https://github.com/fingerprintjs/fingerprint-pro-server-api-python-sdk/issues", + }, keywords=["Swagger", "{{appName}}", "browser", "detection", "fingerprint", "identification", "fingerprinting", "browser-fingerprinting", "browser-fingerprint", "fraud-detection", "fraud", "audio-fingerprinting", "fingerprintjs", "fingerprintjs-pro", "visitor-identifier"], From a7a0e2d3c0c42fb225545673954ee816917d3124 Mon Sep 17 00:00:00 2001 From: Ilya Taratukhin Date: Fri, 9 Aug 2024 12:28:58 +0200 Subject: [PATCH 3/3] fix: `unseal_event_response` returns correct `EventResponse` structure BREAKING CHANGE: rename `unseal_events_response` to `unseal_event_response` to keep proper naming --- README.md | 7 ++-- docs/SealedResults.md | 4 +- fingerprint_pro_server_api_sdk/__init__.py | 2 + fingerprint_pro_server_api_sdk/sealed.py | 44 +++++++++++++++------- sealed_results_example.py | 7 ++-- template/README.mustache | 7 ++-- template/__init__package.mustache | 2 + test/test_sealed.py | 43 +++++++++++---------- 8 files changed, 68 insertions(+), 48 deletions(-) diff --git a/README.md b/README.md index 2366ca7a..0da0b992 100644 --- a/README.md +++ b/README.md @@ -182,8 +182,7 @@ import os from dotenv import load_dotenv -from fingerprint_pro_server_api_sdk import EventResponse -from fingerprint_pro_server_api_sdk.sealed import unseal_events_response, DecryptionKey, DecryptionAlgorithm +from fingerprint_pro_server_api_sdk import unseal_event_response, DecryptionKey, DecryptionAlgorithm load_dotenv() @@ -191,8 +190,8 @@ sealed_result = base64.b64decode(os.environ["BASE64_SEALED_RESULT"]) key = base64.b64decode(os.environ["BASE64_KEY"]) try: - events_response: EventResponse = unseal_events_response(sealed_result, [DecryptionKey(key, DecryptionAlgorithm['Aes256Gcm'])]) - print("\n\n\nEvent response: \n", events_response.products) + event_response = unseal_event_response(sealed_result, [DecryptionKey(key, DecryptionAlgorithm['Aes256Gcm'])]) + print("\n\n\nEvent response: \n", event_response.products) except Exception as e: print("Exception when calling unsealing events response: %s\n" % e) exit(1) diff --git a/docs/SealedResults.md b/docs/SealedResults.md index 469235e6..6455816f 100644 --- a/docs/SealedResults.md +++ b/docs/SealedResults.md @@ -1,7 +1,7 @@ # Sealed results -## **UnsealEventsResponse** -> unseal_events_response(sealed bytes, keys DecryptionKey[]) -> EventResponse +## **UnsealEventResponse** +> unseal_event_response(sealed: bytes, keys: DecryptionKey[]) -> EventResponse Decrypts the sealed response with provided keys. ### Required Parameters diff --git a/fingerprint_pro_server_api_sdk/__init__.py b/fingerprint_pro_server_api_sdk/__init__.py index def5164f..72371d07 100644 --- a/fingerprint_pro_server_api_sdk/__init__.py +++ b/fingerprint_pro_server_api_sdk/__init__.py @@ -113,3 +113,5 @@ from fingerprint_pro_server_api_sdk.models.webhook_visit import WebhookVisit # import custom methods into sdk package from fingerprint_pro_server_api_sdk.webhook import Webhook +from fingerprint_pro_server_api_sdk.sealed import ApiClientDeserializer, DecryptionAlgorithm, DecryptionKey, \ + UnsealError, UnsealAggregateError, unseal_event_response diff --git a/fingerprint_pro_server_api_sdk/sealed.py b/fingerprint_pro_server_api_sdk/sealed.py index a2b50e82..c05bdf6b 100644 --- a/fingerprint_pro_server_api_sdk/sealed.py +++ b/fingerprint_pro_server_api_sdk/sealed.py @@ -1,8 +1,11 @@ import json +from typing import List + from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from cryptography.hazmat.backends import default_backend import zlib +from fingerprint_pro_server_api_sdk.api_client import ApiClientDeserializer from fingerprint_pro_server_api_sdk.models.event_response import EventResponse SEALED_HEADER = bytes([0x9e, 0x85, 0xdc, 0xed]) @@ -12,41 +15,53 @@ class DecryptionKey: - def __init__(self, key, algorithm): + """Key for decryption of sealed data.""" + exception: Exception + algorithm: str + + def __init__(self, key: bytes, algorithm: str): self.key = key self.algorithm = algorithm class UnsealError(Exception): + """Error during unsealing.""" exception: Exception key: DecryptionKey - def __init__(self, exception, key): + def __init__(self, exception: Exception, key: DecryptionKey): self.exception = exception self.key = key class UnsealAggregateError(Exception): - def __init__(self, errors): + """Aggregated error during unsealing.""" + errors: List[UnsealError] + + def __init__(self, errors: List[UnsealError]): self.errors = errors super().__init__("Unable to decrypt sealed data") -def parse_events_response(unsealed): - json_data = json.loads(unsealed) +def unseal_event_response(sealed_data: bytes, decryption_keys: List[DecryptionKey]) -> EventResponse: + """Unseal event response with one of the provided keys.""" + unsealed = __unseal(sealed_data, decryption_keys) + return __parse_event_response(unsealed) - if 'products' not in json_data: - raise ValueError('Sealed data is not valid events response') - return EventResponse(json_data['products']) +def __parse_event_response(unsealed: str) -> EventResponse: + """Parse event response from unsealed data.""" + json_data = json.loads(unsealed) + if 'products' not in json_data: + raise ValueError('Sealed data is not valid event response') -def unseal_events_response(sealed_data, decryption_keys): - unsealed = unseal(sealed_data, decryption_keys) - return parse_events_response(unsealed) + result: EventResponse = ApiClientDeserializer.deserialize(json_data, 'EventResponse') + return result -def unseal(sealed_data, decryption_keys): +def __unseal(sealed_data: bytes, decryption_keys: List[DecryptionKey]) -> str: + """Unseal data with one of the provided keys.""" if sealed_data[:len(SEALED_HEADER)].hex() != SEALED_HEADER.hex(): raise ValueError('Invalid sealed data header') @@ -54,7 +69,7 @@ def unseal(sealed_data, decryption_keys): for decryption_key in decryption_keys: if decryption_key.algorithm == DecryptionAlgorithm['Aes256Gcm']: try: - return unseal_aes256gcm(sealed_data, decryption_key.key) + return __unseal_aes256gcm(sealed_data, decryption_key.key) except Exception as e: errors.append(UnsealError(e, decryption_key)) continue @@ -64,7 +79,8 @@ def unseal(sealed_data, decryption_keys): raise UnsealAggregateError(errors) -def unseal_aes256gcm(sealed_data, decryption_key): +def __unseal_aes256gcm(sealed_data: bytes, decryption_key: bytes) -> str: + """Unseal data with AES-256-GCM.""" nonce_length = 12 nonce = sealed_data[len(SEALED_HEADER):len(SEALED_HEADER) + nonce_length] diff --git a/sealed_results_example.py b/sealed_results_example.py index dd78ed9f..747c47c0 100644 --- a/sealed_results_example.py +++ b/sealed_results_example.py @@ -3,8 +3,7 @@ from dotenv import load_dotenv -from fingerprint_pro_server_api_sdk import EventResponse -from fingerprint_pro_server_api_sdk.sealed import unseal_events_response, DecryptionKey, DecryptionAlgorithm +from fingerprint_pro_server_api_sdk.sealed import unseal_event_response, DecryptionKey, DecryptionAlgorithm load_dotenv() @@ -12,8 +11,8 @@ key = base64.b64decode(os.environ["BASE64_KEY"]) try: - events_response: EventResponse = unseal_events_response(sealed_result, [DecryptionKey(key, DecryptionAlgorithm['Aes256Gcm'])]) - print("\n\n\nEvent response: \n", events_response.products) + event_response = unseal_event_response(sealed_result, [DecryptionKey(key, DecryptionAlgorithm['Aes256Gcm'])]) + print("\n\n\nEvent response: \n", event_response.products) except Exception as e: print("Exception when calling unsealing events response: %s\n" % e) exit(1) diff --git a/template/README.mustache b/template/README.mustache index 8aae40da..8a0007e7 100644 --- a/template/README.mustache +++ b/template/README.mustache @@ -188,8 +188,7 @@ import os from dotenv import load_dotenv -from fingerprint_pro_server_api_sdk import EventResponse -from fingerprint_pro_server_api_sdk.sealed import unseal_events_response, DecryptionKey, DecryptionAlgorithm +from fingerprint_pro_server_api_sdk import unseal_event_response, DecryptionKey, DecryptionAlgorithm load_dotenv() @@ -197,8 +196,8 @@ sealed_result = base64.b64decode(os.environ["BASE64_SEALED_RESULT"]) key = base64.b64decode(os.environ["BASE64_KEY"]) try: - events_response: EventResponse = unseal_events_response(sealed_result, [DecryptionKey(key, DecryptionAlgorithm['Aes256Gcm'])]) - print("\n\n\nEvent response: \n", events_response.products) + event_response = unseal_event_response(sealed_result, [DecryptionKey(key, DecryptionAlgorithm['Aes256Gcm'])]) + print("\n\n\nEvent response: \n", event_response.products) except Exception as e: print("Exception when calling unsealing events response: %s\n" % e) exit(1) diff --git a/template/__init__package.mustache b/template/__init__package.mustache index 85c95b59..58f8e5ed 100644 --- a/template/__init__package.mustache +++ b/template/__init__package.mustache @@ -17,3 +17,5 @@ from {{packageName}}.base_model import BaseModel {{/model}}{{/models}} # import custom methods into sdk package from {{packageName}}.webhook import Webhook +from {{packageName}}.sealed import ApiClientDeserializer, DecryptionAlgorithm, DecryptionKey, \ + UnsealError, UnsealAggregateError, unseal_event_response diff --git a/test/test_sealed.py b/test/test_sealed.py index 30ee0c28..658f870c 100644 --- a/test/test_sealed.py +++ b/test/test_sealed.py @@ -3,10 +3,9 @@ import json import unittest -from fingerprint_pro_server_api_sdk import EventResponse -from fingerprint_pro_server_api_sdk.sealed import DecryptionAlgorithm, DecryptionKey, unseal_events_response, \ - UnsealError, UnsealAggregateError - +from fingerprint_pro_server_api_sdk import ApiClientDeserializer, DecryptionAlgorithm, DecryptionKey, \ + unseal_event_response, UnsealError, UnsealAggregateError, EventResponse, ProductsResponse, \ + ProductsResponseIdentification, BrowserDetails class TestSealed(unittest.TestCase): valid_key = base64.b64decode('p2PA7MGy5tx56cnyJaFZMr96BCFwZeHjZV2EqMvTq53=') @@ -14,24 +13,28 @@ class TestSealed(unittest.TestCase): def test_unseal_aes256gcm(self): with io.open("./test/mocks/sealed_result.json", 'r', encoding='utf-8') as f: - expected_result = EventResponse(json.load(f)['products']) + expected_result = ApiClientDeserializer.deserialize(json.load(f), 'EventResponse') sealed_data = base64.b64decode( 'noXc7SXO+mqeAGrvBMgObi/S0fXTpP3zupk8qFqsO/1zdtWCD169iLA3VkkZh9ICHpZ0oWRzqG0M9/TnCeKFohgBLqDp6O0zEfXOv6i5q++aucItznQdLwrKLP+O0blfb4dWVI8/aSbd4ELAZuJJxj9bCoVZ1vk+ShbUXCRZTD30OIEAr3eiG9aw00y1UZIqMgX6CkFlU9L9OnKLsNsyomPIaRHTmgVTI5kNhrnVNyNsnzt9rY7fUD52DQxJILVPrUJ1Q+qW7VyNslzGYBPG0DyYlKbRAomKJDQIkdj/Uwa6bhSTq4XYNVvbk5AJ/dGwvsVdOnkMT2Ipd67KwbKfw5bqQj/cw6bj8Cp2FD4Dy4Ud4daBpPRsCyxBM2jOjVz1B/lAyrOp8BweXOXYugwdPyEn38MBZ5oL4D38jIwR/QiVnMHpERh93jtgwh9Abza6i4/zZaDAbPhtZLXSM5ztdctv8bAb63CppLU541Kf4OaLO3QLvfLRXK2n8bwEwzVAqQ22dyzt6/vPiRbZ5akh8JB6QFXG0QJF9DejsIspKF3JvOKjG2edmC9o+GfL3hwDBiihYXCGY9lElZICAdt+7rZm5UxMx7STrVKy81xcvfaIp1BwGh/HyMsJnkE8IczzRFpLlHGYuNDxdLoBjiifrmHvOCUDcV8UvhSV+UAZtAVejdNGo5G/bz0NF21HUO4pVRPu6RqZIs/aX4hlm6iO/0Ru00ct8pfadUIgRcephTuFC2fHyZxNBC6NApRtLSNLfzYTTo/uSjgcu6rLWiNo5G7yfrM45RXjalFEFzk75Z/fu9lCJJa5uLFgDNKlU+IaFjArfXJCll3apbZp4/LNKiU35ZlB7ZmjDTrji1wLep8iRVVEGht/DW00MTok7Zn7Fv+MlxgWmbZB3BuezwTmXb/fNw==') - result = unseal_events_response(sealed_data, [ + result = unseal_event_response(sealed_data, [ DecryptionKey(self.invalid_key, DecryptionAlgorithm['Aes256Gcm']), DecryptionKey(self.valid_key, DecryptionAlgorithm['Aes256Gcm']), ]) self.assertEqual(result, expected_result) + self.assertIsInstance(result, EventResponse) + self.assertIsInstance(result.products, ProductsResponse) + self.assertIsInstance(result.products.identification, ProductsResponseIdentification) + self.assertIsInstance(result.products.identification.data.browser_details, BrowserDetails) def test_unseal_invalid_header(self): sealed_data = base64.b64decode( 'xzXc7SXO+mqeAGrvBMgObi/S0fXTpP3zupk8qFqsO/1zdtWCD169iLA3VkkZh9ICHpZ0oWRzqG0M9/TnCeKFohgBLqDp6O0zEfXOv6i5q++aucItznQdLwrKLP+O0blfb4dWVI8/aSbd4ELAZuJJxj9bCoVZ1vk+ShbUXCRZTD30OIEAr3eiG9aw00y1UZIqMgX6CkFlU9L9OnKLsNsyomPIaRHTmgVTI5kNhrnVNyNsnzt9rY7fUD52DQxJILVPrUJ1Q+qW7VyNslzGYBPG0DyYlKbRAomKJDQIkdj/Uwa6bhSTq4XYNVvbk5AJ/dGwvsVdOnkMT2Ipd67KwbKfw5bqQj/cw6bj8Cp2FD4Dy4Ud4daBpPRsCyxBM2jOjVz1B/lAyrOp8BweXOXYugwdPyEn38MBZ5oL4D38jIwR/QiVnMHpERh93jtgwh9Abza6i4/zZaDAbPhtZLXSM5ztdctv8bAb63CppLU541Kf4OaLO3QLvfLRXK2n8bwEwzVAqQ22dyzt6/vPiRbZ5akh8JB6QFXG0QJF9DejsIspKF3JvOKjG2edmC9o+GfL3hwDBiihYXCGY9lElZICAdt+7rZm5UxMx7STrVKy81xcvfaIp1BwGh/HyMsJnkE8IczzRFpLlHGYuNDxdLoBjiifrmHvOCUDcV8UvhSV+UAZtAVejdNGo5G/bz0NF21HUO4pVRPu6RqZIs/aX4hlm6iO/0Ru00ct8pfadUIgRcephTuFC2fHyZxNBC6NApRtLSNLfzYTTo/uSjgcu6rLWiNo5G7yfrM45RXjalFEFzk75Z/fu9lCJJa5uLFgDNKlU+IaFjArfXJCll3apbZp4/LNKiU35ZlB7ZmjDTrji1wLep8iRVVEGht/DW00MTok7Zn7Fv+MlxgWmbZB3BuezwTmXb/fNw==') - with self.assertRaisesRegex(Exception, "Invalid sealed data header"): - unseal_events_response(sealed_data, [ + with self.assertRaisesRegex(ValueError, "Invalid sealed data header"): + unseal_event_response(sealed_data, [ DecryptionKey(self.invalid_key, DecryptionAlgorithm['Aes256Gcm']), DecryptionKey(self.valid_key, DecryptionAlgorithm['Aes256Gcm']), ]) @@ -40,8 +43,8 @@ def test_unseal_invalid_algorithm(self): sealed_data = base64.b64decode( 'noXc7SXO+mqeAGrvBMgObi/S0fXTpP3zupk8qFqsO/1zdtWCD169iLA3VkkZh9ICHpZ0oWRzqG0M9/TnCeKFohgBLqDp6O0zEfXOv6i5q++aucItznQdLwrKLP+O0blfb4dWVI8/aSbd4ELAZuJJxj9bCoVZ1vk+ShbUXCRZTD30OIEAr3eiG9aw00y1UZIqMgX6CkFlU9L9OnKLsNsyomPIaRHTmgVTI5kNhrnVNyNsnzt9rY7fUD52DQxJILVPrUJ1Q+qW7VyNslzGYBPG0DyYlKbRAomKJDQIkdj/Uwa6bhSTq4XYNVvbk5AJ/dGwvsVdOnkMT2Ipd67KwbKfw5bqQj/cw6bj8Cp2FD4Dy4Ud4daBpPRsCyxBM2jOjVz1B/lAyrOp8BweXOXYugwdPyEn38MBZ5oL4D38jIwR/QiVnMHpERh93jtgwh9Abza6i4/zZaDAbPhtZLXSM5ztdctv8bAb63CppLU541Kf4OaLO3QLvfLRXK2n8bwEwzVAqQ22dyzt6/vPiRbZ5akh8JB6QFXG0QJF9DejsIspKF3JvOKjG2edmC9o+GfL3hwDBiihYXCGY9lElZICAdt+7rZm5UxMx7STrVKy81xcvfaIp1BwGh/HyMsJnkE8IczzRFpLlHGYuNDxdLoBjiifrmHvOCUDcV8UvhSV+UAZtAVejdNGo5G/bz0NF21HUO4pVRPu6RqZIs/aX4hlm6iO/0Ru00ct8pfadUIgRcephTuFC2fHyZxNBC6NApRtLSNLfzYTTo/uSjgcu6rLWiNo5G7yfrM45RXjalFEFzk75Z/fu9lCJJa5uLFgDNKlU+IaFjArfXJCll3apbZp4/LNKiU35ZlB7ZmjDTrji1wLep8iRVVEGht/DW00MTok7Zn7Fv+MlxgWmbZB3BuezwTmXb/fNw==') - with self.assertRaisesRegex(Exception, "Unsupported decryption algorithm: invalid"): - unseal_events_response(sealed_data, [ + with self.assertRaisesRegex(ValueError, "Unsupported decryption algorithm: invalid"): + unseal_event_response(sealed_data, [ DecryptionKey(self.invalid_key, 'invalid'), DecryptionKey(self.valid_key, DecryptionAlgorithm['Aes256Gcm']), ]) @@ -51,8 +54,8 @@ def test_unseal_invalid_data(self): # "{\"invalid\":true}" 'noXc7VOpBstjjcavDKSKr4HTavt4mdq8h6NC32T0hUtw9S0jXT8lPjZiWL8SyHxmrF3uTGqO+g==') - with self.assertRaisesRegex(Exception, "Sealed data is not valid events response"): - unseal_events_response(sealed_data, [ + with self.assertRaisesRegex(ValueError, "Sealed data is not valid event response"): + unseal_event_response(sealed_data, [ DecryptionKey(self.invalid_key, DecryptionAlgorithm['Aes256Gcm']), DecryptionKey(self.valid_key, DecryptionAlgorithm['Aes256Gcm']), ]) @@ -61,8 +64,8 @@ def test_unseal_not_compressed_data(self): sealed_data = base64.b64decode( 'noXc7dtuk0smGE+ZbaoXzrp6Rq8ySxLepejTsu7+jUXlPhV1w+WuHx9gbPhaENJnOQo8BcGmsaRhL5k2NVj+DRNzYO9cQD7wHxmXKCyTbl/dvSYOMoHziUZ2VbQ7tmaorFny26v8jROr/UBGfvPE0dLKC36IN9ZlJ3X0NZJO8SY+8bCr4mTrkVZsv/hpvZp+OjC4h7e5vxcpmnBWXzxfaO79Lq3aMRIEf9XfK7/bVIptHaEqtPKCTwl9rz1KUpUUNQSHTPM0NlqJe9bjYf5mr1uYvWHhcJoXSyRyVMxIv/quRiw3SKJzAMOTBiAvFICpWuRFa+T/xIMHK0g96w/IMQo0jdY1E067ZEvBUOBmsJnGJg1LllS3rbJVe+E2ClFNL8SzFphyvtlcfvYB+SVSD4bzI0w/YCldv5Sq42BFt5bn4n4aE5A6658DYsfSRYWqP6OpqPJx96cY34W7H1t/ZG0ulez6zF5NvWhc1HDQ1gMtXd+K/ogt1n+FyFtn8xzvtSGkmrc2jJgYNI5Pd0Z0ent73z0MKbJx9v2ta/emPEzPr3cndN5amdr6TmRkDU4bq0vyhAh87DJrAnJQLdrvYLddnrr8xTdeXxj1i1Yug6SGncPh9sbTYkdOfuamPAYOuiJVBAMcfYsYEiQndZe8mOQ4bpCr+hxAAqixhZ16pQ8CeUwa247+D2scRymLB8qJXlaERuFZtWGVAZ8VP/GS/9EXjrzpjGX9vlrIPeJP8fh2S5QPzw55cGNJ7JfAdOyManXnoEw2/QzDhSZQARVl+akFgSO0Y13YmbiL7H6HcKWGcJ2ipDKIaj2fJ7GE0Vzyt+CBEezSQR99Igd8x3p2JtvsVKp35iLPksjS1VqtSCTbuIRUlINlfQHNjeQiE/B/61jo3Mf7SmjYjqtvXt5e9RKb+CQku2qH4ZU8xN3DSg+4mLom3BgKBkm/MoyGBpMK41c96d2tRp3tp4hV0F6ac02Crg7P2lw8IUct+i2VJ8VUjcbRfTIPQs0HjNjM6/gLfLCkWOHYrlFjwusXWQCJz91Kq+hVxj7M9LtplPO4AUq6RUMNhlPGUmyOI2tcUMrjq9vMLXGlfdkH185zM4Mk+O7DRLC8683lXZFZvcBEmxr855PqLLH/9SpYKHBoGRatDRdQe3oRp6gHS0jpQ1SW/si4kvLKiUNjiBExvbQVOUV7/VFXvG1RpM9wbzSoOd40gg7ZzD/72QshUC/25DkM/Pm7RBzwtjgmnRKjT+mROeC/7VQLoz3amv09O8Mvbt+h/lX5+51Q834F7NgIGagbB20WtWcMtrmKrvCEZlaoiZrmYVSbi1RfknRK7CTPJkopw9IjO7Ut2EhKZ+jL4rwk6TlVm6EC6Kuj7KNqp6wB/UNe9eM2Eym/aiHAcja8XN4YQhSIuJD2Wxb0n3LkKnAjK1/GY65c8K6rZsVYQ0MQL1j4lMl0UZPjG/vzKyetIsVDyXc4J9ZhOEMYnt/LaxEeSt4EMJGBA9wpTmz33X4h3ij0Y3DY/rH7lrEScUknw20swTZRm5T6q1bnimj7M1OiOkebdI09MZ0nyaTWRHdB7B52C/moh89Q7qa2Fulp5h8Us1FYRkWBLt37a5rGI1IfVeP38KaPbagND+XzWpNqX4HVrAVPLQVK5EwUvGamED3ooJ0FMieTc0IH0N+IeUYG7Q8XmrRVBcw32W8pEfYLO9L71An/J0jQZCIP8DuQnUG0mOvunOuloBGvP/9LvkBlkamh68F0a5f5ny1jloyIFJhRh5dt2SBlbsXS9AKqUwARYSSsA9Ao4WJWOZMyjp8A+qIBAfW65MdhhUDKYMBgIAbMCc3uiptzElQQopE5TT5xIhwfYxa503jVzQbz1Q==') - with self.assertRaisesRegex(Exception, "Unable to decrypt sealed data") as context: - unseal_events_response(sealed_data, [ + with self.assertRaisesRegex(UnsealAggregateError, "Unable to decrypt sealed data") as context: + unseal_event_response(sealed_data, [ DecryptionKey(self.invalid_key, DecryptionAlgorithm['Aes256Gcm']), DecryptionKey(self.valid_key, DecryptionAlgorithm['Aes256Gcm']), ]) @@ -76,8 +79,8 @@ def test_unseal_all_keys_invalid(self): sealed_data = base64.b64decode( 'noXc7SXO+mqeAGrvBMgObi/S0fXTpP3zupk8qFqsO/1zdtWCD169iLA3VkkZh9ICHpZ0oWRzqG0M9/TnCeKFohgBLqDp6O0zEfXOv6i5q++aucItznQdLwrKLP+O0blfb4dWVI8/aSbd4ELAZuJJxj9bCoVZ1vk+ShbUXCRZTD30OIEAr3eiG9aw00y1UZIqMgX6CkFlU9L9OnKLsNsyomPIaRHTmgVTI5kNhrnVNyNsnzt9rY7fUD52DQxJILVPrUJ1Q+qW7VyNslzGYBPG0DyYlKbRAomKJDQIkdj/Uwa6bhSTq4XYNVvbk5AJ/dGwvsVdOnkMT2Ipd67KwbKfw5bqQj/cw6bj8Cp2FD4Dy4Ud4daBpPRsCyxBM2jOjVz1B/lAyrOp8BweXOXYugwdPyEn38MBZ5oL4D38jIwR/QiVnMHpERh93jtgwh9Abza6i4/zZaDAbPhtZLXSM5ztdctv8bAb63CppLU541Kf4OaLO3QLvfLRXK2n8bwEwzVAqQ22dyzt6/vPiRbZ5akh8JB6QFXG0QJF9DejsIspKF3JvOKjG2edmC9o+GfL3hwDBiihYXCGY9lElZICAdt+7rZm5UxMx7STrVKy81xcvfaIp1BwGh/HyMsJnkE8IczzRFpLlHGYuNDxdLoBjiifrmHvOCUDcV8UvhSV+UAZtAVejdNGo5G/bz0NF21HUO4pVRPu6RqZIs/aX4hlm6iO/0Ru00ct8pfadUIgRcephTuFC2fHyZxNBC6NApRtLSNLfzYTTo/uSjgcu6rLWiNo5G7yfrM45RXjalFEFzk75Z/fu9lCJJa5uLFgDNKlU+IaFjArfXJCll3apbZp4/LNKiU35ZlB7ZmjDTrji1wLep8iRVVEGht/DW00MTok7Zn7Fv+MlxgWmbZB3BuezwTmXb/fNw==') - with self.assertRaisesRegex(Exception, 'Unable to decrypt sealed data'): - unseal_events_response(sealed_data, [ + with self.assertRaisesRegex(UnsealAggregateError, 'Unable to decrypt sealed data'): + unseal_event_response(sealed_data, [ DecryptionKey(self.invalid_key, DecryptionAlgorithm['Aes256Gcm']), DecryptionKey(self.invalid_key, DecryptionAlgorithm['Aes256Gcm']), ]) @@ -85,8 +88,8 @@ def test_unseal_all_keys_invalid(self): def test_unseal_empty_data(self): sealed_data = bytearray(b'') - with self.assertRaisesRegex(Exception, 'Invalid sealed data header'): - unseal_events_response(sealed_data, [ + with self.assertRaisesRegex(ValueError, 'Invalid sealed data header'): + unseal_event_response(sealed_data, [ DecryptionKey(self.invalid_key, DecryptionAlgorithm['Aes256Gcm']), DecryptionKey(self.valid_key, DecryptionAlgorithm['Aes256Gcm']), ]) @@ -94,8 +97,8 @@ def test_unseal_empty_data(self): def test_unseal_invalid_nonce(self): sealed_data = bytes([0x9E, 0x85, 0xDC, 0xED, 0xAA, 0xBB, 0xCC]) - with self.assertRaisesRegex(Exception, 'Unable to decrypt sealed data') as context: - unseal_events_response(sealed_data, [ + with self.assertRaisesRegex(UnsealAggregateError, 'Unable to decrypt sealed data') as context: + unseal_event_response(sealed_data, [ DecryptionKey(self.invalid_key, DecryptionAlgorithm['Aes256Gcm']), DecryptionKey(self.valid_key, DecryptionAlgorithm['Aes256Gcm']), ])