Skip to content

Commit

Permalink
it lives
Browse files Browse the repository at this point in the history
  • Loading branch information
heartsucker committed May 21, 2019
1 parent 61329c5 commit f9d151b
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 39 deletions.
7 changes: 4 additions & 3 deletions securedrop_client/logic.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ def __init__(self, hostname, gui, session,
self.api = None # type: sdclientapi.API

# Queue that handles running API job
self.api_job_queue = ApiJobQueue(self.api, self)
self.api_job_queue = ApiJobQueue(self.api)

# Contains active threads calling the API.
self.api_threads = {} # type: Dict[str, Dict]
Expand Down Expand Up @@ -313,9 +313,11 @@ def on_authenticate_success(self, result):
self.gui.hide_login()
self.sync_api()
self.gui.show_main_window(self.api.username)

self.start_message_thread()
self.start_reply_thread()
self.api_job_queue.start_queues() # TODO <------------------- this is wrong somehow?

self.api_job_queue.start_queues(self.api)

# Clear the sidebar error status bar if a message was shown
# to the user indicating they should log in.
Expand Down Expand Up @@ -543,7 +545,6 @@ def on_submission_download(
"""
Download the file associated with the Submission (which may be a File or Message).
"""
print('on sub')
if not self.api: # Then we should tell the user they need to login.
self.on_action_requiring_login()
return
Expand Down
65 changes: 29 additions & 36 deletions securedrop_client/queue.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import logging
import sdclientapi

from PyQt5.QtCore import QObject, QThread, pyqtSignal, pyqtBoundSignal, Qt
from PyQt5.QtCore import QObject, QThread, pyqtSlot
from PyQt5.QtWidgets import QApplication
from queue import Queue
from sdclientapi import API, RequestTimeoutError
from typing import Any, Optional, Union
from typing import Any, Union, Optional # noqa: F401

from securedrop_client.db import File, Message

Expand All @@ -25,7 +26,7 @@ def _do_call_api(self, api_client: API) -> None:
logger.debug('Job {} timed out'.format(self))
raise
except Exception as e:
logger.error('Job {} raised an exception: {}'.format(self, e))
logger.error('Job {} raised an exception: {}: {}'.format(self, type(e).__name__, e))
self.handle_failure(e)
else:
self.handle_success(result)
Expand Down Expand Up @@ -63,67 +64,59 @@ def handle_failure(self, exception: Exception) -> None:

class RunnableQueue(QObject):

def __init__(self, api_client: API, halt_signal: pyqtBoundSignal) -> None:
def __init__(self, api_client: API) -> None:
super().__init__()
self.run = True
self.api_client = api_client
self.queue = Queue() # type: Queue[ApiJob]
self.last_job = None # type: Optional[ApiJob]

self.halt_signal = halt_signal
halt_signal.connect(self.stop, type=Qt.QueuedConnection)
@pyqtSlot()
def process(self) -> None: # pragma: nocover
self.__process(True)

def stop(self) -> None:
self.run = False

def __call__(self, loop: bool = True) -> None:
while self.run:
def __process(self, loop: bool) -> None:
while True:
# retry the "cached" job if it exists, otherwise get the next job
job = self.last_job or self.queue.get(block=True)
self.last_job = None

try:
job._do_call_api(self.api_client)
except RequestTimeoutError:
self.run = False
self.halt_signal.emit() # notify other threads of failure
self.last_job = job # "cache" the last job since we can't re-queue it
return

if not loop:
return

# process events to allow this thread to handle incoming signals
QApplication.processEvents()

class ApiJobQueue(QObject):

'''
Signal used to notify different job threads that they should halt. This is a pub/sub like signal
in that any job queues may trigger it, and all job queues listen to it.
'''
halt_signal = pyqtSignal()
class ApiJobQueue(QObject):

def __init__(self, api_client: API, parent: Optional[QObject] = None) -> None:
super().__init__(parent)
def __init__(self, api_client: API) -> None:
super().__init__(None)
self.api_client = api_client
self.main_queue = RunnableQueue(self.api_client, self.halt_signal)
self.download_queue = RunnableQueue(self.api_client, self.halt_signal)

def start_queues(self) -> None:
# ensure the queues are set to run (for previously stopped threads)
self.main_queue.run = True
self.download_queue.run = True
self.main_thread = QThread()
self.download_thread = QThread()

self.main_queue = RunnableQueue(self.api_client)
self.download_queue = RunnableQueue(self.api_client)

main_thread = QThread(self)
download_thread = QThread(self)
self.main_queue.moveToThread(self.main_thread)
self.download_queue.moveToThread(self.download_thread)

self.main_queue.moveToThread(main_thread)
self.download_queue.moveToThread(download_thread)
self.main_thread.started.connect(self.main_queue.process)
self.download_thread.started.connect(self.download_queue.process)

main_thread.started.connect(self.main_queue)
download_thread.started.connect(self.download_queue)
def start_queues(self, api_client: API) -> None:
self.main_queue.api_client = api_client
self.download_queue.api_client = api_client

main_thread.start()
download_thread.start()
self.main_thread.start()
self.download_thread.start()

def enqueue(self, job: ApiJob) -> None:
if isinstance(job, DownloadSubmissionJob):
Expand Down

0 comments on commit f9d151b

Please sign in to comment.