Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(debugging): emit probe status updates unconditionally #6714

Merged
22 changes: 13 additions & 9 deletions ddtrace/debugging/_probe/remoteconfig.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,22 +318,26 @@ def __init__(self, data_connector, callback, name, status_logger):
self._next_status_update_timestamp()

def _exec_callback(self, data, test_tracer=None):
# Check if it is time to re-emit probe status messages.
# DEV: We use the periodic signal from the remote config client worker
# thread to avoid having to spawn a separate thread for this.
if time.time() > self._status_timestamp:
log.debug(
"[%s][P: %s] Dynamic Instrumentation,Emitting probe status log messages",
os.getpid(),
os.getppid(),
)
probes = [probe for config in self._configs.values() for probe in config.values()]
self._callback(ProbePollerEvent.STATUS_UPDATE, probes)
self._next_status_update_timestamp()

if data:
metadatas = data["metadata"]
rc_configs = data["config"]
# DEV: We emit a status update event here to avoid having to spawn a
# separate thread for this.
log.debug("[%s][P: %s] Dynamic Instrumentation Updated", os.getpid(), os.getppid())
for idx in range(len(rc_configs)):
if time.time() > self._status_timestamp:
log.debug(
"[%s][P: %s] Dynamic Instrumentation,Emitting probe status log messages",
os.getpid(),
os.getppid(),
)
probes = [probe for config in self._configs.values() for probe in config.values()]
self._callback(ProbePollerEvent.STATUS_UPDATE, probes)
self._next_status_update_timestamp()
if metadatas[idx] is None:
log.debug("[%s][P: %s] Dynamic Instrumentation, no RCM metadata", os.getpid(), os.getppid())
return
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
fixes:
- |
dynamic instrumentation: fixed a bug that could prevent probe status
updates from being emitted when no new remote configuration data was received.
171 changes: 60 additions & 111 deletions tests/debugging/test_debugger.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import sys
from threading import Thread
from time import sleep
from time import time

import mock
from mock.mock import call
Expand Down Expand Up @@ -397,7 +396,6 @@ def test_debugger_multiple_threads():
good_probe(),
create_snapshot_line_probe(probe_id="thread-test", source_file="tests/submod/stuff.py", line=40),
)
sleep(0.5)

callables = [Stuff().instancestuff, lambda: Stuff().propertystuff]
threads = [Thread(target=callables[_ % len(callables)]) for _ in range(10)]
Expand All @@ -408,12 +406,10 @@ def test_debugger_multiple_threads():
for t in threads:
t.join()

sleep(1.5)

if any(len(snapshots) < len(callables) for snapshots in d.uploader.payloads):
sleep(0.5)
d.uploader.wait_for_payloads(
lambda q: q and all(len(snapshots) >= len(callables) for snapshots in d.uploader.payloads)
)

