From 6c75023d25385bada5e40563d5d2b3f82bdbaccd Mon Sep 17 00:00:00 2001 From: Jorge Esteban Quilcate Otoya Date: Thu, 10 Aug 2023 15:25:26 +0300 Subject: [PATCH] fixup! feat(e2e): move e2e tests from [1] --- .../internal/RemoteLogMetadataTracker.java | 28 +++++++++++++------ 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/internal/RemoteLogMetadataTracker.java b/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/internal/RemoteLogMetadataTracker.java index a4345f468..176815c5c 100644 --- a/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/internal/RemoteLogMetadataTracker.java +++ b/e2e/src/test/java/io/aiven/kafka/tieredstorage/e2e/internal/RemoteLogMetadataTracker.java @@ -17,6 +17,7 @@ package io.aiven.kafka.tieredstorage.e2e.internal; import java.time.Duration; +import java.util.ArrayList; import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; @@ -122,26 +123,37 @@ private boolean allRemoteSegmentsAreFinished() { } public void waitUntilSegmentsAreDeleted(final List segmentsToBeDeleted) { - // TODO set some max wait time - final Supplier segmentsDeleted = () -> segmentsToBeDeleted.stream() .allMatch(rs -> remoteSegmentStates.get(rs.remoteLogSegmentId()).equals(RemoteLogSegmentState.DELETE_SEGMENT_FINISHED)); ConsumerRecords records; + + var metadataRecords = new ArrayList(); + var timeout = Duration.ofSeconds(30); + + var startAt = System.currentTimeMillis(); + while (!(records = consumer.poll(Duration.ofSeconds(5))).isEmpty() || !segmentsDeleted.get()) { records.forEach(r -> { final RemoteLogMetadata metadata = r.value(); + metadataRecords.add(metadata); if (metadata instanceof RemoteLogSegmentMetadataUpdate) { - final var rlsmu = (RemoteLogSegmentMetadataUpdate) metadata; - remoteSegmentStates.put(rlsmu.remoteLogSegmentId(), rlsmu.state()); - if (rlsmu.state().equals(RemoteLogSegmentState.DELETE_SEGMENT_FINISHED)) { + final var metadataUpdate = (RemoteLogSegmentMetadataUpdate) metadata; + remoteSegmentStates.put(metadataUpdate.remoteLogSegmentId(), metadataUpdate.state()); + if (metadataUpdate.state().equals(RemoteLogSegmentState.DELETE_SEGMENT_FINISHED)) { remoteSegments.get(metadata.topicIdPartition()) - .removeIf(rs -> rs.remoteLogSegmentId().equals(rlsmu.remoteLogSegmentId())); + .removeIf( + segment -> segment.remoteLogSegmentId().equals(metadataUpdate.remoteLogSegmentId())); } - } else { - fail("Unexpected metadata: %s", metadata); } }); + if (System.currentTimeMillis() - startAt > timeout.toMillis()) { + break; + } + } + + if (!segmentsDeleted.get()) { + fail("Fail to receive delete metadata records. Records received: %s", metadataRecords); } } }