Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new metadata queue. #715

Merged
merged 8 commits into from
Jan 27, 2020
6 changes: 3 additions & 3 deletions securedrop_client/logic.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,10 +221,10 @@ def setup(self):
self.sync_timer.timeout.connect(self.update_sync)
self.sync_timer.start(30000)

# Automagically sync with the API every 5 minutes.
# Automagically sync with the API every minute.
self.sync_update = QTimer()
self.sync_update.timeout.connect(self.sync_api)
self.sync_update.start(1000 * 60 * 5) # every 5 minutes.
self.sync_update.start(1000 * 60) # every minute.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this time seems good for now, we can increase sync frequency once all sync_api calls have been removed

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also: you need to remove the line of code that resumes the queues in on_sync_failure now that sync is outside of the main queue and we don't need to unpause the queue in order to run the MetadataSyncJob. We should never pause the metadata sync queue.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

one more note: while offline you can stop the thread and restart it on login, which will take care of this issue: #671 since you can remove the sync_api call on login and instead just start the metadata sync thread

Copy link
Contributor Author

@ntoll ntoll Jan 23, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've removed the queue resumption code as requested. The QTimer will kick in (after a minute) and try another MetadataSyncJob.

See my comment in #671.


# Run export object in a separate thread context (a reference to the
# thread is kept on self such that it does not get garbage collected
Expand Down Expand Up @@ -428,6 +428,7 @@ def on_sync_success(self) -> None:
self.download_new_messages()
self.download_new_replies()
self.sync_events.emit('synced')
self.resume_queues()

def on_sync_failure(self, result: Exception) -> None:
"""
Expand All @@ -439,7 +440,6 @@ def on_sync_failure(self, result: Exception) -> None:
_('The SecureDrop server cannot be reached.'),
duration=0,
retry=True)
self.resume_queues()

def on_refresh_failure(self, result: Exception) -> None:
"""
Expand Down
49 changes: 43 additions & 6 deletions securedrop_client/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import logging

from PyQt5.QtCore import QObject, QThread, pyqtSlot, pyqtSignal
from queue import PriorityQueue
from queue import PriorityQueue, Full
from sdclientapi import API, RequestTimeoutError
from sqlalchemy.orm import scoped_session
from typing import Optional, Tuple # noqa: F401
Expand Down Expand Up @@ -61,11 +61,19 @@ class RunnableQueue(QObject):
'''
resume = pyqtSignal()

def __init__(self, api_client: API, session_maker: scoped_session) -> None:
"""
Signal emitted when the queue successfully completes a job's API call.
"""
pinged = pyqtSignal()

def __init__(self, api_client: API, session_maker: scoped_session, size: int = 0) -> None:
"""
A size of zero means there's no upper bound to the queue size.
"""
super().__init__()
self.api_client = api_client
self.session_maker = session_maker
self.queue = PriorityQueue() # type: PriorityQueue[Tuple[int, ApiJob]]
self.queue = PriorityQueue(maxsize=size) # type: PriorityQueue[Tuple[int, ApiJob]]
# `order_number` ensures jobs with equal priority are retrieved in FIFO order. This is needed
# because PriorityQueue is implemented using heapq which does not have sort stability. For
# more info, see : https://bugs.python.org/issue17794
Expand All @@ -81,7 +89,12 @@ def add_job(self, job: ApiJob) -> None:
current_order_number = next(self.order_number)
job.order_number = current_order_number
priority = self.JOB_PRIORITIES[type(job)]
self.queue.put_nowait((priority, job))
try:
self.queue.put_nowait((priority, job))
except Full:
# Pass silently if the queue is full. For use with MetadataSyncJob.
# See #652.
pass

def re_add_job(self, job: ApiJob) -> None:
'''
Expand Down Expand Up @@ -117,6 +130,7 @@ def process(self) -> None:
try:
session = self.session_maker()
job._do_call_api(self.api_client, session)
self.pinged.emit()
except (RequestTimeoutError, ApiInaccessibleError) as e:
logger.debug('Job {} raised an exception: {}: {}'.format(self, type(e).__name__, e))
self.add_job(PauseQueueJob())
Expand All @@ -139,27 +153,36 @@ def __init__(self, api_client: API, session_maker: scoped_session) -> None:

