Skip to content

Commit

Permalink
[FLINK-36451][runtime] Adds Async suffix to method names
Browse files Browse the repository at this point in the history
  • Loading branch information
XComp committed Dec 3, 2024
1 parent bac9035 commit a045cf0
Show file tree
Hide file tree
Showing 19 changed files with 78 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ private void forwardConfirmLeaderSessionFuture(
.getLeaderAddressFuture()
.thenCompose(
leaderAddress ->
leaderElection.confirmLeadership(
leaderElection.confirmLeadershipAsync(
leaderSessionID, leaderAddress)));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -467,15 +467,15 @@ public void close() {
}

@Override
public CompletableFuture<Void> confirmLeadership(
public CompletableFuture<Void> confirmLeadershipAsync(
UUID leaderSessionID, String leaderAddress) {
checkNotNull(leaderSessionID);
checkNotNull(leaderAddress);
return confirmLeader(this, leaderSessionID, leaderAddress);
}

@Override
public CompletableFuture<Boolean> hasLeadership(UUID leaderSessionId) {
public CompletableFuture<Boolean> hasLeadershipAsync(UUID leaderSessionId) {
return CompletableFuture.completedFuture(
isLeader && leaderSessionId.equals(currentLeaderSessionId));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ private void confirmLeadership(
LOG.debug(
"Confirm leadership {}.",
leaderSessionId);
return leaderElection.confirmLeadership(
return leaderElection.confirmLeadershipAsync(
leaderSessionId, address);
},
"confirming leadership")
Expand Down Expand Up @@ -487,7 +487,7 @@ private CompletableFuture<Void> runIfValidLeader(
synchronized (lock) {
if (isRunning() && leaderElection != null) {
return leaderElection
.hasLeadership(expectedLeaderId)
.hasLeadershipAsync(expectedLeaderId)
.thenAccept(
hasLeadership -> {
synchronized (lock) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,14 @@ public void startLeaderElection(LeaderContender contender) throws Exception {
}

@Override
public CompletableFuture<Void> confirmLeadership(UUID leaderSessionID, String leaderAddress) {
return parentService.confirmLeadership(componentId, leaderSessionID, leaderAddress);
public CompletableFuture<Void> confirmLeadershipAsync(
UUID leaderSessionID, String leaderAddress) {
return parentService.confirmLeadershipAsync(componentId, leaderSessionID, leaderAddress);
}

@Override
public CompletableFuture<Boolean> hasLeadership(UUID leaderSessionId) {
return parentService.hasLeadership(componentId, leaderSessionId);
public CompletableFuture<Boolean> hasLeadershipAsync(UUID leaderSessionId) {
return parentService.hasLeadershipAsync(componentId, leaderSessionId);
}

@Override
Expand Down Expand Up @@ -82,7 +83,7 @@ abstract static class ParentService {
* the {@link LeaderContender} that is associated with the {@code componentId}. The
* information is only propagated to the HA backend if the leadership is still acquired.
*/
abstract CompletableFuture<Void> confirmLeadership(
abstract CompletableFuture<Void> confirmLeadershipAsync(
String componentId, UUID leaderSessionID, String leaderAddress);

/**
Expand All @@ -92,6 +93,7 @@ abstract CompletableFuture<Void> confirmLeadership(
* @return {@code true} if the service has leadership with the passed {@code
* leaderSessionID} acquired; {@code false} otherwise.
*/
abstract CompletableFuture<Boolean> hasLeadership(String componentId, UUID leaderSessionID);
abstract CompletableFuture<Boolean> hasLeadershipAsync(
String componentId, UUID leaderSessionID);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ public void close() throws Exception {
}

@Override
protected CompletableFuture<Void> confirmLeadership(
protected CompletableFuture<Void> confirmLeadershipAsync(
String componentId, UUID leaderSessionID, String leaderAddress) {
Preconditions.checkArgument(leaderContenderRegistry.containsKey(componentId));
LOG.debug(
Expand Down Expand Up @@ -365,7 +365,8 @@ protected CompletableFuture<Void> confirmLeadership(
}

@Override
protected CompletableFuture<Boolean> hasLeadership(String componentId, UUID leaderSessionId) {
protected CompletableFuture<Boolean> hasLeadershipAsync(
String componentId, UUID leaderSessionId) {
return CompletableFuture.supplyAsync(
() -> {
synchronized (lock) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public interface LeaderElection extends AutoCloseable {
* @param leaderSessionID The new leader session ID
* @param leaderAddress The address of the new leader
*/
CompletableFuture<Void> confirmLeadership(UUID leaderSessionID, String leaderAddress);
CompletableFuture<Void> confirmLeadershipAsync(UUID leaderSessionID, String leaderAddress);

/**
* Returns {@code true} if the service's {@link LeaderContender} has the leadership under the
Expand All @@ -52,7 +52,7 @@ public interface LeaderElection extends AutoCloseable {
* @param leaderSessionId identifying the current leader
* @return true if the associated {@link LeaderContender} is the leader, otherwise false
*/
CompletableFuture<Boolean> hasLeadership(UUID leaderSessionId);
CompletableFuture<Boolean> hasLeadershipAsync(UUID leaderSessionId);

/**
* Closes the {@code LeaderElection} by deregistering the {@link LeaderContender} from the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@
* instantiate its own leader election service.
*
* <p>Once a contender has been granted leadership he has to confirm the received leader session ID
* by calling the method {@link LeaderElection#confirmLeadership(UUID, String)}. This will notify
* the leader election service, that the contender has accepted the leadership specified and that
* the leader session id as well as the leader address can now be published for leader retrieval
* services.
* by calling the method {@link LeaderElection#confirmLeadershipAsync(UUID, String)}. This will
* notify the leader election service, that the contender has accepted the leadership specified and
* that the leader session id as well as the leader address can now be published for leader
* retrieval services.
*/
public interface LeaderElectionService {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,13 @@ public void startLeaderElection(LeaderContender contender) throws Exception {
}

@Override
public CompletableFuture<Void> confirmLeadership(UUID leaderSessionID, String leaderAddress) {
public CompletableFuture<Void> confirmLeadershipAsync(
UUID leaderSessionID, String leaderAddress) {
return FutureUtils.completedVoidFuture();
}

@Override
public CompletableFuture<Boolean> hasLeadership(UUID leaderSessionId) {
public CompletableFuture<Boolean> hasLeadershipAsync(UUID leaderSessionId) {
synchronized (lock) {
return CompletableFuture.completedFuture(
this.leaderContender != null && this.sessionID.equals(leaderSessionId));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ private void startNewLeaderResourceManager(UUID newLeaderSessionID) throws Excep
.thenAcceptAsync(
(isStillLeader) -> {
if (isStillLeader) {
leaderElection.confirmLeadership(
leaderElection.confirmLeadershipAsync(
newLeaderSessionID, newLeaderResourceManager.getAddress());
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1213,7 +1213,7 @@ public void grantLeadership(final UUID leaderSessionID) {
"{} was granted leadership with leaderSessionID={}",
getRestBaseUrl(),
leaderSessionID);
leaderElection.confirmLeadership(leaderSessionID, getRestBaseUrl());
leaderElection.confirmLeadershipAsync(leaderSessionID, getRestBaseUrl());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ public void grantLeadership_oldLeader_doesNotConfirmLeaderSession() throws Excep
// complete the confirmation future after losing the leadership
contenderConfirmationFuture.complete("leader address");

assertThat(leaderElection.hasLeadership(leaderSessionId).get(), is(false));
assertThat(leaderElection.hasLeadershipAsync(leaderSessionId).get(), is(false));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ private void runLeaderRetrievalTest(

final UUID leaderId = leaderContender.getLeaderSessionFuture().get();

leaderElection.confirmLeadership(leaderId, ADDRESS).get();
leaderElection.confirmLeadershipAsync(leaderId, ADDRESS).get();

final LeaderInformation leaderInformation =
leaderRetrievalListener.getLeaderInformationFuture().get();
Expand Down Expand Up @@ -172,20 +172,20 @@ public void testConcurrentLeadershipOperations() throws Exception {

final UUID oldLeaderSessionId = leaderContender.getLeaderSessionFuture().get();

assertThat(leaderElection.hasLeadership(oldLeaderSessionId).get(), is(true));
assertThat(leaderElection.hasLeadershipAsync(oldLeaderSessionId).get(), is(true));

embeddedHaServices.getDispatcherLeaderService().revokeLeadership().get();
assertThat(leaderElection.hasLeadership(oldLeaderSessionId).get(), is(false));
assertThat(leaderElection.hasLeadershipAsync(oldLeaderSessionId).get(), is(false));

embeddedHaServices.getDispatcherLeaderService().grantLeadership();
final UUID newLeaderSessionId = leaderContender.getLeaderSessionFuture().get();

assertThat(leaderElection.hasLeadership(newLeaderSessionId).get(), is(true));
assertThat(leaderElection.hasLeadershipAsync(newLeaderSessionId).get(), is(true));

leaderElection.confirmLeadership(oldLeaderSessionId, ADDRESS).get();
leaderElection.confirmLeadership(newLeaderSessionId, ADDRESS).get();
leaderElection.confirmLeadershipAsync(oldLeaderSessionId, ADDRESS).get();
leaderElection.confirmLeadershipAsync(newLeaderSessionId, ADDRESS).get();

assertThat(leaderElection.hasLeadership(newLeaderSessionId).get(), is(true));
assertThat(leaderElection.hasLeadershipAsync(newLeaderSessionId).get(), is(true));

leaderContender.tryRethrowException();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,7 @@ void testAllLeaderInformationChangeEventWithUnknownComponentId() throws Exceptio
}

@Test
void testHasLeadershipWithLeadershipButNoGrantEventProcessed() throws Exception {
void testHasLeadershipAsyncWithLeadershipButNoGrantEventProcessed() throws Exception {
new Context() {
{
runTestWithManuallyTriggeredEvents(
Expand All @@ -714,10 +714,10 @@ void testHasLeadershipWithLeadershipButNoGrantEventProcessed() throws Exception
applyToBothContenderContexts(
ctx -> {
final CompletableFuture<Boolean> validSessionFuture =
leaderElectionService.hasLeadership(
leaderElectionService.hasLeadershipAsync(
ctx.componentId, expectedSessionID);
final CompletableFuture<Boolean> invalidSessionFuture =
leaderElectionService.hasLeadership(
leaderElectionService.hasLeadershipAsync(
ctx.componentId, UUID.randomUUID());
executorService.triggerAll();

Expand All @@ -734,7 +734,7 @@ void testHasLeadershipWithLeadershipButNoGrantEventProcessed() throws Exception
}

@Test
void testHasLeadershipWithLeadershipAndGrantEventProcessed() throws Exception {
void testHasLeadershipAsyncWithLeadershipAndGrantEventProcessed() throws Exception {
new Context() {
{
runTestWithManuallyTriggeredEvents(
Expand All @@ -753,10 +753,10 @@ void testHasLeadershipWithLeadershipAndGrantEventProcessed() throws Exception {
.isEqualTo(expectedSessionID);

final CompletableFuture<Boolean> validSessionFuture =
leaderElectionService.hasLeadership(
leaderElectionService.hasLeadershipAsync(
ctx.componentId, expectedSessionID);
final CompletableFuture<Boolean> invalidSessionFuture =
leaderElectionService.hasLeadership(
leaderElectionService.hasLeadershipAsync(
ctx.componentId, UUID.randomUUID());
executorService.triggerAll();

Expand All @@ -773,7 +773,7 @@ void testHasLeadershipWithLeadershipAndGrantEventProcessed() throws Exception {
}

@Test
void testHasLeadershipWithLeadershipLostButNoRevokeEventProcessed() throws Exception {
void testHasLeadershipAsyncWithLeadershipLostButNoRevokeEventProcessed() throws Exception {
new Context() {
{
runTestWithManuallyTriggeredEvents(
Expand All @@ -787,10 +787,10 @@ void testHasLeadershipWithLeadershipLostButNoRevokeEventProcessed() throws Excep
applyToBothContenderContexts(
ctx -> {
final CompletableFuture<Boolean> validSessionFuture =
leaderElectionService.hasLeadership(
leaderElectionService.hasLeadershipAsync(
ctx.componentId, expectedSessionID);
final CompletableFuture<Boolean> invalidSessionFuture =
leaderElectionService.hasLeadership(
leaderElectionService.hasLeadershipAsync(
ctx.componentId, UUID.randomUUID());

executorService.triggerAll();
Expand All @@ -814,7 +814,7 @@ void testHasLeadershipWithLeadershipLostButNoRevokeEventProcessed() throws Excep
}

@Test
void testHasLeadershipWithLeadershipLostAndRevokeEventProcessed() throws Exception {
void testHasLeadershipAsyncWithLeadershipLostAndRevokeEventProcessed() throws Exception {
new Context() {
{
runTestWithSynchronousEventHandling(
Expand All @@ -826,12 +826,12 @@ void testHasLeadershipWithLeadershipLostAndRevokeEventProcessed() throws Excepti
applyToBothContenderContexts(
ctx -> {
assertThatFuture(
leaderElectionService.hasLeadership(
leaderElectionService.hasLeadershipAsync(
ctx.componentId, expectedSessionID))
.eventuallySucceeds()
.isEqualTo(false);
assertThatFuture(
leaderElectionService.hasLeadership(
leaderElectionService.hasLeadershipAsync(
ctx.componentId, UUID.randomUUID()))
.eventuallySucceeds()
.isEqualTo(false);
Expand All @@ -842,7 +842,7 @@ void testHasLeadershipWithLeadershipLostAndRevokeEventProcessed() throws Excepti
}

@Test
void testHasLeadershipAfterLeaderElectionClose() throws Exception {
void testHasLeadershipAsyncAfterLeaderElectionClose() throws Exception {
new Context() {
{
runTestWithSynchronousEventHandling(
Expand All @@ -855,7 +855,7 @@ void testHasLeadershipAfterLeaderElectionClose() throws Exception {
ctx.leaderElection.close();

assertThatFuture(
leaderElectionService.hasLeadership(
leaderElectionService.hasLeadershipAsync(
ctx.componentId, expectedSessionID))
.eventuallySucceeds()
.isEqualTo(false);
Expand Down Expand Up @@ -1061,7 +1061,7 @@ void testOldConfirmLeaderInformationWhileHavingNewLeadership() throws Exception
.hasValue(expectedLeaderInformation);

// Old confirm call should be ignored.
ctx.leaderElection.confirmLeadership(
ctx.leaderElection.confirmLeadershipAsync(
UUID.randomUUID(), ctx.address);
assertThat(
leaderElectionService.getLeaderSessionID(
Expand Down Expand Up @@ -1092,7 +1092,7 @@ void testOldConfirmationWhileHavingLeadershipLost() throws Exception {
applyToBothContenderContexts(
ctx -> {
// Old confirm call should be ignored.
ctx.leaderElection.confirmLeadership(
ctx.leaderElection.confirmLeadershipAsync(
currentLeaderSessionId, ctx.address);

assertThat(
Expand Down Expand Up @@ -1316,7 +1316,7 @@ void testNestedDeadlockInLeadershipConfirmation() throws Exception {
revocationFuture = CompletableFuture.runAsync(testInstance::onRevokeLeadership);
contenderLockAcquireLatch.await();
confirmLeadershipFuture =
leaderElection.confirmLeadership(leaderSessionId, "random-address");
leaderElection.confirmLeadershipAsync(leaderSessionId, "random-address");
}

assertThatFuture(revocationFuture).eventuallySucceeds();
Expand Down
Loading

0 comments on commit a045cf0

Please sign in to comment.