Skip to content

Commit

Permalink
Make a bunch of classes to help unpack StoppedPools
Browse files Browse the repository at this point in the history
Unpack this D-Bus property more rigorously. Use the unpacking for
ClevisInfo and KeyDescription for default pools also.

Signed-off-by: mulhern <[email protected]>
  • Loading branch information
mulkieran committed Aug 21, 2023
1 parent 52b5efc commit 870cafe
Show file tree
Hide file tree
Showing 2 changed files with 138 additions and 36 deletions.
66 changes: 36 additions & 30 deletions src/stratis_cli/_actions/_list_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
"""

# isort: STDLIB
import json
from abc import ABC, abstractmethod

# isort: THIRDPARTY
Expand All @@ -35,7 +34,7 @@
print_table,
size_triple,
)
from ._utils import ClevisInfo
from ._utils import EncryptionInfoClevis, EncryptionInfoKeyDescription, StoppedPool


def _fetch_stopped_pools_property(proxy):
Expand All @@ -57,24 +56,22 @@ def _non_existent_or_inconsistent_to_str(
value, *, inconsistent_str="inconsistent", non_existent_str="N/A", interp=str
):
"""
Process dbus value that encodes both inconsistency and existence of the
Process dbus result that encodes both inconsistency and existence of the
value.
:param value: a dbus value
:param EncryptionInfo value: a dbus result
:param str inconsistent_str: value to return if inconsistent
:param str non_existent_str: value to return if non-existent
:param interp: how to interpret the value if it exists and is consistent
:returns: a string to print
:rtype: str
"""
(consistent, tuple_or_err_str) = value

if not consistent: # pragma: no cover
if not value.consistent(): # pragma: no cover
return inconsistent_str

(exists, value) = tuple_or_err_str
value = value.value()

if not exists:
if value is None:
return non_existent_str

return interp(value)
Expand All @@ -92,15 +89,17 @@ def list_pools(uuid_formatter, *, stopped=False, selection=None):
klass.list_pools()


def clevis_to_str(clevis_dbus_object): # pragma: no cover
def _clevis_to_str(clevis_info): # pragma: no cover
"""
:param dbus.Struct clevis_dbus_object: clevis information
:param ClevisInfo clevis_info: the Clevis info to stringify
:return: a string that represents the clevis info
:rtype: str
"""
clevis_pin = str(clevis_dbus_object[0])
clevis_config = json.loads(clevis_dbus_object[1])
clevis_info = ClevisInfo(clevis_pin, clevis_config)
return str(clevis_info)

config_string = " ".join(
f"{key}: {value}" for key, value in clevis_info.config.items()
)
return f"{clevis_info.pin} {config_string}"


class List(ABC): # pylint: disable=too-few-public-methods
Expand Down Expand Up @@ -262,15 +261,17 @@ def _print_detail_view(self, mopool, size_change_codes):
)

key_description_str = (
_non_existent_or_inconsistent_to_str(mopool.KeyDescription())
_non_existent_or_inconsistent_to_str(
EncryptionInfoKeyDescription(mopool.KeyDescription())
)
if encrypted
else "unencrypted"
)
print(f"Key Description: {key_description_str}")

clevis_info_str = (
_non_existent_or_inconsistent_to_str(
mopool.ClevisInfo(), interp=clevis_to_str
EncryptionInfoClevis(mopool.ClevisInfo()), interp=_clevis_to_str
)
if encrypted
else "unencrypted"
Expand Down Expand Up @@ -435,32 +436,34 @@ def _print_detail_view(self, pool_uuid, pool):
Print detailed view of a stopped pool.
:param str pool_uuid: the pool UUID
:param pool: a table of information on this pool
:param StoppedPool pool: information about a single pool
:type pool: dict of str * object
"""
print(f"Name: {self._pool_name(pool.get('name'))}")
print(f"Name: {self._pool_name(pool.name)}")

print(f"UUID: {self.uuid_formatter(pool_uuid)}")

key_description = pool.get("key_description")
key_description = pool.key_description
key_description_str = (
"unencrypted"
if key_description is None
else _non_existent_or_inconsistent_to_str(key_description)
)
print(f"Key Description: {key_description_str}")

