Skip to content

Commit

Permalink
spotless:apply
Browse files Browse the repository at this point in the history
  • Loading branch information
XComp committed Nov 27, 2024
1 parent 45e1737 commit 87ef172
Show file tree
Hide file tree
Showing 12 changed files with 256 additions and 286 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,8 @@ private void forwardConfirmLeaderSessionFuture(
.thenCompose(
leaderAddress ->
leaderElection
.confirmLeadershipAsLeader(leaderSessionID, leaderAddress)
.confirmLeadershipAsLeader(
leaderSessionID, leaderAddress)
.exceptionally(
error -> {
if (error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,9 @@ private void runAsLeader(
ThrowingRunnable<? extends Throwable> runnable)
throws LeadershipLostException {
synchronized (lock) {
if (embeddedLeaderElection.running && !shutdown && embeddedLeaderElection.isLeader
if (embeddedLeaderElection.running
&& !shutdown
&& embeddedLeaderElection.isLeader
&& currentLeaderSessionId.equals(leaderSessionId)) {
try {
runnable.run();
Expand Down Expand Up @@ -473,8 +475,7 @@ public void close() {
}

@Override
public void confirmLeadership(
UUID leaderSessionID, String leaderAddress) {
public void confirmLeadership(UUID leaderSessionID, String leaderAddress) {
checkNotNull(leaderSessionID);
checkNotNull(leaderAddress);
confirmLeader(this, leaderSessionID, leaderAddress);
Expand Down Expand Up @@ -531,10 +532,8 @@ public void shutdown(Exception cause) {

private static class NotifyOfLeaderCall implements Runnable {

@Nullable
private final String address; // null if leader revoked without new leader
@Nullable
private final UUID leaderSessionId; // null if leader revoked without new leader
@Nullable private final String address; // null if leader revoked without new leader
@Nullable private final UUID leaderSessionId; // null if leader revoked without new leader

private final LeaderRetrievalListener listener;
private final Logger logger;
Expand Down Expand Up @@ -590,8 +589,7 @@ public void run() {

private static class RevokeLeadershipCall implements Runnable {

@Nonnull
private final LeaderContender contender;
@Nonnull private final LeaderContender contender;

RevokeLeadershipCall(@Nonnull LeaderContender contender) {
this.contender = contender;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,10 +257,10 @@ private void startJobMasterServiceProcessAsync(UUID leaderSessionId) {
sequentialOperation.thenCompose(
unused ->
supplyAsyncAsLeader(
leaderSessionId,
() ->
jobResultStore.hasJobResultEntryAsync(
getJobID()))
leaderSessionId,
() ->
jobResultStore.hasJobResultEntryAsync(
getJobID()))
.exceptionally(
logLeadershipLostExceptionAndReturnNull(
"verify job result entry",
Expand Down Expand Up @@ -367,10 +367,13 @@ private void confirmLeadership(
UUID leaderSessionId, CompletableFuture<String> leaderAddressFuture) {
FutureUtils.assertNoException(
leaderAddressFuture.thenCompose(
address -> runAsyncAsLeaderAndLogLeadershipLoss(
leaderSessionId,
() -> leaderElection.confirmLeadership(leaderSessionId, address),
"confirming leadership")));
address ->
runAsyncAsLeaderAndLogLeadershipLoss(
leaderSessionId,
() ->
leaderElection.confirmLeadership(
leaderSessionId, address),
"confirming leadership")));
}

private void forwardResultFuture(
Expand Down Expand Up @@ -423,8 +426,8 @@ private void stopJobMasterServiceProcessAsync() {
sequentialOperation.thenCompose(
ignored ->
callIfRunning(
this::stopJobMasterServiceProcess,
"stop leading JobMasterServiceProcess")
this::stopJobMasterServiceProcess,
"stop leading JobMasterServiceProcess")
.orElse(FutureUtils.completedVoidFuture()));

handleAsyncOperationError(sequentialOperation, "Could not suspend the job manager.");
Expand Down Expand Up @@ -519,8 +522,8 @@ private <T> CompletableFuture<T> supplyAsyncAsLeader(
final CompletableFuture<T> resultFuture = new CompletableFuture<>();
FutureUtils.assertNoException(
runAsyncAsLeader(
expectedLeaderId,
() -> FutureUtils.forward(supplier.get(), resultFuture))
expectedLeaderId,
() -> FutureUtils.forward(supplier.get(), resultFuture))
.exceptionally(
t -> {
resultFuture.completeExceptionally(t);
Expand All @@ -531,8 +534,9 @@ private <T> CompletableFuture<T> supplyAsyncAsLeader(
}

private CompletableFuture<Void> runAsyncAsLeader(UUID expectedLeaderId, Runnable action) {
return leaderElection
.runAsLeader(expectedLeaderId, () -> {
return leaderElection.runAsLeader(
expectedLeaderId,
() -> {
synchronized (lock) {
if (isRunning()) {
action.run();
Expand All @@ -541,22 +545,24 @@ private CompletableFuture<Void> runAsyncAsLeader(UUID expectedLeaderId, Runnable
});
}

private void forwardJobMasterGatewayFutureAsLeader(
UUID expectedLeaderId) {
jobMasterServiceProcess.getJobMasterGatewayFuture().whenComplete(
(t, throwable) ->
runAsyncAsLeaderAndLogLeadershipLoss(
expectedLeaderId,
() -> {
synchronized (lock) {
if (throwable != null) {
jobMasterGatewayFuture.completeExceptionally(throwable);
} else {
jobMasterGatewayFuture.complete(t);
}
}
},
"JobMasterGatewayFuture from JobMasterServiceProcess"));
private void forwardJobMasterGatewayFutureAsLeader(UUID expectedLeaderId) {
jobMasterServiceProcess
.getJobMasterGatewayFuture()
.whenComplete(
(t, throwable) ->
runAsyncAsLeaderAndLogLeadershipLoss(
expectedLeaderId,
() -> {
synchronized (lock) {
if (throwable != null) {
jobMasterGatewayFuture.completeExceptionally(
throwable);
} else {
jobMasterGatewayFuture.complete(t);
}
}
},
"JobMasterGatewayFuture from JobMasterServiceProcess"));
}

enum State {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* {@code DefaultLeaderElection} implements the {@link LeaderElection} based on the {@link
* ParentService}.
Expand Down Expand Up @@ -92,12 +90,11 @@ abstract static class ParentService {
*
* @param componentId The ID of the component for which the leadership is confirmed.
* @param leaderSessionID The session ID of the leadership the confirmation is associated
* with.
* with.
* @param leaderAddress The address of the confirmed leader.
*
* @return A future that completes successfully if the confirmation succeeded or fails
* exceptionally with a {@link LeadershipLostException} if the leadership was revoked in
* the meantime.
* exceptionally with a {@link LeadershipLostException} if the leadership was revoked in
* the meantime.
*/
abstract void confirmLeadership(
String componentId, UUID leaderSessionID, String leaderAddress);
Expand All @@ -110,7 +107,6 @@ abstract void confirmLeadership(
* @param componentId The ID of the component that triggered the callback.
* @param leaderSessionId The session ID the callback is associated with.
* @param callback The method that shall be executed if the leadership is still acquired
*
* @return The callback's result.
*/
abstract CompletableFuture<Void> runAsLeader(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,9 @@ public class DefaultLeaderElectionService extends DefaultLeaderElection.ParentSe
*/
private final ExecutorService leadershipOperationExecutor;
/**
* A future that's referring to the leadership operation thread retrieval which is triggered as part of the service initialization to retrieve "main thread" of the DefaultLeaderElectionService
* A future that's referring to the leadership operation thread retrieval which is triggered as
* part of the service initialization to retrieve "main thread" of the
* DefaultLeaderElectionService
*/
private final CompletableFuture<Thread> leadershipOperationThreadFuture;

Expand Down Expand Up @@ -146,9 +148,8 @@ public DefaultLeaderElectionService(
this.confirmedLeaderInformation = LeaderInformationRegister.empty();

this.leadershipOperationExecutor = Preconditions.checkNotNull(leadershipOperationExecutor);
this.leadershipOperationThreadFuture = CompletableFuture.supplyAsync(
Thread::currentThread,
leadershipOperationExecutor);
this.leadershipOperationThreadFuture =
CompletableFuture.supplyAsync(Thread::currentThread, leadershipOperationExecutor);

this.running = true;
}
Expand Down Expand Up @@ -185,7 +186,6 @@ private void createLeaderElectionDriver() throws Exception {
leaderElectionDriver == null,
"This DefaultLeaderElectionService cannot be reused. Calling startLeaderElectionBackend can only be called once to establish the connection to the HA backend.");


leaderElectionDriver = leaderElectionDriverFactory.create(this);

LOG.info(
Expand Down Expand Up @@ -356,9 +356,7 @@ protected void confirmLeadership(
LeaderInformation.known(leaderSessionID, leaderAddress);
confirmedLeaderInformation =
LeaderInformationRegister.merge(
confirmedLeaderInformation,
componentId,
newConfirmedLeaderInformation);
confirmedLeaderInformation, componentId, newConfirmedLeaderInformation);
leaderElectionDriver.publishLeaderInformation(
componentId, newConfirmedLeaderInformation);
}
Expand Down Expand Up @@ -402,8 +400,8 @@ public UUID getLeaderSessionID(String componentId) {
synchronized (lock) {
return leaderContenderRegistry.containsKey(componentId)
? confirmedLeaderInformation
.forComponentIdOrEmpty(componentId)
.getLeaderSessionID()
.forComponentIdOrEmpty(componentId)
.getLeaderSessionID()
: null;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,26 +44,22 @@ public interface LeaderElection extends AutoCloseable {
*
* @param leaderSessionID The new leader session ID
* @param leaderAddress The address of the new leader
*
* @return A future that completes successfully if the confirmation succeeded or fails
* exceptionally with a {@link LeadershipLostException} if the leadership was revoked in the
* meantime.
* exceptionally with a {@link LeadershipLostException} if the leadership was revoked in the
* meantime.
*/
default CompletableFuture<Void> confirmLeadershipAsLeader(
UUID leaderSessionID,
String leaderAddress) {
UUID leaderSessionID, String leaderAddress) {
checkNotNull(leaderSessionID);
checkNotNull(leaderAddress);

return runAsLeader(
leaderSessionID,
() -> confirmLeadership(
leaderSessionID,
leaderAddress));
leaderSessionID, () -> confirmLeadership(leaderSessionID, leaderAddress));
}

/**
* Runs the actual leadership confirmation. This method should only be called if the leadership is acquired. Otherwise, a {@link IllegalStateException} should be thrown.
* Runs the actual leadership confirmation. This method should only be called if the leadership
* is acquired. Otherwise, a {@link IllegalStateException} should be thrown.
*/
void confirmLeadership(UUID leaderSessionID, String leaderAddress);

Expand All @@ -73,10 +69,9 @@ default CompletableFuture<Void> confirmLeadershipAsLeader(
*
* @param leaderSessionId The session ID that's associated with the given {@code callback}.
* @param callback The callback that shall be executed as a leader.
*
* @return The future referring to the result of the callback operation. This future would
* complete exceptionally with a {@link LeadershipLostException} if the leadership wasn't
* active anymore.
* complete exceptionally with a {@link LeadershipLostException} if the leadership wasn't
* active anymore.
*/
CompletableFuture<Void> runAsLeader(
UUID leaderSessionId, ThrowingRunnable<? extends Throwable> callback);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@
* instantiate its own leader election service.
*
* <p>Once a contender has been granted leadership he has to confirm the received leader session ID
* by calling the method {@link LeaderElection#confirmLeadershipAsLeader(UUID, String)}. This will notify
* the leader election service, that the contender has accepted the leadership specified and that
* the leader session id as well as the leader address can now be published for leader retrieval
* services.
* by calling the method {@link LeaderElection#confirmLeadershipAsLeader(UUID, String)}. This will
* notify the leader election service, that the contender has accepted the leadership specified and
* that the leader session id as well as the leader address can now be published for leader
* retrieval services.
*/
public interface LeaderElectionService {

Expand Down
Loading

0 comments on commit 87ef172

Please sign in to comment.