diff --git a/securedrop_export/disk/__init__.py b/securedrop_export/disk/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/securedrop_export/disk/actions.py b/securedrop_export/disk/actions.py new file mode 100644 index 0000000..02b09f2 --- /dev/null +++ b/securedrop_export/disk/actions.py @@ -0,0 +1,230 @@ +import logging +import os +import subprocess +import sys + +from typing import List + +from securedrop_export.export import ExportAction +from securedrop_export.exceptions import ExportStatus + +MOUNTPOINT = "/media/usb" +ENCRYPTED_DEVICE = "encrypted_volume" + +logger = logging.getLogger(__name__) + + +class DiskAction(ExportAction): + def __init__(self, submission): + self.submission = submission + self.device = None # Optional[str] + self.mountpoint = MOUNTPOINT + self.encrypted_device = ENCRYPTED_DEVICE + + def run(self) -> None: + """Run logic""" + raise NotImplementedError + + def check_usb_connected(self, exit=False) -> None: + usb_devices = self._get_connected_usbs() + + if len(usb_devices) == 0: + logger.info('0 USB devices connected') + self.submission.exit_gracefully(ExportStatus.USB_NOT_CONNECTED.value) + elif len(usb_devices) == 1: + logger.info('1 USB device connected') + self.device = usb_devices[0] + if exit: + self.submission.exit_gracefully(ExportStatus.USB_CONNECTED.value) + elif len(usb_devices) > 1: + logger.info('>1 USB devices connected') + # Return generic error until freedomofpress/securedrop-export/issues/25 + self.submission.exit_gracefully(ExportStatus.ERROR_GENERIC.value) + + def _get_connected_usbs(self) -> List[str]: + logger.info('Performing usb preflight') + # List all block devices attached to VM that are disks and not partitions. + try: + lsblk = subprocess.Popen(["lsblk", "-o", "NAME,TYPE"], stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + grep = subprocess.Popen(["grep", "disk"], stdin=lsblk.stdout, stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + command_output = grep.stdout.readlines() + + # The first word in each element of the command_output list is the device name + attached_devices = [x.decode('utf8').split()[0] for x in command_output] + except subprocess.CalledProcessError: + self.submission.exit_gracefully(ExportStatus.ERROR_GENERIC.value) + + # Determine which are USBs by selecting those block devices that are removable disks. + usb_devices = [] + for device in attached_devices: + try: + removable = subprocess.check_output( + ["cat", "/sys/class/block/{}/removable".format(device)], + stderr=subprocess.PIPE) + is_removable = int(removable.decode('utf8').strip()) + except subprocess.CalledProcessError: + is_removable = False + + if is_removable: + usb_devices.append("/dev/{}".format(device)) + + return usb_devices + + def set_extracted_device_name(self): + try: + device_and_partitions = subprocess.check_output( + ["lsblk", "-o", "TYPE", "--noheadings", self.device], stderr=subprocess.PIPE) + + # we don't support multiple partitions + partition_count = device_and_partitions.decode('utf-8').split('\n').count('part') + if partition_count > 1: + logger.debug("multiple partitions not supported") + self.submission.exit_gracefully(ExportStatus.USB_ENCRYPTION_NOT_SUPPORTED.value) + + # redefine device to /dev/sda if disk is encrypted, /dev/sda1 if partition encrypted + self.device = self.device if partition_count == 0 else self.device + '1' + except subprocess.CalledProcessError: + self.submission.exit_gracefully(ExportStatus.USB_ENCRYPTION_NOT_SUPPORTED.value) + + def check_luks_volume(self): + # cryptsetup isLuks returns 0 if the device is a luks volume + # subprocess with throw if the device is not luks (rc !=0) + logger.info('Checking if volume is luks-encrypted') + self.set_extracted_device_name() + logger.debug("checking if {} is luks encrypted".format(self.device)) + self.submission.safe_check_call( + command=["sudo", "cryptsetup", "isLuks", self.device], + error_message=ExportStatus.USB_ENCRYPTION_NOT_SUPPORTED.value + ) + self.submission.exit_gracefully(ExportStatus.USB_ENCRYPTED.value) + + def unlock_luks_volume(self, encryption_key): + try: + # get the encrypted device name + self.set_extracted_device_name() + luks_header = subprocess.check_output(["sudo", "cryptsetup", "luksDump", self.device]) + luks_header_list = luks_header.decode('utf-8').split('\n') + for line in luks_header_list: + items = line.split('\t') + if 'UUID' in items[0]: + self.encrypted_device = 'luks-' + items[1] + + # the luks device is already unlocked + if os.path.exists(os.path.join('/dev/mapper/', self.encrypted_device)): + logger.debug('Device already unlocked') + return + + logger.debug('Unlocking luks volume {}'.format(self.encrypted_device)) + p = subprocess.Popen( + ["sudo", "cryptsetup", "luksOpen", self.device, self.encrypted_device], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE + ) + logger.debug('Passing key') + p.communicate(input=str.encode(encryption_key, "utf-8")) + rc = p.returncode + if rc != 0: + logger.error('Bad phassphrase for {}'.format(self.encrypted_device)) + self.submission.exit_gracefully(ExportStatus.USB_BAD_PASSPHRASE.value) + except subprocess.CalledProcessError: + self.submission.exit_gracefully(ExportStatus.USB_ENCRYPTION_NOT_SUPPORTED) + + def mount_volume(self): + # If the drive is already mounted then we don't need to mount it again + output = subprocess.check_output( + ["lsblk", "-o", "MOUNTPOINT", "--noheadings", self.device]) + mountpoint = output.decode('utf-8').strip() + if mountpoint: + logger.debug('The device is already mounted') + self.mountpoint = mountpoint + return + + # mount target not created, create folder + if not os.path.exists(self.mountpoint): + self.submission.safe_check_call( + command=["sudo", "mkdir", self.mountpoint], + error_message=ExportStatus.ERROR_USB_MOUNT + ) + + mapped_device_path = os.path.join("/dev/mapper/", self.encrypted_device) + logger.info('Mounting {}'.format(mapped_device_path)) + self.submission.safe_check_call( + command=["sudo", "mount", mapped_device_path, self.mountpoint], + error_message=ExportStatus.ERROR_USB_MOUNT.value + ) + self.submission.safe_check_call( + command=["sudo", "chown", "-R", "user:user", self.mountpoint], + error_message=ExportStatus.ERROR_USB_MOUNT.value + ) + + def copy_submission(self): + # move files to drive (overwrites files with same filename) and unmount drive + # we don't use safe_check_call here because we must lock and + # unmount the drive as part of the finally block + try: + target_path = os.path.join(self.mountpoint, self.submission.target_dirname) + subprocess.check_call(["mkdir", target_path]) + export_data = os.path.join(self.submission.tmpdir, "export_data/") + logger.info('Copying file to {}'.format(self.submission.target_dirname)) + subprocess.check_call(["cp", "-r", export_data, target_path]) + logger.info('File copied successfully to {}'.format(self.submission.target_dirname)) + self.submission.popup_message("Files exported successfully to disk.") + except (subprocess.CalledProcessError, OSError): + self.submission.exit_gracefully(ExportStatus.ERROR_USB_WRITE.value) + finally: + logger.info('Syncing filesystems') + subprocess.check_call(["sync"]) + + if os.path.exists(self.mountpoint): + logger.info('Unmounting drive from {}'.format(self.mountpoint)) + subprocess.check_call(["sudo", "umount", self.mountpoint]) + + if os.path.exists(os.path.join('/dev/mapper', self.encrypted_device)): + logger.info('Locking luks volume {}'.format(self.encrypted_device)) + subprocess.check_call( + ["sudo", "cryptsetup", "luksClose", self.encrypted_device] + ) + + logger.info('Deleting temporary directory {}'.format(self.submission.tmpdir)) + subprocess.check_call(["rm", "-rf", self.submission.tmpdir]) + sys.exit(0) + + +class USBTestAction(DiskAction): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + def run(self): + logger.info('Export archive is usb-test') + self.check_usb_connected(exit=True) + + +class DiskTestAction(DiskAction): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + def run(self): + logger.info('Export archive is disk-test') + # check_usb_connected looks for the drive, sets the drive to use + self.check_usb_connected() + self.check_luks_volume() + + +class DiskExportAction(DiskAction): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + def run(self): + logger.info('Export archive is disk') + # check_usb_connected looks for the drive, sets the drive to use + self.check_usb_connected() + logger.info('Unlocking volume') + # exports all documents in the archive to luks-encrypted volume + self.unlock_luks_volume(self.submission.archive_metadata.encryption_key) + logger.info('Mounting volume') + self.mount_volume() + logger.info('Copying submission to drive') + self.copy_submission() diff --git a/securedrop_export/exceptions.py b/securedrop_export/exceptions.py new file mode 100644 index 0000000..1c14bc6 --- /dev/null +++ b/securedrop_export/exceptions.py @@ -0,0 +1,48 @@ +from enum import Enum + + +class ExportStatus(Enum): + + # General errors + ERROR_FILE_NOT_FOUND = 'ERROR_FILE_NOT_FOUND' + ERROR_EXTRACTION = 'ERROR_EXTRACTION' + ERROR_METADATA_PARSING = 'ERROR_METADATA_PARSING' + ERROR_ARCHIVE_METADATA = 'ERROR_ARCHIVE_METADATA' + ERROR_USB_CONFIGURATION = 'ERROR_USB_CONFIGURATION' + ERROR_GENERIC = 'ERROR_GENERIC' + + # USB preflight related errors + USB_CONNECTED = 'USB_CONNECTED' + USB_NOT_CONNECTED = 'USB_NOT_CONNECTED' + ERROR_USB_CHECK = 'ERROR_USB_CHECK' + + # USB Disk preflight related errors + USB_ENCRYPTED = 'USB_ENCRYPTED' + USB_ENCRYPTION_NOT_SUPPORTED = 'USB_ENCRYPTION_NOT_SUPPORTED' + USB_DISK_ERROR = 'USB_DISK_ERROR' + + # Printer preflight related errors + ERROR_PRINTER_NOT_FOUND = 'ERROR_PRINTER_NOT_FOUND' + ERROR_PRINTER_NOT_SUPPORTED = 'ERROR_PRINTER_NOT_SUPPORTED' + ERROR_PRINTER_DRIVER_UNAVAILABLE = 'ERROR_PRINTER_DRIVER_UNAVAILABLE' + ERROR_PRINTER_INSTALL = 'ERROR_PRINTER_INSTALL' + + # Disk export errors + USB_BAD_PASSPHRASE = 'USB_BAD_PASSPHRASE' + ERROR_USB_MOUNT = 'ERROR_USB_MOUNT' + ERROR_USB_WRITE = 'ERROR_USB_WRITE' + + # Printer export errors + ERROR_PRINT = 'ERROR_PRINT' + + +class TimeoutException(Exception): + pass + + +def handler(signum, frame): + """ + This is a signal handler used for raising timeouts: + https://docs.python.org/3/library/signal.html#signal.signal + """ + raise TimeoutException("Timeout") diff --git a/securedrop_export/export.py b/securedrop_export/export.py index 500b9cf..976b1bd 100755 --- a/securedrop_export/export.py +++ b/securedrop_export/export.py @@ -1,67 +1,21 @@ #!/usr/bin/env python3 +import abc import datetime import json import logging import os import shutil -import signal import subprocess import sys import tarfile import tempfile -import time -from typing import List, Optional # noqa: F401 -from enum import Enum - -PRINTER_NAME = "sdw-printer" -PRINTER_WAIT_TIMEOUT = 60 -MOUNTPOINT = "/media/usb" -ENCRYPTED_DEVICE = "encrypted_volume" -BRLASER_DRIVER = "/usr/share/cups/drv/brlaser.drv" -BRLASER_PPD = "/usr/share/cups/model/br7030.ppd" -LASERJET_DRIVER = "/usr/share/cups/drv/hpcups.drv" -LASERJET_PPD = "/usr/share/cups/model/hp-laserjet_6l.ppd" +from securedrop_export.exceptions import ExportStatus logger = logging.getLogger(__name__) -class ExportStatus(Enum): - - # General errors - ERROR_FILE_NOT_FOUND = 'ERROR_FILE_NOT_FOUND' - ERROR_EXTRACTION = 'ERROR_EXTRACTION' - ERROR_METADATA_PARSING = 'ERROR_METADATA_PARSING' - ERROR_ARCHIVE_METADATA = 'ERROR_ARCHIVE_METADATA' - ERROR_USB_CONFIGURATION = 'ERROR_USB_CONFIGURATION' - ERROR_GENERIC = 'ERROR_GENERIC' - - # USB preflight related errors - USB_CONNECTED = 'USB_CONNECTED' - USB_NOT_CONNECTED = 'USB_NOT_CONNECTED' - ERROR_USB_CHECK = 'ERROR_USB_CHECK' - - # USB Disk preflight related errors - USB_ENCRYPTED = 'USB_ENCRYPTED' - USB_ENCRYPTION_NOT_SUPPORTED = 'USB_ENCRYPTION_NOT_SUPPORTED' - USB_DISK_ERROR = 'USB_DISK_ERROR' - - # Printer preflight related errors - ERROR_PRINTER_NOT_FOUND = 'ERROR_PRINTER_NOT_FOUND' - ERROR_PRINTER_NOT_SUPPORTED = 'ERROR_PRINTER_NOT_SUPPORTED' - ERROR_PRINTER_DRIVER_UNAVAILABLE = 'ERROR_PRINTER_DRIVER_UNAVAILABLE' - ERROR_PRINTER_INSTALL = 'ERROR_PRINTER_INSTALL' - - # Disk export errors - USB_BAD_PASSPHRASE = 'USB_BAD_PASSPHRASE' - ERROR_USB_MOUNT = 'ERROR_USB_MOUNT' - ERROR_USB_WRITE = 'ERROR_USB_WRITE' - - # Printer export errors - ERROR_PRINT = 'ERROR_PRINT' - - class Metadata(object): """ Object to parse, validate and store json metadata from the sd-export archive. @@ -122,13 +76,6 @@ def is_valid(self): class SDExport(object): def __init__(self, archive, config_path): - self.device = None # Optional[str] - self.mountpoint = MOUNTPOINT - self.encrypted_device = ENCRYPTED_DEVICE - - self.printer_name = PRINTER_NAME - self.printer_wait_timeout = PRINTER_WAIT_TIMEOUT - self.archive = archive self.submission_dirname = os.path.basename(self.archive).split(".")[0] self.target_dirname = "sd-export-{}".format( @@ -136,15 +83,13 @@ def __init__(self, archive, config_path): ) self.tmpdir = tempfile.mkdtemp() - def safe_check_call(self, command, error_message): - """ - Safely wrap subprocess.check_output to ensure we always return 0 and - log the error messages - """ + def extract_tarball(self): try: - subprocess.check_call(command) - except subprocess.CalledProcessError as ex: - self.exit_gracefully(msg=error_message, e=ex.output) + logger.info('Extracting tarball {} into {}'.format(self.archive, self.tmpdir)) + with tarfile.open(self.archive) as tar: + tar.extractall(self.tmpdir) + except Exception: + self.exit_gracefully(ExportStatus.ERROR_EXTRACTION.value) def exit_gracefully(self, msg, e=False): """ @@ -170,7 +115,17 @@ def exit_gracefully(self, msg, e=False): # the file with another application sys.exit(0) - def popup_message(self, msg): + def safe_check_call(self, command, error_message): + """ + Safely wrap subprocess.check_output to ensure we always return 0 and + log the error messages + """ + try: + subprocess.check_call(command) + except subprocess.CalledProcessError as ex: + self.exit_gracefully(msg=error_message, e=ex.output) + + def popup_message(self, msg: str): self.safe_check_call( command=[ "notify-send", @@ -183,357 +138,14 @@ def popup_message(self, msg): error_message="Error sending notification:" ) - def extract_tarball(self): - try: - logger.info('Extracting tarball {} into {}'.format(self.archive, self.tmpdir)) - with tarfile.open(self.archive) as tar: - tar.extractall(self.tmpdir) - except Exception: - self.exit_gracefully(ExportStatus.ERROR_EXTRACTION.value) - - def check_usb_connected(self, exit=False) -> None: - usb_devices = self._get_connected_usbs() - - if len(usb_devices) == 0: - logger.info('0 USB devices connected') - self.exit_gracefully(ExportStatus.USB_NOT_CONNECTED.value) - elif len(usb_devices) == 1: - logger.info('1 USB device connected') - self.device = usb_devices[0] - if exit: - self.exit_gracefully(ExportStatus.USB_CONNECTED.value) - elif len(usb_devices) > 1: - logger.info('>1 USB devices connected') - # Return generic error until freedomofpress/securedrop-export/issues/25 - self.exit_gracefully(ExportStatus.ERROR_GENERIC.value) - - def _get_connected_usbs(self) -> List[str]: - logger.info('Performing usb preflight') - # List all block devices attached to VM that are disks and not partitions. - try: - lsblk = subprocess.Popen(["lsblk", "-o", "NAME,TYPE"], stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - grep = subprocess.Popen(["grep", "disk"], stdin=lsblk.stdout, stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - command_output = grep.stdout.readlines() - - # The first word in each element of the command_output list is the device name - attached_devices = [x.decode('utf8').split()[0] for x in command_output] - except subprocess.CalledProcessError: - self.exit_gracefully(ExportStatus.ERROR_GENERIC.value) - - # Determine which are USBs by selecting those block devices that are removable disks. - usb_devices = [] - for device in attached_devices: - try: - removable = subprocess.check_output( - ["cat", "/sys/class/block/{}/removable".format(device)], - stderr=subprocess.PIPE) - is_removable = int(removable.decode('utf8').strip()) - except subprocess.CalledProcessError: - is_removable = False - - if is_removable: - usb_devices.append("/dev/{}".format(device)) - - return usb_devices - - def set_extracted_device_name(self): - try: - device_and_partitions = subprocess.check_output( - ["lsblk", "-o", "TYPE", "--noheadings", self.device], stderr=subprocess.PIPE) - - # we don't support multiple partitions - partition_count = device_and_partitions.decode('utf-8').split('\n').count('part') - if partition_count > 1: - logger.debug("multiple partitions not supported") - self.exit_gracefully(ExportStatus.USB_ENCRYPTION_NOT_SUPPORTED.value) - - # redefine device to /dev/sda if disk is encrypted, /dev/sda1 if partition encrypted - self.device = self.device if partition_count == 0 else self.device + '1' - except subprocess.CalledProcessError: - self.exit_gracefully(ExportStatus.USB_ENCRYPTION_NOT_SUPPORTED.value) - - def check_luks_volume(self): - # cryptsetup isLuks returns 0 if the device is a luks volume - # subprocess with throw if the device is not luks (rc !=0) - logger.info('Checking if volume is luks-encrypted') - self.set_extracted_device_name() - logger.debug("checking if {} is luks encrypted".format(self.device)) - self.safe_check_call( - command=["sudo", "cryptsetup", "isLuks", self.device], - error_message=ExportStatus.USB_ENCRYPTION_NOT_SUPPORTED.value - ) - self.exit_gracefully(ExportStatus.USB_ENCRYPTED.value) - - def unlock_luks_volume(self, encryption_key): - try: - # get the encrypted device name - self.set_extracted_device_name() - luks_header = subprocess.check_output(["sudo", "cryptsetup", "luksDump", self.device]) - luks_header_list = luks_header.decode('utf-8').split('\n') - for line in luks_header_list: - items = line.split('\t') - if 'UUID' in items[0]: - self.encrypted_device = 'luks-' + items[1] - - # the luks device is already unlocked - if os.path.exists(os.path.join('/dev/mapper/', self.encrypted_device)): - logger.debug('Device already unlocked') - return - - logger.debug('Unlocking luks volume {}'.format(self.encrypted_device)) - p = subprocess.Popen( - ["sudo", "cryptsetup", "luksOpen", self.device, self.encrypted_device], - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE - ) - logger.debug('Passing key') - p.communicate(input=str.encode(encryption_key, "utf-8")) - rc = p.returncode - if rc != 0: - logger.error('Bad phassphrase for {}'.format(self.encrypted_device)) - self.exit_gracefully(ExportStatus.USB_BAD_PASSPHRASE.value) - except subprocess.CalledProcessError: - self.exit_gracefully(ExportStatus.USB_ENCRYPTION_NOT_SUPPORTED) - - def mount_volume(self): - # If the drive is already mounted then we don't need to mount it again - output = subprocess.check_output( - ["lsblk", "-o", "MOUNTPOINT", "--noheadings", self.device]) - mountpoint = output.decode('utf-8').strip() - if mountpoint: - logger.debug('The device is already mounted') - self.mountpoint = mountpoint - return - - # mount target not created, create folder - if not os.path.exists(self.mountpoint): - self.safe_check_call( - command=["sudo", "mkdir", self.mountpoint], - error_message=ExportStatus.ERROR_USB_MOUNT - ) - - mapped_device_path = os.path.join("/dev/mapper/", self.encrypted_device) - logger.info('Mounting {}'.format(mapped_device_path)) - self.safe_check_call( - command=["sudo", "mount", mapped_device_path, self.mountpoint], - error_message=ExportStatus.ERROR_USB_MOUNT.value - ) - self.safe_check_call( - command=["sudo", "chown", "-R", "user:user", self.mountpoint], - error_message=ExportStatus.ERROR_USB_MOUNT.value - ) - - def copy_submission(self): - # move files to drive (overwrites files with same filename) and unmount drive - # we don't use safe_check_call here because we must lock and - # unmount the drive as part of the finally block - try: - target_path = os.path.join(self.mountpoint, self.target_dirname) - subprocess.check_call(["mkdir", target_path]) - export_data = os.path.join(self.tmpdir, "export_data/") - logger.info('Copying file to {}'.format(self.target_dirname)) - subprocess.check_call(["cp", "-r", export_data, target_path]) - logger.info('File copied successfully to {}'.format(self.target_dirname)) - self.popup_message("Files exported successfully to disk.") - except (subprocess.CalledProcessError, OSError): - self.exit_gracefully(ExportStatus.ERROR_USB_WRITE.value) - finally: - logger.info('Syncing filesystems') - subprocess.check_call(["sync"]) - - if os.path.exists(self.mountpoint): - logger.info('Unmounting drive from {}'.format(self.mountpoint)) - subprocess.check_call(["sudo", "umount", self.mountpoint]) - - if os.path.exists(os.path.join('/dev/mapper', self.encrypted_device)): - logger.info('Locking luks volume {}'.format(self.encrypted_device)) - subprocess.check_call( - ["sudo", "cryptsetup", "luksClose", self.encrypted_device] - ) - - logger.info('Deleting temporary directory {}'.format(self.tmpdir)) - subprocess.check_call(["rm", "-rf", self.tmpdir]) - sys.exit(0) - - def wait_for_print(self): - # use lpstat to ensure the job was fully transfered to the printer - # returns True if print was successful, otherwise will throw exceptions - signal.signal(signal.SIGALRM, handler) - signal.alarm(self.printer_wait_timeout) - printer_idle_string = "printer {} is idle".format(self.printer_name) - while True: - try: - logger.info('Running lpstat waiting for printer {}'.format(self.printer_name)) - output = subprocess.check_output(["lpstat", "-p", self.printer_name]) - if printer_idle_string in output.decode("utf-8"): - logger.info('Print completed') - return True - else: - time.sleep(5) - except subprocess.CalledProcessError: - self.exit_gracefully(ExportStatus.ERROR_PRINT.value) - except TimeoutException: - logger.error('Timeout waiting for printer {}'.format(self.printer_name)) - self.exit_gracefully(ExportStatus.ERROR_PRINT.value) - return True - - def get_printer_uri(self): - # Get the URI via lpinfo and only accept URIs of supported printers - printer_uri = "" - try: - output = subprocess.check_output(["sudo", "lpinfo", "-v"]) - except subprocess.CalledProcessError: - self.exit_gracefully(ExportStatus.ERROR_PRINTER_URI.value) - - # fetch the usb printer uri - for line in output.split(): - if "usb://" in line.decode("utf-8"): - printer_uri = line.decode("utf-8") - logger.info('lpinfo usb printer: {}'.format(printer_uri)) - - # verify that the printer is supported, else exit - if printer_uri == "": - # No usb printer is connected - logger.info('No usb printers connected') - self.exit_gracefully(ExportStatus.ERROR_PRINTER_NOT_FOUND.value) - elif not any(x in printer_uri for x in ("Brother", "LaserJet")): - # printer url is a make that is unsupported - logger.info('Printer {} is unsupported'.format(printer_uri)) - self.exit_gracefully(ExportStatus.ERROR_PRINTER_NOT_SUPPORTED.value) - - logger.info('Printer {} is supported'.format(printer_uri)) - return printer_uri - - def install_printer_ppd(self, uri): - if not any(x in uri for x in ("Brother", "LaserJet")): - logger.error("Cannot install printer ppd for unsupported printer: {}".format(uri)) - self.exit_gracefully(msg=ExportStatus.ERROR_PRINTER_NOT_SUPPORTED.value) - return - - if "Brother" in uri: - printer_driver = BRLASER_DRIVER - printer_ppd = BRLASER_PPD - elif "LaserJet" in uri: - printer_driver = LASERJET_DRIVER - printer_ppd = LASERJET_PPD - - # Some drivers don't come with ppd files pre-compiled, we must compile them - self.safe_check_call( - command=[ - "sudo", - "ppdc", - printer_driver, - "-d", - "/usr/share/cups/model/", - ], - error_message=ExportStatus.ERROR_PRINTER_DRIVER_UNAVAILABLE.value - ) - return printer_ppd - - def setup_printer(self, printer_uri, printer_ppd): - # Add the printer using lpadmin - logger.info('Setting up printer name {}'.format(self.printer_name)) - self.safe_check_call( - command=[ - "sudo", - "lpadmin", - "-p", - self.printer_name, - "-v", - printer_uri, - "-P", - printer_ppd, - ], - error_message=ExportStatus.ERROR_PRINTER_INSTALL.value - ) - # Activate the printer so that it can receive jobs - logger.info('Activating printer {}'.format(self.printer_name)) - self.safe_check_call( - command=["sudo", "lpadmin", "-p", self.printer_name], - error_message=ExportStatus.ERROR_PRINTER_INSTALL.value - ) - # worksaround for version of lpadmin/cups in debian buster: - # see https://forums.developer.apple.com/thread/106112 - self.safe_check_call( - command=["sudo", "cupsaccept", self.printer_name], - error_message=ExportStatus.ERROR_PRINTER_INSTALL.value - ) - # A non-zero return code is expected here, but the command is required - # and works as expected. - command = ["sudo", "cupsenable", self.printer_name] - try: - subprocess.check_call(command) - except subprocess.CalledProcessError: - pass - - # Allow user to print (without using sudo) - logger.info('Allow user to print {}'.format(self.printer_name)) - self.safe_check_call( - command=["sudo", "lpadmin", "-p", self.printer_name, "-u", "allow:user"], - error_message=ExportStatus.ERROR_PRINTER_INSTALL.value - ) - - def print_test_page(self): - logger.info('Printing test page') - self.print_file("/usr/share/cups/data/testprint") - self.popup_message("Printing test page") - - def print_all_files(self): - files_path = os.path.join(self.tmpdir, "export_data/") - files = os.listdir(files_path) - print_count = 0 - for f in files: - file_path = os.path.join(files_path, f) - self.print_file(file_path) - print_count += 1 - msg = "Printing document {} of {}".format(print_count, len(files)) - self.popup_message(msg) - - def is_open_office_file(self, filename): - OPEN_OFFICE_FORMATS = [ - ".doc", - ".docx", - ".xls", - ".xlsx", - ".ppt", - ".pptx", - ".odt", - ".ods", - ".odp", - ] - for extension in OPEN_OFFICE_FORMATS: - if os.path.basename(filename).endswith(extension): - return True - return False - - def print_file(self, file_to_print): - # If the file to print is an (open)office document, we need to call unoconf to - # convert the file to pdf as printer drivers do not support this format - if self.is_open_office_file(file_to_print): - logger.info('Converting Office document to pdf') - folder = os.path.dirname(file_to_print) - converted_filename = file_to_print + ".pdf" - converted_path = os.path.join(folder, converted_filename) - self.safe_check_call( - command=["unoconv", "-o", converted_path, file_to_print], - error_message=ExportStatus.ERROR_PRINT.value - ) - file_to_print = converted_path - - logger.info('Sending file to printer {}:{}'.format(self.printer_name, file_to_print)) - self.safe_check_call( - command=["xpp", "-P", self.printer_name, file_to_print], - error_message=ExportStatus.ERROR_PRINT.value - ) - - -# class ends here -class TimeoutException(Exception): - pass +class ExportAction(abc.ABC): + """ + This export interface defines the method that export + methods should implement. + """ -def handler(s, f): - raise TimeoutException("Timeout") + @abc.abstractmethod + def run(self) -> None: + """Run logic""" + pass diff --git a/securedrop_export/main.py b/securedrop_export/main.py index d3b5a0b..b68dce1 100755 --- a/securedrop_export/main.py +++ b/securedrop_export/main.py @@ -1,7 +1,9 @@ import logging from securedrop_export import export -from securedrop_export.export import ExportStatus +from securedrop_export.exceptions import ExportStatus +from securedrop_export.print.actions import PrintExportAction, PrintTestPageAction +from securedrop_export.disk.actions import DiskTestAction, DiskExportAction, USBTestAction logger = logging.getLogger(__name__) @@ -14,42 +16,18 @@ def __main__(submission): except Exception: submission.exit_gracefully(ExportStatus.ERROR_METADATA_PARSING.value) - if submission.archive_metadata.is_valid(): - if submission.archive_metadata.export_method == "usb-test": - logger.info('Export archive is usb-test') - submission.check_usb_connected(exit=True) - elif submission.archive_metadata.export_method == "disk": - logger.info('Export archive is disk') - # check_usb_connected looks for the drive, sets the drive to use - submission.check_usb_connected() - logger.info('Unlocking volume') - # exports all documents in the archive to luks-encrypted volume - submission.unlock_luks_volume(submission.archive_metadata.encryption_key) - logger.info('Mounting volume') - submission.mount_volume() - logger.info('Copying submission to drive') - submission.copy_submission() - elif submission.archive_metadata.export_method == "disk-test": - logger.info('Export archive is disk-test') - # check_usb_connected looks for the drive, sets the drive to use - submission.check_usb_connected() - submission.check_luks_volume() - elif submission.archive_metadata.export_method == "printer": - logger.info('Export archive is printer') - # prints all documents in the archive - logger.info('Searching for printer') - printer_uri = submission.get_printer_uri() - logger.info('Installing printer drivers') - printer_ppd = submission.install_printer_ppd(printer_uri) - logger.info('Setting up printer') - submission.setup_printer(printer_uri, printer_ppd) - logger.info('Printing files') - submission.print_all_files() - elif submission.archive_metadata.export_method == "printer-test": - # Prints a test page to ensure the printer is functional - printer_uri = submission.get_printer_uri() - printer_ppd = submission.install_printer_ppd(printer_uri) - submission.setup_printer(printer_uri, printer_ppd) - submission.print_test_page() - else: + if not submission.archive_metadata.is_valid(): submission.exit_gracefully(ExportStatus.ERROR_ARCHIVE_METADATA.value) + + if submission.archive_metadata.export_method == "usb-test": + action = USBTestAction(submission) + elif submission.archive_metadata.export_method == "disk": + action = DiskExportAction(submission) + elif submission.archive_metadata.export_method == "disk-test": + action = DiskTestAction(submission) + elif submission.archive_metadata.export_method == "printer": + action = PrintExportAction(submission) + elif submission.archive_metadata.export_method == "printer-test": + action = PrintTestPageAction(submission) + + action.run() diff --git a/securedrop_export/print/__init__.py b/securedrop_export/print/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/securedrop_export/print/actions.py b/securedrop_export/print/actions.py new file mode 100644 index 0000000..f58ff4c --- /dev/null +++ b/securedrop_export/print/actions.py @@ -0,0 +1,229 @@ +import logging +import os +import signal +import subprocess +import time + +from securedrop_export.exceptions import ExportStatus, handler, TimeoutException +from securedrop_export.export import ExportAction + + +PRINTER_NAME = "sdw-printer" +PRINTER_WAIT_TIMEOUT = 60 +BRLASER_DRIVER = "/usr/share/cups/drv/brlaser.drv" +BRLASER_PPD = "/usr/share/cups/model/br7030.ppd" +LASERJET_DRIVER = "/usr/share/cups/drv/hpcups.drv" +LASERJET_PPD = "/usr/share/cups/model/hp-laserjet_6l.ppd" + +logger = logging.getLogger(__name__) + + +class PrintAction(ExportAction): + def __init__(self, submission): + self.submission = submission + self.printer_name = PRINTER_NAME + self.printer_wait_timeout = PRINTER_WAIT_TIMEOUT + + def run(self) -> None: + """Run logic""" + raise NotImplementedError + + def wait_for_print(self): + # use lpstat to ensure the job was fully transfered to the printer + # returns True if print was successful, otherwise will throw exceptions + signal.signal(signal.SIGALRM, handler) + signal.alarm(self.printer_wait_timeout) + printer_idle_string = "printer {} is idle".format(self.printer_name) + while True: + try: + logger.info('Running lpstat waiting for printer {}'.format(self.printer_name)) + output = subprocess.check_output(["lpstat", "-p", self.printer_name]) + if printer_idle_string in output.decode("utf-8"): + logger.info('Print completed') + return True + else: + time.sleep(5) + except subprocess.CalledProcessError: + self.submission.exit_gracefully(ExportStatus.ERROR_PRINT.value) + except TimeoutException: + logger.error('Timeout waiting for printer {}'.format(self.printer_name)) + self.submission.exit_gracefully(ExportStatus.ERROR_PRINT.value) + return True + + def get_printer_uri(self): + # Get the URI via lpinfo and only accept URIs of supported printers + printer_uri = "" + try: + output = subprocess.check_output(["sudo", "lpinfo", "-v"]) + except subprocess.CalledProcessError: + self.submission.exit_gracefully(ExportStatus.ERROR_PRINTER_URI.value) + + # fetch the usb printer uri + for line in output.split(): + if "usb://" in line.decode("utf-8"): + printer_uri = line.decode("utf-8") + logger.info('lpinfo usb printer: {}'.format(printer_uri)) + + # verify that the printer is supported, else exit + if printer_uri == "": + # No usb printer is connected + logger.info('No usb printers connected') + self.submission.exit_gracefully(ExportStatus.ERROR_PRINTER_NOT_FOUND.value) + elif not any(x in printer_uri for x in ("Brother", "LaserJet")): + # printer url is a make that is unsupported + logger.info('Printer {} is unsupported'.format(printer_uri)) + self.submission.exit_gracefully(ExportStatus.ERROR_PRINTER_NOT_SUPPORTED.value) + + logger.info('Printer {} is supported'.format(printer_uri)) + return printer_uri + + def install_printer_ppd(self, uri): + if not any(x in uri for x in ("Brother", "LaserJet")): + logger.error("Cannot install printer ppd for unsupported printer: {}".format(uri)) + self.submission.exit_gracefully(msg=ExportStatus.ERROR_PRINTER_NOT_SUPPORTED.value) + return + + if "Brother" in uri: + printer_driver = BRLASER_DRIVER + printer_ppd = BRLASER_PPD + elif "LaserJet" in uri: + printer_driver = LASERJET_DRIVER + printer_ppd = LASERJET_PPD + + # Some drivers don't come with ppd files pre-compiled, we must compile them + self.submission.safe_check_call( + command=[ + "sudo", + "ppdc", + printer_driver, + "-d", + "/usr/share/cups/model/", + ], + error_message=ExportStatus.ERROR_PRINTER_DRIVER_UNAVAILABLE.value + ) + return printer_ppd + + def setup_printer(self, printer_uri, printer_ppd): + # Add the printer using lpadmin + logger.info('Setting up printer name {}'.format(self.printer_name)) + self.submission.safe_check_call( + command=[ + "sudo", + "lpadmin", + "-p", + self.printer_name, + "-v", + printer_uri, + "-P", + printer_ppd, + ], + error_message=ExportStatus.ERROR_PRINTER_INSTALL.value + ) + # Activate the printer so that it can receive jobs + logger.info('Activating printer {}'.format(self.printer_name)) + self.submission.safe_check_call( + command=["sudo", "lpadmin", "-p", self.printer_name], + error_message=ExportStatus.ERROR_PRINTER_INSTALL.value + ) + # worksaround for version of lpadmin/cups in debian buster: + # see https://forums.developer.apple.com/thread/106112 + self.submission.safe_check_call( + command=["sudo", "cupsaccept", self.printer_name], + error_message=ExportStatus.ERROR_PRINTER_INSTALL.value + ) + # A non-zero return code is expected here, but the command is required + # and works as expected. + command = ["sudo", "cupsenable", self.printer_name] + try: + subprocess.check_call(command) + except subprocess.CalledProcessError: + pass + + # Allow user to print (without using sudo) + logger.info('Allow user to print {}'.format(self.printer_name)) + self.submission.safe_check_call( + command=["sudo", "lpadmin", "-p", self.printer_name, "-u", "allow:user"], + error_message=ExportStatus.ERROR_PRINTER_INSTALL.value + ) + + def print_test_page(self): + logger.info('Printing test page') + self.print_file("/usr/share/cups/data/testprint") + self.submission.popup_message("Printing test page") + + def print_all_files(self): + files_path = os.path.join(self.submission.tmpdir, "export_data/") + files = os.listdir(files_path) + print_count = 0 + for f in files: + file_path = os.path.join(files_path, f) + self.print_file(file_path) + print_count += 1 + msg = "Printing document {} of {}".format(print_count, len(files)) + self.submission.popup_message(msg) + + def is_open_office_file(self, filename): + OPEN_OFFICE_FORMATS = [ + ".doc", + ".docx", + ".xls", + ".xlsx", + ".ppt", + ".pptx", + ".odt", + ".ods", + ".odp", + ] + for extension in OPEN_OFFICE_FORMATS: + if os.path.basename(filename).endswith(extension): + return True + return False + + def print_file(self, file_to_print): + # If the file to print is an (open)office document, we need to call unoconf to + # convert the file to pdf as printer drivers do not support this format + if self.is_open_office_file(file_to_print): + logger.info('Converting Office document to pdf') + folder = os.path.dirname(file_to_print) + converted_filename = file_to_print + ".pdf" + converted_path = os.path.join(folder, converted_filename) + self.submission.safe_check_call( + command=["unoconv", "-o", converted_path, file_to_print], + error_message=ExportStatus.ERROR_PRINT.value + ) + file_to_print = converted_path + + logger.info('Sending file to printer {}:{}'.format(self.printer_name, file_to_print)) + self.submission.safe_check_call( + command=["xpp", "-P", self.printer_name, file_to_print], + error_message=ExportStatus.ERROR_PRINT.value + ) + + +class PrintExportAction(PrintAction): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + def run(self): + logger.info('Export archive is printer') + # prints all documents in the archive + logger.info('Searching for printer') + printer_uri = self.get_printer_uri() + logger.info('Installing printer drivers') + printer_ppd = self.install_printer_ppd(printer_uri) + logger.info('Setting up printer') + self.setup_printer(printer_uri, printer_ppd) + logger.info('Printing files') + self.print_all_files() + + +class PrintTestPageAction(PrintAction): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + def run(self): + # Prints a test page to ensure the printer is functional + printer_uri = self.get_printer_uri() + printer_ppd = self.install_printer_ppd(printer_uri) + self.setup_printer(printer_uri, printer_ppd) + self.print_test_page() diff --git a/tests/disk/__init__.py b/tests/disk/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/disk/test_actions.py b/tests/disk/test_actions.py new file mode 100644 index 0000000..c349985 --- /dev/null +++ b/tests/disk/test_actions.py @@ -0,0 +1,203 @@ +from unittest import mock + +import os +import pytest +import sys + +from subprocess import CalledProcessError + +from securedrop_export import export +from securedrop_export.disk.actions import DiskExportAction, DiskTestAction + +TEST_CONFIG = os.path.join(os.path.dirname(__file__), "sd-export-config.json") +SAMPLE_OUTPUT_NO_PART = b"disk\ncrypt" # noqa +SAMPLE_OUTPUT_ONE_PART = b"disk\npart\ncrypt" # noqa +SAMPLE_OUTPUT_MULTI_PART = b"disk\npart\npart\npart\ncrypt" # noqa +SAMPLE_OUTPUT_USB = b"/dev/sda" # noqa + + +def test_usb_precheck_disconnected(capsys, mocker): + """Tests the scenario where there are disks connected, but none of them are USB""" + submission = export.SDExport("testfile", TEST_CONFIG) + action = DiskTestAction(submission) + expected_message = "USB_NOT_CONNECTED" + assert export.ExportStatus.USB_NOT_CONNECTED.value == expected_message + + # Popen call returns lsblk output + command_output = mock.MagicMock() + command_output.stdout = mock.MagicMock() + command_output.stdout.readlines = mock.MagicMock(return_value=[b"sda disk\n", b"sdb disk\n"]) + mocker.patch("subprocess.Popen", return_value=command_output) + + # check_output returns removable status + mocker.patch("subprocess.check_output", return_value=[b'0\n', b'0\n']) + + mocked_exit = mocker.patch.object(submission, "exit_gracefully", return_value=0) + + mocker.patch("subprocess.check_output", + side_effect=CalledProcessError(1, 'check_output')) + + action.check_usb_connected(exit=True) + + mocked_exit.assert_called_once_with(expected_message) + assert action.device is None + + +def test_usb_precheck_connected(capsys, mocker): + """Tests the scenario where there is one USB connected""" + submission = export.SDExport("testfile", TEST_CONFIG) + action = DiskTestAction(submission) + + # Popen call returns lsblk output + command_output = mock.MagicMock() + command_output.stdout = mock.MagicMock() + command_output.stdout.readlines = mock.MagicMock(return_value=[b"sdb disk\n"]) + mocker.patch("subprocess.Popen", return_value=command_output) + + # check_output returns removable status + mocker.patch("subprocess.check_output", return_value=b"1\n") + + expected_message = "USB_CONNECTED" + assert export.ExportStatus.USB_CONNECTED.value == expected_message + mocked_exit = mocker.patch.object(submission, "exit_gracefully", return_value=0) + + action.check_usb_connected(exit=True) + + mocked_exit.assert_called_once_with(expected_message) + assert action.device == "/dev/sdb" + + +def test_usb_precheck_multiple_devices_connected(capsys, mocker): + """Tests the scenario where there are multiple USB drives connected""" + submission = export.SDExport("testfile", TEST_CONFIG) + action = DiskTestAction(submission) + + # Popen call returns lsblk output + command_output = mock.MagicMock() + command_output.stdout = mock.MagicMock() + command_output.stdout.readlines = mock.MagicMock(return_value=[b"sdb disk\n", b"sdc disk\n"]) + mocker.patch("subprocess.Popen", return_value=command_output) + + # check_output returns removable status + mocker.patch("subprocess.check_output", return_value=b"1\n") + + expected_message = "ERROR_GENERIC" + assert export.ExportStatus.ERROR_GENERIC.value == expected_message + mocked_exit = mocker.patch.object(submission, "exit_gracefully", return_value=0) + + action.check_usb_connected(exit=True) + + mocked_exit.assert_called_once_with(expected_message) + assert action.device is None + + +@mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_NO_PART) +def test_extract_device_name_no_part(mocked_call, capsys): + submission = export.SDExport("testfile", TEST_CONFIG) + action = DiskExportAction(submission) + + action.device = "/dev/sda" + + action.set_extracted_device_name() + + assert action.device == "/dev/sda" + + +@mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_ONE_PART) +def test_extract_device_name_single_part(mocked_call, capsys): + submission = export.SDExport("testfile", TEST_CONFIG) + action = DiskExportAction(submission) + + action.device = "/dev/sda" + + action.set_extracted_device_name() + + assert action.device == "/dev/sda1" + + +@mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_MULTI_PART) +def test_extract_device_name_multiple_part(mocked_call, capsys, mocker): + submission = export.SDExport("testfile", TEST_CONFIG) + action = DiskExportAction(submission) + action.device = "/dev/sda" + mocked_exit = mocker.patch.object(submission, "exit_gracefully", return_value=0) + expected_message = export.ExportStatus.USB_ENCRYPTION_NOT_SUPPORTED.value + + action.set_extracted_device_name() + + mocked_exit.assert_called_once_with(expected_message) + + +@mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_NO_PART) +@mock.patch("subprocess.check_call", return_value=0) +def test_luks_precheck_encrypted_fde(mocked_call, capsys, mocker): + submission = export.SDExport("testfile", TEST_CONFIG) + action = DiskExportAction(submission) + + expected_message = export.ExportStatus.USB_ENCRYPTED.value + mocked_exit = mocker.patch.object(submission, "exit_gracefully", return_value=0) + + action.check_luks_volume() + + mocked_exit.assert_called_once_with(expected_message) + + +@mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_ONE_PART) +@mock.patch("subprocess.check_call", return_value=0) +def test_luks_precheck_encrypted_single_part(mocked_call, capsys, mocker): + submission = export.SDExport("testfile", TEST_CONFIG) + action = DiskExportAction(submission) + action.device = "/dev/sda" + expected_message = export.ExportStatus.USB_ENCRYPTED.value + mocked_exit = mocker.patch.object(submission, "exit_gracefully", return_value=0) + + action.check_luks_volume() + + mocked_exit.assert_called_once_with(expected_message) + + +@mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_MULTI_PART) +def test_luks_precheck_encrypted_multi_part(mocked_call, capsys, mocker): + submission = export.SDExport("testfile", TEST_CONFIG) + action = DiskExportAction(submission) + action.device = "/dev/sda" + expected_message = export.ExportStatus.USB_ENCRYPTION_NOT_SUPPORTED.value + + # Here we need to mock the exit_gracefully method with a side effect otherwise + # program execution will continue after exit_gracefully and exit_gracefully + # may be called a second time. + mocked_exit = mocker.patch.object(submission, "exit_gracefully", + side_effect=lambda x: sys.exit(0)) + + # Output of `lsblk -o TYPE --noheadings DEVICE_NAME` when a drive has multiple + # partitions + multi_partition_lsblk_output = b"disk\npart\npart\n" + mocker.patch("subprocess.check_call", return_value=0) + mocker.patch("subprocess.check_output", return_value=multi_partition_lsblk_output) + + with pytest.raises(SystemExit): + action.check_luks_volume() + + mocked_exit.assert_called_once_with(expected_message) + + +@mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_ONE_PART) +def test_luks_precheck_encrypted_luks_error(mocked_call, capsys, mocker): + submission = export.SDExport("testfile", TEST_CONFIG) + action = DiskExportAction(submission) + action.device = "/dev/sda" + expected_message = "USB_ENCRYPTION_NOT_SUPPORTED" + assert expected_message == export.ExportStatus.USB_ENCRYPTION_NOT_SUPPORTED.value + + mocked_exit = mocker.patch.object(submission, "exit_gracefully", + side_effect=lambda msg, e: sys.exit(0)) + + single_partition_lsblk_output = b"disk\npart\n" + mocker.patch("subprocess.check_output", return_value=single_partition_lsblk_output) + mocker.patch("subprocess.check_call", side_effect=CalledProcessError(1, 'check_call')) + + with pytest.raises(SystemExit): + action.check_luks_volume() + + assert mocked_exit.mock_calls[0][2]['msg'] == expected_message + assert mocked_exit.mock_calls[0][2]['e'] is None diff --git a/tests/print/__init__.py b/tests/print/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/print/test_actions.py b/tests/print/test_actions.py new file mode 100644 index 0000000..c500e8f --- /dev/null +++ b/tests/print/test_actions.py @@ -0,0 +1,127 @@ +from unittest import mock + +import os +import pytest +from subprocess import CalledProcessError +import sys + +from securedrop_export import export +from securedrop_export.print.actions import PrintExportAction + + +SAMPLE_OUTPUT_NO_PRINTER = b"network beh\nnetwork https\nnetwork ipp\nnetwork ipps\nnetwork http\nnetwork\nnetwork ipp14\nnetwork lpd" # noqa +SAMPLE_OUTPUT_BROTHER_PRINTER = b"network beh\nnetwork https\nnetwork ipp\nnetwork ipps\nnetwork http\nnetwork\nnetwork ipp14\ndirect usb://Brother/HL-L2320D%20series?serial=A00000A000000\nnetwork lpd" # noqa +SAMPLE_OUTPUT_LASERJET_PRINTER = b"network beh\nnetwork https\nnetwork ipp\nnetwork ipps\nnetwork http\nnetwork\nnetwork ipp14\ndirect usb://HP/LaserJet%20Pro%20M404-M405?serial=A00000A000000\nnetwork lpd" # noqa +TEST_CONFIG = os.path.join(os.path.dirname(__file__), "sd-export-config.json") + + +@mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_BROTHER_PRINTER) +def test_get_good_printer_uri_laserjet(mocked_call): + submission = export.SDExport("testfile", TEST_CONFIG) + action = PrintExportAction(submission) + + result = action.get_printer_uri() + + assert result == "usb://Brother/HL-L2320D%20series?serial=A00000A000000" + + +@mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_LASERJET_PRINTER) +def test_get_good_printer_uri_brother(mocked_call): + submission = export.SDExport("testfile", TEST_CONFIG) + action = PrintExportAction(submission) + + result = action.get_printer_uri() + assert result == "usb://HP/LaserJet%20Pro%20M404-M405?serial=A00000A000000" + + +@mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_NO_PRINTER) +def test_get_bad_printer_uri(mocked_call, capsys, mocker): + submission = export.SDExport("testfile", TEST_CONFIG) + action = PrintExportAction(submission) + expected_message = "ERROR_PRINTER_NOT_FOUND" + assert export.ExportStatus.ERROR_PRINTER_NOT_FOUND.value == expected_message + mocked_exit = mocker.patch.object(submission, "exit_gracefully", + side_effect=lambda x: sys.exit(0)) + + with pytest.raises(SystemExit): + action.get_printer_uri() + + mocked_exit.assert_called_once_with(expected_message) + + +@pytest.mark.parametrize('open_office_paths', [ + "/tmp/whatver/thisisadoc.doc" + "/home/user/Downloads/thisisadoc.xlsx" + "/home/user/Downloads/file.odt" + "/tmp/tmpJf83j9/secret.pptx" +]) +def test_is_open_office_file(capsys, open_office_paths): + submission = export.SDExport("", TEST_CONFIG) + action = PrintExportAction(submission) + assert action.is_open_office_file(open_office_paths) + + +@pytest.mark.parametrize('open_office_paths', [ + "/tmp/whatver/thisisadoc.doccc" + "/home/user/Downloads/thisisa.xlsx.zip" + "/home/user/Downloads/file.odz" + "/tmp/tmpJf83j9/secret.gpg" +]) +def test_is_not_open_office_file(capsys, open_office_paths): + submission = export.SDExport("", TEST_CONFIG) + action = PrintExportAction(submission) + assert not action.is_open_office_file(open_office_paths) + + +@mock.patch("subprocess.check_call") +def test_install_printer_ppd_laserjet(mocker): + submission = export.SDExport("testfile", TEST_CONFIG) + action = PrintExportAction(submission) + ppd = action.install_printer_ppd("usb://HP/LaserJet%20Pro%20M404-M405?serial=A00000A00000") + assert ppd == "/usr/share/cups/model/hp-laserjet_6l.ppd" + + +@mock.patch("subprocess.check_call") +def test_install_printer_ppd_brother(mocker): + submission = export.SDExport("testfile", TEST_CONFIG) + action = PrintExportAction(submission) + ppd = action.install_printer_ppd("usb://Brother/HL-L2320D%20series?serial=A00000A000000") + assert ppd == "/usr/share/cups/model/br7030.ppd" + + +def test_install_printer_ppd_error_no_driver(mocker): + submission = export.SDExport("testfile", TEST_CONFIG) + action = PrintExportAction(submission) + mocked_exit = mocker.patch.object(submission, "exit_gracefully", return_value=0) + mocker.patch("subprocess.check_call", side_effect=CalledProcessError(1, 'check_call')) + + action.install_printer_ppd("usb://HP/LaserJet%20Pro%20M404-M405?serial=A00000A000000") + + assert mocked_exit.mock_calls[0][2]['msg'] == "ERROR_PRINTER_DRIVER_UNAVAILABLE" + assert mocked_exit.mock_calls[0][2]['e'] is None + + +def test_install_printer_ppd_error_not_supported(mocker): + submission = export.SDExport("testfile", TEST_CONFIG) + action = PrintExportAction(submission) + mocked_exit = mocker.patch.object(submission, "exit_gracefully", return_value=0) + mocker.patch("subprocess.check_call", side_effect=CalledProcessError(1, 'check_call')) + + action.install_printer_ppd("usb://Not/Supported?serial=A00000A000000") + + assert mocked_exit.mock_calls[0][2]['msg'] == "ERROR_PRINTER_NOT_SUPPORTED" + + +def test_setup_printer_error(mocker): + submission = export.SDExport("testfile", TEST_CONFIG) + action = PrintExportAction(submission) + mocked_exit = mocker.patch.object(submission, "exit_gracefully", return_value=0) + mocker.patch("subprocess.check_call", side_effect=CalledProcessError(1, 'check_call')) + + action.setup_printer( + "usb://Brother/HL-L2320D%20series?serial=A00000A000000", + "/usr/share/cups/model/br7030.ppd" + ) + + assert mocked_exit.mock_calls[0][2]['msg'] == "ERROR_PRINTER_INSTALL" + assert mocked_exit.mock_calls[0][2]['e'] is None diff --git a/tests/test_export.py b/tests/test_export.py index 9579d43..9566576 100644 --- a/tests/test_export.py +++ b/tests/test_export.py @@ -3,20 +3,10 @@ import os import pytest import subprocess # noqa: F401 -import sys import tempfile -from subprocess import CalledProcessError from securedrop_export import export -SAMPLE_OUTPUT_NO_PRINTER = b"network beh\nnetwork https\nnetwork ipp\nnetwork ipps\nnetwork http\nnetwork\nnetwork ipp14\nnetwork lpd" # noqa -SAMPLE_OUTPUT_BROTHER_PRINTER = b"network beh\nnetwork https\nnetwork ipp\nnetwork ipps\nnetwork http\nnetwork\nnetwork ipp14\ndirect usb://Brother/HL-L2320D%20series?serial=A00000A000000\nnetwork lpd" # noqa -SAMPLE_OUTPUT_LASERJET_PRINTER = b"network beh\nnetwork https\nnetwork ipp\nnetwork ipps\nnetwork http\nnetwork\nnetwork ipp14\ndirect usb://HP/LaserJet%20Pro%20M404-M405?serial=A00000A000000\nnetwork lpd" # noqa - -SAMPLE_OUTPUT_NO_PART = b"disk\ncrypt" # noqa -SAMPLE_OUTPUT_ONE_PART = b"disk\npart\ncrypt" # noqa -SAMPLE_OUTPUT_MULTI_PART = b"disk\npart\npart\npart\ncrypt" # noqa -SAMPLE_OUTPUT_USB = b"/dev/sda" # noqa TEST_CONFIG = os.path.join(os.path.dirname(__file__), "sd-export-config.json") BAD_TEST_CONFIG = os.path.join(os.path.dirname(__file__), "sd-export-config-bad.json") ANOTHER_BAD_TEST_CONFIG = os.path.join(os.path.dirname(__file__), "sd-export-config-bad-2.json") @@ -140,232 +130,6 @@ def test_popup_message(mocked_call): ]) -@mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_BROTHER_PRINTER) -def test_get_good_printer_uri_laserjet(mocked_call): - submission = export.SDExport("testfile", TEST_CONFIG) - - result = submission.get_printer_uri() - - assert result == "usb://Brother/HL-L2320D%20series?serial=A00000A000000" - - -@mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_LASERJET_PRINTER) -def test_get_good_printer_uri_brother(mocked_call): - submission = export.SDExport("testfile", TEST_CONFIG) - result = submission.get_printer_uri() - assert result == "usb://HP/LaserJet%20Pro%20M404-M405?serial=A00000A000000" - - -@mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_NO_PRINTER) -def test_get_bad_printer_uri(mocked_call, capsys, mocker): - submission = export.SDExport("testfile", TEST_CONFIG) - expected_message = "ERROR_PRINTER_NOT_FOUND" - assert export.ExportStatus.ERROR_PRINTER_NOT_FOUND.value == expected_message - mocked_exit = mocker.patch.object(submission, "exit_gracefully", - side_effect=lambda x: sys.exit(0)) - - with pytest.raises(SystemExit): - submission.get_printer_uri() - - mocked_exit.assert_called_once_with(expected_message) - - -@pytest.mark.parametrize('open_office_paths', [ - "/tmp/whatver/thisisadoc.doc" - "/home/user/Downloads/thisisadoc.xlsx" - "/home/user/Downloads/file.odt" - "/tmp/tmpJf83j9/secret.pptx" -]) -def test_is_open_office_file(capsys, open_office_paths): - submission = export.SDExport("", TEST_CONFIG) - assert submission.is_open_office_file(open_office_paths) - - -@pytest.mark.parametrize('open_office_paths', [ - "/tmp/whatver/thisisadoc.doccc" - "/home/user/Downloads/thisisa.xlsx.zip" - "/home/user/Downloads/file.odz" - "/tmp/tmpJf83j9/secret.gpg" -]) -def test_is_not_open_office_file(capsys, open_office_paths): - submission = export.SDExport("", TEST_CONFIG) - assert not submission.is_open_office_file(open_office_paths) - - -def test_usb_precheck_disconnected(capsys, mocker): - """Tests the scenario where there are disks connected, but none of them are USB""" - submission = export.SDExport("testfile", TEST_CONFIG) - expected_message = "USB_NOT_CONNECTED" - assert export.ExportStatus.USB_NOT_CONNECTED.value == expected_message - - # Popen call returns lsblk output - command_output = mock.MagicMock() - command_output.stdout = mock.MagicMock() - command_output.stdout.readlines = mock.MagicMock(return_value=[b"sda disk\n", b"sdb disk\n"]) - mocker.patch("subprocess.Popen", return_value=command_output) - - # check_output returns removable status - mocker.patch("subprocess.check_output", return_value=[b'0\n', b'0\n']) - - mocked_exit = mocker.patch.object(submission, "exit_gracefully", return_value=0) - - mocker.patch("subprocess.check_output", - side_effect=CalledProcessError(1, 'check_output')) - - submission.check_usb_connected(exit=True) - - mocked_exit.assert_called_once_with(expected_message) - assert submission.device is None - - -def test_usb_precheck_connected(capsys, mocker): - """Tests the scenario where there is one USB connected""" - submission = export.SDExport("testfile", TEST_CONFIG) - - # Popen call returns lsblk output - command_output = mock.MagicMock() - command_output.stdout = mock.MagicMock() - command_output.stdout.readlines = mock.MagicMock(return_value=[b"sdb disk\n"]) - mocker.patch("subprocess.Popen", return_value=command_output) - - # check_output returns removable status - mocker.patch("subprocess.check_output", return_value=b"1\n") - - expected_message = "USB_CONNECTED" - assert export.ExportStatus.USB_CONNECTED.value == expected_message - mocked_exit = mocker.patch.object(submission, "exit_gracefully", return_value=0) - - submission.check_usb_connected(exit=True) - - mocked_exit.assert_called_once_with(expected_message) - assert submission.device == "/dev/sdb" - - -def test_usb_precheck_multiple_devices_connected(capsys, mocker): - """Tests the scenario where there are multiple USB drives connected""" - submission = export.SDExport("testfile", TEST_CONFIG) - - # Popen call returns lsblk output - command_output = mock.MagicMock() - command_output.stdout = mock.MagicMock() - command_output.stdout.readlines = mock.MagicMock(return_value=[b"sdb disk\n", b"sdc disk\n"]) - mocker.patch("subprocess.Popen", return_value=command_output) - - # check_output returns removable status - mocker.patch("subprocess.check_output", return_value=b"1\n") - - expected_message = "ERROR_GENERIC" - assert export.ExportStatus.ERROR_GENERIC.value == expected_message - mocked_exit = mocker.patch.object(submission, "exit_gracefully", return_value=0) - - submission.check_usb_connected(exit=True) - - mocked_exit.assert_called_once_with(expected_message) - assert submission.device is None - - -@mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_NO_PART) -def test_extract_device_name_no_part(mocked_call, capsys): - submission = export.SDExport("testfile", TEST_CONFIG) - submission.device = "/dev/sda" - - submission.set_extracted_device_name() - - assert submission.device == "/dev/sda" - - -@mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_ONE_PART) -def test_extract_device_name_single_part(mocked_call, capsys): - submission = export.SDExport("testfile", TEST_CONFIG) - submission.device = "/dev/sda" - - submission.set_extracted_device_name() - - assert submission.device == "/dev/sda1" - - -@mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_MULTI_PART) -def test_extract_device_name_multiple_part(mocked_call, capsys, mocker): - submission = export.SDExport("testfile", TEST_CONFIG) - submission.device = "/dev/sda" - mocked_exit = mocker.patch.object(submission, "exit_gracefully", return_value=0) - expected_message = export.ExportStatus.USB_ENCRYPTION_NOT_SUPPORTED.value - - submission.set_extracted_device_name() - - mocked_exit.assert_called_once_with(expected_message) - - -@mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_NO_PART) -@mock.patch("subprocess.check_call", return_value=0) -def test_luks_precheck_encrypted_fde(mocked_call, capsys, mocker): - submission = export.SDExport("testfile", TEST_CONFIG) - expected_message = export.ExportStatus.USB_ENCRYPTED.value - mocked_exit = mocker.patch.object(submission, "exit_gracefully", return_value=0) - - submission.check_luks_volume() - - mocked_exit.assert_called_once_with(expected_message) - - -@mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_ONE_PART) -@mock.patch("subprocess.check_call", return_value=0) -def test_luks_precheck_encrypted_single_part(mocked_call, capsys, mocker): - submission = export.SDExport("testfile", TEST_CONFIG) - submission.device = "/dev/sda" - expected_message = export.ExportStatus.USB_ENCRYPTED.value - mocked_exit = mocker.patch.object(submission, "exit_gracefully", return_value=0) - - submission.check_luks_volume() - - mocked_exit.assert_called_once_with(expected_message) - - -@mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_MULTI_PART) -def test_luks_precheck_encrypted_multi_part(mocked_call, capsys, mocker): - submission = export.SDExport("testfile", TEST_CONFIG) - submission.device = "/dev/sda" - expected_message = export.ExportStatus.USB_ENCRYPTION_NOT_SUPPORTED.value - - # Here we need to mock the exit_gracefully method with a side effect otherwise - # program execution will continue after exit_gracefully and exit_gracefully - # may be called a second time. - mocked_exit = mocker.patch.object(submission, "exit_gracefully", - side_effect=lambda x: sys.exit(0)) - - # Output of `lsblk -o TYPE --noheadings DEVICE_NAME` when a drive has multiple - # partitions - multi_partition_lsblk_output = b"disk\npart\npart\n" - mocker.patch("subprocess.check_call", return_value=0) - mocker.patch("subprocess.check_output", return_value=multi_partition_lsblk_output) - - with pytest.raises(SystemExit): - submission.check_luks_volume() - - mocked_exit.assert_called_once_with(expected_message) - - -@mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_ONE_PART) -def test_luks_precheck_encrypted_luks_error(mocked_call, capsys, mocker): - submission = export.SDExport("testfile", TEST_CONFIG) - submission.device = "/dev/sda" - expected_message = "USB_ENCRYPTION_NOT_SUPPORTED" - assert expected_message == export.ExportStatus.USB_ENCRYPTION_NOT_SUPPORTED.value - - mocked_exit = mocker.patch.object(submission, "exit_gracefully", - side_effect=lambda msg, e: sys.exit(0)) - - single_partition_lsblk_output = b"disk\npart\n" - mocker.patch("subprocess.check_output", return_value=single_partition_lsblk_output) - mocker.patch("subprocess.check_call", side_effect=CalledProcessError(1, 'check_call')) - - with pytest.raises(SystemExit): - submission.check_luks_volume() - - assert mocked_exit.mock_calls[0][2]['msg'] == expected_message - assert mocked_exit.mock_calls[0][2]['e'] is None - - def test_safe_check_call(capsys, mocker): submission = export.SDExport("testfile", TEST_CONFIG) submission.safe_check_call(['ls'], "this will work") @@ -376,52 +140,3 @@ def test_safe_check_call(capsys, mocker): assert mocked_exit.mock_calls[0][2]['msg'] == expected_message assert mocked_exit.mock_calls[0][2]['e'] is None - - -@mock.patch("subprocess.check_call") -def test_install_printer_ppd_laserjet(mocker): - submission = export.SDExport("testfile", TEST_CONFIG) - ppd = submission.install_printer_ppd("usb://HP/LaserJet%20Pro%20M404-M405?serial=A00000A00000") - assert ppd == "/usr/share/cups/model/hp-laserjet_6l.ppd" - - -@mock.patch("subprocess.check_call") -def test_install_printer_ppd_brother(mocker): - submission = export.SDExport("testfile", TEST_CONFIG) - ppd = submission.install_printer_ppd("usb://Brother/HL-L2320D%20series?serial=A00000A000000") - assert ppd == "/usr/share/cups/model/br7030.ppd" - - -def test_install_printer_ppd_error_no_driver(mocker): - submission = export.SDExport("testfile", TEST_CONFIG) - mocked_exit = mocker.patch.object(submission, "exit_gracefully", return_value=0) - mocker.patch("subprocess.check_call", side_effect=CalledProcessError(1, 'check_call')) - - submission.install_printer_ppd("usb://HP/LaserJet%20Pro%20M404-M405?serial=A00000A000000") - - assert mocked_exit.mock_calls[0][2]['msg'] == "ERROR_PRINTER_DRIVER_UNAVAILABLE" - assert mocked_exit.mock_calls[0][2]['e'] is None - - -def test_install_printer_ppd_error_not_supported(mocker): - submission = export.SDExport("testfile", TEST_CONFIG) - mocked_exit = mocker.patch.object(submission, "exit_gracefully", return_value=0) - mocker.patch("subprocess.check_call", side_effect=CalledProcessError(1, 'check_call')) - - submission.install_printer_ppd("usb://Not/Supported?serial=A00000A000000") - - assert mocked_exit.mock_calls[0][2]['msg'] == "ERROR_PRINTER_NOT_SUPPORTED" - - -def test_setup_printer_error(mocker): - submission = export.SDExport("testfile", TEST_CONFIG) - mocked_exit = mocker.patch.object(submission, "exit_gracefully", return_value=0) - mocker.patch("subprocess.check_call", side_effect=CalledProcessError(1, 'check_call')) - - submission.setup_printer( - "usb://Brother/HL-L2320D%20series?serial=A00000A000000", - "/usr/share/cups/model/br7030.ppd" - ) - - assert mocked_exit.mock_calls[0][2]['msg'] == "ERROR_PRINTER_INSTALL" - assert mocked_exit.mock_calls[0][2]['e'] is None diff --git a/tests/test_main.py b/tests/test_main.py new file mode 100644 index 0000000..d1e43d2 --- /dev/null +++ b/tests/test_main.py @@ -0,0 +1,3 @@ +from securedrop_export.main import __main__ # noqa: F401 +# This import ensures at least the imports in main.__main__ +# are executed during a test run