diff --git a/securesystemslib/signer/_hsm_signer.py b/securesystemslib/signer/_hsm_signer.py index fc83f01f..c4cd181a 100644 --- a/securesystemslib/signer/_hsm_signer.py +++ b/securesystemslib/signer/_hsm_signer.py @@ -6,7 +6,7 @@ """ import binascii from contextlib import contextmanager -from typing import Optional, Tuple +from typing import Dict, Iterator, List, Optional, Tuple from urllib import parse from securesystemslib import KEY_TYPE_ECDSA @@ -83,26 +83,35 @@ class HSMSigner(Signer): Supports signing schemes "ecdsa-sha2-nistp256" and "ecdsa-sha2-nistp384". - HSMSigner uses the first token it finds, if multiple tokens are available. They can - be instantiated with Signer.from_priv_key_uri(). These private key URI schemes are - supported: + HSMSigners should be instantiated with Signer.from_priv_key_uri() as in the usage + example below. + The private key URI scheme is: "hsm:?" where both KEYID and + FILTERS are optional. Example URIs: * "hsm:": - Sign with key on PIV digital signature slot 9c. + Sign with a key with default keyid 2 (PIV digital signature slot 9c) on the + only token/smartcard available. + * "hsm:2?label=YubiKey+PIV+%2315835999": + Sign with key with keyid 2 (PIV slot 9c) on a token with label + "YubiKey+PIV+%2315835999" Usage:: - # sign with PIV slot 9c, verify with existing public key + # Store public key and URI for your HSM device for later use. By default + # slot 9c is selected. + uri, pubkey = HSMSigner.import_() + + # later, use the uri and pubkey to sign def pin_handler(secret: str) -> str: return getpass(f"Enter {secret}: ") - signer = Signer.from_priv_key_uri("hsm:", public_key, pin_handler) + signer = Signer.from_priv_key_uri(uri, pubkey, pin_handler) sig = signer.sign(b"DATA") - - public_key.verify_signature(sig, b"DATA") + pubkey.verify_signature(sig, b"DATA") Arguments: hsm_keyid: Key identifier on the token. + token_filter: Dictionary of token field names and values public_key: The related public key instance. pin_handler: A function that returns the HSM user login pin, needed for signing. It receives the string argument "pin". @@ -120,7 +129,11 @@ def pin_handler(secret: str) -> str: SECRETS_HANDLER_MSG = "pin" def __init__( - self, hsm_keyid: int, public_key: Key, pin_handler: SecretsHandler + self, + hsm_keyid: int, + token_filter: Dict[str, str], + public_key: Key, + pin_handler: SecretsHandler, ): if CRYPTO_IMPORT_ERROR: raise UnsupportedLibraryError(CRYPTO_IMPORT_ERROR) @@ -133,22 +146,54 @@ def __init__( self._mechanism = _MECHANISM_FOR_SCHEME[public_key.scheme] self.hsm_keyid = hsm_keyid + self.token_filter = token_filter self.public_key = public_key self.pin_handler = pin_handler @staticmethod - @contextmanager - def _default_session(): - """Context manager to handle default HSM session on reader slot 1.""" + def _find_pkcs_slot(filters: Dict[str, str]) -> int: + """Return the PKCS slot with initialized token that matches filter + + Raises ValueError if more or less than 1 PKCS slot is found. + """ lib = PYKCS11LIB() - slots = lib.getSlotList(tokenPresent=True) - if not slots: - raise ValueError("could not find token") + slots: List[int] = [] + for slot in lib.getSlotList(tokenPresent=True): + tokeninfo = lib.getTokenInfo(slot) + if not tokeninfo.flags & PyKCS11.CKF_TOKEN_INITIALIZED: + # useful for tests (softhsm always has an unitialized token) + continue + + match = True + # all values in filters must match token fields + for key, value in filters.items(): + tokenvalue: str = getattr(tokeninfo, key, "").strip() + if tokenvalue != value: + match = False + + if match: + slots.append(slot) + + if len(slots) != 1: + raise ValueError( + f"Found {len(slots)} cryptographic tokens matching filter {filters}" + ) - session = lib.openSession(slots[0]) + return slots[0] + + @staticmethod + @contextmanager + def _get_session(filters: Dict[str, str]) -> Iterator["PyKCS11.Session"]: + """Context manager to handle a HSM session. + + The cryptographic token is selected by filtering by token info fields. + ValueError is raised if matching token is not found, or if more + than one are found. + """ + slot = HSMSigner._find_pkcs_slot(filters) + session = PYKCS11LIB().openSession(slot) try: yield session - finally: session.closeSession() @@ -192,22 +237,54 @@ def _find_key_values( return ECDomainParameters.load(bytes(params)), bytes(point) @classmethod - def import_(cls, hsm_keyid: Optional[int] = None) -> Tuple[str, SSlibKey]: + def _build_token_filter(cls) -> Dict[str, str]: + """Builds a token filter for the found cryptographic token. + + The filter will include 'label' if one is found on token. + + raises ValueError if less or more than 1 token is found + """ + + lib = PYKCS11LIB() + slot = cls._find_pkcs_slot({}) + tokeninfo = lib.getTokenInfo(slot) + + filters = {} + # other possible fields include manufacturerID, model and serialNumber + for key in ["label"]: + try: + filters[key] = getattr(tokeninfo, key).strip() + except AttributeError: + pass + + return filters + + @classmethod + def import_( + cls, + hsm_keyid: Optional[int] = None, + token_filter: Optional[Dict[str, str]] = None, + ) -> Tuple[str, SSlibKey]: """Import public key and signer details from HSM. + Either only one cryptographic token must be present when importing or a + token_filter that matches a single token must be provided. + Returns a private key URI (for Signer.from_priv_key_uri()) and a public key. import_() should be called once and the returned URI and public key should be stored for later use. Arguments: hsm_keyid: Key identifier on the token. Default is 2 (meaning PIV key slot 9c). + token_filter: Dictionary of token field names and values used to + filter the correct cryptographic token. If no filter is + provided one is built from the fields found on the token. Raises: UnsupportedLibraryError: ``PyKCS11``, ``cryptography`` or ``asn1crypto`` libraries not found. - ValueError: No compatible key for ``hsm_keyid`` found on HSM. + ValueError: A matching HSM device could not be found. PyKCS11.PyKCS11Error: Various HSM communication errors. - """ if CRYPTO_IMPORT_ERROR: raise UnsupportedLibraryError(CRYPTO_IMPORT_ERROR) @@ -221,7 +298,12 @@ def import_(cls, hsm_keyid: Optional[int] = None) -> Tuple[str, SSlibKey]: if hsm_keyid is None: hsm_keyid = cls.SCHEME_KEYID - with cls._default_session() as session: + if token_filter is None: + token_filter = cls._build_token_filter() + + uri = f"{cls.SCHEME}:{hsm_keyid}?{parse.urlencode(token_filter)}" + + with cls._get_session(token_filter) as session: params, point = cls._find_key_values(session, hsm_keyid) if params.chosen.native not in _CURVE_NAMES: @@ -247,7 +329,7 @@ def import_(cls, hsm_keyid: Optional[int] = None) -> Tuple[str, SSlibKey]: keyid = _get_keyid(KEY_TYPE_ECDSA, scheme, keyval) key = SSlibKey(keyid, KEY_TYPE_ECDSA, scheme, keyval) - return "hsm:", key + return uri, key @classmethod def from_priv_key_uri( @@ -264,10 +346,13 @@ def from_priv_key_uri( if uri.scheme != cls.SCHEME: raise ValueError(f"HSMSigner does not support {priv_key_uri}") + keyid = int(uri.path) if uri.path else cls.SCHEME_KEYID + token_filter = dict(parse.parse_qsl(uri.query)) + if secrets_handler is None: raise ValueError("HSMSigner requires a secrets handler") - return cls(cls.SCHEME_KEYID, public_key, secrets_handler) + return cls(keyid, token_filter, public_key, secrets_handler) def sign(self, payload: bytes) -> Signature: """Signs payload with Hardware Security Module (HSM). @@ -283,7 +368,7 @@ def sign(self, payload: bytes) -> Signature: Signature. """ pin = self.pin_handler(self.SECRETS_HANDLER_MSG) - with self._default_session() as session: + with self._get_session(self.token_filter) as session: session.login(pin) key = self._find_key( session, self.hsm_keyid, PyKCS11.CKO_PRIVATE_KEY diff --git a/tests/test_hsm_signer.py b/tests/test_hsm_signer.py index 6d7cd859..00a4b8e9 100644 --- a/tests/test_hsm_signer.py +++ b/tests/test_hsm_signer.py @@ -114,6 +114,9 @@ def setUpClass(cls): slot = lib.getSlotList(tokenPresent=True)[0] lib.initToken(slot, so_pin, token_label) + tokeninfo = lib.getTokenInfo(slot) + cls.token_filter = {"label": getattr(tokeninfo, "label")} + session = PYKCS11LIB().openSession(slot, PyKCS11.CKF_RW_SESSION) session.login(so_pin, PyKCS11.CKU_SO) session.initPin(cls.hsm_user_pin) @@ -138,8 +141,10 @@ def test_hsm(self): """Test HSM key export and signing.""" for hsm_keyid in [self.hsm_keyid, self.hsm_keyid_default]: - _, key = HSMSigner.import_(hsm_keyid) - signer = HSMSigner(hsm_keyid, key, lambda sec: self.hsm_user_pin) + _, key = HSMSigner.import_(hsm_keyid, self.token_filter) + signer = HSMSigner( + hsm_keyid, self.token_filter, key, lambda sec: self.hsm_user_pin + ) sig = signer.sign(b"DATA") key.verify_signature(sig, b"DATA") @@ -149,7 +154,18 @@ def test_hsm(self): def test_hsm_uri(self): """Test HSM default key export and signing from URI.""" - uri, key = HSMSigner.import_(self.hsm_keyid_default) + # default import + uri, key = HSMSigner.import_() + signer = Signer.from_priv_key_uri( + uri, key, lambda sec: self.hsm_user_pin + ) + sig = signer.sign(b"DATA") + key.verify_signature(sig, b"DATA") + with self.assertRaises(UnverifiedSignatureError): + key.verify_signature(sig, b"NOT DATA") + + # Import with specified values + uri, key = HSMSigner.import_(self.hsm_keyid_default, self.token_filter) signer = Signer.from_priv_key_uri( uri, key, lambda sec: self.hsm_user_pin )