diff --git a/packages/cli/src/Server.ts b/packages/cli/src/Server.ts index 62d7096087720..5bffa178c7e08 100644 --- a/packages/cli/src/Server.ts +++ b/packages/cli/src/Server.ts @@ -129,7 +129,7 @@ import { WaitTracker } from '@/WaitTracker'; import * as WebhookHelpers from '@/WebhookHelpers'; import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData'; import { toHttpNodeParameters } from '@/CurlConverterHelper'; -import { eventBusRouter } from '@/eventbus/eventBusRoutes'; +import { EventBusController } from '@/eventbus/eventBus.controller'; import { isLogStreamingEnabled } from '@/eventbus/MessageEventBus/MessageEventBusHelper'; import { licenseController } from './license/license.controller'; import { Push, setupPushServer, setupPushHandler } from '@/push'; @@ -377,6 +377,7 @@ class Server extends AbstractServer { const samlService = Container.get(SamlService); const controllers: object[] = [ + new EventBusController(), new AuthController({ config, internalHooks, repositories, logger, postHog }), new OwnerController({ config, internalHooks, repositories, logger }), new MeController({ externalHooks, internalHooks, repositories, logger }), @@ -1229,8 +1230,6 @@ class Server extends AbstractServer { if (!eventBus.isInitialized) { await eventBus.initialize(); } - // add Event Bus REST endpoints - this.app.use(`/${this.restEndpoint}/eventbus`, eventBusRouter); // ---------------------------------------- // Webhooks diff --git a/packages/cli/src/eventbus/EventMessageClasses/index.ts b/packages/cli/src/eventbus/EventMessageClasses/index.ts index 508b5031cfa56..4b4fe47f8d822 100644 --- a/packages/cli/src/eventbus/EventMessageClasses/index.ts +++ b/packages/cli/src/eventbus/EventMessageClasses/index.ts @@ -49,3 +49,11 @@ export type EventMessageTypes = | EventMessageWorkflow | EventMessageAudit | EventMessageNode; + +export interface FailedEventSummary { + lastNodeExecuted: string; + executionId: string; + name: string; + event: string; + timestamp: string; +} diff --git a/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts b/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts index 439e4773e6a2f..5d63f57b26de3 100644 --- a/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts +++ b/packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts @@ -1,7 +1,11 @@ import { LoggerProxy } from 'n8n-workflow'; import type { MessageEventBusDestinationOptions } from 'n8n-workflow'; import type { DeleteResult } from 'typeorm'; -import type { EventMessageTypes } from '../EventMessageClasses/'; +import type { + EventMessageTypes, + EventNamesTypes, + FailedEventSummary, +} from '../EventMessageClasses/'; import type { MessageEventBusDestination } from '../MessageEventBusDestination/MessageEventBusDestination.ee'; import { MessageEventBusLogWriter } from '../MessageEventBusWriter/MessageEventBusLogWriter'; import EventEmitter from 'events'; @@ -249,6 +253,48 @@ export class MessageEventBus extends EventEmitter { ); } + async getEventsFailed(amount = 5): Promise { + const result: FailedEventSummary[] = []; + try { + const queryResult = await this.logWriter?.getMessagesAll(); + const uniques = uniqby(queryResult, 'id'); + const filteredExecutionIds = uniques + .filter((e) => + (['n8n.workflow.crashed', 'n8n.workflow.failed'] as EventNamesTypes[]).includes( + e.eventName, + ), + ) + .map((e) => ({ + executionId: e.payload.executionId as string, + name: e.payload.workflowName, + timestamp: e.ts, + event: e.eventName, + })) + .filter((e) => e) + .sort((a, b) => (a.timestamp > b.timestamp ? 1 : -1)) + .slice(-amount); + + for (const execution of filteredExecutionIds) { + const data = await recoverExecutionDataFromEventLogMessages( + execution.executionId, + queryResult, + false, + ); + if (data) { + const lastNodeExecuted = data.resultData.lastNodeExecuted; + result.push({ + lastNodeExecuted: lastNodeExecuted ?? '', + executionId: execution.executionId, + name: execution.name as string, + event: execution.event, + timestamp: execution.timestamp.toISO(), + }); + } + } + } catch {} + return result; + } + async getEventsAll(): Promise { const queryResult = await this.logWriter?.getMessagesAll(); const filtered = uniqby(queryResult, 'id'); diff --git a/packages/cli/src/eventbus/eventBusRoutes.ts b/packages/cli/src/eventbus/eventBus.controller.ts similarity index 67% rename from packages/cli/src/eventbus/eventBusRoutes.ts rename to packages/cli/src/eventbus/eventBus.controller.ts index 26d76e6c398e5..bc18472d4c910 100644 --- a/packages/cli/src/eventbus/eventBusRoutes.ts +++ b/packages/cli/src/eventbus/eventBus.controller.ts @@ -17,22 +17,26 @@ import { MessageEventBusDestinationSyslog, } from './MessageEventBusDestination/MessageEventBusDestinationSyslog.ee'; import { MessageEventBusDestinationWebhook } from './MessageEventBusDestination/MessageEventBusDestinationWebhook.ee'; +import type { EventMessageTypes, FailedEventSummary } from './EventMessageClasses'; import { eventNamesAll } from './EventMessageClasses'; import type { EventMessageAuditOptions } from './EventMessageClasses/EventMessageAudit'; import { EventMessageAudit } from './EventMessageClasses/EventMessageAudit'; -import { BadRequestError } from '../ResponseHelper'; +import { BadRequestError } from '@/ResponseHelper'; import type { MessageEventBusDestinationWebhookOptions, MessageEventBusDestinationOptions, + IRunExecutionData, } from 'n8n-workflow'; import { MessageEventBusDestinationTypeNames, EventMessageTypeNames } from 'n8n-workflow'; -import type { User } from '../databases/entities/User'; +import type { User } from '@db/entities/User'; import * as ResponseHelper from '@/ResponseHelper'; import type { EventMessageNodeOptions } from './EventMessageClasses/EventMessageNode'; import { EventMessageNode } from './EventMessageClasses/EventMessageNode'; import { recoverExecutionDataFromEventLogMessages } from './MessageEventBus/recoverEvents'; - -export const eventBusRouter = express.Router(); +import { RestController, Get, Post, Delete } from '@/decorators'; +import type { MessageEventBusDestination } from './MessageEventBusDestination/MessageEventBusDestination.ee'; +import { isOwnerMiddleware } from '../middlewares/isOwner'; +import type { DeleteResult } from 'typeorm'; // ---------------------------------------- // TypeGuards @@ -50,7 +54,6 @@ const isWithQueryString = (candidate: unknown): candidate is { query: string } = return o.query !== undefined; }; -// TODO: add credentials const isMessageEventBusDestinationWebhookOptions = ( candidate: unknown, ): candidate is MessageEventBusDestinationWebhookOptions => { @@ -68,11 +71,18 @@ const isMessageEventBusDestinationOptions = ( }; // ---------------------------------------- -// Events +// Controller // ---------------------------------------- -eventBusRouter.get( - '/event', - ResponseHelper.send(async (req: express.Request): Promise => { + +@RestController('/eventbus') +export class EventBusController { + // ---------------------------------------- + // Events + // ---------------------------------------- + @Get('/event', { middlewares: [isOwnerMiddleware] }) + async getEvents( + req: express.Request, + ): Promise> { if (isWithQueryString(req.query)) { switch (req.query.query as EventMessageReturnMode) { case 'sent': @@ -85,14 +95,19 @@ eventBusRouter.get( default: return eventBus.getEventsAll(); } + } else { + return eventBus.getEventsAll(); } - return eventBus.getEventsAll(); - }), -); + } + + @Get('/failed') + async getFailedEvents(req: express.Request): Promise { + const amount = parseInt(req.query?.amount as string) ?? 5; + return eventBus.getEventsFailed(amount); + } -eventBusRouter.get( - '/execution/:id', - ResponseHelper.send(async (req: express.Request): Promise => { + @Get('/execution/:id') + async getEventForExecutionId(req: express.Request): Promise { if (req.params?.id) { let logHistory; if (req.query?.logHistory) { @@ -100,40 +115,27 @@ eventBusRouter.get( } return eventBus.getEventsByExecutionId(req.params.id, logHistory); } - }), -); + return; + } -eventBusRouter.get( - '/execution-recover/:id', - ResponseHelper.send(async (req: express.Request): Promise => { + @Get('/execution-recover/:id') + async getRecoveryForExecutionId(req: express.Request): Promise { + const { id } = req.params; if (req.params?.id) { - let logHistory; - let applyToDb = true; - if (req.query?.logHistory) { - logHistory = parseInt(req.query.logHistory as string, 10); - } - if (req.query?.applyToDb) { - applyToDb = !!req.query.applyToDb; - } - const messages = await eventBus.getEventsByExecutionId(req.params.id, logHistory); + const logHistory = parseInt(req.query.logHistory as string, 10) || undefined; + const applyToDb = req.query.applyToDb !== undefined ? !!req.query.applyToDb : true; + const messages = await eventBus.getEventsByExecutionId(id, logHistory); if (messages.length > 0) { - // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment - const recoverResult = await recoverExecutionDataFromEventLogMessages( - req.params.id, - messages, - applyToDb, - ); - return recoverResult; + return recoverExecutionDataFromEventLogMessages(id, messages, applyToDb); } } - }), -); + return; + } -eventBusRouter.post( - '/event', - ResponseHelper.send(async (req: express.Request): Promise => { + @Post('/event', { middlewares: [isOwnerMiddleware] }) + async postEvent(req: express.Request): Promise { + let msg: EventMessageTypes | undefined; if (isEventMessageOptions(req.body)) { - let msg; switch (req.body.__type) { case EventMessageTypeNames.workflow: msg = new EventMessageWorkflow(req.body as EventMessageWorkflowOptions); @@ -154,35 +156,30 @@ eventBusRouter.post( 'Body is not a serialized EventMessage or eventName does not match format {namespace}.{domain}.{event}', ); } - }), -); + return msg; + } -// ---------------------------------------- -// Destinations -// ---------------------------------------- + // ---------------------------------------- + // Destinations + // ---------------------------------------- -eventBusRouter.get( - '/destination', - ResponseHelper.send(async (req: express.Request): Promise => { - let result = []; + @Get('/destination') + async getDestination(req: express.Request): Promise { if (isWithIdString(req.query)) { - result = await eventBus.findDestination(req.query.id); + return eventBus.findDestination(req.query.id); } else { - result = await eventBus.findDestination(); + return eventBus.findDestination(); } - return result; - }), -); + } -eventBusRouter.post( - '/destination', - ResponseHelper.send(async (req: express.Request): Promise => { + @Post('/destination', { middlewares: [isOwnerMiddleware] }) + async postDestination(req: express.Request): Promise { if (!req.user || (req.user as User).globalRole.name !== 'owner') { throw new ResponseHelper.UnauthorizedError('Invalid request'); } + let result: MessageEventBusDestination | undefined; if (isMessageEventBusDestinationOptions(req.body)) { - let result; switch (req.body.__type) { case MessageEventBusDestinationTypeNames.sentry: if (isMessageEventBusDestinationSentryOptions(req.body)) { @@ -214,51 +211,41 @@ eventBusRouter.post( if (result) { await result.saveToDb(); return { - ...result, + ...result.serialize(), eventBusInstance: undefined, }; } throw new BadRequestError('There was an error adding the destination'); } throw new BadRequestError('Body is not configuring MessageEventBusDestinationOptions'); - }), -); + } -eventBusRouter.get( - '/testmessage', - ResponseHelper.send(async (req: express.Request): Promise => { - let result = false; + @Get('/testmessage') + async sendTestMessage(req: express.Request): Promise { if (isWithIdString(req.query)) { - result = await eventBus.testDestination(req.query.id); + return eventBus.testDestination(req.query.id); } - return result; - }), -); + return false; + } -eventBusRouter.delete( - '/destination', - ResponseHelper.send(async (req: express.Request): Promise => { + @Delete('/destination', { middlewares: [isOwnerMiddleware] }) + async deleteDestination(req: express.Request): Promise { if (!req.user || (req.user as User).globalRole.name !== 'owner') { throw new ResponseHelper.UnauthorizedError('Invalid request'); } if (isWithIdString(req.query)) { - const result = await eventBus.removeDestination(req.query.id); - if (result) { - return result; - } + return eventBus.removeDestination(req.query.id); } else { throw new BadRequestError('Query is missing id'); } - }), -); + } -// ---------------------------------------- -// Utilities -// ---------------------------------------- + // ---------------------------------------- + // Utilities + // ---------------------------------------- -eventBusRouter.get( - '/eventnames', - ResponseHelper.send(async (): Promise => { + @Get('/eventnames') + async getEventNames(): Promise { return eventNamesAll; - }), -); + } +} diff --git a/packages/cli/src/middlewares/isOwner.ts b/packages/cli/src/middlewares/isOwner.ts new file mode 100644 index 0000000000000..d3e3c70a0fb46 --- /dev/null +++ b/packages/cli/src/middlewares/isOwner.ts @@ -0,0 +1,12 @@ +import type { RequestHandler } from 'express'; +import { LoggerProxy } from 'n8n-workflow'; +import type { AuthenticatedRequest } from '@/requests'; + +export const isOwnerMiddleware: RequestHandler = (req: AuthenticatedRequest, res, next) => { + if (req.user.globalRole.name === 'owner') { + next(); + } else { + LoggerProxy.debug('Request failed because user is not owner'); + res.status(401).send('Unauthorized'); + } +}; diff --git a/packages/cli/test/integration/eventbus.test.ts b/packages/cli/test/integration/eventbus.test.ts index a2dee871a1f96..d456f86799884 100644 --- a/packages/cli/test/integration/eventbus.test.ts +++ b/packages/cli/test/integration/eventbus.test.ts @@ -100,7 +100,7 @@ beforeAll(async () => { utils.initConfigFile(); config.set('eventBus.logWriter.logBaseName', 'n8n-test-logwriter'); - config.set('eventBus.logWriter.keepLogCount', '1'); + config.set('eventBus.logWriter.keepLogCount', 1); config.set('enterprise.features.logStreaming', true); config.set('userManagement.disabled', false); config.set('userManagement.isInstanceOwnerSetUp', true); diff --git a/packages/cli/test/integration/shared/utils.ts b/packages/cli/test/integration/shared/utils.ts index 1fdb77f34aeef..e8bf676ba7125 100644 --- a/packages/cli/test/integration/shared/utils.ts +++ b/packages/cli/test/integration/shared/utils.ts @@ -56,7 +56,6 @@ import type { PostgresSchemaSection, } from './types'; import { licenseController } from '@/license/license.controller'; -import { eventBusRouter } from '@/eventbus/eventBusRoutes'; import { registerController } from '@/decorators'; import { AuthController, @@ -81,6 +80,7 @@ import { Push } from '@/push'; import { setSamlLoginEnabled } from '@/sso/saml/samlHelpers'; import { SamlService } from '@/sso/saml/saml.service.ee'; import { SamlController } from '@/sso/saml/routes/saml.controller.ee'; +import { EventBusController } from '@/eventbus/eventBus.controller'; export const mockInstance = ( ctor: new (...args: any[]) => T, @@ -151,7 +151,6 @@ export async function initTestServer({ credentials: { controller: credentialsController, path: 'credentials' }, workflows: { controller: workflowsController, path: 'workflows' }, license: { controller: licenseController, path: 'license' }, - eventBus: { controller: eventBusRouter, path: 'eventbus' }, }; if (enablePublicAPI) { @@ -176,6 +175,9 @@ export async function initTestServer({ for (const group of functionEndpoints) { switch (group) { + case 'eventBus': + registerController(testServer.app, config, new EventBusController()); + break; case 'auth': registerController( testServer.app, @@ -266,7 +268,7 @@ const classifyEndpointGroups = (endpointGroups: EndpointGroup[]) => { const routerEndpoints: EndpointGroup[] = []; const functionEndpoints: EndpointGroup[] = []; - const ROUTER_GROUP = ['credentials', 'workflows', 'publicApi', 'eventBus', 'license']; + const ROUTER_GROUP = ['credentials', 'workflows', 'publicApi', 'license']; endpointGroups.forEach((group) => (ROUTER_GROUP.includes(group) ? routerEndpoints : functionEndpoints).push(group),