Skip to content

Commit

Permalink
fix: cancel the timeout check at the first send
Browse files Browse the repository at this point in the history
Signed-off-by: Frost Ming <[email protected]>
  • Loading branch information
frostming committed May 30, 2024
1 parent 3f488e1 commit 8635165
Showing 1 changed file with 27 additions and 6 deletions.
33 changes: 27 additions & 6 deletions src/bentoml/_internal/server/http/traffic.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import asyncio
from typing import TYPE_CHECKING
from typing import Any

from starlette.responses import JSONResponse

Expand All @@ -14,19 +15,39 @@ def __init__(self, app: ext.ASGIApp, timeout: float) -> None:
self.app = app
self.timeout = timeout

def _set_timer_out(self, waiter: asyncio.Future[Any]) -> None:
if not waiter.done():
waiter.set_exception(asyncio.TimeoutError)

async def __call__(
self, scope: ext.ASGIScope, receive: ext.ASGIReceive, send: ext.ASGISend
) -> None:
if scope["type"] not in ("http", "websocket"):
return await self.app(scope, receive, send)
loop = asyncio.get_running_loop()
waiter = loop.create_future()
loop.call_later(self.timeout, self._set_timer_out, waiter)

async def _send(message: ext.ASGIMessage) -> None:
if not waiter.done():
waiter.set_result(None)
await send(message)

fut = asyncio.ensure_future(self.app(scope, receive, _send), loop=loop)

try:
await asyncio.wait_for(self.app(scope, receive, send), timeout=self.timeout)
await waiter
except asyncio.TimeoutError:
resp = JSONResponse(
{"error": f"Not able to process the request in {self.timeout} seconds"},
status_code=504,
)
await resp(scope, receive, send)
if fut.cancel():
resp = JSONResponse(
{
"error": f"Not able to process the request in {self.timeout} seconds"
},
status_code=504,
)
await resp(scope, receive, send)
else:
await fut # wait for the future to finish


class MaxConcurrencyMiddleware:
Expand Down

0 comments on commit 8635165

Please sign in to comment.