diff --git a/ddtrace/internal/forksafe.py b/ddtrace/internal/forksafe.py index 62a8371e4b0..6b311de6025 100644 --- a/ddtrace/internal/forksafe.py +++ b/ddtrace/internal/forksafe.py @@ -7,6 +7,7 @@ import typing import weakref +from ddtrace.internal.module import ModuleWatchdog from ddtrace.vendor import wrapt @@ -22,6 +23,25 @@ _soft = False +def patch_gevent_hub_reinit(module): + # The gevent hub is re-initialized *after* the after-in-child fork hooks are + # called, so we patch the gevent.hub.reinit function to ensure that the + # fork hooks run again after this further re-initialisation, if it is ever + # called. + from ddtrace.internal.wrapping import wrap + + def wrapped_reinit(f, args, kwargs): + try: + return f(*args, **kwargs) + finally: + ddtrace_after_in_child() + + wrap(module.reinit, wrapped_reinit) + + +ModuleWatchdog.register_module_hook("gevent.hub", patch_gevent_hub_reinit) + + def ddtrace_after_in_child(): # type: () -> None global _registry diff --git a/riotfile.py b/riotfile.py index 418d888838b..ea6ea85fb86 100644 --- a/riotfile.py +++ b/riotfile.py @@ -246,6 +246,7 @@ def select_pys(min_version=MIN_PYTHON_VERSION, max_version=MAX_PYTHON_VERSION): "structlog": latest, # httpretty v1.0 drops python 2.7 support "httpretty": "==0.9.7", + "gevent": latest, }, ) ], diff --git a/tests/tracer/test_forksafe.py b/tests/tracer/test_forksafe.py index 756675d8dc5..367af80d3b2 100644 --- a/tests/tracer/test_forksafe.py +++ b/tests/tracer/test_forksafe.py @@ -1,4 +1,5 @@ import os +import sys import pytest import six @@ -285,3 +286,59 @@ def fn(): pid, status = os.waitpid(child, 0) exit_code = os.WEXITSTATUS(status) assert exit_code == 42 + + +@pytest.mark.subprocess( + out="" if (3,) < sys.version_info < (3, 7) else ("CTCTCT" if sys.platform == "darwin" else "CCCTTT"), + err=None, +) +def test_gevent_reinit_patch(): + import os + import sys + + from ddtrace.internal import forksafe + from ddtrace.internal.periodic import PeriodicService + + class TestService(PeriodicService): + def __init__(self): + super(TestService, self).__init__(interval=1.0) + + def periodic(self): + sys.stdout.write("T") + + service = TestService() + service.start() + + def restart_service(): + global service + + service.stop() + service = TestService() + service.start() + + forksafe.register(restart_service) + + import gevent # noqa + + def run_child(): + global service + + # We mimic what gunicorn does in child processes + gevent.monkey.patch_all() + gevent.hub.reinit() + + sys.stdout.write("C") + + gevent.sleep(1.5) + + service.stop() + + def fork_workers(num): + for _ in range(num): + if os.fork() == 0: + run_child() + sys.exit(0) + + fork_workers(3) + + service.stop()