Support Python 3 for datadog_checks_base
ofek committed Jul 29, 2018
1 parent 4e6f576 commit d7ddff5
Showing 23 changed files with 520 additions and 612 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
@@ -44,7 +44,7 @@ jobs:
- ddev test
- ddev test --bench || true
- stage: test
-env: CHECK=datadog_checks_base
+env: CHECK=datadog_checks_base PYTHON3=true
script:
- ddev dep verify
- ddev manifest verify --include-extras
18 changes: 10 additions & 8 deletions datadog_checks_base/datadog_checks/checks/base.py
@@ -10,6 +10,8 @@
import traceback
import unicodedata

+from six import iteritems, text_type

try:
import datadog_agent
from ..log import init_logging
@@ -38,7 +40,7 @@ def __init__(self, *args, **kwargs):
args: `name`, `init_config`, `agentConfig` (deprecated), `instances`
"""
self.metrics = defaultdict(list)
-self.check_id = ''
+self.check_id = b''
self.instances = kwargs.get('instances', [])
self.name = kwargs.get('name', '')
self.init_config = kwargs.get('init_config', {})
@@ -78,7 +80,7 @@ def __init__(self, *args, **kwargs):
self._deprecations = {
'increment': [
False,
"DEPRECATION NOTICE: `AgentCheck.increment`/`AgentCheck.decrement` are deprecated, please use " +
"DEPRECATION NOTICE: `AgentCheck.increment`/`AgentCheck.decrement` are deprecated, please use "
"`AgentCheck.gauge` or `AgentCheck.count` instead, with a different metric name",
],
'device_name': [
@@ -124,7 +126,7 @@ def _submit_metric(self, mtype, name, value, tags=None, hostname=None, device_na

tags = self._normalize_tags(tags, device_name)
if hostname is None:
hostname = ""
hostname = b''

aggregator.submit_metric(self, self.check_id, mtype, name, float(value), tags, hostname)

@@ -176,9 +178,9 @@ def service_check(self, name, status, tags=None, hostname=None, message=None):

def event(self, event):
# Enforce types of some fields, considerably facilitates handling in go bindings downstream
-for key, value in event.items():
+for key, value in list(iteritems(event)):
# transform the unicode objects to plain strings with utf-8 encoding
-if isinstance(value, unicode):
+if isinstance(value, text_type):
try:
event[key] = event[key].encode('utf-8')
except UnicodeError:
@@ -190,7 +192,7 @@ def event(self, event):
if event.get('timestamp'):
event['timestamp'] = int(event['timestamp'])
if event.get('aggregation_key'):
-event['aggregation_key'] = str(event['aggregation_key'])
+event['aggregation_key'] = ensure_bytes(event['aggregation_key'])
aggregator.submit_event(self, self.check_id, event)

# TODO(olivier): implement service_metadata if it's worth it
@@ -209,7 +211,7 @@ def normalize(self, metric, prefix=None, fix_case=False):
:param fix_case A boolean, indicating whether to make sure that
the metric name returned is in underscore_case
"""
-if isinstance(metric, unicode):
+if isinstance(metric, text_type):
metric_name = unicodedata.normalize('NFKD', metric).encode('ascii', 'ignore')
else:
metric_name = metric
@@ -301,7 +303,7 @@ def get_warnings(self):
def run(self):
try:
self.check(copy.deepcopy(self.instances[0]))
-result = ''
+result = b''

except Exception as e:
result = json.dumps([
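
The changes above follow the one pattern that recurs throughout this commit: on Python 2 `str` is a byte string and `unicode` is text, while on Python 3 `str` is text, so type checks and dict iteration go through `six` (`text_type`, `iteritems`) and values crossing into the agent's Go bindings get explicitly encoded to UTF-8 bytes. A minimal sketch of that normalization, with a hypothetical `ensure_bytes` helper standing in for the one the package defines elsewhere:

from six import iteritems, text_type

def ensure_bytes(value):
    # Hypothetical stand-in for the package's own helper: UTF-8 encode
    # text, pass byte strings through untouched.
    if isinstance(value, text_type):
        return value.encode('utf-8')
    return value

def normalize_event(event):
    # Iterate over a snapshot of the items because the dict is mutated
    # in place; iteritems() is a lazy view on Python 3, hence list().
    for key, value in list(iteritems(event)):
        if isinstance(value, text_type):
            event[key] = value.encode('utf-8')
    if event.get('aggregation_key'):
        event['aggregation_key'] = ensure_bytes(event['aggregation_key'])
    return event
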
148 changes: 12 additions & 136 deletions datadog_checks_base/datadog_checks/checks/libs/thread_pool.py
@@ -22,11 +22,12 @@

# flake8: noqa

-import Queue
import sys
import threading
import traceback

+from six.moves import queue, range


# Item pushed on the work queue to tell the worker threads to terminate
SENTINEL = "QUIT"
@@ -53,7 +54,7 @@ def __init__(self, workq, *args, **kwds):

def run(self):
"""Process the work unit, or wait for sentinel to exit"""
-while 1:
+while True:
self.running = True
workunit = self._workq.get()
if is_sentinel(workunit):
@@ -65,8 +66,6 @@ def run(self):
self.running = False




class Pool(object):
"""
The Pool class represents a pool of worker threads. It has methods
@@ -79,10 +78,10 @@ def __init__(self, nworkers, name="Pool"):
\param nworkers (integer) number of worker threads to start
\param name (string) prefix for the worker threads' name
"""
-self._workq = Queue.Queue()
+self._workq = queue.Queue()
self._closed = False
self._workers = []
-for idx in xrange(nworkers):
+for idx in range(nworkers):
thr = PoolWorker(self._workq, name="Worker-%s-%d" % (name, idx))
try:
thr.start()
@@ -215,12 +214,12 @@ def terminate(self):
try:
while 1:
self._workq.get_nowait()
-except Queue.Empty:
+except queue.Empty:
pass

# Send one sentinel for each worker thread: each thread will die
# eventually, leaving the next sentinel for the next thread
-for thr in self._workers:
+for _ in self._workers:
self._workq.put(SENTINEL)
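
Draining the queue and then enqueuing one SENTINEL per worker is the standard shutdown handshake: each worker consumes exactly one sentinel and exits, leaving the remaining sentinels for its siblings. A self-contained sketch of the pattern (not the Pool class itself):

import threading

from six.moves import queue, range

SENTINEL = "QUIT"

def worker(workq):
    while True:
        item = workq.get()
        if item is SENTINEL:
            break  # consume exactly one sentinel, then exit
        print("processing %r" % (item,))

workq = queue.Queue()
threads = [threading.Thread(target=worker, args=(workq,)) for _ in range(4)]
for thr in threads:
    thr.start()
for i in range(10):
    workq.put(i)
for _ in threads:  # one sentinel per worker
    workq.put(SENTINEL)
for thr in threads:
    thr.join()
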

def join(self):
Expand All @@ -229,7 +228,7 @@ def join(self):
for thr in self._workers:
thr.join()

-def _create_sequences(self, func, iterable, chunksize, collector = None):
+def _create_sequences(self, func, iterable, chunksize, collector=None):
"""
Create the WorkUnit objects to process and pushes them on the
work queue. Each work unit is meant to process a slice of
@@ -247,9 +246,9 @@ def _create_sequences(self, func, iterable, chunksize, collector=None):
exit_loop = False
while not exit_loop:
seq = []
-for i in xrange(chunksize or 1):
+for _ in range(chunksize or 1):
try:
-arg = it_.next()
+arg = next(it_)
except StopIteration:
exit_loop = True
break
@@ -324,7 +323,7 @@ class ApplyResult(object):
The result objects returns by the Pool::*_async() methods are of
this type"""
-def __init__(self, collector = None, callback = None):
+def __init__(self, collector=None, callback=None):
"""
\param collector when not None, the notify_ready() method of
the collector will be called when the result from the Job is
@@ -354,7 +353,7 @@ def get(self, timeout=None):
raise TimeoutError("Result not available within %fs" % timeout)
if self._success:
return self._data
-raise self._data[0], self._data[1], self._data[2]
+raise self._data[0]

def wait(self, timeout=None):
"""Waits until the result is available or until timeout
@@ -629,127 +628,4 @@ def notify_ready(self, apply_result):
else:
self._to_notify._set_value(lst)


-def _test():
-"""Some tests"""
-import thread
-import time
-
-def f(x):
-return x*x
-
-def work(seconds):
-print "[%d] Start to work for %fs..." % (thread.get_ident(), seconds)
-time.sleep(seconds)
-print "[%d] Work done (%fs)." % (thread.get_ident(), seconds)
-return "%d slept %fs" % (thread.get_ident(), seconds)
-
-### Test copy/pasted from multiprocessing
-pool = Pool(9) # start 4 worker threads
-
-result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously
-print result.get(timeout=1) # prints "100" unless slow computer
-
-print pool.map(f, range(10)) # prints "[0, 1, 4,..., 81]"
-
-it = pool.imap(f, range(10))
-print it.next() # prints "0"
-print it.next() # prints "1"
-print it.next(timeout=1) # prints "4" unless slow computer
-
-# Test apply_sync exceptions
-result = pool.apply_async(time.sleep, (3,))
-try:
-print result.get(timeout=1) # raises `TimeoutError`
-except TimeoutError:
-print "Good. Got expected timeout exception."
-else:
-assert False, "Expected exception !"
-print result.get()
-
-def cb(s):
-print "Result ready: %s" % s
-
-# Test imap()
-for res in pool.imap(work, xrange(10, 3, -1), chunksize=4):
-print "Item:", res
-
-# Test imap_unordered()
-for res in pool.imap_unordered(work, xrange(10, 3, -1)):
-print "Item:", res
-
-# Test map_async()
-result = pool.map_async(work, xrange(10), callback=cb)
-try:
-print result.get(timeout=1) # raises `TimeoutError`
-except TimeoutError:
-print "Good. Got expected timeout exception."
-else:
-assert False, "Expected exception !"
-print result.get()
-
-# Test imap_async()
-result = pool.imap_async(work, xrange(3, 10), callback=cb)
-try:
-print result.get(timeout=1) # raises `TimeoutError`
-except TimeoutError:
-print "Good. Got expected timeout exception."
-else:
-assert False, "Expected exception !"
-for i in result.get():
-print "Item:", i
-print "### Loop again:"
-for i in result.get():
-print "Item2:", i
-
-# Test imap_unordered_async()
-result = pool.imap_unordered_async(work, xrange(10, 3, -1), callback=cb)
-try:
-print result.get(timeout=1) # raises `TimeoutError`
-except TimeoutError:
-print "Good. Got expected timeout exception."
-else:
-assert False, "Expected exception !"
-for i in result.get():
-print "Item1:", i
-for i in result.get():
-print "Item2:", i
-r = result.get()
-for i in r:
-print "Item3:", i
-for i in r:
-print "Item4:", i
-for i in r:
-print "Item5:", i
-
-#
-# The case for the exceptions
-#
-
-# Exceptions in imap_unordered_async()
-result = pool.imap_unordered_async(work, xrange(2, -10, -1), callback=cb)
-time.sleep(3)
-try:
-for i in result.get():
-print "Got item:", i
-except IOError:
-print "Good. Got expected exception:"
-traceback.print_exc()
-
-# Exceptions in imap_async()
-result = pool.imap_async(work, xrange(2, -10, -1), callback=cb)
-time.sleep(3)
-try:
-for i in result.get():
-print "Got item:", i
-except IOError:
-print "Good. Got expected exception:"
-traceback.print_exc()
-
-# Stop the test: need to stop the pool !!!
-pool.terminate()
-print "End of tests"
-
-if __name__ == "__main__":
-_test()
-## end of http://code.activestate.com/recipes/576519/ }}}
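
Aside from deleting the Python 2-only test harness above, the thread-pool changes are mechanical renames that `six.moves` papers over: the `Queue` module became `queue`, `xrange` became `range`, and `it.next()` became the `next(it)` builtin. A quick illustration of the renamed pieces:

from six.moves import queue, range

q = queue.Queue()      # Python 2: Queue.Queue / Python 3: queue.Queue
for i in range(3):     # Python 2: xrange (lazy) / Python 3: built-in range
    q.put(i)

it = iter([10, 20, 30])
print(next(it))        # next() builtin replaces Python 2's it.next()

try:
    while True:
        q.get_nowait()
except queue.Empty:    # Python 2: Queue.Empty
    print("drained")
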
@@ -1,12 +1,12 @@
# (C) Datadog, Inc. 2018
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)

from .mixins import PrometheusScraperMixin

-from .. import AgentCheck
+from ..base import AgentCheck
from ...errors import CheckException

+from six import string_types


class PrometheusScraper(PrometheusScraperMixin):
"""
@@ -29,7 +29,7 @@ def _submit_rate(self, metric_name, val, metric, custom_tags=None, hostname=None
_tags = self._metric_tags(metric_name, val, metric, custom_tags, hostname)
self.check.rate('{}.{}'.format(self.NAMESPACE, metric_name), val, _tags, hostname=hostname)


def _submit_gauge(self, metric_name, val, metric, custom_tags=None, hostname=None):
"""
Submit a metric as a gauge, additional tags provided will be added to
@@ -63,7 +62,9 @@ def _metric_tags(self, metric_name, val, metric, custom_tags=None, hostname=None
if self.labels_mapper is not None and label.name in self.labels_mapper:
tag_name = self.labels_mapper[label.name]
_tags.append('{}:{}'.format(tag_name, label.value))
-return self._finalize_tags_to_submit(_tags, metric_name, val, metric, custom_tags=custom_tags, hostname=hostname)
+return self._finalize_tags_to_submit(
+    _tags, metric_name, val, metric, custom_tags=custom_tags, hostname=hostname
+)

def _submit_service_check(self, *args, **kwargs):
self.check.service_check(*args, **kwargs)
@@ -83,10 +84,10 @@ class GenericPrometheusCheck(AgentCheck):
- bar
- foo
"""
-def __init__(self, name, init_config, agentConfig, instances=None, default_instances={}, default_namespace=""):
+def __init__(self, name, init_config, agentConfig, instances=None, default_instances=None, default_namespace=""):
super(GenericPrometheusCheck, self).__init__(name, init_config, agentConfig, instances)
self.scrapers_map = {}
-self.default_instances = default_instances
+self.default_instances = default_instances if default_instances is not None else {}
self.default_namespace = default_namespace
for instance in instances:
self.get_scraper(instance)
@@ -113,7 +114,6 @@ def _extract_rate_metrics(self, type_overrides):
type_overrides[metric] = "gauge"
return rate_metrics


def get_scraper(self, instance):
namespace = instance.get("namespace", "")
# Check if we have a namespace
@@ -140,7 +140,7 @@ def get_scraper(self, instance):
# We merge list and dictionnaries from optional defaults & instance settings
metrics = default_instance.get("metrics", []) + instance.get("metrics", [])
for metric in metrics:
-if isinstance(metric, basestring):
+if isinstance(metric, string_types):
metrics_mapper[metric] = metric
else:
metrics_mapper.update(metric)
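
The `default_instances` change fixes the classic mutable-default pitfall: Python evaluates default argument values once, at function definition time, so a `{}` default is shared by every call and every instance of the class. Moving the fallback into the body gives each instance its own dict. A minimal demonstration:

def bad(key, cache={}):
    # The same dict object is reused across *all* calls.
    cache[key] = True
    return cache

def good(key, cache=None):
    # A fresh dict per call, unless the caller supplies one.
    cache = cache if cache is not None else {}
    cache[key] = True
    return cache

print(bad('a'))   # {'a': True}
print(bad('b'))   # {'a': True, 'b': True}  <- leaked state
print(good('a'))  # {'a': True}
print(good('b'))  # {'b': True}
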