Skip to content

Commit

Permalink
feat(async): long-running async job notification handling (#1491)
Browse files Browse the repository at this point in the history
  • Loading branch information
Josh-Matsuoka authored Dec 11, 2024
1 parent 2e5202b commit 7733f17
Show file tree
Hide file tree
Showing 13 changed files with 444 additions and 61 deletions.
15 changes: 13 additions & 2 deletions src/app/Recordings/ActiveRecordingsTable.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ export const ActiveRecordingsTable: React.FC<ActiveRecordingsTableProps> = (prop

const handleArchiveRecordings = React.useCallback(() => {
setActionLoadings((old) => ({ ...old, ARCHIVE: true }));
const tasks: Observable<boolean>[] = [];
const tasks: Observable<string>[] = [];
filteredRecordings.forEach((r: ActiveRecording) => {
if (checkedIndices.includes(r.id)) {
handleRowCheck(false, r.id);
Expand All @@ -394,13 +394,24 @@ export const ActiveRecordingsTable: React.FC<ActiveRecordingsTableProps> = (prop
});
addSubscription(
forkJoin(tasks).subscribe({
next: () => handlePostActions('ARCHIVE'),
next: (jobIds) => {
addSubscription(
context.notificationChannel
.messages(NotificationCategory.ArchiveRecordingSuccess)
.subscribe((notification) => {
if (jobIds.includes(notification.message.jobId)) {
handlePostActions('ARCHIVE');
}
}),
);
},
error: () => handlePostActions('ARCHIVE'),
}),
);
}, [
filteredRecordings,
checkedIndices,
context.notificationChannel,
handleRowCheck,
context.api,
addSubscription,
Expand Down
11 changes: 11 additions & 0 deletions src/app/Recordings/ArchivedRecordingsTable.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,17 @@ export const ArchivedRecordingsTable: React.FC<ArchivedRecordingsTableProps> = (
);
}, [addSubscription, context.notificationChannel, setRecordings, propsTarget]);

React.useEffect(() => {
addSubscription(
combineLatest([
propsTarget,
context.notificationChannel.messages(NotificationCategory.ArchiveRecordingSuccess),
]).subscribe(() => {
refreshRecordingList();
}),
);
}, [addSubscription, context.notificationChannel, propsTarget, refreshRecordingList]);

React.useEffect(() => {
addSubscription(
combineLatest([
Expand Down
30 changes: 15 additions & 15 deletions src/app/Recordings/RecordingActions.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { Recording, Target } from '@app/Shared/Services/api.types';
import { NotificationCategory, Recording, Target } from '@app/Shared/Services/api.types';
import { NotificationsContext } from '@app/Shared/Services/Notifications.service';
import { ServiceContext } from '@app/Shared/Services/Services';
import { useSubscriptions } from '@app/utils/hooks/useSubscriptions';
Expand All @@ -23,7 +23,7 @@ import { Td } from '@patternfly/react-table';
import * as React from 'react';
import { useTranslation } from 'react-i18next';
import { Observable } from 'rxjs';
import { first } from 'rxjs/operators';
import { concatMap, filter, first, tap } from 'rxjs/operators';

export interface RowAction {
title?: string | React.ReactNode;
Expand All @@ -36,7 +36,7 @@ export interface RecordingActionsProps {
index: number;
recording: Recording;
sourceTarget?: Observable<Target>;
uploadFn: () => Observable<boolean>;
uploadFn: () => Observable<string>;
}

export const RecordingActions: React.FC<RecordingActionsProps> = ({ recording, uploadFn, ...props }) => {
Expand All @@ -58,21 +58,21 @@ export const RecordingActions: React.FC<RecordingActionsProps> = ({ recording, u
}, [context.api, setGrafanaEnabled, addSubscription]);

const grafanaUpload = React.useCallback(() => {
notifications.info('Upload Started', `Recording "${recording.name}" uploading...`);
addSubscription(
uploadFn()
.pipe(first())
.subscribe((success) => {
if (success) {
notifications.success('Upload Success', `Recording "${recording.name}" uploaded`);
context.api
.grafanaDashboardUrl()
.pipe(first())
.subscribe((url) => window.open(url, '_blank'));
}
}),
.pipe(
tap(() => notifications.info('Upload Started', `Recording "${recording.name}" uploading...`)),
concatMap((jobId) =>
context.notificationChannel
.messages(NotificationCategory.GrafanaUploadSuccess)
.pipe(filter((n) => n.message.jobId === jobId)),
),
tap(() => notifications.success('Upload Success', `Recording ${recording.name} uploaded`)),
concatMap(() => context.api.grafanaDashboardUrl()),
)
.subscribe((url) => window.open(url, '_blank')),
);
}, [addSubscription, notifications, context.api, recording, uploadFn]);
}, [addSubscription, notifications, context.api, context.notificationChannel, recording, uploadFn]);

const handleDownloadRecording = React.useCallback(() => {
context.api.downloadRecording(recording);
Expand Down
16 changes: 8 additions & 8 deletions src/app/Shared/Services/Api.service.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -385,15 +385,15 @@ export class ApiService {
return this.archiveEnabled.asObservable();
}

archiveRecording(remoteId: number): Observable<boolean> {
archiveRecording(remoteId: number): Observable<string> {
return this.target.target().pipe(
filter((t) => !!t),
concatMap((target) =>
this.sendRequest('v4', `targets/${target!.id}/recordings/${remoteId}`, {
method: 'PATCH',
body: 'SAVE',
}).pipe(
map((resp) => resp.ok),
concatMap((resp) => resp.text()),
first(),
),
),
Expand Down Expand Up @@ -445,14 +445,14 @@ export class ApiService {
);
}

uploadActiveRecordingToGrafana(remoteId: number): Observable<boolean> {
uploadActiveRecordingToGrafana(remoteId: number): Observable<string> {
return this.target.target().pipe(
filter((t) => !!t),
concatMap((target) =>
this.sendRequest('v4', `targets/${target!.id}/recordings/${remoteId}/upload`, {
method: 'POST',
}).pipe(
map((resp) => resp.ok),
concatMap((resp) => resp.text()),
first(),
),
),
Expand All @@ -463,13 +463,13 @@ export class ApiService {
uploadArchivedRecordingToGrafana(
sourceTarget: Observable<Target | undefined>,
recordingName: string,
): Observable<boolean> {
): Observable<string> {
return sourceTarget.pipe(
concatMap((target) =>
this.sendRequest('v4', `grafana/${window.btoa((target!.jvmId ?? 'uploads') + '/' + recordingName)}`, {
method: 'POST',
}).pipe(
map((resp) => resp.ok),
concatMap((resp) => resp.text()),
first(),
),
),
Expand All @@ -478,11 +478,11 @@ export class ApiService {
}

// from file system path functions
uploadArchivedRecordingToGrafanaFromPath(jvmId: string, recordingName: string): Observable<boolean> {
uploadArchivedRecordingToGrafanaFromPath(jvmId: string, recordingName: string): Observable<string> {
return this.sendRequest('v4', `grafana/${window.btoa((jvmId ?? 'uploads') + '/' + recordingName)}`, {
method: 'POST',
}).pipe(
map((resp) => resp.ok),
concatMap((resp) => resp.text()),
first(),
);
}
Expand Down
65 changes: 59 additions & 6 deletions src/app/Shared/Services/Report.service.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,28 @@
* limitations under the License.
*/
import { Base64 } from 'js-base64';
import { Observable, from, throwError } from 'rxjs';
import { Observable, Subject, from, throwError } from 'rxjs';
import { fromFetch } from 'rxjs/fetch';
import { concatMap, first, tap } from 'rxjs/operators';
import { Recording, CachedReportValue, GenerationError, AnalysisResult } from './api.types';
import { concatMap, filter, first, tap } from 'rxjs/operators';
import { Recording, CachedReportValue, GenerationError, AnalysisResult, NotificationCategory } from './api.types';
import { isActiveRecording, isQuotaExceededError, isGenerationError } from './api.utils';
import type { LoginService } from './Login.service';
import { NotificationChannel } from './NotificationChannel.service';
import type { NotificationService } from './Notifications.service';

export class ReportService {
constructor(
private login: LoginService,
private notifications: NotificationService,
) {}
channel: NotificationChannel,
) {
channel.messages(NotificationCategory.ReportSuccess).subscribe((v) => {
if (this.jobIds.has(v.message.jobId)) {
this._jobCompletion.next(v.message.jobId);
}
});
}

private readonly jobIds: Map<string, string> = new Map();
private readonly _jobCompletion: Subject<string> = new Subject();

reportJson(recording: Recording, connectUrl: string): Observable<AnalysisResult[]> {
if (!recording.reportUrl) {
Expand All @@ -43,7 +52,49 @@ export class ReportService {
headers,
}).pipe(
concatMap((resp) => {
// 200 indicates that the backend has the report cached and it will return
// the json in the response.
if (resp.ok) {
// 202 indicates that the backend does not have the report cached and will return an
// async job ID in the response immediately, then emit a notification with the same
// job ID later to inform us that the report is now ready and can be retrieved by
// sending a follow-up GET to the Location header value
if (resp.status == 202) {
let subj = new Subject<AnalysisResult[]>();
resp.text().then((jobId) => {
this.jobIds.set(jobId, resp.headers.get('Location') || recording.reportUrl);
this._jobCompletion
.asObservable()
.pipe(filter((id) => id === jobId))
.subscribe((id) => {
const jobUrl = this.jobIds.get(id);
if (!jobUrl) throw new Error(`Unknown job ID: ${id}`);
this.jobIds.delete(id);
fromFetch(jobUrl, {
method: 'GET',
mode: 'cors',
credentials: 'include',
headers,
}).subscribe((resp) => {
if (resp.ok && resp.status === 200) {
resp
.text()
.then(JSON.parse)
.then((obj) => subj.next(Object.values(obj) as AnalysisResult[]));
} else {
const ge: GenerationError = {
name: `Report Failure (${recording.name})`,
message: resp.statusText,
messageDetail: from(resp.text()),
status: resp.status,
};
subj.error(ge);
}
});
});
});
return subj.asObservable();
}
return from(
resp
.text()
Expand Down Expand Up @@ -84,6 +135,8 @@ export class ReportService {
this.notifications.warning(`Report generation failure: ${detail}`);
this.deleteCachedAnalysisReport(connectUrl);
});
} else if (isGenerationError(err) && err.status == 202) {
this.notifications.info('Report generation in progress', 'Report is being generated');
} else {
this.notifications.danger(err.name, err.message);
}
Expand Down
6 changes: 3 additions & 3 deletions src/app/Shared/Services/Services.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ import { TargetsService } from './Targets.service';
export interface Services {
target: TargetService;
targets: TargetsService;
reports: ReportService;
api: ApiService;
notificationChannel: NotificationChannel;
reports: ReportService;
settings: SettingsService;
login: LoginService;
}
Expand All @@ -38,15 +38,15 @@ const settings = new SettingsService();
const login = new LoginService(settings);
const api = new ApiService(target, NotificationsInstance, login);
const notificationChannel = new NotificationChannel(NotificationsInstance, login);
const reports = new ReportService(login, NotificationsInstance);
const reports = new ReportService(NotificationsInstance, notificationChannel);
const targets = new TargetsService(api, NotificationsInstance, notificationChannel);

const defaultServices: Services = {
target,
targets,
reports,
api,
notificationChannel,
reports,
settings,
login,
};
Expand Down
6 changes: 6 additions & 0 deletions src/app/Shared/Services/api.types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,8 @@ export enum NotificationCategory {
ActiveRecordingStopped = 'ActiveRecordingStopped',
ActiveRecordingSaved = 'ActiveRecordingSaved',
ActiveRecordingDeleted = 'ActiveRecordingDeleted',
ArchiveRecordingSuccess = 'ArchiveRecordingSuccess',
ArchiveRecordingFail = 'ArchiveRecordingFailure',
SnapshotCreated = 'SnapshotCreated',
SnapshotDeleted = 'SnapshotDeleted',
ArchivedRecordingCreated = 'ArchivedRecordingCreated',
Expand All @@ -523,12 +525,16 @@ export enum NotificationCategory {
RuleUpdated = 'RuleUpdated',
RuleDeleted = 'RuleDeleted',
RecordingMetadataUpdated = 'RecordingMetadataUpdated',
GrafanaUploadSuccess = 'GrafanaUploadSuccess',
GrafanaUploadFail = 'GrafanaUploadFailure',
GrafanaConfiguration = 'GrafanaConfiguration', // generated client-side
LayoutTemplateCreated = 'LayoutTemplateCreated', // generated client-side
TargetCredentialsStored = 'TargetCredentialsStored',
TargetCredentialsDeleted = 'TargetCredentialsDeleted',
CredentialsStored = 'CredentialsStored',
CredentialsDeleted = 'CredentialsDeleted',
ReportSuccess = 'ReportSuccess',
ReportFail = 'ReportFailure',
}

export enum CloseStatus {
Expand Down
54 changes: 54 additions & 0 deletions src/app/Shared/Services/api.utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -424,4 +424,58 @@ export const messageKeys = new Map([
body: (evt) => `Credentials deleted for: ${evt.message.matchExpression}`,
} as NotificationMessageMapper,
],
[
NotificationCategory.ReportSuccess,
{
variant: AlertVariant.info,
title: 'Report Success',
body: (evt) => `Report generated successfully for job: ${evt.message.jobId}`,
hidden: true,
} as NotificationMessageMapper,
],
[
NotificationCategory.ReportFail,
{
variant: AlertVariant.warning,
title: 'Report Generation Failed',
body: (evt) => `Report generation failed for job: ${evt.message.jobId}`,
hidden: true,
} as NotificationMessageMapper,
],
[
NotificationCategory.GrafanaUploadSuccess,
{
variant: AlertVariant.success,
title: 'Grafana Upload Success',
body: (evt) => `Recording successfully uploaded to Grafana for job: ${evt.message.jobId}`,
hidden: false,
} as NotificationMessageMapper,
],
[
NotificationCategory.GrafanaUploadFail,
{
variant: AlertVariant.warning,
title: 'Grafana Upload Failed',
body: (evt) => `Grafana upload failed for job: ${evt.message.jobId}`,
hidden: false,
} as NotificationMessageMapper,
],
[
NotificationCategory.ArchiveRecordingSuccess,
{
variant: AlertVariant.success,
title: 'Recording Archive Success',
body: (evt) => `Recording for job: ${evt.message.jobId} successfully archived.`,
hidden: false,
} as NotificationMessageMapper,
],
[
NotificationCategory.ArchiveRecordingFail,
{
variant: AlertVariant.warning,
title: 'Recording Archive Failed',
body: (evt) => `Grafana upload failed for job: ${evt.message.jobId}`,
hidden: true,
} as NotificationMessageMapper,
],
]);
Loading

0 comments on commit 7733f17

Please sign in to comment.