Skip to content

Commit

Permalink
Adapt File Tracer
Browse files Browse the repository at this point in the history
  • Loading branch information
FlorianSchepersAA committed May 23, 2024
1 parent 4bb625b commit 2a7b18d
Show file tree
Hide file tree
Showing 12 changed files with 171 additions and 94 deletions.
6 changes: 2 additions & 4 deletions src/intelligence_layer/core/task.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from abc import ABC, abstractmethod
from concurrent.futures import ThreadPoolExecutor
from typing import Generic, Iterable, Optional, Sequence, TypeVar, final
from typing import Generic, Iterable, Sequence, TypeVar, final

from pydantic import BaseModel

Expand Down Expand Up @@ -58,9 +58,7 @@ def do_run(self, input: Input, task_span: TaskSpan) -> Output:
...

@final
def run(
self, input: Input, tracer: Tracer, trace_id: Optional[str] = None
) -> Output:
def run(self, input: Input, tracer: Tracer) -> Output:
"""Executes the implementation of `do_run` for this use case.
This takes an input and runs the implementation to generate an output.
Expand Down
21 changes: 19 additions & 2 deletions src/intelligence_layer/core/tracer/composite_tracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
from typing import Generic, Optional, Sequence, TypeVar

from intelligence_layer.core.tracer.tracer import (
Context,
ExportedSpan,
PydanticSerializable,
Span,
TaskSpan,
Tracer,
utc_now,
Context
)

TracerVar = TypeVar("TracerVar", bound=Tracer)
Expand Down Expand Up @@ -73,7 +73,10 @@ class CompositeSpan(Generic[SpanVar], CompositeTracer[SpanVar], Span):
Args:
tracers: spans that will be forwarded all subsequent log and span calls.
"""
def __init__(self, tracers: Sequence[SpanVar],context: Optional[Context] = None) -> None:

def __init__(
self, tracers: Sequence[SpanVar], context: Optional[Context] = None
) -> None:
CompositeTracer.__init__(self, tracers)
Span.__init__(self, context=context)

Expand All @@ -92,6 +95,20 @@ def end(self, timestamp: Optional[datetime] = None) -> None:
for tracer in self.tracers:
tracer.end(timestamp)

@property
def status_code(self):
status_codes = {tracer.status_code for tracer in self.tracers}
if len(status_codes) > 1:
raise ValueError(
"Inconsistent status of traces in composite tracer. Status of all traces should be the same but they are different."
)
return next(iter(status_codes))

@status_code.setter
def status_code(self, value):
for tracer in self.tracers:
tracer.status_code = value


class CompositeTaskSpan(CompositeSpan[TaskSpan], TaskSpan):
"""A :class:`TaskSpan` that allows for recording to multiple TaskSpans simultaneously.
Expand Down
23 changes: 4 additions & 19 deletions src/intelligence_layer/core/tracer/file_tracer.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from datetime import datetime
from json import loads
from pathlib import Path
from typing import Optional, Sequence
from typing import Optional

from pydantic import BaseModel

Expand All @@ -11,22 +11,8 @@
PersistentTaskSpan,
PersistentTracer,
)
from intelligence_layer.core.tracer.tracer import LogLine, PydanticSerializable
from intelligence_layer.core.tracer.tracer import (
Context,
Event,
ExportedSpan,
ExportedSpanList,
LogEntry,
PydanticSerializable,
Span,
SpanAttributes,
TaskSpan,
TaskSpanAttributes,
Tracer,
_render_log_value,
utc_now,
)
from intelligence_layer.core.tracer.tracer import Context, LogLine, PydanticSerializable


class FileTracer(PersistentTracer):
"""A `Tracer` that logs to a file.
Expand Down Expand Up @@ -91,16 +77,15 @@ def trace(self, trace_id: Optional[str] = None) -> InMemoryTracer:
return self._parse_log(filtered_traces)



class FileSpan(PersistentSpan, FileTracer):
"""A `Span` created by `FileTracer.span`."""

def __init__(self, log_file_path: Path, context: Optional[Context] = None) -> None:
PersistentSpan.__init__(self, context=context)
FileTracer.__init__(self, log_file_path=log_file_path)



class FileTaskSpan(PersistentTaskSpan, FileSpan):
"""A `TaskSpan` created by `FileTracer.task_span`."""

