Skip to content

Commit

Permalink
[FLINK-36451][runtime] Adds runAsLeader method to LeaderElection inte…
Browse files Browse the repository at this point in the history
…rface
  • Loading branch information
XComp committed Nov 22, 2024
1 parent b48ccce commit dd670a7
Show file tree
Hide file tree
Showing 10 changed files with 197 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.leaderelection.LeaderContender;
import org.apache.flink.runtime.leaderelection.LeaderElection;
import org.apache.flink.runtime.leaderelection.LeaderElectionUtils;
import org.apache.flink.runtime.leaderelection.LeadershipLostException;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.ThrowingRunnable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -476,6 +479,15 @@ public boolean hasLeadership(UUID leaderSessionId) {
return isLeader && leaderSessionId.equals(currentLeaderSessionId);
}

@Override
public CompletableFuture<Void> runAsLeader(
UUID leaderSessionId, ThrowingRunnable<? extends Throwable> callback) {
return LeaderElectionUtils.runOrFailWithLeadershipLostException(
leaderSessionId,
callback,
sessionId -> isLeader && sessionId.equals(currentLeaderSessionId));
}

void shutdown(Exception cause) {
if (running) {
running = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
package org.apache.flink.runtime.leaderelection;

import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.ThrowingRunnable;

import java.util.UUID;
import java.util.concurrent.CompletableFuture;

/**
* {@code DefaultLeaderElection} implements the {@link LeaderElection} based on the {@link
Expand Down Expand Up @@ -52,6 +54,12 @@ public boolean hasLeadership(UUID leaderSessionId) {
return parentService.hasLeadership(componentId, leaderSessionId);
}

@Override
public CompletableFuture<Void> runAsLeader(
UUID leaderSessionId, ThrowingRunnable<? extends Throwable> callback) {
return parentService.runAsLeader(componentId, leaderSessionId, callback);
}

@Override
public void close() throws Exception {
parentService.remove(componentId);
Expand Down Expand Up @@ -91,5 +99,19 @@ abstract void confirmLeadership(
* leaderSessionID} acquired; {@code false} otherwise.
*/
abstract boolean hasLeadership(String componentId, UUID leaderSessionID);

/**
* Runs passed {@code callback} in the leadership main thread if the leadership is still
* valid or returns a future that failed with {@link LeadershipLostException} otherwise.
*
* @param componentId The id of the component that triggered the callback.
* @param leaderSessionId The session id the callback is associated with.
* @param callback The method that shall be executed if being the leader.
* @return The callback's result.
*/
abstract CompletableFuture<Void> runAsLeader(
String componentId,
UUID leaderSessionId,
ThrowingRunnable<? extends Throwable> callback);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.ThrowingRunnable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -38,6 +39,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static org.apache.flink.runtime.leaderelection.LeaderElectionUtils.runAsyncOrFailWithLeadershipLostException;
import static org.apache.flink.util.Preconditions.checkNotNull;

/**
Expand Down Expand Up @@ -380,6 +382,24 @@ protected boolean hasLeadership(String componentId, UUID leaderSessionId) {
}
}

@Override
CompletableFuture<Void> runAsLeader(
String componentId,
UUID leaderSessionId,
ThrowingRunnable<? extends Throwable> callback) {
synchronized (lock) {
return runAsyncOrFailWithLeadershipLostException(
leaderSessionId,
callback,
sessionId ->
leaderElectionDriver != null
&& leaderContenderRegistry.containsKey(componentId)
&& leaderElectionDriver.hasLeadership()
&& sessionId.equals(leaderSessionId),
leadershipOperationExecutor);
}
}

/**
* Returns the current leader session ID for the given {@code componentId} or {@code null}, if
* the session wasn't confirmed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@

package org.apache.flink.runtime.leaderelection;

import org.apache.flink.util.function.ThrowingRunnable;

import java.util.UUID;
import java.util.concurrent.CompletableFuture;

/**
* {@code LeaderElection} serves as a proxy between {@code LeaderElectionService} and {@link
Expand Down Expand Up @@ -51,6 +54,19 @@ public interface LeaderElection extends AutoCloseable {
*/
boolean hasLeadership(UUID leaderSessionId);

/**
* Runs the given {@code callback} on the leader election's main thread only if the associated
* {@code leaderSessionId} is still valid.
*
* @param leaderSessionId The session ID that's associated with the given {@code callback}.
* @param callback The callback that shall be executed as a leader.
* @return The future referring to the result of the callback operation. This future would
* complete exceptionally with a {@link LeadershipLostException} if the leadership wasn't
* active anymore.
*/
CompletableFuture<Void> runAsLeader(
UUID leaderSessionId, ThrowingRunnable<? extends Throwable> callback);

/**
* Closes the {@code LeaderElection} by deregistering the {@link LeaderContender} from the
* underlying leader election. {@link LeaderContender#revokeLeadership()} will be called if the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,17 @@
package org.apache.flink.runtime.leaderelection;

import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.Executors;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.RunnableWithException;
import org.apache.flink.util.function.ThrowingRunnable;

import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.function.Supplier;

/** {@code LeaderElectionUtils} collects helper methods to handle LeaderElection-related issues. */
public class LeaderElectionUtils {
Expand All @@ -42,4 +51,32 @@ public static String convertToString(UUID sessionId, String address) {
"%s@%s",
Preconditions.checkNotNull(sessionId), Preconditions.checkNotNull(address));
}

public static CompletableFuture<Void> runOrFailWithLeadershipLostException(
UUID leaderSessionId,
ThrowingRunnable<? extends Throwable> callback,
Function<UUID, Boolean> isLeader) {
return runAsyncOrFailWithLeadershipLostException(
leaderSessionId, callback, isLeader, Executors.directExecutor());
}

public static CompletableFuture<Void> runAsyncOrFailWithLeadershipLostException(
UUID leaderSessionId,
ThrowingRunnable<? extends Throwable> callback,
Function<UUID, Boolean> isLeader,
Executor executor) {
if (!isLeader.apply(leaderSessionId)) {
return FutureUtils.completedExceptionally(new LeadershipLostException(leaderSessionId));
}

return CompletableFuture.runAsync(
() -> {
try {
callback.run();
} catch (Throwable e) {
throw new CompletionException(e);
}
},
executor);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,17 @@

package org.apache.flink.runtime.leaderelection;

import java.util.UUID;

/** This exception is used in the scenario that the leadership is lost. */
public class LeadershipLostException extends LeaderElectionException {

private static final long serialVersionUID = 1L;

public LeadershipLostException(UUID leaderSessionId) {
this(String.format("The leadership with session ID %s has been lost.", leaderSessionId));
}

public LeadershipLostException(String message) {
super(message);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,15 @@
package org.apache.flink.runtime.leaderelection;

import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.ThrowingRunnable;

import javax.annotation.Nullable;

import java.util.UUID;
import java.util.concurrent.CompletableFuture;

import static org.apache.flink.runtime.leaderelection.LeaderElectionUtils.runOrFailWithLeadershipLostException;

/**
* {@code StandaloneLeaderElection} implements {@link LeaderElection} for non-HA cases. This
Expand Down Expand Up @@ -62,6 +67,18 @@ public boolean hasLeadership(UUID leaderSessionId) {
}
}

@Override
public CompletableFuture<Void> runAsLeader(
UUID leaderSessionId, ThrowingRunnable<? extends Throwable> callback) {
synchronized (lock) {
return runOrFailWithLeadershipLostException(
leaderSessionId,
callback,
sessionId ->
this.leaderContender != null && this.sessionID.equals(leaderSessionId));
}
}

@Override
public void close() throws Exception {
synchronized (lock) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
package org.apache.flink.runtime.leaderelection;

import org.apache.flink.util.function.BiConsumerWithException;
import org.apache.flink.util.function.ThrowingRunnable;
import org.apache.flink.util.function.TriConsumer;
import org.apache.flink.util.function.TriFunction;

import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -194,18 +196,31 @@ private static class TestingAbstractLeaderElectionService
private final Consumer<String> removeConsumer;
private final TriConsumer<String, UUID, String> confirmLeadershipConsumer;
private final BiFunction<String, UUID, Boolean> hasLeadershipFunction;
private final TriFunction<
String,
UUID,
ThrowingRunnable<? extends Throwable>,
CompletableFuture<Void>>
runAsLeaderCallback;

private TestingAbstractLeaderElectionService(
BiConsumerWithException<String, LeaderContender, Exception> registerConsumer,
Consumer<String> removeConsumer,
TriConsumer<String, UUID, String> confirmLeadershipConsumer,
BiFunction<String, UUID, Boolean> hasLeadershipFunction) {
BiFunction<String, UUID, Boolean> hasLeadershipFunction,
TriFunction<
String,
UUID,
ThrowingRunnable<? extends Throwable>,
CompletableFuture<Void>>
runAsLeaderCallback) {
super();

this.registerConsumer = registerConsumer;
this.removeConsumer = removeConsumer;
this.confirmLeadershipConsumer = confirmLeadershipConsumer;
this.hasLeadershipFunction = hasLeadershipFunction;
this.runAsLeaderCallback = runAsLeaderCallback;
}

@Override
Expand All @@ -229,6 +244,14 @@ protected boolean hasLeadership(String componentId, UUID leaderSessionId) {
return hasLeadershipFunction.apply(componentId, leaderSessionId);
}

@Override
CompletableFuture<Void> runAsLeader(
String componentId,
UUID leaderSessionId,
ThrowingRunnable<? extends Throwable> callback) {
return runAsLeaderCallback.apply(componentId, leaderSessionId, callback);
}

public static Builder newBuilder() {
return new Builder()
.setRegisterConsumer(
Expand All @@ -254,6 +277,16 @@ private static class Builder {
private Consumer<String> removeConsumer;
private TriConsumer<String, UUID, String> confirmLeadershipConsumer;
private BiFunction<String, UUID, Boolean> hasLeadershipFunction;
private TriFunction<
String,
UUID,
ThrowingRunnable<? extends Throwable>,
CompletableFuture<Void>>
runAsLeaderCallback =
(ignoredComponentId, ignoredLeaderSessionId, ignoredCallback) -> {
throw new UnsupportedOperationException(
"runAsLeader not supported");
};

private Builder() {}

Expand All @@ -280,12 +313,24 @@ public Builder setHasLeadershipFunction(
return this;
}

public Builder setRunAsLeaderCallback(
TriFunction<
String,
UUID,
ThrowingRunnable<? extends Throwable>,
CompletableFuture<Void>>
runAsLeaderCallback) {
this.runAsLeaderCallback = runAsLeaderCallback;
return this;
}

public TestingAbstractLeaderElectionService build() {
return new TestingAbstractLeaderElectionService(
registerConsumer,
removeConsumer,
confirmLeadershipConsumer,
hasLeadershipFunction);
hasLeadershipFunction,
runAsLeaderCallback);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
package org.apache.flink.runtime.leaderelection;

import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.Executors;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.ThrowingRunnable;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -80,6 +83,15 @@ private boolean hasLeadership() {
return issuedLeaderSessionId != null;
}

@Override
public synchronized CompletableFuture<Void> runAsLeader(
UUID leaderSessionId, ThrowingRunnable<? extends Throwable> callback) {
return LeaderElectionUtils.runOrFailWithLeadershipLostException(
leaderSessionId,
callback,
sessionId -> hasLeadership() && leaderSessionId.equals(issuedLeaderSessionId));
}

@Override
public synchronized void close() {
if (hasLeadership() && this.contender != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.util.ConfigurationException;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.ThrowingRunnable;

import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;

Expand Down Expand Up @@ -100,6 +102,12 @@ public boolean hasLeadership(UUID leaderSessionId) {
return false;
}

@Override
public CompletableFuture<Void> runAsLeader(
UUID leaderSessionId, ThrowingRunnable<? extends Throwable> callback) {
return FutureUtils.completedVoidFuture();
}

@Override
public void close() {}
}
Expand Down

0 comments on commit dd670a7

Please sign in to comment.