Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Temporarily apply back pressure to maxWorkers and pollInterval when 429 errors occur #77096

Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
6aa2f86
WIP
mikecote Aug 27, 2020
e71be99
Cleanup
mikecote Aug 27, 2020
0e093ba
Add error count to message
mikecote Aug 27, 2020
2f4a026
Reset observable values on stop
mikecote Sep 9, 2020
744ccfb
Add comments
mikecote Sep 9, 2020
7bed495
Fix issues when changing configurations
mikecote Sep 9, 2020
1a0de3e
Merge branch 'feature/task_manager_429' of github.com:elastic/kibana …
mikecote Sep 16, 2020
c81634d
Cleanup code
mikecote Sep 16, 2020
b216f25
Cleanup pt2
mikecote Sep 16, 2020
c40cba1
Some renames
mikecote Sep 16, 2020
e3b1056
Fix typecheck
mikecote Sep 16, 2020
ad2531d
Merge branch 'feature/task_manager_429' of github.com:elastic/kibana …
mikecote Sep 17, 2020
542537a
Merge branch 'feature/task_manager_429' of github.com:elastic/kibana …
mikecote Sep 21, 2020
5a6bc17
Use observables to manage throughput
mikecote Sep 22, 2020
9f69ed3
Rename class
mikecote Sep 22, 2020
476a899
Merge branch 'feature/task_manager_429' of github.com:elastic/kibana …
mikecote Sep 22, 2020
5f18950
Merge branch 'feature/task_manager_429' of github.com:elastic/kibana …
mikecote Sep 24, 2020
38e3579
Switch to createManagedConfiguration
mikecote Sep 24, 2020
3effd76
Add some comments
mikecote Sep 24, 2020
02caec3
Start unit tests
mikecote Sep 24, 2020
a05e435
Merge branch 'feature/task_manager_429' of github.com:elastic/kibana …
mikecote Sep 28, 2020
846784f
Add logs
mikecote Sep 28, 2020
b9cfd70
Fix log level
mikecote Sep 28, 2020
b7f46c5
Attempt at adding integration tests
mikecote Sep 29, 2020
6b8a614
Merge branch 'feature/task_manager_429' of github.com:elastic/kibana …
mikecote Sep 29, 2020
c497979
Fix test failures
mikecote Sep 29, 2020
0817e5e
Fix timer
mikecote Sep 29, 2020
b2a62d0
Revert "Fix timer"
mikecote Sep 29, 2020
17c4435
Use Symbol
mikecote Sep 30, 2020
9f6a53b
Fix merge scan
mikecote Sep 30, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, new directory integration_tests? I guess the idea with these is that they actually launch a task manager to operate on, so a little different than our other jest tests. Cool - I could see us adding more tests here!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exactly! There's a few plugins that use the concept of jest integration tests to have something higher level than a unit test yet lower level than an API integration test to make sure it all works together. I agree there's a lot of potential here for future tests.

I realized the test ran by the node scripts/jest script instead of the node scripts/jest_integration. I'll look into it.

* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import sinon from 'sinon';
import { mockLogger } from '../test_utils';
import { TaskManager } from '../task_manager';
import { savedObjectsRepositoryMock } from '../../../../../src/core/server/mocks';
import {
SavedObjectsSerializer,
SavedObjectTypeRegistry,
SavedObjectsErrorHelpers,
} from '../../../../../src/core/server';
import { ADJUST_THROUGHPUT_INTERVAL } from '../lib/create_managed_configuration';

