Skip to content

Commit

Permalink
app: pure refactor/reorganize to separate *Queue and APIJob objects
Browse files Browse the repository at this point in the history
We are going to be adding quite a few more API jobs (even without
considering any additional functionality that might be added to the
application). Let's separate the logic out a bit so that it's easier
to move around the codebase.
  • Loading branch information
redshiftzero authored and sssoleileraaa committed Jun 4, 2019
1 parent dd6ee5e commit 6125793
Show file tree
Hide file tree
Showing 10 changed files with 516 additions and 483 deletions.
5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@ mypy: ## Run static type checker
securedrop_client/resources/__init__.py \
securedrop_client/storage.py \
securedrop_client/message_sync.py \
securedrop_client/queue.py
securedrop_client/queue.py \
securedrop_client/api_jobs/__init__.py \
securedrop_client/api_jobs/base.py \
securedrop_client/api_jobs/downloads.py

.PHONY: clean
clean: ## Clean the workspace of generated resources
Expand Down
Empty file.
62 changes: 62 additions & 0 deletions securedrop_client/api_jobs/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import logging

from PyQt5.QtCore import QObject, pyqtSignal
from sdclientapi import API, RequestTimeoutError, AuthError
from sqlalchemy.orm.session import Session
from typing import Any, Optional


# Module-level logger, named after this module via `__name__`.
logger = logging.getLogger(__name__)


class ApiInaccessibleError(Exception):
    '''
    Raised when the API cannot be used, either because there is no client or because the
    client is not properly authenticated.
    '''

    def __init__(self, message: Optional[str] = None) -> None:
        # Any falsy message (`None` or an empty string) is replaced by the default text.
        default_message = ('API is inaccessible either because there is no client or because the '
                           'client is not properly authenticated.')
        super().__init__(message or default_message)


class ApiJob(QObject):
    '''
    Base class for a job that makes a call against the API.

    Subclasses implement `call_api`. `_do_call_api` wraps that call and reports the outcome
    either by emitting `success_signal` / `failure_signal` or by raising.
    '''

    # Signal that is emitted after a job finishes successfully. It carries the value returned
    # by `call_api`.
    success_signal = pyqtSignal('PyQt_PyObject')

    # Signal that is emitted if there is a failure during the job. It carries the exception
    # that was raised.
    failure_signal = pyqtSignal(Exception)

    def __init__(self) -> None:
        super().__init__(None)  # `None` because the QObject has no parent

    def _do_call_api(self, api_client: API, session: Session) -> None:
        '''
        Run `call_api` and translate its outcome into signals or exceptions.

        :param api_client: The API client handed through to `call_api`.
        :param session: The database session handed through to `call_api`.
        :raises ApiInaccessibleError: If `api_client` is falsy or `call_api` raised `AuthError`.
        :raises RequestTimeoutError: Re-raised unchanged if `call_api` timed out.

        Any other exception is logged and emitted through `failure_signal` instead of being
        raised. On success the result of `call_api` is emitted through `success_signal`.
        '''
        if not api_client:
            raise ApiInaccessibleError()

        try:
            result = self.call_api(api_client, session)
        except AuthError as e:
            raise ApiInaccessibleError() from e
        except RequestTimeoutError:
            logger.debug('Job {} timed out'.format(self))
            raise
        except Exception as e:
            logger.error('Job {} raised an exception: {}: {}'
                         .format(self, type(e).__name__, e))
            self.failure_signal.emit(e)
        else:
            self.success_signal.emit(result)

    def call_api(self, api_client: API, session: Session) -> Any:
        '''
        Method for making the actual API call and handling the result.

        This MUST return a value if the API call and other tasks were successful and MUST raise
        an exception if and only if the tasks failed. Presence of a raised exception indicates
        a failure.
        '''
        raise NotImplementedError
111 changes: 111 additions & 0 deletions securedrop_client/api_jobs/downloads.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
import binascii
import hashlib
import logging
import os
import sdclientapi
import shutil

from sdclientapi import API
from sqlalchemy.orm.session import Session
from typing import Any, Union, Type, Tuple

from securedrop_client import storage
from securedrop_client.api_jobs.base import ApiJob
from securedrop_client.crypto import GpgHelper, CryptoError
from securedrop_client.db import File, Message


# Module-level logger, named after this module via `__name__`.
logger = logging.getLogger(__name__)


