diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/TestingLeaderCallbackHandler.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/TestingLeaderCallbackHandler.java index bdfd86ff534d2..997128efd5556 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/TestingLeaderCallbackHandler.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/resources/TestingLeaderCallbackHandler.java @@ -18,9 +18,11 @@ package org.apache.flink.kubernetes.kubeclient.resources; -import org.apache.flink.runtime.testutils.CommonTestUtils; +import org.apache.flink.util.Preconditions; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.LinkedBlockingQueue; /** Testing implementation for {@link KubernetesLeaderElector.LeaderCallbackHandler}. */ @@ -60,22 +62,42 @@ public boolean hasLeadership() { } public static String waitUntilNewLeaderAppears() throws Exception { - return sharedQueue.take(); + return retrieveNextEventAsync(sharedQueue).get(); + } + + public static CompletableFuture retrieveNextEventAsync( + BlockingQueue eventQueue) { + return CompletableFuture.supplyAsync( + () -> { + try { + return eventQueue.take(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new CompletionException(e); + } + }); } public void waitForNewLeader() throws Exception { - poll(leaderQueue); + waitForNewLeaderAsync().get(); + } + + public CompletableFuture waitForNewLeaderAsync() { + return waitForNextEvent(leaderQueue); } public void waitForRevokeLeader() throws Exception { - poll(revokeQueue); + waitForRevokeLeaderAsync().get(); } - private void poll(BlockingQueue queue) throws Exception { - CommonTestUtils.waitUntilCondition( - () -> { - final String lockIdentity = queue.take(); - return this.lockIdentity.equals(lockIdentity); - }); + public CompletableFuture waitForRevokeLeaderAsync() { + return waitForNextEvent(revokeQueue); + } + + private CompletableFuture waitForNextEvent(BlockingQueue eventQueue) { + return retrieveNextEventAsync(eventQueue) + .thenAccept( + eventLockIdentity -> + Preconditions.checkState(eventLockIdentity.equals(lockIdentity))); } }