Skip to content

Commit

Permalink
Add mysql-connector integration factory, own get_traced_* methods
Browse files Browse the repository at this point in the history
  • Loading branch information
tammy-baylis-swi committed Nov 16, 2024
1 parent d8b1ebf commit ca2b884
Showing 1 changed file with 225 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -107,15 +107,28 @@
---
"""

from typing import Collection
import logging
from typing import (
Any,
Callable,
Collection,
Dict,
Tuple,
)

import mysql.connector
import wrapt
from mysql.connector.cursor_cext import CMySQLCursor

from opentelemetry import trace as trace_api
from opentelemetry.instrumentation import dbapi
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.mysql.package import _instruments
from opentelemetry.instrumentation.mysql.version import __version__

_logger = logging.getLogger(__name__)
_OTEL_CURSOR_FACTORY_KEY = "_otel_orig_cursor_factory"


class MySQLInstrumentor(BaseInstrumentor):
_CONNECTION_ATTRIBUTES = {
Expand Down Expand Up @@ -146,6 +159,7 @@ def _instrument(self, **kwargs):
self._CONNECTION_ATTRIBUTES,
version=__version__,
tracer_provider=tracer_provider,
db_api_integration_factory=DatabaseApiIntegration,
enable_commenter=enable_sqlcommenter,
commenter_options=commenter_options,
)
Expand All @@ -162,34 +176,220 @@ def instrument_connection(
enable_commenter=None,
commenter_options=None,
):
"""Enable instrumentation in a MySQL connection.
if not hasattr(connection, "_is_instrumented_by_opentelemetry"):
connection._is_instrumented_by_opentelemetry = False

if not connection._is_instrumented_by_opentelemetry:
setattr(
connection, _OTEL_CURSOR_FACTORY_KEY, connection.cursor_factory
)
connection.cursor_factory = _new_cursor_factory(
tracer_provider=tracer_provider
)
connection._is_instrumented_by_opentelemetry = True
else:
_logger.warning(
"Attempting to instrument mysql-connector connection while already instrumented"
)
return connection

def uninstrument_connection(
self,
connection,
):
connection.cursor_factory = getattr(
connection, _OTEL_CURSOR_FACTORY_KEY, None
)

Args:
connection: The connection to instrument.
tracer_provider: The optional tracer provider to use. If omitted
the current globally configured one is used.
return connection

