From a16e57cbee8e2d745a398f9f132086a5dd5354ff Mon Sep 17 00:00:00 2001 From: heartsucker Date: Wed, 15 May 2019 17:13:12 +0200 Subject: [PATCH] WIP --- securedrop_client/gui/widgets.py | 2 +- securedrop_client/logic.py | 56 +++++++++++++++++++++----------- securedrop_client/queue.py | 52 +++++++++++++++++++++++------ 3 files changed, 80 insertions(+), 30 deletions(-) diff --git a/securedrop_client/gui/widgets.py b/securedrop_client/gui/widgets.py index ad41df7e86..f17ffcc64d 100644 --- a/securedrop_client/gui/widgets.py +++ b/securedrop_client/gui/widgets.py @@ -1339,7 +1339,7 @@ def mouseReleaseEvent(self, e): self.controller.on_file_open(self.submission) else: # Download the file. - self.controller.on_file_download(self.source, self.submission) + self.controller.on_submission_download(self.source, self.submission) class ConversationView(QWidget): diff --git a/securedrop_client/logic.py b/securedrop_client/logic.py index 9e4c81bd8d..0ad86bca9b 100644 --- a/securedrop_client/logic.py +++ b/securedrop_client/logic.py @@ -28,13 +28,14 @@ from gettext import gettext as _ from PyQt5.QtCore import QObject, QThread, pyqtSignal, QTimer, QProcess from sdclientapi import RequestTimeoutError -from typing import Dict, Tuple # noqa: F401 +from typing import Dict, Tuple, Union # noqa: F401 from securedrop_client import storage from securedrop_client import db -from securedrop_client.utils import check_dir_permissions from securedrop_client.crypto import GpgHelper, CryptoError from securedrop_client.message_sync import MessageSync, ReplySync +from securedrop_client.queue import ApiJobQueue, DownloadSubmissionJob +from securedrop_client.utils import check_dir_permissions logger = logging.getLogger(__name__) @@ -144,6 +145,10 @@ def __init__(self, hostname, gui, session, # Reference to the API for secure drop proxy. self.api = None # type: sdclientapi.API + + # Queue that handles running API job + self.api_job_queue = ApiJobQueue(self.api, self) + # Contains active threads calling the API. self.api_threads = {} # type: Dict[str, Dict] @@ -310,6 +315,7 @@ def on_authenticate_success(self, result): self.gui.show_main_window(self.api.username) self.start_message_thread() self.start_reply_thread() + self.api_job_queue.start_queues() # TODO <------------------- this is wrong somehow? # Clear the sidebar error status bar if a message was shown # to the user indicating they should log in. @@ -508,35 +514,47 @@ def on_file_open(self, file_db_object): # Non Qubes OS. Just log the event for now. logger.info('Opening file "{}".'.format(submission_filepath)) - def on_file_download(self, source_db_object, message): + def on_reply_download(self, source_db_object: db.Source, reply: db.Reply) -> None: """ - Download the file associated with the associated message (which may - be a Submission or Reply). + Download the file associated with the Reply. """ if not self.api: # Then we should tell the user they need to login. self.on_action_requiring_login() return - if isinstance(message, db.File) or isinstance(message, db.Message): - # Handle submissions. - func = self.api.download_submission - sdk_object = sdclientapi.Submission(uuid=message.uuid) - sdk_object.filename = message.filename - sdk_object.source_uuid = source_db_object.uuid - elif isinstance(message, db.Reply): - # Handle journalist's replies. - func = self.api.download_reply - sdk_object = sdclientapi.Reply(uuid=message.uuid) - sdk_object.filename = message.filename - sdk_object.source_uuid = source_db_object.uuid + sdk_object = sdclientapi.Reply(uuid=reply.uuid) + sdk_object.filename = reply.filename + sdk_object.source_uuid = source_db_object.uuid self.set_status(_('Downloading {}'.format(sdk_object.filename))) - self.call_api(func, + + self.call_api(self.api.download_reply, self.on_file_download_success, self.on_file_download_failure, sdk_object, self.data_dir, - current_object=message) + current_object=reply) + + def on_submission_download( + self, + source_db_object: db.Source, + submission: Union[db.File, db.Message], + ) -> None: + """ + Download the file associated with the Submission (which may be a File or Message). + """ + print('on sub') + if not self.api: # Then we should tell the user they need to login. + self.on_action_requiring_login() + return + + sdk_object = sdclientapi.Submission(uuid=submission.uuid) + sdk_object.filename = submission.filename + sdk_object.source_uuid = source_db_object.uuid + + job = DownloadSubmissionJob(sdk_object, self.data_dir, submission) + self.api_job_queue.enqueue(job) + self.set_status(_('Downloading {}'.format(sdk_object.filename))) def on_file_download_success(self, result, current_object): """ diff --git a/securedrop_client/queue.py b/securedrop_client/queue.py index fd4f9630ee..2ed03251ca 100644 --- a/securedrop_client/queue.py +++ b/securedrop_client/queue.py @@ -1,7 +1,15 @@ -from PyQt5.QtCore import QObject, QThread, pyqtSignal, pyqtBoundSignal +import logging +import sdclientapi + +from PyQt5.QtCore import QObject, QThread, pyqtSignal, pyqtBoundSignal, Qt from queue import Queue from sdclientapi import API, RequestTimeoutError -from typing import Any, Optional +from typing import Any, Optional, Union + +from securedrop_client.db import File, Message + + +logger = logging.getLogger(__name__) class ApiJob: @@ -14,8 +22,10 @@ def _do_call_api(self, api_client: API) -> None: try: result = self.call_api(api_client, self.nargs, self.kwargs) except RequestTimeoutError: + logger.debug('Job {} timed out'.format(self)) raise except Exception as e: + logger.error('Job {} raised an exception: {}'.format(self, e)) self.handle_failure(e) else: self.handle_success(result) @@ -30,11 +40,26 @@ def handle_failure(self, exception: Exception) -> None: raise NotImplementedError -class DownloadFileJob(ApiJob): +class DownloadSubmissionJob(ApiJob): + + def __init__( + self, + submission: sdclientapi.Submission, + data_dir: str, + db_object: Union[File, Message], + ) -> None: + super().__init__([submission, data_dir], {}) + self.__db_object = db_object def call_api(self, api_client: API, nargs: list, kwargs: dict) -> Any: return api_client.download_submission(*nargs, **kwargs) + def handle_success(self, result: Any) -> None: + print('success', result) + + def handle_failure(self, exception: Exception) -> None: + print('fail', exception) + class RunnableQueue(QObject): @@ -43,22 +68,26 @@ def __init__(self, api_client: API, halt_signal: pyqtBoundSignal) -> None: self.run = True self.api_client = api_client self.queue = Queue() # type: Queue[ApiJob] + self.last_job = None # type: Optional[ApiJob] self.halt_signal = halt_signal - halt_signal.connect(self.stop) + halt_signal.connect(self.stop, type=Qt.QueuedConnection) def stop(self) -> None: self.run = False def __call__(self, loop: bool = True) -> None: while self.run: - job = self.queue.get(block=True) # type: ApiJob + # retry the "cached" job if it exists, otherwise get the next job + job = self.last_job or self.queue.get(block=True) + self.last_job = None try: job._do_call_api(self.api_client) except RequestTimeoutError: self.run = False self.halt_signal.emit() # notify other threads of failure + self.last_job = job # "cache" the last job since we can't re-queue it return if not loop: @@ -68,8 +97,8 @@ def __call__(self, loop: bool = True) -> None: class ApiJobQueue(QObject): ''' - Signal used to notify different job threads that they should halt. This is pub/sub like signal - in that any threat may trigger it, and all threads listen to it. + Signal used to notify different job threads that they should halt. This is a pub/sub like signal + in that any job queues may trigger it, and all job queues listen to it. ''' halt_signal = pyqtSignal() @@ -90,11 +119,14 @@ def start_queues(self) -> None: self.main_queue.moveToThread(main_thread) self.download_queue.moveToThread(download_thread) - main_thread.run() - download_thread.run() + main_thread.started.connect(self.main_queue) + download_thread.started.connect(self.download_queue) + + main_thread.start() + download_thread.start() def enqueue(self, job: ApiJob) -> None: - if isinstance(job, DownloadFileJob): + if isinstance(job, DownloadSubmissionJob): self.download_queue.queue.put_nowait(job) else: self.main_queue.queue.put_nowait(job)