From 25b75af8d2649435acfc529eaebf432b8ede5e52 Mon Sep 17 00:00:00 2001 From: Anton Pirker Date: Tue, 18 Jun 2024 13:35:02 +0200 Subject: [PATCH] Reapply "Refactor the Celery Beat integration (#3105)" (#3144) (#3175) This reverts the revert that was done to mitigate the regression in which Crons stopped sending ok/error check-ins. This reapplies the refactoring, fixes the root cause of the regression, and adds integration tests to make sure it does not happen again. --- .../test-integrations-data-processing.yml | 4 + .../split-tox-gh-actions.py | 5 + .../templates/test_group.jinja | 5 + sentry_sdk/integrations/celery/__init__.py | 17 +- sentry_sdk/integrations/celery/beat.py | 168 +++++++---------- sentry_sdk/scope.py | 7 +- .../celery/integration_tests/__init__.py | 58 ++++++ .../test_celery_beat_cron_monitoring.py | 153 +++++++++++++++ tests/integrations/celery/test_celery.py | 14 +- .../celery/test_update_celery_task_headers.py | 177 ++++++++++++++---- 10 files changed, 463 insertions(+), 145 deletions(-) create mode 100644 tests/integrations/celery/integration_tests/__init__.py create mode 100644 tests/integrations/celery/integration_tests/test_celery_beat_cron_monitoring.py diff --git a/.github/workflows/test-integrations-data-processing.yml b/.github/workflows/test-integrations-data-processing.yml index 399de7c283..25daf9aada 100644 --- a/.github/workflows/test-integrations-data-processing.yml +++ b/.github/workflows/test-integrations-data-processing.yml @@ -36,6 +36,8 @@ jobs: - uses: actions/setup-python@v5 with: python-version: ${{ matrix.python-version }} + - name: Start Redis + uses: supercharge/redis-github-action@1.7.0 - name: Setup Test Env run: | pip install coverage tox @@ -108,6 +110,8 @@ jobs: - uses: actions/setup-python@v5 with: python-version: ${{ matrix.python-version }} + - name: Start Redis + uses: supercharge/redis-github-action@1.7.0 - name: Setup Test Env run: | pip install coverage tox diff --git 
a/scripts/split-tox-gh-actions/split-tox-gh-actions.py b/scripts/split-tox-gh-actions/split-tox-gh-actions.py index a4e4038156..f0f689b139 100755 --- a/scripts/split-tox-gh-actions/split-tox-gh-actions.py +++ b/scripts/split-tox-gh-actions/split-tox-gh-actions.py @@ -35,6 +35,10 @@ "asyncpg", } +FRAMEWORKS_NEEDING_REDIS = { + "celery", +} + FRAMEWORKS_NEEDING_CLICKHOUSE = { "clickhouse_driver", } @@ -275,6 +279,7 @@ def render_template(group, frameworks, py_versions_pinned, py_versions_latest): "needs_aws_credentials": bool(set(frameworks) & FRAMEWORKS_NEEDING_AWS), "needs_clickhouse": bool(set(frameworks) & FRAMEWORKS_NEEDING_CLICKHOUSE), "needs_postgres": bool(set(frameworks) & FRAMEWORKS_NEEDING_POSTGRES), + "needs_redis": bool(set(frameworks) & FRAMEWORKS_NEEDING_REDIS), "needs_github_secrets": bool( set(frameworks) & FRAMEWORKS_NEEDING_GITHUB_SECRETS ), diff --git a/scripts/split-tox-gh-actions/templates/test_group.jinja b/scripts/split-tox-gh-actions/templates/test_group.jinja index 33da6fa59d..4d17717499 100644 --- a/scripts/split-tox-gh-actions/templates/test_group.jinja +++ b/scripts/split-tox-gh-actions/templates/test_group.jinja @@ -53,6 +53,11 @@ - uses: getsentry/action-clickhouse-in-ci@v1 {% endif %} + {% if needs_redis %} + - name: Start Redis + uses: supercharge/redis-github-action@1.7.0 + {% endif %} + - name: Setup Test Env run: | pip install coverage tox diff --git a/sentry_sdk/integrations/celery/__init__.py b/sentry_sdk/integrations/celery/__init__.py index 2b05871d70..d0908a039e 100644 --- a/sentry_sdk/integrations/celery/__init__.py +++ b/sentry_sdk/integrations/celery/__init__.py @@ -70,10 +70,9 @@ def __init__( self.monitor_beat_tasks = monitor_beat_tasks self.exclude_beat_tasks = exclude_beat_tasks - if monitor_beat_tasks: - _patch_beat_apply_entry() - _patch_redbeat_maybe_due() - _setup_celery_beat_signals() + _patch_beat_apply_entry() + _patch_redbeat_maybe_due() + _setup_celery_beat_signals(monitor_beat_tasks) @staticmethod def 
setup_once(): @@ -167,11 +166,11 @@ def _update_celery_task_headers(original_headers, span, monitor_beat_tasks): """ updated_headers = original_headers.copy() with capture_internal_exceptions(): - headers = {} - if span is not None: - headers = dict( - Scope.get_current_scope().iter_trace_propagation_headers(span=span) - ) + # if span is None (when the task was started by Celery Beat) + # this will return the trace headers from the scope. + headers = dict( + Scope.get_isolation_scope().iter_trace_propagation_headers(span=span) + ) if monitor_beat_tasks: headers.update( diff --git a/sentry_sdk/integrations/celery/beat.py b/sentry_sdk/integrations/celery/beat.py index 060045eb37..cedda5c467 100644 --- a/sentry_sdk/integrations/celery/beat.py +++ b/sentry_sdk/integrations/celery/beat.py @@ -113,133 +113,109 @@ def _get_monitor_config(celery_schedule, app, monitor_name): return monitor_config -def _patch_beat_apply_entry(): - # type: () -> None +def _apply_crons_data_to_schedule_entry(scheduler, schedule_entry, integration): + # type: (Any, Any, sentry_sdk.integrations.celery.CeleryIntegration) -> None """ - Makes sure that the Sentry Crons information is set in the Celery Beat task's - headers so that is is monitored with Sentry Crons. - - This is only called by Celery Beat. After apply_entry is called - Celery will call apply_async to put the task in the queue. + Add Sentry Crons information to the schedule_entry headers. 
""" - from sentry_sdk.integrations.celery import CeleryIntegration - - original_apply_entry = Scheduler.apply_entry - - def sentry_apply_entry(*args, **kwargs): - # type: (*Any, **Any) -> None - scheduler, schedule_entry = args - app = scheduler.app - - celery_schedule = schedule_entry.schedule - monitor_name = schedule_entry.name - - integration = sentry_sdk.get_client().get_integration(CeleryIntegration) - if integration is None: - return original_apply_entry(*args, **kwargs) - - if match_regex_list(monitor_name, integration.exclude_beat_tasks): - return original_apply_entry(*args, **kwargs) + if not integration.monitor_beat_tasks: + return - # Tasks started by Celery Beat start a new Trace - scope = Scope.get_isolation_scope() - scope.set_new_propagation_context() - scope._name = "celery-beat" + monitor_name = schedule_entry.name - monitor_config = _get_monitor_config(celery_schedule, app, monitor_name) + task_should_be_excluded = match_regex_list( + monitor_name, integration.exclude_beat_tasks + ) + if task_should_be_excluded: + return - is_supported_schedule = bool(monitor_config) - if is_supported_schedule: - headers = schedule_entry.options.pop("headers", {}) - headers.update( - { - "sentry-monitor-slug": monitor_name, - "sentry-monitor-config": monitor_config, - } - ) + celery_schedule = schedule_entry.schedule + app = scheduler.app - check_in_id = capture_checkin( - monitor_slug=monitor_name, - monitor_config=monitor_config, - status=MonitorStatus.IN_PROGRESS, - ) - headers.update({"sentry-monitor-check-in-id": check_in_id}) + monitor_config = _get_monitor_config(celery_schedule, app, monitor_name) - # Set the Sentry configuration in the options of the ScheduleEntry. - # Those will be picked up in `apply_async` and added to the headers. 
- schedule_entry.options["headers"] = headers + is_supported_schedule = bool(monitor_config) + if not is_supported_schedule: + return - return original_apply_entry(*args, **kwargs) + headers = schedule_entry.options.pop("headers", {}) + headers.update( + { + "sentry-monitor-slug": monitor_name, + "sentry-monitor-config": monitor_config, + } + ) - Scheduler.apply_entry = sentry_apply_entry + check_in_id = capture_checkin( + monitor_slug=monitor_name, + monitor_config=monitor_config, + status=MonitorStatus.IN_PROGRESS, + ) + headers.update({"sentry-monitor-check-in-id": check_in_id}) + # Set the Sentry configuration in the options of the ScheduleEntry. + # Those will be picked up in `apply_async` and added to the headers. + schedule_entry.options["headers"] = headers -def _patch_redbeat_maybe_due(): - # type: () -> None - if RedBeatScheduler is None: - return +def _wrap_beat_scheduler(original_function): + # type: (Callable[..., Any]) -> Callable[..., Any] + """ + Makes sure that: + - a new Sentry trace is started for each task started by Celery Beat and + it is propagated to the task. + - the Sentry Crons information is set in the Celery Beat task's + headers so that it is monitored with Sentry Crons. + + After the patched function is called, + Celery Beat will call apply_async to put the task in the queue. 
+ """ + # Patch only once + # Can't use __name__ here, because some of our tests mock original_apply_entry + already_patched = "sentry_patched_scheduler" in str(original_function) + if already_patched: + return original_function from sentry_sdk.integrations.celery import CeleryIntegration - original_maybe_due = RedBeatScheduler.maybe_due - - def sentry_maybe_due(*args, **kwargs): + def sentry_patched_scheduler(*args, **kwargs): # type: (*Any, **Any) -> None - scheduler, schedule_entry = args - app = scheduler.app - - celery_schedule = schedule_entry.schedule - monitor_name = schedule_entry.name - integration = sentry_sdk.get_client().get_integration(CeleryIntegration) if integration is None: - return original_maybe_due(*args, **kwargs) - - task_should_be_excluded = match_regex_list( - monitor_name, integration.exclude_beat_tasks - ) - if task_should_be_excluded: - return original_maybe_due(*args, **kwargs) + return original_function(*args, **kwargs) # Tasks started by Celery Beat start a new Trace scope = Scope.get_isolation_scope() scope.set_new_propagation_context() scope._name = "celery-beat" - monitor_config = _get_monitor_config(celery_schedule, app, monitor_name) - - is_supported_schedule = bool(monitor_config) - if is_supported_schedule: - headers = schedule_entry.options.pop("headers", {}) - headers.update( - { - "sentry-monitor-slug": monitor_name, - "sentry-monitor-config": monitor_config, - } - ) + scheduler, schedule_entry = args + _apply_crons_data_to_schedule_entry(scheduler, schedule_entry, integration) - check_in_id = capture_checkin( - monitor_slug=monitor_name, - monitor_config=monitor_config, - status=MonitorStatus.IN_PROGRESS, - ) - headers.update({"sentry-monitor-check-in-id": check_in_id}) + return original_function(*args, **kwargs) - # Set the Sentry configuration in the options of the ScheduleEntry. - # Those will be picked up in `apply_async` and added to the headers. 
- schedule_entry.options["headers"] = headers + return sentry_patched_scheduler - return original_maybe_due(*args, **kwargs) - RedBeatScheduler.maybe_due = sentry_maybe_due +def _patch_beat_apply_entry(): + # type: () -> None + Scheduler.apply_entry = _wrap_beat_scheduler(Scheduler.apply_entry) -def _setup_celery_beat_signals(): +def _patch_redbeat_maybe_due(): # type: () -> None - task_success.connect(crons_task_success) - task_failure.connect(crons_task_failure) - task_retry.connect(crons_task_retry) + if RedBeatScheduler is None: + return + + RedBeatScheduler.maybe_due = _wrap_beat_scheduler(RedBeatScheduler.maybe_due) + + +def _setup_celery_beat_signals(monitor_beat_tasks): + # type: (bool) -> None + if monitor_beat_tasks: + task_success.connect(crons_task_success) + task_failure.connect(crons_task_failure) + task_retry.connect(crons_task_retry) def crons_task_success(sender, **kwargs): diff --git a/sentry_sdk/scope.py b/sentry_sdk/scope.py index 516dcd1032..302701b236 100644 --- a/sentry_sdk/scope.py +++ b/sentry_sdk/scope.py @@ -604,9 +604,10 @@ def iter_headers(self): def iter_trace_propagation_headers(self, *args, **kwargs): # type: (Any, Any) -> Generator[Tuple[str, str], None, None] """ - Return HTTP headers which allow propagation of trace data. Data taken - from the span representing the request, if available, or the current - span on the scope if not. + Return HTTP headers which allow propagation of trace data. + + If a span is given, the trace data will be taken from the span. + If no span is given, the trace data is taken from the scope. 
""" client = Scope.get_client() if not client.options.get("propagate_traces"): diff --git a/tests/integrations/celery/integration_tests/__init__.py b/tests/integrations/celery/integration_tests/__init__.py new file mode 100644 index 0000000000..2dfe2ddcf7 --- /dev/null +++ b/tests/integrations/celery/integration_tests/__init__.py @@ -0,0 +1,58 @@ +import os +import signal +import tempfile +import threading +import time + +from celery.beat import Scheduler + +from sentry_sdk.utils import logger + + +class ImmediateScheduler(Scheduler): + """ + A custom scheduler that starts tasks immediately after starting Celery beat. + """ + + def setup_schedule(self): + super().setup_schedule() + for _, entry in self.schedule.items(): + self.apply_entry(entry) + + def tick(self): + # Override tick to prevent the normal schedule cycle + return 1 + + +def kill_beat(beat_pid_file, delay_seconds=1): + """ + Terminates Celery Beat after the given `delay_seconds`. + """ + logger.info("Starting Celery Beat killer...") + time.sleep(delay_seconds) + pid = int(open(beat_pid_file, "r").read()) + logger.info("Terminating Celery Beat...") + os.kill(pid, signal.SIGTERM) + + +def run_beat(celery_app, runtime_seconds=1, loglevel="warning", quiet=True): + """ + Run Celery Beat that immediately starts tasks. + The Celery Beat instance is automatically terminated after `runtime_seconds`. 
+ """ + logger.info("Starting Celery Beat...") + pid_file = os.path.join(tempfile.mkdtemp(), f"celery-beat-{os.getpid()}.pid") + + t = threading.Thread( + target=kill_beat, + args=(pid_file,), + kwargs={"delay_seconds": runtime_seconds}, + ) + t.start() + + beat_instance = celery_app.Beat( + loglevel=loglevel, + quiet=quiet, + pidfile=pid_file, + ) + beat_instance.run() diff --git a/tests/integrations/celery/integration_tests/test_celery_beat_cron_monitoring.py b/tests/integrations/celery/integration_tests/test_celery_beat_cron_monitoring.py new file mode 100644 index 0000000000..53f2f63215 --- /dev/null +++ b/tests/integrations/celery/integration_tests/test_celery_beat_cron_monitoring.py @@ -0,0 +1,153 @@ +import os +import pytest + +from celery.contrib.testing.worker import start_worker + +from sentry_sdk.utils import logger + +from tests.integrations.celery.integration_tests import run_beat + + +REDIS_SERVER = "redis://127.0.0.1:6379" +REDIS_DB = 15 + + +@pytest.fixture() +def celery_config(): + return { + "worker_concurrency": 1, + "broker_url": f"{REDIS_SERVER}/{REDIS_DB}", + "result_backend": f"{REDIS_SERVER}/{REDIS_DB}", + "beat_scheduler": "tests.integrations.celery.integration_tests:ImmediateScheduler", + "task_always_eager": False, + "task_create_missing_queues": True, + "task_default_queue": f"queue_{os.getpid()}", + } + + +@pytest.fixture +def celery_init(sentry_init, celery_config): + """ + Create a Sentry instrumented Celery app. 
+ """ + from celery import Celery + + from sentry_sdk.integrations.celery import CeleryIntegration + + def inner(propagate_traces=True, monitor_beat_tasks=False, **kwargs): + sentry_init( + integrations=[ + CeleryIntegration( + propagate_traces=propagate_traces, + monitor_beat_tasks=monitor_beat_tasks, + ) + ], + **kwargs, + ) + app = Celery("tasks") + app.conf.update(celery_config) + + return app + + return inner + + +@pytest.mark.forked +def test_explanation(celery_init, capture_envelopes): + """ + This is a dummy test for explaining how to test using Celery Beat + """ + + # First initialize a Celery app. + # You can give the options of CeleryIntegration + # and the options for `sentry_sdk.init` as keyword arguments. + # See the celery_init fixture for details. + app = celery_init( + monitor_beat_tasks=True, + ) + + # Capture envelopes. + envelopes = capture_envelopes() + + # Define the task you want to run + @app.task + def test_task(): + logger.info("Running test_task") + + # Add the task to the beat schedule + app.add_periodic_task(60.0, test_task.s(), name="success_from_beat") + + # Start a Celery worker + with start_worker(app, perform_ping_check=False): + # And start a Celery Beat instance + # This Celery Beat will start the task above immediately + # after start for the first time + # By default Celery Beat is terminated after 1 second. + # See `run_beat` function on how to change this. 
+ run_beat(app) + + # After the Celery Beat is terminated, you can check the envelopes + assert len(envelopes) >= 0 + + +@pytest.mark.forked +def test_beat_task_crons_success(celery_init, capture_envelopes): + app = celery_init( + monitor_beat_tasks=True, + ) + envelopes = capture_envelopes() + + @app.task + def test_task(): + logger.info("Running test_task") + + app.add_periodic_task(60.0, test_task.s(), name="success_from_beat") + + with start_worker(app, perform_ping_check=False): + run_beat(app) + + assert len(envelopes) == 2 + (envelop_in_progress, envelope_ok) = envelopes + + assert envelop_in_progress.items[0].headers["type"] == "check_in" + check_in = envelop_in_progress.items[0].payload.json + assert check_in["type"] == "check_in" + assert check_in["monitor_slug"] == "success_from_beat" + assert check_in["status"] == "in_progress" + + assert envelope_ok.items[0].headers["type"] == "check_in" + check_in = envelope_ok.items[0].payload.json + assert check_in["type"] == "check_in" + assert check_in["monitor_slug"] == "success_from_beat" + assert check_in["status"] == "ok" + + +@pytest.mark.forked +def test_beat_task_crons_error(celery_init, capture_envelopes): + app = celery_init( + monitor_beat_tasks=True, + ) + envelopes = capture_envelopes() + + @app.task + def test_task(): + logger.info("Running test_task") + 1 / 0 + + app.add_periodic_task(60.0, test_task.s(), name="failure_from_beat") + + with start_worker(app, perform_ping_check=False): + run_beat(app) + + envelop_in_progress = envelopes[0] + envelope_error = envelopes[-1] + + check_in = envelop_in_progress.items[0].payload.json + assert check_in["type"] == "check_in" + assert check_in["monitor_slug"] == "failure_from_beat" + assert check_in["status"] == "in_progress" + + check_in = envelope_error.items[0].payload.json + assert check_in["type"] == "check_in" + assert check_in["monitor_slug"] == "failure_from_beat" + assert check_in["status"] == "error" diff --git 
a/tests/integrations/celery/test_celery.py b/tests/integrations/celery/test_celery.py index c5311a9d62..ae5647b81d 100644 --- a/tests/integrations/celery/test_celery.py +++ b/tests/integrations/celery/test_celery.py @@ -26,9 +26,19 @@ def inner(signal, f): @pytest.fixture def init_celery(sentry_init, request): - def inner(propagate_traces=True, backend="always_eager", **kwargs): + def inner( + propagate_traces=True, + backend="always_eager", + monitor_beat_tasks=False, + **kwargs, + ): sentry_init( - integrations=[CeleryIntegration(propagate_traces=propagate_traces)], + integrations=[ + CeleryIntegration( + propagate_traces=propagate_traces, + monitor_beat_tasks=monitor_beat_tasks, + ) + ], **kwargs, ) celery = Celery(__name__) diff --git a/tests/integrations/celery/test_update_celery_task_headers.py b/tests/integrations/celery/test_update_celery_task_headers.py index d1ab7ef0c1..1680e54d80 100644 --- a/tests/integrations/celery/test_update_celery_task_headers.py +++ b/tests/integrations/celery/test_update_celery_task_headers.py @@ -1,4 +1,5 @@ from copy import copy +import itertools import pytest from unittest import mock @@ -23,23 +24,18 @@ def test_monitor_beat_tasks(monitor_beat_tasks): headers = {} span = None - updated_headers = _update_celery_task_headers(headers, span, monitor_beat_tasks) + outgoing_headers = _update_celery_task_headers(headers, span, monitor_beat_tasks) assert headers == {} # left unchanged if monitor_beat_tasks: - assert updated_headers == { - "headers": { - "sentry-monitor-start-timestamp-s": mock.ANY, - "sentry-task-enqueued-time": mock.ANY, - }, - "sentry-monitor-start-timestamp-s": mock.ANY, - "sentry-task-enqueued-time": mock.ANY, - } + assert outgoing_headers["sentry-monitor-start-timestamp-s"] == mock.ANY + assert ( + outgoing_headers["headers"]["sentry-monitor-start-timestamp-s"] == mock.ANY + ) else: - assert updated_headers == { - "sentry-task-enqueued-time": mock.ANY, - } + assert "sentry-monitor-start-timestamp-s" not in 
outgoing_headers + assert "sentry-monitor-start-timestamp-s" not in outgoing_headers["headers"] @pytest.mark.parametrize("monitor_beat_tasks", [True, False, None, "", "bla", 1, 0]) @@ -51,37 +47,45 @@ def test_monitor_beat_tasks_with_headers(monitor_beat_tasks): } span = None - updated_headers = _update_celery_task_headers(headers, span, monitor_beat_tasks) + outgoing_headers = _update_celery_task_headers(headers, span, monitor_beat_tasks) + + assert headers == { + "blub": "foo", + "sentry-something": "bar", + "sentry-task-enqueued-time": mock.ANY, + } # left unchanged if monitor_beat_tasks: - assert updated_headers == { - "blub": "foo", - "sentry-something": "bar", - "headers": { - "sentry-monitor-start-timestamp-s": mock.ANY, - "sentry-something": "bar", - "sentry-task-enqueued-time": mock.ANY, - }, - "sentry-monitor-start-timestamp-s": mock.ANY, - "sentry-task-enqueued-time": mock.ANY, - } + assert outgoing_headers["blub"] == "foo" + assert outgoing_headers["sentry-something"] == "bar" + assert outgoing_headers["sentry-monitor-start-timestamp-s"] == mock.ANY + assert outgoing_headers["headers"]["sentry-something"] == "bar" + assert ( + outgoing_headers["headers"]["sentry-monitor-start-timestamp-s"] == mock.ANY + ) else: - assert updated_headers == headers + assert outgoing_headers["blub"] == "foo" + assert outgoing_headers["sentry-something"] == "bar" + assert "sentry-monitor-start-timestamp-s" not in outgoing_headers + assert "sentry-monitor-start-timestamp-s" not in outgoing_headers["headers"] def test_span_with_transaction(sentry_init): sentry_init(enable_tracing=True) headers = {} + monitor_beat_tasks = False with sentry_sdk.start_transaction(name="test_transaction") as transaction: with sentry_sdk.start_span(op="test_span") as span: - updated_headers = _update_celery_task_headers(headers, span, False) + outgoing_headers = _update_celery_task_headers( + headers, span, monitor_beat_tasks + ) - assert updated_headers["sentry-trace"] == span.to_traceparent() - 
assert updated_headers["headers"]["sentry-trace"] == span.to_traceparent() - assert updated_headers["baggage"] == transaction.get_baggage().serialize() + assert outgoing_headers["sentry-trace"] == span.to_traceparent() + assert outgoing_headers["headers"]["sentry-trace"] == span.to_traceparent() + assert outgoing_headers["baggage"] == transaction.get_baggage().serialize() assert ( - updated_headers["headers"]["baggage"] + outgoing_headers["headers"]["baggage"] == transaction.get_baggage().serialize() ) @@ -95,10 +99,10 @@ def test_span_with_transaction_custom_headers(sentry_init): with sentry_sdk.start_transaction(name="test_transaction") as transaction: with sentry_sdk.start_span(op="test_span") as span: - updated_headers = _update_celery_task_headers(headers, span, False) + outgoing_headers = _update_celery_task_headers(headers, span, False) - assert updated_headers["sentry-trace"] == span.to_traceparent() - assert updated_headers["headers"]["sentry-trace"] == span.to_traceparent() + assert outgoing_headers["sentry-trace"] == span.to_traceparent() + assert outgoing_headers["headers"]["sentry-trace"] == span.to_traceparent() incoming_baggage = Baggage.from_incoming_header(headers["baggage"]) combined_baggage = copy(transaction.get_baggage()) @@ -113,9 +117,112 @@ def test_span_with_transaction_custom_headers(sentry_init): if x is not None and x != "" ] ) - assert updated_headers["baggage"] == combined_baggage.serialize( + assert outgoing_headers["baggage"] == combined_baggage.serialize( include_third_party=True ) - assert updated_headers["headers"]["baggage"] == combined_baggage.serialize( + assert outgoing_headers["headers"]["baggage"] == combined_baggage.serialize( include_third_party=True ) + + +@pytest.mark.parametrize("monitor_beat_tasks", [True, False]) +def test_celery_trace_propagation_default(sentry_init, monitor_beat_tasks): + """ + The celery integration does not check the traces_sample_rate. 
+ By default traces_sample_rate is None which means "do not propagate traces". + But the celery integration does not check this value. + The Celery integration has its own mechanism to propagate traces: + https://docs.sentry.io/platforms/python/integrations/celery/#distributed-traces + """ + sentry_init() + + headers = {} + span = None + + scope = sentry_sdk.Scope.get_isolation_scope() + + outgoing_headers = _update_celery_task_headers(headers, span, monitor_beat_tasks) + + assert outgoing_headers["sentry-trace"] == scope.get_traceparent() + assert outgoing_headers["headers"]["sentry-trace"] == scope.get_traceparent() + assert outgoing_headers["baggage"] == scope.get_baggage().serialize() + assert outgoing_headers["headers"]["baggage"] == scope.get_baggage().serialize() + + if monitor_beat_tasks: + assert "sentry-monitor-start-timestamp-s" in outgoing_headers + assert "sentry-monitor-start-timestamp-s" in outgoing_headers["headers"] + else: + assert "sentry-monitor-start-timestamp-s" not in outgoing_headers + assert "sentry-monitor-start-timestamp-s" not in outgoing_headers["headers"] + + +@pytest.mark.parametrize( + "traces_sample_rate,monitor_beat_tasks", + list(itertools.product([None, 0, 0.0, 0.5, 1.0, 1, 2], [True, False])), +) +def test_celery_trace_propagation_traces_sample_rate( + sentry_init, traces_sample_rate, monitor_beat_tasks +): + """ + The celery integration does not check the traces_sample_rate. + By default traces_sample_rate is None which means "do not propagate traces". + But the celery integration does not check this value. 
+ The Celery integration has its own mechanism to propagate traces: + https://docs.sentry.io/platforms/python/integrations/celery/#distributed-traces + """ + sentry_init(traces_sample_rate=traces_sample_rate) + + headers = {} + span = None + + scope = sentry_sdk.Scope.get_isolation_scope() + + outgoing_headers = _update_celery_task_headers(headers, span, monitor_beat_tasks) + + assert outgoing_headers["sentry-trace"] == scope.get_traceparent() + assert outgoing_headers["headers"]["sentry-trace"] == scope.get_traceparent() + assert outgoing_headers["baggage"] == scope.get_baggage().serialize() + assert outgoing_headers["headers"]["baggage"] == scope.get_baggage().serialize() + + if monitor_beat_tasks: + assert "sentry-monitor-start-timestamp-s" in outgoing_headers + assert "sentry-monitor-start-timestamp-s" in outgoing_headers["headers"] + else: + assert "sentry-monitor-start-timestamp-s" not in outgoing_headers + assert "sentry-monitor-start-timestamp-s" not in outgoing_headers["headers"] + + +@pytest.mark.parametrize( + "enable_tracing,monitor_beat_tasks", + list(itertools.product([None, True, False], [True, False])), +) +def test_celery_trace_propagation_enable_tracing( + sentry_init, enable_tracing, monitor_beat_tasks +): + """ + The celery integration does not check the traces_sample_rate. + By default traces_sample_rate is None which means "do not propagate traces". + But the celery integration does not check this value. 
+ The Celery integration has its own mechanism to propagate traces: + https://docs.sentry.io/platforms/python/integrations/celery/#distributed-traces + """ + sentry_init(enable_tracing=enable_tracing) + + headers = {} + span = None + + scope = sentry_sdk.Scope.get_isolation_scope() + + outgoing_headers = _update_celery_task_headers(headers, span, monitor_beat_tasks) + + assert outgoing_headers["sentry-trace"] == scope.get_traceparent() + assert outgoing_headers["headers"]["sentry-trace"] == scope.get_traceparent() + assert outgoing_headers["baggage"] == scope.get_baggage().serialize() + assert outgoing_headers["headers"]["baggage"] == scope.get_baggage().serialize() + + if monitor_beat_tasks: + assert "sentry-monitor-start-timestamp-s" in outgoing_headers + assert "sentry-monitor-start-timestamp-s" in outgoing_headers["headers"] + else: + assert "sentry-monitor-start-timestamp-s" not in outgoing_headers + assert "sentry-monitor-start-timestamp-s" not in outgoing_headers["headers"]