Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New branch with changes, because the refactoring made using the old b… #3176

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/workflows/test-integrations-data-processing.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ jobs:
- uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
- name: Start Redis
uses: supercharge/[email protected]
- name: Setup Test Env
run: |
pip install coverage tox
Expand Down Expand Up @@ -108,6 +110,8 @@ jobs:
- uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
- name: Start Redis
uses: supercharge/[email protected]
- name: Setup Test Env
run: |
pip install coverage tox
Expand Down
5 changes: 5 additions & 0 deletions scripts/split-tox-gh-actions/split-tox-gh-actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@
"asyncpg",
}

FRAMEWORKS_NEEDING_REDIS = {
"celery",
}

FRAMEWORKS_NEEDING_CLICKHOUSE = {
"clickhouse_driver",
}
Expand Down Expand Up @@ -275,6 +279,7 @@ def render_template(group, frameworks, py_versions_pinned, py_versions_latest):
"needs_aws_credentials": bool(set(frameworks) & FRAMEWORKS_NEEDING_AWS),
"needs_clickhouse": bool(set(frameworks) & FRAMEWORKS_NEEDING_CLICKHOUSE),
"needs_postgres": bool(set(frameworks) & FRAMEWORKS_NEEDING_POSTGRES),
"needs_redis": bool(set(frameworks) & FRAMEWORKS_NEEDING_REDIS),
"needs_github_secrets": bool(
set(frameworks) & FRAMEWORKS_NEEDING_GITHUB_SECRETS
),
Expand Down
5 changes: 5 additions & 0 deletions scripts/split-tox-gh-actions/templates/test_group.jinja
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@
- uses: getsentry/action-clickhouse-in-ci@v1
{% endif %}

{% if needs_redis %}
- name: Start Redis
uses: supercharge/[email protected]
{% endif %}

