From 131262c86805ab60b7b685826a9eb626b52de5e9 Mon Sep 17 00:00:00 2001 From: Ro Date: Wed, 4 Oct 2023 20:49:54 -0400 Subject: [PATCH 01/14] Add EncryptionScheme.VERACRYPT. Add methods for retrieving unlocked VeraCrypt drive and unlocking VC drive, using cryptsetup. --- securedrop_export/disk/cli.py | 98 ++++++++++++++++++++++++++++++++ securedrop_export/disk/volume.py | 1 + 2 files changed, 99 insertions(+) diff --git a/securedrop_export/disk/cli.py b/securedrop_export/disk/cli.py index abdc0c1..2fa4944 100644 --- a/securedrop_export/disk/cli.py +++ b/securedrop_export/disk/cli.py @@ -11,6 +11,9 @@ logger = logging.getLogger(__name__) +# Entries in /dev/mapper on sd-devices +_DEVMAPPER_SYSTEM = ["control", "dmroot"] + class CLI: """ @@ -260,6 +263,101 @@ def unlock_luks_volume(self, volume: Volume, decryption_key: str) -> Volume: except subprocess.CalledProcessError as ex: raise ExportException(sdstatus=Status.DEVICE_ERROR) from ex + def attempt_get_unlocked_veracrypt_volume(self, device: str) -> MountedVolume: + """ + Look for an unlocked VeraCrypt volume in /dev/mapper. + Raise ExportException on error. + """ + + # See if there's a volume in /dev/mapper that is already-unlocked TrueCrypt volume + try: + ls = subprocess.check_output( + ["ls", "/dev/mapper/"], stdout=subprocess.PIPE, stderr=subprocess.PIPE + ) + results = ls.decode().rstrip().split("\n") + + # If there are no mapped devices, bail + if len(results) <= len(self._DEVMAPPER_SYSTEM): + raise ExportException(sdstatus=Status.DEVICE_ERROR) + + for entry in results: + if entry not in self._DEVMAPPER_SYSTEM: + res = subprocess.check_output( + [ + "lsblk", + "--noheadings", + "-o", + "NAME,TYPE,MOUNTPOINT", + f"/dev/mapper/{entry}", + ] + ) + name, crypt_type, mountpoint = ( + res.decode().rstrip().split() + ) # Space-separated + + # Notes for our future selves: there is also a `tcrypt-system` identifier. + # It *should* only be used for FDE with TrueCrypt, not for a non-bootable drive. + if crypt_type == "tcrypt" and name == device: + vol = Volume( + device_name=name, + mapped_name=entry, + encryption=EncryptionScheme.VERACRYPT, + ) + if mountpoint: + return MountedVolume.from_volume(vol) + else: + return self.mount_volume(vol) + + except subprocess.CalledProcessError as e: + logger.error(e) + raise ExportException(sdstatus=Status.DEVICE_ERROR) + + def attempt_unlock_veracrypt( + self, volume: Volume, encryption_key: str + ) -> MountedVolume: + """ + Attempt to unlock and mount a VeraCrypt drive at the default mountpoint. + """ + try: + with subprocess.Popen( + [ + "sudo", + "cryptsetup", + "open", + "--type", + "tcrypt", + "--veracrypt", + f"{volume.device_name}", + self._DEFAULT_MOUNTPOINT, + ], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) as p: + p.communicate(input=str.encode(encryption_key, "utf-8")) + rc = p.returncode + + if rc == 0: + return MountedVolume( + device_name=volume.device_name, + mapped_name=volume.mapped_name, + encryption=EncryptionScheme.VERACRYPT, + mountpoint=self._DEFAULT_MOUNTPOINT, + ) + + else: + # Something was wrong and we could not unlock. + logger.error( + "Unlocking failed. Bad passphrase, or unsuitable volume." + ) + raise ExportException(sdstatus=Status.ERROR_UNLOCK_GENERIC) + + except subprocess.CalledProcessError as error: + # What kind of error message do we have? We may be able to get a little + # more granular about what the problem is. But basically, unlocking didn't work. + logger.error("Unlocking failed. Bad passphrase, or unsuitable volume.") + logger.error(error) + raise ExportException(sdstatus=Status.ERROR_UNLOCK_GENERIC) # todo + def _get_mountpoint(self, volume: Volume) -> Optional[str]: """ Check for existing mountpoint. diff --git a/securedrop_export/disk/volume.py b/securedrop_export/disk/volume.py index aae7d93..7eff8df 100644 --- a/securedrop_export/disk/volume.py +++ b/securedrop_export/disk/volume.py @@ -9,6 +9,7 @@ class EncryptionScheme(Enum): UNKNOWN = 0 LUKS = 1 + VERACRYPT = 2 class Volume: From f4476fad0c6f63a5feae4b2932596257659d23fa Mon Sep 17 00:00:00 2001 From: Ro Date: Wed, 4 Oct 2023 20:52:25 -0400 Subject: [PATCH 02/14] Add UNKNOWN device status (refers to device that cannot be unlocked, either bad VC passphrase or corrupted drive). Add logic to detect and use VeraCrypt drives. Use new Status and Service for exporting files. Add method to cli.py to check all volumes and return most detailed information about each candidate Volume, including mounting if the volume is already unlocked. --- securedrop_export/disk/__init__.py | 4 +- securedrop_export/disk/cli.py | 167 +++++++++++++++++++--------- securedrop_export/disk/service.py | 171 +++++++++++++++++------------ securedrop_export/disk/status.py | 7 +- securedrop_export/disk/volume.py | 5 +- securedrop_export/main.py | 14 +-- 6 files changed, 236 insertions(+), 132 deletions(-) diff --git a/securedrop_export/disk/__init__.py b/securedrop_export/disk/__init__.py index e610945..760c6e0 100644 --- a/securedrop_export/disk/__init__.py +++ b/securedrop_export/disk/__init__.py @@ -1,2 +1,2 @@ -from .legacy_service import Service as LegacyService # noqa: F401 -from .legacy_status import Status as LegacyStatus # noqa: F401 +from .service import Service # noqa: F401 +from .status import Status # noqa: F401 diff --git a/securedrop_export/disk/cli.py b/securedrop_export/disk/cli.py index 2fa4944..2263d2e 100644 --- a/securedrop_export/disk/cli.py +++ b/securedrop_export/disk/cli.py @@ -26,8 +26,9 @@ class CLI: # Default mountpoint (unless drive is already mounted manually by the user) _DEFAULT_MOUNTPOINT = "/media/usb" + _DEFAULT_VC_CONTAINER_NAME = "vc-volume" - def get_connected_devices(self) -> List[str]: + def _get_connected_devices(self) -> List[str]: """ List all block devices attached to VM that are disks and not partitions. Return list of all removable connected block devices. @@ -85,7 +86,49 @@ def _get_removable_devices(self, attached_devices: List[str]) -> List[str]: logger.info(f"{len(usb_devices)} connected") return usb_devices - def get_partitioned_device(self, blkid: str) -> str: + def get_all_volumes(self) -> [Volume]: + """ + Returns a list of all currently-attached removable Volumes that are + export device candidates, attempting to get as far towards export process + as possible (i.e. probing if device is already unlocked and/or mounted, + and mounting it if appropriate.) + + Caller must handle ExportException. + """ + volumes = [] + removable_devices = self._get_connected_devices() + try: + for item in removable_devices: + blkid = self._get_partitioned_device(item) + if self.is_luks_volume(blkid): + logger.debug("LUKS volume detected. Checking if unlocked.") + volumes.append(self._get_luks_volume(blkid)) + else: + try: + logger.debug( + "Not a LUKS volume. Checking if unlocked VeraCrypt." + ) + volumes.append( + self._attempt_get_unlocked_veracrypt_volume(blkid) + ) + except ExportException: + logger.info("Device is not an unlocked Veracrypt drive.") + volumes.append( + Volume( + device_name=blkid, + encryption=EncryptionScheme.UNKNOWN, + mapped_name=None, + ) + ) + + return volumes + + except ExportException as ex: + logger.error(f"get_all_volumes failed: {ex.sdstatus.value}") + logger.debug(ex) + raise + + def _get_partitioned_device(self, blkid: str) -> str: """ Given a string representing a block device, return string that includes correct partition (such as "/dev/sda" or "/dev/sda1"). @@ -181,7 +224,7 @@ def _get_luks_name_from_headers(self, device: str) -> str: logger.error("Failed to dump LUKS header") raise ExportException(sdstatus=Status.DEVICE_ERROR) from ex - def get_luks_volume(self, device: str) -> Union[Volume, MountedVolume]: + def _get_luks_volume(self, device: str) -> Union[Volume, MountedVolume]: """ Given a string corresponding to a LUKS-partitioned volume, return a corresponding Volume object. @@ -212,7 +255,7 @@ def get_luks_volume(self, device: str) -> Union[Volume, MountedVolume]: # use the existing mountpoint, or mount it ourselves. # Either way, return a MountedVolume. if os.path.exists(os.path.join("/dev/mapper/", mapped_name)): - return self.mount_volume(luks_volume) + return self._mount_volume(luks_volume) # It's still locked else: @@ -263,50 +306,56 @@ def unlock_luks_volume(self, volume: Volume, decryption_key: str) -> Volume: except subprocess.CalledProcessError as ex: raise ExportException(sdstatus=Status.DEVICE_ERROR) from ex - def attempt_get_unlocked_veracrypt_volume(self, device: str) -> MountedVolume: + def _get_dev_mapper_entries(self) -> [str]: """ - Look for an unlocked VeraCrypt volume in /dev/mapper. - Raise ExportException on error. + Helper function to return a list of entries in /dev/mapper/ + (excluding `system` and `dmroot`). """ - - # See if there's a volume in /dev/mapper that is already-unlocked TrueCrypt volume try: ls = subprocess.check_output( ["ls", "/dev/mapper/"], stdout=subprocess.PIPE, stderr=subprocess.PIPE ) results = ls.decode().rstrip().split("\n") - # If there are no mapped devices, bail - if len(results) <= len(self._DEVMAPPER_SYSTEM): - raise ExportException(sdstatus=Status.DEVICE_ERROR) - - for entry in results: - if entry not in self._DEVMAPPER_SYSTEM: - res = subprocess.check_output( - [ - "lsblk", - "--noheadings", - "-o", - "NAME,TYPE,MOUNTPOINT", - f"/dev/mapper/{entry}", - ] + return [r for r in results if r not in self._DEVMAPPER_SYSTEM] + + except subprocess.CalledProcessError as ex: + logger.error(f"Error checking entries in /dev/mapper: {ex}") + raise ExportException(sdstatus=Status.DEVICE_ERROR) from ex + + def _attempt_get_unlocked_veracrypt_volume(self, device_name: str) -> MountedVolume: + """ + Looks for an already-unlocked VeraCrypt volume in /dev/mapper and see if the + device ID matches given device name. + Raises ExportException on error. + """ + try: + for entry in self._get_dev_mapper_entries(): + res = subprocess.check_output( + [ + "lsblk", + "--noheadings", + "-o", + "NAME,TYPE,MOUNTPOINT", + f"/dev/mapper/{entry}", + ] + ) + name, crypt_type, mountpoint = ( + res.decode().rstrip().split() + ) # Space-separated + + # Notes for our future selves: there is also a `tcrypt-system` identifier. + # It *should* only be used for FDE with TrueCrypt, not for a non-bootable drive. + if crypt_type == "tcrypt" and name == device_name: + vol = Volume( + device_name=name, + mapped_name=entry, + encryption=EncryptionScheme.VERACRYPT, ) - name, crypt_type, mountpoint = ( - res.decode().rstrip().split() - ) # Space-separated - - # Notes for our future selves: there is also a `tcrypt-system` identifier. - # It *should* only be used for FDE with TrueCrypt, not for a non-bootable drive. - if crypt_type == "tcrypt" and name == device: - vol = Volume( - device_name=name, - mapped_name=entry, - encryption=EncryptionScheme.VERACRYPT, - ) - if mountpoint: - return MountedVolume.from_volume(vol) - else: - return self.mount_volume(vol) + if mountpoint: + return MountedVolume.from_volume(vol, mountpoint) + else: + return self._mount_at_mountpoint(vol, self._DEFAULT_MOUNTPOINT) except subprocess.CalledProcessError as e: logger.error(e) @@ -316,7 +365,7 @@ def attempt_unlock_veracrypt( self, volume: Volume, encryption_key: str ) -> MountedVolume: """ - Attempt to unlock and mount a VeraCrypt drive at the default mountpoint. + Attempt to unlock and mount a presumed-Veracrypt drive at the default mountpoint. """ try: with subprocess.Popen( @@ -328,7 +377,7 @@ def attempt_unlock_veracrypt( "tcrypt", "--veracrypt", f"{volume.device_name}", - self._DEFAULT_MOUNTPOINT, + f"{self._DEFAULT_VC_CONTAINER_NAME}", ], stdout=subprocess.PIPE, stderr=subprocess.PIPE, @@ -337,12 +386,13 @@ def attempt_unlock_veracrypt( rc = p.returncode if rc == 0: - return MountedVolume( - device_name=volume.device_name, - mapped_name=volume.mapped_name, - encryption=EncryptionScheme.VERACRYPT, - mountpoint=self._DEFAULT_MOUNTPOINT, - ) + volume.encryption = EncryptionScheme.VERACRYPT + + # Mapped name is /dev/mapper/${self._DEFAULT_VC_CONTAINER_NAME}, since + # the /dev/mapper entry isn't derived from the header like a LUKS drive + volume.mapped_name = self._DEFAULT_VC_CONTAINER_NAME + + return self._mount_volume(volume) else: # Something was wrong and we could not unlock. @@ -356,7 +406,7 @@ def attempt_unlock_veracrypt( # more granular about what the problem is. But basically, unlocking didn't work. logger.error("Unlocking failed. Bad passphrase, or unsuitable volume.") logger.error(error) - raise ExportException(sdstatus=Status.ERROR_UNLOCK_GENERIC) # todo + raise ExportException(sdstatus=Status.ERROR_UNLOCK_GENERIC) def _get_mountpoint(self, volume: Volume) -> Optional[str]: """ @@ -374,7 +424,7 @@ def _get_mountpoint(self, volume: Volume) -> Optional[str]: logger.error(ex) raise ExportException(sdstatus=Status.ERROR_MOUNT) from ex - def mount_volume(self, volume: Volume) -> MountedVolume: + def _mount_volume(self, volume: Volume) -> MountedVolume: """ Given an unlocked LUKS volume, return MountedVolume object. @@ -422,12 +472,12 @@ def _mount_at_mountpoint(self, volume: Volume, mountpoint: str) -> MountedVolume subprocess.check_call(["sudo", "mount", mapped_device_path, mountpoint]) subprocess.check_call(["sudo", "chown", "-R", "user:user", mountpoint]) + return MountedVolume.from_volume(volume, mountpoint) + except subprocess.CalledProcessError as ex: logger.error(ex) raise ExportException(sdstatus=Status.ERROR_MOUNT) from ex - return MountedVolume.from_volume(volume, mountpoint) - def write_data_to_device( self, submission_tmpdir: str, @@ -515,6 +565,21 @@ def _close_luks_volume(self, unlocked_device: Volume) -> None: logger.error("Error closing device") raise ExportException(sdstatus=Status.DEVICE_ERROR) from ex + def _close_veracrypt_volume(self, unlocked_device: MountedVolume) -> None: + """ + Helper. Close VeraCrypt volume. + """ + if os.path.exists(os.path.join("/dev/mapper", unlocked_device.mapped_name)): + logger.debug("Locking luks volume {}".format(unlocked_device)) + try: + subprocess.check_call( + ["sudo", "cryptsetup", "close", unlocked_device.mapped_name] + ) + + except subprocess.CalledProcessError as ex: + logger.error("Error closing device") + raise ExportException(sdstatus=Status.DEVICE_ERROR) from ex + def _remove_temp_directory(self, tmpdir: str): """ Helper. Remove temporary directory used during archive export. diff --git a/securedrop_export/disk/service.py b/securedrop_export/disk/service.py index 1db9a83..3625218 100644 --- a/securedrop_export/disk/service.py +++ b/securedrop_export/disk/service.py @@ -1,11 +1,11 @@ import logging -from securedrop_export.archive import Archive - from .cli import CLI from .status import Status -from .volume import Volume, MountedVolume +from .volume import Volume, MountedVolume, EncryptionScheme +from securedrop_export.archive import Archive from securedrop_export.exceptions import ExportException +from typing import Optional logger = logging.getLogger(__name__) @@ -13,12 +13,13 @@ class Service: """ - Checks that can be performed against the device(s). + Actions that can be performed against USB device(s). This is the "API" portion of the export workflow. """ - def __init__(self, cli: CLI): - self.cli = cli + def __init__(self, submission: Archive, cli: CLI = None): + self.cli = cli or CLI() + self.submission = submission def scan_all_devices(self) -> Status: """ @@ -26,89 +27,123 @@ def scan_all_devices(self) -> Status: status. """ try: - all_devices = self.cli.get_connected_devices() - number_devices = len(all_devices) - - if number_devices == 0: - return Status.NO_DEVICE_DETECTED - elif number_devices > 1: - return Status.MULTI_DEVICE_DETECTED - else: - return self.scan_single_device(all_devices[0]) + status, _ = self._check_volumes(self.cli.get_all_volumes()) + return status except ExportException as ex: logger.error(ex) - return Status.DEVICE_ERROR # Could not assess devices + return Status.DEVICE_ERROR - def scan_single_device(self, blkid: str) -> Status: + def export(self) -> Status: """ - Given a string representing a single block device, see if it - is a suitable export target and return information about its state. + Export material to USB drive. """ try: - target = self.cli.get_partitioned_device(blkid) - - # See if it's a LUKS drive - if self.cli.is_luks_volume(target): - # Returns Volume or throws ExportException - self.volume = self.cli.get_luks_volume(target) - - # See if it's unlocked and mounted - if isinstance(self.volume, MountedVolume): - logger.debug("LUKS device is already mounted") - return Status.DEVICE_WRITABLE + volumes = self.cli.get_all_volumes() + status, target = self._check_volumes(volumes) + + if status == Status.DEVICE_WRITABLE: + return self._write_to_device(target, self.submission) + elif status == Status.DEVICE_LOCKED: + status, unlocked_volume = self._unlock_device( + self.submission.encryption_key, target + ) + if status == Status.DEVICE_WRITABLE: + return self._write_to_device(target, self.submission) else: - # Prompt for passphrase - return Status.DEVICE_LOCKED - else: - # Might be VeraCrypt, might be madness - logger.info("LUKS drive not found") - - # Currently we don't support anything other than LUKS. - # In future, we will support TC/VC volumes as well - return Status.INVALID_DEVICE_DETECTED + return status except ExportException as ex: - logger.error(ex) - if ex.sdstatus: - return ex.sdstatus - else: - return Status.DEVICE_ERROR + logger.error(f"Enountered {ex.sdstatus.value} while trying to export") + logger.debug(ex) + return ex.sdstatus.value - def unlock_device(self, passphrase: str, volume: Volume) -> Status: + def _check_volumes(self, all_volumes: [Volume]) -> (Status, Optional[Volume]): """ - Given provided passphrase, unlock target volume. Currently, - LUKS volumes are supported. + Check all potentially-compatible export devices (removable, + single-partition USB devices). + """ + number_devices = len(all_volumes) + if number_devices == 0: + return (Status.NO_DEVICE_DETECTED, None) + + # At some point we could consider returning all devices, so + # that the user can select their desired target device, but for + # now, only one attached device is supported. + elif number_devices > 1: + return (Status.MULTI_DEVICE_DETECTED, None) + else: + target_volume = all_volumes[0] + if isinstance(target_volume, MountedVolume): + logger.debug("Device is unlocked and mounted") + return (Status.DEVICE_WRITABLE, target_volume) + elif target_volume.encryption is EncryptionScheme.LUKS: + logger.debug("Device is locked LUKS drive") + return (Status.DEVICE_LOCKED, target_volume) + else: + logger.debug("Device status is unknown") + # This could be a locked VeraCrypt drive, or it could be an + # invalid drive (another encryption type). + # The client has to decide whether or not to try to use it + # (i.e. by prompting for passphrase) or to error. + # The simplest implementation will have the client error unless + # it is supplied with an already-unlocked VeraCrypt drive that + # it can use; a more sophisticated implementation might allow for + # a finite number of re-prompts before giving up, in case of + # user error with typing the password, and would return the volume + # (eg to print information about which drive failed). + return (Status.UKNOWN_DEVICE_DETECTED, target_volume) + + def _unlock_device( + self, passphrase: str, volume: Volume + ) -> (Status, Optional[Volume]): + """ + Given provided passphrase, unlock target volume. """ if volume: - try: - self.volume = self.cli.unlock_luks_volume(volume, passphrase) - - if isinstance(volume, MountedVolume): - return Status.DEVICE_WRITABLE - else: - return Status.ERROR_UNLOCK_LUKS + if volume.encryption is EncryptionScheme.LUKS: + try: + logger.info("Unlocking LUKS drive") + volume = self.cli.unlock_luks_volume(volume, passphrase) + if isinstance(volume, MountedVolume): + return (Status.DEVICE_WRITABLE, volume) + else: + return (Status.ERROR_UNLOCK_LUKS, volume) + + except ExportException as ex: + logger.error(ex) + return (Status.ERROR_UNLOCK_LUKS, volume) + + # Try to unlock another drive, opportunistically + # hoping it is VeraCrypt/TC. + else: + try: + logger.info( + "Encryption scheme is not LUKS. Attempt VeraCrypt unlock." + ) + volume = self.cli.attempt_unlock_veracrypt(volume, passphrase) + + if isinstance(volume, MountedVolume): + return (Status.DEVICE_WRITABLE, volume) + else: + # Might be VeraCrypt, might be madness + return (Status.ERROR_UNLOCK_GENERIC, volume) + except ExportException as ex: + logger.error(ex) + return (Status.ERROR_UNLOCK_GENERIC, volume) - except ExportException as ex: - logger.error(ex) - return Status.ERROR_UNLOCK_LUKS else: # Trying to unlock devices before having an active device logger.warning("Tried to unlock_device but no current volume detected.") - return Status.NO_DEVICE_DETECTED + return Status.NO_DEVICE_DETECTED, None - def write_to_device(self, volume: MountedVolume, data: Archive) -> Status: + def _write_to_device(self, volume: MountedVolume, data: Archive) -> Status: """ Export data to volume. CLI unmounts and locks volume on completion, even if export was unsuccessful. + + Calling method should handle ExportException. """ - try: - self.cli.write_data_to_device(data.tmpdir, data.target_dirname, volume) - return Status.SUCCESS_EXPORT - except ExportException as ex: - logger.error(ex) - if ex.sdstatus: - return ex.sdstatus - else: - return Status.ERROR_EXPORT + self.cli.write_data_to_device(data.tmpdir, data.target_dirname, volume) + return Status.SUCCESS_EXPORT diff --git a/securedrop_export/disk/status.py b/securedrop_export/disk/status.py index 7ce7139..eed4ea6 100644 --- a/securedrop_export/disk/status.py +++ b/securedrop_export/disk/status.py @@ -7,10 +7,13 @@ class Status(BaseStatus): "INVALID_DEVICE_DETECTED" # Multi partitioned, not encrypted, etc ) MULTI_DEVICE_DETECTED = "MULTI_DEVICE_DETECTED" # Not currently supported + UKNOWN_DEVICE_DETECTED = ( + "UNKNOWN_DEVICE_DETECTED" # Badly-formatted USB or VeraCrypt/TC + ) - DEVICE_LOCKED = "DEVICE_LOCKED" # One device detected, and it's locked + DEVICE_LOCKED = "DEVICE_LOCKED" # One valid device detected, and it's locked DEVICE_WRITABLE = ( - "DEVICE_WRITABLE" # One device detected, and it's unlocked (and mounted) + "DEVICE_WRITABLE" # One valid device detected, and it's unlocked (and mounted) ) ERROR_UNLOCK_LUKS = "ERROR_UNLOCK_LUKS" diff --git a/securedrop_export/disk/volume.py b/securedrop_export/disk/volume.py index 7eff8df..968874f 100644 --- a/securedrop_export/disk/volume.py +++ b/securedrop_export/disk/volume.py @@ -17,8 +17,8 @@ class Volume: """ A volume on a removable device. - Volumes have a device name ("/dev/sdX"), a mapped name ("/dev/mapper/xxx"), an encryption - scheme, and a mountpoint if they are mounted. + Volumes have a device name ("/dev/sdX"), and may have a mapped name + ("/dev/mapper/xxx"), an encryption scheme, and a mountpoint. """ def __init__( @@ -46,6 +46,7 @@ def encryption(self, scheme: EncryptionScheme): def unlocked(self) -> bool: return ( self.mapped_name is not None + and self.encryption is not None and self.encryption is not EncryptionScheme.UNKNOWN and os.path.exists( os.path.join(self.MAPPED_VOLUME_PREFIX, self.mapped_name) diff --git a/securedrop_export/main.py b/securedrop_export/main.py index bc55ae1..27d0478 100755 --- a/securedrop_export/main.py +++ b/securedrop_export/main.py @@ -11,8 +11,7 @@ from securedrop_export.directory import safe_mkdir from securedrop_export.exceptions import ExportException -from securedrop_export.disk import LegacyService as ExportService -from securedrop_export.disk import LegacyStatus +from securedrop_export.disk import Service as ExportService from securedrop_export.print import Service as PrintService from logging.handlers import TimedRotatingFileHandler, SysLogHandler @@ -125,7 +124,7 @@ def _configure_logging(): raise ExportException(sdstatus=Status.ERROR_LOGGING) from ex -def _start_service(submission: Archive) -> LegacyStatus: +def _start_service(submission: Archive) -> Status: """ Start print or export service. """ @@ -140,10 +139,11 @@ def _start_service(submission: Archive) -> LegacyStatus: # Export routines elif submission.command is Command.EXPORT: return ExportService(submission).export() - elif submission.command is Command.CHECK_USBS: - return ExportService(submission).check_connected_devices() - elif submission.command is Command.CHECK_VOLUME: - return ExportService(submission).check_disk_format() + elif ( + submission.command is Command.CHECK_USBS + or submission.command is Command.CHECK_VOLUME + ): + return ExportService(submission).scan_all_devices() # Unreachable raise ExportException( From e5ed236ff9edc80b81e397129101ab0374df58cf Mon Sep 17 00:00:00 2001 From: Ro Date: Fri, 3 Nov 2023 12:51:32 -0400 Subject: [PATCH 03/14] Delete legacy_status and legacy_service --- securedrop_export/disk/legacy_service.py | 156 ----------------------- securedrop_export/disk/legacy_status.py | 25 ---- 2 files changed, 181 deletions(-) delete mode 100644 securedrop_export/disk/legacy_service.py delete mode 100644 securedrop_export/disk/legacy_status.py diff --git a/securedrop_export/disk/legacy_service.py b/securedrop_export/disk/legacy_service.py deleted file mode 100644 index 3dbe6ac..0000000 --- a/securedrop_export/disk/legacy_service.py +++ /dev/null @@ -1,156 +0,0 @@ -import logging - -from securedrop_export.exceptions import ExportException - -from .cli import CLI -from .legacy_status import Status as LegacyStatus -from .status import Status as Status -from .volume import MountedVolume - -logger = logging.getLogger(__name__) - - -class Service: - def __init__(self, submission, cli=None): - self.submission = submission - self.cli = cli or CLI() - - def check_connected_devices(self) -> LegacyStatus: - """ - Check if single USB is inserted. - """ - logger.info("Export archive is usb-test") - - try: - all_devices = self.cli.get_connected_devices() - num_devices = len(all_devices) - - except ExportException as ex: - logger.error(f"Error encountered during USB check: {ex.sdstatus.value}") - # Use legacy status instead of new status values - raise ExportException(sdstatus=LegacyStatus.LEGACY_ERROR_USB_CHECK) from ex - - if num_devices == 0: - raise ExportException(sdstatus=LegacyStatus.LEGACY_USB_NOT_CONNECTED) - elif num_devices == 1: - return LegacyStatus.LEGACY_USB_CONNECTED - elif num_devices > 1: - raise ExportException( - sdstatus=LegacyStatus.LEGACY_USB_ENCRYPTION_NOT_SUPPORTED - ) - else: - # Unreachable, num_devices is a non-negative integer, - # and we handled all possible cases already - raise ValueError(f"unreachable: num_devices is negative: {num_devices}") - - def check_disk_format(self) -> LegacyStatus: - """ - Check if volume is correctly formatted for export. - """ - try: - all_devices = self.cli.get_connected_devices() - - if len(all_devices) == 1: - device = self.cli.get_partitioned_device(all_devices[0]) - logger.info("Check if LUKS") - if not self.cli.is_luks_volume(device): - raise ExportException( - sdstatus=LegacyStatus.LEGACY_USB_ENCRYPTION_NOT_SUPPORTED - ) - # We can support checking if a drive is already unlocked, but for - # backwards compatibility, this is the only expected status - # at this stage - return LegacyStatus.LEGACY_USB_ENCRYPTED - else: - logger.error("Multiple partitions not supported") - return LegacyStatus.LEGACY_USB_ENCRYPTION_NOT_SUPPORTED - - except ExportException as ex: - logger.error( - f"Error encountered during disk format check: {ex.sdstatus.value}" - ) - # Return legacy status values for now for ongoing client compatibility - if ex.sdstatus in [s for s in Status]: - status = self._legacy_status(ex.sdstatus) - raise ExportException(sdstatus=status) - elif ex.sdstatus: - raise - else: - raise ExportException(sdstatus=LegacyStatus.LEGACY_USB_DISK_ERROR) - - def export(self): - """ - Export all files to target device. - """ - logger.info("Export archive is disk") - - try: - all_devices = self.cli.get_connected_devices() - - if len(all_devices) == 1: - device = self.cli.get_partitioned_device(all_devices[0]) - - # Decide what kind of volume it is - logger.info("Check if LUKS") - if self.cli.is_luks_volume(device): - volume = self.cli.get_luks_volume(device) - logger.info("Check if writable") - if not isinstance(volume, MountedVolume): - logger.info("Not writable-will try unlocking") - volume = self.cli.unlock_luks_volume( - volume, self.submission.encryption_key - ) - mounted_volume = self.cli.mount_volume(volume) - - logger.info(f"Export submission to {mounted_volume.mountpoint}") - self.cli.write_data_to_device( - self.submission.tmpdir, - self.submission.target_dirname, - mounted_volume, - ) - # This is SUCCESS_EXPORT, but the 0.7.0 client is not expecting - # a return status from a successful export operation. - # When the client is updated, we will return SUCCESS_EXPORT here. - - else: - # Another kind of drive: VeraCrypt/TC, or unsupported. - # For now this is an error--in future there will be support - # for additional encryption formats - logger.error(f"Export failed because {device} is not supported") - raise ExportException( - sdstatus=LegacyStatus.LEGACY_USB_ENCRYPTION_NOT_SUPPORTED - ) - - except ExportException as ex: - logger.error( - f"Error encountered during disk format check: {ex.sdstatus.value}" - ) - # Return legacy status values for now for ongoing client compatibility - if ex.sdstatus in [s for s in Status]: - status = self._legacy_status(ex.sdstatus) - raise ExportException(sdstatus=status) - elif ex.sdstatus: - raise - else: - raise ExportException(sdstatus=LegacyStatus.LEGACY_ERROR_GENERIC) - - def _legacy_status(self, status: Status) -> LegacyStatus: - """ - Backwards-compatibility - status values that client (@0.7.0) is expecting. - """ - logger.info(f"Convert to legacy: {status.value}") - if status is Status.ERROR_MOUNT: - return LegacyStatus.LEGACY_ERROR_USB_MOUNT - elif status in [Status.ERROR_EXPORT, Status.ERROR_EXPORT_CLEANUP]: - return LegacyStatus.LEGACY_ERROR_USB_WRITE - elif status in [Status.ERROR_UNLOCK_LUKS, Status.ERROR_UNLOCK_GENERIC]: - return LegacyStatus.LEGACY_USB_BAD_PASSPHRASE - elif status in [ - Status.INVALID_DEVICE_DETECTED, - Status.MULTI_DEVICE_DETECTED, - ]: - return LegacyStatus.LEGACY_USB_ENCRYPTION_NOT_SUPPORTED - # The other status values, such as Status.NO_DEVICE_DETECTED, are not returned by the - # CLI, so we don't need to check for them here - else: - return LegacyStatus.LEGACY_ERROR_GENERIC diff --git a/securedrop_export/disk/legacy_status.py b/securedrop_export/disk/legacy_status.py deleted file mode 100644 index 77f0fa6..0000000 --- a/securedrop_export/disk/legacy_status.py +++ /dev/null @@ -1,25 +0,0 @@ -from securedrop_export.status import BaseStatus - - -class Status(BaseStatus): - LEGACY_ERROR_GENERIC = "ERROR_GENERIC" - - # Legacy USB preflight related - LEGACY_USB_CONNECTED = "USB_CONNECTED" # Success - LEGACY_USB_NOT_CONNECTED = "USB_NOT_CONNECTED" - LEGACY_ERROR_USB_CHECK = "ERROR_USB_CHECK" - - # Legacy USB Disk preflight related errors - LEGACY_USB_ENCRYPTED = "USB_ENCRYPTED" # Success - LEGACY_USB_ENCRYPTION_NOT_SUPPORTED = "USB_ENCRYPTION_NOT_SUPPORTED" - - # Can be raised during disk format check - LEGACY_USB_DISK_ERROR = "USB_DISK_ERROR" - - # Legacy Disk export errors - LEGACY_USB_BAD_PASSPHRASE = "USB_BAD_PASSPHRASE" - LEGACY_ERROR_USB_MOUNT = "ERROR_USB_MOUNT" - LEGACY_ERROR_USB_WRITE = "ERROR_USB_WRITE" - - # New - SUCCESS_EXPORT = "SUCCESS_EXPORT" From cad5655e8a08dfbecb0e677ce99a3b18bab9ab4b Mon Sep 17 00:00:00 2001 From: Ro Date: Fri, 3 Nov 2023 12:51:57 -0400 Subject: [PATCH 04/14] Update tests to use new service and status files and add additional vc test coverage. Changes to VC unlock logic and service.py to mount LUKS drive immediately after unlocking. --- securedrop_export/disk/cli.py | 78 +++++++------- securedrop_export/disk/service.py | 48 +++++---- securedrop_export/disk/status.py | 2 +- securedrop_export/disk/volume.py | 4 +- securedrop_export/main.py | 2 +- tests/disk/test_cli.py | 66 +++++++----- tests/disk/test_service.py | 163 +++++++++++------------------- 7 files changed, 170 insertions(+), 193 deletions(-) diff --git a/securedrop_export/disk/cli.py b/securedrop_export/disk/cli.py index 2263d2e..5f402d1 100644 --- a/securedrop_export/disk/cli.py +++ b/securedrop_export/disk/cli.py @@ -86,12 +86,12 @@ def _get_removable_devices(self, attached_devices: List[str]) -> List[str]: logger.info(f"{len(usb_devices)} connected") return usb_devices - def get_all_volumes(self) -> [Volume]: + def get_all_volumes(self) -> List[Volume]: """ Returns a list of all currently-attached removable Volumes that are export device candidates, attempting to get as far towards export process as possible (i.e. probing if device is already unlocked and/or mounted, - and mounting it if appropriate.) + and mounting it if unlocked but unmounted.) Caller must handle ExportException. """ @@ -117,7 +117,9 @@ def get_all_volumes(self) -> [Volume]: Volume( device_name=blkid, encryption=EncryptionScheme.UNKNOWN, - mapped_name=None, + # This will be the name we use if + # trying to unlock the drive. + mapped_name=self._DEFAULT_VC_CONTAINER_NAME, ) ) @@ -255,7 +257,7 @@ def _get_luks_volume(self, device: str) -> Union[Volume, MountedVolume]: # use the existing mountpoint, or mount it ourselves. # Either way, return a MountedVolume. if os.path.exists(os.path.join("/dev/mapper/", mapped_name)): - return self._mount_volume(luks_volume) + return self.mount_volume(luks_volume) # It's still locked else: @@ -294,11 +296,8 @@ def unlock_luks_volume(self, volume: Volume, decryption_key: str) -> Volume: rc = p.returncode if rc == 0: - return Volume( - device_name=volume.device_name, - mapped_name=volume.mapped_name, - encryption=EncryptionScheme.LUKS, - ) + logger.debug("Successfully unlocked.") + return volume else: logger.error("Bad volume passphrase") raise ExportException(sdstatus=Status.ERROR_UNLOCK_LUKS) @@ -306,20 +305,18 @@ def unlock_luks_volume(self, volume: Volume, decryption_key: str) -> Volume: except subprocess.CalledProcessError as ex: raise ExportException(sdstatus=Status.DEVICE_ERROR) from ex - def _get_dev_mapper_entries(self) -> [str]: + def _get_dev_mapper_entries(self) -> List[str]: """ Helper function to return a list of entries in /dev/mapper/ (excluding `system` and `dmroot`). """ try: - ls = subprocess.check_output( - ["ls", "/dev/mapper/"], stdout=subprocess.PIPE, stderr=subprocess.PIPE - ) - results = ls.decode().rstrip().split("\n") + ls = subprocess.check_output(["ls", "/dev/mapper/"], stderr=subprocess.PIPE) + entries = ls.decode().rstrip().split("\n") - return [r for r in results if r not in self._DEVMAPPER_SYSTEM] + return [r for r in entries if r not in _DEVMAPPER_SYSTEM] - except subprocess.CalledProcessError as ex: + except (subprocess.CalledProcessError, ValueError) as ex: logger.error(f"Error checking entries in /dev/mapper: {ex}") raise ExportException(sdstatus=Status.DEVICE_ERROR) from ex @@ -327,7 +324,7 @@ def _attempt_get_unlocked_veracrypt_volume(self, device_name: str) -> MountedVol """ Looks for an already-unlocked VeraCrypt volume in /dev/mapper and see if the device ID matches given device name. - Raises ExportException on error. + Returns MountedVolume object if a drive is found. Otherwise, raises ExportException. """ try: for entry in self._get_dev_mapper_entries(): @@ -357,6 +354,14 @@ def _attempt_get_unlocked_veracrypt_volume(self, device_name: str) -> MountedVol else: return self._mount_at_mountpoint(vol, self._DEFAULT_MOUNTPOINT) + # If we got here, there is no unlocked VC drive present. Not an error, but not + # a state we can continue the workflow in, so raise ExportException. + logger.info("No unlocked Veracrypt drive found.") + raise ExportException(sdstatus=Status.UNKNOWN_DEVICE_DETECTED) + + # If we got here, there were no entries in `/dev/mapper/` for us to look at. + raise ExportException(sdstatus=Status.DEVICE_ERROR) + except subprocess.CalledProcessError as e: logger.error(e) raise ExportException(sdstatus=Status.DEVICE_ERROR) @@ -368,7 +373,7 @@ def attempt_unlock_veracrypt( Attempt to unlock and mount a presumed-Veracrypt drive at the default mountpoint. """ try: - with subprocess.Popen( + p = subprocess.Popen( [ "sudo", "cryptsetup", @@ -381,31 +386,26 @@ def attempt_unlock_veracrypt( ], stdout=subprocess.PIPE, stderr=subprocess.PIPE, - ) as p: - p.communicate(input=str.encode(encryption_key, "utf-8")) - rc = p.returncode - - if rc == 0: - volume.encryption = EncryptionScheme.VERACRYPT + ) + p.communicate(input=str.encode(encryption_key, "utf-8")) + rc = p.returncode - # Mapped name is /dev/mapper/${self._DEFAULT_VC_CONTAINER_NAME}, since - # the /dev/mapper entry isn't derived from the header like a LUKS drive - volume.mapped_name = self._DEFAULT_VC_CONTAINER_NAME + if rc == 0: + volume.encryption = EncryptionScheme.VERACRYPT - return self._mount_volume(volume) + # Mapped name is /dev/mapper/${self._DEFAULT_VC_CONTAINER_NAME}, since + # the /dev/mapper entry isn't derived from the header like a LUKS drive + volume.mapped_name = self._DEFAULT_VC_CONTAINER_NAME + return self.mount_volume(volume) - else: - # Something was wrong and we could not unlock. - logger.error( - "Unlocking failed. Bad passphrase, or unsuitable volume." - ) - raise ExportException(sdstatus=Status.ERROR_UNLOCK_GENERIC) + else: + # Something was wrong and we could not unlock. + logger.error("Unlocking failed. Bad passphrase, or unsuitable volume.") + raise ExportException(sdstatus=Status.ERROR_UNLOCK_GENERIC) except subprocess.CalledProcessError as error: - # What kind of error message do we have? We may be able to get a little - # more granular about what the problem is. But basically, unlocking didn't work. - logger.error("Unlocking failed. Bad passphrase, or unsuitable volume.") - logger.error(error) + logger.error("Error during unlock/mount attempt.") + logger.debug(error) raise ExportException(sdstatus=Status.ERROR_UNLOCK_GENERIC) def _get_mountpoint(self, volume: Volume) -> Optional[str]: @@ -424,7 +424,7 @@ def _get_mountpoint(self, volume: Volume) -> Optional[str]: logger.error(ex) raise ExportException(sdstatus=Status.ERROR_MOUNT) from ex - def _mount_volume(self, volume: Volume) -> MountedVolume: + def mount_volume(self, volume: Volume) -> MountedVolume: """ Given an unlocked LUKS volume, return MountedVolume object. diff --git a/securedrop_export/disk/service.py b/securedrop_export/disk/service.py index 3625218..49df835 100644 --- a/securedrop_export/disk/service.py +++ b/securedrop_export/disk/service.py @@ -5,7 +5,7 @@ from .volume import Volume, MountedVolume, EncryptionScheme from securedrop_export.archive import Archive from securedrop_export.exceptions import ExportException -from typing import Optional +from typing import List, Optional, Tuple logger = logging.getLogger(__name__) @@ -17,8 +17,8 @@ class Service: This is the "API" portion of the export workflow. """ - def __init__(self, submission: Archive, cli: CLI = None): - self.cli = cli or CLI() + def __init__(self, submission: Archive, cli: CLI = CLI()): + self.cli = cli self.submission = submission def scan_all_devices(self) -> Status: @@ -42,23 +42,36 @@ def export(self) -> Status: volumes = self.cli.get_all_volumes() status, target = self._check_volumes(volumes) - if status == Status.DEVICE_WRITABLE: + if not target: + logger.error(f"Could not export, no available volumes ({status.value})") + return status + + # If it's writable, it's a MountedVolume object + if status == Status.DEVICE_WRITABLE and isinstance(target, MountedVolume): return self._write_to_device(target, self.submission) elif status == Status.DEVICE_LOCKED: status, unlocked_volume = self._unlock_device( self.submission.encryption_key, target ) - if status == Status.DEVICE_WRITABLE: + if status == Status.DEVICE_WRITABLE and isinstance( + target, MountedVolume + ): return self._write_to_device(target, self.submission) else: return status + else: + logger.info(f"Could not export, volume check was {status.value}") + return status except ExportException as ex: - logger.error(f"Enountered {ex.sdstatus.value} while trying to export") logger.debug(ex) - return ex.sdstatus.value + status = ex.sdstatus if ex.sdstatus is not None else Status.ERROR_EXPORT + logger.error(f"Enountered {status.value} while trying to export") + return status - def _check_volumes(self, all_volumes: [Volume]) -> (Status, Optional[Volume]): + def _check_volumes( + self, all_volumes: List[Volume] + ) -> Tuple[Status, Optional[Volume]]: """ Check all potentially-compatible export devices (removable, single-partition USB devices). @@ -92,11 +105,11 @@ def _check_volumes(self, all_volumes: [Volume]) -> (Status, Optional[Volume]): # a finite number of re-prompts before giving up, in case of # user error with typing the password, and would return the volume # (eg to print information about which drive failed). - return (Status.UKNOWN_DEVICE_DETECTED, target_volume) + return (Status.UNKNOWN_DEVICE_DETECTED, target_volume) def _unlock_device( self, passphrase: str, volume: Volume - ) -> (Status, Optional[Volume]): + ) -> Tuple[Status, Optional[Volume]]: """ Given provided passphrase, unlock target volume. """ @@ -105,14 +118,14 @@ def _unlock_device( try: logger.info("Unlocking LUKS drive") volume = self.cli.unlock_luks_volume(volume, passphrase) - if isinstance(volume, MountedVolume): - return (Status.DEVICE_WRITABLE, volume) - else: - return (Status.ERROR_UNLOCK_LUKS, volume) - + if volume.unlocked: + logger.debug("Volume unlocked, attempt to mount") + # Returns MountedVolume or errors + return (Status.DEVICE_WRITABLE, self.cli.mount_volume(volume)) except ExportException as ex: logger.error(ex) - return (Status.ERROR_UNLOCK_LUKS, volume) + + return (Status.ERROR_UNLOCK_LUKS, volume) # Try to unlock another drive, opportunistically # hoping it is VeraCrypt/TC. @@ -135,7 +148,7 @@ def _unlock_device( else: # Trying to unlock devices before having an active device logger.warning("Tried to unlock_device but no current volume detected.") - return Status.NO_DEVICE_DETECTED, None + return (Status.NO_DEVICE_DETECTED, None) def _write_to_device(self, volume: MountedVolume, data: Archive) -> Status: """ @@ -144,6 +157,5 @@ def _write_to_device(self, volume: MountedVolume, data: Archive) -> Status: Calling method should handle ExportException. """ - self.cli.write_data_to_device(data.tmpdir, data.target_dirname, volume) return Status.SUCCESS_EXPORT diff --git a/securedrop_export/disk/status.py b/securedrop_export/disk/status.py index eed4ea6..9767c4a 100644 --- a/securedrop_export/disk/status.py +++ b/securedrop_export/disk/status.py @@ -7,7 +7,7 @@ class Status(BaseStatus): "INVALID_DEVICE_DETECTED" # Multi partitioned, not encrypted, etc ) MULTI_DEVICE_DETECTED = "MULTI_DEVICE_DETECTED" # Not currently supported - UKNOWN_DEVICE_DETECTED = ( + UNKNOWN_DEVICE_DETECTED = ( "UNKNOWN_DEVICE_DETECTED" # Badly-formatted USB or VeraCrypt/TC ) diff --git a/securedrop_export/disk/volume.py b/securedrop_export/disk/volume.py index 968874f..4a8aa60 100644 --- a/securedrop_export/disk/volume.py +++ b/securedrop_export/disk/volume.py @@ -17,8 +17,8 @@ class Volume: """ A volume on a removable device. - Volumes have a device name ("/dev/sdX"), and may have a mapped name - ("/dev/mapper/xxx"), an encryption scheme, and a mountpoint. + Volumes have a device name ("/dev/sdX"), a mapped name that represents their intended + name ("/dev/mapper/xxx"), an encryption scheme, and a mountpoint. """ def __init__( diff --git a/securedrop_export/main.py b/securedrop_export/main.py index 27d0478..53f3073 100755 --- a/securedrop_export/main.py +++ b/securedrop_export/main.py @@ -124,7 +124,7 @@ def _configure_logging(): raise ExportException(sdstatus=Status.ERROR_LOGGING) from ex -def _start_service(submission: Archive) -> Status: +def _start_service(submission: Archive) -> BaseStatus: """ Start print or export service. """ diff --git a/tests/disk/test_cli.py b/tests/disk/test_cli.py index 7989809..c751878 100644 --- a/tests/disk/test_cli.py +++ b/tests/disk/test_cli.py @@ -63,12 +63,12 @@ def _setup_usb_devices(self, mocker, disks, is_removable): # which matches what would happen if iterating through list of devices mocker.patch("subprocess.check_output", side_effect=is_removable) - def test_get_connected_devices(self, mocker): + def test__get_connected_devices(self, mocker): disks = [b"sda disk\n", b"sdb disk\n"] removable = [b"1\n", b"1\n"] self._setup_usb_devices(mocker, disks, removable) - result = self.cli.get_connected_devices() + result = self.cli._get_connected_devices() assert result[0] == "/dev/sda" and result[1] == "/dev/sdb" @@ -88,34 +88,34 @@ def test_get_removable_devices_none_removable(self, mocker): @mock.patch( "subprocess.Popen", side_effect=subprocess.CalledProcessError(1, "Popen") ) - def test_get_connected_devices_error(self, mocked_subprocess): + def test__get_connected_devices_error(self, mocked_subprocess): with pytest.raises(ExportException): - self.cli.get_connected_devices() + self.cli._get_connected_devices() @mock.patch("subprocess.check_output", return_value=_SAMPLE_OUTPUT_NO_PART) - def test_get_partitioned_device_no_partition(self, mocked_call): + def test__get_partitioned_device_no_partition(self, mocked_call): assert ( - self.cli.get_partitioned_device(_DEFAULT_USB_DEVICE) == _DEFAULT_USB_DEVICE + self.cli._get_partitioned_device(_DEFAULT_USB_DEVICE) == _DEFAULT_USB_DEVICE ) @mock.patch("subprocess.check_output", return_value=_SAMPLE_OUTPUT_ONE_PART) - def test_get_partitioned_device_one_partition(self, mocked_call): + def test__get_partitioned_device_one_partition(self, mocked_call): assert ( - self.cli.get_partitioned_device(_DEFAULT_USB_DEVICE) + self.cli._get_partitioned_device(_DEFAULT_USB_DEVICE) == _DEFAULT_USB_DEVICE + "1" ) @mock.patch("subprocess.check_output", return_value=_SAMPLE_OUTPUT_MULTI_PART) - def test_get_partitioned_device_multi_partition(self, mocked_call): + def test__get_partitioned_device_multi_partition(self, mocked_call): with pytest.raises(ExportException) as ex: - self.cli.get_partitioned_device(_SAMPLE_OUTPUT_MULTI_PART) + self.cli._get_partitioned_device(_SAMPLE_OUTPUT_MULTI_PART) assert ex.value.sdstatus is Status.INVALID_DEVICE_DETECTED @mock.patch("subprocess.check_output", return_value=None) - def test_get_partitioned_device_lsblk_error(self, mocked_subprocess): + def test__get_partitioned_device_lsblk_error(self, mocked_subprocess): with pytest.raises(ExportException) as ex: - self.cli.get_partitioned_device(_SAMPLE_OUTPUT_ONE_PART) + self.cli._get_partitioned_device(_SAMPLE_OUTPUT_ONE_PART) assert ex.value.sdstatus is Status.DEVICE_ERROR @@ -123,10 +123,10 @@ def test_get_partitioned_device_lsblk_error(self, mocked_subprocess): "subprocess.check_output", side_effect=subprocess.CalledProcessError(1, "check_output"), ) - def test_get_partitioned_device_multi_partition_error(self, mocked_call): + def test__get_partitioned_device_multi_partition_error(self, mocked_call): # Make sure we wrap CalledProcessError and throw our own exception with pytest.raises(ExportException) as ex: - self.cli.get_partitioned_device(_DEFAULT_USB_DEVICE) + self.cli._get_partitioned_device(_DEFAULT_USB_DEVICE) assert ex.value.sdstatus is Status.DEVICE_ERROR @@ -186,16 +186,16 @@ def test__get_luks_name_from_headers_error(self, mocked_subprocess): @mock.patch("os.path.exists", return_value=True) @mock.patch("subprocess.check_output", return_value=_SAMPLE_LUKS_HEADER) - def test_get_luks_volume_already_unlocked(self, mocked_subprocess, mocked_os_call): - result = self.cli.get_luks_volume(_DEFAULT_USB_DEVICE_ONE_PART) + def test__get_luks_volume_already_unlocked(self, mocked_subprocess, mocked_os_call): + result = self.cli._get_luks_volume(_DEFAULT_USB_DEVICE_ONE_PART) assert result.encryption is EncryptionScheme.LUKS assert result.unlocked @mock.patch("os.path.exists", return_value=False) @mock.patch("subprocess.check_output", return_value=_SAMPLE_LUKS_HEADER) - def test_get_luks_volume_still_locked(self, mocked_subprocess, mocked_os_call): - result = self.cli.get_luks_volume(_DEFAULT_USB_DEVICE_ONE_PART) + def test__get_luks_volume_still_locked(self, mocked_subprocess, mocked_os_call): + result = self.cli._get_luks_volume(_DEFAULT_USB_DEVICE_ONE_PART) assert result.encryption is EncryptionScheme.LUKS assert not result.unlocked @@ -204,9 +204,9 @@ def test_get_luks_volume_still_locked(self, mocked_subprocess, mocked_os_call): "subprocess.check_output", side_effect=subprocess.CalledProcessError(1, "check_output"), ) - def test_get_luks_volume_error(self, mocked_subprocess): + def test__get_luks_volume_error(self, mocked_subprocess): with pytest.raises(ExportException) as ex: - self.cli.get_luks_volume(_DEFAULT_USB_DEVICE_ONE_PART) + self.cli._get_luks_volume(_DEFAULT_USB_DEVICE_ONE_PART) assert ex.value.sdstatus is Status.DEVICE_ERROR @@ -215,6 +215,7 @@ def test_unlock_luks_volume_success(self, mock_path, mocker): mock_popen = mocker.MagicMock() mock_popen_communicate = mocker.MagicMock() mock_popen.returncode = 0 + mock_popen.return_value = (("stdout", "stderr"), 0) mocker.patch("subprocess.Popen", return_value=mock_popen) mocker.patch( @@ -232,7 +233,7 @@ def test_unlock_luks_volume_success(self, mock_path, mocker): assert result.unlocked @mock.patch("os.path.exists", return_value=True) - def test_unlock_luks_volume_not_luks(self, mocker): + def test_unlock_luks_volume_unknown(self, mocker): mock_popen = mocker.MagicMock() mock_popen.communicate = mocker.MagicMock() mock_popen.communicate.returncode = 1 # An error unlocking @@ -287,7 +288,7 @@ def test_unlock_luks_volume_luksOpen_exception(self, mocked_subprocess): @mock.patch("os.path.exists", return_value=True) @mock.patch("subprocess.check_output", return_value=b"\n") @mock.patch("subprocess.check_call", return_value=0) - def test_mount_volume(self, mocked_call, mocked_output, mocked_path): + def testmount_volume(self, mocked_call, mocked_output, mocked_path): vol = Volume( device_name=_DEFAULT_USB_DEVICE_ONE_PART, mapped_name=_PRETEND_LUKS_ID, @@ -302,9 +303,7 @@ def test_mount_volume(self, mocked_call, mocked_output, mocked_path): "subprocess.check_output", return_value=b"/dev/pretend/luks-id-123456\n" ) @mock.patch("subprocess.check_call", return_value=0) - def test_mount_volume_already_mounted( - self, mocked_output, mocked_call, mocked_path - ): + def testmount_volume_already_mounted(self, mocked_output, mocked_call, mocked_path): md = Volume( device_name=_DEFAULT_USB_DEVICE_ONE_PART, mapped_name=_PRETEND_LUKS_ID, @@ -317,7 +316,7 @@ def test_mount_volume_already_mounted( @mock.patch("os.path.exists", return_value=True) @mock.patch("subprocess.check_output", return_value=b"\n") @mock.patch("subprocess.check_call", return_value=0) - def test_mount_volume_mkdir(self, mocked_output, mocked_subprocess, mocked_path): + def testmount_volume_mkdir(self, mocked_output, mocked_subprocess, mocked_path): md = Volume( device_name=_DEFAULT_USB_DEVICE_ONE_PART, mapped_name=_PRETEND_LUKS_ID, @@ -332,7 +331,7 @@ def test_mount_volume_mkdir(self, mocked_output, mocked_subprocess, mocked_path) "subprocess.check_call", side_effect=subprocess.CalledProcessError(1, "check_call"), ) - def test_mount_volume_error(self, mocked_subprocess, mocked_output): + def testmount_volume_error(self, mocked_subprocess, mocked_output): md = Volume( device_name=_DEFAULT_USB_DEVICE_ONE_PART, mapped_name=_PRETEND_LUKS_ID, @@ -565,3 +564,16 @@ def test_mount_mkdir_fails(self, mocked_path): self.cli.mount_volume(vol) assert ex.value.sdstatus is Status.ERROR_MOUNT + + def test_mount_fails_with_locked_device(self): + vol = Volume( + device_name=_DEFAULT_USB_DEVICE_ONE_PART, + mapped_name=_PRETEND_LUKS_ID, + encryption=EncryptionScheme.LUKS, + ) + mock.patch.object(vol, "unlocked", return_value=False) + + with pytest.raises(ExportException) as ex: + self.cli.mount_volume(vol) + + assert ex.value.sdstatus == Status.ERROR_MOUNT diff --git a/tests/disk/test_service.py b/tests/disk/test_service.py index d7053e1..9843ca7 100644 --- a/tests/disk/test_service.py +++ b/tests/disk/test_service.py @@ -1,14 +1,12 @@ -import pytest from unittest import mock import os import tempfile from securedrop_export.exceptions import ExportException -from securedrop_export.disk.legacy_status import Status as LegacyStatus -from securedrop_export.disk.status import Status as Status +from securedrop_export.disk.status import Status from securedrop_export.disk.volume import Volume, MountedVolume, EncryptionScheme from securedrop_export.archive import Archive, Metadata -from securedrop_export.disk.legacy_service import Service +from securedrop_export.disk.service import Service from securedrop_export.disk.cli import CLI SAMPLE_OUTPUT_LSBLK_NO_PART = b"disk\ncrypt" # noqa @@ -19,7 +17,7 @@ class TestExportService: @classmethod def setup_class(cls): - cls.mock_cli = mock.MagicMock(CLI) + cls.mock_cli = mock.MagicMock(spec=CLI) cls.mock_submission = cls._setup_submission() cls.mock_luks_volume_unmounted = Volume( @@ -34,6 +32,19 @@ def setup_class(cls): encryption=EncryptionScheme.LUKS, ) + cls.mock_vc_volume_mounted = MountedVolume( + device_name=SAMPLE_OUTPUT_USB, + mapped_name="mock-veracrypt-vol", + encryption=EncryptionScheme.VERACRYPT, + mountpoint="/media/usb/", + ) + + cls.mock_vc_volume_unmounted = Volume( + device_name=SAMPLE_OUTPUT_USB, + mapped_name=None, + encryption=EncryptionScheme.UNKNOWN, + ) + cls.service = Service(cls.mock_submission, cls.mock_cli) @classmethod @@ -65,139 +76,81 @@ def setup_method(self, method): errors. `teardown_method()` will reset the side effects so they do not affect subsequent test methods. """ - self.mock_cli.get_connected_devices.return_value = [SAMPLE_OUTPUT_USB] - self.mock_cli.get_partitioned_device.return_value = ( - SAMPLE_OUTPUT_USB_PARTITIONED - ) - self.mock_cli.get_luks_volume.return_value = self.mock_luks_volume_unmounted + self.mock_cli._get_luks_volume.return_value = self.mock_luks_volume_unmounted self.mock_cli.mount_volume.return_value = self.mock_luks_volume_mounted + self.mock_cli.get_all_volumes.return_value = [self.mock_luks_volume_unmounted] def teardown_method(self, method): self.mock_cli.reset_mock(return_value=True, side_effect=True) - def test_check_usb(self): - status = self.service.check_connected_devices() + def test_scan_all_device_is_locked_luks(self): + status = self.service.scan_all_devices() - assert status is LegacyStatus.LEGACY_USB_CONNECTED + assert status == Status.DEVICE_LOCKED - def test_no_devices_connected(self): - self.mock_cli.get_connected_devices.return_value = [] - with pytest.raises(ExportException) as ex: - self.service.check_connected_devices() + def test_scan_all_no_devices_connected(self): + self.mock_cli.get_all_volumes.return_value = [] - assert ex.value.sdstatus is LegacyStatus.LEGACY_USB_NOT_CONNECTED + assert self.service.scan_all_devices() == Status.NO_DEVICE_DETECTED - def test_too_many_devices_connected(self): - self.mock_cli.get_connected_devices.return_value = [ + def test_scan_all_too_many_devices_connected(self): + self.mock_cli.get_all_volumes.return_value = [ SAMPLE_OUTPUT_USB, "/dev/sdb", ] - with pytest.raises(ExportException) as ex: - self.service.check_connected_devices() - assert ex.value.sdstatus is LegacyStatus.LEGACY_USB_ENCRYPTION_NOT_SUPPORTED + assert self.service.scan_all_devices() == Status.MULTI_DEVICE_DETECTED - def test_device_is_not_luks(self): - self.mock_cli.is_luks_volume.return_value = False + def test_scan_all_devices_error(self): + self.mock_cli.get_all_volumes.side_effect = ExportException("zounds!") - # When VeraCrypt is supported, this will no longer be an exception - # and the return status will change - with pytest.raises(ExportException) as ex: - self.service.check_disk_format() + assert self.service.scan_all_devices() == Status.DEVICE_ERROR - assert ex.value.sdstatus is LegacyStatus.LEGACY_USB_ENCRYPTION_NOT_SUPPORTED + def test_scan_all_device_is_not_luks_and_unlocked(self): + self.mock_cli.get_all_volumes.return_value = [self.mock_vc_volume_mounted] - def test_check_usb_error(self): - self.mock_cli.get_connected_devices.side_effect = ExportException( - sdstatus=LegacyStatus.LEGACY_ERROR_USB_CHECK - ) + assert self.service.scan_all_devices() == Status.DEVICE_WRITABLE - with pytest.raises(ExportException) as ex: - self.service.check_connected_devices() + def test_scan_all_device_is_not_luks_and_not_unlocked_error(self): + self.mock_cli.get_all_volumes.return_value = [self.mock_vc_volume_unmounted] - assert ex.value.sdstatus is LegacyStatus.LEGACY_ERROR_USB_CHECK + assert self.service.scan_all_devices() == Status.UNKNOWN_DEVICE_DETECTED - def test_check_disk_format(self): - status = self.service.check_disk_format() - - assert status is LegacyStatus.LEGACY_USB_ENCRYPTED - - def test_check_disk_format_error(self): - self.mock_cli.get_partitioned_device.side_effect = ExportException( - sdstatus=Status.INVALID_DEVICE_DETECTED + self.mock_cli.get_all_volumes.side_effect = ExportException( + sdstatus=Status.DEVICE_ERROR ) - with pytest.raises(ExportException) as ex: - self.service.check_disk_format() - - # We still return the legacy status for now - assert ex.value.sdstatus is LegacyStatus.LEGACY_USB_ENCRYPTION_NOT_SUPPORTED + assert self.service.scan_all_devices() == Status.DEVICE_ERROR - def test_export(self): - # Currently, a successful export does not return a success status. - # When the client is updated, this will change to assert EXPORT_SUCCESS - # is returned. - self.service.export() + def test_scan_all_device_is_locked_veracrypt_volume(self): + self.mock_cli.get_all_volumes.return_value = [self.mock_vc_volume_unmounted] - def test_export_disk_not_supported(self): - self.mock_cli.is_luks_volume.return_value = False + assert self.service.scan_all_devices() == Status.UNKNOWN_DEVICE_DETECTED - with pytest.raises(ExportException) as ex: - self.service.export() + def test_export_already_mounted_no_cleanup(self): + self.mock_cli.get_all_volumes.return_value = [self.mock_luks_volume_mounted] + mock_write = mock.MagicMock() + self.mock_cli.write_data_to_device = mock_write + result = self.service.export() - assert ex.value.sdstatus is LegacyStatus.LEGACY_USB_ENCRYPTION_NOT_SUPPORTED - - def test_export_write_error(self): - self.mock_cli.is_luks_volume.return_value = True - self.mock_cli.write_data_to_device.side_effect = ExportException( - sdstatus=LegacyStatus.LEGACY_ERROR_USB_WRITE + assert result == Status.SUCCESS_EXPORT + mock_write.assert_called_once_with( + self.mock_submission.tmpdir, + self.mock_submission.target_dirname, + self.mock_luks_volume_mounted, ) - with pytest.raises(ExportException) as ex: - self.service.export() - - assert ex.value.sdstatus is LegacyStatus.LEGACY_ERROR_USB_WRITE - - def test_export_throws_new_exception_return_legacy_status(self): - self.mock_cli.get_connected_devices.side_effect = ExportException( - sdstatus=Status.ERROR_MOUNT - ) - - with pytest.raises(ExportException) as ex: - self.service.export() - - assert ex.value.sdstatus is LegacyStatus.LEGACY_ERROR_USB_MOUNT - @mock.patch("os.path.exists", return_value=True) - def test_write_error_returns_legacy_status(self, mock_path): - self.mock_cli.is_luks_volume.return_value = True + def test_export_write_error(self, mock_path): + self.mock_cli.get_all_volumes.return_value = [self.mock_luks_volume_mounted] self.mock_cli.write_data_to_device.side_effect = ExportException( sdstatus=Status.ERROR_EXPORT ) - with pytest.raises(ExportException) as ex: - self.service.export() - - assert ex.value.sdstatus is LegacyStatus.LEGACY_ERROR_USB_WRITE - - @mock.patch("os.path.exists", return_value=True) - def test_unlock_error_returns_legacy_status(self, mock_path): - self.mock_cli.unlock_luks_volume.side_effect = ExportException( - sdstatus=Status.ERROR_UNLOCK_LUKS - ) - - with pytest.raises(ExportException) as ex: - self.service.export() - - assert ex.value.sdstatus is LegacyStatus.LEGACY_USB_BAD_PASSPHRASE + assert self.service.export() == Status.ERROR_EXPORT @mock.patch("os.path.exists", return_value=True) - def test_unexpected_error_returns_legacy_status_generic(self, mock_path): - self.mock_cli.unlock_luks_volume.side_effect = ExportException( - sdstatus=Status.DEVICE_ERROR - ) - - with pytest.raises(ExportException) as ex: - self.service.export() + def test_export_unlock_error(self, mock_path): + self.mock_cli.unlock_luks_volume.side_effect = ExportException("oh no!") - assert ex.value.sdstatus is LegacyStatus.LEGACY_ERROR_GENERIC + assert self.service.export() == Status.ERROR_UNLOCK_LUKS From 39cb393bea69cf25a97548312ab4d4eff8f6e9aa Mon Sep 17 00:00:00 2001 From: Ro Date: Thu, 23 Nov 2023 14:43:32 -0500 Subject: [PATCH 05/14] Add sample export tarballs and update test_main to use them --- securedrop_export/main.py | 5 +- tests/files/sample_export.sd_export | Bin 0 -> 714 bytes tests/files/sample_print.sd_export | Bin 0 -> 754 bytes tests/test_main.py | 158 ++++++++++++++++------------ 4 files changed, 92 insertions(+), 71 deletions(-) create mode 100644 tests/files/sample_export.sd_export create mode 100644 tests/files/sample_print.sd_export diff --git a/securedrop_export/main.py b/securedrop_export/main.py index 53f3073..e62bf00 100755 --- a/securedrop_export/main.py +++ b/securedrop_export/main.py @@ -42,6 +42,8 @@ def entrypoint(): Non-zero exit values will cause the system to try alternative solutions for mimetype handling, which we want to avoid. + + The program is called with the archive name as the first argument. """ status, submission = None, None @@ -53,7 +55,8 @@ def entrypoint(): # Halt if target file is absent if not os.path.exists(data_path): - logger.info("Archive is not found {}.".format(data_path)) + logger.error("Archive not found at provided path.") + logger.debug("Archive missing, path: {}".format(data_path)) status = Status.ERROR_FILE_NOT_FOUND else: diff --git a/tests/files/sample_export.sd_export b/tests/files/sample_export.sd_export new file mode 100644 index 0000000000000000000000000000000000000000..dab6433525ebaf9599ddb292ebad04665e3da54b GIT binary patch literal 714 zcmV;*0yX^~iwFP!000001MQd1j@mE~$2s#9v+1E1HsFvD39Tyi(kIwsrIm??BuipP zwzC0MmHH5U!ahmIVc8WcEvOZ+{qRpAfy~$*GX9U9a6C9W6C#~V+?A!t-gU=-#mSVh zgt2r+_tP|*4j?&yXoEp(EY|>ZMSa-su>CJ39OJrC+TICn`I+WIpZ_E|mj8&dPA9t0 zq9~d9{9l6o{7|+&sdZ&%74tr9VVv?;x+#B{NEO(0Xnz@ zT=TlA5Y!SlG!-u;SX^6JmUOdiD#baW%qLORmb_A`wr)OU-OTM7`BDl;^!(sz9O6Uqj@d%KL8c=_>RMPb* zR4H>B$hF`3Y0MN=C;>~YYCtKROF#53*bykbwOYP%){Z)G;cme24k$QIu6g0y@PWED zWiBbOj7tl;G^^*KjS7cvowAd%uYaq#G?^|Nd&(McO#f*dz1aUH(PZlNe+kTX&EHM} z=(MspA3kjgXais@Da{z3Q=KSnp<4RQ^{Q>IhSxoZydPa@cOf7vxaOHfZNRiy;bixD z!W1j+Tnj@y6$VMu+nsj%&2si9fujLVuPy*6TlxU)4XOcf-B+e-_1z zdHr94FM+^^GQ(gFfuN->povE7x85{v&Ap?^Q7GYzRl71Tj4RwY%SU*Z>nXR-mWCVC w6q*}^Tp?{5P~%nb_1{`KJsyw83_st`XXTmi>SZsImUT~a0P z=omv`z?LGx#D+u}+ZY?$7$AP60TU;aY{p3>xCh?=w?I;;MA7J&)OH&9Q6)!?^L?)0 zzMq|n{i&`a#psfhw6dOY7ET@hDtXLBS1XWQwo=|A8AbuKxwg_CfrC4nS2nSS}&p761h060{wFQ5A!N9d}mBB^R3F zUQd_PVOsrf_G|}(%btr-PPlg9?7>5a zFCIB@?D)XRk5^%Pgco$ z7A%tI=!5-ZH^fsEMW8`&ICz}GjMeoqNbTXQw)AatgrA<0TXXrSq z-w?K@STIl4S@0&Bpw}C@>MWdO!Cdnjv@DU29Gnx*$0acbn!DwW1Jyp5&wFUdh8URx z7J{bj6#Goq5Q|sDV!7B1V4@O=M{pf>r-P3S4`II{#a{bqztLL%e_6>6=Gf`{r)0Cu z`mf3vHLm}vn&tJsC!F(&mADuIY(Wf$-3l;W=O_jS^bmk~w}Rt42zYkUim%+nd_&L} z26<$_ctsjdSa1jd$Hi77jteZ?M9GoTae-+U3doDCn#6a24~LU6+omaa2m!y0kYTlr z9>);5%R7b`zw=?3m|wi)yG~bfH2klse}4Z{vaSFB$Vx`fa{u>&k+d0~OAM6GfplYM kMP7Q8`-{Wja5x+ehr{7;I2;a#!}%xp4g~bamjEaL0Nz}4zyJUM literal 0 HcmV?d00001 diff --git a/tests/test_main.py b/tests/test_main.py index 06da9ef..08ceeb8 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -1,13 +1,13 @@ import pytest -import tempfile -import os from unittest import mock import shutil +from pathlib import Path from securedrop_export.archive import Archive, Metadata, Status as ArchiveStatus from securedrop_export.status import BaseStatus from securedrop_export.command import Command from securedrop_export.exceptions import ExportException +from securedrop_export.disk.status import Status as ExportStatus from securedrop_export.main import ( Status, @@ -18,107 +18,123 @@ _configure_logging, ) -SUBMISSION_SAMPLE_ARCHIVE = "pretendfile.tar.gz" +_PRINT_SAMPLE_ARCHIVE = "sample_print.sd_export" +_EXPORT_SAMPLE_ARCHIVE = "sample_export.sd_export" class TestMain: def setup_method(self, method): - # This can't be a class method, since we expect sysexit during this test suite, - # which - self.submission = Archive("pretendfile.tar.gz") - assert os.path.exists(self.submission.tmpdir) + # These can't be class setup methods, since we expect sysexit during this test suite + self.print_archive_path = Path.cwd() / "tests/files" / _PRINT_SAMPLE_ARCHIVE + assert self.print_archive_path.exists() + + self.export_archive_path = Path.cwd() / "tests/files" / _EXPORT_SAMPLE_ARCHIVE + assert self.export_archive_path.exists() + + self.print_submission = Archive(str(self.print_archive_path)) + assert Path(self.print_submission.tmpdir).exists() + + self.export_submission = Archive(str(self.export_archive_path)) + assert Path(self.export_submission.tmpdir).exists() def teardown_method(self, method): - if os.path.exists(self.submission.tmpdir): - shutil.rmtree(self.submission.tmpdir) - self.submission = None + if Path(self.print_submission.tmpdir).exists(): + shutil.rmtree(self.print_submission.tmpdir) + + if Path(self.export_submission.tmpdir).exists(): + shutil.rmtree(self.export_submission.tmpdir) - def test_exit_gracefully_no_exception(self, capsys): + def _did_exit_gracefully(self, exit, capsys, status: BaseStatus) -> bool: + """ + Helper. True if exited with 0, writing supplied status to stderr. + """ + captured = capsys.readouterr() + + return ( + exit.value.code == 0 + and captured.err.rstrip().endswith(status.value) + and captured.out == "" + ) + + def test__exit_gracefully_no_exception(self, capsys): with pytest.raises(SystemExit) as sysexit: - _exit_gracefully(self.submission, Status.ERROR_GENERIC) + # `ERROR_GENERIC` is just a placeholder status here + _exit_gracefully(self.print_submission, Status.ERROR_GENERIC) assert self._did_exit_gracefully(sysexit, capsys, Status.ERROR_GENERIC) - def test_exit_gracefully_exception(self, capsys): - with pytest.raises(SystemExit) as sysexit: - _exit_gracefully(self.submission, Status.ERROR_GENERIC) + @mock.patch( + "securedrop_export.main.shutil.rmtree", + side_effect=FileNotFoundError("oh no", 0), + ) + def test__exit_gracefully_even_with_cleanup_exception(self, mock_rm, capsys): + with mock.patch( + "sys.argv", ["qvm-send-to-usb", self.export_archive_path] + ), mock.patch( + "securedrop_export.main._start_service", + return_value=ExportStatus.SUCCESS_EXPORT, + ), pytest.raises( + SystemExit + ) as sysexit: + entrypoint() - # A graceful exit means a return code of 0 assert self._did_exit_gracefully(sysexit, capsys, Status.ERROR_GENERIC) + def test_entrypoint_success(self, capsys): + with mock.patch( + "sys.argv", ["qvm-send-to-usb", self.export_archive_path] + ), mock.patch( + "securedrop_export.main._start_service", + return_value=ExportStatus.SUCCESS_EXPORT, + ), pytest.raises( + SystemExit + ) as sysexit: + entrypoint() + + assert self._did_exit_gracefully(sysexit, capsys, ExportStatus.SUCCESS_EXPORT) + @pytest.mark.parametrize("status", [s for s in Status]) - def test_write_status(self, status, capsys): + def test__write_status_success(self, status, capsys): _write_status(status) captured = capsys.readouterr() assert captured.err == status.value + "\n" @pytest.mark.parametrize("invalid_status", ["foo", ";ls", "&& echo 0", None]) - def test_write_status_error(self, invalid_status, capsys): + def test__write_status_will_not_write_bad_value(self, invalid_status, capsys): with pytest.raises(ValueError): _write_status(Status(invalid_status)) - def _did_exit_gracefully(self, exit, capsys, status: BaseStatus) -> bool: - """ - Helper. True if exited with 0, writing supplied status to stderr. - """ captured = capsys.readouterr() + assert captured.err == "" + assert captured.out == "" - return ( - exit.value.code == 0 - and captured.err.rstrip().endswith(status.value) - and captured.out == "" - ) - - @pytest.mark.parametrize("command", list(Command)) - @mock.patch("securedrop_export.main._configure_logging") - @mock.patch("os.path.exists", return_value=True) - def test_entrypoint_success_start_service(self, mock_log, mock_path, command): - metadata = os.path.join(self.submission.tmpdir, Metadata.METADATA_FILE) - - with open(metadata, "w") as f: - f.write(f'{{"device": "{command.value}", "encryption_method": "luks"}}') - + def test_entrypoint_success_start_service(self): with mock.patch( - "sys.argv", ["qvm-send-to-usb", SUBMISSION_SAMPLE_ARCHIVE] + "sys.argv", ["qvm-send-to-usb", self.export_archive_path] ), mock.patch( "securedrop_export.main._start_service" - ) as mock_service, mock.patch( - "securedrop_export.main.Archive.extract_tarball", - return_value=self.submission, - ), pytest.raises( + ) as mock_service, pytest.raises( SystemExit ): entrypoint() - if command is not Command.START_VM: - assert self.submission.command == command - assert mock_service.call_args[0][0].archive == SUBMISSION_SAMPLE_ARCHIVE - mock_service.assert_called_once_with(self.submission) - - def test_valid_printer_test_config(self, capsys): - Archive("testfile") - temp_folder = tempfile.mkdtemp() - metadata = os.path.join(temp_folder, Metadata.METADATA_FILE) - with open(metadata, "w") as f: - f.write('{"device": "printer-test"}') + assert mock_service.call_args[0][0].archive == self.export_archive_path + assert mock_service.call_args[0][0].command == Command.EXPORT - config = Metadata(temp_folder).validate() + def test_validate_metadata(self): + for archive_path in [self.print_archive_path, self.export_archive_path]: + archive = Archive(archive_path) + extracted = archive.extract_tarball() - assert config.encryption_key is None - assert config.encryption_method is None + assert Metadata(extracted.tmpdir).validate() @mock.patch( "securedrop_export.archive.safe_extractall", side_effect=ValueError("A tarball problem!"), ) - @mock.patch("securedrop_export.main.os.path.exists", return_value=True) - @mock.patch("securedrop_export.main.shutil.rmtree") - @mock.patch("securedrop_export.main._configure_logging") - def test_entrypoint_failure_extraction( - self, mock_log, mock_rm, mock_path, mock_extract, capsys - ): + def test_entrypoint_failure_extraction(self, mock_extract, capsys): with mock.patch( - "sys.argv", ["qvm-send-to-usb", SUBMISSION_SAMPLE_ARCHIVE] + "sys.argv", ["qvm-send-to-usb", self.export_archive_path] ), pytest.raises(SystemExit) as sysexit: entrypoint() @@ -149,9 +165,10 @@ def test_entrypoint_fails_unexpected(self, mock_mkdir, capsys): assert self._did_exit_gracefully(sysexit, capsys, Status.ERROR_GENERIC) - @mock.patch("os.path.exists", return_value=False) - def test_entrypoint_archive_path_fails(self, mock_path, capsys): - with pytest.raises(SystemExit) as sysexit: + def test_entrypoint_archive_path_fails(self, capsys): + with mock.patch( + "sys.argv", ["qvm-send-to-usb", "THIS_FILE_DOES_NOT_EXIST.sd_export"] + ), pytest.raises(SystemExit) as sysexit: entrypoint() assert self._did_exit_gracefully(sysexit, capsys, Status.ERROR_FILE_NOT_FOUND) @@ -171,14 +188,15 @@ def test__start_service_calls_correct_services(self, command): if command is Command.START_VM: pytest.skip("Command does not start a service") - self.submission.command = command + mock_submission = Archive("mock_submission.sd_export") + mock_submission.command = command with mock.patch("securedrop_export.main.PrintService") as ps, mock.patch( "securedrop_export.main.ExportService" ) as es: - _start_service(self.submission) + _start_service(mock_submission) if command in [Command.PRINT, Command.PRINTER_TEST, Command.PRINTER_PREFLIGHT]: - assert ps.call_args[0][0] is self.submission + assert ps.call_args[0][0] is mock_submission else: - assert es.call_args[0][0] is self.submission + assert es.call_args[0][0] is mock_submission From 6ce9855db12478d2bd93da888bb27e08e84d50c5 Mon Sep 17 00:00:00 2001 From: Ro Date: Tue, 28 Nov 2023 18:44:46 -0500 Subject: [PATCH 06/14] Check /dev/mapper + use cryptsetup status to detect veracrypt drives. --- securedrop_export/disk/cli.py | 102 +++++++++++++++++++++++----------- tests/disk/test_cli.py | 70 +++++++++++++++++++++-- 2 files changed, 135 insertions(+), 37 deletions(-) diff --git a/securedrop_export/disk/cli.py b/securedrop_export/disk/cli.py index 5f402d1..6652747 100644 --- a/securedrop_export/disk/cli.py +++ b/securedrop_export/disk/cli.py @@ -312,7 +312,7 @@ def _get_dev_mapper_entries(self) -> List[str]: """ try: ls = subprocess.check_output(["ls", "/dev/mapper/"], stderr=subprocess.PIPE) - entries = ls.decode().rstrip().split("\n") + entries = ls.decode("utf-8").rstrip().split("\n") return [r for r in entries if r not in _DEVMAPPER_SYSTEM] @@ -322,49 +322,85 @@ def _get_dev_mapper_entries(self) -> List[str]: def _attempt_get_unlocked_veracrypt_volume(self, device_name: str) -> MountedVolume: """ - Looks for an already-unlocked VeraCrypt volume in /dev/mapper and see if the - device ID matches given device name. + Looks for an already-unlocked volume in /dev/mapper to see if the name matches + given device name. Returns MountedVolume object if a drive is found. Otherwise, raises ExportException. """ try: - for entry in self._get_dev_mapper_entries(): - res = subprocess.check_output( - [ - "lsblk", - "--noheadings", - "-o", - "NAME,TYPE,MOUNTPOINT", - f"/dev/mapper/{entry}", - ] + devmapper_entries = self._get_dev_mapper_entries() + for item in devmapper_entries: + # Check it out with cryptsetup, see if it's a VeraCrypt/TrueCrypt drive. + # Example format (some lines ommitted for brevity): + # + # b'/dev/mapper/vc is active and is in use.\n type: TCRYPT\n cipher: + # aes-xts-plain64\n keysize: 512 bits\n key location: dm-crypt\n device: + # /dev/sdc\n sector size: 512\noffset: 256 sectors\n size: + # 1968640 sectors\n skipped: 256 sectors\n mode: read/write\n' + # + # (A mapped entry can also have a null device, if it wasn't properly removed + # from /dev/mapper using `cryptsetup close`.) + status = ( + subprocess.check_output( + ["sudo", "cryptsetup", "status", f"/dev/mapper/{item}"] + ) + .decode("utf-8") + .split("\n ") ) - name, crypt_type, mountpoint = ( - res.decode().rstrip().split() - ) # Space-separated - - # Notes for our future selves: there is also a `tcrypt-system` identifier. - # It *should* only be used for FDE with TrueCrypt, not for a non-bootable drive. - if crypt_type == "tcrypt" and name == device_name: - vol = Volume( - device_name=name, - mapped_name=entry, + + logger.debug(f"{status}") + + if "type: TCRYPT" in status and f"device: {device_name}" in status: + logger.info("Unlocked VeraCrypt volume detected") + volume = Volume( + device_name=device_name, + mapped_name=item, encryption=EncryptionScheme.VERACRYPT, ) + + # Is it mounted? + mountpoint = ( + subprocess.check_output( + [ + "lsblk", + f"/dev/mapper/{item}", + "--noheadings", + "-o", + "MOUNTPOINT", + ] + ) + .decode() + .strip() + ) if mountpoint: - return MountedVolume.from_volume(vol, mountpoint) + # Note: Here we're accepting the user's choice of how they + # have mounted the drive, including whatever permissions/ + # options they have set. + logger.info(f"Drive is already mounted at {mountpoint}") + return MountedVolume.from_volume(volume, mountpoint) else: - return self._mount_at_mountpoint(vol, self._DEFAULT_MOUNTPOINT) + logger.info( + "Drive is not mounted; mounting at default mountpoint" + ) - # If we got here, there is no unlocked VC drive present. Not an error, but not - # a state we can continue the workflow in, so raise ExportException. - logger.info("No unlocked Veracrypt drive found.") - raise ExportException(sdstatus=Status.UNKNOWN_DEVICE_DETECTED) + # Fixme: we can't reliably use chown as we do with luks+ext4, + # since we don't know what filesystem is inside the veracrypt container. + return self._mount_at_mountpoint( + volume, self._DEFAULT_MOUNTPOINT + ) - # If we got here, there were no entries in `/dev/mapper/` for us to look at. - raise ExportException(sdstatus=Status.DEVICE_ERROR) + else: # somehow it didn't work. dump the device info for now. + # fixme: this isn't necessarily. an error + logger.error(f"Did not parse veracrypt drive from: {status}") - except subprocess.CalledProcessError as e: - logger.error(e) - raise ExportException(sdstatus=Status.DEVICE_ERROR) + # If we got here, there is no unlocked VC drive present. Not an error, but not + # a state we can continue the workflow in, so raise ExportException. + logger.info("No unlocked Veracrypt drive found.") + raise ExportException(sdstatus=Status.UNKNOWN_DEVICE_DETECTED) + + except subprocess.CalledProcessError as ex: + logger.error("Encountered exception while checking /dev/mapper entries") + logger.debug(ex) + raise ExportException(sdstatus=Status.DEVICE_ERROR) from ex def attempt_unlock_veracrypt( self, volume: Volume, encryption_key: str diff --git a/tests/disk/test_cli.py b/tests/disk/test_cli.py index c751878..3b770d7 100644 --- a/tests/disk/test_cli.py +++ b/tests/disk/test_cli.py @@ -22,6 +22,10 @@ _SAMPLE_OUTPUT_USB = b"/dev/sda" # noqa _SAMPLE_LUKS_HEADER = b"\n\nUUID:\t123456-DEADBEEF" # noqa +_SAMPLE_CRYPTSETUP_STATUS_OUTPUT = b"/dev/mapper/vc is active and is in use.\n type: \ +TCRYPT\n cipher: aes-xts-plain64\n keysize: 512 bits\n key location: dm-crypt\n device: \ +/dev/sdc\n sector size: 512\n mode: read/write\n" +_SAMPLE_MOUNTED_VC_OUTPUT = b"/media/custom_mount" class TestCli: @@ -288,7 +292,7 @@ def test_unlock_luks_volume_luksOpen_exception(self, mocked_subprocess): @mock.patch("os.path.exists", return_value=True) @mock.patch("subprocess.check_output", return_value=b"\n") @mock.patch("subprocess.check_call", return_value=0) - def testmount_volume(self, mocked_call, mocked_output, mocked_path): + def test_mount_volume(self, mocked_call, mocked_output, mocked_path): vol = Volume( device_name=_DEFAULT_USB_DEVICE_ONE_PART, mapped_name=_PRETEND_LUKS_ID, @@ -303,7 +307,9 @@ def testmount_volume(self, mocked_call, mocked_output, mocked_path): "subprocess.check_output", return_value=b"/dev/pretend/luks-id-123456\n" ) @mock.patch("subprocess.check_call", return_value=0) - def testmount_volume_already_mounted(self, mocked_output, mocked_call, mocked_path): + def test_mount_volume_already_mounted( + self, mocked_output, mocked_call, mocked_path + ): md = Volume( device_name=_DEFAULT_USB_DEVICE_ONE_PART, mapped_name=_PRETEND_LUKS_ID, @@ -316,7 +322,7 @@ def testmount_volume_already_mounted(self, mocked_output, mocked_call, mocked_pa @mock.patch("os.path.exists", return_value=True) @mock.patch("subprocess.check_output", return_value=b"\n") @mock.patch("subprocess.check_call", return_value=0) - def testmount_volume_mkdir(self, mocked_output, mocked_subprocess, mocked_path): + def test_mount_volume_mkdir(self, mocked_output, mocked_subprocess, mocked_path): md = Volume( device_name=_DEFAULT_USB_DEVICE_ONE_PART, mapped_name=_PRETEND_LUKS_ID, @@ -331,7 +337,7 @@ def testmount_volume_mkdir(self, mocked_output, mocked_subprocess, mocked_path): "subprocess.check_call", side_effect=subprocess.CalledProcessError(1, "check_call"), ) - def testmount_volume_error(self, mocked_subprocess, mocked_output): + def test_mount_volume_error(self, mocked_subprocess, mocked_output): md = Volume( device_name=_DEFAULT_USB_DEVICE_ONE_PART, mapped_name=_PRETEND_LUKS_ID, @@ -577,3 +583,59 @@ def test_mount_fails_with_locked_device(self): self.cli.mount_volume(vol) assert ex.value.sdstatus == Status.ERROR_MOUNT + + @mock.patch("os.path.exists", return_value=True) + @mock.patch( + "subprocess.check_output", + side_effect=[b"vc", _SAMPLE_CRYPTSETUP_STATUS_OUTPUT, b""], + ) + @mock.patch("subprocess.check_call") + def test_get_unlocked_veracrypt_unmounted( + self, mock_call, mock_subprocess, mock_os + ): + vol = self.cli._attempt_get_unlocked_veracrypt_volume("/dev/sdc") + + assert vol.unlocked + assert vol.mountpoint == self.cli._DEFAULT_MOUNTPOINT + + @mock.patch("os.path.exists", return_value=True) + @mock.patch( + "subprocess.check_output", + side_effect=[ + b"vc", + _SAMPLE_CRYPTSETUP_STATUS_OUTPUT, + _SAMPLE_MOUNTED_VC_OUTPUT, + ], + ) + def test_get_unlocked_veracrypt_mounted(self, mock_subprocess, mock_os): + v = self.cli._attempt_get_unlocked_veracrypt_volume("/dev/sdc") + + assert v.unlocked + assert v.mountpoint == _SAMPLE_MOUNTED_VC_OUTPUT.decode("utf-8") + + @mock.patch("os.path.exists", return_value=True) + @mock.patch( + "subprocess.check_output", + side_effect=subprocess.CalledProcessError(1, "Oh no!"), + ) + def test_get_unlocked_veracrypt_lsblk_error(self, mock_subprocess, mock_os): + with pytest.raises(ExportException) as ex: + self.cli._attempt_get_unlocked_veracrypt_volume("/dev/sdc") + + assert ex.value.sdstatus == Status.DEVICE_ERROR + + @mock.patch("os.path.exists", return_value=True) + @mock.patch("subprocess.check_output", return_value=b"sdc disk\n") + def test_get_unlocked_veracrypt_no_vc_drive(self, mock_subprocess, mock_os): + with pytest.raises(ExportException) as ex: + self.cli._attempt_get_unlocked_veracrypt_volume("/dev/sdc") + + assert ex.value.sdstatus == Status.UNKNOWN_DEVICE_DETECTED + + @mock.patch("os.path.exists", return_value=True) + @mock.patch("subprocess.check_output", return_value=b"\n") + def test_get_unlocked_veracrypt_empty_lsblk_error(self, mock_subprocess, mock_os): + with pytest.raises(ExportException) as ex: + self.cli._attempt_get_unlocked_veracrypt_volume("/dev/sdc") + + assert ex.value.sdstatus == Status.UNKNOWN_DEVICE_DETECTED From e588a53c2fd61052852c3709cafbc3c70b408f57 Mon Sep 17 00:00:00 2001 From: Ro Date: Thu, 30 Nov 2023 02:41:36 -0500 Subject: [PATCH 07/14] Use udisksctl for USB mounting --- securedrop_export/disk/cli.py | 70 +++++++++++---------------------- tests/disk/test_cli.py | 73 ++++------------------------------- 2 files changed, 30 insertions(+), 113 deletions(-) diff --git a/securedrop_export/disk/cli.py b/securedrop_export/disk/cli.py index 6652747..aa2c58f 100644 --- a/securedrop_export/disk/cli.py +++ b/securedrop_export/disk/cli.py @@ -24,8 +24,6 @@ class CLI: sys.exit(0) so that another program does not attempt to open the submission. """ - # Default mountpoint (unless drive is already mounted manually by the user) - _DEFAULT_MOUNTPOINT = "/media/usb" _DEFAULT_VC_CONTAINER_NAME = "vc-volume" def _get_connected_devices(self) -> List[str]: @@ -71,7 +69,6 @@ def _get_removable_devices(self, attached_devices: List[str]) -> List[str]: ["cat", f"/sys/class/block/{device}/removable"], stderr=subprocess.PIPE, ) - # removable is "0" for non-removable device, "1" for removable, # convert that into a Python boolean is_removable = bool(int(removable.decode("utf8").strip())) @@ -96,7 +93,9 @@ def get_all_volumes(self) -> List[Volume]: Caller must handle ExportException. """ volumes = [] + removable_devices = self._get_connected_devices() + try: for item in removable_devices: blkid = self._get_partitioned_device(item) @@ -233,8 +232,8 @@ def _get_luks_volume(self, device: str) -> Union[Volume, MountedVolume]: If LUKS volume is already mounted, existing mountpoint will be preserved and a MountedVolume object will be returned. - If LUKS volume is unlocked but not mounted, volume will be mounted at _DEFAULT_MOUNTPOINT, - and a MountedVolume object will be returned. + If LUKS volume is unlocked but not mounted, volume will be mounted and a MountedVolume + object will be returned. If device is still locked, mountpoint will not be set, and a Volume object will be retuned. Once the decrpytion passphrase is available, call unlock_luks_volume(), passing the Volume @@ -378,18 +377,11 @@ def _attempt_get_unlocked_veracrypt_volume(self, device_name: str) -> MountedVol logger.info(f"Drive is already mounted at {mountpoint}") return MountedVolume.from_volume(volume, mountpoint) else: - logger.info( - "Drive is not mounted; mounting at default mountpoint" - ) - - # Fixme: we can't reliably use chown as we do with luks+ext4, - # since we don't know what filesystem is inside the veracrypt container. - return self._mount_at_mountpoint( - volume, self._DEFAULT_MOUNTPOINT - ) + logger.info("Drive is not mounted; mounting") + return self.mount_volume(volume) - else: # somehow it didn't work. dump the device info for now. - # fixme: this isn't necessarily. an error + else: + # Somehow it didn't work. logger.error(f"Did not parse veracrypt drive from: {status}") # If we got here, there is no unlocked VC drive present. Not an error, but not @@ -465,7 +457,7 @@ def mount_volume(self, volume: Volume) -> MountedVolume: Given an unlocked LUKS volume, return MountedVolume object. If volume is already mounted, mountpoint is not changed. Otherwise, - volume is mounted at _DEFAULT_MOUNTPOINT. + volume is mounted inside /media/user/ by udisksctl. Raise ExportException if errors are encountered during mounting. """ @@ -480,40 +472,22 @@ def mount_volume(self, volume: Volume) -> MountedVolume: return MountedVolume.from_volume(volume, mountpoint) else: - logger.info("Mount volume at default mountpoint") - return self._mount_at_mountpoint(volume, self._DEFAULT_MOUNTPOINT) + try: + logger.info("Mount volume in /media/user using udisksctl") + output = subprocess.check_output( + ["udisksctl", "mount", "-b", f"/dev/mapper/{volume.mapped_name}"] + ).decode("utf-8") - def _mount_at_mountpoint(self, volume: Volume, mountpoint: str) -> MountedVolume: - """ - Mount a volume at the supplied mountpoint, creating the mountpoint directory and - adjusting permissions (user:user) if need be. `mountpoint` must be a full path. + # Success is "Mounted $device at $path" + if output.startswith("Mounted "): + mountpoint = output.split()[-1] + + return MountedVolume.from_volume(volume, mountpoint) - Return MountedVolume object. - Raise ExportException if unable to mount volume at target mountpoint. - """ - if not os.path.exists(mountpoint): - try: - subprocess.check_call(["sudo", "mkdir", mountpoint]) except subprocess.CalledProcessError as ex: logger.error(ex) raise ExportException(sdstatus=Status.ERROR_MOUNT) from ex - # Mount device /dev/mapper/{mapped_name} at /media/usb/ - mapped_device_path = os.path.join( - volume.MAPPED_VOLUME_PREFIX, volume.mapped_name - ) - - try: - logger.info(f"Mounting volume at {mountpoint}") - subprocess.check_call(["sudo", "mount", mapped_device_path, mountpoint]) - subprocess.check_call(["sudo", "chown", "-R", "user:user", mountpoint]) - - return MountedVolume.from_volume(volume, mountpoint) - - except subprocess.CalledProcessError as ex: - logger.error(ex) - raise ExportException(sdstatus=Status.ERROR_MOUNT) from ex - def write_data_to_device( self, submission_tmpdir: str, @@ -557,8 +531,10 @@ def cleanup_drive_and_tmpdir(self, volume: MountedVolume, submission_tmpdir: str try: subprocess.check_call(["sync"]) umounted = self._unmount_volume(volume) - if umounted: + if umounted.encryption is EncryptionScheme.LUKS: self._close_luks_volume(umounted) + elif umounted.encryption is EncryptionScheme.VERACRYPT: + self._close_veracrypt_volume(umounted) self._remove_temp_directory(submission_tmpdir) except subprocess.CalledProcessError as ex: @@ -572,7 +548,7 @@ def _unmount_volume(self, volume: MountedVolume) -> Volume: if os.path.exists(volume.mountpoint): logger.debug(f"Unmounting drive from {volume.mountpoint}") try: - subprocess.check_call(["sudo", "umount", volume.mountpoint]) + subprocess.check_call(["udisksctl", "unmount", volume.mountpoint]) except subprocess.CalledProcessError as ex: logger.error("Error unmounting device") diff --git a/tests/disk/test_cli.py b/tests/disk/test_cli.py index 3b770d7..761beb7 100644 --- a/tests/disk/test_cli.py +++ b/tests/disk/test_cli.py @@ -27,6 +27,8 @@ /dev/sdc\n sector size: 512\n mode: read/write\n" _SAMPLE_MOUNTED_VC_OUTPUT = b"/media/custom_mount" +_EXAMPLE_MOUNTPOINT = "/media/usb" + class TestCli: """ @@ -289,19 +291,6 @@ def test_unlock_luks_volume_luksOpen_exception(self, mocked_subprocess): assert ex.value.sdstatus is Status.DEVICE_ERROR - @mock.patch("os.path.exists", return_value=True) - @mock.patch("subprocess.check_output", return_value=b"\n") - @mock.patch("subprocess.check_call", return_value=0) - def test_mount_volume(self, mocked_call, mocked_output, mocked_path): - vol = Volume( - device_name=_DEFAULT_USB_DEVICE_ONE_PART, - mapped_name=_PRETEND_LUKS_ID, - encryption=EncryptionScheme.LUKS, - ) - mv = self.cli.mount_volume(vol) - assert isinstance(mv, MountedVolume) - assert mv.mountpoint is self.cli._DEFAULT_MOUNTPOINT - @mock.patch("os.path.exists", return_value=True) @mock.patch( "subprocess.check_output", return_value=b"/dev/pretend/luks-id-123456\n" @@ -349,47 +338,13 @@ def test_mount_volume_error(self, mocked_subprocess, mocked_output): assert ex.value.sdstatus is Status.ERROR_MOUNT - @mock.patch("os.path.exists", return_value=False) - @mock.patch( - "subprocess.check_call", - side_effect=subprocess.CalledProcessError(1, "check_call"), - ) - def test_mount_at_mountpoint_mkdir_error(self, mocked_subprocess, mocked_path): - md = Volume( - device_name=_DEFAULT_USB_DEVICE_ONE_PART, - mapped_name=_PRETEND_LUKS_ID, - encryption=EncryptionScheme.LUKS, - ) - - with pytest.raises(ExportException) as ex: - self.cli._mount_at_mountpoint(md, self.cli._DEFAULT_MOUNTPOINT) - - assert ex.value.sdstatus is Status.ERROR_MOUNT - - @mock.patch("os.path.exists", return_value=True) - @mock.patch( - "subprocess.check_call", - side_effect=subprocess.CalledProcessError(1, "check_call"), - ) - def test_mount_at_mountpoint_mounting_error(self, mocked_subprocess, mocked_path): - md = Volume( - device_name=_DEFAULT_USB_DEVICE_ONE_PART, - mapped_name=_PRETEND_LUKS_ID, - encryption=EncryptionScheme.LUKS, - ) - - with pytest.raises(ExportException) as ex: - self.cli._mount_at_mountpoint(md, self.cli._DEFAULT_MOUNTPOINT) - - assert ex.value.sdstatus is Status.ERROR_MOUNT - @mock.patch("os.path.exists", return_value=True) @mock.patch("subprocess.check_call", return_value=0) def test__unmount_volume(self, mocked_subprocess, mocked_mountpath): mounted = MountedVolume( device_name=_DEFAULT_USB_DEVICE_ONE_PART, mapped_name=_PRETEND_LUKS_ID, - mountpoint=self.cli._DEFAULT_MOUNTPOINT, + mountpoint="/media/usb", encryption=EncryptionScheme.LUKS, ) @@ -405,7 +360,7 @@ def test__unmount_volume_error(self, mocked_subprocess, mocked_mountpath): mounted = MountedVolume( device_name=_DEFAULT_USB_DEVICE_ONE_PART, mapped_name=_PRETEND_LUKS_ID, - mountpoint=self.cli._DEFAULT_MOUNTPOINT, + mountpoint=_EXAMPLE_MOUNTPOINT, encryption=EncryptionScheme.LUKS, ) @@ -461,7 +416,7 @@ def test_write_to_disk(self, mock_check_call): vol = MountedVolume( device_name=_DEFAULT_USB_DEVICE_ONE_PART, mapped_name=_PRETEND_LUKS_ID, - mountpoint=self.cli._DEFAULT_MOUNTPOINT, + mountpoint=_EXAMPLE_MOUNTPOINT, encryption=EncryptionScheme.LUKS, ) @@ -486,7 +441,7 @@ def test_write_to_disk_error_still_does_cleanup(self, mock_call): vol = MountedVolume( device_name=_DEFAULT_USB_DEVICE_ONE_PART, mapped_name=_PRETEND_LUKS_ID, - mountpoint=self.cli._DEFAULT_MOUNTPOINT, + mountpoint=_EXAMPLE_MOUNTPOINT, encryption=EncryptionScheme.LUKS, ) submission = Archive("testfile") @@ -520,7 +475,7 @@ def test_cleanup_drive_and_tmpdir(self, mock_subprocess, mocked_path): mapped_name=_PRETEND_LUKS_ID, encryption=EncryptionScheme.LUKS, ) - mv = MountedVolume.from_volume(vol, mountpoint=self.cli._DEFAULT_MOUNTPOINT) + mv = MountedVolume.from_volume(vol, mountpoint=_EXAMPLE_MOUNTPOINT) close_patch = mock.patch.object(self.cli, "_close_luks_volume") remove_tmpdir_patch = mock.patch.object(self.cli, "_remove_temp_directory") @@ -584,20 +539,6 @@ def test_mount_fails_with_locked_device(self): assert ex.value.sdstatus == Status.ERROR_MOUNT - @mock.patch("os.path.exists", return_value=True) - @mock.patch( - "subprocess.check_output", - side_effect=[b"vc", _SAMPLE_CRYPTSETUP_STATUS_OUTPUT, b""], - ) - @mock.patch("subprocess.check_call") - def test_get_unlocked_veracrypt_unmounted( - self, mock_call, mock_subprocess, mock_os - ): - vol = self.cli._attempt_get_unlocked_veracrypt_volume("/dev/sdc") - - assert vol.unlocked - assert vol.mountpoint == self.cli._DEFAULT_MOUNTPOINT - @mock.patch("os.path.exists", return_value=True) @mock.patch( "subprocess.check_output", From d86720369f09b7593c2a16e5cd7bbaef63df8dac Mon Sep 17 00:00:00 2001 From: Ro Date: Thu, 30 Nov 2023 08:28:04 -0500 Subject: [PATCH 08/14] Error sooner when multiple drives attached --- securedrop_export/disk/service.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/securedrop_export/disk/service.py b/securedrop_export/disk/service.py index 49df835..ced2447 100644 --- a/securedrop_export/disk/service.py +++ b/securedrop_export/disk/service.py @@ -27,7 +27,13 @@ def scan_all_devices(self) -> Status: status. """ try: - status, _ = self._check_volumes(self.cli.get_all_volumes()) + volumes = self.cli.get_all_volumes() + if len(volumes) == 0: + status = Status.NO_DEVICE_DETECTED + elif len(volumes) > 1: + status = Status.MULTI_DEVICE_DETECTED + else: + status, _ = self._check_volumes(volumes) return status except ExportException as ex: From 1efe27f3c8ff25c22a4338bc92af72421fc00494 Mon Sep 17 00:00:00 2001 From: Ro Date: Thu, 30 Nov 2023 13:13:00 -0500 Subject: [PATCH 09/14] Remove encryption_method parameter. --- securedrop_export/archive.py | 14 ++------------ tests/test_archive.py | 31 +------------------------------ 2 files changed, 3 insertions(+), 42 deletions(-) diff --git a/securedrop_export/archive.py b/securedrop_export/archive.py index ed81082..fb39abf 100755 --- a/securedrop_export/archive.py +++ b/securedrop_export/archive.py @@ -26,7 +26,6 @@ class Metadata(object): """ METADATA_FILE = "metadata.json" - SUPPORTED_ENCRYPTION_METHODS = ["luks"] def __init__(self, archive_path: str): self.metadata_path = os.path.join(archive_path, self.METADATA_FILE) @@ -38,11 +37,10 @@ def validate(self) -> "Metadata": logger.info("Parsing archive metadata") json_config = json.loads(f.read()) self.export_method = json_config.get("device", None) - self.encryption_method = json_config.get("encryption_method", None) self.encryption_key = json_config.get("encryption_key", None) logger.info( - "Target: {}, encryption_method {}".format( - self.export_method, self.encryption_method + "Command: {}".format( + self.export_method ) ) @@ -54,12 +52,6 @@ def validate(self) -> "Metadata": try: logger.debug("Validate export action") self.command = Command(self.export_method) - if ( - self.command is Command.EXPORT - and self.encryption_method not in self.SUPPORTED_ENCRYPTION_METHODS - ): - logger.error("Unsupported encryption method") - raise ExportException(sdstatus=Status.ERROR_ARCHIVE_METADATA) except ValueError as v: raise ExportException(sdstatus=Status.ERROR_ARCHIVE_METADATA) from v @@ -95,7 +87,5 @@ def set_metadata(self, metadata: Metadata) -> "Archive": """ self.command = metadata.command if self.command is Command.EXPORT: - # When we support multiple encryption types, we will also want to add the - # encryption_method here self.encryption_key = metadata.encryption_key return self diff --git a/tests/test_archive.py b/tests/test_archive.py index 57791a8..f8c4c65 100644 --- a/tests/test_archive.py +++ b/tests/test_archive.py @@ -22,7 +22,6 @@ def test_extract_tarball(): with tarfile.open(archive_path, "w:gz") as archive: metadata = { "device": "disk", - "encryption_method": "luks", "encryption_key": "test", } metadata_str = json.dumps(metadata) @@ -73,7 +72,6 @@ def test_extract_tarball_with_symlink(): with tarfile.open(archive_path, "w:gz") as archive: metadata = { "device": "disk", - "encryption_method": "luks", "encryption_key": "test", } metadata_str = json.dumps(metadata) @@ -108,7 +106,6 @@ def test_extract_tarball_raises_if_doing_path_traversal(): with tarfile.open(archive_path, "w:gz") as archive: metadata = { "device": "disk", - "encryption_method": "luks", "encryption_key": "test", } metadata_str = json.dumps(metadata) @@ -147,7 +144,6 @@ def test_extract_tarball_raises_if_doing_path_traversal_with_dir(): with tarfile.open(archive_path, "w:gz") as archive: metadata = { "device": "disk", - "encryption_method": "luks", "encryption_key": "test", } metadata_str = json.dumps(metadata) @@ -184,7 +180,6 @@ def test_extract_tarball_raises_if_doing_path_traversal_with_symlink(): with tarfile.open(archive_path, "w:gz") as archive: metadata = { "device": "disk", - "encryption_method": "luks", "encryption_key": "test", } metadata_str = json.dumps(metadata) @@ -223,7 +218,6 @@ def test_extract_tarball_raises_if_doing_path_traversal_with_symlink_linkname(): with tarfile.open(archive_path, "w:gz") as archive: metadata = { "device": "disk", - "encryption_method": "luks", "encryption_key": "test", } metadata_str = json.dumps(metadata) @@ -260,7 +254,6 @@ def test_extract_tarball_raises_if_name_has_unsafe_absolute_path(): with tarfile.open(archive_path, "w:gz") as archive: metadata = { "device": "disk", - "encryption_method": "luks", "encryption_key": "test", } metadata_str = json.dumps(metadata) @@ -303,7 +296,6 @@ def test_extract_tarball_raises_if_name_has_unsafe_absolute_path_with_symlink(): with tarfile.open(archive_path, "w:gz") as archive: metadata = { "device": "disk", - "encryption_method": "luks", "encryption_key": "test", } metadata_str = json.dumps(metadata) @@ -347,7 +339,6 @@ def test_extract_tarball_raises_if_name_has_unsafe_absolute_path_with_symlink_to with tarfile.open(archive_path, "w:gz") as archive: metadata = { "device": "disk", - "encryption_method": "luks", "encryption_key": "test", } metadata_str = json.dumps(metadata) @@ -380,7 +371,6 @@ def test_extract_tarball_raises_if_linkname_has_unsafe_absolute_path(): with tarfile.open(archive_path, "w:gz") as archive: metadata = { "device": "disk", - "encryption_method": "luks", "encryption_key": "test", } metadata_str = json.dumps(metadata) @@ -426,7 +416,6 @@ def test_valid_printer_test_config(capsys): config = Metadata(temp_folder).validate() assert config.encryption_key is None - assert config.encryption_method is None def test_valid_printer_config(capsys): @@ -439,23 +428,6 @@ def test_valid_printer_config(capsys): config = Metadata(temp_folder).validate() assert config.encryption_key is None - assert config.encryption_method is None - - -def test_invalid_encryption_config(capsys): - Archive("testfile") - - temp_folder = tempfile.mkdtemp() - metadata = os.path.join(temp_folder, Metadata.METADATA_FILE) - with open(metadata, "w") as f: - f.write( - '{"device": "disk", "encryption_method": "base64", "encryption_key": "hunter1"}' - ) - - with pytest.raises(ExportException) as ex: - Metadata(temp_folder).validate() - - assert ex.value.sdstatus is Status.ERROR_ARCHIVE_METADATA def test_invalid_config(capsys): @@ -492,13 +464,12 @@ def test_valid_encryption_config(capsys): metadata = os.path.join(temp_folder, Metadata.METADATA_FILE) with open(metadata, "w") as f: f.write( - '{"device": "disk", "encryption_method": "luks", "encryption_key": "hunter1"}' + '{"device": "disk", "encryption_key": "hunter1"}' ) config = Metadata(temp_folder).validate() assert config.encryption_key == "hunter1" - assert config.encryption_method == "luks" @mock.patch("json.loads", side_effect=json.decoder.JSONDecodeError("ugh", "badjson", 0)) From 4fdfa496e5d2cefe795a7174fcab3e5b5af84354 Mon Sep 17 00:00:00 2001 From: Ro Date: Thu, 30 Nov 2023 13:13:19 -0500 Subject: [PATCH 10/14] Update README --- README.md | 37 +++++++++++++++++-------------------- 1 file changed, 17 insertions(+), 20 deletions(-) diff --git a/README.md b/README.md index 241f97b..31cefe8 100644 --- a/README.md +++ b/README.md @@ -64,9 +64,6 @@ Metadata contains three possible keys which may contain several possible values: `device` : specifies the method used for export, and can be either a device or a preflight check. See the Devices section below for possible values. It is a required key. -`encryption_method` -: used exclusively when exporting to USB storage. It is an optional key. Possible values are: -luks `encryption_passphrase` : used exclusively when exporting to USB storage. It is an optional key. It contains an arbitrary string that contains the disk encryption passphrase of the device. @@ -76,7 +73,6 @@ Example archive metadata (`metadata.json`): ``` { "device": "disk" - "encryption-method": "luks" "encryption-key": "Your encryption passphrase goes here" } ``` @@ -94,34 +90,34 @@ For all device types (described in detail below), the following standard error t The supported device types for export are as follows, including the possible errors specific to that device type: -1. `usb-test` : Preflight check that probes for USB connected devices, that returns: - - `USB_CONNECTED` if a USB device is attached to the dedicated slot - - `USB_NOT_CONNECTED` if no USB is attached - - `USB_CHECK_ERROR` if an error occurred during pre-flight +1. `disk-test` : Preflight check that probes for USB connected devices, that returns: + - `DEVICE_WRITABLE` if a supported USB device is attached and unlocked + - `DEVICE_LOCKED` if a supported drive is inserted but locked (a LUKS drive, since locked Veracrypt detection is not supported) + - `NO_DEVICE_DETECTED`, `MULTI_DEVICE_DETECTED`: wrong number of inserted USB drives + - `INVALID_DEVICE_DETECTED`: Wrong number of partitions, unsupported encryption scheme, etc + - `UNKNOWN_DEVICE_DETECTED`: (Future use) this is what a locked drive that could be Veracrypt would return + - `DEVICE_ERROR`: A problem was encountered and device state cannot be reported. -2. `disk-test`: Preflight check that checks for LUKS-encrypted volume that returns: - - `USB_ENCRYPTED` if a LUKS volume is attached to sd-devices - - `USB_ENCRYPTION_NOT_SUPPORTED` if a LUKS volume is not attached or there was any other error - - `USB_DISK_ERROR` - -3. `printer-test`: prints a test page that returns: +2. `printer-test`: prints a test page that returns: - `ERROR_PRINTER_NOT_FOUND` if no printer is connected - `ERROR_PRINTER_NOT_SUPPORTED` if the printer is not currently supported by the export script - `ERROR_PRINTER_DRIVER_UNAVAILABLE` if the printer driver is not available - `ERROR_PRINTER_INSTALL` If there is an error installing the printer - `ERROR_PRINT` if there is an error printing -4. `printer`: sends files to printer that returns: +3. `printer`: sends files to printer that returns: - `ERROR_PRINTER_NOT_FOUND` if no printer is connected - `ERROR_PRINTER_NOT_SUPPORTED` if the printer is not currently supported by the export script - `ERROR_PRINTER_DRIVER_UNAVAILABLE` if the printer driver is not available - `ERROR_PRINTER_INSTALL` If there is an error installing the printer - `ERROR_PRINT` if there is an error printing -5. `disk`: sends files to disk that returns: - - `USB_BAD_PASSPHRASE` if the luks decryption failed (likely due to bad passphrase) - - `ERROR_USB_MOUNT` if there was an error mounting the volume (after unlocking the luks volume) - - `ERROR_USB_WRITE` if there was an error writing to disk (e.g., no space left on device) +4. `disk`: sends files to disk that returns: + - `SUCCESS_EXPORT`: Successful + - `ERROR_CLEANUP`: Export was successful but files could not be cleaned up or drive was not properly unmounted + - `ERROR_UNLOCK_LUKS` if the luks decryption failed (likely due to bad passphrase) + - `ERROR_MOUNT` if there was an error mounting the volume (after unlocking the luks volume) + - `ERROR_WRITE` if there was an error writing to disk (e.g., no space left on device) ### Export Folder Structure @@ -132,7 +128,8 @@ When exporting to a USB drive, the files will be placed on the drive as follows: └── sd-export-20200116-003153 └── export_data - └── secret_memo.pdf + └── transcript.txt + └── secret_memo.pdf ``` To support multiple files, in the long term, we are planning to use a folder structure similar to the following, where the journalist designation for a source is used for folder names and message/reply file names. From 548c2982ff78a84c087ff7c307971e18bec85b6a Mon Sep 17 00:00:00 2001 From: Kunal Mehta Date: Tue, 5 Dec 2023 13:14:48 -0500 Subject: [PATCH 11/14] Apply black formatting --- securedrop_export/archive.py | 6 +----- tests/test_archive.py | 4 +--- 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/securedrop_export/archive.py b/securedrop_export/archive.py index fb39abf..8e4ac7f 100755 --- a/securedrop_export/archive.py +++ b/securedrop_export/archive.py @@ -38,11 +38,7 @@ def validate(self) -> "Metadata": json_config = json.loads(f.read()) self.export_method = json_config.get("device", None) self.encryption_key = json_config.get("encryption_key", None) - logger.info( - "Command: {}".format( - self.export_method - ) - ) + logger.info("Command: {}".format(self.export_method)) except Exception as ex: logger.error("Metadata parsing failure") diff --git a/tests/test_archive.py b/tests/test_archive.py index f8c4c65..7c09b83 100644 --- a/tests/test_archive.py +++ b/tests/test_archive.py @@ -463,9 +463,7 @@ def test_valid_encryption_config(capsys): temp_folder = tempfile.mkdtemp() metadata = os.path.join(temp_folder, Metadata.METADATA_FILE) with open(metadata, "w") as f: - f.write( - '{"device": "disk", "encryption_key": "hunter1"}' - ) + f.write('{"device": "disk", "encryption_key": "hunter1"}') config = Metadata(temp_folder).validate() From 4ccbfefb22d69f6483a7cb1ccb3c22348e903560 Mon Sep 17 00:00:00 2001 From: Kunal Mehta Date: Tue, 5 Dec 2023 13:17:56 -0500 Subject: [PATCH 12/14] Handle `udisksctl mount` not returning excepted output mypy noted a theoretical case in which `udisksctl mount` exits with a 0 status code but doesn't return our expected output. In this case just raise an exception as we couldn't figure out the mount point. While we're at it, add a test case to verify the mountpoint is parsed correctly. --- securedrop_export/disk/cli.py | 3 +++ tests/disk/test_cli.py | 10 +++++++--- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/securedrop_export/disk/cli.py b/securedrop_export/disk/cli.py index aa2c58f..bdf2141 100644 --- a/securedrop_export/disk/cli.py +++ b/securedrop_export/disk/cli.py @@ -481,6 +481,9 @@ def mount_volume(self, volume: Volume) -> MountedVolume: # Success is "Mounted $device at $path" if output.startswith("Mounted "): mountpoint = output.split()[-1] + else: + # it didn't successfully mount, but also exited with code 0? + raise ExportException(sdstatus=Status.ERROR_MOUNT) return MountedVolume.from_volume(volume, mountpoint) diff --git a/tests/disk/test_cli.py b/tests/disk/test_cli.py index 761beb7..c9b333e 100644 --- a/tests/disk/test_cli.py +++ b/tests/disk/test_cli.py @@ -309,17 +309,21 @@ def test_mount_volume_already_mounted( assert isinstance(result, MountedVolume) @mock.patch("os.path.exists", return_value=True) - @mock.patch("subprocess.check_output", return_value=b"\n") + @mock.patch("subprocess.check_output", return_value=b"Mounted $device at $path") @mock.patch("subprocess.check_call", return_value=0) - def test_mount_volume_mkdir(self, mocked_output, mocked_subprocess, mocked_path): + @mock.patch("securedrop_export.disk.cli.CLI._get_mountpoint", return_value=None) + def test_mount_volume_mkdir( + self, mocked_output, mocked_subprocess, mocked_path, mocked_mountpoint + ): md = Volume( device_name=_DEFAULT_USB_DEVICE_ONE_PART, mapped_name=_PRETEND_LUKS_ID, encryption=EncryptionScheme.LUKS, ) mv = self.cli.mount_volume(md) - assert mv.mapped_name == _PRETEND_LUKS_ID assert isinstance(mv, MountedVolume) + assert mv.mapped_name == _PRETEND_LUKS_ID + assert mv.mountpoint == "$path" @mock.patch("subprocess.check_output", return_value=b"\n") @mock.patch( From 15a8fa8f617f229d7154d31bf02e0786f994ba94 Mon Sep 17 00:00:00 2001 From: Kunal Mehta Date: Tue, 5 Dec 2023 13:18:46 -0500 Subject: [PATCH 13/14] Fix type signature of CLI._close_veracrypt_volume() I think this was just a mistake because the volume is already unmounted before the volume is closed. --- securedrop_export/disk/cli.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/securedrop_export/disk/cli.py b/securedrop_export/disk/cli.py index bdf2141..de48814 100644 --- a/securedrop_export/disk/cli.py +++ b/securedrop_export/disk/cli.py @@ -580,7 +580,7 @@ def _close_luks_volume(self, unlocked_device: Volume) -> None: logger.error("Error closing device") raise ExportException(sdstatus=Status.DEVICE_ERROR) from ex - def _close_veracrypt_volume(self, unlocked_device: MountedVolume) -> None: + def _close_veracrypt_volume(self, unlocked_device: Volume) -> None: """ Helper. Close VeraCrypt volume. """ From 3b7a34c56192fd33495ef97737dee9a4c0b72a11 Mon Sep 17 00:00:00 2001 From: Kunal Mehta Date: Tue, 5 Dec 2023 13:29:04 -0500 Subject: [PATCH 14/14] Disable logging when tests are run This should hopefully fix CI and running in containers that don't have an active rsyslog daemon. --- Makefile | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Makefile b/Makefile index b650fcb..c74c365 100644 --- a/Makefile +++ b/Makefile @@ -16,7 +16,8 @@ check-black: ## Check Python source code formatting with black TESTS ?= tests .PHONY: test test: ## Run tests - poetry run pytest -v --cov-report html --cov-report term-missing --cov=securedrop_export $$TESTS + poetry run pytest -v --cov-report html --cov-report term-missing \ + --cov=securedrop_export --log-disable=securedrop_export.main $$TESTS .PHONY: lint lint: ## Run linter