Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[7.x] [Task Manager] Correctly handle running tasks when calling RunNow and reduce flakiness in related tests (#73244) #74386

Merged
merged 1 commit into from
Aug 5, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions x-pack/plugins/alerts/server/alerts_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -386,11 +386,18 @@ export class AlertsClient {
updateResult.scheduledTaskId &&
!isEqual(alertSavedObject.attributes.schedule, updateResult.schedule)
) {
this.taskManager.runNow(updateResult.scheduledTaskId).catch((err: Error) => {
this.logger.error(
`Alert update failed to run its underlying task. TaskManager runNow failed with Error: ${err.message}`
);
});
this.taskManager
.runNow(updateResult.scheduledTaskId)
.then(() => {
this.logger.debug(
`Alert update has rescheduled the underlying task: ${updateResult.scheduledTaskId}`
);
})
.catch((err: Error) => {
this.logger.error(
`Alert update failed to run its underlying task. TaskManager runNow failed with Error: ${err.message}`
);
});
}
})(),
]);
Expand Down
6 changes: 4 additions & 2 deletions x-pack/plugins/task_manager/server/task_events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
* you may not use this file except in compliance with the Elastic License.
*/

import { Option } from 'fp-ts/lib/Option';

import { ConcreteTaskInstance } from './task';

import { Result, Err } from './lib/result_type';
Expand All @@ -22,7 +24,7 @@ export interface TaskEvent<T, E> {
}
export type TaskMarkRunning = TaskEvent<ConcreteTaskInstance, Error>;
export type TaskRun = TaskEvent<ConcreteTaskInstance, Error>;
export type TaskClaim = TaskEvent<ConcreteTaskInstance, Error>;
export type TaskClaim = TaskEvent<ConcreteTaskInstance, Option<ConcreteTaskInstance>>;
export type TaskRunRequest = TaskEvent<ConcreteTaskInstance, Error>;

