Upgrade postgres check to psycopg3 #15411

Merged 21 commits on Aug 2, 2023.

3 changes: 3 additions & 0 deletions .ddev/config.toml
@@ -86,6 +86,8 @@ mmh3 = ['CC0-1.0']
paramiko = ['LGPL-2.1-only']
# https://github.com/oracle/python-oracledb/blob/main/LICENSE.txt
oracledb = ['Apache-2.0']
# https://github.com/psycopg/psycopg/blob/master/LICENSE.txt
psycopg = ['LGPL-3.0-only']
# https://github.com/psycopg/psycopg2/blob/master/LICENSE
# https://github.com/psycopg/psycopg2/blob/master/doc/COPYING.LESSER
psycopg2-binary = ['LGPL-3.0-only', 'BSD-3-Clause']
@@ -119,6 +121,7 @@ packaging = 'https://github.com/pypa/packaging'
paramiko = 'https://github.com/paramiko/paramiko'
protobuf = 'https://github.com/protocolbuffers/protobuf'
psycopg2-binary = 'https://github.com/psycopg/psycopg2'
psycopg = 'https://github.com/psycopg/psycopg'
pycryptodomex = 'https://github.com/Legrandin/pycryptodome'
redis = 'https://github.com/redis/redis-py'
requests = 'https://github.com/psf/requests'
1 change: 1 addition & 0 deletions LICENSE-3rdparty.csv
@@ -62,6 +62,7 @@ prometheus-client,PyPI,Apache-2.0,Copyright 2015 The Prometheus Authors
protobuf,PyPI,BSD-3-Clause,Copyright 2008 Google Inc.
protobuf,PyPI,BSD-3-Clause,Copyright 2008 Google Inc. All rights reserved.
psutil,PyPI,BSD-3-Clause,"Copyright (c) 2009, Jay Loden, Dave Daeschler, Giampaolo Rodola'"
psycopg,PyPI,LGPL-3.0-only,Copyright (C) 2020 The Psycopg Team
psycopg2-binary,PyPI,BSD-3-Clause,Copyright 2013 Federico Di Gregorio
psycopg2-binary,PyPI,LGPL-3.0-only,Copyright (C) 2013 Federico Di Gregorio
pyasn1,PyPI,BSD-3-Clause,"Copyright (c) 2005-2019, Ilya Etingof <[email protected]>"
@@ -57,6 +57,7 @@ protobuf==3.17.3; python_version < '3.0'
protobuf==3.20.2; python_version > '3.0'
psutil==5.9.0
psycopg2-binary==2.8.6; sys_platform != 'darwin' or platform_machine != 'arm64'
psycopg[binary]==3.1.9; python_version > '3.0'
pyasn1==0.4.6
pycryptodomex==3.10.1
pydantic==2.0.2; python_version > '3.0'
6 changes: 6 additions & 0 deletions datadog_checks_base/datadog_checks/base/utils/db/utils.py
@@ -10,6 +10,7 @@
import threading
import time
from concurrent.futures.thread import ThreadPoolExecutor
from ipaddress import IPv4Address
from itertools import chain
from typing import Any, Callable, Dict, List, Tuple # noqa: F401

@@ -183,6 +184,8 @@ def default_json_event_encoding(o):
return float(o)
if isinstance(o, (datetime.date, datetime.datetime)):
return o.isoformat()
if isinstance(o, IPv4Address):
Contributor Author:
We've been seeing JSON serialization errors for a long time because some activity payloads contain IPv4Address objects.

return str(o)
raise TypeError
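
As a quick illustration of the branch added above (a minimal standalone sketch; the Decimal and datetime branches of the real encoder are omitted), `json.dumps` now falls back to the string form of an `IPv4Address` instead of raising `TypeError`:

```python
import json
from ipaddress import IPv4Address


def default_json_event_encoding(o):
    # Trimmed-down sketch of the encoder above: only the new branch is shown.
    if isinstance(o, IPv4Address):
        return str(o)
    raise TypeError


# An activity payload containing an IPv4Address value (illustrative data).
payload = {"client_addr": IPv4Address("10.0.0.5"), "query": "SELECT 1"}

# Without the IPv4Address branch this call raises TypeError; with it,
# the address is encoded as the string "10.0.0.5".
print(json.dumps(payload, default=default_json_event_encoding))
```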


@@ -259,6 +262,9 @@ def cancel(self):
Send a signal to cancel the job loop asynchronously.
"""
self._cancel_event.set()
# after setting cancel event, wait for job loop to fully shutdown
if self._job_loop_future:
self._job_loop_future.result()
Contributor Author @jmeunier28 (Aug 2, 2023):
Without this change we see a race condition when multiple job loop threads are running and cancel is called, which can sometimes result in a segfault:

tests/test_statements.py::test_async_job_enabled[True3-True1-True0] Fatal Python error: Segmentation fault

Current thread 0x00007f4924d0b700 (most recent call first):
  File "/home/ec2-user/.local/share/hatch/env/virtual/datadog-postgres/550Gv7FF/py3.9-12.1/lib/python3.9/site-packages/psycopg/cursor.py", line 516 in _select_current_result
  File "/home/ec2-user/.local/share/hatch/env/virtual/datadog-postgres/550Gv7FF/py3.9-12.1/lib/python3.9/site-packages/psycopg/cursor.py", line 205 in _execute_gen
  File "/home/ec2-user/.local/share/hatch/env/virtual/datadog-postgres/550Gv7FF/py3.9-12.1/lib/python3.9/site-packages/psycopg/connection.py", line 957 in wait
....
...
  File "/home/ec2-user/dd/integrations-core/datadog_checks_base/datadog_checks/base/utils/tracking.py", line 71 in wrapper
  File "/home/ec2-user/dd/integrations-core/postgres/datadog_checks/postgres/statement_samples.py", line 434 in run_job
  File "/home/ec2-user/dd/integrations-core/datadog_checks_base/datadog_checks/base/utils/db/utils.py", line 348 in _run_job_traced
  File "/home/ec2-user/dd/integrations-core/datadog_checks_base/datadog_checks/base/utils/db/utils.py", line 342 in _run_job_rate_limited
  File "/home/ec2-user/dd/integrations-core/datadog_checks_base/datadog_checks/base/utils/db/utils.py", line 303 in _job_loop
  File "/home/ec2-user/anaconda3/envs/agent39/lib/python3.9/concurrent/futures/thread.py", line 58 in run
  File "/home/ec2-user/anaconda3/envs/agent39/lib/python3.9/concurrent/futures/thread.py", line 83 in _worker
This is because the call to cancel would simply set the cancel() threading event and return immediately, which resulted in an immediate call to close_all_connections. That is a problem because setting the cancel event on the job loop thread does not guarantee the thread isn't still executing when we close all of the connections in the DB pool (we only check whether cancel is set [here](https://github.com/DataDog/integrations-core/blob/master/datadog_checks_base/datadog_checks/base/utils/db/utils.py#L290) and [here](https://github.com/DataDog/integrations-core/blob/master/datadog_checks_base/datadog_checks/base/utils/db/utils.py#L340)).

Therefore, the only way to safely cancel all job loops is the following (a minimal sketch of the pattern is shown after this list):

  1. cancel() is called from the main check thread.
  2. The main thread waits for every job loop's cancel() to return.
  3. A job loop's cancel() does not return until that job loop is no longer running.
  4. Once all the job loop cancel() calls have returned, close all database connections in the connection pool.
  5. Finally, close the main check connection once the check is no longer running.
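
A minimal sketch of that shutdown ordering (class and method names are illustrative, not the check's actual API): cancel() blocks on the job-loop future, so pooled connections are only closed after the loop has exited.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class AsyncJobSketch:
    """Illustrative only: mirrors the cancel-then-wait pattern from the diff."""

    def __init__(self):
        self._cancel_event = threading.Event()
        self._executor = ThreadPoolExecutor(max_workers=1)
        self._job_loop_future = None

    def run(self):
        self._job_loop_future = self._executor.submit(self._job_loop)

    def _job_loop(self):
        # The real loop rate-limits and runs the traced job body; the job may
        # hold a DB connection while it executes.
        while not self._cancel_event.is_set():
            time.sleep(0.1)  # placeholder for the rate-limited job body

    def cancel(self):
        # Signal the loop to stop...
        self._cancel_event.set()
        # ...then block until it has actually exited, so the caller can safely
        # close every connection in the pool afterwards.
        if self._job_loop_future:
            self._job_loop_future.result()
```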


def run_job_loop(self, tags):
"""
@@ -52,6 +52,8 @@
# https://github.com/psycopg/psycopg2/blob/master/LICENSE
# https://github.com/psycopg/psycopg2/blob/master/doc/COPYING.LESSER
'psycopg2-binary': ['LGPL-3.0-only', 'BSD-3-Clause'],
# https://github.com/psycopg/psycopg/blob/master/LICENSE.txt
'psycopg': ['LGPL-3.0-only'],
# https://github.com/Legrandin/pycryptodome/blob/master/LICENSE.rst
'pycryptodomex': ['Unlicense', 'BSD-2-Clause'],
# https://github.com/requests/requests-kerberos/pull/123
@@ -158,6 +160,7 @@
'paramiko': 'https://github.com/paramiko/paramiko',
'protobuf': 'https://github.com/protocolbuffers/protobuf',
'psycopg2-binary': 'https://github.com/psycopg/psycopg2',
'psycopg': 'https://github.com/psycopg/psycopg',
'pycryptodomex': 'https://github.com/Legrandin/pycryptodome',
'redis': 'https://github.com/redis/redis-py',
'requests': 'https://github.com/psf/requests',
10 changes: 5 additions & 5 deletions postgres/assets/configuration/spec.yaml
@@ -88,7 +88,7 @@ files:
Note: `true` is an alias for `require`, and `false` is an alias for `disable`.
value:
type: string
display_default: false
display_default: "false"
example: "false"
- name: ssl_root_cert
description: |
@@ -97,7 +97,7 @@
For a detailed description of how this option works see https://www.postgresql.org/docs/current/libpq-ssl.html
value:
type: string
display_default: false
display_default: "false"
example: "/home/datadog/server-ca.pem"
- name: ssl_cert
description: |
@@ -106,7 +106,7 @@
For a detailed description of how this option works see https://www.postgresql.org/docs/current/libpq-ssl.html
value:
type: string
display_default: false
display_default: "false"
example: "/home/datadog/client-cert.pem"
- name: ssl_key
description: |
@@ -115,7 +115,7 @@
For a detailed description of how this option works see https://www.postgresql.org/docs/current/libpq-ssl.html
value:
type: string
display_default: false
display_default: "false"
example: "/home/datadog/client-key.pem"
- name: ssl_password
description: |
@@ -125,7 +125,7 @@
For a detailed description of how this option works see https://www.postgresql.org/docs/current/libpq-ssl.html
value:
type: string
display_default: false
display_default: "false"
example: "ssl_key_password"
- name: query_timeout
description: |
2 changes: 1 addition & 1 deletion postgres/datadog_checks/postgres/config.py
@@ -64,7 +64,7 @@ def __init__(self, instance):
if ssl in SSL_MODES:
self.ssl_mode = ssl
else:
self.ssl_mode = 'require' if is_affirmative(ssl) else 'disable'
self.ssl_mode = 'require' if ssl == "true" else 'disable'

self.ssl_cert = instance.get('ssl_cert', None)
self.ssl_root_cert = instance.get('ssl_root_cert', None)
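A hedged sketch of how the string-valued `ssl` option now resolves to `ssl_mode` (the `SSL_MODES` set below is assumed from libpq's documented sslmode values; the check defines its own constant):

```python
# Assumed set of valid libpq sslmode values; the check ships its own SSL_MODES.
SSL_MODES = {'disable', 'allow', 'prefer', 'require', 'verify-ca', 'verify-full'}


def resolve_ssl_mode(ssl):
    """Map the instance-level `ssl` option (now always a string) to a libpq sslmode."""
    if ssl in SSL_MODES:
        return ssl
    # Legacy boolean-style values: "true" remains an alias for `require`,
    # anything else falls back to `disable`.
    return 'require' if ssl == 'true' else 'disable'


print(resolve_ssl_mode('verify-full'))  # verify-full
print(resolve_ssl_mode('true'))         # require
print(resolve_ssl_mode('false'))        # disable
```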
10 changes: 5 additions & 5 deletions postgres/datadog_checks/postgres/config_models/defaults.py
@@ -109,23 +109,23 @@ def instance_query_timeout():


def instance_ssl():
return False
return 'false'


def instance_ssl_cert():
return False
return 'false'


def instance_ssl_key():
return False
return 'false'


def instance_ssl_password():
return False
return 'false'


def instance_ssl_root_cert():
return False
return 'false'


def instance_table_count_limit():
23 changes: 13 additions & 10 deletions postgres/datadog_checks/postgres/connections.py
@@ -8,7 +8,9 @@
import time
from typing import Callable, Dict

import psycopg2
import psycopg

from datadog_checks.base import AgentCheck


class ConnectionPoolFullError(Exception):
@@ -23,7 +25,7 @@ def __str__(self):
class ConnectionInfo:
def __init__(
self,
connection: psycopg2.extensions.connection,
connection: psycopg.Connection,
deadline: int,
active: bool,
last_accessed: int,
@@ -66,7 +68,8 @@ def __repr__(self):
def reset(self):
self.__init__()

def __init__(self, connect_fn: Callable[[str], None], max_conns: int = None):
def __init__(self, check: AgentCheck, connect_fn: Callable[[str], None], max_conns: int = None):
self.log = check.log
self.max_conns: int = max_conns
self._stats = self.Stats()
self._mu = threading.RLock()
@@ -86,9 +89,9 @@ def _get_connection_raw(
dbname: str,
ttl_ms: int,
timeout: int = None,
startup_fn: Callable[[psycopg2.extensions.connection], None] = None,
startup_fn: Callable[[psycopg.Connection], None] = None,
persistent: bool = False,
) -> psycopg2.extensions.connection:
) -> psycopg.Connection:
"""
Return a connection from the pool.
Pass a function to startup_func if there is an action needed with the connection
@@ -117,7 +120,7 @@ def _get_connection_raw(
# if already in pool, retain persistence status
persistent = conn.persistent

if db.status != psycopg2.extensions.STATUS_READY:
if db.info.status != psycopg.pq.ConnStatus.OK:
# Some transaction went wrong and the connection is in an unhealthy state. Let's fix that
db.rollback()

@@ -140,12 +143,10 @@ def get_connection(self, dbname: str, ttl_ms: int, timeout: int = None, persiste
make a new connection if the max_conn limit hasn't been reached.
Blocks until a connection can be added to the pool,
and optionally takes a timeout in seconds.
Note that leaving a connection context here does NOT close the connection in psycopg2;
connections must be manually closed by `close_all_connections()`.
"""
try:
with self._mu:
db = self._get_connection_raw(dbname, ttl_ms, timeout, persistent)
db = self._get_connection_raw(dbname=dbname, ttl_ms=ttl_ms, timeout=timeout, persistent=persistent)
yield db
finally:
with self._mu:
@@ -199,8 +200,10 @@ def _terminate_connection_unsafe(self, dbname):
if db is not None:
try:
self._stats.connection_closed += 1
db.close()
if not db.closed:
db.close()
except Exception:
self._stats.connection_closed_failed += 1
self.log.exception("failed to close DB connection for db=%s", dbname)
return False
return True
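
For reference, a minimal sketch of the psycopg 3 idioms the pool now uses: checking `connection.info.status` instead of psycopg2's `connection.status`/`STATUS_READY`, and guarding `close()` with `closed`. The DSN is a placeholder; the real pool builds connections through its `connect_fn`.

```python
import psycopg

# Placeholder DSN for illustration only.
conn = psycopg.connect("dbname=datadog user=datadog host=localhost")

# psycopg 3 exposes the libpq connection status under conn.info.
if conn.info.status != psycopg.pq.ConnStatus.OK:
    # Some transaction went wrong and the connection is unhealthy; reset it.
    conn.rollback()

# Closing is skipped if the connection is already closed.
if not conn.closed:
    conn.close()
```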
@@ -4,7 +4,7 @@

import logging

import psycopg2
from psycopg.rows import dict_row

from datadog_checks.base.utils.db.sql import compute_sql_signature
from datadog_checks.base.utils.tracking import tracked_method
@@ -16,12 +16,12 @@
PREPARE_STATEMENT_QUERY = 'PREPARE dd_{query_signature} AS {statement}'

PARAM_TYPES_COUNT_QUERY = '''\
SELECT CARDINALITY(parameter_types) FROM pg_prepared_statements WHERE name = 'dd_{query_signature}'
SELECT CARDINALITY(parameter_types) as count FROM pg_prepared_statements WHERE name = 'dd_{query_signature}'
'''

EXECUTE_PREPARED_STATEMENT_QUERY = 'EXECUTE dd_{prepared_statement}({generic_values})'

EXPLAIN_QUERY = 'SELECT {explain_function}($stmt${statement}$stmt$)'
EXPLAIN_QUERY = 'SELECT {explain_function}($stmt${statement}$stmt$) as explain_statement'


def agent_check_getter(self):
@@ -81,7 +81,7 @@ def explain_statement(self, dbname, statement, obfuscated_statement):
result = self._explain_prepared_statement(dbname, statement, obfuscated_statement, query_signature)
self._deallocate_prepared_statement(dbname, query_signature)
if result:
return result[0][0][0]
return result[0]['explain_statement'][0]
Contributor:
I do prefer ['explain_statement'] to [0]. 🙇

return None

def _set_plan_cache_mode(self, dbname):
@@ -112,7 +112,10 @@ def _get_number_of_parameters_for_prepared_statement(self, dbname, query_signatu
rows = self._execute_query_and_fetch_rows(
dbname, PARAM_TYPES_COUNT_QUERY.format(query_signature=query_signature)
)
return rows[0][0] if rows else 0
count = 0
if rows and 'count' in rows[0]:
count = rows[0]['count']
return count

@tracked_method(agent_check_getter=agent_check_getter)
def _explain_prepared_statement(self, dbname, statement, obfuscated_statement, query_signature):
@@ -157,15 +160,14 @@ def _deallocate_prepared_statement(self, dbname, query_signature):
)

def _execute_query(self, dbname, query):
# Psycopg2 connections do not get closed when context ends;
# leaving context will just mark the connection as inactive in MultiDatabaseConnectionPool
with self._check.db_pool.get_connection(dbname, self._check._config.idle_connection_timeout) as conn:
with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
with conn.cursor(row_factory=dict_row) as cursor:
logger.debug('Executing query=[%s]', query)
cursor.execute(query)

def _execute_query_and_fetch_rows(self, dbname, query):
with self._check.db_pool.get_connection(dbname, self._check._config.idle_connection_timeout) as conn:
with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
with conn.cursor(row_factory=dict_row) as cursor:
logger.debug('Executing query=[%s]', query)
cursor.execute(query)
return cursor.fetchall()
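
A small sketch of the `dict_row` row factory that replaces psycopg2's `DictCursor` here: rows come back as plain dicts keyed by column aliases, which is why the queries above now alias their expressions (`as count`, `as explain_statement`). Connection details are placeholders.

```python
import psycopg
from psycopg.rows import dict_row

# Placeholder connection parameters for illustration only.
with psycopg.connect("dbname=datadog user=datadog host=localhost") as conn:
    with conn.cursor(row_factory=dict_row) as cursor:
        # Aliasing the expression gives the row dict a stable key to read.
        cursor.execute("SELECT CARDINALITY(ARRAY[1, 2, 3]) AS count")
        rows = cursor.fetchall()
        count = rows[0]['count'] if rows and 'count' in rows[0] else 0
        print(count)  # 3
```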
10 changes: 5 additions & 5 deletions postgres/datadog_checks/postgres/metadata.py
@@ -5,7 +5,8 @@
import time
from typing import Dict, Optional, Tuple # noqa: F401

import psycopg2
import psycopg
from psycopg.rows import dict_row

try:
import datadog_agent
@@ -36,7 +37,7 @@ class PostgresMetadata(DBMAsyncJob):
2. collection of pg_settings
"""

def __init__(self, check, config, shutdown_callback):
def __init__(self, check, config):
self.pg_settings_collection_interval = config.settings_metadata_config.get(
'collection_interval', DEFAULT_SETTINGS_COLLECTION_INTERVAL
)
@@ -54,9 +55,8 @@ def __init__(self, check, config):
enabled=is_affirmative(config.resources_metadata_config.get('enabled', True)),
dbms="postgres",
min_collection_interval=config.min_collection_interval,
expected_db_exceptions=(psycopg2.errors.DatabaseError,),
expected_db_exceptions=(psycopg.errors.DatabaseError,),
job_name="database-metadata",
shutdown_callback=shutdown_callback,
)
self._check = check
self._config = config
@@ -114,7 +114,7 @@ def _payload_pg_version(self):

@tracked_method(agent_check_getter=agent_check_getter)
def _collect_postgres_settings(self):
with self._check._get_main_db().cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
with self._check.get_main_db().cursor(row_factory=dict_row) as cursor:
self._log.debug("Running query [%s]", PG_SETTINGS_QUERY)
self._time_since_last_settings_query = time.time()
cursor.execute(PG_SETTINGS_QUERY)
7 changes: 4 additions & 3 deletions postgres/datadog_checks/postgres/metrics_cache.py
@@ -93,9 +93,10 @@ def get_instance_metrics(self, version):
'relation': False,
}

res["query"] += " WHERE " + " AND ".join(
"psd.datname not ilike '{}'".format(db) for db in self.config.ignore_databases
)
if len(self.config.ignore_databases) > 0:
res["query"] += " WHERE " + " AND ".join(
"psd.datname not ilike '{}'".format(db) for db in self.config.ignore_databases
)

if self.config.dbstrict:
res["query"] += " AND psd.datname in('{}')".format(self.config.dbname)
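A sketch of the guard added above: with the new `if`, an empty `ignore_databases` list no longer produces a dangling `WHERE` clause. The base query and the config values below are simplified placeholders.

```python
def build_instance_metrics_query(ignore_databases, dbstrict=False, dbname=None):
    """Illustrative reconstruction of the filter assembly in get_instance_metrics."""
    query = "SELECT psd.datname FROM pg_stat_database psd"  # simplified base query
    if len(ignore_databases) > 0:
        query += " WHERE " + " AND ".join(
            "psd.datname not ilike '{}'".format(db) for db in ignore_databases
        )
    if dbstrict:
        query += " AND psd.datname in('{}')".format(dbname)
    return query


# With no ignored databases, the WHERE clause is skipped entirely.
print(build_instance_metrics_query([]))
# With ignored databases, each one is excluded with a NOT ILIKE filter.
print(build_instance_metrics_query(['template%', 'rdsadmin']))
```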