clevis_info = pool.get("clevis_info")
clevis_info = pool.clevis_info
clevis_info_str = (
"unencrypted"
if clevis_info is None
else _non_existent_or_inconsistent_to_str(clevis_info, interp=clevis_to_str)
else _non_existent_or_inconsistent_to_str(
clevis_info, interp=_clevis_to_str
)
)
print(f"Clevis Configuration: {clevis_info_str}")

print("Devices:")
for dev in pool["devs"]:
print(f"{self.uuid_formatter(dev['uuid'])} {dev['devnode']}")
for dev in pool.devs:
print(f"{self.uuid_formatter(dev.uuid)} {dev.devnode}")

def list_pools(self):
"""
Expand Down Expand Up @@ -488,13 +491,16 @@ def key_description_str(value):
if self.selection is None:
tables = [
(
self._pool_name(info.get("name")),
self._pool_name(sp.name),
self.uuid_formatter(pool_uuid),
str(len(info["devs"])),
key_description_str(info.get("key_description")),
clevis_str(info.get("clevis_info")),
str(len(sp.devs)),
key_description_str(sp.key_description),
clevis_str(sp.clevis_info),
)
for pool_uuid, sp in (
(pool_uuid, StoppedPool(info))
for pool_uuid, info in stopped_pools.items()
)
for (pool_uuid, info) in stopped_pools.items()
]

print_table(
Expand Down Expand Up @@ -531,4 +537,4 @@ def selection_func(_uuid, info):

(pool_uuid, pool) = stopped_pool

self._print_detail_view(pool_uuid, pool)
self._print_detail_view(pool_uuid, StoppedPool(pool))
108 changes: 102 additions & 6 deletions src/stratis_cli/_actions/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
Miscellaneous functions.
"""

# isort: STDLIB
import json
from uuid import UUID

from .._errors import (
StratisCliMissingClevisTangURLError,
StratisCliMissingClevisThumbprintError,
Expand Down Expand Up @@ -46,12 +50,6 @@ def __init__(self, pin, config):
self.pin = pin
self.config = config

def __str__(self): # pragma: no cover
config_string = " ".join(
f"{key}: {value}" for key, value in self.config.items()
)
return f"{self.pin} {config_string}"

@staticmethod
def get_info_from_namespace(namespace):
"""
Expand Down Expand Up @@ -89,3 +87,101 @@ def get_info_from_namespace(namespace):
) # pragma: no cover

return clevis_info


class EncryptionInfo:
"""
Generic information about a single encryption method.
"""

def __init__(self, info):
"""
Initializer.
;param info: info about an encryption method, as a dbus-python type
"""
(consistent, info) = info
if consistent:
(encrypted, base) = info
if encrypted:
self.base = base
else:
self.error = str(info)

def consistent(self):
"""
True if consistent, otherwise False.
"""
return not hasattr(self, "error")

def value(self):
"""
The base value if consistent and this encryption method used.
:rtype: str or NoneType
"""
return self.base if hasattr(self, "base") else None


class EncryptionInfoClevis(EncryptionInfo):
"""
Encryption info for Clevis
"""

def __init__(self, info):
super().__init__(info)

value = getattr(self, "base", None)
if value is not None:
(pin, config) = value # pylint: disable=unpacking-non-sequence
self.base = ClevisInfo(str(pin), json.loads(str(config)))


class EncryptionInfoKeyDescription(EncryptionInfo):
"""
Encryption info for kernel keyring
"""

def __init__(self, info):
super().__init__(info)

value = getattr(self, "base", None)
if value is not None:
self.base = str(value)


class Device: # pylint: disable=too-few-public-methods
"""
A representation of a device in a stopped pool.
"""

def __init__(self, mapping):
self.uuid = UUID(mapping["uuid"])
self.devnode = str(mapping["devnode"])


class StoppedPool: # pylint: disable=too-few-public-methods
"""
A representation of a single stopped pool.
"""

def __init__(self, pool_info):
"""
Initializer.
:param pool_info: a D-Bus structure
"""

self.devs = [Device(info) for info in pool_info["devs"]]

clevis_info = pool_info.get("clevis_info")
self.clevis_info = (
None if clevis_info is None else EncryptionInfoClevis(clevis_info)
)

key_description = pool_info.get("key_description")
self.key_description = (
None
if key_description is None
else EncryptionInfoKeyDescription(key_description)
)

name = pool_info.get("name")
self.name = None if name is None else str(name)

0 comments on commit 870cafe

Please sign in to comment.