self.main_thread = QThread()
self.download_file_thread = QThread()
self.metadata_thread = QThread()

self.main_queue = RunnableQueue(api_client, session_maker)
self.download_file_queue = RunnableQueue(api_client, session_maker)
self.metadata_queue = RunnableQueue(api_client, session_maker, size=1)

self.main_queue.moveToThread(self.main_thread)
self.download_file_queue.moveToThread(self.download_file_thread)
self.metadata_queue.moveToThread(self.metadata_thread)

self.main_thread.started.connect(self.main_queue.process)
self.download_file_thread.started.connect(self.download_file_queue.process)
self.metadata_thread.started.connect(self.metadata_queue.process)

self.main_queue.paused.connect(self.on_queue_paused)
self.download_file_queue.paused.connect(self.on_queue_paused)
self.metadata_queue.paused.connect(self.on_queue_paused)

self.metadata_queue.pinged.connect(self.resume_queues)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this queue doesn't need to pause or resume so you can remove this as well as the pinged signal. instead you can just call resume_queues from the Controller on_sync_success if the queues are paused.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I follow.

The queue decides this itself (see the exception handling in the process method of the RunnableQueue class). Therefore, the metadata queue could be paused if it encounters a problem (which is a good thing). However, the update timer will restart the queue after X period of time, right...?

In which case, I'd argue these signals still need to be there. IYSWIM.

Copy link
Contributor

@sssoleileraaa sssoleileraaa Jan 23, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So currently, we never stop metadata syncs when they fail because of a request timeout. We want them to run in the background until they succeed, which is why it doesn't make sense to pause the metadata sync queue. It looks like your code will try to enqueue another MetadataSyncJob after 60 seconds but it will be dropped because the queue will be full and paused. The right behavior is to not pause the metadata queue and it makes the code simpler because you can remove the pinged signal and remove complicated logic around unpausing the metadata sync queue when we want to keep reaching out to server to see if we can unpause the other queues, see #671 (comment) for an idea on how to make this much simpler.


def logout(self) -> None:
self.main_queue.api_client = None
self.download_file_queue.api_client = None
self.metadata_queue.api_client = None

def login(self, api_client: API) -> None:
logger.debug('Passing API token to queues')
self.main_queue.api_client = api_client
self.download_file_queue.api_client = api_client
self.metadata_queue.api_client = api_client
self.start_queues()

def start_queues(self) -> None:
Expand All @@ -171,14 +194,25 @@ def start_queues(self) -> None:
logger.debug('Starting download thread')
self.download_file_thread.start()

if not self.metadata_thread.isRunning():
logger.debug("Starting metadata thread")
self.metadata_thread.start()

def on_queue_paused(self) -> None:
self.paused.emit()

def resume_queues(self) -> None:
logger.info("Resuming queues")
main_paused = not self.main_thread.isRunning()
download_paused = not self.download_file_thread.isRunning()
metadata_paused = not self.metadata_thread.isRunning()
self.start_queues()
self.main_queue.resume.emit()
self.download_file_queue.resume.emit()
if main_paused:
self.main_queue.resume.emit()
if download_paused:
self.download_file_queue.resume.emit()
if metadata_paused:
self.metadata_queue.resume.emit()

def enqueue(self, job: ApiJob) -> None:
# Prevent api jobs being added to the queue when not logged in.
Expand All @@ -192,6 +226,9 @@ def enqueue(self, job: ApiJob) -> None:
if isinstance(job, FileDownloadJob):
logger.debug('Adding job to download queue')
self.download_file_queue.add_job(job)
elif isinstance(job, MetadataSyncJob):
logger.debug("Adding job to metadata queue")
self.metadata_queue.add_job(job)
else:
logger.debug('Adding job to main queue')
self.main_queue.add_job(job)
3 changes: 2 additions & 1 deletion tests/test_logic.py
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,6 @@ def test_Controller_on_sync_failure(homedir, config, mocker, session_maker):
co.on_sync_failure(exception)

assert mock_storage.update_local_storage.call_count == 0
co.resume_queues.assert_called_once_with()


