ref(backup): Use burst task runner for better tests (#59926)
Instead of using the default task runner, which runs Celery tasks eagerly at
their call sites and causes all sorts of shenanigans with nested
transactions, we switch to the "burst" task runner, which queues tasks and
runs them only when the queue is explicitly drained. This also lets us add
"max retries" test cases, and it helped catch a couple of sneaky bugs.

Closes getsentry/team-ospo#169
Closes getsentry/team-ospo#203
azaslavsky authored Nov 16, 2023
1 parent c50a11a commit 9c867e1
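
To illustrate the testing pattern this commit enables, here is a hedged sketch of a retry test. Only `BurstTaskRunner` and `BustTaskRunnerRetryError` come from the diff below; the task (`do_work`) and the helper it calls (`myapp.tasks.inner_helper`) are hypothetical stand-ins.

# Hypothetical test sketch; `do_work` and `inner_helper` are invented names.
from unittest.mock import patch

from sentry.testutils.helpers.task_runner import BurstTaskRunner, BustTaskRunnerRetryError


def test_task_is_requeued_on_transient_failure():
    with BurstTaskRunner() as burst:
        do_work.apply_async()  # queued by the runner, not run eagerly

        # First execution raises the retry sentinel, which re-queues the
        # task; the second execution succeeds.
        with patch(
            "myapp.tasks.inner_helper",
            side_effect=[BustTaskRunnerRetryError("transient"), None],
        ):
            burst()  # drain the queue outside of any open transaction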
Showing 5 changed files with 260 additions and 92 deletions.
108 changes: 60 additions & 48 deletions src/sentry/services/hybrid_cloud/import_export/impl.py
@@ -263,55 +263,67 @@ def import_by_model(
                     reason=str(e),
                 )
 
-        # If we wrote at least one model, make sure to write an appropriate `ImportChunk`
+        # If the `counter` is at 0, no model instances were actually imported, so we can
+        # return early.
+        if counter == 0:
+            return RpcImportOk(
+                mapped_pks=RpcPrimaryKeyMap.into_rpc(out_pk_map),
+                min_ordinal=None,
+                max_ordinal=None,
+                min_source_pk=None,
+                max_source_pk=None,
+                min_inserted_pk=None,
+                max_inserted_pk=None,
+            )
+
+        # We wrote at least one model, so make sure to write an appropriate `ImportChunk`
         # and update the sequences too.
-        if counter > 0:
-            table = model_instance._meta.db_table
-            seq = f"{table}_id_seq"
-            with connections[using].cursor() as cursor:
-                cursor.execute(f"SELECT setval(%s, (SELECT MAX(id) FROM {table}))", [seq])
-
-            inserted = out_pk_map.partition(
-                {batch_model_name}, {ImportKind.Inserted}
-            ).mapping[model_name]
-            existing = out_pk_map.partition(
-                {batch_model_name}, {ImportKind.Existing}
-            ).mapping[model_name]
-            overwrite = out_pk_map.partition(
-                {batch_model_name}, {ImportKind.Overwrite}
-            ).mapping[model_name]
-            import_chunk_args = {
-                "import_uuid": flags.import_uuid,
-                "model": model_name,
-                # TODO(getsentry/team-ospo#190): The next two fields assume the entire model
-                # is being imported in a single call; we may change this in the future.
-                "min_ordinal": 1,
-                "max_ordinal": counter,
-                "min_source_pk": min_old_pk,
-                "max_source_pk": max_old_pk,
-                "min_inserted_pk": min_inserted_pk,
-                "max_inserted_pk": max_inserted_pk,
-                "inserted_map": {k: v[0] for k, v in inserted.items()},
-                "existing_map": {k: v[0] for k, v in existing.items()},
-                "overwrite_map": {k: v[0] for k, v in overwrite.items()},
-                "inserted_identifiers": {
-                    k: v[2] for k, v in inserted.items() if v[2] is not None
-                },
-            }
-            if import_chunk_type == ControlImportChunk:
-                ControlImportChunk(**import_chunk_args).save()
-            else:
-                RegionImportChunk(**import_chunk_args).save()
-
-            return RpcImportOk(
-                mapped_pks=RpcPrimaryKeyMap.into_rpc(out_pk_map),
-                min_ordinal=1,
-                max_ordinal=counter,
-                min_source_pk=min_old_pk,
-                max_source_pk=max_old_pk,
-                min_inserted_pk=min_inserted_pk,
-                max_inserted_pk=max_inserted_pk,
-            )
+        table = model_instance._meta.db_table
+        seq = f"{table}_id_seq"
+        with connections[using].cursor() as cursor:
+            cursor.execute(f"SELECT setval(%s, (SELECT MAX(id) FROM {table}))", [seq])
+
+        inserted = out_pk_map.partition({batch_model_name}, {ImportKind.Inserted}).mapping[
+            model_name
+        ]
+        existing = out_pk_map.partition({batch_model_name}, {ImportKind.Existing}).mapping[
+            model_name
+        ]
+        overwrite = out_pk_map.partition(
+            {batch_model_name}, {ImportKind.Overwrite}
+        ).mapping[model_name]
+        import_chunk_args = {
+            "import_uuid": flags.import_uuid,
+            "model": model_name,
+            # TODO(getsentry/team-ospo#190): The next two fields assume the entire model is
+            # being imported in a single call; we may change this in the future.
+            "min_ordinal": 1,
+            "max_ordinal": counter,
+            "min_source_pk": min_old_pk,
+            "max_source_pk": max_old_pk,
+            "min_inserted_pk": min_inserted_pk,
+            "max_inserted_pk": max_inserted_pk,
+            "inserted_map": {k: v[0] for k, v in inserted.items()},
+            "existing_map": {k: v[0] for k, v in existing.items()},
+            "overwrite_map": {k: v[0] for k, v in overwrite.items()},
+            "inserted_identifiers": {
+                k: v[2] for k, v in inserted.items() if v[2] is not None
+            },
+        }
+        if import_chunk_type == ControlImportChunk:
+            ControlImportChunk(**import_chunk_args).save()
+        else:
+            RegionImportChunk(**import_chunk_args).save()
+
+        return RpcImportOk(
+            mapped_pks=RpcPrimaryKeyMap.into_rpc(out_pk_map),
+            min_ordinal=1,
+            max_ordinal=counter,
+            min_source_pk=min_old_pk,
+            max_source_pk=max_old_pk,
+            min_inserted_pk=min_inserted_pk,
+            max_inserted_pk=max_inserted_pk,
+        )
 
     except DeserializationError:
         return RpcImportError(
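
The `setval` call in this hunk is the standard Postgres trick for advancing a table's primary-key sequence after rows were inserted with explicit `id`s; without it, the next default-valued insert would reuse an already-taken id. A minimal self-contained sketch of the same technique (the helper name is ours, not the commit's):

# Sketch, assuming a Django project on Postgres with the conventional
# `<table>_id_seq` sequence name; explicit-PK inserts do not advance the
# sequence, so we bump it to MAX(id) by hand.
from django.db import connections


def advance_pk_sequence(using: str, table: str) -> None:
    seq = f"{table}_id_seq"
    with connections[using].cursor() as cursor:
        # setval(seq, MAX(id)) makes the next nextval() return MAX(id) + 1.
        cursor.execute(f"SELECT setval(%s, (SELECT MAX(id) FROM {table}))", [seq])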
13 changes: 6 additions & 7 deletions src/sentry/tasks/relocation.py
@@ -240,19 +240,18 @@ def preprocessing_scan(uuid: str) -> None:
 
     # Decrypt the DEK using Google KMS, and use the decrypted DEK to decrypt the encoded
     # JSON.
-    try:
+    with retry_task_or_fail_relocation(
+        relocation,
+        OrderedTask.PREPROCESSING_SCAN,
+        attempts_left,
+        ERR_PREPROCESSING_DECRYPTION,
+    ):
         decryptor = GCPKMSDecryptor.from_bytes(
             json.dumps(get_default_crypto_key_version()).encode("utf-8")
         )
         plaintext_data_encryption_key = decryptor.decrypt_data_encryption_key(unwrapped)
         fernet = Fernet(plaintext_data_encryption_key)
         json_data = fernet.decrypt(unwrapped.encrypted_json_blob).decode("utf-8")
-    except Exception:
-        return fail_relocation(
-            relocation,
-            OrderedTask.PREPROCESSING_SCAN,
-            ERR_PREPROCESSING_DECRYPTION,
-        )
 
     # Grab usernames and org slugs from the JSON data.
     usernames = []
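
The `retry_task_or_fail_relocation` context manager replaces the old catch-all `try/except`: instead of failing the relocation on the first error, it can surface the error to the task's retry machinery while attempts remain. A hedged sketch of the general shape — not the actual sentry.utils.relocation implementation, which may differ in details such as logging:

# Hedged sketch of the retry-or-fail shape used above; fail_relocation is
# the real helper shown in the src/sentry/utils/relocation.py diff below.
from contextlib import contextmanager


@contextmanager
def retry_task_or_fail_relocation(relocation, task, attempts_left: int, reason: str = ""):
    try:
        yield
    except Exception:
        if attempts_left > 0:
            # Re-raise so the task decorator's retry semantics take over.
            raise
        # Out of attempts: record a terminal failure instead of retrying.
        fail_relocation(relocation, task, reason)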
15 changes: 14 additions & 1 deletion src/sentry/testutils/helpers/task_runner.py
@@ -19,6 +19,16 @@ def TaskRunner():
         settings.CELERY_ALWAYS_EAGER = prev
 
 
+class BustTaskRunnerRetryError(Exception):
+    """
+    An exception that mocks can throw, which will bubble up to tasks run by the `BurstTaskRunner`
+    and cause them to be re-queued rather than failed immediately. Useful for simulating the
+    `@instrumented_task` decorator's retry semantics.
+    """
+
+    pass
+
+
 @contextmanager
 def BurstTaskRunner():
     """
@@ -40,7 +50,10 @@ def work(max_jobs=None):
             self, args, kwargs = job_queue.pop(0)
 
             with patch("celery.app.task.Task.apply_async", apply_async):
-                self(*args, **kwargs)
+                try:
+                    self(*args, **kwargs)
+                except BustTaskRunnerRetryError:
+                    job_queue.append((self, args, kwargs))
 
             jobs += 1
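
Distilled from the two hunks above, the whole burst-runner idea fits in a few lines. This standalone sketch drops the Celery patching and keeps just the queue discipline:

# Standalone sketch (no Celery): jobs are enqueued rather than executed,
# then drained in one "burst"; a job that raises the retry sentinel goes
# back on the queue for a later pass.
class RetrySentinel(Exception):
    pass


def burst(job_queue, max_jobs=None):
    jobs = 0
    while job_queue and (max_jobs is None or jobs < max_jobs):
        fn, args, kwargs = job_queue.pop(0)
        try:
            fn(*args, **kwargs)
        except RetrySentinel:
            job_queue.append((fn, args, kwargs))
        jobs += 1
    # Pass max_jobs in tests that expect retries, or a job that always
    # raises would loop forever.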
5 changes: 5 additions & 0 deletions src/sentry/utils/relocation.py
@@ -373,6 +373,11 @@ def fail_relocation(relocation: Relocation, task: OrderedTask, reason: str = "")
     instead.
     """
 
+    # Another nested exception handler could have already failed this relocation - in this
+    # case, do nothing.
+    if relocation.status == Relocation.Status.FAILURE.value:
+        return
+
     if reason:
         relocation.failure_reason = reason
 
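
Why the guard matters, in miniature: with the burst runner, a task body and an outer handler can both reach `fail_relocation` for the same relocation, and only the first, more specific failure reason should stick. A toy illustration, assuming the first call sets the status to `FAILURE`:

# Toy illustration: the second call is now a no-op, so the specific reason
# recorded first is not clobbered by the generic one.
fail_relocation(relocation, OrderedTask.PREPROCESSING_SCAN, "decryption failed")
fail_relocation(relocation, OrderedTask.PREPROCESSING_SCAN, "task failed")  # no-op

assert relocation.failure_reason == "decryption failed"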