From 9f3bb1dcdac6511813fd852a71964d3a4c77a375 Mon Sep 17 00:00:00 2001
From: "Gabriele N. Tornetta"
Date: Wed, 23 Aug 2023 17:13:11 +0100
Subject: [PATCH] fix(debugging): emit probe status updates unconditionally (#6714)

We ensure that probe status updates are sent when due, regardless of
whether the RCM worker has found new data. This fixes a regression
introduced by the IPC refactor for the RCM client.

- [x] Change(s) are motivated and described in the PR description.
- [x] Testing strategy is described if automated tests are not included in the PR.
- [x] Risk is outlined (performance impact, potential for breakage, maintainability, etc).
- [x] Change is maintainable (easy to change, telemetry, documentation).
- [x] [Library release note guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html) are followed. If no release note is required, add label `changelog/no-changelog`.
- [x] Documentation is included (in-code, generated user docs, [public corp docs](https://github.com/DataDog/documentation/)).
- [x] Backport labels are set (if [applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting))
- [ ] Title is accurate.
- [ ] No unnecessary changes are introduced.
- [ ] Description motivates each change.
- [ ] Avoids breaking [API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces) changes unless absolutely necessary.
- [ ] Testing strategy adequately addresses listed risk(s).
- [ ] Change is maintainable (easy to change, telemetry, documentation).
- [ ] Release note makes sense to a user of the library.
- [ ] Reviewer has explicitly acknowledged and discussed the performance implications of this PR as reported in the benchmarks PR comment.
- [ ] Backport labels are set in a manner that is consistent with the [release branch maintenance policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)

(cherry picked from commit 1726105fe1825e52855aeabcae61233d22fa4692)
---
 ddtrace/debugging/_probe/remoteconfig.py      |  22 ++-
 ...atus-emit-on-no-data-eb9b13d0343f92c7.yaml |   5 +
 tests/debugging/test_debugger.py              | 172 ++++++------------
 3 files changed, 78 insertions(+), 121 deletions(-)
 create mode 100644 releasenotes/notes/fix-di-probe-status-emit-on-no-data-eb9b13d0343f92c7.yaml

diff --git a/ddtrace/debugging/_probe/remoteconfig.py b/ddtrace/debugging/_probe/remoteconfig.py
index c22e8a80290..bad2db86a19 100644
--- a/ddtrace/debugging/_probe/remoteconfig.py
+++ b/ddtrace/debugging/_probe/remoteconfig.py
@@ -318,6 +318,19 @@ def __init__(self, data_connector, callback, name, status_logger):
         self._next_status_update_timestamp()
 
     def _exec_callback(self, data, test_tracer=None):
+        # Check if it is time to re-emit probe status messages.
+        # DEV: We use the periodic signal from the remote config client worker
+        # thread to avoid having to spawn a separate thread for this.
+        if time.time() > self._status_timestamp:
+            log.debug(
+                "[%s][P: %s] Dynamic Instrumentation,Emitting probe status log messages",
+                os.getpid(),
+                os.getppid(),
+            )
+            probes = [probe for config in self._configs.values() for probe in config.values()]
+            self._callback(ProbePollerEvent.STATUS_UPDATE, probes)
+            self._next_status_update_timestamp()
+
         if data:
             metadatas = data["metadata"]
             rc_configs = data["config"]
@@ -325,15 +338,6 @@ def _exec_callback(self, data, test_tracer=None):
             # separate thread for this.
log.debug("[%s][P: %s] Dynamic Instrumentation Updated", os.getpid(), os.getppid()) for idx in range(len(rc_configs)): - if time.time() > self._status_timestamp: - log.debug( - "[%s][P: %s] Dynamic Instrumentation,Emitting probe status log messages", - os.getpid(), - os.getppid(), - ) - probes = [probe for config in self._configs.values() for probe in config.values()] - self._callback(ProbePollerEvent.STATUS_UPDATE, probes) - self._next_status_update_timestamp() if metadatas[idx] is None: log.debug("[%s][P: %s] Dynamic Instrumentation, no RCM metadata", os.getpid(), os.getppid()) return diff --git a/releasenotes/notes/fix-di-probe-status-emit-on-no-data-eb9b13d0343f92c7.yaml b/releasenotes/notes/fix-di-probe-status-emit-on-no-data-eb9b13d0343f92c7.yaml new file mode 100644 index 00000000000..2fdf89fe6de --- /dev/null +++ b/releasenotes/notes/fix-di-probe-status-emit-on-no-data-eb9b13d0343f92c7.yaml @@ -0,0 +1,5 @@ +--- +fixes: + - | + dynamic instrumentation: fixed a bug that might have caused probe status + to fail to update correctly. diff --git a/tests/debugging/test_debugger.py b/tests/debugging/test_debugger.py index f918e7fe0d4..445cd2b373e 100644 --- a/tests/debugging/test_debugger.py +++ b/tests/debugging/test_debugger.py @@ -3,7 +3,6 @@ import sys from threading import Thread from time import sleep -from time import time import mock from mock.mock import call @@ -398,7 +397,6 @@ def test_debugger_multiple_threads(): good_probe(), create_snapshot_line_probe(probe_id="thread-test", source_file="tests/submod/stuff.py", line=40), ) - sleep(0.5) callables = [Stuff().instancestuff, lambda: Stuff().propertystuff] threads = [Thread(target=callables[_ % len(callables)]) for _ in range(10)] @@ -409,12 +407,10 @@ def test_debugger_multiple_threads(): for t in threads: t.join() - sleep(1.5) - - if any(len(snapshots) < len(callables) for snapshots in d.uploader.payloads): - sleep(0.5) + d.uploader.wait_for_payloads( + lambda q: q and all(len(snapshots) >= len(callables) for snapshots in d.uploader.payloads) + ) - assert d.uploader.queue for snapshots in d.uploader.payloads[1:]: assert len(snapshots) >= len(callables) assert {s["debugger.snapshot"]["probe"]["id"] for s in snapshots} == { @@ -618,127 +614,79 @@ def test_debugger_line_probe_on_wrapped_function(stuff): assert snapshot.probe.probe_id == "line-probe-wrapped-method" -@pytest.mark.skipif(sys.version_info < (3, 6, 0), reason="Python 3.6+ only") -def test_probe_status_logging(monkeypatch, remote_config_worker): - monkeypatch.setenv("DD_REMOTE_CONFIG_POLL_INTERVAL_SECONDS", "10") - remoteconfig_poller.interval = 10 - from ddtrace.internal.remoteconfig.client import RemoteConfigClient - - old_request = RemoteConfigClient.request - - def request(self, *args, **kwargs): - import uuid - - for cb in self.get_pubsubs(): - cb._subscriber._status_timestamp = time() - cb._subscriber.interval = 0.1 - cb.append_and_publish({"test": str(uuid.uuid4())}, "", None) - return True - - RemoteConfigClient.request = request +def test_probe_status_logging(remote_config_worker): assert remoteconfig_poller.status == ServiceStatus.STOPPED - try: - with rcm_endpoint(), debugger(diagnostics_interval=0.5, enabled=True) as d: - d.add_probes( - create_snapshot_line_probe( - probe_id="line-probe-ok", - source_file="tests/submod/stuff.py", - line=36, - condition=None, - ), - create_snapshot_function_probe( - probe_id="line-probe-error", - module="tests.submod.stuff", - func_qname="foo", - condition=None, - ), - ) - - queue = d.probe_status_logger.queue - - def 
count_status(queue): - return Counter(_["debugger"]["diagnostics"]["status"] for _ in queue) - - sleep(0.1) - assert count_status(queue) == {"INSTALLED": 1, "RECEIVED": 2, "ERROR": 1} - remoteconfig_poller._client.request() - sleep(0.1) - assert count_status(queue) == {"INSTALLED": 2, "RECEIVED": 2, "ERROR": 2} + with rcm_endpoint(), debugger(diagnostics_interval=0.2, enabled=True) as d: + d.add_probes( + create_snapshot_line_probe( + probe_id="line-probe-ok", + source_file="tests/submod/stuff.py", + line=36, + condition=None, + ), + create_snapshot_function_probe( + probe_id="line-probe-error", + module="tests.submod.stuff", + func_qname="foo", + condition=None, + ), + ) - remoteconfig_poller._client.request() - sleep(0.1) - assert count_status(queue) == {"INSTALLED": 3, "RECEIVED": 2, "ERROR": 3} - finally: - RemoteConfigClient.request = old_request + logger = d.probe_status_logger + def count_status(queue): + return Counter(_["debugger"]["diagnostics"]["status"] for _ in queue) -@pytest.mark.skipif(sys.version_info < (3, 6, 0), reason="Python 3.6+ only") -def test_probe_status_logging_reemit_on_modify(monkeypatch, remote_config_worker): - monkeypatch.setenv("DD_REMOTE_CONFIG_POLL_INTERVAL_SECONDS", "10") - remoteconfig_poller.interval = 10 - from ddtrace.internal.remoteconfig.client import RemoteConfigClient + logger.wait(lambda q: count_status(q) == {"INSTALLED": 1, "RECEIVED": 2, "ERROR": 1}) - old_request = RemoteConfigClient.request + logger.wait(lambda q: count_status(q) == {"INSTALLED": 2, "RECEIVED": 2, "ERROR": 2}) - def request(self, *args, **kwargs): - import uuid + logger.wait(lambda q: count_status(q) == {"INSTALLED": 3, "RECEIVED": 2, "ERROR": 3}) - for cb in self.get_pubsubs(): - cb._subscriber._status_timestamp = time() - cb._subscriber.interval = 0.1 - cb.append_and_publish({"test": str(uuid.uuid4())}, "", None) - return True - RemoteConfigClient.request = request +def test_probe_status_logging_reemit_on_modify(remote_config_worker): assert remoteconfig_poller.status == ServiceStatus.STOPPED - try: - with rcm_endpoint(), debugger(diagnostics_interval=0.3, enabled=True) as d: - d.add_probes( - create_snapshot_line_probe( - version=1, - probe_id="line-probe-ok", - source_file="tests/submod/stuff.py", - line=36, - condition=None, - ), - ) - d.modify_probes( - create_snapshot_line_probe( - version=2, - probe_id="line-probe-ok", - source_file="tests/submod/stuff.py", - line=36, - condition=None, - ), - ) - queue = d.probe_status_logger.queue + with rcm_endpoint(), debugger(diagnostics_interval=0.2, enabled=True) as d: + d.add_probes( + create_snapshot_line_probe( + version=1, + probe_id="line-probe-ok", + source_file="tests/submod/stuff.py", + line=36, + condition=None, + ), + ) + d.modify_probes( + create_snapshot_line_probe( + version=2, + probe_id="line-probe-ok", + source_file="tests/submod/stuff.py", + line=36, + condition=None, + ), + ) - def count_status(queue): - return Counter(_["debugger"]["diagnostics"]["status"] for _ in queue) + logger = d.probe_status_logger + queue = logger.queue - def versions(queue, status): - return [ - _["debugger"]["diagnostics"]["probeVersion"] - for _ in queue - if _["debugger"]["diagnostics"]["status"] == status - ] + def count_status(queue): + return Counter(_["debugger"]["diagnostics"]["status"] for _ in queue) - sleep(0.1) - assert count_status(queue) == {"INSTALLED": 2, "RECEIVED": 1} - assert versions(queue, "INSTALLED") == [1, 2] - assert versions(queue, "RECEIVED") == [1] + def versions(queue, status): + return [ + 
_["debugger"]["diagnostics"]["probeVersion"] + for _ in queue + if _["debugger"]["diagnostics"]["status"] == status + ] - queue[:] = [] - sleep(0.5) - remoteconfig_poller._client.request() - sleep(0.1) - assert count_status(queue) == {"INSTALLED": 1} - assert versions(queue, "INSTALLED") == [2] + logger.wait(lambda q: count_status(q) == {"INSTALLED": 2, "RECEIVED": 1}) + assert versions(queue, "INSTALLED") == [1, 2] + assert versions(queue, "RECEIVED") == [1] - finally: - RemoteConfigClient.request = old_request + logger.wait(lambda q: count_status(q) == {"INSTALLED": 3, "RECEIVED": 1}) + assert versions(queue, "INSTALLED") == [1, 2, 2] @pytest.mark.parametrize("duration", [1e5, 1e6, 1e7])