Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(Postgres Node): Remove reusable connections (no-changelog) #6259

Merged
Merged
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import type { IExecuteFunctions } from 'n8n-core';
import type { IDataObject, INodeExecutionData, INodeProperties } from 'n8n-workflow';
import { NodeOperationError } from 'n8n-workflow';

import { updateDisplayOptions } from '../../../../../utils/utilities';

Expand Down Expand Up @@ -181,6 +182,13 @@ export async function execute(
valueToMatchOn = this.getNodeParameter('valueToMatchOn', i) as string;
}

if (!item[columnToMatchOn] && dataMode === 'autoMapInputData') {
throw new NodeOperationError(
this.getNode(),
"Column to match on not found in input item. Add a column to match on or set the 'Data Mode' to 'Define Below' to define the value to match on.",
);
}

const tableSchema = await getTableSchema(db, schema, table);

item = checkItemAgainstSchema(this.getNode(), item, tableSchema, i);
Expand All @@ -195,6 +203,13 @@ export async function execute(

const updateColumns = Object.keys(item).filter((column) => column !== columnToMatchOn);

if (!Object.keys(updateColumns).length) {
throw new NodeOperationError(
this.getNode(),
"Add values to update to the input item or set the 'Data Mode' to 'Define Below' to define the values to update.",
);
}

const updates: string[] = [];

for (const column of updateColumns) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import type { IExecuteFunctions } from 'n8n-core';
import type { IDataObject, INodeExecutionData, INodeProperties } from 'n8n-workflow';
import { NodeOperationError } from 'n8n-workflow';

import { updateDisplayOptions } from '../../../../../utils/utilities';

Expand Down Expand Up @@ -179,6 +180,20 @@ export async function execute(
item[columnToMatchOn] = this.getNodeParameter('valueToMatchOn', i) as string;
}

if (!item[columnToMatchOn]) {
throw new NodeOperationError(
this.getNode(),
"Column to match on not found in input item. Add a column to match on or set the 'Data Mode' to 'Define Below' to define the value to match on.",
);
}

if (item[columnToMatchOn] && Object.keys(item).length === 1) {
throw new NodeOperationError(
this.getNode(),
"Add values to update or insert to the input item or set the 'Data Mode' to 'Define Below' to define the values to insert or update.",
);
}

const tableSchema = await getTableSchema(db, schema, table);

item = checkItemAgainstSchema(this.getNode(), item, tableSchema, i);
Expand Down
9 changes: 2 additions & 7 deletions packages/nodes-base/nodes/Postgres/v2/actions/router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@ import { NodeOperationError } from 'n8n-workflow';
import type { PostgresType } from './node.type';

import * as database from './database/Database.resource';
import { Connections } from '../transport';
import { configurePostgres } from '../transport';
import { configureQueryRunner } from '../helpers/utils';
import type { ConnectionsData } from '../helpers/interfaces';

export async function router(this: IExecuteFunctions): Promise<INodeExecutionData[][]> {
let returnData: INodeExecutionData[] = [];
Expand All @@ -19,11 +18,7 @@ export async function router(this: IExecuteFunctions): Promise<INodeExecutionDat
const options = this.getNodeParameter('options', 0, {});
options.nodeVersion = this.getNode().typeVersion;

const { db, pgp, sshClient } = (await Connections.getInstance(
credentials,
options,
true,
)) as ConnectionsData;
const { db, pgp, sshClient } = await configurePostgres(credentials, options);

const runQueries = configureQueryRunner(
this.getNode(),
Expand Down
14 changes: 3 additions & 11 deletions packages/nodes-base/nodes/Postgres/v2/methods/credentialTest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ import type {
INodeCredentialTestResult,
} from 'n8n-workflow';

import { Connections } from '../transport';
import { configurePostgres } from '../transport';

import { Client } from 'ssh2';
import type { ConnectionsData, PgpClient } from '../helpers/interfaces';
import type { PgpClient } from '../helpers/interfaces';

export async function postgresConnectionTest(
this: ICredentialTestFunctions,
Expand All @@ -20,12 +20,7 @@ export async function postgresConnectionTest(
let pgpClientCreated: PgpClient | undefined;

try {
const { db, pgp, sshClient } = (await Connections.getInstance(
credentials,
{},
true,
sshClientCreated,
)) as ConnectionsData;
const { db, pgp, sshClient } = await configurePostgres(credentials, {}, sshClientCreated);

sshClientCreated = sshClient;
pgpClientCreated = pgp;
Expand Down Expand Up @@ -57,9 +52,6 @@ export async function postgresConnectionTest(
if (pgpClientCreated) {
pgpClientCreated.end();
}

//set the connection instance to null so that it can be recreated
await Connections.getInstance({}, {}, false, undefined, true);
}
return {
status: 'OK',
Expand Down
18 changes: 14 additions & 4 deletions packages/nodes-base/nodes/Postgres/v2/methods/listSearch.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import type { ILoadOptionsFunctions, INodeListSearchResult } from 'n8n-workflow';
import type { ConnectionsData } from '../helpers/interfaces';
import { Connections } from '../transport';

import { configurePostgres } from '../transport';

export async function schemaSearch(this: ILoadOptionsFunctions): Promise<INodeListSearchResult> {
const credentials = await this.getCredentials('postgres');
const options = { nodeVersion: this.getNode().typeVersion };

const { db } = (await Connections.getInstance(credentials, options)) as ConnectionsData;
const { db, pgp, sshClient } = await configurePostgres(credentials, options);

try {
const response = await db.any('SELECT schema_name FROM information_schema.schemata');
Expand All @@ -19,13 +19,18 @@ export async function schemaSearch(this: ILoadOptionsFunctions): Promise<INodeLi
};
} catch (error) {
throw error;
} finally {
if (sshClient) {
sshClient.end();
}
pgp.end();
}
}
export async function tableSearch(this: ILoadOptionsFunctions): Promise<INodeListSearchResult> {
const credentials = await this.getCredentials('postgres');
const options = { nodeVersion: this.getNode().typeVersion };

const { db } = (await Connections.getInstance(credentials, options)) as ConnectionsData;
const { db, pgp, sshClient } = await configurePostgres(credentials, options);

const schema = this.getNodeParameter('schema', 0, {
extractValue: true,
Expand All @@ -45,5 +50,10 @@ export async function tableSearch(this: ILoadOptionsFunctions): Promise<INodeLis
};
} catch (error) {
throw error;
} finally {
if (sshClient) {
sshClient.end();
}
pgp.end();
}
}
11 changes: 8 additions & 3 deletions packages/nodes-base/nodes/Postgres/v2/methods/loadOptions.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import type { ILoadOptionsFunctions, INodePropertyOptions } from 'n8n-workflow';
import type { ConnectionsData } from '../helpers/interfaces';

import { getTableSchema } from '../helpers/utils';
import { Connections } from '../transport';
import { configurePostgres } from '../transport';

export async function getColumns(this: ILoadOptionsFunctions): Promise<INodePropertyOptions[]> {
const credentials = await this.getCredentials('postgres');
const options = { nodeVersion: this.getNode().typeVersion };

const { db } = (await Connections.getInstance(credentials, options)) as ConnectionsData;
const { db, pgp, sshClient } = await configurePostgres(credentials, options);

const schema = this.getNodeParameter('schema', 0, {
extractValue: true,
Expand All @@ -27,6 +27,11 @@ export async function getColumns(this: ILoadOptionsFunctions): Promise<INodeProp
}));
} catch (error) {
throw error;
} finally {
if (sshClient) {
sshClient.end();
}
pgp.end();
}
}

Expand Down
43 changes: 7 additions & 36 deletions packages/nodes-base/nodes/Postgres/v2/transport/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import pgPromise from 'pg-promise';
import { rm, writeFile } from 'fs/promises';
import { file } from 'tmp-promise';

import type { PgpClient, PgpDatabase } from '../helpers/interfaces';
import type { PgpDatabase } from '../helpers/interfaces';

async function createSshConnectConfig(credentials: IDataObject) {
if (credentials.sshAuthenticateWith === 'password') {
Expand Down Expand Up @@ -40,12 +40,16 @@ async function createSshConnectConfig(credentials: IDataObject) {
}
}

async function configurePostgres(
export async function configurePostgres(
credentials: IDataObject,
options: IDataObject = {},
createdSshClient?: Client,
) {
const pgp = pgPromise();
const pgp = pgPromise({
// prevent spam in console "WARNING: Creating a duplicate database object for the same connection."
// duplicate connections created when auto loading parameters, they are closed imidiatly after, but several could be open at the same time
noWarnings: true,
});

if (typeof options.nodeVersion == 'number' && options.nodeVersion >= 2.1) {
// Always return dates as ISO strings
Expand Down Expand Up @@ -183,36 +187,3 @@ async function configurePostgres(
return { db, pgp, sshClient };
}
}

export const Connections = (function () {
let instance: { db: PgpDatabase; pgp: PgpClient; sshClient?: Client } | null = null;

return {
async getInstance(
credentials: IDataObject = {},
options: IDataObject = {},
reload = false,
createdSshClient?: Client,
nulify = false,
) {
if (nulify) {
instance = null;
return instance;
}

if (instance !== null && reload) {
if (instance.sshClient) {
instance.sshClient.end();
}
instance.pgp.end();

instance = null;
}

if (instance === null && Object.keys(credentials).length) {
instance = await configurePostgres(credentials, options, createdSshClient);
}
return instance;
},
};
})();