Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow background tasks to run with custom BaseHTTPMiddleware's #1441

Closed
wants to merge 9 commits into from
17 changes: 12 additions & 5 deletions starlette/middleware/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@ async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
await self.app(scope, receive, send)
return

call_next_response = None
send_stream, recv_stream = anyio.create_memory_object_stream()

async def call_next(request: Request) -> Response:
app_exc: typing.Optional[Exception] = None
send_stream, recv_stream = anyio.create_memory_object_stream()

async def coro() -> None:
nonlocal app_exc
Expand Down Expand Up @@ -61,17 +63,22 @@ async def body_stream() -> typing.AsyncGenerator[bytes, None]:
if app_exc is not None:
raise app_exc

response = StreamingResponse(
nonlocal call_next_response

call_next_response = StreamingResponse(
status_code=message["status"], content=body_stream()
)
response.raw_headers = message["headers"]
return response
call_next_response.raw_headers = message["headers"]
return call_next_response

async with anyio.create_task_group() as task_group:
request = Request(scope, receive=receive)
response = await self.dispatch_func(request, call_next)
if call_next_response and response is not call_next_response:
async with recv_stream:
async for _ in recv_stream:
... # pragma: no cover
Comment on lines +78 to +80
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can create a function that performs this logic, if wanted.

await response(scope, receive, send)
Comment on lines +77 to 81
Copy link
Member

@jhominal jhominal Jan 30, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems to me that we will wait for app to make all of its calls to send_stream.send before actually processing the response returned by dispatch_func. If that is the case, and given that all the messages from app are being discarded, I think it would be preferable, either to call await response(scope, receive, send) before draining recv_stream, or to put draining of recv_stream in another task of task_group.

Of course, I do not know the relevant technologies (ASGI/the server implementations/Starlette) as well as you do, so there could very well be a reason that either my analysis is wrong, or that my suggestions are unworkable.

Suggested change
if call_next_response and response is not call_next_response:
async with recv_stream:
async for _ in recv_stream:
... # pragma: no cover
await response(scope, receive, send)
if call_next_response and response is not call_next_response:
async def drain_stream():
async with recv_stream:
async for _ in recv_stream:
... # pragma: no cover
task_group.start_soon(drain_stream)
await response(scope, receive, send)

task_group.cancel_scope.cancel()

async def dispatch(
self, request: Request, call_next: RequestResponseEndpoint
Expand Down