From db6228355bdf458e8d925cbe8d76b44a0598e9dd Mon Sep 17 00:00:00 2001 From: pemontto Date: Thu, 4 Jul 2024 17:34:32 +0100 Subject: [PATCH 1/2] =?UTF-8?q?=F0=9F=8F=8E=EF=B8=8F=20Add=20bulk=20operat?= =?UTF-8?q?ions=20for=20Elasticsearch?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Elasticsearch/Elasticsearch.node.ts | 151 ++++++++++++++++-- .../Elastic/Elasticsearch/GenericFunctions.ts | 35 +++- .../descriptions/DocumentDescription.ts | 36 +++++ 3 files changed, 201 insertions(+), 21 deletions(-) diff --git a/packages/nodes-base/nodes/Elastic/Elasticsearch/Elasticsearch.node.ts b/packages/nodes-base/nodes/Elastic/Elasticsearch/Elasticsearch.node.ts index c209b84538bff..df616bb5d8feb 100644 --- a/packages/nodes-base/nodes/Elastic/Elasticsearch/Elasticsearch.node.ts +++ b/packages/nodes-base/nodes/Elastic/Elasticsearch/Elasticsearch.node.ts @@ -4,11 +4,16 @@ import type { INodeExecutionData, INodeType, INodeTypeDescription, + JsonObject, } from 'n8n-workflow'; -import { jsonParse } from 'n8n-workflow'; +import { jsonParse, NodeApiError } from 'n8n-workflow'; import omit from 'lodash/omit'; -import { elasticsearchApiRequest, elasticsearchApiRequestAllItems } from './GenericFunctions'; +import { + elasticsearchApiRequest, + elasticsearchApiRequestAllItems, + elasticsearchBulkApiRequest, +} from './GenericFunctions'; import { documentFields, documentOperations, indexFields, indexOperations } from './descriptions'; @@ -68,12 +73,14 @@ export class Elasticsearch implements INodeType { let responseData; + let bulkBody: IDataObject = {}; + for (let i = 0; i < items.length; i++) { + const bulkOperation = this.getNodeParameter('options.bulkOperation', i, false); if (resource === 'document') { // ********************************************************************** // document // ********************************************************************** - if (operation === 'delete') { // ---------------------------------------- // document: delete @@ -84,8 +91,17 @@ export class Elasticsearch implements INodeType { const indexId = this.getNodeParameter('indexId', i); const documentId = this.getNodeParameter('documentId', i); - const endpoint = `/${indexId}/_doc/${documentId}`; - responseData = await elasticsearchApiRequest.call(this, 'DELETE', endpoint); + if (bulkOperation) { + bulkBody[i] = JSON.stringify({ + delete: { + _index: indexId, + _id: documentId, + }, + }); + } else { + const endpoint = `/${indexId}/_doc/${documentId}`; + responseData = await elasticsearchApiRequest.call(this, 'DELETE', endpoint); + } } else if (operation === 'get') { // ---------------------------------------- // document: get @@ -223,12 +239,22 @@ export class Elasticsearch implements INodeType { const indexId = this.getNodeParameter('indexId', i); const { documentId } = additionalFields; - if (documentId) { - const endpoint = `/${indexId}/_doc/${documentId}`; - responseData = await elasticsearchApiRequest.call(this, 'PUT', endpoint, body); + if (bulkOperation) { + bulkBody[i] = JSON.stringify({ + index: { + _index: indexId, + _id: documentId, + }, + }); + bulkBody[i] += '\n' + JSON.stringify(body); } else { - const endpoint = `/${indexId}/_doc`; - responseData = await elasticsearchApiRequest.call(this, 'POST', endpoint, body); + if (documentId) { + const endpoint = `/${indexId}/_doc/${documentId}`; + responseData = await elasticsearchApiRequest.call(this, 'PUT', endpoint, body); + } else { + const endpoint = `/${indexId}/_doc`; + responseData = await elasticsearchApiRequest.call(this, 'POST', endpoint, body); + } } } else if (operation === 'update') { // ---------------------------------------- @@ -261,7 +287,17 @@ export class Elasticsearch implements INodeType { const documentId = this.getNodeParameter('documentId', i); const endpoint = `/${indexId}/_update/${documentId}`; - responseData = await elasticsearchApiRequest.call(this, 'POST', endpoint, body); + if (bulkOperation) { + bulkBody[i] = JSON.stringify({ + update: { + _index: indexId, + _id: documentId, + }, + }); + bulkBody[i] += '\n' + JSON.stringify(body); + } else { + responseData = await elasticsearchApiRequest.call(this, 'POST', endpoint, body); + } } } else if (resource === 'index') { // ********************************************************************** @@ -341,13 +377,94 @@ export class Elasticsearch implements INodeType { } } } - const executionData = this.helpers.constructExecutionMetaData( - this.helpers.returnJsonArray(responseData as IDataObject[]), - { itemData: { item: i } }, - ); - returnData.push(...executionData); - } + if (!bulkOperation) { + const executionData = this.helpers.constructExecutionMetaData( + this.helpers.returnJsonArray(responseData as IDataObject[]), + { itemData: { item: i } }, + ); + returnData.push(...executionData); + } + if (Object.keys(bulkBody).length >= 50) { + responseData = (await elasticsearchBulkApiRequest.call(this, bulkBody)) as IDataObject[]; + // Enumerate the response data items and pair to the key of the bulkBody + for (let j = 0; j < Object.keys(responseData).length; j++) { + const itemData = { + ...(responseData[j].index as IDataObject), + ...(responseData[j].update as IDataObject), + ...(responseData[j].create as IDataObject), + ...(responseData[j].delete as IDataObject), + }; + if (itemData.error) { + const errorData = itemData.error as IDataObject; + const message = errorData.type as string; + const description = errorData.reason as string; + const itemIndex = parseInt(Object.keys(bulkBody)[j]); + + if (this.continueOnFail()) { + returnData.push( + ...this.helpers.constructExecutionMetaData( + this.helpers.returnJsonArray({ error: message, message: itemData.error }), + { itemData: { item: itemIndex } }, + ), + ); + continue; + } else { + throw new NodeApiError(this.getNode(), { + message, + description, + itemIndex, + } as JsonObject); + } + } + const executionData = this.helpers.constructExecutionMetaData( + this.helpers.returnJsonArray(itemData), + { itemData: { item: parseInt(Object.keys(bulkBody)[j]) } }, + ); + returnData.push(...executionData); + } + bulkBody = {}; + } + } + if (Object.keys(bulkBody).length) { + responseData = (await elasticsearchBulkApiRequest.call(this, bulkBody)) as IDataObject[]; + // Enumerate the response data items and pair to the key of the bulkBody + for (let j = 0; j < Object.keys(responseData).length; j++) { + const itemData = { + ...(responseData[j].index as IDataObject), + ...(responseData[j].update as IDataObject), + ...(responseData[j].create as IDataObject), + ...(responseData[j].delete as IDataObject), + }; + if (itemData.error) { + const errorData = itemData.error as IDataObject; + const message = errorData.type as string; + const description = errorData.reason as string; + const itemIndex = parseInt(Object.keys(bulkBody)[j]); + + if (this.continueOnFail()) { + returnData.push( + ...this.helpers.constructExecutionMetaData( + this.helpers.returnJsonArray({ error: message, message: itemData.error }), + { itemData: { item: itemIndex } }, + ), + ); + continue; + } else { + throw new NodeApiError(this.getNode(), { + message, + description, + itemIndex, + } as JsonObject); + } + } + const executionData = this.helpers.constructExecutionMetaData( + this.helpers.returnJsonArray(itemData), + { itemData: { item: parseInt(Object.keys(bulkBody)[j]) } }, + ); + returnData.push(...executionData); + } + } return [returnData]; } } diff --git a/packages/nodes-base/nodes/Elastic/Elasticsearch/GenericFunctions.ts b/packages/nodes-base/nodes/Elastic/Elasticsearch/GenericFunctions.ts index ed013079f8cc6..b510353d4f6f1 100644 --- a/packages/nodes-base/nodes/Elastic/Elasticsearch/GenericFunctions.ts +++ b/packages/nodes-base/nodes/Elastic/Elasticsearch/GenericFunctions.ts @@ -2,13 +2,40 @@ import type { IExecuteFunctions, IDataObject, JsonObject, - IRequestOptions, + IHttpRequestOptions, IHttpRequestMethods, } from 'n8n-workflow'; import { NodeApiError } from 'n8n-workflow'; import type { ElasticsearchApiCredentials } from './types'; +export async function elasticsearchBulkApiRequest(this: IExecuteFunctions, body: IDataObject) { + const { baseUrl, ignoreSSLIssues } = (await this.getCredentials( + 'elasticsearchApi', + )) as ElasticsearchApiCredentials; + + const bulkBody = Object.values(body).flat().join('\n') + '\n'; + + const options: IHttpRequestOptions = { + method: 'POST', + headers: { 'Content-Type': 'application/x-ndjson' }, + body: bulkBody, + url: `${baseUrl}/_bulk`, + skipSslCertificateValidation: ignoreSSLIssues, + returnFullResponse: true, + }; + try { + const response = await this.helpers.requestWithAuthentication.call( + this, + 'elasticsearchApi', + options, + ); + return JSON.parse(response).items; + } catch (error) { + throw new NodeApiError(this.getNode(), error as JsonObject); + } +} + export async function elasticsearchApiRequest( this: IExecuteFunctions, method: IHttpRequestMethods, @@ -20,13 +47,13 @@ export async function elasticsearchApiRequest( 'elasticsearchApi', )) as ElasticsearchApiCredentials; - const options: IRequestOptions = { + const options: IHttpRequestOptions = { method, body, qs, - uri: `${baseUrl}${endpoint}`, + url: `${baseUrl}${endpoint}`, json: true, - rejectUnauthorized: !ignoreSSLIssues, + skipSslCertificateValidation: ignoreSSLIssues, }; if (!Object.keys(body).length) { diff --git a/packages/nodes-base/nodes/Elastic/Elasticsearch/descriptions/DocumentDescription.ts b/packages/nodes-base/nodes/Elastic/Elasticsearch/descriptions/DocumentDescription.ts index 0e07cecd4ea64..468da09ffec2b 100644 --- a/packages/nodes-base/nodes/Elastic/Elasticsearch/descriptions/DocumentDescription.ts +++ b/packages/nodes-base/nodes/Elastic/Elasticsearch/descriptions/DocumentDescription.ts @@ -81,6 +81,28 @@ export const documentFields: INodeProperties[] = [ }, }, }, + { + displayName: 'Options', + name: 'options', + type: 'collection', + placeholder: 'Add Option', + default: {}, + displayOptions: { + show: { + resource: ['document'], + operation: ['delete'], + }, + }, + options: [ + { + displayName: 'Bulk Delete', + name: 'bulkOperation', + type: 'boolean', + default: false, + description: 'Whether to use the bulk operation to delete the document/s', + }, + ], + }, // ---------------------------------------- // document: get @@ -644,6 +666,13 @@ export const documentFields: INodeProperties[] = [ }, }, options: [ + { + displayName: 'Bulk Create', + name: 'bulkOperation', + type: 'boolean', + default: false, + description: 'Whether to use the bulk operation to create the document/s', + }, { displayName: 'Pipeline ID', name: 'pipeline', @@ -802,6 +831,13 @@ export const documentFields: INodeProperties[] = [ }, }, options: [ + { + displayName: 'Bulk Update', + name: 'bulkOperation', + type: 'boolean', + default: false, + description: 'Whether to use the bulk operation to update the document/s', + }, { displayName: 'Refresh', name: 'refresh', From fc76c5d2ab470955ca935550cad222f333f46ffd Mon Sep 17 00:00:00 2001 From: pemontto Date: Fri, 5 Jul 2024 10:52:05 +0100 Subject: [PATCH 2/2] =?UTF-8?q?=F0=9F=A5=85=20Better=20handle=20errors=20f?= =?UTF-8?q?rom=20the=20=5Fbulk=20API?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Elasticsearch/Elasticsearch.node.ts | 26 ++++----------- .../Elastic/Elasticsearch/GenericFunctions.ts | 33 ++++++++++++++----- 2 files changed, 30 insertions(+), 29 deletions(-) diff --git a/packages/nodes-base/nodes/Elastic/Elasticsearch/Elasticsearch.node.ts b/packages/nodes-base/nodes/Elastic/Elasticsearch/Elasticsearch.node.ts index df616bb5d8feb..d94ac77725464 100644 --- a/packages/nodes-base/nodes/Elastic/Elasticsearch/Elasticsearch.node.ts +++ b/packages/nodes-base/nodes/Elastic/Elasticsearch/Elasticsearch.node.ts @@ -246,7 +246,7 @@ export class Elasticsearch implements INodeType { _id: documentId, }, }); - bulkBody[i] += '\n' + JSON.stringify(body); + bulkBody[i] += `\n${JSON.stringify(body)}`; } else { if (documentId) { const endpoint = `/${indexId}/_doc/${documentId}`; @@ -294,7 +294,7 @@ export class Elasticsearch implements INodeType { _id: documentId, }, }); - bulkBody[i] += '\n' + JSON.stringify(body); + bulkBody[i] += `\n${JSON.stringify(body)}`; } else { responseData = await elasticsearchApiRequest.call(this, 'POST', endpoint, body); } @@ -387,20 +387,13 @@ export class Elasticsearch implements INodeType { } if (Object.keys(bulkBody).length >= 50) { responseData = (await elasticsearchBulkApiRequest.call(this, bulkBody)) as IDataObject[]; - // Enumerate the response data items and pair to the key of the bulkBody - for (let j = 0; j < Object.keys(responseData).length; j++) { - const itemData = { - ...(responseData[j].index as IDataObject), - ...(responseData[j].update as IDataObject), - ...(responseData[j].create as IDataObject), - ...(responseData[j].delete as IDataObject), - }; + for (let j = 0; j < responseData.length; j++) { + const itemData = responseData[j]; if (itemData.error) { const errorData = itemData.error as IDataObject; const message = errorData.type as string; const description = errorData.reason as string; const itemIndex = parseInt(Object.keys(bulkBody)[j]); - if (this.continueOnFail()) { returnData.push( ...this.helpers.constructExecutionMetaData( @@ -428,20 +421,13 @@ export class Elasticsearch implements INodeType { } if (Object.keys(bulkBody).length) { responseData = (await elasticsearchBulkApiRequest.call(this, bulkBody)) as IDataObject[]; - // Enumerate the response data items and pair to the key of the bulkBody - for (let j = 0; j < Object.keys(responseData).length; j++) { - const itemData = { - ...(responseData[j].index as IDataObject), - ...(responseData[j].update as IDataObject), - ...(responseData[j].create as IDataObject), - ...(responseData[j].delete as IDataObject), - }; + for (let j = 0; j < responseData.length; j++) { + const itemData = responseData[j]; if (itemData.error) { const errorData = itemData.error as IDataObject; const message = errorData.type as string; const description = errorData.reason as string; const itemIndex = parseInt(Object.keys(bulkBody)[j]); - if (this.continueOnFail()) { returnData.push( ...this.helpers.constructExecutionMetaData( diff --git a/packages/nodes-base/nodes/Elastic/Elasticsearch/GenericFunctions.ts b/packages/nodes-base/nodes/Elastic/Elasticsearch/GenericFunctions.ts index b510353d4f6f1..642ce1484fe4f 100644 --- a/packages/nodes-base/nodes/Elastic/Elasticsearch/GenericFunctions.ts +++ b/packages/nodes-base/nodes/Elastic/Elasticsearch/GenericFunctions.ts @@ -23,17 +23,32 @@ export async function elasticsearchBulkApiRequest(this: IExecuteFunctions, body: url: `${baseUrl}/_bulk`, skipSslCertificateValidation: ignoreSSLIssues, returnFullResponse: true, + ignoreHttpStatusErrors: true, }; - try { - const response = await this.helpers.requestWithAuthentication.call( - this, - 'elasticsearchApi', - options, - ); - return JSON.parse(response).items; - } catch (error) { - throw new NodeApiError(this.getNode(), error as JsonObject); + + const response = await this.helpers.httpRequestWithAuthentication.call( + this, + 'elasticsearchApi', + options, + ); + + if (response.statusCode > 299) { + if (this.continueOnFail()) { + return Object.values(body).map((_) => ({ error: response.body.error })); + } else { + throw new NodeApiError(this.getNode(), { error: response.body.error } as JsonObject); + } } + + return response.body.items.map((item: IDataObject) => { + return { + ...(item.index as IDataObject), + ...(item.update as IDataObject), + ...(item.create as IDataObject), + ...(item.delete as IDataObject), + ...(item.error as IDataObject), + }; + }); } export async function elasticsearchApiRequest(