Skip to content

Commit

Permalink
Avoid serializing cache for file objects (#1753)
Browse files Browse the repository at this point in the history
  • Loading branch information
jrbourbeau authored Nov 21, 2024
1 parent ac830a0 commit bbe0591
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 22 deletions.
22 changes: 0 additions & 22 deletions fsspec/implementations/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -696,25 +696,6 @@ async def async_fetch_range(self, start, end):

_fetch_range = sync_wrapper(async_fetch_range)

def __reduce__(self):
return (
reopen,
(
self.fs,
self.url,
self.mode,
self.blocksize,
self.cache.name if self.cache else "none",
self.size,
),
)


def reopen(fs, url, mode, blocksize, cache_type, size=None):
return fs.open(
url, mode=mode, block_size=blocksize, cache_type=cache_type, size=size
)


magic_check = re.compile("([*[])")

Expand Down Expand Up @@ -764,9 +745,6 @@ def close(self):
asyncio.run_coroutine_threadsafe(self._close(), self.loop)
super().close()

def __reduce__(self):
return reopen, (self.fs, self.url, self.mode, self.blocksize, self.cache.name)


class AsyncStreamFile(AbstractAsyncStreamedFile):
def __init__(
Expand Down
31 changes: 31 additions & 0 deletions fsspec/spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -2060,6 +2060,22 @@ def writable(self):
"""Whether opened for writing"""
return self.mode in {"wb", "ab", "xb"} and not self.closed

def __reduce__(self):
if self.mode != "rb":
raise RuntimeError("Pickling a writeable file is not supported")

return reopen, (
self.fs,
self.path,
self.mode,
self.blocksize,
self.loc,
self.size,
self.autocommit,
self.cache.name if self.cache else "none",
self.kwargs,
)

def __del__(self):
if not self.closed:
self.close()
Expand All @@ -2074,3 +2090,18 @@ def __enter__(self):

def __exit__(self, *args):
self.close()


def reopen(fs, path, mode, blocksize, loc, size, autocommit, cache_type, kwargs):
file = fs.open(
path,
mode=mode,
block_size=blocksize,
autocommit=autocommit,
cache_type=cache_type,
size=size,
**kwargs,
)
if loc > 0:
file.seek(loc)
return file
26 changes: 26 additions & 0 deletions fsspec/tests/test_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from fsspec.implementations.http import HTTPFileSystem
from fsspec.implementations.local import LocalFileSystem
from fsspec.spec import AbstractBufferedFile, AbstractFileSystem
from fsspec.tests.conftest import data

PATHS_FOR_GLOB_TESTS = (
{"name": "test0.json", "type": "file", "size": 100},
Expand Down Expand Up @@ -744,6 +745,31 @@ def test_cache():
assert len(DummyTestFS._cache) == 0


def test_cache_not_pickled(server):
fs = fsspec.filesystem(
"http",
cache_type="readahead",
headers={"give_length": "true", "head_ok": "true"},
)
filepath = server.realfile
length = 3
f = fs.open(filepath, mode="rb")
assert isinstance(f, AbstractBufferedFile)
assert not f.cache.cache # No cache initially
assert f.read(length=length) == data[:length]
assert f.cache.cache == data # Cache is populated

# Roundtrip through pickle
import pickle

f2 = pickle.loads(pickle.dumps(f))
assert not f2.cache.cache # No cache initially
assert (
f2.read(length=length) == data[length : 2 * length]
) # Read file from previous seek point
assert f2.cache.cache == data[length:] # Cache is populated


def test_current():
fs = DummyTestFS()
fs2 = DummyTestFS(arg=1)
Expand Down

0 comments on commit bbe0591

Please sign in to comment.