Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Capture httpx response JSON bodies #700

Merged
merged 3 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 85 additions & 8 deletions logfire/_internal/integrations/httpx.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

import httpx

from logfire.propagate import attach_context, get_context

try:
from opentelemetry.instrumentation.httpx import (
AsyncRequestHook,
Expand Down Expand Up @@ -64,6 +66,7 @@ def instrument_httpx(
capture_request_headers: bool,
capture_response_headers: bool,
capture_request_json_body: bool,
capture_response_json_body: bool,
**kwargs: Unpack[ClientKwargs],
) -> None: ...

Expand All @@ -74,6 +77,7 @@ def instrument_httpx(
capture_request_headers: bool,
capture_response_headers: bool,
capture_request_json_body: bool,
capture_response_json_body: bool,
**kwargs: Unpack[AsyncClientKwargs],
) -> None: ...

Expand All @@ -84,6 +88,7 @@ def instrument_httpx(
capture_request_headers: bool,
capture_response_headers: bool,
capture_request_json_body: bool,
capture_response_json_body: bool,
**kwargs: Unpack[HTTPXInstrumentKwargs],
) -> None: ...

Expand All @@ -94,6 +99,7 @@ def instrument_httpx(
capture_request_headers: bool,
capture_response_headers: bool,
capture_request_json_body: bool,
capture_response_json_body: bool,
**kwargs: Any,
) -> None:
"""Instrument the `httpx` module so that spans are automatically created for each request.
Expand All @@ -108,6 +114,7 @@ def instrument_httpx(
del kwargs # make sure only final_kwargs is used

instrumentor = HTTPXClientInstrumentor()
logfire_instance = logfire_instance.with_settings(custom_scope_suffix='httpx')

if client is None:
request_hook = cast('RequestHook | None', final_kwargs.get('request_hook'))
Expand All @@ -117,11 +124,15 @@ def instrument_httpx(
final_kwargs['request_hook'] = make_request_hook(
request_hook, capture_request_headers, capture_request_json_body
)
final_kwargs['response_hook'] = make_response_hook(response_hook, capture_response_headers)
final_kwargs['response_hook'] = make_response_hook(
response_hook, capture_response_headers, capture_response_json_body, logfire_instance
)
final_kwargs['async_request_hook'] = make_async_request_hook(
async_request_hook, capture_request_headers, capture_request_json_body
)
final_kwargs['async_response_hook'] = make_async_response_hook(async_response_hook, capture_response_headers)
final_kwargs['async_response_hook'] = make_async_response_hook(
async_response_hook, capture_response_headers, capture_response_json_body, logfire_instance
)

instrumentor.instrument(**final_kwargs)
else:
Expand All @@ -130,13 +141,17 @@ def instrument_httpx(
response_hook = cast('ResponseHook | AsyncResponseHook | None', final_kwargs.get('response_hook'))

request_hook = make_async_request_hook(request_hook, capture_request_headers, capture_request_json_body)
response_hook = make_async_response_hook(response_hook, capture_response_headers)
response_hook = make_async_response_hook(
response_hook, capture_response_headers, capture_response_json_body, logfire_instance
)
else:
request_hook = cast('RequestHook | None', final_kwargs.get('request_hook'))
response_hook = cast('ResponseHook | None', final_kwargs.get('response_hook'))

request_hook = make_request_hook(request_hook, capture_request_headers, capture_request_json_body)
response_hook = make_response_hook(response_hook, capture_response_headers)
response_hook = make_response_hook(
response_hook, capture_response_headers, capture_response_json_body, logfire_instance
)

tracer_provider = final_kwargs['tracer_provider']
instrumentor.instrument_client(client, tracer_provider, request_hook, response_hook)
Expand Down Expand Up @@ -176,34 +191,96 @@ async def new_hook(span: Span, request: RequestInfo) -> None:
return new_hook


def make_response_hook(
    hook: ResponseHook | None, should_capture_headers: bool, should_capture_json: bool, logfire_instance: Logfire
) -> ResponseHook | None:
    """Build the synchronous response hook passed to the httpx instrumentation.

    Args:
        hook: Optional user-supplied response hook; it is run after the built-in capture steps.
        should_capture_headers: When true, `capture_response_headers` is called for each response.
        should_capture_json: When true, `capture_response_json` is called for each response.
        logfire_instance: Logfire instance forwarded to `capture_response_json`.

    Returns:
        The wrapping hook, or `None` when both capture flags are false and no user hook was given.
    """
    if not (should_capture_headers or should_capture_json or hook):
        # Nothing to capture and nothing to delegate to: install no hook at all.
        return None

    def new_hook(span: Span, request: RequestInfo, response: ResponseInfo) -> None:
        with handle_internal_errors():
            if should_capture_headers:
                capture_response_headers(span, response)
            if should_capture_json:
                # `False` selects the synchronous (`read`) variant of the body capture.
                capture_response_json(logfire_instance, response, False)
            run_hook(hook, span, request, response)

    return new_hook


def make_async_response_hook(
    hook: ResponseHook | AsyncResponseHook | None,
    should_capture_headers: bool,
    should_capture_json: bool,
    logfire_instance: Logfire,
) -> AsyncResponseHook | None:
    """Build the asynchronous response hook passed to the httpx instrumentation.

    Args:
        hook: Optional user-supplied response hook (sync or async); it is run via
            `run_async_hook` after the built-in capture steps.
        should_capture_headers: When true, `capture_response_headers` is called for each response.
        should_capture_json: When true, `capture_response_json` is called for each response.
        logfire_instance: Logfire instance forwarded to `capture_response_json`.

    Returns:
        The wrapping coroutine function, or `None` when both capture flags are false
        and no user hook was given.
    """
    if not (should_capture_headers or should_capture_json or hook):
        # Nothing to capture and nothing to delegate to: install no hook at all.
        return None

    async def new_hook(span: Span, request: RequestInfo, response: ResponseInfo) -> None:
        with handle_internal_errors():
            if should_capture_headers:
                capture_response_headers(span, response)
            if should_capture_json:
                # `True` selects the asynchronous (`aread`) variant of the body capture.
                capture_response_json(logfire_instance, response, True)
            await run_async_hook(hook, span, request, response)

    return new_hook


def capture_response_json(logfire_instance: Logfire, response_info: ResponseInfo, is_async: bool) -> None:
    """Arrange for a JSON response body to be recorded the first time it is read.

    Does nothing unless the response's `content-type` header starts with
    `application/json`. Otherwise it looks up the calling frames for a local
    variable named `response` holding an `httpx.Response` and replaces that
    object's `read` (or `aread` when `is_async` is true) with a wrapper. The
    wrapper opens a 'Reading response body' span, under the context that was
    current when this function ran, and stores the body text on it as
    `http.response.body.json`.

    Args:
        logfire_instance: Logfire instance used to create the span.
        response_info: Response info passed to the response hook; only `headers` is read.
        is_async: Patch `aread` instead of `read`.
    """
    headers = cast('httpx.Headers', response_info.headers)
    # NOTE(review): a reviewer suggested the `.lower()` may be redundant; it is kept so
    # behaviour is unchanged -- confirm how httpx treats header *values* before removing.
    content_type = headers.get('content-type', '').lower()
    if not content_type.startswith('application/json'):
        return

    # Walk up the stack, starting two frames above this one, until some frame has
    # a local called `response` that is an `httpx.Response`.
    response = None
    frame = inspect.currentframe().f_back.f_back  # type: ignore
    while frame and not isinstance(response, httpx.Response):
        response = frame.f_locals.get('response')
        frame = frame.f_back
    if not isinstance(response, httpx.Response):  # pragma: no cover
        return

    parent_context = get_context()
    body_attribute = 'http.response.body.json'

    # these two branches should be kept almost identical
    if not is_async:
        unwrapped_read = response.read

        def read(*args: Any, **kwargs: Any):
            try:
                # Only log the body the first time it's read
                return response.content
            except httpx.ResponseNotRead:
                pass
            with attach_context(parent_context), logfire_instance.span('Reading response body') as span:
                content = unwrapped_read(*args, **kwargs)
                span.set_attribute(body_attribute, {})  # Set the JSON schema
                # Set the attribute to the raw text so that the backend can parse it
                span._span.set_attribute(body_attribute, response.text)  # type: ignore
            return content

        response.read = read
        return

    unwrapped_aread = response.aread

    async def aread(*args: Any, **kwargs: Any):
        try:
            # Only log the body the first time it's read
            return response.content
        except httpx.ResponseNotRead:
            pass
        with attach_context(parent_context), logfire_instance.span('Reading response body') as span:
            content = await unwrapped_aread(*args, **kwargs)
            span.set_attribute(body_attribute, {})  # Set the JSON schema
            # Set the attribute to the raw text so that the backend can parse it
            span._span.set_attribute(body_attribute, response.text)  # type: ignore
        return content

    response.aread = aread


async def run_async_hook(hook: Callable[P, Any] | None, *args: P.args, **kwargs: P.kwargs) -> None:
if hook:
result = hook(*args, **kwargs)
Expand Down
6 changes: 6 additions & 0 deletions logfire/_internal/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -1165,6 +1165,7 @@ def instrument_httpx(
capture_request_headers: bool = False,
capture_response_headers: bool = False,
capture_request_json_body: bool = False,
capture_response_json_body: bool = False,
**kwargs: Unpack[ClientKwargs],
) -> None: ...

Expand All @@ -1175,6 +1176,7 @@ def instrument_httpx(
capture_request_headers: bool = False,
capture_response_headers: bool = False,
capture_request_json_body: bool = False,
capture_response_json_body: bool = False,
**kwargs: Unpack[AsyncClientKwargs],
) -> None: ...

Expand All @@ -1185,6 +1187,7 @@ def instrument_httpx(
capture_request_headers: bool = False,
capture_response_headers: bool = False,
capture_request_json_body: bool = False,
capture_response_json_body: bool = False,
**kwargs: Unpack[HTTPXInstrumentKwargs],
) -> None: ...

Expand All @@ -1194,6 +1197,7 @@ def instrument_httpx(
capture_request_headers: bool = False,
capture_response_headers: bool = False,
capture_request_json_body: bool = False,
capture_response_json_body: bool = False,
**kwargs: Any,
) -> None:
"""Instrument the `httpx` module so that spans are automatically created for each request.
Expand All @@ -1210,6 +1214,7 @@ def instrument_httpx(
capture_request_headers: Set to `True` to capture all request headers.
capture_response_headers: Set to `True` to capture all response headers.
capture_request_json_body: Set to `True` to capture the request JSON body.
capture_response_json_body: Set to `True` to capture the response JSON body.
**kwargs: Additional keyword arguments to pass to the OpenTelemetry `instrument` method, for future compatibility.
"""
from .integrations.httpx import instrument_httpx
Expand All @@ -1221,6 +1226,7 @@ def instrument_httpx(
capture_request_headers,
capture_response_headers,
capture_request_json_body=capture_request_json_body,
capture_response_json_body=capture_response_json_body,
**kwargs,
)

Expand Down
Loading
Loading