Skip to content

Commit

Permalink
[FLINK-14816] Let TaskManagerThreadDumpHandler return JSON response
Browse files Browse the repository at this point in the history
Instead of serving plain text, this commit changes the TaskManagerThreadDumpHandler
to return a JSON response. This allows to further extend this handler to not
only generate a thread dump of all threads but also for a sub set. Morever,
it allows to return a more structured return value in the future.

This closes #11887.
  • Loading branch information
tillrohrmann committed Apr 24, 2020
1 parent df7cab4 commit c81293a
Show file tree
Hide file tree
Showing 17 changed files with 275 additions and 67 deletions.
25 changes: 25 additions & 0 deletions flink-runtime-web/src/test/resources/rest_api_v1.snapshot
Original file line number Diff line number Diff line change
Expand Up @@ -2874,5 +2874,30 @@
"response" : {
"type" : "any"
}
}, {
"url" : "/taskmanagers/:taskmanagerid/thread-dump",
"method" : "GET",
"status-code" : "200 OK",
"file-upload" : false,
"path-parameters" : {
"pathParameters" : [ {
"key" : "taskmanagerid"
} ]
},
"query-parameters" : {
"queryParameters" : [ ]
},
"request" : {
"type" : "any"
},
"response" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:ThreadDumpInfo",
"properties" : {
"threadDump" : {
"type" : "string"
}
}
}
} ]
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,7 @@ interface GarbageCollectorsItem {
count: number;
time: number;
}

