Skip to content

Commit

Permalink
Add logs
Browse files Browse the repository at this point in the history
  • Loading branch information
mikecote committed Sep 28, 2020
1 parent a05e435 commit 846784f
Show file tree
Hide file tree
Showing 6 changed files with 118 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import sinon from 'sinon';
import { Subject } from 'rxjs';
import { mockLogger } from '../test_utils';
import { SavedObjectsErrorHelpers } from '../../../../../src/core/server';
import {
createManagedConfiguration,
Expand All @@ -14,6 +15,7 @@ import {

describe('createManagedConfiguration()', () => {
let clock: sinon.SinonFakeTimers;
const logger = mockLogger();

beforeEach(() => {
jest.resetAllMocks();
Expand All @@ -26,6 +28,7 @@ describe('createManagedConfiguration()', () => {
const maxWorkersSubscription = jest.fn();
const pollIntervalSubscription = jest.fn();
const { maxWorkersConfiguration$, pollIntervalConfiguration$ } = createManagedConfiguration({
logger,
errors$: new Subject<Error>(),
startingMaxWorkers: 1,
startingPollInterval: 2,
Expand All @@ -44,6 +47,7 @@ describe('createManagedConfiguration()', () => {
const errors$ = new Subject<Error>();
const { maxWorkersConfiguration$, pollIntervalConfiguration$ } = createManagedConfiguration({
errors$,
logger,
startingMaxWorkers: 100,
startingPollInterval: 100,
});
Expand All @@ -62,6 +66,7 @@ describe('createManagedConfiguration()', () => {
const { maxWorkersConfiguration$ } = createManagedConfiguration({
errors$,
startingMaxWorkers,
logger,
startingPollInterval: 1,
});
maxWorkersConfiguration$.subscribe(subscription);
Expand All @@ -85,6 +90,15 @@ describe('createManagedConfiguration()', () => {
expect(subscription).toHaveBeenNthCalledWith(2, 80);
});

test('should log a warning when the configuration changes from the starting value', async () => {
const { errors$ } = setupScenario(100);
errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b'));
clock.tick(ADJUST_THROUGHPUT_INTERVAL);
expect(logger.warn).toHaveBeenCalledWith(
'Max workers configuration is temporarily reduced after Elasticsearch returned 1 "too many request" error(s).'
);
});

test('should increase configuration back to normal incrementally after an error is emitted', async () => {
const { subscription, errors$ } = setupScenario(100);
errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b'));
Expand Down Expand Up @@ -134,6 +148,7 @@ describe('createManagedConfiguration()', () => {
const errors$ = new Subject<Error>();
const subscription = jest.fn();
const { pollIntervalConfiguration$ } = createManagedConfiguration({
logger,
errors$,
startingPollInterval,
startingMaxWorkers: 1,
Expand All @@ -159,6 +174,15 @@ describe('createManagedConfiguration()', () => {
expect(subscription).toHaveBeenNthCalledWith(2, 120);
});

test('should log a warning when the configuration changes from the starting value', async () => {
const { errors$ } = setupScenario(100);
errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b'));
clock.tick(ADJUST_THROUGHPUT_INTERVAL);
expect(logger.warn).toHaveBeenCalledWith(
'Poll interval configuration is temporarily increased after Elasticsearch returned 1 "too many request" error(s).'
);
});

test('should decrease configuration back to normal incrementally after an error is emitted', async () => {
const { subscription, errors$ } = setupScenario(100);
errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b'));
Expand Down
104 changes: 72 additions & 32 deletions x-pack/plugins/task_manager/server/lib/create_managed_configuration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import { interval, merge, of, Observable } from 'rxjs';
import { filter, mergeScan, map, scan, distinctUntilChanged, startWith } from 'rxjs/operators';
import { SavedObjectsErrorHelpers } from '../../../../../src/core/server';
import { Logger } from '../types';

const FLUSH_MARKER: string = 'FLUSH';
export const ADJUST_THROUGHPUT_INTERVAL = 10 * 1000;
Expand All @@ -24,6 +25,7 @@ const POLL_INTERVAL_DECREASE_PERCENTAGE = 0.95;
const POLL_INTERVAL_INCREASE_PERCENTAGE = 1.2;

interface ManagedConfigurationOpts {
logger: Logger;
startingMaxWorkers: number;
startingPollInterval: number;
errors$: Observable<Error>;
Expand All @@ -34,48 +36,86 @@ interface ManagedConfiguration {
pollIntervalConfiguration$: Observable<number>;
}

export function createManagedConfiguration(opts: ManagedConfigurationOpts): ManagedConfiguration {
const errorCheck$ = countErrors(opts.errors$, ADJUST_THROUGHPUT_INTERVAL);

export function createManagedConfiguration({
logger,
startingMaxWorkers,
startingPollInterval,
errors$,
}: ManagedConfigurationOpts): ManagedConfiguration {
const errorCheck$ = countErrors(errors$, ADJUST_THROUGHPUT_INTERVAL);
return {
maxWorkersConfiguration$: errorCheck$.pipe(
scan((previousMaxWorkers, errorCount) => {
if (errorCount > 0) {
// Decrease max workers by MAX_WORKERS_DECREASE_PERCENTAGE while making sure it doesn't go lower than 1.
// Using Math.floor to make sure the number is different than previous while not being a decimal value.
return Math.max(Math.floor(previousMaxWorkers * MAX_WORKERS_DECREASE_PERCENTAGE), 1);
}
// Increase max workers by MAX_WORKERS_INCREASE_PERCENTAGE while making sure it doesn't go
// higher than the starting value. Using Math.ceil to make sure the number is different than
// previous while not being a decimal value
return Math.min(
opts.startingMaxWorkers,
Math.ceil(previousMaxWorkers * MAX_WORKERS_INCREASE_PERCENTAGE)
);
}, opts.startingMaxWorkers),
startWith(opts.startingMaxWorkers),
createMaxWorkersScan(logger, startingMaxWorkers),
startWith(startingMaxWorkers),
distinctUntilChanged()
),
pollIntervalConfiguration$: errorCheck$.pipe(
scan((previousPollInterval, errorCount) => {
if (errorCount > 0) {
// Increase poll interval by POLL_INTERVAL_INCREASE_PERCENTAGE and use Math.ceil to
// make sure the number is different than previous while not being a decimal value.
return Math.ceil(previousPollInterval * POLL_INTERVAL_INCREASE_PERCENTAGE);
}
// Decrease poll interval by POLL_INTERVAL_DECREASE_PERCENTAGE and use Math.floor to
// make sure the number is different than previous while not being a decimal value.
return Math.max(
opts.startingPollInterval,
Math.floor(previousPollInterval * POLL_INTERVAL_DECREASE_PERCENTAGE)
);
}, opts.startingPollInterval),
startWith(opts.startingPollInterval),
createPollIntervalScan(logger, startingPollInterval),
startWith(startingPollInterval),
distinctUntilChanged()
),
};
}

function createMaxWorkersScan(logger: Logger, startingMaxWorkers: number) {
return scan((previousMaxWorkers: number, errorCount: number) => {
let newMaxWorkers: number;
if (errorCount > 0) {
// Decrease max workers by MAX_WORKERS_DECREASE_PERCENTAGE while making sure it doesn't go lower than 1.
// Using Math.floor to make sure the number is different than previous while not being a decimal value.
newMaxWorkers = Math.max(Math.floor(previousMaxWorkers * MAX_WORKERS_DECREASE_PERCENTAGE), 1);
} else {
// Increase max workers by MAX_WORKERS_INCREASE_PERCENTAGE while making sure it doesn't go
// higher than the starting value. Using Math.ceil to make sure the number is different than
// previous while not being a decimal value
newMaxWorkers = Math.min(
startingMaxWorkers,
Math.ceil(previousMaxWorkers * MAX_WORKERS_INCREASE_PERCENTAGE)
);
}
if (newMaxWorkers !== previousMaxWorkers) {
logger.debug(
`Max workers configuration changing from ${previousMaxWorkers} to ${newMaxWorkers} after seeing ${errorCount} error(s)`
);
if (previousMaxWorkers === startingMaxWorkers) {
logger.warn(
`Max workers configuration is temporarily reduced after Elasticsearch returned ${errorCount} "too many request" error(s).`
);
}
}
return newMaxWorkers;
}, startingMaxWorkers);
}

function createPollIntervalScan(logger: Logger, startingPollInterval: number) {
return scan((previousPollInterval: number, errorCount: number) => {
let newPollInterval: number;
if (errorCount > 0) {
// Increase poll interval by POLL_INTERVAL_INCREASE_PERCENTAGE and use Math.ceil to
// make sure the number is different than previous while not being a decimal value.
newPollInterval = Math.ceil(previousPollInterval * POLL_INTERVAL_INCREASE_PERCENTAGE);
} else {
// Decrease poll interval by POLL_INTERVAL_DECREASE_PERCENTAGE and use Math.floor to
// make sure the number is different than previous while not being a decimal value.
newPollInterval = Math.max(
startingPollInterval,
Math.floor(previousPollInterval * POLL_INTERVAL_DECREASE_PERCENTAGE)
);
}
if (newPollInterval !== previousPollInterval) {
logger.debug(
`Poll interval configuration changing from ${previousPollInterval} to ${newPollInterval} after seeing ${errorCount} error(s)`
);
if (previousPollInterval === startingPollInterval) {
logger.warn(
`Poll interval configuration is temporarily increased after Elasticsearch returned ${errorCount} "too many request" error(s).`
);
}
}
return newPollInterval;
}, startingPollInterval);
}

function countErrors(errors$: Observable<Error>, countInterval: number): Observable<number> {
return merge(
// Flush error count at fixed interval
Expand Down
13 changes: 12 additions & 1 deletion x-pack/plugins/task_manager/server/polling/task_poller.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { Subject, of, BehaviorSubject } from 'rxjs';
import { Option, none, some } from 'fp-ts/lib/Option';
import { createTaskPoller, PollingError, PollingErrorType } from './task_poller';
import { fakeSchedulers } from 'rxjs-marbles/jest';
import { sleep, resolvable, Resolvable } from '../test_utils';
import { sleep, resolvable, Resolvable, mockLogger } from '../test_utils';
import { asOk, asErr } from '../lib/result_type';

describe('TaskPoller', () => {
Expand All @@ -24,6 +24,7 @@ describe('TaskPoller', () => {

const work = jest.fn(async () => true);
createTaskPoller<void, boolean>({
logger: mockLogger(),
pollInterval$: of(pollInterval),
bufferCapacity,
getCapacity: () => 1,
Expand Down Expand Up @@ -58,6 +59,7 @@ describe('TaskPoller', () => {

const work = jest.fn(async () => true);
createTaskPoller<void, boolean>({
logger: mockLogger(),
pollInterval$,
bufferCapacity,
getCapacity: () => 1,
Expand Down Expand Up @@ -99,6 +101,7 @@ describe('TaskPoller', () => {

let hasCapacity = true;
createTaskPoller<void, boolean>({
logger: mockLogger(),
pollInterval$: of(pollInterval),
bufferCapacity,
work,
Expand Down Expand Up @@ -157,6 +160,7 @@ describe('TaskPoller', () => {
const work = jest.fn(async () => true);
const pollRequests$ = new Subject<Option<void>>();
createTaskPoller<void, boolean>({
logger: mockLogger(),
pollInterval$: of(pollInterval),
bufferCapacity,
work,
Expand Down Expand Up @@ -202,6 +206,7 @@ describe('TaskPoller', () => {
const work = jest.fn(async () => true);
const pollRequests$ = new Subject<Option<void>>();
createTaskPoller<void, boolean>({
logger: mockLogger(),
pollInterval$: of(pollInterval),
bufferCapacity,
work,
Expand Down Expand Up @@ -246,6 +251,7 @@ describe('TaskPoller', () => {
const work = jest.fn(async () => true);
const pollRequests$ = new Subject<Option<string>>();
createTaskPoller<string, boolean>({
logger: mockLogger(),
pollInterval$: of(pollInterval),
bufferCapacity,
work,
Expand Down Expand Up @@ -282,6 +288,7 @@ describe('TaskPoller', () => {
const handler = jest.fn();
const pollRequests$ = new Subject<Option<string>>();
createTaskPoller<string, string[]>({
logger: mockLogger(),
pollInterval$: of(pollInterval),
bufferCapacity,
work: async (...args) => {
Expand Down Expand Up @@ -332,6 +339,7 @@ describe('TaskPoller', () => {
type ResolvableTupple = [string, PromiseLike<void> & Resolvable];
const pollRequests$ = new Subject<Option<ResolvableTupple>>();
createTaskPoller<[string, Resolvable], string[]>({
logger: mockLogger(),
pollInterval$: of(pollInterval),
bufferCapacity,
work: async (...resolvables) => {
Expand Down Expand Up @@ -391,6 +399,7 @@ describe('TaskPoller', () => {
const handler = jest.fn();
const pollRequests$ = new Subject<Option<string>>();
createTaskPoller<string, string[]>({
logger: mockLogger(),
pollInterval$: of(pollInterval),
bufferCapacity,
work: async (...args) => {
Expand Down Expand Up @@ -431,6 +440,7 @@ describe('TaskPoller', () => {
return callCount;
});
createTaskPoller<string, number>({
logger: mockLogger(),
pollInterval$: of(pollInterval),
bufferCapacity,
work,
Expand Down Expand Up @@ -473,6 +483,7 @@ describe('TaskPoller', () => {
const work = jest.fn(async () => {});
const pollRequests$ = new Subject<Option<string>>();
createTaskPoller<string, void>({
logger: mockLogger(),
pollInterval$: of(pollInterval),
bufferCapacity,
work,
Expand Down
8 changes: 7 additions & 1 deletion x-pack/plugins/task_manager/server/polling/task_poller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import { mapTo, filter, scan, concatMap, tap, catchError, switchMap } from 'rxjs

import { pipe } from 'fp-ts/lib/pipeable';
import { Option, none, map as mapOptional, getOrElse } from 'fp-ts/lib/Option';
import { Logger } from '../types';
import { pullFromSet } from '../lib/pull_from_set';
import {
Result,
Expand All @@ -30,6 +31,7 @@ import { timeoutPromiseAfter } from './timeout_promise_after';
type WorkFn<T, H> = (...params: T[]) => Promise<H>;

interface Opts<T, H> {
logger: Logger;
pollInterval$: Observable<number>;
bufferCapacity: number;
getCapacity: () => number;
Expand All @@ -52,6 +54,7 @@ interface Opts<T, H> {
* of unique request argumets of type T. The queue holds all the buffered request arguments streamed in via pollRequests$
*/
export function createTaskPoller<T, H>({
logger,
pollInterval$,
getCapacity,
pollRequests$,
Expand All @@ -68,7 +71,10 @@ export function createTaskPoller<T, H>({
pollRequests$,
// emit a polling event on a fixed interval
pollInterval$.pipe(
switchMap((period) => interval(period)),
switchMap((period) => {
logger.info(`Task poller now using interval of ${period}ms`);
return interval(period);
}),
mapTo(none)
)
).pipe(
Expand Down
2 changes: 2 additions & 0 deletions x-pack/plugins/task_manager/server/task_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ export class TaskManager {
this.store.events.subscribe((event) => this.events$.next(event));

const { maxWorkersConfiguration$, pollIntervalConfiguration$ } = createManagedConfiguration({
logger: this.logger,
errors$: this.store.errors$,
startingMaxWorkers: opts.config.max_workers,
startingPollInterval: opts.config.poll_interval,
Expand All @@ -173,6 +174,7 @@ export class TaskManager {
this.poller$ = createObservableMonitor<Result<FillPoolResult, PollingError<string>>, Error>(
() =>
createTaskPoller<string, FillPoolResult>({
logger: this.logger,
pollInterval$: pollIntervalConfiguration$,
bufferCapacity: opts.config.request_capacity,
getCapacity: () => this.pool.availableWorkers,
Expand Down
1 change: 1 addition & 0 deletions x-pack/plugins/task_manager/server/task_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ export class TaskPool {
constructor(opts: Opts) {
this.logger = opts.logger;
opts.maxWorkers$.subscribe((maxWorkers) => {
this.logger.debug(`Task pool now using ${maxWorkers} as the max worker value`);
this.maxWorkers = maxWorkers;
});
}
Expand Down

0 comments on commit 846784f

Please sign in to comment.