pass
21 changes: 16 additions & 5 deletions src/intelligence_layer/core/tracer/in_memory_tracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import requests
import rich
from pydantic import BaseModel, Field, SerializeAsAny
from pydantic import SerializeAsAny
from requests import HTTPError
from rich.tree import Tree

Expand Down Expand Up @@ -157,6 +157,7 @@ def log(
def end(self, timestamp: Optional[datetime] = None) -> None:
if not self.end_timestamp:
self.end_timestamp = timestamp or utc_now()
super().end(timestamp)

def _rich_render_(self) -> Tree:
"""Renders the trace via classes in the `rich` package"""
Expand All @@ -171,6 +172,10 @@ def _span_attributes(self) -> SpanAttributes:
return SpanAttributes()

def export_for_viewing(self) -> Sequence[ExportedSpan]:
if not self._closed:
raise RuntimeError(
"Span is not closed. A Span must be closed before it is exported for viewing."
)
logs: list[LogEntry] = []
exported_spans: list[ExportedSpan] = []
for entry in self.entries:
Expand Down Expand Up @@ -246,9 +251,11 @@ def start_task(self, log_line: LogLine) -> None:
name=start_task.name,
input=start_task.input,
start_timestamp=start_task.start,
context=Context(
context=Context(
trace_id=start_task.trace_id, span_id=str(start_task.parent)
) if start_task.trace_id != str(start_task.uuid) else None
)
if start_task.trace_id != str(start_task.uuid)
else None,
)
# if root, also change the trace id
if converted_span.context.trace_id == converted_span.context.span_id:
Expand All @@ -262,6 +269,7 @@ def end_task(self, log_line: LogLine) -> None:
end_task = EndTask.model_validate(log_line.entry)
task_span = self.tasks[end_task.uuid]
task_span.record_output(end_task.output)
task_span.status_code = end_task.status_code
task_span.end(end_task.end)

def start_span(self, log_line: LogLine) -> None:
Expand All @@ -271,20 +279,23 @@ def start_span(self, log_line: LogLine) -> None:
start_timestamp=start_span.start,
context=Context(
trace_id=start_span.trace_id, span_id=str(start_span.parent)
) if start_span.trace_id != str(start_span.uuid) else None
)
if start_span.trace_id != str(start_span.uuid)
else None,
)
# if root, also change the trace id
if converted_span.context.trace_id == converted_span.context.span_id:
converted_span.context.trace_id = str(start_span.uuid)
converted_span.context.span_id = str(start_span.uuid)

self.tracers.get(start_span.parent, self.root).entries.append(converted_span)
self.tracers[start_span.uuid] = converted_span
self.spans[start_span.uuid] = converted_span

def end_span(self, log_line: LogLine) -> None:
end_span = EndSpan.model_validate(log_line.entry)
span = self.spans[end_span.uuid]
span.status_code = end_span.status_code
span.end(end_span.end)

def plain_entry(self, log_line: LogLine) -> None:
Expand Down
11 changes: 9 additions & 2 deletions src/intelligence_layer/core/tracer/open_telemetry_tracer.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
from datetime import datetime
from typing import Optional
from typing import Optional, Sequence

from opentelemetry.context import attach, detach
from opentelemetry.trace import Span as OpenTSpan
from opentelemetry.trace import Tracer as OpenTTracer
from opentelemetry.trace import set_span_in_context

