Skip to content
This repository has been archived by the owner on Jul 13, 2023. It is now read-only.

feat: Add tags back to metrics #1401

Closed
wants to merge 10 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 40 additions & 59 deletions autopush/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,8 @@
)

from twisted.internet import reactor
from txstatsd.client import StatsDClientProtocol, TwistedStatsDClient
from txstatsd.metrics.metrics import Metrics

import datadog
from datadog import ThreadStats
import markus

from autopush import logging

Expand All @@ -34,6 +31,9 @@ def __init__(self, *args, **kwargs):
def start(self):
"""Start any connection needed for metric transmission"""

def make_tags(self, base=None, **kwargs):
AzureMarker marked this conversation as resolved.
Show resolved Hide resolved
""" convert tags if needed """

def increment(self, name, count=1, **kwargs):
"""Increment a counter for a metric name"""
raise NotImplementedError("No increment implemented")
Expand All @@ -52,86 +52,67 @@ class SinkMetrics(IMetrics):
def increment(self, name, count=1, **kwargs):
pass

def gauge(self, name, count, **kwargs):
pass

def timing(self, name, duration, **kwargs):
def make_tags(self, base=None, **kwargs):
pass


class TwistedMetrics(object):
"""Twisted implementation of statsd output"""
def __init__(self, statsd_host="localhost", statsd_port=8125):
self.client = TwistedStatsDClient.create(statsd_host, statsd_port)
self._metric = Metrics(connection=self.client, namespace="autopush")

def start(self):
protocol = StatsDClientProtocol(self.client)
reactor.listenUDP(0, protocol)

def increment(self, name, count=1, **kwargs):
self._metric.increment(name, count)

def gauge(self, name, count, **kwargs):
self._metric.gauge(name, count)
pass

def timing(self, name, duration, **kwargs):
self._metric.timing(name, duration)


def make_tags(base=None, **kwargs):
# type: (Sequence[str], **Any) -> Sequence[str]
"""Generate a list of tag values"""
tags = list(base or [])
tags.extend('{}:{}'.format(key, val) for key, val in kwargs.iteritems())
return tags
pass


class DatadogMetrics(object):
"""DataDog Metric backend"""
def __init__(self, api_key, app_key, hostname, flush_interval=10,
namespace="autopush"):
class TaggedMetrics(IMetrics):
"""DataDog like tagged Metric backend"""
def __init__(self, hostname, namespace="autopush"):

datadog.initialize(api_key=api_key, app_key=app_key,
host_name=hostname)
self._client = ThreadStats()
self._flush_interval = flush_interval
markus.configure(
backends=[{
'class': 'markus.backends.datadog.DatadogMetrics',
'options': {
'statsd_host': hostname,
'statsd_namespace': namespace,
}}])
self._client = markus.get_metrics(namespace)
self._host = hostname
self._namespace = namespace

def _prefix_name(self, name):
return "%s.%s" % (self._namespace, name)
return name

def start(self):
self._client.start(flush_interval=self._flush_interval,
roll_up_interval=self._flush_interval)
pass

def increment(self, name, count=1, **kwargs):
self._client.increment(self._prefix_name(name), count, host=self._host,
**kwargs)
def make_tags(self, base=None, **kwargs):
    """Merge base tags and keyword tags into a new tag dict.

    :param base: optional dict of tag name to value. It is copied and
        never modified, so a long-lived dict (for example a router's
        ``_base_tags``) can safely be passed on every call.
    :param kwargs: extra tag name/value pairs; these override entries
        of the same name in ``base``.
    :returns: a new dict of tags. ``host`` is set to ``self._host``
        unless ``host`` was passed explicitly as a keyword.

    """
    # Build a fresh dict rather than calling base.update(): updating the
    # caller's dict in place would leave per-call tags (``reason``,
    # ``application``, ``host``...) attached to every later metric that
    # reuses the same base.
    tags = dict(base or {})
    tags.update(kwargs)
    if "host" not in kwargs:
        tags["host"] = self._host
    return tags

