Skip to content

Commit

Permalink
Merge branch 'master' into inline-conversation-updates
Browse files Browse the repository at this point in the history
  • Loading branch information
ntoll committed Jan 28, 2020
2 parents 4738bea + eb9ce9c commit 01bd4ef
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 11 deletions.
6 changes: 3 additions & 3 deletions securedrop_client/logic.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,10 +221,10 @@ def setup(self):
self.sync_timer.timeout.connect(self.update_sync)
self.sync_timer.start(30000)

# Automagically sync with the API every 5 minutes.
# Automagically sync with the API every minute.
self.sync_update = QTimer()
self.sync_update.timeout.connect(self.sync_api)
self.sync_update.start(1000 * 60 * 5) # every 5 minutes.
self.sync_update.start(1000 * 60) # every minute.

# Run export object in a separate thread context (a reference to the
# thread is kept on self such that it does not get garbage collected
Expand Down Expand Up @@ -428,6 +428,7 @@ def on_sync_success(self) -> None:
self.download_new_messages()
self.download_new_replies()
self.sync_events.emit('synced')
self.resume_queues()

def on_sync_failure(self, result: Exception) -> None:
"""
Expand All @@ -439,7 +440,6 @@ def on_sync_failure(self, result: Exception) -> None:
_('The SecureDrop server cannot be reached.'),
duration=0,
retry=True)
self.resume_queues()

def on_refresh_failure(self, result: Exception) -> None:
"""
Expand Down
49 changes: 43 additions & 6 deletions securedrop_client/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import logging

