diff --git a/packages/nodes-base/nodes/Microsoft/Sql/GenericFunctions.ts b/packages/nodes-base/nodes/Microsoft/Sql/GenericFunctions.ts index 1b8e5322546b6..6ed51a31909e9 100644 --- a/packages/nodes-base/nodes/Microsoft/Sql/GenericFunctions.ts +++ b/packages/nodes-base/nodes/Microsoft/Sql/GenericFunctions.ts @@ -1,7 +1,8 @@ -/* eslint-disable @typescript-eslint/ban-types */ import type { IDataObject, INodeExecutionData } from 'n8n-workflow'; import { deepCopy } from 'n8n-workflow'; -import type { ITables } from './TableInterface'; +import type { ITables, OperationInputData } from './interfaces'; +import { chunk, flatten } from '@utils/utilities'; +import mssql from 'mssql'; /** * Returns a copy of the item which only contains the json data and @@ -30,6 +31,7 @@ export function copyInputItem(item: INodeExecutionData, properties: string[]): I * @param {function} getNodeParam getter for the Node's Parameters */ export function createTableStruct( + // eslint-disable-next-line @typescript-eslint/ban-types getNodeParam: Function, items: INodeExecutionData[], additionalProperties: string[] = [], @@ -61,10 +63,9 @@ export function createTableStruct( * @param {ITables} tables The ITables to be processed. * @param {function} buildQueryQueue function that builds the queue of promises */ - export async function executeQueryQueue( tables: ITables, - buildQueryQueue: Function, + buildQueryQueue: (data: OperationInputData) => Array>, ): Promise { return Promise.all( Object.keys(tables).map(async (table) => { @@ -82,68 +83,120 @@ export async function executeQueryQueue( ); } -/** - * Extracts the values from the item for INSERT - * - * @param {IDataObject} item The item to extract - */ -export function extractValues(item: IDataObject): string { - return `(${Object.values(item) - .map((val) => { - //the column cannot be found in the input - //so, set it to null in the sql query - if (val === null) { - return 'NULL'; - } else if (typeof val === 'string') { - return `'${val.replace(/'/g, "''")}'`; - } else if (typeof val === 'boolean') { - return +!!val; - } - return val; - }) // maybe other types such as dates have to be handled as well - .join(',')})`; +export function formatColumns(columns: string) { + return columns + .split(',') + .map((column) => `[${column.trim()}]`) + .join(', '); } -/** - * Extracts the SET from the item for UPDATE - * - * @param {IDataObject} item The item to extract from - * @param {string[]} columns The columns to update - */ -export function extractUpdateSet(item: IDataObject, columns: string[]): string { - return columns - .map( - (column) => - `"${column}" = ${typeof item[column] === 'string' ? `'${item[column]}'` : item[column]}`, - ) - .join(','); +export function configurePool(credentials: IDataObject) { + const config = { + server: credentials.server as string, + port: credentials.port as number, + database: credentials.database as string, + user: credentials.user as string, + password: credentials.password as string, + domain: credentials.domain ? (credentials.domain as string) : undefined, + connectionTimeout: credentials.connectTimeout as number, + requestTimeout: credentials.requestTimeout as number, + options: { + encrypt: credentials.tls as boolean, + enableArithAbort: false, + tdsVersion: credentials.tdsVersion as string, + trustServerCertificate: credentials.allowUnauthorizedCerts as boolean, + }, + }; + + return new mssql.ConnectionPool(config); } -/** - * Extracts the WHERE condition from the item for UPDATE - * - * @param {IDataObject} item The item to extract from - * @param {string} key The column name to build the condition with - */ -export function extractUpdateCondition(item: IDataObject, key: string): string { - return `${key} = ${typeof item[key] === 'string' ? `'${item[key]}'` : item[key]}`; +export async function insertOperation(tables: ITables, pool: mssql.ConnectionPool) { + return executeQueryQueue( + tables, + ({ table, columnString, items }: OperationInputData): Array> => { + return chunk(items, 1000).map(async (insertValues) => { + const request = pool.request(); + + const valuesPlaceholder = []; + + for (const [rIndex, entry] of insertValues.entries()) { + const row = Object.values(entry); + valuesPlaceholder.push(`(${row.map((_, vIndex) => `@r${rIndex}v${vIndex}`).join(', ')})`); + for (const [vIndex, value] of row.entries()) { + request.input(`r${rIndex}v${vIndex}`, value); + } + } + + const query = `INSERT INTO [${table}] (${formatColumns( + columnString, + )}) VALUES ${valuesPlaceholder.join(', ')};`; + + return request.query(query); + }); + }, + ); } -/** - * Extracts the WHERE condition from the items for DELETE - * - * @param {IDataObject[]} items The items to extract the values from - * @param {string} key The column name to extract the value from for the delete condition - */ -export function extractDeleteValues(items: IDataObject[], key: string): string { - return `(${items - .map((item) => (typeof item[key] === 'string' ? `'${item[key]}'` : item[key])) - .join(',')})`; +export async function updateOperation(tables: ITables, pool: mssql.ConnectionPool) { + return executeQueryQueue( + tables, + ({ table, columnString, items }: OperationInputData): Array> => { + return items.map(async (item) => { + const request = pool.request(); + const columns = columnString.split(',').map((column) => column.trim()); + + const setValues: string[] = []; + const condition = `${item.updateKey} = @condition`; + request.input('condition', item[item.updateKey as string]); + + for (const [index, col] of columns.entries()) { + setValues.push(`[${col}] = @v${index}`); + request.input(`v${index}`, item[col]); + } + + const query = `UPDATE [${table}] SET ${setValues.join(', ')} WHERE ${condition};`; + + return request.query(query); + }); + }, + ); } -export function formatColumns(columns: string) { - return columns - .split(',') - .map((column) => `"${column.trim()}"`) - .join(','); +export async function deleteOperation(tables: ITables, pool: mssql.ConnectionPool) { + const queriesResults = await Promise.all( + Object.keys(tables).map(async (table) => { + const deleteKeyResults = Object.keys(tables[table]).map(async (deleteKey) => { + const deleteItemsList = chunk( + tables[table][deleteKey].map((item) => + copyInputItem(item as INodeExecutionData, [deleteKey]), + ), + 1000, + ); + const queryQueue = deleteItemsList.map(async (deleteValues) => { + const request = pool.request(); + const valuesPlaceholder: string[] = []; + + for (const [index, entry] of deleteValues.entries()) { + valuesPlaceholder.push(`@v${index}`); + request.input(`v${index}`, entry[deleteKey]); + } + + const query = `DELETE FROM [${table}] WHERE [${deleteKey}] IN (${valuesPlaceholder.join( + ', ', + )});`; + + return request.query(query); + }); + return Promise.all(queryQueue); + }); + return Promise.all(deleteKeyResults); + }), + ); + + return flatten(queriesResults).reduce( + (acc: number, resp: mssql.IResult): number => + (acc += resp.rowsAffected.reduce((sum, val) => (sum += val))), + 0, + ); } diff --git a/packages/nodes-base/nodes/Microsoft/Sql/MicrosoftSql.node.ts b/packages/nodes-base/nodes/Microsoft/Sql/MicrosoftSql.node.ts index 7bbf15519a805..4105a7ea2d43a 100644 --- a/packages/nodes-base/nodes/Microsoft/Sql/MicrosoftSql.node.ts +++ b/packages/nodes-base/nodes/Microsoft/Sql/MicrosoftSql.node.ts @@ -11,21 +11,17 @@ import type { } from 'n8n-workflow'; import { NodeOperationError } from 'n8n-workflow'; -import mssql from 'mssql'; - -import type { ITables } from './TableInterface'; +import type { ITables } from './interfaces'; import { - copyInputItem, + configurePool, createTableStruct, - executeQueryQueue, - extractDeleteValues, - extractUpdateCondition, - extractUpdateSet, - extractValues, - formatColumns, + deleteOperation, + insertOperation, + updateOperation, } from './GenericFunctions'; -import { chunk, flatten, generatePairedItemData, getResolvables } from '@utils/utilities'; + +import { flatten, generatePairedItemData, getResolvables } from '@utils/utilities'; export class MicrosoftSql implements INodeType { description: INodeTypeDescription = { @@ -127,13 +123,13 @@ export class MicrosoftSql implements INodeType { displayName: 'Columns', name: 'columns', type: 'string', + requiresDataPath: 'multiple', displayOptions: { show: { operation: ['insert'], }, }, default: '', - placeholder: 'id,name,description', description: 'Comma-separated list of the properties which should used as columns for the new rows', @@ -159,6 +155,7 @@ export class MicrosoftSql implements INodeType { displayName: 'Update Key', name: 'updateKey', type: 'string', + requiresDataPath: 'single', displayOptions: { show: { operation: ['update'], @@ -174,6 +171,7 @@ export class MicrosoftSql implements INodeType { displayName: 'Columns', name: 'columns', type: 'string', + requiresDataPath: 'multiple', displayOptions: { show: { operation: ['update'], @@ -205,6 +203,7 @@ export class MicrosoftSql implements INodeType { displayName: 'Delete Key', name: 'deleteKey', type: 'string', + requiresDataPath: 'single', displayOptions: { show: { operation: ['delete'], @@ -227,23 +226,7 @@ export class MicrosoftSql implements INodeType { ): Promise { const credentials = credential.data as ICredentialDataDecryptedObject; try { - const config = { - server: credentials.server as string, - port: credentials.port as number, - database: credentials.database as string, - user: credentials.user as string, - password: credentials.password as string, - domain: credentials.domain ? (credentials.domain as string) : undefined, - connectionTimeout: credentials.connectTimeout as number, - requestTimeout: credentials.requestTimeout as number, - options: { - encrypt: credentials.tls as boolean, - enableArithAbort: false, - tdsVersion: credentials.tdsVersion as string, - trustServerCertificate: credentials.allowUnauthorizedCerts as boolean, - }, - }; - const pool = new mssql.ConnectionPool(config); + const pool = configurePool(credentials); await pool.connect(); } catch (error) { return { @@ -262,24 +245,7 @@ export class MicrosoftSql implements INodeType { async execute(this: IExecuteFunctions): Promise { const credentials = await this.getCredentials('microsoftSql'); - const config = { - server: credentials.server as string, - port: credentials.port as number, - database: credentials.database as string, - user: credentials.user as string, - password: credentials.password as string, - domain: credentials.domain ? (credentials.domain as string) : undefined, - connectionTimeout: credentials.connectTimeout as number, - requestTimeout: credentials.requestTimeout as number, - options: { - encrypt: credentials.tls as boolean, - enableArithAbort: false, - tdsVersion: credentials.tdsVersion as string, - trustServerCertificate: credentials.allowUnauthorizedCerts as boolean, - }, - }; - - const pool = new mssql.ConnectionPool(config); + const pool = configurePool(credentials); await pool.connect(); const returnItems: INodeExecutionData[] = []; @@ -290,10 +256,6 @@ export class MicrosoftSql implements INodeType { try { if (operation === 'executeQuery') { - // ---------------------------------- - // executeQuery - // ---------------------------------- - let rawQuery = this.getNodeParameter('query', 0) as string; for (const resolvable of getResolvables(rawQuery)) { @@ -309,76 +271,27 @@ export class MicrosoftSql implements INodeType { responseData = result; } else if (operation === 'insert') { - // ---------------------------------- - // insert - // ---------------------------------- - const tables = createTableStruct(this.getNodeParameter, items); - await executeQueryQueue( - tables, - ({ - table, - columnString, - // eslint-disable-next-line @typescript-eslint/no-shadow - items, - }: { - table: string; - columnString: string; - items: IDataObject[]; - }): Array> => { - return chunk(items, 1000).map(async (insertValues) => { - const values = insertValues.map((item: IDataObject) => extractValues(item)).join(','); - return pool - .request() - .query(`INSERT INTO ${table}(${formatColumns(columnString)}) VALUES ${values};`); - }); - }, - ); + + await insertOperation(tables, pool); responseData = items; } else if (operation === 'update') { - // ---------------------------------- - // update - // ---------------------------------- - const updateKeys = items.map( (item, index) => this.getNodeParameter('updateKey', index) as string, ); + const tables = createTableStruct( this.getNodeParameter, items, ['updateKey'].concat(updateKeys), 'updateKey', ); - await executeQueryQueue( - tables, - ({ - table, - columnString, - // eslint-disable-next-line @typescript-eslint/no-shadow - items, - }: { - table: string; - columnString: string; - items: IDataObject[]; - }): Array> => { - return items.map(async (item) => { - const columns = columnString.split(',').map((column) => column.trim()); - - const setValues = extractUpdateSet(item, columns); - const condition = extractUpdateCondition(item, item.updateKey as string); - return pool.request().query(`UPDATE ${table} SET ${setValues} WHERE ${condition};`); - }); - }, - ); + await updateOperation(tables, pool); responseData = items; } else if (operation === 'delete') { - // ---------------------------------- - // delete - // ---------------------------------- - const tables = items.reduce((acc, item, index) => { const table = this.getNodeParameter('table', index) as string; const deleteKey = this.getNodeParameter('deleteKey', index) as string; @@ -392,38 +305,7 @@ export class MicrosoftSql implements INodeType { return acc; }, {} as ITables); - const queriesResults = await Promise.all( - Object.keys(tables).map(async (table) => { - const deleteKeyResults = Object.keys(tables[table]).map(async (deleteKey) => { - const deleteItemsList = chunk( - tables[table][deleteKey].map((item) => - copyInputItem(item as INodeExecutionData, [deleteKey]), - ), - 1000, - ); - const queryQueue = deleteItemsList.map(async (deleteValues) => { - return pool - .request() - .query( - `DELETE FROM ${table} WHERE "${deleteKey}" IN ${extractDeleteValues( - deleteValues, - deleteKey, - )};`, - ); - }); - return Promise.all(queryQueue); - }); - return Promise.all(deleteKeyResults); - }), - ); - - const rowsDeleted = flatten(queriesResults).reduce( - (acc: number, resp: mssql.IResult): number => - (acc += resp.rowsAffected.reduce((sum, val) => (sum += val))), - 0, - ); - - responseData = rowsDeleted; + responseData = await deleteOperation(tables, pool); } else { await pool.close(); throw new NodeOperationError( @@ -444,6 +326,7 @@ export class MicrosoftSql implements INodeType { await pool.close(); const itemData = generatePairedItemData(items.length); + const executionData = this.helpers.constructExecutionMetaData( this.helpers.returnJsonArray(responseData), { itemData }, diff --git a/packages/nodes-base/nodes/Microsoft/Sql/TableInterface.ts b/packages/nodes-base/nodes/Microsoft/Sql/interfaces.ts similarity index 56% rename from packages/nodes-base/nodes/Microsoft/Sql/TableInterface.ts rename to packages/nodes-base/nodes/Microsoft/Sql/interfaces.ts index ec4b098ea5d3a..45bc3e2b3f8cd 100644 --- a/packages/nodes-base/nodes/Microsoft/Sql/TableInterface.ts +++ b/packages/nodes-base/nodes/Microsoft/Sql/interfaces.ts @@ -5,3 +5,9 @@ export interface ITables { [key: string]: IDataObject[]; }; } + +export type OperationInputData = { + table: string; + columnString: string; + items: IDataObject[]; +}; diff --git a/packages/nodes-base/nodes/Microsoft/Sql/test/utils.test.ts b/packages/nodes-base/nodes/Microsoft/Sql/test/utils.test.ts new file mode 100644 index 0000000000000..15686bbaf6340 --- /dev/null +++ b/packages/nodes-base/nodes/Microsoft/Sql/test/utils.test.ts @@ -0,0 +1,145 @@ +import { Request } from 'mssql'; +import type { IDataObject } from 'n8n-workflow'; +import { + configurePool, + deleteOperation, + insertOperation, + updateOperation, +} from '../GenericFunctions'; + +describe('MSSQL tests', () => { + let querySpy: jest.SpyInstance>; + let request: Request; + + const assertParameters = (parameters: unknown[][] | IDataObject) => { + if (Array.isArray(parameters)) { + parameters.forEach((values, rowIndex) => { + values.forEach((value, index) => { + const received = (request.parameters[`r${rowIndex}v${index}`] as IDataObject).value; + expect(received).toEqual(value); + }); + }); + } else { + for (const key in parameters) { + expect((request.parameters[key] as IDataObject).value).toEqual(parameters[key]); + } + } + }; + + beforeEach(() => { + jest.resetAllMocks(); + querySpy = jest.spyOn(Request.prototype, 'query').mockImplementation(async function ( + this: Request, + ) { + // eslint-disable-next-line @typescript-eslint/no-this-alias + request = this; + return [ + [ + [ + { + recordsets: [], + recordset: undefined, + output: {}, + rowsAffected: [0], + }, + ], + ], + ]; + }); + }); + + it('should perform insert operation', async () => { + const pool = configurePool({}); + const tables = { + users: { + 'id, name, age, active': [ + { + id: 1, + name: 'Sam', + age: 31, + active: false, + }, + { + id: 3, + name: 'Jon', + age: null, + active: true, + }, + { + id: 4, + name: undefined, + age: 25, + active: false, + }, + ], + }, + }; + + await insertOperation(tables, pool); + + expect(querySpy).toHaveBeenCalledTimes(1); + expect(querySpy).toHaveBeenCalledWith( + 'INSERT INTO [users] ([id], [name], [age], [active]) VALUES (@r0v0, @r0v1, @r0v2, @r0v3), (@r1v0, @r1v1, @r1v2, @r1v3), (@r2v0, @r2v1, @r2v2, @r2v3);', + ); + assertParameters([ + [1, 'Sam', 31, false], + [3, 'Jon', null, true], + [4, null, 25, false], + ]); + }); + + it('should perform update operation', async () => { + const pool = configurePool({}); + const tables = { + users: { + 'name, age, active': [ + { + name: 'Greg', + age: 43, + active: 0, + updateKey: 'id', + id: 2, + }, + ], + }, + }; + + await updateOperation(tables, pool); + + expect(querySpy).toHaveBeenCalledTimes(1); + expect(querySpy).toHaveBeenCalledWith( + 'UPDATE [users] SET [name] = @v0, [age] = @v1, [active] = @v2 WHERE id = @condition;', + ); + assertParameters({ + v0: 'Greg', + v1: 43, + v2: 0, + condition: 2, + }); + }); + + it('should perform delete operation', async () => { + const pool = configurePool({}); + const tables = { + users: { + id: [ + { + json: { + id: 2, + }, + pairedItem: { + item: 0, + input: undefined, + }, + }, + ], + }, + }; + + await deleteOperation(tables, pool); + + expect(querySpy).toHaveBeenCalledTimes(1); + expect(querySpy).toHaveBeenCalledWith('DELETE FROM [users] WHERE [id] IN (@v0);'); + assertParameters({ v0: 2 }); + }); +});