Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Transactions on simplecache #1531

Merged
merged 10 commits into from
Mar 14, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ci/environment-py38.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ dependencies:
- nomkl
- jinja2
- tqdm
- urllib3 <=1.26.18
- pip:
- hadoop-test-cluster
- smbprotocol
5 changes: 5 additions & 0 deletions fsspec/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ def rsync(
fs: GenericFileSystem|None
Instance to use if explicitly given. The instance defines how to
to make downstream file system instances from paths.

Returns
-------
dict of the copy operations that were performed, {source: destination}
"""
fs = fs or GenericFileSystem(**(inst_kwargs or {}))
source = fs._strip_protocol(source)
Expand Down Expand Up @@ -137,6 +141,7 @@ def rsync(
logger.debug(f"{len(to_delete)} files to delete")
if delete_missing:
fs.rm(to_delete)
return allfiles


class GenericFileSystem(AsyncFileSystem):
Expand Down
41 changes: 32 additions & 9 deletions fsspec/implementations/cached.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@ def complete(self, commit=True):
lpaths = [f.fn for f in self.files]
if commit:
self.fs.put(lpaths, rpaths)
# else remove?
self.files.clear()
self.fs._intrans = False
self.fs._transaction = None
self.fs = None # break cycle


class CachingFileSystem(AbstractFileSystem):
Expand Down Expand Up @@ -392,7 +394,7 @@ def close_and_update(self, f, close):
f.closed = True

def __getattribute__(self, item):
if item in [
if item in {
"load_cache",
"_open",
"save_cache",
Expand All @@ -409,6 +411,10 @@ def __getattribute__(self, item):
"read_block",
"tail",
"head",
"info",
"exists",
"isfile",
"isdir",
"_check_file",
"_check_cache",
"_mkcache",
Expand All @@ -430,7 +436,7 @@ def __getattribute__(self, item):
"pipe",
"start_transaction",
"end_transaction",
]:
}:
# all the methods defined in this class. Note `open` here, since
# it calls `_open`, but is actually in superclass
return lambda *args, **kw: getattr(type(self), item).__get__(self)(
Expand Down Expand Up @@ -756,6 +762,18 @@ def pipe_file(self, path, value=None, **kwargs):
else:
super().pipe_file(path, value)

# def ls(self, path, detail=True, **kwargs):
# details = super().ls(path, detail, **kwargs)
# if self._intrans:

def info(self, path, **kwargs):
path = self._strip_protocol(path)
if self._intrans:
f = [_ for _ in self.transaction.files if _.path == path]
if f:
return {"name": path, "size": f[0].size or f[0].tell(), "type": "file"}
return self.fs.info(path, **kwargs)

def pipe(self, path, value=None, **kwargs):
if isinstance(path, str):
self.pipe_file(self._strip_protocol(path), value, **kwargs)
Expand Down Expand Up @@ -836,6 +854,7 @@ def __init__(self, fs, path, fn, mode="wb", autocommit=True, seek=0, **kwargs):
if seek:
self.fh.seek(seek)
self.path = path
self.size = None
self.fs = fs
self.closed = False
self.autocommit = autocommit
Expand All @@ -855,6 +874,7 @@ def __exit__(self, exc_type, exc_val, exc_tb):
self.close()

def close(self):
self.size = self.fh.tell()
if self.closed:
return
self.fh.close()
Expand All @@ -868,15 +888,18 @@ def discard(self):

def commit(self):
self.fs.put(self.fn, self.path, **self.kwargs)
try:
os.remove(self.fn)
except (PermissionError, FileNotFoundError):
# file path may be held by new version of the file on windows
pass
# we do not delete local copy - it's still in the cache

@property
def name(self):
return self.fn

def __repr__(self) -> str:
return f"LocalTempFile: {self.path}"

def __getattr__(self, item):
return getattr(self.fh, item)
try:
return getattr(self.fh, item)
except AttributeError:
pass
raise AttributeError(item)
martindurant marked this conversation as resolved.
Show resolved Hide resolved
13 changes: 8 additions & 5 deletions fsspec/implementations/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,11 +158,14 @@ async def _ls_real(self, url, detail=True, **kwargs):
session = await self.set_session()
async with session.get(self.encode_url(url), **self.kwargs) as r:
self._raise_not_found_for_status(r, url)
text = await r.text()
if self.simple_links:
links = ex2.findall(text) + [u[2] for u in ex.findall(text)]
else:
links = [u[2] for u in ex.findall(text)]
try:
text = await r.text()
if self.simple_links:
links = ex2.findall(text) + [u[2] for u in ex.findall(text)]
else:
links = [u[2] for u in ex.findall(text)]
except UnicodeDecodeError:
links = [] # binary, not HTML
out = set()
parts = urlparse(url)
for l in links:
Expand Down
2 changes: 2 additions & 0 deletions fsspec/implementations/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ def rmdir(self, path):
raise FileNotFoundError(path)

def info(self, path, **kwargs):
logger.debug("info: %s", path)
path = self._strip_protocol(path)
if path in self.pseudo_dirs or any(
p.startswith(path + "/") for p in list(self.store) + self.pseudo_dirs
Expand Down Expand Up @@ -210,6 +211,7 @@ def cp_file(self, path1, path2, **kwargs):
raise FileNotFoundError(path1)

def cat_file(self, path, start=None, end=None, **kwargs):
logger.debug("cat: %s", path)
path = self._strip_protocol(path)
try:
return bytes(self.store[path].getbuffer()[start:end])
Expand Down
11 changes: 8 additions & 3 deletions fsspec/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ class Transaction:
instance as the ``.transaction`` attribute of the given filesystem
"""

def __init__(self, fs):
def __init__(self, fs, **kwargs):
"""
Parameters
----------
Expand All @@ -26,8 +26,10 @@ def __exit__(self, exc_type, exc_val, exc_tb):
"""End transaction and commit, if exit is not due to exception"""
# only commit if there was no exception
self.complete(commit=exc_type is None)
self.fs._intrans = False
self.fs._transaction = None
if self.fs:
self.fs._intrans = False
self.fs._transaction = None
self.fs = None

def start(self):
"""Start a transaction on this FileSystem"""
Expand All @@ -43,6 +45,8 @@ def complete(self, commit=True):
else:
f.discard()
self.fs._intrans = False
self.fs._transaction = None
self.fs = None


class FileActor:
Expand Down Expand Up @@ -83,3 +87,4 @@ def complete(self, commit=True):
else:
self.files.discard().result()
self.fs._intrans = False
self.fs = None
Loading