Skip to content

Commit

Permalink
clean-up now-unused functions
Browse files Browse the repository at this point in the history
  • Loading branch information
Perfect5th committed Aug 20, 2024
1 parent be09d17 commit 8d09a58
Show file tree
Hide file tree
Showing 2 changed files with 0 additions and 255 deletions.
68 changes: 0 additions & 68 deletions landscape/client/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -718,74 +718,6 @@ def store_public_key_data(config, certificate_data):
return key_filename


def failure(add_result, reason=None):
"""Handle a failed communication by recording the kind of failure."""
if reason:
add_result(reason)


def exchange_failure(add_result, ssl_error=False):
"""Handle a failed call by recording if the failure was SSL-related."""
if ssl_error:
add_result("ssl-error")
else:
add_result("non-ssl-error")


def handle_registration_errors(add_result, failure, connector):
"""Handle registration errors.
The connection to the broker succeeded but the registration itself
failed, because of invalid credentials or excessive pending computers.
We need to trap the exceptions so they don't stacktrace (we know what is
going on), and try to cleanly disconnect from the broker.
Note: "results" contains a failure indication already (or will shortly)
since the registration-failed signal will fire."""
error = failure.trap(RegistrationError, MethodCallError)
if error is RegistrationError:
add_result(str(failure.value))
connector.disconnect()


def success(add_result):
"""Handle a successful communication by recording the fact."""
add_result("success")


def done(ignored_result, connector, reactor):
"""Clean up after communicating with the server."""
connector.disconnect()
reactor.stop()


def got_connection(add_result, connector, reactor, remote):
"""Handle becoming connected to a broker."""
handlers = {
"registration-done": partial(success, add_result),
"registration-failed": partial(failure, add_result),
"exchange-failed": partial(exchange_failure, add_result),
}
deferreds = [
remote.call_on_event(handlers),
remote.register().addErrback(
partial(handle_registration_errors, add_result),
connector,
),
]
results = gather_results(deferreds)
results.addCallback(done, connector, reactor)
return results


def got_error(failure, reactor, add_result, print=print):
"""Handle errors contacting broker."""
print(failure.getTraceback(), file=sys.stderr)
# Can't just raise SystemExit; it would be ignored by the reactor.
add_result(SystemExit())
reactor.stop()


