diff --git a/datadog_checks_base/datadog_checks/base/data/agent_requirements.in b/datadog_checks_base/datadog_checks/base/data/agent_requirements.in index 15899f03d3a26..b65e784f86ae2 100644 --- a/datadog_checks_base/datadog_checks/base/data/agent_requirements.in +++ b/datadog_checks_base/datadog_checks/base/data/agent_requirements.in @@ -7,6 +7,7 @@ binary==1.0.0 boto3==1.10.27 boto==2.46.1 botocore==1.13.42 +cachetools==3.1.1 clickhouse-cityhash==1.0.2.3 clickhouse-driver==0.1.5 contextlib2==0.6.0; python_version < "3.0" diff --git a/datadog_checks_base/datadog_checks/base/log.py b/datadog_checks_base/datadog_checks/base/log.py index c0fe3e2c33e54..760c758f17681 100644 --- a/datadog_checks_base/datadog_checks/base/log.py +++ b/datadog_checks_base/datadog_checks/base/log.py @@ -66,6 +66,12 @@ def trace(self, msg, *args, **kwargs): def warn(self, msg, *args, **kwargs): self.log(logging.WARNING, msg, *args, **kwargs) + def getEffectiveLevel(self): + """ + Get the effective level for the underlying logger. + """ + return self.logger.getEffectiveLevel() + class CheckLogFormatter(logging.Formatter): def format(self, record): diff --git a/datadog_checks_base/datadog_checks/base/stubs/datadog_agent.py b/datadog_checks_base/datadog_checks/base/stubs/datadog_agent.py index c9ab622859c1e..6e86a4a76ee0f 100644 --- a/datadog_checks_base/datadog_checks/base/stubs/datadog_agent.py +++ b/datadog_checks_base/datadog_checks/base/stubs/datadog_agent.py @@ -1,6 +1,7 @@ # (C) Datadog, Inc. 2018-present # All rights reserved # Licensed under a 3-clause BSD style license (see LICENSE) +import re class DatadogAgentStub(object): @@ -65,7 +66,11 @@ def read_persistent_cache(self, key): return self._cache.get(key, '') def obfuscate_sql(self, query): - return query + # this is only whitespace cleanup, NOT obfuscation + return re.sub(r'\s+', ' ', query or '') + + def obfuscate_sql_exec_plan(self, plan, normalize=False): + return plan # Use the stub as a singleton diff --git a/datadog_checks_base/datadog_checks/base/utils/db/sql.py b/datadog_checks_base/datadog_checks/base/utils/db/sql.py index ec338e5e623e8..0be6e17595d82 100644 --- a/datadog_checks_base/datadog_checks/base/utils/db/sql.py +++ b/datadog_checks_base/datadog_checks/base/utils/db/sql.py @@ -4,11 +4,15 @@ # Licensed under a 3-clause BSD style license (see LICENSE) from __future__ import unicode_literals +import json + import mmh3 # Unicode character "Arabic Decimal Separator" (U+066B) is a character which looks like an ascii # comma, but is not treated like a comma when parsing metrics tags. This is used to replace # commas so that tags which have commas in them (such as SQL queries) properly display. + + ARABIC_DECIMAL_SEPARATOR = ',' @@ -40,3 +44,14 @@ def normalize_query_tag(query): """ query = query.replace(', ', '{} '.format(ARABIC_DECIMAL_SEPARATOR)).replace(',', ARABIC_DECIMAL_SEPARATOR) return query + + +def compute_exec_plan_signature(normalized_json_plan): + """ + Given an already normalized json string query execution plan, generate its 64-bit hex signature. 
+ TODO: try to push this logic into the agent go code to avoid the two extra json serialization steps here + """ + if not normalized_json_plan: + return None + with_sorted_keys = json.dumps(json.loads(normalized_json_plan), sort_keys=True) + return format(mmh3.hash64(with_sorted_keys, signed=False)[0], 'x') diff --git a/datadog_checks_base/datadog_checks/base/utils/db/statement_samples.py b/datadog_checks_base/datadog_checks/base/utils/db/statement_samples.py new file mode 100644 index 0000000000000..b938f886c1e3b --- /dev/null +++ b/datadog_checks_base/datadog_checks/base/utils/db/statement_samples.py @@ -0,0 +1,134 @@ +import datetime +import decimal +import itertools +import json +import logging + +import requests +from requests.adapters import HTTPAdapter, Retry + +try: + import datadog_agent + + using_stub_datadog_agent = False +except ImportError: + from ....stubs import datadog_agent + + using_stub_datadog_agent = True + +logger = logging.getLogger(__file__) + + +class EventEncoder(json.JSONEncoder): + def default(self, o): + if isinstance(o, decimal.Decimal): + return float(o) + if isinstance(o, (datetime.date, datetime.datetime)): + return o.isoformat() + return super(EventEncoder, self).default(o) + + +def _chunks(items, n): + it = iter(items) + while True: + chunk = tuple(itertools.islice(it, n)) + if not chunk: + return + yield chunk + + +def _new_api_session(api_key): + http = requests.Session() + http.mount( + "https://", HTTPAdapter(max_retries=Retry(connect=2, read=2, redirect=2, status=2, method_whitelist=['POST'])) + ) + http.headers.update({'DD-API-KEY': api_key}) + return http + + +def _event_intake_url(host): + if host.endswith("."): + host = host[:-1] + if not host.startswith("https://"): + host = "https://" + host + return host + "/v1/input" + + +default_dbm_url = "dbquery-http-intake.logs.datadoghq.com" + + +def _load_event_endpoints_from_config(config_prefix, default_url): + """ + Returns a list of requests sessions and their endpoint urls [(http, url), ...] + Requests sessions are initialized the first time this is called and reused thereafter + :return: list of (http, url) + + :param config_prefix: + :param default_url: + :return: + """ + url = _event_intake_url(datadog_agent.get_config('{}.dd_url'.format(config_prefix)) or default_url) + endpoints = [(_new_api_session(datadog_agent.get_config('api_key')), url)] + logger.debug("initializing event endpoints from %s. url=%s", config_prefix, url) + + for additional_endpoint in datadog_agent.get_config('{}.additional_endpoints'.format(config_prefix)) or []: + api_key, host = additional_endpoint.get('api_key'), additional_endpoint.get('host') + missing_keys = [k for k, v in [('api_key', api_key), ('host', host)] if not v] + if missing_keys: + logger.warning( + "invalid event endpoint found in %s.additional_endpoints. missing required keys %s", + config_prefix, + ', '.join(missing_keys), + ) + continue + url = _event_intake_url(host) + endpoints.append((_new_api_session(api_key), url)) + logger.debug("initializing additional event endpoint from %s. 
url=%s", config_prefix, url) + + return endpoints + + +class StatementSamplesClient: + def __init__(self): + self._endpoints = _load_event_endpoints_from_config("database_monitoring", default_dbm_url) + + def submit_events(self, events): + """ + Submit the statement sample events to the event intake + :return: submitted_count, failed_count + """ + submitted_count = 0 + failed_count = 0 + for chunk in _chunks(events, 100): + for http, url in self._endpoints: + try: + r = http.request( + 'post', + url, + data=json.dumps(chunk, cls=EventEncoder), + timeout=5, + headers={'Content-Type': 'application/json'}, + ) + r.raise_for_status() + logger.debug("Submitted %s statement samples to %s", len(chunk), url) + submitted_count += len(chunk) + except requests.HTTPError as e: + logger.warning("Failed to submit statement samples to %s: %s", url, e) + failed_count += len(chunk) + except Exception: + logger.exception("Failed to submit statement samples to %s", url) + failed_count += len(chunk) + return submitted_count, failed_count + + +class StubStatementSamplesClient: + def __init__(self): + self._events = [] + + def submit_events(self, events): + events = list(events) + self._events.extend(events) + return len(events), 0 + + +statement_samples_client = StubStatementSamplesClient() if using_stub_datadog_agent else StatementSamplesClient() diff --git a/datadog_checks_base/datadog_checks/base/utils/db/utils.py b/datadog_checks_base/datadog_checks/base/utils/db/utils.py index 4f480deb837c9..8d694e0a17776 100644 --- a/datadog_checks_base/datadog_checks/base/utils/db/utils.py +++ b/datadog_checks_base/datadog_checks/base/utils/db/utils.py @@ -1,8 +1,18 @@ # (C) Datadog, Inc. 2019-present # All rights reserved # Licensed under a 3-clause BSD style license (see LICENSE) +import logging +import socket +import time from itertools import chain +try: + import datadog_agent +except ImportError: + from ....stubs import datadog_agent + +logger = logging.getLogger(__file__) + # AgentCheck methods to transformer name e.g. set_metadata -> metadata SUBMISSION_METHODS = { 'gauge': 'gauge', @@ -19,11 +29,9 @@ def create_submission_transformer(submit_method): - # During the compilation phase every transformer will have access to all the others and may be # passed the first arguments (e.g. name) that will be forwarded the actual AgentCheck methods. def get_transformer(_transformers, *creation_args, **modifiers): - # The first argument of every transformer is a map of named references to collected values. def transformer(_sources, *call_args, **kwargs): kwargs.update(modifiers) @@ -52,3 +60,58 @@ def transformer(sources, **kwargs): transformer = column_transformer return transformer + + +class ConstantRateLimiter: + """ + Basic rate limiter that sleeps long enough to ensure the rate limit is not exceeded. Not thread safe. 
+ """ + + def __init__(self, rate_limit_s): + """ + :param rate_limit_s: rate limit in seconds + """ + self.rate_limit_s = rate_limit_s + self.period_s = 1 / rate_limit_s if rate_limit_s > 0 else 0 + self.last_event = 0 + + def sleep(self): + """ + Sleeps long enough to enforce the rate limit + """ + elapsed_s = time.time() - self.last_event + sleep_amount = max(self.period_s - elapsed_s, 0) + time.sleep(sleep_amount) + self.last_event = time.time() + + +def resolve_db_host(db_host): + agent_hostname = datadog_agent.get_hostname() + if not db_host or db_host in {'localhost', '127.0.0.1'}: + return agent_hostname + + try: + host_ip = socket.gethostbyname(db_host) + except socket.gaierror as e: + # could be connecting via a unix domain socket + logger.debug( + "failed to resolve DB host '%s' due to socket.gaierror(%s). falling back to agent hostname: %s", + db_host, + e, + agent_hostname, + ) + return agent_hostname + + try: + agent_host_ip = socket.gethostbyname(agent_hostname) + if agent_host_ip == host_ip: + return agent_hostname + except socket.gaierror as e: + logger.debug( + "failed to resolve agent host '%s' due to socket.gaierror(%s). using DB host: %s", + agent_hostname, + e, + db_host, + ) + + return db_host diff --git a/postgres/datadog_checks/postgres/config.py b/postgres/datadog_checks/postgres/config.py index e86bf71bc9735..444a4bc724cc5 100644 --- a/postgres/datadog_checks/postgres/config.py +++ b/postgres/datadog_checks/postgres/config.py @@ -58,11 +58,15 @@ def __init__(self, instance): self.service_check_tags = self._get_service_check_tags() self.custom_metrics = self._get_custom_metrics(instance.get('custom_metrics', [])) self.max_relations = int(instance.get('max_relations', 300)) + self.min_collection_interval = instance.get('min_collection_interval', 15) # Deep Database monitoring adds additional telemetry for statement metrics self.deep_database_monitoring = is_affirmative(instance.get('deep_database_monitoring', False)) # Support a custom view when datadog user has insufficient privilege to see queries self.pg_stat_statements_view = instance.get('pg_stat_statements_view', 'pg_stat_statements') + # statement samples & execution plans + self.pg_stat_activity_view = instance.get('pg_stat_activity_view', 'pg_stat_activity') + self.statement_samples_config = instance.get('statement_samples', {}) or {} def _build_tags(self, custom_tags): # Clean up tags in case there was a None entry in the instance diff --git a/postgres/datadog_checks/postgres/postgres.py b/postgres/datadog_checks/postgres/postgres.py index 97e93daf6d90e..b6faadcb65421 100644 --- a/postgres/datadog_checks/postgres/postgres.py +++ b/postgres/datadog_checks/postgres/postgres.py @@ -9,6 +9,7 @@ from datadog_checks.base import AgentCheck from datadog_checks.postgres.metrics_cache import PostgresMetricsCache +from datadog_checks.postgres.statement_samples import PostgresStatementSamples from datadog_checks.postgres.statements import PostgresStatementMetrics from .config import PostgresConfig @@ -52,6 +53,7 @@ def __init__(self, name, init_config, instances): self.config = PostgresConfig(self.instance) self.metrics_cache = PostgresMetricsCache(self.config) self.statement_metrics = PostgresStatementMetrics(self.config) + self.statement_samples = PostgresStatementSamples(self, self.config) self._clean_state() def _clean_state(self): @@ -286,6 +288,32 @@ def _collect_stats(self, instance_tags): cursor.close() + def _new_connection(self): + if self.config.host == 'localhost' and self.config.password == '': + # Use 
ident method + connection_string = "user=%s dbname=%s application_name=%s" % ( + self.config.user, + self.config.dbname, + self.config.application_name, + ) + if self.config.query_timeout: + connection_string += " options='-c statement_timeout=%s'" % self.config.query_timeout + return psycopg2.connect(connection_string) + else: + args = { + 'host': self.config.host, + 'user': self.config.user, + 'password': self.config.password, + 'database': self.config.dbname, + 'sslmode': self.config.ssl_mode, + 'application_name': self.config.application_name, + } + if self.config.port: + args['port'] = self.config.port + if self.config.query_timeout: + args['options'] = '-c statement_timeout=%s' % self.config.query_timeout + return psycopg2.connect(**args) + def _connect(self): """Get and memoize connections to instances""" if self.db and self.db.closed: @@ -297,30 +325,7 @@ def _connect(self): # Some transaction went wrong and the connection is in an unhealthy state. Let's fix that self.db.rollback() else: - if self.config.host == 'localhost' and self.config.password == '': - # Use ident method - connection_string = "user=%s dbname=%s application_name=%s" % ( - self.config.user, - self.config.dbname, - self.config.application_name, - ) - if self.config.query_timeout: - connection_string += " options='-c statement_timeout=%s'" % self.config.query_timeout - self.db = psycopg2.connect(connection_string) - else: - args = { - 'host': self.config.host, - 'user': self.config.user, - 'password': self.config.password, - 'database': self.config.dbname, - 'sslmode': self.config.ssl_mode, - 'application_name': self.config.application_name, - } - if self.config.port: - args['port'] = self.config.port - if self.config.query_timeout: - args['options'] = '-c statement_timeout=%s' % self.config.query_timeout - self.db = psycopg2.connect(**args) + self.db = self._new_connection() def _collect_custom_queries(self, tags): """ @@ -436,6 +441,8 @@ def check(self, _): self._collect_custom_queries(tags) if self.config.deep_database_monitoring: self._collect_per_statement_metrics(tags) + self.statement_samples.run_sampler(tags) + except Exception as e: self.log.error("Unable to collect postgres metrics.") self._clean_state() diff --git a/postgres/datadog_checks/postgres/statement_samples.py b/postgres/datadog_checks/postgres/statement_samples.py new file mode 100644 index 0000000000000..2d1354b83b614 --- /dev/null +++ b/postgres/datadog_checks/postgres/statement_samples.py @@ -0,0 +1,353 @@ +import json +import logging +import os +import re +import time +from concurrent.futures.thread import ThreadPoolExecutor + +import psycopg2 +from cachetools import TTLCache + +try: + import datadog_agent +except ImportError: + from ..stubs import datadog_agent + +from datadog_checks.base import is_affirmative +from datadog_checks.base.log import get_check_logger +from datadog_checks.base.utils.db.sql import compute_exec_plan_signature, compute_sql_signature +from datadog_checks.base.utils.db.statement_samples import statement_samples_client +from datadog_checks.base.utils.db.utils import ConstantRateLimiter, resolve_db_host + +VALID_EXPLAIN_STATEMENTS = frozenset({'select', 'table', 'delete', 'insert', 'replace', 'update'}) + +# columns from pg_stat_activity which correspond to attributes common to all databases and are therefore stored in +# under other standard keys +pg_stat_activity_sample_exclude_keys = { + # we process & obfuscate this separately + 'query', + # stored separately + 'application_name', + 'datname', + 'usename', + 
'client_addr', + 'client_hostname', + 'client_port', +} + +# +PG_STAT_ACTIVITY_QUERY = re.sub( + r'\s+', + ' ', + """ + SELECT * FROM {pg_stat_activity_view} + WHERE datname = %s + AND coalesce(TRIM(query), '') != '' + AND query_start IS NOT NULL +""", +).strip() + + +class PostgresStatementSamples(object): + """ + Collects statement samples and execution plans. + """ + + executor = ThreadPoolExecutor() + + def __init__(self, check, config): + self._check = check + self._db = None + self._config = config + self._log = get_check_logger() + self._activity_last_query_start = None + self._last_check_run = 0 + self._collection_loop_future = None + self._tags = None + self._tags_str = None + self._service = "postgres" + self._db_hostname = resolve_db_host(self._config.host) + self._enabled = is_affirmative(self._config.statement_samples_config.get('enabled', False)) + self._run_sync = is_affirmative(self._config.statement_samples_config.get('run_sync', False)) + self._rate_limiter = ConstantRateLimiter( + float(self._config.statement_samples_config.get('collections_per_second', 1)) + ) + self._explain_function = self._config.statement_samples_config.get( + 'explain_function', 'datadog.explain_statement' + ) + + # explained_statements_cache: limit how often we try to re-explain the same query + self._explained_statements_cache = TTLCache( + maxsize=int(self._config.statement_samples_config.get('explained_statements_cache_maxsize', 5000)), + ttl=60 * 60 / int(self._config.statement_samples_config.get('explained_statements_per_hour_per_query', 60)), + ) + + # seen_samples_cache: limit the ingestion rate per (query_signature, plan_signature) + self._seen_samples_cache = TTLCache( + # assuming ~100 bytes per entry (query & plan signature, key hash, 4 pointers (ordered dict), expiry time) + # total size: 10k * 100 = 1 Mb + maxsize=int(self._config.statement_samples_config.get('seen_samples_cache_maxsize', 10000)), + ttl=60 * 60 / int(self._config.statement_samples_config.get('samples_per_hour_per_query', 15)), + ) + + def run_sampler(self, tags): + """ + start the sampler thread if not already running + :param tags: + :return: + """ + if not self._enabled: + self._log.debug("Statement sampler not enabled") + return + self._tags = tags + self._tags_str = ','.join(self._tags) + for t in self._tags: + if t.startswith('service:'): + self._service = t[len('service:') :] + self._last_check_run = time.time() + if self._run_sync or is_affirmative(os.environ.get('DBM_STATEMENT_SAMPLER_RUN_SYNC', "false")): + self._log.debug("Running statement sampler synchronously") + self._collect_statement_samples() + elif self._collection_loop_future is None or not self._collection_loop_future.running(): + self._collection_loop_future = PostgresStatementSamples.executor.submit(self._collection_loop) + else: + self._log.debug("Statement sampler collection loop already running") + + def _get_new_pg_stat_activity(self): + start_time = time.time() + query = PG_STAT_ACTIVITY_QUERY.format(pg_stat_activity_view=self._config.pg_stat_activity_view) + with self._get_db().cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor: + params = (self._config.dbname,) + if self._activity_last_query_start: + query = query + " AND query_start > %s" + params = params + (self._activity_last_query_start,) + self._log.debug("Running query [%s] %s", query, params) + cursor.execute(query, params) + rows = cursor.fetchall() + self._check.histogram( + "dd.postgres.get_new_pg_stat_activity.time", (time.time() - start_time) * 1000, tags=self._tags + 
        )
+        self._check.histogram("dd.postgres.get_new_pg_stat_activity.rows", len(rows), tags=self._tags)
+        self._log.debug("Loaded %s rows from %s", len(rows), self._config.pg_stat_activity_view)
+        return rows
+
+    def _filter_valid_statement_rows(self, rows):
+        insufficient_privilege_count = 0
+        total_count = 0
+        for row in rows:
+            total_count += 1
+            if not row['datname']:
+                continue
+            query = row['query']
+            if not query:
+                continue
+            if query == '<insufficient privilege>':
+                insufficient_privilege_count += 1
+                continue
+            if self._activity_last_query_start is None or row['query_start'] > self._activity_last_query_start:
+                self._activity_last_query_start = row['query_start']
+            yield row
+        if insufficient_privilege_count > 0:
+            self._log.warning(
+                "Insufficient privilege for %s/%s queries when collecting from %s.",
+                insufficient_privilege_count,
+                total_count,
+                self._config.pg_stat_activity_view,
+            )
+            self._check.count(
+                "dd.postgres.statement_samples.error",
+                insufficient_privilege_count,
+                tags=self._tags + ["error:insufficient-privilege"],
+            )
+
+    def _get_db(self):
+        # while psycopg2 is threadsafe (meaning in theory we should be able to use the same connection as the parent
+        # check), the parent doesn't use autocommit and instead calls commit() and rollback() explicitly, meaning
+        # it can cause strange clashing issues if we're trying to use the same connection from another thread here.
+        # since the statement sampler runs continuously it's best we have our own connection here with autocommit
+        # enabled
+        if not self._db or self._db.closed:
+            self._db = self._check._new_connection()
+            self._db.set_session(autocommit=True)
+        if self._db.status != psycopg2.extensions.STATUS_READY:
+            # Some transaction went wrong and the connection is in an unhealthy state. Let's fix that
+            self._db.rollback()
+        return self._db
+
+    def _collection_loop(self):
+        try:
+            self._log.info("Starting statement sampler collection loop")
+            while True:
+                if time.time() - self._last_check_run > self._config.min_collection_interval * 2:
+                    self._log.info("Sampler collection loop stopping due to check inactivity")
+                    self._check.count("dd.postgres.statement_samples.collection_loop_inactive_stop", 1, tags=self._tags)
+                    break
+                self._collect_statement_samples()
+        except psycopg2.errors.DatabaseError as e:
+            self._log.warning(
+                "Statement sampler database error: %s", e, exc_info=self._log.getEffectiveLevel() == logging.DEBUG
+            )
+            self._check.count(
+                "dd.postgres.statement_samples.error",
+                1,
+                tags=self._tags + ["error:database-{}".format(type(e))],
+            )
+        except Exception as e:
+            self._log.exception("Statement sampler collection loop crash")
+            self._check.count(
+                "dd.postgres.statement_samples.error",
+                1,
+                tags=self._tags + ["error:collection-loop-crash-{}".format(type(e))],
+            )
+        finally:
+            self.close()
+
+    def close(self):
+        if self._db and not self._db.closed:
+            try:
+                self._db.close()
+            except Exception:
+                self._log.exception("failed to close DB connection")
+
+    def _collect_statement_samples(self):
+        self._rate_limiter.sleep()
+        start_time = time.time()
+        rows = self._get_new_pg_stat_activity()
+        rows = self._filter_valid_statement_rows(rows)
+        events = self._explain_pg_stat_activity(rows)
+        submitted_count, failed_count = statement_samples_client.submit_events(events)
+        elapsed_ms = (time.time() - start_time) * 1000
+        self._check.histogram("dd.postgres.collect_statement_samples.time", elapsed_ms, tags=self._tags)
+        self._check.count(
+            "dd.postgres.collect_statement_samples.events_submitted.count", submitted_count, tags=self._tags
+        )
+        self._check.count(
"dd.postgres.statement_samples.error", failed_count, tags=self._tags + ["error:submit-events"] + ) + self._check.gauge( + "dd.postgres.collect_statement_samples.seen_samples_cache.len", + len(self._seen_samples_cache), + tags=self._tags, + ) + self._check.gauge( + "dd.postgres.collect_statement_samples.explained_statements_cache.len", + len(self._explained_statements_cache), + tags=self._tags, + ) + + def _can_explain_statement(self, obfuscated_statement): + if obfuscated_statement.startswith('SELECT {}'.format(self._explain_function)): + return False + if obfuscated_statement.startswith('autovacuum:'): + return False + if obfuscated_statement.split(' ', 1)[0].lower() not in VALID_EXPLAIN_STATEMENTS: + return False + return True + + def _run_explain(self, statement, obfuscated_statement): + if not self._can_explain_statement(obfuscated_statement): + return None + with self._get_db().cursor() as cursor: + try: + start_time = time.time() + self._log.debug("Running query: %s(%s)", self._explain_function, obfuscated_statement) + cursor.execute( + """SELECT {explain_function}($stmt${statement}$stmt$)""".format( + explain_function=self._explain_function, statement=statement + ) + ) + result = cursor.fetchone() + self._check.histogram( + "dd.postgres.run_explain.time", (time.time() - start_time) * 1000, tags=self._tags + ) + except psycopg2.errors.UndefinedFunction: + self._log.warning( + "Failed to collect execution plan due to undefined explain_function=%s", + self._explain_function, + ) + self._check.count( + "dd.postgres.statement_samples.error", 1, tags=self._tags + ["error:explain-undefined-function"] + ) + return None + except Exception as e: + self._log.debug("Failed to collect execution plan. query='%s'", obfuscated_statement, exc_info=1) + self._check.count( + "dd.postgres.statement_samples.error", 1, tags=self._tags + ["error:explain-{}".format(type(e))] + ) + return None + if not result or len(result) < 1 or len(result[0]) < 1: + return None + return result[0][0] + + def _collect_plan_for_statement(self, row): + try: + obfuscated_statement = datadog_agent.obfuscate_sql(row['query']) + except Exception as e: + self._log.debug("Failed to obfuscate statement: %s", e) + self._check.count("dd.postgres.statement_samples.error", 1, tags=self._tags + ["error:sql-obfuscate"]) + return None + + # limit the rate of explains done to the database + query_signature = compute_sql_signature(obfuscated_statement) + if query_signature in self._explained_statements_cache: + return None + self._explained_statements_cache[query_signature] = True + + # Plans have several important signatures to tag events with. Note that for postgres, the + # query_signature and resource_hash will be the same value. 
+ # - `plan_signature` - hash computed from the normalized JSON plan to group identical plan trees + # - `resource_hash` - hash computed off the raw sql text to match apm resources + # - `query_signature` - hash computed from the raw sql text to match query metrics + plan_dict = self._run_explain(row['query'], obfuscated_statement) + plan, normalized_plan, obfuscated_plan, plan_signature, plan_cost = None, None, None, None, None + if plan_dict: + plan = json.dumps(plan_dict) + normalized_plan = datadog_agent.obfuscate_sql_exec_plan(plan, normalize=True) + obfuscated_plan = datadog_agent.obfuscate_sql_exec_plan(plan) + plan_signature = compute_exec_plan_signature(normalized_plan) + plan_cost = plan_dict.get('Plan', {}).get('Total Cost', 0.0) or 0.0 + + statement_plan_sig = (query_signature, plan_signature) + if statement_plan_sig not in self._seen_samples_cache: + self._seen_samples_cache[statement_plan_sig] = True + event = { + "host": self._db_hostname, + "service": self._service, + "ddsource": "postgres", + "ddtags": self._tags_str, + "network": { + "client": { + "ip": row.get('client_addr', None), + "port": row.get('client_port', None), + "hostname": row.get('client_hostname', None), + } + }, + "db": { + "instance": row.get('datname', None), + "plan": {"definition": obfuscated_plan, "cost": plan_cost, "signature": plan_signature}, + "query_signature": query_signature, + "resource_hash": query_signature, + "application": row.get('application_name', None), + "user": row['usename'], + "statement": obfuscated_statement, + }, + 'postgres': {k: v for k, v in row.items() if k not in pg_stat_activity_sample_exclude_keys}, + } + if row['state'] in {'idle', 'idle in transaction'}: + if row['state_change'] and row['query_start']: + event['duration'] = (row['state_change'] - row['query_start']).total_seconds() * 1e9 + event['timestamp'] = time.mktime(row['state_change'].timetuple()) * 1000 + else: + event['timestamp'] = time.time() * 1000 + return event + + def _explain_pg_stat_activity(self, rows): + for row in rows: + try: + event = self._collect_plan_for_statement(row) + if event: + yield event + except Exception: + self._log.exception("Crashed trying to collect execution plan for statement") + self._check.count( + "dd.postgres.statement_samples.error", + 1, + tags=self._tags + ["error:collect-plan-for-statement-crash"], + ) diff --git a/postgres/requirements.in b/postgres/requirements.in index 2bf3b334b33d0..c569ac834604e 100644 --- a/postgres/requirements.in +++ b/postgres/requirements.in @@ -1,2 +1,4 @@ +cachetools==3.1.1 psycopg2-binary==2.8.4 semver==2.9.0 +futures==3.3.0; python_version < '3.0' diff --git a/postgres/tests/compose/resources/01_postgres.sql b/postgres/tests/compose/resources/01_postgres.sql index c8916054c52a6..4766c002c067f 100644 --- a/postgres/tests/compose/resources/01_postgres.sql +++ b/postgres/tests/compose/resources/01_postgres.sql @@ -1,5 +1,9 @@ CREATE USER datadog WITH PASSWORD 'datadog'; +CREATE USER bob WITH PASSWORD 'bob'; +CREATE USER dd_admin WITH PASSWORD 'dd_admin'; +ALTER USER dd_admin WITH SUPERUSER; GRANT SELECT ON pg_stat_database TO datadog; CREATE DATABASE datadog_test; GRANT ALL PRIVILEGES ON DATABASE datadog_test TO datadog; CREATE DATABASE dogs; +GRANT USAGE on SCHEMA public to bob; diff --git a/postgres/tests/compose/resources/02_setup.sh b/postgres/tests/compose/resources/02_setup.sh new file mode 100755 index 0000000000000..e889475dd5520 --- /dev/null +++ b/postgres/tests/compose/resources/02_setup.sh @@ -0,0 +1,38 @@ +#!/bin/bash +set -e + +# 
pg_monitor is only available on 10+ +if [[ !("$PG_MAJOR" == 9.* ) ]]; then +psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" datadog_test <<-'EOSQL' + GRANT pg_monitor TO datadog; +EOSQL +fi + +# setup extensions & functions required for collection of statement metrics & samples +psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" datadog_test <<-'EOSQL' + CREATE EXTENSION pg_stat_statements SCHEMA public; + GRANT SELECT ON pg_stat_statements TO datadog; + + CREATE SCHEMA datadog; + GRANT USAGE ON SCHEMA datadog TO datadog; + + CREATE OR REPLACE FUNCTION datadog.explain_statement(l_query text, out explain JSON) RETURNS SETOF JSON AS + $$ + BEGIN + RETURN QUERY EXECUTE 'EXPLAIN (FORMAT JSON) ' || l_query; + END; + $$ + LANGUAGE plpgsql + RETURNS NULL ON NULL INPUT + SECURITY DEFINER; + + ALTER FUNCTION datadog.explain_statement(l_query text, out explain json) OWNER TO postgres; + + CREATE OR REPLACE FUNCTION datadog.pg_stat_activity() RETURNS SETOF pg_stat_activity AS + $$ SELECT * FROM pg_catalog.pg_stat_activity; $$ + LANGUAGE sql + SECURITY DEFINER; + + ALTER FUNCTION datadog.pg_stat_activity() owner to postgres; + +EOSQL diff --git a/postgres/tests/compose/resources/02_load_data.sh b/postgres/tests/compose/resources/03_load_data.sh similarity index 94% rename from postgres/tests/compose/resources/02_load_data.sh rename to postgres/tests/compose/resources/03_load_data.sh index db0a7f14a5f86..6297fbdc131c2 100755 --- a/postgres/tests/compose/resources/02_load_data.sh +++ b/postgres/tests/compose/resources/03_load_data.sh @@ -2,8 +2,6 @@ set -e psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" datadog_test <<-EOSQL - CREATE EXTENSION pg_stat_statements SCHEMA public; - GRANT SELECT ON pg_stat_statements TO datadog; CREATE TABLE persons (personid SERIAL, lastname VARCHAR(255), firstname VARCHAR(255), address VARCHAR(255), city VARCHAR(255)); INSERT INTO persons (lastname, firstname, address, city) VALUES ('Cavaille', 'Leo', 'Midtown', 'New York'), ('Someveryveryveryveryveryveryveryveryveryverylongname', 'something', 'Avenue des Champs Elysees', 'Beautiful city of lights'); CREATE TABLE personsdup1 (personid SERIAL, lastname VARCHAR(255), firstname VARCHAR(255), address VARCHAR(255), city VARCHAR(255)); @@ -13,6 +11,7 @@ psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" datadog_test <<-EOSQL SELECT * FROM persons; SELECT * FROM persons; SELECT * FROM persons; + GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO bob; EOSQL psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" dogs <<-EOSQL diff --git a/postgres/tests/test_pg_integration.py b/postgres/tests/test_pg_integration.py index b49637690f4df..33e61133ca5c9 100644 --- a/postgres/tests/test_pg_integration.py +++ b/postgres/tests/test_pg_integration.py @@ -1,13 +1,16 @@ # (C) Datadog, Inc. 
2010-present # All rights reserved # Licensed under Simplified BSD License (see LICENSE) +import json import socket +import time import mock import psycopg2 import pytest from semver import VersionInfo +from datadog_checks.base.utils.db.statement_samples import statement_samples_client from datadog_checks.postgres import PostgreSql from datadog_checks.postgres.util import PartialFormatter, fmt @@ -265,6 +268,129 @@ def test_statement_metrics(aggregator, integration_check, pg_instance): aggregator.assert_metric(name, count=1, tags=expected_tags) +@pytest.fixture +def bob_conn(): + conn = psycopg2.connect(host=HOST, dbname=DB_NAME, user="bob", password="bob") + yield conn + conn.close() + + +@pytest.fixture +def dbm_instance(pg_instance): + pg_instance['deep_database_monitoring'] = True + pg_instance['min_collection_interval'] = 1 + pg_instance['pg_stat_activity_view'] = "datadog.pg_stat_activity()" + pg_instance['statement_samples'] = {'enabled': True, 'run_sync': True, 'collections_per_second': 1} + return pg_instance + + +@pytest.mark.parametrize("pg_stat_activity_view", ["pg_stat_activity", "datadog.pg_stat_activity()"]) +def test_statement_samples_collect(integration_check, dbm_instance, bob_conn, pg_stat_activity_view): + dbm_instance['pg_stat_activity_view'] = pg_stat_activity_view + check = integration_check(dbm_instance) + check._connect() + # clear out any samples kept from previous runs + statement_samples_client._events = [] + query = "SELECT city FROM persons WHERE city = %s" + # we are able to see the full query (including the raw parameters) in pg_stat_activity because psycopg2 uses + # the simple query protocol, sending the whole query as a plain string to postgres. + # if a client is using the extended query protocol with prepare then the query would appear as + expected_query = query % "'hello'" + # leave bob's connection open until after the check has run to ensure we're able to see the query in + # pg_stat_activity + cursor = bob_conn.cursor() + cursor.execute(query, ("hello",)) + check.check(dbm_instance) + matching = [e for e in statement_samples_client._events if e['db']['statement'] == expected_query] + if POSTGRES_VERSION.split('.')[0] == "9" and pg_stat_activity_view == "pg_stat_activity": + # pg_monitor role exists only in version 10+ + assert len(matching) == 0, "did not expect to catch any events" + return + assert len(matching) > 0, "should have collected an event" + event = matching[0] + assert event['db']['plan']['definition'] is not None, "missing execution plan" + assert 'Plan' in json.loads(event['db']['plan']['definition']), "invalid json execution plan" + # we're expected to get a duration because the connection is in "idle" state + assert event['duration'] + cursor.close() + + +def test_statement_samples_rate_limits(aggregator, integration_check, dbm_instance, bob_conn): + dbm_instance['statement_samples']['run_sync'] = False + # one collection every two seconds + dbm_instance['statement_samples']['collections_per_second'] = 0.5 + check = integration_check(dbm_instance) + check._connect() + # clear out any samples kept from previous runs + statement_samples_client._events = [] + query = "SELECT city FROM persons WHERE city = 'hello'" + # leave bob's connection open until after the check has run to ensure we're able to see the query in + # pg_stat_activity + cursor = bob_conn.cursor() + for _ in range(5): + cursor.execute(query) + check.check(dbm_instance) + time.sleep(1) + cursor.close() + + matching = [e for e in statement_samples_client._events if 
e['db']['statement'] == query] + assert len(matching) == 1, "should have collected exactly one event due to sample rate limit" + + metrics = aggregator.metrics("dd.postgres.collect_statement_samples.time") + assert 2 < len(metrics) < 6 + + +def test_statement_samples_collection_loop_inactive_stop(aggregator, integration_check, dbm_instance): + dbm_instance['statement_samples']['run_sync'] = False + check = integration_check(dbm_instance) + check._connect() + check.check(dbm_instance) + while check.statement_samples._collection_loop_future.running(): + time.sleep(0.1) + # make sure there were no unhandled exceptions + check.statement_samples._collection_loop_future.result() + aggregator.assert_metric("dd.postgres.statement_samples.collection_loop_inactive_stop") + + +def test_statement_samples_invalid_activity_view(aggregator, integration_check, dbm_instance): + dbm_instance['pg_stat_activity_view'] = "wrong_view" + + # run synchronously, so we expect it to blow up right away + dbm_instance['statement_samples'] = {'enabled': True, 'run_sync': True} + check = integration_check(dbm_instance) + check._connect() + with pytest.raises(psycopg2.errors.UndefinedTable): + check.check(dbm_instance) + + # run asynchronously, loop will crash the first time it tries to run as the table doesn't exist + dbm_instance['statement_samples']['run_sync'] = False + check = integration_check(dbm_instance) + check._connect() + check.check(dbm_instance) + while check.statement_samples._collection_loop_future.running(): + time.sleep(0.1) + # make sure there were no unhandled exceptions + check.statement_samples._collection_loop_future.result() + aggregator.assert_metric_has_tag_prefix("dd.postgres.statement_samples.error", "error:database-") + + +@pytest.mark.parametrize( + "number_key", + [ + "explained_statements_cache_maxsize", + "explained_statements_per_hour_per_query", + "seen_samples_cache_maxsize", + "collections_per_second", + ], +) +def test_statement_samples_config_invalid_number(integration_check, pg_instance, number_key): + pg_instance['statement_samples'] = { + number_key: "not-a-number", + } + with pytest.raises(ValueError): + integration_check(pg_instance) + + def assert_state_clean(check): assert check.metrics_cache.instance_metrics is None assert check.metrics_cache.bgw_metrics is None
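
A note on the log.py change: getEffectiveLevel is exposed on the check log adapter so call sites can gate expensive logging detail on the configured level, as _collection_loop does when deciding whether to attach a traceback. A minimal sketch of the same pattern using the standard logging module:

    import logging

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger("postgres-sampler-example")

    try:
        raise RuntimeError("simulated database error")
    except RuntimeError as e:
        # include the full traceback only when debug logging is enabled
        logger.warning(
            "Statement sampler database error: %s", e, exc_info=logger.getEffectiveLevel() == logging.DEBUG
        )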
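compute_exec_plan_signature (utils/db/sql.py) re-serializes the already normalized JSON plan with sorted keys before hashing, so plans that differ only in key order share the same 64-bit hex signature. A self-contained sketch; the plan strings are illustrative, not real EXPLAIN output:

    import json

    import mmh3


    def compute_exec_plan_signature(normalized_json_plan):
        # same logic as the new helper in datadog_checks/base/utils/db/sql.py
        if not normalized_json_plan:
            return None
        with_sorted_keys = json.dumps(json.loads(normalized_json_plan), sort_keys=True)
        return format(mmh3.hash64(with_sorted_keys, signed=False)[0], 'x')


    plan_a = '{"Plan": {"Node Type": "Seq Scan", "Total Cost": 14.5}}'
    plan_b = '{"Plan": {"Total Cost": 14.5, "Node Type": "Seq Scan"}}'
    # key order does not change the signature
    assert compute_exec_plan_signature(plan_a) == compute_exec_plan_signature(plan_b)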
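In the new base statement_samples.py, _event_intake_url normalizes the configured host into the /v1/input endpoint and submit_events posts events in chunks of 100. A sketch of the two pure helpers, using the default DBM intake host from the patch:

    import itertools


    def _event_intake_url(host):
        # strip a trailing dot, force https, and append the /v1/input path
        if host.endswith("."):
            host = host[:-1]
        if not host.startswith("https://"):
            host = "https://" + host
        return host + "/v1/input"


    def _chunks(items, n):
        # yield tuples of at most n items until the iterable is exhausted
        it = iter(items)
        while True:
            chunk = tuple(itertools.islice(it, n))
            if not chunk:
                return
            yield chunk


    assert _event_intake_url("dbquery-http-intake.logs.datadoghq.com") == (
        "https://dbquery-http-intake.logs.datadoghq.com/v1/input"
    )
    assert [len(c) for c in _chunks(range(250), 100)] == [100, 100, 50]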
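ConstantRateLimiter (utils/db/utils.py) takes a rate, not a period: rate_limit_s is the maximum number of events per second, and sleep() pads each iteration to hold that rate. A usage sketch:

    import time

    from datadog_checks.base.utils.db.utils import ConstantRateLimiter

    rate_limiter = ConstantRateLimiter(2)  # allow roughly 2 iterations per second

    start = time.time()
    for _ in range(4):
        rate_limiter.sleep()  # sleeps only as long as needed to keep the rate
        # ... one collection pass would run here ...
    print("4 iterations took %.2fs" % (time.time() - start))  # roughly 1.5s at 2/sec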
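resolve_db_host picks the hostname attached to sample events: loopback-style hosts, and hosts resolving to the same IP as the agent, collapse to the agent hostname, while anything else is kept as configured. The calls below are illustrative since results depend on DNS and on datadog_agent.get_hostname():

    from datadog_checks.base.utils.db.utils import resolve_db_host

    # loopback always maps to the agent hostname
    print(resolve_db_host('localhost'))
    print(resolve_db_host('127.0.0.1'))

    # a remote host resolving to a different IP than the agent is returned unchanged;
    # unresolvable hosts (e.g. a unix socket path) fall back to the agent hostname
    print(resolve_db_host('db.example.com'))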
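An example instance configuration exercising the new options read by PostgresConfig and PostgresStatementSamples. Only the keys introduced in this patch are authoritative here; the connection fields (host, port, username, password, dbname) and all values are illustrative:

    instance = {
        # connection settings (illustrative)
        'host': 'localhost',
        'port': 5432,
        'username': 'datadog',
        'password': 'datadog',
        'dbname': 'datadog_test',
        # enables statement metrics and, together with the block below, statement samples
        'deep_database_monitoring': True,
        'min_collection_interval': 15,
        # custom view for when the datadog user cannot read pg_stat_activity directly
        'pg_stat_activity_view': 'datadog.pg_stat_activity()',
        'statement_samples': {
            'enabled': True,
            'run_sync': False,
            'collections_per_second': 1,
            'explain_function': 'datadog.explain_statement',
            'samples_per_hour_per_query': 15,
            'explained_statements_per_hour_per_query': 60,
        },
    }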
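_get_new_pg_stat_activity formats PG_STAT_ACTIVITY_QUERY with the configured view and, after the first pass, narrows the scan with query_start > last seen. A sketch of the string handling only, with a made-up timestamp and no database access:

    import re

    PG_STAT_ACTIVITY_QUERY = re.sub(
        r'\s+',
        ' ',
        """
        SELECT * FROM {pg_stat_activity_view}
        WHERE datname = %s
        AND coalesce(TRIM(query), '') != ''
        AND query_start IS NOT NULL
    """,
    ).strip()

    query = PG_STAT_ACTIVITY_QUERY.format(pg_stat_activity_view='datadog.pg_stat_activity()')
    params = ('datadog_test',)

    activity_last_query_start = '2021-02-03 04:05:06+00'  # illustrative value of the last seen query_start
    if activity_last_query_start:
        query += " AND query_start > %s"
        params += (activity_last_query_start,)

    print(query)   # single-line SQL ready for cursor.execute(query, params)
    print(params)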
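Both TTLCache instances on PostgresStatementSamples act as "seen recently" sets whose TTL is derived from an allowed per-hour rate, so a key is admitted at most once per interval. The same pattern in isolation, with sizes mirroring the defaults in the patch:

    from cachetools import TTLCache

    samples_per_hour_per_query = 15
    seen_samples_cache = TTLCache(
        maxsize=10000,  # bounds memory; roughly 100 bytes per entry per the comment in the patch
        ttl=60 * 60 / samples_per_hour_per_query,  # one admission per key every 240s
    )


    def should_submit(query_signature, plan_signature):
        key = (query_signature, plan_signature)
        if key in seen_samples_cache:
            return False  # rate limited until the TTL entry expires
        seen_samples_cache[key] = True
        return True


    assert should_submit('q1', 'p1') is True
    assert should_submit('q1', 'p1') is False
    assert should_submit('q1', 'p2') is True  # a new plan for the same query is still admitted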
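_run_explain wraps the raw statement in dollar quoting and calls the configured explain function, which 02_setup.sh creates as the SECURITY DEFINER function datadog.explain_statement running EXPLAIN (FORMAT JSON). A sketch of the SQL text the check builds, printed rather than executed:

    explain_function = 'datadog.explain_statement'
    statement = "SELECT city FROM persons WHERE city = 'hello'"

    explain_sql = """SELECT {explain_function}($stmt${statement}$stmt$)""".format(
        explain_function=explain_function, statement=statement
    )
    print(explain_sql)
    # SELECT datadog.explain_statement($stmt$SELECT city FROM persons WHERE city = 'hello'$stmt$)
    # the function runs EXPLAIN (FORMAT JSON) on the statement and returns the plan as JSON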
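For orientation, the approximate shape of one event produced by _collect_plan_for_statement. Every value below is illustrative; the postgres section carries whatever pg_stat_activity columns are not listed in pg_stat_activity_sample_exclude_keys:

    example_event = {
        "host": "pg-host-1",                    # from resolve_db_host(config.host)
        "service": "postgres",                  # replaced by a service: tag when one is present
        "ddsource": "postgres",
        "ddtags": "env:test,service:postgres",
        "timestamp": 1612345678000.0,           # milliseconds
        "duration": 1500000000,                 # nanoseconds; only set for idle / idle in transaction rows
        "network": {"client": {"ip": "10.0.0.5", "port": 49232, "hostname": None}},
        "db": {
            "instance": "datadog_test",
            "plan": {
                "definition": "{\"Plan\": {\"Node Type\": \"Seq Scan\"}}",  # obfuscated JSON plan
                "cost": 14.5,
                "signature": "a1b2c3d4e5f60718",
            },
            "query_signature": "9f8e7d6c5b4a3210",
            "resource_hash": "9f8e7d6c5b4a3210",  # same as query_signature for postgres
            "application": "psql",
            "user": "bob",
            "statement": "SELECT city FROM persons WHERE city = ?",
        },
        "postgres": {"state": "idle", "query_start": "2021-02-03 04:05:06+00"},
    }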