Skip to content

Commit

Permalink
fixup! Backport changes
Browse files Browse the repository at this point in the history
  • Loading branch information
aleksandr-shtaub committed Nov 28, 2023
1 parent 005c7f2 commit 8ebdfc0
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 48 deletions.
6 changes: 6 additions & 0 deletions procrastinate/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,12 @@ def __init__(self, *args, constraint_name: str):
self.constraint_name = constraint_name


class NoResult(ConnectorException):
"""
No result was returned by the database query.
"""


class MissingApp(ProcrastinateException):
"""
Missing app. This most probably happened because procrastinate needs an
Expand Down
119 changes: 74 additions & 45 deletions procrastinate/psycopg3_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,16 @@
import functools
import logging
import re
from typing import Any, Callable, Coroutine, Dict, Iterable, List, Optional
from typing import (
Any,
Callable,
Coroutine,
Dict,
Iterable,
List,
LiteralString,
Optional,
)

import psycopg
import psycopg.errors
Expand Down Expand Up @@ -89,31 +98,35 @@ def __init__(
**kwargs: Any,
):
"""
Asynchronous connector based on a ``psycopg_pool.AsyncConnectionPool``.
Create a PostgreSQL connector using psycopg. The connector uses an
``psycopg_pool.AsyncConnectionPool``, which is created internally, or
set into the connector by calling `App.open_async`.
The pool connection parameters can be provided here. Alternatively, an already
existing ``psycopg_pool.AsyncConnectionPool`` can be provided in the
``App.open_async``, via the ``pool`` parameter.
Note that if you want to use a ``psycopg_pool.AsyncNullConnectionPool``,
you will need to initialize it yourself and pass it to the connector
through the ``App.open_async`` method.
All other arguments than ``json_dumps`` and ``json_loads`` are passed to
:py:func:`AsyncConnectionPool` (see psycopg3 documentation__), with default
values that may differ from those of ``psycopg3`` (see a partial list of
parameters below).
All other arguments than ``json_dumps`` and ``json_loads`` are passed
to ``psycopg_pool.AsyncConnectionPool`` (see psycopg documentation__).
``json_dumps`` and ``json_loads`` are used to configure new connections
created by the pool with ``psycopg.types.json.set_json_dumps`` and
``psycopg.types.json.set_json_loads``.
.. _psycopg3 doc: https://www.psycopg.org/psycopg3/docs/basic/adapt.html#json-adaptation
.. __: https://www.psycopg.org/psycopg3/docs/api/pool.html
#psycopg_pool.AsyncConnectionPool
Parameters
----------
json_dumps :
The JSON dumps function to use for serializing job arguments. Defaults to
the function used by psycopg3. See the `psycopg3 doc`_.
A function to serialize JSON objects to a string. If not provided,
JSON objects will be serialized using psycopg's default JSON
serializer.
json_loads :
The JSON loads function to use for deserializing job arguments. Defaults
to the function used by psycopg3. See the `psycopg3 doc`_. Unused if the
pool is externally created and set into the connector through the
``App.open_async`` method.
A function to deserialize JSON objects from a string. If not
provided, JSON objects will be deserialized using psycopg's default
JSON deserializer.
min_size : int
Passed to psycopg3, default set to 1 (same as aiopg).
max_size : int
Expand All @@ -123,15 +136,17 @@ def __init__(
argument is passed, it will connect to localhost:5432 instead of a
Unix-domain local socket file.
"""
self.json_dumps = json_dumps
self.json_loads = json_loads
self._pool: Optional[psycopg_pool.AsyncConnectionPool] = None
self._pool_args = self._adapt_pool_args(kwargs, json_loads)
self.json_dumps = json_dumps
self._pool_externally_set = False
self._pool_args = self._adapt_pool_args(kwargs, json_loads, json_dumps)
self.json_loads = json_loads

@staticmethod
def _adapt_pool_args(
pool_args: Dict[str, Any], json_loads: Optional[Callable]
pool_args: Dict[str, Any],
json_loads: Optional[Callable],
json_dumps: Optional[Callable],
) -> Dict[str, Any]:
"""
Adapt the pool args for ``psycopg3``, using sensible defaults for Procrastinate.
Expand All @@ -142,9 +157,13 @@ def _adapt_pool_args(
async def configure(connection: psycopg.AsyncConnection[DictRow]):
if base_configure:
await base_configure(connection)

if json_loads:
psycopg.types.json.set_json_loads(json_loads, connection)

if json_dumps:
psycopg.types.json.set_json_dumps(json_dumps, connection)

return {
"conninfo": "",
"min_size": 1,
Expand All @@ -153,10 +172,17 @@ async def configure(connection: psycopg.AsyncConnection[DictRow]):
"row_factory": dict_row,
},
"configure": configure,
"open": False,
**pool_args,
}

@property
def pool(
self,
) -> psycopg_pool.AsyncConnectionPool[psycopg.AsyncConnection[DictRow]]:
if self._pool is None: # Set by open_async
raise exceptions.AppNotOpen
return self._pool

async def open_async(
self, pool: Optional[psycopg_pool.AsyncConnectionPool] = None
) -> None:
Expand All @@ -176,15 +202,23 @@ async def open_async(
else:
self._pool = await self._create_pool(self._pool_args)

# ensure pool is open
await self._pool.open() # type: ignore
await self._pool.open(wait=True) # type: ignore

@staticmethod
@wrap_exceptions
async def _create_pool(
pool_args: Dict[str, Any]
) -> psycopg_pool.AsyncConnectionPool:
return psycopg_pool.AsyncConnectionPool(**pool_args)
return psycopg_pool.AsyncConnectionPool(
**pool_args,
# Not specifying open=False raises a warning and will be deprecated.
# It makes sense, as we can't really make async I/Os in a constructor.
open=False,
# Enables a check that will ensure the connections returned when
# using the pool are still alive. If they have been closed by the
# database, they will be seamlessly replaced by a new connection.
check=psycopg_pool.AsyncConnectionPool.check_connection,
)

@wrap_exceptions
async def close_async(self) -> None:
Expand All @@ -197,43 +231,37 @@ async def close_async(self) -> None:
await self._pool.close()
self._pool = None

@property
def pool(
self,
) -> psycopg_pool.AsyncConnectionPool[psycopg.AsyncConnection[DictRow]]:
if self._pool is None: # Set by open
raise exceptions.AppNotOpen
return self._pool

def _wrap_json(self, arguments: Dict[str, Any]):
return {
key: psycopg.types.json.Jsonb(value, dumps=self.json_dumps)
if isinstance(value, dict)
else value
key: psycopg.types.json.Jsonb(value) if isinstance(value, dict) else value
for key, value in arguments.items()
}

@wrap_exceptions
@wrap_query_exceptions
async def execute_query_async(self, query: str, **arguments: Any) -> None:
async def execute_query_async(self, query: LiteralString, **arguments: Any) -> None:
async with self.pool.connection() as connection:
async with connection.cursor() as cursor:
await cursor.execute(query, self._wrap_json(arguments))
await connection.execute(query, self._wrap_json(arguments))

@wrap_exceptions
@wrap_query_exceptions
async def execute_query_one_async(
self, query: str, **arguments: Any
) -> Optional[DictRow]:
self, query: LiteralString, **arguments: Any
) -> DictRow:
async with self.pool.connection() as connection:
async with connection.cursor() as cursor:
await cursor.execute(query, self._wrap_json(arguments))
return await cursor.fetchone()

result = await cursor.fetchone()

if result is None:
raise exceptions.NoResult
return result

@wrap_exceptions
@wrap_query_exceptions
async def execute_query_all_async(
self, query: str, **arguments: Any
self, query: LiteralString, **arguments: Any
) -> List[DictRow]:
async with self.pool.connection() as connection:
async with connection.cursor() as cursor:
Expand Down Expand Up @@ -266,9 +294,8 @@ async def listen_notify(
channel_name=psycopg.sql.Identifier(channel_name)
)
await connection.execute(query)

# Initial set() lets caller know that we're ready to listen
event.set()

await self._loop_notify(event=event, connection=connection)

@wrap_exceptions
Expand All @@ -277,6 +304,8 @@ async def _loop_notify(
event: asyncio.Event,
connection: psycopg.AsyncConnection,
) -> None:
# We'll leave this loop with a CancelledError, when we get cancelled

while True:
if connection.closed:
return
Expand All @@ -285,7 +314,7 @@ async def _loop_notify(
async for _ in notifies:
event.set()
except psycopg.OperationalError:
continue
break

def __del__(self):
pass
2 changes: 1 addition & 1 deletion tests/integration/test_psycopg3_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ async def configure(connection):
called.append(connection)

args = psycopg3_connector.Psycopg3Connector._adapt_pool_args(
pool_args={"configure": configure}, json_loads=None
pool_args={"configure": configure}, json_loads=None, json_dumps=None
)

assert args["configure"] is not configure
Expand Down
3 changes: 1 addition & 2 deletions tests/unit/test_psycopg3_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ async def configure(connection):
called.append(connection)

args = psycopg3_connector.Psycopg3Connector._adapt_pool_args(
pool_args={"configure": configure}, json_loads=None
pool_args={"configure": configure}, json_loads=None, json_dumps=None
)

assert args["configure"] is not configure
Expand Down Expand Up @@ -158,7 +158,6 @@ async def test_open_async_pool_argument_specified(mocker, connector):

assert connector._pool_externally_set is True
assert connector._create_pool.call_count == 0
assert connector._pool.open.await_count == 1
assert connector._pool == pool


Expand Down

0 comments on commit 8ebdfc0

Please sign in to comment.