Skip to content

Commit

Permalink
Upgrade postgres integration to use psycopg3
Browse files Browse the repository at this point in the history
  • Loading branch information
jmeunier28 committed Jul 28, 2023
1 parent c9f17ef commit 4f9f6c0
Show file tree
Hide file tree
Showing 26 changed files with 527 additions and 361 deletions.
3 changes: 3 additions & 0 deletions .ddev/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ mmh3 = ['CC0-1.0']
paramiko = ['LGPL-2.1-only']
# https://github.com/oracle/python-oracledb/blob/main/LICENSE.txt
oracledb = ['Apache-2.0']
# https://github.com/psycopg/psycopg/blob/master/LICENSE.txt
psycopg-binary = ['LGPL-3.0-only', 'BSD-3-Clause']
# https://github.com/psycopg/psycopg2/blob/master/LICENSE
# https://github.com/psycopg/psycopg2/blob/master/doc/COPYING.LESSER
psycopg2-binary = ['LGPL-3.0-only', 'BSD-3-Clause']
Expand Down Expand Up @@ -119,6 +121,7 @@ packaging = 'https://github.com/pypa/packaging'
paramiko = 'https://github.com/paramiko/paramiko'
protobuf = 'https://github.com/protocolbuffers/protobuf'
psycopg2-binary = 'https://github.com/psycopg/psycopg2'
psycopg-binary = 'https://github.com/psycopg/psycopg'
pycryptodomex = 'https://github.com/Legrandin/pycryptodome'
redis = 'https://github.com/redis/redis-py'
requests = 'https://github.com/psf/requests'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ protobuf==3.17.3; python_version < '3.0'
protobuf==3.20.2; python_version > '3.0'
psutil==5.9.0
psycopg2-binary==2.8.6; sys_platform != 'darwin' or platform_machine != 'arm64'
psycopg-binary==3.1.9; python_version > '3.0'
pyasn1==0.4.6
pycryptodomex==3.10.1
pydantic==2.0.2; python_version > '3.0'
Expand Down
3 changes: 3 additions & 0 deletions datadog_checks_base/datadog_checks/base/utils/db/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from typing import Any, Callable, Dict, List, Tuple # noqa: F401

from cachetools import TTLCache
from ipaddress import IPv4Address

from datadog_checks.base import is_affirmative
from datadog_checks.base.log import get_check_logger
Expand Down Expand Up @@ -183,6 +184,8 @@ def default_json_event_encoding(o):
return float(o)
if isinstance(o, (datetime.date, datetime.datetime)):
return o.isoformat()
if isinstance(o, IPv4Address):
return str(o)
raise TypeError


Expand Down
4 changes: 2 additions & 2 deletions datadog_checks_dev/datadog_checks/dev/tooling/commands/dep.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@

