Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lndeng 636 #193

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
224 changes: 62 additions & 162 deletions landscape/client/manager/shutdownmanager.py
Original file line number Diff line number Diff line change
@@ -1,94 +1,84 @@
import logging
import dbus

from twisted.internet.defer import Deferred
from twisted.internet.error import ProcessDone
from twisted.internet.protocol import ProcessProtocol
from twisted.internet import reactor

from landscape.client.manager.plugin import FAILED
from landscape.client.manager.plugin import ManagerPlugin
from landscape.client.manager.plugin import SUCCEEDED


class ShutdownFailedError(Exception):
"""Raised when a call to C{/sbin/shutdown} fails.

@ivar data: The data that the process printed before failing.
class ShutdownManager(ManagerPlugin):
"""
Plugin that either shuts down or reboots the device.

def __init__(self, data):
self.data = data

In both cases, the manager sends the success command before attempting
the shutdown/reboot.

With reboot - the call is instanteous but the success message will be
send as soon as the device comes back up.

class ShutdownManager(ManagerPlugin):
def __init__(self, process_factory=None):
if process_factory is None:
from twisted.internet import reactor as process_factory
self._process_factory = process_factory
For shutdown there is a 120 second delay between sending the success and
firing the shutdown. This is usually sufficent.
"""

def register(self, registry):
"""Add this plugin to C{registry}.

The shutdown manager handles C{shutdown} activity messages broadcast
from the server.
"""
super().register(registry)
registry.register_message("shutdown", self.perform_shutdown)

def perform_shutdown(self, message):
"""Request a system restart or shutdown.

If the call to C{/sbin/shutdown} runs without errors the activity
specified in the message will be responded as succeeded. Otherwise,
it will be responded as failed.
self.config = registry.config

registry.register_message("shutdown", self._handle_shutdown)

def _handle_shutdown(self, message):
"""
Choose shutdown or reboot
"""
operation_id = message["operation-id"]
reboot = message["reboot"]
protocol = ShutdownProcessProtocol()
protocol.set_timeout(self.registry.reactor)
protocol.result.addCallback(self._respond_success, operation_id)
protocol.result.addErrback(self._respond_failure, operation_id, reboot)
command, args = self._get_command_and_args(protocol, reboot)
self._process_factory.spawnProcess(protocol, command, args=args)

def _respond_success(self, data, operation_id):
logging.info("Shutdown request succeeded.")

if (reboot):
logging.info("Reboot Requested")
deferred = self._respond_reboot_success(
"Reboot requested of the system",
operation_id)
return deferred
else:
logging.info("Shutdown Requested")
deferred = self._respond_shutdown_success(
"Shutdown requested of the system",
operation_id)
return deferred

def _Reboot(self, _):
logging.info("Sending Reboot Command")
bus = dbus.SystemBus()
bus_object = bus.get_object(
"org.freedesktop.login1",
"/org/freedesktop/login1")
bus_object.Reboot(True, dbus_interface="org.freedesktop.login1.Manager")

def _Shutdown(self):
logging.info("Sending Shutdown Command")
bus = dbus.SystemBus()
bus_object = bus.get_object(
"org.freedesktop.login1",
"/org/freedesktop/login1")
bus_object.PowerOff(True, dbus_interface="org.freedesktop.login1.Manager")

def _respond_reboot_success(self, data, operation_id):
deferred = self._respond(SUCCEEDED, data, operation_id)
# After sending the result to the server, stop accepting messages and
# wait for the reboot/shutdown.
deferred.addCallback(lambda _: self.registry.broker.stop_exchanger())
deferred.addCallback(self._Reboot)
deferred.addErrback(self._respond_fail)
return deferred