export interface TaskManagerThreadDumpInterface {
threadDump: string;
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import { Injectable } from '@angular/core';
import { EMPTY, of, ReplaySubject } from 'rxjs';
import { catchError, map } from 'rxjs/operators';
import { BASE_URL } from 'config';
import { TaskManagerListInterface, TaskManagerDetailInterface, TaskManagerLogInterface } from 'interfaces';
import { TaskManagerListInterface, TaskManagerDetailInterface, TaskManagerLogInterface, TaskManagerThreadDumpInterface } from 'interfaces';

@Injectable({
providedIn: 'root'
Expand Down Expand Up @@ -88,10 +88,9 @@ export class TaskManagerService {
* Load TM thread dump
*/
loadThreadDump(taskManagerId: string) {
return this.httpClient.get(`${BASE_URL}/taskmanagers/${taskManagerId}/thread-dump`, {
responseType: 'text',
headers: new HttpHeaders().append('Cache-Control', 'no-cache')
});
return this.httpClient
.get<TaskManagerThreadDumpInterface>(`${BASE_URL}/taskmanagers/${taskManagerId}/thread-dump`)
.pipe(map(taskManagerThreadDump => taskManagerThreadDump.threadDump));
}

constructor(private httpClient: HttpClient) {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.rest.messages.LogInfo;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
import org.apache.flink.runtime.rest.messages.taskmanager.ThreadDumpInfo;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
Expand Down Expand Up @@ -648,6 +649,19 @@ public CompletableFuture<Map<IntermediateDataSetID, DataSetMetaInfo>> listDataSe
return CompletableFuture.completedFuture(clusterPartitionTracker.listDataSets());
}

@Override
public CompletableFuture<ThreadDumpInfo> requestThreadDump(ResourceID taskManagerId, Time timeout) {
final WorkerRegistration<WorkerType> taskExecutor = taskExecutors.get(taskManagerId);

if (taskExecutor == null) {
log.debug("Requested thread dump from unregistered TaskExecutor {}.", taskManagerId);
return FutureUtils.completedExceptionally(new UnknownTaskExecutorException(taskManagerId));
} else {
return taskExecutor.getTaskExecutorGateway().requestThreadDump(timeout);
}

}

// ------------------------------------------------------------------------
// Internal methods
// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.rest.messages.LogInfo;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
import org.apache.flink.runtime.rest.messages.taskmanager.ThreadDumpInfo;
import org.apache.flink.runtime.rpc.FencedRpcGateway;
import org.apache.flink.runtime.rpc.RpcTimeout;
import org.apache.flink.runtime.taskexecutor.FileType;
Expand Down Expand Up @@ -238,4 +239,13 @@ void notifySlotAvailable(
* @return Future which is completed with the historical log list
*/
CompletableFuture<Collection<LogInfo>> requestTaskManagerLogList(ResourceID taskManagerId, @RpcTimeout Time timeout);

/**
* Requests the thread dump from the given {@link TaskExecutor}.
*
* @param taskManagerId taskManagerId identifying the {@link TaskExecutor} to get the thread dump from
* @param timeout timeout of the asynchronous operation
* @return Future containing the thread dump information
*/
CompletableFuture<ThreadDumpInfo> requestThreadDump(ResourceID taskManagerId, @RpcTimeout Time timeout);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,16 @@
package org.apache.flink.runtime.rest.handler.taskmanager;

import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.blob.TransientBlobKey;
import org.apache.flink.runtime.blob.TransientBlobService;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.handler.resourcemanager.AbstractResourceManagerHandler;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerIdPathParameter;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerMessageParameters;
import org.apache.flink.runtime.taskexecutor.FileType;
import org.apache.flink.runtime.rest.messages.taskmanager.ThreadDumpInfo;
import org.apache.flink.runtime.taskexecutor.TaskExecutor;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
Expand All @@ -38,24 +39,24 @@
import java.util.concurrent.CompletableFuture;

/**
* Rest handler which serves the thread dump files from {@link TaskExecutor}.
* Rest handler which serves the thread dump info from a {@link TaskExecutor}.
*/
public class TaskManagerThreadDumpFileHandler extends AbstractTaskManagerFileHandler<TaskManagerMessageParameters> {

public TaskManagerThreadDumpFileHandler(
@Nonnull GatewayRetriever<? extends RestfulGateway> leaderRetriever,
@Nonnull Time timeout,
@Nonnull Map<String, String> responseHeaders,
@Nonnull UntypedResponseMessageHeaders<EmptyRequestBody, TaskManagerMessageParameters> untypedResponseMessageHeaders,
@Nonnull GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
@Nonnull TransientBlobService transientBlobService,
@Nonnull Time cacheEntryDuration) {
super(leaderRetriever, timeout, responseHeaders, untypedResponseMessageHeaders, resourceManagerGatewayRetriever, transientBlobService, cacheEntryDuration);
public class TaskManagerThreadDumpHandler extends AbstractResourceManagerHandler<RestfulGateway, EmptyRequestBody, ThreadDumpInfo, TaskManagerMessageParameters> {

public TaskManagerThreadDumpHandler(
GatewayRetriever<? extends RestfulGateway> leaderRetriever,
Time timeout,
Map<String, String> responseHeaders,
MessageHeaders<EmptyRequestBody, ThreadDumpInfo, TaskManagerMessageParameters> messageHeaders,
GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever) {
super(leaderRetriever, timeout, responseHeaders, messageHeaders, resourceManagerGatewayRetriever);
}

@Override
protected CompletableFuture<TransientBlobKey> requestFileUpload(ResourceManagerGateway resourceManagerGateway, Tuple2<ResourceID, String> taskManagerIdAndFileName) {
return resourceManagerGateway.requestTaskManagerFileUploadByType(taskManagerIdAndFileName.f0, FileType.THREAD_DUMP, timeout);
protected CompletableFuture<ThreadDumpInfo> handleRequest(
@Nonnull HandlerRequest<EmptyRequestBody, TaskManagerMessageParameters> request,
@Nonnull ResourceManagerGateway gateway) throws RestHandlerException {
final ResourceID taskManagerId = request.getPathParameter(TaskManagerIdPathParameter.class);
return gateway.requestThreadDump(taskManagerId, timeout);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,22 @@
package org.apache.flink.runtime.rest.messages.taskmanager;

import org.apache.flink.runtime.rest.HttpMethodWrapper;
import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerThreadDumpFileHandler;
import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerThreadDumpHandler;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders;
import org.apache.flink.runtime.rest.messages.MessageHeaders;

import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;

/**
* Headers for the {@link TaskManagerThreadDumpFileHandler}.
* Headers for the {@link TaskManagerThreadDumpHandler}.
*/
public class TaskManagerThreadDumpFileHeaders implements UntypedResponseMessageHeaders<EmptyRequestBody, TaskManagerMessageParameters> {
public class TaskManagerThreadDumpHeaders implements MessageHeaders<EmptyRequestBody, ThreadDumpInfo, TaskManagerMessageParameters> {

private static final TaskManagerThreadDumpFileHeaders INSTANCE = new TaskManagerThreadDumpFileHeaders();
private static final TaskManagerThreadDumpHeaders INSTANCE = new TaskManagerThreadDumpHeaders();

private static final String URL = String.format("/taskmanagers/:%s/thread-dump", TaskManagerIdPathParameter.KEY);

private TaskManagerThreadDumpFileHeaders() {}
private TaskManagerThreadDumpHeaders() {}

@Override
public Class<EmptyRequestBody> getRequestClass() {
Expand All @@ -54,7 +56,22 @@ public String getTargetRestEndpointURL() {
return URL;
}

public static TaskManagerThreadDumpFileHeaders getInstance() {
public static TaskManagerThreadDumpHeaders getInstance() {
return INSTANCE;
}

@Override
public Class<ThreadDumpInfo> getResponseClass() {
return ThreadDumpInfo.class;
}

@Override
public HttpResponseStatus getResponseStatusCode() {
return HttpResponseStatus.OK;
}

@Override
public String getDescription() {
return "Returns the thread dump of the requested TaskManager.";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.rest.messages.taskmanager;

import org.apache.flink.runtime.rest.messages.ResponseBody;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;

import java.io.Serializable;

/**
* Class containing thread dump information.
*/
public class ThreadDumpInfo implements ResponseBody, Serializable {
private static final long serialVersionUID = 1L;

public static final String FIELD_NAME_THREAD_DUMP = "threadDump";

@JsonProperty(FIELD_NAME_THREAD_DUMP)
private final String threadDump;

private ThreadDumpInfo(String threadDump) {
this.threadDump = threadDump;
}

@JsonCreator
public static ThreadDumpInfo create(
@JsonProperty(FIELD_NAME_THREAD_DUMP) String threadDump) {
return new ThreadDumpInfo(threadDump);
}

@Override
public String toString() {
return threadDump;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,4 @@ public enum FileType {
* the stdout file type for taskmanager
*/
STDOUT,

/**
* the thread dump type for taskmanager
*/
THREAD_DUMP
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.resourcemanager.TaskExecutorRegistration;
import org.apache.flink.runtime.rest.messages.LogInfo;
import org.apache.flink.runtime.rest.messages.taskmanager.ThreadDumpInfo;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
Expand Down Expand Up @@ -120,14 +121,14 @@
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerActions;
import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation;
import org.apache.flink.runtime.util.JvmUtils;
import org.apache.flink.types.SerializableOptional;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.StringUtils;

import org.apache.flink.shaded.guava18.com.google.common.collect.Sets;
import org.apache.flink.runtime.util.JvmUtils;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand All @@ -136,6 +137,7 @@
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.management.ThreadInfo;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -946,8 +948,6 @@ public CompletableFuture<TransientBlobKey> requestFileUploadByType(FileType file
case STDOUT:
filePath = taskManagerConfiguration.getTaskManagerStdoutPath();
break;
case THREAD_DUMP:
return putTransientBlobStream(JvmUtils.threadDumpStream(), fileType.toString());
default:
filePath = null;
}
Expand Down Expand Up @@ -1016,6 +1016,17 @@ public CompletableFuture<Acknowledge> sendOperatorEventToTask(
}
}

@Override
public CompletableFuture<ThreadDumpInfo> requestThreadDump(Time timeout) {
final Collection<ThreadInfo> threadDump = JvmUtils.createThreadDump();

final String stringifiedThreadDump = threadDump.stream()
.map(ThreadInfo::toString)
.collect(Collectors.joining());

return CompletableFuture.completedFuture(ThreadDumpInfo.create(stringifiedThreadDump));
}

// ======================================================================
// Internal methods
// ======================================================================
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.rest.messages.LogInfo;
import org.apache.flink.runtime.rest.messages.taskmanager.ThreadDumpInfo;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcTimeout;
import org.apache.flink.runtime.taskmanager.Task;
Expand Down Expand Up @@ -249,4 +250,12 @@ CompletableFuture<Acknowledge> sendOperatorEventToTask(
ExecutionAttemptID task,
OperatorID operator,
SerializedValue<OperatorEvent> evt);

/**
* Requests the thread dump from this TaskManager.
*
* @param timeout timeout for the asynchronous operation
* @return the {@link ThreadDumpInfo} for this TaskManager.
*/
CompletableFuture<ThreadDumpInfo> requestThreadDump(@RpcTimeout Time timeout);
}
Loading

0 comments on commit c81293a

Please sign in to comment.