diff --git a/CHANGELOG.md b/CHANGELOG.md index 14eadca65..3dc606761 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,11 +3,20 @@ ## Unreleased ### Breaking Changes -... + - We removed the `trace_id` as a concept from various tracing-related functions and moved them to a `context`. If you did not directly use the `trace_id` there is nothing to change. + - `Task.run` no longer takes a trace id. This was a largely unused feature and we revamped the trace ids for the traces. + - Creating `Span`, `TaskSpan` or logs no longer takes `trace_id`. This is handled by the spans themselves, which now have a `context` that identifies them. + - `Span.id` is therefore also removed. This can be accessed by `span.context.trace_id`, but has a different type. + - The `OpenTelemetryTracer` no longer logs a custom `trace_id` into the attributes. Use the existing ids from its context instead. + - Accessing a single trace from a `PersistentTracer.trace()` is no longer supported, as the user does not have access to the `trace_id` anyway. The function is now called `traces` and returns all available traces for a tracer. + - `InMemoryTracer` and derivatives are no longer `pydantic.BaseModel`. Use the `export_for_viewing` function to export a serializable representation of the trace. ### New Features - Add `how_to_implement_incremental_evaluation`. - Improve README.md + - Add `export_for_viewing` to tracers to be able to export traces in a unified format similar to opentelemetry. + - This is not supported for the `OpenTelemetryTracer` because of technical incompatibilities. + - All exported spans now contain the status of the span. ### Fixes - The document index client now correctly URL-encodes document names in its queries. 
diff --git a/src/intelligence_layer/core/__init__.py b/src/intelligence_layer/core/__init__.py index 02d25369a..5c5d72d26 100644 --- a/src/intelligence_layer/core/__init__.py +++ b/src/intelligence_layer/core/__init__.py @@ -44,23 +44,27 @@ from .tracer.in_memory_tracer import InMemorySpan as InMemorySpan from .tracer.in_memory_tracer import InMemoryTaskSpan as InMemoryTaskSpan from .tracer.in_memory_tracer import InMemoryTracer as InMemoryTracer +from .tracer.in_memory_tracer import LogEntry as LogEntry from .tracer.open_telemetry_tracer import OpenTelemetryTracer as OpenTelemetryTracer +from .tracer.persistent_tracer import EndSpan as EndSpan +from .tracer.persistent_tracer import EndTask as EndTask +from .tracer.persistent_tracer import LogLine as LogLine from .tracer.persistent_tracer import PersistentSpan as PersistentSpan from .tracer.persistent_tracer import PersistentTaskSpan as PersistentTaskSpan from .tracer.persistent_tracer import PersistentTracer as PersistentTracer +from .tracer.persistent_tracer import PlainEntry as PlainEntry +from .tracer.persistent_tracer import StartSpan as StartSpan +from .tracer.persistent_tracer import StartTask as StartTask from .tracer.persistent_tracer import TracerLogEntryFailed as TracerLogEntryFailed -from .tracer.tracer import EndSpan as EndSpan -from .tracer.tracer import EndTask as EndTask +from .tracer.tracer import ErrorValue as ErrorValue from .tracer.tracer import JsonSerializer as JsonSerializer -from .tracer.tracer import LogEntry as LogEntry -from .tracer.tracer import LogLine as LogLine from .tracer.tracer import NoOpTracer as NoOpTracer -from .tracer.tracer import PlainEntry as PlainEntry from .tracer.tracer import PydanticSerializable as PydanticSerializable from .tracer.tracer import Span as Span -from .tracer.tracer import StartSpan as StartSpan -from .tracer.tracer import StartTask as StartTask +from .tracer.tracer import SpanStatus as SpanStatus +from .tracer.tracer import SpanType as SpanType from 
.tracer.tracer import TaskSpan as TaskSpan +from .tracer.tracer import TaskSpanAttributes as TaskSpanAttributes from .tracer.tracer import Tracer as Tracer from .tracer.tracer import utc_now as utc_now diff --git a/src/intelligence_layer/core/task.py b/src/intelligence_layer/core/task.py index 885079187..d63d4d2a2 100644 --- a/src/intelligence_layer/core/task.py +++ b/src/intelligence_layer/core/task.py @@ -1,6 +1,6 @@ from abc import ABC, abstractmethod from concurrent.futures import ThreadPoolExecutor -from typing import Generic, Iterable, Optional, Sequence, TypeVar, final +from typing import Generic, Iterable, Sequence, TypeVar, final from pydantic import BaseModel @@ -58,9 +58,7 @@ def do_run(self, input: Input, task_span: TaskSpan) -> Output: ... @final - def run( - self, input: Input, tracer: Tracer, trace_id: Optional[str] = None - ) -> Output: + def run(self, input: Input, tracer: Tracer) -> Output: """Executes the implementation of `do_run` for this use case. This takes an input and runs the implementation to generate an output. @@ -70,13 +68,10 @@ def run( Args: input: Generic input defined by the task implementation tracer: The `Tracer` used for tracing. - trace_id: An optional id of the run, used to track its trace. Returns: Generic output defined by the task implementation. """ - with tracer.task_span( - type(self).__name__, input, trace_id=trace_id - ) as task_span: + with tracer.task_span(type(self).__name__, input) as task_span: output = self.do_run(input, task_span) task_span.record_output(output) return output @@ -87,7 +82,6 @@ def run_concurrently( inputs: Iterable[Input], tracer: Tracer, concurrency_limit: int = MAX_CONCURRENCY, - trace_id: Optional[str] = None, ) -> Sequence[Output]: """Executes multiple processes of this task concurrently. @@ -100,16 +94,13 @@ def run_concurrently( concurrency_limit: An optional additional limit for the number of concurrently executed task for this method call. 
This can be used to prevent queue-full or similar error of downstream APIs when the global concurrency limit is too high for a certain task. - trace_id: An optional id of the run, used to track its trace. Returns: The Outputs generated by calling `run` for each given Input. The order of Outputs corresponds to the order of the Inputs. """ - with tracer.span( - f"Concurrent {type(self).__name__} tasks", trace_id=trace_id - ) as span: + with tracer.span(f"Concurrent {type(self).__name__} tasks") as span: with ThreadPoolExecutor( max_workers=min(concurrency_limit, MAX_CONCURRENCY) ) as executor: diff --git a/src/intelligence_layer/core/tracer/composite_tracer.py b/src/intelligence_layer/core/tracer/composite_tracer.py index 02face451..5f1e44c80 100644 --- a/src/intelligence_layer/core/tracer/composite_tracer.py +++ b/src/intelligence_layer/core/tracer/composite_tracer.py @@ -1,16 +1,21 @@ from datetime import datetime -from typing import Generic, Optional, Sequence +from typing import Generic, Optional, Sequence, TypeVar from intelligence_layer.core.tracer.tracer import ( + Context, + ExportedSpan, PydanticSerializable, Span, - SpanVar, + SpanStatus, TaskSpan, Tracer, - TracerVar, utc_now, ) +TracerVar = TypeVar("TracerVar", bound=Tracer) + +SpanVar = TypeVar("SpanVar", bound=Span) + class CompositeTracer(Tracer, Generic[TracerVar]): """A :class:`Tracer` that allows for recording to multiple tracers simultaneously. 
@@ -34,35 +39,32 @@ class CompositeTracer(Tracer, Generic[TracerVar]): def __init__(self, tracers: Sequence[TracerVar]) -> None: assert len(tracers) > 0 self.tracers = tracers + self.context = tracers[0].context def span( self, name: str, timestamp: Optional[datetime] = None, - trace_id: Optional[str] = None, ) -> "CompositeSpan[Span]": timestamp = timestamp or utc_now() - trace_id = self.ensure_id(trace_id) - return CompositeSpan( - [tracer.span(name, timestamp, trace_id) for tracer in self.tracers] - ) + return CompositeSpan([tracer.span(name, timestamp) for tracer in self.tracers]) def task_span( self, task_name: str, input: PydanticSerializable, timestamp: Optional[datetime] = None, - trace_id: Optional[str] = None, ) -> "CompositeTaskSpan": timestamp = timestamp or utc_now() - trace_id = self.ensure_id(trace_id) return CompositeTaskSpan( - [ - tracer.task_span(task_name, input, timestamp, trace_id) - for tracer in self.tracers - ] + [tracer.task_span(task_name, input, timestamp) for tracer in self.tracers] ) + def export_for_viewing(self) -> Sequence[ExportedSpan]: + if len(self.tracers) > 0: + return self.tracers[0].export_for_viewing() + return [] + class CompositeSpan(Generic[SpanVar], CompositeTracer[SpanVar], Span): """A :class:`Span` that allows for recording to multiple spans simultaneously. @@ -73,8 +75,11 @@ class CompositeSpan(Generic[SpanVar], CompositeTracer[SpanVar], Span): tracers: spans that will be forwarded all subsequent log and span calls. 
""" - def id(self) -> str: - return self.tracers[0].id() + def __init__( + self, tracers: Sequence[SpanVar], context: Optional[Context] = None + ) -> None: + CompositeTracer.__init__(self, tracers) + Span.__init__(self, context=context) def log( self, @@ -91,6 +96,20 @@ def end(self, timestamp: Optional[datetime] = None) -> None: for tracer in self.tracers: tracer.end(timestamp) + @property + def status_code(self) -> SpanStatus: + status_codes = {tracer.status_code for tracer in self.tracers} + if len(status_codes) > 1: + raise ValueError( + "Inconsistent status of traces in composite tracer. Status of all traces should be the same but they are different." + ) + return next(iter(status_codes)) + + @status_code.setter + def status_code(self, span_status: SpanStatus) -> None: + for tracer in self.tracers: + tracer.status_code = span_status + class CompositeTaskSpan(CompositeSpan[TaskSpan], TaskSpan): """A :class:`TaskSpan` that allows for recording to multiple TaskSpans simultaneously. diff --git a/src/intelligence_layer/core/tracer/file_tracer.py b/src/intelligence_layer/core/tracer/file_tracer.py index d320134e0..5c7813caa 100644 --- a/src/intelligence_layer/core/tracer/file_tracer.py +++ b/src/intelligence_layer/core/tracer/file_tracer.py @@ -2,16 +2,18 @@ from json import loads from pathlib import Path from typing import Optional +from uuid import UUID from pydantic import BaseModel from intelligence_layer.core.tracer.in_memory_tracer import InMemoryTracer from intelligence_layer.core.tracer.persistent_tracer import ( + LogLine, PersistentSpan, PersistentTaskSpan, PersistentTracer, ) -from intelligence_layer.core.tracer.tracer import LogLine, PydanticSerializable +from intelligence_layer.core.tracer.tracer import Context, PydanticSerializable class FileTracer(PersistentTracer): @@ -34,7 +36,7 @@ def __init__(self, log_file_path: Path | str) -> None: super().__init__() self._log_file_path = Path(log_file_path) - def _log_entry(self, id: str, entry: BaseModel) -> 
None: + def _log_entry(self, id: UUID, entry: BaseModel) -> None: self._log_file_path.parent.mkdir(parents=True, exist_ok=True) with self._log_file_path.open(mode="a", encoding="utf-8") as f: f.write( @@ -48,9 +50,8 @@ def span( self, name: str, timestamp: Optional[datetime] = None, - trace_id: Optional[str] = None, ) -> "FileSpan": - span = FileSpan(self._log_file_path, trace_id=self.ensure_id(trace_id)) + span = FileSpan(self._log_file_path, context=self.context) self._log_span(span, name, timestamp) return span @@ -59,16 +60,15 @@ def task_span( task_name: str, input: PydanticSerializable, timestamp: Optional[datetime] = None, - trace_id: Optional[str] = None, ) -> "FileTaskSpan": task = FileTaskSpan( self._log_file_path, - trace_id=self.ensure_id(trace_id), + context=self.context, ) self._log_task(task, task_name, input, timestamp) return task - def trace(self, trace_id: Optional[str] = None) -> InMemoryTracer: + def traces(self, trace_id: Optional[str] = None) -> InMemoryTracer: with self._log_file_path.open("r") as f: traces = (LogLine.model_validate(loads(line)) for line in f) filtered_traces = ( @@ -82,20 +82,12 @@ def trace(self, trace_id: Optional[str] = None) -> InMemoryTracer: class FileSpan(PersistentSpan, FileTracer): """A `Span` created by `FileTracer.span`.""" - def id(self) -> str: - return self.trace_id - - def __init__(self, log_file_path: Path, trace_id: str) -> None: - super().__init__(log_file_path) - self.trace_id = trace_id + def __init__(self, log_file_path: Path, context: Optional[Context] = None) -> None: + PersistentSpan.__init__(self, context=context) + FileTracer.__init__(self, log_file_path=log_file_path) class FileTaskSpan(PersistentTaskSpan, FileSpan): """A `TaskSpan` created by `FileTracer.task_span`.""" - def __init__( - self, - log_file_path: Path, - trace_id: str, - ) -> None: - super().__init__(log_file_path, trace_id) + pass diff --git a/src/intelligence_layer/core/tracer/in_memory_tracer.py 
b/src/intelligence_layer/core/tracer/in_memory_tracer.py index fbfda4b23..534b1b440 100644 --- a/src/intelligence_layer/core/tracer/in_memory_tracer.py +++ b/src/intelligence_layer/core/tracer/in_memory_tracer.py @@ -1,56 +1,52 @@ -import json import os from datetime import datetime -from typing import Optional, Union +from typing import Optional, Sequence, Union from uuid import UUID import requests import rich from pydantic import BaseModel, Field, SerializeAsAny from requests import HTTPError +from rich.panel import Panel +from rich.syntax import Syntax from rich.tree import Tree from intelligence_layer.core.tracer.tracer import ( - EndSpan, - EndTask, - LogEntry, - LogLine, - PlainEntry, + Context, + Event, + ExportedSpan, + ExportedSpanList, + JsonSerializer, PydanticSerializable, Span, - StartSpan, - StartTask, + SpanAttributes, TaskSpan, + TaskSpanAttributes, Tracer, - _render_log_value, utc_now, ) -class InMemoryTracer(BaseModel, Tracer): +class InMemoryTracer(Tracer): """Collects log entries in a nested structure, and keeps them in memory. - If desired, the structure is serializable with Pydantic, so you can write out the JSON - representation to a file, or return via an API, or something similar. - Attributes: - name: A descriptive name of what the tracer contains log entries about. entries: A sequential list of log entries and/or nested InMemoryTracers with their own log entries. 
""" - entries: list[Union[LogEntry, "InMemoryTaskSpan", "InMemorySpan"]] = [] + def __init__(self) -> None: + self.entries: list[Union[LogEntry, "InMemoryTaskSpan", "InMemorySpan"]] = [] def span( self, name: str, timestamp: Optional[datetime] = None, - trace_id: Optional[str] = None, ) -> "InMemorySpan": child = InMemorySpan( name=name, start_timestamp=timestamp or utc_now(), - trace_id=self.ensure_id(trace_id), + context=self.context, ) self.entries.append(child) return child @@ -60,13 +56,12 @@ def task_span( task_name: str, input: PydanticSerializable, timestamp: Optional[datetime] = None, - trace_id: Optional[str] = None, ) -> "InMemoryTaskSpan": child = InMemoryTaskSpan( name=task_name, input=input, start_timestamp=timestamp or utc_now(), - trace_id=self.ensure_id(trace_id), + context=self.context, ) self.entries.append(child) return child @@ -92,7 +87,8 @@ def submit_to_trace_viewer(self) -> bool: trace_viewer_trace_upload = f"{trace_viewer_url}/trace" try: res = requests.post( - trace_viewer_trace_upload, json=json.loads(self.model_dump_json()) + trace_viewer_trace_upload, + json=ExportedSpanList(self.export_for_viewing()).model_dump_json(), ) if res.status_code != 200: raise HTTPError(res.status_code) @@ -106,15 +102,51 @@ def submit_to_trace_viewer(self) -> bool: ) return False + def export_for_viewing(self) -> Sequence[ExportedSpan]: + exported_root_spans: list[ExportedSpan] = [] + for entry in self.entries: + if isinstance(entry, LogEntry): + raise Exception( + "Found a log outside of a span. Logs can only be part of a span." + ) + else: + exported_root_spans.extend(entry.export_for_viewing()) + return exported_root_spans + class InMemorySpan(InMemoryTracer, Span): - name: str - start_timestamp: datetime = Field(default_factory=datetime.utcnow) - end_timestamp: Optional[datetime] = None - trace_id: str + """A span that keeps all important information in memory. + + Attributes: + context: Ids that uniquely describe the span. 
+ parent_id: Id of the parent span. None if the span is a root span. + name: The name of the span. + start_timestamp: The start of the timestamp. + end_timestamp: The end of the timestamp. None until the span is closed. + status_code: The status of the context. + """ - def id(self) -> str: - return self.trace_id + def __init__( + self, + name: str, + context: Optional[Context] = None, + start_timestamp: Optional[datetime] = None, + ) -> None: + """Initializes a span and sets all necessary attributes. + + Args: + name: The name of the span. + context: The parent context. Used to derive the span's context. Defaults to None. + start_timestamp: Custom start time of the span. Defaults to None. + """ + InMemoryTracer.__init__(self) + Span.__init__(self, context=context) + self.parent_id = None if context is None else context.span_id + self.name = name + self.start_timestamp = ( + start_timestamp if start_timestamp is not None else utc_now() + ) + self.end_timestamp: datetime | None = None def log( self, @@ -127,13 +159,14 @@ def log( message=message, value=value, timestamp=timestamp or utc_now(), - trace_id=self.id(), + trace_id=self.context.span_id, ) ) def end(self, timestamp: Optional[datetime] = None) -> None: if not self.end_timestamp: self.end_timestamp = timestamp or utc_now() + super().end(timestamp) def _rich_render_(self) -> Tree: """Renders the trace via classes in the `rich` package""" @@ -144,14 +177,85 @@ def _rich_render_(self) -> Tree: return tree + def _span_attributes(self) -> SpanAttributes: + return SpanAttributes() + + def export_for_viewing(self) -> Sequence[ExportedSpan]: + if not self._closed: + raise RuntimeError( + "Span is not closed. A Span must be closed before it is exported for viewing." 
+ ) + assert self.end_timestamp is not None + + logs: list[LogEntry] = [] + exported_spans: list[ExportedSpan] = [] + for entry in self.entries: + if isinstance(entry, LogEntry): + logs.append(entry) + else: + exported_spans.extend(entry.export_for_viewing()) + exported_spans.append( + ExportedSpan( + context=self.context, + name=self.name, + parent_id=self.parent_id, + start_time=self.start_timestamp, + end_time=self.end_timestamp, + attributes=self._span_attributes(), + events=[ + Event( + name="log", + body=log.value, + message=log.message, + timestamp=log.timestamp, + ) + for log in logs + ], + status=self.status_code, + ) + ) + return exported_spans + class InMemoryTaskSpan(InMemorySpan, TaskSpan): - input: SerializeAsAny[PydanticSerializable] - output: Optional[SerializeAsAny[PydanticSerializable]] = None + """A span of a task that keeps all important information in memory. + + Attributes: + context: Ids that uniquely describe the span. + parent_id: Id of the parent span. None if the span is a root span. + name: The name of the span. + start_timestamp: The start of the timestamp. + end_timestamp: The end of the timestamp. None until the span is closed. + status_code: The status of the context. + input: The input of the task. + output: The output of the task. + """ + + def __init__( + self, + name: str, + input: SerializeAsAny[PydanticSerializable], + context: Optional[Context] = None, + start_timestamp: Optional[datetime] = None, + ) -> None: + """Initializes a task span and sets all necessary attributes. + + Args: + name: The name of the span. + input: The input of a task. Needs to be serializable. + context: The parent context. Used to derive the span's context. Defaults to None. + start_timestamp: Custom start time of the span. Defaults to None. 
+ """ + super().__init__(name=name, context=context, start_timestamp=start_timestamp) + self.input = input + self.output: SerializeAsAny[PydanticSerializable] | None = None def record_output(self, output: PydanticSerializable) -> None: self.output = output + def _span_attributes(self) -> SpanAttributes: + return TaskSpanAttributes(input=self.input, output=self.output) + def _rich_render_(self) -> Tree: """Renders the trace via classes in the `rich` package""" tree = Tree(label=self.name) @@ -166,57 +270,42 @@ def _rich_render_(self) -> Tree: return tree -class TreeBuilder(BaseModel): - root: InMemoryTracer = InMemoryTracer() - tracers: dict[UUID, InMemoryTracer] = Field(default_factory=dict) - tasks: dict[UUID, InMemoryTaskSpan] = Field(default_factory=dict) - spans: dict[UUID, InMemorySpan] = Field(default_factory=dict) +class LogEntry(BaseModel): + """An individual log entry, currently used to represent individual logs by the + `InMemoryTracer`. - def start_task(self, log_line: LogLine) -> None: - start_task = StartTask.model_validate(log_line.entry) - child = InMemoryTaskSpan( - name=start_task.name, - input=start_task.input, - start_timestamp=start_task.start, - trace_id=start_task.trace_id, - ) - self.tracers[start_task.uuid] = child - self.tasks[start_task.uuid] = child - self.tracers.get(start_task.parent, self.root).entries.append(child) - - def end_task(self, log_line: LogLine) -> None: - end_task = EndTask.model_validate(log_line.entry) - task_span = self.tasks[end_task.uuid] - task_span.end_timestamp = end_task.end - task_span.record_output(end_task.output) - - def start_span(self, log_line: LogLine) -> None: - start_span = StartSpan.model_validate(log_line.entry) - child = InMemorySpan( - name=start_span.name, - start_timestamp=start_span.start, - trace_id=start_span.trace_id, - ) - self.tracers[start_span.uuid] = child - self.spans[start_span.uuid] = child - self.tracers.get(start_span.parent, self.root).entries.append(child) - - def end_span(self, 
log_line: LogLine) -> None: - end_span = EndSpan.model_validate(log_line.entry) - span = self.spans[end_span.uuid] - span.end_timestamp = end_span.end - - def plain_entry(self, log_line: LogLine) -> None: - plain_entry = PlainEntry.model_validate(log_line.entry) - entry = LogEntry( - message=plain_entry.message, - value=plain_entry.value, - timestamp=plain_entry.timestamp, - trace_id=plain_entry.trace_id, - ) - self.tracers[plain_entry.parent].entries.append(entry) + Attributes: + message: A description of the value you are logging, such as the step in the task this + is related to. + value: The relevant data you want to log. Can be anything that is serializable by + Pydantic, which gives the tracers flexibility in how they store and emit the logs. + timestamp: The time that the log was emitted. + id: The ID of the trace to which this log entry belongs. + """ + + message: str + value: SerializeAsAny[PydanticSerializable] + timestamp: datetime = Field(default_factory=datetime.utcnow) + trace_id: UUID + + def _rich_render_(self) -> Panel: + """Renders the trace via classes in the `rich` package""" + return _render_log_value(self.value, self.message) + + def _ipython_display_(self) -> None: + """Default rendering for Jupyter notebooks""" + from rich import print + + print(self._rich_render_()) -# Required for sphinx, see also: https://docs.pydantic.dev/2.4/errors/usage_errors/#class-not-fully-defined -InMemorySpan.model_rebuild() -InMemoryTracer.model_rebuild() +def _render_log_value(value: PydanticSerializable, title: str) -> Panel: + value = value if isinstance(value, BaseModel) else JsonSerializer(root=value) + return Panel( + Syntax( + value.model_dump_json(indent=2, exclude_defaults=True), + "json", + word_wrap=True, + ), + title=title, + ) diff --git a/src/intelligence_layer/core/tracer/open_telemetry_tracer.py b/src/intelligence_layer/core/tracer/open_telemetry_tracer.py index 82da828c5..f0ee03089 100644 --- 
a/src/intelligence_layer/core/tracer/open_telemetry_tracer.py +++ b/src/intelligence_layer/core/tracer/open_telemetry_tracer.py @@ -1,17 +1,23 @@ from datetime import datetime -from typing import Optional +from typing import Optional, Sequence from opentelemetry.context import attach, detach from opentelemetry.trace import Span as OpenTSpan +from opentelemetry.trace import StatusCode from opentelemetry.trace import Tracer as OpenTTracer from opentelemetry.trace import set_span_in_context +from pydantic import BaseModel, SerializeAsAny from intelligence_layer.core.tracer.tracer import ( + Context, + ExportedSpan, + JsonSerializer, PydanticSerializable, Span, + SpanStatus, + SpanType, TaskSpan, Tracer, - _serialize, ) @@ -25,33 +31,33 @@ def span( self, name: str, timestamp: Optional[datetime] = None, - trace_id: Optional[str] = None, ) -> "OpenTelemetrySpan": - trace_id = self.ensure_id(trace_id) tracer_span = self._tracer.start_span( name, - attributes={"trace_id": trace_id}, + attributes={"type": SpanType.SPAN.value}, start_time=None if not timestamp else _open_telemetry_timestamp(timestamp), ) token = attach(set_span_in_context(tracer_span)) - return OpenTelemetrySpan(tracer_span, self._tracer, token, trace_id) + return OpenTelemetrySpan(tracer_span, self._tracer, token, self.context) def task_span( self, task_name: str, input: PydanticSerializable, timestamp: Optional[datetime] = None, - trace_id: Optional[str] = None, ) -> "OpenTelemetryTaskSpan": - trace_id = self.ensure_id(trace_id) - tracer_span = self._tracer.start_span( task_name, - attributes={"input": _serialize(input), "trace_id": trace_id}, + attributes={"input": _serialize(input), "type": SpanType.TASK_SPAN.value}, start_time=None if not timestamp else _open_telemetry_timestamp(timestamp), ) token = attach(set_span_in_context(tracer_span)) - return OpenTelemetryTaskSpan(tracer_span, self._tracer, token, trace_id) + return OpenTelemetryTaskSpan(tracer_span, self._tracer, token, self.context) + + def 
export_for_viewing(self) -> Sequence[ExportedSpan]: + raise NotImplementedError( + "The OpenTelemetryTracer does not support export for viewing, as it can not access its own traces." + ) class OpenTelemetrySpan(Span, OpenTelemetryTracer): @@ -59,16 +65,17 @@ class OpenTelemetrySpan(Span, OpenTelemetryTracer): end_timestamp: Optional[datetime] = None - def id(self) -> str: - return self._trace_id - def __init__( - self, span: OpenTSpan, tracer: OpenTTracer, token: object, trace_id: str + self, + span: OpenTSpan, + tracer: OpenTTracer, + token: object, + context: Optional[Context] = None, ) -> None: - super().__init__(tracer) + OpenTelemetryTracer.__init__(self, tracer) + Span.__init__(self, context=context) self.open_ts_span = span self._token = token - self._trace_id = trace_id def log( self, @@ -78,11 +85,15 @@ def log( ) -> None: self.open_ts_span.add_event( message, - {"value": _serialize(value), "trace_id": self.id()}, + {"value": _serialize(value)}, None if not timestamp else _open_telemetry_timestamp(timestamp), ) def end(self, timestamp: Optional[datetime] = None) -> None: + super().end(timestamp) + self.open_ts_span.set_status( + StatusCode.OK if self.status_code == SpanStatus.OK else StatusCode.ERROR + ) detach(self._token) self.open_ts_span.end( _open_telemetry_timestamp(timestamp) if timestamp is not None else None @@ -94,11 +105,6 @@ class OpenTelemetryTaskSpan(TaskSpan, OpenTelemetrySpan): output: Optional[PydanticSerializable] = None - def __init__( - self, span: OpenTSpan, tracer: OpenTTracer, token: object, trace_id: str - ) -> None: - super().__init__(span, tracer, token, trace_id) - def record_output(self, output: PydanticSerializable) -> None: self.open_ts_span.set_attribute("output", _serialize(output)) @@ -107,3 +113,8 @@ def _open_telemetry_timestamp(t: datetime) -> int: # Open telemetry expects *nanoseconds* since epoch t_float = t.timestamp() * 1e9 return int(t_float) + + +def _serialize(s: SerializeAsAny[PydanticSerializable]) -> str: + 
value = s if isinstance(s, BaseModel) else JsonSerializer(root=s) + return value.model_dump_json() diff --git a/src/intelligence_layer/core/tracer/persistent_tracer.py b/src/intelligence_layer/core/tracer/persistent_tracer.py index f836bc24a..9597229f8 100644 --- a/src/intelligence_layer/core/tracer/persistent_tracer.py +++ b/src/intelligence_layer/core/tracer/persistent_tracer.py @@ -1,49 +1,74 @@ from abc import ABC, abstractmethod from datetime import datetime -from typing import Iterable, Optional -from uuid import uuid4 +from typing import Iterable, Optional, Sequence +from uuid import UUID, uuid4 -from pydantic import BaseModel +from pydantic import BaseModel, SerializeAsAny -from intelligence_layer.core.tracer.in_memory_tracer import InMemoryTracer, TreeBuilder +from intelligence_layer.core.tracer.in_memory_tracer import ( + InMemorySpan, + InMemoryTaskSpan, + InMemoryTracer, + LogEntry, +) from intelligence_layer.core.tracer.tracer import ( - EndSpan, - EndTask, - LogLine, - PlainEntry, + Context, + ExportedSpan, PydanticSerializable, Span, - StartSpan, - StartTask, + SpanStatus, TaskSpan, Tracer, utc_now, ) +class LogLine(BaseModel): + """Represents a complete log-line. + + Attributes: + entry_type: The type of the entry. This is the class-name of one of the classes + representing a log-entry (e.g. "StartTask"). + entry: The actual entry. + + """ + + trace_id: UUID + entry_type: str + entry: SerializeAsAny[PydanticSerializable] + + class PersistentTracer(Tracer, ABC): def __init__(self) -> None: - self.uuid = uuid4() + self.current_id = uuid4() @abstractmethod - def _log_entry(self, id: str, entry: BaseModel) -> None: + def _log_entry(self, id: UUID, entry: BaseModel) -> None: pass @abstractmethod - def trace(self, trace_id: str) -> InMemoryTracer: + def traces(self) -> InMemoryTracer: + """Returns all traces of the given tracer. + + Returns: + An InMemoryTracer that contains all traces of the tracer. 
+ """ pass + def export_for_viewing(self) -> Sequence[ExportedSpan]: + return self.traces().export_for_viewing() + def _log_span( self, span: "PersistentSpan", name: str, timestamp: Optional[datetime] = None ) -> None: self._log_entry( - span.id(), + span.context.trace_id, StartSpan( - uuid=span.uuid, - parent=self.uuid, + uuid=span.context.span_id, + parent=self.context.span_id if self.context else span.context.trace_id, name=name, start=timestamp or utc_now(), - trace_id=span.id(), + trace_id=span.context.trace_id, ), ) @@ -56,26 +81,30 @@ def _log_task( ) -> None: try: self._log_entry( - task_span.id(), + task_span.context.trace_id, StartTask( - uuid=task_span.uuid, - parent=self.uuid, + uuid=task_span.context.span_id, + parent=self.context.span_id + if self.context + else task_span.context.trace_id, name=task_name, start=timestamp or utc_now(), input=input, - trace_id=task_span.id(), + trace_id=task_span.context.trace_id, ), ) except TracerLogEntryFailed as error: self._log_entry( - task_span.id(), + task_span.context.trace_id, StartTask( - uuid=task_span.uuid, - parent=self.uuid, + uuid=task_span.context.span_id, + parent=self.context.span_id + if self.context + else task_span.context.trace_id, name=task_name, start=timestamp or utc_now(), input=error.description, - trace_id=task_span.id(), + trace_id=task_span.context.trace_id, ), ) @@ -109,31 +138,38 @@ def log( ) -> None: try: self._log_entry( - self.id(), + self.context.trace_id, PlainEntry( message=message, value=value, timestamp=timestamp or utc_now(), - parent=self.uuid, - trace_id=self.id(), + parent=self.context.span_id, + trace_id=self.context.trace_id, ), ) except TracerLogEntryFailed as error: self._log_entry( - self.id(), + self.context.trace_id, PlainEntry( message="log entry failed", value=error.description, timestamp=timestamp or utc_now(), - parent=self.uuid, - trace_id=self.id(), + parent=self.context.span_id, + trace_id=self.context.trace_id, ), ) def end(self, timestamp: Optional[datetime] 
= None) -> None: if not self.end_timestamp: self.end_timestamp = timestamp or utc_now() - self._log_entry(self.id(), EndSpan(uuid=self.uuid, end=self.end_timestamp)) + self._log_entry( + self.context.trace_id, + EndSpan( + uuid=self.context.span_id, + end=self.end_timestamp, + status_code=self.status_code, + ), + ) class PersistentTaskSpan(TaskSpan, PersistentSpan, ABC): @@ -146,8 +182,13 @@ def end(self, timestamp: Optional[datetime] = None) -> None: if not self.end_timestamp: self.end_timestamp = timestamp or utc_now() self._log_entry( - self.id(), - EndTask(uuid=self.uuid, end=self.end_timestamp, output=self.output), + self.context.trace_id, + EndTask( + uuid=self.context.span_id, + end=self.end_timestamp, + output=self.output, + status_code=self.status_code, + ), ) @@ -157,3 +198,158 @@ def __init__(self, error_message: str, id: str) -> None: f"Log entry with id {id} failed with error message {error_message}." ) self.description = error_message + + +class StartTask(BaseModel): + """Represents the payload/entry of a log-line indicating that a `TaskSpan` was opened through `Tracer.task_span`. + + Attributes: + uuid: A unique id for the opened `TaskSpan`. + parent: The unique id of the parent element of opened `TaskSpan`. + This could refer to either a surrounding `TaskSpan`, `Span` or the top-level `Tracer`. + name: The name of the task. + start: The timestamp when this `Task` was started (i.e. `run` was called). + input: The `Input` (i.e. parameter for `run`) the `Task` was started with. + trace_id: The trace id of the opened `TaskSpan`. + + """ + + uuid: UUID + parent: UUID + name: str + start: datetime + input: SerializeAsAny[PydanticSerializable] + trace_id: UUID + + +class EndTask(BaseModel): + """Represents the payload/entry of a log-line that indicates that a `TaskSpan` ended (i.e. the context-manager exited). + + Attributes: + uuid: The uuid of the corresponding `StartTask`. + end: the timestamp when this `Task` completed (i.e. `run` returned). 
+ output: the `Output` (i.e. return value of `run`) the `Task` returned. + """ + + uuid: UUID + end: datetime + output: SerializeAsAny[PydanticSerializable] + status_code: SpanStatus = SpanStatus.OK + + +class StartSpan(BaseModel): + """Represents the payload/entry of a log-line indicating that a `Span` was opened through `Tracer.span`. + + Attributes: + uuid: A unique id for the opened `Span`. + parent: The unique id of the parent element of opened `TaskSpan`. + This could refer to either a surrounding `TaskSpan`, `Span` or the top-level `Tracer`. + name: The name of the task. + start: The timestamp when this `Span` was started. + trace_id: The ID of the trace this span belongs to. + """ + + uuid: UUID + parent: UUID + name: str + start: datetime + trace_id: UUID + + +class EndSpan(BaseModel): + """Represents the payload/entry of a log-line that indicates that a `Span` ended. + + Attributes: + uuid: The uuid of the corresponding `StartSpan`. + end: the timestamp when this `Span` completed. + """ + + uuid: UUID + end: datetime + status_code: SpanStatus = SpanStatus.OK + + +class PlainEntry(BaseModel): + """Represents a plain log-entry created through `Tracer.log`. + + Attributes: + message: the message-parameter of `Tracer.log` + value: the value-parameter of `Tracer.log` + timestamp: the timestamp when `Tracer.log` was called. + parent: The unique id of the parent element of the log. + This could refer to either a surrounding `TaskSpan`, `Span` or the top-level `Tracer`. + trace_id: The ID of the trace this entry belongs to. 
+ """ + + message: str + value: SerializeAsAny[PydanticSerializable] + timestamp: datetime + parent: UUID + trace_id: UUID + + +class TreeBuilder: + def __init__(self) -> None: + self.root = InMemoryTracer() + self.tracers: dict[UUID, InMemoryTracer] = dict() + self.tasks: dict[UUID, InMemoryTaskSpan] = dict() + self.spans: dict[UUID, InMemorySpan] = dict() + + def start_task(self, log_line: LogLine) -> None: + start_task = StartTask.model_validate(log_line.entry) + converted_span = InMemoryTaskSpan( + name=start_task.name, + input=start_task.input, + start_timestamp=start_task.start, + context=Context(trace_id=start_task.trace_id, span_id=start_task.parent) + if start_task.trace_id != start_task.uuid + else None, + ) + # if root, also change the trace id + if converted_span.context.trace_id == converted_span.context.span_id: + converted_span.context.trace_id = start_task.uuid + converted_span.context.span_id = start_task.uuid + self.tracers.get(start_task.parent, self.root).entries.append(converted_span) + self.tracers[start_task.uuid] = converted_span + self.tasks[start_task.uuid] = converted_span + + def end_task(self, log_line: LogLine) -> None: + end_task = EndTask.model_validate(log_line.entry) + task_span = self.tasks[end_task.uuid] + task_span.record_output(end_task.output) + task_span.status_code = end_task.status_code + task_span.end(end_task.end) + + def start_span(self, log_line: LogLine) -> None: + start_span = StartSpan.model_validate(log_line.entry) + converted_span = InMemorySpan( + name=start_span.name, + start_timestamp=start_span.start, + context=Context(trace_id=start_span.trace_id, span_id=start_span.parent) + if start_span.trace_id != start_span.uuid + else None, + ) + # if root, also change the trace id + if converted_span.context.trace_id == converted_span.context.span_id: + converted_span.context.trace_id = start_span.uuid + converted_span.context.span_id = start_span.uuid + + self.tracers.get(start_span.parent, 
self.root).entries.append(converted_span) + self.tracers[start_span.uuid] = converted_span + self.spans[start_span.uuid] = converted_span + + def end_span(self, log_line: LogLine) -> None: + end_span = EndSpan.model_validate(log_line.entry) + span = self.spans[end_span.uuid] + span.status_code = end_span.status_code + span.end(end_span.end) + + def plain_entry(self, log_line: LogLine) -> None: + plain_entry = PlainEntry.model_validate(log_line.entry) + entry = LogEntry( + message=plain_entry.message, + value=plain_entry.value, + timestamp=plain_entry.timestamp, + trace_id=plain_entry.trace_id, + ) + self.tracers[plain_entry.parent].entries.append(entry) diff --git a/src/intelligence_layer/core/tracer/tracer.py b/src/intelligence_layer/core/tracer/tracer.py index b4654d21c..f97a6c9e4 100644 --- a/src/intelligence_layer/core/tracer/tracer.py +++ b/src/intelligence_layer/core/tracer/tracer.py @@ -2,13 +2,12 @@ from abc import ABC, abstractmethod from contextlib import AbstractContextManager from datetime import datetime, timezone +from enum import Enum from types import TracebackType -from typing import TYPE_CHECKING, Mapping, Optional, Sequence, TypeVar +from typing import TYPE_CHECKING, Mapping, Optional, Sequence from uuid import UUID, uuid4 from pydantic import BaseModel, Field, RootModel, SerializeAsAny -from rich.panel import Panel -from rich.syntax import Syntax from typing_extensions import Self, TypeAliasType if TYPE_CHECKING: @@ -50,6 +49,53 @@ def utc_now() -> datetime: return datetime.now(timezone.utc) +class Event(BaseModel): + name: str + message: str + body: SerializeAsAny[PydanticSerializable] + timestamp: datetime = Field(default_factory=utc_now) + + +class SpanType(Enum): + SPAN = "SPAN" + TASK_SPAN = "TASK_SPAN" + + +class SpanAttributes(BaseModel): + type: SpanType = SpanType.SPAN + + +class TaskSpanAttributes(SpanAttributes): + type: SpanType = SpanType.TASK_SPAN + input: SerializeAsAny[PydanticSerializable] + output: 
SerializeAsAny[PydanticSerializable] + + +class SpanStatus(Enum): + OK = "OK" + ERROR = "ERROR" + + +class Context(BaseModel): + trace_id: UUID + span_id: UUID + + +class ExportedSpan(BaseModel): + context: Context + name: str | None + parent_id: UUID | None + start_time: datetime + end_time: datetime + attributes: SpanAttributes + events: Sequence[Event] + status: SpanStatus + # we ignore the links concept + + +ExportedSpanList = RootModel[Sequence[ExportedSpan]] + + class Tracer(ABC): """Provides a consistent way to instrument a :class:`Task` with logging for each step of the workflow. @@ -61,12 +107,13 @@ class Tracer(ABC): documentation of each implementation to see how to use the resulting tracer. """ + context: Context | None = None + @abstractmethod def span( self, name: str, timestamp: Optional[datetime] = None, - trace_id: Optional[str] = None, ) -> "Span": """Generate a span from the current span or logging instance. @@ -79,8 +126,7 @@ def span( Args: name: A descriptive name of what this span will contain logs about. - timestamp: optional override of the starting timestamp. Otherwise should default to now. - trace_id: optional override of a trace id. Otherwise it creates a new default id. + timestamp: Override of the starting timestamp. Defaults to call time. Returns: An instance of a Span. @@ -93,7 +139,6 @@ def task_span( task_name: str, input: PydanticSerializable, timestamp: Optional[datetime] = None, - trace_id: Optional[str] = None, ) -> "TaskSpan": """Generate a task-specific span from the current span or logging instance. @@ -106,25 +151,24 @@ def task_span( Args: task_name: The name of the task that is being logged input: The input for the task that is being logged. - timestamp: optional override of the starting timestamp. Otherwise should default to now. - trace_id: optional override of a trace id. Otherwise it creates a new default id. - + timestamp: Override of the starting timestamp. Defaults to call time. Returns: An instance of a TaskSpan. 
""" ... - def ensure_id(self, id: Optional[str]) -> str: - """Returns a valid id for tracing. + @abstractmethod + def export_for_viewing(self) -> Sequence[ExportedSpan]: + """Converts the trace to a format that can be read by the trace viewer. + + The format is inspired by the OpenTelemetry Format, but does not abide by it. + Specifically, it cuts away unused concepts, such as links. - Args: - id: current id to use if present. Returns: - `id` if present, otherwise a new unique ID. + A list of spans which includes the current span and all its child spans. """ - - return id if id is not None else str(uuid4()) + ... class ErrorValue(BaseModel): @@ -138,15 +182,36 @@ class Span(Tracer, AbstractContextManager["Span"]): Logs and other spans can be nested underneath. - Can also be used as a Context Manager to easily capture the start and end time, and keep the - span only in scope while it is active. + Can also be used as a context manager to easily capture the start and end time, and keep the + span only in scope while it is open. + + Attributes: + context: The context of the current span. If the span is a root span, the trace id will be equal to its span id. + status_code: Status of the span. Will be "OK" unless the span was interrupted by an exception. """ - @abstractmethod - def id(self) -> str: - pass + context: Context + + def __init__(self, context: Optional[Context] = None): + """Creates a span from the context of its parent. + + Initializes the spans `context` based on the parent context and its `status_code`. + + Args: + context: Context of the parent. Defaults to None. 
+ """ + span_id = uuid4() + if context is None: + trace_id = span_id + else: + trace_id = context.trace_id + self.context = Context(trace_id=trace_id, span_id=span_id) + self.status_code = SpanStatus.OK + self._closed = False def __enter__(self) -> Self: + if self._closed: + raise ValueError("Spans cannot be opened once they have been closed.") return self @abstractmethod @@ -161,6 +226,8 @@ def log( By default, the `Input` and `Output` of each :class:`Task` are logged automatically, but you can log anything else that seems relevant to understanding the process of a given task. + Logging to closed spans is undefined behavior. + Args: message: A description of the value you are logging, such as the step in the task this is related to. @@ -172,23 +239,15 @@ def log( @abstractmethod def end(self, timestamp: Optional[datetime] = None) -> None: - """Marks the Span as done, with the end time of the span. The Span should be regarded + """Marks the Span as closed, with the end time of the span. The Span should be regarded as complete, and no further logging should happen with it. - Args: - timestamp: Optional override of the timestamp, otherwise should be set to now. - """ - ... - - def ensure_id(self, id: str | None) -> str: - """Returns a valid id for tracing. + Ending a closed span in undefined behavior. Args: - id: current id to use if present. - Returns: - `id` if present, otherwise id of this `Span` + timestamp: Optional override of the timestamp. Defaults to call time. """ - return id if id is not None else self.id() + self._closed = True def __exit__( self, @@ -203,7 +262,9 @@ def __exit__( stack_trace=str(traceback.format_exc()), ) self.log(error_value.message, error_value) + self.status_code = SpanStatus.ERROR self.end() + self._closed = True class TaskSpan(Span): @@ -220,9 +281,6 @@ def record_output(self, output: PydanticSerializable) -> None: """Record :class:`Task` output. 
Since a Context Manager can't provide this in the `__exit__` method, output should be captured once it is generated. - This should be handled automatically within the execution of the task, and it is - unlikely this would be called directly by you. - Args: output: The output of the task that is being logged. """ @@ -244,11 +302,6 @@ def __exit__( self.end() -TracerVar = TypeVar("TracerVar", bound=Tracer) - -SpanVar = TypeVar("SpanVar", bound=Span) - - class NoOpTracer(TaskSpan): """A no-op tracer. @@ -273,7 +326,6 @@ def span( self, name: str, timestamp: Optional[datetime] = None, - trace_id: Optional[str] = None, ) -> "NoOpTracer": return self @@ -282,7 +334,6 @@ def task_span( task_name: str, input: PydanticSerializable, timestamp: Optional[datetime] = None, - trace_id: Optional[str] = None, ) -> "NoOpTracer": return self @@ -292,156 +343,9 @@ def record_output(self, output: PydanticSerializable) -> None: def end(self, timestamp: Optional[datetime] = None) -> None: pass + def export_for_viewing(self) -> Sequence[ExportedSpan]: + return [] + class JsonSerializer(RootModel[PydanticSerializable]): root: SerializeAsAny[PydanticSerializable] - - -def _render_log_value(value: PydanticSerializable, title: str) -> Panel: - value = value if isinstance(value, BaseModel) else JsonSerializer(root=value) - return Panel( - Syntax( - value.model_dump_json(indent=2, exclude_defaults=True), - "json", - word_wrap=True, - ), - title=title, - ) - - -class LogEntry(BaseModel): - """An individual log entry, currently used to represent individual logs by the - `InMemoryTracer`. - - Attributes: - message: A description of the value you are logging, such as the step in the task this - is related to. - value: The relevant data you want to log. Can be anything that is serializable by - Pydantic, which gives the tracers flexibility in how they store and emit the logs. - timestamp: The time that the log was emitted. - id: The ID of the trace to which this log entry belongs. 
- """ - - message: str - value: SerializeAsAny[PydanticSerializable] - timestamp: datetime = Field(default_factory=datetime.utcnow) - trace_id: str - - def id(self) -> str: - return self.trace_id - - def _rich_render_(self) -> Panel: - """Renders the trace via classes in the `rich` package""" - return _render_log_value(self.value, self.message) - - def _ipython_display_(self) -> None: - """Default rendering for Jupyter notebooks""" - from rich import print - - print(self._rich_render_()) - - -class StartTask(BaseModel): - """Represents the payload/entry of a log-line indicating that a `TaskSpan` was opened through `Tracer.task_span`. - - Attributes: - uuid: A unique id for the opened `TaskSpan`. - parent: The unique id of the parent element of opened `TaskSpan`. - This could refer to either a surrounding `TaskSpan`, `Span` or the top-level `Tracer`. - name: The name of the task. - start: The timestamp when this `Task` was started (i.e. `run` was called). - input: The `Input` (i.e. parameter for `run`) the `Task` was started with. - trace_id: The trace id of the opened `TaskSpan`. - - """ - - uuid: UUID - parent: UUID - name: str - start: datetime - input: SerializeAsAny[PydanticSerializable] - trace_id: str - - -class EndTask(BaseModel): - """Represents the payload/entry of a log-line that indicates that a `TaskSpan` ended (i.e. the context-manager exited). - - Attributes: - uuid: The uuid of the corresponding `StartTask`. - end: the timestamp when this `Task` completed (i.e. `run` returned). - output: the `Output` (i.e. return value of `run`) the `Task` returned. - """ - - uuid: UUID - end: datetime - output: SerializeAsAny[PydanticSerializable] - - -class StartSpan(BaseModel): - """Represents the payload/entry of a log-line indicating that a `Span` was opened through `Tracer.span`. - - Attributes: - uuid: A unique id for the opened `Span`. - parent: The unique id of the parent element of opened `TaskSpan`. 
- This could refer to either a surrounding `TaskSpan`, `Span` or the top-level `Tracer`. - name: The name of the task. - start: The timestamp when this `Span` was started. - trace_id: The ID of the trace this span belongs to. - """ - - uuid: UUID - parent: UUID - name: str - start: datetime - trace_id: str - - -class EndSpan(BaseModel): - """Represents the payload/entry of a log-line that indicates that a `Span` ended. - - Attributes: - uuid: The uuid of the corresponding `StartSpan`. - end: the timestamp when this `Span` completed. - """ - - uuid: UUID - end: datetime - - -class PlainEntry(BaseModel): - """Represents a plain log-entry created through `Tracer.log`. - - Attributes: - message: the message-parameter of `Tracer.log` - value: the value-parameter of `Tracer.log` - timestamp: the timestamp when `Tracer.log` was called. - parent: The unique id of the parent element of the log. - This could refer to either a surrounding `TaskSpan`, `Span` or the top-level `Tracer`. - trace_id: The ID of the trace this entry belongs to. - """ - - message: str - value: SerializeAsAny[PydanticSerializable] - timestamp: datetime - parent: UUID - trace_id: str - - -class LogLine(BaseModel): - """Represents a complete log-line. - - Attributes: - entry_type: The type of the entry. This is the class-name of one of the classes - representing a log-entry (e.g. "StartTask"). - entry: The actual entry. 
- - """ - - trace_id: str - entry_type: str - entry: SerializeAsAny[PydanticSerializable] - - -def _serialize(s: SerializeAsAny[PydanticSerializable]) -> str: - value = s if isinstance(s, BaseModel) else JsonSerializer(root=s) - return value.model_dump_json() diff --git a/src/intelligence_layer/evaluation/run/file_run_repository.py b/src/intelligence_layer/evaluation/run/file_run_repository.py index 454f42a04..0a201c4e2 100644 --- a/src/intelligence_layer/evaluation/run/file_run_repository.py +++ b/src/intelligence_layer/evaluation/run/file_run_repository.py @@ -121,7 +121,7 @@ def _example_trace_path(self, run_id: str, example_id: str) -> Path: @staticmethod def _parse_log(log_path: Path) -> InMemoryTracer: - return FileTracer(log_path).trace() + return FileTracer(log_path).traces() def _example_output_path(self, run_id: str, example_id: str) -> Path: return (self._run_output_directory(run_id) / example_id).with_suffix(".json") diff --git a/tests/core/test_task.py b/tests/core/test_task.py index bdc2fa4e8..34629181c 100644 --- a/tests/core/test_task.py +++ b/tests/core/test_task.py @@ -4,8 +4,13 @@ from time import sleep from typing import Callable -from intelligence_layer.core import InMemorySpan, InMemoryTracer, NoOpTracer, TaskSpan -from intelligence_layer.core.task import MAX_CONCURRENCY, Task +from intelligence_layer.core import ( + MAX_CONCURRENCY, + InMemoryTracer, + NoOpTracer, + Task, + TaskSpan, +) class ConcurrencyCounter(Task[None, None]): @@ -111,21 +116,3 @@ def test_sub_tasks_do_not_introduce_multiple_task_spans() -> None: assert isinstance(tracer.entries[0], TaskSpan) assert tracer.entries[0].entries assert not isinstance(tracer.entries[0].entries[0], TaskSpan) - - -def test_ids_are_set_in_concurrent_run() -> None: - tracer = InMemoryTracer() - task = DeadlockDetector() - - task.run_concurrently([None] * MAX_CONCURRENCY, tracer, trace_id="ID") - assert tracer.entries - assert tracer.entries[0].id() == "ID" - - -def 
test_ids_are_equal_for_multiple_subtasks() -> None: - tracer = InMemoryTracer() - NestedTask().run(None, tracer, "ID") - assert isinstance(tracer.entries[0], InMemorySpan) - assert tracer.entries[0].id() == "ID" - assert isinstance(tracer.entries[0].entries[0], InMemorySpan) - assert tracer.entries[0].entries[0].id() == "ID" diff --git a/tests/core/test_tracer.py b/tests/core/test_tracer.py deleted file mode 100644 index a4f82891c..000000000 --- a/tests/core/test_tracer.py +++ /dev/null @@ -1,392 +0,0 @@ -import contextlib -import json -import os -import time -from pathlib import Path -from typing import Any, Iterator, Optional -from unittest.mock import Mock - -import pytest -import requests -from aleph_alpha_client import Prompt -from opentelemetry import trace -from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter -from opentelemetry.sdk.resources import SERVICE_NAME, Resource -from opentelemetry.sdk.trace import TracerProvider -from opentelemetry.sdk.trace.export import BatchSpanProcessor -from pytest import fixture - -from intelligence_layer.core import ( - CompleteInput, - CompleteOutput, - CompositeTracer, - FileTracer, - InMemorySpan, - InMemoryTaskSpan, - InMemoryTracer, - LogEntry, - LuminousControlModel, - OpenTelemetryTracer, - Task, - TaskSpan, - utc_now, -) -from intelligence_layer.core.tracer.persistent_tracer import TracerLogEntryFailed -from intelligence_layer.core.tracer.tracer import ErrorValue - - -@fixture -def complete( - luminous_control_model: LuminousControlModel, -) -> Task[CompleteInput, CompleteOutput]: - return luminous_control_model.complete_task() - - -@fixture -def open_telemetry_tracer() -> tuple[str, OpenTelemetryTracer]: - service_name = "test-service" - url = "http://localhost:16686/api/traces?service=" + service_name - resource = Resource.create({SERVICE_NAME: service_name}) - provider = TracerProvider(resource=resource) - trace.set_tracer_provider(provider) - processor = 
BatchSpanProcessor(OTLPSpanExporter()) - provider.add_span_processor(processor) - openTracer = OpenTelemetryTracer(trace.get_tracer("intelligence-layer")) - return (url, openTracer) - - -def test_composite_tracer_id_consistent_across_children( - file_tracer: FileTracer, -) -> None: - input = "input" - tracer1 = InMemoryTracer() - - TestTask().run(input, CompositeTracer([tracer1])) - assert isinstance(tracer1.entries[0], InMemorySpan) - assert tracer1.entries[0].id() == tracer1.entries[0].entries[0].id() - - -def test_tracer_id_exists_for_all_children_of_task_span() -> None: - tracer = InMemoryTracer() - parent_span = tracer.task_span("child", "input", trace_id="ID") - parent_span.span("child2") - - assert isinstance(tracer.entries[0], InMemorySpan) - assert tracer.entries[0].id() == "ID" - - assert tracer.entries[0].entries[0].id() == tracer.entries[0].id() - - parent_span.task_span("child3", "input") - assert tracer.entries[0].entries[1].id() == tracer.entries[0].id() - - -def test_tracer_id_exists_for_all_children_of_span() -> None: - tracer = InMemoryTracer() - parent_span = tracer.span("child", trace_id="ID") - parent_span.span("child2") - - assert isinstance(tracer.entries[0], InMemorySpan) - assert tracer.entries[0].id() == "ID" - assert tracer.entries[0].entries[0].id() == tracer.entries[0].id() - - parent_span.task_span("child3", "input") - assert tracer.entries[0].entries[1].id() == tracer.entries[0].id() - - -def test_can_add_child_tracer() -> None: - tracer = InMemoryTracer() - tracer.span("child") - - assert len(tracer.entries) == 1 - - log = tracer.entries[0] - assert isinstance(log, InMemoryTracer) - assert log.name == "child" - assert len(log.entries) == 0 - - -def test_can_add_parent_and_child_entries() -> None: - parent = InMemoryTracer() - with parent.span("child") as child: - child.log("Two", 2) - - assert isinstance(parent.entries[0], InMemoryTracer) - assert isinstance(parent.entries[0].entries[0], LogEntry) - - -def 
test_task_logs_error_value() -> None: - tracer = InMemoryTracer() - - with pytest.raises(ValueError): - with tracer.span("failing task"): - raise ValueError("my bad, sorry") - - assert isinstance(tracer.entries[0], InMemorySpan) - assert isinstance(tracer.entries[0].entries[0], LogEntry) - error = tracer.entries[0].entries[0].value - assert isinstance(error, ErrorValue) - assert error.message == "my bad, sorry" - assert error.error_type == "ValueError" - assert error.stack_trace.startswith("Traceback") - - -def test_task_span_records_error_value() -> None: - tracer = InMemoryTracer() - - with pytest.raises(ValueError): - with tracer.task_span("failing task", None): - raise ValueError("my bad, sorry") - - assert isinstance(tracer.entries[0], InMemoryTaskSpan) - error = tracer.entries[0].output - assert isinstance(error, ErrorValue) - assert error.message == "my bad, sorry" - assert error.error_type == "ValueError" - assert error.stack_trace.startswith("Traceback") - - -def test_task_automatically_logs_input_and_output( - complete: Task[CompleteInput, CompleteOutput], -) -> None: - tracer = InMemoryTracer() - input = CompleteInput(prompt=Prompt.from_text("test")) - output = complete.run(input=input, tracer=tracer) - - assert len(tracer.entries) == 1 - task_span = tracer.entries[0] - assert isinstance(task_span, InMemoryTaskSpan) - assert task_span.name == type(complete).__name__ - assert task_span.input == input - assert task_span.output == output - assert task_span.start_timestamp and task_span.end_timestamp - assert task_span.start_timestamp < task_span.end_timestamp - - -def test_tracer_can_set_custom_start_time_for_log_entry() -> None: - tracer = InMemoryTracer() - timestamp = utc_now() - - with tracer.span("span") as span: - span.log("log", "message", timestamp) - - assert isinstance(tracer.entries[0], InMemorySpan) - assert isinstance(tracer.entries[0].entries[0], LogEntry) - assert tracer.entries[0].entries[0].timestamp == timestamp - - -def 
test_tracer_can_set_custom_start_time_for_span() -> None: - tracer = InMemoryTracer() - start = utc_now() - - span = tracer.span("span", start) - - assert span.start_timestamp == start - - -def test_span_sets_end_timestamp() -> None: - tracer = InMemoryTracer() - start = utc_now() - - span = tracer.span("span", start) - span.end() - - assert span.end_timestamp and span.start_timestamp <= span.end_timestamp - - -def test_span_only_updates_end_timestamp_once() -> None: - tracer = InMemoryTracer() - - span = tracer.span("span") - end = utc_now() - span.end(end) - span.end() - - assert span.end_timestamp == end - - -def test_composite_tracer(complete: Task[CompleteInput, CompleteOutput]) -> None: - tracer1 = InMemoryTracer() - tracer2 = InMemoryTracer() - input = CompleteInput(prompt=Prompt.from_text("test")) - complete.run(input=input, tracer=CompositeTracer([tracer1, tracer2])) - - assert tracer1 == tracer2 - - -class TestSubTask(Task[None, None]): - def do_run(self, input: None, task_span: TaskSpan) -> None: - task_span.log("subtask", "value") - - -class TestTask(Task[str, str]): - sub_task = TestSubTask() - - def do_run(self, input: str, task_span: TaskSpan) -> str: - with task_span.span("span") as sub_span: - sub_span.log("message", "a value") - self.sub_task.run(None, sub_span) - self.sub_task.run(None, task_span) - - return "output" - - -@fixture -def file_tracer(tmp_path: Path) -> FileTracer: - return FileTracer(tmp_path / "log.log") - - -def test_file_tracer(file_tracer: FileTracer) -> None: - input = "input" - expected = InMemoryTracer() - - TestTask().run(input, CompositeTracer([expected, file_tracer])) - - log_tree = file_tracer.trace() - assert log_tree == expected - - -def test_file_tracer_retrieves_correct_trace(file_tracer: FileTracer) -> None: - input = "input" - expected = InMemoryTracer() - compositeTracer = CompositeTracer([expected, file_tracer]) - TestTask().run(input, compositeTracer, "ID1") - TestTask().run(input, file_tracer, "ID2") - log_tree 
= file_tracer.trace("ID1") - assert log_tree == expected - - -def test_file_tracer_handles_tracer_log_entry_failed_exception( - file_tracer: FileTracer, -) -> None: - file_tracer._log_entry = Mock( # type: ignore[method-assign] - side_effect=[TracerLogEntryFailed("Hi I am an error", "21"), None] - ) - - try: - file_tracer.task_span( - task_name="mock_task_name", input="42", timestamp=None, trace_id="21" - ) - except Exception as exception: - assert False, f"'Unexpected exception: {exception}" - - -def test_file_tracer_raises_non_log_entry_failed_exceptions( - file_tracer: FileTracer, -) -> None: - file_tracer._log_entry = Mock(side_effect=[Exception("Hi I am an error", "21")]) # type: ignore[method-assign] - with pytest.raises(Exception): - file_tracer.task_span( - task_name="mock_task_name", input="42", timestamp=None, trace_id="21" - ) - - -# take from and modified: https://stackoverflow.com/questions/2059482/temporarily-modify-the-current-processs-environment -@contextlib.contextmanager -def set_env(name: str, value: str | None) -> Iterator[None]: - old_environ = dict(os.environ) - if value is None: - if os.getenv(name, None) is not None: - os.environ.pop(name) - else: - os.environ[name] = value - try: - yield - finally: - os.environ.clear() - os.environ.update(old_environ) - - -def test_in_memory_tracer_trace_viewer_doesnt_crash_if_it_cant_reach() -> None: - # note that this test sets the environment variable, which might - # become a problem with multi-worker tests - ENV_VARIABLE_NAME = "TRACE_VIEWER_URL" - # ensure that the code works even with the variable is not set - with set_env(ENV_VARIABLE_NAME, None): - expected = InMemoryTracer() - expected._ipython_display_() - - -@pytest.mark.docker -def test_open_telemetry_tracer_check_consistency_in_trace_ids( - open_telemetry_tracer: tuple[str, OpenTelemetryTracer], -) -> None: - tracing_service, tracer = open_telemetry_tracer - expected_trace_id = tracer.ensure_id(None) - TestTask().run("test-input", tracer, 
trace_id=expected_trace_id) - trace = _get_trace_by_id(tracing_service, expected_trace_id) - - assert trace is not None - assert _get_trace_id_from_trace(trace) == expected_trace_id - spans = trace["spans"] - assert len(spans) == 4 - for span in spans: - assert _get_trace_id_from_span(span) == expected_trace_id - - -@pytest.mark.docker -def test_open_telemetry_tracer_loggs_input_and_output( - open_telemetry_tracer: tuple[str, OpenTelemetryTracer], - complete: Task[CompleteInput, CompleteOutput], -) -> None: - tracing_service, tracer = open_telemetry_tracer - input = CompleteInput(prompt=Prompt.from_text("test")) - trace_id = tracer.ensure_id(None) - complete.run(input, tracer, trace_id) - trace = _get_trace_by_id(tracing_service, trace_id) - - assert trace is not None - - spans = trace["spans"] - assert spans is not [] - - task_span = next((span for span in spans if span["references"] == []), None) - assert task_span is not None - - tags = task_span["tags"] - open_tel_input_tag = [tag for tag in tags if tag["key"] == "input"] - assert len(open_tel_input_tag) == 1 - - open_tel_output_tag = [tag for tag in tags if tag["key"] == "output"] - assert len(open_tel_output_tag) == 1 - - -def _get_trace_by_id(tracing_service: str, wanted_trace_id: str) -> Optional[Any]: - request_timeout_in_seconds = 10 - traces = _get_current_traces(tracing_service) - if traces: - for current_trace in traces: - trace_id = _get_trace_id_from_trace(current_trace) - if trace_id == wanted_trace_id: - return trace - - request_start = time.time() - while time.time() - request_start < request_timeout_in_seconds: - traces = _get_current_traces(tracing_service) - if traces: - for current_trace in traces: - trace_id = _get_trace_id_from_trace(current_trace) - if trace_id == wanted_trace_id: - return current_trace - time.sleep(0.1) - return None - - -def _get_current_traces(tracing_service: str) -> Any: - response = requests.get(tracing_service) - response_text = json.loads(response.text) - return 
response_text["data"] - - -def _get_trace_id_from_trace(trace: Any) -> Optional[str]: - spans = trace["spans"] - if not spans: - return None - return _get_trace_id_from_span(spans[0]) - - -def _get_trace_id_from_span(span: Any) -> Optional[str]: - tags = span["tags"] - if not tags: - return None - trace_id_tag = next(tag for tag in tags if tag["key"] == "trace_id") - return str(trace_id_tag["value"]) diff --git a/tests/core/tracer/conftest.py b/tests/core/tracer/conftest.py new file mode 100644 index 000000000..0d6bb538c --- /dev/null +++ b/tests/core/tracer/conftest.py @@ -0,0 +1,40 @@ +from pathlib import Path + +from pytest import fixture + +from intelligence_layer.core import FileTracer, InMemoryTracer, Task, TaskSpan + + +class TracerTestSubTask(Task[None, None]): + def do_run(self, input: None, task_span: TaskSpan) -> None: + task_span.log("subtask", "value") + + +class TracerTestTask(Task[str, str]): + sub_task = TracerTestSubTask() + + def do_run(self, input: str, task_span: TaskSpan) -> str: + with task_span.span("span") as sub_span: + sub_span.log("message", "a value") + self.sub_task.run(None, sub_span) + self.sub_task.run(None, task_span) + return "output" + + +class SpecificTestException(Exception): + pass + + +@fixture +def tracer_test_task() -> Task[str, str]: + return TracerTestTask() + + +@fixture +def file_tracer(tmp_path: Path) -> FileTracer: + return FileTracer(tmp_path / "log.log") + + +@fixture +def in_memory_tracer() -> InMemoryTracer: + return InMemoryTracer() diff --git a/tests/core/tracer/fixtures/old_file_trace_format.jsonl b/tests/core/tracer/fixtures/old_file_trace_format.jsonl new file mode 100644 index 000000000..bf1fb390e --- /dev/null +++ b/tests/core/tracer/fixtures/old_file_trace_format.jsonl @@ -0,0 +1,11 @@ 
+{"trace_id":"00b96d92-d1d1-454a-b8e9-f67e8541759a","entry_type":"StartTask","entry":{"uuid":"41528209-1b78-4785-a00d-7f65af1bb09c","parent":"75e79a11-1a26-4731-8b49-ef8634c352ed","name":"TestTask","start":"2024-05-22T09:43:37.428758Z","input":"input","trace_id":"00b96d92-d1d1-454a-b8e9-f67e8541759a"}} +{"trace_id":"00b96d92-d1d1-454a-b8e9-f67e8541759a","entry_type":"StartSpan","entry":{"uuid":"ad1ed79b-6ad6-4ea5-8ee8-26be4055e228","parent":"41528209-1b78-4785-a00d-7f65af1bb09c","name":"span","start":"2024-05-22T09:43:37.429448Z","trace_id":"00b96d92-d1d1-454a-b8e9-f67e8541759a"}} +{"trace_id":"00b96d92-d1d1-454a-b8e9-f67e8541759a","entry_type":"PlainEntry","entry":{"message":"message","value":"a value","timestamp":"2024-05-22T09:43:37.429503Z","parent":"ad1ed79b-6ad6-4ea5-8ee8-26be4055e228","trace_id":"00b96d92-d1d1-454a-b8e9-f67e8541759a"}} +{"trace_id":"00b96d92-d1d1-454a-b8e9-f67e8541759a","entry_type":"StartTask","entry":{"uuid":"e8cca541-57a8-440a-b848-7c3b33a97f52","parent":"ad1ed79b-6ad6-4ea5-8ee8-26be4055e228","name":"TestSubTask","start":"2024-05-22T09:43:37.429561Z","input":null,"trace_id":"00b96d92-d1d1-454a-b8e9-f67e8541759a"}} +{"trace_id":"00b96d92-d1d1-454a-b8e9-f67e8541759a","entry_type":"PlainEntry","entry":{"message":"subtask","value":"value","timestamp":"2024-05-22T09:43:37.429605Z","parent":"e8cca541-57a8-440a-b848-7c3b33a97f52","trace_id":"00b96d92-d1d1-454a-b8e9-f67e8541759a"}} +{"trace_id":"00b96d92-d1d1-454a-b8e9-f67e8541759a","entry_type":"EndTask","entry":{"uuid":"e8cca541-57a8-440a-b848-7c3b33a97f52","end":"2024-05-22T09:43:37.429647Z","output":null}} +{"trace_id":"00b96d92-d1d1-454a-b8e9-f67e8541759a","entry_type":"EndSpan","entry":{"uuid":"ad1ed79b-6ad6-4ea5-8ee8-26be4055e228","end":"2024-05-22T09:43:37.429687Z"}} 
+{"trace_id":"00b96d92-d1d1-454a-b8e9-f67e8541759a","entry_type":"StartTask","entry":{"uuid":"8840185c-2019-4105-9178-1b0e20ab6388","parent":"41528209-1b78-4785-a00d-7f65af1bb09c","name":"TestSubTask","start":"2024-05-22T09:43:37.429728Z","input":null,"trace_id":"00b96d92-d1d1-454a-b8e9-f67e8541759a"}} +{"trace_id":"00b96d92-d1d1-454a-b8e9-f67e8541759a","entry_type":"PlainEntry","entry":{"message":"subtask","value":"value","timestamp":"2024-05-22T09:43:37.429768Z","parent":"8840185c-2019-4105-9178-1b0e20ab6388","trace_id":"00b96d92-d1d1-454a-b8e9-f67e8541759a"}} +{"trace_id":"00b96d92-d1d1-454a-b8e9-f67e8541759a","entry_type":"EndTask","entry":{"uuid":"8840185c-2019-4105-9178-1b0e20ab6388","end":"2024-05-22T09:43:37.429806Z","output":null}} +{"trace_id":"00b96d92-d1d1-454a-b8e9-f67e8541759a","entry_type":"EndTask","entry":{"uuid":"41528209-1b78-4785-a00d-7f65af1bb09c","end":"2024-05-22T09:43:37.429842Z","output":"output"}} diff --git a/tests/core/tracer/test_composite_tracer.py b/tests/core/tracer/test_composite_tracer.py new file mode 100644 index 000000000..a0ce65771 --- /dev/null +++ b/tests/core/tracer/test_composite_tracer.py @@ -0,0 +1,51 @@ +import pytest + +from intelligence_layer.core import CompositeTracer, InMemoryTracer, SpanStatus, Task +from tests.core.tracer.conftest import SpecificTestException + + +def test_composite_tracer(tracer_test_task: Task[str, str]) -> None: + tracer_1 = InMemoryTracer() + tracer_2 = InMemoryTracer() + tracer_test_task.run(input="input", tracer=CompositeTracer([tracer_1, tracer_2])) + + trace_1 = tracer_1.export_for_viewing()[0] + trace_2 = tracer_2.export_for_viewing()[0] + assert trace_1.name == trace_2.name + assert trace_1.attributes == trace_2.attributes + assert trace_1.status == trace_2.status + assert trace_1.context.trace_id != trace_2.context.trace_id + assert trace_1.context.span_id != trace_2.context.span_id + + +def test_composite_tracer_can_get_span_status( + tracer_test_task: Task[str, str], +) -> None: + 
tracer_1 = InMemoryTracer() + tracer_2 = InMemoryTracer() + + composite_tracer = CompositeTracer([tracer_1, tracer_2]) + + with composite_tracer.span("test_name") as composite_span: + assert composite_span.status_code == SpanStatus.OK + + +def test_composite_tracer_raises_for_inconsistent_span_status( + tracer_test_task: Task[str, str], +) -> None: + tracer_1 = InMemoryTracer() + tracer_2 = InMemoryTracer() + + composite_tracer = CompositeTracer([tracer_1, tracer_2]) + + with composite_tracer.span("test_name") as composite_span: + spans = composite_span.tracers + single_span = spans[0] + try: + with single_span: + raise SpecificTestException + except SpecificTestException: + pass + + with pytest.raises(ValueError): + composite_span.status_code diff --git a/tests/core/tracer/test_file_tracer.py b/tests/core/tracer/test_file_tracer.py new file mode 100644 index 000000000..0f01147c5 --- /dev/null +++ b/tests/core/tracer/test_file_tracer.py @@ -0,0 +1,70 @@ +from pathlib import Path +from unittest.mock import Mock + +import pytest +from pytest import fixture + +from intelligence_layer.core import ( + FileTracer, + InMemoryTaskSpan, + Task, + TracerLogEntryFailed, +) +from tests.core.tracer.conftest import SpecificTestException + + +@fixture +def file_tracer(tmp_path: Path) -> FileTracer: + return FileTracer(tmp_path / "log.log") + + +def test_file_tracer_retrieves_all_file_traces( + file_tracer: FileTracer, tracer_test_task: Task[str, str] +) -> None: + input = "input" + + tracer_test_task.run(input, file_tracer) + tracer_test_task.run(input, file_tracer) + traces = file_tracer.traces() + assert len(traces.entries) == 2 + assert isinstance(traces.entries[0], InMemoryTaskSpan) + assert isinstance(traces.entries[1], InMemoryTaskSpan) + assert traces.entries[0].context.trace_id != traces.entries[1].context.trace_id + + +def test_file_tracer_handles_tracer_log_entry_failed_exception( + file_tracer: FileTracer, +) -> None: + file_tracer._log_entry = Mock( # type: 
ignore[method-assign] + side_effect=[TracerLogEntryFailed("Hi I am an error", "21"), None] + ) + + try: + file_tracer.task_span(task_name="mock_task_name", input="42", timestamp=None) + except Exception as exception: + assert False, f"'Unexpected exception: {exception}" + + +def test_file_tracer_raises_non_log_entry_failed_exceptions( + file_tracer: FileTracer, +) -> None: + file_tracer._log_entry = Mock( # type: ignore[method-assign] + side_effect=[SpecificTestException("Hi I am an error", "21")] + ) + with pytest.raises(SpecificTestException): + file_tracer.task_span(task_name="mock_task_name", input="42", timestamp=None) + + +def test_file_tracer_is_backwards_compatible() -> None: + current_file_location = Path(__file__) + file_tracer = FileTracer( + current_file_location.parent / "fixtures/old_file_trace_format.jsonl" + ) + tracer = file_tracer.traces() + + assert len(tracer.entries) == 1 + task_span = tracer.entries[0] + assert isinstance(task_span, InMemoryTaskSpan) + assert task_span.input == "input" + assert task_span.start_timestamp and task_span.end_timestamp + assert task_span.start_timestamp < task_span.end_timestamp diff --git a/tests/core/tracer/test_in_memory_tracer.py b/tests/core/tracer/test_in_memory_tracer.py new file mode 100644 index 000000000..cb1b31480 --- /dev/null +++ b/tests/core/tracer/test_in_memory_tracer.py @@ -0,0 +1,194 @@ +import contextlib +import os +from typing import Iterator + +import pytest + +from intelligence_layer.core import ( + ErrorValue, + InMemorySpan, + InMemoryTaskSpan, + InMemoryTracer, + LogEntry, + Task, + utc_now, +) + + +def test_trace_id_exists_for_all_children_of_task_span() -> None: + tracer = InMemoryTracer() + parent_span = tracer.task_span("child", "input") + parent_span.span("child2") + + assert isinstance(tracer.entries[0], InMemoryTaskSpan) + assert isinstance(tracer.entries[0].entries[0], InMemorySpan) + assert ( + tracer.entries[0].entries[0].context.trace_id + == tracer.entries[0].context.trace_id + 
) + + parent_span.task_span("child3", "input") + assert isinstance(tracer.entries[0].entries[1], InMemoryTaskSpan) + assert ( + tracer.entries[0].entries[1].context.trace_id + == tracer.entries[0].context.trace_id + ) + + +def test_trace_id_exists_for_all_children_of_span() -> None: + tracer = InMemoryTracer() + parent_span = tracer.span("child") + parent_span.span("child2") + + assert isinstance(tracer.entries[0], InMemorySpan) + assert isinstance(tracer.entries[0].entries[0], InMemorySpan) + assert ( + tracer.entries[0].entries[0].context.trace_id + == tracer.entries[0].context.trace_id + ) + + parent_span.task_span("child3", "input") + assert isinstance(tracer.entries[0].entries[1], InMemorySpan) + assert ( + tracer.entries[0].entries[1].context.trace_id + == tracer.entries[0].context.trace_id + ) + + +def test_can_add_child_tracer() -> None: + tracer = InMemoryTracer() + tracer.span("child") + + assert len(tracer.entries) == 1 + + log = tracer.entries[0] + assert isinstance(log, InMemoryTracer) + assert log.name == "child" + assert len(log.entries) == 0 + + +def test_can_add_parent_and_child_entries() -> None: + parent = InMemoryTracer() + with parent.span("child") as child: + child.log("Two", 2) + + assert isinstance(parent.entries[0], InMemoryTracer) + assert isinstance(parent.entries[0].entries[0], LogEntry) + + +def test_task_logs_error_value() -> None: + tracer = InMemoryTracer() + + with pytest.raises(ValueError): + with tracer.span("failing task"): + raise ValueError("my bad, sorry") + + assert isinstance(tracer.entries[0], InMemorySpan) + assert isinstance(tracer.entries[0].entries[0], LogEntry) + error = tracer.entries[0].entries[0].value + assert isinstance(error, ErrorValue) + assert error.message == "my bad, sorry" + assert error.error_type == "ValueError" + assert error.stack_trace.startswith("Traceback") + + +def test_task_span_records_error_value() -> None: + tracer = InMemoryTracer() + + with pytest.raises(ValueError): + with 
tracer.task_span("failing task", None): + raise ValueError("my bad, sorry") + + assert isinstance(tracer.entries[0], InMemoryTaskSpan) + error = tracer.entries[0].output + assert isinstance(error, ErrorValue) + assert error.message == "my bad, sorry" + assert error.error_type == "ValueError" + assert error.stack_trace.startswith("Traceback") + + +def test_task_automatically_logs_input_and_output( + tracer_test_task: Task[str, str], +) -> None: + input = "input" + tracer = InMemoryTracer() + output = tracer_test_task.run(input=input, tracer=tracer) + + assert len(tracer.entries) == 1 + task_span = tracer.entries[0] + assert isinstance(task_span, InMemoryTaskSpan) + assert task_span.name == type(tracer_test_task).__name__ + assert task_span.input == input + assert task_span.output == output + assert task_span.start_timestamp and task_span.end_timestamp + assert task_span.start_timestamp < task_span.end_timestamp + + +def test_tracer_can_set_custom_start_time_for_log_entry() -> None: + tracer = InMemoryTracer() + timestamp = utc_now() + + with tracer.span("span") as span: + span.log("log", "message", timestamp) + + assert isinstance(tracer.entries[0], InMemorySpan) + assert isinstance(tracer.entries[0].entries[0], LogEntry) + assert tracer.entries[0].entries[0].timestamp == timestamp + + +def test_tracer_can_set_custom_start_time_for_span() -> None: + tracer = InMemoryTracer() + start = utc_now() + + span = tracer.span("span", start) + + assert span.start_timestamp == start + + +def test_span_sets_end_timestamp() -> None: + tracer = InMemoryTracer() + start = utc_now() + + span = tracer.span("span", start) + span.end() + + assert span.end_timestamp and span.start_timestamp <= span.end_timestamp + + +def test_span_only_updates_end_timestamp_once() -> None: + tracer = InMemoryTracer() + + span = tracer.span("span") + end = utc_now() + span.end(end) + span.end() + + assert span.end_timestamp == end + + +# take from and modified: 
https://stackoverflow.com/questions/2059482/temporarily-modify-the-current-processs-environment +@contextlib.contextmanager +def set_env(name: str, value: str | None) -> Iterator[None]: + old_environ = dict(os.environ) + if value is None: + if os.getenv(name, None) is not None: + os.environ.pop(name) + else: + os.environ[name] = value + try: + yield + finally: + os.environ.clear() + os.environ.update(old_environ) + + +def test_in_memory_tracer_trace_viewer_doesnt_crash_if_it_cant_reach_document_index() -> ( + None +): + # note that this test sets the environment variable, which might + # become a problem with multi-worker tests + ENV_VARIABLE_NAME = "TRACE_VIEWER_URL" + # ensure that the code works even when the variable is not set + with set_env(ENV_VARIABLE_NAME, None): + expected = InMemoryTracer() + expected._ipython_display_() diff --git a/tests/core/tracer/test_open_telemetry_tracer.py b/tests/core/tracer/test_open_telemetry_tracer.py new file mode 100644 index 000000000..212f1aa74 --- /dev/null +++ b/tests/core/tracer/test_open_telemetry_tracer.py @@ -0,0 +1,177 @@ +import json +import time +from typing import Any, Sequence +from uuid import uuid4 + +import pytest +import requests +from opentelemetry import trace +from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter +from opentelemetry.sdk.resources import SERVICE_NAME, Resource +from opentelemetry.sdk.trace import ReadableSpan, TracerProvider +from opentelemetry.sdk.trace.export import ( + SimpleSpanProcessor, + SpanExporter, + SpanExportResult, +) +from pytest import fixture + +from intelligence_layer.core import OpenTelemetryTracer, SpanType, Task + + +class DummyExporter(SpanExporter): + def __init__(self) -> None: + self.spans: list[ReadableSpan] = [] + + def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: + self.spans.extend(spans) + return SpanExportResult.SUCCESS + + def shutdown(self) -> None: + self.spans = [] + + def force_flush(self, timeout_millis: 
int = 30000) -> bool: + return True + + +@fixture() +def exporter() -> DummyExporter: + return DummyExporter() + + +@fixture(scope="module") +def service_name() -> str: + return "test-service" + + +@fixture(scope="module") +def trace_provider(service_name: str) -> TracerProvider: + resource = Resource.create({SERVICE_NAME: service_name}) + provider = TracerProvider(resource=resource) + trace.set_tracer_provider(provider) + return provider + + +@fixture() +def test_opentelemetry_tracer( + exporter: DummyExporter, trace_provider: TracerProvider +) -> OpenTelemetryTracer: + processor = SimpleSpanProcessor(exporter) + trace_provider.add_span_processor(processor) + tracer = OpenTelemetryTracer(trace.get_tracer("intelligence-layer")) + return tracer + + +@fixture +def jaeger_compatible_tracer(trace_provider: TracerProvider) -> OpenTelemetryTracer: + processor = SimpleSpanProcessor(OTLPSpanExporter()) + trace_provider.add_span_processor(processor) + tracer = OpenTelemetryTracer(trace.get_tracer("intelligence-layer")) + + return tracer + + +def test_open_telemetry_tracer_has_consistent_trace_id( + test_opentelemetry_tracer: OpenTelemetryTracer, + exporter: DummyExporter, + tracer_test_task: Task[str, str], +) -> None: + tracer_test_task.run("test-input", test_opentelemetry_tracer) + spans = exporter.spans + assert len(spans) == 4 + assert len(set(span.context.trace_id for span in spans)) == 1 + + +def test_open_telemetry_tracer_sets_attributes_correctly( + test_opentelemetry_tracer: OpenTelemetryTracer, + exporter: DummyExporter, + tracer_test_task: Task[str, str], +) -> None: + tracer_test_task.run("test-input", test_opentelemetry_tracer) + spans = exporter.spans + assert len(spans) == 4 + spans_sorted_by_start: list[ReadableSpan] = sorted( + spans, key=lambda span: span.start_time if span.start_time else 0 + ) + assert spans_sorted_by_start[0].attributes is not None + assert spans_sorted_by_start[0].name == "TracerTestTask" + assert 
spans_sorted_by_start[0].attributes["input"] == '"test-input"' + assert spans_sorted_by_start[0].attributes["output"] == '"output"' + assert spans_sorted_by_start[0].attributes["type"] == SpanType.TASK_SPAN.value + assert spans_sorted_by_start[0].status.is_ok + + assert spans_sorted_by_start[1].attributes is not None + assert spans_sorted_by_start[1].name == "span" + assert "input" not in spans_sorted_by_start[1].attributes.keys() + assert spans_sorted_by_start[1].attributes["type"] == SpanType.SPAN.value + assert spans_sorted_by_start[1].status.is_ok + + assert spans_sorted_by_start[2].attributes is not None + assert spans_sorted_by_start[2].name == "TracerTestSubTask" + assert spans_sorted_by_start[2].attributes["input"] == "null" + assert spans_sorted_by_start[2].attributes["output"] == "null" + assert spans_sorted_by_start[2].attributes["type"] == SpanType.TASK_SPAN.value + assert spans_sorted_by_start[2].status.is_ok + + assert spans_sorted_by_start[3].attributes is not None + assert spans_sorted_by_start[3].name == "TracerTestSubTask" + assert spans_sorted_by_start[3].attributes["input"] == "null" + assert spans_sorted_by_start[3].attributes["output"] == "null" + assert spans_sorted_by_start[3].attributes["type"] == SpanType.TASK_SPAN.value + assert spans_sorted_by_start[3].status.is_ok + + spans_sorted_by_end: list[ReadableSpan] = sorted( + spans_sorted_by_start, key=lambda span: span.end_time if span.end_time else 0 + ) + + assert spans_sorted_by_end[0] == spans_sorted_by_start[2] + assert spans_sorted_by_end[1] == spans_sorted_by_start[1] + assert spans_sorted_by_end[2] == spans_sorted_by_start[3] + assert spans_sorted_by_end[3] == spans_sorted_by_start[0] + + +def test_open_telemetry_tracer_logs_error_code_correctly( + test_opentelemetry_tracer: OpenTelemetryTracer, + exporter: DummyExporter, + tracer_test_task: Task[str, str], +) -> None: + with pytest.raises(ValueError): + with test_opentelemetry_tracer.span("failing task"): + raise ValueError("my bad, 
sorry") + + spans = exporter.spans + assert len(spans) == 1 + assert not spans[0].status.is_ok + + +def has_span_with_input(trace: Any, input_value: str) -> bool: + return any( + tag["key"] == "input" and tag["value"] == f'"{input_value}"' + for span in trace["spans"] + for tag in span["tags"] + ) + + +def get_current_traces(tracing_service: str) -> Any: + response = requests.get(tracing_service) + response_text = json.loads(response.text) + return response_text["data"] + + +@pytest.mark.docker +def test_open_telemetry_tracer_works_with_jaeger( + jaeger_compatible_tracer: OpenTelemetryTracer, + tracer_test_task: Task[str, str], + service_name: str, +) -> None: + url = "http://localhost:16686/api/traces?service=" + service_name + input_value = str(uuid4()) + tracer_test_task.run(input_value, jaeger_compatible_tracer) + # the processor needs time to submit the trace to jaeger + time.sleep(1) + + res = get_current_traces(url) + + test_res = [trace_ for trace_ in res if has_span_with_input(trace_, input_value)] + + assert len(test_res) == 1 diff --git a/tests/core/tracer/test_tracer.py b/tests/core/tracer/test_tracer.py new file mode 100644 index 000000000..b055ca177 --- /dev/null +++ b/tests/core/tracer/test_tracer.py @@ -0,0 +1,250 @@ +import pytest +from pydantic import BaseModel +from pytest import fixture + +from intelligence_layer.core import ( + CompositeTracer, + FileTracer, + InMemoryTracer, + SpanStatus, + SpanType, + TaskSpanAttributes, + Tracer, + utc_now, +) +from tests.core.tracer.conftest import SpecificTestException + + +class DummyObject(BaseModel): + content: str + + +@fixture +def composite_tracer( + in_memory_tracer: InMemoryTracer, file_tracer: FileTracer +) -> CompositeTracer[Tracer]: + return CompositeTracer(tracers=[in_memory_tracer, file_tracer]) + + +tracer_fixtures = ["in_memory_tracer", "file_tracer", "composite_tracer"] + + +@pytest.mark.parametrize( + "tracer_fixture", + tracer_fixtures, +) +def test_tracer_exports_spans_to_unified_format( 
+ tracer_fixture: str, + request: pytest.FixtureRequest, +) -> None: + tracer: Tracer = request.getfixturevalue(tracer_fixture) + dummy_object = DummyObject(content="cool") + with tracer.span("name") as temp_span: + temp_span.log("test", dummy_object) + + unified_format = tracer.export_for_viewing() + + assert len(unified_format) == 1 + span = unified_format[0] + assert span.name == "name" + assert span.start_time < span.end_time < utc_now() + assert span.attributes.type == SpanType.SPAN + assert span.status == SpanStatus.OK + + assert len(span.events) == 1 + log = span.events[0] + assert log.message == "test" + assert ( + log.body == dummy_object or DummyObject.model_validate(log.body) == dummy_object + ) + assert span.start_time < log.timestamp < span.end_time + + +@pytest.mark.parametrize( + "tracer_fixture", + tracer_fixtures, +) +def test_tracer_exports_task_spans_to_unified_format( + tracer_fixture: str, + request: pytest.FixtureRequest, +) -> None: + tracer: Tracer = request.getfixturevalue(tracer_fixture) + + with tracer.task_span("name", "input") as task_span: + task_span.record_output("output") + + unified_format = tracer.export_for_viewing() + + assert len(unified_format) == 1 + span = unified_format[0] + assert span.name == "name" + assert span.parent_id is None + assert span.start_time < span.end_time < utc_now() + assert span.attributes.type == SpanType.TASK_SPAN + assert isinstance(span.attributes, TaskSpanAttributes) # for mypy + assert span.attributes.input == "input" + assert span.attributes.output == "output" + assert span.status == SpanStatus.OK + assert span.context.trace_id == span.context.span_id + + +@pytest.mark.parametrize( + "tracer_fixture", + tracer_fixtures, +) +def test_tracer_exports_error_correctly( + tracer_fixture: str, + request: pytest.FixtureRequest, +) -> None: + tracer: Tracer = request.getfixturevalue(tracer_fixture) + + try: + with tracer.span("name"): + raise SpecificTestException + except SpecificTestException: + pass + 
unified_format = tracer.export_for_viewing() + + assert len(unified_format) == 1 + span = unified_format[0] + assert span.name == "name" + assert span.parent_id is None + assert span.start_time < span.end_time < utc_now() + assert span.attributes.type == SpanType.SPAN + assert span.status == SpanStatus.ERROR + + +@pytest.mark.parametrize( + "tracer_fixture", + tracer_fixtures, +) +def test_tracer_export_nests_correctly( + tracer_fixture: str, + request: pytest.FixtureRequest, +) -> None: + tracer: Tracer = request.getfixturevalue(tracer_fixture) + + with tracer.span("name") as parent_span: + with parent_span.span("name-2") as child_span: + child_span.log("", value="") + + unified_format = tracer.export_for_viewing() + + assert len(unified_format) == 2 + parent, child = unified_format[0], unified_format[1] + if parent.parent_id is not None: + parent, child = child, parent + assert parent.name == "name" + assert parent.parent_id is None + assert parent.end_time >= child.end_time + assert parent.start_time <= child.start_time + assert child.name == "name-2" + assert child.parent_id == parent.context.span_id + assert len(child.events) == 1 + assert len(parent.events) == 0 + assert child.context.trace_id == parent.context.trace_id + assert child.context.span_id != parent.context.span_id + + +@pytest.mark.parametrize( + "tracer_fixture", + tracer_fixtures, +) +def test_tracer_exports_unrelated_spans_correctly( + tracer_fixture: str, + request: pytest.FixtureRequest, +) -> None: + tracer: Tracer = request.getfixturevalue(tracer_fixture) + + tracer.span("name").end() + tracer.span("name-2").end() + + unified_format = tracer.export_for_viewing() + + assert len(unified_format) == 2 + span_1, span_2 = unified_format[0], unified_format[1] + + assert span_1.parent_id is None + assert span_2.parent_id is None + + assert span_1.context.trace_id != span_2.context.trace_id + + +@pytest.mark.parametrize( + "tracer_fixture", + tracer_fixtures, +) +def 
test_tracer_raises_if_open_span_is_exported( + tracer_fixture: str, + request: pytest.FixtureRequest, +) -> None: + tracer: Tracer = request.getfixturevalue(tracer_fixture) + + with tracer.span("name") as root_span: + child_span = root_span.span("name-2") + child_span.log("test_message", "test_body") + + with pytest.raises(RuntimeError): + child_span.export_for_viewing() + + +@pytest.mark.skip("Not yet implemented") +@pytest.mark.parametrize( + "tracer_fixture", + tracer_fixtures, +) +def test_spans_cannot_be_closed_twice( + tracer_fixture: str, + request: pytest.FixtureRequest, +) -> None: + tracer: Tracer = request.getfixturevalue(tracer_fixture) + + span = tracer.span("name") + span.end() + span.end() + + +@pytest.mark.parametrize( + "tracer_fixture", + tracer_fixtures, +) +def test_spans_cannot_be_used_as_context_twice( + tracer_fixture: str, + request: pytest.FixtureRequest, +) -> None: + tracer: Tracer = request.getfixturevalue(tracer_fixture) + + span = tracer.span("name") + with span: + pass + with pytest.raises(Exception): + with span: + pass + + +@pytest.mark.skip("Not yet implemented") +@pytest.mark.parametrize( + "tracer_fixture", + tracer_fixtures, +) +def test_tracer_can_not_log_on_closed_span( + tracer_fixture: str, + request: pytest.FixtureRequest, +) -> None: + tracer: Tracer = request.getfixturevalue(tracer_fixture) + + span = tracer.span("name") + # ok + span.log("test_message", "test_body") + span.end() + # not ok + with pytest.raises(Exception): + span.log("test_message", "test_body") + + span = tracer.span("name") + # ok + with span: + span.log("test_message", "test_body") + # not ok + with pytest.raises(Exception): + span.log("test_message", "test_body") diff --git a/tests/evaluation/test_domain.py b/tests/evaluation/test_domain.py index e4db9d584..df2ef0edc 100644 --- a/tests/evaluation/test_domain.py +++ b/tests/evaluation/test_domain.py @@ -1,6 +1,7 @@ from pytest import raises -from intelligence_layer.core import InMemorySpan, 
InMemoryTaskSpan, LogEntry, utc_now +from intelligence_layer.core import utc_now +from intelligence_layer.core.tracer.in_memory_tracer import InMemoryTracer from intelligence_layer.evaluation import ( AggregationOverview, EvaluationFailed, @@ -14,22 +15,13 @@ def test_to_trace_entry() -> None: now = utc_now() - entry = _to_trace_entry( - InMemoryTaskSpan( - name="task", - input="input", - output="output", - start_timestamp=now, - end_timestamp=now, - entries=[ - LogEntry(message="message", value="value", trace_id="ID"), - InMemorySpan( - name="span", start_timestamp=now, end_timestamp=now, trace_id="ID" - ), - ], - trace_id="ID", - ) - ) + span = InMemoryTracer().task_span("task", timestamp=now, input="input") + span.span("span", now).end(now) + span.log(message="message", value="value", timestamp=now) + span.record_output("output") + span.end(now) + + entry = _to_trace_entry(span) assert entry == TaskSpanTrace( name="task", @@ -38,8 +30,8 @@ def test_to_trace_entry() -> None: start=now, end=now, traces=[ - LogTrace(message="message", value="value"), SpanTrace(name="span", traces=[], start=now, end=now), + LogTrace(message="message", value="value"), ], ) @@ -49,7 +41,10 @@ def test_deserialize_task_trace() -> None: name="task", start=utc_now(), end=utc_now(), - traces=[], + traces=[ + SpanTrace(name="span", traces=[], start=utc_now(), end=utc_now()), + LogTrace(message="message", value="value"), + ], input=[{"a": "b"}], output=["c"], )