Avoid serializing cache for file objects #1753

Merged: 11 commits, Nov 21, 2024
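In brief: rather than serializing a file object's internal cache, AbstractBufferedFile now pickles a recipe for reopening the file and seeking back to the current offset. A minimal sketch of the intended round-trip (the server URL and file name below are placeholders, not part of this PR):

import pickle

import fsspec

# Assumes some HTTP server is reachable at the placeholder URL below.
fs = fsspec.filesystem("http", cache_type="readahead")
f = fs.open("http://localhost:8000/data.bin", mode="rb")  # placeholder URL
f.read(3)                           # populates f.cache with fetched bytes
f2 = pickle.loads(pickle.dumps(f))  # reopened via __reduce__, not copied
assert f2.tell() == f.tell()        # the offset survives (reopen seeks)
assert not f2.cache.cache           # the cached bytes do not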
22 changes: 0 additions & 22 deletions fsspec/implementations/http.py
@@ -696,25 +696,6 @@ async def async_fetch_range(self, start, end):

    _fetch_range = sync_wrapper(async_fetch_range)

-    def __reduce__(self):
-        return (
-            reopen,
-            (
-                self.fs,
-                self.url,
-                self.mode,
-                self.blocksize,
-                self.cache.name if self.cache else "none",
-                self.size,
-            ),
-        )
-
-
-def reopen(fs, url, mode, blocksize, cache_type, size=None):
-    return fs.open(
-        url, mode=mode, block_size=blocksize, cache_type=cache_type, size=size
-    )


magic_check = re.compile("([*[])")

@@ -764,9 +745,6 @@ def close(self):
        asyncio.run_coroutine_threadsafe(self._close(), self.loop)
        super().close()

-    def __reduce__(self):
-        return reopen, (self.fs, self.url, self.mode, self.blocksize, self.cache.name)


class AsyncStreamFile(AbstractAsyncStreamedFile):
    def __init__(
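With both HTTP-specific overrides deleted, HTTP file objects fall back to the generic __reduce__ added to AbstractBufferedFile in the next file. A quick sanity check, assuming an fsspec build that includes this change:

from fsspec.implementations.http import HTTPFile
from fsspec.spec import AbstractBufferedFile

# HTTPFile no longer defines its own __reduce__, so attribute lookup
# falls through to the base-class implementation shown below.
assert issubclass(HTTPFile, AbstractBufferedFile)
assert HTTPFile.__reduce__ is AbstractBufferedFile.__reduce__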
31 changes: 31 additions & 0 deletions fsspec/spec.py
@@ -2060,6 +2060,22 @@ def writable(self):
        """Whether opened for writing"""
        return self.mode in {"wb", "ab", "xb"} and not self.closed

+    def __reduce__(self):
+        if self.mode != "rb":
+            raise RuntimeError("Pickling a writeable file is not supported")
+
+        return reopen, (
+            self.fs,
+            self.path,
+            self.mode,
+            self.blocksize,
+            self.loc,
+            self.size,
+            self.autocommit,
+            self.cache.name if self.cache else "none",
+            self.kwargs,
+        )

    def __del__(self):
        if not self.closed:
            self.close()
@@ -2074,3 +2090,18 @@ def __enter__(self):

    def __exit__(self, *args):
        self.close()


+def reopen(fs, path, mode, blocksize, loc, size, autocommit, cache_type, kwargs):
+    file = fs.open(
+        path,
+        mode=mode,
+        block_size=blocksize,
+        autocommit=autocommit,
+        cache_type=cache_type,
+        size=size,
+        **kwargs,
+    )
+    if loc > 0:
+        file.seek(loc)
+    return file
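For readers less familiar with the pickle protocol: __reduce__ returns a (callable, args) pair, and pickle serializes only that recipe, calling callable(*args) at load time. Nothing else on the object, including the cache, is written out. A self-contained sketch of the same pattern (the names are illustrative, not fsspec API):

import pickle

def make_handle(path, pos):
    # Invoked by pickle at load time: rebuild the object from scratch.
    return Handle(path, pos)

class Handle:
    def __init__(self, path, pos=0):
        self.path = path
        self.pos = pos
        self.cache = b"expensive bytes"  # recreated on load, never serialized

    def __reduce__(self):
        # Only the reconstruction recipe is pickled, not self.cache.
        return make_handle, (self.path, self.pos)

h = pickle.loads(pickle.dumps(Handle("/tmp/x", pos=7)))
assert (h.path, h.pos) == ("/tmp/x", 7)

Note that reopen, like make_handle here, must live at module level: pickle stores the callable by its importable name, so a bound method or closure would fail to unpickle.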
26 changes: 26 additions & 0 deletions fsspec/tests/test_spec.py
@@ -15,6 +15,7 @@
from fsspec.implementations.http import HTTPFileSystem
from fsspec.implementations.local import LocalFileSystem
from fsspec.spec import AbstractBufferedFile, AbstractFileSystem
+from fsspec.tests.conftest import data

PATHS_FOR_GLOB_TESTS = (
    {"name": "test0.json", "type": "file", "size": 100},
@@ -744,6 +745,31 @@ def test_cache():
    assert len(DummyTestFS._cache) == 0


+def test_cache_not_pickled(server):
+    fs = fsspec.filesystem(
+        "http",
+        cache_type="readahead",
+        headers={"give_length": "true", "head_ok": "true"},
+    )
+    filepath = server.realfile
+    length = 3
+    f = fs.open(filepath, mode="rb")
+    assert isinstance(f, AbstractBufferedFile)
+    assert not f.cache.cache  # No cache initially
+    assert f.read(length=length) == data[:length]
+    assert f.cache.cache == data  # Cache is populated
+
+    # Roundtrip through pickle
+    import pickle
+
+    f2 = pickle.loads(pickle.dumps(f))
+    assert not f2.cache.cache  # No cache initially
+    assert (
+        f2.read(length=length) == data[length : 2 * length]
+    )  # Read file from previous seek point
+    assert f2.cache.cache == data[length:]  # Cache is populated


def test_current():
    fs = DummyTestFS()
    fs2 = DummyTestFS(arg=1)