Skip to content

Commit

Permalink
New branch with changes, because the refactoring made using the old b… (
Browse files Browse the repository at this point in the history
#3176)

After refactoring the Celery Beat integration there was a regression where the ok/error check-ins for crons were not sent. This fixes the problem and adds integration tests so that this regression does not happen again.
  • Loading branch information
antonpirker authored Jun 18, 2024
1 parent 1f70e05 commit c883fa8
Show file tree
Hide file tree
Showing 8 changed files with 250 additions and 15 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/test-integrations-data-processing.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ jobs:
- uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
- name: Start Redis
uses: supercharge/redis-github-action@1.7.0
- name: Setup Test Env
run: |
pip install coverage tox
Expand Down Expand Up @@ -108,6 +110,8 @@ jobs:
- uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
- name: Start Redis
uses: supercharge/redis-github-action@1.7.0
- name: Setup Test Env
run: |
pip install coverage tox
Expand Down
5 changes: 5 additions & 0 deletions scripts/split-tox-gh-actions/split-tox-gh-actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@
"asyncpg",
}

FRAMEWORKS_NEEDING_REDIS = {
"celery",
}

FRAMEWORKS_NEEDING_CLICKHOUSE = {
"clickhouse_driver",
}
Expand Down Expand Up @@ -275,6 +279,7 @@ def render_template(group, frameworks, py_versions_pinned, py_versions_latest):
"needs_aws_credentials": bool(set(frameworks) & FRAMEWORKS_NEEDING_AWS),
"needs_clickhouse": bool(set(frameworks) & FRAMEWORKS_NEEDING_CLICKHOUSE),
"needs_postgres": bool(set(frameworks) & FRAMEWORKS_NEEDING_POSTGRES),
"needs_redis": bool(set(frameworks) & FRAMEWORKS_NEEDING_REDIS),
"needs_github_secrets": bool(
set(frameworks) & FRAMEWORKS_NEEDING_GITHUB_SECRETS
),
Expand Down
5 changes: 5 additions & 0 deletions scripts/split-tox-gh-actions/templates/test_group.jinja
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@
- uses: getsentry/action-clickhouse-in-ci@v1
{% endif %}

{% if needs_redis %}
- name: Start Redis
uses: supercharge/redis-github-action@1.7.0
{% endif %}

