Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Task Manager] Add caching to the task partitioning logic #189562

Merged
merged 10 commits into from
Aug 5, 2024
71 changes: 54 additions & 17 deletions x-pack/plugins/task_manager/server/lib/task_partitioner.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import {
createDiscoveryServiceMock,
createFindSO,
} from '../kibana_discovery_service/mock_kibana_discovery_service';
import { TaskPartitioner } from './task_partitioner';
import { CACHE_INTERVAL, TaskPartitioner } from './task_partitioner';

const POD_NAME = 'test-pod';

Expand Down Expand Up @@ -47,24 +47,61 @@ describe('getPodName()', () => {
describe('getPartitions()', () => {
// Timestamp stamped on every mocked discovery saved object in this suite.
const lastSeen = '2024-08-10T10:00:00.000Z';
// Discovery service stub; beforeEach() (re)configures what
// getActiveKibanaNodes() resolves to, so no describe-time setup is needed here.
const discoveryServiceMock = createDiscoveryServiceMock(POD_NAME);
// Partitions the tests expect for POD_NAME when the three mocked nodes are
// active: every value in 0..255 except those where `partition % 3 === 2`.
const expectedPartitions = [
  0, 1, 3, 4, 6, 7, 9, 10, 12, 13, 15, 16, 18, 19, 21, 22, 24, 25, 27, 28, 30, 31, 33, 34, 36, 37,
  39, 40, 42, 43, 45, 46, 48, 49, 51, 52, 54, 55, 57, 58, 60, 61, 63, 64, 66, 67, 69, 70, 72, 73,
  75, 76, 78, 79, 81, 82, 84, 85, 87, 88, 90, 91, 93, 94, 96, 97, 99, 100, 102, 103, 105, 106,
  108, 109, 111, 112, 114, 115, 117, 118, 120, 121, 123, 124, 126, 127, 129, 130, 132, 133, 135,
  136, 138, 139, 141, 142, 144, 145, 147, 148, 150, 151, 153, 154, 156, 157, 159, 160, 162, 163,
  165, 166, 168, 169, 171, 172, 174, 175, 177, 178, 180, 181, 183, 184, 186, 187, 189, 190, 192,
  193, 195, 196, 198, 199, 201, 202, 204, 205, 207, 208, 210, 211, 213, 214, 216, 217, 219, 220,
  222, 223, 225, 226, 228, 229, 231, 232, 234, 235, 237, 238, 240, 241, 243, 244, 246, 247, 249,
  250, 252, 253, 255,
];

beforeEach(() => {
  // Each test starts with mocked timers and the same three active nodes.
  jest.useFakeTimers();
  const activeNodes = [POD_NAME, 'test-pod-2', 'test-pod-3'].map((podName) =>
    createFindSO(podName, lastSeen)
  );
  discoveryServiceMock.getActiveKibanaNodes.mockResolvedValue(activeNodes);
});

afterEach(() => {
  // Reset recorded mock calls so per-test call counts start from zero.
  jest.clearAllMocks();
  // Drop any timers still pending on the fake clock.
  jest.clearAllTimers();
  // Undo the jest.useFakeTimers() from beforeEach so the fake clock does not
  // leak into tests outside this describe block.
  jest.useRealTimers();
});

// The first call on a fresh partitioner must compute this pod's partitions;
// the expectation uses the shared fixture instead of a second inline copy.
test('correctly gets the partitions for this pod', async () => {
  const taskPartitioner = new TaskPartitioner(POD_NAME, discoveryServiceMock);
  expect(await taskPartitioner.getPartitions()).toEqual(expectedPartitions);
});

// Verifies which calls reach the discovery service: the call count is checked
// after every step so a refresh on the wrong call cannot go unnoticed.
test('correctly caches the partitions on 10 second interval', async () => {
  const taskPartitioner = new TaskPartitioner(POD_NAME, discoveryServiceMock);
  const shorterInterval = CACHE_INTERVAL / 2;

  // The constructor back-dates the cache by CACHE_INTERVAL, so the first call refreshes.
  await taskPartitioner.getPartitions();
  expect(discoveryServiceMock.getActiveKibanaNodes).toHaveBeenCalledTimes(1);

  // Half an interval later the cached value must be served without a lookup.
  jest.advanceTimersByTime(shorterInterval);
  await taskPartitioner.getPartitions();
  expect(discoveryServiceMock.getActiveKibanaNodes).toHaveBeenCalledTimes(1);

  // A full interval after the first refresh the cache is refreshed again.
  jest.advanceTimersByTime(shorterInterval);
  await taskPartitioner.getPartitions();
  expect(discoveryServiceMock.getActiveKibanaNodes).toHaveBeenCalledTimes(2);
});

// A failed refresh must not throw and must leave the previously cached
// partitions in place.
test('correctly catches the error from the discovery service and returns the cached value', async () => {
  const taskPartitioner = new TaskPartitioner(POD_NAME, discoveryServiceMock);

  // Prime the cache with a successful lookup.
  await taskPartitioner.getPartitions();
  expect(taskPartitioner.getPodPartitions()).toEqual(expectedPartitions);

  // Reject with a real Error (not a bare array) once the cache has expired.
  discoveryServiceMock.getActiveKibanaNodes.mockRejectedValueOnce(
    new Error('discovery service unavailable')
  );
  jest.advanceTimersByTime(CACHE_INTERVAL);

  // The call resolves with the cached value instead of rejecting...
  expect(await taskPartitioner.getPartitions()).toEqual(expectedPartitions);
  // ...the failing refresh was actually attempted (so the error path ran)...
  expect(discoveryServiceMock.getActiveKibanaNodes).toHaveBeenCalledTimes(2);
  // ...and the cache itself is unchanged.
  expect(taskPartitioner.getPodPartitions()).toEqual(expectedPartitions);
});
});
27 changes: 24 additions & 3 deletions x-pack/plugins/task_manager/server/lib/task_partitioner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,21 @@ function range(start: number, end: number) {
}

/**
 * Upper bound handed to `range(0, MAX_PARTITIONS)` when a TaskPartitioner
 * builds its list of all partitions.
 * NOTE(review): `range` is not visible here; the test fixture tops out at 255,
 * which suggests the bound is exclusive — confirm.
 */
export const MAX_PARTITIONS = 256;
/**
 * Minimum age, in milliseconds, the cached pod partitions must reach before
 * `getPartitions()` recomputes them (10 seconds).
 */
export const CACHE_INTERVAL = 10000;

/**
 * Works out which partitions belong to this pod and caches the result,
 * recomputing it once the cached value is at least CACHE_INTERVAL old.
 */
export class TaskPartitioner {
// Full partition list, built once in the constructor from range(0, MAX_PARTITIONS).
private readonly allPartitions: number[];
// Name of this pod; passed to assignPodPartitions to select its share.
private readonly podName: string;
// Discovery service supplied by the caller.
// NOTE(review): presumably queried by getAllPodNames() — its body is not visible here.
private kibanaDiscoveryService: KibanaDiscoveryService;
// Most recently computed partitions for this pod (empty until the first successful refresh).
private podPartitions: number[];
// Date.now() value (epoch milliseconds) recorded for the last successful refresh.
private podPartitionsLastUpdated: number;

/**
 * @param podName name of the pod this partitioner computes partitions for
 * @param kibanaDiscoveryService discovery service stored for later lookups
 */
constructor(podName: string, kibanaDiscoveryService: KibanaDiscoveryService) {
  this.podName = podName;
  this.kibanaDiscoveryService = kibanaDiscoveryService;
  this.allPartitions = range(0, MAX_PARTITIONS);

  // Start with an empty cache whose timestamp is already one full interval
  // old, so the very first getPartitions() call performs a refresh.
  const alreadyExpired = Date.now() - CACHE_INTERVAL;
  this.podPartitions = [];
  this.podPartitionsLastUpdated = alreadyExpired;
}

getAllPartitions(): number[] {
Expand All @@ -37,10 +42,26 @@ export class TaskPartitioner {
return this.podName;
}

/**
 * Returns the currently cached partitions for this pod without triggering a
 * refresh. The internal array itself is returned, not a copy.
 */
getPodPartitions(): number[] {
  const { podPartitions } = this;
  return podPartitions;
}

/**
 * Returns the partitions assigned to this pod.
 *
 * The assignment is cached: it is recomputed from the current pod names only
 * when the cached value is at least CACHE_INTERVAL milliseconds old. If the
 * refresh throws, the error is swallowed and the previously cached value is
 * returned; the timestamp is left untouched, so the next call retries.
 *
 * @returns the (possibly cached) partitions for this pod
 */
async getPartitions(): Promise<number[]> {
  const now = Date.now();

  // update the pod partitions cache after 10 seconds
  if (now - this.podPartitionsLastUpdated >= CACHE_INTERVAL) {
    try {
      const allPodNames = await this.getAllPodNames();
      this.podPartitions = assignPodPartitions(this.podName, allPodNames, this.allPartitions);
      // Only a successful refresh moves the timestamp forward.
      this.podPartitionsLastUpdated = now;
    } catch (error) {
      // Deliberate best effort: keep serving the cached value when the
      // lookup fails instead of propagating the error to the caller.
    }
  }
  return this.podPartitions;
}

private async getAllPodNames(): Promise<string[]> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,6 @@ import {
createFindSO,
} from '../kibana_discovery_service/mock_kibana_discovery_service';

jest.mock('../lib/assign_pod_partitions', () => ({
assignPodPartitions: jest.fn().mockReturnValue([1, 3]),
}));

jest.mock('../constants', () => ({
CONCURRENCY_ALLOW_LIST_BY_TASK_TYPE: [
'limitedToZero',
Expand Down Expand Up @@ -107,6 +103,7 @@ describe('TaskClaiming', () => {
.spyOn(apm, 'startTransaction')
// eslint-disable-next-line @typescript-eslint/no-explicit-any
.mockImplementation(() => mockApmTrans as any);
jest.spyOn(taskPartitioner, 'getPartitions').mockResolvedValue([1, 3]);
});

describe('claimAvailableTasks', () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import { ConcreteTaskInstance } from '@kbn/task-manager-plugin/server';
import { taskMappings as TaskManagerMapping } from '@kbn/task-manager-plugin/server/saved_objects/mappings';
import { asyncForEach } from '@kbn/std';
import { setTimeout as setTimeoutAsync } from 'timers/promises';
import { FtrProviderContext } from '../../ftr_provider_context';

const { properties: taskManagerIndexMapping } = TaskManagerMapping;
Expand Down Expand Up @@ -154,13 +155,17 @@ export default function ({ getService }: FtrProviderContext) {
});
});

it('should tasks with partitions assigned to this kibana node', async () => {
it('should run tasks with partitions assigned to this kibana node', async () => {
const partitions: Record<string, number> = {
'0': 127,
'1': 147,
'2': 23,
};

// wait for the pod partitions cache to update before scheduling tasks
await updateKibanaNodes();
await setTimeoutAsync(10000);

const tasksToSchedule = [];
for (let i = 0; i < 3; i++) {
tasksToSchedule.push(
Expand Down