Skip to content

Commit

Permalink
[8.x] Skip claiming tasks that were modified during the task claiming…
Browse files Browse the repository at this point in the history
… phase (elastic#198711) (elastic#199251)

# Backport

This will backport the following commits from `main` to `8.x`:
- [Skip claiming tasks that were modified during the task claiming phase
(elastic#198711)](https://github.com/elastic/kibana/pull/198711)

<!--- Backport version: 9.4.3 -->

### Questions?
Please refer to the [Backport tool
documentation](https://github.com/sqren/backport)

<!--BACKPORT [{"author":{"name":"Mike
Côté","email":"[email protected]"},"sourceCommit":{"committedDate":"2024-11-07T03:15:31Z","message":"Skip
claiming tasks that were modified during the task claiming phase
(elastic#198711)\n\nResolves
https://github.com/elastic/kibana/issues/196300\r\n\r\nIn this PR, I'm
removing the fallback we had when `startedAt` value is\r\nmissing
(elastic#194759) in favour of\r\ndropping
the task document from the claiming cycle. The additional logs\r\nthat
were added showed that tasks that were missing a `startedAt`
value\r\nwas because they were being re-created during the exact same
time they\r\nwere being claimed, causing the task to have an `idle`
status and a\r\nmissing `startedAt` value. Given the scenario, it feels
better to drop\r\nthem from getting claimed and to instead try again at
the next claim\r\ncycle where the race condition shouldn't
occur.","sha":"a19dd8ea97bae73dc69ba71e74917e80f091a8a1","branchLabelMapping":{"^v9.0.0$":"main","^v8.17.0$":"8.x","^v(\\d+).(\\d+).\\d+$":"$1.$2"}},"sourcePullRequest":{"labels":["release_note:skip","Feature:Task
Manager","Team:ResponseOps","v9.0.0","backport:version","v8.17.0"],"title":"Skip
claiming tasks that were modified during the task claiming
phase","number":198711,"url":"https://github.com/elastic/kibana/pull/198711","mergeCommit":{"message":"Skip
claiming tasks that were modified during the task claiming phase
(elastic#198711)\n\nResolves
https://github.com/elastic/kibana/issues/196300\r\n\r\nIn this PR, I'm
removing the fallback we had when `startedAt` value is\r\nmissing
(elastic#194759) in favour of\r\ndropping
the task document from the claiming cycle. The additional logs\r\nthat
were added showed that tasks that were missing a `startedAt`
value\r\nwas because they were being re-created during the exact same
time they\r\nwere being claimed, causing the task to have an `idle`
status and a\r\nmissing `startedAt` value. Given the scenario, it feels
better to drop\r\nthem from getting claimed and to instead try again at
the next claim\r\ncycle where the race condition shouldn't
occur.","sha":"a19dd8ea97bae73dc69ba71e74917e80f091a8a1"}},"sourceBranch":"main","suggestedTargetBranches":["8.x"],"targetPullRequestStates":[{"branch":"main","label":"v9.0.0","branchLabelMappingKey":"^v9.0.0$","isSourceBranch":true,"state":"MERGED","url":"https://github.com/elastic/kibana/pull/198711","number":198711,"mergeCommit":{"message":"Skip
claiming tasks that were modified during the task claiming phase
(elastic#198711)\n\nResolves
https://github.com/elastic/kibana/issues/196300\r\n\r\nIn this PR, I'm
removing the fallback we had when `startedAt` value is\r\nmissing
(elastic#194759) in favour of\r\ndropping
the task document from the claiming cycle. The additional logs\r\nthat
were added showed that tasks that were missing a `startedAt`
value\r\nwas because they were being re-created during the exact same
time they\r\nwere being claimed, causing the task to have an `idle`
status and a\r\nmissing `startedAt` value. Given the scenario, it feels
better to drop\r\nthem from getting claimed and to instead try again at
the next claim\r\ncycle where the race condition shouldn't
occur.","sha":"a19dd8ea97bae73dc69ba71e74917e80f091a8a1"}},{"branch":"8.x","label":"v8.17.0","branchLabelMappingKey":"^v8.17.0$","isSourceBranch":false,"state":"NOT_CREATED"}]}]
BACKPORT-->

Co-authored-by: Mike Côté <[email protected]>
  • Loading branch information
kibanamachine and mikecote authored Nov 7, 2024
1 parent 0b40d99 commit 4e8b6ec
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1344,15 +1344,15 @@ describe('TaskClaiming', () => {
expect(result.docs.length).toEqual(3);
});

test('should assign startedAt value if bulkGet returns task with null startedAt', async () => {
test('should skip tasks where bulkGet returns a newer task document than the bulkPartialUpdate', async () => {
const store = taskStoreMock.create({ taskManagerId: 'test-test' });
store.convertToSavedObjectIds.mockImplementation((ids) => ids.map((id) => `task:${id}`));

const fetchedTasks = [
mockInstance({ id: `id-1`, taskType: 'report' }),
mockInstance({ id: `id-2`, taskType: 'report' }),
mockInstance({ id: `id-3`, taskType: 'yawn' }),
mockInstance({ id: `id-4`, taskType: 'report' }),
mockInstance({ id: `id-1`, taskType: 'report', version: '123' }),
mockInstance({ id: `id-2`, taskType: 'report', version: '123' }),
mockInstance({ id: `id-3`, taskType: 'yawn', version: '123' }),
mockInstance({ id: `id-4`, taskType: 'report', version: '123' }),
];

const { versionMap, docLatestVersions } = getVersionMapsFromTasks(fetchedTasks);
Expand All @@ -1365,7 +1365,7 @@ describe('TaskClaiming', () => {
);
store.bulkGet.mockResolvedValueOnce([
asOk({ ...fetchedTasks[0], startedAt: new Date() }),
asOk(fetchedTasks[1]),
asOk({ ...fetchedTasks[1], startedAt: new Date(), version: 'abc' }),
asOk({ ...fetchedTasks[2], startedAt: new Date() }),
asOk({ ...fetchedTasks[3], startedAt: new Date() }),
]);
Expand Down Expand Up @@ -1399,11 +1399,11 @@ describe('TaskClaiming', () => {
expect(mockApmTrans.end).toHaveBeenCalledWith('success');

expect(taskManagerLogger.debug).toHaveBeenCalledWith(
'task claimer claimed: 4; stale: 0; conflicts: 0; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 0; removed: 0;',
'task claimer claimed: 3; stale: 0; conflicts: 1; missing: 0; capacity reached: 0; updateErrors: 0; getErrors: 0; removed: 0;',
{ tags: ['taskClaiming', 'claimAvailableTasksMget'] }
);
expect(taskManagerLogger.warn).toHaveBeenCalledWith(
'Task id-2 has a null startedAt value, setting to current time - ownerId null, status idle',
'Task id-2 was modified during the claiming phase, skipping until the next claiming cycle.',
{ tags: ['taskClaiming', 'claimAvailableTasksMget'] }
);

Expand Down Expand Up @@ -1463,14 +1463,14 @@ describe('TaskClaiming', () => {
expect(store.bulkGet).toHaveBeenCalledWith(['id-1', 'id-2', 'id-3', 'id-4']);

expect(result.stats).toEqual({
tasksClaimed: 4,
tasksConflicted: 0,
tasksClaimed: 3,
tasksConflicted: 1,
tasksErrors: 0,
tasksUpdated: 4,
tasksUpdated: 3,
tasksLeftUnclaimed: 0,
staleTasks: 0,
});
expect(result.docs.length).toEqual(4);
expect(result.docs.length).toEqual(3);
for (const r of result.docs) {
expect(r.startedAt).not.toBeNull();
}
Expand Down
38 changes: 16 additions & 22 deletions x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts
Original file line number Diff line number Diff line change
Expand Up @@ -195,15 +195,15 @@ async function claimAvailableTasks(opts: TaskClaimerOpts): Promise<ClaimOwnershi
}

// perform the task object updates, deal with errors
const updatedTaskIds: string[] = [];
const updatedTasks: Record<string, PartialConcreteTaskInstance> = {};
let conflicts = 0;
let bulkUpdateErrors = 0;
let bulkGetErrors = 0;

const updateResults = await taskStore.bulkPartialUpdate(taskUpdates);
for (const updateResult of updateResults) {
if (isOk(updateResult)) {
updatedTaskIds.push(updateResult.value.id);
updatedTasks[updateResult.value.id] = updateResult.value;
} else {
const { id, type, error, status } = updateResult.error;

Expand All @@ -218,29 +218,23 @@ async function claimAvailableTasks(opts: TaskClaimerOpts): Promise<ClaimOwnershi
}

// perform an mget to get the full task instance for claiming
const fullTasksToRun = (await taskStore.bulkGet(updatedTaskIds)).reduce<ConcreteTaskInstance[]>(
(acc, task) => {
if (isOk(task)) {
acc.push(task.value);
} else {
const { id, type, error } = task.error;
logger.error(`Error getting full task ${id}:${type} during claim: ${error.message}`);
bulkGetErrors++;
}
return acc;
},
[]
);

// Look for tasks that have a null startedAt value, log them and manually set a startedAt field
for (const task of fullTasksToRun) {
if (task.startedAt == null) {
const fullTasksToRun = (await taskStore.bulkGet(Object.keys(updatedTasks))).reduce<
ConcreteTaskInstance[]
>((acc, task) => {
if (isOk(task) && task.value.version !== updatedTasks[task.value.id].version) {
logger.warn(
`Task ${task.id} has a null startedAt value, setting to current time - ownerId ${task.ownerId}, status ${task.status}`
`Task ${task.value.id} was modified during the claiming phase, skipping until the next claiming cycle.`
);
task.startedAt = now;
conflicts++;
} else if (isOk(task)) {
acc.push(task.value);
} else {
const { id, type, error } = task.error;
logger.error(`Error getting full task ${id}:${type} during claim: ${error.message}`);
bulkGetErrors++;
}
}
return acc;
}, []);

// separate update for removed tasks; shouldn't happen often, so unlikely
// a performance concern, and keeps the rest of the logic simpler
Expand Down
17 changes: 12 additions & 5 deletions x-pack/plugins/task_manager/server/task_store.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1020,7 +1020,10 @@ describe('TaskStore', () => {
refresh: false,
});

expect(result).toEqual([asOk(task)]);
expect(result).toEqual([
// New version returned after update
asOk({ ...task, version: 'Wzg0LDFd' }),
]);
});

test(`should perform partial update with minimal fields`, async () => {
Expand Down Expand Up @@ -1062,7 +1065,8 @@ describe('TaskStore', () => {
refresh: false,
});

expect(result).toEqual([asOk(task)]);
// New version returned after update
expect(result).toEqual([asOk({ ...task, version: 'Wzg0LDFd' })]);
});

test(`should perform partial update with no version`, async () => {
Expand Down Expand Up @@ -1100,7 +1104,8 @@ describe('TaskStore', () => {
refresh: false,
});

expect(result).toEqual([asOk(task)]);
// New version returned after update
expect(result).toEqual([asOk({ ...task, version: 'Wzg0LDFd' })]);
});

test(`should gracefully handle errors within the response`, async () => {
Expand Down Expand Up @@ -1183,7 +1188,8 @@ describe('TaskStore', () => {
});

expect(result).toEqual([
asOk(task1),
// New version returned after update
asOk({ ...task1, version: 'Wzg0LDFd' }),
asErr({
type: 'task',
id: '45343254',
Expand Down Expand Up @@ -1267,7 +1273,8 @@ describe('TaskStore', () => {
});

expect(result).toEqual([
asOk(task1),
// New version returned after update
asOk({ ...task1, version: 'Wzg0LDFd' }),
asErr({
type: 'task',
id: 'unknown',
Expand Down
3 changes: 2 additions & 1 deletion x-pack/plugins/task_manager/server/task_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import {
ElasticsearchClient,
} from '@kbn/core/server';

import { decodeRequestVersion } from '@kbn/core-saved-objects-base-server-internal';
import { decodeRequestVersion, encodeVersion } from '@kbn/core-saved-objects-base-server-internal';
import { RequestTimeoutsConfig } from './config';
import { asOk, asErr, Result } from './lib/result_type';

Expand Down Expand Up @@ -427,6 +427,7 @@ export class TaskStore {
return asOk({
...doc,
id: docId,
version: encodeVersion(item.update._seq_no, item.update._primary_term),
});
});
}
Expand Down

0 comments on commit 4e8b6ec

Please sign in to comment.