Skip to content

Commit

Permalink
Apply backpressure to the task poller whenever Elasticsearch requests…
Browse files Browse the repository at this point in the history
… respond with 503 errors (elastic#196900)

Resolves: elastic#195134

This PR adds 503 error check to the error filter of
`createManagedConfiguration` function, besides the 501 error .
So it applies backpressure to the task poller for 503 errors as well.

(cherry picked from commit 292a7d3)
  • Loading branch information
ersin-erdal committed Oct 24, 2024
1 parent 2cd1a7c commit c8a8b4b
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,17 @@ describe('createManagedConfiguration()', () => {
expect(subscription).toHaveBeenNthCalledWith(2, 8);
});

test('should decrease configuration at the next interval when a 503 error is emitted', async () => {
const { subscription, errors$ } = setupScenario(10);
errors$.next(SavedObjectsErrorHelpers.createGenericNotFoundEsUnavailableError('a', 'b'));
clock.tick(ADJUST_THROUGHPUT_INTERVAL - 1);
expect(subscription).toHaveBeenCalledTimes(1);
expect(subscription).toHaveBeenNthCalledWith(1, 10);
clock.tick(1);
expect(subscription).toHaveBeenCalledTimes(2);
expect(subscription).toHaveBeenNthCalledWith(2, 8);
});

test('should log a warning when the configuration changes from the starting value', async () => {
const { errors$ } = setupScenario(10);
errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b'));
Expand Down Expand Up @@ -235,6 +246,17 @@ describe('createManagedConfiguration()', () => {
expect(subscription).toHaveBeenNthCalledWith(2, 8);
});

test('should decrease configuration at the next interval when a 503 error is emitted', async () => {
const { subscription, errors$ } = setupScenario(10, CLAIM_STRATEGY_MGET);
errors$.next(SavedObjectsErrorHelpers.createGenericNotFoundEsUnavailableError('a', 'b'));
clock.tick(ADJUST_THROUGHPUT_INTERVAL - 1);
expect(subscription).toHaveBeenCalledTimes(1);
expect(subscription).toHaveBeenNthCalledWith(1, 10);
clock.tick(1);
expect(subscription).toHaveBeenCalledTimes(2);
expect(subscription).toHaveBeenNthCalledWith(2, 8);
});

test('should log a warning when the configuration changes from the starting value', async () => {
const { errors$ } = setupScenario(10, CLAIM_STRATEGY_MGET);
errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b'));
Expand Down Expand Up @@ -305,6 +327,16 @@ describe('createManagedConfiguration()', () => {
expect(subscription).toHaveBeenNthCalledWith(2, 120);
});

test('should increase configuration at the next interval when a 503 error is emitted', async () => {
const { subscription, errors$ } = setupScenario(100);
errors$.next(SavedObjectsErrorHelpers.createGenericNotFoundEsUnavailableError('a', 'b'));
clock.tick(ADJUST_THROUGHPUT_INTERVAL - 1);
expect(subscription).toHaveBeenCalledTimes(1);
clock.tick(1);
expect(subscription).toHaveBeenCalledTimes(2);
expect(subscription).toHaveBeenNthCalledWith(2, 120);
});

test('should log a warning when the configuration changes from the starting value', async () => {
const { errors$ } = setupScenario(100);
errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b'));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,10 @@ function countErrors(errors$: Observable<Error>, countInterval: number): Observa
interval(countInterval).pipe(map(() => FLUSH_MARKER)),
errors$.pipe(
filter(
(e) => SavedObjectsErrorHelpers.isTooManyRequestsError(e) || isEsCannotExecuteScriptError(e)
(e) =>
SavedObjectsErrorHelpers.isTooManyRequestsError(e) ||
SavedObjectsErrorHelpers.isEsUnavailableError(e) ||
isEsCannotExecuteScriptError(e)
)
)
).pipe(
Expand Down

0 comments on commit c8a8b4b

Please sign in to comment.