diff --git a/cert_manager/__init__.py b/cert_manager/__init__.py index 38d3037..90162f3 100644 --- a/cert_manager/__init__.py +++ b/cert_manager/__init__.py @@ -4,6 +4,7 @@ from .acme import ACMEAccount from .admin import Admin from .client import Client +from .dcv import DomainControlValidation from .domain import Domain from .organization import Organization from .person import Person @@ -12,5 +13,6 @@ from .ssl import SSL __all__ = [ - "ACMEAccount", "Admin", "Client", "Domain", "Organization", "PendingError", "Person", "Report", "SMIME", "SSL" + "ACMEAccount", "Admin", "Client", "Domain", "DomainControlValidation", + "Organization", "PendingError", "Person", "Report", "SMIME", "SSL" ] diff --git a/cert_manager/acme.py b/cert_manager/acme.py index bfadb8c..6f92aa2 100644 --- a/cert_manager/acme.py +++ b/cert_manager/acme.py @@ -175,6 +175,21 @@ def add_domains(self, acme_id, domains): return result.json() + @paginate + def get_domains (self, acme_id, **kwargs): + """List ACME account’s domains. + + :param int acme_id: The ID of the acme account to list domains + """ + params = { + self._find_params_to_api[param]: kwargs.get(param) + for param in self._find_params_to_api # pylint:disable=consider-using-dict-items + } + url = self._url(f"{acme_id}", "domain") + result = self._client.get(url, params=params) + + return result.json() + def remove_domains(self, acme_id, domains): """Remove domains from an acme account. diff --git a/cert_manager/dcv.py b/cert_manager/dcv.py index a7953e5..1184213 100644 --- a/cert_manager/dcv.py +++ b/cert_manager/dcv.py @@ -5,11 +5,11 @@ from requests.exceptions import HTTPError from ._endpoint import Endpoint +from ._helpers import paginate class DomainControlValidation(Endpoint): """Query the Sectigo Cert Manager REST API for Domain Control Validation (DCV) data.""" - def __init__(self, client, api_version="v1"): """Initialize the class. @@ -17,7 +17,24 @@ def __init__(self, client, api_version="v1"): :param string api_version: The API version to use; the default is "v1" """ super().__init__(client=client, endpoint="/dcv", api_version=api_version) + self.__dcv_domains = None + + def all(self, force=False): + """Return list of all domain control validations. + + :param bool force: If set to True, force refreshing the data from the API + """ + if (self.__dcv_domains) and (not force): + return self.__dcv_domains + + self.__dcv_domains = [] + result = self.search() + for dom in result: + self.__dcv_domains.append(dom) + + return self.__dcv_domains + @paginate def search(self, **kwargs): """Search the DCV statuses of domains. @@ -85,6 +102,26 @@ def start_validation_cname(self, domain: str): return result.json() + def start_validation_email(self, domain: str): + """Start Domain Control Validation using Email method. + + :param string domain: The domain to validate + :return response: List of valid email addresses + """ + url = self._url("validation", "start", "domain", "email") + data = {"domain": domain} + + try: + result = self._client.post(url, data=data) + except HTTPError as exc: + status_code = exc.response.status_code + if status_code == HTTPStatus.BAD_REQUEST: + err_response = exc.response.json() + raise ValueError(err_response["description"]) from exc + raise exc + + return result.json() + def submit_validation_cname(self, domain: str): """Finish Domain Control Validation using the CNAME method. @@ -111,3 +148,29 @@ def submit_validation_cname(self, domain: str): raise exc return result.json() + + def submit_validation_email(self, domain: str, email: str): + """Finish Domain Control Validation using the email method. + + :param string domain: The domain to validate + :param string email: validation email sent to + + :return response: a dictionary containing + status: The status of the validation + orderStatus: The status of the validation request + message: An optional message to help with debugging + """ + url = self._url("validation", "submit", "domain", "email") + data = {"domain": domain, + "email": email} + + try: + result = self._client.post(url, data=data) + except HTTPError as exc: + status_code = exc.response.status_code + if status_code == HTTPStatus.BAD_REQUEST: + err_response = exc.response.json() + raise ValueError(err_response["description"]) from exc + raise exc + + return result.json() diff --git a/tests/test_acme.py b/tests/test_acme.py index 6a7d7ee..d844901 100644 --- a/tests/test_acme.py +++ b/tests/test_acme.py @@ -354,6 +354,32 @@ def test_ne_acme_id(self): self.assertRaises(HTTPError, acme.get, acme_id) +class TestGetDomains(TestACMEAccount): + """Test the .get_domains method.""" + + @responses.activate + def test_acme_id(self): + """Return all domains from the specified ACME ID.""" + acme_id = 1234 + api_url = self.get_acme_account_url(acme_id) + "/domain?size=200&position=0" + valid_response = [] + + # Setup the mocked response + responses.add(responses.GET, api_url, json=valid_response, status=200) + + acme = ACMEAccount(client=self.client) + data = acme.get_domains(acme_id) + + self.assertEqual(len(responses.calls), 1) + self.assertEqual(responses.calls[0].request.url, api_url) + self.assertEqual(data, valid_response) + + + def test_need_acme_id(self): + """Raise an exception without an acme_id parameter.""" + acme = ACMEAccount(client=self.client) + self.assertRaises(TypeError, acme.get_domains) + def _test_create_test_factory(acme_id=1234, header="location", **kwargs): """Act as a wrapper to inject headers and parameters.""" params = ["name", "acmeServer", "organizationId", "evDetails"] diff --git a/tests/test_dcv.py b/tests/test_dcv.py index 4f169e0..fcbeb1c 100644 --- a/tests/test_dcv.py +++ b/tests/test_dcv.py @@ -72,6 +72,68 @@ def test_need_client(self): self.assertRaises(TypeError, DomainControlValidation) +class TestAll(TestDcv): + """Test the all method.""" + + def setUp(self): # pylint: disable=invalid-name + """Initialize the class.""" + # Call the inherited setUp method + super().setUp() + self.api_url = f"{self.cfixt.base_url}/dcv/v1/validation?size=200&position=0" + self.valid_response = [ + { + "domain": "*.mydomain.org", + "dcvStatus": "VALIDATED", + "dcvOrderStatus": "NOT_INITIATED", + "dcvMethod": "CNAME", + "expirationDate": "2024-03-19", + }, + { + "domain": "mydomain.org", + "dcvStatus": "VALIDATED", + "dcvOrderStatus": "NOT_INITIATED", + "dcvMethod": "CNAME", + "expirationDate": "2024-03-19", + }, + ] + + @responses.activate + def test_all(self): + """Return all the data, but it should query the API twice.""" + # Setup the mocked response + responses.add( + responses.GET, self.api_url, json=self.valid_response, status=HTTPStatus.OK + ) + + dcv = DomainControlValidation(client=self.client) + data = dcv.all() + + # Verify all the query information + # There should only be one call the first time "all" is called. + # Due to pagination, this is only guaranteed as long as the number of + # entries returned is less than the page size + self.assertEqual(len(responses.calls), 1) + self.assertEqual(responses.calls[0].request.url, self.api_url) + self.assertEqual(data, self.valid_response) + + @responses.activate + def test_bad_http(self): + """Raise an exception if domains cannot be retrieved from the API.""" + # Setup the mocked response + responses.add( + responses.GET, + self.api_url, + json=self.error_response, + status=HTTPStatus.BAD_REQUEST, + ) + + domain = DomainControlValidation(client=self.client) + self.assertRaises(HTTPError, domain.all) + + # Verify all the query information + self.assertEqual(len(responses.calls), 1) + self.assertEqual(responses.calls[0].request.url, self.api_url) + class TestSearch(TestDcv): """Test the .all method.""" @@ -292,6 +354,79 @@ def test_server_error(self): self.assertEqual(responses.calls[0].request.url, self.api_url) +class TestStartValidationEmail(TestDcv): + """Test the .all method.""" + + def setUp(self): # pylint: disable=invalid-name + """Initialize the class.""" + # Call the inherited setUp method + super().setUp() + self.params = { + "domain": "ccmqa.com", + } + self.api_url = f"{self.cfixt.base_url}/dcv/v1/validation/start/domain/email" + self.valid_response = { + "emails":["admin@ccmqa.com","administrator@ccmqa.com","hostmaster@ccmqa.com","postmaster@ccmqa.com","webmaster@ccmqa.com","domain-admin@ccmqa.com"] + } + + @responses.activate + def test_success(self): + """Return all the data, but it should query the API twice.""" + # Setup the mocked response + responses.add( + responses.POST, self.api_url, json=self.valid_response, status=HTTPStatus.OK + ) + + dcv = DomainControlValidation(client=self.client) + data = dcv.start_validation_email(**self.params) + + # Verify all the query information + # There should only be one call the first time "all" is called. + # Due to pagination, this is only guaranteed as long as the number of + # entries returned is less than the page size + self.assertEqual(len(responses.calls), 1) + self.assertEqual(responses.calls[0].request.url, self.api_url) + self.assertEqual( + json.loads(responses.calls[0].request.body)["domain"], "ccmqa.com" + ) + self.assertEqual(data, self.valid_response) + + @responses.activate + def test_bad_req(self): + """Raise an exception if domains cannot be retrieved from the API.""" + # Setup the mocked response + responses.add( + responses.POST, + self.api_url, + json=self.error_response, + status=HTTPStatus.BAD_REQUEST, + ) + + domain = DomainControlValidation(client=self.client) + self.assertRaises(ValueError, domain.start_validation_email, **self.params) + + # Verify all the query information + self.assertEqual(len(responses.calls), 1) + self.assertEqual(responses.calls[0].request.url, self.api_url) + + @responses.activate + def test_server_error(self): + """Raise an exception if domains cannot be retrieved from the API.""" + # Setup the mocked response + responses.add( + responses.POST, + self.api_url, + json=self.error_response, + status=HTTPStatus.INTERNAL_SERVER_ERROR, + ) + + domain = DomainControlValidation(client=self.client) + self.assertRaises(HTTPError, domain.start_validation_email, **self.params) + + # Verify all the query information + self.assertEqual(len(responses.calls), 1) + self.assertEqual(responses.calls[0].request.url, self.api_url) + class TestSubmitValidationCname(TestDcv): """Test the .all method.""" @@ -366,3 +501,78 @@ def test_server_error(self): # Verify all the query information self.assertEqual(len(responses.calls), 1) self.assertEqual(responses.calls[0].request.url, self.api_url) + +class TestSubmitValidationEmail(TestDcv): + """Test the .all method.""" + + def setUp(self): # pylint: disable=invalid-name + """Initialize the class.""" + # Call the inherited setUp method + super().setUp() + self.params = { + "domain": "mydomain.org", + "email": "administrator@mydomain.org" + } + self.api_url = f"{self.cfixt.base_url}/dcv/v1/validation/submit/domain/email" + self.valid_response = { + "status":"VALIDATED", + "orderStatus":"SUBMITTED", + "message":"Submitted successfully"} + + @responses.activate + def test_success(self): + """Return all the data, but it should query the API twice.""" + # Setup the mocked response + responses.add( + responses.POST, self.api_url, json=self.valid_response, status=HTTPStatus.OK + ) + + dcv = DomainControlValidation(client=self.client) + data = dcv.submit_validation_email(**self.params) + + # Verify all the query information + # There should only be one call the first time "all" is called. + # Due to pagination, this is only guaranteed as long as the number of + # entries returned is less than the page size + self.assertEqual(len(responses.calls), 1) + self.assertEqual(responses.calls[0].request.url, self.api_url) + self.assertEqual( + json.loads(responses.calls[0].request.body)["domain"], "mydomain.org" + ) + self.assertEqual(data, self.valid_response) + + @responses.activate + def test_bad_req(self): + """Raise an exception if domains cannot be retrieved from the API.""" + # Setup the mocked response + responses.add( + responses.POST, + self.api_url, + json=self.error_response, + status=HTTPStatus.BAD_REQUEST, + ) + + domain = DomainControlValidation(client=self.client) + self.assertRaises(ValueError, domain.submit_validation_email, **self.params) + + # Verify all the query information + self.assertEqual(len(responses.calls), 1) + self.assertEqual(responses.calls[0].request.url, self.api_url) + + @responses.activate + def test_server_error(self): + """Raise an exception if domains cannot be retrieved from the API.""" + # Setup the mocked response + responses.add( + responses.POST, + self.api_url, + json=self.error_response, + status=HTTPStatus.INTERNAL_SERVER_ERROR, + ) + + domain = DomainControlValidation(client=self.client) + self.assertRaises(HTTPError, domain.submit_validation_email, **self.params) + + # Verify all the query information + self.assertEqual(len(responses.calls), 1) + self.assertEqual(responses.calls[0].request.url, self.api_url)