feat(backup): Support import chunking
With this feature in place, we now atomically record which models we
imported in a given `import_by_model` call. This will be useful in the
short term for implementing the post-processing import step, and in the
long term for supporting rollbacks and partial import recovery.

Issue: getsentry/team-ospo#203
Issue: getsentry/team-ospo#213
azaslavsky committed Nov 16, 2023
1 parent 8824524 commit cda57c5
Showing 9 changed files with 766 additions and 92 deletions.
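
At its core, the change makes each `import_by_model` call record an `*ImportChunk` row in the same database transaction as the model writes themselves, so the record of what was imported cannot drift from the data. A minimal sketch of the bookkeeping those rows carry (an illustrative dataclass, not the actual Django models in `sentry.models.importchunk`):

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class ImportChunkSketch:
    # Illustrative stand-in for the `ControlImportChunk` / `RegionImportChunk`
    # models in `sentry.models.importchunk`; field names mirror the diff below.
    import_uuid: str                # ties every chunk from one relocation together
    model: str                      # normalized model name, e.g. "sentry.organization"
    min_ordinal: int                # 1 for now: each model arrives in a single call
    max_ordinal: int                # count of instances written in this chunk
    min_source_pk: int              # smallest pk seen in the source JSON
    max_source_pk: int              # largest pk seen in the source JSON
    min_inserted_pk: Optional[int]  # pk range of freshly inserted rows, if any
    max_inserted_pk: Optional[int]
    inserted_map: dict[int, int] = field(default_factory=dict)   # old pk -> new pk
    existing_map: dict[int, int] = field(default_factory=dict)   # old pk -> pk of matched row
    overwrite_map: dict[int, int] = field(default_factory=dict)  # old pk -> pk of overwritten row
    inserted_identifiers: dict[int, str] = field(default_factory=dict)  # slugs of inserted rows
```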
26 changes: 20 additions & 6 deletions src/sentry/backup/dependencies.py
@@ -17,7 +17,9 @@

class NormalizedModelName:
"""
-A wrapper type that ensures that the contained model name has been properly normalized. A "normalized" model name is one that is identical to the name as it appears in an exported JSON backup, so a string of the form `{app_label.lower()}.{model_name.lower()}`.
+A wrapper type that ensures that the contained model name has been properly normalized. A
+"normalized" model name is one that is identical to the name as it appears in an exported JSON
+backup, so a string of the form `{app_label.lower()}.{model_name.lower()}`.
"""

__model_name: str
@@ -276,7 +278,8 @@ def get_pks(self, model_name: NormalizedModelName) -> set[int]:

def get_kind(self, model_name: NormalizedModelName, old: int) -> Optional[ImportKind]:
"""
-Is the mapped entry a newly inserted model, or an already existing one that has been merged in?
+Is the mapped entry a newly inserted model, or an already existing one that has been merged
+in?
"""

pk_map = self.mapping.get(str(model_name))
@@ -313,7 +316,8 @@ def insert(
slug: str | None = None,
) -> None:
"""
-Create a new OLD_PK -> NEW_PK mapping for the given model. Models that contain unique slugs (organizations, projects, etc) can optionally store that information as well.
+Create a new OLD_PK -> NEW_PK mapping for the given model. Models that contain unique slugs
+(organizations, projects, etc) can optionally store that information as well.
"""

self.mapping[str(model_name)][old] = (new, kind, slug)
@@ -327,18 +331,25 @@ def extend(self, other: PrimaryKeyMap) -> None:
for old_pk, new_entry in mappings.items():
self.mapping[model_name_str][old_pk] = new_entry

-def partition(self, model_names: set[NormalizedModelName]) -> PrimaryKeyMap:
+def partition(
+self, model_names: set[NormalizedModelName], kinds: set[ImportKind] | None = None
+) -> PrimaryKeyMap:
"""
-Create a new map with only the specified model kinds retained.
+Create a new map with only the specified models and kinds retained.
"""

building = PrimaryKeyMap()
+import_kinds = {k for k in ImportKind} if kinds is None else kinds
for model_name_str, mappings in self.mapping.items():
model_name = NormalizedModelName(model_name_str)
if model_name not in model_names:
continue

for old_pk, new_entry in mappings.items():
+(_, import_kind, _) = new_entry
+if import_kind not in import_kinds:
+continue

building.mapping[model_name_str][old_pk] = new_entry

return building
@@ -347,7 +358,10 @@ def partition(self, model_names: set[NormalizedModelName]) -> PrimaryKeyMap:
# No arguments, so we lazily cache the result after the first calculation.
@lru_cache(maxsize=1)
def dependencies() -> dict[NormalizedModelName, ModelRelations]:
"""Produce a dictionary mapping model type definitions to a `ModelDeps` describing their dependencies."""
"""
Produce a dictionary mapping model type definitions to a `ModelDeps` describing their
dependencies.
"""

from django.apps import apps

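
The new `kinds` argument composes with the existing model-name filter: an entry survives `partition` only if both its model and its `ImportKind` are in the requested sets. A usage sketch, assuming the positional `insert(model_name, old, new, kind, slug)` calling convention used elsewhere in this diff:

```python
from sentry.backup.dependencies import ImportKind, NormalizedModelName, PrimaryKeyMap

pk_map = PrimaryKeyMap()
org = NormalizedModelName("sentry.organization")
pk_map.insert(org, 1, 10, ImportKind.Inserted, "acme")  # old pk 1 -> new pk 10
pk_map.insert(org, 2, 7, ImportKind.Existing)           # old pk 2 merged into pk 7

# Keep only this model's freshly inserted entries; the merged-in entry for
# old pk 2 is dropped because its kind is not in the requested set.
inserted_only = pk_map.partition({org}, {ImportKind.Inserted})
assert inserted_only.mapping["sentry.organization"] == {1: (10, ImportKind.Inserted, "acme")}
```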
6 changes: 6 additions & 0 deletions src/sentry/backup/helpers.py
@@ -432,3 +432,9 @@ class ImportFlags(NamedTuple):
# `key`) or `Relay` (as identified by its unique `relay_id`) already exists, should we overwrite
# it with the new value, or keep the existing one and discard the incoming value instead?
overwrite_configs: bool = False

+# A UUID with which to identify this import's `*ImportChunk` database entries. Useful for
+# passing the calling `Relocation` model's UUID to all of the imports it triggered. If this flag
+# is not provided, the import was called in a non-relocation context, like from the `sentry
+# import` CLI command.
+import_uuid: str | None = None
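
In practice the flag flows in from whoever triggered the import: a `Relocation` passes its own UUID so every chunk it produces can be tied back to it, while `_import` (see `imports.py` below) mints a fresh one when the caller leaves the field unset. A sketch, assuming the remaining `ImportFlags` fields all carry defaults as the two shown above do:

```python
from uuid import uuid4

from sentry.backup.helpers import ImportFlags

# Relocation-style call: reuse the relocation's own UUID (illustrative value).
relocation_flags = ImportFlags(import_uuid=uuid4().hex)

# CLI-style call: leave the field unset; `_import` will fill in a fresh UUID.
cli_flags = ImportFlags()
assert cli_flags.import_uuid is None
```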
62 changes: 56 additions & 6 deletions src/sentry/backup/imports.py
@@ -1,20 +1,23 @@
from __future__ import annotations

from typing import BinaryIO, Iterator, Optional, Tuple, Type
+from uuid import uuid4

import click
from django.core import serializers
from django.db import transaction
from django.db.models.base import Model

from sentry.backup.dependencies import (
+ImportKind,
NormalizedModelName,
PrimaryKeyMap,
dependencies,
get_model_name,
)
from sentry.backup.helpers import Decryptor, Filter, ImportFlags, decrypt_encrypted_tarball
from sentry.backup.scopes import ImportScope
+from sentry.models.importchunk import ControlImportChunkReplica
from sentry.models.orgauthtoken import OrgAuthToken
from sentry.services.hybrid_cloud.import_export.model import (
RpcFilter,
@@ -73,6 +76,10 @@ def _import(
raise RuntimeError(errText)

flags = flags if flags is not None else ImportFlags()
+if flags.import_uuid is None:
+flags = flags._replace(import_uuid=uuid4().hex)
+
+deps = dependencies()
user_model_name = get_model_name(User)
org_auth_token_model_name = get_model_name(OrgAuthToken)
org_member_model_name = get_model_name(OrganizationMember)
@@ -107,8 +114,8 @@ def _import(
if filter_by is not None:
filters.append(filter_by)

-# `sentry.Email` models don't have any explicit dependencies on `User`, so we need to find
-# and record them manually.
+# `sentry.Email` models don't have any explicit dependencies on `sentry.User`, so we need to
+# find and record them manually.
user_to_email = dict()

if filter_by.model == Organization:
@@ -199,14 +206,17 @@ def yield_json_models(content) -> Iterator[Tuple[NormalizedModelName, str]]:
def do_write(
pk_map: PrimaryKeyMap, model_name: NormalizedModelName, json_data: json.JSONData
) -> None:
-model_relations = dependencies().get(model_name)
+nonlocal scope, flags, filters, deps
+
+model_relations = deps.get(model_name)
if not model_relations:
return

dep_models = {get_model_name(d) for d in model_relations.get_dependencies_for_relocation()}
import_by_model = ImportExportService.get_importer_for_model(model_relations.model)
+model_name_str = str(model_name)
result = import_by_model(
-model_name=str(model_name),
+model_name=model_name_str,
scope=RpcImportScope.into_rpc(scope),
flags=RpcImportFlags.into_rpc(flags),
filter_by=[RpcFilter.into_rpc(f) for f in filters],
@@ -220,15 +230,55 @@ def do_write(
warningText = ">> Are you restoring from a backup of the same version of Sentry?\n>> Are you restoring onto a clean database?\n>> If so then this IntegrityError might be our fault, you can open an issue here:\n>> https://github.com/getsentry/sentry/issues/new/choose"
printer(warningText, err=True)
raise ImportingError(result)
-pk_map.extend(result.mapped_pks)
+
+out_pk_map: PrimaryKeyMap = result.mapped_pks.from_rpc()
+pk_map.extend(out_pk_map)

# If the model we just imported lives in the control silo, that means the import took place
# over RPC. To ensure that we have an accurate view of the import result in both sides of
# the RPC divide, we create a replica of the `ControlImportChunk` that successful import
# would have generated in the calling region as well.
if result.min_ordinal is not None and SiloMode.CONTROL in deps[model_name].silos:
# If `min_ordinal` is not null, these values must not be either.
assert result.max_ordinal is not None
assert result.min_source_pk is not None
assert result.max_source_pk is not None

inserted = out_pk_map.partition({model_name}, {ImportKind.Inserted}).mapping[
model_name_str
]
existing = out_pk_map.partition({model_name}, {ImportKind.Existing}).mapping[
model_name_str
]
overwrite = out_pk_map.partition({model_name}, {ImportKind.Overwrite}).mapping[
model_name_str
]
control_import_chunk_replica = ControlImportChunkReplica(
import_uuid=flags.import_uuid,
model=model_name_str,
# TODO(getsentry/team-ospo#190): The next two fields assume the entire model is
# being imported in a single call; we may change this in the future.
min_ordinal=result.min_ordinal,
max_ordinal=result.max_ordinal,
min_source_pk=result.min_source_pk,
max_source_pk=result.max_source_pk,
min_inserted_pk=result.min_inserted_pk,
max_inserted_pk=result.max_inserted_pk,
inserted_map={k: v[0] for k, v in inserted.items()},
existing_map={k: v[0] for k, v in existing.items()},
overwrite_map={k: v[0] for k, v in overwrite.items()},
inserted_identifiers={k: v[2] for k, v in inserted.items() if v[2] is not None},
)
control_import_chunk_replica.save()

# Extract some write logic into its own internal function, so that we may call it irrespective
# of how we do atomicity: on a per-model (if using multiple dbs) or global (if using a single
# db) basis.
def do_writes(pk_map: PrimaryKeyMap) -> None:
+nonlocal deferred_org_auth_tokens

for model_name, json_data in yield_json_models(content):
if model_name == org_auth_token_model_name:
-nonlocal deferred_org_auth_tokens
deferred_org_auth_tokens = json_data
continue

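
The comprehensions that feed `ControlImportChunkReplica` above lean on the shape of `PrimaryKeyMap.mapping` values, which are `(new_pk, import_kind, slug)` tuples: `v[0]` projects out the new pk and `v[2]` the optional slug. A small sketch with illustrative values:

```python
from sentry.backup.dependencies import ImportKind

# Each partitioned mapping is old_pk -> (new_pk, import_kind, slug).
inserted = {
    1: (10, ImportKind.Inserted, "acme"),
    2: (11, ImportKind.Inserted, None),
}

inserted_map = {k: v[0] for k, v in inserted.items()}
inserted_identifiers = {k: v[2] for k, v in inserted.items() if v[2] is not None}

assert inserted_map == {1: 10, 2: 11}
assert inserted_identifiers == {1: "acme"}
```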
123 changes: 110 additions & 13 deletions src/sentry/services/hybrid_cloud/import_export/impl.py
@@ -11,6 +11,7 @@
from django.core.serializers.base import DeserializationError
from django.db import DatabaseError, IntegrityError, connections, router, transaction
from django.db.models import Q
+from django.forms import model_to_dict
from rest_framework.serializers import ValidationError as DjangoRestFrameworkValidationError

from sentry.backup.dependencies import (
@@ -24,6 +25,7 @@
from sentry.backup.findings import InstanceID
from sentry.backup.helpers import EXCLUDED_APPS, DatetimeSafeDjangoJSONEncoder, Filter
from sentry.backup.scopes import ExportScope
+from sentry.models.importchunk import ControlImportChunk, RegionImportChunk
from sentry.models.user import User
from sentry.models.userpermission import UserPermission
from sentry.models.userrole import UserRoleUser
@@ -71,7 +73,7 @@ def import_by_model(
pk_map: RpcPrimaryKeyMap,
json_data: str,
) -> RpcImportResult:
-import_flags = flags.from_rpc()
+deps = dependencies()
batch_model_name = NormalizedModelName(model_name)
model = get_model(batch_model_name)
if model is None:
@@ -97,6 +99,14 @@
reason="The RPC was called incorrectly, please set an `ImportScope` parameter",
)

+import_flags = flags.from_rpc()
+if import_flags.import_uuid is None:
+return RpcImportError(
+kind=RpcImportErrorKind.MissingImportUUID,
+on=InstanceID(model_name),
+reason="Must specify `import_uuid` when importing",
+)

import_scope = scope.from_rpc()
in_pk_map = pk_map.from_rpc()
filters: List[Filter] = []
@@ -107,12 +117,52 @@
try:
using = router.db_for_write(model)
with transaction.atomic(using=using):
# It's possible that this write has already occurred, and we are simply retrying
# because the response got lost in transit. If so, just re-use that reply. We do
# this in the transaction because, while `import_by_model` is generally called in a
# sequential manner, cases like timeouts or long queues may cause a previous call to
# still be active when the next one is made. Doing this check inside the transaction
# lock ensures that the data is globally accurate and thwarts data races.
found_chunk = (
(
ControlImportChunk
if SiloMode.CONTROL in deps[batch_model_name].silos
else RegionImportChunk
)
.objects.filter(import_uuid=flags.import_uuid, model=model_name)
.first()
)
if found_chunk is not None:
found_data = model_to_dict(found_chunk)
out_pk_map = PrimaryKeyMap()
for old_pk, new_pk in found_data["inserted_map"].items():
identifier = found_data["inserted_identifiers"].get(new_pk, None)
out_pk_map.insert(
batch_model_name, old_pk, new_pk, ImportKind.Inserted, identifier
)
for old_pk, new_pk in found_data["existing_map"].items():
out_pk_map.insert(batch_model_name, old_pk, new_pk, ImportKind.Existing)
for old_pk, new_pk in found_data["overwrite_map"].items():
out_pk_map.insert(batch_model_name, old_pk, new_pk, ImportKind.Overwrite)

return RpcImportOk(
mapped_pks=RpcPrimaryKeyMap.into_rpc(out_pk_map),
min_ordinal=found_data["min_ordinal"],
max_ordinal=found_data["max_ordinal"],
min_source_pk=found_data["min_source_pk"],
max_source_pk=found_data["max_source_pk"],
min_inserted_pk=found_data["min_inserted_pk"],
max_inserted_pk=found_data["max_inserted_pk"],
)

ok_relocation_scopes = import_scope.value
out_pk_map = PrimaryKeyMap()
-max_pk = 0
+min_old_pk = 0
+max_old_pk = 0
+min_inserted_pk: Optional[int] = None
+max_inserted_pk: Optional[int] = None
counter = 0
for deserialized_object in deserialize("json", json_data, use_natural_keys=False):
-counter += 1
model_instance = deserialized_object.object
if model_instance._meta.app_label not in EXCLUDED_APPS or model_instance:
if model_instance.get_possible_relocation_scopes() & ok_relocation_scopes:
@@ -159,6 +209,7 @@ def import_by_model(

# For models that may have circular references to themselves
# (unlikely), keep track of the new pk in the input map as well.
+counter += 1
new_pk, import_kind = written
slug = getattr(model_instance, "slug", None)
in_pk_map.insert(
@@ -167,8 +218,17 @@
out_pk_map.insert(
inst_model_name, old_pk, new_pk, import_kind, slug
)
-if new_pk > max_pk:
-max_pk = new_pk
+
+# Do a little bit of book-keeping for our future `ImportChunk`.
+if min_old_pk == 0:
+min_old_pk = old_pk
+if old_pk > max_old_pk:
+max_old_pk = old_pk
+if import_kind == ImportKind.Inserted:
+if min_inserted_pk is None:
+min_inserted_pk = new_pk
+if max_inserted_pk is None or new_pk > max_inserted_pk:
+max_inserted_pk = new_pk

except DjangoValidationError as e:
errs = {field: error for field, error in e.message_dict.items()}
@@ -187,17 +247,54 @@ def import_by_model(
reason=str(e),
)

-# If we wrote at least one model, make sure to update the sequences too.
-if counter > 0:
-table = model_instance._meta.db_table
-seq = f"{table}_id_seq"
-with connections[using].cursor() as cursor:
-cursor.execute(f"SELECT setval(%s, (SELECT MAX(id) FROM {table}))", [seq])
+# If we wrote at least one model, make sure to write an appropriate `ImportChunk`
+# and update the sequences too.
+if counter > 0:
+table = model_instance._meta.db_table
+seq = f"{table}_id_seq"
+with connections[using].cursor() as cursor:
+cursor.execute(f"SELECT setval(%s, (SELECT MAX(id) FROM {table}))", [seq])

inserted = out_pk_map.partition(
{batch_model_name}, {ImportKind.Inserted}
).mapping[model_name]
existing = out_pk_map.partition(
{batch_model_name}, {ImportKind.Existing}
).mapping[model_name]
overwrite = out_pk_map.partition(
{batch_model_name}, {ImportKind.Overwrite}
).mapping[model_name]
import_chunk_args = {
"import_uuid": flags.import_uuid,
"model": model_name,
# TODO(getsentry/team-ospo#190): The next two fields assume the entire model
# is being imported in a single call; we may change this in the future.
"min_ordinal": 1,
"max_ordinal": counter,
"min_source_pk": min_old_pk,
"max_source_pk": max_old_pk,
"min_inserted_pk": min_inserted_pk,
"max_inserted_pk": max_inserted_pk,
"inserted_map": {k: v[0] for k, v in inserted.items()},
"existing_map": {k: v[0] for k, v in existing.items()},
"overwrite_map": {k: v[0] for k, v in overwrite.items()},
"inserted_identifiers": {
k: v[2] for k, v in inserted.items() if v[2] is not None
},
}
if SiloMode.CONTROL in deps[batch_model_name].silos:
ControlImportChunk(**import_chunk_args).save()
else:
RegionImportChunk(**import_chunk_args).save()

return RpcImportOk(
mapped_pks=RpcPrimaryKeyMap.into_rpc(out_pk_map),
-max_pk=max_pk,
num_imported=counter,
+min_ordinal=1,
+max_ordinal=counter,
+min_source_pk=min_old_pk,
+max_source_pk=max_old_pk,
+min_inserted_pk=min_inserted_pk,
+max_inserted_pk=max_inserted_pk,
)

except DeserializationError:
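
One subtlety in the hunk above: imported rows are written with explicit pks, so the table's Postgres id sequence lags behind the data until the `setval` call bumps it to the current maximum. A standalone sketch of that fix-up (the helper name and its arguments are illustrative, not part of the commit):

```python
from django.db import connections


def reset_pk_sequence(using: str, table: str) -> None:
    # Hypothetical helper mirroring the inline fix-up above: move the table's
    # id sequence to the largest existing id so the next INSERT that omits an
    # explicit pk does not collide with rows written by the import.
    seq = f"{table}_id_seq"
    with connections[using].cursor() as cursor:
        cursor.execute(f"SELECT setval(%s, (SELECT MAX(id) FROM {table}))", [seq])
```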