Skip to content

Commit

Permalink
Minimize local object lookups during sync
Browse files Browse the repository at this point in the history
Reduce the number of database queries during sync: cache sources or
journalists instead of looking them up for each incoming object
associated with them.

Use maps instead of sets to hold local objects in the update_
functions, so it's faster to check if we already have a record of
incoming objects.
  • Loading branch information
rmol authored and sssoleileraaa committed Apr 2, 2020
1 parent 8934a55 commit 77bbb4b
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 30 deletions.
59 changes: 30 additions & 29 deletions securedrop_client/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,15 @@
import shutil
from pathlib import Path
from dateutil.parser import parse
from typing import Any, List, Tuple, Type, Union
from typing import Any, Dict, List, Tuple, Type, Union

from sqlalchemy import and_, desc, or_
from sqlalchemy.orm.exc import NoResultFound
from sqlalchemy.orm.session import Session

from securedrop_client.db import (DraftReply, Source, Message, File, Reply, ReplySendStatus,
ReplySendStatusCodes, User)
from securedrop_client.utils import chronometer
from securedrop_client.utils import SourceCache, chronometer

from sdclientapi import API
from sdclientapi import Source as SDKSource
Expand Down Expand Up @@ -226,37 +226,34 @@ def __update_submissions(model: Union[Type[File], Type[Message]],
* Local submissions not returned in the remote submissions are deleted
from the local database.
"""
local_uuids = {submission.uuid for submission in local_submissions}
local_submissions_by_uuid = {s.uuid: s for s in local_submissions}
source_cache = SourceCache(session)
for submission in remote_submissions:
if submission.uuid in local_uuids:
local_submission = [s for s in local_submissions
if s.uuid == submission.uuid][0]

local_submission = local_submissions_by_uuid.get(submission.uuid)
if local_submission:
lazy_setattr(local_submission, "size", submission.size)
lazy_setattr(local_submission, "is_read", submission.is_read)
lazy_setattr(local_submission, "download_url", submission.download_url)

# Removing the UUID from local_uuids ensures this record won't be
# deleted at the end of this function.
local_uuids.remove(submission.uuid)
logger.debug('Updated submission {}'.format(submission.uuid))
del local_submissions_by_uuid[submission.uuid]
logger.debug(f"Updated {model.__name__} {submission.uuid}")
else:
# A new submission to be added to the database.
_, source_uuid = submission.source_url.rsplit('/', 1)
source = session.query(Source).filter_by(uuid=source_uuid).first()
source = source_cache.get(submission.source_uuid)
if source:
ns = model(source_id=source.id, uuid=submission.uuid, size=submission.size,
filename=submission.filename, download_url=submission.download_url)
session.add(ns)
logger.debug('Added new submission {}'.format(submission.uuid))
logger.debug(f"Added {model.__name__} {submission.uuid}")

# The uuids remaining in local_uuids do not exist on the remote server, so
# delete the related records.
for deleted_submission in [s for s in local_submissions
if s.uuid in local_uuids]:
for deleted_submission in local_submissions_by_uuid.values():
delete_single_submission_or_reply_on_disk(deleted_submission, data_dir)
session.delete(deleted_submission)
logger.debug('Deleted submission {}'.format(deleted_submission.uuid))
logger.debug(f"Deleted {model.__name__} {deleted_submission.uuid}")

session.commit()

Expand All @@ -272,26 +269,31 @@ def update_replies(remote_replies: List[SDKReply], local_replies: List[Reply],
If a reply references a new journalist username, add them to the database
as a new user.
"""
local_uuids = {reply.uuid for reply in local_replies}
local_replies_by_uuid = {r.uuid: r for r in local_replies}
users: Dict[str, User] = {}
source_cache = SourceCache(session)
for reply in remote_replies:
if reply.uuid in local_uuids:
local_reply = [r for r in local_replies if r.uuid == reply.uuid][0]
user = users.get(reply.journalist_uuid)
if not user:
user = find_or_create_user(
reply.journalist_uuid, reply.journalist_username, session
)
users[reply.journalist_uuid] = user

user = find_or_create_user(reply.journalist_uuid, reply.journalist_username, session)
local_reply = local_replies_by_uuid.get(reply.uuid)
if local_reply:
lazy_setattr(local_reply, "journalist_id", user.id)
lazy_setattr(local_reply, "size", reply.size)
lazy_setattr(local_reply, "filename", reply.filename)

local_uuids.remove(reply.uuid)
del local_replies_by_uuid[reply.uuid]
logger.debug('Updated reply {}'.format(reply.uuid))
else:
# A new reply to be added to the database.
source_uuid = reply.source_uuid
source = session.query(Source).filter_by(uuid=source_uuid)[0]
user = find_or_create_user(
reply.journalist_uuid,
reply.journalist_username,
session)
source = source_cache.get(reply.source_uuid)
if not source:
logger.error(f"No source found for reply {reply.uuid}")
continue

nr = Reply(uuid=reply.uuid,
journalist_id=user.id,
Expand All @@ -309,7 +311,7 @@ def update_replies(remote_replies: List[SDKReply], local_replies: List[Reply],
update_draft_replies(session, draft_reply_db_object.source.id,
draft_reply_db_object.timestamp,
draft_reply_db_object.file_counter,
nr.file_counter)
nr.file_counter, commit=False)
session.delete(draft_reply_db_object)

except NoResultFound:
Expand All @@ -319,8 +321,7 @@ def update_replies(remote_replies: List[SDKReply], local_replies: List[Reply],

# The uuids remaining in local_uuids do not exist on the remote server, so
# delete the related records.
replies_to_delete = [r for r in local_replies if r.uuid in local_uuids]
for deleted_reply in replies_to_delete:
for deleted_reply in local_replies_by_uuid.values():
delete_single_submission_or_reply_on_disk(deleted_reply, data_dir)
session.delete(deleted_reply)
logger.debug('Deleted reply {}'.format(deleted_reply.uuid))
Expand Down
23 changes: 22 additions & 1 deletion securedrop_client/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@
import time

from contextlib import contextmanager
from typing import Generator
from typing import Dict, Generator, Optional

from sqlalchemy.orm.session import Session

from securedrop_client import db


def safe_mkdir(sdc_home: str, relative_path: str = None) -> None:
Expand Down Expand Up @@ -85,3 +89,20 @@ def chronometer(logger: logging.Logger, description: str) -> Generator:
finally:
elapsed = time.perf_counter() - start
logger.debug(f"{description} duration: {elapsed:.4f}s")


class SourceCache(object):
"""
Caches Sources by UUID.
"""

def __init__(self, session: Session) -> None:
super().__init__()
self.cache: Dict[str, db.Source] = {}
self.session = session

def get(self, source_uuid: str) -> Optional[db.Source]:
if source_uuid not in self.cache:
source = self.session.query(db.Source).filter_by(uuid=source_uuid).first()
self.cache[source_uuid] = source
return self.cache.get(source_uuid)
26 changes: 26 additions & 0 deletions tests/test_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -782,6 +782,32 @@ def test_update_replies_cleanup_drafts(homedir, mocker, session):
assert new_draft_replies[0].uuid == draft_reply_new.uuid


def test_update_replies_missing_source(homedir, mocker, session):
"""
Verify that a reply to an invalid source is handled.
"""
data_dir = os.path.join(homedir, 'data')

journalist = factory.User(id=1)
session.add(journalist)

source = factory.Source()
session.add(source)

# Some remote reply objects from the API, one of which will exist in the
# local database, the other will NOT exist in the local database
# (this will be added to the database)
remote_reply = make_remote_reply("nonexistent-source", journalist.uuid)
remote_replies = [remote_reply]
local_replies = []

error_logger = mocker.patch('securedrop_client.storage.logger.error')

update_replies(remote_replies, local_replies, session, data_dir)

error_logger.assert_called_once_with(f"No source found for reply {remote_reply.uuid}")


def test_find_or_create_user_existing_uuid(mocker):
"""
Return an existing user object with the referenced uuid.
Expand Down

0 comments on commit 77bbb4b

Please sign in to comment.