From 24bdc97413fbdd749db4d007ccf9f06cc1a243c8 Mon Sep 17 00:00:00 2001 From: Walter Rafelsberger Date: Fri, 20 May 2022 10:45:50 +0200 Subject: [PATCH] [ML] Explain log rate spikes: Page setup (#132121) Builds out UI/code boilerplate necessary before we start implementing the feature's own UI on a dedicated page. - Updates navigation to bring up data view/saved search selection before moving on to the explain log spike rates page. The bar chart race demo page was moved to the aiops/single_endpoint_streaming_demo url. It is kept in this PR so we have two different pages + API endpoints that use streaming. With this still in place it's easier to update the streaming code to be more generic and reusable. - The url/page aiops/explain_log_rate_spikes has been added with some dummy request that slowly streams a data view's fields to the client. This page will host the actual UI to be brought over from the PoC in follow ups to this PR. - The structure to embed aiops plugin pages in the ml plugin has been simplified. Instead of a lot of custom code to load the components at runtime in the aiops plugin itself, this now uses React lazy loading with Suspense, similar to how we load Vega charts in other places. We no longer initialize the aiops client side code during startup of the plugin itself and augment it, instead we statically import components and pass on props/contexts from the ml plugin. - The code to handle streaming chunks on the client side in stream_fetch.ts/use_stream_fetch_reducer.ts has been improved to make better use of TS generics so for a given API endpoint it's able to return the appropriate coresponding return data type and only allows to use the supported reducer actions for that endpoint. Buffering client side actions has been tweaked to handle state updates more quickly if updates from the server are stalling. --- .../aiops/common/api/example_stream.ts | 5 +- .../common/api/explain_log_rate_spikes.ts | 34 ++++ x-pack/plugins/aiops/common/api/index.ts | 15 +- x-pack/plugins/aiops/kibana.json | 2 +- x-pack/plugins/aiops/public/api/index.ts | 15 -- .../plugins/aiops/public/components/app.tsx | 167 ------------------ .../components/explain_log_rate_spikes.tsx | 34 ---- .../explain_log_rate_spikes.tsx | 49 +++++ .../explain_log_rate_spikes/index.ts | 13 ++ .../explain_log_rate_spikes/stream_reducer.ts | 37 ++++ .../get_status_message.tsx | 0 .../single_endpoint_streaming_demo}/index.ts | 7 +- .../single_endpoint_streaming_demo.tsx | 135 ++++++++++++++ .../stream_reducer.ts | 4 +- .../{components => hooks}/stream_fetch.ts | 35 +++- .../use_stream_fetch_reducer.ts | 23 ++- x-pack/plugins/aiops/public/index.ts | 4 +- .../plugins/aiops/public/kibana_services.ts | 19 -- .../aiops/public/lazy_load_bundle/index.ts | 30 ---- x-pack/plugins/aiops/public/plugin.ts | 11 +- .../aiops/public/shared_lazy_components.tsx | 42 +++++ .../server/lib/accept_compression.test.ts | 42 +++++ .../aiops/server/lib/accept_compression.ts | 44 +++++ .../aiops/server/lib/stream_factory.test.ts | 106 +++++++++++ .../aiops/server/lib/stream_factory.ts | 70 ++++++++ x-pack/plugins/aiops/server/plugin.ts | 27 ++- .../aiops/server/routes/example_stream.ts | 109 ++++++++++++ .../server/routes/explain_log_rate_spikes.ts | 90 ++++++++++ x-pack/plugins/aiops/server/routes/index.ts | 124 +------------ x-pack/plugins/aiops/server/types.ts | 10 ++ x-pack/plugins/ml/common/constants/locator.ts | 2 + x-pack/plugins/ml/common/types/locator.ts | 4 +- .../aiops/explain_log_rate_spikes.tsx | 40 ++--- .../aiops/single_endpoint_streaming_demo.tsx | 34 ++++ .../components/ml_page/side_nav.tsx | 11 +- .../application/contexts/ml/ml_context.ts | 6 +- .../public/application/routing/breadcrumbs.ts | 2 +- .../routes/aiops/explain_log_rate_spikes.tsx | 2 +- .../application/routing/routes/aiops/index.ts | 1 + .../aiops/single_endpoint_streaming_demo.tsx | 63 +++++++ .../routes/new_job/index_or_search.tsx | 30 ++++ .../plugins/ml/public/locator/ml_locator.ts | 2 + .../apis/aiops/example_stream.ts | 29 +-- .../apis/aiops/explain_log_rate_spikes.ts | 126 +++++++++++++ .../test/api_integration/apis/aiops/index.ts | 1 + .../apis/aiops/parse_stream.ts | 28 +++ 46 files changed, 1203 insertions(+), 481 deletions(-) create mode 100644 x-pack/plugins/aiops/common/api/explain_log_rate_spikes.ts delete mode 100644 x-pack/plugins/aiops/public/api/index.ts delete mode 100755 x-pack/plugins/aiops/public/components/app.tsx delete mode 100644 x-pack/plugins/aiops/public/components/explain_log_rate_spikes.tsx create mode 100644 x-pack/plugins/aiops/public/components/explain_log_rate_spikes/explain_log_rate_spikes.tsx create mode 100644 x-pack/plugins/aiops/public/components/explain_log_rate_spikes/index.ts create mode 100644 x-pack/plugins/aiops/public/components/explain_log_rate_spikes/stream_reducer.ts rename x-pack/plugins/aiops/public/components/{ => single_endpoint_streaming_demo}/get_status_message.tsx (100%) rename x-pack/plugins/aiops/public/{lazy_load_bundle/lazy => components/single_endpoint_streaming_demo}/index.ts (52%) create mode 100644 x-pack/plugins/aiops/public/components/single_endpoint_streaming_demo/single_endpoint_streaming_demo.tsx rename x-pack/plugins/aiops/public/components/{ => single_endpoint_streaming_demo}/stream_reducer.ts (92%) rename x-pack/plugins/aiops/public/{components => hooks}/stream_fetch.ts (62%) rename x-pack/plugins/aiops/public/{components => hooks}/use_stream_fetch_reducer.ts (77%) delete mode 100644 x-pack/plugins/aiops/public/kibana_services.ts delete mode 100644 x-pack/plugins/aiops/public/lazy_load_bundle/index.ts create mode 100644 x-pack/plugins/aiops/public/shared_lazy_components.tsx create mode 100644 x-pack/plugins/aiops/server/lib/accept_compression.test.ts create mode 100644 x-pack/plugins/aiops/server/lib/accept_compression.ts create mode 100644 x-pack/plugins/aiops/server/lib/stream_factory.test.ts create mode 100644 x-pack/plugins/aiops/server/lib/stream_factory.ts create mode 100644 x-pack/plugins/aiops/server/routes/example_stream.ts create mode 100644 x-pack/plugins/aiops/server/routes/explain_log_rate_spikes.ts create mode 100644 x-pack/plugins/ml/public/application/aiops/single_endpoint_streaming_demo.tsx create mode 100644 x-pack/plugins/ml/public/application/routing/routes/aiops/single_endpoint_streaming_demo.tsx create mode 100644 x-pack/test/api_integration/apis/aiops/explain_log_rate_spikes.ts create mode 100644 x-pack/test/api_integration/apis/aiops/parse_stream.ts diff --git a/x-pack/plugins/aiops/common/api/example_stream.ts b/x-pack/plugins/aiops/common/api/example_stream.ts index 1210cccf55487..ccef04fc8473a 100644 --- a/x-pack/plugins/aiops/common/api/example_stream.ts +++ b/x-pack/plugins/aiops/common/api/example_stream.ts @@ -65,4 +65,7 @@ export function deleteEntityAction(payload: string): ApiActionDeleteEntity { }; } -export type ApiAction = ApiActionUpdateProgress | ApiActionAddToEntity | ApiActionDeleteEntity; +export type AiopsExampleStreamApiAction = + | ApiActionUpdateProgress + | ApiActionAddToEntity + | ApiActionDeleteEntity; diff --git a/x-pack/plugins/aiops/common/api/explain_log_rate_spikes.ts b/x-pack/plugins/aiops/common/api/explain_log_rate_spikes.ts new file mode 100644 index 0000000000000..b5c5524cdef01 --- /dev/null +++ b/x-pack/plugins/aiops/common/api/explain_log_rate_spikes.ts @@ -0,0 +1,34 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { schema, TypeOf } from '@kbn/config-schema'; + +export const aiopsExplainLogRateSpikesSchema = schema.object({ + /** The index to query for log rate spikes */ + index: schema.string(), +}); + +export type AiopsExplainLogRateSpikesSchema = TypeOf; + +export const API_ACTION_NAME = { + ADD_FIELDS: 'add_fields', +} as const; +export type ApiActionName = typeof API_ACTION_NAME[keyof typeof API_ACTION_NAME]; + +interface ApiActionAddFields { + type: typeof API_ACTION_NAME.ADD_FIELDS; + payload: string[]; +} + +export function addFieldsAction(payload: string[]): ApiActionAddFields { + return { + type: API_ACTION_NAME.ADD_FIELDS, + payload, + }; +} + +export type AiopsExplainLogRateSpikesApiAction = ApiActionAddFields; diff --git a/x-pack/plugins/aiops/common/api/index.ts b/x-pack/plugins/aiops/common/api/index.ts index da1e091d3fb54..6b987fef13d1a 100644 --- a/x-pack/plugins/aiops/common/api/index.ts +++ b/x-pack/plugins/aiops/common/api/index.ts @@ -5,15 +5,24 @@ * 2.0. */ -import type { AiopsExampleStreamSchema } from './example_stream'; +import type { + AiopsExplainLogRateSpikesSchema, + AiopsExplainLogRateSpikesApiAction, +} from './explain_log_rate_spikes'; +import type { AiopsExampleStreamSchema, AiopsExampleStreamApiAction } from './example_stream'; export const API_ENDPOINT = { EXAMPLE_STREAM: '/internal/aiops/example_stream', - ANOTHER: '/internal/aiops/another', + EXPLAIN_LOG_RATE_SPIKES: '/internal/aiops/explain_log_rate_spikes', } as const; export type ApiEndpoint = typeof API_ENDPOINT[keyof typeof API_ENDPOINT]; export interface ApiEndpointOptions { [API_ENDPOINT.EXAMPLE_STREAM]: AiopsExampleStreamSchema; - [API_ENDPOINT.ANOTHER]: { anotherOption: string }; + [API_ENDPOINT.EXPLAIN_LOG_RATE_SPIKES]: AiopsExplainLogRateSpikesSchema; +} + +export interface ApiEndpointActions { + [API_ENDPOINT.EXAMPLE_STREAM]: AiopsExampleStreamApiAction; + [API_ENDPOINT.EXPLAIN_LOG_RATE_SPIKES]: AiopsExplainLogRateSpikesApiAction; } diff --git a/x-pack/plugins/aiops/kibana.json b/x-pack/plugins/aiops/kibana.json index b74a23bf2bc9e..2d1e60bca74e3 100755 --- a/x-pack/plugins/aiops/kibana.json +++ b/x-pack/plugins/aiops/kibana.json @@ -9,7 +9,7 @@ "description": "AIOps plugin maintained by ML team.", "server": true, "ui": true, - "requiredPlugins": [], + "requiredPlugins": ["data"], "optionalPlugins": [], "requiredBundles": ["kibanaReact"], "extraPublicDirs": ["common"] diff --git a/x-pack/plugins/aiops/public/api/index.ts b/x-pack/plugins/aiops/public/api/index.ts deleted file mode 100644 index 6aa171df5286c..0000000000000 --- a/x-pack/plugins/aiops/public/api/index.ts +++ /dev/null @@ -1,15 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -import { lazyLoadModules } from '../lazy_load_bundle'; - -import type { ExplainLogRateSpikesSpec } from '../components/explain_log_rate_spikes'; - -export async function getExplainLogRateSpikesComponent(): Promise<() => ExplainLogRateSpikesSpec> { - const modules = await lazyLoadModules(); - return () => modules.ExplainLogRateSpikes; -} diff --git a/x-pack/plugins/aiops/public/components/app.tsx b/x-pack/plugins/aiops/public/components/app.tsx deleted file mode 100755 index 963253b154e27..0000000000000 --- a/x-pack/plugins/aiops/public/components/app.tsx +++ /dev/null @@ -1,167 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -import React, { useEffect, useState } from 'react'; - -import { Chart, Settings, Axis, BarSeries, Position, ScaleType } from '@elastic/charts'; - -import { i18n } from '@kbn/i18n'; -import { FormattedMessage } from '@kbn/i18n-react'; -import { useKibana } from '@kbn/kibana-react-plugin/public'; - -import { - EuiBadge, - EuiButton, - EuiCheckbox, - EuiFlexGroup, - EuiFlexItem, - EuiPage, - EuiPageBody, - EuiPageContent, - EuiPageContentBody, - EuiPageContentHeader, - EuiProgress, - EuiSpacer, - EuiTitle, - EuiText, -} from '@elastic/eui'; - -import { getStatusMessage } from './get_status_message'; -import { initialState, resetStream, streamReducer } from './stream_reducer'; -import { useStreamFetchReducer } from './use_stream_fetch_reducer'; - -export const AiopsApp = () => { - const { notifications } = useKibana(); - - const [simulateErrors, setSimulateErrors] = useState(false); - - const { dispatch, start, cancel, data, isCancelled, isRunning } = useStreamFetchReducer( - '/internal/aiops/example_stream', - streamReducer, - initialState, - { simulateErrors } - ); - - const { errors, progress, entities } = data; - - const onClickHandler = async () => { - if (isRunning) { - cancel(); - } else { - dispatch(resetStream()); - start(); - } - }; - - useEffect(() => { - if (errors.length > 0) { - notifications.toasts.danger({ body: errors[errors.length - 1] }); - } - }, [errors, notifications.toasts]); - - const buttonLabel = isRunning - ? i18n.translate('xpack.aiops.stopbuttonText', { - defaultMessage: 'Stop development', - }) - : i18n.translate('xpack.aiops.startbuttonText', { - defaultMessage: 'Start development', - }); - - return ( - - - - - -

- -

-
-
- - - - - - {buttonLabel} - - - - - {progress}% - - - - - - - -
- - - - - - { - return { - x, - y, - }; - }) - .sort((a, b) => b.y - a.y)} - /> - -
-

