From 5dd1739bbf4ed5ee68d75345854bba93e349e837 Mon Sep 17 00:00:00 2001 From: Dario Gieselaar Date: Wed, 17 Jul 2024 18:34:02 +0200 Subject: [PATCH] [8.14] [Obs AI Assistant] Keep connection open, limit no of fields (#186811) (#187131) # Backport This will backport the following commits from `main` to `8.14`: - [[Obs AI Assistant] Keep connection open, limit no of fields (#186811)](https://github.com/elastic/kibana/pull/186811) ### Questions ? Please refer to the [Backport tool documentation](https://github.com/sqren/backport) --- .../get_relevant_field_names.ts | 14 +++++++++--- .../functions/get_dataset_info/index.ts | 6 ++--- .../server/service/util/flush_buffer.ts | 22 ++++++++++++------- 3 files changed, 28 insertions(+), 14 deletions(-) diff --git a/x-pack/plugins/observability_solution/observability_ai_assistant/server/functions/get_dataset_info/get_relevant_field_names.ts b/x-pack/plugins/observability_solution/observability_ai_assistant/server/functions/get_dataset_info/get_relevant_field_names.ts index 543641098836f..e17d569376f90 100644 --- a/x-pack/plugins/observability_solution/observability_ai_assistant/server/functions/get_dataset_info/get_relevant_field_names.ts +++ b/x-pack/plugins/observability_solution/observability_ai_assistant/server/functions/get_dataset_info/get_relevant_field_names.ts @@ -33,7 +33,7 @@ export async function getRelevantFieldNames({ messages: Message[]; chat: FunctionCallChatFunction; signal: AbortSignal; -}): Promise<{ fields: string[] }> { +}): Promise<{ fields: string[]; stats: { analyzed: number; total: number } }> { const dataViewsService = await dataViews.dataViewsServiceFactory(savedObjectsClient, esClient); const fields = await dataViewsService.getFieldsForWildcard({ @@ -71,8 +71,13 @@ export async function getRelevantFieldNames({ const groupedFields = groupBy(allFields, (field) => field.name); + const MAX_CHUNKS = 5; + const FIELD_NAMES_PER_CHUNK = 250; + + const fieldNamesToAnalyze = fieldNames.slice(0, MAX_CHUNKS * FIELD_NAMES_PER_CHUNK); + const relevantFields = await Promise.all( - chunk(fieldNames, 500).map(async (fieldsInChunk) => { + chunk(fieldNamesToAnalyze, FIELD_NAMES_PER_CHUNK).map(async (fieldsInChunk) => { const chunkResponse$ = ( await chat('get_relevent_dataset_names', { signal, @@ -138,5 +143,8 @@ export async function getRelevantFieldNames({ }) ); - return { fields: relevantFields.flat() }; + return { + fields: relevantFields.flat(), + stats: { analyzed: fieldNamesToAnalyze.length, total: fieldNames.length }, + }; } diff --git a/x-pack/plugins/observability_solution/observability_ai_assistant/server/functions/get_dataset_info/index.ts b/x-pack/plugins/observability_solution/observability_ai_assistant/server/functions/get_dataset_info/index.ts index e5b4e21195003..e8fa125c8049b 100644 --- a/x-pack/plugins/observability_solution/observability_ai_assistant/server/functions/get_dataset_info/index.ts +++ b/x-pack/plugins/observability_solution/observability_ai_assistant/server/functions/get_dataset_info/index.ts @@ -47,7 +47,7 @@ export function registerGetDatasetInfoFunction({ try { const body = await esClient.indices.resolveIndex({ - name: index === '' ? '*' : index, + name: index === '' ? '*' : index.split(','), expand_wildcards: 'open', }); indices = [ @@ -86,11 +86,11 @@ export function registerGetDatasetInfoFunction({ signal, chat, }); - return { content: { indices: [index], - fields: relevantFieldNames, + fields: relevantFieldNames.fields, + stats: relevantFieldNames.stats, }, }; } diff --git a/x-pack/plugins/observability_solution/observability_ai_assistant/server/service/util/flush_buffer.ts b/x-pack/plugins/observability_solution/observability_ai_assistant/server/service/util/flush_buffer.ts index eb494ec80bb50..a9826a180c969 100644 --- a/x-pack/plugins/observability_solution/observability_ai_assistant/server/service/util/flush_buffer.ts +++ b/x-pack/plugins/observability_solution/observability_ai_assistant/server/service/util/flush_buffer.ts @@ -6,7 +6,7 @@ */ import { repeat } from 'lodash'; -import { identity, Observable, OperatorFunction } from 'rxjs'; +import { Observable, OperatorFunction } from 'rxjs'; import { BufferFlushEvent, StreamingChatResponseEventType, @@ -22,10 +22,6 @@ import { export function flushBuffer( isCloud: boolean ): OperatorFunction { - if (!isCloud) { - return identity; - } - return (source: Observable) => new Observable((subscriber) => { const cloudProxyBufferSize = 4096; @@ -41,7 +37,15 @@ export function flushBuffer { + subscriber.next({ + data: '0', + type: StreamingChatResponseEventType.BufferFlush, + }); + }; + + const flushIntervalId = isCloud ? setInterval(flushBufferIfNeeded, 250) : undefined; + const keepAliveIntervalId = setInterval(keepAlive, 30_000); source.subscribe({ next: (value) => { @@ -52,11 +56,13 @@ export function flushBuffer { - clearInterval(intervalId); + clearInterval(flushIntervalId); + clearInterval(keepAliveIntervalId); subscriber.error(error); }, complete: () => { - clearInterval(intervalId); + clearInterval(flushIntervalId); + clearInterval(keepAliveIntervalId); subscriber.complete(); }, });