Add database query utilities (#5045)
* Add database query utilities

* address review
ofek authored Nov 20, 2019
1 parent bb16139 commit 1ad533b
Showing 7 changed files with 1,342 additions and 0 deletions.
4 changes: 4 additions & 0 deletions datadog_checks_base/datadog_checks/base/utils/constants.py
@@ -1,5 +1,9 @@
# (C) Datadog, Inc. 2019
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)
SECOND = 1
MILLISECOND = 1000
MICROSECOND = 1000000
NANOSECOND = 1000000000

TIME_UNITS = {'microsecond': MICROSECOND, 'millisecond': MILLISECOND, 'nanosecond': NANOSECOND, 'second': SECOND}
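
A quick, hedged illustration of how this mapping is meant to be consumed (the import path follows the file location above; the values are exactly those defined in this diff):

from datadog_checks.base.utils.constants import MILLISECOND, TIME_UNITS

# A named time unit resolves to its parts-per-second factor; the
# `temporal_percent` transformer added later in this commit resolves named scales this way.
assert TIME_UNITS['millisecond'] == MILLISECOND == 1000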
5 changes: 5 additions & 0 deletions datadog_checks_base/datadog_checks/base/utils/db/__init__.py
@@ -0,0 +1,5 @@
# (C) Datadog, Inc. 2019
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)
from .core import QueryManager
from .query import Query
126 changes: 126 additions & 0 deletions datadog_checks_base/datadog_checks/base/utils/db/core.py
@@ -0,0 +1,126 @@
# (C) Datadog, Inc. 2019
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)
from itertools import chain

from ...config import is_affirmative
from ..containers import iter_unique
from .query import Query
from .transform import TRANSFORMERS
from .utils import SUBMISSION_METHODS, create_submission_transformer


class QueryManager(object):
def __init__(self, check, executor, queries=None, tags=None, error_handler=None):
self.check = check
self.executor = executor
self.queries = queries or []
self.tags = tags or []
self.error_handler = error_handler

custom_queries = list(self.check.instance.get('custom_queries', []))
use_global_custom_queries = self.check.instance.get('use_global_custom_queries', True)

# Handle overrides
if use_global_custom_queries == 'extend':
custom_queries.extend(self.check.init_config.get('global_custom_queries', []))
elif (
not custom_queries
and 'global_custom_queries' in self.check.init_config
and is_affirmative(use_global_custom_queries)
):
custom_queries = self.check.init_config.get('global_custom_queries', [])

# Deduplicate
for i, custom_query in enumerate(iter_unique(custom_queries), 1):
query = Query(custom_query)
query.query_data.setdefault('name', 'custom query #{}'.format(i))
self.queries.append(query)

def compile_queries(self):
transformers = TRANSFORMERS.copy()

for submission_method in SUBMISSION_METHODS:
method = getattr(self.check, submission_method)
# Save each method in the initializer -> callable format
transformers[submission_method] = create_submission_transformer(method)

for query in self.queries:
query.compile(transformers)

def execute(self):
logger = self.check.log
global_tags = self.tags

for query in self.queries:
query_name = query.name
query_columns = query.columns
query_tags = query.tags
num_columns = len(query_columns)

try:
rows = self.execute_query(query.query)
except Exception as e:
if self.error_handler:
logger.error('Error querying %s: %s', query_name, self.error_handler(str(e)))
else:
logger.error('Error querying %s: %s', query_name, e)

continue

for row in rows:
if not row:
logger.debug('Query %s returned an empty result', query_name)
continue

if num_columns != len(row):
logger.error(
'Query %s expected %d column%s, got %d',
query_name,
num_columns,
's' if num_columns > 1 else '',
len(row),
)
continue

row_values = {}
submission_queue = []

tags = list(global_tags)
tags.extend(query_tags)

for (column_name, transformer), value in zip(query_columns, row):
# Columns can be ignored via configuration
if not column_name:
continue

row_values[column_name] = value

column_type, transformer = transformer

                    # The transformer can be None for `source` types. Such columns do not submit
                    # anything themselves but are collected into the row values for other columns to reference.
if transformer is None:
continue
elif column_type == 'tag':
tags.append(transformer(value, None))
else:
submission_queue.append((transformer, value))

for transformer, value in submission_queue:
transformer(value, row_values, tags=tags)

def execute_query(self, query):
rows = self.executor(query)
if rows is None:
return iter([])
else:
rows = iter(rows)

# Ensure we trigger query execution
try:
first_row = next(rows)
except StopIteration:
return iter([])

return chain((first_row,), rows)
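
A minimal usage sketch, assuming only the import paths in this diff: the ExampleDbCheck class, its SQLite-backed executor, and the query definition are hypothetical and only illustrate the expected shapes. Note that in this version `queries` must already be Query instances, and compile_queries() has to run before execute().

import sqlite3

from datadog_checks.base import AgentCheck
from datadog_checks.base.utils.db import Query, QueryManager

EXAMPLE_QUERY = {
    'name': 'connection stats',
    # SQLite stand-in for a real query; each row is a (state, count) pair.
    'query': "SELECT 'active', 12 UNION ALL SELECT 'idle', 3",
    'columns': [
        {'name': 'state', 'type': 'tag'},
        {'name': 'connections', 'type': 'gauge'},
    ],
    'tags': ['source:example'],
}


class ExampleDbCheck(AgentCheck):
    def __init__(self, name, init_config, instances):
        super(ExampleDbCheck, self).__init__(name, init_config, instances)
        self._query_manager = QueryManager(
            self, self.execute_query_raw, queries=[Query(EXAMPLE_QUERY)], tags=['db:example']
        )
        self._query_manager.compile_queries()

    def execute_query_raw(self, query):
        # The executor is any callable that takes the query text and returns an
        # iterable of rows; a real check would use its own database driver here.
        with sqlite3.connect(':memory:') as conn:
            return conn.execute(query).fetchall()

    def check(self, instance):
        # Each run executes every query and submits the transformed results.
        self._query_manager.execute()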
96 changes: 96 additions & 0 deletions datadog_checks_base/datadog_checks/base/utils/db/query.py
@@ -0,0 +1,96 @@
# (C) Datadog, Inc. 2019
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)
from copy import deepcopy

from six import raise_from


class Query(object):
def __init__(self, query_data):
self.query_data = deepcopy(query_data or {})
self.name = None
self.query = None
self.columns = None
self.tags = None

def compile(self, transformers):
# Check for previous compilation
if self.name is not None:
return

query_name = self.query_data.get('name')
if not query_name:
raise ValueError('query field `name` is required')
elif not isinstance(query_name, str):
raise ValueError('query field `name` must be a string')

query = self.query_data.get('query')
if not query:
raise ValueError('field `query` for {} is required'.format(query_name))
elif not isinstance(query, str):
raise ValueError('field `query` for {} must be a string'.format(query_name))

columns = self.query_data.get('columns')
if not columns:
raise ValueError('field `columns` for {} is required'.format(query_name))
elif not isinstance(columns, list):
raise ValueError('field `columns` for {} must be a list'.format(query_name))

        tags = self.query_data.get('tags', [])
if tags is not None and not isinstance(tags, list):
raise ValueError('field `tags` for {} must be a list'.format(query_name))

column_data = []
for i, column in enumerate(columns, 1):
# Columns can be ignored via configuration.
if not column:
column_data.append((None, None))
continue
elif not isinstance(column, dict):
raise ValueError('column #{} of {} is not a mapping'.format(i, query_name))

column_name = column.get('name')
if not column_name:
raise ValueError('field `name` for column #{} of {} is required'.format(i, query_name))
elif not isinstance(column_name, str):
raise ValueError('field `name` for column #{} of {} must be a string'.format(i, query_name))

column_type = column.get('type')
if not column_type:
raise ValueError('field `type` for column {} of {} is required'.format(column_name, query_name))
elif not isinstance(column_type, str):
raise ValueError('field `type` for column {} of {} must be a string'.format(column_name, query_name))
elif column_type == 'source':
column_data.append((column_name, (None, None)))
continue
elif column_type not in transformers:
raise ValueError('unknown type `{}` for column {} of {}'.format(column_type, column_name, query_name))

modifiers = {key: value for key, value in column.items() if key not in ('name', 'type')}

try:
transformer = transformers[column_type](column_name, transformers, **modifiers)
except Exception as e:
error = 'error compiling type `{}` for column {} of {}: {}'.format(
column_type, column_name, query_name, e
)

# Prepend helpful error text.
#
# When an exception is raised in the context of another one, both will be printed. To avoid
# this we set the context to None. https://www.python.org/dev/peps/pep-0409/
raise_from(type(e)(error), None)
else:
if column_type == 'tag':
column_data.append((column_name, (column_type, transformer)))
else:
                    # Every other column type actually submits data. Since that is the default case, we
                    # represent the type as None; any more specific marker would never be checked anyway.
column_data.append((column_name, (None, transformer)))

self.name = query_name
self.query = query
self.columns = tuple(column_data)
self.tags = tags
del self.query_data
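
To make the compilation contract concrete, here is a small sketch of compiling a definition by hand; `fake_gauge` stands in for a check's real submission method, and the transformer mapping is assembled the same way QueryManager.compile_queries() does.

from datadog_checks.base.utils.db.query import Query
from datadog_checks.base.utils.db.transform import TRANSFORMERS
from datadog_checks.base.utils.db.utils import create_submission_transformer


def fake_gauge(name, value, tags=None):
    # Stand-in for AgentCheck.gauge so the sketch is self-contained.
    print('gauge', name, value, tags)


# The shared transformers plus one submission transformer, mirroring compile_queries().
transformers = dict(TRANSFORMERS, gauge=create_submission_transformer(fake_gauge))

query = Query(
    {
        'name': 'example',
        'query': 'SELECT version, uptime FROM status',
        'columns': [
            {'name': 'version', 'type': 'tag'},
            {'name': 'uptime', 'type': 'gauge'},
        ],
    }
)
query.compile(transformers)

# After compilation the parsed pieces are attributes: `name`, `query`, `tags`, and
# `columns` as (column_name, (type_or_None, transformer)) pairs:
print(query.name)     # example
print(query.columns)  # (('version', ('tag', <function>)), ('uptime', (None, <function>)))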
119 changes: 119 additions & 0 deletions datadog_checks_base/datadog_checks/base/utils/db/transform.py
@@ -0,0 +1,119 @@
# (C) Datadog, Inc. 2019
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)
from ... import is_affirmative
from .. import constants
from ..common import total_time_to_temporal_percent


def get_tag(column_name, transformers, **modifiers):
template = '{}:{{}}'.format(column_name)
boolean = is_affirmative(modifiers.pop('boolean', None))

def tag(value, *_, **kwargs):
if boolean:
value = str(is_affirmative(value)).lower()

return template.format(value)

return tag


def get_monotonic_gauge(column_name, transformers, **modifiers):
gauge = transformers['gauge']('{}.total'.format(column_name), transformers, **modifiers)
monotonic_count = transformers['monotonic_count']('{}.count'.format(column_name), transformers, **modifiers)

def monotonic_gauge(value, *_, **kwargs):
gauge(value, **kwargs)
monotonic_count(value, **kwargs)

return monotonic_gauge


def get_temporal_percent(column_name, transformers, **modifiers):
scale = modifiers.pop('scale', None)
if scale is None:
raise ValueError('the `scale` parameter is required')

if isinstance(scale, str):
scale = constants.TIME_UNITS.get(scale.lower())
if scale is None:
raise ValueError(
'the `scale` parameter must be one of: {}'.format(' | '.join(sorted(constants.TIME_UNITS)))
)
elif not isinstance(scale, int):
raise ValueError(
'the `scale` parameter must be an integer representing parts of a second e.g. 1000 for millisecond'
)

rate = transformers['rate'](column_name, transformers, **modifiers)

def temporal_percent(value, *_, **kwargs):
rate(total_time_to_temporal_percent(value, scale=scale), **kwargs)

return temporal_percent


def get_match(column_name, transformers, **modifiers):
# Do work in a separate function to avoid having to `del` a bunch of variables
compiled_items = _compile_match_items(transformers, modifiers)

def match(value, row, *_, **kwargs):
if value in compiled_items:
source, transformer = compiled_items[value]
transformer(row[source], **kwargs)

return match


TRANSFORMERS = {
'temporal_percent': get_temporal_percent,
'monotonic_gauge': get_monotonic_gauge,
'tag': get_tag,
'match': get_match,
}


def _compile_match_items(transformers, modifiers):
items = modifiers.pop('items', None)
if items is None:
raise ValueError('the `items` parameter is required')

if not isinstance(items, dict):
raise ValueError('the `items` parameter must be a mapping')

global_transform_source = modifiers.pop('source', None)

compiled_items = {}
for item, data in items.items():
if not isinstance(data, dict):
raise ValueError('item `{}` is not a mapping'.format(item))

transform_name = data.pop('name', None)
if not transform_name:
raise ValueError('the `name` parameter for item `{}` is required'.format(item))
elif not isinstance(transform_name, str):
raise ValueError('the `name` parameter for item `{}` must be a string'.format(item))

transform_type = data.pop('type', None)
if not transform_type:
raise ValueError('the `type` parameter for item `{}` is required'.format(item))
elif not isinstance(transform_type, str):
raise ValueError('the `type` parameter for item `{}` must be a string'.format(item))
elif transform_type not in transformers:
raise ValueError('unknown type `{}` for item `{}`'.format(transform_type, item))

transform_source = data.pop('source', global_transform_source)
if not transform_source:
raise ValueError('the `source` parameter for item `{}` is required'.format(item))
elif not isinstance(transform_source, str):
raise ValueError('the `source` parameter for item `{}` must be a string'.format(item))

transform_modifiers = modifiers.copy()
transform_modifiers.update(data)
compiled_items[item] = (
transform_source,
transformers[transform_type](transform_name, transformers, **transform_modifiers),
)

return compiled_items
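
As a hypothetical illustration of the `match` transformer (the metric names and source column below are made up): the matched column's value selects an item, and the submitted value is read from the row's `source` column, which must itself be declared, typically with type `source`. Transformers that take modifiers, such as `temporal_percent` with `scale`, are declared the same way, with the extra keys next to `name` and `type`.

# Hypothetical column definitions for a query returning (metric_name, metric_value) rows,
# e.g. SELECT metric_name, metric_value FROM server_statistics
COLUMNS = [
    {
        'name': 'metric_name',
        'type': 'match',
        'source': 'metric_value',  # global source: where matched items read their value from
        'items': {
            'buffers_written': {'name': 'bgwriter.buffers_written', 'type': 'monotonic_count'},
            'checkpoints': {'name': 'bgwriter.checkpoints', 'type': 'monotonic_count'},
        },
    },
    # Collected into the row values so `match` can reference it, but not submitted directly.
    {'name': 'metric_value', 'type': 'source'},
]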
15 changes: 15 additions & 0 deletions datadog_checks_base/datadog_checks/base/utils/db/utils.py
@@ -0,0 +1,15 @@
# (C) Datadog, Inc. 2019
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)
SUBMISSION_METHODS = {'gauge', 'count', 'monotonic_count', 'rate', 'histogram', 'historate'}


def create_submission_transformer(submit_method):
def get_transformer(name, _, **modifiers):
def transformer(value, *_, **kwargs):
kwargs.update(modifiers)
submit_method(name, value, **kwargs)

return transformer

return get_transformer
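
A small sketch of the initializer-to-callable chain this helper produces; `record` below is a stand-in for one of the check's bound submission methods named in SUBMISSION_METHODS, and the `hostname` modifier is just an example of a keyword passed through.

from datadog_checks.base.utils.db.utils import create_submission_transformer

calls = []


def record(name, value, **kwargs):
    # Stand-in for e.g. AgentCheck.gauge; captures what would be submitted.
    calls.append((name, value, kwargs))


get_transformer = create_submission_transformer(record)

# Compile time: bind the metric name and any static modifiers from the column definition.
transformer = get_transformer('queries.duration', None, hostname='db-1')

# Run time: called once per row with the column value and the row's tags.
transformer(0.42, tags=['db:example'])

assert calls == [('queries.duration', 0.42, {'tags': ['db:example'], 'hostname': 'db-1'})]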