Skip to content

Commit

Permalink
[FLINK-14816] Let TaskManagerThreadDumpHandler return JSON response
Browse files Browse the repository at this point in the history
Instead of serving plain text, this commit changes the TaskManagerThreadDumpHandler
to return a JSON response. This allows to further extend this handler to not
only generate a thread dump of all threads but also for a sub set. Morever,
it allows to return a more structured return value in the future.

This closes apache#11887.
  • Loading branch information
tillrohrmann committed Apr 27, 2020
1 parent eeebb59 commit 2f81ac5
Show file tree
Hide file tree
Showing 17 changed files with 362 additions and 67 deletions.
37 changes: 37 additions & 0 deletions flink-runtime-web/src/test/resources/rest_api_v1.snapshot
Original file line number Diff line number Diff line change
Expand Up @@ -2936,5 +2936,42 @@
"response" : {
"type" : "any"
}
}, {
"url" : "/taskmanagers/:taskmanagerid/thread-dump",
"method" : "GET",
"status-code" : "200 OK",
"file-upload" : false,
"path-parameters" : {
"pathParameters" : [ {
"key" : "taskmanagerid"
} ]
},
"query-parameters" : {
"queryParameters" : [ ]
},
"request" : {
"type" : "any"
},
"response" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:ThreadDumpInfo",
"properties" : {
"threadInfos" : {
"type" : "array",
"items" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:ThreadDumpInfo:ThreadInfo",
"properties" : {
"threadName" : {
"type" : "string"
},
"stringifiedThreadInfo" : {
"type" : "string"
}
}
}
}
}
}
} ]
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,12 @@ interface GarbageCollectorsItem {
count: number;
time: number;
}

export interface TaskManagerThreadDumpInterface {
threadInfos: TaskManagerThreadInfoInterface[];
}

