Skip to content

Commit

Permalink
Support passing Unix timestamps to dogstatsd (#831)
Browse files Browse the repository at this point in the history
* Support passing Unix timestamps to dogstatsd

Following the [v1.3 protocol](https://docs.datadoghq.com/developers/dogstatsd/datagram_shell/?tab=metrics#dogstatsd-protocol-v13), allow passing timestamps along with the metric, which can mitigate the load on the agent when emitting a lot of metrics in a short time span.

Only gauges and counts are supported.
---------

Co-authored-by: Yann Schwartz <[email protected]>
  • Loading branch information
carlosroman and abolibibelot authored May 23, 2024
1 parent b05583e commit cca8ac7
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 11 deletions.
79 changes: 72 additions & 7 deletions datadog/dogstatsd/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -774,13 +774,67 @@ def gauge(
"""
return self._report(metric, "g", value, tags, sample_rate)

# Minimum Datadog Agent version: 7.40.0
def gauge_with_timestamp(
    self,
    metric,  # type: Text
    value,  # type: float
    timestamp,  # type: int
    tags=None,  # type: Optional[List[str]]
    sample_rate=None,  # type: Optional[float]
):  # type: (...) -> None
    """
    Record the value of a gauge with a Unix timestamp (in seconds),
    optionally setting a list of tags and a sample rate.

    Minimum Datadog Agent version: 7.40.0

    >>> statsd.gauge_with_timestamp("users.online", 123, 1713804588)
    >>> statsd.gauge_with_timestamp("active.connections", 1001, 1713804588, tags=["protocol:http"])
    """
    # "g" is the gauge metric type; the timestamp is forwarded as the last
    # positional argument of _report.
    return self._report(metric, "g", value, tags, sample_rate, timestamp)

def count(
    self,
    metric,  # type: Text
    value,  # type: float
    tags=None,  # type: Optional[List[str]]
    sample_rate=None,  # type: Optional[float]
):  # type: (...) -> None
    """
    Count tracks how many times something happened per second,
    optionally setting a list of tags and a sample rate.

    >>> statsd.count("page.views", 123)
    """
    # "c" is the count metric type.
    self._report(metric, "c", value, tags, sample_rate)

# Minimum Datadog Agent version: 7.40.0
def count_with_timestamp(
    self,
    metric,  # type: Text
    value,  # type: float
    timestamp=0,  # type: int
    tags=None,  # type: Optional[List[str]]
    sample_rate=None,  # type: Optional[float]
):  # type: (...) -> None
    """
    Count how many times something happened at a given Unix timestamp
    (in seconds), optionally setting a list of tags and a sample rate.

    A timestamp of 0 (the default) or a negative timestamp results in the
    metric being sent without a timestamp.

    Minimum Datadog Agent version: 7.40.0

    >>> statsd.count_with_timestamp("files.transferred", 124, timestamp=1713804588)
    """
    # "c" is the count metric type; the timestamp is forwarded as the last
    # positional argument of _report.
    self._report(metric, "c", value, tags, sample_rate, timestamp)

def increment(
self,
metric, # type: Text
value=1, # type: float
tags=None, # type: Optional[List[str]]
sample_rate=None, # type: Optional[float]
): # type: (...) -> None
): # type(...) -> None
"""
Increment a counter, optionally setting a value, tags and a sample
rate.
Expand Down Expand Up @@ -934,23 +988,27 @@ def close_socket(self):
log.error("Unexpected error: %s", str(e))
self.telemetry_socket = None

def _serialize_metric(
    self, metric, metric_type, value, tags, sample_rate=1, timestamp=0
):
    """
    Create/format the metric packet.

    The packet is built from the following fields, in order:
    ``<namespace.>metric:value|type`` followed by the optional sections
    ``|@sample_rate`` (omitted when the rate is 1), ``|#tag1,tag2``
    (omitted when there are no tags), ``|c:container_id`` (omitted when no
    container id is set on the client) and ``|T<timestamp>`` (omitted
    unless the timestamp is strictly positive).

    :param metric: metric name; prefixed with ``self.namespace`` when set
    :param metric_type: metric type marker placed after the value (e.g. "g", "c")
    :param value: metric value, formatted with ``%s``
    :param tags: tags to attach, or None/empty for no tag section
    :param sample_rate: sample rate; only serialized when different from 1
    :param timestamp: Unix timestamp in seconds; None, 0 and negative
        values all mean "no timestamp"
    :return: the formatted packet as a string
    """
    # `timestamp and ...` keeps a None timestamp from reaching the `>`
    # comparison, which would raise TypeError on Python 3.
    return "%s%s:%s|%s%s%s%s%s" % (
        (self.namespace + ".") if self.namespace else "",
        metric,
        value,
        metric_type,
        ("|@" + text(sample_rate)) if sample_rate != 1 else "",
        ("|#" + ",".join(normalize_tags(tags))) if tags else "",
        ("|c:" + self._container_id if self._container_id else ""),
        ("|T" + text(timestamp)) if timestamp and timestamp > 0 else "",
    )

def _report(self, metric, metric_type, value, tags, sample_rate):
def _report(self, metric, metric_type, value, tags, sample_rate, timestamp=0):
"""
Create a metric packet and send it.
More information about the packets' format: http://docs.datadoghq.com/guides/dogstatsd/
More information about the packets' format:
https://docs.datadoghq.com/developers/dogstatsd/datagram_shell/?tab=metrics#the-dogstatsd-protocol
"""
if value is None:
return
Expand All @@ -967,9 +1025,16 @@ def _report(self, metric, metric_type, value, tags, sample_rate):
if sample_rate != 1 and random() > sample_rate:
return

# timestamps (protocol v1.3) only allowed on gauges and counts
allows_timestamp = metric_type == "g" or metric_type == "c"
if not allows_timestamp or timestamp < 0:
timestamp = 0

# Resolve the full tag list
tags = self._add_constant_tags(tags)
payload = self._serialize_metric(metric, metric_type, value, tags, sample_rate)
payload = self._serialize_metric(
metric, metric_type, value, tags, sample_rate, timestamp
)

# Send it
self._send(payload)
Expand Down
45 changes: 41 additions & 4 deletions tests/unit/dogstatsd/test_statsd.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,10 +304,31 @@ def test_set(self):
self.statsd.set('set', 123)
self.assert_equal_telemetry('set:123|s\n', self.recv(2))

def test_report(self):
    # Calling _report directly with no tags or sample rate yields the bare
    # "name:value|type" packet.
    expected_packet = 'report:123.4|g\n'
    self.statsd._report('report', 'g', 123.4, tags=None, sample_rate=None)
    received = self.recv(2)
    self.assert_equal_telemetry(expected_packet, received)

def test_report_metric_with_unsupported_ts(self):
    # Metric types other than gauge/count must be emitted without the
    # timestamp field even when a timestamp is supplied.
    scenarios = (
        ('report', 'h', 123.5, 'report:123.5|h\n'),
        ('set', 's', 123, 'set:123|s\n'),
    )
    for name, metric_type, value, expected_packet in scenarios:
        self.statsd._reset_telemetry()
        self.statsd._report(
            name, metric_type, value, tags=None, sample_rate=None, timestamp=100
        )
        self.assert_equal_telemetry(expected_packet, self.recv(2))

def test_gauge(self):
    # A plain gauge is serialized as "name:value|g".
    expected_packet = 'gauge:123.4|g\n'
    self.statsd.gauge('gauge', 123.4)
    received = self.recv(2)
    self.assert_equal_telemetry(expected_packet, received)

def test_gauge_with_ts(self):
    # A positive timestamp is appended to the gauge packet as "|T<seconds>".
    unix_ts = 1066
    self.statsd.gauge_with_timestamp("gauge", 123.4, timestamp=unix_ts)
    received = self.recv(2)
    self.assert_equal_telemetry("gauge:123.4|g|T1066\n", received)

def test_gauge_with_invalid_ts_should_be_ignored(self):
    # A negative timestamp is dropped: the packet carries no "|T" field.
    negative_ts = -500
    self.statsd.gauge_with_timestamp("gauge", 123.4, timestamp=negative_ts)
    received = self.recv(2)
    self.assert_equal_telemetry("gauge:123.4|g\n", received)

def test_counter(self):
self.statsd.increment('page.views')
self.statsd.flush()
Expand All @@ -328,6 +349,26 @@ def test_counter(self):
self.statsd.flush()
self.assert_equal_telemetry('page.views:-12|c\n', self.recv(2))

def test_count(self):
    # A count without timestamp is serialized as "name:value|c".
    client = self.statsd
    client.count('page.views', 11)
    client.flush()
    received = self.recv(2)
    self.assert_equal_telemetry('page.views:11|c\n', received)

def test_count_with_ts(self):
    # Positive timestamps are appended to count packets as "|T<seconds>".
    client = self.statsd

    # First count: sent and checked without resetting telemetry first.
    client.count_with_timestamp("page.views", 1, timestamp=1066)
    client.flush()
    first_received = self.recv(2)
    self.assert_equal_telemetry("page.views:1|c|T1066\n", first_received)

    # Second count: telemetry counters are reset before sending.
    client._reset_telemetry()
    client.count_with_timestamp("page.views", 11, timestamp=2121)
    client.flush()
    second_received = self.recv(2)
    self.assert_equal_telemetry("page.views:11|c|T2121\n", second_received)

def test_count_with_invalid_ts_should_be_ignored(self):
    # A negative timestamp is dropped: the packet carries no "|T" field.
    negative_ts = -1066
    client = self.statsd
    client.count_with_timestamp("page.views", 1, timestamp=negative_ts)
    client.flush()
    received = self.recv(2)
    self.assert_equal_telemetry("page.views:1|c\n", received)

def test_histogram(self):
self.statsd.histogram('histo', 123.4)
self.assert_equal_telemetry('histo:123.4|h\n', self.recv(2))
Expand Down Expand Up @@ -518,7 +559,6 @@ def func():
# check that the method does not fail with a small payload
self.statsd.event("title", "message")


def test_service_check(self):
now = int(time.time())
self.statsd.service_check(
Expand Down Expand Up @@ -1106,7 +1146,6 @@ def test_batching_sequential(self):
self.recv(2),
telemetry=expected_metrics1)


expected2 = 'page.views:123|g\ntimer:123|ms\n'
self.assert_equal_telemetry(
expected2,
Expand Down Expand Up @@ -1276,7 +1315,6 @@ def test_telemetry(self):
self.statsd.bytes_dropped_queue = 8
self.statsd.packets_dropped_queue = 9


self.statsd.open_buffer()
self.statsd.gauge('page.views', 123)
self.statsd.close_buffer()
Expand Down Expand Up @@ -1383,7 +1421,6 @@ def test_telemetry_flush_interval_batch(self):
# assert that _last_flush_time has been updated
self.assertTrue(time1 < dogstatsd._last_flush_time)


def test_dedicated_udp_telemetry_dest(self):
listener_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
listener_sock.bind(('localhost', 0))
Expand Down

0 comments on commit cca8ac7

Please sign in to comment.