From 00c8a41a5b54a4551331e9b17a5c2d867abc3054 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Thu, 14 Mar 2024 18:06:03 -0400 Subject: [PATCH] Transactions on simplecache (#1531) --- ci/environment-friends.yml | 1 + fsspec/generic.py | 5 ++ fsspec/implementations/cached.py | 73 ++++++++++++++++++--- fsspec/implementations/http.py | 13 ++-- fsspec/implementations/memory.py | 2 + fsspec/implementations/tests/test_cached.py | 6 ++ fsspec/transaction.py | 11 +++- 7 files changed, 95 insertions(+), 16 deletions(-) diff --git a/ci/environment-friends.yml b/ci/environment-friends.yml index 25724e326..b571b085a 100644 --- a/ci/environment-friends.yml +++ b/ci/environment-friends.yml @@ -21,6 +21,7 @@ dependencies: - flake8 - black - google-cloud-core + - google-cloud-storage - google-api-core - google-api-python-client - httpretty diff --git a/fsspec/generic.py b/fsspec/generic.py index ddd093aa1..4fb58ac5b 100644 --- a/fsspec/generic.py +++ b/fsspec/generic.py @@ -87,6 +87,10 @@ def rsync( fs: GenericFileSystem|None Instance to use if explicitly given. The instance defines how to to make downstream file system instances from paths. + + Returns + ------- + dict of the copy operations that were performed, {source: destination} """ fs = fs or GenericFileSystem(**(inst_kwargs or {})) source = fs._strip_protocol(source) @@ -137,6 +141,7 @@ def rsync( logger.debug(f"{len(to_delete)} files to delete") if delete_missing: fs.rm(to_delete) + return allfiles class GenericFileSystem(AsyncFileSystem): diff --git a/fsspec/implementations/cached.py b/fsspec/implementations/cached.py index b3c43fa69..c2ee17214 100644 --- a/fsspec/implementations/cached.py +++ b/fsspec/implementations/cached.py @@ -32,8 +32,10 @@ def complete(self, commit=True): lpaths = [f.fn for f in self.files] if commit: self.fs.put(lpaths, rpaths) - # else remove? + self.files.clear() self.fs._intrans = False + self.fs._transaction = None + self.fs = None # break cycle class CachingFileSystem(AbstractFileSystem): @@ -391,8 +393,11 @@ def close_and_update(self, f, close): close() f.closed = True + def ls(self, path, detail=True): + return self.fs.ls(path, detail) + def __getattribute__(self, item): - if item in [ + if item in { "load_cache", "_open", "save_cache", @@ -409,6 +414,11 @@ def __getattribute__(self, item): "read_block", "tail", "head", + "info", + "ls", + "exists", + "isfile", + "isdir", "_check_file", "_check_cache", "_mkcache", @@ -428,9 +438,12 @@ def __getattribute__(self, item): "cache_size", "pipe_file", "pipe", + "isdir", + "isfile", + "exists", "start_transaction", "end_transaction", - ]: + }: # all the methods defined in this class. Note `open` here, since # it calls `_open`, but is actually in superclass return lambda *args, **kw: getattr(type(self), item).__get__(self)( @@ -756,6 +769,49 @@ def pipe_file(self, path, value=None, **kwargs): else: super().pipe_file(path, value) + def ls(self, path, detail=True, **kwargs): + path = self._strip_protocol(path) + details = [] + try: + details = self.fs.ls( + path, detail=True, **kwargs + ).copy() # don't edit original! + except FileNotFoundError as e: + ex = e + else: + ex = None + if self._intrans: + path1 = path.rstrip("/") + "/" + for f in self.transaction.files: + if f.path == path: + details.append( + {"name": path, "size": f.size or f.tell(), "type": "file"} + ) + elif f.path.startswith(path1): + if f.path.count("/") == path1.count("/"): + details.append( + {"name": f.path, "size": f.size or f.tell(), "type": "file"} + ) + else: + dname = "/".join(f.path.split("/")[: path1.count("/") + 1]) + details.append({"name": dname, "size": 0, "type": "directory"}) + if ex is not None and not details: + raise ex + if detail: + return details + return sorted(_["name"] for _ in details) + + def info(self, path, **kwargs): + path = self._strip_protocol(path) + if self._intrans: + f = [_ for _ in self.transaction.files if _.path == path] + if f: + return {"name": path, "size": f[0].size or f[0].tell(), "type": "file"} + f = any(_.path.startswith(path + "/") for _ in self.transaction.files) + if f: + return {"name": path, "size": 0, "type": "directory"} + return self.fs.info(path, **kwargs) + def pipe(self, path, value=None, **kwargs): if isinstance(path, str): self.pipe_file(self._strip_protocol(path), value, **kwargs) @@ -836,6 +892,7 @@ def __init__(self, fs, path, fn, mode="wb", autocommit=True, seek=0, **kwargs): if seek: self.fh.seek(seek) self.path = path + self.size = None self.fs = fs self.closed = False self.autocommit = autocommit @@ -855,6 +912,7 @@ def __exit__(self, exc_type, exc_val, exc_tb): self.close() def close(self): + self.size = self.fh.tell() if self.closed: return self.fh.close() @@ -868,15 +926,14 @@ def discard(self): def commit(self): self.fs.put(self.fn, self.path, **self.kwargs) - try: - os.remove(self.fn) - except (PermissionError, FileNotFoundError): - # file path may be held by new version of the file on windows - pass + # we do not delete local copy - it's still in the cache @property def name(self): return self.fn + def __repr__(self) -> str: + return f"LocalTempFile: {self.path}" + def __getattr__(self, item): return getattr(self.fh, item) diff --git a/fsspec/implementations/http.py b/fsspec/implementations/http.py index 204a0a7ee..aa36332b6 100644 --- a/fsspec/implementations/http.py +++ b/fsspec/implementations/http.py @@ -158,11 +158,14 @@ async def _ls_real(self, url, detail=True, **kwargs): session = await self.set_session() async with session.get(self.encode_url(url), **self.kwargs) as r: self._raise_not_found_for_status(r, url) - text = await r.text() - if self.simple_links: - links = ex2.findall(text) + [u[2] for u in ex.findall(text)] - else: - links = [u[2] for u in ex.findall(text)] + try: + text = await r.text() + if self.simple_links: + links = ex2.findall(text) + [u[2] for u in ex.findall(text)] + else: + links = [u[2] for u in ex.findall(text)] + except UnicodeDecodeError: + links = [] # binary, not HTML out = set() parts = urlparse(url) for l in links: diff --git a/fsspec/implementations/memory.py b/fsspec/implementations/memory.py index 7320df2c3..394fa9770 100644 --- a/fsspec/implementations/memory.py +++ b/fsspec/implementations/memory.py @@ -138,6 +138,7 @@ def rmdir(self, path): raise FileNotFoundError(path) def info(self, path, **kwargs): + logger.debug("info: %s", path) path = self._strip_protocol(path) if path in self.pseudo_dirs or any( p.startswith(path + "/") for p in list(self.store) + self.pseudo_dirs @@ -210,6 +211,7 @@ def cp_file(self, path1, path2, **kwargs): raise FileNotFoundError(path1) def cat_file(self, path, start=None, end=None, **kwargs): + logger.debug("cat: %s", path) path = self._strip_protocol(path) try: return bytes(self.store[path].getbuffer()[start:end]) diff --git a/fsspec/implementations/tests/test_cached.py b/fsspec/implementations/tests/test_cached.py index 05baf5d97..28bdad0cd 100644 --- a/fsspec/implementations/tests/test_cached.py +++ b/fsspec/implementations/tests/test_cached.py @@ -1291,10 +1291,16 @@ def patched_put(*args, **kwargs): with fs.transaction: fs.pipe("myfile", b"1") fs.pipe("otherfile", b"2") + fs.pipe("deep/dir/otherfile", b"3") with fs.open("blarh", "wb") as f: f.write(b"ff") assert not m.find("") + assert fs.info("otherfile")["size"] == 1 + assert fs.info("deep")["type"] == "directory" + assert fs.isdir("deep") + assert fs.ls("deep", detail=False) == ["/deep/dir"] + assert m.cat("myfile") == b"1" assert m.cat("otherfile") == b"2" assert called[0] == 1 # copy was done in one go diff --git a/fsspec/transaction.py b/fsspec/transaction.py index 67584d61e..77293f63e 100644 --- a/fsspec/transaction.py +++ b/fsspec/transaction.py @@ -9,7 +9,7 @@ class Transaction: instance as the ``.transaction`` attribute of the given filesystem """ - def __init__(self, fs): + def __init__(self, fs, **kwargs): """ Parameters ---------- @@ -26,8 +26,10 @@ def __exit__(self, exc_type, exc_val, exc_tb): """End transaction and commit, if exit is not due to exception""" # only commit if there was no exception self.complete(commit=exc_type is None) - self.fs._intrans = False - self.fs._transaction = None + if self.fs: + self.fs._intrans = False + self.fs._transaction = None + self.fs = None def start(self): """Start a transaction on this FileSystem""" @@ -43,6 +45,8 @@ def complete(self, commit=True): else: f.discard() self.fs._intrans = False + self.fs._transaction = None + self.fs = None class FileActor: @@ -83,3 +87,4 @@ def complete(self, commit=True): else: self.files.discard().result() self.fs._intrans = False + self.fs = None