- name: Setup Test Env
run: |
pip install coverage tox
Expand Down
2 changes: 1 addition & 1 deletion sentry_sdk/integrations/celery/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def __init__(

_patch_beat_apply_entry()
_patch_redbeat_maybe_due()
_setup_celery_beat_signals()
_setup_celery_beat_signals(monitor_beat_tasks)

@staticmethod
def setup_once():
Expand Down
24 changes: 12 additions & 12 deletions sentry_sdk/integrations/celery/beat.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from functools import wraps
import sentry_sdk
from sentry_sdk.crons import capture_checkin, MonitorStatus
from sentry_sdk.integrations import DidNotEnable
Expand Down Expand Up @@ -159,7 +158,7 @@ def _apply_crons_data_to_schedule_entry(scheduler, schedule_entry, integration):
schedule_entry.options["headers"] = headers


def _wrap_beat_scheduler(f):
def _wrap_beat_scheduler(original_function):
# type: (Callable[..., Any]) -> Callable[..., Any]
"""
Makes sure that:
Expand All @@ -171,14 +170,19 @@ def _wrap_beat_scheduler(f):
After the patched function is called,
Celery Beat will call apply_async to put the task in the queue.
"""
# Patch only once
# Can't use __name__ here, because some of our tests mock original_apply_entry
already_patched = "sentry_patched_scheduler" in str(original_function)
if already_patched:
return original_function

from sentry_sdk.integrations.celery import CeleryIntegration

@wraps(f)
def sentry_patched_scheduler(*args, **kwargs):
# type: (*Any, **Any) -> None
integration = sentry_sdk.get_client().get_integration(CeleryIntegration)
if integration is None:
return f(*args, **kwargs)
return original_function(*args, **kwargs)

# Tasks started by Celery Beat start a new Trace
scope = Scope.get_isolation_scope()
Expand All @@ -188,7 +192,7 @@ def sentry_patched_scheduler(*args, **kwargs):
scheduler, schedule_entry = args
_apply_crons_data_to_schedule_entry(scheduler, schedule_entry, integration)

return f(*args, **kwargs)
return original_function(*args, **kwargs)

return sentry_patched_scheduler

Expand All @@ -206,13 +210,9 @@ def _patch_redbeat_maybe_due():
RedBeatScheduler.maybe_due = _wrap_beat_scheduler(RedBeatScheduler.maybe_due)


def _setup_celery_beat_signals():
# type: () -> None
from sentry_sdk.integrations.celery import CeleryIntegration

integration = sentry_sdk.get_client().get_integration(CeleryIntegration)

if integration is not None and integration.monitor_beat_tasks:
def _setup_celery_beat_signals(monitor_beat_tasks):
# type: (bool) -> None
if monitor_beat_tasks:
task_success.connect(crons_task_success)
task_failure.connect(crons_task_failure)
task_retry.connect(crons_task_retry)
Expand Down
58 changes: 58 additions & 0 deletions tests/integrations/celery/integration_tests/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import os
import signal
import tempfile
import threading
import time

from celery.beat import Scheduler

from sentry_sdk.utils import logger


class ImmediateScheduler(Scheduler):
"""
A custom scheduler that starts tasks immediately after starting Celery beat.
"""

def setup_schedule(self):
super().setup_schedule()
for _, entry in self.schedule.items():
self.apply_entry(entry)

def tick(self):
# Override tick to prevent the normal schedule cycle
return 1


def kill_beat(beat_pid_file, delay_seconds=1):
"""
Terminates Celery Beat after the given `delay_seconds`.
"""
logger.info("Starting Celery Beat killer...")
time.sleep(delay_seconds)
pid = int(open(beat_pid_file, "r").read())
logger.info("Terminating Celery Beat...")
os.kill(pid, signal.SIGTERM)


def run_beat(celery_app, runtime_seconds=1, loglevel="warning", quiet=True):
"""
Run Celery Beat that immediately starts tasks.
The Celery Beat instance is automatically terminated after `runtime_seconds`.
"""
logger.info("Starting Celery Beat...")
pid_file = os.path.join(tempfile.mkdtemp(), f"celery-beat-{os.getpid()}.pid")

t = threading.Thread(
target=kill_beat,
args=(pid_file,),
kwargs={"delay_seconds": runtime_seconds},
)
t.start()

beat_instance = celery_app.Beat(
loglevel=loglevel,
quiet=quiet,
pidfile=pid_file,
)
beat_instance.run()
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
import os
import pytest

from celery.contrib.testing.worker import start_worker

from sentry_sdk.utils import logger

from tests.integrations.celery.integration_tests import run_beat


REDIS_SERVER = "redis://127.0.0.1:6379"
REDIS_DB = 15


@pytest.fixture()
def celery_config():
return {
"worker_concurrency": 1,
"broker_url": f"{REDIS_SERVER}/{REDIS_DB}",
"result_backend": f"{REDIS_SERVER}/{REDIS_DB}",
"beat_scheduler": "tests.integrations.celery.integration_tests:ImmediateScheduler",
"task_always_eager": False,
"task_create_missing_queues": True,
"task_default_queue": f"queue_{os.getpid()}",
}


@pytest.fixture
def celery_init(sentry_init, celery_config):
"""
Create a Sentry instrumented Celery app.
"""
from celery import Celery

from sentry_sdk.integrations.celery import CeleryIntegration

def inner(propagate_traces=True, monitor_beat_tasks=False, **kwargs):
sentry_init(
integrations=[
CeleryIntegration(
propagate_traces=propagate_traces,
monitor_beat_tasks=monitor_beat_tasks,
)
],
**kwargs,
)
app = Celery("tasks")
app.conf.update(celery_config)

return app

return inner


@pytest.mark.forked
def test_explanation(celery_init, capture_envelopes):
"""
This is a dummy test for explaining how to test using Celery Beat
"""

# First initialize a Celery app.
# You can give the options of CeleryIntegrations
# and the options for `sentry_dks.init` as keyword arguments.
# See the celery_init fixture for details.
app = celery_init(
monitor_beat_tasks=True,
)

# Capture envelopes.
envelopes = capture_envelopes()

# Define the task you want to run
@app.task
def test_task():
logger.info("Running test_task")

# Add the task to the beat schedule
app.add_periodic_task(60.0, test_task.s(), name="success_from_beat")

# Start a Celery worker
with start_worker(app, perform_ping_check=False):
# And start a Celery Beat instance
# This Celery Beat will start the task above immediately
# after start for the first time
# By default Celery Beat is terminated after 1 second.
# See `run_beat` function on how to change this.
run_beat(app)

# After the Celery Beat is terminated, you can check the envelopes
assert len(envelopes) >= 0


@pytest.mark.forked
def test_beat_task_crons_success(celery_init, capture_envelopes):
app = celery_init(
monitor_beat_tasks=True,
)
envelopes = capture_envelopes()

@app.task
def test_task():
logger.info("Running test_task")

app.add_periodic_task(60.0, test_task.s(), name="success_from_beat")

with start_worker(app, perform_ping_check=False):
run_beat(app)

assert len(envelopes) == 2
(envelop_in_progress, envelope_ok) = envelopes

assert envelop_in_progress.items[0].headers["type"] == "check_in"
check_in = envelop_in_progress.items[0].payload.json
assert check_in["type"] == "check_in"
assert check_in["monitor_slug"] == "success_from_beat"
assert check_in["status"] == "in_progress"

assert envelope_ok.items[0].headers["type"] == "check_in"
check_in = envelope_ok.items[0].payload.json
assert check_in["type"] == "check_in"
assert check_in["monitor_slug"] == "success_from_beat"
assert check_in["status"] == "ok"


@pytest.mark.forked
def test_beat_task_crons_error(celery_init, capture_envelopes):
app = celery_init(
monitor_beat_tasks=True,
)
envelopes = capture_envelopes()

@app.task
def test_task():
logger.info("Running test_task")
1 / 0

app.add_periodic_task(60.0, test_task.s(), name="failure_from_beat")

with start_worker(app, perform_ping_check=False):
run_beat(app)

envelop_in_progress = envelopes[0]
envelope_error = envelopes[-1]

check_in = envelop_in_progress.items[0].payload.json
assert check_in["type"] == "check_in"
assert check_in["monitor_slug"] == "failure_from_beat"
assert check_in["status"] == "in_progress"

check_in = envelope_error.items[0].payload.json
assert check_in["type"] == "check_in"
assert check_in["monitor_slug"] == "failure_from_beat"
assert check_in["status"] == "error"
14 changes: 12 additions & 2 deletions tests/integrations/celery/test_celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,19 @@ def inner(signal, f):

@pytest.fixture
def init_celery(sentry_init, request):
def inner(propagate_traces=True, backend="always_eager", **kwargs):
def inner(
propagate_traces=True,
backend="always_eager",
monitor_beat_tasks=False,
**kwargs,
):
sentry_init(
integrations=[CeleryIntegration(propagate_traces=propagate_traces)],
integrations=[
CeleryIntegration(
propagate_traces=propagate_traces,
monitor_beat_tasks=monitor_beat_tasks,
)
],
**kwargs,
)
celery = Celery(__name__)
Expand Down
Loading