Skip to content
This repository has been archived by the owner on Jan 5, 2024. It is now read-only.

Detect different kinds of storage devices attached to sd-export-usb #16

Merged
merged 8 commits into from
Nov 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,25 @@

Code for exporting and printing files from the SecureDrop Qubes Workstation.

## Supported Printers

TBD

## Supported Export Devices

We support luks-encrypted drives that are either MBR/DOS partitioned or GPT partitioned. If you use `Disks` in Linux to partition your drive, you can [follow these instructions](https://docs.securedrop.org/en/stable/set_up_transfer_and_export_device.html#create-usb-transfer-device) to create a new export device. You can also use [cryptsetup](https://linux.die.net/man/8/cryptsetup) to create a luks-encrypted device with full-disk encryption, for example:

1. `sudo cryptsetup luksFormat --hash=sha512 --key-size=512 DEVICE` where `DEVICE` is the name of your removable drive, which you can find via `lsblk -p`.

Make sure `DEVICE` is correct because you will be overwriting its data irrevocably.

2. `sudo cryptsetup luksOpen /dev/sdb encrypted_device`

3. `sudo mkfs.ext4 /dev/mapper/encrypted_device`

sssoleileraaa marked this conversation as resolved.
Show resolved Hide resolved
4. `sudo cryptsetup luksClose /dev/mapper/encrypted_device`

We do not yet support drives that use full-disk encryption with VeraCrypt.

## Export Archive Format

Expand Down
114 changes: 61 additions & 53 deletions securedrop_export/export.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

PRINTER_NAME = "sdw-printer"
PRINTER_WAIT_TIMEOUT = 60
DEVICE = "/dev/sda1"
DEVICE = "/dev/sda"
MOUNTPOINT = "/media/usb"
ENCRYPTED_DEVICE = "encrypted_volume"
BRLASER_DRIVER = "/usr/share/cups/drv/brlaser.drv"
Expand Down Expand Up @@ -164,76 +164,84 @@ def extract_tarball(self):
self.exit_gracefully(msg)

def check_usb_connected(self):

# If the USB is not attached via qvm-usb attach, lsusb will return empty string and a
# return code of 1
logging.info('Performing usb preflight')
try:
p = subprocess.check_output(["lsusb", "-s", "{}:".format(self.pci_bus_id)])
logging.info("lsusb -s {} : {}".format(self.pci_bus_id, p.decode("utf-8")))
subprocess.check_output(
["lsblk", "-p", "-o", "KNAME", "--noheadings", "--inverse", DEVICE],
stderr=subprocess.PIPE)
self.exit_gracefully("USB_CONNECTED")
except subprocess.CalledProcessError:
msg = "ERROR_USB_CONFIGURATION"
self.exit_gracefully(msg)
n_usb = len(p.decode("utf-8").rstrip().split("\n"))
# If there is one device, it is the root hub.
if n_usb == 1:
logging.info('usb preflight - no external devices connected')
msg = "USB_NOT_CONNECTED"
self.exit_gracefully(msg)
# If there are two devices, it's the root hub and another device (presumably for export)
elif n_usb == 2:
logging.info('usb preflight - external device connected')
msg = "USB_CONNECTED"
self.exit_gracefully(msg)
# Else the result is unexpected
else:
msg = "ERROR_USB_CHECK"
self.exit_gracefully("USB_NOT_CONNECTED")

def set_extracted_device_name(self):
try:
device_and_partitions = subprocess.check_output(
["lsblk", "-o", "TYPE", "--noheadings", DEVICE], stderr=subprocess.PIPE)

# we don't support multiple partitions
partition_count = device_and_partitions.decode('utf-8').split('\n').count('part')
if partition_count > 1:
logging.debug("multiple partitions not supported")
self.exit_gracefully("USB_NO_SUPPORTED_ENCRYPTION")

# set device to /dev/sda if disk is encrypted, /dev/sda1 if partition encrypted
self.device = DEVICE if partition_count == 0 else DEVICE + '1'
except subprocess.CalledProcessError:
msg = "USB_NO_SUPPORTED_ENCRYPTION"
self.exit_gracefully(msg)

def check_luks_volume(self):
logging.info('Checking if volume is luks-encrypted')
try:
# cryptsetup isLuks returns 0 if the device is a luks volume
# subprocess with throw if the device is not luks (rc !=0)
subprocess.check_call(["sudo", "cryptsetup", "isLuks", DEVICE])
msg = "USB_ENCRYPTED"
self.exit_gracefully(msg)
self.set_extracted_device_name()
logging.debug("checking if {} is luks encrypted".format(self.device))
subprocess.check_call(["sudo", "cryptsetup", "isLuks", self.device])
self.exit_gracefully("USB_ENCRYPTED")
except subprocess.CalledProcessError:
msg = "USB_NO_SUPPORTED_ENCRYPTION"
self.exit_gracefully(msg)

def unlock_luks_volume(self, encryption_key):
# the luks device is not already unlocked
logging.info('Unlocking luks volume {}'.format(self.encrypted_device))
if not os.path.exists(os.path.join("/dev/mapper/", self.encrypted_device)):
p = subprocess.Popen(
["sudo", "cryptsetup", "luksOpen", self.device, self.encrypted_device],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
logging.info('Passing key')
p.communicate(input=str.encode(encryption_key, "utf-8"))
rc = p.returncode
if rc != 0:
logging.error('Bad phassphrase for {}'.format(self.encrypted_device))
msg = "USB_BAD_PASSPHRASE"
self.exit_gracefully(msg)
try:
# get the encrypted device name
self.set_extracted_device_name()
luks_header = subprocess.check_output(["sudo", "cryptsetup", "luksDump", self.device])
sssoleileraaa marked this conversation as resolved.
Show resolved Hide resolved
luks_header_list = luks_header.decode('utf-8').split('\n')
for line in luks_header_list:
items = line.split('\t')
if 'UUID' in items[0]:
self.encrypted_device = 'luks-' + items[1]

# the luks device is not already unlocked
if not os.path.exists(os.path.join("/dev/mapper/", self.encrypted_device)):
logging.debug('Unlocking luks volume {}'.format(self.encrypted_device))
p = subprocess.Popen(
["sudo", "cryptsetup", "luksOpen", self.device, self.encrypted_device],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
logging.debug('Passing key')
p.communicate(input=str.encode(encryption_key, "utf-8"))
rc = p.returncode
if rc != 0:
logging.error('Bad phassphrase for {}'.format(self.encrypted_device))
msg = "USB_BAD_PASSPHRASE"
self.exit_gracefully(msg)
except subprocess.CalledProcessError:
self.exit_gracefully("USB_NO_SUPPORTED_ENCRYPTION")

def mount_volume(self):
# mount target not created
if not os.path.exists(self.mountpoint):
subprocess.check_call(["sudo", "mkdir", self.mountpoint])
try:
logging.info('Mounting {} to {}'.format(self.encrypted_device, self.mountpoint))
subprocess.check_call(
[
"sudo",
"mount",
os.path.join("/dev/mapper/", self.encrypted_device),
self.mountpoint,
]
)
# mount target not created
if not os.path.exists(self.mountpoint):
subprocess.check_call(["sudo", "mkdir", self.mountpoint])

mapped_device_path = os.path.join("/dev/mapper/", self.encrypted_device)
logging.info('Mounting {}'.format(mapped_device_path))
subprocess.check_call(["sudo", "mount", mapped_device_path, self.mountpoint])
subprocess.check_call(["sudo", "chown", "-R", "user:user", self.mountpoint])
except subprocess.CalledProcessError:
# clean up
Expand Down
56 changes: 37 additions & 19 deletions tests/test_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,17 @@
import pytest
import subprocess # noqa: F401
import tempfile
from subprocess import CalledProcessError

from securedrop_export import export

SAMPLE_OUTPUT_NO_PRINTER = b"network beh\nnetwork https\nnetwork ipp\nnetwork ipps\nnetwork http\nnetwork\nnetwork ipp14\nnetwork lpd" # noqa
SAMPLE_OUTPUT_BOTHER_PRINTER = b"network beh\nnetwork https\nnetwork ipp\nnetwork ipps\nnetwork http\nnetwork\nnetwork ipp14\ndirect usb://Brother/HL-L2320D%20series?serial=A00000A000000\nnetwork lpd" # noqa

SAMPLE_OUTPUT_NO_USB = b"Bus 001 Device 001: ID 1d6b:0002 Linux Foundation 2.0 root hub" # noqa
SAMPLE_OUTPUT_USB = b"Bus 001 Device 002: ID 0781:5575 SanDisk Corp.\nBus 001 Device 001: ID 1d6b:0002 Linux Foundation 2.0 root hub" # noqa
SAMPLE_OUTPUT_USB_ERROR = b""
SAMPLE_OUTPUT_USB_ERROR2 = b"h\ne\nl\nl\no"
SAMPLE_OUTPUT_NO_PART = b"disk\ncrypt" # noqa
SAMPLE_OUTPUT_ONE_PART = b"disk\npart\ncrypt" # noqa
SAMPLE_OUTPUT_MULTI_PART = b"disk\npart\npart\npart\ncrypt" # noqa
SAMPLE_OUTPUT_USB = b"/dev/sda" # noqa
TEST_CONFIG = os.path.join(os.path.dirname(__file__), "sd-export-config.json")
BAD_TEST_CONFIG = os.path.join(os.path.dirname(__file__), "sd-export-config-bad.json")
ANOTHER_BAD_TEST_CONFIG = os.path.join(os.path.dirname(__file__), "sd-export-config-bad-2.json")
Expand Down Expand Up @@ -204,11 +205,13 @@ def test_is_not_open_office_file(capsys, open_office_paths):
assert not submission.is_open_office_file(open_office_paths)


@mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_NO_USB)
def test_usb_precheck_connected(mocked_call, capsys):
def test_usb_precheck_disconnected(capsys):
submission = export.SDExport("testfile", TEST_CONFIG)
expected_message = "USB_NOT_CONNECTED"
mocked_exit = mock.patch("export.exit_gracefully", return_value=0)

mock.patch("subprocess.check_output", return_value=CalledProcessError(1, 'check_output'))

with pytest.raises(SystemExit) as sysexit:
submission.check_usb_connected()
mocked_exit.assert_called_once_with(expected_message)
Expand All @@ -219,7 +222,7 @@ def test_usb_precheck_connected(mocked_call, capsys):


@mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_USB)
def test_usb_precheck_disconnected(mocked_call, capsys):
def test_usb_precheck_connected(mocked_call, capsys):
submission = export.SDExport("testfile", TEST_CONFIG)
expected_message = "USB_CONNECTED"
mocked_exit = mock.patch("export.exit_gracefully", return_value=0)
Expand All @@ -232,38 +235,38 @@ def test_usb_precheck_disconnected(mocked_call, capsys):
assert captured.err == "{}\n".format(expected_message)


@mock.patch("subprocess.check_output", return_code=1)
def test_usb_precheck_error(mocked_call, capsys):
@mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_NO_PART)
def test_luks_precheck_encrypted(mocked_call, capsys):
submission = export.SDExport("testfile", TEST_CONFIG)
expected_message = "ERROR_USB_CHECK"
expected_message = "USB_ENCRYPTED"
mocked_exit = mock.patch("export.exit_gracefully", return_value=0)

