Skip to content

Commit

Permalink
chore(build): add e2e to ci workflow
Browse files Browse the repository at this point in the history
  • Loading branch information
jeqo committed Aug 10, 2023
1 parent fe879c4 commit 8498791
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 10 deletions.
6 changes: 5 additions & 1 deletion .github/workflows/main_push_and_pull_request_workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,8 @@ jobs:
java-version: ${{ matrix.java-version }}
distribution: temurin
- name: Build with Gradle
run: ./gradlew build distTar
run: ./gradlew build distTar -x e2e:test
- name: Build docker image
run: make docker_image
- name: Run E2E tests
run: ./gradlew e2e:test
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ clean:
./gradlew clean

build/distributions/tiered-storage-for-apache-kafka-$(VERSION).tgz:
./gradlew build distTar
./gradlew build distTar -x e2e:test

.PHONY: docker_image
docker_image: build/distributions/tiered-storage-for-apache-kafka-$(VERSION).tgz
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.aiven.kafka.tieredstorage.e2e.internal;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -122,26 +123,37 @@ private boolean allRemoteSegmentsAreFinished() {
}

public void waitUntilSegmentsAreDeleted(final List<RemoteSegment> segmentsToBeDeleted) {
// TODO set some max wait time

final Supplier<Boolean> segmentsDeleted = () -> segmentsToBeDeleted.stream()
.allMatch(rs ->
remoteSegmentStates.get(rs.remoteLogSegmentId()).equals(RemoteLogSegmentState.DELETE_SEGMENT_FINISHED));
ConsumerRecords<byte[], RemoteLogMetadata> records;

final var metadataRecords = new ArrayList<RemoteLogMetadata>();
final var timeout = Duration.ofSeconds(30);

final var startAt = System.currentTimeMillis();

while (!(records = consumer.poll(Duration.ofSeconds(5))).isEmpty() || !segmentsDeleted.get()) {
records.forEach(r -> {
final RemoteLogMetadata metadata = r.value();
metadataRecords.add(metadata);
if (metadata instanceof RemoteLogSegmentMetadataUpdate) {
final var rlsmu = (RemoteLogSegmentMetadataUpdate) metadata;
remoteSegmentStates.put(rlsmu.remoteLogSegmentId(), rlsmu.state());
if (rlsmu.state().equals(RemoteLogSegmentState.DELETE_SEGMENT_FINISHED)) {
final var metadataUpdate = (RemoteLogSegmentMetadataUpdate) metadata;
remoteSegmentStates.put(metadataUpdate.remoteLogSegmentId(), metadataUpdate.state());
if (metadataUpdate.state().equals(RemoteLogSegmentState.DELETE_SEGMENT_FINISHED)) {
remoteSegments.get(metadata.topicIdPartition())
.removeIf(rs -> rs.remoteLogSegmentId().equals(rlsmu.remoteLogSegmentId()));
.removeIf(
segment -> segment.remoteLogSegmentId().equals(metadataUpdate.remoteLogSegmentId()));
}
} else {
fail("Unexpected metadata: %s", metadata);
}
});
if (System.currentTimeMillis() - startAt > timeout.toMillis()) {
break;
}
}

if (!segmentsDeleted.get()) {
fail("Fail to receive delete metadata records. Records received: %s", metadataRecords);
}
}
}

0 comments on commit 8498791

Please sign in to comment.