Speed up update_local_storage in sync
Stop importing source GPG keys during sync. We have a place on Source
to store fingerprint and key, which is all we need for the import, and
which is what we check when deciding whether to enable the reply box,
so let's just update the database here and defer the keyring import
until someone actually wants to reply. The import takes milliseconds
that won't be noticed then, but they do add up during sync.

Avoid dirtying synced objects on update unless they've actually
changed.

Use maps instead of sets to hold local objects in the update_*
functions, so checking whether we already have a record of an incoming
object is faster.

Reduce the number of database queries during sync: cache sources and
journalists instead of looking them up for each associated incoming
object.
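
The map-and-cache shape of the update_* functions, as a generic sketch
(reconcile and the ellipses are hypothetical, not the client's code):

from typing import Dict, List


def reconcile(local_things: List, remote_things: List) -> None:
    # A dict keyed by UUID makes "do we already have this?" an O(1)
    # lookup that also hands back the matching object, instead of a
    # set-membership test followed by a scan to find the record.
    local_by_uuid: Dict[str, object] = {t.uuid: t for t in local_things}

    for remote in remote_things:
        local = local_by_uuid.pop(remote.uuid, None)
        if local is not None:
            ...  # update the existing record in place
        else:
            ...  # add a new record

    # Anything left over was not returned by the server: delete it.
    for stale in local_by_uuid.values():
        ...  # delete the stale record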
rmol committed Mar 27, 2020
1 parent 1b51b4b commit a52a032
Showing 7 changed files with 263 additions and 338 deletions.
37 changes: 18 additions & 19 deletions securedrop_client/crypto.py
@@ -24,7 +24,6 @@
import tempfile

from sqlalchemy.orm import scoped_session
from uuid import UUID

from securedrop_client.config import Config
from securedrop_client.db import Source
@@ -145,17 +144,14 @@ def _gpg_cmd_base(self) -> list:
cmd.extend(['--trust-model', 'always'])
return cmd

def import_key(self, source_uuid: UUID, key_data: str, fingerprint: str) -> None:
session = self.session_maker()
local_source = session.query(Source).filter_by(uuid=source_uuid).one()

logger.debug("Importing key with fingerprint %s", fingerprint)
self._import(key_data)

local_source.fingerprint = fingerprint
local_source.public_key = key_data
session.add(local_source)
session.commit()
def import_key(self, source: Source) -> None:
"""
Imports a Source's GPG key.
"""
logger.debug("Importing key for source %s", source.uuid)
if not source.public_key:
raise CryptoError(f"Could not import key: source {source.uuid} has no key")
self._import(source.public_key)

def _import(self, key_data: str) -> None:
'''Wrapper for `gpg --import-keys`'''
@@ -187,16 +183,19 @@ def encrypt_to_source(self, source_uuid: str, data: str) -> str:
session = self.session_maker()
source = session.query(Source).filter_by(uuid=source_uuid).one()

# do not attempt to encrypt if the source key is missing
if source.fingerprint is None:
raise CryptoError(
'Could not encrypt reply due to missing fingerprint for source: {}'.format(
source_uuid))

# do not attempt to encrypt if the journalist key is missing
if self.journalist_key_fingerprint is None:
if not self.journalist_key_fingerprint:
raise CryptoError('Could not encrypt reply due to missing fingerprint for journalist')

# do not attempt to encrypt if the source key is missing
if not (source.fingerprint and source.public_key):
raise CryptoError(f'Could not encrypt reply: no key for source {source_uuid}')

try:
self.import_key(source)
except CryptoError as e:
raise CryptoError("Could not import key before encrypting reply: {e}") from e

cmd = self._gpg_cmd_base()

with tempfile.NamedTemporaryFile('w+') as content, \
252 changes: 128 additions & 124 deletions securedrop_client/storage.py
@@ -19,21 +19,24 @@
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
import collections
from datetime import datetime
import logging
import os
import shutil

from pathlib import Path
from dateutil.parser import parse
from typing import List, Tuple, Type, Union
from typing import Any, Dict, List, Tuple, Type, Union

from sqlalchemy import and_, desc, or_
from sqlalchemy.orm.exc import NoResultFound
from sqlalchemy.orm.session import Session

from securedrop_client.crypto import CryptoError, GpgHelper
from securedrop_client.crypto import GpgHelper
from securedrop_client.db import (DraftReply, Source, Message, File, Reply, ReplySendStatus,
ReplySendStatusCodes, User)
from securedrop_client.utils import chronometer
from sdclientapi import API
from sdclientapi import Source as SDKSource
from sdclientapi import Submission as SDKSubmission
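
chronometer isn't shown in this diff; judging by its use below, it's a
context manager that logs how long the wrapped block took. A minimal
sketch of such a helper (an assumption, not necessarily what
securedrop_client.utils.chronometer actually does):

import logging
import time
from contextlib import contextmanager
from typing import Iterator


@contextmanager
def chronometer(logger: logging.Logger, label: str) -> Iterator[None]:
    # Log elapsed wall-clock time for the wrapped block at debug level.
    start = time.monotonic()
    try:
        yield
    finally:
        logger.debug('%s duration: %.4fs', label, time.monotonic() - start)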
@@ -120,40 +123,29 @@ def update_local_storage(session: Session,
# The following update_* functions may change the database state.
# Because of that, each get_local_* function needs to be called just before
# its respective update_* function.
    update_sources(gpg, remote_sources, get_local_sources(session), session, data_dir)
    update_files(remote_files, get_local_files(session), session, data_dir)
    update_messages(remote_messages, get_local_messages(session), session, data_dir)
    update_replies(remote_replies, get_local_replies(session), session, data_dir)


def update_source_key(gpg: GpgHelper, local_source: Source, remote_source: SDKSource) -> None:
    """
    Updates a source's GPG key.
    """
    if not remote_source.key.get("fingerprint"):
        logger.error("New source data lacks key fingerprint")
        return

    if not remote_source.key.get("public"):
        logger.error("New source data lacks public key")
        return

    if (
        local_source.fingerprint == remote_source.key['fingerprint'] and
        local_source.public_key == remote_source.key['public']
    ):
        logger.debug("Source key data is unchanged")
        return

    try:
        # import_key updates the source's key and fingerprint, and commits
        gpg.import_key(
            remote_source.uuid,
            remote_source.key['public'],
            remote_source.key['fingerprint']
        )
    except CryptoError:
        logger.error('Failed to update key information for source %s', remote_source.uuid)

    with chronometer(logger, "update_sources"):
        update_sources(gpg, remote_sources, get_local_sources(session), session, data_dir)

    with chronometer(logger, "update_files"):
        update_files(remote_files, get_local_files(session), session, data_dir)

    with chronometer(logger, "update_messages"):
        update_messages(remote_messages, get_local_messages(session), session, data_dir)

    with chronometer(logger, "update_replies"):
        update_replies(remote_replies, get_local_replies(session), session, data_dir)


def lazy_setattr(o: Any, a: str, v: Any) -> None:
    """
    Only assign v to o.a if they differ.
    Intended to avoid unnecessarily dirtying SQLAlchemy objects during
    sync.
    """
    if getattr(o, a) != v:
        setattr(o, a, v)
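
The reason lazy_setattr helps: SQLAlchemy marks an object dirty on any
attribute assignment, even one that writes back the value already
loaded, so a sync that changes nothing would still give the flush work
to do. A quick demonstration with a throwaway model (not the client's
schema):

from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

Base = declarative_base()


class Thing(Base):
    __tablename__ = 'things'
    id = Column(Integer, primary_key=True)
    name = Column(String)


engine = create_engine('sqlite://')
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()
session.add(Thing(id=1, name='a'))
session.commit()

thing = session.query(Thing).get(1)
thing.name = 'a'                # writes back the unchanged value...
print(thing in session.dirty)   # True: the object is dirtied anyway

session.rollback()
thing = session.query(Thing).get(1)
if thing.name != 'a':           # lazy_setattr's guard
    thing.name = 'a'
print(thing in session.dirty)   # False: nothing for the flush to examine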


def update_sources(gpg: GpgHelper, remote_sources: List[SDKSource],
@@ -173,15 +165,14 @@ def update_sources(gpg: GpgHelper, remote_sources: List[SDKSource],
if source.uuid in local_sources_by_uuid:
# Update an existing record.
local_source = local_sources_by_uuid[source.uuid]
local_source.journalist_designation = source.journalist_designation
local_source.is_flagged = source.is_flagged
local_source.interaction_count = source.interaction_count
local_source.document_count = source.number_of_documents
local_source.is_starred = source.is_starred
local_source.last_updated = parse(source.last_updated)
session.commit()

update_source_key(gpg, local_source, source)
lazy_setattr(local_source, "journalist_designation", source.journalist_designation)
lazy_setattr(local_source, "is_flagged", source.is_flagged)
lazy_setattr(local_source, "interaction_count", source.interaction_count)
lazy_setattr(local_source, "document_count", source.number_of_documents)
lazy_setattr(local_source, "is_starred", source.is_starred)
lazy_setattr(local_source, "last_updated", parse(source.last_updated))
lazy_setattr(local_source, "public_key", source.key['public'])
lazy_setattr(local_source, "fingerprint", source.key['fingerprint'])

# Removing the UUID from local_sources_by_uuid ensures
# this record won't be deleted at the end of this
@@ -190,17 +181,18 @@ def update_sources(gpg: GpgHelper, remote_sources: List[SDKSource],
logger.debug('Updated source {}'.format(source.uuid))
else:
# A new source to be added to the database.
ns = Source(uuid=source.uuid,
journalist_designation=source.journalist_designation,
is_flagged=source.is_flagged,
interaction_count=source.interaction_count,
is_starred=source.is_starred,
last_updated=parse(source.last_updated),
document_count=source.number_of_documents)
ns = Source(
uuid=source.uuid,
journalist_designation=source.journalist_designation,
is_flagged=source.is_flagged,
interaction_count=source.interaction_count,
is_starred=source.is_starred,
last_updated=parse(source.last_updated),
document_count=source.number_of_documents,
public_key=source.key['public'],
fingerprint=source.key['fingerprint'],
)
session.add(ns)
session.commit()

update_source_key(gpg, ns, source)

logger.debug('Added new source {}'.format(source.uuid))

@@ -237,37 +229,38 @@ def __update_submissions(model: Union[Type[File], Type[Message]],
* Local submissions not returned in the remote submissions are deleted
from the local database.
"""
local_uuids = {submission.uuid for submission in local_submissions}
for submission in remote_submissions:
if submission.uuid in local_uuids:
local_submission = [s for s in local_submissions
if s.uuid == submission.uuid][0]

local_submission.size = submission.size
local_submission.is_read = submission.is_read
local_submission.download_url = submission.download_url

# Removing the UUID from local_uuids ensures this record won't be
# deleted at the end of this function.
local_uuids.remove(submission.uuid)
logger.debug('Updated submission {}'.format(submission.uuid))
else:
# A new submission to be added to the database.
_, source_uuid = submission.source_url.rsplit('/', 1)
source = session.query(Source).filter_by(uuid=source_uuid).first()
if source:
ns = model(source_id=source.id, uuid=submission.uuid, size=submission.size,
filename=submission.filename, download_url=submission.download_url)
session.add(ns)
logger.debug('Added new submission {}'.format(submission.uuid))
local_submissions_by_uuid = {s.uuid: s for s in local_submissions}
remote_submissions_by_source: Dict[str, list] = collections.defaultdict(list)
for s in remote_submissions:
remote_submissions_by_source[s.source_uuid].append(s)

for source_uuid, submissions in remote_submissions_by_source.items():
source = session.query(Source).filter_by(uuid=source_uuid).first()
for submission in submissions:
local_submission = local_submissions_by_uuid.get(submission.uuid)
if local_submission:
lazy_setattr(local_submission, "size", submission.size)
lazy_setattr(local_submission, "is_read", submission.is_read)
lazy_setattr(local_submission, "download_url", submission.download_url)

# Removing the UUID from local_uuids ensures this record won't be
# deleted at the end of this function.
del local_submissions_by_uuid[submission.uuid]
logger.debug(f"Updated {model.__name__} {submission.uuid}")
else:
# A new submission to be added to the database.
if source:
ns = model(source_id=source.id, uuid=submission.uuid, size=submission.size,
filename=submission.filename, download_url=submission.download_url)
session.add(ns)
logger.debug(f"Added new {model.__name__} {submission.uuid}")

# The uuids remaining in local_uuids do not exist on the remote server, so
# delete the related records.
for deleted_submission in [s for s in local_submissions
if s.uuid in local_uuids]:
for deleted_submission in local_submissions_by_uuid.values():
delete_single_submission_or_reply_on_disk(deleted_submission, data_dir)
session.delete(deleted_submission)
logger.debug('Deleted submission {}'.format(deleted_submission.uuid))
logger.debug(f"Deleted {model.__name__} {deleted_submission.uuid}")

session.commit()

Expand All @@ -283,54 +276,61 @@ def update_replies(remote_replies: List[SDKReply], local_replies: List[Reply],
If a reply references a new journalist username, add them to the database
as a new user.
"""
local_uuids = {reply.uuid for reply in local_replies}
for reply in remote_replies:
if reply.uuid in local_uuids:
local_reply = [r for r in local_replies if r.uuid == reply.uuid][0]

user = find_or_create_user(reply.journalist_uuid, reply.journalist_username, session)
local_reply.journalist_id = user.id
local_reply.size = reply.size

local_uuids.remove(reply.uuid)
logger.debug('Updated reply {}'.format(reply.uuid))
else:
# A new reply to be added to the database.
source_uuid = reply.source_uuid
source = session.query(Source).filter_by(uuid=source_uuid)[0]
user = find_or_create_user(
reply.journalist_uuid,
reply.journalist_username,
session)

nr = Reply(uuid=reply.uuid,
journalist_id=user.id,
source_id=source.id,
filename=reply.filename,
size=reply.size)
session.add(nr)

# All replies fetched from the server have succeeded in being sent,
# so we should delete the corresponding draft locally if it exists.
try:
draft_reply_db_object = session.query(DraftReply).filter_by(
uuid=reply.uuid).one()

update_draft_replies(session, draft_reply_db_object.source.id,
draft_reply_db_object.timestamp,
draft_reply_db_object.file_counter,
nr.file_counter)
session.delete(draft_reply_db_object)

except NoResultFound:
pass # No draft locally stored corresponding to this reply.

logger.debug('Added new reply {}'.format(reply.uuid))
local_replies_by_uuid = {r.uuid: r for r in local_replies}

remote_replies_by_source: Dict[str, list] = collections.defaultdict(list)
for r in remote_replies:
remote_replies_by_source[r.source_uuid].append(r)

users: Dict[str, User] = {}
for source_uuid, replies in remote_replies_by_source.items():
source = session.query(Source).filter_by(uuid=source_uuid).first()
for reply in replies:
user = users.get(reply.journalist_uuid)
if not user:
logger.debug(f"user {reply.journalist_uuid} not cached")
user = find_or_create_user(
reply.journalist_uuid, reply.journalist_username, session
)
users[reply.journalist_uuid] = user

local_reply = local_replies_by_uuid.get(reply.uuid)
if local_reply:
lazy_setattr(local_reply, "journalist_id", user.id)
lazy_setattr(local_reply, "size", reply.size)
lazy_setattr(local_reply, "filename", reply.filename)

del local_replies_by_uuid[reply.uuid]
logger.debug('Updated reply {}'.format(reply.uuid))
else:
# A new reply to be added to the database.
nr = Reply(uuid=reply.uuid,
journalist_id=user.id,
source_id=source.id,
filename=reply.filename,
size=reply.size)
session.add(nr)

# All replies fetched from the server have succeeded in being sent,
# so we should delete the corresponding draft locally if it exists.
try:
draft_reply_db_object = session.query(DraftReply).filter_by(
uuid=reply.uuid).one()

update_draft_replies(session, draft_reply_db_object.source.id,
draft_reply_db_object.timestamp,
draft_reply_db_object.file_counter,
nr.file_counter, commit=False)
session.delete(draft_reply_db_object)

except NoResultFound:
pass # No draft locally stored corresponding to this reply.

logger.debug('Added new reply {}'.format(reply.uuid))

# The uuids remaining in local_uuids do not exist on the remote server, so
# delete the related records.
replies_to_delete = [r for r in local_replies if r.uuid in local_uuids]
for deleted_reply in replies_to_delete:
for deleted_reply in local_replies_by_uuid.values():
delete_single_submission_or_reply_on_disk(deleted_reply, data_dir)
session.delete(deleted_reply)
logger.debug('Deleted reply {}'.format(deleted_reply.uuid))
@@ -396,8 +396,11 @@ def update_missing_files(data_dir: str, session: Session) -> List[File]:
return files_that_are_missing


def update_draft_replies(session: Session, source_id: int, timestamp: datetime,
old_file_counter: int, new_file_counter: int) -> None:
def update_draft_replies(
session: Session, source_id: int, timestamp: datetime,
old_file_counter: int, new_file_counter: int,
commit: bool = True
) -> None:
"""
When we confirm a sent reply R, if there are drafts that were sent after it,
we need to reposition them to ensure that they appear _after_ the confirmed
@@ -431,7 +434,8 @@ def update_draft_replies(session: Session, source_id: int, timestamp: datetime,
.all():
draft_reply.file_counter = new_file_counter
session.add(draft_reply)
session.commit()
if commit:
session.commit()
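
The new commit flag lets update_replies fold draft repositioning into
the surrounding sync transaction instead of committing once per
confirmed reply. The pattern, sketched generically (reposition_drafts
and sync_replies are hypothetical, not the client's functions):

from sqlalchemy.orm.session import Session


def reposition_drafts(session: Session, new_counter: int, commit: bool = True) -> None:
    ...  # renumber draft rows
    if commit:
        session.commit()  # standalone callers keep the old behavior


def sync_replies(session: Session) -> None:
    for counter in (1, 2, 3):
        reposition_drafts(session, counter, commit=False)
    session.commit()  # one commit covers the whole sync pass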


def find_new_files(session: Session) -> List[File]:
