From aa787b6ddc2036f9cdd625bb92ce3263b65f0b86 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mike=20C=C3=B4t=C3=A9?= Date: Wed, 30 Sep 2020 19:54:18 -0400 Subject: [PATCH] Temporarily apply back pressure to maxWorkers and pollInterval when 429 errors occur (#77096) * WIP * Cleanup * Add error count to message * Reset observable values on stop * Add comments * Fix issues when changing configurations * Cleanup code * Cleanup pt2 * Some renames * Fix typecheck * Use observables to manage throughput * Rename class * Switch to createManagedConfiguration * Add some comments * Start unit tests * Add logs * Fix log level * Attempt at adding integration tests * Fix test failures * Fix timer * Revert "Fix timer" This reverts commit 0817e5e6a5ef9bdfe9329a559f4a5674ebbbef24. * Use Symbol * Fix merge scan --- .../managed_configuration.test.ts | 102 +++++++++ .../lib/create_managed_configuration.test.ts | 213 ++++++++++++++++++ .../lib/create_managed_configuration.ts | 160 +++++++++++++ .../server/polling/observable_monitor.ts | 11 +- .../server/polling/task_poller.test.ts | 13 +- .../server/polling/task_poller.ts | 8 +- .../task_manager/server/task_manager.ts | 16 +- .../task_manager/server/task_pool.test.ts | 8 +- .../plugins/task_manager/server/task_pool.ts | 1 + 9 files changed, 519 insertions(+), 13 deletions(-) create mode 100644 x-pack/plugins/task_manager/server/integration_tests/managed_configuration.test.ts create mode 100644 x-pack/plugins/task_manager/server/lib/create_managed_configuration.test.ts create mode 100644 x-pack/plugins/task_manager/server/lib/create_managed_configuration.ts diff --git a/x-pack/plugins/task_manager/server/integration_tests/managed_configuration.test.ts b/x-pack/plugins/task_manager/server/integration_tests/managed_configuration.test.ts new file mode 100644 index 0000000000000..4fc8ae899518c --- /dev/null +++ b/x-pack/plugins/task_manager/server/integration_tests/managed_configuration.test.ts @@ -0,0 +1,102 @@ +/* + * Copyright Elasticsearch 
B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import sinon from 'sinon'; +import { mockLogger } from '../test_utils'; +import { TaskManager } from '../task_manager'; +import { savedObjectsRepositoryMock } from '../../../../../src/core/server/mocks'; +import { + SavedObjectsSerializer, + SavedObjectTypeRegistry, + SavedObjectsErrorHelpers, +} from '../../../../../src/core/server'; +import { ADJUST_THROUGHPUT_INTERVAL } from '../lib/create_managed_configuration'; + +describe('managed configuration', () => { + let taskManager: TaskManager; + let clock: sinon.SinonFakeTimers; + const callAsInternalUser = jest.fn(); + const logger = mockLogger(); + const serializer = new SavedObjectsSerializer(new SavedObjectTypeRegistry()); + const savedObjectsClient = savedObjectsRepositoryMock.create(); + const config = { + enabled: true, + max_workers: 10, + index: 'foo', + max_attempts: 9, + poll_interval: 3000, + max_poll_inactivity_cycles: 10, + request_capacity: 1000, + }; + + beforeEach(() => { + jest.resetAllMocks(); + callAsInternalUser.mockResolvedValue({ total: 0, updated: 0, version_conflicts: 0 }); + clock = sinon.useFakeTimers(); + taskManager = new TaskManager({ + config, + logger, + serializer, + callAsInternalUser, + taskManagerId: 'some-uuid', + savedObjectsRepository: savedObjectsClient, + }); + taskManager.registerTaskDefinitions({ + foo: { + type: 'foo', + title: 'Foo', + createTaskRunner: jest.fn(), + }, + }); + taskManager.start(); + }); + + afterEach(() => clock.restore()); + + test('should lower max workers when Elasticsearch returns 429 error', async () => { + savedObjectsClient.create.mockRejectedValueOnce( + SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b') + ); + // Cause "too many requests" error to be thrown + await expect( + taskManager.schedule({ + taskType: 'foo', + 
state: {}, + params: {}, + }) + ).rejects.toThrowErrorMatchingInlineSnapshot(`"Too Many Requests"`); + clock.tick(ADJUST_THROUGHPUT_INTERVAL); + expect(logger.warn).toHaveBeenCalledWith( + 'Max workers configuration is temporarily reduced after Elasticsearch returned 1 "too many request" error(s).' + ); + expect(logger.debug).toHaveBeenCalledWith( + 'Max workers configuration changing from 10 to 8 after seeing 1 error(s)' + ); + expect(logger.debug).toHaveBeenCalledWith('Task pool now using 10 as the max worker value'); + }); + + test('should increase poll interval when Elasticsearch returns 429 error', async () => { + savedObjectsClient.create.mockRejectedValueOnce( + SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b') + ); + // Cause "too many requests" error to be thrown + await expect( + taskManager.schedule({ + taskType: 'foo', + state: {}, + params: {}, + }) + ).rejects.toThrowErrorMatchingInlineSnapshot(`"Too Many Requests"`); + clock.tick(ADJUST_THROUGHPUT_INTERVAL); + expect(logger.warn).toHaveBeenCalledWith( + 'Poll interval configuration is temporarily increased after Elasticsearch returned 1 "too many request" error(s).' + ); + expect(logger.debug).toHaveBeenCalledWith( + 'Poll interval configuration changing from 3000 to 3600 after seeing 1 error(s)' + ); + expect(logger.debug).toHaveBeenCalledWith('Task poller now using interval of 3600ms'); + }); +}); diff --git a/x-pack/plugins/task_manager/server/lib/create_managed_configuration.test.ts b/x-pack/plugins/task_manager/server/lib/create_managed_configuration.test.ts new file mode 100644 index 0000000000000..b6b5cd003c5d4 --- /dev/null +++ b/x-pack/plugins/task_manager/server/lib/create_managed_configuration.test.ts @@ -0,0 +1,213 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ + +import sinon from 'sinon'; +import { Subject } from 'rxjs'; +import { mockLogger } from '../test_utils'; +import { SavedObjectsErrorHelpers } from '../../../../../src/core/server'; +import { + createManagedConfiguration, + ADJUST_THROUGHPUT_INTERVAL, +} from './create_managed_configuration'; + +describe('createManagedConfiguration()', () => { + let clock: sinon.SinonFakeTimers; + const logger = mockLogger(); + + beforeEach(() => { + jest.resetAllMocks(); + clock = sinon.useFakeTimers(); + }); + + afterEach(() => clock.restore()); + + test('returns observables with initialized values', async () => { + const maxWorkersSubscription = jest.fn(); + const pollIntervalSubscription = jest.fn(); + const { maxWorkersConfiguration$, pollIntervalConfiguration$ } = createManagedConfiguration({ + logger, + errors$: new Subject(), + startingMaxWorkers: 1, + startingPollInterval: 2, + }); + maxWorkersConfiguration$.subscribe(maxWorkersSubscription); + pollIntervalConfiguration$.subscribe(pollIntervalSubscription); + expect(maxWorkersSubscription).toHaveBeenCalledTimes(1); + expect(maxWorkersSubscription).toHaveBeenNthCalledWith(1, 1); + expect(pollIntervalSubscription).toHaveBeenCalledTimes(1); + expect(pollIntervalSubscription).toHaveBeenNthCalledWith(1, 2); + }); + + test(`skips errors that aren't about too many requests`, async () => { + const maxWorkersSubscription = jest.fn(); + const pollIntervalSubscription = jest.fn(); + const errors$ = new Subject(); + const { maxWorkersConfiguration$, pollIntervalConfiguration$ } = createManagedConfiguration({ + errors$, + logger, + startingMaxWorkers: 100, + startingPollInterval: 100, + }); + maxWorkersConfiguration$.subscribe(maxWorkersSubscription); + pollIntervalConfiguration$.subscribe(pollIntervalSubscription); + errors$.next(new Error('foo')); + clock.tick(ADJUST_THROUGHPUT_INTERVAL); + expect(maxWorkersSubscription).toHaveBeenCalledTimes(1); + expect(pollIntervalSubscription).toHaveBeenCalledTimes(1); + }); + + 
describe('maxWorker configuration', () => { + function setupScenario(startingMaxWorkers: number) { + const errors$ = new Subject(); + const subscription = jest.fn(); + const { maxWorkersConfiguration$ } = createManagedConfiguration({ + errors$, + startingMaxWorkers, + logger, + startingPollInterval: 1, + }); + maxWorkersConfiguration$.subscribe(subscription); + return { subscription, errors$ }; + } + + beforeEach(() => { + jest.resetAllMocks(); + clock = sinon.useFakeTimers(); + }); + + afterEach(() => clock.restore()); + + test('should decrease configuration at the next interval when an error is emitted', async () => { + const { subscription, errors$ } = setupScenario(100); + errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b')); + clock.tick(ADJUST_THROUGHPUT_INTERVAL - 1); + expect(subscription).toHaveBeenCalledTimes(1); + clock.tick(1); + expect(subscription).toHaveBeenCalledTimes(2); + expect(subscription).toHaveBeenNthCalledWith(2, 80); + }); + + test('should log a warning when the configuration changes from the starting value', async () => { + const { errors$ } = setupScenario(100); + errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b')); + clock.tick(ADJUST_THROUGHPUT_INTERVAL); + expect(logger.warn).toHaveBeenCalledWith( + 'Max workers configuration is temporarily reduced after Elasticsearch returned 1 "too many request" error(s).' 
+ ); + }); + + test('should increase configuration back to normal incrementally after an error is emitted', async () => { + const { subscription, errors$ } = setupScenario(100); + errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b')); + clock.tick(ADJUST_THROUGHPUT_INTERVAL * 10); + expect(subscription).toHaveBeenNthCalledWith(2, 80); + expect(subscription).toHaveBeenNthCalledWith(3, 84); + // 88.2 -> 89 from Math.ceil + expect(subscription).toHaveBeenNthCalledWith(4, 89); + expect(subscription).toHaveBeenNthCalledWith(5, 94); + expect(subscription).toHaveBeenNthCalledWith(6, 99); + // 103.95 -> 100 from Math.min with starting value + expect(subscription).toHaveBeenNthCalledWith(7, 100); + // No new calls due to value not changing and usage of distinctUntilChanged() + expect(subscription).toHaveBeenCalledTimes(7); + }); + + test('should keep reducing configuration when errors keep emitting', async () => { + const { subscription, errors$ } = setupScenario(100); + for (let i = 0; i < 20; i++) { + errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b')); + clock.tick(ADJUST_THROUGHPUT_INTERVAL); + } + expect(subscription).toHaveBeenNthCalledWith(2, 80); + expect(subscription).toHaveBeenNthCalledWith(3, 64); + // 51.2 -> 51 from Math.floor + expect(subscription).toHaveBeenNthCalledWith(4, 51); + expect(subscription).toHaveBeenNthCalledWith(5, 40); + expect(subscription).toHaveBeenNthCalledWith(6, 32); + expect(subscription).toHaveBeenNthCalledWith(7, 25); + expect(subscription).toHaveBeenNthCalledWith(8, 20); + expect(subscription).toHaveBeenNthCalledWith(9, 16); + expect(subscription).toHaveBeenNthCalledWith(10, 12); + expect(subscription).toHaveBeenNthCalledWith(11, 9); + expect(subscription).toHaveBeenNthCalledWith(12, 7); + expect(subscription).toHaveBeenNthCalledWith(13, 5); + expect(subscription).toHaveBeenNthCalledWith(14, 4); + expect(subscription).toHaveBeenNthCalledWith(15, 3); + 
expect(subscription).toHaveBeenNthCalledWith(16, 2); + expect(subscription).toHaveBeenNthCalledWith(17, 1); + // No new calls due to value not changing and usage of distinctUntilChanged() + expect(subscription).toHaveBeenCalledTimes(17); + }); + }); + + describe('pollInterval configuration', () => { + function setupScenario(startingPollInterval: number) { + const errors$ = new Subject(); + const subscription = jest.fn(); + const { pollIntervalConfiguration$ } = createManagedConfiguration({ + logger, + errors$, + startingPollInterval, + startingMaxWorkers: 1, + }); + pollIntervalConfiguration$.subscribe(subscription); + return { subscription, errors$ }; + } + + beforeEach(() => { + jest.resetAllMocks(); + clock = sinon.useFakeTimers(); + }); + + afterEach(() => clock.restore()); + + test('should increase configuration at the next interval when an error is emitted', async () => { + const { subscription, errors$ } = setupScenario(100); + errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b')); + clock.tick(ADJUST_THROUGHPUT_INTERVAL - 1); + expect(subscription).toHaveBeenCalledTimes(1); + clock.tick(1); + expect(subscription).toHaveBeenCalledTimes(2); + expect(subscription).toHaveBeenNthCalledWith(2, 120); + }); + + test('should log a warning when the configuration changes from the starting value', async () => { + const { errors$ } = setupScenario(100); + errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b')); + clock.tick(ADJUST_THROUGHPUT_INTERVAL); + expect(logger.warn).toHaveBeenCalledWith( + 'Poll interval configuration is temporarily increased after Elasticsearch returned 1 "too many request" error(s).' 
+ ); + }); + + test('should decrease configuration back to normal incrementally after an error is emitted', async () => { + const { subscription, errors$ } = setupScenario(100); + errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b')); + clock.tick(ADJUST_THROUGHPUT_INTERVAL * 10); + expect(subscription).toHaveBeenNthCalledWith(2, 120); + expect(subscription).toHaveBeenNthCalledWith(3, 114); + // 108.3 -> 108 from Math.floor + expect(subscription).toHaveBeenNthCalledWith(4, 108); + expect(subscription).toHaveBeenNthCalledWith(5, 102); + // 96.9 -> 100 from Math.max with the starting value + expect(subscription).toHaveBeenNthCalledWith(6, 100); + // No new calls due to value not changing and usage of distinctUntilChanged() + expect(subscription).toHaveBeenCalledTimes(6); + }); + + test('should increase configuration when errors keep emitting', async () => { + const { subscription, errors$ } = setupScenario(100); + for (let i = 0; i < 3; i++) { + errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b')); + clock.tick(ADJUST_THROUGHPUT_INTERVAL); + } + expect(subscription).toHaveBeenNthCalledWith(2, 120); + expect(subscription).toHaveBeenNthCalledWith(3, 144); + // 172.8 -> 173 from Math.ceil + expect(subscription).toHaveBeenNthCalledWith(4, 173); + }); + }); +}); diff --git a/x-pack/plugins/task_manager/server/lib/create_managed_configuration.ts b/x-pack/plugins/task_manager/server/lib/create_managed_configuration.ts new file mode 100644 index 0000000000000..3dc5fd50d3ca4 --- /dev/null +++ b/x-pack/plugins/task_manager/server/lib/create_managed_configuration.ts @@ -0,0 +1,160 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ + +import { interval, merge, of, Observable } from 'rxjs'; +import { filter, mergeScan, map, scan, distinctUntilChanged, startWith } from 'rxjs/operators'; +import { SavedObjectsErrorHelpers } from '../../../../../src/core/server'; +import { Logger } from '../types'; + +const FLUSH_MARKER = Symbol('flush'); +export const ADJUST_THROUGHPUT_INTERVAL = 10 * 1000; + +// When errors occur, reduce maxWorkers by MAX_WORKERS_DECREASE_PERCENTAGE +// When errors no longer occur, start increasing maxWorkers by MAX_WORKERS_INCREASE_PERCENTAGE +// until starting value is reached +const MAX_WORKERS_DECREASE_PERCENTAGE = 0.8; +const MAX_WORKERS_INCREASE_PERCENTAGE = 1.05; + +// When errors occur, increase pollInterval by POLL_INTERVAL_INCREASE_PERCENTAGE +// When errors no longer occur, start decreasing pollInterval by POLL_INTERVAL_DECREASE_PERCENTAGE +// until starting value is reached +const POLL_INTERVAL_DECREASE_PERCENTAGE = 0.95; +const POLL_INTERVAL_INCREASE_PERCENTAGE = 1.2; + +interface ManagedConfigurationOpts { + logger: Logger; + startingMaxWorkers: number; + startingPollInterval: number; + errors$: Observable; +} + +interface ManagedConfiguration { + maxWorkersConfiguration$: Observable; + pollIntervalConfiguration$: Observable; +} + +export function createManagedConfiguration({ + logger, + startingMaxWorkers, + startingPollInterval, + errors$, +}: ManagedConfigurationOpts): ManagedConfiguration { + const errorCheck$ = countErrors(errors$, ADJUST_THROUGHPUT_INTERVAL); + return { + maxWorkersConfiguration$: errorCheck$.pipe( + createMaxWorkersScan(logger, startingMaxWorkers), + startWith(startingMaxWorkers), + distinctUntilChanged() + ), + pollIntervalConfiguration$: errorCheck$.pipe( + createPollIntervalScan(logger, startingPollInterval), + startWith(startingPollInterval), + distinctUntilChanged() + ), + }; +} + +function createMaxWorkersScan(logger: Logger, startingMaxWorkers: number) { + return scan((previousMaxWorkers: number, errorCount: number) => { + let 
newMaxWorkers: number; + if (errorCount > 0) { + // Decrease max workers by MAX_WORKERS_DECREASE_PERCENTAGE while making sure it doesn't go lower than 1. + // Using Math.floor to make sure the number is different than previous while not being a decimal value. + newMaxWorkers = Math.max(Math.floor(previousMaxWorkers * MAX_WORKERS_DECREASE_PERCENTAGE), 1); + } else { + // Increase max workers by MAX_WORKERS_INCREASE_PERCENTAGE while making sure it doesn't go + // higher than the starting value. Using Math.ceil to make sure the number is different than + // previous while not being a decimal value + newMaxWorkers = Math.min( + startingMaxWorkers, + Math.ceil(previousMaxWorkers * MAX_WORKERS_INCREASE_PERCENTAGE) + ); + } + if (newMaxWorkers !== previousMaxWorkers) { + logger.debug( + `Max workers configuration changing from ${previousMaxWorkers} to ${newMaxWorkers} after seeing ${errorCount} error(s)` + ); + if (previousMaxWorkers === startingMaxWorkers) { + logger.warn( + `Max workers configuration is temporarily reduced after Elasticsearch returned ${errorCount} "too many request" error(s).` + ); + } + } + return newMaxWorkers; + }, startingMaxWorkers); +} + +function createPollIntervalScan(logger: Logger, startingPollInterval: number) { + return scan((previousPollInterval: number, errorCount: number) => { + let newPollInterval: number; + if (errorCount > 0) { + // Increase poll interval by POLL_INTERVAL_INCREASE_PERCENTAGE and use Math.ceil to + // make sure the number is different than previous while not being a decimal value. + newPollInterval = Math.ceil(previousPollInterval * POLL_INTERVAL_INCREASE_PERCENTAGE); + } else { + // Decrease poll interval by POLL_INTERVAL_DECREASE_PERCENTAGE and use Math.floor to + // make sure the number is different than previous while not being a decimal value. 
+ newPollInterval = Math.max( + startingPollInterval, + Math.floor(previousPollInterval * POLL_INTERVAL_DECREASE_PERCENTAGE) + ); + } + if (newPollInterval !== previousPollInterval) { + logger.debug( + `Poll interval configuration changing from ${previousPollInterval} to ${newPollInterval} after seeing ${errorCount} error(s)` + ); + if (previousPollInterval === startingPollInterval) { + logger.warn( + `Poll interval configuration is temporarily increased after Elasticsearch returned ${errorCount} "too many request" error(s).` + ); + } + } + return newPollInterval; + }, startingPollInterval); +} + +function countErrors(errors$: Observable, countInterval: number): Observable { + return merge( + // Flush error count at fixed interval + interval(countInterval).pipe(map(() => FLUSH_MARKER)), + errors$.pipe(filter((e) => SavedObjectsErrorHelpers.isTooManyRequestsError(e))) + ).pipe( + // When tag is "flush", reset the error counter + // Otherwise increment the error counter + mergeScan(({ count }, next) => { + return next === FLUSH_MARKER + ? 
of(emitErrorCount(count), resetErrorCount()) + : of(incementErrorCount(count)); + }, emitErrorCount(0)), + filter(isEmitEvent), + map(({ count }) => count) + ); +} + +function emitErrorCount(count: number) { + return { + tag: 'emit', + count, + }; +} + +function isEmitEvent(event: { tag: string; count: number }) { + return event.tag === 'emit'; +} + +function incementErrorCount(count: number) { + return { + tag: 'inc', + count: count + 1, + }; +} + +function resetErrorCount() { + return { + tag: 'initial', + count: 0, + }; +} diff --git a/x-pack/plugins/task_manager/server/polling/observable_monitor.ts b/x-pack/plugins/task_manager/server/polling/observable_monitor.ts index 7b06117ef59d1..e0c31f7014a6a 100644 --- a/x-pack/plugins/task_manager/server/polling/observable_monitor.ts +++ b/x-pack/plugins/task_manager/server/polling/observable_monitor.ts @@ -5,8 +5,16 @@ */ import { Subject, Observable, throwError, interval, timer, Subscription } from 'rxjs'; -import { exhaustMap, tap, takeUntil, switchMap, switchMapTo, catchError } from 'rxjs/operators'; import { noop } from 'lodash'; +import { + exhaustMap, + tap, + takeUntil, + switchMap, + switchMapTo, + catchError, + startWith, +} from 'rxjs/operators'; const DEFAULT_HEARTBEAT_INTERVAL = 1000; @@ -31,6 +39,7 @@ export function createObservableMonitor( return new Observable((subscriber) => { const subscription: Subscription = interval(heartbeatInterval) .pipe( + startWith(0), // switch from the heartbeat interval to the instantiated observable until it completes / errors exhaustMap(() => takeUntilDurationOfInactivity(observableFactory(), inactivityTimeout)), // if an error is thrown, catch it, notify and try to recover diff --git a/x-pack/plugins/task_manager/server/polling/task_poller.test.ts b/x-pack/plugins/task_manager/server/polling/task_poller.test.ts index 1c6aff2ad58b9..956c8b05f3860 100644 --- a/x-pack/plugins/task_manager/server/polling/task_poller.test.ts +++ 
b/x-pack/plugins/task_manager/server/polling/task_poller.test.ts @@ -9,7 +9,7 @@ import { Subject, of, BehaviorSubject } from 'rxjs'; import { Option, none, some } from 'fp-ts/lib/Option'; import { createTaskPoller, PollingError, PollingErrorType } from './task_poller'; import { fakeSchedulers } from 'rxjs-marbles/jest'; -import { sleep, resolvable, Resolvable } from '../test_utils'; +import { sleep, resolvable, Resolvable, mockLogger } from '../test_utils'; import { asOk, asErr } from '../lib/result_type'; describe('TaskPoller', () => { @@ -24,6 +24,7 @@ describe('TaskPoller', () => { const work = jest.fn(async () => true); createTaskPoller({ + logger: mockLogger(), pollInterval$: of(pollInterval), bufferCapacity, getCapacity: () => 1, @@ -58,6 +59,7 @@ describe('TaskPoller', () => { const work = jest.fn(async () => true); createTaskPoller({ + logger: mockLogger(), pollInterval$, bufferCapacity, getCapacity: () => 1, @@ -99,6 +101,7 @@ describe('TaskPoller', () => { let hasCapacity = true; createTaskPoller({ + logger: mockLogger(), pollInterval$: of(pollInterval), bufferCapacity, work, @@ -157,6 +160,7 @@ describe('TaskPoller', () => { const work = jest.fn(async () => true); const pollRequests$ = new Subject>(); createTaskPoller({ + logger: mockLogger(), pollInterval$: of(pollInterval), bufferCapacity, work, @@ -202,6 +206,7 @@ describe('TaskPoller', () => { const work = jest.fn(async () => true); const pollRequests$ = new Subject>(); createTaskPoller({ + logger: mockLogger(), pollInterval$: of(pollInterval), bufferCapacity, work, @@ -246,6 +251,7 @@ describe('TaskPoller', () => { const work = jest.fn(async () => true); const pollRequests$ = new Subject>(); createTaskPoller({ + logger: mockLogger(), pollInterval$: of(pollInterval), bufferCapacity, work, @@ -282,6 +288,7 @@ describe('TaskPoller', () => { const handler = jest.fn(); const pollRequests$ = new Subject>(); createTaskPoller({ + logger: mockLogger(), pollInterval$: of(pollInterval), bufferCapacity, work: 
async (...args) => { @@ -332,6 +339,7 @@ describe('TaskPoller', () => { type ResolvableTupple = [string, PromiseLike & Resolvable]; const pollRequests$ = new Subject>(); createTaskPoller<[string, Resolvable], string[]>({ + logger: mockLogger(), pollInterval$: of(pollInterval), bufferCapacity, work: async (...resolvables) => { @@ -391,6 +399,7 @@ describe('TaskPoller', () => { const handler = jest.fn(); const pollRequests$ = new Subject>(); createTaskPoller({ + logger: mockLogger(), pollInterval$: of(pollInterval), bufferCapacity, work: async (...args) => { @@ -431,6 +440,7 @@ describe('TaskPoller', () => { return callCount; }); createTaskPoller({ + logger: mockLogger(), pollInterval$: of(pollInterval), bufferCapacity, work, @@ -473,6 +483,7 @@ describe('TaskPoller', () => { const work = jest.fn(async () => {}); const pollRequests$ = new Subject>(); createTaskPoller({ + logger: mockLogger(), pollInterval$: of(pollInterval), bufferCapacity, work, diff --git a/x-pack/plugins/task_manager/server/polling/task_poller.ts b/x-pack/plugins/task_manager/server/polling/task_poller.ts index 867d01691c41d..7515668a19d40 100644 --- a/x-pack/plugins/task_manager/server/polling/task_poller.ts +++ b/x-pack/plugins/task_manager/server/polling/task_poller.ts @@ -15,6 +15,7 @@ import { mapTo, filter, scan, concatMap, tap, catchError, switchMap } from 'rxjs import { pipe } from 'fp-ts/lib/pipeable'; import { Option, none, map as mapOptional, getOrElse } from 'fp-ts/lib/Option'; +import { Logger } from '../types'; import { pullFromSet } from '../lib/pull_from_set'; import { Result, @@ -30,6 +31,7 @@ import { timeoutPromiseAfter } from './timeout_promise_after'; type WorkFn = (...params: T[]) => Promise; interface Opts { + logger: Logger; pollInterval$: Observable; bufferCapacity: number; getCapacity: () => number; @@ -52,6 +54,7 @@ interface Opts { * of unique request argumets of type T. 
The queue holds all the buffered request arguments streamed in via pollRequests$ */ export function createTaskPoller({ + logger, pollInterval$, getCapacity, pollRequests$, @@ -68,7 +71,10 @@ export function createTaskPoller({ pollRequests$, // emit a polling event on a fixed interval pollInterval$.pipe( - switchMap((period) => interval(period)), + switchMap((period) => { + logger.debug(`Task poller now using interval of ${period}ms`); + return interval(period); + }), mapTo(none) ) ).pipe( diff --git a/x-pack/plugins/task_manager/server/task_manager.ts b/x-pack/plugins/task_manager/server/task_manager.ts index 6a39f2a762e75..cc611e124ea7b 100644 --- a/x-pack/plugins/task_manager/server/task_manager.ts +++ b/x-pack/plugins/task_manager/server/task_manager.ts @@ -3,7 +3,7 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ -import { BehaviorSubject, Subject, Observable, Subscription } from 'rxjs'; +import { Subject, Observable, Subscription } from 'rxjs'; import { filter } from 'rxjs/operators'; import { performance } from 'perf_hooks'; @@ -17,6 +17,7 @@ import { ISavedObjectsRepository, } from '../../../../src/core/server'; import { Result, asOk, asErr, either, map, mapErr, promiseResult } from './lib/result_type'; +import { createManagedConfiguration } from './lib/create_managed_configuration'; import { TaskManagerConfig } from './config'; import { Logger } from './types'; @@ -149,8 +150,12 @@ export class TaskManager { // pipe store events into the TaskManager's event stream this.store.events.subscribe((event) => this.events$.next(event)); - const maxWorkers$ = new BehaviorSubject(opts.config.max_workers); - const pollInterval$ = new BehaviorSubject(opts.config.poll_interval); + const { maxWorkersConfiguration$, pollIntervalConfiguration$ } = createManagedConfiguration({ + logger: this.logger, + errors$: this.store.errors$, + startingMaxWorkers: 
opts.config.max_workers, + startingPollInterval: opts.config.poll_interval, + }); this.bufferedStore = new BufferedTaskStore(this.store, { bufferMaxOperations: opts.config.max_workers, @@ -159,7 +164,7 @@ export class TaskManager { this.pool = new TaskPool({ logger: this.logger, - maxWorkers$, + maxWorkers$: maxWorkersConfiguration$, }); const { @@ -169,7 +174,8 @@ export class TaskManager { this.poller$ = createObservableMonitor>, Error>( () => createTaskPoller({ - pollInterval$, + logger: this.logger, + pollInterval$: pollIntervalConfiguration$, bufferCapacity: opts.config.request_capacity, getCapacity: () => this.pool.availableWorkers, pollRequests$: this.claimRequests$, diff --git a/x-pack/plugins/task_manager/server/task_pool.test.ts b/x-pack/plugins/task_manager/server/task_pool.test.ts index ec6613ece4eed..12b731b2b78ae 100644 --- a/x-pack/plugins/task_manager/server/task_pool.test.ts +++ b/x-pack/plugins/task_manager/server/task_pool.test.ts @@ -130,11 +130,9 @@ describe('TaskPool', () => { const result = await pool.run([mockTask(), taskFailedToRun, mockTask()]); - expect(logger.debug.mock.calls[0]).toMatchInlineSnapshot(` - Array [ - "Task TaskType \\"shooooo\\" failed in attempt to run: Saved object [task/foo] not found", - ] - `); + expect(logger.debug).toHaveBeenCalledWith( + 'Task TaskType "shooooo" failed in attempt to run: Saved object [task/foo] not found' + ); expect(logger.warn).not.toHaveBeenCalled(); expect(result).toEqual(TaskPoolRunResult.RunningAllClaimedTasks); diff --git a/x-pack/plugins/task_manager/server/task_pool.ts b/x-pack/plugins/task_manager/server/task_pool.ts index c029349c13b77..44f5f5648c2ac 100644 --- a/x-pack/plugins/task_manager/server/task_pool.ts +++ b/x-pack/plugins/task_manager/server/task_pool.ts @@ -47,6 +47,7 @@ export class TaskPool { constructor(opts: Opts) { this.logger = opts.logger; opts.maxWorkers$.subscribe((maxWorkers) => { + this.logger.debug(`Task pool now using ${maxWorkers} as the max worker value`); 
this.maxWorkers = maxWorkers; }); }