Skip to content

Commit

Permalink
[Alerting] Improving health status check (#93282) (#93639)
Browse files Browse the repository at this point in the history
* wip

* Moving catchError so observable stream does not complete. Adding retry on failure

* Using retryWhen. Updating unit tests

* PR fixes

Co-authored-by: Kibana Machine <[email protected]>

Co-authored-by: ymao1 <[email protected]>
  • Loading branch information
kibanamachine and ymao1 authored Mar 4, 2021
1 parent 1d032ef commit 4fb0186
Show file tree
Hide file tree
Showing 3 changed files with 220 additions and 80 deletions.
223 changes: 166 additions & 57 deletions x-pack/plugins/alerts/server/health/get_state.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,73 +5,182 @@
* 2.0.
*/

import { ServiceStatusLevels } from '../../../../../src/core/server';
import { taskManagerMock } from '../../../task_manager/server/mocks';
import { getHealthStatusStream } from '.';
import { TaskStatus } from '../../../task_manager/server';
import {
getHealthStatusStream,
getHealthServiceStatusWithRetryAndErrorHandling,
MAX_RETRY_ATTEMPTS,
} from './get_state';
import { ConcreteTaskInstance, TaskStatus } from '../../../task_manager/server';
import { HealthStatus } from '../types';

describe('getHealthStatusStream()', () => {
const mockTaskManager = taskManagerMock.createStart();

it('should return an object with the "unavailable" level and proper summary of "Alerting framework is unhealthy"', async () => {
mockTaskManager.get.mockReturnValue(
new Promise((_resolve, _reject) => {
return {
id: 'test',
attempts: 0,
status: TaskStatus.Running,
version: '123',
runAt: new Date(),
scheduledAt: new Date(),
startedAt: new Date(),
retryAt: new Date(Date.now() + 5 * 60 * 1000),
state: {
runs: 1,
health_status: HealthStatus.Warning,
},
taskType: 'alerting:alerting_health_check',
params: {
alertId: '1',
},
ownerId: null,
};
// Flush pending immediates so mocked async work can settle between fake-timer advances.
const tick = () =>
  new Promise((resolve) => {
    setImmediate(resolve);
  });

// Builds a ConcreteTaskInstance representing a health-check task run whose
// state reports HealthStatus.OK; individual fields can be replaced per-test
// via `overrides` (e.g. a different `state.health_status`).
const getHealthCheckTask = (overrides = {}): ConcreteTaskInstance => {
  const baseTask: ConcreteTaskInstance = {
    id: 'test',
    attempts: 0,
    status: TaskStatus.Running,
    version: '123',
    runAt: new Date(),
    scheduledAt: new Date(),
    startedAt: new Date(),
    retryAt: new Date(Date.now() + 5 * 60 * 1000),
    state: {
      runs: 1,
      health_status: HealthStatus.OK,
    },
    taskType: 'alerting:alerting_health_check',
    params: {
      alertId: '1',
    },
    ownerId: null,
  };
  return { ...baseTask, ...overrides };
};

describe('getHealthServiceStatusWithRetryAndErrorHandling', () => {
beforeEach(() => jest.useFakeTimers());

it('should get status at each interval', async () => {
const mockTaskManager = taskManagerMock.createStart();
mockTaskManager.get.mockResolvedValue(getHealthCheckTask());
const pollInterval = 100;
const halfInterval = Math.floor(pollInterval / 2);

getHealthStatusStream(mockTaskManager, pollInterval).subscribe();

// shouldn't fire before poll interval passes
// should fire once each poll interval
jest.advanceTimersByTime(halfInterval);
expect(mockTaskManager.get).toHaveBeenCalledTimes(0);
jest.advanceTimersByTime(halfInterval);
expect(mockTaskManager.get).toHaveBeenCalledTimes(1);
jest.advanceTimersByTime(pollInterval);
expect(mockTaskManager.get).toHaveBeenCalledTimes(2);
jest.advanceTimersByTime(pollInterval);
expect(mockTaskManager.get).toHaveBeenCalledTimes(3);
});

it('should retry on error', async () => {
const mockTaskManager = taskManagerMock.createStart();
mockTaskManager.get.mockRejectedValue(new Error('Failure'));
const retryDelay = 10;
const pollInterval = 100;
const halfInterval = Math.floor(pollInterval / 2);

getHealthStatusStream(mockTaskManager, pollInterval, retryDelay).subscribe();

jest.advanceTimersByTime(halfInterval);
expect(mockTaskManager.get).toHaveBeenCalledTimes(0);
jest.advanceTimersByTime(halfInterval);
expect(mockTaskManager.get).toHaveBeenCalledTimes(1);

// Retry on failure
let numTimesCalled = 1;
for (let i = 0; i < MAX_RETRY_ATTEMPTS; i++) {
await tick();
jest.advanceTimersByTime(retryDelay);
expect(mockTaskManager.get).toHaveBeenCalledTimes(numTimesCalled++ + 1);
}

// Once we've exceeded max retries, should not try again
await tick();
jest.advanceTimersByTime(retryDelay);
expect(mockTaskManager.get).toHaveBeenCalledTimes(numTimesCalled);

// Once another poll interval passes, should call fn again
await tick();
jest.advanceTimersByTime(pollInterval - MAX_RETRY_ATTEMPTS * retryDelay);
expect(mockTaskManager.get).toHaveBeenCalledTimes(numTimesCalled + 1);
});

it('should return healthy status when health status is "ok"', async () => {
const mockTaskManager = taskManagerMock.createStart();
mockTaskManager.get.mockResolvedValue(getHealthCheckTask());

const status = await getHealthServiceStatusWithRetryAndErrorHandling(
mockTaskManager
).toPromise();

expect(status.level).toEqual(ServiceStatusLevels.available);
expect(status.summary).toEqual('Alerting framework is available');
});

it('should return degraded status when health status is "warn"', async () => {
const mockTaskManager = taskManagerMock.createStart();
mockTaskManager.get.mockResolvedValue(
getHealthCheckTask({
state: {
runs: 1,
health_status: HealthStatus.Warning,
},
})
);
getHealthStatusStream(mockTaskManager).subscribe(
(val: { level: Readonly<unknown>; summary: string }) => {
expect(val.level).toBe(false);
}
);

const status = await getHealthServiceStatusWithRetryAndErrorHandling(
mockTaskManager
).toPromise();

expect(status.level).toEqual(ServiceStatusLevels.degraded);
expect(status.summary).toEqual('Alerting framework is degraded');
});

it('should return an object with the "available" level and proper summary of "Alerting framework is healthy"', async () => {
mockTaskManager.get.mockReturnValue(
new Promise((_resolve, _reject) => {
return {
id: 'test',
attempts: 0,
status: TaskStatus.Running,
version: '123',
runAt: new Date(),
scheduledAt: new Date(),
startedAt: new Date(),
retryAt: new Date(Date.now() + 5 * 60 * 1000),
state: {
runs: 1,
health_status: HealthStatus.OK,
},
taskType: 'alerting:alerting_health_check',
params: {
alertId: '1',
},
ownerId: null,
};
it('should return unavailable status when health status is "error"', async () => {
const mockTaskManager = taskManagerMock.createStart();
mockTaskManager.get.mockResolvedValue(
getHealthCheckTask({
state: {
runs: 1,
health_status: HealthStatus.Error,
},
})
);
getHealthStatusStream(mockTaskManager).subscribe(
(val: { level: Readonly<unknown>; summary: string }) => {
expect(val.level).toBe(true);

const status = await getHealthServiceStatusWithRetryAndErrorHandling(
mockTaskManager
).toPromise();

expect(status.level).toEqual(ServiceStatusLevels.unavailable);
expect(status.summary).toEqual('Alerting framework is unavailable');
expect(status.meta).toBeUndefined();
});

it('should retry on error and return healthy status if retry succeeds', async () => {
const retryDelay = 10;
const mockTaskManager = taskManagerMock.createStart();
mockTaskManager.get
.mockRejectedValueOnce(new Error('Failure'))
.mockResolvedValue(getHealthCheckTask());

getHealthServiceStatusWithRetryAndErrorHandling(mockTaskManager, retryDelay).subscribe(
(status) => {
expect(status.level).toEqual(ServiceStatusLevels.available);
expect(status.summary).toEqual('Alerting framework is available');
}
);

await tick();
jest.advanceTimersByTime(retryDelay * 2);
});

it('should retry on error and return unavailable status if retry fails', async () => {
const retryDelay = 10;
const err = new Error('Failure');
const mockTaskManager = taskManagerMock.createStart();
mockTaskManager.get.mockRejectedValue(err);

getHealthServiceStatusWithRetryAndErrorHandling(mockTaskManager, retryDelay).subscribe(
(status) => {
expect(status.level).toEqual(ServiceStatusLevels.unavailable);
expect(status.summary).toEqual('Alerting framework is unavailable');
expect(status.meta).toEqual({ error: err });
}
);

for (let i = 0; i < MAX_RETRY_ATTEMPTS + 1; i++) {
await tick();
jest.advanceTimersByTime(retryDelay);
}
expect(mockTaskManager.get).toHaveBeenCalledTimes(MAX_RETRY_ATTEMPTS + 1);
});
});
72 changes: 51 additions & 21 deletions x-pack/plugins/alerts/server/health/get_state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,17 @@
*/

import { i18n } from '@kbn/i18n';
import { interval, Observable } from 'rxjs';
import { catchError, switchMap } from 'rxjs/operators';
import { defer, of, interval, Observable, throwError, timer } from 'rxjs';
import { catchError, mergeMap, retryWhen, switchMap } from 'rxjs/operators';
import { ServiceStatus, ServiceStatusLevels } from '../../../../../src/core/server';
import { TaskManagerStartContract } from '../../../task_manager/server';
import { HEALTH_TASK_ID } from './task';
import { HealthStatus } from '../types';

// Maximum number of consecutive retries after a failed health status fetch.
export const MAX_RETRY_ATTEMPTS = 3;
const HEALTH_STATUS_INTERVAL = 60000 * 5; // Five minutes
const RETRY_DELAY = 5000; // Wait 5 seconds before retrying on errors

async function getLatestTaskState(taskManager: TaskManagerStartContract) {
try {
const result = await taskManager.get(HEALTH_TASK_ID);
Expand Down Expand Up @@ -48,27 +52,53 @@ const LEVEL_SUMMARY = {
),
};

export const getHealthStatusStream = (
const getHealthServiceStatus = async (
taskManager: TaskManagerStartContract
): Promise<ServiceStatus<unknown>> => {
const doc = await getLatestTaskState(taskManager);
const level =
doc?.state?.health_status === HealthStatus.OK
? ServiceStatusLevels.available
: doc?.state?.health_status === HealthStatus.Warning
? ServiceStatusLevels.degraded
: ServiceStatusLevels.unavailable;
return {
level,
summary: LEVEL_SUMMARY[level.toString()],
};
};

export const getHealthServiceStatusWithRetryAndErrorHandling = (
taskManager: TaskManagerStartContract,
retryDelay?: number
): Observable<ServiceStatus<unknown>> => {
return interval(60000 * 5).pipe(
switchMap(async () => {
const doc = await getLatestTaskState(taskManager);
const level =
doc?.state?.health_status === HealthStatus.OK
? ServiceStatusLevels.available
: doc?.state?.health_status === HealthStatus.Warning
? ServiceStatusLevels.degraded
: ServiceStatusLevels.unavailable;
return {
level,
summary: LEVEL_SUMMARY[level.toString()],
};
return defer(() => getHealthServiceStatus(taskManager)).pipe(
retryWhen((errors) => {
return errors.pipe(
mergeMap((error, i) => {
const retryAttempt = i + 1;
if (retryAttempt > MAX_RETRY_ATTEMPTS) {
return throwError(error);
}
return timer(retryDelay ?? RETRY_DELAY);
})
);
}),
catchError(async (error) => ({
level: ServiceStatusLevels.unavailable,
summary: LEVEL_SUMMARY[ServiceStatusLevels.unavailable.toString()],
meta: { error },
}))
catchError((error) => {
return of({
level: ServiceStatusLevels.unavailable,
summary: LEVEL_SUMMARY[ServiceStatusLevels.unavailable.toString()],
meta: { error },
});
})
);
};

/**
 * Emits the alerting framework's ServiceStatus on a fixed polling interval.
 * Each tick runs the health check with retry/error handling applied.
 *
 * @param taskManager used to look up the health-check task state
 * @param healthStatusInterval optional override of the poll interval, in ms
 * @param retryDelay optional override of the delay between retries, in ms
 */
export const getHealthStatusStream = (
  taskManager: TaskManagerStartContract,
  healthStatusInterval?: number,
  retryDelay?: number
): Observable<ServiceStatus<unknown>> => {
  const pollInterval = healthStatusInterval ?? HEALTH_STATUS_INTERVAL;
  return interval(pollInterval).pipe(
    switchMap(() => getHealthServiceStatusWithRetryAndErrorHandling(taskManager, retryDelay))
  );
};
5 changes: 3 additions & 2 deletions x-pack/plugins/alerts/server/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
*/

import type { PublicMethodsOf } from '@kbn/utility-types';
import { first, map } from 'rxjs/operators';
import { first, map, share } from 'rxjs/operators';
import { Observable } from 'rxjs';
import { UsageCollectionSetup } from 'src/plugins/usage_collection/server';
import { combineLatest } from 'rxjs';
Expand Down Expand Up @@ -251,7 +251,8 @@ export class AlertingPlugin {
} else {
return derivedStatus;
}
})
}),
share()
)
);
});
Expand Down

0 comments on commit 4fb0186

Please sign in to comment.