diff --git a/datadog_checks_base/datadog_checks/base/utils/constants.py b/datadog_checks_base/datadog_checks/base/utils/constants.py index 22b719aa366e3..23a42525b1560 100644 --- a/datadog_checks_base/datadog_checks/base/utils/constants.py +++ b/datadog_checks_base/datadog_checks/base/utils/constants.py @@ -1,5 +1,9 @@ # (C) Datadog, Inc. 2019 # All rights reserved # Licensed under a 3-clause BSD style license (see LICENSE) +SECOND = 1 MILLISECOND = 1000 MICROSECOND = 1000000 +NANOSECOND = 1000000000 + +TIME_UNITS = {'microsecond': MICROSECOND, 'millisecond': MILLISECOND, 'nanosecond': NANOSECOND, 'second': SECOND} diff --git a/datadog_checks_base/datadog_checks/base/utils/db/__init__.py b/datadog_checks_base/datadog_checks/base/utils/db/__init__.py new file mode 100644 index 0000000000000..e99d6d00d338a --- /dev/null +++ b/datadog_checks_base/datadog_checks/base/utils/db/__init__.py @@ -0,0 +1,5 @@ +# (C) Datadog, Inc. 2019 +# All rights reserved +# Licensed under a 3-clause BSD style license (see LICENSE) +from .core import QueryManager +from .query import Query diff --git a/datadog_checks_base/datadog_checks/base/utils/db/core.py b/datadog_checks_base/datadog_checks/base/utils/db/core.py new file mode 100644 index 0000000000000..c70d6d2537f04 --- /dev/null +++ b/datadog_checks_base/datadog_checks/base/utils/db/core.py @@ -0,0 +1,126 @@ +# (C) Datadog, Inc. 2019 +# All rights reserved +# Licensed under a 3-clause BSD style license (see LICENSE) +from itertools import chain + +from ...config import is_affirmative +from ..containers import iter_unique +from .query import Query +from .transform import TRANSFORMERS +from .utils import SUBMISSION_METHODS, create_submission_transformer + + +class QueryManager(object): + def __init__(self, check, executor, queries=None, tags=None, error_handler=None): + self.check = check + self.executor = executor + self.queries = queries or [] + self.tags = tags or [] + self.error_handler = error_handler + + custom_queries = list(self.check.instance.get('custom_queries', [])) + use_global_custom_queries = self.check.instance.get('use_global_custom_queries', True) + + # Handle overrides + if use_global_custom_queries == 'extend': + custom_queries.extend(self.check.init_config.get('global_custom_queries', [])) + elif ( + not custom_queries + and 'global_custom_queries' in self.check.init_config + and is_affirmative(use_global_custom_queries) + ): + custom_queries = self.check.init_config.get('global_custom_queries', []) + + # Deduplicate + for i, custom_query in enumerate(iter_unique(custom_queries), 1): + query = Query(custom_query) + query.query_data.setdefault('name', 'custom query #{}'.format(i)) + self.queries.append(query) + + def compile_queries(self): + transformers = TRANSFORMERS.copy() + + for submission_method in SUBMISSION_METHODS: + method = getattr(self.check, submission_method) + # Save each method in the initializer -> callable format + transformers[submission_method] = create_submission_transformer(method) + + for query in self.queries: + query.compile(transformers) + + def execute(self): + logger = self.check.log + global_tags = self.tags + + for query in self.queries: + query_name = query.name + query_columns = query.columns + query_tags = query.tags + num_columns = len(query_columns) + + try: + rows = self.execute_query(query.query) + except Exception as e: + if self.error_handler: + logger.error('Error querying %s: %s', query_name, self.error_handler(str(e))) + else: + logger.error('Error querying %s: %s', query_name, e) + + continue + + for row in rows: + if not row: + logger.debug('Query %s returned an empty result', query_name) + continue + + if num_columns != len(row): + logger.error( + 'Query %s expected %d column%s, got %d', + query_name, + num_columns, + 's' if num_columns > 1 else '', + len(row), + ) + continue + + row_values = {} + submission_queue = [] + + tags = list(global_tags) + tags.extend(query_tags) + + for (column_name, transformer), value in zip(query_columns, row): + # Columns can be ignored via configuration + if not column_name: + continue + + row_values[column_name] = value + + column_type, transformer = transformer + + # The transformer can be None for `source` types. Those such columns do not submit + # anything but are collected into the row values for other columns to reference. + if transformer is None: + continue + elif column_type == 'tag': + tags.append(transformer(value, None)) + else: + submission_queue.append((transformer, value)) + + for transformer, value in submission_queue: + transformer(value, row_values, tags=tags) + + def execute_query(self, query): + rows = self.executor(query) + if rows is None: + return iter([]) + else: + rows = iter(rows) + + # Ensure we trigger query execution + try: + first_row = next(rows) + except StopIteration: + return iter([]) + + return chain((first_row,), rows) diff --git a/datadog_checks_base/datadog_checks/base/utils/db/query.py b/datadog_checks_base/datadog_checks/base/utils/db/query.py new file mode 100644 index 0000000000000..607e7cf116623 --- /dev/null +++ b/datadog_checks_base/datadog_checks/base/utils/db/query.py @@ -0,0 +1,96 @@ +# (C) Datadog, Inc. 2019 +# All rights reserved +# Licensed under a 3-clause BSD style license (see LICENSE) +from copy import deepcopy + +from six import raise_from + + +class Query(object): + def __init__(self, query_data): + self.query_data = deepcopy(query_data or {}) + self.name = None + self.query = None + self.columns = None + self.tags = None + + def compile(self, transformers): + # Check for previous compilation + if self.name is not None: + return + + query_name = self.query_data.get('name') + if not query_name: + raise ValueError('query field `name` is required') + elif not isinstance(query_name, str): + raise ValueError('query field `name` must be a string') + + query = self.query_data.get('query') + if not query: + raise ValueError('field `query` for {} is required'.format(query_name)) + elif not isinstance(query, str): + raise ValueError('field `query` for {} must be a string'.format(query_name)) + + columns = self.query_data.get('columns') + if not columns: + raise ValueError('field `columns` for {} is required'.format(query_name)) + elif not isinstance(columns, list): + raise ValueError('field `columns` for {} must be a list'.format(query_name)) + + tags = self.query_data.get('tags') + if tags is not None and not isinstance(tags, list): + raise ValueError('field `tags` for {} must be a list'.format(query_name)) + + column_data = [] + for i, column in enumerate(columns, 1): + # Columns can be ignored via configuration. + if not column: + column_data.append((None, None)) + continue + elif not isinstance(column, dict): + raise ValueError('column #{} of {} is not a mapping'.format(i, query_name)) + + column_name = column.get('name') + if not column_name: + raise ValueError('field `name` for column #{} of {} is required'.format(i, query_name)) + elif not isinstance(column_name, str): + raise ValueError('field `name` for column #{} of {} must be a string'.format(i, query_name)) + + column_type = column.get('type') + if not column_type: + raise ValueError('field `type` for column {} of {} is required'.format(column_name, query_name)) + elif not isinstance(column_type, str): + raise ValueError('field `type` for column {} of {} must be a string'.format(column_name, query_name)) + elif column_type == 'source': + column_data.append((column_name, (None, None))) + continue + elif column_type not in transformers: + raise ValueError('unknown type `{}` for column {} of {}'.format(column_type, column_name, query_name)) + + modifiers = {key: value for key, value in column.items() if key not in ('name', 'type')} + + try: + transformer = transformers[column_type](column_name, transformers, **modifiers) + except Exception as e: + error = 'error compiling type `{}` for column {} of {}: {}'.format( + column_type, column_name, query_name, e + ) + + # Prepend helpful error text. + # + # When an exception is raised in the context of another one, both will be printed. To avoid + # this we set the context to None. https://www.python.org/dev/peps/pep-0409/ + raise_from(type(e)(error), None) + else: + if column_type == 'tag': + column_data.append((column_name, (column_type, transformer))) + else: + # All these would actually submit data. As that is the default case, we represent it as + # a reference to None since if we use e.g. `value` it would never be checked anyway. + column_data.append((column_name, (None, transformer))) + + self.name = query_name + self.query = query + self.columns = tuple(column_data) + self.tags = tags + del self.query_data diff --git a/datadog_checks_base/datadog_checks/base/utils/db/transform.py b/datadog_checks_base/datadog_checks/base/utils/db/transform.py new file mode 100644 index 0000000000000..14c918b78cdc5 --- /dev/null +++ b/datadog_checks_base/datadog_checks/base/utils/db/transform.py @@ -0,0 +1,119 @@ +# (C) Datadog, Inc. 2019 +# All rights reserved +# Licensed under a 3-clause BSD style license (see LICENSE) +from ... import is_affirmative +from .. import constants +from ..common import total_time_to_temporal_percent + + +def get_tag(column_name, transformers, **modifiers): + template = '{}:{{}}'.format(column_name) + boolean = is_affirmative(modifiers.pop('boolean', None)) + + def tag(value, *_, **kwargs): + if boolean: + value = str(is_affirmative(value)).lower() + + return template.format(value) + + return tag + + +def get_monotonic_gauge(column_name, transformers, **modifiers): + gauge = transformers['gauge']('{}.total'.format(column_name), transformers, **modifiers) + monotonic_count = transformers['monotonic_count']('{}.count'.format(column_name), transformers, **modifiers) + + def monotonic_gauge(value, *_, **kwargs): + gauge(value, **kwargs) + monotonic_count(value, **kwargs) + + return monotonic_gauge + + +def get_temporal_percent(column_name, transformers, **modifiers): + scale = modifiers.pop('scale', None) + if scale is None: + raise ValueError('the `scale` parameter is required') + + if isinstance(scale, str): + scale = constants.TIME_UNITS.get(scale.lower()) + if scale is None: + raise ValueError( + 'the `scale` parameter must be one of: {}'.format(' | '.join(sorted(constants.TIME_UNITS))) + ) + elif not isinstance(scale, int): + raise ValueError( + 'the `scale` parameter must be an integer representing parts of a second e.g. 1000 for millisecond' + ) + + rate = transformers['rate'](column_name, transformers, **modifiers) + + def temporal_percent(value, *_, **kwargs): + rate(total_time_to_temporal_percent(value, scale=scale), **kwargs) + + return temporal_percent + + +def get_match(column_name, transformers, **modifiers): + # Do work in a separate function to avoid having to `del` a bunch of variables + compiled_items = _compile_match_items(transformers, modifiers) + + def match(value, row, *_, **kwargs): + if value in compiled_items: + source, transformer = compiled_items[value] + transformer(row[source], **kwargs) + + return match + + +TRANSFORMERS = { + 'temporal_percent': get_temporal_percent, + 'monotonic_gauge': get_monotonic_gauge, + 'tag': get_tag, + 'match': get_match, +} + + +def _compile_match_items(transformers, modifiers): + items = modifiers.pop('items', None) + if items is None: + raise ValueError('the `items` parameter is required') + + if not isinstance(items, dict): + raise ValueError('the `items` parameter must be a mapping') + + global_transform_source = modifiers.pop('source', None) + + compiled_items = {} + for item, data in items.items(): + if not isinstance(data, dict): + raise ValueError('item `{}` is not a mapping'.format(item)) + + transform_name = data.pop('name', None) + if not transform_name: + raise ValueError('the `name` parameter for item `{}` is required'.format(item)) + elif not isinstance(transform_name, str): + raise ValueError('the `name` parameter for item `{}` must be a string'.format(item)) + + transform_type = data.pop('type', None) + if not transform_type: + raise ValueError('the `type` parameter for item `{}` is required'.format(item)) + elif not isinstance(transform_type, str): + raise ValueError('the `type` parameter for item `{}` must be a string'.format(item)) + elif transform_type not in transformers: + raise ValueError('unknown type `{}` for item `{}`'.format(transform_type, item)) + + transform_source = data.pop('source', global_transform_source) + if not transform_source: + raise ValueError('the `source` parameter for item `{}` is required'.format(item)) + elif not isinstance(transform_source, str): + raise ValueError('the `source` parameter for item `{}` must be a string'.format(item)) + + transform_modifiers = modifiers.copy() + transform_modifiers.update(data) + compiled_items[item] = ( + transform_source, + transformers[transform_type](transform_name, transformers, **transform_modifiers), + ) + + return compiled_items diff --git a/datadog_checks_base/datadog_checks/base/utils/db/utils.py b/datadog_checks_base/datadog_checks/base/utils/db/utils.py new file mode 100644 index 0000000000000..dc8a8b8b97357 --- /dev/null +++ b/datadog_checks_base/datadog_checks/base/utils/db/utils.py @@ -0,0 +1,15 @@ +# (C) Datadog, Inc. 2019 +# All rights reserved +# Licensed under a 3-clause BSD style license (see LICENSE) +SUBMISSION_METHODS = {'gauge', 'count', 'monotonic_count', 'rate', 'histogram', 'historate'} + + +def create_submission_transformer(submit_method): + def get_transformer(name, _, **modifiers): + def transformer(value, *_, **kwargs): + kwargs.update(modifiers) + submit_method(name, value, **kwargs) + + return transformer + + return get_transformer diff --git a/datadog_checks_base/tests/test_db.py b/datadog_checks_base/tests/test_db.py new file mode 100644 index 0000000000000..faeaec24450f8 --- /dev/null +++ b/datadog_checks_base/tests/test_db.py @@ -0,0 +1,977 @@ +# (C) Datadog, Inc. 2019 +# All rights reserved +# Licensed under a 3-clause BSD style license (see LICENSE) +import logging + +import pytest + +from datadog_checks.base import AgentCheck +from datadog_checks.base.stubs.aggregator import AggregatorStub +from datadog_checks.base.utils.db import Query, QueryManager + +pytestmark = pytest.mark.db + + +def mock_executor(result=()): + def executor(_): + return result + + return executor + + +def create_query_manager(*args, **kwargs): + executor = kwargs.pop('executor', None) + if executor is None: + executor = mock_executor() + + check = kwargs.pop('check', None) or AgentCheck('test', {}, [{}]) + return QueryManager(check, executor, [Query(arg) for arg in args], **kwargs) + + +class TestQueryResultIteration: + def test_executor_empty_result(self): + query_manager = create_query_manager() + + assert list(query_manager.execute_query('foo')) == [] + + def test_executor_no_result(self): + query_manager = create_query_manager(executor=mock_executor(None)) + + assert list(query_manager.execute_query('foo')) == [] + + def test_executor_trigger_result(self): + class Result(object): + def __init__(self, _): + pass + + def __iter__(self): + raise ValueError('no result set') + + query_manager = create_query_manager(executor=Result) + + with pytest.raises(ValueError, match='^no result set$'): + query_manager.execute_query('foo') + + def test_executor_expected_result(self): + query_manager = create_query_manager(executor=mock_executor([[0, 1], [1, 2], [2, 3]])) + + assert list(query_manager.execute_query('foo')) == [[0, 1], [1, 2], [2, 3]] + + def test_executor_expected_result_generator(self): + class Result(object): + def __init__(self, _): + pass + + def __iter__(self): + for i in range(3): + yield [i, i + 1] + + query_manager = create_query_manager(executor=Result) + + assert list(query_manager.execute_query('foo')) == [[0, 1], [1, 2], [2, 3]] + + +class TestQueryCompilation: + def test_no_query_name(self): + query_manager = create_query_manager({}) + + with pytest.raises(ValueError, match='^query field `name` is required$'): + query_manager.compile_queries() + + def test_query_name_not_string(self): + query_manager = create_query_manager({'name': 5}) + + with pytest.raises(ValueError, match='^query field `name` must be a string$'): + query_manager.compile_queries() + + def test_no_query(self): + query_manager = create_query_manager({'name': 'test query'}) + + with pytest.raises(ValueError, match='^field `query` for test query is required$'): + query_manager.compile_queries() + + def test_query_not_string(self): + query_manager = create_query_manager({'name': 'test query', 'query': 5}) + + with pytest.raises(ValueError, match='^field `query` for test query must be a string$'): + query_manager.compile_queries() + + def test_no_columns(self): + query_manager = create_query_manager({'name': 'test query', 'query': 'foo'}) + + with pytest.raises(ValueError, match='^field `columns` for test query is required$'): + query_manager.compile_queries() + + def test_columns_not_list(self): + query_manager = create_query_manager({'name': 'test query', 'query': 'foo', 'columns': 'bar'}) + + with pytest.raises(ValueError, match='^field `columns` for test query must be a list$'): + query_manager.compile_queries() + + def test_tags_not_list(self): + query_manager = create_query_manager( + {'name': 'test query', 'query': 'foo', 'columns': [{}], 'tags': 'test:bar'} + ) + + with pytest.raises(ValueError, match='^field `tags` for test query must be a list$'): + query_manager.compile_queries() + + def test_column_not_dict(self): + query_manager = create_query_manager( + {'name': 'test query', 'query': 'foo', 'columns': [['column']], 'tags': ['test:bar']} + ) + + with pytest.raises(ValueError, match='^column #1 of test query is not a mapping$'): + query_manager.compile_queries() + + def test_column_no_name(self): + query_manager = create_query_manager( + {'name': 'test query', 'query': 'foo', 'columns': [{}, {'foo': 'bar'}], 'tags': ['test:bar']}, + ) + + with pytest.raises(ValueError, match='^field `name` for column #2 of test query is required$'): + query_manager.compile_queries() + + def test_column_name_not_string(self): + query_manager = create_query_manager( + {'name': 'test query', 'query': 'foo', 'columns': [{'name': 5}], 'tags': ['test:bar']} + ) + + with pytest.raises(ValueError, match='^field `name` for column #1 of test query must be a string$'): + query_manager.compile_queries() + + def test_column_no_type(self): + query_manager = create_query_manager( + {'name': 'test query', 'query': 'foo', 'columns': [{'name': 'test.foo'}], 'tags': ['test:bar']}, + ) + + with pytest.raises(ValueError, match='^field `type` for column test.foo of test query is required$'): + query_manager.compile_queries() + + def test_column_type_not_string(self): + query_manager = create_query_manager( + {'name': 'test query', 'query': 'foo', 'columns': [{'name': 'test.foo', 'type': 5}], 'tags': ['test:bar']} + ) + + with pytest.raises(ValueError, match='^field `type` for column test.foo of test query must be a string$'): + query_manager.compile_queries() + + def test_column_type_source_ok(self): + query_manager = create_query_manager( + { + 'name': 'test query', + 'query': 'foo', + 'columns': [{'name': 'test.foo', 'type': 'source'}], + 'tags': ['test:bar'], + } + ) + + query_manager.compile_queries() + + def test_column_type_unknown(self): + query_manager = create_query_manager( + { + 'name': 'test query', + 'query': 'foo', + 'columns': [{'name': 'test.foo', 'type': 'something'}], + 'tags': ['test:bar'], + } + ) + + with pytest.raises(ValueError, match='^unknown type `something` for column test.foo of test query$'): + query_manager.compile_queries() + + def test_compilation_idempotent(self): + query_manager = create_query_manager( + {'name': 'test query', 'query': 'foo', 'columns': [{}], 'tags': ['test:bar']} + ) + + query_manager.compile_queries() + query_manager.compile_queries() + + +class TestTransformerCompilation: + def test_temporal_percent_no_scale(self): + query_manager = create_query_manager( + { + 'name': 'test query', + 'query': 'foo', + 'columns': [{'name': 'test.foo', 'type': 'temporal_percent'}], + 'tags': ['test:bar'], + } + ) + + with pytest.raises( + ValueError, + match=( + '^error compiling type `temporal_percent` for column test.foo of test query: ' + 'the `scale` parameter is required$' + ), + ): + query_manager.compile_queries() + + def test_temporal_percent_unknown_scale(self): + query_manager = create_query_manager( + { + 'name': 'test query', + 'query': 'foo', + 'columns': [{'name': 'test.foo', 'type': 'temporal_percent', 'scale': 'bar'}], + 'tags': ['test:bar'], + } + ) + + with pytest.raises( + ValueError, + match=( + '^error compiling type `temporal_percent` for column test.foo of test query: ' + 'the `scale` parameter must be one of: ' + ), + ): + query_manager.compile_queries() + + def test_temporal_percent_scale_not_int(self): + query_manager = create_query_manager( + { + 'name': 'test query', + 'query': 'foo', + 'columns': [{'name': 'test.foo', 'type': 'temporal_percent', 'scale': 1.23}], + 'tags': ['test:bar'], + } + ) + + with pytest.raises( + ValueError, + match=( + '^error compiling type `temporal_percent` for column test.foo of test query: ' + 'the `scale` parameter must be an integer representing parts of a second e.g. 1000 for millisecond$' + ), + ): + query_manager.compile_queries() + + def test_match_no_items(self): + query_manager = create_query_manager( + { + 'name': 'test query', + 'query': 'foo', + 'columns': [{'name': 'test.foo', 'type': 'match'}], + 'tags': ['test:bar'], + } + ) + + with pytest.raises( + ValueError, + match='^error compiling type `match` for column test.foo of test query: the `items` parameter is required$', + ): + query_manager.compile_queries() + + def test_match_items_not_dict(self): + query_manager = create_query_manager( + { + 'name': 'test query', + 'query': 'foo', + 'columns': [{'name': 'test.foo', 'type': 'match', 'items': []}], + 'tags': ['test:bar'], + } + ) + + with pytest.raises( + ValueError, + match=( + '^error compiling type `match` for column test.foo of test query: ' + 'the `items` parameter must be a mapping$' + ), + ): + query_manager.compile_queries() + + def test_match_item_not_dict(self): + query_manager = create_query_manager( + { + 'name': 'test query', + 'query': 'foo', + 'columns': [{'name': 'test.foo', 'type': 'match', 'items': {'foo': 'bar'}}], + 'tags': ['test:bar'], + } + ) + + with pytest.raises( + ValueError, + match='^error compiling type `match` for column test.foo of test query: item `foo` is not a mapping$', + ): + query_manager.compile_queries() + + def test_match_item_no_name(self): + query_manager = create_query_manager( + { + 'name': 'test query', + 'query': 'foo', + 'columns': [{'name': 'test.foo', 'type': 'match', 'items': {'foo': {}}}], + 'tags': ['test:bar'], + } + ) + + with pytest.raises( + ValueError, + match=( + '^error compiling type `match` for column test.foo of test query: ' + 'the `name` parameter for item `foo` is required$' + ), + ): + query_manager.compile_queries() + + def test_match_item_name_not_string(self): + query_manager = create_query_manager( + { + 'name': 'test query', + 'query': 'foo', + 'columns': [{'name': 'test.foo', 'type': 'match', 'items': {'foo': {'name': 7}}}], + 'tags': ['test:bar'], + } + ) + + with pytest.raises( + ValueError, + match=( + '^error compiling type `match` for column test.foo of test query: ' + 'the `name` parameter for item `foo` must be a string$' + ), + ): + query_manager.compile_queries() + + def test_match_item_no_type(self): + query_manager = create_query_manager( + { + 'name': 'test query', + 'query': 'foo', + 'columns': [{'name': 'test.foo', 'type': 'match', 'items': {'foo': {'name': 'test.foo'}}}], + 'tags': ['test:bar'], + } + ) + + with pytest.raises( + ValueError, + match=( + '^error compiling type `match` for column test.foo of test query: ' + 'the `type` parameter for item `foo` is required$' + ), + ): + query_manager.compile_queries() + + def test_match_item_type_not_string(self): + query_manager = create_query_manager( + { + 'name': 'test query', + 'query': 'foo', + 'columns': [{'name': 'test.foo', 'type': 'match', 'items': {'foo': {'name': 'test.foo', 'type': 7}}}], + 'tags': ['test:bar'], + } + ) + + with pytest.raises( + ValueError, + match=( + '^error compiling type `match` for column test.foo of test query: ' + 'the `type` parameter for item `foo` must be a string$' + ), + ): + query_manager.compile_queries() + + def test_match_item_type_unknown(self): + query_manager = create_query_manager( + { + 'name': 'test query', + 'query': 'foo', + 'columns': [ + {'name': 'test.foo', 'type': 'match', 'items': {'foo': {'name': 'test.foo', 'type': 'unknown'}}} + ], + 'tags': ['test:bar'], + } + ) + + with pytest.raises( + ValueError, + match=( + '^error compiling type `match` for column test.foo of test query: ' + 'unknown type `unknown` for item `foo`$' + ), + ): + query_manager.compile_queries() + + def test_match_item_no_source(self): + query_manager = create_query_manager( + { + 'name': 'test query', + 'query': 'foo', + 'columns': [ + {'name': 'test.foo', 'type': 'match', 'items': {'foo': {'name': 'test.foo', 'type': 'gauge'}}} + ], + 'tags': ['test:bar'], + } + ) + + with pytest.raises( + ValueError, + match=( + '^error compiling type `match` for column test.foo of test query: ' + 'the `source` parameter for item `foo` is required$' + ), + ): + query_manager.compile_queries() + + def test_match_item_source_not_string(self): + query_manager = create_query_manager( + { + 'name': 'test query', + 'query': 'foo', + 'columns': [ + { + 'name': 'test.foo', + 'type': 'match', + 'items': {'foo': {'name': 'test.foo', 'type': 'gauge', 'source': 7}}, + } + ], + 'tags': ['test:bar'], + } + ) + + with pytest.raises( + ValueError, + match=( + '^error compiling type `match` for column test.foo of test query: ' + 'the `source` parameter for item `foo` must be a string$' + ), + ): + query_manager.compile_queries() + + +class TestSubmission: + @pytest.mark.parametrize( + 'metric_type_name, metric_type_id', + [item for item in AggregatorStub.METRIC_ENUM_MAP.items() if item[0] != 'counter'], + ids=[metric_type for metric_type in AggregatorStub.METRIC_ENUM_MAP if metric_type != 'counter'], + ) + def test_basic(self, metric_type_name, metric_type_id, aggregator): + query_manager = create_query_manager( + { + 'name': 'test query', + 'query': 'foo', + 'columns': [{'name': 'level', 'type': 'tag'}, None, {'name': 'test.foo', 'type': metric_type_name}], + 'tags': ['test:bar'], + }, + executor=mock_executor([['over', 'stuff', 9000]]), + tags=['test:foo'], + ) + query_manager.compile_queries() + query_manager.execute() + + aggregator.assert_metric( + 'test.foo', 9000, metric_type=metric_type_id, tags=['test:foo', 'test:bar', 'level:over'] + ) + aggregator.assert_all_metrics_covered() + + def test_aggregation(self, aggregator): + query_manager = create_query_manager( + { + 'name': 'test query', + 'query': 'foo', + 'columns': [{'name': 'test.foo', 'type': 'count'}, {'name': 'tag', 'type': 'tag'}], + 'tags': ['test:bar'], + }, + executor=mock_executor([[3, 'tag1'], [7, 'tag2'], [5, 'tag1']]), + tags=['test:foo'], + ) + query_manager.compile_queries() + query_manager.execute() + + aggregator.assert_metric('test.foo', 8, metric_type=aggregator.COUNT, tags=['test:foo', 'test:bar', 'tag:tag1']) + aggregator.assert_metric('test.foo', 7, metric_type=aggregator.COUNT, tags=['test:foo', 'test:bar', 'tag:tag2']) + aggregator.assert_all_metrics_covered() + + def test_kwarg_passing(self, aggregator): + class MyCheck(AgentCheck): + __NAMESPACE__ = 'test_check' + + query_manager = create_query_manager( + { + 'name': 'test query', + 'query': 'foo', + 'columns': [ + {'name': 'test.foo', 'type': 'gauge', 'tags': ['override:ok']}, + {'name': 'test.foo', 'type': 'gauge', 'raw': True}, + ], + 'tags': ['test:bar'], + }, + check=MyCheck('test', {}, [{}]), + executor=mock_executor([[1, 2]]), + tags=['test:foo'], + ) + query_manager.compile_queries() + query_manager.execute() + + aggregator.assert_metric('test_check.test.foo', 1, metric_type=aggregator.GAUGE, tags=['override:ok']) + aggregator.assert_metric('test.foo', 2, metric_type=aggregator.GAUGE, tags=['test:foo', 'test:bar']) + aggregator.assert_all_metrics_covered() + + def test_query_execution_error(self, caplog, aggregator): + class Result(object): + def __init__(self, _): + pass + + def __iter__(self): + raise ValueError('no result set') + + query_manager = create_query_manager( + { + 'name': 'test query', + 'query': 'foo', + 'columns': [{'name': 'test.foo', 'type': 'gauge'}], + 'tags': ['test:bar'], + }, + executor=Result, + tags=['test:foo'], + ) + query_manager.compile_queries() + query_manager.execute() + + expected_message = 'Error querying test query: no result set' + matches = [level for _, level, message in caplog.record_tuples if message == expected_message] + + assert len(matches) == 1, 'Expected log with message: {}'.format(expected_message) + assert matches[0] == logging.ERROR + + aggregator.assert_all_metrics_covered() + + def test_query_execution_error_with_handler(self, caplog, aggregator): + class Result(object): + def __init__(self, _): + pass + + def __iter__(self): + raise ValueError('no result set') + + query_manager = create_query_manager( + { + 'name': 'test query', + 'query': 'foo', + 'columns': [{'name': 'test.foo', 'type': 'gauge'}], + 'tags': ['test:bar'], + }, + executor=Result, + tags=['test:foo'], + error_handler=lambda s: s.replace('re', 'in'), + ) + query_manager.compile_queries() + query_manager.execute() + + expected_message = 'Error querying test query: no insult set' + matches = [level for _, level, message in caplog.record_tuples if message == expected_message] + + assert len(matches) == 1, 'Expected log with message: {}'.format(expected_message) + assert matches[0] == logging.ERROR + + aggregator.assert_all_metrics_covered() + + def test_no_result(self, caplog, aggregator): + query_manager = create_query_manager( + { + 'name': 'test query', + 'query': 'foo', + 'columns': [{'name': 'test.foo', 'type': 'gauge'}], + 'tags': ['test:bar'], + }, + executor=mock_executor([[]]), + tags=['test:foo'], + ) + query_manager.compile_queries() + + with caplog.at_level(logging.DEBUG): + query_manager.execute() + + expected_message = 'Query test query returned an empty result' + matches = [level for _, level, message in caplog.record_tuples if message == expected_message] + + assert len(matches) == 1, 'Expected log with message: {}'.format(expected_message) + assert matches[0] == logging.DEBUG + + aggregator.assert_all_metrics_covered() + + def test_result_length_mismatch(self, caplog, aggregator): + query_manager = create_query_manager( + { + 'name': 'test query', + 'query': 'foo', + 'columns': [{'name': 'test.foo', 'type': 'gauge'}, {'name': 'test.bar', 'type': 'gauge'}], + 'tags': ['test:bar'], + }, + executor=mock_executor([[1, 2, 3]]), + tags=['test:foo'], + ) + query_manager.compile_queries() + query_manager.execute() + + expected_message = 'Query test query expected 2 columns, got 3' + matches = [level for _, level, message in caplog.record_tuples if message == expected_message] + + assert len(matches) == 1, 'Expected log with message: {}'.format(expected_message) + assert matches[0] == logging.ERROR + + aggregator.assert_all_metrics_covered() + + +class TestTransformers: + def test_tag_boolean(self, aggregator): + query_manager = create_query_manager( + { + 'name': 'test query', + 'query': 'foo', + 'columns': [ + {'name': 'affirmative', 'type': 'tag', 'boolean': True}, + {'name': 'test.foo', 'type': 'gauge'}, + ], + 'tags': ['test:bar'], + }, + executor=mock_executor([[1, 5], [0, 7]]), + tags=['test:foo'], + ) + query_manager.compile_queries() + query_manager.execute() + + aggregator.assert_metric( + 'test.foo', 5, metric_type=aggregator.GAUGE, tags=['test:foo', 'test:bar', 'affirmative:true'] + ) + aggregator.assert_metric( + 'test.foo', 7, metric_type=aggregator.GAUGE, tags=['test:foo', 'test:bar', 'affirmative:false'] + ) + aggregator.assert_all_metrics_covered() + + def test_monotonic_gauge(self, aggregator): + query_manager = create_query_manager( + { + 'name': 'test query', + 'query': 'foo', + 'columns': [{'name': 'test', 'type': 'tag'}, {'name': 'test.foo', 'type': 'monotonic_gauge'}], + 'tags': ['test:bar'], + }, + executor=mock_executor([['tag1', 5], ['tag2', 7]]), + tags=['test:foo'], + ) + query_manager.compile_queries() + query_manager.execute() + + aggregator.assert_metric( + 'test.foo.total', 5, metric_type=aggregator.GAUGE, tags=['test:foo', 'test:bar', 'test:tag1'] + ) + aggregator.assert_metric( + 'test.foo.count', 5, metric_type=aggregator.MONOTONIC_COUNT, tags=['test:foo', 'test:bar', 'test:tag1'] + ) + aggregator.assert_metric( + 'test.foo.total', 7, metric_type=aggregator.GAUGE, tags=['test:foo', 'test:bar', 'test:tag2'] + ) + aggregator.assert_metric( + 'test.foo.count', 7, metric_type=aggregator.MONOTONIC_COUNT, tags=['test:foo', 'test:bar', 'test:tag2'] + ) + aggregator.assert_all_metrics_covered() + + def test_temporal_percent_named(self, aggregator): + query_manager = create_query_manager( + { + 'name': 'test query', + 'query': 'foo', + 'columns': [ + {'name': 'test', 'type': 'tag'}, + {'name': 'test.foo', 'type': 'temporal_percent', 'scale': 'second'}, + ], + 'tags': ['test:bar'], + }, + executor=mock_executor([['tag1', 5]]), + tags=['test:foo'], + ) + query_manager.compile_queries() + query_manager.execute() + + aggregator.assert_metric( + 'test.foo', 500, metric_type=aggregator.RATE, tags=['test:foo', 'test:bar', 'test:tag1'] + ) + aggregator.assert_all_metrics_covered() + + def test_temporal_percent_int(self, aggregator): + query_manager = create_query_manager( + { + 'name': 'test query', + 'query': 'foo', + 'columns': [ + {'name': 'test', 'type': 'tag'}, + {'name': 'test.foo', 'type': 'temporal_percent', 'scale': 1}, + ], + 'tags': ['test:bar'], + }, + executor=mock_executor([['tag1', 5]]), + tags=['test:foo'], + ) + query_manager.compile_queries() + query_manager.execute() + + aggregator.assert_metric( + 'test.foo', 500, metric_type=aggregator.RATE, tags=['test:foo', 'test:bar', 'test:tag1'] + ) + aggregator.assert_all_metrics_covered() + + def test_match_global(self, aggregator): + query_manager = create_query_manager( + { + 'name': 'test query', + 'query': 'foo', + 'columns': [ + { + 'name': 'columnar', + 'type': 'match', + 'items': { + 'global': {'name': 'test.global', 'type': 'gauge'}, + 'local': {'name': 'test.local', 'type': 'gauge'}, + }, + 'source': 'test1', + }, + {'name': 'test1', 'type': 'source'}, + {'name': 'test2', 'type': 'source'}, + ], + 'tags': ['test:bar'], + }, + executor=mock_executor([['global', 5, 7]]), + tags=['test:foo'], + ) + query_manager.compile_queries() + query_manager.execute() + + aggregator.assert_metric('test.global', 5, metric_type=aggregator.GAUGE, tags=['test:foo', 'test:bar']) + aggregator.assert_all_metrics_covered() + + def test_match_local(self, aggregator): + query_manager = create_query_manager( + { + 'name': 'test query', + 'query': 'foo', + 'columns': [ + { + 'name': 'columnar', + 'type': 'match', + 'items': { + 'global': {'name': 'test.global', 'type': 'gauge'}, + 'local': {'name': 'test.local', 'type': 'gauge', 'source': 'test2'}, + }, + 'source': 'test1', + }, + {'name': 'test1', 'type': 'source'}, + {'name': 'test2', 'type': 'source'}, + ], + 'tags': ['test:bar'], + }, + executor=mock_executor([['local', 5, 7]]), + tags=['test:foo'], + ) + query_manager.compile_queries() + query_manager.execute() + + aggregator.assert_metric('test.local', 7, metric_type=aggregator.GAUGE, tags=['test:foo', 'test:bar']) + aggregator.assert_all_metrics_covered() + + def test_match_none(self, aggregator): + query_manager = create_query_manager( + { + 'name': 'test query', + 'query': 'foo', + 'columns': [ + { + 'name': 'columnar', + 'type': 'match', + 'items': { + 'global': {'name': 'test.global', 'type': 'gauge'}, + 'local': {'name': 'test.local', 'type': 'gauge', 'source': 'test2'}, + }, + 'source': 'test1', + }, + {'name': 'test1', 'type': 'source'}, + {'name': 'test2', 'type': 'source'}, + ], + 'tags': ['test:bar'], + }, + executor=mock_executor([['nonlocal', 5, 7]]), + tags=['test:foo'], + ) + query_manager.compile_queries() + query_manager.execute() + + aggregator.assert_all_metrics_covered() + + +class TestCustomQueries: + def test_instance(self, aggregator): + query_manager = create_query_manager( + check=AgentCheck( + 'test', + {}, + [ + { + 'custom_queries': [ + {'query': 'foo', 'columns': [{'name': 'test.foo', 'type': 'gauge'}], 'tags': ['test:bar']}, + ], + }, + ], + ), + executor=mock_executor([[1]]), + tags=['test:foo'], + ) + query_manager.compile_queries() + query_manager.execute() + + aggregator.assert_metric('test.foo', 1, metric_type=aggregator.GAUGE, tags=['test:foo', 'test:bar']) + aggregator.assert_all_metrics_covered() + + def test_init_config(self, aggregator): + query_manager = create_query_manager( + check=AgentCheck( + 'test', + { + 'global_custom_queries': [ + {'query': 'foo', 'columns': [{'name': 'test.foo', 'type': 'gauge'}], 'tags': ['test:bar']}, + ], + }, + [{}], + ), + executor=mock_executor([[1]]), + tags=['test:foo'], + ) + query_manager.compile_queries() + query_manager.execute() + + aggregator.assert_metric('test.foo', 1, metric_type=aggregator.GAUGE, tags=['test:foo', 'test:bar']) + aggregator.assert_all_metrics_covered() + + def test_instance_override(self, aggregator): + query_manager = create_query_manager( + check=AgentCheck( + 'test', + { + 'global_custom_queries': [ + {'query': 'foo', 'columns': [{'name': 'test.foo', 'type': 'gauge'}], 'tags': ['test:bar']}, + ], + }, + [ + { + 'custom_queries': [ + {'query': 'foo', 'columns': [{'name': 'test.bar', 'type': 'gauge'}], 'tags': ['test:bar']}, + ], + }, + ], + ), + executor=mock_executor([[1]]), + tags=['test:foo'], + ) + query_manager.compile_queries() + query_manager.execute() + + aggregator.assert_metric('test.bar', 1, metric_type=aggregator.GAUGE, tags=['test:foo', 'test:bar']) + aggregator.assert_all_metrics_covered() + + def test_global_include(self, aggregator): + query_manager = create_query_manager( + check=AgentCheck( + 'test', + { + 'global_custom_queries': [ + {'query': 'foo', 'columns': [{'name': 'test.foo', 'type': 'gauge'}], 'tags': ['test:bar']}, + ], + }, + [ + { + 'use_global_custom_queries': 'extend', + 'custom_queries': [ + {'query': 'foo', 'columns': [{'name': 'test.bar', 'type': 'gauge'}], 'tags': ['test:bar']}, + ], + }, + ], + ), + executor=mock_executor([[1]]), + tags=['test:foo'], + ) + query_manager.compile_queries() + query_manager.execute() + + aggregator.assert_metric('test.foo', 1, metric_type=aggregator.GAUGE, tags=['test:foo', 'test:bar']) + aggregator.assert_metric('test.bar', 1, metric_type=aggregator.GAUGE, tags=['test:foo', 'test:bar']) + aggregator.assert_all_metrics_covered() + + def test_global_exclude(self, aggregator): + query_manager = create_query_manager( + check=AgentCheck( + 'test', + { + 'global_custom_queries': [ + {'query': 'foo', 'columns': [{'name': 'test.foo', 'type': 'gauge'}], 'tags': ['test:bar']}, + ], + }, + [ + { + 'use_global_custom_queries': False, + 'custom_queries': [ + {'query': 'foo', 'columns': [{'name': 'test.bar', 'type': 'gauge'}], 'tags': ['test:bar']}, + ], + }, + ], + ), + executor=mock_executor([[1]]), + tags=['test:foo'], + ) + query_manager.compile_queries() + query_manager.execute() + + aggregator.assert_metric('test.bar', 1, metric_type=aggregator.GAUGE, tags=['test:foo', 'test:bar']) + aggregator.assert_all_metrics_covered() + + def test_deduplication(self, aggregator): + query_manager = create_query_manager( + check=AgentCheck( + 'test', + { + 'global_custom_queries': [ + {'query': 'foo', 'columns': [{'name': 'test.foo', 'type': 'gauge'}], 'tags': ['test:bar']}, + ], + }, + [ + { + 'use_global_custom_queries': 'extend', + 'custom_queries': [ + {'query': 'foo', 'columns': [{'name': 'test.foo', 'type': 'gauge'}], 'tags': ['test:bar']}, + ], + }, + ], + ), + executor=mock_executor([[1]]), + tags=['test:foo'], + ) + query_manager.compile_queries() + query_manager.execute() + + aggregator.assert_metric('test.foo', 1, count=1, metric_type=aggregator.GAUGE, tags=['test:foo', 'test:bar']) + aggregator.assert_all_metrics_covered() + + def test_default_name(self): + query_manager = create_query_manager( + check=AgentCheck( + 'test', + { + 'global_custom_queries': [ + {'query': 'foo', 'columns': [{'name': 'test.foo', 'type': 'gauge'}], 'tags': ['test:bar']}, + ], + }, + [ + { + 'use_global_custom_queries': 'extend', + 'custom_queries': [{'columns': [{'name': 'test.bar', 'type': 'gauge'}], 'tags': ['test:bar']}], + }, + ], + ), + executor=mock_executor([[1]]), + tags=['test:foo'], + ) + + with pytest.raises(ValueError, match='^field `query` for custom query #1 is required$'): + query_manager.compile_queries()