🏎️ Add bulk operations for Elasticsearch
pemontto committed Jul 4, 2024
1 parent cef1774 commit 5eb31c9
Showing 3 changed files with 201 additions and 21 deletions.
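For context on what the new helpers assemble: Elasticsearch's _bulk endpoint takes newline-delimited JSON, with one metadata line per action; index, create, and update actions are followed by a source line, delete is not, and the body is sent with Content-Type: application/x-ndjson and must end with a newline. A minimal sketch of that payload, with illustrative index names, IDs, and field values (not taken from this commit):

// Illustrative NDJSON body for POST {baseUrl}/_bulk (names and values are examples).
const ndjsonBody =
  [
    JSON.stringify({ index: { _index: 'my-index', _id: '1' } }),
    JSON.stringify({ field: 'value' }), // source line for the index action
    JSON.stringify({ update: { _index: 'my-index', _id: '2' } }),
    JSON.stringify({ doc: { field: 'new value' } }), // partial document for the update action
    JSON.stringify({ delete: { _index: 'my-index', _id: '3' } }), // delete takes no source line
  ].join('\n') + '\n'; // trailing newline is required by the bulk API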
151 changes: 134 additions & 17 deletions packages/nodes-base/nodes/Elastic/Elasticsearch/Elasticsearch.node.ts
@@ -4,11 +4,16 @@ import type {
INodeExecutionData,
INodeType,
INodeTypeDescription,
JsonObject,
} from 'n8n-workflow';
import { jsonParse } from 'n8n-workflow';
import { jsonParse, NodeApiError } from 'n8n-workflow';

import omit from 'lodash/omit';
import { elasticsearchApiRequest, elasticsearchApiRequestAllItems } from './GenericFunctions';
import {
elasticsearchApiRequest,
elasticsearchApiRequestAllItems,
elasticsearchBulkApiRequest,
} from './GenericFunctions';

import { documentFields, documentOperations, indexFields, indexOperations } from './descriptions';

@@ -68,12 +73,14 @@ export class Elasticsearch implements INodeType {

let responseData;

let bulkBody: IDataObject = {};

for (let i = 0; i < items.length; i++) {
const bulkOperation = this.getNodeParameter('options.bulkOperation', i, false);
if (resource === 'document') {
// **********************************************************************
// document
// **********************************************************************

if (operation === 'delete') {
// ----------------------------------------
// document: delete
@@ -84,8 +91,17 @@
const indexId = this.getNodeParameter('indexId', i);
const documentId = this.getNodeParameter('documentId', i);

const endpoint = `/${indexId}/_doc/${documentId}`;
responseData = await elasticsearchApiRequest.call(this, 'DELETE', endpoint);
if (bulkOperation) {
bulkBody[i] = JSON.stringify({
delete: {
_index: indexId,
_id: documentId,
},
});
} else {
const endpoint = `/${indexId}/_doc/${documentId}`;
responseData = await elasticsearchApiRequest.call(this, 'DELETE', endpoint);
}
} else if (operation === 'get') {
// ----------------------------------------
// document: get
@@ -223,12 +239,22 @@
const indexId = this.getNodeParameter('indexId', i);
const { documentId } = additionalFields;

if (documentId) {
const endpoint = `/${indexId}/_doc/${documentId}`;
responseData = await elasticsearchApiRequest.call(this, 'PUT', endpoint, body);
if (bulkOperation) {
bulkBody[i] = JSON.stringify({
index: {
_index: indexId,
_id: documentId,
},
});
bulkBody[i] += '\n' + JSON.stringify(body);
} else {
const endpoint = `/${indexId}/_doc`;
responseData = await elasticsearchApiRequest.call(this, 'POST', endpoint, body);
if (documentId) {
const endpoint = `/${indexId}/_doc/${documentId}`;
responseData = await elasticsearchApiRequest.call(this, 'PUT', endpoint, body);
} else {
const endpoint = `/${indexId}/_doc`;
responseData = await elasticsearchApiRequest.call(this, 'POST', endpoint, body);
}
}
} else if (operation === 'update') {
// ----------------------------------------
@@ -261,7 +287,17 @@
const documentId = this.getNodeParameter('documentId', i);

const endpoint = `/${indexId}/_update/${documentId}`;
responseData = await elasticsearchApiRequest.call(this, 'POST', endpoint, body);
if (bulkOperation) {
bulkBody[i] = JSON.stringify({
update: {
_index: indexId,
_id: documentId,
},
});
bulkBody[i] += '\n' + JSON.stringify(body);
} else {
responseData = await elasticsearchApiRequest.call(this, 'POST', endpoint, body);
}
}
} else if (resource === 'index') {
// **********************************************************************
@@ -341,13 +377,94 @@
}
}
}
const executionData = this.helpers.constructExecutionMetaData(
this.helpers.returnJsonArray(responseData as IDataObject[]),
{ itemData: { item: i } },
);
returnData.push(...executionData);
}

if (!bulkOperation) {
const executionData = this.helpers.constructExecutionMetaData(
this.helpers.returnJsonArray(responseData as IDataObject[]),
{ itemData: { item: i } },
);
returnData.push(...executionData);
}
if (Object.keys(bulkBody).length >= 50) {
responseData = (await elasticsearchBulkApiRequest.call(this, bulkBody)) as IDataObject[];
// Enumerate the response data items and pair to the key of the bulkBody
for (let j = 0; j < Object.keys(responseData).length; j++) {
const itemData = {
...(responseData[j].index as IDataObject),
...(responseData[j].update as IDataObject),
...(responseData[j].create as IDataObject),
...(responseData[j].delete as IDataObject),
};
if (itemData.error) {
const errorData = itemData.error as IDataObject;
const message = errorData.type as string;
const description = errorData.reason as string;
const itemIndex = parseInt(Object.keys(bulkBody)[j]);

if (this.continueOnFail()) {
returnData.push(
...this.helpers.constructExecutionMetaData(
this.helpers.returnJsonArray({ error: message, message: itemData.error }),
{ itemData: { item: itemIndex } },
),
);
continue;
} else {
throw new NodeApiError(this.getNode(), {
message,
description,
itemIndex,
} as JsonObject);
}
}
const executionData = this.helpers.constructExecutionMetaData(
this.helpers.returnJsonArray(itemData),
{ itemData: { item: parseInt(Object.keys(bulkBody)[j]) } },
);
returnData.push(...executionData);
}
bulkBody = {};
}
}
if (Object.keys(bulkBody).length) {
responseData = (await elasticsearchBulkApiRequest.call(this, bulkBody)) as IDataObject[];
// Enumerate the response data items and pair to the key of the bulkBody
for (let j = 0; j < Object.keys(responseData).length; j++) {
const itemData = {
...(responseData[j].index as IDataObject),
...(responseData[j].update as IDataObject),
...(responseData[j].create as IDataObject),
...(responseData[j].delete as IDataObject),
};
if (itemData.error) {
const errorData = itemData.error as IDataObject;
const message = errorData.type as string;
const description = errorData.reason as string;
const itemIndex = parseInt(Object.keys(bulkBody)[j]);

if (this.continueOnFail()) {
returnData.push(
...this.helpers.constructExecutionMetaData(
this.helpers.returnJsonArray({ error: message, message: itemData.error }),
{ itemData: { item: itemIndex } },
),
);
continue;
} else {
throw new NodeApiError(this.getNode(), {
message,
description,
itemIndex,
} as JsonObject);
}
}
const executionData = this.helpers.constructExecutionMetaData(
this.helpers.returnJsonArray(itemData),
{ itemData: { item: parseInt(Object.keys(bulkBody)[j]) } },
);
returnData.push(...executionData);
}
}
return [returnData];
}
}
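For reference, the per-item objects that the flush logic above unpacks come from the _bulk response's items array, where each entry is keyed by its action type and failed operations carry an error object with type and reason fields. A hedged sketch with illustrative values (not from this commit):

// Illustrative shape of the parsed _bulk response items (values are examples).
const exampleItems = [
  { index: { _index: 'my-index', _id: '1', status: 201, result: 'created' } },
  {
    update: {
      _index: 'my-index',
      _id: '2',
      status: 404,
      error: { type: 'document_missing_exception', reason: '[2]: document missing' },
    },
  },
  { delete: { _index: 'my-index', _id: '3', status: 200, result: 'deleted' } },
];
// itemData.error.type and itemData.error.reason feed the NodeApiError message and description above.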
packages/nodes-base/nodes/Elastic/Elasticsearch/GenericFunctions.ts
@@ -2,13 +2,40 @@ import type {
IExecuteFunctions,
IDataObject,
JsonObject,
IRequestOptions,
IHttpRequestOptions,
IHttpRequestMethods,
} from 'n8n-workflow';
import { NodeApiError } from 'n8n-workflow';

import type { ElasticsearchApiCredentials } from './types';

export async function elasticsearchBulkApiRequest(this: IExecuteFunctions, body: IDataObject) {
const { baseUrl, ignoreSSLIssues } = (await this.getCredentials(
'elasticsearchApi',
)) as ElasticsearchApiCredentials;

const bulkBody = Object.values(body).flat().join('\n') + '\n';

const options: IHttpRequestOptions = {
method: 'POST',
headers: { 'Content-Type': 'application/x-ndjson' },
body: bulkBody,
url: `${baseUrl}/_bulk`,
skipSslCertificateValidation: ignoreSSLIssues,
returnFullResponse: true,
};
try {
const response = await this.helpers.requestWithAuthentication.call(
this,
'elasticsearchApi',
options,
);
return JSON.parse(response).items;
} catch (error) {
throw new NodeApiError(this.getNode(), error as JsonObject);
}
}

export async function elasticsearchApiRequest(
this: IExecuteFunctions,
method: IHttpRequestMethods,
@@ -20,13 +47,13 @@ export async function elasticsearchApiRequest(
'elasticsearchApi',
)) as ElasticsearchApiCredentials;

const options: IRequestOptions = {
const options: IHttpRequestOptions = {
method,
body,
qs,
uri: `${baseUrl}${endpoint}`,
url: `${baseUrl}${endpoint}`,
json: true,
rejectUnauthorized: !ignoreSSLIssues,
skipSslCertificateValidation: ignoreSSLIssues,
};

if (!Object.keys(body).length) {
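As a usage sketch only — assuming it runs inside the node's execute() context, with illustrative index names, IDs, and values — a caller accumulates one NDJSON chunk per input item, keyed by that item's index, and flushes them all in a single call:

// Hypothetical usage of elasticsearchBulkApiRequest (indices, IDs, and values are illustrative).
const bulkBody: IDataObject = {
  0: JSON.stringify({ delete: { _index: 'my-index', _id: 'doc-1' } }),
  1:
    JSON.stringify({ index: { _index: 'my-index', _id: 'doc-2' } }) +
    '\n' +
    JSON.stringify({ field: 'value' }),
};
// Returns the parsed items array; items[j] pairs with Object.keys(bulkBody)[j].
const items = (await elasticsearchBulkApiRequest.call(this, bulkBody)) as IDataObject[];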
packages/nodes-base/nodes/Elastic/Elasticsearch/descriptions/DocumentDescription.ts
@@ -81,6 +81,28 @@ export const documentFields: INodeProperties[] = [
},
},
},
{
displayName: 'Options',
name: 'options',
type: 'collection',
placeholder: 'Add Option',
default: {},
displayOptions: {
show: {
resource: ['document'],
operation: ['delete'],
},
},
options: [
{
displayName: 'Bulk Delete',
name: 'bulkOperation',
type: 'boolean',
default: false,
description: 'Whether to use the bulk operation to delete the document/s',
},
],
},

// ----------------------------------------
// document: get
@@ -644,6 +666,13 @@ export const documentFields: INodeProperties[] = [
},
},
options: [
{
displayName: 'Bulk Create',
name: 'bulkOperation',
type: 'boolean',
default: false,
description: 'Whether to use the bulk operation to create the document/s',
},
{
displayName: 'Pipeline ID',
name: 'pipeline',
@@ -802,6 +831,13 @@ export const documentFields: INodeProperties[] = [
},
},
options: [
{
displayName: 'Bulk Update',
name: 'bulkOperation',
type: 'boolean',
default: false,
description: 'Whether to use the bulk operation to update the document/s',
},
{
displayName: 'Refresh',
name: 'refresh',
