From 036741f3c912393289cff1d213c6e690629b212f Mon Sep 17 00:00:00 2001 From: Matthias Pohl Date: Fri, 23 Aug 2024 22:14:47 +0200 Subject: [PATCH] [FLINK-32688][runtime] Removes deprecated JobExceptionsInfo. --- .../handler/job/JobExceptionsHandler.java | 34 --- .../rest/messages/JobExceptionsInfo.java | 230 ------------------ .../JobExceptionsInfoWithHistory.java | 32 +-- .../handler/job/JobExceptionsHandlerTest.java | 6 +- ...obExceptionsInfoWithHistoryNoRootTest.java | 22 -- .../JobExceptionsInfoWithHistoryTest.java | 22 -- 6 files changed, 8 insertions(+), 338 deletions(-) delete mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfo.java diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java index 6d5f49d55b4cee..dacb1da27da3f8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java @@ -20,15 +20,11 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.time.Time; -import org.apache.flink.runtime.execution.ExecutionState; -import org.apache.flink.runtime.executiongraph.AccessExecution; -import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; import org.apache.flink.runtime.executiongraph.ErrorInfo; import org.apache.flink.runtime.rest.handler.HandlerRequest; import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; -import org.apache.flink.runtime.rest.messages.JobExceptionsInfo; import org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory; import org.apache.flink.runtime.rest.messages.JobIDPathParameter; import org.apache.flink.runtime.rest.messages.MessageHeaders; @@ -56,7 +52,6 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.concurrent.Executor; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -140,37 +135,8 @@ private static JobExceptionsInfoWithHistory createJobExceptionsInfo( failureLabelFilter)); } - List taskExceptionList = new ArrayList<>(); - boolean truncated = false; - for (AccessExecutionVertex task : executionGraph.getAllExecutionVertices()) { - for (AccessExecution execution : task.getCurrentExecutions()) { - Optional failure = execution.getFailureInfo(); - if (failure.isPresent()) { - if (taskExceptionList.size() >= exceptionToReportMaxSize) { - truncated = true; - break; - } - - TaskManagerLocation location = execution.getAssignedResourceLocation(); - String locationString = toString(location); - long timestamp = execution.getStateTimestamp(ExecutionState.FAILED); - taskExceptionList.add( - new JobExceptionsInfo.ExecutionExceptionInfo( - failure.get().getExceptionAsString(), - task.getTaskNameWithSubtaskIndex(), - locationString, - timestamp == 0 ? -1 : timestamp, - toTaskManagerId(location))); - } - } - } - final ErrorInfo rootCause = executionGraph.getFailureInfo(); return new JobExceptionsInfoWithHistory( - rootCause.getExceptionAsString(), - rootCause.getTimestamp(), - taskExceptionList, - truncated, createJobExceptionHistory( executionGraphInfo.getExceptionHistory(), exceptionToReportMaxSize, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfo.java deleted file mode 100644 index 0acc64d4b47dc4..00000000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfo.java +++ /dev/null @@ -1,230 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.rest.messages; - -import org.apache.flink.util.Preconditions; - -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; - -import java.util.List; -import java.util.Objects; -import java.util.StringJoiner; - -/** - * {@code JobExceptionInfo} holds the information for single failure which caused a (maybe partial) - * job restart. - */ -public class JobExceptionsInfo { - - public static final String FIELD_NAME_ROOT_EXCEPTION = "root-exception"; - public static final String FIELD_NAME_TIMESTAMP = "timestamp"; - public static final String FIELD_NAME_ALL_EXCEPTIONS = "all-exceptions"; - public static final String FIELD_NAME_TRUNCATED = "truncated"; - - /** - * @deprecated Use {@link JobExceptionsInfoWithHistory#getExceptionHistory()}'s entries instead. - */ - @Deprecated - @JsonProperty(FIELD_NAME_ROOT_EXCEPTION) - private final String rootException; - - /** - * @deprecated Use {@link JobExceptionsInfoWithHistory#getExceptionHistory()}'s entries instead. - */ - @Deprecated - @JsonProperty(FIELD_NAME_TIMESTAMP) - private final Long rootTimestamp; - - /** - * @deprecated Use {@link JobExceptionsInfoWithHistory#getExceptionHistory()}'s entries instead. - */ - @Deprecated - @JsonProperty(FIELD_NAME_ALL_EXCEPTIONS) - private final List allExceptions; - - /** - * @deprecated Use {@link JobExceptionsInfoWithHistory#getExceptionHistory()}'s entries instead. - */ - @Deprecated - @JsonProperty(FIELD_NAME_TRUNCATED) - private final boolean truncated; - - @JsonCreator - public JobExceptionsInfo( - @JsonProperty(FIELD_NAME_ROOT_EXCEPTION) String rootException, - @JsonProperty(FIELD_NAME_TIMESTAMP) Long rootTimestamp, - @JsonProperty(FIELD_NAME_ALL_EXCEPTIONS) List allExceptions, - @JsonProperty(FIELD_NAME_TRUNCATED) boolean truncated) { - this.rootException = rootException; - this.rootTimestamp = rootTimestamp; - this.allExceptions = Preconditions.checkNotNull(allExceptions); - this.truncated = truncated; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - JobExceptionsInfo that = (JobExceptionsInfo) o; - return truncated == that.truncated - && Objects.equals(rootException, that.rootException) - && Objects.equals(rootTimestamp, that.rootTimestamp) - && Objects.equals(allExceptions, that.allExceptions); - } - - @Override - public int hashCode() { - return Objects.hash(rootException, rootTimestamp, allExceptions, truncated); - } - - @Override - public String toString() { - return new StringJoiner(", ", JobExceptionsInfo.class.getSimpleName() + "[", "]") - .add("rootException='" + rootException + "'") - .add("rootTimestamp=" + rootTimestamp) - .add("allExceptions=" + allExceptions) - .add("truncated=" + truncated) - .toString(); - } - - @JsonIgnore - public String getRootException() { - return rootException; - } - - @JsonIgnore - public Long getRootTimestamp() { - return rootTimestamp; - } - - @JsonIgnore - public List getAllExceptions() { - return allExceptions; - } - - @JsonIgnore - public boolean isTruncated() { - return truncated; - } - - // --------------------------------------------------------------------------------- - // Static helper classes - // --------------------------------------------------------------------------------- - - /** - * Nested class to encapsulate the task execution exception. - * - * @deprecated {@code ExecutionExceptionInfo} will be replaced by {@link - * JobExceptionsInfoWithHistory.ExceptionInfo} as part of the effort of deprecating {@link - * JobExceptionsInfo#allExceptions}. - */ - @Deprecated - public static final class ExecutionExceptionInfo { - public static final String FIELD_NAME_EXCEPTION = "exception"; - public static final String FIELD_NAME_TASK = "task"; - @Deprecated public static final String FIELD_NAME_LOCATION = "location"; - public static final String FIELD_NAME_ENDPOINT = "endpoint"; - public static final String FIELD_NAME_TIMESTAMP = "timestamp"; - public static final String FIELD_NAME_TASK_MANAGER_ID = "taskManagerId"; - - @JsonProperty(FIELD_NAME_EXCEPTION) - private final String exception; - - @JsonProperty(FIELD_NAME_TASK) - private final String task; - - @JsonProperty(FIELD_NAME_LOCATION) - private final String location; - - @JsonProperty(FIELD_NAME_ENDPOINT) - private final String endpoint; - - @JsonProperty(FIELD_NAME_TIMESTAMP) - private final long timestamp; - - @JsonProperty(FIELD_NAME_TASK_MANAGER_ID) - private final String taskManagerId; - - public ExecutionExceptionInfo( - String exception, - String task, - String endpoint, - long timestamp, - String taskManagerId) { - this(exception, task, endpoint, endpoint, timestamp, taskManagerId); - } - - @JsonCreator - public ExecutionExceptionInfo( - @JsonProperty(FIELD_NAME_EXCEPTION) String exception, - @JsonProperty(FIELD_NAME_TASK) String task, - @JsonProperty(FIELD_NAME_LOCATION) String location, - @JsonProperty(FIELD_NAME_ENDPOINT) String endpoint, - @JsonProperty(FIELD_NAME_TIMESTAMP) long timestamp, - @JsonProperty(FIELD_NAME_TASK_MANAGER_ID) String taskManagerId) { - this.exception = Preconditions.checkNotNull(exception); - this.task = Preconditions.checkNotNull(task); - this.location = Preconditions.checkNotNull(location); - this.endpoint = Preconditions.checkNotNull(endpoint); - this.timestamp = timestamp; - this.taskManagerId = taskManagerId; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - JobExceptionsInfo.ExecutionExceptionInfo that = - (JobExceptionsInfo.ExecutionExceptionInfo) o; - return timestamp == that.timestamp - && Objects.equals(exception, that.exception) - && Objects.equals(task, that.task) - && Objects.equals(location, that.location) - && Objects.equals(endpoint, that.endpoint) - && Objects.equals(taskManagerId, that.taskManagerId); - } - - @Override - public int hashCode() { - return Objects.hash(timestamp, exception, task, location, endpoint, taskManagerId); - } - - @Override - public String toString() { - return new StringJoiner(", ", ExecutionExceptionInfo.class.getSimpleName() + "[", "]") - .add("exception='" + exception + "'") - .add("task='" + task + "'") - .add("location='" + location + "'") - .add("endpoint='" + endpoint + "'") - .add("timestamp=" + timestamp) - .add("taskManagerId=" + taskManagerId) - .toString(); - } - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistory.java index cb15825cc6bd6b..fbf17723f6e5d0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistory.java @@ -38,10 +38,10 @@ import static org.apache.flink.util.Preconditions.checkNotNull; /** - * {@code JobExceptionsInfoWithHistory} extends {@link JobExceptionsInfo} providing a history of - * previously caused failures. It's the response type of the {@link JobExceptionsHandler}. + * {@code JobExceptionsInfoWithHistory} providing a history of previously caused failures. It's the + * response type of the {@link JobExceptionsHandler}. */ -public class JobExceptionsInfoWithHistory extends JobExceptionsInfo implements ResponseBody { +public class JobExceptionsInfoWithHistory implements ResponseBody { public static final String FIELD_NAME_EXCEPTION_HISTORY = "exceptionHistory"; @@ -50,19 +50,10 @@ public class JobExceptionsInfoWithHistory extends JobExceptionsInfo implements R @JsonCreator public JobExceptionsInfoWithHistory( - @JsonProperty(FIELD_NAME_ROOT_EXCEPTION) String rootException, - @JsonProperty(FIELD_NAME_TIMESTAMP) Long rootTimestamp, - @JsonProperty(FIELD_NAME_ALL_EXCEPTIONS) List allExceptions, - @JsonProperty(FIELD_NAME_TRUNCATED) boolean truncated, @JsonProperty(FIELD_NAME_EXCEPTION_HISTORY) JobExceptionHistory exceptionHistory) { - super(rootException, rootTimestamp, allExceptions, truncated); this.exceptionHistory = exceptionHistory; } - public JobExceptionsInfoWithHistory(JobExceptionHistory exceptionHistory) { - this(null, null, Collections.emptyList(), false, exceptionHistory); - } - @JsonIgnore public JobExceptionHistory getExceptionHistory() { return exceptionHistory; @@ -79,30 +70,17 @@ public boolean equals(Object o) { return false; } JobExceptionsInfoWithHistory that = (JobExceptionsInfoWithHistory) o; - return this.isTruncated() == that.isTruncated() - && Objects.equals(this.getRootException(), that.getRootException()) - && Objects.equals(this.getRootTimestamp(), that.getRootTimestamp()) - && Objects.equals(this.getAllExceptions(), that.getAllExceptions()) - && Objects.equals(exceptionHistory, that.exceptionHistory); + return Objects.equals(exceptionHistory, that.exceptionHistory); } @Override public int hashCode() { - return Objects.hash( - isTruncated(), - getRootException(), - getRootTimestamp(), - getAllExceptions(), - exceptionHistory); + return Objects.hash(exceptionHistory); } @Override public String toString() { return new StringJoiner(", ", JobExceptionsInfoWithHistory.class.getSimpleName() + "[", "]") - .add("rootException='" + getRootException() + "'") - .add("rootTimestamp=" + getRootTimestamp()) - .add("allExceptions=" + getAllExceptions()) - .add("truncated=" + isTruncated()) .add("exceptionHistory=" + exceptionHistory) .toString(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java index c40bdb2c8e880e..428e1846afa582 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java @@ -37,7 +37,6 @@ import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; import org.apache.flink.runtime.rest.messages.JobExceptionsHeaders; -import org.apache.flink.runtime.rest.messages.JobExceptionsInfo; import org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory; import org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory.ExceptionInfo; import org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory.RootExceptionInfo; @@ -451,10 +450,11 @@ private static void checkExceptionLimit( throws HandlerRequestException { final HandlerRequest handlerRequest = createRequest(graph.getJobId(), numExpectedException); - final JobExceptionsInfo jobExceptionsInfo = + final JobExceptionsInfoWithHistory jobExceptionsInfo = jobExceptionsHandler.handleRequest(handlerRequest, graph); final int numReportedException = Math.min(maxNumExceptions, numExpectedException); - assertThat(numReportedException).isEqualTo(jobExceptionsInfo.getAllExceptions().size()); + assertThat(numReportedException) + .isEqualTo(jobExceptionsInfo.getExceptionHistory().getEntries().size()); } private static ExecutionGraphInfo createAccessExecutionGraph(int numTasks) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistoryNoRootTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistoryNoRootTest.java index 3bb7d1be0b9c90..ac83d0be12353b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistoryNoRootTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistoryNoRootTest.java @@ -22,10 +22,8 @@ import org.junit.jupiter.api.extension.ExtendWith; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.List; /** * Tests that the {@link JobExceptionsInfoWithHistory} with no root exception can be marshalled and @@ -41,27 +39,7 @@ protected Class getTestResponseClass() { @Override protected JobExceptionsInfoWithHistory getTestResponseInstance() throws Exception { - List executionTaskExceptionInfoList = - new ArrayList<>(); - executionTaskExceptionInfoList.add( - new JobExceptionsInfo.ExecutionExceptionInfo( - "exception1", - "task1", - "location1", - System.currentTimeMillis(), - "taskManagerId1")); - executionTaskExceptionInfoList.add( - new JobExceptionsInfo.ExecutionExceptionInfo( - "exception2", - "task2", - "location2", - System.currentTimeMillis(), - "taskManagerId2")); return new JobExceptionsInfoWithHistory( - null, - null, - executionTaskExceptionInfoList, - false, new JobExceptionsInfoWithHistory.JobExceptionHistory( Arrays.asList( new JobExceptionsInfoWithHistory.RootExceptionInfo( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistoryTest.java index 04e4eaeb3fa71b..20098d97622253 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistoryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistoryTest.java @@ -27,9 +27,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; -import java.util.ArrayList; import java.util.Collections; -import java.util.List; import static org.assertj.core.api.Assertions.assertThat; @@ -44,27 +42,7 @@ protected Class getTestResponseClass() { @Override protected JobExceptionsInfoWithHistory getTestResponseInstance() throws Exception { - List executionTaskExceptionInfoList = - new ArrayList<>(); - executionTaskExceptionInfoList.add( - new JobExceptionsInfo.ExecutionExceptionInfo( - "exception1", - "task1", - "location1", - System.currentTimeMillis(), - "taskManagerId1")); - executionTaskExceptionInfoList.add( - new JobExceptionsInfo.ExecutionExceptionInfo( - "exception2", - "task2", - "location2", - System.currentTimeMillis(), - "taskManagerId2")); return new JobExceptionsInfoWithHistory( - "root exception", - System.currentTimeMillis(), - executionTaskExceptionInfoList, - false, new JobExceptionsInfoWithHistory.JobExceptionHistory( Collections.emptyList(), false)); }