Collect postgres statement samples & execution plans
**What does this PR do?**

Adds a new feature to "Deep Database Monitoring", enabling collection of statement samples and execution plans. Follow-up to #7852.

**How does it work?**

If enabled, a Python thread is launched during a regular check run. The thread:
* collects statement samples at the configured rate limit (default 1 collection per second)
* maintains its own `psycopg2` connection to avoid clashing transactions/state with the main thread connection
* shuts down if it detects that the main check has not run for two collection intervals
* collects execution plans through a postgres function that the user must install into each database being monitored (if we wanted the agent to collect execution plans directly by running `EXPLAIN` then it would need full write permission to all tables)

During one "collection" we do the following:
1. read out all new statements from `pg_stat_activity`
1. try to collect an execution plan for each statement
1. submit events directly to the new database monitoring event intake
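
Roughly, one collection pass looks like the sketch below. This is illustrative only: `collect_statement_samples`, `submit_events`, and the query filters are assumptions for the example, not the check's actual code.

```python
# Illustrative sketch only -- not the actual check implementation. Assumes an open
# psycopg2 connection and that the configured explain function (default:
# datadog.explain_statement) has been installed in the target database.
# The real check also obfuscates queries and plans before submission.
import psycopg2
import psycopg2.extras


def collect_statement_samples(conn, explain_function, submit_events):
    with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
        # 1. read out the current statements from pg_stat_activity
        cursor.execute(
            "SELECT * FROM pg_stat_activity WHERE datname = current_database() AND state != 'idle'"
        )
        rows = cursor.fetchall()

        events = []
        for row in rows:
            # 2. try to collect an execution plan for each statement via the
            #    user-installed function (the agent never runs EXPLAIN itself)
            try:
                cursor.execute("SELECT {}(%s)".format(explain_function), (row['query'],))
                plan = cursor.fetchone()[0]
            except psycopg2.Error:
                plan = None
            events.append({'query': row['query'], 'plan': plan})

    # 3. submit the events directly to the database monitoring event intake
    submit_events(events)
```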

**Rate limiting**

There are several different rate limits to keep load on the database to a minimum and to avoid re-ingesting duplicate events (a rough sketch follows the list):
* `collections_per_second`: limits how often collections are done (each collection is a query to `pg_stat_activity`)
* `explained_statements_cache`: limits how often we attempt to collect an execution plan for a given normalized query
* `seen_samples_cache`: limits how often we ingest statement samples for the same normalized query and execution plan
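
As a rough illustration of how the two per-query limits can be enforced with the new `cachetools` dependency, here is a sketch; the TTL values, cache keys, and helper names are assumptions for the example, not the check's exact implementation.

```python
# Sketch of the per-query rate limiting caches -- illustrative only.
from cachetools import TTLCache

# allow roughly explained_statements_per_hour_per_query (60) explain attempts
# per normalized query, i.e. one cache entry expires every 60 seconds
explained_statements_cache = TTLCache(maxsize=5000, ttl=60 * 60 / 60)
# allow roughly samples_per_hour_per_query (15) ingested samples
# per (normalized query, plan) pair
seen_samples_cache = TTLCache(maxsize=10000, ttl=60 * 60 / 15)


def should_collect_plan(query_signature):
    if query_signature in explained_statements_cache:
        return False
    explained_statements_cache[query_signature] = True
    return True


def should_ingest_sample(query_signature, plan_signature):
    key = (query_signature, plan_signature)
    if key in seen_samples_cache:
        return False
    seen_samples_cache[key] = True
    return True
```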

**Configuration**

We're adding a new `statement_samples` postgres instance config section. Here is the full set of available configuration options, shown with their default settings:
```yaml
statement_samples:
   enabled: false
   collections_per_second: 1
   explain_function: 'datadog.explain_statement'
   explained_statements_cache_maxsize: 5000
   explained_statements_per_hour_per_query: 60
   seen_samples_cache_maxsize: 10000
   samples_per_hour_per_query: 15
```
djova committed Feb 15, 2021
1 parent b0d4c1f commit 72d02c6
Showing 14 changed files with 786 additions and 29 deletions.
@@ -7,6 +7,7 @@ binary==1.0.0
boto3==1.10.27
boto==2.46.1
botocore==1.13.42
cachetools==3.1.1
clickhouse-cityhash==1.0.2.3
clickhouse-driver==0.1.5
contextlib2==0.6.0; python_version < "3.0"
6 changes: 6 additions & 0 deletions datadog_checks_base/datadog_checks/base/log.py
@@ -66,6 +66,12 @@ def trace(self, msg, *args, **kwargs):
    def warn(self, msg, *args, **kwargs):
        self.log(logging.WARNING, msg, *args, **kwargs)

    def getEffectiveLevel(self):
        """
        Get the effective level for the underlying logger.
        """
        return self.logger.getEffectiveLevel()


class CheckLogFormatter(logging.Formatter):
    def format(self, record):
@@ -1,6 +1,7 @@
# (C) Datadog, Inc. 2018-present
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)
import re


class DatadogAgentStub(object):
@@ -65,7 +66,11 @@ def read_persistent_cache(self, key):
        return self._cache.get(key, '')

    def obfuscate_sql(self, query):
        return query
        # this is only whitespace cleanup, NOT obfuscation
        return re.sub(r'\s+', ' ', query or '')

    def obfuscate_sql_exec_plan(self, plan, normalize=False):
        return plan


# Use the stub as a singleton
15 changes: 15 additions & 0 deletions datadog_checks_base/datadog_checks/base/utils/db/sql.py
@@ -4,11 +4,15 @@
# Licensed under a 3-clause BSD style license (see LICENSE)
from __future__ import unicode_literals

import json

import mmh3

# Unicode character "Arabic Decimal Separator" (U+066B) is a character which looks like an ascii
# comma, but is not treated like a comma when parsing metrics tags. This is used to replace
# commas so that tags which have commas in them (such as SQL queries) properly display.


ARABIC_DECIMAL_SEPARATOR = '٫'


@@ -40,3 +44,14 @@ def normalize_query_tag(query):
"""
query = query.replace(', ', '{} '.format(ARABIC_DECIMAL_SEPARATOR)).replace(',', ARABIC_DECIMAL_SEPARATOR)
return query


def compute_exec_plan_signature(normalized_json_plan):
"""
Given an already normalized json string query execution plan, generate its 64-bit hex signature.
TODO: try to push this logic into the agent go code to avoid the two extra json serialization steps here
"""
if not normalized_json_plan:
return None
with_sorted_keys = json.dumps(json.loads(normalized_json_plan), sort_keys=True)
return format(mmh3.hash64(with_sorted_keys, signed=False)[0], 'x')
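
For reference, a small usage sketch of the new helper; the plan JSON string here is an arbitrary example, not real output.

```python
# Hypothetical usage of compute_exec_plan_signature; the plan JSON is a made-up example.
from datadog_checks.base.utils.db.sql import compute_exec_plan_signature

normalized_plan = '{"Plan": {"Node Type": "Seq Scan", "Relation Name": "users"}}'
signature = compute_exec_plan_signature(normalized_plan)
print(signature)  # 64-bit murmur3 hash of the key-sorted plan, as a hex string
```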
134 changes: 134 additions & 0 deletions datadog_checks_base/datadog_checks/base/utils/db/statement_samples.py
@@ -0,0 +1,134 @@
import datetime
import decimal
import itertools
import json
import logging

import requests
from requests.adapters import HTTPAdapter, Retry

try:
    import datadog_agent

    using_stub_datadog_agent = False
except ImportError:
    from ....stubs import datadog_agent

    using_stub_datadog_agent = True

logger = logging.getLogger(__file__)


class EventEncoder(json.JSONEncoder):
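    """Encode Decimal and datetime/date values (which can appear in statement sample events) as JSON-serializable types."""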
    def default(self, o):
        if isinstance(o, decimal.Decimal):
            return float(o)
        if isinstance(o, (datetime.date, datetime.datetime)):
            return o.isoformat()
        return super(EventEncoder, self).default(o)


def _chunks(items, n):
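    """Yield successive chunks of at most n items from the given iterable."""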
    it = iter(items)
    while True:
        chunk = tuple(itertools.islice(it, n))
        if not chunk:
            return
        yield chunk


def _new_api_session(api_key):
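    """Create a requests session that retries failed POSTs and sends the given API key with every request."""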
    http = requests.Session()
    http.mount(
        "https://", HTTPAdapter(max_retries=Retry(connect=2, read=2, redirect=2, status=2, method_whitelist=['POST']))
    )
    http.headers.update({'DD-API-KEY': api_key})
    return http


def _event_intake_url(host):
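    """Normalize a configured intake host into a full https URL for the event intake endpoint."""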
    if host.endswith("."):
        host = host[:-1]
    if not host.startswith("https://"):
        host = "https://" + host
    return host + "/v1/input"


default_dbm_url = "dbquery-http-intake.logs.datadoghq.com"


def _load_event_endpoints_from_config(config_prefix, default_url):
"""
Returns a list of requests sessions and their endpoint urls [(http, url), ...]
Requests sessions are initialized the first time this is called and reused thereafter
:return: list of (http, url)
:param config_prefix:
:param default_url:
:return:
"""
    url = _event_intake_url(datadog_agent.get_config('{}.dd_url'.format(config_prefix)) or default_url)
    endpoints = [(_new_api_session(datadog_agent.get_config('api_key')), url)]
    logger.debug("initializing event endpoints from %s. url=%s", config_prefix, url)

    for additional_endpoint in datadog_agent.get_config('{}.additional_endpoints'.format(config_prefix)) or []:
        api_key, host = additional_endpoint.get('api_key'), additional_endpoint.get('host')
        missing_keys = [k for k, v in [('api_key', api_key), ('host', host)] if not v]
        if missing_keys:
            logger.warning(
                "invalid event endpoint found in %s.additional_endpoints. missing required keys %s",
                config_prefix,
                ', '.join(missing_keys),
            )
            continue
        url = _event_intake_url(host)
        endpoints.append((_new_api_session(api_key), url))
        logger.debug("initializing additional event endpoint from %s. url=%s", config_prefix, url)

    return endpoints


class StatementSamplesClient:
    def __init__(self):
        self._endpoints = _load_event_endpoints_from_config("database_monitoring", default_dbm_url)

    def submit_events(self, events):
        """
        Submit the statement sample events to the event intake
        :return: submitted_count, failed_count
        """
        submitted_count = 0
        failed_count = 0
        for chunk in _chunks(events, 100):
            for http, url in self._endpoints:
                try:
                    r = http.request(
                        'post',
                        url,
                        data=json.dumps(chunk, cls=EventEncoder),
                        timeout=5,
                        headers={'Content-Type': 'application/json'},
                    )
                    r.raise_for_status()
                    logger.debug("Submitted %s statement samples to %s", len(chunk), url)
                    submitted_count += len(chunk)
                except requests.HTTPError as e:
                    logger.warning("Failed to submit statement samples to %s: %s", url, e)
                    failed_count += len(chunk)
                except Exception:
                    logger.exception("Failed to submit statement samples to %s", url)
                    failed_count += len(chunk)
        return submitted_count, failed_count


class StubStatementSamplesClient:
    def __init__(self):
        self._events = []

    def submit_events(self, events):
        events = list(events)
        self._events.extend(events)
        return len(events), 0


statement_samples_client = StubStatementSamplesClient() if using_stub_datadog_agent else StatementSamplesClient()
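
A minimal usage sketch of the module-level client defined above; the event fields shown are illustrative, not a documented schema.

```python
# Hypothetical usage of the statement samples client; event fields are illustrative only.
from datadog_checks.base.utils.db.statement_samples import statement_samples_client

events = [
    {'host': 'my-db-host', 'source': 'postgres', 'query': 'SELECT * FROM users WHERE id = ?'},
]
submitted_count, failed_count = statement_samples_client.submit_events(events)
```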
67 changes: 65 additions & 2 deletions datadog_checks_base/datadog_checks/base/utils/db/utils.py
@@ -1,8 +1,18 @@
# (C) Datadog, Inc. 2019-present
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)
import logging
import socket
import time
from itertools import chain

try:
    import datadog_agent
except ImportError:
    from ....stubs import datadog_agent

logger = logging.getLogger(__file__)

# AgentCheck methods to transformer name e.g. set_metadata -> metadata
SUBMISSION_METHODS = {
    'gauge': 'gauge',
@@ -19,11 +29,9 @@


def create_submission_transformer(submit_method):

    # During the compilation phase every transformer will have access to all the others and may be
    # passed the first arguments (e.g. name) that will be forwarded to the actual AgentCheck methods.
    def get_transformer(_transformers, *creation_args, **modifiers):

        # The first argument of every transformer is a map of named references to collected values.
        def transformer(_sources, *call_args, **kwargs):
            kwargs.update(modifiers)
@@ -52,3 +60,58 @@ def transformer(sources, **kwargs):
        transformer = column_transformer

    return transformer


class ConstantRateLimiter:
"""
Basic rate limiter that sleeps long enough to ensure the rate limit is not exceeded. Not thread safe.
"""

def __init__(self, rate_limit_s):
"""
:param rate_limit_s: rate limit in seconds
"""
self.rate_limit_s = rate_limit_s
self.period_s = 1 / rate_limit_s if rate_limit_s > 0 else 0
self.last_event = 0

    def sleep(self):
        """
        Sleeps long enough to enforce the rate limit
        """
        elapsed_s = time.time() - self.last_event
        sleep_amount = max(self.period_s - elapsed_s, 0)
        time.sleep(sleep_amount)
        self.last_event = time.time()


def resolve_db_host(db_host):
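    """
    Resolve which hostname to report for the database: fall back to the agent hostname when the
    DB host is local, cannot be resolved, or resolves to the same IP as the agent host.
    """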
    agent_hostname = datadog_agent.get_hostname()
    if not db_host or db_host in {'localhost', '127.0.0.1'}:
        return agent_hostname

    try:
        host_ip = socket.gethostbyname(db_host)
    except socket.gaierror as e:
        # could be connecting via a unix domain socket
        logger.debug(
            "failed to resolve DB host '%s' due to socket.gaierror(%s). falling back to agent hostname: %s",
            db_host,
            e,
            agent_hostname,
        )
        return agent_hostname

    try:
        agent_host_ip = socket.gethostbyname(agent_hostname)
        if agent_host_ip == host_ip:
            return agent_hostname
    except socket.gaierror as e:
        logger.debug(
            "failed to resolve agent host '%s' due to socket.gaierror(%s). using DB host: %s",
            agent_hostname,
            e,
            db_host,
        )

    return db_host
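
And a small usage sketch of the new `ConstantRateLimiter`; the loop body is a placeholder.

```python
# Hypothetical usage of ConstantRateLimiter to cap a polling loop at 1 iteration per second.
from datadog_checks.base.utils.db.utils import ConstantRateLimiter

rate_limiter = ConstantRateLimiter(1)  # at most one event per second
for _ in range(3):
    rate_limiter.sleep()  # sleeps just long enough to stay under the configured rate
    # ... perform one collection here ...
```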
4 changes: 4 additions & 0 deletions postgres/datadog_checks/postgres/config.py
@@ -58,11 +58,15 @@ def __init__(self, instance):
        self.service_check_tags = self._get_service_check_tags()
        self.custom_metrics = self._get_custom_metrics(instance.get('custom_metrics', []))
        self.max_relations = int(instance.get('max_relations', 300))
        self.min_collection_interval = instance.get('min_collection_interval', 15)

        # Deep Database monitoring adds additional telemetry for statement metrics
        self.deep_database_monitoring = is_affirmative(instance.get('deep_database_monitoring', False))
        # Support a custom view when datadog user has insufficient privilege to see queries
        self.pg_stat_statements_view = instance.get('pg_stat_statements_view', 'pg_stat_statements')
        # statement samples & execution plans
        self.pg_stat_activity_view = instance.get('pg_stat_activity_view', 'pg_stat_activity')
        self.statement_samples_config = instance.get('statement_samples', {}) or {}

    def _build_tags(self, custom_tags):
        # Clean up tags in case there was a None entry in the instance