Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use is_recording flag in aiopg, asyncpg, dbapi, psycopg2, pymemcache, pymongo, redis, sqlalchemy instrumentations #1212

Merged
merged 15 commits into from
Oct 8, 2020
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class AiopgInstrumentor(BaseInstrumentor):

def _instrument(self, **kwargs):
"""Integrate with PostgreSQL aiopg library.
aiopg: https://github.com/aio-libs/aiopg
aiopg: https://github.com/aio-libs/aiopg
"""

tracer_provider = kwargs.get("tracer_provider")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@ async def wrapped_connection(
args: typing.Tuple[typing.Any, typing.Any],
kwargs: typing.Dict[typing.Any, typing.Any],
):
"""Add object proxy to connection object.
"""
"""Add object proxy to connection object."""
connection = await connect_method(*args, **kwargs)
# pylint: disable=protected-access
self.get_connection_attributes(connection._conn)
Expand Down Expand Up @@ -109,10 +108,14 @@ async def traced_execution(
self._populate_span(span, *args)
try:
result = await query_method(*args, **kwargs)
span.set_status(Status(StatusCanonicalCode.OK))
if span.is_recording():
span.set_status(Status(StatusCanonicalCode.OK))
return result
except Exception as ex: # pylint: disable=broad-except
span.set_status(Status(StatusCanonicalCode.UNKNOWN, str(ex)))
if span.is_recording():
span.set_status(
Status(StatusCanonicalCode.UNKNOWN, str(ex))
)
raise ex


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,16 +56,16 @@ def trace_integration(
tracer_provider: typing.Optional[TracerProvider] = None,
):
"""Integrate with aiopg library.
based on dbapi integration, where replaced sync wrap methods to async

Args:
database_component: Database driver name or
database name "postgreSQL".
database_type: The Database type. For any SQL database, "sql".
connection_attributes: Attribute names for database, port, host and
user in Connection object.
tracer_provider: The :class:`opentelemetry.trace.TracerProvider` to
use. If ommited the current configured one is used.
based on dbapi integration, where replaced sync wrap methods to async

Args:
database_component: Database driver name or
database name "postgreSQL".
database_type: The Database type. For any SQL database, "sql".
connection_attributes: Attribute names for database, port, host and
user in Connection object.
tracer_provider: The :class:`opentelemetry.trace.TracerProvider` to
use. If ommited the current configured one is used.
"""

wrap_connect(
Expand All @@ -87,18 +87,18 @@ def wrap_connect(
tracer_provider: typing.Optional[TracerProvider] = None,
):
"""Integrate with aiopg library.
https://github.com/aio-libs/aiopg

Args:
name: Name of opentelemetry extension for aiopg.
database_component: Database driver name
or database name "postgreSQL".
database_type: The Database type. For any SQL database, "sql".
connection_attributes: Attribute names for database, port, host and
user in Connection object.
version: Version of opentelemetry extension for aiopg.
tracer_provider: The :class:`opentelemetry.trace.TracerProvider` to
use. If ommited the current configured one is used.
https://github.com/aio-libs/aiopg

Args:
name: Name of opentelemetry extension for aiopg.
database_component: Database driver name
or database name "postgreSQL".
database_type: The Database type. For any SQL database, "sql".
connection_attributes: Attribute names for database, port, host and
user in Connection object.
version: Version of opentelemetry extension for aiopg.
tracer_provider: The :class:`opentelemetry.trace.TracerProvider` to
use. If ommited the current configured one is used.
"""

# pylint: disable=unused-argument
Expand All @@ -125,8 +125,8 @@ async def wrap_connect_(


def unwrap_connect():
""""Disable integration with aiopg library.
https://github.com/aio-libs/aiopg
""" "Disable integration with aiopg library.
lzchen marked this conversation as resolved.
Show resolved Hide resolved
https://github.com/aio-libs/aiopg
"""

unwrap(aiopg, "connect")
Expand Down Expand Up @@ -217,7 +217,7 @@ async def wrap_create_pool_(


def unwrap_create_pool():
""""Disable integration with aiopg library.
https://github.com/aio-libs/aiopg
""" "Disable integration with aiopg library.
codeboten marked this conversation as resolved.
Show resolved Hide resolved
https://github.com/aio-libs/aiopg
"""
unwrap(aiopg, "create_pool")
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,40 @@ def test_span_succeeded(self):
trace_api.status.StatusCanonicalCode.OK,
)

def test_span_not_recording(self):
connection_props = {
"database": "testdatabase",
"server_host": "testhost",
"server_port": 123,
"user": "testuser",
}
connection_attributes = {
"database": "database",
"port": "server_port",
"host": "server_host",
"user": "user",
}
mock_tracer = mock.Mock()
mock_span = mock.Mock()
mock_span.is_recording.return_value = False
mock_tracer.start_span.return_value = mock_span
mock_tracer.use_span.return_value.__enter__ = mock_span
mock_tracer.use_span.return_value.__exit__ = mock_span
lzchen marked this conversation as resolved.
Show resolved Hide resolved
db_integration = AiopgIntegration(
mock_tracer, "testcomponent", "testtype", connection_attributes
)
mock_connection = async_call(
db_integration.wrapped_connection(
mock_connect, {}, connection_props
)
)
cursor = async_call(mock_connection.cursor())
async_call(cursor.execute("Test query", ("param1Value", False)))
self.assertFalse(mock_span.is_recording())
self.assertTrue(mock_span.is_recording.called)
self.assertFalse(mock_span.set_attribute.called)
self.assertFalse(mock_span.set_status.called)

def test_span_failed(self):
db_integration = AiopgIntegration(self.tracer, "testcomponent")
mock_connection = async_call(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,31 +112,34 @@ def _uninstrument(self, **__):
unwrap(asyncpg.Connection, method)

async def _do_execute(self, func, instance, args, kwargs):
span_attributes = _hydrate_span_from_args(
instance, args[0], args[1:] if self.capture_parameters else None,
)
tracer = getattr(asyncpg, _APPLIED)

exception = None

with tracer.start_as_current_span(
"postgresql", kind=SpanKind.CLIENT
) as span:

for attribute, value in span_attributes.items():
span.set_attribute(attribute, value)
if span.is_recording():
span_attributes = _hydrate_span_from_args(
instance,
args[0],
args[1:] if self.capture_parameters else None,
)
for attribute, value in span_attributes.items():
span.set_attribute(attribute, value)

try:
result = await func(*args, **kwargs)
except Exception as exc: # pylint: disable=W0703
exception = exc
raise
finally:
if exception is not None:
span.set_status(
Status(_exception_to_canonical_code(exception))
)
else:
span.set_status(Status(StatusCanonicalCode.OK))
if span.is_recording():
if exception is not None:
span.set_status(
Status(_exception_to_canonical_code(exception))
)
else:
span.set_status(Status(StatusCanonicalCode.OK))

return result
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,8 @@ def __init__(self, db_api_integration: DatabaseApiIntegration):
def _populate_span(
self, span: trace_api.Span, *args: typing.Tuple[typing.Any, typing.Any]
):
if not span.is_recording():
return
statement = args[0] if args else ""
span.set_attribute(
"component", self._db_api_integration.database_component
Expand Down Expand Up @@ -341,10 +343,14 @@ def traced_execution(
self._populate_span(span, *args)
try:
result = query_method(*args, **kwargs)
span.set_status(Status(StatusCanonicalCode.OK))
if span.is_recording():
span.set_status(Status(StatusCanonicalCode.OK))
return result
except Exception as ex: # pylint: disable=broad-except
span.set_status(Status(StatusCanonicalCode.UNKNOWN, str(ex)))
if span.is_recording():
span.set_status(
Status(StatusCanonicalCode.UNKNOWN, str(ex))
)
raise ex


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,38 @@ def test_span_succeeded(self):
trace_api.status.StatusCanonicalCode.OK,
)

def test_span_not_recording(self):
connection_props = {
"database": "testdatabase",
"server_host": "testhost",
"server_port": 123,
"user": "testuser",
}
connection_attributes = {
"database": "database",
"port": "server_port",
"host": "server_host",
"user": "user",
}
mock_tracer = mock.Mock()
mock_span = mock.Mock()
mock_span.is_recording.return_value = False
mock_tracer.start_span.return_value = mock_span
mock_tracer.use_span.return_value.__enter__ = mock_span
mock_tracer.use_span.return_value.__exit__ = mock_span
db_integration = dbapi.DatabaseApiIntegration(
mock_tracer, "testcomponent", "testtype", connection_attributes
)
mock_connection = db_integration.wrapped_connection(
mock_connect, {}, connection_props
)
cursor = mock_connection.cursor()
cursor.execute("Test query", ("param1Value", False))
self.assertFalse(mock_span.is_recording())
self.assertTrue(mock_span.is_recording.called)
self.assertFalse(mock_span.set_attribute.called)
self.assertFalse(mock_span.set_status.called)

def test_span_failed(self):
db_integration = dbapi.DatabaseApiIntegration(
self.tracer, "testcomponent"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,29 @@ def test_instrumentor(self, mock_connect):
spans_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans_list), 1)

@mock.patch("psycopg2.connect")
# pylint: disable=unused-argument
def test_not_recording(self, mock_connect):
mock_tracer = mock.Mock()
mock_span = mock.Mock()
mock_span.is_recording.return_value = False
mock_tracer.start_span.return_value = mock_span
mock_tracer.use_span.return_value.__enter__ = mock_span
mock_tracer.use_span.return_value.__exit__ = mock_span
Psycopg2Instrumentor().instrument()
with mock.patch("opentelemetry.trace.get_tracer") as tracer:
tracer.return_value = mock_tracer
cnx = psycopg2.connect(database="test")
cursor = cnx.cursor()
query = "SELECT * FROM test"
cursor.execute(query)
self.assertFalse(mock_span.is_recording())
self.assertTrue(mock_span.is_recording.called)
self.assertFalse(mock_span.set_attribute.called)
self.assertFalse(mock_span.set_status.called)

Psycopg2Instrumentor().uninstrument()

@mock.patch("psycopg2.connect")
# pylint: disable=unused-argument
def test_custom_tracer_provider(self, mock_connect):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,14 @@


def _set_connection_attributes(span, instance):
if not span.is_recording():
return
for key, value in _get_address_attributes(instance).items():
span.set_attribute(key, value)


def _with_tracer_wrapper(func):
"""Helper for providing tracer for wrapper functions.
"""
"""Helper for providing tracer for wrapper functions."""

def _with_tracer(tracer, cmd):
def wrapper(wrapped, instance, args, kwargs):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from unittest import mock

import pymemcache
from pymemcache.exceptions import (
Expand Down Expand Up @@ -84,6 +85,23 @@ def test_set_success(self):

self.check_spans(spans, 1, ["set key"])

def test_set_not_recording(self):
mock_tracer = mock.Mock()
mock_span = mock.Mock()
mock_span.is_recording.return_value = False
mock_tracer.start_span.return_value = mock_span
mock_tracer.use_span.return_value.__enter__ = mock_span
mock_tracer.use_span.return_value.__exit__ = mock_span
with mock.patch("opentelemetry.trace.get_tracer") as tracer:
tracer.return_value = mock_tracer
client = self.make_client([b"STORED\r\n"])
result = client.set(b"key", b"value", noreply=False)
self.assertTrue(result)
self.assertFalse(mock_span.is_recording())
self.assertTrue(mock_span.is_recording.called)
self.assertFalse(mock_span.set_attribute.called)
self.assertFalse(mock_span.set_status.called)

def test_get_many_none_found(self):
client = self.make_client([b"END\r\n"])
result = client.get_many([b"key1", b"key2"])
Expand Down
Loading