Skip to content

Commit

Permalink
Merge pull request #16 from creviera/bus-id-workaround
Browse files Browse the repository at this point in the history
Detect different kinds of storage devices attached to `sd-export-usb`
  • Loading branch information
emkll authored Nov 7, 2019
2 parents 78bcbfe + bb5f9a7 commit 5bb6e85
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 72 deletions.
19 changes: 19 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,25 @@

Code for exporting and printing files from the SecureDrop Qubes Workstation.

## Supported Printers

TBD

## Supported Export Devices

We support luks-encrypted drives that are either MBR/DOS partitioned or GPT partitioned. If you use `Disks` in Linux to partition your drive, you can [follow these instructions](https://docs.securedrop.org/en/stable/set_up_transfer_and_export_device.html#create-usb-transfer-device) to create a new export device. You can also use [cryptsetup](https://linux.die.net/man/8/cryptsetup) to create a luks-encrypted device with full-disk encryption, for example:

1. `sudo cryptsetup luksFormat --hash=sha512 --key-size=512 DEVICE` where `DEVICE` is the name of your removable drive, which you can find via `lsblk -p`.

Make sure `DEVICE` is correct because you will be overwriting its data irrevocably.

2. `sudo cryptsetup luksOpen /dev/sdb encrypted_device`

3. `sudo mkfs.ext4 /dev/mapper/encrypted_device`

4. `sudo cryptsetup luksClose /dev/mapper/encrypted_device`

We do not yet support drives that use full-disk encryption with VeraCrypt.

## Export Archive Format

Expand Down
114 changes: 61 additions & 53 deletions securedrop_export/export.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

PRINTER_NAME = "sdw-printer"
PRINTER_WAIT_TIMEOUT = 60
DEVICE = "/dev/sda1"
DEVICE = "/dev/sda"
MOUNTPOINT = "/media/usb"
ENCRYPTED_DEVICE = "encrypted_volume"
BRLASER_DRIVER = "/usr/share/cups/drv/brlaser.drv"
Expand Down Expand Up @@ -164,76 +164,84 @@ def extract_tarball(self):
self.exit_gracefully(msg)

def check_usb_connected(self):

# If the USB is not attached via qvm-usb attach, lsusb will return empty string and a
# return code of 1
logging.info('Performing usb preflight')
try:
p = subprocess.check_output(["lsusb", "-s", "{}:".format(self.pci_bus_id)])
logging.info("lsusb -s {} : {}".format(self.pci_bus_id, p.decode("utf-8")))
subprocess.check_output(
["lsblk", "-p", "-o", "KNAME", "--noheadings", "--inverse", DEVICE],
stderr=subprocess.PIPE)
self.exit_gracefully("USB_CONNECTED")
except subprocess.CalledProcessError:
msg = "ERROR_USB_CONFIGURATION"
self.exit_gracefully(msg)
n_usb = len(p.decode("utf-8").rstrip().split("\n"))
# If there is one device, it is the root hub.
if n_usb == 1:
logging.info('usb preflight - no external devices connected')
msg = "USB_NOT_CONNECTED"
self.exit_gracefully(msg)
# If there are two devices, it's the root hub and another device (presumably for export)
elif n_usb == 2:
logging.info('usb preflight - external device connected')
msg = "USB_CONNECTED"
self.exit_gracefully(msg)
# Else the result is unexpected
else:
msg = "ERROR_USB_CHECK"
self.exit_gracefully("USB_NOT_CONNECTED")

def set_extracted_device_name(self):
try:
device_and_partitions = subprocess.check_output(
["lsblk", "-o", "TYPE", "--noheadings", DEVICE], stderr=subprocess.PIPE)

# we don't support multiple partitions
partition_count = device_and_partitions.decode('utf-8').split('\n').count('part')
if partition_count > 1:
logging.debug("multiple partitions not supported")
self.exit_gracefully("USB_NO_SUPPORTED_ENCRYPTION")

# set device to /dev/sda if disk is encrypted, /dev/sda1 if partition encrypted
self.device = DEVICE if partition_count == 0 else DEVICE + '1'
except subprocess.CalledProcessError:
msg = "USB_NO_SUPPORTED_ENCRYPTION"
self.exit_gracefully(msg)

def check_luks_volume(self):
logging.info('Checking if volume is luks-encrypted')
try:
# cryptsetup isLuks returns 0 if the device is a luks volume
# subprocess with throw if the device is not luks (rc !=0)
subprocess.check_call(["sudo", "cryptsetup", "isLuks", DEVICE])
msg = "USB_ENCRYPTED"
self.exit_gracefully(msg)
self.set_extracted_device_name()
logging.debug("checking if {} is luks encrypted".format(self.device))
subprocess.check_call(["sudo", "cryptsetup", "isLuks", self.device])
self.exit_gracefully("USB_ENCRYPTED")
except subprocess.CalledProcessError:
msg = "USB_NO_SUPPORTED_ENCRYPTION"
self.exit_gracefully(msg)

def unlock_luks_volume(self, encryption_key):
# the luks device is not already unlocked
logging.info('Unlocking luks volume {}'.format(self.encrypted_device))
if not os.path.exists(os.path.join("/dev/mapper/", self.encrypted_device)):
p = subprocess.Popen(
["sudo", "cryptsetup", "luksOpen", self.device, self.encrypted_device],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
logging.info('Passing key')
p.communicate(input=str.encode(encryption_key, "utf-8"))
rc = p.returncode
if rc != 0:
logging.error('Bad phassphrase for {}'.format(self.encrypted_device))
msg = "USB_BAD_PASSPHRASE"
self.exit_gracefully(msg)
try:
# get the encrypted device name
self.set_extracted_device_name()
luks_header = subprocess.check_output(["sudo", "cryptsetup", "luksDump", self.device])
luks_header_list = luks_header.decode('utf-8').split('\n')
for line in luks_header_list:
items = line.split('\t')
if 'UUID' in items[0]:
self.encrypted_device = 'luks-' + items[1]

# the luks device is not already unlocked
if not os.path.exists(os.path.join("/dev/mapper/", self.encrypted_device)):
logging.debug('Unlocking luks volume {}'.format(self.encrypted_device))
p = subprocess.Popen(
["sudo", "cryptsetup", "luksOpen", self.device, self.encrypted_device],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
logging.debug('Passing key')
p.communicate(input=str.encode(encryption_key, "utf-8"))
rc = p.returncode
if rc != 0:
logging.error('Bad phassphrase for {}'.format(self.encrypted_device))
msg = "USB_BAD_PASSPHRASE"
self.exit_gracefully(msg)
except subprocess.CalledProcessError:
self.exit_gracefully("USB_NO_SUPPORTED_ENCRYPTION")

def mount_volume(self):
# mount target not created
if not os.path.exists(self.mountpoint):
subprocess.check_call(["sudo", "mkdir", self.mountpoint])
try:
logging.info('Mounting {} to {}'.format(self.encrypted_device, self.mountpoint))
subprocess.check_call(
[
"sudo",
"mount",
os.path.join("/dev/mapper/", self.encrypted_device),
self.mountpoint,
]
)
# mount target not created
if not os.path.exists(self.mountpoint):
subprocess.check_call(["sudo", "mkdir", self.mountpoint])

mapped_device_path = os.path.join("/dev/mapper/", self.encrypted_device)
logging.info('Mounting {}'.format(mapped_device_path))
subprocess.check_call(["sudo", "mount", mapped_device_path, self.mountpoint])
subprocess.check_call(["sudo", "chown", "-R", "user:user", self.mountpoint])
except subprocess.CalledProcessError:
# clean up
Expand Down
56 changes: 37 additions & 19 deletions tests/test_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,17 @@
import pytest
import subprocess # noqa: F401
import tempfile
from subprocess import CalledProcessError

from securedrop_export import export

SAMPLE_OUTPUT_NO_PRINTER = b"network beh\nnetwork https\nnetwork ipp\nnetwork ipps\nnetwork http\nnetwork\nnetwork ipp14\nnetwork lpd" # noqa
SAMPLE_OUTPUT_BOTHER_PRINTER = b"network beh\nnetwork https\nnetwork ipp\nnetwork ipps\nnetwork http\nnetwork\nnetwork ipp14\ndirect usb://Brother/HL-L2320D%20series?serial=A00000A000000\nnetwork lpd" # noqa

SAMPLE_OUTPUT_NO_USB = b"Bus 001 Device 001: ID 1d6b:0002 Linux Foundation 2.0 root hub" # noqa
SAMPLE_OUTPUT_USB = b"Bus 001 Device 002: ID 0781:5575 SanDisk Corp.\nBus 001 Device 001: ID 1d6b:0002 Linux Foundation 2.0 root hub" # noqa
SAMPLE_OUTPUT_USB_ERROR = b""
SAMPLE_OUTPUT_USB_ERROR2 = b"h\ne\nl\nl\no"
SAMPLE_OUTPUT_NO_PART = b"disk\ncrypt" # noqa
SAMPLE_OUTPUT_ONE_PART = b"disk\npart\ncrypt" # noqa
SAMPLE_OUTPUT_MULTI_PART = b"disk\npart\npart\npart\ncrypt" # noqa
SAMPLE_OUTPUT_USB = b"/dev/sda" # noqa
TEST_CONFIG = os.path.join(os.path.dirname(__file__), "sd-export-config.json")
BAD_TEST_CONFIG = os.path.join(os.path.dirname(__file__), "sd-export-config-bad.json")
ANOTHER_BAD_TEST_CONFIG = os.path.join(os.path.dirname(__file__), "sd-export-config-bad-2.json")
Expand Down Expand Up @@ -204,11 +205,13 @@ def test_is_not_open_office_file(capsys, open_office_paths):
assert not submission.is_open_office_file(open_office_paths)


@mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_NO_USB)
def test_usb_precheck_connected(mocked_call, capsys):
def test_usb_precheck_disconnected(capsys):
submission = export.SDExport("testfile", TEST_CONFIG)
expected_message = "USB_NOT_CONNECTED"
mocked_exit = mock.patch("export.exit_gracefully", return_value=0)

mock.patch("subprocess.check_output", return_value=CalledProcessError(1, 'check_output'))

with pytest.raises(SystemExit) as sysexit:
submission.check_usb_connected()
mocked_exit.assert_called_once_with(expected_message)
Expand All @@ -219,7 +222,7 @@ def test_usb_precheck_connected(mocked_call, capsys):


@mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_USB)
def test_usb_precheck_disconnected(mocked_call, capsys):
def test_usb_precheck_connected(mocked_call, capsys):
submission = export.SDExport("testfile", TEST_CONFIG)
expected_message = "USB_CONNECTED"
mocked_exit = mock.patch("export.exit_gracefully", return_value=0)
Expand All @@ -232,38 +235,38 @@ def test_usb_precheck_disconnected(mocked_call, capsys):
assert captured.err == "{}\n".format(expected_message)


@mock.patch("subprocess.check_output", return_code=1)
def test_usb_precheck_error(mocked_call, capsys):
@mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_NO_PART)
def test_luks_precheck_encrypted(mocked_call, capsys):
submission = export.SDExport("testfile", TEST_CONFIG)
expected_message = "ERROR_USB_CHECK"
expected_message = "USB_ENCRYPTED"
mocked_exit = mock.patch("export.exit_gracefully", return_value=0)

