Skip to content

Commit

Permalink
Elegant pool drain fix (#456)
Browse files Browse the repository at this point in the history
* connectors.py: edge-cases where Connection not put back into queue

* pools.py: convert CyclicQueuePool release() to blocking/non-async

It only does one call and that call is non-blocking

* __init__.py: always set connection to response and remake __del__

create_task() has no guarantee of executing without being bound
to a strong reference so task will be collected by the gc

* connectors.py: close the connection upon TimeoutException

We should ensure Connection is closed here before putting it back
into queue. Failure to do so will raise RuntimeError: readuntil()
called while another coroutine is waiting for data

* __init.py__: fix small indentation mistake on HttpResponse __del__

* pools.py: missing await on pool get(). Change to get_nowait()

It's missing the await keyword but if we actually await there and
there's double cleanup, method will block forever. Change to
get_nowait() so it properly raises QueueEmpty if double
cleanup is attempted.
  • Loading branch information
geraldog authored Feb 6, 2024
1 parent 992fe6b commit ef02c74
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 8 deletions.
11 changes: 6 additions & 5 deletions aiosonic/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,11 +225,11 @@ async def read_chunks(self) -> AsyncIterator[bytes]:

def __del__(self):
# clean it
if self.chunked and not self.chunks_readed:
loop = None
if self.connection:
loop = get_loop()
loop.create_task(self.connection.release())
if self.connection and self.connection.blocked:
if self.connection.writer:
self.connection.writer._transport.abort()
self.connection.blocked = False
self.connection.connector.pool.release(self.connection)

def _set_request_meta(self, urlparsed: ParseResult):
self.request_meta = {"from_path": urlparsed.path or "/"}
Expand Down Expand Up @@ -477,6 +477,7 @@ async def _do_request(
response._set_connection(connection)
else:
connection.keep = False
response._set_connection(connection)

return response

Expand Down
7 changes: 6 additions & 1 deletion aiosonic/connectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,15 +88,20 @@ async def acquire(
raise ConnectionPoolAcquireTimeout()

async def after_acquire(self, urlparsed, conn, verify, ssl, timeouts, http2):
dns_info = await self.__resolve_dns(urlparsed.hostname, urlparsed.port)

try:
dns_info = await self.__resolve_dns(urlparsed.hostname, urlparsed.port)
await wait_for(
conn.connect(urlparsed, dns_info, verify, ssl, http2),
timeout=timeouts.sock_connect,
)
except TimeoutException:
conn.close()
await self.release(conn)
raise ConnectTimeout()
except BaseException as ex:
await self.release(conn)
raise ex
return conn

async def release(self, conn):
Expand Down
4 changes: 2 additions & 2 deletions aiosonic/pools.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ async def acquire(self, _urlparsed: ParseResult = None):
"""Acquire connection."""
return await self.pool.get()

async def release(self, conn):
def release(self, conn):
"""Release connection."""
return self.pool.put_nowait(conn)

Expand All @@ -32,7 +32,7 @@ def free_conns(self) -> int:
async def cleanup(self):
"""Get all conn and close them, this method let this pool unusable."""
for _ in range(self.pool_size):
conn = self.pool.get()
conn = self.pool.get_nowait()
conn.close()


Expand Down

0 comments on commit ef02c74

Please sign in to comment.