Skip to content

Commit

Permalink
Support newer httpx versions (open-telemetry#866)
Browse files Browse the repository at this point in the history
  • Loading branch information
nozik authored Feb 4, 2022
1 parent 8d309af commit 0b9e96d
Show file tree
Hide file tree
Showing 10 changed files with 129 additions and 82 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `opentelemetry-instrumentation-pymongo` now supports `pymongo v4`
([#876](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/876))

- `opentelemetry-instrumentation-httpx` now supports versions higher than `0.19.0`.
([#866](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/866))

### Fixed

- `opentelemetry-instrumentation-django` Django: Conditionally create SERVER spans
Expand Down
2 changes: 1 addition & 1 deletion docs-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,4 @@ redis>=2.6
sqlalchemy>=1.0
tornado>=5.1.1
ddtrace>=0.34.0
httpx~=0.18.0
httpx>=0.18.0
1 change: 1 addition & 0 deletions docs/nitpick-exceptions.ini
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class_references=
httpx.AsyncBaseTransport
httpx.SyncByteStream
httpx.AsyncByteStream
httpx.Response
yarl.URL

anys=
Expand Down
2 changes: 1 addition & 1 deletion instrumentation/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
| [opentelemetry-instrumentation-fastapi](./opentelemetry-instrumentation-fastapi) | fastapi ~= 0.58 |
| [opentelemetry-instrumentation-flask](./opentelemetry-instrumentation-flask) | flask >= 1.0, < 3.0 |
| [opentelemetry-instrumentation-grpc](./opentelemetry-instrumentation-grpc) | grpcio ~= 1.27 |
| [opentelemetry-instrumentation-httpx](./opentelemetry-instrumentation-httpx) | httpx >= 0.18.0, < 0.19.0 |
| [opentelemetry-instrumentation-httpx](./opentelemetry-instrumentation-httpx) | httpx >= 0.18.0 |
| [opentelemetry-instrumentation-jinja2](./opentelemetry-instrumentation-jinja2) | jinja2 >= 2.7, < 4.0 |
| [opentelemetry-instrumentation-kafka-python](./opentelemetry-instrumentation-kafka-python) | kafka-python >= 2.0 |
| [opentelemetry-instrumentation-logging](./opentelemetry-instrumentation-logging) | logging |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ install_requires =
test =
opentelemetry-sdk ~= 1.3
opentelemetry-test-utils == 0.28b1
respx ~= 0.17.0

[options.packages.find]
where = src
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,38 @@ def _prepare_headers(headers: typing.Optional[Headers]) -> httpx.Headers:
return httpx.Headers(headers)


def _extract_parameters(args, kwargs):
if isinstance(args[0], httpx.Request):
# In httpx >= 0.20.0, handle_request receives a Request object
request: httpx.Request = args[0]
method = request.method.encode()
url = request.url
headers = request.headers
stream = request.stream
extensions = request.extensions
else:
# In httpx < 0.20.0, handle_request receives the parameters separately
method = args[0]
url = args[1]
headers = kwargs.get("headers", args[2] if len(args) > 2 else None)
stream = kwargs.get("stream", args[3] if len(args) > 3 else None)
extensions = kwargs.get(
"extensions", args[4] if len(args) > 4 else None
)

return method, url, headers, stream, extensions


def _inject_propagation_headers(headers, args, kwargs):
_headers = _prepare_headers(headers)
inject(_headers)
if isinstance(args[0], httpx.Request):
request: httpx.Request = args[0]
request.headers = _headers
else:
kwargs["headers"] = _headers.raw


class SyncOpenTelemetryTransport(httpx.BaseTransport):
"""Sync transport class that will trace all requests made with a client.
Expand Down Expand Up @@ -263,60 +295,53 @@ def __init__(

def handle_request(
self,
method: bytes,
url: URL,
headers: typing.Optional[Headers] = None,
stream: typing.Optional[httpx.SyncByteStream] = None,
extensions: typing.Optional[dict] = None,
) -> typing.Tuple[int, "Headers", httpx.SyncByteStream, dict]:
*args,
**kwargs,
) -> typing.Union[
typing.Tuple[int, "Headers", httpx.SyncByteStream, dict],
httpx.Response,
]:
"""Add request info to span."""
if context.get_value("suppress_instrumentation"):
return self._transport.handle_request(
method,
url,
headers=headers,
stream=stream,
extensions=extensions,
)
return self._transport.handle_request(*args, **kwargs)

method, url, headers, stream, extensions = _extract_parameters(
args, kwargs
)
span_attributes = _prepare_attributes(method, url)
_headers = _prepare_headers(headers)

request_info = RequestInfo(method, url, headers, stream, extensions)
span_name = _get_default_span_name(
span_attributes[SpanAttributes.HTTP_METHOD]
)
request = RequestInfo(method, url, headers, stream, extensions)

with self._tracer.start_as_current_span(
span_name, kind=SpanKind.CLIENT, attributes=span_attributes
) as span:
if self._request_hook is not None:
self._request_hook(span, request)

inject(_headers)

(
status_code,
headers,
stream,
extensions,
) = self._transport.handle_request(
method,
url,
headers=_headers.raw,
stream=stream,
extensions=extensions,
)
self._request_hook(span, request_info)

_inject_propagation_headers(headers, args, kwargs)
response = self._transport.handle_request(*args, **kwargs)
if isinstance(response, httpx.Response):
response: httpx.Response = response
status_code = response.status_code
headers = response.headers
stream = response.stream
extensions = response.extensions
else:
status_code, headers, stream, extensions = response

_apply_status_code(span, status_code)

if self._response_hook is not None:
self._response_hook(
span,
request,
request_info,
ResponseInfo(status_code, headers, stream, extensions),
)

return status_code, headers, stream, extensions
return response


class AsyncOpenTelemetryTransport(httpx.AsyncBaseTransport):
Expand Down Expand Up @@ -348,61 +373,55 @@ def __init__(
self._response_hook = response_hook

async def handle_async_request(
self,
method: bytes,
url: URL,
headers: typing.Optional[Headers] = None,
stream: typing.Optional[httpx.AsyncByteStream] = None,
extensions: typing.Optional[dict] = None,
) -> typing.Tuple[int, "Headers", httpx.AsyncByteStream, dict]:
self, *args, **kwargs
) -> typing.Union[
typing.Tuple[int, "Headers", httpx.AsyncByteStream, dict],
httpx.Response,
]:
"""Add request info to span."""
if context.get_value("suppress_instrumentation"):
return await self._transport.handle_async_request(
method,
url,
headers=headers,
stream=stream,
extensions=extensions,
)
return await self._transport.handle_async_request(*args, **kwargs)

method, url, headers, stream, extensions = _extract_parameters(
args, kwargs
)
span_attributes = _prepare_attributes(method, url)
_headers = _prepare_headers(headers)

span_name = _get_default_span_name(
span_attributes[SpanAttributes.HTTP_METHOD]
)
request = RequestInfo(method, url, headers, stream, extensions)
request_info = RequestInfo(method, url, headers, stream, extensions)

with self._tracer.start_as_current_span(
span_name, kind=SpanKind.CLIENT, attributes=span_attributes
) as span:
if self._request_hook is not None:
await self._request_hook(span, request)

inject(_headers)

(
status_code,
headers,
stream,
extensions,
) = await self._transport.handle_async_request(
method,
url,
headers=_headers.raw,
stream=stream,
extensions=extensions,
await self._request_hook(span, request_info)

_inject_propagation_headers(headers, args, kwargs)

response = await self._transport.handle_async_request(
*args, **kwargs
)
if isinstance(response, httpx.Response):
response: httpx.Response = response
status_code = response.status_code
headers = response.headers
stream = response.stream
extensions = response.extensions
else:
status_code, headers, stream, extensions = response

_apply_status_code(span, status_code)

if self._response_hook is not None:
await self._response_hook(
span,
request,
request_info,
ResponseInfo(status_code, headers, stream, extensions),
)

return status_code, headers, stream, extensions
return response


class _InstrumentedClient(httpx.Client):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@
# limitations under the License.


_instruments = ("httpx >= 0.18.0, < 0.19.0",)
_instruments = ("httpx >= 0.18.0",)
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ def test_requests_timeout_exception(self):
self.assertEqual(span.status.status_code, StatusCode.ERROR)

def test_invalid_url(self):
url = "invalid://nope"
url = "invalid://nope/"

with respx.mock, self.assertRaises(httpx.UnsupportedProtocol):
respx.post("invalid://nope").pass_through()
Expand All @@ -259,14 +259,10 @@ def test_invalid_url(self):
span = self.assert_span()

self.assertEqual(span.name, "HTTP POST")
print(span.attributes)
self.assertEqual(
span.attributes,
{
SpanAttributes.HTTP_METHOD: "POST",
SpanAttributes.HTTP_URL: "invalid://nope/",
},
span.attributes[SpanAttributes.HTTP_METHOD], "POST"
)
self.assertEqual(span.attributes[SpanAttributes.HTTP_URL], url)
self.assertEqual(span.status.status_code, StatusCode.ERROR)

def test_if_headers_equals_none(self):
Expand Down Expand Up @@ -621,6 +617,17 @@ async def _perform_request():

return _async_call(_perform_request())

def test_basic_multiple(self):
# We need to create separate clients because in httpx >= 0.19,
# closing the client after "with" means the second http call fails
self.perform_request(
self.URL, client=self.create_client(self.transport)
)
self.perform_request(
self.URL, client=self.create_client(self.transport)
)
self.assert_span(num_spans=2)


class TestSyncInstrumentationIntegration(BaseTestCases.BaseInstrumentorTest):
def create_client(
Expand All @@ -646,6 +653,13 @@ class TestAsyncInstrumentationIntegration(BaseTestCases.BaseInstrumentorTest):
request_hook = staticmethod(_async_request_hook)
no_update_request_hook = staticmethod(_async_no_update_request_hook)

def setUp(self):
super().setUp()
HTTPXClientInstrumentor().instrument()
self.client = self.create_client()
self.client2 = self.create_client()
HTTPXClientInstrumentor().uninstrument()

def create_client(
self,
transport: typing.Optional[AsyncOpenTelemetryTransport] = None,
Expand All @@ -668,3 +682,10 @@ async def _perform_request():
return await _client.request(method, url, headers=headers)

return _async_call(_perform_request())

def test_basic_multiple(self):
# We need to create separate clients because in httpx >= 0.19,
# closing the client after "with" means the second http call fails
self.perform_request(self.URL, client=self.client)
self.perform_request(self.URL, client=self.client2)
self.assert_span(num_spans=2)
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@
"instrumentation": "opentelemetry-instrumentation-grpc==0.28b1",
},
"httpx": {
"library": "httpx >= 0.18.0, < 0.19.0",
"library": "httpx >= 0.18.0",
"instrumentation": "opentelemetry-instrumentation-httpx==0.28b1",
},
"jinja2": {
Expand Down
12 changes: 8 additions & 4 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,8 @@ envlist =
pypy3-test-instrumentation-tornado

; opentelemetry-instrumentation-httpx
py3{6,7,8,9,10}-test-instrumentation-httpx
pypy3-test-instrumentation-httpx
py3{6,7,8,9,10}-test-instrumentation-httpx{18,21}
pypy3-test-instrumentation-httpx{18,21}

; opentelemetry-util-http
py3{6,7,8,9,10}-test-util-http
Expand Down Expand Up @@ -222,6 +222,10 @@ deps =
sqlalchemy14: sqlalchemy~=1.4
pika0: pika>=0.12.0,<1.0.0
pika1: pika>=1.0.0
httpx18: httpx>=0.18.0,<0.19.0
httpx18: respx~=0.17.0
httpx21: httpx>=0.19.0
httpx21: respx~=0.19.0

; FIXME: add coverage testing
; FIXME: add mypy testing
Expand Down Expand Up @@ -270,7 +274,7 @@ changedir =
test-instrumentation-starlette: instrumentation/opentelemetry-instrumentation-starlette/tests
test-instrumentation-tornado: instrumentation/opentelemetry-instrumentation-tornado/tests
test-instrumentation-wsgi: instrumentation/opentelemetry-instrumentation-wsgi/tests
test-instrumentation-httpx: instrumentation/opentelemetry-instrumentation-httpx/tests
test-instrumentation-httpx{18,21}: instrumentation/opentelemetry-instrumentation-httpx/tests
test-util-http: util/opentelemetry-util-http/tests
test-sdkextension-aws: sdk-extension/opentelemetry-sdk-extension-aws/tests
test-propagator-aws: propagator/opentelemetry-propagator-aws-xray/tests
Expand Down Expand Up @@ -366,7 +370,7 @@ commands_pre =

elasticsearch{2,5,6}: pip install {toxinidir}/opentelemetry-instrumentation[test] {toxinidir}/instrumentation/opentelemetry-instrumentation-elasticsearch[test]

httpx: pip install {toxinidir}/instrumentation/opentelemetry-instrumentation-httpx[test]
httpx{18,21}: pip install {toxinidir}/instrumentation/opentelemetry-instrumentation-httpx[test]

sdkextension-aws: pip install {toxinidir}/sdk-extension/opentelemetry-sdk-extension-aws[test]

Expand Down

0 comments on commit 0b9e96d

Please sign in to comment.