diff --git a/securedrop_client/api_jobs/sync.py b/securedrop_client/api_jobs/sync.py index 741662b16d..fa11408fb4 100644 --- a/securedrop_client/api_jobs/sync.py +++ b/securedrop_client/api_jobs/sync.py @@ -17,8 +17,10 @@ class MetadataSyncJob(ApiJob): Update source metadata such that new download jobs can be added to the queue. ''' + NUMBER_OF_TIMES_TO_RETRY_AN_API_CALL = 15 + def __init__(self, data_dir: str, gpg: GpgHelper) -> None: - super().__init__(remaining_attempts=15) + super().__init__(remaining_attempts=self.NUMBER_OF_TIMES_TO_RETRY_AN_API_CALL) self.data_dir = data_dir self.gpg = gpg @@ -34,14 +36,17 @@ def call_api(self, api_client: API, session: Session) -> Any: # TODO: Once https://github.com/freedomofpress/securedrop-client/issues/648, we will want to # pass the default request timeout to api calls instead of setting it on the api object # directly. - api_client.default_request_timeout = 20 - remote_sources, remote_submissions, remote_replies = get_remote_data(api_client) + api_client.default_request_timeout = 40 + remote_sources, remote_submissions, remote_replies = \ + get_remote_data(api_client) + update_local_storage(session, remote_sources, remote_submissions, remote_replies, self.data_dir) + fingerprints = self.gpg.fingerprints() for source in remote_sources: if source.key and source.key.get('type', None) == 'PGP': pub_key = source.key.get('public', None) @@ -51,7 +56,13 @@ def call_api(self, api_client: API, session: Session) -> Any: # as it will show as uncovered due to a cpython compiler optimziation. # See: https://bugs.python.org/issue2506 continue # pragma: no cover + + if fingerprint in fingerprints: + logger.debug("Skipping import of key with fingerprint {}".format(fingerprint)) + continue + try: + logger.debug("Importing key with fingerprint {}".format(fingerprint)) self.gpg.import_key(source.uuid, pub_key, fingerprint) except CryptoError: logger.warning('Failed to import key for source {}'.format(source.uuid)) diff --git a/securedrop_client/logic.py b/securedrop_client/logic.py index 2bbeae50bd..954903c476 100644 --- a/securedrop_client/logic.py +++ b/securedrop_client/logic.py @@ -32,7 +32,7 @@ from securedrop_client import storage from securedrop_client import db -from securedrop_client.sync import ApiSync +from securedrop_client.api_jobs.base import ApiInaccessibleError from securedrop_client.api_jobs.downloads import FileDownloadJob, MessageDownloadJob, \ ReplyDownloadJob, DownloadChecksumMismatchException from securedrop_client.api_jobs.sources import DeleteSourceJob @@ -42,6 +42,7 @@ from securedrop_client.crypto import GpgHelper from securedrop_client.export import Export from securedrop_client.queue import ApiJobQueue +from securedrop_client.sync import ApiSync from securedrop_client.utils import check_dir_permissions logger = logging.getLogger(__name__) @@ -281,20 +282,6 @@ def call_api(self, new_api_thread.start() def on_queue_paused(self) -> None: - # TODO: remove if block once https://github.com/freedomofpress/securedrop-client/pull/739 - # is merged and rely on continuous metadata sync to encounter same auth error from the - # server which will log the user out in the on_sync_failure handler - if ( - not self.api or - not self.api_job_queue.main_queue.api_client or - not self.api_job_queue.download_file_queue.api_client or - not self.api_job_queue.metadata_queue.api_client - ): - self.invalidate_token() - self.logout() - self.gui.show_login(error=_('Your session expired. Please log in again.')) - return - self.gui.update_error_status( _('The SecureDrop server cannot be reached.'), duration=0, diff --git a/securedrop_client/queue.py b/securedrop_client/queue.py index 947359de64..27ce712918 100644 --- a/securedrop_client/queue.py +++ b/securedrop_client/queue.py @@ -46,8 +46,8 @@ class RunnableQueue(QObject): DeleteSourceJob: 14, SendReplyJob: 15, UpdateStarJob: 16, - MessageDownloadJob: 18, - ReplyDownloadJob: 18, + MessageDownloadJob: 17, + ReplyDownloadJob: 17, } ''' @@ -108,10 +108,16 @@ def process(self) -> None: If the job is a PauseQueueJob, emit the paused signal and return from the processing loop so that no more jobs are processed until the queue resumes. - If the job raises RequestTimeoutError or ApiInaccessibleError, then: + If the job raises RequestTimeoutError, then: (1) Add a PauseQueuejob to the queue (2) Add the job back to the queue so that it can be reprocessed once the queue is resumed. + If the job raises ApiInaccessibleError, then: + (1) Set the token to None so that the queue manager will stop enqueuing jobs since we are + no longer able to make api requests. + (2) Return from the processing loop since a valid token will be needed in order to process + jobs. + Note: Generic exceptions are handled in _do_call_api. ''' while True: @@ -129,7 +135,7 @@ def process(self) -> None: except ApiInaccessibleError as e: logger.debug('Job {} raised an exception: {}: {}'.format(self, type(e).__name__, e)) self.api_client = None - self.add_job(PauseQueueJob()) + return except RequestTimeoutError as e: logger.debug('Job {} raised an exception: {}: {}'.format(self, type(e).__name__, e)) self.add_job(PauseQueueJob()) diff --git a/tests/api_jobs/test_sync.py b/tests/api_jobs/test_sync.py index a0d9e39a95..570d16d8fc 100644 --- a/tests/api_jobs/test_sync.py +++ b/tests/api_jobs/test_sync.py @@ -1,4 +1,5 @@ import os +from uuid import UUID from securedrop_client.api_jobs.sync import MetadataSyncJob from securedrop_client.crypto import GpgHelper, CryptoError @@ -107,3 +108,55 @@ def test_MetadataSyncJob_success_with_missing_key(mocker, homedir, session, sess assert mock_key_import.call_count == 0 assert mock_get_remote_data.call_count == 1 + + +def test_MetadataSyncJob_only_import_new_source_keys(mocker, homedir, session, session_maker): + """ + Verify that we only import source keys we don't already have. + """ + class LimitedImportGpgHelper(GpgHelper): + def import_key(self, source_uuid: UUID, key_data: str, fingerprint: str) -> None: + self._import(key_data) + + gpg = LimitedImportGpgHelper(homedir, session_maker, is_qubes=False) + job = MetadataSyncJob(homedir, gpg) + + mock_source = mocker.MagicMock() + mock_source.uuid = 'bar' + mock_source.key = { + 'type': 'PGP', + 'public': PUB_KEY, + 'fingerprint': 'B2FF7FB28EED8CABEBC5FB6C6179D97BCFA52E5F', + } + + mock_get_remote_data = mocker.patch( + 'securedrop_client.api_jobs.sync.get_remote_data', + return_value=([mock_source], [], [])) + + api_client = mocker.MagicMock() + api_client.default_request_timeout = mocker.MagicMock() + + mocker.patch( + 'securedrop_client.api_jobs.sync.update_local_storage', + return_value=([mock_source], [], [])) + + mock_logger = mocker.patch('securedrop_client.api_jobs.sync.logger') + + job.call_api(api_client, session) + + assert mock_get_remote_data.call_count == 1 + assert len(gpg.fingerprints()) == 2 + + log_msg = mock_logger.debug.call_args_list[0][0][0] + assert log_msg.startswith( + 'Importing key with fingerprint {}'.format(mock_source.key['fingerprint']) + ) + + job.call_api(api_client, session) + + assert mock_get_remote_data.call_count == 2 + + log_msg = mock_logger.debug.call_args_list[1][0][0] + assert log_msg.startswith( + 'Skipping import of key with fingerprint {}'.format(mock_source.key['fingerprint']) + ) diff --git a/tests/test_logic.py b/tests/test_logic.py index a1b29b83dc..894a4d3e37 100644 --- a/tests/test_logic.py +++ b/tests/test_logic.py @@ -1415,24 +1415,6 @@ def test_Controller_on_queue_paused(homedir, config, mocker, session_maker): 'The SecureDrop server cannot be reached.', duration=0, retry=True) -def test_Controller_on_queue_paused_due_to_invalid_token(homedir, config, mocker, session_maker): - """ - If the api is inaccessible then ensure user is logged out and shown the login window. Also check - that "SecureDrop server cannot be reached" is not shown when the user is not authenticated. - """ - gui = mocker.MagicMock() - co = Controller('http://localhost', gui, session_maker, homedir) - co.api = None - co.logout = mocker.MagicMock() - co.gui = mocker.MagicMock() - co.gui.show_login = mocker.MagicMock() - - co.on_queue_paused() - - co.logout.assert_called_once_with() - co.gui.show_login.assert_called_once_with(error='Your session expired. Please log in again.') - - def test_Controller_call_update_star_success(homedir, config, mocker, session_maker, session): ''' Check that a UpdateStar is submitted to the queue when update_star is called. diff --git a/tests/test_queue.py b/tests/test_queue.py index d63dac8c9d..0c438d303f 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -3,7 +3,6 @@ ''' from queue import Queue, Full from sdclientapi import RequestTimeoutError -import pytest from securedrop_client.api_jobs.downloads import FileDownloadJob from securedrop_client.api_jobs.base import ApiInaccessibleError, PauseQueueJob @@ -45,25 +44,22 @@ def test_RunnableQueue_happy_path(mocker): assert queue.queue.empty() -def test_RunnableQueue_with_size_constraint(mocker): +def test_RunnableQueue_with_size_constraint(mocker, session_maker): ''' - Add one job to the queue, run it. + Add one job to a queue with the size constraint of 1 and see that the next job is dropped. ''' - mock_api_client = mocker.MagicMock() - mock_session = mocker.MagicMock() - mock_session_maker = mocker.MagicMock(return_value=mock_session) - return_value = 'foo' - - dummy_job_cls = factory.dummy_job_factory(mocker, return_value) - queue = RunnableQueue(mock_api_client, mock_session_maker, size=1) - queue.JOB_PRIORITIES = {dummy_job_cls: 1, PauseQueueJob: 2} + queue = RunnableQueue(mocker.MagicMock(), session_maker, size=1) + job_cls = factory.dummy_job_factory(mocker, 'mock_return_value') + queue.JOB_PRIORITIES = {job_cls: 1} + job1 = job_cls() + job2 = job_cls() - queue.add_job(dummy_job_cls()) - with pytest.raises(Full): - queue.add_job(dummy_job_cls()) - queue.add_job(dummy_job_cls()) + queue.add_job(job1) + queue.add_job(job2) assert queue.queue.qsize() == 1 + assert queue.queue.get(block=True) == (1, job1) + assert queue.queue.qsize() == 0 def test_RunnableQueue_job_timeout(mocker): @@ -203,12 +199,10 @@ def test_RunnableQueue_job_generic_exception(mocker): def test_RunnableQueue_does_not_run_jobs_when_not_authed(mocker): ''' - Check that the queue is paused when a job returns with aApiInaccessibleError. Check that the - job does not get resubmitted since it is not authorized and that its api_client is None. + Check that a job that sees an ApiInaccessibleError does not get resubmitted since it is not + authorized and that its api_client is None. ''' queue = RunnableQueue(mocker.MagicMock(), mocker.MagicMock()) - queue.paused = mocker.MagicMock() - queue.paused.emit = mocker.MagicMock() job_cls = factory.dummy_job_factory(mocker, ApiInaccessibleError()) queue.JOB_PRIORITIES = {PauseQueueJob: 0, job_cls: 1} @@ -220,7 +214,6 @@ def test_RunnableQueue_does_not_run_jobs_when_not_authed(mocker): queue.process() assert queue.queue.qsize() == 0 # queue should not contain job since it was not resubmitted assert queue.api_client is None - queue.paused.emit.assert_called_once_with() def test_ApiJobQueue_enqueue(mocker): @@ -373,7 +366,6 @@ def test_ApiJobQueue_logout_stops_queue_threads(mocker): assert not job_queue.main_thread.isRunning() assert not job_queue.download_file_thread.isRunning() - assert not job_queue.metadata_thread.isRunning() def test_ApiJobQueue_logout_results_in_queue_threads_not_running(mocker):