diff --git a/README.md b/README.md index 803c3a0a1..bbfebdb14 100644 --- a/README.md +++ b/README.md @@ -4,6 +4,25 @@ Code for exporting and printing files from the SecureDrop Qubes Workstation. +## Supported Printers + +TBD + +## Supported Export Devices + +We support luks-encrypted drives that are either MBR/DOS partitioned or GPT partitioned. If you use `Disks` in Linux to partition your drive, you can [follow these instructions](https://docs.securedrop.org/en/stable/set_up_transfer_and_export_device.html#create-usb-transfer-device) to create a new export device. You can also use [cryptsetup](https://linux.die.net/man/8/cryptsetup) to create a luks-encrypted device with full-disk encryption, for example: + +1. `sudo cryptsetup luksFormat --hash=sha512 --key-size=512 DEVICE` where `DEVICE` is the name of your removable drive, which you can find via `lsblk -p`. + + Make sure `DEVICE` is correct because you will be overwriting its data irrevocably. + +2. `sudo cryptsetup luksOpen /dev/sdb encrypted_device` + +3. `sudo mkfs.ext4 /dev/mapper/encrypted_device` + +4. `sudo cryptsetup luksClose /dev/mapper/encrypted_device` + +We do not yet support drives that use full-disk encryption with VeraCrypt. ## Export Archive Format diff --git a/securedrop_export/export.py b/securedrop_export/export.py index febd54efa..c0b93d447 100755 --- a/securedrop_export/export.py +++ b/securedrop_export/export.py @@ -14,7 +14,7 @@ PRINTER_NAME = "sdw-printer" PRINTER_WAIT_TIMEOUT = 60 -DEVICE = "/dev/sda1" +DEVICE = "/dev/sda" MOUNTPOINT = "/media/usb" ENCRYPTED_DEVICE = "encrypted_volume" BRLASER_DRIVER = "/usr/share/cups/drv/brlaser.drv" @@ -164,76 +164,84 @@ def extract_tarball(self): self.exit_gracefully(msg) def check_usb_connected(self): - # If the USB is not attached via qvm-usb attach, lsusb will return empty string and a # return code of 1 logging.info('Performing usb preflight') try: - p = subprocess.check_output(["lsusb", "-s", "{}:".format(self.pci_bus_id)]) - logging.info("lsusb -s {} : {}".format(self.pci_bus_id, p.decode("utf-8"))) + subprocess.check_output( + ["lsblk", "-p", "-o", "KNAME", "--noheadings", "--inverse", DEVICE], + stderr=subprocess.PIPE) + self.exit_gracefully("USB_CONNECTED") except subprocess.CalledProcessError: - msg = "ERROR_USB_CONFIGURATION" - self.exit_gracefully(msg) - n_usb = len(p.decode("utf-8").rstrip().split("\n")) - # If there is one device, it is the root hub. - if n_usb == 1: - logging.info('usb preflight - no external devices connected') - msg = "USB_NOT_CONNECTED" - self.exit_gracefully(msg) - # If there are two devices, it's the root hub and another device (presumably for export) - elif n_usb == 2: - logging.info('usb preflight - external device connected') - msg = "USB_CONNECTED" - self.exit_gracefully(msg) - # Else the result is unexpected - else: - msg = "ERROR_USB_CHECK" + self.exit_gracefully("USB_NOT_CONNECTED") + + def set_extracted_device_name(self): + try: + device_and_partitions = subprocess.check_output( + ["lsblk", "-o", "TYPE", "--noheadings", DEVICE], stderr=subprocess.PIPE) + + # we don't support multiple partitions + partition_count = device_and_partitions.decode('utf-8').split('\n').count('part') + if partition_count > 1: + logging.debug("multiple partitions not supported") + self.exit_gracefully("USB_NO_SUPPORTED_ENCRYPTION") + + # set device to /dev/sda if disk is encrypted, /dev/sda1 if partition encrypted + self.device = DEVICE if partition_count == 0 else DEVICE + '1' + except subprocess.CalledProcessError: + msg = "USB_NO_SUPPORTED_ENCRYPTION" self.exit_gracefully(msg) def check_luks_volume(self): logging.info('Checking if volume is luks-encrypted') try: - # cryptsetup isLuks returns 0 if the device is a luks volume - # subprocess with throw if the device is not luks (rc !=0) - subprocess.check_call(["sudo", "cryptsetup", "isLuks", DEVICE]) - msg = "USB_ENCRYPTED" - self.exit_gracefully(msg) + self.set_extracted_device_name() + logging.debug("checking if {} is luks encrypted".format(self.device)) + subprocess.check_call(["sudo", "cryptsetup", "isLuks", self.device]) + self.exit_gracefully("USB_ENCRYPTED") except subprocess.CalledProcessError: msg = "USB_NO_SUPPORTED_ENCRYPTION" self.exit_gracefully(msg) def unlock_luks_volume(self, encryption_key): - # the luks device is not already unlocked - logging.info('Unlocking luks volume {}'.format(self.encrypted_device)) - if not os.path.exists(os.path.join("/dev/mapper/", self.encrypted_device)): - p = subprocess.Popen( - ["sudo", "cryptsetup", "luksOpen", self.device, self.encrypted_device], - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE - ) - logging.info('Passing key') - p.communicate(input=str.encode(encryption_key, "utf-8")) - rc = p.returncode - if rc != 0: - logging.error('Bad phassphrase for {}'.format(self.encrypted_device)) - msg = "USB_BAD_PASSPHRASE" - self.exit_gracefully(msg) + try: + # get the encrypted device name + self.set_extracted_device_name() + luks_header = subprocess.check_output(["sudo", "cryptsetup", "luksDump", self.device]) + luks_header_list = luks_header.decode('utf-8').split('\n') + for line in luks_header_list: + items = line.split('\t') + if 'UUID' in items[0]: + self.encrypted_device = 'luks-' + items[1] + + # the luks device is not already unlocked + if not os.path.exists(os.path.join("/dev/mapper/", self.encrypted_device)): + logging.debug('Unlocking luks volume {}'.format(self.encrypted_device)) + p = subprocess.Popen( + ["sudo", "cryptsetup", "luksOpen", self.device, self.encrypted_device], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE + ) + logging.debug('Passing key') + p.communicate(input=str.encode(encryption_key, "utf-8")) + rc = p.returncode + if rc != 0: + logging.error('Bad phassphrase for {}'.format(self.encrypted_device)) + msg = "USB_BAD_PASSPHRASE" + self.exit_gracefully(msg) + except subprocess.CalledProcessError: + self.exit_gracefully("USB_NO_SUPPORTED_ENCRYPTION") def mount_volume(self): - # mount target not created - if not os.path.exists(self.mountpoint): - subprocess.check_call(["sudo", "mkdir", self.mountpoint]) try: - logging.info('Mounting {} to {}'.format(self.encrypted_device, self.mountpoint)) - subprocess.check_call( - [ - "sudo", - "mount", - os.path.join("/dev/mapper/", self.encrypted_device), - self.mountpoint, - ] - ) + # mount target not created + if not os.path.exists(self.mountpoint): + subprocess.check_call(["sudo", "mkdir", self.mountpoint]) + + mapped_device_path = os.path.join("/dev/mapper/", self.encrypted_device) + logging.info('Mounting {}'.format(mapped_device_path)) + subprocess.check_call(["sudo", "mount", mapped_device_path, self.mountpoint]) subprocess.check_call(["sudo", "chown", "-R", "user:user", self.mountpoint]) except subprocess.CalledProcessError: # clean up diff --git a/tests/test_export.py b/tests/test_export.py index 7c6e45244..94b6dd63a 100644 --- a/tests/test_export.py +++ b/tests/test_export.py @@ -4,16 +4,17 @@ import pytest import subprocess # noqa: F401 import tempfile +from subprocess import CalledProcessError from securedrop_export import export SAMPLE_OUTPUT_NO_PRINTER = b"network beh\nnetwork https\nnetwork ipp\nnetwork ipps\nnetwork http\nnetwork\nnetwork ipp14\nnetwork lpd" # noqa SAMPLE_OUTPUT_BOTHER_PRINTER = b"network beh\nnetwork https\nnetwork ipp\nnetwork ipps\nnetwork http\nnetwork\nnetwork ipp14\ndirect usb://Brother/HL-L2320D%20series?serial=A00000A000000\nnetwork lpd" # noqa -SAMPLE_OUTPUT_NO_USB = b"Bus 001 Device 001: ID 1d6b:0002 Linux Foundation 2.0 root hub" # noqa -SAMPLE_OUTPUT_USB = b"Bus 001 Device 002: ID 0781:5575 SanDisk Corp.\nBus 001 Device 001: ID 1d6b:0002 Linux Foundation 2.0 root hub" # noqa -SAMPLE_OUTPUT_USB_ERROR = b"" -SAMPLE_OUTPUT_USB_ERROR2 = b"h\ne\nl\nl\no" +SAMPLE_OUTPUT_NO_PART = b"disk\ncrypt" # noqa +SAMPLE_OUTPUT_ONE_PART = b"disk\npart\ncrypt" # noqa +SAMPLE_OUTPUT_MULTI_PART = b"disk\npart\npart\npart\ncrypt" # noqa +SAMPLE_OUTPUT_USB = b"/dev/sda" # noqa TEST_CONFIG = os.path.join(os.path.dirname(__file__), "sd-export-config.json") BAD_TEST_CONFIG = os.path.join(os.path.dirname(__file__), "sd-export-config-bad.json") ANOTHER_BAD_TEST_CONFIG = os.path.join(os.path.dirname(__file__), "sd-export-config-bad-2.json") @@ -204,11 +205,13 @@ def test_is_not_open_office_file(capsys, open_office_paths): assert not submission.is_open_office_file(open_office_paths) -@mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_NO_USB) -def test_usb_precheck_connected(mocked_call, capsys): +def test_usb_precheck_disconnected(capsys): submission = export.SDExport("testfile", TEST_CONFIG) expected_message = "USB_NOT_CONNECTED" mocked_exit = mock.patch("export.exit_gracefully", return_value=0) + + mock.patch("subprocess.check_output", return_value=CalledProcessError(1, 'check_output')) + with pytest.raises(SystemExit) as sysexit: submission.check_usb_connected() mocked_exit.assert_called_once_with(expected_message) @@ -219,7 +222,7 @@ def test_usb_precheck_connected(mocked_call, capsys): @mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_USB) -def test_usb_precheck_disconnected(mocked_call, capsys): +def test_usb_precheck_connected(mocked_call, capsys): submission = export.SDExport("testfile", TEST_CONFIG) expected_message = "USB_CONNECTED" mocked_exit = mock.patch("export.exit_gracefully", return_value=0) @@ -232,38 +235,38 @@ def test_usb_precheck_disconnected(mocked_call, capsys): assert captured.err == "{}\n".format(expected_message) -@mock.patch("subprocess.check_output", return_code=1) -def test_usb_precheck_error(mocked_call, capsys): +@mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_NO_PART) +def test_luks_precheck_encrypted(mocked_call, capsys): submission = export.SDExport("testfile", TEST_CONFIG) - expected_message = "ERROR_USB_CHECK" + expected_message = "USB_ENCRYPTED" mocked_exit = mock.patch("export.exit_gracefully", return_value=0) + with pytest.raises(SystemExit) as sysexit: - submission.check_usb_connected() + submission.check_luks_volume() mocked_exit.assert_called_once_with(expected_message) - assert sysexit.value.code == 0 captured = capsys.readouterr() assert captured.err == "{}\n".format(expected_message) -@mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_USB_ERROR2) -def test_usb_precheck_error_2(mocked_call, capsys): +@mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_ONE_PART) +def test_luks_precheck_encrypted(mocked_call, capsys): submission = export.SDExport("testfile", TEST_CONFIG) - expected_message = "ERROR_USB_CHECK" + expected_message = "USB_ENCRYPTED" mocked_exit = mock.patch("export.exit_gracefully", return_value=0) + with pytest.raises(SystemExit) as sysexit: - submission.check_usb_connected() + submission.check_luks_volume() mocked_exit.assert_called_once_with(expected_message) - assert sysexit.value.code == 0 captured = capsys.readouterr() assert captured.err == "{}\n".format(expected_message) -@mock.patch("subprocess.check_call") +@mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_MULTI_PART) def test_luks_precheck_encrypted(mocked_call, capsys): submission = export.SDExport("testfile", TEST_CONFIG) - expected_message = "USB_ENCRYPTED" + expected_message = "USB_NO_SUPPORTED_ENCRYPTION" mocked_exit = mock.patch("export.exit_gracefully", return_value=0) with pytest.raises(SystemExit) as sysexit: @@ -272,3 +275,18 @@ def test_luks_precheck_encrypted(mocked_call, capsys): assert sysexit.value.code == 0 captured = capsys.readouterr() assert captured.err == "{}\n".format(expected_message) + +@mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_ONE_PART) +def test_luks_precheck_encrypted(mocked_call, capsys): + submission = export.SDExport("testfile", TEST_CONFIG) + expected_message = "USB_NO_SUPPORTED_ENCRYPTION" + mocked_exit = mock.patch("export.exit_gracefully", return_value=0) + + mock.patch("subprocess.check_call", return_value=CalledProcessError(1, 'check_call')) + + with pytest.raises(SystemExit) as sysexit: + submission.check_luks_volume() + mocked_exit.assert_called_once_with(expected_message) + assert sysexit.value.code == 0 + captured = capsys.readouterr() + assert captured.err == "{}\n".format(expected_message)