Skip to content

Commit

Permalink
[FLINK-36451][runtime] Makes hasLeadership and confirmLeadership work…
Browse files Browse the repository at this point in the history
… asynchronously

A test case is added that illustrates the concurrent lock acquisition problem in the DefaultLeaderElectionService
  • Loading branch information
XComp committed Dec 9, 2024
1 parent 4bdc82d commit 0dbf027
Show file tree
Hide file tree
Showing 20 changed files with 385 additions and 193 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -176,9 +176,9 @@ private void forwardConfirmLeaderSessionFuture(
FutureUtils.assertNoException(
newDispatcherLeaderProcess
.getLeaderAddressFuture()
.thenAccept(
.thenCompose(
leaderAddress ->
leaderElection.confirmLeadership(
leaderElection.confirmLeadershipAsync(
leaderSessionID, leaderAddress)));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,14 +244,14 @@ private void removeContender(EmbeddedLeaderElection embeddedLeaderElection) {
}

/** Callback from leader contenders when they confirm a leader grant. */
private void confirmLeader(
private CompletableFuture<Void> confirmLeader(
final EmbeddedLeaderElection embeddedLeaderElection,
final UUID leaderSessionId,
final String leaderAddress) {
synchronized (lock) {
// if the leader election was shut down in the meantime, ignore this confirmation
if (!embeddedLeaderElection.running || shutdown) {
return;
return FutureUtils.completedVoidFuture();
}

try {
Expand All @@ -269,7 +269,7 @@ private void confirmLeader(
currentLeaderProposed = null;

// notify all listeners
notifyAllListeners(leaderAddress, leaderSessionId);
return notifyAllListeners(leaderAddress, leaderSessionId);
} else {
LOG.debug(
"Received confirmation of leadership for a stale leadership grant. Ignoring.");
Expand All @@ -278,6 +278,8 @@ private void confirmLeader(
fatalError(t);
}
}

return FutureUtils.completedVoidFuture();
}

private CompletableFuture<Void> notifyAllListeners(String address, UUID leaderSessionId) {
Expand Down Expand Up @@ -465,15 +467,17 @@ public void close() {
}

@Override
public void confirmLeadership(UUID leaderSessionID, String leaderAddress) {
public CompletableFuture<Void> confirmLeadershipAsync(
UUID leaderSessionID, String leaderAddress) {
checkNotNull(leaderSessionID);
checkNotNull(leaderAddress);
confirmLeader(this, leaderSessionID, leaderAddress);
return confirmLeader(this, leaderSessionID, leaderAddress);
}

@Override
public boolean hasLeadership(UUID leaderSessionId) {
return isLeader && leaderSessionId.equals(currentLeaderSessionId);
public CompletableFuture<Boolean> hasLeadershipAsync(UUID leaderSessionId) {
return CompletableFuture.completedFuture(
isLeader && leaderSessionId.equals(currentLeaderSessionId));
}

void shutdown(Exception cause) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,28 +257,34 @@ private void startJobMasterServiceProcessAsync(UUID leaderSessionId) {
unused ->
jobResultStore
.hasJobResultEntryAsync(getJobID())
.thenAccept(
.thenCompose(
hasJobResult -> {
if (hasJobResult) {
handleJobAlreadyDoneIfValidLeader(
return handleJobAlreadyDoneIfValidLeader(
leaderSessionId);
} else {
createNewJobMasterServiceProcessIfValidLeader(
return createNewJobMasterServiceProcessIfValidLeader(
leaderSessionId);
}
}));
handleAsyncOperationError(sequentialOperation, "Could not start the job manager.");
}

private void handleJobAlreadyDoneIfValidLeader(UUID leaderSessionId) {
runIfValidLeader(
private CompletableFuture<Void> handleJobAlreadyDoneIfValidLeader(UUID leaderSessionId) {
return runIfValidLeader(
leaderSessionId, () -> jobAlreadyDone(leaderSessionId), "check completed job");
}

private void createNewJobMasterServiceProcessIfValidLeader(UUID leaderSessionId) {
runIfValidLeader(
private CompletableFuture<Void> createNewJobMasterServiceProcessIfValidLeader(
UUID leaderSessionId) {
return runIfValidLeader(
leaderSessionId,
() ->
// the heavy lifting of the JobMasterServiceProcess instantiation is still
// done asynchronously (see
// DefaultJobMasterServiceFactory#createJobMasterService executing the logic
// on the leaderOperation thread in the DefaultLeaderElectionService should
// be, therefore, fine
ThrowingRunnable.unchecked(
() -> createNewJobMasterServiceProcess(leaderSessionId))
.run(),
Expand Down Expand Up @@ -336,15 +342,18 @@ private void createNewJobMasterServiceProcess(UUID leaderSessionId) throws Flink
private void confirmLeadership(
UUID leaderSessionId, CompletableFuture<String> leaderAddressFuture) {
FutureUtils.assertNoException(
leaderAddressFuture.thenAccept(
leaderAddressFuture.thenCompose(
address ->
runIfStateRunning(
() -> {
LOG.debug("Confirm leadership {}.", leaderSessionId);
leaderElection.confirmLeadership(
leaderSessionId, address);
},
"confirming leadership")));
callIfRunning(
() -> {
LOG.debug(
"Confirm leadership {}.",
leaderSessionId);
return leaderElection.confirmLeadershipAsync(
leaderSessionId, address);
},
"confirming leadership")
.orElse(FutureUtils.completedVoidFuture())));
}

private void forwardResultFuture(
Expand Down Expand Up @@ -478,34 +487,39 @@ private boolean isRunning() {
return state == State.RUNNING;
}

private void runIfValidLeader(
private CompletableFuture<Void> runIfValidLeader(
UUID expectedLeaderId, Runnable action, Runnable noLeaderFallback) {
synchronized (lock) {
if (isValidLeader(expectedLeaderId)) {
action.run();
if (isRunning() && leaderElection != null) {
return leaderElection
.hasLeadershipAsync(expectedLeaderId)
.thenAccept(
hasLeadership -> {
synchronized (lock) {
if (isRunning() && hasLeadership) {
action.run();
} else {
noLeaderFallback.run();
}
}
});
} else {
noLeaderFallback.run();
return FutureUtils.completedVoidFuture();
}
}
}

private void runIfValidLeader(
private CompletableFuture<Void> runIfValidLeader(
UUID expectedLeaderId, Runnable action, String noLeaderFallbackCommandDescription) {
runIfValidLeader(
return runIfValidLeader(
expectedLeaderId,
action,
() ->
printLogIfNotValidLeader(
noLeaderFallbackCommandDescription, expectedLeaderId));
}

@GuardedBy("lock")
private boolean isValidLeader(UUID expectedLeaderId) {
return isRunning()
&& leaderElection != null
&& leaderElection.hasLeadership(expectedLeaderId);
}

private <T> void forwardIfValidLeader(
UUID expectedLeaderId,
CompletableFuture<? extends T> source,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.util.Preconditions;

import java.util.UUID;
import java.util.concurrent.CompletableFuture;

/**
* {@code DefaultLeaderElection} implements the {@link LeaderElection} based on the {@link
Expand All @@ -43,13 +44,14 @@ public void startLeaderElection(LeaderContender contender) throws Exception {
}

@Override
public void confirmLeadership(UUID leaderSessionID, String leaderAddress) {
parentService.confirmLeadership(componentId, leaderSessionID, leaderAddress);
public CompletableFuture<Void> confirmLeadershipAsync(
UUID leaderSessionID, String leaderAddress) {
return parentService.confirmLeadershipAsync(componentId, leaderSessionID, leaderAddress);
}

@Override
public boolean hasLeadership(UUID leaderSessionId) {
return parentService.hasLeadership(componentId, leaderSessionId);
public CompletableFuture<Boolean> hasLeadershipAsync(UUID leaderSessionId) {
return parentService.hasLeadershipAsync(componentId, leaderSessionId);
}

@Override
Expand Down Expand Up @@ -81,7 +83,7 @@ abstract static class ParentService {
* the {@link LeaderContender} that is associated with the {@code componentId}. The
* information is only propagated to the HA backend if the leadership is still acquired.
*/
abstract void confirmLeadership(
abstract CompletableFuture<Void> confirmLeadershipAsync(
String componentId, UUID leaderSessionID, String leaderAddress);

/**
Expand All @@ -91,6 +93,7 @@ abstract void confirmLeadership(
* @return {@code true} if the service has leadership with the passed {@code
* leaderSessionID} acquired; {@code false} otherwise.
*/
abstract boolean hasLeadership(String componentId, UUID leaderSessionID);
abstract CompletableFuture<Boolean> hasLeadershipAsync(
String componentId, UUID leaderSessionID);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ public void close() throws Exception {
}

@Override
protected void confirmLeadership(
protected CompletableFuture<Void> confirmLeadershipAsync(
String componentId, UUID leaderSessionID, String leaderAddress) {
Preconditions.checkArgument(leaderContenderRegistry.containsKey(componentId));
LOG.debug(
Expand All @@ -324,59 +324,73 @@ protected void confirmLeadership(

checkNotNull(leaderSessionID);

synchronized (lock) {
if (hasLeadership(componentId, leaderSessionID)) {
Preconditions.checkState(
leaderElectionDriver != null,
"The leadership check should only return true if a driver is instantiated.");
Preconditions.checkState(
!confirmedLeaderInformation.hasLeaderInformation(componentId),
"No confirmation should have happened, yet.");

final LeaderInformation newConfirmedLeaderInformation =
LeaderInformation.known(leaderSessionID, leaderAddress);
confirmedLeaderInformation =
LeaderInformationRegister.merge(
confirmedLeaderInformation,
componentId,
newConfirmedLeaderInformation);
leaderElectionDriver.publishLeaderInformation(
componentId, newConfirmedLeaderInformation);
} else {
if (!leaderSessionID.equals(this.issuedLeaderSessionID)) {
LOG.debug(
"Received an old confirmation call of leader session ID {} for component '{}' (current issued session ID is {}).",
leaderSessionID,
componentId,
issuedLeaderSessionID);
} else {
LOG.warn(
"The leader session ID {} for component '{}' was confirmed even though the corresponding "
+ "service was not elected as the leader or has been stopped already.",
componentId,
leaderSessionID);
}
}
}
return CompletableFuture.runAsync(
() -> {
synchronized (lock) {
if (hasLeadershipInternal(componentId, leaderSessionID)) {
Preconditions.checkState(
leaderElectionDriver != null,
"The leadership check should only return true if a driver is instantiated.");
Preconditions.checkState(
!confirmedLeaderInformation.hasLeaderInformation(componentId),
"No confirmation should have happened, yet.");

final LeaderInformation newConfirmedLeaderInformation =
LeaderInformation.known(leaderSessionID, leaderAddress);
confirmedLeaderInformation =
LeaderInformationRegister.merge(
confirmedLeaderInformation,
componentId,
newConfirmedLeaderInformation);
leaderElectionDriver.publishLeaderInformation(
componentId, newConfirmedLeaderInformation);
} else {
if (!leaderSessionID.equals(this.issuedLeaderSessionID)) {
LOG.debug(
"Received an old confirmation call of leader session ID {} for component '{}' (current issued session ID is {}).",
leaderSessionID,
componentId,
issuedLeaderSessionID);
} else {
LOG.warn(
"The leader session ID {} for component '{}' was confirmed even though the corresponding "
+ "service was not elected as the leader or has been stopped already.",
componentId,
leaderSessionID);
}
}
}
},
leadershipOperationExecutor);
}

@Override
protected boolean hasLeadership(String componentId, UUID leaderSessionId) {
synchronized (lock) {
if (leaderElectionDriver != null) {
if (leaderContenderRegistry.containsKey(componentId)) {
return leaderElectionDriver.hasLeadership()
&& leaderSessionId.equals(issuedLeaderSessionID);
} else {
LOG.debug(
"hasLeadership is called for component '{}' while there is no contender registered under that ID in the service, returning false.",
componentId);
return false;
}
protected CompletableFuture<Boolean> hasLeadershipAsync(
String componentId, UUID leaderSessionId) {
return CompletableFuture.supplyAsync(
() -> {
synchronized (lock) {
return hasLeadershipInternal(componentId, leaderSessionId);
}
},
leadershipOperationExecutor);
}

@GuardedBy("lock")
private boolean hasLeadershipInternal(String componentId, UUID leaderSessionId) {
if (leaderElectionDriver != null) {
if (leaderContenderRegistry.containsKey(componentId)) {
return leaderElectionDriver.hasLeadership()
&& leaderSessionId.equals(issuedLeaderSessionID);
} else {
LOG.debug("hasLeadership is called after the service is closed, returning false.");
LOG.debug(
"hasLeadership is called for component '{}' while there is no contender registered under that ID in the service, returning false.",
componentId);
return false;
}
} else {
LOG.debug("hasLeadership is called after the service is closed, returning false.");
return false;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.runtime.leaderelection;

import java.util.UUID;
import java.util.concurrent.CompletableFuture;

/**
* {@code LeaderElection} serves as a proxy between {@code LeaderElectionService} and {@link
Expand All @@ -42,7 +43,7 @@ public interface LeaderElection extends AutoCloseable {
* @param leaderSessionID The new leader session ID
* @param leaderAddress The address of the new leader
*/
void confirmLeadership(UUID leaderSessionID, String leaderAddress);
CompletableFuture<Void> confirmLeadershipAsync(UUID leaderSessionID, String leaderAddress);

/**
* Returns {@code true} if the service's {@link LeaderContender} has the leadership under the
Expand All @@ -51,7 +52,7 @@ public interface LeaderElection extends AutoCloseable {
* @param leaderSessionId identifying the current leader
* @return true if the associated {@link LeaderContender} is the leader, otherwise false
*/
boolean hasLeadership(UUID leaderSessionId);
CompletableFuture<Boolean> hasLeadershipAsync(UUID leaderSessionId);

/**
* Closes the {@code LeaderElection} by deregistering the {@link LeaderContender} from the
Expand Down
Loading

0 comments on commit 0dbf027

Please sign in to comment.