- name: Setup Test Env
run: |
pip install coverage tox
Expand Down
2 changes: 1 addition & 1 deletion sentry_sdk/integrations/celery/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def __init__(

_patch_beat_apply_entry()
_patch_redbeat_maybe_due()
_setup_celery_beat_signals()
_setup_celery_beat_signals(monitor_beat_tasks)

@staticmethod
def setup_once():
Expand Down
24 changes: 12 additions & 12 deletions sentry_sdk/integrations/celery/beat.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from functools import wraps
import sentry_sdk
from sentry_sdk.crons import capture_checkin, MonitorStatus
from sentry_sdk.integrations import DidNotEnable
Expand Down Expand Up @@ -159,7 +158,7 @@ def _apply_crons_data_to_schedule_entry(scheduler, schedule_entry, integration):
schedule_entry.options["headers"] = headers


def _wrap_beat_scheduler(f):
def _wrap_beat_scheduler(original_function):
# type: (Callable[..., Any]) -> Callable[..., Any]
"""
Makes sure that:
Expand All @@ -171,14 +170,19 @@ def _wrap_beat_scheduler(f):
After the patched function is called,
Celery Beat will call apply_async to put the task in the queue.
"""
# Patch only once
# Can't use __name__ here, because some of our tests mock original_apply_entry
already_patched = "sentry_patched_scheduler" in str(original_function)
if already_patched:
return original_function

from sentry_sdk.integrations.celery import CeleryIntegration

@wraps(f)
def sentry_patched_scheduler(*args, **kwargs):
# type: (*Any, **Any) -> None
integration = sentry_sdk.get_client().get_integration(CeleryIntegration)
if integration is None:
return f(*args, **kwargs)
return original_function(*args, **kwargs)

# Tasks started by Celery Beat start a new Trace
scope = Scope.get_isolation_scope()
Expand All @@ -188,7 +192,7 @@ def sentry_patched_scheduler(*args, **kwargs):
scheduler, schedule_entry = args
_apply_crons_data_to_schedule_entry(scheduler, schedule_entry, integration)

return f(*args, **kwargs)
return original_function(*args, **kwargs)

return sentry_patched_scheduler

Expand All @@ -206,13 +210,9 @@ def _patch_redbeat_maybe_due():
RedBeatScheduler.maybe_due = _wrap_beat_scheduler(RedBeatScheduler.maybe_due)


def _setup_celery_beat_signals():
# type: () -> None
from sentry_sdk.integrations.celery import CeleryIntegration

integration = sentry_sdk.get_client().get_integration(CeleryIntegration)

if integration is not None and integration.monitor_beat_tasks:
def _setup_celery_beat_signals(monitor_beat_tasks):
# type: (bool) -> None
if monitor_beat_tasks:
task_success.connect(crons_task_success)
task_failure.connect(crons_task_failure)
task_retry.connect(crons_task_retry)
Expand Down
58 changes: 58 additions & 0 deletions tests/integrations/celery/integration_tests/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import os
import signal
import tempfile
import threading
import time

from celery.beat import Scheduler

from sentry_sdk.utils import logger


class ImmediateScheduler(Scheduler):
    """
    Celery Beat scheduler that applies every schedule entry once during setup.

    `setup_schedule` runs the parent setup and then calls `apply_entry` for
    each entry in the schedule; `tick` is overridden so the regular
    scheduling cycle of the base class is never executed.
    """

    def setup_schedule(self):
        super().setup_schedule()
        # Fire each configured entry right away instead of waiting for it
        # to become due.
        for schedule_entry in self.schedule.values():
            self.apply_entry(schedule_entry)

    def tick(self):
        # Bypass the base class scheduling cycle entirely.
        # NOTE(review): the constant is presumably the number of seconds
        # until the next tick -- confirm against celery.beat.Scheduler.tick.
        next_tick = 1
        return next_tick


def kill_beat(beat_pid_file, delay_seconds=1):
    """
    Terminates Celery Beat after the given `delay_seconds`.

    Sleeps for `delay_seconds`, reads the process id stored in
    `beat_pid_file` and sends SIGTERM to that process.

    Raises whatever `open`, `int` or `os.kill` raise when the pid file is
    missing, does not contain an integer, or the process cannot be signalled.
    """
    logger.info("Starting Celery Beat killer...")
    time.sleep(delay_seconds)

    # Read the pid through a context manager so the file handle is closed
    # deterministically instead of being left to the garbage collector.
    with open(beat_pid_file, "r") as pid_file:
        pid = int(pid_file.read())

    logger.info("Terminating Celery Beat...")
    os.kill(pid, signal.SIGTERM)


def run_beat(celery_app, runtime_seconds=1, loglevel="warning", quiet=True):
    """
    Start a Celery Beat instance for `celery_app` and block until it returns.

    Before Beat is started, a background thread running `kill_beat` is
    launched. That thread sends SIGTERM to the process recorded in the Beat
    pid file once `runtime_seconds` have passed, so the Beat instance is
    terminated automatically.

    `loglevel` and `quiet` are forwarded unchanged to `celery_app.Beat`.
    """
    logger.info("Starting Celery Beat...")

    # The pid file lives in a freshly created temporary directory and is
    # shared between the Beat instance and the killer thread.
    pid_directory = tempfile.mkdtemp()
    pid_file = os.path.join(pid_directory, f"celery-beat-{os.getpid()}.pid")

    # Start the killer first: `run()` below blocks the calling thread.
    killer_thread = threading.Thread(
        target=kill_beat,
        args=(pid_file, runtime_seconds),
    )
    killer_thread.start()

    celery_app.Beat(loglevel=loglevel, quiet=quiet, pidfile=pid_file).run()
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
import os
import pytest

from celery.contrib.testing.worker import start_worker

from sentry_sdk.utils import logger

from tests.integrations.celery.integration_tests import run_beat


REDIS_SERVER = "redis://127.0.0.1:6379"
REDIS_DB = 15


@pytest.fixture()
def celery_config():
    """
    Celery settings used by the Celery Beat integration tests.

    Broker and result backend both point at the same Redis database, Beat
    uses the `ImmediateScheduler` from the integration test package, and the
    default queue name contains the current process id.
    """
    # Broker and result backend share one Redis URL.
    redis_url = f"{REDIS_SERVER}/{REDIS_DB}"
    default_queue = f"queue_{os.getpid()}"

    return {
        "worker_concurrency": 1,
        "broker_url": redis_url,
        "result_backend": redis_url,
        "beat_scheduler": "tests.integrations.celery.integration_tests:ImmediateScheduler",
        "task_always_eager": False,
        "task_create_missing_queues": True,
        "task_default_queue": default_queue,
    }


@pytest.fixture
def celery_init(sentry_init, celery_config):
    """
    Provide a factory that creates a Sentry instrumented Celery app.

    The returned callable accepts the `CeleryIntegration` options
    `propagate_traces` and `monitor_beat_tasks`; any further keyword
    arguments are passed on to `sentry_init`. It returns a Celery app named
    "tasks" that is configured with the `celery_config` fixture.
    """
    from celery import Celery

    from sentry_sdk.integrations.celery import CeleryIntegration

    def _create_app(propagate_traces=True, monitor_beat_tasks=False, **kwargs):
        # Initialize Sentry first so the integration is set up before the
        # Celery app is created.
        celery_integration = CeleryIntegration(
            propagate_traces=propagate_traces,
            monitor_beat_tasks=monitor_beat_tasks,
        )
        sentry_init(integrations=[celery_integration], **kwargs)

        celery_app = Celery("tasks")
        celery_app.conf.update(celery_config)
        return celery_app

    return _create_app


@pytest.mark.forked
def test_explanation(celery_init, capture_envelopes):
    """
    This is a dummy test for explaining how to test using Celery Beat.

    It walks through the steps the other tests in this module follow:
    create the app, capture envelopes, register a periodic task, then run a
    worker together with Celery Beat.
    """

    # First initialize a Celery app.
    # You can give the options of `CeleryIntegration`
    # and the options for `sentry_sdk.init` as keyword arguments.
    # See the celery_init fixture for details.
    app = celery_init(
        monitor_beat_tasks=True,
    )

    # Capture envelopes.
    envelopes = capture_envelopes()

    # Define the task you want to run
    @app.task
    def test_task():
        logger.info("Running test_task")

    # Add the task to the beat schedule
    app.add_periodic_task(60.0, test_task.s(), name="success_from_beat")

    # Start a Celery worker
    with start_worker(app, perform_ping_check=False):
        # And start a Celery Beat instance
        # This Celery Beat will start the task above immediately
        # after start for the first time
        # By default Celery Beat is terminated after 1 second.
        # See `run_beat` function on how to change this.
        run_beat(app)

    # After the Celery Beat is terminated, you can check the envelopes.
    # NOTE: this assertion is always true; it is only a placeholder showing
    # where real checks on the captured envelopes belong.
    assert len(envelopes) >= 0


@pytest.mark.forked
def test_beat_task_crons_success(celery_init, capture_envelopes):
    """
    A beat-scheduled task that finishes normally must produce exactly two
    check-in envelopes for its monitor: "in_progress" followed by "ok".
    """
    app = celery_init(monitor_beat_tasks=True)
    envelopes = capture_envelopes()

    @app.task
    def test_task():
        logger.info("Running test_task")

    app.add_periodic_task(60.0, test_task.s(), name="success_from_beat")

    with start_worker(app, perform_ping_check=False):
        run_beat(app)

    assert len(envelopes) == 2

    # The envelopes arrive in order: first the in-progress check-in, then
    # the final status check-in.
    expected_statuses = ("in_progress", "ok")
    for envelope, expected_status in zip(envelopes, expected_statuses):
        first_item = envelope.items[0]
        assert first_item.headers["type"] == "check_in"

        check_in = first_item.payload.json
        assert check_in["type"] == "check_in"
        assert check_in["monitor_slug"] == "success_from_beat"
        assert check_in["status"] == expected_status


@pytest.mark.forked
def test_beat_task_crons_error(celery_init, capture_envelopes):
    """
    A beat-scheduled task that raises must produce an "in_progress" check-in
    as the first captured envelope and an "error" check-in as the last one.
    """
    app = celery_init(monitor_beat_tasks=True)
    envelopes = capture_envelopes()

    @app.task
    def test_task():
        logger.info("Running test_task")
        1 / 0

    app.add_periodic_task(60.0, test_task.s(), name="failure_from_beat")

    with start_worker(app, perform_ping_check=False):
        run_beat(app)

    # Only the first and the last envelope are inspected; other envelopes
    # may have been captured in between.
    checked_envelopes = (
        (envelopes[0], "in_progress"),
        (envelopes[-1], "error"),
    )
    for envelope, expected_status in checked_envelopes:
        check_in = envelope.items[0].payload.json
        assert check_in["type"] == "check_in"
        assert check_in["monitor_slug"] == "failure_from_beat"
        assert check_in["status"] == expected_status
14 changes: 12 additions & 2 deletions tests/integrations/celery/test_celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,19 @@ def inner(signal, f):

@pytest.fixture
def init_celery(sentry_init, request):
def inner(propagate_traces=True, backend="always_eager", **kwargs):
def inner(
propagate_traces=True,
backend="always_eager",
monitor_beat_tasks=False,
**kwargs,
):
sentry_init(
integrations=[CeleryIntegration(propagate_traces=propagate_traces)],
integrations=[
CeleryIntegration(
propagate_traces=propagate_traces,
monitor_beat_tasks=monitor_beat_tasks,
)
],
**kwargs,
)
celery = Celery(__name__)
Expand Down

0 comments on commit c883fa8

Please sign in to comment.