Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Task Manager] Add caching to the task partitioning logic #189562

Merged
merged 10 commits into from
Aug 5, 2024
36 changes: 30 additions & 6 deletions x-pack/plugins/task_manager/server/lib/task_partitioner.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import {
createDiscoveryServiceMock,
createFindSO,
} from '../kibana_discovery_service/mock_kibana_discovery_service';
import { TaskPartitioner } from './task_partitioner';
import { CACHE_INTERVAL, TaskPartitioner } from './task_partitioner';

const POD_NAME = 'test-pod';

Expand Down Expand Up @@ -47,11 +47,20 @@ describe('getPodName()', () => {
describe('getPartitions()', () => {
const lastSeen = '2024-08-10T10:00:00.000Z';
const discoveryServiceMock = createDiscoveryServiceMock(POD_NAME);
discoveryServiceMock.getActiveKibanaNodes.mockResolvedValue([
createFindSO(POD_NAME, lastSeen),
createFindSO('test-pod-2', lastSeen),
createFindSO('test-pod-3', lastSeen),
]);

// Before every test: install Jest fake timers and make the discovery mock
// report three active Kibana nodes (this pod plus two others), all sharing
// the same lastSeen timestamp.
beforeEach(() => {
  jest.useFakeTimers();
  const activePodNames = [POD_NAME, 'test-pod-2', 'test-pod-3'];
  const activeNodes = activePodNames.map((podName) => createFindSO(podName, lastSeen));
  discoveryServiceMock.getActiveKibanaNodes.mockResolvedValue(activeNodes);
});

// After every test: clear recorded mock calls and any pending fake timers so
// call-count assertions in one test are not affected by another.
afterEach(() => {
  jest.clearAllMocks();
  jest.clearAllTimers();
});

test('correctly gets the partitons for this pod', async () => {
const taskPartitioner = new TaskPartitioner(POD_NAME, discoveryServiceMock);
Expand All @@ -67,4 +76,19 @@ describe('getPartitions()', () => {
247, 249, 250, 252, 253, 255,
]);
});

test('correctly caches the partitions on 10 second interval ', async () => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a test for handling errors from the discovery service?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, resolved in this commit e39827c

const taskPartitioner = new TaskPartitioner(POD_NAME, discoveryServiceMock);
const shorterInterval = CACHE_INTERVAL / 2;

await taskPartitioner.getPartitions();

jest.advanceTimersByTime(shorterInterval);
await taskPartitioner.getPartitions();

jest.advanceTimersByTime(shorterInterval);
await taskPartitioner.getPartitions();

expect(discoveryServiceMock.getActiveKibanaNodes).toHaveBeenCalledTimes(2);
});
});
18 changes: 15 additions & 3 deletions x-pack/plugins/task_manager/server/lib/task_partitioner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,21 @@ function range(start: number, end: number) {
}

export const MAX_PARTITIONS = 256;
export const CACHE_INTERVAL = 10000;

export class TaskPartitioner {
private readonly allPartitions: number[];
private readonly podName: string;
private kibanaDiscoveryService: KibanaDiscoveryService;
private podPartitions: number[];
private podPartitionsLastUpdated: number;

/**
 * @param podName name of the pod whose partitions this instance computes
 * @param kibanaDiscoveryService discovery service stored on the instance
 */
constructor(podName: string, kibanaDiscoveryService: KibanaDiscoveryService) {
  this.podName = podName;
  this.kibanaDiscoveryService = kibanaDiscoveryService;
  this.allPartitions = range(0, MAX_PARTITIONS);

  // Start with an empty cache whose timestamp is already one full
  // CACHE_INTERVAL in the past, so the `now - lastUpdated >= CACHE_INTERVAL`
  // check in getPartitions() passes on the very first call.
  this.podPartitions = [];
  this.podPartitionsLastUpdated = Date.now() - CACHE_INTERVAL;
}

getAllPartitions(): number[] {
Expand All @@ -38,9 +43,16 @@ export class TaskPartitioner {
}

async getPartitions(): Promise<number[]> {
const allPodNames = await this.getAllPodNames();
const podPartitions = assignPodPartitions(this.podName, allPodNames, this.allPartitions);
return podPartitions;
const lastUpdated = new Date(this.podPartitionsLastUpdated).getTime();
const now = Date.now();

// refresh the cached pod partitions once at least CACHE_INTERVAL (10 seconds) has elapsed since the last update
if (now - lastUpdated >= CACHE_INTERVAL) {
const allPodNames = await this.getAllPodNames();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there's an error retrieving this, should we return the cached partitions?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah that sounds good, resolved in this commit e39827c

this.podPartitions = assignPodPartitions(this.podName, allPodNames, this.allPartitions);
this.podPartitionsLastUpdated = now;
}
return this.podPartitions;
}

private async getAllPodNames(): Promise<string[]> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,6 @@ import {
createFindSO,
} from '../kibana_discovery_service/mock_kibana_discovery_service';

jest.mock('../lib/assign_pod_partitions', () => ({
assignPodPartitions: jest.fn().mockReturnValue([1, 3]),
}));

jest.mock('../constants', () => ({
CONCURRENCY_ALLOW_LIST_BY_TASK_TYPE: [
'limitedToZero',
Expand Down Expand Up @@ -107,6 +103,7 @@ describe('TaskClaiming', () => {
.spyOn(apm, 'startTransaction')
// eslint-disable-next-line @typescript-eslint/no-explicit-any
.mockImplementation(() => mockApmTrans as any);
jest.spyOn(taskPartitioner, 'getPartitions').mockResolvedValue([1, 3]);
});

describe('claimAvailableTasks', () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import { ConcreteTaskInstance } from '@kbn/task-manager-plugin/server';
import { taskMappings as TaskManagerMapping } from '@kbn/task-manager-plugin/server/saved_objects/mappings';
import { asyncForEach } from '@kbn/std';
import { setTimeout as setTimeoutAsync } from 'timers/promises';
import { FtrProviderContext } from '../../ftr_provider_context';

const { properties: taskManagerIndexMapping } = TaskManagerMapping;
Expand Down Expand Up @@ -154,13 +155,17 @@ export default function ({ getService }: FtrProviderContext) {
});
});

it('should tasks with partitions assigned to this kibana node', async () => {
it('should run tasks with partitions assigned to this kibana node', async () => {
const partitions: Record<string, number> = {
'0': 127,
'1': 147,
'2': 23,
};

// wait for the pod partitions cache to update before scheduling tasks
await updateKibanaNodes();
await setTimeoutAsync(10000);

const tasksToSchedule = [];
for (let i = 0; i < 3; i++) {
tasksToSchedule.push(
Expand Down