diff --git a/tests/rptest/tests/cloud_storage_timing_stress_test.py b/tests/rptest/tests/cloud_storage_timing_stress_test.py
index 1388154ce192..5f37ebce1bf7 100644
--- a/tests/rptest/tests/cloud_storage_timing_stress_test.py
+++ b/tests/rptest/tests/cloud_storage_timing_stress_test.py
@@ -17,7 +17,6 @@
 from rptest.clients.types import TopicSpec
 from rptest.tests.partition_movement import PartitionMovementMixin
 from ducktape.utils.util import wait_until
-from ducktape.mark import matrix, ok_to_fail
 from rptest.utils.mode_checks import skip_debug_mode
 
 import concurrent.futures
@@ -440,7 +439,6 @@ def start_check(check):
 
         self.logger.info(f"All checks completed successfuly")
 
-    @ok_to_fail  # https://github.com/redpanda-data/redpanda/issues/10241
     @cluster(num_nodes=5)
     def test_cloud_storage(self):
         """
@@ -455,7 +453,6 @@ def test_cloud_storage(self):
 
         self.epilogue()
 
-    @ok_to_fail  # https://github.com/redpanda-data/redpanda/issues/10241
     @cluster(
         num_nodes=5,
         log_allow_list=[r"Error in hydraton loop: .*Connection reset by peer"])
diff --git a/tests/rptest/utils/si_utils.py b/tests/rptest/utils/si_utils.py
index 16f2219c4b7c..96824621a9c3 100644
--- a/tests/rptest/utils/si_utils.py
+++ b/tests/rptest/utils/si_utils.py
@@ -489,11 +489,30 @@ def kafka_start_offset(manifest) -> Optional[int]:
             return None
 
         start_model_offset = manifest['start_offset']
-        first_segment = min(manifest['segments'].values(),
-                            key=lambda seg: seg['base_offset'])
-        delta = first_segment['delta_offset']
-
-        return start_model_offset - delta
+        for seg in manifest['segments'].values():
+            if seg['base_offset'] == start_model_offset:
+                # Usually, the manifest's 'start_offset' will match the 'base_offset'
+                # of a 'segment' as retention normally advances the start offset to
+                # another segment's base offset. This branch covers this case.
+
+                delta = seg['delta_offset']
+                return start_model_offset - delta
+            elif start_model_offset == seg['committed_offset'] + 1:
+                # When retention decides to remove all current segments from the cloud
+                # according to the retention policy, it advances the manifest's start
+                # offset to `committed_offset + 1` of the latest segment present at the time.
+                # Since there's no guarantee that new segments haven't been added in the
+                # meantime, we look for a match in all segments.
+                delta = seg['delta_offset_end']
+                return start_model_offset - delta
+
+        # Explicit raise rather than `assert (False, "...")`: asserting a
+        # non-empty tuple is always truthy, so that assert would never fire.
+        raise AssertionError(
+            "'start_offset' in manifest is inconsistent with contents of 'segments'. "
+            "'start_offset' should match either the 'base_offset' or 'committed_offset + 1' "
+            f"of a segment in 'segments': start_offset={start_model_offset}, "
+            f"segments={manifest['segments']}")
 
     @staticmethod
     def kafka_last_offset(manifest) -> Optional[int]: