Skip to content

Commit

Permalink
Merge pull request #65 from fingerprintjs/fix-sealed-results-types-IN…
Browse files Browse the repository at this point in the history
…TER-819-

Fix sealed results types
  • Loading branch information
ilfa authored Aug 14, 2024
2 parents b83c5af + a7a0e2d commit adae9e8
Show file tree
Hide file tree
Showing 11 changed files with 215 additions and 173 deletions.
7 changes: 3 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -182,17 +182,16 @@ import os

from dotenv import load_dotenv

from fingerprint_pro_server_api_sdk import EventResponse
from fingerprint_pro_server_api_sdk.sealed import unseal_events_response, DecryptionKey, DecryptionAlgorithm
from fingerprint_pro_server_api_sdk import unseal_event_response, DecryptionKey, DecryptionAlgorithm

load_dotenv()

sealed_result = base64.b64decode(os.environ["BASE64_SEALED_RESULT"])
key = base64.b64decode(os.environ["BASE64_KEY"])

try:
events_response: EventResponse = unseal_events_response(sealed_result, [DecryptionKey(key, DecryptionAlgorithm['Aes256Gcm'])])
print("\n\n\nEvent response: \n", events_response.products)
event_response = unseal_event_response(sealed_result, [DecryptionKey(key, DecryptionAlgorithm['Aes256Gcm'])])
print("\n\n\nEvent response: \n", event_response.products)
except Exception as e:
print("Exception when calling unsealing events response: %s\n" % e)
exit(1)
Expand Down
4 changes: 2 additions & 2 deletions docs/SealedResults.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Sealed results

## **UnsealEventsResponse**
> unseal_events_response(sealed bytes, keys DecryptionKey[]) -> EventResponse
## **UnsealEventResponse**
> unseal_event_response(sealed: bytes, keys: DecryptionKey[]) -> EventResponse
Decrypts the sealed response with provided keys.
### Required Parameters
Expand Down
2 changes: 2 additions & 0 deletions fingerprint_pro_server_api_sdk/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,3 +113,5 @@
from fingerprint_pro_server_api_sdk.models.webhook_visit import WebhookVisit
# import custom methods into sdk package
from fingerprint_pro_server_api_sdk.webhook import Webhook
from fingerprint_pro_server_api_sdk.sealed import ApiClientDeserializer, DecryptionAlgorithm, DecryptionKey, \
UnsealError, UnsealAggregateError, unseal_event_response
133 changes: 71 additions & 62 deletions fingerprint_pro_server_api_sdk/api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,18 @@
from fingerprint_pro_server_api_sdk.rest import ApiException, RESTResponse
from fingerprint_pro_server_api_sdk.base_model import BaseModel

PRIMITIVE_TYPES = (float, bool, bytes, str, int)
NATIVE_TYPES_MAPPING = {
'int': int,
'long': int,
'float': float,
'str': str,
'bool': bool,
'date': date,
'datetime': datetime,
'object': object,
}


class ApiClient:
"""Generic API client for Swagger client library builds.
Expand All @@ -46,18 +58,6 @@ class ApiClient:
to the API
"""

PRIMITIVE_TYPES = (float, bool, bytes, str, int)
NATIVE_TYPES_MAPPING = {
'int': int,
'long': int,
'float': float,
'str': str,
'bool': bool,
'date': date,
'datetime': datetime,
'object': object,
}

