Revert psycopg3 upgrade in postgres integration #15859

Merged · 24 commits · Sep 19, 2023
Commits
6bfb543
Revert to last good commit
alexandre-normand Sep 18, 2023
b04fc2d
Add support for sending database_instance metadata (#15559)
alexandre-normand Aug 15, 2023
aa8ad11
Add support for authenticating through Azure Managed Identity (#15609)
jmeunier28 Aug 17, 2023
0863129
Fix InstanceConfig loading error for `ssl` config (#15611)
jmeunier28 Aug 18, 2023
6b1b4ef
Return Azure AD auth token in correct format (#15701)
jmeunier28 Aug 29, 2023
ff3263e
load version / aurora version on initialize & fix a few tests
jmeunier28 Sep 18, 2023
a13d4fb
fix version test
jmeunier28 Sep 18, 2023
b006ea1
fix changelog
jmeunier28 Sep 18, 2023
3f420af
fix datadog checks base
jmeunier28 Sep 18, 2023
685999f
more misc test fixes
jmeunier28 Sep 18, 2023
1f65b1d
fix check version pin
jmeunier28 Sep 18, 2023
f91924e
style fix
jmeunier28 Sep 18, 2023
ad7d2d9
use v2 schemas for monitors
jmeunier28 Sep 18, 2023
d44da96
fix formatting
jmeunier28 Sep 18, 2023
9a5767f
fix unit test
lu-zhengda Sep 18, 2023
17d9fe0
add test table cities back
lu-zhengda Sep 18, 2023
7217c49
make test less flakey
lu-zhengda Sep 18, 2023
bacb048
make test less flakey
lu-zhengda Sep 18, 2023
d45c107
add back postgresql.replication_delay to Function with Archive WAL-dr…
lu-zhengda Sep 18, 2023
b25421e
Revert "add back postgresql.replication_delay to Function with Archiv…
lu-zhengda Sep 18, 2023
b7a2126
fix _is_aurora typo, should be is_aurora
lu-zhengda Sep 18, 2023
cf12a0f
fix _is_aurora to is_aurora
lu-zhengda Sep 18, 2023
c22c567
fix wal write metrics
jmeunier28 Sep 18, 2023
940ada8
remove unnecessary assert on exact value
jmeunier28 Sep 18, 2023
4 changes: 4 additions & 0 deletions postgres/CHANGELOG.md
@@ -6,6 +6,10 @@

* Attempt to connect to the database and fail fast before trying to establish a connection pool ([#15839](https://github.com/DataDog/integrations-core/pull/15839))

***Fixed***:

* Revert psycopg3 upgrade ([#15859](https://github.com/DataDog/integrations-core/pull/15859))

## 14.2.4 / 2023-09-07

***Fixed***:
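The changelog entry retained above mentions failing fast before establishing a connection pool (#15839). A minimal sketch of that idea under the reverted psycopg2 stack — the helper name and connection parameters here are illustrative, not the integration's actual implementation:

```python
import psycopg2


def probe_then_pool(dbname, host, user, password, timeout_s=5):
    # Open one short-lived connection first so that bad credentials or an
    # unreachable host fail immediately, rather than surfacing later from
    # inside a pooled worker.
    probe = psycopg2.connect(
        dbname=dbname,
        host=host,
        user=user,
        password=password,
        connect_timeout=timeout_s,  # libpq parameter, in seconds
    )
    probe.close()
    # Only once the probe succeeds is it worth paying for pooled connections.
```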
41 changes: 0 additions & 41 deletions postgres/assets/configuration/spec.yaml
@@ -125,13 +125,6 @@ files:
value:
type: integer
example: 5000
- name: connection_timeout
description: |
Sets the timeout (in ms) that the agent will wait to receive a connection from the database.
hidden: true
value:
type: integer
example: 5000
- name: idle_connection_timeout
description: |
Sets the timeout (in ms) a connection used in the connection pool will be idle until it is closed by the pooler.
@@ -509,40 +502,6 @@
type: number
example: 600

- name: collect_schemas
description: |
Enable collection of database schemas. In order to collect schemas from all user databases,
enable `database_autodiscovery`. To collect from a single database, set `dbname` to collect
the schema for that database.
Relation metrics must be enabled for schema collection.
options:
- name: enabled
description: |
Enable collection of database schemas. Requires `dbm: true` and relation metrics must be enabled.
value:
type: boolean
example: false
- name: max_tables
description: |
Maximum amount of tables the Agent collects from the instance.
value:
type: number
example: 1000
display_default: 1000
- name: max_columns
description: |
Maximum amount of columns the Agent collects per table.
value:
type: number
example: 50
display_default: 50
- name: collection_interval
description: |
The database schema collection interval (in seconds).
value:
type: number
example: 600

- name: aws
description: |
This block defines the configuration for AWS RDS and Aurora instances.
2 changes: 1 addition & 1 deletion postgres/assets/monitors/percent_usage_connections.json
@@ -33,4 +33,4 @@
],
"type": "query alert"
}
}
}
2 changes: 1 addition & 1 deletion postgres/assets/monitors/replication_delay.json
@@ -37,4 +37,4 @@
],
"type": "query alert"
}
}
}
9 changes: 0 additions & 9 deletions postgres/datadog_checks/postgres/config.py
@@ -58,9 +58,6 @@ def __init__(self, instance):
'"dbname" parameter must be set OR autodiscovery must be enabled when using the "relations" parameter.'
)
self.max_connections = instance.get('max_connections', 30)
connection_timeout_ms = instance.get('connection_timeout', 5000)
# Convert milliseconds to seconds and ensure a minimum of 2 seconds, which is enforced by psycopg
self.connection_timeout = max(2, connection_timeout_ms / 1000)
self.tags = self._build_tags(instance.get('tags', []))

ssl = instance.get('ssl', "disable")
@@ -101,12 +98,6 @@ def __init__(self, instance):
self.pg_stat_activity_view = instance.get('pg_stat_activity_view', 'pg_stat_activity')
self.statement_samples_config = instance.get('query_samples', instance.get('statement_samples', {})) or {}
self.settings_metadata_config = instance.get('collect_settings', {}) or {}
self.schemas_metadata_config = instance.get('collect_schemas', {"enabled": False})
if not self.relations and self.schemas_metadata_config['enabled']:
raise ConfigurationError(
'In order to collect schemas on this database, you must enable relation metrics collection.'
)

self.resources_metadata_config = instance.get('collect_resources', {}) or {}
self.statement_activity_config = instance.get('query_activity', {}) or {}
self.statement_metrics_config = instance.get('query_metrics', {}) or {}
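Because the hunks above interleave removed and surviving lines without +/- markers, here is a consolidated sketch of the two pieces of logic this revert deletes from config.py (values are illustrative):

```python
from datadog_checks.base import ConfigurationError

# 1. The millisecond connection timeout, clamped to psycopg3's 2-second
#    floor (removed along with the psycopg3 pool):
connection_timeout_ms = 5000  # instance.get('connection_timeout', 5000)
connection_timeout = max(2, connection_timeout_ms / 1000)  # -> 5.0 seconds

# 2. The guard requiring relation metrics before schema collection (removed
#    together with the `collect_schemas` option); as written, this raises:
relations = []  # no relation metrics configured
schemas_metadata_config = {"enabled": True}
if not relations and schemas_metadata_config['enabled']:
    raise ConfigurationError(
        'In order to collect schemas on this database, you must enable relation metrics collection.'
    )
```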
4 changes: 0 additions & 4 deletions postgres/datadog_checks/postgres/config_models/defaults.py
@@ -44,10 +44,6 @@ def instance_collect_wal_metrics():
return False


def instance_connection_timeout():
return 5000


def instance_data_directory():
return '/usr/local/pgsql/data'

13 changes: 0 additions & 13 deletions postgres/datadog_checks/postgres/config_models/instance.py
@@ -38,17 +38,6 @@ class Azure(BaseModel):
fully_qualified_domain_name: Optional[str] = None


class CollectSchemas(BaseModel):
model_config = ConfigDict(
arbitrary_types_allowed=True,
frozen=True,
)
collection_interval: Optional[float] = None
enabled: Optional[bool] = None
max_columns: Optional[float] = None
max_tables: Optional[float] = None


class CollectSettings(BaseModel):
model_config = ConfigDict(
arbitrary_types_allowed=True,
@@ -174,10 +163,8 @@ class InstanceConfig(BaseModel):
collect_database_size_metrics: Optional[bool] = None
collect_default_database: Optional[bool] = None
collect_function_metrics: Optional[bool] = None
collect_schemas: Optional[CollectSchemas] = None
collect_settings: Optional[CollectSettings] = None
collect_wal_metrics: Optional[bool] = None
connection_timeout: Optional[int] = None
custom_queries: Optional[tuple[MappingProxyType[str, Any], ...]] = None
data_directory: Optional[str] = None
database_autodiscovery: Optional[DatabaseAutodiscovery] = None
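The defaults.py and instance.py hunks move in lockstep because these are generated config models: by convention (an assumption here, since the codegen itself is not part of this diff), an optional InstanceConfig field can be paired with an `instance_<field>` default provider, and the revert deletes both halves for `connection_timeout` along with the whole CollectSchemas model. A minimal sketch of that pairing:

```python
from typing import Optional

from pydantic import BaseModel, ConfigDict


def instance_connection_timeout():
    # Default provider, as defaults.py declared it before the revert.
    return 5000


class InstanceConfig(BaseModel):
    model_config = ConfigDict(arbitrary_types_allowed=True, frozen=True)
    # Fields stay Optional; tooling falls back to the provider when unset.
    connection_timeout: Optional[int] = None
```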
113 changes: 43 additions & 70 deletions postgres/datadog_checks/postgres/connections.py
@@ -8,9 +8,7 @@
import time
from typing import Callable, Dict

from psycopg_pool import ConnectionPool

from datadog_checks.base import AgentCheck
import psycopg2


class ConnectionPoolFullError(Exception):
@@ -25,16 +23,18 @@ class ConnectionInfo:
class ConnectionInfo:
def __init__(
self,
connection: ConnectionPool,
connection: psycopg2.extensions.connection,
deadline: int,
active: bool,
last_accessed: int,
thread: threading.Thread,
persistent: bool,
):
self.connection = connection
self.deadline = deadline
self.active = active
self.last_accessed = last_accessed
self.thread = thread
self.persistent = persistent


@@ -66,46 +66,40 @@ def __repr__(self):
def reset(self):
self.__init__()

def __init__(self, check: AgentCheck, connect_fn: Callable[[str, int, int], None], max_conns: int = None):
self._check = check
self._log = check.log
self._config = check._config
def __init__(self, connect_fn: Callable[[str], None], max_conns: int = None):
self.max_conns: int = max_conns
self._stats = self.Stats()
self._mu = threading.RLock()
self._conns: Dict[str, ConnectionInfo] = {}

if hasattr(inspect, 'signature'):
connect_sig = inspect.signature(connect_fn)
if not (len(connect_sig.parameters) >= 1):
if len(connect_sig.parameters) != 1:
raise ValueError(
"Invalid signature for the connection function. "
"Expected parameters: dbname, min_pool_size, max_pool_size. "
"Got signature: {}".format(connect_sig)
"A single parameter for dbname is expected, got signature: {}".format(connect_sig)
)
self.connect_fn = connect_fn

def _get_connection_pool(
def _get_connection_raw(
self,
dbname: str,
ttl_ms: int,
timeout: int = None,
min_pool_size: int = 1,
max_pool_size: int = None,
startup_fn: Callable[[ConnectionPool], None] = None,
startup_fn: Callable[[psycopg2.extensions.connection], None] = None,
persistent: bool = False,
) -> ConnectionPool:
) -> psycopg2.extensions.connection:
"""
Return a connection pool for the requested database from the managed pool.
Return a connection from the pool.
Pass a function to startup_func if there is an action needed with the connection
when re-establishing it.
"""
start = datetime.datetime.now()
self.prune_connections()
with self._mu:
conn = self._conns.pop(dbname, ConnectionInfo(None, None, None, None, None))
db_pool = conn.connection
if db_pool is None or db_pool.closed:
conn = self._conns.pop(dbname, ConnectionInfo(None, None, None, None, None, None))
db = conn.connection
if db is None or db.closed:
if self.max_conns is not None:
# try to free space until we succeed
while len(self._conns) >= self.max_conns:
@@ -116,22 +110,27 @@ def _get_connection_pool(
time.sleep(0.01)
continue
self._stats.connection_opened += 1
db_pool = self.connect_fn(dbname, min_pool_size, max_pool_size)
db = self.connect_fn(dbname)
if startup_fn:
startup_fn(db_pool)
startup_fn(db)
else:
# if already in pool, retain persistence status
persistent = conn.persistent

if db.status != psycopg2.extensions.STATUS_READY:
# Some transaction went wrong and the connection is in an unhealthy state. Let's fix that
db.rollback()

deadline = datetime.datetime.now() + datetime.timedelta(milliseconds=ttl_ms)
self._conns[dbname] = ConnectionInfo(
connection=db_pool,
connection=db,
deadline=deadline,
active=True,
last_accessed=datetime.datetime.now(),
thread=threading.current_thread(),
persistent=persistent,
)
return db_pool
return db

@contextlib.contextmanager
def get_connection(self, dbname: str, ttl_ms: int, timeout: int = None, persistent: bool = False):
@@ -141,20 +140,19 @@ def get_connection(self, dbname: str, ttl_ms: int, timeout: int = None, persiste
make a new connection if the max_conn limit hasn't been reached.
Blocks until a connection can be added to the pool,
and optionally takes a timeout in seconds.
Note that leaving a connection context here does NOT close the connection in psycopg2;
connections must be manually closed by `close_all_connections()`.
"""
with self._mu:
pool = self._get_connection_pool(dbname=dbname, ttl_ms=ttl_ms, timeout=timeout, persistent=persistent)
db = pool.getconn()
try:
with self._mu:
db = self._get_connection_raw(dbname, ttl_ms, timeout, persistent)
yield db
finally:
with self._mu:
try:
pool.putconn(db)
if not self._conns[dbname].persistent:
self._conns[dbname].active = False
self._conns[dbname].active = False
except KeyError:
# if self._get_connection_raw hit an exception, self._conns[conn_name] didn't get populated
# if self._get_connection_raw hit an exception, self._conns[dbname] didn't get populated
pass

def prune_connections(self):
@@ -167,20 +165,16 @@ def prune_connections(self):
"""
with self._mu:
now = datetime.datetime.now()
for conn_name, conn in list(self._conns.items()):
if conn.deadline < now and not conn.active and not conn.persistent:
for dbname, conn in list(self._conns.items()):
if conn.deadline < now:
self._stats.connection_pruned += 1
self._terminate_connection_unsafe(conn_name)
self._terminate_connection_unsafe(dbname)

def close_all_connections(self):
"""
Will block until all connections are terminated, unless the pre-configured timeout is hit
:param timeout:
:return:
"""
success = True
with self._mu:
for dbname in list(self._conns):
while self._conns:
dbname = next(iter(self._conns))
if not self._terminate_connection_unsafe(dbname):
success = False
return success
@@ -200,34 +194,13 @@ def evict_lru(self) -> str:
# Could not evict a candidate; return None
return None

def _terminate_connection_unsafe(self, dbname: str) -> bool:
if dbname not in self._conns:
return True

db = self._conns.pop(dbname).connection
try:
# pyscopg3 will IMMEDIATELY close the connection when calling close().
# if timeout is not specified, psycopg will wait for the default 5s to stop the thread in the pool
# if timeout is 0 or negative, psycopg will not wait for worker threads to terminate
db.close(timeout=0)
self._stats.connection_closed += 1
except Exception:
self._stats.connection_closed_failed += 1
self._log.exception("failed to close DB connection for db=%s", dbname)
return False

def _terminate_connection_unsafe(self, dbname: str):
db = self._conns.pop(dbname, ConnectionInfo(None, None, None, None, None, None)).connection
if db is not None:
try:
self._stats.connection_closed += 1
db.close()
except Exception:
self._stats.connection_closed_failed += 1
return False
return True

def get_main_db_pool(self, max_pool_conn_size: int = 3):
"""
Returns a memoized, persistent psycopg connection pool to `self.dbname`.
Is meant to be shared across multiple threads, and opens a preconfigured max number of connections.
:return: a psycopg connection
"""
conn = self._get_connection_pool(
dbname=self._config.dbname,
ttl_ms=self._config.idle_connection_timeout,
max_pool_size=max_pool_conn_size,
persistent=True,
)
return conn
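Taken together, these hunks replace the psycopg3 ConnectionPool-per-database design with one raw psycopg2 connection per database, handed out through the `get_connection` context manager. A hedged usage sketch of the reverted API — the pool class name is not visible in the hunks above and is assumed from the file under change, and the connection parameters are placeholders:

```python
import psycopg2

# Class name assumed from datadog_checks/postgres/connections.py; it is not
# shown in the hunks above.
from datadog_checks.postgres.connections import MultiDatabaseConnectionPool


def connect_fn(dbname):
    # After the revert, connect_fn takes only `dbname` and must return a
    # raw psycopg2 connection.
    return psycopg2.connect(dbname=dbname, host="localhost", user="datadog")


pool = MultiDatabaseConnectionPool(connect_fn, max_conns=30)

# Connections are cached per database and reused until their TTL lapses.
with pool.get_connection("postgres", ttl_ms=60_000) as conn:
    with conn.cursor() as cursor:
        cursor.execute("SELECT 1")

pool.prune_connections()      # drops idle connections past their deadline
pool.close_all_connections()  # psycopg2 connections must be closed explicitly
```

As the new docstring notes, leaving the `get_connection` context does not close the psycopg2 connection; callers must eventually invoke `close_all_connections()`.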
27 changes: 0 additions & 27 deletions postgres/datadog_checks/postgres/data/conf.yaml.example
@@ -404,33 +404,6 @@ instances:
#
# collection_interval: 600

## Enable collection of database schemas. In order to collect schemas from all user databases,
## enable `database_autodiscovery`. To collect from a single database, set `dbname` to collect
## the schema for that database.
## Relation metrics must be enabled for schema collection.
#
# collect_schemas:

## @param enabled - boolean - optional - default: false
## Enable collection of database schemas. Requires `dbm: true` and relation metrics must be enabled.
#
# enabled: false

## @param max_tables - number - optional - default: 1000
## Maximum amount of tables the Agent collects from the instance.
#
# max_tables: 1000

## @param max_columns - number - optional - default: 50
## Maximum amount of columns the Agent collects per table.
#
# max_columns: 50

## @param collection_interval - number - optional - default: 600
## The database schema collection interval (in seconds).
#
# collection_interval: 600

## This block defines the configuration for AWS RDS and Aurora instances.
##
## Complete this section if you have installed the Datadog AWS Integration