From f9d151be1a6bc3f96e27353757b98dcd87902438 Mon Sep 17 00:00:00 2001 From: heartsucker Date: Tue, 21 May 2019 14:02:20 +0200 Subject: [PATCH] it lives --- securedrop_client/logic.py | 7 ++-- securedrop_client/queue.py | 65 +++++++++++++++++--------------------- 2 files changed, 33 insertions(+), 39 deletions(-) diff --git a/securedrop_client/logic.py b/securedrop_client/logic.py index 0ad86bca9b..9b0d835407 100644 --- a/securedrop_client/logic.py +++ b/securedrop_client/logic.py @@ -147,7 +147,7 @@ def __init__(self, hostname, gui, session, self.api = None # type: sdclientapi.API # Queue that handles running API job - self.api_job_queue = ApiJobQueue(self.api, self) + self.api_job_queue = ApiJobQueue(self.api) # Contains active threads calling the API. self.api_threads = {} # type: Dict[str, Dict] @@ -313,9 +313,11 @@ def on_authenticate_success(self, result): self.gui.hide_login() self.sync_api() self.gui.show_main_window(self.api.username) + self.start_message_thread() self.start_reply_thread() - self.api_job_queue.start_queues() # TODO <------------------- this is wrong somehow? + + self.api_job_queue.start_queues(self.api) # Clear the sidebar error status bar if a message was shown # to the user indicating they should log in. @@ -543,7 +545,6 @@ def on_submission_download( """ Download the file associated with the Submission (which may be a File or Message). """ - print('on sub') if not self.api: # Then we should tell the user they need to login. self.on_action_requiring_login() return diff --git a/securedrop_client/queue.py b/securedrop_client/queue.py index 2ed03251ca..2a922bf378 100644 --- a/securedrop_client/queue.py +++ b/securedrop_client/queue.py @@ -1,10 +1,11 @@ import logging import sdclientapi -from PyQt5.QtCore import QObject, QThread, pyqtSignal, pyqtBoundSignal, Qt +from PyQt5.QtCore import QObject, QThread, pyqtSlot +from PyQt5.QtWidgets import QApplication from queue import Queue from sdclientapi import API, RequestTimeoutError -from typing import Any, Optional, Union +from typing import Any, Union, Optional # noqa: F401 from securedrop_client.db import File, Message @@ -25,7 +26,7 @@ def _do_call_api(self, api_client: API) -> None: logger.debug('Job {} timed out'.format(self)) raise except Exception as e: - logger.error('Job {} raised an exception: {}'.format(self, e)) + logger.error('Job {} raised an exception: {}: {}'.format(self, type(e).__name__, e)) self.handle_failure(e) else: self.handle_success(result) @@ -63,21 +64,18 @@ def handle_failure(self, exception: Exception) -> None: class RunnableQueue(QObject): - def __init__(self, api_client: API, halt_signal: pyqtBoundSignal) -> None: + def __init__(self, api_client: API) -> None: super().__init__() - self.run = True self.api_client = api_client self.queue = Queue() # type: Queue[ApiJob] self.last_job = None # type: Optional[ApiJob] - self.halt_signal = halt_signal - halt_signal.connect(self.stop, type=Qt.QueuedConnection) + @pyqtSlot() + def process(self) -> None: # pragma: nocover + self.__process(True) - def stop(self) -> None: - self.run = False - - def __call__(self, loop: bool = True) -> None: - while self.run: + def __process(self, loop: bool) -> None: + while True: # retry the "cached" job if it exists, otherwise get the next job job = self.last_job or self.queue.get(block=True) self.last_job = None @@ -85,45 +83,40 @@ def __call__(self, loop: bool = True) -> None: try: job._do_call_api(self.api_client) except RequestTimeoutError: - self.run = False - self.halt_signal.emit() # notify other threads of failure self.last_job = job # "cache" the last job since we can't re-queue it return if not loop: return + # process events to allow this thread to handle incoming signals + QApplication.processEvents() -class ApiJobQueue(QObject): - ''' - Signal used to notify different job threads that they should halt. This is a pub/sub like signal - in that any job queues may trigger it, and all job queues listen to it. - ''' - halt_signal = pyqtSignal() +class ApiJobQueue(QObject): - def __init__(self, api_client: API, parent: Optional[QObject] = None) -> None: - super().__init__(parent) + def __init__(self, api_client: API) -> None: + super().__init__(None) self.api_client = api_client - self.main_queue = RunnableQueue(self.api_client, self.halt_signal) - self.download_queue = RunnableQueue(self.api_client, self.halt_signal) - def start_queues(self) -> None: - # ensure the queues are set to run (for previously stopped threads) - self.main_queue.run = True - self.download_queue.run = True + self.main_thread = QThread() + self.download_thread = QThread() + + self.main_queue = RunnableQueue(self.api_client) + self.download_queue = RunnableQueue(self.api_client) - main_thread = QThread(self) - download_thread = QThread(self) + self.main_queue.moveToThread(self.main_thread) + self.download_queue.moveToThread(self.download_thread) - self.main_queue.moveToThread(main_thread) - self.download_queue.moveToThread(download_thread) + self.main_thread.started.connect(self.main_queue.process) + self.download_thread.started.connect(self.download_queue.process) - main_thread.started.connect(self.main_queue) - download_thread.started.connect(self.download_queue) + def start_queues(self, api_client: API) -> None: + self.main_queue.api_client = api_client + self.download_queue.api_client = api_client - main_thread.start() - download_thread.start() + self.main_thread.start() + self.download_thread.start() def enqueue(self, job: ApiJob) -> None: if isinstance(job, DownloadSubmissionJob):