Skip to content

Commit

Permalink
Fix coroutine wrapper to await after exception (#7785)
Browse files Browse the repository at this point in the history
This PR fixes #7117 and similar issues. Short explanation - our
coroutine wrapper does not properly handle the exception, which breaks
coroutine handling. As you can see, any task expects results from
`throw` -
https://github.com/python/cpython/blob/main/Lib/asyncio/tasks.py#L303
but it seems like in aiohttp it was acidently removed by this commit
stalkerg@f04ecc2#diff-f334e752b4894ef951105572ab8b195aeb8db90eb6e48b1dfbd9a01da4c854f5L842

This is repro a case without aiohttp:
```python
import ssl
import collections


class TestCoroWrapper(collections.abc.Coroutine):
    __slots__ = ("_coro", "_resp")
    def __init__(self, coro):
        self._coro = coro

    def __getattr__(self, attr):
        return getattr(self._coro, attr)

    def send(self, arg):
        return self._coro.send(arg)

    def throw(self, arg):
        self._coro.throw(arg)

    def close(self):
        return self._coro.close()

    def __await__(self):
        ret = self._coro.__await__()
        return ret

async def ssl_call(context):
    loop = asyncio.get_event_loop()
    return await loop.create_connection(
        lambda: asyncio.Protocol(),
        '2404:6800:4004:824::2004',
        443,
        ssl=context,
        family=socket.AddressFamily.AF_INET6,
        proto=6,
        flags=socket.AddressInfo.AI_NUMERICHOST | socket.AddressInfo.AI_NUMERICSERV,
        server_hostname='www.google.com',
        local_addr=None
    )

async def prepare_call():
    context = ssl.create_default_context()
    try:
        connection = await ssl_call(context)
    except ssl.SSLError as e:
        print(f"Got exception1: {e}")
        raise e

    return connection

async def my_task():
    try:
        await prepare_call()
    except Exception as e:
        print(f"Got exception2: {e}")

    await asyncio.sleep(1)
    raise Exception("test")

async def main():
    my_coro = TestCoroWrapper(my_task())
    print(f"is coro? {asyncio.iscoroutine(my_coro)}")
    task = asyncio.create_task(my_coro)
    await task

asyncio.run(main())
```

The `TestCoroWrapper` here is equivalent of
`_BaseRequestContextManager`. If you run such code like:
`SSL_CERT_FILE=/dev/null SSL_CERT_DIR=/dev/null python test.py` you will
get an error: `await wasn't used with future`.
The main idea here is that you are trying to await the sleep function
after getting and catching an exception from the native (SSL) module.

Now I should explain why repro with aiohttp for some users return the
same error:
```python
import asyncio
import aiohttp

async def main():
    async with aiohttp.ClientSession() as session:
        try:
            response = await asyncio.ensure_future(session.get('https://www.google.com/'))
            print(await response.text())
        finally:
            response.release()

asyncio.run(main())
```

here it's happened because in `TCPConnector._create_direct_connection`
we are getting all IPs for the given host and trying to connect one by
one. If the first connection gets an error we will catch this error and
try again for the next IP. If you have IPv6 you will have at least 2 IPs
here (ipv6 and ipv4), and after the first error, you will try to connect
to a second IP and get the same error.

Why it's problem only for `asyncio.ensure_future`? Because
`asyncio.ensure_future` creates a `task` such a task starts processing
coroutines and directly communicates with our coroutine wrapper witch
not return a result for `throw`.

I will write changelog and etc after people validate this PR. But
honestly, I don't think I can write a unit test for a such case.

Regards,

## Checklist

- [ ] I think the code is well written
- [ ] Unit tests for the changes exist
- [ ] Documentation reflects the changes
- [ ] If you provide code modification, please add yourself to
`CONTRIBUTORS.txt`
  * The format is <Name> <Surname>.
  * Please keep alphabetical order, the file is sorted by names.
- [ ] Add a new news fragment into the `CHANGES` folder
  * name it `<issue_id>.<type>` for example (588.bugfix)
* if you don't have an `issue_id` change it to the pr id after creating
the pr
  * ensure type is one of the following:
    * `.feature`: Signifying a new feature.
    * `.bugfix`: Signifying a bug fix.
    * `.doc`: Signifying a documentation improvement.
    * `.removal`: Signifying a deprecation or removal of public API.
* `.misc`: A ticket has been closed, but it is not of interest to users.
* Make sure to use full sentences with correct case and punctuation, for
example: "Fix issue with non-ascii contents in doctest text files."

---------

Co-authored-by: Sam Bull <[email protected]>
Co-authored-by: Sam Bull <[email protected]>
  • Loading branch information
3 people authored Nov 3, 2023
1 parent 96de6eb commit a57dc31
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGES/7785.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixed a rare `RuntimeError: await wasn't used with future` exception -- by :user:`stalkerg`
4 changes: 2 additions & 2 deletions aiohttp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1108,8 +1108,8 @@ def __init__(self, coro: Coroutine["asyncio.Future[Any]", None, _RetType]) -> No
def send(self, arg: None) -> "asyncio.Future[Any]":
return self._coro.send(arg)

def throw(self, arg: BaseException) -> None: # type: ignore[override]
self._coro.throw(arg) # type: ignore[unused-awaitable]
def throw(self, *args: Any, **kwargs: Any) -> "asyncio.Future[Any]":
return self._coro.throw(*args, **kwargs)

def close(self) -> None:
return self._coro.close()
Expand Down

0 comments on commit a57dc31

Please sign in to comment.