From fc76c5d2ab470955ca935550cad222f333f46ffd Mon Sep 17 00:00:00 2001 From: pemontto Date: Fri, 5 Jul 2024 10:52:05 +0100 Subject: [PATCH] =?UTF-8?q?=F0=9F=A5=85=20Better=20handle=20errors=20from?= =?UTF-8?q?=20the=20=5Fbulk=20API?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Elasticsearch/Elasticsearch.node.ts | 26 ++++----------- .../Elastic/Elasticsearch/GenericFunctions.ts | 33 ++++++++++++++----- 2 files changed, 30 insertions(+), 29 deletions(-) diff --git a/packages/nodes-base/nodes/Elastic/Elasticsearch/Elasticsearch.node.ts b/packages/nodes-base/nodes/Elastic/Elasticsearch/Elasticsearch.node.ts index df616bb5d8feb..d94ac77725464 100644 --- a/packages/nodes-base/nodes/Elastic/Elasticsearch/Elasticsearch.node.ts +++ b/packages/nodes-base/nodes/Elastic/Elasticsearch/Elasticsearch.node.ts @@ -246,7 +246,7 @@ export class Elasticsearch implements INodeType { _id: documentId, }, }); - bulkBody[i] += '\n' + JSON.stringify(body); + bulkBody[i] += `\n${JSON.stringify(body)}`; } else { if (documentId) { const endpoint = `/${indexId}/_doc/${documentId}`; @@ -294,7 +294,7 @@ export class Elasticsearch implements INodeType { _id: documentId, }, }); - bulkBody[i] += '\n' + JSON.stringify(body); + bulkBody[i] += `\n${JSON.stringify(body)}`; } else { responseData = await elasticsearchApiRequest.call(this, 'POST', endpoint, body); } @@ -387,20 +387,13 @@ export class Elasticsearch implements INodeType { } if (Object.keys(bulkBody).length >= 50) { responseData = (await elasticsearchBulkApiRequest.call(this, bulkBody)) as IDataObject[]; - // Enumerate the response data items and pair to the key of the bulkBody - for (let j = 0; j < Object.keys(responseData).length; j++) { - const itemData = { - ...(responseData[j].index as IDataObject), - ...(responseData[j].update as IDataObject), - ...(responseData[j].create as IDataObject), - ...(responseData[j].delete as IDataObject), - }; + for (let j = 0; j < responseData.length; j++) { + const itemData = responseData[j]; if (itemData.error) { const errorData = itemData.error as IDataObject; const message = errorData.type as string; const description = errorData.reason as string; const itemIndex = parseInt(Object.keys(bulkBody)[j]); - if (this.continueOnFail()) { returnData.push( ...this.helpers.constructExecutionMetaData( @@ -428,20 +421,13 @@ export class Elasticsearch implements INodeType { } if (Object.keys(bulkBody).length) { responseData = (await elasticsearchBulkApiRequest.call(this, bulkBody)) as IDataObject[]; - // Enumerate the response data items and pair to the key of the bulkBody - for (let j = 0; j < Object.keys(responseData).length; j++) { - const itemData = { - ...(responseData[j].index as IDataObject), - ...(responseData[j].update as IDataObject), - ...(responseData[j].create as IDataObject), - ...(responseData[j].delete as IDataObject), - }; + for (let j = 0; j < responseData.length; j++) { + const itemData = responseData[j]; if (itemData.error) { const errorData = itemData.error as IDataObject; const message = errorData.type as string; const description = errorData.reason as string; const itemIndex = parseInt(Object.keys(bulkBody)[j]); - if (this.continueOnFail()) { returnData.push( ...this.helpers.constructExecutionMetaData( diff --git a/packages/nodes-base/nodes/Elastic/Elasticsearch/GenericFunctions.ts b/packages/nodes-base/nodes/Elastic/Elasticsearch/GenericFunctions.ts index b510353d4f6f1..642ce1484fe4f 100644 --- a/packages/nodes-base/nodes/Elastic/Elasticsearch/GenericFunctions.ts +++ b/packages/nodes-base/nodes/Elastic/Elasticsearch/GenericFunctions.ts @@ -23,17 +23,32 @@ export async function elasticsearchBulkApiRequest(this: IExecuteFunctions, body: url: `${baseUrl}/_bulk`, skipSslCertificateValidation: ignoreSSLIssues, returnFullResponse: true, + ignoreHttpStatusErrors: true, }; - try { - const response = await this.helpers.requestWithAuthentication.call( - this, - 'elasticsearchApi', - options, - ); - return JSON.parse(response).items; - } catch (error) { - throw new NodeApiError(this.getNode(), error as JsonObject); + + const response = await this.helpers.httpRequestWithAuthentication.call( + this, + 'elasticsearchApi', + options, + ); + + if (response.statusCode > 299) { + if (this.continueOnFail()) { + return Object.values(body).map((_) => ({ error: response.body.error })); + } else { + throw new NodeApiError(this.getNode(), { error: response.body.error } as JsonObject); + } } + + return response.body.items.map((item: IDataObject) => { + return { + ...(item.index as IDataObject), + ...(item.update as IDataObject), + ...(item.create as IDataObject), + ...(item.delete as IDataObject), + ...(item.error as IDataObject), + }; + }); } export async function elasticsearchApiRequest(