From 65920dcd0a48d38f12bb340fe7d4930227857206 Mon Sep 17 00:00:00 2001 From: Ro Date: Tue, 11 Oct 2022 15:52:21 -0700 Subject: [PATCH] Apply black and linter. Improve test coverage. TimeoutException no longer inherits from ExportException. Small fix to export metadata and get_partitioned_devices. --- securedrop_export/archive.py | 80 +++---- securedrop_export/command.py | 4 +- securedrop_export/directory_util.py | 15 +- securedrop_export/disk/cli.py | 126 ++++++---- securedrop_export/disk/new_service.py | 17 +- securedrop_export/disk/new_status.py | 25 +- securedrop_export/disk/service.py | 62 +++-- securedrop_export/disk/status.py | 9 +- securedrop_export/disk/volume.py | 8 +- securedrop_export/exceptions.py | 4 +- securedrop_export/main.py | 162 +++++++------ securedrop_export/print/service.py | 73 +++--- securedrop_export/print/status.py | 6 +- securedrop_export/status.py | 2 + setup.py | 4 +- tests/disk/test_cli.py | 185 ++++++++++----- tests/disk/test_service.py | 82 ++++--- tests/disk/test_volume.py | 21 +- tests/print/test_service.py | 317 +++++++++++++++++++++++--- tests/test_archive.py | 79 ++++--- tests/test_directory_util.py | 55 +++-- tests/test_exceptions.py | 6 +- tests/test_main.py | 166 ++++++++++++-- 23 files changed, 1058 insertions(+), 450 deletions(-) diff --git a/securedrop_export/archive.py b/securedrop_export/archive.py index 0cbb982..2ec50b5 100755 --- a/securedrop_export/archive.py +++ b/securedrop_export/archive.py @@ -4,9 +4,6 @@ import json import logging import os -import shutil -import subprocess -import sys import tempfile from securedrop_export.exceptions import ExportException @@ -16,51 +13,26 @@ logger = logging.getLogger(__name__) + class Status(BaseStatus): ERROR_ARCHIVE_METADATA = "ERROR_ARCHIVE_METADATA" ERROR_METADATA_PARSING = "ERROR_METADATA_PARSING" ERROR_EXTRACTION = "ERROR_EXTRACTION" + class Metadata(object): """ Object to parse, validate and store json metadata from the sd-export archive. - - Create a Metadata object by using the `create_and_validate()` method to - ensure well-formed and valid metadata. """ METADATA_FILE = "metadata.json" SUPPORTED_ENCRYPTION_METHODS = ["luks"] - # Slightly underhanded way of ensuring that a Metadata object is not instantiated - # directly; instead, the create_and_validate() method is used - __key = object() - - - def __init__(self, key: object, archive_path: str): - if not key == Metadata.__key: - raise ValueError("Must use create_and_validate() to create Metadata object") - - # Initialize + def __init__(self, archive_path: str): self.metadata_path = os.path.join(archive_path, self.METADATA_FILE) - - @classmethod - def create_and_validate(cls, archive_path) -> 'Metadata': - """ - Create and validate metadata object. Raise ExportException for invalid metadata. - """ - md = Metadata(cls.__key, archive_path) - md.validate() - - return md - - - def validate(self): - """ - Validate Metadata. - Throw ExportException if invalid state is found. - """ + def validate(self) -> "Metadata": + # Read metadata json and set relevant attributes try: with open(self.metadata_path) as f: logger.info("Parsing archive metadata") @@ -69,7 +41,7 @@ def validate(self): self.encryption_method = json_config.get("encryption_method", None) self.encryption_key = json_config.get("encryption_key", None) logger.info( - "Exporting to device {} with encryption_method {}".format( + "Target: {}, encryption_method {}".format( self.export_method, self.encryption_method ) ) @@ -78,32 +50,52 @@ def validate(self): logger.error("Metadata parsing failure") raise ExportException(sdstatus=Status.ERROR_METADATA_PARSING) from ex - # Validate metadata - this will fail if command is not in list of supported commands - try: + # Validate action - fails if command is not in list of supported commands + try: + logger.debug("Validate export action") self.command = Command(self.export_method) - if self.command is Command.EXPORT and not self.encryption_method in self.SUPPORTED_ENCRYPTION_METHODS: + if ( + self.command is Command.EXPORT + and self.encryption_method not in self.SUPPORTED_ENCRYPTION_METHODS + ): logger.error("Unsupported encryption method") raise ExportException(sdstatus=Status.ERROR_ARCHIVE_METADATA) except ValueError as v: - raise ExportException(sdstatus=Status.ERROR_METADATA_PARSING) from v + raise ExportException(sdstatus=Status.ERROR_ARCHIVE_METADATA) from v + + return self class Archive(object): - def __init__(self, archive): + def __init__(self, archive_path: str): os.umask(0o077) - self.archive = archive - self.submission_dirname = os.path.basename(self.archive).split(".")[0] + self.archive = archive_path self.target_dirname = "sd-export-{}".format( datetime.datetime.now().strftime("%Y%m%d-%H%M%S") ) self.tmpdir = tempfile.mkdtemp() - def extract_tarball(self): + def extract_tarball(self) -> "Archive": + """ + Extract tarball, checking for path traversal, and return Archive object. + """ try: - logger.info("Extracting tarball {} into {}".format(self.archive, self.tmpdir)) + logger.info( + "Extracting tarball {} into {}".format(self.archive, self.tmpdir) + ) safe_extractall(self.archive, self.tmpdir) + return self except Exception as ex: logger.error("Unable to extract tarball: {}".format(ex)) raise ExportException(sdstatus=Status.ERROR_EXTRACTION) from ex - \ No newline at end of file + def set_metadata(self, metadata: Metadata) -> "Archive": + """ + Set relevant metadata attributes for a given archive. + """ + self.command = metadata.command + if self.command is Command.EXPORT: + # When we support multiple encryption types, we will also want to add the + # encryption_method here + self.encryption_key = metadata.encryption_key + return self diff --git a/securedrop_export/command.py b/securedrop_export/command.py index 382b4fe..06a3167 100644 --- a/securedrop_export/command.py +++ b/securedrop_export/command.py @@ -1,12 +1,14 @@ from enum import Enum + class Command(Enum): """ All supported commands. - Values are as supplied by the calling VM (sd-app), and a change in any values require + Values are as supplied by the calling VM (sd-app), and a change in any values requires corresponding changes in the calling VM. """ + PRINTER_PREFLIGHT = "printer-preflight" PRINTER_TEST = "printer-test" PRINT = "printer" diff --git a/securedrop_export/directory_util.py b/securedrop_export/directory_util.py index b460885..a2a866c 100644 --- a/securedrop_export/directory_util.py +++ b/securedrop_export/directory_util.py @@ -2,7 +2,6 @@ import tarfile from pathlib import Path from typing import Optional, Union -import subprocess import logging logger = logging.getLogger(__name__) @@ -105,7 +104,9 @@ def _check_path_traversal(filename_or_filepath: Union[str, Path]) -> None: if filename_or_filepath.is_absolute(): base_path = filename_or_filepath else: - base_path = Path.cwd() # use cwd so we can next ensure relative path does not traverse up + base_path = ( + Path.cwd() + ) # use cwd so we can next ensure relative path does not traverse up try: relative_path = relative_filepath(filename_or_filepath, base_path) @@ -114,7 +115,10 @@ def _check_path_traversal(filename_or_filepath: Union[str, Path]) -> None: # base, but can still have harmful side effects to the application. If this kind of # traversal is needed, then call relative_filepath instead in order to check that the # desired traversal does not go past a safe base directory. - if relative_path != filename_or_filepath and not filename_or_filepath.is_absolute(): + if ( + relative_path != filename_or_filepath + and not filename_or_filepath.is_absolute() + ): raise ValueError except ValueError: raise ValueError(f"Unsafe file or directory name: '{filename_or_filepath}'") @@ -147,5 +151,6 @@ def _check_dir_permissions(dir_path: Union[str, Path]) -> None: stat_res = os.stat(dir_path).st_mode masked = stat_res & 0o777 if masked & 0o077: - raise RuntimeError("Unsafe permissions ({}) on {}".format(oct(stat_res), dir_path)) - + raise RuntimeError( + "Unsafe permissions ({}) on {}".format(oct(stat_res), dir_path) + ) diff --git a/securedrop_export/disk/cli.py b/securedrop_export/disk/cli.py index 8f9dc6d..7abb33b 100644 --- a/securedrop_export/disk/cli.py +++ b/securedrop_export/disk/cli.py @@ -1,11 +1,6 @@ -import datetime -import json import logging import os -import shutil import subprocess -import tempfile -import sys from typing import List, Optional @@ -36,12 +31,18 @@ def get_connected_devices(self) -> List[str]: Raise ExportException if any commands fail. """ + logger.info("Checking connected volumes") try: lsblk = subprocess.Popen( - ["lsblk", "-o", "NAME,TYPE"], stdout=subprocess.PIPE, stderr=subprocess.PIPE + ["lsblk", "-o", "NAME,TYPE"], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, ) grep = subprocess.Popen( - ["grep", "disk"], stdin=lsblk.stdout, stdout=subprocess.PIPE, stderr=subprocess.PIPE + ["grep", "disk"], + stdin=lsblk.stdout, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, ) command_output = grep.stdout.readlines() @@ -57,12 +58,14 @@ def _get_removable_devices(self, attached_devices: List[str]) -> List[str]: """ Determine which block devices are USBs by selecting those that are removable. """ + logger.info("Checking removable devices") usb_devices = [] for device in attached_devices: is_removable = False try: removable = subprocess.check_output( - ["cat", f"/sys/class/block/{device}/removable"], stderr=subprocess.PIPE + ["cat", f"/sys/class/block/{device}/removable"], + stderr=subprocess.PIPE, ) # 0 for non-removable device, 1 for removable @@ -75,6 +78,7 @@ def _get_removable_devices(self, attached_devices: List[str]) -> List[str]: if is_removable: usb_devices.append(f"/dev/{device}") + logger.info(f"{len(usb_devices)} connected") return usb_devices def get_partitioned_device(self, blkid: str) -> str: @@ -85,30 +89,40 @@ def get_partitioned_device(self, blkid: str) -> str: Raise ExportException if partition check fails or device has unsupported partition scheme (currently, multiple partitions are unsupported). """ - try: + device_and_partitions = self._check_partitions(blkid) - device_and_partitions = subprocess.check_output( - ["lsblk", "-o", "TYPE", "--noheadings", blkid], stderr=subprocess.PIPE + if device_and_partitions: + partition_count = ( + device_and_partitions.decode("utf-8").split("\n").count("part") ) + logger.debug(f"Counted {partition_count} partitions") + if partition_count > 1: + # We don't currently support devices with multiple partitions + logger.error( + f"Multiple partitions not supported ({partition_count} partitions" + f" on {blkid})" + ) + raise ExportException(sdstatus=Status.INVALID_DEVICE_DETECTED) - if device_and_partitions: - partition_count = device_and_partitions.decode("utf-8").split("\n").count("part") - if partition_count > 1: - # We don't currently support devices with multiple partitions - logger.error( - f"Multiple partitions not supported (found {partition_count} partitions on {blkid}" - ) - raise ExportException(sdstatus=Status.INVALID_DEVICE_DETECTED) + # redefine device to /dev/sda if disk is encrypted, /dev/sda1 if partition encrypted + if partition_count == 1: + logger.debug("One partition found") + blkid += "1" - # redefine device to /dev/sda if disk is encrypted, /dev/sda1 if partition encrypted - if partition_count == 1: - blkid += "1" + return blkid - return blkid + else: + # lsblk did not return output we could process + logger.error("Error checking device partitions") + raise ExportException(sdstatus=Status.DEVICE_ERROR) - else: - # lsblk did not return output we could process - raise ExportException(sdstatus=Status.DEVICE_ERROR) + def _check_partitions(self, blkid: str) -> str: + try: + logger.debug(f"Checking device partitions on {blkid}") + device_and_partitions = subprocess.check_output( + ["lsblk", "-o", "TYPE", "--noheadings", blkid], stderr=subprocess.PIPE + ) + return device_and_partitions except subprocess.CalledProcessError as ex: logger.error(f"Error checking block deivce {blkid}") @@ -122,7 +136,7 @@ def is_luks_volume(self, device: str) -> bool: isLuks = False try: - logger.debug(f"Checking if {device} is luks encrypted") + logger.debug("Checking if target device is luks encrypted") # cryptsetup isLuks returns 0 if the device is a luks volume # subprocess will throw if the device is not luks (rc !=0) @@ -130,9 +144,9 @@ def is_luks_volume(self, device: str) -> bool: isLuks = True - except subprocess.CalledProcessError as ex: + except subprocess.CalledProcessError: # Not necessarily an error state, just means the volume is not LUKS encrypted - logger.debug(f"{device} is not LUKS-encrypted") + logger.info("Target device is not LUKS-encrypted") return isLuks @@ -142,8 +156,11 @@ def _get_luks_name_from_headers(self, device: str) -> str: Raise ExportException if errors encounterd during attempt to parse LUKS headers. """ + logger.debug("Get LUKS name from headers") try: - luks_header = subprocess.check_output(["sudo", "cryptsetup", "luksDump", device]) + luks_header = subprocess.check_output( + ["sudo", "cryptsetup", "luksDump", device] + ) if luks_header: luks_header_list = luks_header.decode("utf-8").split("\n") for line in luks_header_list: @@ -151,11 +168,13 @@ def _get_luks_name_from_headers(self, device: str) -> str: if "UUID" in items[0]: return "luks-" + items[1] - # If no header or no UUID field, we can't use this drive - logger.error(f"Failed to get UUID from LUKS header; {device} may not be correctly formatted") + # If no header or no UUID field, we can't use this drive + logger.error( + f"Failed to get UUID from LUKS header; {device} may not be correctly formatted" + ) raise ExportException(sdstatus=Status.INVALID_DEVICE_DETECTED) except subprocess.CalledProcessError as ex: - logger.error(f"Failed to dump LUKS header") + logger.error("Failed to dump LUKS header") raise ExportException(sdstatus=Status.DEVICE_ERROR) from ex def get_luks_volume(self, device: str) -> Volume: @@ -174,10 +193,13 @@ def get_luks_volume(self, device: str) -> Volume: """ try: mapped_name = self._get_luks_name_from_headers(device) + logger.debug(f"Mapped name is {mapped_name}") # Setting the mapped_name does not mean the device has already been unlocked. luks_volume = Volume( - device_name=device, mapped_name=mapped_name, encryption=EncryptionScheme.LUKS + device_name=device, + mapped_name=mapped_name, + encryption=EncryptionScheme.LUKS, ) # If the device has been unlocked, we can see if it's mounted and @@ -199,14 +221,20 @@ def unlock_luks_volume(self, volume: Volume, decryption_key: str) -> Volume: Raise ExportException if errors are encountered during device unlocking. """ - if not volume.encryption is EncryptionScheme.LUKS: + if volume.encryption is not EncryptionScheme.LUKS: logger.error("Must call unlock_luks_volume() on LUKS-encrypted device") raise ExportException(sdstatus=Status.DEVICE_ERROR) try: logger.debug("Unlocking luks volume {}".format(volume.device_name)) p = subprocess.Popen( - ["sudo", "cryptsetup", "luksOpen", volume.device_name, volume.mapped_name], + [ + "sudo", + "cryptsetup", + "luksOpen", + volume.device_name, + volume.mapped_name, + ], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, @@ -217,7 +245,9 @@ def unlock_luks_volume(self, volume: Volume, decryption_key: str) -> Volume: if rc == 0: return Volume( - device_name=volume.device_name, mapped_name=volume.mapped_name, encryption=EncryptionScheme.LUKS + device_name=volume.device_name, + mapped_name=volume.mapped_name, + encryption=EncryptionScheme.LUKS, ) else: logger.error("Bad volume passphrase") @@ -231,6 +261,7 @@ def _get_mountpoint(self, volume: Volume) -> Optional[str]: Check for existing mountpoint. Raise ExportException if errors encountered during command. """ + logger.debug("Checking mountpoint") try: output = subprocess.check_output( ["lsblk", "-o", "MOUNTPOINT", "--noheadings", volume.device_name] @@ -251,22 +282,23 @@ def mount_volume(self, volume: Volume) -> Volume: Raise ExportException if errors are encountered during mounting. """ if not volume.unlocked: + logger.error("Volume is not unlocked.") raise ExportException(sdstatus=Status.ERROR_MOUNT) mountpoint = self._get_mountpoint(volume) if mountpoint: - logger.debug("The device is already mounted") + logger.info("The device is already mounted") if volume.mountpoint is not mountpoint: - logger.warning(f"Mountpoint was inaccurate, updating") + logger.warning("Mountpoint was inaccurate, updating") volume.mountpoint = mountpoint return volume else: + logger.info("Mount volume at default mountpoint") return self._mount_at_mountpoint(volume, self._DEFAULT_MOUNTPOINT) - def _mount_at_mountpoint(self, volume: Volume, mountpoint: str) -> Volume: """ Mount a volume at the supplied mountpoint, creating the mountpoint directory and @@ -283,10 +315,12 @@ def _mount_at_mountpoint(self, volume: Volume, mountpoint: str) -> Volume: raise ExportException(sdstatus=Status.ERROR_MOUNT) from ex # Mount device /dev/mapper/{mapped_name} at /media/usb/ - mapped_device_path = os.path.join(volume.MAPPED_VOLUME_PREFIX, volume.mapped_name) + mapped_device_path = os.path.join( + volume.MAPPED_VOLUME_PREFIX, volume.mapped_name + ) try: - logger.debug(f"Mounting volume {volume.device_name} at {mountpoint}") + logger.info(f"Mounting volume at {mountpoint}") subprocess.check_call(["sudo", "mount", mapped_device_path, mountpoint]) subprocess.check_call(["sudo", "chown", "-R", "user:user", mountpoint]) @@ -311,10 +345,12 @@ def write_data_to_device( subprocess.check_call(["mkdir", target_path]) export_data = os.path.join(submission_tmpdir, "export_data/") - logger.info("Copying file to {}".format(submission_target_dirname)) + logger.debug("Copying file to {}".format(submission_target_dirname)) subprocess.check_call(["cp", "-r", export_data, target_path]) - logger.info("File copied successfully to {}".format(submission_target_dirname)) + logger.info( + "File copied successfully to {}".format(submission_target_dirname) + ) except (subprocess.CalledProcessError, OSError) as ex: raise ExportException(sdstatus=Status.ERROR_EXPORT) from ex @@ -330,7 +366,7 @@ def cleanup_drive_and_tmpdir(self, volume: Volume, submission_tmpdir: str): Raise ExportException if errors during cleanup are encoutered. """ - logger.info("Syncing filesystems") + logger.debug("Syncing filesystems") try: subprocess.check_call(["sync"]) umounted = self._unmount_volume(volume) diff --git a/securedrop_export/disk/new_service.py b/securedrop_export/disk/new_service.py index 1084f75..b5702a4 100644 --- a/securedrop_export/disk/new_service.py +++ b/securedrop_export/disk/new_service.py @@ -1,17 +1,10 @@ import logging -import os -import subprocess -import sys - -from enum import Enum - -from typing import List from securedrop_export.archive import Archive from .cli import CLI from .status import Status -from .volume import EncryptionScheme, Volume +from .volume import Volume from securedrop_export.exceptions import ExportException @@ -29,7 +22,7 @@ def __init__(self, cli: CLI): def run(self, arg: str) -> Status: """ - Run export actions. + Run export actions. """ def scan_all_devices(self) -> Status: @@ -46,13 +39,13 @@ def scan_all_devices(self) -> Status: elif number_devices > 1: return Status.MULTI_DEVICE_DETECTED else: - return scan_single_device(all_devices[0]) + return self.scan_single_device(all_devices[0]) - except ExportException: + except ExportException as ex: logger.error(ex) return Status.DEVICE_ERROR # Could not assess devices - def scan_single_device(self, str: blkid) -> Status: + def scan_single_device(self, blkid: str) -> Status: """ Given a string representing a single block device, see if it is a suitable export target and return information about its state. diff --git a/securedrop_export/disk/new_status.py b/securedrop_export/disk/new_status.py index 2bb0c24..285d9f8 100644 --- a/securedrop_export/disk/new_status.py +++ b/securedrop_export/disk/new_status.py @@ -1,20 +1,29 @@ from securedrop_export.status import BaseStatus + class Status(BaseStatus): NO_DEVICE_DETECTED = "NO_DEVICE_DETECTED" - INVALID_DEVICE_DETECTED = "INVALID_DEVICE_DETECTED" # Multi partitioned, not encrypted, etc - MULTI_DEVICE_DETECTED = "MULTI_DEVICE_DETECTED" # Not currently supported + INVALID_DEVICE_DETECTED = ( + "INVALID_DEVICE_DETECTED" # Multi partitioned, not encrypted, etc + ) + MULTI_DEVICE_DETECTED = "MULTI_DEVICE_DETECTED" # Not currently supported - DEVICE_LOCKED = "DEVICE_LOCKED" # One device detected, and it's locked - DEVICE_WRITABLE = "DEVICE_WRITABLE" # One device detected, and it's unlocked (and mounted) + DEVICE_LOCKED = "DEVICE_LOCKED" # One device detected, and it's locked + DEVICE_WRITABLE = ( + "DEVICE_WRITABLE" # One device detected, and it's unlocked (and mounted) + ) ERROR_UNLOCK_LUKS = "ERROR_UNLOCK_LUKS" ERROR_UNLOCK_GENERIC = "ERROR_UNLOCK_GENERIC" - ERROR_MOUNT = "ERROR_MOUNT" # Unlocked but not mounted + ERROR_MOUNT = "ERROR_MOUNT" # Unlocked but not mounted SUCCESS_EXPORT = "SUCCESS_EXPORT" - ERROR_EXPORT = "ERROR_EXPORT" # Could not write to disk - ERROR_EXPORT_CLEANUP = "ERROR_EXPORT_CLEANUP" # If export succeeds but drives were not properly unmounted + ERROR_EXPORT = "ERROR_EXPORT" # Could not write to disk + + # export succeeds but drives were not properly unmounted + ERROR_EXPORT_CLEANUP = "ERROR_EXPORT_CLEANUP" - DEVICE_ERROR = "DEVICE_ERROR" # Something went wrong while trying to check the device + DEVICE_ERROR = ( + "DEVICE_ERROR" # Something went wrong while trying to check the device + ) diff --git a/securedrop_export/disk/service.py b/securedrop_export/disk/service.py index adabf4e..e87386a 100644 --- a/securedrop_export/disk/service.py +++ b/securedrop_export/disk/service.py @@ -1,11 +1,5 @@ import logging -import os -import subprocess -import sys -from typing import List - -from securedrop_export.archive import Archive from securedrop_export.exceptions import ExportException from .cli import CLI @@ -16,7 +10,6 @@ class Service: - def __init__(self, submission, cli=None): self.submission = submission self.cli = cli or CLI() @@ -32,6 +25,7 @@ def check_connected_devices(self) -> Status: num_devices = len(all_devices) except ExportException as ex: + logger.error(f"Error encountered during USB check: {ex.sdstatus.value}") # Use legacy status instead of new status values raise ExportException(sdstatus=Status.LEGACY_ERROR_USB_CHECK) from ex @@ -42,7 +36,6 @@ def check_connected_devices(self) -> Status: elif num_devices > 1: raise ExportException(sdstatus=Status.LEGACY_USB_ENCRYPTION_NOT_SUPPORTED) - def check_disk_format(self) -> Status: """ Check if volume is correctly formatted for export. @@ -51,15 +44,21 @@ def check_disk_format(self) -> Status: all_devices = self.cli.get_connected_devices() if len(all_devices) == 1: - device = self.cli.get_partitioned_device(all_devices) + device = self.cli.get_partitioned_device(all_devices[0]) + logger.info("Check if LUKS") if not self.cli.is_luks_volume(device): - raise ExportException(sdstatus=Status.LEGACY_USB_ENCRYPTION_NOT_SUPPORTED) + raise ExportException( + sdstatus=Status.LEGACY_USB_ENCRYPTION_NOT_SUPPORTED + ) # We can support checking if a drive is already unlocked, but for # backwards compatibility, this is the only expected status - # at this stage + # at this stage return Status.LEGACY_USB_ENCRYPTED except ExportException as ex: + logger.error( + f"Error encountered during disk format check: {ex.sdstatus.value}" + ) # Return legacy status values for now for ongoing client compatibility if ex.sdstatus in [s for s in NewStatus]: status = self._legacy_status(ex.sdstatus) @@ -67,10 +66,9 @@ def check_disk_format(self) -> Status: elif ex.sdstatus: raise else: - raise ExportException(sdstatus=Status.LEGACY_USB_ENCRYPTION_NOT_SUPPORTED) - + raise ExportException(sdstatus=Status.LEGACY_USB_DISK_ERROR) - def export(self) -> Status: + def export(self): """ Export all files to target device. """ @@ -83,27 +81,38 @@ def export(self) -> Status: device = self.cli.get_partitioned_device(all_devices[0]) # Decide what kind of volume it is + logger.info("Check if LUKS") if self.cli.is_luks_volume(device): volume = self.cli.get_luks_volume(device) + logger.info("Check if writable") if not volume.writable: - unlocked = self.cli.unlock_luks_volume( - volume, self.submission.archive_metadata.encryption_key + logger.info("Not writable-will try unlocking") + volume = self.cli.unlock_luks_volume( + volume, self.submission.encryption_key ) - mounted = self.cli.mount_volume(unlocked) + volume = self.cli.mount_volume(volume) - logger.debug(f"Export submission to {mounted.mountpoint}") - self.cli.write_data_to_device(self.submission.tmpdir, self.submission.target_dirname, mounted) - return Status.SUCCESS_EXPORT + logger.info(f"Export submission to {volume.mountpoint}") + self.cli.write_data_to_device( + self.submission.tmpdir, self.submission.target_dirname, volume + ) + # This is SUCCESS_EXPORT, but the 0.7.0 client is not expecting + # a return status from a successful export operation. + # When the client is updated, we will return SUCCESS_EXPORT here. else: # Another kind of drive: VeraCrypt/TC, or unsupported. # For now this is an error--in future there will be support # for additional encryption formats logger.error(f"Export failed because {device} is not supported") - raise ExportException(sdstatus=Status.LEGACY_USB_ENCRYPTION_NOT_SUPPORTED) + raise ExportException( + sdstatus=Status.LEGACY_USB_ENCRYPTION_NOT_SUPPORTED + ) except ExportException as ex: - print(ex) + logger.error( + f"Error encountered during disk format check: {ex.sdstatus.value}" + ) # Return legacy status values for now for ongoing client compatibility if ex.sdstatus in [s for s in NewStatus]: status = self._legacy_status(ex.sdstatus) @@ -113,20 +122,23 @@ def export(self) -> Status: else: raise ExportException(sdstatus=Status.LEGACY_ERROR_GENERIC) - def _legacy_status(self, status: NewStatus) -> Status: """ Backwards-compatibility - status values that client (@0.7.0) is expecting. """ + logger.info(f"Convert to legacy: {status.value}") if status is NewStatus.ERROR_MOUNT: return Status.LEGACY_ERROR_USB_MOUNT elif status in [NewStatus.ERROR_EXPORT, NewStatus.ERROR_EXPORT_CLEANUP]: return Status.LEGACY_ERROR_USB_WRITE elif status in [NewStatus.ERROR_UNLOCK_LUKS, NewStatus.ERROR_UNLOCK_GENERIC]: return Status.LEGACY_USB_BAD_PASSPHRASE - elif status in [NewStatus.INVALID_DEVICE_DETECTED, NewStatus.MULTI_DEVICE_DETECTED]: + elif status in [ + NewStatus.INVALID_DEVICE_DETECTED, + NewStatus.MULTI_DEVICE_DETECTED, + ]: return Status.LEGACY_USB_ENCRYPTION_NOT_SUPPORTED # The other status values, such as Status.NO_DEVICE_DETECTED, are not returned by the # CLI, so we don't need to check for them here else: - return Status.LEGACY_ERROR_GENERIC \ No newline at end of file + return Status.LEGACY_ERROR_GENERIC diff --git a/securedrop_export/disk/status.py b/securedrop_export/disk/status.py index 4a3aa88..fa0bdf8 100644 --- a/securedrop_export/disk/status.py +++ b/securedrop_export/disk/status.py @@ -1,19 +1,20 @@ from securedrop_export.status import BaseStatus + class Status(BaseStatus): LEGACY_ERROR_GENERIC = "ERROR_GENERIC" # Legacy USB preflight related - LEGACY_USB_CONNECTED = "USB_CONNECTED" # Success + LEGACY_USB_CONNECTED = "USB_CONNECTED" # Success LEGACY_USB_NOT_CONNECTED = "USB_NOT_CONNECTED" LEGACY_ERROR_USB_CHECK = "ERROR_USB_CHECK" # Legacy USB Disk preflight related errors - LEGACY_USB_ENCRYPTED = "USB_ENCRYPTED" # Success + LEGACY_USB_ENCRYPTED = "USB_ENCRYPTED" # Success LEGACY_USB_ENCRYPTION_NOT_SUPPORTED = "USB_ENCRYPTION_NOT_SUPPORTED" - #@todo - this can be raised during disk format check + # Can be raised during disk format check LEGACY_USB_DISK_ERROR = "USB_DISK_ERROR" # Legacy Disk export errors @@ -22,4 +23,4 @@ class Status(BaseStatus): LEGACY_ERROR_USB_WRITE = "ERROR_USB_WRITE" # New - SUCCESS_EXPORT = "SUCCESS_EXPORT" \ No newline at end of file + SUCCESS_EXPORT = "SUCCESS_EXPORT" diff --git a/securedrop_export/disk/volume.py b/securedrop_export/disk/volume.py index a3049e1..c6bc2f8 100644 --- a/securedrop_export/disk/volume.py +++ b/securedrop_export/disk/volume.py @@ -17,8 +17,8 @@ class Volume: """ A volume on a removable device. - Volumes have a device name ("/dev/sdX"), a mapped name ("/dev/mapper/xxx"), an encryption scheme, - and a mountpoint if they are mounted. + Volumes have a device name ("/dev/sdX"), a mapped name ("/dev/mapper/xxx"), an encryption + scheme, and a mountpoint if they are mounted. """ def __init__( @@ -53,5 +53,7 @@ def unlocked(self) -> bool: return ( self.mapped_name is not None and self.encryption is not EncryptionScheme.UNKNOWN - and os.path.exists(os.path.join(self.MAPPED_VOLUME_PREFIX, self.mapped_name)) + and os.path.exists( + os.path.join(self.MAPPED_VOLUME_PREFIX, self.mapped_name) + ) ) diff --git a/securedrop_export/exceptions.py b/securedrop_export/exceptions.py index d740fc3..78c0519 100644 --- a/securedrop_export/exceptions.py +++ b/securedrop_export/exceptions.py @@ -1,5 +1,4 @@ import logging -from typing import Optional logger = logging.getLogger(__name__) @@ -16,7 +15,8 @@ def __init__(self, *args, **kwargs): self.sdstatus = kwargs.get("sdstatus") self.sderror = kwargs.get("sderror") -class TimeoutException(ExportException): + +class TimeoutException(Exception): pass diff --git a/securedrop_export/main.py b/securedrop_export/main.py index 4081736..dca8e3f 100755 --- a/securedrop_export/main.py +++ b/securedrop_export/main.py @@ -3,12 +3,12 @@ import platform import logging import sys -import subprocess from securedrop_export.archive import Archive, Metadata from securedrop_export.command import Command from securedrop_export.status import BaseStatus from securedrop_export.directory_util import safe_mkdir +from securedrop_export.exceptions import ExportException from securedrop_export.disk.service import Service as ExportService from securedrop_export.print.service import Service as PrintService @@ -22,139 +22,136 @@ logger = logging.getLogger(__name__) + class Status(BaseStatus): """ Status values that can occur during initialization. """ + ERROR_LOGGING = "ERROR_LOGGING" ERROR_GENERIC = "ERROR_GENERIC" ERROR_FILE_NOT_FOUND = "ERROR_FILE_NOT_FOUND" + def entrypoint(): """ Entrypoint method (Note: a method is required for setuptools). Configure logging, extract tarball, and run desired export service, exiting with return code 0. """ - try: - _configure_logging() - except Exception: - _exit_gracefully(submission=None, status=Status.ERROR_LOGGING) - - logger.info("Starting SecureDrop Export {}".format(__version__)) - data = Archive(sys.argv[1]) + status, stacktrace, submission = None, None, None try: - # Halt immediately if target file is absent - if not os.path.exists(data.archive): - logger.info("Archive is not found {}.".format(data.archive)) - _exit_gracefully(data, Status.ERROR_FILE_NOT_FOUND) - - # Extract archive and either print or export to disk. - # Includes cleanup logic, which removes any temporary directories associated with - # the archive. - _extract_and_run(data) - - except Exception as e: - _exit_gracefully(data, Status.ERROR_GENERIC, e.output) - - -def _configure_logging(): - """ - All logging related settings are set up by this function. - """ - safe_mkdir(DEFAULT_HOME) - safe_mkdir(DEFAULT_HOME, LOG_DIR_NAME) + _configure_logging() + logger.info("Starting SecureDrop Export {}".format(__version__)) - log_file = os.path.join(DEFAULT_HOME, LOG_DIR_NAME, EXPORT_LOG_FILENAME) + data_path = sys.argv[1] - # set logging format - log_fmt = "%(asctime)s - %(name)s:%(lineno)d(%(funcName)s) " "%(levelname)s: %(message)s" - formatter = logging.Formatter(log_fmt) + # Halt if target file is absent + if not os.path.exists(data_path): + logger.info("Archive is not found {}.".format(data_path)) + status = Status.ERROR_FILE_NOT_FOUND - handler = TimedRotatingFileHandler(log_file) - handler.setFormatter(formatter) + else: + logger.debug("Extract tarball") + submission = Archive(data_path).extract_tarball() + logger.debug("Validate metadata") + metadata = Metadata(submission.tmpdir).validate() + logger.info("Archive extraction and metadata validation successful") + + # If all we're doing is starting the vm, we're done; otherwise, + # run the appropriate print or export routine + if metadata.command is not Command.START_VM: + submission.set_metadata(metadata) + logger.info(f"Start {metadata.command.value} service") + status = _start_service(submission) - # For rsyslog handler - if platform.system() != "Linux": # pragma: no cover - syslog_file = "/var/run/syslog" - else: - syslog_file = "/dev/log" + except ExportException as ex: + logger.error(f"Encountered exception {ex.sdstatus.value}, exiting") + status = ex.sdstatus + stacktrace = ex.output - sysloghandler = SysLogHandler(address=syslog_file) - sysloghandler.setFormatter(formatter) - handler.setLevel(logging.DEBUG) + except Exception as exc: + logger.error("Encountered exception during export, exiting") + status = Status.ERROR_GENERIC + stacktrace = exc.output - # set up primary log - log = logging.getLogger() - log.setLevel(logging.DEBUG) - log.addHandler(handler) - # add the second logger - log.addHandler(sysloghandler) + finally: + _exit_gracefully(submission, status=status, e=stacktrace) -def _extract_and_run(submission: Archive): +def _configure_logging(): """ - Extract tarball and metadata and run appropriate command based on metadata instruction. - Always exits with return code 0 and writes exit status, if applicable, to stderr. + All logging related settings are set up by this function. """ - status = None - stacktrace = None - try: - submission.extract_tarball() + safe_mkdir(DEFAULT_HOME) + safe_mkdir(DEFAULT_HOME, LOG_DIR_NAME) - # Validates metadata and ensures requested action is supported - submission.archive_metadata = Metadata.create_and_validate(submission.tmpdir) + log_file = os.path.join(DEFAULT_HOME, LOG_DIR_NAME, EXPORT_LOG_FILENAME) - # If we just wanted to start the VM, our work here is done - if submission.archive_metadata.command is Command.START_VM: - _exit_gracefully(submission) - else: - status = _start_service(submission, command) + # set logging format + log_fmt = ( + "%(asctime)s - %(name)s:%(lineno)d(%(funcName)s) " + "%(levelname)s: %(message)s" + ) + formatter = logging.Formatter(log_fmt) - except ExportException as ex: - status = ex.value.sdstatus - stacktrace = ex.output + handler = TimedRotatingFileHandler(log_file) + handler.setFormatter(formatter) - except Exception as exc: - # All exceptions are wrapped in ExportException, but we are being cautious - logger.error("Encountered exception during export, exiting") - status = Status.ERROR_GENERIC - stacktrace = exc.output - - finally: - _exit_gracefully(submission, status, stacktrace) + # For rsyslog handler + if platform.system() != "Linux": # pragma: no cover + syslog_file = "/var/run/syslog" + else: + syslog_file = "/dev/log" + + sysloghandler = SysLogHandler(address=syslog_file) + sysloghandler.setFormatter(formatter) + handler.setLevel(logging.DEBUG) + + # set up primary log + log = logging.getLogger() + log.setLevel(logging.DEBUG) + log.addHandler(handler) + # add the second logger + log.addHandler(sysloghandler) + except Exception as ex: + raise ExportException(sdstatus=Status.ERROR_LOGGING) from ex -def _start_service(submission: Archive, cmd: Command) -> Status: +def _start_service(submission: Archive) -> Status: """ Start print or export service. """ # Print Routines - if cmd is Commmand.PRINTER: + if submission.command is Command.PRINT: return PrintService(submission).print() - elif cmd is Commmand.PRINTER_TEST: + elif submission.command is Command.PRINTER_PREFLIGHT: return PrintService(submission).printer_preflight() - elif cmd is Commmand.PRINTER_TEST: + elif submission.command is Command.PRINTER_TEST: return PrintService(submission).printer_test() # Export routines - elif cmd is Commmand.EXPORT: + elif submission.command is Command.EXPORT: return ExportService(submission).export() - elif cmd is Commmand.CHECK_USBS: + elif submission.command is Command.CHECK_USBS: return ExportService(submission).check_connected_devices() - elif cmd is Commmand.CHECK_VOLUME: + elif submission.command is Command.CHECK_VOLUME: return ExportService(submission).check_disk_format() -def _exit_gracefully(submission: Archive, status: Status=None, e=None): + +def _exit_gracefully(submission: Archive, status: BaseStatus = None, e: str = None): """ Utility to print error messages, mostly used during debugging, then exits successfully despite the error. Always exits 0, since non-zero exit values will cause system to try alternative solutions for mimetype handling, which we want to avoid. """ - logger.info(f"Exiting with status: {status.value}") + if status: + logger.info(f"Exit gracefully with status: {status.value}") + else: + logger.info("Exit gracefully (no status code supplied)") if e: logger.error("Captured exception output: {}".format(e.output)) try: @@ -178,6 +175,7 @@ def _write_status(status: BaseStatus): Write string to stderr. """ if status: + logger.info(f"Write status {status.value}") sys.stderr.write(status.value) sys.stderr.write("\n") else: diff --git a/securedrop_export/print/service.py b/securedrop_export/print/service.py index a8e22d2..4cfad87 100644 --- a/securedrop_export/print/service.py +++ b/securedrop_export/print/service.py @@ -7,13 +7,6 @@ from securedrop_export.exceptions import handler, TimeoutException, ExportException from .status import Status -PRINTER_NAME = "sdw-printer" -PRINTER_WAIT_TIMEOUT = 60 -BRLASER_DRIVER = "/usr/share/cups/drv/brlaser.drv" -BRLASER_PPD = "/usr/share/cups/model/br7030.ppd" -LASERJET_DRIVER = "/usr/share/cups/drv/hpcups.drv" -LASERJET_PPD = "/usr/share/cups/model/hp-laserjet_6l.ppd" - logger = logging.getLogger(__name__) @@ -22,12 +15,19 @@ class Service: Printer service """ + PRINTER_NAME = "sdw-printer" + PRINTER_WAIT_TIMEOUT = 60 + BRLASER_DRIVER = "/usr/share/cups/drv/brlaser.drv" + BRLASER_PPD = "/usr/share/cups/model/br7030.ppd" + LASERJET_DRIVER = "/usr/share/cups/drv/hpcups.drv" + LASERJET_PPD = "/usr/share/cups/model/hp-laserjet_6l.ppd" + def __init__(self, submission): self.submission = submission - self.printer_name = PRINTER_NAME - self.printer_wait_timeout = PRINTER_WAIT_TIMEOUT + self.printer_name = self.PRINTER_NAME + self.printer_wait_timeout = self.PRINTER_WAIT_TIMEOUT - def print(self) -> Status: + def print(self): """ Routine to print all files. Throws ExportException if an error is encountered. @@ -35,40 +35,49 @@ def print(self) -> Status: logger.info("Printing all files from archive") self._check_printer_setup() self._print_all_files() - return Status.PRINT_SUCCESS + # When client can accept new print statuses, we will return + # a success status here + # return Status.PRINT_SUCCESS - def printer_preflight(self) -> Status: + def printer_preflight(self): """ Routine to perform preflight printer testing. Throws ExportException if an error is encoutered. """ - logger.info("Running printer preflight") + logger.info("Running printer preflight") self._check_printer_setup() - return Status.PREFLIGHT_SUCCESS + # When client can accept new print statuses, we will return + # a success status here + # return Status.PREFLIGHT_SUCCESS - def printer_test(self) -> Status: + def printer_test(self): """ Routine to print a test page. Throws ExportException if an error is encountered. """ - logger.info("Printing test page") + logger.info("Printing test page") self._check_printer_setup() self._print_test_page() - return Status.PRINT_SUCCESS + # When client can accept new print statuses, we will return + # a success status here + # return Status.TEST_SUCCESS def _wait_for_print(self): """ Use lpstat to ensure the job was fully transfered to the printer Return True if print was successful, otherwise throw ExportException. + Currently, the handler `handler` is defined in `exceptions.py`. """ signal.signal(signal.SIGALRM, handler) signal.alarm(self.printer_wait_timeout) printer_idle_string = "printer {} is idle".format(self.printer_name) while True: try: - logger.info("Running lpstat waiting for printer {}".format(self.printer_name)) + logger.info( + "Running lpstat waiting for printer {}".format(self.printer_name) + ) output = subprocess.check_output(["lpstat", "-p", self.printer_name]) if printer_idle_string in output.decode("utf-8"): logger.info("Print completed") @@ -108,10 +117,10 @@ def _check_printer_setup(self) -> None: printer_uri = printers[0] printer_ppd = self._install_printer_ppd(printer_uri) - self.setup_printer(printer_uri, printer_ppd) + self._setup_printer(printer_uri, printer_ppd) except subprocess.CalledProcessError as e: logger.error(e) - raise ExportException(sdstatus=Status.ERROR_GENERIC) + raise ExportException(sdstatus=Status.ERROR_UNKNOWN) def _get_printer_uri(self) -> str: """ @@ -123,6 +132,7 @@ def _get_printer_uri(self) -> str: try: output = subprocess.check_output(["sudo", "lpinfo", "-v"]) except subprocess.CalledProcessError: + logger.error("Error attempting to retrieve printer uri with lpinfo") raise ExportException(sdstatus=Status.ERROR_PRINTER_URI) # fetch the usb printer uri @@ -146,16 +156,17 @@ def _get_printer_uri(self) -> str: def _install_printer_ppd(self, uri): if not any(x in uri for x in ("Brother", "LaserJet")): - logger.error("Cannot install printer ppd for unsupported printer: {}".format(uri)) + logger.error( + "Cannot install printer ppd for unsupported printer: {}".format(uri) + ) raise ExportException(sdstatus=Status.ERROR_PRINTER_NOT_SUPPORTED) - return if "Brother" in uri: - printer_driver = BRLASER_DRIVER - printer_ppd = BRLASER_PPD + printer_driver = self.BRLASER_DRIVER + printer_ppd = self.BRLASER_PPD elif "LaserJet" in uri: - printer_driver = LASERJET_DRIVER - printer_ppd = LASERJET_PPD + printer_driver = self.LASERJET_DRIVER + printer_ppd = self.LASERJET_PPD # Compile and install drivers that are not already installed if not os.path.exists(printer_ppd): @@ -177,7 +188,7 @@ def _install_printer_ppd(self, uri): def _setup_printer(self, printer_uri, printer_ppd): # Add the printer using lpadmin logger.info("Setting up printer {}".format(self.printer_name)) - safe_check_call( + self.safe_check_call( command=[ "sudo", "lpadmin", @@ -235,7 +246,7 @@ def _print_file(self, file_to_print): folder = os.path.dirname(file_to_print) converted_filename = file_to_print + ".pdf" converted_path = os.path.join(folder, converted_filename) - safe_check_call( + self.safe_check_call( command=["unoconv", "-o", converted_path, file_to_print], error_status=Status.ERROR_PRINT, ) @@ -248,8 +259,9 @@ def _print_file(self, file_to_print): error_status=Status.ERROR_PRINT, ) - - def safe_check_call(command: str, error_status: Status, ignore_stderr_startswith=None): + def safe_check_call( + self, command: str, error_status: Status, ignore_stderr_startswith=None + ): """ Wrap subprocess.check_output to ensure we wrap CalledProcessError and return our own exception, and log the error messages. @@ -267,4 +279,3 @@ def safe_check_call(command: str, error_status: Status, ignore_stderr_startswith raise ExportException(sdstatus=error_status, sderror=err) except subprocess.CalledProcessError as ex: raise ExportException(sdstatus=error_status, sderror=ex.output) - diff --git a/securedrop_export/print/status.py b/securedrop_export/print/status.py index fef0dbd..5ec81c8 100644 --- a/securedrop_export/print/status.py +++ b/securedrop_export/print/status.py @@ -1,5 +1,6 @@ from securedrop_export.status import BaseStatus + class Status(BaseStatus): # Printer preflight related errors @@ -8,11 +9,14 @@ class Status(BaseStatus): ERROR_PRINTER_NOT_SUPPORTED = "ERROR_PRINTER_NOT_SUPPORTED" ERROR_PRINTER_DRIVER_UNAVAILABLE = "ERROR_PRINTER_DRIVER_UNAVAILABLE" ERROR_PRINTER_INSTALL = "ERROR_PRINTER_INSTALL" + ERROR_PRINTER_URI = "ERROR_PRINTER_URI" # new - # Printer export errors + # Print error ERROR_PRINT = "ERROR_PRINT" # New PREFLIGHT_SUCCESS = "PRINTER_PREFLIGHT_SUCCESS" TEST_SUCCESS = "PRINTER_TEST_SUCCESS" PRINT_SUCCESS = "PRINTER_SUCCESS" + + ERROR_UNKNOWN = "ERROR_GENERIC" # Unknown printer error, backwards-compatible diff --git a/securedrop_export/status.py b/securedrop_export/status.py index 29b3045..bc3d29d 100644 --- a/securedrop_export/status.py +++ b/securedrop_export/status.py @@ -1,5 +1,6 @@ from enum import Enum + class BaseStatus(Enum): """ Base class for export and print statuses. A Status represents a string that can be returned @@ -7,4 +8,5 @@ class BaseStatus(Enum): Status values are defined in subclasses in their respective packages. A full list is available in the project's README. """ + pass diff --git a/setup.py b/setup.py index 485a880..b04979d 100644 --- a/setup.py +++ b/setup.py @@ -31,5 +31,7 @@ "Intended Audience :: Developers", "Operating System :: OS Independent", ), - entry_points={"console_scripts": ["send-to-usb = securedrop_export.main:entrypoint"]}, + entry_points={ + "console_scripts": ["send-to-usb = securedrop_export.main:entrypoint"] + }, ) diff --git a/tests/disk/test_cli.py b/tests/disk/test_cli.py index 55d79a4..d174dc4 100644 --- a/tests/disk/test_cli.py +++ b/tests/disk/test_cli.py @@ -1,10 +1,7 @@ import pytest from unittest import mock -import os -import pytest import subprocess -import sys from securedrop_export.disk.cli import CLI from securedrop_export.disk.volume import EncryptionScheme, Volume @@ -32,6 +29,7 @@ class TestCli: Test the CLI wrapper that handless identification and locking/unlocking of USB volumes. """ + @classmethod def setup_class(cls): cls.cli = CLI() @@ -49,7 +47,8 @@ def _setup_usb_devices(self, mocker, disks, is_removable): Parameters: disks (byte array): Array of disk names separated by newline. - is_removable (byte array): Array of removable status results (1 for removable) separated by newline + is_removable (byte array): Array of removable status results (1 for removable), + separated by newline """ # Patch commandline calls to `lsblk | grep disk` @@ -58,7 +57,7 @@ def _setup_usb_devices(self, mocker, disks, is_removable): command_output.stdout.readlines = mock.MagicMock(return_value=disks) mocker.patch("subprocess.Popen", return_value=command_output) - # Pactch commandline call to 'cat /sys/class/block/{device}/removable' + # Patch commandline call to 'cat /sys/class/block/{device}/removable' # Using side_effect with an iterable allows for different return value each time, # which matches what would happen if iterating through list of devices @@ -73,7 +72,10 @@ def test_get_connected_devices(self, mocker): assert result[0] == "/dev/sda" and result[1] == "/dev/sdb" - @mock.patch("subprocess.check_output", side_effect=subprocess.CalledProcessError(1, "check_output")) + @mock.patch( + "subprocess.check_output", + side_effect=subprocess.CalledProcessError(1, "check_output"), + ) def test_get_removable_devices_none_removable(self, mocker): disks = [b"sda disk\n", b"sdb disk\n"] removable = [b"0\n", b"0\n"] @@ -83,7 +85,9 @@ def test_get_removable_devices_none_removable(self, mocker): result = self.cli._get_removable_devices(disks) assert len(result) == 0 - @mock.patch("subprocess.Popen", side_effect=subprocess.CalledProcessError(1, "Popen")) + @mock.patch( + "subprocess.Popen", side_effect=subprocess.CalledProcessError(1, "Popen") + ) def test_get_connected_devices_error(self, mocked_subprocess): with pytest.raises(ExportException): @@ -91,15 +95,20 @@ def test_get_connected_devices_error(self, mocked_subprocess): @mock.patch("subprocess.check_output", return_value=_SAMPLE_OUTPUT_NO_PART) def test_get_partitioned_device_no_partition(self, mocked_call): - assert self.cli.get_partitioned_device(_DEFAULT_USB_DEVICE) == _DEFAULT_USB_DEVICE + assert ( + self.cli.get_partitioned_device(_DEFAULT_USB_DEVICE) == _DEFAULT_USB_DEVICE + ) @mock.patch("subprocess.check_output", return_value=_SAMPLE_OUTPUT_ONE_PART) def test_get_partitioned_device_one_partition(self, mocked_call): - assert self.cli.get_partitioned_device(_DEFAULT_USB_DEVICE) == _DEFAULT_USB_DEVICE+"1" + assert ( + self.cli.get_partitioned_device(_DEFAULT_USB_DEVICE) + == _DEFAULT_USB_DEVICE + "1" + ) @mock.patch("subprocess.check_output", return_value=_SAMPLE_OUTPUT_MULTI_PART) def test_get_partitioned_device_multi_partition(self, mocked_call): - + with pytest.raises(ExportException) as ex: self.cli.get_partitioned_device(_SAMPLE_OUTPUT_MULTI_PART) @@ -113,10 +122,11 @@ def test_get_partitioned_device_lsblk_error(self, mocked_subprocess): assert ex.value.sdstatus is Status.DEVICE_ERROR @mock.patch( - "subprocess.check_output", side_effect=subprocess.CalledProcessError(1, "check_output") + "subprocess.check_output", + side_effect=subprocess.CalledProcessError(1, "check_output"), ) def test_get_partitioned_device_multi_partition_error(self, mocked_call): - + # Make sure we wrap CalledProcessError and throw our own exception with pytest.raises(ExportException) as ex: self.cli.get_partitioned_device(_DEFAULT_USB_DEVICE) @@ -125,11 +135,14 @@ def test_get_partitioned_device_multi_partition_error(self, mocked_call): @mock.patch("subprocess.check_call", return_value=0) def test_is_luks_volume_true(self, mocked_call): - + # `sudo cryptsetup isLuks` returns 0 if true assert self.cli.is_luks_volume(_SAMPLE_OUTPUT_ONE_PART) - @mock.patch("subprocess.check_call", side_effect=subprocess.CalledProcessError(1, "check_call")) + @mock.patch( + "subprocess.check_call", + side_effect=subprocess.CalledProcessError(1, "check_call"), + ) def test_is_luks_volume_false(self, mocked_subprocess): # `sudo cryptsetup isLuks` returns 1 if false; CalledProcessError is thrown @@ -139,10 +152,14 @@ def test_is_luks_volume_false(self, mocked_subprocess): def test__get_luks_name_from_headers(self, mocked_subprocess): result = self.cli._get_luks_name_from_headers(_DEFAULT_USB_DEVICE) - assert result is not None and result.split("-")[1] in _SAMPLE_LUKS_HEADER.decode("utf8") + assert result is not None and result.split("-")[ + 1 + ] in _SAMPLE_LUKS_HEADER.decode("utf8") - @mock.patch("subprocess.check_output", return_value=b"corrupted-or-invalid-header\n") - def test__get_luks_name_from_headers_error(self, mocked_subprocess): + @mock.patch( + "subprocess.check_output", return_value=b"corrupted-or-invalid-header\n" + ) + def test__get_luks_name_from_headers_error_invalid(self, mocked_subprocess): with pytest.raises(ExportException) as ex: self.cli._get_luks_name_from_headers(_DEFAULT_USB_DEVICE) @@ -158,7 +175,9 @@ def test__get_luks_name_from_headers_error_no_header(self, mocked_subprocess): assert ex.value.sdstatus is Status.INVALID_DEVICE_DETECTED @mock.patch("subprocess.check_output", return_value=None) - def test__get_luks_name_from_headers_error_nothing_returned(self, mocked_subprocess): + def test__get_luks_name_from_headers_error_nothing_returned( + self, mocked_subprocess + ): with pytest.raises(ExportException) as ex: self.cli._get_luks_name_from_headers(_DEFAULT_USB_DEVICE) @@ -166,7 +185,8 @@ def test__get_luks_name_from_headers_error_nothing_returned(self, mocked_subproc assert ex.value.sdstatus is Status.INVALID_DEVICE_DETECTED @mock.patch( - "subprocess.check_output", side_effect=subprocess.CalledProcessError(1, "check_output") + "subprocess.check_output", + side_effect=subprocess.CalledProcessError(1, "check_output"), ) def test__get_luks_name_from_headers_error(self, mocked_subprocess): with pytest.raises(ExportException): @@ -188,7 +208,10 @@ def test_get_luks_volume_still_locked(self, mocked_subprocess, mocked_os_call): assert result.encryption is EncryptionScheme.LUKS assert not result.unlocked - @mock.patch("subprocess.check_output", side_effect=subprocess.CalledProcessError("check_output", 1)) + @mock.patch( + "subprocess.check_output", + side_effect=subprocess.CalledProcessError("check_output", 1), + ) def test_get_luks_volume_error(self, mocked_subprocess): with pytest.raises(ExportException) as ex: self.cli.get_luks_volume(_DEFAULT_USB_DEVICE_ONE_PART) @@ -202,13 +225,19 @@ def test_unlock_luks_volume_success(self, mock_path, mocker): mock_popen.returncode = 0 mocker.patch("subprocess.Popen", return_value=mock_popen) - mocker.patch("subprocess.Popen.communicate", return_value=mock_popen_communicate) + mocker.patch( + "subprocess.Popen.communicate", return_value=mock_popen_communicate + ) mapped_name = "luks-id-123456" - vol = Volume(device_name=_DEFAULT_USB_DEVICE, mapped_name=mapped_name, encryption=EncryptionScheme.LUKS) + vol = Volume( + device_name=_DEFAULT_USB_DEVICE, + mapped_name=mapped_name, + encryption=EncryptionScheme.LUKS, + ) key = "a_key&_!" result = self.cli.unlock_luks_volume(vol, key) - assert vol.unlocked + assert result.unlocked @mock.patch("os.path.exists", return_value=True) def test_unlock_luks_volume_not_luks(self, mocker): @@ -218,9 +247,12 @@ def test_unlock_luks_volume_not_luks(self, mocker): mocker.patch("subprocess.Popen", mock_popen) - vol = Volume(device_name=_DEFAULT_USB_DEVICE, mapped_name=_PRETEND_LUKS_ID, encryption=EncryptionScheme.UNKNOWN) + vol = Volume( + device_name=_DEFAULT_USB_DEVICE, + mapped_name=_PRETEND_LUKS_ID, + encryption=EncryptionScheme.UNKNOWN, + ) key = "a key!" - mapped_name = "luks-id-123456" with pytest.raises(ExportException) as ex: self.cli.unlock_luks_volume(vol, key) @@ -234,18 +266,26 @@ def test_unlock_luks_volume_passphrase_failure(self, mocker): mocker.patch("subprocess.Popen", mock_popen) - vol = Volume(device_name=_DEFAULT_USB_DEVICE, mapped_name=_PRETEND_LUKS_ID, encryption=EncryptionScheme.LUKS) + vol = Volume( + device_name=_DEFAULT_USB_DEVICE, + mapped_name=_PRETEND_LUKS_ID, + encryption=EncryptionScheme.LUKS, + ) key = "a key!" - mapped_name = "luks-id-123456" with pytest.raises(ExportException): self.cli.unlock_luks_volume(vol, key) - @mock.patch("subprocess.Popen", side_effect=subprocess.CalledProcessError("1", "Popen")) + @mock.patch( + "subprocess.Popen", side_effect=subprocess.CalledProcessError("1", "Popen") + ) def test_unlock_luks_volume_luksOpen_exception(self, mocked_subprocess): - pd = Volume(device_name=_DEFAULT_USB_DEVICE, mapped_name=_PRETEND_LUKS_ID, encryption=EncryptionScheme.LUKS) + pd = Volume( + device_name=_DEFAULT_USB_DEVICE, + mapped_name=_PRETEND_LUKS_ID, + encryption=EncryptionScheme.LUKS, + ) key = "a key!" - mapped_name = "luks-id-123456" with pytest.raises(ExportException) as ex: self.cli.unlock_luks_volume(pd, key) @@ -261,13 +301,17 @@ def test_mount_volume(self, mocked_call, mocked_output, mocked_path): mapped_name=_PRETEND_LUKS_ID, encryption=EncryptionScheme.LUKS, ) - result = self.cli.mount_volume(vol) + self.cli.mount_volume(vol) assert vol.mountpoint is self.cli._DEFAULT_MOUNTPOINT @mock.patch("os.path.exists", return_value=True) - @mock.patch("subprocess.check_output", return_value=b"/dev/pretend/luks-id-123456\n") + @mock.patch( + "subprocess.check_output", return_value=b"/dev/pretend/luks-id-123456\n" + ) @mock.patch("subprocess.check_call", return_value=0) - def test_mount_volume_already_mounted(self, mocked_output, mocked_call, mocked_path): + def test_mount_volume_already_mounted( + self, mocked_output, mocked_call, mocked_path + ): md = Volume( device_name=_DEFAULT_USB_DEVICE_ONE_PART, mapped_name=_PRETEND_LUKS_ID, @@ -288,7 +332,10 @@ def test_mount_volume_mkdir(self, mocked_output, mocked_subprocess, mocked_path) assert self.cli.mount_volume(md).mapped_name == _PRETEND_LUKS_ID @mock.patch("subprocess.check_output", return_value=b"\n") - @mock.patch("subprocess.check_call", side_effect=subprocess.CalledProcessError(1, "check_call")) + @mock.patch( + "subprocess.check_call", + side_effect=subprocess.CalledProcessError(1, "check_call"), + ) def test_mount_volume_error(self, mocked_subprocess, mocked_output): md = Volume( device_name=_DEFAULT_USB_DEVICE_ONE_PART, @@ -302,7 +349,10 @@ def test_mount_volume_error(self, mocked_subprocess, mocked_output): assert ex.value.sdstatus is Status.ERROR_MOUNT @mock.patch("os.path.exists", return_value=False) - @mock.patch("subprocess.check_call", side_effect=subprocess.CalledProcessError(1, "check_call")) + @mock.patch( + "subprocess.check_call", + side_effect=subprocess.CalledProcessError(1, "check_call"), + ) def test_mount_at_mountpoint_mkdir_error(self, mocked_subprocess, mocked_path): md = Volume( device_name=_DEFAULT_USB_DEVICE_ONE_PART, @@ -317,7 +367,10 @@ def test_mount_at_mountpoint_mkdir_error(self, mocked_subprocess, mocked_path): assert ex.value.sdstatus is Status.ERROR_MOUNT @mock.patch("os.path.exists", return_value=True) - @mock.patch("subprocess.check_call", side_effect=subprocess.CalledProcessError(1, "check_call")) + @mock.patch( + "subprocess.check_call", + side_effect=subprocess.CalledProcessError(1, "check_call"), + ) def test_mount_at_mountpoint_mounting_error(self, mocked_subprocess, mocked_path): md = Volume( device_name=_DEFAULT_USB_DEVICE_ONE_PART, @@ -345,7 +398,10 @@ def test__unmount_volume(self, mocked_subprocess, mocked_mountpath): assert result.mountpoint is None @mock.patch("os.path.exists", return_value=True) - @mock.patch("subprocess.check_call", side_effect=subprocess.CalledProcessError(1, "check_call")) + @mock.patch( + "subprocess.check_call", + side_effect=subprocess.CalledProcessError(1, "check_call"), + ) def test__unmount_volume_error(self, mocked_subprocess, mocked_mountpath): mounted = Volume( device_name=_DEFAULT_USB_DEVICE_ONE_PART, @@ -372,7 +428,10 @@ def test__close_luks_volume(self, mocked_subprocess, mocked_os_call): self.cli._close_luks_volume(mapped) @mock.patch("os.path.exists", return_value=True) - @mock.patch("subprocess.check_call", side_effect=subprocess.CalledProcessError(1, "check_call")) + @mock.patch( + "subprocess.check_call", + side_effect=subprocess.CalledProcessError(1, "check_call"), + ) def test__close_luks_volume_error(self, mocked_subprocess, mocked_os_call): mapped = Volume( device_name=_DEFAULT_USB_DEVICE_ONE_PART, @@ -385,7 +444,10 @@ def test__close_luks_volume_error(self, mocked_subprocess, mocked_os_call): assert ex.value.sdstatus is Status.DEVICE_ERROR - @mock.patch("subprocess.check_call", side_effect=subprocess.CalledProcessError(1, "check_call")) + @mock.patch( + "subprocess.check_call", + side_effect=subprocess.CalledProcessError(1, "check_call"), + ) def test__remove_temp_directory_error(self, mocked_subprocess): with pytest.raises(ExportException): self.cli._remove_temp_directory("tmp") @@ -412,7 +474,10 @@ def test_write_to_disk(self, mock_check_call): # Don't want to patch it indefinitely though, that will mess with the other tests patch.stop() - @mock.patch("subprocess.check_call", side_effect=subprocess.CalledProcessError(1, "check_call")) + @mock.patch( + "subprocess.check_call", + side_effect=subprocess.CalledProcessError(1, "check_call"), + ) def test_write_to_disk_error_still_does_cleanup(self, mock_call): # see above - patch internal method only for this test patch = mock.patch.object(self.cli, "cleanup_drive_and_tmpdir") @@ -428,16 +493,21 @@ def test_write_to_disk_error_still_does_cleanup(self, mock_call): submission = Archive("testfile") with pytest.raises(ExportException): - self.cli.write_data_to_device(submission.tmpdir, submission.target_dirname, vol) + self.cli.write_data_to_device( + submission.tmpdir, submission.target_dirname, vol + ) self.cli.cleanup_drive_and_tmpdir.assert_called_once() patch.stop() - @mock.patch("subprocess.check_call", side_effect=subprocess.CalledProcessError(1, "check_call")) + @mock.patch( + "subprocess.check_call", + side_effect=subprocess.CalledProcessError(1, "check_call"), + ) def test_cleanup_drive_and_tmpdir_error(self, mocked_subprocess): submission = Archive("testfile") mock_volume = mock.MagicMock(Volume) - + with pytest.raises(ExportException) as ex: self.cli.cleanup_drive_and_tmpdir(mock_volume, submission.tmpdir) assert ex.value.sdstatus is Status.ERROR_EXPORT_CLEANUP @@ -469,10 +539,19 @@ def test_cleanup_drive_and_tmpdir(self, mock_subprocess, mocked_path): close_patch.stop() remove_tmpdir_patch.stop() - @mock.patch("subprocess.check_output", side_effect=subprocess.CalledProcessError(1, "check_output")) + @mock.patch( + "subprocess.check_output", + side_effect=subprocess.CalledProcessError(1, "check_output"), + ) def test_mountpoint_error(self, mock_subprocess): with pytest.raises(ExportException) as ex: - self.cli._get_mountpoint(Volume(device_name=_DEFAULT_USB_DEVICE, mapped_name=_PRETEND_LUKS_ID, encryption=EncryptionScheme.LUKS)) + self.cli._get_mountpoint( + Volume( + device_name=_DEFAULT_USB_DEVICE, + mapped_name=_PRETEND_LUKS_ID, + encryption=EncryptionScheme.LUKS, + ) + ) assert ex.value.sdstatus is Status.ERROR_MOUNT @@ -480,15 +559,15 @@ def test_mountpoint_error(self, mock_subprocess): def test_mount_mkdir_fails(self, mocked_path): mock_mountpoint = mock.patch.object(self.cli, "_get_mountpoint") mock_mountpoint.return_value = None - # mock.patch("subprocess.check_output", side_effect=subprocess.CalledProcessError(1, "check_output")) - mock_volume = mock.MagicMock() - mock_volume.device_name = _DEFAULT_USB_DEVICE_ONE_PART - mock_volume.mapped_name = _PRETEND_LUKS_ID - mock_volume.EncryptionScheme = EncryptionScheme.LUKS - mock_volume.unlocked = True + vol = Volume( + device_name=_DEFAULT_USB_DEVICE_ONE_PART, + mapped_name=_PRETEND_LUKS_ID, + encryption=EncryptionScheme.LUKS, + ) + mock.patch.object(vol, "unlocked", return_value=True) with pytest.raises(ExportException) as ex: - self.cli.mount_volume(mock_volume) + self.cli.mount_volume(vol) - assert ex.value.sdstatus is Status.ERROR_MOUNT \ No newline at end of file + assert ex.value.sdstatus is Status.ERROR_MOUNT diff --git a/tests/disk/test_service.py b/tests/disk/test_service.py index 6cda027..800a4fd 100644 --- a/tests/disk/test_service.py +++ b/tests/disk/test_service.py @@ -1,19 +1,12 @@ import pytest from unittest import mock - import os -import pytest -import sys import tempfile -import subprocess -from subprocess import CalledProcessError - from securedrop_export.exceptions import ExportException from securedrop_export.disk.status import Status from securedrop_export.disk.new_status import Status as NewStatus from securedrop_export.disk.volume import Volume, EncryptionScheme - from securedrop_export.archive import Archive, Metadata from securedrop_export.disk.service import Service from securedrop_export.disk.cli import CLI @@ -22,15 +15,24 @@ SAMPLE_OUTPUT_USB = "/dev/sda" # noqa SAMPLE_OUTPUT_USB_PARTITIONED = "/dev/sda1" -class TestExportService: +class TestExportService: @classmethod def setup_class(cls): cls.mock_cli = mock.MagicMock(CLI) cls.mock_submission = cls._setup_submission() - cls.mock_luks_volume_unmounted = Volume(device_name=SAMPLE_OUTPUT_USB, mapped_name="fake-luks-id-123456", encryption=EncryptionScheme.LUKS) - cls.mock_luks_volume_mounted = Volume(device_name=SAMPLE_OUTPUT_USB, mapped_name="fake-luks-id-123456", mountpoint="/media/usb", encryption=EncryptionScheme.LUKS) + cls.mock_luks_volume_unmounted = Volume( + device_name=SAMPLE_OUTPUT_USB, + mapped_name="fake-luks-id-123456", + encryption=EncryptionScheme.LUKS, + ) + cls.mock_luks_volume_mounted = Volume( + device_name=SAMPLE_OUTPUT_USB, + mapped_name="fake-luks-id-123456", + mountpoint="/media/usb", + encryption=EncryptionScheme.LUKS, + ) cls.service = Service(cls.mock_submission, cls.mock_cli) @@ -49,11 +51,12 @@ def _setup_submission(cls) -> Archive: temp_folder = tempfile.mkdtemp() metadata = os.path.join(temp_folder, Metadata.METADATA_FILE) with open(metadata, "w") as f: - f.write('{"device": "disk", "encryption_method": "luks", "encryption_key": "hunter1"}') - - submission.archive_metadata = Metadata.create_and_validate(temp_folder) + f.write( + '{"device": "disk", "encryption_method":' + ' "luks", "encryption_key": "hunter1"}' + ) - return submission + return submission.set_metadata(Metadata(temp_folder).validate()) def setup_method(self, method): """ @@ -63,7 +66,9 @@ def setup_method(self, method): test methods. """ self.mock_cli.get_connected_devices.return_value = [SAMPLE_OUTPUT_USB] - self.mock_cli.get_partitioned_device.return_value = SAMPLE_OUTPUT_USB_PARTITIONED + self.mock_cli.get_partitioned_device.return_value = ( + SAMPLE_OUTPUT_USB_PARTITIONED + ) self.mock_cli.get_luks_volume.return_value = self.mock_luks_volume_unmounted self.mock_cli.mount_volume.return_value = self.mock_luks_volume_mounted @@ -83,7 +88,10 @@ def test_no_devices_connected(self): assert ex.value.sdstatus is Status.LEGACY_USB_NOT_CONNECTED def test_too_many_devices_connected(self): - self.mock_cli.get_connected_devices.return_value = [SAMPLE_OUTPUT_USB, "/dev/sdb"] + self.mock_cli.get_connected_devices.return_value = [ + SAMPLE_OUTPUT_USB, + "/dev/sdb", + ] with pytest.raises(ExportException) as ex: self.service.check_connected_devices() @@ -100,7 +108,9 @@ def test_device_is_not_luks(self): assert ex.value.sdstatus is Status.LEGACY_USB_ENCRYPTION_NOT_SUPPORTED def test_check_usb_error(self): - self.mock_cli.get_connected_devices.side_effect = ExportException(sdstatus=Status.LEGACY_ERROR_USB_CHECK) + self.mock_cli.get_connected_devices.side_effect = ExportException( + sdstatus=Status.LEGACY_ERROR_USB_CHECK + ) with pytest.raises(ExportException) as ex: self.service.check_connected_devices() @@ -113,17 +123,21 @@ def test_check_disk_format(self): assert status is Status.LEGACY_USB_ENCRYPTED def test_check_disk_format_error(self): - self.mock_cli.get_partitioned_device.side_effect=ExportException(sdstatus=NewStatus.INVALID_DEVICE_DETECTED) + self.mock_cli.get_partitioned_device.side_effect = ExportException( + sdstatus=NewStatus.INVALID_DEVICE_DETECTED + ) with pytest.raises(ExportException) as ex: self.service.check_disk_format() - # We still return the legacy status for now + # We still return the legacy status for now assert ex.value.sdstatus is Status.LEGACY_USB_ENCRYPTION_NOT_SUPPORTED def test_export(self): - status = self.service.export() - assert status is Status.SUCCESS_EXPORT + # Currently, a successful export does not return a success status. + # When the client is updated, this will change to assert EXPORT_SUCCESS + # is returned. + self.service.export() def test_export_disk_not_supported(self): self.mock_cli.is_luks_volume.return_value = False @@ -134,8 +148,10 @@ def test_export_disk_not_supported(self): assert ex.value.sdstatus is Status.LEGACY_USB_ENCRYPTION_NOT_SUPPORTED def test_export_write_error(self): - self.mock_cli.is_luks_volume.return_value=True - self.mock_cli.write_data_to_device.side_effect = ExportException(sdstatus=Status.LEGACY_ERROR_USB_WRITE) + self.mock_cli.is_luks_volume.return_value = True + self.mock_cli.write_data_to_device.side_effect = ExportException( + sdstatus=Status.LEGACY_ERROR_USB_WRITE + ) with pytest.raises(ExportException) as ex: self.service.export() @@ -143,7 +159,9 @@ def test_export_write_error(self): assert ex.value.sdstatus is Status.LEGACY_ERROR_USB_WRITE def test_export_throws_new_exception_return_legacy_status(self): - self.mock_cli.get_connected_devices.side_effect = ExportException(sdstatus=NewStatus.ERROR_MOUNT) + self.mock_cli.get_connected_devices.side_effect = ExportException( + sdstatus=NewStatus.ERROR_MOUNT + ) with pytest.raises(ExportException) as ex: self.service.export() @@ -152,8 +170,10 @@ def test_export_throws_new_exception_return_legacy_status(self): @mock.patch("os.path.exists", return_value=True) def test_write_error_returns_legacy_status(self, mock_path): - self.mock_cli.is_luks_volume.return_value=True - self.mock_cli.write_data_to_device.side_effect = ExportException(sdstatus=NewStatus.ERROR_EXPORT) + self.mock_cli.is_luks_volume.return_value = True + self.mock_cli.write_data_to_device.side_effect = ExportException( + sdstatus=NewStatus.ERROR_EXPORT + ) with pytest.raises(ExportException) as ex: self.service.export() @@ -162,7 +182,9 @@ def test_write_error_returns_legacy_status(self, mock_path): @mock.patch("os.path.exists", return_value=True) def test_unlock_error_returns_legacy_status(self, mock_path): - self.mock_cli.unlock_luks_volume.side_effect = ExportException(sdstatus=NewStatus.ERROR_UNLOCK_LUKS) + self.mock_cli.unlock_luks_volume.side_effect = ExportException( + sdstatus=NewStatus.ERROR_UNLOCK_LUKS + ) with pytest.raises(ExportException) as ex: self.service.export() @@ -171,9 +193,11 @@ def test_unlock_error_returns_legacy_status(self, mock_path): @mock.patch("os.path.exists", return_value=True) def test_unexpected_error_returns_legacy_status_generic(self, mock_path): - self.mock_cli.unlock_luks_volume.side_effect = ExportException(sdstatus=NewStatus.DEVICE_ERROR) + self.mock_cli.unlock_luks_volume.side_effect = ExportException( + sdstatus=NewStatus.DEVICE_ERROR + ) with pytest.raises(ExportException) as ex: self.service.export() - assert ex.value.sdstatus is Status.LEGACY_ERROR_GENERIC \ No newline at end of file + assert ex.value.sdstatus is Status.LEGACY_ERROR_GENERIC diff --git a/tests/disk/test_volume.py b/tests/disk/test_volume.py index 8651bdb..f28e711 100644 --- a/tests/disk/test_volume.py +++ b/tests/disk/test_volume.py @@ -1,4 +1,3 @@ -import pytest from unittest import mock from securedrop_export.disk.volume import Volume, EncryptionScheme @@ -6,7 +5,11 @@ class TestVolume: def test_overwrite_valid_encryption_scheme(self): - volume = Volume(device_name="/dev/sda", mapped_name="pretend-luks-mapper-id", encryption=EncryptionScheme.LUKS) + volume = Volume( + device_name="/dev/sda", + mapped_name="pretend-luks-mapper-id", + encryption=EncryptionScheme.LUKS, + ) assert volume.encryption is EncryptionScheme.LUKS volume.encryption = None assert volume.encryption is EncryptionScheme.UNKNOWN @@ -14,7 +17,9 @@ def test_overwrite_valid_encryption_scheme(self): @mock.patch("os.path.exists", return_value=True) def test_is_unlocked_true(self, mock_os_path): volume = Volume( - device_name="/dev/sda1", mapped_name="pretend-luks-mapper-id", encryption=EncryptionScheme.LUKS + device_name="/dev/sda1", + mapped_name="pretend-luks-mapper-id", + encryption=EncryptionScheme.LUKS, ) assert volume.unlocked @@ -22,7 +27,9 @@ def test_is_unlocked_true(self, mock_os_path): @mock.patch("os.path.exists", return_value=False) def test_is_unlocked_false_no_path(self, mock_os_path): volume = Volume( - device_name="/dev/sda1", mapped_name="pretend-luks-mapper-id", encryption=EncryptionScheme.LUKS + device_name="/dev/sda1", + mapped_name="pretend-luks-mapper-id", + encryption=EncryptionScheme.LUKS, ) assert not volume.unlocked @@ -30,13 +37,15 @@ def test_is_unlocked_false_no_path(self, mock_os_path): @mock.patch("os.path.exists", return_value=True) def test_writable_false(self, mock_os_path): vol = Volume( - device_name="dev/sda1", mapped_name="pretend-luks-id", encryption=EncryptionScheme.LUKS + device_name="dev/sda1", + mapped_name="pretend-luks-id", + encryption=EncryptionScheme.LUKS, ) assert not vol.writable @mock.patch("os.path.exists", return_value=True) - def test_writable_false(self, mock_os_path): + def test_writable(self, mock_os_path): vol = Volume( device_name="dev/sda1", mapped_name="pretend-luks-id", diff --git a/tests/print/test_service.py b/tests/print/test_service.py index 317cc99..dfff606 100644 --- a/tests/print/test_service.py +++ b/tests/print/test_service.py @@ -1,9 +1,11 @@ -from unittest import mock +import pytest +from unittest import mock import os -import pytest +import subprocess from subprocess import CalledProcessError -import sys + +from securedrop_export.directory_util import safe_mkdir from securedrop_export.exceptions import ExportException from securedrop_export.archive import Archive @@ -13,15 +15,24 @@ SAMPLE_OUTPUT_NO_PRINTER = b"network beh\nnetwork https\nnetwork ipp\nnetwork ipps\nnetwork http\nnetwork\nnetwork ipp14\nnetwork lpd" # noqa SAMPLE_OUTPUT_BROTHER_PRINTER = b"network beh\nnetwork https\nnetwork ipp\nnetwork ipps\nnetwork http\nnetwork\nnetwork ipp14\ndirect usb://Brother/HL-L2320D%20series?serial=A00000A000000\nnetwork lpd" # noqa SAMPLE_OUTPUT_LASERJET_PRINTER = b"network beh\nnetwork https\nnetwork ipp\nnetwork ipps\nnetwork http\nnetwork\nnetwork ipp14\ndirect usb://HP/LaserJet%20Pro%20M404-M405?serial=A00000A000000\nnetwork lpd" # noqa -TEST_CONFIG = os.path.join(os.path.dirname(__file__), "sd-export-config.json") - +SAMPLE_OUTPUT_UNSUPPORTED_PRINTER = b"network beh\nnetwork https\nnetwork ipp\nnetwork ipps\nnetwork http\nnetwork\nnetwork ipp14\ndirect usb://Canon/QL-700%?serial=A00000A000000\nnetwork lpd" # noqa -class PrinterTest: +class TestPrint: @classmethod def setup_class(cls): - cls.submission = Archive("testfile", TEST_CONFIG) - cls.service = Service(submission) + cls.submission = Archive("testfile") + cls.service = Service(cls.submission) + + # Set up files as if extracted from tarball + fp = os.path.join(cls.submission.tmpdir, "export_data") + if not os.path.exists(fp): + safe_mkdir(fp) + + for i in ["file1", "file2", "file3"]: + with open(f"{cls.submission.tmpdir}/export_data/{i}.txt", "a+") as file: + file.write(f"It's a pretend file {i}") + file.write("\n") @classmethod def teardown_class(cls): @@ -29,16 +40,72 @@ def teardown_class(cls): cls.submission = None @mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_BROTHER_PRINTER) - def test_get_good_printer_uri_laserjet(mocked_call): - assert self.service._get_printer_uri() == "usb://Brother/HL-L2320D%20series?serial=A00000A000000" + def test_get_good_printer_uri_laserjet(self, mocked_call): + assert ( + self.service._get_printer_uri() + == "usb://Brother/HL-L2320D%20series?serial=A00000A000000" + ) + + def test_service_initialized_correctly(self): + assert self.service.printer_wait_timeout == 60 + assert self.service.printer_name == "sdw-printer" + + def test_print_all_methods_called(self): + patch_setup = mock.patch.object(self.service, "_check_printer_setup") + patch_print = mock.patch.object(self.service, "_print_all_files") + + mock_setup = patch_setup.start() + mock_print = patch_print.start() + + self.service.print() + + # When the client can accept new status values, we will assert that the + # above call results in Status.PRINT_SUCCESS + assert mock_setup.called_once() + assert mock_print.called_once() + + patch_setup.stop() + patch_print.stop() + + def test_printer_test_all_methods_called(self): + patch_setup = mock.patch.object(self.service, "_check_printer_setup") + + mock_setup = patch_setup.start() + + self.service.printer_preflight() + + # When the client can accept new status values, we will assert that the + # above call results in Status.PREFLIGHT_SUCCESS + assert mock_setup.called_once() + + patch_setup.stop() + + def test_print_all_checks_called(self): + patch_setup = mock.patch.object(self.service, "_check_printer_setup") + patch_print = mock.patch.object(self.service, "_print_test_page") + + mock_setup = patch_setup.start() + mock_print = patch_print.start() + self.service.printer_test() + # When the client can accept new status values, we will assert that the + # above call results in Status.TEST_SUCCESS + + assert mock_setup.called_once() + assert mock_print.called_once() + + patch_setup.stop() + patch_print.stop() @mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_LASERJET_PRINTER) - def test_get_good_printer_uri_brother(mocked_call): - assert self.service._get_printer_uri() == "usb://HP/LaserJet%20Pro%20M404-M405?serial=A00000A000000" + def test_get_good_printer_uri_brother(self, mocked_call): + assert ( + self.service._get_printer_uri() + == "usb://HP/LaserJet%20Pro%20M404-M405?serial=A00000A000000" + ) @mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_NO_PRINTER) - def test_get_bad_printer_uri(mocked_call, capsys, mocker): + def test_get_bad_printer_uri(self, mocked_call, capsys, mocker): with pytest.raises(ExportException) as ex: self.service._get_printer_uri() @@ -53,7 +120,7 @@ def test_get_bad_printer_uri(mocked_call, capsys, mocker): "/tmp/tmpJf83j9/secret.pptx" ], ) - def test_is_open_office_file(capsys, open_office_paths): + def test_is_open_office_file(self, capsys, open_office_paths): assert self.service._is_open_office_file(open_office_paths) @pytest.mark.parametrize( @@ -65,37 +132,44 @@ def test_is_open_office_file(capsys, open_office_paths): "/tmp/tmpJf83j9/secret.gpg" ], ) - def test_is_not_open_office_file(capsys, open_office_paths): + def test_is_not_open_office_file(self, capsys, open_office_paths): assert not self.service._is_open_office_file(open_office_paths) @mock.patch("subprocess.run") - def test_install_printer_ppd_laserjet(mocker): - ppd = self.service._install_printer_ppd("usb://HP/LaserJet%20Pro%20M404-M405?serial=A00000A00000") + def test_install_printer_ppd_laserjet(self, mocker): + ppd = self.service._install_printer_ppd( + "usb://HP/LaserJet%20Pro%20M404-M405?serial=A00000A00000" + ) assert ppd == "/usr/share/cups/model/hp-laserjet_6l.ppd" @mock.patch("subprocess.run") - def test_install_printer_ppd_brother(mocker): - ppd = self.service._install_printer_ppd("usb://Brother/HL-L2320D%20series?serial=A00000A000000") + def test_install_printer_ppd_brother(self, mocker): + ppd = self.service._install_printer_ppd( + "usb://Brother/HL-L2320D%20series?serial=A00000A000000" + ) assert ppd == "/usr/share/cups/model/br7030.ppd" - - def test_install_printer_ppd_error_no_driver(mocker): + def test_install_printer_ppd_error_no_driver(self, mocker): mocker.patch("subprocess.run", side_effect=CalledProcessError(1, "run")) with pytest.raises(ExportException) as ex: - self.service._install_printer_ppd("usb://HP/LaserJet%20Pro%20M404-M405?serial=A00000A000000") + self.service._install_printer_ppd( + "usb://HP/LaserJet%20Pro%20M404-M405?serial=A00000A000000" + ) assert ex.value.sdstatus is Status.ERROR_PRINTER_DRIVER_UNAVAILABLE - def test_install_printer_ppd_error_not_supported(mocker): + def test_install_printer_ppd_error_not_supported(self, mocker): mocker.patch("subprocess.run", side_effect=CalledProcessError(1, "run")) with pytest.raises(ExportException) as ex: - self.service._install_printer_ppd("usb://Not/Supported?serial=A00000A000000") + self.service._install_printer_ppd( + "usb://Not/Supported?serial=A00000A000000" + ) assert ex.value.sdstatus is Status.ERROR_PRINTER_NOT_SUPPORTED - def test_setup_printer_error(mocker): + def test_setup_printer_error(self, mocker): mocker.patch("subprocess.run", side_effect=CalledProcessError(1, "run")) with pytest.raises(ExportException) as ex: @@ -106,7 +180,6 @@ def test_setup_printer_error(mocker): assert ex.value.sdstatus is Status.ERROR_PRINTER_INSTALL - def test_safe_check_call(self): # This works, since `ls` is a valid comand self.service.safe_check_call(["ls"], Status.TEST_SUCCESS) @@ -115,12 +188,12 @@ def test_safe_check_call_invalid_call(self): with pytest.raises(ExportException) as ex: self.service.safe_check_call(["ls", "kjdsfhkdjfh"], Status.ERROR_PRINT) - assert ex.value.sdstatus is FakeStatus.ERROR_PRINT + assert ex.value.sdstatus is Status.ERROR_PRINT def test_safe_check_call_write_to_stderr_and_ignore_error(self): self.service.safe_check_call( ["python3", "-c", "import sys;sys.stderr.write('hello')"], - Status.TEST_SUCCESS, + error_status=Status.TEST_SUCCESS, ignore_stderr_startswith=b"hello", ) @@ -129,8 +202,192 @@ def test_safe_check_call_write_to_stderr_wrong_ignore_param(self): with pytest.raises(ExportException) as ex: self.service.safe_check_call( ["python3", "-c", "import sys;sys.stderr.write('hello\n')"], - Status.ERROR_PRINT, + error_status=Status.ERROR_PRINT, ignore_stderr_startswith=b"world", ) - assert ex.value.sdstatus is Status.ERROR_PRINT \ No newline at end of file + assert ex.value.sdstatus is Status.ERROR_PRINT + + @mock.patch("time.sleep", return_value=None) + @mock.patch( + "subprocess.check_output", + side_effect=[ + b"printer sdw-printer is busy\n", + b"printer sdw-printer is idle\n", + ], + ) + def test__wait_for_print(self, mock_subprocess, mock_time): + assert self.service._wait_for_print() + + @mock.patch("time.sleep", return_value=None) + @mock.patch( + "subprocess.check_output", + side_effect=subprocess.CalledProcessError(1, "check_output"), + ) + def test__wait_for_print_print_exception(self, mock_subprocess, mock_time): + with pytest.raises(ExportException) as ex: + self.service._wait_for_print() + + assert ex.value.sdstatus is Status.ERROR_PRINT + + @mock.patch( + "subprocess.check_output", return_value=b"printer sdw-printer is busy\n" + ) + def test__wait_for_print_timeout_exception(self, mock_subprocess): + self.service.printer_wait_timeout = 1 + + with pytest.raises(ExportException) as ex: + self.service._wait_for_print() + + assert ex.value.sdstatus is Status.ERROR_PRINT + + self.service.printer_wait_timeout = self.service.PRINTER_WAIT_TIMEOUT + + @pytest.mark.parametrize( + "printers", [SAMPLE_OUTPUT_BROTHER_PRINTER, SAMPLE_OUTPUT_LASERJET_PRINTER] + ) + def test__check_printer_setup(self, printers, mocker): + mocker.patch("subprocess.check_output", return_value=printers) + p = mocker.patch.object(self.service, "_setup_printer") + p2 = mocker.patch.object(self.service, "_install_printer_ppd") + p.start() + p2.start() + + self.service._check_printer_setup() + p.assert_called_once() + p2.assert_called_once() + + p.stop() + p2.stop() + + @mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_NO_PRINTER) + def test__check_printer_setup_error_no_printer(self, mock_output): + + with pytest.raises(ExportException) as ex: + self.service._check_printer_setup() + assert ex.value.sdstatus is Status.ERROR_PRINTER_NOT_FOUND + + @mock.patch( + "subprocess.check_output", + return_value=SAMPLE_OUTPUT_BROTHER_PRINTER + + b"\n" + + SAMPLE_OUTPUT_LASERJET_PRINTER, + ) + def test__check_printer_setup_error_too_many_printers(self, mock_output): + + with pytest.raises(ExportException) as ex: + self.service._check_printer_setup() + assert ex.value.sdstatus is Status.ERROR_MULTIPLE_PRINTERS_FOUND + + @mock.patch( + "subprocess.check_output", return_value=SAMPLE_OUTPUT_UNSUPPORTED_PRINTER + ) + def test__check_printer_setup_error_unsupported_printer(self, mock_output): + + with pytest.raises(ExportException) as ex: + self.service._check_printer_setup() + assert ex.value.sdstatus is Status.ERROR_PRINTER_NOT_SUPPORTED + + @mock.patch( + "subprocess.check_output", + side_effect=subprocess.CalledProcessError(1, "check_output"), + ) + def test__check_printer_setup_error_checking_printer(self, mock_output): + + with pytest.raises(ExportException) as ex: + self.service._check_printer_setup() + assert ex.value.sdstatus is Status.ERROR_UNKNOWN + + @mock.patch( + "subprocess.check_output", + side_effect=subprocess.CalledProcessError(1, "check_output"), + ) + def test__get_printer_uri_error(self, mocked_subprocess): + with pytest.raises(ExportException) as ex: + self.service._get_printer_uri() + assert ex.value.sdstatus is Status.ERROR_PRINTER_URI + + @mock.patch( + "subprocess.check_output", return_value=SAMPLE_OUTPUT_UNSUPPORTED_PRINTER + ) + def test__get_printer_uri_error_unsupported(self, mocked_subprocess): + with pytest.raises(ExportException) as ex: + self.service._get_printer_uri() + assert ex.value.sdstatus is Status.ERROR_PRINTER_NOT_SUPPORTED + + def test__install_printer_ppd_error_unsupported_uri(self): + with pytest.raises(ExportException) as ex: + self.service._install_printer_ppd( + "usb://YOURE_NOT_MY_REAL_PRINTER/A00000A000000" + ) + assert ex.value.sdstatus is Status.ERROR_PRINTER_NOT_SUPPORTED + + def test__print_test_page_calls_method(self): + p = mock.patch.object(self.service, "_print_file") + mock_print = p.start() + + self.service._print_test_page() + mock_print.assert_called_once_with("/usr/share/cups/data/testprint") + p.stop() + + def test__print_all_files(self): + p = mock.patch.object(self.service, "_print_file") + mock_print = p.start() + + self.service._print_all_files() + mock_print.assert_has_calls( + [ + mock.call(f"{self.submission.tmpdir}/export_data/file1.txt"), + mock.call(f"{self.submission.tmpdir}/export_data/file2.txt"), + mock.call(f"{self.submission.tmpdir}/export_data/file3.txt"), + ], + any_order=True, + ) + p.stop() + + def test_open_office_file_convert_to_pdf(self): + file = "/tmp/definitely-an-office-file.odt" + + with mock.patch.object(self.service, "safe_check_call") as scc, mock.patch( + "securedrop_export.print.service.logger.info" + ) as log: + self.service._print_file(file) + + assert scc.call_count == 2 + scc.assert_has_calls( + [ + mock.call( + command=[ + "unoconv", + "-o", + "/tmp/definitely-an-office-file.odt.pdf", + "/tmp/definitely-an-office-file.odt", + ], + error_status=Status.ERROR_PRINT, + ), + mock.call( + command=[ + "xpp", + "-P", + "sdw-printer", + "/tmp/definitely-an-office-file.odt.pdf", + ], + error_status=Status.ERROR_PRINT, + ), + ] + ) + assert log.call_count == 2 + log.assert_has_calls( + [ + mock.call("Converting Office document to pdf"), + mock.call("Sending file to printer sdw-printer"), + ] + ) + + def test_safe_check_call_has_error_in_stderr(self): + mock.patch("subprocess.run") + + with mock.patch("subprocess.run"), pytest.raises(ExportException) as ex: + self.service.safe_check_call(command="ls", error_status=Status.TEST_SUCCESS) + + assert ex.value.sdstatus is Status.TEST_SUCCESS diff --git a/tests/test_archive.py b/tests/test_archive.py index 4e840ab..57791a8 100644 --- a/tests/test_archive.py +++ b/tests/test_archive.py @@ -12,6 +12,7 @@ from securedrop_export.exceptions import ExportException from securedrop_export.archive import Archive, Metadata, Status + def test_extract_tarball(): """ Check that we can successfully extract a valid tarball. @@ -42,20 +43,25 @@ def test_extract_tarball(): archive.close() - submission = Archive(archive_path) + submission = Archive(archive_path).extract_tarball() assert oct(os.stat(submission.tmpdir).st_mode) == "0o40700" - submission.extract_tarball() - - extracted_file_path = os.path.join(submission.tmpdir, "some", "dirs", "file.txt") + extracted_file_path = os.path.join( + submission.tmpdir, "some", "dirs", "file.txt" + ) assert os.path.exists(extracted_file_path) assert oct(os.stat(extracted_file_path).st_mode) == "0o100600" # Subdirectories that are added as members are extracted with 700 permissions - assert oct(os.stat(os.path.join(submission.tmpdir, "some")).st_mode) == "0o40700" + assert ( + oct(os.stat(os.path.join(submission.tmpdir, "some")).st_mode) == "0o40700" + ) # Subdirectories that are not added as members are extracted with 700 permissions # because os.umask(0o077) is set in the Archive constructor. - assert oct(os.stat(os.path.join(submission.tmpdir, "some", "dirs")).st_mode) == "0o40700" + assert ( + oct(os.stat(os.path.join(submission.tmpdir, "some", "dirs")).st_mode) + == "0o40700" + ) def test_extract_tarball_with_symlink(): @@ -84,7 +90,7 @@ def test_extract_tarball_with_symlink(): submission = Archive(archive_path) assert oct(os.stat(submission.tmpdir).st_mode) == "0o40700" - submission.extract_tarball() + submission = submission.extract_tarball() symlink_path = os.path.join(submission.tmpdir, "symlink") assert os.path.islink(symlink_path) @@ -111,7 +117,9 @@ def test_extract_tarball_raises_if_doing_path_traversal(): metadata_file_info.size = len(metadata_str) archive.addfile(metadata_file_info, metadata_bytes) content = b"test" - traversed_file_info = tarfile.TarInfo("../../../../../../../../../tmp/traversed") + traversed_file_info = tarfile.TarInfo( + "../../../../../../../../../tmp/traversed" + ) traversed_file_info.size = len(content) archive.addfile(traversed_file_info, BytesIO(content)) archive.close() @@ -288,7 +296,9 @@ def test_extract_tarball_raises_if_name_has_unsafe_absolute_path_with_symlink(): archive_path = os.path.join(temp_dir, "archive.sd-export") symlink_path = os.path.join(temp_dir, "symlink") - os.system(f"ln -s {tmp}/unsafe {symlink_path}") # create symlink to "/tmp/unsafe" + os.system( + f"ln -s {tmp}/unsafe {symlink_path}" + ) # create symlink to "/tmp/unsafe" with tarfile.open(archive_path, "w:gz") as archive: metadata = { @@ -402,7 +412,8 @@ def test_empty_config(capsys): f.write("{}") with pytest.raises(ExportException) as ex: - config = Metadata.create_and_validate(temp_folder) + Metadata(temp_folder).validate() + assert ex.value.sdstatus is Status.ERROR_ARCHIVE_METADATA def test_valid_printer_test_config(capsys): @@ -412,7 +423,7 @@ def test_valid_printer_test_config(capsys): with open(metadata, "w") as f: f.write('{"device": "printer-test"}') - config = Metadata.create_and_validate(temp_folder) + config = Metadata(temp_folder).validate() assert config.encryption_key is None assert config.encryption_method is None @@ -425,7 +436,7 @@ def test_valid_printer_config(capsys): with open(metadata, "w") as f: f.write('{"device": "printer"}') - config = Metadata.create_and_validate(temp_folder) + config = Metadata(temp_folder).validate() assert config.encryption_key is None assert config.encryption_method is None @@ -437,14 +448,17 @@ def test_invalid_encryption_config(capsys): temp_folder = tempfile.mkdtemp() metadata = os.path.join(temp_folder, Metadata.METADATA_FILE) with open(metadata, "w") as f: - f.write('{"device": "disk", "encryption_method": "base64", "encryption_key": "hunter1"}') + f.write( + '{"device": "disk", "encryption_method": "base64", "encryption_key": "hunter1"}' + ) with pytest.raises(ExportException) as ex: - config = Metadata.create_and_validate(temp_folder) + Metadata(temp_folder).validate() assert ex.value.sdstatus is Status.ERROR_ARCHIVE_METADATA -def test_malforned_config(capsys): + +def test_invalid_config(capsys): Archive("testfile") temp_folder = tempfile.mkdtemp() @@ -453,37 +467,46 @@ def test_malforned_config(capsys): f.write('{"device": "asdf", "encryption_method": "OHNO"}') with pytest.raises(ExportException) as ex: - config = Metadata.create_and_validate(temp_folder) + Metadata(temp_folder).validate() + + assert ex.value.sdstatus is Status.ERROR_ARCHIVE_METADATA + + +def test_malformed_config(capsys): + Archive("testfile") + + temp_folder = tempfile.mkdtemp() + metadata = os.path.join(temp_folder, Metadata.METADATA_FILE) + with open(metadata, "w") as f: + f.write('{"device": "asdf", "encryption_method": {"OHNO", "MALFORMED"}') + + with pytest.raises(ExportException) as ex: + Metadata(temp_folder).validate() assert ex.value.sdstatus is Status.ERROR_METADATA_PARSING + def test_valid_encryption_config(capsys): Archive("testfile") temp_folder = tempfile.mkdtemp() metadata = os.path.join(temp_folder, Metadata.METADATA_FILE) with open(metadata, "w") as f: - f.write('{"device": "disk", "encryption_method": "luks", "encryption_key": "hunter1"}') + f.write( + '{"device": "disk", "encryption_method": "luks", "encryption_key": "hunter1"}' + ) - config = Metadata.create_and_validate(temp_folder) + config = Metadata(temp_folder).validate() assert config.encryption_key == "hunter1" assert config.encryption_method == "luks" -def test_cannot_use_metadata_constructor(): - """ - Require the `create_and_validate()` method for returning a Metadata object - """ - with pytest.raises(ValueError): - Metadata(object(), tempfile.mkdtemp()) - - @mock.patch("json.loads", side_effect=json.decoder.JSONDecodeError("ugh", "badjson", 0)) def test_metadata_parsing_error(mock_json): """ Handle exception caused when loading metadata JSON """ with pytest.raises(ExportException) as ex: - Metadata.create_and_validate(tempfile.mkdtemp()) + Metadata(tempfile.mkdtemp()).validate() - assert ex.value.sdstatus is Status.ERROR_METADATA_PARSING \ No newline at end of file + assert ex.value.sdstatus is Status.ERROR_METADATA_PARSING diff --git a/tests/test_directory_util.py b/tests/test_directory_util.py index 18eb6dd..cc1f304 100644 --- a/tests/test_directory_util.py +++ b/tests/test_directory_util.py @@ -1,24 +1,34 @@ import pytest import os +import tempfile +import shutil from pathlib import Path from securedrop_export import directory_util -from securedrop_export.exceptions import ExportException -class TestUtil: - _TMPDIR_PATH = "/tmp/pretendium/" +class TestDirectoryUtil: + _REL_TRAVERSAL = "../../../whee" _SAFE_RELPATH = "./hi" _SAFE_RELPATH2 = "yay/a/path" _UNSAFE_RELPATH = "lgtm/../ohwait" + @classmethod + def setup_class(cls): + cls.homedir = tempfile.mkdtemp() + "/" + + @classmethod + def teardown_class(cls): + if os.path.exists(cls.homedir): + shutil.rmtree(cls.homedir) + def setup_method(self, method): pass def teadown_method(self, method): - if (os.path.exists(self._TMPDIR_PATH)): - os.remove(self._TMPDIR_PATH) + if os.path.exists(self.homedir): + os.remove(self.homedir) def test_safe_mkdir_error_base_relpath(self): with pytest.raises(ValueError): @@ -26,40 +36,49 @@ def test_safe_mkdir_error_base_relpath(self): def test_safe_mkdir_error_basepath_path_traversal(self): with pytest.raises(ValueError): - directory_util.safe_mkdir(f"{self._TMPDIR_PATH}{self._REL_TRAVERSAL}") + directory_util.safe_mkdir(f"{self.homedir}{self._REL_TRAVERSAL}") def test_safe_mkdir_error_relpath_path_traversal(self): with pytest.raises(ValueError): - directory_util.safe_mkdir(f"{self._TMPDIR_PATH}", f"{self._REL_TRAVERSAL}") + directory_util.safe_mkdir(f"{self.homedir}", f"{self._REL_TRAVERSAL}") def test_safe_mkdir_success(self): - directory_util.safe_mkdir(f"{self._TMPDIR_PATH}") + directory_util.safe_mkdir(f"{self.homedir}") def test_safe_mkdir_success_with_relpath(self): - directory_util.safe_mkdir(f"{self._TMPDIR_PATH}", f"{self._SAFE_RELPATH}") + directory_util.safe_mkdir(f"{self.homedir}", f"{self._SAFE_RELPATH}") - assert (os.path.exists(f"{self._TMPDIR_PATH}{self._SAFE_RELPATH}")) + assert os.path.exists(f"{self.homedir}{self._SAFE_RELPATH}") def test_safe_mkdir_success_another_relpath(self): - directory_util.safe_mkdir(f"{self._TMPDIR_PATH}", f"{self._SAFE_RELPATH2}") + directory_util.safe_mkdir(f"{self.homedir}", f"{self._SAFE_RELPATH2}") + + assert os.path.exists(f"{self.homedir}{self._SAFE_RELPATH2}") - assert (os.path.exists(f"{self._TMPDIR_PATH}{self._SAFE_RELPATH2}")) - def test_safe_mkdir_weird_path(self): with pytest.raises(ValueError): - directory_util.safe_mkdir(f"{self._TMPDIR_PATH}", f"{self._UNSAFE_RELPATH}") + directory_util.safe_mkdir(f"{self.homedir}", f"{self._UNSAFE_RELPATH}") def test__check_all_permissions_path_missing(self): with pytest.raises(ValueError): - directory_util._check_all_permissions(f"{self._TMPDIR_PATH}", f"{self._SAFE_RELPATH}") + directory_util._check_all_permissions( + f"{self.homedir}", f"{self._SAFE_RELPATH}" + ) def test_check_dir_perms_unsafe(self): - path = Path(f"{self._TMPDIR_PATH}{self._SAFE_RELPATH}") + path = Path(f"{self.homedir}{self._SAFE_RELPATH}") directory_util.safe_mkdir(path) # Not what we want, ever path.chmod(0o666) - + with pytest.raises(RuntimeError): - directory_util._check_dir_permissions(path) + directory_util._check_dir_permissions(path) + + def test_check_all_perms_invalid_full_path(self): + path = Path(f"{self.homedir}/idontexist") + base = Path(f"{self.homedir}") + + # Returns without error + assert directory_util._check_all_permissions(path, base) is None diff --git a/tests/test_exceptions.py b/tests/test_exceptions.py index 577fae1..71af411 100644 --- a/tests/test_exceptions.py +++ b/tests/test_exceptions.py @@ -3,18 +3,18 @@ from securedrop_export.exceptions import handler, TimeoutException + def test_handler(): signal.signal(signal.SIGALRM, handler) signal.setitimer(signal.ITIMER_REAL, 0.001) - with pytest.raises(TimeoutException) as ex: + with pytest.raises(TimeoutException): _run_handler_routine() + def _run_handler_routine(): try: while True: continue except TimeoutException: raise - - \ No newline at end of file diff --git a/tests/test_main.py b/tests/test_main.py index e309ec3..b94109a 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -1,17 +1,41 @@ import pytest -from unittest import mock +import tempfile import os +from unittest import mock +import shutil + +from securedrop_export.archive import Archive, Metadata, Status as ArchiveStatus +from securedrop_export.status import BaseStatus +from securedrop_export.command import Command +from securedrop_export.exceptions import ExportException + +from securedrop_export.main import ( + Status, + entrypoint, + _exit_gracefully, + _write_status, + _start_service, + _configure_logging, +) + +SUBMISSION_SAMPLE_ARCHIVE = "pretendfile.tar.gz" -from securedrop_export.main import Status, entrypoint, _extract_and_run, _exit_gracefully, _write_status # noqa: F401 -from securedrop_export.archive import Archive -class TestMain(): +class TestMain: + def setup_method(self, method): + # This can't be a class method, since we expect sysexit during this test suite, + # which + self.submission = Archive("pretendfile.tar.gz") + assert os.path.exists(self.submission.tmpdir) + + def teardown_method(self, method): + if os.path.exists(self.submission.tmpdir): + shutil.rmtree(self.submission.tmpdir) + self.submission = None def test_exit_gracefully_no_exception(self, capsys): - submission = Archive("testfile") - with pytest.raises(SystemExit) as sysexit: - _exit_gracefully(submission, Status.ERROR_GENERIC) + _exit_gracefully(self.submission, Status.ERROR_GENERIC) # A graceful exit means a return code of 0 assert sysexit.value.code == 0 @@ -20,23 +44,19 @@ def test_exit_gracefully_no_exception(self, capsys): assert captured.err == "{}\n".format(Status.ERROR_GENERIC.value) assert captured.out == "" - def test_exit_gracefully_exception(self, capsys): - submission = Archive("testfile") - with pytest.raises(SystemExit) as sysexit: exception = mock.MagicMock() exception.output = "BANG!" - _exit_gracefully(submission, Status.ERROR_GENERIC, e=exception) + _exit_gracefully(self.submission, Status.ERROR_GENERIC, e=exception) # A graceful exit means a return code of 0 assert sysexit.value.code == 0 captured = capsys.readouterr() - assert captured.err.rstrip() == Status.ERROR_GENERIC.value + assert captured.err.rstrip() == Status.ERROR_GENERIC.value # todo assert captured.out == "" - @pytest.mark.parametrize("status", [s for s in Status]) def test_write_status(self, status, capsys): _write_status(status) @@ -49,13 +69,121 @@ def test_write_status_error(self, invalid_status, capsys): with pytest.raises(ValueError): _write_status(Status(invalid_status)) + def _did_exit_gracefully(self, exit, capsys, status: BaseStatus) -> bool: + """ + Helper. True if exited with 0, writing supplied status to stderr. + """ + return exit.value.code == 0 and capsys.readouterr().err == status.value + "\n" + + @pytest.mark.parametrize("command", list(Command)) + @mock.patch("securedrop_export.main._configure_logging") + @mock.patch("os.path.exists", return_value=True) + def test_entrypoint_success_start_service(self, mock_log, mock_path, command): + metadata = os.path.join(self.submission.tmpdir, Metadata.METADATA_FILE) + + with open(metadata, "w") as f: + f.write(f'{{"device": "{command.value}", "encryption_method": "luks"}}') + + with mock.patch( + "sys.argv", ["qvm-send-to-usb", SUBMISSION_SAMPLE_ARCHIVE] + ), mock.patch( + "securedrop_export.main._start_service" + ) as mock_service, mock.patch( + "securedrop_export.main.Archive.extract_tarball", + return_value=self.submission, + ), pytest.raises( + SystemExit + ): + entrypoint() + + if command is not Command.START_VM: + assert self.submission.command == command + assert mock_service.call_args[0][0].archive == SUBMISSION_SAMPLE_ARCHIVE + mock_service.assert_called_once_with(self.submission) + + def test_valid_printer_test_config(self, capsys): + Archive("testfile") + temp_folder = tempfile.mkdtemp() + metadata = os.path.join(temp_folder, Metadata.METADATA_FILE) + with open(metadata, "w") as f: + f.write('{"device": "printer-test"}') + + config = Metadata(temp_folder).validate() + + assert config.encryption_key is None + assert config.encryption_method is None + + @mock.patch( + "securedrop_export.archive.safe_extractall", + side_effect=ValueError("A tarball problem!"), + ) + @mock.patch("securedrop_export.main.os.path.exists", return_value=True) + @mock.patch("securedrop_export.main.shutil.rmtree") + def test_entrypoint_failure_extraction( + self, mock_rm, mock_path, mock_extract, capsys + ): + with mock.patch( + "sys.argv", ["qvm-send-to-usb", SUBMISSION_SAMPLE_ARCHIVE] + ), pytest.raises(SystemExit) as sysexit: + entrypoint() + + assert self._did_exit_gracefully( + sysexit, capsys, ArchiveStatus.ERROR_EXTRACTION + ) + + @mock.patch( + "securedrop_export.main._configure_logging", + side_effect=ExportException( + sdstatus=Status.ERROR_LOGGING, + message="Zounds, an error setting up logging!", + ), + ) + def test_entrypoint_logging_fails(self, mock_mkdir, capsys): + with pytest.raises(SystemExit) as sysexit: + entrypoint() + + assert self._did_exit_gracefully(sysexit, capsys, Status.ERROR_LOGGING) + + @mock.patch( + "securedrop_export.main._configure_logging", + side_effect=RuntimeError("Zounds, an uncaught error!"), + ) + def test_entrypoint_fails_unexpected(self, mock_mkdir, capsys): + with pytest.raises(SystemExit) as sysexit: + entrypoint() + + assert self._did_exit_gracefully(sysexit, capsys, Status.ERROR_GENERIC) + + @mock.patch("os.path.exists", return_value=False) + def test_entrypoint_archive_path_fails(self, mock_path, capsys): + with pytest.raises(SystemExit) as sysexit: + entrypoint() + + assert self._did_exit_gracefully(sysexit, capsys, Status.ERROR_FILE_NOT_FOUND) + + @mock.patch( + "securedrop_export.main.safe_mkdir", + side_effect=ValueError(1, "No logs for you!"), + ) + def test__configure_logging_error(self, mock_mkdir, capsys): + with pytest.raises(ExportException) as ex: + _configure_logging() + + assert ex.value.sdstatus is Status.ERROR_LOGGING - def test__extract_and_run(self): - pass + @pytest.mark.parametrize("command", list(Command)) + def test__start_service_calls_correct_services(self, command): + if command is Command.START_VM: + pytest.skip("Command does not start a service") + self.submission.command = command - def test__extract_and_run_failure(self): - pass + with mock.patch("securedrop_export.main.PrintService") as ps, mock.patch( + "securedrop_export.main.ExportService" + ) as es: + _start_service(self.submission) - def test_entrypoint(self): - pass + if command in [Command.PRINT, Command.PRINTER_TEST, Command.PRINTER_PREFLIGHT]: + assert ps.call_args[0][0] is self.submission + else: + assert es.call_args[0][0] is self.submission