def __init__(self, configuration: Optional[Configuration] = None, header_name: Optional[str] = None,
header_value: Optional[str] = None, cookie: Optional[str] = None, pool: Optional[Pool] = None):
if configuration is None:
Expand Down Expand Up @@ -197,7 +197,7 @@ def sanitize_for_serialization(self, obj: Union[Dict[str, Any], List[Tuple[str,
"""
if obj is None:
return None
elif isinstance(obj, self.PRIMITIVE_TYPES):
elif isinstance(obj, PRIMITIVE_TYPES):
return obj
elif isinstance(obj, list):
return [self.sanitize_for_serialization(sub_obj)
Expand Down Expand Up @@ -251,46 +251,7 @@ def deserialize(self, response: Union[RESTResponse, ApiException], response_type
except ValueError:
data = response.data

return self.__deserialize(data, response_type)

def __deserialize(self, data: Union[Dict, List, str], klass: Any):
"""Deserializes dict, list, str into an object.
:param data: dict, list or str.
:param klass: class literal, or string of class name.
:return: object.
"""
if data is None:
return None

if type(klass) == str:
if klass.startswith('list['):
sub_kls = re.match(r'list\[(.*)\]', klass).group(1)
return [self.__deserialize(sub_data, sub_kls)
for sub_data in data]

if klass.startswith('dict('):
sub_kls = re.match(r'dict\(([^,]*), (.*)\)', klass).group(2)
return {k: self.__deserialize(v, sub_kls)
for k, v in data.items()}

# convert str to class
if klass in self.NATIVE_TYPES_MAPPING:
klass = self.NATIVE_TYPES_MAPPING[klass]
else:
klass = getattr(fingerprint_pro_server_api_sdk.models, klass)

if klass in self.PRIMITIVE_TYPES:
return self.__deserialize_primitive(data, klass)
elif klass == object:
return data
elif klass == date:
return self.__deserialize_date(data)
elif klass == datetime:
return self.__deserialize_datatime(data)
else:
return self.__deserialize_model(data, klass)
return ApiClientDeserializer.deserialize(data, response_type)

def call_api(self, resource_path: str, method: str, path_params: Optional[Dict[str, Any]] = None,
query_params: Optional[List[Tuple[str, Any]]] = None, header_params: Optional[Dict[str, Any]] = None,
Expand Down Expand Up @@ -515,7 +476,51 @@ def __deserialize_file(self, response):
f.write(response_data)
return path

def __deserialize_primitive(self, data, klass):

class ApiClientDeserializer:
"""Deserializes server response into appropriate type."""
@staticmethod
def deserialize(data: Union[Dict, List, str], klass: Any):
"""Deserializes dict, list, str into an object.
:param data: dict, list or str.
:param klass: class literal, or string of class name.
:return: object.
"""
if data is None:
return None

if type(klass) == str:
if klass.startswith('list['):
sub_kls = re.match(r'list\[(.*)\]', klass).group(1)
return [ApiClientDeserializer.deserialize(sub_data, sub_kls)
for sub_data in data]

if klass.startswith('dict('):
sub_kls = re.match(r'dict\(([^,]*), (.*)\)', klass).group(2)
return {k: ApiClientDeserializer.deserialize(v, sub_kls)
for k, v in data.items()}

# convert str to class
if klass in NATIVE_TYPES_MAPPING:
klass = NATIVE_TYPES_MAPPING[klass]
else:
klass = getattr(fingerprint_pro_server_api_sdk.models, klass)

if klass in PRIMITIVE_TYPES:
return ApiClientDeserializer.__deserialize_primitive(data, klass)
elif klass == object:
return data
elif klass == date:
return ApiClientDeserializer.__deserialize_date(data)
elif klass == datetime:
return ApiClientDeserializer.__deserialize_datatime(data)
else:
return ApiClientDeserializer.__deserialize_model(data, klass)

@staticmethod
def __deserialize_primitive(data, klass):
"""Deserializes string to primitive type.
:param data: str.
Expand All @@ -530,7 +535,8 @@ def __deserialize_primitive(self, data, klass):
except TypeError:
return data

def __deserialize_date(self, string: str) -> date:
@staticmethod
def __deserialize_date(string: str) -> date:
"""Deserializes string to date.
:param string: str.
Expand All @@ -546,7 +552,8 @@ def __deserialize_date(self, string: str) -> date:
reason="Failed to parse `{0}` as date object".format(string)
)

def __deserialize_datatime(self, string: str) -> datetime:
@staticmethod
def __deserialize_datatime(string: str) -> datetime:
"""Deserializes string to datetime.
The string should be in iso8601 datetime format.
Expand All @@ -567,18 +574,20 @@ def __deserialize_datatime(self, string: str) -> datetime:
)
)

def __hasattr(self, object, name):
@staticmethod
def __hasattr(object, name):
return name in object.__class__.__dict__

def __deserialize_model(self, data, klass):
@staticmethod
def __deserialize_model(data, klass):
"""Deserializes list or dict to model.
:param data: dict, list.
:param klass: class literal.
:return: model object.
"""

if not klass.swagger_types and not self.__hasattr(klass, 'get_real_child_model'):
if not klass.swagger_types and not ApiClientDeserializer.__hasattr(klass, 'get_real_child_model'):
return data

kwargs = {}
Expand All @@ -588,7 +597,7 @@ def __deserialize_model(self, data, klass):
klass.attribute_map[attr] in data and
isinstance(data, (list, dict))):
value = data[klass.attribute_map[attr]]
kwargs[attr] = self.__deserialize(value, attr_type)
kwargs[attr] = ApiClientDeserializer.deserialize(value, attr_type)

instance = klass(**kwargs)

Expand All @@ -598,8 +607,8 @@ def __deserialize_model(self, data, klass):
for key, value in data.items():
if key not in klass.swagger_types:
instance[key] = value
if self.__hasattr(instance, 'get_real_child_model'):
if ApiClientDeserializer.__hasattr(instance, 'get_real_child_model'):
klass_name = instance.get_real_child_model(data)
if klass_name:
instance = self.__deserialize(data, klass_name)
instance = ApiClientDeserializer.deserialize(data, klass_name)
return instance
44 changes: 30 additions & 14 deletions fingerprint_pro_server_api_sdk/sealed.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import json
from typing import List

from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
import zlib

from fingerprint_pro_server_api_sdk.api_client import ApiClientDeserializer
from fingerprint_pro_server_api_sdk.models.event_response import EventResponse

SEALED_HEADER = bytes([0x9e, 0x85, 0xdc, 0xed])
Expand All @@ -12,49 +15,61 @@


class DecryptionKey:
def __init__(self, key, algorithm):
"""Key for decryption of sealed data."""
exception: Exception
algorithm: str

def __init__(self, key: bytes, algorithm: str):
self.key = key
self.algorithm = algorithm


class UnsealError(Exception):
"""Error during unsealing."""
exception: Exception
key: DecryptionKey

def __init__(self, exception, key):
def __init__(self, exception: Exception, key: DecryptionKey):
self.exception = exception
self.key = key


class UnsealAggregateError(Exception):
def __init__(self, errors):
"""Aggregated error during unsealing."""
errors: List[UnsealError]

def __init__(self, errors: List[UnsealError]):
self.errors = errors
super().__init__("Unable to decrypt sealed data")


def parse_events_response(unsealed):
json_data = json.loads(unsealed)
def unseal_event_response(sealed_data: bytes, decryption_keys: List[DecryptionKey]) -> EventResponse:
"""Unseal event response with one of the provided keys."""
unsealed = __unseal(sealed_data, decryption_keys)
return __parse_event_response(unsealed)

if 'products' not in json_data:
raise ValueError('Sealed data is not valid events response')

return EventResponse(json_data['products'])
def __parse_event_response(unsealed: str) -> EventResponse:
"""Parse event response from unsealed data."""
json_data = json.loads(unsealed)

if 'products' not in json_data:
raise ValueError('Sealed data is not valid event response')

def unseal_events_response(sealed_data, decryption_keys):
unsealed = unseal(sealed_data, decryption_keys)
return parse_events_response(unsealed)
result: EventResponse = ApiClientDeserializer.deserialize(json_data, 'EventResponse')
return result


def unseal(sealed_data, decryption_keys):
def __unseal(sealed_data: bytes, decryption_keys: List[DecryptionKey]) -> str:
"""Unseal data with one of the provided keys."""
if sealed_data[:len(SEALED_HEADER)].hex() != SEALED_HEADER.hex():
raise ValueError('Invalid sealed data header')

errors = []
for decryption_key in decryption_keys:
if decryption_key.algorithm == DecryptionAlgorithm['Aes256Gcm']:
try:
return unseal_aes256gcm(sealed_data, decryption_key.key)
return __unseal_aes256gcm(sealed_data, decryption_key.key)
except Exception as e:
errors.append(UnsealError(e, decryption_key))
continue
Expand All @@ -64,7 +79,8 @@ def unseal(sealed_data, decryption_keys):
raise UnsealAggregateError(errors)


def unseal_aes256gcm(sealed_data, decryption_key):
def __unseal_aes256gcm(sealed_data: bytes, decryption_key: bytes) -> str:
"""Unseal data with AES-256-GCM."""
nonce_length = 12
nonce = sealed_data[len(SEALED_HEADER):len(SEALED_HEADER) + nonce_length]

Expand Down
7 changes: 3 additions & 4 deletions sealed_results_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,16 @@

from dotenv import load_dotenv

from fingerprint_pro_server_api_sdk import EventResponse
from fingerprint_pro_server_api_sdk.sealed import unseal_events_response, DecryptionKey, DecryptionAlgorithm
from fingerprint_pro_server_api_sdk.sealed import unseal_event_response, DecryptionKey, DecryptionAlgorithm

load_dotenv()

sealed_result = base64.b64decode(os.environ["BASE64_SEALED_RESULT"])
key = base64.b64decode(os.environ["BASE64_KEY"])

try:
events_response: EventResponse = unseal_events_response(sealed_result, [DecryptionKey(key, DecryptionAlgorithm['Aes256Gcm'])])
print("\n\n\nEvent response: \n", events_response.products)
event_response = unseal_event_response(sealed_result, [DecryptionKey(key, DecryptionAlgorithm['Aes256Gcm'])])
print("\n\n\nEvent response: \n", event_response.products)
except Exception as e:
print("Exception when calling unsealing events response: %s\n" % e)
exit(1)
Expand Down
7 changes: 3 additions & 4 deletions template/README.mustache
Original file line number Diff line number Diff line change
Expand Up @@ -188,17 +188,16 @@ import os

from dotenv import load_dotenv

from fingerprint_pro_server_api_sdk import EventResponse
from fingerprint_pro_server_api_sdk.sealed import unseal_events_response, DecryptionKey, DecryptionAlgorithm
from fingerprint_pro_server_api_sdk import unseal_event_response, DecryptionKey, DecryptionAlgorithm

load_dotenv()

sealed_result = base64.b64decode(os.environ["BASE64_SEALED_RESULT"])
key = base64.b64decode(os.environ["BASE64_KEY"])

try:
events_response: EventResponse = unseal_events_response(sealed_result, [DecryptionKey(key, DecryptionAlgorithm['Aes256Gcm'])])
print("\n\n\nEvent response: \n", events_response.products)
event_response = unseal_event_response(sealed_result, [DecryptionKey(key, DecryptionAlgorithm['Aes256Gcm'])])
print("\n\n\nEvent response: \n", event_response.products)
except Exception as e:
print("Exception when calling unsealing events response: %s\n" % e)
exit(1)
Expand Down
2 changes: 2 additions & 0 deletions template/__init__package.mustache
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,5 @@ from {{packageName}}.base_model import BaseModel
{{/model}}{{/models}}
# import custom methods into sdk package
from {{packageName}}.webhook import Webhook
from {{packageName}}.sealed import ApiClientDeserializer, DecryptionAlgorithm, DecryptionKey, \
UnsealError, UnsealAggregateError, unseal_event_response
Loading

0 comments on commit adae9e8

Please sign in to comment.