Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(Elasticsearch Node): Add bulk operations for Elasticsearch #9940

Merged
merged 2 commits into from
Aug 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 120 additions & 17 deletions packages/nodes-base/nodes/Elastic/Elasticsearch/Elasticsearch.node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,16 @@ import type {
INodeExecutionData,
INodeType,
INodeTypeDescription,
JsonObject,
} from 'n8n-workflow';
import { jsonParse } from 'n8n-workflow';
import { jsonParse, NodeApiError } from 'n8n-workflow';

import omit from 'lodash/omit';
import { elasticsearchApiRequest, elasticsearchApiRequestAllItems } from './GenericFunctions';
import {
elasticsearchApiRequest,
elasticsearchApiRequestAllItems,
elasticsearchBulkApiRequest,
} from './GenericFunctions';

import { documentFields, documentOperations, indexFields, indexOperations } from './descriptions';

Expand Down Expand Up @@ -68,12 +73,14 @@ export class Elasticsearch implements INodeType {

let responseData;

let bulkBody: IDataObject = {};

for (let i = 0; i < items.length; i++) {
const bulkOperation = this.getNodeParameter('options.bulkOperation', i, false);
if (resource === 'document') {
// **********************************************************************
// document
// **********************************************************************

if (operation === 'delete') {
// ----------------------------------------
// document: delete
Expand All @@ -84,8 +91,17 @@ export class Elasticsearch implements INodeType {
const indexId = this.getNodeParameter('indexId', i);
const documentId = this.getNodeParameter('documentId', i);

const endpoint = `/${indexId}/_doc/${documentId}`;
responseData = await elasticsearchApiRequest.call(this, 'DELETE', endpoint);
if (bulkOperation) {
bulkBody[i] = JSON.stringify({
delete: {
_index: indexId,
_id: documentId,
},
});
} else {
const endpoint = `/${indexId}/_doc/${documentId}`;
responseData = await elasticsearchApiRequest.call(this, 'DELETE', endpoint);
}
} else if (operation === 'get') {
// ----------------------------------------
// document: get
Expand Down Expand Up @@ -223,12 +239,22 @@ export class Elasticsearch implements INodeType {
const indexId = this.getNodeParameter('indexId', i);
const { documentId } = additionalFields;

if (documentId) {
const endpoint = `/${indexId}/_doc/${documentId}`;
responseData = await elasticsearchApiRequest.call(this, 'PUT', endpoint, body);
if (bulkOperation) {
bulkBody[i] = JSON.stringify({
index: {
_index: indexId,
_id: documentId,
},
});
bulkBody[i] += `\n${JSON.stringify(body)}`;
} else {
const endpoint = `/${indexId}/_doc`;
responseData = await elasticsearchApiRequest.call(this, 'POST', endpoint, body);
if (documentId) {
const endpoint = `/${indexId}/_doc/${documentId}`;
responseData = await elasticsearchApiRequest.call(this, 'PUT', endpoint, body);
} else {
const endpoint = `/${indexId}/_doc`;
responseData = await elasticsearchApiRequest.call(this, 'POST', endpoint, body);
}
}
} else if (operation === 'update') {
// ----------------------------------------
Expand Down Expand Up @@ -261,7 +287,17 @@ export class Elasticsearch implements INodeType {
const documentId = this.getNodeParameter('documentId', i);

const endpoint = `/${indexId}/_update/${documentId}`;
responseData = await elasticsearchApiRequest.call(this, 'POST', endpoint, body);
if (bulkOperation) {
bulkBody[i] = JSON.stringify({
update: {
_index: indexId,
_id: documentId,
},
});
bulkBody[i] += `\n${JSON.stringify(body)}`;
} else {
responseData = await elasticsearchApiRequest.call(this, 'POST', endpoint, body);
}
}
} else if (resource === 'index') {
// **********************************************************************
Expand Down Expand Up @@ -341,13 +377,80 @@ export class Elasticsearch implements INodeType {
}
}
}
const executionData = this.helpers.constructExecutionMetaData(
this.helpers.returnJsonArray(responseData as IDataObject[]),
{ itemData: { item: i } },
);
returnData.push(...executionData);
}

if (!bulkOperation) {
const executionData = this.helpers.constructExecutionMetaData(
this.helpers.returnJsonArray(responseData as IDataObject[]),
{ itemData: { item: i } },
);
returnData.push(...executionData);
}
if (Object.keys(bulkBody).length >= 50) {
responseData = (await elasticsearchBulkApiRequest.call(this, bulkBody)) as IDataObject[];
for (let j = 0; j < responseData.length; j++) {
const itemData = responseData[j];
if (itemData.error) {
const errorData = itemData.error as IDataObject;
const message = errorData.type as string;
const description = errorData.reason as string;
const itemIndex = parseInt(Object.keys(bulkBody)[j]);
if (this.continueOnFail()) {
returnData.push(
...this.helpers.constructExecutionMetaData(
this.helpers.returnJsonArray({ error: message, message: itemData.error }),
{ itemData: { item: itemIndex } },
),
);
continue;
} else {
throw new NodeApiError(this.getNode(), {
message,
description,
itemIndex,
} as JsonObject);
}
}
const executionData = this.helpers.constructExecutionMetaData(
this.helpers.returnJsonArray(itemData),
{ itemData: { item: parseInt(Object.keys(bulkBody)[j]) } },
);
returnData.push(...executionData);
}
bulkBody = {};
}
}
if (Object.keys(bulkBody).length) {
responseData = (await elasticsearchBulkApiRequest.call(this, bulkBody)) as IDataObject[];
for (let j = 0; j < responseData.length; j++) {
const itemData = responseData[j];
if (itemData.error) {
const errorData = itemData.error as IDataObject;
const message = errorData.type as string;
const description = errorData.reason as string;
const itemIndex = parseInt(Object.keys(bulkBody)[j]);
if (this.continueOnFail()) {
returnData.push(
...this.helpers.constructExecutionMetaData(
this.helpers.returnJsonArray({ error: message, message: itemData.error }),
{ itemData: { item: itemIndex } },
),
);
continue;
} else {
throw new NodeApiError(this.getNode(), {
message,
description,
itemIndex,
} as JsonObject);
}
}
const executionData = this.helpers.constructExecutionMetaData(
this.helpers.returnJsonArray(itemData),
{ itemData: { item: parseInt(Object.keys(bulkBody)[j]) } },
);
returnData.push(...executionData);
}
}
return [returnData];
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,55 @@ import type {
IExecuteFunctions,
IDataObject,
JsonObject,
IRequestOptions,
IHttpRequestOptions,
IHttpRequestMethods,
} from 'n8n-workflow';
import { NodeApiError } from 'n8n-workflow';

import type { ElasticsearchApiCredentials } from './types';

export async function elasticsearchBulkApiRequest(this: IExecuteFunctions, body: IDataObject) {
const { baseUrl, ignoreSSLIssues } = (await this.getCredentials(
'elasticsearchApi',
)) as ElasticsearchApiCredentials;

const bulkBody = Object.values(body).flat().join('\n') + '\n';

const options: IHttpRequestOptions = {
method: 'POST',
headers: { 'Content-Type': 'application/x-ndjson' },
body: bulkBody,
url: `${baseUrl}/_bulk`,
skipSslCertificateValidation: ignoreSSLIssues,
returnFullResponse: true,
ignoreHttpStatusErrors: true,
};

const response = await this.helpers.httpRequestWithAuthentication.call(
this,
'elasticsearchApi',
options,
);

if (response.statusCode > 299) {
if (this.continueOnFail()) {
return Object.values(body).map((_) => ({ error: response.body.error }));
} else {
throw new NodeApiError(this.getNode(), { error: response.body.error } as JsonObject);
}
}

return response.body.items.map((item: IDataObject) => {
return {
...(item.index as IDataObject),
...(item.update as IDataObject),
...(item.create as IDataObject),
...(item.delete as IDataObject),
...(item.error as IDataObject),
};
});
}

export async function elasticsearchApiRequest(
this: IExecuteFunctions,
method: IHttpRequestMethods,
Expand All @@ -20,13 +62,13 @@ export async function elasticsearchApiRequest(
'elasticsearchApi',
)) as ElasticsearchApiCredentials;

const options: IRequestOptions = {
const options: IHttpRequestOptions = {
method,
body,
qs,
uri: `${baseUrl}${endpoint}`,
url: `${baseUrl}${endpoint}`,
json: true,
rejectUnauthorized: !ignoreSSLIssues,
skipSslCertificateValidation: ignoreSSLIssues,
pemontto marked this conversation as resolved.
Show resolved Hide resolved
};

if (!Object.keys(body).length) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,28 @@ export const documentFields: INodeProperties[] = [
},
},
},
{
displayName: 'Options',
name: 'options',
type: 'collection',
placeholder: 'Add Option',
default: {},
displayOptions: {
show: {
resource: ['document'],
operation: ['delete'],
},
},
options: [
{
displayName: 'Bulk Delete',
name: 'bulkOperation',
type: 'boolean',
default: false,
description: 'Whether to use the bulk operation to delete the document/s',
},
],
},

// ----------------------------------------
// document: get
Expand Down Expand Up @@ -644,6 +666,13 @@ export const documentFields: INodeProperties[] = [
},
},
options: [
{
displayName: 'Bulk Create',
name: 'bulkOperation',
type: 'boolean',
default: false,
description: 'Whether to use the bulk operation to create the document/s',
},
{
displayName: 'Pipeline ID',
name: 'pipeline',
Expand Down Expand Up @@ -802,6 +831,13 @@ export const documentFields: INodeProperties[] = [
},
},
options: [
{
displayName: 'Bulk Update',
name: 'bulkOperation',
type: 'boolean',
default: false,
description: 'Whether to use the bulk operation to update the document/s',
},
{
displayName: 'Refresh',
name: 'refresh',
Expand Down