with pytest.raises(SystemExit) as sysexit:
submission.check_usb_connected()
submission.check_luks_volume()
mocked_exit.assert_called_once_with(expected_message)

assert sysexit.value.code == 0
captured = capsys.readouterr()
assert captured.err == "{}\n".format(expected_message)


@mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_USB_ERROR2)
def test_usb_precheck_error_2(mocked_call, capsys):
@mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_ONE_PART)
def test_luks_precheck_encrypted(mocked_call, capsys):
submission = export.SDExport("testfile", TEST_CONFIG)
expected_message = "ERROR_USB_CHECK"
expected_message = "USB_ENCRYPTED"
mocked_exit = mock.patch("export.exit_gracefully", return_value=0)

with pytest.raises(SystemExit) as sysexit:
submission.check_usb_connected()
submission.check_luks_volume()
mocked_exit.assert_called_once_with(expected_message)

assert sysexit.value.code == 0
captured = capsys.readouterr()
assert captured.err == "{}\n".format(expected_message)


@mock.patch("subprocess.check_call")
@mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_MULTI_PART)
def test_luks_precheck_encrypted(mocked_call, capsys):
submission = export.SDExport("testfile", TEST_CONFIG)
expected_message = "USB_ENCRYPTED"
expected_message = "USB_NO_SUPPORTED_ENCRYPTION"
mocked_exit = mock.patch("export.exit_gracefully", return_value=0)

with pytest.raises(SystemExit) as sysexit:
Expand All @@ -272,3 +275,18 @@ def test_luks_precheck_encrypted(mocked_call, capsys):
assert sysexit.value.code == 0
captured = capsys.readouterr()
assert captured.err == "{}\n".format(expected_message)

@mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_ONE_PART)
def test_luks_precheck_encrypted(mocked_call, capsys):
submission = export.SDExport("testfile", TEST_CONFIG)
expected_message = "USB_NO_SUPPORTED_ENCRYPTION"
mocked_exit = mock.patch("export.exit_gracefully", return_value=0)

mock.patch("subprocess.check_call", return_value=CalledProcessError(1, 'check_call'))

with pytest.raises(SystemExit) as sysexit:
submission.check_luks_volume()
mocked_exit.assert_called_once_with(expected_message)
assert sysexit.value.code == 0
captured = capsys.readouterr()
assert captured.err == "{}\n".format(expected_message)

0 comments on commit 5bb6e85

Please sign in to comment.