diff --git a/ddtrace/debugging/_probe/remoteconfig.py b/ddtrace/debugging/_probe/remoteconfig.py index a0ed657686a..d9bd510e28b 100644 --- a/ddtrace/debugging/_probe/remoteconfig.py +++ b/ddtrace/debugging/_probe/remoteconfig.py @@ -318,6 +318,19 @@ def __init__(self, data_connector, callback, name, status_logger): self._next_status_update_timestamp() def _exec_callback(self, data, test_tracer=None): + # Check if it is time to re-emit probe status messages. + # DEV: We use the periodic signal from the remote config client worker + # thread to avoid having to spawn a separate thread for this. + if time.time() > self._status_timestamp: + log.debug( + "[%s][P: %s] Dynamic Instrumentation,Emitting probe status log messages", + os.getpid(), + os.getppid(), + ) + probes = [probe for config in self._configs.values() for probe in config.values()] + self._callback(ProbePollerEvent.STATUS_UPDATE, probes) + self._next_status_update_timestamp() + if data: metadatas = data["metadata"] rc_configs = data["config"] @@ -325,15 +338,6 @@ def _exec_callback(self, data, test_tracer=None): # separate thread for this. log.debug("[%s][P: %s] Dynamic Instrumentation Updated", os.getpid(), os.getppid()) for idx in range(len(rc_configs)): - if time.time() > self._status_timestamp: - log.debug( - "[%s][P: %s] Dynamic Instrumentation,Emitting probe status log messages", - os.getpid(), - os.getppid(), - ) - probes = [probe for config in self._configs.values() for probe in config.values()] - self._callback(ProbePollerEvent.STATUS_UPDATE, probes) - self._next_status_update_timestamp() if metadatas[idx] is None: log.debug("[%s][P: %s] Dynamic Instrumentation, no RCM metadata", os.getpid(), os.getppid()) return diff --git a/releasenotes/notes/fix-di-probe-status-emit-on-no-data-eb9b13d0343f92c7.yaml b/releasenotes/notes/fix-di-probe-status-emit-on-no-data-eb9b13d0343f92c7.yaml new file mode 100644 index 00000000000..2fdf89fe6de --- /dev/null +++ b/releasenotes/notes/fix-di-probe-status-emit-on-no-data-eb9b13d0343f92c7.yaml @@ -0,0 +1,5 @@ +--- +fixes: + - | + dynamic instrumentation: fixed a bug that might have caused probe status + to fail to update correctly. diff --git a/tests/debugging/test_debugger.py b/tests/debugging/test_debugger.py index 4475b3caf02..32c11b43fc0 100644 --- a/tests/debugging/test_debugger.py +++ b/tests/debugging/test_debugger.py @@ -3,7 +3,6 @@ import sys from threading import Thread from time import sleep -from time import time import mock from mock.mock import call @@ -397,7 +396,6 @@ def test_debugger_multiple_threads(): good_probe(), create_snapshot_line_probe(probe_id="thread-test", source_file="tests/submod/stuff.py", line=40), ) - sleep(0.5) callables = [Stuff().instancestuff, lambda: Stuff().propertystuff] threads = [Thread(target=callables[_ % len(callables)]) for _ in range(10)] @@ -408,12 +406,10 @@ def test_debugger_multiple_threads(): for t in threads: t.join() - sleep(1.5) - - if any(len(snapshots) < len(callables) for snapshots in d.uploader.payloads): - sleep(0.5) + d.uploader.wait_for_payloads( + lambda q: q and all(len(snapshots) >= len(callables) for snapshots in d.uploader.payloads) + ) - assert d.uploader.queue for snapshots in d.uploader.payloads[1:]: assert len(snapshots) >= len(callables) assert {s["debugger.snapshot"]["probe"]["id"] for s in snapshots} == { @@ -617,126 +613,79 @@ def test_debugger_line_probe_on_wrapped_function(stuff): assert snapshot.probe.probe_id == "line-probe-wrapped-method" -@pytest.mark.skipif(sys.version_info < (3, 6, 0), reason="Python 3.6+ only") -def test_probe_status_logging(monkeypatch, remote_config_worker): - monkeypatch.setenv("DD_REMOTE_CONFIG_POLL_INTERVAL_SECONDS", "10") - remoteconfig_poller.interval = 10 - from ddtrace.internal.remoteconfig.client import RemoteConfigClient - - old_request = RemoteConfigClient.request - - def request(self, *args, **kwargs): - import uuid - - for cb in self.get_pubsubs(): - cb._subscriber._status_timestamp = time() - cb._subscriber.interval = 0.1 - cb.append_and_publish({"test": str(uuid.uuid4())}, "", None) - return True - - RemoteConfigClient.request = request +def test_probe_status_logging(remote_config_worker): assert remoteconfig_poller.status == ServiceStatus.STOPPED - try: - with rcm_endpoint(), debugger(diagnostics_interval=0.5, enabled=True) as d: - d.add_probes( - create_snapshot_line_probe( - probe_id="line-probe-ok", - source_file="tests/submod/stuff.py", - line=36, - condition=None, - ), - create_snapshot_function_probe( - probe_id="line-probe-error", - module="tests.submod.stuff", - func_qname="foo", - condition=None, - ), - ) - queue = d.probe_status_logger.queue - - def count_status(queue): - return Counter(_["debugger"]["diagnostics"]["status"] for _ in queue) - - sleep(0.1) - assert count_status(queue) == {"INSTALLED": 1, "RECEIVED": 2, "ERROR": 1} - - remoteconfig_poller._client.request() - sleep(0.1) - assert count_status(queue) == {"INSTALLED": 2, "RECEIVED": 2, "ERROR": 2} + with rcm_endpoint(), debugger(diagnostics_interval=0.2, enabled=True) as d: + d.add_probes( + create_snapshot_line_probe( + probe_id="line-probe-ok", + source_file="tests/submod/stuff.py", + line=36, + condition=None, + ), + create_snapshot_function_probe( + probe_id="line-probe-error", + module="tests.submod.stuff", + func_qname="foo", + condition=None, + ), + ) - remoteconfig_poller._client.request() - sleep(0.1) - assert count_status(queue) == {"INSTALLED": 3, "RECEIVED": 2, "ERROR": 3} - finally: - RemoteConfigClient.request = old_request + logger = d.probe_status_logger + def count_status(queue): + return Counter(_["debugger"]["diagnostics"]["status"] for _ in queue) -@pytest.mark.skipif(sys.version_info < (3, 6, 0), reason="Python 3.6+ only") -def test_probe_status_logging_reemit_on_modify(monkeypatch, remote_config_worker): - monkeypatch.setenv("DD_REMOTE_CONFIG_POLL_INTERVAL_SECONDS", "10") - remoteconfig_poller.interval = 10 - from ddtrace.internal.remoteconfig.client import RemoteConfigClient + logger.wait(lambda q: count_status(q) == {"INSTALLED": 1, "RECEIVED": 2, "ERROR": 1}) - old_request = RemoteConfigClient.request + logger.wait(lambda q: count_status(q) == {"INSTALLED": 2, "RECEIVED": 2, "ERROR": 2}) - def request(self, *args, **kwargs): - import uuid + logger.wait(lambda q: count_status(q) == {"INSTALLED": 3, "RECEIVED": 2, "ERROR": 3}) - for cb in self.get_pubsubs(): - cb._subscriber._status_timestamp = time() - cb._subscriber.interval = 0.1 - cb.append_and_publish({"test": str(uuid.uuid4())}, "", None) - return True - RemoteConfigClient.request = request +def test_probe_status_logging_reemit_on_modify(remote_config_worker): assert remoteconfig_poller.status == ServiceStatus.STOPPED - try: - with rcm_endpoint(), debugger(diagnostics_interval=0.0, enabled=True) as d: - d.add_probes( - create_snapshot_line_probe( - version=1, - probe_id="line-probe-ok", - source_file="tests/submod/stuff.py", - line=36, - condition=None, - ), - ) - d.modify_probes( - create_snapshot_line_probe( - version=2, - probe_id="line-probe-ok", - source_file="tests/submod/stuff.py", - line=36, - condition=None, - ), - ) - - logger = d.probe_status_logger - queue = logger.queue - def count_status(queue): - return Counter(_["debugger"]["diagnostics"]["status"] for _ in queue) + with rcm_endpoint(), debugger(diagnostics_interval=0.2, enabled=True) as d: + d.add_probes( + create_snapshot_line_probe( + version=1, + probe_id="line-probe-ok", + source_file="tests/submod/stuff.py", + line=36, + condition=None, + ), + ) + d.modify_probes( + create_snapshot_line_probe( + version=2, + probe_id="line-probe-ok", + source_file="tests/submod/stuff.py", + line=36, + condition=None, + ), + ) - def versions(queue, status): - return [ - _["debugger"]["diagnostics"]["probeVersion"] - for _ in queue - if _["debugger"]["diagnostics"]["status"] == status - ] + logger = d.probe_status_logger + queue = logger.queue - logger.wait(lambda q: count_status(q) == {"INSTALLED": 2, "RECEIVED": 1}) - assert versions(queue, "INSTALLED") == [1, 2] - assert versions(queue, "RECEIVED") == [1] + def count_status(queue): + return Counter(_["debugger"]["diagnostics"]["status"] for _ in queue) - logger.clear() - remoteconfig_poller._client.request() + def versions(queue, status): + return [ + _["debugger"]["diagnostics"]["probeVersion"] + for _ in queue + if _["debugger"]["diagnostics"]["status"] == status + ] - logger.wait(lambda q: count_status(q) == {"INSTALLED": 1}) - assert versions(queue, "INSTALLED") == [2] + logger.wait(lambda q: count_status(q) == {"INSTALLED": 2, "RECEIVED": 1}) + assert versions(queue, "INSTALLED") == [1, 2] + assert versions(queue, "RECEIVED") == [1] - finally: - RemoteConfigClient.request = old_request + logger.wait(lambda q: count_status(q) == {"INSTALLED": 3, "RECEIVED": 1}) + assert versions(queue, "INSTALLED") == [1, 2, 2] @pytest.mark.parametrize("duration", [1e5, 1e6, 1e7])