def gauge(self, name, count, **kwargs):
self._client.gauge(self._prefix_name(name), count, host=self._host,
**kwargs)
def increment(self, name, count=1, tags=None, **kwargs):
    """Increment the counter ``name`` by ``count``.

    :param name: metric name, passed through ``self._prefix_name``.
    :param count: amount to add to the counter.
    :param tags: optional dict of tag name to value.
    :param kwargs: additional tag name/value pairs.

    """
    tags = self.make_tags(tags, **kwargs)
    # markus takes tags as one ``tags`` list of "key:value" strings, not
    # as keyword arguments; splatting the dict would raise TypeError.
    # Sorted so the emitted tag order is stable.
    self._client.incr(
        self._prefix_name(name), count,
        tags=["{}:{}".format(key, val)
              for key, val in sorted(tags.items())])

def timing(self, name, duration, **kwargs):
def gauge(self, name, count, tags=None, **kwargs):
    """Set the gauge ``name`` to ``count``.

    :param name: metric name, passed through ``self._prefix_name``.
    :param count: value to report for the gauge.
    :param tags: optional dict of tag name to value.
    :param kwargs: additional tag name/value pairs.

    """
    tags = self.make_tags(tags, **kwargs)
    # markus takes tags as one ``tags`` list of "key:value" strings, not
    # as keyword arguments; splatting the dict would raise TypeError.
    # Sorted so the emitted tag order is stable.
    self._client.gauge(
        self._prefix_name(name), count,
        tags=["{}:{}".format(key, val)
              for key, val in sorted(tags.items())])

def timing(self, name, duration, tags=None, **kwargs):
    """Record ``duration`` as a timing value for ``name``.

    :param name: metric name, passed through ``self._prefix_name``.
    :param duration: the timing value to report.
    :param tags: optional dict of tag name to value.
    :param kwargs: additional tag name/value pairs.

    """
    tags = self.make_tags(tags, **kwargs)
    # markus takes tags as one ``tags`` list of "key:value" strings, not
    # as keyword arguments; splatting the dict would raise TypeError.
    # Sorted so the emitted tag order is stable.
    self._client.timing(
        self._prefix_name(name), value=duration,
        tags=["{}:{}".format(key, val)
              for key, val in sorted(tags.items())])


def from_config(conf):
# type: (AutopushConfig) -> IMetrics
"""Create an IMetrics from the given config"""
if conf.datadog_api_key:
return DatadogMetrics(
if conf.statsd_host:
return TaggedMetrics(
hostname=logging.instance_id_or_hostname if conf.ami_id else
conf.hostname,
api_key=conf.datadog_api_key,
app_key=conf.datadog_app_key,
flush_interval=conf.datadog_flush_interval,
conf.hostname
)
elif conf.statsd_host:
return TwistedMetrics(conf.statsd_host, conf.statsd_port)
else:
return SinkMetrics()

Expand Down
9 changes: 4 additions & 5 deletions autopush/router/adm.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

from autopush.constants import DEFAULT_ROUTER_TIMEOUT
from autopush.exceptions import RouterException
from autopush.metrics import make_tags
from autopush.router.interface import RouterResponse
from autopush.types import JSONDict # noqa

Expand Down Expand Up @@ -118,7 +117,7 @@ def __init__(self, conf, router_conf, metrics):
logger=self.log,
metrics=self.metrics,
timeout=timeout)
self._base_tags = ["platform:adm"]
self._base_tags = {"platform": "adm"}
self.log.debug("Starting ADM router...")

