Skip to content

Commit

Permalink
feat(integrations): Add integration for asyncpg (#2314)
Browse files Browse the repository at this point in the history
So far this records every statement that is directly issued, as well as the SQL statements that are used for cursors and prepared statements.
  • Loading branch information
mimre25 authored and sentrivana committed Sep 18, 2023
1 parent e94deb4 commit 27e06b3
Show file tree
Hide file tree
Showing 9 changed files with 788 additions and 1 deletion.
102 changes: 102 additions & 0 deletions .github/workflows/test-integration-asyncpg.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
name: Test asyncpg

on:
push:
branches:
- master
- release/**

pull_request:

# Cancel in progress workflows on pull_requests.
# https://docs.github.com/en/actions/using-jobs/using-concurrency#example-using-a-fallback-value
concurrency:
group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }}
cancel-in-progress: true

permissions:
contents: read

env:
BUILD_CACHE_KEY: ${{ github.sha }}
CACHED_BUILD_PATHS: |
${{ github.workspace }}/dist-serverless
jobs:
test:
name: asyncpg, python ${{ matrix.python-version }}, ${{ matrix.os }}
runs-on: ${{ matrix.os }}
timeout-minutes: 30

strategy:
fail-fast: false
matrix:
python-version: ["3.7","3.8","3.9","3.10","3.11"]
# python3.6 reached EOL and is no longer being supported on
# new versions of hosted runners on Github Actions
# ubuntu-20.04 is the last version that supported python3.6
# see https://github.com/actions/setup-python/issues/544#issuecomment-1332535877
os: [ubuntu-20.04]
services:
postgres:
image: postgres
env:
POSTGRES_PASSWORD: sentry
# Set health checks to wait until postgres has started
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5
# Maps tcp port 5432 on service container to the host
ports:
- 5432:5432
env:
SENTRY_PYTHON_TEST_POSTGRES_USER: postgres
SENTRY_PYTHON_TEST_POSTGRES_PASSWORD: sentry
SENTRY_PYTHON_TEST_POSTGRES_NAME: ci_test
SENTRY_PYTHON_TEST_POSTGRES_HOST: localhost

steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}

- name: Setup Test Env
run: |
pip install coverage "tox>=3,<4"
- name: Test asyncpg
uses: nick-fields/retry@v2
with:
timeout_minutes: 15
max_attempts: 2
retry_wait_seconds: 5
shell: bash
command: |
set -x # print commands that are executed
coverage erase
# Run tests
./scripts/runtox.sh "py${{ matrix.python-version }}-asyncpg" --cov=tests --cov=sentry_sdk --cov-report= --cov-branch &&
coverage combine .coverage* &&
coverage xml -i
- uses: codecov/codecov-action@v3
with:
token: ${{ secrets.CODECOV_TOKEN }}
files: coverage.xml


check_required_tests:
name: All asyncpg tests passed or skipped
needs: test
# Always run this, even if a dependent job failed
if: always()
runs-on: ubuntu-20.04
steps:
- name: Check for failures
if: contains(needs.test.result, 'failure')
run: |
echo "One of the dependent jobs has failed. You may need to re-run it." && exit 1
5 changes: 4 additions & 1 deletion scripts/split-tox-gh-actions/split-tox-gh-actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@
TEMPLATE_SNIPPET_TEST = TEMPLATE_DIR / "ci-yaml-test-snippet.txt"
TEMPLATE_SNIPPET_TEST_PY27 = TEMPLATE_DIR / "ci-yaml-test-py27-snippet.txt"

FRAMEWORKS_NEEDING_POSTGRES = ["django"]
FRAMEWORKS_NEEDING_POSTGRES = [
"django",
"asyncpg",
]

MATRIX_DEFINITION = """
strategy:
Expand Down
7 changes: 7 additions & 0 deletions sentry_sdk/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,13 @@ class SPANDATA:
Example: myDatabase
"""

DB_USER = "db.user"
"""
The name of the database user used for connecting to the database.
See: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/database.md
Example: my_user
"""

DB_OPERATION = "db.operation"
"""
The name of the operation being executed, e.g. the MongoDB command name such as findAndModify, or the SQL keyword.
Expand Down
202 changes: 202 additions & 0 deletions sentry_sdk/integrations/asyncpg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
from __future__ import annotations
import contextlib
from typing import Any, TypeVar, Callable, Awaitable, Iterator

from asyncpg.cursor import BaseCursor # type: ignore

from sentry_sdk import Hub
from sentry_sdk.consts import OP, SPANDATA
from sentry_sdk.integrations import Integration, DidNotEnable
from sentry_sdk.tracing import Span
from sentry_sdk.tracing_utils import record_sql_queries
from sentry_sdk.utils import parse_version, capture_internal_exceptions

try:
import asyncpg # type: ignore[import]

except ImportError:
raise DidNotEnable("asyncpg not installed.")

# asyncpg.__version__ is a string containing the semantic version in the form of "<major>.<minor>.<patch>"
asyncpg_version = parse_version(asyncpg.__version__)

if asyncpg_version is not None and asyncpg_version < (0, 23, 0):
raise DidNotEnable("asyncpg >= 0.23.0 required")


class AsyncPGIntegration(Integration):
identifier = "asyncpg"
_record_params = False

def __init__(self, *, record_params: bool = False):
AsyncPGIntegration._record_params = record_params

@staticmethod
def setup_once() -> None:
asyncpg.Connection.execute = _wrap_execute(
asyncpg.Connection.execute,
)

asyncpg.Connection._execute = _wrap_connection_method(
asyncpg.Connection._execute
)
asyncpg.Connection._executemany = _wrap_connection_method(
asyncpg.Connection._executemany, executemany=True
)
asyncpg.Connection.cursor = _wrap_cursor_creation(asyncpg.Connection.cursor)
asyncpg.Connection.prepare = _wrap_connection_method(asyncpg.Connection.prepare)
asyncpg.connect_utils._connect_addr = _wrap_connect_addr(
asyncpg.connect_utils._connect_addr
)


T = TypeVar("T")


def _wrap_execute(f: Callable[..., Awaitable[T]]) -> Callable[..., Awaitable[T]]:
async def _inner(*args: Any, **kwargs: Any) -> T:
hub = Hub.current
integration = hub.get_integration(AsyncPGIntegration)

# Avoid recording calls to _execute twice.
# Calls to Connection.execute with args also call
# Connection._execute, which is recorded separately
# args[0] = the connection object, args[1] is the query
if integration is None or len(args) > 2:
return await f(*args, **kwargs)

query = args[1]
with record_sql_queries(hub, None, query, None, None, executemany=False):
res = await f(*args, **kwargs)
return res

return _inner


SubCursor = TypeVar("SubCursor", bound=BaseCursor)


@contextlib.contextmanager
def _record(
hub: Hub,
cursor: SubCursor | None,
query: str,
params_list: tuple[Any, ...] | None,
*,
executemany: bool = False,
) -> Iterator[Span]:
integration = hub.get_integration(AsyncPGIntegration)
if not integration._record_params:
params_list = None

param_style = "pyformat" if params_list else None

with record_sql_queries(
hub,
cursor,
query,
params_list,
param_style,
executemany=executemany,
record_cursor_repr=cursor is not None,
) as span:
yield span


def _wrap_connection_method(
f: Callable[..., Awaitable[T]], *, executemany: bool = False
) -> Callable[..., Awaitable[T]]:
async def _inner(*args: Any, **kwargs: Any) -> T:
hub = Hub.current
integration = hub.get_integration(AsyncPGIntegration)

if integration is None:
return await f(*args, **kwargs)

query = args[1]
params_list = args[2] if len(args) > 2 else None
with _record(hub, None, query, params_list, executemany=executemany) as span:
_set_db_data(span, args[0])
res = await f(*args, **kwargs)
return res

return _inner


def _wrap_cursor_creation(f: Callable[..., T]) -> Callable[..., T]:
def _inner(*args: Any, **kwargs: Any) -> T: # noqa: N807
hub = Hub.current
integration = hub.get_integration(AsyncPGIntegration)

if integration is None:
return f(*args, **kwargs)

query = args[1]
params_list = args[2] if len(args) > 2 else None

with _record(
hub,
None,
query,
params_list,
executemany=False,
) as span:
_set_db_data(span, args[0])
res = f(*args, **kwargs)
span.set_data("db.cursor", res)

return res

return _inner


def _wrap_connect_addr(f: Callable[..., Awaitable[T]]) -> Callable[..., Awaitable[T]]:
async def _inner(*args: Any, **kwargs: Any) -> T:
hub = Hub.current
integration = hub.get_integration(AsyncPGIntegration)

if integration is None:
return await f(*args, **kwargs)

user = kwargs["params"].user
database = kwargs["params"].database

with hub.start_span(op=OP.DB, description="connect") as span:
span.set_data(SPANDATA.DB_SYSTEM, "postgresql")
addr = kwargs.get("addr")
if addr:
try:
span.set_data(SPANDATA.SERVER_ADDRESS, addr[0])
span.set_data(SPANDATA.SERVER_PORT, addr[1])
except IndexError:
pass
span.set_data(SPANDATA.DB_NAME, database)
span.set_data(SPANDATA.DB_USER, user)

with capture_internal_exceptions():
hub.add_breadcrumb(message="connect", category="query", data=span._data)
res = await f(*args, **kwargs)

return res

return _inner


def _set_db_data(span: Span, conn: Any) -> None:
span.set_data(SPANDATA.DB_SYSTEM, "postgresql")

addr = conn._addr
if addr:
try:
span.set_data(SPANDATA.SERVER_ADDRESS, addr[0])
span.set_data(SPANDATA.SERVER_PORT, addr[1])
except IndexError:
pass

database = conn._params.database
if database:
span.set_data(SPANDATA.DB_NAME, database)

user = conn._params.user
if user:
span.set_data(SPANDATA.DB_USER, user)
3 changes: 3 additions & 0 deletions sentry_sdk/tracing_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ def record_sql_queries(
params_list, # type: Any
paramstyle, # type: Optional[str]
executemany, # type: bool
record_cursor_repr=False, # type: bool
):
# type: (...) -> Generator[sentry_sdk.tracing.Span, None, None]

Expand All @@ -132,6 +133,8 @@ def record_sql_queries(
data["db.paramstyle"] = paramstyle
if executemany:
data["db.executemany"] = True
if record_cursor_repr and cursor is not None:
data["db.cursor"] = cursor

with capture_internal_exceptions():
hub.add_breadcrumb(message=query, category="query", data=data)
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ def get_file_text(file_name):
extras_require={
"aiohttp": ["aiohttp>=3.5"],
"arq": ["arq>=0.23"],
"asyncpg": ["asyncpg>=0.23"],
"beam": ["apache-beam>=2.12"],
"bottle": ["bottle>=0.12.13"],
"celery": ["celery>=3"],
Expand Down
3 changes: 3 additions & 0 deletions tests/integrations/asyncpg/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import pytest

pytest.importorskip("asyncpg")
Loading

0 comments on commit 27e06b3

Please sign in to comment.