diff --git a/Makefile b/Makefile index b650fcb..c74c365 100644 --- a/Makefile +++ b/Makefile @@ -16,7 +16,8 @@ check-black: ## Check Python source code formatting with black TESTS ?= tests .PHONY: test test: ## Run tests - poetry run pytest -v --cov-report html --cov-report term-missing --cov=securedrop_export $$TESTS + poetry run pytest -v --cov-report html --cov-report term-missing \ + --cov=securedrop_export --log-disable=securedrop_export.main $$TESTS .PHONY: lint lint: ## Run linter diff --git a/README.md b/README.md index 241f97b..31cefe8 100644 --- a/README.md +++ b/README.md @@ -64,9 +64,6 @@ Metadata contains three possible keys which may contain several possible values: `device` : specifies the method used for export, and can be either a device or a preflight check. See the Devices section below for possible values. It is a required key. -`encryption_method` -: used exclusively when exporting to USB storage. It is an optional key. Possible values are: -luks `encryption_passphrase` : used exclusively when exporting to USB storage. It is an optional key. It contains an arbitrary string that contains the disk encryption passphrase of the device. @@ -76,7 +73,6 @@ Example archive metadata (`metadata.json`): ``` { "device": "disk" - "encryption-method": "luks" "encryption-key": "Your encryption passphrase goes here" } ``` @@ -94,34 +90,34 @@ For all device types (described in detail below), the following standard error t The supported device types for export are as follows, including the possible errors specific to that device type: -1. `usb-test` : Preflight check that probes for USB connected devices, that returns: - - `USB_CONNECTED` if a USB device is attached to the dedicated slot - - `USB_NOT_CONNECTED` if no USB is attached - - `USB_CHECK_ERROR` if an error occurred during pre-flight +1. `disk-test` : Preflight check that probes for USB connected devices, that returns: + - `DEVICE_WRITABLE` if a supported USB device is attached and unlocked + - `DEVICE_LOCKED` if a supported drive is inserted but locked (a LUKS drive, since locked Veracrypt detection is not supported) + - `NO_DEVICE_DETECTED`, `MULTI_DEVICE_DETECTED`: wrong number of inserted USB drives + - `INVALID_DEVICE_DETECTED`: Wrong number of partitions, unsupported encryption scheme, etc + - `UNKNOWN_DEVICE_DETECTED`: (Future use) this is what a locked drive that could be Veracrypt would return + - `DEVICE_ERROR`: A problem was encountered and device state cannot be reported. -2. `disk-test`: Preflight check that checks for LUKS-encrypted volume that returns: - - `USB_ENCRYPTED` if a LUKS volume is attached to sd-devices - - `USB_ENCRYPTION_NOT_SUPPORTED` if a LUKS volume is not attached or there was any other error - - `USB_DISK_ERROR` - -3. `printer-test`: prints a test page that returns: +2. `printer-test`: prints a test page that returns: - `ERROR_PRINTER_NOT_FOUND` if no printer is connected - `ERROR_PRINTER_NOT_SUPPORTED` if the printer is not currently supported by the export script - `ERROR_PRINTER_DRIVER_UNAVAILABLE` if the printer driver is not available - `ERROR_PRINTER_INSTALL` If there is an error installing the printer - `ERROR_PRINT` if there is an error printing -4. `printer`: sends files to printer that returns: +3. `printer`: sends files to printer that returns: - `ERROR_PRINTER_NOT_FOUND` if no printer is connected - `ERROR_PRINTER_NOT_SUPPORTED` if the printer is not currently supported by the export script - `ERROR_PRINTER_DRIVER_UNAVAILABLE` if the printer driver is not available - `ERROR_PRINTER_INSTALL` If there is an error installing the printer - `ERROR_PRINT` if there is an error printing -5. `disk`: sends files to disk that returns: - - `USB_BAD_PASSPHRASE` if the luks decryption failed (likely due to bad passphrase) - - `ERROR_USB_MOUNT` if there was an error mounting the volume (after unlocking the luks volume) - - `ERROR_USB_WRITE` if there was an error writing to disk (e.g., no space left on device) +4. `disk`: sends files to disk that returns: + - `SUCCESS_EXPORT`: Successful + - `ERROR_CLEANUP`: Export was successful but files could not be cleaned up or drive was not properly unmounted + - `ERROR_UNLOCK_LUKS` if the luks decryption failed (likely due to bad passphrase) + - `ERROR_MOUNT` if there was an error mounting the volume (after unlocking the luks volume) + - `ERROR_WRITE` if there was an error writing to disk (e.g., no space left on device) ### Export Folder Structure @@ -132,7 +128,8 @@ When exporting to a USB drive, the files will be placed on the drive as follows: └── sd-export-20200116-003153 └── export_data - └── secret_memo.pdf + └── transcript.txt + └── secret_memo.pdf ``` To support multiple files, in the long term, we are planning to use a folder structure similar to the following, where the journalist designation for a source is used for folder names and message/reply file names. diff --git a/securedrop_export/archive.py b/securedrop_export/archive.py index ed81082..8e4ac7f 100755 --- a/securedrop_export/archive.py +++ b/securedrop_export/archive.py @@ -26,7 +26,6 @@ class Metadata(object): """ METADATA_FILE = "metadata.json" - SUPPORTED_ENCRYPTION_METHODS = ["luks"] def __init__(self, archive_path: str): self.metadata_path = os.path.join(archive_path, self.METADATA_FILE) @@ -38,13 +37,8 @@ def validate(self) -> "Metadata": logger.info("Parsing archive metadata") json_config = json.loads(f.read()) self.export_method = json_config.get("device", None) - self.encryption_method = json_config.get("encryption_method", None) self.encryption_key = json_config.get("encryption_key", None) - logger.info( - "Target: {}, encryption_method {}".format( - self.export_method, self.encryption_method - ) - ) + logger.info("Command: {}".format(self.export_method)) except Exception as ex: logger.error("Metadata parsing failure") @@ -54,12 +48,6 @@ def validate(self) -> "Metadata": try: logger.debug("Validate export action") self.command = Command(self.export_method) - if ( - self.command is Command.EXPORT - and self.encryption_method not in self.SUPPORTED_ENCRYPTION_METHODS - ): - logger.error("Unsupported encryption method") - raise ExportException(sdstatus=Status.ERROR_ARCHIVE_METADATA) except ValueError as v: raise ExportException(sdstatus=Status.ERROR_ARCHIVE_METADATA) from v @@ -95,7 +83,5 @@ def set_metadata(self, metadata: Metadata) -> "Archive": """ self.command = metadata.command if self.command is Command.EXPORT: - # When we support multiple encryption types, we will also want to add the - # encryption_method here self.encryption_key = metadata.encryption_key return self diff --git a/securedrop_export/disk/__init__.py b/securedrop_export/disk/__init__.py index e610945..760c6e0 100644 --- a/securedrop_export/disk/__init__.py +++ b/securedrop_export/disk/__init__.py @@ -1,2 +1,2 @@ -from .legacy_service import Service as LegacyService # noqa: F401 -from .legacy_status import Status as LegacyStatus # noqa: F401 +from .service import Service # noqa: F401 +from .status import Status # noqa: F401 diff --git a/securedrop_export/disk/cli.py b/securedrop_export/disk/cli.py index abdc0c1..de48814 100644 --- a/securedrop_export/disk/cli.py +++ b/securedrop_export/disk/cli.py @@ -11,6 +11,9 @@ logger = logging.getLogger(__name__) +# Entries in /dev/mapper on sd-devices +_DEVMAPPER_SYSTEM = ["control", "dmroot"] + class CLI: """ @@ -21,10 +24,9 @@ class CLI: sys.exit(0) so that another program does not attempt to open the submission. """ - # Default mountpoint (unless drive is already mounted manually by the user) - _DEFAULT_MOUNTPOINT = "/media/usb" + _DEFAULT_VC_CONTAINER_NAME = "vc-volume" - def get_connected_devices(self) -> List[str]: + def _get_connected_devices(self) -> List[str]: """ List all block devices attached to VM that are disks and not partitions. Return list of all removable connected block devices. @@ -67,7 +69,6 @@ def _get_removable_devices(self, attached_devices: List[str]) -> List[str]: ["cat", f"/sys/class/block/{device}/removable"], stderr=subprocess.PIPE, ) - # removable is "0" for non-removable device, "1" for removable, # convert that into a Python boolean is_removable = bool(int(removable.decode("utf8").strip())) @@ -82,7 +83,53 @@ def _get_removable_devices(self, attached_devices: List[str]) -> List[str]: logger.info(f"{len(usb_devices)} connected") return usb_devices - def get_partitioned_device(self, blkid: str) -> str: + def get_all_volumes(self) -> List[Volume]: + """ + Returns a list of all currently-attached removable Volumes that are + export device candidates, attempting to get as far towards export process + as possible (i.e. probing if device is already unlocked and/or mounted, + and mounting it if unlocked but unmounted.) + + Caller must handle ExportException. + """ + volumes = [] + + removable_devices = self._get_connected_devices() + + try: + for item in removable_devices: + blkid = self._get_partitioned_device(item) + if self.is_luks_volume(blkid): + logger.debug("LUKS volume detected. Checking if unlocked.") + volumes.append(self._get_luks_volume(blkid)) + else: + try: + logger.debug( + "Not a LUKS volume. Checking if unlocked VeraCrypt." + ) + volumes.append( + self._attempt_get_unlocked_veracrypt_volume(blkid) + ) + except ExportException: + logger.info("Device is not an unlocked Veracrypt drive.") + volumes.append( + Volume( + device_name=blkid, + encryption=EncryptionScheme.UNKNOWN, + # This will be the name we use if + # trying to unlock the drive. + mapped_name=self._DEFAULT_VC_CONTAINER_NAME, + ) + ) + + return volumes + + except ExportException as ex: + logger.error(f"get_all_volumes failed: {ex.sdstatus.value}") + logger.debug(ex) + raise + + def _get_partitioned_device(self, blkid: str) -> str: """ Given a string representing a block device, return string that includes correct partition (such as "/dev/sda" or "/dev/sda1"). @@ -178,15 +225,15 @@ def _get_luks_name_from_headers(self, device: str) -> str: logger.error("Failed to dump LUKS header") raise ExportException(sdstatus=Status.DEVICE_ERROR) from ex - def get_luks_volume(self, device: str) -> Union[Volume, MountedVolume]: + def _get_luks_volume(self, device: str) -> Union[Volume, MountedVolume]: """ Given a string corresponding to a LUKS-partitioned volume, return a corresponding Volume object. If LUKS volume is already mounted, existing mountpoint will be preserved and a MountedVolume object will be returned. - If LUKS volume is unlocked but not mounted, volume will be mounted at _DEFAULT_MOUNTPOINT, - and a MountedVolume object will be returned. + If LUKS volume is unlocked but not mounted, volume will be mounted and a MountedVolume + object will be returned. If device is still locked, mountpoint will not be set, and a Volume object will be retuned. Once the decrpytion passphrase is available, call unlock_luks_volume(), passing the Volume @@ -248,11 +295,8 @@ def unlock_luks_volume(self, volume: Volume, decryption_key: str) -> Volume: rc = p.returncode if rc == 0: - return Volume( - device_name=volume.device_name, - mapped_name=volume.mapped_name, - encryption=EncryptionScheme.LUKS, - ) + logger.debug("Successfully unlocked.") + return volume else: logger.error("Bad volume passphrase") raise ExportException(sdstatus=Status.ERROR_UNLOCK_LUKS) @@ -260,6 +304,138 @@ def unlock_luks_volume(self, volume: Volume, decryption_key: str) -> Volume: except subprocess.CalledProcessError as ex: raise ExportException(sdstatus=Status.DEVICE_ERROR) from ex + def _get_dev_mapper_entries(self) -> List[str]: + """ + Helper function to return a list of entries in /dev/mapper/ + (excluding `system` and `dmroot`). + """ + try: + ls = subprocess.check_output(["ls", "/dev/mapper/"], stderr=subprocess.PIPE) + entries = ls.decode("utf-8").rstrip().split("\n") + + return [r for r in entries if r not in _DEVMAPPER_SYSTEM] + + except (subprocess.CalledProcessError, ValueError) as ex: + logger.error(f"Error checking entries in /dev/mapper: {ex}") + raise ExportException(sdstatus=Status.DEVICE_ERROR) from ex + + def _attempt_get_unlocked_veracrypt_volume(self, device_name: str) -> MountedVolume: + """ + Looks for an already-unlocked volume in /dev/mapper to see if the name matches + given device name. + Returns MountedVolume object if a drive is found. Otherwise, raises ExportException. + """ + try: + devmapper_entries = self._get_dev_mapper_entries() + for item in devmapper_entries: + # Check it out with cryptsetup, see if it's a VeraCrypt/TrueCrypt drive. + # Example format (some lines ommitted for brevity): + # + # b'/dev/mapper/vc is active and is in use.\n type: TCRYPT\n cipher: + # aes-xts-plain64\n keysize: 512 bits\n key location: dm-crypt\n device: + # /dev/sdc\n sector size: 512\noffset: 256 sectors\n size: + # 1968640 sectors\n skipped: 256 sectors\n mode: read/write\n' + # + # (A mapped entry can also have a null device, if it wasn't properly removed + # from /dev/mapper using `cryptsetup close`.) + status = ( + subprocess.check_output( + ["sudo", "cryptsetup", "status", f"/dev/mapper/{item}"] + ) + .decode("utf-8") + .split("\n ") + ) + + logger.debug(f"{status}") + + if "type: TCRYPT" in status and f"device: {device_name}" in status: + logger.info("Unlocked VeraCrypt volume detected") + volume = Volume( + device_name=device_name, + mapped_name=item, + encryption=EncryptionScheme.VERACRYPT, + ) + + # Is it mounted? + mountpoint = ( + subprocess.check_output( + [ + "lsblk", + f"/dev/mapper/{item}", + "--noheadings", + "-o", + "MOUNTPOINT", + ] + ) + .decode() + .strip() + ) + if mountpoint: + # Note: Here we're accepting the user's choice of how they + # have mounted the drive, including whatever permissions/ + # options they have set. + logger.info(f"Drive is already mounted at {mountpoint}") + return MountedVolume.from_volume(volume, mountpoint) + else: + logger.info("Drive is not mounted; mounting") + return self.mount_volume(volume) + + else: + # Somehow it didn't work. + logger.error(f"Did not parse veracrypt drive from: {status}") + + # If we got here, there is no unlocked VC drive present. Not an error, but not + # a state we can continue the workflow in, so raise ExportException. + logger.info("No unlocked Veracrypt drive found.") + raise ExportException(sdstatus=Status.UNKNOWN_DEVICE_DETECTED) + + except subprocess.CalledProcessError as ex: + logger.error("Encountered exception while checking /dev/mapper entries") + logger.debug(ex) + raise ExportException(sdstatus=Status.DEVICE_ERROR) from ex + + def attempt_unlock_veracrypt( + self, volume: Volume, encryption_key: str + ) -> MountedVolume: + """ + Attempt to unlock and mount a presumed-Veracrypt drive at the default mountpoint. + """ + try: + p = subprocess.Popen( + [ + "sudo", + "cryptsetup", + "open", + "--type", + "tcrypt", + "--veracrypt", + f"{volume.device_name}", + f"{self._DEFAULT_VC_CONTAINER_NAME}", + ], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + p.communicate(input=str.encode(encryption_key, "utf-8")) + rc = p.returncode + + if rc == 0: + volume.encryption = EncryptionScheme.VERACRYPT + + # Mapped name is /dev/mapper/${self._DEFAULT_VC_CONTAINER_NAME}, since + # the /dev/mapper entry isn't derived from the header like a LUKS drive + volume.mapped_name = self._DEFAULT_VC_CONTAINER_NAME + return self.mount_volume(volume) + + else: + # Something was wrong and we could not unlock. + logger.error("Unlocking failed. Bad passphrase, or unsuitable volume.") + raise ExportException(sdstatus=Status.ERROR_UNLOCK_GENERIC) + + except subprocess.CalledProcessError as error: + logger.error("Error during unlock/mount attempt.") + logger.debug(error) + raise ExportException(sdstatus=Status.ERROR_UNLOCK_GENERIC) + def _get_mountpoint(self, volume: Volume) -> Optional[str]: """ Check for existing mountpoint. @@ -281,7 +457,7 @@ def mount_volume(self, volume: Volume) -> MountedVolume: Given an unlocked LUKS volume, return MountedVolume object. If volume is already mounted, mountpoint is not changed. Otherwise, - volume is mounted at _DEFAULT_MOUNTPOINT. + volume is mounted inside /media/user/ by udisksctl. Raise ExportException if errors are encountered during mounting. """ @@ -296,40 +472,25 @@ def mount_volume(self, volume: Volume) -> MountedVolume: return MountedVolume.from_volume(volume, mountpoint) else: - logger.info("Mount volume at default mountpoint") - return self._mount_at_mountpoint(volume, self._DEFAULT_MOUNTPOINT) + try: + logger.info("Mount volume in /media/user using udisksctl") + output = subprocess.check_output( + ["udisksctl", "mount", "-b", f"/dev/mapper/{volume.mapped_name}"] + ).decode("utf-8") - def _mount_at_mountpoint(self, volume: Volume, mountpoint: str) -> MountedVolume: - """ - Mount a volume at the supplied mountpoint, creating the mountpoint directory and - adjusting permissions (user:user) if need be. `mountpoint` must be a full path. + # Success is "Mounted $device at $path" + if output.startswith("Mounted "): + mountpoint = output.split()[-1] + else: + # it didn't successfully mount, but also exited with code 0? + raise ExportException(sdstatus=Status.ERROR_MOUNT) + + return MountedVolume.from_volume(volume, mountpoint) - Return MountedVolume object. - Raise ExportException if unable to mount volume at target mountpoint. - """ - if not os.path.exists(mountpoint): - try: - subprocess.check_call(["sudo", "mkdir", mountpoint]) except subprocess.CalledProcessError as ex: logger.error(ex) raise ExportException(sdstatus=Status.ERROR_MOUNT) from ex - # Mount device /dev/mapper/{mapped_name} at /media/usb/ - mapped_device_path = os.path.join( - volume.MAPPED_VOLUME_PREFIX, volume.mapped_name - ) - - try: - logger.info(f"Mounting volume at {mountpoint}") - subprocess.check_call(["sudo", "mount", mapped_device_path, mountpoint]) - subprocess.check_call(["sudo", "chown", "-R", "user:user", mountpoint]) - - except subprocess.CalledProcessError as ex: - logger.error(ex) - raise ExportException(sdstatus=Status.ERROR_MOUNT) from ex - - return MountedVolume.from_volume(volume, mountpoint) - def write_data_to_device( self, submission_tmpdir: str, @@ -373,8 +534,10 @@ def cleanup_drive_and_tmpdir(self, volume: MountedVolume, submission_tmpdir: str try: subprocess.check_call(["sync"]) umounted = self._unmount_volume(volume) - if umounted: + if umounted.encryption is EncryptionScheme.LUKS: self._close_luks_volume(umounted) + elif umounted.encryption is EncryptionScheme.VERACRYPT: + self._close_veracrypt_volume(umounted) self._remove_temp_directory(submission_tmpdir) except subprocess.CalledProcessError as ex: @@ -388,7 +551,7 @@ def _unmount_volume(self, volume: MountedVolume) -> Volume: if os.path.exists(volume.mountpoint): logger.debug(f"Unmounting drive from {volume.mountpoint}") try: - subprocess.check_call(["sudo", "umount", volume.mountpoint]) + subprocess.check_call(["udisksctl", "unmount", volume.mountpoint]) except subprocess.CalledProcessError as ex: logger.error("Error unmounting device") @@ -417,6 +580,21 @@ def _close_luks_volume(self, unlocked_device: Volume) -> None: logger.error("Error closing device") raise ExportException(sdstatus=Status.DEVICE_ERROR) from ex + def _close_veracrypt_volume(self, unlocked_device: Volume) -> None: + """ + Helper. Close VeraCrypt volume. + """ + if os.path.exists(os.path.join("/dev/mapper", unlocked_device.mapped_name)): + logger.debug("Locking luks volume {}".format(unlocked_device)) + try: + subprocess.check_call( + ["sudo", "cryptsetup", "close", unlocked_device.mapped_name] + ) + + except subprocess.CalledProcessError as ex: + logger.error("Error closing device") + raise ExportException(sdstatus=Status.DEVICE_ERROR) from ex + def _remove_temp_directory(self, tmpdir: str): """ Helper. Remove temporary directory used during archive export. diff --git a/securedrop_export/disk/legacy_service.py b/securedrop_export/disk/legacy_service.py deleted file mode 100644 index 3dbe6ac..0000000 --- a/securedrop_export/disk/legacy_service.py +++ /dev/null @@ -1,156 +0,0 @@ -import logging - -from securedrop_export.exceptions import ExportException - -from .cli import CLI -from .legacy_status import Status as LegacyStatus -from .status import Status as Status -from .volume import MountedVolume - -logger = logging.getLogger(__name__) - - -class Service: - def __init__(self, submission, cli=None): - self.submission = submission - self.cli = cli or CLI() - - def check_connected_devices(self) -> LegacyStatus: - """ - Check if single USB is inserted. - """ - logger.info("Export archive is usb-test") - - try: - all_devices = self.cli.get_connected_devices() - num_devices = len(all_devices) - - except ExportException as ex: - logger.error(f"Error encountered during USB check: {ex.sdstatus.value}") - # Use legacy status instead of new status values - raise ExportException(sdstatus=LegacyStatus.LEGACY_ERROR_USB_CHECK) from ex - - if num_devices == 0: - raise ExportException(sdstatus=LegacyStatus.LEGACY_USB_NOT_CONNECTED) - elif num_devices == 1: - return LegacyStatus.LEGACY_USB_CONNECTED - elif num_devices > 1: - raise ExportException( - sdstatus=LegacyStatus.LEGACY_USB_ENCRYPTION_NOT_SUPPORTED - ) - else: - # Unreachable, num_devices is a non-negative integer, - # and we handled all possible cases already - raise ValueError(f"unreachable: num_devices is negative: {num_devices}") - - def check_disk_format(self) -> LegacyStatus: - """ - Check if volume is correctly formatted for export. - """ - try: - all_devices = self.cli.get_connected_devices() - - if len(all_devices) == 1: - device = self.cli.get_partitioned_device(all_devices[0]) - logger.info("Check if LUKS") - if not self.cli.is_luks_volume(device): - raise ExportException( - sdstatus=LegacyStatus.LEGACY_USB_ENCRYPTION_NOT_SUPPORTED - ) - # We can support checking if a drive is already unlocked, but for - # backwards compatibility, this is the only expected status - # at this stage - return LegacyStatus.LEGACY_USB_ENCRYPTED - else: - logger.error("Multiple partitions not supported") - return LegacyStatus.LEGACY_USB_ENCRYPTION_NOT_SUPPORTED - - except ExportException as ex: - logger.error( - f"Error encountered during disk format check: {ex.sdstatus.value}" - ) - # Return legacy status values for now for ongoing client compatibility - if ex.sdstatus in [s for s in Status]: - status = self._legacy_status(ex.sdstatus) - raise ExportException(sdstatus=status) - elif ex.sdstatus: - raise - else: - raise ExportException(sdstatus=LegacyStatus.LEGACY_USB_DISK_ERROR) - - def export(self): - """ - Export all files to target device. - """ - logger.info("Export archive is disk") - - try: - all_devices = self.cli.get_connected_devices() - - if len(all_devices) == 1: - device = self.cli.get_partitioned_device(all_devices[0]) - - # Decide what kind of volume it is - logger.info("Check if LUKS") - if self.cli.is_luks_volume(device): - volume = self.cli.get_luks_volume(device) - logger.info("Check if writable") - if not isinstance(volume, MountedVolume): - logger.info("Not writable-will try unlocking") - volume = self.cli.unlock_luks_volume( - volume, self.submission.encryption_key - ) - mounted_volume = self.cli.mount_volume(volume) - - logger.info(f"Export submission to {mounted_volume.mountpoint}") - self.cli.write_data_to_device( - self.submission.tmpdir, - self.submission.target_dirname, - mounted_volume, - ) - # This is SUCCESS_EXPORT, but the 0.7.0 client is not expecting - # a return status from a successful export operation. - # When the client is updated, we will return SUCCESS_EXPORT here. - - else: - # Another kind of drive: VeraCrypt/TC, or unsupported. - # For now this is an error--in future there will be support - # for additional encryption formats - logger.error(f"Export failed because {device} is not supported") - raise ExportException( - sdstatus=LegacyStatus.LEGACY_USB_ENCRYPTION_NOT_SUPPORTED - ) - - except ExportException as ex: - logger.error( - f"Error encountered during disk format check: {ex.sdstatus.value}" - ) - # Return legacy status values for now for ongoing client compatibility - if ex.sdstatus in [s for s in Status]: - status = self._legacy_status(ex.sdstatus) - raise ExportException(sdstatus=status) - elif ex.sdstatus: - raise - else: - raise ExportException(sdstatus=LegacyStatus.LEGACY_ERROR_GENERIC) - - def _legacy_status(self, status: Status) -> LegacyStatus: - """ - Backwards-compatibility - status values that client (@0.7.0) is expecting. - """ - logger.info(f"Convert to legacy: {status.value}") - if status is Status.ERROR_MOUNT: - return LegacyStatus.LEGACY_ERROR_USB_MOUNT - elif status in [Status.ERROR_EXPORT, Status.ERROR_EXPORT_CLEANUP]: - return LegacyStatus.LEGACY_ERROR_USB_WRITE - elif status in [Status.ERROR_UNLOCK_LUKS, Status.ERROR_UNLOCK_GENERIC]: - return LegacyStatus.LEGACY_USB_BAD_PASSPHRASE - elif status in [ - Status.INVALID_DEVICE_DETECTED, - Status.MULTI_DEVICE_DETECTED, - ]: - return LegacyStatus.LEGACY_USB_ENCRYPTION_NOT_SUPPORTED - # The other status values, such as Status.NO_DEVICE_DETECTED, are not returned by the - # CLI, so we don't need to check for them here - else: - return LegacyStatus.LEGACY_ERROR_GENERIC diff --git a/securedrop_export/disk/legacy_status.py b/securedrop_export/disk/legacy_status.py deleted file mode 100644 index 77f0fa6..0000000 --- a/securedrop_export/disk/legacy_status.py +++ /dev/null @@ -1,25 +0,0 @@ -from securedrop_export.status import BaseStatus - - -class Status(BaseStatus): - LEGACY_ERROR_GENERIC = "ERROR_GENERIC" - - # Legacy USB preflight related - LEGACY_USB_CONNECTED = "USB_CONNECTED" # Success - LEGACY_USB_NOT_CONNECTED = "USB_NOT_CONNECTED" - LEGACY_ERROR_USB_CHECK = "ERROR_USB_CHECK" - - # Legacy USB Disk preflight related errors - LEGACY_USB_ENCRYPTED = "USB_ENCRYPTED" # Success - LEGACY_USB_ENCRYPTION_NOT_SUPPORTED = "USB_ENCRYPTION_NOT_SUPPORTED" - - # Can be raised during disk format check - LEGACY_USB_DISK_ERROR = "USB_DISK_ERROR" - - # Legacy Disk export errors - LEGACY_USB_BAD_PASSPHRASE = "USB_BAD_PASSPHRASE" - LEGACY_ERROR_USB_MOUNT = "ERROR_USB_MOUNT" - LEGACY_ERROR_USB_WRITE = "ERROR_USB_WRITE" - - # New - SUCCESS_EXPORT = "SUCCESS_EXPORT" diff --git a/securedrop_export/disk/service.py b/securedrop_export/disk/service.py index 1db9a83..ced2447 100644 --- a/securedrop_export/disk/service.py +++ b/securedrop_export/disk/service.py @@ -1,11 +1,11 @@ import logging -from securedrop_export.archive import Archive - from .cli import CLI from .status import Status -from .volume import Volume, MountedVolume +from .volume import Volume, MountedVolume, EncryptionScheme +from securedrop_export.archive import Archive from securedrop_export.exceptions import ExportException +from typing import List, Optional, Tuple logger = logging.getLogger(__name__) @@ -13,12 +13,13 @@ class Service: """ - Checks that can be performed against the device(s). + Actions that can be performed against USB device(s). This is the "API" portion of the export workflow. """ - def __init__(self, cli: CLI): + def __init__(self, submission: Archive, cli: CLI = CLI()): self.cli = cli + self.submission = submission def scan_all_devices(self) -> Status: """ @@ -26,89 +27,141 @@ def scan_all_devices(self) -> Status: status. """ try: - all_devices = self.cli.get_connected_devices() - number_devices = len(all_devices) - - if number_devices == 0: - return Status.NO_DEVICE_DETECTED - elif number_devices > 1: - return Status.MULTI_DEVICE_DETECTED + volumes = self.cli.get_all_volumes() + if len(volumes) == 0: + status = Status.NO_DEVICE_DETECTED + elif len(volumes) > 1: + status = Status.MULTI_DEVICE_DETECTED else: - return self.scan_single_device(all_devices[0]) + status, _ = self._check_volumes(volumes) + return status except ExportException as ex: logger.error(ex) - return Status.DEVICE_ERROR # Could not assess devices + return Status.DEVICE_ERROR - def scan_single_device(self, blkid: str) -> Status: + def export(self) -> Status: """ - Given a string representing a single block device, see if it - is a suitable export target and return information about its state. + Export material to USB drive. """ try: - target = self.cli.get_partitioned_device(blkid) - - # See if it's a LUKS drive - if self.cli.is_luks_volume(target): - # Returns Volume or throws ExportException - self.volume = self.cli.get_luks_volume(target) - - # See if it's unlocked and mounted - if isinstance(self.volume, MountedVolume): - logger.debug("LUKS device is already mounted") - return Status.DEVICE_WRITABLE + volumes = self.cli.get_all_volumes() + status, target = self._check_volumes(volumes) + + if not target: + logger.error(f"Could not export, no available volumes ({status.value})") + return status + + # If it's writable, it's a MountedVolume object + if status == Status.DEVICE_WRITABLE and isinstance(target, MountedVolume): + return self._write_to_device(target, self.submission) + elif status == Status.DEVICE_LOCKED: + status, unlocked_volume = self._unlock_device( + self.submission.encryption_key, target + ) + if status == Status.DEVICE_WRITABLE and isinstance( + target, MountedVolume + ): + return self._write_to_device(target, self.submission) else: - # Prompt for passphrase - return Status.DEVICE_LOCKED + return status else: - # Might be VeraCrypt, might be madness - logger.info("LUKS drive not found") - - # Currently we don't support anything other than LUKS. - # In future, we will support TC/VC volumes as well - return Status.INVALID_DEVICE_DETECTED + logger.info(f"Could not export, volume check was {status.value}") + return status except ExportException as ex: - logger.error(ex) - if ex.sdstatus: - return ex.sdstatus + logger.debug(ex) + status = ex.sdstatus if ex.sdstatus is not None else Status.ERROR_EXPORT + logger.error(f"Enountered {status.value} while trying to export") + return status + + def _check_volumes( + self, all_volumes: List[Volume] + ) -> Tuple[Status, Optional[Volume]]: + """ + Check all potentially-compatible export devices (removable, + single-partition USB devices). + """ + number_devices = len(all_volumes) + if number_devices == 0: + return (Status.NO_DEVICE_DETECTED, None) + + # At some point we could consider returning all devices, so + # that the user can select their desired target device, but for + # now, only one attached device is supported. + elif number_devices > 1: + return (Status.MULTI_DEVICE_DETECTED, None) + else: + target_volume = all_volumes[0] + if isinstance(target_volume, MountedVolume): + logger.debug("Device is unlocked and mounted") + return (Status.DEVICE_WRITABLE, target_volume) + elif target_volume.encryption is EncryptionScheme.LUKS: + logger.debug("Device is locked LUKS drive") + return (Status.DEVICE_LOCKED, target_volume) else: - return Status.DEVICE_ERROR - - def unlock_device(self, passphrase: str, volume: Volume) -> Status: + logger.debug("Device status is unknown") + # This could be a locked VeraCrypt drive, or it could be an + # invalid drive (another encryption type). + # The client has to decide whether or not to try to use it + # (i.e. by prompting for passphrase) or to error. + # The simplest implementation will have the client error unless + # it is supplied with an already-unlocked VeraCrypt drive that + # it can use; a more sophisticated implementation might allow for + # a finite number of re-prompts before giving up, in case of + # user error with typing the password, and would return the volume + # (eg to print information about which drive failed). + return (Status.UNKNOWN_DEVICE_DETECTED, target_volume) + + def _unlock_device( + self, passphrase: str, volume: Volume + ) -> Tuple[Status, Optional[Volume]]: """ - Given provided passphrase, unlock target volume. Currently, - LUKS volumes are supported. + Given provided passphrase, unlock target volume. """ if volume: - try: - self.volume = self.cli.unlock_luks_volume(volume, passphrase) - - if isinstance(volume, MountedVolume): - return Status.DEVICE_WRITABLE - else: - return Status.ERROR_UNLOCK_LUKS + if volume.encryption is EncryptionScheme.LUKS: + try: + logger.info("Unlocking LUKS drive") + volume = self.cli.unlock_luks_volume(volume, passphrase) + if volume.unlocked: + logger.debug("Volume unlocked, attempt to mount") + # Returns MountedVolume or errors + return (Status.DEVICE_WRITABLE, self.cli.mount_volume(volume)) + except ExportException as ex: + logger.error(ex) + + return (Status.ERROR_UNLOCK_LUKS, volume) + + # Try to unlock another drive, opportunistically + # hoping it is VeraCrypt/TC. + else: + try: + logger.info( + "Encryption scheme is not LUKS. Attempt VeraCrypt unlock." + ) + volume = self.cli.attempt_unlock_veracrypt(volume, passphrase) + + if isinstance(volume, MountedVolume): + return (Status.DEVICE_WRITABLE, volume) + else: + # Might be VeraCrypt, might be madness + return (Status.ERROR_UNLOCK_GENERIC, volume) + except ExportException as ex: + logger.error(ex) + return (Status.ERROR_UNLOCK_GENERIC, volume) - except ExportException as ex: - logger.error(ex) - return Status.ERROR_UNLOCK_LUKS else: # Trying to unlock devices before having an active device logger.warning("Tried to unlock_device but no current volume detected.") - return Status.NO_DEVICE_DETECTED + return (Status.NO_DEVICE_DETECTED, None) - def write_to_device(self, volume: MountedVolume, data: Archive) -> Status: + def _write_to_device(self, volume: MountedVolume, data: Archive) -> Status: """ Export data to volume. CLI unmounts and locks volume on completion, even if export was unsuccessful. - """ - try: - self.cli.write_data_to_device(data.tmpdir, data.target_dirname, volume) - return Status.SUCCESS_EXPORT - except ExportException as ex: - logger.error(ex) - if ex.sdstatus: - return ex.sdstatus - else: - return Status.ERROR_EXPORT + Calling method should handle ExportException. + """ + self.cli.write_data_to_device(data.tmpdir, data.target_dirname, volume) + return Status.SUCCESS_EXPORT diff --git a/securedrop_export/disk/status.py b/securedrop_export/disk/status.py index 7ce7139..9767c4a 100644 --- a/securedrop_export/disk/status.py +++ b/securedrop_export/disk/status.py @@ -7,10 +7,13 @@ class Status(BaseStatus): "INVALID_DEVICE_DETECTED" # Multi partitioned, not encrypted, etc ) MULTI_DEVICE_DETECTED = "MULTI_DEVICE_DETECTED" # Not currently supported + UNKNOWN_DEVICE_DETECTED = ( + "UNKNOWN_DEVICE_DETECTED" # Badly-formatted USB or VeraCrypt/TC + ) - DEVICE_LOCKED = "DEVICE_LOCKED" # One device detected, and it's locked + DEVICE_LOCKED = "DEVICE_LOCKED" # One valid device detected, and it's locked DEVICE_WRITABLE = ( - "DEVICE_WRITABLE" # One device detected, and it's unlocked (and mounted) + "DEVICE_WRITABLE" # One valid device detected, and it's unlocked (and mounted) ) ERROR_UNLOCK_LUKS = "ERROR_UNLOCK_LUKS" diff --git a/securedrop_export/disk/volume.py b/securedrop_export/disk/volume.py index aae7d93..4a8aa60 100644 --- a/securedrop_export/disk/volume.py +++ b/securedrop_export/disk/volume.py @@ -9,6 +9,7 @@ class EncryptionScheme(Enum): UNKNOWN = 0 LUKS = 1 + VERACRYPT = 2 class Volume: @@ -16,8 +17,8 @@ class Volume: """ A volume on a removable device. - Volumes have a device name ("/dev/sdX"), a mapped name ("/dev/mapper/xxx"), an encryption - scheme, and a mountpoint if they are mounted. + Volumes have a device name ("/dev/sdX"), a mapped name that represents their intended + name ("/dev/mapper/xxx"), an encryption scheme, and a mountpoint. """ def __init__( @@ -45,6 +46,7 @@ def encryption(self, scheme: EncryptionScheme): def unlocked(self) -> bool: return ( self.mapped_name is not None + and self.encryption is not None and self.encryption is not EncryptionScheme.UNKNOWN and os.path.exists( os.path.join(self.MAPPED_VOLUME_PREFIX, self.mapped_name) diff --git a/securedrop_export/main.py b/securedrop_export/main.py index bc55ae1..e62bf00 100755 --- a/securedrop_export/main.py +++ b/securedrop_export/main.py @@ -11,8 +11,7 @@ from securedrop_export.directory import safe_mkdir from securedrop_export.exceptions import ExportException -from securedrop_export.disk import LegacyService as ExportService -from securedrop_export.disk import LegacyStatus +from securedrop_export.disk import Service as ExportService from securedrop_export.print import Service as PrintService from logging.handlers import TimedRotatingFileHandler, SysLogHandler @@ -43,6 +42,8 @@ def entrypoint(): Non-zero exit values will cause the system to try alternative solutions for mimetype handling, which we want to avoid. + + The program is called with the archive name as the first argument. """ status, submission = None, None @@ -54,7 +55,8 @@ def entrypoint(): # Halt if target file is absent if not os.path.exists(data_path): - logger.info("Archive is not found {}.".format(data_path)) + logger.error("Archive not found at provided path.") + logger.debug("Archive missing, path: {}".format(data_path)) status = Status.ERROR_FILE_NOT_FOUND else: @@ -125,7 +127,7 @@ def _configure_logging(): raise ExportException(sdstatus=Status.ERROR_LOGGING) from ex -def _start_service(submission: Archive) -> LegacyStatus: +def _start_service(submission: Archive) -> BaseStatus: """ Start print or export service. """ @@ -140,10 +142,11 @@ def _start_service(submission: Archive) -> LegacyStatus: # Export routines elif submission.command is Command.EXPORT: return ExportService(submission).export() - elif submission.command is Command.CHECK_USBS: - return ExportService(submission).check_connected_devices() - elif submission.command is Command.CHECK_VOLUME: - return ExportService(submission).check_disk_format() + elif ( + submission.command is Command.CHECK_USBS + or submission.command is Command.CHECK_VOLUME + ): + return ExportService(submission).scan_all_devices() # Unreachable raise ExportException( diff --git a/tests/disk/test_cli.py b/tests/disk/test_cli.py index 7989809..c9b333e 100644 --- a/tests/disk/test_cli.py +++ b/tests/disk/test_cli.py @@ -22,6 +22,12 @@ _SAMPLE_OUTPUT_USB = b"/dev/sda" # noqa _SAMPLE_LUKS_HEADER = b"\n\nUUID:\t123456-DEADBEEF" # noqa +_SAMPLE_CRYPTSETUP_STATUS_OUTPUT = b"/dev/mapper/vc is active and is in use.\n type: \ +TCRYPT\n cipher: aes-xts-plain64\n keysize: 512 bits\n key location: dm-crypt\n device: \ +/dev/sdc\n sector size: 512\n mode: read/write\n" +_SAMPLE_MOUNTED_VC_OUTPUT = b"/media/custom_mount" + +_EXAMPLE_MOUNTPOINT = "/media/usb" class TestCli: @@ -63,12 +69,12 @@ def _setup_usb_devices(self, mocker, disks, is_removable): # which matches what would happen if iterating through list of devices mocker.patch("subprocess.check_output", side_effect=is_removable) - def test_get_connected_devices(self, mocker): + def test__get_connected_devices(self, mocker): disks = [b"sda disk\n", b"sdb disk\n"] removable = [b"1\n", b"1\n"] self._setup_usb_devices(mocker, disks, removable) - result = self.cli.get_connected_devices() + result = self.cli._get_connected_devices() assert result[0] == "/dev/sda" and result[1] == "/dev/sdb" @@ -88,34 +94,34 @@ def test_get_removable_devices_none_removable(self, mocker): @mock.patch( "subprocess.Popen", side_effect=subprocess.CalledProcessError(1, "Popen") ) - def test_get_connected_devices_error(self, mocked_subprocess): + def test__get_connected_devices_error(self, mocked_subprocess): with pytest.raises(ExportException): - self.cli.get_connected_devices() + self.cli._get_connected_devices() @mock.patch("subprocess.check_output", return_value=_SAMPLE_OUTPUT_NO_PART) - def test_get_partitioned_device_no_partition(self, mocked_call): + def test__get_partitioned_device_no_partition(self, mocked_call): assert ( - self.cli.get_partitioned_device(_DEFAULT_USB_DEVICE) == _DEFAULT_USB_DEVICE + self.cli._get_partitioned_device(_DEFAULT_USB_DEVICE) == _DEFAULT_USB_DEVICE ) @mock.patch("subprocess.check_output", return_value=_SAMPLE_OUTPUT_ONE_PART) - def test_get_partitioned_device_one_partition(self, mocked_call): + def test__get_partitioned_device_one_partition(self, mocked_call): assert ( - self.cli.get_partitioned_device(_DEFAULT_USB_DEVICE) + self.cli._get_partitioned_device(_DEFAULT_USB_DEVICE) == _DEFAULT_USB_DEVICE + "1" ) @mock.patch("subprocess.check_output", return_value=_SAMPLE_OUTPUT_MULTI_PART) - def test_get_partitioned_device_multi_partition(self, mocked_call): + def test__get_partitioned_device_multi_partition(self, mocked_call): with pytest.raises(ExportException) as ex: - self.cli.get_partitioned_device(_SAMPLE_OUTPUT_MULTI_PART) + self.cli._get_partitioned_device(_SAMPLE_OUTPUT_MULTI_PART) assert ex.value.sdstatus is Status.INVALID_DEVICE_DETECTED @mock.patch("subprocess.check_output", return_value=None) - def test_get_partitioned_device_lsblk_error(self, mocked_subprocess): + def test__get_partitioned_device_lsblk_error(self, mocked_subprocess): with pytest.raises(ExportException) as ex: - self.cli.get_partitioned_device(_SAMPLE_OUTPUT_ONE_PART) + self.cli._get_partitioned_device(_SAMPLE_OUTPUT_ONE_PART) assert ex.value.sdstatus is Status.DEVICE_ERROR @@ -123,10 +129,10 @@ def test_get_partitioned_device_lsblk_error(self, mocked_subprocess): "subprocess.check_output", side_effect=subprocess.CalledProcessError(1, "check_output"), ) - def test_get_partitioned_device_multi_partition_error(self, mocked_call): + def test__get_partitioned_device_multi_partition_error(self, mocked_call): # Make sure we wrap CalledProcessError and throw our own exception with pytest.raises(ExportException) as ex: - self.cli.get_partitioned_device(_DEFAULT_USB_DEVICE) + self.cli._get_partitioned_device(_DEFAULT_USB_DEVICE) assert ex.value.sdstatus is Status.DEVICE_ERROR @@ -186,16 +192,16 @@ def test__get_luks_name_from_headers_error(self, mocked_subprocess): @mock.patch("os.path.exists", return_value=True) @mock.patch("subprocess.check_output", return_value=_SAMPLE_LUKS_HEADER) - def test_get_luks_volume_already_unlocked(self, mocked_subprocess, mocked_os_call): - result = self.cli.get_luks_volume(_DEFAULT_USB_DEVICE_ONE_PART) + def test__get_luks_volume_already_unlocked(self, mocked_subprocess, mocked_os_call): + result = self.cli._get_luks_volume(_DEFAULT_USB_DEVICE_ONE_PART) assert result.encryption is EncryptionScheme.LUKS assert result.unlocked @mock.patch("os.path.exists", return_value=False) @mock.patch("subprocess.check_output", return_value=_SAMPLE_LUKS_HEADER) - def test_get_luks_volume_still_locked(self, mocked_subprocess, mocked_os_call): - result = self.cli.get_luks_volume(_DEFAULT_USB_DEVICE_ONE_PART) + def test__get_luks_volume_still_locked(self, mocked_subprocess, mocked_os_call): + result = self.cli._get_luks_volume(_DEFAULT_USB_DEVICE_ONE_PART) assert result.encryption is EncryptionScheme.LUKS assert not result.unlocked @@ -204,9 +210,9 @@ def test_get_luks_volume_still_locked(self, mocked_subprocess, mocked_os_call): "subprocess.check_output", side_effect=subprocess.CalledProcessError(1, "check_output"), ) - def test_get_luks_volume_error(self, mocked_subprocess): + def test__get_luks_volume_error(self, mocked_subprocess): with pytest.raises(ExportException) as ex: - self.cli.get_luks_volume(_DEFAULT_USB_DEVICE_ONE_PART) + self.cli._get_luks_volume(_DEFAULT_USB_DEVICE_ONE_PART) assert ex.value.sdstatus is Status.DEVICE_ERROR @@ -215,6 +221,7 @@ def test_unlock_luks_volume_success(self, mock_path, mocker): mock_popen = mocker.MagicMock() mock_popen_communicate = mocker.MagicMock() mock_popen.returncode = 0 + mock_popen.return_value = (("stdout", "stderr"), 0) mocker.patch("subprocess.Popen", return_value=mock_popen) mocker.patch( @@ -232,7 +239,7 @@ def test_unlock_luks_volume_success(self, mock_path, mocker): assert result.unlocked @mock.patch("os.path.exists", return_value=True) - def test_unlock_luks_volume_not_luks(self, mocker): + def test_unlock_luks_volume_unknown(self, mocker): mock_popen = mocker.MagicMock() mock_popen.communicate = mocker.MagicMock() mock_popen.communicate.returncode = 1 # An error unlocking @@ -284,19 +291,6 @@ def test_unlock_luks_volume_luksOpen_exception(self, mocked_subprocess): assert ex.value.sdstatus is Status.DEVICE_ERROR - @mock.patch("os.path.exists", return_value=True) - @mock.patch("subprocess.check_output", return_value=b"\n") - @mock.patch("subprocess.check_call", return_value=0) - def test_mount_volume(self, mocked_call, mocked_output, mocked_path): - vol = Volume( - device_name=_DEFAULT_USB_DEVICE_ONE_PART, - mapped_name=_PRETEND_LUKS_ID, - encryption=EncryptionScheme.LUKS, - ) - mv = self.cli.mount_volume(vol) - assert isinstance(mv, MountedVolume) - assert mv.mountpoint is self.cli._DEFAULT_MOUNTPOINT - @mock.patch("os.path.exists", return_value=True) @mock.patch( "subprocess.check_output", return_value=b"/dev/pretend/luks-id-123456\n" @@ -315,17 +309,21 @@ def test_mount_volume_already_mounted( assert isinstance(result, MountedVolume) @mock.patch("os.path.exists", return_value=True) - @mock.patch("subprocess.check_output", return_value=b"\n") + @mock.patch("subprocess.check_output", return_value=b"Mounted $device at $path") @mock.patch("subprocess.check_call", return_value=0) - def test_mount_volume_mkdir(self, mocked_output, mocked_subprocess, mocked_path): + @mock.patch("securedrop_export.disk.cli.CLI._get_mountpoint", return_value=None) + def test_mount_volume_mkdir( + self, mocked_output, mocked_subprocess, mocked_path, mocked_mountpoint + ): md = Volume( device_name=_DEFAULT_USB_DEVICE_ONE_PART, mapped_name=_PRETEND_LUKS_ID, encryption=EncryptionScheme.LUKS, ) mv = self.cli.mount_volume(md) - assert mv.mapped_name == _PRETEND_LUKS_ID assert isinstance(mv, MountedVolume) + assert mv.mapped_name == _PRETEND_LUKS_ID + assert mv.mountpoint == "$path" @mock.patch("subprocess.check_output", return_value=b"\n") @mock.patch( @@ -344,47 +342,13 @@ def test_mount_volume_error(self, mocked_subprocess, mocked_output): assert ex.value.sdstatus is Status.ERROR_MOUNT - @mock.patch("os.path.exists", return_value=False) - @mock.patch( - "subprocess.check_call", - side_effect=subprocess.CalledProcessError(1, "check_call"), - ) - def test_mount_at_mountpoint_mkdir_error(self, mocked_subprocess, mocked_path): - md = Volume( - device_name=_DEFAULT_USB_DEVICE_ONE_PART, - mapped_name=_PRETEND_LUKS_ID, - encryption=EncryptionScheme.LUKS, - ) - - with pytest.raises(ExportException) as ex: - self.cli._mount_at_mountpoint(md, self.cli._DEFAULT_MOUNTPOINT) - - assert ex.value.sdstatus is Status.ERROR_MOUNT - - @mock.patch("os.path.exists", return_value=True) - @mock.patch( - "subprocess.check_call", - side_effect=subprocess.CalledProcessError(1, "check_call"), - ) - def test_mount_at_mountpoint_mounting_error(self, mocked_subprocess, mocked_path): - md = Volume( - device_name=_DEFAULT_USB_DEVICE_ONE_PART, - mapped_name=_PRETEND_LUKS_ID, - encryption=EncryptionScheme.LUKS, - ) - - with pytest.raises(ExportException) as ex: - self.cli._mount_at_mountpoint(md, self.cli._DEFAULT_MOUNTPOINT) - - assert ex.value.sdstatus is Status.ERROR_MOUNT - @mock.patch("os.path.exists", return_value=True) @mock.patch("subprocess.check_call", return_value=0) def test__unmount_volume(self, mocked_subprocess, mocked_mountpath): mounted = MountedVolume( device_name=_DEFAULT_USB_DEVICE_ONE_PART, mapped_name=_PRETEND_LUKS_ID, - mountpoint=self.cli._DEFAULT_MOUNTPOINT, + mountpoint="/media/usb", encryption=EncryptionScheme.LUKS, ) @@ -400,7 +364,7 @@ def test__unmount_volume_error(self, mocked_subprocess, mocked_mountpath): mounted = MountedVolume( device_name=_DEFAULT_USB_DEVICE_ONE_PART, mapped_name=_PRETEND_LUKS_ID, - mountpoint=self.cli._DEFAULT_MOUNTPOINT, + mountpoint=_EXAMPLE_MOUNTPOINT, encryption=EncryptionScheme.LUKS, ) @@ -456,7 +420,7 @@ def test_write_to_disk(self, mock_check_call): vol = MountedVolume( device_name=_DEFAULT_USB_DEVICE_ONE_PART, mapped_name=_PRETEND_LUKS_ID, - mountpoint=self.cli._DEFAULT_MOUNTPOINT, + mountpoint=_EXAMPLE_MOUNTPOINT, encryption=EncryptionScheme.LUKS, ) @@ -481,7 +445,7 @@ def test_write_to_disk_error_still_does_cleanup(self, mock_call): vol = MountedVolume( device_name=_DEFAULT_USB_DEVICE_ONE_PART, mapped_name=_PRETEND_LUKS_ID, - mountpoint=self.cli._DEFAULT_MOUNTPOINT, + mountpoint=_EXAMPLE_MOUNTPOINT, encryption=EncryptionScheme.LUKS, ) submission = Archive("testfile") @@ -515,7 +479,7 @@ def test_cleanup_drive_and_tmpdir(self, mock_subprocess, mocked_path): mapped_name=_PRETEND_LUKS_ID, encryption=EncryptionScheme.LUKS, ) - mv = MountedVolume.from_volume(vol, mountpoint=self.cli._DEFAULT_MOUNTPOINT) + mv = MountedVolume.from_volume(vol, mountpoint=_EXAMPLE_MOUNTPOINT) close_patch = mock.patch.object(self.cli, "_close_luks_volume") remove_tmpdir_patch = mock.patch.object(self.cli, "_remove_temp_directory") @@ -565,3 +529,58 @@ def test_mount_mkdir_fails(self, mocked_path): self.cli.mount_volume(vol) assert ex.value.sdstatus is Status.ERROR_MOUNT + + def test_mount_fails_with_locked_device(self): + vol = Volume( + device_name=_DEFAULT_USB_DEVICE_ONE_PART, + mapped_name=_PRETEND_LUKS_ID, + encryption=EncryptionScheme.LUKS, + ) + mock.patch.object(vol, "unlocked", return_value=False) + + with pytest.raises(ExportException) as ex: + self.cli.mount_volume(vol) + + assert ex.value.sdstatus == Status.ERROR_MOUNT + + @mock.patch("os.path.exists", return_value=True) + @mock.patch( + "subprocess.check_output", + side_effect=[ + b"vc", + _SAMPLE_CRYPTSETUP_STATUS_OUTPUT, + _SAMPLE_MOUNTED_VC_OUTPUT, + ], + ) + def test_get_unlocked_veracrypt_mounted(self, mock_subprocess, mock_os): + v = self.cli._attempt_get_unlocked_veracrypt_volume("/dev/sdc") + + assert v.unlocked + assert v.mountpoint == _SAMPLE_MOUNTED_VC_OUTPUT.decode("utf-8") + + @mock.patch("os.path.exists", return_value=True) + @mock.patch( + "subprocess.check_output", + side_effect=subprocess.CalledProcessError(1, "Oh no!"), + ) + def test_get_unlocked_veracrypt_lsblk_error(self, mock_subprocess, mock_os): + with pytest.raises(ExportException) as ex: + self.cli._attempt_get_unlocked_veracrypt_volume("/dev/sdc") + + assert ex.value.sdstatus == Status.DEVICE_ERROR + + @mock.patch("os.path.exists", return_value=True) + @mock.patch("subprocess.check_output", return_value=b"sdc disk\n") + def test_get_unlocked_veracrypt_no_vc_drive(self, mock_subprocess, mock_os): + with pytest.raises(ExportException) as ex: + self.cli._attempt_get_unlocked_veracrypt_volume("/dev/sdc") + + assert ex.value.sdstatus == Status.UNKNOWN_DEVICE_DETECTED + + @mock.patch("os.path.exists", return_value=True) + @mock.patch("subprocess.check_output", return_value=b"\n") + def test_get_unlocked_veracrypt_empty_lsblk_error(self, mock_subprocess, mock_os): + with pytest.raises(ExportException) as ex: + self.cli._attempt_get_unlocked_veracrypt_volume("/dev/sdc") + + assert ex.value.sdstatus == Status.UNKNOWN_DEVICE_DETECTED diff --git a/tests/disk/test_service.py b/tests/disk/test_service.py index d7053e1..9843ca7 100644 --- a/tests/disk/test_service.py +++ b/tests/disk/test_service.py @@ -1,14 +1,12 @@ -import pytest from unittest import mock import os import tempfile from securedrop_export.exceptions import ExportException -from securedrop_export.disk.legacy_status import Status as LegacyStatus -from securedrop_export.disk.status import Status as Status +from securedrop_export.disk.status import Status from securedrop_export.disk.volume import Volume, MountedVolume, EncryptionScheme from securedrop_export.archive import Archive, Metadata -from securedrop_export.disk.legacy_service import Service +from securedrop_export.disk.service import Service from securedrop_export.disk.cli import CLI SAMPLE_OUTPUT_LSBLK_NO_PART = b"disk\ncrypt" # noqa @@ -19,7 +17,7 @@ class TestExportService: @classmethod def setup_class(cls): - cls.mock_cli = mock.MagicMock(CLI) + cls.mock_cli = mock.MagicMock(spec=CLI) cls.mock_submission = cls._setup_submission() cls.mock_luks_volume_unmounted = Volume( @@ -34,6 +32,19 @@ def setup_class(cls): encryption=EncryptionScheme.LUKS, ) + cls.mock_vc_volume_mounted = MountedVolume( + device_name=SAMPLE_OUTPUT_USB, + mapped_name="mock-veracrypt-vol", + encryption=EncryptionScheme.VERACRYPT, + mountpoint="/media/usb/", + ) + + cls.mock_vc_volume_unmounted = Volume( + device_name=SAMPLE_OUTPUT_USB, + mapped_name=None, + encryption=EncryptionScheme.UNKNOWN, + ) + cls.service = Service(cls.mock_submission, cls.mock_cli) @classmethod @@ -65,139 +76,81 @@ def setup_method(self, method): errors. `teardown_method()` will reset the side effects so they do not affect subsequent test methods. """ - self.mock_cli.get_connected_devices.return_value = [SAMPLE_OUTPUT_USB] - self.mock_cli.get_partitioned_device.return_value = ( - SAMPLE_OUTPUT_USB_PARTITIONED - ) - self.mock_cli.get_luks_volume.return_value = self.mock_luks_volume_unmounted + self.mock_cli._get_luks_volume.return_value = self.mock_luks_volume_unmounted self.mock_cli.mount_volume.return_value = self.mock_luks_volume_mounted + self.mock_cli.get_all_volumes.return_value = [self.mock_luks_volume_unmounted] def teardown_method(self, method): self.mock_cli.reset_mock(return_value=True, side_effect=True) - def test_check_usb(self): - status = self.service.check_connected_devices() + def test_scan_all_device_is_locked_luks(self): + status = self.service.scan_all_devices() - assert status is LegacyStatus.LEGACY_USB_CONNECTED + assert status == Status.DEVICE_LOCKED - def test_no_devices_connected(self): - self.mock_cli.get_connected_devices.return_value = [] - with pytest.raises(ExportException) as ex: - self.service.check_connected_devices() + def test_scan_all_no_devices_connected(self): + self.mock_cli.get_all_volumes.return_value = [] - assert ex.value.sdstatus is LegacyStatus.LEGACY_USB_NOT_CONNECTED + assert self.service.scan_all_devices() == Status.NO_DEVICE_DETECTED - def test_too_many_devices_connected(self): - self.mock_cli.get_connected_devices.return_value = [ + def test_scan_all_too_many_devices_connected(self): + self.mock_cli.get_all_volumes.return_value = [ SAMPLE_OUTPUT_USB, "/dev/sdb", ] - with pytest.raises(ExportException) as ex: - self.service.check_connected_devices() - assert ex.value.sdstatus is LegacyStatus.LEGACY_USB_ENCRYPTION_NOT_SUPPORTED + assert self.service.scan_all_devices() == Status.MULTI_DEVICE_DETECTED - def test_device_is_not_luks(self): - self.mock_cli.is_luks_volume.return_value = False + def test_scan_all_devices_error(self): + self.mock_cli.get_all_volumes.side_effect = ExportException("zounds!") - # When VeraCrypt is supported, this will no longer be an exception - # and the return status will change - with pytest.raises(ExportException) as ex: - self.service.check_disk_format() + assert self.service.scan_all_devices() == Status.DEVICE_ERROR - assert ex.value.sdstatus is LegacyStatus.LEGACY_USB_ENCRYPTION_NOT_SUPPORTED + def test_scan_all_device_is_not_luks_and_unlocked(self): + self.mock_cli.get_all_volumes.return_value = [self.mock_vc_volume_mounted] - def test_check_usb_error(self): - self.mock_cli.get_connected_devices.side_effect = ExportException( - sdstatus=LegacyStatus.LEGACY_ERROR_USB_CHECK - ) + assert self.service.scan_all_devices() == Status.DEVICE_WRITABLE - with pytest.raises(ExportException) as ex: - self.service.check_connected_devices() + def test_scan_all_device_is_not_luks_and_not_unlocked_error(self): + self.mock_cli.get_all_volumes.return_value = [self.mock_vc_volume_unmounted] - assert ex.value.sdstatus is LegacyStatus.LEGACY_ERROR_USB_CHECK + assert self.service.scan_all_devices() == Status.UNKNOWN_DEVICE_DETECTED - def test_check_disk_format(self): - status = self.service.check_disk_format() - - assert status is LegacyStatus.LEGACY_USB_ENCRYPTED - - def test_check_disk_format_error(self): - self.mock_cli.get_partitioned_device.side_effect = ExportException( - sdstatus=Status.INVALID_DEVICE_DETECTED + self.mock_cli.get_all_volumes.side_effect = ExportException( + sdstatus=Status.DEVICE_ERROR ) - with pytest.raises(ExportException) as ex: - self.service.check_disk_format() - - # We still return the legacy status for now - assert ex.value.sdstatus is LegacyStatus.LEGACY_USB_ENCRYPTION_NOT_SUPPORTED + assert self.service.scan_all_devices() == Status.DEVICE_ERROR - def test_export(self): - # Currently, a successful export does not return a success status. - # When the client is updated, this will change to assert EXPORT_SUCCESS - # is returned. - self.service.export() + def test_scan_all_device_is_locked_veracrypt_volume(self): + self.mock_cli.get_all_volumes.return_value = [self.mock_vc_volume_unmounted] - def test_export_disk_not_supported(self): - self.mock_cli.is_luks_volume.return_value = False + assert self.service.scan_all_devices() == Status.UNKNOWN_DEVICE_DETECTED - with pytest.raises(ExportException) as ex: - self.service.export() + def test_export_already_mounted_no_cleanup(self): + self.mock_cli.get_all_volumes.return_value = [self.mock_luks_volume_mounted] + mock_write = mock.MagicMock() + self.mock_cli.write_data_to_device = mock_write + result = self.service.export() - assert ex.value.sdstatus is LegacyStatus.LEGACY_USB_ENCRYPTION_NOT_SUPPORTED - - def test_export_write_error(self): - self.mock_cli.is_luks_volume.return_value = True - self.mock_cli.write_data_to_device.side_effect = ExportException( - sdstatus=LegacyStatus.LEGACY_ERROR_USB_WRITE + assert result == Status.SUCCESS_EXPORT + mock_write.assert_called_once_with( + self.mock_submission.tmpdir, + self.mock_submission.target_dirname, + self.mock_luks_volume_mounted, ) - with pytest.raises(ExportException) as ex: - self.service.export() - - assert ex.value.sdstatus is LegacyStatus.LEGACY_ERROR_USB_WRITE - - def test_export_throws_new_exception_return_legacy_status(self): - self.mock_cli.get_connected_devices.side_effect = ExportException( - sdstatus=Status.ERROR_MOUNT - ) - - with pytest.raises(ExportException) as ex: - self.service.export() - - assert ex.value.sdstatus is LegacyStatus.LEGACY_ERROR_USB_MOUNT - @mock.patch("os.path.exists", return_value=True) - def test_write_error_returns_legacy_status(self, mock_path): - self.mock_cli.is_luks_volume.return_value = True + def test_export_write_error(self, mock_path): + self.mock_cli.get_all_volumes.return_value = [self.mock_luks_volume_mounted] self.mock_cli.write_data_to_device.side_effect = ExportException( sdstatus=Status.ERROR_EXPORT ) - with pytest.raises(ExportException) as ex: - self.service.export() - - assert ex.value.sdstatus is LegacyStatus.LEGACY_ERROR_USB_WRITE - - @mock.patch("os.path.exists", return_value=True) - def test_unlock_error_returns_legacy_status(self, mock_path): - self.mock_cli.unlock_luks_volume.side_effect = ExportException( - sdstatus=Status.ERROR_UNLOCK_LUKS - ) - - with pytest.raises(ExportException) as ex: - self.service.export() - - assert ex.value.sdstatus is LegacyStatus.LEGACY_USB_BAD_PASSPHRASE + assert self.service.export() == Status.ERROR_EXPORT @mock.patch("os.path.exists", return_value=True) - def test_unexpected_error_returns_legacy_status_generic(self, mock_path): - self.mock_cli.unlock_luks_volume.side_effect = ExportException( - sdstatus=Status.DEVICE_ERROR - ) - - with pytest.raises(ExportException) as ex: - self.service.export() + def test_export_unlock_error(self, mock_path): + self.mock_cli.unlock_luks_volume.side_effect = ExportException("oh no!") - assert ex.value.sdstatus is LegacyStatus.LEGACY_ERROR_GENERIC + assert self.service.export() == Status.ERROR_UNLOCK_LUKS diff --git a/tests/files/sample_export.sd_export b/tests/files/sample_export.sd_export new file mode 100644 index 0000000..dab6433 Binary files /dev/null and b/tests/files/sample_export.sd_export differ diff --git a/tests/files/sample_print.sd_export b/tests/files/sample_print.sd_export new file mode 100644 index 0000000..911113c Binary files /dev/null and b/tests/files/sample_print.sd_export differ diff --git a/tests/test_archive.py b/tests/test_archive.py index 57791a8..7c09b83 100644 --- a/tests/test_archive.py +++ b/tests/test_archive.py @@ -22,7 +22,6 @@ def test_extract_tarball(): with tarfile.open(archive_path, "w:gz") as archive: metadata = { "device": "disk", - "encryption_method": "luks", "encryption_key": "test", } metadata_str = json.dumps(metadata) @@ -73,7 +72,6 @@ def test_extract_tarball_with_symlink(): with tarfile.open(archive_path, "w:gz") as archive: metadata = { "device": "disk", - "encryption_method": "luks", "encryption_key": "test", } metadata_str = json.dumps(metadata) @@ -108,7 +106,6 @@ def test_extract_tarball_raises_if_doing_path_traversal(): with tarfile.open(archive_path, "w:gz") as archive: metadata = { "device": "disk", - "encryption_method": "luks", "encryption_key": "test", } metadata_str = json.dumps(metadata) @@ -147,7 +144,6 @@ def test_extract_tarball_raises_if_doing_path_traversal_with_dir(): with tarfile.open(archive_path, "w:gz") as archive: metadata = { "device": "disk", - "encryption_method": "luks", "encryption_key": "test", } metadata_str = json.dumps(metadata) @@ -184,7 +180,6 @@ def test_extract_tarball_raises_if_doing_path_traversal_with_symlink(): with tarfile.open(archive_path, "w:gz") as archive: metadata = { "device": "disk", - "encryption_method": "luks", "encryption_key": "test", } metadata_str = json.dumps(metadata) @@ -223,7 +218,6 @@ def test_extract_tarball_raises_if_doing_path_traversal_with_symlink_linkname(): with tarfile.open(archive_path, "w:gz") as archive: metadata = { "device": "disk", - "encryption_method": "luks", "encryption_key": "test", } metadata_str = json.dumps(metadata) @@ -260,7 +254,6 @@ def test_extract_tarball_raises_if_name_has_unsafe_absolute_path(): with tarfile.open(archive_path, "w:gz") as archive: metadata = { "device": "disk", - "encryption_method": "luks", "encryption_key": "test", } metadata_str = json.dumps(metadata) @@ -303,7 +296,6 @@ def test_extract_tarball_raises_if_name_has_unsafe_absolute_path_with_symlink(): with tarfile.open(archive_path, "w:gz") as archive: metadata = { "device": "disk", - "encryption_method": "luks", "encryption_key": "test", } metadata_str = json.dumps(metadata) @@ -347,7 +339,6 @@ def test_extract_tarball_raises_if_name_has_unsafe_absolute_path_with_symlink_to with tarfile.open(archive_path, "w:gz") as archive: metadata = { "device": "disk", - "encryption_method": "luks", "encryption_key": "test", } metadata_str = json.dumps(metadata) @@ -380,7 +371,6 @@ def test_extract_tarball_raises_if_linkname_has_unsafe_absolute_path(): with tarfile.open(archive_path, "w:gz") as archive: metadata = { "device": "disk", - "encryption_method": "luks", "encryption_key": "test", } metadata_str = json.dumps(metadata) @@ -426,7 +416,6 @@ def test_valid_printer_test_config(capsys): config = Metadata(temp_folder).validate() assert config.encryption_key is None - assert config.encryption_method is None def test_valid_printer_config(capsys): @@ -439,23 +428,6 @@ def test_valid_printer_config(capsys): config = Metadata(temp_folder).validate() assert config.encryption_key is None - assert config.encryption_method is None - - -def test_invalid_encryption_config(capsys): - Archive("testfile") - - temp_folder = tempfile.mkdtemp() - metadata = os.path.join(temp_folder, Metadata.METADATA_FILE) - with open(metadata, "w") as f: - f.write( - '{"device": "disk", "encryption_method": "base64", "encryption_key": "hunter1"}' - ) - - with pytest.raises(ExportException) as ex: - Metadata(temp_folder).validate() - - assert ex.value.sdstatus is Status.ERROR_ARCHIVE_METADATA def test_invalid_config(capsys): @@ -491,14 +463,11 @@ def test_valid_encryption_config(capsys): temp_folder = tempfile.mkdtemp() metadata = os.path.join(temp_folder, Metadata.METADATA_FILE) with open(metadata, "w") as f: - f.write( - '{"device": "disk", "encryption_method": "luks", "encryption_key": "hunter1"}' - ) + f.write('{"device": "disk", "encryption_key": "hunter1"}') config = Metadata(temp_folder).validate() assert config.encryption_key == "hunter1" - assert config.encryption_method == "luks" @mock.patch("json.loads", side_effect=json.decoder.JSONDecodeError("ugh", "badjson", 0)) diff --git a/tests/test_main.py b/tests/test_main.py index 06da9ef..08ceeb8 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -1,13 +1,13 @@ import pytest -import tempfile -import os from unittest import mock import shutil +from pathlib import Path from securedrop_export.archive import Archive, Metadata, Status as ArchiveStatus from securedrop_export.status import BaseStatus from securedrop_export.command import Command from securedrop_export.exceptions import ExportException +from securedrop_export.disk.status import Status as ExportStatus from securedrop_export.main import ( Status, @@ -18,107 +18,123 @@ _configure_logging, ) -SUBMISSION_SAMPLE_ARCHIVE = "pretendfile.tar.gz" +_PRINT_SAMPLE_ARCHIVE = "sample_print.sd_export" +_EXPORT_SAMPLE_ARCHIVE = "sample_export.sd_export" class TestMain: def setup_method(self, method): - # This can't be a class method, since we expect sysexit during this test suite, - # which - self.submission = Archive("pretendfile.tar.gz") - assert os.path.exists(self.submission.tmpdir) + # These can't be class setup methods, since we expect sysexit during this test suite + self.print_archive_path = Path.cwd() / "tests/files" / _PRINT_SAMPLE_ARCHIVE + assert self.print_archive_path.exists() + + self.export_archive_path = Path.cwd() / "tests/files" / _EXPORT_SAMPLE_ARCHIVE + assert self.export_archive_path.exists() + + self.print_submission = Archive(str(self.print_archive_path)) + assert Path(self.print_submission.tmpdir).exists() + + self.export_submission = Archive(str(self.export_archive_path)) + assert Path(self.export_submission.tmpdir).exists() def teardown_method(self, method): - if os.path.exists(self.submission.tmpdir): - shutil.rmtree(self.submission.tmpdir) - self.submission = None + if Path(self.print_submission.tmpdir).exists(): + shutil.rmtree(self.print_submission.tmpdir) + + if Path(self.export_submission.tmpdir).exists(): + shutil.rmtree(self.export_submission.tmpdir) - def test_exit_gracefully_no_exception(self, capsys): + def _did_exit_gracefully(self, exit, capsys, status: BaseStatus) -> bool: + """ + Helper. True if exited with 0, writing supplied status to stderr. + """ + captured = capsys.readouterr() + + return ( + exit.value.code == 0 + and captured.err.rstrip().endswith(status.value) + and captured.out == "" + ) + + def test__exit_gracefully_no_exception(self, capsys): with pytest.raises(SystemExit) as sysexit: - _exit_gracefully(self.submission, Status.ERROR_GENERIC) + # `ERROR_GENERIC` is just a placeholder status here + _exit_gracefully(self.print_submission, Status.ERROR_GENERIC) assert self._did_exit_gracefully(sysexit, capsys, Status.ERROR_GENERIC) - def test_exit_gracefully_exception(self, capsys): - with pytest.raises(SystemExit) as sysexit: - _exit_gracefully(self.submission, Status.ERROR_GENERIC) + @mock.patch( + "securedrop_export.main.shutil.rmtree", + side_effect=FileNotFoundError("oh no", 0), + ) + def test__exit_gracefully_even_with_cleanup_exception(self, mock_rm, capsys): + with mock.patch( + "sys.argv", ["qvm-send-to-usb", self.export_archive_path] + ), mock.patch( + "securedrop_export.main._start_service", + return_value=ExportStatus.SUCCESS_EXPORT, + ), pytest.raises( + SystemExit + ) as sysexit: + entrypoint() - # A graceful exit means a return code of 0 assert self._did_exit_gracefully(sysexit, capsys, Status.ERROR_GENERIC) + def test_entrypoint_success(self, capsys): + with mock.patch( + "sys.argv", ["qvm-send-to-usb", self.export_archive_path] + ), mock.patch( + "securedrop_export.main._start_service", + return_value=ExportStatus.SUCCESS_EXPORT, + ), pytest.raises( + SystemExit + ) as sysexit: + entrypoint() + + assert self._did_exit_gracefully(sysexit, capsys, ExportStatus.SUCCESS_EXPORT) + @pytest.mark.parametrize("status", [s for s in Status]) - def test_write_status(self, status, capsys): + def test__write_status_success(self, status, capsys): _write_status(status) captured = capsys.readouterr() assert captured.err == status.value + "\n" @pytest.mark.parametrize("invalid_status", ["foo", ";ls", "&& echo 0", None]) - def test_write_status_error(self, invalid_status, capsys): + def test__write_status_will_not_write_bad_value(self, invalid_status, capsys): with pytest.raises(ValueError): _write_status(Status(invalid_status)) - def _did_exit_gracefully(self, exit, capsys, status: BaseStatus) -> bool: - """ - Helper. True if exited with 0, writing supplied status to stderr. - """ captured = capsys.readouterr() + assert captured.err == "" + assert captured.out == "" - return ( - exit.value.code == 0 - and captured.err.rstrip().endswith(status.value) - and captured.out == "" - ) - - @pytest.mark.parametrize("command", list(Command)) - @mock.patch("securedrop_export.main._configure_logging") - @mock.patch("os.path.exists", return_value=True) - def test_entrypoint_success_start_service(self, mock_log, mock_path, command): - metadata = os.path.join(self.submission.tmpdir, Metadata.METADATA_FILE) - - with open(metadata, "w") as f: - f.write(f'{{"device": "{command.value}", "encryption_method": "luks"}}') - + def test_entrypoint_success_start_service(self): with mock.patch( - "sys.argv", ["qvm-send-to-usb", SUBMISSION_SAMPLE_ARCHIVE] + "sys.argv", ["qvm-send-to-usb", self.export_archive_path] ), mock.patch( "securedrop_export.main._start_service" - ) as mock_service, mock.patch( - "securedrop_export.main.Archive.extract_tarball", - return_value=self.submission, - ), pytest.raises( + ) as mock_service, pytest.raises( SystemExit ): entrypoint() - if command is not Command.START_VM: - assert self.submission.command == command - assert mock_service.call_args[0][0].archive == SUBMISSION_SAMPLE_ARCHIVE - mock_service.assert_called_once_with(self.submission) - - def test_valid_printer_test_config(self, capsys): - Archive("testfile") - temp_folder = tempfile.mkdtemp() - metadata = os.path.join(temp_folder, Metadata.METADATA_FILE) - with open(metadata, "w") as f: - f.write('{"device": "printer-test"}') + assert mock_service.call_args[0][0].archive == self.export_archive_path + assert mock_service.call_args[0][0].command == Command.EXPORT - config = Metadata(temp_folder).validate() + def test_validate_metadata(self): + for archive_path in [self.print_archive_path, self.export_archive_path]: + archive = Archive(archive_path) + extracted = archive.extract_tarball() - assert config.encryption_key is None - assert config.encryption_method is None + assert Metadata(extracted.tmpdir).validate() @mock.patch( "securedrop_export.archive.safe_extractall", side_effect=ValueError("A tarball problem!"), ) - @mock.patch("securedrop_export.main.os.path.exists", return_value=True) - @mock.patch("securedrop_export.main.shutil.rmtree") - @mock.patch("securedrop_export.main._configure_logging") - def test_entrypoint_failure_extraction( - self, mock_log, mock_rm, mock_path, mock_extract, capsys - ): + def test_entrypoint_failure_extraction(self, mock_extract, capsys): with mock.patch( - "sys.argv", ["qvm-send-to-usb", SUBMISSION_SAMPLE_ARCHIVE] + "sys.argv", ["qvm-send-to-usb", self.export_archive_path] ), pytest.raises(SystemExit) as sysexit: entrypoint() @@ -149,9 +165,10 @@ def test_entrypoint_fails_unexpected(self, mock_mkdir, capsys): assert self._did_exit_gracefully(sysexit, capsys, Status.ERROR_GENERIC) - @mock.patch("os.path.exists", return_value=False) - def test_entrypoint_archive_path_fails(self, mock_path, capsys): - with pytest.raises(SystemExit) as sysexit: + def test_entrypoint_archive_path_fails(self, capsys): + with mock.patch( + "sys.argv", ["qvm-send-to-usb", "THIS_FILE_DOES_NOT_EXIST.sd_export"] + ), pytest.raises(SystemExit) as sysexit: entrypoint() assert self._did_exit_gracefully(sysexit, capsys, Status.ERROR_FILE_NOT_FOUND) @@ -171,14 +188,15 @@ def test__start_service_calls_correct_services(self, command): if command is Command.START_VM: pytest.skip("Command does not start a service") - self.submission.command = command + mock_submission = Archive("mock_submission.sd_export") + mock_submission.command = command with mock.patch("securedrop_export.main.PrintService") as ps, mock.patch( "securedrop_export.main.ExportService" ) as es: - _start_service(self.submission) + _start_service(mock_submission) if command in [Command.PRINT, Command.PRINTER_TEST, Command.PRINTER_PREFLIGHT]: - assert ps.call_args[0][0] is self.submission + assert ps.call_args[0][0] is mock_submission else: - assert es.call_args[0][0] is self.submission + assert es.call_args[0][0] is mock_submission