From e13184d093d5795ffd50ca0c2f572c2a28f0d417 Mon Sep 17 00:00:00 2001
From: John Hensley
Date: Tue, 31 Mar 2020 16:13:35 -0400
Subject: [PATCH] Minimize local object lookups during sync

Reduce the number of database queries during sync: cache sources or
journalists instead of looking them up for each incoming object
associated with them. Use maps instead of sets to hold local objects
in the update_ functions, so it's faster to check if we already have
a record of incoming objects.
---
 securedrop_client/storage.py | 153 ++++++++++++++++++-----------------
 1 file changed, 80 insertions(+), 73 deletions(-)

diff --git a/securedrop_client/storage.py b/securedrop_client/storage.py
index 72cf46a8ca..3e8a979260 100644
--- a/securedrop_client/storage.py
+++ b/securedrop_client/storage.py
@@ -19,13 +19,14 @@ You should have received a copy of the GNU Affero General Public License
 along with this program.  If not, see <http://www.gnu.org/licenses/>.
 """
+import collections
 from datetime import datetime
 import logging
 import os
 import shutil
 from pathlib import Path
 from dateutil.parser import parse
-from typing import Any, List, Tuple, Type, Union
+from typing import Any, Dict, List, Tuple, Type, Union
 
 from sqlalchemy import and_, desc, or_
 from sqlalchemy.orm.exc import NoResultFound
@@ -226,37 +227,37 @@ def __update_submissions(model: Union[Type[File], Type[Message]],
     * Local submissions not returned in the remote submissions are deleted
       from the local database.
     """
-    local_uuids = {submission.uuid for submission in local_submissions}
-    for submission in remote_submissions:
-        if submission.uuid in local_uuids:
-            local_submission = [s for s in local_submissions
-                                if s.uuid == submission.uuid][0]
-
-            lazy_setattr(local_submission, "size", submission.size)
-            lazy_setattr(local_submission, "is_read", submission.is_read)
-            lazy_setattr(local_submission, "download_url", submission.download_url)
-
-            # Removing the UUID from local_uuids ensures this record won't be
-            # deleted at the end of this function.
-            local_uuids.remove(submission.uuid)
-            logger.debug('Updated submission {}'.format(submission.uuid))
-        else:
-            # A new submission to be added to the database.
-            _, source_uuid = submission.source_url.rsplit('/', 1)
-            source = session.query(Source).filter_by(uuid=source_uuid).first()
-            if source:
-                ns = model(source_id=source.id, uuid=submission.uuid, size=submission.size,
-                           filename=submission.filename, download_url=submission.download_url)
-                session.add(ns)
-                logger.debug('Added new submission {}'.format(submission.uuid))
+    local_submissions_by_uuid = {s.uuid: s for s in local_submissions}
+    remote_submissions_by_source: Dict[str, list] = collections.defaultdict(list)
+    for s in remote_submissions:
+        remote_submissions_by_source[s.source_uuid].append(s)
+
+    for source_uuid, submissions in remote_submissions_by_source.items():
+        source = session.query(Source).filter_by(uuid=source_uuid).first()
+        for submission in submissions:
+            local_submission = local_submissions_by_uuid.get(submission.uuid)
+            if local_submission:
+                lazy_setattr(local_submission, "size", submission.size)
+                lazy_setattr(local_submission, "is_read", submission.is_read)
+                lazy_setattr(local_submission, "download_url", submission.download_url)
+
+                # Removing the UUID from local_uuids ensures this record won't be
+                # deleted at the end of this function.
+                del local_submissions_by_uuid[submission.uuid]
+                logger.debug(f"Updated {model.__name__} {submission.uuid}")
+            else:
+                # A new submission to be added to the database.
+                if source:
+                    ns = model(source_id=source.id, uuid=submission.uuid, size=submission.size,
+                               filename=submission.filename, download_url=submission.download_url)
+                    session.add(ns)
 
     # The uuids remaining in local_uuids do not exist on the remote server, so
     # delete the related records.
-    for deleted_submission in [s for s in local_submissions
-                               if s.uuid in local_uuids]:
+    for deleted_submission in local_submissions_by_uuid.values():
         delete_single_submission_or_reply_on_disk(deleted_submission, data_dir)
         session.delete(deleted_submission)
-        logger.debug('Deleted submission {}'.format(deleted_submission.uuid))
+        logger.debug(f"Deleted {model.__name__} {deleted_submission.uuid}")
 
     session.commit()
 
@@ -272,55 +273,61 @@ def update_replies(remote_replies: List[SDKReply], local_replies: List[Reply],
     If a reply references a new journalist username, add them to the database
     as a new user.
     """
-    local_uuids = {reply.uuid for reply in local_replies}
-    for reply in remote_replies:
-        if reply.uuid in local_uuids:
-            local_reply = [r for r in local_replies if r.uuid == reply.uuid][0]
-
-            user = find_or_create_user(reply.journalist_uuid, reply.journalist_username, session)
-            lazy_setattr(local_reply, "journalist_id", user.id)
-            lazy_setattr(local_reply, "size", reply.size)
-            lazy_setattr(local_reply, "filename", reply.filename)
-
-            local_uuids.remove(reply.uuid)
-            logger.debug('Updated reply {}'.format(reply.uuid))
-        else:
-            # A new reply to be added to the database.
-            source_uuid = reply.source_uuid
-            source = session.query(Source).filter_by(uuid=source_uuid)[0]
-            user = find_or_create_user(
-                reply.journalist_uuid,
-                reply.journalist_username,
-                session)
-
-            nr = Reply(uuid=reply.uuid,
-                       journalist_id=user.id,
-                       source_id=source.id,
-                       filename=reply.filename,
-                       size=reply.size)
-            session.add(nr)
-
-            # All replies fetched from the server have succeeded in being sent,
-            # so we should delete the corresponding draft locally if it exists.
-            try:
-                draft_reply_db_object = session.query(DraftReply).filter_by(
-                    uuid=reply.uuid).one()
-
-                update_draft_replies(session, draft_reply_db_object.source.id,
-                                     draft_reply_db_object.timestamp,
-                                     draft_reply_db_object.file_counter,
-                                     nr.file_counter)
-                session.delete(draft_reply_db_object)
-
-            except NoResultFound:
-                pass  # No draft locally stored corresponding to this reply.
-
-            logger.debug('Added new reply {}'.format(reply.uuid))
+    local_replies_by_uuid = {r.uuid: r for r in local_replies}
+
+    remote_replies_by_source: Dict[str, list] = collections.defaultdict(list)
+    for r in remote_replies:
+        remote_replies_by_source[r.source_uuid].append(r)
+
+    users: Dict[str, User] = {}
+    for source_uuid, replies in remote_replies_by_source.items():
+        source = session.query(Source).filter_by(uuid=source_uuid).first()
+        for reply in replies:
+            user = users.get(reply.journalist_uuid)
+            if not user:
+                logger.debug(f"user {reply.journalist_uuid} not cached")
+                user = find_or_create_user(
+                    reply.journalist_uuid, reply.journalist_username, session
+                )
+                users[reply.journalist_uuid] = user
+
+            local_reply = local_replies_by_uuid.get(reply.uuid)
+            if local_reply:
+                lazy_setattr(local_reply, "journalist_id", user.id)
+                lazy_setattr(local_reply, "size", reply.size)
+                lazy_setattr(local_reply, "filename", reply.filename)
+
+                del local_replies_by_uuid[reply.uuid]
+                logger.debug('Updated reply {}'.format(reply.uuid))
+            else:
+                # A new reply to be added to the database.
+                nr = Reply(uuid=reply.uuid,
+                           journalist_id=user.id,
+                           source_id=source.id,
+                           filename=reply.filename,
+                           size=reply.size)
+                session.add(nr)
+
+                # All replies fetched from the server have succeeded in being sent,
+                # so we should delete the corresponding draft locally if it exists.
+                try:
+                    draft_reply_db_object = session.query(DraftReply).filter_by(
+                        uuid=reply.uuid).one()
+
+                    update_draft_replies(session, draft_reply_db_object.source.id,
+                                         draft_reply_db_object.timestamp,
+                                         draft_reply_db_object.file_counter,
+                                         nr.file_counter, commit=False)
+                    session.delete(draft_reply_db_object)
+
+                except NoResultFound:
+                    pass  # No draft locally stored corresponding to this reply.
+
+                logger.debug('Added new reply {}'.format(reply.uuid))
 
     # The uuids remaining in local_uuids do not exist on the remote server, so
     # delete the related records.
-    replies_to_delete = [r for r in local_replies if r.uuid in local_uuids]
-    for deleted_reply in replies_to_delete:
+    for deleted_reply in local_replies_by_uuid.values():
         delete_single_submission_or_reply_on_disk(deleted_reply, data_dir)
         session.delete(deleted_reply)
         logger.debug('Deleted reply {}'.format(deleted_reply.uuid))
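
Note on the pattern: independent of the SecureDrop models, the reshaping in
this patch comes down to grouping remote records by source so each source is
queried once, caching journalist lookups in a dict keyed by UUID, and holding
local records in a dict keyed by UUID so membership checks and leftover
deletion are constant-time. A minimal, self-contained sketch follows; the
RemoteReply/sync names and the string stand-ins for database lookups are
illustrative only and are not part of securedrop_client.

    import collections
    from typing import Dict, List, NamedTuple

    class RemoteReply(NamedTuple):
        uuid: str
        source_uuid: str
        journalist_uuid: str

    def sync(remote: List[RemoteReply], local_by_uuid: Dict[str, RemoteReply]) -> None:
        # Group incoming records by source so each source is resolved once,
        # not once per record.
        by_source: Dict[str, List[RemoteReply]] = collections.defaultdict(list)
        for r in remote:
            by_source[r.source_uuid].append(r)

        users: Dict[str, str] = {}  # journalist_uuid -> cached "user" lookup
        for source_uuid, replies in by_source.items():
            source = f"source:{source_uuid}"  # stands in for one query per source
            for reply in replies:
                user = users.setdefault(reply.journalist_uuid, f"user:{reply.journalist_uuid}")
                if reply.uuid in local_by_uuid:
                    del local_by_uuid[reply.uuid]  # O(1) "already have it" bookkeeping
                    print(f"update {reply.uuid} ({source}, {user})")
                else:
                    print(f"create {reply.uuid} ({source}, {user})")

        # Whatever is left in local_by_uuid was not returned by the server.
        for leftover in local_by_uuid.values():
            print(f"delete {leftover.uuid}")

    if __name__ == "__main__":
        remote = [RemoteReply("r1", "s1", "j1"), RemoteReply("r2", "s1", "j1")]
        local = {"r1": RemoteReply("r1", "s1", "j1"), "r9": RemoteReply("r9", "s2", "j2")}
        sync(remote, local)  # updates r1, creates r2, deletes r9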