Add retry logic to ES calls
RunarVestmann committed Sep 30, 2024
1 parent b5592de commit 8d69027
Showing 2 changed files with 43 additions and 15 deletions.
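Both files apply the same pattern: when a call fails with HTTP 429 (Too Many Requests), it is retried after an exponentially growing delay, up to a fixed number of attempts. Below is a minimal sketch of that pattern pulled out into a standalone helper, for illustration only: the name retryOn429 and the { statusCode?: number } error shape are assumptions, not code from this commit.

// Illustrative sketch only (not part of this commit).
const INITIAL_DELAY = 500
const MAX_RETRY_COUNT = 3

async function retryOn429<T>(request: () => Promise<T>): Promise<T> {
  let delay = INITIAL_DELAY
  let retries = MAX_RETRY_COUNT
  for (;;) {
    try {
      return await request()
    } catch (error) {
      // Only "Too Many Requests" responses are retried; anything else is rethrown
      if ((error as { statusCode?: number })?.statusCode === 429 && retries > 0) {
        await new Promise((resolve) => setTimeout(resolve, delay))
        delay *= 2 // exponential backoff: 500ms, 1000ms, 2000ms
        retries -= 1
      } else {
        throw error
      }
    }
  }
}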
4 changes: 4 additions & 0 deletions libs/cms/src/lib/search/contentful.service.ts
@@ -526,6 +526,10 @@ export class ContentfulService {
retries = MAX_RETRY_COUNT
} catch (error) {
if (error?.statusCode === 429 && retries > 0) {
logger.info('Retrying nested resolution request...', {
retriesLeft: retries - 1,
delay,
})
await new Promise((resolve) => {
setTimeout(resolve, delay)
})
54 changes: 39 additions & 15 deletions libs/content-search-toolkit/src/services/elastic.service.ts
@@ -42,6 +42,9 @@ type RankResultMap<T extends string> = Record<string, RankEvaluationResponse<T>>

const { elastic } = environment

const INITIAL_DELAY = 500
const MAX_RETRY_COUNT = 3

@Injectable()
export class ElasticService {
private client: Client | null = null
@@ -111,29 +114,50 @@ export class ElasticService {
requests: Record<string, unknown>[],
refresh = false,
) {
const chunkSize = 14
let delay = INITIAL_DELAY
let retries = MAX_RETRY_COUNT

try {
const client = await this.getClient()

// Elasticsearch does not like big requests (above 5MB), so we limit each chunk to a fixed number of entries just in case
let requestChunk = getValidBulkRequestChunk(requests, 10)
let requestChunk = getValidBulkRequestChunk(requests, chunkSize)

while (requestChunk.length) {
// wait for the request before continuing
const response = await client.bulk({
index: index,
body: requestChunk,
refresh: refresh ? 'true' : undefined,
})

// not all errors are thrown, so log if the response contains any errors
if (response.body.errors) {
// Filter HUGE request object
filterDoc(response)
logger.error('Failed to import some documents in bulk import', {
response,
try {
const response = await client.bulk({
index: index,
body: requestChunk,
refresh: refresh ? 'true' : undefined,
})

// not all errors are thrown, so log if the response contains any errors
if (response.body.errors) {
// Filter HUGE request object
filterDoc(response)
logger.error('Failed to import some documents in bulk import', {
response,
})
}
requestChunk = getValidBulkRequestChunk(requests, chunkSize)
delay = INITIAL_DELAY
retries = MAX_RETRY_COUNT
} catch (e) {
if (e?.statusCode === 429 && retries > 0) {
logger.info('Retrying Elasticsearch bulk request...', {
retriesLeft: retries - 1,
delay,
})
await new Promise((resolve) => {
setTimeout(resolve, delay)
})
delay *= 2
retries -= 1
} else {
throw e
}
}
requestChunk = getValidBulkRequestChunk(requests)
}

return true
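With INITIAL_DELAY = 500 and MAX_RETRY_COUNT = 3, a chunk that keeps answering 429 is retried after roughly 500 ms, 1 s and 2 s before the error is finally rethrown. Because requestChunk is only advanced after a successful call, the same chunk is resubmitted on each retry, and after every successful bulk call the delay and retry budget are reset, so each chunk gets the full schedule. A tiny sketch of that schedule, for illustration only:

// Illustrative only: the delay schedule implied by the constants above.
const schedule = Array.from(
  { length: MAX_RETRY_COUNT },
  (_, attempt) => INITIAL_DELAY * 2 ** attempt,
) // [500, 1000, 2000] milliseconds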
