Skip to content

Commit

Permalink
Add metadata objects and tests
Browse files Browse the repository at this point in the history
* Ensure more reliable and DRY parsing of sd-export metadata.json
* Use notify-send to provide user feedback on export status
  • Loading branch information
emkll committed Jul 3, 2019
1 parent ff32328 commit 5f1d51b
Show file tree
Hide file tree
Showing 3 changed files with 175 additions and 60 deletions.
9 changes: 9 additions & 0 deletions dom0/sd-export-files.sls
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,12 @@ sd-export-file-format:
- require:
- file: sd-export-file-format
- file: sd-export-desktop-file

sd-export-securedrop-icon:
file.managed:
- name: /usr/share/securedrop/icons/sd-logo.png
- source: salt://sd/sd-proxy/logo-small.png
- user: root
- group: root
- mode: 644
- makedirs: True
128 changes: 70 additions & 58 deletions sd-export/send-to-usb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import datetime
import json
import os
import re
import subprocess
import sys
import tarfile
Expand All @@ -28,59 +27,62 @@ def exit_gracefully(msg, e=False):
sys.stderr.write("\n")
# exit with 0 return code otherwise the os will attempt to open
# the file with another application
popup_message("Export error: {}".format(msg))
sys.exit(0)


def extract_tarball():
def popup_message(msg):
try:
with tarfile.open(SUBMISSION_ARCHIVE) as tar:
tar.extractall(SUBMISSION_TMPDIR)
except Exception as e:
msg = "Error opening export bundle: "
subprocess.check_call([
"notify-send",
"--expire-time", "3000",
"--icon", "/usr/share/securedrop/icons/sd-logo.png",
"SecureDrop: {}".format(msg)
])
except subprocess.CalledProcessError as e:
msg = "Error sending notification:"
exit_gracefully(msg, e=e)


def retrieve_export_method():
try:
metadata_filepath = os.path.join(
SUBMISSION_TMPDIR, SUBMISSION_DIRNAME, "metadata.json"
)
with open(metadata_filepath) as json_data:
data = json.load(json_data)
export_method = data["device"]
except Exception as e:
msg = "Error parsing metadata."
exit_gracefully(msg, e=e)
class Metadata(object):
"""
Object to parse, validate and store json metadata from the sd-export archive.
"""

# we only support printers and encrypted disks as well as their test methods
# for now
if export_method not in ["disk", "disk-test", "printer", "printer-test"]:
msg = "Unsupported export device."
exit_gracefully(msg)
METADATA_FILE = "metadata.json"
SUPPORTED_EXPORT_METHODS = ["disk", "printer", "printer-test"]
SUPPORTED_ENCRYPTION_METHODS = ["luks"]

return export_method
def __init__(self, archive_path):
self.metadata_path = os.path.join(archive_path, self.METADATA_FILE)
try:
with open(self.metadata_path) as f:
json_config = json.loads(f.read())
self.export_method = json_config.get("device", None)
self.encryption_method = json_config.get("encryption_method", None)
self.encryption_key = json_config.get("encryption_key", None)
except Exception as e:
msg = "Error parsing metadata: "
exit_gracefully(msg, e=e)

def is_valid(self):
if self.export_method not in self.SUPPORTED_EXPORT_METHODS:
return False

if self.export_method == "disk":
if self.encryption_method not in self.SUPPORTED_ENCRYPTION_METHODS:
return False
return True


def retrieve_encryption_metadata():
def extract_tarball():
try:
metadata_filepath = os.path.join(
SUBMISSION_TMPDIR, SUBMISSION_DIRNAME, "metadata.json"
)
with open(metadata_filepath) as json_data:
data = json.load(json_data)
encryption_method = data["encryption-method"]
encryption_key = data["encryption-key"]
with tarfile.open(SUBMISSION_ARCHIVE) as tar:
tar.extractall(SUBMISSION_TMPDIR)
except Exception as e:
msg = "Error parsing metadata."
msg = "Error opening export bundle: "
exit_gracefully(msg, e=e)

