Skip to content

Commit

Permalink
[Task Manager] Correctly handle running tasks when calling RunNow a…
Browse files Browse the repository at this point in the history
…nd reduce flakiness in related tests (#73244)

This PR addresses two issues which caused several tests to be flaky in TM.

When `runNow` was introduced to TM we added a pinned query which returned specific tasks by ID.
This query does not have the filter applied to it which causes task to return when they're already marked as `running` but we didn't address these correctly which caused flakyness in the tests.
This didn't cause a broken beahviour, but it did cause beahviour that was hard to reason about - we now address them correctly.

It seems that sometimes, especially if the ES queue is overworked, it can take some time for the update to the underlying task to be visible (we don't user `refresh:true` on purpose), so adding a wait for the index to refresh to make sure the task is updated in time for the next stage of the test.
  • Loading branch information
gmmorris committed Aug 5, 2020
1 parent 8656901 commit e295b41
Show file tree
Hide file tree
Showing 9 changed files with 316 additions and 90 deletions.
17 changes: 12 additions & 5 deletions x-pack/plugins/alerts/server/alerts_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -386,11 +386,18 @@ export class AlertsClient {
updateResult.scheduledTaskId &&
!isEqual(alertSavedObject.attributes.schedule, updateResult.schedule)
) {
this.taskManager.runNow(updateResult.scheduledTaskId).catch((err: Error) => {
this.logger.error(
`Alert update failed to run its underlying task. TaskManager runNow failed with Error: ${err.message}`
);
});
this.taskManager
.runNow(updateResult.scheduledTaskId)
.then(() => {
this.logger.debug(
`Alert update has rescheduled the underlying task: ${updateResult.scheduledTaskId}`
);
})
.catch((err: Error) => {
this.logger.error(
`Alert update failed to run its underlying task. TaskManager runNow failed with Error: ${err.message}`
);
});
}
})(),
]);
Expand Down
6 changes: 4 additions & 2 deletions x-pack/plugins/task_manager/server/task_events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
* you may not use this file except in compliance with the Elastic License.
*/

import { Option } from 'fp-ts/lib/Option';

import { ConcreteTaskInstance } from './task';

import { Result, Err } from './lib/result_type';
Expand All @@ -22,7 +24,7 @@ export interface TaskEvent<T, E> {
}
export type TaskMarkRunning = TaskEvent<ConcreteTaskInstance, Error>;
export type TaskRun = TaskEvent<ConcreteTaskInstance, Error>;
export type TaskClaim = TaskEvent<ConcreteTaskInstance, Error>;
export type TaskClaim = TaskEvent<ConcreteTaskInstance, Option<ConcreteTaskInstance>>;
export type TaskRunRequest = TaskEvent<ConcreteTaskInstance, Error>;

