decouple DBM query metrics interval from check run interval
* decouple the DBM metrics collection interval from the check run interval
* set default DBM metrics collection interval to 10s
* change `statement_samples.collections_per_second` to `statement_samples.collection_interval` so it matches the new `statement_metrics.collection_interval` key

Depends on #9656

Motivation: Being able to configure the DBM metrics collection interval separately from the check run interval enables us to use a 10-second interval (by default) for the query metrics. There are various difficulties when querying metrics that have a 15-second interval (e.g., ensuring a correct rollup window for varying time ranges) that don't exist with a 10-second interval.
djova committed Jul 8, 2021
1 parent b9849ce commit 531a7f2
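
For illustration, an instance configuration exercising the renamed and new keys might look like the sketch below, expressed as the Python dict the check receives. The key names come from the commit message; the `dbm` flag and the sample values are assumptions, not part of this diff:

# Hypothetical instance config exercising the new keys; values are illustrative.
instance = {
    'dbm': True,  # assumed flag name enabling Database Monitoring
    'statement_metrics': {
        'collection_interval': 10,  # new key; 10s default, decoupled from the check run interval
    },
    'statement_samples': {
        'collection_interval': 1,  # renamed from collections_per_second (now an interval, not a rate)
    },
}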
Showing 6 changed files with 237 additions and 216 deletions.
1 change: 1 addition & 0 deletions postgres/datadog_checks/postgres/config.py
@@ -81,6 +81,7 @@ def __init__(self, instance):
         # statement samples & execution plans
         self.pg_stat_activity_view = instance.get('pg_stat_activity_view', 'pg_stat_activity')
         self.statement_samples_config = instance.get('statement_samples', {}) or {}
+        self.statement_metrics_config = instance.get('statement_metrics', {}) or {}
 
     def _build_tags(self, custom_tags):
         # Clean up tags in case there was a None entry in the instance
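The new `statement_metrics_config` dict is consumed outside the two files loaded here. A minimal sketch of how a consumer might resolve the interval, assuming the 10-second default stated in the commit message (the function and constant names are hypothetical):

DEFAULT_STATEMENT_METRICS_INTERVAL = 10  # seconds; default per the commit message (hypothetical constant)


def resolve_collection_interval(statement_metrics_config):
    # Hypothetical helper: fall back to the 10s default when the key is missing or invalid.
    try:
        interval = float(statement_metrics_config.get('collection_interval', DEFAULT_STATEMENT_METRICS_INTERVAL))
    except (TypeError, ValueError):
        return DEFAULT_STATEMENT_METRICS_INTERVAL
    return interval if interval > 0 else DEFAULT_STATEMENT_METRICS_INTERVAL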
70 changes: 65 additions & 5 deletions postgres/datadog_checks/postgres/postgres.py
@@ -2,6 +2,7 @@
 # All rights reserved
 # Licensed under Simplified BSD License (see LICENSE)
 import copy
+import threading
 from contextlib import closing
 
 import psycopg2
@@ -19,6 +20,9 @@
 
 MAX_CUSTOM_RESULTS = 100
 
+TRACK_ACTIVITY_QUERY_SIZE_QUERY = "SELECT setting FROM pg_settings WHERE name='track_activity_query_size'"
+TRACK_ACTIVITY_QUERY_SIZE_UNKNOWN_VALUE = -1
+
 
 class PostgreSql(AgentCheck):
     """Collects per-database, and optionally per-relation metrics, custom metrics"""
@@ -41,14 +45,21 @@ def __init__(self, name, init_config, instances):
         )
         self._config = PostgresConfig(self.instance)
         self.metrics_cache = PostgresMetricsCache(self._config)
-        self.statement_metrics = PostgresStatementMetrics(self, self._config)
-        self.statement_samples = PostgresStatementSamples(self, self._config)
+        self.statement_metrics = PostgresStatementMetrics(self, self._config, shutdown_callback=self._close_db_pool)
+        self.statement_samples = PostgresStatementSamples(self, self._config, shutdown_callback=self._close_db_pool)
         self._relations_manager = RelationsManager(self._config.relations)
         self._clean_state()
         self.check_initializations.append(lambda: RelationsManager.validate_relations_config(self._config.relations))
+        # The value is loaded when connecting to the main database
+        self._db_configured_track_activity_query_size = TRACK_ACTIVITY_QUERY_SIZE_UNKNOWN_VALUE
+
+        # map[dbname -> psycopg connection]
+        self._db_pool = {}
+        self._db_pool_lock = threading.Lock()
+
+    def cancel(self):
+        self.statement_samples.cancel()
+        self.statement_metrics.cancel()
 
     def _clean_state(self):
         self.log.debug("Cleaning state")
@@ -290,6 +301,55 @@ def _connect(self):
         else:
             self.db = self._new_connection(self._config.dbname)
 
+    # Reload the track_activity_query_size setting on a new connection to the main db
+    def _load_query_max_text_size(self, db):
+        try:
+            with db.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
+                self.log.debug("Running query [%s]", TRACK_ACTIVITY_QUERY_SIZE_QUERY)
+                cursor.execute(TRACK_ACTIVITY_QUERY_SIZE_QUERY)
+                row = cursor.fetchone()
+                self._db_configured_track_activity_query_size = int(row['setting'])
+        except (psycopg2.DatabaseError, psycopg2.OperationalError) as e:
+            self.log.warning("cannot read track_activity_query_size from pg_settings: %s", repr(e))
+            self.count(
+                "dd.postgres.statement_samples.error",
+                1,
+                tags=self._config.tags + ["error:load-track-activity-query-size"],
+            )
+
+    def _get_db(self, dbname):
+        """
+        Returns a memoized psycopg2 connection to `dbname` with autocommit.
+        Thread-safe as long as no transactions are used.
+        :param dbname: the database to connect to
+        :return: a psycopg2 connection
+        """
+        # TODO: migrate the rest of this check to use a connection from this pool
+        with self._db_pool_lock:
+            db = self._db_pool.get(dbname)
+            if not db or db.closed:
+                self.log.debug("initializing connection to dbname=%s", dbname)
+                db = self._new_connection(dbname)
+                db.set_session(autocommit=True)
+                self._db_pool[dbname] = db
+            if db.status != psycopg2.extensions.STATUS_READY:
+                # Some transaction went wrong and the connection is in an unhealthy state. Let's fix that
+                db.rollback()
+            if self._config.dbname == dbname:
+                self._load_query_max_text_size(db)
+            return db
+
+    def _close_db_pool(self):
+        # TODO: add automatic aging out of connections after some time
+        with self._db_pool_lock:
+            for dbname, db in self._db_pool.items():
+                if db and not db.closed:
+                    try:
+                        db.close()
+                    except Exception:
+                        self.log.exception("failed to close DB connection for db=%s", dbname)
+                self._db_pool[dbname] = None
+
     def _collect_custom_queries(self, tags):
         """
         Given a list of custom_queries, execute each query and parse the result for metrics
@@ -398,11 +458,11 @@ def check(self, _):
             self._collect_stats(tags)
             self._collect_custom_queries(tags)
             if self._config.dbm_enabled:
-                self.statement_metrics.collect_per_statement_metrics(self.db, self.version, tags)
-                self.statement_samples.run_sampler(tags)
+                self.statement_metrics.run_job_loop(tags)
+                self.statement_samples.run_job_loop(tags)
 
         except Exception as e:
-            self.log.error("Unable to collect postgres metrics.")
+            self.log.exception("Unable to collect postgres metrics.")
             self._clean_state()
             self.db = None
             message = u'Error establishing connection to postgres://{}:{}/{}, error is {}'.format(
(diffs for the remaining 4 changed files were not loaded)
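
The decoupling itself lives behind the `run_job_loop()` / `cancel()` calls, implemented in the files not loaded above. Below is a minimal sketch of the pattern those calls imply, using a plain background thread; everything here except the `run_job_loop()`, `cancel()`, and `shutdown_callback` names is an assumption for illustration, not the shipped implementation:

import threading


class DBMCollectionJob:
    """Illustrative background job that collects on its own interval, independent of check runs."""

    def __init__(self, collection_interval=10, shutdown_callback=None):
        self._collection_interval = collection_interval  # seconds; 10s default per the commit message
        self._shutdown_callback = shutdown_callback  # e.g. the check's _close_db_pool
        self._cancel_event = threading.Event()
        self._thread = None

    def run_job_loop(self, tags):
        # Called on every check run, but the loop is spawned at most once;
        # later calls are cheap no-ops while the thread is alive.
        if self._thread is None or not self._thread.is_alive():
            self._thread = threading.Thread(target=self._loop, args=(tags,), daemon=True)
            self._thread.start()

    def _loop(self, tags):
        # wait() doubles as the sleep between collections and the cancellation check.
        while not self._cancel_event.wait(self._collection_interval):
            self._collect(tags)
        if self._shutdown_callback:
            self._shutdown_callback()  # e.g. close pooled connections on shutdown

    def _collect(self, tags):
        pass  # query pg_stat_statements and submit metrics here

    def cancel(self):
        # Mirrors PostgreSql.cancel() in the diff: signal the loop to exit.
        self._cancel_event.set()

This is why `check()` now simply calls `run_job_loop(tags)`: the job keeps its own 10-second cadence regardless of how often the agent schedules the check.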
