diff --git a/libs/cms/src/lib/search/contentful.service.ts b/libs/cms/src/lib/search/contentful.service.ts
index 52c92df3645a..5d015de20bd9 100644
--- a/libs/cms/src/lib/search/contentful.service.ts
+++ b/libs/cms/src/lib/search/contentful.service.ts
@@ -31,6 +31,9 @@ type SyncCollection = ContentfulSyncCollection & {
 
 const MAX_REQUEST_COUNT = 10
 
+const MAX_RETRY_COUNT = 3
+const INITIAL_DELAY = 500
+
 // Taken from here: https://github.com/contentful/contentful-sdk-core/blob/054328ba2d0df364a5f1ce6d164c5018efb63572/lib/create-http-client.js#L34-L42
 // eslint-disable-next-line @typescript-eslint/no-explicit-any
 const defaultContentfulClientLogging = (level: ClientLogLevel, data: any) => {
@@ -483,37 +486,61 @@ export class ContentfulService {
     let response: ApiResponse<SearchResponse<MappedData>> | null = null
     let total = -1
 
+    let delay = INITIAL_DELAY
+    let retries = MAX_RETRY_COUNT
+
     while (response === null || items.length < total) {
-      response = await this.elasticService.findByQuery(elasticIndex, {
-        query: {
-          bool: {
-            should: idsChunk.map((id) => ({
-              nested: {
-                path: 'tags',
-                query: {
-                  bool: {
-                    must: [
-                      {
-                        term: {
-                          'tags.key': id,
+      try {
+        response = await this.elasticService.findByQuery(elasticIndex, {
+          query: {
+            bool: {
+              should: idsChunk.map((id) => ({
+                nested: {
+                  path: 'tags',
+                  query: {
+                    bool: {
+                      must: [
+                        {
+                          term: {
+                            'tags.key': id,
+                          },
                         },
-                      },
-                      {
-                        term: {
-                          'tags.type': 'hasChildEntryWithId',
+                        {
+                          term: {
+                            'tags.type': 'hasChildEntryWithId',
+                          },
                         },
-                      },
-                    ],
+                      ],
+                    },
                   },
                 },
-              },
-            })),
-            minimum_should_match: 1,
+              })),
+              minimum_should_match: 1,
+            },
           },
-        },
-        size,
-        from: (page - 1) * size,
-      })
+          size,
+          from: (page - 1) * size,
+        })
+        // Reset the backoff state whenever we successfully receive a response
+        delay = INITIAL_DELAY
+        retries = MAX_RETRY_COUNT
+      } catch (error) {
+        if (error?.statusCode === 429 && retries > 0) {
+          logger.info('Retrying nested resolution request...', {
+            retriesLeft: retries - 1,
+            delay,
+          })
+          await new Promise((resolve) => {
+            setTimeout(resolve, delay)
+          })
+          // Track how many retries remain and how long to wait before the next attempt
+          retries -= 1
+          delay *= 2
+          continue
+        } else {
+          throw error
+        }
+      }
 
       if (response.body.hits.hits.length === 0) {
         total = response.body.hits.total.value
diff --git a/libs/content-search-toolkit/src/services/elastic.service.ts b/libs/content-search-toolkit/src/services/elastic.service.ts
index 93f0fdf9436a..79864f72fac7 100644
--- a/libs/content-search-toolkit/src/services/elastic.service.ts
+++ b/libs/content-search-toolkit/src/services/elastic.service.ts
@@ -42,6 +42,9 @@ type RankResultMap = Record<string, RankEvaluationResponse[]>
 
 const { elastic } = environment
 
+const INITIAL_DELAY = 500
+const MAX_RETRY_COUNT = 3
+
 @Injectable()
 export class ElasticService {
   private client: Client | null = null
@@ -111,29 +114,50 @@
     requests: Record<string, unknown>[],
     refresh = false,
   ) {
+    const chunkSize = 14
+    let delay = INITIAL_DELAY
+    let retries = MAX_RETRY_COUNT
+
     try {
       const client = await this.getClient()
 
       // elasticsearch does not like big requests (above 5mb) so we limit the size to X entries just in case
-      let requestChunk = getValidBulkRequestChunk(requests, 10)
+      let requestChunk = getValidBulkRequestChunk(requests, chunkSize)
 
      while (requestChunk.length) {
-        // wait for request b4 continuing
-        const response = await client.bulk({
-          index: index,
-          body: requestChunk,
-          refresh: refresh ? 'true' : undefined,
-        })
-
-        // not all errors are thrown log if the response has any errors
-        if (response.body.errors) {
-          // Filter HUGE request object
-          filterDoc(response)
-          logger.error('Failed to import some documents in bulk import', {
-            response,
-          })
+        try {
+          const response = await client.bulk({
+            index: index,
+            body: requestChunk,
+            refresh: refresh ? 'true' : undefined,
+          })
+
+          // Not all errors are thrown, so log if the response contains any errors
+          if (response.body.errors) {
+            // Filter HUGE request object
+            filterDoc(response)
+            logger.error('Failed to import some documents in bulk import', {
+              response,
+            })
+          }
+          requestChunk = getValidBulkRequestChunk(requests, chunkSize)
+          delay = INITIAL_DELAY
+          retries = MAX_RETRY_COUNT
+        } catch (e) {
+          if (e?.statusCode === 429 && retries > 0) {
+            logger.info('Retrying Elasticsearch bulk request...', {
+              retriesLeft: retries - 1,
+              delay,
+            })
+            await new Promise((resolve) => {
+              setTimeout(resolve, delay)
+            })
+            delay *= 2
+            retries -= 1
+          } else {
+            throw e
+          }
         }
-        requestChunk = getValidBulkRequestChunk(requests)
       }
 
      return true
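Review note: the 429 handling above is implemented twice with identical backoff behavior (`INITIAL_DELAY = 500` and `MAX_RETRY_COUNT = 3`, so a rate-limited request waits 500 ms, then 1 s, then 2 s, and gives up on the fourth consecutive 429). A shared helper could remove that duplication. The sketch below is illustrative only and not part of this diff; the `retryOn429` name and the `statusCode` cast are assumptions modeled on the error shape both catch blocks already check, and the `logger` calls are omitted for brevity.

```ts
// Sketch of a shared backoff helper. Assumes, like both catch blocks in
// this diff, that rate-limit errors carry a numeric `statusCode` of 429.
const MAX_RETRY_COUNT = 3
const INITIAL_DELAY = 500 // milliseconds

const sleep = (ms: number) =>
  new Promise<void>((resolve) => setTimeout(resolve, ms))

// Runs `operation`, retrying on HTTP 429 with exponential backoff.
// Non-429 errors, or an exhausted retry budget, are rethrown to the caller.
async function retryOn429<T>(operation: () => Promise<T>): Promise<T> {
  let delay = INITIAL_DELAY
  for (let retries = MAX_RETRY_COUNT; ; retries -= 1) {
    try {
      return await operation()
    } catch (error) {
      const statusCode = (error as { statusCode?: number } | null)?.statusCode
      if (statusCode !== 429 || retries <= 0) {
        throw error
      }
      await sleep(delay)
      delay *= 2 // 500 -> 1000 -> 2000
    }
  }
}
```

Each call site would then wrap its request, e.g. `const response = await retryOn429(() => client.bulk(...))`. Because the backoff state lives per call, every chunk starts with a fresh retry budget, which matches the explicit reset-on-success bookkeeping in both loops.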