postgres statement metrics & samples: collect from all databases on host
Update the collection of postgres statement metrics & samples to automatically collect data for all databases on a host.

This means the check now respects the `dbstrict` setting. If `false` (the default), it collects statement metrics & samples from all databases on the host. If `true`, it collects this data only from the initial database configured in the check config.
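
For reference, a minimal sketch of the relevant settings, written as the Python instance dict the Agent hands to the check (normally YAML in postgres.d/conf.yaml; values are placeholders and surrounding option names in your config may differ):

instance = {
    'host': 'localhost',
    'port': 5432,
    'username': 'datadog',
    'dbname': 'postgres',  # the initial database the check connects to
    'dbstrict': False,     # False (default): collect statement data from all databases on the host
}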

For collection of execution plans, this means the statement sampler thread now maintains a connection pool with one connection per database.
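
A minimal sketch of that pooling pattern (a hypothetical standalone helper, assuming psycopg2; the real logic lives in PostgresStatementSamples._get_db below):

import psycopg2

_db_pool = {}  # dbname -> psycopg2 connection

def get_db(dbname, **connect_kwargs):
    # Reuse a healthy cached connection, otherwise (re)connect.
    db = _db_pool.get(dbname)
    if not db or db.closed:
        db = psycopg2.connect(dbname=dbname, **connect_kwargs)
        # The sampler thread runs continuously, so autocommit avoids
        # holding transactions open between samples.
        db.set_session(autocommit=True)
        _db_pool[dbname] = db
    return db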

Follow-up to #8627

Motivation:

* Simplify configuration for collection of statement metrics, samples & execution plans by enabling collection from all databases on a host with only a single configured "check instance." Previously, users had to enumerate each database on a host as a separate instance.
* Ensure that collection of statement samples & plans respects the `dbstrict` setting
djova committed Apr 29, 2021
1 parent 1d9d859 commit 2f3f9fe
Showing 7 changed files with 347 additions and 123 deletions.
11 changes: 7 additions & 4 deletions postgres/datadog_checks/postgres/postgres.py
@@ -295,12 +295,12 @@ def _collect_stats(self, instance_tags):

cursor.close()

def _new_connection(self):
def _new_connection(self, dbname):
if self._config.host == 'localhost' and self._config.password == '':
# Use ident method
connection_string = "user=%s dbname=%s application_name=%s" % (
self._config.user,
self._config.dbname,
dbname,
self._config.application_name,
)
if self._config.query_timeout:
@@ -311,7 +311,7 @@ def _new_connection(self):
'host': self._config.host,
'user': self._config.user,
'password': self._config.password,
'database': self._config.dbname,
'database': dbname,
'sslmode': self._config.ssl_mode,
'application_name': self._config.application_name,
}
@@ -332,7 +332,7 @@ def _connect(self):
# Some transaction went wrong and the connection is in an unhealthy state. Let's fix that
self.db.rollback()
else:
self.db = self._new_connection()
self.db = self._new_connection(self._config.dbname)

def _collect_custom_queries(self, tags):
"""
@@ -432,6 +432,9 @@ def _collect_custom_queries(self, tags):

def _collect_per_statement_metrics(self, tags):
metrics = self.statement_metrics.collect_per_statement_metrics(self.db)
# statement metrics add their own db tag based on which database the query came from, so we must exclude
# the initial database tag
tags = [t for t in tags if not t.startswith('db:')]
for metric_name, metric_value, metrics_tags in metrics:
self.count(metric_name, metric_value, tags=list(set(metrics_tags + tags)))

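A quick illustration of the tag handling above, with hypothetical tag values:

tags = ['db:postgres', 'env:prod', 'service:web']
tags = [t for t in tags if not t.startswith('db:')]
# tags is now ['env:prod', 'service:web']; each per-statement metric then
# contributes its own db:<datname> tag via metrics_tags.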
210 changes: 148 additions & 62 deletions postgres/datadog_checks/postgres/statement_samples.py
@@ -4,6 +4,7 @@
import threading
import time
from concurrent.futures.thread import ThreadPoolExecutor
from enum import Enum

import psycopg2
from cachetools import TTLCache
@@ -42,12 +43,21 @@
' ',
"""
SELECT * FROM {pg_stat_activity_view}
WHERE datname = %s
AND coalesce(TRIM(query), '') != ''
WHERE coalesce(TRIM(query), '') != ''
AND query_start IS NOT NULL
""",
).strip()

EXPLAIN_VALIDATION_QUERY = "SELECT * FROM pg_stat_activity"


class DBExplainSetupState(Enum):
ok = 1
failed_connect = 2
invalid_schema = 3
failed_function = 4
invalid_result = 5
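
These enum values flow directly into the error tags emitted later (see _run_explain_safe below); for example:

state = DBExplainSetupState.failed_connect
tag = "error:explain-{}".format(state)
# tag == 'error:explain-DBExplainSetupState.failed_connect'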


class PostgresStatementSamples(object):
"""
@@ -58,15 +68,16 @@ class PostgresStatementSamples(object):

def __init__(self, check, config):
self._check = check
self._db = None
# map[dbname -> psycopg connection]
self._db_pool = {}
self._config = config
self._log = get_check_logger()
self._activity_last_query_start = None
self._last_check_run = 0
self._collection_loop_future = None
self._cancel_event = threading.Event()
self._tags = None
self._tags_str = None
self._tags_no_db = None
self._service = "postgres"
self._db_hostname = resolve_db_host(self._config.host)
self._enabled = is_affirmative(self._config.statement_samples_config.get('enabled', False))
@@ -78,6 +89,11 @@ def __init__(self, check, config):
'explain_function', 'datadog.explain_statement'
)

self._collection_strategy_cache = TTLCache(
maxsize=self._config.statement_samples_config.get('collection_strategy_cache_maxsize', 1000),
ttl=self._config.statement_samples_config.get('collection_strategy_cache_ttl', 300),
)

# explained_statements_cache: limit how often we try to re-explain the same query
self._explained_statements_cache = TTLCache(
maxsize=int(self._config.statement_samples_config.get('explained_statements_cache_maxsize', 5000)),
@@ -92,9 +108,19 @@ def __init__(self, check, config):
ttl=60 * 60 / int(self._config.statement_samples_config.get('samples_per_hour_per_query', 15)),
)

# {dbname -> [tags]}
self._db_tags_cache = {}

def cancel(self):
self._cancel_event.set()

def _dbtags(self, db):
"""
Returns the default instance tags with the initial "db" tag replaced with one for the provided database name
"""
t = ["db:" + db]
return self._tags_no_db + t if self._tags_no_db else t
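
For example, with hypothetical instance tags:

# self._tags_no_db == ['env:prod', 'service:web']
# self._dbtags('dogs') -> ['env:prod', 'service:web', 'db:dogs']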

def run_sampler(self, tags):
"""
start the sampler thread if not already running
@@ -104,11 +130,15 @@ def run_sampler(self, tags):
if not self._enabled:
self._log.debug("Statement sampler not enabled")
return

# since statement samples are collected from all databases on this host, we need to tag telemetry with the
# right "db" tag, which may differ from the initial database the check is configured to connect to
self._tags = tags
self._tags_str = ','.join(self._tags)
self._tags_no_db = [t for t in tags if not t.startswith('db:')]
for t in self._tags:
if t.startswith('service:'):
self._service = t[len('service:') :]

self._last_check_run = time.time()
if self._run_sync or is_affirmative(os.environ.get('DBM_STATEMENT_SAMPLER_RUN_SYNC', "false")):
self._log.debug("Running statement sampler synchronously")
@@ -121,11 +151,14 @@
def _get_new_pg_stat_activity(self):
start_time = time.time()
query = PG_STAT_ACTIVITY_QUERY.format(pg_stat_activity_view=self._config.pg_stat_activity_view)
with self._get_db().cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
params = (self._config.dbname,)
if self._activity_last_query_start:
query = query + " AND query_start > %s"
params = params + (self._activity_last_query_start,)
params = ()
if self._config.dbstrict:
query = query + " AND datname = %s"
params = params + (self._config.dbname,)
if self._activity_last_query_start:
query = query + " AND query_start > %s"
params = params + (self._activity_last_query_start,)
with self._get_db(self._config.dbname).cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
self._log.debug("Running query [%s] %s", query, params)
cursor.execute(query, params)
rows = cursor.fetchall()
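
Concretely, assuming the default pg_stat_activity view and no prior query_start checkpoint, the two dbstrict settings produce these queries (illustrative):

# dbstrict: false (default) -- one query covers every database on the host:
#   SELECT * FROM pg_stat_activity
#   WHERE coalesce(TRIM(query), '') != '' AND query_start IS NOT NULL
#
# dbstrict: true -- rows are restricted to the configured initial database:
#   SELECT * FROM pg_stat_activity
#   WHERE coalesce(TRIM(query), '') != '' AND query_start IS NOT NULL
#   AND datname = %s   -- with params == (self._config.dbname,)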
@@ -162,19 +195,22 @@ def _filter_valid_statement_rows(self, rows):
tags=self._tags + ["error:insufficient-privilege"],
)

def _get_db(self):
def _get_db(self, dbname):
# while psycopg2 is threadsafe (meaning in theory we should be able to use the same connection as the parent
# check), the parent doesn't use autocommit and instead calls commit() and rollback() explicitly, meaning
# it can cause strange clashing issues if we're trying to use the same connection from another thread here.
# since the statement sampler runs continuously it's best we have our own connection here with autocommit
# enabled
if not self._db or self._db.closed:
self._db = self._check._new_connection()
self._db.set_session(autocommit=True)
if self._db.status != psycopg2.extensions.STATUS_READY:
db = self._db_pool.get(dbname)
if not db or db.closed:
self._log.debug("initializing connection to dbname=%s", dbname)
db = self._check._new_connection(dbname)
db.set_session(autocommit=True)
self._db_pool[dbname] = db
if db.status != psycopg2.extensions.STATUS_READY:
# Some transaction went wrong and the connection is in an unhealthy state. Let's fix that
self._db.rollback()
return self._db
db.rollback()
return db

def _collection_loop(self):
try:
@@ -207,15 +243,16 @@ def _collection_loop(self):
)
finally:
self._log.info("Shutting down statement sampler collection loop")
self._close_db_conn()
self._close_db_pool()

def _close_db_conn(self):
if self._db and not self._db.closed:
try:
self._db.close()
except Exception:
self._log.exception("failed to close DB connection")
self._db = None
def _close_db_pool(self):
for dbname, db in self._db_pool.items():
if db and not db.closed:
try:
db.close()
except Exception:
self._log.exception("failed to close DB connection for db=%s", dbname)
self._db_pool[dbname] = None

def _collect_statement_samples(self):
self._rate_limiter.sleep()
@@ -252,61 +289,108 @@ def _can_explain_statement(self, obfuscated_statement):
return False
return True

def _run_explain(self, statement, obfuscated_statement):
if not self._can_explain_statement(obfuscated_statement):
return None
with self._get_db().cursor() as cursor:
try:
start_time = time.time()
self._log.debug("Running query: %s(%s)", self._explain_function, obfuscated_statement)
cursor.execute(
"""SELECT {explain_function}($stmt${statement}$stmt$)""".format(
explain_function=self._explain_function, statement=statement
)
)
result = cursor.fetchone()
self._check.histogram(
"dd.postgres.run_explain.time", (time.time() - start_time) * 1000, tags=self._tags
)
except psycopg2.errors.UndefinedFunction:
self._log.warning(
"Failed to collect execution plan due to undefined explain_function=%s",
self._explain_function,
)
self._check.count(
"dd.postgres.statement_samples.error", 1, tags=self._tags + ["error:explain-undefined-function"]
)
return None
except Exception as e:
self._log.debug("Failed to collect execution plan. query='%s'", obfuscated_statement, exc_info=1)
self._check.count(
"dd.postgres.statement_samples.error", 1, tags=self._tags + ["error:explain-{}".format(type(e))]
def _get_db_explain_setup_state(self, dbname):
try:
self._get_db(dbname)
except (psycopg2.DatabaseError, psycopg2.OperationalError) as e:
self._log.warning(
"cannot collect execution plans due to failed DB connection to dbname=%s: %s", dbname, repr(e)
)
return DBExplainSetupState.failed_connect

try:
result = self._run_explain(dbname, EXPLAIN_VALIDATION_QUERY, EXPLAIN_VALIDATION_QUERY)
except psycopg2.errors.InvalidSchemaName as e:
self._log.warning("cannot collect execution plans due to invalid schema in dbname=%s: %s", dbname, repr(e))
return DBExplainSetupState.invalid_schema
except psycopg2.DatabaseError as e:
# if the schema is valid, then it's some problem with the function (missing, invalid permissions,
# or an incorrect definition)
self._log.warning("cannot collect execution plans in dbname=%s: %s", dbname, repr(e))
return DBExplainSetupState.failed_function

if not result:
return DBExplainSetupState.invalid_result

return DBExplainSetupState.ok

def _get_db_explain_setup_state_cached(self, dbname):
explain_setup_state = self._collection_strategy_cache.get(dbname)
if explain_setup_state:
self._log.debug("using cached explain_setup_state for DB '%s': %s", dbname, explain_setup_state)
return explain_setup_state

explain_setup_state = self._get_db_explain_setup_state(dbname)
self._collection_strategy_cache[dbname] = explain_setup_state
self._log.debug("caching new explain_setup_state for DB '%s': %s", dbname, explain_setup_state)

# try explain pg_stat_activity
return explain_setup_state
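
The cache is a cachetools TTLCache, so a failed setup state is only re-checked after the TTL expires; a minimal standalone sketch of that behavior, using the defaults from __init__ above:

from cachetools import TTLCache

cache = TTLCache(maxsize=1000, ttl=300)  # collection_strategy_cache defaults
cache['dogs'] = DBExplainSetupState.failed_function
# For the next ~300 seconds lookups return the cached state and no validation
# query runs against that database; after the TTL the entry is evicted and the
# next call re-runs the validation.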

def _run_explain(self, dbname, statement, obfuscated_statement):
start_time = time.time()
with self._get_db(dbname).cursor() as cursor:
self._log.debug("Running query on dbname=%s: %s(%s)", dbname, self._explain_function, obfuscated_statement)
cursor.execute(
"""SELECT {explain_function}($stmt${statement}$stmt$)""".format(
explain_function=self._explain_function, statement=statement
)
)
result = cursor.fetchone()
self._check.histogram(
"dd.postgres.run_explain.time", (time.time() - start_time) * 1000, tags=self._dbtags(dbname)
)
if not result or len(result) < 1 or len(result[0]) < 1:
return None
return result[0][0]
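
For instance, with the default explain_function of datadog.explain_statement, the SQL sent to the server looks like this (hypothetical statement text):

# SELECT datadog.explain_statement($stmt$SELECT * FROM pets WHERE id = 1$stmt$)
#
# The $stmt$...$stmt$ dollar quoting passes the raw statement through without
# escaping, and the function's JSON plan comes back as result[0][0].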

def _run_explain_safe(self, dbname, statement, obfuscated_statement):
if not self._can_explain_statement(obfuscated_statement):
return None

explain_setup_state = self._get_db_explain_setup_state_cached(dbname)
if explain_setup_state != DBExplainSetupState.ok:
self._check.count(
"dd.postgres.statement_samples.error",
1,
tags=self._dbtags(dbname) + ["error:explain-{}".format(explain_setup_state)],
)
return None

try:
return self._run_explain(dbname, statement, obfuscated_statement)
except psycopg2.errors.DatabaseError as e:
self._log.warning("Failed to collect execution plan: %s", repr(e))
self._check.count(
"dd.postgres.statement_samples.error",
1,
tags=self._dbtags(dbname) + ["error:explain-{}".format(type(e))],
)
return None

def _collect_plan_for_statement(self, row):
try:
obfuscated_statement = datadog_agent.obfuscate_sql(row['query'])
except Exception as e:
self._log.debug("Failed to obfuscate statement: %s", e)
self._check.count("dd.postgres.statement_samples.error", 1, tags=self._tags + ["error:sql-obfuscate"])
self._check.count(
"dd.postgres.statement_samples.error", 1, tags=self._dbtags(row['datname']) + ["error:sql-obfuscate"]
)
return None

# limit the rate of explains done to the database
query_signature = compute_sql_signature(obfuscated_statement)
if query_signature in self._explained_statements_cache:
cache_key = (row['datname'], query_signature)
if cache_key in self._explained_statements_cache:
return None
self._explained_statements_cache[query_signature] = True
self._explained_statements_cache[cache_key] = True

# Plans have several important signatures to tag events with. Note that for postgres, the
# query_signature and resource_hash will be the same value.
# - `plan_signature` - hash computed from the normalized JSON plan to group identical plan trees
# - `resource_hash` - hash computed off the raw sql text to match apm resources
# - `query_signature` - hash computed from the raw sql text to match query metrics
plan_dict = self._run_explain(row['query'], obfuscated_statement)
plan_dict = self._run_explain_safe(row['datname'], row['query'], obfuscated_statement)
plan, normalized_plan, obfuscated_plan, plan_signature, plan_cost = None, None, None, None, None
if plan_dict:
plan = json.dumps(plan_dict)
@@ -324,7 +408,7 @@ def _collect_plan_for_statement(self, row):
"host": self._db_hostname,
"service": self._service,
"ddsource": "postgres",
"ddtags": self._tags_str,
"ddtags": ",".join(self._dbtags(row['datname'])),
"network": {
"client": {
"ip": row.get('client_addr', None),
@@ -364,7 +448,9 @@ def _explain_pg_stat_activity(self, rows):
if event:
yield event
except Exception:
self._log.exception("Crashed trying to collect execution plan for statement")
self._log.exception(
"Crashed trying to collect execution plan for statement in dbname=%s", row['datname']
)
self._check.count(
"dd.postgres.statement_samples.error",
1,