Skip to content

Commit

Permalink
Added CRL support
Browse files Browse the repository at this point in the history
  • Loading branch information
tsaarni committed Oct 30, 2023
1 parent 5b5cbbe commit bc838f6
Show file tree
Hide file tree
Showing 6 changed files with 298 additions and 7 deletions.
7 changes: 7 additions & 0 deletions .vscode/extensions.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"recommendations": [
"ms-python.python",
"ms-python.vscode-pylance",
"ms-python.black-formatter"
]
}
9 changes: 8 additions & 1 deletion src/certy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,12 @@
__version__ = "0.1.4"

from .credential import Credential, KeyType, KeyUsage, ExtendedKeyUsage
from .certificate_revocation_list import CertificateRevocationList

__all__ = ["Credential", "KeyType", "KeyUsage", "ExtendedKeyUsage"]
__all__ = [
"Credential",
"KeyType",
"KeyUsage",
"ExtendedKeyUsage",
"CertificateRevocationList",
]
149 changes: 149 additions & 0 deletions src/certy/certificate_revocation_list.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
#
# Copyright Certy Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from __future__ import annotations

import datetime

from cryptography import x509
from cryptography.hazmat.primitives import serialization

from certy import Credential


class CertificateRevocationList(object):
"""CertificateRevocationList is a builder for X.509 CRLs."""

def __init__(
self,
issuer: Credential | None = None,
revoked_certificates: list[Credential] | None = None,
this_update: datetime.datetime | None = None,
next_update: datetime.datetime | None = None,
):
self._issuer = issuer
self._revoked_certificates = revoked_certificates or []
self._this_update = this_update
self._next_update = next_update

# Generated attributes
self._crl: x509.CertificateRevocationList | None = None

def __repr__(self) -> str:
return f"CertificateRevocationList(issuer={self._issuer}, revoked_certificates={self._revoked_certificates}, this_update={self._this_update}, next_update={self._next_update})"

# Setter methods

def issuer(self, issuer: Credential) -> CertificateRevocationList:
"""Set the issuer of the CRL."""
self._issuer = issuer
return self

def this_update(self, this_update: datetime.datetime) -> CertificateRevocationList:
"""Set the thisUpdate field of the CRL."""
self._this_update = this_update
return self

def next_update(self, next_update: datetime.datetime) -> CertificateRevocationList:
"""Set the nextUpdate field of the CRL."""
self._next_update = next_update
return self

def add(self, certificate: Credential) -> CertificateRevocationList:
"""Add a certificate to the CRL."""
self._revoked_certificates.append(certificate)
return self

# Builder methods

def generate(self) -> CertificateRevocationList:
"""Generate the CRL."""

if not self._issuer:
if len(self._revoked_certificates) == 0:
raise ValueError(
"issuer not known: either set issuer or add certificates to the CRL"
)
if self._revoked_certificates[0]._issuer is None:
raise ValueError(
"cannot determine issuer from first certificate in CRL"
)
self._issuer = self._revoked_certificates[0]._issuer

# Ensure that the issuer has a key pair.
self._issuer._ensure_generated()

effective_revocation_time = datetime.datetime.utcnow()
if self._this_update:
effective_revocation_time = self._this_update

effective_expiry_time = effective_revocation_time + datetime.timedelta(days=7)
if self._next_update:
effective_expiry_time = self._next_update

builder = (
x509.CertificateRevocationListBuilder()
.issuer_name(self._issuer._certificate.subject) # type: ignore
.last_update(effective_revocation_time)
.next_update(effective_expiry_time)
)

for certificate in self._revoked_certificates:
certificate._ensure_generated()
builder = builder.add_revoked_certificate(
x509.RevokedCertificateBuilder()
.serial_number(certificate._certificate.serial_number) # type: ignore
.revocation_date(effective_revocation_time)
.build()
)

self._crl = builder.sign(
private_key=self._issuer._private_key, # type: ignore
algorithm=self._issuer._certificate.signature_hash_algorithm, # type: ignore
)

return self

def get_as_pem(self) -> bytes:
"""Get the CRL as PEM."""
self._ensure_generated()
return self._crl.public_bytes(encoding=serialization.Encoding.PEM) # type: ignore

def get_as_der(self) -> bytes:
"""Get the CRL as DER."""
self._ensure_generated()
return self._crl.public_bytes(encoding=serialization.Encoding.DER) # type: ignore

def write_pem(self, filename: str) -> CertificateRevocationList:
"""Write the CRL as PEM to a file."""
self._ensure_generated()
with open(filename, "wb") as f:
f.write(self.get_as_pem())
return self

def write_der(self, filename: str) -> CertificateRevocationList:
self._ensure_generated()
with open(filename, "wb") as f:
f.write(self.get_as_der())
return self

# Helper methods

def _ensure_generated(self) -> CertificateRevocationList:
"""Ensure that the CRL has been generated."""
if not self._crl:
self.generate()
return self
54 changes: 53 additions & 1 deletion src/certy/credential.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
#
# Copyright Certy Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from __future__ import annotations

