Skip to content

Commit

Permalink
time out if work overruns in poller
Browse files Browse the repository at this point in the history
  • Loading branch information
gmmorris committed Aug 13, 2020
1 parent cdc7d25 commit f4ab5ed
Show file tree
Hide file tree
Showing 8 changed files with 138 additions and 6 deletions.
1 change: 1 addition & 0 deletions x-pack/plugins/task_manager/server/config.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ describe('config validation', () => {
"enabled": true,
"index": ".kibana_task_manager",
"max_attempts": 3,
"max_poll_inactivity_cycles": 10,
"max_workers": 10,
"poll_interval": 3000,
"request_capacity": 1000,
Expand Down
6 changes: 6 additions & 0 deletions x-pack/plugins/task_manager/server/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { schema, TypeOf } from '@kbn/config-schema';

export const DEFAULT_MAX_WORKERS = 10;
export const DEFAULT_POLL_INTERVAL = 3000;
export const DEFAULT_MAX_POLL_INACTIVITY_CYCLES = 10;

export const configSchema = schema.object({
enabled: schema.boolean({ defaultValue: true }),
Expand All @@ -21,6 +22,11 @@ export const configSchema = schema.object({
defaultValue: DEFAULT_POLL_INTERVAL,
min: 100,
}),
/* How many poll interval cycles can work take before it's timed out. */
max_poll_inactivity_cycles: schema.number({
defaultValue: DEFAULT_MAX_POLL_INACTIVITY_CYCLES,
min: 1,
}),
/* How many requests can Task Manager buffer before it rejects new requests. */
request_capacity: schema.number({
// a nice round contrived number, feel free to change as we learn how it behaves
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import { timeoutPromiseAfter } from './timeout_promise_after';

const delay = (ms: number, result: unknown) =>
new Promise((resolve) => setTimeout(() => resolve(result), ms));

const delayRejection = (ms: number, result: unknown) =>
new Promise((resolve, reject) => setTimeout(() => reject(result), ms));

describe('Promise Timeout', () => {
test('resolves when wrapped promise resolves', async () => {
return expect(timeoutPromiseAfter(delay(100, 'OK'), 1000)).resolves.toMatchInlineSnapshot(
`"OK"`
);
});

test('reject when wrapped promise rejects', async () => {
return expect(
timeoutPromiseAfter(delayRejection(100, 'ERR'), 1000)
).rejects.toMatchInlineSnapshot(`"ERR"`);
});

test('reject it the timeout elapses', async () => {
return expect(
timeoutPromiseAfter(delay(1000, 'OK'), 100, () => 'ERR')
).rejects.toMatchInlineSnapshot(`"ERR"`);
});
});
16 changes: 16 additions & 0 deletions x-pack/plugins/task_manager/server/lib/timeout_promise_after.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

export function timeoutPromiseAfter<T, G>(
future: Promise<T>,
ms: number,
onTimeout: () => G
): Promise<T> {
return new Promise((resolve, reject) => {
setTimeout(() => reject(onTimeout()), ms);
future.then(resolve).catch(reject);
});
}
5 changes: 5 additions & 0 deletions x-pack/plugins/task_manager/server/task_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,11 @@ export class TaskManager {
getCapacity: () => this.pool.availableWorkers,
pollRequests$: this.claimRequests$,
work: this.pollForWork,
// Time out the `work` phase if it takes longer than a certain number of polling cycles
// The `work` phase includes the prework needed *before* executing a task
// (such as polling for new work, marking tasks as running etc.) but does not
// include the time of actually running the task
workTimeout: opts.config.poll_interval * opts.config.max_poll_inactivity_cycles,
});
}

Expand Down
65 changes: 64 additions & 1 deletion x-pack/plugins/task_manager/server/task_poller.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { Subject } from 'rxjs';
import { Option, none, some } from 'fp-ts/lib/Option';
import { createTaskPoller, PollingError, PollingErrorType } from './task_poller';
import { fakeSchedulers } from 'rxjs-marbles/jest';
import { sleep, resolvable } from './test_utils';
import { sleep, resolvable, Resolvable } from './test_utils';
import { asOk, asErr } from './lib/result_type';

describe('TaskPoller', () => {
Expand Down Expand Up @@ -243,6 +243,7 @@ describe('TaskPoller', () => {
},
getCapacity: () => 5,
pollRequests$,
workTimeout: pollInterval * 5,
}).subscribe(handler);

pollRequests$.next(some('one'));
Expand Down Expand Up @@ -272,6 +273,68 @@ describe('TaskPoller', () => {
})
);

test(
'work times out whe nit exceeds a predefined amount of time',
fakeSchedulers(async (advance) => {
const pollInterval = 100;
const workTimeout = pollInterval * 2;
const bufferCapacity = 2;

const handler = jest.fn();

type ResolvableTupple = [string, PromiseLike<void> & Resolvable];
const pollRequests$ = new Subject<Option<ResolvableTupple>>();
createTaskPoller<[string, Resolvable], string[]>({
pollInterval,
bufferCapacity,
work: async (...resolvables) => {
await Promise.all(resolvables.map(([, future]) => future));
return resolvables.map(([name]) => name);
},
getCapacity: () => 5,
pollRequests$,
workTimeout,
}).subscribe(handler);

const one: ResolvableTupple = ['one', resolvable()];
pollRequests$.next(some(one));

// split these into two payloads
advance(pollInterval);

const two: ResolvableTupple = ['two', resolvable()];
const three: ResolvableTupple = ['three', resolvable()];
pollRequests$.next(some(two));
pollRequests$.next(some(three));

advance(workTimeout);
await sleep(workTimeout);

// one resolves too late!
one[1].resolve();

expect(handler).toHaveBeenCalledWith(
asErr(
new PollingError<string>(
'Failed to poll for work: Error: work has timed out',
PollingErrorType.WorkError,
none
)
)
);
expect(handler.mock.calls[0][0].error.type).toEqual(PollingErrorType.WorkError);

// two and three in time
two[1].resolve();
three[1].resolve();

advance(pollInterval);
await sleep(pollInterval);

expect(handler).toHaveBeenCalledWith(asOk(['two', 'three']));
})
);

test(
'returns an error when polling for work fails',
fakeSchedulers(async (advance) => {
Expand Down
16 changes: 12 additions & 4 deletions x-pack/plugins/task_manager/server/task_poller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import {
asErr,
promiseResult,
} from './lib/result_type';
import { timeoutPromiseAfter } from './lib/timeout_promise_after';

type WorkFn<T, H> = (...params: T[]) => Promise<H>;

Expand All @@ -34,6 +35,7 @@ interface Opts<T, H> {
getCapacity: () => number;
pollRequests$: Observable<Option<T>>;
work: WorkFn<T, H>;
workTimeout?: number;
}

/**
Expand All @@ -55,6 +57,7 @@ export function createTaskPoller<T, H>({
pollRequests$,
bufferCapacity,
work,
workTimeout,
}: Opts<T, H>): Observable<Result<H, PollingError<T>>> {
const hasCapacity = () => getCapacity() > 0;

Expand Down Expand Up @@ -89,11 +92,15 @@ export function createTaskPoller<T, H>({
concatMap(async (set: Set<T>) => {
closeSleepPerf();
return mapResult<H, Error, Result<H, PollingError<T>>>(
await promiseResult<H, Error>(work(...pullFromSet(set, getCapacity()))),
await promiseResult<H, Error>(
timeoutPromiseAfter<H, Error>(
work(...pullFromSet(set, getCapacity())),
workTimeout ?? pollInterval,
() => new Error(`work has timed out`)
)
),
(workResult) => asOk(workResult),
(err: Error) => {
return asPollingError<T>(err, PollingErrorType.WorkError);
}
(err: Error) => asPollingError<T>(err, PollingErrorType.WorkError)
);
}),
tap(openSleepPerf),
Expand Down Expand Up @@ -129,6 +136,7 @@ function pushOptionalIntoSet<T>(

export enum PollingErrorType {
WorkError,
WorkTimeout,
RequestCapacityReached,
}

Expand Down
2 changes: 1 addition & 1 deletion x-pack/plugins/task_manager/server/test_utils/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ export function mockLogger() {
};
}

interface Resolvable {
export interface Resolvable {
resolve: () => void;
}

Expand Down

0 comments on commit f4ab5ed

Please sign in to comment.