Skip to content

Commit

Permalink
app, test: use signals/slots to trigger adding jobs to queue
Browse files Browse the repository at this point in the history
  • Loading branch information
redshiftzero committed May 19, 2020
1 parent ca02ae3 commit c4ded3b
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 33 deletions.
19 changes: 14 additions & 5 deletions securedrop_client/logic.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,15 @@ class Controller(QObject):
"""
source_deletion_failed = pyqtSignal(str)

"""
This signal lets the queue manager know to add the job to the appropriate
network queue.
Emits:
PyQt_PyObject: the ApiJob to be added
"""
add_job = pyqtSignal('PyQt_PyObject')

def __init__(
self, hostname: str, gui, session_maker: sessionmaker,
home: str, proxy: bool = True, qubes: bool = True
Expand Down Expand Up @@ -277,6 +286,7 @@ def __init__(
# Queue that handles running API job
self.api_job_queue = ApiJobQueue(self.api, self.session_maker)
self.api_job_queue.paused.connect(self.on_queue_paused)
self.add_job.connect(self.api_job_queue.enqueue)

# Contains active threads calling the API.
self.api_threads = {} # type: Dict[str, Dict]
Expand Down Expand Up @@ -571,8 +581,7 @@ def update_star(self, source_uuid: str, is_starred: bool):
job = UpdateStarJob(source_uuid, is_starred)
job.success_signal.connect(self.on_update_star_success, type=Qt.QueuedConnection)
job.failure_signal.connect(self.on_update_star_failure, type=Qt.QueuedConnection)

self.api_job_queue.enqueue(job)
self.add_job.emit(job)

def logout(self):
"""
Expand Down Expand Up @@ -630,7 +639,7 @@ def _submit_download_job(self,
job.success_signal.connect(self.on_file_download_success, type=Qt.QueuedConnection)
job.failure_signal.connect(self.on_file_download_failure, type=Qt.QueuedConnection)

self.api_job_queue.enqueue(job)
self.add_job.emit(job)

def download_new_messages(self) -> None:
new_messages = storage.find_new_messages(self.session)
Expand Down Expand Up @@ -861,7 +870,7 @@ def delete_source(self, source: db.Source):
job.success_signal.connect(self.on_delete_source_success, type=Qt.QueuedConnection)
job.failure_signal.connect(self.on_delete_source_failure, type=Qt.QueuedConnection)

self.api_job_queue.enqueue(job)
self.add_job.emit(job)
self.source_deleted.emit(source.uuid)

@login_required
Expand Down Expand Up @@ -890,7 +899,7 @@ def send_reply(self, source_uuid: str, reply_uuid: str, message: str) -> None:
job.success_signal.connect(self.on_reply_success, type=Qt.QueuedConnection)
job.failure_signal.connect(self.on_reply_failure, type=Qt.QueuedConnection)

self.api_job_queue.enqueue(job)
self.add_job.emit(job)

def on_reply_success(self, reply_uuid: str) -> None:
logger.info('{} sent successfully'.format(reply_uuid))
Expand Down
12 changes: 9 additions & 3 deletions securedrop_client/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,12 @@ def _check_for_duplicate_jobs(self, job: ApiJob) -> bool:
return True
return False

@pyqtSlot()
def add_job(self, job: ApiJob) -> None:
'''
Add the job with its priority to the queue after assigning it the next order_number.
Can block while waiting to acquire condition_add_or_remove_job.
'''
with self.condition_add_or_remove_job:
if self._check_for_duplicate_jobs(job):
Expand All @@ -103,10 +106,12 @@ def add_job(self, job: ApiJob) -> None:
self.queue.put_nowait((priority, job))
self.condition_add_or_remove_job.notify()

def re_add_job(self, job: ApiJob) -> None:
def _re_add_job(self, job: ApiJob) -> None:
'''
Reset the job's remaining attempts and put it back into the queue in the order in which it
was submitted by the user (do not assign it the next order_number).
was submitted by the user (do not assign it the next order_number). Used internally.
When called condition_add_or_remove_job should be held.
'''
if self._check_for_duplicate_jobs(job):
return
Expand Down Expand Up @@ -162,7 +167,7 @@ def process(self) -> None:
self.add_job(PauseQueueJob())
with self.condition_add_or_remove_job:
job, self.current_job = self.current_job, None
self.re_add_job(job)
self._re_add_job(job)
except Exception as e:
logger.error('{}: {}'.format(type(e).__name__, e))
logger.debug('Skipping job')
Expand Down Expand Up @@ -258,6 +263,7 @@ def resume_queues(self) -> None:
logger.debug("Resuming download queue")
self.download_file_queue.resume.emit()

@pyqtSlot(object)
def enqueue(self, job: ApiJob) -> None:
'''
Enqueue the supplied job if the queues are running.
Expand Down
55 changes: 33 additions & 22 deletions tests/test_logic.py
Original file line number Diff line number Diff line change
Expand Up @@ -828,7 +828,8 @@ def test_Controller_on_file_download_Submission(homedir, config, session, mocker
failure_signal=mock_failure_signal)
mock_job_cls = mocker.patch(
"securedrop_client.logic.FileDownloadJob", return_value=mock_job)
mock_queue = mocker.patch.object(co, 'api_job_queue')
co.add_job = mocker.MagicMock()
co.add_job.emit = mocker.MagicMock()

source = factory.Source()
file_ = factory.File(is_downloaded=None, is_decrypted=None, source=source)
Expand All @@ -843,7 +844,7 @@ def test_Controller_on_file_download_Submission(homedir, config, session, mocker
co.data_dir,
co.gpg,
)
mock_queue.enqueue.assert_called_once_with(mock_job)
co.add_job.emit.assert_called_once_with(mock_job)
mock_success_signal.connect.assert_called_once_with(
co.on_file_download_success, type=Qt.QueuedConnection)
mock_failure_signal.connect.assert_called_once_with(
Expand All @@ -864,7 +865,8 @@ def test_Controller_on_file_download_Submission_no_auth(homedir, config, session
failure_signal=mock_failure_signal)
mock_job_cls = mocker.patch(
"securedrop_client.logic.FileDownloadJob", return_value=mock_job)
mock_queue = mocker.patch.object(co, 'api_job_queue')
co.add_job = mocker.MagicMock()
co.add_job.emit = mocker.MagicMock()

source = factory.Source()
file_ = factory.File(is_downloaded=None, is_decrypted=None, source=source)
Expand All @@ -875,7 +877,7 @@ def test_Controller_on_file_download_Submission_no_auth(homedir, config, session
co.on_submission_download(db.File, file_.uuid)

assert not mock_job_cls.called
assert not mock_queue.enqueue.called
assert not co.add_job.emit.called
assert not mock_success_signal.connect.called
assert not mock_failure_signal.connect.called
assert co.on_action_requiring_login.called
Expand Down Expand Up @@ -1132,11 +1134,12 @@ def test_Controller_download_new_replies_with_new_reply(mocker, session, session
failure_signal = mocker.MagicMock()
job = mocker.MagicMock(success_signal=success_signal, failure_signal=failure_signal)
mocker.patch("securedrop_client.logic.ReplyDownloadJob", return_value=job)
api_job_queue = mocker.patch.object(co, 'api_job_queue')
co.add_job = mocker.MagicMock()
co.add_job.emit = mocker.MagicMock()

co.download_new_replies()

api_job_queue.enqueue.assert_called_once_with(job)
co.add_job.emit.assert_called_once_with(job)
success_signal.connect.assert_called_once_with(
co.on_reply_download_success, type=Qt.QueuedConnection)
failure_signal.connect.assert_called_once_with(
Expand All @@ -1154,12 +1157,13 @@ def test_Controller_download_new_replies_without_replies(mocker, session, sessio
failure_signal = mocker.MagicMock()
job = mocker.MagicMock(success_signal=success_signal, failure_signal=failure_signal)
mocker.patch("securedrop_client.logic.ReplyDownloadJob", return_value=job)
api_job_queue = mocker.patch.object(co, 'api_job_queue')
co.add_job = mocker.MagicMock()
co.add_job.emit = mocker.MagicMock()
set_status = mocker.patch.object(co, 'set_status')

co.download_new_replies()

api_job_queue.enqueue.assert_not_called()
co.add_job.emit.assert_not_called()
success_signal.connect.assert_not_called()
failure_signal.connect.assert_not_called()
set_status.assert_not_called()
Expand Down Expand Up @@ -1247,14 +1251,15 @@ def test_Controller_download_new_messages_with_new_message(mocker, session, sess
mocker.patch('securedrop_client.storage.find_new_messages', return_value=[message])
success_signal = mocker.MagicMock()
failure_signal = mocker.MagicMock()
co.add_job = mocker.MagicMock()
co.add_job.emit = mocker.MagicMock()
job = mocker.MagicMock(success_signal=success_signal, failure_signal=failure_signal)
mocker.patch("securedrop_client.logic.MessageDownloadJob", return_value=job)
api_job_queue = mocker.patch.object(co, 'api_job_queue')
set_status = mocker.patch.object(co, 'set_status')

co.download_new_messages()

api_job_queue.enqueue.assert_called_once_with(job)
co.add_job.emit.assert_called_once_with(job)
success_signal.connect.assert_called_once_with(
co.on_message_download_success, type=Qt.QueuedConnection)
failure_signal.connect.assert_called_once_with(
Expand All @@ -1273,12 +1278,13 @@ def test_Controller_download_new_messages_without_messages(mocker, session, sess
failure_signal = mocker.MagicMock()
job = mocker.MagicMock(success_signal=success_signal, failure_signal=failure_signal)
mocker.patch("securedrop_client.logic.MessageDownloadJob", return_value=job)
api_job_queue = mocker.patch.object(co, 'api_job_queue')
co.add_job = mocker.MagicMock()
co.add_job.emit = mocker.MagicMock()
set_status = mocker.patch.object(co, 'set_status')

co.download_new_messages()

api_job_queue.enqueue.assert_not_called()
co.add_job.emit.assert_not_called()
success_signal.connect.assert_not_called()
failure_signal.connect.assert_not_called()
set_status.assert_not_called()
Expand All @@ -1292,6 +1298,8 @@ def test_Controller_download_new_messages_skips_recent_failures(
"""
co = Controller("http://localhost", mocker.MagicMock(), session_maker, homedir)
co.api = "Api token has a value"
co.add_job = mocker.MagicMock()
co.add_job.emit = mocker.MagicMock()

# record the download failures
download_error = session.query(db.DownloadError).filter_by(
Expand All @@ -1303,13 +1311,12 @@ def test_Controller_download_new_messages_skips_recent_failures(
session.commit()

mocker.patch("securedrop_client.storage.find_new_messages", return_value=[message])
api_job_queue = mocker.patch.object(co, "api_job_queue")
mocker.patch("securedrop_client.logic.logger.isEnabledFor", return_value=logging.DEBUG)
info_logger = mocker.patch("securedrop_client.logic.logger.info")

co.download_new_messages()

api_job_queue.enqueue.assert_not_called()
co.add_job.emit.assert_not_called()
info_logger.call_args_list[0][0][0] == (
f"Download of message {message.uuid} failed since client start; not retrying."
)
Expand All @@ -1323,6 +1330,8 @@ def test_Controller_download_new_replies_skips_recent_failures(
"""
co = Controller("http://localhost", mocker.MagicMock(), session_maker, homedir)
co.api = "Api token has a value"
co.add_job = mocker.MagicMock()
co.add_job.emit = mocker.MagicMock()

# record the download failures
download_error = session.query(db.DownloadError).filter_by(
Expand All @@ -1335,13 +1344,12 @@ def test_Controller_download_new_replies_skips_recent_failures(
session.commit()

mocker.patch("securedrop_client.storage.find_new_replies", return_value=[reply])
api_job_queue = mocker.patch.object(co, "api_job_queue")
mocker.patch("securedrop_client.logic.logger.isEnabledFor", return_value=logging.DEBUG)
info_logger = mocker.patch("securedrop_client.logic.logger.info")

co.download_new_replies()

api_job_queue.enqueue.assert_not_called()
co.add_job.emit.assert_not_called()
info_logger.call_args_list[0][0][0] == (
f"Download of reply {reply.uuid} failed since client start; not retrying."
)
Expand Down Expand Up @@ -1463,13 +1471,14 @@ def test_Controller_delete_source(homedir, config, mocker, session_maker, sessio
co.call_api = mocker.MagicMock()
co.api = mocker.MagicMock()
co.source_deleted = mocker.MagicMock()
co.add_job = mocker.MagicMock()
co.add_job.emit = mocker.MagicMock()

mock_success_signal = mocker.MagicMock()
mock_failure_signal = mocker.MagicMock()
mock_job = mocker.MagicMock(
success_signal=mock_success_signal, failure_signal=mock_failure_signal)
mock_job_cls = mocker.patch("securedrop_client.logic.DeleteSourceJob", return_value=mock_job)
mock_queue = mocker.patch.object(co, 'api_job_queue')

source = factory.Source()
session.add(source)
Expand All @@ -1479,7 +1488,7 @@ def test_Controller_delete_source(homedir, config, mocker, session_maker, sessio

co.source_deleted.emit.assert_called_once_with(source.uuid)
mock_job_cls.assert_called_once_with(source.uuid)
mock_queue.enqueue.assert_called_once_with(mock_job)
co.add_job.emit.assert_called_once_with(mock_job)
mock_success_signal.connect.assert_called_once_with(
co.on_delete_source_success, type=Qt.QueuedConnection)
mock_failure_signal.connect.assert_called_once_with(
Expand All @@ -1503,7 +1512,8 @@ def test_Controller_send_reply_success(homedir, config, mocker, session_maker, s
failure_signal=mock_failure_signal)
mock_job_cls = mocker.patch(
"securedrop_client.logic.SendReplyJob", return_value=mock_job)
mock_queue = mocker.patch.object(co, 'api_job_queue')
co.add_job = mocker.MagicMock()
co.add_job.emit = mocker.MagicMock()

source = factory.Source()
session.add(source)
Expand All @@ -1518,7 +1528,7 @@ def test_Controller_send_reply_success(homedir, config, mocker, session_maker, s
co.gpg,
)

mock_queue.enqueue.assert_called_once_with(mock_job)
co.add_job.emit.assert_called_once_with(mock_job)
mock_success_signal.connect.assert_called_once_with(
co.on_reply_success, type=Qt.QueuedConnection)
mock_failure_signal.connect.assert_called_once_with(
Expand Down Expand Up @@ -1678,7 +1688,8 @@ def test_Controller_call_update_star_success(homedir, config, mocker, session_ma
mock_job = mocker.MagicMock(success_signal=star_update_successful,
failure_signal=star_update_failed)
mock_job_cls = mocker.patch("securedrop_client.logic.UpdateStarJob", return_value=mock_job)
mock_queue = mocker.patch.object(co, 'api_job_queue')
co.add_job = mocker.MagicMock()
co.add_job.emit = mocker.MagicMock()

source = factory.Source()
session.add(source)
Expand All @@ -1687,7 +1698,7 @@ def test_Controller_call_update_star_success(homedir, config, mocker, session_ma
co.update_star(source.uuid, source.is_starred)

mock_job_cls.assert_called_once_with(source.uuid, source.is_starred)
mock_queue.enqueue.assert_called_once_with(mock_job)
co.add_job.emit.assert_called_once_with(mock_job)
assert star_update_successful.connect.call_count == 1
star_update_failed.connect.assert_called_once_with(
co.on_update_star_failure, type=Qt.QueuedConnection)
Expand Down
6 changes: 3 additions & 3 deletions tests/test_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ def test_RunnableQueue_resubmitted_jobs(mocker):

# Now resubmit job1 via put_nowait. It should execute prior to job2-4.
with queue.condition_add_or_remove_job:
queue.re_add_job(job1)
queue._re_add_job(job1)
assert queue.queue.get(block=True) == (1, job1)
assert queue.queue.get(block=True) == (1, job3)
assert queue.queue.get(block=True) == (2, job2)
Expand Down Expand Up @@ -187,10 +187,10 @@ def test_RunnableQueue_duplicate_jobs(mocker):
queue.add_job(msg_dl_job)
assert len(queue.queue.queue) == 2

# Ensure that using re_add_job in the case of a timeout won't allow duplicate
# Ensure that using _re_add_job in the case of a timeout won't allow duplicate
# jobs to be added.
with queue.condition_add_or_remove_job:
queue.re_add_job(msg_dl_job)
queue._re_add_job(msg_dl_job)
assert len(queue.queue.queue) == 2


Expand Down

0 comments on commit c4ded3b

Please sign in to comment.