Skip to content

Commit

Permalink
Merge branch 'master' of github.com:elastic/kibana into implement/ci-…
Browse files Browse the repository at this point in the history
…for-plugin-readmes
  • Loading branch information
spalger committed Aug 5, 2020
2 parents 9c979db + 4150a23 commit ec39440
Show file tree
Hide file tree
Showing 10 changed files with 325 additions and 95 deletions.
17 changes: 12 additions & 5 deletions x-pack/plugins/alerts/server/alerts_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -387,11 +387,18 @@ export class AlertsClient {
updateResult.scheduledTaskId &&
!isEqual(alertSavedObject.attributes.schedule, updateResult.schedule)
) {
this.taskManager.runNow(updateResult.scheduledTaskId).catch((err: Error) => {
this.logger.error(
`Alert update failed to run its underlying task. TaskManager runNow failed with Error: ${err.message}`
);
});
this.taskManager
.runNow(updateResult.scheduledTaskId)
.then(() => {
this.logger.debug(
`Alert update has rescheduled the underlying task: ${updateResult.scheduledTaskId}`
);
})
.catch((err: Error) => {
this.logger.error(
`Alert update failed to run its underlying task. TaskManager runNow failed with Error: ${err.message}`
);
});
}
})(),
]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { i18n } from '@kbn/i18n';

