Skip to content

Commit

Permalink
[hotfix][test] Refactors TestingLeaderCallbackHandler to allow async …
Browse files Browse the repository at this point in the history
…calls

This way, we can use FlinkAssertions#assertThatFuture and use assertion messages instead of comments.
  • Loading branch information
XComp committed Jan 31, 2024
1 parent 5f30bb1 commit ae62b5d
Showing 1 changed file with 32 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@

package org.apache.flink.kubernetes.kubeclient.resources;

import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.util.Preconditions;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.LinkedBlockingQueue;

/** Testing implementation for {@link KubernetesLeaderElector.LeaderCallbackHandler}. */
Expand Down Expand Up @@ -60,22 +62,42 @@ public boolean hasLeadership() {
}

public static String waitUntilNewLeaderAppears() throws Exception {
return sharedQueue.take();
return retrieveNextEventAsync(sharedQueue).get();
}

public static CompletableFuture<String> retrieveNextEventAsync(
BlockingQueue<String> eventQueue) {
return CompletableFuture.supplyAsync(
() -> {
try {
return eventQueue.take();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new CompletionException(e);
}
});
}

public void waitForNewLeader() throws Exception {
poll(leaderQueue);
waitForNewLeaderAsync().get();
}

public CompletableFuture<Void> waitForNewLeaderAsync() {
return waitForNextEvent(leaderQueue);
}

public void waitForRevokeLeader() throws Exception {
poll(revokeQueue);
waitForRevokeLeaderAsync().get();
}

private void poll(BlockingQueue<String> queue) throws Exception {
CommonTestUtils.waitUntilCondition(
() -> {
final String lockIdentity = queue.take();
return this.lockIdentity.equals(lockIdentity);
});
public CompletableFuture<Void> waitForRevokeLeaderAsync() {
return waitForNextEvent(revokeQueue);
}

private CompletableFuture<Void> waitForNextEvent(BlockingQueue<String> eventQueue) {
return retrieveNextEventAsync(eventQueue)
.thenAccept(
eventLockIdentity ->
Preconditions.checkState(eventLockIdentity.equals(lockIdentity)));
}
}

0 comments on commit ae62b5d

Please sign in to comment.