Merge pull request #739 from freedomofpress/make-sync-continuous
Make sync continuous
redshiftzero authored Feb 6, 2020
2 parents 7bbb265 + 94b1721 commit cdb1263
Showing 10 changed files with 588 additions and 426 deletions.
59 changes: 2 additions & 57 deletions securedrop_client/api_jobs/downloads.py
@@ -17,7 +17,8 @@
from securedrop_client.crypto import GpgHelper, CryptoError
from securedrop_client.db import File, Message, Reply
from securedrop_client.storage import mark_as_decrypted, mark_as_downloaded, \
set_message_or_reply_content, get_remote_data, update_local_storage
set_message_or_reply_content


logger = logging.getLogger(__name__)

@@ -31,62 +32,6 @@ def __init__(self, message: str,
self.uuid = uuid


class MetadataSyncJob(ApiJob):
'''
Update source metadata such that new download jobs can be added to the queue.
'''

NUMBER_OF_TIMES_TO_RETRY_AN_API_CALL = 15

def __init__(self, data_dir: str, gpg: GpgHelper) -> None:
super().__init__(remaining_attempts=self.NUMBER_OF_TIMES_TO_RETRY_AN_API_CALL)
self.data_dir = data_dir
self.gpg = gpg

def call_api(self, api_client: API, session: Session) -> Any:
'''
Override ApiJob.
Download new metadata, update the local database, and import new keys; the
success signal then lets the controller know to add any new download jobs.
'''

# TODO: Once https://github.com/freedomofpress/securedrop-client/issues/648 is resolved, we will
# want to pass the default request timeout to api calls instead of setting it on the api
# object directly.
api_client.default_request_timeout = 40
remote_sources, remote_submissions, remote_replies = \
get_remote_data(api_client)

update_local_storage(session,
remote_sources,
remote_submissions,
remote_replies,
self.data_dir)

fingerprints = self.gpg.fingerprints()
for source in remote_sources:
if source.key and source.key.get('type', None) == 'PGP':
pub_key = source.key.get('public', None)
fingerprint = source.key.get('fingerprint', None)
if not pub_key or not fingerprint:
# The below line needs to be excluded from the coverage computation
# as it will show as uncovered due to a cpython compiler optimization.
# See: https://bugs.python.org/issue2506
continue # pragma: no cover

if fingerprint in fingerprints:
logger.debug("Skipping import of key with fingerprint {}".format(fingerprint))
continue

try:
logger.debug("Importing key with fingerprint {}".format(fingerprint))
self.gpg.import_key(source.uuid, pub_key, fingerprint)
except CryptoError:
logger.warning('Failed to import key for source {}'.format(source.uuid))


class DownloadJob(ApiJob):
'''
Download and decrypt a file that contains either a message, reply, or file submission.
67 changes: 67 additions & 0 deletions securedrop_client/api_jobs/sync.py
@@ -0,0 +1,67 @@
from typing import Any
import logging

from sdclientapi import API
from sqlalchemy.orm.session import Session

from securedrop_client.api_jobs.base import ApiJob
from securedrop_client.crypto import GpgHelper, CryptoError
from securedrop_client.storage import get_remote_data, update_local_storage


logger = logging.getLogger(__name__)


class MetadataSyncJob(ApiJob):
'''
Update source metadata such that new download jobs can be added to the queue.
'''

NUMBER_OF_TIMES_TO_RETRY_AN_API_CALL = 2

def __init__(self, data_dir: str, gpg: GpgHelper) -> None:
super().__init__(remaining_attempts=self.NUMBER_OF_TIMES_TO_RETRY_AN_API_CALL)
self.data_dir = data_dir
self.gpg = gpg

def call_api(self, api_client: API, session: Session) -> Any:
'''
Override ApiJob.
Download new metadata, update the local database, and import new keys; the
success signal then lets the controller know to add any new download jobs.
'''

# TODO: Once https://github.com/freedomofpress/securedrop-client/issues/648 is resolved, we will
# want to pass the default request timeout to api calls instead of setting it on the api
# object directly.
api_client.default_request_timeout = 40
remote_sources, remote_submissions, remote_replies = get_remote_data(api_client)

update_local_storage(session,
remote_sources,
remote_submissions,
remote_replies,
self.data_dir)

fingerprints = self.gpg.fingerprints()
for source in remote_sources:
if source.key and source.key.get('type', None) == 'PGP':
pub_key = source.key.get('public', None)
fingerprint = source.key.get('fingerprint', None)
if not pub_key or not fingerprint:
# The below line needs to be excluded from the coverage computation
# as it will show as uncovered due to a cpython compiler optimization.
# See: https://bugs.python.org/issue2506
continue # pragma: no cover

if fingerprint in fingerprints:
logger.debug("Skipping import of key with fingerprint {}".format(fingerprint))
continue

try:
logger.debug("Importing key with fingerprint {}".format(fingerprint))
self.gpg.import_key(source.uuid, pub_key, fingerprint)
except CryptoError:
logger.warning('Failed to import key for source {}'.format(source.uuid))
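
A minimal usage sketch, modeled on the Controller.sync_api method removed in this pull request: a caller builds the job, connects its success and failure signals, and hands it to the ApiJobQueue. The helper function name and its arguments are illustrative only, not part of the codebase.

from PyQt5.QtCore import Qt

from securedrop_client.api_jobs.sync import MetadataSyncJob


def enqueue_metadata_sync(api_job_queue, data_dir, gpg, on_success, on_failure):
    # Build the job with the data directory and GpgHelper it needs to
    # update local storage and import source keys.
    job = MetadataSyncJob(data_dir, gpg)

    # Queued connections so the slots run on the receiving (GUI) thread.
    job.success_signal.connect(on_success, type=Qt.QueuedConnection)
    job.failure_signal.connect(on_failure, type=Qt.QueuedConnection)

    # The queue later invokes job.call_api(api_client, session) with an
    # authenticated API client and a database session.
    api_job_queue.enqueue(job)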
53 changes: 14 additions & 39 deletions securedrop_client/logic.py
@@ -34,14 +34,15 @@
from securedrop_client import db
from securedrop_client.api_jobs.base import ApiInaccessibleError
from securedrop_client.api_jobs.downloads import FileDownloadJob, MessageDownloadJob, \
ReplyDownloadJob, DownloadChecksumMismatchException, MetadataSyncJob
ReplyDownloadJob, DownloadChecksumMismatchException
from securedrop_client.api_jobs.sources import DeleteSourceJob
from securedrop_client.api_jobs.uploads import SendReplyJob, SendReplyJobError, \
SendReplyJobTimeoutError
from securedrop_client.api_jobs.updatestar import UpdateStarJob, UpdateStarJobException
from securedrop_client.crypto import GpgHelper
from securedrop_client.export import Export
from securedrop_client.queue import ApiJobQueue
from securedrop_client.sync import ApiSync
from securedrop_client.utils import check_dir_permissions

logger = logging.getLogger(__name__)
@@ -215,6 +216,12 @@ def __init__(self, hostname: str, gui, session_maker: sessionmaker,
# File data.
self.data_dir = os.path.join(self.home, 'data')

# Background sync to keep client up-to-date with server changes
self.api_sync = ApiSync(self.api, self.session_maker, self.gpg, self.data_dir)
self.api_sync.sync_started.connect(self.on_sync_started, type=Qt.QueuedConnection)
self.api_sync.sync_success.connect(self.on_sync_success, type=Qt.QueuedConnection)
self.api_sync.sync_failure.connect(self.on_sync_failure, type=Qt.QueuedConnection)

@property
def is_authenticated(self) -> bool:
return self.__is_authenticated
@@ -247,11 +254,6 @@ def setup(self):
self.sync_timer.timeout.connect(self.update_sync)
self.sync_timer.start(30000)

# Automagically sync with the API every minute.
self.sync_update = QTimer()
self.sync_update.timeout.connect(self.sync_api)
self.sync_update.start(1000 * 60) # every minute.

# Run export object in a separate thread context (a reference to the
# thread is kept on self such that it does not get garbage collected
# after this method returns) - we want to keep our export thread around for
@@ -301,20 +303,6 @@ def call_api(self,
new_api_thread.start()

def on_queue_paused(self) -> None:
# TODO: remove if block once https://github.com/freedomofpress/securedrop-client/pull/739
# is merged and rely on continuous metadata sync to encounter same auth error from the
# server which will log the user out in the on_sync_failure handler
if (
not self.api or
not self.api_job_queue.main_queue.api_client or
not self.api_job_queue.download_file_queue.api_client or
not self.api_job_queue.metadata_queue.api_client
):
self.invalidate_token()
self.logout()
self.gui.show_login(error=_('Your session expired. Please log in again.'))
return

self.gui.update_error_status(
_('The SecureDrop server cannot be reached.'),
duration=0,
@@ -371,7 +359,7 @@ def on_authenticate_success(self, result):
self.gui.show_main_window(user)
self.update_sources()
self.api_job_queue.login(self.api)
self.sync_api()
self.api_sync.start(self.api)
self.is_authenticated = True
self.resume_queues()

@@ -380,6 +368,7 @@ def on_authenticate_failure(self, result: Exception) -> None:
self.invalidate_token()
error = _('There was a problem signing in. Please verify your credentials and try again.')
self.gui.show_login_error(error=error)
self.api_sync.stop()

def login_offline_mode(self):
"""
@@ -405,24 +394,6 @@ def authenticated(self):
"""
return bool(self.api and self.api.token is not None)

def sync_api(self):
"""
Grab data from the remote SecureDrop API in a non-blocking manner.
"""
logger.debug("In sync_api on thread {}".format(self.thread().currentThreadId()))
if self.authenticated():
self.sync_events.emit('syncing')
logger.debug("You are authenticated, going to make your call")

job = MetadataSyncJob(self.data_dir, self.gpg)
job.success_signal.connect(self.on_sync_success, type=Qt.QueuedConnection)
job.failure_signal.connect(self.on_sync_failure, type=Qt.QueuedConnection)

self.api_job_queue.enqueue(job)

logger.debug("In sync_api, after call to submit job to queue, on "
"thread {}".format(self.thread().currentThreadId()))

def last_sync(self):
"""
Returns the time of last synchronisation with the remote SD server.
@@ -433,6 +404,9 @@ def last_sync(self):
except Exception:
return None

def on_sync_started(self) -> None:
self.sync_events.emit('syncing')

def on_sync_success(self) -> None:
"""
Called when synchronisation of data via the API queue succeeds.
@@ -527,6 +501,7 @@ def logout(self):
for failed_reply in failed_replies:
self.reply_failed.emit(failed_reply.uuid)

self.api_sync.stop()
self.api_job_queue.logout()
self.gui.logout()

(The remaining changed files are not expanded in this view.)
