Skip to content

Commit

Permalink
[FLINK-36451] Removes hasLeadership method from interfaces
Browse files Browse the repository at this point in the history
  • Loading branch information
XComp committed Nov 22, 2024
1 parent dd670a7 commit 91eb73c
Show file tree
Hide file tree
Showing 18 changed files with 307 additions and 266 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.leaderelection.LeaderContender;
import org.apache.flink.runtime.leaderelection.LeaderElection;
import org.apache.flink.runtime.leaderelection.LeadershipLostException;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.concurrent.FutureUtils;
Expand Down Expand Up @@ -176,13 +177,24 @@ private void forwardConfirmLeaderSessionFuture(
FutureUtils.assertNoException(
newDispatcherLeaderProcess
.getLeaderAddressFuture()
.thenAccept(
leaderAddress -> {
if (leaderElection.hasLeadership(leaderSessionID)) {
leaderElection.confirmLeadership(
leaderSessionID, leaderAddress);
}
}));
.thenCompose(
leaderAddress ->
leaderElection
.confirmLeadership(leaderSessionID, leaderAddress)
.exceptionally(
error -> {
if (error
instanceof
LeadershipLostException) {
LOG.warn(
"Leadership couldn't be confirmed due to leadership loss.",
error);
} else {
fatalErrorHandler.onFatalError(
error);
}
return null;
})));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.flink.runtime.leaderelection.LeaderContender;
import org.apache.flink.runtime.leaderelection.LeaderElection;
import org.apache.flink.runtime.leaderelection.LeaderElectionUtils;
import org.apache.flink.runtime.leaderelection.LeadershipLostException;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.util.FlinkException;
Expand Down Expand Up @@ -468,15 +467,13 @@ public void close() {
}

