Skip to content

Commit

Permalink
[Task Manager] time out work when it overruns in poller (elastic#74980)
Browse files Browse the repository at this point in the history
If the work performed by the poller hangs, meaning the promise fails to resolve/reject, then the poller can get stuck in a mode where it just waits for ever and no longer polls for fresh work.
This PR introduces a timeout after which the poller will automatically reject the work, freeing the poller to restart pulling fresh work.
  • Loading branch information
gmmorris committed Aug 18, 2020
1 parent 02d92cd commit b763ea6
Show file tree
Hide file tree
Showing 10 changed files with 140 additions and 6 deletions.
1 change: 1 addition & 0 deletions x-pack/plugins/task_manager/server/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ The task_manager can be configured via `taskManager` config options (e.g. `taskM

- `max_attempts` - The maximum number of times a task will be attempted before being abandoned as failed
- `poll_interval` - How often the background worker should check the task_manager index for more work
- `max_poll_inactivity_cycles` - How many poll intervals is work allowed to block polling for before it's timed out. This does not include task execution, as task execution does not block the polling, but rather includes work needed to manage Task Manager's state.
- `index` - The name of the index that the task_manager
- `max_workers` - The maximum number of tasks a Kibana will run concurrently (defaults to 10)
- `credentials` - Encrypted user credentials. All tasks will run in the security context of this user. See [this issue](https://github.com/elastic/dev/issues/1045) for a discussion on task scheduler security.
Expand Down
1 change: 1 addition & 0 deletions x-pack/plugins/task_manager/server/config.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ describe('config validation', () => {
"enabled": true,
"index": ".kibana_task_manager",
"max_attempts": 3,
"max_poll_inactivity_cycles": 10,
"max_workers": 10,
"poll_interval": 3000,
"request_capacity": 1000,
Expand Down
6 changes: 6 additions & 0 deletions x-pack/plugins/task_manager/server/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { schema, TypeOf } from '@kbn/config-schema';

export const DEFAULT_MAX_WORKERS = 10;
export const DEFAULT_POLL_INTERVAL = 3000;
export const DEFAULT_MAX_POLL_INACTIVITY_CYCLES = 10;

export const configSchema = schema.object({
enabled: schema.boolean({ defaultValue: true }),
Expand All @@ -21,6 +22,11 @@ export const configSchema = schema.object({
defaultValue: DEFAULT_POLL_INTERVAL,
min: 100,
}),
/* How many poll interval cycles can work take before it's timed out. */
max_poll_inactivity_cycles: schema.number({
defaultValue: DEFAULT_MAX_POLL_INACTIVITY_CYCLES,
min: 1,
}),
/* How many requests can Task Manager buffer before it rejects new requests. */
request_capacity: schema.number({
// a nice round contrived number, feel free to change as we learn how it behaves
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import { timeoutPromiseAfter } from './timeout_promise_after';

const delay = (ms: number, result: unknown) =>
new Promise((resolve) => setTimeout(() => resolve(result), ms));

const delayRejection = (ms: number, result: unknown) =>
new Promise((resolve, reject) => setTimeout(() => reject(result), ms));

describe('Promise Timeout', () => {
test('resolves when wrapped promise resolves', async () => {
return expect(
timeoutPromiseAfter(delay(100, 'OK'), 1000, () => 'TIMEOUT ERR')
).resolves.toMatchInlineSnapshot(`"OK"`);
});

test('reject when wrapped promise rejects', async () => {
return expect(
timeoutPromiseAfter(delayRejection(100, 'ERR'), 1000, () => 'TIMEOUT ERR')
).rejects.toMatchInlineSnapshot(`"ERR"`);
});

test('reject it the timeout elapses', async () => {
return expect(
timeoutPromiseAfter(delay(1000, 'OK'), 100, () => 'TIMEOUT ERR')
).rejects.toMatchInlineSnapshot(`"TIMEOUT ERR"`);
});
});
16 changes: 16 additions & 0 deletions x-pack/plugins/task_manager/server/lib/timeout_promise_after.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

export function timeoutPromiseAfter<T, G>(
future: Promise<T>,
ms: number,
onTimeout: () => G
): Promise<T> {
return new Promise((resolve, reject) => {
setTimeout(() => reject(onTimeout()), ms);
future.then(resolve).catch(reject);
});
}
1 change: 1 addition & 0 deletions x-pack/plugins/task_manager/server/task_manager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ describe('TaskManager', () => {
index: 'foo',
max_attempts: 9,
poll_interval: 6000000,
max_poll_inactivity_cycles: 10,
request_capacity: 1000,
};
const taskManagerOpts = {
Expand Down
5 changes: 5 additions & 0 deletions x-pack/plugins/task_manager/server/task_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,11 @@ export class TaskManager {
getCapacity: () => this.pool.availableWorkers,
pollRequests$: this.claimRequests$,
work: this.pollForWork,
// Time out the `work` phase if it takes longer than a certain number of polling cycles
// The `work` phase includes the prework needed *before* executing a task
// (such as polling for new work, marking tasks as running etc.) but does not
// include the time of actually running the task
workTimeout: opts.config.poll_interval * opts.config.max_poll_inactivity_cycles,
});
}

Expand Down
65 changes: 64 additions & 1 deletion x-pack/plugins/task_manager/server/task_poller.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { Subject } from 'rxjs';
import { Option, none, some } from 'fp-ts/lib/Option';
import { createTaskPoller, PollingError, PollingErrorType } from './task_poller';
import { fakeSchedulers } from 'rxjs-marbles/jest';
import { sleep, resolvable } from './test_utils';
import { sleep, resolvable, Resolvable } from './test_utils';
import { asOk, asErr } from './lib/result_type';

describe('TaskPoller', () => {
Expand Down Expand Up @@ -243,6 +243,7 @@ describe('TaskPoller', () => {
},
getCapacity: () => 5,
pollRequests$,
workTimeout: pollInterval * 5,
}).subscribe(handler);

pollRequests$.next(some('one'));
Expand Down Expand Up @@ -272,6 +273,68 @@ describe('TaskPoller', () => {
})
);

test(
'work times out whe nit exceeds a predefined amount of time',
fakeSchedulers(async (advance) => {
const pollInterval = 100;
const workTimeout = pollInterval * 2;
const bufferCapacity = 2;

const handler = jest.fn();

type ResolvableTupple = [string, PromiseLike<void> & Resolvable];
const pollRequests$ = new Subject<Option<ResolvableTupple>>();
createTaskPoller<[string, Resolvable], string[]>({
pollInterval,
bufferCapacity,
work: async (...resolvables) => {
await Promise.all(resolvables.map(([, future]) => future));
return resolvables.map(([name]) => name);
},
getCapacity: () => 5,
pollRequests$,
workTimeout,
}).subscribe(handler);

const one: ResolvableTupple = ['one', resolvable()];
pollRequests$.next(some(one));

// split these into two payloads
advance(pollInterval);

const two: ResolvableTupple = ['two', resolvable()];
const three: ResolvableTupple = ['three', resolvable()];
pollRequests$.next(some(two));
pollRequests$.next(some(three));

advance(workTimeout);
await sleep(workTimeout);

// one resolves too late!
one[1].resolve();

expect(handler).toHaveBeenCalledWith(
asErr(
new PollingError<string>(
'Failed to poll for work: Error: work has timed out',
PollingErrorType.WorkError,
none
)
)
);
expect(handler.mock.calls[0][0].error.type).toEqual(PollingErrorType.WorkError);

// two and three in time
two[1].resolve();
three[1].resolve();

advance(pollInterval);
await sleep(pollInterval);

expect(handler).toHaveBeenCalledWith(asOk(['two', 'three']));
})
);

test(
'returns an error when polling for work fails',
fakeSchedulers(async (advance) => {
Expand Down
16 changes: 12 additions & 4 deletions x-pack/plugins/task_manager/server/task_poller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import {
asErr,
promiseResult,
} from './lib/result_type';
import { timeoutPromiseAfter } from './lib/timeout_promise_after';

type WorkFn<T, H> = (...params: T[]) => Promise<H>;

Expand All @@ -34,6 +35,7 @@ interface Opts<T, H> {
getCapacity: () => number;
pollRequests$: Observable<Option<T>>;
work: WorkFn<T, H>;
workTimeout?: number;
}

/**
Expand All @@ -55,6 +57,7 @@ export function createTaskPoller<T, H>({
pollRequests$,
bufferCapacity,
work,
workTimeout,
}: Opts<T, H>): Observable<Result<H, PollingError<T>>> {
const hasCapacity = () => getCapacity() > 0;

Expand Down Expand Up @@ -89,11 +92,15 @@ export function createTaskPoller<T, H>({
concatMap(async (set: Set<T>) => {
closeSleepPerf();
return mapResult<H, Error, Result<H, PollingError<T>>>(
await promiseResult<H, Error>(work(...pullFromSet(set, getCapacity()))),
await promiseResult<H, Error>(
timeoutPromiseAfter<H, Error>(
work(...pullFromSet(set, getCapacity())),
workTimeout ?? pollInterval,
() => new Error(`work has timed out`)
)
),
(workResult) => asOk(workResult),
(err: Error) => {
return asPollingError<T>(err, PollingErrorType.WorkError);
}
(err: Error) => asPollingError<T>(err, PollingErrorType.WorkError)
);
}),
tap(openSleepPerf),
Expand Down Expand Up @@ -129,6 +136,7 @@ function pushOptionalIntoSet<T>(

export enum PollingErrorType {
WorkError,
WorkTimeout,
RequestCapacityReached,
}

Expand Down
2 changes: 1 addition & 1 deletion x-pack/plugins/task_manager/server/test_utils/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ export function mockLogger() {
};
}

interface Resolvable {
export interface Resolvable {
resolve: () => void;
}

Expand Down

0 comments on commit b763ea6

Please sign in to comment.