Skip to content

Commit

Permalink
[hotfix][runtime] JobResultStore#hasJobResultEntryAsync does not need…
Browse files Browse the repository at this point in the history
… to be triggered as a leader since it's read access only
  • Loading branch information
XComp committed Dec 9, 2024
1 parent 24087b4 commit a73915a
Showing 1 changed file with 4 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.flink.runtime.jobmaster.factories.JobMasterServiceProcessFactory;
import org.apache.flink.runtime.leaderelection.LeaderContender;
import org.apache.flink.runtime.leaderelection.LeaderElection;
import org.apache.flink.runtime.leaderelection.LeadershipLostException;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
Expand Down Expand Up @@ -256,34 +255,17 @@ private void startJobMasterServiceProcessAsync(UUID leaderSessionId) {
sequentialOperation =
sequentialOperation.thenCompose(
unused ->
supplyAsyncIfValidLeader(
leaderSessionId,
() ->
jobResultStore.hasJobResultEntryAsync(
getJobID()),
() ->
FutureUtils.completedExceptionally(
new LeadershipLostException(
"The leadership is lost.")))
.handle(
(hasJobResult, throwable) -> {
if (throwable
instanceof LeadershipLostException) {
printLogIfNotValidLeader(
"verify job result entry",
leaderSessionId);
return null;
} else if (throwable != null) {
ExceptionUtils.rethrow(throwable);
}
jobResultStore
.hasJobResultEntryAsync(getJobID())
.thenAccept(
hasJobResult -> {
if (hasJobResult) {
handleJobAlreadyDoneIfValidLeader(
leaderSessionId);
} else {
createNewJobMasterServiceProcessIfValidLeader(
leaderSessionId);
}
return null;
}));
handleAsyncOperationError(sequentialOperation, "Could not start the job manager.");
}
Expand Down Expand Up @@ -517,19 +499,6 @@ private void runIfValidLeader(
noLeaderFallbackCommandDescription, expectedLeaderId));
}

private <T> CompletableFuture<T> supplyAsyncIfValidLeader(
UUID expectedLeaderId,
Supplier<CompletableFuture<T>> supplier,
Supplier<CompletableFuture<T>> noLeaderFallback) {
final CompletableFuture<T> resultFuture = new CompletableFuture<>();
runIfValidLeader(
expectedLeaderId,
() -> FutureUtils.forward(supplier.get(), resultFuture),
() -> FutureUtils.forward(noLeaderFallback.get(), resultFuture));

return resultFuture;
}

@GuardedBy("lock")
private boolean isValidLeader(UUID expectedLeaderId) {
return isRunning()
Expand Down

0 comments on commit a73915a

Please sign in to comment.