From c7522fd87f7a6070f48821cf597ef39b09c9ff52 Mon Sep 17 00:00:00 2001 From: Allie Crevier Date: Mon, 27 Jan 2020 14:08:32 -0800 Subject: [PATCH] no longer stop metadata syncs unless logged out --- securedrop_client/api_jobs/downloads.py | 50 +--------------- securedrop_client/api_jobs/sync.py | 57 +++++++++++++++++++ securedrop_client/logic.py | 24 +++++--- securedrop_client/queue.py | 28 +-------- securedrop_client/sync.py | 76 +++++++++++++++++++++++++ 5 files changed, 153 insertions(+), 82 deletions(-) create mode 100644 securedrop_client/api_jobs/sync.py create mode 100644 securedrop_client/sync.py diff --git a/securedrop_client/api_jobs/downloads.py b/securedrop_client/api_jobs/downloads.py index 1f3ac37294..414d9cbebf 100644 --- a/securedrop_client/api_jobs/downloads.py +++ b/securedrop_client/api_jobs/downloads.py @@ -17,7 +17,8 @@ from securedrop_client.crypto import GpgHelper, CryptoError from securedrop_client.db import File, Message, Reply from securedrop_client.storage import mark_as_decrypted, mark_as_downloaded, \ - set_message_or_reply_content, get_remote_data, update_local_storage + set_message_or_reply_content + logger = logging.getLogger(__name__) @@ -31,53 +32,6 @@ def __init__(self, message: str, self.uuid = uuid -class MetadataSyncJob(ApiJob): - ''' - Update source metadata such that new download jobs can be added to the queue. - ''' - - def __init__(self, data_dir: str, gpg: GpgHelper) -> None: - super().__init__(remaining_attempts=15) - self.data_dir = data_dir - self.gpg = gpg - - def call_api(self, api_client: API, session: Session) -> Any: - ''' - Override ApiJob. - - Download new metadata, update the local database, import new keys, and - then the success signal will let the controller know to add any new download - jobs. - ''' - - # TODO: Once https://github.com/freedomofpress/securedrop-client/issues/648, we will want to - # pass the default request timeout to api calls instead of setting it on the api object - # directly. - api_client.default_request_timeout = 20 - remote_sources, remote_submissions, remote_replies = \ - get_remote_data(api_client) - - update_local_storage(session, - remote_sources, - remote_submissions, - remote_replies, - self.data_dir) - - for source in remote_sources: - if source.key and source.key.get('type', None) == 'PGP': - pub_key = source.key.get('public', None) - fingerprint = source.key.get('fingerprint', None) - if not pub_key or not fingerprint: - # The below line needs to be excluded from the coverage computation - # as it will show as uncovered due to a cpython compiler optimziation. - # See: https://bugs.python.org/issue2506 - continue # pragma: no cover - try: - self.gpg.import_key(source.uuid, pub_key, fingerprint) - except CryptoError: - logger.warning('Failed to import key for source {}'.format(source.uuid)) - - class DownloadJob(ApiJob): ''' Download and decrypt a file that contains either a message, reply, or file submission. diff --git a/securedrop_client/api_jobs/sync.py b/securedrop_client/api_jobs/sync.py new file mode 100644 index 0000000000..741662b16d --- /dev/null +++ b/securedrop_client/api_jobs/sync.py @@ -0,0 +1,57 @@ +from typing import Any +import logging + +from sdclientapi import API +from sqlalchemy.orm.session import Session + +from securedrop_client.api_jobs.base import ApiJob +from securedrop_client.crypto import GpgHelper, CryptoError +from securedrop_client.storage import get_remote_data, update_local_storage + + +logger = logging.getLogger(__name__) + + +class MetadataSyncJob(ApiJob): + ''' + Update source metadata such that new download jobs can be added to the queue. + ''' + + def __init__(self, data_dir: str, gpg: GpgHelper) -> None: + super().__init__(remaining_attempts=15) + self.data_dir = data_dir + self.gpg = gpg + + def call_api(self, api_client: API, session: Session) -> Any: + ''' + Override ApiJob. + + Download new metadata, update the local database, import new keys, and + then the success signal will let the controller know to add any new download + jobs. + ''' + + # TODO: Once https://github.com/freedomofpress/securedrop-client/issues/648, we will want to + # pass the default request timeout to api calls instead of setting it on the api object + # directly. + api_client.default_request_timeout = 20 + remote_sources, remote_submissions, remote_replies = get_remote_data(api_client) + update_local_storage(session, + remote_sources, + remote_submissions, + remote_replies, + self.data_dir) + + for source in remote_sources: + if source.key and source.key.get('type', None) == 'PGP': + pub_key = source.key.get('public', None) + fingerprint = source.key.get('fingerprint', None) + if not pub_key or not fingerprint: + # The below line needs to be excluded from the coverage computation + # as it will show as uncovered due to a cpython compiler optimziation. + # See: https://bugs.python.org/issue2506 + continue # pragma: no cover + try: + self.gpg.import_key(source.uuid, pub_key, fingerprint) + except CryptoError: + logger.warning('Failed to import key for source {}'.format(source.uuid)) diff --git a/securedrop_client/logic.py b/securedrop_client/logic.py index 0302ff1f15..cfa12dd2f3 100644 --- a/securedrop_client/logic.py +++ b/securedrop_client/logic.py @@ -32,8 +32,10 @@ from securedrop_client import storage from securedrop_client import db +from securedrop_client.sync import ApiSync +from securedrop_client.api_jobs.sync import MetadataSyncJob from securedrop_client.api_jobs.downloads import FileDownloadJob, MessageDownloadJob, \ - ReplyDownloadJob, DownloadChecksumMismatchException, MetadataSyncJob + ReplyDownloadJob, DownloadChecksumMismatchException from securedrop_client.api_jobs.sources import DeleteSourceJob from securedrop_client.api_jobs.uploads import SendReplyJob, SendReplyJobError, \ SendReplyJobTimeoutError @@ -189,6 +191,12 @@ def __init__(self, hostname: str, gui, session_maker: sessionmaker, # File data. self.data_dir = os.path.join(self.home, 'data') + # Background sync to keep client up-to-date with server changes + self.api_sync = ApiSync(self.api, self.session_maker, self.gpg, self.data_dir) + self.api_sync.sync_started.connect(self.on_sync_started, type=Qt.QueuedConnection) + self.api_sync.sync_success.connect(self.on_sync_success, type=Qt.QueuedConnection) + self.api_sync.sync_failure.connect(self.on_sync_failure, type=Qt.QueuedConnection) + @property def is_authenticated(self) -> bool: return self.__is_authenticated @@ -221,11 +229,6 @@ def setup(self): self.sync_timer.timeout.connect(self.update_sync) self.sync_timer.start(30000) - # Automagically sync with the API every minute. - self.sync_update = QTimer() - self.sync_update.timeout.connect(self.sync_api) - self.sync_update.start(1000 * 60) # every minute. - # Run export object in a separate thread context (a reference to the # thread is kept on self such that it does not get garbage collected # after this method returns) - we want to keep our export thread around for @@ -334,7 +337,7 @@ def on_authenticate_success(self, result): self.gui.show_main_window(user) self.update_sources() self.api_job_queue.login(self.api) - self.sync_api() + self.api_sync.start(self.api) self.is_authenticated = True self.resume_queues() @@ -372,10 +375,12 @@ def authenticated(self): def sync_api(self): """ Grab data from the remote SecureDrop API in a non-blocking manner. + + TODO: This should be removed once sync_api calls have been removed from all the different + job handlers. """ logger.debug("In sync_api on thread {}".format(self.thread().currentThreadId())) if self.authenticated(): - self.sync_events.emit('syncing') logger.debug("You are authenticated, going to make your call") job = MetadataSyncJob(self.data_dir, self.gpg) @@ -397,6 +402,9 @@ def last_sync(self): except Exception: return None + def on_sync_started(self) -> None: + self.sync_events.emit('syncing') + def on_sync_success(self) -> None: """ Called when syncronisation of data via the API queue succeeds. diff --git a/securedrop_client/queue.py b/securedrop_client/queue.py index a52ce45176..e04722d12a 100644 --- a/securedrop_client/queue.py +++ b/securedrop_client/queue.py @@ -9,8 +9,9 @@ from securedrop_client.api_jobs.base import ApiJob, ApiInaccessibleError, DEFAULT_NUM_ATTEMPTS, \ PauseQueueJob +from securedrop_client.api_jobs.sync import MetadataSyncJob from securedrop_client.api_jobs.downloads import (FileDownloadJob, MessageDownloadJob, - ReplyDownloadJob, MetadataSyncJob) + ReplyDownloadJob) from securedrop_client.api_jobs.sources import DeleteSourceJob from securedrop_client.api_jobs.uploads import SendReplyJob from securedrop_client.api_jobs.updatestar import UpdateStarJob @@ -61,11 +62,6 @@ class RunnableQueue(QObject): ''' resume = pyqtSignal() - """ - Signal emitted when the queue successfully. - """ - pinged = pyqtSignal() - def __init__(self, api_client: API, session_maker: scoped_session, size: int = 0) -> None: """ A size of zero means there's no upper bound to the queue size. @@ -130,7 +126,6 @@ def process(self) -> None: try: session = self.session_maker() job._do_call_api(self.api_client, session) - self.pinged.emit() except (RequestTimeoutError, ApiInaccessibleError) as e: logger.debug('Job {} raised an exception: {}: {}'.format(self, type(e).__name__, e)) self.add_job(PauseQueueJob()) @@ -153,36 +148,27 @@ def __init__(self, api_client: API, session_maker: scoped_session) -> None: self.main_thread = QThread() self.download_file_thread = QThread() - self.metadata_thread = QThread() self.main_queue = RunnableQueue(api_client, session_maker) self.download_file_queue = RunnableQueue(api_client, session_maker) - self.metadata_queue = RunnableQueue(api_client, session_maker, size=1) self.main_queue.moveToThread(self.main_thread) self.download_file_queue.moveToThread(self.download_file_thread) - self.metadata_queue.moveToThread(self.metadata_thread) self.main_thread.started.connect(self.main_queue.process) self.download_file_thread.started.connect(self.download_file_queue.process) - self.metadata_thread.started.connect(self.metadata_queue.process) self.main_queue.paused.connect(self.on_queue_paused) self.download_file_queue.paused.connect(self.on_queue_paused) - self.metadata_queue.paused.connect(self.on_queue_paused) - - self.metadata_queue.pinged.connect(self.resume_queues) def logout(self) -> None: self.main_queue.api_client = None self.download_file_queue.api_client = None - self.metadata_queue.api_client = None def login(self, api_client: API) -> None: logger.debug('Passing API token to queues') self.main_queue.api_client = api_client self.download_file_queue.api_client = api_client - self.metadata_queue.api_client = api_client self.start_queues() def start_queues(self) -> None: @@ -194,10 +180,6 @@ def start_queues(self) -> None: logger.debug('Starting download thread') self.download_file_thread.start() - if not self.metadata_thread.isRunning(): - logger.debug("Starting metadata thread") - self.metadata_thread.start() - def on_queue_paused(self) -> None: self.paused.emit() @@ -205,14 +187,11 @@ def resume_queues(self) -> None: logger.info("Resuming queues") main_paused = not self.main_thread.isRunning() download_paused = not self.download_file_thread.isRunning() - metadata_paused = not self.metadata_thread.isRunning() self.start_queues() if main_paused: self.main_queue.resume.emit() if download_paused: self.download_file_queue.resume.emit() - if metadata_paused: - self.metadata_queue.resume.emit() def enqueue(self, job: ApiJob) -> None: # Prevent api jobs being added to the queue when not logged in. @@ -226,9 +205,6 @@ def enqueue(self, job: ApiJob) -> None: if isinstance(job, FileDownloadJob): logger.debug('Adding job to download queue') self.download_file_queue.add_job(job) - elif isinstance(job, MetadataSyncJob): - logger.debug("Adding job to metadata queue") - self.metadata_queue.add_job(job) else: logger.debug('Adding job to main queue') self.main_queue.add_job(job) diff --git a/securedrop_client/sync.py b/securedrop_client/sync.py new file mode 100644 index 0000000000..d722575947 --- /dev/null +++ b/securedrop_client/sync.py @@ -0,0 +1,76 @@ +import logging + +from PyQt5.QtCore import pyqtSignal, QObject, QThread, QTimer, Qt +from sqlalchemy.orm import scoped_session +from sdclientapi import API, RequestTimeoutError + +from securedrop_client.api_jobs.sync import MetadataSyncJob +from securedrop_client.crypto import GpgHelper + + +logger = logging.getLogger(__name__) + + +class ApiSync(QObject): + ''' + ApiSync continuously executes a MetadataSyncJob, waiting 15 seconds between jobs. + ''' + sync_started = pyqtSignal() + sync_success = pyqtSignal() + sync_failure = pyqtSignal(Exception) + + TIME_BETWEEN_SYNCS_MS = 1000 * 15 # fifteen seconds between syncs + + def __init__( + self, api_client: API, session_maker: scoped_session, gpg: GpgHelper, data_dir: str + ): + super().__init__() + + self.api_client = api_client + self.session_maker = session_maker + self.gpg = gpg + self.data_dir = data_dir + + self.sync_thread = QThread() + self.sync_thread.started.connect(self._sync) + + def start(self, api_client: API) -> None: + ''' + Stop metadata syncs. + ''' + self.api_client = api_client + + if not self.sync_thread.isRunning(): + logger.debug('Starting sync thread') + self.sync_thread.start() + + def stop(self): + ''' + Stop metadata syncs. + ''' + self.api_client = None + + if self.sync_thread.isRunning(): + logger.debug('Stopping sync thread') + self.sync_thread.quit() + + def sync(self): + ''' + Create and run a new MetadataSyncJob. + ''' + job = MetadataSyncJob(self.data_dir, self.gpg) + job.success_signal.connect(self.on_sync_success, type=Qt.QueuedConnection) + job.failure_signal.connect(self.on_sync_failure, type=Qt.QueuedConnection) + + session = self.session_maker() + job._do_call_api(self.api_client, session) + self.sync_started.emit() + + def on_sync_success(self) -> None: + self.sync_success.emit() + QTimer.singleShot(self.TIME_BETWEEN_SYNCS_MS, self.sync) + + def on_sync_failure(self, result: Exception) -> None: + self.sync_failure.emit(result) + if isinstance(result, RequestTimeoutError): + QTimer.singleShot(self.TIME_BETWEEN_SYNCS_MS, self.sync)