Skip to content

Commit

Permalink
[Observability AI Assistant] migrate to inference client elastic#197630
Browse files Browse the repository at this point in the history
… (elastic#199286)

## Summary

Closes elastic#183245

Closes elastic#197630 
[Observability AI Assistant] Partially migrate to inference client 

replacing `inferenceClient.chatComplete` to
`observabilityAIAssistantClient.chat` -
`observabilityAIAssistantClient.complete` does a bunch of stuff on top
of `chat`. keepping `observabilityAIAssistantClient.chat` as a wrapper
for now because it also adds instrumentation and logging.
  • Loading branch information
arturoliduena authored and Samiul-TheSoccerFan committed Dec 10, 2024
1 parent 71751d0 commit 9f462f3
Show file tree
Hide file tree
Showing 47 changed files with 559 additions and 2,667 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,6 @@ export enum ObservabilityAIAssistantConnectorType {
Gemini = '.gemini',
}

export const SUPPORTED_CONNECTOR_TYPES = [
ObservabilityAIAssistantConnectorType.OpenAI,
ObservabilityAIAssistantConnectorType.Bedrock,
ObservabilityAIAssistantConnectorType.Gemini,
];

export function isSupportedConnectorType(
type: string
): type is ObservabilityAIAssistantConnectorType {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { TokenCount as TokenCountType, type Message } from './types';

export enum StreamingChatResponseEventType {
ChatCompletionChunk = 'chatCompletionChunk',
ChatCompletionMessage = 'chatCompletionMessage',
ConversationCreate = 'conversationCreate',
ConversationUpdate = 'conversationUpdate',
MessageAdd = 'messageAdd',
Expand All @@ -25,19 +26,26 @@ type StreamingChatResponseEventBase<
type: TEventType;
} & TData;

export type ChatCompletionChunkEvent = StreamingChatResponseEventBase<
StreamingChatResponseEventType.ChatCompletionChunk,
{
id: string;
message: {
content?: string;
function_call?: {
name?: string;
arguments?: string;
type BaseChatCompletionEvent<TType extends StreamingChatResponseEventType> =
StreamingChatResponseEventBase<
TType,
{
id: string;
message: {
content?: string;
function_call?: {
name?: string;
arguments?: string;
};
};
};
}
>;
}
>;

export type ChatCompletionChunkEvent =
BaseChatCompletionEvent<StreamingChatResponseEventType.ChatCompletionChunk>;

export type ChatCompletionMessageEvent =
BaseChatCompletionEvent<StreamingChatResponseEventType.ChatCompletionMessage>;

export type ConversationCreateEvent = StreamingChatResponseEventBase<
StreamingChatResponseEventType.ConversationCreate,
Expand Down Expand Up @@ -100,6 +108,7 @@ export type TokenCountEvent = StreamingChatResponseEventBase<

export type StreamingChatResponseEvent =
| ChatCompletionChunkEvent
| ChatCompletionMessageEvent
| ConversationCreateEvent
| ConversationUpdateEvent
| MessageAddEvent
Expand All @@ -112,7 +121,7 @@ export type StreamingChatResponseEventWithoutError = Exclude<
ChatCompletionErrorEvent
>;

export type ChatEvent = ChatCompletionChunkEvent | TokenCountEvent;
export type ChatEvent = ChatCompletionChunkEvent | TokenCountEvent | ChatCompletionMessageEvent;
export type MessageOrChatEvent = ChatEvent | MessageAddEvent;

export enum ChatCompletionErrorCode {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@
* 2.0.
*/

import { Message, MessageRole } from '@kbn/observability-ai-assistant-plugin/common';
import {
AssistantMessage,
Message as InferenceMessage,
MessageRole as InferenceMessageRole,
} from '@kbn/inference-common';
import { generateFakeToolCallId } from '@kbn/inference-plugin/common';
import { Message, MessageRole } from '.';

export function convertMessagesForInference(messages: Message[]): InferenceMessage[] {
const inferenceMessages: InferenceMessage[] = [];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ export {

export type {
ChatCompletionChunkEvent,
ChatCompletionMessageEvent,
TokenCountEvent,
ConversationCreateEvent,
ConversationUpdateEvent,
MessageAddEvent,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@
*/

import { cloneDeep } from 'lodash';
import { type Observable, scan } from 'rxjs';
import type { ChatCompletionChunkEvent } from '../conversation_complete';
import { type Observable, scan, filter, defaultIfEmpty } from 'rxjs';
import type { ChatCompletionChunkEvent, ChatEvent } from '../conversation_complete';
import { StreamingChatResponseEventType } from '../conversation_complete';
import { MessageRole } from '../types';

export interface ConcatenatedMessage {
Expand All @@ -24,8 +25,12 @@ export interface ConcatenatedMessage {

export const concatenateChatCompletionChunks =
() =>
(source: Observable<ChatCompletionChunkEvent>): Observable<ConcatenatedMessage> =>
(source: Observable<ChatEvent>): Observable<ConcatenatedMessage> =>
source.pipe(
filter(
(event): event is ChatCompletionChunkEvent =>
event.type === StreamingChatResponseEventType.ChatCompletionChunk
),
scan(
(acc, { message }) => {
acc.message.content += message.content ?? '';
Expand All @@ -45,5 +50,16 @@ export const concatenateChatCompletionChunks =
role: MessageRole.Assistant,
},
} as ConcatenatedMessage
)
),
defaultIfEmpty({
message: {
content: '',
function_call: {
name: '',
arguments: '',
trigger: MessageRole.Assistant,
},
role: MessageRole.Assistant,
},
})
);
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@ import {
OperatorFunction,
shareReplay,
withLatestFrom,
filter,
} from 'rxjs';
import { withoutTokenCountEvents } from './without_token_count_events';
import {
ChatCompletionChunkEvent,
type ChatCompletionChunkEvent,
ChatEvent,
MessageAddEvent,
StreamingChatResponseEventType,
StreamingChatResponseEvent,
} from '../conversation_complete';
import {
concatenateChatCompletionChunks,
Expand Down Expand Up @@ -51,13 +53,23 @@ function mergeWithEditedMessage(
);
}

function filterChunkEvents(): OperatorFunction<
StreamingChatResponseEvent,
ChatCompletionChunkEvent
> {
return filter(
(event): event is ChatCompletionChunkEvent =>
event.type === StreamingChatResponseEventType.ChatCompletionChunk
);
}

export function emitWithConcatenatedMessage<T extends ChatEvent>(
callback?: ConcatenateMessageCallback
): OperatorFunction<T, T | MessageAddEvent> {
return (source$) => {
const shared = source$.pipe(shareReplay());

const withoutTokenCount$ = shared.pipe(withoutTokenCountEvents());
const withoutTokenCount$ = shared.pipe(filterChunkEvents());

const response$ = concat(
shared,
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,38 +1,26 @@
{
"type": "plugin",
"id": "@kbn/observability-ai-assistant-plugin",
"owner": [
"@elastic/obs-ai-assistant"
],
"owner": ["@elastic/obs-ai-assistant"],
"group": "platform",
"visibility": "shared",
"plugin": {
"id": "observabilityAIAssistant",
"browser": true,
"server": true,
"configPath": [
"xpack",
"observabilityAIAssistant"
],
"configPath": ["xpack", "observabilityAIAssistant"],
"requiredPlugins": [
"actions",
"features",
"licensing",
"security",
"taskManager",
"dataViews"
],
"optionalPlugins": [
"cloud",
"serverless"
],
"requiredBundles": [
"kibanaReact",
"kibanaUtils"
],
"runtimePluginDependencies": [
"ml"
"dataViews",
"inference"
],
"optionalPlugins": ["cloud", "serverless"],
"requiredBundles": ["kibanaReact", "kibanaUtils"],
"runtimePluginDependencies": ["ml"],
"extraPublicDirs": []
}
}
Loading

0 comments on commit 9f462f3

Please sign in to comment.