from intelligence_layer.core.tracer.tracer import (
ExportedSpan,
LogEntry,
PydanticSerializable,
Span,
TaskSpan,
Expand All @@ -19,7 +21,7 @@ class OpenTelemetryTracer(Tracer):
"""A `Tracer` that uses open telemetry."""

def __init__(self, tracer: OpenTTracer) -> None:
self._tracer = tracer
self.O_tracer = tracer

def span(
self,
Expand All @@ -34,6 +36,7 @@ def span(
start_time=None if not timestamp else _open_telemetry_timestamp(timestamp),
)
token = attach(set_span_in_context(tracer_span))
self._tracer
return OpenTelemetrySpan(tracer_span, self._tracer, token, trace_id)

def task_span(
Expand All @@ -52,6 +55,9 @@ def task_span(
)
token = attach(set_span_in_context(tracer_span))
return OpenTelemetryTaskSpan(tracer_span, self._tracer, token, trace_id)

def export_for_viewing(self) -> Sequence[ExportedSpan]:
raise NotImplementedError("The OpenTelemetryTracer does not support export for viewing, as it can not acces its own traces.")


class OpenTelemetrySpan(Span, OpenTelemetryTracer):
Expand Down Expand Up @@ -87,6 +93,7 @@ def end(self, timestamp: Optional[datetime] = None) -> None:
self.open_ts_span.end(
_open_telemetry_timestamp(timestamp) if timestamp is not None else None
)
super().end(timestamp)


class OpenTelemetryTaskSpan(TaskSpan, OpenTelemetrySpan):
Expand Down
42 changes: 22 additions & 20 deletions src/intelligence_layer/core/tracer/persistent_tracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from intelligence_layer.core.tracer.tracer import (
EndSpan,
EndTask,
ExportedSpan,
LogLine,
PlainEntry,
PydanticSerializable,
Expand All @@ -20,26 +21,11 @@
utc_now,
)

from intelligence_layer.core.tracer.tracer import (
Context,
Event,
ExportedSpan,
ExportedSpanList,
LogEntry,
PydanticSerializable,
Span,
SpanAttributes,
TaskSpan,
TaskSpanAttributes,
Tracer,
_render_log_value,
utc_now,
)

class PersistentTracer(Tracer, ABC):
def __init__(self) -> None:
self.current_id = uuid4()

@abstractmethod
def _log_entry(self, id: str, entry: BaseModel) -> None:
pass
Expand Down Expand Up @@ -77,7 +63,9 @@ def _log_task(
task_span.context.trace_id,
StartTask(
uuid=task_span.context.span_id,
parent=self.context.span_id if self.context else task_span.context.trace_id,
parent=self.context.span_id
if self.context
else task_span.context.trace_id,
name=task_name,
start=timestamp or utc_now(),
input=input,
Expand All @@ -89,7 +77,9 @@ def _log_task(
task_span.context.trace_id,
StartTask(
uuid=task_span.context.span_id,
parent=self.context.span_id if self.context else task_span.context.trace_id,
parent=self.context.span_id
if self.context
else task_span.context.trace_id,
name=task_name,
start=timestamp or utc_now(),
input=error.description,
Expand Down Expand Up @@ -151,7 +141,14 @@ def log(
def end(self, timestamp: Optional[datetime] = None) -> None:
if not self.end_timestamp:
self.end_timestamp = timestamp or utc_now()
self._log_entry(self.context.trace_id, EndSpan(uuid=self.context.span_id, end=self.end_timestamp))
self._log_entry(
self.context.trace_id,
EndSpan(
uuid=self.context.span_id,
end=self.end_timestamp,
status_code=self.status_code,
),
)


class PersistentTaskSpan(TaskSpan, PersistentSpan, ABC):
Expand All @@ -165,7 +162,12 @@ def end(self, timestamp: Optional[datetime] = None) -> None:
self.end_timestamp = timestamp or utc_now()
self._log_entry(
self.context.trace_id,
EndTask(uuid=self.context.span_id, end=self.end_timestamp, output=self.output),
EndTask(
uuid=self.context.span_id,
end=self.end_timestamp,
output=self.output,
status_code=self.status_code,
),
)


Expand Down
8 changes: 5 additions & 3 deletions src/intelligence_layer/core/tracer/tracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ class Span(Tracer, AbstractContextManager["Span"]):
"""

def __init__(self, context: Optional[Context] = None):
#super().__init__()
# super().__init__()
span_id = str(uuid4())
if context is None:
trace_id = span_id
Expand All @@ -216,7 +216,7 @@ def __init__(self, context: Optional[Context] = None):

def __enter__(self) -> Self:
if self._closed:
raise ValueError("Spans cannot be opened once they have been close.")
raise ValueError("Spans cannot be opened once they have been closed.")
return self

@abstractmethod
Expand Down Expand Up @@ -252,7 +252,7 @@ def end(self, timestamp: Optional[datetime] = None) -> None:
Args:
timestamp: Optional override of the timestamp, otherwise should be set to now.
"""
...
self._closed = True

def ensure_id(self, id: str | None) -> str:
"""Returns a valid id for tracing.
Expand Down Expand Up @@ -444,6 +444,7 @@ class EndTask(BaseModel):
uuid: UUID
end: datetime
output: SerializeAsAny[PydanticSerializable]
status_code: SpanStatus = SpanStatus.OK


class StartSpan(BaseModel):
Expand Down Expand Up @@ -475,6 +476,7 @@ class EndSpan(BaseModel):

uuid: UUID
end: datetime
status_code: SpanStatus = SpanStatus.OK


class PlainEntry(BaseModel):
Expand Down
11 changes: 11 additions & 0 deletions tests/core/tracer/fixtures/old_file_trace_format.jsonl
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{"trace_id":"00b96d92-d1d1-454a-b8e9-f67e8541759a","entry_type":"StartTask","entry":{"uuid":"41528209-1b78-4785-a00d-7f65af1bb09c","parent":"75e79a11-1a26-4731-8b49-ef8634c352ed","name":"TestTask","start":"2024-05-22T09:43:37.428758Z","input":"input","trace_id":"00b96d92-d1d1-454a-b8e9-f67e8541759a"}}
{"trace_id":"00b96d92-d1d1-454a-b8e9-f67e8541759a","entry_type":"StartSpan","entry":{"uuid":"ad1ed79b-6ad6-4ea5-8ee8-26be4055e228","parent":"41528209-1b78-4785-a00d-7f65af1bb09c","name":"span","start":"2024-05-22T09:43:37.429448Z","trace_id":"00b96d92-d1d1-454a-b8e9-f67e8541759a"}}
{"trace_id":"00b96d92-d1d1-454a-b8e9-f67e8541759a","entry_type":"PlainEntry","entry":{"message":"message","value":"a value","timestamp":"2024-05-22T09:43:37.429503Z","parent":"ad1ed79b-6ad6-4ea5-8ee8-26be4055e228","trace_id":"00b96d92-d1d1-454a-b8e9-f67e8541759a"}}
{"trace_id":"00b96d92-d1d1-454a-b8e9-f67e8541759a","entry_type":"StartTask","entry":{"uuid":"e8cca541-57a8-440a-b848-7c3b33a97f52","parent":"ad1ed79b-6ad6-4ea5-8ee8-26be4055e228","name":"TestSubTask","start":"2024-05-22T09:43:37.429561Z","input":null,"trace_id":"00b96d92-d1d1-454a-b8e9-f67e8541759a"}}
{"trace_id":"00b96d92-d1d1-454a-b8e9-f67e8541759a","entry_type":"PlainEntry","entry":{"message":"subtask","value":"value","timestamp":"2024-05-22T09:43:37.429605Z","parent":"e8cca541-57a8-440a-b848-7c3b33a97f52","trace_id":"00b96d92-d1d1-454a-b8e9-f67e8541759a"}}
{"trace_id":"00b96d92-d1d1-454a-b8e9-f67e8541759a","entry_type":"EndTask","entry":{"uuid":"e8cca541-57a8-440a-b848-7c3b33a97f52","end":"2024-05-22T09:43:37.429647Z","output":null}}
{"trace_id":"00b96d92-d1d1-454a-b8e9-f67e8541759a","entry_type":"EndSpan","entry":{"uuid":"ad1ed79b-6ad6-4ea5-8ee8-26be4055e228","end":"2024-05-22T09:43:37.429687Z"}}
{"trace_id":"00b96d92-d1d1-454a-b8e9-f67e8541759a","entry_type":"StartTask","entry":{"uuid":"8840185c-2019-4105-9178-1b0e20ab6388","parent":"41528209-1b78-4785-a00d-7f65af1bb09c","name":"TestSubTask","start":"2024-05-22T09:43:37.429728Z","input":null,"trace_id":"00b96d92-d1d1-454a-b8e9-f67e8541759a"}}
{"trace_id":"00b96d92-d1d1-454a-b8e9-f67e8541759a","entry_type":"PlainEntry","entry":{"message":"subtask","value":"value","timestamp":"2024-05-22T09:43:37.429768Z","parent":"8840185c-2019-4105-9178-1b0e20ab6388","trace_id":"00b96d92-d1d1-454a-b8e9-f67e8541759a"}}
{"trace_id":"00b96d92-d1d1-454a-b8e9-f67e8541759a","entry_type":"EndTask","entry":{"uuid":"8840185c-2019-4105-9178-1b0e20ab6388","end":"2024-05-22T09:43:37.429806Z","output":null}}
{"trace_id":"00b96d92-d1d1-454a-b8e9-f67e8541759a","entry_type":"EndTask","entry":{"uuid":"41528209-1b78-4785-a00d-7f65af1bb09c","end":"2024-05-22T09:43:37.429842Z","output":"output"}}
Loading

0 comments on commit 2a7b18d

Please sign in to comment.