# Dependencies to ignore when update dependencies
IGNORED_DEPS = {
'psycopg2-binary', # https://github.com/DataDog/integrations-core/pull/10456
'ddtrace', # https://github.com/DataDog/integrations-core/pull/9132
'psycopg2-binary', # https://github.com/DataDog/integrations-core/pull/10456
'ddtrace', # https://github.com/DataDog/integrations-core/pull/9132
'flup', # https://github.com/DataDog/integrations-core/pull/1997
# https://github.com/DataDog/integrations-core/pull/10105;
# snowflake-connector-python caps cryptography which means we need to be careful with how we update it
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@
# https://github.com/psycopg/psycopg2/blob/master/LICENSE
# https://github.com/psycopg/psycopg2/blob/master/doc/COPYING.LESSER
'psycopg2-binary': ['LGPL-3.0-only', 'BSD-3-Clause'],
# https://github.com/psycopg/psycopg/blob/master/LICENSE.txt
'psycopg-binary': ['LGPL-3.0-only', 'BSD-3-Clause'],
# https://github.com/Legrandin/pycryptodome/blob/master/LICENSE.rst
'pycryptodomex': ['Unlicense', 'BSD-2-Clause'],
# https://github.com/requests/requests-kerberos/pull/123
Expand Down Expand Up @@ -158,6 +160,7 @@
'paramiko': 'https://github.com/paramiko/paramiko',
'protobuf': 'https://github.com/protocolbuffers/protobuf',
'psycopg2-binary': 'https://github.com/psycopg/psycopg2',
'psycopg': 'https://github.com/psycopg/psycopg',
'pycryptodomex': 'https://github.com/Legrandin/pycryptodome',
'redis': 'https://github.com/redis/redis-py',
'requests': 'https://github.com/psf/requests',
Expand Down
134 changes: 102 additions & 32 deletions postgres/datadog_checks/postgres/config_models/defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,130 +7,200 @@
# ddev -x validate config -s <INTEGRATION_NAME>
# ddev -x validate models -s <INTEGRATION_NAME>

from datadog_checks.base.utils.models.fields import get_default_field_value

def instance_activity_metrics_excluded_aggregations():

def shared_service(field, value):
return get_default_field_value(field, value)


def instance_activity_metrics_excluded_aggregations(field, value):
return []


def instance_application_name():
def instance_application_name(field, value):
return 'datadog-agent'


def instance_collect_activity_metrics():
def instance_aws(field, value):
return get_default_field_value(field, value)


def instance_azure(field, value):
return get_default_field_value(field, value)


def instance_collect_activity_metrics(field, value):
return False


def instance_collect_bloat_metrics():
def instance_collect_bloat_metrics(field, value):
return False


def instance_collect_count_metrics():
def instance_collect_count_metrics(field, value):
return True


def instance_collect_database_size_metrics():
def instance_collect_database_size_metrics(field, value):
return True


def instance_collect_default_database():
def instance_collect_default_database(field, value):
return True


def instance_collect_function_metrics():
def instance_collect_function_metrics(field, value):
return False


def instance_collect_wal_metrics():
def instance_collect_settings(field, value):
return get_default_field_value(field, value)


def instance_collect_wal_metrics(field, value):
return False


def instance_data_directory():
def instance_custom_queries(field, value):
return get_default_field_value(field, value)


def instance_data_directory(field, value):
return '/usr/local/pgsql/data'


def instance_dbm():
def instance_database_autodiscovery(field, value):
return get_default_field_value(field, value)


def instance_dbm(field, value):
return False


def instance_dbname():
def instance_dbname(field, value):
return 'postgres'


def instance_dbstrict():
def instance_dbstrict(field, value):
return False


def instance_disable_generic_tags():
def instance_disable_generic_tags(field, value):
return False


def instance_empty_default_hostname():
def instance_empty_default_hostname(field, value):
return False


def instance_idle_connection_timeout():
def instance_gcp(field, value):
return get_default_field_value(field, value)


def instance_idle_connection_timeout(field, value):
return 60000


def instance_ignore_databases():
def instance_ignore_databases(field, value):
return ['template%', 'rdsadmin', 'azure_maintenance']


def instance_log_unobfuscated_plans():
def instance_log_unobfuscated_plans(field, value):
return False


def instance_log_unobfuscated_queries():
def instance_log_unobfuscated_queries(field, value):
return False


def instance_max_connections():
def instance_max_connections(field, value):
return 30


def instance_max_relations():
def instance_max_relations(field, value):
return 300


def instance_min_collection_interval():
def instance_metric_patterns(field, value):
return get_default_field_value(field, value)


def instance_min_collection_interval(field, value):
return 15


def instance_pg_stat_statements_view():
def instance_obfuscator_options(field, value):
return get_default_field_value(field, value)


def instance_password(field, value):
return get_default_field_value(field, value)


def instance_pg_stat_statements_view(field, value):
return 'show_pg_stat_statements()'


def instance_port():
def instance_port(field, value):
return 5432


def instance_query_timeout():
def instance_query_activity(field, value):
return get_default_field_value(field, value)


def instance_query_metrics(field, value):
return get_default_field_value(field, value)


def instance_query_samples(field, value):
return get_default_field_value(field, value)


def instance_query_timeout(field, value):
return 5000


def instance_ssl():
def instance_relations(field, value):
return get_default_field_value(field, value)


def instance_reported_hostname(field, value):
return get_default_field_value(field, value)


def instance_service(field, value):
return get_default_field_value(field, value)


def instance_ssl(field, value):
return False


def instance_ssl_cert():
def instance_ssl_cert(field, value):
return False


def instance_ssl_key():
def instance_ssl_key(field, value):
return False


def instance_ssl_password():
def instance_ssl_password(field, value):
return False


def instance_ssl_root_cert():
def instance_ssl_root_cert(field, value):
return False


def instance_table_count_limit():
def instance_table_count_limit(field, value):
return 200


def instance_tag_replication_role():
def instance_tag_replication_role(field, value):
return False


def instance_tags(field, value):
return get_default_field_value(field, value)
30 changes: 20 additions & 10 deletions postgres/datadog_checks/postgres/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
import time
from typing import Callable, Dict

import psycopg2
import psycopg
from datadog_checks.base import AgentCheck


class ConnectionPoolFullError(Exception):
Expand All @@ -23,7 +24,7 @@ def __str__(self):
class ConnectionInfo:
def __init__(
self,
connection: psycopg2.extensions.connection,
connection: psycopg.Connection,
deadline: int,
active: bool,
last_accessed: int,
Expand Down Expand Up @@ -66,7 +67,8 @@ def __repr__(self):
def reset(self):
self.__init__()

def __init__(self, connect_fn: Callable[[str], None], max_conns: int = None):
def __init__(self, check: AgentCheck, connect_fn: Callable[[str], None], max_conns: int = None):
self.log = check.log
self.max_conns: int = max_conns
self._stats = self.Stats()
self._mu = threading.RLock()
Expand All @@ -86,9 +88,9 @@ def _get_connection_raw(
dbname: str,
ttl_ms: int,
timeout: int = None,
startup_fn: Callable[[psycopg2.extensions.connection], None] = None,
startup_fn: Callable[[psycopg.Connection], None] = None,
persistent: bool = False,
) -> psycopg2.extensions.connection:
) -> psycopg.Connection:
"""
Return a connection from the pool.
Pass a function to startup_func if there is an action needed with the connection
Expand Down Expand Up @@ -117,7 +119,7 @@ def _get_connection_raw(
# if already in pool, retain persistence status
persistent = conn.persistent

if db.status != psycopg2.extensions.STATUS_READY:
if db.info.status != psycopg.pq.ConnStatus.OK:
# Some transaction went wrong and the connection is in an unhealthy state. Let's fix that
db.rollback()

Expand All @@ -132,6 +134,14 @@ def _get_connection_raw(
)
return db

def set_conn_inactive(self, dbname):
with self._mu:
try:
self._conns[dbname].active = False
except KeyError:
# if self._get_connection_raw hit an exception, self._conns[dbname] didn't get populated
pass

@contextlib.contextmanager
def get_connection(self, dbname: str, ttl_ms: int, timeout: int = None, persistent: bool = False):
"""
Expand All @@ -140,12 +150,10 @@ def get_connection(self, dbname: str, ttl_ms: int, timeout: int = None, persiste
make a new connection if the max_conn limit hasn't been reached.
Blocks until a connection can be added to the pool,
and optionally takes a timeout in seconds.
Note that leaving a connection context here does NOT close the connection in psycopg2;
connections must be manually closed by `close_all_connections()`.
"""
try:
with self._mu:
db = self._get_connection_raw(dbname, ttl_ms, timeout, persistent)
db = self._get_connection_raw(dbname=dbname, ttl_ms=ttl_ms, timeout=timeout, persistent=persistent)
yield db
finally:
with self._mu:
Expand Down Expand Up @@ -199,8 +207,10 @@ def _terminate_connection_unsafe(self, dbname: str):
if db is not None:
try:
self._stats.connection_closed += 1
db.close()
if not db.closed:
db.close()
except Exception:
self._stats.connection_closed_failed += 1
self.log.exception("failed to close DB connection for db=%s", dbname)
return False
return True
Loading

0 comments on commit 4f9f6c0

Please sign in to comment.