Skip to content

Commit

Permalink
openssh_cert - Adding regenerate option (#256)
Browse files Browse the repository at this point in the history
* Initial commit

* Fixing unit tests

* More unit fixes

* Adding changelog fragment

* Minor refactor in Certificate.generate()

* Addressing option case-sensitivity and directive overrides

* Renaming idempotency to regenerate

* updating changelog

* Minor refactoring of default options

* Cleaning up with inline functions

* Fixing false failures when regenerate=fail and improving clarity

* Applying second round of review suggestions

* adding helper for safe atomic moves
  • Loading branch information
Ajpantuso authored Jul 31, 2021
1 parent d6403ac commit aaba87a
Show file tree
Hide file tree
Showing 10 changed files with 704 additions and 76 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
---
minor_changes:
- openssh_cert - added ``regenerate`` option to validate additional certificate parameters which trigger
regeneration of an existing certificate (https://github.com/ansible-collections/community.crypto/pull/256).
42 changes: 42 additions & 0 deletions plugins/module_utils/openssh/backends/common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# -*- coding: utf-8 -*-
#
# Copyright: (c) 2021, Andrew Pantuso (@ajpantuso) <[email protected]>
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.

from __future__ import absolute_import, division, print_function
__metaclass__ = type

import os


def restore_on_failure(f):
def backup_and_restore(module, path, *args, **kwargs):
backup_file = module.backup_local(path) if os.path.exists(path) else None

try:
f(module, path, *args, **kwargs)
except Exception:
if backup_file is not None:
module.atomic_move(backup_file, path)
raise
else:
module.add_cleanup_file(backup_file)

return backup_and_restore


@restore_on_failure
def safe_atomic_move(module, path, destination):
module.atomic_move(path, destination)
176 changes: 165 additions & 11 deletions plugins/module_utils/openssh/certificate.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
from hashlib import sha256

from ansible.module_utils import six
from ansible.module_utils.common.text.converters import to_text
from ansible_collections.community.crypto.plugins.module_utils.crypto.support import convert_relative_to_datetime
from ansible_collections.community.crypto.plugins.module_utils.openssh.utils import (
OpensshParser,
Expand Down Expand Up @@ -74,6 +75,29 @@
_ALWAYS = datetime(1970, 1, 1)
_FOREVER = datetime.max

_CRITICAL_OPTIONS = (
'force-command',
'source-address',
'verify-required',
)

_DIRECTIVES = (
'clear',
'no-x11-forwarding',
'no-agent-forwarding',
'no-port-forwarding',
'no-pty',
'no-user-rc',
)

_EXTENSIONS = (
'permit-x11-forwarding',
'permit-agent-forwarding',
'permit-port-forwarding',
'permit-pty',
'permit-user-rc'
)

if six.PY3:
long = int

Expand All @@ -92,6 +116,9 @@ def __eq__(self, other):
else:
return self._valid_from == other._valid_from and self._valid_to == other._valid_to

def __ne__(self, other):
return not self == other

@property
def validity_string(self):
if not (self._valid_from == _ALWAYS and self._valid_to == _FOREVER):
Expand Down Expand Up @@ -131,12 +158,14 @@ def format_datetime(dt, date_format):
@staticmethod
def to_datetime(time_string_or_timestamp):
try:
if isinstance(time_string_or_timestamp, str):
if isinstance(time_string_or_timestamp, six.string_types):
result = OpensshCertificateTimeParameters._time_string_to_datetime(time_string_or_timestamp.strip())
elif isinstance(time_string_or_timestamp, (long, int)):
result = OpensshCertificateTimeParameters._timestamp_to_datetime(time_string_or_timestamp)
else:
raise ValueError("Value must be of type (str, int, long) not %s" % type(time_string_or_timestamp))
raise ValueError(
"Value must be of type (str, unicode, int, long) not %s" % type(time_string_or_timestamp)
)
except ValueError:
raise
return result
Expand Down Expand Up @@ -174,6 +203,78 @@ def _time_string_to_datetime(time_string):
return result


class OpensshCertificateOption(object):
def __init__(self, option_type, name, data):
if option_type not in ('critical', 'extension'):
raise ValueError("type must be either 'critical' or 'extension'")

if not isinstance(name, six.string_types):
raise TypeError("name must be a string not %s" % type(name))

if not isinstance(data, six.string_types):
raise TypeError("data must be a string not %s" % type(data))

self._option_type = option_type
self._name = name.lower()
self._data = data

def __eq__(self, other):
if not isinstance(other, type(self)):
return NotImplemented

return all([
self._option_type == other._option_type,
self._name == other._name,
self._data == other._data,
])

def __hash__(self):
return hash((self._option_type, self._name, self._data))

def __ne__(self, other):
return not self == other

def __str__(self):
if self._data:
return "%s=%s" % (self._name, self._data)
return self._name

@property
def data(self):
return self._data

@property
def name(self):
return self._name

@property
def type(self):
return self._option_type

@classmethod
def from_string(cls, option_string):
if not isinstance(option_string, six.string_types):
raise ValueError("option_string must be a string not %s" % type(option_string))
option_type = None

if ':' in option_string:
option_type, value = option_string.strip().split(':', 1)
if '=' in value:
name, data = value.split('=', 1)
else:
name, data = value, ''
elif '=' in option_string:
name, data = option_string.strip().split('=', 1)
else:
name, data = option_string.strip(), ''

return cls(
option_type=option_type or get_option_type(name.lower()),
name=name,
data=data
)


@six.add_metaclass(abc.ABCMeta)
class OpensshCertificateInfo:
"""Encapsulates all certificate information which is signed by a CA key"""
Expand Down Expand Up @@ -402,15 +503,15 @@ def load(cls, path):

@property
def type_string(self):
return self._cert_info.type_string
return to_text(self._cert_info.type_string)

@property
def nonce(self):
return self._cert_info.nonce

@property
def public_key(self):
return self._cert_info.public_key_fingerprint()
return to_text(self._cert_info.public_key_fingerprint())

@property
def serial(self):
Expand All @@ -422,11 +523,11 @@ def type(self):

@property
def key_id(self):
return self._cert_info.key_id
return to_text(self._cert_info.key_id)

@property
def principals(self):
return self._cert_info.principals
return [to_text(p) for p in self._cert_info.principals]

@property
def valid_after(self):
Expand All @@ -438,19 +539,21 @@ def valid_before(self):

@property
def critical_options(self):
return self._cert_info.critical_options
return [
OpensshCertificateOption('critical', to_text(n), to_text(d)) for n, d in self._cert_info.critical_options
]

@property
def extensions(self):
return self._cert_info.extensions
return [OpensshCertificateOption('extension', to_text(n), to_text(d)) for n, d in self._cert_info.extensions]

@property
def reserved(self):
return self._cert_info.reserved

@property
def signing_key(self):
return self._cert_info.signing_key_fingerprint()
return to_text(self._cert_info.signing_key_fingerprint())

@staticmethod
def _parse_cert_info(pub_key_type, parser):
Expand Down Expand Up @@ -484,14 +587,36 @@ def to_dict(self):
'principals': self.principals,
'valid_after': time_parameters.valid_from(date_format='human_readable'),
'valid_before': time_parameters.valid_to(date_format='human_readable'),
'critical_options': self.critical_options,
'extensions': [e[0] for e in self.extensions],
'critical_options': [str(critical_option) for critical_option in self.critical_options],
'extensions': [str(extension) for extension in self.extensions],
'reserved': self.reserved,
'public_key': self.public_key,
'signing_key': self.signing_key,
}


def apply_directives(directives):
if any(d not in _DIRECTIVES for d in directives):
raise ValueError("directives must be one of %s" % ", ".join(_DIRECTIVES))

directive_to_option = {
'no-x11-forwarding': OpensshCertificateOption('extension', 'permit-x11-forwarding', ''),
'no-agent-forwarding': OpensshCertificateOption('extension', 'permit-agent-forwarding', ''),
'no-port-forwarding': OpensshCertificateOption('extension', 'permit-port-forwarding', ''),
'no-pty': OpensshCertificateOption('extension', 'permit-pty', ''),
'no-user-rc': OpensshCertificateOption('extension', 'permit-user-rc', ''),
}

if 'clear' in directives:
return []
else:
return list(set(default_options()) - set(directive_to_option[d] for d in directives))


def default_options():
return [OpensshCertificateOption('extension', name, '') for name in _EXTENSIONS]


def fingerprint(public_key):
"""Generates a SHA256 hash and formats output to resemble ``ssh-keygen``"""
h = sha256()
Expand All @@ -514,5 +639,34 @@ def get_cert_info_object(key_type):
return cert_info


def get_option_type(name):
if name in _CRITICAL_OPTIONS:
result = 'critical'
elif name in _EXTENSIONS:
result = 'extension'
else:
raise ValueError("%s is not a valid option. " % name +
"Custom options must start with 'critical:' or 'extension:' to indicate type")
return result


def is_relative_time_string(time_string):
return time_string.startswith("+") or time_string.startswith("-")


def parse_option_list(option_list):
critical_options = []
directives = []
extensions = []

for option in option_list:
if option.lower() in _DIRECTIVES:
directives.append(option.lower())
else:
option_object = OpensshCertificateOption.from_string(option)
if option_object.type == 'critical':
critical_options.append(option_object)
else:
extensions.append(option_object)

return critical_options, list(set(extensions + apply_directives(directives)))
Loading

0 comments on commit aaba87a

Please sign in to comment.