with pytest.raises(SystemExit) as sysexit:
submission.check_usb_connected()
submission.check_luks_volume()
mocked_exit.assert_called_once_with(expected_message)

assert sysexit.value.code == 0
captured = capsys.readouterr()
assert captured.err == "{}\n".format(expected_message)


@mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_USB_ERROR2)
def test_usb_precheck_error_2(mocked_call, capsys):
@mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_ONE_PART)
def test_luks_precheck_encrypted(mocked_call, capsys):
submission = export.SDExport("testfile", TEST_CONFIG)
expected_message = "ERROR_USB_CHECK"
expected_message = "USB_ENCRYPTED"
mocked_exit = mock.patch("export.exit_gracefully", return_value=0)

with pytest.raises(SystemExit) as sysexit:
submission.check_usb_connected()
submission.check_luks_volume()
mocked_exit.assert_called_once_with(expected_message)

assert sysexit.value.code == 0
captured = capsys.readouterr()
assert captured.err == "{}\n".format(expected_message)


@mock.patch("subprocess.check_call")
@mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_MULTI_PART)
def test_luks_precheck_encrypted(mocked_call, capsys):
submission = export.SDExport("testfile", TEST_CONFIG)
expected_message = "USB_ENCRYPTED"
expected_message = "USB_NO_SUPPORTED_ENCRYPTION"
mocked_exit = mock.patch("export.exit_gracefully", return_value=0)

with pytest.raises(SystemExit) as sysexit:
Expand All @@ -272,3 +275,18 @@ def test_luks_precheck_encrypted(mocked_call, capsys):
assert sysexit.value.code == 0
captured = capsys.readouterr()
assert captured.err == "{}\n".format(expected_message)

@mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_ONE_PART)
def test_luks_precheck_encrypted(mocked_call, capsys):
submission = export.SDExport("testfile", TEST_CONFIG)
expected_message = "USB_NO_SUPPORTED_ENCRYPTION"
mocked_exit = mock.patch("export.exit_gracefully", return_value=0)

mock.patch("subprocess.check_call", return_value=CalledProcessError(1, 'check_call'))

with pytest.raises(SystemExit) as sysexit:
submission.check_luks_volume()
mocked_exit.assert_called_once_with(expected_message)
assert sysexit.value.code == 0
captured = capsys.readouterr()
assert captured.err == "{}\n".format(expected_message)