Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(core): Decouple post workflow execute event from internal hooks (no-changelog) #10280

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 2 additions & 156 deletions packages/cli/src/InternalHooks.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,10 @@
import { Service } from 'typedi';
import { snakeCase } from 'change-case';
import { get as pslGet } from 'psl';
import type {
ExecutionStatus,
INodesGraphResult,
IRun,
ITelemetryTrackProperties,
IWorkflowBase,
} from 'n8n-workflow';
import { TelemetryHelpers } from 'n8n-workflow';

import { N8N_VERSION } from '@/constants';
import type { ITelemetryTrackProperties } from 'n8n-workflow';
import type { AuthProviderType } from '@db/entities/AuthIdentity';
import type { User } from '@db/entities/User';
import { SharedWorkflowRepository } from '@db/repositories/sharedWorkflow.repository';
import { determineFinalExecutionStatus } from '@/executionLifecycleHooks/shared/sharedHookFunctions';
import type { ITelemetryUserDeletionData, IExecutionTrackProperties } from '@/Interfaces';
import type { ITelemetryUserDeletionData } from '@/Interfaces';
import { WorkflowStatisticsService } from '@/services/workflow-statistics.service';
import { NodeTypes } from '@/NodeTypes';
import { Telemetry } from '@/telemetry';
import { MessageEventBus } from './eventbus/MessageEventBus/MessageEventBus';

