From bce152cf6a75cd2dc0084ff82ad0f536185e1eb2 Mon Sep 17 00:00:00 2001 From: Madhu Kanoor Date: Thu, 29 Aug 2024 15:03:33 -0400 Subject: [PATCH] fix: raise OperationalError and Encoding Errors (#293) Log and raise the OperationalError and EncodingErrors --- .../eda/plugins/event_source/pg_listener.py | 4 +- tests/unit/event_source/test_pg_listener.py | 64 +++++++++++++++++++ 2 files changed, 67 insertions(+), 1 deletion(-) diff --git a/extensions/eda/plugins/event_source/pg_listener.py b/extensions/eda/plugins/event_source/pg_listener.py index dab6931b..6cb2bb81 100644 --- a/extensions/eda/plugins/event_source/pg_listener.py +++ b/extensions/eda/plugins/event_source/pg_listener.py @@ -112,9 +112,11 @@ async def main(queue: asyncio.Queue[Any], args: dict[str, Any]) -> None: else: await queue.put(data) except json.decoder.JSONDecodeError: - LOGGER.exception("Error decoding data, ignoring it") + LOGGER.exception("Error decoding data") + raise except OperationalError: LOGGER.exception("PG Listen operational error") + raise async def _handle_chunked_message( diff --git a/tests/unit/event_source/test_pg_listener.py b/tests/unit/event_source/test_pg_listener.py index 0a2bbf67..0f2ac561 100644 --- a/tests/unit/event_source/test_pg_listener.py +++ b/tests/unit/event_source/test_pg_listener.py @@ -6,6 +6,7 @@ from typing import Any from unittest.mock import AsyncMock, MagicMock, patch +import psycopg import pytest import xxhash @@ -116,3 +117,66 @@ def my_iterator() -> _AsyncIterator: for event in events: assert myqueue.queue[index] == event index += 1 + + +def test_decoding_error() -> None: + """Test json parsing error""" + notify_payload: list[str] = ['{"a"; "b"}'] + myqueue = _MockQueue() + + def my_iterator() -> _AsyncIterator: + return _AsyncIterator(notify_payload) + + with patch( + "extensions.eda.plugins.event_source.pg_listener.AsyncConnection.connect" + ) as conn: + mock_object = AsyncMock() + conn.return_value = mock_object + conn.return_value.__aenter__.return_value = mock_object + mock_object.cursor = AsyncMock + mock_object.notifies = my_iterator + + with pytest.raises(json.decoder.JSONDecodeError): + asyncio.run( + pg_listener_main( + myqueue, + { + "dsn": ( + "host=localhost dbname=mydb " + "user=postgres password=password" + ), + "channels": ["test"], + }, + ) + ) + + +def test_operational_error() -> None: + """Test json parsing error""" + notify_payload: list[str] = ['{"a": "b"}'] + myqueue = _MockQueue() + + def my_iterator() -> _AsyncIterator: + return _AsyncIterator(notify_payload) + + with patch( + "extensions.eda.plugins.event_source.pg_listener.AsyncConnection.connect" + ) as conn: + mock_object = AsyncMock() + conn.return_value = mock_object + conn.return_value.__aenter__.side_effect = psycopg.OperationalError("Kaboom") + mock_object.cursor = AsyncMock + mock_object.notifies = my_iterator + with pytest.raises(psycopg.OperationalError): + asyncio.run( + pg_listener_main( + myqueue, + { + "dsn": ( + "host=localhost dbname=mydb " + "user=postgres password=password" + ), + "channels": ["test"], + }, + ) + )