Skip to content

Commit

Permalink
[FLINK-36451][runtime] Moves leader-related logic into leaderOperatio…
Browse files Browse the repository at this point in the history
…n executor

- introduces runAsLeader
- makes confirmLeadership use runAsLeader
- removes hasLeadership
  • Loading branch information
XComp committed Nov 28, 2024
1 parent 5a3b18e commit 5b77d8a
Show file tree
Hide file tree
Showing 22 changed files with 710 additions and 383 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.leaderelection.LeaderContender;
import org.apache.flink.runtime.leaderelection.LeaderElection;
import org.apache.flink.runtime.leaderelection.LeadershipLostException;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.concurrent.FutureUtils;
Expand All @@ -31,6 +32,7 @@
import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

/**
* Runner for the {@link org.apache.flink.runtime.dispatcher.Dispatcher} which is responsible for
Expand Down Expand Up @@ -176,13 +178,27 @@ private void forwardConfirmLeaderSessionFuture(
FutureUtils.assertNoException(
newDispatcherLeaderProcess
.getLeaderAddressFuture()
.thenAccept(
leaderAddress -> {
if (leaderElection.hasLeadership(leaderSessionID)) {
leaderElection.confirmLeadership(
leaderSessionID, leaderAddress);
}
}));
.thenCompose(
leaderAddress ->
leaderElection
.confirmLeadershipAsLeader(
leaderSessionID, leaderAddress)
.exceptionally(
error -> {
if (error
instanceof
LeadershipLostException) {
LOG.warn(
"Leadership couldn't be confirmed due to leadership loss.",
error);
return null;
}

// any other error is unexpected and
// should be passed down to the
// system-wide error handling
throw new CompletionException(error);
})));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.leaderelection.LeaderContender;
import org.apache.flink.runtime.leaderelection.LeaderElection;
import org.apache.flink.runtime.leaderelection.LeadershipLostException;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.ThrowingRunnable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -244,38 +246,45 @@ private void removeContender(EmbeddedLeaderElection embeddedLeaderElection) {
}

/** Callback from leader contenders when they confirm a leader grant. */
@GuardedBy("lock")
private void confirmLeader(
final EmbeddedLeaderElection embeddedLeaderElection,
final UUID leaderSessionId,
final String leaderAddress) {
synchronized (lock) {
// if the leader election was shut down in the meantime, ignore this confirmation
if (!embeddedLeaderElection.running || shutdown) {
return;
}

try {
// check if the confirmation is for the same grant, or whether it is a stale grant
if (embeddedLeaderElection == currentLeaderProposed
&& currentLeaderSessionId.equals(leaderSessionId)) {
LOG.info(
"Received confirmation of leadership for leader {} , session={}",
leaderAddress,
leaderSessionId);

// mark leadership
currentLeaderConfirmed = embeddedLeaderElection;
currentLeaderAddress = leaderAddress;
currentLeaderProposed = null;
Preconditions.checkState(
currentLeaderProposed == embeddedLeaderElection,
"The confirmLeader method should only be called when having the leadership acquired.");
LOG.info(
"Received confirmation of leadership for leader {} , session={}",
leaderAddress,
leaderSessionId);

// mark leadership
currentLeaderConfirmed = embeddedLeaderElection;
currentLeaderAddress = leaderAddress;
currentLeaderProposed = null;

// notify all listeners
notifyAllListeners(leaderAddress, leaderSessionId);
}

// notify all listeners
notifyAllListeners(leaderAddress, leaderSessionId);
} else {
LOG.debug(
"Received confirmation of leadership for a stale leadership grant. Ignoring.");
private void runAsLeader(
EmbeddedLeaderElection embeddedLeaderElection,
UUID leaderSessionId,
ThrowingRunnable<? extends Throwable> runnable)
throws LeadershipLostException {
synchronized (lock) {
if (embeddedLeaderElection.running
&& !shutdown
&& embeddedLeaderElection.isLeader
&& currentLeaderSessionId.equals(leaderSessionId)) {
try {
runnable.run();
} catch (Throwable t) {
fatalError(t);
}
} catch (Throwable t) {
fatalError(t);
} else {
throw new LeadershipLostException(leaderSessionId);
}
}
}
Expand Down Expand Up @@ -472,8 +481,15 @@ public void confirmLeadership(UUID leaderSessionID, String leaderAddress) {
}

@Override
public boolean hasLeadership(UUID leaderSessionId) {
return isLeader && leaderSessionId.equals(currentLeaderSessionId);
public CompletableFuture<Void> runAsLeader(
UUID leaderSessionId, ThrowingRunnable<? extends Throwable> callback) {
try {
EmbeddedLeaderService.this.runAsLeader(this, leaderSessionId, callback);
} catch (LeadershipLostException e) {
return FutureUtils.completedExceptionally(e);
}

return FutureUtils.completedVoidFuture();
}

void shutdown(Exception cause) {
Expand Down
Loading

0 comments on commit 5b77d8a

Please sign in to comment.