Skip to content

Commit

Permalink
[8.14] [Obs AI Assistant] Keep connection open, limit no of fields (e…
Browse files Browse the repository at this point in the history
…lastic#186811) (elastic#187131)

# Backport

This will backport the following commits from `main` to `8.14`:
- [[Obs AI Assistant] Keep connection open, limit no of fields
(elastic#186811)](elastic#186811)

<!--- Backport version: 7.3.2 -->

### Questions ?
Please refer to the [Backport tool
documentation](https://github.com/sqren/backport)

<!--BACKPORT {commits} BACKPORT-->
  • Loading branch information
dgieselaar authored Jul 17, 2024
1 parent 5afbed3 commit 5dd1739
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ export async function getRelevantFieldNames({
messages: Message[];
chat: FunctionCallChatFunction;
signal: AbortSignal;
}): Promise<{ fields: string[] }> {
}): Promise<{ fields: string[]; stats: { analyzed: number; total: number } }> {
const dataViewsService = await dataViews.dataViewsServiceFactory(savedObjectsClient, esClient);

const fields = await dataViewsService.getFieldsForWildcard({
Expand Down Expand Up @@ -71,8 +71,13 @@ export async function getRelevantFieldNames({

const groupedFields = groupBy(allFields, (field) => field.name);

const MAX_CHUNKS = 5;
const FIELD_NAMES_PER_CHUNK = 250;

const fieldNamesToAnalyze = fieldNames.slice(0, MAX_CHUNKS * FIELD_NAMES_PER_CHUNK);

const relevantFields = await Promise.all(
chunk(fieldNames, 500).map(async (fieldsInChunk) => {
chunk(fieldNamesToAnalyze, FIELD_NAMES_PER_CHUNK).map(async (fieldsInChunk) => {
const chunkResponse$ = (
await chat('get_relevent_dataset_names', {
signal,
Expand Down Expand Up @@ -138,5 +143,8 @@ export async function getRelevantFieldNames({
})
);

return { fields: relevantFields.flat() };
return {
fields: relevantFields.flat(),
stats: { analyzed: fieldNamesToAnalyze.length, total: fieldNames.length },
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ export function registerGetDatasetInfoFunction({

try {
const body = await esClient.indices.resolveIndex({
name: index === '' ? '*' : index,
name: index === '' ? '*' : index.split(','),
expand_wildcards: 'open',
});
indices = [
Expand Down Expand Up @@ -86,11 +86,11 @@ export function registerGetDatasetInfoFunction({
signal,
chat,
});

return {
content: {
indices: [index],
fields: relevantFieldNames,
fields: relevantFieldNames.fields,
stats: relevantFieldNames.stats,
},
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
*/

import { repeat } from 'lodash';
import { identity, Observable, OperatorFunction } from 'rxjs';
import { Observable, OperatorFunction } from 'rxjs';
import {
BufferFlushEvent,
StreamingChatResponseEventType,
Expand All @@ -22,10 +22,6 @@ import {
export function flushBuffer<T extends StreamingChatResponseEventWithoutError | TokenCountEvent>(
isCloud: boolean
): OperatorFunction<T, T | BufferFlushEvent> {
if (!isCloud) {
return identity;
}

return (source: Observable<T>) =>
new Observable<T | BufferFlushEvent>((subscriber) => {
const cloudProxyBufferSize = 4096;
Expand All @@ -41,7 +37,15 @@ export function flushBuffer<T extends StreamingChatResponseEventWithoutError | T
}
};

const intervalId = setInterval(flushBufferIfNeeded, 250);
const keepAlive = () => {
subscriber.next({
data: '0',
type: StreamingChatResponseEventType.BufferFlush,
});
};

const flushIntervalId = isCloud ? setInterval(flushBufferIfNeeded, 250) : undefined;
const keepAliveIntervalId = setInterval(keepAlive, 30_000);

source.subscribe({
next: (value) => {
Expand All @@ -52,11 +56,13 @@ export function flushBuffer<T extends StreamingChatResponseEventWithoutError | T
subscriber.next(value);
},
error: (error) => {
clearInterval(intervalId);
clearInterval(flushIntervalId);
clearInterval(keepAliveIntervalId);
subscriber.error(error);
},
complete: () => {
clearInterval(intervalId);
clearInterval(flushIntervalId);
clearInterval(keepAliveIntervalId);
subscriber.complete();
},
});
Expand Down

0 comments on commit 5dd1739

Please sign in to comment.