Skip to content

Commit

Permalink
Merge pull request #14 from Indicio-tech/feature/authlib
Browse files Browse the repository at this point in the history
feat: Add Authlib backend and test compatibility with Askar
  • Loading branch information
dbluhm authored Nov 13, 2023
2 parents ca755c4 + 44f7472 commit 675a4c6
Show file tree
Hide file tree
Showing 16 changed files with 926 additions and 227 deletions.
95 changes: 3 additions & 92 deletions didcomm_messaging/crypto/__init__.py
Original file line number Diff line number Diff line change
@@ -1,94 +1,5 @@
"""Key Management Service (CryptoService) interface for DIDComm Messaging."""
"""DIDComm Messaging Cryptography and Secrets Interfaces."""

from .base import CryptoService, SecretsManager, PublicKey, SecretKey, P, S

from abc import ABC, abstractmethod
from typing import Generic, Optional, Sequence, TypeVar, Union

from pydid import VerificationMethod

from .jwe import JweEnvelope


class CryptoServiceError(Exception):
"""Represents an error from a CryptoService."""


class PublicKey(ABC):
"""Key representation for CryptoService."""

@classmethod
@abstractmethod
def from_verification_method(cls, vm: VerificationMethod) -> "PublicKey":
"""Create a Key instance from a DID Document Verification Method."""

@property
@abstractmethod
def kid(self) -> str:
"""Get the key ID."""

@property
@abstractmethod
def multikey(self) -> str:
"""Get the key in multikey format."""


class SecretKey(ABC):
"""Secret Key Type."""

@property
@abstractmethod
def kid(self) -> str:
"""Get the key ID."""


P = TypeVar("P", bound=PublicKey)
S = TypeVar("S", bound=SecretKey)


class CryptoService(ABC, Generic[P, S]):
"""Key Management Service (CryptoService) interface for DIDComm Messaging."""

@abstractmethod
async def ecdh_es_encrypt(self, to_keys: Sequence[P], message: bytes) -> bytes:
"""Encode a message into DIDComm v2 anonymous encryption."""

@abstractmethod
async def ecdh_es_decrypt(
self, wrapper: Union[JweEnvelope, str, bytes], recip_key: S
) -> bytes:
"""Decode a message from DIDComm v2 anonymous encryption."""

@abstractmethod
async def ecdh_1pu_encrypt(
self,
to_keys: Sequence[P],
sender_key: S,
message: bytes,
) -> bytes:
"""Encode a message into DIDComm v2 authenticated encryption."""

@abstractmethod
async def ecdh_1pu_decrypt(
self,
wrapper: Union[JweEnvelope, str, bytes],
recip_key: S,
sender_key: P,
) -> bytes:
"""Decode a message from DIDComm v2 authenticated encryption."""

@classmethod
@abstractmethod
def verification_method_to_public_key(cls, vm: VerificationMethod) -> P:
"""Convert a verification method to a public key."""


class SecretsManager(ABC, Generic[S]):
"""Secrets Resolver interface.
Thie secrets resolver may be used to supplement the CryptoService backend to provide
greater flexibility.
"""