{getStatusMessage(isRunning, isCancelled, data.progress)}

- setSimulateErrors(!simulateErrors)} - compressed - /> -
-
-
-
-
- ); -}; diff --git a/x-pack/plugins/aiops/public/components/explain_log_rate_spikes.tsx b/x-pack/plugins/aiops/public/components/explain_log_rate_spikes.tsx deleted file mode 100644 index 21d7b39a2a148..0000000000000 --- a/x-pack/plugins/aiops/public/components/explain_log_rate_spikes.tsx +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -import React, { FC } from 'react'; - -import { KibanaContextProvider, KibanaThemeProvider } from '@kbn/kibana-react-plugin/public'; -import { I18nProvider } from '@kbn/i18n-react'; - -import { getCoreStart } from '../kibana_services'; - -import { AiopsApp } from './app'; - -/** - * Spec used for lazy loading in the ML plugin - */ -export type ExplainLogRateSpikesSpec = typeof ExplainLogRateSpikes; - -export const ExplainLogRateSpikes: FC = () => { - const coreStart = getCoreStart(); - - return ( - - - - - - - - ); -}; diff --git a/x-pack/plugins/aiops/public/components/explain_log_rate_spikes/explain_log_rate_spikes.tsx b/x-pack/plugins/aiops/public/components/explain_log_rate_spikes/explain_log_rate_spikes.tsx new file mode 100644 index 0000000000000..12c4837194f80 --- /dev/null +++ b/x-pack/plugins/aiops/public/components/explain_log_rate_spikes/explain_log_rate_spikes.tsx @@ -0,0 +1,49 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import React, { useEffect, FC } from 'react'; + +import { EuiBadge, EuiSpacer, EuiText } from '@elastic/eui'; + +import type { DataView } from '@kbn/data-views-plugin/public'; + +import { useStreamFetchReducer } from '../../hooks/use_stream_fetch_reducer'; + +import { initialState, streamReducer } from './stream_reducer'; + +/** + * ExplainLogRateSpikes props require a data view. + */ +export interface ExplainLogRateSpikesProps { + /** The data view to analyze. */ + dataView: DataView; +} + +export const ExplainLogRateSpikes: FC = ({ dataView }) => { + const { start, data, isRunning } = useStreamFetchReducer( + '/internal/aiops/explain_log_rate_spikes', + streamReducer, + initialState, + { index: dataView.title } + ); + + useEffect(() => { + start(); + // eslint-disable-next-line react-hooks/exhaustive-deps + }, []); + + return ( + +

{dataView.title}

+

{isRunning ? 'Loading fields ...' : 'Loaded all fields.'}

+ + {data.fields.map((field) => ( + {field} + ))} +
+ ); +}; diff --git a/x-pack/plugins/aiops/public/components/explain_log_rate_spikes/index.ts b/x-pack/plugins/aiops/public/components/explain_log_rate_spikes/index.ts new file mode 100644 index 0000000000000..3e48c6816dda9 --- /dev/null +++ b/x-pack/plugins/aiops/public/components/explain_log_rate_spikes/index.ts @@ -0,0 +1,13 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export type { ExplainLogRateSpikesProps } from './explain_log_rate_spikes'; +import { ExplainLogRateSpikes } from './explain_log_rate_spikes'; + +// required for dynamic import using React.lazy() +// eslint-disable-next-line import/no-default-export +export default ExplainLogRateSpikes; diff --git a/x-pack/plugins/aiops/public/components/explain_log_rate_spikes/stream_reducer.ts b/x-pack/plugins/aiops/public/components/explain_log_rate_spikes/stream_reducer.ts new file mode 100644 index 0000000000000..7ec710f4ae65d --- /dev/null +++ b/x-pack/plugins/aiops/public/components/explain_log_rate_spikes/stream_reducer.ts @@ -0,0 +1,37 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { + API_ACTION_NAME, + AiopsExplainLogRateSpikesApiAction, +} from '../../../common/api/explain_log_rate_spikes'; + +interface StreamState { + fields: string[]; +} + +export const initialState: StreamState = { + fields: [], +}; + +export function streamReducer( + state: StreamState, + action: AiopsExplainLogRateSpikesApiAction | AiopsExplainLogRateSpikesApiAction[] +): StreamState { + if (Array.isArray(action)) { + return action.reduce(streamReducer, state); + } + + switch (action.type) { + case API_ACTION_NAME.ADD_FIELDS: + return { + fields: [...state.fields, ...action.payload], + }; + default: + return state; + } +} diff --git a/x-pack/plugins/aiops/public/components/get_status_message.tsx b/x-pack/plugins/aiops/public/components/single_endpoint_streaming_demo/get_status_message.tsx similarity index 100% rename from x-pack/plugins/aiops/public/components/get_status_message.tsx rename to x-pack/plugins/aiops/public/components/single_endpoint_streaming_demo/get_status_message.tsx diff --git a/x-pack/plugins/aiops/public/lazy_load_bundle/lazy/index.ts b/x-pack/plugins/aiops/public/components/single_endpoint_streaming_demo/index.ts similarity index 52% rename from x-pack/plugins/aiops/public/lazy_load_bundle/lazy/index.ts rename to x-pack/plugins/aiops/public/components/single_endpoint_streaming_demo/index.ts index 967525de9bd6e..38eb279568051 100644 --- a/x-pack/plugins/aiops/public/lazy_load_bundle/lazy/index.ts +++ b/x-pack/plugins/aiops/public/components/single_endpoint_streaming_demo/index.ts @@ -5,5 +5,8 @@ * 2.0. */ -export type { ExplainLogRateSpikesSpec } from '../../components/explain_log_rate_spikes'; -export { ExplainLogRateSpikes } from '../../components/explain_log_rate_spikes'; +import { SingleEndpointStreamingDemo } from './single_endpoint_streaming_demo'; + +// required for dynamic import using React.lazy() +// eslint-disable-next-line import/no-default-export +export default SingleEndpointStreamingDemo; diff --git a/x-pack/plugins/aiops/public/components/single_endpoint_streaming_demo/single_endpoint_streaming_demo.tsx b/x-pack/plugins/aiops/public/components/single_endpoint_streaming_demo/single_endpoint_streaming_demo.tsx new file mode 100644 index 0000000000000..12f33aada133c --- /dev/null +++ b/x-pack/plugins/aiops/public/components/single_endpoint_streaming_demo/single_endpoint_streaming_demo.tsx @@ -0,0 +1,135 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import React, { useEffect, useState, FC } from 'react'; + +import { Chart, Settings, Axis, BarSeries, Position, ScaleType } from '@elastic/charts'; + +import { i18n } from '@kbn/i18n'; +import { useKibana } from '@kbn/kibana-react-plugin/public'; + +import { + EuiBadge, + EuiButton, + EuiCheckbox, + EuiFlexGroup, + EuiFlexItem, + EuiProgress, + EuiSpacer, + EuiText, +} from '@elastic/eui'; + +import { useStreamFetchReducer } from '../../hooks/use_stream_fetch_reducer'; + +import { getStatusMessage } from './get_status_message'; +import { initialState, resetStream, streamReducer } from './stream_reducer'; + +export const SingleEndpointStreamingDemo: FC = () => { + const { notifications } = useKibana(); + + const [simulateErrors, setSimulateErrors] = useState(false); + + const { dispatch, start, cancel, data, isCancelled, isRunning } = useStreamFetchReducer( + '/internal/aiops/example_stream', + streamReducer, + initialState, + { simulateErrors } + ); + + const { errors, progress, entities } = data; + + const onClickHandler = async () => { + if (isRunning) { + cancel(); + } else { + dispatch(resetStream()); + start(); + } + }; + + useEffect(() => { + if (errors.length > 0) { + notifications.toasts.danger({ body: errors[errors.length - 1] }); + } + }, [errors, notifications.toasts]); + + const buttonLabel = isRunning + ? i18n.translate('xpack.aiops.stopbuttonText', { + defaultMessage: 'Stop development', + }) + : i18n.translate('xpack.aiops.startbuttonText', { + defaultMessage: 'Start development', + }); + + return ( + + + + + {buttonLabel} + + + + + {progress}% + + + + + + + +
+ + + + + + { + return { + x, + y, + }; + }) + .sort((a, b) => b.y - a.y)} + /> + +
+

