Skip to content

Commit

Permalink
Minimize local object lookups during sync
Browse files Browse the repository at this point in the history
Reduce the number of database queries during sync: cache sources or
journalists instead of looking them up for each incoming object
associated with them.

Use maps instead of sets to hold local objects in the update_
functions, so it's faster to check if we already have a record of
incoming objects.
  • Loading branch information
rmol committed Mar 31, 2020
1 parent 7cb9a22 commit e13184d
Showing 1 changed file with 80 additions and 73 deletions.
153 changes: 80 additions & 73 deletions securedrop_client/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
import collections
from datetime import datetime
import logging
import os
import shutil
from pathlib import Path
from dateutil.parser import parse
from typing import Any, List, Tuple, Type, Union
from typing import Any, Dict, List, Tuple, Type, Union

from sqlalchemy import and_, desc, or_
from sqlalchemy.orm.exc import NoResultFound
Expand Down Expand Up @@ -226,37 +227,37 @@ def __update_submissions(model: Union[Type[File], Type[Message]],
* Local submissions not returned in the remote submissions are deleted
from the local database.
"""
local_uuids = {submission.uuid for submission in local_submissions}
for submission in remote_submissions:
if submission.uuid in local_uuids:
local_submission = [s for s in local_submissions
if s.uuid == submission.uuid][0]

lazy_setattr(local_submission, "size", submission.size)
lazy_setattr(local_submission, "is_read", submission.is_read)
lazy_setattr(local_submission, "download_url", submission.download_url)

# Removing the UUID from local_uuids ensures this record won't be
# deleted at the end of this function.
local_uuids.remove(submission.uuid)
logger.debug('Updated submission {}'.format(submission.uuid))
else:
# A new submission to be added to the database.
_, source_uuid = submission.source_url.rsplit('/', 1)
source = session.query(Source).filter_by(uuid=source_uuid).first()
if source:
ns = model(source_id=source.id, uuid=submission.uuid, size=submission.size,
filename=submission.filename, download_url=submission.download_url)
session.add(ns)
logger.debug('Added new submission {}'.format(submission.uuid))
local_submissions_by_uuid = {s.uuid: s for s in local_submissions}
remote_submissions_by_source: Dict[str, list] = collections.defaultdict(list)
for s in remote_submissions:
remote_submissions_by_source[s.source_uuid].append(s)

for source_uuid, submissions in remote_submissions_by_source.items():
source = session.query(Source).filter_by(uuid=source_uuid).first()
for submission in submissions:
local_submission = local_submissions_by_uuid.get(submission.uuid)
if local_submission:
lazy_setattr(local_submission, "size", submission.size)
lazy_setattr(local_submission, "is_read", submission.is_read)
lazy_setattr(local_submission, "download_url", submission.download_url)

# Deleting the entry from local_submissions_by_uuid ensures this
# record won't be deleted at the end of this function.
del local_submissions_by_uuid[submission.uuid]
logger.debug(f"Updated {model.__name__} {submission.uuid}")
else:
# A new submission to be added to the database.
if source:
ns = model(source_id=source.id, uuid=submission.uuid, size=submission.size,
filename=submission.filename, download_url=submission.download_url)
session.add(ns)

# The submissions remaining in local_submissions_by_uuid do not exist
# on the remote server, so delete the related records.
for deleted_submission in [s for s in local_submissions
if s.uuid in local_uuids]:
for deleted_submission in local_submissions_by_uuid.values():
delete_single_submission_or_reply_on_disk(deleted_submission, data_dir)
session.delete(deleted_submission)
logger.debug('Deleted submission {}'.format(deleted_submission.uuid))
logger.debug(f"Deleted {model.__name__} {deleted_submission.uuid}")

session.commit()

Expand All @@ -272,55 +273,61 @@ def update_replies(remote_replies: List[SDKReply], local_replies: List[Reply],
If a reply references a new journalist username, add them to the database
as a new user.
"""
local_uuids = {reply.uuid for reply in local_replies}
for reply in remote_replies:
if reply.uuid in local_uuids:
local_reply = [r for r in local_replies if r.uuid == reply.uuid][0]

user = find_or_create_user(reply.journalist_uuid, reply.journalist_username, session)
lazy_setattr(local_reply, "journalist_id", user.id)
lazy_setattr(local_reply, "size", reply.size)
lazy_setattr(local_reply, "filename", reply.filename)

local_uuids.remove(reply.uuid)
logger.debug('Updated reply {}'.format(reply.uuid))
else:
# A new reply to be added to the database.
source_uuid = reply.source_uuid
source = session.query(Source).filter_by(uuid=source_uuid)[0]
user = find_or_create_user(
reply.journalist_uuid,
reply.journalist_username,
session)

nr = Reply(uuid=reply.uuid,
journalist_id=user.id,
source_id=source.id,
filename=reply.filename,
size=reply.size)
session.add(nr)

# All replies fetched from the server have succeeded in being sent,
# so we should delete the corresponding draft locally if it exists.
try:
draft_reply_db_object = session.query(DraftReply).filter_by(
uuid=reply.uuid).one()

update_draft_replies(session, draft_reply_db_object.source.id,
draft_reply_db_object.timestamp,
draft_reply_db_object.file_counter,
nr.file_counter)
session.delete(draft_reply_db_object)

except NoResultFound:
pass # No draft locally stored corresponding to this reply.

logger.debug('Added new reply {}'.format(reply.uuid))
local_replies_by_uuid = {r.uuid: r for r in local_replies}

remote_replies_by_source: Dict[str, list] = collections.defaultdict(list)
for r in remote_replies:
remote_replies_by_source[r.source_uuid].append(r)

users: Dict[str, User] = {}
for source_uuid, replies in remote_replies_by_source.items():
source = session.query(Source).filter_by(uuid=source_uuid).first()
for reply in replies:
user = users.get(reply.journalist_uuid)
if not user:
logger.debug(f"user {reply.journalist_uuid} not cached")
user = find_or_create_user(
reply.journalist_uuid, reply.journalist_username, session
)
users[reply.journalist_uuid] = user

local_reply = local_replies_by_uuid.get(reply.uuid)
if local_reply:
lazy_setattr(local_reply, "journalist_id", user.id)
lazy_setattr(local_reply, "size", reply.size)
lazy_setattr(local_reply, "filename", reply.filename)

del local_replies_by_uuid[reply.uuid]
logger.debug('Updated reply {}'.format(reply.uuid))
else:
# A new reply to be added to the database.
nr = Reply(uuid=reply.uuid,
journalist_id=user.id,
source_id=source.id,
filename=reply.filename,
size=reply.size)
session.add(nr)

# All replies fetched from the server have succeeded in being sent,
# so we should delete the corresponding draft locally if it exists.
try:
draft_reply_db_object = session.query(DraftReply).filter_by(
uuid=reply.uuid).one()

update_draft_replies(session, draft_reply_db_object.source.id,
draft_reply_db_object.timestamp,
draft_reply_db_object.file_counter,
nr.file_counter, commit=False)
session.delete(draft_reply_db_object)

except NoResultFound:
pass # No draft locally stored corresponding to this reply.

logger.debug('Added new reply {}'.format(reply.uuid))

# The replies remaining in local_replies_by_uuid do not exist on the
# remote server, so delete the related records.
replies_to_delete = [r for r in local_replies if r.uuid in local_uuids]
for deleted_reply in replies_to_delete:
for deleted_reply in local_replies_by_uuid.values():
delete_single_submission_or_reply_on_disk(deleted_reply, data_dir)
session.delete(deleted_reply)
logger.debug('Deleted reply {}'.format(deleted_reply.uuid))
Expand Down

0 comments on commit e13184d

Please sign in to comment.