# [Response Ops][Task Manager] Resource based task scheduling - 2nd attempt (#189626)

## Summary

Redoing the resource-based task claim PR #187999 and follow-up PRs #189220 and #189117. Please see the descriptions of those PRs for more details.
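
For context, the core idea of resource-based claiming: each task type declares a cost, and each Kibana node claims tasks against a capacity budget rather than a fixed worker count. A rough sketch of the accounting in TypeScript (the names and the simple greedy loop are my own, not the actual task manager implementation; `ExtraLarge = 10` matches the `cost: 10` asserted in the test below, the other values are my assumption):

```
// Rough sketch of cost-based claiming, not the actual implementation.
enum TaskCost {
  Tiny = 1,
  Normal = 2,
  ExtraLarge = 10,
}

interface ClaimableTask {
  id: string;
  cost: TaskCost;
}

// Claim tasks until the next task no longer fits in the remaining
// cost budget, instead of stopping at a fixed worker count.
function claimUpToCapacity(tasks: ClaimableTask[], capacityUnits: number): ClaimableTask[] {
  const claimed: ClaimableTask[] = [];
  let used = 0;
  for (const task of tasks) {
    if (used + task.cost > capacityUnits) break;
    used += task.cost;
    claimed.push(task);
  }
  return claimed;
}
```

This is why the config changes below replace the fixed `max_workers` setting with a `capacity` setting.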

This was originally reverted because unregistered task types in serverless
caused the task manager health aggregation to fail. This PR includes an
additional commit to exclude unregistered task types from the health
report:
58eb2b1.
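
The shape of that fix, sketched with hypothetical names (see commit 58eb2b1 for the real change): when building the workload stats, filter the per-type aggregation results down to task types that are actually registered, instead of letting the lookup throw.

```
// Hypothetical sketch of the health-report fix; function and field
// names are illustrative, not the actual task manager code.
function excludeUnregistered(
  countsByType: Record<string, number>,
  registeredTypes: Set<string>
): Record<string, number> {
  return Object.fromEntries(
    Object.entries(countsByType).filter(([taskType]) => registeredTypes.has(taskType))
  );
}
```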

To verify this, make sure you're using the `default` claim strategy and
start up Kibana so that the default set of tasks gets created. Then
either disable a bunch of plugins via config:

```
# remove security and o11y
enterpriseSearch.enabled: false
xpack.apm.enabled: false
xpack.cloudSecurityPosture.enabled: false
xpack.fleet.enabled: false
xpack.infra.enabled: false
xpack.observability.enabled: false
xpack.observabilityAIAssistant.enabled: false
xpack.observabilityLogsExplorer.enabled: false
xpack.search.notebooks.enabled: false
xpack.securitySolution.enabled: false
xpack.uptime.enabled: false
```

or comment out the registration of a task type that was previously
scheduled (I'm using the observability AI assistant):

```
--- a/x-pack/plugins/observability_solution/observability_ai_assistant/server/service/index.ts
+++ b/x-pack/plugins/observability_solution/observability_ai_assistant/server/service/index.ts
@@ -89,24 +89,24 @@ export class ObservabilityAIAssistantService {

     this.allowInit();

-    taskManager.registerTaskDefinitions({
-      [INDEX_QUEUED_DOCUMENTS_TASK_TYPE]: {
-        title: 'Index queued KB articles',
-        description:
-          'Indexes previously registered entries into the knowledge base when it is ready',
-        timeout: '30m',
-        maxAttempts: 2,
-        createTaskRunner: (context) => {
-          return {
-            run: async () => {
-              if (this.kbService) {
-                await this.kbService.processQueue();
-              }
-            },
-          };
-        },
-      },
-    });
+    // taskManager.registerTaskDefinitions({
+    //   [INDEX_QUEUED_DOCUMENTS_TASK_TYPE]: {
+    //     title: 'Index queued KB articles',
+    //     description:
+    //       'Indexes previously registered entries into the knowledge base when it is ready',
+    //     timeout: '30m',
+    //     maxAttempts: 2,
+    //     createTaskRunner: (context) => {
+    //       return {
+    //         run: async () => {
+    //           if (this.kbService) {
+    //             await this.kbService.processQueue();
+    //           }
+    //         },
+    //       };
+    //     },
+    //   },
+    // });
   }
```

and restart Kibana. You should still be able to access the TM health
report with the workload field. If you update the background health
logging so that it always logs, and more frequently, you should see the
logging succeed with no errors.

Below, I've made changes to always log the background health at a
15-second interval:

```
--- a/x-pack/plugins/task_manager/server/plugin.ts
+++ b/x-pack/plugins/task_manager/server/plugin.ts
@@ -236,6 +236,7 @@ export class TaskManagerPlugin
     if (this.isNodeBackgroundTasksOnly()) {
       setupIntervalLogging(monitoredHealth$, this.logger, LogHealthForBackgroundTasksOnlyMinutes);
     }
+    setupIntervalLogging(monitoredHealth$, this.logger, LogHealthForBackgroundTasksOnlyMinutes);
```

and reduce the logging interval:

```
--- a/x-pack/plugins/task_manager/server/lib/log_health_metrics.ts
+++ b/x-pack/plugins/task_manager/server/lib/log_health_metrics.ts
@@ -35,7 +35,8 @@ export function setupIntervalLogging(
     monitoredHealth = m;
   });

-  setInterval(onInterval, 1000 * 60 * minutes);
+  // setInterval(onInterval, 1000 * 60 * minutes);
+  setInterval(onInterval, 1000 * 15);

   function onInterval() {
```
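
To see the workload section directly, you can also hit the health endpoint. A quick check from a dev script (the URL and credentials are assumptions for a local setup, and the exact field layout may vary by version):

```
// Manual check of the TM health report; assumes local Kibana with
// basic auth. The workload section should still be present even with
// unregistered task types in the index.
const res = await fetch('http://localhost:5601/api/task_manager/_health', {
  headers: { Authorization: 'Basic ' + Buffer.from('elastic:changeme').toString('base64') },
});
const health = await res.json();
console.log(health.status, Object.keys(health.stats.workload?.value ?? {}));
```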

---------

Co-authored-by: kibanamachine <[email protected]>
Co-authored-by: Elastic Machine <[email protected]>
3 people authored Aug 7, 2024
1 parent 0a01fdd commit e46e54a
Showing 70 changed files with 5,092 additions and 1,546 deletions.
33 changes: 33 additions & 0 deletions x-pack/plugins/alerting/server/rule_type_registry.test.ts
@@ -564,6 +564,39 @@ describe('Create Lifecycle', () => {
});
});

test('injects custom cost for certain rule types', () => {
  const ruleType: RuleType<never, never, never, never, never, 'default', 'recovered', {}> = {
    id: 'siem.indicatorRule',
    name: 'Test',
    actionGroups: [
      {
        id: 'default',
        name: 'Default',
      },
    ],
    defaultActionGroupId: 'default',
    minimumLicenseRequired: 'basic',
    isExportable: true,
    executor: jest.fn(),
    category: 'test',
    producer: 'alerts',
    ruleTaskTimeout: '20m',
    validate: {
      params: { validate: (params) => params },
    },
  };
  const registry = new RuleTypeRegistry(ruleTypeRegistryParams);
  registry.register(ruleType);
  expect(taskManager.registerTaskDefinitions).toHaveBeenCalledTimes(1);
  expect(taskManager.registerTaskDefinitions.mock.calls[0][0]).toMatchObject({
    'alerting:siem.indicatorRule': {
      timeout: '20m',
      title: 'Test',
      cost: 10,
    },
  });
});

test('shallow clones the given rule type', () => {
const ruleType: RuleType<never, never, never, never, never, 'default', 'recovered', {}> = {
id: 'test',
Expand Down
7 changes: 7 additions & 0 deletions x-pack/plugins/alerting/server/rule_type_registry.ts
@@ -14,6 +14,7 @@ import { Logger } from '@kbn/core/server';
import { LicensingPluginSetup } from '@kbn/licensing-plugin/server';
import { RunContext, TaskManagerSetupContract } from '@kbn/task-manager-plugin/server';
import { stateSchemaByVersion } from '@kbn/alerting-state-types';
import { TaskCost } from '@kbn/task-manager-plugin/server/task';
import { TaskRunnerFactory } from './task_runner';
import {
  RuleType,
@@ -40,6 +41,9 @@ import { AlertsService } from './alerts_service/alerts_service';
import { getRuleTypeIdValidLegacyConsumers } from './rule_type_registry_deprecated_consumers';
import { AlertingConfig } from './config';

const RULE_TYPES_WITH_CUSTOM_COST: Record<string, TaskCost> = {
'siem.indicatorRule': TaskCost.ExtraLarge,
};
export interface ConstructorOptions {
config: AlertingConfig;
logger: Logger;
@@ -289,6 +293,8 @@ export class RuleTypeRegistry {
      normalizedRuleType as unknown as UntypedNormalizedRuleType
    );

    const taskCost: TaskCost | undefined = RULE_TYPES_WITH_CUSTOM_COST[ruleType.id];

    this.taskManager.registerTaskDefinitions({
      [`alerting:${ruleType.id}`]: {
        title: ruleType.name,
@@ -310,6 +316,7 @@ export class RuleTypeRegistry {
          spaceId: schema.string(),
          consumer: schema.maybe(schema.string()),
        }),
        ...(taskCost ? { cost: taskCost } : {}),
      },
    });

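For reference, here is roughly what a registration with the new optional `cost` field looks like. This is an illustrative sketch, assuming a `TaskManagerSetupContract` in scope as `taskManager`; the title and runner body are made up, and `TaskCost.ExtraLarge` corresponds to the `cost: 10` asserted in the test above:

```
// Illustrative registration with the new optional `cost` field; the
// definition body is a sketch, not copied from the alerting plugin.
import { TaskCost } from '@kbn/task-manager-plugin/server/task';

taskManager.registerTaskDefinitions({
  'alerting:siem.indicatorRule': {
    title: 'Indicator match rule',
    timeout: '20m',
    cost: TaskCost.ExtraLarge, // 10, vs. the default normal cost
    createTaskRunner: (context) => ({
      run: async () => {
        // rule execution would happen here
      },
    }),
  },
});
```
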
1 change: 1 addition & 0 deletions x-pack/plugins/task_manager/kibana.jsonc
@@ -11,6 +11,7 @@
"task_manager"
],
"optionalPlugins": [
"cloud",
"usageCollection"
]
}
6 changes: 3 additions & 3 deletions x-pack/plugins/task_manager/server/MONITORING.md
@@ -50,7 +50,7 @@ The root `timestamp` is the time in which the summary was exposed (either to the
Follow this step-by-step guide to make sense of the stats: https://www.elastic.co/guide/en/kibana/master/task-manager-troubleshooting.html#task-manager-diagnosing-root-cause

#### The Configuration Section
The `configuration` section summarizes Task Manager's current configuration, including dynamic configurations which change over time, such as `poll_interval` and `max_workers` which adjust in reaction to changing load on the system.
The `configuration` section summarizes Task Manager's current configuration, including dynamic configurations which change over time, such as `poll_interval` and `capacity` which adjust in reaction to changing load on the system.

These are "Hot" stats which are updated whenever a change happens in the configuration.

@@ -69,8 +69,8 @@ The `runtime` tracks Task Manager's performance as it runs, making note of task
These include:
- The time it takes a task to run (p50, p90, p95 & p99, using a configurable running average window, `50` by default)
- The average _drift_ that tasks experience (p50, p90, p95 & p99, using the same configurable running average window as above). Drift tells us how long after its scheduled time a task typically executes.
- The average _load_ (p50, p90, p95 & p99, using the same configurable running average window as above). Load tells us what percentage of workers is in use at the end of each polling cycle.
- The polling rate (the timestamp of the last time a polling cycle completed), the polling health stats (number of version clashes and mismatches) and the result [`No tasks | Filled task pool | Unexpectedly ran out of workers`] frequency the past 50 polling cycles (using the same window size as the one used for running averages)
- The average _load_ (p50, p90, p95 & p99, using the same configurable running average window as above). Load tells us what percentage of capacity is in use at the end of each polling cycle.
- The polling rate (the timestamp of the last time a polling cycle completed), the polling health stats (number of version clashes and mismatches) and the result [`No tasks | Filled task pool | Unexpectedly ran out of capacity`] frequency over the past 50 polling cycles (using the same window size as the one used for running averages)
- The `Success | Retry | Failure ratio` by task type. This is different from the workload stats, which tell you what's in the queue but can't keep track of retries or of non-recurring tasks, as they're wiped off the index when completed.

These are "Hot" stats which are updated reactively as Tasks are executed and interacted with.
3 changes: 0 additions & 3 deletions x-pack/plugins/task_manager/server/config.test.ts
@@ -23,7 +23,6 @@ describe('config validation', () => {
"warn_threshold": 5000,
},
"max_attempts": 3,
"max_workers": 10,
"metrics_reset_interval": 30000,
"monitored_aggregated_stats_refresh_rate": 60000,
"monitored_stats_health_verbose_log": Object {
@@ -81,7 +80,6 @@
"warn_threshold": 5000,
},
"max_attempts": 3,
"max_workers": 10,
"metrics_reset_interval": 30000,
"monitored_aggregated_stats_refresh_rate": 60000,
"monitored_stats_health_verbose_log": Object {
@@ -137,7 +135,6 @@
"warn_threshold": 5000,
},
"max_attempts": 3,
"max_workers": 10,
"metrics_reset_interval": 30000,
"monitored_aggregated_stats_refresh_rate": 60000,
"monitored_stats_health_verbose_log": Object {
16 changes: 11 additions & 5 deletions x-pack/plugins/task_manager/server/config.ts
@@ -8,6 +8,9 @@
import { schema, TypeOf } from '@kbn/config-schema';

export const MAX_WORKERS_LIMIT = 100;
export const DEFAULT_CAPACITY = 10;
export const MAX_CAPACITY = 50;
export const MIN_CAPACITY = 5;
export const DEFAULT_MAX_WORKERS = 10;
export const DEFAULT_POLL_INTERVAL = 3000;
export const DEFAULT_VERSION_CONFLICT_THRESHOLD = 80;
@@ -64,6 +67,8 @@ const requestTimeoutsConfig = schema.object({
export const configSchema = schema.object(
  {
    allow_reading_invalid_state: schema.boolean({ defaultValue: true }),
    /* The number of normal cost tasks that this Kibana instance will run simultaneously */
    capacity: schema.maybe(schema.number({ min: MIN_CAPACITY, max: MAX_CAPACITY })),
    ephemeral_tasks: schema.object({
      enabled: schema.boolean({ defaultValue: false }),
      /* How many requests can Task Manager buffer before it rejects new requests. */
@@ -81,11 +86,12 @@ export const configSchema = schema.object(
      min: 1,
    }),
    /* The maximum number of tasks that this Kibana instance will run simultaneously. */
    max_workers: schema.number({
      defaultValue: DEFAULT_MAX_WORKERS,
      // disable the task manager rather than trying to specify it with 0 workers
      min: 1,
    }),
    max_workers: schema.maybe(
      schema.number({
        // disable the task manager rather than trying to specify it with 0 workers
        min: 1,
      })
    ),
    /* The interval at which monotonically increasing metrics counters will reset */
    metrics_reset_interval: schema.number({
      defaultValue: DEFAULT_METRICS_RESET_INTERVAL,
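If you want to try the new setting locally, the replacement key named in the deprecation later in this diff is `xpack.task_manager.capacity`; e.g. in `kibana.yml` (the value is just an example within the `MIN_CAPACITY`/`MAX_CAPACITY` bounds above):

```
# replaces the deprecated xpack.task_manager.max_workers
xpack.task_manager.capacity: 20
```
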
29 changes: 14 additions & 15 deletions x-pack/plugins/task_manager/server/ephemeral_task_lifecycle.test.ts
@@ -18,7 +18,7 @@ import { v4 as uuidv4 } from 'uuid';
import { asTaskPollingCycleEvent, asTaskRunEvent, TaskPersistence } from './task_events';
import { TaskRunResult } from './task_running';
import { TaskPoolRunResult } from './task_pool';
import { TaskPoolMock } from './task_pool.mock';
import { TaskPoolMock } from './task_pool/task_pool.mock';
import { executionContextServiceMock } from '@kbn/core/server/mocks';
import { taskManagerMock } from './mocks';

@@ -45,7 +45,6 @@ describe('EphemeralTaskLifecycle', () => {
definitions: new TaskTypeDictionary(taskManagerLogger),
executionContext,
config: {
max_workers: 10,
max_attempts: 9,
poll_interval: 6000000,
version_conflict_threshold: 80,
@@ -156,7 +155,7 @@
expect(ephemeralTaskLifecycle.attemptToRun(task)).toMatchObject(asOk(task));

poolCapacity.mockReturnValue({
availableWorkers: 10,
availableCapacity: 10,
});

lifecycleEvent$.next(
@@ -179,7 +178,7 @@
expect(ephemeralTaskLifecycle.attemptToRun(task)).toMatchObject(asOk(task));

poolCapacity.mockReturnValue({
availableWorkers: 10,
availableCapacity: 10,
});

lifecycleEvent$.next(
@@ -216,7 +215,7 @@
expect(ephemeralTaskLifecycle.attemptToRun(tasks[2])).toMatchObject(asOk(tasks[2]));

poolCapacity.mockReturnValue({
availableWorkers: 2,
availableCapacity: 2,
});

lifecycleEvent$.next(
@@ -256,9 +255,9 @@

// pool has capacity for both
poolCapacity.mockReturnValue({
availableWorkers: 10,
availableCapacity: 10,
});
pool.getOccupiedWorkersByType.mockReturnValue(0);
pool.getUsedCapacityByType.mockReturnValue(0);

lifecycleEvent$.next(
asTaskPollingCycleEvent(asOk({ result: FillPoolResult.NoTasksClaimed }))
@@ -296,10 +295,10 @@

// pool has capacity in general
poolCapacity.mockReturnValue({
availableWorkers: 2,
availableCapacity: 2,
});
// but when we ask how many it has occupied by type - we always have one worker already occupied by that type
pool.getOccupiedWorkersByType.mockReturnValue(1);
pool.getUsedCapacityByType.mockReturnValue(1);

lifecycleEvent$.next(
asTaskPollingCycleEvent(asOk({ result: FillPoolResult.NoTasksClaimed }))
@@ -308,7 +307,7 @@
expect(pool.run).toHaveBeenCalledTimes(0);

// now we release the worker in the pool and cause another cycle in the ephemeral queue
pool.getOccupiedWorkersByType.mockReturnValue(0);
pool.getUsedCapacityByType.mockReturnValue(0);
lifecycleEvent$.next(
asTaskPollingCycleEvent(asOk({ result: FillPoolResult.NoTasksClaimed }))
);
@@ -356,9 +355,9 @@

// pool has capacity for all
poolCapacity.mockReturnValue({
availableWorkers: 10,
availableCapacity: 10,
});
pool.getOccupiedWorkersByType.mockReturnValue(0);
pool.getUsedCapacityByType.mockReturnValue(0);

lifecycleEvent$.next(asTaskPollingCycleEvent(asOk({ result: FillPoolResult.NoTasksClaimed })));

@@ -389,19 +388,19 @@

expect(ephemeralTaskLifecycle.queuedTasks).toBe(3);
poolCapacity.mockReturnValue({
availableWorkers: 1,
availableCapacity: 1,
});
lifecycleEvent$.next(asTaskPollingCycleEvent(asOk({ result: FillPoolResult.NoTasksClaimed })));
expect(ephemeralTaskLifecycle.queuedTasks).toBe(2);

poolCapacity.mockReturnValue({
availableWorkers: 1,
availableCapacity: 1,
});
lifecycleEvent$.next(asTaskPollingCycleEvent(asOk({ result: FillPoolResult.NoTasksClaimed })));
expect(ephemeralTaskLifecycle.queuedTasks).toBe(1);

poolCapacity.mockReturnValue({
availableWorkers: 1,
availableCapacity: 1,
});
lifecycleEvent$.next(asTaskPollingCycleEvent(asOk({ result: FillPoolResult.NoTasksClaimed })));
expect(ephemeralTaskLifecycle.queuedTasks).toBe(0);
@@ -143,13 +143,13 @@ export class EphemeralTaskLifecycle {
    taskType && this.definitions.get(taskType)?.maxConcurrency
      ? Math.max(
          Math.min(
            this.pool.availableWorkers,
            this.pool.availableCapacity(),
            this.definitions.get(taskType)!.maxConcurrency! -
              this.pool.getOccupiedWorkersByType(taskType)
              this.pool.getUsedCapacityByType(taskType)
          ),
          0
        )
      : this.pool.availableWorkers;
      : this.pool.availableCapacity();

  private emitEvent = (event: TaskLifecycleEvent) => {
    this.events$.next(event);
7 changes: 4 additions & 3 deletions x-pack/plugins/task_manager/server/index.ts
@@ -55,9 +55,6 @@ export type {

export const config: PluginConfigDescriptor<TaskManagerConfig> = {
  schema: configSchema,
  exposeToUsage: {
    max_workers: true,
  },
  deprecations: ({ deprecate }) => {
    return [
      deprecate('ephemeral_tasks.enabled', 'a future version', {
@@ -68,6 +65,10 @@ export const config: PluginConfigDescriptor<TaskManagerConfig> = {
        level: 'warning',
        message: `Configuring "xpack.task_manager.ephemeral_tasks.request_capacity" is deprecated and will be removed in a future version. Remove this setting to increase task execution resiliency.`,
      }),
      deprecate('max_workers', 'a future version', {
        level: 'warning',
        message: `Configuring "xpack.task_manager.max_workers" is deprecated and will be removed in a future version. Remove this setting and use "xpack.task_manager.capacity" instead.`,
      }),
      (settings, fromPath, addDeprecation) => {
        const taskManager = get(settings, fromPath);
        if (taskManager?.index) {
