From 8d69027e750ee28a29a327ea0c435a845774cacc Mon Sep 17 00:00:00 2001
From: runarvestmann
Date: Mon, 30 Sep 2024 15:35:24 +0000
Subject: [PATCH] Add retry logic to ES calls

---
 libs/cms/src/lib/search/contentful.service.ts |  4 ++
 .../src/services/elastic.service.ts           | 54 +++++++++++++------
 2 files changed, 43 insertions(+), 15 deletions(-)

diff --git a/libs/cms/src/lib/search/contentful.service.ts b/libs/cms/src/lib/search/contentful.service.ts
index 2066d7a51504..5d015de20bd9 100644
--- a/libs/cms/src/lib/search/contentful.service.ts
+++ b/libs/cms/src/lib/search/contentful.service.ts
@@ -526,6 +526,10 @@ export class ContentfulService {
       retries = MAX_RETRY_COUNT
     } catch (error) {
       if (error?.statusCode === 429 && retries > 0) {
+        logger.info('Retrying nested resolution request...', {
+          retriesLeft: retries - 1,
+          delay,
+        })
         await new Promise((resolve) => {
           setTimeout(resolve, delay)
         })
diff --git a/libs/content-search-toolkit/src/services/elastic.service.ts b/libs/content-search-toolkit/src/services/elastic.service.ts
index 93f0fdf9436a..79864f72fac7 100644
--- a/libs/content-search-toolkit/src/services/elastic.service.ts
+++ b/libs/content-search-toolkit/src/services/elastic.service.ts
@@ -42,6 +42,9 @@ type RankResultMap = Record<string, Record<string, unknown>>
 
 const { elastic } = environment
 
+const INITIAL_DELAY = 500
+const MAX_RETRY_COUNT = 3
+
 @Injectable()
 export class ElasticService {
   private client: Client | null = null
@@ -111,29 +114,50 @@ export class ElasticService {
     requests: Record<string, unknown>[],
     refresh = false,
   ) {
+    const chunkSize = 14
+    let delay = INITIAL_DELAY
+    let retries = MAX_RETRY_COUNT
+
     try {
       const client = await this.getClient()
 
       // elasticsearch does not like big requests (above 5mb) so we limit the size to X entries just in case
-      let requestChunk = getValidBulkRequestChunk(requests, 10)
+      let requestChunk = getValidBulkRequestChunk(requests, chunkSize)
 
       while (requestChunk.length) {
-        // wait for request b4 continuing
-        const response = await client.bulk({
-          index: index,
-          body: requestChunk,
-          refresh: refresh ? 'true' : undefined,
-        })
-
-        // not all errors are thrown log if the response has any errors
-        if (response.body.errors) {
-          // Filter HUGE request object
-          filterDoc(response)
-          logger.error('Failed to import some documents in bulk import', {
-            response,
+        try {
+          const response = await client.bulk({
+            index: index,
+            body: requestChunk,
+            refresh: refresh ? 'true' : undefined,
           })
+
+          // not all errors are thrown log if the response has any errors
+          if (response.body.errors) {
+            // Filter HUGE request object
+            filterDoc(response)
+            logger.error('Failed to import some documents in bulk import', {
+              response,
+            })
+          }
+          requestChunk = getValidBulkRequestChunk(requests, chunkSize)
+          delay = INITIAL_DELAY
+          retries = MAX_RETRY_COUNT
+        } catch (e) {
+          if (e?.statusCode === 429 && retries > 0) {
+            logger.info('Retrying Elasticsearch bulk request...', {
+              retriesLeft: retries - 1,
+              delay,
+            })
+            await new Promise((resolve) => {
+              setTimeout(resolve, delay)
+            })
+            delay *= 2
+            retries -= 1
+          } else {
+            throw e
+          }
         }
-        requestChunk = getValidBulkRequestChunk(requests)
       }
 
       return true
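
Both hunks apply the same retry pattern: when a call fails with HTTP 429 ("Too Many Requests"), wait, double the delay, and spend one unit of a bounded retry budget; after every successful call the delay and budget are reset. Below is a minimal standalone sketch of that pattern in TypeScript, assuming the error object exposes a `statusCode` field as it does in the patch; the helper name `withRetryOn429` and its signature are illustrative only, not part of this change:

const INITIAL_DELAY = 500 // ms, mirrors the constant added in elastic.service.ts
const MAX_RETRY_COUNT = 3

// Run `operation`, retrying with exponential backoff (500ms, 1s, 2s) while it
// fails with a 429 status code; any other error is rethrown immediately.
async function withRetryOn429<T>(operation: () => Promise<T>): Promise<T> {
  let delay = INITIAL_DELAY
  let retries = MAX_RETRY_COUNT
  for (;;) {
    try {
      return await operation()
    } catch (e) {
      if ((e as { statusCode?: number })?.statusCode === 429 && retries > 0) {
        // wait, then double the delay and spend one retry
        await new Promise((resolve) => setTimeout(resolve, delay))
        delay *= 2
        retries -= 1
      } else {
        throw e
      }
    }
  }
}

With the patch's constants this allows at most four attempts (one initial call plus three retries) before the 429 is propagated to the caller.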
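
One detail worth noting in the bulk loop: getValidBulkRequestChunk is not shown in this patch, and for `while (requestChunk.length)` to terminate it has to consume entries from the front of the shared `requests` array on each call. A purely illustrative sketch of that assumed contract (the name `takeBulkChunk` is hypothetical, not the real helper):

// Assumed contract: remove up to `size` operations from the front of
// `requests` and return them; an empty result ends the caller's while loop.
function takeBulkChunk(
  requests: Record<string, unknown>[],
  size: number,
): Record<string, unknown>[] {
  return requests.splice(0, size)
}

The patch also removes an inconsistency visible in the old loop: the initial fetch used a chunk size of 10 while the re-fetch at the bottom of the loop called `getValidBulkRequestChunk(requests)` and relied on the helper's default; the new code passes the same `chunkSize` in both places, so chunk sizing stays uniform across the whole import.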