Skip to content

Commit

Permalink
feat(backup): Support import chunking
Browse files Browse the repository at this point in the history
With this feature in place, we now atomically record which models we
imported in a given `import_by_model` call. This will be useful in the
short term for implementing the post-processing import step, and in the
long term to support rollbacks and partial import recovery.

Issue: getsentry/team-ospo#203
Issue: getsentry/team-ospo#213
  • Loading branch information
azaslavsky committed Nov 14, 2023
1 parent 2a0decf commit 9a74955
Show file tree
Hide file tree
Showing 9 changed files with 773 additions and 92 deletions.
33 changes: 27 additions & 6 deletions src/sentry/backup/dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

class NormalizedModelName:
"""
A wrapper type that ensures that the contained model name has been properly normalized. A "normalized" model name is one that is identical to the name as it appears in an exported JSON backup, so a string of the form `{app_label.lower()}.{model_name.lower()}`.
A wrapper type that ensures that the contained model name has been properly normalized. A
"normalized" model name is one that is identical to the name as it appears in an exported JSON
backup, so a string of the form `{app_label.lower()}.{model_name.lower()}`.
"""

__model_name: str
Expand Down Expand Up @@ -252,6 +254,13 @@ class PrimaryKeyMap:
def __init__(self):
self.mapping = defaultdict(dict)

def __len__(self):
count = 0
for model_name_str, mappings in self.mapping.items():
count += len(mappings)

return count

