Skip to content

Commit

Permalink
Merge branch 'main' into single-page-docs
Browse files Browse the repository at this point in the history
  • Loading branch information
Kludex authored Dec 20, 2024
2 parents 744bb9f + 5eced1c commit e693bb6
Show file tree
Hide file tree
Showing 9 changed files with 413 additions and 116 deletions.
6 changes: 2 additions & 4 deletions logfire-api/logfire_api/_internal/config.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ from .exporters.quiet_metrics import QuietMetricExporter as QuietMetricExporter
from .exporters.remove_pending import RemovePendingSpansExporter as RemovePendingSpansExporter
from .exporters.test import TestExporter as TestExporter
from .integrations.executors import instrument_executors as instrument_executors
from .main import FastLogfireSpan as FastLogfireSpan, Logfire as Logfire, LogfireSpan as LogfireSpan
from .main import Logfire as Logfire
from .metrics import ProxyMeterProvider as ProxyMeterProvider
from .scrubbing import BaseScrubber as BaseScrubber, NOOP_SCRUBBER as NOOP_SCRUBBER, Scrubber as Scrubber, ScrubbingOptions as ScrubbingOptions
from .stack_info import warn_at_user_stacklevel as warn_at_user_stacklevel
from .tracer import PendingSpanProcessor as PendingSpanProcessor, ProxyTracerProvider as ProxyTracerProvider
from .tracer import OPEN_SPANS as OPEN_SPANS, PendingSpanProcessor as PendingSpanProcessor, ProxyTracerProvider as ProxyTracerProvider
from .utils import SeededRandomIdGenerator as SeededRandomIdGenerator, UnexpectedResponse as UnexpectedResponse, ensure_data_dir_exists as ensure_data_dir_exists, handle_internal_errors as handle_internal_errors, read_toml_file as read_toml_file, suppress_instrumentation as suppress_instrumentation
from _typeshed import Incomplete
from dataclasses import dataclass
Expand All @@ -30,9 +30,7 @@ from opentelemetry.sdk.trace.id_generator import IdGenerator
from pathlib import Path
from typing import Any, Callable, Literal, Sequence, TypedDict
from typing_extensions import Self, Unpack
from weakref import WeakSet

