Skip to content

Commit

Permalink
refactor(core): Move active workflows endpoints to a decorated contro…
Browse files Browse the repository at this point in the history
…ller class (no-changelog)
  • Loading branch information
netroy committed Dec 21, 2023
1 parent 15ffd4f commit 88b92df
Show file tree
Hide file tree
Showing 12 changed files with 226 additions and 133 deletions.
10 changes: 5 additions & 5 deletions cypress/e2e/30-editor-after-route-changes.cy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,17 +99,17 @@ const switchBetweenEditorAndHistory = () => {

workflowPage.getters.canvasNodes().first().should('be.visible');
workflowPage.getters.canvasNodes().last().should('be.visible');
}
};

const switchBetweenEditorAndWorkflowlist = () => {
cy.getByTestId('menu-item').first().click();
cy.wait(['@getUsers', '@getWorkflows', '@getActive', '@getCredentials']);
cy.wait(['@getUsers', '@getWorkflows', '@getActiveWorkflows', '@getCredentials']);

cy.getByTestId('resources-list-item').first().click();

workflowPage.getters.canvasNodes().first().should('be.visible');
workflowPage.getters.canvasNodes().last().should('be.visible');
}
};

const zoomInAndCheckNodes = () => {
cy.getByTestId('zoom-in-button').click();
Expand All @@ -119,7 +119,7 @@ const zoomInAndCheckNodes = () => {

workflowPage.getters.canvasNodes().first().should('not.be.visible');
workflowPage.getters.canvasNodes().last().should('not.be.visible');
}
};

