Skip to content

Commit

Permalink
[FLINK-???][runtime] Does proper error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
XComp committed Mar 25, 2024
1 parent 2db44f0 commit fbfa5b8
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -436,8 +436,17 @@ private CompletableFuture<Void> stopJobMasterServiceProcess() {

return jobMasterServiceProcess
.closeAsync()
.exceptionally(
t -> handleAsyncOperationError(t, "Could not suspend the JobMaster."));
.handle(
(ignored, t) -> {
final JobNotFinishedException error =
new JobNotFinishedException(getJobID());
if (t != null) {
error.addSuppressed(ExceptionUtils.stripCompletionException(t));
}

return handleAsyncOperationError(
error, "Could not stop the JobMaster.");
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,8 @@ void testExceptionallyCompletedResultFutureFromJobMasterServiceProcessIsForwarde

assertThatFuture(jobManagerRunner.getResultFuture())
.eventuallyFailsWith(ExecutionException.class)
.withCause(cause);
.havingRootCause()
.isEqualTo(cause);
}

@Test
Expand Down Expand Up @@ -608,6 +609,9 @@ void testJobStatusCancellingIsClearedOnLeadershipLoss() throws Exception {
@Test
void testJobMasterServiceProcessClosingExceptionIsForwardedToResultFuture() throws Exception {
final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
final FlinkException testException = new FlinkException("Test exception");
terminationFuture.completeExceptionally(testException);

final JobMasterServiceLeadershipRunner jobManagerRunner =
newJobMasterServiceLeadershipRunnerBuilder()
.withSingleJobMasterServiceProcess(
Expand All @@ -621,12 +625,13 @@ void testJobMasterServiceProcessClosingExceptionIsForwardedToResultFuture() thro
leaderElection.isLeader(UUID.randomUUID());
leaderElection.notLeader();

final FlinkException testException = new FlinkException("Test exception");
terminationFuture.completeExceptionally(testException);

assertThatFuture(jobManagerRunner.getResultFuture())
.eventuallyFailsWith(ExecutionException.class)
.satisfies(cause -> assertThat(cause).hasRootCause(testException));
.havingRootCause()
.isInstanceOf(JobNotFinishedException.class)
.satisfies(
rootCause ->
assertThat(rootCause.getSuppressed()).containsOnly(testException));
}

@Test
Expand All @@ -649,7 +654,9 @@ void testJobMasterServiceProcessCreationFailureIsForwardedToResultFuture() throw

assertThatFuture(jobManagerRunner.getResultFuture())
.eventuallyFailsWith(ExecutionException.class)
.satisfies(cause -> assertThat(cause).hasRootCause(testException));
.havingRootCause()
.isEqualTo(testException)
.withNoCause();
}

@Test
Expand Down

0 comments on commit fbfa5b8

Please sign in to comment.