Temporarily apply back pressure to maxWorkers and pollInterval when 429 errors occur #77096

Merged
Changes from 16 of 30 commits

Commits
6aa2f86
WIP
mikecote Aug 27, 2020
e71be99
Cleanup
mikecote Aug 27, 2020
0e093ba
Add error count to message
mikecote Aug 27, 2020
2f4a026
Reset observable values on stop
mikecote Sep 9, 2020
744ccfb
Add comments
mikecote Sep 9, 2020
7bed495
Fix issues when changing configurations
mikecote Sep 9, 2020
1a0de3e
Merge branch 'feature/task_manager_429' of github.com:elastic/kibana …
mikecote Sep 16, 2020
c81634d
Cleanup code
mikecote Sep 16, 2020
b216f25
Cleanup pt2
mikecote Sep 16, 2020
c40cba1
Some renames
mikecote Sep 16, 2020
e3b1056
Fix typecheck
mikecote Sep 16, 2020
ad2531d
Merge branch 'feature/task_manager_429' of github.com:elastic/kibana …
mikecote Sep 17, 2020
542537a
Merge branch 'feature/task_manager_429' of github.com:elastic/kibana …
mikecote Sep 21, 2020
5a6bc17
Use observables to manage throughput
mikecote Sep 22, 2020
9f69ed3
Rename class
mikecote Sep 22, 2020
476a899
Merge branch 'feature/task_manager_429' of github.com:elastic/kibana …
mikecote Sep 22, 2020
5f18950
Merge branch 'feature/task_manager_429' of github.com:elastic/kibana …
mikecote Sep 24, 2020
38e3579
Switch to createManagedConfiguration
mikecote Sep 24, 2020
3effd76
Add some comments
mikecote Sep 24, 2020
02caec3
Start unit tests
mikecote Sep 24, 2020
a05e435
Merge branch 'feature/task_manager_429' of github.com:elastic/kibana …
mikecote Sep 28, 2020
846784f
Add logs
mikecote Sep 28, 2020
b9cfd70
Fix log level
mikecote Sep 28, 2020
b7f46c5
Attempt at adding integration tests
mikecote Sep 29, 2020
6b8a614
Merge branch 'feature/task_manager_429' of github.com:elastic/kibana …
mikecote Sep 29, 2020
c497979
Fix test failures
mikecote Sep 29, 2020
0817e5e
Fix timer
mikecote Sep 29, 2020
b2a62d0
Revert "Fix timer"
mikecote Sep 29, 2020
17c4435
Use Symbol
mikecote Sep 30, 2020
9f6a53b
Fix merge scan
mikecote Sep 30, 2020
173 changes: 173 additions & 0 deletions x-pack/plugins/task_manager/server/lib/throughput_manager.ts
@@ -0,0 +1,173 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import { interval, merge, of, BehaviorSubject, Observable, Subscription } from 'rxjs';
import { filter, mergeScan, map } from 'rxjs/operators';
import { Logger } from '../types';
import { SavedObjectsErrorHelpers } from '../../../../../src/core/server';

const FLUSH_MARKER: string = 'FLUSH';
const ADJUST_THROUGHPUT_INTERVAL = 10 * 1000;

// When errors occur, scale maxWorkers down to MAX_WORKERS_DECREASE_PERCENTAGE (80%) of its current value
// When errors no longer occur, scale maxWorkers back up by MAX_WORKERS_INCREASE_PERCENTAGE (5% per interval)
// until the starting value is reached
const MAX_WORKERS_DECREASE_PERCENTAGE = 0.8;
const MAX_WORKERS_INCREASE_PERCENTAGE = 1.05;

// When errors occur, scale pollInterval up by POLL_INTERVAL_INCREASE_PERCENTAGE (20% per interval)
// When errors no longer occur, scale pollInterval back down to POLL_INTERVAL_DECREASE_PERCENTAGE (95%)
// of its current value until the starting value is reached
const POLL_INTERVAL_DECREASE_PERCENTAGE = 0.95;
const POLL_INTERVAL_INCREASE_PERCENTAGE = 1.2;
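
// Illustrative example (numbers assumed for this comment, not part of the change):
// with startingMaxWorkers = 10, each 10-second window that sees a 429 steps maxWorkers
// 10 -> 8 -> 6 -> 4 ..., and error-free windows step it back 4 -> 6 -> 8 -> 10.
// With startingPollInterval = 3000ms, errors step it 3000 -> 3600 -> 4320 ..., and
// error-free windows bring it back down until it is clamped at the original 3000ms.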

interface DynamicConfiguration<T> {
startingValue: T;
currentValue: T;
observable$: BehaviorSubject<T>;
}

interface ThroughputManagerOpts {
maxWorkers$: BehaviorSubject<number>;
pollInterval$: BehaviorSubject<number>;
startingMaxWorkers: number;
startingPollInterval: number;
errors$: Observable<Error>;
logger: Logger;
}

export class ThroughputManager {
private throughputCheckSubscription?: Subscription;
private readonly throughputCheck$: Observable<number>;
private readonly logger: Logger;
private readonly maxWorkers: DynamicConfiguration<number>;
private readonly pollInterval: DynamicConfiguration<number>;
private readonly errors$: Observable<Error>;

constructor(opts: ThroughputManagerOpts) {
this.logger = opts.logger;
this.errors$ = opts.errors$;
this.maxWorkers = {
startingValue: opts.startingMaxWorkers,
currentValue: opts.startingMaxWorkers,
observable$: opts.maxWorkers$,
};
this.pollInterval = {
startingValue: opts.startingPollInterval,
currentValue: opts.startingPollInterval,
observable$: opts.pollInterval$,
};
// Count the number of 429 errors from errors$ and reset the count every ADJUST_THROUGHPUT_INTERVAL
this.throughputCheck$ = merge(
// Flush error count at fixed interval
interval(ADJUST_THROUGHPUT_INTERVAL).pipe(map(() => FLUSH_MARKER)),
this.errors$.pipe(filter((e) => SavedObjectsErrorHelpers.isTooManyRequestsError(e)))
).pipe(
// When tag is "flush", reset the error counter
// Otherwise increment the error counter
mergeScan(
({ count }, next) =>
next === FLUSH_MARKER
? of(emitErrorCount(count), resetErrorCount())
: of(incrementErrorCount(count)),
emitErrorCount(0)
),
filter(isEmitEvent),
map(({ count }) => count)
);
}

public get isStarted() {
return this.throughputCheckSubscription && !this.throughputCheckSubscription.closed;
}

public start() {
if (!this.isStarted) {
this.throughputCheckSubscription = this.throughputCheck$.subscribe((errorCount) => {
if (errorCount > 0) {
this.reduceThroughput(errorCount);
return;
}
this.increaseThroughput();
});
}
}

public stop() {
this.throughputCheckSubscription?.unsubscribe();
// Reset observable values to original values
this.updateConfiguration(this.maxWorkers.startingValue, this.pollInterval.startingValue);
}

private reduceThroughput(errorCount: number) {
const newMaxWorkers = Math.max(
Math.floor(this.maxWorkers.currentValue * MAX_WORKERS_DECREASE_PERCENTAGE),
1
);
const newPollInterval = Math.ceil(
this.pollInterval.currentValue * POLL_INTERVAL_INCREASE_PERCENTAGE
);
this.logger.info(
`Throughput reduced after seeing ${errorCount} error(s): maxWorkers: ${this.maxWorkers.currentValue}->${newMaxWorkers}, pollInterval: ${this.pollInterval.currentValue}->${newPollInterval}`
);
this.updateConfiguration(newMaxWorkers, newPollInterval);
}

private increaseThroughput() {
const newMaxWorkers = Math.min(
this.maxWorkers.startingValue,
Math.ceil(this.maxWorkers.currentValue * MAX_WORKERS_INCREASE_PERCENTAGE + 1)
);
const newPollInterval = Math.max(
this.pollInterval.startingValue,
Math.floor(this.pollInterval.currentValue * POLL_INTERVAL_DECREASE_PERCENTAGE - 1)
);
if (
newMaxWorkers !== this.maxWorkers.currentValue ||
newPollInterval !== this.pollInterval.currentValue
) {
this.logger.info(
`Throughput increasing after seeing no errors: maxWorkers: ${this.maxWorkers.currentValue}->${newMaxWorkers}, pollInterval: ${this.pollInterval.currentValue}->${newPollInterval}`
);
}
this.updateConfiguration(newMaxWorkers, newPollInterval);
}

private updateConfiguration(newMaxWorkers: number, newPollInterval: number) {
if (this.maxWorkers.currentValue !== newMaxWorkers) {
this.maxWorkers.currentValue = newMaxWorkers;
this.maxWorkers.observable$.next(newMaxWorkers);
}
if (this.pollInterval.currentValue !== newPollInterval) {
this.pollInterval.currentValue = newPollInterval;
this.pollInterval.observable$.next(newPollInterval);
}
}
}

function emitErrorCount(count: number) {
return {
tag: 'emit',
count,
};
}

function isEmitEvent(event: { tag: string; count: number }) {
return event.tag === 'emit';
}

function incrementErrorCount(count: number) {
return {
tag: 'inc',
count: count + 1,
};
}

function resetErrorCount() {
return {
tag: 'initial',
count: 0,
};
}
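
The error-counting stream above follows a common RxJS pattern: merge a periodic flush marker into the error stream, accumulate with mergeScan, and only let the flush-time emissions through to subscribers. A minimal standalone sketch of that pattern (the one-second window and console consumer are assumptions for illustration, not code from this PR):

import { interval, merge, of, Subject } from 'rxjs';
import { filter, map, mergeScan } from 'rxjs/operators';

const FLUSH_MARKER = 'FLUSH';
const errors$ = new Subject<Error>();

const errorsPerWindow$ = merge(
  // Emit a flush marker once per window
  interval(1000).pipe(map(() => FLUSH_MARKER)),
  errors$
).pipe(
  mergeScan(
    ({ count }, next) =>
      next === FLUSH_MARKER
        ? // On flush: emit the accumulated count downstream, then reset it to zero
          of({ tag: 'emit', count }, { tag: 'reset', count: 0 })
        : // On error: bump the counter without emitting downstream
          of({ tag: 'inc', count: count + 1 }),
    { tag: 'reset', count: 0 }
  ),
  // Only the flush-time emissions reach subscribers
  filter(({ tag }) => tag === 'emit'),
  map(({ count }) => count)
);

errorsPerWindow$.subscribe((count) => console.log(`errors in last window: ${count}`));
errors$.next(new Error('429 Too Many Requests'));
errors$.next(new Error('429 Too Many Requests'));
// On the next interval tick the subscriber logs "errors in last window: 2",
// and the counter starts over for the following window.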
14 changes: 14 additions & 0 deletions x-pack/plugins/task_manager/server/task_manager.ts
@@ -17,6 +17,7 @@ import {
ISavedObjectsRepository,
} from '../../../../src/core/server';
import { Result, asOk, asErr, either, map, mapErr, promiseResult } from './lib/result_type';
import { ThroughputManager } from './lib/throughput_manager';
import { TaskManagerConfig } from './config';

import { Logger } from './types';
@@ -100,6 +101,7 @@ export class TaskManager {

private store: TaskStore;
private bufferedStore: BufferedTaskStore;
private throughputManager: ThroughputManager;

private logger: Logger;
private pool: TaskPool;
@@ -162,6 +164,15 @@ export class TaskManager {
maxWorkers$,
});

this.throughputManager = new ThroughputManager({
maxWorkers$,
pollInterval$,
startingMaxWorkers: opts.config.max_workers,
startingPollInterval: opts.config.poll_interval,
logger: this.logger,
errors$: this.store.errors$,
});

const {
max_poll_inactivity_cycles: maxPollInactivityCycles,
poll_interval: pollInterval,
@@ -242,6 +253,8 @@ export class TaskManager {
*/
public start() {
if (!this.isStarted) {
this.throughputManager.start();

// Some calls are waiting until task manager is started
this.startQueue.forEach((fn) => fn());
this.startQueue = [];
@@ -275,6 +288,7 @@ export class TaskManager {
if (this.isStarted) {
this.pollingSubscription.unsubscribe();
this.pool.cancelRunningTasks();
this.throughputManager.stop();
}
}

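
For reference, a minimal sketch of how the new class can be wired up and observed in isolation (the starting values, subjects, and logger stub below are assumptions for illustration, not code from this PR):

import { BehaviorSubject, Subject } from 'rxjs';
import { Logger } from './types';
import { ThroughputManager } from './lib/throughput_manager';

const logger = ({ debug: console.log, info: console.log } as unknown) as Logger;
const maxWorkers$ = new BehaviorSubject<number>(10);
const pollInterval$ = new BehaviorSubject<number>(3000);
const errors$ = new Subject<Error>();

const throughputManager = new ThroughputManager({
  maxWorkers$,
  pollInterval$,
  startingMaxWorkers: 10,
  startingPollInterval: 3000,
  errors$,
  logger,
});

// Anything already subscribed to these subjects picks up the back pressure automatically
maxWorkers$.subscribe((value) => logger.debug(`maxWorkers is now ${value}`));
pollInterval$.subscribe((value) => logger.debug(`pollInterval is now ${value}ms`));

throughputManager.start();
// Only errors that SavedObjectsErrorHelpers.isTooManyRequestsError() recognizes count,
// so pushing task store 429s into errors$ shrinks maxWorkers and grows pollInterval
// every 10 seconds, while error-free windows step both back toward their starting values.
throughputManager.stop(); // resets maxWorkers$ and pollInterval$ to their starting values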