Fix task manager polling flow controls (elastic#153491)
Fixes elastic#151938

This PR rewrites the Task Manager poller so that it no longer runs
concurrently when timeouts occur. It also fixes the issue where polling
requests would pile up when polling is slow. To support this, I've also
made the following changes:
- Removed the observable monitor and the
`xpack.task_manager.max_poll_inactivity_cycles` setting.
- Made the task store `search` and `updateByQuery` functions perform no
retries. This prevents a request from retrying 5x whenever a timeout
occurs, which caused each call to take up to 2 1/2 minutes before Kibana
saw the error (now down to 30s each). Polling already manages retries in
these situations.
- Switched the task poller tests to use `sinon` for faking timers.
- Removed the `assertStillInSetup` checks on plugin setup. They felt like
a maintenance burden that wasn't worth fixing up for my code changes.
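
As a rough illustration of the retry arithmetic, here is a small
TypeScript sketch. It uses the formula quoted in the `task_store.ts`
comment (`requestTimeout * (1 + maxRetries)`). The `worstCaseTimeoutMs`
helper is hypothetical, and the value of 4 retries (5 attempts in total)
is an assumption, chosen only because it reproduces the 2 1/2 minutes
mentioned in the list.

```typescript
// Hypothetical helper: worst-case time before the caller sees a timeout
// error, using the formula requestTimeout * (1 + maxRetries).
function worstCaseTimeoutMs(requestTimeoutMs: number, maxRetries: number): number {
  if (requestTimeoutMs < 0 || maxRetries < 0) {
    throw new RangeError('requestTimeoutMs and maxRetries must be non-negative');
  }
  return requestTimeoutMs * (1 + maxRetries);
}

// The 30s request timeout mentioned in the description.
const REQUEST_TIMEOUT_MS = 30_000;

// Assumption: 4 retries (5 attempts in total) matches "up to 2 1/2 minutes".
const withRetries = worstCaseTimeoutMs(REQUEST_TIMEOUT_MS, 4);

// With retries disabled (maxRetries: 0), a single timeout is all that is paid.
const withoutRetries = worstCaseTimeoutMs(REQUEST_TIMEOUT_MS, 0);

console.log(`${withRetries / 60_000} min -> ${withoutRetries / 1_000} s`);
```

With retries disabled, a slow query costs at most one request timeout per
polling cycle, and the next cycle acts as the retry.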

The main code changes are within these files (to review thoroughly so
the polling cycle doesn't suddenly stop):
- x-pack/plugins/task_manager/server/polling/task_poller.ts
- x-pack/plugins/task_manager/server/polling_lifecycle.ts (easier to
review if you disregard whitespace `?w=1`)
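
For reviewers, the intended flow control can be pictured with the minimal
sketch below. It is not the code in `task_poller.ts`;
`createSequentialPoller`, `PollWork`, and `SequentialPoller` are
hypothetical names used only for illustration. The idea it shows is that
the next cycle is scheduled only after the current one has settled, so
cycles cannot overlap or pile up when one of them is slow.

```typescript
// Hypothetical types and names, for illustration only.
type PollWork = () => Promise<void>;

interface SequentialPoller {
  start(): void;
  stop(): void;
}

function createSequentialPoller(work: PollWork, pollIntervalMs: number): SequentialPoller {
  let running = false;
  let inFlight = false;
  let timer: ReturnType<typeof setTimeout> | undefined;

  const runCycle = async (): Promise<void> => {
    timer = undefined;
    inFlight = true;
    try {
      await work();
    } catch {
      // A failed or timed-out cycle is not retried here;
      // the next polling cycle acts as the retry.
    } finally {
      inFlight = false;
    }
    // Schedule the next cycle only once this one has settled, so a slow
    // cycle delays the next one instead of running alongside it.
    if (running) {
      timer = setTimeout(runCycle, pollIntervalMs);
    }
  };

  return {
    start() {
      if (running) return;
      running = true;
      // If an earlier cycle is still in flight, it reschedules itself.
      if (!inFlight) void runCycle();
    },
    stop() {
      running = false;
      if (timer !== undefined) {
        clearTimeout(timer);
        timer = undefined;
      }
    },
  };
}
```

A caller would pass its claim-and-run logic as `work` and call `start()`
once. Because the timer is only armed after `work` settles, the effective
period is the poll interval plus the duration of the cycle.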

## To verify
1. Tasks run normally (create a rule or something that goes through task
manager regularly).
2. When the update by query takes a while, the request is cancelled
after 30s or the time manually configured.
3. When the search for claimed tasks query takes a while, the request is
cancelled after 30s or the time manually configured.

**Tips:**
<details><summary>How to slow down search for claimed task
queries</summary>

```
diff --git a/x-pack/plugins/task_manager/server/queries/task_claiming.ts b/x-pack/plugins/task_manager/server/queries/task_claiming.ts
index 07042650a37..2caefd63672 100644
--- a/x-pack/plugins/task_manager/server/queries/task_claiming.ts
+++ b/x-pack/plugins/task_manager/server/queries/task_claiming.ts
@@ -247,7 +247,7 @@ export class TaskClaiming {
         taskTypes,
       });

-    const docs = tasksUpdated > 0 ? await this.sweepForClaimedTasks(taskTypes, size) : [];
+    const docs = await this.sweepForClaimedTasks(taskTypes, size);

     this.emitEvents(docs.map((doc) => asTaskClaimEvent(doc.id, asOk(doc))));

@@ -346,6 +346,13 @@ export class TaskClaiming {
       size,
       sort: SortByRunAtAndRetryAt,
       seq_no_primary_term: true,
+      aggs: {
+        delay: {
+          shard_delay: {
+            value: '40s',
+          },
+        },
+      },
     });

     return docs;
```
</details>

<details><summary>how to slow down update by queries</summary>
Not the cleanest way, but you'll see occasional request timeouts from the
`updateByQuery` calls. I had more luck creating rules that run every 1s.

```
diff --git a/x-pack/plugins/task_manager/server/task_store.ts b/x-pack/plugins/task_manager/server/task_store.ts
index a06ee7b918a..07aa81e5388 100644
--- a/x-pack/plugins/task_manager/server/task_store.ts
+++ b/x-pack/plugins/task_manager/server/task_store.ts
@@ -126,6 +126,7 @@ export class TaskStore {
       // Timeouts are retried and make requests timeout after (requestTimeout * (1 + maxRetries))
       // The poller doesn't need retry logic because it will try again at the next polling cycle
       maxRetries: 0,
+      requestTimeout: 900,
     });
   }

@@ -458,6 +459,7 @@ export class TaskStore {
           ignore_unavailable: true,
           refresh: true,
           conflicts: 'proceed',
+          requests_per_second: 1,
           body: {
             ...opts,
             max_docs,
```
</details>

---------

Co-authored-by: Kibana Machine <[email protected]>
mikecote and kibanamachine authored May 3, 2023
1 parent d96c5a6 commit cb2e28d
Showing 29 changed files with 459 additions and 829 deletions.
1 change: 0 additions & 1 deletion docs/api/task-manager/health.asciidoc
@@ -45,7 +45,6 @@ The API returns the following:
"timestamp": "2021-02-16T11:29:05.055Z",
"value": {
"request_capacity": 1000,
"max_poll_inactivity_cycles": 10,
"monitored_aggregated_stats_refresh_rate": 60000,
"monitored_stats_running_average_window": 50,
"monitored_task_execution_thresholds": {
@@ -98,7 +98,6 @@ The API returns the following:
"timestamp": "2021-02-16T11:29:05.055Z",
"value": {
"request_capacity": 1000,
"max_poll_inactivity_cycles": 10,
"monitored_aggregated_stats_refresh_rate": 60000,
"monitored_stats_running_average_window": 50,
"monitored_task_execution_thresholds": {
@@ -297,7 +296,6 @@ Evaluating the health stats, you can see the following output under `stats.confi
--------------------------------------------------
{
"request_capacity": 1000,
"max_poll_inactivity_cycles": 10,
"monitored_aggregated_stats_refresh_rate": 60000,
"monitored_stats_running_average_window": 50,
"monitored_task_execution_thresholds": {
@@ -322,7 +320,6 @@ Now suppose the output under `stats.configuration.value` is the following:
--------------------------------------------------
{
"request_capacity": 1000,
"max_poll_inactivity_cycles": 10,
"monitored_aggregated_stats_refresh_rate": 60000,
"monitored_stats_running_average_window": 50,
"monitored_task_execution_thresholds": {
@@ -400,7 +400,6 @@ kibana_vars=(
xpack.securitySolution.prebuiltRulesPackageVersion
xpack.spaces.maxSpaces
xpack.task_manager.max_attempts
xpack.task_manager.max_poll_inactivity_cycles
xpack.task_manager.max_workers
xpack.task_manager.monitored_aggregated_stats_refresh_rate
xpack.task_manager.monitored_stats_required_freshness
1 change: 0 additions & 1 deletion x-pack/plugins/task_manager/README.md
@@ -44,7 +44,6 @@ The task_manager can be configured via `taskManager` config options (e.g. `taskM

- `max_attempts` - The maximum number of times a task will be attempted before being abandoned as failed
- `poll_interval` - How often the background worker should check the task_manager index for more work
- `max_poll_inactivity_cycles` - How many poll intervals is work allowed to block polling for before it's timed out. This does not include task execution, as task execution does not block the polling, but rather includes work needed to manage Task Manager's state.
- `index` - **deprecated** The name of the index that the task_manager will use. This is deprecated, and will be removed starting in 8.0
- `max_workers` - The maximum number of tasks a Kibana will run concurrently (defaults to 10)
- `version_conflict_threshold` - The threshold percentage for workers experiencing version conflicts for shifting the polling interval
3 changes: 0 additions & 3 deletions x-pack/plugins/task_manager/server/config.test.ts
@@ -21,7 +21,6 @@ describe('config validation', () => {
"warn_threshold": 5000,
},
"max_attempts": 3,
"max_poll_inactivity_cycles": 10,
"max_workers": 10,
"monitored_aggregated_stats_refresh_rate": 60000,
"monitored_stats_health_verbose_log": Object {
@@ -73,7 +72,6 @@ describe('config validation', () => {
"warn_threshold": 5000,
},
"max_attempts": 3,
"max_poll_inactivity_cycles": 10,
"max_workers": 10,
"monitored_aggregated_stats_refresh_rate": 60000,
"monitored_stats_health_verbose_log": Object {
@@ -123,7 +121,6 @@ describe('config validation', () => {
"warn_threshold": 5000,
},
"max_attempts": 3,
"max_poll_inactivity_cycles": 10,
"max_workers": 10,
"monitored_aggregated_stats_refresh_rate": 60000,
"monitored_stats_health_verbose_log": Object {
6 changes: 0 additions & 6 deletions x-pack/plugins/task_manager/server/config.ts
@@ -10,7 +10,6 @@ import { schema, TypeOf } from '@kbn/config-schema';
export const MAX_WORKERS_LIMIT = 100;
export const DEFAULT_MAX_WORKERS = 10;
export const DEFAULT_POLL_INTERVAL = 3000;
export const DEFAULT_MAX_POLL_INACTIVITY_CYCLES = 10;
export const DEFAULT_VERSION_CONFLICT_THRESHOLD = 80;
export const DEFAULT_MAX_EPHEMERAL_REQUEST_CAPACITY = MAX_WORKERS_LIMIT;

@@ -64,11 +63,6 @@ export const configSchema = schema.object(
defaultValue: DEFAULT_POLL_INTERVAL,
min: 100,
}),
/* How many poll interval cycles can work take before it's timed out. */
max_poll_inactivity_cycles: schema.number({
defaultValue: DEFAULT_MAX_POLL_INACTIVITY_CYCLES,
min: 1,
}),
/* How many requests can Task Manager buffer before it rejects new requests. */
request_capacity: schema.number({
// a nice round contrived number, feel free to change as we learn how it behaves
@@ -49,7 +49,6 @@ describe('EphemeralTaskLifecycle', () => {
max_attempts: 9,
poll_interval: 6000000,
version_conflict_threshold: 80,
max_poll_inactivity_cycles: 10,
request_capacity: 1000,
monitored_aggregated_stats_refresh_rate: 5000,
monitored_stats_required_freshness: 5000,
@@ -6,6 +6,7 @@
*/

import sinon from 'sinon';
import { Client } from '@elastic/elasticsearch';
import { elasticsearchServiceMock, savedObjectsRepositoryMock } from '@kbn/core/server/mocks';
import { SavedObjectsErrorHelpers, Logger } from '@kbn/core/server';
import { ADJUST_THROUGHPUT_INTERVAL } from '../lib/create_managed_configuration';
@@ -43,7 +44,6 @@ describe('managed configuration', () => {
max_attempts: 9,
poll_interval: 3000,
version_conflict_threshold: 80,
max_poll_inactivity_cycles: 10,
monitored_aggregated_stats_refresh_rate: 60000,
monitored_stats_health_verbose_log: {
enabled: false,
@@ -87,6 +87,9 @@ describe('managed configuration', () => {

const coreStart = coreMock.createStart();
coreStart.elasticsearch = esStart;
esStart.client.asInternalUser.child.mockReturnValue(
esStart.client.asInternalUser as unknown as Client
);
coreStart.savedObjects.createInternalRepository.mockReturnValue(savedObjectsClient);
taskManagerStart = await taskManager.start(coreStart);

@@ -146,7 +149,8 @@ describe('managed configuration', () => {
});

test('should lower max workers when Elasticsearch returns "cannot execute [inline] scripts" error', async () => {
esStart.client.asInternalUser.search.mockImplementationOnce(async () => {
const childEsClient = esStart.client.asInternalUser.child({}) as jest.Mocked<Client>;
childEsClient.search.mockImplementationOnce(async () => {
throw inlineScriptError;
});

@@ -165,7 +169,8 @@ describe('managed configuration', () => {
});

test('should increase poll interval when Elasticsearch returns "cannot execute [inline] scripts" error', async () => {
esStart.client.asInternalUser.search.mockImplementationOnce(async () => {
const childEsClient = esStart.client.asInternalUser.child({}) as jest.Mocked<Client>;
childEsClient.search.mockImplementationOnce(async () => {
throw inlineScriptError;
});

@@ -422,7 +422,6 @@ function getMockMonitoredHealth(overrides = {}): MonitoredHealth {
value: {
max_workers: 10,
poll_interval: 3000,
max_poll_inactivity_cycles: 10,
request_capacity: 1000,
monitored_aggregated_stats_refresh_rate: 5000,
monitored_stats_running_average_window: 50,
@@ -942,7 +942,6 @@ function mockStats(
value: {
max_workers: 0,
poll_interval: 0,
max_poll_inactivity_cycles: 10,
request_capacity: 1000,
monitored_aggregated_stats_refresh_rate: 5000,
monitored_stats_running_average_window: 50,
@@ -18,7 +18,6 @@ describe('Configuration Statistics Aggregator', () => {
poll_interval: 6000000,
version_conflict_threshold: 80,
monitored_stats_required_freshness: 6000000,
max_poll_inactivity_cycles: 10,
request_capacity: 1000,
monitored_aggregated_stats_refresh_rate: 5000,
monitored_stats_health_verbose_log: {
@@ -61,7 +60,6 @@ describe('Configuration Statistics Aggregator', () => {
expect(initial.value).toEqual({
max_workers: 10,
poll_interval: 6000000,
max_poll_inactivity_cycles: 10,
request_capacity: 1000,
monitored_aggregated_stats_refresh_rate: 5000,
monitored_stats_running_average_window: 50,
@@ -76,7 +74,6 @@ describe('Configuration Statistics Aggregator', () => {
expect(updatedWorkers.value).toEqual({
max_workers: 8,
poll_interval: 6000000,
max_poll_inactivity_cycles: 10,
request_capacity: 1000,
monitored_aggregated_stats_refresh_rate: 5000,
monitored_stats_running_average_window: 50,
@@ -91,7 +88,6 @@ describe('Configuration Statistics Aggregator', () => {
expect(updatedInterval.value).toEqual({
max_workers: 8,
poll_interval: 3000,
max_poll_inactivity_cycles: 10,
request_capacity: 1000,
monitored_aggregated_stats_refresh_rate: 5000,
monitored_stats_running_average_window: 50,
@@ -14,7 +14,6 @@ import { ManagedConfiguration } from '../lib/create_managed_configuration';

const CONFIG_FIELDS_TO_EXPOSE = [
'request_capacity',
'max_poll_inactivity_cycles',
'monitored_aggregated_stats_refresh_rate',
'monitored_stats_running_average_window',
'monitored_task_execution_thresholds',
@@ -22,7 +22,6 @@ describe('createMonitoringStatsStream', () => {
poll_interval: 6000000,
version_conflict_threshold: 80,
monitored_stats_required_freshness: 6000000,
max_poll_inactivity_cycles: 10,
request_capacity: 1000,
monitored_aggregated_stats_refresh_rate: 5000,
monitored_stats_health_verbose_log: {
@@ -77,7 +76,6 @@ describe('createMonitoringStatsStream', () => {
value: {
max_workers: 10,
poll_interval: 6000000,
max_poll_inactivity_cycles: 10,
request_capacity: 1000,
monitored_aggregated_stats_refresh_rate: 5000,
monitored_stats_running_average_window: 50,
@@ -111,7 +109,6 @@ describe('createMonitoringStatsStream', () => {
value: {
max_workers: 10,
poll_interval: 6000000,
max_poll_inactivity_cycles: 10,
request_capacity: 1000,
monitored_aggregated_stats_refresh_rate: 5000,
monitored_stats_running_average_window: 50,
@@ -145,7 +142,6 @@ describe('createMonitoringStatsStream', () => {
value: {
max_workers: 10,
poll_interval: 6000000,
max_poll_inactivity_cycles: 10,
request_capacity: 1000,
monitored_aggregated_stats_refresh_rate: 5000,
monitored_stats_running_average_window: 50,
45 changes: 0 additions & 45 deletions x-pack/plugins/task_manager/server/plugin.test.ts
@@ -42,7 +42,6 @@ const pluginInitializerContextParams = {
max_attempts: 9,
poll_interval: 3000,
version_conflict_threshold: 80,
max_poll_inactivity_cycles: 10,
request_capacity: 1000,
monitored_aggregated_stats_refresh_rate: 5000,
monitored_stats_health_verbose_log: {
@@ -97,50 +96,6 @@ describe('TaskManagerPlugin', () => {
);
});

test('throws if setup methods are called after start', async () => {
const pluginInitializerContext = coreMock.createPluginInitializerContext<TaskManagerConfig>(
pluginInitializerContextParams
);

const taskManagerPlugin = new TaskManagerPlugin(pluginInitializerContext);

const setupApi = await taskManagerPlugin.setup(coreMock.createSetup(), {
usageCollection: undefined,
});

// we only start a poller if we have task types that we support and we track
// phases (moving from Setup to Start) based on whether the poller is working
setupApi.registerTaskDefinitions({
setupTimeType: {
title: 'setupTimeType',
createTaskRunner: () => ({ async run() {} }),
},
});

await taskManagerPlugin.start(coreMock.createStart());

expect(() =>
setupApi.addMiddleware({
beforeSave: async (saveOpts) => saveOpts,
beforeRun: async (runOpts) => runOpts,
beforeMarkRunning: async (runOpts) => runOpts,
})
).toThrowErrorMatchingInlineSnapshot(
`"Cannot add Middleware after the task manager has started"`
);

expect(() =>
setupApi.registerTaskDefinitions({
lateRegisteredType: {
title: 'lateRegisteredType',
createTaskRunner: () => ({ async run() {} }),
},
})
).toThrowErrorMatchingInlineSnapshot(
`"Cannot register task definitions after the task manager has started"`
);
});

test('it logs a warning when the unsafe `exclude_task_types` config is used', async () => {
const pluginInitializerContext = coreMock.createPluginInitializerContext<TaskManagerConfig>({
...pluginInitializerContextParams,
14 changes: 0 additions & 14 deletions x-pack/plugins/task_manager/server/plugin.ts
@@ -183,11 +183,9 @@ export class TaskManagerPlugin
return {
index: TASK_MANAGER_INDEX,
addMiddleware: (middleware: Middleware) => {
this.assertStillInSetup('add Middleware');
this.middleware = addMiddlewareToChain(this.middleware, middleware);
},
registerTaskDefinitions: (taskDefinition: TaskDefinitionRegistry) => {
this.assertStillInSetup('register task definitions');
this.definitions.registerTaskDefinitions(taskDefinition);
},
};
@@ -286,18 +284,6 @@ export class TaskManagerPlugin
getRegisteredTypes: () => this.definitions.getAllTypes(),
};
}

/**
* Ensures task manager hasn't started
*
* @param {string} the name of the operation being executed
* @returns void
*/
private assertStillInSetup(operation: string) {
if (this.taskPollingLifecycle?.isStarted) {
throw new Error(`Cannot ${operation} after the task manager has started`);
}
}
}

export function getElasticsearchAndSOAvailability(
2 changes: 0 additions & 2 deletions x-pack/plugins/task_manager/server/polling/index.ts
@@ -5,7 +5,5 @@
* 2.0.
*/

export { createObservableMonitor } from './observable_monitor';
export { createTaskPoller, PollingError, PollingErrorType } from './task_poller';
export { timeoutPromiseAfter } from './timeout_promise_after';
export { delayOnClaimConflicts } from './delay_on_claim_conflicts';