From d9ad2d6a3d30b3b7bee0c776f50f488684179b42 Mon Sep 17 00:00:00 2001 From: Gidi Meir Morris Date: Thu, 13 Aug 2020 14:48:53 +0100 Subject: [PATCH] [Task manager] Prevents edge case where already running tasks are reschedule every polling interval (#74606) (#74941) Fixes flaky tests in Task Manager and Alerting. The fix in #73244 was correct, but it missed an edge case which causes the already running task to be rescheduled over and over. This prevents that edge case which was effecting both TM in general and Alerting specifically. --- .../task_manager/server/task_store.test.ts | 104 +++++++++++++++++- .../plugins/task_manager/server/task_store.ts | 55 ++++----- .../task_manager/task_manager_integration.js | 3 +- 3 files changed, 124 insertions(+), 38 deletions(-) diff --git a/x-pack/plugins/task_manager/server/task_store.test.ts b/x-pack/plugins/task_manager/server/task_store.test.ts index d65c39f4f454d..a02123c4a3f8d 100644 --- a/x-pack/plugins/task_manager/server/task_store.test.ts +++ b/x-pack/plugins/task_manager/server/task_store.test.ts @@ -627,7 +627,7 @@ if (doc['task.runAt'].size()!=0) { }); }); - test('it returns task objects', async () => { + test('it filters out running tasks', async () => { const taskManagerId = uuid.v1(); const claimOwnershipUntil = new Date(Date.now()); const runAt = new Date(); @@ -641,7 +641,7 @@ if (doc['task.runAt'].size()!=0) { taskType: 'foo', schedule: undefined, attempts: 0, - status: 'idle', + status: 'claiming', params: '{ "hello": "world" }', state: '{ "baby": "Henhen" }', user: 'jimbo', @@ -715,7 +715,103 @@ if (doc['task.runAt'].size()!=0) { runAt, scope: ['reporting'], state: { baby: 'Henhen' }, - status: 'idle', + status: 'claiming', + taskType: 'foo', + user: 'jimbo', + ownerId: taskManagerId, + }, + ]); + }); + + test('it returns task objects', async () => { + const taskManagerId = uuid.v1(); + const claimOwnershipUntil = new Date(Date.now()); + const runAt = new Date(); + const tasks = [ + { + _id: 'aaa', + _source: { + type: 'task', + task: { + runAt, + taskType: 'foo', + schedule: undefined, + attempts: 0, + status: 'claiming', + params: '{ "hello": "world" }', + state: '{ "baby": "Henhen" }', + user: 'jimbo', + scope: ['reporting'], + ownerId: taskManagerId, + }, + }, + _seq_no: 1, + _primary_term: 2, + sort: ['a', 1], + }, + { + _id: 'bbb', + _source: { + type: 'task', + task: { + runAt, + taskType: 'bar', + schedule: { interval: '5m' }, + attempts: 2, + status: 'claiming', + params: '{ "shazm": 1 }', + state: '{ "henry": "The 8th" }', + user: 'dabo', + scope: ['reporting', 'ceo'], + ownerId: taskManagerId, + }, + }, + _seq_no: 3, + _primary_term: 4, + sort: ['b', 2], + }, + ]; + const { + result: { docs }, + args: { + search: { + body: { query }, + }, + }, + } = await testClaimAvailableTasks({ + opts: { + taskManagerId, + }, + claimingOpts: { + claimOwnershipUntil, + size: 10, + }, + hits: tasks, + }); + + expect(query.bool.must).toContainEqual({ + bool: { + must: [ + { + term: { + 'task.ownerId': taskManagerId, + }, + }, + { term: { 'task.status': 'claiming' } }, + ], + }, + }); + + expect(docs).toMatchObject([ + { + attempts: 0, + id: 'aaa', + schedule: undefined, + params: { hello: 'world' }, + runAt, + scope: ['reporting'], + state: { baby: 'Henhen' }, + status: 'claiming', taskType: 'foo', user: 'jimbo', ownerId: taskManagerId, @@ -728,7 +824,7 @@ if (doc['task.runAt'].size()!=0) { runAt, scope: ['reporting', 'ceo'], state: { henry: 'The 8th' }, - status: 'running', + status: 'claiming', taskType: 'bar', user: 'dabo', ownerId: taskManagerId, diff --git a/x-pack/plugins/task_manager/server/task_store.ts b/x-pack/plugins/task_manager/server/task_store.ts index fbb21d154f32d..ece78943d165c 100644 --- a/x-pack/plugins/task_manager/server/task_store.ts +++ b/x-pack/plugins/task_manager/server/task_store.ts @@ -216,48 +216,39 @@ export class TaskStore { claimTasksByIdWithRawIds, size ); + const docs = numberOfTasksClaimed > 0 ? await this.sweepForClaimedTasks(claimTasksByIdWithRawIds, size) : []; - // emit success/fail events for claimed tasks by id - if (claimTasksById && claimTasksById.length) { - const [documentsReturnedById, documentsClaimedBySchedule] = partition(docs, (doc) => - claimTasksById.includes(doc.id) - ); - - const [documentsClaimedById, documentsRequestedButNotClaimed] = partition( - documentsReturnedById, - // we filter the schduled tasks down by status is 'claiming' in the esearch, - // but we do not apply this limitation on tasks claimed by ID so that we can - // provide more detailed error messages when we fail to claim them - (doc) => doc.status === TaskStatus.Claiming - ); - - const documentsRequestedButNotReturned = difference( - claimTasksById, - map(documentsReturnedById, 'id') - ); + const [documentsReturnedById, documentsClaimedBySchedule] = partition(docs, (doc) => + claimTasksById.includes(doc.id) + ); - this.emitEvents( - [...documentsClaimedById, ...documentsClaimedBySchedule].map((doc) => - asTaskClaimEvent(doc.id, asOk(doc)) - ) - ); + const [documentsClaimedById, documentsRequestedButNotClaimed] = partition( + documentsReturnedById, + // we filter the schduled tasks down by status is 'claiming' in the esearch, + // but we do not apply this limitation on tasks claimed by ID so that we can + // provide more detailed error messages when we fail to claim them + (doc) => doc.status === TaskStatus.Claiming + ); - this.emitEvents( - documentsRequestedButNotClaimed.map((doc) => asTaskClaimEvent(doc.id, asErr(some(doc)))) - ); + const documentsRequestedButNotReturned = difference( + claimTasksById, + map(documentsReturnedById, 'id') + ); - this.emitEvents( - documentsRequestedButNotReturned.map((id) => asTaskClaimEvent(id, asErr(none))) - ); - } + this.emitEvents([ + ...documentsClaimedById.map((doc) => asTaskClaimEvent(doc.id, asOk(doc))), + ...documentsClaimedBySchedule.map((doc) => asTaskClaimEvent(doc.id, asOk(doc))), + ...documentsRequestedButNotClaimed.map((doc) => asTaskClaimEvent(doc.id, asErr(some(doc)))), + ...documentsRequestedButNotReturned.map((id) => asTaskClaimEvent(id, asErr(none))), + ]); return { - claimedTasks: numberOfTasksClaimed, - docs, + claimedTasks: documentsClaimedById.length + documentsClaimedBySchedule.length, + docs: docs.filter((doc) => doc.status === TaskStatus.Claiming), }; }; diff --git a/x-pack/test/plugin_api_integration/test_suites/task_manager/task_manager_integration.js b/x-pack/test/plugin_api_integration/test_suites/task_manager/task_manager_integration.js index ea95eb42dd6ff..c87a5039360b8 100644 --- a/x-pack/test/plugin_api_integration/test_suites/task_manager/task_manager_integration.js +++ b/x-pack/test/plugin_api_integration/test_suites/task_manager/task_manager_integration.js @@ -28,8 +28,7 @@ export default function ({ getService }) { const testHistoryIndex = '.kibana_task_manager_test_result'; const supertest = supertestAsPromised(url.format(config.get('servers.kibana'))); - // FLAKY: https://github.com/elastic/kibana/issues/71390 - describe.skip('scheduling and running tasks', () => { + describe('scheduling and running tasks', () => { beforeEach( async () => await supertest.delete('/api/sample_tasks').set('kbn-xsrf', 'xxx').expect(200) );