Skip to content

Commit

Permalink
[FLINK-14816] Add thread dump feature for taskmanager
Browse files Browse the repository at this point in the history
  • Loading branch information
lamberken authored and tillrohrmann committed Apr 24, 2020
1 parent 948a42f commit ec954fb
Show file tree
Hide file tree
Showing 13 changed files with 350 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import { TaskManagerService } from 'services';
})
export class TaskManagerStatusComponent implements OnInit, OnDestroy {
@Input() isLoading = true;
listOfNavigation = [{ path: 'metrics', title: 'Metrics' }, { path: 'log-list', title: 'Log' }];
listOfNavigation = [{ path: 'metrics', title: 'Metrics' }, { path: 'log-list', title: 'Log' }, { path: 'thread-dump', title: 'Thread Dump' }];
taskManagerDetail: TaskManagerDetailInterface;
private destroy$ = new Subject();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import { TaskManagerLogListComponent } from './log-list/task-manager-log-list.co
import { TaskManagerComponent } from './task-manager.component';
import { TaskManagerListComponent } from './list/task-manager-list.component';
import { TaskManagerMetricsComponent } from './metrics/task-manager-metrics.component';
import { TaskManagerThreadDumpComponent } from './thread-dump/task-manager-thread-dump.component';

const routes: Routes = [
{
Expand All @@ -47,6 +48,13 @@ const routes: Routes = [
path: 'log-list'
}
},
{
path: 'thread-dump',
component: TaskManagerThreadDumpComponent,
data: {
path: 'thread-dump'
}
},
{
path: 'log-list/:logName',
component: TaskManagerLogDetailComponent,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import { TaskManagerListComponent } from './list/task-manager-list.component';
import { TaskManagerMetricsComponent } from './metrics/task-manager-metrics.component';
import { TaskManagerComponent } from './task-manager.component';
import { TaskManagerStatusComponent } from './status/task-manager-status.component';
import { TaskManagerThreadDumpComponent } from './thread-dump/task-manager-thread-dump.component';

@NgModule({
imports: [CommonModule, ShareModule, TaskManagerRoutingModule],
Expand All @@ -36,7 +37,8 @@ import { TaskManagerStatusComponent } from './status/task-manager-status.compone
TaskManagerComponent,
TaskManagerStatusComponent,
TaskManagerLogListComponent,
TaskManagerLogDetailComponent
TaskManagerLogDetailComponent,
TaskManagerThreadDumpComponent
]
})
export class TaskManagerModule {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<flink-monaco-editor [value]="dump"></flink-monaco-editor>
<flink-refresh-download [downloadHref]="'taskmanagers/'+taskManagerDetail?.id+'/thread-dump'" [downloadName]="'taskmanager_'+taskManagerDetail?.id+'_thread_dump'" (reload)="reload()"></flink-refresh-download>
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

flink-monaco-editor {
height: calc(~"100vh - 160px");
}

:host {
position: relative;
display: block;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { ChangeDetectorRef, Component, OnInit, ViewChild, ChangeDetectionStrategy } from '@angular/core';
import { TaskManagerService } from 'services';
import { first } from 'rxjs/operators';
import { MonacoEditorComponent } from 'share/common/monaco-editor/monaco-editor.component';
import { TaskManagerDetailInterface } from 'interfaces';

@Component({
selector: 'flink-task-manager-thread-dump',
templateUrl: './task-manager-thread-dump.component.html',
styleUrls: ['./task-manager-thread-dump.component.less'],
changeDetection: ChangeDetectionStrategy.OnPush
})
export class TaskManagerThreadDumpComponent implements OnInit {
@ViewChild(MonacoEditorComponent) monacoEditorComponent: MonacoEditorComponent;
dump = '';
taskManagerDetail: TaskManagerDetailInterface;

reload() {
if (this.taskManagerDetail) {
this.taskManagerService.loadThreadDump(this.taskManagerDetail.id).subscribe(data => {
this.monacoEditorComponent.layout();
this.dump = data;
this.cdr.markForCheck();
}, () => {
this.cdr.markForCheck();
});
}
}

constructor(private taskManagerService: TaskManagerService, private cdr: ChangeDetectorRef) {}

ngOnInit() {
this.taskManagerService.taskManagerDetail$.pipe(first()).subscribe(data => {
this.taskManagerDetail = data;
this.reload();
this.cdr.markForCheck();
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,5 +84,15 @@ export class TaskManagerService {
);
}

/**
* Load TM thread dump
*/
loadThreadDump(taskManagerId: string) {
return this.httpClient.get(`${BASE_URL}/taskmanagers/${taskManagerId}/thread-dump`, {
responseType: 'text',
headers: new HttpHeaders().append('Cache-Control', 'no-cache')
});
}

constructor(private httpClient: HttpClient) {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.rest.handler.taskmanager;

import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.blob.TransientBlobKey;
import org.apache.flink.runtime.blob.TransientBlobService;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerMessageParameters;
import org.apache.flink.runtime.taskexecutor.FileType;
import org.apache.flink.runtime.taskexecutor.TaskExecutor;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;

import javax.annotation.Nonnull;

import java.util.Map;
import java.util.concurrent.CompletableFuture;

/**
* Rest handler which serves the thread dump files from {@link TaskExecutor}.
*/
public class TaskManagerThreadDumpFileHandler extends AbstractTaskManagerFileHandler<TaskManagerMessageParameters> {

public TaskManagerThreadDumpFileHandler(
@Nonnull GatewayRetriever<? extends RestfulGateway> leaderRetriever,
@Nonnull Time timeout,
@Nonnull Map<String, String> responseHeaders,
@Nonnull UntypedResponseMessageHeaders<EmptyRequestBody, TaskManagerMessageParameters> untypedResponseMessageHeaders,
@Nonnull GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
@Nonnull TransientBlobService transientBlobService,
@Nonnull Time cacheEntryDuration) {
super(leaderRetriever, timeout, responseHeaders, untypedResponseMessageHeaders, resourceManagerGatewayRetriever, transientBlobService, cacheEntryDuration);
}

@Override
protected CompletableFuture<TransientBlobKey> requestFileUpload(ResourceManagerGateway resourceManagerGateway, Tuple2<ResourceID, String> taskManagerIdAndFileName) {
return resourceManagerGateway.requestTaskManagerFileUploadByType(taskManagerIdAndFileName.f0, FileType.THREAD_DUMP, timeout);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.rest.messages.taskmanager;

import org.apache.flink.runtime.rest.HttpMethodWrapper;
import org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerThreadDumpFileHandler;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders;

/**
* Headers for the {@link TaskManagerThreadDumpFileHandler}.
*/
public class TaskManagerThreadDumpFileHeaders implements UntypedResponseMessageHeaders<EmptyRequestBody, TaskManagerMessageParameters> {

private static final TaskManagerThreadDumpFileHeaders INSTANCE = new TaskManagerThreadDumpFileHeaders();

private static final String URL = String.format("/taskmanagers/:%s/thread-dump", TaskManagerIdPathParameter.KEY);

private TaskManagerThreadDumpFileHeaders() {}

@Override
public Class<EmptyRequestBody> getRequestClass() {
return EmptyRequestBody.class;
}

@Override
public TaskManagerMessageParameters getUnresolvedMessageParameters() {
return new TaskManagerMessageParameters();
}

@Override
public HttpMethodWrapper getHttpMethod() {
return HttpMethodWrapper.GET;
}

@Override
public String getTargetRestEndpointURL() {
return URL;
}

public static TaskManagerThreadDumpFileHeaders getInstance() {
return INSTANCE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,18 @@
* Different file types to request from the {@link TaskExecutor}.
*/
public enum FileType {
/**
* the log file type for taskmanager
*/
LOG,
STDOUT

/**
* the stdout file type for taskmanager
*/
STDOUT,

/**
* the thread dump type for taskmanager
*/
THREAD_DUMP
}
Original file line number Diff line number Diff line change
Expand Up @@ -127,13 +127,15 @@
import org.apache.flink.util.StringUtils;

import org.apache.flink.shaded.guava18.com.google.common.collect.Sets;
import org.apache.flink.runtime.util.JvmUtils;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -944,6 +946,8 @@ public CompletableFuture<TransientBlobKey> requestFileUploadByType(FileType file
case STDOUT:
filePath = taskManagerConfiguration.getTaskManagerStdoutPath();
break;
case THREAD_DUMP:
return putTransientBlobStream(JvmUtils.threadDumpStream(), fileType.toString());
default:
filePath = null;
}
Expand Down Expand Up @@ -1016,6 +1020,19 @@ public CompletableFuture<Acknowledge> sendOperatorEventToTask(
// Internal methods
// ======================================================================

private CompletableFuture<TransientBlobKey> putTransientBlobStream(InputStream inputStream, String fileTag) {
final TransientBlobCache transientBlobService = blobCacheService.getTransientBlobService();
final TransientBlobKey transientBlobKey;

try {
transientBlobKey = transientBlobService.putTransient(inputStream);
} catch (IOException e) {
log.debug("Could not upload file {}.", fileTag, e);
return FutureUtils.completedExceptionally(new FlinkException("Could not upload file " + fileTag + '.', e));
}
return CompletableFuture.completedFuture(transientBlobKey);
}

// ------------------------------------------------------------------------
// Internal resource manager connection methods
// ------------------------------------------------------------------------
Expand Down Expand Up @@ -1675,15 +1692,12 @@ private CompletableFuture<TransientBlobKey> requestFileUploadByFilePath(String f
return CompletableFuture.supplyAsync(() -> {
final File file = new File(filePath);
if (file.exists()) {
final TransientBlobCache transientBlobService = blobCacheService.getTransientBlobService();
final TransientBlobKey transientBlobKey;
try (FileInputStream fileInputStream = new FileInputStream(file)) {
transientBlobKey = transientBlobService.putTransient(fileInputStream);
} catch (IOException e) {
try {
return putTransientBlobStream(new FileInputStream(file), fileTag).get();
} catch (Exception e) {
log.debug("Could not upload file {}.", fileTag, e);
throw new CompletionException(new FlinkException("Could not upload file " + fileTag + '.', e));
}
return transientBlobKey;
} else {
log.debug("The file {} does not exist on the TaskExecutor {}.", fileTag, getResourceID());
throw new CompletionException(new FlinkException("The file " + fileTag + " does not exist on the TaskExecutor."));
Expand Down
Loading

0 comments on commit ec954fb

Please sign in to comment.