Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't error in DogtagCertsConnectivityCheck with external CAs #286

Merged
merged 1 commit into from
Jul 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 13 additions & 32 deletions src/ipahealthcheck/dogtag/ca.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,8 @@
from ipalib import api, errors, x509
from ipaplatform.paths import paths
from ipaserver.install import certs
from ipaserver.install import ca
from ipaserver.install import krainstance
from ipapython.directivesetter import get_directive
from ipapython.dn import DN
from cryptography.hazmat.primitives.serialization import Encoding

logger = logging.getLogger()
Expand Down Expand Up @@ -95,6 +93,10 @@ def check(self):
class DogtagCertsConnectivityCheck(DogtagPlugin):
"""
Test basic connectivity by using cert-show to fetch a cert

The RA agent certificate is used because if a CA is configured we
know this certificate should exist. Use its serial number to do
the lookup.
"""
requires = ('dirsrv',)

Expand All @@ -104,59 +106,38 @@ def check(self):
logger.debug('CA is not configured, skipping connectivity check')
return

config = api.Command.config_show()

subject_base = config['result']['ipacertificatesubjectbase'][0]
ipa_subject = ca.lookup_ca_subject(api, subject_base)
try:
certs = x509.load_certificate_list_from_file(paths.IPA_CA_CRT)
cert = x509.load_certificate_from_file(paths.RA_AGENT_PEM)
except Exception as e:
yield Result(self, constants.ERROR,
key='ipa_ca_crt_file_missing',
path=paths.IPA_CA_CRT,
key='ipa_ra_crt_file_missing',
path=paths.RA_AGENT_PEM,
error=str(e),
msg='The IPA CA cert file {path} could not be '
msg='The IPA RA cert file {path} could not be '
'opened: {error}')
return

found = False
for cert in certs:
if DN(cert.subject) == ipa_subject:
found = True
break

if not found:
yield Result(self, constants.ERROR,
key='ipa_ca_cert_not_found',
subject=str(ipa_subject),
path=paths.IPA_CA_CRT,
msg='The CA certificate with subject {subject} '
'was not found in {path}')
return
# Load the IPA CA certificate to obtain its serial number. This
# was traditionally 1 prior to random serial number support.
# There is nothing special about cert 1. Even if there is no cert
# serial number 1 but the connection is ok it is considered passing.
# We used to use serial #1 but with RSNv3 it can be anything.
try:
api.Command.cert_show(cert.serial_number, all=True)
except errors.CertificateOperationError as e:
if 'not found' in str(e):
yield Result(self, constants.ERROR,
key='cert_show_1',
key='cert_show_ra',
error=str(e),
serial=str(cert.serial_number),
msg='Serial number not found: {error}')
else:
yield Result(self, constants.ERROR,
key='cert_show_1',
key='cert_show_ra',
error=str(e),
serial=str(cert.serial_number),
msg='Request for certificate failed: {error}')
except Exception as e:
yield Result(self, constants.ERROR,
key='cert_show_1',
key='cert_show_ra',
error=str(e),
serial=str(cert.serial_number),
msg='Request for certificate failed: {error')
msg='Request for certificate failed: {error}')
else:
yield Result(self, constants.SUCCESS)
175 changes: 26 additions & 149 deletions tests/test_dogtag_connectivity.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,23 @@

from ipalib.errors import CertificateOperationError
from ipaplatform.paths import paths
from ipapython.dn import DN


default_subject_base = [{
'result':
{
'ipacertificatesubjectbase': [f'O={m_api.env.realm}'],
},
}]


class IPACertificate:
def __init__(self, serial_number=1,
subject='CN=Certificate Authority, O=%s' % m_api.env.realm):
subject='CN=Certificate Authority, O=%s' % m_api.env.realm,
issuer='CN=Certificate Authority, O=%s' % m_api.env.realm):
self.serial_number = serial_number
self.subject = subject
self.issuer = issuer

def __eq__(self, other):
return self.serial_number == other.serial_number
Expand Down Expand Up @@ -50,18 +59,15 @@ class TestCAConnectivity(BaseTest):
Mock(return_value=CAInstance()),
}

