Skip to content

Commit

Permalink
[FLINK-32688][runtime] Removes deprecated JobExceptionsInfo.
Browse files Browse the repository at this point in the history
  • Loading branch information
XComp committed Aug 23, 2024
1 parent 4faf096 commit 036741f
Show file tree
Hide file tree
Showing 6 changed files with 8 additions and 338 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,11 @@

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecution;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.ErrorInfo;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobExceptionsInfo;
import org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory;
import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
Expand Down Expand Up @@ -56,7 +52,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
Expand Down Expand Up @@ -140,37 +135,8 @@ private static JobExceptionsInfoWithHistory createJobExceptionsInfo(
failureLabelFilter));
}

List<JobExceptionsInfo.ExecutionExceptionInfo> taskExceptionList = new ArrayList<>();
boolean truncated = false;
for (AccessExecutionVertex task : executionGraph.getAllExecutionVertices()) {
for (AccessExecution execution : task.getCurrentExecutions()) {
Optional<ErrorInfo> failure = execution.getFailureInfo();
if (failure.isPresent()) {
if (taskExceptionList.size() >= exceptionToReportMaxSize) {
truncated = true;
break;
}

TaskManagerLocation location = execution.getAssignedResourceLocation();
String locationString = toString(location);
long timestamp = execution.getStateTimestamp(ExecutionState.FAILED);
taskExceptionList.add(
new JobExceptionsInfo.ExecutionExceptionInfo(
failure.get().getExceptionAsString(),
task.getTaskNameWithSubtaskIndex(),
locationString,
timestamp == 0 ? -1 : timestamp,
toTaskManagerId(location)));
}
}
}

final ErrorInfo rootCause = executionGraph.getFailureInfo();
return new JobExceptionsInfoWithHistory(
rootCause.getExceptionAsString(),
rootCause.getTimestamp(),
taskExceptionList,
truncated,
createJobExceptionHistory(
executionGraphInfo.getExceptionHistory(),
exceptionToReportMaxSize,
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@
import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* {@code JobExceptionsInfoWithHistory} extends {@link JobExceptionsInfo} providing a history of
* previously caused failures. It's the response type of the {@link JobExceptionsHandler}.
* {@code JobExceptionsInfoWithHistory} providing a history of previously caused failures. It's the
* response type of the {@link JobExceptionsHandler}.
*/
public class JobExceptionsInfoWithHistory extends JobExceptionsInfo implements ResponseBody {
public class JobExceptionsInfoWithHistory implements ResponseBody {

public static final String FIELD_NAME_EXCEPTION_HISTORY = "exceptionHistory";

Expand All @@ -50,19 +50,10 @@ public class JobExceptionsInfoWithHistory extends JobExceptionsInfo implements R

@JsonCreator
public JobExceptionsInfoWithHistory(
@JsonProperty(FIELD_NAME_ROOT_EXCEPTION) String rootException,
@JsonProperty(FIELD_NAME_TIMESTAMP) Long rootTimestamp,
@JsonProperty(FIELD_NAME_ALL_EXCEPTIONS) List<ExecutionExceptionInfo> allExceptions,
@JsonProperty(FIELD_NAME_TRUNCATED) boolean truncated,
@JsonProperty(FIELD_NAME_EXCEPTION_HISTORY) JobExceptionHistory exceptionHistory) {
super(rootException, rootTimestamp, allExceptions, truncated);
this.exceptionHistory = exceptionHistory;
}

public JobExceptionsInfoWithHistory(JobExceptionHistory exceptionHistory) {
this(null, null, Collections.emptyList(), false, exceptionHistory);
}

@JsonIgnore
public JobExceptionHistory getExceptionHistory() {
return exceptionHistory;
Expand All @@ -79,30 +70,17 @@ public boolean equals(Object o) {
return false;
}
JobExceptionsInfoWithHistory that = (JobExceptionsInfoWithHistory) o;
return this.isTruncated() == that.isTruncated()
&& Objects.equals(this.getRootException(), that.getRootException())
&& Objects.equals(this.getRootTimestamp(), that.getRootTimestamp())
&& Objects.equals(this.getAllExceptions(), that.getAllExceptions())
&& Objects.equals(exceptionHistory, that.exceptionHistory);
return Objects.equals(exceptionHistory, that.exceptionHistory);
}

@Override
public int hashCode() {
return Objects.hash(
isTruncated(),
getRootException(),
getRootTimestamp(),
getAllExceptions(),
exceptionHistory);
return Objects.hash(exceptionHistory);
}

@Override
public String toString() {
return new StringJoiner(", ", JobExceptionsInfoWithHistory.class.getSimpleName() + "[", "]")
.add("rootException='" + getRootException() + "'")
.add("rootTimestamp=" + getRootTimestamp())
.add("allExceptions=" + getAllExceptions())
.add("truncated=" + isTruncated())
.add("exceptionHistory=" + exceptionHistory)
.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobExceptionsHeaders;
import org.apache.flink.runtime.rest.messages.JobExceptionsInfo;
import org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory;
import org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory.ExceptionInfo;
import org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory.RootExceptionInfo;
Expand Down Expand Up @@ -451,10 +450,11 @@ private static void checkExceptionLimit(
throws HandlerRequestException {
final HandlerRequest<EmptyRequestBody> handlerRequest =
createRequest(graph.getJobId(), numExpectedException);
final JobExceptionsInfo jobExceptionsInfo =
final JobExceptionsInfoWithHistory jobExceptionsInfo =
jobExceptionsHandler.handleRequest(handlerRequest, graph);
final int numReportedException = Math.min(maxNumExceptions, numExpectedException);
assertThat(numReportedException).isEqualTo(jobExceptionsInfo.getAllExceptions().size());
assertThat(numReportedException)
.isEqualTo(jobExceptionsInfo.getExceptionHistory().getEntries().size());
}

private static ExecutionGraphInfo createAccessExecutionGraph(int numTasks) {
Expand Down
Loading

0 comments on commit 036741f

Please sign in to comment.