Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ext/psycopg2: Implement BaseInstrumentor interface #694

Merged
merged 2 commits into from
May 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 37 additions & 42 deletions ext/opentelemetry-ext-dbapi/src/opentelemetry/ext/dbapi/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ def instrument_connection(
connection_attributes=connection_attributes,
)
db_integration.get_connection_attributes(connection)
return TracedConnectionProxy(connection, db_integration)
return get_traced_connection_proxy(connection, db_integration)


def uninstrument_connection(connection):
Expand Down Expand Up @@ -227,7 +227,7 @@ def wrapped_connection(
"""
connection = connect_method(*args, **kwargs)
self.get_connection_attributes(connection)
return TracedConnectionProxy(connection, self)
return get_traced_connection_proxy(connection, self)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason for these wrapper functions?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes - see my above comment. tl;dr: Can't assign new properties to the psycopg2 connection object


def get_connection_attributes(self, connection):
# Populate span fields using connection
Expand Down Expand Up @@ -260,23 +260,21 @@ def get_connection_attributes(self, connection):
self.span_attributes["net.peer.port"] = port


# pylint: disable=abstract-method
class TracedConnectionProxy(wrapt.ObjectProxy):
# pylint: disable=unused-argument
def __init__(
self,
connection,
db_api_integration: DatabaseApiIntegration,
*args,
**kwargs
):
wrapt.ObjectProxy.__init__(self, connection)
self._db_api_integration = db_api_integration
def get_traced_connection_proxy(
connection, db_api_integration, *args, **kwargs
):
# pylint: disable=abstract-method
class TracedConnectionProxy(wrapt.ObjectProxy):
# pylint: disable=unused-argument
def __init__(self, connection, *args, **kwargs):
wrapt.ObjectProxy.__init__(self, connection)

def cursor(self, *args, **kwargs):
return get_traced_cursor_proxy(
self.__wrapped__.cursor(*args, **kwargs), db_api_integration
)

def cursor(self, *args, **kwargs):
return TracedCursorProxy(
self.__wrapped__.cursor(*args, **kwargs), self._db_api_integration
)
return TracedConnectionProxy(connection, *args, **kwargs)


class TracedCursor:
Expand Down Expand Up @@ -323,31 +321,28 @@ def traced_execution(
raise ex


# pylint: disable=abstract-method
class TracedCursorProxy(wrapt.ObjectProxy):
def get_traced_cursor_proxy(cursor, db_api_integration, *args, **kwargs):
_traced_cursor = TracedCursor(db_api_integration)
# pylint: disable=abstract-method
class TracedCursorProxy(wrapt.ObjectProxy):

# pylint: disable=unused-argument
def __init__(
self,
cursor,
db_api_integration: DatabaseApiIntegration,
*args,
**kwargs
):
wrapt.ObjectProxy.__init__(self, cursor)
self._traced_cursor = TracedCursor(db_api_integration)
# pylint: disable=unused-argument
def __init__(self, cursor, *args, **kwargs):
wrapt.ObjectProxy.__init__(self, cursor)

def execute(self, *args, **kwargs):
return self._traced_cursor.traced_execution(
self.__wrapped__.execute, *args, **kwargs
)
def execute(self, *args, **kwargs):
return _traced_cursor.traced_execution(
self.__wrapped__.execute, *args, **kwargs
)

def executemany(self, *args, **kwargs):
return self._traced_cursor.traced_execution(
self.__wrapped__.executemany, *args, **kwargs
)
def executemany(self, *args, **kwargs):
return _traced_cursor.traced_execution(
self.__wrapped__.executemany, *args, **kwargs
)

def callproc(self, *args, **kwargs):
return self._traced_cursor.traced_execution(
self.__wrapped__.callproc, *args, **kwargs
)
def callproc(self, *args, **kwargs):
return _traced_cursor.traced_execution(
self.__wrapped__.callproc, *args, **kwargs
)

return TracedCursorProxy(cursor, *args, **kwargs)
4 changes: 0 additions & 4 deletions ext/opentelemetry-ext-dbapi/tests/test_dbapi_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,15 +125,13 @@ def test_wrap_connect(self, mock_dbapi):
dbapi.wrap_connect(self.tracer, mock_dbapi, "connect", "-")
connection = mock_dbapi.connect()
self.assertEqual(mock_dbapi.connect.call_count, 1)
self.assertIsInstance(connection, dbapi.TracedConnectionProxy)
self.assertIsInstance(connection.__wrapped__, mock.Mock)

@mock.patch("opentelemetry.ext.dbapi")
def test_unwrap_connect(self, mock_dbapi):
dbapi.wrap_connect(self.tracer, mock_dbapi, "connect", "-")
connection = mock_dbapi.connect()
self.assertEqual(mock_dbapi.connect.call_count, 1)
self.assertIsInstance(connection, dbapi.TracedConnectionProxy)

dbapi.unwrap_connect(mock_dbapi, "connect")
connection = mock_dbapi.connect()
Expand All @@ -145,7 +143,6 @@ def test_instrument_connection(self):
# Avoid get_attributes failing because can't concatenate mock
connection.database = "-"
connection2 = dbapi.instrument_connection(self.tracer, connection, "-")
self.assertIsInstance(connection2, dbapi.TracedConnectionProxy)
self.assertIs(connection2.__wrapped__, connection)

def test_uninstrument_connection(self):
Expand All @@ -154,7 +151,6 @@ def test_uninstrument_connection(self):
# be concatenated
connection.database = "-"
connection2 = dbapi.instrument_connection(self.tracer, connection, "-")
self.assertIsInstance(connection2, dbapi.TracedConnectionProxy)
self.assertIs(connection2.__wrapped__, connection)

connection3 = dbapi.uninstrument_connection(connection2)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import psycopg2

from opentelemetry import trace as trace_api
from opentelemetry.ext.psycopg2 import trace_integration
from opentelemetry.ext.psycopg2 import Psycopg2Instrumentor
from opentelemetry.test.test_base import TestBase

POSTGRES_HOST = os.getenv("POSTGRESQL_HOST ", "localhost")
Expand All @@ -35,7 +35,7 @@ def setUpClass(cls):
cls._connection = None
cls._cursor = None
cls._tracer = cls.tracer_provider.get_tracer(__name__)
trace_integration(cls.tracer_provider)
Psycopg2Instrumentor().instrument(tracer_provider=cls.tracer_provider)
cls._connection = psycopg2.connect(
dbname=POSTGRES_DB_NAME,
user=POSTGRES_USER,
Expand All @@ -52,6 +52,7 @@ def tearDownClass(cls):
cls._cursor.close()
if cls._connection:
cls._connection.close()
Psycopg2Instrumentor().uninstrument()

def validate_spans(self):
spans = self.memory_exporter.get_finished_spans()
Expand Down
2 changes: 2 additions & 0 deletions ext/opentelemetry-ext-psycopg2/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## Unreleased

- Implement instrumentor interface, enabling auto-instrumentation ([#694]https://github.com/open-telemetry/opentelemetry-python/pull/694)

## 0.4a0

Released 2020-02-21
Expand Down
10 changes: 10 additions & 0 deletions ext/opentelemetry-ext-psycopg2/setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,18 @@ package_dir=
packages=find_namespace:
install_requires =
opentelemetry-api == 0.8.dev0
opentelemetry-ext-dbapi == 0.8.dev0
opentelemetry-auto-instrumentation == 0.8.dev0
psycopg2-binary >= 2.7.3.1
wrapt >= 1.0.0, < 2.0.0

[options.extras_require]
test =
opentelemetry-test == 0.8.dev0

[options.packages.find]
where = src

[options.entry_points]
opentelemetry_instrumentor =
psycopg2 = opentelemetry.ext.psycopg2:Psycopg2Instrumentor
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
# limitations under the License.

"""
The integration with PostgreSQL supports the `Psycopg`_ library and is specified
to ``trace_integration`` using ``'PostgreSQL'``.
The integration with PostgreSQL supports the `Psycopg`_ library, it can be enabled by
using ``Psycopg2Instrumentor``.

.. _Psycopg: http://initd.org/psycopg/

Expand All @@ -26,11 +26,12 @@
import psycopg2
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.trace.ext.psycopg2 import trace_integration
from opentelemetry.trace.ext.psycopg2 import Psycopg2Instrumentor

trace.set_tracer_provider(TracerProvider())

trace_integration()
Psycopg2Instrumentor().instrument()

cnx = psycopg2.connect(database='Database')
cursor = cnx.cursor()
cursor.execute("INSERT INTO test (testField) VALUES (123)")
Expand All @@ -41,83 +42,77 @@
---
"""

import logging
import typing

import psycopg2
import wrapt
from psycopg2.sql import Composable

from opentelemetry.ext.dbapi import DatabaseApiIntegration, TracedCursor
from opentelemetry.auto_instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.ext import dbapi
from opentelemetry.ext.psycopg2.version import __version__
from opentelemetry.trace import Tracer, get_tracer

logger = logging.getLogger(__name__)

DATABASE_COMPONENT = "postgresql"
DATABASE_TYPE = "sql"
from opentelemetry.trace import TracerProvider, get_tracer


def trace_integration(tracer_provider=None):
"""Integrate with PostgreSQL Psycopg library.
Psycopg: http://initd.org/psycopg/
"""

tracer = get_tracer(__name__, __version__, tracer_provider)

connection_attributes = {
class Psycopg2Instrumentor(BaseInstrumentor):
_CONNECTION_ATTRIBUTES = {
"database": "info.dbname",
"port": "info.port",
"host": "info.host",
"user": "info.user",
}
db_integration = DatabaseApiIntegration(
tracer,
DATABASE_COMPONENT,
database_type=DATABASE_TYPE,
connection_attributes=connection_attributes,
)

# pylint: disable=unused-argument
def wrap_connect(
connect_func: typing.Callable[..., any],
instance: typing.Any,
args: typing.Tuple[any, any],
kwargs: typing.Dict[any, any],
):
connection = connect_func(*args, **kwargs)
db_integration.get_connection_attributes(connection)
connection.cursor_factory = PsycopgTraceCursor
return connection

try:
wrapt.wrap_function_wrapper(psycopg2, "connect", wrap_connect)
except Exception as ex: # pylint: disable=broad-except
logger.warning("Failed to integrate with pyscopg2. %s", str(ex))

class PsycopgTraceCursor(psycopg2.extensions.cursor):
def __init__(self, *args, **kwargs):
self._traced_cursor = TracedCursor(db_integration)
super(PsycopgTraceCursor, self).__init__(*args, **kwargs)

# pylint: disable=redefined-builtin
def execute(self, query, vars=None):
if isinstance(query, Composable):
query = query.as_string(self)
return self._traced_cursor.traced_execution(
super(PsycopgTraceCursor, self).execute, query, vars
)

# pylint: disable=redefined-builtin
def executemany(self, query, vars):
if isinstance(query, Composable):
query = query.as_string(self)
return self._traced_cursor.traced_execution(
super(PsycopgTraceCursor, self).executemany, query, vars
)

# pylint: disable=redefined-builtin
def callproc(self, procname, vars=None):
return self._traced_cursor.traced_execution(
super(PsycopgTraceCursor, self).callproc, procname, vars
)

_DATABASE_COMPONENT = "postgresql"
_DATABASE_TYPE = "sql"

def _instrument(self, **kwargs):
"""Integrate with PostgreSQL Psycopg library.
Psycopg: http://initd.org/psycopg/
"""

tracer_provider = kwargs.get("tracer_provider")

tracer = get_tracer(__name__, __version__, tracer_provider)

dbapi.wrap_connect(
tracer,
psycopg2,
"connect",
self._DATABASE_COMPONENT,
self._DATABASE_TYPE,
self._CONNECTION_ATTRIBUTES,
)

def _uninstrument(self, **kwargs):
""""Disable Psycopg2 instrumentation"""
dbapi.unwrap_connect(psycopg2, "connect")

# pylint:disable=no-self-use
def instrument_connection(self, connection):
"""Enable instrumentation in a Psycopg2 connection.

Args:
connection: The connection to instrument.

Returns:
An instrumented connection.
"""
tracer = get_tracer(__name__, __version__)

return dbapi.instrument_connection(
tracer,
connection,
self._DATABASE_COMPONENT,
self._DATABASE_TYPE,
self._CONNECTION_ATTRIBUTES,
)

def uninstrument_connection(self, connection):
"""Disable instrumentation in a Psycopg2 connection.

Args:
connection: The connection to uninstrument.

Returns:
An uninstrumented connection.
"""
return dbapi.uninstrument_connection(connection)
Loading