Abort other fetches when resolution fails #111

Open
wants to merge 4 commits into
base: main
Changes from 1 commit
25 changes: 22 additions & 3 deletions micropip/_compat_in_pyodide.py
Member Author

I realized that the original implementation can only abort during the await pyfetch(...) stage, but we need to be able to abort await response.bytes() too, so I changed the approach.

Now I use a decorator to dependency-inject an AbortSignal into the decorated function, which passes it on to its pyfetch call. After decoration, the functions' signatures are the same as before. But this probably needs a re-review.
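
A minimal sketch of the dependency-injecting decorator described above, written so that it runs on a stock asyncio loop outside Pyodide. FakeAbortController, abort_on_cancel, seen_signals and the example URL are hypothetical stand-ins for illustration; the real code would use js.AbortController and pass the signal on to pyfetch.

```python
import asyncio
from functools import wraps


class FakeAbortController:
    """Hypothetical stand-in for js.AbortController so this runs outside Pyodide."""

    def __init__(self):
        self.signal = {"aborted": False}

    def abort(self):
        self.signal["aborted"] = True


def abort_on_cancel(func):
    """Call func with a fresh signal as first argument; abort it on cancellation."""

    @wraps(func)
    async def wrapper(*args, **kwargs):
        controller = FakeAbortController()
        try:
            # The whole body of func is covered, not only its first await.
            return await func(controller.signal, *args, **kwargs)
        except asyncio.CancelledError:
            controller.abort()
            raise

    return wrapper


seen_signals = []


@abort_on_cancel
async def fetch_bytes(signal, url):
    seen_signals.append(signal)
    await asyncio.sleep(10)  # stands in for pyfetch(...) followed by response.bytes()
    return b""


async def main():
    # Callers do not pass a signal; the decorator supplies it.
    task = asyncio.ensure_future(fetch_bytes("https://example.invalid/pkg.whl"))
    await asyncio.sleep(0)  # let the task start and reach its first await
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return seen_signals[-1]["aborted"]


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Because the try/except wraps the entire decorated coroutine, a cancellation that arrives while the body is still being read triggers the abort as well.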

Member Author

@CNSeniorious000 CNSeniorious000 Jun 15, 2024

Here I still changed fetch_bytes and fetch_string_and_headers, which you may find inelegant.

In fact, with a hackier approach, it would be possible to only decorate the function, without dependency injection, so that the function itself no longer needs to pass signal=signal to pyfetch. That would let this decorator be used elsewhere too. (Maybe after #112 there will be more resolution/downloading implementations, and they could simply decorate their fetching functions with this decorator to get the abort-when-cancelled behavior.)

In the _abort_on_cancel decorator, replace the input function's __locals__ with a ChainMap. In that context, insert a _signal into that namespace, and replace the pyfetch in that namespace with partial(pyfetch, signal=_signal). Then we can simplify the decorated functions:

@_abort_on_cancel
async def fetch_string_and_headers(
-   signal: AbortSignal, url: str, kwargs: dict[str, str]
+   url: str, kwargs: dict[str, str]
) -> tuple[str, dict[str, str]]:
-   response = await pyfetch(url, **kwargs, signal=signal)
+   response = await pyfetch(url, **kwargs)
    ...

Potential downsides:

  1. the code may become a little confusing
  2. if the user doesn't call it by the name pyfetch but by some other name (an alias, say), the call won't be patched to partial(pyfetch, signal=_signal)
  3. ChainMap and partial may add a small runtime overhead
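
A rough sketch of the namespace-patching idea, under loudly stated assumptions. It swaps in a different mechanism from the one proposed: function objects expose their module namespace as __globals__, and as far as I know types.FunctionType only accepts a plain dict there, so the sketch copies __globals__ on each call instead of installing a ChainMap. FakeAbortController and the pyfetch defined here are hypothetical stand-ins so the code runs outside Pyodide.

```python
import asyncio
import types
from functools import partial, wraps


class FakeAbortController:
    """Hypothetical stand-in for js.AbortController."""

    def __init__(self):
        self.signal = {"aborted": False}

    def abort(self):
        self.signal["aborted"] = True


async def pyfetch(url, signal=None):
    """Hypothetical stand-in for pyodide.http.pyfetch; returns the signal it got."""
    await asyncio.sleep(0)
    return signal


def abort_on_cancel(func):
    @wraps(func)
    async def wrapper(*args, **kwargs):
        controller = FakeAbortController()
        # Copy the function's globals and shadow the name "pyfetch" with a
        # version that already carries this call's signal.
        patched_globals = {
            **func.__globals__,
            "pyfetch": partial(func.__globals__["pyfetch"], signal=controller.signal),
        }
        patched = types.FunctionType(
            func.__code__,
            patched_globals,
            func.__name__,
            func.__defaults__,
            func.__closure__,
        )
        patched.__kwdefaults__ = func.__kwdefaults__
        try:
            return await patched(*args, **kwargs)
        except asyncio.CancelledError:
            controller.abort()
            raise

    return wrapper


@abort_on_cancel
async def fetch(url):
    # No signal parameter and no signal=... argument here.
    return await pyfetch(url)


fetch_alias = pyfetch


@abort_on_cancel
async def fetch_via_alias(url):
    # Downside 2: the alias is not patched, so no signal is passed.
    return await fetch_alias(url)
```

In this sketch fetch receives a signal without mentioning one, while fetch_via_alias silently gets none, which illustrates the second downside listed above.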

@@ -1,3 +1,4 @@
+from asyncio import CancelledError
 from pathlib import Path
 from urllib.parse import urlparse

@@ -7,7 +8,7 @@

 try:
     import pyodide_js
-    from js import Object
+    from js import AbortController, Object
     from pyodide_js import loadedPackages, loadPackage
     from pyodide_js._api import (  # type: ignore[import]
         loadBinaryFile,
@@ -21,6 +22,24 @@
         raise
     # Otherwise, this is pytest test collection so let it go.

+if IN_BROWSER:
+
+    async def _pyfetch(url: str, **kwargs):
+        if "signal" in kwargs:
+            return await pyfetch(url, **kwargs)
+
+        controller = AbortController.new()
+        kwargs["signal"] = controller.signal
+
+        try:
+            return await pyfetch(url, **kwargs)
+        except CancelledError:
+            controller.abort()
+            raise
Member

Would be great to implement this into pyfetch directly in a followup.

Member Author

So should I remove the changes in this file and open another PR for pyfetch in pyodide/pyodide?

Do you think this feature should provide a parameter that lets users opt out of this behavior? Maybe adding a kw-only parameter like abort_on_cancel=True?

Member

@hoodmane hoodmane Jun 4, 2024

No, let's add it here and also upstream. micropip wants to support some older versions of Pyodide so it can't use the upstreamed functionality right away.

Maybe adding a kw-only parameter like abort_on_cancel=True?

Nah it feels like we're switching from a less correct behavior to a more correct one with this. Let's not overcomplicate the API by forcing people to opt into what they expect.


+else:
+    _pyfetch = pyfetch


 async def fetch_bytes(url: str, kwargs: dict[str, str]) -> bytes:
     parsed_url = urlparse(url)
@@ -29,13 +48,13 @@ async def fetch_bytes(url: str, kwargs: dict[str, str]) -> bytes:
     if parsed_url.scheme == "file":
         return (await loadBinaryFile(parsed_url.path)).to_bytes()

-    return await (await pyfetch(url, **kwargs)).bytes()
+    return await (await _pyfetch(url, **kwargs)).bytes()


 async def fetch_string_and_headers(
     url: str, kwargs: dict[str, str]
 ) -> tuple[str, dict[str, str]]:
-    response = await pyfetch(url, **kwargs)
+    response = await _pyfetch(url, **kwargs)

     content = await response.string()
     # TODO: replace with response.headers when pyodide>= 0.24 is released
12 changes: 11 additions & 1 deletion micropip/transaction.py
@@ -52,7 +52,17 @@ async def gather_requirements(
         for requirement in requirements:
             requirement_promises.append(self.add_requirement(requirement))

-        await asyncio.gather(*requirement_promises)
+        futures: list[asyncio.Future] = []
+        try:
+            for coro in requirement_promises:
+                futures.append(asyncio.ensure_future(coro))
+            await asyncio.gather(*futures)
+        except ValueError:
+            if not self.keep_going:
+                for future in futures:
+                    if not future.done():
+                        future.cancel()
Member

I haven't investigated how cancellation interacts with our event loop and I'm slightly worried that the two might not interact well. But if this passes the tests then we should merge.
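
For reference, the cancel-the-siblings pattern from the hunk above can be exercised on a stock asyncio event loop. This is only a sketch under that assumption: it says nothing about how Pyodide's own event loop behaves, and worker, gather_or_cancel and the log list are hypothetical names used for illustration.

```python
import asyncio


async def worker(name, delay, fail, log):
    try:
        await asyncio.sleep(delay)
    except asyncio.CancelledError:
        log.append(f"{name} cancelled")
        raise
    if fail:
        raise ValueError(name)
    log.append(f"{name} finished")


async def gather_or_cancel(coros, keep_going=False):
    futures = [asyncio.ensure_future(coro) for coro in coros]
    try:
        await asyncio.gather(*futures)
    except ValueError:
        # gather() propagates the first failure but leaves the other tasks
        # running, so they have to be cancelled explicitly.
        if not keep_going:
            for future in futures:
                if not future.done():
                    future.cancel()
        raise


async def main():
    log = []
    try:
        await gather_or_cancel(
            [worker("bad", 0.01, True, log), worker("slow", 5, False, log)]
        )
    except ValueError:
        pass
    await asyncio.sleep(0.05)  # give the cancelled task time to unwind
    return log


if __name__ == "__main__":
    print(asyncio.run(main()))
```

On the default loop, the slow task sees a CancelledError shortly after the failing one raises, instead of running for its full five seconds.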

Member Author

Wait a second, I just found that this part seems to raise several asyncio.exceptions.InvalidStateError: invalid state exceptions.

Member

@Zac-HD any tips on how to make my event loop support cancellation? Where to look, who to ask, etc.?

Member

Anyways I'll open a separate issue about this.

Member

Could you try using a TaskGroup? Maybe that will fix our problems.
https://docs.python.org/3/library/asyncio-task.html#asyncio.TaskGroup

Member Author

@CNSeniorious000 CNSeniorious000 Jun 4, 2024

Done. I've refactored this section to use TaskGroup; now all the futures should be gathered in one top-level TaskGroup.

Note that this makes the traceback look a bit different, because TaskGroup groups all the exceptions into an ExceptionGroup. This introduces a small inconsistency: if keep_going=True is specified, only a plain ValueError is raised.

And the asyncio.exceptions.InvalidStateError: invalid state errors are still there.

Member Author

@CNSeniorious000 CNSeniorious000 Jun 4, 2024

The test fails because the raised error is not a ValueError but an ExceptionGroup containing a ValueError.

It's possible to recover the original ValueError by wrapping the async with block in a try-except:

...
    try:
        async with asyncio.TaskGroup() as tg:
            self.tg = tg  # only one task group from the top level
            for requirement in requirements:
                tg.create_task(self.add_requirement(requirement))
    except ExceptionGroup as e:
        raise e.exceptions[-1] from None
...

But I am not sure whether this is the desired behavior.

Maybe it would be better to define a new error such as ResolutionFailedError(ValueError) that wraps both types of error.

+            raise

     async def add_requirement(self, req: str | Requirement) -> None:
         if isinstance(req, Requirement):