diff --git a/x-pack/plugins/task_manager/server/task_store.test.ts b/x-pack/plugins/task_manager/server/task_store.test.ts index d65c39f4f454..a02123c4a3f8 100644 --- a/x-pack/plugins/task_manager/server/task_store.test.ts +++ b/x-pack/plugins/task_manager/server/task_store.test.ts @@ -627,7 +627,7 @@ if (doc['task.runAt'].size()!=0) { }); }); - test('it returns task objects', async () => { + test('it filters out running tasks', async () => { const taskManagerId = uuid.v1(); const claimOwnershipUntil = new Date(Date.now()); const runAt = new Date(); @@ -641,7 +641,7 @@ if (doc['task.runAt'].size()!=0) { taskType: 'foo', schedule: undefined, attempts: 0, - status: 'idle', + status: 'claiming', params: '{ "hello": "world" }', state: '{ "baby": "Henhen" }', user: 'jimbo', @@ -715,7 +715,103 @@ if (doc['task.runAt'].size()!=0) { runAt, scope: ['reporting'], state: { baby: 'Henhen' }, - status: 'idle', + status: 'claiming', + taskType: 'foo', + user: 'jimbo', + ownerId: taskManagerId, + }, + ]); + }); + + test('it returns task objects', async () => { + const taskManagerId = uuid.v1(); + const claimOwnershipUntil = new Date(Date.now()); + const runAt = new Date(); + const tasks = [ + { + _id: 'aaa', + _source: { + type: 'task', + task: { + runAt, + taskType: 'foo', + schedule: undefined, + attempts: 0, + status: 'claiming', + params: '{ "hello": "world" }', + state: '{ "baby": "Henhen" }', + user: 'jimbo', + scope: ['reporting'], + ownerId: taskManagerId, + }, + }, + _seq_no: 1, + _primary_term: 2, + sort: ['a', 1], + }, + { + _id: 'bbb', + _source: { + type: 'task', + task: { + runAt, + taskType: 'bar', + schedule: { interval: '5m' }, + attempts: 2, + status: 'claiming', + params: '{ "shazm": 1 }', + state: '{ "henry": "The 8th" }', + user: 'dabo', + scope: ['reporting', 'ceo'], + ownerId: taskManagerId, + }, + }, + _seq_no: 3, + _primary_term: 4, + sort: ['b', 2], + }, + ]; + const { + result: { docs }, + args: { + search: { + body: { query }, + }, + }, + } = await testClaimAvailableTasks({ + opts: { + taskManagerId, + }, + claimingOpts: { + claimOwnershipUntil, + size: 10, + }, + hits: tasks, + }); + + expect(query.bool.must).toContainEqual({ + bool: { + must: [ + { + term: { + 'task.ownerId': taskManagerId, + }, + }, + { term: { 'task.status': 'claiming' } }, + ], + }, + }); + + expect(docs).toMatchObject([ + { + attempts: 0, + id: 'aaa', + schedule: undefined, + params: { hello: 'world' }, + runAt, + scope: ['reporting'], + state: { baby: 'Henhen' }, + status: 'claiming', taskType: 'foo', user: 'jimbo', ownerId: taskManagerId, @@ -728,7 +824,7 @@ if (doc['task.runAt'].size()!=0) { runAt, scope: ['reporting', 'ceo'], state: { henry: 'The 8th' }, - status: 'running', + status: 'claiming', taskType: 'bar', user: 'dabo', ownerId: taskManagerId, diff --git a/x-pack/plugins/task_manager/server/task_store.ts b/x-pack/plugins/task_manager/server/task_store.ts index fbb21d154f32..ece78943d165 100644 --- a/x-pack/plugins/task_manager/server/task_store.ts +++ b/x-pack/plugins/task_manager/server/task_store.ts @@ -216,48 +216,39 @@ export class TaskStore { claimTasksByIdWithRawIds, size ); + const docs = numberOfTasksClaimed > 0 ? await this.sweepForClaimedTasks(claimTasksByIdWithRawIds, size) : []; - // emit success/fail events for claimed tasks by id - if (claimTasksById && claimTasksById.length) { - const [documentsReturnedById, documentsClaimedBySchedule] = partition(docs, (doc) => - claimTasksById.includes(doc.id) - ); - - const [documentsClaimedById, documentsRequestedButNotClaimed] = partition( - documentsReturnedById, - // we filter the schduled tasks down by status is 'claiming' in the esearch, - // but we do not apply this limitation on tasks claimed by ID so that we can - // provide more detailed error messages when we fail to claim them - (doc) => doc.status === TaskStatus.Claiming - ); - - const documentsRequestedButNotReturned = difference( - claimTasksById, - map(documentsReturnedById, 'id') - ); + const [documentsReturnedById, documentsClaimedBySchedule] = partition(docs, (doc) => + claimTasksById.includes(doc.id) + ); - this.emitEvents( - [...documentsClaimedById, ...documentsClaimedBySchedule].map((doc) => - asTaskClaimEvent(doc.id, asOk(doc)) - ) - ); + const [documentsClaimedById, documentsRequestedButNotClaimed] = partition( + documentsReturnedById, + // we filter the schduled tasks down by status is 'claiming' in the esearch, + // but we do not apply this limitation on tasks claimed by ID so that we can + // provide more detailed error messages when we fail to claim them + (doc) => doc.status === TaskStatus.Claiming + ); - this.emitEvents( - documentsRequestedButNotClaimed.map((doc) => asTaskClaimEvent(doc.id, asErr(some(doc)))) - ); + const documentsRequestedButNotReturned = difference( + claimTasksById, + map(documentsReturnedById, 'id') + ); - this.emitEvents( - documentsRequestedButNotReturned.map((id) => asTaskClaimEvent(id, asErr(none))) - ); - } + this.emitEvents([ + ...documentsClaimedById.map((doc) => asTaskClaimEvent(doc.id, asOk(doc))), + ...documentsClaimedBySchedule.map((doc) => asTaskClaimEvent(doc.id, asOk(doc))), + ...documentsRequestedButNotClaimed.map((doc) => asTaskClaimEvent(doc.id, asErr(some(doc)))), + ...documentsRequestedButNotReturned.map((id) => asTaskClaimEvent(id, asErr(none))), + ]); return { - claimedTasks: numberOfTasksClaimed, - docs, + claimedTasks: documentsClaimedById.length + documentsClaimedBySchedule.length, + docs: docs.filter((doc) => doc.status === TaskStatus.Claiming), }; }; diff --git a/x-pack/test/plugin_api_integration/test_suites/task_manager/task_manager_integration.js b/x-pack/test/plugin_api_integration/test_suites/task_manager/task_manager_integration.js index ea95eb42dd6f..c87a5039360b 100644 --- a/x-pack/test/plugin_api_integration/test_suites/task_manager/task_manager_integration.js +++ b/x-pack/test/plugin_api_integration/test_suites/task_manager/task_manager_integration.js @@ -28,8 +28,7 @@ export default function ({ getService }) { const testHistoryIndex = '.kibana_task_manager_test_result'; const supertest = supertestAsPromised(url.format(config.get('servers.kibana'))); - // FLAKY: https://github.com/elastic/kibana/issues/71390 - describe.skip('scheduling and running tasks', () => { + describe('scheduling and running tasks', () => { beforeEach( async () => await supertest.delete('/api/sample_tasks').set('kbn-xsrf', 'xxx').expect(200) );