Skip to content

Commit

Permalink
Refactor replica metrics and add some debug lines
Browse files Browse the repository at this point in the history
  • Loading branch information
hithwen committed Jan 18, 2021
1 parent 40231e3 commit f7470ac
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 74 deletions.
4 changes: 4 additions & 0 deletions mysql/datadog_checks/mysql/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ def __init__(self, instance):
self.password = str(instance.get('pass', ''))
self.tags = self._build_tags(instance.get('tags', []))
self.options = instance.get('options', {}) or {} # options could be None if empty in the YAML
replication_channel = self.options.get('replication_channel')
if replication_channel:
self.tags.append("channel:{0}".format(replication_channel))
self.queries = instance.get('queries', [])
self.ssl = instance.get('ssl', {})
self.connect_timeout = instance.get('connect_timeout', 10)
Expand All @@ -28,6 +31,7 @@ def __init__(self, instance):
self.deep_database_monitoring = is_affirmative(instance.get('deep_database_monitoring', False))
self.configuration_checks()


def _build_tags(self, custom_tags):
tags = list(set(custom_tags)) or []

Expand Down
161 changes: 87 additions & 74 deletions mysql/datadog_checks/mysql/mysql.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from contextlib import closing, contextmanager

import pymysql
from datadog_checks.base.types import ServiceCheckStatus
from six import PY3, iteritems, itervalues

from datadog_checks.base import AgentCheck, is_affirmative
Expand Down Expand Up @@ -104,9 +105,7 @@ def check(self, _):
self._send_metadata()

# Metric collection
self._collect_metrics(
db, self.config.tags, self.config.options, self.config.queries, self.config.max_custom_queries
)
self._collect_metrics(db)
self._collect_system_metrics(self.config.host, db, self.config.tags)
if self.config.deep_database_monitoring:
self._collect_statement_metrics(db, self.config.tags)
Expand Down Expand Up @@ -162,7 +161,7 @@ def _get_connection_args(self):

