Skip to content

Commit

Permalink
Merge pull request #32 from freedomofpress/26-remove-hardcoded-device
Browse files Browse the repository at this point in the history
remove hardcoded USB device name
  • Loading branch information
sssoleileraaa authored Dec 5, 2019
2 parents bd03d01 + 930dd9d commit ae8e16f
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 18 deletions.
58 changes: 45 additions & 13 deletions securedrop_export/export.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@
import tarfile
import tempfile
import time
from typing import List, Optional # noqa: F401

from enum import Enum

PRINTER_NAME = "sdw-printer"
PRINTER_WAIT_TIMEOUT = 60
DEVICE = "/dev/sda"
MOUNTPOINT = "/media/usb"
ENCRYPTED_DEVICE = "encrypted_volume"
BRLASER_DRIVER = "/usr/share/cups/drv/brlaser.drv"
Expand Down Expand Up @@ -122,7 +122,7 @@ def is_valid(self):

class SDExport(object):
def __init__(self, archive, config_path):
self.device = DEVICE
self.device = None # Optional[str]
self.mountpoint = MOUNTPOINT
self.encrypted_device = ENCRYPTED_DEVICE

Expand Down Expand Up @@ -191,31 +191,63 @@ def extract_tarball(self):
except Exception:
self.exit_gracefully(ExportStatus.ERROR_EXTRACTION.value)

def check_usb_connected(self):
# If the USB is not attached via qvm-usb attach, lsusb will return empty string and a
# return code of 1
def check_usb_connected(self, exit=False) -> None:
usb_devices = self._get_connected_usbs()

if len(usb_devices) == 0:
self.exit_gracefully(ExportStatus.USB_NOT_CONNECTED.value)
elif len(usb_devices) == 1:
self.device = usb_devices[0]
if exit:
self.exit_gracefully(ExportStatus.USB_CONNECTED.value)
elif len(usb_devices) > 1:
# Return generic error until freedomofpress/securedrop-export/issues/25
self.exit_gracefully(ExportStatus.ERROR_GENERIC.value)

def _get_connected_usbs(self) -> List[str]:
logging.info('Performing usb preflight')
# List all block devices attached to VM that are disks and not partitions.
try:
subprocess.check_output(
["lsblk", "-p", "-o", "KNAME", "--noheadings", "--inverse", DEVICE],
stderr=subprocess.PIPE)
self.exit_gracefully(ExportStatus.USB_CONNECTED.value)
lsblk = subprocess.Popen(["lsblk", "-o", "NAME,TYPE"], stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
grep = subprocess.Popen(["grep", "disk"], stdin=lsblk.stdout, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
command_output = grep.stdout.readlines()

# The first word in each element of the command_output list is the device name
attached_devices = [x.decode('utf8').split()[0] for x in command_output]
except subprocess.CalledProcessError:
self.exit_gracefully(ExportStatus.USB_NOT_CONNECTED.value)
self.exit_gracefully(ExportStatus.ERROR_GENERIC.value)

# Determine which are USBs by selecting those block devices that are removable disks.
usb_devices = []
for device in attached_devices:
try:
removable = subprocess.check_output(
["cat", "/sys/class/block/{}/removable".format(device)],
stderr=subprocess.PIPE)
is_removable = int(removable.decode('utf8').strip())
except subprocess.CalledProcessError:
is_removable = False

if is_removable:
usb_devices.append("/dev/{}".format(device))

return usb_devices

def set_extracted_device_name(self):
try:
device_and_partitions = subprocess.check_output(
["lsblk", "-o", "TYPE", "--noheadings", DEVICE], stderr=subprocess.PIPE)
["lsblk", "-o", "TYPE", "--noheadings", self.device], stderr=subprocess.PIPE)

# we don't support multiple partitions
partition_count = device_and_partitions.decode('utf-8').split('\n').count('part')
if partition_count > 1:
logging.debug("multiple partitions not supported")
self.exit_gracefully(ExportStatus.USB_ENCRYPTION_NOT_SUPPORTED.value)

# set device to /dev/sda if disk is encrypted, /dev/sda1 if partition encrypted
self.device = DEVICE if partition_count == 0 else DEVICE + '1'
# redefine device to /dev/sda if disk is encrypted, /dev/sda1 if partition encrypted
self.device = self.device if partition_count == 0 else self.device + '1'
except subprocess.CalledProcessError:
self.exit_gracefully(ExportStatus.USB_ENCRYPTION_NOT_SUPPORTED.value)

Expand Down
6 changes: 5 additions & 1 deletion securedrop_export/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ def __main__(submission):
if submission.archive_metadata.is_valid():
if submission.archive_metadata.export_method == "usb-test":
logging.info('Export archive is usb-test')
submission.check_usb_connected()
submission.check_usb_connected(exit=True)
elif submission.archive_metadata.export_method == "disk":
logging.info('Export archive is disk')
# check_usb_connected looks for the drive, sets the drive to use
submission.check_usb_connected()
logging.info('Unlocking volume')
# exports all documents in the archive to luks-encrypted volume
submission.unlock_luks_volume(submission.archive_metadata.encryption_key)
Expand All @@ -29,6 +31,8 @@ def __main__(submission):
submission.copy_submission()
elif submission.archive_metadata.export_method == "disk-test":
logging.info('Export archive is disk-test')
# check_usb_connected looks for the drive, sets the drive to use
submission.check_usb_connected()
submission.check_luks_volume()
elif submission.archive_metadata.export_method == "printer":
logging.info('Export archive is printer')
Expand Down
61 changes: 57 additions & 4 deletions tests/test_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,33 +193,81 @@ def test_is_not_open_office_file(capsys, open_office_paths):


def test_usb_precheck_disconnected(capsys, mocker):
"""Tests the scenario where there are disks connected, but none of them are USB"""
submission = export.SDExport("testfile", TEST_CONFIG)
expected_message = "USB_NOT_CONNECTED"
assert export.ExportStatus.USB_NOT_CONNECTED.value == expected_message

# Popen call returns lsblk output
command_output = mock.MagicMock()
command_output.stdout = mock.MagicMock()
command_output.stdout.readlines = mock.MagicMock(return_value=[b"sda disk\n", b"sdb disk\n"])
mocker.patch("subprocess.Popen", return_value=command_output)

# check_output returns removable status
mocker.patch("subprocess.check_output", return_value=[b'0\n', b'0\n'])

mocked_exit = mocker.patch.object(submission, "exit_gracefully", return_value=0)

mocker.patch("subprocess.check_output",
side_effect=CalledProcessError(1, 'check_output'))

submission.check_usb_connected()
submission.check_usb_connected(exit=True)

mocked_exit.assert_called_once_with(expected_message)
assert submission.device is None


@mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_USB)
def test_usb_precheck_connected(mocked_call, capsys, mocker):
def test_usb_precheck_connected(capsys, mocker):
"""Tests the scenario where there is one USB connected"""
submission = export.SDExport("testfile", TEST_CONFIG)

