This repository has been archived by the owner on Jan 5, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 5
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #41 from freedomofpress/sdexport
Separate print, USB export related actions into their own classes
- Loading branch information
Showing
13 changed files
with
885 additions
and
740 deletions.
There are no files selected for viewing
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,230 @@ | ||
import logging | ||
import os | ||
import subprocess | ||
import sys | ||
|
||
from typing import List | ||
|
||
from securedrop_export.export import ExportAction | ||
from securedrop_export.exceptions import ExportStatus | ||
|
||
MOUNTPOINT = "/media/usb" | ||
ENCRYPTED_DEVICE = "encrypted_volume" | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class DiskAction(ExportAction): | ||
def __init__(self, submission): | ||
self.submission = submission | ||
self.device = None # Optional[str] | ||
self.mountpoint = MOUNTPOINT | ||
self.encrypted_device = ENCRYPTED_DEVICE | ||
|
||
def run(self) -> None: | ||
"""Run logic""" | ||
raise NotImplementedError | ||
|
||
def check_usb_connected(self, exit=False) -> None: | ||
usb_devices = self._get_connected_usbs() | ||
|
||
if len(usb_devices) == 0: | ||
logger.info('0 USB devices connected') | ||
self.submission.exit_gracefully(ExportStatus.USB_NOT_CONNECTED.value) | ||
elif len(usb_devices) == 1: | ||
logger.info('1 USB device connected') | ||
self.device = usb_devices[0] | ||
if exit: | ||
self.submission.exit_gracefully(ExportStatus.USB_CONNECTED.value) | ||
elif len(usb_devices) > 1: | ||
logger.info('>1 USB devices connected') | ||
# Return generic error until freedomofpress/securedrop-export/issues/25 | ||
self.submission.exit_gracefully(ExportStatus.ERROR_GENERIC.value) | ||
|
||
def _get_connected_usbs(self) -> List[str]: | ||
logger.info('Performing usb preflight') | ||
# List all block devices attached to VM that are disks and not partitions. | ||
try: | ||
lsblk = subprocess.Popen(["lsblk", "-o", "NAME,TYPE"], stdout=subprocess.PIPE, | ||
stderr=subprocess.PIPE) | ||
grep = subprocess.Popen(["grep", "disk"], stdin=lsblk.stdout, stdout=subprocess.PIPE, | ||
stderr=subprocess.PIPE) | ||
command_output = grep.stdout.readlines() | ||
|
||
# The first word in each element of the command_output list is the device name | ||
attached_devices = [x.decode('utf8').split()[0] for x in command_output] | ||
except subprocess.CalledProcessError: | ||
self.submission.exit_gracefully(ExportStatus.ERROR_GENERIC.value) | ||
|
||
# Determine which are USBs by selecting those block devices that are removable disks. | ||
usb_devices = [] | ||
for device in attached_devices: | ||
try: | ||
removable = subprocess.check_output( | ||
["cat", "/sys/class/block/{}/removable".format(device)], | ||
stderr=subprocess.PIPE) | ||
is_removable = int(removable.decode('utf8').strip()) | ||
except subprocess.CalledProcessError: | ||
is_removable = False | ||
|
||
if is_removable: | ||
usb_devices.append("/dev/{}".format(device)) | ||
|
||
return usb_devices | ||
|
||
def set_extracted_device_name(self): | ||
try: | ||
device_and_partitions = subprocess.check_output( | ||
["lsblk", "-o", "TYPE", "--noheadings", self.device], stderr=subprocess.PIPE) | ||
|
||
# we don't support multiple partitions | ||
partition_count = device_and_partitions.decode('utf-8').split('\n').count('part') | ||
if partition_count > 1: | ||
logger.debug("multiple partitions not supported") | ||
self.submission.exit_gracefully(ExportStatus.USB_ENCRYPTION_NOT_SUPPORTED.value) | ||
|
||
# redefine device to /dev/sda if disk is encrypted, /dev/sda1 if partition encrypted | ||
self.device = self.device if partition_count == 0 else self.device + '1' | ||
except subprocess.CalledProcessError: | ||
self.submission.exit_gracefully(ExportStatus.USB_ENCRYPTION_NOT_SUPPORTED.value) | ||
|
||
def check_luks_volume(self): | ||
# cryptsetup isLuks returns 0 if the device is a luks volume | ||
# subprocess with throw if the device is not luks (rc !=0) | ||
logger.info('Checking if volume is luks-encrypted') | ||
self.set_extracted_device_name() | ||
logger.debug("checking if {} is luks encrypted".format(self.device)) | ||
self.submission.safe_check_call( | ||
command=["sudo", "cryptsetup", "isLuks", self.device], | ||
error_message=ExportStatus.USB_ENCRYPTION_NOT_SUPPORTED.value | ||
) | ||
self.submission.exit_gracefully(ExportStatus.USB_ENCRYPTED.value) | ||
|
||
def unlock_luks_volume(self, encryption_key): | ||
try: | ||
# get the encrypted device name | ||
self.set_extracted_device_name() | ||
luks_header = subprocess.check_output(["sudo", "cryptsetup", "luksDump", self.device]) | ||
luks_header_list = luks_header.decode('utf-8').split('\n') | ||
for line in luks_header_list: | ||
items = line.split('\t') | ||
if 'UUID' in items[0]: | ||
self.encrypted_device = 'luks-' + items[1] | ||
|
||
# the luks device is already unlocked | ||
if os.path.exists(os.path.join('/dev/mapper/', self.encrypted_device)): | ||
logger.debug('Device already unlocked') | ||
return | ||
|
||
logger.debug('Unlocking luks volume {}'.format(self.encrypted_device)) | ||
p = subprocess.Popen( | ||
["sudo", "cryptsetup", "luksOpen", self.device, self.encrypted_device], | ||
stdin=subprocess.PIPE, | ||
stdout=subprocess.PIPE, | ||
stderr=subprocess.PIPE | ||
) | ||
logger.debug('Passing key') | ||
p.communicate(input=str.encode(encryption_key, "utf-8")) | ||
rc = p.returncode | ||
if rc != 0: | ||
logger.error('Bad phassphrase for {}'.format(self.encrypted_device)) | ||
self.submission.exit_gracefully(ExportStatus.USB_BAD_PASSPHRASE.value) | ||
except subprocess.CalledProcessError: | ||
self.submission.exit_gracefully(ExportStatus.USB_ENCRYPTION_NOT_SUPPORTED) | ||
|
||
def mount_volume(self): | ||
# If the drive is already mounted then we don't need to mount it again | ||
output = subprocess.check_output( | ||
["lsblk", "-o", "MOUNTPOINT", "--noheadings", self.device]) | ||
mountpoint = output.decode('utf-8').strip() | ||
if mountpoint: | ||
logger.debug('The device is already mounted') | ||
self.mountpoint = mountpoint | ||
return | ||
|
||
# mount target not created, create folder | ||
if not os.path.exists(self.mountpoint): | ||
self.submission.safe_check_call( | ||
command=["sudo", "mkdir", self.mountpoint], | ||
error_message=ExportStatus.ERROR_USB_MOUNT | ||
) | ||
|
||
mapped_device_path = os.path.join("/dev/mapper/", self.encrypted_device) | ||
logger.info('Mounting {}'.format(mapped_device_path)) | ||
self.submission.safe_check_call( | ||
command=["sudo", "mount", mapped_device_path, self.mountpoint], | ||
error_message=ExportStatus.ERROR_USB_MOUNT.value | ||
) | ||
self.submission.safe_check_call( | ||
command=["sudo", "chown", "-R", "user:user", self.mountpoint], | ||
error_message=ExportStatus.ERROR_USB_MOUNT.value | ||
) | ||
|
||
def copy_submission(self): | ||
# move files to drive (overwrites files with same filename) and unmount drive | ||
# we don't use safe_check_call here because we must lock and | ||
# unmount the drive as part of the finally block | ||
try: | ||
target_path = os.path.join(self.mountpoint, self.submission.target_dirname) | ||
subprocess.check_call(["mkdir", target_path]) | ||
export_data = os.path.join(self.submission.tmpdir, "export_data/") | ||
logger.info('Copying file to {}'.format(self.submission.target_dirname)) | ||
subprocess.check_call(["cp", "-r", export_data, target_path]) | ||
logger.info('File copied successfully to {}'.format(self.submission.target_dirname)) | ||
self.submission.popup_message("Files exported successfully to disk.") | ||
except (subprocess.CalledProcessError, OSError): | ||
self.submission.exit_gracefully(ExportStatus.ERROR_USB_WRITE.value) | ||
finally: | ||
logger.info('Syncing filesystems') | ||
subprocess.check_call(["sync"]) | ||
|
||
if os.path.exists(self.mountpoint): | ||
logger.info('Unmounting drive from {}'.format(self.mountpoint)) | ||
subprocess.check_call(["sudo", "umount", self.mountpoint]) | ||
|
||
if os.path.exists(os.path.join('/dev/mapper', self.encrypted_device)): | ||
logger.info('Locking luks volume {}'.format(self.encrypted_device)) | ||
subprocess.check_call( | ||
["sudo", "cryptsetup", "luksClose", self.encrypted_device] | ||
) | ||
|
||
logger.info('Deleting temporary directory {}'.format(self.submission.tmpdir)) | ||
subprocess.check_call(["rm", "-rf", self.submission.tmpdir]) | ||
sys.exit(0) | ||
|
||
|
||
class USBTestAction(DiskAction): | ||
def __init__(self, *args, **kwargs): | ||
super().__init__(*args, **kwargs) | ||
|
||
def run(self): | ||
logger.info('Export archive is usb-test') | ||
self.check_usb_connected(exit=True) | ||
|
||
|
||
class DiskTestAction(DiskAction): | ||
def __init__(self, *args, **kwargs): | ||
super().__init__(*args, **kwargs) | ||
|
||
def run(self): | ||
logger.info('Export archive is disk-test') | ||
# check_usb_connected looks for the drive, sets the drive to use | ||
self.check_usb_connected() | ||
self.check_luks_volume() | ||
|
||
|
||
class DiskExportAction(DiskAction): | ||
def __init__(self, *args, **kwargs): | ||
super().__init__(*args, **kwargs) | ||
|
||
def run(self): | ||
logger.info('Export archive is disk') | ||
# check_usb_connected looks for the drive, sets the drive to use | ||
self.check_usb_connected() | ||
logger.info('Unlocking volume') | ||
# exports all documents in the archive to luks-encrypted volume | ||
self.unlock_luks_volume(self.submission.archive_metadata.encryption_key) | ||
logger.info('Mounting volume') | ||
self.mount_volume() | ||
logger.info('Copying submission to drive') | ||
self.copy_submission() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,48 @@ | ||
from enum import Enum | ||
|
||
|
||
class ExportStatus(Enum): | ||
|
||
# General errors | ||
ERROR_FILE_NOT_FOUND = 'ERROR_FILE_NOT_FOUND' | ||
ERROR_EXTRACTION = 'ERROR_EXTRACTION' | ||
ERROR_METADATA_PARSING = 'ERROR_METADATA_PARSING' | ||
ERROR_ARCHIVE_METADATA = 'ERROR_ARCHIVE_METADATA' | ||
ERROR_USB_CONFIGURATION = 'ERROR_USB_CONFIGURATION' | ||
ERROR_GENERIC = 'ERROR_GENERIC' | ||
|
||
# USB preflight related errors | ||
USB_CONNECTED = 'USB_CONNECTED' | ||
USB_NOT_CONNECTED = 'USB_NOT_CONNECTED' | ||
ERROR_USB_CHECK = 'ERROR_USB_CHECK' | ||
|
||
# USB Disk preflight related errors | ||
USB_ENCRYPTED = 'USB_ENCRYPTED' | ||
USB_ENCRYPTION_NOT_SUPPORTED = 'USB_ENCRYPTION_NOT_SUPPORTED' | ||
USB_DISK_ERROR = 'USB_DISK_ERROR' | ||
|
||
# Printer preflight related errors | ||
ERROR_PRINTER_NOT_FOUND = 'ERROR_PRINTER_NOT_FOUND' | ||
ERROR_PRINTER_NOT_SUPPORTED = 'ERROR_PRINTER_NOT_SUPPORTED' | ||
ERROR_PRINTER_DRIVER_UNAVAILABLE = 'ERROR_PRINTER_DRIVER_UNAVAILABLE' | ||
ERROR_PRINTER_INSTALL = 'ERROR_PRINTER_INSTALL' | ||
|
||
# Disk export errors | ||
USB_BAD_PASSPHRASE = 'USB_BAD_PASSPHRASE' | ||
ERROR_USB_MOUNT = 'ERROR_USB_MOUNT' | ||
ERROR_USB_WRITE = 'ERROR_USB_WRITE' | ||
|
||
# Printer export errors | ||
ERROR_PRINT = 'ERROR_PRINT' | ||
|
||
|
||
class TimeoutException(Exception): | ||
pass | ||
|
||
|
||
def handler(signum, frame): | ||
""" | ||
This is a signal handler used for raising timeouts: | ||
https://docs.python.org/3/library/signal.html#signal.signal | ||
""" | ||
raise TimeoutException("Timeout") |
Oops, something went wrong.