Merge branch 'master' into parquet_ref_nan
martindurant committed Oct 23, 2024
2 parents fe5eb66 + 97a2168 commit 0e6910a
Showing 40 changed files with 919 additions and 276 deletions.
2 changes: 2 additions & 0 deletions docs/source/api.rst
@@ -224,6 +224,7 @@ Other Known Implementations
- `ossfs`_ for Alibaba Cloud (Aliyun) Object Storage System (OSS)
- `p9fs`_ for 9P (Plan 9 Filesystem Protocol) servers
- `s3fs`_ for Amazon S3 and other compatible stores
- `tosfs`_ for ByteDance volcano engine Tinder Object Storage (TOS)
- `wandbfs`_ to access Wandb run data (experimental)
- `webdav4`_ for WebDAV
- `xrootd`_ for xrootd, with protocol "root://"
@@ -243,6 +244,7 @@ Other Known Implementations
.. _ossfs: https://github.com/fsspec/ossfs
.. _p9fs: https://github.com/pbchekin/p9fs-py
.. _s3fs: https://s3fs.readthedocs.io/en/latest/
.. _tosfs: https://tosfs.readthedocs.io/en/latest/
.. _wandbfs: https://github.com/jkulhanek/wandbfs
.. _webdav4: https://github.com/skshetry/webdav4
.. _xrootd: https://github.com/CoffeaTeam/fsspec-xrootd
43 changes: 43 additions & 0 deletions docs/source/changelog.rst
@@ -1,6 +1,49 @@
Changelog
=========

2024.10.0
---------

Fixes

- Performance of memoryFS rm (#1725)
- Performance of git FS info (#1712)
- Avoid git hex for newer pygit (#1703)
- tests fix for zip (#1700, #1691)
- missing open_async for dirFS (#1698)
- handle pathlib in zip (#1689)
- skip tests needing kerchunk if not installed (#1689)
- allow repeated kwargs in unchain (#1673)

Other

- Code style (#1704, 1706)
- allow pyarrow in referenceFS parquet (#1692)
- don't hardcode test port for parallel runs (#1690)


2024.9.0
--------

Enhancements

- fewer stat calls in localFS (#1659)
- faster find in ZIP (#1664)

Fixes

- paths without "/" in dirFS (#1638)
- paths with "/" in FTS (#1643, #1644)
- ls in parquet-based nested reference sets, and append (#1645, 1657)
- exception handling for SMB (#1650)


Other

- style (#1640, 1641, 1660)
- docs: xrootd (#1646)
- CI back on miniconda (#1658)

2024.6.1
--------

6 changes: 5 additions & 1 deletion fsspec/asyn.py
@@ -344,6 +344,10 @@ async def _rm(self, path, recursive=False, batch_size=None, **kwargs):
async def _cp_file(self, path1, path2, **kwargs):
raise NotImplementedError

async def _mv_file(self, path1, path2):
await self._cp_file(path1, path2)
await self._rm_file(path1)

async def _copy(
self,
path1,
@@ -1072,7 +1076,7 @@ async def flush(self, force=False):
self.offset = 0
try:
await self._initiate_upload()
except: # noqa: E722
except:
self.closed = True
raise

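The new `_mv_file` default composes a move from the copy and delete primitives subclasses already provide. A minimal sketch of a subclass inheriting it (the `DemoFS` class and its dict-backed store are hypothetical, purely for illustration):

```python
import asyncio

from fsspec.asyn import AsyncFileSystem


class DemoFS(AsyncFileSystem):
    """Hypothetical dict-backed async filesystem, for illustration only."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.store = {"a.txt": b"hello"}

    async def _cp_file(self, path1, path2, **kwargs):
        self.store[path2] = self.store[path1]

    async def _rm_file(self, path, **kwargs):
        del self.store[path]


async def main():
    fs = DemoFS(asynchronous=True)
    # the inherited _mv_file copies, then deletes the source
    await fs._mv_file("a.txt", "b.txt")
    print(sorted(fs.store))  # ['b.txt']


asyncio.run(main())
```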
7 changes: 5 additions & 2 deletions fsspec/core.py
@@ -346,7 +346,10 @@ def _un_chain(path, kwargs):
kws = kwargs.pop(protocol, {})
if bit is bits[0]:
kws.update(kwargs)
kw = dict(**extra_kwargs, **kws)
kw = dict(
**{k: v for k, v in extra_kwargs.items() if k not in kws or v != kws[k]},
**kws,
)
bit = cls._strip_protocol(bit)
if (
protocol in {"blockcache", "filecache", "simplecache"}
@@ -578,7 +581,7 @@ def expand_paths_if_needed(paths, mode, num, fs, name_function):
paths = list(paths)

if "w" in mode: # read mode
if sum([1 for p in paths if "*" in p]) > 1:
if sum(1 for p in paths if "*" in p) > 1:
raise ValueError(
"When writing data, only one filename mask can be specified."
)
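For context, `_un_chain` merges kwargs inferred from each URL component with the explicit per-protocol dicts, and `dict(**a, **b)` raises `TypeError` on any repeated key, so before this fix a kwarg appearing in both places failed even when the values agreed. A sketch of the now-permitted pattern, with placeholder host, credentials, and path:

```python
import fsspec

# "username" is parsed from the URL *and* passed explicitly below; since
# the values agree, _un_chain now merges them instead of raising TypeError.
of = fsspec.open(
    "simplecache::ftp://user:secret@ftp.example.com/data/file.csv",
    mode="rb",
    ftp={"username": "user", "password": "secret"},
    simplecache={"cache_storage": "/tmp/fsspec-cache"},
)
with of as f:
    print(f.read(64))
```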
2 changes: 1 addition & 1 deletion fsspec/implementations/arrow.py
@@ -128,7 +128,7 @@ def cp_file(self, path1, path2, **kwargs):
with self.open(tmp_fname, "wb") as rstream:
shutil.copyfileobj(lstream, rstream)
self.fs.move(tmp_fname, path2)
except BaseException: # noqa
except BaseException:
with suppress(FileNotFoundError):
self.fs.delete_file(tmp_fname)
raise
16 changes: 8 additions & 8 deletions fsspec/implementations/dbfs.py
@@ -79,7 +79,7 @@ def ls(self, path, detail=True, **kwargs):
if e.error_code == "RESOURCE_DOES_NOT_EXIST":
raise FileNotFoundError(e.message) from e

raise e
raise
files = r["files"]
out = [
{
@@ -125,7 +125,7 @@ def makedirs(self, path, exist_ok=True):
if e.error_code == "RESOURCE_ALREADY_EXISTS":
raise FileExistsError(e.message) from e

raise e
raise
self.invalidate_cache(self._parent(path))

def mkdir(self, path, create_parents=True, **kwargs):
@@ -171,7 +171,7 @@ def rm(self, path, recursive=False, **kwargs):
# Using the same exception as the os module would use here
raise OSError(e.message) from e

raise e
raise
self.invalidate_cache(self._parent(path))

def mv(
@@ -216,7 +216,7 @@ def mv(
elif e.error_code == "RESOURCE_ALREADY_EXISTS":
raise FileExistsError(e.message) from e

raise e
raise
self.invalidate_cache(self._parent(source_path))
self.invalidate_cache(self._parent(destination_path))

@@ -299,7 +299,7 @@ def _create_handle(self, path, overwrite=True):
if e.error_code == "RESOURCE_ALREADY_EXISTS":
raise FileExistsError(e.message) from e

raise e
raise

def _close_handle(self, handle):
"""
@@ -316,7 +316,7 @@ def _close_handle(self, handle):
if e.error_code == "RESOURCE_DOES_NOT_EXIST":
raise FileNotFoundError(e.message) from e

raise e
raise

def _add_data(self, handle, data):
"""
@@ -346,7 +346,7 @@ def _add_data(self, handle, data):
elif e.error_code == "MAX_BLOCK_SIZE_EXCEEDED":
raise ValueError(e.message) from e

raise e
raise

def _get_data(self, path, start, end):
"""
@@ -376,7 +376,7 @@ def _get_data(self, path, start, end):
elif e.error_code in ["INVALID_PARAMETER_VALUE", "MAX_READ_SIZE_EXCEEDED"]:
raise ValueError(e.message) from e

raise e
raise

def invalidate_cache(self, path=None):
if path is None:
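The repeated `raise e` → `raise` substitutions across this file are behavioral, not cosmetic: a bare `raise` re-raises the active exception with its original traceback, whereas `raise e` makes the traceback point at the re-raise line. A generic illustration:

```python
def with_raise_e():
    try:
        {}["missing"]
    except KeyError as e:
        raise e  # traceback now starts here, hiding the original failing line


def with_bare_raise():
    try:
        {}["missing"]
    except KeyError:
        raise  # original traceback preserved, as the DBFS methods now do
```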
12 changes: 12 additions & 0 deletions fsspec/implementations/dirfs.py
@@ -370,3 +370,15 @@ def open(
*args,
**kwargs,
)

async def open_async(
self,
path,
*args,
**kwargs,
):
return await self.fs.open_async(
self._join(path),
*args,
**kwargs,
)
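
With `open_async` now forwarded, a `DirFileSystem` over an async-capable backend can serve async opens relative to its root. A rough sketch over HTTP, using a placeholder base URL and file name:

```python
import asyncio

import fsspec
from fsspec.implementations.dirfs import DirFileSystem


async def main():
    http = fsspec.filesystem("http", asynchronous=True)
    await http.set_session()
    # all paths become relative to the placeholder base URL
    fs = DirFileSystem(path="https://example.com/data", fs=http)
    f = await fs.open_async("file.bin")  # fetches .../data/file.bin
    try:
        print(await f.read(1024))
    finally:
        await f.close()


asyncio.run(main())
```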
20 changes: 15 additions & 5 deletions fsspec/implementations/ftp.py
@@ -2,7 +2,7 @@
import sys
import uuid
import warnings
from ftplib import FTP, Error, error_perm
from ftplib import FTP, FTP_TLS, Error, error_perm
from typing import Any

from ..spec import AbstractBufferedFile, AbstractFileSystem
@@ -27,6 +27,7 @@ def __init__(
tempdir=None,
timeout=30,
encoding="utf-8",
tls=False,
**kwargs,
):
"""
@@ -56,28 +57,37 @@ def __init__(
Timeout of the ftp connection in seconds
encoding: str
Encoding to use for directories and filenames in FTP connection
tls: bool
Use FTP-TLS, by default False
"""
super().__init__(**kwargs)
self.host = host
self.port = port
self.tempdir = tempdir or "/tmp"
self.cred = username, password, acct
self.cred = username or "", password or "", acct or ""
self.timeout = timeout
self.encoding = encoding
if block_size is not None:
self.blocksize = block_size
else:
self.blocksize = 2**16
self.tls = tls
self._connect()
if self.tls:
self.ftp.prot_p()

def _connect(self):
if self.tls:
ftp_cls = FTP_TLS
else:
ftp_cls = FTP
if sys.version_info >= (3, 9):
self.ftp = FTP(timeout=self.timeout, encoding=self.encoding)
self.ftp = ftp_cls(timeout=self.timeout, encoding=self.encoding)
elif self.encoding:
warnings.warn("`encoding` not supported for python<3.9, ignoring")
self.ftp = FTP(timeout=self.timeout)
self.ftp = ftp_cls(timeout=self.timeout)
else:
self.ftp = FTP(timeout=self.timeout)
self.ftp = ftp_cls(timeout=self.timeout)
self.ftp.connect(self.host, self.port)
self.ftp.login(*self.cred)

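Enabling the new flag is a single keyword: `tls=True` swaps in `ftplib.FTP_TLS` and calls `prot_p()` after login so the data channel is encrypted as well. A sketch with placeholder host and credentials:

```python
import fsspec

# tls=True selects ftplib.FTP_TLS and secures the data channel via prot_p()
fs = fsspec.filesystem(
    "ftp",
    host="ftp.example.com",
    username="user",
    password="secret",
    tls=True,
)
print(fs.ls("/"))
```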
66 changes: 27 additions & 39 deletions fsspec/implementations/git.py
@@ -55,6 +55,8 @@ def _path_to_object(self, path, ref):
tree = comm.tree
for part in parts:
if part and isinstance(tree, pygit2.Tree):
if part not in tree:
raise FileNotFoundError(path)
tree = tree[part]
return tree

@@ -69,46 +71,32 @@ def _get_kwargs_from_urls(path):
out["ref"], path = path.split("@", 1)
return out

@staticmethod
def _object_to_info(obj, path=None):
# obj.name and obj.filemode are None for the root tree!
is_dir = isinstance(obj, pygit2.Tree)
return {
"type": "directory" if is_dir else "file",
"name": (
"/".join([path, obj.name or ""]).lstrip("/") if path else obj.name
),
"hex": str(obj.id),
"mode": "100644" if obj.filemode is None else f"{obj.filemode:o}",
"size": 0 if is_dir else obj.size,
}

def ls(self, path, detail=True, ref=None, **kwargs):
path = self._strip_protocol(path)
tree = self._path_to_object(path, ref)
if isinstance(tree, pygit2.Tree):
out = []
for obj in tree:
if isinstance(obj, pygit2.Tree):
out.append(
{
"type": "directory",
"name": "/".join([path, obj.name]).lstrip("/"),
"hex": obj.hex,
"mode": f"{obj.filemode:o}",
"size": 0,
}
)
else:
out.append(
{
"type": "file",
"name": "/".join([path, obj.name]).lstrip("/"),
"hex": obj.hex,
"mode": f"{obj.filemode:o}",
"size": obj.size,
}
)
else:
obj = tree
out = [
{
"type": "file",
"name": obj.name,
"hex": obj.hex,
"mode": f"{obj.filemode:o}",
"size": obj.size,
}
]
if detail:
return out
return [o["name"] for o in out]
tree = self._path_to_object(self._strip_protocol(path), ref)
return [
GitFileSystem._object_to_info(obj, path)
if detail
else GitFileSystem._object_to_info(obj, path)["name"]
for obj in (tree if isinstance(tree, pygit2.Tree) else [tree])
]

def info(self, path, ref=None, **kwargs):
tree = self._path_to_object(self._strip_protocol(path), ref)
return GitFileSystem._object_to_info(tree, path)

def ukey(self, path, ref=None):
return self.info(path, ref=ref)["hex"]
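After this refactor, `ls` and `info` share `_object_to_info`, `info` no longer has to list a whole tree to describe one entry, and unknown paths raise `FileNotFoundError` up front. A usage sketch against a local clone (repository path and ref are placeholders; requires pygit2):

```python
import fsspec

fs = fsspec.filesystem("git", path="/path/to/repo")  # hypothetical checkout
print(fs.info("README.md", ref="master"))     # dict with type/name/hex/mode/size
print(fs.ls("", ref="master", detail=False))  # entry names at the repo root
print(fs.ukey("README.md", ref="master"))     # the object's hex id
```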
18 changes: 10 additions & 8 deletions fsspec/implementations/http.py
@@ -358,9 +358,10 @@ def _open(
kw = self.kwargs.copy()
kw["asynchronous"] = self.asynchronous
kw.update(kwargs)
size = size or self.info(path, **kwargs)["size"]
info = {}
size = size or info.update(self.info(path, **kwargs)) or info["size"]
session = sync(self.loop, self.set_session)
if block_size and size:
if block_size and size and info.get("partial", True):
return HTTPFile(
self,
path,
@@ -520,9 +521,9 @@ async def _isdir(self, path):

class HTTPFile(AbstractBufferedFile):
"""
A file-like object pointing to a remove HTTP(S) resource
A file-like object pointing to a remote HTTP(S) resource
Supports only reading, with read-ahead of a predermined block-size.
Supports only reading, with read-ahead of a predetermined block-size.
In the case that the server does not supply the filesize, only reading of
the complete file in one go is supported.
@@ -835,10 +836,6 @@ async def _file_info(url, session, size_policy="head", **kwargs):
async with r:
r.raise_for_status()

# TODO:
# recognise lack of 'Accept-Ranges',
# or 'Accept-Ranges': 'none' (not 'bytes')
# to mean streaming only, no random access => return None
if "Content-Length" in r.headers:
# Some servers may choose to ignore Accept-Encoding and return
# compressed content, in which case the returned size is unreliable.
@@ -853,6 +850,11 @@ async def _file_info(url, session, size_policy="head", **kwargs):
if "Content-Type" in r.headers:
info["mimetype"] = r.headers["Content-Type"].partition(";")[0]

if r.headers.get("Accept-Ranges") == "none":
# Some servers may explicitly discourage partial content requests, but
# the lack of "Accept-Ranges" does not always indicate they would fail
info["partial"] = False

info["url"] = str(r.url)

for checksum_field in ["ETag", "Content-MD5", "Digest"]:
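The effect of the new `partial` flag: a server answering `Accept-Ranges: none` is marked as not supporting range requests, and `_open` then avoids the block-wise `HTTPFile` whose Range requests would fail. A sketch of checking the flag and forcing a streaming read, with a placeholder URL:

```python
import fsspec

url = "https://example.com/streamed.bin"  # placeholder URL
fs = fsspec.filesystem("http")
info = fs.info(url)
if not info.get("partial", True):
    # Server sent "Accept-Ranges: none": random access would fail, so
    # open without a block size and read the whole body as a stream.
    with fs.open(url, block_size=0) as f:
        data = f.read()
```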
