diff --git a/.github/workflows/hsm.yml b/.github/workflows/hsm.yml new file mode 100644 index 000000000..7cdf497af --- /dev/null +++ b/.github/workflows/hsm.yml @@ -0,0 +1,59 @@ +name: Run Securesystemslib HSM tests + +on: + push: + branches: + - master + pull_request: + workflow_dispatch: + +permissions: {} + +jobs: + build: + strategy: + fail-fast: false + matrix: + python-version: ["3.7", "3.8", "3.9", "3.10", "3.11"] + os: [ubuntu-latest, macos-latest, windows-latest] + + runs-on: ${{ matrix.os }} + + steps: + - name: Checkout securesystemslib + uses: actions/checkout@93ea575cb5d8a053eaa0ac8fa3b40d7e05a33cc8 + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@13ae5bb136fac2878aff31522b9efb785519f984 + with: + python-version: ${{ matrix.python-version }} + cache: "pip" + cache-dependency-path: "requirements*.txt" + + - name: Install system dependencies + shell: bash + run: | + if [ "$RUNNER_OS" == "Linux" ]; then + sudo apt-get install -y softhsm2 + echo "PYKCS11LIB=/usr/lib/softhsm/libsofthsm2.so" >> $GITHUB_ENV + + elif [ "$RUNNER_OS" == "macOS" ]; then + brew install softhsm + echo "PYKCS11LIB=$(brew --prefix softhsm)/lib/softhsm/libsofthsm2.so" >> $GITHUB_ENV + + elif [ "$RUNNER_OS" == "Windows" ]; then + choco install softhsm.install + echo "PYKCS11LIB=C:\SoftHSM2\lib\softhsm2-x64.dll" >> $GITHUB_ENV + + else + echo "$RUNNER_OS not supported" + exit 1 + fi + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install --upgrade tox + + - name: Run tox + run: tox -e hsm diff --git a/mypy.ini b/mypy.ini index 21fa9ec39..cbe4e0f72 100644 --- a/mypy.ini +++ b/mypy.ini @@ -12,4 +12,11 @@ follow_imports = silent # let's not install typeshed annotations for GCPSigner [mypy-google.*] -ignore_missing_imports = True \ No newline at end of file +ignore_missing_imports = True + +# Suppress error messages for non-annotating dependencies +[mypy-PyKCS11.*] +ignore_missing_imports = True + +[mypy-asn1crypto.*] +ignore_missing_imports = True diff --git a/pylintrc b/pylintrc index 6539d77f4..951650497 100644 --- a/pylintrc +++ b/pylintrc @@ -51,3 +51,4 @@ check-quote-consistency=yes [TYPECHECK] generated-members=shake_128s.* +ignored-modules=PyKCS11 diff --git a/pyproject.toml b/pyproject.toml index 68a851f7e..0a5cf4690 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -47,6 +47,7 @@ crypto = ["cryptography>=37.0.0"] gcpkms = ["google-cloud-kms"] pynacl = ["pynacl>1.2.0"] PySPX = ["PySPX==0.5.0"] +hsm = ["asn1crypto", "cryptography", "PyKCS11"] [tool.setuptools] include-package-data = true diff --git a/requirements-hsm-pinned.txt b/requirements-hsm-pinned.txt new file mode 100644 index 000000000..ec7ae8da4 --- /dev/null +++ b/requirements-hsm-pinned.txt @@ -0,0 +1,16 @@ +# +# This file is autogenerated by pip-compile with python 3.8 +# To update, run: +# +# pip-compile --output-file=requirements-hsm-pinned.txt requirements-hsm.txt +# +asn1crypto==1.5.1 + # via -r requirements-hsm.txt +cffi==1.15.1 + # via cryptography +cryptography==38.0.4 + # via -r requirements-hsm.txt +pycparser==2.21 + # via cffi +pykcs11==1.5.11 + # via -r requirements-hsm.txt diff --git a/requirements-hsm.txt b/requirements-hsm.txt new file mode 100644 index 000000000..a8c6e9d1f --- /dev/null +++ b/requirements-hsm.txt @@ -0,0 +1,3 @@ +asn1crypto +cryptography +PyKCS11 diff --git a/securesystemslib/signer/__init__.py b/securesystemslib/signer/__init__.py index dfbdf55f8..4b59aaded 100644 --- a/securesystemslib/signer/__init__.py +++ b/securesystemslib/signer/__init__.py @@ -5,11 +5,17 @@ Some implementations are provided by default but more can be added by users. """ from securesystemslib.signer._gcp_signer import GCPSigner -from securesystemslib.signer._key import KEY_FOR_TYPE_AND_SCHEME, Key, SSlibKey +from securesystemslib.signer._key import ( + KEY_FOR_TYPE_AND_SCHEME, + HSMKey, + Key, + SSlibKey, +) from securesystemslib.signer._signature import GPGSignature, Signature from securesystemslib.signer._signer import ( SIGNER_FOR_URI_SCHEME, GPGSigner, + HSMSigner, SecretsHandler, Signer, SSlibSigner, diff --git a/securesystemslib/signer/_key.py b/securesystemslib/signer/_key.py index 24508d781..2b782b3a1 100644 --- a/securesystemslib/signer/_key.py +++ b/securesystemslib/signer/_key.py @@ -4,9 +4,39 @@ from typing import Any, Dict, Optional, Tuple, Type import securesystemslib.keys as sslib_keys -from securesystemslib import exceptions +from securesystemslib import KEY_TYPE_ECDSA, exceptions from securesystemslib.signer._signature import Signature +# pylint: disable=wrong-import-position +CRYPTO_IMPORT_ERROR = None +try: + from cryptography.hazmat.primitives import serialization + from cryptography.hazmat.primitives.asymmetric.ec import ( + SECP256R1, + SECP384R1, + EllipticCurvePublicKey, + ObjectIdentifier, + get_curve_for_oid, + ) + +except ImportError: # pragma: no cover + CRYPTO_IMPORT_ERROR = "'cryptography' required" + +PYKCS11_IMPORT_ERROR = None +try: + from PyKCS11 import PyKCS11 + +except ImportError: # pragma: no cover + PYKCS11_IMPORT_ERROR = "'PyKCS11' required" + +ASN1CRYPTO_IMPORT_ERROR = None +try: + from asn1crypto.keys import ECDomainParameters, ECPoint + +except ImportError: # pragma: no cover + ASN1CRYPTO_IMPORT_ERROR = "'asn1crypto' required" +# pylint: enable=wrong-import-position + logger = logging.getLogger(__name__) # NOTE Key dispatch table is defined here so it's usable by Key, @@ -180,3 +210,91 @@ def verify_signature(self, signature: Signature, data: bytes) -> None: raise exceptions.VerificationError( f"Unknown failure to verify signature by {self.keyid}" ) from e + + +class HSMKey(SSlibKey): + """Hardware Security Module (HSM) Key + + HSMKey is a regular SSlibKey with an additional `from_hsm` method to + export public keys from hardware security modules. + """ + + @classmethod + def from_hsm( + cls, + hsm_session: "PyKCS11.Session", + hsm_keyid: Tuple[int, ...], + keyid: str, + ): + """Export public key from HSM + + Supports ecdsa on SECG curves secp256r1 (NIST P-256) or secp384r1 (NIST P-384). + + Arguments: + hsm_session: An open ``PyKCS11.Session`` to the token with the public key. + hsm_keyid: Key identifier on the token. + keyid: Key identifier that is unique within the metadata it is used in. + + Raises: + ValueError: No compatible key for ``hsm_keyid`` found on HSM. + PyKCS11.PyKCS11Error: Various HSM communication errors. + + """ + if CRYPTO_IMPORT_ERROR: + raise exceptions.UnsupportedLibraryError(CRYPTO_IMPORT_ERROR) + + if PYKCS11_IMPORT_ERROR: + raise exceptions.UnsupportedLibraryError(PYKCS11_IMPORT_ERROR) + + # Search for ecdsa public keys with passed keyid on HSM + keys = hsm_session.findObjects( + [ + (PyKCS11.CKA_CLASS, PyKCS11.CKO_PUBLIC_KEY), + (PyKCS11.CKA_KEY_TYPE, PyKCS11.CKK_ECDSA), + (PyKCS11.CKA_ID, hsm_keyid), + ] + ) + + if len(keys) != 1: + raise ValueError( + f"hsm_keyid must identify one {KEY_TYPE_ECDSA} key, found {len(keys)}" + ) + + # Extract public key domain parameters and point from HSM + hsm_params, hsm_point = hsm_session.getAttributeValue( + keys[0], [PyKCS11.CKA_EC_PARAMS, PyKCS11.CKA_EC_POINT] + ) + + params = ECDomainParameters.load(bytes(hsm_params)) + + # TODO: Define as module level constant and don't hardcode scheme strings + scheme_for_curve = { + SECP256R1: "ecdsa-sha2-nistp256", + SECP384R1: "ecdsa-sha2-nistp384", + } + curve_names = [curve.name for curve in scheme_for_curve] + + if params.chosen.native not in curve_names: + raise ValueError( + f"found key on {params.chosen.native}, should be on one of {curve_names}" + ) + + # Create PEM from key + curve = get_curve_for_oid(ObjectIdentifier(params.chosen.dotted)) + public_pem = ( + EllipticCurvePublicKey.from_encoded_point( + curve(), ECPoint().load(bytes(hsm_point)).native + ) + .public_bytes( + serialization.Encoding.PEM, + serialization.PublicFormat.SubjectPublicKeyInfo, + ) + .decode() + ) + + return HSMKey( + keyid, + KEY_TYPE_ECDSA, + scheme_for_curve[curve], + {"public": public_pem}, + ) diff --git a/securesystemslib/signer/_signer.py b/securesystemslib/signer/_signer.py index fc81aa372..6f494bdcc 100644 --- a/securesystemslib/signer/_signer.py +++ b/securesystemslib/signer/_signer.py @@ -1,13 +1,33 @@ """Signer interface and the default implementations""" import abc +import binascii import logging import os -from typing import Callable, Dict, Optional, Type +from typing import Callable, Dict, Optional, Tuple, Type from urllib import parse +# pylint: disable=wrong-import-position +CRYPTO_IMPORT_ERROR = None +try: + from cryptography.hazmat.primitives.asymmetric.utils import ( + encode_dss_signature, + ) +except ImportError: # pragma: no cover + CRYPTO_IMPORT_ERROR = "'cryptography' required" + +PYKCS11_IMPORT_ERROR = None +try: + from PyKCS11 import PyKCS11 + +except ImportError: # pragma: no cover + PYKCS11_IMPORT_ERROR = "'PyKCS11' required" +# pylint: enable=wrong-import-position + import securesystemslib.gpg.functions as gpg import securesystemslib.keys as sslib_keys +from securesystemslib import KEY_TYPE_ECDSA +from securesystemslib.exceptions import UnsupportedLibraryError from securesystemslib.signer._key import Key, SSlibKey from securesystemslib.signer._signature import GPGSignature, Signature @@ -273,3 +293,100 @@ def sign(self, payload: bytes) -> GPGSignature: sig_dict = gpg.create_signature(payload, self.keyid, self.homedir) return GPGSignature(**sig_dict) + + +class HSMSigner(Signer): + """Hardware Security Module (HSM) Signer. + + HSMSigner uses the PKCS#11/Cryptoki API to sign on an HSM (e.g. YubiKey). It + supports ecdsa on SECG curves secp256r1 (NIST P-256) or secp384r1 (NIST P-384). + + Arguments: + hsm_session: An open and logged-in ``PyKCS11.Session`` to the token with the + private key. + hsm_keyid: Key identifier on the token. + public_key: The related public key instance. + + Raises: + UnsupportedLibraryError: ``PyKCS11`` and ``cryptography`` libraries not found. + ValueError: ``public_key.scheme`` not supported. + """ + + def __init__( + self, + hsm_session: "PyKCS11.Session", + hsm_keyid: Tuple[int, ...], + public_key: Key, + ): + if CRYPTO_IMPORT_ERROR: + raise UnsupportedLibraryError(CRYPTO_IMPORT_ERROR) + + if PYKCS11_IMPORT_ERROR: + raise UnsupportedLibraryError(PYKCS11_IMPORT_ERROR) + + # TODO: Define as module level constant and don't hardcode scheme strings + supported_schemes = { + "ecdsa-sha2-nistp256": PyKCS11.Mechanism(PyKCS11.CKM_ECDSA_SHA256), + "ecdsa-sha2-nistp384": PyKCS11.Mechanism(PyKCS11.CKM_ECDSA_SHA384), + } + + if public_key.scheme not in supported_schemes: + raise ValueError(f"unsupported scheme {public_key.scheme}") + + self._mechanism = supported_schemes[public_key.scheme] + self.hsm_session = hsm_session + self.hsm_keyid = hsm_keyid + self.public_key = public_key + + @classmethod + def from_priv_key_uri( + cls, + priv_key_uri: str, + public_key: Key, + secrets_handler: Optional[SecretsHandler] = None, + ) -> "HSMSigner": + raise NotImplementedError("Incompatible with private key URIs") + + def sign(self, payload: bytes) -> Signature: + """Signs payload with Hardware Security Module (HSM). + + Arguments: + payload: bytes to be signed. + + Raises: + ValueError: No compatible key for ``hsm_keyid`` found on HSM. + PyKCS11.PyKCS11Error: Various HSM communication errors. + + Returns: + Signature. + """ + + # Search for ecdsa public keys with passed keyid on HSM + keys = self.hsm_session.findObjects( + [ + (PyKCS11.CKA_CLASS, PyKCS11.CKO_PRIVATE_KEY), + (PyKCS11.CKA_KEY_TYPE, PyKCS11.CKK_ECDSA), + (PyKCS11.CKA_ID, self.hsm_keyid), + ] + ) + if len(keys) != 1: + raise ValueError( + f"hsm_keyid must identify one {KEY_TYPE_ECDSA} key, found {len(keys)}" + ) + + signature = self.hsm_session.sign(keys[0], payload, self._mechanism) + + # The PKCS11 signature octets correspond to the concatenation of the ECDSA + # values r and s, both represented as an octet string of equal length of at + # most nLen with the most significant byte first (i.e. big endian) + # https://docs.oasis-open.org/pkcs11/pkcs11-curr/v3.0/cs01/pkcs11-curr-v3.0-cs01.html#_Toc30061178 + r_s_len = int(len(signature) / 2) + r = int.from_bytes(signature[:r_s_len], byteorder="big") + s = int.from_bytes(signature[r_s_len:], byteorder="big") + + # Create an ASN.1 encoded Dss-Sig-Value to be used with pyca/cryptography + dss_sig_value = binascii.hexlify(encode_dss_signature(r, s)).decode( + "ascii" + ) + + return Signature(self.public_key.keyid, dss_sig_value) diff --git a/tests/check_hsm_signer.py b/tests/check_hsm_signer.py new file mode 100644 index 000000000..06605a570 --- /dev/null +++ b/tests/check_hsm_signer.py @@ -0,0 +1,156 @@ +#!/usr/bin/env python +"""Test HSMSigner +""" +import os +import shutil +import tempfile +import unittest + +from asn1crypto.keys import ( # pylint: disable=import-error + ECDomainParameters, + NamedCurve, +) +from cryptography.hazmat.primitives.asymmetric.ec import SECP256R1, SECP384R1 +from PyKCS11 import PyKCS11 + +import securesystemslib.hash +from securesystemslib.signer import HSMKey, HSMSigner + + +class TestHSM(unittest.TestCase): + """Test HSMSigner and HSMKey with SoftHSM + + Requirements: + - install SoftHSM2 + - set environment variable ``PYKCS11LIB`` to SoftHSM library path + + See .github/workflows/hsm.yml for how this can be done on Linux, macOS and Windows. + """ + + hsm_user_pin = "1234" + + @classmethod + def setUpClass(cls): + """Initialize SoftHSM token and generate ecdsa test keys""" + + # Configure SoftHSM to create test token in temporary test directory + cls.original_cwd = os.getcwd() + cls.test_dir = os.path.realpath(tempfile.mkdtemp()) + os.chdir(cls.test_dir) + + with open("softhsm2.conf", "w", encoding="utf-8") as f: + f.write("directories.tokendir = " + os.path.join(cls.test_dir, "")) + + os.environ["SOFTHSM2_CONF"] = os.path.join( + cls.test_dir, "softhsm2.conf" + ) + + # Initialize test token + cls.pkcs11 = PyKCS11.PyKCS11Lib() + cls.pkcs11.load() + hsm_token_label = "Test SoftHSM" + hsm_so_pin = "abcd" + + hsm_slot_id = cls.pkcs11.getSlotList(tokenPresent=True)[0] + cls.pkcs11.initToken(hsm_slot_id, hsm_so_pin, hsm_token_label) + + session = cls.pkcs11.openSession(hsm_slot_id, PyKCS11.CKF_RW_SESSION) + session.login(hsm_so_pin, PyKCS11.CKU_SO) + session.initPin(cls.hsm_user_pin) + session.logout() + + session.login(cls.hsm_user_pin) + + # Generate test ecdsa key pairs for curves secp256r1 and secp384r1 on test token + cls.hsm_keyids = [] + for keyid, curve in ( + ((0,), SECP256R1), + ((1,), SECP384R1), + ): + + params = ECDomainParameters( + name="named", value=NamedCurve(curve.name) + ).dump() + + public_template = [ + (PyKCS11.CKA_CLASS, PyKCS11.CKO_PUBLIC_KEY), + (PyKCS11.CKA_PRIVATE, PyKCS11.CK_FALSE), + (PyKCS11.CKA_TOKEN, PyKCS11.CK_TRUE), + (PyKCS11.CKA_ENCRYPT, PyKCS11.CK_FALSE), + (PyKCS11.CKA_VERIFY, PyKCS11.CK_TRUE), + (PyKCS11.CKA_WRAP, PyKCS11.CK_FALSE), + (PyKCS11.CKA_KEY_TYPE, PyKCS11.CKK_ECDSA), + (PyKCS11.CKA_EC_PARAMS, params), + (PyKCS11.CKA_LABEL, curve.name), + (PyKCS11.CKA_ID, keyid), + ] + private_template = [ + (PyKCS11.CKA_CLASS, PyKCS11.CKO_PRIVATE_KEY), + (PyKCS11.CKA_KEY_TYPE, PyKCS11.CKK_ECDSA), + (PyKCS11.CKA_TOKEN, PyKCS11.CK_TRUE), + (PyKCS11.CKA_SENSITIVE, PyKCS11.CK_TRUE), + (PyKCS11.CKA_DECRYPT, PyKCS11.CK_FALSE), + (PyKCS11.CKA_SIGN, PyKCS11.CK_TRUE), + (PyKCS11.CKA_UNWRAP, PyKCS11.CK_FALSE), + (PyKCS11.CKA_LABEL, curve.name), + (PyKCS11.CKA_ID, keyid), + ] + + session.generateKeyPair( + public_template, + private_template, + mecha=PyKCS11.MechanismECGENERATEKEYPAIR, + ) + + cls.hsm_keyids.append(keyid) + + session.logout() + session.closeSession() + + @classmethod + def tearDownClass(cls): + os.chdir(cls.original_cwd) + shutil.rmtree(cls.test_dir) + del os.environ["SOFTHSM2_CONF"] + + def test_hsm(self): + """Test public key export, HSM signing, and verification w/o HSM""" + + def _pre_hash(data, scheme): + """Generate hash for scheme (test hack)""" + hasher = securesystemslib.hash.digest(algorithm=f"sha{scheme[-3:]}") + hasher.update(data) + return hasher.digest() + + hsm_slot_id = self.pkcs11.getSlotList(tokenPresent=True)[0] + session = self.pkcs11.openSession(hsm_slot_id) + + keyid = ( + "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef" + ) + data = b"deadbeef" + + for hsm_keyid in self.hsm_keyids: + public_key = HSMKey.from_hsm(session, hsm_keyid, keyid) + + session.login(self.hsm_user_pin) # Login for signing + signer = HSMSigner(session, hsm_keyid, public_key) + + # NOTE: HSMSigner supports CKM_ECDSA_SHA256 and CKM_ECDSA_SHA384 + # mechanisms. But SoftHSM only supports CKM_ECDSA. During testing we + # patch the HSMSigner mechanisms and pre-hash the data ourselves. + signer._mechanism = ( # pylint: disable=protected-access + PyKCS11.Mechanism(PyKCS11.CKM_ECDSA) + ) + sig = signer.sign(_pre_hash(data, public_key.scheme)) + + session.logout() # Logout after signing + + public_key.verify_signature(sig, data) + + session.closeSession() + + +# Run the unit tests. +if __name__ == "__main__": + unittest.main() diff --git a/tox.ini b/tox.ini index 551ae87ca..a9a187e00 100644 --- a/tox.ini +++ b/tox.ini @@ -18,7 +18,7 @@ deps = commands = python -m tests.check_gpg_available coverage run tests/aggregate_tests.py - coverage report -m --fail-under 96 + coverage report -m --fail-under 95 [testenv:purepy311] deps = @@ -42,6 +42,14 @@ passenv = commands = python -m tests.check_kms_signers +[testenv:hsm] +deps = + -r{toxinidir}/requirements-hsm-pinned.txt +passenv = + PYKCS11LIB +commands = + python -m tests.check_hsm_signer + # This checks that importing securesystemslib.gpg.constants doesn't shell out on # import. [testenv:py311-test-gpg-fails]