Skip to content

Commit

Permalink
fixup! feat(e2e): move e2e tests from [1]
Browse files Browse the repository at this point in the history
  • Loading branch information
jeqo committed Aug 10, 2023
1 parent a6962e9 commit c59042b
Showing 1 changed file with 20 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.aiven.kafka.tieredstorage.e2e.internal;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -122,26 +123,37 @@ private boolean allRemoteSegmentsAreFinished() {
}

public void waitUntilSegmentsAreDeleted(final List<RemoteSegment> segmentsToBeDeleted) {
// TODO set some max wait time

final Supplier<Boolean> segmentsDeleted = () -> segmentsToBeDeleted.stream()
.allMatch(rs ->
remoteSegmentStates.get(rs.remoteLogSegmentId()).equals(RemoteLogSegmentState.DELETE_SEGMENT_FINISHED));
ConsumerRecords<byte[], RemoteLogMetadata> records;

final var metadataRecords = new ArrayList<RemoteLogMetadata>();
final var timeout = Duration.ofSeconds(30);

final var startAt = System.currentTimeMillis();

while (!(records = consumer.poll(Duration.ofSeconds(5))).isEmpty() || !segmentsDeleted.get()) {
records.forEach(r -> {
final RemoteLogMetadata metadata = r.value();
metadataRecords.add(metadata);
if (metadata instanceof RemoteLogSegmentMetadataUpdate) {
final var rlsmu = (RemoteLogSegmentMetadataUpdate) metadata;
remoteSegmentStates.put(rlsmu.remoteLogSegmentId(), rlsmu.state());
if (rlsmu.state().equals(RemoteLogSegmentState.DELETE_SEGMENT_FINISHED)) {
final var metadataUpdate = (RemoteLogSegmentMetadataUpdate) metadata;
remoteSegmentStates.put(metadataUpdate.remoteLogSegmentId(), metadataUpdate.state());
if (metadataUpdate.state().equals(RemoteLogSegmentState.DELETE_SEGMENT_FINISHED)) {
remoteSegments.get(metadata.topicIdPartition())
.removeIf(rs -> rs.remoteLogSegmentId().equals(rlsmu.remoteLogSegmentId()));
.removeIf(
segment -> segment.remoteLogSegmentId().equals(metadataUpdate.remoteLogSegmentId()));
}
} else {
fail("Unexpected metadata: %s", metadata);
}
});
if (System.currentTimeMillis() - startAt > timeout.toMillis()) {
break;
}
}

if (!segmentsDeleted.get()) {
fail("Fail to receive delete metadata records. Records received: %s", metadataRecords);
}
}
}

0 comments on commit c59042b

Please sign in to comment.