Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core): Convert eventBus controller to decorator style and improve permissions #5779

Merged
merged 9 commits into from
Mar 27, 2023
5 changes: 2 additions & 3 deletions packages/cli/src/Server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ import { WaitTracker } from '@/WaitTracker';
import * as WebhookHelpers from '@/WebhookHelpers';
import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData';
import { toHttpNodeParameters } from '@/CurlConverterHelper';
import { eventBusRouter } from '@/eventbus/eventBusRoutes';
import { EventBusController } from '@/eventbus/eventBus.controller';
import { isLogStreamingEnabled } from '@/eventbus/MessageEventBus/MessageEventBusHelper';
import { licenseController } from './license/license.controller';
import { Push, setupPushServer, setupPushHandler } from '@/push';
Expand Down Expand Up @@ -377,6 +377,7 @@ class Server extends AbstractServer {
const samlService = Container.get(SamlService);

const controllers: object[] = [
new EventBusController(),
new AuthController({ config, internalHooks, repositories, logger, postHog }),
new OwnerController({ config, internalHooks, repositories, logger }),
new MeController({ externalHooks, internalHooks, repositories, logger }),
Expand Down Expand Up @@ -1229,8 +1230,6 @@ class Server extends AbstractServer {
if (!eventBus.isInitialized) {
await eventBus.initialize();
}
// add Event Bus REST endpoints
this.app.use(`/${this.restEndpoint}/eventbus`, eventBusRouter);

// ----------------------------------------
// Webhooks
Expand Down
8 changes: 8 additions & 0 deletions packages/cli/src/eventbus/EventMessageClasses/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,11 @@ export type EventMessageTypes =
| EventMessageWorkflow
| EventMessageAudit
| EventMessageNode;

export interface FailedEventSummary {
lastNodeExecuted: string;
executionId: string;
name: string;
event: string;
timestamp: string;
}
48 changes: 47 additions & 1 deletion packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
import { LoggerProxy } from 'n8n-workflow';
import type { MessageEventBusDestinationOptions } from 'n8n-workflow';
import type { DeleteResult } from 'typeorm';
import type { EventMessageTypes } from '../EventMessageClasses/';
import type {
EventMessageTypes,
EventNamesTypes,
FailedEventSummary,
} from '../EventMessageClasses/';
import type { MessageEventBusDestination } from '../MessageEventBusDestination/MessageEventBusDestination.ee';
import { MessageEventBusLogWriter } from '../MessageEventBusWriter/MessageEventBusLogWriter';
import EventEmitter from 'events';
Expand Down Expand Up @@ -249,6 +253,48 @@ export class MessageEventBus extends EventEmitter {
);
}

async getEventsFailed(amount = 5): Promise<FailedEventSummary[]> {
const result: FailedEventSummary[] = [];
try {
const queryResult = await this.logWriter?.getMessagesAll();
const uniques = uniqby(queryResult, 'id');
const filteredExecutionIds = uniques
.filter((e) =>
(['n8n.workflow.crashed', 'n8n.workflow.failed'] as EventNamesTypes[]).includes(
e.eventName,
),
)
.map((e) => ({
executionId: e.payload.executionId as string,
name: e.payload.workflowName,
timestamp: e.ts,
event: e.eventName,
}))
.filter((e) => e)
.sort((a, b) => (a.timestamp > b.timestamp ? 1 : -1))
.slice(-amount);

for (const execution of filteredExecutionIds) {
const data = await recoverExecutionDataFromEventLogMessages(
execution.executionId,
queryResult,
false,
);
if (data) {
const lastNodeExecuted = data.resultData.lastNodeExecuted;
result.push({
lastNodeExecuted: lastNodeExecuted ?? '',
executionId: execution.executionId,
name: execution.name as string,
event: execution.event,
timestamp: execution.timestamp.toISO(),
});
}
}
} catch {}
return result;
}

async getEventsAll(): Promise<EventMessageTypes[]> {
const queryResult = await this.logWriter?.getMessagesAll();
const filtered = uniqby(queryResult, 'id');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,26 @@ import {
MessageEventBusDestinationSyslog,
} from './MessageEventBusDestination/MessageEventBusDestinationSyslog.ee';
import { MessageEventBusDestinationWebhook } from './MessageEventBusDestination/MessageEventBusDestinationWebhook.ee';
import type { EventMessageTypes, FailedEventSummary } from './EventMessageClasses';
import { eventNamesAll } from './EventMessageClasses';
import type { EventMessageAuditOptions } from './EventMessageClasses/EventMessageAudit';
import { EventMessageAudit } from './EventMessageClasses/EventMessageAudit';
import { BadRequestError } from '../ResponseHelper';
import { BadRequestError } from '@/ResponseHelper';
import type {
MessageEventBusDestinationWebhookOptions,
MessageEventBusDestinationOptions,
IRunExecutionData,
} from 'n8n-workflow';
import { MessageEventBusDestinationTypeNames, EventMessageTypeNames } from 'n8n-workflow';
import type { User } from '../databases/entities/User';
import type { User } from '@db/entities/User';
import * as ResponseHelper from '@/ResponseHelper';
import type { EventMessageNodeOptions } from './EventMessageClasses/EventMessageNode';
import { EventMessageNode } from './EventMessageClasses/EventMessageNode';
import { recoverExecutionDataFromEventLogMessages } from './MessageEventBus/recoverEvents';

export const eventBusRouter = express.Router();
import { RestController, Get, Post, Delete } from '@/decorators';
import type { MessageEventBusDestination } from './MessageEventBusDestination/MessageEventBusDestination.ee';
import { isOwnerMiddleware } from '../middlewares/isOwner';
import type { DeleteResult } from 'typeorm';

// ----------------------------------------
// TypeGuards
Expand All @@ -50,7 +54,6 @@ const isWithQueryString = (candidate: unknown): candidate is { query: string } =
return o.query !== undefined;
};

// TODO: add credentials
const isMessageEventBusDestinationWebhookOptions = (
candidate: unknown,
): candidate is MessageEventBusDestinationWebhookOptions => {
Expand All @@ -68,11 +71,18 @@ const isMessageEventBusDestinationOptions = (
};

// ----------------------------------------
// Events
// Controller
// ----------------------------------------
eventBusRouter.get(
'/event',
ResponseHelper.send(async (req: express.Request): Promise<any> => {

@RestController('/eventbus')
export class EventBusController {
// ----------------------------------------
// Events
// ----------------------------------------
@Get('/event', { middlewares: [isOwnerMiddleware] })
async getEvents(
req: express.Request,
): Promise<EventMessageTypes[] | Record<string, EventMessageTypes[]>> {
if (isWithQueryString(req.query)) {
switch (req.query.query as EventMessageReturnMode) {
case 'sent':
Expand All @@ -85,55 +95,47 @@ eventBusRouter.get(
default:
return eventBus.getEventsAll();
}
} else {
return eventBus.getEventsAll();
}
return eventBus.getEventsAll();
}),
);
}

@Get('/failed')
async getFailedEvents(req: express.Request): Promise<FailedEventSummary[]> {
const amount = parseInt(req.query?.amount as string) ?? 5;
return eventBus.getEventsFailed(amount);
}

eventBusRouter.get(
'/execution/:id',
ResponseHelper.send(async (req: express.Request): Promise<any> => {
@Get('/execution/:id')
async getEventForExecutionId(req: express.Request): Promise<EventMessageTypes[] | undefined> {
if (req.params?.id) {
let logHistory;
if (req.query?.logHistory) {
logHistory = parseInt(req.query.logHistory as string, 10);
}
return eventBus.getEventsByExecutionId(req.params.id, logHistory);
}
}),
);
return;
}

eventBusRouter.get(
'/execution-recover/:id',
ResponseHelper.send(async (req: express.Request): Promise<any> => {
@Get('/execution-recover/:id')
async getRecoveryForExecutionId(req: express.Request): Promise<IRunExecutionData | undefined> {
const { id } = req.params;
if (req.params?.id) {
let logHistory;
let applyToDb = true;
if (req.query?.logHistory) {
logHistory = parseInt(req.query.logHistory as string, 10);
}
if (req.query?.applyToDb) {
applyToDb = !!req.query.applyToDb;
}
const messages = await eventBus.getEventsByExecutionId(req.params.id, logHistory);
const logHistory = parseInt(req.query.logHistory as string, 10) || undefined;
const applyToDb = req.query.applyToDb !== undefined ? !!req.query.applyToDb : true;
const messages = await eventBus.getEventsByExecutionId(id, logHistory);
if (messages.length > 0) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
const recoverResult = await recoverExecutionDataFromEventLogMessages(
req.params.id,
messages,
applyToDb,
);
return recoverResult;
return recoverExecutionDataFromEventLogMessages(id, messages, applyToDb);
}
}
}),
);
return;
}

eventBusRouter.post(
'/event',
ResponseHelper.send(async (req: express.Request): Promise<any> => {
@Post('/event', { middlewares: [isOwnerMiddleware] })
async postEvent(req: express.Request): Promise<EventMessageTypes | undefined> {
let msg: EventMessageTypes | undefined;
if (isEventMessageOptions(req.body)) {
let msg;
switch (req.body.__type) {
case EventMessageTypeNames.workflow:
msg = new EventMessageWorkflow(req.body as EventMessageWorkflowOptions);
Expand All @@ -154,35 +156,30 @@ eventBusRouter.post(
'Body is not a serialized EventMessage or eventName does not match format {namespace}.{domain}.{event}',
);
}
}),
);
return msg;
}

// ----------------------------------------
// Destinations
// ----------------------------------------
// ----------------------------------------
// Destinations
// ----------------------------------------

eventBusRouter.get(
'/destination',
ResponseHelper.send(async (req: express.Request): Promise<any> => {
let result = [];
@Get('/destination')
async getDestination(req: express.Request): Promise<MessageEventBusDestinationOptions[]> {
if (isWithIdString(req.query)) {
result = await eventBus.findDestination(req.query.id);
return eventBus.findDestination(req.query.id);
} else {
result = await eventBus.findDestination();
return eventBus.findDestination();
}
return result;
}),
);
}

eventBusRouter.post(
'/destination',
ResponseHelper.send(async (req: express.Request): Promise<any> => {
@Post('/destination', { middlewares: [isOwnerMiddleware] })
async postDestination(req: express.Request): Promise<any> {
if (!req.user || (req.user as User).globalRole.name !== 'owner') {
throw new ResponseHelper.UnauthorizedError('Invalid request');
}

let result: MessageEventBusDestination | undefined;
if (isMessageEventBusDestinationOptions(req.body)) {
let result;
switch (req.body.__type) {
case MessageEventBusDestinationTypeNames.sentry:
if (isMessageEventBusDestinationSentryOptions(req.body)) {
Expand Down Expand Up @@ -214,51 +211,41 @@ eventBusRouter.post(
if (result) {
await result.saveToDb();
return {
...result,
...result.serialize(),
eventBusInstance: undefined,
};
}
throw new BadRequestError('There was an error adding the destination');
}
throw new BadRequestError('Body is not configuring MessageEventBusDestinationOptions');
}),
);
}

eventBusRouter.get(
'/testmessage',
ResponseHelper.send(async (req: express.Request): Promise<any> => {
let result = false;
@Get('/testmessage')
async sendTestMessage(req: express.Request): Promise<boolean> {
if (isWithIdString(req.query)) {
result = await eventBus.testDestination(req.query.id);
return eventBus.testDestination(req.query.id);
}
return result;
}),
);
return false;
}

eventBusRouter.delete(
'/destination',
ResponseHelper.send(async (req: express.Request): Promise<any> => {
@Delete('/destination', { middlewares: [isOwnerMiddleware] })
async deleteDestination(req: express.Request): Promise<DeleteResult | undefined> {
if (!req.user || (req.user as User).globalRole.name !== 'owner') {
throw new ResponseHelper.UnauthorizedError('Invalid request');
}
if (isWithIdString(req.query)) {
const result = await eventBus.removeDestination(req.query.id);
if (result) {
return result;
}
return eventBus.removeDestination(req.query.id);
} else {
throw new BadRequestError('Query is missing id');
}
}),
);
}

// ----------------------------------------
// Utilities
// ----------------------------------------
// ----------------------------------------
// Utilities
// ----------------------------------------

eventBusRouter.get(
'/eventnames',
ResponseHelper.send(async (): Promise<any> => {
@Get('/eventnames')
async getEventNames(): Promise<string[]> {
return eventNamesAll;
}),
);
}
}
12 changes: 12 additions & 0 deletions packages/cli/src/middlewares/isOwner.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import type { RequestHandler } from 'express';
import { LoggerProxy } from 'n8n-workflow';
import type { AuthenticatedRequest } from '@/requests';

export const isOwnerMiddleware: RequestHandler = (req: AuthenticatedRequest, res, next) => {
if (req.user.globalRole.name === 'owner') {
next();
} else {
LoggerProxy.debug('Request failed because user is not owner');
res.status(401).send('Unauthorized');
}
};
2 changes: 1 addition & 1 deletion packages/cli/test/integration/eventbus.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ beforeAll(async () => {

utils.initConfigFile();
config.set('eventBus.logWriter.logBaseName', 'n8n-test-logwriter');
config.set('eventBus.logWriter.keepLogCount', '1');
config.set('eventBus.logWriter.keepLogCount', 1);
config.set('enterprise.features.logStreaming', true);
config.set('userManagement.disabled', false);
config.set('userManagement.isInstanceOwnerSetUp', true);
Expand Down
Loading