OPEN_SPANS: WeakSet[LogfireSpan | FastLogfireSpan]
CREDENTIALS_FILENAME: str
COMMON_REQUEST_HEADERS: Incomplete
PROJECT_NAME_PATTERN: str
Expand Down
10 changes: 5 additions & 5 deletions logfire-api/logfire_api/_internal/main.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import requests
from . import async_ as async_
from ..version import VERSION as VERSION
from .auto_trace import AutoTraceModule as AutoTraceModule, install_auto_tracing as install_auto_tracing
from .config import GLOBAL_CONFIG as GLOBAL_CONFIG, LogfireConfig as LogfireConfig, OPEN_SPANS as OPEN_SPANS
from .config import GLOBAL_CONFIG as GLOBAL_CONFIG, LogfireConfig as LogfireConfig
from .config_params import PydanticPluginRecordValues as PydanticPluginRecordValues
from .constants import ATTRIBUTES_JSON_SCHEMA_KEY as ATTRIBUTES_JSON_SCHEMA_KEY, ATTRIBUTES_LOG_LEVEL_NUM_KEY as ATTRIBUTES_LOG_LEVEL_NUM_KEY, ATTRIBUTES_MESSAGE_KEY as ATTRIBUTES_MESSAGE_KEY, ATTRIBUTES_MESSAGE_TEMPLATE_KEY as ATTRIBUTES_MESSAGE_TEMPLATE_KEY, ATTRIBUTES_SAMPLE_RATE_KEY as ATTRIBUTES_SAMPLE_RATE_KEY, ATTRIBUTES_SPAN_TYPE_KEY as ATTRIBUTES_SPAN_TYPE_KEY, ATTRIBUTES_TAGS_KEY as ATTRIBUTES_TAGS_KEY, ATTRIBUTES_VALIDATION_ERROR_KEY as ATTRIBUTES_VALIDATION_ERROR_KEY, DISABLE_CONSOLE_KEY as DISABLE_CONSOLE_KEY, LEVEL_NUMBERS as LEVEL_NUMBERS, LevelName as LevelName, NULL_ARGS_KEY as NULL_ARGS_KEY, OTLP_MAX_INT_SIZE as OTLP_MAX_INT_SIZE, log_level_attributes as log_level_attributes
from .constants import ATTRIBUTES_JSON_SCHEMA_KEY as ATTRIBUTES_JSON_SCHEMA_KEY, ATTRIBUTES_LOG_LEVEL_NUM_KEY as ATTRIBUTES_LOG_LEVEL_NUM_KEY, ATTRIBUTES_MESSAGE_KEY as ATTRIBUTES_MESSAGE_KEY, ATTRIBUTES_MESSAGE_TEMPLATE_KEY as ATTRIBUTES_MESSAGE_TEMPLATE_KEY, ATTRIBUTES_SAMPLE_RATE_KEY as ATTRIBUTES_SAMPLE_RATE_KEY, ATTRIBUTES_SPAN_TYPE_KEY as ATTRIBUTES_SPAN_TYPE_KEY, ATTRIBUTES_TAGS_KEY as ATTRIBUTES_TAGS_KEY, DISABLE_CONSOLE_KEY as DISABLE_CONSOLE_KEY, LEVEL_NUMBERS as LEVEL_NUMBERS, LevelName as LevelName, NULL_ARGS_KEY as NULL_ARGS_KEY, OTLP_MAX_INT_SIZE as OTLP_MAX_INT_SIZE, log_level_attributes as log_level_attributes
from .formatter import logfire_format as logfire_format, logfire_format_with_magic as logfire_format_with_magic
from .instrument import instrument as instrument
from .integrations.asgi import ASGIApp as ASGIApp, ASGIInstrumentKwargs as ASGIInstrumentKwargs
Expand All @@ -29,7 +29,7 @@ from .json_encoder import logfire_json_dumps as logfire_json_dumps
from .json_schema import JsonSchemaProperties as JsonSchemaProperties, attributes_json_schema as attributes_json_schema, attributes_json_schema_properties as attributes_json_schema_properties, create_json_schema as create_json_schema
from .metrics import ProxyMeterProvider as ProxyMeterProvider
from .stack_info import get_user_stack_info as get_user_stack_info
from .tracer import ProxyTracerProvider as ProxyTracerProvider
from .tracer import ProxyTracerProvider as ProxyTracerProvider, record_exception as record_exception, set_exception_status as set_exception_status
from .utils import SysExcInfo as SysExcInfo, get_version as get_version, handle_internal_errors as handle_internal_errors, log_internal_error as log_internal_error, uniquify_sequence as uniquify_sequence
from django.http import HttpRequest as HttpRequest, HttpResponse as HttpResponse
from fastapi import FastAPI
Expand Down Expand Up @@ -1034,7 +1034,7 @@ class LogfireSpan(ReadableSpan):
def message(self) -> str: ...
@message.setter
def message(self, message: str): ...
def end(self) -> None:
def end(self, end_time: int | None = None) -> None:
"""Sets the current time as the span's end time.
The span's end time is the wall time at which the operation finished.
Expand Down Expand Up @@ -1093,7 +1093,7 @@ class NoopSpan:
def is_recording(self) -> bool: ...
AttributesValueType = TypeVar('AttributesValueType', bound=Any | otel_types.AttributeValue)

def user_attributes(attributes: dict[str, Any]) -> dict[str, otel_types.AttributeValue]:
def prepare_otlp_attributes(attributes: dict[str, Any]) -> dict[str, otel_types.AttributeValue]:
"""Prepare attributes for sending to OpenTelemetry.
This will convert any non-OpenTelemetry compatible types to JSON.
Expand Down
15 changes: 11 additions & 4 deletions logfire-api/logfire_api/_internal/tracer.pyi
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import opentelemetry.trace as trace_api
from .config import LogfireConfig as LogfireConfig
from .constants import ATTRIBUTES_MESSAGE_KEY as ATTRIBUTES_MESSAGE_KEY, ATTRIBUTES_PENDING_SPAN_REAL_PARENT_KEY as ATTRIBUTES_PENDING_SPAN_REAL_PARENT_KEY, ATTRIBUTES_SAMPLE_RATE_KEY as ATTRIBUTES_SAMPLE_RATE_KEY, ATTRIBUTES_SPAN_TYPE_KEY as ATTRIBUTES_SPAN_TYPE_KEY, PENDING_SPAN_NAME_SUFFIX as PENDING_SPAN_NAME_SUFFIX
from .constants import ATTRIBUTES_MESSAGE_KEY as ATTRIBUTES_MESSAGE_KEY, ATTRIBUTES_PENDING_SPAN_REAL_PARENT_KEY as ATTRIBUTES_PENDING_SPAN_REAL_PARENT_KEY, ATTRIBUTES_SAMPLE_RATE_KEY as ATTRIBUTES_SAMPLE_RATE_KEY, ATTRIBUTES_SPAN_TYPE_KEY as ATTRIBUTES_SPAN_TYPE_KEY, ATTRIBUTES_VALIDATION_ERROR_KEY as ATTRIBUTES_VALIDATION_ERROR_KEY, PENDING_SPAN_NAME_SUFFIX as PENDING_SPAN_NAME_SUFFIX, log_level_attributes as log_level_attributes
from .utils import handle_internal_errors as handle_internal_errors
from _typeshed import Incomplete
from dataclasses import dataclass
from opentelemetry import context as context_api
Expand All @@ -13,7 +14,9 @@ from opentelemetry.trace.status import Status, StatusCode
from opentelemetry.util import types as otel_types
from threading import Lock
from typing import Any, Callable, Mapping, Sequence
from weakref import WeakKeyDictionary
from weakref import WeakKeyDictionary, WeakSet

