
Avoid serializing cache for file objects #1753

Merged: 11 commits into fsspec:master, Nov 21, 2024

Conversation

@jrbourbeau (Contributor)

Includes and builds on #1751 from @hendrikmakait

Closes #1747
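
For context, the general shape of the fix (a sketch only, not the PR's actual diff): teach the file object to pickle itself as a recipe for reopening, so the byte cache never enters the pickle stream. reopen_file and PicklesWithoutCache below are hypothetical illustration names, not fsspec APIs.

def reopen_file(fs, path, mode, blocksize, loc):
    # Hypothetical module-level helper so pickle can find it by name:
    # rebuild the handle from scratch and restore the read position.
    f = fs.open(path, mode=mode, block_size=blocksize)
    f.seek(loc)
    return f

class PicklesWithoutCache:
    # Illustration of the technique: __reduce__ returns a (callable, args)
    # recipe, and the cache object is deliberately absent from the args.
    def __reduce__(self):
        return reopen_file, (self.fs, self.path, self.mode, self.blocksize, self.loc)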

@martindurant (Member)

Should we have some kind of test here?

@jrbourbeau changed the title Avoid serializing cache for file objects → [WIP] Avoid serializing cache for file objects (Nov 18, 2024)
@jrbourbeau (Contributor, Author)

Yeah, definitely. I'll push something up

Are there any other concerns you have at the moment? Do you know of other AbstractBufferedFile subclasses out there in the wild that we need to take into consideration?

@martindurant (Member)

I think what you have is very reasonable

@jrbourbeau (Contributor, Author)

I'll push something up

Famous last words...

Alright, so I think a test like this (this actually passes) makes sense:

import pickle

import fsspec

def test_cache_not_pickled(tmp_path):
    fs = fsspec.filesystem("s3")
    data = b"abcdefghi"
    filepath = "s3://oss-scratch-space/jrbourbeau/test.txt"
    length = 3
    # Write file and then read it back in
    with fs.open(filepath, mode="wb") as f:
        f.write(data)
    f = fs.open(filepath, mode="rb")
    assert not f.cache.cache  # No cache initially
    assert f.read(length=length) == data[:length]
    assert f.cache.cache == data  # Cache is populated

    # Roundtrip through pickle
    f2 = pickle.loads(pickle.dumps(f))
    assert not f2.cache.cache  # No cache initially
    assert (
        f2.read(length=length) == data[length : 2 * length]
    )  # Read file from previous seek point
    assert f2.cache.cache == data[length:]  # Cache is populated

It'd be nice to use AbstractFileSystem more directly (^ that test is using s3fs.S3File instead), but I'm having some trouble getting file writing and reading to work with AbstractFileSystem directly. Another, closer option would be to use the http filesystem (which also inherits from AbstractFileSystem), but the http tests aren't working for me locally at the moment. @martindurant, do you have any suggestions on how to proceed? I'm still learning my way around the test suite here.
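
One backend-free possibility, sketched under the assumption that pickling drops the cache as this PR intends: a minimal in-memory AbstractBufferedFile subclass. DummyFileSystem and DummyBufferedFile are made-up names for illustration, not part of fsspec.

import pickle

from fsspec import AbstractFileSystem
from fsspec.spec import AbstractBufferedFile

DATA = b"abcdefghi"

class DummyFileSystem(AbstractFileSystem):
    protocol = "dummy"

    def _open(self, path, mode="rb", **kwargs):
        # Size is known up front, so no info() call is needed.
        return DummyBufferedFile(self, path, mode=mode, size=len(DATA))

class DummyBufferedFile(AbstractBufferedFile):
    def _fetch_range(self, start, end):
        # Serve bytes from an in-memory buffer instead of a remote store.
        return DATA[start:end]

f = DummyFileSystem().open("/file", mode="rb")
assert f.read(3) == b"abc"
assert f.cache.cache               # readahead cache now holds fetched bytes
f2 = pickle.loads(pickle.dumps(f))
assert not f2.cache.cache          # the cache did not travel through pickle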

@martindurant (Member)

the http tests aren't working for me locally

None of them? I would normally use HTTP as the first proxy for an async (read-only) filesystem, but any FS that uses a subclass of AbstractBufferedFile would do here, and ideally one that lives in this repo.

@jrbourbeau (Contributor, Author) commented Nov 20, 2024

pytest fsspec/implementations/tests/test_http.py::test_list (and almost all the other http tests) gives me this error:

______________________________________________________________________________________________________________ test_list ______________________________________________________________________________________________________________

self = <aiohttp.connector.TCPConnector object at 0x105923b60>, req = <aiohttp.client_reqrep.ClientRequest object at 0x105b2c1a0>, traces = []
timeout = ClientTimeout(total=300, connect=None, sock_read=None, sock_connect=30, ceil_threshold=5)

    async def _create_direct_connection(
        self,
        req: ClientRequest,
        traces: List["Trace"],
        timeout: "ClientTimeout",
        *,
        client_error: Type[Exception] = ClientConnectorError,
    ) -> Tuple[asyncio.Transport, ResponseHandler]:
        sslcontext = self._get_ssl_context(req)
        fingerprint = self._get_fingerprint(req)

        host = req.url.raw_host
        assert host is not None
        # Replace multiple trailing dots with a single one.
        # A trailing dot is only present for fully-qualified domain names.
        # See https://github.com/aio-libs/aiohttp/pull/7364.
        if host.endswith(".."):
            host = host.rstrip(".") + "."
        port = req.port
        assert port is not None
        try:
            # Cancelling this lookup should not cancel the underlying lookup
            #  or else the cancel event will get broadcast to all the waiters
            #  across all connections.
>           hosts = await self._resolve_host(host, port, traces=traces)

../../../mambaforge/envs/fsspec/lib/python3.13/site-packages/aiohttp/connector.py:1345:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../../mambaforge/envs/fsspec/lib/python3.13/site-packages/aiohttp/connector.py:989: in _resolve_host
    return await asyncio.shield(resolved_host_task)
../../../mambaforge/envs/fsspec/lib/python3.13/site-packages/aiohttp/connector.py:1020: in _resolve_host_with_throttle
    addrs = await self._resolver.resolve(host, port, family=self._family)
../../../mambaforge/envs/fsspec/lib/python3.13/site-packages/aiohttp/resolver.py:36: in resolve
    infos = await self._loop.getaddrinfo(
../../../mambaforge/envs/fsspec/lib/python3.13/asyncio/base_events.py:935: in getaddrinfo
    return await self.run_in_executor(
../../../mambaforge/envs/fsspec/lib/python3.13/concurrent/futures/thread.py:58: in run
    result = self.fn(*self.args, **self.kwargs)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

host = '173.1.168.192.in-addr.arpa', port = 50522, family = <AddressFamily.AF_UNSPEC: 0>, type = <SocketKind.SOCK_STREAM: 1>, proto = 0, flags = <AddressInfo.AI_ADDRCONFIG: 1024>

    def getaddrinfo(host, port, family=0, type=0, proto=0, flags=0):
        """Resolve host and port into list of address info entries.

        Translate the host/port argument into a sequence of 5-tuples that contain
        all the necessary arguments for creating a socket connected to that service.
        host is a domain name, a string representation of an IPv4/v6 address or
        None. port is a string service name such as 'http', a numeric port number or
        None. By passing None as the value of host and port, you can pass NULL to
        the underlying C API.

        The family, type and proto arguments can be optionally specified in order to
        narrow the list of addresses returned. Passing zero as a value for each of
        these arguments selects the full range of results.
        """
        # We override this function since we want to translate the numeric family
        # and socket type values to enum constants.
        addrlist = []
>       for res in _socket.getaddrinfo(host, port, family, type, proto, flags):
E       socket.gaierror: [Errno 8] nodename nor servname provided, or not known

../../../mambaforge/envs/fsspec/lib/python3.13/socket.py:975: gaierror

The above exception was the direct cause of the following exception:

server = namespace(address='http://173.1.168.192.in-addr.arpa:50522', realfile='http://173.1.168.192.in-addr.arpa:50522/index/realfile')

    def test_list(server):
        h = fsspec.filesystem("http")
>       out = h.glob(server.address + "/index/*")

fsspec/implementations/tests/test_http.py:19:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
fsspec/asyn.py:118: in wrapper
    return sync(self.loop, func, *args, **kwargs)
fsspec/asyn.py:103: in sync
    raise return_result
fsspec/asyn.py:56: in _runner
    result[0] = await coro
fsspec/implementations/http.py:490: in _glob
    allpaths = await self._find(
fsspec/asyn.py:850: in _find
    if withdirs and path != "" and await self._isdir(path):
fsspec/implementations/http.py:517: in _isdir
    return bool(await self._ls(path))
fsspec/implementations/http.py:207: in _ls
    out = await self._ls_real(url, detail=detail, **kwargs)
fsspec/implementations/http.py:159: in _ls_real
    async with session.get(self.encode_url(url), **self.kwargs) as r:
../../../mambaforge/envs/fsspec/lib/python3.13/site-packages/aiohttp/client.py:1418: in __aenter__
    self._resp: _RetType = await self._coro
../../../mambaforge/envs/fsspec/lib/python3.13/site-packages/aiohttp/client.py:696: in _request
    conn = await self._connector.connect(
../../../mambaforge/envs/fsspec/lib/python3.13/site-packages/aiohttp/connector.py:544: in connect
    proto = await self._create_connection(req, traces, timeout)
../../../mambaforge/envs/fsspec/lib/python3.13/site-packages/aiohttp/connector.py:1050: in _create_connection
    _, proto = await self._create_direct_connection(req, traces, timeout)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <aiohttp.connector.TCPConnector object at 0x105923b60>, req = <aiohttp.client_reqrep.ClientRequest object at 0x105b2c1a0>, traces = []
timeout = ClientTimeout(total=300, connect=None, sock_read=None, sock_connect=30, ceil_threshold=5)

    async def _create_direct_connection(
        self,
        req: ClientRequest,
        traces: List["Trace"],
        timeout: "ClientTimeout",
        *,
        client_error: Type[Exception] = ClientConnectorError,
    ) -> Tuple[asyncio.Transport, ResponseHandler]:
        sslcontext = self._get_ssl_context(req)
        fingerprint = self._get_fingerprint(req)

        host = req.url.raw_host
        assert host is not None
        # Replace multiple trailing dots with a single one.
        # A trailing dot is only present for fully-qualified domain names.
        # See https://github.com/aio-libs/aiohttp/pull/7364.
        if host.endswith(".."):
            host = host.rstrip(".") + "."
        port = req.port
        assert port is not None
        try:
            # Cancelling this lookup should not cancel the underlying lookup
            #  or else the cancel event will get broadcast to all the waiters
            #  across all connections.
            hosts = await self._resolve_host(host, port, traces=traces)
        except OSError as exc:
            if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
                raise
            # in case of proxy it is not ClientProxyConnectionError
            # it is problem of resolving proxy ip itself
>           raise ClientConnectorDNSError(req.connection_key, exc) from exc
E           aiohttp.client_exceptions.ClientConnectorDNSError: Cannot connect to host 173.1.168.192.in-addr.arpa:50522 ssl:default [nodename nor servname provided, or not known]

../../../mambaforge/envs/fsspec/lib/python3.13/site-packages/aiohttp/connector.py:1351: ClientConnectorDNSError

Maybe I'm missing some sort of testing setup step?

@martindurant (Member)

The server binds to ("", 0) (since #1690), but there could be a conflict between your localhost and DNS, or perhaps an IPv6-only layer.
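
For reference, a small standalone illustration of that ("", 0) binding: an empty host string binds all local interfaces, and port 0 asks the OS for a free ephemeral port. The DNS failure in the traceback above happens later, when the hostname the client is given fails to resolve.

import socket

# "" = all interfaces; port 0 = let the OS pick a free ephemeral port.
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(("", 0))
print(s.getsockname())  # e.g. ('0.0.0.0', 50522) -- the assigned port
s.close()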

@jrbourbeau changed the title [WIP] Avoid serializing cache for file objects → Avoid serializing cache for file objects (Nov 21, 2024)
@martindurant (Member)

perfect

@martindurant merged commit bbe0591 into fsspec:master on Nov 21, 2024 (11 checks passed)
@jrbourbeau deleted the abstractfile-reduce branch on November 21, 2024 21:59
@jrbourbeau (Contributor, Author)

Thanks @martindurant!

This pull request closes: Serialising a File will also serialise the cache which can grow very large (#1747)