import {
Direction,
EuiButtonEmpty,
EuiButton,
EuiCallOut,
EuiEmptyPrompt,
EuiFlexGroup,
Expand Down Expand Up @@ -147,25 +147,29 @@ export const DataFrameAnalyticsList: FC<Props> = ({
return (
<>
<EuiEmptyPrompt
iconType="createAdvancedJob"
title={
<h2>
{i18n.translate('xpack.ml.dataFrame.analyticsList.emptyPromptTitle', {
defaultMessage: 'No data frame analytics jobs found',
defaultMessage: 'Create your first data frame analytics job',
})}
</h2>
}
actions={
!isManagementTable
? [
<EuiButtonEmpty
<EuiButton
onClick={() => setIsSourceIndexModalVisible(true)}
isDisabled={disabled}
color="primary"
iconType="plusInCircle"
fill
data-test-subj="mlAnalyticsCreateFirstButton"
>
{i18n.translate('xpack.ml.dataFrame.analyticsList.emptyPromptButtonText', {
defaultMessage: 'Create your first data frame analytics job',
defaultMessage: 'Create job',
})}
</EuiButtonEmpty>,
</EuiButton>,
]
: []
}
Expand Down
6 changes: 4 additions & 2 deletions x-pack/plugins/task_manager/server/task_events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
* you may not use this file except in compliance with the Elastic License.
*/

import { Option } from 'fp-ts/lib/Option';

import { ConcreteTaskInstance } from './task';

import { Result, Err } from './lib/result_type';
Expand All @@ -22,7 +24,7 @@ export interface TaskEvent<T, E> {
}
export type TaskMarkRunning = TaskEvent<ConcreteTaskInstance, Error>;
export type TaskRun = TaskEvent<ConcreteTaskInstance, Error>;
export type TaskClaim = TaskEvent<ConcreteTaskInstance, Error>;
export type TaskClaim = TaskEvent<ConcreteTaskInstance, Option<ConcreteTaskInstance>>;
export type TaskRunRequest = TaskEvent<ConcreteTaskInstance, Error>;

export function asTaskMarkRunningEvent(
Expand All @@ -46,7 +48,7 @@ export function asTaskRunEvent(id: string, event: Result<ConcreteTaskInstance, E

export function asTaskClaimEvent(
id: string,
event: Result<ConcreteTaskInstance, Error>
event: Result<ConcreteTaskInstance, Option<ConcreteTaskInstance>>
): TaskClaim {
return {
id,
Expand Down
31 changes: 21 additions & 10 deletions x-pack/plugins/task_manager/server/task_manager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import _ from 'lodash';
import sinon from 'sinon';
import { Subject } from 'rxjs';
import { none } from 'fp-ts/lib/Option';

import {
asTaskMarkRunningEvent,
Expand Down Expand Up @@ -297,7 +298,9 @@ describe('TaskManager', () => {
events$.next(asTaskMarkRunningEvent(id, asOk(task)));
events$.next(asTaskRunEvent(id, asErr(new Error('some thing gone wrong'))));

return expect(result).rejects.toEqual(new Error('some thing gone wrong'));
return expect(result).rejects.toMatchInlineSnapshot(
`[Error: Failed to run task "01ddff11-e88a-4d13-bc4e-256164e755e2": Error: some thing gone wrong]`
);
});

test('rejects when the task mark as running fails', () => {
Expand All @@ -311,7 +314,9 @@ describe('TaskManager', () => {
events$.next(asTaskClaimEvent(id, asOk(task)));
events$.next(asTaskMarkRunningEvent(id, asErr(new Error('some thing gone wrong'))));

return expect(result).rejects.toEqual(new Error('some thing gone wrong'));
return expect(result).rejects.toMatchInlineSnapshot(
`[Error: Failed to run task "01ddff11-e88a-4d13-bc4e-256164e755e2": Error: some thing gone wrong]`
);
});

test('when a task claim fails we ensure the task exists', async () => {
Expand All @@ -321,7 +326,7 @@ describe('TaskManager', () => {

const result = awaitTaskRunResult(id, events$, getLifecycle);

events$.next(asTaskClaimEvent(id, asErr(new Error('failed to claim'))));
events$.next(asTaskClaimEvent(id, asErr(none)));

await expect(result).rejects.toEqual(
new Error(`Failed to run task "${id}" as it does not exist`)
Expand All @@ -337,7 +342,7 @@ describe('TaskManager', () => {

const result = awaitTaskRunResult(id, events$, getLifecycle);

events$.next(asTaskClaimEvent(id, asErr(new Error('failed to claim'))));
events$.next(asTaskClaimEvent(id, asErr(none)));

await expect(result).rejects.toEqual(
new Error(`Failed to run task "${id}" as it is currently running`)
Expand All @@ -353,7 +358,7 @@ describe('TaskManager', () => {

const result = awaitTaskRunResult(id, events$, getLifecycle);

events$.next(asTaskClaimEvent(id, asErr(new Error('failed to claim'))));
events$.next(asTaskClaimEvent(id, asErr(none)));

await expect(result).rejects.toEqual(
new Error(`Failed to run task "${id}" as it is currently running`)
Expand Down Expand Up @@ -386,9 +391,11 @@ describe('TaskManager', () => {

const result = awaitTaskRunResult(id, events$, getLifecycle);

events$.next(asTaskClaimEvent(id, asErr(new Error('failed to claim'))));
events$.next(asTaskClaimEvent(id, asErr(none)));

await expect(result).rejects.toEqual(new Error('failed to claim'));
await expect(result).rejects.toMatchInlineSnapshot(
`[Error: Failed to run task "01ddff11-e88a-4d13-bc4e-256164e755e2" for unknown reason (Current Task Lifecycle is "idle")]`
);

expect(getLifecycle).toHaveBeenCalledWith(id);
});
Expand All @@ -400,9 +407,11 @@ describe('TaskManager', () => {

const result = awaitTaskRunResult(id, events$, getLifecycle);

events$.next(asTaskClaimEvent(id, asErr(new Error('failed to claim'))));
events$.next(asTaskClaimEvent(id, asErr(none)));

await expect(result).rejects.toEqual(new Error('failed to claim'));
await expect(result).rejects.toMatchInlineSnapshot(
`[Error: Failed to run task "01ddff11-e88a-4d13-bc4e-256164e755e2" for unknown reason (Current Task Lifecycle is "failed")]`
);

expect(getLifecycle).toHaveBeenCalledWith(id);
});
Expand All @@ -424,7 +433,9 @@ describe('TaskManager', () => {

events$.next(asTaskRunEvent(id, asErr(new Error('some thing gone wrong'))));

return expect(result).rejects.toEqual(new Error('some thing gone wrong'));
return expect(result).rejects.toMatchInlineSnapshot(
`[Error: Failed to run task "01ddff11-e88a-4d13-bc4e-256164e755e2": Error: some thing gone wrong]`
);
});
});
});
Expand Down
104 changes: 62 additions & 42 deletions x-pack/plugins/task_manager/server/task_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@ import { filter } from 'rxjs/operators';
import { performance } from 'perf_hooks';

import { pipe } from 'fp-ts/lib/pipeable';
import { Option, some, map as mapOptional } from 'fp-ts/lib/Option';
import { Option, some, map as mapOptional, getOrElse } from 'fp-ts/lib/Option';

import {
SavedObjectsSerializer,
ILegacyScopedClusterClient,
ISavedObjectsRepository,
} from '../../../../src/core/server';
import { Result, asErr, either, map, mapErr, promiseResult } from './lib/result_type';
import { Result, asOk, asErr, either, map, mapErr, promiseResult } from './lib/result_type';
import { TaskManagerConfig } from './config';

import { Logger } from './types';
Expand Down Expand Up @@ -405,7 +406,9 @@ export async function claimAvailableTasks(

if (docs.length !== claimedTasks) {
logger.warn(
`[Task Ownership error]: (${claimedTasks}) tasks were claimed by Kibana, but (${docs.length}) tasks were fetched`
`[Task Ownership error]: ${claimedTasks} tasks were claimed by Kibana, but ${
docs.length
} task(s) were fetched (${docs.map((doc) => doc.id).join(', ')})`
);
}
return docs;
Expand Down Expand Up @@ -437,48 +440,65 @@ export async function awaitTaskRunResult(
// listen for all events related to the current task
.pipe(filter(({ id }: TaskLifecycleEvent) => id === taskId))
.subscribe((taskEvent: TaskLifecycleEvent) => {
either(
taskEvent.event,
(taskInstance: ConcreteTaskInstance) => {
// resolve if the task has run sucessfully
if (isTaskRunEvent(taskEvent)) {
subscription.unsubscribe();
resolve({ id: taskInstance.id });
}
},
async (error: Error) => {
if (isTaskClaimEvent(taskEvent)) {
mapErr(async (error: Option<ConcreteTaskInstance>) => {
// reject if any error event takes place for the requested task
subscription.unsubscribe();
if (isTaskRunRequestEvent(taskEvent)) {
return reject(
new Error(
`Failed to run task "${taskId}" as Task Manager is at capacity, please try again later`
)
);
} else if (isTaskClaimEvent(taskEvent)) {
reject(
map(
// if the error happened in the Claim phase - we try to provide better insight
// into why we failed to claim by getting the task's current lifecycle status
await promiseResult<TaskLifecycle, Error>(getLifecycle(taskId)),
(taskLifecycleStatus: TaskLifecycle) => {
if (taskLifecycleStatus === TaskLifecycleResult.NotFound) {
return new Error(`Failed to run task "${taskId}" as it does not exist`);
} else if (
taskLifecycleStatus === TaskStatus.Running ||
taskLifecycleStatus === TaskStatus.Claiming
) {
return new Error(`Failed to run task "${taskId}" as it is currently running`);
}
return error;
},
() => error
)
);
return reject(
map(
await pipe(
error,
mapOptional(async (taskReturnedBySweep) => asOk(taskReturnedBySweep.status)),
getOrElse(() =>
// if the error happened in the Claim phase - we try to provide better insight
// into why we failed to claim by getting the task's current lifecycle status
promiseResult<TaskLifecycle, Error>(getLifecycle(taskId))
)
),
(taskLifecycleStatus: TaskLifecycle) => {
if (taskLifecycleStatus === TaskLifecycleResult.NotFound) {
return new Error(`Failed to run task "${taskId}" as it does not exist`);
} else if (
taskLifecycleStatus === TaskStatus.Running ||
taskLifecycleStatus === TaskStatus.Claiming
) {
return new Error(`Failed to run task "${taskId}" as it is currently running`);
}
return new Error(
`Failed to run task "${taskId}" for unknown reason (Current Task Lifecycle is "${taskLifecycleStatus}")`
);
},
(getLifecycleError: Error) =>
new Error(
`Failed to run task "${taskId}" and failed to get current Status:${getLifecycleError}`
)
)
);
}, taskEvent.event);
} else {
either<ConcreteTaskInstance, Error | Option<ConcreteTaskInstance>>(
taskEvent.event,
(taskInstance: ConcreteTaskInstance) => {
// resolve if the task has run sucessfully
if (isTaskRunEvent(taskEvent)) {
subscription.unsubscribe();
resolve({ id: taskInstance.id });
}
},
async (error: Error | Option<ConcreteTaskInstance>) => {
// reject if any error event takes place for the requested task
subscription.unsubscribe();
if (isTaskRunRequestEvent(taskEvent)) {
return reject(
new Error(
`Failed to run task "${taskId}" as Task Manager is at capacity, please try again later`
)
);
}
return reject(new Error(`Failed to run task "${taskId}": ${error}`));
}
return reject(error);
}
);
);
}
});
});
}
Loading

0 comments on commit ec39440

Please sign in to comment.