from PyQt5.QtCore import QObject, QThread, pyqtSlot, pyqtSignal
from queue import PriorityQueue
from queue import PriorityQueue, Full
from sdclientapi import API, RequestTimeoutError
from sqlalchemy.orm import scoped_session
from typing import Optional, Tuple # noqa: F401
Expand Down Expand Up @@ -61,11 +61,19 @@ class RunnableQueue(QObject):
'''
resume = pyqtSignal()

def __init__(self, api_client: API, session_maker: scoped_session) -> None:
"""
Signal emitted when the queue successfully processes a job.
"""
pinged = pyqtSignal()

def __init__(self, api_client: API, session_maker: scoped_session, size: int = 0) -> None:
"""
A size of zero means there's no upper bound to the queue size.
"""
super().__init__()
self.api_client = api_client
self.session_maker = session_maker
self.queue = PriorityQueue() # type: PriorityQueue[Tuple[int, ApiJob]]
self.queue = PriorityQueue(maxsize=size) # type: PriorityQueue[Tuple[int, ApiJob]]
# `order_number` ensures jobs with equal priority are retrived in FIFO order. This is needed
# because PriorityQueue is implemented using heapq which does not have sort stability. For
# more info, see : https://bugs.python.org/issue17794
def add_job(self, job: ApiJob) -> None:
    '''
    Enqueue a job at the priority configured for its type.

    When the underlying PriorityQueue has a size bound and is already full,
    the job is dropped silently (used by the bounded metadata queue, #652).
    '''
    job.order_number = next(self.order_number)
    job_priority = self.JOB_PRIORITIES[type(job)]
    try:
        self.queue.put_nowait((job_priority, job))
    except Full:
        # Queue is at capacity: discard the job rather than block or raise.
        pass

def re_add_job(self, job: ApiJob) -> None:
'''
Expand Down Expand Up @@ -117,6 +130,7 @@ def process(self) -> None:
try:
session = self.session_maker()
job._do_call_api(self.api_client, session)
self.pinged.emit()
except (RequestTimeoutError, ApiInaccessibleError) as e:
logger.debug('Job {} raised an exception: {}: {}'.format(self, type(e).__name__, e))
self.add_job(PauseQueueJob())
Expand All @@ -139,27 +153,36 @@ def __init__(self, api_client: API, session_maker: scoped_session) -> None:

self.main_thread = QThread()
self.download_file_thread = QThread()
self.metadata_thread = QThread()

self.main_queue = RunnableQueue(api_client, session_maker)
self.download_file_queue = RunnableQueue(api_client, session_maker)
self.metadata_queue = RunnableQueue(api_client, session_maker, size=1)

self.main_queue.moveToThread(self.main_thread)
self.download_file_queue.moveToThread(self.download_file_thread)
self.metadata_queue.moveToThread(self.metadata_thread)

self.main_thread.started.connect(self.main_queue.process)
self.download_file_thread.started.connect(self.download_file_queue.process)
self.metadata_thread.started.connect(self.metadata_queue.process)

self.main_queue.paused.connect(self.on_queue_paused)
self.download_file_queue.paused.connect(self.on_queue_paused)
self.metadata_queue.paused.connect(self.on_queue_paused)

self.metadata_queue.pinged.connect(self.resume_queues)

def logout(self) -> None:
    '''
    Drop the API token from every queue so no further authenticated jobs run.
    '''
    for runnable_queue in (self.main_queue, self.download_file_queue, self.metadata_queue):
        runnable_queue.api_client = None

def login(self, api_client: API) -> None:
    '''
    Hand the freshly authenticated API client to every queue, then make sure
    all queue threads are running.
    '''
    logger.debug('Passing API token to queues')
    for runnable_queue in (self.main_queue, self.download_file_queue, self.metadata_queue):
        runnable_queue.api_client = api_client
    self.start_queues()

def start_queues(self) -> None:
Expand All @@ -171,14 +194,25 @@ def start_queues(self) -> None:
logger.debug('Starting download thread')
self.download_file_thread.start()

if not self.metadata_thread.isRunning():
logger.debug("Starting metadata thread")
self.metadata_thread.start()

def on_queue_paused(self) -> None:
    '''Re-emit the paused signal whenever any underlying queue pauses.'''
    self.paused.emit()

def resume_queues(self) -> None:
    '''
    Restart any stopped queue threads and emit resume only for the queues
    that were actually paused.
    '''
    logger.info("Resuming queues")

    # Record which threads were stopped *before* start_queues() restarts
    # them, so resume is emitted only where it is needed.
    stopped_before_start = {
        self.main_queue: not self.main_thread.isRunning(),
        self.download_file_queue: not self.download_file_thread.isRunning(),
        self.metadata_queue: not self.metadata_thread.isRunning(),
    }
    self.start_queues()
    for runnable_queue, was_paused in stopped_before_start.items():
        if was_paused:
            runnable_queue.resume.emit()

def enqueue(self, job: ApiJob) -> None:
# Prevent api jobs being added to the queue when not logged in.
Expand All @@ -192,6 +226,9 @@ def enqueue(self, job: ApiJob) -> None:
if isinstance(job, FileDownloadJob):
logger.debug('Adding job to download queue')
self.download_file_queue.add_job(job)
elif isinstance(job, MetadataSyncJob):
logger.debug("Adding job to metadata queue")
self.metadata_queue.add_job(job)
else:
logger.debug('Adding job to main queue')
self.main_queue.add_job(job)
3 changes: 2 additions & 1 deletion tests/test_logic.py
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,6 @@ def test_Controller_on_sync_failure(homedir, config, mocker, session_maker):
co.on_sync_failure(exception)

assert mock_storage.update_local_storage.call_count == 0
co.resume_queues.assert_called_once_with()


def test_Controller_on_refresh_failure(homedir, config, mocker, session_maker):
Expand Down Expand Up @@ -476,6 +475,7 @@ def test_Controller_on_sync_success(homedir, config, mocker):
co.download_new_messages = mocker.MagicMock()
co.download_new_replies = mocker.MagicMock()
co.gpg = mocker.MagicMock()
co.resume_queues = mocker.MagicMock()
mock_storage = mocker.patch('securedrop_client.logic.storage')

co.on_sync_success()
Expand All @@ -484,6 +484,7 @@ def test_Controller_on_sync_success(homedir, config, mocker):
co.update_sources.assert_called_once_with()
co.download_new_messages.assert_called_once_with()
co.download_new_replies.assert_called_once_with()
co.resume_queues.assert_called_once_with()


def test_Controller_update_sync(homedir, config, mocker, session_maker):
Expand Down
57 changes: 56 additions & 1 deletion tests/test_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from queue import Queue
from sdclientapi import RequestTimeoutError

from securedrop_client.api_jobs.downloads import FileDownloadJob
from securedrop_client.api_jobs.downloads import FileDownloadJob, MetadataSyncJob
from securedrop_client.api_jobs.base import ApiInaccessibleError, PauseQueueJob
from securedrop_client.queue import RunnableQueue, ApiJobQueue
from tests import factory
Expand Down Expand Up @@ -44,6 +44,26 @@ def test_RunnableQueue_happy_path(mocker):
assert queue.queue.empty()


def test_RunnableQueue_with_size_constraint(mocker):
    '''
    Add three jobs to a queue constrained to a single slot: the overflow is
    dropped silently and the queue never grows beyond its maximum size.
    '''
    mock_api_client = mocker.MagicMock()
    mock_session = mocker.MagicMock()
    mock_session_maker = mocker.MagicMock(return_value=mock_session)
    return_value = 'foo'

    dummy_job_cls = factory.dummy_job_factory(mocker, return_value)
    queue = RunnableQueue(mock_api_client, mock_session_maker, size=1)
    queue.JOB_PRIORITIES = {dummy_job_cls: 1, PauseQueueJob: 2}

    queue.add_job(dummy_job_cls())
    queue.add_job(dummy_job_cls())  # dropped: queue already full
    queue.add_job(dummy_job_cls())  # dropped: queue already full

    assert queue.queue.qsize() == 1


def test_RunnableQueue_job_timeout(mocker):
'''
Add two jobs to the queue. The first times out, and then gets resubmitted for the next pass
Expand Down Expand Up @@ -239,11 +259,14 @@ def test_ApiJobQueue_enqueue(mocker):
job_queue.JOB_PRIORITIES = {FileDownloadJob: job_priority, type(dummy_job): job_priority}

mock_download_file_queue = mocker.patch.object(job_queue, 'download_file_queue')
mock_metadata_queue = mocker.patch.object(job_queue, 'metadata_queue')
mock_main_queue = mocker.patch.object(job_queue, 'main_queue')
mock_download_file_add_job = mocker.patch.object(mock_download_file_queue, 'add_job')
mock_metadata_add_job = mocker.patch.object(mock_metadata_queue, 'add_job')
mock_main_queue_add_job = mocker.patch.object(mock_main_queue, 'add_job')
job_queue.main_queue.api_client = 'has a value'
job_queue.download_file_queue.api_client = 'has a value'
job_queue.metadata_queue.api_client = 'has a value'
mock_start_queues = mocker.patch.object(job_queue, 'start_queues')

dl_job = FileDownloadJob('mock', 'mock', 'mock')
Expand All @@ -258,6 +281,18 @@ def test_ApiJobQueue_enqueue(mocker):
mock_main_queue.reset_mock()
mock_main_queue_add_job.reset_mock()

md_job = MetadataSyncJob("mock", "mock")
job_queue.enqueue(md_job)

mock_metadata_add_job.assert_called_once_with(md_job)
assert not mock_main_queue_add_job.called

# reset for next test
mock_download_file_queue.reset_mock()
mock_download_file_add_job.reset_mock()
mock_main_queue.reset_mock()
mock_main_queue_add_job.reset_mock()

job_queue.enqueue(dummy_job)

mock_main_queue_add_job.assert_called_once_with(dummy_job)
Expand All @@ -277,16 +312,24 @@ def test_ApiJobQueue_pause_queues(mocker):


def test_ApiJobQueue_resume_queues_emits_resume_signal(mocker):
    """
    Resume only emits if the queue is paused.
    """
    job_queue = ApiJobQueue(mocker.MagicMock(), mocker.MagicMock())
    queues = (job_queue.main_queue, job_queue.download_file_queue, job_queue.metadata_queue)
    threads = (job_queue.main_thread, job_queue.download_file_thread, job_queue.metadata_thread)

    for runnable_queue in queues:
        mocker.patch.object(runnable_queue, 'resume')
    # Every thread reports stopped, so every queue counts as paused.
    for thread in threads:
        thread.isRunning = mocker.MagicMock(return_value=False)
    job_queue.start_queues = mocker.MagicMock()

    job_queue.resume_queues()

    job_queue.start_queues.assert_called_once_with()
    for runnable_queue in queues:
        runnable_queue.resume.emit.assert_called_once_with()


def test_ApiJobQueue_enqueue_no_auth(mocker):
Expand Down Expand Up @@ -320,18 +363,23 @@ def test_ApiJobQueue_login_if_queues_not_running(mocker):

mock_main_queue = mocker.patch.object(job_queue, 'main_queue')
mock_download_file_queue = mocker.patch.object(job_queue, 'download_file_queue')
mock_metadata_queue = mocker.patch.object(job_queue, 'metadata_queue')
mock_main_thread = mocker.patch.object(job_queue, 'main_thread')
mock_download_file_thread = mocker.patch.object(job_queue, 'download_file_thread')
mock_metadata_thread = mocker.patch.object(job_queue, 'metadata_thread')
job_queue.main_thread.isRunning = mocker.MagicMock(return_value=False)
job_queue.download_file_thread.isRunning = mocker.MagicMock(return_value=False)
job_queue.metadata_thread.isRunning = mocker.MagicMock(return_value=False)

job_queue.login(mock_api)

assert mock_main_queue.api_client == mock_api
assert mock_download_file_queue.api_client == mock_api
assert mock_metadata_queue.api_client == mock_api

mock_main_thread.start.assert_called_once_with()
mock_download_file_thread.start.assert_called_once_with()
mock_metadata_thread.start.assert_called_once_with()


def test_ApiJobQueue_login_if_queues_running(mocker):
Expand All @@ -343,18 +391,23 @@ def test_ApiJobQueue_login_if_queues_running(mocker):

mock_main_queue = mocker.patch.object(job_queue, 'main_queue')
mock_download_file_queue = mocker.patch.object(job_queue, 'download_file_queue')
mock_metadata_queue = mocker.patch.object(job_queue, 'metadata_queue')
mock_main_thread = mocker.patch.object(job_queue, 'main_thread')
mock_download_file_thread = mocker.patch.object(job_queue, 'download_file_thread')
mock_metadata_thread = mocker.patch.object(job_queue, 'metadata_thread')
job_queue.main_thread.isRunning = mocker.MagicMock(return_value=True)
job_queue.download_file_thread.isRunning = mocker.MagicMock(return_value=True)
job_queue.metadata_thread.isRunning = mocker.MagicMock(return_value=True)

job_queue.login(mock_api)

assert mock_main_queue.api_client == mock_api
assert mock_download_file_queue.api_client == mock_api
assert mock_metadata_queue.api_client == mock_api

assert not mock_main_thread.start.called
assert not mock_download_file_thread.start.called
assert not mock_metadata_thread.start.called


def test_ApiJobQueue_logout_removes_api_client(mocker):
Expand All @@ -364,8 +417,10 @@ def test_ApiJobQueue_logout_removes_api_client(mocker):
job_queue = ApiJobQueue(mock_client, mock_session_maker)
job_queue.main_queue.api_client = 'my token!!!'
job_queue.download_file_queue.api_client = 'my token!!!'
job_queue.metadata_queue.api_client = 'my token!!!'

job_queue.logout()

assert job_queue.main_queue.api_client is None
assert job_queue.download_file_queue.api_client is None
assert job_queue.metadata_queue.api_client is None

0 comments on commit 01bd4ef

Please sign in to comment.