# we only support luks for now
if encryption_method != "luks":
msg = "Unsupported export encryption."
exit_gracefully(msg)

return (encryption_method, encryption_key)


def unlock_luks_volume(encryption_key):
# the luks device is not already unlocked
Expand Down Expand Up @@ -129,6 +131,7 @@ def copy_submission():
SUBMISSION_TMPDIR, SUBMISSION_DIRNAME, "export_data/"
)
subprocess.check_call(["cp", "-r", export_data, TARGET_DIRNAME_path])
popup_message("Files exported successfully to disk.")
except (subprocess.CalledProcessError, OSError) as e:
msg = "Error writing to disk:"
exit_gracefully(msg, e=e)
Expand Down Expand Up @@ -211,14 +214,19 @@ def setup_printer(printer_name, printer_uri, printer_ppd):

def print_test_page(printer_name):
print_file(printer_name, "/usr/share/cups/data/testprint")
popup_message("Printing test page")


def print_all_files(printer_name):
files_path = os.path.join(SUBMISSION_TMPDIR, SUBMISSION_DIRNAME, "export_data/")
files = os.listdir(files_path)
print_count = 0
for f in files:
file_path = os.path.join(files_path, f)
print_file(printer_name, file_path)
print_count += 1
msg = "Printing document {} of {}".format(print_count, len(files))
popup_message(msg)


def print_file(printer_name, file_to_print):
Expand All @@ -233,25 +241,29 @@ def print_file(printer_name, file_to_print):

def main():
extract_tarball()
export_method = retrieve_export_method()
if export_method == "disk":
# exports all documents in the archive to luks-encrypted volume
encryption_method, encryption_key = retrieve_encryption_metadata()
unlock_luks_volume(encryption_key)
mount_volume()
copy_submission()
elif export_method == "printer":
# prints all documents in the archive
printer_uri = get_printer_uri()
printer_ppd = install_printer_ppd(printer_uri)
setup_printer(PRINTER_NAME, printer_uri, printer_ppd)
print_all_files(PRINTER_NAME)
elif export_method == "printer-test":
# Prints a test page to ensure the printer is functional
printer_uri = get_printer_uri()
printer_ppd = install_printer_ppd(printer_uri)
setup_printer(PRINTER_NAME, printer_uri, printer_ppd)
print_test_page(PRINTER_NAME)

archive_path = os.path.join(SUBMISSION_TMPDIR, SUBMISSION_DIRNAME)
archive_metadata = Metadata(archive_path)
if archive_metadata.is_valid():
if archive_metadata.export_method == "disk":
# exports all documents in the archive to luks-encrypted volume
unlock_luks_volume(archive_metadata.encryption_key)
mount_volume()
copy_submission()
elif archive_metadata.export_method == "printer":
# prints all documents in the archive
printer_uri = get_printer_uri()
printer_ppd = install_printer_ppd(printer_uri)
setup_printer(PRINTER_NAME, printer_uri, printer_ppd)
print_all_files(PRINTER_NAME)
elif archive_metadata.export_method == "printer-test":
# Prints a test page to ensure the printer is functional
printer_uri = get_printer_uri()
printer_ppd = install_printer_ppd(printer_uri)
setup_printer(PRINTER_NAME, printer_uri, printer_ppd)
print_test_page(PRINTER_NAME)
else:
exit_gracefully("Archive metadata is invalid")


if __name__ == "__main__":
Expand Down
98 changes: 96 additions & 2 deletions sd-export/test_export.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
from unittest import mock

import imp
import os
import pytest
import tempfile


SAMPLE_OUTPUT_NO_PRINTER = b"network beh\nnetwork https\nnetwork ipp\nnetwork ipps\nnetwork http\nnetwork\nnetwork ipp14\nnetwork lpd" # noqa
SAMPLE_OUTPUT_BOTHER_PRINTER = b"network beh\nnetwork https\nnetwork ipp\nnetwork ipps\nnetwork http\nnetwork\nnetwork ipp14\ndirect usb://Brother/HL-L2320D%20series?serial=A00000A000000\nnetwork lpd" # noqa