export function asTaskMarkRunningEvent(
Expand All @@ -46,7 +48,7 @@ export function asTaskRunEvent(id: string, event: Result<ConcreteTaskInstance, E

export function asTaskClaimEvent(
id: string,
event: Result<ConcreteTaskInstance, Error>
event: Result<ConcreteTaskInstance, Option<ConcreteTaskInstance>>
): TaskClaim {
return {
id,
Expand Down
31 changes: 21 additions & 10 deletions x-pack/plugins/task_manager/server/task_manager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import _ from 'lodash';
import sinon from 'sinon';
import { Subject } from 'rxjs';
import { none } from 'fp-ts/lib/Option';

import {
asTaskMarkRunningEvent,
Expand Down Expand Up @@ -297,7 +298,9 @@ describe('TaskManager', () => {
events$.next(asTaskMarkRunningEvent(id, asOk(task)));
events$.next(asTaskRunEvent(id, asErr(new Error('some thing gone wrong'))));

return expect(result).rejects.toEqual(new Error('some thing gone wrong'));
return expect(result).rejects.toMatchInlineSnapshot(
`[Error: Failed to run task "01ddff11-e88a-4d13-bc4e-256164e755e2": Error: some thing gone wrong]`
);
});

test('rejects when the task mark as running fails', () => {
Expand All @@ -311,7 +314,9 @@ describe('TaskManager', () => {
events$.next(asTaskClaimEvent(id, asOk(task)));
events$.next(asTaskMarkRunningEvent(id, asErr(new Error('some thing gone wrong'))));

return expect(result).rejects.toEqual(new Error('some thing gone wrong'));
return expect(result).rejects.toMatchInlineSnapshot(
`[Error: Failed to run task "01ddff11-e88a-4d13-bc4e-256164e755e2": Error: some thing gone wrong]`
);
});

test('when a task claim fails we ensure the task exists', async () => {
Expand All @@ -321,7 +326,7 @@ describe('TaskManager', () => {

const result = awaitTaskRunResult(id, events$, getLifecycle);

events$.next(asTaskClaimEvent(id, asErr(new Error('failed to claim'))));
events$.next(asTaskClaimEvent(id, asErr(none)));

await expect(result).rejects.toEqual(
new Error(`Failed to run task "${id}" as it does not exist`)
Expand All @@ -337,7 +342,7 @@ describe('TaskManager', () => {

const result = awaitTaskRunResult(id, events$, getLifecycle);

events$.next(asTaskClaimEvent(id, asErr(new Error('failed to claim'))));
events$.next(asTaskClaimEvent(id, asErr(none)));

await expect(result).rejects.toEqual(
new Error(`Failed to run task "${id}" as it is currently running`)
Expand All @@ -353,7 +358,7 @@ describe('TaskManager', () => {

const result = awaitTaskRunResult(id, events$, getLifecycle);

events$.next(asTaskClaimEvent(id, asErr(new Error('failed to claim'))));
events$.next(asTaskClaimEvent(id, asErr(none)));

await expect(result).rejects.toEqual(
new Error(`Failed to run task "${id}" as it is currently running`)
Expand Down Expand Up @@ -386,9 +391,11 @@ describe('TaskManager', () => {

const result = awaitTaskRunResult(id, events$, getLifecycle);

events$.next(asTaskClaimEvent(id, asErr(new Error('failed to claim'))));
events$.next(asTaskClaimEvent(id, asErr(none)));

await expect(result).rejects.toEqual(new Error('failed to claim'));
await expect(result).rejects.toMatchInlineSnapshot(
`[Error: Failed to run task "01ddff11-e88a-4d13-bc4e-256164e755e2" for unknown reason (Current Task Lifecycle is "idle")]`
);

expect(getLifecycle).toHaveBeenCalledWith(id);
});
Expand All @@ -400,9 +407,11 @@ describe('TaskManager', () => {

const result = awaitTaskRunResult(id, events$, getLifecycle);

events$.next(asTaskClaimEvent(id, asErr(new Error('failed to claim'))));
events$.next(asTaskClaimEvent(id, asErr(none)));

await expect(result).rejects.toEqual(new Error('failed to claim'));
await expect(result).rejects.toMatchInlineSnapshot(
`[Error: Failed to run task "01ddff11-e88a-4d13-bc4e-256164e755e2" for unknown reason (Current Task Lifecycle is "failed")]`
);

expect(getLifecycle).toHaveBeenCalledWith(id);
});
Expand All @@ -424,7 +433,9 @@ describe('TaskManager', () => {

events$.next(asTaskRunEvent(id, asErr(new Error('some thing gone wrong'))));

return expect(result).rejects.toEqual(new Error('some thing gone wrong'));
return expect(result).rejects.toMatchInlineSnapshot(
`[Error: Failed to run task "01ddff11-e88a-4d13-bc4e-256164e755e2": Error: some thing gone wrong]`
);
});
});
});
Expand Down
104 changes: 62 additions & 42 deletions x-pack/plugins/task_manager/server/task_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@ import { filter } from 'rxjs/operators';
import { performance } from 'perf_hooks';

import { pipe } from 'fp-ts/lib/pipeable';
import { Option, some, map as mapOptional } from 'fp-ts/lib/Option';
import { Option, some, map as mapOptional, getOrElse } from 'fp-ts/lib/Option';

import {
SavedObjectsSerializer,
ILegacyScopedClusterClient,
ISavedObjectsRepository,
} from '../../../../src/core/server';
import { Result, asErr, either, map, mapErr, promiseResult } from './lib/result_type';
import { Result, asOk, asErr, either, map, mapErr, promiseResult } from './lib/result_type';
import { TaskManagerConfig } from './config';

import { Logger } from './types';
Expand Down Expand Up @@ -405,7 +406,9 @@ export async function claimAvailableTasks(

if (docs.length !== claimedTasks) {
logger.warn(
`[Task Ownership error]: (${claimedTasks}) tasks were claimed by Kibana, but (${docs.length}) tasks were fetched`
`[Task Ownership error]: ${claimedTasks} tasks were claimed by Kibana, but ${
docs.length
} task(s) were fetched (${docs.map((doc) => doc.id).join(', ')})`
);
}
return docs;
Expand Down Expand Up @@ -437,48 +440,65 @@ export async function awaitTaskRunResult(
// listen for all events related to the current task
.pipe(filter(({ id }: TaskLifecycleEvent) => id === taskId))
.subscribe((taskEvent: TaskLifecycleEvent) => {
either(
taskEvent.event,
(taskInstance: ConcreteTaskInstance) => {
// resolve if the task has run sucessfully
if (isTaskRunEvent(taskEvent)) {
subscription.unsubscribe();
resolve({ id: taskInstance.id });
}
},
async (error: Error) => {
if (isTaskClaimEvent(taskEvent)) {
mapErr(async (error: Option<ConcreteTaskInstance>) => {
// reject if any error event takes place for the requested task
subscription.unsubscribe();
if (isTaskRunRequestEvent(taskEvent)) {
return reject(
new Error(
`Failed to run task "${taskId}" as Task Manager is at capacity, please try again later`
)
);
} else if (isTaskClaimEvent(taskEvent)) {
reject(
map(
// if the error happened in the Claim phase - we try to provide better insight
// into why we failed to claim by getting the task's current lifecycle status
await promiseResult<TaskLifecycle, Error>(getLifecycle(taskId)),
(taskLifecycleStatus: TaskLifecycle) => {
if (taskLifecycleStatus === TaskLifecycleResult.NotFound) {
return new Error(`Failed to run task "${taskId}" as it does not exist`);
} else if (
taskLifecycleStatus === TaskStatus.Running ||
taskLifecycleStatus === TaskStatus.Claiming
) {
return new Error(`Failed to run task "${taskId}" as it is currently running`);
}
return error;
},
() => error
)
);
return reject(
map(
await pipe(
error,
mapOptional(async (taskReturnedBySweep) => asOk(taskReturnedBySweep.status)),
getOrElse(() =>
// if the error happened in the Claim phase - we try to provide better insight
// into why we failed to claim by getting the task's current lifecycle status
promiseResult<TaskLifecycle, Error>(getLifecycle(taskId))
)
),
(taskLifecycleStatus: TaskLifecycle) => {
if (taskLifecycleStatus === TaskLifecycleResult.NotFound) {
return new Error(`Failed to run task "${taskId}" as it does not exist`);
} else if (
taskLifecycleStatus === TaskStatus.Running ||
taskLifecycleStatus === TaskStatus.Claiming
) {
return new Error(`Failed to run task "${taskId}" as it is currently running`);
}
return new Error(
`Failed to run task "${taskId}" for unknown reason (Current Task Lifecycle is "${taskLifecycleStatus}")`
);
},
(getLifecycleError: Error) =>
new Error(
`Failed to run task "${taskId}" and failed to get current Status:${getLifecycleError}`
)
)
);
}, taskEvent.event);
} else {
either<ConcreteTaskInstance, Error | Option<ConcreteTaskInstance>>(
taskEvent.event,
(taskInstance: ConcreteTaskInstance) => {
// resolve if the task has run sucessfully
if (isTaskRunEvent(taskEvent)) {
subscription.unsubscribe();
resolve({ id: taskInstance.id });
}
},
async (error: Error | Option<ConcreteTaskInstance>) => {
// reject if any error event takes place for the requested task
subscription.unsubscribe();
if (isTaskRunRequestEvent(taskEvent)) {
return reject(
new Error(
`Failed to run task "${taskId}" as Task Manager is at capacity, please try again later`
)
);
}
return reject(new Error(`Failed to run task "${taskId}": ${error}`));
}
return reject(error);
}
);
);
}
});
});
}
Loading

0 comments on commit e295b41

Please sign in to comment.