diff --git a/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/state/flow/flow.actions.ts b/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/state/flow/flow.actions.ts index dec106d2f812..b7bdfbaa66f2 100644 --- a/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/state/flow/flow.actions.ts +++ b/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/state/flow/flow.actions.ts @@ -90,6 +90,7 @@ import { StartComponentRequest, StartComponentResponse, StartComponentsRequest, + StartPollingProcessorUntilStoppedRequest, StartProcessGroupRequest, StartProcessGroupResponse, StopComponentRequest, @@ -778,6 +779,20 @@ export const pollChangeVersionSuccess = createAction( export const stopPollingChangeVersion = createAction(`${CANVAS_PREFIX} Stop Polling Change Version`); +export const startPollingProcessorUntilStopped = createAction( + `${CANVAS_PREFIX} Start Polling Processor Until Stopped`, + props<{ request: StartPollingProcessorUntilStoppedRequest }>() +); + +export const pollProcessorUntilStopped = createAction(`${CANVAS_PREFIX} Poll Processor Until Stopped`); + +export const pollProcessorUntilStoppedSuccess = createAction( + `${CANVAS_PREFIX} Poll Processor Until Stopped Success`, + props<{ response: LoadProcessorSuccess }>() +); + +export const stopPollingProcessor = createAction(`${CANVAS_PREFIX} Stop Polling Processor`); + export const openSaveVersionDialog = createAction( `${CANVAS_PREFIX} Open Save Flow Version Dialog`, props<{ request: SaveVersionDialogRequest }>() diff --git a/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/state/flow/flow.effects.ts b/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/state/flow/flow.effects.ts index ffc14fa94471..353d5bbe6fe3 100644 --- a/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/state/flow/flow.effects.ts +++ b/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/state/flow/flow.effects.ts @@ -53,6 +53,8 @@ import { SaveVersionRequest, SelectedComponent, Snippet, + StartComponentRequest, + StopComponentRequest, StopVersionControlRequest, StopVersionControlResponse, UpdateComponentFailure, @@ -77,6 +79,7 @@ import { selectParentProcessGroupId, selectProcessGroup, selectProcessor, + selectPollingProcessor, selectRefreshRpgDetails, selectRemoteProcessGroup, selectSaving, @@ -158,6 +161,7 @@ import { selectDocumentVisibilityState } from '../../../../state/document-visibi import { takeUntilDestroyed } from '@angular/core/rxjs-interop'; import { DocumentVisibility } from '../../../../state/document-visibility'; import { ErrorContextKey } from '../../../../state/error'; +import { startComponent, startPollingProcessorUntilStopped, stopComponent } from './flow.actions'; @Injectable() export class FlowEffects { @@ -1550,6 +1554,83 @@ export class FlowEffects { ); }); + this.store + .select(selectProcessor(processorId)) + .pipe( + takeUntil(editDialogReference.afterClosed()), + isDefinedAndNotNull(), + filter((processorEntity) => { + return ( + processorEntity.revision.clientId === this.client.getClientId() || + processorEntity.revision.clientId === request.entity.revision.clientId + ); + }) + ) + .subscribe((response) => { + editDialogReference.componentInstance.processorUpdates = response; + + if ( + !editDialogReference.componentInstance.isStoppable(response) && + !editDialogReference.componentInstance.isRunnable(response) + ) { + this.store.dispatch( + startPollingProcessorUntilStopped({ + request: { + id: response.id, + uri: response.uri, + type: ComponentType.Processor, + revision: response.revision, + errorStrategy: 'snackbar' + } + }) + ); + } + }); + + editDialogReference.componentInstance.stopComponentRequest + .pipe(takeUntil(editDialogReference.afterClosed())) + .subscribe((stopComponentRequest: StopComponentRequest) => { + this.store.dispatch( + stopComponent({ + request: { + id: stopComponentRequest.id, + uri: stopComponentRequest.uri, + type: ComponentType.Processor, + revision: stopComponentRequest.revision, + errorStrategy: 'snackbar' + } + }) + ); + + this.store.dispatch( + startPollingProcessorUntilStopped({ + request: { + id: stopComponentRequest.id, + uri: stopComponentRequest.uri, + type: ComponentType.Processor, + revision: stopComponentRequest.revision, + errorStrategy: 'snackbar' + } + }) + ); + }); + + editDialogReference.componentInstance.startComponentRequest + .pipe(takeUntil(editDialogReference.afterClosed())) + .subscribe((startComponentRequest: StartComponentRequest) => { + this.store.dispatch( + startComponent({ + request: { + id: startComponentRequest.id, + uri: startComponentRequest.uri, + type: ComponentType.Processor, + revision: startComponentRequest.revision, + errorStrategy: 'snackbar' + } + }) + ); + }); + editDialogReference.afterClosed().subscribe((response) => { this.store.dispatch(resetPropertyVerificationState()); if (response != 'ROUTED') { @@ -1572,6 +1653,57 @@ export class FlowEffects { { dispatch: false } ); + startPollingProcessorUntilStopped = createEffect(() => + this.actions$.pipe( + ofType(FlowActions.startPollingProcessorUntilStopped), + switchMap(() => + interval(2000, asyncScheduler).pipe( + takeUntil(this.actions$.pipe(ofType(FlowActions.stopPollingProcessor))) + ) + ), + switchMap(() => of(FlowActions.pollProcessorUntilStopped())) + ) + ); + + pollProcessorUntilStopped$ = createEffect(() => + this.actions$.pipe( + ofType(FlowActions.pollProcessorUntilStopped), + concatLatestFrom(() => [this.store.select(selectPollingProcessor).pipe(isDefinedAndNotNull())]), + switchMap(([, pollingProcessor]) => { + return from( + this.flowService.getProcessor(pollingProcessor.id).pipe( + map((response) => + FlowActions.pollProcessorUntilStoppedSuccess({ + response: { + id: pollingProcessor.id, + processor: response + } + }) + ), + catchError((errorResponse: HttpErrorResponse) => { + this.store.dispatch(FlowActions.stopPollingProcessor()); + return of(this.snackBarOrFullScreenError(errorResponse)); + }) + ) + ); + }) + ) + ); + + pollProcessorUntilStoppedSuccess$ = createEffect(() => + this.actions$.pipe( + ofType(FlowActions.pollProcessorUntilStoppedSuccess), + map((action) => action.response), + filter((response) => { + return !( + response.processor.status.runStatus === 'Stopped' && + response.processor.status.aggregateSnapshot.activeThreadCount > 0 + ); + }), + switchMap(() => of(FlowActions.stopPollingProcessor())) + ) + ); + openEditConnectionDialog$ = createEffect( () => this.actions$.pipe( diff --git a/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/state/flow/flow.reducer.ts b/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/state/flow/flow.reducer.ts index d284d643df0e..c44cadef4aad 100644 --- a/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/state/flow/flow.reducer.ts +++ b/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/state/flow/flow.reducer.ts @@ -51,6 +51,7 @@ import { navigateWithoutTransform, pasteSuccess, pollChangeVersionSuccess, + pollProcessorUntilStoppedSuccess, pollRevertChangesSuccess, requestRefreshRemoteProcessGroup, resetFlowState, @@ -69,10 +70,12 @@ import { setTransitionRequired, startComponent, startComponentSuccess, + startPollingProcessorUntilStopped, startProcessGroupSuccess, startRemoteProcessGroupPolling, stopComponent, stopComponentSuccess, + stopPollingProcessor, stopProcessGroupSuccess, stopRemoteProcessGroupPolling, stopVersionControl, @@ -93,6 +96,7 @@ import { produce } from 'immer'; export const initialState: FlowState = { id: 'root', changeVersionRequest: null, + pollingProcessor: null, flow: { permissions: { canRead: false, @@ -296,7 +300,7 @@ export const flowReducer = createReducer( } }); }), - on(loadProcessorSuccess, (state, { response }) => { + on(loadProcessorSuccess, pollProcessorUntilStoppedSuccess, (state, { response }) => { return produce(state, (draftState) => { const proposedProcessor = response.processor; const componentIndex: number = draftState.flow.processGroupFlow.flow.processors.findIndex( @@ -372,6 +376,14 @@ export const flowReducer = createReducer( saving: false, versionSaving: false })), + on(startPollingProcessorUntilStopped, (state, { request }) => ({ + ...state, + pollingProcessor: request + })), + on(stopPollingProcessor, (state) => ({ + ...state, + pollingProcessor: null + })), on( createProcessor, createProcessGroup, diff --git a/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/state/flow/flow.selectors.ts b/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/state/flow/flow.selectors.ts index fb4620432ea9..927c0b5f552a 100644 --- a/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/state/flow/flow.selectors.ts +++ b/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/state/flow/flow.selectors.ts @@ -30,6 +30,8 @@ export const selectChangeVersionRequest = createSelector( (state: FlowState) => state.changeVersionRequest ); +export const selectPollingProcessor = createSelector(selectFlowState, (state: FlowState) => state.pollingProcessor); + export const selectSaving = createSelector(selectFlowState, (state: FlowState) => state.saving); export const selectVersionSaving = createSelector(selectFlowState, (state: FlowState) => state.versionSaving); diff --git a/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/state/flow/index.ts b/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/state/flow/index.ts index d1d9041e6dbe..0dd834db6236 100644 --- a/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/state/flow/index.ts +++ b/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/state/flow/index.ts @@ -642,6 +642,7 @@ export interface FlowState { versionSaving: boolean; changeVersionRequest: FlowUpdateRequestEntity | null; copiedSnippet: CopiedSnippet | null; + pollingProcessor: any | null; status: 'pending' | 'loading' | 'success' | 'complete'; } @@ -775,6 +776,14 @@ export interface StopComponentRequest { errorStrategy: 'snackbar' | 'banner'; } +export interface StartPollingProcessorUntilStoppedRequest { + id: string; + uri: string; + type: ComponentType; + revision: Revision; + errorStrategy: 'snackbar' | 'banner'; +} + export interface StopProcessGroupRequest { id: string; type: ComponentType; diff --git a/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/ui/canvas/items/processor/edit-processor/edit-processor.component.html b/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/ui/canvas/items/processor/edit-processor/edit-processor.component.html index 93a9b035fe30..9d8fcdf5e5ac 100644 --- a/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/ui/canvas/items/processor/edit-processor/edit-processor.component.html +++ b/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/ui/canvas/items/processor/edit-processor/edit-processor.component.html @@ -17,17 +17,51 @@

-
- {{ readonly ? 'Processor Details' : 'Edit Processor' }} +
+
+ {{ readonly ? 'Processor Details' : 'Edit Processor' }} +
+ | +
+ {{ formatType(request.entity) }} +
-
- {{ formatType(request.entity) }} +
+ @if (isStoppable(request.entity)) { + + } @else if (isRunnable(request.entity)) { + + } @else { + + } + + @if (isStoppable(request.entity)) { + + } @else if (isRunnable(request.entity)) { + + } +

- diff --git a/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/ui/canvas/items/processor/edit-processor/edit-processor.component.ts b/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/ui/canvas/items/processor/edit-processor/edit-processor.component.ts index b238e9ecc8fe..46daa8059d1e 100644 --- a/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/ui/canvas/items/processor/edit-processor/edit-processor.component.ts +++ b/nifi-frontend/src/main/frontend/apps/nifi/src/app/pages/flow-designer/ui/canvas/items/processor/edit-processor/edit-processor.component.ts @@ -17,6 +17,7 @@ import { Component, EventEmitter, Inject, Input, Output } from '@angular/core'; import { MAT_DIALOG_DATA, MatDialogModule } from '@angular/material/dialog'; +import { MatMenuModule } from '@angular/material/menu'; import { AbstractControl, FormBuilder, @@ -42,7 +43,12 @@ import { Property } from '../../../../../../../state/shared'; import { Client } from '../../../../../../../service/client.service'; -import { EditComponentDialogRequest, UpdateProcessorRequest } from '../../../../../state/flow'; +import { + EditComponentDialogRequest, + StartComponentRequest, + StopComponentRequest, + UpdateProcessorRequest +} from '../../../../../state/flow'; import { PropertyTable } from '../../../../../../../ui/common/property-table/property-table.component'; import { NifiSpinnerDirective } from '../../../../../../../ui/common/spinner/nifi-spinner.directive'; import { NifiTooltipDirective, NiFiCommon, TextTip, CopyDirective } from '@nifi/shared'; @@ -79,6 +85,7 @@ import { ContextErrorBanner } from '../../../../../../../ui/common/context-error MatTabsModule, MatOptionModule, MatSelectModule, + MatMenuModule, AsyncPipe, PropertyTable, NifiSpinnerDirective, @@ -93,6 +100,9 @@ import { ContextErrorBanner } from '../../../../../../../ui/common/context-error styleUrls: ['./edit-processor.component.scss'] }) export class EditProcessor extends TabbedDialog { + @Input() set processorUpdates(processorUpdates: any | undefined) { + this.initialize(processorUpdates); + } @Input() createNewProperty!: (existingProperties: string[], allowsSensitive: boolean) => Observable; @Input() createNewService!: (request: InlineServiceCreationRequest) => Observable; @Input() parameterContext: ParameterContextEntity | undefined; @@ -110,11 +120,15 @@ export class EditProcessor extends TabbedDialog { @Output() verify: EventEmitter = new EventEmitter(); @Output() editProcessor: EventEmitter = new EventEmitter(); + @Output() stopComponentRequest: EventEmitter = new EventEmitter(); + @Output() startComponentRequest: EventEmitter = new EventEmitter(); protected readonly TextTip = TextTip; editProcessorForm: FormGroup; - readonly: boolean; + readonly: boolean = true; + status: any = true; + revision: any = true; bulletinLevels = [ { @@ -182,8 +196,7 @@ export class EditProcessor extends TabbedDialog { ) { super('edit-processor-selected-index'); - this.readonly = - !request.entity.permissions.canWrite || !this.canvasUtils.runnableSupportsModification(request.entity); + this.initialize(request.entity); const processorProperties: any = request.entity.component.config.properties; const properties: Property[] = Object.entries(processorProperties).map((entry: any) => { @@ -255,6 +268,13 @@ export class EditProcessor extends TabbedDialog { } } + initialize(entity: any) { + this.status = entity.status; + this.revision = entity.revision; + + this.readonly = !entity.permissions.canWrite || !this.canvasUtils.runnableSupportsModification(entity); + } + private relationshipConfigurationValidator(): ValidatorFn { return (control: AbstractControl): ValidationErrors | null => { const relationshipConfiguration: RelationshipConfiguration = control.value; @@ -296,6 +316,14 @@ export class EditProcessor extends TabbedDialog { return this.nifiCommon.formatBundle(entity.component.bundle); } + formatRunStatus() { + if (this.status.runStatus === 'Stopped' && this.status.aggregateSnapshot.activeThreadCount > 0) { + return `Stopping (${this.status.aggregateSnapshot.activeThreadCount})`; + } + + return `${this.status.runStatus}`; + } + concurrentTasksChanged(): void { if (this.schedulingStrategy === 'CRON_DRIVEN') { this.cronDrivenConcurrentTasks = this.editProcessorForm.get('concurrentTasks')?.value; @@ -351,7 +379,10 @@ export class EditProcessor extends TabbedDialog { .map((relationship) => relationship.name); const payload: any = { - revision: this.client.getRevision(this.request.entity), + revision: this.client.getRevision({ + ...this.request.entity, + revision: this.revision + }), disconnectedNodeAcknowledged: this.clusterConnectionService.isDisconnectionAcknowledged(), component: { id: this.request.entity.id, @@ -401,6 +432,51 @@ export class EditProcessor extends TabbedDialog { }); } + isStoppable(entity: any): boolean { + if (!this.canOperate(entity)) { + return false; + } + + return this.status.aggregateSnapshot.runStatus === 'Running'; + } + + isRunnable(entity: any): boolean { + if (!this.canOperate(entity)) { + return false; + } + + return ( + !( + this.status.aggregateSnapshot.runStatus === 'Running' || + this.status.aggregateSnapshot.activeThreadCount > 0 + ) && this.status.aggregateSnapshot.runStatus === 'Stopped' + ); + } + + canOperate(entity: any): boolean { + return entity.permissions.canWrite || entity.operatePermissions?.canWrite; + } + + stop(entity: any) { + this.stopComponentRequest.next({ + id: entity.id, + uri: entity.uri, + type: ComponentType.Processor, + revision: this.revision, + errorStrategy: 'snackbar' + }); + } + + start(entity: any) { + this.startComponentRequest.next({ + id: entity.id, + uri: entity.uri, + type: ComponentType.Processor, + revision: this.revision, + errorStrategy: 'snackbar' + }); + } + private getModifiedProperties(): ModifiedProperties { const propertyControl: AbstractControl | null = this.editProcessorForm.get('properties'); if (propertyControl && propertyControl.dirty) { diff --git a/nifi-frontend/src/main/frontend/apps/nifi/src/app/service/client.service.ts b/nifi-frontend/src/main/frontend/apps/nifi/src/app/service/client.service.ts index 99704c4736f6..fe2d7167d532 100644 --- a/nifi-frontend/src/main/frontend/apps/nifi/src/app/service/client.service.ts +++ b/nifi-frontend/src/main/frontend/apps/nifi/src/app/service/client.service.ts @@ -17,12 +17,24 @@ import { Injectable } from '@angular/core'; import { v4 as uuidv4 } from 'uuid'; +import { SessionStorageService } from '@nifi/shared'; @Injectable({ providedIn: 'root' }) export class Client { - private clientId: string = uuidv4(); + private clientId: string; + + constructor(private sessionStorage: SessionStorageService) { + let clientId = this.sessionStorage.getItem('clientId'); + + if (clientId === null) { + clientId = uuidv4(); + this.sessionStorage.setItem('clientId', clientId); + } + + this.clientId = clientId; + } public getClientId(): string { return this.clientId; diff --git a/nifi-frontend/src/main/frontend/libs/shared/src/services/index.ts b/nifi-frontend/src/main/frontend/libs/shared/src/services/index.ts index 5e5054972e6c..8572877ade9e 100644 --- a/nifi-frontend/src/main/frontend/libs/shared/src/services/index.ts +++ b/nifi-frontend/src/main/frontend/libs/shared/src/services/index.ts @@ -17,5 +17,6 @@ export * from './nifi-common.service'; export * from './storage.service'; +export * from './session-storage.service'; export * from './theming.service'; export * from './map-table-helper.service'; diff --git a/nifi-frontend/src/main/frontend/libs/shared/src/services/session-storage.service.spec.ts b/nifi-frontend/src/main/frontend/libs/shared/src/services/session-storage.service.spec.ts new file mode 100644 index 000000000000..231824f15daa --- /dev/null +++ b/nifi-frontend/src/main/frontend/libs/shared/src/services/session-storage.service.spec.ts @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { TestBed } from '@angular/core/testing'; + +import { SessionStorageService } from './session-storage.service'; + +describe('SessionStorageService', () => { + let service: SessionStorageService; + + beforeEach(() => { + TestBed.configureTestingModule({}); + service = TestBed.inject(SessionStorageService); + }); + + it('should be created', () => { + expect(service).toBeTruthy(); + }); +}); diff --git a/nifi-frontend/src/main/frontend/libs/shared/src/services/session-storage.service.ts b/nifi-frontend/src/main/frontend/libs/shared/src/services/session-storage.service.ts new file mode 100644 index 000000000000..31d36ac4438e --- /dev/null +++ b/nifi-frontend/src/main/frontend/libs/shared/src/services/session-storage.service.ts @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { Injectable } from '@angular/core'; + +interface SessionStorageEntry { + item: T; +} + +@Injectable({ + providedIn: 'root' +}) +export class SessionStorageService { + /** + * Gets an entry for the key. The entry expiration is not checked. + * + * @param {string} key + */ + private getEntry(key: string): null | SessionStorageEntry { + try { + // parse the entry + const item = sessionStorage.getItem(key); + if (!item) { + return null; + } + + const entry = JSON.parse(item); + + // ensure the entry is present + if (entry) { + return entry; + } else { + return null; + } + } catch (e) { + return null; + } + } + + /** + * Stores the specified item. + * + * @param {string} key + * @param {object} item + */ + public setItem(key: string, item: T): void { + // create the entry + const entry: SessionStorageEntry = { + item + }; + + // store the item + sessionStorage.setItem(key, JSON.stringify(entry)); + } + + /** + * Returns whether there is an entry for this key. This will not check the expiration. If + * the entry is expired, it will return null on a subsequent getItem invocation. + * + * @param {string} key + * @returns {boolean} + */ + public hasItem(key: string): boolean { + return this.getEntry(key) !== null; + } + + /** + * Gets the item with the specified key. If an item with this key does + * not exist, null is returned. If an item exists but cannot be parsed + * or is malformed/unrecognized, null is returned. + * + * @param {type} key + */ + public getItem(key: string): null | T { + const entry: SessionStorageEntry | null = this.getEntry(key); + if (entry === null) { + return null; + } + + // if the entry has the specified field return its value + if (entry['item']) { + return entry['item']; + } else { + return null; + } + } + + /** + * Removes the item with the specified key. + * + * @param {string} key + */ + public removeItem(key: string): void { + sessionStorage.removeItem(key); + } +}