class DownloadSubmissionJob(ApiJob):
    '''
    Job that downloads one submission (a `File` or a `Message`), checks the downloaded file
    against the ETag returned with it, moves it into the data directory and decrypts it.
    '''

    # Number of bytes read from the downloaded file at a time while hashing it.
    CHUNK_SIZE = 4096

    def __init__(
        self,
        submission_type: Union[Type[File], Type[Message]],
        submission_uuid: str,
        data_dir: str,
        gpg: GpgHelper,
    ) -> None:
        '''
        :param submission_type: The database class (`File` or `Message`) to query.
        :param submission_uuid: UUID of the database row to download.
        :param data_dir: Directory the downloaded file is moved into.
        :param gpg: Helper used to decrypt the downloaded file.
        '''
        super().__init__()
        self.data_dir = data_dir
        self.submission_type = submission_type
        self.submission_uuid = submission_uuid
        self.gpg = gpg

    def call_api(self, api_client: API, session: Session) -> Any:
        '''
        Download, verify and decrypt the submission.

        :return: The UUID of the downloaded database object.
        :raises RuntimeError: If the downloaded file does not match its checksum.
        :raises CryptoError: If decryption fails (re-raised from `_decrypt_file`).
        '''
        db_object = session.query(self.submission_type) \
            .filter_by(uuid=self.submission_uuid).one()

        etag, download_path = self._make_call(db_object, api_client)

        if not self._check_file_integrity(etag, download_path):
            raise RuntimeError('Downloaded file had an invalid checksum.')

        self._decrypt_file(session, db_object, download_path)

        return db_object.uuid

    def _make_call(self, db_object: Union[File, Message], api_client: API) -> Tuple[str, str]:
        '''
        Build the SDK submission object for `db_object` and download it.

        :return: Whatever `api_client.download_submission` returns; `call_api` unpacks it as
                 `(etag, download_path)`.
        '''
        sdk_obj = sdclientapi.Submission(uuid=db_object.uuid)
        sdk_obj.filename = db_object.filename
        sdk_obj.source_uuid = db_object.source.uuid

        return api_client.download_submission(sdk_obj)

    @classmethod
    def _check_file_integrity(cls, etag: str, file_path: str) -> bool:
        '''
        Checks if the file is valid.

        The ETag is expected to look like `<algorithm>:<hex digest>`. Only `sha256` is checked.
        A missing ETag, an ETag without an algorithm prefix, or an unknown algorithm cannot be
        verified, so the check is skipped rather than failed.

        :return: `True` if valid or unknown, `False` otherwise.
        '''
        if not etag:
            logger.debug('No ETag. Skipping integrity check for file at {}'.format(file_path))
            return True

        # `partition` (unlike unpacking `split(':')`) never raises on an ETag that has no
        # colon or more than one colon. Everything after the first colon is the checksum.
        alg, separator, checksum = etag.partition(':')

        if not separator:
            logger.debug('ETag has no algorithm prefix. Skipping integrity check for file at {}'
                         .format(file_path))
            return True

        if alg != 'sha256':
            logger.debug('Unknown hash algorithm ({}). Skipping integrity check for file at {}'
                         .format(alg, file_path))
            return True

        hasher = hashlib.sha256()
        with open(file_path, 'rb') as f:
            # Hash in fixed-size chunks so the whole file is never held in memory at once.
            for chunk in iter(lambda: f.read(cls.CHUNK_SIZE), b''):
                hasher.update(chunk)

        return hasher.hexdigest() == checksum

    def _decrypt_file(
        self,
        session: Session,
        db_object: Union[File, Message],
        file_path: str,
    ) -> None:
        '''
        Move the downloaded file into the data directory, mark it as downloaded and decrypt it.

        The decryption status of `db_object` is recorded as `True` or `False` through
        `storage.set_object_decryption_status_with_content`.

        :raises CryptoError: Re-raised after recording the failed decryption status.
        '''
        file_uuid = db_object.uuid
        server_filename = db_object.filename

        # The filename contains the location where the file has been stored. On non-Qubes OSes,
        # this will be the data directory. On Qubes OS, this will be a ~/QubesIncoming
        # directory. In case we are on Qubes, we should move the file to the data directory and
        # name it the same as the server (e.g. spotless-tater-msg.gpg).
        filepath_in_datadir = os.path.join(self.data_dir, server_filename)
        shutil.move(file_path, filepath_in_datadir)
        storage.mark_file_as_downloaded(file_uuid, session)

        try:
            self.gpg.decrypt_submission_or_reply(filepath_in_datadir, server_filename, is_doc=True)
        except CryptoError as e:
            logger.debug('Failed to decrypt file {}: {}'.format(server_filename, e))
            storage.set_object_decryption_status_with_content(db_object, session, False)
            raise  # bare `raise` keeps the original traceback

        storage.set_object_decryption_status_with_content(db_object, session, True)
