Upgrade postgres check to psycopg3 #15411
Conversation
```diff
@@ -81,7 +81,7 @@ def explain_statement(self, dbname, statement, obfuscated_statement):
             result = self._explain_prepared_statement(dbname, statement, obfuscated_statement, query_signature)
             self._deallocate_prepared_statement(dbname, query_signature)
             if result:
-                return result[0][0][0]
+                return result[0]['explain_statement'][0]
```
I do prefer `['explain_statement']` to `[0]`. 🙇
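For context, a minimal sketch of why the keyed lookup works: in psycopg3 the row factory controls the row shape, and `dict_row` makes columns addressable by name. This assumes a reachable Postgres; the connection string and query are placeholders, not the integration's real setup.

```python
import psycopg
from psycopg.rows import dict_row

# dict_row returns each row as a dict keyed by column name.
with psycopg.connect("dbname=datadog_test user=datadog", row_factory=dict_row) as conn:
    with conn.cursor() as cur:
        cur.execute("SELECT 1 AS explain_statement")
        row = cur.fetchone()
        # With tuple rows this would be row[0]; the column name documents
        # intent and survives column reordering.
        print(row["explain_statement"])
```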
```python
tasks = [
    executor.submit(thread.cancel)
    for thread in [self.statement_samples, self.statement_metrics, self.metadata_samples]
]
concurrent.futures.wait(tasks)
```
Should there be a timeout to this where we just continue forcefully? I'm thinking of some of the reports of the agent service not being able to be restarted, given the running hypothesis that one of our db integrations wasn't able to gracefully terminate.
yeah, i thought about adding that, better to be safe
> I'm thinking of some of the reports of the agent service not being able to be restarted, given the running hypothesis that one of our db integrations wasn't able to gracefully terminate
oh interesting, I hadn't heard about this
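A standalone sketch of the forceful-continue idea discussed above. Note that `concurrent.futures.wait` returns `(done, not_done)` sets rather than raising on timeout, so the timeout is detected by checking `not_done`; the one-second deadline and `slow_cancel` helper are illustrative stand-ins.

```python
import concurrent.futures
import time

def slow_cancel():
    # Stand-in for a job loop cancel that hangs on a stuck connection.
    time.sleep(5)

executor = concurrent.futures.ThreadPoolExecutor()
tasks = [executor.submit(slow_cancel) for _ in range(3)]

# wait() never raises on timeout; it hands back whatever didn't finish.
done, not_done = concurrent.futures.wait(tasks, timeout=1.0)
if not_done:
    print(f"{len(not_done)} cancel task(s) still running; continuing shutdown forcefully")
executor.shutdown(wait=False)  # don't block on the stragglers
```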
```diff
@@ -183,6 +184,8 @@ def default_json_event_encoding(o):
         return float(o)
     if isinstance(o, (datetime.date, datetime.datetime)):
         return o.isoformat()
+    if isinstance(o, IPv4Address):
```
We've been seeing JSON serialization errors for a long time now due to some activity payloads containing `IPv4Address` objects.
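A self-contained sketch of the resulting encoder. `json.dumps` calls `default` only for types it can't serialize natively; the diff adds the `IPv4Address` branch, and returning `str(o)` for it is the natural completion assumed here (the `Decimal` branch is likewise assumed from the `return float(o)` context line).

```python
import datetime
import json
from decimal import Decimal
from ipaddress import IPv4Address

def default_json_event_encoding(o):
    if isinstance(o, Decimal):
        return float(o)
    if isinstance(o, (datetime.date, datetime.datetime)):
        return o.isoformat()
    if isinstance(o, IPv4Address):
        return str(o)  # assumed completion: serialize as a dotted-quad string
    raise TypeError

print(json.dumps({"client_addr": IPv4Address("10.0.0.1")}, default=default_json_event_encoding))
# {"client_addr": "10.0.0.1"}
```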
Thanks for the PR!
Seems like this new dependency makes a couple of agent builds fail: https://gitlab.ddbuild.io/DataDog/datadog-agent/-/jobs/305458412 Could you have a look? Do not hesitate to reach out to us if you need help solving this.
```python
done, not_done = concurrent.futures.wait(tasks, timeout=self._config.min_collection_interval)
if not_done:
    self.log.warning("timeout reached on cancel, proceeding with unclean shutdown.")
```
All warning and error logs should be more user- and support-friendly, indicating the context and impact. I don't think SEs would know what to do with this one, and there's a high possibility of misinterpreting its meaning.
OK, do you have a suggestion for more helpful wording? The impact is that we could be closing connections that are in use by the job loop threads, which would result in some nasty exceptions being thrown.
"Not all job loops were completed in time when cancelling the main check. Proceeding with the check cancellation. Some unexpected errors related to closed connections may occur after this message."
```diff
@@ -22,6 +22,7 @@
 class VersionUtils(object):
     def __init__(self):
         self.log = get_check_logger()
+        self.seen_aurora_exception = False
```
Suggested change:
```diff
-        self.seen_aurora_exception = False
+        self._seen_aurora_exception = False
```
```diff
@@ -31,6 +32,8 @@ def get_raw_version(db):
         return raw_version

+    def is_aurora(self, db):
+        if self.seen_aurora_exception:
```
Suggested change:
```diff
-        if self.seen_aurora_exception:
+        if self._seen_aurora_exception:
```
```diff
@@ -39,6 +42,7 @@ def is_aurora(self, db):
         except Exception as e:
             self.log.debug("Captured exception %s while determining if the DB is aurora. Assuming is not", str(e))
             db.rollback()
+            self.seen_aurora_exception = True
```
Suggested change:
```diff
-            self.seen_aurora_exception = True
+            self._seen_aurora_exception = True
```
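Pulling the three hunks together, the pattern being introduced is a memoized failure flag: once the Aurora probe raises, every later check run skips the query instead of re-raising and rolling back. A condensed sketch with the suggested underscore naming; the probe query, import path, and return values are assumptions, since the method body is only partially visible in the hunks.

```python
from datadog_checks.base.log import get_check_logger  # assumed import path

class VersionUtils(object):
    def __init__(self):
        self.log = get_check_logger()
        self._seen_aurora_exception = False

    def is_aurora(self, db):
        # Memoized failure: don't re-run a probe we already know fails here.
        if self._seen_aurora_exception:
            return False
        try:
            with db.cursor() as cursor:
                cursor.execute("SELECT AURORA_VERSION();")  # assumed Aurora-only probe
                return True
        except Exception as e:
            self.log.debug("Captured exception %s while determining if the DB is aurora. Assuming is not", str(e))
            db.rollback()
            self._seen_aurora_exception = True
            return False
```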
```diff
@@ -837,3 +865,9 @@ def check(self, _):
         finally:
             # Add the warnings saved during the execution of the check
             self._report_warnings()
+            if self.check_cancelled and self.db:
```
Suggested change:
```diff
-            if self.check_cancelled and self.db:
+            if self._check_cancelled and self.db:
```
```python
self._relations_manager = RelationsManager(self._config.relations, self._config.max_relations)
self.check_cancelled = False
```
Suggested change:
```diff
-        self.check_cancelled = False
+        self._check_cancelled = False
```
```python
self.log.warning("timeout reached on cancel, proceeding with unclean shutdown.")

self._close_db_pool()
self.check_cancelled = True
```
Suggested change:
```diff
-        self.check_cancelled = True
+        self._check_cancelled = True
```
```python
finally:
    self.db_pool.set_conn_inactive(dbname)
```
This is decoupling the connection getting from its context manager, which seems unnecessary. This can be accomplished by replacing:

```python
try:
    self.db_pool.get_connection(dbname, self._conn_ttl_ms)
except:
    ...
```

with

```python
try:
    with self.db_pool.get_connection(dbname, self._conn_ttl_ms):
        pass
except:
    ...
```

The reason I prefer this is that it keeps the API tighter and more defensible, making it harder to introduce flows where the connection never gets marked inactive.
This is actually a mistake, and was part of the original implementation I had. If you look at the other uses, I am using a context manager in all other places.
Thanks for catching it, I'll fix it!
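To make the reviewer's point concrete, here is a minimal sketch (illustrative names, not the integration's real API) of a pool whose `get_connection` is a context manager, so the inactive-marking runs on every exit path:

```python
from contextlib import contextmanager

class ConnectionPool:
    def __init__(self):
        self._conns = {}

    @contextmanager
    def get_connection(self, dbname, ttl_ms):
        conn = self._conns.setdefault(dbname, object())  # stand-in for opening/reusing a connection
        try:
            yield conn
        finally:
            # Runs on success, exception, or early return alike, so callers
            # can't forget to mark the connection inactive.
            self.set_conn_inactive(dbname)

    def set_conn_inactive(self, dbname):
        print(f"connection to {dbname} marked inactive")

# Even a "just warm it up" caller keeps the guarantee:
pool = ConnectionPool()
with pool.get_connection("postgres", ttl_ms=1000):
    pass
```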
This PR requires this change to the build process (DataDog/datadog-agent#18532) to be merged before we can proceed.
```diff
@@ -259,6 +262,9 @@ def cancel(self):
         Send a signal to cancel the job loop asynchronously.
         """
         self._cancel_event.set()
+        # after setting cancel event, wait for job loop to fully shutdown
+        if self._job_loop_future:
+            self._job_loop_future.result()
```
Without this change, we see a race condition that can happen when multiple job loop threads are running & cancel is called. This can sometimes result in a segfault:

```
tests/test_statements.py::test_async_job_enabled[True3-True1-True0] Fatal Python error: Segmentation fault

Current thread 0x00007f4924d0b700 (most recent call first):
  File "/home/ec2-user/.local/share/hatch/env/virtual/datadog-postgres/550Gv7FF/py3.9-12.1/lib/python3.9/site-packages/psycopg/cursor.py", line 516 in _select_current_result
  File "/home/ec2-user/.local/share/hatch/env/virtual/datadog-postgres/550Gv7FF/py3.9-12.1/lib/python3.9/site-packages/psycopg/cursor.py", line 205 in _execute_gen
  File "/home/ec2-user/.local/share/hatch/env/virtual/datadog-postgres/550Gv7FF/py3.9-12.1/lib/python3.9/site-packages/psycopg/connection.py", line 957 in wait
  ....
  ...
  File "/home/ec2-user/dd/integrations-core/datadog_checks_base/datadog_checks/base/utils/tracking.py", line 71 in wrapper
  File "/home/ec2-user/dd/integrations-core/postgres/datadog_checks/postgres/statement_samples.py", line 434 in run_job
  File "/home/ec2-user/dd/integrations-core/datadog_checks_base/datadog_checks/base/utils/db/utils.py", line 348 in _run_job_traced
  File "/home/ec2-user/dd/integrations-core/datadog_checks_base/datadog_checks/base/utils/db/utils.py", line 342 in _run_job_rate_limited
  File "/home/ec2-user/dd/integrations-core/datadog_checks_base/datadog_checks/base/utils/db/utils.py", line 303 in _job_loop
  File "/home/ec2-user/anaconda3/envs/agent39/lib/python3.9/concurrent/futures/thread.py", line 58 in run
  File "/home/ec2-user/anaconda3/envs/agent39/lib/python3.9/concurrent/futures/thread.py", line 83 in _worker
```
This is because the call to cancel would simply set the cancel() threading event & would return immediately, which would result in a call to close_all_connections. This is a problem because setting the cancel event on the job loop thread does not guarantee that the thread isn't already executing at the time we end up calling close for all connections in the db pool. (we only check if cancel is set [here](https://github.com/DataDog/integrations-core/blob/master/datadog_checks_base/datadog_checks/base/utils/db/utils.py#L290) and [here](https://github.com/DataDog/integrations-core/blob/master/datadog_checks_base/datadog_checks/base/utils/db/utils.py#L340))
Therefore, the only way to safely cancel all job loops is to do the following (sketched below):
- `cancel` is called from the main check thread
- Wait for all job loop `cancel` methods to return
- Job loop `cancel` methods do not return until the job loop is no longer running
- Once all job loop threads' `cancel()` calls return, close all database connections in the conn pool
- Finally, close the main check connection once the check is no longer running
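Condensing the list above into code: the key line is blocking on the job loop's future, which is exactly what the diff adds. Only the members relevant to the ordering guarantee are shown.

```python
def cancel(self):
    """Signal the job loop to stop, then block until it has fully exited."""
    self._cancel_event.set()
    # result() does not return until _job_loop has finished, so once every
    # job's cancel() returns it is safe to close the pooled connections.
    if self._job_loop_future:
        self._job_loop_future.result()
```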
This PR allows the datadog-agent build to work with the psycopg3 dependency, which was added as part of the integrations-core PR DataDog/integrations-core#15411.
What does this PR do?

This PR upgrades the postgres integration to use the latest `psycopg` library, psycopg3. This moves us to the latest and greatest tech when it comes to running postgres queries via python.

Motivation

- The integration is currently running a version of `psycopg2` that is 3 years old.
- We suspect a long-standing memory leak is coming from the `libpq` it ships under the hood.

We were able to reproduce this in our own cluster, and saw that the place we were leaking from was in native memory... after doing some investigative work, we were able to narrow down the source of the leak to likely be coming from `libpq`, and it seemingly was triggered when many `psycopg2` exceptions were being thrown, as you can see here:

[chart: bumps in memory & OOMs seem to line up with spikes in `psycopg2` exceptions being thrown]

From running the datadog native profiler, we could see memory growing over time in the `libpq` code path.

Since `libpq` ships with `psycopg2-binary`, we were unable to upgrade it without upgrading `psycopg2`. After upgrading to psycopg3, we saw memory stabilize for our test agent.

The next step is to test across all of our clusters & with customers to confirm that this indeed solves the issue.
Other Fixes / Improvements

- Prevented `psycopg.OperationalError: consuming input failed: connection pointer is NULL` errors from being thrown; in some cases this resulted in a segfault being thrown from the underlying C code. Prior to this commit, the following could happen:
  1. On `cancel`, the job loop threads would shut down.
  2. This would call the shutdown hook that closes all db connections... from one of the job loop threads.
  3. Meanwhile, the check could still be running using the main db connection from the pool & see its connection closed while it's trying to finish running all of its queries.

  When testing this code against hosts where `cancel` is called frequently (which could happen to dbs that are not properly configured for the agent), we see null pointer connection exceptions / segfaults as a result. In order to fix this, we must wait for every job loop `cancel` to return before closing pooled connections, and close the main check connection only once the check is no longer running (see the cancel discussion above).
- Added `IPv4Address` to our custom JSON event encoding implementation.
- Honor the case where `dbstrict` or `ignore_databases` is set in the user's config. Without this, I saw many `SyntaxError`s being thrown by psycopg, which I believe were previously swallowed.
s being thrown by psycopg, which I believe were previously swallowedReview checklist (to be filled by reviewers)
changelog/
andintegration/
labels attachedqa/skip-qa
label.