Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Shutdown statement sampler thread on cancel #8766

Merged
merged 1 commit into from
Mar 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions postgres/datadog_checks/postgres/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ def __init__(self, name, init_config, instances):
self.statement_samples = PostgresStatementSamples(self, self._config)
self._clean_state()

def cancel(self):
self.statement_samples.cancel()

def _clean_state(self):
self._version = None
self._is_aurora = None
Expand Down
13 changes: 11 additions & 2 deletions postgres/datadog_checks/postgres/statement_samples.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
import os
import re
import threading
import time
from concurrent.futures.thread import ThreadPoolExecutor

Expand Down Expand Up @@ -63,6 +64,7 @@ def __init__(self, check, config):
self._activity_last_query_start = None
self._last_check_run = 0
self._collection_loop_future = None
self._cancel_event = threading.Event()
self._tags = None
self._tags_str = None
self._service = "postgres"
Expand Down Expand Up @@ -90,6 +92,9 @@ def __init__(self, check, config):
ttl=60 * 60 / int(self._config.statement_samples_config.get('samples_per_hour_per_query', 15)),
)

def cancel(self):
self._cancel_event.set()

def run_sampler(self, tags):
"""
start the sampler thread if not already running
Expand Down Expand Up @@ -175,6 +180,10 @@ def _collection_loop(self):
try:
self._log.info("Starting statement sampler collection loop")
while True:
if self._cancel_event.isSet():
self._log.info("Collection loop cancelled")
self._check.count("dd.postgres.statement_samples.collection_loop_cancel", 1, tags=self._tags)
break
if time.time() - self._last_check_run > self._config.min_collection_interval * 2:
self._log.info("Sampler collection loop stopping due to check inactivity")
self._check.count("dd.postgres.statement_samples.collection_loop_inactive_stop", 1, tags=self._tags)
Expand All @@ -198,9 +207,9 @@ def _collection_loop(self):
)
finally:
self._log.info("Shutting down statement sampler collection loop")
self.close()
self._close_db_conn()

def close(self):
def _close_db_conn(self):
if self._db and not self._db.closed:
try:
self._db.close()
Expand Down
14 changes: 14 additions & 0 deletions postgres/tests/test_pg_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,20 @@ def test_statement_samples_collection_loop_inactive_stop(aggregator, integration
aggregator.assert_metric("dd.postgres.statement_samples.collection_loop_inactive_stop")


def test_statement_samples_collection_loop_cancel(aggregator, integration_check, dbm_instance):
dbm_instance['statement_samples']['run_sync'] = False
check = integration_check(dbm_instance)
check._connect()
check.check(dbm_instance)
assert check.statement_samples._collection_loop_future.running(), "thread should be running"
check.cancel()
# wait for it to stop and make sure it doesn't throw any exceptions
check.statement_samples._collection_loop_future.result()
assert not check.statement_samples._collection_loop_future.running(), "thread should be stopped"
assert check.statement_samples._db is None, "db connection should be gone"
aggregator.assert_metric("dd.postgres.statement_samples.collection_loop_cancel")


def test_statement_samples_invalid_activity_view(aggregator, integration_check, dbm_instance):
dbm_instance['pg_stat_activity_view'] = "wrong_view"

Expand Down