def get_pk(self, model_name: NormalizedModelName, old: int) -> Optional[int]:
"""
Get the new, post-mapping primary key from an old primary key.
Expand All @@ -276,7 +285,8 @@ def get_pks(self, model_name: NormalizedModelName) -> set[int]:

def get_kind(self, model_name: NormalizedModelName, old: int) -> Optional[ImportKind]:
"""
Is the mapped entry a newly inserted model, or an already existing one that has been merged in?
Is the mapped entry a newly inserted model, or an already existing one that has been merged
in?
"""

pk_map = self.mapping.get(str(model_name))
Expand Down Expand Up @@ -313,7 +323,8 @@ def insert(
slug: str | None = None,
) -> None:
"""
Create a new OLD_PK -> NEW_PK mapping for the given model. Models that contain unique slugs (organizations, projects, etc) can optionally store that information as well.
Create a new OLD_PK -> NEW_PK mapping for the given model. Models that contain unique slugs
(organizations, projects, etc) can optionally store that information as well.
"""

self.mapping[str(model_name)][old] = (new, kind, slug)
Expand All @@ -327,18 +338,25 @@ def extend(self, other: PrimaryKeyMap) -> None:
for old_pk, new_entry in mappings.items():
self.mapping[model_name_str][old_pk] = new_entry

def partition(self, model_names: set[NormalizedModelName]) -> PrimaryKeyMap:
def partition(
    self, model_names: set[NormalizedModelName], kinds: set[ImportKind] | None = None
) -> PrimaryKeyMap:
    """
    Create a new `PrimaryKeyMap` retaining only the entries for the given models,
    optionally filtered further to only the given `ImportKind`s.

    If `kinds` is `None`, entries of every kind are retained for the selected models.
    """

    building = PrimaryKeyMap()
    # `None` means "no kind filtering" - treat it as the full set of kinds. `set(...)`
    # over an Enum enumerates all of its members directly.
    import_kinds = set(ImportKind) if kinds is None else kinds
    for model_name_str, mappings in self.mapping.items():
        if NormalizedModelName(model_name_str) not in model_names:
            continue

        for old_pk, new_entry in mappings.items():
            # Each entry is a (new_pk, ImportKind, slug) triple; filter on the kind.
            (_, import_kind, _) = new_entry
            if import_kind not in import_kinds:
                continue

            building.mapping[model_name_str][old_pk] = new_entry

    return building
Expand All @@ -347,7 +365,10 @@ def partition(self, model_names: set[NormalizedModelName]) -> PrimaryKeyMap:
# No arguments, so we lazily cache the result after the first calculation.
@lru_cache(maxsize=1)
def dependencies() -> dict[NormalizedModelName, ModelRelations]:
"""Produce a dictionary mapping model type definitions to a `ModelDeps` describing their dependencies."""
"""
Produce a dictionary mapping model type definitions to a `ModelDeps` describing their
dependencies.
"""

from django.apps import apps

Expand Down
6 changes: 6 additions & 0 deletions src/sentry/backup/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -432,3 +432,9 @@ class ImportFlags(NamedTuple):
# `key`) or `Relay` (as identified by its unique `relay_id`) already exists, should we overwrite
# it with the new value, or keep the existing one and discard the incoming value instead?
overwrite_configs: bool = False

# A UUID with which to identify this import's `*ImportChunk` database entries. Useful for
# passing the calling `Relocation` model's UUID to all of the imports it triggered. If this flag
# is not provided, the import was called in a non-relocation context, like from the `sentry
# import` CLI command.
import_uuid: str | None = None
62 changes: 56 additions & 6 deletions src/sentry/backup/imports.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,23 @@
from __future__ import annotations

from typing import BinaryIO, Iterator, Optional, Tuple, Type
from uuid import uuid4

import click
from django.core import serializers
from django.db import transaction
from django.db.models.base import Model

from sentry.backup.dependencies import (
ImportKind,
NormalizedModelName,
PrimaryKeyMap,
dependencies,
get_model_name,
)
from sentry.backup.helpers import Decryptor, Filter, ImportFlags, decrypt_encrypted_tarball
from sentry.backup.scopes import ImportScope
from sentry.models.importchunk import ControlImportChunkReplica
from sentry.models.orgauthtoken import OrgAuthToken
from sentry.services.hybrid_cloud.import_export.model import (
RpcFilter,
Expand Down Expand Up @@ -73,6 +76,10 @@ def _import(
raise RuntimeError(errText)

flags = flags if flags is not None else ImportFlags()
if flags.import_uuid is None:
flags = flags._replace(import_uuid=uuid4().hex)

deps = dependencies()
user_model_name = get_model_name(User)
org_auth_token_model_name = get_model_name(OrgAuthToken)
org_member_model_name = get_model_name(OrganizationMember)
Expand Down Expand Up @@ -107,8 +114,8 @@ def _import(
if filter_by is not None:
filters.append(filter_by)

# `sentry.Email` models don't have any explicit dependencies on `User`, so we need to find
# and record them manually.
# `sentry.Email` models don't have any explicit dependencies on `sentry.User`, so we need to
# find and record them manually.
user_to_email = dict()

if filter_by.model == Organization:
Expand Down Expand Up @@ -199,14 +206,17 @@ def yield_json_models(content) -> Iterator[Tuple[NormalizedModelName, str]]:
def do_write(
pk_map: PrimaryKeyMap, model_name: NormalizedModelName, json_data: json.JSONData
) -> None:
model_relations = dependencies().get(model_name)
nonlocal scope, flags, filters, deps

model_relations = deps.get(model_name)
if not model_relations:
return

dep_models = {get_model_name(d) for d in model_relations.get_dependencies_for_relocation()}
import_by_model = ImportExportService.get_importer_for_model(model_relations.model)
model_name_str = str(model_name)
result = import_by_model(
model_name=str(model_name),
model_name=model_name_str,
scope=RpcImportScope.into_rpc(scope),
flags=RpcImportFlags.into_rpc(flags),
filter_by=[RpcFilter.into_rpc(f) for f in filters],
Expand All @@ -220,15 +230,55 @@ def do_write(
warningText = ">> Are you restoring from a backup of the same version of Sentry?\n>> Are you restoring onto a clean database?\n>> If so then this IntegrityError might be our fault, you can open an issue here:\n>> https://github.com/getsentry/sentry/issues/new/choose"
printer(warningText, err=True)
raise ImportingError(result)
pk_map.extend(result.mapped_pks)

out_pk_map: PrimaryKeyMap = result.mapped_pks.from_rpc()
pk_map.extend(out_pk_map)

# If the model we just imported lives in the control silo, that means the import took place
# over RPC. To ensure that we have an accurate view of the import result in both sides of
# the RPC divide, we create a replica of the `ControlImportChunk` that successful import
# would have generated in the calling region as well.
if result.min_ordinal is not None and SiloMode.CONTROL in deps[model_name].silos:
# If `min_ordinal` is not null, these values must not be either.
assert result.max_ordinal is not None
assert result.min_source_pk is not None
assert result.max_source_pk is not None

inserted = out_pk_map.partition({model_name}, {ImportKind.Inserted}).mapping[
model_name_str
]
existing = out_pk_map.partition({model_name}, {ImportKind.Existing}).mapping[
model_name_str
]
overwrite = out_pk_map.partition({model_name}, {ImportKind.Overwrite}).mapping[
model_name_str
]
control_import_chunk_replica = ControlImportChunkReplica(
import_uuid=flags.import_uuid,
model=model_name_str,
# TODO(getsentry/team-ospo#190): The next two fields assume the entire model is
# being imported in a single call; we may change this in the future.
min_ordinal=result.min_ordinal,
max_ordinal=result.max_ordinal,
min_source_pk=result.min_source_pk,
max_source_pk=result.max_source_pk,
min_inserted_pk=result.min_inserted_pk,
max_inserted_pk=result.max_inserted_pk,
inserted_map={k: v[0] for k, v in inserted.items()},
existing_map={k: v[0] for k, v in existing.items()},
overwrite_map={k: v[0] for k, v in overwrite.items()},
inserted_identifiers={k: v[2] for k, v in inserted.items() if v[2] is not None},
)
control_import_chunk_replica.save()

# Extract some write logic into its own internal function, so that we may call it irrespective
# of how we do atomicity: on a per-model (if using multiple dbs) or global (if using a single
# db) basis.
def do_writes(pk_map: PrimaryKeyMap) -> None:
nonlocal deferred_org_auth_tokens

for model_name, json_data in yield_json_models(content):
if model_name == org_auth_token_model_name:
nonlocal deferred_org_auth_tokens
deferred_org_auth_tokens = json_data
continue

Expand Down
Loading

0 comments on commit 9a74955

Please sign in to comment.