
[task manager] provide better diagnostics when task manager performance is degraded #109741

Merged · 9 commits · Sep 1, 2021
44 changes: 27 additions & 17 deletions x-pack/plugins/task_manager/server/lib/calculate_health_status.ts
@@ -9,32 +9,42 @@ import { isString } from 'lodash';
 import { JsonValue } from '@kbn/utility-types';
 import { HealthStatus, RawMonitoringStats } from '../monitoring';
 import { TaskManagerConfig } from '../config';
+import { Logger } from '../../../../../src/core/server';

 export function calculateHealthStatus(
   summarizedStats: RawMonitoringStats,
-  config: TaskManagerConfig
+  config: TaskManagerConfig,
+  logger: Logger
 ): HealthStatus {
   const now = Date.now();

-  // if "hot" health stats are any more stale than monitored_stats_required_freshness (pollInterval +1s buffer by default)
-  // consider the system unhealthy
-  const requiredHotStatsFreshness: number = config.monitored_stats_required_freshness;
+  // if "hot" health stats are any more stale than monitored_stats_required_freshness
+  // times a multiplier, consider the system unhealthy
+  const requiredHotStatsFreshness: number = config.monitored_stats_required_freshness * 3;

-  // if "cold" health stats are any more stale than the configured refresh (+ a buffer), consider the system unhealthy
+  // if "cold" health stats are any more stale than the configured refresh
+  // times a multiplier, consider the system unhealthy
   const requiredColdStatsFreshness: number = config.monitored_aggregated_stats_refresh_rate * 1.5;

   /**
    * If the monitored stats aren't fresh, return a red status
    */
-  const healthStatus =
-    hasStatus(summarizedStats.stats, HealthStatus.Error) ||
-    hasExpiredHotTimestamps(summarizedStats, now, requiredHotStatsFreshness) ||
-    hasExpiredColdTimestamps(summarizedStats, now, requiredColdStatsFreshness)
-      ? HealthStatus.Error
-      : hasStatus(summarizedStats.stats, HealthStatus.Warning)
-      ? HealthStatus.Warning
-      : HealthStatus.OK;
-  return healthStatus;
+  if (hasStatus(summarizedStats.stats, HealthStatus.Error)) {
+    return HealthStatus.Error;
+  }
+
+  if (hasExpiredHotTimestamps(summarizedStats, now, requiredHotStatsFreshness)) {
+    logger.debug('setting HealthStatus.Error because of expired hot timestamps');
+    return HealthStatus.Error;
+  }
+
+  if (hasExpiredColdTimestamps(summarizedStats, now, requiredColdStatsFreshness)) {
+    logger.debug('setting HealthStatus.Error because of expired cold timestamps');
+    return HealthStatus.Error;
+  }
+
+  if (hasStatus(summarizedStats.stats, HealthStatus.Warning)) {
+    return HealthStatus.Warning;
+  }
+
+  return HealthStatus.OK;
 }

 function hasStatus(stats: RawMonitoringStats['stats'], status: HealthStatus): boolean {
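
A quick worked example of what the new multipliers mean in practice (commentary, not part of the diff). The defaults below are assumptions taken from the old comment, which described monitored_stats_required_freshness as pollInterval + 1s by default, and from the poll_interval: 3000 used throughout the tests:

```ts
// Sketch only: illustrates the staleness windows the new thresholds produce.
// Assumed defaults: poll_interval = 3000ms, so
// monitored_stats_required_freshness = 4000ms (pollInterval + 1s buffer);
// monitored_aggregated_stats_refresh_rate = 60000ms is also an assumption.
const config = {
  monitored_stats_required_freshness: 4000,
  monitored_aggregated_stats_refresh_rate: 60000,
};

const requiredHotStatsFreshness = config.monitored_stats_required_freshness * 3; // 12000ms
const requiredColdStatsFreshness = config.monitored_aggregated_stats_refresh_rate * 1.5; // 90000ms

// A timestamp has "expired" once it is older than the freshness window.
const isExpired = (timestamp: number, now: number, requiredFreshness: number): boolean =>
  now - timestamp > requiredFreshness;

const now = Date.now();
// Hot stats that are 10s stale no longer flip the status to Error;
// under the previous window (no multiplier) they would have.
console.log(isExpired(now - 10000, now, requiredHotStatsFreshness)); // false
console.log(isExpired(now - 10000, now, config.monitored_stats_required_freshness)); // true
```

The wider windows make the health status less trigger-happy, and the new debug lines say which window tripped when it does go red.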
@@ -50,12 +50,15 @@ describe('logHealthMetrics', () => {
     (calculateHealthStatus as jest.Mock<HealthStatus>).mockImplementation(() => HealthStatus.Error);
     logHealthMetrics(health, logger, config);

-    expect((logger as jest.Mocked<Logger>).warn.mock.calls[0][0] as string).toBe(
-      `Detected potential performance issue with Task Manager. Set 'xpack.task_manager.monitored_stats_health_verbose_log.enabled: true' in your Kibana.yml to enable debug logging`
-    );
-    expect((logger as jest.Mocked<Logger>).warn.mock.calls[1][0] as string).toBe(
-      `Detected potential performance issue with Task Manager. Set 'xpack.task_manager.monitored_stats_health_verbose_log.enabled: true' in your Kibana.yml to enable debug logging`
-    );
+    const debugCalls = (logger as jest.Mocked<Logger>).debug.mock.calls;
+    const performanceMessage = `Detected potential performance issue with Task Manager. Set 'xpack.task_manager.monitored_stats_health_verbose_log.enabled: true' in your Kibana.yml to enable debug logging`;
+    const lastStatsMessage = /^Latest Monitored Stats: \{.*\}$/;
+    expect(debugCalls[0][0] as string).toMatch(lastStatsMessage);
+    expect(debugCalls[1][0] as string).toMatch(lastStatsMessage);
+    expect(debugCalls[2][0] as string).toBe(performanceMessage);
+    expect(debugCalls[3][0] as string).toMatch(lastStatsMessage);
+    expect(debugCalls[4][0] as string).toMatch(lastStatsMessage);
+    expect(debugCalls[5][0] as string).toBe(performanceMessage);
   });

   it('should not log a warning message to enable verbose logging when the status goes from Warning to OK', () => {
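
The reworked assertions reflect that both the raw stats dump and the performance hint now arrive via logger.debug rather than logger.warn. A small sketch of how the regex distinguishes the two message kinds (the regex is copied from the test; the sample payloads are made up):

```ts
const lastStatsMessage = /^Latest Monitored Stats: \{.*\}$/;

// Matches the JSON dump the health logger emits each cycle...
console.log(lastStatsMessage.test('Latest Monitored Stats: {"lastUpdate":"2021-09-01"}')); // true
// ...but not the verbose-logging hint, which the test checks with toBe instead.
console.log(lastStatsMessage.test('Detected potential performance issue with Task Manager.')); // false
```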
4 changes: 2 additions & 2 deletions x-pack/plugins/task_manager/server/lib/log_health_metrics.ts
@@ -36,7 +36,7 @@ export function logHealthMetrics(
       capacity_estimation: undefined,
     },
   };
-  const statusWithoutCapacity = calculateHealthStatus(healthWithoutCapacity, config);
+  const statusWithoutCapacity = calculateHealthStatus(healthWithoutCapacity, config, logger);
   if (statusWithoutCapacity === HealthStatus.Warning) {
     logLevel = LogLevel.Warn;
   } else if (statusWithoutCapacity === HealthStatus.Error && !isEmpty(monitoredHealth.stats)) {
@@ -80,7 +80,7 @@ export function logHealthMetrics(
   // This is legacy support - we used to always show this
   logger.debug(message);
   if (logLevel !== LogLevel.Debug && lastLogLevel === LogLevel.Debug) {
-    logger.warn(
+    logger.debug(
       `Detected potential performance issue with Task Manager. Set 'xpack.task_manager.monitored_stats_health_verbose_log.enabled: true' in your Kibana.yml to enable debug logging`
     );
   }
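
For context, a minimal sketch of the guard around the downgraded message (the enum shape and helper name are illustrative, not the full logHealthMetrics implementation): the hint now fires at debug level, and only on the cycle where the health log level transitions away from Debug, instead of warning repeatedly:

```ts
// Minimal sketch under assumed names; the condition mirrors the diff.
enum LogLevel {
  Warn = 'warn',
  Error = 'error',
  Debug = 'debug',
}

function logVerboseLoggingHint(
  logger: { debug(message: string): void },
  logLevel: LogLevel, // level chosen for this health report
  lastLogLevel: LogLevel // level chosen for the previous one
): void {
  // Fires once when the status degrades (Debug -> Warn/Error), pointing the
  // operator at the config flag that re-enables detailed debug output.
  if (logLevel !== LogLevel.Debug && lastLogLevel === LogLevel.Debug) {
    logger.debug(
      `Detected potential performance issue with Task Manager. ` +
        `Set 'xpack.task_manager.monitored_stats_health_verbose_log.enabled: true' ` +
        `in your Kibana.yml to enable debug logging`
    );
  }
}
```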
@@ -7,11 +7,19 @@

 import { CapacityEstimationParams, estimateCapacity } from './capacity_estimation';
 import { HealthStatus, RawMonitoringStats } from './monitoring_stats_stream';
+import { mockLogger } from '../test_utils';

 describe('estimateCapacity', () => {
+  const logger = mockLogger();
+
+  beforeAll(() => {
+    jest.resetAllMocks();
+  });
+
   test('estimates the max throughput per minute based on the workload and the assumed kibana instances', async () => {
     expect(
       estimateCapacity(
+        logger,
         mockStats(
           { max_workers: 10, poll_interval: 3000 },
           {
@@ -67,6 +75,7 @@
   test('reduces the available capacity per kibana when average task duration exceeds the poll interval', async () => {
     expect(
       estimateCapacity(
+        logger,
         mockStats(
           { max_workers: 10, poll_interval: 3000 },
           {
@@ -124,6 +133,7 @@
   test('estimates the max throughput per minute when duration by persistence is empty', async () => {
     expect(
       estimateCapacity(
+        logger,
         mockStats(
           { max_workers: 10, poll_interval: 3000 },
           {
@@ -160,6 +170,7 @@
   test('estimates the max throughput per minute based on the workload and the assumed kibana instances when there are tasks that repeat each hour or day', async () => {
     expect(
       estimateCapacity(
+        logger,
         mockStats(
           { max_workers: 10, poll_interval: 3000 },
           {
@@ -215,6 +226,7 @@
   test('estimates the max throughput available when there are no active Kibana', async () => {
     expect(
       estimateCapacity(
+        logger,
         mockStats(
           { max_workers: 10, poll_interval: 3000 },
           {
@@ -271,6 +283,7 @@
   test('estimates the max throughput available to handle the workload when there are multiple active kibana instances', async () => {
     expect(
       estimateCapacity(
+        logger,
         mockStats(
           { max_workers: 10, poll_interval: 3000 },
           {
@@ -332,6 +345,7 @@

     expect(
       estimateCapacity(
+        logger,
         mockStats(
           { max_workers: 10, poll_interval: 3000 },
           {
@@ -412,6 +426,7 @@

     expect(
       estimateCapacity(
+        logger,
         mockStats(
           { max_workers: 10, poll_interval: 3000 },
           {
@@ -493,6 +508,7 @@
   test('marks estimated capacity as OK state when workload and load suggest capacity is sufficient', async () => {
     expect(
       estimateCapacity(
+        logger,
         mockStats(
           { max_workers: 10, poll_interval: 3000 },
           {
@@ -557,6 +573,7 @@
   test('marks estimated capacity as Warning state when capacity is insufficient for recent spikes of non-recurring workload, but sufficient for the recurring workload', async () => {
     expect(
       estimateCapacity(
+        logger,
         mockStats(
           { max_workers: 10, poll_interval: 3000 },
           {
@@ -618,6 +635,7 @@
   test('marks estimated capacity as Error state when workload and load suggest capacity is insufficient', async () => {
     expect(
       estimateCapacity(
+        logger,
         mockStats(
           { max_workers: 10, poll_interval: 3000 },
           {
@@ -679,6 +697,7 @@
   test('recommmends a 20% increase in kibana when a spike in non-recurring tasks forces recurring task capacity to zero', async () => {
     expect(
       estimateCapacity(
+        logger,
         mockStats(
           { max_workers: 10, poll_interval: 3000 },
           {
@@ -754,6 +773,7 @@
   test('recommmends a 20% increase in kibana when a spike in non-recurring tasks in a system with insufficient capacity even for recurring tasks', async () => {
     expect(
       estimateCapacity(
+        logger,
         mockStats(
           { max_workers: 10, poll_interval: 3000 },
           {
@@ -12,6 +12,7 @@ import { RawMonitoringStats, RawMonitoredStat, HealthStatus } from './monitoring_stats_stream';
 import { AveragedStat } from './task_run_calcultors';
 import { TaskPersistenceTypes } from './task_run_statistics';
 import { asErr, asOk, map, Result } from '../lib/result_type';
+import { Logger } from '../../../../../src/core/server';

 export interface CapacityEstimationStat extends JsonObject {
   observed: {
@@ -44,6 +45,7 @@ function isCapacityEstimationParams(
 }

 export function estimateCapacity(
+  logger: Logger,
   capacityStats: CapacityEstimationParams
 ): RawMonitoredStat<CapacityEstimationStat> {
   const workload = capacityStats.workload.value;
@@ -183,13 +185,14 @@ export function estimateCapacity(
   const assumedRequiredThroughputPerMinutePerKibana =
     averageCapacityUsedByNonRecurringAndEphemeralTasksPerKibana +
     averageRecurringRequiredPerMinute / assumedKibanaInstances;

+  const status = getHealthStatus(logger, {
+    assumedRequiredThroughputPerMinutePerKibana,
+    assumedAverageRecurringRequiredThroughputPerMinutePerKibana,
+    capacityPerMinutePerKibana,
+  });
   return {
-    status:
-      assumedRequiredThroughputPerMinutePerKibana < capacityPerMinutePerKibana
-        ? HealthStatus.OK
-        : assumedAverageRecurringRequiredThroughputPerMinutePerKibana < capacityPerMinutePerKibana
-        ? HealthStatus.Warning
-        : HealthStatus.Error,
+    status,
     timestamp: new Date().toISOString(),
     value: {
       observed: mapValues(
@@ -220,13 +223,43 @@ export function estimateCapacity(
   };
 }

+interface GetHealthStatusParams {
+  assumedRequiredThroughputPerMinutePerKibana: number;
+  assumedAverageRecurringRequiredThroughputPerMinutePerKibana: number;
+  capacityPerMinutePerKibana: number;
+}
+
+function getHealthStatus(logger: Logger, params: GetHealthStatusParams): HealthStatus {
+  const {
+    assumedRequiredThroughputPerMinutePerKibana,
+    assumedAverageRecurringRequiredThroughputPerMinutePerKibana,
+    capacityPerMinutePerKibana,
+  } = params;
+  if (assumedRequiredThroughputPerMinutePerKibana < capacityPerMinutePerKibana) {
+    return HealthStatus.OK;
+  }
+
+  if (assumedAverageRecurringRequiredThroughputPerMinutePerKibana < capacityPerMinutePerKibana) {
+    logger.debug(
+      `setting HealthStatus.Warning because assumedAverageRecurringRequiredThroughputPerMinutePerKibana (${assumedAverageRecurringRequiredThroughputPerMinutePerKibana}) < capacityPerMinutePerKibana (${capacityPerMinutePerKibana})`
+    );
+    return HealthStatus.Warning;
+  }
+
+  logger.debug(
+    `setting HealthStatus.Error because assumedRequiredThroughputPerMinutePerKibana (${assumedRequiredThroughputPerMinutePerKibana}) >= capacityPerMinutePerKibana (${capacityPerMinutePerKibana}) AND assumedAverageRecurringRequiredThroughputPerMinutePerKibana (${assumedAverageRecurringRequiredThroughputPerMinutePerKibana}) >= capacityPerMinutePerKibana (${capacityPerMinutePerKibana})`
+  );
+  return HealthStatus.Error;
+}
+
 export function withCapacityEstimate(
+  logger: Logger,
   monitoredStats: RawMonitoringStats['stats']
 ): RawMonitoringStats['stats'] {
   if (isCapacityEstimationParams(monitoredStats)) {
     return {
       ...monitoredStats,
-      capacity_estimation: estimateCapacity(monitoredStats),
+      capacity_estimation: estimateCapacity(logger, monitoredStats),
     };
   }
   return monitoredStats;
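
To make the three-way decision concrete, a hedged worked example. The capacity figure assumes throughput scales as max_workers times polls per minute, consistent with the test fixtures (max_workers: 10, poll_interval: 3000); treat the formula as an assumption, not the exact implementation:

```ts
// Decision table that getHealthStatus now encapsulates:
//   required  < capacity                        -> OK      (no log)
//   required >= capacity, recurring <  capacity -> Warning (debug log)
//   required >= capacity, recurring >= capacity -> Error   (debug log)
const capacityPerMinutePerKibana = (60000 / 3000) * 10; // 20 polls/min * 10 workers = 200

const examples = [
  { required: 150, recurring: 100 }, // fits comfortably -> OK
  { required: 250, recurring: 150 }, // non-recurring spike exceeds capacity -> Warning
  { required: 250, recurring: 230 }, // recurring load alone exceeds capacity -> Error
];

for (const { required, recurring } of examples) {
  const status =
    required < capacityPerMinutePerKibana
      ? 'OK'
      : recurring < capacityPerMinutePerKibana
      ? 'Warning'
      : 'Error';
  console.log({ required, recurring, status });
}
```

Because the Warning and Error branches now log the actual numbers, a degraded health status can be traced back to the specific threshold that tripped.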
@@ -136,14 +136,15 @@ export function createMonitoringStatsStream(
 }

 export function summarizeMonitoringStats(
+  logger: Logger,
   {
     // eslint-disable-next-line @typescript-eslint/naming-convention
     last_update,
     stats: { runtime, workload, configuration, ephemeral },
   }: MonitoringStats,
   config: TaskManagerConfig
 ): RawMonitoringStats {
-  const summarizedStats = withCapacityEstimate({
+  const summarizedStats = withCapacityEstimate(logger, {
     ...(configuration
       ? {
           configuration: {
@@ -156,7 +157,7 @@
       ? {
           runtime: {
             timestamp: runtime.timestamp,
-            ...summarizeTaskRunStat(runtime.value, config),
+            ...summarizeTaskRunStat(logger, runtime.value, config),
           },
         }
       : {}),
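
The net effect of the plumbing above: callers of summarizeMonitoringStats pass a logger, which fans out to withCapacityEstimate, estimateCapacity/getHealthStatus, and summarizeTaskRunStat. A hedged sketch of the new call shape (the declared values are stand-ins for illustration; the import paths assume the file layout shown in the diff):

```ts
import { Logger } from '../../../../../src/core/server';
import { TaskManagerConfig } from '../config';
import { MonitoringStats, summarizeMonitoringStats } from './monitoring_stats_stream';

// Stand-ins for illustration only; real callers already hold these.
declare const logger: Logger;
declare const monitoringStats: MonitoringStats;
declare const config: TaskManagerConfig;

// Before this PR the summary carried a status but no explanation:
//   const summary = summarizeMonitoringStats(monitoringStats, config);

// After: each downgrade to Warning/Error along the way leaves a debug line
// naming the threshold that tripped.
const summary = summarizeMonitoringStats(logger, monitoringStats, config);
```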