163 changes: 5 additions & 158 deletions securedrop_client/queue.py
Original file line number Diff line number Diff line change
@@ -1,172 +1,19 @@
import binascii
import hashlib
import logging
import os
import sdclientapi
import shutil

from PyQt5.QtCore import QObject, QThread, pyqtSlot, pyqtSignal
from PyQt5.QtCore import QObject, QThread, pyqtSlot
from PyQt5.QtWidgets import QApplication
from queue import Queue
from sdclientapi import API, RequestTimeoutError, AuthError
from sdclientapi import API, RequestTimeoutError
from sqlalchemy.orm import scoped_session
from sqlalchemy.orm.session import Session
from typing import Any, Union, Optional, Type, Tuple
from typing import Optional # noqa: F401

from securedrop_client import storage
from securedrop_client.crypto import GpgHelper, CryptoError
from securedrop_client.db import File, Message
from securedrop_client.api_jobs.base import ApiJob
from securedrop_client.api_jobs.downloads import DownloadSubmissionJob


# Module-level logger, named after this module via `__name__`.
logger = logging.getLogger(__name__)


class ApiInaccessibleError(Exception):
    '''
    Raised when the API cannot be used, either because there is no client or because the
    client is not properly authenticated.
    '''

    def __init__(self, message: Optional[str] = None) -> None:
        # Any falsy message (`None` or an empty string) is replaced by the default text.
        default_message = ('API is inaccessible either because there is no client or because the '
                           'client is not properly authenticated.')
        super().__init__(message or default_message)


class ApiJob(QObject):
    '''
    Base class for a job that makes a call against the API.

    Subclasses implement `call_api`. `_do_call_api` wraps that call and reports the outcome
    either by emitting `success_signal` / `failure_signal` or by raising.
    '''

    # Signal that is emitted after a job finishes successfully. It carries the value returned
    # by `call_api`.
    success_signal = pyqtSignal('PyQt_PyObject')

    # Signal that is emitted if there is a failure during the job. It carries the exception
    # that was raised.
    failure_signal = pyqtSignal(Exception)

    def __init__(self) -> None:
        super().__init__(None)  # `None` because the QObject has no parent

    def _do_call_api(self, api_client: API, session: Session) -> None:
        '''
        Run `call_api` and translate its outcome into signals or exceptions.

        :param api_client: The API client handed through to `call_api`.
        :param session: The database session handed through to `call_api`.
        :raises ApiInaccessibleError: If `api_client` is falsy or `call_api` raised `AuthError`.
        :raises RequestTimeoutError: Re-raised unchanged if `call_api` timed out.

        Any other exception is logged and emitted through `failure_signal` instead of being
        raised. On success the result of `call_api` is emitted through `success_signal`.
        '''
        if not api_client:
            raise ApiInaccessibleError()

        try:
            result = self.call_api(api_client, session)
        except AuthError as e:
            raise ApiInaccessibleError() from e
        except RequestTimeoutError:
            logger.debug('Job {} timed out'.format(self))
            raise
        except Exception as e:
            logger.error('Job {} raised an exception: {}: {}'
                         .format(self, type(e).__name__, e))
            self.failure_signal.emit(e)
        else:
            self.success_signal.emit(result)

    def call_api(self, api_client: API, session: Session) -> Any:
        '''
        Method for making the actual API call and handling the result.

        This MUST return a value if the API call and other tasks were successful and MUST raise
        an exception if and only if the tasks failed. Presence of a raised exception indicates
        a failure.
        '''
        raise NotImplementedError


