From fbfa5b8ef54fe53a57dfa71435c5cc715c6badb0 Mon Sep 17 00:00:00 2001 From: Matthias Pohl Date: Mon, 25 Mar 2024 11:33:13 +0100 Subject: [PATCH] [FLINK-???][runtime] Reports JobNotFinishedException on JobMaster stop and keeps any close failure as a suppressed exception --- .../JobMasterServiceLeadershipRunner.java | 13 +++++++++++-- .../JobMasterServiceLeadershipRunnerTest.java | 19 +++++++++++++------ 2 files changed, 24 insertions(+), 8 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java index ed97e26a804045..34d17693f8a774 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunner.java @@ -436,8 +436,17 @@ private CompletableFuture stopJobMasterServiceProcess() { return jobMasterServiceProcess .closeAsync() - .exceptionally( - t -> handleAsyncOperationError(t, "Could not suspend the JobMaster.")); + .handle( + (ignored, t) -> { + final JobNotFinishedException error = + new JobNotFinishedException(getJobID()); + if (t != null) { + error.addSuppressed(ExceptionUtils.stripCompletionException(t)); + } + + return handleAsyncOperationError( + error, "Could not stop the JobMaster."); + }); } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java index a088d3f32c1463..e25ad5751a2689 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterServiceLeadershipRunnerTest.java @@ -214,7 +214,8 @@ void testExceptionallyCompletedResultFutureFromJobMasterServiceProcessIsForwarde 
assertThatFuture(jobManagerRunner.getResultFuture()) .eventuallyFailsWith(ExecutionException.class) - .withCause(cause); + .havingRootCause() + .isEqualTo(cause); } @Test @@ -608,6 +609,9 @@ void testJobStatusCancellingIsClearedOnLeadershipLoss() throws Exception { @Test void testJobMasterServiceProcessClosingExceptionIsForwardedToResultFuture() throws Exception { final CompletableFuture terminationFuture = new CompletableFuture<>(); + final FlinkException testException = new FlinkException("Test exception"); + terminationFuture.completeExceptionally(testException); + final JobMasterServiceLeadershipRunner jobManagerRunner = newJobMasterServiceLeadershipRunnerBuilder() .withSingleJobMasterServiceProcess( @@ -621,12 +625,13 @@ void testJobMasterServiceProcessClosingExceptionIsForwardedToResultFuture() thro leaderElection.isLeader(UUID.randomUUID()); leaderElection.notLeader(); - final FlinkException testException = new FlinkException("Test exception"); - terminationFuture.completeExceptionally(testException); - assertThatFuture(jobManagerRunner.getResultFuture()) .eventuallyFailsWith(ExecutionException.class) - .satisfies(cause -> assertThat(cause).hasRootCause(testException)); + .havingRootCause() + .isInstanceOf(JobNotFinishedException.class) + .satisfies( + rootCause -> + assertThat(rootCause.getSuppressed()).containsOnly(testException)); } @Test @@ -649,7 +654,9 @@ void testJobMasterServiceProcessCreationFailureIsForwardedToResultFuture() throw assertThatFuture(jobManagerRunner.getResultFuture()) .eventuallyFailsWith(ExecutionException.class) - .satisfies(cause -> assertThat(cause).hasRootCause(testException)); + .havingRootCause() + .isEqualTo(testException) + .withNoCause(); } @Test