Skip to content

Commit

Permalink
[Task Manager] improves cancelation messaging in Task Manager (#64075)
Browse files Browse the repository at this point in the history
Instead of warning that a task isn't cancellable despite having been cancelled, we now do so as a debug warning.
We now warn when a task has expired and is about to be cancelled including when it expired and how long it ran for.
  • Loading branch information
gmmorris authored Apr 23, 2020
1 parent 58d5688 commit dc5fb63
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 10 deletions.
33 changes: 32 additions & 1 deletion x-pack/plugins/task_manager/server/task_pool.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { TaskPool, TaskPoolRunResult } from './task_pool';
import { mockLogger, resolvable, sleep } from './test_utils';
import { asOk } from './lib/result_type';
import { SavedObjectsErrorHelpers } from '../../../../src/core/server';
import moment from 'moment';

describe('TaskPool', () => {
test('occupiedWorkers are a sum of running tasks', async () => {
Expand Down Expand Up @@ -190,14 +191,16 @@ describe('TaskPool', () => {
});

test('run cancels expired tasks prior to running new tasks', async () => {
const logger = mockLogger();
const pool = new TaskPool({
maxWorkers: 2,
logger: mockLogger(),
logger,
});

const expired = resolvable();
const shouldRun = sinon.spy(() => Promise.resolve());
const shouldNotRun = sinon.spy(() => Promise.resolve());
const now = new Date();
const result = await pool.run([
{
...mockTask(),
Expand All @@ -207,6 +210,16 @@ describe('TaskPool', () => {
await sleep(10);
return asOk({ state: {} });
},
get expiration() {
return now;
},
get startedAt() {
// 5 and a half minutes
return moment(now)
.subtract(5, 'm')
.subtract(30, 's')
.toDate();
},
cancel: shouldRun,
},
{
Expand All @@ -231,6 +244,10 @@ describe('TaskPool', () => {

expect(pool.occupiedWorkers).toEqual(2);
expect(pool.availableWorkers).toEqual(0);

expect(logger.warn).toHaveBeenCalledWith(
`Cancelling task TaskType "shooooo" as it expired at ${now.toISOString()} after running for 05m 30s (with timeout set at 5m).`
);
});

test('logs if cancellation errors', async () => {
Expand Down Expand Up @@ -285,6 +302,20 @@ describe('TaskPool', () => {
markTaskAsRunning: jest.fn(async () => true),
run: mockRun(),
toString: () => `TaskType "shooooo"`,
get expiration() {
return new Date();
},
get startedAt() {
return new Date();
},
get definition() {
return {
type: '',
title: '',
timeout: '5m',
createTaskRunner: jest.fn(),
};
},
};
}
});
21 changes: 20 additions & 1 deletion x-pack/plugins/task_manager/server/task_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
* This module contains the logic that ensures we don't run too many
* tasks at once in a given Kibana instance.
*/
import moment, { Duration } from 'moment';
import { performance } from 'perf_hooks';
import { padLeft } from 'lodash';
import { Logger } from './types';
import { TaskRunner } from './task_runner';
import { isTaskSavedObjectNotFoundError } from './lib/is_task_not_found_error';
Expand Down Expand Up @@ -148,7 +150,19 @@ export class TaskPool {
private cancelExpiredTasks() {
for (const task of this.running) {
if (task.isExpired) {
this.logger.debug(`Cancelling expired task ${task.toString()}.`);
this.logger.warn(
`Cancelling task ${task.toString()} as it expired at ${task.expiration.toISOString()}${
task.startedAt
? ` after running for ${durationAsString(
moment.duration(
moment(new Date())
.utc()
.diff(task.startedAt)
)
)}`
: ``
}${task.definition.timeout ? ` (with timeout set at ${task.definition.timeout})` : ``}.`
);
this.cancelTask(task);
}
}
Expand All @@ -169,3 +183,8 @@ function partitionListByCount<T>(list: T[], count: number): [T[], T[]] {
const listInCount = list.splice(0, count);
return [listInCount, list];
}

function durationAsString(duration: Duration): string {
const [m, s] = [duration.minutes(), duration.seconds()].map(value => padLeft(`${value}`, 2, '0'));
return `${m}m ${s}s`;
}
62 changes: 57 additions & 5 deletions x-pack/plugins/task_manager/server/task_runner.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import { ConcreteTaskInstance, TaskStatus } from './task';
import { TaskManagerRunner } from './task_runner';
import { mockLogger } from './test_utils';
import { SavedObjectsErrorHelpers } from '../../../../src/core/server';
import moment from 'moment';

let fakeTimer: sinon.SinonFakeTimers;

Expand Down Expand Up @@ -113,6 +114,60 @@ describe('TaskManagerRunner', () => {
expect(instance.runAt.getTime()).toBeLessThanOrEqual(minutesFromNow(10).getTime());
});

test('expiration returns time after which timeout will have elapsed from start', async () => {
const now = moment();
const { runner } = testOpts({
instance: {
schedule: { interval: '10m' },
status: TaskStatus.Running,
startedAt: now.toDate(),
},
definitions: {
bar: {
timeout: `1m`,
createTaskRunner: () => ({
async run() {
return;
},
}),
},
},
});

await runner.run();

expect(runner.isExpired).toBe(false);
expect(runner.expiration).toEqual(now.add(1, 'm').toDate());
});

test('runDuration returns duration which has elapsed since start', async () => {
const now = moment()
.subtract(30, 's')
.toDate();
const { runner } = testOpts({
instance: {
schedule: { interval: '10m' },
status: TaskStatus.Running,
startedAt: now,
},
definitions: {
bar: {
timeout: `1m`,
createTaskRunner: () => ({
async run() {
return;
},
}),
},
},
});

await runner.run();

expect(runner.isExpired).toBe(false);
expect(runner.startedAt).toEqual(now);
});

test('reschedules tasks that return a runAt', async () => {
const runAt = minutesFromNow(_.random(1, 10));
const { runner, store } = testOpts({
Expand Down Expand Up @@ -208,7 +263,7 @@ describe('TaskManagerRunner', () => {
expect(logger.warn).not.toHaveBeenCalled();
});

test('warns if cancel is called on a non-cancellable task', async () => {
test('debug logs if cancel is called on a non-cancellable task', async () => {
const { runner, logger } = testOpts({
definitions: {
bar: {
Expand All @@ -223,10 +278,7 @@ describe('TaskManagerRunner', () => {
await runner.cancel();
await promise;

expect(logger.warn).toHaveBeenCalledTimes(1);
expect(logger.warn.mock.calls[0][0]).toMatchInlineSnapshot(
`"The task bar \\"foo\\" is not cancellable."`
);
expect(logger.debug).toHaveBeenCalledWith(`The task bar "foo" is not cancellable.`);
});

test('sets startedAt, status, attempts and retryAt when claiming a task', async () => {
Expand Down
23 changes: 20 additions & 3 deletions x-pack/plugins/task_manager/server/task_runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ const EMPTY_RUN_RESULT: SuccessfulRunResult = {};

export interface TaskRunner {
isExpired: boolean;
expiration: Date;
startedAt: Date | null;
definition: TaskDefinition;
cancel: CancelFunction;
markTaskAsRunning: () => Promise<boolean>;
run: () => Promise<Result<SuccessfulRunResult, FailedRunResult>>;
Expand Down Expand Up @@ -129,11 +132,25 @@ export class TaskManagerRunner implements TaskRunner {
return this.definitions[this.taskType];
}

/**
* Gets the time at which this task will expire.
*/
public get expiration() {
return intervalFromDate(this.instance.startedAt!, this.definition.timeout)!;
}

/**
* Gets the duration of the current task run
*/
public get startedAt() {
return this.instance.startedAt;
}

/**
* Gets whether or not this task has run longer than its expiration setting allows.
*/
public get isExpired() {
return intervalFromDate(this.instance.startedAt!, this.definition.timeout)! < new Date();
return this.expiration < new Date();
}

/**
Expand Down Expand Up @@ -261,12 +278,12 @@ export class TaskManagerRunner implements TaskRunner {
*/
public async cancel() {
const { task } = this;
if (task && task.cancel) {
if (task?.cancel) {
this.task = undefined;
return task.cancel();
}

this.logger.warn(`The task ${this} is not cancellable.`);
this.logger.debug(`The task ${this} is not cancellable.`);
}

private validateResult(result?: RunResult | void): Result<SuccessfulRunResult, FailedRunResult> {
Expand Down

0 comments on commit dc5fb63

Please sign in to comment.