Skip to content

Commit

Permalink
Use of utility test method
Browse files Browse the repository at this point in the history
  • Loading branch information
XComp committed Nov 25, 2024
1 parent 9b4e87e commit da42067
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.util.concurrent.atomic.AtomicBoolean;

import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
import static org.apache.flink.runtime.leaderelection.LeaderElectionTestUtils.assertRunAsLeaderOmitsCallbackDueToLostLeadership;
import static org.assertj.core.api.Assertions.assertThat;

/** Tests for leader election. */
Expand Down Expand Up @@ -121,16 +122,6 @@ private static void assertRunAsLeaderSuccessfully(
assertThat(callbackTriggered).isTrue();
}

private static void assertRunAsLeaderOmitsCallbackDueToLostLeadership(
LeaderElection testInstance, UUID leaderSessionId) {
final AtomicBoolean callbackTriggered = new AtomicBoolean(false);
final CompletableFuture<Void> resultFuture =
testInstance.runAsLeader(leaderSessionId, () -> callbackTriggered.set(true));

assertThatFuture(resultFuture).eventuallyFailsWith(LeadershipLostException.class);
assertThat(callbackTriggered).isFalse();
}

private static final class ManualLeaderContender implements LeaderContender {

private static final UUID NULL_LEADER_SESSION_ID = new UUID(0L, 0L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
import static org.apache.flink.runtime.leaderelection.LeaderElectionTestUtils.assertRunAsLeaderOmitsCallbackDueToLostLeadership;
import static org.apache.flink.runtime.leaderelection.LeaderElectionTestUtils.assertRunAsLeaderSuccessfully;
import static org.assertj.core.api.Assertions.assertThat;

Expand Down Expand Up @@ -88,27 +88,18 @@ void testRunAsLeaderWithContender() throws Exception {
testInstance.startLeaderElection(contender);

assertRunAsLeaderSuccessfully(testInstance, SESSION_ID);
assertRunAsLeaderFailsDueToLostLeadership(testInstance, UUID.randomUUID());
assertRunAsLeaderOmitsCallbackDueToLostLeadership(testInstance, UUID.randomUUID());
}
}

@Test
void testRunAsLeaderWithoutContender() throws Exception {
try (final LeaderElection testInstance = new StandaloneLeaderElection(SESSION_ID)) {
assertRunAsLeaderFailsDueToLostLeadership(testInstance, SESSION_ID);
assertRunAsLeaderFailsDueToLostLeadership(testInstance, UUID.randomUUID());
assertRunAsLeaderOmitsCallbackDueToLostLeadership(testInstance, SESSION_ID);
assertRunAsLeaderOmitsCallbackDueToLostLeadership(testInstance, UUID.randomUUID());
}
}

private static void assertRunAsLeaderFailsDueToLostLeadership(
LeaderElection testInstance, UUID sessionId) {
final AtomicBoolean callbackTriggered = new AtomicBoolean(false);
final CompletableFuture<Void> resultFuture =
testInstance.runAsLeader(sessionId, () -> callbackTriggered.set(true));
assertThatFuture(resultFuture).eventuallyFailsWith(LeadershipLostException.class);
assertThat(callbackTriggered).isFalse();
}

@Test
void testRevokeCallOnClose() throws Exception {
final AtomicBoolean revokeLeadershipCalled = new AtomicBoolean(false);
Expand Down

0 comments on commit da42067

Please sign in to comment.