diff --git a/mypy.ini b/mypy.ini new file mode 100644 index 0000000..cb18a74 --- /dev/null +++ b/mypy.ini @@ -0,0 +1,11 @@ +[mypy] +[mypy-cbor2] +ignore_missing_imports = True +[mypy-pycose.*] +ignore_missing_imports = True +[mypy-ccf.*] +ignore_missing_imports = True +[mypy-aiotools.*] +ignore_missing_imports = True +[mypy-setuptools.*] +ignore_missing_imports = True diff --git a/pyscitt/pyscitt/cli/create_did_web.py b/pyscitt/pyscitt/cli/create_did_web.py index ca59666..4cc2229 100644 --- a/pyscitt/pyscitt/cli/create_did_web.py +++ b/pyscitt/pyscitt/cli/create_did_web.py @@ -13,25 +13,6 @@ DID_FILENAME = "did.json" -def get_did_web_doc_url_from_did(did: str) -> str: - rest = did.replace(DID_WEB_PREFIX, "") - try: - rest.index(":") - except: - return ( - DID_WEB_DOC_URL_PREFIX - + rest.replace(ENCODED_COLON, ":") - + DID_WEB_DOC_WELLKNOWN_PATH - + DID_WEB_DOC_URL_SUFFIX - ) - else: - return ( - DID_WEB_DOC_URL_PREFIX - + rest.replace(":", "/").replace(ENCODED_COLON, ":") - + DID_WEB_DOC_URL_SUFFIX - ) - - def write_file(path: Path, contents: str): print(f"Writing {path}") path.write_text(contents) @@ -46,6 +27,7 @@ def create_did_web( ): parsed = urlsplit(base_url) + assert parsed.hostname did = format_did_web( host=parsed.hostname, port=parsed.port, path=parsed.path.lstrip("/") ) diff --git a/pyscitt/pyscitt/cli/prefix_tree.py b/pyscitt/pyscitt/cli/prefix_tree.py index 0910896..65dc026 100644 --- a/pyscitt/pyscitt/cli/prefix_tree.py +++ b/pyscitt/pyscitt/cli/prefix_tree.py @@ -25,7 +25,7 @@ def prefix_tree_debug(client: Client): def prefix_tree_get_receipt( client: Client, - claim_path: Optional[str], + claim_path: Optional[Path], issuer: Optional[str], feed: Optional[str], output: Optional[Path], diff --git a/pyscitt/pyscitt/cli/sign_claims.py b/pyscitt/pyscitt/cli/sign_claims.py index a9a5630..329e355 100644 --- a/pyscitt/pyscitt/cli/sign_claims.py +++ b/pyscitt/pyscitt/cli/sign_claims.py @@ -53,11 +53,11 @@ def value(self) -> crypto.RegistrationInfoValue: else: data = self.content.encode("ascii") - if self.type == RegistrationInfoType.INT: + if self.type is RegistrationInfoType.INT: return int(data.decode("utf-8")) - elif self.type == RegistrationInfoType.TEXT: + elif self.type is RegistrationInfoType.TEXT: return data.decode("utf-8") - elif self.type == RegistrationInfoType.BYTES: + elif self.type is RegistrationInfoType.BYTES: return data diff --git a/pyscitt/pyscitt/cli/submit_signed_claims.py b/pyscitt/pyscitt/cli/submit_signed_claims.py index 4dac5b9..fd17d9f 100644 --- a/pyscitt/pyscitt/cli/submit_signed_claims.py +++ b/pyscitt/pyscitt/cli/submit_signed_claims.py @@ -1,6 +1,7 @@ # Copyright (c) Microsoft Corporation. # Licensed under the MIT License. +import argparse from pathlib import Path from typing import Optional @@ -23,15 +24,15 @@ def submit_signed_claimset( with open(path, "rb") as f: signed_claimset = f.read() - submission = client.submit_claim( - signed_claimset, skip_confirmation=skip_confirmation, decode=False - ) - - print(f"Submitted {path} as transaction {submission.tx}") if skip_confirmation: - print("Confirmation fo submission was skipped! Claim may not be registered.") + tx = client.submit_claim(signed_claimset, skip_confirmation=True).tx + print(f"Submitted {path} as transaction {tx}") + print("Confirmation of submission was skipped! Claim may not be registered.") return + submission = client.submit_claim(signed_claimset) + print(f"Submitted {path} as transaction {submission.tx}") + if receipt_path: with open(receipt_path, "wb") as f: f.write(submission.receipt) diff --git a/pyscitt/pyscitt/client.py b/pyscitt/pyscitt/client.py index 8a8a2a0..27a4c73 100644 --- a/pyscitt/pyscitt/client.py +++ b/pyscitt/pyscitt/client.py @@ -6,7 +6,7 @@ import time from dataclasses import dataclass from http import HTTPStatus -from typing import Iterable, Optional, Tuple, Union +from typing import Generic, Iterable, Literal, Optional, Tuple, TypeVar, Union, overload from urllib.parse import urlencode import httpx @@ -66,6 +66,9 @@ def __str__(self): return f"{self.code}: {self.message}" +SelfClient = TypeVar("SelfClient", bound="BaseClient") + + class BaseClient: """ Wrapper around an HTTP client, with facilities to interact with a CCF-based @@ -128,14 +131,14 @@ def __init__( base_url=url, headers=headers, verify=not development ) - def replace(self, **kwargs): + def replace(self: SelfClient, **kwargs) -> SelfClient: """ Create a new instance with certain parameters modified. Any parameters that weren't specified will be inherited from the current instance. The accepted keyword arguments are the same as those of the constructor. """ - values = { + values: dict = { "url": self.url, "auth_token": self.auth_token, "member_auth": self.member_auth, @@ -269,8 +272,11 @@ def get_historical(self, *args, retry_on=[], **kwargs): ) +T = TypeVar("T", bound=Optional[bytes], covariant=True) + + @dataclass -class Submission: +class Submission(Generic[T]): """ The result of submitting a claim to the service. The presence and format of the receipt is depends on arguments passed to the `submit_claim` @@ -278,7 +284,7 @@ class Submission: """ tx: str - receipt: Optional[Union[bytes, Receipt]] + receipt: T @property def seqno(self) -> int: @@ -307,9 +313,21 @@ def get_constitution(self) -> str: def get_version(self) -> dict: return self.get("/version").json() + @overload + def submit_claim( + self, claim: bytes, *, skip_confirmation: Literal[False] = False + ) -> Submission[bytes]: + ... + + @overload + def submit_claim( + self, claim: bytes, *, skip_confirmation: Literal[True] + ) -> Submission[None]: + ... + def submit_claim( - self, claim: bytes, *, skip_confirmation=False, decode=True - ) -> Submission: + self, claim: bytes, *, skip_confirmation=False + ) -> Union[Submission[bytes], Submission[None]]: headers = {"Content-Type": "application/cose"} response = self.post( "/entries", @@ -324,7 +342,7 @@ def submit_claim( if skip_confirmation: return Submission(tx, None) else: - receipt = self.get_receipt(tx, decode=decode) + receipt = self.get_receipt(tx, decode=False) return Submission(tx, receipt) def get_claim(self, tx: str, *, embed_receipt=False) -> bytes: @@ -333,6 +351,14 @@ def get_claim(self, tx: str, *, embed_receipt=False) -> bytes: ) return response.content + @overload + def get_receipt(self, tx: str, *, decode: Literal[True] = True) -> Receipt: + ... + + @overload + def get_receipt(self, tx: str, *, decode: Literal[False]) -> bytes: + ... + def get_receipt(self, tx: str, *, decode=True) -> Union[bytes, Receipt]: response = self.get_historical(f"/entries/{tx}/receipt") if decode: diff --git a/pyscitt/pyscitt/crypto.py b/pyscitt/pyscitt/crypto.py index f78473d..836f391 100644 --- a/pyscitt/pyscitt/crypto.py +++ b/pyscitt/pyscitt/crypto.py @@ -7,7 +7,7 @@ import json import warnings from pathlib import Path -from typing import Dict, Optional, Tuple, Union +from typing import Dict, List, Optional, Tuple, Union from uuid import uuid4 warnings.filterwarnings("ignore", category=Warning) @@ -174,7 +174,12 @@ def generate_cert( if not cn: cn = str(uuid4()) subject_priv = load_pem_private_key(private_key_pem.encode("ascii"), None) + assert isinstance( + subject_priv, (RSAPrivateKey, EllipticCurvePrivateKey, Ed25519PrivateKey) + ) + subject_pub_key = subject_priv.public_key() + subject = x509.Name( [ x509.NameAttribute(NameOID.COMMON_NAME, cn), @@ -196,6 +201,9 @@ def generate_cert( issuer_private_key_pem.encode("ascii"), None, ) + assert isinstance( + issuer_priv_key, (RSAPrivateKey, EllipticCurvePrivateKey, Ed25519PrivateKey) + ) else: issuer_priv_key = subject_priv cert = ( @@ -234,15 +242,6 @@ def get_pub_key_type(pub_pem: str) -> str: raise NotImplementedError("unsupported key type") -def get_cert_key_type(cert_pem: str) -> str: - cert = load_pem_x509_certificate(cert_pem.encode("ascii")) - if isinstance(cert.public_key(), RSAPublicKey): - return "rsa" - elif isinstance(cert.public_key(), EllipticCurvePublicKey): - return "ec" - raise NotImplementedError("unsupported key type") - - def get_cert_info(pem: str) -> dict: cert = load_pem_x509_certificate(pem.encode("ascii")) cn = cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value @@ -323,13 +322,15 @@ def default_algorithm_for_private_key(key_pem: Pem) -> str: def verify_cose_sign1(buf: bytes, cert_pem: str): - key_type = get_cert_key_type(cert_pem) cert = load_pem_x509_certificate(cert_pem.encode("ascii")) key = cert.public_key() - if key_type == "rsa": + if isinstance(key, RSAPublicKey): cose_key = from_cryptography_rsakey_obj(key) - else: + elif isinstance(key, EllipticCurvePublicKey): cose_key = from_cryptography_eckey_obj(key) + else: + raise NotImplementedError("unsupported key type") + msg = Sign1Message.decode(buf) msg.key = cose_key if not msg.verify_signature(): @@ -386,7 +387,7 @@ def pretty_cose_sign1(buf: bytes) -> str: # temporary, from https://github.com/BrianSipos/pycose/blob/rsa_keys_algs/cose/keys/rsa.py # until https://github.com/TimothyClaeys/pycose/issues/44 is implemented -def from_cryptography_rsakey_obj(ext_key) -> RSAKey: +def from_cryptography_rsakey_obj(ext_key: Union[RSAPrivateKey, RSAPublicKey]) -> RSAKey: """ Returns an initialized COSE Key object of type RSAKey. :param ext_key: Python cryptography key. @@ -426,7 +427,9 @@ def to_bstr(dec): return RSAKey.from_dict(cose_key) -def from_cryptography_eckey_obj(ext_key) -> EC2Key: +def from_cryptography_eckey_obj( + ext_key: Union[EllipticCurvePrivateKey, EllipticCurvePublicKey] +) -> EC2Key: """ Returns an initialized COSE Key object of type EC2Key. :param ext_key: Python cryptography key. @@ -463,7 +466,9 @@ def from_cryptography_eckey_obj(ext_key) -> EC2Key: return EC2Key.from_dict(cose_key) -def from_cryptography_ed25519key_obj(ext_key) -> OKPKey: +def from_cryptography_ed25519key_obj( + ext_key: Union[Ed25519PrivateKey, Ed25519PublicKey] +) -> OKPKey: """ Returns an initialized COSE Key object of type OKPKey. :param ext_key: Python cryptography key. @@ -654,7 +659,7 @@ class Signer: issuer: Optional[str] kid: Optional[str] algorithm: str - x5c: Optional[list] + x5c: Optional[List[Pem]] def __init__( self, @@ -662,7 +667,7 @@ def __init__( issuer: Optional[str] = None, kid: Optional[str] = None, algorithm: Optional[str] = None, - x5c: Optional[str] = None, + x5c: Optional[List[Pem]] = None, ): """ If no algorithm is specified, a sensible default is inferred from the private key. @@ -682,7 +687,7 @@ def sign_claimset( feed: Optional[str] = None, registration_info: RegistrationInfo = {}, ) -> bytes: - headers = {} + headers: dict = {} headers[pycose.headers.Algorithm] = signer.algorithm headers[pycose.headers.ContentType] = content_type @@ -704,7 +709,7 @@ def sign_claimset( def sign_json_claimset( signer: Signer, - claims: json, + claims: dict, content_type: str = "application/vnd.dummy+json", feed: Optional[str] = None, ) -> bytes: diff --git a/pyscitt/pyscitt/prefix_tree.py b/pyscitt/pyscitt/prefix_tree.py index 75137e2..d205277 100644 --- a/pyscitt/pyscitt/prefix_tree.py +++ b/pyscitt/pyscitt/prefix_tree.py @@ -62,13 +62,13 @@ def __post_init__(self): def hash(self, index: bytes, leaf: bytes) -> bytes: positions = bitvector(self.positions) hashes = reversed(self.hashes) - index = bitvector(index) + index_bits = bitvector(index) current = leaf for i in reversed(range(256)): if positions[i]: - node = hashlib.sha256(index.prefix(i)) - if index[i]: + node = hashlib.sha256(index_bits.prefix(i)) + if index_bits[i]: node.update(next(hashes)) node.update(current) else: diff --git a/pyscitt/pyscitt/py.typed b/pyscitt/pyscitt/py.typed new file mode 100644 index 0000000..e69de29 diff --git a/pyscitt/pyscitt/receipt.py b/pyscitt/pyscitt/receipt.py index 6c525ac..f2e006c 100644 --- a/pyscitt/pyscitt/receipt.py +++ b/pyscitt/pyscitt/receipt.py @@ -21,7 +21,7 @@ TREE_ALGORITHM_CCF = "CCF" -def hdr_as_dict(phdr: list) -> dict: +def hdr_as_dict(phdr: dict) -> dict: """ Return a representation of a list of COSE header parameters that is amenable to pretty-printing. @@ -58,7 +58,12 @@ def as_dict(self) -> dict: @classmethod def from_cose_obj(self, headers: dict, cose_obj: Any) -> "ReceiptContents": if headers.get(HEADER_PARAM_TREE_ALGORITHM) == TREE_ALGORITHM_CCF: - return CCFReceiptContents.from_cose_obj(cose_obj) + return CCFReceiptContents( + cose_obj[0], + cose_obj[1], + cose_obj[2], + LeafInfo.from_cose_obj(cose_obj[3]), + ) else: raise ValueError("unsupported tree algorithm, cannot decode receipt") @@ -70,12 +75,6 @@ class CCFReceiptContents(ReceiptContents): inclusion_proof: list leaf_info: LeafInfo - @classmethod - def from_cose_obj(cls, cose_obj: list) -> "ReceiptContents": - return cls( - cose_obj[0], cose_obj[1], cose_obj[2], LeafInfo.from_cose_obj(cose_obj[3]) - ) - def root(self, claims_digest: bytes) -> bytes: leaf = self.leaf_info.digest(claims_digest).hex() diff --git a/scripts/ci-checks.sh b/scripts/ci-checks.sh index 8259323..bb72798 100755 --- a/scripts/ci-checks.sh +++ b/scripts/ci-checks.sh @@ -30,7 +30,12 @@ then fi source scripts/venv/bin/activate -pip install --disable-pip-version-check -q -U black isort +pip install --disable-pip-version-check -q -U black isort mypy +pip install --disable-pip-version-check -q -e ./pyscitt +pip install --disable-pip-version-check -q -r test/requirements.txt + +echo "-- Python types" +git ls-files | grep -e '\.py$' | xargs mypy echo "-- Python imports" if [ $FIX -ne 0 ]; then diff --git a/test/infra/__init__.py b/test/infra/__init__.py new file mode 100644 index 0000000..59e481e --- /dev/null +++ b/test/infra/__init__.py @@ -0,0 +1,2 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. diff --git a/test/infra/x5chain_certificate_authority.py b/test/infra/x5chain_certificate_authority.py index 90d5464..4a1ad15 100644 --- a/test/infra/x5chain_certificate_authority.py +++ b/test/infra/x5chain_certificate_authority.py @@ -1,7 +1,10 @@ # Copyright (c) Microsoft Corporation. # Licensed under the MIT License. +from typing import List, Tuple + from pyscitt import crypto +from pyscitt.crypto import Pem class X5ChainCertificateAuthority: @@ -15,32 +18,25 @@ def __init__(self, host: str = "localhost", **kwargs): def cert_bundle(self) -> str: return self.root_cert_pem - def create_identity(self, x5c_len: int, **kwargs) -> crypto.Signer: + def create_identity(self, length: int, **kwargs) -> crypto.Signer: """ Create a new identity for x5c signer - """ algorithm = kwargs.pop("alg") - x5c, x5c_priv_key_list = self.create_x5c_node_pem( - self.root_key_pem, self.root_cert_pem, x5c_len, **kwargs - ) - return crypto.Signer(x5c_priv_key_list[0], algorithm=algorithm, x5c=x5c) - - def create_x5c_node_pem( - self, - root_priv_key_pem: crypto.Pem, - root_cert_pem: crypto.Pem, - x5c_len: int, - **kwargs - ): - x5c = [root_cert_pem] - x5c_priv_key_list = [root_priv_key_pem] - for i in range(1, x5c_len): - priv_key_pem, _ = crypto.generate_keypair(**kwargs) - ca = i < x5c_len - 1 + x5c, private_key = self.create_chain(length, **kwargs) + return crypto.Signer(private_key, algorithm=algorithm, x5c=x5c) + + def create_chain(self, length: int, **kwargs) -> Tuple[List[Pem], Pem]: + x5c = [self.root_cert_pem] + private_key = self.root_key_pem + + for i in range(length): + next_private_key, _ = crypto.generate_keypair(**kwargs) + ca = i < length cert_pem = crypto.generate_cert( - priv_key_pem, x5c[-1], x5c_priv_key_list[-1], ca=ca + next_private_key, x5c[-1], private_key, ca=ca ) x5c.append(cert_pem) - x5c_priv_key_list.append(priv_key_pem) - return x5c[::-1], x5c_priv_key_list[::-1] + private_key = next_private_key + + return x5c[::-1], private_key diff --git a/test/load_test/locustfile.py b/test/load_test/locustfile.py index 6a67975..23d5d37 100644 --- a/test/load_test/locustfile.py +++ b/test/load_test/locustfile.py @@ -1,5 +1,6 @@ # Copyright (c) Microsoft Corporation. # Licensed under the MIT License. +# type: ignore import random import subprocess diff --git a/test/load_test/start.sh b/test/load_test/start.sh deleted file mode 100755 index d5c5782..0000000 --- a/test/load_test/start.sh +++ /dev/null @@ -1,28 +0,0 @@ -#!/bin/bash -# Copyright (c) Microsoft Corporation. -# Licensed under the MIT License. - -set -e - -HEADLESS=${HEADLESS:-1} -CCF_URL=${CCF_URL:-"https://127.0.0.1:8000"} -NUM_USERS=${NUM_USERS:-100} -SPAWN_RATE=${SPAWN_RATE:-10} # per second - -if [ ! -f "venv/bin/activate" ]; then - python3.8 -m venv "venv" -fi -source venv/bin/activate -pip install --disable-pip-version-check -q -r test/requirements.txt -export PYTHONPATH="test/" - -if [ "$HEADLESS" -eq 1 ]; then - echo "Running in headless mode" - args="--headless -t 15" -else - echo "Running in interactive mode" - args="--autostart" -fi - -locust -f test/load_test/locustfile.py \ - -H "$CCF_URL" -u "$NUM_USERS" -r "$SPAWN_RATE" $args diff --git a/test/test_auth.py b/test/test_auth.py index 13ce412..cb1891b 100644 --- a/test/test_auth.py +++ b/test/test_auth.py @@ -7,7 +7,7 @@ from infra.jwt_issuer import JwtIssuer from pyscitt import crypto -from pyscitt.client import ServiceError +from pyscitt.client import Client, ServiceError class TestAuthentication: @@ -45,7 +45,7 @@ def f(*, allow_unauthenticated: bool, required_claims=None): return f @pytest.fixture - def submit(self, client, claims): + def submit(self, client: Client, claims): def f(**kwargs): client.replace(**kwargs).submit_claim(claims) diff --git a/test/test_ccf.py b/test/test_ccf.py index 5169c81..992952e 100644 --- a/test/test_ccf.py +++ b/test/test_ccf.py @@ -8,7 +8,7 @@ from infra.did_web_server import DIDWebServer from infra.x5chain_certificate_authority import X5ChainCertificateAuthority from pyscitt import crypto, governance -from pyscitt.client import ServiceError +from pyscitt.client import Client, ServiceError @pytest.mark.parametrize( @@ -23,7 +23,7 @@ {"alg": "EdDSA", "kty": "ed25519"}, ], ) -def test_submit_claim(client, did_web, trust_store, params): +def test_submit_claim(client: Client, did_web, trust_store, params): """ Submit claims to the SCITT CCF ledger and verify the resulting receipts. @@ -33,7 +33,7 @@ def test_submit_claim(client, did_web, trust_store, params): # Sign and submit a dummy claim using our new identity claims = crypto.sign_json_claimset(identity, {"foo": "bar"}) - receipt = client.submit_claim(claims, decode=False).receipt + receipt = client.submit_claim(claims).receipt crypto.verify_cose_with_receipt(claims, trust_store, receipt) @@ -42,21 +42,20 @@ def test_submit_claim(client, did_web, trust_store, params): @pytest.mark.parametrize( - "params", + "params,x5c_len", [ - {"alg": "PS384", "kty": "rsa", "x5c_len": 1}, - {"alg": "PS384", "kty": "rsa", "x5c_len": 2}, - {"alg": "PS384", "kty": "rsa", "x5c_len": 3}, - {"alg": "ES256", "kty": "ec", "ec_curve": "P-256", "x5c_len": 1}, + ({"alg": "PS384", "kty": "rsa"}, 1), + ({"alg": "PS384", "kty": "rsa"}, 2), + ({"alg": "PS384", "kty": "rsa"}, 3), + ({"alg": "ES256", "kty": "ec", "ec_curve": "P-256"}, 1), ], ) -def test_submit_claim_x5c(client, trust_store, params: dict): +def test_submit_claim_x5c(client: Client, trust_store, params: dict, x5c_len: int): """ Submit claims to the SCITT CCF ledger and verify the resulting receipts for x5c. Test is parametrized over different signing parameters. """ - x5c_len = params.pop("x5c_len") x5c_ca = X5ChainCertificateAuthority(**params) client.governance.propose( @@ -68,19 +67,27 @@ def test_submit_claim_x5c(client, trust_store, params: dict): # Sign and submit a dummy claim using our new identity claims = crypto.sign_json_claimset(identity, {"foo": "bar"}) - receipt = client.submit_claim(claims, decode=False).receipt + receipt = client.submit_claim(claims).receipt crypto.verify_cose_with_receipt(claims, trust_store, receipt) - # x5c chain missing cert - if x5c_len > 1: - identity.x5c = identity.x5c[1:] - claims = crypto.sign_json_claimset(identity, {"foo": "bar"}) - with pytest.raises(ServiceError, match="Signature verification failed"): - client.submit_claim(claims) +def test_invalid_x5c(client: Client, trust_store): + x5c_ca = X5ChainCertificateAuthority(alg="ES256", kty="ec") + client.governance.propose( + governance.set_ca_bundle_proposal("x509_roots", x5c_ca.cert_bundle), + must_pass=True, + ) + + # Create a signer with a missing certificate in the chain + x5c, private_key = x5c_ca.create_chain(length=3, kty="ec") + identity = crypto.Signer(private_key, x5c=x5c[1:]) + claims = crypto.sign_json_claimset(identity, {"foo": "bar"}) + + with pytest.raises(ServiceError, match="Signature verification failed"): + client.submit_claim(claims) -def test_default_did_port(client, trust_store, tmp_path): +def test_default_did_port(client: Client, trust_store, tmp_path): """ Submit a claim using a DID web server running on the default port 443. @@ -111,5 +118,5 @@ def test_default_did_port(client, trust_store, tmp_path): # Sign and submit a dummy claim using our new identity claims = crypto.sign_json_claimset(identity, {"foo": "bar"}) - receipt = client.submit_claim(claims, decode=False).receipt + receipt = client.submit_claim(claims).receipt crypto.verify_cose_with_receipt(claims, trust_store, receipt) diff --git a/test/test_configuration.py b/test/test_configuration.py index f355fad..e885e5c 100644 --- a/test/test_configuration.py +++ b/test/test_configuration.py @@ -8,7 +8,8 @@ from infra.did_web_server import DIDWebServer from pyscitt import crypto -from pyscitt.client import ServiceError +from pyscitt.client import Client, ServiceError +from pyscitt.receipt import Receipt class TestAcceptedAlgorithms: @@ -17,7 +18,7 @@ def not_allowed(self, f): f() @pytest.fixture - def submit(self, client, did_web: DIDWebServer): + def submit(self, client: Client, did_web: DIDWebServer): def f(**kwargs): """Sign and submit the claims with a new identity""" identity = did_web.create_identity(**kwargs) @@ -67,32 +68,32 @@ def identity(self, did_web): def claims(self, identity): return crypto.sign_json_claimset(identity, {"foo": "bar"}) - def test_reject_all_issuers(self, client, configure_service, claims): + def test_reject_all_issuers(self, client: Client, configure_service, claims): # Start with a configuration with no accepted issuers. # The service should reject anything we submit to it. configure_service({"policy": {"accepted_did_issuers": []}}) self.not_allowed(lambda: client.submit_claim(claims)) - def test_wrong_accepted_issuer(self, client, configure_service, claims): + def test_wrong_accepted_issuer(self, client: Client, configure_service, claims): # Add just one issuer to the policy. Claims signed not with this # issuer are rejected. configure_service({"policy": {"accepted_did_issuers": ["else"]}}) self.not_allowed(lambda: client.submit_claim(claims)) - def test_allow_any_issuer(self, client, configure_service, claims): + def test_allow_any_issuer(self, client: Client, configure_service, claims): # If no accepted_issuers are defined in the policy, any issuers # are accepted. configure_service({"policy": {}}) client.submit_claim(claims) - def test_valid_issuer(self, client, configure_service, identity, claims): + def test_valid_issuer(self, client: Client, configure_service, identity, claims): # Add just one issuer to the policy. Claims signed with this # issuer are accepted. configure_service({"policy": {"accepted_did_issuers": [identity.issuer]}}) client.submit_claim(claims) def test_multiple_accepted_issuers( - self, client, configure_service, identity, claims + self, client: Client, configure_service, identity, claims ): # Add multiple issuers to the policy. Claims signed with this # issuer are accepted. @@ -108,16 +109,16 @@ def claim(self, did_web): identity = did_web.create_identity() return crypto.sign_json_claimset(identity, {"foo": "bar"}) - def test_without_identifier(self, client, configure_service, claim): + def test_without_identifier(self, client: Client, configure_service, claim): configure_service({}) # By default, the service runs without a configured identity. # The receipts it returns have no issuer or kid. - receipt = client.submit_claim(claim).receipt + receipt = Receipt.decode(client.submit_claim(claim).receipt) assert crypto.COSE_HEADER_PARAM_ISSUER not in receipt.phdr assert pycose.headers.KID not in receipt.phdr - def test_with_identifier(self, client, configure_service, claim): + def test_with_identifier(self, client: Client, configure_service, claim): parameters = client.get_parameters() service_identifier = "did:web:ledger.example.com" configure_service({"service_identifier": service_identifier}) @@ -126,7 +127,7 @@ def test_with_identifier(self, client, configure_service, claim): # and kid. # Somewhat confusingly, what the old `/parameters` endpoint calls the # "service identity" is used as a KID in the receipts. - receipt = client.submit_claim(claim).receipt + receipt = Receipt.decode(client.submit_claim(claim).receipt) assert receipt.phdr[crypto.COSE_HEADER_PARAM_ISSUER] == service_identifier assert ( receipt.phdr[pycose.headers.KID].decode("ascii") == parameters["serviceId"] diff --git a/test/test_encoding.py b/test/test_encoding.py index 0efda0f..c92d99d 100644 --- a/test/test_encoding.py +++ b/test/test_encoding.py @@ -8,6 +8,7 @@ from pycose.messages import Sign1Message from pyscitt import crypto +from pyscitt.client import Client class TestNonCanonicalEncoding: @@ -48,23 +49,23 @@ def claim(self, did_web): message = [protected, dict(), payload, signature] return cbor2.dumps(cbor2.CBORTag(Sign1Message.cbor_tag, message)) - def test_submit_claim(self, client, trust_store, claim): + def test_submit_claim(self, client: Client, trust_store, claim): """The ledger should accept claims even if not canonically encoded.""" - client.submit_claim(claim, decode=False) + client.submit_claim(claim) @pytest.mark.xfail( reason="pycose does not preserve the original encoding (https://github.com/TimothyClaeys/pycose/pull/91)", raises=InvalidSignature, ) - def test_verify_receipt(self, client, trust_store, claim): + def test_verify_receipt(self, client: Client, trust_store, claim): """We should be able to verify the produced receipt.""" # Once the xfail is fixed, this test can be merged with test_submit_claim. - receipt = client.submit_claim(claim, decode=False).receipt + receipt = client.submit_claim(claim).receipt crypto.verify_cose_with_receipt(claim, trust_store, receipt) - def test_embed_receipt(self, client, trust_store, claim): + def test_embed_receipt(self, client: Client, trust_store, claim): """When embedding a receipt in a claim, the ledger should not affect the original encoding.""" - tx = client.submit_claim(claim, decode=False).tx + tx = client.submit_claim(claim).tx embedded = client.get_claim(tx, embed_receipt=True) original_pieces = cbor2.loads(claim).value diff --git a/test/test_historical.py b/test/test_historical.py index 6e1d32c..1a4df1b 100644 --- a/test/test_historical.py +++ b/test/test_historical.py @@ -6,17 +6,18 @@ import pytest from pyscitt import crypto +from pyscitt.client import Client class TestHistorical: @pytest.fixture(scope="class") - def submissions(self, did_web, client): + def submissions(self, client: Client, did_web): COUNT = 5 identity = did_web.create_identity() result = [] for i in range(COUNT): claim = crypto.sign_json_claimset(identity, {"value": i}) - submission = client.submit_claim(claim, decode=False) + submission = client.submit_claim(claim) result.append( SimpleNamespace( claim=claim, @@ -27,7 +28,7 @@ def submissions(self, did_web, client): ) return result - def test_enumerate_claims(self, client, submissions): + def test_enumerate_claims(self, client: Client, submissions): seqnos = list( client.enumerate_claims( start=submissions[0].seqno, end=submissions[-1].seqno @@ -38,17 +39,19 @@ def test_enumerate_claims(self, client, submissions): # If we did, we'd have to check for a sub-list instead. assert [s.tx for s in submissions] == seqnos - def test_get_receipt(self, client, trust_store, submissions): + def test_get_receipt(self, client: Client, trust_store, submissions): for s in submissions: receipt = client.get_receipt(s.tx, decode=False) crypto.verify_cose_with_receipt(s.claim, trust_store, receipt) - def test_get_claim(self, client, trust_store, submissions): + def test_get_claim(self, client: Client, trust_store, submissions): for s in submissions: claim = client.get_claim(s.tx) crypto.verify_cose_with_receipt(claim, trust_store, s.receipt) - def test_get_claim_with_embedded_receipt(self, client, trust_store, submissions): + def test_get_claim_with_embedded_receipt( + self, client: Client, trust_store, submissions + ): for s in submissions: claim = client.get_claim(s.tx, embed_receipt=True) crypto.verify_cose_with_receipt(claim, trust_store)