diff --git a/.semgrep/custom-rules.yaml b/.semgrep/custom-rules.yaml index 6793bb2..377e55f 100644 --- a/.semgrep/custom-rules.yaml +++ b/.semgrep/custom-rules.yaml @@ -47,7 +47,7 @@ rules: languages: - python severity: ERROR - message: Possible path traversal or insecure directory and file permissions through os.mkdir(). Use securedrop_export.utils.safe_mkdir instead. + message: Possible path traversal or insecure directory and file permissions through os.mkdir(). Use securedrop_export.directory.safe_mkdir instead. patterns: - pattern: "....mkdir(...)" - pattern-not-inside: | @@ -58,7 +58,7 @@ rules: languages: - python severity: ERROR - message: Possible path traversal or insecure directory and file permissions through os.makedirs(). Use securedrop_export.utils.safe_mkdir instead. + message: Possible path traversal or insecure directory and file permissions through os.makedirs(). Use securedrop_export.directory.safe_mkdir instead. patterns: - pattern: "....makedirs(...)" - pattern-not-inside: | diff --git a/securedrop_export/archive.py b/securedrop_export/archive.py new file mode 100755 index 0000000..ed81082 --- /dev/null +++ b/securedrop_export/archive.py @@ -0,0 +1,101 @@ +#!/usr/bin/env python3 + +import datetime +import json +import logging +import os +import tempfile + +from securedrop_export.exceptions import ExportException +from securedrop_export.status import BaseStatus +from securedrop_export.command import Command +from securedrop_export.directory import safe_extractall + +logger = logging.getLogger(__name__) + + +class Status(BaseStatus): + ERROR_ARCHIVE_METADATA = "ERROR_ARCHIVE_METADATA" + ERROR_METADATA_PARSING = "ERROR_METADATA_PARSING" + ERROR_EXTRACTION = "ERROR_EXTRACTION" + + +class Metadata(object): + """ + Object to parse, validate and store json metadata from the sd-export archive. + """ + + METADATA_FILE = "metadata.json" + SUPPORTED_ENCRYPTION_METHODS = ["luks"] + + def __init__(self, archive_path: str): + self.metadata_path = os.path.join(archive_path, self.METADATA_FILE) + + def validate(self) -> "Metadata": + # Read metadata json and set relevant attributes + try: + with open(self.metadata_path) as f: + logger.info("Parsing archive metadata") + json_config = json.loads(f.read()) + self.export_method = json_config.get("device", None) + self.encryption_method = json_config.get("encryption_method", None) + self.encryption_key = json_config.get("encryption_key", None) + logger.info( + "Target: {}, encryption_method {}".format( + self.export_method, self.encryption_method + ) + ) + + except Exception as ex: + logger.error("Metadata parsing failure") + raise ExportException(sdstatus=Status.ERROR_METADATA_PARSING) from ex + + # Validate action - fails if command is not in list of supported commands + try: + logger.debug("Validate export action") + self.command = Command(self.export_method) + if ( + self.command is Command.EXPORT + and self.encryption_method not in self.SUPPORTED_ENCRYPTION_METHODS + ): + logger.error("Unsupported encryption method") + raise ExportException(sdstatus=Status.ERROR_ARCHIVE_METADATA) + except ValueError as v: + raise ExportException(sdstatus=Status.ERROR_ARCHIVE_METADATA) from v + + return self + + +class Archive(object): + def __init__(self, archive_path: str): + os.umask(0o077) + self.archive = archive_path + self.target_dirname = "sd-export-{}".format( + datetime.datetime.now().strftime("%Y%m%d-%H%M%S") + ) + self.tmpdir = tempfile.mkdtemp() + + def extract_tarball(self) -> "Archive": + """ + Extract tarball, checking for path traversal, and return Archive object. + """ + try: + logger.info( + "Extracting tarball {} into {}".format(self.archive, self.tmpdir) + ) + safe_extractall(self.archive, self.tmpdir) + return self + except Exception as ex: + logger.error("Unable to extract tarball: {}".format(ex)) + raise ExportException(sdstatus=Status.ERROR_EXTRACTION) from ex + + def set_metadata(self, metadata: Metadata) -> "Archive": + """ + Set relevant metadata attributes for a given archive. + """ + self.command = metadata.command + if self.command is Command.EXPORT: + # When we support multiple encryption types, we will also want to add the + # encryption_method here + self.encryption_key = metadata.encryption_key + return self diff --git a/securedrop_export/command.py b/securedrop_export/command.py new file mode 100644 index 0000000..06a3167 --- /dev/null +++ b/securedrop_export/command.py @@ -0,0 +1,18 @@ +from enum import Enum + + +class Command(Enum): + """ + All supported commands. + + Values are as supplied by the calling VM (sd-app), and a change in any values requires + corresponding changes in the calling VM. + """ + + PRINTER_PREFLIGHT = "printer-preflight" + PRINTER_TEST = "printer-test" + PRINT = "printer" + CHECK_USBS = "usb-test" + CHECK_VOLUME = "disk-test" + EXPORT = "disk" + START_VM = "" diff --git a/securedrop_export/utils.py b/securedrop_export/directory.py similarity index 89% rename from securedrop_export/utils.py rename to securedrop_export/directory.py index f5e1229..4f5edf5 100644 --- a/securedrop_export/utils.py +++ b/securedrop_export/directory.py @@ -23,10 +23,10 @@ def safe_mkdir( if not base_path.is_absolute(): raise ValueError(f"Base directory '{base_path}' must be an absolute path") - check_path_traversal(base_path) + _check_path_traversal(base_path) if relative_path: - check_path_traversal(relative_path) + _check_path_traversal(relative_path) full_path = base_path.joinpath(relative_path) else: full_path = base_path @@ -35,7 +35,7 @@ def safe_mkdir( # # Note: We do not use parents=True because the parent directories will not be created with the # specified mode. Parents are created using system default permissions, which we modify to be - # 700 via os.umask in the SDExport contructor. Creating directories one-by-one with mode=0o0700 + # 700 via os.umask in the Archive contructor. Creating directories one-by-one with mode=0o0700 # is not necessary but adds defense in depth. relative_path = relative_filepath(full_path, base_path) for parent in reversed(relative_path.parents): @@ -45,7 +45,7 @@ def safe_mkdir( full_path.mkdir(mode=0o0700, exist_ok=True) # Check permissions after creating the directories - check_all_permissions(relative_path, base_path) + _check_all_permissions(relative_path, base_path) def safe_extractall(archive_file_path: str, dest_path: str) -> None: @@ -65,14 +65,14 @@ def safe_extractall(archive_file_path: str, dest_path: str) -> None: for file_info in tar.getmembers(): file_info.mode = 0o700 if file_info.isdir() else 0o600 - check_path_traversal(file_info.name) + _check_path_traversal(file_info.name) # If the path is relative then we don't need to check that it resolves to dest_path if Path(file_info.name).is_absolute(): relative_filepath(file_info.name, dest_path) if file_info.islnk() or file_info.issym(): - check_path_traversal(file_info.linkname) + _check_path_traversal(file_info.linkname) # If the path is relative then we don't need to check that it resolves to dest_path if Path(file_info.linkname).is_absolute(): relative_filepath(file_info.linkname, dest_path) @@ -92,7 +92,7 @@ def relative_filepath(filepath: Union[str, Path], base_dir: Union[str, Path]) -> return Path(filepath).resolve().relative_to(base_dir) -def check_path_traversal(filename_or_filepath: Union[str, Path]) -> None: +def _check_path_traversal(filename_or_filepath: Union[str, Path]) -> None: """ Raise ValueError if filename_or_filepath does any path traversal. This works on filenames, relative paths, and absolute paths. @@ -121,7 +121,7 @@ def check_path_traversal(filename_or_filepath: Union[str, Path]) -> None: raise ValueError(f"Unsafe file or directory name: '{filename_or_filepath}'") -def check_all_permissions(path: Union[str, Path], base_path: Union[str, Path]) -> None: +def _check_all_permissions(path: Union[str, Path], base_path: Union[str, Path]) -> None: """ Check that the permissions of each directory between base_path and path are set to 700. """ @@ -131,16 +131,16 @@ def check_all_permissions(path: Union[str, Path], base_path: Union[str, Path]) - return Path(full_path).chmod(0o700) - check_dir_permissions(full_path) + _check_dir_permissions(full_path) relative_path = relative_filepath(full_path, base_path) for parent in relative_path.parents: full_path = base_path.joinpath(parent) Path(full_path).chmod(0o700) - check_dir_permissions(str(full_path)) + _check_dir_permissions(str(full_path)) -def check_dir_permissions(dir_path: Union[str, Path]) -> None: +def _check_dir_permissions(dir_path: Union[str, Path]) -> None: """ Check that a directory has ``700`` as the final 3 bytes. Raises a ``RuntimeError`` otherwise. """ diff --git a/securedrop_export/disk/__init__.py b/securedrop_export/disk/__init__.py index e69de29..3fa6c36 100644 --- a/securedrop_export/disk/__init__.py +++ b/securedrop_export/disk/__init__.py @@ -0,0 +1 @@ +from .service import Service # noqa: F401 diff --git a/securedrop_export/disk/actions.py b/securedrop_export/disk/actions.py deleted file mode 100644 index 9619aba..0000000 --- a/securedrop_export/disk/actions.py +++ /dev/null @@ -1,252 +0,0 @@ -import logging -import os -import subprocess -import sys - -from typing import List - -from securedrop_export.export import ExportAction -from securedrop_export.exceptions import ExportStatus - -MOUNTPOINT = "/media/usb" -ENCRYPTED_DEVICE = "encrypted_volume" - -logger = logging.getLogger(__name__) - - -class DiskAction(ExportAction): - def __init__(self, submission): - self.submission = submission - self.device = None # Optional[str] - self.mountpoint = MOUNTPOINT - self.encrypted_device = ENCRYPTED_DEVICE - - def run(self) -> None: - """Run logic""" - raise NotImplementedError - - def check_usb_connected(self, exit=False) -> None: - usb_devices = self._get_connected_usbs() - - if len(usb_devices) == 0: - logger.info("0 USB devices connected") - self.submission.exit_gracefully(ExportStatus.USB_NOT_CONNECTED.value) - elif len(usb_devices) == 1: - logger.info("1 USB device connected") - self.device = usb_devices[0] - if exit: - self.submission.exit_gracefully(ExportStatus.USB_CONNECTED.value) - elif len(usb_devices) > 1: - logger.info(">1 USB devices connected") - # Return generic error until freedomofpress/securedrop-export/issues/25 - self.submission.exit_gracefully(ExportStatus.ERROR_GENERIC.value) - - def _get_connected_usbs(self) -> List[str]: - logger.info("Performing usb preflight") - # List all block devices attached to VM that are disks and not partitions. - try: - lsblk = subprocess.Popen( - ["lsblk", "-o", "NAME,TYPE"], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - ) - grep = subprocess.Popen( - ["grep", "disk"], - stdin=lsblk.stdout, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - ) - command_output = grep.stdout.readlines() - - # The first word in each element of the command_output list is the device name - attached_devices = [x.decode("utf8").split()[0] for x in command_output] - except subprocess.CalledProcessError: - self.submission.exit_gracefully(ExportStatus.ERROR_GENERIC.value) - - # Determine which are USBs by selecting those block devices that are removable disks. - usb_devices = [] - for device in attached_devices: - try: - removable = subprocess.check_output( - ["cat", "/sys/class/block/{}/removable".format(device)], - stderr=subprocess.PIPE, - ) - is_removable = int(removable.decode("utf8").strip()) - except subprocess.CalledProcessError: - is_removable = False - - if is_removable: - usb_devices.append("/dev/{}".format(device)) - - return usb_devices - - def set_extracted_device_name(self): - try: - device_and_partitions = subprocess.check_output( - ["lsblk", "-o", "TYPE", "--noheadings", self.device], - stderr=subprocess.PIPE, - ) - - # we don't support multiple partitions - partition_count = ( - device_and_partitions.decode("utf-8").split("\n").count("part") - ) - if partition_count > 1: - logger.debug("multiple partitions not supported") - self.submission.exit_gracefully( - ExportStatus.USB_ENCRYPTION_NOT_SUPPORTED.value - ) - - # redefine device to /dev/sda if disk is encrypted, /dev/sda1 if partition encrypted - self.device = self.device if partition_count == 0 else self.device + "1" - except subprocess.CalledProcessError: - self.submission.exit_gracefully( - ExportStatus.USB_ENCRYPTION_NOT_SUPPORTED.value - ) - - def check_luks_volume(self): - # cryptsetup isLuks returns 0 if the device is a luks volume - # subprocess with throw if the device is not luks (rc !=0) - logger.info("Checking if volume is luks-encrypted") - self.set_extracted_device_name() - logger.debug("checking if {} is luks encrypted".format(self.device)) - self.submission.safe_check_call( - command=["sudo", "cryptsetup", "isLuks", self.device], - error_message=ExportStatus.USB_ENCRYPTION_NOT_SUPPORTED.value, - ) - self.submission.exit_gracefully(ExportStatus.USB_ENCRYPTED.value) - - def unlock_luks_volume(self, encryption_key): - try: - # get the encrypted device name - self.set_extracted_device_name() - luks_header = subprocess.check_output( - ["sudo", "cryptsetup", "luksDump", self.device] - ) - luks_header_list = luks_header.decode("utf-8").split("\n") - for line in luks_header_list: - items = line.split("\t") - if "UUID" in items[0]: - self.encrypted_device = "luks-" + items[1] - - # the luks device is already unlocked - if os.path.exists(os.path.join("/dev/mapper/", self.encrypted_device)): - logger.debug("Device already unlocked") - return - - logger.debug("Unlocking luks volume {}".format(self.encrypted_device)) - p = subprocess.Popen( - ["sudo", "cryptsetup", "luksOpen", self.device, self.encrypted_device], - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - ) - logger.debug("Passing key") - p.communicate(input=str.encode(encryption_key, "utf-8")) - rc = p.returncode - if rc != 0: - logger.error("Bad phassphrase for {}".format(self.encrypted_device)) - self.submission.exit_gracefully(ExportStatus.USB_BAD_PASSPHRASE.value) - except subprocess.CalledProcessError: - self.submission.exit_gracefully(ExportStatus.USB_ENCRYPTION_NOT_SUPPORTED) - - def mount_volume(self): - # If the drive is already mounted then we don't need to mount it again - output = subprocess.check_output( - ["lsblk", "-o", "MOUNTPOINT", "--noheadings", self.device] - ) - mountpoint = output.decode("utf-8").strip() - if mountpoint: - logger.debug("The device is already mounted") - self.mountpoint = mountpoint - return - - # mount target not created, create folder - if not os.path.exists(self.mountpoint): - self.submission.safe_check_call( - command=["sudo", "mkdir", self.mountpoint], - error_message=ExportStatus.ERROR_USB_MOUNT, - ) - - mapped_device_path = os.path.join("/dev/mapper/", self.encrypted_device) - logger.info("Mounting {}".format(mapped_device_path)) - self.submission.safe_check_call( - command=["sudo", "mount", mapped_device_path, self.mountpoint], - error_message=ExportStatus.ERROR_USB_MOUNT.value, - ) - self.submission.safe_check_call( - command=["sudo", "chown", "-R", "user:user", self.mountpoint], - error_message=ExportStatus.ERROR_USB_MOUNT.value, - ) - - def copy_submission(self): - # move files to drive (overwrites files with same filename) and unmount drive - # we don't use safe_check_call here because we must lock and - # unmount the drive as part of the finally block - try: - target_path = os.path.join(self.mountpoint, self.submission.target_dirname) - subprocess.check_call(["mkdir", target_path]) - export_data = os.path.join(self.submission.tmpdir, "export_data/") - logger.info("Copying file to {}".format(self.submission.target_dirname)) - subprocess.check_call(["cp", "-r", export_data, target_path]) - logger.info( - "File copied successfully to {}".format(self.submission.target_dirname) - ) - except (subprocess.CalledProcessError, OSError): - self.submission.exit_gracefully(ExportStatus.ERROR_USB_WRITE.value) - finally: - logger.info("Syncing filesystems") - subprocess.check_call(["sync"]) - - if os.path.exists(self.mountpoint): - logger.info("Unmounting drive from {}".format(self.mountpoint)) - subprocess.check_call(["sudo", "umount", self.mountpoint]) - - if os.path.exists(os.path.join("/dev/mapper", self.encrypted_device)): - logger.info("Locking luks volume {}".format(self.encrypted_device)) - subprocess.check_call( - ["sudo", "cryptsetup", "luksClose", self.encrypted_device] - ) - - logger.info( - "Deleting temporary directory {}".format(self.submission.tmpdir) - ) - subprocess.check_call(["rm", "-rf", self.submission.tmpdir]) - sys.exit(0) - - -class USBTestAction(DiskAction): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - - def run(self): - logger.info("Export archive is usb-test") - self.check_usb_connected(exit=True) - - -class DiskTestAction(DiskAction): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - - def run(self): - logger.info("Export archive is disk-test") - # check_usb_connected looks for the drive, sets the drive to use - self.check_usb_connected() - self.check_luks_volume() - - -class DiskExportAction(DiskAction): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - - def run(self): - logger.info("Export archive is disk") - # check_usb_connected looks for the drive, sets the drive to use - self.check_usb_connected() - logger.info("Unlocking volume") - # exports all documents in the archive to luks-encrypted volume - self.unlock_luks_volume(self.submission.archive_metadata.encryption_key) - logger.info("Mounting volume") - self.mount_volume() - logger.info("Copying submission to drive") - self.copy_submission() diff --git a/securedrop_export/disk/cli.py b/securedrop_export/disk/cli.py new file mode 100644 index 0000000..5d07c9d --- /dev/null +++ b/securedrop_export/disk/cli.py @@ -0,0 +1,423 @@ +import logging +import os +import subprocess + +from typing import List, Optional + +from securedrop_export.exceptions import ExportException + +from .volume import EncryptionScheme, Volume +from .status import Status + +logger = logging.getLogger(__name__) + + +class CLI: + """ + A Python wrapper for various shell commands required to detect, map, and + mount Export devices. + + CLI callers must handle ExportException and all exceptions and exit with + sys.exit(0) so that another program does not attempt to open the submission. + """ + + # Default mountpoint (unless drive is already mounted manually by the user) + _DEFAULT_MOUNTPOINT = "/media/usb" + + def get_connected_devices(self) -> List[str]: + """ + List all block devices attached to VM that are disks and not partitions. + Return list of all removable connected block devices. + + Raise ExportException if any commands fail. + """ + logger.info("Checking connected volumes") + try: + lsblk = subprocess.Popen( + ["lsblk", "-o", "NAME,TYPE"], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + grep = subprocess.Popen( + ["grep", "disk"], + stdin=lsblk.stdout, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + command_output = grep.stdout.readlines() + + # The first word in each element of the command_output list is the device name + attached_devices = [x.decode("utf8").split()[0] for x in command_output] + + except subprocess.CalledProcessError as ex: + raise ExportException(sdstatus=Status.DEVICE_ERROR) from ex + + return self._get_removable_devices(attached_devices) + + def _get_removable_devices(self, attached_devices: List[str]) -> List[str]: + """ + Determine which block devices are USBs by selecting those that are removable. + """ + logger.info("Checking removable devices") + usb_devices = [] + for device in attached_devices: + is_removable = False + try: + removable = subprocess.check_output( + ["cat", f"/sys/class/block/{device}/removable"], + stderr=subprocess.PIPE, + ) + + # 0 for non-removable device, 1 for removable + is_removable = int(removable.decode("utf8").strip()) + + except subprocess.CalledProcessError: + # Not a removable device + continue + + if is_removable: + usb_devices.append(f"/dev/{device}") + + logger.info(f"{len(usb_devices)} connected") + return usb_devices + + def get_partitioned_device(self, blkid: str) -> str: + """ + Given a string representing a block device, return string that includes correct partition + (such as "/dev/sda" or "/dev/sda1"). + + Raise ExportException if partition check fails or device has unsupported partition scheme + (currently, multiple partitions are unsupported). + """ + device_and_partitions = self._check_partitions(blkid) + + if device_and_partitions: + partition_count = ( + device_and_partitions.decode("utf-8").split("\n").count("part") + ) + logger.debug(f"Counted {partition_count} partitions") + if partition_count > 1: + # We don't currently support devices with multiple partitions + logger.error( + f"Multiple partitions not supported ({partition_count} partitions" + f" on {blkid})" + ) + raise ExportException(sdstatus=Status.INVALID_DEVICE_DETECTED) + + # redefine device to /dev/sda if disk is encrypted, /dev/sda1 if partition encrypted + if partition_count == 1: + logger.debug("One partition found") + blkid += "1" + + return blkid + + else: + # lsblk did not return output we could process + logger.error("Error checking device partitions") + raise ExportException(sdstatus=Status.DEVICE_ERROR) + + def _check_partitions(self, blkid: str) -> str: + try: + logger.debug(f"Checking device partitions on {blkid}") + device_and_partitions = subprocess.check_output( + ["lsblk", "-o", "TYPE", "--noheadings", blkid], stderr=subprocess.PIPE + ) + return device_and_partitions + + except subprocess.CalledProcessError as ex: + logger.error(f"Error checking block device {blkid}") + raise ExportException(sdstatus=Status.DEVICE_ERROR) from ex + + def is_luks_volume(self, device: str) -> bool: + """ + Given a string representing a volume (/dev/sdX or /dev/sdX1), return True if volume is + LUKS-encrypted, otherwise False. + """ + isLuks = False + + try: + logger.debug("Checking if target device is luks encrypted") + + # cryptsetup isLuks returns 0 if the device is a luks volume + # subprocess will throw if the device is not luks (rc !=0) + subprocess.check_call(["sudo", "cryptsetup", "isLuks", device]) + + isLuks = True + + except subprocess.CalledProcessError: + # Not necessarily an error state, just means the volume is not LUKS encrypted + logger.info("Target device is not LUKS-encrypted") + + return isLuks + + def _get_luks_name_from_headers(self, device: str) -> str: + """ + Dump LUKS header and determine name of volume. + + Raise ExportException if errors encounterd during attempt to parse LUKS headers. + """ + logger.debug("Get LUKS name from headers") + try: + luks_header = subprocess.check_output( + ["sudo", "cryptsetup", "luksDump", device] + ) + if luks_header: + luks_header_list = luks_header.decode("utf-8").split("\n") + for line in luks_header_list: + items = line.split("\t") + if "UUID" in items[0]: + return "luks-" + items[1] + + # If no header or no UUID field, we can't use this drive + logger.error( + f"Failed to get UUID from LUKS header; {device} may not be correctly formatted" + ) + raise ExportException(sdstatus=Status.INVALID_DEVICE_DETECTED) + except subprocess.CalledProcessError as ex: + logger.error("Failed to dump LUKS header") + raise ExportException(sdstatus=Status.DEVICE_ERROR) from ex + + def get_luks_volume(self, device: str) -> Volume: + """ + Given a string corresponding to a LUKS-partitioned volume, return a corresponding Volume + object. + + If LUKS volume is already mounted, existing mountpoint will be preserved. + If LUKS volume is unlocked but not mounted, volume will be mounted at _DEFAULT_MOUNTPOINT. + + If device is still locked, mountpoint will not be set. Once the decrpytion passphrase is + available, call unlock_luks_volume(), passing the Volume object and passphrase, to + unlock the volume. + + Raise ExportException if errors are encountered. + """ + try: + mapped_name = self._get_luks_name_from_headers(device) + logger.debug(f"Mapped name is {mapped_name}") + + # Setting the mapped_name does not mean the device has already been unlocked. + luks_volume = Volume( + device_name=device, + mapped_name=mapped_name, + encryption=EncryptionScheme.LUKS, + ) + + # If the device has been unlocked, we can see if it's mounted and + # use the existing mountpoint, or mount it ourselves. + if os.path.exists(os.path.join("/dev/mapper/", mapped_name)): + return self.mount_volume(luks_volume) + + # It's still locked + else: + return luks_volume + + except ExportException: + logger.error("Failed to return luks volume") + raise + + def unlock_luks_volume(self, volume: Volume, decryption_key: str) -> Volume: + """ + Unlock a LUKS-encrypted volume. + + Raise ExportException if errors are encountered during device unlocking. + """ + if volume.encryption is not EncryptionScheme.LUKS: + logger.error("Must call unlock_luks_volume() on LUKS-encrypted device") + raise ExportException(sdstatus=Status.DEVICE_ERROR) + + try: + logger.debug("Unlocking luks volume {}".format(volume.device_name)) + p = subprocess.Popen( + [ + "sudo", + "cryptsetup", + "luksOpen", + volume.device_name, + volume.mapped_name, + ], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + logger.debug("Passing key") + p.communicate(input=str.encode(decryption_key, "utf-8")) + rc = p.returncode + + if rc == 0: + return Volume( + device_name=volume.device_name, + mapped_name=volume.mapped_name, + encryption=EncryptionScheme.LUKS, + ) + else: + logger.error("Bad volume passphrase") + raise ExportException(sdstatus=Status.ERROR_UNLOCK_LUKS) + + except subprocess.CalledProcessError as ex: + raise ExportException(sdstatus=Status.DEVICE_ERROR) from ex + + def _get_mountpoint(self, volume: Volume) -> Optional[str]: + """ + Check for existing mountpoint. + Raise ExportException if errors encountered during command. + """ + logger.debug("Checking mountpoint") + try: + output = subprocess.check_output( + ["lsblk", "-o", "MOUNTPOINT", "--noheadings", volume.device_name] + ) + return output.decode("utf-8").strip() + + except subprocess.CalledProcessError as ex: + logger.error(ex) + raise ExportException(sdstatus=Status.ERROR_MOUNT) from ex + + def mount_volume(self, volume: Volume) -> Volume: + """ + Given an unlocked LUKS volume, return a mounted LUKS volume. + + If volume is already mounted, mountpoint is not changed. Otherwise, + volume is mounted at _DEFAULT_MOUNTPOINT. + + Raise ExportException if errors are encountered during mounting. + """ + if not volume.unlocked: + logger.error("Volume is not unlocked.") + raise ExportException(sdstatus=Status.ERROR_MOUNT) + + mountpoint = self._get_mountpoint(volume) + + if mountpoint: + logger.info("The device is already mounted") + if volume.mountpoint is not mountpoint: + logger.warning("Mountpoint was inaccurate, updating") + + volume.mountpoint = mountpoint + return volume + + else: + logger.info("Mount volume at default mountpoint") + return self._mount_at_mountpoint(volume, self._DEFAULT_MOUNTPOINT) + + def _mount_at_mountpoint(self, volume: Volume, mountpoint: str) -> Volume: + """ + Mount a volume at the supplied mountpoint, creating the mountpoint directory and + adjusting permissions (user:user) if need be. `mountpoint` must be a full path. + + Return Volume object. + Raise ExportException if unable to mount volume at target mountpoint. + """ + if not os.path.exists(mountpoint): + try: + subprocess.check_call(["sudo", "mkdir", mountpoint]) + except subprocess.CalledProcessError as ex: + logger.error(ex) + raise ExportException(sdstatus=Status.ERROR_MOUNT) from ex + + # Mount device /dev/mapper/{mapped_name} at /media/usb/ + mapped_device_path = os.path.join( + volume.MAPPED_VOLUME_PREFIX, volume.mapped_name + ) + + try: + logger.info(f"Mounting volume at {mountpoint}") + subprocess.check_call(["sudo", "mount", mapped_device_path, mountpoint]) + subprocess.check_call(["sudo", "chown", "-R", "user:user", mountpoint]) + + volume.mountpoint = mountpoint + + except subprocess.CalledProcessError as ex: + logger.error(ex) + raise ExportException(sdstatus=Status.ERROR_MOUNT) from ex + + return volume + + def write_data_to_device( + self, submission_tmpdir: str, submission_target_dirname: str, device: Volume + ): + """ + Move files to drive (overwrites files with same filename) and unmount drive. + Drive is unmounted and files are cleaned up as part of the `finally` block to ensure + that cleanup happens even if export fails or only partially succeeds. + """ + try: + target_path = os.path.join(device.mountpoint, submission_target_dirname) + subprocess.check_call(["mkdir", target_path]) + + export_data = os.path.join(submission_tmpdir, "export_data/") + logger.debug("Copying file to {}".format(submission_target_dirname)) + + subprocess.check_call(["cp", "-r", export_data, target_path]) + logger.info( + "File copied successfully to {}".format(submission_target_dirname) + ) + + except (subprocess.CalledProcessError, OSError) as ex: + raise ExportException(sdstatus=Status.ERROR_EXPORT) from ex + + finally: + self.cleanup_drive_and_tmpdir(device, submission_tmpdir) + + def cleanup_drive_and_tmpdir(self, volume: Volume, submission_tmpdir: str): + """ + Post-export cleanup method. Unmount and lock drive and remove temporary + directory. Currently called at end of `write_data_to_device()` to ensure + device is always locked after export. + + Raise ExportException if errors during cleanup are encountered. + """ + logger.debug("Syncing filesystems") + try: + subprocess.check_call(["sync"]) + umounted = self._unmount_volume(volume) + if umounted: + self._close_luks_volume(umounted) + self._remove_temp_directory(submission_tmpdir) + + except subprocess.CalledProcessError as ex: + logger.error("Error syncing filesystem") + raise ExportException(sdstatus=Status.ERROR_EXPORT_CLEANUP) from ex + + def _unmount_volume(self, volume: Volume) -> Volume: + """ + Helper. Unmount volume + """ + if os.path.exists(volume.mountpoint): + logger.debug(f"Unmounting drive from {volume.mountpoint}") + try: + subprocess.check_call(["sudo", "umount", volume.mountpoint]) + volume.mountpoint = None + + except subprocess.CalledProcessError as ex: + logger.error("Error unmounting device") + raise ExportException(sdstatus=Status.DEVICE_ERROR) from ex + else: + logger.info("Mountpoint does not exist; volume was already unmounted") + + return volume + + def _close_luks_volume(self, unlocked_device: Volume) -> None: + """ + Helper. Close LUKS volume + """ + if os.path.exists(os.path.join("/dev/mapper", unlocked_device.mapped_name)): + logger.debug("Locking luks volume {}".format(unlocked_device)) + try: + subprocess.check_call( + ["sudo", "cryptsetup", "luksClose", unlocked_device.mapped_name] + ) + + except subprocess.CalledProcessError as ex: + logger.error("Error closing device") + raise ExportException(sdstatus=Status.DEVICE_ERROR) from ex + + def _remove_temp_directory(self, tmpdir: str): + """ + Helper. Remove temporary directory used during archive export. + """ + logger.debug(f"Deleting temporary directory {tmpdir}") + try: + subprocess.check_call(["rm", "-rf", tmpdir]) + except subprocess.CalledProcessError as ex: + logger.error("Error removing temporary directory") + raise ExportException(sdstatus=Status.DEVICE_ERROR) from ex diff --git a/securedrop_export/disk/legacy_service.py b/securedrop_export/disk/legacy_service.py new file mode 100644 index 0000000..279a84d --- /dev/null +++ b/securedrop_export/disk/legacy_service.py @@ -0,0 +1,146 @@ +import logging + +from securedrop_export.exceptions import ExportException + +from .cli import CLI +from .legacy_status import Status as LegacyStatus +from .status import Status as Status + +logger = logging.getLogger(__name__) + + +class Service: + def __init__(self, submission, cli=None): + self.submission = submission + self.cli = cli or CLI() + + def check_connected_devices(self) -> LegacyStatus: + """ + Check if single USB is inserted. + """ + logger.info("Export archive is usb-test") + + try: + all_devices = self.cli.get_connected_devices() + num_devices = len(all_devices) + + except ExportException as ex: + logger.error(f"Error encountered during USB check: {ex.sdstatus.value}") + # Use legacy status instead of new status values + raise ExportException(sdstatus=LegacyStatus.LEGACY_ERROR_USB_CHECK) from ex + + if num_devices == 0: + raise ExportException(sdstatus=LegacyStatus.LEGACY_USB_NOT_CONNECTED) + elif num_devices == 1: + return LegacyStatus.LEGACY_USB_CONNECTED + elif num_devices > 1: + raise ExportException( + sdstatus=LegacyStatus.LEGACY_USB_ENCRYPTION_NOT_SUPPORTED + ) + + def check_disk_format(self) -> LegacyStatus: + """ + Check if volume is correctly formatted for export. + """ + try: + all_devices = self.cli.get_connected_devices() + + if len(all_devices) == 1: + device = self.cli.get_partitioned_device(all_devices[0]) + logger.info("Check if LUKS") + if not self.cli.is_luks_volume(device): + raise ExportException( + sdstatus=LegacyStatus.LEGACY_USB_ENCRYPTION_NOT_SUPPORTED + ) + # We can support checking if a drive is already unlocked, but for + # backwards compatibility, this is the only expected status + # at this stage + return LegacyStatus.LEGACY_USB_ENCRYPTED + + except ExportException as ex: + logger.error( + f"Error encountered during disk format check: {ex.sdstatus.value}" + ) + # Return legacy status values for now for ongoing client compatibility + if ex.sdstatus in [s for s in Status]: + status = self._legacy_status(ex.sdstatus) + raise ExportException(sdstatus=status) + elif ex.sdstatus: + raise + else: + raise ExportException(sdstatus=LegacyStatus.LEGACY_USB_DISK_ERROR) + + def export(self): + """ + Export all files to target device. + """ + logger.info("Export archive is disk") + + try: + all_devices = self.cli.get_connected_devices() + + if len(all_devices) == 1: + device = self.cli.get_partitioned_device(all_devices[0]) + + # Decide what kind of volume it is + logger.info("Check if LUKS") + if self.cli.is_luks_volume(device): + volume = self.cli.get_luks_volume(device) + logger.info("Check if writable") + if not volume.writable: + logger.info("Not writable-will try unlocking") + volume = self.cli.unlock_luks_volume( + volume, self.submission.encryption_key + ) + volume = self.cli.mount_volume(volume) + + logger.info(f"Export submission to {volume.mountpoint}") + self.cli.write_data_to_device( + self.submission.tmpdir, self.submission.target_dirname, volume + ) + # This is SUCCESS_EXPORT, but the 0.7.0 client is not expecting + # a return status from a successful export operation. + # When the client is updated, we will return SUCCESS_EXPORT here. + + else: + # Another kind of drive: VeraCrypt/TC, or unsupported. + # For now this is an error--in future there will be support + # for additional encryption formats + logger.error(f"Export failed because {device} is not supported") + raise ExportException( + sdstatus=LegacyStatus.LEGACY_USB_ENCRYPTION_NOT_SUPPORTED + ) + + except ExportException as ex: + logger.error( + f"Error encountered during disk format check: {ex.sdstatus.value}" + ) + # Return legacy status values for now for ongoing client compatibility + if ex.sdstatus in [s for s in Status]: + status = self._legacy_status(ex.sdstatus) + raise ExportException(sdstatus=status) + elif ex.sdstatus: + raise + else: + raise ExportException(sdstatus=LegacyStatus.LEGACY_ERROR_GENERIC) + + def _legacy_status(self, status: Status) -> LegacyStatus: + """ + Backwards-compatibility - status values that client (@0.7.0) is expecting. + """ + logger.info(f"Convert to legacy: {status.value}") + if status is Status.ERROR_MOUNT: + return LegacyStatus.LEGACY_ERROR_USB_MOUNT + elif status in [Status.ERROR_EXPORT, Status.ERROR_EXPORT_CLEANUP]: + return LegacyStatus.LEGACY_ERROR_USB_WRITE + elif status in [Status.ERROR_UNLOCK_LUKS, Status.ERROR_UNLOCK_GENERIC]: + return LegacyStatus.LEGACY_USB_BAD_PASSPHRASE + elif status in [ + Status.INVALID_DEVICE_DETECTED, + Status.MULTI_DEVICE_DETECTED, + ]: + return LegacyStatus.LEGACY_USB_ENCRYPTION_NOT_SUPPORTED + # The other status values, such as Status.NO_DEVICE_DETECTED, are not returned by the + # CLI, so we don't need to check for them here + else: + return LegacyStatus.LEGACY_ERROR_GENERIC diff --git a/securedrop_export/disk/legacy_status.py b/securedrop_export/disk/legacy_status.py new file mode 100644 index 0000000..fa0bdf8 --- /dev/null +++ b/securedrop_export/disk/legacy_status.py @@ -0,0 +1,26 @@ +from securedrop_export.status import BaseStatus + + +class Status(BaseStatus): + + LEGACY_ERROR_GENERIC = "ERROR_GENERIC" + + # Legacy USB preflight related + LEGACY_USB_CONNECTED = "USB_CONNECTED" # Success + LEGACY_USB_NOT_CONNECTED = "USB_NOT_CONNECTED" + LEGACY_ERROR_USB_CHECK = "ERROR_USB_CHECK" + + # Legacy USB Disk preflight related errors + LEGACY_USB_ENCRYPTED = "USB_ENCRYPTED" # Success + LEGACY_USB_ENCRYPTION_NOT_SUPPORTED = "USB_ENCRYPTION_NOT_SUPPORTED" + + # Can be raised during disk format check + LEGACY_USB_DISK_ERROR = "USB_DISK_ERROR" + + # Legacy Disk export errors + LEGACY_USB_BAD_PASSPHRASE = "USB_BAD_PASSPHRASE" + LEGACY_ERROR_USB_MOUNT = "ERROR_USB_MOUNT" + LEGACY_ERROR_USB_WRITE = "ERROR_USB_WRITE" + + # New + SUCCESS_EXPORT = "SUCCESS_EXPORT" diff --git a/securedrop_export/disk/service.py b/securedrop_export/disk/service.py new file mode 100644 index 0000000..b5702a4 --- /dev/null +++ b/securedrop_export/disk/service.py @@ -0,0 +1,120 @@ +import logging + +from securedrop_export.archive import Archive + +from .cli import CLI +from .status import Status +from .volume import Volume +from securedrop_export.exceptions import ExportException + + +logger = logging.getLogger(__name__) + + +class Service: + """ + Checks that can be performed against the device(s). + This is the "API" portion of the export workflow. + """ + + def __init__(self, cli: CLI): + self.cli = cli + + def run(self, arg: str) -> Status: + """ + Run export actions. + """ + + def scan_all_devices(self) -> Status: + """ + Check all connected devices and return current device + status. + """ + try: + all_devices = self.cli.get_connected_devices() + number_devices = len(all_devices) + + if number_devices == 0: + return Status.NO_DEVICE_DETECTED + elif number_devices > 1: + return Status.MULTI_DEVICE_DETECTED + else: + return self.scan_single_device(all_devices[0]) + + except ExportException as ex: + logger.error(ex) + return Status.DEVICE_ERROR # Could not assess devices + + def scan_single_device(self, blkid: str) -> Status: + """ + Given a string representing a single block device, see if it + is a suitable export target and return information about its state. + """ + try: + target = self.cli.get_partitioned_device(blkid) + + # See if it's a LUKS drive + if self.cli.is_luks_volume(target): + + # Returns Volume or throws ExportException + self.volume = self.cli.get_luks_volume(target) + + # See if it's unlocked and mounted + if self.volume.writable: + logger.debug("LUKS device is already mounted") + return Status.DEVICE_WRITABLE + else: + # Prompt for passphrase + return Status.DEVICE_LOCKED + else: + # Might be VeraCrypt, might be madness + logger.info("LUKS drive not found") + + # Currently we don't support anything other than LUKS. + # In future, we will support TC/VC volumes as well + return Status.INVALID_DEVICE_DETECTED + + except ExportException as ex: + logger.error(ex) + if ex.sdstatus: + return ex.sdstatus + else: + return Status.DEVICE_ERROR + + def unlock_device(self, passphrase: str, volume: Volume) -> Status: + """ + Given provided passphrase, unlock target volume. Currently, + LUKS volumes are supported. + """ + if volume: + try: + self.volume = self.cli.unlock_luks_volume(volume, passphrase) + + if volume.writable: + return Status.DEVICE_WRITABLE + else: + return Status.ERROR_UNLOCK_LUKS + + except ExportException as ex: + logger.error(ex) + return Status.ERROR_UNLOCK_LUKS + else: + # Trying to unlock devices before having an active device + logger.warning("Tried to unlock_device but no current volume detected.") + return Status.NO_DEVICE_DETECTED + + def write_to_device(self, volume: Volume, data: Archive) -> Status: + """ + Export data to volume. CLI unmounts and locks volume on completion, even + if export was unsuccessful. + """ + try: + self.cli.write_data_to_device(data.tmpdir, data.target_dirname, volume) + return Status.SUCCESS_EXPORT + + except ExportException as ex: + logger.error(ex) + if ex.sdstatus: + return ex.sdstatus + else: + return Status.ERROR_EXPORT diff --git a/securedrop_export/disk/status.py b/securedrop_export/disk/status.py new file mode 100644 index 0000000..285d9f8 --- /dev/null +++ b/securedrop_export/disk/status.py @@ -0,0 +1,29 @@ +from securedrop_export.status import BaseStatus + + +class Status(BaseStatus): + + NO_DEVICE_DETECTED = "NO_DEVICE_DETECTED" + INVALID_DEVICE_DETECTED = ( + "INVALID_DEVICE_DETECTED" # Multi partitioned, not encrypted, etc + ) + MULTI_DEVICE_DETECTED = "MULTI_DEVICE_DETECTED" # Not currently supported + + DEVICE_LOCKED = "DEVICE_LOCKED" # One device detected, and it's locked + DEVICE_WRITABLE = ( + "DEVICE_WRITABLE" # One device detected, and it's unlocked (and mounted) + ) + + ERROR_UNLOCK_LUKS = "ERROR_UNLOCK_LUKS" + ERROR_UNLOCK_GENERIC = "ERROR_UNLOCK_GENERIC" + ERROR_MOUNT = "ERROR_MOUNT" # Unlocked but not mounted + + SUCCESS_EXPORT = "SUCCESS_EXPORT" + ERROR_EXPORT = "ERROR_EXPORT" # Could not write to disk + + # export succeeds but drives were not properly unmounted + ERROR_EXPORT_CLEANUP = "ERROR_EXPORT_CLEANUP" + + DEVICE_ERROR = ( + "DEVICE_ERROR" # Something went wrong while trying to check the device + ) diff --git a/securedrop_export/disk/volume.py b/securedrop_export/disk/volume.py new file mode 100644 index 0000000..c6bc2f8 --- /dev/null +++ b/securedrop_export/disk/volume.py @@ -0,0 +1,59 @@ +from enum import Enum +import os + + +class EncryptionScheme(Enum): + """ + Supported encryption schemes. + """ + + UNKNOWN = 0 + LUKS = 1 + + +class Volume: + + MAPPED_VOLUME_PREFIX = "/dev/mapper/" + + """ + A volume on a removable device. + Volumes have a device name ("/dev/sdX"), a mapped name ("/dev/mapper/xxx"), an encryption + scheme, and a mountpoint if they are mounted. + """ + + def __init__( + self, + device_name: str, + mapped_name: str, + encryption: EncryptionScheme, + mountpoint: str = None, + ): + self.device_name = device_name + self.mapped_name = mapped_name + self.mountpoint = mountpoint + self.encryption = encryption + + @property + def encryption(self): + return self._encryption + + @encryption.setter + def encryption(self, scheme: EncryptionScheme): + if scheme: + self._encryption = scheme + else: + self._encryption = EncryptionScheme.UNKNOWN + + @property + def writable(self) -> bool: + return self.unlocked and self.mountpoint is not None + + @property + def unlocked(self) -> bool: + return ( + self.mapped_name is not None + and self.encryption is not EncryptionScheme.UNKNOWN + and os.path.exists( + os.path.join(self.MAPPED_VOLUME_PREFIX, self.mapped_name) + ) + ) diff --git a/securedrop_export/entrypoint.py b/securedrop_export/entrypoint.py deleted file mode 100755 index 3bb86ba..0000000 --- a/securedrop_export/entrypoint.py +++ /dev/null @@ -1,81 +0,0 @@ -import logging -import os -import shutil -import sys -import platform - -from logging.handlers import TimedRotatingFileHandler, SysLogHandler -from securedrop_export import __version__ -from securedrop_export import export -from securedrop_export import main -from securedrop_export.utils import safe_mkdir - -CONFIG_PATH = "/etc/sd-export-config.json" -DEFAULT_HOME = os.path.join(os.path.expanduser("~"), ".securedrop_export") -LOG_DIR_NAME = "logs" -EXPORT_LOG_FILENAME = "export.log" - -logger = logging.getLogger(__name__) - - -def configure_logging(): - """ - All logging related settings are set up by this function. - """ - safe_mkdir(DEFAULT_HOME) - safe_mkdir(DEFAULT_HOME, LOG_DIR_NAME) - - log_file = os.path.join(DEFAULT_HOME, LOG_DIR_NAME, EXPORT_LOG_FILENAME) - - # set logging format - log_fmt = ( - "%(asctime)s - %(name)s:%(lineno)d(%(funcName)s) " "%(levelname)s: %(message)s" - ) - formatter = logging.Formatter(log_fmt) - - handler = TimedRotatingFileHandler(log_file) - handler.setFormatter(formatter) - - # For rsyslog handler - if platform.system() != "Linux": # pragma: no cover - syslog_file = "/var/run/syslog" - else: - syslog_file = "/dev/log" - - sysloghandler = SysLogHandler(address=syslog_file) - sysloghandler.setFormatter(formatter) - handler.setLevel(logging.DEBUG) - - # set up primary log - log = logging.getLogger() - log.setLevel(logging.DEBUG) - log.addHandler(handler) - # add the second logger - log.addHandler(sysloghandler) - - -def start(): - try: - configure_logging() - except Exception: - msg = "ERROR_LOGGING" - export.SDExport.exit_gracefully(msg) - - logger.info("Starting SecureDrop Export {}".format(__version__)) - my_sub = export.SDExport(sys.argv[1], CONFIG_PATH) - - try: - # Halt immediately if target file is absent - if not os.path.exists(my_sub.archive): - logger.info("Archive is not found {}.".format(my_sub.archive)) - msg = "ERROR_FILE_NOT_FOUND" - my_sub.exit_gracefully(msg) - main.__main__(my_sub) - # Delete extracted achive from tempfile - shutil.rmtree(my_sub.tmpdir) - except Exception as e: - # exit with 0 return code otherwise the os will attempt to open - # the file with another application - logger.error(e) - msg = "ERROR_GENERIC" - my_sub.exit_gracefully(msg) diff --git a/securedrop_export/exceptions.py b/securedrop_export/exceptions.py index 11855c0..c70fac6 100644 --- a/securedrop_export/exceptions.py +++ b/securedrop_export/exceptions.py @@ -1,40 +1,14 @@ -from enum import Enum - - -class ExportStatus(Enum): - - # General errors - ERROR_FILE_NOT_FOUND = "ERROR_FILE_NOT_FOUND" - ERROR_EXTRACTION = "ERROR_EXTRACTION" - ERROR_METADATA_PARSING = "ERROR_METADATA_PARSING" - ERROR_ARCHIVE_METADATA = "ERROR_ARCHIVE_METADATA" - ERROR_USB_CONFIGURATION = "ERROR_USB_CONFIGURATION" - ERROR_GENERIC = "ERROR_GENERIC" - - # USB preflight related errors - USB_CONNECTED = "USB_CONNECTED" - USB_NOT_CONNECTED = "USB_NOT_CONNECTED" - ERROR_USB_CHECK = "ERROR_USB_CHECK" - - # USB Disk preflight related errors - USB_ENCRYPTED = "USB_ENCRYPTED" - USB_ENCRYPTION_NOT_SUPPORTED = "USB_ENCRYPTION_NOT_SUPPORTED" - USB_DISK_ERROR = "USB_DISK_ERROR" - - # Printer preflight related errors - ERROR_MULTIPLE_PRINTERS_FOUND = "ERROR_MULTIPLE_PRINTERS_FOUND" - ERROR_PRINTER_NOT_FOUND = "ERROR_PRINTER_NOT_FOUND" - ERROR_PRINTER_NOT_SUPPORTED = "ERROR_PRINTER_NOT_SUPPORTED" - ERROR_PRINTER_DRIVER_UNAVAILABLE = "ERROR_PRINTER_DRIVER_UNAVAILABLE" - ERROR_PRINTER_INSTALL = "ERROR_PRINTER_INSTALL" - - # Disk export errors - USB_BAD_PASSPHRASE = "USB_BAD_PASSPHRASE" - ERROR_USB_MOUNT = "ERROR_USB_MOUNT" - ERROR_USB_WRITE = "ERROR_USB_WRITE" +class ExportException(Exception): + """ + Base class for exceptions encountered during export. + In order to make use of additional attributes `sdstatus` and `sderror`, + pass them as keyword arguments when raising ExportException. + """ - # Printer export errors - ERROR_PRINT = "ERROR_PRINT" + def __init__(self, *args, **kwargs): + super().__init__(*args) + self.sdstatus = kwargs.get("sdstatus") + self.sderror = kwargs.get("sderror") class TimeoutException(Exception): diff --git a/securedrop_export/export.py b/securedrop_export/export.py deleted file mode 100755 index 02d12c6..0000000 --- a/securedrop_export/export.py +++ /dev/null @@ -1,151 +0,0 @@ -#!/usr/bin/env python3 - -import abc -import datetime -import json -import logging -import os -import shutil -import subprocess -import sys -import tempfile - -from securedrop_export.exceptions import ExportStatus -from securedrop_export.utils import safe_extractall - -logger = logging.getLogger(__name__) - - -class Metadata(object): - """ - Object to parse, validate and store json metadata from the sd-export archive. - """ - - METADATA_FILE = "metadata.json" - SUPPORTED_EXPORT_METHODS = [ - "start-vm", - "usb-test", # general preflight check - "disk", - "disk-test", # disk preflight test - "printer", - "printer-test", # print test page - "printer-preflight", - ] - SUPPORTED_ENCRYPTION_METHODS = ["luks"] - - def __init__(self, archive_path): - self.metadata_path = os.path.join(archive_path, self.METADATA_FILE) - - try: - with open(self.metadata_path) as f: - logger.info("Parsing archive metadata") - json_config = json.loads(f.read()) - self.export_method = json_config.get("device", None) - self.encryption_method = json_config.get("encryption_method", None) - self.encryption_key = json_config.get("encryption_key", None) - logger.info( - "Exporting to device {} with encryption_method {}".format( - self.export_method, self.encryption_method - ) - ) - - except Exception: - logger.error("Metadata parsing failure") - raise - - def is_valid(self): - logger.info("Validating metadata contents") - if self.export_method not in self.SUPPORTED_EXPORT_METHODS: - logger.error( - "Archive metadata: Export method {} is not supported".format( - self.export_method - ) - ) - return False - - if self.export_method == "disk": - if self.encryption_method not in self.SUPPORTED_ENCRYPTION_METHODS: - logger.error( - "Archive metadata: Encryption method {} is not supported".format( - self.encryption_method - ) - ) - return False - return True - - -class SDExport(object): - def __init__(self, archive, config_path): - os.umask(0o077) - self.archive = archive - self.submission_dirname = os.path.basename(self.archive).split(".")[0] - self.target_dirname = "sd-export-{}".format( - datetime.datetime.now().strftime("%Y%m%d-%H%M%S") - ) - self.tmpdir = tempfile.mkdtemp() - - def extract_tarball(self): - try: - logger.info( - "Extracting tarball {} into {}".format(self.archive, self.tmpdir) - ) - safe_extractall(self.archive, self.tmpdir) - except Exception as ex: - logger.error("Unable to extract tarball: {}".format(ex)) - self.exit_gracefully(ExportStatus.ERROR_EXTRACTION.value) - - def exit_gracefully(self, msg, e=False): - """ - Utility to print error messages, mostly used during debugging, - then exits successfully despite the error. Always exits 0, - since non-zero exit values will cause system to try alternative - solutions for mimetype handling, which we want to avoid. - """ - logger.info("Exiting with message: {}".format(msg)) - if e: - logger.error("Captured exception output: {}".format(e.output)) - try: - # If the file archive was extracted, delete before returning - if os.path.isdir(self.tmpdir): - shutil.rmtree(self.tmpdir) - # Do this after deletion to avoid giving the client two error messages in case of the - # block above failing - sys.stderr.write(msg) - sys.stderr.write("\n") - except Exception as ex: - logger.error("Unhandled exception: {}".format(ex)) - sys.stderr.write(ExportStatus.ERROR_GENERIC.value) - # exit with 0 return code otherwise the os will attempt to open - # the file with another application - sys.exit(0) - - def safe_check_call(self, command, error_message, ignore_stderr_startswith=None): - """ - Safely wrap subprocess.check_output to ensure we always return 0 and - log the error messages - """ - try: - err = subprocess.run(command, check=True, capture_output=True).stderr - # ppdc and lpadmin may emit warnings we are aware of which should not be treated as - # user facing errors - if ignore_stderr_startswith and err.startswith(ignore_stderr_startswith): - logger.info("Encountered warning: {}".format(err.decode("utf-8"))) - elif err == b"": - # Nothing on stderr and returncode is 0, we're good - pass - else: - self.exit_gracefully(msg=error_message, e=err) - except subprocess.CalledProcessError as ex: - self.exit_gracefully(msg=error_message, e=ex.output) - - -class ExportAction(abc.ABC): - """ - This export interface defines the method that export - methods should implement. - """ - - @abc.abstractmethod - def run(self) -> None: - """Run logic""" - pass diff --git a/securedrop_export/main.py b/securedrop_export/main.py index 042c0cd..e2910d7 100755 --- a/securedrop_export/main.py +++ b/securedrop_export/main.py @@ -1,46 +1,182 @@ +import os +import shutil +import platform import logging +import sys -from securedrop_export import export -from securedrop_export.exceptions import ExportStatus -from securedrop_export.print.actions import ( - PrintExportAction, - PrintTestPageAction, - PrintPreflightAction, -) -from securedrop_export.disk.actions import ( - DiskTestAction, - DiskExportAction, - USBTestAction, -) +from securedrop_export.archive import Archive, Metadata +from securedrop_export.command import Command +from securedrop_export.status import BaseStatus +from securedrop_export.directory import safe_mkdir +from securedrop_export.exceptions import ExportException + +from securedrop_export.disk import Service as ExportService +from securedrop_export.print import Service as PrintService + +from logging.handlers import TimedRotatingFileHandler, SysLogHandler +from securedrop_export import __version__ + +DEFAULT_HOME = os.path.join(os.path.expanduser("~"), ".securedrop_export") +LOG_DIR_NAME = "logs" +EXPORT_LOG_FILENAME = "export.log" logger = logging.getLogger(__name__) -def __main__(submission): - submission.extract_tarball() +class Status(BaseStatus): + """ + Status values that can occur during initialization. + """ + + ERROR_LOGGING = "ERROR_LOGGING" + ERROR_GENERIC = "ERROR_GENERIC" + ERROR_FILE_NOT_FOUND = "ERROR_FILE_NOT_FOUND" + + +def entrypoint(): + """ + Entrypoint method (Note: a method is required for setuptools). + Configure logging, extract tarball, and run desired export service, + exiting with return code 0. + + Non-zero exit values will cause the system to try alternative + solutions for mimetype handling, which we want to avoid. + """ + status, submission = None, None try: - submission.archive_metadata = export.Metadata(submission.tmpdir) - except Exception: - submission.exit_gracefully(ExportStatus.ERROR_METADATA_PARSING.value) - - if not submission.archive_metadata.is_valid(): - submission.exit_gracefully(ExportStatus.ERROR_ARCHIVE_METADATA.value) - - if submission.archive_metadata.export_method == "start-vm": - submission.exit_gracefully("") - - if submission.archive_metadata.export_method == "usb-test": - action = USBTestAction(submission) - elif submission.archive_metadata.export_method == "disk": - action = DiskExportAction(submission) - elif submission.archive_metadata.export_method == "disk-test": - action = DiskTestAction(submission) - elif submission.archive_metadata.export_method == "printer-preflight": - action = PrintPreflightAction(submission) - elif submission.archive_metadata.export_method == "printer": - action = PrintExportAction(submission) - elif submission.archive_metadata.export_method == "printer-test": - action = PrintTestPageAction(submission) - - action.run() + _configure_logging() + logger.info("Starting SecureDrop Export {}".format(__version__)) + + data_path = sys.argv[1] + + # Halt if target file is absent + if not os.path.exists(data_path): + logger.info("Archive is not found {}.".format(data_path)) + status = Status.ERROR_FILE_NOT_FOUND + + else: + logger.debug("Extract tarball") + submission = Archive(data_path).extract_tarball() + logger.debug("Validate metadata") + metadata = Metadata(submission.tmpdir).validate() + logger.info("Archive extraction and metadata validation successful") + + # If all we're doing is starting the vm, we're done; otherwise, + # run the appropriate print or export routine + if metadata.command is not Command.START_VM: + submission.set_metadata(metadata) + logger.info(f"Start {metadata.command.value} service") + status = _start_service(submission) + + except ExportException as ex: + logger.error(f"Encountered exception {ex.sdstatus.value}, exiting") + logger.error(ex) + status = ex.sdstatus + + except Exception as exc: + logger.error("Encountered exception during export, exiting") + logger.error(exc) + status = Status.ERROR_GENERIC + + finally: + _exit_gracefully(submission, status) + + +def _configure_logging(): + """ + All logging related settings are set up by this function. + """ + try: + safe_mkdir(DEFAULT_HOME) + safe_mkdir(DEFAULT_HOME, LOG_DIR_NAME) + + log_file = os.path.join(DEFAULT_HOME, LOG_DIR_NAME, EXPORT_LOG_FILENAME) + + # set logging format + log_fmt = ( + "%(asctime)s - %(name)s:%(lineno)d(%(funcName)s) " + "%(levelname)s: %(message)s" + ) + formatter = logging.Formatter(log_fmt) + + handler = TimedRotatingFileHandler(log_file) + handler.setFormatter(formatter) + + # For rsyslog handler + if platform.system() != "Linux": # pragma: no cover + syslog_file = "/var/run/syslog" + else: + syslog_file = "/dev/log" + + sysloghandler = SysLogHandler(address=syslog_file) + sysloghandler.setFormatter(formatter) + handler.setLevel(logging.DEBUG) + + # set up primary log + log = logging.getLogger() + log.setLevel(logging.DEBUG) + log.addHandler(handler) + # add the second logger + log.addHandler(sysloghandler) + except Exception as ex: + raise ExportException(sdstatus=Status.ERROR_LOGGING) from ex + + +def _start_service(submission: Archive) -> Status: + """ + Start print or export service. + """ + # Print Routines + if submission.command is Command.PRINT: + return PrintService(submission).print() + elif submission.command is Command.PRINTER_PREFLIGHT: + return PrintService(submission).printer_preflight() + elif submission.command is Command.PRINTER_TEST: + return PrintService(submission).printer_test() + + # Export routines + elif submission.command is Command.EXPORT: + return ExportService(submission).export() + elif submission.command is Command.CHECK_USBS: + return ExportService(submission).check_connected_devices() + elif submission.command is Command.CHECK_VOLUME: + return ExportService(submission).check_disk_format() + + +def _exit_gracefully(submission: Archive, status: BaseStatus = None): + """ + Write status code, ensure file cleanup, and exit with return code 0. + Non-zero exit values will cause the system to try alternative + solutions for mimetype handling, which we want to avoid. + """ + if status: + logger.info(f"Exit gracefully with status: {status.value}") + else: + logger.info("Exit gracefully (no status code supplied)") + try: + # If the file archive was extracted, delete before returning + if submission and os.path.isdir(submission.tmpdir): + shutil.rmtree(submission.tmpdir) + # Do this after deletion to avoid giving the client two error messages in case of the + # block above failing + _write_status(status) + except Exception as ex: + logger.error("Unhandled exception: {}".format(ex)) + _write_status(Status.ERROR_GENERIC) + finally: + # exit with 0 return code otherwise the os will attempt to open + # the file with another application + sys.exit(0) + + +def _write_status(status: BaseStatus): + """ + Write string to stderr. + """ + if status: + logger.info(f"Write status {status.value}") + sys.stderr.write(status.value) + sys.stderr.write("\n") + else: + logger.info("No status value supplied") diff --git a/securedrop_export/print/__init__.py b/securedrop_export/print/__init__.py index e69de29..3fa6c36 100644 --- a/securedrop_export/print/__init__.py +++ b/securedrop_export/print/__init__.py @@ -0,0 +1 @@ +from .service import Service # noqa: F401 diff --git a/securedrop_export/print/actions.py b/securedrop_export/print/actions.py deleted file mode 100644 index 78a5e3a..0000000 --- a/securedrop_export/print/actions.py +++ /dev/null @@ -1,253 +0,0 @@ -import logging -import os -import signal -import subprocess -import time - -from securedrop_export.exceptions import ExportStatus, handler, TimeoutException -from securedrop_export.export import ExportAction - - -PRINTER_NAME = "sdw-printer" -PRINTER_WAIT_TIMEOUT = 60 -BRLASER_DRIVER = "/usr/share/cups/drv/brlaser.drv" -BRLASER_PPD = "/usr/share/cups/model/br7030.ppd" -LASERJET_DRIVER = "/usr/share/cups/drv/hpcups.drv" -LASERJET_PPD = "/usr/share/cups/model/hp-laserjet_6l.ppd" - -logger = logging.getLogger(__name__) - - -class PrintAction(ExportAction): - def __init__(self, submission): - self.submission = submission - self.printer_name = PRINTER_NAME - self.printer_wait_timeout = PRINTER_WAIT_TIMEOUT - - def run(self) -> None: - """Run logic""" - raise NotImplementedError - - def wait_for_print(self): - # use lpstat to ensure the job was fully transfered to the printer - # returns True if print was successful, otherwise will throw exceptions - signal.signal(signal.SIGALRM, handler) - signal.alarm(self.printer_wait_timeout) - printer_idle_string = "printer {} is idle".format(self.printer_name) - while True: - try: - logger.info( - "Running lpstat waiting for printer {}".format(self.printer_name) - ) - output = subprocess.check_output(["lpstat", "-p", self.printer_name]) - if printer_idle_string in output.decode("utf-8"): - logger.info("Print completed") - return True - else: - time.sleep(5) - except subprocess.CalledProcessError: - self.submission.exit_gracefully(ExportStatus.ERROR_PRINT.value) - except TimeoutException: - logger.error("Timeout waiting for printer {}".format(self.printer_name)) - self.submission.exit_gracefully(ExportStatus.ERROR_PRINT.value) - return True - - def check_printer_setup(self) -> None: - try: - logger.info("Searching for printer") - output = subprocess.check_output(["sudo", "lpinfo", "-v"]) - printers = [x for x in output.decode("utf-8").split() if "usb://" in x] - if not printers: - logger.info("No usb printers connected") - self.submission.exit_gracefully( - ExportStatus.ERROR_PRINTER_NOT_FOUND.value - ) - - supported_printers = [ - p for p in printers if any(sub in p for sub in ("Brother", "LaserJet")) - ] - if not supported_printers: - logger.info("{} are unsupported printers".format(printers)) - self.submission.exit_gracefully( - ExportStatus.ERROR_PRINTER_NOT_SUPPORTED.value - ) - - if len(supported_printers) > 1: - logger.info("Too many usb printers connected") - self.submission.exit_gracefully( - ExportStatus.ERROR_MULTIPLE_PRINTERS_FOUND.value - ) - - printer_uri = printers[0] - printer_ppd = self.install_printer_ppd(printer_uri) - self.setup_printer(printer_uri, printer_ppd) - except subprocess.CalledProcessError as e: - logger.error(e) - self.submission.exit_gracefully(ExportStatus.ERROR_GENERIC.value) - - def get_printer_uri(self): - # Get the URI via lpinfo and only accept URIs of supported printers - printer_uri = "" - try: - output = subprocess.check_output(["sudo", "lpinfo", "-v"]) - except subprocess.CalledProcessError: - self.submission.exit_gracefully(ExportStatus.ERROR_PRINTER_URI.value) - - # fetch the usb printer uri - for line in output.split(): - if "usb://" in line.decode("utf-8"): - printer_uri = line.decode("utf-8") - logger.info("lpinfo usb printer: {}".format(printer_uri)) - - # verify that the printer is supported, else exit - if printer_uri == "": - # No usb printer is connected - logger.info("No usb printers connected") - self.submission.exit_gracefully(ExportStatus.ERROR_PRINTER_NOT_FOUND.value) - elif not any(x in printer_uri for x in ("Brother", "LaserJet")): - # printer url is a make that is unsupported - logger.info("Printer {} is unsupported".format(printer_uri)) - self.submission.exit_gracefully( - ExportStatus.ERROR_PRINTER_NOT_SUPPORTED.value - ) - - logger.info("Printer {} is supported".format(printer_uri)) - return printer_uri - - def install_printer_ppd(self, uri): - if not any(x in uri for x in ("Brother", "LaserJet")): - logger.error( - "Cannot install printer ppd for unsupported printer: {}".format(uri) - ) - self.submission.exit_gracefully( - msg=ExportStatus.ERROR_PRINTER_NOT_SUPPORTED.value - ) - return - - if "Brother" in uri: - printer_driver = BRLASER_DRIVER - printer_ppd = BRLASER_PPD - elif "LaserJet" in uri: - printer_driver = LASERJET_DRIVER - printer_ppd = LASERJET_PPD - - # Compile and install drivers that are not already installed - if not os.path.exists(printer_ppd): - logger.info("Installing printer drivers") - self.submission.safe_check_call( - command=[ - "sudo", - "ppdc", - printer_driver, - "-d", - "/usr/share/cups/model/", - ], - error_message=ExportStatus.ERROR_PRINTER_DRIVER_UNAVAILABLE.value, - ignore_stderr_startswith=b"ppdc: Warning", - ) - - return printer_ppd - - def setup_printer(self, printer_uri, printer_ppd): - # Add the printer using lpadmin - logger.info("Setting up printer {}".format(self.printer_name)) - self.submission.safe_check_call( - command=[ - "sudo", - "lpadmin", - "-p", - self.printer_name, - "-E", - "-v", - printer_uri, - "-P", - printer_ppd, - "-u", - "allow:user", - ], - error_message=ExportStatus.ERROR_PRINTER_INSTALL.value, - ignore_stderr_startswith=b"lpadmin: Printer drivers", - ) - - def print_test_page(self): - logger.info("Printing test page") - self.print_file("/usr/share/cups/data/testprint") - - def print_all_files(self): - files_path = os.path.join(self.submission.tmpdir, "export_data/") - files = os.listdir(files_path) - print_count = 0 - for f in files: - file_path = os.path.join(files_path, f) - self.print_file(file_path) - print_count += 1 - logger.info("Printing document {} of {}".format(print_count, len(files))) - - def is_open_office_file(self, filename): - OPEN_OFFICE_FORMATS = [ - ".doc", - ".docx", - ".xls", - ".xlsx", - ".ppt", - ".pptx", - ".odt", - ".ods", - ".odp", - ".rtf", - ] - for extension in OPEN_OFFICE_FORMATS: - if os.path.basename(filename).endswith(extension): - return True - return False - - def print_file(self, file_to_print): - # If the file to print is an (open)office document, we need to call unoconf to - # convert the file to pdf as printer drivers do not support this format - if self.is_open_office_file(file_to_print): - logger.info("Converting Office document to pdf") - folder = os.path.dirname(file_to_print) - converted_filename = file_to_print + ".pdf" - converted_path = os.path.join(folder, converted_filename) - self.submission.safe_check_call( - command=["unoconv", "-o", converted_path, file_to_print], - error_message=ExportStatus.ERROR_PRINT.value, - ) - file_to_print = converted_path - - logger.info("Sending file to printer {}".format(self.printer_name)) - self.submission.safe_check_call( - command=["xpp", "-P", self.printer_name, file_to_print], - error_message=ExportStatus.ERROR_PRINT.value, - ) - - -class PrintExportAction(PrintAction): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - - def run(self): - logger.info("Export archive is printer") - self.check_printer_setup() - # prints all documents in the archive - self.print_all_files() - - -class PrintTestPageAction(PrintAction): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - - def run(self): - logger.info("Export archive is printer-test") - self.check_printer_setup() - # Prints a test page to ensure the printer is functional - self.print_test_page() - - -class PrintPreflightAction(PrintAction): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - - def run(self): - logger.info("Export archive is printer-preflight") - self.check_printer_setup() diff --git a/securedrop_export/print/service.py b/securedrop_export/print/service.py new file mode 100644 index 0000000..dbff034 --- /dev/null +++ b/securedrop_export/print/service.py @@ -0,0 +1,290 @@ +import logging +import os +import signal +import subprocess +import time + +from securedrop_export.exceptions import handler, TimeoutException, ExportException +from .status import Status + +logger = logging.getLogger(__name__) + + +class Service: + """ + Printer service + """ + + PRINTER_NAME = "sdw-printer" + PRINTER_WAIT_TIMEOUT = 60 + BRLASER_DRIVER = "/usr/share/cups/drv/brlaser.drv" + BRLASER_PPD = "/usr/share/cups/model/br7030.ppd" + LASERJET_DRIVER = "/usr/share/cups/drv/hpcups.drv" + LASERJET_PPD = "/usr/share/cups/model/hp-laserjet_6l.ppd" + + BROTHER = "Brother" + LASERJET = "LaserJet" + + SUPPORTED_PRINTERS = [BROTHER, LASERJET] + + def __init__(self, submission, printer_timeout_seconds=PRINTER_WAIT_TIMEOUT): + self.submission = submission + self.printer_name = self.PRINTER_NAME + self.printer_wait_timeout = printer_timeout_seconds # Override during testing + + def print(self): + """ + Routine to print all files. + Throws ExportException if an error is encountered. + """ + logger.info("Printing all files from archive") + self._check_printer_setup() + self._print_all_files() + # When client can accept new print statuses, we will return + # a success status here + # return Status.PRINT_SUCCESS + + def printer_preflight(self): + """ + Routine to perform preflight printer testing. + + Throws ExportException if an error is encoutered. + """ + logger.info("Running printer preflight") + self._check_printer_setup() + # When client can accept new print statuses, we will return + # a success status here + # return Status.PREFLIGHT_SUCCESS + + def printer_test(self): + """ + Routine to print a test page. + + Throws ExportException if an error is encountered. + """ + logger.info("Printing test page") + self._check_printer_setup() + self._print_test_page() + # When client can accept new print statuses, we will return + # a success status here + # return Status.TEST_SUCCESS + + def _wait_for_print(self): + """ + Use lpstat to ensure the job was fully transfered to the printer + Return True if print was successful, otherwise throw ExportException. + Currently, the handler `handler` is defined in `exceptions.py`. + """ + signal.signal(signal.SIGALRM, handler) + signal.alarm(self.printer_wait_timeout) + printer_idle_string = "printer {} is idle".format(self.printer_name) + while True: + try: + logger.info( + "Running lpstat waiting for printer {}".format(self.printer_name) + ) + output = subprocess.check_output(["lpstat", "-p", self.printer_name]) + if printer_idle_string in output.decode("utf-8"): + logger.info("Print completed") + return True + else: + time.sleep(5) + except subprocess.CalledProcessError: + raise ExportException(sdstatus=Status.ERROR_PRINT) + except TimeoutException: + logger.error("Timeout waiting for printer {}".format(self.printer_name)) + raise ExportException(sdstatus=Status.ERROR_PRINT) + return True + + def _check_printer_setup(self) -> None: + """ + Check printer setup. + Raise ExportException if supported setup is not found. + """ + try: + logger.info("Searching for printer") + output = subprocess.check_output(["sudo", "lpinfo", "-v"]) + printers = [x for x in output.decode("utf-8").split() if "usb://" in x] + if not printers: + logger.info("No usb printers connected") + raise ExportException(sdstatus=Status.ERROR_PRINTER_NOT_FOUND) + + supported_printers = [ + p for p in printers if any(sub in p for sub in self.SUPPORTED_PRINTERS) + ] + if not supported_printers: + logger.info("{} are unsupported printers".format(printers)) + raise ExportException(sdstatus=Status.ERROR_PRINTER_NOT_SUPPORTED) + + if len(supported_printers) > 1: + logger.info("Too many usb printers connected") + raise ExportException(sdstatus=Status.ERROR_MULTIPLE_PRINTERS_FOUND) + + printer_uri = printers[0] + printer_ppd = self._install_printer_ppd(printer_uri) + self._setup_printer(printer_uri, printer_ppd) + except subprocess.CalledProcessError as e: + logger.error(e) + raise ExportException(sdstatus=Status.ERROR_UNKNOWN) + + def _get_printer_uri(self) -> str: + """ + Get the URI via lpinfo. Only accept URIs of supported printers. + + Raise ExportException if supported setup is not found. + """ + printer_uri = "" + try: + output = subprocess.check_output(["sudo", "lpinfo", "-v"]) + except subprocess.CalledProcessError: + logger.error("Error attempting to retrieve printer uri with lpinfo") + raise ExportException(sdstatus=Status.ERROR_PRINTER_URI) + + # fetch the usb printer uri + for line in output.split(): + if "usb://" in line.decode("utf-8"): + printer_uri = line.decode("utf-8") + logger.info("lpinfo usb printer: {}".format(printer_uri)) + + # verify that the printer is supported, else throw + if printer_uri == "": + # No usb printer is connected + logger.info("No usb printers connected") + raise ExportException(sdstatus=Status.ERROR_PRINTER_NOT_FOUND) + elif not any(x in printer_uri for x in self.SUPPORTED_PRINTERS): + # printer url is a make that is unsupported + logger.info("Printer {} is unsupported".format(printer_uri)) + raise ExportException(sdstatus=Status.ERROR_PRINTER_NOT_SUPPORTED) + + logger.info("Printer {} is supported".format(printer_uri)) + return printer_uri + + def _install_printer_ppd(self, uri): + if not any(x in uri for x in self.SUPPORTED_PRINTERS): + logger.error( + "Cannot install printer ppd for unsupported printer: {}".format(uri) + ) + raise ExportException(sdstatus=Status.ERROR_PRINTER_NOT_SUPPORTED) + + if self.BROTHER in uri: + printer_driver = self.BRLASER_DRIVER + printer_ppd = self.BRLASER_PPD + elif self.LASERJET in uri: + printer_driver = self.LASERJET_DRIVER + printer_ppd = self.LASERJET_PPD + + # Compile and install drivers that are not already installed + if not os.path.exists(printer_ppd): + logger.info("Installing printer drivers") + self.safe_check_call( + command=[ + "sudo", + "ppdc", + printer_driver, + "-d", + "/usr/share/cups/model/", + ], + error_status=Status.ERROR_PRINTER_DRIVER_UNAVAILABLE, + ignore_stderr_startswith=b"ppdc: Warning", + ) + + return printer_ppd + + def _setup_printer(self, printer_uri, printer_ppd): + # Add the printer using lpadmin + logger.info("Setting up printer {}".format(self.printer_name)) + self.safe_check_call( + command=[ + "sudo", + "lpadmin", + "-p", + self.printer_name, + "-E", + "-v", + printer_uri, + "-P", + printer_ppd, + "-u", + "allow:user", + ], + error_status=Status.ERROR_PRINTER_INSTALL, + ignore_stderr_startswith=b"lpadmin: Printer drivers", + ) + + def _print_test_page(self): + logger.info("Printing test page") + self._print_file("/usr/share/cups/data/testprint") + + def _print_all_files(self): + files_path = os.path.join(self.submission.tmpdir, "export_data/") + files = os.listdir(files_path) + print_count = 0 + for f in files: + file_path = os.path.join(files_path, f) + self._print_file(file_path) + print_count += 1 + logger.info("Printing document {} of {}".format(print_count, len(files))) + + def _is_open_office_file(self, filename): + OPEN_OFFICE_FORMATS = [ + ".doc", + ".docx", + ".xls", + ".xlsx", + ".ppt", + ".pptx", + ".odt", + ".ods", + ".odp", + ".rtf", + ] + for extension in OPEN_OFFICE_FORMATS: + if os.path.basename(filename).endswith(extension): + return True + return False + + def _print_file(self, file_to_print): + # If the file to print is an (open)office document, we need to call unoconf to + # convert the file to pdf as printer drivers do not support this format + if self._is_open_office_file(file_to_print): + logger.info("Converting Office document to pdf") + folder = os.path.dirname(file_to_print) + converted_filename = file_to_print + ".pdf" + converted_path = os.path.join(folder, converted_filename) + self.safe_check_call( + command=["unoconv", "-o", converted_path, file_to_print], + error_status=Status.ERROR_PRINT, + ) + file_to_print = converted_path + + logger.info("Sending file to printer {}".format(self.printer_name)) + + self.safe_check_call( + command=["xpp", "-P", self.printer_name, file_to_print], + error_status=Status.ERROR_PRINT, + ) + # This is an addition to ensure that the entire print job is transferred over. + # If the job is not fully transferred within the timeout window, the user + # will see an error message. + self._wait_for_print() + + def safe_check_call( + self, command: str, error_status: Status, ignore_stderr_startswith=None + ): + """ + Wrap subprocess.check_output to ensure we wrap CalledProcessError and return + our own exception, and log the error messages. + """ + try: + err = subprocess.run(command, check=True, capture_output=True).stderr + # ppdc and lpadmin may emit warnings we are aware of which should not be treated as + # user facing errors + if ignore_stderr_startswith and err.startswith(ignore_stderr_startswith): + logger.info("Encountered warning: {}".format(err.decode("utf-8"))) + elif err == b"": + # Nothing on stderr and returncode is 0, we're good + pass + else: + raise ExportException(sdstatus=error_status, sderror=err) + except subprocess.CalledProcessError as ex: + raise ExportException(sdstatus=error_status, sderror=ex.output) diff --git a/securedrop_export/print/status.py b/securedrop_export/print/status.py new file mode 100644 index 0000000..5ec81c8 --- /dev/null +++ b/securedrop_export/print/status.py @@ -0,0 +1,22 @@ +from securedrop_export.status import BaseStatus + + +class Status(BaseStatus): + + # Printer preflight related errors + ERROR_MULTIPLE_PRINTERS_FOUND = "ERROR_MULTIPLE_PRINTERS_FOUND" + ERROR_PRINTER_NOT_FOUND = "ERROR_PRINTER_NOT_FOUND" + ERROR_PRINTER_NOT_SUPPORTED = "ERROR_PRINTER_NOT_SUPPORTED" + ERROR_PRINTER_DRIVER_UNAVAILABLE = "ERROR_PRINTER_DRIVER_UNAVAILABLE" + ERROR_PRINTER_INSTALL = "ERROR_PRINTER_INSTALL" + ERROR_PRINTER_URI = "ERROR_PRINTER_URI" # new + + # Print error + ERROR_PRINT = "ERROR_PRINT" + + # New + PREFLIGHT_SUCCESS = "PRINTER_PREFLIGHT_SUCCESS" + TEST_SUCCESS = "PRINTER_TEST_SUCCESS" + PRINT_SUCCESS = "PRINTER_SUCCESS" + + ERROR_UNKNOWN = "ERROR_GENERIC" # Unknown printer error, backwards-compatible diff --git a/securedrop_export/status.py b/securedrop_export/status.py new file mode 100644 index 0000000..bc3d29d --- /dev/null +++ b/securedrop_export/status.py @@ -0,0 +1,12 @@ +from enum import Enum + + +class BaseStatus(Enum): + """ + Base class for export and print statuses. A Status represents a string that can be returned + to the calling VM via stderr to provide diagnostic information about the success of a call. + Status values are defined in subclasses in their respective packages. A full list is available + in the project's README. + """ + + pass diff --git a/setup.py b/setup.py index d215149..b04979d 100644 --- a/setup.py +++ b/setup.py @@ -32,6 +32,6 @@ "Operating System :: OS Independent", ), entry_points={ - "console_scripts": ["send-to-usb = securedrop_export.entrypoint:start"] + "console_scripts": ["send-to-usb = securedrop_export.main:entrypoint"] }, ) diff --git a/tests/disk/test_actions.py b/tests/disk/test_actions.py deleted file mode 100644 index 7d5d24d..0000000 --- a/tests/disk/test_actions.py +++ /dev/null @@ -1,215 +0,0 @@ -from unittest import mock - -import os -import pytest -import sys - -from subprocess import CalledProcessError - -from securedrop_export import export -from securedrop_export.disk.actions import DiskExportAction, DiskTestAction - -TEST_CONFIG = os.path.join(os.path.dirname(__file__), "sd-export-config.json") -SAMPLE_OUTPUT_NO_PART = b"disk\ncrypt" # noqa -SAMPLE_OUTPUT_ONE_PART = b"disk\npart\ncrypt" # noqa -SAMPLE_OUTPUT_MULTI_PART = b"disk\npart\npart\npart\ncrypt" # noqa -SAMPLE_OUTPUT_USB = b"/dev/sda" # noqa - - -def test_usb_precheck_disconnected(capsys, mocker): - """Tests the scenario where there are disks connected, but none of them are USB""" - submission = export.SDExport("testfile", TEST_CONFIG) - action = DiskTestAction(submission) - expected_message = "USB_NOT_CONNECTED" - assert export.ExportStatus.USB_NOT_CONNECTED.value == expected_message - - # Popen call returns lsblk output - command_output = mock.MagicMock() - command_output.stdout = mock.MagicMock() - command_output.stdout.readlines = mock.MagicMock( - return_value=[b"sda disk\n", b"sdb disk\n"] - ) - mocker.patch("subprocess.Popen", return_value=command_output) - - # check_output returns removable status - mocker.patch("subprocess.check_output", return_value=[b"0\n", b"0\n"]) - - mocked_exit = mocker.patch.object(submission, "exit_gracefully", return_value=0) - - mocker.patch( - "subprocess.check_output", side_effect=CalledProcessError(1, "check_output") - ) - - action.check_usb_connected(exit=True) - - mocked_exit.assert_called_once_with(expected_message) - assert action.device is None - - -def test_usb_precheck_connected(capsys, mocker): - """Tests the scenario where there is one USB connected""" - submission = export.SDExport("testfile", TEST_CONFIG) - action = DiskTestAction(submission) - - # Popen call returns lsblk output - command_output = mock.MagicMock() - command_output.stdout = mock.MagicMock() - command_output.stdout.readlines = mock.MagicMock(return_value=[b"sdb disk\n"]) - mocker.patch("subprocess.Popen", return_value=command_output) - - # check_output returns removable status - mocker.patch("subprocess.check_output", return_value=b"1\n") - - expected_message = "USB_CONNECTED" - assert export.ExportStatus.USB_CONNECTED.value == expected_message - mocked_exit = mocker.patch.object(submission, "exit_gracefully", return_value=0) - - action.check_usb_connected(exit=True) - - mocked_exit.assert_called_once_with(expected_message) - assert action.device == "/dev/sdb" - - -def test_usb_precheck_multiple_devices_connected(capsys, mocker): - """Tests the scenario where there are multiple USB drives connected""" - submission = export.SDExport("testfile", TEST_CONFIG) - action = DiskTestAction(submission) - - # Popen call returns lsblk output - command_output = mock.MagicMock() - command_output.stdout = mock.MagicMock() - command_output.stdout.readlines = mock.MagicMock( - return_value=[b"sdb disk\n", b"sdc disk\n"] - ) - mocker.patch("subprocess.Popen", return_value=command_output) - - # check_output returns removable status - mocker.patch("subprocess.check_output", return_value=b"1\n") - - expected_message = "ERROR_GENERIC" - assert export.ExportStatus.ERROR_GENERIC.value == expected_message - mocked_exit = mocker.patch.object(submission, "exit_gracefully", return_value=0) - - action.check_usb_connected(exit=True) - - mocked_exit.assert_called_once_with(expected_message) - assert action.device is None - - -@mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_NO_PART) -def test_extract_device_name_no_part(mocked_call, capsys): - submission = export.SDExport("testfile", TEST_CONFIG) - action = DiskExportAction(submission) - - action.device = "/dev/sda" - - action.set_extracted_device_name() - - assert action.device == "/dev/sda" - - -@mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_ONE_PART) -def test_extract_device_name_single_part(mocked_call, capsys): - submission = export.SDExport("testfile", TEST_CONFIG) - action = DiskExportAction(submission) - - action.device = "/dev/sda" - - action.set_extracted_device_name() - - assert action.device == "/dev/sda1" - - -@mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_MULTI_PART) -def test_extract_device_name_multiple_part(mocked_call, capsys, mocker): - submission = export.SDExport("testfile", TEST_CONFIG) - action = DiskExportAction(submission) - action.device = "/dev/sda" - mocked_exit = mocker.patch.object(submission, "exit_gracefully", return_value=0) - expected_message = export.ExportStatus.USB_ENCRYPTION_NOT_SUPPORTED.value - - action.set_extracted_device_name() - - mocked_exit.assert_called_once_with(expected_message) - - -@mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_NO_PART) -def test_luks_precheck_encrypted_fde(mocked_call, capsys, mocker): - submission = export.SDExport("testfile", TEST_CONFIG) - action = DiskExportAction(submission) - - command_output = mock.MagicMock() - command_output.stderr = b"" - mocker.patch("subprocess.run", return_value=command_output) - - expected_message = export.ExportStatus.USB_ENCRYPTED.value - mocked_exit = mocker.patch.object(submission, "exit_gracefully", return_value=0) - - action.check_luks_volume() - - mocked_exit.assert_called_once_with(expected_message) - - -@mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_ONE_PART) -def test_luks_precheck_encrypted_single_part(mocked_call, capsys, mocker): - submission = export.SDExport("testfile", TEST_CONFIG) - action = DiskExportAction(submission) - action.device = "/dev/sda" - expected_message = export.ExportStatus.USB_ENCRYPTED.value - mocked_exit = mocker.patch.object(submission, "exit_gracefully", return_value=0) - - command_output = mock.MagicMock() - command_output.stderr = b"" - mocker.patch("subprocess.run", return_value=command_output) - - action.check_luks_volume() - - mocked_exit.assert_called_once_with(expected_message) - - -@mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_MULTI_PART) -def test_luks_precheck_encrypted_multi_part(mocked_call, capsys, mocker): - submission = export.SDExport("testfile", TEST_CONFIG) - action = DiskExportAction(submission) - action.device = "/dev/sda" - expected_message = export.ExportStatus.USB_ENCRYPTION_NOT_SUPPORTED.value - - # Here we need to mock the exit_gracefully method with a side effect otherwise - # program execution will continue after exit_gracefully and exit_gracefully - # may be called a second time. - mocked_exit = mocker.patch.object( - submission, "exit_gracefully", side_effect=lambda x: sys.exit(0) - ) - - # Output of `lsblk -o TYPE --noheadings DEVICE_NAME` when a drive has multiple - # partitions - multi_partition_lsblk_output = b"disk\npart\npart\n" - mocker.patch("subprocess.check_output", return_value=multi_partition_lsblk_output) - - with pytest.raises(SystemExit): - action.check_luks_volume() - - mocked_exit.assert_called_once_with(expected_message) - - -@mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_ONE_PART) -def test_luks_precheck_encrypted_luks_error(mocked_call, capsys, mocker): - submission = export.SDExport("testfile", TEST_CONFIG) - action = DiskExportAction(submission) - action.device = "/dev/sda" - expected_message = "USB_ENCRYPTION_NOT_SUPPORTED" - assert expected_message == export.ExportStatus.USB_ENCRYPTION_NOT_SUPPORTED.value - - mocked_exit = mocker.patch.object( - submission, "exit_gracefully", side_effect=lambda msg, e: sys.exit(0) - ) - - single_partition_lsblk_output = b"disk\npart\n" - mocker.patch("subprocess.check_output", return_value=single_partition_lsblk_output) - mocker.patch("subprocess.run", side_effect=CalledProcessError(1, "run")) - - with pytest.raises(SystemExit): - action.check_luks_volume() - - assert mocked_exit.mock_calls[0][2]["msg"] == expected_message - assert mocked_exit.mock_calls[0][2]["e"] is None diff --git a/tests/disk/test_cli.py b/tests/disk/test_cli.py new file mode 100644 index 0000000..310e4b2 --- /dev/null +++ b/tests/disk/test_cli.py @@ -0,0 +1,573 @@ +import pytest +from unittest import mock + +import subprocess + +from securedrop_export.disk.cli import CLI +from securedrop_export.disk.volume import EncryptionScheme, Volume +from securedrop_export.exceptions import ExportException +from securedrop_export.disk.status import Status + +from securedrop_export.archive import Archive + +_DEFAULT_USB_DEVICE = "/dev/sda" +_DEFAULT_USB_DEVICE_ONE_PART = "/dev/sda1" + +_PRETEND_LUKS_ID = "luks-id-123456" + +# Sample stdout from shell commands +_SAMPLE_OUTPUT_NO_PART = b"disk\ncrypt" # noqa +_SAMPLE_OUTPUT_ONE_PART = b"disk\npart\ncrypt" # noqa +_SAMPLE_OUTPUT_MULTI_PART = b"disk\npart\npart\npart\ncrypt" # noqa +_SAMPLE_OUTPUT_USB = b"/dev/sda" # noqa + +_SAMPLE_LUKS_HEADER = b"\n\nUUID:\t123456-DEADBEEF" # noqa + + +class TestCli: + """ + Test the CLI wrapper that handless identification and locking/unlocking of + USB volumes. + """ + + @classmethod + def setup_class(cls): + cls.cli = CLI() + + @classmethod + def teardown_class(cls): + cls.cli = None + + def _setup_usb_devices(self, mocker, disks, is_removable): + """ + Helper function to set up mocked shell calls representing + the search for attached USB devices. + The original calls are `lsblk | grep disk` and + `cat /sys/class/block/{disk}/removable` + + Parameters: + disks (byte array): Array of disk names separated by newline. + is_removable (byte array): Array of removable status results (1 for removable), + separated by newline + """ + + # Patch commandline calls to `lsblk | grep disk` + command_output = mock.MagicMock() + command_output.stdout = mock.MagicMock() + command_output.stdout.readlines = mock.MagicMock(return_value=disks) + mocker.patch("subprocess.Popen", return_value=command_output) + + # Patch commandline call to 'cat /sys/class/block/{device}/removable' + + # Using side_effect with an iterable allows for different return value each time, + # which matches what would happen if iterating through list of devices + mocker.patch("subprocess.check_output", side_effect=is_removable) + + def test_get_connected_devices(self, mocker): + disks = [b"sda disk\n", b"sdb disk\n"] + removable = [b"1\n", b"1\n"] + + self._setup_usb_devices(mocker, disks, removable) + result = self.cli.get_connected_devices() + + assert result[0] == "/dev/sda" and result[1] == "/dev/sdb" + + @mock.patch( + "subprocess.check_output", + side_effect=subprocess.CalledProcessError(1, "check_output"), + ) + def test_get_removable_devices_none_removable(self, mocker): + disks = [b"sda disk\n", b"sdb disk\n"] + removable = [b"0\n", b"0\n"] + + self._setup_usb_devices(mocker, disks, removable) + + result = self.cli._get_removable_devices(disks) + assert len(result) == 0 + + @mock.patch( + "subprocess.Popen", side_effect=subprocess.CalledProcessError(1, "Popen") + ) + def test_get_connected_devices_error(self, mocked_subprocess): + + with pytest.raises(ExportException): + self.cli.get_connected_devices() + + @mock.patch("subprocess.check_output", return_value=_SAMPLE_OUTPUT_NO_PART) + def test_get_partitioned_device_no_partition(self, mocked_call): + assert ( + self.cli.get_partitioned_device(_DEFAULT_USB_DEVICE) == _DEFAULT_USB_DEVICE + ) + + @mock.patch("subprocess.check_output", return_value=_SAMPLE_OUTPUT_ONE_PART) + def test_get_partitioned_device_one_partition(self, mocked_call): + assert ( + self.cli.get_partitioned_device(_DEFAULT_USB_DEVICE) + == _DEFAULT_USB_DEVICE + "1" + ) + + @mock.patch("subprocess.check_output", return_value=_SAMPLE_OUTPUT_MULTI_PART) + def test_get_partitioned_device_multi_partition(self, mocked_call): + + with pytest.raises(ExportException) as ex: + self.cli.get_partitioned_device(_SAMPLE_OUTPUT_MULTI_PART) + + assert ex.value.sdstatus is Status.INVALID_DEVICE_DETECTED + + @mock.patch("subprocess.check_output", return_value=None) + def test_get_partitioned_device_lsblk_error(self, mocked_subprocess): + with pytest.raises(ExportException) as ex: + self.cli.get_partitioned_device(_SAMPLE_OUTPUT_ONE_PART) + + assert ex.value.sdstatus is Status.DEVICE_ERROR + + @mock.patch( + "subprocess.check_output", + side_effect=subprocess.CalledProcessError(1, "check_output"), + ) + def test_get_partitioned_device_multi_partition_error(self, mocked_call): + + # Make sure we wrap CalledProcessError and throw our own exception + with pytest.raises(ExportException) as ex: + self.cli.get_partitioned_device(_DEFAULT_USB_DEVICE) + + assert ex.value.sdstatus is Status.DEVICE_ERROR + + @mock.patch("subprocess.check_call", return_value=0) + def test_is_luks_volume_true(self, mocked_call): + + # `sudo cryptsetup isLuks` returns 0 if true + assert self.cli.is_luks_volume(_SAMPLE_OUTPUT_ONE_PART) + + @mock.patch( + "subprocess.check_call", + side_effect=subprocess.CalledProcessError(1, "check_call"), + ) + def test_is_luks_volume_false(self, mocked_subprocess): + + # `sudo cryptsetup isLuks` returns 1 if false; CalledProcessError is thrown + assert not self.cli.is_luks_volume(_SAMPLE_OUTPUT_ONE_PART) + + @mock.patch("subprocess.check_output", return_value=_SAMPLE_LUKS_HEADER) + def test__get_luks_name_from_headers(self, mocked_subprocess): + result = self.cli._get_luks_name_from_headers(_DEFAULT_USB_DEVICE) + + assert result is not None and result.split("-")[ + 1 + ] in _SAMPLE_LUKS_HEADER.decode("utf8") + + @mock.patch( + "subprocess.check_output", return_value=b"corrupted-or-invalid-header\n" + ) + def test__get_luks_name_from_headers_error_invalid(self, mocked_subprocess): + + with pytest.raises(ExportException) as ex: + self.cli._get_luks_name_from_headers(_DEFAULT_USB_DEVICE) + + assert ex.value.sdstatus is Status.INVALID_DEVICE_DETECTED + + @mock.patch("subprocess.check_output", return_value=b"\n") + def test__get_luks_name_from_headers_error_no_header(self, mocked_subprocess): + + with pytest.raises(ExportException) as ex: + self.cli._get_luks_name_from_headers(_DEFAULT_USB_DEVICE) + + assert ex.value.sdstatus is Status.INVALID_DEVICE_DETECTED + + @mock.patch("subprocess.check_output", return_value=None) + def test__get_luks_name_from_headers_error_nothing_returned( + self, mocked_subprocess + ): + + with pytest.raises(ExportException) as ex: + self.cli._get_luks_name_from_headers(_DEFAULT_USB_DEVICE) + + assert ex.value.sdstatus is Status.INVALID_DEVICE_DETECTED + + @mock.patch( + "subprocess.check_output", + side_effect=subprocess.CalledProcessError(1, "check_output"), + ) + def test__get_luks_name_from_headers_error(self, mocked_subprocess): + with pytest.raises(ExportException): + self.cli._get_luks_name_from_headers(_DEFAULT_USB_DEVICE) + + @mock.patch("os.path.exists", return_value=True) + @mock.patch("subprocess.check_output", return_value=_SAMPLE_LUKS_HEADER) + def test_get_luks_volume_already_unlocked(self, mocked_subprocess, mocked_os_call): + result = self.cli.get_luks_volume(_DEFAULT_USB_DEVICE_ONE_PART) + + assert result.encryption is EncryptionScheme.LUKS + assert result.unlocked + + @mock.patch("os.path.exists", return_value=False) + @mock.patch("subprocess.check_output", return_value=_SAMPLE_LUKS_HEADER) + def test_get_luks_volume_still_locked(self, mocked_subprocess, mocked_os_call): + result = self.cli.get_luks_volume(_DEFAULT_USB_DEVICE_ONE_PART) + + assert result.encryption is EncryptionScheme.LUKS + assert not result.unlocked + + @mock.patch( + "subprocess.check_output", + side_effect=subprocess.CalledProcessError("check_output", 1), + ) + def test_get_luks_volume_error(self, mocked_subprocess): + with pytest.raises(ExportException) as ex: + self.cli.get_luks_volume(_DEFAULT_USB_DEVICE_ONE_PART) + + assert ex.value.sdstatus is Status.DEVICE_ERROR + + @mock.patch("os.path.exists", return_value=True) + def test_unlock_luks_volume_success(self, mock_path, mocker): + mock_popen = mocker.MagicMock() + mock_popen_communicate = mocker.MagicMock() + mock_popen.returncode = 0 + + mocker.patch("subprocess.Popen", return_value=mock_popen) + mocker.patch( + "subprocess.Popen.communicate", return_value=mock_popen_communicate + ) + + mapped_name = "luks-id-123456" + vol = Volume( + device_name=_DEFAULT_USB_DEVICE, + mapped_name=mapped_name, + encryption=EncryptionScheme.LUKS, + ) + key = "a_key&_!" + result = self.cli.unlock_luks_volume(vol, key) + assert result.unlocked + + @mock.patch("os.path.exists", return_value=True) + def test_unlock_luks_volume_not_luks(self, mocker): + mock_popen = mocker.MagicMock() + mock_popen.communicate = mocker.MagicMock() + mock_popen.communicate.returncode = 1 # An error unlocking + + mocker.patch("subprocess.Popen", mock_popen) + + vol = Volume( + device_name=_DEFAULT_USB_DEVICE, + mapped_name=_PRETEND_LUKS_ID, + encryption=EncryptionScheme.UNKNOWN, + ) + key = "a key!" + + with pytest.raises(ExportException) as ex: + self.cli.unlock_luks_volume(vol, key) + + assert ex.value.sdstatus is Status.DEVICE_ERROR + + def test_unlock_luks_volume_passphrase_failure(self, mocker): + mock_popen = mocker.MagicMock() + mock_popen.communicate = mocker.MagicMock() + mock_popen.communicate.returncode = 1 # An error unlocking + + mocker.patch("subprocess.Popen", mock_popen) + + vol = Volume( + device_name=_DEFAULT_USB_DEVICE, + mapped_name=_PRETEND_LUKS_ID, + encryption=EncryptionScheme.LUKS, + ) + key = "a key!" + + with pytest.raises(ExportException): + self.cli.unlock_luks_volume(vol, key) + + @mock.patch( + "subprocess.Popen", side_effect=subprocess.CalledProcessError("1", "Popen") + ) + def test_unlock_luks_volume_luksOpen_exception(self, mocked_subprocess): + pd = Volume( + device_name=_DEFAULT_USB_DEVICE, + mapped_name=_PRETEND_LUKS_ID, + encryption=EncryptionScheme.LUKS, + ) + key = "a key!" + + with pytest.raises(ExportException) as ex: + self.cli.unlock_luks_volume(pd, key) + + assert ex.value.sdstatus is Status.DEVICE_ERROR + + @mock.patch("os.path.exists", return_value=True) + @mock.patch("subprocess.check_output", return_value=b"\n") + @mock.patch("subprocess.check_call", return_value=0) + def test_mount_volume(self, mocked_call, mocked_output, mocked_path): + vol = Volume( + device_name=_DEFAULT_USB_DEVICE_ONE_PART, + mapped_name=_PRETEND_LUKS_ID, + encryption=EncryptionScheme.LUKS, + ) + self.cli.mount_volume(vol) + assert vol.mountpoint is self.cli._DEFAULT_MOUNTPOINT + + @mock.patch("os.path.exists", return_value=True) + @mock.patch( + "subprocess.check_output", return_value=b"/dev/pretend/luks-id-123456\n" + ) + @mock.patch("subprocess.check_call", return_value=0) + def test_mount_volume_already_mounted( + self, mocked_output, mocked_call, mocked_path + ): + md = Volume( + device_name=_DEFAULT_USB_DEVICE_ONE_PART, + mapped_name=_PRETEND_LUKS_ID, + encryption=EncryptionScheme.LUKS, + ) + result = self.cli.mount_volume(md) + assert result.mountpoint == "/dev/pretend/luks-id-123456" + + @mock.patch("os.path.exists", return_value=True) + @mock.patch("subprocess.check_output", return_value=b"\n") + @mock.patch("subprocess.check_call", return_value=0) + def test_mount_volume_mkdir(self, mocked_output, mocked_subprocess, mocked_path): + md = Volume( + device_name=_DEFAULT_USB_DEVICE_ONE_PART, + mapped_name=_PRETEND_LUKS_ID, + encryption=EncryptionScheme.LUKS, + ) + assert self.cli.mount_volume(md).mapped_name == _PRETEND_LUKS_ID + + @mock.patch("subprocess.check_output", return_value=b"\n") + @mock.patch( + "subprocess.check_call", + side_effect=subprocess.CalledProcessError(1, "check_call"), + ) + def test_mount_volume_error(self, mocked_subprocess, mocked_output): + md = Volume( + device_name=_DEFAULT_USB_DEVICE_ONE_PART, + mapped_name=_PRETEND_LUKS_ID, + encryption=EncryptionScheme.LUKS, + ) + + with pytest.raises(ExportException) as ex: + self.cli.mount_volume(md) + + assert ex.value.sdstatus is Status.ERROR_MOUNT + + @mock.patch("os.path.exists", return_value=False) + @mock.patch( + "subprocess.check_call", + side_effect=subprocess.CalledProcessError(1, "check_call"), + ) + def test_mount_at_mountpoint_mkdir_error(self, mocked_subprocess, mocked_path): + md = Volume( + device_name=_DEFAULT_USB_DEVICE_ONE_PART, + mapped_name=_PRETEND_LUKS_ID, + encryption=EncryptionScheme.LUKS, + ) + + with pytest.raises(ExportException) as ex: + volume = self.cli._mount_at_mountpoint(md, self.cli._DEFAULT_MOUNTPOINT) + assert not volume.writable + + assert ex.value.sdstatus is Status.ERROR_MOUNT + + @mock.patch("os.path.exists", return_value=True) + @mock.patch( + "subprocess.check_call", + side_effect=subprocess.CalledProcessError(1, "check_call"), + ) + def test_mount_at_mountpoint_mounting_error(self, mocked_subprocess, mocked_path): + md = Volume( + device_name=_DEFAULT_USB_DEVICE_ONE_PART, + mapped_name=_PRETEND_LUKS_ID, + encryption=EncryptionScheme.LUKS, + ) + + with pytest.raises(ExportException) as ex: + volume = self.cli._mount_at_mountpoint(md, self.cli._DEFAULT_MOUNTPOINT) + assert not volume.writable + + assert ex.value.sdstatus is Status.ERROR_MOUNT + + @mock.patch("os.path.exists", return_value=True) + @mock.patch("subprocess.check_call", return_value=0) + def test__unmount_volume(self, mocked_subprocess, mocked_mountpath): + mounted = Volume( + device_name=_DEFAULT_USB_DEVICE_ONE_PART, + mapped_name=_PRETEND_LUKS_ID, + mountpoint=self.cli._DEFAULT_MOUNTPOINT, + encryption=EncryptionScheme.LUKS, + ) + + result = self.cli._unmount_volume(mounted) + assert result.mountpoint is None + + @mock.patch("os.path.exists", return_value=True) + @mock.patch( + "subprocess.check_call", + side_effect=subprocess.CalledProcessError(1, "check_call"), + ) + def test__unmount_volume_error(self, mocked_subprocess, mocked_mountpath): + mounted = Volume( + device_name=_DEFAULT_USB_DEVICE_ONE_PART, + mapped_name=_PRETEND_LUKS_ID, + mountpoint=self.cli._DEFAULT_MOUNTPOINT, + encryption=EncryptionScheme.LUKS, + ) + + with pytest.raises(ExportException) as ex: + self.cli._unmount_volume(mounted) + + assert ex.value.sdstatus is Status.DEVICE_ERROR + + @mock.patch("os.path.exists", return_value=True) + @mock.patch("subprocess.check_call", return_value=0) + def test__close_luks_volume(self, mocked_subprocess, mocked_os_call): + mapped = Volume( + device_name=_DEFAULT_USB_DEVICE_ONE_PART, + mapped_name=_PRETEND_LUKS_ID, + encryption=EncryptionScheme.LUKS, + ) + + # If call completes without error, drive was successfully closed with luksClose + self.cli._close_luks_volume(mapped) + + @mock.patch("os.path.exists", return_value=True) + @mock.patch( + "subprocess.check_call", + side_effect=subprocess.CalledProcessError(1, "check_call"), + ) + def test__close_luks_volume_error(self, mocked_subprocess, mocked_os_call): + mapped = Volume( + device_name=_DEFAULT_USB_DEVICE_ONE_PART, + mapped_name=_PRETEND_LUKS_ID, + encryption=EncryptionScheme.LUKS, + ) + + with pytest.raises(ExportException) as ex: + self.cli._close_luks_volume(mapped) + + assert ex.value.sdstatus is Status.DEVICE_ERROR + + @mock.patch( + "subprocess.check_call", + side_effect=subprocess.CalledProcessError(1, "check_call"), + ) + def test__remove_temp_directory_error(self, mocked_subprocess): + with pytest.raises(ExportException): + self.cli._remove_temp_directory("tmp") + + @mock.patch("subprocess.check_call", return_value=0) + def test_write_to_disk(self, mock_check_call): + # Temporarily patch a method, to later assert it is called + patch = mock.patch.object(self.cli, "cleanup_drive_and_tmpdir") + patch.return_value = mock.MagicMock() + patch.start() + + vol = Volume( + device_name=_DEFAULT_USB_DEVICE_ONE_PART, + mapped_name=_PRETEND_LUKS_ID, + mountpoint=self.cli._DEFAULT_MOUNTPOINT, + encryption=EncryptionScheme.LUKS, + ) + + submission = Archive("testfile") + + self.cli.write_data_to_device(submission.tmpdir, submission.target_dirname, vol) + self.cli.cleanup_drive_and_tmpdir.assert_called_once() + + # Don't want to patch it indefinitely though, that will mess with the other tests + patch.stop() + + @mock.patch( + "subprocess.check_call", + side_effect=subprocess.CalledProcessError(1, "check_call"), + ) + def test_write_to_disk_error_still_does_cleanup(self, mock_call): + # see above - patch internal method only for this test + patch = mock.patch.object(self.cli, "cleanup_drive_and_tmpdir") + patch.return_value = mock.MagicMock() + patch.start() + + vol = Volume( + device_name=_DEFAULT_USB_DEVICE_ONE_PART, + mapped_name=_PRETEND_LUKS_ID, + mountpoint=self.cli._DEFAULT_MOUNTPOINT, + encryption=EncryptionScheme.LUKS, + ) + submission = Archive("testfile") + + with pytest.raises(ExportException): + self.cli.write_data_to_device( + submission.tmpdir, submission.target_dirname, vol + ) + self.cli.cleanup_drive_and_tmpdir.assert_called_once() + + patch.stop() + + @mock.patch( + "subprocess.check_call", + side_effect=subprocess.CalledProcessError(1, "check_call"), + ) + def test_cleanup_drive_and_tmpdir_error(self, mocked_subprocess): + submission = Archive("testfile") + mock_volume = mock.MagicMock(Volume) + + with pytest.raises(ExportException) as ex: + self.cli.cleanup_drive_and_tmpdir(mock_volume, submission.tmpdir) + assert ex.value.sdstatus is Status.ERROR_EXPORT_CLEANUP + + @mock.patch("os.path.exists", return_value=False) + @mock.patch("subprocess.check_call", return_value=0) + def test_cleanup_drive_and_tmpdir(self, mock_subprocess, mocked_path): + submission = Archive("testfile") + vol = Volume( + device_name=_DEFAULT_USB_DEVICE_ONE_PART, + mapped_name=_PRETEND_LUKS_ID, + mountpoint=self.cli._DEFAULT_MOUNTPOINT, + encryption=EncryptionScheme.LUKS, + ) + + close_patch = mock.patch.object(self.cli, "_close_luks_volume") + remove_tmpdir_patch = mock.patch.object(self.cli, "_remove_temp_directory") + + close_mock = close_patch.start() + rm_tpdir_mock = remove_tmpdir_patch.start() + + # That was all setup. Here's our test + self.cli.cleanup_drive_and_tmpdir(vol, submission.tmpdir) + + close_mock.assert_called_once_with(vol) + rm_tpdir_mock.assert_called_once_with(submission.tmpdir) + + # Undo patch changes + close_patch.stop() + remove_tmpdir_patch.stop() + + @mock.patch( + "subprocess.check_output", + side_effect=subprocess.CalledProcessError(1, "check_output"), + ) + def test_mountpoint_error(self, mock_subprocess): + with pytest.raises(ExportException) as ex: + self.cli._get_mountpoint( + Volume( + device_name=_DEFAULT_USB_DEVICE, + mapped_name=_PRETEND_LUKS_ID, + encryption=EncryptionScheme.LUKS, + ) + ) + + assert ex.value.sdstatus is Status.ERROR_MOUNT + + @mock.patch("os.path.exists", return_value=False) + def test_mount_mkdir_fails(self, mocked_path): + mock_mountpoint = mock.patch.object(self.cli, "_get_mountpoint") + mock_mountpoint.return_value = None + + vol = Volume( + device_name=_DEFAULT_USB_DEVICE_ONE_PART, + mapped_name=_PRETEND_LUKS_ID, + encryption=EncryptionScheme.LUKS, + ) + mock.patch.object(vol, "unlocked", return_value=True) + + with pytest.raises(ExportException) as ex: + self.cli.mount_volume(vol) + + assert ex.value.sdstatus is Status.ERROR_MOUNT diff --git a/tests/disk/test_service.py b/tests/disk/test_service.py new file mode 100644 index 0000000..17ad326 --- /dev/null +++ b/tests/disk/test_service.py @@ -0,0 +1,203 @@ +import pytest +from unittest import mock +import os +import tempfile + +from securedrop_export.exceptions import ExportException +from securedrop_export.disk.legacy_status import Status as LegacyStatus +from securedrop_export.disk.status import Status as Status +from securedrop_export.disk.volume import Volume, EncryptionScheme +from securedrop_export.archive import Archive, Metadata +from securedrop_export.disk.legacy_service import Service +from securedrop_export.disk.cli import CLI + +SAMPLE_OUTPUT_LSBLK_NO_PART = b"disk\ncrypt" # noqa +SAMPLE_OUTPUT_USB = "/dev/sda" # noqa +SAMPLE_OUTPUT_USB_PARTITIONED = "/dev/sda1" + + +class TestExportService: + @classmethod + def setup_class(cls): + cls.mock_cli = mock.MagicMock(CLI) + cls.mock_submission = cls._setup_submission() + + cls.mock_luks_volume_unmounted = Volume( + device_name=SAMPLE_OUTPUT_USB, + mapped_name="fake-luks-id-123456", + encryption=EncryptionScheme.LUKS, + ) + cls.mock_luks_volume_mounted = Volume( + device_name=SAMPLE_OUTPUT_USB, + mapped_name="fake-luks-id-123456", + mountpoint="/media/usb", + encryption=EncryptionScheme.LUKS, + ) + + cls.service = Service(cls.mock_submission, cls.mock_cli) + + @classmethod + def teardown_class(cls): + cls.mock_cli = None + cls.mock_submission = None + cls.service = None + + @classmethod + def _setup_submission(cls) -> Archive: + """ + Helper method to set up sample archive + """ + submission = Archive("testfile") + temp_folder = tempfile.mkdtemp() + metadata = os.path.join(temp_folder, Metadata.METADATA_FILE) + with open(metadata, "w") as f: + f.write( + '{"device": "disk", "encryption_method":' + ' "luks", "encryption_key": "hunter1"}' + ) + + return submission.set_metadata(Metadata(temp_folder).validate()) + + def setup_method(self, method): + """ + By default, mock CLI will return the "happy path" of a correctly-formatted LUKS drive. + Override this behaviour in the target method as required, for example to simulate CLI + errors. `teardown_method()` will reset the side effects so they do not affect subsequent + test methods. + """ + self.mock_cli.get_connected_devices.return_value = [SAMPLE_OUTPUT_USB] + self.mock_cli.get_partitioned_device.return_value = ( + SAMPLE_OUTPUT_USB_PARTITIONED + ) + self.mock_cli.get_luks_volume.return_value = self.mock_luks_volume_unmounted + self.mock_cli.mount_volume.return_value = self.mock_luks_volume_mounted + + def teardown_method(self, method): + self.mock_cli.reset_mock(return_value=True, side_effect=True) + + def test_check_usb(self): + status = self.service.check_connected_devices() + + assert status is LegacyStatus.LEGACY_USB_CONNECTED + + def test_no_devices_connected(self): + self.mock_cli.get_connected_devices.return_value = [] + with pytest.raises(ExportException) as ex: + self.service.check_connected_devices() + + assert ex.value.sdstatus is LegacyStatus.LEGACY_USB_NOT_CONNECTED + + def test_too_many_devices_connected(self): + self.mock_cli.get_connected_devices.return_value = [ + SAMPLE_OUTPUT_USB, + "/dev/sdb", + ] + with pytest.raises(ExportException) as ex: + self.service.check_connected_devices() + + assert ex.value.sdstatus is LegacyStatus.LEGACY_USB_ENCRYPTION_NOT_SUPPORTED + + def test_device_is_not_luks(self): + self.mock_cli.is_luks_volume.return_value = False + + # When VeraCrypt is supported, this will no longer be an exception + # and the return status will change + with pytest.raises(ExportException) as ex: + self.service.check_disk_format() + + assert ex.value.sdstatus is LegacyStatus.LEGACY_USB_ENCRYPTION_NOT_SUPPORTED + + def test_check_usb_error(self): + self.mock_cli.get_connected_devices.side_effect = ExportException( + sdstatus=LegacyStatus.LEGACY_ERROR_USB_CHECK + ) + + with pytest.raises(ExportException) as ex: + self.service.check_connected_devices() + + assert ex.value.sdstatus is LegacyStatus.LEGACY_ERROR_USB_CHECK + + def test_check_disk_format(self): + status = self.service.check_disk_format() + + assert status is LegacyStatus.LEGACY_USB_ENCRYPTED + + def test_check_disk_format_error(self): + self.mock_cli.get_partitioned_device.side_effect = ExportException( + sdstatus=Status.INVALID_DEVICE_DETECTED + ) + + with pytest.raises(ExportException) as ex: + self.service.check_disk_format() + + # We still return the legacy status for now + assert ex.value.sdstatus is LegacyStatus.LEGACY_USB_ENCRYPTION_NOT_SUPPORTED + + def test_export(self): + # Currently, a successful export does not return a success status. + # When the client is updated, this will change to assert EXPORT_SUCCESS + # is returned. + self.service.export() + + def test_export_disk_not_supported(self): + self.mock_cli.is_luks_volume.return_value = False + + with pytest.raises(ExportException) as ex: + self.service.export() + + assert ex.value.sdstatus is LegacyStatus.LEGACY_USB_ENCRYPTION_NOT_SUPPORTED + + def test_export_write_error(self): + self.mock_cli.is_luks_volume.return_value = True + self.mock_cli.write_data_to_device.side_effect = ExportException( + sdstatus=LegacyStatus.LEGACY_ERROR_USB_WRITE + ) + + with pytest.raises(ExportException) as ex: + self.service.export() + + assert ex.value.sdstatus is LegacyStatus.LEGACY_ERROR_USB_WRITE + + def test_export_throws_new_exception_return_legacy_status(self): + self.mock_cli.get_connected_devices.side_effect = ExportException( + sdstatus=Status.ERROR_MOUNT + ) + + with pytest.raises(ExportException) as ex: + self.service.export() + + assert ex.value.sdstatus is LegacyStatus.LEGACY_ERROR_USB_MOUNT + + @mock.patch("os.path.exists", return_value=True) + def test_write_error_returns_legacy_status(self, mock_path): + self.mock_cli.is_luks_volume.return_value = True + self.mock_cli.write_data_to_device.side_effect = ExportException( + sdstatus=Status.ERROR_EXPORT + ) + + with pytest.raises(ExportException) as ex: + self.service.export() + + assert ex.value.sdstatus is LegacyStatus.LEGACY_ERROR_USB_WRITE + + @mock.patch("os.path.exists", return_value=True) + def test_unlock_error_returns_legacy_status(self, mock_path): + self.mock_cli.unlock_luks_volume.side_effect = ExportException( + sdstatus=Status.ERROR_UNLOCK_LUKS + ) + + with pytest.raises(ExportException) as ex: + self.service.export() + + assert ex.value.sdstatus is LegacyStatus.LEGACY_USB_BAD_PASSPHRASE + + @mock.patch("os.path.exists", return_value=True) + def test_unexpected_error_returns_legacy_status_generic(self, mock_path): + self.mock_cli.unlock_luks_volume.side_effect = ExportException( + sdstatus=Status.DEVICE_ERROR + ) + + with pytest.raises(ExportException) as ex: + self.service.export() + + assert ex.value.sdstatus is LegacyStatus.LEGACY_ERROR_GENERIC diff --git a/tests/disk/test_volume.py b/tests/disk/test_volume.py new file mode 100644 index 0000000..f28e711 --- /dev/null +++ b/tests/disk/test_volume.py @@ -0,0 +1,56 @@ +from unittest import mock + +from securedrop_export.disk.volume import Volume, EncryptionScheme + + +class TestVolume: + def test_overwrite_valid_encryption_scheme(self): + volume = Volume( + device_name="/dev/sda", + mapped_name="pretend-luks-mapper-id", + encryption=EncryptionScheme.LUKS, + ) + assert volume.encryption is EncryptionScheme.LUKS + volume.encryption = None + assert volume.encryption is EncryptionScheme.UNKNOWN + + @mock.patch("os.path.exists", return_value=True) + def test_is_unlocked_true(self, mock_os_path): + volume = Volume( + device_name="/dev/sda1", + mapped_name="pretend-luks-mapper-id", + encryption=EncryptionScheme.LUKS, + ) + + assert volume.unlocked + + @mock.patch("os.path.exists", return_value=False) + def test_is_unlocked_false_no_path(self, mock_os_path): + volume = Volume( + device_name="/dev/sda1", + mapped_name="pretend-luks-mapper-id", + encryption=EncryptionScheme.LUKS, + ) + + assert not volume.unlocked + + @mock.patch("os.path.exists", return_value=True) + def test_writable_false(self, mock_os_path): + vol = Volume( + device_name="dev/sda1", + mapped_name="pretend-luks-id", + encryption=EncryptionScheme.LUKS, + ) + + assert not vol.writable + + @mock.patch("os.path.exists", return_value=True) + def test_writable(self, mock_os_path): + vol = Volume( + device_name="dev/sda1", + mapped_name="pretend-luks-id", + encryption=EncryptionScheme.LUKS, + mountpoint="/media/usb", + ) + + assert vol.writable diff --git a/tests/print/test_actions.py b/tests/print/test_actions.py deleted file mode 100644 index 37b2ea9..0000000 --- a/tests/print/test_actions.py +++ /dev/null @@ -1,140 +0,0 @@ -from unittest import mock - -import os -import pytest -from subprocess import CalledProcessError -import sys - -from securedrop_export import export -from securedrop_export.print.actions import PrintExportAction - - -SAMPLE_OUTPUT_NO_PRINTER = b"network beh\nnetwork https\nnetwork ipp\nnetwork ipps\nnetwork http\nnetwork\nnetwork ipp14\nnetwork lpd" # noqa -SAMPLE_OUTPUT_BROTHER_PRINTER = b"network beh\nnetwork https\nnetwork ipp\nnetwork ipps\nnetwork http\nnetwork\nnetwork ipp14\ndirect usb://Brother/HL-L2320D%20series?serial=A00000A000000\nnetwork lpd" # noqa -SAMPLE_OUTPUT_LASERJET_PRINTER = b"network beh\nnetwork https\nnetwork ipp\nnetwork ipps\nnetwork http\nnetwork\nnetwork ipp14\ndirect usb://HP/LaserJet%20Pro%20M404-M405?serial=A00000A000000\nnetwork lpd" # noqa -TEST_CONFIG = os.path.join(os.path.dirname(__file__), "sd-export-config.json") - - -@mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_BROTHER_PRINTER) -def test_get_good_printer_uri_laserjet(mocked_call): - submission = export.SDExport("testfile", TEST_CONFIG) - action = PrintExportAction(submission) - - result = action.get_printer_uri() - - assert result == "usb://Brother/HL-L2320D%20series?serial=A00000A000000" - - -@mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_LASERJET_PRINTER) -def test_get_good_printer_uri_brother(mocked_call): - submission = export.SDExport("testfile", TEST_CONFIG) - action = PrintExportAction(submission) - - result = action.get_printer_uri() - assert result == "usb://HP/LaserJet%20Pro%20M404-M405?serial=A00000A000000" - - -@mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_NO_PRINTER) -def test_get_bad_printer_uri(mocked_call, capsys, mocker): - submission = export.SDExport("testfile", TEST_CONFIG) - action = PrintExportAction(submission) - expected_message = "ERROR_PRINTER_NOT_FOUND" - assert export.ExportStatus.ERROR_PRINTER_NOT_FOUND.value == expected_message - mocked_exit = mocker.patch.object( - submission, "exit_gracefully", side_effect=lambda x: sys.exit(0) - ) - - with pytest.raises(SystemExit): - action.get_printer_uri() - - mocked_exit.assert_called_once_with(expected_message) - - -@pytest.mark.parametrize( - "open_office_paths", - [ - "/tmp/whatver/thisisadoc.doc" - "/home/user/Downloads/thisisadoc.xlsx" - "/home/user/Downloads/file.odt" - "/tmp/tmpJf83j9/secret.pptx" - ], -) -def test_is_open_office_file(capsys, open_office_paths): - submission = export.SDExport("", TEST_CONFIG) - action = PrintExportAction(submission) - assert action.is_open_office_file(open_office_paths) - - -@pytest.mark.parametrize( - "open_office_paths", - [ - "/tmp/whatver/thisisadoc.doccc" - "/home/user/Downloads/thisisa.xlsx.zip" - "/home/user/Downloads/file.odz" - "/tmp/tmpJf83j9/secret.gpg" - ], -) -def test_is_not_open_office_file(capsys, open_office_paths): - submission = export.SDExport("", TEST_CONFIG) - action = PrintExportAction(submission) - assert not action.is_open_office_file(open_office_paths) - - -@mock.patch("subprocess.run") -def test_install_printer_ppd_laserjet(mocker): - submission = export.SDExport("testfile", TEST_CONFIG) - action = PrintExportAction(submission) - ppd = action.install_printer_ppd( - "usb://HP/LaserJet%20Pro%20M404-M405?serial=A00000A00000" - ) - assert ppd == "/usr/share/cups/model/hp-laserjet_6l.ppd" - - -@mock.patch("subprocess.run") -def test_install_printer_ppd_brother(mocker): - submission = export.SDExport("testfile", TEST_CONFIG) - action = PrintExportAction(submission) - ppd = action.install_printer_ppd( - "usb://Brother/HL-L2320D%20series?serial=A00000A000000" - ) - assert ppd == "/usr/share/cups/model/br7030.ppd" - - -def test_install_printer_ppd_error_no_driver(mocker): - submission = export.SDExport("testfile", TEST_CONFIG) - action = PrintExportAction(submission) - mocked_exit = mocker.patch.object(submission, "exit_gracefully", return_value=0) - mocker.patch("subprocess.run", side_effect=CalledProcessError(1, "run")) - - action.install_printer_ppd( - "usb://HP/LaserJet%20Pro%20M404-M405?serial=A00000A000000" - ) - - assert mocked_exit.mock_calls[0][2]["msg"] == "ERROR_PRINTER_DRIVER_UNAVAILABLE" - assert mocked_exit.mock_calls[0][2]["e"] is None - - -def test_install_printer_ppd_error_not_supported(mocker): - submission = export.SDExport("testfile", TEST_CONFIG) - action = PrintExportAction(submission) - mocked_exit = mocker.patch.object(submission, "exit_gracefully", return_value=0) - mocker.patch("subprocess.run", side_effect=CalledProcessError(1, "run")) - - action.install_printer_ppd("usb://Not/Supported?serial=A00000A000000") - - assert mocked_exit.mock_calls[0][2]["msg"] == "ERROR_PRINTER_NOT_SUPPORTED" - - -def test_setup_printer_error(mocker): - submission = export.SDExport("testfile", TEST_CONFIG) - action = PrintExportAction(submission) - mocked_exit = mocker.patch.object(submission, "exit_gracefully", return_value=0) - mocker.patch("subprocess.run", side_effect=CalledProcessError(1, "run")) - - action.setup_printer( - "usb://Brother/HL-L2320D%20series?serial=A00000A000000", - "/usr/share/cups/model/br7030.ppd", - ) - - assert mocked_exit.mock_calls[0][2]["msg"] == "ERROR_PRINTER_INSTALL" - assert mocked_exit.mock_calls[0][2]["e"] is None diff --git a/tests/print/test_service.py b/tests/print/test_service.py new file mode 100644 index 0000000..ffaee68 --- /dev/null +++ b/tests/print/test_service.py @@ -0,0 +1,439 @@ +import pytest + +from unittest import mock +import os +import subprocess +from subprocess import CalledProcessError + +from securedrop_export.directory import safe_mkdir + +from securedrop_export.exceptions import ExportException +from securedrop_export.archive import Archive +from securedrop_export.print.service import Service +from securedrop_export.print.status import Status + +SAMPLE_OUTPUT_NO_PRINTER = b"network beh\nnetwork https\nnetwork ipp\nnetwork ipps\nnetwork http\nnetwork\nnetwork ipp14\nnetwork lpd" # noqa +SAMPLE_OUTPUT_BROTHER_PRINTER = b"network beh\nnetwork https\nnetwork ipp\nnetwork ipps\nnetwork http\nnetwork\nnetwork ipp14\ndirect usb://Brother/HL-L2320D%20series?serial=A00000A000000\nnetwork lpd" # noqa +SAMPLE_OUTPUT_LASERJET_PRINTER = b"network beh\nnetwork https\nnetwork ipp\nnetwork ipps\nnetwork http\nnetwork\nnetwork ipp14\ndirect usb://HP/LaserJet%20Pro%20M404-M405?serial=A00000A000000\nnetwork lpd" # noqa +SAMPLE_OUTPUT_UNSUPPORTED_PRINTER = b"network beh\nnetwork https\nnetwork ipp\nnetwork ipps\nnetwork http\nnetwork\nnetwork ipp14\ndirect usb://Canon/QL-700%?serial=A00000A000000\nnetwork lpd" # noqa + + +class TestPrint: + @classmethod + def setup_class(cls): + cls.submission = Archive("testfile") + cls.service = Service(cls.submission) + + # Set up files as if extracted from tarball + fp = os.path.join(cls.submission.tmpdir, "export_data") + if not os.path.exists(fp): + safe_mkdir(fp) + + for i in ["file1", "file2", "file3"]: + with open(f"{cls.submission.tmpdir}/export_data/{i}.txt", "a+") as file: + file.write(f"It's a pretend file {i}") + file.write("\n") + + @classmethod + def teardown_class(cls): + cls.service = None + cls.submission = None + + def setup_method(self): + self.service.printer_wait_timeout = self.service.PRINTER_WAIT_TIMEOUT + + @mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_BROTHER_PRINTER) + def test_get_good_printer_uri_laserjet(self, mocked_call): + assert ( + self.service._get_printer_uri() + == "usb://Brother/HL-L2320D%20series?serial=A00000A000000" + ) + + def test_service_initialized_correctly(self): + assert self.service.printer_wait_timeout == 60 + assert self.service.printer_name == "sdw-printer" + + def test_print_all_methods_called(self): + patch_setup = mock.patch.object(self.service, "_check_printer_setup") + patch_print = mock.patch.object(self.service, "_print_all_files") + + mock_setup = patch_setup.start() + mock_print = patch_print.start() + + self.service.print() + + # When the client can accept new status values, we will assert that the + # above call results in Status.PRINT_SUCCESS + assert mock_setup.called_once() + assert mock_print.called_once() + + patch_setup.stop() + patch_print.stop() + + @mock.patch("securedrop_export.print.service.Service._wait_for_print") + def test_printer_preflight_all_methods_called(self, mock_wait): + patch_setup = mock.patch.object(self.service, "_check_printer_setup") + + mock_setup = patch_setup.start() + + self.service.printer_preflight() + + # When the client can accept new status values, we will assert that the + # above call results in Status.PREFLIGHT_SUCCESS + assert mock_setup.called_once() + + patch_setup.stop() + + @mock.patch("securedrop_export.print.service.Service._wait_for_print") + def test_print_testpage_all_checks_called(self, mock_wait): + patch_setup = mock.patch.object(self.service, "_check_printer_setup") + patch_print = mock.patch.object(self.service, "_print_test_page") + + mock_setup = patch_setup.start() + mock_print = patch_print.start() + + self.service.printer_test() + # When the client can accept new status values, we will assert that the + # above call results in Status.TEST_SUCCESS + + assert mock_setup.called_once() + assert mock_print.called_once() + + patch_setup.stop() + patch_print.stop() + + @mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_LASERJET_PRINTER) + def test_get_good_printer_uri_brother(self, mocked_call): + assert ( + self.service._get_printer_uri() + == "usb://HP/LaserJet%20Pro%20M404-M405?serial=A00000A000000" + ) + + @mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_NO_PRINTER) + def test_get_bad_printer_uri(self, mocked_call, capsys, mocker): + with pytest.raises(ExportException) as ex: + self.service._get_printer_uri() + + assert ex.value.sdstatus is Status.ERROR_PRINTER_NOT_FOUND + + @pytest.mark.parametrize( + "open_office_paths", + [ + "/tmp/whatver/thisisadoc.doc" + "/home/user/Downloads/thisisadoc.xlsx" + "/home/user/Downloads/file.odt" + "/tmp/tmpJf83j9/secret.pptx" + ], + ) + def test_is_open_office_file(self, capsys, open_office_paths): + assert self.service._is_open_office_file(open_office_paths) + + @pytest.mark.parametrize( + "open_office_paths", + [ + "/tmp/whatver/thisisadoc.doccc" + "/home/user/Downloads/thisisa.xlsx.zip" + "/home/user/Downloads/file.odz" + "/tmp/tmpJf83j9/secret.gpg" + ], + ) + def test_is_not_open_office_file(self, capsys, open_office_paths): + assert not self.service._is_open_office_file(open_office_paths) + + @mock.patch("subprocess.run") + def test_install_printer_ppd_laserjet(self, mocker): + ppd = self.service._install_printer_ppd( + "usb://HP/LaserJet%20Pro%20M404-M405?serial=A00000A00000" + ) + assert ppd == "/usr/share/cups/model/hp-laserjet_6l.ppd" + + @mock.patch("subprocess.run") + def test_install_printer_ppd_brother(self, mocker): + ppd = self.service._install_printer_ppd( + "usb://Brother/HL-L2320D%20series?serial=A00000A000000" + ) + assert ppd == "/usr/share/cups/model/br7030.ppd" + + def test_install_printer_ppd_error_no_driver(self, mocker): + mocker.patch("subprocess.run", side_effect=CalledProcessError(1, "run")) + + with pytest.raises(ExportException) as ex: + self.service._install_printer_ppd( + "usb://HP/LaserJet%20Pro%20M404-M405?serial=A00000A000000" + ) + + assert ex.value.sdstatus is Status.ERROR_PRINTER_DRIVER_UNAVAILABLE + + def test_install_printer_ppd_error_not_supported(self, mocker): + mocker.patch("subprocess.run", side_effect=CalledProcessError(1, "run")) + + with pytest.raises(ExportException) as ex: + self.service._install_printer_ppd( + "usb://Not/Supported?serial=A00000A000000" + ) + + assert ex.value.sdstatus is Status.ERROR_PRINTER_NOT_SUPPORTED + + def test_setup_printer_error(self, mocker): + mocker.patch("subprocess.run", side_effect=CalledProcessError(1, "run")) + + with pytest.raises(ExportException) as ex: + self.service._setup_printer( + "usb://Brother/HL-L2320D%20series?serial=A00000A000000", + "/usr/share/cups/model/br7030.ppd", + ) + + assert ex.value.sdstatus is Status.ERROR_PRINTER_INSTALL + + def test_safe_check_call(self): + # This works, since `ls` is a valid comand + self.service.safe_check_call(["ls"], Status.TEST_SUCCESS) + + def test_safe_check_call_invalid_call(self): + with pytest.raises(ExportException) as ex: + self.service.safe_check_call(["ls", "kjdsfhkdjfh"], Status.ERROR_PRINT) + + assert ex.value.sdstatus is Status.ERROR_PRINT + + def test_safe_check_call_write_to_stderr_and_ignore_error(self): + self.service.safe_check_call( + ["python3", "-c", "import sys;sys.stderr.write('hello')"], + error_status=Status.TEST_SUCCESS, + ignore_stderr_startswith=b"hello", + ) + + def test_safe_check_call_write_to_stderr_wrong_ignore_param(self): + # This one writes to stderr and ignores the wrong string, so we expect an exception + with pytest.raises(ExportException) as ex: + self.service.safe_check_call( + ["python3", "-c", "import sys;sys.stderr.write('hello\n')"], + error_status=Status.ERROR_PRINT, + ignore_stderr_startswith=b"world", + ) + + assert ex.value.sdstatus is Status.ERROR_PRINT + + @mock.patch("securedrop_export.print.service.time.sleep", return_value=None) + @mock.patch( + "subprocess.check_output", + side_effect=[ + b"printer sdw-printer is busy\n", + b"printer sdw-printer is idle\n", + ], + ) + def test__wait_for_print(self, mock_subprocess, mock_time): + assert self.service._wait_for_print() + + @mock.patch( + "subprocess.check_output", + side_effect=subprocess.CalledProcessError(1, "check_output"), + ) + @mock.patch("time.sleep", return_value=None) + def test__wait_for_print_print_exception(self, mock_time, mock_subprocess): + with pytest.raises(ExportException) as ex: + self.service._wait_for_print() + + assert ex.value.sdstatus is Status.ERROR_PRINT + + @mock.patch( + "subprocess.check_output", return_value=b"printer sdw-printer is busy\n" + ) + def test__wait_for_print_timeout_exception(self, mock_output): + self.service.printer_wait_timeout = 1 + + with pytest.raises(ExportException) as ex: + self.service._wait_for_print() + + assert ex.value.sdstatus is Status.ERROR_PRINT + + @pytest.mark.parametrize( + "printers", [SAMPLE_OUTPUT_BROTHER_PRINTER, SAMPLE_OUTPUT_LASERJET_PRINTER] + ) + def test__check_printer_setup(self, printers, mocker): + mocker.patch("subprocess.check_output", return_value=printers) + p = mocker.patch.object(self.service, "_setup_printer") + p2 = mocker.patch.object(self.service, "_install_printer_ppd") + p.start() + p2.start() + + self.service._check_printer_setup() + p.assert_called_once() + p2.assert_called_once() + + p.stop() + p2.stop() + + @mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_NO_PRINTER) + def test__check_printer_setup_error_no_printer(self, mock_output): + + with pytest.raises(ExportException) as ex: + self.service._check_printer_setup() + assert ex.value.sdstatus is Status.ERROR_PRINTER_NOT_FOUND + + @mock.patch( + "subprocess.check_output", + return_value=SAMPLE_OUTPUT_BROTHER_PRINTER + + b"\n" + + SAMPLE_OUTPUT_LASERJET_PRINTER, + ) + def test__check_printer_setup_error_too_many_printers(self, mock_output): + + with pytest.raises(ExportException) as ex: + self.service._check_printer_setup() + assert ex.value.sdstatus is Status.ERROR_MULTIPLE_PRINTERS_FOUND + + @mock.patch( + "subprocess.check_output", return_value=SAMPLE_OUTPUT_UNSUPPORTED_PRINTER + ) + def test__check_printer_setup_error_unsupported_printer(self, mock_output): + + with pytest.raises(ExportException) as ex: + self.service._check_printer_setup() + assert ex.value.sdstatus is Status.ERROR_PRINTER_NOT_SUPPORTED + + @mock.patch( + "subprocess.check_output", + side_effect=subprocess.CalledProcessError(1, "check_output"), + ) + def test__check_printer_setup_error_checking_printer(self, mock_output): + + with pytest.raises(ExportException) as ex: + self.service._check_printer_setup() + assert ex.value.sdstatus is Status.ERROR_UNKNOWN + + @mock.patch( + "subprocess.check_output", + side_effect=subprocess.CalledProcessError(1, "check_output"), + ) + def test__get_printer_uri_error(self, mocked_subprocess): + with pytest.raises(ExportException) as ex: + self.service._get_printer_uri() + assert ex.value.sdstatus is Status.ERROR_PRINTER_URI + + @mock.patch( + "subprocess.check_output", return_value=SAMPLE_OUTPUT_UNSUPPORTED_PRINTER + ) + def test__get_printer_uri_error_unsupported(self, mocked_subprocess): + with pytest.raises(ExportException) as ex: + self.service._get_printer_uri() + assert ex.value.sdstatus is Status.ERROR_PRINTER_NOT_SUPPORTED + + def test__install_printer_ppd_error_unsupported_uri(self): + with pytest.raises(ExportException) as ex: + self.service._install_printer_ppd( + "usb://YOURE_NOT_MY_REAL_PRINTER/A00000A000000" + ) + assert ex.value.sdstatus is Status.ERROR_PRINTER_NOT_SUPPORTED + + @mock.patch("securedrop_export.print.service.Service._wait_for_print") + def test__print_test_page_calls_method(self, mock_wait): + p = mock.patch.object(self.service, "_print_file") + mock_print = p.start() + + self.service._print_test_page() + mock_print.assert_called_once_with("/usr/share/cups/data/testprint") + p.stop() + + @mock.patch("securedrop_export.print.service.Service._wait_for_print") + def test__print_all_files(self, mock_wait): + p = mock.patch.object(self.service, "_print_file") + mock_print = p.start() + + self.service._print_all_files() + mock_print.assert_has_calls( + [ + mock.call(f"{self.submission.tmpdir}/export_data/file1.txt"), + mock.call(f"{self.submission.tmpdir}/export_data/file2.txt"), + mock.call(f"{self.submission.tmpdir}/export_data/file3.txt"), + ], + any_order=True, + ) + p.stop() + + @mock.patch("securedrop_export.print.service.Service._wait_for_print") + def test_open_office_file_convert_to_pdf(self, mock_wait): + file = "/tmp/definitely-an-office-file.odt" + + with mock.patch.object(self.service, "safe_check_call") as scc, mock.patch( + "securedrop_export.print.service.logger.info" + ) as log: + self.service._print_file(file) + + assert scc.call_count == 2 + scc.assert_has_calls( + [ + mock.call( + command=[ + "unoconv", + "-o", + "/tmp/definitely-an-office-file.odt.pdf", + "/tmp/definitely-an-office-file.odt", + ], + error_status=Status.ERROR_PRINT, + ), + mock.call( + command=[ + "xpp", + "-P", + "sdw-printer", + "/tmp/definitely-an-office-file.odt.pdf", + ], + error_status=Status.ERROR_PRINT, + ), + ] + ) + assert log.call_count == 2 + log.assert_has_calls( + [ + mock.call("Converting Office document to pdf"), + mock.call("Sending file to printer sdw-printer"), + ] + ) + + def test_safe_check_call_has_error_in_stderr(self): + mock.patch("subprocess.run") + + with mock.patch("subprocess.run"), pytest.raises(ExportException) as ex: + self.service.safe_check_call(command="ls", error_status=Status.TEST_SUCCESS) + + assert ex.value.sdstatus is Status.TEST_SUCCESS + + @mock.patch("securedrop_export.print.service.time.sleep", return_value=None) + @mock.patch( + "subprocess.check_output", + side_effect=[ + b"printer sdw-printer is busy\n", + b"printer sdw-printer is idle\n", + ], + ) + def test__wait_for_print_waits_correctly(self, mock_subprocess, mock_time): + file = "/tmp/happy-to-print-you.pdf" + + with mock.patch.object(self.service, "safe_check_call") as scc, mock.patch( + "securedrop_export.print.service.logger.info" + ) as log: + self.service._print_file(file) + + assert scc.call_count == 1 + scc.assert_has_calls( + [ + mock.call( + command=[ + "xpp", + "-P", + "sdw-printer", + "/tmp/happy-to-print-you.pdf", + ], + error_status=Status.ERROR_PRINT, + ), + ] + ) + assert log.call_count == 4 + log.assert_has_calls( + [ + mock.call("Sending file to printer sdw-printer"), + mock.call("Running lpstat waiting for printer sdw-printer"), + mock.call("Running lpstat waiting for printer sdw-printer"), + mock.call("Print completed"), + ] + ) diff --git a/tests/sd-export-config-bad-2.json b/tests/sd-export-config-bad-2.json deleted file mode 100644 index f69e25b..0000000 --- a/tests/sd-export-config-bad-2.json +++ /dev/null @@ -1,3 +0,0 @@ -{ - "no_pci_bus_id": "nope" -} diff --git a/tests/sd-export-config-bad.json b/tests/sd-export-config-bad.json deleted file mode 100644 index f7cbf8d..0000000 --- a/tests/sd-export-config-bad.json +++ /dev/null @@ -1,3 +0,0 @@ -{ - "pciishf. i3u 2 -} diff --git a/tests/sd-export-config.json b/tests/sd-export-config.json deleted file mode 100644 index d1167cf..0000000 --- a/tests/sd-export-config.json +++ /dev/null @@ -1,3 +0,0 @@ -{ - "pci_bus_id": "2" -} diff --git a/tests/test_export.py b/tests/test_archive.py similarity index 77% rename from tests/test_export.py rename to tests/test_archive.py index fb6f586..57791a8 100644 --- a/tests/test_export.py +++ b/tests/test_archive.py @@ -9,13 +9,8 @@ import tarfile from io import BytesIO -from securedrop_export import export - -TEST_CONFIG = os.path.join(os.path.dirname(__file__), "sd-export-config.json") -BAD_TEST_CONFIG = os.path.join(os.path.dirname(__file__), "sd-export-config-bad.json") -ANOTHER_BAD_TEST_CONFIG = os.path.join( - os.path.dirname(__file__), "sd-export-config-bad-2.json" -) +from securedrop_export.exceptions import ExportException +from securedrop_export.archive import Archive, Metadata, Status def test_extract_tarball(): @@ -48,11 +43,9 @@ def test_extract_tarball(): archive.close() - submission = export.SDExport(archive_path, TEST_CONFIG) + submission = Archive(archive_path).extract_tarball() assert oct(os.stat(submission.tmpdir).st_mode) == "0o40700" - submission.extract_tarball() - extracted_file_path = os.path.join( submission.tmpdir, "some", "dirs", "file.txt" ) @@ -64,7 +57,7 @@ def test_extract_tarball(): oct(os.stat(os.path.join(submission.tmpdir, "some")).st_mode) == "0o40700" ) # Subdirectories that are not added as members are extracted with 700 permissions - # because os.umask(0o077) is set in the SDExport constructor. + # because os.umask(0o077) is set in the Archive constructor. assert ( oct(os.stat(os.path.join(submission.tmpdir, "some", "dirs")).st_mode) == "0o40700" @@ -94,10 +87,10 @@ def test_extract_tarball_with_symlink(): archive.addfile(symlink_info) archive.close() - submission = export.SDExport(archive_path, TEST_CONFIG) + submission = Archive(archive_path) assert oct(os.stat(submission.tmpdir).st_mode) == "0o40700" - submission.extract_tarball() + submission = submission.extract_tarball() symlink_path = os.path.join(submission.tmpdir, "symlink") assert os.path.islink(symlink_path) @@ -131,9 +124,9 @@ def test_extract_tarball_raises_if_doing_path_traversal(): archive.addfile(traversed_file_info, BytesIO(content)) archive.close() - submission = export.SDExport(archive_path, TEST_CONFIG) + submission = Archive(archive_path) - with pytest.raises(SystemExit): + with pytest.raises(ExportException): submission.extract_tarball() assert not os.path.exists("/tmp/traversed") @@ -168,9 +161,9 @@ def test_extract_tarball_raises_if_doing_path_traversal_with_dir(): archive.addfile(dir_info) archive.close() - submission = export.SDExport(archive_path, TEST_CONFIG) + submission = Archive(archive_path) - with pytest.raises(SystemExit): + with pytest.raises(ExportException): submission.extract_tarball() assert not os.path.exists("/tmp/traversed") @@ -207,9 +200,9 @@ def test_extract_tarball_raises_if_doing_path_traversal_with_symlink(): archive.addfile(symlink_info, BytesIO(content)) archive.close() - submission = export.SDExport(archive_path, TEST_CONFIG) + submission = Archive(archive_path) - with pytest.raises(SystemExit): + with pytest.raises(ExportException): submission.extract_tarball() assert not os.path.exists("/tmp/traversed") @@ -246,9 +239,9 @@ def test_extract_tarball_raises_if_doing_path_traversal_with_symlink_linkname(): archive.addfile(symlink_info, BytesIO(content)) archive.close() - submission = export.SDExport(archive_path, TEST_CONFIG) + submission = Archive(archive_path) - with pytest.raises(SystemExit): + with pytest.raises(ExportException): submission.extract_tarball() assert not os.path.exists("/tmp/traversed") @@ -282,9 +275,9 @@ def test_extract_tarball_raises_if_name_has_unsafe_absolute_path(): archive.addfile(file_info, BytesIO(content)) archive.close() - submission = export.SDExport(archive_path, TEST_CONFIG) + submission = Archive(archive_path) - with pytest.raises(SystemExit): + with pytest.raises(ExportException): submission.extract_tarball() assert not os.path.exists("/tmp/unsafe") @@ -321,9 +314,9 @@ def test_extract_tarball_raises_if_name_has_unsafe_absolute_path_with_symlink(): archive.add(symlink_path, "symlink") archive.close() - submission = export.SDExport(archive_path, TEST_CONFIG) + submission = Archive(archive_path) - with pytest.raises(SystemExit): + with pytest.raises(ExportException): submission.extract_tarball() assert not os.path.exists("/tmp/unsafe") @@ -366,9 +359,9 @@ def test_extract_tarball_raises_if_name_has_unsafe_absolute_path_with_symlink_to archive.add(file_path, "symlink/unsafe") archive.close() - submission = export.SDExport(archive_path, TEST_CONFIG) + submission = Archive(archive_path) - with pytest.raises(SystemExit): + with pytest.raises(ExportException): submission.extract_tarball() assert not os.path.exists("/tmp/unsafe") @@ -403,149 +396,117 @@ def test_extract_tarball_raises_if_linkname_has_unsafe_absolute_path(): archive.addfile(symlink_info, BytesIO(content)) archive.close() - submission = export.SDExport(archive_path, TEST_CONFIG) + submission = Archive(archive_path) - with pytest.raises(SystemExit): + with pytest.raises(ExportException): submission.extract_tarball() assert not os.path.exists("/tmp/unsafe") -def test_exit_gracefully_no_exception(capsys): - submission = export.SDExport("testfile", TEST_CONFIG) - test_msg = "test" - - with pytest.raises(SystemExit) as sysexit: - submission.exit_gracefully(test_msg) - - # A graceful exit means a return code of 0 - assert sysexit.value.code == 0 - - captured = capsys.readouterr() - assert captured.err == "{}\n".format(test_msg) - assert captured.out == "" - - -def test_exit_gracefully_exception(capsys): - submission = export.SDExport("testfile", TEST_CONFIG) - test_msg = "ERROR_GENERIC" - - with pytest.raises(SystemExit) as sysexit: - exception = mock.MagicMock() - exception.output = "BANG!" - submission.exit_gracefully(test_msg, e=exception) - - # A graceful exit means a return code of 0 - assert sysexit.value.code == 0 - - captured = capsys.readouterr() - assert captured.err.rstrip() == export.ExportStatus.ERROR_GENERIC.value - assert captured.out == "" - - def test_empty_config(capsys): - export.SDExport("testfile", TEST_CONFIG) + Archive("testfile") temp_folder = tempfile.mkdtemp() - metadata = os.path.join(temp_folder, export.Metadata.METADATA_FILE) + metadata = os.path.join(temp_folder, Metadata.METADATA_FILE) with open(metadata, "w") as f: f.write("{}") - config = export.Metadata(temp_folder) - - assert not config.is_valid() + with pytest.raises(ExportException) as ex: + Metadata(temp_folder).validate() + assert ex.value.sdstatus is Status.ERROR_ARCHIVE_METADATA def test_valid_printer_test_config(capsys): - export.SDExport("testfile", TEST_CONFIG) + Archive("testfile") temp_folder = tempfile.mkdtemp() - metadata = os.path.join(temp_folder, export.Metadata.METADATA_FILE) + metadata = os.path.join(temp_folder, Metadata.METADATA_FILE) with open(metadata, "w") as f: f.write('{"device": "printer-test"}') - config = export.Metadata(temp_folder) + config = Metadata(temp_folder).validate() - assert config.is_valid() assert config.encryption_key is None assert config.encryption_method is None def test_valid_printer_config(capsys): - export.SDExport("", TEST_CONFIG) + Archive("") temp_folder = tempfile.mkdtemp() - metadata = os.path.join(temp_folder, export.Metadata.METADATA_FILE) + metadata = os.path.join(temp_folder, Metadata.METADATA_FILE) with open(metadata, "w") as f: f.write('{"device": "printer"}') - config = export.Metadata(temp_folder) + config = Metadata(temp_folder).validate() - assert config.is_valid() assert config.encryption_key is None assert config.encryption_method is None def test_invalid_encryption_config(capsys): - export.SDExport("testfile", TEST_CONFIG) + Archive("testfile") temp_folder = tempfile.mkdtemp() - metadata = os.path.join(temp_folder, export.Metadata.METADATA_FILE) + metadata = os.path.join(temp_folder, Metadata.METADATA_FILE) with open(metadata, "w") as f: f.write( '{"device": "disk", "encryption_method": "base64", "encryption_key": "hunter1"}' ) - config = export.Metadata(temp_folder) + with pytest.raises(ExportException) as ex: + Metadata(temp_folder).validate() - assert config.encryption_key == "hunter1" - assert config.encryption_method == "base64" - assert not config.is_valid() + assert ex.value.sdstatus is Status.ERROR_ARCHIVE_METADATA -def test_valid_encryption_config(capsys): - export.SDExport("testfile", TEST_CONFIG) +def test_invalid_config(capsys): + Archive("testfile") + temp_folder = tempfile.mkdtemp() - metadata = os.path.join(temp_folder, export.Metadata.METADATA_FILE) + metadata = os.path.join(temp_folder, Metadata.METADATA_FILE) with open(metadata, "w") as f: - f.write( - '{"device": "disk", "encryption_method": "luks", "encryption_key": "hunter1"}' - ) + f.write('{"device": "asdf", "encryption_method": "OHNO"}') - config = export.Metadata(temp_folder) + with pytest.raises(ExportException) as ex: + Metadata(temp_folder).validate() - assert config.encryption_key == "hunter1" - assert config.encryption_method == "luks" - assert config.is_valid() + assert ex.value.sdstatus is Status.ERROR_ARCHIVE_METADATA -def test_safe_check_call(capsys, mocker): - submission = export.SDExport("testfile", TEST_CONFIG) - submission.safe_check_call(["ls"], "this will work") - expected_message = "uh oh!!!!" +def test_malformed_config(capsys): + Archive("testfile") - with pytest.raises(SystemExit) as sysexit: - submission.safe_check_call(["ls", "kjdsfhkdjfh"], expected_message) + temp_folder = tempfile.mkdtemp() + metadata = os.path.join(temp_folder, Metadata.METADATA_FILE) + with open(metadata, "w") as f: + f.write('{"device": "asdf", "encryption_method": {"OHNO", "MALFORMED"}') - assert sysexit.value.code == 0 + with pytest.raises(ExportException) as ex: + Metadata(temp_folder).validate() - captured = capsys.readouterr() - assert captured.err == "{}\n".format(expected_message) - assert captured.out == "" + assert ex.value.sdstatus is Status.ERROR_METADATA_PARSING - # This should work too - submission.safe_check_call( - ["python3", "-c", "import sys;sys.stderr.write('hello')"], - expected_message, - ignore_stderr_startswith=b"hello", - ) - with pytest.raises(SystemExit) as sysexit: - submission.safe_check_call( - ["python3", "-c", "import sys;sys.stderr.write('hello\n')"], - expected_message, - ignore_stderr_startswith=b"world", +def test_valid_encryption_config(capsys): + Archive("testfile") + temp_folder = tempfile.mkdtemp() + metadata = os.path.join(temp_folder, Metadata.METADATA_FILE) + with open(metadata, "w") as f: + f.write( + '{"device": "disk", "encryption_method": "luks", "encryption_key": "hunter1"}' ) - assert sysexit.value.code == 0 + config = Metadata(temp_folder).validate() + + assert config.encryption_key == "hunter1" + assert config.encryption_method == "luks" + + +@mock.patch("json.loads", side_effect=json.decoder.JSONDecodeError("ugh", "badjson", 0)) +def test_metadata_parsing_error(mock_json): + """ + Handle exception caused when loading metadata JSON + """ + with pytest.raises(ExportException) as ex: + Metadata(tempfile.mkdtemp()).validate() - captured = capsys.readouterr() - assert captured.err == "{}\n".format(expected_message) - assert captured.out == "" + assert ex.value.sdstatus is Status.ERROR_METADATA_PARSING diff --git a/tests/test_directory.py b/tests/test_directory.py new file mode 100644 index 0000000..2f0a3a9 --- /dev/null +++ b/tests/test_directory.py @@ -0,0 +1,82 @@ +import pytest +import os +import tempfile +import shutil + +from pathlib import Path +from securedrop_export import directory + + +class TestDirectory: + + _REL_TRAVERSAL = "../../../whee" + _SAFE_RELPATH = "./hi" + _SAFE_RELPATH2 = "yay/a/path" + _UNSAFE_RELPATH = "lgtm/../ohwait" + + @classmethod + def setup_class(cls): + cls.homedir = tempfile.mkdtemp() + "/" + + @classmethod + def teardown_class(cls): + if os.path.exists(cls.homedir): + shutil.rmtree(cls.homedir) + + def setup_method(self, method): + pass + + def teadown_method(self, method): + if os.path.exists(self.homedir): + os.remove(self.homedir) + + def test_safe_mkdir_error_base_relpath(self): + with pytest.raises(ValueError): + directory.safe_mkdir(base_path=Path(".")) + + def test_safe_mkdir_error_basepath_path_traversal(self): + with pytest.raises(ValueError): + directory.safe_mkdir(f"{self.homedir}{self._REL_TRAVERSAL}") + + def test_safe_mkdir_error_relpath_path_traversal(self): + with pytest.raises(ValueError): + directory.safe_mkdir(f"{self.homedir}", f"{self._REL_TRAVERSAL}") + + def test_safe_mkdir_success(self): + directory.safe_mkdir(f"{self.homedir}") + + def test_safe_mkdir_success_with_relpath(self): + directory.safe_mkdir(f"{self.homedir}", f"{self._SAFE_RELPATH}") + + assert os.path.exists(f"{self.homedir}{self._SAFE_RELPATH}") + + def test_safe_mkdir_success_another_relpath(self): + directory.safe_mkdir(f"{self.homedir}", f"{self._SAFE_RELPATH2}") + + assert os.path.exists(f"{self.homedir}{self._SAFE_RELPATH2}") + + def test_safe_mkdir_weird_path(self): + with pytest.raises(ValueError): + directory.safe_mkdir(f"{self.homedir}", f"{self._UNSAFE_RELPATH}") + + def test__check_all_permissions_path_missing(self): + with pytest.raises(ValueError): + directory._check_all_permissions(f"{self.homedir}", f"{self._SAFE_RELPATH}") + + def test_check_dir_perms_unsafe(self): + path = Path(f"{self.homedir}{self._SAFE_RELPATH}") + + directory.safe_mkdir(path) + + # Not what we want, ever + path.chmod(0o666) + + with pytest.raises(RuntimeError): + directory._check_dir_permissions(path) + + def test_check_all_perms_invalid_full_path(self): + path = Path(f"{self.homedir}/idontexist") + base = Path(f"{self.homedir}") + + # Returns without error + assert directory._check_all_permissions(path, base) is None diff --git a/tests/test_exceptions.py b/tests/test_exceptions.py new file mode 100644 index 0000000..71af411 --- /dev/null +++ b/tests/test_exceptions.py @@ -0,0 +1,20 @@ +import pytest +import signal + +from securedrop_export.exceptions import handler, TimeoutException + + +def test_handler(): + signal.signal(signal.SIGALRM, handler) + signal.setitimer(signal.ITIMER_REAL, 0.001) + + with pytest.raises(TimeoutException): + _run_handler_routine() + + +def _run_handler_routine(): + try: + while True: + continue + except TimeoutException: + raise diff --git a/tests/test_main.py b/tests/test_main.py index efa2a6e..41fce7f 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -1,4 +1,185 @@ -from securedrop_export.main import __main__ # noqa: F401 +import pytest +import tempfile +import os +from unittest import mock +import shutil -# This import ensures at least the imports in main.__main__ -# are executed during a test run +from securedrop_export.archive import Archive, Metadata, Status as ArchiveStatus +from securedrop_export.status import BaseStatus +from securedrop_export.command import Command +from securedrop_export.exceptions import ExportException + +from securedrop_export.main import ( + Status, + entrypoint, + _exit_gracefully, + _write_status, + _start_service, + _configure_logging, +) + +SUBMISSION_SAMPLE_ARCHIVE = "pretendfile.tar.gz" + + +class TestMain: + def setup_method(self, method): + # This can't be a class method, since we expect sysexit during this test suite, + # which + self.submission = Archive("pretendfile.tar.gz") + assert os.path.exists(self.submission.tmpdir) + + def teardown_method(self, method): + if os.path.exists(self.submission.tmpdir): + shutil.rmtree(self.submission.tmpdir) + self.submission = None + + def test_exit_gracefully_no_exception(self, capsys): + with pytest.raises(SystemExit) as sysexit: + _exit_gracefully(self.submission, Status.ERROR_GENERIC) + + assert self._did_exit_gracefully(sysexit, capsys, Status.ERROR_GENERIC) + + def test_exit_gracefully_exception(self, capsys): + with pytest.raises(SystemExit) as sysexit: + _exit_gracefully(self.submission, Status.ERROR_GENERIC) + + # A graceful exit means a return code of 0 + assert self._did_exit_gracefully(sysexit, capsys, Status.ERROR_GENERIC) + + @pytest.mark.parametrize("status", [s for s in Status]) + def test_write_status(self, status, capsys): + _write_status(status) + captured = capsys.readouterr() + assert captured.err == status.value + "\n" + + @pytest.mark.parametrize("invalid_status", ["foo", ";ls", "&& echo 0", None]) + def test_write_status_error(self, invalid_status, capsys): + + with pytest.raises(ValueError): + _write_status(Status(invalid_status)) + + def _did_exit_gracefully(self, exit, capsys, status: BaseStatus) -> bool: + """ + Helper. True if exited with 0, writing supplied status to stderr. + """ + captured = capsys.readouterr() + + return ( + exit.value.code == 0 + and captured.err.rstrip().endswith(status.value) + and captured.out == "" + ) + + @pytest.mark.parametrize("command", list(Command)) + @mock.patch("securedrop_export.main._configure_logging") + @mock.patch("os.path.exists", return_value=True) + def test_entrypoint_success_start_service(self, mock_log, mock_path, command): + metadata = os.path.join(self.submission.tmpdir, Metadata.METADATA_FILE) + + with open(metadata, "w") as f: + f.write(f'{{"device": "{command.value}", "encryption_method": "luks"}}') + + with mock.patch( + "sys.argv", ["qvm-send-to-usb", SUBMISSION_SAMPLE_ARCHIVE] + ), mock.patch( + "securedrop_export.main._start_service" + ) as mock_service, mock.patch( + "securedrop_export.main.Archive.extract_tarball", + return_value=self.submission, + ), pytest.raises( + SystemExit + ): + entrypoint() + + if command is not Command.START_VM: + assert self.submission.command == command + assert mock_service.call_args[0][0].archive == SUBMISSION_SAMPLE_ARCHIVE + mock_service.assert_called_once_with(self.submission) + + def test_valid_printer_test_config(self, capsys): + Archive("testfile") + temp_folder = tempfile.mkdtemp() + metadata = os.path.join(temp_folder, Metadata.METADATA_FILE) + with open(metadata, "w") as f: + f.write('{"device": "printer-test"}') + + config = Metadata(temp_folder).validate() + + assert config.encryption_key is None + assert config.encryption_method is None + + @mock.patch( + "securedrop_export.archive.safe_extractall", + side_effect=ValueError("A tarball problem!"), + ) + @mock.patch("securedrop_export.main.os.path.exists", return_value=True) + @mock.patch("securedrop_export.main.shutil.rmtree") + @mock.patch("securedrop_export.main._configure_logging") + def test_entrypoint_failure_extraction( + self, mock_log, mock_rm, mock_path, mock_extract, capsys + ): + with mock.patch( + "sys.argv", ["qvm-send-to-usb", SUBMISSION_SAMPLE_ARCHIVE] + ), pytest.raises(SystemExit) as sysexit: + entrypoint() + + assert self._did_exit_gracefully( + sysexit, capsys, ArchiveStatus.ERROR_EXTRACTION + ) + + @mock.patch( + "securedrop_export.main._configure_logging", + side_effect=ExportException( + sdstatus=Status.ERROR_LOGGING, + sderror="Zounds, an error setting up logging!", + ), + ) + def test_entrypoint_logging_fails(self, mock_mkdir, capsys): + with pytest.raises(SystemExit) as sysexit: + entrypoint() + + assert self._did_exit_gracefully(sysexit, capsys, Status.ERROR_LOGGING) + + @mock.patch( + "securedrop_export.main._configure_logging", + side_effect=RuntimeError("Zounds, an uncaught error!"), + ) + def test_entrypoint_fails_unexpected(self, mock_mkdir, capsys): + with pytest.raises(SystemExit) as sysexit: + entrypoint() + + assert self._did_exit_gracefully(sysexit, capsys, Status.ERROR_GENERIC) + + @mock.patch("os.path.exists", return_value=False) + def test_entrypoint_archive_path_fails(self, mock_path, capsys): + with pytest.raises(SystemExit) as sysexit: + entrypoint() + + assert self._did_exit_gracefully(sysexit, capsys, Status.ERROR_FILE_NOT_FOUND) + + @mock.patch( + "securedrop_export.main.safe_mkdir", + side_effect=ValueError(1, "No logs for you!"), + ) + def test__configure_logging_error(self, mock_mkdir, capsys): + with pytest.raises(ExportException) as ex: + _configure_logging() + + assert ex.value.sdstatus is Status.ERROR_LOGGING + + @pytest.mark.parametrize("command", list(Command)) + def test__start_service_calls_correct_services(self, command): + if command is Command.START_VM: + pytest.skip("Command does not start a service") + + self.submission.command = command + + with mock.patch("securedrop_export.main.PrintService") as ps, mock.patch( + "securedrop_export.main.ExportService" + ) as es: + _start_service(self.submission) + + if command in [Command.PRINT, Command.PRINTER_TEST, Command.PRINTER_PREFLIGHT]: + assert ps.call_args[0][0] is self.submission + else: + assert es.call_args[0][0] is self.submission