Skip to content

Commit

Permalink
fix(debugging): emit probe status updates unconditionally (#6714)
Browse files Browse the repository at this point in the history
We ensure that probe status update are sent when due, regardless of
whether the RCM worker has found new data. This fixes a regression
introduced by the IPC refactor for the RCM client.

## Checklist

- [x] Change(s) are motivated and described in the PR description.
- [x] Testing strategy is described if automated tests are not included
in the PR.
- [x] Risk is outlined (performance impact, potential for breakage,
maintainability, etc).
- [x] Change is maintainable (easy to change, telemetry, documentation).
- [x] [Library release note
guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html)
are followed. If no release note is required, add label
`changelog/no-changelog`.
- [x] Documentation is included (in-code, generated user docs, [public
corp docs](https://github.com/DataDog/documentation/)).
- [x] Backport labels are set (if
[applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting))

## Reviewer Checklist

- [ ] Title is accurate.
- [ ] No unnecessary changes are introduced.
- [ ] Description motivates each change.
- [ ] Avoids breaking
[API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces)
changes unless absolutely necessary.
- [ ] Testing strategy adequately addresses listed risk(s).
- [ ] Change is maintainable (easy to change, telemetry, documentation).
- [ ] Release note makes sense to a user of the library.
- [ ] Reviewer has explicitly acknowledged and discussed the performance
implications of this PR as reported in the benchmarks PR comment.
- [ ] Backport labels are set in a manner that is consistent with the
[release branch maintenance
policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)
  • Loading branch information
P403n1x87 authored Aug 23, 2023
1 parent d44903a commit 1726105
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 120 deletions.
22 changes: 13 additions & 9 deletions ddtrace/debugging/_probe/remoteconfig.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,22 +318,26 @@ def __init__(self, data_connector, callback, name, status_logger):
self._next_status_update_timestamp()

def _exec_callback(self, data, test_tracer=None):
# Check if it is time to re-emit probe status messages.
# DEV: We use the periodic signal from the remote config client worker
# thread to avoid having to spawn a separate thread for this.
if time.time() > self._status_timestamp:
log.debug(
"[%s][P: %s] Dynamic Instrumentation,Emitting probe status log messages",
os.getpid(),
os.getppid(),
)
probes = [probe for config in self._configs.values() for probe in config.values()]
self._callback(ProbePollerEvent.STATUS_UPDATE, probes)
self._next_status_update_timestamp()

if data:
metadatas = data["metadata"]
rc_configs = data["config"]
# DEV: We emit a status update event here to avoid having to spawn a
# separate thread for this.
log.debug("[%s][P: %s] Dynamic Instrumentation Updated", os.getpid(), os.getppid())
for idx in range(len(rc_configs)):
if time.time() > self._status_timestamp:
log.debug(
"[%s][P: %s] Dynamic Instrumentation,Emitting probe status log messages",
os.getpid(),
os.getppid(),
)
probes = [probe for config in self._configs.values() for probe in config.values()]
self._callback(ProbePollerEvent.STATUS_UPDATE, probes)
self._next_status_update_timestamp()
if metadatas[idx] is None:
log.debug("[%s][P: %s] Dynamic Instrumentation, no RCM metadata", os.getpid(), os.getppid())
return
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
fixes:
- |
dynamic instrumentation: fixed a bug that might have caused probe status
to fail to update correctly.
171 changes: 60 additions & 111 deletions tests/debugging/test_debugger.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import sys
from threading import Thread
from time import sleep
from time import time

import mock
from mock.mock import call
Expand Down Expand Up @@ -397,7 +396,6 @@ def test_debugger_multiple_threads():
good_probe(),
create_snapshot_line_probe(probe_id="thread-test", source_file="tests/submod/stuff.py", line=40),
)
sleep(0.5)

callables = [Stuff().instancestuff, lambda: Stuff().propertystuff]
threads = [Thread(target=callables[_ % len(callables)]) for _ in range(10)]
Expand All @@ -408,12 +406,10 @@ def test_debugger_multiple_threads():
for t in threads:
t.join()

sleep(1.5)

if any(len(snapshots) < len(callables) for snapshots in d.uploader.payloads):
sleep(0.5)
d.uploader.wait_for_payloads(
lambda q: q and all(len(snapshots) >= len(callables) for snapshots in d.uploader.payloads)
)

assert d.uploader.queue
for snapshots in d.uploader.payloads[1:]:
assert len(snapshots) >= len(callables)
assert {s["debugger.snapshot"]["probe"]["id"] for s in snapshots} == {
Expand Down Expand Up @@ -617,126 +613,79 @@ def test_debugger_line_probe_on_wrapped_function(stuff):
assert snapshot.probe.probe_id == "line-probe-wrapped-method"


@pytest.mark.skipif(sys.version_info < (3, 6, 0), reason="Python 3.6+ only")
def test_probe_status_logging(monkeypatch, remote_config_worker):
monkeypatch.setenv("DD_REMOTE_CONFIG_POLL_INTERVAL_SECONDS", "10")
remoteconfig_poller.interval = 10
from ddtrace.internal.remoteconfig.client import RemoteConfigClient

old_request = RemoteConfigClient.request

def request(self, *args, **kwargs):
import uuid

for cb in self.get_pubsubs():
cb._subscriber._status_timestamp = time()
cb._subscriber.interval = 0.1
cb.append_and_publish({"test": str(uuid.uuid4())}, "", None)
return True

RemoteConfigClient.request = request
def test_probe_status_logging(remote_config_worker):
assert remoteconfig_poller.status == ServiceStatus.STOPPED
try:
with rcm_endpoint(), debugger(diagnostics_interval=0.5, enabled=True) as d:
d.add_probes(
create_snapshot_line_probe(
probe_id="line-probe-ok",
source_file="tests/submod/stuff.py",
line=36,
condition=None,
),
create_snapshot_function_probe(
probe_id="line-probe-error",
module="tests.submod.stuff",
func_qname="foo",
condition=None,
),
)

queue = d.probe_status_logger.queue

def count_status(queue):
return Counter(_["debugger"]["diagnostics"]["status"] for _ in queue)

sleep(0.1)
assert count_status(queue) == {"INSTALLED": 1, "RECEIVED": 2, "ERROR": 1}

remoteconfig_poller._client.request()
sleep(0.1)
assert count_status(queue) == {"INSTALLED": 2, "RECEIVED": 2, "ERROR": 2}
with rcm_endpoint(), debugger(diagnostics_interval=0.2, enabled=True) as d:
d.add_probes(
create_snapshot_line_probe(
probe_id="line-probe-ok",
source_file="tests/submod/stuff.py",
line=36,
condition=None,
),
create_snapshot_function_probe(
probe_id="line-probe-error",
module="tests.submod.stuff",
func_qname="foo",
condition=None,
),
)

remoteconfig_poller._client.request()
sleep(0.1)
assert count_status(queue) == {"INSTALLED": 3, "RECEIVED": 2, "ERROR": 3}
finally:
RemoteConfigClient.request = old_request
logger = d.probe_status_logger

def count_status(queue):
return Counter(_["debugger"]["diagnostics"]["status"] for _ in queue)

@pytest.mark.skipif(sys.version_info < (3, 6, 0), reason="Python 3.6+ only")
def test_probe_status_logging_reemit_on_modify(monkeypatch, remote_config_worker):
monkeypatch.setenv("DD_REMOTE_CONFIG_POLL_INTERVAL_SECONDS", "10")
remoteconfig_poller.interval = 10
from ddtrace.internal.remoteconfig.client import RemoteConfigClient
logger.wait(lambda q: count_status(q) == {"INSTALLED": 1, "RECEIVED": 2, "ERROR": 1})

old_request = RemoteConfigClient.request
logger.wait(lambda q: count_status(q) == {"INSTALLED": 2, "RECEIVED": 2, "ERROR": 2})

def request(self, *args, **kwargs):
import uuid
logger.wait(lambda q: count_status(q) == {"INSTALLED": 3, "RECEIVED": 2, "ERROR": 3})

for cb in self.get_pubsubs():
cb._subscriber._status_timestamp = time()
cb._subscriber.interval = 0.1
cb.append_and_publish({"test": str(uuid.uuid4())}, "", None)
return True

RemoteConfigClient.request = request
def test_probe_status_logging_reemit_on_modify(remote_config_worker):
assert remoteconfig_poller.status == ServiceStatus.STOPPED
try:
with rcm_endpoint(), debugger(diagnostics_interval=0.0, enabled=True) as d:
d.add_probes(
create_snapshot_line_probe(
version=1,
probe_id="line-probe-ok",
source_file="tests/submod/stuff.py",
line=36,
condition=None,
),
)
d.modify_probes(
create_snapshot_line_probe(
version=2,
probe_id="line-probe-ok",
source_file="tests/submod/stuff.py",
line=36,
condition=None,
),
)

logger = d.probe_status_logger
queue = logger.queue

def count_status(queue):
return Counter(_["debugger"]["diagnostics"]["status"] for _ in queue)
with rcm_endpoint(), debugger(diagnostics_interval=0.2, enabled=True) as d:
d.add_probes(
create_snapshot_line_probe(
version=1,
probe_id="line-probe-ok",
source_file="tests/submod/stuff.py",
line=36,
condition=None,
),
)
d.modify_probes(
create_snapshot_line_probe(
version=2,
probe_id="line-probe-ok",
source_file="tests/submod/stuff.py",
line=36,
condition=None,
),
)

def versions(queue, status):
return [
_["debugger"]["diagnostics"]["probeVersion"]
for _ in queue
if _["debugger"]["diagnostics"]["status"] == status
]
logger = d.probe_status_logger
queue = logger.queue

logger.wait(lambda q: count_status(q) == {"INSTALLED": 2, "RECEIVED": 1})
assert versions(queue, "INSTALLED") == [1, 2]
assert versions(queue, "RECEIVED") == [1]
def count_status(queue):
return Counter(_["debugger"]["diagnostics"]["status"] for _ in queue)

logger.clear()
remoteconfig_poller._client.request()
def versions(queue, status):
return [
_["debugger"]["diagnostics"]["probeVersion"]
for _ in queue
if _["debugger"]["diagnostics"]["status"] == status
]

logger.wait(lambda q: count_status(q) == {"INSTALLED": 1})
assert versions(queue, "INSTALLED") == [2]
logger.wait(lambda q: count_status(q) == {"INSTALLED": 2, "RECEIVED": 1})
assert versions(queue, "INSTALLED") == [1, 2]
assert versions(queue, "RECEIVED") == [1]

finally:
RemoteConfigClient.request = old_request
logger.wait(lambda q: count_status(q) == {"INSTALLED": 3, "RECEIVED": 1})
assert versions(queue, "INSTALLED") == [1, 2, 2]


@pytest.mark.parametrize("duration", [1e5, 1e6, 1e7])
Expand Down

0 comments on commit 1726105

Please sign in to comment.