def test_Controller_on_refresh_failure(homedir, config, mocker, session_maker):
Expand Down Expand Up @@ -476,6 +475,7 @@ def test_Controller_on_sync_success(homedir, config, mocker):
co.download_new_messages = mocker.MagicMock()
co.download_new_replies = mocker.MagicMock()
co.gpg = mocker.MagicMock()
co.resume_queues = mocker.MagicMock()
mock_storage = mocker.patch('securedrop_client.logic.storage')

co.on_sync_success()
Expand All @@ -484,6 +484,7 @@ def test_Controller_on_sync_success(homedir, config, mocker):
co.update_sources.assert_called_once_with()
co.download_new_messages.assert_called_once_with()
co.download_new_replies.assert_called_once_with()
co.resume_queues.assert_called_once_with()


def test_Controller_update_sync(homedir, config, mocker, session_maker):
Expand Down
57 changes: 56 additions & 1 deletion tests/test_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from queue import Queue
from sdclientapi import RequestTimeoutError

from securedrop_client.api_jobs.downloads import FileDownloadJob
from securedrop_client.api_jobs.downloads import FileDownloadJob, MetadataSyncJob
from securedrop_client.api_jobs.base import ApiInaccessibleError, PauseQueueJob
from securedrop_client.queue import RunnableQueue, ApiJobQueue
from tests import factory
Expand Down Expand Up @@ -44,6 +44,26 @@ def test_RunnableQueue_happy_path(mocker):
assert queue.queue.empty()


def test_RunnableQueue_with_size_constraint(mocker):
'''
    Add three jobs to a queue with a size limit of one; only the first is enqueued.
'''
mock_api_client = mocker.MagicMock()
mock_session = mocker.MagicMock()
mock_session_maker = mocker.MagicMock(return_value=mock_session)
return_value = 'foo'

dummy_job_cls = factory.dummy_job_factory(mocker, return_value)
queue = RunnableQueue(mock_api_client, mock_session_maker, size=1)
queue.JOB_PRIORITIES = {dummy_job_cls: 1, PauseQueueJob: 2}

queue.add_job(dummy_job_cls())
queue.add_job(dummy_job_cls())
queue.add_job(dummy_job_cls())

assert queue.queue.qsize() == 1