Returns:
An instrumented connection.
"""
return dbapi.instrument_connection(

class DatabaseApiIntegration(dbapi.DatabaseApiIntegration):
def wrapped_connection(
self,
connect_method: Callable[..., Any],
args: Tuple[Any, Any],
kwargs: Dict[Any, Any],
):
"""Add object proxy to connection object."""
connection = connect_method(*args, **kwargs)
self.get_connection_attributes(connection)
return get_traced_connection_proxy(connection, self)


def get_traced_connection_proxy(
connection, db_api_integration, *args, **kwargs
):
# pylint: disable=abstract-method
class TracedConnectionProxy(wrapt.ObjectProxy):
# pylint: disable=unused-argument
def __init__(self, connection, *args, **kwargs):
wrapt.ObjectProxy.__init__(self, connection)

def __getattribute__(self, name):
if object.__getattribute__(self, name):
return object.__getattribute__(self, name)

return object.__getattribute__(
object.__getattribute__(self, "_connection"), name
)

def cursor(self, *args, **kwargs):
wrapped_cursor = self.__wrapped__.cursor(*args, **kwargs)

# It's common to have multiple db client cursors per app,
# so enable_commenter is set at the cursor level and used
# during traced query execution.
enable_commenter_cursor = db_api_integration.enable_commenter

# If a mysql-connector cursor was created with prepared=True,
# then MySQL statements will be prepared and executed natively.
# 1:1 sqlcomment and span correlation in instrumentation would
# break, so sqlcomment is not supported for this use case.
# This is here because wrapped cursor is created when application
# side creates cursor. After that, the instrumentor knows what
# kind of cursor was initialized.
if enable_commenter_cursor:
is_prepared = False
if (
db_api_integration.database_system == "mysql"
and db_api_integration.connect_module.__name__
== "mysql.connector"
):
is_prepared = self.is_mysql_connector_cursor_prepared(
wrapped_cursor
)
if is_prepared:
_logger.warning(
"sqlcomment is not supported for query statements executed by cursors with native prepared statement support. Disabling sqlcommenting for instrumentation of %s.",
db_api_integration.connect_module.__name__,
)
enable_commenter_cursor = False
return get_traced_cursor_proxy(
wrapped_cursor,
db_api_integration,
enable_commenter=enable_commenter_cursor,
)

def is_mysql_connector_cursor_prepared(self, cursor): # pylint: disable=no-self-use
try:
from mysql.connector.cursor_cext import ( # pylint: disable=import-outside-toplevel
CMySQLCursorPrepared,
CMySQLCursorPreparedDict,
CMySQLCursorPreparedNamedTuple,
CMySQLCursorPreparedRaw,
)

if type(cursor) in [
CMySQLCursorPrepared,
CMySQLCursorPreparedDict,
CMySQLCursorPreparedNamedTuple,
CMySQLCursorPreparedRaw,
]:
return True

except ImportError as exc:
_logger.warning(
"Could not verify mysql.connector cursor, skipping prepared cursor check: %s",
exc,
)

return False

def __enter__(self):
self.__wrapped__.__enter__()
return self

def __exit__(self, *args, **kwargs):
self.__wrapped__.__exit__(*args, **kwargs)

return TracedConnectionProxy(connection, *args, **kwargs)


class CursorTracer(dbapi.CursorTracer):
def __init__(
self,
db_api_integration: DatabaseApiIntegration,
enable_commenter: bool = False,
) -> None:
super().__init__(db_api_integration)
# It's common to have multiple db client cursors per app,
# so enable_commenter is set at the cursor level and used
# during traced query execution for mysql-connector
self._commenter_enabled = enable_commenter


def get_traced_cursor_proxy(cursor, db_api_integration, *args, **kwargs):
enable_commenter = kwargs.get("enable_commenter", False)
_cursor_tracer = CursorTracer(db_api_integration, enable_commenter)

# pylint: disable=abstract-method
class TracedCursorProxy(wrapt.ObjectProxy):
# pylint: disable=unused-argument
def __init__(self, cursor, *args, **kwargs):
wrapt.ObjectProxy.__init__(self, cursor)

def execute(self, *args, **kwargs):
return _cursor_tracer.traced_execution(
self.__wrapped__, self.__wrapped__.execute, *args, **kwargs
)

def executemany(self, *args, **kwargs):
return _cursor_tracer.traced_execution(
self.__wrapped__, self.__wrapped__.executemany, *args, **kwargs
)

def callproc(self, *args, **kwargs):
return _cursor_tracer.traced_execution(
self.__wrapped__, self.__wrapped__.callproc, *args, **kwargs
)

def __enter__(self):
self.__wrapped__.__enter__()
return self

def __exit__(self, *args, **kwargs):
self.__wrapped__.__exit__(*args, **kwargs)

return TracedCursorProxy(cursor, *args, **kwargs)


def _new_cursor_factory(
db_api: DatabaseApiIntegration = None,
base_factory: CMySQLCursor = None,
tracer_provider: trace_api.TracerProvider = None,
enable_commenter: bool = False,
):
if not db_api:
db_api = DatabaseApiIntegration(
__name__,
connection,
self._DATABASE_SYSTEM,
self._CONNECTION_ATTRIBUTES,
MySQLInstrumentor._DATABASE_SYSTEM,
MySQLInstrumentor._CONNECTION_ATTRIBUTES,
version=__version__,
tracer_provider=tracer_provider,
enable_commenter=enable_commenter,
commenter_options=commenter_options,
)

def uninstrument_connection(self, connection):
"""Disable instrumentation in a MySQL connection.
Args:
connection: The connection to uninstrument.
Returns:
An uninstrumented connection.
"""
return dbapi.uninstrument_connection(connection)
# Latter is base class for all mysql-connector cursors
base_factory = base_factory or CMySQLCursor
_cursor_tracer = CursorTracer(
db_api,
enable_commenter,
)

class TracedCursorFactory(base_factory):
def execute(self, *args, **kwargs):
return _cursor_tracer.traced_execution(
self, super().execute, *args, **kwargs
)

def executemany(self, *args, **kwargs):
return _cursor_tracer.traced_execution(
self, super().executemany, *args, **kwargs
)

def callproc(self, *args, **kwargs):
return _cursor_tracer.traced_execution(
self, super().callproc, *args, **kwargs
)

return TracedCursorFactory

0 comments on commit ca2b884

Please sign in to comment.