class DownloadSubmissionJob(ApiJob):
    '''
    Job that downloads one submission (a `File` or a `Message`), checks the downloaded file
    against the ETag returned with it, moves it into the data directory and decrypts it.
    '''

    # Number of bytes read from the downloaded file at a time while hashing it.
    CHUNK_SIZE = 4096

    def __init__(
        self,
        submission_type: Union[Type[File], Type[Message]],
        submission_uuid: str,
        data_dir: str,
        gpg: GpgHelper,
    ) -> None:
        '''
        :param submission_type: The database class (`File` or `Message`) to query.
        :param submission_uuid: UUID of the database row to download.
        :param data_dir: Directory the downloaded file is moved into.
        :param gpg: Helper used to decrypt the downloaded file.
        '''
        super().__init__()
        self.data_dir = data_dir
        self.submission_type = submission_type
        self.submission_uuid = submission_uuid
        self.gpg = gpg

    def call_api(self, api_client: API, session: Session) -> Any:
        '''
        Download, verify and decrypt the submission.

        :return: The UUID of the downloaded database object.
        :raises RuntimeError: If the downloaded file does not match its checksum.
        :raises CryptoError: If decryption fails (re-raised from `_decrypt_file`).
        '''
        db_object = session.query(self.submission_type) \
            .filter_by(uuid=self.submission_uuid).one()

        etag, download_path = self._make_call(db_object, api_client)

        if not self._check_file_integrity(etag, download_path):
            raise RuntimeError('Downloaded file had an invalid checksum.')

        self._decrypt_file(session, db_object, download_path)

        return db_object.uuid

    def _make_call(self, db_object: Union[File, Message], api_client: API) -> Tuple[str, str]:
        '''
        Build the SDK submission object for `db_object` and download it.

        :return: Whatever `api_client.download_submission` returns; `call_api` unpacks it as
                 `(etag, download_path)`.
        '''
        sdk_obj = sdclientapi.Submission(uuid=db_object.uuid)
        sdk_obj.filename = db_object.filename
        sdk_obj.source_uuid = db_object.source.uuid

        return api_client.download_submission(sdk_obj)

    @classmethod
    def _check_file_integrity(cls, etag: str, file_path: str) -> bool:
        '''
        Checks if the file is valid.

        The ETag is expected to look like `<algorithm>:<hex digest>`. Only `sha256` is checked.
        A missing ETag, an ETag without an algorithm prefix, or an unknown algorithm cannot be
        verified, so the check is skipped rather than failed.

        :return: `True` if valid or unknown, `False` otherwise.
        '''
        if not etag:
            logger.debug('No ETag. Skipping integrity check for file at {}'.format(file_path))
            return True

        # `partition` (unlike unpacking `split(':')`) never raises on an ETag that has no
        # colon or more than one colon. Everything after the first colon is the checksum.
        alg, separator, checksum = etag.partition(':')

        if not separator:
            logger.debug('ETag has no algorithm prefix. Skipping integrity check for file at {}'
                         .format(file_path))
            return True

        if alg != 'sha256':
            logger.debug('Unknown hash algorithm ({}). Skipping integrity check for file at {}'
                         .format(alg, file_path))
            return True

        hasher = hashlib.sha256()
        with open(file_path, 'rb') as f:
            # Hash in fixed-size chunks so the whole file is never held in memory at once.
            for chunk in iter(lambda: f.read(cls.CHUNK_SIZE), b''):
                hasher.update(chunk)

        return hasher.hexdigest() == checksum

    def _decrypt_file(
        self,
        session: Session,
        db_object: Union[File, Message],
        file_path: str,
    ) -> None:
        '''
        Move the downloaded file into the data directory, mark it as downloaded and decrypt it.

        The decryption status of `db_object` is recorded as `True` or `False` through
        `storage.set_object_decryption_status_with_content`.

        :raises CryptoError: Re-raised after recording the failed decryption status.
        '''
        file_uuid = db_object.uuid
        server_filename = db_object.filename

        # The filename contains the location where the file has been stored. On non-Qubes OSes,
        # this will be the data directory. On Qubes OS, this will be a ~/QubesIncoming
        # directory. In case we are on Qubes, we should move the file to the data directory and
        # name it the same as the server (e.g. spotless-tater-msg.gpg).
        filepath_in_datadir = os.path.join(self.data_dir, server_filename)
        shutil.move(file_path, filepath_in_datadir)
        storage.mark_file_as_downloaded(file_uuid, session)

        try:
            self.gpg.decrypt_submission_or_reply(filepath_in_datadir, server_filename, is_doc=True)
        except CryptoError as e:
            logger.debug('Failed to decrypt file {}: {}'.format(server_filename, e))
            storage.set_object_decryption_status_with_content(db_object, session, False)
            raise  # bare `raise` keeps the original traceback

        storage.set_object_decryption_status_with_content(db_object, session, True)


class RunnableQueue(QObject):

def __init__(self, api_client: API, session_maker: scoped_session) -> None:
Expand Down
Empty file added tests/api_jobs/__init__.py
Empty file.
Loading

0 comments on commit 6125793

Please sign in to comment.