diff --git a/securedrop_client/logic.py b/securedrop_client/logic.py index 6b52325ff..ccc91ff22 100644 --- a/securedrop_client/logic.py +++ b/securedrop_client/logic.py @@ -221,10 +221,10 @@ def setup(self): self.sync_timer.timeout.connect(self.update_sync) self.sync_timer.start(30000) - # Automagically sync with the API every 5 minutes. + # Automagically sync with the API every minute. self.sync_update = QTimer() self.sync_update.timeout.connect(self.sync_api) - self.sync_update.start(1000 * 60 * 5) # every 5 minutes. + self.sync_update.start(1000 * 60) # every minute. # Run export object in a separate thread context (a reference to the # thread is kept on self such that it does not get garbage collected @@ -428,6 +428,7 @@ def on_sync_success(self) -> None: self.download_new_messages() self.download_new_replies() self.sync_events.emit('synced') + self.resume_queues() def on_sync_failure(self, result: Exception) -> None: """ @@ -439,7 +440,6 @@ def on_sync_failure(self, result: Exception) -> None: _('The SecureDrop server cannot be reached.'), duration=0, retry=True) - self.resume_queues() def on_refresh_failure(self, result: Exception) -> None: """ diff --git a/securedrop_client/queue.py b/securedrop_client/queue.py index 3f30887d6..a52ce4517 100644 --- a/securedrop_client/queue.py +++ b/securedrop_client/queue.py @@ -2,7 +2,7 @@ import logging from PyQt5.QtCore import QObject, QThread, pyqtSlot, pyqtSignal -from queue import PriorityQueue +from queue import PriorityQueue, Full from sdclientapi import API, RequestTimeoutError from sqlalchemy.orm import scoped_session from typing import Optional, Tuple # noqa: F401 @@ -61,11 +61,19 @@ class RunnableQueue(QObject): ''' resume = pyqtSignal() - def __init__(self, api_client: API, session_maker: scoped_session) -> None: + """ + Signal emitted when the queue successfully. + """ + pinged = pyqtSignal() + + def __init__(self, api_client: API, session_maker: scoped_session, size: int = 0) -> None: + """ + A size of zero means there's no upper bound to the queue size. + """ super().__init__() self.api_client = api_client self.session_maker = session_maker - self.queue = PriorityQueue() # type: PriorityQueue[Tuple[int, ApiJob]] + self.queue = PriorityQueue(maxsize=size) # type: PriorityQueue[Tuple[int, ApiJob]] # `order_number` ensures jobs with equal priority are retrived in FIFO order. This is needed # because PriorityQueue is implemented using heapq which does not have sort stability. For # more info, see : https://bugs.python.org/issue17794 @@ -81,7 +89,12 @@ def add_job(self, job: ApiJob) -> None: current_order_number = next(self.order_number) job.order_number = current_order_number priority = self.JOB_PRIORITIES[type(job)] - self.queue.put_nowait((priority, job)) + try: + self.queue.put_nowait((priority, job)) + except Full: + # Pass silently if the queue is full. For use with MetadataSyncJob. + # See #652. + pass def re_add_job(self, job: ApiJob) -> None: ''' @@ -117,6 +130,7 @@ def process(self) -> None: try: session = self.session_maker() job._do_call_api(self.api_client, session) + self.pinged.emit() except (RequestTimeoutError, ApiInaccessibleError) as e: logger.debug('Job {} raised an exception: {}: {}'.format(self, type(e).__name__, e)) self.add_job(PauseQueueJob()) @@ -139,27 +153,36 @@ def __init__(self, api_client: API, session_maker: scoped_session) -> None: self.main_thread = QThread() self.download_file_thread = QThread() + self.metadata_thread = QThread() self.main_queue = RunnableQueue(api_client, session_maker) self.download_file_queue = RunnableQueue(api_client, session_maker) + self.metadata_queue = RunnableQueue(api_client, session_maker, size=1) self.main_queue.moveToThread(self.main_thread) self.download_file_queue.moveToThread(self.download_file_thread) + self.metadata_queue.moveToThread(self.metadata_thread) self.main_thread.started.connect(self.main_queue.process) self.download_file_thread.started.connect(self.download_file_queue.process) + self.metadata_thread.started.connect(self.metadata_queue.process) self.main_queue.paused.connect(self.on_queue_paused) self.download_file_queue.paused.connect(self.on_queue_paused) + self.metadata_queue.paused.connect(self.on_queue_paused) + + self.metadata_queue.pinged.connect(self.resume_queues) def logout(self) -> None: self.main_queue.api_client = None self.download_file_queue.api_client = None + self.metadata_queue.api_client = None def login(self, api_client: API) -> None: logger.debug('Passing API token to queues') self.main_queue.api_client = api_client self.download_file_queue.api_client = api_client + self.metadata_queue.api_client = api_client self.start_queues() def start_queues(self) -> None: @@ -171,14 +194,25 @@ def start_queues(self) -> None: logger.debug('Starting download thread') self.download_file_thread.start() + if not self.metadata_thread.isRunning(): + logger.debug("Starting metadata thread") + self.metadata_thread.start() + def on_queue_paused(self) -> None: self.paused.emit() def resume_queues(self) -> None: logger.info("Resuming queues") + main_paused = not self.main_thread.isRunning() + download_paused = not self.download_file_thread.isRunning() + metadata_paused = not self.metadata_thread.isRunning() self.start_queues() - self.main_queue.resume.emit() - self.download_file_queue.resume.emit() + if main_paused: + self.main_queue.resume.emit() + if download_paused: + self.download_file_queue.resume.emit() + if metadata_paused: + self.metadata_queue.resume.emit() def enqueue(self, job: ApiJob) -> None: # Prevent api jobs being added to the queue when not logged in. @@ -192,6 +226,9 @@ def enqueue(self, job: ApiJob) -> None: if isinstance(job, FileDownloadJob): logger.debug('Adding job to download queue') self.download_file_queue.add_job(job) + elif isinstance(job, MetadataSyncJob): + logger.debug("Adding job to metadata queue") + self.metadata_queue.add_job(job) else: logger.debug('Adding job to main queue') self.main_queue.add_job(job) diff --git a/tests/test_logic.py b/tests/test_logic.py index be68625da..ecbaf364e 100644 --- a/tests/test_logic.py +++ b/tests/test_logic.py @@ -441,7 +441,6 @@ def test_Controller_on_sync_failure(homedir, config, mocker, session_maker): co.on_sync_failure(exception) assert mock_storage.update_local_storage.call_count == 0 - co.resume_queues.assert_called_once_with() def test_Controller_on_refresh_failure(homedir, config, mocker, session_maker): @@ -476,6 +475,7 @@ def test_Controller_on_sync_success(homedir, config, mocker): co.download_new_messages = mocker.MagicMock() co.download_new_replies = mocker.MagicMock() co.gpg = mocker.MagicMock() + co.resume_queues = mocker.MagicMock() mock_storage = mocker.patch('securedrop_client.logic.storage') co.on_sync_success() @@ -484,6 +484,7 @@ def test_Controller_on_sync_success(homedir, config, mocker): co.update_sources.assert_called_once_with() co.download_new_messages.assert_called_once_with() co.download_new_replies.assert_called_once_with() + co.resume_queues.assert_called_once_with() def test_Controller_update_sync(homedir, config, mocker, session_maker): diff --git a/tests/test_queue.py b/tests/test_queue.py index cabff3c95..ba49a3567 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -4,7 +4,7 @@ from queue import Queue from sdclientapi import RequestTimeoutError -from securedrop_client.api_jobs.downloads import FileDownloadJob +from securedrop_client.api_jobs.downloads import FileDownloadJob, MetadataSyncJob from securedrop_client.api_jobs.base import ApiInaccessibleError, PauseQueueJob from securedrop_client.queue import RunnableQueue, ApiJobQueue from tests import factory @@ -44,6 +44,26 @@ def test_RunnableQueue_happy_path(mocker): assert queue.queue.empty() +def test_RunnableQueue_with_size_constraint(mocker): + ''' + Add one job to the queue, run it. + ''' + mock_api_client = mocker.MagicMock() + mock_session = mocker.MagicMock() + mock_session_maker = mocker.MagicMock(return_value=mock_session) + return_value = 'foo' + + dummy_job_cls = factory.dummy_job_factory(mocker, return_value) + queue = RunnableQueue(mock_api_client, mock_session_maker, size=1) + queue.JOB_PRIORITIES = {dummy_job_cls: 1, PauseQueueJob: 2} + + queue.add_job(dummy_job_cls()) + queue.add_job(dummy_job_cls()) + queue.add_job(dummy_job_cls()) + + assert queue.queue.qsize() == 1 + + def test_RunnableQueue_job_timeout(mocker): ''' Add two jobs to the queue. The first times out, and then gets resubmitted for the next pass @@ -239,11 +259,14 @@ def test_ApiJobQueue_enqueue(mocker): job_queue.JOB_PRIORITIES = {FileDownloadJob: job_priority, type(dummy_job): job_priority} mock_download_file_queue = mocker.patch.object(job_queue, 'download_file_queue') + mock_metadata_queue = mocker.patch.object(job_queue, 'metadata_queue') mock_main_queue = mocker.patch.object(job_queue, 'main_queue') mock_download_file_add_job = mocker.patch.object(mock_download_file_queue, 'add_job') + mock_metadata_add_job = mocker.patch.object(mock_metadata_queue, 'add_job') mock_main_queue_add_job = mocker.patch.object(mock_main_queue, 'add_job') job_queue.main_queue.api_client = 'has a value' job_queue.download_file_queue.api_client = 'has a value' + job_queue.metadata_queue.api_client = 'has a value' mock_start_queues = mocker.patch.object(job_queue, 'start_queues') dl_job = FileDownloadJob('mock', 'mock', 'mock') @@ -258,6 +281,18 @@ def test_ApiJobQueue_enqueue(mocker): mock_main_queue.reset_mock() mock_main_queue_add_job.reset_mock() + md_job = MetadataSyncJob("mock", "mock") + job_queue.enqueue(md_job) + + mock_metadata_add_job.assert_called_once_with(md_job) + assert not mock_main_queue_add_job.called + + # reset for next test + mock_download_file_queue.reset_mock() + mock_download_file_add_job.reset_mock() + mock_main_queue.reset_mock() + mock_main_queue_add_job.reset_mock() + job_queue.enqueue(dummy_job) mock_main_queue_add_job.assert_called_once_with(dummy_job) @@ -277,9 +312,16 @@ def test_ApiJobQueue_pause_queues(mocker): def test_ApiJobQueue_resume_queues_emits_resume_signal(mocker): + """ + Resume only emits if the queue is paused. + """ job_queue = ApiJobQueue(mocker.MagicMock(), mocker.MagicMock()) mocker.patch.object(job_queue.main_queue, 'resume') mocker.patch.object(job_queue.download_file_queue, 'resume') + mocker.patch.object(job_queue.metadata_queue, 'resume') + job_queue.main_thread.isRunning = mocker.MagicMock(return_value=False) + job_queue.download_file_thread.isRunning = mocker.MagicMock(return_value=False) + job_queue.metadata_thread.isRunning = mocker.MagicMock(return_value=False) job_queue.start_queues = mocker.MagicMock() job_queue.resume_queues() @@ -287,6 +329,7 @@ def test_ApiJobQueue_resume_queues_emits_resume_signal(mocker): job_queue.start_queues.assert_called_once_with() job_queue.main_queue.resume.emit.assert_called_once_with() job_queue.download_file_queue.resume.emit.assert_called_once_with() + job_queue.metadata_queue.resume.emit.assert_called_once_with() def test_ApiJobQueue_enqueue_no_auth(mocker): @@ -320,18 +363,23 @@ def test_ApiJobQueue_login_if_queues_not_running(mocker): mock_main_queue = mocker.patch.object(job_queue, 'main_queue') mock_download_file_queue = mocker.patch.object(job_queue, 'download_file_queue') + mock_metadata_queue = mocker.patch.object(job_queue, 'metadata_queue') mock_main_thread = mocker.patch.object(job_queue, 'main_thread') mock_download_file_thread = mocker.patch.object(job_queue, 'download_file_thread') + mock_metadata_thread = mocker.patch.object(job_queue, 'metadata_thread') job_queue.main_thread.isRunning = mocker.MagicMock(return_value=False) job_queue.download_file_thread.isRunning = mocker.MagicMock(return_value=False) + job_queue.metadata_thread.isRunning = mocker.MagicMock(return_value=False) job_queue.login(mock_api) assert mock_main_queue.api_client == mock_api assert mock_download_file_queue.api_client == mock_api + assert mock_metadata_queue.api_client == mock_api mock_main_thread.start.assert_called_once_with() mock_download_file_thread.start.assert_called_once_with() + mock_metadata_thread.start.assert_called_once_with() def test_ApiJobQueue_login_if_queues_running(mocker): @@ -343,18 +391,23 @@ def test_ApiJobQueue_login_if_queues_running(mocker): mock_main_queue = mocker.patch.object(job_queue, 'main_queue') mock_download_file_queue = mocker.patch.object(job_queue, 'download_file_queue') + mock_metadata_queue = mocker.patch.object(job_queue, 'metadata_queue') mock_main_thread = mocker.patch.object(job_queue, 'main_thread') mock_download_file_thread = mocker.patch.object(job_queue, 'download_file_thread') + mock_metadata_thread = mocker.patch.object(job_queue, 'metadata_thread') job_queue.main_thread.isRunning = mocker.MagicMock(return_value=True) job_queue.download_file_thread.isRunning = mocker.MagicMock(return_value=True) + job_queue.metadata_thread.isRunning = mocker.MagicMock(return_value=True) job_queue.login(mock_api) assert mock_main_queue.api_client == mock_api assert mock_download_file_queue.api_client == mock_api + assert mock_metadata_queue.api_client == mock_api assert not mock_main_thread.start.called assert not mock_download_file_thread.start.called + assert not mock_metadata_thread.start.called def test_ApiJobQueue_logout_removes_api_client(mocker): @@ -364,8 +417,10 @@ def test_ApiJobQueue_logout_removes_api_client(mocker): job_queue = ApiJobQueue(mock_client, mock_session_maker) job_queue.main_queue.api_client = 'my token!!!' job_queue.download_file_queue.api_client = 'my token!!!' + job_queue.metadata_queue.api_client = 'my token!!!' job_queue.logout() assert job_queue.main_queue.api_client is None assert job_queue.download_file_queue.api_client is None + assert job_queue.metadata_queue.api_client is None