Skip to content

Commit

Permalink
Merge branch 'main' into feature/http-route-in-metric
Browse files Browse the repository at this point in the history
  • Loading branch information
GonzaloGuasch authored Jul 2, 2024
2 parents 36f2078 + b16394b commit c2db445
Show file tree
Hide file tree
Showing 10 changed files with 137 additions and 10 deletions.
11 changes: 10 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## Unreleased
- `opentelemetry-instrumentation-django` Handle exceptions from request/response hooks
([#2153](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2153))
- `opentelemetry-instrumentation-asyncio` instrumented `asyncio.wait_for` properly raises `asyncio.TimeoutError` as expected
([#2637](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2637))
- `opentelemetry-instrumentation-aws-lambda` Bugfix: AWS Lambda event source key incorrect for SNS in instrumentation library.
([#2612](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2612))
- `opentelemetry-instrumentation-system-metrics` Permit to use psutil 6.0+.
Expand Down Expand Up @@ -38,6 +42,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#2590](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2590))
- Reference symbols from generated semantic conventions
([#2611](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2611))
- `opentelemetry-instrumentation-psycopg` Bugfix: Handle empty statement.
([#2644](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2644))
- `opentelemetry-instrumentation-confluent-kafka` Confluent Kafka: Ensure consume span is ended when consumer is closed
([#2640](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2640))

## Version 1.25.0/0.46b0 (2024-05-31)

Expand Down Expand Up @@ -145,7 +153,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#2136](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2136))
- `opentelemetry-resource-detector-azure` Suppress instrumentation for `urllib` call
([#2178](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2178))
- AwsLambdaInstrumentor handles and re-raises function exception ([#2245](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2245))
- AwsLambdaInstrumentor handles and re-raises function exception
([#2245](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2245))

### Added

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,8 +280,11 @@ async def trace_coroutine(self, coro):
# CancelledError is raised when a coroutine is cancelled
# before it has a chance to run. We don't want to record
# this as an error.
# Still it needs to be raised in order for `asyncio.wait_for`
# to properly work with timeout and raise accordingly `asyncio.TimeoutError`
except asyncio.CancelledError:
attr["state"] = "cancelled"
raise
except Exception as exc:
exception = exc
state = determine_state(exception)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,19 @@ async def main():
spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 2)

def test_asyncio_wait_for_with_timeout(self):
expected_timeout_error = None

async def main():
nonlocal expected_timeout_error
try:
await asyncio.wait_for(async_func(), 0.01)
except asyncio.TimeoutError as timeout_error:
expected_timeout_error = timeout_error

asyncio.run(main())
self.assertNotEqual(expected_timeout_error, None)

def test_asyncio_as_completed(self):
async def main():
if sys.version_info >= (3, 11):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,10 @@ def consume(
): # pylint: disable=useless-super-delegation
return super().consume(*args, **kwargs)

# This method is deliberately implemented in order to allow wrapt to wrap this function
def close(self): # pylint: disable=useless-super-delegation
return super().close()


class ProxiedProducer(Producer):
def __init__(self, producer: Producer, tracer: Tracer):
Expand Down Expand Up @@ -181,6 +185,11 @@ def __init__(self, consumer: Consumer, tracer: Tracer):
self._current_consume_span = None
self._current_context_token = None

def close(self):
return ConfluentKafkaInstrumentor.wrap_close(
self._consumer.close, self
)

def committed(self, partitions, timeout=-1):
return self._consumer.committed(partitions, timeout)

Expand Down Expand Up @@ -303,6 +312,9 @@ def _inner_wrap_consume(func, instance, args, kwargs):
func, instance, self._tracer, args, kwargs
)

def _inner_wrap_close(func, instance):
return ConfluentKafkaInstrumentor.wrap_close(func, instance)

wrapt.wrap_function_wrapper(
AutoInstrumentedProducer,
"produce",
Expand All @@ -321,6 +333,12 @@ def _inner_wrap_consume(func, instance, args, kwargs):
_inner_wrap_consume,
)

wrapt.wrap_function_wrapper(
AutoInstrumentedConsumer,
"close",
_inner_wrap_close,
)

def _uninstrument(self, **kwargs):
confluent_kafka.Producer = self._original_kafka_producer
confluent_kafka.Consumer = self._original_kafka_consumer
Expand Down Expand Up @@ -403,3 +421,9 @@ def wrap_consume(func, instance, tracer, args, kwargs):
)

return records

@staticmethod
def wrap_close(func, instance):
if instance._current_consume_span:
_end_current_consume_span(instance)
func()
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,44 @@ def test_consume(self) -> None:
span_list = self.memory_exporter.get_finished_spans()
self._compare_spans(span_list, expected_spans)

def test_close(self) -> None:
instrumentation = ConfluentKafkaInstrumentor()
mocked_messages = [
MockedMessage("topic-a", 0, 0, []),
]
expected_spans = [
{"name": "recv", "attributes": {}},
{
"name": "topic-a process",
"attributes": {
SpanAttributes.MESSAGING_OPERATION: "process",
SpanAttributes.MESSAGING_KAFKA_PARTITION: 0,
SpanAttributes.MESSAGING_SYSTEM: "kafka",
SpanAttributes.MESSAGING_DESTINATION: "topic-a",
SpanAttributes.MESSAGING_DESTINATION_KIND: MessagingDestinationKindValues.QUEUE.value,
SpanAttributes.MESSAGING_MESSAGE_ID: "topic-a.0.0",
},
},
]

consumer = MockConsumer(
mocked_messages,
{
"bootstrap.servers": "localhost:29092",
"group.id": "mygroup",
"auto.offset.reset": "earliest",
},
)
self.memory_exporter.clear()
consumer = instrumentation.instrument_consumer(consumer)
consumer.poll()
consumer.close()

span_list = self.memory_exporter.get_finished_spans()
self._compare_spans(span_list, expected_spans)

def _compare_spans(self, spans, expected_spans):
self.assertEqual(len(spans), len(expected_spans))
for span, expected_span in zip(spans, expected_spans):
self.assertEqual(expected_span["name"], span.name)
for attribute_key, expected_attribute_value in expected_span[
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ def _get_span_name(request):
return request.method

# pylint: disable=too-many-locals
# pylint: disable=too-many-branches
def process_request(self, request):
# request.META is a dictionary containing all available HTTP headers
# Read more about request.META here:
Expand Down Expand Up @@ -286,9 +287,14 @@ def process_request(self, request):
request.META[self._environ_token] = token

if _DjangoMiddleware._otel_request_hook:
_DjangoMiddleware._otel_request_hook( # pylint: disable=not-callable
span, request
)
try:
_DjangoMiddleware._otel_request_hook( # pylint: disable=not-callable
span, request
)
except Exception: # pylint: disable=broad-exception-caught
# Raising an exception here would leak the request span since process_response
# would not be called. Log the exception instead.
_logger.exception("Exception raised by request_hook")

# pylint: disable=unused-argument
def process_view(self, request, view_func, *args, **kwargs):
Expand Down Expand Up @@ -385,10 +391,14 @@ def process_response(self, request, response):

# record any exceptions raised while processing the request
exception = request.META.pop(self._environ_exception_key, None)

if _DjangoMiddleware._otel_response_hook:
_DjangoMiddleware._otel_response_hook( # pylint: disable=not-callable
span, request, response
)
try:
_DjangoMiddleware._otel_response_hook( # pylint: disable=not-callable
span, request, response
)
except Exception: # pylint: disable=broad-exception-caught
_logger.exception("Exception raised by response_hook")

if exception:
activation.__exit__(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,32 @@ def response_hook(span, request, response):
self.assertIsInstance(response_hook_args[2], HttpResponse)
self.assertEqual(response_hook_args[2], response)

def test_request_hook_exception(self):
def request_hook(span, request):
# pylint: disable=broad-exception-raised
raise Exception("request hook exception")

_DjangoMiddleware._otel_request_hook = request_hook
Client().get("/span_name/1234/")
_DjangoMiddleware._otel_request_hook = None

# ensure that span ended
finished_spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(finished_spans), 1)

def test_response_hook_exception(self):
def response_hook(span, request, response):
# pylint: disable=broad-exception-raised
raise Exception("response hook exception")

_DjangoMiddleware._otel_response_hook = response_hook
Client().get("/span_name/1234/")
_DjangoMiddleware._otel_response_hook = None

# ensure that span ended
finished_spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(finished_spans), 1)

def test_trace_parent(self):
id_generator = RandomIdGenerator()
trace_id = format_trace_id(id_generator.generate_trace_id())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,8 @@ def get_operation_name(self, cursor, args):
if isinstance(statement, Composed):
statement = statement.as_string(cursor)

if isinstance(statement, str):
# `statement` can be empty string. See #2643
if statement and isinstance(statement, str):
# Strip leading comments so we get the operation name.
return self._leading_comment_remover.sub("", statement).split()[0]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,14 +245,18 @@ def test_span_name(self):
cursor.execute("/* leading comment */ query")
cursor.execute("/* leading comment */ query /* trailing comment */")
cursor.execute("query /* trailing comment */")
cursor.execute("")
cursor.execute("--")
spans_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans_list), 6)
self.assertEqual(len(spans_list), 8)
self.assertEqual(spans_list[0].name, "Test")
self.assertEqual(spans_list[1].name, "multi")
self.assertEqual(spans_list[2].name, "tab")
self.assertEqual(spans_list[3].name, "query")
self.assertEqual(spans_list[4].name, "query")
self.assertEqual(spans_list[5].name, "query")
self.assertEqual(spans_list[6].name, "postgresql")
self.assertEqual(spans_list[7].name, "--")

# pylint: disable=unused-argument
def test_not_recording(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
BaggageKeyPredicateT = Callable[[str], bool]

# A BaggageKeyPredicate that always returns True, allowing all baggage keys to be added to spans
ALLOW_ALL_BAGGAGE_KEYS: BaggageKeyPredicateT = lambda _: True
ALLOW_ALL_BAGGAGE_KEYS: BaggageKeyPredicateT = lambda _: True # noqa: E731


class BaggageSpanProcessor(SpanProcessor):
Expand Down

0 comments on commit c2db445

Please sign in to comment.