OPEN_SPANS: WeakSet[_LogfireWrappedSpan]

@dataclass
class ProxyTracerProvider(TracerProvider):
Expand All @@ -32,11 +35,12 @@ class ProxyTracerProvider(TracerProvider):
def resource(self) -> Resource: ...
def force_flush(self, timeout_millis: int = 30000) -> bool: ...

@dataclass
class _MaybeDeterministicTimestampSpan(trace_api.Span, ReadableSpan):
@dataclass(eq=False)
class _LogfireWrappedSpan(trace_api.Span, ReadableSpan):
"""Span that overrides end() to use a timestamp generator if one was provided."""
span: Span
ns_timestamp_generator: Callable[[], int]
def __post_init__(self) -> None: ...
def end(self, end_time: int | None = None) -> None: ...
def get_span_context(self) -> SpanContext: ...
def set_attributes(self, attributes: dict[str, otel_types.AttributeValue]) -> None: ...
Expand Down Expand Up @@ -85,3 +89,6 @@ def should_sample(span_context: SpanContext, attributes: Mapping[str, otel_types
This is used to sample spans that are not sampled by the OTEL sampler.
"""
def get_sample_rate_from_attributes(attributes: otel_types.Attributes) -> float | None: ...
def record_exception(span: trace_api.Span, exception: BaseException, *, attributes: otel_types.Attributes = None, timestamp: int | None = None, escaped: bool = False) -> None:
"""Similar to the OTEL SDK Span.record_exception method, with our own additions."""
def set_exception_status(span: trace_api.Span, exception: BaseException): ...
6 changes: 3 additions & 3 deletions logfire/_internal/backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
)
from .exporters.file import FileSpanExporter
from .formatter import logfire_format
from .main import user_attributes
from .main import prepare_otlp_attributes
from .scrubbing import Scrubber

try:
Expand Down Expand Up @@ -150,7 +150,7 @@ def write(self, data: Union[Log, Span]) -> None:
)
else:
parent_context = None # pragma: no cover
otlp_attributes = user_attributes(data.attributes)
otlp_attributes = prepare_otlp_attributes(data.attributes)

if data.formatted_msg is None: # pragma: no cover
formatted_message = logfire_format(data.msg_template, data.attributes, self.scrubber)
Expand Down Expand Up @@ -196,7 +196,7 @@ def write(self, data: Union[Log, Span]) -> None:
start_timestamp = data.start_timestamp
if start_timestamp.tzinfo is None: # pragma: no branch
start_timestamp = start_timestamp.replace(tzinfo=timezone.utc)
otlp_attributes = user_attributes(data.log_attributes)
otlp_attributes = prepare_otlp_attributes(data.log_attributes)
if data.formatted_msg is None: # pragma: no branch
formatted_message = logfire_format(data.msg_template, data.log_attributes, self.scrubber)
else: # pragma: no cover
Expand Down
9 changes: 3 additions & 6 deletions logfire/_internal/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
from typing import TYPE_CHECKING, Any, Callable, Literal, Sequence, TypedDict, cast
from urllib.parse import urljoin
from uuid import uuid4
from weakref import WeakSet

import requests
from opentelemetry import trace
Expand Down Expand Up @@ -87,7 +86,7 @@
from .metrics import ProxyMeterProvider
from .scrubbing import NOOP_SCRUBBER, BaseScrubber, Scrubber, ScrubbingOptions
from .stack_info import warn_at_user_stacklevel
from .tracer import PendingSpanProcessor, ProxyTracerProvider
from .tracer import OPEN_SPANS, PendingSpanProcessor, ProxyTracerProvider
from .utils import (
SeededRandomIdGenerator,
UnexpectedResponse,
Expand All @@ -98,10 +97,8 @@
)

if TYPE_CHECKING:
from .main import FastLogfireSpan, Logfire, LogfireSpan
from .main import Logfire

# NOTE: this WeakSet is the reason that FastLogfireSpan.__slots__ has a __weakref__ slot.
OPEN_SPANS: WeakSet[LogfireSpan | FastLogfireSpan] = WeakSet()

CREDENTIALS_FILENAME = 'logfire_credentials.json'
"""Default base URL for the Logfire API."""
Expand Down Expand Up @@ -948,7 +945,7 @@ def _exit_open_spans(): # type: ignore[reportUnusedFunction] # pragma: no cove
# Registering this callback here after the OTEL one means that this runs first.
# Otherwise OTEL would log an error "Already shutdown, dropping span."
for span in list(OPEN_SPANS):
span.__exit__(None, None, None)
span.end()

self._initialized = True

Expand Down
93 changes: 85 additions & 8 deletions logfire/_internal/integrations/httpx.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

import httpx

from logfire.propagate import attach_context, get_context

try:
from opentelemetry.instrumentation.httpx import (
AsyncRequestHook,
Expand Down Expand Up @@ -64,6 +66,7 @@ def instrument_httpx(
capture_request_headers: bool,
capture_response_headers: bool,
capture_request_json_body: bool,
capture_response_json_body: bool,
**kwargs: Unpack[ClientKwargs],
) -> None: ...

Expand All @@ -74,6 +77,7 @@ def instrument_httpx(
capture_request_headers: bool,
capture_response_headers: bool,
capture_request_json_body: bool,
capture_response_json_body: bool,
**kwargs: Unpack[AsyncClientKwargs],
) -> None: ...

Expand All @@ -84,6 +88,7 @@ def instrument_httpx(
capture_request_headers: bool,
capture_response_headers: bool,
capture_request_json_body: bool,
capture_response_json_body: bool,
**kwargs: Unpack[HTTPXInstrumentKwargs],
) -> None: ...

Expand All @@ -94,6 +99,7 @@ def instrument_httpx(
capture_request_headers: bool,
capture_response_headers: bool,
capture_request_json_body: bool,
capture_response_json_body: bool,
**kwargs: Any,
) -> None:
"""Instrument the `httpx` module so that spans are automatically created for each request.
Expand All @@ -108,6 +114,7 @@ def instrument_httpx(
del kwargs # make sure only final_kwargs is used

instrumentor = HTTPXClientInstrumentor()
logfire_instance = logfire_instance.with_settings(custom_scope_suffix='httpx')

if client is None:
request_hook = cast('RequestHook | None', final_kwargs.get('request_hook'))
Expand All @@ -117,11 +124,15 @@ def instrument_httpx(
final_kwargs['request_hook'] = make_request_hook(
request_hook, capture_request_headers, capture_request_json_body
)
final_kwargs['response_hook'] = make_response_hook(response_hook, capture_response_headers)
final_kwargs['response_hook'] = make_response_hook(
response_hook, capture_response_headers, capture_response_json_body, logfire_instance
)
final_kwargs['async_request_hook'] = make_async_request_hook(
async_request_hook, capture_request_headers, capture_request_json_body
)
final_kwargs['async_response_hook'] = make_async_response_hook(async_response_hook, capture_response_headers)
final_kwargs['async_response_hook'] = make_async_response_hook(
async_response_hook, capture_response_headers, capture_response_json_body, logfire_instance
)

instrumentor.instrument(**final_kwargs)
else:
Expand All @@ -130,13 +141,17 @@ def instrument_httpx(
response_hook = cast('ResponseHook | AsyncResponseHook | None', final_kwargs.get('response_hook'))

request_hook = make_async_request_hook(request_hook, capture_request_headers, capture_request_json_body)
response_hook = make_async_response_hook(response_hook, capture_response_headers)
response_hook = make_async_response_hook(
response_hook, capture_response_headers, capture_response_json_body, logfire_instance
)
else:
request_hook = cast('RequestHook | None', final_kwargs.get('request_hook'))
response_hook = cast('ResponseHook | None', final_kwargs.get('response_hook'))

request_hook = make_request_hook(request_hook, capture_request_headers, capture_request_json_body)
response_hook = make_response_hook(response_hook, capture_response_headers)
response_hook = make_response_hook(
response_hook, capture_response_headers, capture_response_json_body, logfire_instance
)

tracer_provider = final_kwargs['tracer_provider']
instrumentor.instrument_client(client, tracer_provider, request_hook, response_hook)
Expand Down Expand Up @@ -176,34 +191,96 @@ async def new_hook(span: Span, request: RequestInfo) -> None:
return new_hook


def make_response_hook(hook: ResponseHook | None, should_capture_headers: bool) -> ResponseHook | None:
if not should_capture_headers and not hook:
def make_response_hook(
hook: ResponseHook | None, should_capture_headers: bool, should_capture_json: bool, logfire_instance: Logfire
) -> ResponseHook | None:
if not should_capture_headers and not should_capture_json and not hook:
return None

def new_hook(span: Span, request: RequestInfo, response: ResponseInfo) -> None:
with handle_internal_errors():
if should_capture_headers:
capture_response_headers(span, response)
if should_capture_json:
capture_response_json(logfire_instance, response, False)
run_hook(hook, span, request, response)

return new_hook


def make_async_response_hook(
hook: ResponseHook | AsyncResponseHook | None, should_capture_headers: bool
hook: ResponseHook | AsyncResponseHook | None,
should_capture_headers: bool,
should_capture_json: bool,
logfire_instance: Logfire,
) -> AsyncResponseHook | None:
if not should_capture_headers and not hook:
if not should_capture_headers and not should_capture_json and not hook:
return None

async def new_hook(span: Span, request: RequestInfo, response: ResponseInfo) -> None:
with handle_internal_errors():
if should_capture_headers:
capture_response_headers(span, response)
if should_capture_json:
capture_response_json(logfire_instance, response, True)
await run_async_hook(hook, span, request, response)

return new_hook


def capture_response_json(logfire_instance: Logfire, response_info: ResponseInfo, is_async: bool) -> None:
headers = cast('httpx.Headers', response_info.headers)
if not headers.get('content-type', '').lower().startswith('application/json'):
return

frame = inspect.currentframe().f_back.f_back # type: ignore
while frame:
response = frame.f_locals.get('response')
frame = frame.f_back
if isinstance(response, httpx.Response): # pragma: no branch
break
else: # pragma: no cover
return

ctx = get_context()
attr_name = 'http.response.body.json'

if is_async: # these two branches should be kept almost identical
original_aread = response.aread

async def aread(*args: Any, **kwargs: Any):
try:
# Only log the body the first time it's read
return response.content
except httpx.ResponseNotRead:
pass
with attach_context(ctx), logfire_instance.span('Reading response body') as span:
content = await original_aread(*args, **kwargs)
span.set_attribute(attr_name, {}) # Set the JSON schema
# Set the attribute to the raw text so that the backend can parse it
span._span.set_attribute(attr_name, response.text) # type: ignore
return content

response.aread = aread
else:
original_read = response.read

def read(*args: Any, **kwargs: Any):
try:
# Only log the body the first time it's read
return response.content
except httpx.ResponseNotRead:
pass
with attach_context(ctx), logfire_instance.span('Reading response body') as span:
content = original_read(*args, **kwargs)
span.set_attribute(attr_name, {}) # Set the JSON schema
# Set the attribute to the raw text so that the backend can parse it
span._span.set_attribute(attr_name, response.text) # type: ignore
return content

response.read = read


async def run_async_hook(hook: Callable[P, Any] | None, *args: P.args, **kwargs: P.kwargs) -> None:
if hook:
result = hook(*args, **kwargs)
Expand Down
Loading

0 comments on commit e693bb6

Please sign in to comment.