Expand All @@ -30,8 +17,6 @@ import { MessageEventBus } from './eventbus/MessageEventBus/MessageEventBus';
export class InternalHooks {
constructor(
private readonly telemetry: Telemetry,
private readonly nodeTypes: NodeTypes,
private readonly sharedWorkflowRepository: SharedWorkflowRepository,
workflowStatisticsService: WorkflowStatisticsService,
// Can't use @ts-expect-error because only dev time tsconfig considers this as an error, but not build time
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
Expand Down Expand Up @@ -64,145 +49,6 @@ export class InternalHooks {
this.telemetry.track('User responded to personalization questions', personalizationSurveyData);
}

// eslint-disable-next-line complexity
async onWorkflowPostExecute(
_executionId: string,
workflow: IWorkflowBase,
runData?: IRun,
userId?: string,
) {
if (!workflow.id) {
return;
}

if (runData?.status === 'waiting') {
// No need to send telemetry or logs when the workflow hasn't finished yet.
return;
}

const telemetryProperties: IExecutionTrackProperties = {
workflow_id: workflow.id,
is_manual: false,
version_cli: N8N_VERSION,
success: false,
};

if (userId) {
telemetryProperties.user_id = userId;
}

if (runData?.data.resultData.error?.message?.includes('canceled')) {
runData.status = 'canceled';
}

telemetryProperties.success = !!runData?.finished;

// const executionStatus: ExecutionStatus = runData?.status ?? 'unknown';
const executionStatus: ExecutionStatus = runData
? determineFinalExecutionStatus(runData)
: 'unknown';

if (runData !== undefined) {
telemetryProperties.execution_mode = runData.mode;
telemetryProperties.is_manual = runData.mode === 'manual';

let nodeGraphResult: INodesGraphResult | null = null;

if (!telemetryProperties.success && runData?.data.resultData.error) {
telemetryProperties.error_message = runData?.data.resultData.error.message;
let errorNodeName =
'node' in runData?.data.resultData.error
? runData?.data.resultData.error.node?.name
: undefined;
telemetryProperties.error_node_type =
'node' in runData?.data.resultData.error
? runData?.data.resultData.error.node?.type
: undefined;

if (runData.data.resultData.lastNodeExecuted) {
const lastNode = TelemetryHelpers.getNodeTypeForName(
workflow,
runData.data.resultData.lastNodeExecuted,
);

if (lastNode !== undefined) {
telemetryProperties.error_node_type = lastNode.type;
errorNodeName = lastNode.name;
}
}

if (telemetryProperties.is_manual) {
nodeGraphResult = TelemetryHelpers.generateNodesGraph(workflow, this.nodeTypes);
telemetryProperties.node_graph = nodeGraphResult.nodeGraph;
telemetryProperties.node_graph_string = JSON.stringify(nodeGraphResult.nodeGraph);

if (errorNodeName) {
telemetryProperties.error_node_id = nodeGraphResult.nameIndices[errorNodeName];
}
}
}

if (telemetryProperties.is_manual) {
if (!nodeGraphResult) {
nodeGraphResult = TelemetryHelpers.generateNodesGraph(workflow, this.nodeTypes);
}

let userRole: 'owner' | 'sharee' | undefined = undefined;
if (userId) {
const role = await this.sharedWorkflowRepository.findSharingRole(userId, workflow.id);
if (role) {
userRole = role === 'workflow:owner' ? 'owner' : 'sharee';
}
}

const manualExecEventProperties: ITelemetryTrackProperties = {
user_id: userId,
workflow_id: workflow.id,
status: executionStatus,
executionStatus: runData?.status ?? 'unknown',
error_message: telemetryProperties.error_message as string,
error_node_type: telemetryProperties.error_node_type,
node_graph_string: telemetryProperties.node_graph_string as string,
error_node_id: telemetryProperties.error_node_id as string,
webhook_domain: null,
sharing_role: userRole,
};

if (!manualExecEventProperties.node_graph_string) {
nodeGraphResult = TelemetryHelpers.generateNodesGraph(workflow, this.nodeTypes);
manualExecEventProperties.node_graph_string = JSON.stringify(nodeGraphResult.nodeGraph);
}

if (runData.data.startData?.destinationNode) {
const telemetryPayload = {
...manualExecEventProperties,
node_type: TelemetryHelpers.getNodeTypeForName(
workflow,
runData.data.startData?.destinationNode,
)?.type,
node_id: nodeGraphResult.nameIndices[runData.data.startData?.destinationNode],
};

this.telemetry.track('Manual node exec finished', telemetryPayload);
} else {
nodeGraphResult.webhookNodeNames.forEach((name: string) => {
const execJson = runData.data.resultData.runData[name]?.[0]?.data?.main?.[0]?.[0]
?.json as { headers?: { origin?: string } };
if (execJson?.headers?.origin && execJson.headers.origin !== '') {
manualExecEventProperties.webhook_domain = pslGet(
execJson.headers.origin.replace(/^https?:\/\//, ''),
);
}
});

this.telemetry.track('Manual workflow exec finished', manualExecEventProperties);
}
}
}

this.telemetry.trackWorkflowExecution(telemetryProperties);
}

onWorkflowSharingUpdate(workflowId: string, userId: string, userList: string[]) {
const properties: ITelemetryTrackProperties = {
workflow_id: workflowId,
Expand Down
12 changes: 2 additions & 10 deletions packages/cli/src/WorkflowExecuteAdditionalData.ts
Original file line number Diff line number Diff line change
Expand Up @@ -644,13 +644,9 @@ function hookFunctionsSaveWorker(): IWorkflowExecuteHooks {
async function (this: WorkflowHooks, runData: IRun): Promise<void> {
const { executionId, workflowData: workflow } = this;

void internalHooks.onWorkflowPostExecute(executionId, workflow, runData);
eventService.emit('workflow-post-execute', {
workflowId: workflow.id,
workflowName: workflow.name,
workflow,
executionId,
success: runData.status === 'success',
isManual: runData.mode === 'manual',
runData,
});
},
Expand Down Expand Up @@ -933,13 +929,9 @@ async function executeWorkflow(

await externalHooks.run('workflow.postExecute', [data, workflowData, executionId]);

void internalHooks.onWorkflowPostExecute(executionId, workflowData, data, additionalData.userId);
eventService.emit('workflow-post-execute', {
workflowId: workflowData.id,
workflowName: workflowData.name,
workflow: workflowData,
executionId,
success: data.status === 'success',
isManual: data.mode === 'manual',
userId: additionalData.userId,
runData: data,
});
Expand Down
12 changes: 1 addition & 11 deletions packages/cli/src/WorkflowRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import * as WorkflowHelpers from '@/WorkflowHelpers';
import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData';
import { generateFailedExecutionFromError } from '@/WorkflowHelpers';
import { PermissionChecker } from '@/UserManagement/PermissionChecker';
import { InternalHooks } from '@/InternalHooks';
import { Logger } from '@/Logger';
import { WorkflowStaticDataService } from '@/workflows/workflowStaticData.service';
import { EventService } from './eventbus/event.service';
Expand Down Expand Up @@ -160,18 +159,9 @@ export class WorkflowRunner {
const postExecutePromise = this.activeExecutions.getPostExecutePromise(executionId);
postExecutePromise
.then(async (executionData) => {
void Container.get(InternalHooks).onWorkflowPostExecute(
executionId,
data.workflowData,
executionData,
data.userId,
);
this.eventService.emit('workflow-post-execute', {
workflowId: data.workflowData.id,
workflowName: data.workflowData.name,
workflow: data.workflowData,
executionId,
success: executionData?.status === 'success',
isManual: data.executionMode === 'manual',
userId: data.userId,
runData: executionData,
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,27 +141,31 @@ describe('AuditEventRelay', () => {
it('should log on `workflow-post-execute` for successful execution', () => {
const payload = mock<Event['workflow-post-execute']>({
executionId: 'some-id',
success: true,
userId: 'some-id',
workflowId: 'some-id',
isManual: true,
workflowName: 'some-name',
metadata: {},
runData: mock<IRun>({ data: { resultData: {} } }),
workflow: mock<IWorkflowBase>({ id: 'some-id', name: 'some-name' }),
runData: mock<IRun>({ status: 'success', mode: 'manual', data: { resultData: {} } }),
});

eventService.emit('workflow-post-execute', payload);

const { runData: _, ...rest } = payload;
const { runData: _, workflow: __, ...rest } = payload;

expect(eventBus.sendWorkflowEvent).toHaveBeenCalledWith({
eventName: 'n8n.workflow.success',
payload: rest,
payload: {
...rest,
success: true,
isManual: true,
workflowName: 'some-name',
workflowId: 'some-id',
},
});
});

it('should handle `workflow-post-execute` event for unsuccessful execution', () => {
it('should log on `workflow-post-execute` event for unsuccessful execution', () => {
const runData = mock<IRun>({
status: 'error',
mode: 'manual',
data: {
resultData: {
lastNodeExecuted: 'some-node',
Expand All @@ -177,23 +181,23 @@ describe('AuditEventRelay', () => {

const event = {
executionId: 'some-id',
success: false,
userId: 'some-id',
workflowId: 'some-id',
isManual: true,
workflowName: 'some-name',
metadata: {},
workflow: mock<IWorkflowBase>({ id: 'some-id', name: 'some-name' }),
runData,
};

eventService.emit('workflow-post-execute', event);

const { runData: _, ...rest } = event;
const { runData: _, workflow: __, ...rest } = event;

expect(eventBus.sendWorkflowEvent).toHaveBeenCalledWith({
eventName: 'n8n.workflow.failed',
payload: {
...rest,
success: false,
isManual: true,
workflowName: 'some-name',
workflowId: 'some-id',
lastNodeExecuted: 'some-node',
errorNodeType: 'some-type',
errorMessage: 'some-message',
Expand Down
16 changes: 12 additions & 4 deletions packages/cli/src/eventbus/audit-event-relay.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,20 @@ export class AuditEventRelay {
}

private workflowPostExecute(event: Event['workflow-post-execute']) {
const { runData, ...rest } = event;
const { runData, workflow, ...rest } = event;

if (event.success) {
const payload = {
...rest,
success: runData?.status === 'success',
isManual: runData?.mode === 'manual',
workflowId: workflow.id,
workflowName: workflow.name,
};

if (payload.success) {
void this.eventBus.sendWorkflowEvent({
eventName: 'n8n.workflow.success',
payload: rest,
payload,
});

return;
Expand All @@ -136,7 +144,7 @@ export class AuditEventRelay {
void this.eventBus.sendWorkflowEvent({
eventName: 'n8n.workflow.failed',
payload: {
...rest,
...payload,
lastNodeExecuted: runData?.data.resultData.lastNodeExecuted,
errorNodeType:
runData?.data.resultData.error && 'node' in runData?.data.resultData.error
Expand Down
6 changes: 1 addition & 5 deletions packages/cli/src/eventbus/event.types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,8 @@ export type Event = {

'workflow-post-execute': {
executionId: string;
success: boolean;
userId?: string;
workflowId: string;
isManual: boolean;
workflowName: string;
metadata?: Record<string, string>;
workflow: IWorkflowBase;
runData?: IRun;
};

Expand Down
16 changes: 1 addition & 15 deletions packages/cli/src/executions/execution-recovery.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import { Push } from '@/push';
import { jsonStringify, sleep } from 'n8n-workflow';
import { ExecutionRepository } from '@db/repositories/execution.repository';
import { getWorkflowHooksMain } from '@/WorkflowExecuteAdditionalData'; // @TODO: Dependency cycle
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this still a Dependency cycle?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately yes, but pretty small:

  • ExecutionRecoveryService → Dependency cycle via @/eventbus/MessageEventBus/MessageEventBus:73
  • MessageEventBus → Dependency cycle via @/WorkflowExecuteAdditionalData:5

import { InternalHooks } from '@/InternalHooks'; // @TODO: Dependency cycle if injected
import type { DateTime } from 'luxon';
import type { IRun, ITaskData } from 'n8n-workflow';
import type { EventMessageTypes } from '../eventbus/EventMessageClasses';
Expand Down Expand Up @@ -280,22 +279,9 @@ export class ExecutionRecoveryService {
private async runHooks(execution: IExecutionResponse) {
execution.data ??= { resultData: { runData: {} } };

await Container.get(InternalHooks).onWorkflowPostExecute(execution.id, execution.workflowData, {
data: execution.data,
finished: false,
mode: execution.mode,
waitTill: execution.waitTill,
startedAt: execution.startedAt,
stoppedAt: execution.stoppedAt,
status: execution.status,
});

this.eventService.emit('workflow-post-execute', {
workflowId: execution.workflowData.id,
workflowName: execution.workflowData.name,
workflow: execution.workflowData,
executionId: execution.id,
success: execution.status === 'success',
isManual: execution.mode === 'manual',
runData: execution,
});

Expand Down
Loading
Loading