def test_RunnableQueue_job_timeout(mocker):
'''
Add two jobs to the queue. The first times out, and then gets resubmitted for the next pass
Expand Down Expand Up @@ -239,11 +259,14 @@ def test_ApiJobQueue_enqueue(mocker):
job_queue.JOB_PRIORITIES = {FileDownloadJob: job_priority, type(dummy_job): job_priority}

mock_download_file_queue = mocker.patch.object(job_queue, 'download_file_queue')
mock_metadata_queue = mocker.patch.object(job_queue, 'metadata_queue')
mock_main_queue = mocker.patch.object(job_queue, 'main_queue')
mock_download_file_add_job = mocker.patch.object(mock_download_file_queue, 'add_job')
mock_metadata_add_job = mocker.patch.object(mock_metadata_queue, 'add_job')
mock_main_queue_add_job = mocker.patch.object(mock_main_queue, 'add_job')
job_queue.main_queue.api_client = 'has a value'
job_queue.download_file_queue.api_client = 'has a value'
job_queue.metadata_queue.api_client = 'has a value'
mock_start_queues = mocker.patch.object(job_queue, 'start_queues')

dl_job = FileDownloadJob('mock', 'mock', 'mock')
Expand All @@ -258,6 +281,18 @@ def test_ApiJobQueue_enqueue(mocker):
mock_main_queue.reset_mock()
mock_main_queue_add_job.reset_mock()

md_job = MetadataSyncJob("mock", "mock")
job_queue.enqueue(md_job)

mock_metadata_add_job.assert_called_once_with(md_job)
assert not mock_main_queue_add_job.called

# reset for next test
mock_download_file_queue.reset_mock()
mock_download_file_add_job.reset_mock()
mock_main_queue.reset_mock()
mock_main_queue_add_job.reset_mock()

job_queue.enqueue(dummy_job)

mock_main_queue_add_job.assert_called_once_with(dummy_job)
Expand All @@ -277,16 +312,24 @@ def test_ApiJobQueue_pause_queues(mocker):


def test_ApiJobQueue_resume_queues_emits_resume_signal(mocker):
"""
Resume only emits if the queue is paused.
"""
job_queue = ApiJobQueue(mocker.MagicMock(), mocker.MagicMock())
mocker.patch.object(job_queue.main_queue, 'resume')
mocker.patch.object(job_queue.download_file_queue, 'resume')
mocker.patch.object(job_queue.metadata_queue, 'resume')
job_queue.main_thread.isRunning = mocker.MagicMock(return_value=False)
job_queue.download_file_thread.isRunning = mocker.MagicMock(return_value=False)
job_queue.metadata_thread.isRunning = mocker.MagicMock(return_value=False)
job_queue.start_queues = mocker.MagicMock()

job_queue.resume_queues()

job_queue.start_queues.assert_called_once_with()
job_queue.main_queue.resume.emit.assert_called_once_with()
job_queue.download_file_queue.resume.emit.assert_called_once_with()
job_queue.metadata_queue.resume.emit.assert_called_once_with()


def test_ApiJobQueue_enqueue_no_auth(mocker):
Expand Down Expand Up @@ -320,18 +363,23 @@ def test_ApiJobQueue_login_if_queues_not_running(mocker):

mock_main_queue = mocker.patch.object(job_queue, 'main_queue')
mock_download_file_queue = mocker.patch.object(job_queue, 'download_file_queue')
mock_metadata_queue = mocker.patch.object(job_queue, 'metadata_queue')
mock_main_thread = mocker.patch.object(job_queue, 'main_thread')
mock_download_file_thread = mocker.patch.object(job_queue, 'download_file_thread')
mock_metadata_thread = mocker.patch.object(job_queue, 'metadata_thread')
job_queue.main_thread.isRunning = mocker.MagicMock(return_value=False)
job_queue.download_file_thread.isRunning = mocker.MagicMock(return_value=False)
job_queue.metadata_thread.isRunning = mocker.MagicMock(return_value=False)

job_queue.login(mock_api)

assert mock_main_queue.api_client == mock_api
assert mock_download_file_queue.api_client == mock_api
assert mock_metadata_queue.api_client == mock_api

mock_main_thread.start.assert_called_once_with()
mock_download_file_thread.start.assert_called_once_with()
mock_metadata_thread.start.assert_called_once_with()


def test_ApiJobQueue_login_if_queues_running(mocker):
Expand All @@ -343,18 +391,23 @@ def test_ApiJobQueue_login_if_queues_running(mocker):

mock_main_queue = mocker.patch.object(job_queue, 'main_queue')
mock_download_file_queue = mocker.patch.object(job_queue, 'download_file_queue')
mock_metadata_queue = mocker.patch.object(job_queue, 'metadata_queue')
mock_main_thread = mocker.patch.object(job_queue, 'main_thread')
mock_download_file_thread = mocker.patch.object(job_queue, 'download_file_thread')
mock_metadata_thread = mocker.patch.object(job_queue, 'metadata_thread')
job_queue.main_thread.isRunning = mocker.MagicMock(return_value=True)
job_queue.download_file_thread.isRunning = mocker.MagicMock(return_value=True)
job_queue.metadata_thread.isRunning = mocker.MagicMock(return_value=True)

job_queue.login(mock_api)

assert mock_main_queue.api_client == mock_api
assert mock_download_file_queue.api_client == mock_api
assert mock_metadata_queue.api_client == mock_api

assert not mock_main_thread.start.called
assert not mock_download_file_thread.start.called
assert not mock_metadata_thread.start.called


def test_ApiJobQueue_logout_removes_api_client(mocker):
Expand All @@ -364,8 +417,10 @@ def test_ApiJobQueue_logout_removes_api_client(mocker):
job_queue = ApiJobQueue(mock_client, mock_session_maker)
job_queue.main_queue.api_client = 'my token!!!'
job_queue.download_file_queue.api_client = 'my token!!!'
job_queue.metadata_queue.api_client = 'my token!!!'

job_queue.logout()

assert job_queue.main_queue.api_client is None
assert job_queue.download_file_queue.api_client is None
assert job_queue.metadata_queue.api_client is None