Skip to content

Commit

Permalink
[RCA] AI-assisted root cause analysis (elastic#197200)
Browse files Browse the repository at this point in the history
Implements an LLM-based root cause analysis process. At a high level, it
works by investigating entities - which means pulling in alerts, SLOs,
and log patterns. From there, it can inspect related entities to get to
the root cause.

The backend implementation lives in
`x-pack/packages/observability_utils-*` (`service_rca`). It can be
imported into any server-side plugin and executed from there.

The UI changes are mostly contained to
`x-pack/plugins/observability_solution/observabillity_ai_assistant_app`.
This plugin now exports a `RootCauseAnalysisContainer` which takes a
stream of data that is returned by the root cause analysis process.

The current implementation lives in the Investigate app. There, it calls
its own endpoint that kicks off the RCA process, and feeds it into the
`RootCauseAnalysisContainer` exposed by the Observability AI Assistant
app plugin. I've left it in a route there so the investigation itself
can be updated as the process runs - this would allow the user to close
the browser and come back later, and see a full investigation.

> [!NOTE]
> Notes for reviewing teams
> 
> @kbn/es-types:
> - support both types and typesWithBodyKey
> - simplify KeysOfSources type
> 
> @kbn/server-route-repository:
> - abortable streamed responses
> 
> @kbn/sse-utils*:
> - abortable streamed responses
> - serialize errors in specific format for more reliable re-hydration
of errors
> - keep connection open with SSE comments
> 
> @kbn/inference-*:
> - export *Of variants of types, for easier manual inference
> - add automated retries for `output` API
> - add `name` to tool responses for type inference (get type of tool
response via tool name)
> - add `data` to tool responses for transporting internal data (not
sent to the LLM)
> - simplify `chunksIntoMessage`
> - allow consumers of nlToEsql task to add to `system` prompt
> - add toolCallId to validation error message
> 
> @kbn/aiops*:
> - export `categorizationAnalyzer` for use in observability-ai*
> 
> @kbn/observability-ai-assistant*
> - configurable limit (tokens or doc count) for knowledge base recall
> 
> @kbn/slo*:
> - export client that returns summary indices

---------

Co-authored-by: kibanamachine <[email protected]>
Co-authored-by: Maryam Saeidi <[email protected]>
Co-authored-by: Bena Kansara <[email protected]>
  • Loading branch information
4 people authored Dec 11, 2024
1 parent 64e9728 commit fa1998c
Show file tree
Hide file tree
Showing 144 changed files with 27,293 additions and 364 deletions.
2 changes: 2 additions & 0 deletions .eslintrc.js
Original file line number Diff line number Diff line change
Expand Up @@ -919,6 +919,7 @@ module.exports = {
'x-pack/plugins/observability_solution/exploratory_view/**/*.{js,mjs,ts,tsx}',
'x-pack/plugins/observability_solution/ux/**/*.{js,mjs,ts,tsx}',
'x-pack/plugins/observability_solution/slo/**/*.{js,mjs,ts,tsx}',
'x-pack/packages/observability/**/*.{js,mjs,ts,tsx}',
],
rules: {
'no-console': ['warn', { allow: ['error'] }],
Expand All @@ -938,6 +939,7 @@ module.exports = {
'x-pack/plugins/observability_solution/observability/**/*.stories.*',
'x-pack/plugins/observability_solution/exploratory_view/**/*.stories.*',
'x-pack/plugins/observability_solution/slo/**/*.stories.*',
'x-pack/packages/observability/**/*.{js,mjs,ts,tsx}',
],
rules: {
'react/function-component-definition': [
Expand Down
2 changes: 2 additions & 0 deletions .github/CODEOWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -800,6 +800,8 @@ x-pack/packages/observability/alerting_rule_utils @elastic/obs-ux-management-tea
x-pack/packages/observability/alerting_test_data @elastic/obs-ux-management-team
x-pack/packages/observability/get_padded_alert_time_range_util @elastic/obs-ux-management-team
x-pack/packages/observability/logs_overview @elastic/obs-ux-logs-team
x-pack/packages/observability/observability_ai/observability_ai_common @elastic/obs-ai-assistant
x-pack/packages/observability/observability_ai/observability_ai_server @elastic/obs-ai-assistant
x-pack/packages/observability/observability_utils/observability_utils_browser @elastic/observability-ui
x-pack/packages/observability/observability_utils/observability_utils_common @elastic/observability-ui
x-pack/packages/observability/observability_utils/observability_utils_server @elastic/observability-ui
Expand Down
3 changes: 3 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -695,6 +695,8 @@
"@kbn/observability-ai-assistant-app-plugin": "link:x-pack/plugins/observability_solution/observability_ai_assistant_app",
"@kbn/observability-ai-assistant-management-plugin": "link:x-pack/plugins/observability_solution/observability_ai_assistant_management",
"@kbn/observability-ai-assistant-plugin": "link:x-pack/plugins/observability_solution/observability_ai_assistant",
"@kbn/observability-ai-common": "link:x-pack/packages/observability/observability_ai/observability_ai_common",
"@kbn/observability-ai-server": "link:x-pack/packages/observability/observability_ai/observability_ai_server",
"@kbn/observability-alert-details": "link:x-pack/packages/observability/alert_details",
"@kbn/observability-alerting-rule-utils": "link:x-pack/packages/observability/alerting_rule_utils",
"@kbn/observability-alerting-test-data": "link:x-pack/packages/observability/alerting_test_data",
Expand Down Expand Up @@ -1145,6 +1147,7 @@
"fnv-plus": "^1.3.1",
"formik": "^2.4.6",
"fp-ts": "^2.3.1",
"fuse.js": "^7.0.0",
"get-port": "^5.0.0",
"getopts": "^2.2.5",
"getos": "^3.1.0",
Expand Down
1 change: 1 addition & 0 deletions packages/kbn-es-types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ export type {
SearchHit,
ESSearchResponse,
ESSearchRequest,
ESSearchRequestWithoutBody,
ESSourceOptions,
InferSearchResponseOf,
AggregationResultOf,
Expand Down
2 changes: 2 additions & 0 deletions packages/kbn-es-types/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
*/

import * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import * as estypesWithoutBody from '@elastic/elasticsearch/lib/api/types';
import type {
Field,
QueryDslFieldAndFormat,
Expand All @@ -26,6 +27,7 @@ import {

export type ESFilter = estypes.QueryDslQueryContainer;
export type ESSearchRequest = estypes.SearchRequest;
export type ESSearchRequestWithoutBody = estypesWithoutBody.SearchRequest;
export type AggregationOptionsByType = Required<estypes.AggregationsAggregationContainer>;

// Typings for Elasticsearch queries and aggregations. These are intended to be
Expand Down
21 changes: 8 additions & 13 deletions packages/kbn-es-types/src/search.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,15 @@ type InvalidAggregationRequest = unknown;
// Union keys are not included in keyof, but extends iterates over the types in a union.
type ValidAggregationKeysOf<T extends Record<string, any>> = T extends T ? keyof T : never;

type KeyOfSource<T> = Record<
keyof T,
(T extends Record<string, { terms: { missing_bucket: true } }> ? null : never) | string | number
>;
type KeyOfSource<T> = {
[key in keyof T]:
| (T[key] extends Record<string, { terms: { missing_bucket: true } }> ? null : never)
| string
| number;
};

type KeysOfSources<T extends any[]> = T extends [any]
? KeyOfSource<T[0]>
: T extends [any, any]
? KeyOfSource<T[0]> & KeyOfSource<T[1]>
: T extends [any, any, any]
? KeyOfSource<T[0]> & KeyOfSource<T[1]> & KeyOfSource<T[2]>
: T extends [any, any, any, any]
? KeyOfSource<T[0]> & KeyOfSource<T[1]> & KeyOfSource<T[2]> & KeyOfSource<T[3]>
: Record<string, null | string | number>;
// convert to intersection to be able to get all the keys
type KeysOfSources<T extends any[]> = UnionToIntersection<KeyOfSource<ValuesType<Pick<T, number>>>>;

type CompositeKeysOf<TAggregationContainer extends AggregationsAggregationContainer> =
TAggregationContainer extends {
Expand Down
3 changes: 3 additions & 0 deletions packages/kbn-investigation-shared/src/rest_specs/update.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ const updateInvestigationParamsSchema = z.object({
}),
tags: z.array(z.string()),
externalIncidentUrl: z.string().nullable(),
rootCauseAnalysis: z.object({
events: z.array(z.any()),
}),
})
.partial(),
});
Expand Down
5 changes: 5 additions & 0 deletions packages/kbn-investigation-shared/src/schema/investigation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ const investigationSchema = z.object({
notes: z.array(investigationNoteSchema),
items: z.array(investigationItemSchema),
externalIncidentUrl: z.string().nullable(),
rootCauseAnalysis: z
.object({
events: z.array(z.any()),
})
.optional(),
});

type Status = z.infer<typeof statusSchema>;
Expand Down
9 changes: 8 additions & 1 deletion packages/kbn-server-route-repository/src/register_routes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,15 @@ export function registerRoutes<TDependencies extends Record<string, any>>({
if (isKibanaResponse(result)) {
return result;
} else if (isObservable(result)) {
const controller = new AbortController();
request.events.aborted$.subscribe(() => {
controller.abort();
});
return response.ok({
body: observableIntoEventSourceStream(result as Observable<ServerSentEvent>),
body: observableIntoEventSourceStream(result as Observable<ServerSentEvent>, {
logger,
signal: controller.signal,
}),
});
} else {
const body = result || {};
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

import { Logger } from '@kbn/logging';
import { observableIntoEventSourceStream } from './observable_into_event_source_stream';
import { PassThrough } from 'node:stream';
import { Subject } from 'rxjs';
import { ServerSentEvent, ServerSentEventType } from '@kbn/sse-utils/src/events';
import {
ServerSentEventErrorCode,
createSSEInternalError,
createSSERequestError,
} from '@kbn/sse-utils/src/errors';

describe('observableIntoEventSourceStream', () => {
let logger: jest.Mocked<Logger>;

let controller: AbortController;

let stream: PassThrough;
let source$: Subject<ServerSentEvent>;

let data: string[];

beforeEach(() => {
jest.useFakeTimers();
logger = {
debug: jest.fn(),
error: jest.fn(),
} as unknown as jest.Mocked<Logger>;

controller = new AbortController();
source$ = new Subject();
data = [];

stream = observableIntoEventSourceStream(source$, { logger, signal: controller.signal });
stream.on('data', (chunk) => {
data.push(chunk.toString());
});
});

afterEach(() => {
jest.clearAllTimers();
});

it('writes events into the stream in SSE format', () => {
source$.next({ type: ServerSentEventType.data, data: { foo: 'bar' } });
source$.complete();

jest.runAllTimers();

expect(data).toEqual(['event: data\ndata: {"data":{"foo":"bar"}}\n\n']);
});

it('handles SSE errors', () => {
const sseError = createSSEInternalError('Invalid input');

source$.error(sseError);

jest.runAllTimers();

expect(logger.error).toHaveBeenCalledWith(sseError);
expect(logger.debug).toHaveBeenCalled();
const debugFn = logger.debug.mock.calls[0][0] as () => string;
const loggedError = JSON.parse(debugFn());
expect(loggedError).toEqual({
type: 'error',
error: {
code: ServerSentEventErrorCode.internalError,
message: 'Invalid input',
meta: {},
},
});

expect(data).toEqual([
`event: error\ndata: ${JSON.stringify({
error: {
code: ServerSentEventErrorCode.internalError,
message: 'Invalid input',
meta: {},
},
})}\n\n`,
]);
});

it('handles SSE errors with metadata', () => {
const sseError = createSSERequestError('Invalid request', 400);

source$.error(sseError);

jest.runAllTimers();

expect(logger.error).toHaveBeenCalledWith(sseError);
expect(logger.debug).toHaveBeenCalled();
const debugFn = logger.debug.mock.calls[0][0] as () => string;
const loggedError = JSON.parse(debugFn());
expect(loggedError).toEqual({
type: 'error',
error: {
code: ServerSentEventErrorCode.requestError,
message: 'Invalid request',
meta: {
status: 400,
},
},
});

expect(data).toEqual([
`event: error\ndata: ${JSON.stringify({
error: {
code: ServerSentEventErrorCode.requestError,
message: 'Invalid request',
meta: {
status: 400,
},
},
})}\n\n`,
]);
});

it('handles non-SSE errors', () => {
const error = new Error('Non-SSE Error');

source$.error(error);

jest.runAllTimers();

expect(logger.error).toHaveBeenCalledWith(error);
expect(data).toEqual([
`event: error\ndata: ${JSON.stringify({
error: {
code: ServerSentEventErrorCode.internalError,
message: 'Non-SSE Error',
},
})}\n\n`,
]);
});

it('should send keep-alive comments every 10 seconds', () => {
jest.advanceTimersByTime(10000);
expect(data).toContain(': keep-alive');

jest.advanceTimersByTime(10000);
expect(data.filter((d) => d === ': keep-alive')).toHaveLength(2);
});

describe('without fake timers', () => {
beforeEach(() => {
jest.useFakeTimers({ doNotFake: ['nextTick'] });
});

it('should end the stream when the observable completes', async () => {
jest.useFakeTimers({ doNotFake: ['nextTick'] });

const endSpy = jest.fn();
stream.on('end', endSpy);

source$.complete();

await new Promise((resolve) => process.nextTick(resolve));

expect(endSpy).toHaveBeenCalled();
});

it('should end stream when signal is aborted', async () => {
const endSpy = jest.fn();
stream.on('end', endSpy);

// Emit some data
source$.next({ type: ServerSentEventType.data, data: { initial: 'data' } });

// Abort the signal
controller.abort();

// Emit more data after abort
source$.next({ type: ServerSentEventType.data, data: { after: 'abort' } });

await new Promise((resolve) => process.nextTick(resolve));

expect(endSpy).toHaveBeenCalled();

// Data after abort should not be received
expect(data).toEqual([
`event: data\ndata: ${JSON.stringify({ data: { initial: 'data' } })}\n\n`,
]);
});

afterEach(() => {
jest.useFakeTimers();
});
});
});
Loading

0 comments on commit fa1998c

Please sign in to comment.