Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

openssh_cert - Adding regenerate option #256

Merged
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
---
minor_changes:
- openssh_cert - added ``idempotency`` option to validate additional certificate parameters which trigger
regeneration of an existing certificate (https://github.com/ansible-collections/community.crypto/pull/256).
180 changes: 169 additions & 11 deletions plugins/module_utils/openssh/certificate.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
from hashlib import sha256

from ansible.module_utils import six
from ansible.module_utils.common.text.converters import to_text
from ansible_collections.community.crypto.plugins.module_utils.crypto.support import convert_relative_to_datetime
from ansible_collections.community.crypto.plugins.module_utils.openssh.utils import (
OpensshParser,
Expand Down Expand Up @@ -74,6 +75,29 @@
_ALWAYS = datetime(1970, 1, 1)
_FOREVER = datetime.max

_CRITICAL_OPTIONS = (
'force-command',
'source-address',
'verify-required',
)

_DIRECTIVES = (
'clear',
'no-x11-forwarding',
'no-agent-forwarding',
'no-port-forwarding',
'no-pty',
'no-user-rc',
)

_EXTENSIONS = (
'permit-x11-forwarding',
'permit-agent-forwarding',
'permit-port-forwarding',
'permit-pty',
'permit-user-rc'
)

if six.PY3:
long = int

Expand All @@ -92,6 +116,9 @@ def __eq__(self, other):
else:
return self._valid_from == other._valid_from and self._valid_to == other._valid_to

def __ne__(self, other):
return not self == other

@property
def validity_string(self):
if not (self._valid_from == _ALWAYS and self._valid_to == _FOREVER):
Expand Down Expand Up @@ -131,12 +158,14 @@ def format_datetime(dt, date_format):
@staticmethod
def to_datetime(time_string_or_timestamp):
try:
if isinstance(time_string_or_timestamp, str):
if isinstance(time_string_or_timestamp, six.string_types):
result = OpensshCertificateTimeParameters._time_string_to_datetime(time_string_or_timestamp.strip())
elif isinstance(time_string_or_timestamp, (long, int)):
result = OpensshCertificateTimeParameters._timestamp_to_datetime(time_string_or_timestamp)
else:
raise ValueError("Value must be of type (str, int, long) not %s" % type(time_string_or_timestamp))
raise ValueError(
"Value must be of type (str, unicode, int, long) not %s" % type(time_string_or_timestamp)
)
except ValueError:
raise
return result
Expand Down Expand Up @@ -174,6 +203,89 @@ def _time_string_to_datetime(time_string):
return result


class OpensshCertificateOption(object):
def __init__(self, option_type, name, data):
if not isinstance(option_type, six.string_types) or option_type not in ('critical', 'extension'):
Ajpantuso marked this conversation as resolved.
Show resolved Hide resolved
raise ValueError("type must be either 'critical' or 'extension'")

if not isinstance(name, six.string_types):
raise TypeError("name must be a string not %s" % type(name))

if not isinstance(data, six.string_types):
raise TypeError("data must be a string not %s" % type(data))

self._option_type = option_type
self._name = name.lower()
self._data = data

def __eq__(self, other):
if not isinstance(other, type(self)):
return NotImplemented

return all([
self._option_type == other._option_type,
self._name == other._name,
self._data == other._data,
])

def __hash__(self):
return hash((self._option_type, self._name, self._data))

def __ne__(self, other):
return not self == other

def __str__(self):
if self._data:
return "%s=%s" % (self._name, self._data)
return self._name

@property
def data(self):
return self._data

@property
def name(self):
return self._name

@property
def type(self):
return self._option_type

@classmethod
def from_string(cls, option_string):
if not isinstance(option_string, six.string_types):
raise ValueError("option_string must be a string not %s" % type(option_string))
option_type = None

if ':' in option_string:
option_type, value = option_string.strip().split(':', 1)
if '=' in value:
name, data = value.split('=', 1)
else:
name, data = value, ''
elif '=' in option_string:
name, data = option_string.strip().split('=', 1)
else:
name, data = option_string.strip(), ''

name = name.lower()

if option_type is None:
if name in _CRITICAL_OPTIONS:
option_type = 'critical'
elif name in _EXTENSIONS:
option_type = 'extension'
else:
raise ValueError("%s is not a valid option. " % name +
"Custom options must start with 'critical:' or 'extension:' to indicate type")

return cls(
option_type=option_type,
name=name,
data=data
)


@six.add_metaclass(abc.ABCMeta)
class OpensshCertificateInfo:
"""Encapsulates all certificate information which is signed by a CA key"""
Expand Down Expand Up @@ -402,15 +514,15 @@ def load(cls, path):

@property
def type_string(self):
return self._cert_info.type_string
return to_text(self._cert_info.type_string)

@property
def nonce(self):
return self._cert_info.nonce

@property
def public_key(self):
return self._cert_info.public_key_fingerprint()
return to_text(self._cert_info.public_key_fingerprint())

@property
def serial(self):
Expand All @@ -422,11 +534,11 @@ def type(self):

@property
def key_id(self):
return self._cert_info.key_id
return to_text(self._cert_info.key_id)

@property
def principals(self):
return self._cert_info.principals
return [to_text(p) for p in self._cert_info.principals]

@property
def valid_after(self):
Expand All @@ -438,19 +550,21 @@ def valid_before(self):

@property
def critical_options(self):
return self._cert_info.critical_options
return [
OpensshCertificateOption('critical', to_text(n), to_text(d)) for n, d in self._cert_info.critical_options
]

@property
def extensions(self):
return self._cert_info.extensions
return [OpensshCertificateOption('extension', to_text(n), to_text(d)) for n, d in self._cert_info.extensions]

@property
def reserved(self):
return self._cert_info.reserved

@property
def signing_key(self):
return self._cert_info.signing_key_fingerprint()
return to_text(self._cert_info.signing_key_fingerprint())

@staticmethod
def _parse_cert_info(pub_key_type, parser):
Expand Down Expand Up @@ -484,14 +598,40 @@ def to_dict(self):
'principals': self.principals,
'valid_after': time_parameters.valid_from(date_format='human_readable'),
'valid_before': time_parameters.valid_to(date_format='human_readable'),
'critical_options': self.critical_options,
'extensions': [e[0] for e in self.extensions],
'critical_options': [str(critical_option) for critical_option in self.critical_options],
'extensions': [str(extension) for extension in self.extensions],
'reserved': self.reserved,
'public_key': self.public_key,
'signing_key': self.signing_key,
}


def apply_directives(directives):
if any(d not in _DIRECTIVES for d in directives):
raise ValueError("directives must be one of %s" % ", ".join(_DIRECTIVES))

default_options = [
OpensshCertificateOption('extension', 'permit-x11-forwarding', ''),
OpensshCertificateOption('extension', 'permit-agent-forwarding', ''),
OpensshCertificateOption('extension', 'permit-port-forwarding', ''),
OpensshCertificateOption('extension', 'permit-pty', ''),
OpensshCertificateOption('extension', 'permit-user-rc', ''),
]

directive_to_option = {
'no-x11-forwarding': OpensshCertificateOption('extension', 'permit-x11-forwarding', ''),
'no-agent-forwarding': OpensshCertificateOption('extension', 'permit-agent-forwarding', ''),
'no-port-forwarding': OpensshCertificateOption('extension', 'permit-port-forwarding', ''),
'no-pty': OpensshCertificateOption('extension', 'permit-pty', ''),
'no-user-rc': OpensshCertificateOption('extension', 'permit-user-rc', ''),
}

if 'clear' in directives:
return []
else:
return list(set(default_options) - set(directive_to_option[d] for d in directives))


def fingerprint(public_key):
"""Generates a SHA256 hash and formats output to resemble ``ssh-keygen``"""
h = sha256()
Expand All @@ -516,3 +656,21 @@ def get_cert_info_object(key_type):

def is_relative_time_string(time_string):
return time_string.startswith("+") or time_string.startswith("-")


def parse_option_list(option_list):
critical_options = []
directives = []
extensions = []

for option in option_list:
if option.lower() in _DIRECTIVES:
directives.append(option.lower())
else:
option_object = OpensshCertificateOption.from_string(option)
if option_object.type == 'critical':
critical_options.append(option_object)
else:
extensions.append(option_object)

return critical_options, list(set(extensions + apply_directives(directives)))
Loading