Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add truncated statement indicator to mysql query sample events #9620

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 47 additions & 27 deletions mysql/datadog_checks/mysql/statement_samples.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,15 @@
)


class StatementTruncationState(Enum):
"""
Denotes the various possible states of a statement's truncation
"""

truncated = 'truncated'
not_truncated = 'not_truncated'


class DBExplainError(Enum):
"""
Denotes the various reasons a query may not have an explain statement.
Expand All @@ -224,6 +233,9 @@ class DBExplainError(Enum):
# agent may not have access to the default schema
use_schema_error = 'use_schema_error'

# a truncated statement can't be explained
statement_truncated = 'statement_truncated'


class MySQLStatementSamples(object):
"""
Expand Down Expand Up @@ -452,7 +464,6 @@ def _get_new_events_statements(self, events_statements_table, row_limit):

def _filter_valid_statement_rows(self, rows):
num_sent = 0
num_truncated = 0

for row in rows:
if not row or not all(row):
Expand All @@ -461,27 +472,13 @@ def _filter_valid_statement_rows(self, rows):
sql_text = row['sql_text']
if not sql_text:
continue
# The SQL_TEXT column will store 1024 chars by default. Plans cannot be captured on truncated
# queries, so the `performance_schema_max_sql_text_length` variable must be raised.
if sql_text[-3:] == '...':
num_truncated += 1
continue
yield row
# only save the checkpoint for rows that we have successfully processed
# else rows that we ignore can push the checkpoint forward causing us to miss some on the next run
if row['timer_start'] > self._checkpoint:
self._checkpoint = row['timer_start']
num_sent += 1

if num_truncated > 0:
self._log.warning(
'Unable to collect %d/%d statement samples due to truncated SQL text. Consider raising '
'`performance_schema_max_sql_text_length` to capture these queries.',
num_truncated,
num_truncated + num_sent,
)
self._check.count("dd.mysql.statement_samples.error", 1, tags=self._tags + ["error:truncated-sql-text"])

def _collect_plan_for_statement(self, row):
# Plans have several important signatures to tag events with:
# - `plan_signature` - hash computed from the normalized JSON plan to group identical plan trees
Expand All @@ -506,16 +503,31 @@ def _collect_plan_for_statement(self, row):
return None

plan, explain_errors = None, None
with closing(self._get_db_connection().cursor()) as cursor:
try:
plan, explain_errors = self._explain_statement(
cursor, row['sql_text'], row['current_schema'], obfuscated_statement
)
except Exception as e:
self._check.count(
"dd.mysql.statement_samples.error", 1, tags=self._tags + ["error:explain-{}".format(type(e))]
truncated = self._get_truncation_state(row['sql_text'])
if truncated == StatementTruncationState.truncated:
self._check.count(
"dd.mysql.statement_samples.error",
1,
tags=self._tags + ["error:explain-{}".format(DBExplainError.statement_truncated)],
)
explain_errors = [
(
None,
DBExplainError.statement_truncated,
'truncated length: {}'.format(len(row['sql_text'])),
)
self._log.exception("Failed to explain statement: %s", obfuscated_statement)
]
else:
with closing(self._get_db_connection().cursor()) as cursor:
try:
plan, explain_errors = self._explain_statement(
cursor, row['sql_text'], row['current_schema'], obfuscated_statement
)
except Exception as e:
self._check.count(
"dd.mysql.statement_samples.error", 1, tags=self._tags + ["error:explain-{}".format(type(e))]
)
self._log.exception("Failed to explain statement: %s", obfuscated_statement)

collection_errors = []
if explain_errors:
Expand All @@ -524,7 +536,7 @@ def _collect_plan_for_statement(self, row):
{
'strategy': strategy if strategy else None,
'code': explain_err_code.value if explain_err_code else None,
'message': '{}'.format(type(explain_err)) if explain_err else None,
'message': explain_err if explain_err else None,
}
)