connection_args.update({'user': self.config.user, 'passwd': self.config.password})
if self.config.mysql_sock != '':
self.service_check_tags = [
self.service_check_tags = [
'server:{0}'.format(self.config.mysql_sock),
'port:unix_socket',
] + self.config.tags
Expand Down Expand Up @@ -195,7 +194,7 @@ def _connect(self):
if db:
db.close()

def _collect_metrics(self, db, tags, options, queries, max_custom_queries):
def _collect_metrics(self, db):

# Get aggregate of all VARS we want to collect
metrics = STATUS_VARS
Expand All @@ -204,9 +203,9 @@ def _collect_metrics(self, db, tags, options, queries, max_custom_queries):
results = self._get_stats_from_status(db)
results.update(self._get_stats_from_variables(db))

if not is_affirmative(options.get('disable_innodb_metrics', False)) and self._is_innodb_engine_enabled(db):
if not is_affirmative(self.config.options.get('disable_innodb_metrics', False)) and self._is_innodb_engine_enabled(db):
results.update(self.innodb_stats.get_stats_from_innodb_status(db))
self.innodb_stats.process_innodb_stats(results, options, metrics)
self.innodb_stats.process_innodb_stats(results, self.config.options, metrics)

# Binary log statistics
if self._get_variable_enabled(results, 'log_bin'):
Expand Down Expand Up @@ -235,88 +234,35 @@ def _collect_metrics(self, db, tags, options, queries, max_custom_queries):
metrics.update(INNODB_VARS)
metrics.update(BINLOG_VARS)

if is_affirmative(options.get('extra_status_metrics', False)):
if is_affirmative(self.config.options.get('extra_status_metrics', False)):
self.log.debug("Collecting Extra Status Metrics")
metrics.update(OPTIONAL_STATUS_VARS)

if self.version.version_compatible((5, 6, 6)):
metrics.update(OPTIONAL_STATUS_VARS_5_6_6)

if is_affirmative(options.get('galera_cluster', False)):
if is_affirmative(self.config.options.get('galera_cluster', False)):
# already in result-set after 'SHOW STATUS' just add vars to collect
self.log.debug("Collecting Galera Metrics.")
metrics.update(GALERA_VARS)

performance_schema_enabled = self._get_variable_enabled(results, 'performance_schema')
above_560 = self.version.version_compatible((5, 6, 0))
if is_affirmative(options.get('extra_performance_metrics', False)) and above_560 and performance_schema_enabled:
if is_affirmative(self.config.options.get('extra_performance_metrics', False)) and above_560 and performance_schema_enabled:
# report avg query response time per schema to Datadog
results['perf_digest_95th_percentile_avg_us'] = self._get_query_exec_time_95th_us(db)
results['query_run_time_avg'] = self._query_exec_time_per_schema(db)
metrics.update(PERFORMANCE_VARS)

if is_affirmative(options.get('schema_size_metrics', False)):
if is_affirmative(self.config.options.get('schema_size_metrics', False)):
# report avg query response time per schema to Datadog
results['information_schema_size'] = self._query_size_per_schema(db)
metrics.update(SCHEMA_VARS)

if is_affirmative(options.get('replication', False)):
# Get replica stats
is_mariadb = self.version.flavor == "MariaDB"
replication_channel = options.get('replication_channel')
if replication_channel:
self.service_check_tags.append("channel:{0}".format(replication_channel))
tags.append("channel:{0}".format(replication_channel))
results.update(self._get_replica_stats(db, is_mariadb, replication_channel))
nonblocking = is_affirmative(options.get('replication_non_blocking_status', False))
results.update(self._get_slave_status(db, above_560, nonblocking))
metrics.update(REPLICA_VARS)

# get slave running form global status page
slave_running_status = AgentCheck.UNKNOWN
slave_running = collect_string('Slave_running', results)
binlog_running = results.get('Binlog_enabled', False)
# slaves will only be collected iff user has PROCESS privileges.
slaves = collect_scalar('Slaves_connected', results)
slave_io_running = collect_type('Slave_IO_Running', results, dict)
slave_sql_running = collect_type('Slave_SQL_Running', results, dict)
if slave_io_running:
slave_io_running = any(v.lower().strip() == 'yes' for v in itervalues(slave_io_running))
if slave_sql_running:
slave_sql_running = any(v.lower().strip() == 'yes' for v in itervalues(slave_sql_running))

# MySQL 5.7.x might not have 'Slave_running'. See: https://bugs.mysql.com/bug.php?id=78544
# look at replica vars collected at the top of if-block
if self.version.version_compatible((5, 7, 0)):
if not (slave_io_running is None and slave_sql_running is None):
if slave_io_running and slave_sql_running:
slave_running_status = AgentCheck.OK
elif not slave_io_running and not slave_sql_running:
slave_running_status = AgentCheck.CRITICAL
else:
# not everything is running smoothly
slave_running_status = AgentCheck.WARNING
elif slave_running.lower().strip() == 'off':
if not (slave_io_running is None and slave_sql_running is None):
if not slave_io_running and not slave_sql_running:
slave_running_status = AgentCheck.CRITICAL

# if we don't yet have a status - inspect
if slave_running_status == AgentCheck.UNKNOWN:
if self._is_master(slaves, results): # master
if slaves > 0 and binlog_running:
slave_running_status = AgentCheck.OK
else:
slave_running_status = AgentCheck.WARNING
elif slave_running: # slave (or standalone)
if slave_running.lower().strip() == 'on':
slave_running_status = AgentCheck.OK
else:
slave_running_status = AgentCheck.CRITICAL

# deprecated in favor of service_check("mysql.replication.slave_running")
self.gauge(self.SLAVE_SERVICE_CHECK_NAME, 1 if slave_running_status == AgentCheck.OK else 0, tags=tags)
self.service_check(self.SLAVE_SERVICE_CHECK_NAME, slave_running_status, tags=self.service_check_tags)
if is_affirmative(self.config.options.get('replication', False)):
replication_metrics = self._collect_replication_metrics(db, results, above_560)
metrics.update(replication_metrics)
self._check_replication_status(results)

# "synthetic" metrics
metrics.update(SYNTHETIC_VARS)
Expand All @@ -336,19 +282,86 @@ def _collect_metrics(self, db, tags, options, queries, max_custom_queries):
if src in results:
results[dst] = results[src]

self._submit_metrics(metrics, results, tags)
self._submit_metrics(metrics, results, self.config.tags)

# Collect custom query metrics
# Max of 20 queries allowed
if isinstance(queries, list):
for check in queries[:max_custom_queries]:
total_tags = tags + check.get('tags', [])
if isinstance(self.config.queries, list):
for check in self.config.[:self.config.max_custom_queries]:
total_tags = self.config.tags + check.get('tags', [])
self._collect_dict(
check['type'], {check['field']: check['metric']}, check['query'], db, tags=total_tags
)

if len(queries) > max_custom_queries:
self.warning("Maximum number (%s) of custom queries reached. Skipping the rest.", max_custom_queries)
if len(self.config.queries) > self.config.max_custom_queries:
self.warning("Maximum number (%s) of custom queries reached. Skipping the rest.", self.config.max_custom_queries)

def _collect_replication_metrics(self, db, results, above_560):
# Get replica stats
is_mariadb = self.version.flavor == "MariaDB"
replication_channel = self.config.options.get('replication_channel')
results.update(self._get_replica_stats(db, is_mariadb, replication_channel))
nonblocking = is_affirmative(self.config.options.get('replication_non_blocking_status', False))
results.update(self._get_slave_status(db, above_560, nonblocking))
return REPLICA_VARS

def _check_replication_status(self, results):
# get slave running form global status page
slave_running_status = AgentCheck.UNKNOWN # type: ServiceCheckStatus
slave_running = collect_string('Slave_running', results) # type: str
binlog_running = results.get('Binlog_enabled', False) # type: bool
# slaves will only be collected iff user has PROCESS privileges.
slaves = collect_scalar('Slaves_connected', results)
slave_io_running = collect_type('Slave_IO_Running', results, dict)
slave_sql_running = collect_type('Slave_SQL_Running', results, dict)

if slave_io_running:
slave_io_running = any(v.lower().strip() == 'yes' for v in itervalues(slave_io_running))
if slave_sql_running:
slave_sql_running = any(v.lower().strip() == 'yes' for v in itervalues(slave_sql_running))

# MySQL 5.7.x might not have 'Slave_running'. See: https://bugs.mysql.com/bug.php?id=78544
# look at replica vars collected at the top of if-block
if self.version.version_compatible((5, 7, 0)):
self.log.debug("Ignoring Slave_running for MySQL 5.7.0")
if not (slave_io_running is None and slave_sql_running is None):
if slave_io_running and slave_sql_running:
self.log.debug("Slave_IO_Running and Slave_SQL_Running are ok")
slave_running_status = AgentCheck.OK
elif not slave_io_running and not slave_sql_running:
self.log.debug("Slave_IO_Running and Slave_SQL_Running are not ok")
slave_running_status = AgentCheck.CRITICAL
else:
self.log.debug("Either Slave_IO_Running or Slave_SQL_Running are not ok")
# not everything is running smoothly
slave_running_status = AgentCheck.WARNING

elif slave_running.lower().strip() == 'off':
self.log.debug("Slave_running is off")
if not (slave_io_running is None and slave_sql_running is None):
if not slave_io_running and not slave_sql_running:
self.log.debug("Slave_IO_Running and Slave_SQL_Running are not ok")
slave_running_status = AgentCheck.CRITICAL

# if we don't yet have a status - inspect
if slave_running_status == AgentCheck.UNKNOWN:
if self._is_master(slaves, results): # master
if slaves > 0 and binlog_running:
self.log.debug("Host is master, there are slaves and binlog is running")
slave_running_status = AgentCheck.OK
else:
slave_running_status = AgentCheck.WARNING
elif slave_running: # slave (or standalone)
if slave_running.lower().strip() == 'on':
self.log.debug("Slave_running is on")
slave_running_status = AgentCheck.OK
else:
self.log.debug("Slave_running is off")
slave_running_status = AgentCheck.CRITICAL

# deprecated in favor of service_check("mysql.replication.slave_running")
self.gauge(self.SLAVE_SERVICE_CHECK_NAME, 1 if slave_running_status == AgentCheck.OK else 0, tags=self.config.tags)
self.service_check(self.SLAVE_SERVICE_CHECK_NAME, slave_running_status, tags=self.service_check_tags)

def _collect_statement_metrics(self, db, tags):
tags = self.service_check_tags + tags
Expand Down

0 comments on commit f7470ac

Please sign in to comment.