From 362cb00978e4764559ef476c8e270bfebc27bee8 Mon Sep 17 00:00:00 2001 From: Roman Konoval Date: Tue, 10 Sep 2024 12:11:39 +0200 Subject: [PATCH 1/4] Adds notifies processing in pq_commit --- psycopg/pqpath.c | 1 + tests/test_notify.py | 12 ++++++++++++ 2 files changed, 13 insertions(+) diff --git a/psycopg/pqpath.c b/psycopg/pqpath.c index 83ab91f20..7b5342b9d 100644 --- a/psycopg/pqpath.c +++ b/psycopg/pqpath.c @@ -412,6 +412,7 @@ pq_commit(connectionObject *conn) } Py_BLOCK_THREADS; + conn_notifies_process(conn); conn_notice_process(conn); Py_UNBLOCK_THREADS; diff --git a/tests/test_notify.py b/tests/test_notify.py index 03ab4cde2..27bcba611 100755 --- a/tests/test_notify.py +++ b/tests/test_notify.py @@ -126,6 +126,18 @@ def test_notifies_received_on_execute(self): self.assertEqual(pid, self.conn.notifies[0][0]) self.assertEqual('foo', self.conn.notifies[0][1]) + @slow + def test_notifies_received_on_commit(self): + self.listen("foo") + self.conn.commit() + self.conn.cursor().execute("select 1;") + pid = int(self.notify("foo").communicate()[0]) + self.assertEqual(0, len(self.conn.notifies)) + self.conn.commit() + self.assertEqual(1, len(self.conn.notifies)) + self.assertEqual(pid, self.conn.notifies[0][0]) + self.assertEqual("foo", self.conn.notifies[0][1]) + @slow def test_notify_object(self): self.autocommit(self.conn) From 282360dd048eea5c7f3021dd9eb542f97c753c9a Mon Sep 17 00:00:00 2001 From: Roman Konoval Date: Wed, 11 Sep 2024 16:29:22 +0200 Subject: [PATCH 2/4] adds notifications processing after every PQexec --- psycopg/connection_int.c | 10 ++++++ psycopg/pqpath.c | 2 ++ tests/test_notify.py | 67 ++++++++++++++++++++++++++++++++++++---- 3 files changed, 73 insertions(+), 6 deletions(-) diff --git a/psycopg/connection_int.c b/psycopg/connection_int.c index 8365e048b..0584b390f 100644 --- a/psycopg/connection_int.c +++ b/psycopg/connection_int.c @@ -1344,6 +1344,11 @@ conn_set_session(connectionObject *self, int autocommit, } } + Py_BLOCK_THREADS; + conn_notifies_process(self); + conn_notice_process(self); + Py_UNBLOCK_THREADS; + if (autocommit != SRV_STATE_UNCHANGED) { self->autocommit = autocommit; } @@ -1408,6 +1413,11 @@ conn_set_client_encoding(connectionObject *self, const char *pgenc) goto endlock; } + Py_BLOCK_THREADS; + conn_notifies_process(self); + conn_notice_process(self); + Py_UNBLOCK_THREADS; + endlock: pthread_mutex_unlock(&self->lock); Py_END_ALLOW_THREADS; diff --git a/psycopg/pqpath.c b/psycopg/pqpath.c index 7b5342b9d..59c4070d4 100644 --- a/psycopg/pqpath.c +++ b/psycopg/pqpath.c @@ -469,6 +469,7 @@ pq_abort(connectionObject *conn) retvalue = pq_abort_locked(conn, &_save); Py_BLOCK_THREADS; + conn_notifies_process(conn); conn_notice_process(conn); Py_UNBLOCK_THREADS; @@ -539,6 +540,7 @@ pq_reset(connectionObject *conn) Py_BLOCK_THREADS; conn_notice_process(conn); + conn_notifies_process(conn); Py_UNBLOCK_THREADS; pthread_mutex_unlock(&conn->lock); diff --git a/tests/test_notify.py b/tests/test_notify.py index 27bcba611..e3bbccd0c 100755 --- a/tests/test_notify.py +++ b/tests/test_notify.py @@ -23,13 +23,14 @@ # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. +import os import unittest from collections import deque import psycopg2 from psycopg2 import extensions from psycopg2.extensions import Notify -from .testutils import ConnectingTestCase, skip_if_crdb, slow +from .testutils import ConnectingTestCase, skip_if_crdb, skip_if_windows, slow from .testconfig import dsn import sys @@ -74,7 +75,9 @@ def notify(self, name, sec=0, payload=None): module=psycopg2.__name__, dsn=dsn, sec=sec, name=name, payload=payload)) - return Popen([sys.executable, '-c', script], stdout=PIPE) + env = os.environ.copy() + env.pop("PSYCOPG_DEBUG", None) + return Popen([sys.executable, '-c', script], stdout=PIPE, env=env) @slow def test_notifies_received_on_poll(self): @@ -127,16 +130,68 @@ def test_notifies_received_on_execute(self): self.assertEqual('foo', self.conn.notifies[0][1]) @slow + @skip_if_windows def test_notifies_received_on_commit(self): - self.listen("foo") + self.listen('foo') self.conn.commit() - self.conn.cursor().execute("select 1;") - pid = int(self.notify("foo").communicate()[0]) + self.conn.cursor().execute('select 1;') + pid = int(self.notify('foo').communicate()[0]) self.assertEqual(0, len(self.conn.notifies)) self.conn.commit() self.assertEqual(1, len(self.conn.notifies)) self.assertEqual(pid, self.conn.notifies[0][0]) - self.assertEqual("foo", self.conn.notifies[0][1]) + self.assertEqual('foo', self.conn.notifies[0][1]) + + @slow + @skip_if_windows + def test_notifies_received_on_rollback(self): + self.listen('foo') + self.conn.commit() + self.conn.cursor().execute('select 1;') + pid = int(self.notify('foo').communicate()[0]) + self.assertEqual(0, len(self.conn.notifies)) + self.conn.rollback() + self.assertEqual(1, len(self.conn.notifies)) + self.assertEqual(pid, self.conn.notifies[0][0]) + self.assertEqual('foo', self.conn.notifies[0][1]) + + @slow + @skip_if_windows + def test_notifies_received_on_reset(self): + self.listen('foo') + self.conn.commit() + pid = int(self.notify('foo').communicate()[0]) + self.assertEqual(0, len(self.conn.notifies)) + self.conn.reset() + self.assertEqual(1, len(self.conn.notifies)) + self.assertEqual(pid, self.conn.notifies[0][0]) + self.assertEqual('foo', self.conn.notifies[0][1]) + + @slow + @skip_if_windows + def test_notifies_received_on_set_session(self): + self.listen('foo') + self.conn.commit() + pid = int(self.notify('foo').communicate()[0]) + self.assertEqual(0, len(self.conn.notifies)) + self.conn.set_session(autocommit=True, readonly=True) + self.assertEqual(1, len(self.conn.notifies)) + self.assertEqual(pid, self.conn.notifies[0][0]) + self.assertEqual('foo', self.conn.notifies[0][1]) + + @slow + @skip_if_windows + def test_notifies_received_on_set_client_encoding(self): + self.listen('foo') + self.conn.commit() + pid = int(self.notify('foo').communicate()[0]) + self.assertEqual(0, len(self.conn.notifies)) + self.conn.set_client_encoding( + 'LATIN1' if self.conn.encoding != 'LATIN1' else 'UTF8' + ) + self.assertEqual(1, len(self.conn.notifies)) + self.assertEqual(pid, self.conn.notifies[0][0]) + self.assertEqual('foo', self.conn.notifies[0][1]) @slow def test_notify_object(self): From cba6d39be0f3db0b32db7f64ceb70aab972016fe Mon Sep 17 00:00:00 2001 From: Roman Konoval Date: Wed, 11 Sep 2024 17:12:41 +0200 Subject: [PATCH 3/4] removes duplication in tests --- tests/test_notify.py | 61 ++++++++++++++++---------------------------- 1 file changed, 22 insertions(+), 39 deletions(-) diff --git a/tests/test_notify.py b/tests/test_notify.py index e3bbccd0c..873a419b9 100755 --- a/tests/test_notify.py +++ b/tests/test_notify.py @@ -26,6 +26,7 @@ import os import unittest from collections import deque +from functools import partial import psycopg2 from psycopg2 import extensions @@ -129,69 +130,51 @@ def test_notifies_received_on_execute(self): self.assertEqual(pid, self.conn.notifies[0][0]) self.assertEqual('foo', self.conn.notifies[0][1]) - @slow - @skip_if_windows - def test_notifies_received_on_commit(self): + def _test_notifies_received_on_operation(self, operation, execute_query=True): self.listen('foo') self.conn.commit() - self.conn.cursor().execute('select 1;') + if execute_query: + self.conn.cursor().execute('select 1;') pid = int(self.notify('foo').communicate()[0]) self.assertEqual(0, len(self.conn.notifies)) - self.conn.commit() + operation() self.assertEqual(1, len(self.conn.notifies)) self.assertEqual(pid, self.conn.notifies[0][0]) self.assertEqual('foo', self.conn.notifies[0][1]) + @slow + @skip_if_windows + def test_notifies_received_on_commit(self): + self._test_notifies_received_on_operation(self.conn.commit) + @slow @skip_if_windows def test_notifies_received_on_rollback(self): - self.listen('foo') - self.conn.commit() - self.conn.cursor().execute('select 1;') - pid = int(self.notify('foo').communicate()[0]) - self.assertEqual(0, len(self.conn.notifies)) - self.conn.rollback() - self.assertEqual(1, len(self.conn.notifies)) - self.assertEqual(pid, self.conn.notifies[0][0]) - self.assertEqual('foo', self.conn.notifies[0][1]) + self._test_notifies_received_on_operation(self.conn.rollback) @slow @skip_if_windows def test_notifies_received_on_reset(self): - self.listen('foo') - self.conn.commit() - pid = int(self.notify('foo').communicate()[0]) - self.assertEqual(0, len(self.conn.notifies)) - self.conn.reset() - self.assertEqual(1, len(self.conn.notifies)) - self.assertEqual(pid, self.conn.notifies[0][0]) - self.assertEqual('foo', self.conn.notifies[0][1]) + self._test_notifies_received_on_operation(self.conn.reset, execute_query=False) @slow @skip_if_windows def test_notifies_received_on_set_session(self): - self.listen('foo') - self.conn.commit() - pid = int(self.notify('foo').communicate()[0]) - self.assertEqual(0, len(self.conn.notifies)) - self.conn.set_session(autocommit=True, readonly=True) - self.assertEqual(1, len(self.conn.notifies)) - self.assertEqual(pid, self.conn.notifies[0][0]) - self.assertEqual('foo', self.conn.notifies[0][1]) + self._test_notifies_received_on_operation( + partial(self.conn.set_session, autocommit=True, readonly=True), + execute_query=False, + ) @slow @skip_if_windows def test_notifies_received_on_set_client_encoding(self): - self.listen('foo') - self.conn.commit() - pid = int(self.notify('foo').communicate()[0]) - self.assertEqual(0, len(self.conn.notifies)) - self.conn.set_client_encoding( - 'LATIN1' if self.conn.encoding != 'LATIN1' else 'UTF8' + self._test_notifies_received_on_operation( + partial( + self.conn.set_client_encoding, + 'LATIN1' if self.conn.encoding != 'LATIN1' else 'UTF8' + ), + execute_query=False, ) - self.assertEqual(1, len(self.conn.notifies)) - self.assertEqual(pid, self.conn.notifies[0][0]) - self.assertEqual('foo', self.conn.notifies[0][1]) @slow def test_notify_object(self): From f64dd397fd606c3edc0aacc5c25a7da846e270b8 Mon Sep 17 00:00:00 2001 From: Daniele Varrazzo Date: Fri, 11 Oct 2024 00:29:28 +0200 Subject: [PATCH 4/4] docs: add news entry about notifications on commit --- NEWS | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/NEWS b/NEWS index 4940a91d7..78dc38dcb 100644 --- a/NEWS +++ b/NEWS @@ -1,13 +1,17 @@ -Current release +Future releases --------------- What's new in psycopg 2.9.10 (unreleased) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - Add support for Python 3.13. +- Receive notifications on commit (:ticket:`#1728`). - Drop support for Python 3.7. +Current release +--------------- + What's new in psycopg 2.9.9 ^^^^^^^^^^^^^^^^^^^^^^^^^^^