# Popen call returns lsblk output
command_output = mock.MagicMock()
command_output.stdout = mock.MagicMock()
command_output.stdout.readlines = mock.MagicMock(return_value=[b"sdb disk\n"])
mocker.patch("subprocess.Popen", return_value=command_output)

# check_output returns removable status
mocker.patch("subprocess.check_output", return_value=b"1\n")

expected_message = "USB_CONNECTED"
assert export.ExportStatus.USB_CONNECTED.value == expected_message
mocked_exit = mocker.patch.object(submission, "exit_gracefully", return_value=0)

submission.check_usb_connected()
submission.check_usb_connected(exit=True)

mocked_exit.assert_called_once_with(expected_message)
assert submission.device == "/dev/sdb"


def test_usb_precheck_multiple_devices_connected(capsys, mocker):
"""Tests the scenario where there are multiple USB drives connected"""
submission = export.SDExport("testfile", TEST_CONFIG)

# Popen call returns lsblk output
command_output = mock.MagicMock()
command_output.stdout = mock.MagicMock()
command_output.stdout.readlines = mock.MagicMock(return_value=[b"sdb disk\n", b"sdc disk\n"])
mocker.patch("subprocess.Popen", return_value=command_output)

# check_output returns removable status
mocker.patch("subprocess.check_output", return_value=b"1\n")

expected_message = "ERROR_GENERIC"
assert export.ExportStatus.ERROR_GENERIC.value == expected_message
mocked_exit = mocker.patch.object(submission, "exit_gracefully", return_value=0)

submission.check_usb_connected(exit=True)

mocked_exit.assert_called_once_with(expected_message)
assert submission.device is None


@mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_NO_PART)
def test_extract_device_name_no_part(mocked_call, capsys):
submission = export.SDExport("testfile", TEST_CONFIG)
submission.device = "/dev/sda"

submission.set_extracted_device_name()

Expand All @@ -229,6 +277,7 @@ def test_extract_device_name_no_part(mocked_call, capsys):
@mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_ONE_PART)
def test_extract_device_name_single_part(mocked_call, capsys):
submission = export.SDExport("testfile", TEST_CONFIG)
submission.device = "/dev/sda"

submission.set_extracted_device_name()

Expand All @@ -238,6 +287,7 @@ def test_extract_device_name_single_part(mocked_call, capsys):
@mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_MULTI_PART)
def test_extract_device_name_multiple_part(mocked_call, capsys, mocker):
submission = export.SDExport("testfile", TEST_CONFIG)
submission.device = "/dev/sda"
mocked_exit = mocker.patch.object(submission, "exit_gracefully", return_value=0)
expected_message = export.ExportStatus.USB_ENCRYPTION_NOT_SUPPORTED.value

Expand All @@ -262,6 +312,7 @@ def test_luks_precheck_encrypted_fde(mocked_call, capsys, mocker):
@mock.patch("subprocess.check_call", return_value=0)
def test_luks_precheck_encrypted_single_part(mocked_call, capsys, mocker):
submission = export.SDExport("testfile", TEST_CONFIG)
submission.device = "/dev/sda"
expected_message = export.ExportStatus.USB_ENCRYPTED.value
mocked_exit = mocker.patch.object(submission, "exit_gracefully", return_value=0)

Expand All @@ -273,6 +324,7 @@ def test_luks_precheck_encrypted_single_part(mocked_call, capsys, mocker):
@mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_MULTI_PART)
def test_luks_precheck_encrypted_multi_part(mocked_call, capsys, mocker):
submission = export.SDExport("testfile", TEST_CONFIG)
submission.device = "/dev/sda"
expected_message = export.ExportStatus.USB_ENCRYPTION_NOT_SUPPORTED.value

# Here we need to mock the exit_gracefully method with a side effect otherwise
Expand All @@ -296,6 +348,7 @@ def test_luks_precheck_encrypted_multi_part(mocked_call, capsys, mocker):
@mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_ONE_PART)
def test_luks_precheck_encrypted_luks_error(mocked_call, capsys, mocker):
submission = export.SDExport("testfile", TEST_CONFIG)
submission.device = "/dev/sda"
expected_message = "USB_ENCRYPTION_NOT_SUPPORTED"
assert expected_message == export.ExportStatus.USB_ENCRYPTION_NOT_SUPPORTED.value

Expand Down

0 comments on commit ae8e16f

Please sign in to comment.