diff --git a/x-pack/plugins/alerts/server/alerts_client.ts b/x-pack/plugins/alerts/server/alerts_client.ts index eec60f924bf3..2dfca37bf5d0 100644 --- a/x-pack/plugins/alerts/server/alerts_client.ts +++ b/x-pack/plugins/alerts/server/alerts_client.ts @@ -386,11 +386,18 @@ export class AlertsClient { updateResult.scheduledTaskId && !isEqual(alertSavedObject.attributes.schedule, updateResult.schedule) ) { - this.taskManager.runNow(updateResult.scheduledTaskId).catch((err: Error) => { - this.logger.error( - `Alert update failed to run its underlying task. TaskManager runNow failed with Error: ${err.message}` - ); - }); + this.taskManager + .runNow(updateResult.scheduledTaskId) + .then(() => { + this.logger.debug( + `Alert update has rescheduled the underlying task: ${updateResult.scheduledTaskId}` + ); + }) + .catch((err: Error) => { + this.logger.error( + `Alert update failed to run its underlying task. TaskManager runNow failed with Error: ${err.message}` + ); + }); } })(), ]); diff --git a/x-pack/plugins/task_manager/server/task_events.ts b/x-pack/plugins/task_manager/server/task_events.ts index b17a3636c173..e1dd85f868cd 100644 --- a/x-pack/plugins/task_manager/server/task_events.ts +++ b/x-pack/plugins/task_manager/server/task_events.ts @@ -4,6 +4,8 @@ * you may not use this file except in compliance with the Elastic License. */ +import { Option } from 'fp-ts/lib/Option'; + import { ConcreteTaskInstance } from './task'; import { Result, Err } from './lib/result_type'; @@ -22,7 +24,7 @@ export interface TaskEvent { } export type TaskMarkRunning = TaskEvent; export type TaskRun = TaskEvent; -export type TaskClaim = TaskEvent; +export type TaskClaim = TaskEvent>; export type TaskRunRequest = TaskEvent; export function asTaskMarkRunningEvent( @@ -46,7 +48,7 @@ export function asTaskRunEvent(id: string, event: Result + event: Result> ): TaskClaim { return { id, diff --git a/x-pack/plugins/task_manager/server/task_manager.test.ts b/x-pack/plugins/task_manager/server/task_manager.test.ts index 80215ffa7abb..7035971ad606 100644 --- a/x-pack/plugins/task_manager/server/task_manager.test.ts +++ b/x-pack/plugins/task_manager/server/task_manager.test.ts @@ -7,6 +7,7 @@ import _ from 'lodash'; import sinon from 'sinon'; import { Subject } from 'rxjs'; +import { none } from 'fp-ts/lib/Option'; import { asTaskMarkRunningEvent, @@ -297,7 +298,9 @@ describe('TaskManager', () => { events$.next(asTaskMarkRunningEvent(id, asOk(task))); events$.next(asTaskRunEvent(id, asErr(new Error('some thing gone wrong')))); - return expect(result).rejects.toEqual(new Error('some thing gone wrong')); + return expect(result).rejects.toMatchInlineSnapshot( + `[Error: Failed to run task "01ddff11-e88a-4d13-bc4e-256164e755e2": Error: some thing gone wrong]` + ); }); test('rejects when the task mark as running fails', () => { @@ -311,7 +314,9 @@ describe('TaskManager', () => { events$.next(asTaskClaimEvent(id, asOk(task))); events$.next(asTaskMarkRunningEvent(id, asErr(new Error('some thing gone wrong')))); - return expect(result).rejects.toEqual(new Error('some thing gone wrong')); + return expect(result).rejects.toMatchInlineSnapshot( + `[Error: Failed to run task "01ddff11-e88a-4d13-bc4e-256164e755e2": Error: some thing gone wrong]` + ); }); test('when a task claim fails we ensure the task exists', async () => { @@ -321,7 +326,7 @@ describe('TaskManager', () => { const result = awaitTaskRunResult(id, events$, getLifecycle); - events$.next(asTaskClaimEvent(id, asErr(new Error('failed to claim')))); + events$.next(asTaskClaimEvent(id, asErr(none))); await expect(result).rejects.toEqual( new Error(`Failed to run task "${id}" as it does not exist`) @@ -337,7 +342,7 @@ describe('TaskManager', () => { const result = awaitTaskRunResult(id, events$, getLifecycle); - events$.next(asTaskClaimEvent(id, asErr(new Error('failed to claim')))); + events$.next(asTaskClaimEvent(id, asErr(none))); await expect(result).rejects.toEqual( new Error(`Failed to run task "${id}" as it is currently running`) @@ -353,7 +358,7 @@ describe('TaskManager', () => { const result = awaitTaskRunResult(id, events$, getLifecycle); - events$.next(asTaskClaimEvent(id, asErr(new Error('failed to claim')))); + events$.next(asTaskClaimEvent(id, asErr(none))); await expect(result).rejects.toEqual( new Error(`Failed to run task "${id}" as it is currently running`) @@ -386,9 +391,11 @@ describe('TaskManager', () => { const result = awaitTaskRunResult(id, events$, getLifecycle); - events$.next(asTaskClaimEvent(id, asErr(new Error('failed to claim')))); + events$.next(asTaskClaimEvent(id, asErr(none))); - await expect(result).rejects.toEqual(new Error('failed to claim')); + await expect(result).rejects.toMatchInlineSnapshot( + `[Error: Failed to run task "01ddff11-e88a-4d13-bc4e-256164e755e2" for unknown reason (Current Task Lifecycle is "idle")]` + ); expect(getLifecycle).toHaveBeenCalledWith(id); }); @@ -400,9 +407,11 @@ describe('TaskManager', () => { const result = awaitTaskRunResult(id, events$, getLifecycle); - events$.next(asTaskClaimEvent(id, asErr(new Error('failed to claim')))); + events$.next(asTaskClaimEvent(id, asErr(none))); - await expect(result).rejects.toEqual(new Error('failed to claim')); + await expect(result).rejects.toMatchInlineSnapshot( + `[Error: Failed to run task "01ddff11-e88a-4d13-bc4e-256164e755e2" for unknown reason (Current Task Lifecycle is "failed")]` + ); expect(getLifecycle).toHaveBeenCalledWith(id); }); @@ -424,7 +433,9 @@ describe('TaskManager', () => { events$.next(asTaskRunEvent(id, asErr(new Error('some thing gone wrong')))); - return expect(result).rejects.toEqual(new Error('some thing gone wrong')); + return expect(result).rejects.toMatchInlineSnapshot( + `[Error: Failed to run task "01ddff11-e88a-4d13-bc4e-256164e755e2": Error: some thing gone wrong]` + ); }); }); }); diff --git a/x-pack/plugins/task_manager/server/task_manager.ts b/x-pack/plugins/task_manager/server/task_manager.ts index 35ca439bb913..7165fd28678c 100644 --- a/x-pack/plugins/task_manager/server/task_manager.ts +++ b/x-pack/plugins/task_manager/server/task_manager.ts @@ -9,13 +9,14 @@ import { filter } from 'rxjs/operators'; import { performance } from 'perf_hooks'; import { pipe } from 'fp-ts/lib/pipeable'; -import { Option, some, map as mapOptional } from 'fp-ts/lib/Option'; +import { Option, some, map as mapOptional, getOrElse } from 'fp-ts/lib/Option'; + import { SavedObjectsSerializer, ILegacyScopedClusterClient, ISavedObjectsRepository, } from '../../../../src/core/server'; -import { Result, asErr, either, map, mapErr, promiseResult } from './lib/result_type'; +import { Result, asOk, asErr, either, map, mapErr, promiseResult } from './lib/result_type'; import { TaskManagerConfig } from './config'; import { Logger } from './types'; @@ -405,7 +406,9 @@ export async function claimAvailableTasks( if (docs.length !== claimedTasks) { logger.warn( - `[Task Ownership error]: (${claimedTasks}) tasks were claimed by Kibana, but (${docs.length}) tasks were fetched` + `[Task Ownership error]: ${claimedTasks} tasks were claimed by Kibana, but ${ + docs.length + } task(s) were fetched (${docs.map((doc) => doc.id).join(', ')})` ); } return docs; @@ -437,48 +440,65 @@ export async function awaitTaskRunResult( // listen for all events related to the current task .pipe(filter(({ id }: TaskLifecycleEvent) => id === taskId)) .subscribe((taskEvent: TaskLifecycleEvent) => { - either( - taskEvent.event, - (taskInstance: ConcreteTaskInstance) => { - // resolve if the task has run sucessfully - if (isTaskRunEvent(taskEvent)) { - subscription.unsubscribe(); - resolve({ id: taskInstance.id }); - } - }, - async (error: Error) => { + if (isTaskClaimEvent(taskEvent)) { + mapErr(async (error: Option) => { // reject if any error event takes place for the requested task subscription.unsubscribe(); - if (isTaskRunRequestEvent(taskEvent)) { - return reject( - new Error( - `Failed to run task "${taskId}" as Task Manager is at capacity, please try again later` - ) - ); - } else if (isTaskClaimEvent(taskEvent)) { - reject( - map( - // if the error happened in the Claim phase - we try to provide better insight - // into why we failed to claim by getting the task's current lifecycle status - await promiseResult(getLifecycle(taskId)), - (taskLifecycleStatus: TaskLifecycle) => { - if (taskLifecycleStatus === TaskLifecycleResult.NotFound) { - return new Error(`Failed to run task "${taskId}" as it does not exist`); - } else if ( - taskLifecycleStatus === TaskStatus.Running || - taskLifecycleStatus === TaskStatus.Claiming - ) { - return new Error(`Failed to run task "${taskId}" as it is currently running`); - } - return error; - }, - () => error - ) - ); + return reject( + map( + await pipe( + error, + mapOptional(async (taskReturnedBySweep) => asOk(taskReturnedBySweep.status)), + getOrElse(() => + // if the error happened in the Claim phase - we try to provide better insight + // into why we failed to claim by getting the task's current lifecycle status + promiseResult(getLifecycle(taskId)) + ) + ), + (taskLifecycleStatus: TaskLifecycle) => { + if (taskLifecycleStatus === TaskLifecycleResult.NotFound) { + return new Error(`Failed to run task "${taskId}" as it does not exist`); + } else if ( + taskLifecycleStatus === TaskStatus.Running || + taskLifecycleStatus === TaskStatus.Claiming + ) { + return new Error(`Failed to run task "${taskId}" as it is currently running`); + } + return new Error( + `Failed to run task "${taskId}" for unknown reason (Current Task Lifecycle is "${taskLifecycleStatus}")` + ); + }, + (getLifecycleError: Error) => + new Error( + `Failed to run task "${taskId}" and failed to get current Status:${getLifecycleError}` + ) + ) + ); + }, taskEvent.event); + } else { + either>( + taskEvent.event, + (taskInstance: ConcreteTaskInstance) => { + // resolve if the task has run sucessfully + if (isTaskRunEvent(taskEvent)) { + subscription.unsubscribe(); + resolve({ id: taskInstance.id }); + } + }, + async (error: Error | Option) => { + // reject if any error event takes place for the requested task + subscription.unsubscribe(); + if (isTaskRunRequestEvent(taskEvent)) { + return reject( + new Error( + `Failed to run task "${taskId}" as Task Manager is at capacity, please try again later` + ) + ); + } + return reject(new Error(`Failed to run task "${taskId}": ${error}`)); } - return reject(error); - } - ); + ); + } }); }); } diff --git a/x-pack/plugins/task_manager/server/task_store.test.ts b/x-pack/plugins/task_manager/server/task_store.test.ts index 771b4e2d7d9c..d65c39f4f454 100644 --- a/x-pack/plugins/task_manager/server/task_store.test.ts +++ b/x-pack/plugins/task_manager/server/task_store.test.ts @@ -8,6 +8,7 @@ import _ from 'lodash'; import sinon from 'sinon'; import uuid from 'uuid'; import { filter } from 'rxjs/operators'; +import { Option, some, none } from 'fp-ts/lib/Option'; import { TaskDictionary, @@ -972,7 +973,7 @@ if (doc['task.runAt'].size()!=0) { const runAt = new Date(); const tasks = [ { - _id: 'aaa', + _id: 'claimed-by-id', _source: { type: 'task', task: { @@ -980,7 +981,7 @@ if (doc['task.runAt'].size()!=0) { taskType: 'foo', schedule: undefined, attempts: 0, - status: 'idle', + status: 'claiming', params: '{ "hello": "world" }', state: '{ "baby": "Henhen" }', user: 'jimbo', @@ -996,7 +997,31 @@ if (doc['task.runAt'].size()!=0) { sort: ['a', 1], }, { - _id: 'bbb', + _id: 'claimed-by-schedule', + _source: { + type: 'task', + task: { + runAt, + taskType: 'bar', + schedule: { interval: '5m' }, + attempts: 2, + status: 'claiming', + params: '{ "shazm": 1 }', + state: '{ "henry": "The 8th" }', + user: 'dabo', + scope: ['reporting', 'ceo'], + ownerId: taskManagerId, + startedAt: null, + retryAt: null, + scheduledAt: new Date(), + }, + }, + _seq_no: 3, + _primary_term: 4, + sort: ['b', 2], + }, + { + _id: 'already-running', _source: { type: 'task', task: { @@ -1045,19 +1070,24 @@ if (doc['task.runAt'].size()!=0) { }); const sub = store.events - .pipe(filter((event: TaskEvent) => event.id === 'aaa')) + .pipe( + filter( + (event: TaskEvent>) => + event.id === 'claimed-by-id' + ) + ) .subscribe({ - next: (event: TaskEvent) => { + next: (event: TaskEvent>) => { expect(event).toMatchObject( asTaskClaimEvent( - 'aaa', + 'claimed-by-id', asOk({ - id: 'aaa', + id: 'claimed-by-id', runAt, taskType: 'foo', schedule: undefined, attempts: 0, - status: 'idle' as TaskStatus, + status: 'claiming' as TaskStatus, params: { hello: 'world' }, state: { baby: 'Henhen' }, user: 'jimbo', @@ -1075,7 +1105,7 @@ if (doc['task.runAt'].size()!=0) { }); await store.claimAvailableTasks({ - claimTasksById: ['aaa'], + claimTasksById: ['claimed-by-id'], claimOwnershipUntil: new Date(), size: 10, }); @@ -1102,19 +1132,24 @@ if (doc['task.runAt'].size()!=0) { }); const sub = store.events - .pipe(filter((event: TaskEvent) => event.id === 'bbb')) + .pipe( + filter( + (event: TaskEvent>) => + event.id === 'claimed-by-schedule' + ) + ) .subscribe({ - next: (event: TaskEvent) => { + next: (event: TaskEvent>) => { expect(event).toMatchObject( asTaskClaimEvent( - 'bbb', + 'claimed-by-schedule', asOk({ - id: 'bbb', + id: 'claimed-by-schedule', runAt, taskType: 'bar', schedule: { interval: '5m' }, attempts: 2, - status: 'running' as TaskStatus, + status: 'claiming' as TaskStatus, params: { shazm: 1 }, state: { henry: 'The 8th' }, user: 'dabo', @@ -1132,14 +1167,14 @@ if (doc['task.runAt'].size()!=0) { }); await store.claimAvailableTasks({ - claimTasksById: ['aaa'], + claimTasksById: ['claimed-by-id'], claimOwnershipUntil: new Date(), size: 10, }); }); test('emits an event when the store fails to claim a required task by id', async (done) => { - const { taskManagerId, tasks } = generateTasks(); + const { taskManagerId, runAt, tasks } = generateTasks(); const callCluster = sinon.spy(async (name: string, params?: unknown) => name === 'updateByQuery' ? { @@ -1159,11 +1194,36 @@ if (doc['task.runAt'].size()!=0) { }); const sub = store.events - .pipe(filter((event: TaskEvent) => event.id === 'ccc')) + .pipe( + filter( + (event: TaskEvent>) => + event.id === 'already-running' + ) + ) .subscribe({ - next: (event: TaskEvent) => { + next: (event: TaskEvent>) => { expect(event).toMatchObject( - asTaskClaimEvent('ccc', asErr(new Error(`failed to claim task 'ccc'`))) + asTaskClaimEvent( + 'already-running', + asErr( + some({ + id: 'already-running', + runAt, + taskType: 'bar', + schedule: { interval: '5m' }, + attempts: 2, + status: 'running' as TaskStatus, + params: { shazm: 1 }, + state: { henry: 'The 8th' }, + user: 'dabo', + scope: ['reporting', 'ceo'], + ownerId: taskManagerId, + startedAt: null, + retryAt: null, + scheduledAt: new Date(), + }) + ) + ) ); sub.unsubscribe(); done(); @@ -1171,7 +1231,49 @@ if (doc['task.runAt'].size()!=0) { }); await store.claimAvailableTasks({ - claimTasksById: ['ccc'], + claimTasksById: ['already-running'], + claimOwnershipUntil: new Date(), + size: 10, + }); + }); + + test('emits an event when the store fails to find a task which was required by id', async (done) => { + const { taskManagerId, tasks } = generateTasks(); + const callCluster = sinon.spy(async (name: string, params?: unknown) => + name === 'updateByQuery' + ? { + total: tasks.length, + updated: tasks.length, + } + : { hits: { hits: tasks } } + ); + const store = new TaskStore({ + callCluster, + maxAttempts: 2, + definitions: taskDefinitions, + serializer, + savedObjectsRepository: savedObjectsClient, + taskManagerId, + index: '', + }); + + const sub = store.events + .pipe( + filter( + (event: TaskEvent>) => + event.id === 'unknown-task' + ) + ) + .subscribe({ + next: (event: TaskEvent>) => { + expect(event).toMatchObject(asTaskClaimEvent('unknown-task', asErr(none))); + sub.unsubscribe(); + done(); + }, + }); + + await store.claimAvailableTasks({ + claimTasksById: ['unknown-task'], claimOwnershipUntil: new Date(), size: 10, }); diff --git a/x-pack/plugins/task_manager/server/task_store.ts b/x-pack/plugins/task_manager/server/task_store.ts index 7ec3db5c99aa..44b2d892f4c0 100644 --- a/x-pack/plugins/task_manager/server/task_store.ts +++ b/x-pack/plugins/task_manager/server/task_store.ts @@ -9,7 +9,9 @@ */ import apm from 'elastic-apm-node'; import { Subject, Observable } from 'rxjs'; -import { omit, difference, defaults } from 'lodash'; +import { omit, difference, partition, map, defaults } from 'lodash'; + +import { some, none } from 'fp-ts/lib/Option'; import { SearchResponse, UpdateDocumentByQueryResponse } from 'elasticsearch'; import { @@ -31,6 +33,7 @@ import { TaskLifecycle, TaskLifecycleResult, SerializedConcreteTaskInstance, + TaskStatus, } from './task'; import { TaskClaim, asTaskClaimEvent } from './task_events'; @@ -221,13 +224,35 @@ export class TaskStore { // emit success/fail events for claimed tasks by id if (claimTasksById && claimTasksById.length) { - this.emitEvents(docs.map((doc) => asTaskClaimEvent(doc.id, asOk(doc)))); + const [documentsReturnedById, documentsClaimedBySchedule] = partition(docs, (doc) => + claimTasksById.includes(doc.id) + ); + + const [documentsClaimedById, documentsRequestedButNotClaimed] = partition( + documentsReturnedById, + // we filter the schduled tasks down by status is 'claiming' in the esearch, + // but we do not apply this limitation on tasks claimed by ID so that we can + // provide more detailed error messages when we fail to claim them + (doc) => doc.status === TaskStatus.Claiming + ); + + const documentsRequestedButNotReturned = difference( + claimTasksById, + map(documentsReturnedById, 'id') + ); + + this.emitEvents( + [...documentsClaimedById, ...documentsClaimedBySchedule].map((doc) => + asTaskClaimEvent(doc.id, asOk(doc)) + ) + ); + + this.emitEvents( + documentsRequestedButNotClaimed.map((doc) => asTaskClaimEvent(doc.id, asErr(some(doc)))) + ); this.emitEvents( - difference( - claimTasksById, - docs.map((doc) => doc.id) - ).map((id) => asTaskClaimEvent(id, asErr(new Error(`failed to claim task '${id}'`)))) + documentsRequestedButNotReturned.map((id) => asTaskClaimEvent(id, asErr(none))) ); } diff --git a/x-pack/test/alerting_api_integration/common/fixtures/plugins/task_manager_fixture/server/plugin.ts b/x-pack/test/alerting_api_integration/common/fixtures/plugins/task_manager_fixture/server/plugin.ts index 18fdd5f9c3ac..0833dd042589 100644 --- a/x-pack/test/alerting_api_integration/common/fixtures/plugins/task_manager_fixture/server/plugin.ts +++ b/x-pack/test/alerting_api_integration/common/fixtures/plugins/task_manager_fixture/server/plugin.ts @@ -51,7 +51,8 @@ export class SampleTaskManagerFixturePlugin .toPromise(); public setup(core: CoreSetup) { - core.http.createRouter().get( + const router = core.http.createRouter(); + router.get( { path: '/api/alerting_tasks/{taskId}', validate: { @@ -77,6 +78,23 @@ export class SampleTaskManagerFixturePlugin } } ); + + router.get( + { + path: `/api/ensure_tasks_index_refreshed`, + validate: {}, + }, + async function ( + context: RequestHandlerContext, + req: KibanaRequest, + res: KibanaResponseFactory + ): Promise> { + await core.elasticsearch.legacy.client.callAsInternalUser('indices.refresh', { + index: '.kibana_task_manager', + }); + return res.ok({ body: {} }); + } + ); } public start(core: CoreStart, { taskManager }: SampleTaskManagerFixtureStartDeps) { diff --git a/x-pack/test/plugin_api_integration/plugins/sample_task_plugin/server/init_routes.ts b/x-pack/test/plugin_api_integration/plugins/sample_task_plugin/server/init_routes.ts index f35d6baac8f5..266e66b5a1a4 100644 --- a/x-pack/test/plugin_api_integration/plugins/sample_task_plugin/server/init_routes.ts +++ b/x-pack/test/plugin_api_integration/plugins/sample_task_plugin/server/init_routes.ts @@ -223,6 +223,21 @@ export function initRoutes( } ); + router.get( + { + path: `/api/ensure_tasks_index_refreshed`, + validate: {}, + }, + async function ( + context: RequestHandlerContext, + req: KibanaRequest, + res: KibanaResponseFactory + ): Promise> { + await ensureIndexIsRefreshed(); + return res.ok({ body: {} }); + } + ); + router.delete( { path: `/api/sample_tasks`, diff --git a/x-pack/test/plugin_api_integration/test_suites/task_manager/task_manager_integration.js b/x-pack/test/plugin_api_integration/test_suites/task_manager/task_manager_integration.js index 165e79fb311e..c87a5039360b 100644 --- a/x-pack/test/plugin_api_integration/test_suites/task_manager/task_manager_integration.js +++ b/x-pack/test/plugin_api_integration/test_suites/task_manager/task_manager_integration.js @@ -28,8 +28,7 @@ export default function ({ getService }) { const testHistoryIndex = '.kibana_task_manager_test_result'; const supertest = supertestAsPromised(url.format(config.get('servers.kibana'))); - // FLAKY: https://github.com/elastic/kibana/issues/71390 - describe.skip('scheduling and running tasks', () => { + describe('scheduling and running tasks', () => { beforeEach( async () => await supertest.delete('/api/sample_tasks').set('kbn-xsrf', 'xxx').expect(200) ); @@ -69,6 +68,14 @@ export default function ({ getService }) { .then((response) => response.body); } + function ensureTasksIndexRefreshed() { + return supertest + .get(`/api/ensure_tasks_index_refreshed`) + .send({}) + .expect(200) + .then((response) => response.body); + } + function historyDocs(taskId) { return es .search({ @@ -404,7 +411,7 @@ export default function ({ getService }) { const originalTask = await scheduleTask({ taskType: 'sampleTask', schedule: { interval: `30m` }, - params: { failWith: 'error on run now', failOn: 3 }, + params: { failWith: 'this task was meant to fail!', failOn: 3 }, }); await retry.try(async () => { @@ -415,11 +422,14 @@ export default function ({ getService }) { const task = await currentTask(originalTask.id); expect(task.state.count).to.eql(1); + expect(task.status).to.eql('idle'); // ensure this task shouldnt run for another half hour expectReschedule(Date.parse(originalTask.runAt), task, 30 * 60000); }); + await ensureTasksIndexRefreshed(); + // second run should still be successful const successfulRunNowResult = await runTaskNow({ id: originalTask.id, @@ -429,14 +439,20 @@ export default function ({ getService }) { await retry.try(async () => { const task = await currentTask(originalTask.id); expect(task.state.count).to.eql(2); + expect(task.status).to.eql('idle'); }); + await ensureTasksIndexRefreshed(); + // third run should fail const failedRunNowResult = await runTaskNow({ id: originalTask.id, }); - expect(failedRunNowResult).to.eql({ id: originalTask.id, error: `Error: error on run now` }); + expect(failedRunNowResult).to.eql({ + id: originalTask.id, + error: `Error: Failed to run task \"${originalTask.id}\": Error: this task was meant to fail!`, + }); await retry.try(async () => { expect( @@ -479,8 +495,13 @@ export default function ({ getService }) { expect( docs.filter((taskDoc) => taskDoc._source.taskId === longRunningTask.id).length ).to.eql(1); + + const task = await currentTask(longRunningTask.id); + expect(task.status).to.eql('running'); }); + await ensureTasksIndexRefreshed(); + // first runNow should fail const failedRunNowResult = await runTaskNow({ id: longRunningTask.id, @@ -496,8 +517,13 @@ export default function ({ getService }) { await retry.try(async () => { const tasks = (await currentTasks()).docs; expect(getTaskById(tasks, longRunningTask.id).state.count).to.eql(1); + + const task = await currentTask(longRunningTask.id); + expect(task.status).to.eql('idle'); }); + await ensureTasksIndexRefreshed(); + // second runNow should be successful const successfulRunNowResult = runTaskNow({ id: longRunningTask.id,