From fae73dc8e500e3e665fa4791f8c463033e69b2e0 Mon Sep 17 00:00:00 2001 From: Simone Vellei Date: Tue, 14 May 2024 23:40:27 +0200 Subject: [PATCH] Implement embeddings observer (#195) --- embedder/cohere/cohere.go | 49 ++++++++++++++++++++++++- embedder/huggingface/huggingface.go | 57 +++++++++++++++++++++++++++-- embedder/nomic/nomic.go | 52 +++++++++++++++++++++++--- embedder/observer/observer.go | 49 +++++++++++++++++++++++++ embedder/ollama/ollama.go | 52 +++++++++++++++++++++++--- embedder/openai/openai.go | 52 ++++++++++++++++++++++++-- embedder/voyage/voyage.go | 55 ++++++++++++++++++++++++++-- observer/langfuse/formatter.go | 14 +++++++ observer/langfuse/langfuse.go | 19 ++++++++++ observer/observer.go | 13 +++++++ 10 files changed, 389 insertions(+), 23 deletions(-) create mode 100644 embedder/observer/observer.go diff --git a/embedder/cohere/cohere.go b/embedder/cohere/cohere.go index 464a41c6..dc5b8552 100644 --- a/embedder/cohere/cohere.go +++ b/embedder/cohere/cohere.go @@ -8,7 +8,10 @@ import ( "github.com/henomis/cohere-go/model" "github.com/henomis/cohere-go/request" "github.com/henomis/cohere-go/response" + "github.com/henomis/lingoose/embedder" + embobserver "github.com/henomis/lingoose/embedder/observer" + "github.com/henomis/lingoose/observer" ) type EmbedderModel = model.EmbedModel @@ -36,14 +39,18 @@ var EmbedderModelsSize = map[EmbedderModel]int{ } type Embedder struct { - model EmbedderModel - client *coherego.Client + model EmbedderModel + client *coherego.Client + name string + observer embobserver.EmbeddingObserver + observerTraceID string } func New() *Embedder { return &Embedder{ client: coherego.New(os.Getenv("COHERE_API_KEY")), model: defaultEmbedderModel, + name: "cohere", } } @@ -61,6 +68,44 @@ func (e *Embedder) WithModel(model EmbedderModel) *Embedder { // Embed returns the embeddings for the given texts func (e *Embedder) Embed(ctx context.Context, texts []string) ([]embedder.Embedding, error) { + var observerEmbedding *observer.Embedding + var err error + + if e.observer != nil { + observerEmbedding, err = embobserver.StartObserveEmbedding( + e.observer, + e.name, + string(e.model), + nil, + e.observerTraceID, + observer.ContextValueParentID(ctx), + texts, + ) + if err != nil { + return nil, err + } + } + + embeddings, err := e.embed(ctx, texts) + if err != nil { + return nil, err + } + + if e.observer != nil { + err = embobserver.StopObserveEmbedding( + e.observer, + observerEmbedding, + embeddings, + ) + if err != nil { + return nil, err + } + } + + return embeddings, nil +} + +func (e *Embedder) embed(ctx context.Context, texts []string) ([]embedder.Embedding, error) { resp := &response.Embed{} err := e.client.Embed( ctx, diff --git a/embedder/huggingface/huggingface.go b/embedder/huggingface/huggingface.go index 47116346..1a4867b4 100644 --- a/embedder/huggingface/huggingface.go +++ b/embedder/huggingface/huggingface.go @@ -6,6 +6,8 @@ import ( "os" "github.com/henomis/lingoose/embedder" + embobserver "github.com/henomis/lingoose/embedder/observer" + "github.com/henomis/lingoose/observer" ) const ( @@ -13,9 +15,12 @@ const ( ) type HuggingFaceEmbedder struct { - token string - model string - httpClient *http.Client + token string + model string + httpClient *http.Client + name string + observer embobserver.EmbeddingObserver + observerTraceID string } func New() *HuggingFaceEmbedder { @@ -23,6 +28,7 @@ func New() *HuggingFaceEmbedder { token: os.Getenv("HUGGING_FACE_HUB_TOKEN"), model: hfDefaultEmbedderModel, httpClient: http.DefaultClient, + name: "huggingface", } } @@ -38,6 +44,15 @@ func (h *HuggingFaceEmbedder) WithModel(model string) *HuggingFaceEmbedder { return h } +func (h *HuggingFaceEmbedder) WithObserver( + observer embobserver.EmbeddingObserver, + traceID string, +) *HuggingFaceEmbedder { + h.observer = observer + h.observerTraceID = traceID + return h +} + // WithHTTPClient sets the http client to use for the LLM func (h *HuggingFaceEmbedder) WithHTTPClient(httpClient *http.Client) *HuggingFaceEmbedder { h.httpClient = httpClient @@ -46,5 +61,39 @@ func (h *HuggingFaceEmbedder) WithHTTPClient(httpClient *http.Client) *HuggingFa // Embed returns the embeddings for the given texts func (h *HuggingFaceEmbedder) Embed(ctx context.Context, texts []string) ([]embedder.Embedding, error) { - return h.featureExtraction(ctx, texts) + var observerEmbedding *observer.Embedding + var err error + + if h.observer != nil { + observerEmbedding, err = embobserver.StartObserveEmbedding( + h.observer, + h.name, + h.model, + nil, + h.observerTraceID, + observer.ContextValueParentID(ctx), + texts, + ) + if err != nil { + return nil, err + } + } + + embeddings, err := h.featureExtraction(ctx, texts) + if err != nil { + return nil, err + } + + if h.observer != nil { + err = embobserver.StopObserveEmbedding( + h.observer, + observerEmbedding, + embeddings, + ) + if err != nil { + return nil, err + } + } + + return embeddings, nil } diff --git a/embedder/nomic/nomic.go b/embedder/nomic/nomic.go index a485e142..cdf03478 100644 --- a/embedder/nomic/nomic.go +++ b/embedder/nomic/nomic.go @@ -5,8 +5,11 @@ import ( "net/http" "os" - "github.com/henomis/lingoose/embedder" "github.com/henomis/restclientgo" + + "github.com/henomis/lingoose/embedder" + embobserver "github.com/henomis/lingoose/embedder/observer" + "github.com/henomis/lingoose/observer" ) const ( @@ -15,9 +18,12 @@ const ( ) type Embedder struct { - taskType TaskType - model Model - restClient *restclientgo.RestClient + taskType TaskType + model Model + restClient *restclientgo.RestClient + name string + observer embobserver.EmbeddingObserver + observerTraceID string } func New() *Embedder { @@ -31,6 +37,7 @@ func New() *Embedder { }, ), model: defaultModel, + name: "nomic", } } @@ -54,10 +61,34 @@ func (e *Embedder) WithModel(model Model) *Embedder { return e } +func (e *Embedder) WithObserver(observer embobserver.EmbeddingObserver, traceID string) *Embedder { + e.observer = observer + e.observerTraceID = traceID + return e +} + // Embed returns the embeddings for the given texts func (e *Embedder) Embed(ctx context.Context, texts []string) ([]embedder.Embedding, error) { + var observerEmbedding *observer.Embedding + var err error + + if e.observer != nil { + observerEmbedding, err = embobserver.StartObserveEmbedding( + e.observer, + e.name, + string(e.model), + nil, + e.observerTraceID, + observer.ContextValueParentID(ctx), + texts, + ) + if err != nil { + return nil, err + } + } + var resp response - err := e.restClient.Post( + err = e.restClient.Post( ctx, &request{ Texts: texts, @@ -70,5 +101,16 @@ func (e *Embedder) Embed(ctx context.Context, texts []string) ([]embedder.Embedd return nil, err } + if e.observer != nil { + err = embobserver.StopObserveEmbedding( + e.observer, + observerEmbedding, + resp.Embeddings, + ) + if err != nil { + return nil, err + } + } + return resp.Embeddings, nil } diff --git a/embedder/observer/observer.go b/embedder/observer/observer.go new file mode 100644 index 00000000..84f55273 --- /dev/null +++ b/embedder/observer/observer.go @@ -0,0 +1,49 @@ +package observer + +import ( + "fmt" + + "github.com/henomis/lingoose/embedder" + obs "github.com/henomis/lingoose/observer" + "github.com/henomis/lingoose/types" +) + +type EmbeddingObserver interface { + Embedding(*obs.Embedding) (*obs.Embedding, error) + EmbeddingEnd(*obs.Embedding) (*obs.Embedding, error) +} + +func StartObserveEmbedding( + o EmbeddingObserver, + name string, + modelName string, + ModelParameters types.M, + traceID string, + parentID string, + texts []string, +) (*obs.Embedding, error) { + embedding, err := o.Embedding( + &obs.Embedding{ + TraceID: traceID, + ParentID: parentID, + Name: fmt.Sprintf("embedding-%s", name), + Model: modelName, + ModelParameters: ModelParameters, + Input: texts, + }, + ) + if err != nil { + return nil, err + } + return embedding, nil +} + +func StopObserveEmbedding( + o EmbeddingObserver, + embedding *obs.Embedding, + embeddings []embedder.Embedding, +) error { + embedding.Output = embeddings + _, err := o.EmbeddingEnd(embedding) + return err +} diff --git a/embedder/ollama/ollama.go b/embedder/ollama/ollama.go index 25d800ac..b90c867a 100644 --- a/embedder/ollama/ollama.go +++ b/embedder/ollama/ollama.go @@ -3,8 +3,11 @@ package ollamaembedder import ( "context" - "github.com/henomis/lingoose/embedder" "github.com/henomis/restclientgo" + + "github.com/henomis/lingoose/embedder" + embobserver "github.com/henomis/lingoose/embedder/observer" + "github.com/henomis/lingoose/observer" ) const ( @@ -13,14 +16,18 @@ const ( ) type Embedder struct { - model string - restClient *restclientgo.RestClient + model string + restClient *restclientgo.RestClient + name string + observer embobserver.EmbeddingObserver + observerTraceID string } func New() *Embedder { return &Embedder{ restClient: restclientgo.New(defaultEndpoint), model: defaultModel, + name: "ollama", } } @@ -34,15 +41,50 @@ func (e *Embedder) WithModel(model string) *Embedder { return e } +func (e *Embedder) WithObserver(observer embobserver.EmbeddingObserver, traceID string) *Embedder { + e.observer = observer + e.observerTraceID = traceID + return e +} + // Embed returns the embeddings for the given texts func (e *Embedder) Embed(ctx context.Context, texts []string) ([]embedder.Embedding, error) { + var observerEmbedding *observer.Embedding + var err error + + if e.observer != nil { + observerEmbedding, err = embobserver.StartObserveEmbedding( + e.observer, + e.name, + e.model, + nil, + e.observerTraceID, + observer.ContextValueParentID(ctx), + texts, + ) + if err != nil { + return nil, err + } + } + embeddings := make([]embedder.Embedding, len(texts)) for i, text := range texts { - embedding, err := e.embed(ctx, text) + embedding, errEmbedd := e.embed(ctx, text) + if errEmbedd != nil { + return nil, errEmbedd + } + embeddings[i] = embedding + } + + if e.observer != nil { + err = embobserver.StopObserveEmbedding( + e.observer, + observerEmbedding, + embeddings, + ) if err != nil { return nil, err } - embeddings[i] = embedding } return embeddings, nil diff --git a/embedder/openai/openai.go b/embedder/openai/openai.go index 32a22c0c..797b4cb4 100644 --- a/embedder/openai/openai.go +++ b/embedder/openai/openai.go @@ -5,6 +5,8 @@ import ( "os" "github.com/henomis/lingoose/embedder" + embobserver "github.com/henomis/lingoose/embedder/observer" + "github.com/henomis/lingoose/observer" "github.com/sashabaranov/go-openai" ) @@ -17,8 +19,11 @@ const ( ) type OpenAIEmbedder struct { - openAIClient *openai.Client - model Model + openAIClient *openai.Client + model Model + Name string + observer embobserver.EmbeddingObserver + observerTraceID string } func New(model Model) *OpenAIEmbedder { @@ -27,6 +32,7 @@ func New(model Model) *OpenAIEmbedder { return &OpenAIEmbedder{ openAIClient: openai.NewClient(openAIKey), model: model, + Name: "openai", } } @@ -36,9 +42,49 @@ func (o *OpenAIEmbedder) WithClient(client *openai.Client) *OpenAIEmbedder { return o } +func (o *OpenAIEmbedder) WithObserver(observer embobserver.EmbeddingObserver, traceID string) *OpenAIEmbedder { + o.observer = observer + o.observerTraceID = traceID + return o +} + // Embed returns the embeddings for the given texts func (o *OpenAIEmbedder) Embed(ctx context.Context, texts []string) ([]embedder.Embedding, error) { - return o.openAICreateEmebeddings(ctx, texts) + var observerEmbedding *observer.Embedding + var err error + + if o.observer != nil { + observerEmbedding, err = embobserver.StartObserveEmbedding( + o.observer, + o.Name, + string(o.model), + nil, + o.observerTraceID, + observer.ContextValueParentID(ctx), + texts, + ) + if err != nil { + return nil, err + } + } + + embeddings, err := o.openAICreateEmebeddings(ctx, texts) + if err != nil { + return nil, err + } + + if o.observer != nil { + err = embobserver.StopObserveEmbedding( + o.observer, + observerEmbedding, + embeddings, + ) + if err != nil { + return nil, err + } + } + + return embeddings, nil } func (o *OpenAIEmbedder) openAICreateEmebeddings(ctx context.Context, texts []string) ([]embedder.Embedding, error) { diff --git a/embedder/voyage/voyage.go b/embedder/voyage/voyage.go index d1f3f7c6..a5ed1c5c 100644 --- a/embedder/voyage/voyage.go +++ b/embedder/voyage/voyage.go @@ -5,8 +5,11 @@ import ( "net/http" "os" - "github.com/henomis/lingoose/embedder" "github.com/henomis/restclientgo" + + "github.com/henomis/lingoose/embedder" + embobserver "github.com/henomis/lingoose/embedder/observer" + "github.com/henomis/lingoose/observer" ) const ( @@ -15,8 +18,11 @@ const ( ) type Embedder struct { - model string - restClient *restclientgo.RestClient + model string + restClient *restclientgo.RestClient + name string + observer embobserver.EmbeddingObserver + observerTraceID string } func New() *Embedder { @@ -29,6 +35,7 @@ func New() *Embedder { return req }), model: defaultModel, + name: "voyage", } } @@ -37,9 +44,49 @@ func (e *Embedder) WithModel(model string) *Embedder { return e } +func (e *Embedder) WithObserver(observer embobserver.EmbeddingObserver, traceID string) *Embedder { + e.observer = observer + e.observerTraceID = traceID + return e +} + // Embed returns the embeddings for the given texts func (e *Embedder) Embed(ctx context.Context, texts []string) ([]embedder.Embedding, error) { - return e.embed(ctx, texts) + var observerEmbedding *observer.Embedding + var err error + + if e.observer != nil { + observerEmbedding, err = embobserver.StartObserveEmbedding( + e.observer, + e.name, + e.model, + nil, + e.observerTraceID, + observer.ContextValueParentID(ctx), + texts, + ) + if err != nil { + return nil, err + } + } + + embeddings, err := e.embed(ctx, texts) + if err != nil { + return nil, err + } + + if e.observer != nil { + err = embobserver.StopObserveEmbedding( + e.observer, + observerEmbedding, + embeddings, + ) + if err != nil { + return nil, err + } + } + + return embeddings, nil } // Embed returns the embeddings for the given texts diff --git a/observer/langfuse/formatter.go b/observer/langfuse/formatter.go index aef81725..7f76baa9 100644 --- a/observer/langfuse/formatter.go +++ b/observer/langfuse/formatter.go @@ -82,6 +82,20 @@ func observerGenerationToLangfuseGeneration(g *observer.Generation) *model.Gener } } +func observerEmbeddingToLangfuseGeneration(e *observer.Embedding) *model.Generation { + return &model.Generation{ + ID: e.ID, + TraceID: e.TraceID, + Name: e.Name, + ParentObservationID: e.ParentID, + Model: e.Model, + ModelParameters: e.ModelParameters, + Input: e.Input, + Output: e.Output, + Metadata: e.Metadata, + } +} + func observerEventToLangfuseEvent(e *observer.Event) *model.Event { return &model.Event{ ID: e.ID, diff --git a/observer/langfuse/langfuse.go b/observer/langfuse/langfuse.go index 093cecd8..0c076be1 100644 --- a/observer/langfuse/langfuse.go +++ b/observer/langfuse/langfuse.go @@ -69,6 +69,25 @@ func (l *Langfuse) GenerationEnd(g *observer.Generation) (*observer.Generation, return g, nil } +func (l *Langfuse) Embedding(e *observer.Embedding) (*observer.Embedding, error) { + langfuseGeneration := observerEmbeddingToLangfuseGeneration(e) + langfuseGeneration, err := l.client.Generation(langfuseGeneration, nil) + if err != nil { + return nil, err + } + e.ID = langfuseGeneration.ID + return e, nil +} + +func (l *Langfuse) EmbeddingEnd(e *observer.Embedding) (*observer.Embedding, error) { + langfuseGeneration := observerEmbeddingToLangfuseGeneration(e) + _, err := l.client.Generation(langfuseGeneration, nil) + if err != nil { + return nil, err + } + return e, nil +} + func (l *Langfuse) Event(e *observer.Event) (*observer.Event, error) { langfuseEvent := observerEventToLangfuseEvent(e) langfuseEvent, err := l.client.Event(langfuseEvent, nil) diff --git a/observer/observer.go b/observer/observer.go index 4f63d2aa..06a9a157 100644 --- a/observer/observer.go +++ b/observer/observer.go @@ -3,6 +3,7 @@ package observer import ( "context" + "github.com/henomis/lingoose/embedder" "github.com/henomis/lingoose/thread" "github.com/henomis/lingoose/types" ) @@ -38,6 +39,18 @@ type Generation struct { Metadata types.M } +type Embedding struct { + ID string + ParentID string + TraceID string + Name string + Model string + ModelParameters types.M + Input []string + Output []embedder.Embedding + Metadata types.M +} + type Event struct { ID string ParentID string