@patch('ipaserver.install.ca.lookup_ca_subject')
@patch('ipalib.x509.load_certificate_list_from_file')
def test_ca_connection_ok(self, mock_load_cert, mock_ca_subject):
@patch('ipalib.x509.load_certificate_from_file')
def test_ca_connection_ok(self, mock_load_cert):
"""CA connectivity check when cert_show returns a valid value"""
m_api.Command.cert_show.side_effect = None
m_api.Command.config_show.side_effect = subject_base
m_api.Command.cert_show.return_value = {
u'result': {u'revoked': False}
}
mock_load_cert.return_value = [IPACertificate(12345)]
mock_ca_subject.return_value = DN(('cn', 'Certificate Authority'),
f'O={m_api.env.realm}')
mock_load_cert.return_value = IPACertificate(12345)

framework = object()
registry.initialize(framework, config.Config)
Expand All @@ -76,20 +82,16 @@ def test_ca_connection_ok(self, mock_load_cert, mock_ca_subject):
assert result.source == 'ipahealthcheck.dogtag.ca'
assert result.check == 'DogtagCertsConnectivityCheck'

@patch('ipaserver.install.ca.lookup_ca_subject')
@patch('ipalib.x509.load_certificate_list_from_file')
def test_ca_connection_cert_not_found(self, mock_load_cert,
mock_ca_subject):
@patch('ipalib.x509.load_certificate_from_file')
def test_ca_connection_cert_not_found(self, mock_load_cert):
"""CA connectivity check for a cert that doesn't exist"""
m_api.Command.cert_show.reset_mock()
m_api.Command.config_show.side_effect = subject_base
m_api.Command.cert_show.side_effect = CertificateOperationError(
message='Certificate operation cannot be completed: '
'EXCEPTION (Certificate serial number 0x0 not found)'
)
mock_load_cert.return_value = [IPACertificate()]
mock_ca_subject.return_value = DN(('cn', 'Certificate Authority'),
f'O={m_api.env.realm}')
mock_load_cert.return_value = IPACertificate(serial_number=7)

