no longer stop metadata syncs unless logged out
Allie Crevier committed Jan 28, 2020
1 parent 8fe8158 commit c7522fd
Showing 5 changed files with 153 additions and 82 deletions.
50 changes: 2 additions & 48 deletions securedrop_client/api_jobs/downloads.py
@@ -17,7 +17,8 @@
from securedrop_client.crypto import GpgHelper, CryptoError
from securedrop_client.db import File, Message, Reply
from securedrop_client.storage import mark_as_decrypted, mark_as_downloaded, \
set_message_or_reply_content, get_remote_data, update_local_storage
set_message_or_reply_content


logger = logging.getLogger(__name__)

@@ -31,53 +32,6 @@ def __init__(self, message: str,
self.uuid = uuid


class MetadataSyncJob(ApiJob):
'''
Update source metadata such that new download jobs can be added to the queue.
'''

def __init__(self, data_dir: str, gpg: GpgHelper) -> None:
super().__init__(remaining_attempts=15)
self.data_dir = data_dir
self.gpg = gpg

def call_api(self, api_client: API, session: Session) -> Any:
'''
Override ApiJob.
Download new metadata, update the local database, and import new keys; the
success signal then lets the controller know to add any new download jobs.
'''

# TODO: Once https://github.com/freedomofpress/securedrop-client/issues/648 is resolved, we will
# want to pass the default request timeout to api calls instead of setting it on the api object
# directly.
api_client.default_request_timeout = 20
remote_sources, remote_submissions, remote_replies = \
get_remote_data(api_client)

update_local_storage(session,
remote_sources,
remote_submissions,
remote_replies,
self.data_dir)

for source in remote_sources:
if source.key and source.key.get('type', None) == 'PGP':
pub_key = source.key.get('public', None)
fingerprint = source.key.get('fingerprint', None)
if not pub_key or not fingerprint:
# The line below needs to be excluded from the coverage computation
# as it will show as uncovered due to a cpython compiler optimization.
# See: https://bugs.python.org/issue2506
continue # pragma: no cover
try:
self.gpg.import_key(source.uuid, pub_key, fingerprint)
except CryptoError:
logger.warning('Failed to import key for source {}'.format(source.uuid))


class DownloadJob(ApiJob):
'''
Download and decrypt a file that contains either a message, reply, or file submission.
57 changes: 57 additions & 0 deletions securedrop_client/api_jobs/sync.py
@@ -0,0 +1,57 @@
from typing import Any
import logging

from sdclientapi import API
from sqlalchemy.orm.session import Session

from securedrop_client.api_jobs.base import ApiJob
from securedrop_client.crypto import GpgHelper, CryptoError
from securedrop_client.storage import get_remote_data, update_local_storage


logger = logging.getLogger(__name__)


class MetadataSyncJob(ApiJob):
'''
Update source metadata such that new download jobs can be added to the queue.
'''

def __init__(self, data_dir: str, gpg: GpgHelper) -> None:
super().__init__(remaining_attempts=15)
self.data_dir = data_dir
self.gpg = gpg

def call_api(self, api_client: API, session: Session) -> Any:
'''
Override ApiJob.
Download new metadata, update the local database, and import new keys; the
success signal then lets the controller know to add any new download jobs.
'''

# TODO: Once https://github.com/freedomofpress/securedrop-client/issues/648 is resolved, we will
# want to pass the default request timeout to api calls instead of setting it on the api object
# directly.
api_client.default_request_timeout = 20
remote_sources, remote_submissions, remote_replies = get_remote_data(api_client)
update_local_storage(session,
remote_sources,
remote_submissions,
remote_replies,
self.data_dir)

for source in remote_sources:
if source.key and source.key.get('type', None) == 'PGP':
pub_key = source.key.get('public', None)
fingerprint = source.key.get('fingerprint', None)
if not pub_key or not fingerprint:
# The line below needs to be excluded from the coverage computation
# as it will show as uncovered due to a cpython compiler optimization.
# See: https://bugs.python.org/issue2506
continue # pragma: no cover
try:
self.gpg.import_key(source.uuid, pub_key, fingerprint)
except CryptoError:
logger.warning('Failed to import key for source {}'.format(source.uuid))
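
For illustration only (not part of this commit): a minimal sketch of driving the relocated job directly. run_one_metadata_sync and its arguments are placeholder names; api is assumed to be an authenticated sdclientapi.API and gpg a configured GpgHelper. The signal wiring and the _do_call_api call mirror how the new ApiSync runs the job in securedrop_client/sync.py below.

from securedrop_client.api_jobs.sync import MetadataSyncJob

def run_one_metadata_sync(api, gpg, session_maker, data_dir):
    # api: authenticated sdclientapi.API (placeholder); gpg: configured GpgHelper;
    # session_maker: SQLAlchemy session factory; data_dir: client data directory.
    job = MetadataSyncJob(data_dir, gpg)
    job.success_signal.connect(lambda: print('metadata sync succeeded'))
    job.failure_signal.connect(lambda e: print('metadata sync failed: {}'.format(e)))

    session = session_maker()
    # _do_call_api wraps call_api with ApiJob's retry handling; ApiSync.sync()
    # drives the job the same way.
    job._do_call_api(api, session)
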
24 changes: 16 additions & 8 deletions securedrop_client/logic.py
@@ -32,8 +32,10 @@

from securedrop_client import storage
from securedrop_client import db
from securedrop_client.sync import ApiSync
from securedrop_client.api_jobs.sync import MetadataSyncJob
from securedrop_client.api_jobs.downloads import FileDownloadJob, MessageDownloadJob, \
ReplyDownloadJob, DownloadChecksumMismatchException, MetadataSyncJob
ReplyDownloadJob, DownloadChecksumMismatchException
from securedrop_client.api_jobs.sources import DeleteSourceJob
from securedrop_client.api_jobs.uploads import SendReplyJob, SendReplyJobError, \
SendReplyJobTimeoutError
@@ -189,6 +191,12 @@ def __init__(self, hostname: str, gui, session_maker: sessionmaker,
# File data.
self.data_dir = os.path.join(self.home, 'data')

# Background sync to keep client up-to-date with server changes
self.api_sync = ApiSync(self.api, self.session_maker, self.gpg, self.data_dir)
self.api_sync.sync_started.connect(self.on_sync_started, type=Qt.QueuedConnection)
self.api_sync.sync_success.connect(self.on_sync_success, type=Qt.QueuedConnection)
self.api_sync.sync_failure.connect(self.on_sync_failure, type=Qt.QueuedConnection)

@property
def is_authenticated(self) -> bool:
return self.__is_authenticated
@@ -221,11 +229,6 @@ def setup(self):
self.sync_timer.timeout.connect(self.update_sync)
self.sync_timer.start(30000)

# Automagically sync with the API every minute.
self.sync_update = QTimer()
self.sync_update.timeout.connect(self.sync_api)
self.sync_update.start(1000 * 60) # every minute.

# Run export object in a separate thread context (a reference to the
# thread is kept on self such that it does not get garbage collected
# after this method returns) - we want to keep our export thread around for
@@ -334,7 +337,7 @@ def on_authenticate_success(self, result):
self.gui.show_main_window(user)
self.update_sources()
self.api_job_queue.login(self.api)
self.sync_api()
self.api_sync.start(self.api)
self.is_authenticated = True
self.resume_queues()

@@ -372,10 +375,12 @@ def authenticated(self):
def sync_api(self):
"""
Grab data from the remote SecureDrop API in a non-blocking manner.
TODO: This should be removed once sync_api calls have been removed from all the different
job handlers.
"""
logger.debug("In sync_api on thread {}".format(self.thread().currentThreadId()))
if self.authenticated():
self.sync_events.emit('syncing')
logger.debug("You are authenticated, going to make your call")

job = MetadataSyncJob(self.data_dir, self.gpg)
@@ -397,6 +402,9 @@ def last_sync(self):
except Exception:
return None

def on_sync_started(self) -> None:
self.sync_events.emit('syncing')

def on_sync_success(self) -> None:
"""
Called when synchronisation of data via the API queue succeeds.
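
Taken together, the logic.py hunks above wire the background sync into the controller. The sketch below is illustrative only, not part of this commit: MiniController is a stripped-down stand-in for securedrop_client.logic.Controller, the print calls stand in for GUI updates, and on_logout is an assumption, since the visible hunks do not show where ApiSync.stop() is invoked.

from PyQt5.QtCore import QObject, Qt

from securedrop_client.sync import ApiSync

class MiniController(QObject):
    def __init__(self, api, session_maker, gpg, data_dir):
        super().__init__()
        self.api = api
        # Background sync keeps the client up to date with server changes until logout.
        self.api_sync = ApiSync(self.api, session_maker, gpg, data_dir)
        self.api_sync.sync_started.connect(self.on_sync_started, type=Qt.QueuedConnection)
        self.api_sync.sync_success.connect(self.on_sync_success, type=Qt.QueuedConnection)
        self.api_sync.sync_failure.connect(self.on_sync_failure, type=Qt.QueuedConnection)

    def on_authenticate_success(self, api):
        self.api = api
        self.api_sync.start(api)  # begin the 15-second background sync loop

    def on_logout(self):
        self.api_sync.stop()  # logging out is now the only thing that halts syncing

    def on_sync_started(self):
        print('syncing')

    def on_sync_success(self):
        print('sync succeeded')

    def on_sync_failure(self, exc):
        print('sync failed: {}'.format(exc))
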
28 changes: 2 additions & 26 deletions securedrop_client/queue.py
@@ -9,8 +9,9 @@

from securedrop_client.api_jobs.base import ApiJob, ApiInaccessibleError, DEFAULT_NUM_ATTEMPTS, \
PauseQueueJob
from securedrop_client.api_jobs.sync import MetadataSyncJob
from securedrop_client.api_jobs.downloads import (FileDownloadJob, MessageDownloadJob,
ReplyDownloadJob, MetadataSyncJob)
ReplyDownloadJob)
from securedrop_client.api_jobs.sources import DeleteSourceJob
from securedrop_client.api_jobs.uploads import SendReplyJob
from securedrop_client.api_jobs.updatestar import UpdateStarJob
@@ -61,11 +62,6 @@ class RunnableQueue(QObject):
'''
resume = pyqtSignal()

"""
Signal emitted when the queue successfully processes a job.
"""
pinged = pyqtSignal()

def __init__(self, api_client: API, session_maker: scoped_session, size: int = 0) -> None:
"""
A size of zero means there's no upper bound to the queue size.
@@ -130,7 +126,6 @@ def process(self) -> None:
try:
session = self.session_maker()
job._do_call_api(self.api_client, session)
self.pinged.emit()
except (RequestTimeoutError, ApiInaccessibleError) as e:
logger.debug('Job {} raised an exception: {}: {}'.format(self, type(e).__name__, e))
self.add_job(PauseQueueJob())
Expand All @@ -153,36 +148,27 @@ def __init__(self, api_client: API, session_maker: scoped_session) -> None:

self.main_thread = QThread()
self.download_file_thread = QThread()
self.metadata_thread = QThread()

self.main_queue = RunnableQueue(api_client, session_maker)
self.download_file_queue = RunnableQueue(api_client, session_maker)
self.metadata_queue = RunnableQueue(api_client, session_maker, size=1)

self.main_queue.moveToThread(self.main_thread)
self.download_file_queue.moveToThread(self.download_file_thread)
self.metadata_queue.moveToThread(self.metadata_thread)

self.main_thread.started.connect(self.main_queue.process)
self.download_file_thread.started.connect(self.download_file_queue.process)
self.metadata_thread.started.connect(self.metadata_queue.process)

self.main_queue.paused.connect(self.on_queue_paused)
self.download_file_queue.paused.connect(self.on_queue_paused)
self.metadata_queue.paused.connect(self.on_queue_paused)

self.metadata_queue.pinged.connect(self.resume_queues)

def logout(self) -> None:
self.main_queue.api_client = None
self.download_file_queue.api_client = None
self.metadata_queue.api_client = None

def login(self, api_client: API) -> None:
logger.debug('Passing API token to queues')
self.main_queue.api_client = api_client
self.download_file_queue.api_client = api_client
self.metadata_queue.api_client = api_client
self.start_queues()

def start_queues(self) -> None:
Expand All @@ -194,25 +180,18 @@ def start_queues(self) -> None:
logger.debug('Starting download thread')
self.download_file_thread.start()

if not self.metadata_thread.isRunning():
logger.debug("Starting metadata thread")
self.metadata_thread.start()

def on_queue_paused(self) -> None:
self.paused.emit()

def resume_queues(self) -> None:
logger.info("Resuming queues")
main_paused = not self.main_thread.isRunning()
download_paused = not self.download_file_thread.isRunning()
metadata_paused = not self.metadata_thread.isRunning()
self.start_queues()
if main_paused:
self.main_queue.resume.emit()
if download_paused:
self.download_file_queue.resume.emit()
if metadata_paused:
self.metadata_queue.resume.emit()

def enqueue(self, job: ApiJob) -> None:
# Prevent api jobs being added to the queue when not logged in.
@@ -226,9 +205,6 @@ def enqueue(self, job: ApiJob) -> None:
if isinstance(job, FileDownloadJob):
logger.debug('Adding job to download queue')
self.download_file_queue.add_job(job)
elif isinstance(job, MetadataSyncJob):
logger.debug("Adding job to metadata queue")
self.metadata_queue.add_job(job)
else:
logger.debug('Adding job to main queue')
self.main_queue.add_job(job)
76 changes: 76 additions & 0 deletions securedrop_client/sync.py
@@ -0,0 +1,76 @@
import logging

from PyQt5.QtCore import pyqtSignal, QObject, QThread, QTimer, Qt
from sqlalchemy.orm import scoped_session
from sdclientapi import API, RequestTimeoutError

from securedrop_client.api_jobs.sync import MetadataSyncJob
from securedrop_client.crypto import GpgHelper


logger = logging.getLogger(__name__)


class ApiSync(QObject):
'''
ApiSync continuously executes a MetadataSyncJob, waiting 15 seconds between jobs.
'''
sync_started = pyqtSignal()
sync_success = pyqtSignal()
sync_failure = pyqtSignal(Exception)

TIME_BETWEEN_SYNCS_MS = 1000 * 15 # fifteen seconds between syncs

def __init__(
self, api_client: API, session_maker: scoped_session, gpg: GpgHelper, data_dir: str
):
super().__init__()

self.api_client = api_client
self.session_maker = session_maker
self.gpg = gpg
self.data_dir = data_dir

self.sync_thread = QThread()
self.sync_thread.started.connect(self.sync)

def start(self, api_client: API) -> None:
'''
Start metadata syncs.
'''
self.api_client = api_client

if not self.sync_thread.isRunning():
logger.debug('Starting sync thread')
self.sync_thread.start()

def stop(self):
'''
Stop metadata syncs.
'''
self.api_client = None

if self.sync_thread.isRunning():
logger.debug('Stopping sync thread')
self.sync_thread.quit()

def sync(self):
'''
Create and run a new MetadataSyncJob.
'''
job = MetadataSyncJob(self.data_dir, self.gpg)
job.success_signal.connect(self.on_sync_success, type=Qt.QueuedConnection)
job.failure_signal.connect(self.on_sync_failure, type=Qt.QueuedConnection)

session = self.session_maker()
job._do_call_api(self.api_client, session)
self.sync_started.emit()

def on_sync_success(self) -> None:
self.sync_success.emit()
QTimer.singleShot(self.TIME_BETWEEN_SYNCS_MS, self.sync)

def on_sync_failure(self, result: Exception) -> None:
self.sync_failure.emit(result)
if isinstance(result, RequestTimeoutError):
QTimer.singleShot(self.TIME_BETWEEN_SYNCS_MS, self.sync)
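
A rough standalone sketch of the loop above, not part of this commit: run_background_sync and its arguments are placeholders, and a running Qt event loop is required for the QTimer.singleShot rescheduling to fire.

import sys

from PyQt5.QtCore import QCoreApplication

from securedrop_client.sync import ApiSync

def run_background_sync(api_client, session_maker, gpg, data_dir):
    # The arguments mirror what the controller passes in logic.py; api_client is
    # assumed to be an already-authenticated sdclientapi.API.
    app = QCoreApplication(sys.argv)

    sync = ApiSync(api_client, session_maker, gpg, data_dir)
    sync.sync_success.connect(lambda: print('synced; next sync in 15 seconds'))
    sync.sync_failure.connect(lambda e: print('sync failed: {}'.format(e)))

    sync.start(api_client)  # first MetadataSyncJob runs once the sync thread starts
    app.exec_()  # QTimer.singleShot needs a running event loop to reschedule syncs

Note that on_sync_failure only schedules another sync after a RequestTimeoutError; any other failure leaves the loop idle until start() is called again.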
