From 5f0c169eacf7f0971a35c92ac035fb1ca2992bde Mon Sep 17 00:00:00 2001 From: Matthias Pohl Date: Tue, 3 Dec 2024 08:53:03 +0100 Subject: [PATCH] [FLINK-36451][runtime] Adds Async suffix to method names --- .../runner/DefaultDispatcherRunner.java | 2 +- .../nonha/embedded/EmbeddedLeaderService.java | 4 +-- .../JobMasterServiceLeadershipRunner.java | 4 +-- .../leaderelection/DefaultLeaderElection.java | 14 ++++---- .../DefaultLeaderElectionService.java | 5 +-- .../leaderelection/LeaderElection.java | 4 +-- .../leaderelection/LeaderElectionService.java | 8 ++--- .../StandaloneLeaderElection.java | 5 +-- .../ResourceManagerServiceImpl.java | 2 +- .../webmonitor/WebMonitorEndpoint.java | 2 +- .../runner/DefaultDispatcherRunnerTest.java | 2 +- .../embedded/EmbeddedHaServicesTest.java | 14 ++++---- .../DefaultLeaderElectionServiceTest.java | 34 +++++++++---------- .../DefaultLeaderElectionTest.java | 18 +++++----- .../leaderelection/LeaderElectionTest.java | 12 +++---- .../StandaloneLeaderElectionTest.java | 12 +++---- .../leaderelection/TestingContender.java | 2 +- .../leaderelection/TestingLeaderElection.java | 4 +-- .../DocumentingDispatcherRestEndpoint.java | 4 +-- 19 files changed, 78 insertions(+), 74 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunner.java index 9326a1ff41ab0e..767fa5efde7c50 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunner.java @@ -178,7 +178,7 @@ private void forwardConfirmLeaderSessionFuture( .getLeaderAddressFuture() .thenCompose( leaderAddress -> - leaderElection.confirmLeadership( + leaderElection.confirmLeadershipAsync( leaderSessionID, leaderAddress))); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderService.java index 0d2085f44e6238..68e08f4e36cc0a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderService.java @@ -467,7 +467,7 @@ public void close() { } @Override - public CompletableFuture confirmLeadership( + public CompletableFuture confirmLeadershipAsync( UUID leaderSessionID, String leaderAddress) { checkNotNull(leaderSessionID); checkNotNull(leaderAddress); @@ -475,7 +475,7 @@ public CompletableFuture confirmLeadership( } @Override - public CompletableFuture hasLeadership(UUID leaderSessionId) { + public CompletableFuture hasLeadershipAsync(UUID leaderSessionId) { return CompletableFuture.completedFuture( isLeader && leaderSessionId.equals(currentLeaderSessionId)); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java index 5a479708ae29ab..4a9a43813f9cb4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java @@ -344,7 +344,7 @@ private void confirmLeadership( LOG.debug( "Confirm leadership {}.", leaderSessionId); - return leaderElection.confirmLeadership( + return leaderElection.confirmLeadershipAsync( leaderSessionId, address); }, "confirming leadership") @@ -487,7 +487,7 @@ private CompletableFuture runIfValidLeader( synchronized (lock) { if (isRunning() && leaderElection != null) { return leaderElection - .hasLeadership(expectedLeaderId) + .hasLeadershipAsync(expectedLeaderId) .thenAccept( hasLeadership -> { synchronized (lock) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElection.java index b74566aab12a03..773a3fccec821c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElection.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElection.java @@ -44,13 +44,14 @@ public void startLeaderElection(LeaderContender contender) throws Exception { } @Override - public CompletableFuture confirmLeadership(UUID leaderSessionID, String leaderAddress) { - return parentService.confirmLeadership(componentId, leaderSessionID, leaderAddress); + public CompletableFuture confirmLeadershipAsync( + UUID leaderSessionID, String leaderAddress) { + return parentService.confirmLeadershipAsync(componentId, leaderSessionID, leaderAddress); } @Override - public CompletableFuture hasLeadership(UUID leaderSessionId) { - return parentService.hasLeadership(componentId, leaderSessionId); + public CompletableFuture hasLeadershipAsync(UUID leaderSessionId) { + return parentService.hasLeadershipAsync(componentId, leaderSessionId); } @Override @@ -82,7 +83,7 @@ abstract static class ParentService { * the {@link LeaderContender} that is associated with the {@code componentId}. The * information is only propagated to the HA backend if the leadership is still acquired. */ - abstract CompletableFuture confirmLeadership( + abstract CompletableFuture confirmLeadershipAsync( String componentId, UUID leaderSessionID, String leaderAddress); /** @@ -92,6 +93,7 @@ abstract CompletableFuture confirmLeadership( * @return {@code true} if the service has leadership with the passed {@code * leaderSessionID} acquired; {@code false} otherwise. */ - abstract CompletableFuture hasLeadership(String componentId, UUID leaderSessionID); + abstract CompletableFuture hasLeadershipAsync( + String componentId, UUID leaderSessionID); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java index 376fbb2b057a6f..ebd1d70dc95f67 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java @@ -313,7 +313,7 @@ public void close() throws Exception { } @Override - protected CompletableFuture confirmLeadership( + protected CompletableFuture confirmLeadershipAsync( String componentId, UUID leaderSessionID, String leaderAddress) { Preconditions.checkArgument(leaderContenderRegistry.containsKey(componentId)); LOG.debug( @@ -365,7 +365,8 @@ protected CompletableFuture confirmLeadership( } @Override - protected CompletableFuture hasLeadership(String componentId, UUID leaderSessionId) { + protected CompletableFuture hasLeadershipAsync( + String componentId, UUID leaderSessionId) { return CompletableFuture.supplyAsync( () -> { synchronized (lock) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElection.java index 5067c708c019e1..090179e8235007 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElection.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElection.java @@ -43,7 +43,7 @@ public interface LeaderElection extends AutoCloseable { * @param leaderSessionID The new leader session ID * @param leaderAddress The address of the new leader */ - CompletableFuture confirmLeadership(UUID leaderSessionID, String leaderAddress); + CompletableFuture confirmLeadershipAsync(UUID leaderSessionID, String leaderAddress); /** * Returns {@code true} if the service's {@link LeaderContender} has the leadership under the @@ -52,7 +52,7 @@ public interface LeaderElection extends AutoCloseable { * @param leaderSessionId identifying the current leader * @return true if the associated {@link LeaderContender} is the leader, otherwise false */ - CompletableFuture hasLeadership(UUID leaderSessionId); + CompletableFuture hasLeadershipAsync(UUID leaderSessionId); /** * Closes the {@code LeaderElection} by deregistering the {@link LeaderContender} from the diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionService.java index 2a3e2a15c37b1f..e1a3136a7070a4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionService.java @@ -28,10 +28,10 @@ * instantiate its own leader election service. * *

Once a contender has been granted leadership he has to confirm the received leader session ID - * by calling the method {@link LeaderElection#confirmLeadership(UUID, String)}. This will notify - * the leader election service, that the contender has accepted the leadership specified and that - * the leader session id as well as the leader address can now be published for leader retrieval - * services. + * by calling the method {@link LeaderElection#confirmLeadershipAsync(UUID, String)}. This will + * notify the leader election service, that the contender has accepted the leadership specified and + * that the leader session id as well as the leader address can now be published for leader + * retrieval services. */ public interface LeaderElectionService { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElection.java index 26d6bc2b0cc8cb..9e5a47a997c99b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElection.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElection.java @@ -59,12 +59,13 @@ public void startLeaderElection(LeaderContender contender) throws Exception { } @Override - public CompletableFuture confirmLeadership(UUID leaderSessionID, String leaderAddress) { + public CompletableFuture confirmLeadershipAsync( + UUID leaderSessionID, String leaderAddress) { return FutureUtils.completedVoidFuture(); } @Override - public CompletableFuture hasLeadership(UUID leaderSessionId) { + public CompletableFuture hasLeadershipAsync(UUID leaderSessionId) { synchronized (lock) { return CompletableFuture.completedFuture( this.leaderContender != null && this.sessionID.equals(leaderSessionId)); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImpl.java index 73dd72b26275f6..29eeba2cb5464b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImpl.java @@ -266,7 +266,7 @@ private void startNewLeaderResourceManager(UUID newLeaderSessionID) throws Excep .thenAcceptAsync( (isStillLeader) -> { if (isStillLeader) { - leaderElection.confirmLeadership( + leaderElection.confirmLeadershipAsync( newLeaderSessionID, newLeaderResourceManager.getAddress()); } }, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java index 63b621614321e4..25958416af7357 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java @@ -1213,7 +1213,7 @@ public void grantLeadership(final UUID leaderSessionID) { "{} was granted leadership with leaderSessionID={}", getRestBaseUrl(), leaderSessionID); - leaderElection.confirmLeadership(leaderSessionID, getRestBaseUrl()); + leaderElection.confirmLeadershipAsync(leaderSessionID, getRestBaseUrl()); } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerTest.java index 5cd6d0183c5740..6eb00786795b56 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerTest.java @@ -273,7 +273,7 @@ public void grantLeadership_oldLeader_doesNotConfirmLeaderSession() throws Excep // complete the confirmation future after losing the leadership contenderConfirmationFuture.complete("leader address"); - assertThat(leaderElection.hasLeadership(leaderSessionId).get(), is(false)); + assertThat(leaderElection.hasLeadershipAsync(leaderSessionId).get(), is(false)); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServicesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServicesTest.java index 9155cf71e39332..c0990bfcba0cd3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServicesTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServicesTest.java @@ -140,7 +140,7 @@ private void runLeaderRetrievalTest( final UUID leaderId = leaderContender.getLeaderSessionFuture().get(); - leaderElection.confirmLeadership(leaderId, ADDRESS).get(); + leaderElection.confirmLeadershipAsync(leaderId, ADDRESS).get(); final LeaderInformation leaderInformation = leaderRetrievalListener.getLeaderInformationFuture().get(); @@ -172,20 +172,20 @@ public void testConcurrentLeadershipOperations() throws Exception { final UUID oldLeaderSessionId = leaderContender.getLeaderSessionFuture().get(); - assertThat(leaderElection.hasLeadership(oldLeaderSessionId).get(), is(true)); + assertThat(leaderElection.hasLeadershipAsync(oldLeaderSessionId).get(), is(true)); embeddedHaServices.getDispatcherLeaderService().revokeLeadership().get(); - assertThat(leaderElection.hasLeadership(oldLeaderSessionId).get(), is(false)); + assertThat(leaderElection.hasLeadershipAsync(oldLeaderSessionId).get(), is(false)); embeddedHaServices.getDispatcherLeaderService().grantLeadership(); final UUID newLeaderSessionId = leaderContender.getLeaderSessionFuture().get(); - assertThat(leaderElection.hasLeadership(newLeaderSessionId).get(), is(true)); + assertThat(leaderElection.hasLeadershipAsync(newLeaderSessionId).get(), is(true)); - leaderElection.confirmLeadership(oldLeaderSessionId, ADDRESS).get(); - leaderElection.confirmLeadership(newLeaderSessionId, ADDRESS).get(); + leaderElection.confirmLeadershipAsync(oldLeaderSessionId, ADDRESS).get(); + leaderElection.confirmLeadershipAsync(newLeaderSessionId, ADDRESS).get(); - assertThat(leaderElection.hasLeadership(newLeaderSessionId).get(), is(true)); + assertThat(leaderElection.hasLeadershipAsync(newLeaderSessionId).get(), is(true)); leaderContender.tryRethrowException(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java index f5e0a4d6f6efbc..5b3f8fda23b9be 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java @@ -703,7 +703,7 @@ void testAllLeaderInformationChangeEventWithUnknownComponentId() throws Exceptio } @Test - void testHasLeadershipWithLeadershipButNoGrantEventProcessed() throws Exception { + void testHasLeadershipAsyncWithLeadershipButNoGrantEventProcessed() throws Exception { new Context() { { runTestWithManuallyTriggeredEvents( @@ -714,10 +714,10 @@ void testHasLeadershipWithLeadershipButNoGrantEventProcessed() throws Exception applyToBothContenderContexts( ctx -> { final CompletableFuture validSessionFuture = - leaderElectionService.hasLeadership( + leaderElectionService.hasLeadershipAsync( ctx.componentId, expectedSessionID); final CompletableFuture invalidSessionFuture = - leaderElectionService.hasLeadership( + leaderElectionService.hasLeadershipAsync( ctx.componentId, UUID.randomUUID()); executorService.triggerAll(); @@ -734,7 +734,7 @@ void testHasLeadershipWithLeadershipButNoGrantEventProcessed() throws Exception } @Test - void testHasLeadershipWithLeadershipAndGrantEventProcessed() throws Exception { + void testHasLeadershipAsyncWithLeadershipAndGrantEventProcessed() throws Exception { new Context() { { runTestWithManuallyTriggeredEvents( @@ -753,10 +753,10 @@ void testHasLeadershipWithLeadershipAndGrantEventProcessed() throws Exception { .isEqualTo(expectedSessionID); final CompletableFuture validSessionFuture = - leaderElectionService.hasLeadership( + leaderElectionService.hasLeadershipAsync( ctx.componentId, expectedSessionID); final CompletableFuture invalidSessionFuture = - leaderElectionService.hasLeadership( + leaderElectionService.hasLeadershipAsync( ctx.componentId, UUID.randomUUID()); executorService.triggerAll(); @@ -773,7 +773,7 @@ void testHasLeadershipWithLeadershipAndGrantEventProcessed() throws Exception { } @Test - void testHasLeadershipWithLeadershipLostButNoRevokeEventProcessed() throws Exception { + void testHasLeadershipAsyncWithLeadershipLostButNoRevokeEventProcessed() throws Exception { new Context() { { runTestWithManuallyTriggeredEvents( @@ -787,10 +787,10 @@ void testHasLeadershipWithLeadershipLostButNoRevokeEventProcessed() throws Excep applyToBothContenderContexts( ctx -> { final CompletableFuture validSessionFuture = - leaderElectionService.hasLeadership( + leaderElectionService.hasLeadershipAsync( ctx.componentId, expectedSessionID); final CompletableFuture invalidSessionFuture = - leaderElectionService.hasLeadership( + leaderElectionService.hasLeadershipAsync( ctx.componentId, UUID.randomUUID()); executorService.triggerAll(); @@ -814,7 +814,7 @@ void testHasLeadershipWithLeadershipLostButNoRevokeEventProcessed() throws Excep } @Test - void testHasLeadershipWithLeadershipLostAndRevokeEventProcessed() throws Exception { + void testHasLeadershipAsyncWithLeadershipLostAndRevokeEventProcessed() throws Exception { new Context() { { runTestWithSynchronousEventHandling( @@ -826,12 +826,12 @@ void testHasLeadershipWithLeadershipLostAndRevokeEventProcessed() throws Excepti applyToBothContenderContexts( ctx -> { assertThatFuture( - leaderElectionService.hasLeadership( + leaderElectionService.hasLeadershipAsync( ctx.componentId, expectedSessionID)) .eventuallySucceeds() .isEqualTo(false); assertThatFuture( - leaderElectionService.hasLeadership( + leaderElectionService.hasLeadershipAsync( ctx.componentId, UUID.randomUUID())) .eventuallySucceeds() .isEqualTo(false); @@ -842,7 +842,7 @@ void testHasLeadershipWithLeadershipLostAndRevokeEventProcessed() throws Excepti } @Test - void testHasLeadershipAfterLeaderElectionClose() throws Exception { + void testHasLeadershipAsyncAfterLeaderElectionClose() throws Exception { new Context() { { runTestWithSynchronousEventHandling( @@ -855,7 +855,7 @@ void testHasLeadershipAfterLeaderElectionClose() throws Exception { ctx.leaderElection.close(); assertThatFuture( - leaderElectionService.hasLeadership( + leaderElectionService.hasLeadershipAsync( ctx.componentId, expectedSessionID)) .eventuallySucceeds() .isEqualTo(false); @@ -1061,7 +1061,7 @@ void testOldConfirmLeaderInformationWhileHavingNewLeadership() throws Exception .hasValue(expectedLeaderInformation); // Old confirm call should be ignored. - ctx.leaderElection.confirmLeadership( + ctx.leaderElection.confirmLeadershipAsync( UUID.randomUUID(), ctx.address); assertThat( leaderElectionService.getLeaderSessionID( @@ -1092,7 +1092,7 @@ void testOldConfirmationWhileHavingLeadershipLost() throws Exception { applyToBothContenderContexts( ctx -> { // Old confirm call should be ignored. - ctx.leaderElection.confirmLeadership( + ctx.leaderElection.confirmLeadershipAsync( currentLeaderSessionId, ctx.address); assertThat( @@ -1316,7 +1316,7 @@ void testNestedDeadlockInLeadershipConfirmation() throws Exception { revocationFuture = CompletableFuture.runAsync(testInstance::onRevokeLeadership); contenderLockAcquireLatch.await(); confirmLeadershipFuture = - leaderElection.confirmLeadership(leaderSessionId, "random-address"); + leaderElection.confirmLeadershipAsync(leaderSessionId, "random-address"); } assertThatFuture(revocationFuture).eventuallySucceeds(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionTest.java index 2c23c903527f46..11bf45dbfd5720 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionTest.java @@ -114,7 +114,7 @@ void testLeaderConfirmation() throws Exception { final UUID expectedLeaderSessionID = UUID.randomUUID(); final String expectedAddress = "random-address"; - testInstance.confirmLeadership(expectedLeaderSessionID, expectedAddress); + testInstance.confirmLeadershipAsync(expectedLeaderSessionID, expectedAddress); assertThat(componentIdRef).hasValue(DEFAULT_TEST_COMPONENT_ID); assertThat(leaderSessionIDRef).hasValue(expectedLeaderSessionID); @@ -158,16 +158,16 @@ void testCloseWithoutStart() throws Exception { } @Test - void testHasLeadershipTrue() throws Exception { - testHasLeadership(true); + void testHasLeadershipAsyncTrue() throws Exception { + testHasLeadershipAsync(true); } @Test - void testHasLeadershipFalse() throws Exception { - testHasLeadership(false); + void testHasLeadershipAsyncFalse() throws Exception { + testHasLeadershipAsync(false); } - private void testHasLeadership(boolean expectedReturnValue) throws Exception { + private void testHasLeadershipAsync(boolean expectedReturnValue) throws Exception { final AtomicReference componentIdRef = new AtomicReference<>(); final AtomicReference leaderSessionIDRef = new AtomicReference<>(); final DefaultLeaderElection.ParentService parentService = @@ -183,7 +183,7 @@ private void testHasLeadership(boolean expectedReturnValue) throws Exception { new DefaultLeaderElection(parentService, DEFAULT_TEST_COMPONENT_ID)) { final UUID expectedLeaderSessionID = UUID.randomUUID(); - assertThatFuture(testInstance.hasLeadership(expectedLeaderSessionID)) + assertThatFuture(testInstance.hasLeadershipAsync(expectedLeaderSessionID)) .eventuallySucceeds() .isEqualTo(expectedReturnValue); assertThat(componentIdRef).hasValue(DEFAULT_TEST_COMPONENT_ID); @@ -225,13 +225,13 @@ protected void remove(String componentId) { } @Override - protected CompletableFuture confirmLeadership( + protected CompletableFuture confirmLeadershipAsync( String componentId, UUID leaderSessionID, String leaderAddress) { return confirmLeadershipConsumer.apply(componentId, leaderSessionID, leaderAddress); } @Override - protected CompletableFuture hasLeadership( + protected CompletableFuture hasLeadershipAsync( String componentId, UUID leaderSessionId) { return hasLeadershipFunction.apply(componentId, leaderSessionId); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java index e91ed4b2e78801..b11c639b7101d1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java @@ -81,7 +81,7 @@ void teardown() throws Exception { } @TestTemplate - void testHasLeadership() throws Exception { + void testHasLeadershipAsync() throws Exception { final ManualLeaderContender manualLeaderContender = new ManualLeaderContender(); try { @@ -90,23 +90,23 @@ void testHasLeadership() throws Exception { final UUID leaderSessionId = manualLeaderContender.waitForLeaderSessionId(); - assertThatFuture(leaderElection.hasLeadership(leaderSessionId)) + assertThatFuture(leaderElection.hasLeadershipAsync(leaderSessionId)) .eventuallySucceeds() .isEqualTo(true); - assertThatFuture(leaderElection.hasLeadership(UUID.randomUUID())) + assertThatFuture(leaderElection.hasLeadershipAsync(UUID.randomUUID())) .eventuallySucceeds() .isEqualTo(false); - assertThatFuture(leaderElection.confirmLeadership(leaderSessionId, "foobar")) + assertThatFuture(leaderElection.confirmLeadershipAsync(leaderSessionId, "foobar")) .eventuallySucceeds(); - assertThatFuture(leaderElection.hasLeadership(leaderSessionId)) + assertThatFuture(leaderElection.hasLeadershipAsync(leaderSessionId)) .eventuallySucceeds() .isEqualTo(true); leaderElection.close(); - assertThatFuture(leaderElection.hasLeadership(leaderSessionId)) + assertThatFuture(leaderElection.hasLeadershipAsync(leaderSessionId)) .eventuallySucceeds() .isEqualTo(false); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionTest.java index 59b6aace43ea97..2869f1ca031d3a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionTest.java @@ -80,32 +80,32 @@ void testStartLeaderElection() throws Exception { } @Test - void testHasLeadershipWithContender() throws Exception { + void testHasLeadershipAsyncWithContender() throws Exception { final TestingGenericLeaderContender contender = TestingGenericLeaderContender.newBuilder().build(); try (final LeaderElection testInstance = new StandaloneLeaderElection(SESSION_ID)) { testInstance.startLeaderElection(contender); - assertThatFuture(testInstance.hasLeadership(SESSION_ID)) + assertThatFuture(testInstance.hasLeadershipAsync(SESSION_ID)) .eventuallySucceeds() .isEqualTo(true); final UUID differentSessionID = UUID.randomUUID(); - assertThatFuture(testInstance.hasLeadership(differentSessionID)) + assertThatFuture(testInstance.hasLeadershipAsync(differentSessionID)) .eventuallySucceeds() .isEqualTo(false); } } @Test - void testHasLeadershipWithoutContender() throws Exception { + void testHasLeadershipAsyncWithoutContender() throws Exception { try (final LeaderElection testInstance = new StandaloneLeaderElection(SESSION_ID)) { - assertThatFuture(testInstance.hasLeadership(SESSION_ID)) + assertThatFuture(testInstance.hasLeadershipAsync(SESSION_ID)) .eventuallySucceeds() .isEqualTo(false); final UUID differentSessionID = UUID.randomUUID(); - assertThatFuture(testInstance.hasLeadership(differentSessionID)) + assertThatFuture(testInstance.hasLeadershipAsync(differentSessionID)) .eventuallySucceeds() .isEqualTo(false); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingContender.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingContender.java index 3352c1d2561586..95437d0c42b1e3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingContender.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingContender.java @@ -51,7 +51,7 @@ public void grantLeadership(UUID leaderSessionID) { this.leaderSessionID = leaderSessionID; leaderElection - .confirmLeadership(leaderSessionID, address) + .confirmLeadershipAsync(leaderSessionID, address) .thenRun( () -> leaderEventQueue.offer( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElection.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElection.java index 69af81d4be001b..a0ce592a193a8c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElection.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElection.java @@ -66,7 +66,7 @@ public synchronized void startLeaderElection(LeaderContender contender) throws E } @Override - public synchronized CompletableFuture confirmLeadership( + public synchronized CompletableFuture confirmLeadershipAsync( UUID leaderSessionID, String leaderAddress) { if (confirmationFuture != null && !confirmationFuture.isDone()) { confirmationFuture.complete(LeaderInformation.known(leaderSessionID, leaderAddress)); @@ -76,7 +76,7 @@ public synchronized CompletableFuture confirmLeadership( } @Override - public synchronized CompletableFuture hasLeadership(UUID leaderSessionId) { + public synchronized CompletableFuture hasLeadershipAsync(UUID leaderSessionId) { return CompletableFuture.completedFuture( hasLeadership() && leaderSessionId.equals(issuedLeaderSessionId)); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/DocumentingDispatcherRestEndpoint.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/DocumentingDispatcherRestEndpoint.java index 59288ba37e9340..4b3a544001b7b8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/DocumentingDispatcherRestEndpoint.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/DocumentingDispatcherRestEndpoint.java @@ -94,13 +94,13 @@ private enum NoOpLeaderElection implements LeaderElection { public void startLeaderElection(LeaderContender contender) {} @Override - public CompletableFuture confirmLeadership( + public CompletableFuture confirmLeadershipAsync( UUID leaderSessionID, String leaderAddress) { return FutureUtils.completedVoidFuture(); } @Override - public CompletableFuture hasLeadership(UUID leaderSessionId) { + public CompletableFuture hasLeadershipAsync(UUID leaderSessionId) { return CompletableFuture.completedFuture(false); }