Skip to content

Commit

Permalink
Transactions on simplecache (#1531)
Browse files Browse the repository at this point in the history
  • Loading branch information
martindurant authored Mar 14, 2024
1 parent 14dce8c commit 00c8a41
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 16 deletions.
1 change: 1 addition & 0 deletions ci/environment-friends.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ dependencies:
- flake8
- black
- google-cloud-core
- google-cloud-storage
- google-api-core
- google-api-python-client
- httpretty
Expand Down
5 changes: 5 additions & 0 deletions fsspec/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ def rsync(
fs: GenericFileSystem|None
Instance to use if explicitly given. The instance defines how to
to make downstream file system instances from paths.
Returns
-------
dict of the copy operations that were performed, {source: destination}
"""
fs = fs or GenericFileSystem(**(inst_kwargs or {}))
source = fs._strip_protocol(source)
Expand Down Expand Up @@ -137,6 +141,7 @@ def rsync(
logger.debug(f"{len(to_delete)} files to delete")
if delete_missing:
fs.rm(to_delete)
return allfiles


class GenericFileSystem(AsyncFileSystem):
Expand Down
73 changes: 65 additions & 8 deletions fsspec/implementations/cached.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@ def complete(self, commit=True):
lpaths = [f.fn for f in self.files]
if commit:
self.fs.put(lpaths, rpaths)
# else remove?
self.files.clear()
self.fs._intrans = False
self.fs._transaction = None
self.fs = None # break cycle


class CachingFileSystem(AbstractFileSystem):
Expand Down Expand Up @@ -391,8 +393,11 @@ def close_and_update(self, f, close):
close()
f.closed = True

def ls(self, path, detail=True):
return self.fs.ls(path, detail)

def __getattribute__(self, item):
if item in [
if item in {
"load_cache",
"_open",
"save_cache",
Expand All @@ -409,6 +414,11 @@ def __getattribute__(self, item):
"read_block",
"tail",
"head",
"info",
"ls",
"exists",
"isfile",
"isdir",
"_check_file",
"_check_cache",
"_mkcache",
Expand All @@ -428,9 +438,12 @@ def __getattribute__(self, item):
"cache_size",
"pipe_file",
"pipe",
"isdir",
"isfile",
"exists",
"start_transaction",
"end_transaction",
]:
}:
# all the methods defined in this class. Note `open` here, since
# it calls `_open`, but is actually in superclass
return lambda *args, **kw: getattr(type(self), item).__get__(self)(
Expand Down Expand Up @@ -756,6 +769,49 @@ def pipe_file(self, path, value=None, **kwargs):
else:
super().pipe_file(path, value)

def ls(self, path, detail=True, **kwargs):
path = self._strip_protocol(path)
details = []
try:
details = self.fs.ls(
path, detail=True, **kwargs
).copy() # don't edit original!
except FileNotFoundError as e:
ex = e
else:
ex = None
if self._intrans:
path1 = path.rstrip("/") + "/"
for f in self.transaction.files:
if f.path == path:
details.append(
{"name": path, "size": f.size or f.tell(), "type": "file"}
)
elif f.path.startswith(path1):
if f.path.count("/") == path1.count("/"):
details.append(
{"name": f.path, "size": f.size or f.tell(), "type": "file"}
)
else:
dname = "/".join(f.path.split("/")[: path1.count("/") + 1])
details.append({"name": dname, "size": 0, "type": "directory"})
if ex is not None and not details:
raise ex
if detail:
return details
return sorted(_["name"] for _ in details)

def info(self, path, **kwargs):
path = self._strip_protocol(path)
if self._intrans:
f = [_ for _ in self.transaction.files if _.path == path]
if f:
return {"name": path, "size": f[0].size or f[0].tell(), "type": "file"}
f = any(_.path.startswith(path + "/") for _ in self.transaction.files)
if f:
return {"name": path, "size": 0, "type": "directory"}
return self.fs.info(path, **kwargs)

def pipe(self, path, value=None, **kwargs):
if isinstance(path, str):
self.pipe_file(self._strip_protocol(path), value, **kwargs)
Expand Down Expand Up @@ -836,6 +892,7 @@ def __init__(self, fs, path, fn, mode="wb", autocommit=True, seek=0, **kwargs):
if seek:
self.fh.seek(seek)
self.path = path
self.size = None
self.fs = fs
self.closed = False
self.autocommit = autocommit
Expand All @@ -855,6 +912,7 @@ def __exit__(self, exc_type, exc_val, exc_tb):
self.close()

def close(self):
self.size = self.fh.tell()
if self.closed:
return
self.fh.close()
Expand All @@ -868,15 +926,14 @@ def discard(self):

def commit(self):
self.fs.put(self.fn, self.path, **self.kwargs)
try:
os.remove(self.fn)
except (PermissionError, FileNotFoundError):
# file path may be held by new version of the file on windows
pass
# we do not delete local copy - it's still in the cache

@property
def name(self):
return self.fn

def __repr__(self) -> str:
return f"LocalTempFile: {self.path}"

def __getattr__(self, item):
return getattr(self.fh, item)
13 changes: 8 additions & 5 deletions fsspec/implementations/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,11 +158,14 @@ async def _ls_real(self, url, detail=True, **kwargs):
session = await self.set_session()
async with session.get(self.encode_url(url), **self.kwargs) as r:
self._raise_not_found_for_status(r, url)
text = await r.text()
if self.simple_links:
links = ex2.findall(text) + [u[2] for u in ex.findall(text)]
else:
links = [u[2] for u in ex.findall(text)]
try:
text = await r.text()
if self.simple_links:
links = ex2.findall(text) + [u[2] for u in ex.findall(text)]
else:
links = [u[2] for u in ex.findall(text)]
except UnicodeDecodeError:
links = [] # binary, not HTML
out = set()
parts = urlparse(url)
for l in links:
Expand Down
2 changes: 2 additions & 0 deletions fsspec/implementations/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ def rmdir(self, path):
raise FileNotFoundError(path)

def info(self, path, **kwargs):
logger.debug("info: %s", path)
path = self._strip_protocol(path)
if path in self.pseudo_dirs or any(
p.startswith(path + "/") for p in list(self.store) + self.pseudo_dirs
Expand Down Expand Up @@ -210,6 +211,7 @@ def cp_file(self, path1, path2, **kwargs):
raise FileNotFoundError(path1)

def cat_file(self, path, start=None, end=None, **kwargs):
logger.debug("cat: %s", path)
path = self._strip_protocol(path)
try:
return bytes(self.store[path].getbuffer()[start:end])
Expand Down
6 changes: 6 additions & 0 deletions fsspec/implementations/tests/test_cached.py
Original file line number Diff line number Diff line change
Expand Up @@ -1291,10 +1291,16 @@ def patched_put(*args, **kwargs):
with fs.transaction:
fs.pipe("myfile", b"1")
fs.pipe("otherfile", b"2")
fs.pipe("deep/dir/otherfile", b"3")
with fs.open("blarh", "wb") as f:
f.write(b"ff")
assert not m.find("")

assert fs.info("otherfile")["size"] == 1
assert fs.info("deep")["type"] == "directory"
assert fs.isdir("deep")
assert fs.ls("deep", detail=False) == ["/deep/dir"]

assert m.cat("myfile") == b"1"
assert m.cat("otherfile") == b"2"
assert called[0] == 1 # copy was done in one go
11 changes: 8 additions & 3 deletions fsspec/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ class Transaction:
instance as the ``.transaction`` attribute of the given filesystem
"""

def __init__(self, fs):
def __init__(self, fs, **kwargs):
"""
Parameters
----------
Expand All @@ -26,8 +26,10 @@ def __exit__(self, exc_type, exc_val, exc_tb):
"""End transaction and commit, if exit is not due to exception"""
# only commit if there was no exception
self.complete(commit=exc_type is None)
self.fs._intrans = False
self.fs._transaction = None
if self.fs:
self.fs._intrans = False
self.fs._transaction = None
self.fs = None

def start(self):
"""Start a transaction on this FileSystem"""
Expand All @@ -43,6 +45,8 @@ def complete(self, commit=True):
else:
f.discard()
self.fs._intrans = False
self.fs._transaction = None
self.fs = None


class FileActor:
Expand Down Expand Up @@ -83,3 +87,4 @@ def complete(self, commit=True):
else:
self.files.discard().result()
self.fs._intrans = False
self.fs = None

0 comments on commit 00c8a41

Please sign in to comment.