def amend_endpoint_response(self, response, router_data):
Expand Down Expand Up @@ -180,7 +179,7 @@ def _route(self, notification, uaid_data):
except Timeout as e:
self.log.warn("ADM Timeout: %s" % e)
self.metrics.increment("notification.bridge.error",
tags=make_tags(
tags=self.metrics.make_tags(
self._base_tags,
reason="timeout"))
raise RouterException("Server error", status_code=502,
Expand All @@ -189,7 +188,7 @@ def _route(self, notification, uaid_data):
except ConnectionError as e:
self.log.warn("ADM Unavailable: %s" % e)
self.metrics.increment("notification.bridge.error",
tags=make_tags(
tags=self.metrics.make_tags(
self._base_tags,
reason="connection_unavailable"))
raise RouterException("Server error", status_code=502,
Expand All @@ -198,7 +197,7 @@ def _route(self, notification, uaid_data):
except ADMAuthError as e:
self.log.error("ADM unable to authorize: %s" % e)
self.metrics.increment("notification.bridge.error",
tags=make_tags(
tags=self.metrics.make_tags(
self._base_tags,
reason="auth failure"
))
Expand Down
20 changes: 11 additions & 9 deletions autopush/router/apnsrouter.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from twisted.logger import Logger

from autopush.exceptions import RouterException
from autopush.metrics import make_tags
from autopush.router.apns2 import (
APNSClient,
APNS_MAX_CONNECTIONS,
Expand Down Expand Up @@ -64,7 +63,7 @@ def __init__(self, conf, router_conf, metrics, load_connections=True):
self.conf = conf
self.router_conf = router_conf
self.metrics = metrics
self._base_tags = ["platform:apns"]
self._base_tags = {"platform": "apns"}
self.apns = dict()
for rel_channel in router_conf:
self.apns[rel_channel] = self._connect(rel_channel,
Expand Down Expand Up @@ -164,9 +163,10 @@ def _route(self, notification, router_data):
else:
reason = "unknown"
self.metrics.increment("notification.bridge.connection.error",
tags=make_tags(self._base_tags,
application=rel_channel,
reason=reason))
tags=self.metrics.make_tags(
self._base_tags,
application=rel_channel,
reason=reason))
raise RouterException(
str(e),
status_code=502,
Expand All @@ -175,8 +175,9 @@ def _route(self, notification, router_data):

location = "%s/m/%s" % (self.conf.endpoint_url, notification.version)
self.metrics.increment("notification.bridge.sent",
tags=make_tags(self._base_tags,
application=rel_channel))
tags=self.metrics.make_tags(
self._base_tags,
application=rel_channel))

self.metrics.increment(
"updates.client.bridge.apns.{}.sent".format(
Expand All @@ -186,8 +187,9 @@ def _route(self, notification, router_data):
)
self.metrics.increment("notification.message_data",
notification.data_length,
tags=make_tags(self._base_tags,
destination='Direct'))
tags=self.metrics.make_tags(
self._base_tags,
destination='Direct'))
return RouterResponse(status_code=201, response_body="",
headers={"TTL": notification.ttl,
"Location": location},
Expand Down
20 changes: 11 additions & 9 deletions autopush/router/fcm.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from twisted.logger import Logger

from autopush.exceptions import RouterException
from autopush.metrics import make_tags
from autopush.router.interface import RouterResponse
from autopush.types import JSONDict # noqa

Expand Down Expand Up @@ -117,7 +116,7 @@ def __init__(self, conf, router_conf, metrics):
except Exception as e:
self.log.error("Could not instantiate FCM {ex}", ex=e)
raise IOError("FCM Bridge not initiated in main")
self._base_tags = ["platform:fcm"]
self._base_tags = {"platform": "fcm"}
self.log.debug("Starting FCM router...")

def amend_endpoint_response(self, response, router_data):
Expand Down Expand Up @@ -204,7 +203,7 @@ def _route(self, notification, router_data):
raise RouterException("Server error", status_code=500)
except ConnectionError as e:
self.metrics.increment("notification.bridge.error",
tags=make_tags(
tags=self.metrics.make_tags(
self._base_tags,
reason="connection_unavailable"))
self.log.warn("Could not connect to FCM server: %s" % e)
Expand Down Expand Up @@ -234,15 +233,17 @@ def _process_reply(self, reply, notification, router_data, ttl):
self.log.debug("FCM id changed : {old} => {new}",
old=old_id, new=new_id)
self.metrics.increment("notification.bridge.error",
tags=make_tags(self._base_tags,
reason="reregister"))
tags=self.metrics.make_tags(
self._base_tags,
reason="reregister"))
return RouterResponse(status_code=503,
response_body="Please try request again.",
router_data=dict(token=new_id))
if reply.get('failure'):
self.metrics.increment("notification.bridge.error",
tags=make_tags(self._base_tags,
reason="failure"))
tags=self.metrics.make_tags(
self._base_tags,
reason="failure"))
reason = result.get('error', "Unreported")
err = self.reasonTable.get(reason)
if err.get("crit", False):
Expand Down Expand Up @@ -272,8 +273,9 @@ def _process_reply(self, reply, notification, router_data, ttl):
tags=self._base_tags)
self.metrics.increment("notification.message_data",
notification.data_length,
tags=make_tags(self._base_tags,
destination="Direct"))
tags=self.metrics.make_tags(
self._base_tags,
destination="Direct"))
location = "%s/m/%s" % (self.conf.endpoint_url, notification.version)
return RouterResponse(status_code=201, response_body="",
headers={"TTL": ttl,
Expand Down
16 changes: 8 additions & 8 deletions autopush/router/fcm_v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from twisted.logger import Logger

from autopush.exceptions import RouterException
from autopush.metrics import make_tags
from autopush.router.interface import RouterResponse
from autopush.router.fcm import FCMRouter
from autopush.router.fcmv1client import (
Expand Down Expand Up @@ -44,7 +43,7 @@ def __init__(self, conf, router_conf, metrics):
self.log.error("Could not instantiate FCMv1: missing credentials,",
ex=e)
raise IOError("FCMv1 Bridge not initiated in main")
self._base_tags = ["platform:fcmv1"]
self._base_tags = {"platform": "fcmv1"}
self.log.debug("Starting FCMv1 router...")

def amend_endpoint_response(self, response, router_data):
Expand Down Expand Up @@ -132,7 +131,7 @@ def _process_error(self, failure):
if isinstance(err, TimeoutError):
self.log.warn("FCM Timeout: %s" % err)
self.metrics.increment("notification.bridge.error",
tags=make_tags(
tags=self.metrics.make_tags(
self._base_tags,
reason="timeout"))
raise RouterException("Server error", status_code=502,
Expand All @@ -141,7 +140,7 @@ def _process_error(self, failure):
if isinstance(err, ConnectError):
self.log.warn("FCM Unavailable: %s" % err)
self.metrics.increment("notification.bridge.error",
tags=make_tags(
tags=self.metrics.make_tags(
self._base_tags,
reason="connection_unavailable"))
raise RouterException("Server error", status_code=502,
Expand All @@ -150,7 +149,7 @@ def _process_error(self, failure):
if isinstance(err, FCMNotFoundError):
self.log.debug("FCM Recipient not found: %s" % err)
self.metrics.increment("notification.bridge.error",
tags=make_tags(
tags=self.metrics.make_tags(
self._base_tags,
reason="recpient_gone"
))
Expand All @@ -161,7 +160,7 @@ def _process_error(self, failure):
if isinstance(err, RouterException):
self.log.warn("FCM Error: {}".format(err))
self.metrics.increment("notification.bridge.error",
tags=make_tags(
tags=self.metrics.make_tags(
self._base_tags,
reason="server_error"))
return failure
Expand All @@ -179,8 +178,9 @@ def _process_reply(self, reply, notification, router_data, ttl):
tags=self._base_tags)
self.metrics.increment("notification.message_data",
notification.data_length,
tags=make_tags(self._base_tags,
destination="Direct"))
tags=self.metrics.make_tags(
self._base_tags,
destination="Direct"))
location = "%s/m/%s" % (self.conf.endpoint_url, notification.version)
return RouterResponse(status_code=201, response_body="",
headers={"TTL": ttl,
Expand Down
Loading