Expand Down Expand Up @@ -559,6 +571,7 @@ def _collect_plan_for_statement(self, row):
"query_signature": query_signature,
"resource_hash": apm_resource_hash,
"statement": obfuscated_statement,
"statement_truncated": truncated.value,
},
'mysql': {k: v for k, v in row.items() if k not in EVENTS_STATEMENTS_SAMPLE_EXCLUDE_KEYS},
}
Expand Down Expand Up @@ -729,7 +742,7 @@ def _explain_statement(self, cursor, statement, schema, obfuscated_statement):
except pymysql.err.DatabaseError as e:
if len(e.args) != 2:
raise
explain_errors = [(None, DBExplainError.use_schema_error, e)]
explain_errors = [(None, DBExplainError.use_schema_error, '{}'.format(type(e)))]
if e.args[0] in PYMYSQL_NON_RETRYABLE_ERRORS:
self._collection_strategy_cache[strategy_cache_key] = explain_errors
self._check.count(
Expand Down Expand Up @@ -783,7 +796,7 @@ def _explain_statement(self, cursor, statement, schema, obfuscated_statement):
# we don't cache failed plan collection failures for specific queries because some queries in a schema
# can fail while others succeed. The failed collection will be cached for the specific query
# so we won't try to explain it again for the cache duration there.
explain_errors.append((strategy, DBExplainError.database_error, e))
explain_errors.append((strategy, DBExplainError.database_error, '{}'.format(type(e))))
self._check.count(
"dd.mysql.statement_samples.error",
1,
Expand Down Expand Up @@ -834,3 +847,10 @@ def _parse_execution_plan_cost(execution_plan):
"""
cost = json.loads(execution_plan).get('query_block', {}).get('cost_info', {}).get('query_cost', 0.0)
return float(cost or 0.0)

@staticmethod
def _get_truncation_state(statement):
# Mysql adds 3 dots at the end of truncated statements so we use this to check if
# a statement is truncated
truncated = statement[-3:] == '...'
return StatementTruncationState.truncated if truncated else StatementTruncationState.not_truncated
49 changes: 43 additions & 6 deletions mysql/tests/test_mysql.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from datadog_checks.base.utils.serialization import json
from datadog_checks.dev.utils import get_metadata_metrics
from datadog_checks.mysql import MySql, statements
from datadog_checks.mysql.statement_samples import MySQLStatementSamples
from datadog_checks.mysql.statement_samples import MySQLStatementSamples, StatementTruncationState
from datadog_checks.mysql.version_utils import get_version

from . import common, tags, variables
Expand Down Expand Up @@ -402,21 +402,23 @@ def run_query(q):
@pytest.mark.usefixtures('dd_environment')
@pytest.mark.parametrize(
"events_statements_table",
["events_statements_current", "events_statements_history", "events_statements_history_long", None],
["events_statements_history_long"],
)
@pytest.mark.parametrize("explain_strategy", ['PROCEDURE', 'FQ_PROCEDURE', 'STATEMENT', None])
@pytest.mark.parametrize(
"schema,statement,expected_collection_errors",
"schema,statement,expected_collection_errors,expected_statement_truncated",
[
(
None,
'select name as nam from testdb.users',
[{'strategy': 'PROCEDURE', 'code': 'procedure_strategy_requires_default_schema', 'message': None}],
StatementTruncationState.not_truncated.value,
),
(
'information_schema',
'select * from testdb.users',
[{'strategy': 'PROCEDURE', 'code': 'database_error', 'message': "<class 'pymysql.err.OperationalError'>"}],
StatementTruncationState.not_truncated.value,
),
(
'testdb',
Expand All @@ -428,10 +430,30 @@ def run_query(q):
'message': "<class 'pymysql.err.ProgrammingError'>",
}
],
StatementTruncationState.not_truncated.value,
),
(
'testdb',
'SELECT {} FROM users where '
'name=\'Johannes Chrysostomus Wolfgangus Theophilus Mozart\''.format(
", ".join("name as name{}".format(i) for i in range(244))
),
[
{
'strategy': None,
'code': 'statement_truncated',
'message': 'truncated length: {}'.format(
4096
if MYSQL_VERSION_PARSED > parse_version('5.6') and environ.get('MYSQL_FLAVOR') != 'mariadb'
else 1024
),
}
],
StatementTruncationState.truncated.value,
),
],
)
@pytest.mark.parametrize("aurora_replication_role", [None, "writer", "reader"])
@pytest.mark.parametrize("aurora_replication_role", ["reader"])
def test_statement_samples_collect(
aggregator,
dbm_instance,
Expand All @@ -441,6 +463,7 @@ def test_statement_samples_collect(
schema,
statement,
expected_collection_errors,
expected_statement_truncated,
aurora_replication_role,
caplog,
):
Expand Down Expand Up @@ -486,8 +509,21 @@ def test_statement_samples_collect(
logger.debug("done second check")

events = aggregator.get_event_platform_events("dbm-samples")
matching = [e for e in events if e['db']['statement'] == statement]

# Match against the statement itself if it's below the statement length limit or its truncated form which is
# the first 1024/4096 bytes (depending on the mysql version) with the last 3 replaced by '...'
expected_statement_prefix = (
statement[:1021] + '...'
if len(statement) > 1024
and (MYSQL_VERSION_PARSED == parse_version('5.6') or environ.get('MYSQL_FLAVOR') == 'mariadb')
else statement[:4093] + '...'
if len(statement) > 4096
else statement
)

matching = [e for e in events if expected_statement_prefix.startswith(e['db']['statement'])]
assert len(matching) > 0, "should have collected an event"

with_plans = [e for e in matching if e['db']['plan']['definition'] is not None]
if schema == 'testdb' and explain_strategy == 'FQ_PROCEDURE':
# explain via the FQ_PROCEDURE will fail if a query contains non-fully-qualified tables because it will
Expand All @@ -500,13 +536,14 @@ def test_statement_samples_collect(
elif not schema and explain_strategy == 'PROCEDURE':
# if there is no default schema then we cannot use the non-fully-qualified procedure strategy
assert not with_plans, "should not have collected any plans"
else:
elif not expected_statement_truncated:
event = with_plans[0]
assert 'query_block' in json.loads(event['db']['plan']['definition']), "invalid json execution plan"
assert set(event['ddtags'].split(',')) == expected_tags

# Validate the events to ensure we've provided an explanation for not providing an exec plan
for event in matching:
assert event['db']['statement_truncated'] == expected_statement_truncated
if event['db']['plan']['definition'] is None:
assert event['db']['plan']['collection_errors'] == expected_collection_errors
else:
Expand Down