describe('Editor actions should work', () => {
beforeEach(() => {
Expand Down Expand Up @@ -199,7 +199,7 @@ describe('Editor zoom should work after route changes', () => {
cy.intercept('GET', '/rest/workflow-history/workflow/*').as('getHistory');
cy.intercept('GET', '/rest/users').as('getUsers');
cy.intercept('GET', '/rest/workflows').as('getWorkflows');
cy.intercept('GET', '/rest/active').as('getActive');
cy.intercept('GET', '/rest/active_workflows').as('getActiveWorkflows');
cy.intercept('GET', '/rest/credentials').as('getCredentials');

switchBetweenEditorAndHistory();
Expand Down
80 changes: 12 additions & 68 deletions packages/cli/src/ActiveWorkflowRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ import type {
import * as WebhookHelpers from '@/WebhookHelpers';
import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData';

import type { User } from '@db/entities/User';
import type { WorkflowEntity } from '@db/entities/WorkflowEntity';
import { ActiveExecutions } from '@/ActiveExecutions';
import { ExecutionsService } from './executions/executions.service';
Expand All @@ -57,44 +56,40 @@ import {
import { NodeTypes } from '@/NodeTypes';
import { WorkflowRunner } from '@/WorkflowRunner';
import { ExternalHooks } from '@/ExternalHooks';
import { whereClause } from './UserManagement/UserManagementHelper';
import { WebhookNotFoundError } from './errors/response-errors/webhook-not-found.error';
import { In } from 'typeorm';
import { WebhookService } from './services/webhook.service';
import { Logger } from './Logger';
import { SharedWorkflowRepository } from '@db/repositories/sharedWorkflow.repository';
import { WorkflowRepository } from '@db/repositories/workflow.repository';
import { MultiMainSetup } from '@/services/orchestration/main/MultiMainSetup.ee';
import { ActivationErrorsService } from '@/ActivationErrors.service';
import type { Scope } from '@n8n/permissions';
import { NotFoundError } from './errors/response-errors/not-found.error';
import { ActiveWorkflowsService } from '@/services/activeWorkflows.service';
import { WorkflowStaticDataService } from '@/workflows/workflowStaticData.service';

interface QueuedActivation {
activationMode: WorkflowActivateMode;
lastTimeout: number;
timeout: NodeJS.Timeout;
workflowData: IWorkflowDb;
}

@Service()
export class ActiveWorkflowRunner implements IWebhookManager {
activeWorkflows = new ActiveWorkflows();

private queuedActivations: {
[workflowId: string]: {
activationMode: WorkflowActivateMode;
lastTimeout: number;
timeout: NodeJS.Timeout;
workflowData: IWorkflowDb;
};
} = {};
private queuedActivations: Record<string, QueuedActivation> = {};

constructor(
private readonly logger: Logger,
private readonly activeWorkflows: ActiveWorkflows,
private readonly activeExecutions: ActiveExecutions,
private readonly externalHooks: ExternalHooks,
private readonly nodeTypes: NodeTypes,
private readonly webhookService: WebhookService,
private readonly workflowRepository: WorkflowRepository,
private readonly sharedWorkflowRepository: SharedWorkflowRepository,
private readonly multiMainSetup: MultiMainSetup,
private readonly activationErrorsService: ActivationErrorsService,
private readonly executionService: ExecutionsService,
private readonly workflowStaticDataService: WorkflowStaticDataService,
private readonly activeWorkflowsService: ActiveWorkflowsService,
) {}

async init() {
Expand All @@ -119,7 +114,7 @@ export class ActiveWorkflowRunner implements IWebhookManager {

activeWorkflowIds.push(...this.activeWorkflows.allActiveWorkflows());

const activeWorkflows = await this.allActiveInStorage();
const activeWorkflows = await this.activeWorkflowsService.getAllActiveIds();
activeWorkflowIds = [...activeWorkflowIds, ...activeWorkflows];
// Make sure IDs are unique
activeWorkflowIds = Array.from(new Set(activeWorkflowIds));
Expand Down Expand Up @@ -269,50 +264,6 @@ export class ActiveWorkflowRunner implements IWebhookManager {
return this.activeWorkflows.allActiveWorkflows();
}

/**
* Get the IDs of active workflows from storage.
*/
async allActiveInStorage(options?: { user: User; scope: Scope | Scope[] }) {
const isFullAccess = !options?.user || options.user.hasGlobalScope(options.scope);

const activationErrors = await this.activationErrorsService.getAll();

if (isFullAccess) {
const activeWorkflows = await this.workflowRepository.find({
select: ['id'],
where: { active: true },
});

return activeWorkflows
.map((workflow) => workflow.id)
.filter((workflowId) => !activationErrors[workflowId]);
}

const where = whereClause({
user: options.user,
globalScope: 'workflow:list',
entityType: 'workflow',
});

const activeWorkflows = await this.workflowRepository.find({
select: ['id'],
where: { active: true },
});

const activeIds = activeWorkflows.map((workflow) => workflow.id);

Object.assign(where, { workflowId: In(activeIds) });

const sharings = await this.sharedWorkflowRepository.find({
select: ['workflowId'],
where,
});

return sharings
.map((sharing) => sharing.workflowId)
.filter((workflowId) => !activationErrors[workflowId]);
}

/**
* Returns if the workflow is stored as `active`.
*
Expand All @@ -328,13 +279,6 @@ export class ActiveWorkflowRunner implements IWebhookManager {
return !!workflow?.active;
}

/**
* Return error if there was a problem activating the workflow
*/
async getActivationError(workflowId: string) {
return this.activationErrorsService.get(workflowId);
}

/**
* Register workflow-defined webhooks in the `workflow_entity` table.
*/
Expand Down
49 changes: 3 additions & 46 deletions packages/cli/src/Server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ import {
TEMPLATES_DIR,
} from '@/constants';
import { credentialsController } from '@/credentials/credentials.controller';
import type { CurlHelper, ExecutionRequest, WorkflowRequest } from '@/requests';
import type { CurlHelper, ExecutionRequest } from '@/requests';
import { registerController } from '@/decorators';
import { AuthController } from '@/controllers/auth.controller';
import { BinaryDataController } from '@/controllers/binaryData.controller';
Expand All @@ -66,7 +66,6 @@ import { WorkflowStatisticsController } from '@/controllers/workflowStatistics.c
import { ExternalSecretsController } from '@/ExternalSecrets/ExternalSecrets.controller.ee';
import { executionsController } from '@/executions/executions.controller';
import { isApiEnabled, loadPublicApiVersions } from '@/PublicApi';
import { whereClause } from '@/UserManagement/UserManagementHelper';
import type { ICredentialsOverwrite, IDiagnosticInfo, IExecutionsStopData } from '@/Interfaces';
import { ActiveExecutions } from '@/ActiveExecutions';
import { CredentialsOverwrites } from '@/CredentialsOverwrites';
Expand Down Expand Up @@ -112,6 +111,7 @@ import { handleMfaDisable, isMfaFeatureEnabled } from './Mfa/helpers';
import type { FrontendService } from './services/frontend.service';
import { RoleService } from './services/role.service';
import { UserService } from './services/user.service';
import { ActiveWorkflowsController } from './controllers/activeWorkflows.controller';
import { OrchestrationController } from './controllers/orchestration.controller';
import { WorkflowHistoryController } from './workflows/workflowHistory/workflowHistory.controller.ee';
import { InvitationController } from './controllers/invitation.controller';
Expand Down Expand Up @@ -305,6 +305,7 @@ export class Server extends AbstractServer {
),
Container.get(VariablesController),
Container.get(RoleController),
Container.get(ActiveWorkflowsController),
];

if (Container.get(MultiMainSetup).isEnabled) {
Expand Down Expand Up @@ -443,50 +444,6 @@ export class Server extends AbstractServer {
this.logger.warn(`Source Control initialization failed: ${error.message}`);
}

// ----------------------------------------
// Active Workflows
// ----------------------------------------

// Returns the active workflow ids
this.app.get(
`/${this.restEndpoint}/active`,
ResponseHelper.send(async (req: WorkflowRequest.GetAllActive) => {
return this.activeWorkflowRunner.allActiveInStorage({
user: req.user,
scope: 'workflow:list',
});
}),
);

// Returns if the workflow with the given id had any activation errors
this.app.get(
`/${this.restEndpoint}/active/error/:id`,
ResponseHelper.send(async (req: WorkflowRequest.GetActivationError) => {
const { id: workflowId } = req.params;

const shared = await Container.get(SharedWorkflowRepository).findOne({
relations: ['workflow'],
where: whereClause({
user: req.user,
globalScope: 'workflow:read',
entityType: 'workflow',
entityId: workflowId,
}),
});

if (!shared) {
this.logger.verbose('User attempted to access workflow errors without permissions', {
workflowId,
userId: req.user.id,
});

throw new BadRequestError(`Workflow with ID "${workflowId}" could not be found.`);
}

return this.activeWorkflowRunner.getActivationError(workflowId);
}),
);

// ----------------------------------------
// curl-converter
// ----------------------------------------
Expand Down
25 changes: 25 additions & 0 deletions packages/cli/src/controllers/activeWorkflows.controller.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import { Service } from 'typedi';
import { Authorized, Get, RestController } from '@/decorators';
import { WorkflowRequest } from '@/requests';
import { ActiveWorkflowsService } from '@/services/activeWorkflows.service';

@Service()
@Authorized()
@RestController('/active_workflows')
export class ActiveWorkflowsController {
constructor(private readonly activeWorkflowsService: ActiveWorkflowsService) {}

@Get('/')
async getActiveWorkflows(req: WorkflowRequest.GetAllActive) {
return this.activeWorkflowsService.getAllActiveIdsForUser(req.user);
}

@Get('/error/:id')
async getActiveError(req: WorkflowRequest.GetActivationError) {
const {
user,
params: { id: workflowId },
} = req;
return this.activeWorkflowsService.getActivationError(workflowId, user);
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,31 @@
import { Service } from 'typedi';
import { DataSource, Repository } from 'typeorm';
import { DataSource, type FindOptionsWhere, Repository, In } from 'typeorm';
import { SharedWorkflow } from '../entities/SharedWorkflow';
import { type User } from '../entities/User';

@Service()
export class SharedWorkflowRepository extends Repository<SharedWorkflow> {
constructor(dataSource: DataSource) {
super(SharedWorkflow, dataSource.manager);
}

async hasAccess(workflowId: string, user: User) {
const where: FindOptionsWhere<SharedWorkflow> = {
workflowId,
};
if (!user.hasGlobalScope('workflow:read')) {
where.userId = user.id;
}
return this.exist({ where });
}

async getSharedWorkflowIds(workflowIds: string[]) {
const sharedWorkflows = await this.find({
select: ['workflowId'],
where: {
workflowId: In(workflowIds),
},
});
return sharedWorkflows.map((sharing) => sharing.workflowId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,14 @@ export class WorkflowRepository extends Repository<WorkflowEntity> {
});
}

async getActiveIds() {
const activeWorkflows = await this.find({
select: ['id'],
where: { active: true },
});
return activeWorkflows.map((workflow) => workflow.id);
}

async findById(workflowId: string) {
return this.findOne({
where: { id: workflowId },
Expand Down
52 changes: 52 additions & 0 deletions packages/cli/src/services/activeWorkflows.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import { Service } from 'typedi';

import type { User } from '@db/entities/User';
import { SharedWorkflowRepository } from '@db/repositories/sharedWorkflow.repository';
import { WorkflowRepository } from '@db/repositories/workflow.repository';
import { ActivationErrorsService } from '@/ActivationErrors.service';
import { BadRequestError } from '@/errors/response-errors/bad-request.error';
import { Logger } from '@/Logger';

@Service()
export class ActiveWorkflowsService {
constructor(
private readonly logger: Logger,
private readonly workflowRepository: WorkflowRepository,
private readonly sharedWorkflowRepository: SharedWorkflowRepository,
private readonly activationErrorsService: ActivationErrorsService,
) {}

async getAllActiveIds() {
const activationErrors = await this.activationErrorsService.getAll();
const activeWorkflowIds = await this.workflowRepository.getActiveIds();
return activeWorkflowIds.filter((workflowId) => !activationErrors[workflowId]);
}

async getAllActiveIdsForUser(user: User) {
const activationErrors = await this.activationErrorsService.getAll();
const activeWorkflowIds = await this.workflowRepository.getActiveIds();

const hasFullAccess = user.hasGlobalScope('workflow:list');
if (hasFullAccess) {
return activeWorkflowIds.filter((workflowId) => !activationErrors[workflowId]);
}

const sharedWorkflowIds =
await this.sharedWorkflowRepository.getSharedWorkflowIds(activeWorkflowIds);
return sharedWorkflowIds.filter((workflowId) => !activationErrors[workflowId]);
}

async getActivationError(workflowId: string, user: User) {
const hasAccess = await this.sharedWorkflowRepository.hasAccess(workflowId, user);
if (!hasAccess) {
this.logger.verbose('User attempted to access workflow errors without permissions', {
workflowId,
userId: user.id,
});

throw new BadRequestError(`Workflow with ID "${workflowId}" could not be found.`);
}

return this.activationErrorsService.get(workflowId);
}
}
Loading

0 comments on commit 88b92df

Please sign in to comment.