Skip to content

Commit

Permalink
[NIFI-13318] processor stop and configure UX
Browse files Browse the repository at this point in the history
  • Loading branch information
scottyaslan committed Nov 21, 2024
1 parent d07b363 commit 4455c12
Show file tree
Hide file tree
Showing 11 changed files with 448 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ import {
StartComponentRequest,
StartComponentResponse,
StartComponentsRequest,
StartPollingProcessorUntilStoppedRequest,
StartProcessGroupRequest,
StartProcessGroupResponse,
StopComponentRequest,
Expand Down Expand Up @@ -778,6 +779,20 @@ export const pollChangeVersionSuccess = createAction(

export const stopPollingChangeVersion = createAction(`${CANVAS_PREFIX} Stop Polling Change Version`);

export const startPollingProcessorUntilStopped = createAction(
`${CANVAS_PREFIX} Start Polling Processor Until Stopped`,
props<{ request: StartPollingProcessorUntilStoppedRequest }>()
);

export const pollProcessorUntilStopped = createAction(`${CANVAS_PREFIX} Poll Processor Until Stopped`);

export const pollProcessorUntilStoppedSuccess = createAction(
`${CANVAS_PREFIX} Poll Processor Until Stopped Success`,
props<{ response: LoadProcessorSuccess }>()
);

export const stopPollingProcessor = createAction(`${CANVAS_PREFIX} Stop Polling Processor`);

export const openSaveVersionDialog = createAction(
`${CANVAS_PREFIX} Open Save Flow Version Dialog`,
props<{ request: SaveVersionDialogRequest }>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ import {
SaveVersionRequest,
SelectedComponent,
Snippet,
StartComponentRequest,
StopComponentRequest,
StopVersionControlRequest,
StopVersionControlResponse,
UpdateComponentFailure,
Expand All @@ -77,6 +79,7 @@ import {
selectParentProcessGroupId,
selectProcessGroup,
selectProcessor,
selectPollingProcessor,
selectRefreshRpgDetails,
selectRemoteProcessGroup,
selectSaving,
Expand Down Expand Up @@ -158,6 +161,7 @@ import { selectDocumentVisibilityState } from '../../../../state/document-visibi
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
import { DocumentVisibility } from '../../../../state/document-visibility';
import { ErrorContextKey } from '../../../../state/error';
import { startComponent, startPollingProcessorUntilStopped, stopComponent } from './flow.actions';

@Injectable()
export class FlowEffects {
Expand Down Expand Up @@ -1550,6 +1554,83 @@ export class FlowEffects {
);
});

this.store
.select(selectProcessor(processorId))
.pipe(
takeUntil(editDialogReference.afterClosed()),
isDefinedAndNotNull(),
filter((processorEntity) => {
return (
processorEntity.revision.clientId === this.client.getClientId() ||
processorEntity.revision.clientId === request.entity.revision.clientId
);
})
)
.subscribe((response) => {
editDialogReference.componentInstance.processorUpdates = response;

if (
!editDialogReference.componentInstance.isStoppable(response) &&
!editDialogReference.componentInstance.isRunnable(response)
) {
this.store.dispatch(
startPollingProcessorUntilStopped({
request: {
id: response.id,
uri: response.uri,
type: ComponentType.Processor,
revision: response.revision,
errorStrategy: 'snackbar'
}
})
);
}
});

editDialogReference.componentInstance.stopComponentRequest
.pipe(takeUntil(editDialogReference.afterClosed()))
.subscribe((stopComponentRequest: StopComponentRequest) => {
this.store.dispatch(
stopComponent({
request: {
id: stopComponentRequest.id,
uri: stopComponentRequest.uri,
type: ComponentType.Processor,
revision: stopComponentRequest.revision,
errorStrategy: 'snackbar'
}
})
);

this.store.dispatch(
startPollingProcessorUntilStopped({
request: {
id: stopComponentRequest.id,
uri: stopComponentRequest.uri,
type: ComponentType.Processor,
revision: stopComponentRequest.revision,
errorStrategy: 'snackbar'
}
})
);
});

editDialogReference.componentInstance.startComponentRequest
.pipe(takeUntil(editDialogReference.afterClosed()))
.subscribe((startComponentRequest: StartComponentRequest) => {
this.store.dispatch(
startComponent({
request: {
id: startComponentRequest.id,
uri: startComponentRequest.uri,
type: ComponentType.Processor,
revision: startComponentRequest.revision,
errorStrategy: 'snackbar'
}
})
);
});

editDialogReference.afterClosed().subscribe((response) => {
this.store.dispatch(resetPropertyVerificationState());
if (response != 'ROUTED') {
Expand All @@ -1572,6 +1653,57 @@ export class FlowEffects {
{ dispatch: false }
);

startPollingProcessorUntilStopped = createEffect(() =>
this.actions$.pipe(
ofType(FlowActions.startPollingProcessorUntilStopped),
switchMap(() =>
interval(2000, asyncScheduler).pipe(
takeUntil(this.actions$.pipe(ofType(FlowActions.stopPollingProcessor)))
)
),
switchMap(() => of(FlowActions.pollProcessorUntilStopped()))
)
);

pollProcessorUntilStopped$ = createEffect(() =>
this.actions$.pipe(
ofType(FlowActions.pollProcessorUntilStopped),
concatLatestFrom(() => [this.store.select(selectPollingProcessor).pipe(isDefinedAndNotNull())]),
switchMap(([, pollingProcessor]) => {
return from(
this.flowService.getProcessor(pollingProcessor.id).pipe(
map((response) =>
FlowActions.pollProcessorUntilStoppedSuccess({
response: {
id: pollingProcessor.id,
processor: response
}
})
),
catchError((errorResponse: HttpErrorResponse) => {
this.store.dispatch(FlowActions.stopPollingProcessor());
return of(this.snackBarOrFullScreenError(errorResponse));
})
)
);
})
)
);

pollProcessorUntilStoppedSuccess$ = createEffect(() =>
this.actions$.pipe(
ofType(FlowActions.pollProcessorUntilStoppedSuccess),
map((action) => action.response),
filter((response) => {
return !(
response.processor.status.runStatus === 'Stopped' &&
response.processor.status.aggregateSnapshot.activeThreadCount > 0
);
}),
switchMap(() => of(FlowActions.stopPollingProcessor()))
)
);

openEditConnectionDialog$ = createEffect(
() =>
this.actions$.pipe(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import {
navigateWithoutTransform,
pasteSuccess,
pollChangeVersionSuccess,
pollProcessorUntilStoppedSuccess,
pollRevertChangesSuccess,
requestRefreshRemoteProcessGroup,
resetFlowState,
Expand All @@ -69,10 +70,12 @@ import {
setTransitionRequired,
startComponent,
startComponentSuccess,
startPollingProcessorUntilStopped,
startProcessGroupSuccess,
startRemoteProcessGroupPolling,
stopComponent,
stopComponentSuccess,
stopPollingProcessor,
stopProcessGroupSuccess,
stopRemoteProcessGroupPolling,
stopVersionControl,
Expand All @@ -93,6 +96,7 @@ import { produce } from 'immer';
export const initialState: FlowState = {
id: 'root',
changeVersionRequest: null,
pollingProcessor: null,
flow: {
permissions: {
canRead: false,
Expand Down Expand Up @@ -296,7 +300,7 @@ export const flowReducer = createReducer(
}
});
}),
on(loadProcessorSuccess, (state, { response }) => {
on(loadProcessorSuccess, pollProcessorUntilStoppedSuccess, (state, { response }) => {
return produce(state, (draftState) => {
const proposedProcessor = response.processor;
const componentIndex: number = draftState.flow.processGroupFlow.flow.processors.findIndex(
Expand Down Expand Up @@ -372,6 +376,14 @@ export const flowReducer = createReducer(
saving: false,
versionSaving: false
})),
on(startPollingProcessorUntilStopped, (state, { request }) => ({
...state,
pollingProcessor: request
})),
on(stopPollingProcessor, (state) => ({
...state,
pollingProcessor: null
})),
on(
createProcessor,
createProcessGroup,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ export const selectChangeVersionRequest = createSelector(
(state: FlowState) => state.changeVersionRequest
);

export const selectPollingProcessor = createSelector(selectFlowState, (state: FlowState) => state.pollingProcessor);

export const selectSaving = createSelector(selectFlowState, (state: FlowState) => state.saving);

export const selectVersionSaving = createSelector(selectFlowState, (state: FlowState) => state.versionSaving);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -642,6 +642,7 @@ export interface FlowState {
versionSaving: boolean;
changeVersionRequest: FlowUpdateRequestEntity | null;
copiedSnippet: CopiedSnippet | null;
pollingProcessor: any | null;
status: 'pending' | 'loading' | 'success' | 'complete';
}

Expand Down Expand Up @@ -775,6 +776,14 @@ export interface StopComponentRequest {
errorStrategy: 'snackbar' | 'banner';
}

export interface StartPollingProcessorUntilStoppedRequest {
id: string;
uri: string;
type: ComponentType;
revision: Revision;
errorStrategy: 'snackbar' | 'banner';
}

export interface StopProcessGroupRequest {
id: string;
type: ComponentType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,51 @@

<h2 mat-dialog-title>
<div class="flex justify-between items-baseline">
<div>
{{ readonly ? 'Processor Details' : 'Edit Processor' }}
<div class="flex items-baseline">
<div class="mr-2">
{{ readonly ? 'Processor Details' : 'Edit Processor' }}
</div>
|
<div class="ml-2 text-base">
{{ formatType(request.entity) }}
</div>
</div>
<div class="text-base">
{{ formatType(request.entity) }}
<div>
@if (isStoppable(request.entity)) {
<button type="button" mat-stroked-button [matMenuTriggerFor]="operateMenu">
<div class="flex items-center">
<i class="mr-2 success-color-default fa fa-play"></i>Running<i
class="ml-2 -mt-1 fa fa-sort-desc"></i>
</div>
</button>
} @else if (isRunnable(request.entity)) {
<button type="button" mat-stroked-button [matMenuTriggerFor]="operateMenu">
<div class="flex items-center">
<i class="mr-2 error-color-variant fa fa-stop"></i>Stopped<i
class="ml-2 -mt-1 fa fa-sort-desc"></i>
</div>
</button>
} @else {
<button type="button" mat-stroked-button [disabled]="true">
<div class="flex items-center">
<i class="mr-2 fa fa-circle-o-notch fa-spin primary-color"></i>{{ formatRunStatus() }}
</div>
</button>
}
<mat-menu #operateMenu="matMenu" xPosition="before">
@if (isStoppable(request.entity)) {
<button mat-menu-item (click)="stop(request.entity)">Stop</button>
} @else if (isRunnable(request.entity)) {
<button mat-menu-item [disabled]="editProcessorForm.dirty" (click)="start(request.entity)">
Start
</button>
}
</mat-menu>
</div>
</div>
</h2>
<form class="processor-edit-form" [formGroup]="editProcessorForm">
<context-error-banner [context]="ErrorContextKey.PROCESSOR"></context-error-banner>
<!-- TODO - Stop & Configure -->
<mat-tab-group [(selectedIndex)]="selectedIndex" (selectedIndexChange)="tabChanged($event)">
<mat-tab label="Settings">
<mat-dialog-content>
Expand Down
Loading

0 comments on commit 4455c12

Please sign in to comment.