def attempt_registration(
identity: Identity,
config: LandscapeSetupConfiguration,
Expand Down
187 changes: 0 additions & 187 deletions landscape/client/tests/test_configuration.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,16 @@
import os
import sys
import textwrap
import unittest
from functools import partial
from unittest import mock

from twisted.internet.defer import Deferred
from twisted.internet.defer import succeed

from landscape.client.broker.registration import Identity
from landscape.client.broker.registration import RegistrationError
from landscape.client.broker.tests.helpers import BrokerConfigurationHelper
from landscape.client.configuration import bootstrap_tree
from landscape.client.configuration import ConfigurationError
from landscape.client.configuration import done
from landscape.client.configuration import exchange_failure
from landscape.client.configuration import EXIT_NOT_REGISTERED
from landscape.client.configuration import failure
from landscape.client.configuration import get_secure_id
from landscape.client.configuration import got_error
from landscape.client.configuration import handle_registration_errors
from landscape.client.configuration import ImportOptionError
from landscape.client.configuration import is_registered
from landscape.client.configuration import LandscapeSetupConfiguration
Expand All @@ -33,10 +24,8 @@
from landscape.client.configuration import setup
from landscape.client.configuration import show_help
from landscape.client.configuration import store_public_key_data
from landscape.client.configuration import success
from landscape.client.serviceconfig import ServiceConfigException
from landscape.client.tests.helpers import LandscapeTest
from landscape.lib.amp import MethodCallError
from landscape.lib.compat import ConfigParser
from landscape.lib.compat import StringIO
from landscape.lib.fetch import HTTPCodeError
Expand Down Expand Up @@ -64,182 +53,6 @@ def get_config(self, args, data_path=None):
return config


class SuccessTests(unittest.TestCase):
def test_success(self):
"""The success handler records the success."""
results = []
success(results.append)
self.assertEqual(["success"], results)


class FailureTests(unittest.TestCase):
def test_failure(self):
"""The failure handler records the failure and returns non-zero."""
results = []
self.assertNotEqual(0, failure(results.append, "an-error"))
self.assertEqual(["an-error"], results)


class ExchangeFailureTests(unittest.TestCase):
def test_exchange_failure_ssl(self):
"""The exchange_failure() handler records whether or not the failure
involved SSL or not and returns non-zero."""
results = []
self.assertNotEqual(
0,
exchange_failure(results.append, ssl_error=True),
)
self.assertEqual(["ssl-error"], results)

def test_exchange_failure_non_ssl(self):
"""
The exchange_failure() handler records whether or not the failure
involved SSL or not and returns non-zero.
"""
results = []
self.assertNotEqual(
0,
exchange_failure(results.append, ssl_error=False),
)
self.assertEqual(["non-ssl-error"], results)


class HandleRegistrationErrorsTests(unittest.TestCase):
def test_handle_registration_errors_traps(self):
"""
The handle_registration_errors() function traps RegistrationError
and MethodCallError errors.
"""

class FauxFailure:
def trap(self, *trapped):
self.trapped_exceptions = trapped

faux_connector = FauxConnector()
faux_failure = FauxFailure()

results = []
add_result = results.append

self.assertNotEqual(
0,
handle_registration_errors(
add_result,
faux_failure,
faux_connector,
),
)
self.assertTrue(
[RegistrationError, MethodCallError],
faux_failure.trapped_exceptions,
)

def test_handle_registration_errors_disconnects_cleanly(self):
"""
The handle_registration_errors function disconnects the broker
connector cleanly.
"""

class FauxFailure:
def trap(self, *trapped):
pass

faux_connector = FauxConnector()
faux_failure = FauxFailure()

results = []
add_result = results.append

self.assertNotEqual(
0,
handle_registration_errors(
add_result,
faux_failure,
faux_connector,
),
)
self.assertTrue(faux_connector.was_disconnected)

def test_handle_registration_errors_as_errback(self):
"""
The handle_registration_errors functions works as an errback.
This test was put in place to assert the parameters passed to the
function when used as an errback are in the correct order.
"""
faux_connector = FauxConnector()
calls = []

def i_raise(result):
calls.append(True)
return RegistrationError("Bad mojo")

results = []
add_result = results.append

deferred = Deferred()
deferred.addCallback(i_raise)
deferred.addErrback(
partial(handle_registration_errors, add_result),
faux_connector,
)
deferred.callback("") # This kicks off the callback chain.

self.assertEqual([True], calls)


class DoneTests(unittest.TestCase):
def test_done(self):
"""The done() function handles cleaning up."""

class FauxConnector:
was_disconnected = False

def disconnect(self):
self.was_disconnected = True

class FauxReactor:
was_stopped = False

def stop(self):
self.was_stopped = True

faux_connector = FauxConnector()
faux_reactor = FauxReactor()

done(None, faux_connector, faux_reactor)
self.assertTrue(faux_connector.was_disconnected)
self.assertTrue(faux_reactor.was_stopped)


class GotErrorTests(unittest.TestCase):
def test_got_error(self):
"""The got_error() function handles displaying errors and exiting."""

class FauxFailure:
def getTraceback(self): # noqa: N802
return "traceback"

results = []
printed = []

def faux_print(text, file):
printed.append((text, file))

mock_reactor = mock.Mock()

got_error(
FauxFailure(),
reactor=mock_reactor,
add_result=results.append,
print=faux_print,
)
mock_reactor.stop.assert_called_once_with()

self.assertIsInstance(results[0], SystemExit)
self.assertEqual([("traceback", sys.stderr)], printed)


class PrintTextTest(LandscapeTest):
@mock.patch("sys.stdout", new_callable=StringIO)
def test_default(self, stdout):
Expand Down

0 comments on commit 8d09a58

Please sign in to comment.