@Override
public void confirmLeadership(UUID leaderSessionID, String leaderAddress) {
public CompletableFuture<Void> confirmLeadership(
UUID leaderSessionID, String leaderAddress) {
checkNotNull(leaderSessionID);
checkNotNull(leaderAddress);
confirmLeader(this, leaderSessionID, leaderAddress);
}

@Override
public boolean hasLeadership(UUID leaderSessionId) {
return isLeader && leaderSessionId.equals(currentLeaderSessionId);
return FutureUtils.completedVoidFuture();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Function;
import java.util.function.Supplier;

/**
Expand Down Expand Up @@ -256,27 +257,21 @@ private void startJobMasterServiceProcessAsync(UUID leaderSessionId) {
sequentialOperation =
sequentialOperation.thenCompose(
unused ->
supplyAsyncIfValidLeader(
supplyAsyncAsLeader(
leaderSessionId,
() ->
jobResultStore.hasJobResultEntryAsync(
getJobID()),
() ->
FutureUtils.completedExceptionally(
new LeadershipLostException(
"The leadership is lost.")))
getJobID()))
.handle(
(hasJobResult, throwable) -> {
if (throwable
instanceof LeadershipLostException) {
printLogIfNotValidLeader(
logLeadershipLoss(
"verify job result entry",
leaderSessionId);
return null;
} else if (throwable != null) {
ExceptionUtils.rethrow(throwable);
}
if (hasJobResult) {
} else if (hasJobResult) {
handleJobAlreadyDoneIfValidLeader(
leaderSessionId);
} else {
Expand All @@ -289,12 +284,12 @@ private void startJobMasterServiceProcessAsync(UUID leaderSessionId) {
}

private void handleJobAlreadyDoneIfValidLeader(UUID leaderSessionId) {
runIfValidLeader(
runAsyncAsLeader(
leaderSessionId, () -> jobAlreadyDone(leaderSessionId), "check completed job");
}

private void createNewJobMasterServiceProcessIfValidLeader(UUID leaderSessionId) {
runIfValidLeader(
runAsyncAsLeader(
leaderSessionId,
() ->
ThrowingRunnable.unchecked(
Expand All @@ -303,7 +298,20 @@ private void createNewJobMasterServiceProcessIfValidLeader(UUID leaderSessionId)
"create new job master service process");
}

private void printLogIfNotValidLeader(String actionDescription, UUID leaderSessionId) {
private Function<Throwable, ? extends Void> handleLeadershipError(
String actionDescription, UUID leaderSessionId) {
return error -> {
if (error instanceof LeadershipLostException) {
logLeadershipLoss(actionDescription, leaderSessionId);
} else {
fatalErrorHandler.onFatalError(error);
}

return null;
};
}

private static void logLeadershipLoss(String actionDescription, UUID leaderSessionId) {
LOG.debug(
"Ignore leader action '{}' because the leadership runner is no longer the valid leader for {}.",
actionDescription,
Expand Down Expand Up @@ -354,23 +362,21 @@ private void createNewJobMasterServiceProcess(UUID leaderSessionId) throws Flink
private void confirmLeadership(
UUID leaderSessionId, CompletableFuture<String> leaderAddressFuture) {
FutureUtils.assertNoException(
leaderAddressFuture.thenAccept(
leaderAddressFuture.thenCompose(
address ->
runIfValidLeader(
leaderSessionId,
() -> {
LOG.debug("Confirm leadership {}.", leaderSessionId);
leaderElection.confirmLeadership(
leaderSessionId, address);
},
"confirming leadership")));
leaderElection
.confirmLeadership(leaderSessionId, address)
.exceptionally(
handleLeadershipError(
"confirming leadership",
leaderSessionId))));
}

private void forwardResultFuture(
UUID leaderSessionId, CompletableFuture<JobManagerRunnerResult> resultFuture) {
resultFuture.whenComplete(
(jobManagerRunnerResult, throwable) ->
runIfValidLeader(
runAsyncAsLeader(
leaderSessionId,
() -> onJobCompletion(jobManagerRunnerResult, throwable),
"result future forwarding"));
Expand Down Expand Up @@ -497,55 +503,41 @@ private boolean isRunning() {
return state == State.RUNNING;
}

private void runIfValidLeader(
UUID expectedLeaderId, Runnable action, Runnable noLeaderFallback) {
synchronized (lock) {
if (isValidLeader(expectedLeaderId)) {
action.run();
} else {
noLeaderFallback.run();
}
}
}

private void runIfValidLeader(
private void runAsyncAsLeader(
UUID expectedLeaderId, Runnable action, String noLeaderFallbackCommandDescription) {
runIfValidLeader(
expectedLeaderId,
action,
() ->
printLogIfNotValidLeader(
noLeaderFallbackCommandDescription, expectedLeaderId));
FutureUtils.assertNoException(
leaderElection
.runAsLeader(expectedLeaderId, action::run)
.exceptionally(
handleLeadershipError(
noLeaderFallbackCommandDescription, expectedLeaderId)));
}

private <T> CompletableFuture<T> supplyAsyncIfValidLeader(
UUID expectedLeaderId,
Supplier<CompletableFuture<T>> supplier,
Supplier<CompletableFuture<T>> noLeaderFallback) {
private <T> CompletableFuture<T> supplyAsyncAsLeader(
UUID expectedLeaderId, Supplier<CompletableFuture<T>> supplier) {
final CompletableFuture<T> resultFuture = new CompletableFuture<>();
runIfValidLeader(
expectedLeaderId,
() -> FutureUtils.forward(supplier.get(), resultFuture),
() -> FutureUtils.forward(noLeaderFallback.get(), resultFuture));
FutureUtils.assertNoException(
leaderElection
.runAsLeader(
expectedLeaderId,
() -> FutureUtils.forward(supplier.get(), resultFuture))
.exceptionally(
t -> {
resultFuture.completeExceptionally(t);
return null;
}));

return resultFuture;
}

@GuardedBy("lock")
private boolean isValidLeader(UUID expectedLeaderId) {
return isRunning()
&& leaderElection != null
&& leaderElection.hasLeadership(expectedLeaderId);
}

private <T> void forwardIfValidLeader(
UUID expectedLeaderId,
CompletableFuture<? extends T> source,
CompletableFuture<T> target,
String forwardDescription) {
source.whenComplete(
(t, throwable) ->
runIfValidLeader(
runAsyncAsLeader(
expectedLeaderId,
() -> {
if (throwable != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,8 @@ public void startLeaderElection(LeaderContender contender) throws Exception {
}

@Override
public void confirmLeadership(UUID leaderSessionID, String leaderAddress) {
parentService.confirmLeadership(componentId, leaderSessionID, leaderAddress);
}

@Override
public boolean hasLeadership(UUID leaderSessionId) {
return parentService.hasLeadership(componentId, leaderSessionId);
public CompletableFuture<Void> confirmLeadership(UUID leaderSessionID, String leaderAddress) {
return parentService.confirmLeadership(componentId, leaderSessionID, leaderAddress);
}

@Override
Expand Down Expand Up @@ -88,18 +83,9 @@ abstract static class ParentService {
* Confirms the leadership with the {@code leaderSessionID} and {@code leaderAddress} for
* the {@link LeaderContender} that is associated with the {@code componentId}.
*/
abstract void confirmLeadership(
abstract CompletableFuture<Void> confirmLeadership(
String componentId, UUID leaderSessionID, String leaderAddress);

/**
* Checks whether the {@code ParentService} has the leadership acquired for the {@code
* componentId} and {@code leaderSessionID}.
*
* @return {@code true} if the service has leadership with the passed {@code
* leaderSessionID} acquired; {@code false} otherwise.
*/
abstract boolean hasLeadership(String componentId, UUID leaderSessionID);

/**
* Runs passed {@code callback} in the leadership main thread if the leadership is still
* valid or returns a future that failed with {@link LeadershipLostException} otherwise.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ public void close() throws Exception {
}

@Override
protected void confirmLeadership(
protected CompletableFuture<Void> confirmLeadership(
String componentId, UUID leaderSessionID, String leaderAddress) {
Preconditions.checkArgument(leaderContenderRegistry.containsKey(componentId));
LOG.debug(
Expand All @@ -326,60 +326,33 @@ protected void confirmLeadership(

checkNotNull(leaderSessionID);

synchronized (lock) {
if (hasLeadership(componentId, leaderSessionID)) {
Preconditions.checkState(
leaderElectionDriver != null,
"The leadership check should only return true if a driver is instantiated.");
Preconditions.checkState(
!confirmedLeaderInformation.hasLeaderInformation(componentId),
"No confirmation should have happened, yet.");

final LeaderInformation newConfirmedLeaderInformation =
LeaderInformation.known(leaderSessionID, leaderAddress);
confirmedLeaderInformation =
LeaderInformationRegister.merge(
confirmedLeaderInformation,
componentId,
newConfirmedLeaderInformation);
leaderElectionDriver.publishLeaderInformation(
componentId, newConfirmedLeaderInformation);
} else {
if (!leaderSessionID.equals(this.issuedLeaderSessionID)) {
LOG.debug(
"Received an old confirmation call of leader session ID {} for component '{}' (current issued session ID is {}).",
leaderSessionID,
componentId,
issuedLeaderSessionID);
} else {
LOG.warn(
"The leader session ID {} for component '{}' was confirmed even though the corresponding "
+ "service was not elected as the leader or has been stopped already.",
componentId,
leaderSessionID);
}
}
}
}

@Override
protected boolean hasLeadership(String componentId, UUID leaderSessionId) {
synchronized (lock) {
if (leaderElectionDriver != null) {
if (leaderContenderRegistry.containsKey(componentId)) {
return leaderElectionDriver.hasLeadership()
&& leaderSessionId.equals(issuedLeaderSessionID);
} else {
LOG.debug(
"hasLeadership is called for component '{}' while there is no contender registered under that ID in the service, returning false.",
componentId);
return false;
}
} else {
LOG.debug("hasLeadership is called after the service is closed, returning false.");
return false;
}
}
return runAsLeader(
componentId,
leaderSessionID,
() -> {
Preconditions.checkState(
leaderElectionDriver != null,
"The leadership check should only return true if a driver is instantiated.");
Preconditions.checkState(
!confirmedLeaderInformation.hasLeaderInformation(componentId),
"No confirmation should have happened, yet.");

final LeaderInformation newConfirmedLeaderInformation =
LeaderInformation.known(leaderSessionID, leaderAddress);
confirmedLeaderInformation =
LeaderInformationRegister.merge(
confirmedLeaderInformation,
componentId,
newConfirmedLeaderInformation);
leaderElectionDriver.publishLeaderInformation(
componentId, newConfirmedLeaderInformation);
})
.exceptionally(
error -> {
LOG.debug(
"Leadership was lost for component '{}'.", componentId, error);
return null;
});
}

@Override
Expand Down
Loading

0 comments on commit 91eb73c

Please sign in to comment.