From ec954fb5af60895ef2c7accf4cc0e30d3f0a7bba Mon Sep 17 00:00:00 2001 From: lamber-ken <2217232293@qq.com> Date: Sat, 16 Nov 2019 10:50:53 +0800 Subject: [PATCH] [FLINK-14816] Add thread dump feature for taskmanager --- .../status/task-manager-status.component.ts | 2 +- .../task-manager-routing.module.ts | 8 +++ .../pages/task-manager/task-manager.module.ts | 4 +- .../task-manager-thread-dump.component.html | 20 ++++++ .../task-manager-thread-dump.component.less | 26 ++++++++ .../task-manager-thread-dump.component.ts | 57 +++++++++++++++++ .../src/app/services/task-manager.service.ts | 10 +++ .../TaskManagerThreadDumpFileHandler.java | 61 +++++++++++++++++++ .../TaskManagerThreadDumpFileHeaders.java | 60 ++++++++++++++++++ .../flink/runtime/taskexecutor/FileType.java | 14 ++++- .../runtime/taskexecutor/TaskExecutor.java | 26 ++++++-- .../apache/flink/runtime/util/JvmUtils.java | 59 ++++++++++++++++++ .../webmonitor/WebMonitorEndpoint.java | 12 ++++ 13 files changed, 350 insertions(+), 9 deletions(-) create mode 100644 flink-runtime-web/web-dashboard/src/app/pages/task-manager/thread-dump/task-manager-thread-dump.component.html create mode 100644 flink-runtime-web/web-dashboard/src/app/pages/task-manager/thread-dump/task-manager-thread-dump.component.less create mode 100644 flink-runtime-web/web-dashboard/src/app/pages/task-manager/thread-dump/task-manager-thread-dump.component.ts create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerThreadDumpFileHandler.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerThreadDumpFileHeaders.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/util/JvmUtils.java diff --git a/flink-runtime-web/web-dashboard/src/app/pages/task-manager/status/task-manager-status.component.ts b/flink-runtime-web/web-dashboard/src/app/pages/task-manager/status/task-manager-status.component.ts index b244445a23fe13..b77592ea4b59d2 100644 --- a/flink-runtime-web/web-dashboard/src/app/pages/task-manager/status/task-manager-status.component.ts +++ b/flink-runtime-web/web-dashboard/src/app/pages/task-manager/status/task-manager-status.component.ts @@ -30,7 +30,7 @@ import { TaskManagerService } from 'services'; }) export class TaskManagerStatusComponent implements OnInit, OnDestroy { @Input() isLoading = true; - listOfNavigation = [{ path: 'metrics', title: 'Metrics' }, { path: 'log-list', title: 'Log' }]; + listOfNavigation = [{ path: 'metrics', title: 'Metrics' }, { path: 'log-list', title: 'Log' }, { path: 'thread-dump', title: 'Thread Dump' }]; taskManagerDetail: TaskManagerDetailInterface; private destroy$ = new Subject(); diff --git a/flink-runtime-web/web-dashboard/src/app/pages/task-manager/task-manager-routing.module.ts b/flink-runtime-web/web-dashboard/src/app/pages/task-manager/task-manager-routing.module.ts index 25e0ae107ce81d..7642d3a4c3b951 100644 --- a/flink-runtime-web/web-dashboard/src/app/pages/task-manager/task-manager-routing.module.ts +++ b/flink-runtime-web/web-dashboard/src/app/pages/task-manager/task-manager-routing.module.ts @@ -23,6 +23,7 @@ import { TaskManagerLogListComponent } from './log-list/task-manager-log-list.co import { TaskManagerComponent } from './task-manager.component'; import { TaskManagerListComponent } from './list/task-manager-list.component'; import { TaskManagerMetricsComponent } from './metrics/task-manager-metrics.component'; +import { TaskManagerThreadDumpComponent } from './thread-dump/task-manager-thread-dump.component'; const routes: Routes = [ { @@ -47,6 +48,13 @@ const routes: Routes = [ path: 'log-list' } }, + { + path: 'thread-dump', + component: TaskManagerThreadDumpComponent, + data: { + path: 'thread-dump' + } + }, { path: 'log-list/:logName', component: TaskManagerLogDetailComponent, diff --git a/flink-runtime-web/web-dashboard/src/app/pages/task-manager/task-manager.module.ts b/flink-runtime-web/web-dashboard/src/app/pages/task-manager/task-manager.module.ts index d2ea2cbe2da1a0..3c75d3d1b98370 100644 --- a/flink-runtime-web/web-dashboard/src/app/pages/task-manager/task-manager.module.ts +++ b/flink-runtime-web/web-dashboard/src/app/pages/task-manager/task-manager.module.ts @@ -27,6 +27,7 @@ import { TaskManagerListComponent } from './list/task-manager-list.component'; import { TaskManagerMetricsComponent } from './metrics/task-manager-metrics.component'; import { TaskManagerComponent } from './task-manager.component'; import { TaskManagerStatusComponent } from './status/task-manager-status.component'; +import { TaskManagerThreadDumpComponent } from './thread-dump/task-manager-thread-dump.component'; @NgModule({ imports: [CommonModule, ShareModule, TaskManagerRoutingModule], @@ -36,7 +37,8 @@ import { TaskManagerStatusComponent } from './status/task-manager-status.compone TaskManagerComponent, TaskManagerStatusComponent, TaskManagerLogListComponent, - TaskManagerLogDetailComponent + TaskManagerLogDetailComponent, + TaskManagerThreadDumpComponent ] }) export class TaskManagerModule {} diff --git a/flink-runtime-web/web-dashboard/src/app/pages/task-manager/thread-dump/task-manager-thread-dump.component.html b/flink-runtime-web/web-dashboard/src/app/pages/task-manager/thread-dump/task-manager-thread-dump.component.html new file mode 100644 index 00000000000000..096b6b9d4d48a7 --- /dev/null +++ b/flink-runtime-web/web-dashboard/src/app/pages/task-manager/thread-dump/task-manager-thread-dump.component.html @@ -0,0 +1,20 @@ + + + + diff --git a/flink-runtime-web/web-dashboard/src/app/pages/task-manager/thread-dump/task-manager-thread-dump.component.less b/flink-runtime-web/web-dashboard/src/app/pages/task-manager/thread-dump/task-manager-thread-dump.component.less new file mode 100644 index 00000000000000..805c75d6f30b73 --- /dev/null +++ b/flink-runtime-web/web-dashboard/src/app/pages/task-manager/thread-dump/task-manager-thread-dump.component.less @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +flink-monaco-editor { + height: calc(~"100vh - 160px"); +} + +:host { + position: relative; + display: block; +} diff --git a/flink-runtime-web/web-dashboard/src/app/pages/task-manager/thread-dump/task-manager-thread-dump.component.ts b/flink-runtime-web/web-dashboard/src/app/pages/task-manager/thread-dump/task-manager-thread-dump.component.ts new file mode 100644 index 00000000000000..5d356080c59b01 --- /dev/null +++ b/flink-runtime-web/web-dashboard/src/app/pages/task-manager/thread-dump/task-manager-thread-dump.component.ts @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { ChangeDetectorRef, Component, OnInit, ViewChild, ChangeDetectionStrategy } from '@angular/core'; +import { TaskManagerService } from 'services'; +import { first } from 'rxjs/operators'; +import { MonacoEditorComponent } from 'share/common/monaco-editor/monaco-editor.component'; +import { TaskManagerDetailInterface } from 'interfaces'; + +@Component({ + selector: 'flink-task-manager-thread-dump', + templateUrl: './task-manager-thread-dump.component.html', + styleUrls: ['./task-manager-thread-dump.component.less'], + changeDetection: ChangeDetectionStrategy.OnPush +}) +export class TaskManagerThreadDumpComponent implements OnInit { + @ViewChild(MonacoEditorComponent) monacoEditorComponent: MonacoEditorComponent; + dump = ''; + taskManagerDetail: TaskManagerDetailInterface; + + reload() { + if (this.taskManagerDetail) { + this.taskManagerService.loadThreadDump(this.taskManagerDetail.id).subscribe(data => { + this.monacoEditorComponent.layout(); + this.dump = data; + this.cdr.markForCheck(); + }, () => { + this.cdr.markForCheck(); + }); + } + } + + constructor(private taskManagerService: TaskManagerService, private cdr: ChangeDetectorRef) {} + + ngOnInit() { + this.taskManagerService.taskManagerDetail$.pipe(first()).subscribe(data => { + this.taskManagerDetail = data; + this.reload(); + this.cdr.markForCheck(); + }); + } +} diff --git a/flink-runtime-web/web-dashboard/src/app/services/task-manager.service.ts b/flink-runtime-web/web-dashboard/src/app/services/task-manager.service.ts index bd790bf07aee3e..e35688a1d4e62c 100644 --- a/flink-runtime-web/web-dashboard/src/app/services/task-manager.service.ts +++ b/flink-runtime-web/web-dashboard/src/app/services/task-manager.service.ts @@ -84,5 +84,15 @@ export class TaskManagerService { ); } + /** + * Load TM thread dump + */ + loadThreadDump(taskManagerId: string) { + return this.httpClient.get(`${BASE_URL}/taskmanagers/${taskManagerId}/thread-dump`, { + responseType: 'text', + headers: new HttpHeaders().append('Cache-Control', 'no-cache') + }); + } + constructor(private httpClient: HttpClient) {} } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerThreadDumpFileHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerThreadDumpFileHandler.java new file mode 100644 index 00000000000000..62b589e805ea29 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerThreadDumpFileHandler.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.taskmanager; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.blob.TransientBlobKey; +import org.apache.flink.runtime.blob.TransientBlobService; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders; +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerMessageParameters; +import org.apache.flink.runtime.taskexecutor.FileType; +import org.apache.flink.runtime.taskexecutor.TaskExecutor; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; + +import javax.annotation.Nonnull; + +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +/** + * Rest handler which serves the thread dump files from {@link TaskExecutor}. + */ +public class TaskManagerThreadDumpFileHandler extends AbstractTaskManagerFileHandler { + + public TaskManagerThreadDumpFileHandler( + @Nonnull GatewayRetriever leaderRetriever, + @Nonnull Time timeout, + @Nonnull Map responseHeaders, + @Nonnull UntypedResponseMessageHeaders untypedResponseMessageHeaders, + @Nonnull GatewayRetriever resourceManagerGatewayRetriever, + @Nonnull TransientBlobService transientBlobService, + @Nonnull Time cacheEntryDuration) { + super(leaderRetriever, timeout, responseHeaders, untypedResponseMessageHeaders, resourceManagerGatewayRetriever, transientBlobService, cacheEntryDuration); + } + + @Override + protected CompletableFuture requestFileUpload(ResourceManagerGateway resourceManagerGateway, Tuple2 taskManagerIdAndFileName) { + return resourceManagerGateway.requestTaskManagerFileUploadByType(taskManagerIdAndFileName.f0, FileType.THREAD_DUMP, timeout); + } + +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerThreadDumpFileHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerThreadDumpFileHeaders.java new file mode 100644 index 00000000000000..6c7985df9ff78a --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerThreadDumpFileHeaders.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.taskmanager; + +import org.apache.flink.runtime.rest.HttpMethodWrapper; +import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerThreadDumpFileHandler; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders; + +/** + * Headers for the {@link TaskManagerThreadDumpFileHandler}. + */ +public class TaskManagerThreadDumpFileHeaders implements UntypedResponseMessageHeaders { + + private static final TaskManagerThreadDumpFileHeaders INSTANCE = new TaskManagerThreadDumpFileHeaders(); + + private static final String URL = String.format("/taskmanagers/:%s/thread-dump", TaskManagerIdPathParameter.KEY); + + private TaskManagerThreadDumpFileHeaders() {} + + @Override + public Class getRequestClass() { + return EmptyRequestBody.class; + } + + @Override + public TaskManagerMessageParameters getUnresolvedMessageParameters() { + return new TaskManagerMessageParameters(); + } + + @Override + public HttpMethodWrapper getHttpMethod() { + return HttpMethodWrapper.GET; + } + + @Override + public String getTargetRestEndpointURL() { + return URL; + } + + public static TaskManagerThreadDumpFileHeaders getInstance() { + return INSTANCE; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/FileType.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/FileType.java index bb6923c6b46f98..0b131a7ff42116 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/FileType.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/FileType.java @@ -22,6 +22,18 @@ * Different file types to request from the {@link TaskExecutor}. */ public enum FileType { + /** + * the log file type for taskmanager + */ LOG, - STDOUT + + /** + * the stdout file type for taskmanager + */ + STDOUT, + + /** + * the thread dump type for taskmanager + */ + THREAD_DUMP } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index 69a80079580cb3..8d514881c7eab0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -127,6 +127,7 @@ import org.apache.flink.util.StringUtils; import org.apache.flink.shaded.guava18.com.google.common.collect.Sets; +import org.apache.flink.runtime.util.JvmUtils; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -134,6 +135,7 @@ import java.io.File; import java.io.FileInputStream; import java.io.IOException; +import java.io.InputStream; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Arrays; @@ -944,6 +946,8 @@ public CompletableFuture requestFileUploadByType(FileType file case STDOUT: filePath = taskManagerConfiguration.getTaskManagerStdoutPath(); break; + case THREAD_DUMP: + return putTransientBlobStream(JvmUtils.threadDumpStream(), fileType.toString()); default: filePath = null; } @@ -1016,6 +1020,19 @@ public CompletableFuture sendOperatorEventToTask( // Internal methods // ====================================================================== + private CompletableFuture putTransientBlobStream(InputStream inputStream, String fileTag) { + final TransientBlobCache transientBlobService = blobCacheService.getTransientBlobService(); + final TransientBlobKey transientBlobKey; + + try { + transientBlobKey = transientBlobService.putTransient(inputStream); + } catch (IOException e) { + log.debug("Could not upload file {}.", fileTag, e); + return FutureUtils.completedExceptionally(new FlinkException("Could not upload file " + fileTag + '.', e)); + } + return CompletableFuture.completedFuture(transientBlobKey); + } + // ------------------------------------------------------------------------ // Internal resource manager connection methods // ------------------------------------------------------------------------ @@ -1675,15 +1692,12 @@ private CompletableFuture requestFileUploadByFilePath(String f return CompletableFuture.supplyAsync(() -> { final File file = new File(filePath); if (file.exists()) { - final TransientBlobCache transientBlobService = blobCacheService.getTransientBlobService(); - final TransientBlobKey transientBlobKey; - try (FileInputStream fileInputStream = new FileInputStream(file)) { - transientBlobKey = transientBlobService.putTransient(fileInputStream); - } catch (IOException e) { + try { + return putTransientBlobStream(new FileInputStream(file), fileTag).get(); + } catch (Exception e) { log.debug("Could not upload file {}.", fileTag, e); throw new CompletionException(new FlinkException("Could not upload file " + fileTag + '.', e)); } - return transientBlobKey; } else { log.debug("The file {} does not exist on the TaskExecutor {}.", fileTag, getResourceID()); throw new CompletionException(new FlinkException("The file " + fileTag + " does not exist on the TaskExecutor.")); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/JvmUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/JvmUtils.java new file mode 100644 index 00000000000000..cecaf0de518a8c --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/JvmUtils.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.util; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.io.SequenceInputStream; +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadMXBean; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Utilities for {@link java.lang.management.ManagementFactory}. + */ +public final class JvmUtils { + + /** + * Returns the thread info for all live threads with stack trace and synchronization information. + * + * @return the thread dump stream of current JVM + */ + public static InputStream threadDumpStream() { + ThreadMXBean threadMxBean = ManagementFactory.getThreadMXBean(); + + List streams = Arrays + .stream(threadMxBean.dumpAllThreads(true, true)) + .map((v) -> v.toString().getBytes(StandardCharsets.UTF_8)) + .map(ByteArrayInputStream::new) + .collect(Collectors.toList()); + + return new SequenceInputStream(Collections.enumeration(streams)); + } + + /** + * Private default constructor to avoid instantiation. + */ + private JvmUtils() {} + +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java index fc1887b130a1ad..53f7424e1700ee 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java @@ -85,6 +85,7 @@ import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerLogFileHandler; import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerLogListHandler; import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerStdoutFileHandler; +import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerThreadDumpFileHandler; import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagersHandler; import org.apache.flink.runtime.rest.messages.ClusterConfigurationInfoHeaders; import org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders; @@ -123,6 +124,7 @@ import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerLogFileHeaders; import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerLogsHeaders; import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerStdoutFileHeaders; +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerThreadDumpFileHeaders; import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersHeaders; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.util.ExecutorThreadFactory; @@ -710,10 +712,20 @@ protected List> initiali resourceManagerRetriever ); + final TaskManagerThreadDumpFileHandler taskManagerThreadDumpFileHandler = new TaskManagerThreadDumpFileHandler( + leaderRetriever, + timeout, + responseHeaders, + TaskManagerThreadDumpFileHeaders.getInstance(), + resourceManagerRetriever, + transientBlobService, + cacheEntryDuration); + handlers.add(Tuple2.of(TaskManagerLogFileHeaders.getInstance(), taskManagerLogFileHandler)); handlers.add(Tuple2.of(TaskManagerStdoutFileHeaders.getInstance(), taskManagerStdoutFileHandler)); handlers.add(Tuple2.of(TaskManagerCustomLogHeaders.getInstance(), taskManagerCustomLogHandler)); handlers.add(Tuple2.of(TaskManagerLogsHeaders.getInstance(), taskManagerLogListHandler)); + handlers.add(Tuple2.of(TaskManagerThreadDumpFileHeaders.getInstance(), taskManagerThreadDumpFileHandler)); handlers.stream() .map(tuple -> tuple.f1)