Commit

feat(search-indexer): Wait before doing another request to Elasticsearch in case of 429 status code (#16215)

* Wait in case of 429 status code from ES

* Add constants

* Add retry logic to ES calls

---------

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
2 people authored and thoreyjona committed Oct 2, 2024
1 parent d5fcb17 commit b073327
Showing 2 changed files with 91 additions and 40 deletions.
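Both files apply the same pattern: when Elasticsearch responds with HTTP 429 (Too Many Requests), the caller waits and retries with an exponentially growing delay, giving up after a fixed number of attempts. For reference, here is a minimal, self-contained sketch of that pattern factored into a reusable helper; the helper name retryOn429 and its exact shape are illustrative and not part of this commit:

const INITIAL_DELAY = 500
const MAX_RETRY_COUNT = 3

const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms))

// Illustrative helper: retry `fn` when it fails with a 429,
// doubling the wait between attempts
const retryOn429 = async <T>(fn: () => Promise<T>): Promise<T> => {
  let delay = INITIAL_DELAY
  let retries = MAX_RETRY_COUNT
  for (;;) {
    try {
      return await fn()
    } catch (error) {
      if ((error as { statusCode?: number })?.statusCode === 429 && retries > 0) {
        await sleep(delay) // back off before the next attempt
        retries -= 1
        delay *= 2 // 500ms -> 1000ms -> 2000ms, then rethrow
      } else {
        throw error
      }
    }
  }
}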
77 changes: 52 additions & 25 deletions libs/cms/src/lib/search/contentful.service.ts
@@ -31,6 +31,9 @@ type SyncCollection = ContentfulSyncCollection & {
 
 const MAX_REQUEST_COUNT = 10
 
+const MAX_RETRY_COUNT = 3
+const INITIAL_DELAY = 500
+
 // Taken from here: https://github.com/contentful/contentful-sdk-core/blob/054328ba2d0df364a5f1ce6d164c5018efb63572/lib/create-http-client.js#L34-L42
 // eslint-disable-next-line @typescript-eslint/no-explicit-any
 const defaultContentfulClientLogging = (level: ClientLogLevel, data: any) => {
@@ -483,37 +486,61 @@ export class ContentfulService {
     let response: ApiResponse<SearchResponse<MappedData>> | null = null
     let total = -1
 
+    let delay = INITIAL_DELAY
+    let retries = MAX_RETRY_COUNT
+
     while (response === null || items.length < total) {
-      response = await this.elasticService.findByQuery(elasticIndex, {
-        query: {
-          bool: {
-            should: idsChunk.map((id) => ({
-              nested: {
-                path: 'tags',
-                query: {
-                  bool: {
-                    must: [
-                      {
-                        term: {
-                          'tags.key': id,
-                        },
-                      },
-                      {
-                        term: {
-                          'tags.type': 'hasChildEntryWithId',
-                        },
-                      },
-                    ],
-                  },
-                },
-              },
-            })),
-            minimum_should_match: 1,
-          },
-        },
-        size,
-        from: (page - 1) * size,
-      })
+      try {
+        response = await this.elasticService.findByQuery(elasticIndex, {
+          query: {
+            bool: {
+              should: idsChunk.map((id) => ({
+                nested: {
+                  path: 'tags',
+                  query: {
+                    bool: {
+                      must: [
+                        {
+                          term: {
+                            'tags.key': id,
+                          },
+                        },
+                        {
+                          term: {
+                            'tags.type': 'hasChildEntryWithId',
+                          },
+                        },
+                      ],
+                    },
+                  },
+                },
+              })),
+              minimum_should_match: 1,
+            },
+          },
+          size,
+          from: (page - 1) * size,
+        })
+        // Reset variables in case we successfully receive a response
+        delay = INITIAL_DELAY
+        retries = MAX_RETRY_COUNT
+      } catch (error) {
+        if (error?.statusCode === 429 && retries > 0) {
+          logger.info('Retrying nested resolution request...', {
+            retriesLeft: retries - 1,
+            delay,
+          })
+          await new Promise((resolve) => {
+            setTimeout(resolve, delay)
+          })
+          // Keep track of how often and for how long we should wait in case of failure
+          retries -= 1
+          delay *= 2
+          continue
+        } else {
+          throw error
+        }
+      }
 
       if (response.body.hits.hits.length === 0) {
         total = response.body.hits.total.value
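With MAX_RETRY_COUNT = 3 and INITIAL_DELAY = 500, a single findByQuery call here backs off for at most 500 ms + 1,000 ms + 2,000 ms = 3.5 s before the 429 is rethrown. The continue statement retries the exact same page, since neither page nor items has advanced, and both delay and retries are reset after each successful response, so every request starts with a fresh retry budget.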
54 changes: 39 additions & 15 deletions libs/content-search-toolkit/src/services/elastic.service.ts
@@ -42,6 +42,9 @@ type RankResultMap<T extends string> = Record<string, RankEvaluationResponse<T>>
 
 const { elastic } = environment
 
+const INITIAL_DELAY = 500
+const MAX_RETRY_COUNT = 3
+
 @Injectable()
 export class ElasticService {
   private client: Client | null = null
@@ -111,29 +114,50 @@ export class ElasticService {
     requests: Record<string, unknown>[],
     refresh = false,
   ) {
+    const chunkSize = 14
+    let delay = INITIAL_DELAY
+    let retries = MAX_RETRY_COUNT
+
     try {
       const client = await this.getClient()
 
       // elasticsearch does not like big requests (above 5mb) so we limit the size to X entries just in case
-      let requestChunk = getValidBulkRequestChunk(requests, 10)
+      let requestChunk = getValidBulkRequestChunk(requests, chunkSize)
 
       while (requestChunk.length) {
-        // wait for request b4 continuing
-        const response = await client.bulk({
-          index: index,
-          body: requestChunk,
-          refresh: refresh ? 'true' : undefined,
-        })
-
-        // not all errors are thrown log if the response has any errors
-        if (response.body.errors) {
-          // Filter HUGE request object
-          filterDoc(response)
-          logger.error('Failed to import some documents in bulk import', {
-            response,
-          })
-        }
-        requestChunk = getValidBulkRequestChunk(requests)
+        try {
+          const response = await client.bulk({
+            index: index,
+            body: requestChunk,
+            refresh: refresh ? 'true' : undefined,
+          })
+
+          // not all errors are thrown log if the response has any errors
+          if (response.body.errors) {
+            // Filter HUGE request object
+            filterDoc(response)
+            logger.error('Failed to import some documents in bulk import', {
+              response,
+            })
+          }
+          requestChunk = getValidBulkRequestChunk(requests, chunkSize)
+          delay = INITIAL_DELAY
+          retries = MAX_RETRY_COUNT
+        } catch (e) {
+          if (e?.statusCode === 429 && retries > 0) {
+            logger.info('Retrying Elasticsearch bulk request...', {
+              retriesLeft: retries - 1,
+              delay,
+            })
+            await new Promise((resolve) => {
+              setTimeout(resolve, delay)
+            })
+            delay *= 2
+            retries -= 1
+          } else {
+            throw e
+          }
+        }
       }
 
       return true
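To see the backoff end to end, the sketch below (again illustrative, reusing the hypothetical retryOn429 helper from above) drives a flaky call that fails with a 429 twice before succeeding:

let attempts = 0

const flakyCall = async (): Promise<string> => {
  attempts += 1
  if (attempts < 3) {
    // Roughly the error shape the Elasticsearch client produces for HTTP 429
    throw Object.assign(new Error('Too Many Requests'), { statusCode: 429 })
  }
  return 'ok'
}

retryOn429(flakyCall).then((result) => {
  // Resolves with 'ok' after roughly 1.5s of backoff (500ms + 1000ms)
  console.log(result, 'after', attempts, 'attempts')
})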
