Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rptest: fix cloud start kafka offset computation #10262

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions tests/rptest/tests/cloud_storage_timing_stress_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
from rptest.clients.types import TopicSpec
from rptest.tests.partition_movement import PartitionMovementMixin
from ducktape.utils.util import wait_until
from ducktape.mark import matrix, ok_to_fail
from rptest.utils.mode_checks import skip_debug_mode

import concurrent.futures
Expand Down Expand Up @@ -440,7 +439,6 @@ def start_check(check):

self.logger.info(f"All checks completed successfuly")

@ok_to_fail # https://github.com/redpanda-data/redpanda/issues/10241
@cluster(num_nodes=5)
def test_cloud_storage(self):
"""
Expand All @@ -455,7 +453,6 @@ def test_cloud_storage(self):

self.epilogue()

@ok_to_fail # https://github.com/redpanda-data/redpanda/issues/10241
@cluster(
num_nodes=5,
log_allow_list=[r"Error in hydraton loop: .*Connection reset by peer"])
Expand Down
27 changes: 22 additions & 5 deletions tests/rptest/utils/si_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -489,11 +489,28 @@ def kafka_start_offset(manifest) -> Optional[int]:
return None

start_model_offset = manifest['start_offset']
first_segment = min(manifest['segments'].values(),
key=lambda seg: seg['base_offset'])
delta = first_segment['delta_offset']

return start_model_offset - delta
for seg in manifest['segments'].values():
if seg['base_offset'] == start_model_offset:
# Usually, the manifest's 'start_offset' will match the 'base_offset'
# of a 'segment' as retention normally advances the start offset to
# another segment's base offset. This branch covers this case.
delta = seg['delta_offset']
return start_model_offset - delta
elif start_model_offset == seg['committed_offset'] + 1:
# When retention decides to remove all current segments from the cloud
# according to the retention policy, it advances the manifest's start
# offset to `committed_offset + 1` of the latest segment present at the time.
# Since, there's no guarantee that new segments haven't been added in the
# meantime, we look for a match in all segments.
delta = seg['delta_offset_end']
return start_model_offset - delta

assert (
False,
"'start_offset' in manifest is inconsistent with contents of 'segments'."
"'start_offset' should match either the 'base_offset' or 'committed_offset + 1'"
f"of a segment in 'segments': start_offset={start_model_offset}, segments={manifest['segments']}"
)

@staticmethod
def kafka_last_offset(manifest) -> Optional[int]:
Expand Down