describe('managed configuration', () => {
let taskManager: TaskManager;
let clock: sinon.SinonFakeTimers;
const callAsInternalUser = jest.fn();
const logger = mockLogger();
const serializer = new SavedObjectsSerializer(new SavedObjectTypeRegistry());
const savedObjectsClient = savedObjectsRepositoryMock.create();
const config = {
enabled: true,
max_workers: 10,
index: 'foo',
max_attempts: 9,
poll_interval: 3000,
max_poll_inactivity_cycles: 10,
request_capacity: 1000,
};

beforeEach(() => {
jest.resetAllMocks();
callAsInternalUser.mockResolvedValue({ total: 0, updated: 0, version_conflicts: 0 });
clock = sinon.useFakeTimers();
taskManager = new TaskManager({
config,
logger,
serializer,
callAsInternalUser,
taskManagerId: 'some-uuid',
savedObjectsRepository: savedObjectsClient,
});
taskManager.registerTaskDefinitions({
foo: {
type: 'foo',
title: 'Foo',
createTaskRunner: jest.fn(),
},
});
taskManager.start();
});

afterEach(() => clock.restore());

test('should lower max workers when Elasticsearch returns 429 error', async () => {
savedObjectsClient.create.mockRejectedValueOnce(
SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b')
);
// Cause "too many requests" error to be thrown
await expect(
taskManager.schedule({
taskType: 'foo',
state: {},
params: {},
})
).rejects.toThrowErrorMatchingInlineSnapshot(`"Too Many Requests"`);
clock.tick(ADJUST_THROUGHPUT_INTERVAL);
expect(logger.warn).toHaveBeenCalledWith(
'Max workers configuration is temporarily reduced after Elasticsearch returned 1 "too many request" error(s).'
);
expect(logger.debug).toHaveBeenCalledWith(
'Max workers configuration changing from 10 to 8 after seeing 1 error(s)'
);
expect(logger.debug).toHaveBeenCalledWith('Task pool now using 10 as the max worker value');
});

test('should increase poll interval when Elasticsearch returns 429 error', async () => {
savedObjectsClient.create.mockRejectedValueOnce(
SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b')
);
// Cause "too many requests" error to be thrown
await expect(
taskManager.schedule({
taskType: 'foo',
state: {},
params: {},
})
).rejects.toThrowErrorMatchingInlineSnapshot(`"Too Many Requests"`);
clock.tick(ADJUST_THROUGHPUT_INTERVAL);
expect(logger.warn).toHaveBeenCalledWith(
'Poll interval configuration is temporarily increased after Elasticsearch returned 1 "too many request" error(s).'
);
expect(logger.debug).toHaveBeenCalledWith(
'Poll interval configuration changing from 3000 to 3600 after seeing 1 error(s)'
);
expect(logger.debug).toHaveBeenCalledWith('Task poller now using interval of 3600ms');
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import sinon from 'sinon';
import { Subject } from 'rxjs';
import { mockLogger } from '../test_utils';
import { SavedObjectsErrorHelpers } from '../../../../../src/core/server';
import {
createManagedConfiguration,
ADJUST_THROUGHPUT_INTERVAL,
} from './create_managed_configuration';

describe('createManagedConfiguration()', () => {
let clock: sinon.SinonFakeTimers;
const logger = mockLogger();

beforeEach(() => {
jest.resetAllMocks();
clock = sinon.useFakeTimers();
});

afterEach(() => clock.restore());

test('returns observables with initialized values', async () => {
const maxWorkersSubscription = jest.fn();
const pollIntervalSubscription = jest.fn();
const { maxWorkersConfiguration$, pollIntervalConfiguration$ } = createManagedConfiguration({
logger,
errors$: new Subject<Error>(),
startingMaxWorkers: 1,
startingPollInterval: 2,
});
maxWorkersConfiguration$.subscribe(maxWorkersSubscription);
pollIntervalConfiguration$.subscribe(pollIntervalSubscription);
expect(maxWorkersSubscription).toHaveBeenCalledTimes(1);
expect(maxWorkersSubscription).toHaveBeenNthCalledWith(1, 1);
expect(pollIntervalSubscription).toHaveBeenCalledTimes(1);
expect(pollIntervalSubscription).toHaveBeenNthCalledWith(1, 2);
});

test(`skips errors that aren't about too many requests`, async () => {
const maxWorkersSubscription = jest.fn();
const pollIntervalSubscription = jest.fn();
const errors$ = new Subject<Error>();
const { maxWorkersConfiguration$, pollIntervalConfiguration$ } = createManagedConfiguration({
errors$,
logger,
startingMaxWorkers: 100,
startingPollInterval: 100,
});
maxWorkersConfiguration$.subscribe(maxWorkersSubscription);
pollIntervalConfiguration$.subscribe(pollIntervalSubscription);
errors$.next(new Error('foo'));
clock.tick(ADJUST_THROUGHPUT_INTERVAL);
expect(maxWorkersSubscription).toHaveBeenCalledTimes(1);
expect(pollIntervalSubscription).toHaveBeenCalledTimes(1);
});

describe('maxWorker configuration', () => {
function setupScenario(startingMaxWorkers: number) {
const errors$ = new Subject<Error>();
const subscription = jest.fn();
const { maxWorkersConfiguration$ } = createManagedConfiguration({
errors$,
startingMaxWorkers,
logger,
startingPollInterval: 1,
});
maxWorkersConfiguration$.subscribe(subscription);
return { subscription, errors$ };
}

beforeEach(() => {
jest.resetAllMocks();
clock = sinon.useFakeTimers();
});

afterEach(() => clock.restore());

test('should decrease configuration at the next interval when an error is emitted', async () => {
const { subscription, errors$ } = setupScenario(100);
errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b'));
clock.tick(ADJUST_THROUGHPUT_INTERVAL - 1);
expect(subscription).toHaveBeenCalledTimes(1);
clock.tick(1);
expect(subscription).toHaveBeenCalledTimes(2);
expect(subscription).toHaveBeenNthCalledWith(2, 80);
});

test('should log a warning when the configuration changes from the starting value', async () => {
const { errors$ } = setupScenario(100);
errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b'));
clock.tick(ADJUST_THROUGHPUT_INTERVAL);
expect(logger.warn).toHaveBeenCalledWith(
'Max workers configuration is temporarily reduced after Elasticsearch returned 1 "too many request" error(s).'
);
});

test('should increase configuration back to normal incrementally after an error is emitted', async () => {
const { subscription, errors$ } = setupScenario(100);
errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b'));
clock.tick(ADJUST_THROUGHPUT_INTERVAL * 10);
expect(subscription).toHaveBeenNthCalledWith(2, 80);
expect(subscription).toHaveBeenNthCalledWith(3, 84);
// 88.2- > 89 from Math.ceil
expect(subscription).toHaveBeenNthCalledWith(4, 89);
expect(subscription).toHaveBeenNthCalledWith(5, 94);
expect(subscription).toHaveBeenNthCalledWith(6, 99);
// 103.95 -> 100 from Math.min with starting value
expect(subscription).toHaveBeenNthCalledWith(7, 100);
// No new calls due to value not changing and usage of distinctUntilChanged()
expect(subscription).toHaveBeenCalledTimes(7);
});

test('should keep reducing configuration when errors keep emitting', async () => {
const { subscription, errors$ } = setupScenario(100);
for (let i = 0; i < 20; i++) {
errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b'));
clock.tick(ADJUST_THROUGHPUT_INTERVAL);
}
expect(subscription).toHaveBeenNthCalledWith(2, 80);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: could put these numbers in an array and do the expect()'s in a loop, but may be harder to debug problems that way ...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've been thinking about this as well. The two up sides I saw this way 1) it provides a clear example of how the configuration gets reduced from 100 when errors keep emitting 2) it allowed to add comments to explain some of the inner usage of Math.floor and distinctUntilChanged() as the assertions happened. I could always cut a few assertions out.

expect(subscription).toHaveBeenNthCalledWith(3, 64);
// 51.2 -> 51 from Math.floor
expect(subscription).toHaveBeenNthCalledWith(4, 51);
expect(subscription).toHaveBeenNthCalledWith(5, 40);
expect(subscription).toHaveBeenNthCalledWith(6, 32);
expect(subscription).toHaveBeenNthCalledWith(7, 25);
expect(subscription).toHaveBeenNthCalledWith(8, 20);
expect(subscription).toHaveBeenNthCalledWith(9, 16);
expect(subscription).toHaveBeenNthCalledWith(10, 12);
expect(subscription).toHaveBeenNthCalledWith(11, 9);
expect(subscription).toHaveBeenNthCalledWith(12, 7);
expect(subscription).toHaveBeenNthCalledWith(13, 5);
expect(subscription).toHaveBeenNthCalledWith(14, 4);
expect(subscription).toHaveBeenNthCalledWith(15, 3);
expect(subscription).toHaveBeenNthCalledWith(16, 2);
expect(subscription).toHaveBeenNthCalledWith(17, 1);
// No new calls due to value not changing and usage of distinctUntilChanged()
expect(subscription).toHaveBeenCalledTimes(17);
});
});

describe('pollInterval configuration', () => {
function setupScenario(startingPollInterval: number) {
const errors$ = new Subject<Error>();
const subscription = jest.fn();
const { pollIntervalConfiguration$ } = createManagedConfiguration({
logger,
errors$,
startingPollInterval,
startingMaxWorkers: 1,
});
pollIntervalConfiguration$.subscribe(subscription);
return { subscription, errors$ };
}

beforeEach(() => {
jest.resetAllMocks();
clock = sinon.useFakeTimers();
});

afterEach(() => clock.restore());

test('should increase configuration at the next interval when an error is emitted', async () => {
const { subscription, errors$ } = setupScenario(100);
errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b'));
clock.tick(ADJUST_THROUGHPUT_INTERVAL - 1);
expect(subscription).toHaveBeenCalledTimes(1);
clock.tick(1);
expect(subscription).toHaveBeenCalledTimes(2);
expect(subscription).toHaveBeenNthCalledWith(2, 120);
});

test('should log a warning when the configuration changes from the starting value', async () => {
const { errors$ } = setupScenario(100);
errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b'));
clock.tick(ADJUST_THROUGHPUT_INTERVAL);
expect(logger.warn).toHaveBeenCalledWith(
'Poll interval configuration is temporarily increased after Elasticsearch returned 1 "too many request" error(s).'
);
});

test('should decrease configuration back to normal incrementally after an error is emitted', async () => {
const { subscription, errors$ } = setupScenario(100);
errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b'));
clock.tick(ADJUST_THROUGHPUT_INTERVAL * 10);
expect(subscription).toHaveBeenNthCalledWith(2, 120);
expect(subscription).toHaveBeenNthCalledWith(3, 114);
// 108.3 -> 108 from Math.floor
expect(subscription).toHaveBeenNthCalledWith(4, 108);
expect(subscription).toHaveBeenNthCalledWith(5, 102);
// 96.9 -> 100 from Math.max with the starting value
expect(subscription).toHaveBeenNthCalledWith(6, 100);
// No new calls due to value not changing and usage of distinctUntilChanged()
expect(subscription).toHaveBeenCalledTimes(6);
});

test('should increase configuration when errors keep emitting', async () => {
const { subscription, errors$ } = setupScenario(100);
for (let i = 0; i < 3; i++) {
errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b'));
clock.tick(ADJUST_THROUGHPUT_INTERVAL);
}
expect(subscription).toHaveBeenNthCalledWith(2, 120);
expect(subscription).toHaveBeenNthCalledWith(3, 144);
// 172.8 -> 173 from Math.ceil
expect(subscription).toHaveBeenNthCalledWith(4, 173);
});
});
});
Loading