framework = object()
registry.initialize(framework, config.Config)
Expand All @@ -103,46 +105,16 @@ def test_ca_connection_cert_not_found(self, mock_load_cert,
assert result.result == constants.ERROR
assert result.source == 'ipahealthcheck.dogtag.ca'
assert result.check == 'DogtagCertsConnectivityCheck'
assert result.kw.get('key') == 'cert_show_1'
assert result.kw.get('serial') == '1'
assert result.kw.get('key') == 'cert_show_ra'
assert result.kw.get('serial') == '7'
assert result.kw.get('msg') == 'Serial number not found: {error}'

@patch('ipaserver.install.ca.lookup_ca_subject')
@patch('ipalib.x509.load_certificate_list_from_file')
def test_ca_connection_cert_file_not_found(self, mock_load_cert,
mock_ca_subject):
@patch('ipalib.x509.load_certificate_from_file')
def test_ca_connection_cert_file_not_found(self, mock_load_cert):
"""CA connectivity check for a cert that doesn't exist"""
m_api.Command.cert_show.reset_mock()
m_api.Command.config_show.side_effect = subject_base
mock_load_cert.side_effect = FileNotFoundError()
mock_ca_subject.return_value = DN(('cn', 'Certificate Authority'),
f'O={m_api.env.realm}')

framework = object()
registry.initialize(framework, config.Config)
f = DogtagCertsConnectivityCheck(registry)

self.results = capture_results(f)

assert len(self.results) == 1

result = self.results.results[0]
assert result.result == constants.ERROR
assert result.source == 'ipahealthcheck.dogtag.ca'
assert result.check == 'DogtagCertsConnectivityCheck'
assert result.kw.get('key') == 'ipa_ca_crt_file_missing'
assert result.kw.get('path') == paths.IPA_CA_CRT

@patch('ipaserver.install.ca.lookup_ca_subject')
@patch('ipalib.x509.load_certificate_list_from_file')
def test_ca_connection_cert_not_in_file_list(self, mock_load_cert,
mock_ca_subject):
"""CA connectivity check for a cert that isn't in IPA_CA_CRT"""
m_api.Command.cert_show.reset_mock()
m_api.Command.config_show.side_effect = bad_subject_base
mock_load_cert.return_value = [IPACertificate()]
mock_ca_subject.return_value = DN(('cn', 'Certificate Authority'),
'O=BAD')

framework = object()
registry.initialize(framework, config.Config)
Expand All @@ -156,26 +128,18 @@ def test_ca_connection_cert_not_in_file_list(self, mock_load_cert,
assert result.result == constants.ERROR
assert result.source == 'ipahealthcheck.dogtag.ca'
assert result.check == 'DogtagCertsConnectivityCheck'
bad = bad_subject_base[0]['result']['ipacertificatesubjectbase'][0]
bad_subject = DN(f'CN=Certificate Authority,{bad}')
assert DN(result.kw['subject']) == bad_subject
assert result.kw['path'] == paths.IPA_CA_CRT
assert result.kw['msg'] == (
'The CA certificate with subject {subject} was not found in {path}'
)
assert result.kw.get('key') == 'ipa_ra_crt_file_missing'
assert result.kw.get('path') == paths.RA_AGENT_PEM

@patch('ipaserver.install.ca.lookup_ca_subject')
@patch('ipalib.x509.load_certificate_list_from_file')
def test_ca_connection_down(self, mock_load_cert, mock_ca_subject):
@patch('ipalib.x509.load_certificate_from_file')
def test_ca_connection_down(self, mock_load_cert):
"""CA connectivity check with the CA down"""
m_api.Command.cert_show.side_effect = CertificateOperationError(
message='Certificate operation cannot be completed: '
'Unable to communicate with CMS (503)'
)
m_api.Command.config_show.side_effect = subject_base
mock_load_cert.return_value = [IPACertificate()]
mock_ca_subject.return_value = DN(('cn', 'Certificate Authority'),
f'O={m_api.env.realm}')
mock_load_cert.return_value = IPACertificate()

framework = object()
registry.initialize(framework, config.Config)
Expand All @@ -192,90 +156,3 @@ def test_ca_connection_down(self, mock_load_cert, mock_ca_subject):
assert result.kw.get('msg') == (
'Request for certificate failed: {error}'
)

@patch('ipaserver.install.ca.lookup_ca_subject')
@patch('ipalib.x509.load_certificate_list_from_file')
def test_ca_connection_multiple_ok(self, mock_load_cert, mock_ca_subject):
"""CA connectivity check when cert_show returns a valid value"""
m_api.Command.cert_show.side_effect = None
m_api.Command.config_show.side_effect = subject_base
m_api.Command.cert_show.return_value = {
u'result': {u'revoked': False}
}
mock_load_cert.return_value = [
IPACertificate(1, 'CN=something'),
IPACertificate(12345),
]
mock_ca_subject.return_value = DN(('cn', 'Certificate Authority'),
f'O={m_api.env.realm}')

framework = object()
registry.initialize(framework, config.Config)
f = DogtagCertsConnectivityCheck(registry)

self.results = capture_results(f)

assert len(self.results) == 1

result = self.results.results[0]
assert result.result == constants.SUCCESS
assert result.source == 'ipahealthcheck.dogtag.ca'

@patch('ipaserver.install.ca.lookup_ca_subject')
@patch('ipalib.x509.load_certificate_list_from_file')
def test_ca_connection_multiple_ok_reverse(self, mock_load_cert,
mock_ca_subject):
"""CA connectivity check when cert_show returns a valid value"""
m_api.Command.cert_show.side_effect = None
m_api.Command.config_show.side_effect = subject_base
m_api.Command.cert_show.return_value = {
u'result': {u'revoked': False}
}
mock_load_cert.return_value = [
IPACertificate(12345),
IPACertificate(1, 'CN=something'),
]
mock_ca_subject.return_value = DN(('cn', 'Certificate Authority'),
f'O={m_api.env.realm}')

framework = object()
registry.initialize(framework, config.Config)
f = DogtagCertsConnectivityCheck(registry)

self.results = capture_results(f)

assert len(self.results) == 1

result = self.results.results[0]
assert result.result == constants.SUCCESS
assert result.source == 'ipahealthcheck.dogtag.ca'

@patch('ipaserver.install.ca.lookup_ca_subject')
@patch('ipalib.x509.load_certificate_list_from_file')
def test_ca_connection_not_found(self, mock_load_cert, mock_ca_subject):
"""CA connectivity check when cert_show returns a valid value"""
m_api.Command.cert_show.side_effect = None
m_api.Command.config_show.side_effect = subject_base
m_api.Command.cert_show.return_value = {
u'result': {u'revoked': False}
}
mock_load_cert.return_value = [
IPACertificate(1, 'CN=something'),
]
mock_ca_subject.return_value = DN(('cn', 'Certificate Authority'),
f'O={m_api.env.realm}')

framework = object()
registry.initialize(framework, config.Config)
f = DogtagCertsConnectivityCheck(registry)

self.results = capture_results(f)

assert len(self.results) == 1

result = self.results.results[0]
assert result.result == constants.ERROR
assert result.source == 'ipahealthcheck.dogtag.ca'
assert result.kw['msg'] == (
'The CA certificate with subject {subject} was not found in {path}'
)