-
Notifications
You must be signed in to change notification settings - Fork 8.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Temporarily apply back pressure to maxWorkers and pollInterval when 429 errors occur #77096
Changes from 28 commits
6aa2f86
e71be99
0e093ba
2f4a026
744ccfb
7bed495
1a0de3e
c81634d
b216f25
c40cba1
e3b1056
ad2531d
542537a
5a6bc17
9f69ed3
476a899
5f18950
38e3579
3effd76
02caec3
a05e435
846784f
b9cfd70
b7f46c5
6b8a614
c497979
0817e5e
b2a62d0
17c4435
9f6a53b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,102 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License; | ||
* you may not use this file except in compliance with the Elastic License. | ||
*/ | ||
|
||
import sinon from 'sinon'; | ||
import { mockLogger } from '../test_utils'; | ||
import { TaskManager } from '../task_manager'; | ||
import { savedObjectsRepositoryMock } from '../../../../../src/core/server/mocks'; | ||
import { | ||
SavedObjectsSerializer, | ||
SavedObjectTypeRegistry, | ||
SavedObjectsErrorHelpers, | ||
} from '../../../../../src/core/server'; | ||
import { ADJUST_THROUGHPUT_INTERVAL } from '../lib/create_managed_configuration'; | ||
|
||
describe('managed configuration', () => { | ||
let taskManager: TaskManager; | ||
let clock: sinon.SinonFakeTimers; | ||
const callAsInternalUser = jest.fn(); | ||
const logger = mockLogger(); | ||
const serializer = new SavedObjectsSerializer(new SavedObjectTypeRegistry()); | ||
const savedObjectsClient = savedObjectsRepositoryMock.create(); | ||
const config = { | ||
enabled: true, | ||
max_workers: 10, | ||
index: 'foo', | ||
max_attempts: 9, | ||
poll_interval: 3000, | ||
max_poll_inactivity_cycles: 10, | ||
request_capacity: 1000, | ||
}; | ||
|
||
beforeEach(() => { | ||
jest.resetAllMocks(); | ||
callAsInternalUser.mockResolvedValue({ total: 0, updated: 0, version_conflicts: 0 }); | ||
clock = sinon.useFakeTimers(); | ||
taskManager = new TaskManager({ | ||
config, | ||
logger, | ||
serializer, | ||
callAsInternalUser, | ||
taskManagerId: 'some-uuid', | ||
savedObjectsRepository: savedObjectsClient, | ||
}); | ||
taskManager.registerTaskDefinitions({ | ||
foo: { | ||
type: 'foo', | ||
title: 'Foo', | ||
createTaskRunner: jest.fn(), | ||
}, | ||
}); | ||
taskManager.start(); | ||
}); | ||
|
||
afterEach(() => clock.restore()); | ||
|
||
test('should lower max workers when Elasticsearch returns 429 error', async () => { | ||
savedObjectsClient.create.mockRejectedValueOnce( | ||
SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b') | ||
); | ||
// Cause "too many requests" error to be thrown | ||
await expect( | ||
taskManager.schedule({ | ||
taskType: 'foo', | ||
state: {}, | ||
params: {}, | ||
}) | ||
).rejects.toThrowErrorMatchingInlineSnapshot(`"Too Many Requests"`); | ||
clock.tick(ADJUST_THROUGHPUT_INTERVAL); | ||
expect(logger.warn).toHaveBeenCalledWith( | ||
'Max workers configuration is temporarily reduced after Elasticsearch returned 1 "too many request" error(s).' | ||
); | ||
expect(logger.debug).toHaveBeenCalledWith( | ||
'Max workers configuration changing from 10 to 8 after seeing 1 error(s)' | ||
); | ||
expect(logger.debug).toHaveBeenCalledWith('Task pool now using 10 as the max worker value'); | ||
}); | ||
|
||
test('should increase poll interval when Elasticsearch returns 429 error', async () => { | ||
savedObjectsClient.create.mockRejectedValueOnce( | ||
SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b') | ||
); | ||
// Cause "too many requests" error to be thrown | ||
await expect( | ||
taskManager.schedule({ | ||
taskType: 'foo', | ||
state: {}, | ||
params: {}, | ||
}) | ||
).rejects.toThrowErrorMatchingInlineSnapshot(`"Too Many Requests"`); | ||
clock.tick(ADJUST_THROUGHPUT_INTERVAL); | ||
expect(logger.warn).toHaveBeenCalledWith( | ||
'Poll interval configuration is temporarily increased after Elasticsearch returned 1 "too many request" error(s).' | ||
); | ||
expect(logger.debug).toHaveBeenCalledWith( | ||
'Poll interval configuration changing from 3000 to 3600 after seeing 1 error(s)' | ||
); | ||
expect(logger.debug).toHaveBeenCalledWith('Task poller now using interval of 3600ms'); | ||
}); | ||
}); |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,213 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License; | ||
* you may not use this file except in compliance with the Elastic License. | ||
*/ | ||
|
||
import sinon from 'sinon'; | ||
import { Subject } from 'rxjs'; | ||
import { mockLogger } from '../test_utils'; | ||
import { SavedObjectsErrorHelpers } from '../../../../../src/core/server'; | ||
import { | ||
createManagedConfiguration, | ||
ADJUST_THROUGHPUT_INTERVAL, | ||
} from './create_managed_configuration'; | ||
|
||
describe('createManagedConfiguration()', () => { | ||
let clock: sinon.SinonFakeTimers; | ||
const logger = mockLogger(); | ||
|
||
beforeEach(() => { | ||
jest.resetAllMocks(); | ||
clock = sinon.useFakeTimers(); | ||
}); | ||
|
||
afterEach(() => clock.restore()); | ||
|
||
test('returns observables with initialized values', async () => { | ||
const maxWorkersSubscription = jest.fn(); | ||
const pollIntervalSubscription = jest.fn(); | ||
const { maxWorkersConfiguration$, pollIntervalConfiguration$ } = createManagedConfiguration({ | ||
logger, | ||
errors$: new Subject<Error>(), | ||
startingMaxWorkers: 1, | ||
startingPollInterval: 2, | ||
}); | ||
maxWorkersConfiguration$.subscribe(maxWorkersSubscription); | ||
pollIntervalConfiguration$.subscribe(pollIntervalSubscription); | ||
expect(maxWorkersSubscription).toHaveBeenCalledTimes(1); | ||
expect(maxWorkersSubscription).toHaveBeenNthCalledWith(1, 1); | ||
expect(pollIntervalSubscription).toHaveBeenCalledTimes(1); | ||
expect(pollIntervalSubscription).toHaveBeenNthCalledWith(1, 2); | ||
}); | ||
|
||
test(`skips errors that aren't about too many requests`, async () => { | ||
const maxWorkersSubscription = jest.fn(); | ||
const pollIntervalSubscription = jest.fn(); | ||
const errors$ = new Subject<Error>(); | ||
const { maxWorkersConfiguration$, pollIntervalConfiguration$ } = createManagedConfiguration({ | ||
errors$, | ||
logger, | ||
startingMaxWorkers: 100, | ||
startingPollInterval: 100, | ||
}); | ||
maxWorkersConfiguration$.subscribe(maxWorkersSubscription); | ||
pollIntervalConfiguration$.subscribe(pollIntervalSubscription); | ||
errors$.next(new Error('foo')); | ||
clock.tick(ADJUST_THROUGHPUT_INTERVAL); | ||
expect(maxWorkersSubscription).toHaveBeenCalledTimes(1); | ||
expect(pollIntervalSubscription).toHaveBeenCalledTimes(1); | ||
}); | ||
|
||
describe('maxWorker configuration', () => { | ||
function setupScenario(startingMaxWorkers: number) { | ||
const errors$ = new Subject<Error>(); | ||
const subscription = jest.fn(); | ||
const { maxWorkersConfiguration$ } = createManagedConfiguration({ | ||
errors$, | ||
startingMaxWorkers, | ||
logger, | ||
startingPollInterval: 1, | ||
}); | ||
maxWorkersConfiguration$.subscribe(subscription); | ||
return { subscription, errors$ }; | ||
} | ||
|
||
beforeEach(() => { | ||
jest.resetAllMocks(); | ||
clock = sinon.useFakeTimers(); | ||
}); | ||
|
||
afterEach(() => clock.restore()); | ||
|
||
test('should decrease configuration at the next interval when an error is emitted', async () => { | ||
const { subscription, errors$ } = setupScenario(100); | ||
errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b')); | ||
clock.tick(ADJUST_THROUGHPUT_INTERVAL - 1); | ||
expect(subscription).toHaveBeenCalledTimes(1); | ||
clock.tick(1); | ||
expect(subscription).toHaveBeenCalledTimes(2); | ||
expect(subscription).toHaveBeenNthCalledWith(2, 80); | ||
}); | ||
|
||
test('should log a warning when the configuration changes from the starting value', async () => { | ||
const { errors$ } = setupScenario(100); | ||
errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b')); | ||
clock.tick(ADJUST_THROUGHPUT_INTERVAL); | ||
expect(logger.warn).toHaveBeenCalledWith( | ||
'Max workers configuration is temporarily reduced after Elasticsearch returned 1 "too many request" error(s).' | ||
); | ||
}); | ||
|
||
test('should increase configuration back to normal incrementally after an error is emitted', async () => { | ||
const { subscription, errors$ } = setupScenario(100); | ||
errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b')); | ||
clock.tick(ADJUST_THROUGHPUT_INTERVAL * 10); | ||
expect(subscription).toHaveBeenNthCalledWith(2, 80); | ||
expect(subscription).toHaveBeenNthCalledWith(3, 84); | ||
// 88.2- > 89 from Math.ceil | ||
expect(subscription).toHaveBeenNthCalledWith(4, 89); | ||
expect(subscription).toHaveBeenNthCalledWith(5, 94); | ||
expect(subscription).toHaveBeenNthCalledWith(6, 99); | ||
// 103.95 -> 100 from Math.min with starting value | ||
expect(subscription).toHaveBeenNthCalledWith(7, 100); | ||
// No new calls due to value not changing and usage of distinctUntilChanged() | ||
expect(subscription).toHaveBeenCalledTimes(7); | ||
}); | ||
|
||
test('should keep reducing configuration when errors keep emitting', async () => { | ||
const { subscription, errors$ } = setupScenario(100); | ||
for (let i = 0; i < 20; i++) { | ||
errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b')); | ||
clock.tick(ADJUST_THROUGHPUT_INTERVAL); | ||
} | ||
expect(subscription).toHaveBeenNthCalledWith(2, 80); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: could put these numbers in an array and do the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've been thinking about this as well. The two up sides I saw this way 1) it provides a clear example of how the configuration gets reduced from 100 when errors keep emitting 2) it allowed to add comments to explain some of the inner usage of Math.floor and distinctUntilChanged() as the assertions happened. I could always cut a few assertions out. |
||
expect(subscription).toHaveBeenNthCalledWith(3, 64); | ||
// 51.2 -> 51 from Math.floor | ||
expect(subscription).toHaveBeenNthCalledWith(4, 51); | ||
expect(subscription).toHaveBeenNthCalledWith(5, 40); | ||
expect(subscription).toHaveBeenNthCalledWith(6, 32); | ||
expect(subscription).toHaveBeenNthCalledWith(7, 25); | ||
expect(subscription).toHaveBeenNthCalledWith(8, 20); | ||
expect(subscription).toHaveBeenNthCalledWith(9, 16); | ||
expect(subscription).toHaveBeenNthCalledWith(10, 12); | ||
expect(subscription).toHaveBeenNthCalledWith(11, 9); | ||
expect(subscription).toHaveBeenNthCalledWith(12, 7); | ||
expect(subscription).toHaveBeenNthCalledWith(13, 5); | ||
expect(subscription).toHaveBeenNthCalledWith(14, 4); | ||
expect(subscription).toHaveBeenNthCalledWith(15, 3); | ||
expect(subscription).toHaveBeenNthCalledWith(16, 2); | ||
expect(subscription).toHaveBeenNthCalledWith(17, 1); | ||
// No new calls due to value not changing and usage of distinctUntilChanged() | ||
expect(subscription).toHaveBeenCalledTimes(17); | ||
}); | ||
}); | ||
|
||
describe('pollInterval configuration', () => { | ||
function setupScenario(startingPollInterval: number) { | ||
const errors$ = new Subject<Error>(); | ||
const subscription = jest.fn(); | ||
const { pollIntervalConfiguration$ } = createManagedConfiguration({ | ||
logger, | ||
errors$, | ||
startingPollInterval, | ||
startingMaxWorkers: 1, | ||
}); | ||
pollIntervalConfiguration$.subscribe(subscription); | ||
return { subscription, errors$ }; | ||
} | ||
|
||
beforeEach(() => { | ||
jest.resetAllMocks(); | ||
clock = sinon.useFakeTimers(); | ||
}); | ||
|
||
afterEach(() => clock.restore()); | ||
|
||
test('should increase configuration at the next interval when an error is emitted', async () => { | ||
const { subscription, errors$ } = setupScenario(100); | ||
errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b')); | ||
clock.tick(ADJUST_THROUGHPUT_INTERVAL - 1); | ||
expect(subscription).toHaveBeenCalledTimes(1); | ||
clock.tick(1); | ||
expect(subscription).toHaveBeenCalledTimes(2); | ||
expect(subscription).toHaveBeenNthCalledWith(2, 120); | ||
}); | ||
|
||
test('should log a warning when the configuration changes from the starting value', async () => { | ||
const { errors$ } = setupScenario(100); | ||
errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b')); | ||
clock.tick(ADJUST_THROUGHPUT_INTERVAL); | ||
expect(logger.warn).toHaveBeenCalledWith( | ||
'Poll interval configuration is temporarily increased after Elasticsearch returned 1 "too many request" error(s).' | ||
); | ||
}); | ||
|
||
test('should decrease configuration back to normal incrementally after an error is emitted', async () => { | ||
const { subscription, errors$ } = setupScenario(100); | ||
errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b')); | ||
clock.tick(ADJUST_THROUGHPUT_INTERVAL * 10); | ||
expect(subscription).toHaveBeenNthCalledWith(2, 120); | ||
expect(subscription).toHaveBeenNthCalledWith(3, 114); | ||
// 108.3 -> 108 from Math.floor | ||
expect(subscription).toHaveBeenNthCalledWith(4, 108); | ||
expect(subscription).toHaveBeenNthCalledWith(5, 102); | ||
// 96.9 -> 100 from Math.max with the starting value | ||
expect(subscription).toHaveBeenNthCalledWith(6, 100); | ||
// No new calls due to value not changing and usage of distinctUntilChanged() | ||
expect(subscription).toHaveBeenCalledTimes(6); | ||
}); | ||
|
||
test('should increase configuration when errors keep emitting', async () => { | ||
const { subscription, errors$ } = setupScenario(100); | ||
for (let i = 0; i < 3; i++) { | ||
errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b')); | ||
clock.tick(ADJUST_THROUGHPUT_INTERVAL); | ||
} | ||
expect(subscription).toHaveBeenNthCalledWith(2, 120); | ||
expect(subscription).toHaveBeenNthCalledWith(3, 144); | ||
// 172.8 -> 173 from Math.ceil | ||
expect(subscription).toHaveBeenNthCalledWith(4, 173); | ||
}); | ||
}); | ||
}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So, new directory
integration_tests
? I guess the idea with these is that they actually launch a task manager to operate on, so a little different than our other jest tests. Cool - I could see us adding more tests here!There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Exactly! There's a few plugins that use the concept of jest integration tests to have something higher level than a unit test yet lower level than an API integration test to make sure it all works together. I agree there's a lot of potential here for future tests.
I realized the test ran by the
node scripts/jest
script instead of thenode scripts/jest_integration
. I'll look into it.