diff --git a/flink-runtime-web/web-dashboard/src/app/pages/task-manager/dump/task-manager-dump.component.html b/flink-runtime-web/web-dashboard/src/app/pages/task-manager/dump/task-manager-dump.component.html
new file mode 100644
index 0000000000000..a29a536a46e14
--- /dev/null
+++ b/flink-runtime-web/web-dashboard/src/app/pages/task-manager/dump/task-manager-dump.component.html
@@ -0,0 +1,20 @@
+
+
+
+
diff --git a/flink-runtime-web/web-dashboard/src/app/pages/task-manager/dump/task-manager-dump.component.less b/flink-runtime-web/web-dashboard/src/app/pages/task-manager/dump/task-manager-dump.component.less
new file mode 100644
index 0000000000000..df8052556a98b
--- /dev/null
+++ b/flink-runtime-web/web-dashboard/src/app/pages/task-manager/dump/task-manager-dump.component.less
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@import "theme";
+
+flink-monaco-editor {
+ height: calc(~"100vh - 310px");
+}
+
+:host {
+ position: relative;
+ display: block;
+ border: 1px solid @border-color-split;
+}
diff --git a/flink-runtime-web/web-dashboard/src/app/pages/task-manager/dump/task-manager-dump.component.ts b/flink-runtime-web/web-dashboard/src/app/pages/task-manager/dump/task-manager-dump.component.ts
new file mode 100644
index 0000000000000..ae9c6d9e6d048
--- /dev/null
+++ b/flink-runtime-web/web-dashboard/src/app/pages/task-manager/dump/task-manager-dump.component.ts
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { ChangeDetectionStrategy, ChangeDetectorRef, Component, OnInit, ViewChild } from '@angular/core';
+import { TaskManagerDetailInterface } from 'interfaces';
+import { first } from 'rxjs/operators';
+import { TaskManagerService } from 'services';
+import { MonacoEditorComponent } from 'share/common/monaco-editor/monaco-editor.component';
+
+@Component({
+ selector: 'flink-task-manager-dump',
+ templateUrl: './task-manager-dump.component.html',
+ styleUrls: ['./task-manager-dump.component.less'],
+ changeDetection: ChangeDetectionStrategy.OnPush
+})
+export class TaskManagerDumpComponent implements OnInit {
+ @ViewChild(MonacoEditorComponent) monacoEditorComponent: MonacoEditorComponent;
+ dump = '';
+ taskManagerDetail: TaskManagerDetailInterface;
+
+ reload() {
+ if (this.taskManagerDetail) {
+ this.taskManagerService.loadDump(this.taskManagerDetail.id).subscribe(
+ data => {
+ this.monacoEditorComponent.layout();
+ this.dump = data;
+ this.cdr.markForCheck();
+ },
+ () => {
+ this.cdr.markForCheck();
+ }
+ );
+ }
+ }
+
+ constructor(private taskManagerService: TaskManagerService, private cdr: ChangeDetectorRef) {}
+
+ ngOnInit() {
+ this.taskManagerService.taskManagerDetail$.pipe(first()).subscribe(data => {
+ this.taskManagerDetail = data;
+ this.reload();
+ this.cdr.markForCheck();
+ });
+ }
+}
diff --git a/flink-runtime-web/web-dashboard/src/app/pages/task-manager/status/task-manager-status.component.ts b/flink-runtime-web/web-dashboard/src/app/pages/task-manager/status/task-manager-status.component.ts
index d54e4ec0777f6..760779bc9849a 100644
--- a/flink-runtime-web/web-dashboard/src/app/pages/task-manager/status/task-manager-status.component.ts
+++ b/flink-runtime-web/web-dashboard/src/app/pages/task-manager/status/task-manager-status.component.ts
@@ -33,7 +33,8 @@ export class TaskManagerStatusComponent implements OnInit, OnDestroy {
listOfNavigation = [
{ path: 'metrics', title: 'Metrics' },
{ path: 'logs', title: 'Logs' },
- { path: 'stdout', title: 'Stdout' }
+ { path: 'stdout', title: 'Stdout' },
+ { path: 'dump', title: 'Thread Dump' }
];
taskManagerDetail: TaskManagerDetailInterface;
private destroy$ = new Subject();
diff --git a/flink-runtime-web/web-dashboard/src/app/pages/task-manager/task-manager-routing.module.ts b/flink-runtime-web/web-dashboard/src/app/pages/task-manager/task-manager-routing.module.ts
index 0f5c7113ee2a3..b1ca215f50310 100644
--- a/flink-runtime-web/web-dashboard/src/app/pages/task-manager/task-manager-routing.module.ts
+++ b/flink-runtime-web/web-dashboard/src/app/pages/task-manager/task-manager-routing.module.ts
@@ -23,6 +23,7 @@ import { TaskManagerListComponent } from './list/task-manager-list.component';
import { TaskManagerLogsComponent } from './logs/task-manager-logs.component';
import { TaskManagerMetricsComponent } from './metrics/task-manager-metrics.component';
import { TaskManagerStdoutComponent } from './stdout/task-manager-stdout.component';
+import { TaskManagerDumpComponent } from './dump/task-manager-dump.component';
const routes: Routes = [
{
@@ -54,6 +55,13 @@ const routes: Routes = [
path: 'stdout'
}
},
+ {
+ path: 'dump',
+ component: TaskManagerDumpComponent,
+ data: {
+ path: 'dump'
+ }
+ },
{
path: '**',
redirectTo: 'metrics',
diff --git a/flink-runtime-web/web-dashboard/src/app/pages/task-manager/task-manager.module.ts b/flink-runtime-web/web-dashboard/src/app/pages/task-manager/task-manager.module.ts
index 0a521b7f46c70..a4602abba13e3 100644
--- a/flink-runtime-web/web-dashboard/src/app/pages/task-manager/task-manager.module.ts
+++ b/flink-runtime-web/web-dashboard/src/app/pages/task-manager/task-manager.module.ts
@@ -27,6 +27,7 @@ import { TaskManagerComponent } from './task-manager.component';
import { TaskManagerStatusComponent } from './status/task-manager-status.component';
import { TaskManagerLogsComponent } from './logs/task-manager-logs.component';
import { TaskManagerStdoutComponent } from './stdout/task-manager-stdout.component';
+import { TaskManagerDumpComponent } from './dump/task-manager-dump.component';
@NgModule({
imports: [CommonModule, ShareModule, TaskManagerRoutingModule],
@@ -36,7 +37,8 @@ import { TaskManagerStdoutComponent } from './stdout/task-manager-stdout.compone
TaskManagerComponent,
TaskManagerStatusComponent,
TaskManagerLogsComponent,
- TaskManagerStdoutComponent
+ TaskManagerStdoutComponent,
+ TaskManagerDumpComponent
]
})
export class TaskManagerModule {}
diff --git a/flink-runtime-web/web-dashboard/src/app/services/task-manager.service.ts b/flink-runtime-web/web-dashboard/src/app/services/task-manager.service.ts
index b4cb2f7882002..f10a4f22336d3 100644
--- a/flink-runtime-web/web-dashboard/src/app/services/task-manager.service.ts
+++ b/flink-runtime-web/web-dashboard/src/app/services/task-manager.service.ts
@@ -65,5 +65,13 @@ export class TaskManagerService {
return this.httpClient.get(`${BASE_URL}/taskmanagers/${taskManagerId}/stdout`, { responseType: 'text' });
}
+ /**
+ * Load TM dump
+ * @param taskManagerId
+ */
+ loadDump(taskManagerId: string) {
+ return this.httpClient.get(`${BASE_URL}/taskmanagers/${taskManagerId}/dump`, { responseType: 'text' });
+ }
+
constructor(private httpClient: HttpClient) {}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerThreadDumpFileHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerThreadDumpFileHandler.java
new file mode 100644
index 0000000000000..71f2431230492
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerThreadDumpFileHandler.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.taskmanager;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.blob.TransientBlobKey;
+import org.apache.flink.runtime.blob.TransientBlobService;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders;
+import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerMessageParameters;
+import org.apache.flink.runtime.taskexecutor.FileType;
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import javax.annotation.Nonnull;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Rest handler which serves the thread dump files from {@link TaskExecutor}.
+ */
+public class TaskManagerThreadDumpFileHandler extends AbstractTaskManagerFileHandler {
+
+ public TaskManagerThreadDumpFileHandler(
+ @Nonnull GatewayRetriever extends RestfulGateway> leaderRetriever,
+ @Nonnull Time timeout,
+ @Nonnull Map responseHeaders,
+ @Nonnull UntypedResponseMessageHeaders untypedResponseMessageHeaders,
+ @Nonnull GatewayRetriever resourceManagerGatewayRetriever,
+ @Nonnull TransientBlobService transientBlobService,
+ @Nonnull Time cacheEntryDuration) {
+ super(leaderRetriever, timeout, responseHeaders, untypedResponseMessageHeaders, resourceManagerGatewayRetriever, transientBlobService, cacheEntryDuration);
+ }
+
+ @Override
+ protected CompletableFuture requestFileUpload(ResourceManagerGateway resourceManagerGateway, ResourceID taskManagerResourceId) {
+ return resourceManagerGateway.requestTaskManagerFileUpload(taskManagerResourceId, FileType.THREAD_DUMP, timeout);
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerThreadDumpFileHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerThreadDumpFileHeaders.java
new file mode 100644
index 0000000000000..bbc617b123d72
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/taskmanager/TaskManagerThreadDumpFileHeaders.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.taskmanager;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerThreadDumpFileHandler;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders;
+
+/**
+ * Headers for the {@link TaskManagerThreadDumpFileHandler}.
+ */
+public class TaskManagerThreadDumpFileHeaders implements UntypedResponseMessageHeaders {
+
+ private static final TaskManagerThreadDumpFileHeaders INSTANCE = new TaskManagerThreadDumpFileHeaders();
+
+ private static final String URL = String.format("/taskmanagers/:%s/dump", TaskManagerIdPathParameter.KEY);
+
+ private TaskManagerThreadDumpFileHeaders() {}
+
+ @Override
+ public Class getRequestClass() {
+ return EmptyRequestBody.class;
+ }
+
+ @Override
+ public TaskManagerMessageParameters getUnresolvedMessageParameters() {
+ return new TaskManagerMessageParameters();
+ }
+
+ @Override
+ public HttpMethodWrapper getHttpMethod() {
+ return HttpMethodWrapper.GET;
+ }
+
+ @Override
+ public String getTargetRestEndpointURL() {
+ return URL;
+ }
+
+ public static TaskManagerThreadDumpFileHeaders getInstance() {
+ return INSTANCE;
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/FileType.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/FileType.java
index bb6923c6b46f9..0b131a7ff4211 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/FileType.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/FileType.java
@@ -22,6 +22,18 @@
* Different file types to request from the {@link TaskExecutor}.
*/
public enum FileType {
+ /**
+ * the log file type for taskmanager
+ */
LOG,
- STDOUT
+
+ /**
+ * the stdout file type for taskmanager
+ */
+ STDOUT,
+
+ /**
+ * the thread dump type for taskmanager
+ */
+ THREAD_DUMP
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 778669df0ebc7..6ced71e336e67 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -118,6 +118,7 @@
import org.apache.flink.util.FlinkException;
import org.apache.flink.shaded.guava18.com.google.common.collect.Sets;
+import org.apache.flink.runtime.util.JvmUtils;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@@ -125,6 +126,7 @@
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
@@ -904,6 +906,8 @@ public CompletableFuture requestFileUpload(FileType fileType,
case STDOUT:
filePath = taskManagerConfiguration.getTaskManagerStdoutPath();
break;
+ case THREAD_DUMP:
+ return putTransientBlobStream(JvmUtils.threadDumpStream(), fileType);
default:
filePath = null;
}
@@ -912,16 +916,12 @@ public CompletableFuture requestFileUpload(FileType fileType,
final File file = new File(filePath);
if (file.exists()) {
- final TransientBlobCache transientBlobService = blobCacheService.getTransientBlobService();
- final TransientBlobKey transientBlobKey;
- try (FileInputStream fileInputStream = new FileInputStream(file)) {
- transientBlobKey = transientBlobService.putTransient(fileInputStream);
+ try {
+ return putTransientBlobStream(new FileInputStream(file), fileType);
} catch (IOException e) {
log.debug("Could not upload file {}.", fileType, e);
return FutureUtils.completedExceptionally(new FlinkException("Could not upload file " + fileType + '.', e));
}
-
- return CompletableFuture.completedFuture(transientBlobKey);
} else {
log.debug("The file {} does not exist on the TaskExecutor {}.", fileType, getResourceID());
return FutureUtils.completedExceptionally(new FlinkException("The file " + fileType + " does not exist on the TaskExecutor."));
@@ -958,6 +958,19 @@ public void disconnectResourceManager(Exception cause) {
// Internal methods
// ======================================================================
+ private CompletableFuture putTransientBlobStream(InputStream inputStream, FileType fileType) {
+ final TransientBlobCache transientBlobService = blobCacheService.getTransientBlobService();
+ final TransientBlobKey transientBlobKey;
+
+ try {
+ transientBlobKey = transientBlobService.putTransient(inputStream);
+ } catch (IOException e) {
+ log.debug("Could not upload file {}.", fileType, e);
+ return FutureUtils.completedExceptionally(new FlinkException("Could not upload file " + fileType + '.', e));
+ }
+ return CompletableFuture.completedFuture(transientBlobKey);
+ }
+
// ------------------------------------------------------------------------
// Internal resource manager connection methods
// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/JvmUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/JvmUtils.java
new file mode 100644
index 0000000000000..cecaf0de518a8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/JvmUtils.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.io.SequenceInputStream;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadMXBean;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Utilities for {@link java.lang.management.ManagementFactory}.
+ */
+public final class JvmUtils {
+
+ /**
+ * Returns the thread info for all live threads with stack trace and synchronization information.
+ *
+ * @return the thread dump stream of current JVM
+ */
+ public static InputStream threadDumpStream() {
+ ThreadMXBean threadMxBean = ManagementFactory.getThreadMXBean();
+
+ List streams = Arrays
+ .stream(threadMxBean.dumpAllThreads(true, true))
+ .map((v) -> v.toString().getBytes(StandardCharsets.UTF_8))
+ .map(ByteArrayInputStream::new)
+ .collect(Collectors.toList());
+
+ return new SequenceInputStream(Collections.enumeration(streams));
+ }
+
+ /**
+ * Private default constructor to avoid instantiation.
+ */
+ private JvmUtils() {}
+
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index 54de58242c888..f66fbdcb924ce 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -81,6 +81,7 @@
import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerDetailsHandler;
import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerLogFileHandler;
import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerStdoutFileHandler;
+import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerThreadDumpFileHandler;
import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagersHandler;
import org.apache.flink.runtime.rest.messages.ClusterConfigurationInfoHeaders;
import org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders;
@@ -113,6 +114,7 @@
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerDetailsHeaders;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerLogFileHeaders;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerStdoutFileHeaders;
+import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerThreadDumpFileHeaders;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersHeaders;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
@@ -641,8 +643,18 @@ protected List> initiali
transientBlobService,
cacheEntryDuration);
+ final TaskManagerThreadDumpFileHandler taskManagerThreadDumpFileHandler = new TaskManagerThreadDumpFileHandler(
+ leaderRetriever,
+ timeout,
+ responseHeaders,
+ TaskManagerThreadDumpFileHeaders.getInstance(),
+ resourceManagerRetriever,
+ transientBlobService,
+ cacheEntryDuration);
+
handlers.add(Tuple2.of(TaskManagerLogFileHeaders.getInstance(), taskManagerLogFileHandler));
handlers.add(Tuple2.of(TaskManagerStdoutFileHeaders.getInstance(), taskManagerStdoutFileHandler));
+ handlers.add(Tuple2.of(TaskManagerThreadDumpFileHeaders.getInstance(), taskManagerThreadDumpFileHandler));
handlers.stream()
.map(tuple -> tuple.f1)