export function asTaskMarkRunningEvent(
Expand All @@ -46,7 +48,7 @@ export function asTaskRunEvent(id: string, event: Result<ConcreteTaskInstance, E

export function asTaskClaimEvent(
id: string,
event: Result<ConcreteTaskInstance, Error>
event: Result<ConcreteTaskInstance, Option<ConcreteTaskInstance>>
): TaskClaim {
return {
id,
Expand Down
31 changes: 21 additions & 10 deletions x-pack/plugins/task_manager/server/task_manager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import _ from 'lodash';
import sinon from 'sinon';
import { Subject } from 'rxjs';
import { none } from 'fp-ts/lib/Option';

import {
asTaskMarkRunningEvent,
Expand Down Expand Up @@ -297,7 +298,9 @@ describe('TaskManager', () => {
events$.next(asTaskMarkRunningEvent(id, asOk(task)));
events$.next(asTaskRunEvent(id, asErr(new Error('some thing gone wrong'))));

return expect(result).rejects.toEqual(new Error('some thing gone wrong'));
return expect(result).rejects.toMatchInlineSnapshot(
`[Error: Failed to run task "01ddff11-e88a-4d13-bc4e-256164e755e2": Error: some thing gone wrong]`
);
});

test('rejects when the task mark as running fails', () => {
Expand All @@ -311,7 +314,9 @@ describe('TaskManager', () => {
events$.next(asTaskClaimEvent(id, asOk(task)));
events$.next(asTaskMarkRunningEvent(id, asErr(new Error('some thing gone wrong'))));

return expect(result).rejects.toEqual(new Error('some thing gone wrong'));
return expect(result).rejects.toMatchInlineSnapshot(
`[Error: Failed to run task "01ddff11-e88a-4d13-bc4e-256164e755e2": Error: some thing gone wrong]`
);
});

test('when a task claim fails we ensure the task exists', async () => {
Expand All @@ -321,7 +326,7 @@ describe('TaskManager', () => {

const result = awaitTaskRunResult(id, events$, getLifecycle);

events$.next(asTaskClaimEvent(id, asErr(new Error('failed to claim'))));
events$.next(asTaskClaimEvent(id, asErr(none)));

await expect(result).rejects.toEqual(
new Error(`Failed to run task "${id}" as it does not exist`)
Expand All @@ -337,7 +342,7 @@ describe('TaskManager', () => {

const result = awaitTaskRunResult(id, events$, getLifecycle);

events$.next(asTaskClaimEvent(id, asErr(new Error('failed to claim'))));
events$.next(asTaskClaimEvent(id, asErr(none)));

await expect(result).rejects.toEqual(
new Error(`Failed to run task "${id}" as it is currently running`)
Expand All @@ -353,7 +358,7 @@ describe('TaskManager', () => {

const result = awaitTaskRunResult(id, events$, getLifecycle);

events$.next(asTaskClaimEvent(id, asErr(new Error('failed to claim'))));
events$.next(asTaskClaimEvent(id, asErr(none)));

await expect(result).rejects.toEqual(
new Error(`Failed to run task "${id}" as it is currently running`)
Expand Down Expand Up @@ -386,9 +391,11 @@ describe('TaskManager', () => {

const result = awaitTaskRunResult(id, events$, getLifecycle);

events$.next(asTaskClaimEvent(id, asErr(new Error('failed to claim'))));
events$.next(asTaskClaimEvent(id, asErr(none)));

await expect(result).rejects.toEqual(new Error('failed to claim'));
await expect(result).rejects.toMatchInlineSnapshot(
`[Error: Failed to run task "01ddff11-e88a-4d13-bc4e-256164e755e2" for unknown reason (Current Task Lifecycle is "idle")]`
);

expect(getLifecycle).toHaveBeenCalledWith(id);
});
Expand All @@ -400,9 +407,11 @@ describe('TaskManager', () => {

const result = awaitTaskRunResult(id, events$, getLifecycle);

events$.next(asTaskClaimEvent(id, asErr(new Error('failed to claim'))));
events$.next(asTaskClaimEvent(id, asErr(none)));

await expect(result).rejects.toEqual(new Error('failed to claim'));
await expect(result).rejects.toMatchInlineSnapshot(
`[Error: Failed to run task "01ddff11-e88a-4d13-bc4e-256164e755e2" for unknown reason (Current Task Lifecycle is "failed")]`
);

expect(getLifecycle).toHaveBeenCalledWith(id);
});
Expand All @@ -424,7 +433,9 @@ describe('TaskManager', () => {

events$.next(asTaskRunEvent(id, asErr(new Error('some thing gone wrong'))));

return expect(result).rejects.toEqual(new Error('some thing gone wrong'));
return expect(result).rejects.toMatchInlineSnapshot(
`[Error: Failed to run task "01ddff11-e88a-4d13-bc4e-256164e755e2": Error: some thing gone wrong]`
);
});
});
});
Expand Down
104 changes: 62 additions & 42 deletions x-pack/plugins/task_manager/server/task_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@ import { filter } from 'rxjs/operators';
import { performance } from 'perf_hooks';

import { pipe } from 'fp-ts/lib/pipeable';
import { Option, some, map as mapOptional } from 'fp-ts/lib/Option';
import { Option, some, map as mapOptional, getOrElse } from 'fp-ts/lib/Option';

import {
SavedObjectsSerializer,
ILegacyScopedClusterClient,
ISavedObjectsRepository,
} from '../../../../src/core/server';
import { Result, asErr, either, map, mapErr, promiseResult } from './lib/result_type';
import { Result, asOk, asErr, either, map, mapErr, promiseResult } from './lib/result_type';
import { TaskManagerConfig } from './config';

import { Logger } from './types';
Expand Down Expand Up @@ -405,7 +406,9 @@ export async function claimAvailableTasks(

if (docs.length !== claimedTasks) {
logger.warn(
`[Task Ownership error]: (${claimedTasks}) tasks were claimed by Kibana, but (${docs.length}) tasks were fetched`
`[Task Ownership error]: ${claimedTasks} tasks were claimed by Kibana, but ${
docs.length
} task(s) were fetched (${docs.map((doc) => doc.id).join(', ')})`
);
}
return docs;
Expand Down Expand Up @@ -437,48 +440,65 @@ export async function awaitTaskRunResult(
// listen for all events related to the current task
.pipe(filter(({ id }: TaskLifecycleEvent) => id === taskId))
.subscribe((taskEvent: TaskLifecycleEvent) => {
either(
taskEvent.event,
(taskInstance: ConcreteTaskInstance) => {
// resolve if the task has run sucessfully
if (isTaskRunEvent(taskEvent)) {
subscription.unsubscribe();
resolve({ id: taskInstance.id });
}
},
async (error: Error) => {
if (isTaskClaimEvent(taskEvent)) {
mapErr(async (error: Option<ConcreteTaskInstance>) => {
// reject if any error event takes place for the requested task
subscription.unsubscribe();
if (isTaskRunRequestEvent(taskEvent)) {
return reject(
new Error(
`Failed to run task "${taskId}" as Task Manager is at capacity, please try again later`
)
);
} else if (isTaskClaimEvent(taskEvent)) {
reject(
map(
// if the error happened in the Claim phase - we try to provide better insight
// into why we failed to claim by getting the task's current lifecycle status
await promiseResult<TaskLifecycle, Error>(getLifecycle(taskId)),
(taskLifecycleStatus: TaskLifecycle) => {
if (taskLifecycleStatus === TaskLifecycleResult.NotFound) {
return new Error(`Failed to run task "${taskId}" as it does not exist`);
} else if (
taskLifecycleStatus === TaskStatus.Running ||
taskLifecycleStatus === TaskStatus.Claiming
) {
return new Error(`Failed to run task "${taskId}" as it is currently running`);
}
return error;
},
() => error
)
);
return reject(
map(
await pipe(
error,
mapOptional(async (taskReturnedBySweep) => asOk(taskReturnedBySweep.status)),
getOrElse(() =>
// if the error happened in the Claim phase - we try to provide better insight
// into why we failed to claim by getting the task's current lifecycle status
promiseResult<TaskLifecycle, Error>(getLifecycle(taskId))
)
),
(taskLifecycleStatus: TaskLifecycle) => {
if (taskLifecycleStatus === TaskLifecycleResult.NotFound) {
return new Error(`Failed to run task "${taskId}" as it does not exist`);
} else if (
taskLifecycleStatus === TaskStatus.Running ||
taskLifecycleStatus === TaskStatus.Claiming
) {
return new Error(`Failed to run task "${taskId}" as it is currently running`);
}
return new Error(
`Failed to run task "${taskId}" for unknown reason (Current Task Lifecycle is "${taskLifecycleStatus}")`
);
},
(getLifecycleError: Error) =>
new Error(
`Failed to run task "${taskId}" and failed to get current Status:${getLifecycleError}`
)
)
);
}, taskEvent.event);
} else {
either<ConcreteTaskInstance, Error | Option<ConcreteTaskInstance>>(
taskEvent.event,
(taskInstance: ConcreteTaskInstance) => {
// resolve if the task has run sucessfully
if (isTaskRunEvent(taskEvent)) {
subscription.unsubscribe();
resolve({ id: taskInstance.id });
}
},
async (error: Error | Option<ConcreteTaskInstance>) => {
// reject if any error event takes place for the requested task
subscription.unsubscribe();
if (isTaskRunRequestEvent(taskEvent)) {
return reject(
new Error(
`Failed to run task "${taskId}" as Task Manager is at capacity, please try again later`
)
);
}
return reject(new Error(`Failed to run task "${taskId}": ${error}`));
}
return reject(error);
}
);
);
}
});
});
}
Loading