Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: pathsend handling in middlewares #2616

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion starlette/middleware/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
DispatchFunction = typing.Callable[
[Request, RequestResponseEndpoint], typing.Awaitable[Response]
]
BodyStreamGenerator = typing.AsyncGenerator[
typing.Union[bytes, typing.MutableMapping[str, typing.Any]], None
]
T = typing.TypeVar("T")


Expand Down Expand Up @@ -167,9 +170,13 @@ async def coro() -> None:

assert message["type"] == "http.response.start"

async def body_stream() -> typing.AsyncGenerator[bytes, None]:
async def body_stream() -> BodyStreamGenerator:
async with recv_stream:
async for message in recv_stream:
if message["type"] == "http.response.pathsend":
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi there! wouldn't it make sense to just read the file and stream it through here? Would be better for compatibility, no? Or maybe have some flag like "supports_message_pass_through"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reading the file from Python would be just the standard ASGI behaviour and defeat the whole pathsend idea (see for ref https://asgi.readthedocs.io/en/latest/extensions.html#path-send).

About the flag: I'm not sure I get the point around it. How should we evaluate such flag? pathsend messages will be returned by Starlette only if the relevant extension is present in the ASGI scope as the server supports it.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking about random middlewares out there, deriving from this BaseMiddleware that do not know pathsend. To keep those running, returning the message instead of bytes could be opt-in via a flag/class property

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@aersam but even if you subclass BaseMiddleware, there's no way you can interact with that code. If the middleware completely overrides __call__ then it should be compliant with ASGI messages IMHO.

# with pathsend we don't need to stream anything
yield message
break
assert message["type"] == "http.response.body"
body = message.get("body", b"")
if body:
Expand Down
5 changes: 4 additions & 1 deletion starlette/middleware/gzip.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ async def send_with_gzip(self, message: Message) -> None:

await self.send(self.initial_message)
await self.send(message)

elif message_type == "http.response.body":
# Remaining body in streaming GZip response.
body = message.get("body", b"")
Expand All @@ -107,6 +106,10 @@ async def send_with_gzip(self, message: Message) -> None:
self.gzip_buffer.truncate()

await self.send(message)
elif message_type == "http.response.pathsend":
# Don't apply GZip to pathsend responses
await self.send(self.initial_message)
await self.send(message)


async def unattached_send(message: Message) -> typing.NoReturn:
Expand Down
13 changes: 10 additions & 3 deletions starlette/responses.py
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are the changes here just because the StreamingResponse is used in the BaseHTTPMiddleware?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. A Better implementation would probably be to refactor BaseHTTPMiddleware, but it is something I am not confident to do by myself given my knowledge of the codebase is still limited.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are not making changes to the rest of the code source to make BaseHTTPMiddleware happy.

My recommendation for Granian users, and/or any server supporting http.response.pathsend is to replace BaseHTTPMiddleware for pure ASGI middleware. I don't want us to add more complexity to that code. That piece of code had too many issues over time.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand your position.

I think there are 2 major themes here though:

  • StreamingResponse doesn't support any message type other than http.response.body, which make sense given the intent of streaming responses, so unless an additional body message type is added to ASGI this is future-proof. But, since BaseHTTPMiddleware depends on that, this also means any application using that middleware or a derivate middleware won't support any other ASGI message in responses, which is something you might want to change to be future-proof on ASGI spec changes.
  • As of today Starlette states (due to the changes I made in Add support for ASGI pathsend extension #2435) to support http.response.pathsend, which is only partially true: as soon as the route involves a BaseHTTPMiddleware, GZIPMiddleware or a derivate of those in the middleware chain, then Starlette doesn't actually support pathsend.

The rationale behind this PR is not to address the first point, but rather the 2nd; as today experience with pathsend supportive servers with any Starlette/FastAPI application is practically dependant on which middlewares are implemented on the route returning a file response.

So given all of the above, if we can't find a proper way to address the pathsend issue, I'd rather suggest to revert #2435 and remove pathsend support completely until we find a proper way to support that ASGI extension in all the usages of Starlette and its dependants. This would sacrifice the performance gain of pathsend but at least we'd have the same DX across all the Starlette apps, regardless of the middlewares used, and thus prevent the whole bunch of issues related to this opened in Granian repo.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, we can revert that one. It will be easier.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 I'll open a different PR later today and close this one

Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ def __init__(
self.headers["location"] = quote(str(url), safe=":/%#?=@[]!$&'()*+,;")


Content = typing.Union[str, bytes, memoryview]
Content = typing.Union[str, bytes, memoryview, typing.MutableMapping[str, typing.Any]]
SyncContentStream = typing.Iterable[Content]
AsyncContentStream = typing.AsyncIterable[Content]
ContentStream = typing.Union[AsyncContentStream, SyncContentStream]
Expand Down Expand Up @@ -247,12 +247,19 @@ async def stream_response(self, send: Send) -> None:
"headers": self.raw_headers,
}
)
should_close_body = True
async for chunk in self.body_iterator:
if not isinstance(chunk, (bytes, memoryview)):
if isinstance(chunk, dict):
# We got an ASGI message which is not response body (eg: pathsend)
should_close_body = False
await send(chunk)
break
if isinstance(chunk, str):
chunk = chunk.encode(self.charset)
await send({"type": "http.response.body", "body": chunk, "more_body": True})

await send({"type": "http.response.body", "body": b"", "more_body": False})
if should_close_body:
await send({"type": "http.response.body", "body": b"", "more_body": False})

async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
async with anyio.create_task_group() as task_group:
Expand Down
Loading