Skip to content

Commit

Permalink
test: delete topic and wait for removal in backup tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jjaakola-aiven committed Sep 18, 2024
1 parent 8fc21ad commit 60bf6af
Showing 1 changed file with 47 additions and 39 deletions.
86 changes: 47 additions & 39 deletions tests/integration/backup/test_v3_backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,23 @@ def _raise(exception: Exception) -> NoReturn:
raise exception


def _delete_topic(admin_client: KafkaAdminClient, topic_name: str) -> None:
try:
admin_client.delete_topic(topic_name)
except UnknownTopicOrPartitionError:
logger.info("No previously existing topic.")
else:
logger.info("Deleted topic from previous run.")

start_time = time.monotonic()
while True:
topics = admin_client.cluster_metadata().get("topics", {})
if topic_name not in topics:
break
if time.monotonic() - start_time > 60:
raise TimeoutError(f"Topic {topic_name} still in cluster metadata topics {topics}")


def test_roundtrip_from_kafka_state(
tmp_path: Path,
new_topic: NewTopic,
Expand Down Expand Up @@ -130,6 +147,7 @@ def test_roundtrip_from_kafka_state(
"karapace_schema_backup",
"get",
"--use-format-v3",
"--verbose",
"--config",
str(config_file),
"--topic",
Expand All @@ -154,14 +172,15 @@ def test_roundtrip_from_kafka_state(
assert metadata_path.exists()

# Delete the source topic.
admin_client.delete_topic(new_topic.topic)
_delete_topic(admin_client, new_topic.topic)

# todo: assert new topic uuid != old topic uuid?
# Execute backup restoration.
subprocess.run(
[
"karapace_schema_backup",
"restore",
"--verbose",
"--config",
str(config_file),
"--topic",
Expand Down Expand Up @@ -230,6 +249,7 @@ def test_roundtrip_empty_topic(
"karapace_schema_backup",
"get",
"--use-format-v3",
"--verbose",
"--config",
str(config_file),
"--topic",
Expand All @@ -249,13 +269,14 @@ def test_roundtrip_empty_topic(
(metadata_path,) = backup_directory.iterdir()

# Delete the source topic.
admin_client.delete_topic(new_topic.topic)
_delete_topic(admin_client, new_topic.topic)

# Execute backup restoration.
subprocess.run(
[
"karapace_schema_backup",
"restore",
"--verbose",
"--config",
str(config_file),
"--topic",
Expand Down Expand Up @@ -293,6 +314,7 @@ def test_errors_when_omitting_replication_factor(config_file: Path) -> None:
[
"karapace_schema_backup",
"get",
"--verbose",
"--use-format-v3",
f"--config={config_file!s}",
"--topic=foo-bar",
Expand All @@ -317,19 +339,15 @@ def test_exits_with_return_code_3_for_data_restoration_error(
)

# Make sure topic doesn't exist beforehand.
try:
admin_client.delete_topic(topic_name)
except UnknownTopicOrPartitionError:
logger.info("No previously existing topic.")
else:
logger.info("Deleted topic from previous run.")
_delete_topic(admin_client, topic_name)

admin_client.new_topic(topic_name)
with pytest.raises(subprocess.CalledProcessError) as er:
subprocess.run(
[
"karapace_schema_backup",
"restore",
"--verbose",
"--config",
str(config_file),
"--topic",
Expand Down Expand Up @@ -357,18 +375,14 @@ def test_roundtrip_from_file(
(data_file,) = metadata_path.parent.glob("*.data")

# Make sure topic doesn't exist beforehand.
try:
admin_client.delete_topic(topic_name)
except UnknownTopicOrPartitionError:
logger.info("No previously existing topic.")
else:
logger.info("Deleted topic from previous run.")
_delete_topic(admin_client, topic_name)

# Execute backup restoration.
subprocess.run(
[
"karapace_schema_backup",
"restore",
"--verbose",
"--config",
str(config_file),
"--topic",
Expand All @@ -387,6 +401,7 @@ def test_roundtrip_from_file(
[
"karapace_schema_backup",
"get",
"--verbose",
"--use-format-v3",
"--config",
str(config_file),
Expand Down Expand Up @@ -450,20 +465,15 @@ def test_roundtrip_from_file_skipping_topic_creation(
(data_file,) = metadata_path.parent.glob("*.data")

# Create topic exactly as it was stored on backup file
try:
admin_client.delete_topic(topic_name)
except UnknownTopicOrPartitionError:
logger.info("No previously existing topic.")
else:
logger.info("Deleted topic from previous run.")

_delete_topic(admin_client, topic_name)
admin_client.new_topic(topic_name)

# Execute backup restoration.
subprocess.run(
[
"karapace_schema_backup",
"restore",
"--verbose",
"--config",
str(config_file),
"--topic",
Expand All @@ -484,6 +494,7 @@ def test_roundtrip_from_file_skipping_topic_creation(
"karapace_schema_backup",
"get",
"--use-format-v3",
"--verbose",
"--config",
str(config_file),
"--topic",
Expand Down Expand Up @@ -542,12 +553,7 @@ def test_backup_restoration_fails_when_topic_does_not_exist_and_skip_creation_is
metadata_path = backup_directory / f"{topic_name}.metadata"

# Make sure topic doesn't exist beforehand.
try:
admin_client.delete_topic(topic_name)
except UnknownTopicOrPartitionError:
logger.info("No previously existing topic.")
else:
logger.info("Deleted topic from previous run.")
_delete_topic(admin_client, topic_name)

config = set_config_defaults(
{
Expand Down Expand Up @@ -595,12 +601,7 @@ def test_backup_restoration_fails_when_producer_send_fails_on_unknown_topic_or_p
metadata_path = backup_directory / f"{topic_name}.metadata"

# Make sure topic doesn't exist beforehand.
try:
admin_client.delete_topic(topic_name)
except UnknownTopicOrPartitionError:
logger.info("No previously existing topic.")
else:
logger.info("Deleted topic from previous run.")
_delete_topic(admin_client, topic_name)

config = set_config_defaults(
{
Expand Down Expand Up @@ -650,12 +651,7 @@ def test_backup_restoration_fails_when_producer_send_fails_on_buffer_error(
metadata_path = backup_directory / f"{topic_name}.metadata"

# Make sure topic doesn't exist beforehand.
try:
admin_client.delete_topic(topic_name)
except UnknownTopicOrPartitionError:
logger.info("No previously existing topic.")
else:
logger.info("Deleted topic from previous run.")
_delete_topic(admin_client, topic_name)

config = set_config_defaults(
{
Expand Down Expand Up @@ -719,6 +715,7 @@ def test_can_inspect_v3(self) -> None:
[
"karapace_schema_backup",
"inspect",
"--verbose",
"--location",
str(metadata_path),
],
Expand Down Expand Up @@ -769,6 +766,7 @@ def test_can_inspect_v3_with_future_checksum_algorithm(self) -> None:
[
"karapace_schema_backup",
"inspect",
"--verbose",
"--location",
str(metadata_path),
],
Expand Down Expand Up @@ -832,6 +830,7 @@ def test_can_inspect_v1(self) -> None:
[
"karapace_schema_backup",
"inspect",
"--verbose",
"--location",
str(backup_path),
],
Expand All @@ -856,6 +855,7 @@ def test_can_verify_file_integrity(self) -> None:
[
"karapace_schema_backup",
"verify",
"--verbose",
f"--location={metadata_path!s}",
"--level=file",
],
Expand Down Expand Up @@ -883,6 +883,7 @@ def test_can_verify_record_integrity(self) -> None:
[
"karapace_schema_backup",
"verify",
"--verbose",
f"--location={metadata_path!s}",
"--level=record",
],
Expand Down Expand Up @@ -924,6 +925,7 @@ def test_can_verify_file_integrity_from_large_topic(
"karapace_schema_backup",
"get",
"--use-format-v3",
"--verbose",
f"--config={config_file!s}",
f"--topic={new_topic.topic!s}",
"--replication-factor=1",
Expand All @@ -938,6 +940,7 @@ def test_can_verify_file_integrity_from_large_topic(
[
"karapace_schema_backup",
"verify",
"--verbose",
f"--location={metadata_path!s}",
"--level=file",
],
Expand Down Expand Up @@ -979,6 +982,7 @@ def test_can_verify_record_integrity_from_large_topic(
"karapace_schema_backup",
"get",
"--use-format-v3",
"--verbose",
f"--config={config_file!s}",
f"--topic={new_topic.topic}",
"--replication-factor=1",
Expand All @@ -993,6 +997,7 @@ def test_can_verify_record_integrity_from_large_topic(
[
"karapace_schema_backup",
"verify",
"--verbose",
f"--location={metadata_path}",
"--level=record",
],
Expand Down Expand Up @@ -1022,6 +1027,7 @@ def test_can_refute_file_integrity(self) -> None:
[
"karapace_schema_backup",
"verify",
"--verbose",
f"--location={metadata_path!s}",
"--level=file",
],
Expand Down Expand Up @@ -1051,6 +1057,7 @@ def test_can_refute_record_integrity(self) -> None:
[
"karapace_schema_backup",
"verify",
"--verbose",
f"--location={metadata_path!s}",
"--level=record",
],
Expand Down Expand Up @@ -1093,6 +1100,7 @@ def test_gives_non_successful_exit_code_for_legacy_backup_format(
[
"karapace_schema_backup",
"verify",
"--verbose",
f"--location={backup_path}",
"--level=file",
],
Expand Down

0 comments on commit 60bf6af

Please sign in to comment.