Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
heartsucker committed May 21, 2019
1 parent e0b83ee commit 61329c5
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 30 deletions.
2 changes: 1 addition & 1 deletion securedrop_client/gui/widgets.py
Original file line number Diff line number Diff line change
Expand Up @@ -1339,7 +1339,7 @@ def mouseReleaseEvent(self, e):
self.controller.on_file_open(self.submission)
else:
# Download the file.
self.controller.on_file_download(self.source, self.submission)
self.controller.on_submission_download(self.source, self.submission)


class ConversationView(QWidget):
Expand Down
56 changes: 37 additions & 19 deletions securedrop_client/logic.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,14 @@
from gettext import gettext as _
from PyQt5.QtCore import QObject, QThread, pyqtSignal, QTimer, QProcess
from sdclientapi import RequestTimeoutError
from typing import Dict, Tuple # noqa: F401
from typing import Dict, Tuple, Union # noqa: F401

from securedrop_client import storage
from securedrop_client import db
from securedrop_client.utils import check_dir_permissions
from securedrop_client.crypto import GpgHelper, CryptoError
from securedrop_client.message_sync import MessageSync, ReplySync
from securedrop_client.queue import ApiJobQueue, DownloadSubmissionJob
from securedrop_client.utils import check_dir_permissions

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -144,6 +145,10 @@ def __init__(self, hostname, gui, session,

# Reference to the API for secure drop proxy.
self.api = None # type: sdclientapi.API

# Queue that handles running API job
self.api_job_queue = ApiJobQueue(self.api, self)

# Contains active threads calling the API.
self.api_threads = {} # type: Dict[str, Dict]

Expand Down Expand Up @@ -310,6 +315,7 @@ def on_authenticate_success(self, result):
self.gui.show_main_window(self.api.username)
self.start_message_thread()
self.start_reply_thread()
self.api_job_queue.start_queues() # TODO <------------------- this is wrong somehow?

# Clear the sidebar error status bar if a message was shown
# to the user indicating they should log in.
Expand Down Expand Up @@ -508,35 +514,47 @@ def on_file_open(self, file_db_object):
# Non Qubes OS. Just log the event for now.
logger.info('Opening file "{}".'.format(submission_filepath))

def on_file_download(self, source_db_object, message):
def on_reply_download(self, source_db_object: db.Source, reply: db.Reply) -> None:
"""
Download the file associated with the associated message (which may
be a Submission or Reply).
Download the file associated with the Reply.
"""
if not self.api: # Then we should tell the user they need to login.
self.on_action_requiring_login()
return

if isinstance(message, db.File) or isinstance(message, db.Message):
# Handle submissions.
func = self.api.download_submission
sdk_object = sdclientapi.Submission(uuid=message.uuid)
sdk_object.filename = message.filename
sdk_object.source_uuid = source_db_object.uuid
elif isinstance(message, db.Reply):
# Handle journalist's replies.
func = self.api.download_reply
sdk_object = sdclientapi.Reply(uuid=message.uuid)
sdk_object.filename = message.filename
sdk_object.source_uuid = source_db_object.uuid
sdk_object = sdclientapi.Reply(uuid=reply.uuid)
sdk_object.filename = reply.filename
sdk_object.source_uuid = source_db_object.uuid

self.set_status(_('Downloading {}'.format(sdk_object.filename)))
self.call_api(func,

self.call_api(self.api.download_reply,
self.on_file_download_success,
self.on_file_download_failure,
sdk_object,
self.data_dir,
current_object=message)
current_object=reply)

def on_submission_download(
self,
source_db_object: db.Source,
submission: Union[db.File, db.Message],
) -> None:
"""
Download the file associated with the Submission (which may be a File or Message).
"""
print('on sub')
if not self.api: # Then we should tell the user they need to login.
self.on_action_requiring_login()
return

sdk_object = sdclientapi.Submission(uuid=submission.uuid)
sdk_object.filename = submission.filename
sdk_object.source_uuid = source_db_object.uuid

job = DownloadSubmissionJob(sdk_object, self.data_dir, submission)
self.api_job_queue.enqueue(job)
self.set_status(_('Downloading {}'.format(sdk_object.filename)))

def on_file_download_success(self, result, current_object):
"""
Expand Down
52 changes: 42 additions & 10 deletions securedrop_client/queue.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,15 @@
from PyQt5.QtCore import QObject, QThread, pyqtSignal, pyqtBoundSignal
import logging
import sdclientapi

from PyQt5.QtCore import QObject, QThread, pyqtSignal, pyqtBoundSignal, Qt
from queue import Queue
from sdclientapi import API, RequestTimeoutError
from typing import Any, Optional
from typing import Any, Optional, Union

from securedrop_client.db import File, Message


logger = logging.getLogger(__name__)


class ApiJob:
Expand All @@ -14,8 +22,10 @@ def _do_call_api(self, api_client: API) -> None:
try:
result = self.call_api(api_client, self.nargs, self.kwargs)
except RequestTimeoutError:
logger.debug('Job {} timed out'.format(self))
raise
except Exception as e:
logger.error('Job {} raised an exception: {}'.format(self, e))
self.handle_failure(e)
else:
self.handle_success(result)
Expand All @@ -30,11 +40,26 @@ def handle_failure(self, exception: Exception) -> None:
raise NotImplementedError


class DownloadFileJob(ApiJob):
class DownloadSubmissionJob(ApiJob):

def __init__(
self,
submission: sdclientapi.Submission,
data_dir: str,
db_object: Union[File, Message],
) -> None:
super().__init__([submission, data_dir], {})
self.__db_object = db_object

def call_api(self, api_client: API, nargs: list, kwargs: dict) -> Any:
return api_client.download_submission(*nargs, **kwargs)

def handle_success(self, result: Any) -> None:
print('success', result)

def handle_failure(self, exception: Exception) -> None:
print('fail', exception)


class RunnableQueue(QObject):

Expand All @@ -43,22 +68,26 @@ def __init__(self, api_client: API, halt_signal: pyqtBoundSignal) -> None:
self.run = True
self.api_client = api_client
self.queue = Queue() # type: Queue[ApiJob]
self.last_job = None # type: Optional[ApiJob]

self.halt_signal = halt_signal
halt_signal.connect(self.stop)
halt_signal.connect(self.stop, type=Qt.QueuedConnection)

def stop(self) -> None:
self.run = False

def __call__(self, loop: bool = True) -> None:
while self.run:
job = self.queue.get(block=True) # type: ApiJob
# retry the "cached" job if it exists, otherwise get the next job
job = self.last_job or self.queue.get(block=True)
self.last_job = None

try:
job._do_call_api(self.api_client)
except RequestTimeoutError:
self.run = False
self.halt_signal.emit() # notify other threads of failure
self.last_job = job # "cache" the last job since we can't re-queue it
return

if not loop:
Expand All @@ -68,8 +97,8 @@ def __call__(self, loop: bool = True) -> None:
class ApiJobQueue(QObject):

'''
Signal used to notify different job threads that they should halt. This is pub/sub like signal
in that any threat may trigger it, and all threads listen to it.
Signal used to notify different job threads that they should halt. This is a pub/sub like signal
in that any job queues may trigger it, and all job queues listen to it.
'''
halt_signal = pyqtSignal()

Expand All @@ -90,11 +119,14 @@ def start_queues(self) -> None:
self.main_queue.moveToThread(main_thread)
self.download_queue.moveToThread(download_thread)

main_thread.run()
download_thread.run()
main_thread.started.connect(self.main_queue)
download_thread.started.connect(self.download_queue)

main_thread.start()
download_thread.start()

def enqueue(self, job: ApiJob) -> None:
if isinstance(job, DownloadFileJob):
if isinstance(job, DownloadSubmissionJob):
self.download_queue.queue.put_nowait(job)
else:
self.main_queue.queue.put_nowait(job)

0 comments on commit 61329c5

Please sign in to comment.