# This below stanza is only necessary because the export code is not
Expand All @@ -17,7 +24,7 @@ def test_exit_gracefully_no_exception(capsys):

with pytest.raises(SystemExit) as sysexit:
securedropexport.exit_gracefully(test_msg)

# A graceful exit means a return code of 0
assert sysexit.value.code == 0

Expand All @@ -38,4 +45,91 @@ def test_exit_gracefully_exception(capsys):

captured = capsys.readouterr()
assert captured.err == "{}\n<unknown exception>\n".format(test_msg)
assert captured.out == ""
assert captured.out == ""


def test_empty_config(capsys):
temp_folder = tempfile.mkdtemp()
metadata = os.path.join(temp_folder, securedropexport.Metadata.METADATA_FILE)
with open(metadata, "w") as f:
f.write("{}")
config = securedropexport.Metadata(temp_folder)
assert not config.is_valid()


def test_valid_printer_test_config(capsys):
temp_folder = tempfile.mkdtemp()
metadata = os.path.join(temp_folder, securedropexport.Metadata.METADATA_FILE)
with open(metadata, "w") as f:
f.write('{"device": "printer-test"}')
config = securedropexport.Metadata(temp_folder)
assert config.is_valid()
assert config.encryption_key is None
assert config.encryption_method is None


def test_valid_printer_config(capsys):
temp_folder = tempfile.mkdtemp()
metadata = os.path.join(temp_folder, securedropexport.Metadata.METADATA_FILE)
with open(metadata, "w") as f:
f.write('{"device": "printer"}')
config = securedropexport.Metadata(temp_folder)
assert config.is_valid()
assert config.encryption_key is None
assert config.encryption_method is None


def test_invalid_encryption_config(capsys):
temp_folder = tempfile.mkdtemp()
metadata = os.path.join(temp_folder, securedropexport.Metadata.METADATA_FILE)
with open(metadata, "w") as f:
f.write(
'{"device": "disk", "encryption_method": "base64", "encryption_key": "hunter1"}'
)
config = securedropexport.Metadata(temp_folder)
assert config.encryption_key == "hunter1"
assert config.encryption_method == "base64"
assert not config.is_valid()


def test_valid_encryption_config(capsys):
temp_folder = tempfile.mkdtemp()
metadata = os.path.join(temp_folder, securedropexport.Metadata.METADATA_FILE)
with open(metadata, "w") as f:
f.write(
'{"device": "disk", "encryption_method": "luks", "encryption_key": "hunter1"}'
)
config = securedropexport.Metadata(temp_folder)
assert config.encryption_key == "hunter1"
assert config.encryption_method == "luks"
assert config.is_valid()


@mock.patch("subprocess.check_call")
def test_popup_message(mocked_call):
securedropexport.popup_message("hello!")
mocked_call.assert_called_once_with(
["notify-send", "--expire-time", "3000", "SecureDrop: hello!"]
)


@mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_BOTHER_PRINTER)
def test_get_good_printer_uri(mocked_call):
result = securedropexport.get_printer_uri()
assert result == "usb://Brother/HL-L2320D%20series?serial=A00000A000000"


@mock.patch("subprocess.check_output", return_value=SAMPLE_OUTPUT_NO_PRINTER)
def test_get_bad_printer_uri(mocked_call, capsys):
expected_message = "USB Printer not found"
mocked_exit = mock.patch("securedropexport.exit_gracefully", return_value=0)

with pytest.raises(SystemExit) as sysexit:
result = securedropexport.get_printer_uri()
assert result == ""
mocked_exit.assert_called_once_with(expected_message)

assert sysexit.value.code == 0
captured = capsys.readouterr()
assert captured.err == "{}\n".format(expected_message)
assert captured.out == ""

0 comments on commit 5f1d51b

Please sign in to comment.