-
Notifications
You must be signed in to change notification settings - Fork 7.8k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: New trigger PostgreSQL (#5495)
* Boilerplate for PostgresTrigger * Create trigger function as a query * Add additional fields to customize trigger query * Add customizable channel name && operation name * Add concat () for function name * Add hints and placeholders * Add resource Locator to trigger postgres * Add the ability for knowing trigger event * Throw error for same function name * Remove console.logs * Remove schema from Chanel notifcation mode * Add UUID and save trigger in workflow static data drop function * Fix bug where wrongfully casted result in pgl * Correctly drops the resources when manually executing the trigger * Remove manual execution with special interaction * Remove console.logs * ♻️ Move related trigger functions to new file * fix target using 'schema."tableName"' in quotes To support targets with Uppercase table names * Remove static Data and use node id for uuid * Update deleting of the trigger and function * Fix regex expression for channel name * Change to drop cascade the trigger function * Replace functions on restart if no name has been defined * Parse payload result * Improve handling with hyphens in names * Remove duplicate code and clean up * Add payload on delete * Fix rlc * fixing uppercase tableName * fix multiple triggers/connections issues * fixing rlc pgp.end() issues * unify pgp init db method * drop trigger only in createTrigger mode --------- Co-authored-by: Marcus <[email protected]>
- Loading branch information
Showing
6 changed files
with
413 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
export interface IPostgresTrigger { | ||
triggerName: string; | ||
functionName: string; | ||
channelName: string; | ||
target: string; | ||
} |
137 changes: 137 additions & 0 deletions
137
packages/nodes-base/nodes/Postgres/PostgresTrigger.functions.ts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,137 @@ | ||
import type { | ||
ITriggerFunctions, | ||
IDataObject, | ||
ILoadOptionsFunctions, | ||
INodeListSearchResult, | ||
INodeListSearchItems, | ||
} from 'n8n-workflow'; | ||
import pgPromise from 'pg-promise'; | ||
import type pg from 'pg-promise/typescript/pg-subset'; | ||
|
||
export async function pgTriggerFunction( | ||
this: ITriggerFunctions, | ||
db: pgPromise.IDatabase<{}, pg.IClient>, | ||
): Promise<void> { | ||
const schema = this.getNodeParameter('schema', 'public', { extractValue: true }) as string; | ||
const tableName = this.getNodeParameter('tableName', undefined, { | ||
extractValue: true, | ||
}) as string; | ||
const target = `${schema}."${tableName}"`; | ||
const firesOn = this.getNodeParameter('firesOn', 0) as string; | ||
const functionReplace = | ||
"CREATE OR REPLACE FUNCTION $1:raw RETURNS trigger LANGUAGE 'plpgsql' COST 100 VOLATILE NOT LEAKPROOF AS $BODY$ begin perform pg_notify('$2:raw', row_to_json($3:raw)::text); return null; end; $BODY$;"; | ||
const dropIfExist = 'DROP TRIGGER IF EXISTS $1:raw ON $2:raw'; | ||
const functionExists = | ||
"CREATE FUNCTION $1:raw RETURNS trigger LANGUAGE 'plpgsql' COST 100 VOLATILE NOT LEAKPROOF AS $BODY$ begin perform pg_notify('$2:raw', row_to_json($3:raw)::text); return null; end; $BODY$"; | ||
const trigger = | ||
'CREATE TRIGGER $4:raw AFTER $3:raw ON $1:raw FOR EACH ROW EXECUTE FUNCTION $2:raw'; | ||
const whichData = firesOn === 'DELETE' ? 'old' : 'new'; | ||
const additionalFields = this.getNodeParameter('additionalFields', 0) as IDataObject; | ||
const nodeId = this.getNode().id.replace(/-/g, '_'); | ||
let functionName = | ||
(additionalFields.functionName as string) || `n8n_trigger_function_${nodeId}()`; | ||
if (!functionName.includes('()')) { | ||
functionName = functionName.concat('()'); | ||
} | ||
const triggerName = (additionalFields.triggerName as string) || `n8n_trigger_${nodeId}`; | ||
const channelName = (additionalFields.channelName as string) || `n8n_channel_${nodeId}`; | ||
if (channelName.includes('-')) { | ||
throw new Error('Channel name cannot contain hyphens (-)'); | ||
} | ||
const replaceIfExists = additionalFields.replaceIfExists || false; | ||
try { | ||
if (replaceIfExists || !(additionalFields.triggerName || additionalFields.functionName)) { | ||
await db.any(functionReplace, [functionName, channelName, whichData]); | ||
await db.any(dropIfExist, [triggerName, target, whichData]); | ||
} else { | ||
await db.any(functionExists, [functionName, channelName, whichData]); | ||
} | ||
await db.any(trigger, [target, functionName, firesOn, triggerName]); | ||
} catch (err) { | ||
if (err.message.includes('near "-"')) { | ||
throw new Error('Names cannot contain hyphens (-)'); | ||
} | ||
throw new Error(err as string); | ||
} | ||
} | ||
|
||
export async function initDB(this: ITriggerFunctions | ILoadOptionsFunctions) { | ||
const credentials = await this.getCredentials('postgres'); | ||
const pgp = pgPromise({ | ||
// prevent spam in console "WARNING: Creating a duplicate database object for the same connection." | ||
noWarnings: true, | ||
}); | ||
const config: IDataObject = { | ||
host: credentials.host as string, | ||
port: credentials.port as number, | ||
database: credentials.database as string, | ||
user: credentials.user as string, | ||
password: credentials.password as string, | ||
}; | ||
|
||
if (credentials.allowUnauthorizedCerts === true) { | ||
config.ssl = { | ||
rejectUnauthorized: false, | ||
}; | ||
} else { | ||
config.ssl = !['disable', undefined].includes(credentials.ssl as string | undefined); | ||
config.sslmode = (credentials.ssl as string) || 'disable'; | ||
} | ||
return pgp(config); | ||
} | ||
|
||
export async function searchSchema(this: ILoadOptionsFunctions): Promise<INodeListSearchResult> { | ||
const db = await initDB.call(this); | ||
const schemaList = await db.any('SELECT schema_name FROM information_schema.schemata'); | ||
const results: INodeListSearchItems[] = schemaList.map((s) => ({ | ||
name: s.schema_name as string, | ||
value: s.schema_name as string, | ||
})); | ||
await db.$pool.end(); | ||
return { results }; | ||
} | ||
|
||
export async function searchTables(this: ILoadOptionsFunctions): Promise<INodeListSearchResult> { | ||
const schema = this.getNodeParameter('schema', 0) as IDataObject; | ||
const db = await initDB.call(this); | ||
let tableList = []; | ||
try { | ||
tableList = await db.any( | ||
'SELECT table_name FROM information_schema.tables WHERE table_schema = $1', | ||
[schema.value], | ||
); | ||
} catch (error) { | ||
throw new Error(error as string); | ||
} | ||
const results: INodeListSearchItems[] = tableList.map((s) => ({ | ||
name: s.table_name as string, | ||
value: s.table_name as string, | ||
})); | ||
await db.$pool.end(); | ||
return { results }; | ||
} | ||
|
||
export async function dropTriggerFunction( | ||
this: ITriggerFunctions, | ||
db: pgPromise.IDatabase<{}, pg.IClient>, | ||
): Promise<void> { | ||
const schema = this.getNodeParameter('schema', undefined, { extractValue: true }) as string; | ||
const tableName = this.getNodeParameter('tableName', undefined, { | ||
extractValue: true, | ||
}) as string; | ||
const target = `${schema}."${tableName}"`; | ||
const additionalFields = this.getNodeParameter('additionalFields', 0) as IDataObject; | ||
const nodeId = this.getNode().id.replace(/-/g, '_'); | ||
let functionName = | ||
(additionalFields.functionName as string) || `n8n_trigger_function_${nodeId}()`; | ||
if (!functionName.includes('()')) { | ||
functionName = functionName.concat('()'); | ||
} | ||
const triggerName = (additionalFields.triggerName as string) || `n8n_trigger_${nodeId}`; | ||
try { | ||
await db.any('DROP FUNCTION IF EXISTS $1:raw CASCADE', [functionName]); | ||
await db.any('DROP TRIGGER IF EXISTS $1:raw ON $2:raw CASCADE', [triggerName, target]); | ||
} catch (error) { | ||
throw new Error(error as string); | ||
} | ||
} |
18 changes: 18 additions & 0 deletions
18
packages/nodes-base/nodes/Postgres/PostgresTrigger.node.json
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
{ | ||
"node": "n8n-nodes-base.postgresTrigger", | ||
"nodeVersion": "1.0", | ||
"codexVersion": "1.0", | ||
"categories": ["Development"], | ||
"resources": { | ||
"credentialDocumentation": [ | ||
{ | ||
"url": "https://docs.n8n.io/credentials/postgres" | ||
} | ||
], | ||
"primaryDocumentation": [ | ||
{ | ||
"url": "https://docs.n8n.io/integrations/builtin/app-nodes/n8n-nodes-base.postgres/" | ||
} | ||
] | ||
} | ||
} |
Oops, something went wrong.