Skip to content

Commit

Permalink
resolve conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
Allie Crevier committed Feb 3, 2020
1 parent d2d3b94 commit d2c49eb
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 61 deletions.
17 changes: 14 additions & 3 deletions securedrop_client/api_jobs/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ class MetadataSyncJob(ApiJob):
Update source metadata such that new download jobs can be added to the queue.
'''

NUMBER_OF_TIMES_TO_RETRY_AN_API_CALL = 15

def __init__(self, data_dir: str, gpg: GpgHelper) -> None:
super().__init__(remaining_attempts=15)
super().__init__(remaining_attempts=self.NUMBER_OF_TIMES_TO_RETRY_AN_API_CALL)
self.data_dir = data_dir
self.gpg = gpg

Expand All @@ -34,14 +36,17 @@ def call_api(self, api_client: API, session: Session) -> Any:
# TODO: Once https://github.com/freedomofpress/securedrop-client/issues/648, we will want to
# pass the default request timeout to api calls instead of setting it on the api object
# directly.
api_client.default_request_timeout = 20
remote_sources, remote_submissions, remote_replies = get_remote_data(api_client)
api_client.default_request_timeout = 40
remote_sources, remote_submissions, remote_replies = \
get_remote_data(api_client)

update_local_storage(session,
remote_sources,
remote_submissions,
remote_replies,
self.data_dir)

fingerprints = self.gpg.fingerprints()
for source in remote_sources:
if source.key and source.key.get('type', None) == 'PGP':
pub_key = source.key.get('public', None)
Expand All @@ -51,7 +56,13 @@ def call_api(self, api_client: API, session: Session) -> Any:
# as it will show as uncovered due to a cpython compiler optimization.
# See: https://bugs.python.org/issue2506
continue # pragma: no cover

if fingerprint in fingerprints:
logger.debug("Skipping import of key with fingerprint {}".format(fingerprint))
continue

try:
logger.debug("Importing key with fingerprint {}".format(fingerprint))
self.gpg.import_key(source.uuid, pub_key, fingerprint)
except CryptoError:
logger.warning('Failed to import key for source {}'.format(source.uuid))
17 changes: 2 additions & 15 deletions securedrop_client/logic.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@

from securedrop_client import storage
from securedrop_client import db
from securedrop_client.sync import ApiSync
from securedrop_client.api_jobs.base import ApiInaccessibleError
from securedrop_client.api_jobs.downloads import FileDownloadJob, MessageDownloadJob, \
ReplyDownloadJob, DownloadChecksumMismatchException
from securedrop_client.api_jobs.sources import DeleteSourceJob
Expand All @@ -42,6 +42,7 @@
from securedrop_client.crypto import GpgHelper
from securedrop_client.export import Export
from securedrop_client.queue import ApiJobQueue
from securedrop_client.sync import ApiSync
from securedrop_client.utils import check_dir_permissions

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -281,20 +282,6 @@ def call_api(self,
new_api_thread.start()

def on_queue_paused(self) -> None:
# TODO: remove if block once https://github.com/freedomofpress/securedrop-client/pull/739
# is merged and rely on continuous metadata sync to encounter same auth error from the
# server which will log the user out in the on_sync_failure handler
if (
not self.api or
not self.api_job_queue.main_queue.api_client or
not self.api_job_queue.download_file_queue.api_client or
not self.api_job_queue.metadata_queue.api_client
):
self.invalidate_token()
self.logout()
self.gui.show_login(error=_('Your session expired. Please log in again.'))
return

self.gui.update_error_status(
_('The SecureDrop server cannot be reached.'),
duration=0,
Expand Down
14 changes: 10 additions & 4 deletions securedrop_client/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ class RunnableQueue(QObject):
DeleteSourceJob: 14,
SendReplyJob: 15,
UpdateStarJob: 16,
MessageDownloadJob: 18,
ReplyDownloadJob: 18,
MessageDownloadJob: 17,
ReplyDownloadJob: 17,
}

'''
Expand Down Expand Up @@ -108,10 +108,16 @@ def process(self) -> None:
If the job is a PauseQueueJob, emit the paused signal and return from the processing loop so
that no more jobs are processed until the queue resumes.
If the job raises RequestTimeoutError or ApiInaccessibleError, then:
If the job raises RequestTimeoutError, then:
(1) Add a PauseQueueJob to the queue
(2) Add the job back to the queue so that it can be reprocessed once the queue is resumed.
If the job raises ApiInaccessibleError, then:
(1) Set the token to None so that the queue manager will stop enqueuing jobs since we are
no longer able to make api requests.
(2) Return from the processing loop since a valid token will be needed in order to process
jobs.
Note: Generic exceptions are handled in _do_call_api.
'''
while True:
Expand All @@ -129,7 +135,7 @@ def process(self) -> None:
except ApiInaccessibleError as e:
logger.debug('Job {} raised an exception: {}: {}'.format(self, type(e).__name__, e))
self.api_client = None
self.add_job(PauseQueueJob())
return
except RequestTimeoutError as e:
logger.debug('Job {} raised an exception: {}: {}'.format(self, type(e).__name__, e))
self.add_job(PauseQueueJob())
Expand Down
53 changes: 53 additions & 0 deletions tests/api_jobs/test_sync.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
from uuid import UUID

from securedrop_client.api_jobs.sync import MetadataSyncJob
from securedrop_client.crypto import GpgHelper, CryptoError
Expand Down Expand Up @@ -107,3 +108,55 @@ def test_MetadataSyncJob_success_with_missing_key(mocker, homedir, session, sess

assert mock_key_import.call_count == 0
assert mock_get_remote_data.call_count == 1


def test_MetadataSyncJob_only_import_new_source_keys(mocker, homedir, session, session_maker):
"""
Verify that we only import source keys we don't already have.
"""
class LimitedImportGpgHelper(GpgHelper):
def import_key(self, source_uuid: UUID, key_data: str, fingerprint: str) -> None:
self._import(key_data)

gpg = LimitedImportGpgHelper(homedir, session_maker, is_qubes=False)
job = MetadataSyncJob(homedir, gpg)

mock_source = mocker.MagicMock()
mock_source.uuid = 'bar'
mock_source.key = {
'type': 'PGP',
'public': PUB_KEY,
'fingerprint': 'B2FF7FB28EED8CABEBC5FB6C6179D97BCFA52E5F',
}

mock_get_remote_data = mocker.patch(
'securedrop_client.api_jobs.sync.get_remote_data',
return_value=([mock_source], [], []))

api_client = mocker.MagicMock()
api_client.default_request_timeout = mocker.MagicMock()

mocker.patch(
'securedrop_client.api_jobs.sync.update_local_storage',
return_value=([mock_source], [], []))

mock_logger = mocker.patch('securedrop_client.api_jobs.sync.logger')

job.call_api(api_client, session)

assert mock_get_remote_data.call_count == 1
assert len(gpg.fingerprints()) == 2

log_msg = mock_logger.debug.call_args_list[0][0][0]
assert log_msg.startswith(
'Importing key with fingerprint {}'.format(mock_source.key['fingerprint'])
)

job.call_api(api_client, session)

assert mock_get_remote_data.call_count == 2

log_msg = mock_logger.debug.call_args_list[1][0][0]
assert log_msg.startswith(
'Skipping import of key with fingerprint {}'.format(mock_source.key['fingerprint'])
)
18 changes: 0 additions & 18 deletions tests/test_logic.py
Original file line number Diff line number Diff line change
Expand Up @@ -1415,24 +1415,6 @@ def test_Controller_on_queue_paused(homedir, config, mocker, session_maker):
'The SecureDrop server cannot be reached.', duration=0, retry=True)


def test_Controller_on_queue_paused_due_to_invalid_token(homedir, config, mocker, session_maker):
"""
If the api is inaccessible then ensure user is logged out and shown the login window. Also check
that "SecureDrop server cannot be reached" is not shown when the user is not authenticated.
"""
gui = mocker.MagicMock()
co = Controller('http://localhost', gui, session_maker, homedir)
co.api = None
co.logout = mocker.MagicMock()
co.gui = mocker.MagicMock()
co.gui.show_login = mocker.MagicMock()

co.on_queue_paused()

co.logout.assert_called_once_with()
co.gui.show_login.assert_called_once_with(error='Your session expired. Please log in again.')


def test_Controller_call_update_star_success(homedir, config, mocker, session_maker, session):
'''
Check that a UpdateStar is submitted to the queue when update_star is called.
Expand Down
34 changes: 13 additions & 21 deletions tests/test_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
'''
from queue import Queue, Full
from sdclientapi import RequestTimeoutError
import pytest

from securedrop_client.api_jobs.downloads import FileDownloadJob
from securedrop_client.api_jobs.base import ApiInaccessibleError, PauseQueueJob
Expand Down Expand Up @@ -45,25 +44,22 @@ def test_RunnableQueue_happy_path(mocker):
assert queue.queue.empty()


def test_RunnableQueue_with_size_constraint(mocker):
def test_RunnableQueue_with_size_constraint(mocker, session_maker):
'''
Add one job to the queue, run it.
Add one job to a queue with the size constraint of 1 and see that the next job is dropped.
'''
mock_api_client = mocker.MagicMock()
mock_session = mocker.MagicMock()
mock_session_maker = mocker.MagicMock(return_value=mock_session)
return_value = 'foo'

dummy_job_cls = factory.dummy_job_factory(mocker, return_value)
queue = RunnableQueue(mock_api_client, mock_session_maker, size=1)
queue.JOB_PRIORITIES = {dummy_job_cls: 1, PauseQueueJob: 2}
queue = RunnableQueue(mocker.MagicMock(), session_maker, size=1)
job_cls = factory.dummy_job_factory(mocker, 'mock_return_value')
queue.JOB_PRIORITIES = {job_cls: 1}
job1 = job_cls()
job2 = job_cls()

queue.add_job(dummy_job_cls())
with pytest.raises(Full):
queue.add_job(dummy_job_cls())
queue.add_job(dummy_job_cls())
queue.add_job(job1)
queue.add_job(job2)

assert queue.queue.qsize() == 1
assert queue.queue.get(block=True) == (1, job1)
assert queue.queue.qsize() == 0


def test_RunnableQueue_job_timeout(mocker):
Expand Down Expand Up @@ -203,12 +199,10 @@ def test_RunnableQueue_job_generic_exception(mocker):

def test_RunnableQueue_does_not_run_jobs_when_not_authed(mocker):
'''
Check that the queue is paused when a job returns with an ApiInaccessibleError. Check that the
job does not get resubmitted since it is not authorized and that its api_client is None.
Check that a job that sees an ApiInaccessibleError does not get resubmitted since it is not
authorized and that its api_client is None.
'''
queue = RunnableQueue(mocker.MagicMock(), mocker.MagicMock())
queue.paused = mocker.MagicMock()
queue.paused.emit = mocker.MagicMock()
job_cls = factory.dummy_job_factory(mocker, ApiInaccessibleError())
queue.JOB_PRIORITIES = {PauseQueueJob: 0, job_cls: 1}

Expand All @@ -220,7 +214,6 @@ def test_RunnableQueue_does_not_run_jobs_when_not_authed(mocker):
queue.process()
assert queue.queue.qsize() == 0 # queue should not contain job since it was not resubmitted
assert queue.api_client is None
queue.paused.emit.assert_called_once_with()


def test_ApiJobQueue_enqueue(mocker):
Expand Down Expand Up @@ -373,7 +366,6 @@ def test_ApiJobQueue_logout_stops_queue_threads(mocker):

assert not job_queue.main_thread.isRunning()
assert not job_queue.download_file_thread.isRunning()
assert not job_queue.metadata_thread.isRunning()


def test_ApiJobQueue_logout_results_in_queue_threads_not_running(mocker):
Expand Down

0 comments on commit d2c49eb

Please sign in to comment.