interface TaskManagerThreadInfoInterface {
threadName: string;
stringifiedThreadInfo: string;
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import { Injectable } from '@angular/core';
import { EMPTY, of, ReplaySubject } from 'rxjs';
import { catchError, map } from 'rxjs/operators';
import { BASE_URL } from 'config';
import { TaskManagerListInterface, TaskManagerDetailInterface, TaskManagerLogInterface } from 'interfaces';
import { TaskManagerListInterface, TaskManagerDetailInterface, TaskManagerLogInterface, TaskManagerThreadDumpInterface } from 'interfaces';

@Injectable({
providedIn: 'root'
Expand Down Expand Up @@ -88,10 +88,12 @@ export class TaskManagerService {
* Load TM thread dump
*/
loadThreadDump(taskManagerId: string) {
return this.httpClient.get(`${BASE_URL}/taskmanagers/${taskManagerId}/thread-dump`, {
responseType: 'text',
headers: new HttpHeaders().append('Cache-Control', 'no-cache')
});
return this.httpClient
.get<TaskManagerThreadDumpInterface>(`${BASE_URL}/taskmanagers/${taskManagerId}/thread-dump`)
.pipe(
map(taskManagerThreadDump => {
return taskManagerThreadDump.threadInfos.map(threadInfo => threadInfo.stringifiedThreadInfo).join('');
}));
}

constructor(private httpClient: HttpClient) {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.rest.messages.LogInfo;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
import org.apache.flink.runtime.rest.messages.taskmanager.ThreadDumpInfo;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
Expand Down Expand Up @@ -657,6 +658,19 @@ public CompletableFuture<Map<IntermediateDataSetID, DataSetMetaInfo>> listDataSe
return CompletableFuture.completedFuture(clusterPartitionTracker.listDataSets());
}

@Override
public CompletableFuture<ThreadDumpInfo> requestThreadDump(ResourceID taskManagerId, Time timeout) {
final WorkerRegistration<WorkerType> taskExecutor = taskExecutors.get(taskManagerId);

if (taskExecutor == null) {
log.debug("Requested thread dump from unregistered TaskExecutor {}.", taskManagerId);
return FutureUtils.completedExceptionally(new UnknownTaskExecutorException(taskManagerId));
} else {
return taskExecutor.getTaskExecutorGateway().requestThreadDump(timeout);
}

}

// ------------------------------------------------------------------------
// Internal methods
// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.rest.messages.LogInfo;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
import org.apache.flink.runtime.rest.messages.taskmanager.ThreadDumpInfo;
import org.apache.flink.runtime.rpc.FencedRpcGateway;
import org.apache.flink.runtime.rpc.RpcTimeout;
import org.apache.flink.runtime.taskexecutor.FileType;
Expand Down Expand Up @@ -238,4 +239,13 @@ void notifySlotAvailable(
* @return Future which is completed with the historical log list
*/
CompletableFuture<Collection<LogInfo>> requestTaskManagerLogList(ResourceID taskManagerId, @RpcTimeout Time timeout);

/**
* Requests the thread dump from the given {@link TaskExecutor}.
*
* @param taskManagerId taskManagerId identifying the {@link TaskExecutor} to get the thread dump from
* @param timeout timeout of the asynchronous operation
* @return Future containing the thread dump information
*/
CompletableFuture<ThreadDumpInfo> requestThreadDump(ResourceID taskManagerId, @RpcTimeout Time timeout);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,16 @@
package org.apache.flink.runtime.rest.handler.taskmanager;

import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.blob.TransientBlobKey;
import org.apache.flink.runtime.blob.TransientBlobService;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.handler.resourcemanager.AbstractResourceManagerHandler;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerIdPathParameter;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerMessageParameters;
import org.apache.flink.runtime.taskexecutor.FileType;
import org.apache.flink.runtime.rest.messages.taskmanager.ThreadDumpInfo;
import org.apache.flink.runtime.taskexecutor.TaskExecutor;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
Expand All @@ -38,24 +39,24 @@
import java.util.concurrent.CompletableFuture;

/**
* Rest handler which serves the thread dump files from {@link TaskExecutor}.
* Rest handler which serves the thread dump info from a {@link TaskExecutor}.
*/
public class TaskManagerThreadDumpFileHandler extends AbstractTaskManagerFileHandler<TaskManagerMessageParameters> {

public TaskManagerThreadDumpFileHandler(
@Nonnull GatewayRetriever<? extends RestfulGateway> leaderRetriever,
@Nonnull Time timeout,
@Nonnull Map<String, String> responseHeaders,
@Nonnull UntypedResponseMessageHeaders<EmptyRequestBody, TaskManagerMessageParameters> untypedResponseMessageHeaders,
@Nonnull GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
@Nonnull TransientBlobService transientBlobService,
@Nonnull Time cacheEntryDuration) {
super(leaderRetriever, timeout, responseHeaders, untypedResponseMessageHeaders, resourceManagerGatewayRetriever, transientBlobService, cacheEntryDuration);
public class TaskManagerThreadDumpHandler extends AbstractResourceManagerHandler<RestfulGateway, EmptyRequestBody, ThreadDumpInfo, TaskManagerMessageParameters> {

public TaskManagerThreadDumpHandler(
GatewayRetriever<? extends RestfulGateway> leaderRetriever,
Time timeout,
Map<String, String> responseHeaders,
MessageHeaders<EmptyRequestBody, ThreadDumpInfo, TaskManagerMessageParameters> messageHeaders,
GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever) {
super(leaderRetriever, timeout, responseHeaders, messageHeaders, resourceManagerGatewayRetriever);
}

@Override
protected CompletableFuture<TransientBlobKey> requestFileUpload(ResourceManagerGateway resourceManagerGateway, Tuple2<ResourceID, String> taskManagerIdAndFileName) {
return resourceManagerGateway.requestTaskManagerFileUploadByType(taskManagerIdAndFileName.f0, FileType.THREAD_DUMP, timeout);
protected CompletableFuture<ThreadDumpInfo> handleRequest(
@Nonnull HandlerRequest<EmptyRequestBody, TaskManagerMessageParameters> request,
@Nonnull ResourceManagerGateway gateway) throws RestHandlerException {
final ResourceID taskManagerId = request.getPathParameter(TaskManagerIdPathParameter.class);
return gateway.requestThreadDump(taskManagerId, timeout);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,22 @@
package org.apache.flink.runtime.rest.messages.taskmanager;

import org.apache.flink.runtime.rest.HttpMethodWrapper;
import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerThreadDumpFileHandler;
import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerThreadDumpHandler;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders;
import org.apache.flink.runtime.rest.messages.MessageHeaders;

import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;

/**
* Headers for the {@link TaskManagerThreadDumpFileHandler}.
* Headers for the {@link TaskManagerThreadDumpHandler}.
*/
public class TaskManagerThreadDumpFileHeaders implements UntypedResponseMessageHeaders<EmptyRequestBody, TaskManagerMessageParameters> {
public class TaskManagerThreadDumpHeaders implements MessageHeaders<EmptyRequestBody, ThreadDumpInfo, TaskManagerMessageParameters> {

private static final TaskManagerThreadDumpFileHeaders INSTANCE = new TaskManagerThreadDumpFileHeaders();
private static final TaskManagerThreadDumpHeaders INSTANCE = new TaskManagerThreadDumpHeaders();

private static final String URL = String.format("/taskmanagers/:%s/thread-dump", TaskManagerIdPathParameter.KEY);

private TaskManagerThreadDumpFileHeaders() {}
private TaskManagerThreadDumpHeaders() {}

@Override
public Class<EmptyRequestBody> getRequestClass() {
Expand All @@ -54,7 +56,22 @@ public String getTargetRestEndpointURL() {
return URL;
}

public static TaskManagerThreadDumpFileHeaders getInstance() {
public static TaskManagerThreadDumpHeaders getInstance() {
return INSTANCE;
}

@Override
public Class<ThreadDumpInfo> getResponseClass() {
return ThreadDumpInfo.class;
}

@Override
public HttpResponseStatus getResponseStatusCode() {
return HttpResponseStatus.OK;
}

@Override
public String getDescription() {
return "Returns the thread dump of the requested TaskManager.";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.rest.messages.taskmanager;

import org.apache.flink.runtime.rest.messages.ResponseBody;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;

import java.io.Serializable;
import java.util.Collection;
import java.util.Objects;

/**
* Class containing thread dump information.
*/
public class ThreadDumpInfo implements ResponseBody, Serializable {
private static final long serialVersionUID = 1L;

public static final String FIELD_NAME_THREAD_INFOS = "threadInfos";

@JsonProperty(FIELD_NAME_THREAD_INFOS)
private final Collection<ThreadInfo> threadInfos;

private ThreadDumpInfo(Collection<ThreadInfo> threadInfos) {
this.threadInfos = threadInfos;
}

public Collection<ThreadInfo> getThreadInfos() {
return threadInfos;
}

@JsonCreator
public static ThreadDumpInfo create(
@JsonProperty(FIELD_NAME_THREAD_INFOS) Collection<ThreadInfo> threadInfos) {
return new ThreadDumpInfo(threadInfos);
}

/**
* Class containing information about a thread.
*/
public static final class ThreadInfo implements Serializable {
private static final long serialVersionUID = 1L;

public static final String FIELD_NAME_THREAD_NAME = "threadName";

public static final String FIELD_NAME_THREAD_INFO = "stringifiedThreadInfo";

@JsonProperty(FIELD_NAME_THREAD_NAME)
private final String threadName;

@JsonProperty(FIELD_NAME_THREAD_INFO)
private final String stringifiedThreadInfo;

private ThreadInfo(String threadName, String stringifiedThreadInfo) {
this.threadName = threadName;
this.stringifiedThreadInfo = stringifiedThreadInfo;
}

@JsonCreator
public static ThreadInfo create(
@JsonProperty(FIELD_NAME_THREAD_NAME) String threadName,
@JsonProperty(FIELD_NAME_THREAD_INFO) String stringifiedThreadInfo) {
return new ThreadInfo(threadName, stringifiedThreadInfo);
}

public String getThreadName() {
return threadName;
}

public String getStringifiedThreadInfo() {
return stringifiedThreadInfo;
}

@Override
public String toString() {
return stringifiedThreadInfo;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ThreadInfo that = (ThreadInfo) o;
return Objects.equals(threadName, that.threadName) &&
Objects.equals(stringifiedThreadInfo, that.stringifiedThreadInfo);
}

@Override
public int hashCode() {
return Objects.hash(threadName, stringifiedThreadInfo);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,4 @@ public enum FileType {
* the stdout file type for taskmanager
*/
STDOUT,

/**
* the thread dump type for taskmanager
*/
THREAD_DUMP
}
Loading

0 comments on commit 2f81ac5

Please sign in to comment.