Skip to content

Commit

Permalink
Use is_recording flag in aiopg, asyncpg, dbapi, psycopg2, pymemcache,…
Browse files Browse the repository at this point in the history
… pymongo, redis, sqlalchemy instrumentations (#1212)
  • Loading branch information
lzchen authored Oct 8, 2020
1 parent 0e852ea commit affe911
Show file tree
Hide file tree
Showing 21 changed files with 304 additions and 103 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class AiopgInstrumentor(BaseInstrumentor):

def _instrument(self, **kwargs):
"""Integrate with PostgreSQL aiopg library.
aiopg: https://github.com/aio-libs/aiopg
aiopg: https://github.com/aio-libs/aiopg
"""

tracer_provider = kwargs.get("tracer_provider")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@ async def wrapped_connection(
args: typing.Tuple[typing.Any, typing.Any],
kwargs: typing.Dict[typing.Any, typing.Any],
):
"""Add object proxy to connection object.
"""
"""Add object proxy to connection object."""
connection = await connect_method(*args, **kwargs)
# pylint: disable=protected-access
self.get_connection_attributes(connection._conn)
Expand Down Expand Up @@ -109,10 +108,14 @@ async def traced_execution(
self._populate_span(span, *args)
try:
result = await query_method(*args, **kwargs)
span.set_status(Status(StatusCanonicalCode.OK))
if span.is_recording():
span.set_status(Status(StatusCanonicalCode.OK))
return result
except Exception as ex: # pylint: disable=broad-except
span.set_status(Status(StatusCanonicalCode.UNKNOWN, str(ex)))
if span.is_recording():
span.set_status(
Status(StatusCanonicalCode.UNKNOWN, str(ex))
)
raise ex


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,16 +56,16 @@ def trace_integration(
tracer_provider: typing.Optional[TracerProvider] = None,
):
"""Integrate with aiopg library.
based on dbapi integration, where replaced sync wrap methods to async
Args:
database_component: Database driver name or
database name "postgreSQL".
database_type: The Database type. For any SQL database, "sql".
connection_attributes: Attribute names for database, port, host and
user in Connection object.
tracer_provider: The :class:`opentelemetry.trace.TracerProvider` to
use. If ommited the current configured one is used.
based on dbapi integration, where replaced sync wrap methods to async
Args:
database_component: Database driver name or
database name "postgreSQL".
database_type: The Database type. For any SQL database, "sql".
connection_attributes: Attribute names for database, port, host and
user in Connection object.
tracer_provider: The :class:`opentelemetry.trace.TracerProvider` to
use. If ommited the current configured one is used.
"""

wrap_connect(
Expand All @@ -87,18 +87,18 @@ def wrap_connect(
tracer_provider: typing.Optional[TracerProvider] = None,
):
"""Integrate with aiopg library.
https://github.com/aio-libs/aiopg
Args:
name: Name of opentelemetry extension for aiopg.
database_component: Database driver name
or database name "postgreSQL".
database_type: The Database type. For any SQL database, "sql".
connection_attributes: Attribute names for database, port, host and
user in Connection object.
version: Version of opentelemetry extension for aiopg.
tracer_provider: The :class:`opentelemetry.trace.TracerProvider` to
use. If ommited the current configured one is used.
https://github.com/aio-libs/aiopg
Args:
name: Name of opentelemetry extension for aiopg.
database_component: Database driver name
or database name "postgreSQL".
database_type: The Database type. For any SQL database, "sql".
connection_attributes: Attribute names for database, port, host and
user in Connection object.
version: Version of opentelemetry extension for aiopg.
tracer_provider: The :class:`opentelemetry.trace.TracerProvider` to
use. If ommited the current configured one is used.
"""

# pylint: disable=unused-argument
Expand All @@ -125,8 +125,8 @@ async def wrap_connect_(


def unwrap_connect():
""""Disable integration with aiopg library.
https://github.com/aio-libs/aiopg
"""Disable integration with aiopg library.
https://github.com/aio-libs/aiopg
"""

unwrap(aiopg, "connect")
Expand Down Expand Up @@ -217,7 +217,7 @@ async def wrap_create_pool_(


def unwrap_create_pool():
""""Disable integration with aiopg library.
https://github.com/aio-libs/aiopg
"""Disable integration with aiopg library.
https://github.com/aio-libs/aiopg
"""
unwrap(aiopg, "create_pool")
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,40 @@ def test_span_succeeded(self):
trace_api.status.StatusCanonicalCode.OK,
)

def test_span_not_recording(self):
connection_props = {
"database": "testdatabase",
"server_host": "testhost",
"server_port": 123,
"user": "testuser",
}
connection_attributes = {
"database": "database",
"port": "server_port",
"host": "server_host",
"user": "user",
}
mock_tracer = mock.Mock()
mock_span = mock.Mock()
mock_span.is_recording.return_value = False
mock_tracer.start_span.return_value = mock_span
mock_tracer.use_span.return_value.__enter__ = mock_span
mock_tracer.use_span.return_value.__exit__ = True
db_integration = AiopgIntegration(
mock_tracer, "testcomponent", "testtype", connection_attributes
)
mock_connection = async_call(
db_integration.wrapped_connection(
mock_connect, {}, connection_props
)
)
cursor = async_call(mock_connection.cursor())
async_call(cursor.execute("Test query", ("param1Value", False)))
self.assertFalse(mock_span.is_recording())
self.assertTrue(mock_span.is_recording.called)
self.assertFalse(mock_span.set_attribute.called)
self.assertFalse(mock_span.set_status.called)

def test_span_failed(self):
db_integration = AiopgIntegration(self.tracer, "testcomponent")
mock_connection = async_call(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,31 +112,34 @@ def _uninstrument(self, **__):
unwrap(asyncpg.Connection, method)

async def _do_execute(self, func, instance, args, kwargs):
span_attributes = _hydrate_span_from_args(
instance, args[0], args[1:] if self.capture_parameters else None,
)
tracer = getattr(asyncpg, _APPLIED)

exception = None

with tracer.start_as_current_span(
"postgresql", kind=SpanKind.CLIENT
) as span:

for attribute, value in span_attributes.items():
span.set_attribute(attribute, value)
if span.is_recording():
span_attributes = _hydrate_span_from_args(
instance,
args[0],
args[1:] if self.capture_parameters else None,
)
for attribute, value in span_attributes.items():
span.set_attribute(attribute, value)

try:
result = await func(*args, **kwargs)
except Exception as exc: # pylint: disable=W0703
exception = exc
raise
finally:
if exception is not None:
span.set_status(
Status(_exception_to_canonical_code(exception))
)
else:
span.set_status(Status(StatusCanonicalCode.OK))
if span.is_recording():
if exception is not None:
span.set_status(
Status(_exception_to_canonical_code(exception))
)
else:
span.set_status(Status(StatusCanonicalCode.OK))

return result
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def test_not_recording(self):
mock_span.is_recording.return_value = False
mock_tracer.start_span.return_value = mock_span
mock_tracer.use_span.return_value.__enter__ = mock_span
mock_tracer.use_span.return_value.__exit__ = mock_span
mock_tracer.use_span.return_value.__exit__ = True
with patch("opentelemetry.trace.get_tracer") as tracer:
tracer.return_value = mock_tracer
ec2 = boto.ec2.connect_to_region("us-west-2")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def test_not_recording(self):
mock_span.is_recording.return_value = False
mock_tracer.start_span.return_value = mock_span
mock_tracer.use_span.return_value.__enter__ = mock_span
mock_tracer.use_span.return_value.__exit__ = mock_span
mock_tracer.use_span.return_value.__exit__ = True
with patch("opentelemetry.trace.get_tracer") as tracer:
tracer.return_value = mock_tracer
ec2 = self.session.create_client("ec2", region_name="us-west-2")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,8 @@ def __init__(self, db_api_integration: DatabaseApiIntegration):
def _populate_span(
self, span: trace_api.Span, *args: typing.Tuple[typing.Any, typing.Any]
):
if not span.is_recording():
return
statement = args[0] if args else ""
span.set_attribute(
"component", self._db_api_integration.database_component
Expand Down Expand Up @@ -341,10 +343,14 @@ def traced_execution(
self._populate_span(span, *args)
try:
result = query_method(*args, **kwargs)
span.set_status(Status(StatusCanonicalCode.OK))
if span.is_recording():
span.set_status(Status(StatusCanonicalCode.OK))
return result
except Exception as ex: # pylint: disable=broad-except
span.set_status(Status(StatusCanonicalCode.UNKNOWN, str(ex)))
if span.is_recording():
span.set_status(
Status(StatusCanonicalCode.UNKNOWN, str(ex))
)
raise ex


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,38 @@ def test_span_succeeded(self):
trace_api.status.StatusCanonicalCode.OK,
)

def test_span_not_recording(self):
connection_props = {
"database": "testdatabase",
"server_host": "testhost",
"server_port": 123,
"user": "testuser",
}
connection_attributes = {
"database": "database",
"port": "server_port",
"host": "server_host",
"user": "user",
}
mock_tracer = mock.Mock()
mock_span = mock.Mock()
mock_span.is_recording.return_value = False
mock_tracer.start_span.return_value = mock_span
mock_tracer.use_span.return_value.__enter__ = mock_span
mock_tracer.use_span.return_value.__exit__ = True
db_integration = dbapi.DatabaseApiIntegration(
mock_tracer, "testcomponent", "testtype", connection_attributes
)
mock_connection = db_integration.wrapped_connection(
mock_connect, {}, connection_props
)
cursor = mock_connection.cursor()
cursor.execute("Test query", ("param1Value", False))
self.assertFalse(mock_span.is_recording())
self.assertTrue(mock_span.is_recording.called)
self.assertFalse(mock_span.set_attribute.called)
self.assertFalse(mock_span.set_status.called)

def test_span_failed(self):
db_integration = dbapi.DatabaseApiIntegration(
self.tracer, "testcomponent"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def test_not_recording(self):
mock_span.is_recording.return_value = False
mock_tracer.start_span.return_value = mock_span
mock_tracer.use_span.return_value.__enter__ = mock_span
mock_tracer.use_span.return_value.__exit__ = mock_span
mock_tracer.use_span.return_value.__exit__ = True
with patch("opentelemetry.trace.get_tracer") as tracer:
tracer.return_value = mock_tracer
Client().get("/traced/")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,29 @@ def test_instrumentor(self, mock_connect):
spans_list = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans_list), 1)

@mock.patch("psycopg2.connect")
# pylint: disable=unused-argument
def test_not_recording(self, mock_connect):
mock_tracer = mock.Mock()
mock_span = mock.Mock()
mock_span.is_recording.return_value = False
mock_tracer.start_span.return_value = mock_span
mock_tracer.use_span.return_value.__enter__ = mock_span
mock_tracer.use_span.return_value.__exit__ = True
Psycopg2Instrumentor().instrument()
with mock.patch("opentelemetry.trace.get_tracer") as tracer:
tracer.return_value = mock_tracer
cnx = psycopg2.connect(database="test")
cursor = cnx.cursor()
query = "SELECT * FROM test"
cursor.execute(query)
self.assertFalse(mock_span.is_recording())
self.assertTrue(mock_span.is_recording.called)
self.assertFalse(mock_span.set_attribute.called)
self.assertFalse(mock_span.set_status.called)

Psycopg2Instrumentor().uninstrument()

@mock.patch("psycopg2.connect")
# pylint: disable=unused-argument
def test_custom_tracer_provider(self, mock_connect):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,14 @@


def _set_connection_attributes(span, instance):
if not span.is_recording():
return
for key, value in _get_address_attributes(instance).items():
span.set_attribute(key, value)


def _with_tracer_wrapper(func):
"""Helper for providing tracer for wrapper functions.
"""
"""Helper for providing tracer for wrapper functions."""

def _with_tracer(tracer, cmd):
def wrapper(wrapped, instance, args, kwargs):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from unittest import mock

import pymemcache
from pymemcache.exceptions import (
Expand Down Expand Up @@ -84,6 +85,23 @@ def test_set_success(self):

self.check_spans(spans, 1, ["set key"])

def test_set_not_recording(self):
mock_tracer = mock.Mock()
mock_span = mock.Mock()
mock_span.is_recording.return_value = False
mock_tracer.start_span.return_value = mock_span
mock_tracer.use_span.return_value.__enter__ = mock_span
mock_tracer.use_span.return_value.__exit__ = True
with mock.patch("opentelemetry.trace.get_tracer") as tracer:
tracer.return_value = mock_tracer
client = self.make_client([b"STORED\r\n"])
result = client.set(b"key", b"value", noreply=False)
self.assertTrue(result)
self.assertFalse(mock_span.is_recording())
self.assertTrue(mock_span.is_recording.called)
self.assertFalse(mock_span.set_attribute.called)
self.assertFalse(mock_span.set_status.called)

def test_get_many_none_found(self):
client = self.make_client([b"END\r\n"])
result = client.get_many([b"key1", b"key2"])
Expand Down
Loading

0 comments on commit affe911

Please sign in to comment.