Skip to content

Commit

Permalink
add query pool
Browse files Browse the repository at this point in the history
  • Loading branch information
lonvia committed Jul 10, 2024
1 parent ac4f7d5 commit f2540f3
Showing 1 changed file with 72 additions and 0 deletions.
72 changes: 72 additions & 0 deletions src/nominatim_db/db/query_pool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
# SPDX-License-Identifier: GPL-3.0-or-later
#
# This file is part of Nominatim. (https://nominatim.org)
#
# Copyright (C) 2024 by the Nominatim developer community.
# For a full list of authors see the git log.
"""
A connection pool that executes incoming queries in parallel.
"""
from typing import Any, Tuple, Optional
import asyncio
import logging
import time

import psycopg

LOG = logging.getLogger()

QueueItem = Optional[Tuple[psycopg.abc.Query, Any]]

class QueryPool:

def __init__(self, dsn: str, pool_size: int = 1, **conn_args: Any) -> None:
self.wait_time = 0.0
self.query_queue: 'asyncio.Queue[QueueItem]' = asyncio.Queue()

self.pool = [asyncio.create_task(self._worker_loop(dsn, **conn_args))
for _ in range(pool_size)]


async def put_query(self, query: psycopg.abc.Query, params: Any) -> None:
tstart = time.time()
await self.query_queue.put((query, params))
self.wait_time += time.time() - tstart


async def finish(self) -> None:
for _ in self.pool:
await self.query_queue.put(None)

tstart = time.time()
await asyncio.wait(self.pool)
self.wait_time += time.time() - tstart


async def _worker_loop(self, dsn: str, **conn_args: Any) -> None:
aconn = await psycopg.AsyncConnection.connect(dsn, **conn_args)
async with aconn:
async with aconn.cursor() as cur:
item = await self.query_queue.get()
while item is not None:
try:
if item[1] is None:
await cur.execute(item[0])
else:
await cur.execute(item[0], item[1])

item = await self.query_queue.get()
except psycopg.errors.DeadlockDetected:
assert item is not None
LOG.info("Deadlock detected (sql = %s, params = %s), retry.",
str(item[0]), str(item[1]))
# item is still valid here, causing a retry


async def __aenter__(self) -> 'QueryPool':
return self


async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
await self.finish()

0 comments on commit f2540f3

Please sign in to comment.