
decouple DBM query metrics interval from check run interval
* decouple the DBM metrics collection interval from the check run interval
* set default DBM metrics collection interval to 10s
* change `statement_samples.collections_per_second` to `statement_samples.collection_interval` so it matches the new `statement_metrics.collection_interval` key

Depends on #9656

Motivation: configuring the DBM metrics collection interval separately from the check run interval lets us default to a 10 second interval for query metrics. Querying metrics collected at a 15 second interval raises difficulties (e.g., ensuring a correct rollup window for varying time ranges) that don't exist at a 10 second interval.
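
For illustration, the relevant part of a check instance configuration after this change might look like the following, expressed as the Python instance dict the check receives (a sketch: the `statement_metrics` and `statement_samples` keys come from this commit; the surrounding values are assumptions):

```python
# Sketch of a MySQL check instance dict exercising the new keys.
# Key names come from this commit; the other values are illustrative.
instance = {
    'host': 'localhost',
    'dbm': True,
    'min_collection_interval': 15,  # check run interval, unchanged
    'statement_metrics': {
        'collection_interval': 10,  # new: DBM query metrics interval (default 10s)
    },
    'statement_samples': {
        'enabled': True,
        'collection_interval': 1,   # renamed from collections_per_second
    },
}
```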
djova committed Jul 8, 2021
1 parent b9849ce commit 5ac9c7c
Showing 6 changed files with 161 additions and 154 deletions.
1 change: 1 addition & 0 deletions mysql/datadog_checks/mysql/config.py
@@ -35,6 +35,7 @@ def __init__(self, instance):
             'full_statement_text_samples_per_hour_per_query', 1
         )
         self.statement_samples_config = instance.get('statement_samples', {}) or {}
+        self.statement_metrics_config = instance.get('statement_metrics', {}) or {}
         self.min_collection_interval = instance.get('min_collection_interval', 15)
         self.configuration_checks()

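The `or {}` in the added line is worth a note: an empty `statement_metrics:` block in YAML deserializes to `None`, and `dict.get` returns that `None` rather than the default. A quick sketch of the behavior:

```python
# Why `instance.get('statement_metrics', {}) or {}` rather than .get() alone:
instance = {'statement_metrics': None}  # e.g. a bare "statement_metrics:" line in YAML

instance.get('statement_metrics', {})        # -> None (key exists, value is None)
instance.get('statement_metrics', {}) or {}  # -> {}   (safe to call .get() on)
```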
7 changes: 4 additions & 3 deletions mysql/datadog_checks/mysql/mysql.py
@@ -84,7 +84,7 @@ def __init__(self, name, init_config, instances):
         self.check_initializations.append(self._query_manager.compile_queries)
         self.innodb_stats = InnoDBMetrics()
         self.check_initializations.append(self._config.configuration_checks)
-        self._statement_metrics = MySQLStatementMetrics(self, self._config)
+        self._statement_metrics = MySQLStatementMetrics(self, self._config, self._get_connection_args())
         self._statement_samples = MySQLStatementSamples(self, self._config, self._get_connection_args())

     def execute_query_raw(self, query):
@@ -128,8 +128,8 @@ def check(self, _):
             self._collect_system_metrics(self._config.host, db, tags)
             if self._config.dbm_enabled:
                 dbm_tags = list(set(self.service_check_tags) | set(tags))
-                self._statement_metrics.collect_per_statement_metrics(db, dbm_tags)
-                self._statement_samples.run_sampler(dbm_tags)
+                self._statement_metrics.run_job_loop(dbm_tags)
+                self._statement_samples.run_job_loop(dbm_tags)

             # keeping track of these:
             self._put_qcache_stats()
@@ -145,6 +145,7 @@ def check(self, _):

     def cancel(self):
         self._statement_samples.cancel()
+        self._statement_metrics.cancel()

     def _set_qcache_stats(self):
         host_key = self._get_host_key()
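The check now only kicks the job loops; the pacing itself moves into the `DBMAsyncJob` base class from #9656, which is not part of this diff. As a rough, hypothetical sketch of the contract the calls above rely on (the names `run_job_loop`, `run_job`, `cancel`, and `_set_rate_limit` appear in this diff; the internals below are assumptions, not the actual base-class code):

```python
import threading
import time


class DBMAsyncJob(object):
    # Hypothetical sketch; the real implementation ships in #9656 and
    # differs in detail (inactivity shutdown, error handling, telemetry).

    def __init__(self, check, rate_limit=1, run_sync=False, enabled=True, **kwargs):
        self._check = check
        self._rate_limit = rate_limit  # max job executions per second
        self._run_sync = run_sync
        self._enabled = enabled
        self._cancel_event = threading.Event()
        self._job_thread = None

    def run_job_loop(self, tags):
        # Called on every check run; a cheap no-op if the loop already runs.
        if not self._enabled:
            return
        self._tags = tags
        if self._run_sync:
            self.run_job()
        elif self._job_thread is None or not self._job_thread.is_alive():
            self._job_thread = threading.Thread(target=self._loop, daemon=True)
            self._job_thread.start()

    def _set_rate_limit(self, rate_limit):
        self._rate_limit = rate_limit

    def cancel(self):
        self._cancel_event.set()

    def _loop(self):
        # The job paces itself, so its cadence is independent of how often
        # the agent invokes check() (e.g. a 10s job under a 15s check run).
        while not self._cancel_event.is_set():
            started = time.time()
            self.run_job()
            elapsed = time.time() - started
            time.sleep(max(0.0, 1.0 / self._rate_limit - elapsed))

    def run_job(self):  # implemented by subclasses such as MySQLStatementSamples
        raise NotImplementedError
```

The point is that `run_job_loop` is safe to call on every 15 second check run while `run_job` fires on its own cadence, which is what lets the query metrics job default to 10 seconds.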
142 changes: 40 additions & 102 deletions mysql/datadog_checks/mysql/statement_samples.py
@@ -1,9 +1,5 @@
-import logging
-import os
 import re
-import threading
 import time
-from concurrent.futures.thread import ThreadPoolExecutor
 from contextlib import closing
 from enum import Enum

@@ -16,14 +12,8 @@
 from ..stubs import datadog_agent

 from datadog_checks.base import is_affirmative
-from datadog_checks.base.log import get_check_logger
 from datadog_checks.base.utils.db.sql import compute_exec_plan_signature, compute_sql_signature
-from datadog_checks.base.utils.db.utils import (
-    ConstantRateLimiter,
-    RateLimitingTTLCache,
-    default_json_event_encoding,
-    resolve_db_host,
-)
+from datadog_checks.base.utils.db.utils import DBMAsyncJob, RateLimitingTTLCache, default_json_event_encoding
 from datadog_checks.base.utils.serialization import json

 SUPPORTED_EXPLAIN_STATEMENTS = frozenset({'select', 'table', 'delete', 'insert', 'replace', 'update', 'with'})
@@ -41,11 +31,11 @@
 ]

 # default sampling settings for events_statements_* tables
-# rate limit is in samples/second
-# {table -> rate-limit}
-DEFAULT_EVENTS_STATEMENTS_COLLECTIONS_PER_SECOND = {
-    'events_statements_history_long': 0.1,
-    'events_statements_history': 0.1,
+# collection interval is in seconds
+# {table -> interval}
+DEFAULT_EVENTS_STATEMENTS_COLLECTION_INTERVAL = {
+    'events_statements_history_long': 10,
+    'events_statements_history': 10,
     'events_statements_current': 1,
 }

@@ -237,32 +227,35 @@ class DBExplainError(Enum):
     statement_truncated = 'statement_truncated'


-class MySQLStatementSamples(object):
+class MySQLStatementSamples(DBMAsyncJob):
     """
     Collects statement samples and execution plans.
     """

-    executor = ThreadPoolExecutor()
-
     def __init__(self, check, config, connection_args):
-        self._check = check
+        collection_interval = float(config.statement_samples_config.get('collection_interval', 1))
+        if collection_interval <= 0:
+            collection_interval = 1
+        super(MySQLStatementSamples, self).__init__(
+            check,
+            rate_limit=1 / collection_interval,
+            run_sync=is_affirmative(config.statement_samples_config.get('run_sync', False)),
+            enabled=is_affirmative(config.statement_samples_config.get('enabled', True)),
+            min_collection_interval=config.min_collection_interval,
+            config_host=config.host,
+            dbms="mysql",
+            expected_db_exceptions=(pymysql.err.DatabaseError,),
+            job_name="statement-samples",
+            shutdown_callback=self._close_db_conn,
+        )
+        self._config = config
         self._version_processed = False
         self._connection_args = connection_args
         # checkpoint at zero so we pull the whole history table on the first run
         self._checkpoint = 0
-        self._log = get_check_logger()
-        self._last_check_run = 0
         self._db = None
         self._tags = None
         self._tags_str = None
-        self._collection_loop_future = None
-        self._cancel_event = threading.Event()
-        self._rate_limiter = ConstantRateLimiter(1)
-        self._config = config
-        self._db_hostname = resolve_db_host(self._config.host)
-        self._enabled = is_affirmative(self._config.statement_samples_config.get('enabled', True))
-        self._run_sync = is_affirmative(self._config.statement_samples_config.get('run_sync', False))
-        self._collections_per_second = self._config.statement_samples_config.get('collections_per_second', -1)
+        self._configured_collection_interval = self._config.statement_samples_config.get('collection_interval', -1)
         self._events_statements_row_limit = self._config.statement_samples_config.get(
             'events_statements_row_limit', 5000
         )
@@ -280,14 +273,14 @@ def __init__(self, check, config, connection_args):
         self._has_window_functions = False
         events_statements_table = self._config.statement_samples_config.get('events_statements_table', None)
         if events_statements_table:
-            if events_statements_table in DEFAULT_EVENTS_STATEMENTS_COLLECTIONS_PER_SECOND:
+            if events_statements_table in DEFAULT_EVENTS_STATEMENTS_COLLECTION_INTERVAL:
                 self._log.debug("Configured preferred events_statements_table: %s", events_statements_table)
                 self._preferred_events_statements_tables = [events_statements_table]
             else:
                 self._log.warning(
                     "Invalid events_statements_table: %s. Must be one of %s. Falling back to trying all tables.",
                     events_statements_table,
-                    ', '.join(DEFAULT_EVENTS_STATEMENTS_COLLECTIONS_PER_SECOND.keys()),
+                    ', '.join(DEFAULT_EVENTS_STATEMENTS_COLLECTION_INTERVAL.keys()),
                 )
         self._explain_strategies = {
             'PROCEDURE': self._run_explain_procedure,
@@ -317,39 +310,14 @@ def _init_caches(self):
             ttl=60 * 60 / self._config.statement_samples_config.get('samples_per_hour_per_query', 15),
         )

-    def run_sampler(self, tags):
-        """
-        start the sampler thread if not already running & update tag metadata
-        :param tags:
-        :return:
-        """
-        if not self._enabled:
-            self._log.debug("Statement sampler not enabled")
-            return
-        self._tags = tags
-        self._tags_str = ','.join(tags)
+    def _read_version_info(self):
         if not self._version_processed and self._check.version:
             self._has_window_functions = self._check.version.version_compatible((8, 0, 0))
             if self._check.version.flavor == "MariaDB" or not self._check.version.version_compatible((5, 7, 0)):
                 self._global_status_table = "information_schema.global_status"
             else:
                 self._global_status_table = "performance_schema.global_status"
             self._version_processed = True
-        self._last_check_run = time.time()
-        if self._run_sync or is_affirmative(os.environ.get('DBM_STATEMENT_SAMPLER_RUN_SYNC', "false")):
-            self._log.debug("Running statement sampler synchronously")
-            self._collect_statement_samples()
-        elif self._collection_loop_future is None or not self._collection_loop_future.running():
-            self._collection_loop_future = MySQLStatementSamples.executor.submit(self.collection_loop)
-        else:
-            self._log.debug("Statement sampler collection loop already running")
-
-    def cancel(self):
-        """
-        Cancels the collection loop thread if it's running.
-        Returns immediately, leaving the thread to stop & clean up on its own time.
-        """
-        self._cancel_event.set()
-
     def _get_db_connection(self):
         """
@@ -370,39 +338,6 @@ def _close_db_conn(self):
         finally:
             self._db = None

-    def collection_loop(self):
-        try:
-            self._log.info("Starting statement sampler collection loop")
-            while True:
-                if self._cancel_event.isSet():
-                    self._log.info("Collection loop cancelled")
-                    self._check.count("dd.mysql.statement_samples.collection_loop_cancel", 1, tags=self._tags)
-                    break
-                if time.time() - self._last_check_run > self._config.min_collection_interval * 2:
-                    self._log.info("Stopping statement sampler collection loop due to check inactivity")
-                    self._check.count("dd.mysql.statement_samples.collection_loop_inactive_stop", 1, tags=self._tags)
-                    break
-                self._collect_statement_samples()
-        except pymysql.err.DatabaseError as e:
-            self._log.warning(
-                "Statement sampler database error: %s", e, exc_info=self._log.getEffectiveLevel() == logging.DEBUG
-            )
-            self._check.count(
-                "dd.mysql.statement_samples.error",
-                1,
-                tags=self._tags + ["error:collection-loop-database-error-{}".format(type(e))],
-            )
-        except Exception as e:
-            self._log.exception("Statement sampler collection loop crash")
-            self._check.count(
-                "dd.mysql.statement_samples.error",
-                1,
-                tags=self._tags + ["error:collection-loop-crash-{}".format(type(e))],
-            )
-        finally:
-            self._log.info("Shutting down statement sampler collection loop")
-            self._close_db_conn()
-
     def _cursor_run(self, cursor, query, params=None, obfuscated_params=None):
         """
         Run and log the query. If provided, obfuscated params are logged in place of the regular params.
@@ -658,30 +593,33 @@ def _get_sample_collection_strategy(self):
             )
             return None, None

-        rate_limit = self._collections_per_second
-        if rate_limit < 0:
-            rate_limit = DEFAULT_EVENTS_STATEMENTS_COLLECTIONS_PER_SECOND[events_statements_table]
+        collection_interval = self._configured_collection_interval
+        if collection_interval < 0:
+            collection_interval = DEFAULT_EVENTS_STATEMENTS_COLLECTION_INTERVAL[events_statements_table]

         # cache only successful strategies
         # should be short enough that we'll reflect updates relatively quickly
         # i.e., an aurora replica becomes a master (or vice versa).
-        strategy = (events_statements_table, rate_limit)
+        strategy = (events_statements_table, collection_interval)
         self._log.debug(
-            "Chose plan collection strategy: events_statements_table=%s, collections_per_second=%s",
+            "Chose plan collection strategy: events_statements_table=%s, collection_interval=%s",
             events_statements_table,
-            rate_limit,
+            collection_interval,
         )
         self._collection_strategy_cache["plan_collection_strategy"] = strategy
         return strategy

+    def run_job(self):
+        self._collect_statement_samples()
+
     def _collect_statement_samples(self):
+        self._read_version_info()
         self._log.debug("collecting statement samples")
-        self._rate_limiter.sleep()
-        events_statements_table, rate_limit = self._get_sample_collection_strategy()
+        events_statements_table, collection_interval = self._get_sample_collection_strategy()
         if not events_statements_table:
             return
-        if self._rate_limiter.rate_limit_s != rate_limit:
-            self._rate_limiter = ConstantRateLimiter(rate_limit)
+        self._set_rate_limit(1.0 / collection_interval)

         start_time = time.time()

         tags = self._tags + ["events_statements_table:{}".format(events_statements_table)]
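One unit conversion ties the two halves of the rename together: the job framework is rate-based while the new config is interval-based. A worked example of the arithmetic (illustrative, not code from the diff):

```python
# collection_interval (seconds per run) vs. rate_limit (runs per second)
collection_interval = 10                 # e.g. the events_statements_history default
rate_limit = 1.0 / collection_interval   # -> 0.1, the value passed to _set_rate_limit()

# The old defaults {0.1, 0.1, 1} collections/second and the new defaults
# {10, 10, 1} seconds describe the same cadence in inverted units.
assert rate_limit == 0.1
assert 1.0 / 0.1 == 10.0
```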