Skip to content
/ qp-n8n Public
forked from n8n-io/n8n

Commit

Permalink
feat(core): Convert eventBus controller to decorator style and improv…
Browse files Browse the repository at this point in the history
…e permissions (n8n-io#5779)
  • Loading branch information
flipswitchingmonkey authored and sunilrr committed Apr 24, 2023
1 parent 869ab1e commit ad6de22
Show file tree
Hide file tree
Showing 7 changed files with 150 additions and 96 deletions.
5 changes: 2 additions & 3 deletions packages/cli/src/Server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ import { WaitTracker } from '@/WaitTracker';
import * as WebhookHelpers from '@/WebhookHelpers';
import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData';
import { toHttpNodeParameters } from '@/CurlConverterHelper';
import { eventBusRouter } from '@/eventbus/eventBusRoutes';
import { EventBusController } from '@/eventbus/eventBus.controller';
import { isLogStreamingEnabled } from '@/eventbus/MessageEventBus/MessageEventBusHelper';
import { licenseController } from './license/license.controller';
import { Push, setupPushServer, setupPushHandler } from '@/push';
Expand Down Expand Up @@ -377,6 +377,7 @@ class Server extends AbstractServer {
const samlService = Container.get(SamlService);

const controllers: object[] = [
new EventBusController(),
new AuthController({ config, internalHooks, repositories, logger, postHog }),
new OwnerController({ config, internalHooks, repositories, logger }),
new MeController({ externalHooks, internalHooks, repositories, logger }),
Expand Down Expand Up @@ -1229,8 +1230,6 @@ class Server extends AbstractServer {
if (!eventBus.isInitialized) {
await eventBus.initialize();
}
// add Event Bus REST endpoints
this.app.use(`/${this.restEndpoint}/eventbus`, eventBusRouter);

// ----------------------------------------
// Webhooks
Expand Down
8 changes: 8 additions & 0 deletions packages/cli/src/eventbus/EventMessageClasses/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,11 @@ export type EventMessageTypes =
| EventMessageWorkflow
| EventMessageAudit
| EventMessageNode;

export interface FailedEventSummary {
lastNodeExecuted: string;
executionId: string;
name: string;
event: string;
timestamp: string;
}
48 changes: 47 additions & 1 deletion packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
import { LoggerProxy } from 'n8n-workflow';
import type { MessageEventBusDestinationOptions } from 'n8n-workflow';
import type { DeleteResult } from 'typeorm';
import type { EventMessageTypes } from '../EventMessageClasses/';
import type {
EventMessageTypes,
EventNamesTypes,
FailedEventSummary,
} from '../EventMessageClasses/';
import type { MessageEventBusDestination } from '../MessageEventBusDestination/MessageEventBusDestination.ee';
import { MessageEventBusLogWriter } from '../MessageEventBusWriter/MessageEventBusLogWriter';
import EventEmitter from 'events';
Expand Down Expand Up @@ -249,6 +253,48 @@ export class MessageEventBus extends EventEmitter {
);
}

async getEventsFailed(amount = 5): Promise<FailedEventSummary[]> {
const result: FailedEventSummary[] = [];
try {
const queryResult = await this.logWriter?.getMessagesAll();
const uniques = uniqby(queryResult, 'id');
const filteredExecutionIds = uniques
.filter((e) =>
(['n8n.workflow.crashed', 'n8n.workflow.failed'] as EventNamesTypes[]).includes(
e.eventName,
),
)
.map((e) => ({
executionId: e.payload.executionId as string,
name: e.payload.workflowName,
timestamp: e.ts,
event: e.eventName,
}))
.filter((e) => e)
.sort((a, b) => (a.timestamp > b.timestamp ? 1 : -1))
.slice(-amount);

for (const execution of filteredExecutionIds) {
const data = await recoverExecutionDataFromEventLogMessages(
execution.executionId,
queryResult,
false,
);
if (data) {
const lastNodeExecuted = data.resultData.lastNodeExecuted;
result.push({
lastNodeExecuted: lastNodeExecuted ?? '',
executionId: execution.executionId,
name: execution.name as string,
event: execution.event,
timestamp: execution.timestamp.toISO(),
});
}
}
} catch {}
return result;
}

async getEventsAll(): Promise<EventMessageTypes[]> {
const queryResult = await this.logWriter?.getMessagesAll();
const filtered = uniqby(queryResult, 'id');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,26 @@ import {
MessageEventBusDestinationSyslog,
} from './MessageEventBusDestination/MessageEventBusDestinationSyslog.ee';
import { MessageEventBusDestinationWebhook } from './MessageEventBusDestination/MessageEventBusDestinationWebhook.ee';
import type { EventMessageTypes, FailedEventSummary } from './EventMessageClasses';
import { eventNamesAll } from './EventMessageClasses';
import type { EventMessageAuditOptions } from './EventMessageClasses/EventMessageAudit';
import { EventMessageAudit } from './EventMessageClasses/EventMessageAudit';
import { BadRequestError } from '../ResponseHelper';
import { BadRequestError } from '@/ResponseHelper';
import type {
MessageEventBusDestinationWebhookOptions,
MessageEventBusDestinationOptions,
IRunExecutionData,
} from 'n8n-workflow';
import { MessageEventBusDestinationTypeNames, EventMessageTypeNames } from 'n8n-workflow';
import type { User } from '../databases/entities/User';
import type { User } from '@db/entities/User';
import * as ResponseHelper from '@/ResponseHelper';
import type { EventMessageNodeOptions } from './EventMessageClasses/EventMessageNode';
import { EventMessageNode } from './EventMessageClasses/EventMessageNode';
import { recoverExecutionDataFromEventLogMessages } from './MessageEventBus/recoverEvents';

export const eventBusRouter = express.Router();
import { RestController, Get, Post, Delete } from '@/decorators';
import type { MessageEventBusDestination } from './MessageEventBusDestination/MessageEventBusDestination.ee';
import { isOwnerMiddleware } from '../middlewares/isOwner';
import type { DeleteResult } from 'typeorm';

// ----------------------------------------
// TypeGuards
Expand All @@ -50,7 +54,6 @@ const isWithQueryString = (candidate: unknown): candidate is { query: string } =
return o.query !== undefined;
};

// TODO: add credentials
const isMessageEventBusDestinationWebhookOptions = (
candidate: unknown,
): candidate is MessageEventBusDestinationWebhookOptions => {
Expand All @@ -68,11 +71,18 @@ const isMessageEventBusDestinationOptions = (
};

// ----------------------------------------
// Events
// Controller
// ----------------------------------------
eventBusRouter.get(
'/event',
ResponseHelper.send(async (req: express.Request): Promise<any> => {

@RestController('/eventbus')
export class EventBusController {
// ----------------------------------------
// Events
// ----------------------------------------
@Get('/event', { middlewares: [isOwnerMiddleware] })
async getEvents(
req: express.Request,
): Promise<EventMessageTypes[] | Record<string, EventMessageTypes[]>> {
if (isWithQueryString(req.query)) {
switch (req.query.query as EventMessageReturnMode) {
case 'sent':
Expand All @@ -85,55 +95,47 @@ eventBusRouter.get(
default:
return eventBus.getEventsAll();
}
} else {
return eventBus.getEventsAll();
}
return eventBus.getEventsAll();
}),
);
}

@Get('/failed')
async getFailedEvents(req: express.Request): Promise<FailedEventSummary[]> {
const amount = parseInt(req.query?.amount as string) ?? 5;
return eventBus.getEventsFailed(amount);
}

eventBusRouter.get(
'/execution/:id',
ResponseHelper.send(async (req: express.Request): Promise<any> => {
@Get('/execution/:id')
async getEventForExecutionId(req: express.Request): Promise<EventMessageTypes[] | undefined> {
if (req.params?.id) {
let logHistory;
if (req.query?.logHistory) {
logHistory = parseInt(req.query.logHistory as string, 10);
}
return eventBus.getEventsByExecutionId(req.params.id, logHistory);
}
}),
);
return;
}

eventBusRouter.get(
'/execution-recover/:id',
ResponseHelper.send(async (req: express.Request): Promise<any> => {
@Get('/execution-recover/:id')
async getRecoveryForExecutionId(req: express.Request): Promise<IRunExecutionData | undefined> {
const { id } = req.params;
if (req.params?.id) {
let logHistory;
let applyToDb = true;
if (req.query?.logHistory) {
logHistory = parseInt(req.query.logHistory as string, 10);
}
if (req.query?.applyToDb) {
applyToDb = !!req.query.applyToDb;
}
const messages = await eventBus.getEventsByExecutionId(req.params.id, logHistory);
const logHistory = parseInt(req.query.logHistory as string, 10) || undefined;
const applyToDb = req.query.applyToDb !== undefined ? !!req.query.applyToDb : true;
const messages = await eventBus.getEventsByExecutionId(id, logHistory);
if (messages.length > 0) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
const recoverResult = await recoverExecutionDataFromEventLogMessages(
req.params.id,
messages,
applyToDb,
);
return recoverResult;
return recoverExecutionDataFromEventLogMessages(id, messages, applyToDb);
}
}
}),
);
return;
}

eventBusRouter.post(
'/event',
ResponseHelper.send(async (req: express.Request): Promise<any> => {
@Post('/event', { middlewares: [isOwnerMiddleware] })
async postEvent(req: express.Request): Promise<EventMessageTypes | undefined> {
let msg: EventMessageTypes | undefined;
if (isEventMessageOptions(req.body)) {
let msg;
switch (req.body.__type) {
case EventMessageTypeNames.workflow:
msg = new EventMessageWorkflow(req.body as EventMessageWorkflowOptions);
Expand All @@ -154,35 +156,30 @@ eventBusRouter.post(
'Body is not a serialized EventMessage or eventName does not match format {namespace}.{domain}.{event}',
);
}
}),
);
return msg;
}

// ----------------------------------------
// Destinations
// ----------------------------------------
// ----------------------------------------
// Destinations
// ----------------------------------------

eventBusRouter.get(
'/destination',
ResponseHelper.send(async (req: express.Request): Promise<any> => {
let result = [];
@Get('/destination')
async getDestination(req: express.Request): Promise<MessageEventBusDestinationOptions[]> {
if (isWithIdString(req.query)) {
result = await eventBus.findDestination(req.query.id);
return eventBus.findDestination(req.query.id);
} else {
result = await eventBus.findDestination();
return eventBus.findDestination();
}
return result;
}),
);
}

eventBusRouter.post(
'/destination',
ResponseHelper.send(async (req: express.Request): Promise<any> => {
@Post('/destination', { middlewares: [isOwnerMiddleware] })
async postDestination(req: express.Request): Promise<any> {
if (!req.user || (req.user as User).globalRole.name !== 'owner') {
throw new ResponseHelper.UnauthorizedError('Invalid request');
}

let result: MessageEventBusDestination | undefined;
if (isMessageEventBusDestinationOptions(req.body)) {
let result;
switch (req.body.__type) {
case MessageEventBusDestinationTypeNames.sentry:
if (isMessageEventBusDestinationSentryOptions(req.body)) {
Expand Down Expand Up @@ -214,51 +211,41 @@ eventBusRouter.post(
if (result) {
await result.saveToDb();
return {
...result,
...result.serialize(),
eventBusInstance: undefined,
};
}
throw new BadRequestError('There was an error adding the destination');
}
throw new BadRequestError('Body is not configuring MessageEventBusDestinationOptions');
}),
);
}

eventBusRouter.get(
'/testmessage',
ResponseHelper.send(async (req: express.Request): Promise<any> => {
let result = false;
@Get('/testmessage')
async sendTestMessage(req: express.Request): Promise<boolean> {
if (isWithIdString(req.query)) {
result = await eventBus.testDestination(req.query.id);
return eventBus.testDestination(req.query.id);
}
return result;
}),
);
return false;
}

eventBusRouter.delete(
'/destination',
ResponseHelper.send(async (req: express.Request): Promise<any> => {
@Delete('/destination', { middlewares: [isOwnerMiddleware] })
async deleteDestination(req: express.Request): Promise<DeleteResult | undefined> {
if (!req.user || (req.user as User).globalRole.name !== 'owner') {
throw new ResponseHelper.UnauthorizedError('Invalid request');
}
if (isWithIdString(req.query)) {
const result = await eventBus.removeDestination(req.query.id);
if (result) {
return result;
}
return eventBus.removeDestination(req.query.id);
} else {
throw new BadRequestError('Query is missing id');
}
}),
);
}

// ----------------------------------------
// Utilities
// ----------------------------------------
// ----------------------------------------
// Utilities
// ----------------------------------------

eventBusRouter.get(
'/eventnames',
ResponseHelper.send(async (): Promise<any> => {
@Get('/eventnames')
async getEventNames(): Promise<string[]> {
return eventNamesAll;
}),
);
}
}
12 changes: 12 additions & 0 deletions packages/cli/src/middlewares/isOwner.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import type { RequestHandler } from 'express';
import { LoggerProxy } from 'n8n-workflow';
import type { AuthenticatedRequest } from '@/requests';

export const isOwnerMiddleware: RequestHandler = (req: AuthenticatedRequest, res, next) => {
if (req.user.globalRole.name === 'owner') {
next();
} else {
LoggerProxy.debug('Request failed because user is not owner');
res.status(401).send('Unauthorized');
}
};
2 changes: 1 addition & 1 deletion packages/cli/test/integration/eventbus.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ beforeAll(async () => {

utils.initConfigFile();
config.set('eventBus.logWriter.logBaseName', 'n8n-test-logwriter');
config.set('eventBus.logWriter.keepLogCount', '1');
config.set('eventBus.logWriter.keepLogCount', 1);
config.set('enterprise.features.logStreaming', true);
config.set('userManagement.disabled', false);
config.set('userManagement.isInstanceOwnerSetUp', true);
Expand Down
Loading

0 comments on commit ad6de22

Please sign in to comment.