def _respond_failure(self, failure, operation_id, reboot):
logging.info("Shutdown request failed.")
failure_report = "\n".join(
[
failure.value.data,
"",
"Attempting to force {operation}. Please note that if this "
"succeeds, Landscape will have no way of knowing and will "
"still mark this activity as having failed. It is recommended "
"you check the state of the machine manually to determine "
"whether {operation} succeeded.".format(
operation="reboot" if reboot else "shutdown",
),
],
)
deferred = self._respond(FAILED, failure_report, operation_id)
# Add another callback spawning the poweroff or reboot command (which
# seem more reliable in aberrant situations like a post-trusty release
# upgrade where upstart has been replaced with systemd). If this
# succeeds, we won't have any opportunity to report it and if it fails
# we'll already have responded indicating we're attempting to force
# the operation so either way there's no sense capturing output
protocol = ProcessProtocol()
command, args = self._get_command_and_args(protocol, reboot, True)
deferred.addCallback(
lambda _: self._process_factory.spawnProcess(
protocol,
command,
args=args,
),
)

def _respond_shutdown_success(self, data, operation_id):
deferred = self._respond(SUCCEEDED, data, operation_id)
reactor.callLater(120, self._Shutdown)
deferred.addErrback(self._respond_fail)
return deferred

def _respond_fail(self, data, operation_id):
logging.info("Shutdown/Reboot request failed.")
deferred = self._respond(FAILED, data, operation_id)
return deferred

def _respond(self, status, data, operation_id):
Expand All @@ -103,93 +93,3 @@ def _respond(self, status, data, operation_id):
self._session_id,
True,
)

def _get_command_and_args(self, protocol, reboot, force=False):
"""
Returns a C{command, args} 2-tuple suitable for use with
L{IReactorProcess.spawnProcess}.
"""
minutes = None if force else f"+{protocol.delay//60:d}"
args = {
(False, False): [
"/sbin/shutdown",
"-h",
minutes,
"Landscape is shutting down the system",
],
(False, True): [
"/sbin/shutdown",
"-r",
minutes,
"Landscape is rebooting the system",
],
(True, False): ["/sbin/poweroff"],
(True, True): ["/sbin/reboot"],
}[force, reboot]
return args[0], args


class ShutdownProcessProtocol(ProcessProtocol):
"""A ProcessProtocol for calling C{/sbin/shutdown}.

C{shutdown} doesn't return immediately when a time specification is
provided. Failures are reported immediately after it starts and return a
non-zero exit code. The process protocol calls C{shutdown} and waits for
failures for C{timeout} seconds. If no failures are reported it fires
C{result}'s callback with whatever output was received from the process.
If failures are reported C{result}'s errback is fired.

@ivar result: A L{Deferred} fired when C{shutdown} fails or
succeeds.
@ivar reboot: A flag indicating whether a shutdown or reboot should be
performed. Default is C{False}.
@ivar delay: The time in seconds from now to schedule the shutdown.
Default is 240 seconds. The time will be converted to minutes using
integer division when passed to C{shutdown}.
"""

def __init__(self, reboot=False, delay=240):
self.result = Deferred()
self.reboot = reboot
self.delay = delay
self._data = []
self._waiting = True

def get_data(self):
"""Get the data printed by the subprocess."""
return b"".join(self._data).decode("utf-8", "replace")

def set_timeout(self, reactor, timeout=10):
"""
Set the error checking timeout, after which C{result}'s callback will
be fired.
"""
reactor.call_later(timeout, self._succeed)

def childDataReceived(self, fd, data): # noqa: N802
"""Some data was received from the child.

Add it to our buffer to pass to C{result} when it's fired.
"""
if self._waiting:
self._data.append(data)

def processEnded(self, reason): # noqa: N802
"""Fire back the C{result} L{Deferred}.

C{result}'s callback will be fired with the string of data received
from the subprocess, or if the subprocess failed C{result}'s errback
will be fired with the string of data received from the subprocess.
"""
if self._waiting:
if reason.check(ProcessDone):
self._succeed()
else:
self.result.errback(ShutdownFailedError(self.get_data()))
self._waiting = False

def _succeed(self):
"""Fire C{result}'s callback with data accumulated from the process."""
if self._waiting:
self.result.callback(self.get_data())
self._waiting = False
4 changes: 2 additions & 2 deletions landscape/client/monitor/computerinfo.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from landscape.lib.fetch import fetch_async
from landscape.lib.fs import read_text_file
from landscape.lib.network import get_fqdn
from landscape.lib.os_release import OS_RELEASE_FILENAME
from landscape.lib.os_release import get_os_filename
from landscape.lib.os_release import parse_os_release

