Skip to content
This repository has been archived by the owner on Jan 5, 2024. It is now read-only.

Commit

Permalink
Apply black and linter. Improve test coverage. TimeoutException no lo…
Browse files Browse the repository at this point in the history
…nger inherits from ExportException. Small fix to export metadata and get_partitioned_devices.
  • Loading branch information
rocodes committed Jan 12, 2023
1 parent 5e398ad commit 65920dc
Show file tree
Hide file tree
Showing 23 changed files with 1,058 additions and 450 deletions.
80 changes: 36 additions & 44 deletions securedrop_export/archive.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,6 @@
import json
import logging
import os
import shutil
import subprocess
import sys
import tempfile

from securedrop_export.exceptions import ExportException
Expand All @@ -16,51 +13,26 @@

logger = logging.getLogger(__name__)


class Status(BaseStatus):
ERROR_ARCHIVE_METADATA = "ERROR_ARCHIVE_METADATA"
ERROR_METADATA_PARSING = "ERROR_METADATA_PARSING"
ERROR_EXTRACTION = "ERROR_EXTRACTION"


class Metadata(object):
"""
Object to parse, validate and store json metadata from the sd-export archive.
Create a Metadata object by using the `create_and_validate()` method to
ensure well-formed and valid metadata.
"""

METADATA_FILE = "metadata.json"
SUPPORTED_ENCRYPTION_METHODS = ["luks"]

# Slightly underhanded way of ensuring that a Metadata object is not instantiated
# directly; instead, the create_and_validate() method is used
__key = object()


def __init__(self, key: object, archive_path: str):
if not key == Metadata.__key:
raise ValueError("Must use create_and_validate() to create Metadata object")

# Initialize
def __init__(self, archive_path: str):
self.metadata_path = os.path.join(archive_path, self.METADATA_FILE)


@classmethod
def create_and_validate(cls, archive_path) -> 'Metadata':
"""
Create and validate metadata object. Raise ExportException for invalid metadata.
"""
md = Metadata(cls.__key, archive_path)
md.validate()

return md


def validate(self):
"""
Validate Metadata.
Throw ExportException if invalid state is found.
"""
def validate(self) -> "Metadata":
# Read metadata json and set relevant attributes
try:
with open(self.metadata_path) as f:
logger.info("Parsing archive metadata")
Expand All @@ -69,7 +41,7 @@ def validate(self):
self.encryption_method = json_config.get("encryption_method", None)
self.encryption_key = json_config.get("encryption_key", None)
logger.info(
"Exporting to device {} with encryption_method {}".format(
"Target: {}, encryption_method {}".format(
self.export_method, self.encryption_method
)
)
Expand All @@ -78,32 +50,52 @@ def validate(self):
logger.error("Metadata parsing failure")
raise ExportException(sdstatus=Status.ERROR_METADATA_PARSING) from ex

# Validate metadata - this will fail if command is not in list of supported commands
try:
# Validate action - fails if command is not in list of supported commands
try:
logger.debug("Validate export action")
self.command = Command(self.export_method)
if self.command is Command.EXPORT and not self.encryption_method in self.SUPPORTED_ENCRYPTION_METHODS:
if (
self.command is Command.EXPORT
and self.encryption_method not in self.SUPPORTED_ENCRYPTION_METHODS
):
logger.error("Unsupported encryption method")
raise ExportException(sdstatus=Status.ERROR_ARCHIVE_METADATA)
except ValueError as v:
raise ExportException(sdstatus=Status.ERROR_METADATA_PARSING) from v
raise ExportException(sdstatus=Status.ERROR_ARCHIVE_METADATA) from v

return self


class Archive(object):
def __init__(self, archive):
def __init__(self, archive_path: str):
os.umask(0o077)
self.archive = archive
self.submission_dirname = os.path.basename(self.archive).split(".")[0]
self.archive = archive_path
self.target_dirname = "sd-export-{}".format(
datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
)
self.tmpdir = tempfile.mkdtemp()

def extract_tarball(self):
def extract_tarball(self) -> "Archive":
"""
Extract tarball, checking for path traversal, and return Archive object.
"""
try:
logger.info("Extracting tarball {} into {}".format(self.archive, self.tmpdir))
logger.info(
"Extracting tarball {} into {}".format(self.archive, self.tmpdir)
)
safe_extractall(self.archive, self.tmpdir)
return self
except Exception as ex:
logger.error("Unable to extract tarball: {}".format(ex))
raise ExportException(sdstatus=Status.ERROR_EXTRACTION) from ex


def set_metadata(self, metadata: Metadata) -> "Archive":
"""
Set relevant metadata attributes for a given archive.
"""
self.command = metadata.command
if self.command is Command.EXPORT:
# When we support multiple encryption types, we will also want to add the
# encryption_method here
self.encryption_key = metadata.encryption_key
return self
4 changes: 3 additions & 1 deletion securedrop_export/command.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
from enum import Enum


class Command(Enum):
"""
All supported commands.
Values are as supplied by the calling VM (sd-app), and a change in any values require
Values are as supplied by the calling VM (sd-app), and a change in any values requires
corresponding changes in the calling VM.
"""

PRINTER_PREFLIGHT = "printer-preflight"
PRINTER_TEST = "printer-test"
PRINT = "printer"
Expand Down
15 changes: 10 additions & 5 deletions securedrop_export/directory_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import tarfile
from pathlib import Path
from typing import Optional, Union
import subprocess
import logging

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -105,7 +104,9 @@ def _check_path_traversal(filename_or_filepath: Union[str, Path]) -> None:
if filename_or_filepath.is_absolute():
base_path = filename_or_filepath
else:
base_path = Path.cwd() # use cwd so we can next ensure relative path does not traverse up
base_path = (
Path.cwd()
) # use cwd so we can next ensure relative path does not traverse up

try:
relative_path = relative_filepath(filename_or_filepath, base_path)
Expand All @@ -114,7 +115,10 @@ def _check_path_traversal(filename_or_filepath: Union[str, Path]) -> None:
# base, but can still have harmful side effects to the application. If this kind of
# traversal is needed, then call relative_filepath instead in order to check that the
# desired traversal does not go past a safe base directory.
if relative_path != filename_or_filepath and not filename_or_filepath.is_absolute():
if (
relative_path != filename_or_filepath
and not filename_or_filepath.is_absolute()
):
raise ValueError
except ValueError:
raise ValueError(f"Unsafe file or directory name: '{filename_or_filepath}'")
Expand Down Expand Up @@ -147,5 +151,6 @@ def _check_dir_permissions(dir_path: Union[str, Path]) -> None:
stat_res = os.stat(dir_path).st_mode
masked = stat_res & 0o777
if masked & 0o077:
raise RuntimeError("Unsafe permissions ({}) on {}".format(oct(stat_res), dir_path))

raise RuntimeError(
"Unsafe permissions ({}) on {}".format(oct(stat_res), dir_path)
)
Loading

0 comments on commit 65920dc

Please sign in to comment.