Skip to content

Commit

Permalink
[FLINK-14816] Add thread dump feature for taskmanager
Browse files Browse the repository at this point in the history
  • Loading branch information
lamberken committed Jan 4, 2020
1 parent df5eb21 commit 1a3ab9e
Show file tree
Hide file tree
Showing 13 changed files with 351 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<flink-monaco-editor [value]="dump"></flink-monaco-editor>
<flink-refresh-download [downloadHref]="'taskmanagers/'+taskManagerDetail?.id+'/dump'" [downloadName]="'taskmanager_'+taskManagerDetail?.id+'_dump'" (reload)="reload()"></flink-refresh-download>
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
@import "theme";

flink-monaco-editor {
height: calc(~"100vh - 310px");
}

:host {
position: relative;
display: block;
border: 1px solid @border-color-split;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { ChangeDetectionStrategy, ChangeDetectorRef, Component, OnInit, ViewChild } from '@angular/core';
import { TaskManagerDetailInterface } from 'interfaces';
import { first } from 'rxjs/operators';
import { TaskManagerService } from 'services';
import { MonacoEditorComponent } from 'share/common/monaco-editor/monaco-editor.component';

@Component({
selector: 'flink-task-manager-dump',
templateUrl: './task-manager-dump.component.html',
styleUrls: ['./task-manager-dump.component.less'],
changeDetection: ChangeDetectionStrategy.OnPush
})
export class TaskManagerDumpComponent implements OnInit {
@ViewChild(MonacoEditorComponent) monacoEditorComponent: MonacoEditorComponent;
dump = '';
taskManagerDetail: TaskManagerDetailInterface;

reload() {
if (this.taskManagerDetail) {
this.taskManagerService.loadDump(this.taskManagerDetail.id).subscribe(
data => {
this.monacoEditorComponent.layout();
this.dump = data;
this.cdr.markForCheck();
},
() => {
this.cdr.markForCheck();
}
);
}
}

constructor(private taskManagerService: TaskManagerService, private cdr: ChangeDetectorRef) {}

ngOnInit() {
this.taskManagerService.taskManagerDetail$.pipe(first()).subscribe(data => {
this.taskManagerDetail = data;
this.reload();
this.cdr.markForCheck();
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ export class TaskManagerStatusComponent implements OnInit, OnDestroy {
listOfNavigation = [
{ path: 'metrics', title: 'Metrics' },
{ path: 'logs', title: 'Logs' },
{ path: 'stdout', title: 'Stdout' }
{ path: 'stdout', title: 'Stdout' },
{ path: 'dump', title: 'Thread Dump' }
];
taskManagerDetail: TaskManagerDetailInterface;
private destroy$ = new Subject();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import { TaskManagerListComponent } from './list/task-manager-list.component';
import { TaskManagerLogsComponent } from './logs/task-manager-logs.component';
import { TaskManagerMetricsComponent } from './metrics/task-manager-metrics.component';
import { TaskManagerStdoutComponent } from './stdout/task-manager-stdout.component';
import { TaskManagerDumpComponent } from './dump/task-manager-dump.component';

const routes: Routes = [
{
Expand Down Expand Up @@ -54,6 +55,13 @@ const routes: Routes = [
path: 'stdout'
}
},
{
path: 'dump',
component: TaskManagerDumpComponent,
data: {
path: 'dump'
}
},
{
path: '**',
redirectTo: 'metrics',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import { TaskManagerComponent } from './task-manager.component';
import { TaskManagerStatusComponent } from './status/task-manager-status.component';
import { TaskManagerLogsComponent } from './logs/task-manager-logs.component';
import { TaskManagerStdoutComponent } from './stdout/task-manager-stdout.component';
import { TaskManagerDumpComponent } from './dump/task-manager-dump.component';

@NgModule({
imports: [CommonModule, ShareModule, TaskManagerRoutingModule],
Expand All @@ -36,7 +37,8 @@ import { TaskManagerStdoutComponent } from './stdout/task-manager-stdout.compone
TaskManagerComponent,
TaskManagerStatusComponent,
TaskManagerLogsComponent,
TaskManagerStdoutComponent
TaskManagerStdoutComponent,
TaskManagerDumpComponent
]
})
export class TaskManagerModule {}
Original file line number Diff line number Diff line change
Expand Up @@ -65,5 +65,13 @@ export class TaskManagerService {
return this.httpClient.get(`${BASE_URL}/taskmanagers/${taskManagerId}/stdout`, { responseType: 'text' });
}

/**
* Load TM dump
* @param taskManagerId
*/
loadDump(taskManagerId: string) {
return this.httpClient.get(`${BASE_URL}/taskmanagers/${taskManagerId}/dump`, { responseType: 'text' });
}

constructor(private httpClient: HttpClient) {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.rest.handler.taskmanager;

import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.blob.TransientBlobKey;
import org.apache.flink.runtime.blob.TransientBlobService;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerMessageParameters;
import org.apache.flink.runtime.taskexecutor.FileType;
import org.apache.flink.runtime.taskexecutor.TaskExecutor;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;

import javax.annotation.Nonnull;

import java.util.Map;
import java.util.concurrent.CompletableFuture;

/**
* Rest handler which serves the thread dump files from {@link TaskExecutor}.
*/
public class TaskManagerThreadDumpFileHandler extends AbstractTaskManagerFileHandler<TaskManagerMessageParameters> {

public TaskManagerThreadDumpFileHandler(
@Nonnull GatewayRetriever<? extends RestfulGateway> leaderRetriever,
@Nonnull Time timeout,
@Nonnull Map<String, String> responseHeaders,
@Nonnull UntypedResponseMessageHeaders<EmptyRequestBody, TaskManagerMessageParameters> untypedResponseMessageHeaders,
@Nonnull GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
@Nonnull TransientBlobService transientBlobService,
@Nonnull Time cacheEntryDuration) {
super(leaderRetriever, timeout, responseHeaders, untypedResponseMessageHeaders, resourceManagerGatewayRetriever, transientBlobService, cacheEntryDuration);
}

@Override
protected CompletableFuture<TransientBlobKey> requestFileUpload(ResourceManagerGateway resourceManagerGateway, ResourceID taskManagerResourceId) {
return resourceManagerGateway.requestTaskManagerFileUpload(taskManagerResourceId, FileType.THREAD_DUMP, timeout);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.rest.messages.taskmanager;

import org.apache.flink.runtime.rest.HttpMethodWrapper;
import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerThreadDumpFileHandler;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders;

/**
* Headers for the {@link TaskManagerThreadDumpFileHandler}.
*/
public class TaskManagerThreadDumpFileHeaders implements UntypedResponseMessageHeaders<EmptyRequestBody, TaskManagerMessageParameters> {

private static final TaskManagerThreadDumpFileHeaders INSTANCE = new TaskManagerThreadDumpFileHeaders();

private static final String URL = String.format("/taskmanagers/:%s/dump", TaskManagerIdPathParameter.KEY);

private TaskManagerThreadDumpFileHeaders() {}

@Override
public Class<EmptyRequestBody> getRequestClass() {
return EmptyRequestBody.class;
}

@Override
public TaskManagerMessageParameters getUnresolvedMessageParameters() {
return new TaskManagerMessageParameters();
}

@Override
public HttpMethodWrapper getHttpMethod() {
return HttpMethodWrapper.GET;
}

@Override
public String getTargetRestEndpointURL() {
return URL;
}

public static TaskManagerThreadDumpFileHeaders getInstance() {
return INSTANCE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,18 @@
* Different file types to request from the {@link TaskExecutor}.
*/
public enum FileType {
/**
* the log file type for taskmanager
*/
LOG,
STDOUT

/**
* the stdout file type for taskmanager
*/
STDOUT,

/**
* the thread dump type for taskmanager
*/
THREAD_DUMP
}
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,15 @@
import org.apache.flink.util.FlinkException;

import org.apache.flink.shaded.guava18.com.google.common.collect.Sets;
import org.apache.flink.runtime.util.JvmUtils;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -904,6 +906,8 @@ public CompletableFuture<TransientBlobKey> requestFileUpload(FileType fileType,
case STDOUT:
filePath = taskManagerConfiguration.getTaskManagerStdoutPath();
break;
case THREAD_DUMP:
return putTransientBlobStream(JvmUtils.threadDumpStream(), fileType);
default:
filePath = null;
}
Expand All @@ -912,16 +916,12 @@ public CompletableFuture<TransientBlobKey> requestFileUpload(FileType fileType,
final File file = new File(filePath);

if (file.exists()) {
final TransientBlobCache transientBlobService = blobCacheService.getTransientBlobService();
final TransientBlobKey transientBlobKey;
try (FileInputStream fileInputStream = new FileInputStream(file)) {
transientBlobKey = transientBlobService.putTransient(fileInputStream);
try {
return putTransientBlobStream(new FileInputStream(file), fileType);
} catch (IOException e) {
log.debug("Could not upload file {}.", fileType, e);
return FutureUtils.completedExceptionally(new FlinkException("Could not upload file " + fileType + '.', e));
}

return CompletableFuture.completedFuture(transientBlobKey);
} else {
log.debug("The file {} does not exist on the TaskExecutor {}.", fileType, getResourceID());
return FutureUtils.completedExceptionally(new FlinkException("The file " + fileType + " does not exist on the TaskExecutor."));
Expand Down Expand Up @@ -958,6 +958,19 @@ public void disconnectResourceManager(Exception cause) {
// Internal methods
// ======================================================================

private CompletableFuture<TransientBlobKey> putTransientBlobStream(InputStream inputStream, FileType fileType) {
final TransientBlobCache transientBlobService = blobCacheService.getTransientBlobService();
final TransientBlobKey transientBlobKey;

try {
transientBlobKey = transientBlobService.putTransient(inputStream);
} catch (IOException e) {
log.debug("Could not upload file {}.", fileType, e);
return FutureUtils.completedExceptionally(new FlinkException("Could not upload file " + fileType + '.', e));
}
return CompletableFuture.completedFuture(transientBlobKey);
}

// ------------------------------------------------------------------------
// Internal resource manager connection methods
// ------------------------------------------------------------------------
Expand Down
Loading

0 comments on commit 1a3ab9e

Please sign in to comment.