Add database query utilities #5045

Merged · 2 commits · Nov 20, 2019
4 changes: 4 additions & 0 deletions datadog_checks_base/datadog_checks/base/utils/constants.py
@@ -1,5 +1,9 @@
# (C) Datadog, Inc. 2019
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)
SECOND = 1
MILLISECOND = 1000
MICROSECOND = 1000000
NANOSECOND = 1000000000

TIME_UNITS = {'microsecond': MICROSECOND, 'millisecond': MILLISECOND, 'nanosecond': NANOSECOND, 'second': SECOND}
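For reference, a minimal sketch (not part of the diff) of the arithmetic these constants enable: each value is the number of that unit in one second, so dividing a cumulative time counter by the matching entry yields seconds, which the `temporal_percent` transformer later turns into a utilization percentage.

from datadog_checks.base.utils.constants import TIME_UNITS

# A counter reporting time spent working, in milliseconds.
scale = TIME_UNITS['millisecond']  # 1000 parts per second

busy_ms_during_last_second = 250.0
utilization_percent = busy_ms_during_last_second / scale * 100  # 25.0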
5 changes: 5 additions & 0 deletions datadog_checks_base/datadog_checks/base/utils/db/__init__.py
@@ -0,0 +1,5 @@
# (C) Datadog, Inc. 2019
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)
from .core import QueryManager
from .query import Query
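The package `__init__` re-exports the two public entry points, so a check only needs one import (a minimal sketch; the module path follows the package layout in this PR):

from datadog_checks.base.utils.db import Query, QueryManager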
126 changes: 126 additions & 0 deletions datadog_checks_base/datadog_checks/base/utils/db/core.py
@@ -0,0 +1,126 @@
# (C) Datadog, Inc. 2019
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)
from itertools import chain

from ...config import is_affirmative
from ..containers import iter_unique
from .query import Query
from .transform import TRANSFORMERS
from .utils import SUBMISSION_METHODS, create_submission_transformer


class QueryManager(object):
def __init__(self, check, executor, queries=None, tags=None, error_handler=None):
self.check = check
self.executor = executor
self.queries = queries or []
self.tags = tags or []
self.error_handler = error_handler

custom_queries = list(self.check.instance.get('custom_queries', []))
use_global_custom_queries = self.check.instance.get('use_global_custom_queries', True)

# Handle overrides
if use_global_custom_queries == 'extend':
custom_queries.extend(self.check.init_config.get('global_custom_queries', []))
elif (
not custom_queries
and 'global_custom_queries' in self.check.init_config
and is_affirmative(use_global_custom_queries)
):
custom_queries = self.check.init_config.get('global_custom_queries', [])

# Deduplicate
for i, custom_query in enumerate(iter_unique(custom_queries), 1):
query = Query(custom_query)
query.query_data.setdefault('name', 'custom query #{}'.format(i))
self.queries.append(query)

def compile_queries(self):
transformers = TRANSFORMERS.copy()

for submission_method in SUBMISSION_METHODS:
method = getattr(self.check, submission_method)
# Save each method in the initializer -> callable format
transformers[submission_method] = create_submission_transformer(method)

for query in self.queries:
query.compile(transformers)

def execute(self):
logger = self.check.log
global_tags = self.tags

for query in self.queries:
query_name = query.name
query_columns = query.columns
query_tags = query.tags
num_columns = len(query_columns)

try:
rows = self.execute_query(query.query)
except Exception as e:
if self.error_handler:
logger.error('Error querying %s: %s', query_name, self.error_handler(str(e)))
else:
logger.error('Error querying %s: %s', query_name, e)

continue

for row in rows:
if not row:
logger.debug('Query %s returned an empty result', query_name)
continue

if num_columns != len(row):
logger.error(
'Query %s expected %d column%s, got %d',
query_name,
num_columns,
's' if num_columns > 1 else '',
len(row),
)
continue

row_values = {}
submission_queue = []

tags = list(global_tags)
tags.extend(query_tags)

for (column_name, transformer), value in zip(query_columns, row):
# Columns can be ignored via configuration
if not column_name:
continue

row_values[column_name] = value

column_type, transformer = transformer

# The transformer can be None for `source` types. Such columns do not submit
# anything but are collected into the row values for other columns to reference.
if transformer is None:
continue
elif column_type == 'tag':
tags.append(transformer(value, None))
else:
submission_queue.append((transformer, value))

for transformer, value in submission_queue:
transformer(value, row_values, tags=tags)

def execute_query(self, query):
rows = self.executor(query)
if rows is None:
return iter([])
else:
rows = iter(rows)

# Ensure we trigger query execution
try:
first_row = next(rows)
except StopIteration:
return iter([])

return chain((first_row,), rows)
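For reviewers, a hedged sketch (not part of the diff) of how a check would wire this up: the executor is any callable that accepts a query string and returns an iterable of rows, `compile_queries` runs once during initialization, and `execute` runs on every check run. The check class, query definition, and connection attribute below are hypothetical illustrations, not part of this PR.

from datadog_checks.base import AgentCheck
from datadog_checks.base.utils.db import Query, QueryManager

# Hypothetical query definition used only for illustration.
TABLE_STATS = {
    'name': 'table stats',
    'query': 'SELECT name, row_count FROM stats',
    'columns': [
        {'name': 'table', 'type': 'tag'},
        {'name': 'db.table.rows', 'type': 'gauge'},
    ],
}


class MyCheck(AgentCheck):  # hypothetical check
    def __init__(self, name, init_config, instances):
        super(MyCheck, self).__init__(name, init_config, instances)
        self._query_manager = QueryManager(
            self,
            self.execute_query,
            queries=[Query(TABLE_STATS)],
            tags=self.instance.get('tags', []),
        )
        self._query_manager.compile_queries()

    def execute_query(self, query):
        # Return an iterable of row sequences, e.g. from a DB-API cursor
        # (connection management omitted; `self._connection` is hypothetical).
        cursor = self._connection.cursor()
        cursor.execute(query)
        return cursor.fetchall()

    def check(self, instance):
        self._query_manager.execute()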
96 changes: 96 additions & 0 deletions datadog_checks_base/datadog_checks/base/utils/db/query.py
@@ -0,0 +1,96 @@
# (C) Datadog, Inc. 2019
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)
from copy import deepcopy

from six import raise_from


class Query(object):
def __init__(self, query_data):
self.query_data = deepcopy(query_data or {})
self.name = None
self.query = None
self.columns = None
self.tags = None

def compile(self, transformers):
# Check for previous compilation
if self.name is not None:
return

query_name = self.query_data.get('name')
if not query_name:
raise ValueError('query field `name` is required')
elif not isinstance(query_name, str):
raise ValueError('query field `name` must be a string')

query = self.query_data.get('query')
if not query:
raise ValueError('field `query` for {} is required'.format(query_name))
elif not isinstance(query, str):
raise ValueError('field `query` for {} must be a string'.format(query_name))

columns = self.query_data.get('columns')
if not columns:
raise ValueError('field `columns` for {} is required'.format(query_name))
elif not isinstance(columns, list):
raise ValueError('field `columns` for {} must be a list'.format(query_name))

tags = self.query_data.get('tags')
if tags is not None and not isinstance(tags, list):
raise ValueError('field `tags` for {} must be a list'.format(query_name))

column_data = []
for i, column in enumerate(columns, 1):
# Columns can be ignored via configuration.
if not column:
column_data.append((None, None))
continue
elif not isinstance(column, dict):
raise ValueError('column #{} of {} is not a mapping'.format(i, query_name))

column_name = column.get('name')
if not column_name:
raise ValueError('field `name` for column #{} of {} is required'.format(i, query_name))
elif not isinstance(column_name, str):
raise ValueError('field `name` for column #{} of {} must be a string'.format(i, query_name))

column_type = column.get('type')
if not column_type:
raise ValueError('field `type` for column {} of {} is required'.format(column_name, query_name))
elif not isinstance(column_type, str):
raise ValueError('field `type` for column {} of {} must be a string'.format(column_name, query_name))
elif column_type == 'source':
column_data.append((column_name, (None, None)))
continue
elif column_type not in transformers:
raise ValueError('unknown type `{}` for column {} of {}'.format(column_type, column_name, query_name))

modifiers = {key: value for key, value in column.items() if key not in ('name', 'type')}

try:
transformer = transformers[column_type](column_name, transformers, **modifiers)
except Exception as e:
error = 'error compiling type `{}` for column {} of {}: {}'.format(
column_type, column_name, query_name, e
)

# Prepend helpful error text.
#
# When an exception is raised in the context of another one, both will be printed. To avoid
# this we set the context to None. https://www.python.org/dev/peps/pep-0409/
raise_from(type(e)(error), None)
else:
if column_type == 'tag':
column_data.append((column_name, (column_type, transformer)))
else:
# All other types actually submit data. As that is the default case, we represent it as
# None, since an explicit marker (e.g. `value`) would never be checked anyway.
column_data.append((column_name, (None, transformer)))

self.name = query_name
self.query = query
self.columns = tuple(column_data)
self.tags = tags
del self.query_data
Review comment · Contributor (on `del self.query_data`):

Any reason to do that instead of setting it to None?

Reply · Contributor (Author):

Not really, goal is just to not use memory needlessly
119 changes: 119 additions & 0 deletions datadog_checks_base/datadog_checks/base/utils/db/transform.py
@@ -0,0 +1,119 @@
# (C) Datadog, Inc. 2019
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)
from ... import is_affirmative
from .. import constants
from ..common import total_time_to_temporal_percent


def get_tag(column_name, transformers, **modifiers):
template = '{}:{{}}'.format(column_name)
boolean = is_affirmative(modifiers.pop('boolean', None))

def tag(value, *_, **kwargs):
if boolean:
value = str(is_affirmative(value)).lower()

return template.format(value)

return tag


def get_monotonic_gauge(column_name, transformers, **modifiers):
gauge = transformers['gauge']('{}.total'.format(column_name), transformers, **modifiers)
monotonic_count = transformers['monotonic_count']('{}.count'.format(column_name), transformers, **modifiers)

def monotonic_gauge(value, *_, **kwargs):
gauge(value, **kwargs)
monotonic_count(value, **kwargs)

return monotonic_gauge


def get_temporal_percent(column_name, transformers, **modifiers):
scale = modifiers.pop('scale', None)
if scale is None:
raise ValueError('the `scale` parameter is required')

if isinstance(scale, str):
scale = constants.TIME_UNITS.get(scale.lower())
if scale is None:
raise ValueError(
'the `scale` parameter must be one of: {}'.format(' | '.join(sorted(constants.TIME_UNITS)))
)
elif not isinstance(scale, int):
raise ValueError(
'the `scale` parameter must be an integer representing parts of a second e.g. 1000 for millisecond'
)

rate = transformers['rate'](column_name, transformers, **modifiers)

def temporal_percent(value, *_, **kwargs):
rate(total_time_to_temporal_percent(value, scale=scale), **kwargs)

return temporal_percent


def get_match(column_name, transformers, **modifiers):
# Do work in a separate function to avoid having to `del` a bunch of variables
compiled_items = _compile_match_items(transformers, modifiers)

def match(value, row, *_, **kwargs):
if value in compiled_items:
source, transformer = compiled_items[value]
transformer(row[source], **kwargs)

return match


TRANSFORMERS = {
'temporal_percent': get_temporal_percent,
'monotonic_gauge': get_monotonic_gauge,
'tag': get_tag,
'match': get_match,
}


def _compile_match_items(transformers, modifiers):
items = modifiers.pop('items', None)
if items is None:
raise ValueError('the `items` parameter is required')

if not isinstance(items, dict):
raise ValueError('the `items` parameter must be a mapping')

global_transform_source = modifiers.pop('source', None)

compiled_items = {}
for item, data in items.items():
if not isinstance(data, dict):
raise ValueError('item `{}` is not a mapping'.format(item))

transform_name = data.pop('name', None)
if not transform_name:
raise ValueError('the `name` parameter for item `{}` is required'.format(item))
elif not isinstance(transform_name, str):
raise ValueError('the `name` parameter for item `{}` must be a string'.format(item))

transform_type = data.pop('type', None)
if not transform_type:
raise ValueError('the `type` parameter for item `{}` is required'.format(item))
elif not isinstance(transform_type, str):
raise ValueError('the `type` parameter for item `{}` must be a string'.format(item))
elif transform_type not in transformers:
raise ValueError('unknown type `{}` for item `{}`'.format(transform_type, item))

transform_source = data.pop('source', global_transform_source)
if not transform_source:
raise ValueError('the `source` parameter for item `{}` is required'.format(item))
elif not isinstance(transform_source, str):
raise ValueError('the `source` parameter for item `{}` must be a string'.format(item))

transform_modifiers = modifiers.copy()
transform_modifiers.update(data)
compiled_items[item] = (
transform_source,
transformers[transform_type](transform_name, transformers, **transform_modifiers),
)

return compiled_items
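A hedged example (not part of the diff) of column definitions exercising the two non-trivial transformers above, with illustrative metric names: `temporal_percent` resolves its `scale` modifier through `TIME_UNITS` and submits via `rate`, while `match` reads the value to submit from another collected column of the same row via `source`.

# The column reports cumulative busy time in milliseconds; the transformer
# submits it as a rate expressed as percent of wall time.
cpu_column = {'name': 'db.cpu.utilization', 'type': 'temporal_percent', 'scale': 'millisecond'}

# Submit a different gauge depending on this column's value, reading the number
# from the `size` column of the same row. The query must therefore also declare
# a column named `size` (for example with type `source`) so it appears in the row values.
status_column = {
    'name': 'status',
    'type': 'match',
    'source': 'size',
    'items': {
        'active': {'name': 'db.size.active', 'type': 'gauge'},
        'idle': {'name': 'db.size.idle', 'type': 'gauge'},
    },
}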
15 changes: 15 additions & 0 deletions datadog_checks_base/datadog_checks/base/utils/db/utils.py
@@ -0,0 +1,15 @@
# (C) Datadog, Inc. 2019
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)
SUBMISSION_METHODS = {'gauge', 'count', 'monotonic_count', 'rate', 'histogram', 'historate'}


def create_submission_transformer(submit_method):
def get_transformer(name, _, **modifiers):
def transformer(value, *_, **kwargs):
kwargs.update(modifiers)
submit_method(name, value, **kwargs)

return transformer

return get_transformer
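A minimal sketch of the call chain (the stub and metric name are hypothetical, standing in for a check method such as `gauge`): the outer call captures the submission method, `get_transformer` captures the metric name and static modifiers, and the innermost `transformer` forwards the row value together with runtime keyword arguments such as `tags`.

from datadog_checks.base.utils.db.utils import create_submission_transformer

def fake_gauge(name, value, **kwargs):  # stand-in for AgentCheck.gauge
    print(name, value, kwargs)

get_transformer = create_submission_transformer(fake_gauge)
transformer = get_transformer('db.table.rows', None, hostname='db-1')  # static modifier

# At execution time the QueryManager calls it with the row value, the row mapping, and tags.
transformer(42, {}, tags=['db:main'])
# -> db.table.rows 42 {'tags': ['db:main'], 'hostname': 'db-1'}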