Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 389 sync #405

Merged
merged 8 commits into from
Jun 6, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 57 additions & 8 deletions securedrop_client/api_jobs/downloads.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,70 @@
import os
import sdclientapi
import shutil
from tempfile import NamedTemporaryFile
from typing import Any, Union, Type, Tuple

from sdclientapi import API
from sqlalchemy.orm.session import Session
from typing import Any, Union, Type, Tuple

from securedrop_client import storage
from securedrop_client.api_jobs.base import ApiJob
from securedrop_client.crypto import GpgHelper, CryptoError
from securedrop_client.db import File, Message

from securedrop_client.db import File, Message, Reply
from securedrop_client.storage import mark_message_as_downloaded, mark_file_as_downloaded, \
set_object_decryption_status_with_content

logger = logging.getLogger(__name__)


class DownloadSubmissionJob(ApiJob):
class MessageDownloadJob(ApiJob):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is the plan still to inherit from a new common object to reduce code duplication with FileDownloadJob?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup! That's the plan: MessageDownloadJob, FileDownloadJob, and ReplyDownloadJob will all inherit from a DownloadJob class. I tried to keep the _decrypt_file method generic enough so that we can move it into DownloadJob. There should be a follow-up PR for this.

I just wanted to keep code changes small here since deleting the message_sync thread felt like a big change.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

filed #406 for this


def __init__(self, uuid: str, download_dir: str, gpg: GpgHelper) -> None:
super().__init__()
self.uuid = uuid
self.download_dir = download_dir
self.gpg = gpg
self.type = Message

def call_api(self, api_client: API, session: Session) -> Any:
# Download
db_object = session.query(self.type).filter_by(uuid=self.uuid).one()
if not db_object.is_downloaded:
_, filepath = self._make_call(db_object, api_client)
mark_message_as_downloaded(db_object.uuid, session)
else:
filepath = os.path.join(self.download_dir, db_object.filename)

# Decrypt
self._decrypt_file(session, db_object, filepath)

return db_object.uuid

def _make_call(self, db_object: Message, api_client: API) -> Tuple[str, str]:
sdk_obj = sdclientapi.Submission(uuid=db_object.uuid)
sdk_obj.filename = db_object.filename
sdk_obj.source_uuid = db_object.source.uuid

return api_client.download_submission(sdk_obj)

def _decrypt_file(
self,
session: Session,
encrypted_file: Union[File, Message, Reply],
filepath: str,
) -> None:
with NamedTemporaryFile('w+') as plaintext_file:
try:
self.gpg.decrypt_submission_or_reply(filepath, plaintext_file.name, False)
plaintext_file.seek(0)
content = plaintext_file.read()
set_object_decryption_status_with_content(encrypted_file, session, True, content)
logger.info("File decrypted: {}".format(encrypted_file.filename))
except CryptoError:
set_object_decryption_status_with_content(encrypted_file, session, False)
logger.info("Failed to decrypt file: {}".format(encrypted_file.filename))


class FileDownloadJob(ApiJob):

CHUNK_SIZE = 4096

Expand Down Expand Up @@ -99,13 +148,13 @@ def _decrypt_file(
# server (e.g. spotless-tater-msg.gpg).
filepath_in_datadir = os.path.join(self.data_dir, server_filename)
shutil.move(file_path, filepath_in_datadir)
storage.mark_file_as_downloaded(file_uuid, session)
mark_file_as_downloaded(file_uuid, session)

try:
self.gpg.decrypt_submission_or_reply(filepath_in_datadir, server_filename, is_doc=True)
except CryptoError as e:
logger.debug('Failed to decrypt file {}: {}'.format(server_filename, e))
storage.set_object_decryption_status_with_content(db_object, session, False)
set_object_decryption_status_with_content(db_object, session, False)
raise e

storage.set_object_decryption_status_with_content(db_object, session, True)
set_object_decryption_status_with_content(db_object, session, True)
2 changes: 1 addition & 1 deletion securedrop_client/gui/widgets.py
Original file line number Diff line number Diff line change
Expand Up @@ -1435,7 +1435,7 @@ def add_message(self, message: Message) -> None:
self.conversation_layout.addWidget(MessageWidget(
message.uuid,
content,
self.controller.message_sync.message_ready))
self.controller.message_ready))

