diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot index c30ba82448f14f..fc7cf6a975b3da 100644 --- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot +++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot @@ -2874,5 +2874,42 @@ "response" : { "type" : "any" } + }, { + "url" : "/taskmanagers/:taskmanagerid/thread-dump", + "method" : "GET", + "status-code" : "200 OK", + "file-upload" : false, + "path-parameters" : { + "pathParameters" : [ { + "key" : "taskmanagerid" + } ] + }, + "query-parameters" : { + "queryParameters" : [ ] + }, + "request" : { + "type" : "any" + }, + "response" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:ThreadDumpInfo", + "properties" : { + "threadInfos" : { + "type" : "array", + "items" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:taskmanager:ThreadDumpInfo:ThreadInfo", + "properties" : { + "threadName" : { + "type" : "string" + }, + "stringifiedThreadInfo" : { + "type" : "string" + } + } + } + } + } + } } ] } \ No newline at end of file diff --git a/flink-runtime-web/web-dashboard/src/app/interfaces/task-manager.ts b/flink-runtime-web/web-dashboard/src/app/interfaces/task-manager.ts index 2172cdd4d308d2..c0c0912644990a 100644 --- a/flink-runtime-web/web-dashboard/src/app/interfaces/task-manager.ts +++ b/flink-runtime-web/web-dashboard/src/app/interfaces/task-manager.ts @@ -75,3 +75,12 @@ interface GarbageCollectorsItem { count: number; time: number; } + +export interface TaskManagerThreadDumpInterface { + threadInfos: TaskManagerThreadInfoInterface[]; +} + +interface TaskManagerThreadInfoInterface { + threadName: string; + stringifiedThreadInfo: string; +} diff --git a/flink-runtime-web/web-dashboard/src/app/services/task-manager.service.ts b/flink-runtime-web/web-dashboard/src/app/services/task-manager.service.ts index e35688a1d4e62c..54a5b262db9e89 100644 --- a/flink-runtime-web/web-dashboard/src/app/services/task-manager.service.ts +++ b/flink-runtime-web/web-dashboard/src/app/services/task-manager.service.ts @@ -21,7 +21,7 @@ import { Injectable } from '@angular/core'; import { EMPTY, of, ReplaySubject } from 'rxjs'; import { catchError, map } from 'rxjs/operators'; import { BASE_URL } from 'config'; -import { TaskManagerListInterface, TaskManagerDetailInterface, TaskManagerLogInterface } from 'interfaces'; +import { TaskManagerListInterface, TaskManagerDetailInterface, TaskManagerLogInterface, TaskManagerThreadDumpInterface } from 'interfaces'; @Injectable({ providedIn: 'root' @@ -88,10 +88,12 @@ export class TaskManagerService { * Load TM thread dump */ loadThreadDump(taskManagerId: string) { - return this.httpClient.get(`${BASE_URL}/taskmanagers/${taskManagerId}/thread-dump`, { - responseType: 'text', - headers: new HttpHeaders().append('Cache-Control', 'no-cache') - }); + return this.httpClient + .get(`${BASE_URL}/taskmanagers/${taskManagerId}/thread-dump`) + .pipe( + map(taskManagerThreadDump => { + return taskManagerThreadDump.threadInfos.map(threadInfo => threadInfo.stringifiedThreadInfo).join(''); + })); } constructor(private httpClient: HttpClient) {} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java index 2759a38d05dc3f..6b20ec278a1e00 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java @@ -59,6 +59,7 @@ import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; import org.apache.flink.runtime.rest.messages.LogInfo; import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo; +import org.apache.flink.runtime.rest.messages.taskmanager.ThreadDumpInfo; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.FencedRpcEndpoint; import org.apache.flink.runtime.rpc.RpcService; @@ -648,6 +649,19 @@ public CompletableFuture> listDataSe return CompletableFuture.completedFuture(clusterPartitionTracker.listDataSets()); } + @Override + public CompletableFuture requestThreadDump(ResourceID taskManagerId, Time timeout) { + final WorkerRegistration taskExecutor = taskExecutors.get(taskManagerId); + + if (taskExecutor == null) { + log.debug("Requested thread dump from unregistered TaskExecutor {}.", taskManagerId); + return FutureUtils.completedExceptionally(new UnknownTaskExecutorException(taskManagerId)); + } else { + return taskExecutor.getTaskExecutorGateway().requestThreadDump(timeout); + } + + } + // ------------------------------------------------------------------------ // Internal methods // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java index f3af8cb5d0d753..9cc70b0c01b3c2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java @@ -36,6 +36,7 @@ import org.apache.flink.runtime.registration.RegistrationResponse; import org.apache.flink.runtime.rest.messages.LogInfo; import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo; +import org.apache.flink.runtime.rest.messages.taskmanager.ThreadDumpInfo; import org.apache.flink.runtime.rpc.FencedRpcGateway; import org.apache.flink.runtime.rpc.RpcTimeout; import org.apache.flink.runtime.taskexecutor.FileType; @@ -238,4 +239,13 @@ void notifySlotAvailable( * @return Future which is completed with the historical log list */ CompletableFuture> requestTaskManagerLogList(ResourceID taskManagerId, @RpcTimeout Time timeout); + + /** + * Requests the thread dump from the given {@link TaskExecutor}. + * + * @param taskManagerId taskManagerId identifying the {@link TaskExecutor} to get the thread dump from + * @param timeout timeout of the asynchronous operation + * @return Future containing the thread dump information + */ + CompletableFuture requestThreadDump(ResourceID taskManagerId, @RpcTimeout Time timeout); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerThreadDumpFileHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerThreadDumpHandler.java similarity index 50% rename from flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerThreadDumpFileHandler.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerThreadDumpHandler.java index 62b589e805ea29..b848f44b0240a9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerThreadDumpFileHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerThreadDumpHandler.java @@ -19,15 +19,16 @@ package org.apache.flink.runtime.rest.handler.taskmanager; import org.apache.flink.api.common.time.Time; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.runtime.blob.TransientBlobKey; -import org.apache.flink.runtime.blob.TransientBlobService; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.handler.resourcemanager.AbstractResourceManagerHandler; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; -import org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerIdPathParameter; import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerMessageParameters; -import org.apache.flink.runtime.taskexecutor.FileType; +import org.apache.flink.runtime.rest.messages.taskmanager.ThreadDumpInfo; import org.apache.flink.runtime.taskexecutor.TaskExecutor; import org.apache.flink.runtime.webmonitor.RestfulGateway; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; @@ -38,24 +39,24 @@ import java.util.concurrent.CompletableFuture; /** - * Rest handler which serves the thread dump files from {@link TaskExecutor}. + * Rest handler which serves the thread dump info from a {@link TaskExecutor}. */ -public class TaskManagerThreadDumpFileHandler extends AbstractTaskManagerFileHandler { - - public TaskManagerThreadDumpFileHandler( - @Nonnull GatewayRetriever leaderRetriever, - @Nonnull Time timeout, - @Nonnull Map responseHeaders, - @Nonnull UntypedResponseMessageHeaders untypedResponseMessageHeaders, - @Nonnull GatewayRetriever resourceManagerGatewayRetriever, - @Nonnull TransientBlobService transientBlobService, - @Nonnull Time cacheEntryDuration) { - super(leaderRetriever, timeout, responseHeaders, untypedResponseMessageHeaders, resourceManagerGatewayRetriever, transientBlobService, cacheEntryDuration); +public class TaskManagerThreadDumpHandler extends AbstractResourceManagerHandler { + + public TaskManagerThreadDumpHandler( + GatewayRetriever leaderRetriever, + Time timeout, + Map responseHeaders, + MessageHeaders messageHeaders, + GatewayRetriever resourceManagerGatewayRetriever) { + super(leaderRetriever, timeout, responseHeaders, messageHeaders, resourceManagerGatewayRetriever); } @Override - protected CompletableFuture requestFileUpload(ResourceManagerGateway resourceManagerGateway, Tuple2 taskManagerIdAndFileName) { - return resourceManagerGateway.requestTaskManagerFileUploadByType(taskManagerIdAndFileName.f0, FileType.THREAD_DUMP, timeout); + protected CompletableFuture handleRequest( + @Nonnull HandlerRequest request, + @Nonnull ResourceManagerGateway gateway) throws RestHandlerException { + final ResourceID taskManagerId = request.getPathParameter(TaskManagerIdPathParameter.class); + return gateway.requestThreadDump(taskManagerId, timeout); } - } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerThreadDumpFileHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerThreadDumpHeaders.java similarity index 64% rename from flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerThreadDumpFileHeaders.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerThreadDumpHeaders.java index 6c7985df9ff78a..09998fa046a4fe 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerThreadDumpFileHeaders.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerThreadDumpHeaders.java @@ -19,20 +19,22 @@ package org.apache.flink.runtime.rest.messages.taskmanager; import org.apache.flink.runtime.rest.HttpMethodWrapper; -import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerThreadDumpFileHandler; +import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerThreadDumpHandler; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; -import org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders; +import org.apache.flink.runtime.rest.messages.MessageHeaders; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; /** - * Headers for the {@link TaskManagerThreadDumpFileHandler}. + * Headers for the {@link TaskManagerThreadDumpHandler}. */ -public class TaskManagerThreadDumpFileHeaders implements UntypedResponseMessageHeaders { +public class TaskManagerThreadDumpHeaders implements MessageHeaders { - private static final TaskManagerThreadDumpFileHeaders INSTANCE = new TaskManagerThreadDumpFileHeaders(); + private static final TaskManagerThreadDumpHeaders INSTANCE = new TaskManagerThreadDumpHeaders(); private static final String URL = String.format("/taskmanagers/:%s/thread-dump", TaskManagerIdPathParameter.KEY); - private TaskManagerThreadDumpFileHeaders() {} + private TaskManagerThreadDumpHeaders() {} @Override public Class getRequestClass() { @@ -54,7 +56,22 @@ public String getTargetRestEndpointURL() { return URL; } - public static TaskManagerThreadDumpFileHeaders getInstance() { + public static TaskManagerThreadDumpHeaders getInstance() { return INSTANCE; } + + @Override + public Class getResponseClass() { + return ThreadDumpInfo.class; + } + + @Override + public HttpResponseStatus getResponseStatusCode() { + return HttpResponseStatus.OK; + } + + @Override + public String getDescription() { + return "Returns the thread dump of the requested TaskManager."; + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/ThreadDumpInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/ThreadDumpInfo.java new file mode 100644 index 00000000000000..71494748e6b9e7 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/ThreadDumpInfo.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.taskmanager; + +import org.apache.flink.runtime.rest.messages.ResponseBody; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import java.io.Serializable; +import java.util.Collection; +import java.util.Objects; + +/** + * Class containing thread dump information. + */ +public class ThreadDumpInfo implements ResponseBody, Serializable { + private static final long serialVersionUID = 1L; + + public static final String FIELD_NAME_THREAD_INFOS = "threadInfos"; + + @JsonProperty(FIELD_NAME_THREAD_INFOS) + private final Collection threadInfos; + + private ThreadDumpInfo(Collection threadInfos) { + this.threadInfos = threadInfos; + } + + public Collection getThreadInfos() { + return threadInfos; + } + + @JsonCreator + public static ThreadDumpInfo create( + @JsonProperty(FIELD_NAME_THREAD_INFOS) Collection threadInfos) { + return new ThreadDumpInfo(threadInfos); + } + + public static final class ThreadInfo implements Serializable { + private static final long serialVersionUID = 1L; + + public static final String FIELD_NAME_THREAD_NAME = "threadName"; + + public static final String FIELD_NAME_THREAD_INFO = "stringifiedThreadInfo"; + + @JsonProperty(FIELD_NAME_THREAD_NAME) + private final String threadName; + + @JsonProperty(FIELD_NAME_THREAD_INFO) + private final String stringifiedThreadInfo; + + private ThreadInfo(String threadName, String stringifiedThreadInfo) { + this.threadName = threadName; + this.stringifiedThreadInfo = stringifiedThreadInfo; + } + + @JsonCreator + public static ThreadInfo create( + @JsonProperty(FIELD_NAME_THREAD_NAME) String threadName, + @JsonProperty(FIELD_NAME_THREAD_INFO) String stringifiedThreadInfo) { + return new ThreadInfo(threadName, stringifiedThreadInfo); + } + + public String getThreadName() { + return threadName; + } + + public String getStringifiedThreadInfo() { + return stringifiedThreadInfo; + } + + @Override + public String toString() { + return stringifiedThreadInfo; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ThreadInfo that = (ThreadInfo) o; + return Objects.equals(threadName, that.threadName) && + Objects.equals(stringifiedThreadInfo, that.stringifiedThreadInfo); + } + + @Override + public int hashCode() { + return Objects.hash(threadName, stringifiedThreadInfo); + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/FileType.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/FileType.java index 0b131a7ff42116..41d3501d26c92e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/FileType.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/FileType.java @@ -31,9 +31,4 @@ public enum FileType { * the stdout file type for taskmanager */ STDOUT, - - /** - * the thread dump type for taskmanager - */ - THREAD_DUMP } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index 8d514881c7eab0..9bee637d1b1725 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -85,6 +85,7 @@ import org.apache.flink.runtime.resourcemanager.ResourceManagerId; import org.apache.flink.runtime.resourcemanager.TaskExecutorRegistration; import org.apache.flink.runtime.rest.messages.LogInfo; +import org.apache.flink.runtime.rest.messages.taskmanager.ThreadDumpInfo; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcService; @@ -120,6 +121,7 @@ import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.runtime.taskmanager.TaskManagerActions; import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation; +import org.apache.flink.runtime.util.JvmUtils; import org.apache.flink.types.SerializableOptional; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; @@ -127,7 +129,6 @@ import org.apache.flink.util.StringUtils; import org.apache.flink.shaded.guava18.com.google.common.collect.Sets; -import org.apache.flink.runtime.util.JvmUtils; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -136,6 +137,7 @@ import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; +import java.lang.management.ThreadInfo; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Arrays; @@ -946,8 +948,6 @@ public CompletableFuture requestFileUploadByType(FileType file case STDOUT: filePath = taskManagerConfiguration.getTaskManagerStdoutPath(); break; - case THREAD_DUMP: - return putTransientBlobStream(JvmUtils.threadDumpStream(), fileType.toString()); default: filePath = null; } @@ -1016,6 +1016,17 @@ public CompletableFuture sendOperatorEventToTask( } } + @Override + public CompletableFuture requestThreadDump(Time timeout) { + final Collection threadDump = JvmUtils.createThreadDump(); + + final Collection threadInfos = threadDump.stream() + .map(threadInfo -> ThreadDumpInfo.ThreadInfo.create(threadInfo.getThreadName(), threadInfo.toString())) + .collect(Collectors.toList()); + + return CompletableFuture.completedFuture(ThreadDumpInfo.create(threadInfos)); + } + // ====================================================================== // Internal methods // ====================================================================== diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java index 33224bb3955a73..ad37db04ca7f1f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java @@ -40,6 +40,7 @@ import org.apache.flink.runtime.operators.coordination.OperatorEvent; import org.apache.flink.runtime.resourcemanager.ResourceManagerId; import org.apache.flink.runtime.rest.messages.LogInfo; +import org.apache.flink.runtime.rest.messages.taskmanager.ThreadDumpInfo; import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.rpc.RpcTimeout; import org.apache.flink.runtime.taskmanager.Task; @@ -249,4 +250,12 @@ CompletableFuture sendOperatorEventToTask( ExecutionAttemptID task, OperatorID operator, SerializedValue evt); + + /** + * Requests the thread dump from this TaskManager. + * + * @param timeout timeout for the asynchronous operation + * @return the {@link ThreadDumpInfo} for this TaskManager. + */ + CompletableFuture requestThreadDump(@RpcTimeout Time timeout); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/JvmUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/JvmUtils.java index cecaf0de518a8c..1d8b6a7131b919 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/JvmUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/JvmUtils.java @@ -18,16 +18,11 @@ package org.apache.flink.runtime.util; -import java.io.ByteArrayInputStream; -import java.io.InputStream; -import java.io.SequenceInputStream; import java.lang.management.ManagementFactory; +import java.lang.management.ThreadInfo; import java.lang.management.ThreadMXBean; -import java.nio.charset.StandardCharsets; import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.stream.Collectors; +import java.util.Collection; /** * Utilities for {@link java.lang.management.ManagementFactory}. @@ -35,20 +30,14 @@ public final class JvmUtils { /** - * Returns the thread info for all live threads with stack trace and synchronization information. + * Creates a thread dump of the current JVM. * - * @return the thread dump stream of current JVM + * @return the thread dump of current JVM */ - public static InputStream threadDumpStream() { + public static Collection createThreadDump() { ThreadMXBean threadMxBean = ManagementFactory.getThreadMXBean(); - List streams = Arrays - .stream(threadMxBean.dumpAllThreads(true, true)) - .map((v) -> v.toString().getBytes(StandardCharsets.UTF_8)) - .map(ByteArrayInputStream::new) - .collect(Collectors.toList()); - - return new SequenceInputStream(Collections.enumeration(streams)); + return Arrays.asList(threadMxBean.dumpAllThreads(true, true)); } /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java index 53f7424e1700ee..d1d839a9b9ab44 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java @@ -85,7 +85,7 @@ import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerLogFileHandler; import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerLogListHandler; import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerStdoutFileHandler; -import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerThreadDumpFileHandler; +import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerThreadDumpHandler; import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagersHandler; import org.apache.flink.runtime.rest.messages.ClusterConfigurationInfoHeaders; import org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders; @@ -124,7 +124,7 @@ import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerLogFileHeaders; import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerLogsHeaders; import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerStdoutFileHeaders; -import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerThreadDumpFileHeaders; +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerThreadDumpHeaders; import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersHeaders; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.util.ExecutorThreadFactory; @@ -712,20 +712,18 @@ protected List> initiali resourceManagerRetriever ); - final TaskManagerThreadDumpFileHandler taskManagerThreadDumpFileHandler = new TaskManagerThreadDumpFileHandler( + final TaskManagerThreadDumpHandler taskManagerThreadDumpFileHandler = new TaskManagerThreadDumpHandler( leaderRetriever, timeout, responseHeaders, - TaskManagerThreadDumpFileHeaders.getInstance(), - resourceManagerRetriever, - transientBlobService, - cacheEntryDuration); + TaskManagerThreadDumpHeaders.getInstance(), + resourceManagerRetriever); handlers.add(Tuple2.of(TaskManagerLogFileHeaders.getInstance(), taskManagerLogFileHandler)); handlers.add(Tuple2.of(TaskManagerStdoutFileHeaders.getInstance(), taskManagerStdoutFileHandler)); handlers.add(Tuple2.of(TaskManagerCustomLogHeaders.getInstance(), taskManagerCustomLogHandler)); handlers.add(Tuple2.of(TaskManagerLogsHeaders.getInstance(), taskManagerLogListHandler)); - handlers.add(Tuple2.of(TaskManagerThreadDumpFileHeaders.getInstance(), taskManagerThreadDumpFileHandler)); + handlers.add(Tuple2.of(TaskManagerThreadDumpHeaders.getInstance(), taskManagerThreadDumpFileHandler)); handlers.stream() .map(tuple -> tuple.f1) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java index da56fa4ff1521f..68902ec6d0e2c7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java @@ -45,6 +45,7 @@ import org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorException; import org.apache.flink.runtime.rest.messages.LogInfo; import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo; +import org.apache.flink.runtime.rest.messages.taskmanager.ThreadDumpInfo; import org.apache.flink.runtime.taskexecutor.FileType; import org.apache.flink.runtime.taskexecutor.SlotReport; import org.apache.flink.runtime.taskexecutor.TaskExecutorHeartbeatPayload; @@ -100,6 +101,8 @@ public class TestingResourceManagerGateway implements ResourceManagerGateway { private volatile Function>> requestTaskManagerLogListFunction; + private volatile Function> requestThreadDumpFunction; + public TestingResourceManagerGateway() { this( ResourceManagerId.generate(), @@ -178,6 +181,10 @@ public void setNotifySlotAvailableConsumer(Consumer> requestThreadDumpFunction) { + this.requestThreadDumpFunction = requestThreadDumpFunction; + } + @Override public CompletableFuture registerJobManager(JobMasterId jobMasterId, ResourceID jobMasterResourceId, String jobMasterAddress, JobID jobId, Time timeout) { final Consumer> currentConsumer = registerJobManagerConsumer; @@ -347,6 +354,17 @@ public CompletableFuture> requestTaskManagerLogList(Resource } } + @Override + public CompletableFuture requestThreadDump(ResourceID taskManagerId, Time timeout) { + final Function> function = this.requestThreadDumpFunction; + + if (function != null) { + return function.apply(taskManagerId); + } else { + return FutureUtils.completedExceptionally(new UnknownTaskExecutorException(taskManagerId)); + } + } + @Override public ResourceManagerId getFencingToken() { return resourceManagerId; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/ThreadDumpInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/ThreadDumpInfoTest.java new file mode 100644 index 00000000000000..5008d36f588f14 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/taskmanager/ThreadDumpInfoTest.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.taskmanager; + +import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase; + +import java.util.Arrays; +import java.util.Collection; + +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertThat; + +/** + * Test for (un)marshalling of the {@link ThreadDumpInfo}. + */ +public class ThreadDumpInfoTest extends RestResponseMarshallingTestBase { + + @Override + protected Class getTestResponseClass() { + return ThreadDumpInfo.class; + } + + @Override + protected ThreadDumpInfo getTestResponseInstance() throws Exception { + final Collection threadInfos = Arrays.asList( + ThreadDumpInfo.ThreadInfo.create("foobar", "barfoo"), + ThreadDumpInfo.ThreadInfo.create("bar", "foo")); + + return ThreadDumpInfo.create(threadInfos); + } + + @Override + protected void assertOriginalEqualsToUnmarshalled(ThreadDumpInfo expected, ThreadDumpInfo actual) { + assertThat(actual.getThreadInfos(), containsInAnyOrder(expected.getThreadInfos().toArray())); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java index f16bc4e7d90ff6..f7e82d2858062e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java @@ -41,6 +41,7 @@ import org.apache.flink.runtime.operators.coordination.OperatorEvent; import org.apache.flink.runtime.resourcemanager.ResourceManagerId; import org.apache.flink.runtime.rest.messages.LogInfo; +import org.apache.flink.runtime.rest.messages.taskmanager.ThreadDumpInfo; import org.apache.flink.runtime.rpc.RpcTimeout; import org.apache.flink.types.SerializableOptional; import org.apache.flink.util.Preconditions; @@ -90,6 +91,8 @@ public class TestingTaskExecutorGateway implements TaskExecutorGateway { private final TriFunction, CompletableFuture> operatorEventHandler; + private final Supplier> requestThreadDumpSupplier; + TestingTaskExecutorGateway( String address, String hostname, @@ -104,7 +107,8 @@ public class TestingTaskExecutorGateway implements TaskExecutorGateway { Supplier> canBeReleasedSupplier, TriConsumer, Set> releaseOrPromotePartitionsConsumer, Consumer> releaseClusterPartitionsConsumer, - TriFunction, CompletableFuture> operatorEventHandler) { + TriFunction, CompletableFuture> operatorEventHandler, + Supplier> requestThreadDumpSupplier) { this.address = Preconditions.checkNotNull(address); this.hostname = Preconditions.checkNotNull(hostname); @@ -120,6 +124,7 @@ public class TestingTaskExecutorGateway implements TaskExecutorGateway { this.releaseOrPromotePartitionsConsumer = releaseOrPromotePartitionsConsumer; this.releaseClusterPartitionsConsumer = releaseClusterPartitionsConsumer; this.operatorEventHandler = operatorEventHandler; + this.requestThreadDumpSupplier = requestThreadDumpSupplier; } @Override @@ -221,6 +226,11 @@ public CompletableFuture sendOperatorEventToTask( return operatorEventHandler.apply(task, operator, evt); } + @Override + public CompletableFuture requestThreadDump(Time timeout) { + return requestThreadDumpSupplier.get(); + } + @Override public String getAddress() { return address; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGatewayBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGatewayBuilder.java index 0fbbd89598099d..eacaf516caeca5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGatewayBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGatewayBuilder.java @@ -24,6 +24,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.clusterframework.types.SlotID; +import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; @@ -34,6 +35,7 @@ import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.operators.coordination.OperatorEvent; import org.apache.flink.runtime.resourcemanager.ResourceManagerId; +import org.apache.flink.runtime.rest.messages.taskmanager.ThreadDumpInfo; import org.apache.flink.util.SerializedValue; import org.apache.flink.util.function.TriConsumer; import org.apache.flink.util.function.TriFunction; @@ -62,6 +64,7 @@ public class TestingTaskExecutorGatewayBuilder { private static final Function> NOOP_CANCEL_TASK_FUNCTION = ignored -> CompletableFuture.completedFuture(Acknowledge.get()); private static final TriConsumer, Set> NOOP_RELEASE_PARTITIONS_CONSUMER = (ignoredA, ignoredB, ignoredC) -> {}; private static final TriFunction, CompletableFuture> DEFAULT_OPERATOR_EVENT_HANDLER = (a, b, c) -> CompletableFuture.completedFuture(Acknowledge.get()); + private static final Supplier> DEFAULT_THREAD_DUMP_SUPPLIER = () -> FutureUtils.completedExceptionally(new UnsupportedOperationException()); private String address = "foobar:1234"; private String hostname = "foobar"; @@ -77,6 +80,7 @@ public class TestingTaskExecutorGatewayBuilder { private TriConsumer, Set> releaseOrPromotePartitionsConsumer = NOOP_RELEASE_PARTITIONS_CONSUMER; private Consumer> releaseClusterPartitionsConsumer = ignored -> {}; private TriFunction, CompletableFuture> operatorEventHandler = DEFAULT_OPERATOR_EVENT_HANDLER; + private Supplier> requestThreadDumpSupplier = DEFAULT_THREAD_DUMP_SUPPLIER; public TestingTaskExecutorGatewayBuilder setAddress(String address) { this.address = address; @@ -148,6 +152,10 @@ public TestingTaskExecutorGatewayBuilder setOperatorEventHandler(TriFunction> requestThreadDumpSupplier) { + this.requestThreadDumpSupplier = requestThreadDumpSupplier; + } + public TestingTaskExecutorGateway createTestingTaskExecutorGateway() { return new TestingTaskExecutorGateway( address, @@ -163,6 +171,7 @@ public TestingTaskExecutorGateway createTestingTaskExecutorGateway() { canBeReleasedSupplier, releaseOrPromotePartitionsConsumer, releaseClusterPartitionsConsumer, - operatorEventHandler); + operatorEventHandler, + requestThreadDumpSupplier); } }