From ca2b8841fa55515358e7fa9dbaa06697f946437b Mon Sep 17 00:00:00 2001 From: tammy-baylis-swi Date: Fri, 15 Nov 2024 16:42:44 -0800 Subject: [PATCH] Add mysql-connector integration factory, own get_traced_* methods --- .../instrumentation/mysql/__init__.py | 250 ++++++++++++++++-- 1 file changed, 225 insertions(+), 25 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-mysql/src/opentelemetry/instrumentation/mysql/__init__.py b/instrumentation/opentelemetry-instrumentation-mysql/src/opentelemetry/instrumentation/mysql/__init__.py index 0528e9a576..23d990fc84 100644 --- a/instrumentation/opentelemetry-instrumentation-mysql/src/opentelemetry/instrumentation/mysql/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-mysql/src/opentelemetry/instrumentation/mysql/__init__.py @@ -107,15 +107,28 @@ --- """ -from typing import Collection +import logging +from typing import ( + Any, + Callable, + Collection, + Dict, + Tuple, +) import mysql.connector +import wrapt +from mysql.connector.cursor_cext import CMySQLCursor +from opentelemetry import trace as trace_api from opentelemetry.instrumentation import dbapi from opentelemetry.instrumentation.instrumentor import BaseInstrumentor from opentelemetry.instrumentation.mysql.package import _instruments from opentelemetry.instrumentation.mysql.version import __version__ +_logger = logging.getLogger(__name__) +_OTEL_CURSOR_FACTORY_KEY = "_otel_orig_cursor_factory" + class MySQLInstrumentor(BaseInstrumentor): _CONNECTION_ATTRIBUTES = { @@ -146,6 +159,7 @@ def _instrument(self, **kwargs): self._CONNECTION_ATTRIBUTES, version=__version__, tracer_provider=tracer_provider, + db_api_integration_factory=DatabaseApiIntegration, enable_commenter=enable_sqlcommenter, commenter_options=commenter_options, ) @@ -162,34 +176,220 @@ def instrument_connection( enable_commenter=None, commenter_options=None, ): - """Enable instrumentation in a MySQL connection. + if not hasattr(connection, "_is_instrumented_by_opentelemetry"): + connection._is_instrumented_by_opentelemetry = False + + if not connection._is_instrumented_by_opentelemetry: + setattr( + connection, _OTEL_CURSOR_FACTORY_KEY, connection.cursor_factory + ) + connection.cursor_factory = _new_cursor_factory( + tracer_provider=tracer_provider + ) + connection._is_instrumented_by_opentelemetry = True + else: + _logger.warning( + "Attempting to instrument mysql-connector connection while already instrumented" + ) + return connection + + def uninstrument_connection( + self, + connection, + ): + connection.cursor_factory = getattr( + connection, _OTEL_CURSOR_FACTORY_KEY, None + ) - Args: - connection: The connection to instrument. - tracer_provider: The optional tracer provider to use. If omitted - the current globally configured one is used. + return connection - Returns: - An instrumented connection. - """ - return dbapi.instrument_connection( + +class DatabaseApiIntegration(dbapi.DatabaseApiIntegration): + def wrapped_connection( + self, + connect_method: Callable[..., Any], + args: Tuple[Any, Any], + kwargs: Dict[Any, Any], + ): + """Add object proxy to connection object.""" + connection = connect_method(*args, **kwargs) + self.get_connection_attributes(connection) + return get_traced_connection_proxy(connection, self) + + +def get_traced_connection_proxy( + connection, db_api_integration, *args, **kwargs +): + # pylint: disable=abstract-method + class TracedConnectionProxy(wrapt.ObjectProxy): + # pylint: disable=unused-argument + def __init__(self, connection, *args, **kwargs): + wrapt.ObjectProxy.__init__(self, connection) + + def __getattribute__(self, name): + if object.__getattribute__(self, name): + return object.__getattribute__(self, name) + + return object.__getattribute__( + object.__getattribute__(self, "_connection"), name + ) + + def cursor(self, *args, **kwargs): + wrapped_cursor = self.__wrapped__.cursor(*args, **kwargs) + + # It's common to have multiple db client cursors per app, + # so enable_commenter is set at the cursor level and used + # during traced query execution. + enable_commenter_cursor = db_api_integration.enable_commenter + + # If a mysql-connector cursor was created with prepared=True, + # then MySQL statements will be prepared and executed natively. + # 1:1 sqlcomment and span correlation in instrumentation would + # break, so sqlcomment is not supported for this use case. + # This is here because wrapped cursor is created when application + # side creates cursor. After that, the instrumentor knows what + # kind of cursor was initialized. + if enable_commenter_cursor: + is_prepared = False + if ( + db_api_integration.database_system == "mysql" + and db_api_integration.connect_module.__name__ + == "mysql.connector" + ): + is_prepared = self.is_mysql_connector_cursor_prepared( + wrapped_cursor + ) + if is_prepared: + _logger.warning( + "sqlcomment is not supported for query statements executed by cursors with native prepared statement support. Disabling sqlcommenting for instrumentation of %s.", + db_api_integration.connect_module.__name__, + ) + enable_commenter_cursor = False + return get_traced_cursor_proxy( + wrapped_cursor, + db_api_integration, + enable_commenter=enable_commenter_cursor, + ) + + def is_mysql_connector_cursor_prepared(self, cursor): # pylint: disable=no-self-use + try: + from mysql.connector.cursor_cext import ( # pylint: disable=import-outside-toplevel + CMySQLCursorPrepared, + CMySQLCursorPreparedDict, + CMySQLCursorPreparedNamedTuple, + CMySQLCursorPreparedRaw, + ) + + if type(cursor) in [ + CMySQLCursorPrepared, + CMySQLCursorPreparedDict, + CMySQLCursorPreparedNamedTuple, + CMySQLCursorPreparedRaw, + ]: + return True + + except ImportError as exc: + _logger.warning( + "Could not verify mysql.connector cursor, skipping prepared cursor check: %s", + exc, + ) + + return False + + def __enter__(self): + self.__wrapped__.__enter__() + return self + + def __exit__(self, *args, **kwargs): + self.__wrapped__.__exit__(*args, **kwargs) + + return TracedConnectionProxy(connection, *args, **kwargs) + + +class CursorTracer(dbapi.CursorTracer): + def __init__( + self, + db_api_integration: DatabaseApiIntegration, + enable_commenter: bool = False, + ) -> None: + super().__init__(db_api_integration) + # It's common to have multiple db client cursors per app, + # so enable_commenter is set at the cursor level and used + # during traced query execution for mysql-connector + self._commenter_enabled = enable_commenter + + +def get_traced_cursor_proxy(cursor, db_api_integration, *args, **kwargs): + enable_commenter = kwargs.get("enable_commenter", False) + _cursor_tracer = CursorTracer(db_api_integration, enable_commenter) + + # pylint: disable=abstract-method + class TracedCursorProxy(wrapt.ObjectProxy): + # pylint: disable=unused-argument + def __init__(self, cursor, *args, **kwargs): + wrapt.ObjectProxy.__init__(self, cursor) + + def execute(self, *args, **kwargs): + return _cursor_tracer.traced_execution( + self.__wrapped__, self.__wrapped__.execute, *args, **kwargs + ) + + def executemany(self, *args, **kwargs): + return _cursor_tracer.traced_execution( + self.__wrapped__, self.__wrapped__.executemany, *args, **kwargs + ) + + def callproc(self, *args, **kwargs): + return _cursor_tracer.traced_execution( + self.__wrapped__, self.__wrapped__.callproc, *args, **kwargs + ) + + def __enter__(self): + self.__wrapped__.__enter__() + return self + + def __exit__(self, *args, **kwargs): + self.__wrapped__.__exit__(*args, **kwargs) + + return TracedCursorProxy(cursor, *args, **kwargs) + + +def _new_cursor_factory( + db_api: DatabaseApiIntegration = None, + base_factory: CMySQLCursor = None, + tracer_provider: trace_api.TracerProvider = None, + enable_commenter: bool = False, +): + if not db_api: + db_api = DatabaseApiIntegration( __name__, - connection, - self._DATABASE_SYSTEM, - self._CONNECTION_ATTRIBUTES, + MySQLInstrumentor._DATABASE_SYSTEM, + MySQLInstrumentor._CONNECTION_ATTRIBUTES, version=__version__, tracer_provider=tracer_provider, - enable_commenter=enable_commenter, - commenter_options=commenter_options, ) - def uninstrument_connection(self, connection): - """Disable instrumentation in a MySQL connection. - - Args: - connection: The connection to uninstrument. - - Returns: - An uninstrumented connection. - """ - return dbapi.uninstrument_connection(connection) + # Latter is base class for all mysql-connector cursors + base_factory = base_factory or CMySQLCursor + _cursor_tracer = CursorTracer( + db_api, + enable_commenter, + ) + + class TracedCursorFactory(base_factory): + def execute(self, *args, **kwargs): + return _cursor_tracer.traced_execution( + self, super().execute, *args, **kwargs + ) + + def executemany(self, *args, **kwargs): + return _cursor_tracer.traced_execution( + self, super().executemany, *args, **kwargs + ) + + def callproc(self, *args, **kwargs): + return _cursor_tracer.traced_execution( + self, super().callproc, *args, **kwargs + ) + + return TracedCursorFactory