Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unwrap Celery's ExceptionInfo #1863

Merged
merged 5 commits into from
Sep 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#1889](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1889))
- Fixed union typing error not compatible with Python 3.7 introduced in `opentelemetry-util-http`, fix tests introduced by patch related to sanitize method for wsgi
([#1913](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1913))
- `opentelemetry-instrumentation-celery` Unwrap Celery's `ExceptionInfo` errors and report the actual exception that was raised. ([#1863](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1863))

### Added

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ def add(x, y):
from timeit import default_timer
from typing import Collection, Iterable

from billiard.einfo import ExceptionInfo
from celery import signals # pylint: disable=no-name-in-module

from opentelemetry import trace
Expand All @@ -75,6 +76,13 @@ def add(x, y):
from opentelemetry.propagators.textmap import Getter
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.trace.status import Status, StatusCode
from billiard import VERSION


if VERSION >= (4, 0, 1):
from billiard.einfo import ExceptionWithTraceback
else:
ExceptionWithTraceback = None

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -271,6 +279,18 @@ def _trace_failure(*args, **kwargs):
return

if ex is not None:
# Unwrap the actual exception wrapped by billiard's
unflxw marked this conversation as resolved.
Show resolved Hide resolved
# `ExceptionInfo` and `ExceptionWithTraceback`.
if isinstance(ex, ExceptionInfo) and ex.exception is not None:
ex = ex.exception

if (
ocelotl marked this conversation as resolved.
Show resolved Hide resolved
ExceptionWithTraceback is not None
and isinstance(ex, ExceptionWithTraceback)
and ex.exc is not None
):
ex = ex.exc

status_kwargs["description"] = str(ex)
span.record_exception(ex)
span.set_status(Status(**status_kwargs))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import logging

from celery import registry # pylint: disable=no-name-in-module
from billiard import VERSION

from opentelemetry.semconv.trace import SpanAttributes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,15 @@ class Config:
app.config_from_object(Config)


class CustomError(Exception):
pass


@app.task
def task_add(num_a, num_b):
return num_a + num_b


@app.task
def task_raises():
raise CustomError("The task failed!")
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
from opentelemetry.instrumentation.celery import CeleryInstrumentor
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.test.test_base import TestBase
from opentelemetry.trace import SpanKind
from opentelemetry.trace import SpanKind, StatusCode

from .celery_test_tasks import app, task_add
from .celery_test_tasks import app, task_add, task_raises


class TestCeleryInstrumentation(TestBase):
Expand Down Expand Up @@ -66,6 +66,10 @@ def test_task(self):
},
)

self.assertEqual(consumer.status.status_code, StatusCode.UNSET)

self.assertEqual(0, len(consumer.events))

self.assertEqual(
producer.name, "apply_async/tests.celery_test_tasks.task_add"
)
Expand All @@ -84,6 +88,70 @@ def test_task(self):
self.assertEqual(consumer.parent.span_id, producer.context.span_id)
self.assertEqual(consumer.context.trace_id, producer.context.trace_id)

def test_task_raises(self):
CeleryInstrumentor().instrument()

result = task_raises.delay()

timeout = time.time() + 60 * 1 # 1 minutes from now
while not result.ready():
if time.time() > timeout:
break
time.sleep(0.05)

spans = self.sorted_spans(self.memory_exporter.get_finished_spans())
self.assertEqual(len(spans), 2)

consumer, producer = spans

self.assertEqual(
consumer.name, "run/tests.celery_test_tasks.task_raises"
)
self.assertEqual(consumer.kind, SpanKind.CONSUMER)
self.assertSpanHasAttributes(
consumer,
{
"celery.action": "run",
"celery.state": "FAILURE",
SpanAttributes.MESSAGING_DESTINATION: "celery",
"celery.task_name": "tests.celery_test_tasks.task_raises",
},
)

self.assertEqual(consumer.status.status_code, StatusCode.ERROR)

self.assertEqual(1, len(consumer.events))
event = consumer.events[0]

self.assertIn(SpanAttributes.EXCEPTION_STACKTRACE, event.attributes)

self.assertEqual(
event.attributes[SpanAttributes.EXCEPTION_TYPE], "CustomError"
)

self.assertEqual(
event.attributes[SpanAttributes.EXCEPTION_MESSAGE],
"The task failed!",
)

self.assertEqual(
producer.name, "apply_async/tests.celery_test_tasks.task_raises"
)
self.assertEqual(producer.kind, SpanKind.PRODUCER)
self.assertSpanHasAttributes(
producer,
{
"celery.action": "apply_async",
"celery.task_name": "tests.celery_test_tasks.task_raises",
SpanAttributes.MESSAGING_DESTINATION_KIND: "queue",
SpanAttributes.MESSAGING_DESTINATION: "celery",
},
)

self.assertNotEqual(consumer.parent, producer.context)
self.assertEqual(consumer.parent.span_id, producer.context.span_id)
self.assertEqual(consumer.context.trace_id, producer.context.trace_id)

def test_uninstrument(self):
CeleryInstrumentor().instrument()
CeleryInstrumentor().uninstrument()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ def fn_exception():
assert len(span.events) == 1
event = span.events[0]
assert event.name == "exception"
assert event.attributes[SpanAttributes.EXCEPTION_TYPE] == "ExceptionInfo"
assert event.attributes[SpanAttributes.EXCEPTION_TYPE] == "Exception"
assert SpanAttributes.EXCEPTION_MESSAGE in event.attributes
assert (
span.attributes.get(SpanAttributes.MESSAGING_MESSAGE_ID)
Expand Down Expand Up @@ -420,7 +420,7 @@ def run(self):
assert "Task class is failing" in span.status.description


def test_class_task_exception_excepted(celery_app, memory_exporter):
def test_class_task_exception_expected(celery_app, memory_exporter):
class BaseTask(celery_app.Task):
throws = (MyException,)

Expand Down
Loading