diff --git a/tests/test_compatibility.py b/tests/test_compatibility.py index 9e09f6bc..b6f75c23 100644 --- a/tests/test_compatibility.py +++ b/tests/test_compatibility.py @@ -50,3 +50,40 @@ def test_encrypt_decrypt_original(implementations): # Decrypt on the source side plaintext_decrypted = umbral_src.decrypt_original(sk_src, capsule_src, ciphertext) assert plaintext_decrypted == plaintext + + +def test_kfrags(implementations): + + umbral_src, umbral_dst = implementations + + threshold = 2 + num_frags = 3 + + sk_src = umbral_src.SecretKey.random() + pk_src = umbral_src.PublicKey.from_secret_key(sk_src) + + # Create kfrags + pk_dst = umbral_dst.PublicKey.from_bytes(bytes(pk_src)) + + delegating_sk_dst = umbral_dst.SecretKey.random() + delegating_pk_dst = umbral_dst.PublicKey.from_secret_key(delegating_sk_dst) + + signing_sk_dst = umbral_dst.SecretKey.random() + signing_pk_dst = umbral_dst.PublicKey.from_secret_key(signing_sk_dst) + + kfrags_dst = umbral_dst.generate_kfrags( + delegating_sk_dst, + pk_dst, + signing_sk_dst, + threshold, + num_frags, + True, + True, + ) + + # Transfer the capsule to the source side + delegating_pk_src = umbral_src.PublicKey.from_bytes(bytes(delegating_pk_dst)) + signing_pk_src = umbral_src.PublicKey.from_bytes(bytes(signing_pk_dst)) + + kfrags_src = [umbral_src.KeyFrag.from_bytes(bytes(kfrag)) for kfrag in kfrags_dst] + assert all(kfrag.verify(signing_pk_src, delegating_pk_src, pk_src) for kfrag in kfrags_src) diff --git a/umbral/__init__.py b/umbral/__init__.py index 7b7856a5..919b1813 100644 --- a/umbral/__init__.py +++ b/umbral/__init__.py @@ -3,6 +3,7 @@ ) from .capsule import Capsule +from .key_frag import KeyFrag, generate_kfrags from .keys import SecretKey, PublicKey from .pre import encrypt, decrypt_original @@ -20,4 +21,5 @@ "Capsule", "encrypt", "decrypt_original", + "generate_kfrags", ] diff --git a/umbral/hashing.py b/umbral/hashing.py index 1d3b982f..622c5fc1 100644 --- a/umbral/hashing.py +++ b/umbral/hashing.py @@ -8,6 +8,9 @@ from .curve import CURVE from .curve_scalar import CurveScalar from .curve_point import CurvePoint +from .keys import PublicKey, SecretKey +from .serializable import serialize_bool +from .signing import Signature, Signer class Hash: @@ -38,6 +41,19 @@ def digest_to_scalar(digest: Hash) -> CurveScalar: return CurveScalar(bignum) +def hash_to_polynomial_arg(precursor: CurvePoint, + pubkey: CurvePoint, + dh_point: CurvePoint, + kfrag_id: 'KeyFragID', + ) -> CurveScalar: + digest = Hash(b"POLYNOMIAL_ARG") + digest.update(bytes(precursor)) + digest.update(bytes(pubkey)) + digest.update(bytes(dh_point)) + digest.update(bytes(kfrag_id)) + return digest_to_scalar(digest) + + def hash_capsule_points(e: CurvePoint, v: CurvePoint) -> CurveScalar: digest = Hash(b"CAPSULE_POINTS") digest.update(bytes(e)) @@ -45,7 +61,61 @@ def hash_capsule_points(e: CurvePoint, v: CurvePoint) -> CurveScalar: return digest_to_scalar(digest) -def unsafe_hash_to_point(dst: bytes, data: bytes) -> 'Point': +def hash_to_shared_secret(precursor: CurvePoint, + pubkey: CurvePoint, + dh_point: CurvePoint + ) -> CurveScalar: + digest = Hash(b"SHARED_SECRET") + digest.update(bytes(precursor)) + digest.update(bytes(pubkey)) + digest.update(bytes(dh_point)) + return digest_to_scalar(digest) + + +def hash_to_cfrag_signature(kfrag_id: 'KeyFragID', + commitment: CurvePoint, + precursor: CurvePoint, + maybe_delegating_pk: Optional[PublicKey], + maybe_receiving_pk: Optional[PublicKey], + ) -> 'SignatureDigest': + + digest = SignatureDigest(b"CFRAG_SIGNATURE") + digest.update(bytes(kfrag_id)) + digest.update(bytes(commitment)) + digest.update(bytes(precursor)) + + if maybe_delegating_pk: + digest.update(serialize_bool(True)) + digest.update(bytes(maybe_delegating_pk)) + else: + digest.update(serialize_bool(False)) + + if maybe_receiving_pk: + digest.update(serialize_bool(True)) + digest.update(bytes(maybe_receiving_pk)) + else: + digest.update(serialize_bool(False)) + + return digest + + +class SignatureDigest: + + def __init__(self, dst: bytes): + self._digest = Hash(dst) + + def update(self, value): + self._digest.update(value) + + def sign(self, sk: SecretKey) -> Signature: + signer = Signer(sk, hashes.SHA256) + return signer.sign_digest(self._digest) + + def verify(self, pk: PublicKey, sig: Signature): + return sig.verify_digest(pk, self._digest, hashes.SHA256) + + +def unsafe_hash_to_point(dst: bytes, data: bytes) -> CurvePoint: """ Hashes arbitrary data into a valid EC point of the specified curve, using the try-and-increment method. diff --git a/umbral/key_frag.py b/umbral/key_frag.py new file mode 100644 index 00000000..35181559 --- /dev/null +++ b/umbral/key_frag.py @@ -0,0 +1,259 @@ +import os +from typing import Tuple, List, Optional + +from .curve_point import CurvePoint +from .curve_scalar import CurveScalar +from .hashing import hash_to_shared_secret, hash_to_cfrag_signature, hash_to_polynomial_arg +from .keys import PublicKey, SecretKey +from .params import PARAMETERS +from .serializable import Serializable, serialize_bool, take_bool +from .signing import Signature + + +class KeyFragID(Serializable): + + _ID_SIZE = 32 + + def __init__(self, id_: bytes): + self._id = id_ + + @classmethod + def random(cls) -> 'KeyFragID': + return cls(os.urandom(cls._ID_SIZE)) + + @classmethod + def __take__(cls, data): + id_, remainder = cls.__take_bytes__(data, cls._ID_SIZE) + return cls(id_), remainder + + def __bytes__(self): + return self._id + + +class KeyFragProof(Serializable): + + @classmethod + def from_factory(cls, + factory: 'KeyFragFactory', + kfrag_id: KeyFragID, + kfrag_key: CurveScalar, + sign_delegating_key: bool, + sign_receiving_key: bool, + ) -> 'KeyFragProof': + + params = PARAMETERS + + kfrag_precursor = factory.precursor + signing_sk = factory.signing_sk + delegating_pk = factory.delegating_pk + receiving_pk = factory.receiving_pk + + commitment = params.u * kfrag_key + + signature_for_bob = hash_to_cfrag_signature(kfrag_id, + commitment, + kfrag_precursor, + delegating_pk, + receiving_pk, + ).sign(signing_sk) + + maybe_delegating_pk = delegating_pk if sign_delegating_key else None + maybe_receiving_pk = receiving_pk if sign_receiving_key else None + signature_for_proxy = hash_to_cfrag_signature(kfrag_id, + commitment, + kfrag_precursor, + maybe_delegating_pk, + maybe_receiving_pk + ).sign(signing_sk) + + return cls(commitment, + signature_for_proxy, + signature_for_bob, + sign_delegating_key, + sign_receiving_key) + + def __init__(self, + commitment: CurvePoint, + signature_for_proxy: Signature, + signature_for_bob: Signature, + delegating_key_signed: bool, + receiving_key_signed: bool + ): + + self.commitment = commitment + self.signature_for_proxy = signature_for_proxy + self.signature_for_bob = signature_for_bob + self.delegating_key_signed = delegating_key_signed + self.receiving_key_signed = receiving_key_signed + + @classmethod + def __take__(cls, data): + types = [CurvePoint, Signature, Signature] + (commitment, sig_proxy, sig_bob), data = cls.__take_types__(data, *types) + delegating_key_signed, data = take_bool(data) + receiving_key_signed, data = take_bool(data) + + obj = cls(commitment, sig_proxy, sig_bob, delegating_key_signed, receiving_key_signed) + return obj, data + + def __bytes__(self): + return (bytes(self.commitment) + + bytes(self.signature_for_proxy) + + bytes(self.signature_for_bob) + + serialize_bool(self.delegating_key_signed) + + serialize_bool(self.receiving_key_signed) + ) + + +# Coefficients of the generating polynomial +def poly_eval(coeffs: List[CurveScalar], x: CurveScalar) -> CurveScalar: + result = coeffs[-1] + for coeff in reversed(coeffs[:-1]): + result = (result * x) + coeff + return result + + +class KeyFrag(Serializable): + + def __init__(self, + id_: KeyFragID, + key: CurveScalar, + precursor: CurvePoint, + proof: KeyFragProof): + self.id = id_ + self.key = key + self.precursor = precursor + self.proof = proof + + @classmethod + def __take__(cls, data): + types = [KeyFragID, CurveScalar, CurvePoint, KeyFragProof] + components, data = cls.__take_types__(data, *types) + return cls(*components), data + + def __bytes__(self): + return bytes(self.id) + bytes(self.key) + bytes(self.precursor) + bytes(self.proof) + + @classmethod + def from_factory(cls, + factory: 'KeyFragFactory', + sign_delegating_key: bool, + sign_receiving_key: bool, + ) -> 'KeyFrag': + + kfrag_id = KeyFragID.random() + + # The index of the re-encryption key share (which in Shamir's Secret + # Sharing corresponds to x in the tuple (x, f(x)), with f being the + # generating polynomial), is used to prevent reconstruction of the + # re-encryption key without Bob's intervention + share_index = hash_to_polynomial_arg( + factory.precursor, + factory.bob_pubkey_point, + factory.dh_point, + kfrag_id, + ) + + # The re-encryption key share is the result of evaluating the generating + # polynomial for the index value + rk = poly_eval(factory.coefficients, share_index) + + proof = KeyFragProof.from_factory(factory, + kfrag_id, + rk, + sign_delegating_key, + sign_receiving_key, + ) + + return cls(kfrag_id, rk, factory.precursor, proof) + + def verify(self, + signing_pk: PublicKey, + maybe_delegating_pk: Optional[PublicKey] = None, + maybe_receiving_pk: Optional[PublicKey] = None, + ) -> bool: + + u = PARAMETERS.u + + kfrag_id = self.id + key = self.key + commitment = self.proof.commitment + precursor = self.precursor + + # We check that the commitment is well-formed + if commitment != u * key: + return False + + # A shortcut, perhaps not necessary + delegating_key_missing = self.proof.delegating_key_signed and not bool(maybe_delegating_pk) + receiving_key_missing = self.proof.receiving_key_signed and not bool(maybe_receiving_pk) + + if delegating_key_missing or receiving_key_missing: + return False + + maybe_delegating_pk = maybe_delegating_pk if self.proof.delegating_key_signed else None + maybe_receiving_pk = maybe_receiving_pk if self.proof.receiving_key_signed else None + sig = hash_to_cfrag_signature(kfrag_id, + commitment, + precursor, + maybe_delegating_pk, + maybe_receiving_pk) + return sig.verify(signing_pk, self.proof.signature_for_proxy) + + +class KeyFragFactory: + + def __init__(self, + delegating_sk: SecretKey, + receiving_pk: PublicKey, + signing_sk: SecretKey, + threshold: int, + ): + + g = CurvePoint.generator() + + delegating_pk = PublicKey.from_secret_key(delegating_sk) + + bob_pubkey_point = receiving_pk.point() + + while True: + # The precursor point is used as an ephemeral public key in a DH key exchange, + # and the resulting shared secret 'dh_point' is used to derive other secret values + private_precursor = CurveScalar.random_nonzero(secure=True) + precursor = g * private_precursor + + dh_point = bob_pubkey_point * private_precursor + + # Secret value 'd' allows to make Umbral non-interactive + d = hash_to_shared_secret(precursor, bob_pubkey_point, dh_point) + + # At the moment we cannot statically ensure `d` is not zero, + # but we need it to be non-zero for the algorithm to work. + if not d.is_zero(): + break + + # Coefficients of the generating polynomial + # `invert()` is guaranteed to work because `d` is nonzero. + coefficients = [ + delegating_sk.secret_scalar() * d.invert(), + *[CurveScalar.random_nonzero() for _ in range(threshold-1)]] + + self.signing_sk = signing_sk + self.precursor = precursor + self.bob_pubkey_point = bob_pubkey_point + self.dh_point = dh_point + self.delegating_pk = delegating_pk + self.receiving_pk = receiving_pk + self.coefficients = coefficients + + +def generate_kfrags(delegating_sk: SecretKey, + receiving_pk: PublicKey, + signing_sk: SecretKey, + threshold: int, + num_kfrags: int, + sign_delegating_key: bool = True, + sign_receiving_key: bool = True, + ) -> List[KeyFrag]: + factory = KeyFragFactory(delegating_sk, receiving_pk, signing_sk, threshold) + return [KeyFrag.from_factory(factory, sign_delegating_key, sign_receiving_key) for _ in range(num_kfrags)] diff --git a/umbral/serializable.py b/umbral/serializable.py index 70e93cbb..05934834 100644 --- a/umbral/serializable.py +++ b/umbral/serializable.py @@ -35,3 +35,18 @@ def __take__(cls: Type[T], data: bytes) -> Tuple[T, bytes]: @abstractmethod def __bytes__(self): raise NotImplementedError + + +def serialize_bool(b: bool) -> bytes: + return b'\x01' if b else b'\x00' + + +def take_bool(data: bytes) -> Tuple[bool, bytes]: + bool_bytes, data = Serializable.__take_bytes__(data, 1) + if bool_bytes == b'\x01': + b = True + elif bool_bytes == b'\x00': + b = False + else: + raise ValueError(f"Incorrectly serialized boolean; expected b\\x00' or b\\x01', got {b}") + return b, data diff --git a/umbral/signing.py b/umbral/signing.py new file mode 100644 index 00000000..c7b5f885 --- /dev/null +++ b/umbral/signing.py @@ -0,0 +1,73 @@ +import hmac +from typing import Optional, Type + +from cryptography.hazmat.backends.openssl import backend +from cryptography.exceptions import InvalidSignature +from cryptography.hazmat.primitives.asymmetric import utils +from cryptography.hazmat.primitives.asymmetric.ec import ECDSA + + +from .curve import CURVE +from .curve_scalar import CurveScalar +from .keys import PublicKey, SecretKey +from .serializable import Serializable + + +class Signature(Serializable): + """ + Wrapper for ECDSA signatures. + We store signatures as r and s; this class allows interoperation + between (r, s) and DER formatting. + """ + + def __init__(self, r: CurveScalar, s: CurveScalar): + self.r = r + self.s = s + + def __repr__(self): + return f"ECDSA Signature: {bytes(self).hex()[:15]}" + + def verify_digest(self, verifying_key: PublicKey, digest: 'Hash', backend_hash_algorithm) -> bool: + cryptography_pub_key = verifying_key.to_cryptography_pubkey() + signature_algorithm = ECDSA(utils.Prehashed(backend_hash_algorithm())) + message = digest.finalize() + signature_der_bytes = utils.encode_dss_signature(int(self.r), int(self.s)) + + # TODO: Raise error instead of returning boolean + try: + cryptography_pub_key.verify(signature=signature_der_bytes, + data=message, + signature_algorithm=signature_algorithm) + except InvalidSignature: + return False + return True + + @classmethod + def __take__(cls, data): + (r, s), data = cls.__take_types__(data, CurveScalar, CurveScalar) + return cls(r, s), data + + def __bytes__(self): + return bytes(self.r) + bytes(self.s) + + +class Signer: + + def __init__(self, sk: SecretKey, backend_hash_algorithm) -> None: + self.__cryptography_private_key = sk.to_cryptography_privkey() + self.backend_hash_algorithm = backend_hash_algorithm + + def sign_digest(self, digest: 'Hash') -> Signature: + signature_algorithm = ECDSA(utils.Prehashed(self.backend_hash_algorithm())) + message = digest.finalize() + + signature_der_bytes = self.__cryptography_private_key.sign(message, signature_algorithm) + r, s = utils.decode_dss_signature(signature_der_bytes) + + # Normalize s + # s is public, so no constant-timeness required here + order = backend._bn_to_int(CURVE.order) + if s > (order >> 1): + s = order - s + + return Signature(CurveScalar.from_int(r), CurveScalar.from_int(s))