feat: Execution custom data saving and filtering (#5496)
* wip: workflow execution filtering

* fix: import type failing to build

* fix: remove console.logs

* feat: execution metadata migrations

* fix(editor): Move global executions filter to its own component

* fix(editor): Using the same filter component in workflow level

* fix(editor): small housekeeping

* check workflowId when the filter is applied

* fix(editor): update filter after resolving merge conflicts

* fix(editor): unify empty filter status

* feat(editor): add datetime picker to filter

* feat(editor): add meta fields

* fix: fix button override in datepicker panel

* feat(editor): add filter metadata

* feat(core): add 'startedBefore' execution filter prop

* feat(core): add 'tags' execution query filter

* Revert "feat(core): add 'tags' execution query filter"

This reverts commit a7b9680.

* feat(editor): add translations and tooltip, and count selected filter props

* fix(editor): fix label layouts

* fix(editor): update custom data docs link

* fix(editor): update custom data tooltip position

* fix(editor): update tooltip text

* refactor: Ignore metadata if not enabled by license

* fix(editor): Add paywall states to advanced execution filter

* refactor: Save custom data also for worker mode

* fix: Remove duplicate migration name from list

* fix(editor): Reduce filter complexity and add debounce to text inputs

* fix(editor): Remove unused import, add comment

* fix(editor): simplify event listener

* fix: Prevent error when there are running executions

* test(editor): Add advanced execution filter basic unit test

* test(editor): Add advanced execution filter state change unit test

* fix: Small lint issue

* feat: Add indices to speed up queries

* feat: add customData limits

* refactor: put metadata save in transaction

* chore: remove unneeded comment

* test: add tests for execution metadata

* fix(editor): Fixes after merge conflict

* fix(editor): Remove unused import

* wording and UI fixes

* fix(editor): type fixes

* feat: add code node autocompletions for customData

* fix: Prevent transaction issues and ambiguous ID in sql clauses

* fix(editor): Suppress requesting current executions if metadata is used in filter (#5739)

* fix(editor): Suppress requesting current executions if metadata is used in filter

* fix(editor): Fix arrows for select in popover

* refactor: Improve performance by correcting database indices

* fix: Lint issue

* test: Fix broken test

* fix: Broken test

* test: add call data check for saveExecutionMetadata test

---------

Co-authored-by: Valya Bullions <[email protected]>
Co-authored-by: Alex Grozav <[email protected]>
Co-authored-by: Omar Ajoue <[email protected]>
Co-authored-by: Romain Minaud <[email protected]>
5 people authored Mar 23, 2023
1 parent 4c583e2 commit d78a41d
Showing 30 changed files with 1,430 additions and 269 deletions.
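For context, the custom data being saved and filtered here is written from a Code node at run time via the `$execution.customData` helper. A minimal usage sketch (the method names follow the n8n custom data docs linked from the filter UI; the keys and values are made up for illustration, and both are stored as strings subject to the limits added in this PR):

// Inside a Code node — illustration only, not part of this diff
$execution.customData.set('orderId', '1234'); // one key/value pair
$execution.customData.setAll({ region: 'eu', tier: 'pro' }); // several at once
const orderId = $execution.customData.get('orderId'); // read back within the same execution
return $input.all();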
2 changes: 2 additions & 0 deletions packages/cli/src/Db.ts
@@ -169,6 +169,8 @@ export async function init(
collections.InstalledPackages = linkRepository(entities.InstalledPackages);
collections.InstalledNodes = linkRepository(entities.InstalledNodes);
collections.WorkflowStatistics = linkRepository(entities.WorkflowStatistics);
collections.ExecutionMetadata = linkRepository(entities.ExecutionMetadata);

collections.EventDestinations = linkRepository(entities.EventDestinations);

isInitialized = true;
2 changes: 2 additions & 0 deletions packages/cli/src/Interfaces.ts
@@ -48,6 +48,7 @@ import type { WebhookEntity } from '@db/entities/WebhookEntity';
import type { WorkflowEntity } from '@db/entities/WorkflowEntity';
import type { WorkflowStatistics } from '@db/entities/WorkflowStatistics';
import type { EventDestinations } from '@db/entities/MessageEventBusDestinationEntity';
import type { ExecutionMetadata } from './databases/entities/ExecutionMetadata';

export interface IActivationError {
time: number;
@@ -88,6 +89,7 @@ export interface IDatabaseCollections {
InstalledNodes: Repository<InstalledNodes>;
WorkflowStatistics: Repository<WorkflowStatistics>;
EventDestinations: Repository<EventDestinations>;
ExecutionMetadata: Repository<ExecutionMetadata>;
}

// ----------------------------------
33 changes: 33 additions & 0 deletions packages/cli/src/WorkflowExecuteAdditionalData.ts
@@ -71,6 +71,7 @@ import { PermissionChecker } from './UserManagement/PermissionChecker';
import { WorkflowsService } from './workflows/workflows.services';
import { Container } from 'typedi';
import { InternalHooks } from '@/InternalHooks';
import type { ExecutionMetadata } from './databases/entities/ExecutionMetadata';

const ERROR_TRIGGER_TYPE = config.getEnv('nodes.errorTriggerType');

@@ -264,6 +265,22 @@ async function pruneExecutionData(this: WorkflowHooks): Promise<void> {
}
}

export async function saveExecutionMetadata(
executionId: string,
executionMetadata: Record<string, string>,
): Promise<ExecutionMetadata[]> {
const metadataRows = [];
for (const [key, value] of Object.entries(executionMetadata)) {
metadataRows.push({
execution: { id: executionId },
key,
value,
});
}

return Db.collections.ExecutionMetadata.save(metadataRows);
}

/**
* Returns hook functions to push data to Editor-UI
*
@@ -657,6 +674,14 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks {
executionData as IExecutionFlattedDb,
);

try {
if (fullRunData.data.resultData.metadata) {
await saveExecutionMetadata(this.executionId, fullRunData.data.resultData.metadata);
}
} catch (e) {
Logger.error(`Failed to save metadata for execution ID ${this.executionId}`, e);
}

if (fullRunData.finished === true && this.retryOf !== undefined) {
// If the retry was successful, save the reference to it on the original execution
// await Db.collections.Execution.save(executionData as IExecutionFlattedDb);
@@ -789,6 +814,14 @@ function hookFunctionsSaveWorker(): IWorkflowExecuteHooks {
status: executionData.status,
});

try {
if (fullRunData.data.resultData.metadata) {
await saveExecutionMetadata(this.executionId, fullRunData.data.resultData.metadata);
}
} catch (e) {
Logger.error(`Failed to save metadata for execution ID ${this.executionId}`, e);
}

if (fullRunData.finished === true && this.retryOf !== undefined) {
// If the retry was successful, save the reference to it on the original execution
await Db.collections.Execution.update(this.retryOf, {
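The new saveExecutionMetadata helper flattens the metadata object into one execution_metadata row per key, and both the main and worker save hooks wrap the call in try/catch so a metadata failure is logged instead of failing the execution save. For example, hypothetical metadata { orderId: '1234', region: 'eu' } on execution '42' would be persisted as:

// Rows passed to Db.collections.ExecutionMetadata.save (illustrative values)
const rows = [
  { execution: { id: '42' }, key: 'orderId', value: '1234' },
  { execution: { id: '42' }, key: 'region', value: 'eu' },
];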
6 changes: 5 additions & 1 deletion packages/cli/src/databases/entities/ExecutionEntity.ts
@@ -1,9 +1,10 @@
import { ExecutionStatus, WorkflowExecuteMode } from 'n8n-workflow';
import { Column, Entity, Generated, Index, PrimaryColumn } from 'typeorm';
import { Column, Entity, Generated, Index, OneToMany, PrimaryColumn } from 'typeorm';
import { datetimeColumnType, jsonColumnType } from './AbstractEntity';
import { IWorkflowDb } from '@/Interfaces';
import type { IExecutionFlattedDb } from '@/Interfaces';
import { idStringifier } from '../utils/transformers';
import type { ExecutionMetadata } from './ExecutionMetadata';

@Entity()
@Index(['workflowId', 'id'])
@@ -49,4 +50,7 @@ export class ExecutionEntity implements IExecutionFlattedDb {

@Column({ type: datetimeColumnType, nullable: true })
waitTill: Date;

@OneToMany('ExecutionMetadata', 'execution')
metadata: ExecutionMetadata[];
}
22 changes: 22 additions & 0 deletions packages/cli/src/databases/entities/ExecutionMetadata.ts
@@ -0,0 +1,22 @@
import { Column, Entity, ManyToOne, PrimaryGeneratedColumn, RelationId } from 'typeorm';
import { ExecutionEntity } from './ExecutionEntity';

@Entity()
export class ExecutionMetadata {
@PrimaryGeneratedColumn()
id: number;

@ManyToOne('ExecutionEntity', 'metadata', {
onDelete: 'CASCADE',
})
execution: ExecutionEntity;

@RelationId((executionMetadata: ExecutionMetadata) => executionMetadata.execution)
executionId: number;

@Column('text')
key: string;

@Column('text')
value: string;
}
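This entity gives each execution a one-to-many metadata relation, which is what the new advanced execution filter queries against. The filtering endpoint itself is not shown in this commit excerpt; a minimal sketch of such a lookup, assuming TypeORM's query builder over the repositories registered in Db.ts, could look like:

// Sketch only: find executions that carry a given custom data key/value
const filtered = await Db.collections.Execution.createQueryBuilder('execution')
  .innerJoin('execution.metadata', 'md')
  .where('md.key = :key AND md.value = :value', { key: 'orderId', value: '1234' })
  .getMany();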
2 changes: 2 additions & 0 deletions packages/cli/src/databases/entities/index.ts
@@ -15,6 +15,7 @@ import { User } from './User';
import { WebhookEntity } from './WebhookEntity';
import { WorkflowEntity } from './WorkflowEntity';
import { WorkflowStatistics } from './WorkflowStatistics';
import { ExecutionMetadata } from './ExecutionMetadata';

export const entities = {
AuthIdentity,
@@ -33,4 +34,5 @@ export const entities = {
WebhookEntity,
WorkflowEntity,
WorkflowStatistics,
ExecutionMetadata,
};
69 changes: 69 additions & 0 deletions packages/cli/src/databases/migrations/mysqldb/1679416281779-CreateExecutionMetadataTable.ts
@@ -0,0 +1,69 @@
import { MigrationInterface, QueryRunner, Table } from 'typeorm';
import { getTablePrefix, logMigrationEnd, logMigrationStart } from '@db/utils/migrationHelpers';

export class CreateExecutionMetadataTable1679416281779 implements MigrationInterface {
name = 'CreateExecutionMetadataTable1679416281779';

public async up(queryRunner: QueryRunner): Promise<void> {
logMigrationStart(this.name);
const tablePrefix = getTablePrefix();

await queryRunner.query(
`CREATE TABLE ${tablePrefix}execution_metadata (
id int(11) auto_increment NOT NULL PRIMARY KEY,
executionId int(11) NOT NULL,
\`key\` TEXT NOT NULL,
value TEXT NOT NULL,
CONSTRAINT \`${tablePrefix}execution_metadata_FK\` FOREIGN KEY (\`executionId\`) REFERENCES \`${tablePrefix}execution_entity\` (\`id\`) ON DELETE CASCADE,
INDEX \`IDX_${tablePrefix}6d44376da6c1058b5e81ed8a154e1fee106046eb\` (\`executionId\` ASC)
)
ENGINE=InnoDB`,
);

// Remove indices that are no longer needed since the addition of the status column
await queryRunner.query(
`DROP INDEX \`IDX_${tablePrefix}06da892aaf92a48e7d3e400003\` ON \`${tablePrefix}execution_entity\``,
);
await queryRunner.query(
`DROP INDEX \`IDX_${tablePrefix}78d62b89dc1433192b86dce18a\` ON \`${tablePrefix}execution_entity\``,
);
await queryRunner.query(
`DROP INDEX \`IDX_${tablePrefix}1688846335d274033e15c846a4\` ON \`${tablePrefix}execution_entity\``,
);
await queryRunner.query(
`DROP INDEX \`IDX_${tablePrefix}cefb067df2402f6aed0638a6c1\` ON \`${tablePrefix}execution_entity\``,
);

// Add index to the new status column
await queryRunner.query(
`CREATE INDEX \`IDX_${tablePrefix}8b6f3f9ae234f137d707b98f3bf43584\` ON \`${tablePrefix}execution_entity\` (\`status\`, \`workflowId\`)`,
);

logMigrationEnd(this.name);
}

public async down(queryRunner: QueryRunner): Promise<void> {
const tablePrefix = getTablePrefix();

await queryRunner.query(`DROP TABLE "${tablePrefix}execution_metadata"`);
await queryRunner.query(
`CREATE INDEX \`IDX_${tablePrefix}06da892aaf92a48e7d3e400003\` ON \`${tablePrefix}execution_entity\` (\`workflowId\`, \`waitTill\`, \`id\`)`,
);
await queryRunner.query(
`CREATE INDEX \`IDX_${tablePrefix}78d62b89dc1433192b86dce18a\` ON \`${tablePrefix}execution_entity\` (\`workflowId\`, \`finished\`, \`id\`)`,
);
await queryRunner.query(
`CREATE INDEX \`IDX_${tablePrefix}1688846335d274033e15c846a4\` ON \`${tablePrefix}execution_entity\` (\`finished\`, \`id\`)`,
);
await queryRunner.query(
'CREATE INDEX `IDX_' +
tablePrefix +
'cefb067df2402f6aed0638a6c1` ON `' +
tablePrefix +
'execution_entity` (`stoppedAt`)',
);
await queryRunner.query(
`DROP INDEX \`IDX_${tablePrefix}8b6f3f9ae234f137d707b98f3bf43584\` ON \`${tablePrefix}execution_entity\``,
);
}
}
2 changes: 2 additions & 0 deletions packages/cli/src/databases/migrations/mysqldb/index.ts
@@ -34,6 +34,7 @@ import { PurgeInvalidWorkflowConnections1675940580449 } from './1675940580449-Pu
import { AddStatusToExecutions1674138566000 } from './1674138566000-AddStatusToExecutions';
import { MigrateExecutionStatus1676996103000 } from './1676996103000-MigrateExecutionStatus';
import { UpdateRunningExecutionStatus1677236788851 } from './1677236788851-UpdateRunningExecutionStatus';
import { CreateExecutionMetadataTable1679416281779 } from './1679416281779-CreateExecutionMetadataTable';

export const mysqlMigrations = [
InitialMigration1588157391238,
@@ -72,4 +73,5 @@ export const mysqlMigrations = [
AddStatusToExecutions1674138566000,
MigrateExecutionStatus1676996103000,
UpdateRunningExecutionStatus1677236788851,
CreateExecutionMetadataTable1679416281779,
];
62 changes: 62 additions & 0 deletions packages/cli/src/databases/migrations/postgresdb/1679416281778-CreateExecutionMetadataTable.ts
@@ -0,0 +1,62 @@
import { MigrationInterface, QueryRunner, Table } from 'typeorm';
import { getTablePrefix, logMigrationEnd, logMigrationStart } from '@db/utils/migrationHelpers';

export class CreateExecutionMetadataTable1679416281778 implements MigrationInterface {
name = 'CreateExecutionMetadataTable1679416281778';

public async up(queryRunner: QueryRunner): Promise<void> {
logMigrationStart(this.name);
const tablePrefix = getTablePrefix();

await queryRunner.query(
`CREATE TABLE ${tablePrefix}execution_metadata (
"id" serial4 NOT NULL PRIMARY KEY,
"executionId" int4 NOT NULL,
"key" text NOT NULL,
"value" text NOT NULL,
CONSTRAINT ${tablePrefix}execution_metadata_fk FOREIGN KEY ("executionId") REFERENCES ${tablePrefix}execution_entity(id) ON DELETE CASCADE
)`,
);

await queryRunner.query(
`CREATE INDEX "IDX_${tablePrefix}6d44376da6c1058b5e81ed8a154e1fee106046eb" ON "${tablePrefix}execution_metadata" ("executionId");`,
);

// Remove indices that are no longer needed since the addition of the status column
await queryRunner.query(`DROP INDEX IF EXISTS "IDX_${tablePrefix}33228da131bb1112247cf52a42"`);
await queryRunner.query(`DROP INDEX IF EXISTS "IDX_${tablePrefix}72ffaaab9f04c2c1f1ea86e662"`);
await queryRunner.query(`DROP INDEX IF EXISTS "IDX_${tablePrefix}58154df94c686818c99fb754ce"`);
await queryRunner.query(`DROP INDEX IF EXISTS "IDX_${tablePrefix}4f474ac92be81610439aaad61e"`);

// Create new index for status
await queryRunner.query(
`CREATE INDEX "IDX_${tablePrefix}8b6f3f9ae234f137d707b98f3bf43584" ON "${tablePrefix}execution_entity" ("status", "workflowId");`,
);

logMigrationEnd(this.name);
}

public async down(queryRunner: QueryRunner): Promise<void> {
const tablePrefix = getTablePrefix();

// Re-add removed indices
await queryRunner.query(
`CREATE INDEX IF NOT EXISTS "IDX_${tablePrefix}33228da131bb1112247cf52a42" ON ${tablePrefix}execution_entity ("stoppedAt") `,
);
await queryRunner.query(
`CREATE INDEX IF NOT EXISTS "IDX_${tablePrefix}72ffaaab9f04c2c1f1ea86e662" ON ${tablePrefix}execution_entity ("finished", "id") `,
);
await queryRunner.query(
`CREATE INDEX IF NOT EXISTS "IDX_${tablePrefix}58154df94c686818c99fb754ce" ON ${tablePrefix}execution_entity ("workflowId", "waitTill", "id") `,
);
await queryRunner.query(
`CREATE INDEX IF NOT EXISTS "IDX_${tablePrefix}4f474ac92be81610439aaad61e" ON ${tablePrefix}execution_entity ("workflowId", "finished", "id") `,
);

await queryRunner.query(
`DROP INDEX IF EXISTS "IDX_${tablePrefix}8b6f3f9ae234f137d707b98f3bf43584"`,
);

await queryRunner.query(`DROP TABLE "${tablePrefix}execution_metadata"`);
}
}
2 changes: 2 additions & 0 deletions packages/cli/src/databases/migrations/postgresdb/index.ts
@@ -32,6 +32,7 @@ import { PurgeInvalidWorkflowConnections1675940580449 } from './1675940580449-Pu
import { AddStatusToExecutions1674138566000 } from './1674138566000-AddStatusToExecutions';
import { MigrateExecutionStatus1676996103000 } from './1676996103000-MigrateExecutionStatus';
import { UpdateRunningExecutionStatus1677236854063 } from './1677236854063-UpdateRunningExecutionStatus';
import { CreateExecutionMetadataTable1679416281778 } from './1679416281778-CreateExecutionMetadataTable';

export const postgresMigrations = [
InitialMigration1587669153312,
@@ -68,4 +69,5 @@ export const postgresMigrations = [
AddStatusToExecutions1674138566000,
MigrateExecutionStatus1676996103000,
UpdateRunningExecutionStatus1677236854063,
CreateExecutionMetadataTable1679416281778,
];
64 changes: 64 additions & 0 deletions packages/cli/src/databases/migrations/sqlite/1679416281777-CreateExecutionMetadataTable.ts
@@ -0,0 +1,64 @@
import { MigrationInterface, QueryRunner, Table } from 'typeorm';
import { getTablePrefix, logMigrationEnd, logMigrationStart } from '@db/utils/migrationHelpers';

export class CreateExecutionMetadataTable1679416281777 implements MigrationInterface {
name = 'CreateExecutionMetadataTable1679416281777';

public async up(queryRunner: QueryRunner): Promise<void> {
logMigrationStart(this.name);
const tablePrefix = getTablePrefix();

await queryRunner.query(
`CREATE TABLE "${tablePrefix}execution_metadata" (
id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
executionId INTEGER NOT NULL,
"key" TEXT NOT NULL,
value TEXT NOT NULL,
CONSTRAINT ${tablePrefix}execution_metadata_entity_FK FOREIGN KEY (executionId) REFERENCES ${tablePrefix}execution_entity(id) ON DELETE CASCADE
)`,
);

await queryRunner.query(
`CREATE INDEX IF NOT EXISTS "IDX_${tablePrefix}6d44376da6c1058b5e81ed8a154e1fee106046eb" ON "${tablePrefix}execution_metadata" ("executionId");`,
);

// Re-add some indices lost in migration DeleteExecutionsWithWorkflows.ts
// that were originally part of AddExecutionEntityIndexes.ts;
// not all are needed since the `status` column was added to execution_entity
await queryRunner.query(
`CREATE INDEX IF NOT EXISTS 'IDX_${tablePrefix}b94b45ce2c73ce46c54f20b5f9' ON '${tablePrefix}execution_entity' ('waitTill', 'id') `,
);
await queryRunner.query(
`CREATE INDEX IF NOT EXISTS 'IDX_${tablePrefix}81fc04c8a17de15835713505e4' ON '${tablePrefix}execution_entity' ('workflowId', 'id') `,
);

// Also add index to the new status column
await queryRunner.query(
`CREATE INDEX IF NOT EXISTS 'IDX_${tablePrefix}8b6f3f9ae234f137d707b98f3bf43584' ON '${tablePrefix}execution_entity' ('status', 'workflowId') `,
);

// Remove the no-longer-needed index on waitTill since it's already covered by the index b94b45ce2c73ce46c54f20b5f9 above
await queryRunner.query(`DROP INDEX IF EXISTS 'IDX_${tablePrefix}ca4a71b47f28ac6ea88293a8e2'`);
// Remove index for stoppedAt since it's not used anymore
await queryRunner.query(`DROP INDEX IF EXISTS 'IDX_${tablePrefix}cefb067df2402f6aed0638a6c1'`);

logMigrationEnd(this.name);
}

public async down(queryRunner: QueryRunner): Promise<void> {
const tablePrefix = getTablePrefix();

await queryRunner.query(`DROP TABLE "${tablePrefix}execution_metadata"`);
await queryRunner.query(`DROP INDEX IF EXISTS 'IDX_${tablePrefix}b94b45ce2c73ce46c54f20b5f9'`);
await queryRunner.query(`DROP INDEX IF EXISTS 'IDX_${tablePrefix}81fc04c8a17de15835713505e4'`);
await queryRunner.query(
`DROP INDEX IF EXISTS 'IDX_${tablePrefix}8b6f3f9ae234f137d707b98f3bf43584'`,
);
await queryRunner.query(
`CREATE INDEX "IDX_${tablePrefix}ca4a71b47f28ac6ea88293a8e2" ON "${tablePrefix}execution_entity" ("waitTill")`,
);
await queryRunner.query(
`CREATE INDEX "IDX_${tablePrefix}cefb067df2402f6aed0638a6c1" ON "${tablePrefix}execution_entity" ("stoppedAt")`,
);
}
}
2 changes: 2 additions & 0 deletions packages/cli/src/databases/migrations/sqlite/index.ts
@@ -31,6 +31,7 @@ import { PurgeInvalidWorkflowConnections1675940580449 } from './1675940580449-Pu
import { AddStatusToExecutions1674138566000 } from './1674138566000-AddStatusToExecutions';
import { MigrateExecutionStatus1676996103000 } from './1676996103000-MigrateExecutionStatus';
import { UpdateRunningExecutionStatus1677237073720 } from './1677237073720-UpdateRunningExecutionStatus';
import { CreateExecutionMetadataTable1679416281777 } from './1679416281777-CreateExecutionMetadataTable';

const sqliteMigrations = [
InitialMigration1588102412422,
@@ -66,6 +67,7 @@ const sqliteMigrations = [
AddStatusToExecutions1674138566000,
MigrateExecutionStatus1676996103000,
UpdateRunningExecutionStatus1677237073720,
CreateExecutionMetadataTable1679416281777,
];

export { sqliteMigrations };