From 6f75d6e713b95ce3177bf938ca06e3167f15b73f Mon Sep 17 00:00:00 2001 From: Matthias Pohl Date: Thu, 28 Nov 2024 15:01:45 +0100 Subject: [PATCH] [FLINK-36451][runtime] Makes hasLeadership and confirmLeadership work asynchronously A test case is added that illustrates the concurrent lock acquisition problem in the DefaultLeaderElectionService --- .../runner/DefaultDispatcherRunner.java | 4 +- .../nonha/embedded/EmbeddedLeaderService.java | 18 +- .../JobMasterServiceLeadershipRunner.java | 68 ++++--- .../leaderelection/DefaultLeaderElection.java | 15 +- .../DefaultLeaderElectionService.java | 110 ++++++----- .../leaderelection/LeaderElection.java | 5 +- .../leaderelection/LeaderElectionService.java | 8 +- .../StandaloneLeaderElection.java | 12 +- .../ResourceManagerServiceImpl.java | 2 +- .../webmonitor/WebMonitorEndpoint.java | 2 +- .../runner/DefaultDispatcherRunnerTest.java | 2 +- .../embedded/EmbeddedHaServicesTest.java | 14 +- .../DefaultLeaderElectionServiceTest.java | 174 +++++++++++++----- .../DefaultLeaderElectionTest.java | 49 +++-- .../leaderelection/LeaderElectionTest.java | 22 ++- .../StandaloneLeaderElectionTest.java | 21 ++- .../leaderelection/TestingContender.java | 9 +- .../TestingGenericLeaderContender.java | 22 ++- .../leaderelection/TestingLeaderElection.java | 11 +- .../DocumentingDispatcherRestEndpoint.java | 10 +- 20 files changed, 385 insertions(+), 193 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunner.java index ce6579a67b956..767fa5efde7c5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunner.java @@ -176,9 +176,9 @@ private void forwardConfirmLeaderSessionFuture( FutureUtils.assertNoException( newDispatcherLeaderProcess .getLeaderAddressFuture() - .thenAccept( + .thenCompose( leaderAddress -> - leaderElection.confirmLeadership( + leaderElection.confirmLeadershipAsync( leaderSessionID, leaderAddress))); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderService.java index 189189c2bc286..68e08f4e36cc0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedLeaderService.java @@ -244,14 +244,14 @@ private void removeContender(EmbeddedLeaderElection embeddedLeaderElection) { } /** Callback from leader contenders when they confirm a leader grant. */ - private void confirmLeader( + private CompletableFuture confirmLeader( final EmbeddedLeaderElection embeddedLeaderElection, final UUID leaderSessionId, final String leaderAddress) { synchronized (lock) { // if the leader election was shut down in the meantime, ignore this confirmation if (!embeddedLeaderElection.running || shutdown) { - return; + return FutureUtils.completedVoidFuture(); } try { @@ -269,7 +269,7 @@ private void confirmLeader( currentLeaderProposed = null; // notify all listeners - notifyAllListeners(leaderAddress, leaderSessionId); + return notifyAllListeners(leaderAddress, leaderSessionId); } else { LOG.debug( "Received confirmation of leadership for a stale leadership grant. Ignoring."); @@ -278,6 +278,8 @@ private void confirmLeader( fatalError(t); } } + + return FutureUtils.completedVoidFuture(); } private CompletableFuture notifyAllListeners(String address, UUID leaderSessionId) { @@ -465,15 +467,17 @@ public void close() { } @Override - public void confirmLeadership(UUID leaderSessionID, String leaderAddress) { + public CompletableFuture confirmLeadershipAsync( + UUID leaderSessionID, String leaderAddress) { checkNotNull(leaderSessionID); checkNotNull(leaderAddress); - confirmLeader(this, leaderSessionID, leaderAddress); + return confirmLeader(this, leaderSessionID, leaderAddress); } @Override - public boolean hasLeadership(UUID leaderSessionId) { - return isLeader && leaderSessionId.equals(currentLeaderSessionId); + public CompletableFuture hasLeadershipAsync(UUID leaderSessionId) { + return CompletableFuture.completedFuture( + isLeader && leaderSessionId.equals(currentLeaderSessionId)); } void shutdown(Exception cause) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java index 160830beaa456..bd0c5e53a69d1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java @@ -257,28 +257,34 @@ private void startJobMasterServiceProcessAsync(UUID leaderSessionId) { unused -> jobResultStore .hasJobResultEntryAsync(getJobID()) - .thenAccept( + .thenCompose( hasJobResult -> { if (hasJobResult) { - handleJobAlreadyDoneIfValidLeader( + return handleJobAlreadyDoneIfValidLeader( leaderSessionId); } else { - createNewJobMasterServiceProcessIfValidLeader( + return createNewJobMasterServiceProcessIfValidLeader( leaderSessionId); } })); handleAsyncOperationError(sequentialOperation, "Could not start the job manager."); } - private void handleJobAlreadyDoneIfValidLeader(UUID leaderSessionId) { - runIfValidLeader( + private CompletableFuture handleJobAlreadyDoneIfValidLeader(UUID leaderSessionId) { + return runIfValidLeader( leaderSessionId, () -> jobAlreadyDone(leaderSessionId), "check completed job"); } - private void createNewJobMasterServiceProcessIfValidLeader(UUID leaderSessionId) { - runIfValidLeader( + private CompletableFuture createNewJobMasterServiceProcessIfValidLeader( + UUID leaderSessionId) { + return runIfValidLeader( leaderSessionId, () -> + // the heavy lifting of the JobMasterServiceProcess instantiation is still + // done asynchronously (see + // DefaultJobMasterServiceFactory#createJobMasterService executing the logic + // on the leaderOperation thread in the DefaultLeaderElectionService should + // be, therefore, fine ThrowingRunnable.unchecked( () -> createNewJobMasterServiceProcess(leaderSessionId)) .run(), @@ -336,15 +342,18 @@ private void createNewJobMasterServiceProcess(UUID leaderSessionId) { private void confirmLeadership( UUID leaderSessionId, CompletableFuture leaderAddressFuture) { FutureUtils.assertNoException( - leaderAddressFuture.thenAccept( + leaderAddressFuture.thenCompose( address -> - runIfStateRunning( - () -> { - LOG.debug("Confirm leadership {}.", leaderSessionId); - leaderElection.confirmLeadership( - leaderSessionId, address); - }, - "confirming leadership"))); + callIfRunning( + () -> { + LOG.debug( + "Confirm leadership {}.", + leaderSessionId); + return leaderElection.confirmLeadershipAsync( + leaderSessionId, address); + }, + "confirming leadership") + .orElse(FutureUtils.completedVoidFuture()))); } private void forwardResultFuture( @@ -478,20 +487,32 @@ private boolean isRunning() { return state == State.RUNNING; } - private void runIfValidLeader( + private CompletableFuture runIfValidLeader( UUID expectedLeaderId, Runnable action, Runnable noLeaderFallback) { synchronized (lock) { - if (isValidLeader(expectedLeaderId)) { - action.run(); + if (isRunning() && leaderElection != null) { + return leaderElection + .hasLeadershipAsync(expectedLeaderId) + .thenAccept( + hasLeadership -> { + synchronized (lock) { + if (isRunning() && hasLeadership) { + action.run(); + } else { + noLeaderFallback.run(); + } + } + }); } else { noLeaderFallback.run(); + return FutureUtils.completedVoidFuture(); } } } - private void runIfValidLeader( + private CompletableFuture runIfValidLeader( UUID expectedLeaderId, Runnable action, String noLeaderFallbackCommandDescription) { - runIfValidLeader( + return runIfValidLeader( expectedLeaderId, action, () -> @@ -499,13 +520,6 @@ private void runIfValidLeader( noLeaderFallbackCommandDescription, expectedLeaderId)); } - @GuardedBy("lock") - private boolean isValidLeader(UUID expectedLeaderId) { - return isRunning() - && leaderElection != null - && leaderElection.hasLeadership(expectedLeaderId); - } - private void forwardIfValidLeader( UUID expectedLeaderId, CompletableFuture source, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElection.java index 06d6b62fb65f3..773a3fccec821 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElection.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElection.java @@ -21,6 +21,7 @@ import org.apache.flink.util.Preconditions; import java.util.UUID; +import java.util.concurrent.CompletableFuture; /** * {@code DefaultLeaderElection} implements the {@link LeaderElection} based on the {@link @@ -43,13 +44,14 @@ public void startLeaderElection(LeaderContender contender) throws Exception { } @Override - public void confirmLeadership(UUID leaderSessionID, String leaderAddress) { - parentService.confirmLeadership(componentId, leaderSessionID, leaderAddress); + public CompletableFuture confirmLeadershipAsync( + UUID leaderSessionID, String leaderAddress) { + return parentService.confirmLeadershipAsync(componentId, leaderSessionID, leaderAddress); } @Override - public boolean hasLeadership(UUID leaderSessionId) { - return parentService.hasLeadership(componentId, leaderSessionId); + public CompletableFuture hasLeadershipAsync(UUID leaderSessionId) { + return parentService.hasLeadershipAsync(componentId, leaderSessionId); } @Override @@ -81,7 +83,7 @@ abstract static class ParentService { * the {@link LeaderContender} that is associated with the {@code componentId}. The * information is only propagated to the HA backend if the leadership is still acquired. */ - abstract void confirmLeadership( + abstract CompletableFuture confirmLeadershipAsync( String componentId, UUID leaderSessionID, String leaderAddress); /** @@ -91,6 +93,7 @@ abstract void confirmLeadership( * @return {@code true} if the service has leadership with the passed {@code * leaderSessionID} acquired; {@code false} otherwise. */ - abstract boolean hasLeadership(String componentId, UUID leaderSessionID); + abstract CompletableFuture hasLeadershipAsync( + String componentId, UUID leaderSessionID); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java index 83acee3aa1804..ebd1d70dc95f6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionService.java @@ -313,7 +313,7 @@ public void close() throws Exception { } @Override - protected void confirmLeadership( + protected CompletableFuture confirmLeadershipAsync( String componentId, UUID leaderSessionID, String leaderAddress) { Preconditions.checkArgument(leaderContenderRegistry.containsKey(componentId)); LOG.debug( @@ -324,59 +324,73 @@ protected void confirmLeadership( checkNotNull(leaderSessionID); - synchronized (lock) { - if (hasLeadership(componentId, leaderSessionID)) { - Preconditions.checkState( - leaderElectionDriver != null, - "The leadership check should only return true if a driver is instantiated."); - Preconditions.checkState( - !confirmedLeaderInformation.hasLeaderInformation(componentId), - "No confirmation should have happened, yet."); - - final LeaderInformation newConfirmedLeaderInformation = - LeaderInformation.known(leaderSessionID, leaderAddress); - confirmedLeaderInformation = - LeaderInformationRegister.merge( - confirmedLeaderInformation, - componentId, - newConfirmedLeaderInformation); - leaderElectionDriver.publishLeaderInformation( - componentId, newConfirmedLeaderInformation); - } else { - if (!leaderSessionID.equals(this.issuedLeaderSessionID)) { - LOG.debug( - "Received an old confirmation call of leader session ID {} for component '{}' (current issued session ID is {}).", - leaderSessionID, - componentId, - issuedLeaderSessionID); - } else { - LOG.warn( - "The leader session ID {} for component '{}' was confirmed even though the corresponding " - + "service was not elected as the leader or has been stopped already.", - componentId, - leaderSessionID); - } - } - } + return CompletableFuture.runAsync( + () -> { + synchronized (lock) { + if (hasLeadershipInternal(componentId, leaderSessionID)) { + Preconditions.checkState( + leaderElectionDriver != null, + "The leadership check should only return true if a driver is instantiated."); + Preconditions.checkState( + !confirmedLeaderInformation.hasLeaderInformation(componentId), + "No confirmation should have happened, yet."); + + final LeaderInformation newConfirmedLeaderInformation = + LeaderInformation.known(leaderSessionID, leaderAddress); + confirmedLeaderInformation = + LeaderInformationRegister.merge( + confirmedLeaderInformation, + componentId, + newConfirmedLeaderInformation); + leaderElectionDriver.publishLeaderInformation( + componentId, newConfirmedLeaderInformation); + } else { + if (!leaderSessionID.equals(this.issuedLeaderSessionID)) { + LOG.debug( + "Received an old confirmation call of leader session ID {} for component '{}' (current issued session ID is {}).", + leaderSessionID, + componentId, + issuedLeaderSessionID); + } else { + LOG.warn( + "The leader session ID {} for component '{}' was confirmed even though the corresponding " + + "service was not elected as the leader or has been stopped already.", + componentId, + leaderSessionID); + } + } + } + }, + leadershipOperationExecutor); } @Override - protected boolean hasLeadership(String componentId, UUID leaderSessionId) { - synchronized (lock) { - if (leaderElectionDriver != null) { - if (leaderContenderRegistry.containsKey(componentId)) { - return leaderElectionDriver.hasLeadership() - && leaderSessionId.equals(issuedLeaderSessionID); - } else { - LOG.debug( - "hasLeadership is called for component '{}' while there is no contender registered under that ID in the service, returning false.", - componentId); - return false; - } + protected CompletableFuture hasLeadershipAsync( + String componentId, UUID leaderSessionId) { + return CompletableFuture.supplyAsync( + () -> { + synchronized (lock) { + return hasLeadershipInternal(componentId, leaderSessionId); + } + }, + leadershipOperationExecutor); + } + + @GuardedBy("lock") + private boolean hasLeadershipInternal(String componentId, UUID leaderSessionId) { + if (leaderElectionDriver != null) { + if (leaderContenderRegistry.containsKey(componentId)) { + return leaderElectionDriver.hasLeadership() + && leaderSessionId.equals(issuedLeaderSessionID); } else { - LOG.debug("hasLeadership is called after the service is closed, returning false."); + LOG.debug( + "hasLeadership is called for component '{}' while there is no contender registered under that ID in the service, returning false.", + componentId); return false; } + } else { + LOG.debug("hasLeadership is called after the service is closed, returning false."); + return false; } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElection.java index 0dc4ff5562197..090179e823500 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElection.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElection.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.leaderelection; import java.util.UUID; +import java.util.concurrent.CompletableFuture; /** * {@code LeaderElection} serves as a proxy between {@code LeaderElectionService} and {@link @@ -42,7 +43,7 @@ public interface LeaderElection extends AutoCloseable { * @param leaderSessionID The new leader session ID * @param leaderAddress The address of the new leader */ - void confirmLeadership(UUID leaderSessionID, String leaderAddress); + CompletableFuture confirmLeadershipAsync(UUID leaderSessionID, String leaderAddress); /** * Returns {@code true} if the service's {@link LeaderContender} has the leadership under the @@ -51,7 +52,7 @@ public interface LeaderElection extends AutoCloseable { * @param leaderSessionId identifying the current leader * @return true if the associated {@link LeaderContender} is the leader, otherwise false */ - boolean hasLeadership(UUID leaderSessionId); + CompletableFuture hasLeadershipAsync(UUID leaderSessionId); /** * Closes the {@code LeaderElection} by deregistering the {@link LeaderContender} from the diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionService.java index 2a3e2a15c37b1..e1a3136a7070a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderElectionService.java @@ -28,10 +28,10 @@ * instantiate its own leader election service. * *

Once a contender has been granted leadership he has to confirm the received leader session ID - * by calling the method {@link LeaderElection#confirmLeadership(UUID, String)}. This will notify - * the leader election service, that the contender has accepted the leadership specified and that - * the leader session id as well as the leader address can now be published for leader retrieval - * services. + * by calling the method {@link LeaderElection#confirmLeadershipAsync(UUID, String)}. This will + * notify the leader election service, that the contender has accepted the leadership specified and + * that the leader session id as well as the leader address can now be published for leader + * retrieval services. */ public interface LeaderElectionService { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElection.java index aa408a24563e2..9e5a47a997c99 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElection.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElection.java @@ -19,11 +19,13 @@ package org.apache.flink.runtime.leaderelection; import org.apache.flink.util.Preconditions; +import org.apache.flink.util.concurrent.FutureUtils; import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; import java.util.UUID; +import java.util.concurrent.CompletableFuture; /** * {@code StandaloneLeaderElection} implements {@link LeaderElection} for non-HA cases. This @@ -57,12 +59,16 @@ public void startLeaderElection(LeaderContender contender) throws Exception { } @Override - public void confirmLeadership(UUID leaderSessionID, String leaderAddress) {} + public CompletableFuture confirmLeadershipAsync( + UUID leaderSessionID, String leaderAddress) { + return FutureUtils.completedVoidFuture(); + } @Override - public boolean hasLeadership(UUID leaderSessionId) { + public CompletableFuture hasLeadershipAsync(UUID leaderSessionId) { synchronized (lock) { - return this.leaderContender != null && this.sessionID.equals(leaderSessionId); + return CompletableFuture.completedFuture( + this.leaderContender != null && this.sessionID.equals(leaderSessionId)); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImpl.java index 73dd72b26275f..29eeba2cb5464 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServiceImpl.java @@ -266,7 +266,7 @@ private void startNewLeaderResourceManager(UUID newLeaderSessionID) throws Excep .thenAcceptAsync( (isStillLeader) -> { if (isStillLeader) { - leaderElection.confirmLeadership( + leaderElection.confirmLeadershipAsync( newLeaderSessionID, newLeaderResourceManager.getAddress()); } }, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java index 63b621614321e..25958416af735 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java @@ -1213,7 +1213,7 @@ public void grantLeadership(final UUID leaderSessionID) { "{} was granted leadership with leaderSessionID={}", getRestBaseUrl(), leaderSessionID); - leaderElection.confirmLeadership(leaderSessionID, getRestBaseUrl()); + leaderElection.confirmLeadershipAsync(leaderSessionID, getRestBaseUrl()); } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerTest.java index d4ddb1daedb32..6eb00786795b5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerTest.java @@ -273,7 +273,7 @@ public void grantLeadership_oldLeader_doesNotConfirmLeaderSession() throws Excep // complete the confirmation future after losing the leadership contenderConfirmationFuture.complete("leader address"); - assertThat(leaderElection.hasLeadership(leaderSessionId), is(false)); + assertThat(leaderElection.hasLeadershipAsync(leaderSessionId).get(), is(false)); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServicesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServicesTest.java index deefa2bf2daf2..c0990bfcba0cd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServicesTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServicesTest.java @@ -140,7 +140,7 @@ private void runLeaderRetrievalTest( final UUID leaderId = leaderContender.getLeaderSessionFuture().get(); - leaderElection.confirmLeadership(leaderId, ADDRESS); + leaderElection.confirmLeadershipAsync(leaderId, ADDRESS).get(); final LeaderInformation leaderInformation = leaderRetrievalListener.getLeaderInformationFuture().get(); @@ -172,20 +172,20 @@ public void testConcurrentLeadershipOperations() throws Exception { final UUID oldLeaderSessionId = leaderContender.getLeaderSessionFuture().get(); - assertThat(leaderElection.hasLeadership(oldLeaderSessionId), is(true)); + assertThat(leaderElection.hasLeadershipAsync(oldLeaderSessionId).get(), is(true)); embeddedHaServices.getDispatcherLeaderService().revokeLeadership().get(); - assertThat(leaderElection.hasLeadership(oldLeaderSessionId), is(false)); + assertThat(leaderElection.hasLeadershipAsync(oldLeaderSessionId).get(), is(false)); embeddedHaServices.getDispatcherLeaderService().grantLeadership(); final UUID newLeaderSessionId = leaderContender.getLeaderSessionFuture().get(); - assertThat(leaderElection.hasLeadership(newLeaderSessionId), is(true)); + assertThat(leaderElection.hasLeadershipAsync(newLeaderSessionId).get(), is(true)); - leaderElection.confirmLeadership(oldLeaderSessionId, ADDRESS); - leaderElection.confirmLeadership(newLeaderSessionId, ADDRESS); + leaderElection.confirmLeadershipAsync(oldLeaderSessionId, ADDRESS).get(); + leaderElection.confirmLeadershipAsync(newLeaderSessionId, ADDRESS).get(); - assertThat(leaderElection.hasLeadership(newLeaderSessionId), is(true)); + assertThat(leaderElection.hasLeadershipAsync(newLeaderSessionId).get(), is(true)); leaderContender.tryRethrowException(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java index de9a31d2b2490..2b0719489924e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionServiceTest.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.leaderelection; -import org.apache.flink.core.testutils.FlinkAssertions; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService; import org.apache.flink.runtime.util.TestingFatalErrorHandlerExtension; @@ -34,6 +33,7 @@ import java.util.Collections; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -41,6 +41,7 @@ import java.util.function.BiConsumer; import java.util.function.Function; +import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -218,7 +219,7 @@ void testCloseGrantDeadlock() throws Exception { closeThread.join(); grantThread.join(); - FlinkAssertions.assertThatFuture(driverCloseTriggered).eventuallySucceeds(); + assertThatFuture(driverCloseTriggered).eventuallySucceeds(); } } @@ -702,7 +703,7 @@ void testAllLeaderInformationChangeEventWithUnknownComponentId() throws Exceptio } @Test - void testHasLeadershipWithLeadershipButNoGrantEventProcessed() throws Exception { + void testHasLeadershipAsyncWithLeadershipButNoGrantEventProcessed() throws Exception { new Context() { { runTestWithManuallyTriggeredEvents( @@ -712,14 +713,20 @@ void testHasLeadershipWithLeadershipButNoGrantEventProcessed() throws Exception applyToBothContenderContexts( ctx -> { - assertThat( - leaderElectionService.hasLeadership( - ctx.componentId, expectedSessionID)) - .isFalse(); - assertThat( - leaderElectionService.hasLeadership( - ctx.componentId, UUID.randomUUID())) - .isFalse(); + final CompletableFuture validSessionFuture = + leaderElectionService.hasLeadershipAsync( + ctx.componentId, expectedSessionID); + final CompletableFuture invalidSessionFuture = + leaderElectionService.hasLeadershipAsync( + ctx.componentId, UUID.randomUUID()); + executorService.triggerAll(); + + assertThatFuture(validSessionFuture) + .eventuallySucceeds() + .isEqualTo(true); + assertThatFuture(invalidSessionFuture) + .eventuallySucceeds() + .isEqualTo(false); }); }); } @@ -727,7 +734,7 @@ void testHasLeadershipWithLeadershipButNoGrantEventProcessed() throws Exception } @Test - void testHasLeadershipWithLeadershipAndGrantEventProcessed() throws Exception { + void testHasLeadershipAsyncWithLeadershipAndGrantEventProcessed() throws Exception { new Context() { { runTestWithManuallyTriggeredEvents( @@ -745,14 +752,20 @@ void testHasLeadershipWithLeadershipAndGrantEventProcessed() throws Exception { assertThat(ctx.contender.getLeaderSessionID()) .isEqualTo(expectedSessionID); - assertThat( - leaderElectionService.hasLeadership( - ctx.componentId, expectedSessionID)) - .isTrue(); - assertThat( - leaderElectionService.hasLeadership( - ctx.componentId, UUID.randomUUID())) - .isFalse(); + final CompletableFuture validSessionFuture = + leaderElectionService.hasLeadershipAsync( + ctx.componentId, expectedSessionID); + final CompletableFuture invalidSessionFuture = + leaderElectionService.hasLeadershipAsync( + ctx.componentId, UUID.randomUUID()); + executorService.triggerAll(); + + assertThatFuture(validSessionFuture) + .eventuallySucceeds() + .isEqualTo(true); + assertThatFuture(invalidSessionFuture) + .eventuallySucceeds() + .isEqualTo(false); }); }); } @@ -760,7 +773,7 @@ void testHasLeadershipWithLeadershipAndGrantEventProcessed() throws Exception { } @Test - void testHasLeadershipWithLeadershipLostButNoRevokeEventProcessed() throws Exception { + void testHasLeadershipAsyncWithLeadershipLostButNoRevokeEventProcessed() throws Exception { new Context() { { runTestWithManuallyTriggeredEvents( @@ -773,20 +786,27 @@ void testHasLeadershipWithLeadershipLostButNoRevokeEventProcessed() throws Excep applyToBothContenderContexts( ctx -> { - assertThat( - leaderElectionService.hasLeadership( - ctx.componentId, expectedSessionID)) + final CompletableFuture validSessionFuture = + leaderElectionService.hasLeadershipAsync( + ctx.componentId, expectedSessionID); + final CompletableFuture invalidSessionFuture = + leaderElectionService.hasLeadershipAsync( + ctx.componentId, UUID.randomUUID()); + + executorService.triggerAll(); + + assertThatFuture(validSessionFuture) + .eventuallySucceeds() .as( "No operation should be handled anymore after the HA backend " + "indicated leadership loss even if the onRevokeLeadership wasn't " + "processed, yet, because some other process could have picked up " + "the leadership in the meantime already based on the HA " + "backend's decision.") - .isFalse(); - assertThat( - leaderElectionService.hasLeadership( - ctx.componentId, UUID.randomUUID())) - .isFalse(); + .isEqualTo(false); + assertThatFuture(invalidSessionFuture) + .eventuallySucceeds() + .isEqualTo(false); }); }); } @@ -794,7 +814,7 @@ void testHasLeadershipWithLeadershipLostButNoRevokeEventProcessed() throws Excep } @Test - void testHasLeadershipWithLeadershipLostAndRevokeEventProcessed() throws Exception { + void testHasLeadershipAsyncWithLeadershipLostAndRevokeEventProcessed() throws Exception { new Context() { { runTestWithSynchronousEventHandling( @@ -805,14 +825,16 @@ void testHasLeadershipWithLeadershipLostAndRevokeEventProcessed() throws Excepti applyToBothContenderContexts( ctx -> { - assertThat( - leaderElectionService.hasLeadership( + assertThatFuture( + leaderElectionService.hasLeadershipAsync( ctx.componentId, expectedSessionID)) - .isFalse(); - assertThat( - leaderElectionService.hasLeadership( + .eventuallySucceeds() + .isEqualTo(false); + assertThatFuture( + leaderElectionService.hasLeadershipAsync( ctx.componentId, UUID.randomUUID())) - .isFalse(); + .eventuallySucceeds() + .isEqualTo(false); }); }); } @@ -820,7 +842,7 @@ void testHasLeadershipWithLeadershipLostAndRevokeEventProcessed() throws Excepti } @Test - void testHasLeadershipAfterLeaderElectionClose() throws Exception { + void testHasLeadershipAsyncAfterLeaderElectionClose() throws Exception { new Context() { { runTestWithSynchronousEventHandling( @@ -832,10 +854,11 @@ void testHasLeadershipAfterLeaderElectionClose() throws Exception { ctx -> { ctx.leaderElection.close(); - assertThat( - leaderElectionService.hasLeadership( + assertThatFuture( + leaderElectionService.hasLeadershipAsync( ctx.componentId, expectedSessionID)) - .isFalse(); + .eventuallySucceeds() + .isEqualTo(false); }); }); } @@ -1038,7 +1061,7 @@ void testOldConfirmLeaderInformationWhileHavingNewLeadership() throws Exception .hasValue(expectedLeaderInformation); // Old confirm call should be ignored. - ctx.leaderElection.confirmLeadership( + ctx.leaderElection.confirmLeadershipAsync( UUID.randomUUID(), ctx.address); assertThat( leaderElectionService.getLeaderSessionID( @@ -1069,7 +1092,7 @@ void testOldConfirmationWhileHavingLeadershipLost() throws Exception { applyToBothContenderContexts( ctx -> { // Old confirm call should be ignored. - ctx.leaderElection.confirmLeadership( + ctx.leaderElection.confirmLeadershipAsync( currentLeaderSessionId, ctx.address); assertThat( @@ -1246,6 +1269,73 @@ private void testNonBlockingCall( testInstance.close(); } + /** + * This test is used to verify FLINK-36451 where we observed concurrent nested locks being + * acquired from the {@link LeaderContender} and from the {@link DefaultLeaderElectionService}. + */ + @Test + void testNestedDeadlockInLeadershipConfirmation() throws Exception { + final AtomicReference leaderInformationStorage = + new AtomicReference<>(LeaderInformationRegister.empty()); + try (final DefaultLeaderElectionService testInstance = + new DefaultLeaderElectionService( + TestingLeaderElectionDriver.newBuilder( + new AtomicBoolean(false), + leaderInformationStorage, + new AtomicBoolean(false)) + ::build)) { + final String componentId = "test-component"; + final LeaderElection leaderElection = testInstance.createLeaderElection(componentId); + + // we need the lock to be acquired once for the leadership grant and once for the + // revocation + final CountDownLatch contenderLockAcquireLatch = new CountDownLatch(2); + final OneShotLatch grantReceivedLatch = new OneShotLatch(); + + final AtomicBoolean contenderLeadership = new AtomicBoolean(false); + final TestingGenericLeaderContender leaderContender = + TestingGenericLeaderContender.newBuilder() + .setPreLockAcquireAction(contenderLockAcquireLatch::countDown) + .setGrantLeadershipConsumer( + ignoredSessionId -> { + contenderLeadership.set(true); + grantReceivedLatch.trigger(); + }) + .setRevokeLeadershipRunnable(() -> contenderLeadership.set(false)) + .build(); + + leaderElection.startLeaderElection(leaderContender); + + final UUID leaderSessionId = UUID.randomUUID(); + testInstance.onGrantLeadership(leaderSessionId); + grantReceivedLatch.await(); + + final CompletableFuture revocationFuture; + final CompletableFuture confirmLeadershipFuture; + synchronized (leaderContender.getLock()) { + revocationFuture = CompletableFuture.runAsync(testInstance::onRevokeLeadership); + contenderLockAcquireLatch.await(); + confirmLeadershipFuture = + leaderElection.confirmLeadershipAsync(leaderSessionId, "random-address"); + } + + assertThatFuture(revocationFuture).eventuallySucceeds(); + assertThatFuture(confirmLeadershipFuture).eventuallySucceeds(); + + assertThat(contenderLeadership).isFalse(); + assertThat(leaderInformationStorage.get().forComponentId(componentId).isEmpty()) + .as( + "The LeaderInformation is empty because the leadership confirmation succeeded the " + + "leadership revocation which resulted in no leader information being written out to " + + "the HA backend.") + .isTrue(); + + // not closing the LeaderElection instance would leave the DefaultLeaderElectionService + // in an inconsistent state causing an error when closing the service + leaderElection.close(); + } + } + private static String createRandomComponentId() { return String.format("component-id-%s", UUID.randomUUID()); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionTest.java index 575f01396ae3a..11bf45dbfd572 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/DefaultLeaderElectionTest.java @@ -18,8 +18,9 @@ package org.apache.flink.runtime.leaderelection; +import org.apache.flink.util.concurrent.FutureUtils; import org.apache.flink.util.function.BiConsumerWithException; -import org.apache.flink.util.function.TriConsumer; +import org.apache.flink.util.function.TriFunction; import org.junit.jupiter.api.Test; @@ -104,6 +105,8 @@ void testLeaderConfirmation() throws Exception { componentIdRef.set(componentId); leaderSessionIDRef.set(leaderSessionID); leaderAddressRef.set(address); + + return FutureUtils.completedVoidFuture(); }) .build(); try (final DefaultLeaderElection testInstance = @@ -111,7 +114,7 @@ void testLeaderConfirmation() throws Exception { final UUID expectedLeaderSessionID = UUID.randomUUID(); final String expectedAddress = "random-address"; - testInstance.confirmLeadership(expectedLeaderSessionID, expectedAddress); + testInstance.confirmLeadershipAsync(expectedLeaderSessionID, expectedAddress); assertThat(componentIdRef).hasValue(DEFAULT_TEST_COMPONENT_ID); assertThat(leaderSessionIDRef).hasValue(expectedLeaderSessionID); @@ -155,16 +158,16 @@ void testCloseWithoutStart() throws Exception { } @Test - void testHasLeadershipTrue() throws Exception { - testHasLeadership(true); + void testHasLeadershipAsyncTrue() throws Exception { + testHasLeadershipAsync(true); } @Test - void testHasLeadershipFalse() throws Exception { - testHasLeadership(false); + void testHasLeadershipAsyncFalse() throws Exception { + testHasLeadershipAsync(false); } - private void testHasLeadership(boolean expectedReturnValue) throws Exception { + private void testHasLeadershipAsync(boolean expectedReturnValue) throws Exception { final AtomicReference componentIdRef = new AtomicReference<>(); final AtomicReference leaderSessionIDRef = new AtomicReference<>(); final DefaultLeaderElection.ParentService parentService = @@ -173,14 +176,15 @@ private void testHasLeadership(boolean expectedReturnValue) throws Exception { (actualComponentId, actualLeaderSessionID) -> { componentIdRef.set(actualComponentId); leaderSessionIDRef.set(actualLeaderSessionID); - return expectedReturnValue; + return CompletableFuture.completedFuture(expectedReturnValue); }) .build(); try (final DefaultLeaderElection testInstance = new DefaultLeaderElection(parentService, DEFAULT_TEST_COMPONENT_ID)) { final UUID expectedLeaderSessionID = UUID.randomUUID(); - assertThat(testInstance.hasLeadership(expectedLeaderSessionID)) + assertThatFuture(testInstance.hasLeadershipAsync(expectedLeaderSessionID)) + .eventuallySucceeds() .isEqualTo(expectedReturnValue); assertThat(componentIdRef).hasValue(DEFAULT_TEST_COMPONENT_ID); assertThat(leaderSessionIDRef).hasValue(expectedLeaderSessionID); @@ -192,14 +196,16 @@ private static class TestingAbstractLeaderElectionService private final BiConsumerWithException registerConsumer; private final Consumer removeConsumer; - private final TriConsumer confirmLeadershipConsumer; - private final BiFunction hasLeadershipFunction; + private final TriFunction> + confirmLeadershipConsumer; + private final BiFunction> hasLeadershipFunction; private TestingAbstractLeaderElectionService( BiConsumerWithException registerConsumer, Consumer removeConsumer, - TriConsumer confirmLeadershipConsumer, - BiFunction hasLeadershipFunction) { + TriFunction> + confirmLeadershipConsumer, + BiFunction> hasLeadershipFunction) { super(); this.registerConsumer = registerConsumer; @@ -219,13 +225,14 @@ protected void remove(String componentId) { } @Override - protected void confirmLeadership( + protected CompletableFuture confirmLeadershipAsync( String componentId, UUID leaderSessionID, String leaderAddress) { - confirmLeadershipConsumer.accept(componentId, leaderSessionID, leaderAddress); + return confirmLeadershipConsumer.apply(componentId, leaderSessionID, leaderAddress); } @Override - protected boolean hasLeadership(String componentId, UUID leaderSessionId) { + protected CompletableFuture hasLeadershipAsync( + String componentId, UUID leaderSessionId) { return hasLeadershipFunction.apply(componentId, leaderSessionId); } @@ -252,8 +259,9 @@ private static class Builder { private BiConsumerWithException registerConsumer; private Consumer removeConsumer; - private TriConsumer confirmLeadershipConsumer; - private BiFunction hasLeadershipFunction; + private TriFunction> + confirmLeadershipConsumer; + private BiFunction> hasLeadershipFunction; private Builder() {} @@ -269,13 +277,14 @@ public Builder setRemoveConsumer(Consumer removeConsumer) { } public Builder setConfirmLeadershipConsumer( - TriConsumer confirmLeadershipConsumer) { + TriFunction> + confirmLeadershipConsumer) { this.confirmLeadershipConsumer = confirmLeadershipConsumer; return this; } public Builder setHasLeadershipFunction( - BiFunction hasLeadershipFunction) { + BiFunction> hasLeadershipFunction) { this.hasLeadershipFunction = hasLeadershipFunction; return this; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java index 92c4720878df3..b11c639b7101d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderElectionTest.java @@ -45,6 +45,7 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ScheduledExecutorService; +import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture; import static org.assertj.core.api.Assertions.assertThat; /** Tests for leader election. */ @@ -80,7 +81,7 @@ void teardown() throws Exception { } @TestTemplate - void testHasLeadership() throws Exception { + void testHasLeadershipAsync() throws Exception { final ManualLeaderContender manualLeaderContender = new ManualLeaderContender(); try { @@ -89,16 +90,25 @@ void testHasLeadership() throws Exception { final UUID leaderSessionId = manualLeaderContender.waitForLeaderSessionId(); - assertThat(leaderElection.hasLeadership(leaderSessionId)).isTrue(); - assertThat(leaderElection.hasLeadership(UUID.randomUUID())).isFalse(); + assertThatFuture(leaderElection.hasLeadershipAsync(leaderSessionId)) + .eventuallySucceeds() + .isEqualTo(true); + assertThatFuture(leaderElection.hasLeadershipAsync(UUID.randomUUID())) + .eventuallySucceeds() + .isEqualTo(false); - leaderElection.confirmLeadership(leaderSessionId, "foobar"); + assertThatFuture(leaderElection.confirmLeadershipAsync(leaderSessionId, "foobar")) + .eventuallySucceeds(); - assertThat(leaderElection.hasLeadership(leaderSessionId)).isTrue(); + assertThatFuture(leaderElection.hasLeadershipAsync(leaderSessionId)) + .eventuallySucceeds() + .isEqualTo(true); leaderElection.close(); - assertThat(leaderElection.hasLeadership(leaderSessionId)).isFalse(); + assertThatFuture(leaderElection.hasLeadershipAsync(leaderSessionId)) + .eventuallySucceeds() + .isEqualTo(false); assertThat(manualLeaderContender.waitForLeaderSessionId()) .as("The leadership has been revoked from the contender.") diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionTest.java index de8e782254940..2869f1ca031d3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/StandaloneLeaderElectionTest.java @@ -26,6 +26,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; +import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture; import static org.assertj.core.api.Assertions.assertThat; class StandaloneLeaderElectionTest { @@ -79,26 +80,34 @@ void testStartLeaderElection() throws Exception { } @Test - void testHasLeadershipWithContender() throws Exception { + void testHasLeadershipAsyncWithContender() throws Exception { final TestingGenericLeaderContender contender = TestingGenericLeaderContender.newBuilder().build(); try (final LeaderElection testInstance = new StandaloneLeaderElection(SESSION_ID)) { testInstance.startLeaderElection(contender); - assertThat(testInstance.hasLeadership(SESSION_ID)).isTrue(); + assertThatFuture(testInstance.hasLeadershipAsync(SESSION_ID)) + .eventuallySucceeds() + .isEqualTo(true); final UUID differentSessionID = UUID.randomUUID(); - assertThat(testInstance.hasLeadership(differentSessionID)).isFalse(); + assertThatFuture(testInstance.hasLeadershipAsync(differentSessionID)) + .eventuallySucceeds() + .isEqualTo(false); } } @Test - void testHasLeadershipWithoutContender() throws Exception { + void testHasLeadershipAsyncWithoutContender() throws Exception { try (final LeaderElection testInstance = new StandaloneLeaderElection(SESSION_ID)) { - assertThat(testInstance.hasLeadership(SESSION_ID)).isFalse(); + assertThatFuture(testInstance.hasLeadershipAsync(SESSION_ID)) + .eventuallySucceeds() + .isEqualTo(false); final UUID differentSessionID = UUID.randomUUID(); - assertThat(testInstance.hasLeadership(differentSessionID)).isFalse(); + assertThatFuture(testInstance.hasLeadershipAsync(differentSessionID)) + .eventuallySucceeds() + .isEqualTo(false); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingContender.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingContender.java index d566289c711a0..95437d0c42b1e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingContender.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingContender.java @@ -50,9 +50,12 @@ public void grantLeadership(UUID leaderSessionID) { this.leaderSessionID = leaderSessionID; - leaderElection.confirmLeadership(leaderSessionID, address); - - leaderEventQueue.offer(LeaderInformation.known(leaderSessionID, address)); + leaderElection + .confirmLeadershipAsync(leaderSessionID, address) + .thenRun( + () -> + leaderEventQueue.offer( + LeaderInformation.known(leaderSessionID, address))); } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingGenericLeaderContender.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingGenericLeaderContender.java index a5b43cf8bb218..ae460363ee7e4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingGenericLeaderContender.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingGenericLeaderContender.java @@ -29,21 +29,30 @@ public class TestingGenericLeaderContender implements LeaderContender { private final Object lock = new Object(); + private final Runnable preLockAcquireAction; + private final Consumer grantLeadershipConsumer; private final Runnable revokeLeadershipRunnable; private final Consumer handleErrorConsumer; private TestingGenericLeaderContender( + Runnable preLockAcquireAction, Consumer grantLeadershipConsumer, Runnable revokeLeadershipRunnable, Consumer handleErrorConsumer) { + this.preLockAcquireAction = preLockAcquireAction; this.grantLeadershipConsumer = grantLeadershipConsumer; this.revokeLeadershipRunnable = revokeLeadershipRunnable; this.handleErrorConsumer = handleErrorConsumer; } + public Object getLock() { + return lock; + } + @Override public void grantLeadership(UUID leaderSessionID) { + preLockAcquireAction.run(); synchronized (lock) { grantLeadershipConsumer.accept(leaderSessionID); } @@ -51,6 +60,7 @@ public void grantLeadership(UUID leaderSessionID) { @Override public void revokeLeadership() { + preLockAcquireAction.run(); synchronized (lock) { revokeLeadershipRunnable.run(); } @@ -58,6 +68,7 @@ public void revokeLeadership() { @Override public void handleError(Exception exception) { + preLockAcquireAction.run(); synchronized (lock) { handleErrorConsumer.accept(exception); } @@ -69,6 +80,7 @@ public static Builder newBuilder() { /** {@code Builder} for creating {@code TestingGenericLeaderContender} instances. */ public static class Builder { + private Runnable preLockAcquireAction = () -> {}; private Consumer grantLeadershipConsumer = ignoredSessionID -> {}; private Runnable revokeLeadershipRunnable = () -> {}; private Consumer handleErrorConsumer = @@ -93,9 +105,17 @@ public Builder setHandleErrorConsumer(Consumer handleErrorConsumer) { return this; } + public Builder setPreLockAcquireAction(Runnable preLockAcquireAction) { + this.preLockAcquireAction = preLockAcquireAction; + return this; + } + public TestingGenericLeaderContender build() { return new TestingGenericLeaderContender( - grantLeadershipConsumer, revokeLeadershipRunnable, handleErrorConsumer); + preLockAcquireAction, + grantLeadershipConsumer, + revokeLeadershipRunnable, + handleErrorConsumer); } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElection.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElection.java index cd59be897327f..a50a2e6d1e8ed 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElection.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/TestingLeaderElection.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.leaderelection; import org.apache.flink.util.Preconditions; +import org.apache.flink.util.concurrent.FutureUtils; import javax.annotation.Nullable; @@ -65,17 +66,21 @@ public synchronized void startLeaderElection(LeaderContender contender) throws E } @Override - public synchronized void confirmLeadership(UUID leaderSessionID, String leaderAddress) { + public synchronized CompletableFuture confirmLeadershipAsync( + UUID leaderSessionID, String leaderAddress) { if (leaderSessionID.equals(this.issuedLeaderSessionId) && confirmationFuture != null && !confirmationFuture.isDone()) { confirmationFuture.complete(LeaderInformation.known(leaderSessionID, leaderAddress)); } + + return FutureUtils.completedVoidFuture(); } @Override - public synchronized boolean hasLeadership(UUID leaderSessionId) { - return hasLeadership() && leaderSessionId.equals(issuedLeaderSessionId); + public synchronized CompletableFuture hasLeadershipAsync(UUID leaderSessionId) { + return CompletableFuture.completedFuture( + hasLeadership() && leaderSessionId.equals(issuedLeaderSessionId)); } private boolean hasLeadership() { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/DocumentingDispatcherRestEndpoint.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/DocumentingDispatcherRestEndpoint.java index c42a2a9b4047a..4b3a544001b7b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/DocumentingDispatcherRestEndpoint.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/util/DocumentingDispatcherRestEndpoint.java @@ -34,6 +34,7 @@ import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; import org.apache.flink.util.ConfigurationException; +import org.apache.flink.util.concurrent.FutureUtils; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler; @@ -93,11 +94,14 @@ private enum NoOpLeaderElection implements LeaderElection { public void startLeaderElection(LeaderContender contender) {} @Override - public void confirmLeadership(UUID leaderSessionID, String leaderAddress) {} + public CompletableFuture confirmLeadershipAsync( + UUID leaderSessionID, String leaderAddress) { + return FutureUtils.completedVoidFuture(); + } @Override - public boolean hasLeadership(UUID leaderSessionId) { - return false; + public CompletableFuture hasLeadershipAsync(UUID leaderSessionId) { + return CompletableFuture.completedFuture(false); } @Override