assert d.uploader.queue
for snapshots in d.uploader.payloads[1:]:
assert len(snapshots) >= len(callables)
assert {s["debugger.snapshot"]["probe"]["id"] for s in snapshots} == {
Expand Down Expand Up @@ -617,126 +613,79 @@ def test_debugger_line_probe_on_wrapped_function(stuff):
assert snapshot.probe.probe_id == "line-probe-wrapped-method"


@pytest.mark.skipif(sys.version_info < (3, 6, 0), reason="Python 3.6+ only")
def test_probe_status_logging(monkeypatch, remote_config_worker):
monkeypatch.setenv("DD_REMOTE_CONFIG_POLL_INTERVAL_SECONDS", "10")
remoteconfig_poller.interval = 10
from ddtrace.internal.remoteconfig.client import RemoteConfigClient

old_request = RemoteConfigClient.request

def request(self, *args, **kwargs):
import uuid

for cb in self.get_pubsubs():
cb._subscriber._status_timestamp = time()
cb._subscriber.interval = 0.1
cb.append_and_publish({"test": str(uuid.uuid4())}, "", None)
return True

RemoteConfigClient.request = request
def test_probe_status_logging(remote_config_worker):
assert remoteconfig_poller.status == ServiceStatus.STOPPED
try:
with rcm_endpoint(), debugger(diagnostics_interval=0.5, enabled=True) as d:
d.add_probes(
create_snapshot_line_probe(
probe_id="line-probe-ok",
source_file="tests/submod/stuff.py",
line=36,
condition=None,
),
create_snapshot_function_probe(
probe_id="line-probe-error",
module="tests.submod.stuff",
func_qname="foo",
condition=None,
),
)

queue = d.probe_status_logger.queue

def count_status(queue):
return Counter(_["debugger"]["diagnostics"]["status"] for _ in queue)

sleep(0.1)
assert count_status(queue) == {"INSTALLED": 1, "RECEIVED": 2, "ERROR": 1}

remoteconfig_poller._client.request()
sleep(0.1)
assert count_status(queue) == {"INSTALLED": 2, "RECEIVED": 2, "ERROR": 2}
with rcm_endpoint(), debugger(diagnostics_interval=0.2, enabled=True) as d:
d.add_probes(
create_snapshot_line_probe(
probe_id="line-probe-ok",
source_file="tests/submod/stuff.py",
line=36,
condition=None,
),
create_snapshot_function_probe(
probe_id="line-probe-error",
module="tests.submod.stuff",
func_qname="foo",
condition=None,
),
)

remoteconfig_poller._client.request()
sleep(0.1)
assert count_status(queue) == {"INSTALLED": 3, "RECEIVED": 2, "ERROR": 3}
finally:
RemoteConfigClient.request = old_request
logger = d.probe_status_logger

def count_status(queue):
return Counter(_["debugger"]["diagnostics"]["status"] for _ in queue)

@pytest.mark.skipif(sys.version_info < (3, 6, 0), reason="Python 3.6+ only")
def test_probe_status_logging_reemit_on_modify(monkeypatch, remote_config_worker):
monkeypatch.setenv("DD_REMOTE_CONFIG_POLL_INTERVAL_SECONDS", "10")
remoteconfig_poller.interval = 10
from ddtrace.internal.remoteconfig.client import RemoteConfigClient
logger.wait(lambda q: count_status(q) == {"INSTALLED": 1, "RECEIVED": 2, "ERROR": 1})

old_request = RemoteConfigClient.request
logger.wait(lambda q: count_status(q) == {"INSTALLED": 2, "RECEIVED": 2, "ERROR": 2})

def request(self, *args, **kwargs):
import uuid
logger.wait(lambda q: count_status(q) == {"INSTALLED": 3, "RECEIVED": 2, "ERROR": 3})

for cb in self.get_pubsubs():
cb._subscriber._status_timestamp = time()
cb._subscriber.interval = 0.1
cb.append_and_publish({"test": str(uuid.uuid4())}, "", None)
return True

RemoteConfigClient.request = request
def test_probe_status_logging_reemit_on_modify(remote_config_worker):
assert remoteconfig_poller.status == ServiceStatus.STOPPED
try:
with rcm_endpoint(), debugger(diagnostics_interval=0.0, enabled=True) as d:
d.add_probes(
create_snapshot_line_probe(
version=1,
probe_id="line-probe-ok",
source_file="tests/submod/stuff.py",
line=36,
condition=None,
),
)
d.modify_probes(
create_snapshot_line_probe(
version=2,
probe_id="line-probe-ok",
source_file="tests/submod/stuff.py",
line=36,
condition=None,
),
)

logger = d.probe_status_logger
queue = logger.queue

def count_status(queue):
return Counter(_["debugger"]["diagnostics"]["status"] for _ in queue)
with rcm_endpoint(), debugger(diagnostics_interval=0.2, enabled=True) as d:
d.add_probes(
create_snapshot_line_probe(
version=1,
probe_id="line-probe-ok",
source_file="tests/submod/stuff.py",
line=36,
condition=None,
),
)
d.modify_probes(
create_snapshot_line_probe(
version=2,
probe_id="line-probe-ok",
source_file="tests/submod/stuff.py",
line=36,
condition=None,
),
)

def versions(queue, status):
return [
_["debugger"]["diagnostics"]["probeVersion"]
for _ in queue
if _["debugger"]["diagnostics"]["status"] == status
]
logger = d.probe_status_logger
queue = logger.queue

logger.wait(lambda q: count_status(q) == {"INSTALLED": 2, "RECEIVED": 1})
assert versions(queue, "INSTALLED") == [1, 2]
assert versions(queue, "RECEIVED") == [1]
def count_status(queue):
return Counter(_["debugger"]["diagnostics"]["status"] for _ in queue)

logger.clear()
remoteconfig_poller._client.request()
def versions(queue, status):
return [
_["debugger"]["diagnostics"]["probeVersion"]
for _ in queue
if _["debugger"]["diagnostics"]["status"] == status
]

logger.wait(lambda q: count_status(q) == {"INSTALLED": 1})
assert versions(queue, "INSTALLED") == [2]
logger.wait(lambda q: count_status(q) == {"INSTALLED": 2, "RECEIVED": 1})
assert versions(queue, "INSTALLED") == [1, 2]
assert versions(queue, "RECEIVED") == [1]

finally:
RemoteConfigClient.request = old_request
logger.wait(lambda q: count_status(q) == {"INSTALLED": 3, "RECEIVED": 1})
assert versions(queue, "INSTALLED") == [1, 2, 2]


@pytest.mark.parametrize("duration", [1e5, 1e6, 1e7])
Expand Down