Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

e2e_topic_recovery_test: have s3 check the delta offset #8856

Merged
merged 1 commit into from
Feb 15, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 15 additions & 13 deletions tests/rptest/tests/e2e_topic_recovery_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,14 +128,20 @@ def _s3_has_all_data(self, num_messages):
self._bucket, o.key)
manifest = json.loads(data)
last_upl_offset = manifest['last_offset']
self.logger.info(
f"Found manifest at {o.key}, last_offset is {last_upl_offset}"
)
# We have one partition so this invariant holds
# it has to be changed when the number of partitions
# will get larger. This will also require different
# S3 check.
return last_upl_offset >= num_messages
if 'segments' in manifest:
segments = manifest['segments']
last_segment = segments[list(segments)[-1]]
last_delta_offset = last_segment['delta_offset_end']
# We have one partition so this invariant holds it has to
# be changed when the number of partitions will get larger.
# This will also require different S3 check.
self.logger.info(
f"Found manifest at {o.key}, last_offset is {last_upl_offset}, last delta_offset is {last_delta_offset}"
)
return (last_upl_offset - last_delta_offset +
1) >= num_messages
else:
return False
return False

@cluster(num_nodes=4)
Expand Down Expand Up @@ -182,6 +188,7 @@ def test_restore_with_config_batches(self, num_messages,
# we just care for the consumer to receive some data
self._consumer.wait(timeout_sec=100)

@skip_debug_mode
@cluster(num_nodes=4)
@matrix(message_size=[5000],
num_messages=[100000],
Expand All @@ -194,11 +201,6 @@ def test_restore(self, message_size, num_messages, recovery_overrides,
"""Write some data. Remove local data then restore
the cluster."""

if self.debug_mode:
self.logger.info(
"Skipping test in debug mode (requires release build)")
return

self.init_producer(message_size, num_messages)
self._producer.start(clean=False)

Expand Down