import ipaddress
Expand Down Expand Up @@ -67,6 +83,7 @@ def __init__(
key_usages: list[KeyUsage] | None = None,
ext_key_usages: list[ExtendedKeyUsage] | None = None,
serial: int | None = None,
crl_distribution_point_uri: str | None = None,
):
self._subject = subject
self._subject_alt_names = subject_alt_names
Expand All @@ -80,13 +97,14 @@ def __init__(
self._key_usages = key_usages
self._ext_key_usages = ext_key_usages
self._serial = serial
self._crl_distribution_point_uri = crl_distribution_point_uri

# Generated attributes
self._private_key: rsa.RSAPrivateKey | ec.EllipticCurvePrivateKey | None = None
self._certificate: x509.Certificate | None = None

def __repr__(self):
return f"Credential(subject={self._subject!r}, key_type={self._key_type!r}, key_size={self._key_size!r}, expires={self._expires!r}, not_before={self._not_before!r}, not_after={self._not_after!r}, issuer={self._issuer!r}, is_ca={self._is_ca!r}, key_usages={self._key_usages!r}, ext_key_usages={self._ext_key_usages!r}, serial={self._serial!r})"
return f"Credential(subject={self._subject!r}, key_type={self._key_type!r}, key_size={self._key_size!r}, expires={self._expires!r}, not_before={self._not_before!r}, not_after={self._not_after!r}, issuer={self._issuer!r}, is_ca={self._is_ca!r}, key_usages={self._key_usages!r}, ext_key_usages={self._ext_key_usages!r}, serial={self._serial!r}, crl_distribution_point_uri={self._crl_distribution_point_uri!r})"

# Setter methods

Expand Down Expand Up @@ -289,6 +307,21 @@ def ext_key_usages(self, *ext_key_usages: ExtendedKeyUsage) -> Credential:
self._ext_key_usages = ext_key_usages
return self

def crl_distribution_point_uri(self, uri: str) -> Credential:
"""Set the CRL distribution point URI of this credential.
If not called, the CRL distribution point extension is not included in the certificate.
:param uri: The URI of the CRL distribution point.
:type uri: str
:return: This credential instance.
:rtype: Credential
"""
if not isinstance(uri, str):
raise ValueError("URI must be a string")
self._crl_distribution_point_uri = uri
return self

# Builder methods

def generate(self) -> Credential:
Expand Down Expand Up @@ -396,6 +429,25 @@ def generate(self) -> Credential:
critical=False,
)

if self._crl_distribution_point_uri is not None:
builder = builder.add_extension(
x509.CRLDistributionPoints(
[
x509.DistributionPoint(
full_name=[
x509.UniformResourceIdentifier(
self._crl_distribution_point_uri
)
],
relative_name=None,
crl_issuer=None,
reasons=None,
)
]
),
critical=False,
)

self._certificate = builder.sign(
effective_issuer._private_key, # type: ignore
_preferred_signature_hash_algorithm(effective_issuer._key_type, effective_issuer._key_size), # type: ignore
Expand Down
60 changes: 60 additions & 0 deletions tests/test_certificate_revocation_list.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
#
# Copyright Certy Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import pytest
from cryptography import x509

from certy import CertificateRevocationList, Credential


@pytest.fixture
def ca():
return Credential().subject("CN=ca")


def test_add(ca):
first_revoked = Credential().issuer(ca).subject("CN=first-revoked")
second_revoked = Credential().issuer(ca).subject("CN=second-revoked")
not_revoked = Credential().issuer(ca).subject("CN=not-revoked")
crl = (
CertificateRevocationList()
.issuer(ca)
.add(first_revoked)
.add(second_revoked)
.get_as_der()
)

# Decode DER encoded certificate revocation list from string
c = x509.load_der_x509_crl(crl)
assert c is not None
assert (
c.get_revoked_certificate_by_serial_number(
first_revoked.get_certificate().serial_number
)
is not None
)
assert (
c.get_revoked_certificate_by_serial_number(
second_revoked.get_certificate().serial_number
)
is not None
)
assert (
c.get_revoked_certificate_by_serial_number(
not_revoked.get_certificate().serial_number
)
is None
)
26 changes: 21 additions & 5 deletions tests/test_credential.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
#
# Copyright Certy Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import datetime
import ipaddress
from datetime import datetime, timedelta
Expand Down Expand Up @@ -39,7 +55,7 @@ def test_subject_alt_name():
]
)

# Single subject alternative name given instead of list
# Single subject alternative name given instead of list.
cert = (
Credential()
.subject("CN=test")
Expand Down Expand Up @@ -260,13 +276,13 @@ def test_write_pem_files(tmp_path):
wanted.write_certificates_as_pem(tmp_path / "joe.pem")
wanted.write_private_key_as_pem(tmp_path / "joe-key.pem")

# load certificate and key from files
# Load certificate and key from files.
got_cert = x509.load_pem_x509_certificate((tmp_path / "joe.pem").read_bytes())
got_key = serialization.load_pem_private_key(
(tmp_path / "joe-key.pem").read_bytes(), None
)

# check that the certificate and key match
# Check that the certificate and key match.
assert got_cert == wanted.get_certificate()
assert private_keys_equal(got_key, wanted.get_private_key())

Expand All @@ -276,13 +292,13 @@ def test_write_pem_files_with_password(tmp_path):
wanted.write_certificates_as_pem(tmp_path / "joe.pem")
wanted.write_private_key_as_pem(tmp_path / "joe-key.pem", password="secret")

# load certificate and key from files
# Load certificate and key from files.
got_cert = x509.load_pem_x509_certificate((tmp_path / "joe.pem").read_bytes())
got_key = serialization.load_pem_private_key(
(tmp_path / "joe-key.pem").read_bytes(), b"secret"
)

# check that the certificate and key match
# Check that the certificate and key match.
assert got_cert == wanted.get_certificate()
assert private_keys_equal(got_key, wanted.get_private_key())

Expand Down

0 comments on commit bc838f6

Please sign in to comment.