Skip to content

Commit

Permalink
feat(origin detection): implement new spec (#839)
Browse files Browse the repository at this point in the history
Signed-off-by: Wassim DHIF <[email protected]>
  • Loading branch information
wdhif authored Jul 25, 2024
1 parent 8ba18e8 commit 3f674b2
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 27 deletions.
7 changes: 6 additions & 1 deletion datadog/dogstatsd/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@
# Env var to enable/disable sending the container ID field
ORIGIN_DETECTION_ENABLED = "DD_ORIGIN_DETECTION_ENABLED"

# Environment variable containing external data used for Origin Detection.
EXTERNAL_DATA_ENV_VAR = "DD_EXTERNAL_ENV"

# Default buffer settings based on socket type
UDP_OPTIMAL_PAYLOAD_LENGTH = 1432
UDS_OPTIMAL_PAYLOAD_LENGTH = 8192
Expand Down Expand Up @@ -402,6 +405,7 @@ def __init__(
container_id, origin_detection_enabled
)
self._set_container_id(container_id, origin_detection_enabled)
self._external_data = os.environ.get(EXTERNAL_DATA_ENV_VAR, None)

# init telemetry version
self._client_tags = [
Expand Down Expand Up @@ -992,14 +996,15 @@ def _serialize_metric(
self, metric, metric_type, value, tags, sample_rate=1, timestamp=0
):
# Create/format the metric packet
return "%s%s:%s|%s%s%s%s%s" % (
return "%s%s:%s|%s%s%s%s%s%s" % (
(self.namespace + ".") if self.namespace else "",
metric,
value,
metric_type,
("|@" + text(sample_rate)) if sample_rate != 1 else "",
("|#" + ",".join(normalize_tags(tags))) if tags else "",
("|c:" + self._container_id if self._container_id else ""),
("|e:" + self._external_data if self._external_data else ""),
("|T" + text(timestamp)) if timestamp > 0 else "",
)

Expand Down
2 changes: 1 addition & 1 deletion datadog/dogstatsd/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def _read_cgroup_path(self):
if len(parts):
match = self.CONTAINER_RE.match(parts.pop())
if match:
return match.group(1)
return "ci-{0}".format(match.group(1))
except IOError as e:
if e.errno != errno.ENOENT:
raise NotImplementedError("Unable to open {}.".format(self.CGROUP_PATH))
Expand Down
10 changes: 5 additions & 5 deletions tests/unit/dogstatsd/test_container.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def get_mock_open(read_data=None):
2:cpu:/docker/3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860
1:cpuset:/docker/3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860
""",
"3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860",
"ci-3726184226f5d3147c25fdeab5b60097e378e8a720503a5e19ecfdf29f869860",
),
# k8s file
(
Expand All @@ -55,7 +55,7 @@ def get_mock_open(read_data=None):
2:hugetlb:/kubepods/test/pod3d274242-8ee0-11e9-a8a6-1e68d864ef1a/3e74d3fd9db4c9dd921ae05c2502fb984d0cde1b36e581b13f79c639da4518a1
1:name=systemd:/kubepods/test/pod3d274242-8ee0-11e9-a8a6-1e68d864ef1a/3e74d3fd9db4c9dd921ae05c2502fb984d0cde1b36e581b13f79c639da4518a1
""",
"3e74d3fd9db4c9dd921ae05c2502fb984d0cde1b36e581b13f79c639da4518a1",
"ci-3e74d3fd9db4c9dd921ae05c2502fb984d0cde1b36e581b13f79c639da4518a1",
),
# ECS file
(
Expand All @@ -70,7 +70,7 @@ def get_mock_open(read_data=None):
2:cpu:/ecs/test-ecs-classic/5a0d5ceddf6c44c1928d367a815d890f/38fac3e99302b3622be089dd41e7ccf38aff368a86cc339972075136ee2710ce
1:blkio:/ecs/test-ecs-classic/5a0d5ceddf6c44c1928d367a815d890f/38fac3e99302b3622be089dd41e7ccf38aff368a86cc339972075136ee2710ce
""",
"38fac3e99302b3622be089dd41e7ccf38aff368a86cc339972075136ee2710ce",
"ci-38fac3e99302b3622be089dd41e7ccf38aff368a86cc339972075136ee2710ce",
),
# Fargate file
(
Expand All @@ -87,7 +87,7 @@ def get_mock_open(read_data=None):
2:memory:/ecs/55091c13-b8cf-4801-b527-f4601742204d/432624d2150b349fe35ba397284dea788c2bf66b885d14dfc1569b01890ca7da
1:name=systemd:/ecs/55091c13-b8cf-4801-b527-f4601742204d/432624d2150b349fe35ba397284dea788c2bf66b885d14dfc1569b01890ca7da
""",
"432624d2150b349fe35ba397284dea788c2bf66b885d14dfc1569b01890ca7da",
"ci-432624d2150b349fe35ba397284dea788c2bf66b885d14dfc1569b01890ca7da",
),
# Fargate file >= 1.4.0
(
Expand All @@ -104,7 +104,7 @@ def get_mock_open(read_data=None):
2:memory:/ecs/55091c13-b8cf-4801-b527-f4601742204d/34dc0b5e626f2c5c4c5170e34b10e765-1234567890
1:name=systemd:/ecs/34dc0b5e626f2c5c4c5170e34b10e765-1234567890
""",
"34dc0b5e626f2c5c4c5170e34b10e765-1234567890",
"ci-34dc0b5e626f2c5c4c5170e34b10e765-1234567890",
),
# Linux non-containerized file
(
Expand Down
55 changes: 35 additions & 20 deletions tests/unit/dogstatsd/test_statsd.py
Original file line number Diff line number Diff line change
Expand Up @@ -1667,12 +1667,27 @@ def test_entity_id_and_container_id(self):
os.environ['DD_ENTITY_ID'] = '04652bb7-19b7-11e9-9cc6-42010a9c016d'
dogstatsd = DogStatsd(telemetry_min_flush_interval=0)
dogstatsd.socket = FakeSocket()
dogstatsd._container_id = "fake-container-id"
dogstatsd._container_id = "ci-fake-container-id"

dogstatsd.increment("page.views")
dogstatsd.flush()
tags = "dd.internal.entity_id:04652bb7-19b7-11e9-9cc6-42010a9c016d"
metric = 'page.views:1|c|#' + tags + '|c:fake-container-id\n'
metric = 'page.views:1|c|#' + tags + '|c:ci-fake-container-id\n'
self.assertEqual(metric, dogstatsd.socket.recv())
self.assertEqual(telemetry_metrics(tags=tags, bytes_sent=len(metric)), dogstatsd.socket.recv())

def test_entity_id_and_container_id_and_external_env(self):
with preserve_environment_variable('DD_ENTITY_ID'), preserve_environment_variable('DD_EXTERNAL_ENV'):
os.environ['DD_ENTITY_ID'] = '04652bb7-19b7-11e9-9cc6-42010a9c016d'
os.environ['DD_EXTERNAL_ENV'] = 'it-false,cn-container-name,pu-04652bb7-19b7-11e9-9cc6-42010a9c016d'
dogstatsd = DogStatsd(telemetry_min_flush_interval=0)
dogstatsd.socket = FakeSocket()
dogstatsd._container_id = "ci-fake-container-id"

dogstatsd.increment("page.views")
dogstatsd.flush()
tags = "dd.internal.entity_id:04652bb7-19b7-11e9-9cc6-42010a9c016d"
metric = 'page.views:1|c|#' + tags + '|c:ci-fake-container-id' + '|e:it-false,cn-container-name,pu-04652bb7-19b7-11e9-9cc6-42010a9c016d' + '\n'
self.assertEqual(metric, dogstatsd.socket.recv())
self.assertEqual(telemetry_metrics(tags=tags, bytes_sent=len(metric)), dogstatsd.socket.recv())

Expand Down Expand Up @@ -1862,62 +1877,62 @@ def test_histogram_does_not_send_none(self):
self.assertIsNone(self.recv())

def test_set_with_container_field(self):
self.statsd._container_id = "fake-container-id"
self.statsd._container_id = "ci-fake-container-id"
self.statsd.set("set", 123)
self.assert_equal_telemetry("set:123|s|c:fake-container-id\n", self.recv(2))
self.assert_equal_telemetry("set:123|s|c:ci-fake-container-id\n", self.recv(2))
self.statsd._container_id = None

def test_gauge_with_container_field(self):
self.statsd._container_id = "fake-container-id"
self.statsd._container_id = "ci-fake-container-id"
self.statsd.gauge("gauge", 123.4)
self.assert_equal_telemetry("gauge:123.4|g|c:fake-container-id\n", self.recv(2))
self.assert_equal_telemetry("gauge:123.4|g|c:ci-fake-container-id\n", self.recv(2))
self.statsd._container_id = None

def test_counter_with_container_field(self):
self.statsd._container_id = "fake-container-id"
self.statsd._container_id = "ci-fake-container-id"

self.statsd.increment("page.views")
self.statsd.flush()
self.assert_equal_telemetry("page.views:1|c|c:fake-container-id\n", self.recv(2))
self.assert_equal_telemetry("page.views:1|c|c:ci-fake-container-id\n", self.recv(2))

self.statsd._reset_telemetry()
self.statsd.increment("page.views", 11)
self.statsd.flush()
self.assert_equal_telemetry("page.views:11|c|c:fake-container-id\n", self.recv(2))
self.assert_equal_telemetry("page.views:11|c|c:ci-fake-container-id\n", self.recv(2))

self.statsd._reset_telemetry()
self.statsd.decrement("page.views")
self.statsd.flush()
self.assert_equal_telemetry("page.views:-1|c|c:fake-container-id\n", self.recv(2))
self.assert_equal_telemetry("page.views:-1|c|c:ci-fake-container-id\n", self.recv(2))

self.statsd._reset_telemetry()
self.statsd.decrement("page.views", 12)
self.statsd.flush()
self.assert_equal_telemetry("page.views:-12|c|c:fake-container-id\n", self.recv(2))
self.assert_equal_telemetry("page.views:-12|c|c:ci-fake-container-id\n", self.recv(2))

self.statsd._container_id = None

def test_histogram_with_container_field(self):
self.statsd._container_id = "fake-container-id"
self.statsd._container_id = "ci-fake-container-id"
self.statsd.histogram("histo", 123.4)
self.assert_equal_telemetry("histo:123.4|h|c:fake-container-id\n", self.recv(2))
self.assert_equal_telemetry("histo:123.4|h|c:ci-fake-container-id\n", self.recv(2))
self.statsd._container_id = None

def test_timing_with_container_field(self):
self.statsd._container_id = "fake-container-id"
self.statsd._container_id = "ci-fake-container-id"
self.statsd.timing("t", 123)
self.assert_equal_telemetry("t:123|ms|c:fake-container-id\n", self.recv(2))
self.assert_equal_telemetry("t:123|ms|c:ci-fake-container-id\n", self.recv(2))
self.statsd._container_id = None

def test_event_with_container_field(self):
self.statsd._container_id = "fake-container-id"
self.statsd._container_id = "ci-fake-container-id"
self.statsd.event(
"Title",
"L1\nL2",
priority="low",
date_happened=1375296969,
)
event2 = u"_e{5,6}:Title|L1\\nL2|d:1375296969|p:low|c:fake-container-id\n"
event2 = u"_e{5,6}:Title|L1\\nL2|d:1375296969|p:low|c:ci-fake-container-id\n"
self.assert_equal_telemetry(
event2,
self.recv(2),
Expand All @@ -1931,7 +1946,7 @@ def test_event_with_container_field(self):
self.statsd._reset_telemetry()

self.statsd.event("Title", u"♬ †øU †øU ¥ºu T0µ ♪", aggregation_key="key", tags=["t1", "t2:v2"])
event3 = u"_e{5,32}:Title|♬ †øU †øU ¥ºu T0µ ♪|k:key|#t1,t2:v2|c:fake-container-id\n"
event3 = u"_e{5,32}:Title|♬ †øU †øU ¥ºu T0µ ♪|k:key|#t1,t2:v2|c:ci-fake-container-id\n"
self.assert_equal_telemetry(
event3,
self.recv(2, reset_wait=True),
Expand All @@ -1944,7 +1959,7 @@ def test_event_with_container_field(self):
self.statsd._container_id = None

def test_service_check_with_container_field(self):
self.statsd._container_id = "fake-container-id"
self.statsd._container_id = "ci-fake-container-id"
now = int(time.time())
self.statsd.service_check(
"my_check.name",
Expand All @@ -1954,7 +1969,7 @@ def test_service_check_with_container_field(self):
hostname=u"i-abcd1234",
message=u"♬ †øU \n†øU ¥ºu|m: T0µ ♪",
)
check = u'_sc|my_check.name|{0}|d:{1}|h:i-abcd1234|#key1:val1,key2:val2|m:{2}|c:fake-container-id\n'.format(
check = u'_sc|my_check.name|{0}|d:{1}|h:i-abcd1234|#key1:val1,key2:val2|m:{2}|c:ci-fake-container-id\n'.format(
self.statsd.WARNING, now, u'♬ †øU \\n†øU ¥ºu|m\\: T0µ ♪'
)
self.assert_equal_telemetry(
Expand Down

0 comments on commit 3f674b2

Please sign in to comment.