def add_reply(self, reply: Reply) -> None:
"""
Expand Down
75 changes: 45 additions & 30 deletions securedrop_client/logic.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,19 @@
import os
import sdclientapi
import uuid
from typing import Dict, Tuple, Union, Any, Type # noqa: F401

from gettext import gettext as _
from PyQt5.QtCore import QObject, QThread, pyqtSignal, QTimer, QProcess, Qt
from sdclientapi import RequestTimeoutError
from sqlalchemy.orm.session import sessionmaker
from typing import Dict, Tuple, Union, Any, Type # noqa: F401

from securedrop_client import storage
from securedrop_client import db
from securedrop_client.api_jobs.downloads import DownloadSubmissionJob
from securedrop_client.api_jobs.downloads import FileDownloadJob, MessageDownloadJob
from securedrop_client.api_jobs.uploads import SendReplyJob
from securedrop_client.crypto import GpgHelper, CryptoError
from securedrop_client.message_sync import MessageSync, ReplySync
from securedrop_client.message_sync import ReplySync
from securedrop_client.queue import ApiJobQueue
from securedrop_client.utils import check_dir_permissions

Expand Down Expand Up @@ -119,6 +119,12 @@ class Controller(QObject):
"""
file_ready = pyqtSignal(str)

"""
This signal indicates that a file has been successfully downloaded by emitting the file's
UUID as a string.
"""
message_ready = pyqtSignal([str, str])

def __init__(self, hostname: str, gui, session_maker: sessionmaker,
home: str, proxy: bool = True) -> None:
"""
Expand Down Expand Up @@ -159,10 +165,6 @@ def __init__(self, hostname: str, gui, session_maker: sessionmaker,

self.gpg = GpgHelper(home, self.session_maker, proxy)

# thread responsible for fetching messages
self.message_thread = None
self.message_sync = MessageSync(self.api, self.gpg, self.session_maker)

# thread responsible for fetching replies
self.reply_thread = None
self.reply_sync = ReplySync(self.api, self.gpg, self.session_maker)
Expand Down Expand Up @@ -271,19 +273,6 @@ def completed_api_call(self, thread_id, user_callback):
else:
user_callback(result_data)

def start_message_thread(self):
"""
Starts the message-fetching thread in the background.
"""
if not self.message_thread:
self.message_sync.api = self.api
self.message_thread = QThread()
self.message_sync.moveToThread(self.message_thread)
self.message_thread.started.connect(self.message_sync.run)
self.message_thread.start()
else: # Already running from last login
self.message_sync.api = self.api

def start_reply_thread(self):
"""
Starts the reply-fetching thread in the background.
Expand Down Expand Up @@ -316,7 +305,6 @@ def on_authenticate_success(self, result):
self.sync_api()
self.gui.show_main_window(self.api.username)

self.start_message_thread()
self.start_reply_thread()

self.api_job_queue.start_queues(self.api)
Expand All @@ -340,7 +328,6 @@ def login_offline_mode(self):
"""
self.gui.hide_login()
self.gui.show_main_window()
self.start_message_thread()
self.start_reply_thread()
self.is_authenticated = False
self.update_sources()
Expand Down Expand Up @@ -389,19 +376,19 @@ def on_sync_success(self, result) -> None:
"""
Called when syncronisation of data via the API succeeds
"""
# Update db with new metadata
remote_sources, remote_submissions, remote_replies = result

storage.update_local_storage(self.session,
remote_sources,
remote_submissions,
remote_replies,
self.data_dir)

# Set last sync flag.
# Set last sync flag
with open(self.sync_flag, 'w') as f:
f.write(arrow.now().format())

# import keys into keyring
# Import keys into keyring
for source in remote_sources:
if source.key and source.key.get('type', None) == 'PGP':
pub_key = source.key.get('public', None)
Expand All @@ -413,15 +400,19 @@ def on_sync_success(self, result) -> None:
except CryptoError:
logger.warning('Failed to import key for source {}'.format(source.uuid))

self.sync_events.emit('synced')
# Update sources
self.update_sources()

# Download new messages
self.download_new_messages()

self.sync_events.emit('synced')

def on_sync_failure(self, result: Exception) -> None:
"""
Called when syncronisation of data via the API fails.
"""
pass
self.update_sources()
logger.debug('Sync failed: "{}".'.format(result))

def update_sync(self):
"""
Expand Down Expand Up @@ -484,7 +475,6 @@ def logout(self):
state.
"""
self.api = None
self.message_sync.api = None
self.reply_sync.api = None
self.gui.logout()
self.is_authenticated = False
Expand All @@ -496,6 +486,31 @@ def set_status(self, message, duration=5000):
"""
self.gui.update_activity_status(message, duration)

def download_new_messages(self) -> None:
messages = storage.find_new_messages(self.session)

if len(messages) > 0:
self.set_status(_('Downloading new messages'))