{getStatusMessage(isRunning, isCancelled, data.progress)}

+ setSimulateErrors(!simulateErrors)} + compressed + /> +
+ ); +}; diff --git a/x-pack/plugins/aiops/public/components/stream_reducer.ts b/x-pack/plugins/aiops/public/components/single_endpoint_streaming_demo/stream_reducer.ts similarity index 92% rename from x-pack/plugins/aiops/public/components/stream_reducer.ts rename to x-pack/plugins/aiops/public/components/single_endpoint_streaming_demo/stream_reducer.ts index 3e68e139ceeca..a3e9724f24a1f 100644 --- a/x-pack/plugins/aiops/public/components/stream_reducer.ts +++ b/x-pack/plugins/aiops/public/components/single_endpoint_streaming_demo/stream_reducer.ts @@ -5,7 +5,7 @@ * 2.0. */ -import { ApiAction, API_ACTION_NAME } from '../../common/api/example_stream'; +import { AiopsExampleStreamApiAction, API_ACTION_NAME } from '../../../common/api/example_stream'; export const UI_ACTION_NAME = { ERROR: 'error', @@ -37,7 +37,7 @@ export function resetStream(): UiActionResetStream { } type UiAction = UiActionResetStream | UiActionError; -export type ReducerAction = ApiAction | UiAction; +export type ReducerAction = AiopsExampleStreamApiAction | UiAction; export function streamReducer( state: StreamState, action: ReducerAction | ReducerAction[] diff --git a/x-pack/plugins/aiops/public/components/stream_fetch.ts b/x-pack/plugins/aiops/public/hooks/stream_fetch.ts similarity index 62% rename from x-pack/plugins/aiops/public/components/stream_fetch.ts rename to x-pack/plugins/aiops/public/hooks/stream_fetch.ts index 37d7c13dd3b55..abfec63702012 100644 --- a/x-pack/plugins/aiops/public/components/stream_fetch.ts +++ b/x-pack/plugins/aiops/public/hooks/stream_fetch.ts @@ -7,14 +7,19 @@ import type React from 'react'; -import type { ApiEndpoint, ApiEndpointOptions } from '../../common/api'; +import type { ApiEndpoint, ApiEndpointActions, ApiEndpointOptions } from '../../common/api'; -export async function* streamFetch( +interface ErrorAction { + type: 'error'; + payload: string; +} + +export async function* streamFetch( endpoint: E, abortCtrl: React.MutableRefObject, - options: ApiEndpointOptions[ApiEndpoint], + options: ApiEndpointOptions[E], basePath = '' -) { +): AsyncGenerator> { const stream = await fetch(`${basePath}${endpoint}`, { signal: abortCtrl.current.signal, method: 'POST', @@ -36,7 +41,7 @@ export async function* streamFetch( const bufferBounce = 100; let partial = ''; - let actionBuffer: A[] = []; + let actionBuffer: Array = []; let lastCall = 0; while (true) { @@ -52,7 +57,7 @@ export async function* streamFetch( partial = last ?? ''; - const actions = parts.map((p) => JSON.parse(p)); + const actions = parts.map((p) => JSON.parse(p)) as Array; actionBuffer.push(...actions); const now = Date.now(); @@ -61,10 +66,26 @@ export async function* streamFetch( yield actionBuffer; actionBuffer = []; lastCall = now; + + // In cases where the next chunk takes longer to be received than the `bufferBounce` timeout, + // we trigger this client side timeout to clear a potential intermediate buffer state. + // Since `yield` cannot be passed on to other scopes like callbacks, + // this pattern using a Promise is used to wait for the timeout. + yield new Promise>((resolve) => { + setTimeout(() => { + if (actionBuffer.length > 0) { + resolve(actionBuffer); + actionBuffer = []; + lastCall = now; + } else { + resolve([]); + } + }, bufferBounce + 10); + }); } } catch (error) { if (error.name !== 'AbortError') { - yield { type: 'error', payload: error.toString() }; + yield [{ type: 'error', payload: error.toString() }]; } break; } diff --git a/x-pack/plugins/aiops/public/components/use_stream_fetch_reducer.ts b/x-pack/plugins/aiops/public/hooks/use_stream_fetch_reducer.ts similarity index 77% rename from x-pack/plugins/aiops/public/components/use_stream_fetch_reducer.ts rename to x-pack/plugins/aiops/public/hooks/use_stream_fetch_reducer.ts index 77ac09e0ff429..ba64831bec60e 100644 --- a/x-pack/plugins/aiops/public/components/use_stream_fetch_reducer.ts +++ b/x-pack/plugins/aiops/public/hooks/use_stream_fetch_reducer.ts @@ -5,7 +5,15 @@ * 2.0. */ -import { useReducer, useRef, useState, Reducer, ReducerAction, ReducerState } from 'react'; +import { + useEffect, + useReducer, + useRef, + useState, + Reducer, + ReducerAction, + ReducerState, +} from 'react'; import { useKibana } from '@kbn/kibana-react-plugin/public'; @@ -13,11 +21,11 @@ import type { ApiEndpoint, ApiEndpointOptions } from '../../common/api'; import { streamFetch } from './stream_fetch'; -export const useStreamFetchReducer = , E = ApiEndpoint>( +export const useStreamFetchReducer = , E extends ApiEndpoint>( endpoint: E, reducer: R, initialState: ReducerState, - options: ApiEndpointOptions[ApiEndpoint] + options: ApiEndpointOptions[E] ) => { const kibana = useKibana(); @@ -44,7 +52,9 @@ export const useStreamFetchReducer = , E = ApiEndpoi options, kibana.services.http?.basePath.get() )) { - dispatch(actions as ReducerAction); + if (actions.length > 0) { + dispatch(actions as ReducerAction); + } } setIsRunning(false); @@ -56,6 +66,11 @@ export const useStreamFetchReducer = , E = ApiEndpoi setIsRunning(false); }; + // If components using this custom hook get unmounted, cancel any ongoing request. + useEffect(() => { + return () => abortCtrl.current.abort(); + }, []); + return { cancel, data, diff --git a/x-pack/plugins/aiops/public/index.ts b/x-pack/plugins/aiops/public/index.ts index 30bcaf5afabdc..53fc1d7a6eeca 100755 --- a/x-pack/plugins/aiops/public/index.ts +++ b/x-pack/plugins/aiops/public/index.ts @@ -13,6 +13,6 @@ export function plugin() { return new AiopsPlugin(); } +export type { ExplainLogRateSpikesProps } from './components/explain_log_rate_spikes'; +export { ExplainLogRateSpikes, SingleEndpointStreamingDemo } from './shared_lazy_components'; export type { AiopsPluginSetup, AiopsPluginStart } from './types'; - -export type { ExplainLogRateSpikesSpec } from './components/explain_log_rate_spikes'; diff --git a/x-pack/plugins/aiops/public/kibana_services.ts b/x-pack/plugins/aiops/public/kibana_services.ts deleted file mode 100644 index 9a43d2de5e5a1..0000000000000 --- a/x-pack/plugins/aiops/public/kibana_services.ts +++ /dev/null @@ -1,19 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -import { CoreStart } from '@kbn/core/public'; -import { AppPluginStartDependencies } from './types'; - -let coreStart: CoreStart; -let pluginsStart: AppPluginStartDependencies; -export function setStartServices(core: CoreStart, plugins: AppPluginStartDependencies) { - coreStart = core; - pluginsStart = plugins; -} - -export const getCoreStart = () => coreStart; -export const getPluginsStart = () => pluginsStart; diff --git a/x-pack/plugins/aiops/public/lazy_load_bundle/index.ts b/x-pack/plugins/aiops/public/lazy_load_bundle/index.ts deleted file mode 100644 index 0072336080175..0000000000000 --- a/x-pack/plugins/aiops/public/lazy_load_bundle/index.ts +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -import type { ExplainLogRateSpikesSpec } from '../components/explain_log_rate_spikes'; - -let loadModulesPromise: Promise; - -interface LazyLoadedModules { - ExplainLogRateSpikes: ExplainLogRateSpikesSpec; -} - -export async function lazyLoadModules(): Promise { - if (typeof loadModulesPromise !== 'undefined') { - return loadModulesPromise; - } - - loadModulesPromise = new Promise(async (resolve, reject) => { - try { - const lazyImports = await import('./lazy'); - resolve({ ...lazyImports }); - } catch (error) { - reject(error); - } - }); - return loadModulesPromise; -} diff --git a/x-pack/plugins/aiops/public/plugin.ts b/x-pack/plugins/aiops/public/plugin.ts index 3c3cff39abb80..ef65ab247c40f 100755 --- a/x-pack/plugins/aiops/public/plugin.ts +++ b/x-pack/plugins/aiops/public/plugin.ts @@ -7,19 +7,10 @@ import { CoreSetup, CoreStart, Plugin } from '@kbn/core/public'; -import { getExplainLogRateSpikesComponent } from './api'; -import { setStartServices } from './kibana_services'; import { AiopsPluginSetup, AiopsPluginStart } from './types'; export class AiopsPlugin implements Plugin { public setup(core: CoreSetup) {} - - public start(core: CoreStart) { - setStartServices(core, {}); - return { - getExplainLogRateSpikesComponent, - }; - } - + public start(core: CoreStart) {} public stop() {} } diff --git a/x-pack/plugins/aiops/public/shared_lazy_components.tsx b/x-pack/plugins/aiops/public/shared_lazy_components.tsx new file mode 100644 index 0000000000000..f707a77cf7f90 --- /dev/null +++ b/x-pack/plugins/aiops/public/shared_lazy_components.tsx @@ -0,0 +1,42 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import React, { FC, Suspense } from 'react'; + +import { EuiErrorBoundary, EuiLoadingContent } from '@elastic/eui'; + +import type { ExplainLogRateSpikesProps } from './components/explain_log_rate_spikes'; + +const ExplainLogRateSpikesLazy = React.lazy(() => import('./components/explain_log_rate_spikes')); +const SingleEndpointStreamingDemoLazy = React.lazy( + () => import('./components/single_endpoint_streaming_demo') +); + +const LazyWrapper: FC = ({ children }) => ( + + }>{children} + +); + +/** + * Lazy-wrapped ExplainLogRateSpikes React component + * @param {ExplainLogRateSpikesProps} props - properties specifying the data on which to run the analysis. + */ +export const ExplainLogRateSpikes: FC = (props) => ( + + + +); + +/** + * Lazy-wrapped SingleEndpointStreamingDemo React component + */ +export const SingleEndpointStreamingDemo: FC = () => ( + + + +); diff --git a/x-pack/plugins/aiops/server/lib/accept_compression.test.ts b/x-pack/plugins/aiops/server/lib/accept_compression.test.ts new file mode 100644 index 0000000000000..f1c51f75cbe0c --- /dev/null +++ b/x-pack/plugins/aiops/server/lib/accept_compression.test.ts @@ -0,0 +1,42 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { acceptCompression } from './accept_compression'; + +describe('acceptCompression', () => { + it('should return false for empty headers', () => { + expect(acceptCompression({})).toBe(false); + }); + it('should return false for other header containing gzip as string', () => { + expect(acceptCompression({ 'other-header': 'gzip, other' })).toBe(false); + }); + it('should return false for other header containing gzip as array', () => { + expect(acceptCompression({ 'other-header': ['gzip', 'other'] })).toBe(false); + }); + it('should return true for upper-case header containing gzip as string', () => { + expect(acceptCompression({ 'Accept-Encoding': 'gzip, other' })).toBe(true); + }); + it('should return true for lower-case header containing gzip as string', () => { + expect(acceptCompression({ 'accept-encoding': 'gzip, other' })).toBe(true); + }); + it('should return true for upper-case header containing gzip as array', () => { + expect(acceptCompression({ 'Accept-Encoding': ['gzip', 'other'] })).toBe(true); + }); + it('should return true for lower-case header containing gzip as array', () => { + expect(acceptCompression({ 'accept-encoding': ['gzip', 'other'] })).toBe(true); + }); + it('should return true for mixed headers containing gzip as string', () => { + expect( + acceptCompression({ 'accept-encoding': 'gzip, other', 'other-header': 'other-value' }) + ).toBe(true); + }); + it('should return true for mixed headers containing gzip as array', () => { + expect( + acceptCompression({ 'accept-encoding': ['gzip', 'other'], 'other-header': 'other-value' }) + ).toBe(true); + }); +}); diff --git a/x-pack/plugins/aiops/server/lib/accept_compression.ts b/x-pack/plugins/aiops/server/lib/accept_compression.ts new file mode 100644 index 0000000000000..0fd092d647314 --- /dev/null +++ b/x-pack/plugins/aiops/server/lib/accept_compression.ts @@ -0,0 +1,44 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { Headers } from '@kbn/core/server'; + +/** + * Returns whether request headers accept a response using gzip compression. + * + * @param headers - Request headers. + * @returns boolean + */ +export function acceptCompression(headers: Headers) { + let compressed = false; + + Object.keys(headers).forEach((key) => { + if (key.toLocaleLowerCase() === 'accept-encoding') { + const acceptEncoding = headers[key]; + + function containsGzip(s: string) { + return s + .split(',') + .map((d) => d.trim()) + .includes('gzip'); + } + + if (typeof acceptEncoding === 'string') { + compressed = containsGzip(acceptEncoding); + } else if (Array.isArray(acceptEncoding)) { + for (const ae of acceptEncoding) { + if (containsGzip(ae)) { + compressed = true; + break; + } + } + } + } + }); + + return compressed; +} diff --git a/x-pack/plugins/aiops/server/lib/stream_factory.test.ts b/x-pack/plugins/aiops/server/lib/stream_factory.test.ts new file mode 100644 index 0000000000000..7082a4e7e763c --- /dev/null +++ b/x-pack/plugins/aiops/server/lib/stream_factory.test.ts @@ -0,0 +1,106 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import zlib from 'zlib'; + +import { loggerMock, MockedLogger } from '@kbn/logging-mocks'; + +import { API_ENDPOINT } from '../../common/api'; +import type { ApiEndpointActions } from '../../common/api'; + +import { streamFactory } from './stream_factory'; + +type Action = ApiEndpointActions['/internal/aiops/explain_log_rate_spikes']; + +const mockItem1: Action = { + type: 'add_fields', + payload: ['clientip'], +}; +const mockItem2: Action = { + type: 'add_fields', + payload: ['referer'], +}; + +describe('streamFactory', () => { + let mockLogger: MockedLogger; + + beforeEach(() => { + mockLogger = loggerMock.create(); + }); + + it('should encode and receive an uncompressed stream', async () => { + const { DELIMITER, end, push, responseWithHeaders, stream } = streamFactory< + typeof API_ENDPOINT.EXPLAIN_LOG_RATE_SPIKES + >(mockLogger, {}); + + push(mockItem1); + push(mockItem2); + end(); + + let streamResult = ''; + for await (const chunk of stream) { + streamResult += chunk.toString('utf8'); + } + + const streamItems = streamResult.split(DELIMITER); + const lastItem = streamItems.pop(); + + const parsedItems = streamItems.map((d) => JSON.parse(d)); + + expect(responseWithHeaders.headers).toBe(undefined); + expect(parsedItems).toHaveLength(2); + expect(parsedItems[0]).toStrictEqual(mockItem1); + expect(parsedItems[1]).toStrictEqual(mockItem2); + expect(lastItem).toBe(''); + }); + + // Because zlib.gunzip's API expects a callback, we need to use `done` here + // to indicate once all assertions are run. However, it's not allowed to use both + // `async` and `done` for the test callback. That's why we're using an "async IIFE" + // pattern inside the tests callback to still be able to do async/await for the + // `for await()` part. Note that the unzipping here is done just to be able to + // decode the stream for the test and assert it. When used in actual code, + // the browser on the client side will automatically take care of unzipping + // without the need for additional custom code. + it('should encode and receive a compressed stream', (done) => { + (async () => { + const { DELIMITER, end, push, responseWithHeaders, stream } = streamFactory< + typeof API_ENDPOINT.EXPLAIN_LOG_RATE_SPIKES + >(mockLogger, { 'accept-encoding': 'gzip' }); + + push(mockItem1); + push(mockItem2); + end(); + + const chunks = []; + for await (const chunk of stream) { + chunks.push(chunk); + } + + const buffer = Buffer.concat(chunks); + + zlib.gunzip(buffer, function (err, decoded) { + expect(err).toBe(null); + + const streamResult = decoded.toString('utf8'); + + const streamItems = streamResult.split(DELIMITER); + const lastItem = streamItems.pop(); + + const parsedItems = streamItems.map((d) => JSON.parse(d)); + + expect(responseWithHeaders.headers).toStrictEqual({ 'content-encoding': 'gzip' }); + expect(parsedItems).toHaveLength(2); + expect(parsedItems[0]).toStrictEqual(mockItem1); + expect(parsedItems[1]).toStrictEqual(mockItem2); + expect(lastItem).toBe(''); + + done(); + }); + })(); + }); +}); diff --git a/x-pack/plugins/aiops/server/lib/stream_factory.ts b/x-pack/plugins/aiops/server/lib/stream_factory.ts new file mode 100644 index 0000000000000..dc67a54902527 --- /dev/null +++ b/x-pack/plugins/aiops/server/lib/stream_factory.ts @@ -0,0 +1,70 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { Stream } from 'stream'; +import zlib from 'zlib'; + +import type { Headers, Logger } from '@kbn/core/server'; + +import { ApiEndpoint, ApiEndpointActions } from '../../common/api'; + +import { acceptCompression } from './accept_compression'; + +// We need this otherwise Kibana server will crash with a 'ERR_METHOD_NOT_IMPLEMENTED' error. +class ResponseStream extends Stream.PassThrough { + flush() {} + _read() {} +} + +const DELIMITER = '\n'; + +/** + * Sets up a response stream with support for gzip compression depending on provided + * request headers. + * + * @param logger - Kibana provided logger. + * @param headers - Request headers. + * @returns An object with stream attributes and methods. + */ +export function streamFactory(logger: Logger, headers: Headers) { + const isCompressed = acceptCompression(headers); + + const stream = isCompressed ? zlib.createGzip() : new ResponseStream(); + + function push(d: ApiEndpointActions[T]) { + try { + const line = JSON.stringify(d); + stream.write(`${line}${DELIMITER}`); + + // Calling .flush() on a compression stream will + // make zlib return as much output as currently possible. + if (isCompressed) { + stream.flush(); + } + } catch (error) { + logger.error('Could not serialize or stream a message.'); + logger.error(error); + } + } + + function end() { + stream.end(); + } + + const responseWithHeaders = { + body: stream, + ...(isCompressed + ? { + headers: { + 'content-encoding': 'gzip', + }, + } + : {}), + }; + + return { DELIMITER, end, push, responseWithHeaders, stream }; +} diff --git a/x-pack/plugins/aiops/server/plugin.ts b/x-pack/plugins/aiops/server/plugin.ts index c6b1b8b22a187..3743d32e3a081 100755 --- a/x-pack/plugins/aiops/server/plugin.ts +++ b/x-pack/plugins/aiops/server/plugin.ts @@ -6,23 +6,38 @@ */ import { PluginInitializerContext, CoreSetup, CoreStart, Plugin, Logger } from '@kbn/core/server'; +import type { DataRequestHandlerContext } from '@kbn/data-plugin/server'; -import { AiopsPluginSetup, AiopsPluginStart } from './types'; -import { defineRoutes } from './routes'; +import { AIOPS_ENABLED } from '../common'; -export class AiopsPlugin implements Plugin { +import { + AiopsPluginSetup, + AiopsPluginStart, + AiopsPluginSetupDeps, + AiopsPluginStartDeps, +} from './types'; +import { defineExampleStreamRoute, defineExplainLogRateSpikesRoute } from './routes'; + +export class AiopsPlugin + implements Plugin +{ private readonly logger: Logger; constructor(initializerContext: PluginInitializerContext) { this.logger = initializerContext.logger.get(); } - public setup(core: CoreSetup) { + public setup(core: CoreSetup, deps: AiopsPluginSetupDeps) { this.logger.debug('aiops: Setup'); - const router = core.http.createRouter(); + const router = core.http.createRouter(); // Register server side APIs - defineRoutes(router, this.logger); + if (AIOPS_ENABLED) { + core.getStartServices().then(([_, depsStart]) => { + defineExampleStreamRoute(router, this.logger); + defineExplainLogRateSpikesRoute(router, this.logger); + }); + } return {}; } diff --git a/x-pack/plugins/aiops/server/routes/example_stream.ts b/x-pack/plugins/aiops/server/routes/example_stream.ts new file mode 100644 index 0000000000000..38ca28ce6f176 --- /dev/null +++ b/x-pack/plugins/aiops/server/routes/example_stream.ts @@ -0,0 +1,109 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { IRouter, Logger } from '@kbn/core/server'; + +import { + aiopsExampleStreamSchema, + updateProgressAction, + addToEntityAction, + deleteEntityAction, +} from '../../common/api/example_stream'; +import { API_ENDPOINT } from '../../common/api'; + +import { streamFactory } from '../lib/stream_factory'; + +export const defineExampleStreamRoute = (router: IRouter, logger: Logger) => { + router.post( + { + path: API_ENDPOINT.EXAMPLE_STREAM, + validate: { + body: aiopsExampleStreamSchema, + }, + }, + async (context, request, response) => { + const maxTimeoutMs = request.body.timeout ?? 250; + const simulateError = request.body.simulateErrors ?? false; + + let shouldStop = false; + request.events.aborted$.subscribe(() => { + shouldStop = true; + }); + request.events.completed$.subscribe(() => { + shouldStop = true; + }); + + const { DELIMITER, end, push, responseWithHeaders, stream } = streamFactory< + typeof API_ENDPOINT.EXAMPLE_STREAM + >(logger, request.headers); + + const entities = [ + 'kimchy', + 's1monw', + 'martijnvg', + 'jasontedor', + 'nik9000', + 'javanna', + 'rjernst', + 'jrodewig', + ]; + + const actions = [...Array(19).fill('add'), 'delete']; + + if (simulateError) { + actions.push('server-only-error'); + actions.push('server-to-client-error'); + actions.push('client-error'); + } + + let progress = 0; + + async function pushStreamUpdate() { + setTimeout(() => { + try { + progress++; + + if (progress > 100 || shouldStop) { + end(); + return; + } + + push(updateProgressAction(progress)); + + const randomEntity = entities[Math.floor(Math.random() * entities.length)]; + const randomAction = actions[Math.floor(Math.random() * actions.length)]; + + if (randomAction === 'add') { + const randomCommits = Math.floor(Math.random() * 100); + push(addToEntityAction(randomEntity, randomCommits)); + } else if (randomAction === 'delete') { + push(deleteEntityAction(randomEntity)); + } else if (randomAction === 'server-to-client-error') { + // Throw an error. It should not crash Kibana! + throw new Error('There was a (simulated) server side error!'); + } else if (randomAction === 'client-error') { + // Return not properly encoded JSON to the client. + stream.push(`{body:'Not valid JSON${DELIMITER}`); + } + + pushStreamUpdate(); + } catch (error) { + stream.push( + `${JSON.stringify({ type: 'error', payload: error.toString() })}${DELIMITER}` + ); + end(); + } + }, Math.floor(Math.random() * maxTimeoutMs)); + } + + // do not call this using `await` so it will run asynchronously while we return the stream already. + pushStreamUpdate(); + + return response.ok(responseWithHeaders); + } + ); +}; diff --git a/x-pack/plugins/aiops/server/routes/explain_log_rate_spikes.ts b/x-pack/plugins/aiops/server/routes/explain_log_rate_spikes.ts new file mode 100644 index 0000000000000..f8aeb06435b76 --- /dev/null +++ b/x-pack/plugins/aiops/server/routes/explain_log_rate_spikes.ts @@ -0,0 +1,90 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { firstValueFrom } from 'rxjs'; + +import type { IRouter, Logger } from '@kbn/core/server'; +import type { DataRequestHandlerContext, IEsSearchRequest } from '@kbn/data-plugin/server'; + +import { + aiopsExplainLogRateSpikesSchema, + addFieldsAction, +} from '../../common/api/explain_log_rate_spikes'; +import { API_ENDPOINT } from '../../common/api'; + +import { streamFactory } from '../lib/stream_factory'; + +export const defineExplainLogRateSpikesRoute = ( + router: IRouter, + logger: Logger +) => { + router.post( + { + path: API_ENDPOINT.EXPLAIN_LOG_RATE_SPIKES, + validate: { + body: aiopsExplainLogRateSpikesSchema, + }, + }, + async (context, request, response) => { + const index = request.body.index; + + const controller = new AbortController(); + + let shouldStop = false; + request.events.aborted$.subscribe(() => { + shouldStop = true; + controller.abort(); + }); + request.events.completed$.subscribe(() => { + shouldStop = true; + controller.abort(); + }); + + const search = await context.search; + const res = await firstValueFrom( + search.search( + { + params: { + index, + body: { size: 1 }, + }, + } as IEsSearchRequest, + { abortSignal: controller.signal } + ) + ); + + const doc = res.rawResponse.hits.hits.pop(); + const fields = Object.keys(doc?._source ?? {}); + + const { end, push, responseWithHeaders } = streamFactory< + typeof API_ENDPOINT.EXPLAIN_LOG_RATE_SPIKES + >(logger, request.headers); + + async function pushField() { + setTimeout(() => { + if (shouldStop) { + end(); + return; + } + + const field = fields.pop(); + + if (field !== undefined) { + push(addFieldsAction([field])); + pushField(); + } else { + end(); + } + }, Math.random() * 1000); + } + + pushField(); + + return response.ok(responseWithHeaders); + } + ); +}; diff --git a/x-pack/plugins/aiops/server/routes/index.ts b/x-pack/plugins/aiops/server/routes/index.ts index e87c27e2af81e..d69ef6cc7df09 100755 --- a/x-pack/plugins/aiops/server/routes/index.ts +++ b/x-pack/plugins/aiops/server/routes/index.ts @@ -5,125 +5,5 @@ * 2.0. */ -import { Readable } from 'stream'; - -import type { IRouter, Logger } from '@kbn/core/server'; - -import { AIOPS_ENABLED } from '../../common'; -import type { ApiAction } from '../../common/api/example_stream'; -import { - aiopsExampleStreamSchema, - updateProgressAction, - addToEntityAction, - deleteEntityAction, -} from '../../common/api/example_stream'; - -// We need this otherwise Kibana server will crash with a 'ERR_METHOD_NOT_IMPLEMENTED' error. -class ResponseStream extends Readable { - _read(): void {} -} - -const delimiter = '\n'; - -export function defineRoutes(router: IRouter, logger: Logger) { - if (AIOPS_ENABLED) { - router.post( - { - path: '/internal/aiops/example_stream', - validate: { - body: aiopsExampleStreamSchema, - }, - }, - async (context, request, response) => { - const maxTimeoutMs = request.body.timeout ?? 250; - const simulateError = request.body.simulateErrors ?? false; - - let shouldStop = false; - request.events.aborted$.subscribe(() => { - shouldStop = true; - }); - request.events.completed$.subscribe(() => { - shouldStop = true; - }); - - const stream = new ResponseStream(); - - function streamPush(d: ApiAction) { - try { - const line = JSON.stringify(d); - stream.push(`${line}${delimiter}`); - } catch (error) { - logger.error('Could not serialize or stream a message.'); - logger.error(error); - } - } - - const entities = [ - 'kimchy', - 's1monw', - 'martijnvg', - 'jasontedor', - 'nik9000', - 'javanna', - 'rjernst', - 'jrodewig', - ]; - - const actions = [...Array(19).fill('add'), 'delete']; - - if (simulateError) { - actions.push('server-only-error'); - actions.push('server-to-client-error'); - actions.push('client-error'); - } - - let progress = 0; - - async function pushStreamUpdate() { - setTimeout(() => { - try { - progress++; - - if (progress > 100 || shouldStop) { - stream.push(null); - return; - } - - streamPush(updateProgressAction(progress)); - - const randomEntity = entities[Math.floor(Math.random() * entities.length)]; - const randomAction = actions[Math.floor(Math.random() * actions.length)]; - - if (randomAction === 'add') { - const randomCommits = Math.floor(Math.random() * 100); - streamPush(addToEntityAction(randomEntity, randomCommits)); - } else if (randomAction === 'delete') { - streamPush(deleteEntityAction(randomEntity)); - } else if (randomAction === 'server-to-client-error') { - // Throw an error. It should not crash Kibana! - throw new Error('There was a (simulated) server side error!'); - } else if (randomAction === 'client-error') { - // Return not properly encoded JSON to the client. - stream.push(`{body:'Not valid JSON${delimiter}`); - } - - pushStreamUpdate(); - } catch (error) { - stream.push( - `${JSON.stringify({ type: 'error', payload: error.toString() })}${delimiter}` - ); - stream.push(null); - } - }, Math.floor(Math.random() * maxTimeoutMs)); - } - - // do not call this using `await` so it will run asynchronously while we return the stream already. - pushStreamUpdate(); - - return response.ok({ - body: stream, - }); - } - ); - } -} +export { defineExampleStreamRoute } from './example_stream'; +export { defineExplainLogRateSpikesRoute } from './explain_log_rate_spikes'; diff --git a/x-pack/plugins/aiops/server/types.ts b/x-pack/plugins/aiops/server/types.ts index 526e7280e9495..3d27a9625db4c 100755 --- a/x-pack/plugins/aiops/server/types.ts +++ b/x-pack/plugins/aiops/server/types.ts @@ -5,6 +5,16 @@ * 2.0. */ +import { PluginSetup, PluginStart } from '@kbn/data-plugin/server'; + +export interface AiopsPluginSetupDeps { + data: PluginSetup; +} + +export interface AiopsPluginStartDeps { + data: PluginStart; +} + /** * aiops plugin server setup contract */ diff --git a/x-pack/plugins/ml/common/constants/locator.ts b/x-pack/plugins/ml/common/constants/locator.ts index 7b98eefe0ab24..a5b94836e5a1d 100644 --- a/x-pack/plugins/ml/common/constants/locator.ts +++ b/x-pack/plugins/ml/common/constants/locator.ts @@ -54,6 +54,8 @@ export const ML_PAGES = { OVERVIEW: 'overview', AIOPS: 'aiops', AIOPS_EXPLAIN_LOG_RATE_SPIKES: 'aiops/explain_log_rate_spikes', + AIOPS_EXPLAIN_LOG_RATE_SPIKES_INDEX_SELECT: 'aiops/explain_log_rate_spikes_index_select', + AIOPS_SINGLE_ENDPOINT_STREAMING_DEMO: 'aiops/single_endpoint_streaming_demo', } as const; export type MlPages = typeof ML_PAGES[keyof typeof ML_PAGES]; diff --git a/x-pack/plugins/ml/common/types/locator.ts b/x-pack/plugins/ml/common/types/locator.ts index 0d5cb7aeddd81..742486c78b5bf 100644 --- a/x-pack/plugins/ml/common/types/locator.ts +++ b/x-pack/plugins/ml/common/types/locator.ts @@ -63,7 +63,9 @@ export type MlGenericUrlState = MLPageState< | typeof ML_PAGES.DATA_VISUALIZER_FILE | typeof ML_PAGES.DATA_VISUALIZER_INDEX_SELECT | typeof ML_PAGES.AIOPS - | typeof ML_PAGES.AIOPS_EXPLAIN_LOG_RATE_SPIKES, + | typeof ML_PAGES.AIOPS_EXPLAIN_LOG_RATE_SPIKES + | typeof ML_PAGES.AIOPS_EXPLAIN_LOG_RATE_SPIKES_INDEX_SELECT + | typeof ML_PAGES.AIOPS_SINGLE_ENDPOINT_STREAMING_DEMO, MlGenericUrlPageState | undefined >; diff --git a/x-pack/plugins/ml/public/application/aiops/explain_log_rate_spikes.tsx b/x-pack/plugins/ml/public/application/aiops/explain_log_rate_spikes.tsx index 473525d40ca9a..39fa5272799fd 100644 --- a/x-pack/plugins/ml/public/application/aiops/explain_log_rate_spikes.tsx +++ b/x-pack/plugins/ml/public/application/aiops/explain_log_rate_spikes.tsx @@ -5,44 +5,32 @@ * 2.0. */ -import React, { FC, useEffect, useState } from 'react'; +import React, { FC } from 'react'; import { FormattedMessage } from '@kbn/i18n-react'; -import type { ExplainLogRateSpikesSpec } from '@kbn/aiops-plugin/public'; -import { useMlKibana, useTimefilter } from '../contexts/kibana'; +import { ExplainLogRateSpikes } from '@kbn/aiops-plugin/public'; + +import { useMlContext } from '../contexts/ml'; +import { useMlKibana } from '../contexts/kibana'; import { HelpMenu } from '../components/help_menu'; import { MlPageHeader } from '../components/page_header'; export const ExplainLogRateSpikesPage: FC = () => { - useTimefilter({ timeRangeSelector: false, autoRefreshSelector: false }); const { - services: { docLinks, aiops }, + services: { docLinks }, } = useMlKibana(); - const [ExplainLogRateSpikes, setExplainLogRateSpikes] = useState( - null - ); - - useEffect(() => { - if (aiops !== undefined) { - const { getExplainLogRateSpikesComponent } = aiops; - getExplainLogRateSpikesComponent().then(setExplainLogRateSpikes); - } - }, []); + const context = useMlContext(); return ( <> - {ExplainLogRateSpikes !== null ? ( - <> - - - - - - ) : null} + + + + ); diff --git a/x-pack/plugins/ml/public/application/aiops/single_endpoint_streaming_demo.tsx b/x-pack/plugins/ml/public/application/aiops/single_endpoint_streaming_demo.tsx new file mode 100644 index 0000000000000..fa2bc7f7051e4 --- /dev/null +++ b/x-pack/plugins/ml/public/application/aiops/single_endpoint_streaming_demo.tsx @@ -0,0 +1,34 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import React, { FC } from 'react'; +import { FormattedMessage } from '@kbn/i18n-react'; +import { SingleEndpointStreamingDemo } from '@kbn/aiops-plugin/public'; +import { useMlKibana, useTimefilter } from '../contexts/kibana'; +import { HelpMenu } from '../components/help_menu'; + +import { MlPageHeader } from '../components/page_header'; + +export const SingleEndpointStreamingDemoPage: FC = () => { + useTimefilter({ timeRangeSelector: false, autoRefreshSelector: false }); + const { + services: { docLinks }, + } = useMlKibana(); + + return ( + <> + + + + + + + ); +}; diff --git a/x-pack/plugins/ml/public/application/components/ml_page/side_nav.tsx b/x-pack/plugins/ml/public/application/components/ml_page/side_nav.tsx index 84474e85330d6..250dbc52cfd9c 100644 --- a/x-pack/plugins/ml/public/application/components/ml_page/side_nav.tsx +++ b/x-pack/plugins/ml/public/application/components/ml_page/side_nav.tsx @@ -229,13 +229,22 @@ export function useSideNavItems(activeRoute: MlRoute | undefined) { items: [ { id: 'explainlogratespikes', - pathId: ML_PAGES.AIOPS_EXPLAIN_LOG_RATE_SPIKES, + pathId: ML_PAGES.AIOPS_EXPLAIN_LOG_RATE_SPIKES_INDEX_SELECT, name: i18n.translate('xpack.ml.navMenu.explainLogRateSpikesLinkText', { defaultMessage: 'Explain log rate spikes', }), disabled: disableLinks, testSubj: 'mlMainTab explainLogRateSpikes', }, + { + id: 'singleEndpointStreamingDemo', + pathId: ML_PAGES.AIOPS_SINGLE_ENDPOINT_STREAMING_DEMO, + name: i18n.translate('xpack.ml.navMenu.singleEndpointStreamingDemoLinkText', { + defaultMessage: 'Single endpoint streaming demo', + }), + disabled: disableLinks, + testSubj: 'mlMainTab singleEndpointStreamingDemo', + }, ], }); } diff --git a/x-pack/plugins/ml/public/application/contexts/ml/ml_context.ts b/x-pack/plugins/ml/public/application/contexts/ml/ml_context.ts index 2a8806bf3ff38..8b755b02f99b9 100644 --- a/x-pack/plugins/ml/public/application/contexts/ml/ml_context.ts +++ b/x-pack/plugins/ml/public/application/contexts/ml/ml_context.ts @@ -6,9 +6,9 @@ */ import React from 'react'; -import { DataView, DataViewsContract } from '@kbn/data-views-plugin/public'; -import { SavedSearchSavedObject } from '../../../../common/types/kibana'; -import { MlServicesContext } from '../../app'; +import type { DataView, DataViewsContract } from '@kbn/data-views-plugin/public'; +import type { SavedSearchSavedObject } from '../../../../common/types/kibana'; +import type { MlServicesContext } from '../../app'; export interface MlContextValue { combinedQuery: any; diff --git a/x-pack/plugins/ml/public/application/routing/breadcrumbs.ts b/x-pack/plugins/ml/public/application/routing/breadcrumbs.ts index 54aedb4a71857..38ace0233cbb8 100644 --- a/x-pack/plugins/ml/public/application/routing/breadcrumbs.ts +++ b/x-pack/plugins/ml/public/application/routing/breadcrumbs.ts @@ -59,7 +59,7 @@ export const AIOPS_BREADCRUMB: ChromeBreadcrumb = Object.freeze({ text: i18n.translate('xpack.ml.aiopsBreadcrumbLabel', { defaultMessage: 'AIOps', }), - href: '/aiops', + href: '/aiops/explain_log_rate_spikes_index_select', }); export const CREATE_JOB_BREADCRUMB: ChromeBreadcrumb = Object.freeze({ diff --git a/x-pack/plugins/ml/public/application/routing/routes/aiops/explain_log_rate_spikes.tsx b/x-pack/plugins/ml/public/application/routing/routes/aiops/explain_log_rate_spikes.tsx index ca670df258a6a..5fac891a79675 100644 --- a/x-pack/plugins/ml/public/application/routing/routes/aiops/explain_log_rate_spikes.tsx +++ b/x-pack/plugins/ml/public/application/routing/routes/aiops/explain_log_rate_spikes.tsx @@ -37,7 +37,7 @@ export const explainLogRateSpikesRouteFactory = ( getBreadcrumbWithUrlForApp('ML_BREADCRUMB', navigateToPath, basePath), getBreadcrumbWithUrlForApp('AIOPS_BREADCRUMB', navigateToPath, basePath), { - text: i18n.translate('xpack.ml.AiopsBreadcrumbs.explainLogRateSpikesLabel', { + text: i18n.translate('xpack.ml.aiopsBreadcrumbs.explainLogRateSpikesLabel', { defaultMessage: 'Explain log rate spikes', }), }, diff --git a/x-pack/plugins/ml/public/application/routing/routes/aiops/index.ts b/x-pack/plugins/ml/public/application/routing/routes/aiops/index.ts index f2b192a4cd097..10f0eba1adeda 100644 --- a/x-pack/plugins/ml/public/application/routing/routes/aiops/index.ts +++ b/x-pack/plugins/ml/public/application/routing/routes/aiops/index.ts @@ -6,3 +6,4 @@ */ export * from './explain_log_rate_spikes'; +export * from './single_endpoint_streaming_demo'; diff --git a/x-pack/plugins/ml/public/application/routing/routes/aiops/single_endpoint_streaming_demo.tsx b/x-pack/plugins/ml/public/application/routing/routes/aiops/single_endpoint_streaming_demo.tsx new file mode 100644 index 0000000000000..636357518d0d0 --- /dev/null +++ b/x-pack/plugins/ml/public/application/routing/routes/aiops/single_endpoint_streaming_demo.tsx @@ -0,0 +1,63 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import React, { FC } from 'react'; +import { parse } from 'query-string'; + +import { i18n } from '@kbn/i18n'; + +import { AIOPS_ENABLED } from '@kbn/aiops-plugin/common'; + +import { NavigateToPath } from '../../../contexts/kibana'; + +import { MlRoute, PageLoader, PageProps } from '../../router'; +import { useResolver } from '../../use_resolver'; +import { SingleEndpointStreamingDemoPage as Page } from '../../../aiops/single_endpoint_streaming_demo'; + +import { checkBasicLicense } from '../../../license'; +import { checkGetJobsCapabilitiesResolver } from '../../../capabilities/check_capabilities'; +import { cacheDataViewsContract } from '../../../util/index_utils'; +import { getBreadcrumbWithUrlForApp } from '../../breadcrumbs'; + +export const singleEndpointStreamingDemoRouteFactory = ( + navigateToPath: NavigateToPath, + basePath: string +): MlRoute => ({ + id: 'single_endpoint_streaming_demo', + path: '/aiops/single_endpoint_streaming_demo', + title: i18n.translate('xpack.ml.aiops.singleEndpointStreamingDemo.docTitle', { + defaultMessage: 'Single endpoint streaming demo', + }), + render: (props, deps) => , + breadcrumbs: [ + getBreadcrumbWithUrlForApp('ML_BREADCRUMB', navigateToPath, basePath), + getBreadcrumbWithUrlForApp('AIOPS_BREADCRUMB', navigateToPath, basePath), + { + text: i18n.translate('xpack.ml.aiopsBreadcrumbs.singleEndpointStreamingDemoLabel', { + defaultMessage: 'Single endpoint streaming demo', + }), + }, + ], + disabled: !AIOPS_ENABLED, +}); + +const PageWrapper: FC = ({ location, deps }) => { + const { redirectToMlAccessDeniedPage } = deps; + + const { index, savedSearchId }: Record = parse(location.search, { sort: false }); + const { context } = useResolver(index, savedSearchId, deps.config, deps.dataViewsContract, { + checkBasicLicense, + cacheDataViewsContract: () => cacheDataViewsContract(deps.dataViewsContract), + checkGetJobsCapabilities: () => checkGetJobsCapabilitiesResolver(redirectToMlAccessDeniedPage), + }); + + return ( + + + + ); +}; diff --git a/x-pack/plugins/ml/public/application/routing/routes/new_job/index_or_search.tsx b/x-pack/plugins/ml/public/application/routing/routes/new_job/index_or_search.tsx index d1d547ca8bc90..5ea3bfa9d35eb 100644 --- a/x-pack/plugins/ml/public/application/routing/routes/new_job/index_or_search.tsx +++ b/x-pack/plugins/ml/public/application/routing/routes/new_job/index_or_search.tsx @@ -50,6 +50,16 @@ const getDataVisBreadcrumbs = (navigateToPath: NavigateToPath, basePath: string) }, ]; +const getExplainLogRateSpikesBreadcrumbs = (navigateToPath: NavigateToPath, basePath: string) => [ + getBreadcrumbWithUrlForApp('ML_BREADCRUMB', navigateToPath, basePath), + getBreadcrumbWithUrlForApp('AIOPS_BREADCRUMB', navigateToPath, basePath), + { + text: i18n.translate('xpack.ml.aiopsBreadcrumbs.selectDateViewLabel', { + defaultMessage: 'Data View', + }), + }, +]; + export const indexOrSearchRouteFactory = ( navigateToPath: NavigateToPath, basePath: string @@ -86,6 +96,26 @@ export const dataVizIndexOrSearchRouteFactory = ( breadcrumbs: getDataVisBreadcrumbs(navigateToPath, basePath), }); +export const explainLogRateSpikesIndexOrSearchRouteFactory = ( + navigateToPath: NavigateToPath, + basePath: string +): MlRoute => ({ + id: 'data_view_explain_log_rate_spikes', + path: '/aiops/explain_log_rate_spikes_index_select', + title: i18n.translate('xpack.ml.selectDataViewLabel', { + defaultMessage: 'Select Data View', + }), + render: (props, deps) => ( + + ), + breadcrumbs: getExplainLogRateSpikesBreadcrumbs(navigateToPath, basePath), +}); + const PageWrapper: FC = ({ nextStepPath, deps, mode }) => { const { services: { diff --git a/x-pack/plugins/ml/public/locator/ml_locator.ts b/x-pack/plugins/ml/public/locator/ml_locator.ts index 295dbaebbbae6..b36029329c087 100644 --- a/x-pack/plugins/ml/public/locator/ml_locator.ts +++ b/x-pack/plugins/ml/public/locator/ml_locator.ts @@ -86,6 +86,8 @@ export class MlLocatorDefinition implements LocatorDefinition { case ML_PAGES.DATA_VISUALIZER_INDEX_SELECT: case ML_PAGES.AIOPS: case ML_PAGES.AIOPS_EXPLAIN_LOG_RATE_SPIKES: + case ML_PAGES.AIOPS_EXPLAIN_LOG_RATE_SPIKES_INDEX_SELECT: + case ML_PAGES.AIOPS_SINGLE_ENDPOINT_STREAMING_DEMO: case ML_PAGES.OVERVIEW: case ML_PAGES.SETTINGS: case ML_PAGES.FILTER_LISTS_MANAGE: diff --git a/x-pack/test/api_integration/apis/aiops/example_stream.ts b/x-pack/test/api_integration/apis/aiops/example_stream.ts index 693a6de2c6716..c1e410655dbfc 100644 --- a/x-pack/test/api_integration/apis/aiops/example_stream.ts +++ b/x-pack/test/api_integration/apis/aiops/example_stream.ts @@ -12,6 +12,8 @@ import expect from '@kbn/expect'; import { FtrProviderContext } from '../../ftr_provider_context'; +import { parseStream } from './parse_stream'; + export default ({ getService }: FtrProviderContext) => { const supertest = getService('supertest'); const config = getService('config'); @@ -67,34 +69,15 @@ export default ({ getService }: FtrProviderContext) => { expect(stream).not.to.be(null); if (stream !== null) { - let partial = ''; - let threw = false; const progressData: any[] = []; - try { - for await (const value of stream) { - const full = `${partial}${value}`; - const parts = full.split('\n'); - const last = parts.pop(); - - partial = last ?? ''; - - const actions = parts.map((p) => JSON.parse(p)); - - actions.forEach((action) => { - expect(typeof action.type).to.be('string'); - - if (action.type === 'update_progress') { - progressData.push(action); - } - }); + for await (const action of parseStream(stream)) { + expect(action.type).not.to.be('error'); + if (action.type === 'update_progress') { + progressData.push(action); } - } catch (e) { - threw = true; } - expect(threw).to.be(false); - expect(progressData.length).to.be(100); expect(progressData[0].payload).to.be(1); expect(progressData[progressData.length - 1].payload).to.be(100); diff --git a/x-pack/test/api_integration/apis/aiops/explain_log_rate_spikes.ts b/x-pack/test/api_integration/apis/aiops/explain_log_rate_spikes.ts new file mode 100644 index 0000000000000..11ef63809a52f --- /dev/null +++ b/x-pack/test/api_integration/apis/aiops/explain_log_rate_spikes.ts @@ -0,0 +1,126 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import fetch from 'node-fetch'; +import { format as formatUrl } from 'url'; + +import expect from '@kbn/expect'; + +import { FtrProviderContext } from '../../ftr_provider_context'; + +import { parseStream } from './parse_stream'; + +export default ({ getService }: FtrProviderContext) => { + const supertest = getService('supertest'); + const config = getService('config'); + const kibanaServerUrl = formatUrl(config.get('servers.kibana')); + + const expectedFields = [ + 'category', + 'currency', + 'customer_first_name', + 'customer_full_name', + 'customer_gender', + 'customer_id', + 'customer_last_name', + 'customer_phone', + 'day_of_week', + 'day_of_week_i', + 'email', + 'geoip', + 'manufacturer', + 'order_date', + 'order_id', + 'products', + 'sku', + 'taxful_total_price', + 'taxless_total_price', + 'total_quantity', + 'total_unique_products', + 'type', + 'user', + ]; + + describe('POST /internal/aiops/explain_log_rate_spikes', () => { + const esArchiver = getService('esArchiver'); + + before(async () => { + await esArchiver.loadIfNeeded('x-pack/test/functional/es_archives/ml/ecommerce'); + }); + + after(async () => { + await esArchiver.unload('x-pack/test/functional/es_archives/ml/ecommerce'); + }); + + it('should return full data without streaming', async () => { + const resp = await supertest + .post(`/internal/aiops/explain_log_rate_spikes`) + .set('kbn-xsrf', 'kibana') + .send({ + index: 'ft_ecommerce', + }) + .expect(200); + + expect(Buffer.isBuffer(resp.body)).to.be(true); + + const chunks: string[] = resp.body.toString().split('\n'); + + expect(chunks.length).to.be(24); + + const lastChunk = chunks.pop(); + expect(lastChunk).to.be(''); + + let data: any[] = []; + + expect(() => { + data = chunks.map((c) => JSON.parse(c)); + }).not.to.throwError(); + + data.forEach((d) => { + expect(typeof d.type).to.be('string'); + }); + + const fields = data.map((d) => d.payload[0]).sort(); + + expect(fields.length).to.equal(expectedFields.length); + fields.forEach((f) => { + expect(expectedFields.includes(f)); + }); + }); + + it('should return data in chunks with streaming', async () => { + const response = await fetch(`${kibanaServerUrl}/internal/aiops/explain_log_rate_spikes`, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'kbn-xsrf': 'stream', + }, + body: JSON.stringify({ index: 'ft_ecommerce' }), + }); + + const stream = response.body; + + expect(stream).not.to.be(null); + + if (stream !== null) { + const data: any[] = []; + + for await (const action of parseStream(stream)) { + expect(action.type).not.to.be('error'); + data.push(action); + } + + const fields = data.map((d) => d.payload[0]).sort(); + + expect(fields.length).to.equal(expectedFields.length); + fields.forEach((f) => { + expect(expectedFields.includes(f)); + }); + } + }); + }); +}; diff --git a/x-pack/test/api_integration/apis/aiops/index.ts b/x-pack/test/api_integration/apis/aiops/index.ts index 04b4181906dbf..d2aacc454b567 100644 --- a/x-pack/test/api_integration/apis/aiops/index.ts +++ b/x-pack/test/api_integration/apis/aiops/index.ts @@ -12,5 +12,6 @@ export default function ({ loadTestFile }: FtrProviderContext) { this.tags(['ml']); loadTestFile(require.resolve('./example_stream')); + loadTestFile(require.resolve('./explain_log_rate_spikes')); }); } diff --git a/x-pack/test/api_integration/apis/aiops/parse_stream.ts b/x-pack/test/api_integration/apis/aiops/parse_stream.ts new file mode 100644 index 0000000000000..f3da52e6024bb --- /dev/null +++ b/x-pack/test/api_integration/apis/aiops/parse_stream.ts @@ -0,0 +1,28 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export async function* parseStream(stream: NodeJS.ReadableStream) { + let partial = ''; + + try { + for await (const value of stream) { + const full = `${partial}${value}`; + const parts = full.split('\n'); + const last = parts.pop(); + + partial = last ?? ''; + + const actions = parts.map((p) => JSON.parse(p)); + + for (const action of actions) { + yield action; + } + } + } catch (error) { + yield { type: 'error', payload: error.toString() }; + } +}