Skip to content

Commit

Permalink
fix(debugging): emit probe status updates unconditionally (#6714)
Browse files Browse the repository at this point in the history
We ensure that probe status update are sent when due, regardless of
whether the RCM worker has found new data. This fixes a regression
introduced by the IPC refactor for the RCM client.

- [x] Change(s) are motivated and described in the PR description.
- [x] Testing strategy is described if automated tests are not included
in the PR.
- [x] Risk is outlined (performance impact, potential for breakage,
maintainability, etc).
- [x] Change is maintainable (easy to change, telemetry, documentation).
- [x] [Library release note
guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html)
are followed. If no release note is required, add label
`changelog/no-changelog`.
- [x] Documentation is included (in-code, generated user docs, [public
corp docs](https://github.com/DataDog/documentation/)).
- [x] Backport labels are set (if
[applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting))

- [ ] Title is accurate.
- [ ] No unnecessary changes are introduced.
- [ ] Description motivates each change.
- [ ] Avoids breaking
[API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces)
changes unless absolutely necessary.
- [ ] Testing strategy adequately addresses listed risk(s).
- [ ] Change is maintainable (easy to change, telemetry, documentation).
- [ ] Release note makes sense to a user of the library.
- [ ] Reviewer has explicitly acknowledged and discussed the performance
implications of this PR as reported in the benchmarks PR comment.
- [ ] Backport labels are set in a manner that is consistent with the
[release branch maintenance
policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)

(cherry picked from commit 1726105)
  • Loading branch information
P403n1x87 committed Aug 24, 2023
1 parent dc2e7d0 commit 9f3bb1d
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 121 deletions.
22 changes: 13 additions & 9 deletions ddtrace/debugging/_probe/remoteconfig.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,22 +318,26 @@ def __init__(self, data_connector, callback, name, status_logger):
self._next_status_update_timestamp()

def _exec_callback(self, data, test_tracer=None):
# Check if it is time to re-emit probe status messages.
# DEV: We use the periodic signal from the remote config client worker
# thread to avoid having to spawn a separate thread for this.
if time.time() > self._status_timestamp:
log.debug(
"[%s][P: %s] Dynamic Instrumentation,Emitting probe status log messages",
os.getpid(),
os.getppid(),
)
probes = [probe for config in self._configs.values() for probe in config.values()]
self._callback(ProbePollerEvent.STATUS_UPDATE, probes)
self._next_status_update_timestamp()

if data:
metadatas = data["metadata"]
rc_configs = data["config"]
# DEV: We emit a status update event here to avoid having to spawn a
# separate thread for this.
log.debug("[%s][P: %s] Dynamic Instrumentation Updated", os.getpid(), os.getppid())
for idx in range(len(rc_configs)):
if time.time() > self._status_timestamp:
log.debug(
"[%s][P: %s] Dynamic Instrumentation,Emitting probe status log messages",
os.getpid(),
os.getppid(),
)
probes = [probe for config in self._configs.values() for probe in config.values()]
self._callback(ProbePollerEvent.STATUS_UPDATE, probes)
self._next_status_update_timestamp()
if metadatas[idx] is None:
log.debug("[%s][P: %s] Dynamic Instrumentation, no RCM metadata", os.getpid(), os.getppid())
return
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
fixes:
- |
dynamic instrumentation: fixed a bug that might have caused probe status
to fail to update correctly.
172 changes: 60 additions & 112 deletions tests/debugging/test_debugger.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import sys
from threading import Thread
from time import sleep
from time import time

import mock
from mock.mock import call
Expand Down Expand Up @@ -398,7 +397,6 @@ def test_debugger_multiple_threads():
good_probe(),
create_snapshot_line_probe(probe_id="thread-test", source_file="tests/submod/stuff.py", line=40),
)
sleep(0.5)

callables = [Stuff().instancestuff, lambda: Stuff().propertystuff]
threads = [Thread(target=callables[_ % len(callables)]) for _ in range(10)]
Expand All @@ -409,12 +407,10 @@ def test_debugger_multiple_threads():
for t in threads:
t.join()

sleep(1.5)

if any(len(snapshots) < len(callables) for snapshots in d.uploader.payloads):
sleep(0.5)
d.uploader.wait_for_payloads(
lambda q: q and all(len(snapshots) >= len(callables) for snapshots in d.uploader.payloads)
)

assert d.uploader.queue
for snapshots in d.uploader.payloads[1:]:
assert len(snapshots) >= len(callables)
assert {s["debugger.snapshot"]["probe"]["id"] for s in snapshots} == {
Expand Down Expand Up @@ -618,127 +614,79 @@ def test_debugger_line_probe_on_wrapped_function(stuff):
assert snapshot.probe.probe_id == "line-probe-wrapped-method"


@pytest.mark.skipif(sys.version_info < (3, 6, 0), reason="Python 3.6+ only")
def test_probe_status_logging(monkeypatch, remote_config_worker):
monkeypatch.setenv("DD_REMOTE_CONFIG_POLL_INTERVAL_SECONDS", "10")
remoteconfig_poller.interval = 10
from ddtrace.internal.remoteconfig.client import RemoteConfigClient

old_request = RemoteConfigClient.request

def request(self, *args, **kwargs):
import uuid

for cb in self.get_pubsubs():
cb._subscriber._status_timestamp = time()
cb._subscriber.interval = 0.1
cb.append_and_publish({"test": str(uuid.uuid4())}, "", None)
return True

RemoteConfigClient.request = request
def test_probe_status_logging(remote_config_worker):
assert remoteconfig_poller.status == ServiceStatus.STOPPED
try:
with rcm_endpoint(), debugger(diagnostics_interval=0.5, enabled=True) as d:
d.add_probes(
create_snapshot_line_probe(
probe_id="line-probe-ok",
source_file="tests/submod/stuff.py",
line=36,
condition=None,
),
create_snapshot_function_probe(
probe_id="line-probe-error",
module="tests.submod.stuff",
func_qname="foo",
condition=None,
),
)

queue = d.probe_status_logger.queue

def count_status(queue):
return Counter(_["debugger"]["diagnostics"]["status"] for _ in queue)

sleep(0.1)
assert count_status(queue) == {"INSTALLED": 1, "RECEIVED": 2, "ERROR": 1}

remoteconfig_poller._client.request()
sleep(0.1)
assert count_status(queue) == {"INSTALLED": 2, "RECEIVED": 2, "ERROR": 2}
with rcm_endpoint(), debugger(diagnostics_interval=0.2, enabled=True) as d:
d.add_probes(
create_snapshot_line_probe(
probe_id="line-probe-ok",
source_file="tests/submod/stuff.py",
line=36,
condition=None,
),
create_snapshot_function_probe(
probe_id="line-probe-error",
module="tests.submod.stuff",
func_qname="foo",
condition=None,
),
)

remoteconfig_poller._client.request()
sleep(0.1)
assert count_status(queue) == {"INSTALLED": 3, "RECEIVED": 2, "ERROR": 3}
finally:
RemoteConfigClient.request = old_request
logger = d.probe_status_logger

def count_status(queue):
return Counter(_["debugger"]["diagnostics"]["status"] for _ in queue)

@pytest.mark.skipif(sys.version_info < (3, 6, 0), reason="Python 3.6+ only")
def test_probe_status_logging_reemit_on_modify(monkeypatch, remote_config_worker):
monkeypatch.setenv("DD_REMOTE_CONFIG_POLL_INTERVAL_SECONDS", "10")
remoteconfig_poller.interval = 10
from ddtrace.internal.remoteconfig.client import RemoteConfigClient
logger.wait(lambda q: count_status(q) == {"INSTALLED": 1, "RECEIVED": 2, "ERROR": 1})

old_request = RemoteConfigClient.request
logger.wait(lambda q: count_status(q) == {"INSTALLED": 2, "RECEIVED": 2, "ERROR": 2})

def request(self, *args, **kwargs):
import uuid
logger.wait(lambda q: count_status(q) == {"INSTALLED": 3, "RECEIVED": 2, "ERROR": 3})

for cb in self.get_pubsubs():
cb._subscriber._status_timestamp = time()
cb._subscriber.interval = 0.1
cb.append_and_publish({"test": str(uuid.uuid4())}, "", None)
return True

RemoteConfigClient.request = request
def test_probe_status_logging_reemit_on_modify(remote_config_worker):
assert remoteconfig_poller.status == ServiceStatus.STOPPED
try:
with rcm_endpoint(), debugger(diagnostics_interval=0.3, enabled=True) as d:
d.add_probes(
create_snapshot_line_probe(
version=1,
probe_id="line-probe-ok",
source_file="tests/submod/stuff.py",
line=36,
condition=None,
),
)
d.modify_probes(
create_snapshot_line_probe(
version=2,
probe_id="line-probe-ok",
source_file="tests/submod/stuff.py",
line=36,
condition=None,
),
)

queue = d.probe_status_logger.queue
with rcm_endpoint(), debugger(diagnostics_interval=0.2, enabled=True) as d:
d.add_probes(
create_snapshot_line_probe(
version=1,
probe_id="line-probe-ok",
source_file="tests/submod/stuff.py",
line=36,
condition=None,
),
)
d.modify_probes(
create_snapshot_line_probe(
version=2,
probe_id="line-probe-ok",
source_file="tests/submod/stuff.py",
line=36,
condition=None,
),
)

def count_status(queue):
return Counter(_["debugger"]["diagnostics"]["status"] for _ in queue)
logger = d.probe_status_logger
queue = logger.queue

def versions(queue, status):
return [
_["debugger"]["diagnostics"]["probeVersion"]
for _ in queue
if _["debugger"]["diagnostics"]["status"] == status
]
def count_status(queue):
return Counter(_["debugger"]["diagnostics"]["status"] for _ in queue)

sleep(0.1)
assert count_status(queue) == {"INSTALLED": 2, "RECEIVED": 1}
assert versions(queue, "INSTALLED") == [1, 2]
assert versions(queue, "RECEIVED") == [1]
def versions(queue, status):
return [
_["debugger"]["diagnostics"]["probeVersion"]
for _ in queue
if _["debugger"]["diagnostics"]["status"] == status
]

queue[:] = []
sleep(0.5)
remoteconfig_poller._client.request()
sleep(0.1)
assert count_status(queue) == {"INSTALLED": 1}
assert versions(queue, "INSTALLED") == [2]
logger.wait(lambda q: count_status(q) == {"INSTALLED": 2, "RECEIVED": 1})
assert versions(queue, "INSTALLED") == [1, 2]
assert versions(queue, "RECEIVED") == [1]

finally:
RemoteConfigClient.request = old_request
logger.wait(lambda q: count_status(q) == {"INSTALLED": 3, "RECEIVED": 1})
assert versions(queue, "INSTALLED") == [1, 2, 2]


@pytest.mark.parametrize("duration", [1e5, 1e6, 1e7])
Expand Down

0 comments on commit 9f3bb1d

Please sign in to comment.