@abstractmethod
async def get_secret_by_kid(self, kid: str) -> Optional[S]:
"""Get a secret key by its ID."""
__all__ = ["CryptoService", "SecretsManager", "PublicKey", "SecretKey", "P", "S"]
1 change: 1 addition & 0 deletions didcomm_messaging/crypto/backend/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Cryptography and Secrets Management backends."""
Original file line number Diff line number Diff line change
@@ -1,23 +1,19 @@
"""Askar backend for DIDComm Messaging."""
from collections import OrderedDict
import hashlib
import json
from typing import Optional, Sequence, Union
import hashlib

from pydid import VerificationMethod
from didcomm_messaging.crypto import SecretsManager
from ..jwe import (
JweBuilder,
JweEnvelope,
JweRecipient,
b64url,
)
from didcomm_messaging.crypto import (

from didcomm_messaging.crypto.base import (
CryptoService,
CryptoServiceError,
PublicKey,
SecretKey,
SecretsManager,
)
from didcomm_messaging.crypto.jwe import JweBuilder, JweEnvelope, JweRecipient, b64url
from didcomm_messaging.multiformats import multibase, multicodec

try:
Expand Down Expand Up @@ -75,44 +71,6 @@ def multikey_to_key(cls, multikey: str) -> Key:
except AskarError as err:
raise ValueError("Invalid key") from err

@classmethod
def _expected_alg_and_material_to_key(
cls,
alg: KeyAlg,
public_key_multibase: Optional[str] = None,
public_key_base58: Optional[str] = None,
) -> Key:
"""Convert an Ed25519 key to an Askar Key instance."""
if public_key_multibase and public_key_base58:
raise ValueError(
"Only one of public_key_multibase or public_key_base58 must be given"
)
if not public_key_multibase and not public_key_base58:
raise ValueError(
"One of public_key_multibase or public_key_base58 must be given)"
)

if public_key_multibase:
decoded = multibase.decode(public_key_multibase)
if len(decoded) == 32:
# No multicodec prefix
try:
key = Key.from_public_bytes(alg, decoded)
except AskarError as err:
raise ValueError("Invalid key") from err
return key
else:
key = cls.multikey_to_key(public_key_multibase)
if key.algorithm != alg:
raise ValueError("Type and algorithm mismatch")
return key

if public_key_base58:
decoded = multibase.decode("z" + public_key_base58)
return Key.from_public_bytes(alg, decoded)

raise ValueError("Failed to parse key")

@classmethod
def from_verification_method(cls, vm: VerificationMethod) -> "AskarKey":
"""Create a Key instance from a DID Document Verification Method."""
Expand All @@ -133,11 +91,8 @@ def from_verification_method(cls, vm: VerificationMethod) -> "AskarKey":
if not alg:
raise ValueError("Unsupported verification method type: {vm_type}")

base58 = vm.public_key_base58
multi = vm.public_key_multibase
key = cls._expected_alg_and_material_to_key(
alg, public_key_base58=base58, public_key_multibase=multi
)
key_bytes = cls.key_bytes_from_verification_method(vm)
key = Key.from_public_bytes(alg, key_bytes)
return cls(key, kid)

@property
Expand Down Expand Up @@ -187,26 +142,37 @@ async def ecdh_es_encrypt(
except AskarError:
raise CryptoServiceError("Error creating content encryption key")

apv = []
for recip_key in to_keys:
apv.append(recip_key.kid)
apv.sort()
apv = hashlib.sha256((".".join(apv)).encode()).digest()

for recip_key in to_keys:
try:
epk = Key.generate(recip_key.key.algorithm, ephemeral=True)
except AskarError:
raise CryptoServiceError("Error creating ephemeral key")
enc_key = ecdh.EcdhEs(alg_id, None, None).sender_wrap_key( # type: ignore
enc_key = ecdh.EcdhEs(alg_id, None, apv).sender_wrap_key( # type: ignore
wrap_alg, epk, recip_key.key, cek
)
builder.add_recipient(
JweRecipient(
encrypted_key=enc_key.ciphertext,
header={"kid": recip_key.kid, "epk": epk.get_jwk_public()},
header={
"kid": recip_key.kid,
"epk": json.loads(epk.get_jwk_public()),
},
)
)

builder.set_protected(
OrderedDict(
[
("typ", "application/didcomm-encrypted+json"),
("alg", alg_id),
("enc", enc_id),
("apv", b64url(apv)),
]
)
)
Expand All @@ -220,14 +186,13 @@ async def ecdh_es_encrypt(

async def ecdh_es_decrypt(
self,
wrapper: Union[JweEnvelope, str, bytes],
enc_message: Union[str, bytes],
recip_key: AskarSecretKey,
) -> bytes:
"""Decode a message from DIDComm v2 anonymous encryption."""
if isinstance(wrapper, bytes):
wrapper = wrapper.decode("utf-8")
if not isinstance(wrapper, JweEnvelope):
wrapper = JweEnvelope.from_json(wrapper)
if isinstance(enc_message, bytes):
wrapper = enc_message.decode("utf-8")
wrapper = JweEnvelope.from_json(enc_message)

alg_id = wrapper.protected.get("alg")

Expand Down Expand Up @@ -263,12 +228,10 @@ async def ecdh_es_decrypt(
except AskarError:
raise CryptoServiceError("Error loading ephemeral key")

apu = recip.header.get("apu")
apv = recip.header.get("apv")
# apu and apv are allowed to be None

try:
cek = ecdh.EcdhEs(alg_id, apu, apv).receiver_unwrap_key( # type: ignore
cek = ecdh.EcdhEs(
alg_id, None, wrapper.apv_bytes
).receiver_unwrap_key( # type: ignore
wrap_alg,
enc_alg,
epk,
Expand Down Expand Up @@ -318,7 +281,7 @@ async def ecdh_1pu_encrypt(
except AskarError:
raise CryptoServiceError("Error creating ephemeral key")

apu = b64url(sender_key.kid)
apu = sender_key.kid
apv = []
for recip_key in to_keys:
if agree_alg:
Expand All @@ -328,15 +291,15 @@ async def ecdh_1pu_encrypt(
agree_alg = recip_key.key.algorithm
apv.append(recip_key.kid)
apv.sort()
apv = b64url(hashlib.sha256((".".join(apv)).encode()).digest())
apv = hashlib.sha256((".".join(apv)).encode()).digest()

builder.set_protected(
OrderedDict(
[
("alg", alg_id),
("enc", enc_id),
("apu", apu),
("apv", apv),
("apu", b64url(apu)),
("apv", b64url(apv)),
("epk", json.loads(epk.get_jwk_public())),
("skid", sender_key.kid),
]
Expand All @@ -362,15 +325,14 @@ async def ecdh_1pu_encrypt(

async def ecdh_1pu_decrypt(
self,
wrapper: Union[JweEnvelope, str, bytes],
enc_message: Union[str, bytes],
recip_key: AskarSecretKey,
sender_key: AskarKey,
):
"""Decode a message from DIDComm v2 authenticated encryption."""
if isinstance(wrapper, bytes):
wrapper = wrapper.decode("utf-8")
if not isinstance(wrapper, JweEnvelope):
wrapper = JweEnvelope.from_json(wrapper)
if isinstance(enc_message, bytes):
wrapper = enc_message.decode("utf-8")
wrapper = JweEnvelope.from_json(enc_message)

alg_id = wrapper.protected.get("alg")
if alg_id and alg_id in ("ECDH-1PU+A128KW", "ECDH-1PU+A256KW"):
Expand All @@ -397,12 +359,10 @@ async def ecdh_1pu_decrypt(
except AskarError:
raise CryptoServiceError("Error loading ephemeral key")

apu = wrapper.protected.get("apu")
apv = wrapper.protected.get("apv")
# apu and apv are allowed to be None

try:
cek = ecdh.Ecdh1PU(alg_id, apu, apv).receiver_unwrap_key( # type: ignore
cek = ecdh.Ecdh1PU(
alg_id, wrapper.apu_bytes, wrapper.apv_bytes
).receiver_unwrap_key( # type: ignore
wrap_alg,
enc_alg,
epk,
Expand Down
Loading

0 comments on commit 675a4c6

Please sign in to comment.