Skip to content

Commit

Permalink
HSMSigner: Implement token identification
Browse files Browse the repository at this point in the history
I bought a second yubikey so need a way to tell them apart... so:
Define a URI that contains identifying information for the HSM token:
 * URI path is the keyid
 * URI query contains token field filters

Example URI: "hsm:2?label=YubiKey+PIV+%2315835999"

This would use keyid 2 from a token with label
"YubiKey+PIV+%2315835999". The example is also what gets automatically
created on HSMSigner.import_(). Other fields can also be used -- I
believe there is no standard for these so this seemed sensible.

Running import_() now fails if there are more than 1 tokens (but a
filter can be provided there as well). Because of this the tests
needed some changes (softhsm creates a new token in InitToken???) --
unfortunately this means the default import_() filter is not
tested as I couldn't figure out how to remove the extra softhsm token.

import_() stays backwards compatible, and old URIs keep working.
The constructor has a new required argument (this could be fixed but I
didn't see it as that important).
  • Loading branch information
jku committed Mar 4, 2023
1 parent 7973b12 commit bc30a6b
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 16 deletions.
93 changes: 80 additions & 13 deletions securesystemslib/signer/_hsm_signer.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"""
import binascii
from contextlib import contextmanager
from typing import Optional, Tuple
from typing import Dict, Iterator, List, Optional, Tuple
from urllib import parse

from securesystemslib import KEY_TYPE_ECDSA
Expand Down Expand Up @@ -120,7 +120,11 @@ def pin_handler(secret: str) -> str:
SECRETS_HANDLER_MSG = "pin"

def __init__(
self, hsm_keyid: int, public_key: Key, pin_handler: SecretsHandler
self,
hsm_keyid: int,
token_filter: Dict[str, str],
public_key: Key,
pin_handler: SecretsHandler,
):
if CRYPTO_IMPORT_ERROR:
raise UnsupportedLibraryError(CRYPTO_IMPORT_ERROR)
Expand All @@ -133,19 +137,40 @@ def __init__(

self._mechanism = _MECHANISM_FOR_SCHEME[public_key.scheme]
self.hsm_keyid = hsm_keyid
self.token_filter = token_filter
self.public_key = public_key
self.pin_handler = pin_handler

@staticmethod
@contextmanager
def _default_session():
"""Context manager to handle default HSM session on reader slot 1."""
def _get_session(filters: Dict[str, str]) -> Iterator["PyKCS11.Session"]:
"""Context manager to handle a HSM session.
The slot/token is selected by filtering by token info fields.
ValueError is raised if not matching slot/token is not found, or if
more than one are found.
"""
lib = PYKCS11LIB()
slots = lib.getSlotList(tokenPresent=True)
if not slots:
raise ValueError("could not find token")
slots: List[int] = lib.getSlotList(tokenPresent=True)
matching_slots: List[int] = []
for slot in slots:
tokeninfo = lib.getTokenInfo(slot)
match = True
# all values in filters must match token fields
for key, value in filters.items():
tokenvalue: str = getattr(tokeninfo, key, "").strip()
if tokenvalue != value:
match = False

if match:
matching_slots.append(slot)

if len(matching_slots) != 1:
raise ValueError(
f"Found {len(matching_slots)} slots/tokens matching filter {filters}"
)

session = lib.openSession(slots[0])
session = lib.openSession(matching_slots[0])
try:
yield session

Expand Down Expand Up @@ -192,15 +217,49 @@ def _find_key_values(
return ECDomainParameters.load(bytes(params)), bytes(point)

@classmethod
def import_(cls, hsm_keyid: Optional[int] = None) -> Tuple[str, SSlibKey]:
def _build_token_filter(cls) -> Dict[str, str]:
"""Builds a token filter for the found slot/token.
The filter will include 'label' if one is found on token.
raises ValueError if less or more than 1 token/slot is found
"""

lib = PYKCS11LIB()
slots: List[int] = lib.getSlotList(tokenPresent=True)
if len(slots) != 1:
raise ValueError(f"Expected 1 token/slot, found {len(slots)}")
filters = {}
tokeninfo = lib.getTokenInfo(slots[0])
# other possible fields include manufacturerID, model and serialNumber
for key in ["label"]:
try:
filters[key] = getattr(tokeninfo, key).strip()
except AttributeError:
pass

return filters

@classmethod
def import_(
cls,
hsm_keyid: Optional[int] = None,
token_filter: Optional[Dict[str, str]] = None,
) -> Tuple[str, SSlibKey]:
"""Import public key and signer details from HSM.
Either only one token/slot must be present when importing or a
token_filter must be provided.
Returns a private key URI (for Signer.from_priv_key_uri()) and a public
key. import_() should be called once and the returned URI and public
key should be stored for later use.
Arguments:
hsm_keyid: Key identifier on the token. Default is 2 (meaning PIV key slot 9c).
token_filter: Dictionary of token field names and values used to
filter the correct slot/token. If no filter is provided one is built
from the fields found on the token.
Raises:
UnsupportedLibraryError: ``PyKCS11``, ``cryptography`` or ``asn1crypto``
Expand All @@ -221,7 +280,12 @@ def import_(cls, hsm_keyid: Optional[int] = None) -> Tuple[str, SSlibKey]:
if hsm_keyid is None:
hsm_keyid = cls.SCHEME_KEYID

with cls._default_session() as session:
if token_filter is None:
token_filter = cls._build_token_filter()

uri = f"{cls.SCHEME}:{hsm_keyid}?{parse.urlencode(token_filter)}"

with cls._get_session(token_filter) as session:
params, point = cls._find_key_values(session, hsm_keyid)

if params.chosen.native not in _CURVE_NAMES:
Expand All @@ -247,7 +311,7 @@ def import_(cls, hsm_keyid: Optional[int] = None) -> Tuple[str, SSlibKey]:
keyid = _get_keyid(KEY_TYPE_ECDSA, scheme, keyval)
key = SSlibKey(keyid, KEY_TYPE_ECDSA, scheme, keyval)

return "hsm:", key
return uri, key

@classmethod
def from_priv_key_uri(
Expand All @@ -264,10 +328,13 @@ def from_priv_key_uri(
if uri.scheme != cls.SCHEME:
raise ValueError(f"HSMSigner does not support {priv_key_uri}")

keyid = int(uri.path) if uri.path else cls.SCHEME_KEYID
token_filter = dict(parse.parse_qsl(uri.query))

if secrets_handler is None:
raise ValueError("HSMSigner requires a secrets handler")

return cls(cls.SCHEME_KEYID, public_key, secrets_handler)
return cls(keyid, token_filter, public_key, secrets_handler)

def sign(self, payload: bytes) -> Signature:
"""Signs payload with Hardware Security Module (HSM).
Expand All @@ -283,7 +350,7 @@ def sign(self, payload: bytes) -> Signature:
Signature.
"""
pin = self.pin_handler(self.SECRETS_HANDLER_MSG)
with self._default_session() as session:
with self._get_session(self.token_filter) as session:
session.login(pin)
key = self._find_key(
session, self.hsm_keyid, PyKCS11.CKO_PRIVATE_KEY
Expand Down
13 changes: 10 additions & 3 deletions tests/test_hsm_signer.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@ def setUpClass(cls):
slot = lib.getSlotList(tokenPresent=True)[0]
lib.initToken(slot, so_pin, token_label)

# suddenly there are two slots (why?), use label to filter in tests
tokeninfo = lib.getTokenInfo(slot)
cls.token_filter = {"label": getattr(tokeninfo, "label")}
print(cls.token_filter)

session = PYKCS11LIB().openSession(slot, PyKCS11.CKF_RW_SESSION)
session.login(so_pin, PyKCS11.CKU_SO)
session.initPin(cls.hsm_user_pin)
Expand All @@ -138,8 +143,10 @@ def test_hsm(self):
"""Test HSM key export and signing."""

for hsm_keyid in [self.hsm_keyid, self.hsm_keyid_default]:
_, key = HSMSigner.import_(hsm_keyid)
signer = HSMSigner(hsm_keyid, key, lambda sec: self.hsm_user_pin)
_, key = HSMSigner.import_(hsm_keyid, self.token_filter)
signer = HSMSigner(
hsm_keyid, self.token_filter, key, lambda sec: self.hsm_user_pin
)
sig = signer.sign(b"DATA")
key.verify_signature(sig, b"DATA")

Expand All @@ -149,7 +156,7 @@ def test_hsm(self):
def test_hsm_uri(self):
"""Test HSM default key export and signing from URI."""

uri, key = HSMSigner.import_(self.hsm_keyid_default)
uri, key = HSMSigner.import_(self.hsm_keyid_default, self.token_filter)
signer = Signer.from_priv_key_uri(
uri, key, lambda sec: self.hsm_user_pin
)
Expand Down

0 comments on commit bc30a6b

Please sign in to comment.