remove SpanAttributes and refactor streamwrapper
alizenhom committed Sep 5, 2024
1 parent b8dde6c commit ec3c320
Showing 4 changed files with 95 additions and 120 deletions.
File 1 of 4 (likely __init__.py):

@@ -40,13 +40,13 @@
---
"""

import importlib.metadata

from typing import Collection

from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.openai.package import _instruments
from opentelemetry.trace import get_tracer
-from wrapt import wrap_function_wrapper as _W
+from wrapt import wrap_function_wrapper
from .patch import chat_completions_create


@@ -59,7 +59,7 @@ def _instrument(self, **kwargs):
        """Enable OpenAI instrumentation."""
        tracer_provider = kwargs.get("tracer_provider")
        tracer = get_tracer(__name__, "", tracer_provider)
-        _W(
+        wrap_function_wrapper(
            module="openai.resources.chat.completions",
            name="Completions.create",
            wrapper=chat_completions_create(tracer),
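The wrapt call above swaps Completions.create for a tracing wrapper at instrumentation time. As a rough sketch of the wrapper-factory shape it expects (the real chat_completions_create lives in patch.py below and also records request and response attributes; the span name and body here are illustrative only):

from opentelemetry.trace import SpanKind, Tracer

def chat_completions_create(tracer: Tracer):
    # wrapt calls the wrapper as (wrapped, instance, args, kwargs);
    # wrapped(*args, **kwargs) invokes the original Completions.create.
    def traced_method(wrapped, instance, args, kwargs):
        with tracer.start_as_current_span("chat", kind=SpanKind.CLIENT):
            return wrapped(*args, **kwargs)

    return traced_method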
File 2 of 4: patch.py

@@ -17,8 +17,7 @@
from opentelemetry.trace import SpanKind, Span
from opentelemetry.trace.status import Status, StatusCode
from opentelemetry.trace.propagation import set_span_in_context

-from .span_attributes import LLMSpanAttributes, SpanAttributes
+from .span_attributes import LLMSpanAttributes
from opentelemetry.semconv._incubating.attributes import (
    gen_ai_attributes as GenAIAttributes,
)
@@ -94,7 +93,9 @@ def _set_response_attributes(span, result):
    set_span_attribute(
        span, GenAIAttributes.GEN_AI_RESPONSE_MODEL, result.model
    )
-    print(result)
    if getattr(result, "choices", None):
+        choices = result.choices
        responses = [
            {
                "role": (
@@ -113,16 +114,19 @@ def _set_response_attributes(span, result):
                    else {}
                ),
            }
-            for choice in result.choices
+            for choice in choices
        ]
+        for choice in choices:
+            if choice.finish_reason:
+                set_span_attribute(
+                    span,
+                    GenAIAttributes.GEN_AI_RESPONSE_FINISH_REASONS,
+                    choice.finish_reason,
+                )
        set_event_completion(span, responses)

-    if getattr(result, "system_fingerprint", None):
-        set_span_attribute(
-            span,
-            SpanAttributes.LLM_SYSTEM_FINGERPRINT,
-            result.system_fingerprint,
-        )
+    if getattr(result, "id", None):
+        set_span_attribute(span, GenAIAttributes.GEN_AI_RESPONSE_ID, result.id)

    # Get the usage
    if getattr(result, "usage", None):
@@ -145,6 +149,8 @@ def _set_response_attributes(span, result):

class StreamWrapper:
    span: Span
+    response_id: str = ""
+    response_model: str = ""

    def __init__(
        self,
@@ -170,6 +176,20 @@ def setup(self):

    def cleanup(self):
        if self._span_started:
+            if self.response_model:
+                set_span_attribute(
+                    self.span,
+                    GenAIAttributes.GEN_AI_RESPONSE_MODEL,
+                    self.response_model,
+                )
+
+            if self.response_id:
+                set_span_attribute(
+                    self.span,
+                    GenAIAttributes.GEN_AI_RESPONSE_ID,
+                    self.response_id,
+                )
+
            set_span_attribute(
                self.span,
                GenAIAttributes.GEN_AI_USAGE_INPUT_TOKENS,
@@ -218,50 +238,71 @@ def __next__(self):
            self.cleanup()
            raise

-    def process_chunk(self, chunk):
-        if getattr(chunk, "model", None):
-            set_span_attribute(
-                self.span,
-                GenAIAttributes.GEN_AI_RESPONSE_MODEL,
-                chunk.model,
-            )
+    def set_response_model(self, chunk):
+        if self.response_model:
+            return

-        if getattr(chunk, "choices", None):
-            content = []
-            if not self.function_call and not self.tool_calls:
-                for choice in chunk.choices:
-                    if choice.delta and choice.delta.content is not None:
-                        content = [choice.delta.content]
-            elif self.function_call:
-                for choice in chunk.choices:
-                    if (
-                        choice.delta
-                        and choice.delta.function_call is not None
-                        and choice.delta.function_call.arguments is not None
-                    ):
-                        content = [choice.delta.function_call.arguments]
-            elif self.tool_calls:
-                for choice in chunk.choices:
-                    if choice.delta and choice.delta.tool_calls is not None:
-                        toolcalls = choice.delta.tool_calls
-                        content = []
-                        for tool_call in toolcalls:
-                            if (
-                                tool_call
-                                and tool_call.function is not None
-                                and tool_call.function.arguments is not None
-                            ):
-                                content.append(tool_call.function.arguments)
-
-            if content:
-                self.result_content.append(content[0])
-
-        if getattr(chunk, "text", None):
-            content = [chunk.text]
-
-            if content:
-                self.result_content.append(content[0])
+        if getattr(chunk, "model", None):
+            self.response_model = chunk.model
+
+    def set_response_id(self, chunk):
+        if self.response_id:
+            return
+
+        if getattr(chunk, "id", None):
+            self.response_id = chunk.id
+
+    def build_streaming_response(self, chunk):
+        if getattr(chunk, "choices", None) is None:
+            return
+
+        choices = chunk.choices
+        content = []
+        if not self.function_call and not self.tool_calls:
+            for choice in choices:
+                if choice.delta and choice.delta.content is not None:
+                    content = [choice.delta.content]
+
+        elif self.function_call:
+            for choice in choices:
+                if (
+                    choice.delta
+                    and choice.delta.function_call is not None
+                    and choice.delta.function_call.arguments is not None
+                ):
+                    content = [choice.delta.function_call.arguments]
+
+        elif self.tool_calls:
+            for choice in choices:
+                if choice.delta and choice.delta.tool_calls is not None:
+                    toolcalls = choice.delta.tool_calls
+                    content = []
+                    for tool_call in toolcalls:
+                        if (
+                            tool_call
+                            and tool_call.function is not None
+                            and tool_call.function.arguments is not None
+                        ):
+                            content.append(tool_call.function.arguments)
+
+        for choice in choices:
+            finish_reason = choice.finish_reason
+            if finish_reason:
+                set_span_attribute(
+                    self.span,
+                    GenAIAttributes.GEN_AI_RESPONSE_FINISH_REASONS,
+                    finish_reason,
+                )
+        if content:
+            self.result_content.append(content[0])
+
+    def set_usage(self, chunk):
+        if getattr(chunk, "usage", None):
+            self.completion_tokens = chunk.usage.completion_tokens
+            self.prompt_tokens = chunk.usage.prompt_tokens
+
+    def process_chunk(self, chunk):
+        self.set_response_id(chunk)
+        self.set_response_model(chunk)
+        self.build_streaming_response(chunk)
+        self.set_usage(chunk)
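With this refactor, process_chunk is a thin dispatcher over four helpers: the response id and model are captured once, from the first chunk that carries them, and only written to the span in cleanup(). A sketch of the per-chunk flow using fake chunks shaped like OpenAI streaming deltas (all values illustrative; stream_wrapper is assumed to be a StreamWrapper built as in the diff):

from types import SimpleNamespace as NS

def fake_chunk(content=None, finish_reason=None, usage=None):
    # Mimics just the fields process_chunk and its helpers read.
    delta = NS(content=content, function_call=None, tool_calls=None)
    return NS(
        id="chatcmpl-123",    # illustrative
        model="gpt-4o-mini",  # illustrative
        choices=[NS(delta=delta, finish_reason=finish_reason)],
        usage=usage,
    )

# stream_wrapper.process_chunk(fake_chunk(content="Hel"))
# stream_wrapper.process_chunk(fake_chunk(content="lo"))
# stream_wrapper.process_chunk(
#     fake_chunk(finish_reason="stop",
#                usage=NS(completion_tokens=2, prompt_tokens=8))
# )
# response_id/response_model are set once, the content deltas accumulate
# in result_content, and the token counts are buffered until cleanup().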
File 3 of 4: span_attributes.py

@@ -13,64 +13,10 @@
# limitations under the License.

from __future__ import annotations
-from enum import Enum
from typing import Dict, List, Optional
from pydantic import BaseModel, ConfigDict, Field


-class SpanAttributes:
-
-    LLM_SYSTEM_FINGERPRINT = "gen_ai.system_fingerprint"
-    LLM_REQUEST_DOCUMENTS = "gen_ai.request.documents"
-    LLM_REQUEST_SEARCH_REQUIRED = "gen_ai.request.is_search_required"
-    LLM_PROMPTS = "gen_ai.prompt"
-    LLM_CONTENT_PROMPT = "gen_ai.content.prompt"
-    LLM_COMPLETIONS = "gen_ai.completion"
-    LLM_CONTENT_COMPLETION = "gen_ai.content.completion"
-    LLM_RESPONSE_MODEL = "gen_ai.response.model"
-    LLM_USAGE_COMPLETION_TOKENS = "gen_ai.usage.output_tokens"
-    LLM_USAGE_PROMPT_TOKENS = "gen_ai.usage.input_tokens"
-    LLM_USAGE_TOTAL_TOKENS = "gen_ai.request.total_tokens"
-    LLM_USAGE_TOKEN_TYPE = "gen_ai.usage.token_type"
-    LLM_USAGE_SEARCH_UNITS = "gen_ai.usage.search_units"
-    LLM_GENERATION_ID = "gen_ai.generation_id"
-    LLM_TOKEN_TYPE = "gen_ai.token.type"
-    LLM_RESPONSE_ID = "gen_ai.response_id"
-    LLM_URL = "url.full"
-    LLM_PATH = "url.path"
-    LLM_RESPONSE_FORMAT = "gen_ai.response.format"
-    LLM_IMAGE_SIZE = "gen_ai.image.size"
-    LLM_REQUEST_ENCODING_FORMATS = "gen_ai.request.encoding_formats"
-    LLM_REQUEST_DIMENSIONS = "gen_ai.request.dimensions"
-    LLM_REQUEST_SEED = "gen_ai.request.seed"
-    LLM_REQUEST_TOP_LOGPROPS = "gen_ai.request.top_props"
-    LLM_REQUEST_LOGPROPS = "gen_ai.request.log_props"
-    LLM_REQUEST_LOGITBIAS = "gen_ai.request.logit_bias"
-    LLM_REQUEST_TYPE = "gen_ai.request.type"
-    LLM_HEADERS = "gen_ai.headers"
-    LLM_USER = "gen_ai.user"
-    LLM_TOOLS = "gen_ai.request.tools"
-    LLM_TOOL_CHOICE = "gen_ai.request.tool_choice"
-    LLM_TOOL_RESULTS = "gen_ai.request.tool_results"
-    LLM_TOP_K = "gen_ai.request.top_k"
-    LLM_IS_STREAMING = "gen_ai.request.stream"
-    LLM_FREQUENCY_PENALTY = "gen_ai.request.frequency_penalty"
-    LLM_PRESENCE_PENALTY = "gen_ai.request.presence_penalty"
-    LLM_CHAT_STOP_SEQUENCES = "gen_ai.chat.stop_sequences"
-    LLM_REQUEST_FUNCTIONS = "gen_ai.request.functions"
-    LLM_REQUEST_REPETITION_PENALTY = "gen_ai.request.repetition_penalty"
-    LLM_RESPONSE_FINISH_REASON = "gen_ai.response.finish_reasons"
-    LLM_RESPONSE_STOP_REASON = "gen_ai.response.stop_reason"
-    LLM_CONTENT_COMPLETION_CHUNK = "gen_ai.completion.chunk"
-
-
-class Event(Enum):
-    STREAM_START = "stream.start"
-    STREAM_OUTPUT = "stream.output"
-    STREAM_END = "stream.end"
-    RESPONSE = "response"


class LLMSpanAttributes(BaseModel):
    model_config = ConfigDict(extra="allow")
    gen_ai_operation_name: str = Field(
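What survives in this module is the pydantic model. Because of model_config = ConfigDict(extra="allow"), declared fields are validated while unrecognized keys pass through untouched. A minimal sketch of that pattern (the field set is trimmed to the one name visible in the diff; the real model declares many more):

from pydantic import BaseModel, ConfigDict, Field

class SketchSpanAttributes(BaseModel):
    model_config = ConfigDict(extra="allow")
    gen_ai_operation_name: str = Field(...)  # required, as in the diff

attrs = SketchSpanAttributes(
    gen_ai_operation_name="chat",
    gen_ai_request_model="gpt-4",  # undeclared, kept via extra="allow"
)
print(attrs.model_dump())
# {'gen_ai_operation_name': 'chat', 'gen_ai_request_model': 'gpt-4'}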
File 4 of 4 (likely utils.py):

@@ -16,7 +16,6 @@
import logging
from typing import Optional, Union
from openai import NOT_GIVEN
-from .span_attributes import SpanAttributes
from opentelemetry.semconv._incubating.attributes import (
    gen_ai_attributes as GenAIAttributes,
)
@@ -164,7 +163,6 @@ def get_llm_request_attributes(
    )

    top_p = kwargs.get("p") or kwargs.get("top_p")
-    tools = kwargs.get("tools")

    return {
        GenAIAttributes.GEN_AI_OPERATION_NAME: operation_name,
@@ -183,14 +181,4 @@
        GenAIAttributes.GEN_AI_REQUEST_FREQUENCY_PENALTY: kwargs.get(
            "frequency_penalty"
        ),
-        SpanAttributes.LLM_SYSTEM_FINGERPRINT: kwargs.get(
-            "system_fingerprint"
-        ),
-        SpanAttributes.LLM_IS_STREAMING: kwargs.get("stream"),
-        SpanAttributes.LLM_USER: user,
-        SpanAttributes.LLM_TOOLS: json.dumps(tools) if tools else None,
-        SpanAttributes.LLM_TOOL_CHOICE: kwargs.get("tool_choice"),
-        SpanAttributes.LLM_REQUEST_LOGPROPS: kwargs.get("logprobs"),
-        SpanAttributes.LLM_REQUEST_LOGITBIAS: kwargs.get("logit_bias"),
-        SpanAttributes.LLM_REQUEST_TOP_LOGPROPS: kwargs.get("top_logprobs"),
    }
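Note that the returned mapping still carries None for any request parameter the caller did not supply, so whatever applies it to a span needs a guard; the set_span_attribute helper used throughout patch.py presumably plays that role. A minimal sketch of such a guard (assumed behavior, not the package's exact code):

def set_span_attribute(span, name, value):
    # Drop unset values so kwargs.get(...) misses never reach the span.
    if value is not None and value != "":
        span.set_attribute(name, value)

# attributes = get_llm_request_attributes(...)  # arguments as defined above
# for key, value in attributes.items():
#     set_span_attribute(span, key, value)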