for message in messages:
job = MessageDownloadJob(message.uuid, self.data_dir, self.gpg)
job.success_signal.connect(self.on_message_download_success, type=Qt.QueuedConnection)
job.failure_signal.connect(self.on_message_download_failure, type=Qt.QueuedConnection)
self.api_job_queue.enqueue(job)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💯


def on_message_download_success(self, uuid: str) -> None:
"""
Called when a message has downloaded.
"""
message = storage.get_message(self.session, uuid)
self.message_ready.emit(message.uuid, message.content)

def on_message_download_failure(self, exception: Exception) -> None:
"""
Called when a message fails to download.
"""
self.set_status("The message download failed.")

def on_file_open(self, file_uuid: str) -> None:
"""
Open the already downloaded file associated with the message (which is a `File`).
Expand Down Expand Up @@ -528,7 +543,7 @@ def on_submission_download(
"""
Download the file associated with the Submission (which may be a File or Message).
"""
job = DownloadSubmissionJob(
job = FileDownloadJob(
submission_type,
submission_uuid,
self.data_dir,
Expand Down
58 changes: 1 addition & 57 deletions securedrop_client/message_sync.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""
Contains the MessageSync class, which runs in the background and loads new
Contains the ReplySync class, which runs in the background and loads new
messages from the SecureDrop server.

Copyright (C) 2018 The Freedom of the Press Foundation.
Expand Down Expand Up @@ -81,62 +81,6 @@ def fetch_the_thing(
self.decrypt_the_thing(session, filepath, msg)


class MessageSync(APISyncObject):
"""
Runs in the background, finding messages to download and downloading them.
"""

"""
Signal emitted notifying that a message is ready to be displayed. The signal is a tuple of
(str, str) containing the message's UUID and the content of the message.
"""
message_ready = pyqtSignal([str, str])

def run(self, loop: bool = True) -> None:
session = self.session_maker()
while True:
submissions = storage.find_new_messages(session)

for db_submission in submissions:
try:
sdk_submission = sdkobjects.Submission(
uuid=db_submission.uuid
)
sdk_submission.source_uuid = db_submission.source.uuid
# Need to set filename on non-Qubes platforms
sdk_submission.filename = db_submission.filename

if not db_submission.is_downloaded and self.api:
# Download and decrypt
self.fetch_the_thing(session,
sdk_submission,
db_submission,
self.api.download_submission,
storage.mark_message_as_downloaded)
elif db_submission.is_downloaded:
# Just decrypt file that is already on disk
self.decrypt_the_thing(session,
db_submission.filename,
db_submission)

if db_submission.content is not None:
content = db_submission.content
else:
content = '<Message not yet available>'

self.message_ready.emit(db_submission.uuid, content)
except Exception:
tb = traceback.format_exc()
logger.critical("Exception while processing message!\n{}".format(tb))

logger.debug('Messages synced')

if not loop:
break
else:
time.sleep(5) # pragma: no cover


class ReplySync(APISyncObject):
"""
Runs in the background, finding replies to download and downloading them.
Expand Down
18 changes: 9 additions & 9 deletions securedrop_client/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from typing import Optional # noqa: F401

from securedrop_client.api_jobs.base import ApiJob
from securedrop_client.api_jobs.downloads import DownloadSubmissionJob
from securedrop_client.api_jobs.downloads import FileDownloadJob


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -57,26 +57,26 @@ def __init__(self, api_client: API, session_maker: scoped_session) -> None:
self.api_client = api_client

self.main_thread = QThread()
self.download_thread = QThread()
self.download_file_thread = QThread()

self.main_queue = RunnableQueue(self.api_client, session_maker)
self.download_queue = RunnableQueue(self.api_client, session_maker)
self.download_file_queue = RunnableQueue(self.api_client, session_maker)

self.main_queue.moveToThread(self.main_thread)
self.download_queue.moveToThread(self.download_thread)
self.download_file_queue.moveToThread(self.download_file_thread)

self.main_thread.started.connect(self.main_queue.process)
self.download_thread.started.connect(self.download_queue.process)
self.download_file_thread.started.connect(self.download_file_queue.process)

def start_queues(self, api_client: API) -> None:
self.main_queue.api_client = api_client
self.download_queue.api_client = api_client
self.download_file_queue.api_client = api_client

self.main_thread.start()
self.download_thread.start()
self.download_file_thread.start()

def enqueue(self, job: ApiJob) -> None:
if isinstance(job, DownloadSubmissionJob):
self.download_queue.queue.put_nowait(job)
if isinstance(job, FileDownloadJob):
self.download_file_queue.queue.put_nowait(job)
else:
self.main_queue.queue.put_nowait(job)
Loading