Skip to content

Commit

Permalink
Set default to task runner's currentPollInterval
Browse files Browse the repository at this point in the history
  • Loading branch information
mikecote committed Sep 13, 2024
1 parent a396408 commit d30bbfc
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 12 deletions.
17 changes: 17 additions & 0 deletions x-pack/plugins/task_manager/server/config.mock.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { type TaskManagerConfig, configSchema } from './config';

const createConfigMock = (overwrites: Partial<TaskManagerConfig> = {}) => {
const mocked: TaskManagerConfig = configSchema.validate(overwrites);
return mocked;
};

export const configMock = {
create: createConfigMock,
};
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import {
import { schema } from '@kbn/config-schema';
import { CLAIM_STRATEGY_MGET, CLAIM_STRATEGY_UPDATE_BY_QUERY } from '../config';
import * as nextRunAtUtils from '../lib/get_next_run_at';
import { configMock } from '../config.mock';

const baseDelay = 5 * 60 * 1000;
const executionContext = executionContextServiceMock.createSetupContract();
Expand Down Expand Up @@ -2467,10 +2468,12 @@ describe('TaskManagerRunner', () => {
onTaskEvent: opts.onTaskEvent,
executionContext,
usageCounter,
eventLoopDelayConfig: {
monitor: true,
warn_threshold: 5000,
},
config: configMock.create({
event_loop_delay: {
monitor: true,
warn_threshold: 5000,
},
}),
allowReadingInvalidState: opts.allowReadingInvalidState || false,
strategy: opts.strategy ?? CLAIM_STRATEGY_UPDATE_BY_QUERY,
pollIntervalConfiguration$: new BehaviorSubject(500),
Expand Down
17 changes: 9 additions & 8 deletions x-pack/plugins/task_manager/server/task_running/task_runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ import {
} from '../task';
import { TaskTypeDictionary } from '../task_type_dictionary';
import { isUnrecoverableError } from './errors';
import { CLAIM_STRATEGY_MGET, type EventLoopDelayConfig } from '../config';
import { CLAIM_STRATEGY_MGET, type TaskManagerConfig } from '../config';
import { TaskValidator } from '../task_validator';
import { getRetryAt, getRetryDate, getTimeout } from '../lib/get_retry_at';
import { getNextRunAt } from '../lib/get_next_run_at';
Expand Down Expand Up @@ -109,7 +109,7 @@ type Opts = {
defaultMaxAttempts: number;
executionContext: ExecutionContextStart;
usageCounter?: UsageCounter;
eventLoopDelayConfig: EventLoopDelayConfig;
config: TaskManagerConfig;
allowReadingInvalidState: boolean;
strategy: string;
pollIntervalConfiguration$: Observable<number>;
Expand Down Expand Up @@ -162,10 +162,10 @@ export class TaskManagerRunner implements TaskRunner {
private uuid: string;
private readonly executionContext: ExecutionContextStart;
private usageCounter?: UsageCounter;
private eventLoopDelayConfig: EventLoopDelayConfig;
private config: TaskManagerConfig;
private readonly taskValidator: TaskValidator;
private readonly claimStrategy: string;
private currentPollInterval?: number;
private currentPollInterval: number;

/**
* Creates an instance of TaskManagerRunner.
Expand All @@ -188,7 +188,7 @@ export class TaskManagerRunner implements TaskRunner {
onTaskEvent = identity,
executionContext,
usageCounter,
eventLoopDelayConfig,
config,
allowReadingInvalidState,
strategy,
pollIntervalConfiguration$,
Expand All @@ -204,13 +204,14 @@ export class TaskManagerRunner implements TaskRunner {
this.executionContext = executionContext;
this.usageCounter = usageCounter;
this.uuid = uuidv4();
this.eventLoopDelayConfig = eventLoopDelayConfig;
this.config = config;
this.taskValidator = new TaskValidator({
logger: this.logger,
definitions: this.definitions,
allowReadingInvalidState,
});
this.claimStrategy = strategy;
this.currentPollInterval = config.poll_interval;
pollIntervalConfiguration$.subscribe((pollInterval) => {
this.currentPollInterval = pollInterval;
});
Expand Down Expand Up @@ -338,7 +339,7 @@ export class TaskManagerRunner implements TaskRunner {
const apmTrans = apm.startTransaction(this.taskType, TASK_MANAGER_RUN_TRANSACTION_TYPE, {
childOf: this.instance.task.traceparent,
});
const stopTaskTimer = startTaskTimerWithEventLoopMonitoring(this.eventLoopDelayConfig);
const stopTaskTimer = startTaskTimerWithEventLoopMonitoring(this.config.event_loop_delay);

// Validate state
const stateValidationResult = this.validateTaskState(this.instance.task);
Expand Down Expand Up @@ -801,7 +802,7 @@ export class TaskManagerRunner implements TaskRunner {

const { eventLoopBlockMs = 0 } = taskTiming;
const taskLabel = `${this.taskType} ${this.instance.task.id}`;
if (eventLoopBlockMs > this.eventLoopDelayConfig.warn_threshold) {
if (eventLoopBlockMs > this.config.event_loop_delay.warn_threshold) {
this.logger.warn(
`event loop blocked for at least ${eventLoopBlockMs} ms while running task ${taskLabel}`,
{
Expand Down

0 comments on commit d30bbfc

Please sign in to comment.