Skip to content

Commit

Permalink
first draft on aiohttp-client transition
Browse files Browse the repository at this point in the history
  • Loading branch information
emdneto committed Jul 8, 2024
1 parent acf6811 commit 95c7bf5
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,19 +90,26 @@ def response_hook(span: Span, params: typing.Union[

from opentelemetry import context as context_api
from opentelemetry import trace
from opentelemetry.instrumentation._semconv import (
_get_schema_url,
_HTTPStabilityMode,
_OpenTelemetrySemanticConventionStability,
_OpenTelemetryStabilitySignalType,
_set_error_type,
_set_http_method,
_set_http_url,
_set_status,
)
from opentelemetry.instrumentation.aiohttp_client.package import _instruments
from opentelemetry.instrumentation.aiohttp_client.version import __version__
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.utils import (
http_status_to_status_code,
is_instrumentation_enabled,
unwrap,
)
from opentelemetry.propagate import inject
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.trace import Span, SpanKind, TracerProvider, get_tracer
from opentelemetry.trace.status import Status, StatusCode
from opentelemetry.util.http import remove_url_credentials
from opentelemetry.util.http import remove_url_credentials, sanitize_method

_UrlFilterT = typing.Optional[typing.Callable[[yarl.URL], str]]
_RequestHookT = typing.Optional[
Expand All @@ -122,11 +129,20 @@ def response_hook(span: Span, params: typing.Union[
]


def _get_default_span_name(method: str) -> str:
method = sanitize_method(method.upper().strip())
if method == "_OTHER":
method = "HTTP"

return method


def create_trace_config(
url_filter: _UrlFilterT = None,
request_hook: _RequestHookT = None,
response_hook: _ResponseHookT = None,
tracer_provider: TracerProvider = None,
sem_conv_opt_in_mode: _HTTPStabilityMode = _HTTPStabilityMode.DEFAULT,
) -> aiohttp.TraceConfig:
"""Create an aiohttp-compatible trace configuration.
Expand Down Expand Up @@ -167,9 +183,13 @@ def create_trace_config(
__name__,
__version__,
tracer_provider,
schema_url="https://opentelemetry.io/schemas/1.11.0",
schema_url=_get_schema_url(sem_conv_opt_in_mode),
)

# TODO: Use this when we have durations for aiohttp-client
metrics_attributes = {}
server_span = False

def _end_trace(trace_config_ctx: types.SimpleNamespace):
context_api.detach(trace_config_ctx.token)
trace_config_ctx.span.end()
Expand All @@ -183,18 +203,22 @@ async def on_request_start(
trace_config_ctx.span = None
return

http_method = params.method.upper()
request_span_name = f"{http_method}"
http_method = params.method
request_span_name = _get_default_span_name(http_method)
request_url = (
remove_url_credentials(trace_config_ctx.url_filter(params.url))
if callable(trace_config_ctx.url_filter)
else remove_url_credentials(str(params.url))
)

span_attributes = {
SpanAttributes.HTTP_METHOD: http_method,
SpanAttributes.HTTP_URL: request_url,
}
span_attributes = {}
_set_http_method(
span_attributes,
http_method,
request_span_name,
sem_conv_opt_in_mode,
)
_set_http_url(span_attributes, request_url, sem_conv_opt_in_mode)

trace_config_ctx.span = trace_config_ctx.tracer.start_span(
request_span_name, kind=SpanKind.CLIENT, attributes=span_attributes
Expand All @@ -221,11 +245,18 @@ async def on_request_end(
response_hook(trace_config_ctx.span, params)

if trace_config_ctx.span.is_recording():
trace_config_ctx.span.set_status(
Status(http_status_to_status_code(int(params.response.status)))
)
trace_config_ctx.span.set_attribute(
SpanAttributes.HTTP_STATUS_CODE, params.response.status
status_code_str = str(params.response.status)
try:
status_code = int(status_code_str)
except ValueError:
status_code = -1
_set_status(
trace_config_ctx.span,
metrics_attributes,
status_code,
status_code_str,
server_span,
sem_conv_opt_in_mode,
)
_end_trace(trace_config_ctx)

Expand All @@ -238,7 +269,24 @@ async def on_request_exception(
return

if trace_config_ctx.span.is_recording() and params.exception:
trace_config_ctx.span.set_status(Status(StatusCode.ERROR))
_set_error_type(
trace_config_ctx.span,
metrics_attributes,
type(params.exception).__qualname__,
sem_conv_opt_in_mode,
)
# # if _report_new(sem_conv_opt_in_mode):
# # trace_config_ctx.set_attribute(ERROR_TYPE, exception_type)

# # trace_config_ctx.span.set_status(Status(StatusCode.ERROR,exception_type))
# _set_status(
# trace_config_ctx.span,
# metrics_attributes,
# status_code,
# exception_type,
# server_span,
# sem_conv_opt_in_mode,
# )
trace_config_ctx.span.record_exception(params.exception)

if callable(response_hook):
Expand Down Expand Up @@ -271,6 +319,7 @@ def _instrument(
trace_configs: typing.Optional[
typing.Sequence[aiohttp.TraceConfig]
] = None,
sem_conv_opt_in_mode: _HTTPStabilityMode = _HTTPStabilityMode.DEFAULT,
):
"""Enables tracing of all ClientSessions
Expand All @@ -293,6 +342,7 @@ def instrumented_init(wrapped, instance, args, kwargs):
request_hook=request_hook,
response_hook=response_hook,
tracer_provider=tracer_provider,
sem_conv_opt_in_mode=sem_conv_opt_in_mode,
)
trace_config._is_instrumented_by_opentelemetry = True
client_trace_configs.append(trace_config)
Expand Down Expand Up @@ -344,12 +394,17 @@ def _instrument(self, **kwargs):
``trace_configs``: An optional list of aiohttp.TraceConfig items, allowing customize enrichment of spans
based on aiohttp events (see specification: https://docs.aiohttp.org/en/stable/tracing_reference.html)
"""
_OpenTelemetrySemanticConventionStability._initialize()
_sem_conv_opt_in_mode = _OpenTelemetrySemanticConventionStability._get_opentelemetry_stability_opt_in_mode(
_OpenTelemetryStabilitySignalType.HTTP,
)
_instrument(
tracer_provider=kwargs.get("tracer_provider"),
url_filter=kwargs.get("url_filter"),
request_hook=kwargs.get("request_hook"),
response_hook=kwargs.get("response_hook"),
trace_configs=kwargs.get("trace_configs"),
sem_conv_opt_in_mode=_sem_conv_opt_in_mode,
)

def _uninstrument(self, **kwargs):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ async def do_request(url):
[
(
"GET",
(expected_status, None),
(expected_status, "ClientConnectorError"),
{
SpanAttributes.HTTP_METHOD: "GET",
SpanAttributes.HTTP_URL: url,
Expand All @@ -290,7 +290,7 @@ async def request_handler(request):
[
(
"GET",
(StatusCode.ERROR, None),
(StatusCode.ERROR, "ServerTimeoutError"),
{
SpanAttributes.HTTP_METHOD: "GET",
SpanAttributes.HTTP_URL: f"http://{host}:{port}/test_timeout",
Expand All @@ -317,7 +317,7 @@ async def request_handler(request):
[
(
"GET",
(StatusCode.ERROR, None),
(StatusCode.ERROR, "TooManyRedirects"),
{
SpanAttributes.HTTP_METHOD: "GET",
SpanAttributes.HTTP_URL: f"http://{host}:{port}/test_too_many_redirects",
Expand Down

0 comments on commit 95c7bf5

Please sign in to comment.