From c6fbe58382b4a67fb8d86aaec11f89f7445f15ea Mon Sep 17 00:00:00 2001 From: Felix Fontein Date: Mon, 29 Apr 2024 22:29:43 +0200 Subject: [PATCH] Implement certificate information retrieval code in the ACME backends. (#736) --- changelogs/fragments/736-cert-info.yml | 2 + .../module_utils/acme/backend_cryptography.py | 44 +++++++++ .../module_utils/acme/backend_openssl_cli.py | 95 ++++++++++++++++--- plugins/module_utils/acme/backends.py | 26 +++++ plugins/module_utils/crypto/math.py | 16 ++++ .../plugins/module_utils/acme/backend_data.py | 25 +++++ .../module_utils/acme/fixtures/cert_1.txt | 38 ++++++++ .../acme/fixtures/cert_1.txt.license | 3 + .../acme/test_backend_cryptography.py | 28 ++++++ .../acme/test_backend_openssl_cli.py | 30 ++++++ .../plugins/module_utils/crypto/test_math.py | 15 +++ 11 files changed, 309 insertions(+), 13 deletions(-) create mode 100644 changelogs/fragments/736-cert-info.yml create mode 100644 tests/unit/plugins/module_utils/acme/fixtures/cert_1.txt create mode 100644 tests/unit/plugins/module_utils/acme/fixtures/cert_1.txt.license diff --git a/changelogs/fragments/736-cert-info.yml b/changelogs/fragments/736-cert-info.yml new file mode 100644 index 000000000..2b8489b90 --- /dev/null +++ b/changelogs/fragments/736-cert-info.yml @@ -0,0 +1,2 @@ +deprecated_features: + - "acme.backends module utils - the ``get_cert_information()`` method for a ACME crypto backend must be implemented from community.crypto 3.0.0 on (https://github.com/ansible-collections/community.crypto/pull/736)." diff --git a/plugins/module_utils/acme/backend_cryptography.py b/plugins/module_utils/acme/backend_cryptography.py index 0722c1f99..e6e76b80d 100644 --- a/plugins/module_utils/acme/backend_cryptography.py +++ b/plugins/module_utils/acme/backend_cryptography.py @@ -19,6 +19,7 @@ from ansible_collections.community.crypto.plugins.module_utils.version import LooseVersion from ansible_collections.community.crypto.plugins.module_utils.acme.backends import ( + CertificateInformation, CryptoBackend, ) @@ -49,7 +50,9 @@ from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_support import ( CRYPTOGRAPHY_TIMEZONE, cryptography_name_to_oid, + cryptography_serial_number_of_cert, get_not_valid_after, + get_not_valid_before, ) from ansible_collections.community.crypto.plugins.module_utils.crypto.pem import ( @@ -386,3 +389,44 @@ def create_chain_matcher(self, criterium): Given a Criterium object, creates a ChainMatcher object. ''' return CryptographyChainMatcher(criterium, self.module) + + def get_cert_information(self, cert_filename=None, cert_content=None): + ''' + Return some information on a X.509 certificate as a CertificateInformation object. + ''' + if cert_filename is not None: + cert_content = read_file(cert_filename) + else: + cert_content = to_bytes(cert_content) + + # Make sure we have at most one PEM. Otherwise cryptography 36.0.0 will barf. + cert_content = to_bytes(extract_first_pem(to_text(cert_content)) or '') + + try: + cert = cryptography.x509.load_pem_x509_certificate(cert_content, _cryptography_backend) + except Exception as e: + if cert_filename is None: + raise BackendException('Cannot parse certificate: {0}'.format(e)) + raise BackendException('Cannot parse certificate {0}: {1}'.format(cert_filename, e)) + + ski = None + try: + ext = cert.extensions.get_extension_for_class(cryptography.x509.SubjectKeyIdentifier) + ski = ext.value.digest + except cryptography.x509.ExtensionNotFound: + pass + + aki = None + try: + ext = cert.extensions.get_extension_for_class(cryptography.x509.AuthorityKeyIdentifier) + aki = ext.value.key_identifier + except cryptography.x509.ExtensionNotFound: + pass + + return CertificateInformation( + not_valid_after=get_not_valid_after(cert), + not_valid_before=get_not_valid_before(cert), + serial_number=cryptography_serial_number_of_cert(cert), + subject_key_identifier=ski, + authority_key_identifier=aki, + ) diff --git a/plugins/module_utils/acme/backend_openssl_cli.py b/plugins/module_utils/acme/backend_openssl_cli.py index 9a1ed1f5a..b3d1f73e1 100644 --- a/plugins/module_utils/acme/backend_openssl_cli.py +++ b/plugins/module_utils/acme/backend_openssl_cli.py @@ -20,6 +20,7 @@ from ansible.module_utils.common.text.converters import to_native, to_text, to_bytes from ansible_collections.community.crypto.plugins.module_utils.acme.backends import ( + CertificateInformation, CryptoBackend, ) @@ -30,6 +31,8 @@ from ansible_collections.community.crypto.plugins.module_utils.acme.utils import nopad_b64 +from ansible_collections.community.crypto.plugins.module_utils.crypto.math import convert_bytes_to_int + try: import ipaddress except ImportError: @@ -39,6 +42,33 @@ _OPENSSL_ENVIRONMENT_UPDATE = dict(LANG='C', LC_ALL='C', LC_MESSAGES='C', LC_CTYPE='C') +def _extract_date(out_text, name, cert_filename_suffix=""): + try: + date_str = re.search(r"\s+%s\s*:\s+(.*)" % name, out_text).group(1) + return datetime.datetime.strptime(date_str, '%b %d %H:%M:%S %Y %Z') + except AttributeError: + raise BackendException("No '{0}' date found{1}".format(name, cert_filename_suffix)) + except ValueError as exc: + raise BackendException("Failed to parse '{0}' date{1}: {2}".format(name, cert_filename_suffix, exc)) + + +def _decode_octets(octets_text): + return binascii.unhexlify(re.sub(r"(\s|:)", "", octets_text).encode("utf-8")) + + +def _extract_octets(out_text, name, required=True): + match = re.search( + r"\s+%s:\s*\n\s+([A-Fa-f0-9]{2}(?::[A-Fa-f0-9]{2})*)\s*\n" % name, + out_text, + re.MULTILINE | re.DOTALL, + ) + if match is not None: + return _decode_octets(match.group(1)) + if not required: + return None + raise BackendException("No '{0}' octet string found".format(name)) + + class OpenSSLCLIBackend(CryptoBackend): def __init__(self, module, openssl_binary=None): super(OpenSSLCLIBackend, self).__init__(module) @@ -89,10 +119,12 @@ def parse_key(self, key_file=None, key_content=None, passphrase=None): dummy, out, dummy = self.module.run_command( openssl_keydump_cmd, check_rc=True, environ_update=_OPENSSL_ENVIRONMENT_UPDATE) + out_text = to_text(out, errors='surrogate_or_strict') + if account_key_type == 'rsa': - pub_hex, pub_exp = re.search( - r"modulus:\n\s+00:([a-f0-9\:\s]+?)\npublicExponent: ([0-9]+)", - to_text(out, errors='surrogate_or_strict'), re.MULTILINE | re.DOTALL).groups() + pub_hex = re.search(r"modulus:\n\s+00:([a-f0-9\:\s]+?)\npublicExponent", out_text, re.MULTILINE | re.DOTALL).group(1) + + pub_exp = re.search(r"\npublicExponent: ([0-9]+)", out_text, re.MULTILINE | re.DOTALL).group(1) pub_exp = "{0:x}".format(int(pub_exp)) if len(pub_exp) % 2: pub_exp = "0{0}".format(pub_exp) @@ -104,17 +136,19 @@ def parse_key(self, key_file=None, key_content=None, passphrase=None): 'jwk': { "kty": "RSA", "e": nopad_b64(binascii.unhexlify(pub_exp.encode("utf-8"))), - "n": nopad_b64(binascii.unhexlify(re.sub(r"(\s|:)", "", pub_hex).encode("utf-8"))), + "n": nopad_b64(_decode_octets(pub_hex)), }, 'hash': 'sha256', } elif account_key_type == 'ec': pub_data = re.search( r"pub:\s*\n\s+04:([a-f0-9\:\s]+?)\nASN1 OID: (\S+)(?:\nNIST CURVE: (\S+))?", - to_text(out, errors='surrogate_or_strict'), re.MULTILINE | re.DOTALL) + out_text, + re.MULTILINE | re.DOTALL, + ) if pub_data is None: raise KeyParsingError('cannot parse elliptic curve key') - pub_hex = binascii.unhexlify(re.sub(r"(\s|:)", "", pub_data.group(1)).encode("utf-8")) + pub_hex = _decode_octets(pub_data.group(1)) asn1_oid_curve = pub_data.group(2).lower() nist_curve = pub_data.group(3).lower() if pub_data.group(3) else None if asn1_oid_curve == 'prime256v1' or nist_curve == 'p-256': @@ -303,13 +337,8 @@ def get_cert_days(self, cert_filename=None, cert_content=None, now=None): openssl_cert_cmd = [self.openssl_binary, "x509", "-in", filename, "-noout", "-text"] dummy, out, dummy = self.module.run_command( openssl_cert_cmd, data=data, check_rc=True, binary_data=True, environ_update=_OPENSSL_ENVIRONMENT_UPDATE) - try: - not_after_str = re.search(r"\s+Not After\s*:\s+(.*)", to_text(out, errors='surrogate_or_strict')).group(1) - not_after = datetime.datetime.strptime(not_after_str, '%b %d %H:%M:%S %Y %Z') - except AttributeError: - raise BackendException("No 'Not after' date found{0}".format(cert_filename_suffix)) - except ValueError: - raise BackendException("Failed to parse 'Not after' date{0}".format(cert_filename_suffix)) + out_text = to_text(out, errors='surrogate_or_strict') + not_after = _extract_date(out_text, 'Not After', cert_filename_suffix=cert_filename_suffix) if now is None: now = datetime.datetime.now() return (not_after - now).days @@ -319,3 +348,43 @@ def create_chain_matcher(self, criterium): Given a Criterium object, creates a ChainMatcher object. ''' raise BackendException('Alternate chain matching can only be used with the "cryptography" backend.') + + def get_cert_information(self, cert_filename=None, cert_content=None): + ''' + Return some information on a X.509 certificate as a CertificateInformation object. + ''' + filename = cert_filename + data = None + if cert_filename is not None: + cert_filename_suffix = ' in {0}'.format(cert_filename) + else: + filename = '/dev/stdin' + data = to_bytes(cert_content) + cert_filename_suffix = '' + + openssl_cert_cmd = [self.openssl_binary, "x509", "-in", filename, "-noout", "-text"] + dummy, out, dummy = self.module.run_command( + openssl_cert_cmd, data=data, check_rc=True, binary_data=True, environ_update=_OPENSSL_ENVIRONMENT_UPDATE) + out_text = to_text(out, errors='surrogate_or_strict') + + not_after = _extract_date(out_text, 'Not After', cert_filename_suffix=cert_filename_suffix) + not_before = _extract_date(out_text, 'Not Before', cert_filename_suffix=cert_filename_suffix) + + sn = re.search( + r" Serial Number: ([0-9]+)", + to_text(out, errors='surrogate_or_strict'), re.MULTILINE | re.DOTALL) + if sn: + serial = int(sn.group(1)) + else: + serial = convert_bytes_to_int(_extract_octets(out_text, 'Serial Number', required=True)) + + ski = _extract_octets(out_text, 'X509v3 Subject Key Identifier', required=False) + aki = _extract_octets(out_text, 'X509v3 Authority Key Identifier', required=False) + + return CertificateInformation( + not_valid_after=not_after, + not_valid_before=not_before, + serial_number=serial, + subject_key_identifier=ski, + authority_key_identifier=aki, + ) diff --git a/plugins/module_utils/acme/backends.py b/plugins/module_utils/acme/backends.py index 2d95a3ee3..78ff0f181 100644 --- a/plugins/module_utils/acme/backends.py +++ b/plugins/module_utils/acme/backends.py @@ -9,10 +9,27 @@ __metaclass__ = type +from collections import namedtuple import abc from ansible.module_utils import six +from ansible_collections.community.crypto.plugins.module_utils.acme.errors import ( + BackendException, +) + + +CertificateInformation = namedtuple( + 'CertificateInformation', + ( + 'not_valid_after', + 'not_valid_before', + 'serial_number', + 'subject_key_identifier', + 'authority_key_identifier', + ), +) + @six.add_metaclass(abc.ABCMeta) class CryptoBackend(object): @@ -74,3 +91,12 @@ def create_chain_matcher(self, criterium): ''' Given a Criterium object, creates a ChainMatcher object. ''' + + def get_cert_information(self, cert_filename=None, cert_content=None): + ''' + Return some information on a X.509 certificate as a CertificateInformation object. + ''' + # Not implementing this method in a backend is DEPRECATED and will be + # disallowed in community.crypto 3.0.0. This method will be marked as + # @abstractmethod by then. + raise BackendException('This backend does not support get_cert_information()') diff --git a/plugins/module_utils/crypto/math.py b/plugins/module_utils/crypto/math.py index b329dbe10..1ec43e9f2 100644 --- a/plugins/module_utils/crypto/math.py +++ b/plugins/module_utils/crypto/math.py @@ -110,6 +110,9 @@ def count_bits(no): def _convert_int_to_bytes(count, no): return no.to_bytes(count, byteorder='big') + def _convert_bytes_to_int(data): + return int.from_bytes(data, byteorder='big', signed=False) + def _to_hex(no): return hex(no)[2:] else: @@ -122,6 +125,12 @@ def _convert_int_to_bytes(count, n): raise Exception('Number {1} needs more than {0} bytes!'.format(count, n)) return ('0' * (2 * count - len(h)) + h).decode('hex') + def _convert_bytes_to_int(data): + v = 0 + for x in data: + v = (v << 8) | ord(x) + return v + def _to_hex(no): return '%x' % no @@ -155,3 +164,10 @@ def convert_int_to_hex(no, digits=None): if digits is not None and len(value) < digits: value = '0' * (digits - len(value)) + value return value + + +def convert_bytes_to_int(data): + """ + Convert a byte string to an unsigned integer in network byte order. + """ + return _convert_bytes_to_int(data) diff --git a/tests/unit/plugins/module_utils/acme/backend_data.py b/tests/unit/plugins/module_utils/acme/backend_data.py index 988bcdaeb..31e0ef006 100644 --- a/tests/unit/plugins/module_utils/acme/backend_data.py +++ b/tests/unit/plugins/module_utils/acme/backend_data.py @@ -11,6 +11,7 @@ import os from ansible_collections.community.crypto.plugins.module_utils.acme.backends import ( + CertificateInformation, CryptoBackend, ) @@ -81,6 +82,9 @@ def load_fixture(name): TEST_CERT = load_fixture("cert_1.pem") +TEST_CERT_OPENSSL_OUTPUT = load_fixture("cert_1.txt") + + TEST_CERT_DAYS = [ (datetime.datetime(2018, 11, 15, 1, 2, 3), 11), (datetime.datetime(2018, 11, 25, 15, 20, 0), 1), @@ -88,6 +92,21 @@ def load_fixture(name): ] +TEST_CERT_INFO = [ + ( + TEST_CERT, + CertificateInformation( + not_valid_after=datetime.datetime(2018, 11, 26, 15, 28, 24), + not_valid_before=datetime.datetime(2018, 11, 25, 15, 28, 23), + serial_number=1, + subject_key_identifier=b'\x98\xD2\xFD\x3C\xCC\xCD\x69\x45\xFB\xE2\x8C\x30\x2C\x54\x62\x18\x34\xB7\x07\x73', + authority_key_identifier=None, + ), + TEST_CERT_OPENSSL_OUTPUT, + ), +] + + class FakeBackend(CryptoBackend): def parse_key(self, key_file=None, key_content=None, passphrase=None): raise BackendException('Not implemented in fake backend') @@ -98,6 +117,9 @@ def sign(self, payload64, protected64, key_data): def create_mac_key(self, alg, key): raise BackendException('Not implemented in fake backend') + def get_ordered_csr_identifiers(self, csr_filename=None, csr_content=None): + raise BackendException('Not implemented in fake backend') + def get_csr_identifiers(self, csr_filename=None, csr_content=None): raise BackendException('Not implemented in fake backend') @@ -106,3 +128,6 @@ def get_cert_days(self, cert_filename=None, cert_content=None, now=None): def create_chain_matcher(self, criterium): raise BackendException('Not implemented in fake backend') + + def get_cert_information(self, cert_filename=None, cert_content=None): + raise BackendException('Not implemented in fake backend') diff --git a/tests/unit/plugins/module_utils/acme/fixtures/cert_1.txt b/tests/unit/plugins/module_utils/acme/fixtures/cert_1.txt new file mode 100644 index 000000000..e989d914d --- /dev/null +++ b/tests/unit/plugins/module_utils/acme/fixtures/cert_1.txt @@ -0,0 +1,38 @@ +Certificate: + Data: + Version: 3 (0x2) + Serial Number: 1 (0x1) + Signature Algorithm: ecdsa-with-SHA256 + Issuer: CN=ansible.com + Validity + Not Before: Nov 25 15:28:23 2018 GMT + Not After : Nov 26 15:28:24 2018 GMT + Subject: CN=ansible.com + Subject Public Key Info: + Public Key Algorithm: id-ecPublicKey + Public-Key: (256 bit) + pub: + 04:00:9c:f4:c8:00:17:03:01:26:3a:14:d1:92:35: + f1:c2:07:9d:6d:63:ba:82:86:d8:33:79:56:b3:3a: + d2:eb:c1:bc:41:2c:e1:5d:1e:80:99:0d:c8:cd:90: + e2:9a:74:d3:5c:ee:d7:85:5c:a5:0d:3f:12:2f:31: + 38:e3:f1:29:9b + ASN1 OID: prime256v1 + NIST CURVE: P-256 + X509v3 extensions: + X509v3 Subject Alternative Name: + DNS:example.com, DNS:example.org + X509v3 Basic Constraints: critical + CA:FALSE + X509v3 Key Usage: critical + Digital Signature + X509v3 Extended Key Usage: + TLS Web Server Authentication + X509v3 Subject Key Identifier: + 98:D2:FD:3C:CC:CD:69:45:FB:E2:8C:30:2C:54:62:18:34:B7:07:73 + Signature Algorithm: ecdsa-with-SHA256 + Signature Value: + 30:46:02:21:00:bc:fb:52:bf:7a:93:2d:0e:7c:ce:43:f4:cc: + 05:98:28:36:8d:c7:2a:9b:f5:20:94:62:3d:fb:82:9e:38:42: + 32:02:21:00:c0:55:f8:b5:d9:65:41:2a:dd:d4:76:3f:8c:cb: + 07:c1:d2:b9:c0:7d:c9:90:af:fd:f9:f1:b0:c9:13:f5:d5:52 diff --git a/tests/unit/plugins/module_utils/acme/fixtures/cert_1.txt.license b/tests/unit/plugins/module_utils/acme/fixtures/cert_1.txt.license new file mode 100644 index 000000000..edff8c768 --- /dev/null +++ b/tests/unit/plugins/module_utils/acme/fixtures/cert_1.txt.license @@ -0,0 +1,3 @@ +GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) +SPDX-License-Identifier: GPL-3.0-or-later +SPDX-FileCopyrightText: Ansible Project diff --git a/tests/unit/plugins/module_utils/acme/test_backend_cryptography.py b/tests/unit/plugins/module_utils/acme/test_backend_cryptography.py index 59da68a3b..c3b713ee6 100644 --- a/tests/unit/plugins/module_utils/acme/test_backend_cryptography.py +++ b/tests/unit/plugins/module_utils/acme/test_backend_cryptography.py @@ -16,11 +16,20 @@ CryptographyBackend, ) +from ansible_collections.community.crypto.plugins.module_utils.crypto.support import ( + ensure_utc_timezone, +) + +from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_support import ( + CRYPTOGRAPHY_TIMEZONE, +) + from .backend_data import ( TEST_KEYS, TEST_CSRS, TEST_CERT, TEST_CERT_DAYS, + TEST_CERT_INFO, ) @@ -64,3 +73,22 @@ def test_certdays_cryptography(now, expected_days, tmpdir): assert days == expected_days days = backend.get_cert_days(cert_content=TEST_CERT, now=now) assert days == expected_days + + +@pytest.mark.parametrize("cert_content, expected_cert_info, openssl_output", TEST_CERT_INFO) +def test_get_cert_information(cert_content, expected_cert_info, openssl_output, tmpdir): + fn = tmpdir / 'test-cert.pem' + fn.write(cert_content) + module = MagicMock() + backend = CryptographyBackend(module) + + if CRYPTOGRAPHY_TIMEZONE: + expected_cert_info = expected_cert_info._replace( + not_valid_after=ensure_utc_timezone(expected_cert_info.not_valid_after), + not_valid_before=ensure_utc_timezone(expected_cert_info.not_valid_before), + ) + + cert_info = backend.get_cert_information(cert_filename=str(fn)) + assert cert_info == expected_cert_info + cert_info = backend.get_cert_information(cert_content=cert_content) + assert cert_info == expected_cert_info diff --git a/tests/unit/plugins/module_utils/acme/test_backend_openssl_cli.py b/tests/unit/plugins/module_utils/acme/test_backend_openssl_cli.py index dd30cf795..c0a108611 100644 --- a/tests/unit/plugins/module_utils/acme/test_backend_openssl_cli.py +++ b/tests/unit/plugins/module_utils/acme/test_backend_openssl_cli.py @@ -18,6 +18,10 @@ from .backend_data import ( TEST_KEYS, TEST_CSRS, + TEST_CERT, + TEST_CERT_OPENSSL_OUTPUT, + TEST_CERT_DAYS, + TEST_CERT_INFO, ) @@ -61,3 +65,29 @@ def test_normalize_ip(ip, result): module = MagicMock() backend = OpenSSLCLIBackend(module, openssl_binary='openssl') assert backend._normalize_ip(ip) == result + + +@pytest.mark.parametrize("now, expected_days", TEST_CERT_DAYS) +def test_certdays_cryptography(now, expected_days, tmpdir): + fn = tmpdir / 'test-cert.pem' + fn.write(TEST_CERT) + module = MagicMock() + module.run_command = MagicMock(return_value=(0, TEST_CERT_OPENSSL_OUTPUT, 0)) + backend = OpenSSLCLIBackend(module, openssl_binary='openssl') + days = backend.get_cert_days(cert_filename=str(fn), now=now) + assert days == expected_days + days = backend.get_cert_days(cert_content=TEST_CERT, now=now) + assert days == expected_days + + +@pytest.mark.parametrize("cert_content, expected_cert_info, openssl_output", TEST_CERT_INFO) +def test_get_cert_information(cert_content, expected_cert_info, openssl_output, tmpdir): + fn = tmpdir / 'test-cert.pem' + fn.write(cert_content) + module = MagicMock() + module.run_command = MagicMock(return_value=(0, openssl_output, 0)) + backend = OpenSSLCLIBackend(module, openssl_binary='openssl') + cert_info = backend.get_cert_information(cert_filename=str(fn)) + assert cert_info == expected_cert_info + cert_info = backend.get_cert_information(cert_content=cert_content) + assert cert_info == expected_cert_info diff --git a/tests/unit/plugins/module_utils/crypto/test_math.py b/tests/unit/plugins/module_utils/crypto/test_math.py index 31ccad2ed..4fd917713 100644 --- a/tests/unit/plugins/module_utils/crypto/test_math.py +++ b/tests/unit/plugins/module_utils/crypto/test_math.py @@ -15,6 +15,7 @@ quick_is_not_prime, convert_int_to_bytes, convert_int_to_hex, + convert_bytes_to_int, ) @@ -100,3 +101,17 @@ def test_convert_int_to_hex(no, digits, result): value = convert_int_to_hex(no, digits=digits) print(value) assert value == result + + +@pytest.mark.parametrize('data, result', [ + (b'', 0), + (b'\x00', 0), + (b'\x00\x01', 1), + (b'\x01', 1), + (b'\xff', 255), + (b'\x01\x00', 256), +]) +def test_convert_bytes_to_int(data, result): + value = convert_bytes_to_int(data) + print(value) + assert value == result