METADATA_RETRY_MAX = 3 # Number of retries to get EC2 meta-data
Expand All @@ -29,7 +29,7 @@ def __init__(
self,
get_fqdn=get_fqdn,
meminfo_filename="/proc/meminfo",
os_release_filename=OS_RELEASE_FILENAME,
os_release_filename=get_os_filename(),
root_path="/",
fetch_async=fetch_async,
):
Expand Down
4 changes: 2 additions & 2 deletions landscape/client/package/releaseupgrader.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from landscape.lib.fetch import url_to_filename
from landscape.lib.fs import read_text_file
from landscape.lib.gpg import gpg_verify
from landscape.lib.os_release import OS_RELEASE_FILENAME
from landscape.lib.os_release import get_os_filename
from landscape.lib.os_release import parse_os_release
from landscape.lib.twisted_util import spawn_process

Expand Down Expand Up @@ -55,7 +55,7 @@ class ReleaseUpgrader(PackageTaskHandler):

config_factory = ReleaseUpgraderConfiguration
queue_name = "release-upgrader"
os_release_filename = OS_RELEASE_FILENAME
os_release_filename = get_os_filename()
landscape_ppa_url = "http://ppa.launchpad.net/landscape/trunk/ubuntu/"
logs_directory = "/var/log/dist-upgrade"
logs_limit = 100000 # characters
Expand Down
4 changes: 2 additions & 2 deletions landscape/client/package/taskhandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from landscape.lib.lock import lock_path
from landscape.lib.lock import LockError
from landscape.lib.log import log_failure
from landscape.lib.os_release import OS_RELEASE_FILENAME
from landscape.lib.os_release import get_os_filename
from landscape.lib.os_release import parse_os_release


Expand Down Expand Up @@ -93,7 +93,7 @@ class PackageTaskHandler:
config_factory = PackageTaskHandlerConfiguration

queue_name = "default"
os_release_filename = OS_RELEASE_FILENAME
os_release_filename = get_os_filename()
package_store_class = PackageStore

# This file is touched after every succesful 'apt-get update' run if the
Expand Down
8 changes: 4 additions & 4 deletions landscape/client/package/tests/test_reporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
from landscape.lib.fetch import FetchError
from landscape.lib.fs import create_text_file
from landscape.lib.fs import touch_file
from landscape.lib.os_release import OS_RELEASE_FILENAME
from landscape.lib.os_release import get_os_filename
from landscape.lib.os_release import parse_os_release
from landscape.lib.testing import EnvironSaverHelper
from landscape.lib.testing import FakeReactor
Expand Down Expand Up @@ -1182,7 +1182,7 @@ def test_detect_packages_from_security_pocket(self):
"""Packages versions coming from security are reported as such."""
message_store = self.broker_service.message_store
message_store.set_accepted_types(["packages"])
os_release_info = parse_os_release(OS_RELEASE_FILENAME)
os_release_info = parse_os_release(get_os_filename())
release_path = os.path.join(self.repository_dir, "Release")
with open(release_path, "w") as release:
release.write(
Expand Down Expand Up @@ -1241,7 +1241,7 @@ def test_detect_packages_changes_with_backports(self):
message_store = self.broker_service.message_store
message_store.set_accepted_types(["packages"])

os_release_info = parse_os_release(OS_RELEASE_FILENAME)
os_release_info = parse_os_release(get_os_filename())
release_path = os.path.join(self.repository_dir, "Release")
with open(release_path, "w") as release:
release.write(
Expand Down Expand Up @@ -1301,7 +1301,7 @@ def test_detect_packages_changes_with_backports_both(self):
os.remove(os.path.join(other_backport_dir, "Packages"))
self.facade.add_channel_deb_dir(other_backport_dir)

os_release_info = parse_os_release(OS_RELEASE_FILENAME)
os_release_info = parse_os_release(get_os_filename())
official_release_path = os.path.join(self.repository_dir, "Release")
with open(official_release_path, "w") as release:
release.write(
Expand Down
Loading
Loading