[7.x] [Alerting] Improving health status check (#93282) #93639

Merged 3 commits on Mar 4, 2021
223 changes: 166 additions & 57 deletions x-pack/plugins/alerts/server/health/get_state.test.ts
@@ -5,73 +5,182 @@
* 2.0.
*/

import { ServiceStatusLevels } from '../../../../../src/core/server';
import { taskManagerMock } from '../../../task_manager/server/mocks';
import {
getHealthStatusStream,
getHealthServiceStatusWithRetryAndErrorHandling,
MAX_RETRY_ATTEMPTS,
} from './get_state';
import { ConcreteTaskInstance, TaskStatus } from '../../../task_manager/server';
import { HealthStatus } from '../types';

const tick = () => new Promise((resolve) => setImmediate(resolve));

const getHealthCheckTask = (overrides = {}): ConcreteTaskInstance => ({
id: 'test',
attempts: 0,
status: TaskStatus.Running,
version: '123',
runAt: new Date(),
scheduledAt: new Date(),
startedAt: new Date(),
retryAt: new Date(Date.now() + 5 * 60 * 1000),
state: {
runs: 1,
health_status: HealthStatus.OK,
},
taskType: 'alerting:alerting_health_check',
params: {
alertId: '1',
},
ownerId: null,
...overrides,
});

describe('getHealthServiceStatusWithRetryAndErrorHandling', () => {
beforeEach(() => jest.useFakeTimers());

it('should get status at each interval', async () => {
const mockTaskManager = taskManagerMock.createStart();
mockTaskManager.get.mockResolvedValue(getHealthCheckTask());
const pollInterval = 100;
const halfInterval = Math.floor(pollInterval / 2);

getHealthStatusStream(mockTaskManager, pollInterval).subscribe();

// shouldn't fire before poll interval passes
// should fire once each poll interval
jest.advanceTimersByTime(halfInterval);
expect(mockTaskManager.get).toHaveBeenCalledTimes(0);
jest.advanceTimersByTime(halfInterval);
expect(mockTaskManager.get).toHaveBeenCalledTimes(1);
jest.advanceTimersByTime(pollInterval);
expect(mockTaskManager.get).toHaveBeenCalledTimes(2);
jest.advanceTimersByTime(pollInterval);
expect(mockTaskManager.get).toHaveBeenCalledTimes(3);
});

it('should retry on error', async () => {
const mockTaskManager = taskManagerMock.createStart();
mockTaskManager.get.mockRejectedValue(new Error('Failure'));
const retryDelay = 10;
const pollInterval = 100;
const halfInterval = Math.floor(pollInterval / 2);

getHealthStatusStream(mockTaskManager, pollInterval, retryDelay).subscribe();

jest.advanceTimersByTime(halfInterval);
expect(mockTaskManager.get).toHaveBeenCalledTimes(0);
jest.advanceTimersByTime(halfInterval);
expect(mockTaskManager.get).toHaveBeenCalledTimes(1);

// Retry on failure
let numTimesCalled = 1;
for (let i = 0; i < MAX_RETRY_ATTEMPTS; i++) {
await tick();
jest.advanceTimersByTime(retryDelay);
expect(mockTaskManager.get).toHaveBeenCalledTimes(numTimesCalled++ + 1);
}

// Once we've exceeded max retries, should not try again
await tick();
jest.advanceTimersByTime(retryDelay);
expect(mockTaskManager.get).toHaveBeenCalledTimes(numTimesCalled);

// Once another poll interval passes, should call fn again
await tick();
jest.advanceTimersByTime(pollInterval - MAX_RETRY_ATTEMPTS * retryDelay);
expect(mockTaskManager.get).toHaveBeenCalledTimes(numTimesCalled + 1);
});

it('should return healthy status when health status is "ok"', async () => {
const mockTaskManager = taskManagerMock.createStart();
mockTaskManager.get.mockResolvedValue(getHealthCheckTask());

const status = await getHealthServiceStatusWithRetryAndErrorHandling(
mockTaskManager
).toPromise();

expect(status.level).toEqual(ServiceStatusLevels.available);
expect(status.summary).toEqual('Alerting framework is available');
});

it('should return degraded status when health status is "warn"', async () => {
const mockTaskManager = taskManagerMock.createStart();
mockTaskManager.get.mockResolvedValue(
getHealthCheckTask({
state: {
runs: 1,
health_status: HealthStatus.Warning,
},
})
);

const status = await getHealthServiceStatusWithRetryAndErrorHandling(
mockTaskManager
).toPromise();

expect(status.level).toEqual(ServiceStatusLevels.degraded);
expect(status.summary).toEqual('Alerting framework is degraded');
});

it('should return unavailable status when health status is "error"', async () => {
const mockTaskManager = taskManagerMock.createStart();
mockTaskManager.get.mockResolvedValue(
getHealthCheckTask({
state: {
runs: 1,
health_status: HealthStatus.Error,
},
})
);

const status = await getHealthServiceStatusWithRetryAndErrorHandling(
mockTaskManager
).toPromise();

expect(status.level).toEqual(ServiceStatusLevels.unavailable);
expect(status.summary).toEqual('Alerting framework is unavailable');
expect(status.meta).toBeUndefined();
});

it('should retry on error and return healthy status if retry succeeds', async () => {
const retryDelay = 10;
const mockTaskManager = taskManagerMock.createStart();
mockTaskManager.get
.mockRejectedValueOnce(new Error('Failure'))
.mockResolvedValue(getHealthCheckTask());

getHealthServiceStatusWithRetryAndErrorHandling(mockTaskManager, retryDelay).subscribe(
(status) => {
expect(status.level).toEqual(ServiceStatusLevels.available);
expect(status.summary).toEqual('Alerting framework is available');
}
);

await tick();
jest.advanceTimersByTime(retryDelay * 2);
});

it('should retry on error and return unavailable status if retry fails', async () => {
const retryDelay = 10;
const err = new Error('Failure');
const mockTaskManager = taskManagerMock.createStart();
mockTaskManager.get.mockRejectedValue(err);

getHealthServiceStatusWithRetryAndErrorHandling(mockTaskManager, retryDelay).subscribe(
(status) => {
expect(status.level).toEqual(ServiceStatusLevels.unavailable);
expect(status.summary).toEqual('Alerting framework is unavailable');
expect(status.meta).toEqual({ error: err });
}
);

for (let i = 0; i < MAX_RETRY_ATTEMPTS + 1; i++) {
await tick();
jest.advanceTimersByTime(retryDelay);
}
expect(mockTaskManager.get).toHaveBeenCalledTimes(MAX_RETRY_ATTEMPTS + 1);
});
});
72 changes: 51 additions & 21 deletions x-pack/plugins/alerts/server/health/get_state.ts
@@ -6,13 +6,17 @@
*/

import { i18n } from '@kbn/i18n';
import { defer, of, interval, Observable, throwError, timer } from 'rxjs';
import { catchError, mergeMap, retryWhen, switchMap } from 'rxjs/operators';
import { ServiceStatus, ServiceStatusLevels } from '../../../../../src/core/server';
import { TaskManagerStartContract } from '../../../task_manager/server';
import { HEALTH_TASK_ID } from './task';
import { HealthStatus } from '../types';

export const MAX_RETRY_ATTEMPTS = 3;
const HEALTH_STATUS_INTERVAL = 60000 * 5; // Five minutes
const RETRY_DELAY = 5000; // Wait 5 seconds before retrying on errors

async function getLatestTaskState(taskManager: TaskManagerStartContract) {
try {
const result = await taskManager.get(HEALTH_TASK_ID);
@@ -48,27 +52,53 @@ const LEVEL_SUMMARY = {
),
};

const getHealthServiceStatus = async (
taskManager: TaskManagerStartContract
): Promise<ServiceStatus<unknown>> => {
const doc = await getLatestTaskState(taskManager);
const level =
doc?.state?.health_status === HealthStatus.OK
? ServiceStatusLevels.available
: doc?.state?.health_status === HealthStatus.Warning
? ServiceStatusLevels.degraded
: ServiceStatusLevels.unavailable;
return {
level,
summary: LEVEL_SUMMARY[level.toString()],
};
};

export const getHealthServiceStatusWithRetryAndErrorHandling = (
taskManager: TaskManagerStartContract,
retryDelay?: number
): Observable<ServiceStatus<unknown>> => {
return defer(() => getHealthServiceStatus(taskManager)).pipe(
retryWhen((errors) => {
return errors.pipe(
mergeMap((error, i) => {
const retryAttempt = i + 1;
if (retryAttempt > MAX_RETRY_ATTEMPTS) {
return throwError(error);
}
return timer(retryDelay ?? RETRY_DELAY);
})
);
}),
catchError((error) => {
return of({
level: ServiceStatusLevels.unavailable,
summary: LEVEL_SUMMARY[ServiceStatusLevels.unavailable.toString()],
meta: { error },
});
})
);
};

export const getHealthStatusStream = (
taskManager: TaskManagerStartContract,
healthStatusInterval?: number,
retryDelay?: number
): Observable<ServiceStatus<unknown>> =>
interval(healthStatusInterval ?? HEALTH_STATUS_INTERVAL).pipe(
switchMap(() => getHealthServiceStatusWithRetryAndErrorHandling(taskManager, retryDelay))
);
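
Editor's note: to make the retry flow above easier to follow, here is a self-contained sketch of the same defer/retryWhen/catchError pattern used by getHealthServiceStatusWithRetryAndErrorHandling. It is not code from this PR; checkWithRetry and check are illustrative names only. defer re-runs the check on every (re)subscription, retryWhen waits between attempts, and catchError emits a fallback value once MAX_RETRY_ATTEMPTS is exhausted.

// Standalone sketch (not from this PR) of the retry pattern, reduced to a generic
// promise-returning check.
import { defer, of, throwError, timer } from 'rxjs';
import { catchError, mergeMap, retryWhen } from 'rxjs/operators';

const MAX_RETRY_ATTEMPTS = 3;
const RETRY_DELAY = 5000; // default wait between attempts, in ms

const checkWithRetry = (check: () => Promise<string>, retryDelay?: number) =>
  // defer() runs `check` lazily on each (re)subscription, so every retry calls it again
  defer(() => check()).pipe(
    retryWhen((errors) =>
      errors.pipe(
        mergeMap((error, i) =>
          // give up after MAX_RETRY_ATTEMPTS, otherwise wait before the next attempt
          i + 1 > MAX_RETRY_ATTEMPTS ? throwError(error) : timer(retryDelay ?? RETRY_DELAY)
        )
      )
    ),
    // after the final failure, emit a fallback value instead of erroring the stream
    catchError((error) => of(`unavailable: ${error.message}`))
  );

// Usage: fails twice, succeeds on the third attempt, logs "available".
let attempts = 0;
checkWithRetry(async () => {
  if (++attempts < 3) throw new Error('transient failure');
  return 'available';
}, 10).subscribe((status) => console.log(status));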
5 changes: 3 additions & 2 deletions x-pack/plugins/alerts/server/plugin.ts
@@ -6,7 +6,7 @@
*/

import type { PublicMethodsOf } from '@kbn/utility-types';
import { first, map, share } from 'rxjs/operators';
import { Observable } from 'rxjs';
import { UsageCollectionSetup } from 'src/plugins/usage_collection/server';
import { combineLatest } from 'rxjs';
@@ -251,7 +251,8 @@ export class AlertingPlugin {
} else {
return derivedStatus;
}
}),
share()
)
);
});
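
Editor's note: the only functional change in plugin.ts is the added share() operator. A plausible reading (an assumption, not stated in the diff) is that it multicasts the derived status observable so multiple subscribers to the plugin's status stream do not each re-trigger the upstream health-check pipeline. A minimal standalone illustration:

// Standalone illustration of share(); not code from the PR.
import { defer } from 'rxjs';
import { share } from 'rxjs/operators';

let calls = 0;
const status$ = defer(async () => ++calls); // cold: each subscriber would invoke the factory

const shared$ = status$.pipe(share());
shared$.subscribe();
shared$.subscribe();
// With share(), the factory ran once (calls === 1); subscribing twice to the
// unshared status$ would have run it twice.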