diff --git a/eth_account/_utils/signing.py b/eth_account/_utils/signing.py index 805842ea..5b439a5c 100644 --- a/eth_account/_utils/signing.py +++ b/eth_account/_utils/signing.py @@ -1,21 +1,16 @@ -import json -import re - from cytoolz import ( curry, pipe, ) -from eth_abi import ( - encode_abi, - is_encodable, -) from eth_utils import ( - keccak, to_bytes, to_int, to_text, ) +from eth_account._utils.structured_data import ( + hashStruct, +) from eth_account._utils.transactions import ( ChainAwareUnsignedTransaction, UnsignedTransaction, @@ -32,10 +27,6 @@ INTENDED_VALIDATOR_SIGN_VERSION = b'\x00' # Hex value 0x00 STRUCTURED_DATA_SIGN_VERSION = b'\x01' # Hex value 0x01 -# Regexes -IDENTIFIER_REGEX = r"^[a-zA-Z_$][a-zA-Z_$0-9]*$" -TYPE_REGEX = r"^[a-zA-Z_$][a-zA-Z_$0-9]*(\[([1-9]\d*)*\])*$" - def sign_transaction_dict(eth_key, transaction_dict): # generate RLP-serializable transaction, with defaults filled @@ -58,264 +49,6 @@ def sign_transaction_dict(eth_key, transaction_dict): return (v, r, s, encoded_transaction) -# -# EIP712 Functionalities -# -def dependencies(primaryType, types, found=None): - """ - Recursively get all the dependencies of the primaryType - """ - # This is done to avoid the by-reference call of python - found = found or [] - - if primaryType in found: - return found - if primaryType not in types: - return found - - found.append(primaryType) - for field in types[primaryType]: - for dep in dependencies(field["type"], types, found): - if dep not in found: - found.push(dep) - - return found - - -def dict_to_type_name_converter(field): - """ - Given a dictionary ``field`` of type {'name': NAME, 'type': TYPE}, - this function converts it to ``TYPE NAME`` - """ - return field["type"] + " " + field["name"] - - -def encodeType(primaryType, types): - """ - The type of a struct is encoded as name ‖ "(" ‖ member₁ ‖ "," ‖ member₂ ‖ "," ‖ … ‖ memberₙ ")" - where each member is written as type ‖ " " ‖ name. - """ - # Getting the dependencies and sorting them alphabetically as per EIP712 - deps = dependencies(primaryType, types) - deps_without_primary_type = list(filter(lambda x: x != primaryType, deps)) - sorted_deps = [primaryType] + sorted(deps_without_primary_type) - - result = ''.join( - [ - struct_name + "(" + ','.join(map(dict_to_type_name_converter, types[struct_name])) + ")" - for struct_name in sorted_deps - ] - ) - return result - - -def typeHash(primaryType, types): - return keccak(text=encodeType(primaryType, types)) - - -def encodeData(primaryType, types, data): - encTypes = [] - encValues = [] - - # Add typehash - encTypes.append("bytes32") - encValues.append(typeHash(primaryType, types)) - - # Add field contents - for field in types[primaryType]: - value = data[field["name"]] - if field["type"] == "string": - if not isinstance(value, str): - raise TypeError( - "Value of `{0}` ({2}) in the struct `{1}` is of the type `{3}`, but expected " - "string value".format( - field["name"], - primaryType, - value, - type(value), - ) - ) - # Special case where the values need to be keccak hashed before they are encoded - encTypes.append("bytes32") - hashed_value = keccak(text=value) - encValues.append(hashed_value) - elif field["type"] == "bytes": - if not isinstance(value, bytes): - raise TypeError( - "Value of `{0}` ({2}) in the struct `{1}` is of the type `{3}`, but expected " - "bytes value".format( - field["name"], - primaryType, - value, - type(value), - ) - ) - # Special case where the values need to be keccak hashed before they are encoded - encTypes.append("bytes32") - hashed_value = keccak(primitive=value) - encValues.append(hashed_value) - elif field["type"] in types: - # This means that this type is a user defined type - encTypes.append("bytes32") - hashed_value = keccak(primitive=encodeData(field["type"], types, value)) - encValues.append(hashed_value) - elif field["type"][-1] == "]": - # TODO: Replace the above conditionality with Regex for identifying arrays declaration - raise NotImplementedError("TODO: Arrays currently unimplemented in encodeData") - else: - # First checking to see if the individual values can be encoded - try: - is_encodable(field["type"], value) - except: - raise TypeError( - "Received Invalid type `{0}` in the struct `{1}`".format( - field["type"], - primaryType, - ) - ) - - # Next see if the data fits the specified encoding type - if is_encodable(field["type"], value): - # field["type"] is a valid type and this value corresponds to that type. - encTypes.append(field["type"]) - encValues.append(value) - else: - raise TypeError( - "Value of `{0}` ({2}) in the struct `{1}` is of the type `{3}`, but expected " - "{4} value".format( - field["name"], - primaryType, - value, - type(value), - field["type"], - ) - ) - - return encode_abi(encTypes, encValues) - - -def validate_has_attribute(attr_name, dict_data): - if attr_name not in dict_data: - raise AttributeError( - "Attribute `{0}` not found in the JSON string". - format(attr_name) - ) - - -def validate_types_attribute(structured_data): - # Check that the data has `types` attribute - validate_has_attribute("types", structured_data) - - # Check if all the `name` and the `type` attributes in each field of all the - # `types` attribute are valid (Regex Check) - for struct_name in structured_data["types"]: - # Check that `struct_name` is of the type string - if not isinstance(struct_name, str): - raise TypeError( - "Struct Name of `types` attribute should be a string, but got type `{0}`". - format(type(struct_name)) - ) - for field in structured_data["types"][struct_name]: - # Check that `field["name"]` is of the type string - if not isinstance(field["name"], str): - raise TypeError( - "Field Name `{0}` of struct `{1}` should be a string, but got type `{2}`". - format(field["name"], struct_name, type(field["name"])) - ) - # Check that `field["type"]` is of the type string - if not isinstance(field["type"], str): - raise TypeError( - "Field Type `{0}` of struct `{1}` should be a string, but got type `{2}`". - format(field["type"], struct_name, type(field["type"])) - ) - # Check that field["name"] matches with IDENTIFIER_REGEX - if not re.match(IDENTIFIER_REGEX, field["name"]): - raise AttributeError( - "Invalid Identifier `{0}` in `{1}`".format(field["name"], struct_name) - ) - # Check that field["type"] matches with TYPE_REGEX - if not re.match(TYPE_REGEX, field["type"]): - raise AttributeError( - "Invalid Type `{0}` in `{1}`".format(field["type"], struct_name) - ) - - -def validate_field_declared_only_once_in_struct(field_name, struct_data, struct_name): - if len([field for field in struct_data if field["name"] == field_name]) != 1: - raise AttributeError( - "Attribute `{0}` not declared or declared more than once in {1}". - format(field_name, struct_name) - ) - - -def validate_EIP712Domain_schema(structured_data): - # Check that the `types` attribute contains `EIP712Domain` schema declaration - if "EIP712Domain" not in structured_data["types"]: - raise AttributeError("`EIP712Domain struct` not found in types attribute") - # Check that the names and types in `EIP712Domain` are what are mentioned in the EIP-712 - # and they are declared only once - EIP712Domain_data = structured_data["types"]["EIP712Domain"] - validate_field_declared_only_once_in_struct("name", EIP712Domain_data, "EIP712Domain") - validate_field_declared_only_once_in_struct("version", EIP712Domain_data, "EIP712Domain") - validate_field_declared_only_once_in_struct("chainId", EIP712Domain_data, "EIP712Domain") - validate_field_declared_only_once_in_struct( - "verifyingContract", - EIP712Domain_data, - "EIP712Domain", - ) - - -def validate_primaryType_attribute(structured_data): - # Check that `primaryType` attribute is present - if "primaryType" not in structured_data: - raise AttributeError("The Structured Data needs to have a `primaryType` attribute") - # Check that `primaryType` value is a string - if not isinstance(structured_data["primaryType"], str): - raise TypeError( - "Value of attribute `primaryType` should be `string`, but got type `{0}`". - format(type(structured_data["primaryType"])) - ) - # Check that the value of `primaryType` is present in the `types` attribute - if not structured_data["primaryType"] in structured_data["types"]: - raise AttributeError( - "The Primary Type `{0}` is not present in the `types` attribute". - format(structured_data["primaryType"]) - ) - - -def validate_structured_data(structured_data): - # validate the `types` attribute - validate_types_attribute(structured_data) - # validate the `EIP712Domain` attribute of `types` attribute - validate_EIP712Domain_schema(structured_data) - # validate the `primaryType` attribute - validate_primaryType_attribute(structured_data) - # Check that there is a `domain` attribute in the structured data - validate_has_attribute("domain", structured_data) - # Check that there is a `message` attribute in the structured data - validate_has_attribute("message", structured_data) - - -def hashStruct(structured_json_string_data, is_domain_separator=False): - """ - The structured_json_string_data is expected to have the ``types`` attribute and - the ``primaryType``, ``message``, ``domain`` attribute. - The ``is_domain_separator`` variable is used to calculate the ``hashStruct`` as - part of the ``domainSeparator`` calculation. - """ - structured_data = json.loads(structured_json_string_data) - validate_structured_data(structured_data) - - types = structured_data["types"] - if is_domain_separator: - primaryType = "EIP712Domain" - data = structured_data["domain"] - else: - primaryType = structured_data["primaryType"] - data = structured_data["message"] - return keccak(encodeData(primaryType, types, data)) - - # watch here for updates to signature format: # https://github.com/ethereum/EIPs/blob/master/EIPS/eip-191.md # https://github.com/ethereum/EIPs/blob/master/EIPS/eip-712.md diff --git a/eth_account/_utils/structured_data/__init__.py b/eth_account/_utils/structured_data/__init__.py new file mode 100644 index 00000000..dfcb76f7 --- /dev/null +++ b/eth_account/_utils/structured_data/__init__.py @@ -0,0 +1,3 @@ +from .hashing import ( # noqa: F401 + hashStruct, +) diff --git a/eth_account/_utils/structured_data/hashing.py b/eth_account/_utils/structured_data/hashing.py new file mode 100644 index 00000000..d2bd3f60 --- /dev/null +++ b/eth_account/_utils/structured_data/hashing.py @@ -0,0 +1,212 @@ +import json + +from eth_abi import ( + encode_abi, + is_encodable, +) +from eth_utils import ( + keccak, + to_tuple, +) + +from .validation import ( + validate_structured_data, +) + + +def dependencies(primaryType, types): + """ + Perform BFS to get all the dependencies of the primaryType + """ + deps = set() + struct_names_yet_to_be_expanded = [primaryType] + + while len(struct_names_yet_to_be_expanded) > 0: + struct_name = struct_names_yet_to_be_expanded.pop() + + deps.add(struct_name) + fields = types[struct_name] + for field in fields: + # If this struct type has already been seen, then don't expand on it' + if field["type"] in deps: + continue + # If this struct type is not a customized type, then no need to expand + elif field["type"] not in types: + continue + # Custom Struct Type + else: + struct_names_yet_to_be_expanded.append(field["type"]) + + # Don't need to make a struct as dependency of itself + deps.remove(primaryType) + + return tuple(deps) + + +def dict_to_type_name_converter(field): + """ + Given a dictionary ``field`` of type {'name': NAME, 'type': TYPE}, + this function converts it to ``TYPE NAME`` + """ + return "{0} {1}".format(field["type"], field["name"]) + + +def encode_struct(struct_name, struct_types): + return "{0}({1})".format( + struct_name, + ','.join(map(dict_to_type_name_converter, struct_types)), + ) + + +def encodeType(primaryType, types): + """ + The type of a struct is encoded as name ‖ "(" ‖ member₁ ‖ "," ‖ member₂ ‖ "," ‖ … ‖ memberₙ ")" + where each member is written as type ‖ " " ‖ name. + """ + # Getting the dependencies and sorting them alphabetically as per EIP712 + deps = dependencies(primaryType, types) + sorted_deps = (primaryType,) + tuple(sorted(deps)) + + result = ''.join( + [ + encode_struct(struct_name, types[struct_name]) + for struct_name in sorted_deps + ] + ) + return result + + +def typeHash(primaryType, types): + return keccak(text=encodeType(primaryType, types)) + + +def is_valid_abi_type(type_name): + """ + This function is used to make sure that the ``type_name`` is a valid ABI Type. + + Please note that this is a temporary function and should be replaced by the corresponding + ABI function, once the following issue has been resolved. + https://github.com/ethereum/eth-abi/issues/125 + """ + valid_abi_types = {"address", "bool", "bytes", "int", "string", "uint"} + is_bytesN = type_name.startswith("bytes") and 1 <= int(type_name[5:]) <= 32 + is_intN = ( + type_name.startswith("int") and + 8 <= int(type_name[3:]) <= 256 and + int(type_name[3:]) % 8 == 0 + ) + is_uintN = ( + type_name.startswith("uint") and + 8 <= int(type_name[4:]) <= 256 and + int(type_name[4:]) % 8 == 0 + ) + + if type_name in valid_abi_types: + return True + elif is_bytesN: + # bytes1 to bytes32 + return True + elif is_intN: + # int8 to int256 + return True + elif is_uintN: + # uint8 to uint256 + return True + + return False + + +@to_tuple +def _encodeData(primaryType, types, data): + # Add typehash + yield "bytes32", typeHash(primaryType, types) + + # Add field contents + for field in types[primaryType]: + value = data[field["name"]] + if field["type"] == "string": + if not isinstance(value, str): + raise TypeError( + "Value of `{0}` ({2}) in the struct `{1}` is of the type `{3}`, but expected " + "string value".format( + field["name"], + primaryType, + value, + type(value), + ) + ) + # Special case where the values need to be keccak hashed before they are encoded + hashed_value = keccak(text=value) + yield "bytes32", hashed_value + elif field["type"] == "bytes": + if not isinstance(value, bytes): + raise TypeError( + "Value of `{0}` ({2}) in the struct `{1}` is of the type `{3}`, but expected " + "bytes value".format( + field["name"], + primaryType, + value, + type(value), + ) + ) + # Special case where the values need to be keccak hashed before they are encoded + hashed_value = keccak(primitive=value) + yield "bytes32", hashed_value + elif field["type"] in types: + # This means that this type is a user defined type + hashed_value = keccak(primitive=encodeData(field["type"], types, value)) + yield "bytes32", hashed_value + elif field["type"][-1] == "]": + # TODO: Replace the above conditionality with Regex for identifying arrays declaration + raise NotImplementedError("TODO: Arrays currently unimplemented in encodeData") + else: + # First checking to see if type is valid as per abi + if not is_valid_abi_type(field["type"]): + raise TypeError( + "Received Invalid type `{0}` in the struct `{1}`".format( + field["type"], + primaryType, + ) + ) + + # Next see if the data fits the specified encoding type + if is_encodable(field["type"], value): + # field["type"] is a valid type and this value corresponds to that type. + yield field["type"], value + else: + raise TypeError( + "Value of `{0}` ({2}) in the struct `{1}` is of the type `{3}`, but expected " + "{4} value".format( + field["name"], + primaryType, + value, + type(value), + field["type"], + ) + ) + + +def encodeData(primaryType, types, data): + data_types_and_hashes = _encodeData(primaryType, types, data) + data_types, data_hashes = zip(*data_types_and_hashes) + return encode_abi(data_types, data_hashes) + + +def hashStruct(structured_json_string_data, is_domain_separator=False): + """ + The structured_json_string_data is expected to have the ``types`` attribute and + the ``primaryType``, ``message``, ``domain`` attribute. + The ``is_domain_separator`` variable is used to calculate the ``hashStruct`` as + part of the ``domainSeparator`` calculation. + """ + structured_data = json.loads(structured_json_string_data) + validate_structured_data(structured_data) + + types = structured_data["types"] + if is_domain_separator: + primaryType = "EIP712Domain" + data = structured_data["domain"] + else: + primaryType = structured_data["primaryType"] + data = structured_data["message"] + return keccak(encodeData(primaryType, types, data)) diff --git a/eth_account/_utils/structured_data/validation.py b/eth_account/_utils/structured_data/validation.py new file mode 100644 index 00000000..53b12612 --- /dev/null +++ b/eth_account/_utils/structured_data/validation.py @@ -0,0 +1,111 @@ +import re + +from eth_utils import ( + ValidationError, +) + +# Regexes +IDENTIFIER_REGEX = r"^[a-zA-Z_$][a-zA-Z_$0-9]*$" +TYPE_REGEX = r"^[a-zA-Z_$][a-zA-Z_$0-9]*(\[([1-9]\d*)*\])*$" + + +def validate_has_attribute(attr_name, dict_data): + if attr_name not in dict_data: + raise ValidationError( + "Attribute `{0}` not found in the JSON string". + format(attr_name) + ) + + +def validate_types_attribute(structured_data): + # Check that the data has `types` attribute + validate_has_attribute("types", structured_data) + + # Check if all the `name` and the `type` attributes in each field of all the + # `types` attribute are valid (Regex Check) + for struct_name in structured_data["types"]: + # Check that `struct_name` is of the type string + if not isinstance(struct_name, str): + raise ValidationError( + "Struct Name of `types` attribute should be a string, but got type `{0}`". + format(type(struct_name)) + ) + for field in structured_data["types"][struct_name]: + # Check that `field["name"]` is of the type string + if not isinstance(field["name"], str): + raise ValidationError( + "Field Name `{0}` of struct `{1}` should be a string, but got type `{2}`". + format(field["name"], struct_name, type(field["name"])) + ) + # Check that `field["type"]` is of the type string + if not isinstance(field["type"], str): + raise ValidationError( + "Field Type `{0}` of struct `{1}` should be a string, but got type `{2}`". + format(field["type"], struct_name, type(field["type"])) + ) + # Check that field["name"] matches with IDENTIFIER_REGEX + if not re.match(IDENTIFIER_REGEX, field["name"]): + raise ValidationError( + "Invalid Identifier `{0}` in `{1}`".format(field["name"], struct_name) + ) + # Check that field["type"] matches with TYPE_REGEX + if not re.match(TYPE_REGEX, field["type"]): + raise ValidationError( + "Invalid Type `{0}` in `{1}`".format(field["type"], struct_name) + ) + + +def validate_field_declared_only_once_in_struct(field_name, struct_data, struct_name): + if len([field for field in struct_data if field["name"] == field_name]) != 1: + raise ValidationError( + "Attribute `{0}` not declared or declared more than once in {1}". + format(field_name, struct_name) + ) + + +def validate_EIP712Domain_schema(structured_data): + # Check that the `types` attribute contains `EIP712Domain` schema declaration + if "EIP712Domain" not in structured_data["types"]: + raise ValidationError("`EIP712Domain struct` not found in types attribute") + # Check that the names and types in `EIP712Domain` are what are mentioned in the EIP-712 + # and they are declared only once + EIP712Domain_data = structured_data["types"]["EIP712Domain"] + validate_field_declared_only_once_in_struct("name", EIP712Domain_data, "EIP712Domain") + validate_field_declared_only_once_in_struct("version", EIP712Domain_data, "EIP712Domain") + validate_field_declared_only_once_in_struct("chainId", EIP712Domain_data, "EIP712Domain") + validate_field_declared_only_once_in_struct( + "verifyingContract", + EIP712Domain_data, + "EIP712Domain", + ) + + +def validate_primaryType_attribute(structured_data): + # Check that `primaryType` attribute is present + if "primaryType" not in structured_data: + raise ValidationError("The Structured Data needs to have a `primaryType` attribute") + # Check that `primaryType` value is a string + if not isinstance(structured_data["primaryType"], str): + raise ValidationError( + "Value of attribute `primaryType` should be `string`, but got type `{0}`". + format(type(structured_data["primaryType"])) + ) + # Check that the value of `primaryType` is present in the `types` attribute + if not structured_data["primaryType"] in structured_data["types"]: + raise ValidationError( + "The Primary Type `{0}` is not present in the `types` attribute". + format(structured_data["primaryType"]) + ) + + +def validate_structured_data(structured_data): + # validate the `types` attribute + validate_types_attribute(structured_data) + # validate the `EIP712Domain` struct of `types` attribute + validate_EIP712Domain_schema(structured_data) + # validate the `primaryType` attribute + validate_primaryType_attribute(structured_data) + # Check that there is a `domain` attribute in the structured data + validate_has_attribute("domain", structured_data) + # Check that there is a `message` attribute in the structured data + validate_has_attribute("message", structured_data) diff --git a/tests/core/test_structured_data_signing.py b/tests/core/test_structured_data_signing.py index a5c7d600..4820c88b 100644 --- a/tests/core/test_structured_data_signing.py +++ b/tests/core/test_structured_data_signing.py @@ -3,6 +3,7 @@ import re from eth_utils import ( + ValidationError, keccak, ) from hexbytes import ( @@ -12,14 +13,17 @@ from eth_account import ( Account, ) -from eth_account._utils.signing import ( - TYPE_REGEX, +from eth_account._utils.structured_data.hashing import ( dependencies, + encode_struct, encodeData, encodeType, hashStruct, typeHash, ) +from eth_account._utils.structured_data.validation import ( + TYPE_REGEX, +) from eth_account.messages import ( defunct_hash_message, ) @@ -99,14 +103,29 @@ def signature_kwargs(request, structured_valid_data_json_string): @pytest.mark.parametrize( 'primary_type, expected', ( - ('Mail', ['Mail', 'Person']), - ('Person', ['Person']), + ('Mail', ('Person',)), + ('Person', ()), ) ) def test_dependencies(primary_type, types, expected): assert dependencies(primary_type, types) == expected +@pytest.mark.parametrize( + 'struct_name, expected', + ( + ("Mail", "Mail(Person from,Person to,string contents)"), + ("Person", "Person(string name,address wallet)"), + ("EIP712Domain", ( + "EIP712Domain(string name,string version," + "uint256 chainId,address verifyingContract)" + )), + ) +) +def test_encode_struct(struct_name, types, expected): + assert encode_struct(struct_name, types[struct_name]) == expected + + @pytest.mark.parametrize( 'primary_type, expected', ( @@ -251,7 +270,7 @@ def test_structured_data_invalid_identifier_filtered_by_regex(): "contents": "Hello, Bob!" } }''' - with pytest.raises(AttributeError) as e: + with pytest.raises(ValidationError) as e: hashStruct(invalid_structured_data_string) assert str(e.value) == "Invalid Identifier `hello wallet` in `Person`" @@ -294,7 +313,7 @@ def test_structured_data_invalid_type_filtered_by_regex(): "contents": "Hello, Bob!" } }''' - with pytest.raises(AttributeError) as e: + with pytest.raises(ValidationError) as e: hashStruct(invalid_structured_data_string) assert str(e.value) == "Invalid Type `Hello Person` in `Mail`"